123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325 |
- # Some simple queue module tests, plus some failure conditions
- # to ensure the Queue locks remain stable.
- import Queue
- import time
- import unittest
- from test import test_support
- threading = test_support.import_module('threading')
- QUEUE_SIZE = 5
- # A thread to run a function that unclogs a blocked Queue.
- class _TriggerThread(threading.Thread):
- def __init__(self, fn, args):
- self.fn = fn
- self.args = args
- self.startedEvent = threading.Event()
- threading.Thread.__init__(self)
- def run(self):
- # The sleep isn't necessary, but is intended to give the blocking
- # function in the main thread a chance at actually blocking before
- # we unclog it. But if the sleep is longer than the timeout-based
- # tests wait in their blocking functions, those tests will fail.
- # So we give them much longer timeout values compared to the
- # sleep here (I aimed at 10 seconds for blocking functions --
- # they should never actually wait that long - they should make
- # progress as soon as we call self.fn()).
- time.sleep(0.1)
- self.startedEvent.set()
- self.fn(*self.args)
class BlockingTestMixin:
    """Helpers that run a blocking call while another thread releases it.

    The blocking function runs in the calling thread; the trigger function
    runs in a _TriggerThread. Caution: block_func must guarantee to block
    until trigger_func is called, and trigger_func must guarantee to change
    the queue state so that block_func can make enough progress to return.
    A block_func that raises regardless of whether trigger_func was called
    leads to timing-dependent sporadic failures, so do_blocking_test()
    requires an unexceptional block_func. When block_func is supposed to
    raise, use do_exceptional_blocking_test() instead.
    """

    def tearDown(self):
        # Drop the reference to the last trigger thread.
        self.t = None

    def _join_trigger_thread(self, thread, trigger_func):
        # Wait up to 10 seconds for the trigger thread to finish and fail
        # the test if it is still running afterwards.
        thread.join(10)
        if thread.is_alive():
            self.fail("trigger function '%r' appeared to not return" %
                      trigger_func)

    def do_blocking_test(self, block_func, block_args, trigger_func, trigger_args):
        """Run block_func(*block_args) and return its result.

        trigger_func(*trigger_args) is called from a separate thread to
        release the blocking call. The test fails if block_func returned
        before the trigger thread reached its call, or if the trigger
        thread does not finish.
        """
        trigger = _TriggerThread(trigger_func, trigger_args)
        self.t = trigger
        trigger.start()
        self.result = block_func(*block_args)
        # Returning before the trigger thread made its call means the
        # function never really blocked.
        if not trigger.startedEvent.is_set():
            self.fail("blocking function '%r' appeared not to block" %
                      block_func)
        self._join_trigger_thread(trigger, trigger_func)
        return self.result

    def do_exceptional_blocking_test(self, block_func, block_args, trigger_func,
                                     trigger_args, expected_exception_class):
        """Like do_blocking_test(), for a block_func that is meant to raise.

        An exception of expected_exception_class is re-raised to the caller;
        the test fails if block_func returns normally. The trigger thread is
        always joined before this method exits.
        """
        trigger = _TriggerThread(trigger_func, trigger_args)
        self.t = trigger
        trigger.start()
        try:
            block_func(*block_args)
        except expected_exception_class:
            raise
        else:
            self.fail("expected exception of kind %r" %
                      expected_exception_class)
        finally:
            self._join_trigger_thread(trigger, trigger_func)
            if not trigger.startedEvent.is_set():
                self.fail("trigger thread ended but event never set")
