test_telnetlib.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461
  1. import socket
  2. import telnetlib
  3. import time
  4. import Queue
  5. import unittest
  6. from unittest import TestCase
  7. from test import test_support
  8. threading = test_support.import_module('threading')
  9. HOST = test_support.HOST
  10. EOF_sigil = object()
  11. def server(evt, serv, dataq=None):
  12. """ Open a tcp server in three steps
  13. 1) set evt to true to let the parent know we are ready
  14. 2) [optional] if is not False, write the list of data from dataq.get()
  15. to the socket.
  16. """
  17. serv.listen(5)
  18. evt.set()
  19. try:
  20. conn, addr = serv.accept()
  21. if dataq:
  22. data = ''
  23. new_data = dataq.get(True, 0.5)
  24. dataq.task_done()
  25. for item in new_data:
  26. if item == EOF_sigil:
  27. break
  28. if type(item) in [int, float]:
  29. time.sleep(item)
  30. else:
  31. data += item
  32. written = conn.send(data)
  33. data = data[written:]
  34. conn.close()
  35. except socket.timeout:
  36. pass
  37. finally:
  38. serv.close()
  39. class GeneralTests(TestCase):
  40. def setUp(self):
  41. self.evt = threading.Event()
  42. self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  43. self.sock.settimeout(60) # Safety net. Look issue 11812
  44. self.port = test_support.bind_port(self.sock)
  45. self.thread = threading.Thread(target=server, args=(self.evt,self.sock))
  46. self.thread.setDaemon(True)
  47. self.thread.start()
  48. self.evt.wait()
  49. def tearDown(self):
  50. self.thread.join()
  51. def testBasic(self):
  52. # connects
  53. telnet = telnetlib.Telnet(HOST, self.port)
  54. telnet.sock.close()
  55. def testTimeoutDefault(self):
  56. self.assertTrue(socket.getdefaulttimeout() is None)
  57. socket.setdefaulttimeout(30)
  58. try:
  59. telnet = telnetlib.Telnet(HOST, self.port)
  60. finally:
  61. socket.setdefaulttimeout(None)
  62. self.assertEqual(telnet.sock.gettimeout(), 30)
  63. telnet.sock.close()
  64. def testTimeoutNone(self):
  65. # None, having other default
  66. self.assertTrue(socket.getdefaulttimeout() is None)
  67. socket.setdefaulttimeout(30)
  68. try:
  69. telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
  70. finally:
  71. socket.setdefaulttimeout(None)
  72. self.assertTrue(telnet.sock.gettimeout() is None)
  73. telnet.sock.close()
  74. def testTimeoutValue(self):
  75. telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
  76. self.assertEqual(telnet.sock.gettimeout(), 30)
  77. telnet.sock.close()
  78. def testTimeoutOpen(self):
  79. telnet = telnetlib.Telnet()
  80. telnet.open(HOST, self.port, timeout=30)
  81. self.assertEqual(telnet.sock.gettimeout(), 30)
  82. telnet.sock.close()
  83. def testGetters(self):
  84. # Test telnet getter methods
  85. telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
  86. t_sock = telnet.sock
  87. self.assertEqual(telnet.get_socket(), t_sock)
  88. self.assertEqual(telnet.fileno(), t_sock.fileno())
  89. telnet.sock.close()
  90. def _read_setUp(self):
  91. self.evt = threading.Event()
  92. self.dataq = Queue.Queue()
  93. self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  94. self.sock.settimeout(10)
  95. self.port = test_support.bind_port(self.sock)
  96. self.thread = threading.Thread(target=server, args=(self.evt,self.sock, self.dataq))
  97. self.thread.start()
  98. self.evt.wait()
  99. def _read_tearDown(self):
  100. self.thread.join()
  101. class ReadTests(TestCase):
  102. setUp = _read_setUp
  103. tearDown = _read_tearDown
  104. # use a similar approach to testing timeouts as test_timeout.py
  105. # these will never pass 100% but make the fuzz big enough that it is rare
  106. block_long = 0.6
  107. block_short = 0.3
  108. def test_read_until_A(self):
  109. """
  110. read_until(expected, [timeout])
  111. Read until the expected string has been seen, or a timeout is
  112. hit (default is no timeout); may block.
  113. """
  114. want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
  115. self.dataq.put(want)
  116. telnet = telnetlib.Telnet(HOST, self.port)
  117. self.dataq.join()
  118. data = telnet.read_until('match')
  119. self.assertEqual(data, ''.join(want[:-2]))
  120. def test_read_until_B(self):
  121. # test the timeout - it does NOT raise socket.timeout
  122. want = ['hello', self.block_long, 'not seen', EOF_sigil]
  123. self.dataq.put(want)
  124. telnet = telnetlib.Telnet(HOST, self.port)
  125. self.dataq.join()
  126. data = telnet.read_until('not seen', self.block_short)
  127. self.assertEqual(data, want[0])
  128. self.assertEqual(telnet.read_all(), 'not seen')
  129. def test_read_until_with_poll(self):
  130. """Use select.poll() to implement telnet.read_until()."""
  131. want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
  132. self.dataq.put(want)
  133. telnet = telnetlib.Telnet(HOST, self.port)
  134. if not telnet._has_poll:
  135. raise unittest.SkipTest('select.poll() is required')
  136. telnet._has_poll = True
  137. self.dataq.join()
  138. data = telnet.read_until('match')
  139. self.assertEqual(data, ''.join(want[:-2]))
  140. def test_read_until_with_select(self):
  141. """Use select.select() to implement telnet.read_until()."""
  142. want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
  143. self.dataq.put(want)
  144. telnet = telnetlib.Telnet(HOST, self.port)
  145. telnet._has_poll = False
  146. self.dataq.join()
  147. data = telnet.read_until('match')
  148. self.assertEqual(data, ''.join(want[:-2]))
  149. def test_read_all_A(self):
  150. """
  151. read_all()
  152. Read all data until EOF; may block.
  153. """
  154. want = ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil]
  155. self.dataq.put(want)
  156. telnet = telnetlib.Telnet(HOST, self.port)
  157. self.dataq.join()
  158. data = telnet.read_all()
  159. self.assertEqual(data, ''.join(want[:-1]))
  160. def _test_blocking(self, func):
  161. self.dataq.put([self.block_long, EOF_sigil])
  162. self.dataq.join()
  163. start = time.time()
  164. data = func()
  165. self.assertTrue(self.block_short <= time.time() - start)
  166. def test_read_all_B(self):
  167. self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all)
  168. def test_read_all_C(self):
  169. self.dataq.put([EOF_sigil])
  170. telnet = telnetlib.Telnet(HOST, self.port)
  171. self.dataq.join()
  172. telnet.read_all()
  173. telnet.read_all() # shouldn't raise
  174. def test_read_some_A(self):
  175. """
  176. read_some()
  177. Read at least one byte or EOF; may block.
  178. """
  179. # test 'at least one byte'
  180. want = ['x' * 500, EOF_sigil]
  181. self.dataq.put(want)
  182. telnet = telnetlib.Telnet(HOST, self.port)
  183. self.dataq.join()
  184. data = telnet.read_all()
  185. self.assertTrue(len(data) >= 1)
  186. def test_read_some_B(self):
  187. # test EOF
  188. self.dataq.put([EOF_sigil])
  189. telnet = telnetlib.Telnet(HOST, self.port)
  190. self.dataq.join()
  191. self.assertEqual('', telnet.read_some())
  192. def test_read_some_C(self):
  193. self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some)
  194. def _test_read_any_eager_A(self, func_name):
  195. """
  196. read_very_eager()
  197. Read all data available already queued or on the socket,
  198. without blocking.
  199. """
  200. want = [self.block_long, 'x' * 100, 'y' * 100, EOF_sigil]
  201. expects = want[1] + want[2]
  202. self.dataq.put(want)
  203. telnet = telnetlib.Telnet(HOST, self.port)
  204. self.dataq.join()
  205. func = getattr(telnet, func_name)
  206. data = ''
  207. while True:
  208. try:
  209. data += func()
  210. self.assertTrue(expects.startswith(data))
  211. except EOFError:
  212. break
  213. self.assertEqual(expects, data)
  214. def _test_read_any_eager_B(self, func_name):
  215. # test EOF
  216. self.dataq.put([EOF_sigil])
  217. telnet = telnetlib.Telnet(HOST, self.port)
  218. self.dataq.join()
  219. time.sleep(self.block_short)
  220. func = getattr(telnet, func_name)
  221. self.assertRaises(EOFError, func)
  222. # read_eager and read_very_eager make the same guarantees
  223. # (they behave differently but we only test the guarantees)
  224. def test_read_very_eager_A(self):
  225. self._test_read_any_eager_A('read_very_eager')
  226. def test_read_very_eager_B(self):
  227. self._test_read_any_eager_B('read_very_eager')
  228. def test_read_eager_A(self):
  229. self._test_read_any_eager_A('read_eager')
  230. def test_read_eager_B(self):
  231. self._test_read_any_eager_B('read_eager')
  232. # NB -- we need to test the IAC block which is mentioned in the docstring
  233. # but not in the module docs
  234. def _test_read_any_lazy_B(self, func_name):
  235. self.dataq.put([EOF_sigil])
  236. telnet = telnetlib.Telnet(HOST, self.port)
  237. self.dataq.join()
  238. func = getattr(telnet, func_name)
  239. telnet.fill_rawq()
  240. self.assertRaises(EOFError, func)
  241. def test_read_lazy_A(self):
  242. want = ['x' * 100, EOF_sigil]
  243. self.dataq.put(want)
  244. telnet = telnetlib.Telnet(HOST, self.port)
  245. self.dataq.join()
  246. time.sleep(self.block_short)
  247. self.assertEqual('', telnet.read_lazy())
  248. data = ''
  249. while True:
  250. try:
  251. read_data = telnet.read_lazy()
  252. data += read_data
  253. if not read_data:
  254. telnet.fill_rawq()
  255. except EOFError:
  256. break
  257. self.assertTrue(want[0].startswith(data))
  258. self.assertEqual(data, want[0])
  259. def test_read_lazy_B(self):
  260. self._test_read_any_lazy_B('read_lazy')
  261. def test_read_very_lazy_A(self):
  262. want = ['x' * 100, EOF_sigil]
  263. self.dataq.put(want)
  264. telnet = telnetlib.Telnet(HOST, self.port)
  265. self.dataq.join()
  266. time.sleep(self.block_short)
  267. self.assertEqual('', telnet.read_very_lazy())
  268. data = ''
  269. while True:
  270. try:
  271. read_data = telnet.read_very_lazy()
  272. except EOFError:
  273. break
  274. data += read_data
  275. if not read_data:
  276. telnet.fill_rawq()
  277. self.assertEqual('', telnet.cookedq)
  278. telnet.process_rawq()
  279. self.assertTrue(want[0].startswith(data))
  280. self.assertEqual(data, want[0])
  281. def test_read_very_lazy_B(self):
  282. self._test_read_any_lazy_B('read_very_lazy')
  283. class nego_collector(object):
  284. def __init__(self, sb_getter=None):
  285. self.seen = ''
  286. self.sb_getter = sb_getter
  287. self.sb_seen = ''
  288. def do_nego(self, sock, cmd, opt):
  289. self.seen += cmd + opt
  290. if cmd == tl.SE and self.sb_getter:
  291. sb_data = self.sb_getter()
  292. self.sb_seen += sb_data
  293. tl = telnetlib
  294. class OptionTests(TestCase):
  295. setUp = _read_setUp
  296. tearDown = _read_tearDown
  297. # RFC 854 commands
  298. cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP]
  299. def _test_command(self, data):
  300. """ helper for testing IAC + cmd """
  301. self.setUp()
  302. self.dataq.put(data)
  303. telnet = telnetlib.Telnet(HOST, self.port)
  304. self.dataq.join()
  305. nego = nego_collector()
  306. telnet.set_option_negotiation_callback(nego.do_nego)
  307. txt = telnet.read_all()
  308. cmd = nego.seen
  309. self.assertTrue(len(cmd) > 0) # we expect at least one command
  310. self.assertIn(cmd[0], self.cmds)
  311. self.assertEqual(cmd[1], tl.NOOPT)
  312. self.assertEqual(len(''.join(data[:-1])), len(txt + cmd))
  313. nego.sb_getter = None # break the nego => telnet cycle
  314. self.tearDown()
  315. def test_IAC_commands(self):
  316. # reset our setup
  317. self.dataq.put([EOF_sigil])
  318. telnet = telnetlib.Telnet(HOST, self.port)
  319. self.dataq.join()
  320. self.tearDown()
  321. for cmd in self.cmds:
  322. self._test_command(['x' * 100, tl.IAC + cmd, 'y'*100, EOF_sigil])
  323. self._test_command(['x' * 10, tl.IAC + cmd, 'y'*10, EOF_sigil])
  324. self._test_command([tl.IAC + cmd, EOF_sigil])
  325. # all at once
  326. self._test_command([tl.IAC + cmd for (cmd) in self.cmds] + [EOF_sigil])
  327. self.assertEqual('', telnet.read_sb_data())
  328. def test_SB_commands(self):
  329. # RFC 855, subnegotiations portion
  330. send = [tl.IAC + tl.SB + tl.IAC + tl.SE,
  331. tl.IAC + tl.SB + tl.IAC + tl.IAC + tl.IAC + tl.SE,
  332. tl.IAC + tl.SB + tl.IAC + tl.IAC + 'aa' + tl.IAC + tl.SE,
  333. tl.IAC + tl.SB + 'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE,
  334. tl.IAC + tl.SB + 'cc' + tl.IAC + tl.IAC + 'dd' + tl.IAC + tl.SE,
  335. EOF_sigil,
  336. ]
  337. self.dataq.put(send)
  338. telnet = telnetlib.Telnet(HOST, self.port)
  339. self.dataq.join()
  340. nego = nego_collector(telnet.read_sb_data)
  341. telnet.set_option_negotiation_callback(nego.do_nego)
  342. txt = telnet.read_all()
  343. self.assertEqual(txt, '')
  344. want_sb_data = tl.IAC + tl.IAC + 'aabb' + tl.IAC + 'cc' + tl.IAC + 'dd'
  345. self.assertEqual(nego.sb_seen, want_sb_data)
  346. self.assertEqual('', telnet.read_sb_data())
  347. nego.sb_getter = None # break the nego => telnet cycle
  348. class ExpectTests(TestCase):
  349. def setUp(self):
  350. self.evt = threading.Event()
  351. self.dataq = Queue.Queue()
  352. self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  353. self.sock.settimeout(10)
  354. self.port = test_support.bind_port(self.sock)
  355. self.thread = threading.Thread(target=server, args=(self.evt,self.sock,
  356. self.dataq))
  357. self.thread.start()
  358. self.evt.wait()
  359. def tearDown(self):
  360. self.thread.join()
  361. # use a similar approach to testing timeouts as test_timeout.py
  362. # these will never pass 100% but make the fuzz big enough that it is rare
  363. block_long = 0.6
  364. block_short = 0.3
  365. def test_expect_A(self):
  366. """
  367. expect(expected, [timeout])
  368. Read until the expected string has been seen, or a timeout is
  369. hit (default is no timeout); may block.
  370. """
  371. want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
  372. self.dataq.put(want)
  373. telnet = telnetlib.Telnet(HOST, self.port)
  374. self.dataq.join()
  375. (_,_,data) = telnet.expect(['match'])
  376. self.assertEqual(data, ''.join(want[:-2]))
  377. def test_expect_B(self):
  378. # test the timeout - it does NOT raise socket.timeout
  379. want = ['hello', self.block_long, 'not seen', EOF_sigil]
  380. self.dataq.put(want)
  381. telnet = telnetlib.Telnet(HOST, self.port)
  382. self.dataq.join()
  383. (_,_,data) = telnet.expect(['not seen'], self.block_short)
  384. self.assertEqual(data, want[0])
  385. self.assertEqual(telnet.read_all(), 'not seen')
  386. def test_expect_with_poll(self):
  387. """Use select.poll() to implement telnet.expect()."""
  388. want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
  389. self.dataq.put(want)
  390. telnet = telnetlib.Telnet(HOST, self.port)
  391. if not telnet._has_poll:
  392. raise unittest.SkipTest('select.poll() is required')
  393. telnet._has_poll = True
  394. self.dataq.join()
  395. (_,_,data) = telnet.expect(['match'])
  396. self.assertEqual(data, ''.join(want[:-2]))
  397. def test_expect_with_select(self):
  398. """Use select.select() to implement telnet.expect()."""
  399. want = ['x' * 10, 'match', 'y' * 10, EOF_sigil]
  400. self.dataq.put(want)
  401. telnet = telnetlib.Telnet(HOST, self.port)
  402. telnet._has_poll = False
  403. self.dataq.join()
  404. (_,_,data) = telnet.expect(['match'])
  405. self.assertEqual(data, ''.join(want[:-2]))
  406. def test_main(verbose=None):
  407. test_support.run_unittest(GeneralTests, ReadTests, OptionTests,
  408. ExpectTests)
  409. if __name__ == '__main__':
  410. test_main()