123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558 |
- """
- Various tests for synchronization primitives.
- """
- import sys
- import time
- from thread import start_new_thread, get_ident
- import threading
- import unittest
- from test import test_support as support
- def _wait():
- # A crude wait/yield function not relying on synchronization primitives.
- time.sleep(0.01)
- class Bunch(object):
- """
- A bunch of threads.
- """
- def __init__(self, f, n, wait_before_exit=False):
- """
- Construct a bunch of `n` threads running the same function `f`.
- If `wait_before_exit` is True, the threads won't terminate until
- do_finish() is called.
- """
- self.f = f
- self.n = n
- self.started = []
- self.finished = []
- self._can_exit = not wait_before_exit
- def task():
- tid = get_ident()
- self.started.append(tid)
- try:
- f()
- finally:
- self.finished.append(tid)
- while not self._can_exit:
- _wait()
- try:
- for i in range(n):
- start_new_thread(task, ())
- except:
- self._can_exit = True
- raise
- def wait_for_started(self):
- while len(self.started) < self.n:
- _wait()
- def wait_for_finished(self):
- while len(self.finished) < self.n:
- _wait()
- def do_finish(self):
- self._can_exit = True
- class BaseTestCase(unittest.TestCase):
- def setUp(self):
- self._threads = support.threading_setup()
- def tearDown(self):
- support.threading_cleanup(*self._threads)
- support.reap_children()
- class BaseLockTests(BaseTestCase):
- """
- Tests for both recursive and non-recursive locks.
- """
- def test_constructor(self):
- lock = self.locktype()
- del lock
- def test_acquire_destroy(self):
- lock = self.locktype()
- lock.acquire()
- del lock
- def test_acquire_release(self):
- lock = self.locktype()
- lock.acquire()
- lock.release()
- del lock
- def test_try_acquire(self):
- lock = self.locktype()
- self.assertTrue(lock.acquire(False))
- lock.release()
- def test_try_acquire_contended(self):
- lock = self.locktype()
- lock.acquire()
- result = []
- def f():
- result.append(lock.acquire(False))
- Bunch(f, 1).wait_for_finished()
- self.assertFalse(result[0])
- lock.release()
- def test_acquire_contended(self):
- lock = self.locktype()
- lock.acquire()
- N = 5
- def f():
- lock.acquire()
- lock.release()
- b = Bunch(f, N)
- b.wait_for_started()
- _wait()
- self.assertEqual(len(b.finished), 0)
- lock.release()
- b.wait_for_finished()
- self.assertEqual(len(b.finished), N)
- def test_with(self):
- lock = self.locktype()
- def f():
- lock.acquire()
- lock.release()
- def _with(err=None):
- with lock:
- if err is not None:
- raise err
- _with()
- # Check the lock is unacquired
- Bunch(f, 1).wait_for_finished()
- self.assertRaises(TypeError, _with, TypeError)
- # Check the lock is unacquired
- Bunch(f, 1).wait_for_finished()
- def test_thread_leak(self):
- # The lock shouldn't leak a Thread instance when used from a foreign
- # (non-threading) thread.
- lock = self.locktype()
- def f():
- lock.acquire()
- lock.release()
- n = len(threading.enumerate())
- # We run many threads in the hope that existing threads ids won't
- # be recycled.
- Bunch(f, 15).wait_for_finished()
- self.assertEqual(n, len(threading.enumerate()))
- class LockTests(BaseLockTests):
- """
- Tests for non-recursive, weak locks
- (which can be acquired and released from different threads).
- """
- def test_reacquire(self):
- # Lock needs to be released before re-acquiring.
- lock = self.locktype()
- phase = []
- def f():
- lock.acquire()
- phase.append(None)
- lock.acquire()
- phase.append(None)
- start_new_thread(f, ())
- while len(phase) == 0:
- _wait()
- _wait()
- self.assertEqual(len(phase), 1)
- lock.release()
- while len(phase) == 1:
- _wait()
- self.assertEqual(len(phase), 2)
- def test_different_thread(self):
- # Lock can be released from a different thread.
- lock = self.locktype()
- lock.acquire()
- def f():
- lock.release()
- b = Bunch(f, 1)
- b.wait_for_finished()
- lock.acquire()
- lock.release()
- class RLockTests(BaseLockTests):
- """
- Tests for recursive locks.
- """
- def test_reacquire(self):
- lock = self.locktype()
- lock.acquire()
- lock.acquire()
- lock.release()
- lock.acquire()
- lock.release()
- lock.release()
- def test_release_unacquired(self):
- # Cannot release an unacquired lock
- lock = self.locktype()
- self.assertRaises(RuntimeError, lock.release)
- lock.acquire()
- lock.acquire()
- lock.release()
- lock.acquire()
- lock.release()
- lock.release()
- self.assertRaises(RuntimeError, lock.release)
- def test_different_thread(self):
- # Cannot release from a different thread
- lock = self.locktype()
- def f():
- lock.acquire()
- b = Bunch(f, 1, True)
- try:
- self.assertRaises(RuntimeError, lock.release)
- finally:
- b.do_finish()
- def test__is_owned(self):
- lock = self.locktype()
- self.assertFalse(lock._is_owned())
- lock.acquire()
- self.assertTrue(lock._is_owned())
- lock.acquire()
- self.assertTrue(lock._is_owned())
- result = []
- def f():
- result.append(lock._is_owned())
- Bunch(f, 1).wait_for_finished()
- self.assertFalse(result[0])
- lock.release()
- self.assertTrue(lock._is_owned())
- lock.release()
- self.assertFalse(lock._is_owned())
- class EventTests(BaseTestCase):
- """
- Tests for Event objects.
- """
- def test_is_set(self):
- evt = self.eventtype()
- self.assertFalse(evt.is_set())
- evt.set()
- self.assertTrue(evt.is_set())
- evt.set()
- self.assertTrue(evt.is_set())
- evt.clear()
- self.assertFalse(evt.is_set())
- evt.clear()
- self.assertFalse(evt.is_set())
- def _check_notify(self, evt):
- # All threads get notified
- N = 5
- results1 = []
- results2 = []
- def f():
- results1.append(evt.wait())
- results2.append(evt.wait())
- b = Bunch(f, N)
- b.wait_for_started()
- _wait()
- self.assertEqual(len(results1), 0)
- evt.set()
- b.wait_for_finished()
- self.assertEqual(results1, [True] * N)
- self.assertEqual(results2, [True] * N)
- def test_notify(self):
- evt = self.eventtype()
- self._check_notify(evt)
- # Another time, after an explicit clear()
- evt.set()
- evt.clear()
- self._check_notify(evt)
- def test_timeout(self):
- evt = self.eventtype()
- results1 = []
- results2 = []
- N = 5
- def f():
- results1.append(evt.wait(0.0))
- t1 = time.time()
- r = evt.wait(0.2)
- t2 = time.time()
- results2.append((r, t2 - t1))
- Bunch(f, N).wait_for_finished()
- self.assertEqual(results1, [False] * N)
- for r, dt in results2:
- self.assertFalse(r)
- self.assertTrue(dt >= 0.2, dt)
- # The event is set
- results1 = []
- results2 = []
- evt.set()
- Bunch(f, N).wait_for_finished()
- self.assertEqual(results1, [True] * N)
- for r, dt in results2:
- self.assertTrue(r)
- def test_reset_internal_locks(self):
- evt = self.eventtype()
- old_lock = evt._Event__cond._Condition__lock
- evt._reset_internal_locks()
- new_lock = evt._Event__cond._Condition__lock
- self.assertIsNot(new_lock, old_lock)
- self.assertIs(type(new_lock), type(old_lock))
- class ConditionTests(BaseTestCase):
- """
- Tests for condition variables.
- """
- def test_acquire(self):
- cond = self.condtype()
- # Be default we have an RLock: the condition can be acquired multiple
- # times.
- cond.acquire()
- cond.acquire()
- cond.release()
- cond.release()
- lock = threading.Lock()
- cond = self.condtype(lock)
- cond.acquire()
- self.assertFalse(lock.acquire(False))
- cond.release()
- self.assertTrue(lock.acquire(False))
- self.assertFalse(cond.acquire(False))
- lock.release()
- with cond:
- self.assertFalse(lock.acquire(False))
- def test_unacquired_wait(self):
- cond = self.condtype()
- self.assertRaises(RuntimeError, cond.wait)
- def test_unacquired_notify(self):
- cond = self.condtype()
- self.assertRaises(RuntimeError, cond.notify)
- def _check_notify(self, cond):
- N = 5
- results1 = []
- results2 = []
- phase_num = 0
- def f():
- cond.acquire()
- cond.wait()
- cond.release()
- results1.append(phase_num)
- cond.acquire()
- cond.wait()
- cond.release()
- results2.append(phase_num)
- b = Bunch(f, N)
- b.wait_for_started()
- _wait()
- self.assertEqual(results1, [])
- # Notify 3 threads at first
- cond.acquire()
- cond.notify(3)
- _wait()
- phase_num = 1
- cond.release()
- while len(results1) < 3:
- _wait()
- self.assertEqual(results1, [1] * 3)
- self.assertEqual(results2, [])
- # Notify 5 threads: they might be in their first or second wait
- cond.acquire()
- cond.notify(5)
- _wait()
- phase_num = 2
- cond.release()
- while len(results1) + len(results2) < 8:
- _wait()
- self.assertEqual(results1, [1] * 3 + [2] * 2)
- self.assertEqual(results2, [2] * 3)
- # Notify all threads: they are all in their second wait
- cond.acquire()
- cond.notify_all()
- _wait()
- phase_num = 3
- cond.release()
- while len(results2) < 5:
- _wait()
- self.assertEqual(results1, [1] * 3 + [2] * 2)
- self.assertEqual(results2, [2] * 3 + [3] * 2)
- b.wait_for_finished()
- def test_notify(self):
- cond = self.condtype()
- self._check_notify(cond)
- # A second time, to check internal state is still ok.
- self._check_notify(cond)
- def test_timeout(self):
- cond = self.condtype()
- results = []
- N = 5
- def f():
- cond.acquire()
- t1 = time.time()
- cond.wait(0.2)
- t2 = time.time()
- cond.release()
- results.append(t2 - t1)
- Bunch(f, N).wait_for_finished()
- self.assertEqual(len(results), 5)
- for dt in results:
- self.assertTrue(dt >= 0.2, dt)
- class BaseSemaphoreTests(BaseTestCase):
- """
- Common tests for {bounded, unbounded} semaphore objects.
- """
- def test_constructor(self):
- self.assertRaises(ValueError, self.semtype, value = -1)
- self.assertRaises(ValueError, self.semtype, value = -sys.maxint)
- def test_acquire(self):
- sem = self.semtype(1)
- sem.acquire()
- sem.release()
- sem = self.semtype(2)
- sem.acquire()
- sem.acquire()
- sem.release()
- sem.release()
- def test_acquire_destroy(self):
- sem = self.semtype()
- sem.acquire()
- del sem
- def test_acquire_contended(self):
- sem = self.semtype(7)
- sem.acquire()
- N = 10
- results1 = []
- results2 = []
- phase_num = 0
- def f():
- sem.acquire()
- results1.append(phase_num)
- sem.acquire()
- results2.append(phase_num)
- b = Bunch(f, 10)
- b.wait_for_started()
- while len(results1) + len(results2) < 6:
- _wait()
- self.assertEqual(results1 + results2, [0] * 6)
- phase_num = 1
- for i in range(7):
- sem.release()
- while len(results1) + len(results2) < 13:
- _wait()
- self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
- phase_num = 2
- for i in range(6):
- sem.release()
- while len(results1) + len(results2) < 19:
- _wait()
- self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
- # The semaphore is still locked
- self.assertFalse(sem.acquire(False))
- # Final release, to let the last thread finish
- sem.release()
- b.wait_for_finished()
- def test_try_acquire(self):
- sem = self.semtype(2)
- self.assertTrue(sem.acquire(False))
- self.assertTrue(sem.acquire(False))
- self.assertFalse(sem.acquire(False))
- sem.release()
- self.assertTrue(sem.acquire(False))
- def test_try_acquire_contended(self):
- sem = self.semtype(4)
- sem.acquire()
- results = []
- def f():
- results.append(sem.acquire(False))
- results.append(sem.acquire(False))
- Bunch(f, 5).wait_for_finished()
- # There can be a thread switch between acquiring the semaphore and
- # appending the result, therefore results will not necessarily be
- # ordered.
- self.assertEqual(sorted(results), [False] * 7 + [True] * 3 )
- def test_default_value(self):
- # The default initial value is 1.
- sem = self.semtype()
- sem.acquire()
- def f():
- sem.acquire()
- sem.release()
- b = Bunch(f, 1)
- b.wait_for_started()
- _wait()
- self.assertFalse(b.finished)
- sem.release()
- b.wait_for_finished()
- def test_with(self):
- sem = self.semtype(2)
- def _with(err=None):
- with sem:
- self.assertTrue(sem.acquire(False))
- sem.release()
- with sem:
- self.assertFalse(sem.acquire(False))
- if err:
- raise err
- _with()
- self.assertTrue(sem.acquire(False))
- sem.release()
- self.assertRaises(TypeError, _with, TypeError)
- self.assertTrue(sem.acquire(False))
- sem.release()
- class SemaphoreTests(BaseSemaphoreTests):
- """
- Tests for unbounded semaphores.
- """
- def test_release_unacquired(self):
- # Unbounded releases are allowed and increment the semaphore's value
- sem = self.semtype(1)
- sem.release()
- sem.acquire()
- sem.acquire()
- sem.release()
- class BoundedSemaphoreTests(BaseSemaphoreTests):
- """
- Tests for bounded semaphores.
- """
- def test_release_unacquired(self):
- # Cannot go past the initial value
- sem = self.semtype()
- self.assertRaises(ValueError, sem.release)
- sem.acquire()
- sem.release()
- self.assertRaises(ValueError, sem.release)
|