class BaseQueueTest(BlockingTestMixin):
    """Tests shared by every queue flavour.

    Concrete subclasses set ``type2test`` to the queue class under test and
    also derive from unittest.TestCase.
    """

    def setUp(self):
        # Running total built up by worker(); cumlock guards updates to it.
        self.cum = 0
        self.cumlock = threading.Lock()

    def simple_queue_test(self, q):
        """Check ordering, full/empty detection and blocking put/get on q.

        q must be empty on entry (RuntimeError is raised otherwise) and is
        expected to become full after QUEUE_SIZE items. It is left holding
        the two 'empty' items put by the final blocking-get checks only
        until they are consumed by those same checks, i.e. empty again.
        """
        if not q.empty():
            raise RuntimeError("Call this function with an empty queue")
        # I guess we better check things actually queue correctly a little :)
        q.put(111)
        q.put(333)
        q.put(222)
        # Expected retrieval order, keyed by the queue's class name.
        target_order = dict(Queue=[111, 333, 222],
                            LifoQueue=[222, 333, 111],
                            PriorityQueue=[111, 222, 333])
        actual_order = [q.get(), q.get(), q.get()]
        self.assertEqual(actual_order, target_order[q.__class__.__name__],
                         "Didn't seem to queue the correct data!")
        for i in range(QUEUE_SIZE - 1):
            q.put(i)
            self.assertTrue(not q.empty(), "Queue should not be empty")
        self.assertTrue(not q.full(), "Queue should not be full")
        last = 2 * QUEUE_SIZE
        full = 3 * 2 * QUEUE_SIZE
        q.put(last)
        self.assertTrue(q.full(), "Queue should be full")
        try:
            q.put(full, block=0)
        except Queue.Full:
            pass
        else:
            self.fail("Didn't appear to block with a full queue")
        try:
            q.put(full, timeout=0.01)
        except Queue.Full:
            pass
        else:
            self.fail("Didn't appear to time-out with a full queue")
        # Test a blocking put
        self.do_blocking_test(q.put, (full,), q.get, ())
        self.do_blocking_test(q.put, (full, True, 10), q.get, ())
        # Empty it
        for i in range(QUEUE_SIZE):
            q.get()
        self.assertTrue(q.empty(), "Queue should be empty")
        try:
            q.get(block=0)
        except Queue.Empty:
            pass
        else:
            self.fail("Didn't appear to block with an empty queue")
        try:
            q.get(timeout=0.01)
        except Queue.Empty:
            pass
        else:
            self.fail("Didn't appear to time-out with an empty queue")
        # Test a blocking get
        self.do_blocking_test(q.get, (), q.put, ('empty',))
        self.do_blocking_test(q.get, (True, 10), q.put, ('empty',))

    def worker(self, q):
        """Add every item taken from q to self.cum until None is received.

        task_done() is called once per item, including the None sentinel.
        """
        while True:
            x = q.get()
            if x is None:
                q.task_done()
                return
            with self.cumlock:
                self.cum += x
            q.task_done()

    def queue_join_test(self, q):
        """Check that q.join() blocks until two workers processed all items."""
        self.cum = 0
        workers = [threading.Thread(target=self.worker, args=(q,))
                   for _ in range(2)]
        for thread in workers:
            thread.start()
        for i in range(100):
            q.put(i)
        q.join()
        self.assertEqual(self.cum, sum(range(100)),
                         "q.join() did not block until all tasks were done")
        for _ in workers:
            q.put(None)         # instruct the threads to close
        q.join()                # verify that you can join twice
        # Wait for the workers to exit so no thread outlives the test.
        for thread in workers:
            thread.join()

    def test_queue_task_done(self):
        # Test to make sure a queue task completed successfully.
        q = self.type2test()
        try:
            q.task_done()
        except ValueError:
            pass
        else:
            self.fail("Did not detect task count going negative")

    def test_queue_join(self):
        # Test that a queue join()s successfully, and before anything else
        # (done twice for insurance).
        q = self.type2test()
        self.queue_join_test(q)
        self.queue_join_test(q)
        try:
            q.task_done()
        except ValueError:
            pass
        else:
            self.fail("Did not detect task count going negative")

    def test_simple_queue(self):
        # Do it a couple of times on the same queue.
        # Done twice to make sure works with same instance reused.
        q = self.type2test(QUEUE_SIZE)
        self.simple_queue_test(q)
        self.simple_queue_test(q)
class QueueTest(BaseQueueTest, unittest.TestCase):
    # Run the shared queue tests against Queue.Queue.
    type2test = Queue.Queue
class LifoQueueTest(BaseQueueTest, unittest.TestCase):
    # Run the shared queue tests against Queue.LifoQueue.
    type2test = Queue.LifoQueue
class PriorityQueueTest(BaseQueueTest, unittest.TestCase):
    # Run the shared queue tests against Queue.PriorityQueue.
    type2test = Queue.PriorityQueue
class FailingQueueException(Exception):
    """Raised by FailingQueue when it has been told to fail."""


