# threading.py (46 KB)
  1. """Thread module emulating a subset of Java's threading model."""
  2. import sys as _sys
  3. try:
  4. import thread
  5. except ImportError:
  6. del _sys.modules[__name__]
  7. raise
  8. import warnings
  9. from collections import deque as _deque
  10. from itertools import count as _count
  11. from time import time as _time, sleep as _sleep
  12. from traceback import format_exc as _format_exc
  13. # Note regarding PEP 8 compliant aliases
  14. # This threading model was originally inspired by Java, and inherited
  15. # the convention of camelCase function and method names from that
  16. # language. While those names are not in any imminent danger of being
  17. # deprecated, starting with Python 2.6, the module now provides a
  18. # PEP 8 compliant alias for any such method name.
  19. # Using the new PEP 8 compliant names also facilitates substitution
  20. # with the multiprocessing module, which doesn't provide the old
  21. # Java inspired names.
# Rename some stuff so "from threading import *" is safe

# Public names exported by "from threading import *".
__all__ = ['activeCount', 'active_count', 'Condition', 'currentThread',
           'current_thread', 'enumerate', 'Event',
           'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
           'Timer', 'setprofile', 'settrace', 'local', 'stack_size']

# Bind the low-level primitives of the 'thread' module to private names,
# then drop the module name itself from this namespace.
_start_new_thread = thread.start_new_thread
_allocate_lock = thread.allocate_lock
_get_ident = thread.get_ident
ThreadError = thread.error
del thread

# sys.exc_clear is used to work around the fact that except blocks
# don't fully clear the exception until 3.0.
# Silence the DeprecationWarning that mentions sys.exc_clear when it is
# attributed to this module.
warnings.filterwarnings('ignore', category=DeprecationWarning,
                        module='threading', message='sys.exc_clear')
  36. # Debug support (adapted from ihooks.py).
  37. # All the major classes here derive from _Verbose. We force that to
  38. # be a new-style class so that all the major classes here are new-style.
  39. # This helps debugging (type(instance) is more revealing for instances
  40. # of new-style classes).
  41. _VERBOSE = False
  42. if __debug__:
  43. class _Verbose(object):
  44. def __init__(self, verbose=None):
  45. if verbose is None:
  46. verbose = _VERBOSE
  47. self.__verbose = verbose
  48. def _note(self, format, *args):
  49. if self.__verbose:
  50. format = format % args
  51. # Issue #4188: calling current_thread() can incur an infinite
  52. # recursion if it has to create a DummyThread on the fly.
  53. ident = _get_ident()
  54. try:
  55. name = _active[ident].name
  56. except KeyError:
  57. name = "<OS thread %d>" % ident
  58. format = "%s: %s\n" % (name, format)
  59. _sys.stderr.write(format)
  60. else:
  61. # Disable this when using "python -O"
  62. class _Verbose(object):
  63. def __init__(self, verbose=None):
  64. pass
  65. def _note(self, *args):
  66. pass
# Support for profile and trace hooks

