test_asynchat.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. # test asynchat
  2. import errno
  3. import asyncore
  4. import asynchat
  5. import socket
  6. import time
  7. import unittest
  8. import sys
  9. from test import test_support
  10. try:
  11. import threading
  12. except ImportError:
  13. threading = None
  14. HOST = test_support.HOST
  15. SERVER_QUIT = 'QUIT\n'
  16. if threading:
  17. class echo_server(threading.Thread):
  18. # parameter to determine the number of bytes passed back to the
  19. # client each send
  20. chunk_size = 1
  21. def __init__(self, event):
  22. threading.Thread.__init__(self)
  23. self.event = event
  24. self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  25. self.port = test_support.bind_port(self.sock)
  26. # This will be set if the client wants us to wait before echoing data
  27. # back.
  28. self.start_resend_event = None
  29. def run(self):
  30. self.sock.listen(1)
  31. self.event.set()
  32. conn, client = self.sock.accept()
  33. self.buffer = ""
  34. # collect data until quit message is seen
  35. while SERVER_QUIT not in self.buffer:
  36. data = conn.recv(1)
  37. if not data:
  38. break
  39. self.buffer = self.buffer + data
  40. # remove the SERVER_QUIT message
  41. self.buffer = self.buffer.replace(SERVER_QUIT, '')
  42. if self.start_resend_event:
  43. self.start_resend_event.wait()
  44. # re-send entire set of collected data
  45. try:
  46. # this may fail on some tests, such as test_close_when_done, since
  47. # the client closes the channel when it's done sending
  48. while self.buffer:
  49. n = conn.send(self.buffer[:self.chunk_size])
  50. time.sleep(0.001)
  51. self.buffer = self.buffer[n:]
  52. except:
  53. pass
  54. conn.close()
  55. self.sock.close()
  56. class echo_client(asynchat.async_chat):
  57. def __init__(self, terminator, server_port):
  58. asynchat.async_chat.__init__(self)
  59. self.contents = []
  60. self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
  61. self.connect((HOST, server_port))
  62. self.set_terminator(terminator)
  63. self.buffer = ''
  64. def handle_connect(self):
  65. pass
  66. if sys.platform == 'darwin':
  67. # select.poll returns a select.POLLHUP at the end of the tests
  68. # on darwin, so just ignore it
  69. def handle_expt(self):
  70. pass
  71. def collect_incoming_data(self, data):
  72. self.buffer += data
  73. def found_terminator(self):
  74. self.contents.append(self.buffer)
  75. self.buffer = ""
  76. def start_echo_server():
  77. event = threading.Event()
  78. s = echo_server(event)
  79. s.start()
  80. event.wait()
  81. event.clear()
  82. time.sleep(0.01) # Give server time to start accepting.
  83. return s, event
  84. @unittest.skipUnless(threading, 'Threading required for this test.')
  85. class TestAsynchat(unittest.TestCase):
  86. usepoll = False
  87. def setUp (self):
  88. self._threads = test_support.threading_setup()
  89. def tearDown (self):
  90. test_support.threading_cleanup(*self._threads)
  91. def line_terminator_check(self, term, server_chunk):
  92. event = threading.Event()
  93. s = echo_server(event)
  94. s.chunk_size = server_chunk
  95. s.start()
  96. event.wait()
  97. event.clear()
  98. time.sleep(0.01) # Give server time to start accepting.
  99. c = echo_client(term, s.port)
  100. c.push("hello ")
  101. c.push("world%s" % term)
  102. c.push("I'm not dead yet!%s" % term)
  103. c.push(SERVER_QUIT)
  104. asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
  105. s.join()
  106. self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
  107. # the line terminator tests below check receiving variously-sized
  108. # chunks back from the server in order to exercise all branches of
  109. # async_chat.handle_read
  110. def test_line_terminator1(self):
  111. # test one-character terminator
  112. for l in (1,2,3):
  113. self.line_terminator_check('\n', l)
  114. def test_line_terminator2(self):
  115. # test two-character terminator
  116. for l in (1,2,3):
  117. self.line_terminator_check('\r\n', l)
  118. def test_line_terminator3(self):
  119. # test three-character terminator
  120. for l in (1,2,3):
  121. self.line_terminator_check('qqq', l)
  122. def numeric_terminator_check(self, termlen):
  123. # Try reading a fixed number of bytes
  124. s, event = start_echo_server()
  125. c = echo_client(termlen, s.port)
  126. data = "hello world, I'm not dead yet!\n"
  127. c.push(data)
  128. c.push(SERVER_QUIT)
  129. asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
  130. s.join()
  131. self.assertEqual(c.contents, [data[:termlen]])
  132. def test_numeric_terminator1(self):
  133. # check that ints & longs both work (since type is
  134. # explicitly checked in async_chat.handle_read)
  135. self.numeric_terminator_check(1)
  136. self.numeric_terminator_check(1L)
  137. def test_numeric_terminator2(self):
  138. self.numeric_terminator_check(6L)
  139. def test_none_terminator(self):
  140. # Try reading a fixed number of bytes
  141. s, event = start_echo_server()
  142. c = echo_client(None, s.port)
  143. data = "hello world, I'm not dead yet!\n"
  144. c.push(data)
  145. c.push(SERVER_QUIT)
  146. asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
  147. s.join()
  148. self.assertEqual(c.contents, [])
  149. self.assertEqual(c.buffer, data)
  150. def test_simple_producer(self):
  151. s, event = start_echo_server()
  152. c = echo_client('\n', s.port)
  153. data = "hello world\nI'm not dead yet!\n"
  154. p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
  155. c.push_with_producer(p)
  156. asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
  157. s.join()
  158. self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
  159. def test_string_producer(self):
  160. s, event = start_echo_server()
  161. c = echo_client('\n', s.port)
  162. data = "hello world\nI'm not dead yet!\n"
  163. c.push_with_producer(data+SERVER_QUIT)
  164. asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
  165. s.join()
  166. self.assertEqual(c.contents, ["hello world", "I'm not dead yet!"])
  167. def test_empty_line(self):
  168. # checks that empty lines are handled correctly
  169. s, event = start_echo_server()
  170. c = echo_client('\n', s.port)
  171. c.push("hello world\n\nI'm not dead yet!\n")
  172. c.push(SERVER_QUIT)
  173. asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
  174. s.join()
  175. self.assertEqual(c.contents, ["hello world", "", "I'm not dead yet!"])
  176. def test_close_when_done(self):
  177. s, event = start_echo_server()
  178. s.start_resend_event = threading.Event()
  179. c = echo_client('\n', s.port)
  180. c.push("hello world\nI'm not dead yet!\n")
  181. c.push(SERVER_QUIT)
  182. c.close_when_done()
  183. asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
  184. # Only allow the server to start echoing data back to the client after
  185. # the client has closed its connection. This prevents a race condition
  186. # where the server echoes all of its data before we can check that it
  187. # got any down below.
  188. s.start_resend_event.set()
  189. s.join()
  190. self.assertEqual(c.contents, [])
  191. # the server might have been able to send a byte or two back, but this
  192. # at least checks that it received something and didn't just fail
  193. # (which could still result in the client not having received anything)
  194. self.assertTrue(len(s.buffer) > 0)
  195. class TestAsynchat_WithPoll(TestAsynchat):
  196. usepoll = True
  197. class TestAsynchatMocked(unittest.TestCase):
  198. def test_blockingioerror(self):
  199. # Issue #16133: handle_read() must ignore blocking I/O errors like
  200. # EAGAIN
  201. class fake_socket:
  202. def fileno(self):
  203. return 0
  204. def recv(self, size):
  205. raise socket.error(errno.EAGAIN, "EAGAIN")
  206. class MyChat(asynchat.async_chat):
  207. def handle_error(self):
  208. raise Exception("error")
  209. sock = fake_socket()
  210. dispatcher = MyChat()
  211. dispatcher.set_socket(sock)
  212. self.addCleanup(dispatcher.del_channel)
  213. # must not call handle_error()
  214. dispatcher.handle_read()
  215. class TestHelperFunctions(unittest.TestCase):
  216. def test_find_prefix_at_end(self):
  217. self.assertEqual(asynchat.find_prefix_at_end("qwerty\r", "\r\n"), 1)
  218. self.assertEqual(asynchat.find_prefix_at_end("qwertydkjf", "\r\n"), 0)
  219. class TestFifo(unittest.TestCase):
  220. def test_basic(self):
  221. f = asynchat.fifo()
  222. f.push(7)
  223. f.push('a')
  224. self.assertEqual(len(f), 2)
  225. self.assertEqual(f.first(), 7)
  226. self.assertEqual(f.pop(), (1, 7))
  227. self.assertEqual(len(f), 1)
  228. self.assertEqual(f.first(), 'a')
  229. self.assertEqual(f.is_empty(), False)
  230. self.assertEqual(f.pop(), (1, 'a'))
  231. self.assertEqual(len(f), 0)
  232. self.assertEqual(f.is_empty(), True)
  233. self.assertEqual(f.pop(), (0, None))
  234. def test_given_list(self):
  235. f = asynchat.fifo(['x', 17, 3])
  236. self.assertEqual(len(f), 3)
  237. self.assertEqual(f.pop(), (1, 'x'))
  238. self.assertEqual(f.pop(), (1, 17))
  239. self.assertEqual(f.pop(), (1, 3))
  240. self.assertEqual(f.pop(), (0, None))
  241. def test_main(verbose=None):
  242. test_support.run_unittest(TestAsynchat, TestAsynchat_WithPoll,
  243. TestAsynchatMocked,
  244. TestHelperFunctions, TestFifo)
  245. if __name__ == "__main__":
  246. test_main(verbose=True)