123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213 |
- # Test case for the os.poll() function
- import os
- import random
- import select
- try:
- import threading
- except ImportError:
- threading = None
- import time
- import unittest
- from test.test_support import TESTFN, run_unittest, reap_threads, cpython_only
- try:
- select.poll
- except AttributeError:
- raise unittest.SkipTest, "select.poll not defined -- skipping test_poll"
- def find_ready_matching(ready, flag):
- match = []
- for fd, mode in ready:
- if mode & flag:
- match.append(fd)
- return match
- class PollTests(unittest.TestCase):
- def test_poll1(self):
- # Basic functional test of poll object
- # Create a bunch of pipe and test that poll works with them.
- p = select.poll()
- NUM_PIPES = 12
- MSG = " This is a test."
- MSG_LEN = len(MSG)
- readers = []
- writers = []
- r2w = {}
- w2r = {}
- for i in range(NUM_PIPES):
- rd, wr = os.pipe()
- p.register(rd)
- p.modify(rd, select.POLLIN)
- p.register(wr, select.POLLOUT)
- readers.append(rd)
- writers.append(wr)
- r2w[rd] = wr
- w2r[wr] = rd
- bufs = []
- while writers:
- ready = p.poll()
- ready_writers = find_ready_matching(ready, select.POLLOUT)
- if not ready_writers:
- raise RuntimeError, "no pipes ready for writing"
- wr = random.choice(ready_writers)
- os.write(wr, MSG)
- ready = p.poll()
- ready_readers = find_ready_matching(ready, select.POLLIN)
- if not ready_readers:
- raise RuntimeError, "no pipes ready for reading"
- rd = random.choice(ready_readers)
- buf = os.read(rd, MSG_LEN)
- self.assertEqual(len(buf), MSG_LEN)
- bufs.append(buf)
- os.close(r2w[rd]) ; os.close( rd )
- p.unregister( r2w[rd] )
- p.unregister( rd )
- writers.remove(r2w[rd])
- self.assertEqual(bufs, [MSG] * NUM_PIPES)
- def poll_unit_tests(self):
- # returns NVAL for invalid file descriptor
- FD = 42
- try:
- os.close(FD)
- except OSError:
- pass
- p = select.poll()
- p.register(FD)
- r = p.poll()
- self.assertEqual(r[0], (FD, select.POLLNVAL))
- f = open(TESTFN, 'w')
- fd = f.fileno()
- p = select.poll()
- p.register(f)
- r = p.poll()
- self.assertEqual(r[0][0], fd)
- f.close()
- r = p.poll()
- self.assertEqual(r[0], (fd, select.POLLNVAL))
- os.unlink(TESTFN)
- # type error for invalid arguments
- p = select.poll()
- self.assertRaises(TypeError, p.register, p)
- self.assertRaises(TypeError, p.unregister, p)
- # can't unregister non-existent object
- p = select.poll()
- self.assertRaises(KeyError, p.unregister, 3)
- # Test error cases
- pollster = select.poll()
- class Nope:
- pass
- class Almost:
- def fileno(self):
- return 'fileno'
- self.assertRaises(TypeError, pollster.register, Nope(), 0)
- self.assertRaises(TypeError, pollster.register, Almost(), 0)
- # Another test case for poll(). This is copied from the test case for
- # select(), modified to use poll() instead.
- def test_poll2(self):
- cmd = 'for i in 0 1 2 3 4 5 6 7 8 9; do echo testing...; sleep 1; done'
- p = os.popen(cmd, 'r')
- pollster = select.poll()
- pollster.register( p, select.POLLIN )
- for tout in (0, 1000, 2000, 4000, 8000, 16000) + (-1,)*10:
- fdlist = pollster.poll(tout)
- if (fdlist == []):
- continue
- fd, flags = fdlist[0]
- if flags & select.POLLHUP:
- line = p.readline()
- if line != "":
- self.fail('error: pipe seems to be closed, but still returns data')
- continue
- elif flags & select.POLLIN:
- line = p.readline()
- if not line:
- break
- continue
- else:
- self.fail('Unexpected return value from select.poll: %s' % fdlist)
- p.close()
- def test_poll3(self):
- # test int overflow
- pollster = select.poll()
- pollster.register(1)
- self.assertRaises(OverflowError, pollster.poll, 1L << 64)
- x = 2 + 3
- if x != 5:
- self.fail('Overflow must have occurred')
- # Issues #15989, #17919
- self.assertRaises(OverflowError, pollster.register, 0, -1)
- self.assertRaises(OverflowError, pollster.register, 0, 1 << 64)
- self.assertRaises(OverflowError, pollster.modify, 1, -1)
- self.assertRaises(OverflowError, pollster.modify, 1, 1 << 64)
- @cpython_only
- def test_poll_c_limits(self):
- from _testcapi import USHRT_MAX, INT_MAX, UINT_MAX
- pollster = select.poll()
- pollster.register(1)
- # Issues #15989, #17919
- self.assertRaises(OverflowError, pollster.register, 0, USHRT_MAX + 1)
- self.assertRaises(OverflowError, pollster.modify, 1, USHRT_MAX + 1)
- self.assertRaises(OverflowError, pollster.poll, INT_MAX + 1)
- self.assertRaises(OverflowError, pollster.poll, UINT_MAX + 1)
- @unittest.skipUnless(threading, 'Threading required for this test.')
- @reap_threads
- def test_threaded_poll(self):
- r, w = os.pipe()
- self.addCleanup(os.close, r)
- self.addCleanup(os.close, w)
- rfds = []
- for i in range(10):
- fd = os.dup(r)
- self.addCleanup(os.close, fd)
- rfds.append(fd)
- pollster = select.poll()
- for fd in rfds:
- pollster.register(fd, select.POLLIN)
- t = threading.Thread(target=pollster.poll)
- t.start()
- try:
- time.sleep(0.5)
- # trigger ufds array reallocation
- for fd in rfds:
- pollster.unregister(fd)
- pollster.register(w, select.POLLOUT)
- self.assertRaises(RuntimeError, pollster.poll)
- finally:
- # and make the call to poll() from the thread return
- os.write(w, b'spam')
- t.join()
- def test_main():
- run_unittest(PollTests)
- if __name__ == '__main__':
- test_main()
|