# Functions installed through setprofile() / settrace(); a false value
# (the default None) means that no hook is registered.
_profile_hook = None
_trace_hook = None
  70. def setprofile(func):
  71. """Set a profile function for all threads started from the threading module.
  72. The func will be passed to sys.setprofile() for each thread, before its
  73. run() method is called.
  74. """
  75. global _profile_hook
  76. _profile_hook = func
  77. def settrace(func):
  78. """Set a trace function for all threads started from the threading module.
  79. The func will be passed to sys.settrace() for each thread, before its run()
  80. method is called.
  81. """
  82. global _trace_hook
  83. _trace_hook = func
  84. # Synchronization classes
  85. Lock = _allocate_lock
  86. def RLock(*args, **kwargs):
  87. """Factory function that returns a new reentrant lock.
  88. A reentrant lock must be released by the thread that acquired it. Once a
  89. thread has acquired a reentrant lock, the same thread may acquire it again
  90. without blocking; the thread must release it once for each time it has
  91. acquired it.
  92. """
  93. return _RLock(*args, **kwargs)
  94. class _RLock(_Verbose):
  95. """A reentrant lock must be released by the thread that acquired it. Once a
  96. thread has acquired a reentrant lock, the same thread may acquire it
  97. again without blocking; the thread must release it once for each time it
  98. has acquired it.
  99. """
  100. def __init__(self, verbose=None):
  101. _Verbose.__init__(self, verbose)
  102. self.__block = _allocate_lock()
  103. self.__owner = None
  104. self.__count = 0
  105. def __repr__(self):
  106. owner = self.__owner
  107. try:
  108. owner = _active[owner].name
  109. except KeyError:
  110. pass
  111. return "<%s owner=%r count=%d>" % (
  112. self.__class__.__name__, owner, self.__count)
  113. def acquire(self, blocking=1):
  114. """Acquire a lock, blocking or non-blocking.
  115. When invoked without arguments: if this thread already owns the lock,
  116. increment the recursion level by one, and return immediately. Otherwise,
  117. if another thread owns the lock, block until the lock is unlocked. Once
  118. the lock is unlocked (not owned by any thread), then grab ownership, set
  119. the recursion level to one, and return. If more than one thread is
  120. blocked waiting until the lock is unlocked, only one at a time will be
  121. able to grab ownership of the lock. There is no return value in this
  122. case.
  123. When invoked with the blocking argument set to true, do the same thing
  124. as when called without arguments, and return true.
  125. When invoked with the blocking argument set to false, do not block. If a
  126. call without an argument would block, return false immediately;
  127. otherwise, do the same thing as when called without arguments, and
  128. return true.
  129. """
  130. me = _get_ident()
  131. if self.__owner == me:
  132. self.__count = self.__count + 1
  133. if __debug__:
  134. self._note("%s.acquire(%s): recursive success", self, blocking)
  135. return 1
  136. rc = self.__block.acquire(blocking)
  137. if rc:
  138. self.__owner = me
  139. self.__count = 1
  140. if __debug__:
  141. self._note("%s.acquire(%s): initial success", self, blocking)
  142. else:
  143. if __debug__:
  144. self._note("%s.acquire(%s): failure", self, blocking)
  145. return rc
  146. __enter__ = acquire
  147. def release(self):
  148. """Release a lock, decrementing the recursion level.
  149. If after the decrement it is zero, reset the lock to unlocked (not owned
  150. by any thread), and if any other threads are blocked waiting for the
  151. lock to become unlocked, allow exactly one of them to proceed. If after
  152. the decrement the recursion level is still nonzero, the lock remains
  153. locked and owned by the calling thread.
  154. Only call this method when the calling thread owns the lock. A
  155. RuntimeError is raised if this method is called when the lock is
  156. unlocked.
  157. There is no return value.
  158. """
  159. if self.__owner != _get_ident():
  160. raise RuntimeError("cannot release un-acquired lock")
  161. self.__count = count = self.__count - 1
  162. if not count:
  163. self.__owner = None
  164. self.__block.release()
  165. if __debug__:
  166. self._note("%s.release(): final release", self)
  167. else:
  168. if __debug__:
  169. self._note("%s.release(): non-final release", self)
  170. def __exit__(self, t, v, tb):
  171. self.release()
  172. # Internal methods used by condition variables
  173. def _acquire_restore(self, count_owner):
  174. count, owner = count_owner
  175. self.__block.acquire()
  176. self.__count = count
  177. self.__owner = owner
  178. if __debug__:
  179. self._note("%s._acquire_restore()", self)
  180. def _release_save(self):
  181. if __debug__:
  182. self._note("%s._release_save()", self)
  183. count = self.__count
  184. self.__count = 0
  185. owner = self.__owner
  186. self.__owner = None
  187. self.__block.release()
  188. return (count, owner)
  189. def _is_owned(self):
  190. return self.__owner == _get_ident()
  191. def Condition(*args, **kwargs):
  192. """Factory function that returns a new condition variable object.
  193. A condition variable allows one or more threads to wait until they are
  194. notified by another thread.
  195. If the lock argument is given and not None, it must be a Lock or RLock
  196. object, and it is used as the underlying lock. Otherwise, a new RLock object
  197. is created and used as the underlying lock.
  198. """
  199. return _Condition(*args, **kwargs)
  200. class _Condition(_Verbose):
  201. """Condition variables allow one or more threads to wait until they are
  202. notified by another thread.
  203. """
  204. def __init__(self, lock=None, verbose=None):
  205. _Verbose.__init__(self, verbose)
  206. if lock is None:
  207. lock = RLock()
  208. self.__lock = lock
  209. # Export the lock's acquire() and release() methods
  210. self.acquire = lock.acquire
  211. self.release = lock.release
  212. # If the lock defines _release_save() and/or _acquire_restore(),
  213. # these override the default implementations (which just call
  214. # release() and acquire() on the lock). Ditto for _is_owned().
  215. try:
  216. self._release_save = lock._release_save
  217. except AttributeError:
  218. pass
  219. try:
  220. self._acquire_restore = lock._acquire_restore
  221. except AttributeError:
  222. pass
  223. try:
  224. self._is_owned = lock._is_owned
  225. except AttributeError:
  226. pass
  227. self.__waiters = []
  228. def __enter__(self):
  229. return self.__lock.__enter__()
  230. def __exit__(self, *args):
  231. return self.__lock.__exit__(*args)
  232. def __repr__(self):
  233. return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))
  234. def _release_save(self):
  235. self.__lock.release() # No state to save
  236. def _acquire_restore(self, x):
  237. self.__lock.acquire() # Ignore saved state
  238. def _is_owned(self):
  239. # Return True if lock is owned by current_thread.
  240. # This method is called only if __lock doesn't have _is_owned().
  241. if self.__lock.acquire(0):
  242. self.__lock.release()
  243. return False
  244. else:
  245. return True
  246. def wait(self, timeout=None):
  247. """Wait until notified or until a timeout occurs.
  248. If the calling thread has not acquired the lock when this method is
  249. called, a RuntimeError is raised.
  250. This method releases the underlying lock, and then blocks until it is
  251. awakened by a notify() or notifyAll() call for the same condition
  252. variable in another thread, or until the optional timeout occurs. Once
  253. awakened or timed out, it re-acquires the lock and returns.
  254. When the timeout argument is present and not None, it should be a
  255. floating point number specifying a timeout for the operation in seconds
  256. (or fractions thereof).
  257. When the underlying lock is an RLock, it is not released using its
  258. release() method, since this may not actually unlock the lock when it
  259. was acquired multiple times recursively. Instead, an internal interface
  260. of the RLock class is used, which really unlocks it even when it has
  261. been recursively acquired several times. Another internal interface is
  262. then used to restore the recursion level when the lock is reacquired.
  263. """
  264. if not self._is_owned():
  265. raise RuntimeError("cannot wait on un-acquired lock")
  266. waiter = _allocate_lock()
  267. waiter.acquire()
  268. self.__waiters.append(waiter)
  269. saved_state = self._release_save()
  270. try: # restore state no matter what (e.g., KeyboardInterrupt)
  271. if timeout is None:
  272. waiter.acquire()
  273. if __debug__:
  274. self._note("%s.wait(): got it", self)
  275. else:
  276. # Balancing act: We can't afford a pure busy loop, so we
  277. # have to sleep; but if we sleep the whole timeout time,
  278. # we'll be unresponsive. The scheme here sleeps very
  279. # little at first, longer as time goes on, but never longer
  280. # than 20 times per second (or the timeout time remaining).
  281. endtime = _time() + timeout
  282. delay = 0.0005 # 500 us -> initial delay of 1 ms
  283. while True:
  284. gotit = waiter.acquire(0)
  285. if gotit:
  286. break
  287. remaining = endtime - _time()
  288. if remaining <= 0:
  289. break
  290. delay = min(delay * 2, remaining, .05)
  291. _sleep(delay)
  292. if not gotit:
  293. if __debug__:
  294. self._note("%s.wait(%s): timed out", self, timeout)
  295. try:
  296. self.__waiters.remove(waiter)
  297. except ValueError:
  298. pass
  299. else:
  300. if __debug__:
  301. self._note("%s.wait(%s): got it", self, timeout)
  302. finally:
  303. self._acquire_restore(saved_state)
  304. def notify(self, n=1):
  305. """Wake up one or more threads waiting on this condition, if any.
  306. If the calling thread has not acquired the lock when this method is
  307. called, a RuntimeError is raised.
  308. This method wakes up at most n of the threads waiting for the condition
  309. variable; it is a no-op if no threads are waiting.
  310. """
  311. if not self._is_owned():
  312. raise RuntimeError("cannot notify on un-acquired lock")
  313. __waiters = self.__waiters
  314. waiters = __waiters[:n]
  315. if not waiters:
  316. if __debug__:
  317. self._note("%s.notify(): no waiters", self)
  318. return
  319. self._note("%s.notify(): notifying %d waiter%s", self, n,
  320. n!=1 and "s" or "")
  321. for waiter in waiters:
  322. waiter.release()
  323. try:
  324. __waiters.remove(waiter)
  325. except ValueError:
  326. pass
  327. def notifyAll(self):
  328. """Wake up all threads waiting on this condition.
  329. If the calling thread has not acquired the lock when this method
  330. is called, a RuntimeError is raised.
  331. """
  332. self.notify(len(self.__waiters))
  333. notify_all = notifyAll
  334. def Semaphore(*args, **kwargs):
  335. """A factory function that returns a new semaphore.
  336. Semaphores manage a counter representing the number of release() calls minus
  337. the number of acquire() calls, plus an initial value. The acquire() method
  338. blocks if necessary until it can return without making the counter
  339. negative. If not given, value defaults to 1.
  340. """
  341. return _Semaphore(*args, **kwargs)
  342. class _Semaphore(_Verbose):
  343. """Semaphores manage a counter representing the number of release() calls
  344. minus the number of acquire() calls, plus an initial value. The acquire()
  345. method blocks if necessary until it can return without making the counter
  346. negative. If not given, value defaults to 1.
  347. """
  348. # After Tim Peters' semaphore class, but not quite the same (no maximum)
  349. def __init__(self, value=1, verbose=None):
  350. if value < 0:
  351. raise ValueError("semaphore initial value must be >= 0")
  352. _Verbose.__init__(self, verbose)
  353. self.__cond = Condition(Lock())
  354. self.__value = value
  355. def acquire(self, blocking=1):
  356. """Acquire a semaphore, decrementing the internal counter by one.
  357. When invoked without arguments: if the internal counter is larger than
  358. zero on entry, decrement it by one and return immediately. If it is zero
  359. on entry, block, waiting until some other thread has called release() to
  360. make it larger than zero. This is done with proper interlocking so that
  361. if multiple acquire() calls are blocked, release() will wake exactly one
  362. of them up. The implementation may pick one at random, so the order in
  363. which blocked threads are awakened should not be relied on. There is no
  364. return value in this case.
  365. When invoked with blocking set to true, do the same thing as when called
  366. without arguments, and return true.
  367. When invoked with blocking set to false, do not block. If a call without
  368. an argument would block, return false immediately; otherwise, do the
  369. same thing as when called without arguments, and return true.
  370. """
  371. rc = False
  372. with self.__cond:
  373. while self.__value == 0:
  374. if not blocking:
  375. break
  376. if __debug__:
  377. self._note("%s.acquire(%s): blocked waiting, value=%s",
  378. self, blocking, self.__value)
  379. self.__cond.wait()
  380. else:
  381. self.__value = self.__value - 1
  382. if __debug__:
  383. self._note("%s.acquire: success, value=%s",
  384. self, self.__value)
  385. rc = True
  386. return rc
  387. __enter__ = acquire
  388. def release(self):
  389. """Release a semaphore, incrementing the internal counter by one.
  390. When the counter is zero on entry and another thread is waiting for it
  391. to become larger than zero again, wake up that thread.
  392. """
  393. with self.__cond:
  394. self.__value = self.__value + 1
  395. if __debug__:
  396. self._note("%s.release: success, value=%s",
  397. self, self.__value)
  398. self.__cond.notify()
  399. def __exit__(self, t, v, tb):
  400. self.release()
  401. def BoundedSemaphore(*args, **kwargs):
  402. """A factory function that returns a new bounded semaphore.
  403. A bounded semaphore checks to make sure its current value doesn't exceed its
  404. initial value. If it does, ValueError is raised. In most situations
  405. semaphores are used to guard resources with limited capacity.
  406. If the semaphore is released too many times it's a sign of a bug. If not
  407. given, value defaults to 1.
  408. Like regular semaphores, bounded semaphores manage a counter representing
  409. the number of release() calls minus the number of acquire() calls, plus an
  410. initial value. The acquire() method blocks if necessary until it can return
  411. without making the counter negative. If not given, value defaults to 1.
  412. """
  413. return _BoundedSemaphore(*args, **kwargs)
  414. class _BoundedSemaphore(_Semaphore):
  415. """A bounded semaphore checks to make sure its current value doesn't exceed
  416. its initial value. If it does, ValueError is raised. In most situations
  417. semaphores are used to guard resources with limited capacity.
  418. """
  419. def __init__(self, value=1, verbose=None):
  420. _Semaphore.__init__(self, value, verbose)
  421. self._initial_value = value
  422. def release(self):
  423. """Release a semaphore, incrementing the internal counter by one.
  424. When the counter is zero on entry and another thread is waiting for it
  425. to become larger than zero again, wake up that thread.
  426. If the number of releases exceeds the number of acquires,
  427. raise a ValueError.
  428. """
  429. with self._Semaphore__cond:
  430. if self._Semaphore__value >= self._initial_value:
  431. raise ValueError("Semaphore released too many times")
  432. self._Semaphore__value += 1
  433. self._Semaphore__cond.notify()
  434. def Event(*args, **kwargs):
  435. """A factory function that returns a new event.
  436. Events manage a flag that can be set to true with the set() method and reset
  437. to false with the clear() method. The wait() method blocks until the flag is
  438. true.
  439. """
  440. return _Event(*args, **kwargs)
  441. class _Event(_Verbose):
  442. """A factory function that returns a new event object. An event manages a
  443. flag that can be set to true with the set() method and reset to false
  444. with the clear() method. The wait() method blocks until the flag is true.
  445. """
  446. # After Tim Peters' event class (without is_posted())
  447. def __init__(self, verbose=None):
  448. _Verbose.__init__(self, verbose)
  449. self.__cond = Condition(Lock())
  450. self.__flag = False
  451. def _reset_internal_locks(self):
  452. # private! called by Thread._reset_internal_locks by _after_fork()
  453. self.__cond.__init__(Lock())
  454. def isSet(self):
  455. 'Return true if and only if the internal flag is true.'
  456. return self.__flag
  457. is_set = isSet
  458. def set(self):
  459. """Set the internal flag to true.
  460. All threads waiting for the flag to become true are awakened. Threads
  461. that call wait() once the flag is true will not block at all.
  462. """
  463. with self.__cond:
  464. self.__flag = True
  465. self.__cond.notify_all()
  466. def clear(self):
  467. """Reset the internal flag to false.
  468. Subsequently, threads calling wait() will block until set() is called to
  469. set the internal flag to true again.
  470. """
  471. with self.__cond:
  472. self.__flag = False
  473. def wait(self, timeout=None):
  474. """Block until the internal flag is true.
  475. If the internal flag is true on entry, return immediately. Otherwise,
  476. block until another thread calls set() to set the flag to true, or until
  477. the optional timeout occurs.
  478. When the timeout argument is present and not None, it should be a
  479. floating point number specifying a timeout for the operation in seconds
  480. (or fractions thereof).
  481. This method returns the internal flag on exit, so it will always return
  482. True except if a timeout is given and the operation times out.
  483. """
  484. with self.__cond:
  485. if not self.__flag:
  486. self.__cond.wait(timeout)
  487. return self.__flag
  488. # Helper to generate new thread names
  489. _counter = _count().next
  490. _counter() # Consume 0 so first non-main thread has id 1.
  491. def _newname(template="Thread-%d"):
  492. return template % _counter()
