test_threading.py 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933
  1. # Very rudimentary test of threading module
  2. import test.test_support
  3. from test.test_support import verbose, cpython_only
  4. from test.script_helper import assert_python_ok
  5. import random
  6. import re
  7. import sys
  8. thread = test.test_support.import_module('thread')
  9. threading = test.test_support.import_module('threading')
  10. import time
  11. import unittest
  12. import weakref
  13. import os
  14. import subprocess
  15. try:
  16. import _testcapi
  17. except ImportError:
  18. _testcapi = None
  19. from test import lock_tests
  20. # A trivial mutable counter.
  21. class Counter(object):
  22. def __init__(self):
  23. self.value = 0
  24. def inc(self):
  25. self.value += 1
  26. def dec(self):
  27. self.value -= 1
  28. def get(self):
  29. return self.value
  30. class TestThread(threading.Thread):
  31. def __init__(self, name, testcase, sema, mutex, nrunning):
  32. threading.Thread.__init__(self, name=name)
  33. self.testcase = testcase
  34. self.sema = sema
  35. self.mutex = mutex
  36. self.nrunning = nrunning
  37. def run(self):
  38. delay = random.random() / 10000.0
  39. if verbose:
  40. print 'task %s will run for %.1f usec' % (
  41. self.name, delay * 1e6)
  42. with self.sema:
  43. with self.mutex:
  44. self.nrunning.inc()
  45. if verbose:
  46. print self.nrunning.get(), 'tasks are running'
  47. self.testcase.assertLessEqual(self.nrunning.get(), 3)
  48. time.sleep(delay)
  49. if verbose:
  50. print 'task', self.name, 'done'
  51. with self.mutex:
  52. self.nrunning.dec()
  53. self.testcase.assertGreaterEqual(self.nrunning.get(), 0)
  54. if verbose:
  55. print '%s is finished. %d tasks are running' % (
  56. self.name, self.nrunning.get())
  57. class BaseTestCase(unittest.TestCase):
  58. def setUp(self):
  59. self._threads = test.test_support.threading_setup()
  60. def tearDown(self):
  61. test.test_support.threading_cleanup(*self._threads)
  62. test.test_support.reap_children()
  63. class ThreadTests(BaseTestCase):
  64. # Create a bunch of threads, let each do some work, wait until all are
  65. # done.
  66. def test_various_ops(self):
  67. # This takes about n/3 seconds to run (about n/3 clumps of tasks,
  68. # times about 1 second per clump).
  69. NUMTASKS = 10
  70. # no more than 3 of the 10 can run at once
  71. sema = threading.BoundedSemaphore(value=3)
  72. mutex = threading.RLock()
  73. numrunning = Counter()
  74. threads = []
  75. for i in range(NUMTASKS):
  76. t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
  77. threads.append(t)
  78. self.assertIsNone(t.ident)
  79. self.assertRegexpMatches(repr(t), r'^<TestThread\(.*, initial\)>$')
  80. t.start()
  81. if verbose:
  82. print 'waiting for all tasks to complete'
  83. for t in threads:
  84. t.join(NUMTASKS)
  85. self.assertFalse(t.is_alive())
  86. self.assertNotEqual(t.ident, 0)
  87. self.assertIsNotNone(t.ident)
  88. self.assertRegexpMatches(repr(t), r'^<TestThread\(.*, \w+ -?\d+\)>$')
  89. if verbose:
  90. print 'all tasks done'
  91. self.assertEqual(numrunning.get(), 0)
  92. def test_ident_of_no_threading_threads(self):
  93. # The ident still must work for the main thread and dummy threads.
  94. self.assertIsNotNone(threading.currentThread().ident)
  95. def f():
  96. ident.append(threading.currentThread().ident)
  97. done.set()
  98. done = threading.Event()
  99. ident = []
  100. thread.start_new_thread(f, ())
  101. done.wait()
  102. self.assertIsNotNone(ident[0])
  103. # Kill the "immortal" _DummyThread
  104. del threading._active[ident[0]]
  105. # run with a small(ish) thread stack size (256kB)
  106. def test_various_ops_small_stack(self):
  107. if verbose:
  108. print 'with 256kB thread stack size...'
  109. try:
  110. threading.stack_size(262144)
  111. except thread.error:
  112. self.skipTest('platform does not support changing thread stack size')
  113. self.test_various_ops()
  114. threading.stack_size(0)
  115. # run with a large thread stack size (1MB)
  116. def test_various_ops_large_stack(self):
  117. if verbose:
  118. print 'with 1MB thread stack size...'
  119. try:
  120. threading.stack_size(0x100000)
  121. except thread.error:
  122. self.skipTest('platform does not support changing thread stack size')
  123. self.test_various_ops()
  124. threading.stack_size(0)
  125. def test_foreign_thread(self):
  126. # Check that a "foreign" thread can use the threading module.
  127. def f(mutex):
  128. # Calling current_thread() forces an entry for the foreign
  129. # thread to get made in the threading._active map.
  130. threading.current_thread()
  131. mutex.release()
  132. mutex = threading.Lock()
  133. mutex.acquire()
  134. tid = thread.start_new_thread(f, (mutex,))
  135. # Wait for the thread to finish.
  136. mutex.acquire()
  137. self.assertIn(tid, threading._active)
  138. self.assertIsInstance(threading._active[tid], threading._DummyThread)
  139. del threading._active[tid]
  140. # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
  141. # exposed at the Python level. This test relies on ctypes to get at it.
  142. def test_PyThreadState_SetAsyncExc(self):
  143. try:
  144. import ctypes
  145. except ImportError:
  146. self.skipTest('requires ctypes')
  147. set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
  148. class AsyncExc(Exception):
  149. pass
  150. exception = ctypes.py_object(AsyncExc)
  151. # First check it works when setting the exception from the same thread.
  152. tid = thread.get_ident()
  153. try:
  154. result = set_async_exc(ctypes.c_long(tid), exception)
  155. # The exception is async, so we might have to keep the VM busy until
  156. # it notices.
  157. while True:
  158. pass
  159. except AsyncExc:
  160. pass
  161. else:
  162. # This code is unreachable but it reflects the intent. If we wanted
  163. # to be smarter the above loop wouldn't be infinite.
  164. self.fail("AsyncExc not raised")
  165. try:
  166. self.assertEqual(result, 1) # one thread state modified
  167. except UnboundLocalError:
  168. # The exception was raised too quickly for us to get the result.
  169. pass
  170. # `worker_started` is set by the thread when it's inside a try/except
  171. # block waiting to catch the asynchronously set AsyncExc exception.
  172. # `worker_saw_exception` is set by the thread upon catching that
  173. # exception.
  174. worker_started = threading.Event()
  175. worker_saw_exception = threading.Event()
  176. class Worker(threading.Thread):
  177. def run(self):
  178. self.id = thread.get_ident()
  179. self.finished = False
  180. try:
  181. while True:
  182. worker_started.set()
  183. time.sleep(0.1)
  184. except AsyncExc:
  185. self.finished = True
  186. worker_saw_exception.set()
  187. t = Worker()
  188. t.daemon = True # so if this fails, we don't hang Python at shutdown
  189. t.start()
  190. if verbose:
  191. print " started worker thread"
  192. # Try a thread id that doesn't make sense.
  193. if verbose:
  194. print " trying nonsensical thread id"
  195. result = set_async_exc(ctypes.c_long(-1), exception)
  196. self.assertEqual(result, 0) # no thread states modified
  197. # Now raise an exception in the worker thread.
  198. if verbose:
  199. print " waiting for worker thread to get started"
  200. ret = worker_started.wait()
  201. self.assertTrue(ret)
  202. if verbose:
  203. print " verifying worker hasn't exited"
  204. self.assertFalse(t.finished)
  205. if verbose:
  206. print " attempting to raise asynch exception in worker"
  207. result = set_async_exc(ctypes.c_long(t.id), exception)
  208. self.assertEqual(result, 1) # one thread state modified
  209. if verbose:
  210. print " waiting for worker to say it caught the exception"
  211. worker_saw_exception.wait(timeout=10)
  212. self.assertTrue(t.finished)
  213. if verbose:
  214. print " all OK -- joining worker"
  215. if t.finished:
  216. t.join()
  217. # else the thread is still running, and we have no way to kill it
  218. def test_limbo_cleanup(self):
  219. # Issue 7481: Failure to start thread should cleanup the limbo map.
  220. def fail_new_thread(*args):
  221. raise thread.error()
  222. _start_new_thread = threading._start_new_thread
  223. threading._start_new_thread = fail_new_thread
  224. try:
  225. t = threading.Thread(target=lambda: None)
  226. self.assertRaises(thread.error, t.start)
  227. self.assertFalse(
  228. t in threading._limbo,
  229. "Failed to cleanup _limbo map on failure of Thread.start().")
  230. finally:
  231. threading._start_new_thread = _start_new_thread
  232. def test_finalize_runnning_thread(self):
  233. # Issue 1402: the PyGILState_Ensure / _Release functions may be called
  234. # very late on python exit: on deallocation of a running thread for
  235. # example.
  236. try:
  237. import ctypes
  238. except ImportError:
  239. self.skipTest('requires ctypes')
  240. rc = subprocess.call([sys.executable, "-c", """if 1:
  241. import ctypes, sys, time, thread
  242. # This lock is used as a simple event variable.
  243. ready = thread.allocate_lock()
  244. ready.acquire()
  245. # Module globals are cleared before __del__ is run
  246. # So we save the functions in class dict
  247. class C:
  248. ensure = ctypes.pythonapi.PyGILState_Ensure
  249. release = ctypes.pythonapi.PyGILState_Release
  250. def __del__(self):
  251. state = self.ensure()
  252. self.release(state)
  253. def waitingThread():
  254. x = C()
  255. ready.release()
  256. time.sleep(100)
  257. thread.start_new_thread(waitingThread, ())
  258. ready.acquire() # Be sure the other thread is waiting.
  259. sys.exit(42)
  260. """])
  261. self.assertEqual(rc, 42)
  262. def test_finalize_with_trace(self):
  263. # Issue1733757
  264. # Avoid a deadlock when sys.settrace steps into threading._shutdown
  265. p = subprocess.Popen([sys.executable, "-c", """if 1:
  266. import sys, threading
  267. # A deadlock-killer, to prevent the
  268. # testsuite to hang forever
  269. def killer():
  270. import os, time
  271. time.sleep(2)
  272. print 'program blocked; aborting'
  273. os._exit(2)
  274. t = threading.Thread(target=killer)
  275. t.daemon = True
  276. t.start()
  277. # This is the trace function
  278. def func(frame, event, arg):
  279. threading.current_thread()
  280. return func
  281. sys.settrace(func)
  282. """],
  283. stdout=subprocess.PIPE,
  284. stderr=subprocess.PIPE)
  285. self.addCleanup(p.stdout.close)
  286. self.addCleanup(p.stderr.close)
  287. stdout, stderr = p.communicate()
  288. rc = p.returncode
  289. self.assertFalse(rc == 2, "interpreted was blocked")
  290. self.assertTrue(rc == 0,
  291. "Unexpected error: " + repr(stderr))
  292. def test_join_nondaemon_on_shutdown(self):
  293. # Issue 1722344
  294. # Raising SystemExit skipped threading._shutdown
  295. p = subprocess.Popen([sys.executable, "-c", """if 1:
  296. import threading
  297. from time import sleep
  298. def child():
  299. sleep(1)
  300. # As a non-daemon thread we SHOULD wake up and nothing
  301. # should be torn down yet
  302. print "Woke up, sleep function is:", sleep
  303. threading.Thread(target=child).start()
  304. raise SystemExit
  305. """],
  306. stdout=subprocess.PIPE,
  307. stderr=subprocess.PIPE)
  308. self.addCleanup(p.stdout.close)
  309. self.addCleanup(p.stderr.close)
  310. stdout, stderr = p.communicate()
  311. self.assertEqual(stdout.strip(),
  312. "Woke up, sleep function is: <built-in function sleep>")
  313. stderr = re.sub(r"^\[\d+ refs\]", "", stderr, re.MULTILINE).strip()
  314. self.assertEqual(stderr, "")
  315. def test_enumerate_after_join(self):
  316. # Try hard to trigger #1703448: a thread is still returned in
  317. # threading.enumerate() after it has been join()ed.
  318. enum = threading.enumerate
  319. old_interval = sys.getcheckinterval()
  320. try:
  321. for i in xrange(1, 100):
  322. # Try a couple times at each thread-switching interval
  323. # to get more interleavings.
  324. sys.setcheckinterval(i // 5)
  325. t = threading.Thread(target=lambda: None)
  326. t.start()
  327. t.join()
  328. l = enum()
  329. self.assertNotIn(t, l,
  330. "#1703448 triggered after %d trials: %s" % (i, l))
  331. finally:
  332. sys.setcheckinterval(old_interval)
  333. def test_no_refcycle_through_target(self):
  334. class RunSelfFunction(object):
  335. def __init__(self, should_raise):
  336. # The links in this refcycle from Thread back to self
  337. # should be cleaned up when the thread completes.
  338. self.should_raise = should_raise
  339. self.thread = threading.Thread(target=self._run,
  340. args=(self,),
  341. kwargs={'yet_another':self})
  342. self.thread.start()
  343. def _run(self, other_ref, yet_another):
  344. if self.should_raise:
  345. raise SystemExit
  346. cyclic_object = RunSelfFunction(should_raise=False)
  347. weak_cyclic_object = weakref.ref(cyclic_object)
  348. cyclic_object.thread.join()
  349. del cyclic_object
  350. self.assertEqual(None, weak_cyclic_object(),
  351. msg=('%d references still around' %
  352. sys.getrefcount(weak_cyclic_object())))
  353. raising_cyclic_object = RunSelfFunction(should_raise=True)
  354. weak_raising_cyclic_object = weakref.ref(raising_cyclic_object)
  355. raising_cyclic_object.thread.join()
  356. del raising_cyclic_object
  357. self.assertEqual(None, weak_raising_cyclic_object(),
  358. msg=('%d references still around' %
  359. sys.getrefcount(weak_raising_cyclic_object())))
  360. @unittest.skipUnless(hasattr(os, 'fork'), 'test needs fork()')
  361. def test_dummy_thread_after_fork(self):
  362. # Issue #14308: a dummy thread in the active list doesn't mess up
  363. # the after-fork mechanism.
  364. code = """if 1:
  365. import thread, threading, os, time
  366. def background_thread(evt):
  367. # Creates and registers the _DummyThread instance
  368. threading.current_thread()
  369. evt.set()
  370. time.sleep(10)
  371. evt = threading.Event()
  372. thread.start_new_thread(background_thread, (evt,))
  373. evt.wait()
  374. assert threading.active_count() == 2, threading.active_count()
  375. if os.fork() == 0:
  376. assert threading.active_count() == 1, threading.active_count()
  377. os._exit(0)
  378. else:
  379. os.wait()
  380. """
  381. _, out, err = assert_python_ok("-c", code)
  382. self.assertEqual(out, '')
  383. self.assertEqual(err, '')
  384. @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
  385. def test_is_alive_after_fork(self):
  386. # Try hard to trigger #18418: is_alive() could sometimes be True on
  387. # threads that vanished after a fork.
  388. old_interval = sys.getcheckinterval()
  389. # Make the bug more likely to manifest.
  390. sys.setcheckinterval(10)
  391. try:
  392. for i in range(20):
  393. t = threading.Thread(target=lambda: None)
  394. t.start()
  395. pid = os.fork()
  396. if pid == 0:
  397. os._exit(1 if t.is_alive() else 0)
  398. else:
  399. t.join()
  400. pid, status = os.waitpid(pid, 0)
  401. self.assertEqual(0, status)
  402. finally:
  403. sys.setcheckinterval(old_interval)
  404. def test_BoundedSemaphore_limit(self):
  405. # BoundedSemaphore should raise ValueError if released too often.
  406. for limit in range(1, 10):
  407. bs = threading.BoundedSemaphore(limit)
  408. threads = [threading.Thread(target=bs.acquire)
  409. for _ in range(limit)]
  410. for t in threads:
  411. t.start()
  412. for t in threads:
  413. t.join()
  414. threads = [threading.Thread(target=bs.release)
  415. for _ in range(limit)]
  416. for t in threads:
  417. t.start()
  418. for t in threads:
  419. t.join()
  420. self.assertRaises(ValueError, bs.release)
  421. class ThreadJoinOnShutdown(BaseTestCase):
  422. # Between fork() and exec(), only async-safe functions are allowed (issues
  423. # #12316 and #11870), and fork() from a worker thread is known to trigger
  424. # problems with some operating systems (issue #3863): skip problematic tests
  425. # on platforms known to behave badly.
  426. platforms_to_skip = ('freebsd4', 'freebsd5', 'freebsd6', 'netbsd5',
  427. 'os2emx')
  428. def _run_and_join(self, script):
  429. script = """if 1:
  430. import sys, os, time, threading
  431. # a thread, which waits for the main program to terminate
  432. def joiningfunc(mainthread):
  433. mainthread.join()
  434. print 'end of thread'
  435. \n""" + script
  436. p = subprocess.Popen([sys.executable, "-c", script], stdout=subprocess.PIPE)
  437. rc = p.wait()
  438. data = p.stdout.read().replace('\r', '')
  439. p.stdout.close()
  440. self.assertEqual(data, "end of main\nend of thread\n")
  441. self.assertFalse(rc == 2, "interpreter was blocked")
  442. self.assertTrue(rc == 0, "Unexpected error")
  443. def test_1_join_on_shutdown(self):
  444. # The usual case: on exit, wait for a non-daemon thread
  445. script = """if 1:
  446. import os
  447. t = threading.Thread(target=joiningfunc,
  448. args=(threading.current_thread(),))
  449. t.start()
  450. time.sleep(0.1)
  451. print 'end of main'
  452. """
  453. self._run_and_join(script)
  454. @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
  455. @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
  456. def test_2_join_in_forked_process(self):
  457. # Like the test above, but from a forked interpreter
  458. script = """if 1:
  459. childpid = os.fork()
  460. if childpid != 0:
  461. os.waitpid(childpid, 0)
  462. sys.exit(0)
  463. t = threading.Thread(target=joiningfunc,
  464. args=(threading.current_thread(),))
  465. t.start()
  466. print 'end of main'
  467. """
  468. self._run_and_join(script)
  469. @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
  470. @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
  471. def test_3_join_in_forked_from_thread(self):
  472. # Like the test above, but fork() was called from a worker thread
  473. # In the forked process, the main Thread object must be marked as stopped.
  474. script = """if 1:
  475. main_thread = threading.current_thread()
  476. def worker():
  477. childpid = os.fork()
  478. if childpid != 0:
  479. os.waitpid(childpid, 0)
  480. sys.exit(0)
  481. t = threading.Thread(target=joiningfunc,
  482. args=(main_thread,))
  483. print 'end of main'
  484. t.start()
  485. t.join() # Should not block: main_thread is already stopped
  486. w = threading.Thread(target=worker)
  487. w.start()
  488. """
  489. self._run_and_join(script)
  490. def assertScriptHasOutput(self, script, expected_output):
  491. p = subprocess.Popen([sys.executable, "-c", script],
  492. stdout=subprocess.PIPE)
  493. rc = p.wait()
  494. data = p.stdout.read().decode().replace('\r', '')
  495. self.assertEqual(rc, 0, "Unexpected error")
  496. self.assertEqual(data, expected_output)
  497. @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
  498. @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
  499. def test_4_joining_across_fork_in_worker_thread(self):
  500. # There used to be a possible deadlock when forking from a child
  501. # thread. See http://bugs.python.org/issue6643.
  502. # The script takes the following steps:
  503. # - The main thread in the parent process starts a new thread and then
  504. # tries to join it.
  505. # - The join operation acquires the Lock inside the thread's _block
  506. # Condition. (See threading.py:Thread.join().)
  507. # - We stub out the acquire method on the condition to force it to wait
  508. # until the child thread forks. (See LOCK ACQUIRED HERE)
  509. # - The child thread forks. (See LOCK HELD and WORKER THREAD FORKS
  510. # HERE)
  511. # - The main thread of the parent process enters Condition.wait(),
  512. # which releases the lock on the child thread.
  513. # - The child process returns. Without the necessary fix, when the
  514. # main thread of the child process (which used to be the child thread
  515. # in the parent process) attempts to exit, it will try to acquire the
  516. # lock in the Thread._block Condition object and hang, because the
  517. # lock was held across the fork.
  518. script = """if 1:
  519. import os, time, threading
  520. finish_join = False
  521. start_fork = False
  522. def worker():
  523. # Wait until this thread's lock is acquired before forking to
  524. # create the deadlock.
  525. global finish_join
  526. while not start_fork:
  527. time.sleep(0.01)
  528. # LOCK HELD: Main thread holds lock across this call.
  529. childpid = os.fork()
  530. finish_join = True
  531. if childpid != 0:
  532. # Parent process just waits for child.
  533. os.waitpid(childpid, 0)
  534. # Child process should just return.
  535. w = threading.Thread(target=worker)
  536. # Stub out the private condition variable's lock acquire method.
  537. # This acquires the lock and then waits until the child has forked
  538. # before returning, which will release the lock soon after. If
  539. # someone else tries to fix this test case by acquiring this lock
  540. # before forking instead of resetting it, the test case will
  541. # deadlock when it shouldn't.
  542. condition = w._block
  543. orig_acquire = condition.acquire
  544. call_count_lock = threading.Lock()
  545. call_count = 0
  546. def my_acquire():
  547. global call_count
  548. global start_fork
  549. orig_acquire() # LOCK ACQUIRED HERE
  550. start_fork = True
  551. if call_count == 0:
  552. while not finish_join:
  553. time.sleep(0.01) # WORKER THREAD FORKS HERE
  554. with call_count_lock:
  555. call_count += 1
  556. condition.acquire = my_acquire
  557. w.start()
  558. w.join()
  559. print('end of main')
  560. """
  561. self.assertScriptHasOutput(script, "end of main\n")
  562. @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
  563. @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
  564. def test_5_clear_waiter_locks_to_avoid_crash(self):
  565. # Check that a spawned thread that forks doesn't segfault on certain
  566. # platforms, namely OS X. This used to happen if there was a waiter
  567. # lock in the thread's condition variable's waiters list. Even though
  568. # we know the lock will be held across the fork, it is not safe to
  569. # release locks held across forks on all platforms, so releasing the
  570. # waiter lock caused a segfault on OS X. Furthermore, since locks on
  571. # OS X are (as of this writing) implemented with a mutex + condition
  572. # variable instead of a semaphore, while we know that the Python-level
  573. # lock will be acquired, we can't know if the internal mutex will be
  574. # acquired at the time of the fork.
  575. script = """if True:
  576. import os, time, threading
  577. start_fork = False
  578. def worker():
  579. # Wait until the main thread has attempted to join this thread
  580. # before continuing.
  581. while not start_fork:
  582. time.sleep(0.01)
  583. childpid = os.fork()
  584. if childpid != 0:
  585. # Parent process just waits for child.
  586. (cpid, rc) = os.waitpid(childpid, 0)
  587. assert cpid == childpid
  588. assert rc == 0
  589. print('end of worker thread')
  590. else:
  591. # Child process should just return.
  592. pass
  593. w = threading.Thread(target=worker)
  594. # Stub out the private condition variable's _release_save method.
  595. # This releases the condition's lock and flips the global that
  596. # causes the worker to fork. At this point, the problematic waiter
  597. # lock has been acquired once by the waiter and has been put onto
  598. # the waiters list.
  599. condition = w._block
  600. orig_release_save = condition._release_save
  601. def my_release_save():
  602. global start_fork
  603. orig_release_save()
  604. # Waiter lock held here, condition lock released.
  605. start_fork = True
  606. condition._release_save = my_release_save
  607. w.start()
  608. w.join()
  609. print('end of main thread')
  610. """
  611. output = "end of worker thread\nend of main thread\n"
  612. self.assertScriptHasOutput(script, output)
  613. @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
  614. @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
  615. def test_reinit_tls_after_fork(self):
  616. # Issue #13817: fork() would deadlock in a multithreaded program with
  617. # the ad-hoc TLS implementation.
  618. def do_fork_and_wait():
  619. # just fork a child process and wait it
  620. pid = os.fork()
  621. if pid > 0:
  622. os.waitpid(pid, 0)
  623. else:
  624. os._exit(0)
  625. # start a bunch of threads that will fork() child processes
  626. threads = []
  627. for i in range(16):
  628. t = threading.Thread(target=do_fork_and_wait)
  629. threads.append(t)
  630. t.start()
  631. for t in threads:
  632. t.join()
  633. @cpython_only
  634. @unittest.skipIf(_testcapi is None, "need _testcapi module")
  635. def test_frame_tstate_tracing(self):
  636. # Issue #14432: Crash when a generator is created in a C thread that is
  637. # destroyed while the generator is still used. The issue was that a
  638. # generator contains a frame, and the frame kept a reference to the
  639. # Python state of the destroyed C thread. The crash occurs when a trace
  640. # function is setup.
  641. def noop_trace(frame, event, arg):
  642. # no operation
  643. return noop_trace
  644. def generator():
  645. while 1:
  646. yield "genereator"
  647. def callback():
  648. if callback.gen is None:
  649. callback.gen = generator()
  650. return next(callback.gen)
  651. callback.gen = None
  652. old_trace = sys.gettrace()
  653. sys.settrace(noop_trace)
  654. try:
  655. # Install a trace function
  656. threading.settrace(noop_trace)
  657. # Create a generator in a C thread which exits after the call
  658. _testcapi.call_in_temporary_c_thread(callback)
  659. # Call the generator in a different Python thread, check that the
  660. # generator didn't keep a reference to the destroyed thread state
  661. for test in range(3):
  662. # The trace function is still called here
  663. callback()
  664. finally:
  665. sys.settrace(old_trace)
  666. class ThreadingExceptionTests(BaseTestCase):
  667. # A RuntimeError should be raised if Thread.start() is called
  668. # multiple times.
  669. def test_start_thread_again(self):
  670. thread = threading.Thread()
  671. thread.start()
  672. self.assertRaises(RuntimeError, thread.start)
  673. def test_joining_current_thread(self):
  674. current_thread = threading.current_thread()
  675. self.assertRaises(RuntimeError, current_thread.join);
  676. def test_joining_inactive_thread(self):
  677. thread = threading.Thread()
  678. self.assertRaises(RuntimeError, thread.join)
  679. def test_daemonize_active_thread(self):
  680. thread = threading.Thread()
  681. thread.start()
  682. self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
  683. def test_print_exception(self):
  684. script = r"""if 1:
  685. import threading
  686. import time
  687. running = False
  688. def run():
  689. global running
  690. running = True
  691. while running:
  692. time.sleep(0.01)
  693. 1.0/0.0
  694. t = threading.Thread(target=run)
  695. t.start()
  696. while not running:
  697. time.sleep(0.01)
  698. running = False
  699. t.join()
  700. """
  701. rc, out, err = assert_python_ok("-c", script)
  702. self.assertEqual(out, '')
  703. self.assertIn("Exception in thread", err)
  704. self.assertIn("Traceback (most recent call last):", err)
  705. self.assertIn("ZeroDivisionError", err)
  706. self.assertNotIn("Unhandled exception", err)
  707. def test_print_exception_stderr_is_none_1(self):
  708. script = r"""if 1:
  709. import sys
  710. import threading
  711. import time
  712. running = False
  713. def run():
  714. global running
  715. running = True
  716. while running:
  717. time.sleep(0.01)
  718. 1.0/0.0
  719. t = threading.Thread(target=run)
  720. t.start()
  721. while not running:
  722. time.sleep(0.01)
  723. sys.stderr = None
  724. running = False
  725. t.join()
  726. """
  727. rc, out, err = assert_python_ok("-c", script)
  728. self.assertEqual(out, '')
  729. self.assertIn("Exception in thread", err)
  730. self.assertIn("Traceback (most recent call last):", err)
  731. self.assertIn("ZeroDivisionError", err)
  732. self.assertNotIn("Unhandled exception", err)
  733. def test_print_exception_stderr_is_none_2(self):
  734. script = r"""if 1:
  735. import sys
  736. import threading
  737. import time
  738. running = False
  739. def run():
  740. global running
  741. running = True
  742. while running:
  743. time.sleep(0.01)
  744. 1.0/0.0
  745. sys.stderr = None
  746. t = threading.Thread(target=run)
  747. t.start()
  748. while not running:
  749. time.sleep(0.01)
  750. running = False
  751. t.join()
  752. """
  753. rc, out, err = assert_python_ok("-c", script)
  754. self.assertEqual(out, '')
  755. self.assertNotIn("Unhandled exception", err)
  756. class LockTests(lock_tests.LockTests):
  757. locktype = staticmethod(threading.Lock)
  758. class RLockTests(lock_tests.RLockTests):
  759. locktype = staticmethod(threading.RLock)
  760. class EventTests(lock_tests.EventTests):
  761. eventtype = staticmethod(threading.Event)
  762. class ConditionAsRLockTests(lock_tests.RLockTests):
  763. # Condition uses an RLock by default and exports its API.
  764. locktype = staticmethod(threading.Condition)
  765. class ConditionTests(lock_tests.ConditionTests):
  766. condtype = staticmethod(threading.Condition)
  767. class SemaphoreTests(lock_tests.SemaphoreTests):
  768. semtype = staticmethod(threading.Semaphore)
  769. class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
  770. semtype = staticmethod(threading.BoundedSemaphore)
  771. @unittest.skipUnless(sys.platform == 'darwin', 'test macosx problem')
  772. def test_recursion_limit(self):
  773. # Issue 9670
  774. # test that excessive recursion within a non-main thread causes
  775. # an exception rather than crashing the interpreter on platforms
  776. # like Mac OS X or FreeBSD which have small default stack sizes
  777. # for threads
  778. script = """if True:
  779. import threading
  780. def recurse():
  781. return recurse()
  782. def outer():
  783. try:
  784. recurse()
  785. except RuntimeError:
  786. pass
  787. w = threading.Thread(target=outer)
  788. w.start()
  789. w.join()
  790. print('end of main thread')
  791. """
  792. expected_output = "end of main thread\n"
  793. p = subprocess.Popen([sys.executable, "-c", script],
  794. stdout=subprocess.PIPE)
  795. stdout, stderr = p.communicate()
  796. data = stdout.decode().replace('\r', '')
  797. self.assertEqual(p.returncode, 0, "Unexpected error")
  798. self.assertEqual(data, expected_output)
  799. def test_main():
  800. test.test_support.run_unittest(LockTests, RLockTests, EventTests,
  801. ConditionAsRLockTests, ConditionTests,
  802. SemaphoreTests, BoundedSemaphoreTests,
  803. ThreadTests,
  804. ThreadJoinOnShutdown,
  805. ThreadingExceptionTests,
  806. )
  807. if __name__ == "__main__":
  808. test_main()