No Description
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

tests.py 9.9KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317
  1. import unittest
  2. from time import sleep
  3. import uuid
  4. import socket
  5. import requests
  6. import os
  7. TEST_SERVER = 'sovereign.local'
  8. TEST_ADDRESS = 'sovereign@sovereign.local'
  9. TEST_PASSWORD = 'foo'
  10. CA_BUNDLE = 'roles/common/files/wildcard_ca.pem'
  11. socket.setdefaulttimeout(5)
  12. os.environ['REQUESTS_CA_BUNDLE'] = CA_BUNDLE
  13. class SSHTests(unittest.TestCase):
  14. def test_ssh_banner(self):
  15. """SSH is responding with its banner"""
  16. s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  17. s.connect((TEST_SERVER, 22))
  18. data = s.recv(1024)
  19. s.close()
  20. self.assertRegexpMatches(data, '^SSH-2.0-OpenSSH')
  21. class WebTests(unittest.TestCase):
  22. def test_blog_http(self):
  23. """Blog is redirecting to https"""
  24. # FIXME: requests won't verify sovereign.local with *.sovereign.local cert
  25. r = requests.get('http://' + TEST_SERVER, verify=False)
  26. # We should be redirected to https
  27. self.assertEquals(r.history[0].status_code, 301)
  28. self.assertEquals(r.url, 'https://' + TEST_SERVER + '/')
  29. # 403 - Since there is no documents in the blog directory
  30. self.assertEquals(r.status_code, 403)
  31. def test_webmail_http(self):
  32. """Webmail is redirecting to https and displaying login page"""
  33. r = requests.get('http://mail.' + TEST_SERVER)
  34. # We should be redirected to https
  35. self.assertEquals(r.history[0].status_code, 301)
  36. self.assertEquals(r.url, 'https://mail.' + TEST_SERVER + '/')
  37. # 200 - We should be at the login page
  38. self.assertEquals(r.status_code, 200)
  39. self.assertIn(
  40. 'Welcome to Roundcube Webmail',
  41. r.content
  42. )
  43. def test_owncloud_http(self):
  44. """ownCloud is redirecting to https and displaying login page"""
  45. r = requests.get('http://cloud.' + TEST_SERVER)
  46. # We should be redirected to https
  47. self.assertEquals(r.history[0].status_code, 301)
  48. self.assertEquals(r.url, 'https://cloud.' + TEST_SERVER + '/')
  49. # 200 - We should be at the login page
  50. self.assertEquals(r.status_code, 200)
  51. self.assertIn(
  52. 'ownCloud',
  53. r.content
  54. )
  55. def test_selfoss_http(self):
  56. """selfoss is redirecting to https and displaying login page"""
  57. r = requests.get('http://news.' + TEST_SERVER)
  58. # We should be redirected to https
  59. self.assertEquals(r.history[0].status_code, 301)
  60. self.assertEquals(r.url, 'https://news.' + TEST_SERVER + '/')
  61. # 200 - We should be at the login page
  62. self.assertEquals(r.status_code, 200)
  63. self.assertIn(
  64. 'selfoss',
  65. r.content
  66. )
  67. self.assertIn(
  68. 'login',
  69. r.content
  70. )
  71. def test_znc_http(self):
  72. """ZNC web interface is displaying login page"""
  73. # FIXME: requests won't verify sovereign.local with *.sovereign.local cert
  74. r = requests.get('https://' + TEST_SERVER + ':6697', verify=False)
  75. self.assertEquals(r.status_code, 200)
  76. self.assertIn(
  77. "Welcome to ZNC's web interface!",
  78. r.content
  79. )
  80. class IRCTests(unittest.TestCase):
  81. def test_irc_auth(self):
  82. """ZNC is accepting encrypted logins"""
  83. import ssl
  84. s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  85. ssl_sock = ssl.wrap_socket(s, ca_certs=CA_BUNDLE, cert_reqs=ssl.CERT_REQUIRED)
  86. ssl_sock.connect((TEST_SERVER, 6697))
  87. # Check the encryption parameters
  88. cipher, version, bits = ssl_sock.cipher()
  89. self.assertEquals(cipher, 'AES256-SHA')
  90. self.assertEquals(version, 'TLSv1/SSLv3')
  91. self.assertEquals(bits, 256)
  92. # Login
  93. ssl_sock.send('CAP REQ sasl multi-prefix\r\n')
  94. ssl_sock.send('PASS foo\r\n')
  95. ssl_sock.send('NICK sovereign\r\n')
  96. ssl_sock.send('USER sovereign 0 * Sov\r\n')
  97. # Read until we see the ZNC banner (or timeout)
  98. while 1:
  99. r = ssl_sock.recv(1024)
  100. if 'Connected to ZNC' in r:
  101. break
  102. def new_message(from_email, to_email):
  103. """Creates an email (headers & body) with a random subject"""
  104. from email.mime.text import MIMEText
  105. msg = MIMEText('Testing')
  106. msg['Subject'] = uuid.uuid4().hex[:8]
  107. msg['From'] = from_email
  108. msg['To'] = to_email
  109. return msg.as_string(), msg['subject']
  110. class MailTests(unittest.TestCase):
  111. def assertEmailReceived(self, subject):
  112. """Connects with IMAP and asserts the existance of an email, then deletes it"""
  113. import imaplib
  114. sleep(1)
  115. # Login to IMAP
  116. m = imaplib.IMAP4_SSL(TEST_SERVER, 993)
  117. m.login(TEST_ADDRESS, TEST_PASSWORD)
  118. m.select()
  119. # Assert the message exist
  120. typ, data = m.search(None, '(SUBJECT \"{}\")'.format(subject))
  121. self.assertTrue(len(data[0].split()), 1)
  122. # Delete it & logout
  123. m.store(data[0].strip(), '+FLAGS', '\\Deleted')
  124. m.expunge()
  125. m.close()
  126. m.logout()
  127. def test_imap_requires_ssl(self):
  128. """IMAP without SSL is NOT available"""
  129. import imaplib
  130. with self.assertRaisesRegexp(socket.timeout, 'timed out'):
  131. imaplib.IMAP4(TEST_SERVER, 143)
  132. def test_smtps(self):
  133. """Email sent from an MUA via SMTPS is delivered"""
  134. import smtplib
  135. msg, subject = new_message(TEST_ADDRESS, 'root@sovereign.local')
  136. s = smtplib.SMTP_SSL(TEST_SERVER, 465)
  137. s.login(TEST_ADDRESS, TEST_PASSWORD)
  138. s.sendmail(TEST_ADDRESS, ['root@sovereign.local'], msg)
  139. s.quit()
  140. self.assertEmailReceived(subject)
  141. def test_smtps_requires_auth(self):
  142. """SMTPS with no authentication is rejected"""
  143. import smtplib
  144. s = smtplib.SMTP_SSL(TEST_SERVER, 465)
  145. with self.assertRaisesRegexp(smtplib.SMTPRecipientsRefused, 'Access denied'):
  146. s.sendmail(TEST_ADDRESS, ['root@sovereign.local'], 'Test')
  147. s.quit()
  148. def test_smtp(self):
  149. """Email sent from an MTA is delivered"""
  150. import smtplib
  151. msg, subject = new_message('someone@example.com', TEST_ADDRESS)
  152. s = smtplib.SMTP(TEST_SERVER, 25)
  153. s.sendmail('someone@example.com', [TEST_ADDRESS], msg)
  154. s.quit()
  155. self.assertEmailReceived(subject)
  156. def test_smtp_tls(self):
  157. """Email sent from an MTA via SMTP+TLS is delivered"""
  158. import smtplib
  159. msg, subject = new_message('someone@example.com', TEST_ADDRESS)
  160. s = smtplib.SMTP(TEST_SERVER, 25)
  161. s.starttls()
  162. s.sendmail('someone@example.com', [TEST_ADDRESS], msg)
  163. s.quit()
  164. self.assertEmailReceived(subject)
  165. def test_smtps_headers(self):
  166. """Email sent from an MUA has DKIM and TLS headers"""
  167. import smtplib
  168. import imaplib
  169. # Send a message to root
  170. msg, subject = new_message(TEST_ADDRESS, 'root@sovereign.local')
  171. s = smtplib.SMTP_SSL(TEST_SERVER, 465)
  172. s.login(TEST_ADDRESS, TEST_PASSWORD)
  173. s.sendmail(TEST_ADDRESS, ['root@sovereign.local'], msg)
  174. s.quit()
  175. sleep(1)
  176. # Get the message
  177. m = imaplib.IMAP4_SSL(TEST_SERVER, 993)
  178. m.login(TEST_ADDRESS, TEST_PASSWORD)
  179. m.select()
  180. _, res = m.search(None, '(SUBJECT \"{}\")'.format(subject))
  181. _, data = m.fetch(res[0], '(RFC822)')
  182. self.assertIn(
  183. 'DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=sovereign.local;',
  184. data[0][1]
  185. )
  186. self.assertIn(
  187. '(using TLSv1 with cipher DHE-RSA-AES256-SHA (256/256 bits))',
  188. data[0][1]
  189. )
  190. # Clean up
  191. m.store(res[0].strip(), '+FLAGS', '\\Deleted')
  192. m.expunge()
  193. m.close()
  194. m.logout()
  195. def test_smtp_headers(self):
  196. """Email sent from an MTA via SMTP+TLS has X-DSPAM and TLS headers"""
  197. import smtplib
  198. import imaplib
  199. # Send a message to root
  200. msg, subject = new_message('someone@example.com', TEST_ADDRESS)
  201. s = smtplib.SMTP(TEST_SERVER, 25)
  202. s.starttls()
  203. s.sendmail('someone@example.com', [TEST_ADDRESS], msg)
  204. s.quit()
  205. sleep(1)
  206. # Get the message
  207. m = imaplib.IMAP4_SSL(TEST_SERVER, 993)
  208. m.login(TEST_ADDRESS, TEST_PASSWORD)
  209. m.select()
  210. _, res = m.search(None, '(SUBJECT \"{}\")'.format(subject))
  211. _, data = m.fetch(res[0], '(RFC822)')
  212. self.assertIn(
  213. 'X-DSPAM-Result: Innocent',
  214. data[0][1]
  215. )
  216. self.assertIn(
  217. '(using TLSv1 with cipher DHE-RSA-AES256-SHA (256/256 bits))',
  218. data[0][1]
  219. )
  220. # Clean up
  221. m.store(res[0].strip(), '+FLAGS', '\\Deleted')
  222. m.expunge()
  223. m.close()
  224. m.logout()
  225. class XMPPTests(unittest.TestCase):
  226. def test_xmpp_c2s(self):
  227. """Prosody is listening on 5222 for clients and requiring TLS"""
  228. s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  229. s.connect((TEST_SERVER, 5222))
  230. # Based off http://wiki.xmpp.org/web/Programming_Jabber_Clients
  231. s.send("<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
  232. "xmlns='jabber:client' to='sovereign.local' version='1.0'>")
  233. data = s.recv(1024)
  234. s.close()
  235. self.assertRegexpMatches(
  236. data,
  237. "<starttls xmlns='urn:ietf:params:xml:ns:xmpp-tls'><required/></starttls>"
  238. )
  239. def test_xmpp_s2s(self):
  240. """Prosody is listening on 5269 for servers"""
  241. s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  242. s.connect((TEST_SERVER, 5269))
  243. # Base off http://xmpp.org/extensions/xep-0114.html
  244. s.send("<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
  245. "xmlns='jabber:component:accept' to='sovereign.local'>")
  246. data = s.recv(1024)
  247. s.close()
  248. self.assertRegexpMatches(
  249. data,
  250. "from='sovereign.local'"
  251. )