test_poll.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. # Test case for the os.poll() function
  2. import os
  3. import random
  4. import select
  5. try:
  6. import threading
  7. except ImportError:
  8. threading = None
  9. import time
  10. import unittest
  11. from test.test_support import TESTFN, run_unittest, reap_threads, cpython_only
  12. try:
  13. select.poll
  14. except AttributeError:
  15. raise unittest.SkipTest, "select.poll not defined -- skipping test_poll"
  16. def find_ready_matching(ready, flag):
  17. match = []
  18. for fd, mode in ready:
  19. if mode & flag:
  20. match.append(fd)
  21. return match
  22. class PollTests(unittest.TestCase):
  23. def test_poll1(self):
  24. # Basic functional test of poll object
  25. # Create a bunch of pipe and test that poll works with them.
  26. p = select.poll()
  27. NUM_PIPES = 12
  28. MSG = " This is a test."
  29. MSG_LEN = len(MSG)
  30. readers = []
  31. writers = []
  32. r2w = {}
  33. w2r = {}
  34. for i in range(NUM_PIPES):
  35. rd, wr = os.pipe()
  36. p.register(rd)
  37. p.modify(rd, select.POLLIN)
  38. p.register(wr, select.POLLOUT)
  39. readers.append(rd)
  40. writers.append(wr)
  41. r2w[rd] = wr
  42. w2r[wr] = rd
  43. bufs = []
  44. while writers:
  45. ready = p.poll()
  46. ready_writers = find_ready_matching(ready, select.POLLOUT)
  47. if not ready_writers:
  48. raise RuntimeError, "no pipes ready for writing"
  49. wr = random.choice(ready_writers)
  50. os.write(wr, MSG)
  51. ready = p.poll()
  52. ready_readers = find_ready_matching(ready, select.POLLIN)
  53. if not ready_readers:
  54. raise RuntimeError, "no pipes ready for reading"
  55. rd = random.choice(ready_readers)
  56. buf = os.read(rd, MSG_LEN)
  57. self.assertEqual(len(buf), MSG_LEN)
  58. bufs.append(buf)
  59. os.close(r2w[rd]) ; os.close( rd )
  60. p.unregister( r2w[rd] )
  61. p.unregister( rd )
  62. writers.remove(r2w[rd])
  63. self.assertEqual(bufs, [MSG] * NUM_PIPES)
  64. def poll_unit_tests(self):
  65. # returns NVAL for invalid file descriptor
  66. FD = 42
  67. try:
  68. os.close(FD)
  69. except OSError:
  70. pass
  71. p = select.poll()
  72. p.register(FD)
  73. r = p.poll()
  74. self.assertEqual(r[0], (FD, select.POLLNVAL))
  75. f = open(TESTFN, 'w')
  76. fd = f.fileno()
  77. p = select.poll()
  78. p.register(f)
  79. r = p.poll()
  80. self.assertEqual(r[0][0], fd)
  81. f.close()
  82. r = p.poll()
  83. self.assertEqual(r[0], (fd, select.POLLNVAL))
  84. os.unlink(TESTFN)
  85. # type error for invalid arguments
  86. p = select.poll()
  87. self.assertRaises(TypeError, p.register, p)
  88. self.assertRaises(TypeError, p.unregister, p)
  89. # can't unregister non-existent object
  90. p = select.poll()
  91. self.assertRaises(KeyError, p.unregister, 3)
  92. # Test error cases
  93. pollster = select.poll()
  94. class Nope:
  95. pass
  96. class Almost:
  97. def fileno(self):
  98. return 'fileno'
  99. self.assertRaises(TypeError, pollster.register, Nope(), 0)
  100. self.assertRaises(TypeError, pollster.register, Almost(), 0)
  101. # Another test case for poll(). This is copied from the test case for
  102. # select(), modified to use poll() instead.
  103. def test_poll2(self):
  104. cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
  105. p = os.popen(cmd, 'r')
  106. pollster = select.poll()
  107. pollster.register( p, select.POLLIN )
  108. for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
  109. fdlist = pollster.poll(tout)
  110. if (fdlist == []):
  111. continue
  112. fd, flags = fdlist[0]
  113. if flags & select.POLLHUP:
  114. line = p.readline()
  115. if line != "":
  116. self.fail('error: pipe seems to be closed, but still returns data')
  117. continue
  118. elif flags & select.POLLIN:
  119. line = p.readline()
  120. if not line:
  121. break
  122. continue
  123. else:
  124. self.fail('Unexpected return value from select.poll: %s' % fdlist)
  125. p.close()
  126. def test_poll3(self):
  127. # test int overflow
  128. pollster = select.poll()
  129. pollster.register(1)
  130. self.assertRaises(OverflowError, pollster.poll, 1L << 64)
  131. x = 2 + 3
  132. if x != 5:
  133. self.fail('Overflow must have occurred')
  134. # Issues #15989, #17919
  135. self.assertRaises(OverflowError, pollster.register, 0, -1)
  136. self.assertRaises(OverflowError, pollster.register, 0, 1 << 64)
  137. self.assertRaises(OverflowError, pollster.modify, 1, -1)
  138. self.assertRaises(OverflowError, pollster.modify, 1, 1 << 64)
  139. @cpython_only
  140. def test_poll_c_limits(self):
  141. from _testcapi import USHRT_MAX, INT_MAX, UINT_MAX
  142. pollster = select.poll()
  143. pollster.register(1)
  144. # Issues #15989, #17919
  145. self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1)
  146. self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1)
  147. self.assertRaises(OverflowError, pollster.poll, INT_MAX + 1)
  148. self.assertRaises(OverflowError, pollster.poll, UINT_MAX + 1)
  149. @unittest.skipUnless(threading, 'Threading required for this test.')
  150. @reap_threads
  151. def test_threaded_poll(self):
  152. r, w = os.pipe()
  153. self.addCleanup(os.close, r)
  154. self.addCleanup(os.close, w)
  155. rfds = []
  156. for i in range(10):
  157. fd = os.dup(r)
  158. self.addCleanup(os.close, fd)
  159. rfds.append(fd)
  160. pollster = select.poll()
  161. for fd in rfds:
  162. pollster.register(fd, select.POLLIN)
  163. t = threading.Thread(target=pollster.poll)
  164. t.start()
  165. try:
  166. time.sleep(0.5)
  167. # trigger ufds array reallocation
  168. for fd in rfds:
  169. pollster.unregister(fd)
  170. pollster.register(w, select.POLLOUT)
  171. self.assertRaises(RuntimeError, pollster.poll)
  172. finally:
  173. # and make the call to poll() from the thread return
  174. os.write(w, b'spam')
  175. t.join()
  176. def test_main():
  177. run_unittest(PollTests)
  178. if __name__ == '__main__':
  179. test_main()