class FailingQueue(Queue.Queue):
    """A Queue subclass that can provoke failure at a moment's notice :)

    Setting fail_next_put (or fail_next_get) to True makes the next _put()
    (or _get()) raise FailingQueueException. The flag is cleared before the
    exception is raised, so only that one operation fails.
    """

    def __init__(self, *args):
        # The flags are set before the base initialiser runs, as in the
        # original ordering.
        self.fail_next_put = False
        self.fail_next_get = False
        Queue.Queue.__init__(self, *args)

    def _put(self, item):
        if self.fail_next_put:
            self.fail_next_put = False
            raise FailingQueueException("You Lose")
        return Queue.Queue._put(self, item)

    def _get(self):
        if self.fail_next_get:
            self.fail_next_get = False
            raise FailingQueueException("You Lose")
        return Queue.Queue._get(self)
class FailingQueueTest(BlockingTestMixin, unittest.TestCase):
    # Checks that a queue stays usable after its _put()/_get() hooks raise.

    def failing_queue_test(self, q):
        """Provoke put/get failures on q and check it is left undamaged.

        q must be an empty FailingQueue-style queue (it needs the
        fail_next_put / fail_next_get flags) that becomes full after
        QUEUE_SIZE items. Raises RuntimeError when q is not empty on entry.
        """
        if not q.empty():
            raise RuntimeError("Call this function with an empty queue")
        for i in range(QUEUE_SIZE - 1):
            q.put(i)
        # Test a failing non-blocking put.
        q.fail_next_put = True
        try:
            q.put("oops", block=0)
        except FailingQueueException:
            pass
        else:
            self.fail("The queue didn't fail when it should have")
        q.fail_next_put = True
        try:
            q.put("oops", timeout=0.1)
        except FailingQueueException:
            pass
        else:
            self.fail("The queue didn't fail when it should have")
        q.put("last")
        self.assertTrue(q.full(), "Queue should be full")
        # Test a failing blocking put. The put is expected to raise, so the
        # exceptional helper is used; it also joins the trigger thread
        # before the exception reaches us.
        q.fail_next_put = True
        try:
            self.do_exceptional_blocking_test(q.put, ("full",), q.get, (),
                                              FailingQueueException)
        except FailingQueueException:
            pass
        else:
            self.fail("The queue didn't fail when it should have")
        # Check the Queue isn't damaged.
        # put failed, but get succeeded - re-add
        q.put("last")
        # Test a failing timeout put
        q.fail_next_put = True
        try:
            self.do_exceptional_blocking_test(q.put, ("full", True, 10), q.get, (),
                                              FailingQueueException)
        except FailingQueueException:
            pass
        else:
            self.fail("The queue didn't fail when it should have")
        # Check the Queue isn't damaged.
        # put failed, but get succeeded - re-add
        q.put("last")
        self.assertTrue(q.full(), "Queue should be full")
        q.get()
        self.assertTrue(not q.full(), "Queue should not be full")
        q.put("last")
        self.assertTrue(q.full(), "Queue should be full")
        # Test a blocking put
        self.do_blocking_test(q.put, ("full",), q.get, ())
        # Empty it
        for i in range(QUEUE_SIZE):
            q.get()
        self.assertTrue(q.empty(), "Queue should be empty")
        q.put("first")
        q.fail_next_get = True
        try:
            q.get()
        except FailingQueueException:
            pass
        else:
            self.fail("The queue didn't fail when it should have")
        self.assertTrue(not q.empty(), "Queue should not be empty")
        q.fail_next_get = True
        try:
            q.get(timeout=0.1)
        except FailingQueueException:
            pass
        else:
            self.fail("The queue didn't fail when it should have")
        self.assertTrue(not q.empty(), "Queue should not be empty")
        q.get()
        self.assertTrue(q.empty(), "Queue should be empty")
        q.fail_next_get = True
        try:
            self.do_exceptional_blocking_test(q.get, (), q.put, ('empty',),
                                              FailingQueueException)
        except FailingQueueException:
            pass
        else:
            self.fail("The queue didn't fail when it should have")
        # put succeeded, but get failed.
        self.assertTrue(not q.empty(), "Queue should not be empty")
        q.get()
        self.assertTrue(q.empty(), "Queue should be empty")

    def test_failing_queue(self):
        # Test to make sure a queue is functioning correctly.
        # Done twice to the same instance.
        q = FailingQueue(QUEUE_SIZE)
        self.failing_queue_test(q)
        self.failing_queue_test(q)
def test_main():
    """Run every test case class defined in this module."""
    test_classes = (
        QueueTest,
        LifoQueueTest,
        PriorityQueueTest,
        FailingQueueTest,
    )
    test_support.run_unittest(*test_classes)


# Allow the module to be executed directly as a script.
if __name__ == "__main__":
    test_main()
|