# Active thread administration

# Lock held while _active and _limbo are modified.
_active_limbo_lock = _allocate_lock()
_active = {}    # maps thread id to Thread object
# Threads whose start() has been called but whose bootstrap code has not
# yet moved them into _active; each such Thread object maps to itself.
_limbo = {}
  497. # Main class for threads
  498. class Thread(_Verbose):
  499. """A class that represents a thread of control.
  500. This class can be safely subclassed in a limited fashion.
  501. """
  502. __initialized = False
  503. # Need to store a reference to sys.exc_info for printing
  504. # out exceptions when a thread tries to use a global var. during interp.
  505. # shutdown and thus raises an exception about trying to perform some
  506. # operation on/with a NoneType
  507. __exc_info = _sys.exc_info
  508. # Keep sys.exc_clear too to clear the exception just before
  509. # allowing .join() to return.
  510. __exc_clear = _sys.exc_clear
  511. def __init__(self, group=None, target=None, name=None,
  512. args=(), kwargs=None, verbose=None):
  513. """This constructor should always be called with keyword arguments. Arguments are:
  514. *group* should be None; reserved for future extension when a ThreadGroup
  515. class is implemented.
  516. *target* is the callable object to be invoked by the run()
  517. method. Defaults to None, meaning nothing is called.
  518. *name* is the thread name. By default, a unique name is constructed of
  519. the form "Thread-N" where N is a small decimal number.
  520. *args* is the argument tuple for the target invocation. Defaults to ().
  521. *kwargs* is a dictionary of keyword arguments for the target
  522. invocation. Defaults to {}.
  523. If a subclass overrides the constructor, it must make sure to invoke
  524. the base class constructor (Thread.__init__()) before doing anything
  525. else to the thread.
  526. """
  527. assert group is None, "group argument must be None for now"
  528. _Verbose.__init__(self, verbose)
  529. if kwargs is None:
  530. kwargs = {}
  531. self.__target = target
  532. self.__name = str(name or _newname())
  533. self.__args = args
  534. self.__kwargs = kwargs
  535. self.__daemonic = self._set_daemon()
  536. self.__ident = None
  537. self.__started = Event()
  538. self.__stopped = False
  539. self.__block = Condition(Lock())
  540. self.__initialized = True
  541. # sys.stderr is not stored in the class like
  542. # sys.exc_info since it can be changed between instances
  543. self.__stderr = _sys.stderr
  544. def _reset_internal_locks(self):
  545. # private! Called by _after_fork() to reset our internal locks as
  546. # they may be in an invalid state leading to a deadlock or crash.
  547. if hasattr(self, '_Thread__block'): # DummyThread deletes self.__block
  548. self.__block.__init__()
  549. self.__started._reset_internal_locks()
  550. @property
  551. def _block(self):
  552. # used by a unittest
  553. return self.__block
  554. def _set_daemon(self):
  555. # Overridden in _MainThread and _DummyThread
  556. return current_thread().daemon
  557. def __repr__(self):
  558. assert self.__initialized, "Thread.__init__() was not called"
  559. status = "initial"
  560. if self.__started.is_set():
  561. status = "started"
  562. if self.__stopped:
  563. status = "stopped"
  564. if self.__daemonic:
  565. status += " daemon"
  566. if self.__ident is not None:
  567. status += " %s" % self.__ident
  568. return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)
  569. def start(self):
  570. """Start the thread's activity.
  571. It must be called at most once per thread object. It arranges for the
  572. object's run() method to be invoked in a separate thread of control.
  573. This method will raise a RuntimeError if called more than once on the
  574. same thread object.
  575. """
  576. if not self.__initialized:
  577. raise RuntimeError("thread.__init__() not called")
  578. if self.__started.is_set():
  579. raise RuntimeError("threads can only be started once")
  580. if __debug__:
  581. self._note("%s.start(): starting thread", self)
  582. with _active_limbo_lock:
  583. _limbo[self] = self
  584. try:
  585. _start_new_thread(self.__bootstrap, ())
  586. except Exception:
  587. with _active_limbo_lock:
  588. del _limbo[self]
  589. raise
  590. self.__started.wait()
    def run(self):
        """Method representing the thread's activity.

        You may override this method in a subclass. The standard run() method
        invokes the callable object passed to the object's constructor as the
        target argument, if any, with sequential and keyword arguments taken
        from the args and kwargs arguments, respectively.

        The stored target, args and kwargs are deleted from the thread
        object afterwards, whether or not the target raised.

        """
        try:
            # A false target (the default None) means nothing is called.
            if self.__target:
                self.__target(*self.__args, **self.__kwargs)
        finally:
            # Avoid a refcycle if the thread is running a function with
            # an argument that has a member that points to the thread.
            del self.__target, self.__args, self.__kwargs
  605. def __bootstrap(self):
  606. # Wrapper around the real bootstrap code that ignores
  607. # exceptions during interpreter cleanup. Those typically
  608. # happen when a daemon thread wakes up at an unfortunate
  609. # moment, finds the world around it destroyed, and raises some
  610. # random exception *** while trying to report the exception in
  611. # __bootstrap_inner() below ***. Those random exceptions
  612. # don't help anybody, and they confuse users, so we suppress
  613. # them. We suppress them only when it appears that the world
  614. # indeed has already been destroyed, so that exceptions in
  615. # __bootstrap_inner() during normal business hours are properly
  616. # reported. Also, we only suppress them for daemonic threads;
  617. # if a non-daemonic encounters this, something else is wrong.
  618. try:
  619. self.__bootstrap_inner()
  620. except:
  621. if self.__daemonic and _sys is None:
  622. return
  623. raise
  624. def _set_ident(self):
  625. self.__ident = _get_ident()
  def __bootstrap_inner(self):
      # Real thread body, called by __bootstrap(): record the ident, mark
      # the thread started, move it from _limbo to _active, install the
      # trace/profile hooks if any, call run(), report an unhandled
      # exception, and finally mark the thread stopped and drop it from
      # _active.
      try:
          self._set_ident()
          self.__started.set()
          with _active_limbo_lock:
              # Both dict updates happen under the same lock acquisition.
              _active[self.__ident] = self
              del _limbo[self]
          if __debug__:
              self._note("%s.__bootstrap(): thread started", self)

          # Install the module-level trace/profile hooks, when set, before
          # run() is called.
          if _trace_hook:
              self._note("%s.__bootstrap(): registering trace hook", self)
              _sys.settrace(_trace_hook)
          if _profile_hook:
              self._note("%s.__bootstrap(): registering profile hook", self)
              _sys.setprofile(_profile_hook)

          try:
              self.run()
          except SystemExit:
              # SystemExit raised by run() is only noted, never reported
              # as an error.
              if __debug__:
                  self._note("%s.__bootstrap(): raised SystemExit", self)
          except:
              if __debug__:
                  self._note("%s.__bootstrap(): unhandled exception", self)
              # If sys.stderr is no more (most likely from interpreter
              # shutdown) use self.__stderr.  Otherwise still use sys (as in
              # _sys) in case sys.stderr was redefined since the creation of
              # self.
              if _sys and _sys.stderr is not None:
                  print>>_sys.stderr, ("Exception in thread %s:\n%s" %
                                       (self.name, _format_exc()))
              elif self.__stderr is not None:
                  # Do the best job possible w/o a huge amt. of code to
                  # approximate a traceback (code ideas from
                  # Lib/traceback.py)
                  exc_type, exc_value, exc_tb = self.__exc_info()
                  try:
                      print>>self.__stderr, (
                          "Exception in thread " + self.name +
                          " (most likely raised during interpreter shutdown):")
                      print>>self.__stderr, (
                          "Traceback (most recent call last):")
                      # Walk the traceback chain, printing one line per
                      # frame.
                      # NOTE(review): standard tracebacks indent "File" by
                      # two spaces; the single space in the literal below
                      # may be a whitespace-collapsing artifact -- confirm.
                      while exc_tb:
                          print>>self.__stderr, (
                              ' File "%s", line %s, in %s' %
                              (exc_tb.tb_frame.f_code.co_filename,
                               exc_tb.tb_lineno,
                               exc_tb.tb_frame.f_code.co_name))
                          exc_tb = exc_tb.tb_next
                      print>>self.__stderr, ("%s: %s" % (exc_type, exc_value))
                  # Make sure that exc_tb gets deleted since it is a memory
                  # hog; deleting everything else is just for thoroughness
                  finally:
                      del exc_type, exc_value, exc_tb
          else:
              if __debug__:
                  self._note("%s.__bootstrap(): normal return", self)
          finally:
              # Prevent a race in
              # test_threading.test_no_refcycle_through_target when
              # the exception keeps the target alive past when we
              # assert that it's dead.
              self.__exc_clear()
      finally:
          with _active_limbo_lock:
              self.__stop()
              try:
                  # We don't call self.__delete() because it also
                  # grabs _active_limbo_lock.
                  del _active[_get_ident()]
              except:
                  # Best effort: any failure to remove the entry is ignored.
                  pass
  def __stop(self):
      """Mark the thread as stopped and wake every waiter on its condition.

      Does nothing when the instance has no ``_Thread__block`` attribute:
      DummyThreads delete self.__block, but they have no waiters to notify
      anyway (join() is forbidden on them).
      """
      if not hasattr(self, '_Thread__block'):
          return
      # Hold the condition through a with statement so that it is released
      # even if notify_all() raises; a bare acquire()/release() pair would
      # leave it locked and block every later join() on this thread.
      with self.__block:
          self.__stopped = True
          self.__block.notify_all()
  def __delete(self):
      "Remove current thread from the dict of currently running threads."

      # Notes about running with dummy_thread:
      #
      # Must take care to not raise an exception if dummy_thread is being
      # used (and thus this module is being used as an instance of
      # dummy_threading).  dummy_thread.get_ident() always returns -1 since
      # there is only one thread if dummy_thread is being used.  Thus
      # len(_active) is always <= 1 here, and any Thread instance created
      # overwrites the (if any) thread currently registered in _active.
      #
      # An instance of _MainThread is always created by 'threading'.  This
      # gets overwritten the instant an instance of Thread is created; both
      # threads return -1 from dummy_thread.get_ident() and thus have the
      # same key in the dict.  So when the _MainThread instance created by
      # 'threading' tries to clean itself up when atexit calls this method
      # it gets a KeyError if another Thread instance was created.
      #
      # This all means that KeyError from trying to delete something from
      # _active if dummy_threading is being used is a red herring.  But
      # since it isn't if dummy_threading is *not* being used then don't
      # hide the exception.

      try:
          with _active_limbo_lock:
              # The entry removed is the one keyed by the *calling* thread's
              # ident (_get_ident()), not by an ident stored on self.
              del _active[_get_ident()]
              # There must not be any python code between the previous line
              # and after the lock is released.  Otherwise a tracing function
              # could try to acquire the lock again in the same thread, (in
              # current_thread()), and would block.
      except KeyError:
          # Swallow the KeyError only when dummy_threading has been
          # imported; otherwise it is a genuine error and is re-raised.
          if 'dummy_threading' not in _sys.modules:
              raise
  def join(self, timeout=None):
      """Block the caller until this thread has terminated.

      The wait ends when the thread finishes -- normally or through an
      unhandled exception -- or when the optional timeout expires.

      timeout, when given and not None, is a floating point number of
      seconds (or fractions thereof) to wait at most.  join() always
      returns None, so call isAlive() afterwards to find out whether the
      wait timed out: if the thread is still alive, it did.

      With timeout omitted or None the call blocks until the thread
      terminates.

      A thread may be join()ed any number of times.

      RuntimeError is raised when Thread.__init__() was not called, when
      the thread has not been started yet, or when a thread tries to join
      itself (which would deadlock).
      """
      if not self.__initialized:
          raise RuntimeError("Thread.__init__() not called")
      if not self.__started.is_set():
          raise RuntimeError("cannot join thread before it is started")
      if self is current_thread():
          raise RuntimeError("cannot join current thread")

      if __debug__:
          if not self.__stopped:
              self._note("%s.join(): waiting until thread stops", self)

      with self.__block:
          if timeout is None:
              # Unbounded wait: sleep on the condition until stopped.
              while not self.__stopped:
                  self.__block.wait()
              if __debug__:
                  self._note("%s.join(): thread stopped", self)
              return

          # Bounded wait: keep waiting for whatever time remains until the
          # deadline, re-checking the stopped flag after every wake-up.
          deadline = _time() + timeout
          while not self.__stopped:
              remaining = deadline - _time()
              if remaining <= 0:
                  if __debug__:
                      self._note("%s.join(): timed out", self)
                  return
              self.__block.wait(remaining)
          if __debug__:
              self._note("%s.join(): thread stopped", self)
  @property
  def name(self):
      """String label of the thread, for identification purposes only.

      The name carries no semantics and several threads may share one.
      The constructor supplies the initial value.
      """
      assert self.__initialized, "Thread.__init__() not called"
      current_name = self.__name
      return current_name
  @name.setter
  def name(self, name):
      # The given value is converted with str() before it is stored.
      assert self.__initialized, "Thread.__init__() not called"
      as_text = str(name)
      self.__name = as_text
  @property
  def ident(self):
      """Identifier of this thread, or None if it has not been started.

      The value is a nonzero integer; see the thread.get_ident() function.
      Identifiers may be recycled once a thread has exited and another one
      is created.  The identifier stays available after the thread exits.
      """
      assert self.__initialized, "Thread.__init__() not called"
      thread_id = self.__ident
      return thread_id
  def isAlive(self):
      """Tell whether the thread is alive.

      The result is True from just before run() starts until just after
      run() terminates.  The module function enumerate() returns a list of
      all alive threads.
      """
      assert self.__initialized, "Thread.__init__() not called"
      started = self.__started.is_set()
      return started and not self.__stopped

  is_alive = isAlive
  @property
  def daemon(self):
      """Boolean flag: True for a daemon thread, False otherwise.

      It must be set before start() is called, otherwise RuntimeError is
      raised.  The initial value is inherited from the creating thread; the
      main thread is not a daemon thread, so threads created in the main
      thread default to daemon = False.

      The entire Python program exits when no alive non-daemon threads are
      left.
      """
      assert self.__initialized, "Thread.__init__() not called"
      is_daemonic = self.__daemonic
      return is_daemonic
  @daemon.setter
  def daemon(self, daemonic):
      # Raises RuntimeError if Thread.__init__() was not called or if the
      # thread has already been started.
      if not self.__initialized:
          raise RuntimeError("Thread.__init__() not called")
      already_started = self.__started.is_set()
      if already_started:
          raise RuntimeError("cannot set daemon status of active thread")
      self.__daemonic = daemonic
  def isDaemon(self):
      """Return the value of the daemon property (method-style accessor)."""
      flag = self.daemon
      return flag
  def setDaemon(self, daemonic):
      """Assign `daemonic` to the daemon property (method-style accessor)."""
      self.daemon = daemonic
  def getName(self):
      """Return the value of the name property (method-style accessor)."""
      label = self.name
      return label
  def setName(self, name):
      """Assign `name` to the name property (method-style accessor)."""
      self.name = name
  # The timer class was contributed by Itamar Shtull-Trauring

  def Timer(*args, **kwargs):
      """Build and return a _Timer; all arguments are forwarded unchanged.

      A timer calls a function after a specified number of seconds:

          t = Timer(30.0, f, args=[], kwargs={})
          t.start()
          t.cancel()     # stop the timer's action if it's still waiting

      """
      timer = _Timer(*args, **kwargs)
      return timer
  class _Timer(Thread):
      """Call a function after a specified number of seconds:

          t = Timer(30.0, f, args=[], kwargs={})
          t.start()
          t.cancel()     # stop the timer's action if it's still waiting

      """

      def __init__(self, interval, function, args=None, kwargs=None):
          """Set up the timer without starting it.

          interval -- value passed to the wait in run() before the call.
          function -- callable to invoke once the wait is over.
          args     -- positional arguments for function; a new empty list
                      when omitted or None.
          kwargs   -- keyword arguments for function; a new empty dict
                      when omitted or None.
          """
          Thread.__init__(self)
          self.interval = interval
          self.function = function
          # None sentinels instead of [] / {} defaults: a mutable default is
          # created once and would be shared by every timer constructed
          # without explicit arguments, so changing t.args on one timer
          # would silently change all the others.
          self.args = args if args is not None else []
          self.kwargs = kwargs if kwargs is not None else {}
          self.finished = Event()

      def cancel(self):
          """Stop the timer if it hasn't finished yet"""
          self.finished.set()

      def run(self):
          """Wait up to `interval` on the finished event, then call function.

          The function is skipped when the event is already set after the
          wait (that is, cancel() was called).  The event is set at the end
          in either case.
          """
          self.finished.wait(self.interval)
          if not self.finished.is_set():
              self.function(*self.args, **self.kwargs)
          self.finished.set()
  # Special thread class to represent the main thread
  # This is garbage collected through an exit handler

  class _MainThread(Thread):
      """Thread object standing in for the main thread."""

      def __init__(self):
          Thread.__init__(self, name="MainThread")
          # Nobody calls start() on this object, so flag it as started and
          # register it in _active by hand.
          self._Thread__started.set()
          self._set_ident()
          with _active_limbo_lock:
              _active[_get_ident()] = self

      def _set_daemon(self):
          # This thread is never daemonic.
          return False

      def _exitfunc(self):
          """Mark this thread stopped, join the remaining non-daemon
          threads one at a time, then remove this thread from _active."""
          self._Thread__stop()
          straggler = _pickSomeNonDaemonThread()
          if __debug__:
              if straggler:
                  self._note("%s: waiting for other threads", self)
          while straggler:
              straggler.join()
              straggler = _pickSomeNonDaemonThread()
          if __debug__:
              self._note("%s: exiting", self)
          self._Thread__delete()
  def _pickSomeNonDaemonThread():
      """Return the first alive non-daemon thread from enumerate(), or None."""
      for candidate in enumerate():
          if candidate.daemon:
              continue
          if candidate.is_alive():
              return candidate
      return None
  # Dummy thread class to represent threads not started here.
  # These aren't garbage collected when they die, nor can they be waited for.
  # If they invoke anything in threading.py that calls current_thread(), they
  # leave an entry in the _active dict forever after.
  # Their purpose is to return *something* from current_thread().
  # They are marked as daemon threads so we won't wait for them
  # when we exit (conform previous semantics).

  class _DummyThread(Thread):
      """Placeholder Thread object for a thread not created by this module."""

      def __init__(self):
          dummy_name = _newname("Dummy-%d")
          Thread.__init__(self, name=dummy_name)

          # Thread.__block consumes an OS-level locking primitive, which
          # can never be used by a _DummyThread.  Since a _DummyThread
          # instance is immortal, that's bad, so release this resource.
          del self._Thread__block

          # Flag the object as started and register it under the calling
          # thread's ident.
          self._Thread__started.set()
          self._set_ident()
          with _active_limbo_lock:
              _active[_get_ident()] = self

      def _set_daemon(self):
          # Dummy threads are always daemonic.
          return True

      def join(self, timeout=None):
          assert False, "cannot join a dummy thread"
  # Global API functions

  def currentThread():
      """Return the Thread object for the caller's thread of control.

      When the calling thread has no entry in _active (it was not created
      through the threading module), a new dummy thread object with limited
      functionality is created and returned instead.
      """
      ident = _get_ident()
      try:
          thread = _active[ident]
      except KeyError:
          thread = _DummyThread()
      return thread

  current_thread = currentThread
  def activeCount():
      """Return how many Thread objects are currently alive.

      The count equals the length of the list returned by enumerate().
      """
      with _active_limbo_lock:
          running = len(_active)
          starting = len(_limbo)
          return running + starting

  active_count = activeCount
  def _enumerate():
      # Lock-free twin of enumerate(); for internal use only.
      threads = []
      threads.extend(_active.values())
      threads.extend(_limbo.values())
      return threads
  def enumerate():
      """Return a list of every Thread object currently alive.

      Daemonic threads, dummy thread objects created by current_thread(),
      and the main thread are included.  Terminated threads and threads
      that have not yet been started are not.
      """
      with _active_limbo_lock:
          threads = []
          threads.extend(_active.values())
          threads.extend(_limbo.values())
          return threads
  954. from thread import stack_size
  # Create the main thread object (this runs at import time),
  # and make its _exitfunc bound method available for the interpreter
  # (Py_Main) as threading._shutdown.

  _shutdown = _MainThread()._exitfunc
  959. # get thread-local implementation, either from the thread
  960. # module, or from the python fallback
  961. try:
  962. from thread import _local as local
  963. except ImportError:
  964. from _threading_local import local
  def _after_fork():
      # This function is called by Python/ceval.c:PyEval_ReInitThreads which
      # is called from PyOS_AfterFork.  Here we cleanup threading module state
      # that should not exist after a fork.

      # Reset _active_limbo_lock, in case we forked while the lock was held
      # by another (non-forked) thread.  http://bugs.python.org/issue874900
      global _active_limbo_lock
      _active_limbo_lock = _allocate_lock()

      # fork() only copied the current thread; clear references to others.
      new_active = {}
      current = current_thread()
      with _active_limbo_lock:
          # _enumerate() is the lock-free variant; the lock is already held.
          for thread in _enumerate():
              # Any lock/condition variable may be currently locked or in an
              # invalid state, so we reinitialize them.
              if hasattr(thread, '_reset_internal_locks'):
                  thread._reset_internal_locks()
              if thread is current:
                  # There is only one active thread. We reset the ident to
                  # its new value since it can have changed.
                  ident = _get_ident()
                  thread._Thread__ident = ident
                  new_active[ident] = thread
              else:
                  # All the others are already stopped.
                  thread._Thread__stop()

          # Replace the registry contents in place, so the same _limbo and
          # _active dict objects keep being used.
          _limbo.clear()
          _active.clear()
          _active.update(new_active)
          assert len(_active) == 1
  # Self-test code

  def _test():
      """Self-test: several producers and one consumer share a bounded queue.

      NP producer threads each put NI items into a queue limited to QL
      entries; one consumer thread takes and prints all NI*NP items.  The
      function returns after every thread has been joined.
      """

      class BoundedQueue(_Verbose):
          """FIFO queue holding at most `limit` items.

          One RLock (`mon`) guards the deque.  Two conditions built on that
          lock are used: `rc` is notified after an item is added and waited
          on by get(); `wc` is notified after an item is removed and waited
          on by put().
          """

          def __init__(self, limit):
              _Verbose.__init__(self)
              self.mon = RLock()
              self.rc = Condition(self.mon)
              self.wc = Condition(self.mon)
              self.limit = limit
              self.queue = _deque()

          def put(self, item):
              """Append item, waiting while the queue is at its limit."""
              # The with statement releases the monitor even if _note() or
              # wait() raises; a bare acquire()/release() pair would leave
              # it locked and hang every other producer and the consumer.
              with self.mon:
                  while len(self.queue) >= self.limit:
                      self._note("put(%s): queue full", item)
                      self.wc.wait()
                  self.queue.append(item)
                  self._note("put(%s): appended, length now %d",
                             item, len(self.queue))
                  self.rc.notify()

          def get(self):
              """Remove and return the oldest item, waiting while empty."""
              with self.mon:
                  while not self.queue:
                      self._note("get(): queue empty")
                      self.rc.wait()
                  item = self.queue.popleft()
                  self._note("get(): got %s, %d left", item, len(self.queue))
                  self.wc.notify()
              return item

      class ProducerThread(Thread):
          """Puts `quota` strings of the form "<thread name>.<counter>"."""

          def __init__(self, queue, quota):
              Thread.__init__(self, name="Producer")
              self.queue = queue
              self.quota = quota

          def run(self):
              from random import random
              counter = 0
              while counter < self.quota:
                  counter += 1
                  self.queue.put("%s.%d" % (self.name, counter))
                  # Tiny random pause between puts.
                  _sleep(random() * 0.00001)

      class ConsumerThread(Thread):
          """Takes `count` items from the queue and prints each one."""

          def __init__(self, queue, count):
              Thread.__init__(self, name="Consumer")
              self.queue = queue
              self.count = count

          def run(self):
              while self.count > 0:
                  item = self.queue.get()
                  # Parenthesised single argument: prints exactly what the
                  # print statement prints for the same value.
                  print(item)
                  self.count -= 1

      NP = 3    # number of producer threads
      QL = 4    # queue limit
      NI = 5    # number of items each producer puts

      Q = BoundedQueue(QL)
      P = []
      for i in range(NP):
          t = ProducerThread(Q, NI)
          t.name = ("Producer-%d" % (i+1))
          P.append(t)
      C = ConsumerThread(Q, NI*NP)
      for t in P:
          t.start()
          _sleep(0.000001)
      C.start()
      for t in P:
          t.join()
      C.join()
  # Run the self-test when this file is executed as a script.
  if __name__ == '__main__':
      _test()