test_imaplib.py 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. from test import test_support as support
  2. # If we end up with a significant number of tests that don't require
  3. # threading, this test module should be split. Right now we skip
  4. # them all if we don't have threading.
  5. threading = support.import_module('threading')
  6. from contextlib import contextmanager
  7. import imaplib
  8. import os.path
  9. import SocketServer
  10. import time
  11. from test_support import reap_threads, verbose, transient_internet
  12. import unittest
  # The ssl module is optional; code below checks ``ssl`` for truthiness
  # before using it.
  try:
      import ssl
  except ImportError:
      ssl = None

  # Certificate file passed to ssl.wrap_socket() by the SSL test server.
  # test_main() assigns the real path before the networked tests run.
  CERTFILE = None
  class TestImaplib(unittest.TestCase):
      """Tests for imaplib helpers that need neither a server nor threads."""

      def test_that_Time2Internaldate_returns_a_result(self):
          """Time2Internaldate handles int, float, struct_time and string input."""
          # Only successful completion is checked, not the value produced:
          # the formatted result depends on the timezone the machine is in.
          epoch = 2000000000
          samples = (
              epoch,
              float(epoch),
              time.localtime(epoch),
              '"18-May-2033 05:33:20 +0200"',
          )
          for value in samples:
              imaplib.Time2Internaldate(value)
  if ssl:
      class SecureTCPServer(SocketServer.TCPServer):
          """TCP server that wraps every accepted connection in SSL."""

          def get_request(self):
              """Accept a connection and return ``(ssl_socket, client_address)``."""
              plain_sock, client_addr = self.socket.accept()
              secure_sock = ssl.wrap_socket(
                  plain_sock, server_side=True, certfile=CERTFILE)
              return secure_sock, client_addr

      IMAP4_SSL = imaplib.IMAP4_SSL

  else:
      # Placeholders so that the names are still defined when ssl is missing.
      class SecureTCPServer:
          pass

      IMAP4_SSL = None
  40. class SimpleIMAPHandler(SocketServer.StreamRequestHandler):
  41. timeout = 1
  42. def _send(self, message):
  43. if verbose: print "SENT:", message.strip()
  44. self.wfile.write(message)
  45. def handle(self):
  46. # Send a welcome message.
  47. self._send('* OK IMAP4rev1\r\n')
  48. while 1:
  49. # Gather up input until we receive a line terminator or we timeout.
  50. # Accumulate read(1) because it's simpler to handle the differences
  51. # between naked sockets and SSL sockets.
  52. line = ''
  53. while 1:
  54. try:
  55. part = self.rfile.read(1)
  56. if part == '':
  57. # Naked sockets return empty strings..
  58. return
  59. line += part
  60. except IOError:
  61. # ..but SSLSockets raise exceptions.
  62. return
  63. if line.endswith('\r\n'):
  64. break
  65. if verbose: print 'GOT:', line.strip()
  66. splitline = line.split()
  67. tag = splitline[0]
  68. cmd = splitline[1]
  69. args = splitline[2:]
  70. if hasattr(self, 'cmd_%s' % (cmd,)):
  71. getattr(self, 'cmd_%s' % (cmd,))(tag, args)
  72. else:
  73. self._send('%s BAD %s unknown\r\n' % (tag, cmd))
  74. def cmd_CAPABILITY(self, tag, args):
  75. self._send('* CAPABILITY IMAP4rev1\r\n')
  76. self._send('%s OK CAPABILITY completed\r\n' % (tag,))
  77. class BaseThreadedNetworkedTests(unittest.TestCase):
  78. def make_server(self, addr, hdlr):
  79. class MyServer(self.server_class):
  80. def handle_error(self, request, client_address):
  81. self.close_request(request)
  82. self.server_close()
  83. raise
  84. if verbose: print "creating server"
  85. server = MyServer(addr, hdlr)
  86. self.assertEqual(server.server_address, server.socket.getsockname())
  87. if verbose:
  88. print "server created"
  89. print "ADDR =", addr
  90. print "CLASS =", self.server_class
  91. print "HDLR =", server.RequestHandlerClass
  92. t = threading.Thread(
  93. name='%s serving' % self.server_class,
  94. target=server.serve_forever,
  95. # Short poll interval to make the test finish quickly.
  96. # Time between requests is short enough that we won't wake
  97. # up spuriously too many times.
  98. kwargs={'poll_interval':0.01})
  99. t.daemon = True # In case this function raises.
  100. t.start()
  101. if verbose: print "server running"
  102. return server, t
  103. def reap_server(self, server, thread):
  104. if verbose: print "waiting for server"
  105. server.shutdown()
  106. thread.join()
  107. if verbose: print "done"
  108. @contextmanager
  109. def reaped_server(self, hdlr):
  110. server, thread = self.make_server((support.HOST, 0), hdlr)
  111. try:
  112. yield server
  113. finally:
  114. self.reap_server(server, thread)
  115. @reap_threads
  116. def test_connect(self):
  117. with self.reaped_server(SimpleIMAPHandler) as server:
  118. client = self.imap_class(*server.server_address)
  119. client.shutdown()
  120. @reap_threads
  121. def test_issue5949(self):
  122. class EOFHandler(SocketServer.StreamRequestHandler):
  123. def handle(self):
  124. # EOF without sending a complete welcome message.
  125. self.wfile.write('* OK')
  126. with self.reaped_server(EOFHandler) as server:
  127. self.assertRaises(imaplib.IMAP4.abort,
  128. self.imap_class, *server.server_address)
  129. def test_linetoolong(self):
  130. class TooLongHandler(SimpleIMAPHandler):
  131. def handle(self):
  132. # Send a very long response line
  133. self.wfile.write('* OK ' + imaplib._MAXLINE*'x' + '\r\n')
  134. with self.reaped_server(TooLongHandler) as server:
  135. self.assertRaises(imaplib.IMAP4.error,
  136. self.imap_class, *server.server_address)
  class ThreadedNetworkedTests(BaseThreadedNetworkedTests):
      """Run the threaded client/server tests over a plain TCP connection."""

      imap_class = imaplib.IMAP4
      server_class = SocketServer.TCPServer
  @unittest.skipUnless(ssl, "SSL not available")
  class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests):
      """Run the threaded client/server tests over an SSL connection."""

      imap_class = IMAP4_SSL
      server_class = SecureTCPServer

      def test_linetoolong(self):
          """Always skipped in this SSL variant (see the skip message)."""
          self.skipTest("test is not reliable on 2.7; see issue 20118")
  class RemoteIMAPTest(unittest.TestCase):
      """Tests that talk to a remote IMAP server over the network.

      The connection parameters are class attributes so that subclasses can
      override them.
      """

      host = 'cyrus.andrew.cmu.edu'
      port = 143
      username = 'anonymous'
      password = 'pass'
      imap_class = imaplib.IMAP4

      def setUp(self):
          """Connect to the remote server inside transient_internet()."""
          with transient_internet(self.host):
              self.server = self.imap_class(self.host, self.port)

      def tearDown(self):
          """Log out, unless the test already cleared ``self.server``."""
          if self.server is not None:
              self.server.logout()

      def test_logincapa(self):
          """The server advertises LOGINDISABLED."""
          # assertIn reports the missing item and the container on failure,
          # unlike assertTrue(x in y) which only says "False is not true".
          self.assertIn('LOGINDISABLED', self.server.capabilities)

      def test_anonlogin(self):
          """The server advertises AUTH=ANONYMOUS and login() returns OK."""
          self.assertIn('AUTH=ANONYMOUS', self.server.capabilities)
          rs = self.server.login(self.username, self.password)
          self.assertEqual(rs[0], 'OK')

      def test_logout(self):
          """logout() answers with BYE."""
          rs = self.server.logout()
          # Cleared so that tearDown() does not log out a second time.
          self.server = None
          self.assertEqual(rs[0], 'BYE')
  @unittest.skipUnless(ssl, "SSL not available")
  class RemoteIMAP_SSLTest(RemoteIMAPTest):
      """Remote server tests repeated with the SSL client class on port 993."""

      port = 993
      imap_class = IMAP4_SSL

      def test_logincapa(self):
          """The server does not advertise LOGINDISABLED but offers AUTH=PLAIN."""
          # assertNotIn/assertIn name the offending item and the container on
          # failure, unlike assertFalse/assertTrue on an ``in`` expression.
          self.assertNotIn('LOGINDISABLED', self.server.capabilities)
          self.assertIn('AUTH=PLAIN', self.server.capabilities)
  def test_main():
      """Run TestImaplib, plus the networked tests when 'network' is enabled.

      When ssl is available and the network resource is enabled, CERTFILE is
      set to ``keycert.pem`` next to this file; support.TestFailed is raised
      if that file does not exist.
      """
      global CERTFILE
      selected = [TestImaplib]

      if support.is_resource_enabled('network'):
          if ssl:
              here = os.path.dirname(__file__) or os.curdir
              CERTFILE = os.path.join(here, "keycert.pem")
              if not os.path.exists(CERTFILE):
                  raise support.TestFailed("Can't read certificate files!")
          selected += [
              ThreadedNetworkedTests, ThreadedNetworkedTestsSSL,
              RemoteIMAPTest, RemoteIMAP_SSLTest,
          ]

      support.run_unittest(*selected)


  if __name__ == "__main__":
      test_main()