# test_poplib.py (10 KB)
  1. """Test script for poplib module."""
  2. # Modified by Giampaolo Rodola' to give poplib.POP3 and poplib.POP3_SSL
  3. # a real test suite
  4. import poplib
  5. import asyncore
  6. import asynchat
  7. import socket
  8. import os
  9. import time
  10. import errno
  11. from unittest import TestCase, skipUnless
  12. from test import test_support
  13. from test.test_support import HOST
  14. threading = test_support.import_module('threading')
  15. # the dummy data returned by server when LIST and RETR commands are issued
  16. LIST_RESP = '1 1\r\n2 2\r\n3 3\r\n4 4\r\n5 5\r\n.\r\n'
  17. RETR_RESP = """From: postmaster@python.org\
  18. \r\nContent-Type: text/plain\r\n\
  19. MIME-Version: 1.0\r\n\
  20. Subject: Dummy\r\n\
  21. \r\n\
  22. line1\r\n\
  23. line2\r\n\
  24. line3\r\n\
  25. .\r\n"""
  26. class DummyPOP3Handler(asynchat.async_chat):
  27. def __init__(self, conn):
  28. asynchat.async_chat.__init__(self, conn)
  29. self.set_terminator("\r\n")
  30. self.in_buffer = []
  31. self.push('+OK dummy pop3 server ready.')
  32. def collect_incoming_data(self, data):
  33. self.in_buffer.append(data)
  34. def found_terminator(self):
  35. line = ''.join(self.in_buffer)
  36. self.in_buffer = []
  37. cmd = line.split(' ')[0].lower()
  38. space = line.find(' ')
  39. if space != -1:
  40. arg = line[space + 1:]
  41. else:
  42. arg = ""
  43. if hasattr(self, 'cmd_' + cmd):
  44. method = getattr(self, 'cmd_' + cmd)
  45. method(arg)
  46. else:
  47. self.push('-ERR unrecognized POP3 command "%s".' %cmd)
  48. def handle_error(self):
  49. raise
  50. def push(self, data):
  51. asynchat.async_chat.push(self, data + '\r\n')
  52. def cmd_echo(self, arg):
  53. # sends back the received string (used by the test suite)
  54. self.push(arg)
  55. def cmd_user(self, arg):
  56. if arg != "guido":
  57. self.push("-ERR no such user")
  58. self.push('+OK password required')
  59. def cmd_pass(self, arg):
  60. if arg != "python":
  61. self.push("-ERR wrong password")
  62. self.push('+OK 10 messages')
  63. def cmd_stat(self, arg):
  64. self.push('+OK 10 100')
  65. def cmd_list(self, arg):
  66. if arg:
  67. self.push('+OK %s %s' %(arg, arg))
  68. else:
  69. self.push('+OK')
  70. asynchat.async_chat.push(self, LIST_RESP)
  71. cmd_uidl = cmd_list
  72. def cmd_retr(self, arg):
  73. self.push('+OK %s bytes' %len(RETR_RESP))
  74. asynchat.async_chat.push(self, RETR_RESP)
  75. cmd_top = cmd_retr
  76. def cmd_dele(self, arg):
  77. self.push('+OK message marked for deletion.')
  78. def cmd_noop(self, arg):
  79. self.push('+OK done nothing.')
  80. def cmd_rpop(self, arg):
  81. self.push('+OK done nothing.')
  82. class DummyPOP3Server(asyncore.dispatcher, threading.Thread):
  83. handler = DummyPOP3Handler
  84. def __init__(self, address, af=socket.AF_INET):
  85. threading.Thread.__init__(self)
  86. asyncore.dispatcher.__init__(self)
  87. self.create_socket(af, socket.SOCK_STREAM)
  88. self.bind(address)
  89. self.listen(5)
  90. self.active = False
  91. self.active_lock = threading.Lock()
  92. self.host, self.port = self.socket.getsockname()[:2]
  93. def start(self):
  94. assert not self.active
  95. self.__flag = threading.Event()
  96. threading.Thread.start(self)
  97. self.__flag.wait()
  98. def run(self):
  99. self.active = True
  100. self.__flag.set()
  101. while self.active and asyncore.socket_map:
  102. self.active_lock.acquire()
  103. asyncore.loop(timeout=0.1, count=1)
  104. self.active_lock.release()
  105. asyncore.close_all(ignore_all=True)
  106. def stop(self):
  107. assert self.active
  108. self.active = False
  109. self.join()
  110. def handle_accept(self):
  111. conn, addr = self.accept()
  112. self.handler = self.handler(conn)
  113. self.close()
  114. def handle_connect(self):
  115. self.close()
  116. handle_read = handle_connect
  117. def writable(self):
  118. return 0
  119. def handle_error(self):
  120. raise
  121. class TestPOP3Class(TestCase):
  122. def assertOK(self, resp):
  123. self.assertTrue(resp.startswith("+OK"))
  124. def setUp(self):
  125. self.server = DummyPOP3Server((HOST, 0))
  126. self.server.start()
  127. self.client = poplib.POP3(self.server.host, self.server.port)
  128. def tearDown(self):
  129. self.client.quit()
  130. self.server.stop()
  131. def test_getwelcome(self):
  132. self.assertEqual(self.client.getwelcome(), '+OK dummy pop3 server ready.')
  133. def test_exceptions(self):
  134. self.assertRaises(poplib.error_proto, self.client._shortcmd, 'echo -err')
  135. def test_user(self):
  136. self.assertOK(self.client.user('guido'))
  137. self.assertRaises(poplib.error_proto, self.client.user, 'invalid')
  138. def test_pass_(self):
  139. self.assertOK(self.client.pass_('python'))
  140. self.assertRaises(poplib.error_proto, self.client.user, 'invalid')
  141. def test_stat(self):
  142. self.assertEqual(self.client.stat(), (10, 100))
  143. def test_list(self):
  144. self.assertEqual(self.client.list()[1:],
  145. (['1 1', '2 2', '3 3', '4 4', '5 5'], 25))
  146. self.assertTrue(self.client.list('1').endswith("OK 1 1"))
  147. def test_retr(self):
  148. expected = ('+OK 116 bytes',
  149. ['From: postmaster@python.org', 'Content-Type: text/plain',
  150. 'MIME-Version: 1.0', 'Subject: Dummy',
  151. '', 'line1', 'line2', 'line3'],
  152. 113)
  153. self.assertEqual(self.client.retr('foo'), expected)
  154. def test_too_long_lines(self):
  155. self.assertRaises(poplib.error_proto, self.client._shortcmd,
  156. 'echo +%s' % ((poplib._MAXLINE + 10) * 'a'))
  157. def test_dele(self):
  158. self.assertOK(self.client.dele('foo'))
  159. def test_noop(self):
  160. self.assertOK(self.client.noop())
  161. def test_rpop(self):
  162. self.assertOK(self.client.rpop('foo'))
  163. def test_top(self):
  164. expected = ('+OK 116 bytes',
  165. ['From: postmaster@python.org', 'Content-Type: text/plain',
  166. 'MIME-Version: 1.0', 'Subject: Dummy', '',
  167. 'line1', 'line2', 'line3'],
  168. 113)
  169. self.assertEqual(self.client.top(1, 1), expected)
  170. def test_uidl(self):
  171. self.client.uidl()
  172. self.client.uidl('foo')
  173. SUPPORTS_SSL = False
  174. if hasattr(poplib, 'POP3_SSL'):
  175. import ssl
  176. SUPPORTS_SSL = True
  177. CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert.pem")
  178. class DummyPOP3_SSLHandler(DummyPOP3Handler):
  179. def __init__(self, conn):
  180. asynchat.async_chat.__init__(self, conn)
  181. self.socket = ssl.wrap_socket(self.socket, certfile=CERTFILE,
  182. server_side=True,
  183. do_handshake_on_connect=False)
  184. # Must try handshake before calling push()
  185. self._ssl_accepting = True
  186. self._do_ssl_handshake()
  187. self.set_terminator("\r\n")
  188. self.in_buffer = []
  189. self.push('+OK dummy pop3 server ready.')
  190. def _do_ssl_handshake(self):
  191. try:
  192. self.socket.do_handshake()
  193. except ssl.SSLError, err:
  194. if err.args[0] in (ssl.SSL_ERROR_WANT_READ,
  195. ssl.SSL_ERROR_WANT_WRITE):
  196. return
  197. elif err.args[0] == ssl.SSL_ERROR_EOF:
  198. return self.handle_close()
  199. raise
  200. except socket.error, err:
  201. if err.args[0] == errno.ECONNABORTED:
  202. return self.handle_close()
  203. else:
  204. self._ssl_accepting = False
  205. def handle_read(self):
  206. if self._ssl_accepting:
  207. self._do_ssl_handshake()
  208. else:
  209. DummyPOP3Handler.handle_read(self)
  210. requires_ssl = skipUnless(SUPPORTS_SSL, 'SSL not supported')
  211. @requires_ssl
  212. class TestPOP3_SSLClass(TestPOP3Class):
  213. # repeat previous tests by using poplib.POP3_SSL
  214. def setUp(self):
  215. self.server = DummyPOP3Server((HOST, 0))
  216. self.server.handler = DummyPOP3_SSLHandler
  217. self.server.start()
  218. self.client = poplib.POP3_SSL(self.server.host, self.server.port)
  219. def test__all__(self):
  220. self.assertIn('POP3_SSL', poplib.__all__)
  221. class TestTimeouts(TestCase):
  222. def setUp(self):
  223. self.evt = threading.Event()
  224. self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  225. self.sock.settimeout(60) # Safety net. Look issue 11812
  226. self.port = test_support.bind_port(self.sock)
  227. self.thread = threading.Thread(target=self.server, args=(self.evt,self.sock))
  228. self.thread.setDaemon(True)
  229. self.thread.start()
  230. self.evt.wait()
  231. def tearDown(self):
  232. self.thread.join()
  233. del self.thread # Clear out any dangling Thread objects.
  234. def server(self, evt, serv):
  235. serv.listen(5)
  236. evt.set()
  237. try:
  238. conn, addr = serv.accept()
  239. conn.send("+ Hola mundo\n")
  240. conn.close()
  241. except socket.timeout:
  242. pass
  243. finally:
  244. serv.close()
  245. def testTimeoutDefault(self):
  246. self.assertIsNone(socket.getdefaulttimeout())
  247. socket.setdefaulttimeout(30)
  248. try:
  249. pop = poplib.POP3(HOST, self.port)
  250. finally:
  251. socket.setdefaulttimeout(None)
  252. self.assertEqual(pop.sock.gettimeout(), 30)
  253. pop.sock.close()
  254. def testTimeoutNone(self):
  255. self.assertIsNone(socket.getdefaulttimeout())
  256. socket.setdefaulttimeout(30)
  257. try:
  258. pop = poplib.POP3(HOST, self.port, timeout=None)
  259. finally:
  260. socket.setdefaulttimeout(None)
  261. self.assertIsNone(pop.sock.gettimeout())
  262. pop.sock.close()
  263. def testTimeoutValue(self):
  264. pop = poplib.POP3(HOST, self.port, timeout=30)
  265. self.assertEqual(pop.sock.gettimeout(), 30)
  266. pop.sock.close()
  267. def test_main():
  268. tests = [TestPOP3Class, TestTimeouts,
  269. TestPOP3_SSLClass]
  270. thread_info = test_support.threading_setup()
  271. try:
  272. test_support.run_unittest(*tests)
  273. finally:
  274. test_support.threading_cleanup(*thread_info)
  275. if __name__ == '__main__':
  276. test_main()