Ei kuvausta
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

tests.py 14KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418
  1. import unittest
  2. from time import sleep
  3. import uuid
  4. import socket
  5. import requests
  6. import os
  # Host under test and the mailbox credentials the tests log in with.
  TEST_SERVER = 'sovereign.local'
  TEST_ADDRESS = 'sovereign@sovereign.local'
  TEST_PASSWORD = 'foo'

  # CA certificate used to verify the server's TLS endpoints.
  # NOTE(review): the path is relative, so it presumably expects the suite to
  # be run from the repository root -- confirm.
  CA_BUNDLE = 'roles/common/files/wildcard_ca.pem'

  # Make `requests` trust the test CA for every HTTPS call in this module.
  os.environ['REQUESTS_CA_BUNDLE'] = CA_BUNDLE

  # Give every socket created below a 5 second timeout so an unreachable
  # service fails the test instead of hanging it.
  socket.setdefaulttimeout(5)
  class SSHTests(unittest.TestCase):
      """Checks that the SSH daemon on TEST_SERVER is reachable."""

      def test_ssh_banner(self):
          """SSH is responding with its banner"""
          s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
          try:
              s.connect((TEST_SERVER, 22))
              data = s.recv(1024)
          finally:
              # Release the socket even when connect/recv raises or times out.
              s.close()

          # The dot is escaped so that only a literal "SSH-2.0" banner matches.
          # assertRegexpMatches is kept because the rest of the file targets
          # Python 2.7, where assertRegex does not exist.
          self.assertRegexpMatches(data, r'^SSH-2\.0-OpenSSH')
  class WebTests(unittest.TestCase):
      """HTTP/HTTPS smoke tests for the web front ends served under TEST_SERVER."""

      def _assert_redirected_to_https(self, response, expected_url):
          """Assert that `response` started with a 301 and ended at `expected_url`.

          Checking that a redirect happened at all first turns a missing
          redirect into a test failure instead of an IndexError raised by
          `response.history[0]`.
          """
          self.assertTrue(response.history, 'request was not redirected')
          self.assertEqual(response.history[0].status_code, 301)
          self.assertEqual(response.url, expected_url)

      def test_blog_http(self):
          """Blog is redirecting to https"""
          # FIXME: requests won't verify sovereign.local with *.sovereign.local cert
          r = requests.get('http://' + TEST_SERVER, verify=False)

          # We should be redirected to https
          self._assert_redirected_to_https(r, 'https://' + TEST_SERVER + '/')

          # 403 - Since there are no documents in the blog directory
          self.assertEqual(r.status_code, 403)

      def test_mail_autoconfig_http_and_https(self):
          """Email autoconfiguration XML file is accessible over both HTTP and HTTPS"""
          # Test getting the file over HTTP and HTTPS
          for proto in ['http', 'https']:
              r = requests.get(proto + '://autoconfig.' + TEST_SERVER + '/mail/config-v1.1.xml')

              # 200 - We should see the XML file
              self.assertEqual(r.status_code, 200)
              self.assertIn('application/xml', r.headers['Content-Type'])
              self.assertIn('clientConfig version="1.1"', r.content)

      def test_webmail_http(self):
          """Webmail is redirecting to https and displaying login page"""
          r = requests.get('http://mail.' + TEST_SERVER)

          # We should be redirected to https
          self._assert_redirected_to_https(r, 'https://mail.' + TEST_SERVER + '/')

          # 200 - We should be at the login page
          self.assertEqual(r.status_code, 200)
          self.assertIn('Welcome to Roundcube Webmail', r.content)

      def test_zpush_http_unauthorized(self):
          """ActiveSync endpoint is redirecting to https and rejecting anonymous requests"""
          r = requests.get('http://mail.' + TEST_SERVER + '/Microsoft-Server-ActiveSync')

          # We should be redirected to https
          self._assert_redirected_to_https(
              r, 'https://mail.' + TEST_SERVER + '/Microsoft-Server-ActiveSync')

          # Unauthorized
          self.assertEqual(r.status_code, 401)

      def test_zpush_https(self):
          """ActiveSync endpoint is answering an authenticated Ping with ActiveSync headers"""
          r = requests.post(
              'https://mail.' + TEST_SERVER + '/Microsoft-Server-ActiveSync',
              # Use the shared credentials rather than repeating the literals.
              auth=(TEST_ADDRESS, TEST_PASSWORD),
              params={
                  'DeviceId': '1234',
                  'DeviceType': 'testbot',
                  'Cmd': 'Ping',
              })

          self.assertEqual(r.headers['content-type'],
                           'application/vnd.ms-sync.wbxml')
          self.assertEqual(r.headers['ms-server-activesync'],
                           '14.0')

      def test_owncloud_http(self):
          """ownCloud is redirecting to https and displaying login page"""
          r = requests.get('http://cloud.' + TEST_SERVER)

          # We should be redirected to https
          self._assert_redirected_to_https(r, 'https://cloud.' + TEST_SERVER + '/')

          # 200 - We should be at the login page
          self.assertEqual(r.status_code, 200)
          self.assertIn('ownCloud', r.content)

      def test_selfoss_http(self):
          """selfoss is redirecting to https and displaying login page"""
          r = requests.get('http://news.' + TEST_SERVER)

          # We should be redirected to https
          self._assert_redirected_to_https(r, 'https://news.' + TEST_SERVER + '/')

          # 200 - We should be at the login page
          self.assertEqual(r.status_code, 200)
          self.assertIn('selfoss', r.content)
          self.assertIn('login', r.content)

      def test_cgit_http(self):
          """CGit web interface is displaying home page"""
          r = requests.get('http://git.' + TEST_SERVER, verify=False)

          # We should be redirected to https
          self._assert_redirected_to_https(r, 'https://git.' + TEST_SERVER + '/')

          # 200 - We should be at the repository page
          self.assertEqual(r.status_code, 200)
          self.assertIn('git repository', r.content)

      def test_newebe_http(self):
          """Newebe is displaying home page"""
          r = requests.get('http://newebe.' + TEST_SERVER, verify=False)

          # We should be redirected to https
          self._assert_redirected_to_https(r, 'https://newebe.' + TEST_SERVER + '/')

          # 200 - We should be at the Newebe home page
          self.assertEqual(r.status_code, 200)
          self.assertIn('Newebe, Freedom to Share', r.content)
  class IRCTests(unittest.TestCase):
      """Checks the TLS-protected IRC bouncer listening on port 6697."""

      def test_irc_auth(self):
          """ZNC is accepting encrypted logins"""
          import ssl

          s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
          ssl_sock = ssl.wrap_socket(s, ca_certs=CA_BUNDLE, cert_reqs=ssl.CERT_REQUIRED)
          try:
              ssl_sock.connect((TEST_SERVER, 6697))

              # Check the encryption parameters
              cipher, version, bits = ssl_sock.cipher()
              self.assertEqual(cipher, 'AES256-GCM-SHA384')
              self.assertEqual(version, 'TLSv1/SSLv3')
              self.assertEqual(bits, 256)

              # Login
              ssl_sock.send('CAP REQ sasl multi-prefix\r\n')
              ssl_sock.send('PASS ' + TEST_PASSWORD + '\r\n')
              ssl_sock.send('NICK sovereign\r\n')
              ssl_sock.send('USER sovereign 0 * Sov\r\n')

              # Read until we see the ZNC banner (or timeout). The data is
              # accumulated so a banner split across two reads is still found.
              received = ''
              while 'Connected to ZNC' not in received:
                  chunk = ssl_sock.recv(1024)
                  if not chunk:
                      # An empty read means the peer closed the connection;
                      # without this check the loop would spin forever.
                      self.fail('Connection closed before the ZNC banner was received')
                  received += chunk
          finally:
              # Always release the TLS socket, including on assertion failure.
              ssl_sock.close()
  def new_message(from_email, to_email):
      """Build a plain-text test email with a random subject.

      The body is always 'Testing' and the subject is the first eight hex
      characters of a fresh UUID4, so each message can be found again by
      searching for its subject.

      Returns a `(serialized_message, subject)` tuple.
      """
      from email.mime.text import MIMEText

      message = MIMEText('Testing')
      headers = (
          ('Subject', uuid.uuid4().hex[:8]),
          ('From', from_email),
          ('To', to_email),
      )
      for header, value in headers:
          message[header] = value

      # Header lookup on the message object is case-insensitive.
      return message.as_string(), message['subject']
  class MailTests(unittest.TestCase):
      """End-to-end mail checks: sending over SMTP/SMTPS, reading over IMAPS/POP3S."""

      def _send_authenticated(self, recipient):
          """Send a fresh message from TEST_ADDRESS to `recipient` over SMTPS.

          Logs in on port 465 with the test credentials and returns the
          random subject of the message that was sent.
          """
          import smtplib

          msg, subject = new_message(TEST_ADDRESS, recipient)
          s = smtplib.SMTP_SSL(TEST_SERVER, 465)
          try:
              s.login(TEST_ADDRESS, TEST_PASSWORD)
              s.sendmail(TEST_ADDRESS, [recipient], msg)
              s.quit()
          finally:
              # close() is a no-op after a successful quit(); on failure it
              # releases the socket without masking the original exception.
              s.close()
          return subject

      def _send_unauthenticated(self, use_tls):
          """Send a fresh message to TEST_ADDRESS over port 25 without logging in.

          The sender is 'someone@example.com'. When `use_tls` is true the
          session is upgraded with STARTTLS first. Returns the random subject.
          """
          import smtplib

          sender = 'someone@example.com'
          msg, subject = new_message(sender, TEST_ADDRESS)
          s = smtplib.SMTP(TEST_SERVER, 25)
          try:
              if use_tls:
                  s.starttls()
              s.sendmail(sender, [TEST_ADDRESS], msg)
              s.quit()
          finally:
              s.close()
          return subject

      def _fetch_and_delete(self, subject):
          """Fetch the single message with `subject` over IMAPS, delete it, return its raw text.

          The message is removed before any caller asserts on its content, so
          a failing assertion cannot leave test mail behind in the mailbox.
          """
          import imaplib

          # Give the server a moment to deliver the message.
          sleep(1)

          m = imaplib.IMAP4_SSL(TEST_SERVER, 993)
          try:
              m.login(TEST_ADDRESS, TEST_PASSWORD)
              m.select()

              # Assert exactly one message matches the (random) subject
              _, res = m.search(None, '(SUBJECT \"{}\")'.format(subject))
              ids = res[0].split()
              self.assertEqual(len(ids), 1)

              _, data = m.fetch(ids[0], '(RFC822)')

              # Delete it
              m.store(ids[0], '+FLAGS', '\\Deleted')
              m.expunge()
              m.close()
          finally:
              # Log out even if login, search or an assertion failed.
              m.logout()
          return data[0][1]

      def assertIMAPReceived(self, subject):
          """Connects with IMAP and asserts the existence of an email, then deletes it"""
          self._fetch_and_delete(subject)

      def assertPOP3Received(self, subject):
          """Connects with POP3S and asserts the existence of an email, then deletes it"""
          import poplib

          sleep(1)

          # Login to POP3
          mail = poplib.POP3_SSL(TEST_SERVER, 995)
          try:
              mail.user(TEST_ADDRESS)
              mail.pass_(TEST_PASSWORD)

              # Assert the message exists. Only the last message in the
              # mailbox is inspected.
              num = len(mail.list()[1])
              self.assertGreater(num, 0, 'POP3 mailbox is empty')
              resp, text, octets = mail.retr(num)
              # `text` is the list of message lines, so this matches the
              # whole Subject header line.
              self.assertIn("Subject: " + subject, text)

              # Delete it
              mail.dele(num)
          finally:
              # Log out even when an assertion above failed.
              mail.quit()

      def test_imap_requires_ssl(self):
          """IMAP without SSL is NOT available"""
          import imaplib

          with self.assertRaisesRegexp(socket.timeout, 'timed out'):
              imaplib.IMAP4(TEST_SERVER, 143)

      def test_pop3_requires_ssl(self):
          """POP3 without SSL is NOT available"""
          import poplib

          with self.assertRaisesRegexp(socket.timeout, 'timed out'):
              poplib.POP3(TEST_SERVER, 110)

      def test_smtps(self):
          """Email sent from an MUA via SMTPS is delivered"""
          subject = self._send_authenticated('root@' + TEST_SERVER)
          self.assertIMAPReceived(subject)

      def test_smtps_delimiter_to(self):
          """Email sent to address with delimiter is delivered"""
          subject = self._send_authenticated('root+foo@' + TEST_SERVER)
          self.assertIMAPReceived(subject)

      def test_smtps_requires_auth(self):
          """SMTPS with no authentication is rejected"""
          import smtplib

          s = smtplib.SMTP_SSL(TEST_SERVER, 465)
          try:
              with self.assertRaisesRegexp(smtplib.SMTPRecipientsRefused, 'Access denied'):
                  s.sendmail(TEST_ADDRESS, ['root@' + TEST_SERVER], 'Test')
              s.quit()
          finally:
              s.close()

      def test_smtp(self):
          """Email sent from an MTA is delivered"""
          subject = self._send_unauthenticated(use_tls=False)
          self.assertIMAPReceived(subject)

      def test_smtp_tls(self):
          """Email sent from an MTA via SMTP+TLS is delivered"""
          subject = self._send_unauthenticated(use_tls=True)
          self.assertIMAPReceived(subject)

      def test_smtps_headers(self):
          """Email sent from an MUA has DKIM and TLS headers"""
          # Send a message to root
          subject = self._send_authenticated('root@' + TEST_SERVER)

          # Get the message (it is deleted from the mailbox as it is fetched)
          raw = self._fetch_and_delete(subject)

          self.assertIn(
              'DKIM-Signature: v=1; a=rsa-sha256; c=relaxed/relaxed; d=sovereign.local;',
              raw
          )
          self.assertIn(
              'ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)',
              raw
          )

      def test_smtp_headers(self):
          """Email sent from an MTA via SMTP+TLS has X-DSPAM and TLS headers"""
          # Send a message to the test mailbox
          subject = self._send_unauthenticated(use_tls=True)

          # Get the message (it is deleted from the mailbox as it is fetched)
          raw = self._fetch_and_delete(subject)

          self.assertIn(
              'X-DSPAM-Result: ',
              raw
          )
          self.assertIn(
              'ECDHE-RSA-AES256-GCM-SHA384 (256/256 bits)',
              raw
          )

      def test_pop3s(self):
          """Email sent from an MUA via SMTPS can be retrieved over POP3S"""
          subject = self._send_authenticated('root@' + TEST_SERVER)
          self.assertPOP3Received(subject)
  class XMPPTests(unittest.TestCase):
      """Checks the XMPP client (5222) and server (5269) listeners."""

      def _open_stream(self, port, header):
          """Send the stream `header` to `port` on TEST_SERVER and return the first reply.

          At most 1024 bytes of the reply are read. The socket is closed
          whether or not the exchange succeeds.
          """
          s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
          try:
              s.connect((TEST_SERVER, port))
              # sendall() keeps writing until the whole header has been sent.
              s.sendall(header)
              return s.recv(1024)
          finally:
              s.close()

      def test_xmpp_c2s(self):
          """Prosody is listening on 5222 for clients and requiring TLS"""
          # Based off http://wiki.xmpp.org/web/Programming_Jabber_Clients
          data = self._open_stream(
              5222,
              "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
              "xmlns='jabber:client' to='" + TEST_SERVER + "' version='1.0'>")

          # Plain substring check: the expected text is a literal, not a regex.
          self.assertIn(
              "<starttls xmlns='urn:ietf:params:xml:ns:xmpp-tls'><required/></starttls>",
              data
          )

      def test_xmpp_s2s(self):
          """Prosody is listening on 5269 for servers"""
          # Based off http://xmpp.org/extensions/xep-0114.html
          data = self._open_stream(
              5269,
              "<stream:stream xmlns:stream='http://etherx.jabber.org/streams' "
              "xmlns='jabber:component:accept' to='" + TEST_SERVER + "'>")

          # Plain substring check so the dots in the host name are not
          # treated as regex wildcards.
          self.assertIn("from='" + TEST_SERVER + "'", data)