lock_tests.py 15 KB


  1. """
  2. Various tests for synchronization primitives.
  3. """
  4. import sys
  5. import time
  6. from thread import start_new_thread, get_ident
  7. import threading
  8. import unittest
  9. from test import test_support as support
  10. def _wait():
  11. # A crude wait/yield function not relying on synchronization primitives.
  12. time.sleep(0.01)
  class Bunch(object):
      """
      A group of raw threads that all run the same callable.
      """

      def __init__(self, f, n, wait_before_exit=False):
          """
          Start `n` threads, each of which calls `f` once.

          When `wait_before_exit` is true, a thread does not terminate after
          `f` returns; it keeps polling until do_finish() is called.
          """
          self.f = f
          self.n = n
          self.started = []
          self.finished = []
          self._can_exit = not wait_before_exit

          def task():
              ident = get_ident()
              self.started.append(ident)
              try:
                  f()
              finally:
                  # Recorded even if f() raised, then linger until the
                  # owner allows the thread to exit.
                  self.finished.append(ident)
                  while not self._can_exit:
                      _wait()

          try:
              for _ in range(n):
                  start_new_thread(task, ())
          except:
              # Deliberately bare: whatever went wrong, unblock the threads
              # that did start before re-raising the error unchanged.
              self._can_exit = True
              raise

      def wait_for_started(self):
          """Poll until `n` thread ids have been recorded in `started`."""
          while self.n > len(self.started):
              _wait()

      def wait_for_finished(self):
          """Poll until `n` thread ids have been recorded in `finished`."""
          while self.n > len(self.finished):
              _wait()

      def do_finish(self):
          """Allow threads held back by `wait_before_exit` to terminate."""
          self._can_exit = True
  class BaseTestCase(unittest.TestCase):
      """
      Shared fixture for the test classes in this file.

      Whatever support.threading_setup() returns is stored before each test
      and handed back to support.threading_cleanup() afterwards.
      """

      def setUp(self):
          self._threads = support.threading_setup()

      def tearDown(self):
          saved = self._threads
          support.threading_cleanup(*saved)
          support.reap_children()
  class BaseLockTests(BaseTestCase):
      """
      Tests that apply to recursive and non-recursive locks alike.

      The lock under test is created through `self.locktype`, which this
      class does not define itself.
      """

      def test_constructor(self):
          lck = self.locktype()
          del lck

      def test_acquire_destroy(self):
          lck = self.locktype()
          lck.acquire()
          del lck

      def test_acquire_release(self):
          lck = self.locktype()
          lck.acquire()
          lck.release()
          del lck

      def test_try_acquire(self):
          lck = self.locktype()
          self.assertTrue(lck.acquire(False))
          lck.release()

      def test_try_acquire_contended(self):
          lck = self.locktype()
          lck.acquire()
          outcomes = []

          def try_lock():
              # Non-blocking attempt from another thread while the main
              # thread still holds the lock.
              outcomes.append(lck.acquire(False))

          Bunch(try_lock, 1).wait_for_finished()
          self.assertFalse(outcomes[0])
          lck.release()

      def test_acquire_contended(self):
          lck = self.locktype()
          lck.acquire()
          N = 5

          def lock_unlock():
              lck.acquire()
              lck.release()

          bunch = Bunch(lock_unlock, N)
          bunch.wait_for_started()
          _wait()
          # Nobody can have finished while the main thread holds the lock.
          self.assertEqual(len(bunch.finished), 0)
          lck.release()
          bunch.wait_for_finished()
          self.assertEqual(len(bunch.finished), N)

      def test_with(self):
          lck = self.locktype()

          def lock_unlock():
              lck.acquire()
              lck.release()

          def _with(err=None):
              with lck:
                  if err is not None:
                      raise err

          _with()
          # Check the lock is unacquired
          Bunch(lock_unlock, 1).wait_for_finished()
          self.assertRaises(TypeError, _with, TypeError)
          # Check the lock is unacquired
          Bunch(lock_unlock, 1).wait_for_finished()

      def test_thread_leak(self):
          # The lock shouldn't leak a Thread instance when used from a foreign
          # (non-threading) thread.
          lck = self.locktype()

          def lock_unlock():
              lck.acquire()
              lck.release()

          before = len(threading.enumerate())
          # We run many threads in the hope that existing threads ids won't
          # be recycled.
          Bunch(lock_unlock, 15).wait_for_finished()
          self.assertEqual(before, len(threading.enumerate()))
  class LockTests(BaseLockTests):
      """
      Tests for non-recursive, weak locks
      (locks that one thread may acquire and a different thread release).
      """

      def test_reacquire(self):
          # Lock needs to be released before re-acquiring.
          lck = self.locktype()
          phase = []

          def acquire_twice():
              lck.acquire()
              phase.append(None)
              # Blocks until the main thread releases the lock.
              lck.acquire()
              phase.append(None)

          start_new_thread(acquire_twice, ())
          while not phase:
              _wait()
          _wait()
          self.assertEqual(len(phase), 1)
          lck.release()
          while len(phase) == 1:
              _wait()
          self.assertEqual(len(phase), 2)

      def test_different_thread(self):
          # Lock can be released from a different thread.
          lck = self.locktype()
          lck.acquire()

          def release_only():
              lck.release()

          bunch = Bunch(release_only, 1)
          bunch.wait_for_finished()
          lck.acquire()
          lck.release()
  class RLockTests(BaseLockTests):
      """
      Tests specific to recursive locks.
      """

      def test_reacquire(self):
          rlock = self.locktype()
          rlock.acquire()
          rlock.acquire()
          rlock.release()
          rlock.acquire()
          rlock.release()
          rlock.release()

      def test_release_unacquired(self):
          # Cannot release an unacquired lock
          rlock = self.locktype()
          self.assertRaises(RuntimeError, rlock.release)
          rlock.acquire()
          rlock.acquire()
          rlock.release()
          rlock.acquire()
          rlock.release()
          rlock.release()
          # Every acquire has been matched, so one more release must fail.
          self.assertRaises(RuntimeError, rlock.release)

      def test_different_thread(self):
          # Cannot release from a different thread
          rlock = self.locktype()

          def acquire_only():
              rlock.acquire()

          # wait_before_exit=True: the worker stays alive until do_finish().
          bunch = Bunch(acquire_only, 1, True)
          try:
              self.assertRaises(RuntimeError, rlock.release)
          finally:
              bunch.do_finish()

      def test__is_owned(self):
          rlock = self.locktype()
          self.assertFalse(rlock._is_owned())
          rlock.acquire()
          self.assertTrue(rlock._is_owned())
          rlock.acquire()
          self.assertTrue(rlock._is_owned())
          seen = []

          def ask_owner():
              # Evaluated in another thread, which does not hold the lock.
              seen.append(rlock._is_owned())

          Bunch(ask_owner, 1).wait_for_finished()
          self.assertFalse(seen[0])
          rlock.release()
          self.assertTrue(rlock._is_owned())
          rlock.release()
          self.assertFalse(rlock._is_owned())
  class EventTests(BaseTestCase):
      """
      Tests for Event objects.

      The event under test is created through `self.eventtype`, which this
      class does not define itself.
      """

      def test_is_set(self):
          event = self.eventtype()
          self.assertFalse(event.is_set())
          event.set()
          self.assertTrue(event.is_set())
          event.set()
          self.assertTrue(event.is_set())
          event.clear()
          self.assertFalse(event.is_set())
          event.clear()
          self.assertFalse(event.is_set())

      def _check_notify(self, evt):
          """Check that every thread waiting on `evt` is woken by set()."""
          N = 5
          first_wait = []
          second_wait = []

          def waiter():
              first_wait.append(evt.wait())
              second_wait.append(evt.wait())

          bunch = Bunch(waiter, N)
          bunch.wait_for_started()
          _wait()
          # Nothing may have returned from wait() before the event is set.
          self.assertEqual(len(first_wait), 0)
          evt.set()
          bunch.wait_for_finished()
          self.assertEqual(first_wait, [True] * N)
          self.assertEqual(second_wait, [True] * N)

      def test_notify(self):
          event = self.eventtype()
          self._check_notify(event)
          # Another time, after an explicit clear()
          event.set()
          event.clear()
          self._check_notify(event)

      def test_timeout(self):
          event = self.eventtype()
          immediate = []
          timed = []
          N = 5

          def waiter():
              immediate.append(event.wait(0.0))
              began = time.time()
              flag = event.wait(0.2)
              ended = time.time()
              timed.append((flag, ended - began))

          Bunch(waiter, N).wait_for_finished()
          self.assertEqual(immediate, [False] * N)
          for flag, elapsed in timed:
              self.assertFalse(flag)
              self.assertTrue(elapsed >= 0.2, elapsed)
          # The event is set
          # (rebinding the lists here is seen by waiter(), which looks the
          # names up each time it runs)
          immediate = []
          timed = []
          event.set()
          Bunch(waiter, N).wait_for_finished()
          self.assertEqual(immediate, [True] * N)
          for flag, elapsed in timed:
              self.assertTrue(flag)

      def test_reset_internal_locks(self):
          event = self.eventtype()
          lock_before = event._Event__cond._Condition__lock
          event._reset_internal_locks()
          lock_after = event._Event__cond._Condition__lock
          # A new lock object of the same type must have been installed.
          self.assertIsNot(lock_after, lock_before)
          self.assertIs(type(lock_after), type(lock_before))
  class ConditionTests(BaseTestCase):
      """
      Tests for condition variables.

      The condition under test is created through `self.condtype`, which
      this class does not define itself.
      """

      def test_acquire(self):
          cond = self.condtype()
          # By default we have an RLock: the condition can be acquired multiple
          # times.
          cond.acquire()
          cond.acquire()
          cond.release()
          cond.release()
          plain_lock = threading.Lock()
          cond = self.condtype(plain_lock)
          cond.acquire()
          self.assertFalse(plain_lock.acquire(False))
          cond.release()
          self.assertTrue(plain_lock.acquire(False))
          self.assertFalse(cond.acquire(False))
          plain_lock.release()
          with cond:
              self.assertFalse(plain_lock.acquire(False))

      def test_unacquired_wait(self):
          cond = self.condtype()
          self.assertRaises(RuntimeError, cond.wait)

      def test_unacquired_notify(self):
          cond = self.condtype()
          self.assertRaises(RuntimeError, cond.notify)

      def _check_notify(self, cond):
          """
          Drive five waiter threads through two waits each on `cond`, waking
          them with notify(3), notify(5) and notify_all(), and check which
          phase number each wake-up observed.
          """
          N = 5
          first_pass = []
          second_pass = []
          phase_num = 0

          def waiter():
              # phase_num is read when the append runs, so each entry
              # records the phase current at wake-up time.
              cond.acquire()
              cond.wait()
              cond.release()
              first_pass.append(phase_num)
              cond.acquire()
              cond.wait()
              cond.release()
              second_pass.append(phase_num)

          bunch = Bunch(waiter, N)
          bunch.wait_for_started()
          _wait()
          self.assertEqual(first_pass, [])
          # Notify 3 threads at first
          cond.acquire()
          cond.notify(3)
          _wait()
          phase_num = 1
          cond.release()
          while len(first_pass) < 3:
              _wait()
          self.assertEqual(first_pass, [1] * 3)
          self.assertEqual(second_pass, [])
          # Notify 5 threads: they might be in their first or second wait
          cond.acquire()
          cond.notify(5)
          _wait()
          phase_num = 2
          cond.release()
          while len(first_pass) + len(second_pass) < 8:
              _wait()
          self.assertEqual(first_pass, [1] * 3 + [2] * 2)
          self.assertEqual(second_pass, [2] * 3)
          # Notify all threads: they are all in their second wait
          cond.acquire()
          cond.notify_all()
          _wait()
          phase_num = 3
          cond.release()
          while len(second_pass) < 5:
              _wait()
          self.assertEqual(first_pass, [1] * 3 + [2] * 2)
          self.assertEqual(second_pass, [2] * 3 + [3] * 2)
          bunch.wait_for_finished()

      def test_notify(self):
          cond = self.condtype()
          self._check_notify(cond)
          # A second time, to check internal state is still ok.
          self._check_notify(cond)

      def test_timeout(self):
          cond = self.condtype()
          durations = []
          N = 5

          def waiter():
              cond.acquire()
              began = time.time()
              cond.wait(0.2)
              ended = time.time()
              cond.release()
              durations.append(ended - began)

          Bunch(waiter, N).wait_for_finished()
          self.assertEqual(len(durations), N)
          for elapsed in durations:
              self.assertTrue(elapsed >= 0.2, elapsed)
  class BaseSemaphoreTests(BaseTestCase):
      """
      Common tests for {bounded, unbounded} semaphore objects.

      The semaphore under test is created through `self.semtype`, which this
      class does not define itself.
      """

      def test_constructor(self):
          # Negative initial values must be rejected.  sys.maxsize is used
          # rather than sys.maxint because it exists on Python 2.6+ and on
          # Python 3, where sys.maxint is gone.
          self.assertRaises(ValueError, self.semtype, value=-1)
          self.assertRaises(ValueError, self.semtype, value=-sys.maxsize)

      def test_acquire(self):
          sem = self.semtype(1)
          sem.acquire()
          sem.release()
          sem = self.semtype(2)
          sem.acquire()
          sem.acquire()
          sem.release()
          sem.release()

      def test_acquire_destroy(self):
          sem = self.semtype()
          sem.acquire()
          del sem

      def test_acquire_contended(self):
          sem = self.semtype(7)
          sem.acquire()
          N = 10
          results1 = []
          results2 = []
          phase_num = 0

          def f():
              # phase_num is read when the append runs, so each entry
              # records the phase in which that acquire() succeeded.
              sem.acquire()
              results1.append(phase_num)
              sem.acquire()
              results2.append(phase_num)

          b = Bunch(f, N)
          b.wait_for_started()
          # 7 initial slots minus the one taken above leaves 6 acquisitions.
          while len(results1) + len(results2) < 6:
              _wait()
          self.assertEqual(results1 + results2, [0] * 6)
          phase_num = 1
          for _ in range(7):
              sem.release()
          while len(results1) + len(results2) < 13:
              _wait()
          self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
          phase_num = 2
          for _ in range(6):
              sem.release()
          while len(results1) + len(results2) < 19:
              _wait()
          self.assertEqual(sorted(results1 + results2),
                           [0] * 6 + [1] * 7 + [2] * 6)
          # The semaphore is still locked
          self.assertFalse(sem.acquire(False))
          # Final release, to let the last thread finish
          sem.release()
          b.wait_for_finished()

      def test_try_acquire(self):
          sem = self.semtype(2)
          self.assertTrue(sem.acquire(False))
          self.assertTrue(sem.acquire(False))
          self.assertFalse(sem.acquire(False))
          sem.release()
          self.assertTrue(sem.acquire(False))

      def test_try_acquire_contended(self):
          sem = self.semtype(4)
          sem.acquire()
          results = []

          def f():
              results.append(sem.acquire(False))
              results.append(sem.acquire(False))

          Bunch(f, 5).wait_for_finished()
          # There can be a thread switch between acquiring the semaphore and
          # appending the result, therefore results will not necessarily be
          # ordered.
          self.assertEqual(sorted(results), [False] * 7 + [True] * 3)

      def test_default_value(self):
          # The default initial value is 1.
          sem = self.semtype()
          sem.acquire()

          def f():
              sem.acquire()
              sem.release()

          b = Bunch(f, 1)
          b.wait_for_started()
          _wait()
          # The worker must still be blocked in acquire().
          self.assertFalse(b.finished)
          sem.release()
          b.wait_for_finished()

      def test_with(self):
          sem = self.semtype(2)

          def _with(err=None):
              with sem:
                  self.assertTrue(sem.acquire(False))
                  sem.release()
                  with sem:
                      self.assertFalse(sem.acquire(False))
                      if err:
                          raise err

          _with()
          # Both slots must be free again after the with blocks exit.
          self.assertTrue(sem.acquire(False))
          sem.release()
          self.assertRaises(TypeError, _with, TypeError)
          self.assertTrue(sem.acquire(False))
          sem.release()
  class SemaphoreTests(BaseSemaphoreTests):
      """
      Tests specific to unbounded semaphores.
      """

      def test_release_unacquired(self):
          # Unbounded releases are allowed and increment the semaphore's value
          semaphore = self.semtype(1)
          semaphore.release()
          # The extra release above is what lets the second acquire succeed.
          semaphore.acquire()
          semaphore.acquire()
          semaphore.release()
  class BoundedSemaphoreTests(BaseSemaphoreTests):
      """
      Tests specific to bounded semaphores.
      """

      def test_release_unacquired(self):
          # Cannot go past the initial value
          semaphore = self.semtype()
          self.assertRaises(ValueError, semaphore.release)
          semaphore.acquire()
          semaphore.release()
          # Back at the initial value: a further release must fail again.
          self.assertRaises(ValueError, semaphore.release)
  496. sem.acquire()
  497. sem.release()
  498. self.assertRaises(ValueError, sem.release)