12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322 |
- """Thread module emulating a subset of Java's threading model."""
- import sys as _sys
- try:
- import thread
- except ImportError:
- del _sys.modules[__name__]
- raise
- import warnings
- from collections import deque as _deque
- from itertools import count as _count
- from time import time as _time, sleep as _sleep
- from traceback import format_exc as _format_exc
- # Note regarding PEP 8 compliant aliases
- # This threading model was originally inspired by Java, and inherited
- # the convention of camelCase function and method names from that
- # language. While those names are not in any imminent danger of being
- # deprecated, starting with Python 2.6, the module now provides a
- # PEP 8 compliant alias for any such method name.
- # Using the new PEP 8 compliant names also facilitates substitution
- # with the multiprocessing module, which doesn't provide the old
- # Java inspired names.
- # Rename some stuff so "from threading import *" is safe
# Rename some stuff so "from threading import *" is safe
__all__ = [
    'activeCount', 'active_count',
    'Condition',
    'currentThread', 'current_thread',
    'enumerate',
    'Event',
    'Lock', 'RLock',
    'Semaphore', 'BoundedSemaphore',
    'Thread',
    'Timer',
    'setprofile', 'settrace',
    'local', 'stack_size',
]

# Bind the low-level primitives to private names, then delete the module
# reference so that the name "thread" is not left in this namespace.
_start_new_thread = thread.start_new_thread
_allocate_lock = thread.allocate_lock
_get_ident = thread.get_ident
ThreadError = thread.error
del thread

# sys.exc_clear is used to work around the fact that except blocks
# don't fully clear the exception until 3.0.  Silence the deprecation
# warning that this use triggers from within this module.
warnings.filterwarnings(
    'ignore',
    message='sys.exc_clear',
    category=DeprecationWarning,
    module='threading')
- # Debug support (adapted from ihooks.py).
- # All the major classes here derive from _Verbose. We force that to
- # be a new-style class so that all the major classes here are new-style.
- # This helps debugging (type(instance) is more revealing for instances
- # of new-style classes).
_VERBOSE = False

if not __debug__:

    # Disable this when using "python -O"
    class _Verbose(object):
        """Stand-in used under -O: accepts the same calls and does nothing."""

        def __init__(self, verbose=None):
            pass

        def _note(self, *args):
            pass

else:

    class _Verbose(object):
        """Mixin giving subclasses an optional debug trace on stderr."""

        def __init__(self, verbose=None):
            # Fall back to the module-wide default when no flag is given.
            self.__verbose = _VERBOSE if verbose is None else verbose

        def _note(self, format, *args):
            """Write ``format % args`` to stderr, prefixed by the thread name.

            Nothing is written unless this instance was created verbose.
            """
            if not self.__verbose:
                return
            message = format % args
            # Issue #4188: calling current_thread() can incur an infinite
            # recursion if it has to create a DummyThread on the fly, so the
            # registry is consulted directly instead.
            tid = _get_ident()
            try:
                who = _active[tid].name
            except KeyError:
                who = "<OS thread %d>" % tid
            _sys.stderr.write("%s: %s\n" % (who, message))
# Support for profile and trace hooks

# Hooks installed in each new thread by Thread.__bootstrap_inner();
# None means "no hook".
_profile_hook = None
_trace_hook = None


def setprofile(func):
    """Register *func* as the profile function for threads started later.

    The function is stored module-wide and handed to sys.setprofile() in
    each thread started through this module, before that thread's run()
    method is called.
    """
    global _profile_hook
    _profile_hook = func
def settrace(func):
    """Register *func* as the trace function for threads started later.

    The function is stored module-wide and handed to sys.settrace() in each
    thread started through this module, before that thread's run() method
    is called.
    """
    global _trace_hook
    _trace_hook = func
# Synchronization classes

# A plain lock is simply the primitive supplied by the thread module.
Lock = _allocate_lock


def RLock(*args, **kwargs):
    """Create and return a new reentrant lock.

    All arguments are passed through to the _RLock constructor.  A reentrant
    lock must be released by the thread that acquired it; the owning thread
    may acquire it again without blocking, and must release it once per
    acquisition.
    """
    lock = _RLock(*args, **kwargs)
    return lock
class _RLock(_Verbose):
    """Reentrant lock layered over a plain, non-reentrant lock.

    The lock records the ident of the owning thread and a recursion count.
    The owner may acquire it repeatedly without blocking and must release it
    once for every acquisition; only the owner may release it.
    """

    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__block = _allocate_lock()   # underlying lock, held while owned
        self.__owner = None               # ident of the owning thread
        self.__count = 0                  # current recursion level

    def __repr__(self):
        holder = self.__owner
        # Show the owner's thread name when it is a registered thread,
        # otherwise fall back to the raw owner value.
        try:
            holder = _active[holder].name
        except KeyError:
            pass
        cls_name = self.__class__.__name__
        return "<%s owner=%r count=%d>" % (cls_name, holder, self.__count)

    def acquire(self, blocking=1):
        """Acquire the lock, optionally without blocking.

        If the calling thread already owns the lock, the recursion level is
        bumped by one and 1 is returned immediately.  Otherwise the
        underlying lock is acquired with the given *blocking* flag; on
        success the caller becomes the owner with a recursion level of one.

        The result of the underlying acquire is returned: true on success,
        false when *blocking* is false and the lock was not available.
        """
        caller = _get_ident()
        if self.__owner == caller:
            self.__count += 1
            if __debug__:
                self._note("%s.acquire(%s): recursive success", self, blocking)
            return 1
        got = self.__block.acquire(blocking)
        if not got:
            if __debug__:
                self._note("%s.acquire(%s): failure", self, blocking)
            return got
        self.__owner = caller
        self.__count = 1
        if __debug__:
            self._note("%s.acquire(%s): initial success", self, blocking)
        return got

    __enter__ = acquire

    def release(self):
        """Undo one acquire() by the owning thread.

        The recursion level is decremented; when it reaches zero ownership
        is cleared and the underlying lock is released so that another
        thread may take it.  While the level is still nonzero the lock stays
        owned by the caller.

        Raises RuntimeError when the calling thread is not the owner.
        There is no return value.
        """
        if self.__owner != _get_ident():
            raise RuntimeError("cannot release un-acquired lock")
        remaining = self.__count - 1
        self.__count = remaining
        if remaining:
            if __debug__:
                self._note("%s.release(): non-final release", self)
            return
        self.__owner = None
        self.__block.release()
        if __debug__:
            self._note("%s.release(): final release", self)

    def __exit__(self, t, v, tb):
        self.release()

    # Internal methods used by condition variables

    def _acquire_restore(self, count_owner):
        # Re-take the underlying lock and reinstate the (count, owner) pair
        # previously returned by _release_save().
        saved_count, saved_owner = count_owner
        self.__block.acquire()
        self.__count = saved_count
        self.__owner = saved_owner
        if __debug__:
            self._note("%s._acquire_restore()", self)

    def _release_save(self):
        # Fully release the lock regardless of recursion depth and hand back
        # the state needed by _acquire_restore().
        if __debug__:
            self._note("%s._release_save()", self)
        saved = (self.__count, self.__owner)
        self.__count = 0
        self.__owner = None
        self.__block.release()
        return saved

    def _is_owned(self):
        # True when the calling thread is the current owner.
        return self.__owner == _get_ident()
def Condition(*args, **kwargs):
    """Create and return a new condition variable.

    All arguments are passed through to the _Condition constructor.  A
    condition variable lets one or more threads wait until another thread
    notifies them.

    When a lock argument is supplied and is not None it must be a Lock or
    RLock object and becomes the underlying lock; otherwise a fresh RLock is
    created for that purpose.
    """
    cond = _Condition(*args, **kwargs)
    return cond
class _Condition(_Verbose):
    """Condition variables allow one or more threads to wait until they are
    notified by another thread.

    Each waiter blocks on its own freshly allocated lock; notify() wakes a
    waiter by releasing that per-waiter lock.
    """

    def __init__(self, lock=None, verbose=None):
        """Create a condition variable.

        *lock* is the underlying lock; when it is None a new RLock is
        created.  *verbose* is forwarded to _Verbose.
        """
        _Verbose.__init__(self, verbose)
        if lock is None:
            lock = RLock()
        self.__lock = lock
        # Export the lock's acquire() and release() methods
        self.acquire = lock.acquire
        self.release = lock.release
        # If the lock defines _release_save() and/or _acquire_restore(),
        # these override the default implementations (which just call
        # release() and acquire() on the lock).  Ditto for _is_owned().
        try:
            self._release_save = lock._release_save
        except AttributeError:
            pass
        try:
            self._acquire_restore = lock._acquire_restore
        except AttributeError:
            pass
        try:
            self._is_owned = lock._is_owned
        except AttributeError:
            pass
        # One lock per thread currently blocked in wait().
        self.__waiters = []

    def __enter__(self):
        return self.__lock.__enter__()

    def __exit__(self, *args):
        return self.__lock.__exit__(*args)

    def __repr__(self):
        return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))

    def _release_save(self):
        self.__lock.release()           # No state to save

    def _acquire_restore(self, x):
        self.__lock.acquire()           # Ignore saved state

    def _is_owned(self):
        # Return True if lock is owned by current_thread.
        # This method is called only if __lock doesn't have _is_owned().
        if self.__lock.acquire(0):
            self.__lock.release()
            return False
        else:
            return True

    def wait(self, timeout=None):
        """Wait until notified or until a timeout occurs.

        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.

        This method releases the underlying lock, and then blocks until it is
        awakened by a notify() or notifyAll() call for the same condition
        variable in another thread, or until the optional timeout occurs.
        Once awakened or timed out, it re-acquires the lock and returns.

        When the timeout argument is present and not None, it should be a
        floating point number specifying a timeout for the operation in
        seconds (or fractions thereof).

        When the underlying lock is an RLock, it is not released using its
        release() method, since this may not actually unlock the lock when it
        was acquired multiple times recursively.  Instead, an internal
        interface of the RLock class is used, which really unlocks it even
        when it has been recursively acquired several times.  Another
        internal interface is then used to restore the recursion level when
        the lock is reacquired.
        """
        if not self._is_owned():
            raise RuntimeError("cannot wait on un-acquired lock")
        # The waiter lock starts out held; a second acquire() therefore
        # blocks until notify() releases it.
        waiter = _allocate_lock()
        waiter.acquire()
        self.__waiters.append(waiter)
        saved_state = self._release_save()
        try:    # restore state no matter what (e.g., KeyboardInterrupt)
            if timeout is None:
                waiter.acquire()
                if __debug__:
                    self._note("%s.wait(): got it", self)
            else:
                # Balancing act:  We can't afford a pure busy loop, so we
                # have to sleep; but if we sleep the whole timeout time,
                # we'll be unresponsive.  The scheme here sleeps very
                # little at first, longer as time goes on, but never longer
                # than 0.05 s per nap (i.e. it polls at least 20 times per
                # second) and never longer than the timeout time remaining.
                endtime = _time() + timeout
                delay = 0.0005 # 500 us -> initial delay of 1 ms
                while True:
                    gotit = waiter.acquire(0)
                    if gotit:
                        break
                    remaining = endtime - _time()
                    if remaining <= 0:
                        break
                    delay = min(delay * 2, remaining, .05)
                    _sleep(delay)
                if not gotit:
                    if __debug__:
                        self._note("%s.wait(%s): timed out", self, timeout)
                    # A concurrent notify() may already have removed us.
                    try:
                        self.__waiters.remove(waiter)
                    except ValueError:
                        pass
                else:
                    if __debug__:
                        self._note("%s.wait(%s): got it", self, timeout)
        finally:
            self._acquire_restore(saved_state)

    def notify(self, n=1):
        """Wake up one or more threads waiting on this condition, if any.

        If the calling thread has not acquired the lock when this method is
        called, a RuntimeError is raised.

        This method wakes up at most n of the threads waiting for the
        condition variable; it is a no-op if no threads are waiting.
        """
        if not self._is_owned():
            raise RuntimeError("cannot notify on un-acquired lock")
        all_waiters = self.__waiters
        waiters = all_waiters[:n]
        if not waiters:
            if __debug__:
                self._note("%s.notify(): no waiters", self)
            return
        if __debug__:
            # Report how many waiters are really being woken, which can be
            # fewer than the n that was requested.
            woken = len(waiters)
            self._note("%s.notify(): notifying %d waiter%s", self, woken,
                       "" if woken == 1 else "s")
        for waiter in waiters:
            waiter.release()
            try:
                all_waiters.remove(waiter)
            except ValueError:
                pass

    def notifyAll(self):
        """Wake up all threads waiting on this condition.

        If the calling thread has not acquired the lock when this method
        is called, a RuntimeError is raised.
        """
        self.notify(len(self.__waiters))

    notify_all = notifyAll
def Semaphore(*args, **kwargs):
    """Create and return a new semaphore.

    All arguments are passed through to the _Semaphore constructor.  A
    semaphore keeps a counter equal to its initial value plus the number of
    release() calls minus the number of acquire() calls; acquire() blocks
    when necessary so the counter never goes negative.  The initial value
    defaults to 1.
    """
    sem = _Semaphore(*args, **kwargs)
    return sem
class _Semaphore(_Verbose):
    """Counting semaphore.

    The internal counter equals the initial value plus the number of
    release() calls minus the number of acquire() calls.  acquire() blocks
    when necessary so that the counter never becomes negative.  The initial
    value defaults to 1.
    """

    # After Tim Peters' semaphore class, but not quite the same (no maximum)

    def __init__(self, value=1, verbose=None):
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__value = value

    def acquire(self, blocking=1):
        """Take one unit from the semaphore.

        If the counter is positive it is decremented and True is returned
        at once.  If it is zero and *blocking* is true (the default), the
        call waits until another thread's release() makes it positive, then
        decrements it and returns True.  If it is zero and *blocking* is
        false, the call returns False immediately without changing the
        counter.
        """
        with self.__cond:
            while self.__value == 0:
                if not blocking:
                    return False
                if __debug__:
                    self._note("%s.acquire(%s): blocked waiting, value=%s",
                               self, blocking, self.__value)
                self.__cond.wait()
            self.__value -= 1
            if __debug__:
                self._note("%s.acquire: success, value=%s",
                           self, self.__value)
            return True

    __enter__ = acquire

    def release(self):
        """Return one unit to the semaphore.

        The counter is incremented and one thread waiting on the internal
        condition, if any, is notified.
        """
        with self.__cond:
            self.__value += 1
            if __debug__:
                self._note("%s.release: success, value=%s",
                           self, self.__value)
            self.__cond.notify()

    def __exit__(self, t, v, tb):
        self.release()
def BoundedSemaphore(*args, **kwargs):
    """Create and return a new bounded semaphore.

    All arguments are passed through to the _BoundedSemaphore constructor.

    A bounded semaphore behaves like a regular semaphore, but additionally
    checks that its value never exceeds the initial value: releasing it too
    many times raises ValueError, which usually indicates a bug.  This makes
    it a good fit for guarding resources with limited capacity.  The initial
    value defaults to 1.
    """
    sem = _BoundedSemaphore(*args, **kwargs)
    return sem
class _BoundedSemaphore(_Semaphore):
    """Semaphore whose counter may never exceed its initial value.

    An attempt to release it beyond that value raises ValueError.
    """

    def __init__(self, value=1, verbose=None):
        _Semaphore.__init__(self, value, verbose)
        # Upper bound checked on every release().
        self._initial_value = value

    def release(self):
        """Return one unit to the semaphore, enforcing the upper bound.

        The counter is incremented and one waiting thread, if any, is
        notified.  Raises ValueError when the counter is already at (or
        above) the initial value, i.e. when there have been more releases
        than acquires.
        """
        cond = self._Semaphore__cond
        with cond:
            if self._Semaphore__value >= self._initial_value:
                raise ValueError("Semaphore released too many times")
            self._Semaphore__value += 1
            cond.notify()
def Event(*args, **kwargs):
    """Create and return a new event object.

    All arguments are passed through to the _Event constructor.  An event
    holds a flag that set() turns on and clear() turns off; wait() blocks
    until the flag is on.
    """
    event = _Event(*args, **kwargs)
    return event
class _Event(_Verbose):
    """Event object: a boolean flag that threads can wait on.

    set() turns the flag on and wakes every waiter, clear() turns it off,
    and wait() blocks until the flag is on.
    """

    # After Tim Peters' event class (without is_posted())

    def __init__(self, verbose=None):
        _Verbose.__init__(self, verbose)
        self.__cond = Condition(Lock())
        self.__flag = False

    def _reset_internal_locks(self):
        # private!  called by Thread._reset_internal_locks by _after_fork()
        self.__cond.__init__(Lock())

    def isSet(self):
        """Return true if and only if the internal flag is true."""
        return self.__flag

    is_set = isSet

    def set(self):
        """Turn the internal flag on and wake every waiting thread.

        Threads that call wait() while the flag is on do not block.
        """
        cond = self.__cond
        cond.acquire()
        try:
            self.__flag = True
            cond.notify_all()
        finally:
            cond.release()

    def clear(self):
        """Turn the internal flag off.

        Threads calling wait() afterwards block until set() is called again.
        """
        cond = self.__cond
        cond.acquire()
        try:
            self.__flag = False
        finally:
            cond.release()

    def wait(self, timeout=None):
        """Block until the internal flag is on, or until *timeout* expires.

        Returns immediately when the flag is already on.  *timeout*, when
        given and not None, is a number of seconds (fractions allowed).

        The value of the flag on exit is returned, so the result is false
        only when a timeout was given and it expired first.
        """
        cond = self.__cond
        cond.acquire()
        try:
            if not self.__flag:
                cond.wait(timeout)
            return self.__flag
        finally:
            cond.release()
# Helper to generate new thread names

# Numbering starts at 1, so the first non-main thread gets id 1.
_counter = _count(1).next


def _newname(template="Thread-%d"):
    """Return *template* filled in with the next number from the counter."""
    serial = _counter()
    return template % serial


# Active thread administration

# Guards both registries below.
_active_limbo_lock = _allocate_lock()
# Maps thread id to Thread object.
_active = {}
# Threads that start() has registered but whose bootstrap code has not yet
# moved them into _active.
_limbo = {}
- # Main class for threads
- class Thread(_Verbose):
- """A class that represents a thread of control.
- This class can be safely subclassed in a limited fashion.
- """
- __initialized = False
- # Need to store a reference to sys.exc_info for printing
- # out exceptions when a thread tries to use a global var. during interp.
- # shutdown and thus raises an exception about trying to perform some
- # operation on/with a NoneType
- __exc_info = _sys.exc_info
- # Keep sys.exc_clear too to clear the exception just before
- # allowing .join() to return.
- __exc_clear = _sys.exc_clear
def __init__(self, group=None, target=None, name=None,
             args=(), kwargs=None, verbose=None):
    """Set up a new, not yet started, thread object.

    Always call this constructor with keyword arguments:

    *group* must be None; it is reserved for a future ThreadGroup class.

    *target* is the callable invoked by run(); None (the default) means
    nothing is called.

    *name* is the thread name; by default a unique name of the form
    "Thread-N" is generated, N being a small decimal number.

    *args* is the argument tuple for the target call, () by default.

    *kwargs* is the keyword-argument dictionary for the target call, {}
    by default.

    A subclass that overrides the constructor must call
    Thread.__init__() before doing anything else to the thread.
    """
    assert group is None, "group argument must be None for now"
    _Verbose.__init__(self, verbose)
    self.__target = target
    self.__name = str(name or _newname())
    self.__args = args
    # A fresh dict per instance; never share a mutable default.
    self.__kwargs = {} if kwargs is None else kwargs
    self.__daemonic = self._set_daemon()
    self.__ident = None
    self.__started = Event()
    self.__stopped = False
    self.__block = Condition(Lock())
    self.__initialized = True
    # sys.stderr is not stored in the class like
    # sys.exc_info since it can be changed between instances
    self.__stderr = _sys.stderr
def _reset_internal_locks(self):
    # private!  Called by _after_fork() to reset our internal locks as
    # they may be in an invalid state leading to a deadlock or crash.
    has_block = hasattr(self, '_Thread__block')
    if has_block:   # DummyThread deletes self.__block
        self.__block.__init__()
    started = self.__started
    started._reset_internal_locks()
@property
def _block(self):
    # Read-only view of the private condition object; used by a unittest.
    block = self.__block
    return block
def _set_daemon(self):
    # Default: inherit the daemon flag of the thread creating this one.
    # Overridden in _MainThread and _DummyThread.
    creator = current_thread()
    return creator.daemon
def __repr__(self):
    # Renders as <ClassName(name, status)>, where status is the life-cycle
    # state optionally followed by "daemon" and the thread ident.
    assert self.__initialized, "Thread.__init__() was not called"
    started = self.__started.is_set()
    if self.__stopped:
        state = "stopped"
    elif started:
        state = "started"
    else:
        state = "initial"
    parts = [state]
    if self.__daemonic:
        parts.append("daemon")
    if self.__ident is not None:
        parts.append("%s" % self.__ident)
    status = " ".join(parts)
    return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)
def start(self):
    """Begin executing run() in a separate thread of control.

    May be called at most once per thread object; a second call raises
    RuntimeError, as does calling it on an object whose Thread.__init__()
    was never run.  The call returns once the new thread has signalled
    that it has started.
    """
    if not self.__initialized:
        raise RuntimeError("thread.__init__() not called")
    if self.__started.is_set():
        raise RuntimeError("threads can only be started once")
    if __debug__:
        self._note("%s.start(): starting thread", self)
    # Park the thread in _limbo until its bootstrap code registers it in
    # _active.
    _active_limbo_lock.acquire()
    try:
        _limbo[self] = self
    finally:
        _active_limbo_lock.release()
    try:
        _start_new_thread(self.__bootstrap, ())
    except Exception:
        # The thread never came to life: undo the registration.
        with _active_limbo_lock:
            del _limbo[self]
        raise
    # Block until the new thread has set its "started" event.
    self.__started.wait()
def run(self):
    """Carry out the thread's activity.

    Subclasses may override this method.  The default implementation calls
    the target passed to the constructor, if any, with the positional and
    keyword arguments taken from the constructor's args and kwargs.
    """
    try:
        if self.__target:
            self.__target(*self.__args, **self.__kwargs)
    finally:
        # Avoid a refcycle if the thread is running a function with
        # an argument that has a member that points to the thread.
        del self.__target
        del self.__args
        del self.__kwargs
def __bootstrap(self):
    # Thin wrapper around __bootstrap_inner() that swallows exceptions
    # raised during interpreter cleanup.
    #
    # A daemon thread can wake up at an unfortunate moment, find the world
    # around it destroyed, and raise some random exception *** while
    # trying to report the exception in __bootstrap_inner() ***.  Such
    # exceptions help nobody and confuse users, so they are suppressed --
    # but only when the world really does appear to be gone (_sys is
    # None), so that failures during normal operation are still reported,
    # and only for daemonic threads; if a non-daemonic thread gets here,
    # something else is wrong.
    try:
        self.__bootstrap_inner()
    except:
        world_destroyed = self.__daemonic and _sys is None
        if not world_destroyed:
            raise
        return
def _set_ident(self):
    # Record the identifier of the thread executing this call.
    ident = _get_ident()
    self.__ident = ident
def __bootstrap_inner(self):
    """Body of a newly started thread.

    Records the thread ident, signals start() through the "started" event,
    moves the thread from _limbo to _active, installs any trace/profile
    hooks, and calls run().  SystemExit raised by run() is ignored; any
    other exception is reported on stderr.  Whatever happens, the thread is
    finally marked stopped and removed from _active.
    """
    try:
        self._set_ident()
        # Wake up the start() call that is waiting for this thread.
        self.__started.set()
        with _active_limbo_lock:
            _active[self.__ident] = self
            del _limbo[self]
        if __debug__:
            self._note("%s.__bootstrap(): thread started", self)

        if _trace_hook:
            self._note("%s.__bootstrap(): registering trace hook", self)
            _sys.settrace(_trace_hook)
        if _profile_hook:
            self._note("%s.__bootstrap(): registering profile hook", self)
            _sys.setprofile(_profile_hook)

        try:
            self.run()
        except SystemExit:
            if __debug__:
                self._note("%s.__bootstrap(): raised SystemExit", self)
        except:
            if __debug__:
                self._note("%s.__bootstrap(): unhandled exception", self)
            # If sys.stderr is no more (most likely from interpreter
            # shutdown) use self.__stderr. Otherwise still use sys (as in
            # _sys) in case sys.stderr was redefined since the creation of
            # self.
            if _sys and _sys.stderr is not None:
                print>>_sys.stderr, ("Exception in thread %s:\n%s" %
                                     (self.name, _format_exc()))
            elif self.__stderr is not None:
                # Do the best job possible w/o a huge amt. of code to
                # approximate a traceback (code ideas from
                # Lib/traceback.py)
                exc_type, exc_value, exc_tb = self.__exc_info()
                try:
                    print>>self.__stderr, (
                        "Exception in thread " + self.name +
                        " (most likely raised during interpreter shutdown):")
                    print>>self.__stderr, (
                        "Traceback (most recent call last):")
                    # Walk the traceback chain, one line per frame.
                    while exc_tb:
                        print>>self.__stderr, (
                            ' File "%s", line %s, in %s' %
                            (exc_tb.tb_frame.f_code.co_filename,
                             exc_tb.tb_lineno,
                             exc_tb.tb_frame.f_code.co_name))
                        exc_tb = exc_tb.tb_next
                    print>>self.__stderr, ("%s: %s" % (exc_type, exc_value))
                # Make sure that exc_tb gets deleted since it is a memory
                # hog; deleting everything else is just for thoroughness
                finally:
                    del exc_type, exc_value, exc_tb
        else:
            if __debug__:
                self._note("%s.__bootstrap(): normal return", self)
        finally:
            # Prevent a race in
            # test_threading.test_no_refcycle_through_target when
            # the exception keeps the target alive past when we
            # assert that it's dead.
            self.__exc_clear()
    finally:
        with _active_limbo_lock:
            # Mark the thread stopped and wake any joiners.
            self.__stop()
            try:
                # We don't call self.__delete() because it also
                # grabs _active_limbo_lock.
                del _active[_get_ident()]
            except:
                pass
def __stop(self):
    # Mark the thread as stopped and wake everybody waiting on its
    # condition object.
    #
    # DummyThreads delete self.__block, but they have no waiters to
    # notify anyway (join() is forbidden on them).
    if hasattr(self, '_Thread__block'):
        cond = self.__block
        cond.acquire()
        self.__stopped = True
        cond.notify_all()
        cond.release()
def __delete(self):
    """Remove current thread from the dict of currently running threads.

    The entry keyed by the calling thread's ident is deleted from _active
    while holding _active_limbo_lock.  A KeyError is re-raised unless
    'dummy_threading' is present in sys.modules.
    """

    # Notes about running with dummy_thread:
    #
    # Must take care to not raise an exception if dummy_thread is being
    # used (and thus this module is being used as an instance of
    # dummy_threading). dummy_thread.get_ident() always returns -1 since
    # there is only one thread if dummy_thread is being used. Thus
    # len(_active) is always <= 1 here, and any Thread instance created
    # overwrites the (if any) thread currently registered in _active.
    #
    # An instance of _MainThread is always created by 'threading'. This
    # gets overwritten the instant an instance of Thread is created; both
    # threads return -1 from dummy_thread.get_ident() and thus have the
    # same key in the dict. So when the _MainThread instance created by
    # 'threading' tries to clean itself up when atexit calls this method
    # it gets a KeyError if another Thread instance was created.
    #
    # This all means that KeyError from trying to delete something from
    # _active if dummy_threading is being used is a red herring. But
    # since it isn't if dummy_threading is *not* being used then don't
    # hide the exception.

    try:
        with _active_limbo_lock:
            del _active[_get_ident()]
            # There must not be any python code between the previous line
            # and after the lock is released. Otherwise a tracing function
            # could try to acquire the lock again in the same thread, (in
            # current_thread()), and would block.
    except KeyError:
        if 'dummy_threading' not in _sys.modules:
            raise
def join(self, timeout=None):
    """Wait until the thread terminates.

    The caller blocks until the thread whose join() method is called has
    terminated -- normally or through an unhandled exception -- or until
    the optional timeout expires.

    timeout, when given and not None, is a floating point number of
    seconds (or fractions thereof) to wait.  join() always returns None,
    so call isAlive() afterwards to find out whether a timeout happened:
    if the thread is still alive, the join() call timed out.

    With timeout omitted or None, the call blocks until the thread
    terminates.

    A thread can be join()ed many times.

    Raises RuntimeError if Thread.__init__() was not called, if the thread
    has not been started yet, or if the thread being joined is the current
    thread (which would deadlock).
    """
    if not self.__initialized:
        raise RuntimeError("Thread.__init__() not called")
    if not self.__started.is_set():
        raise RuntimeError("cannot join thread before it is started")
    if self is current_thread():
        raise RuntimeError("cannot join current thread")

    if __debug__:
        if not self.__stopped:
            self._note("%s.join(): waiting until thread stops", self)

    with self.__block:
        if timeout is None:
            # Unbounded wait: sleep on the condition until stopped.
            while not self.__stopped:
                self.__block.wait()
            if __debug__:
                self._note("%s.join(): thread stopped", self)
            return

        # Bounded wait: re-check the remaining time after every wakeup.
        deadline = _time() + timeout
        timed_out = False
        while not self.__stopped:
            remaining = deadline - _time()
            if remaining <= 0:
                timed_out = True
                break
            self.__block.wait(remaining)
        if __debug__:
            if timed_out:
                self._note("%s.join(): timed out", self)
            else:
                self._note("%s.join(): thread stopped", self)
@property
def name(self):
    """A string used for identification purposes only.

    It has no semantics.  Multiple threads may be given the same name.
    The initial name is set by the constructor.  Values assigned to this
    property are converted with str() before being stored.
    """
    assert self.__initialized, "Thread.__init__() not called"
    return self.__name

@name.setter
def name(self, name):
    assert self.__initialized, "Thread.__init__() not called"
    as_text = str(name)
    self.__name = as_text
@property
def ident(self):
    """Thread identifier of this thread or None if it has not been started.

    This is a nonzero integer.  See the thread.get_ident() function.
    Thread identifiers may be recycled when a thread exits and another
    thread is created.  The identifier is available even after the thread
    has exited.
    """
    # Guard against subclasses that forgot to call Thread.__init__().
    assert self.__initialized, "Thread.__init__() not called"
    return self.__ident
def isAlive(self):
    """Return whether the thread is alive.

    The result is True from just before the run() method starts until
    just after the run() method terminates.  The module function
    enumerate() returns a list of all alive threads.
    """
    assert self.__initialized, "Thread.__init__() not called"
    has_started = self.__started.is_set()
    return has_started and not self.__stopped

# PEP 8 style spelling of the same method.
is_alive = isAlive
@property
def daemon(self):
    """Whether this thread is a daemon thread (True) or not (False).

    Must be set before start() is called, otherwise RuntimeError is
    raised.  The initial value is inherited from the creating thread; the
    main thread is not a daemon thread, so threads created in the main
    thread default to daemon = False.

    The entire Python program exits when no alive non-daemon threads are
    left.
    """
    assert self.__initialized, "Thread.__init__() not called"
    return self.__daemonic

@daemon.setter
def daemon(self, daemonic):
    if not self.__initialized:
        raise RuntimeError("Thread.__init__() not called")
    already_started = self.__started.is_set()
    if already_started:
        raise RuntimeError("cannot set daemon status of active thread")
    self.__daemonic = daemonic
def isDaemon(self):
    """Return the value of the ``daemon`` property (method-style accessor)."""
    return self.daemon
def setDaemon(self, daemonic):
    """Assign *daemonic* to the ``daemon`` property (method-style accessor)."""
    self.daemon = daemonic
def getName(self):
    """Return the value of the ``name`` property (method-style accessor)."""
    return self.name
def setName(self, name):
    """Assign *name* to the ``name`` property (method-style accessor)."""
    self.name = name
- # The timer class was contributed by Itamar Shtull-Trauring
def Timer(*args, **kwargs):
    """Factory function to create a Timer object.

    Timers call a function after a specified number of seconds:

        t = Timer(30.0, f, args=[], kwargs={})
        t.start()
        t.cancel()     # stop the timer's action if it's still waiting

    All arguments are forwarded unchanged to _Timer.
    """
    return _Timer(*args, **kwargs)


class _Timer(Thread):
    """Call a function after a specified number of seconds:

        t = Timer(30.0, f, args=[], kwargs={})
        t.start()
        t.cancel()     # stop the timer's action if it's still waiting

    """

    def __init__(self, interval, function, args=None, kwargs=None):
        """Store the delay, the callable and its arguments.

        interval -- seconds to wait before calling *function*
        function -- callable invoked as function(*args, **kwargs)
        args     -- positional arguments; defaults to a new empty list
        kwargs   -- keyword arguments; defaults to a new empty dict
        """
        Thread.__init__(self)
        self.interval = interval
        self.function = function
        # Build the default containers per instance: a literal [] / {} in
        # the signature would be a single object shared by every timer
        # created without explicit arguments.
        self.args = args if args is not None else []
        self.kwargs = kwargs if kwargs is not None else {}
        self.finished = Event()

    def cancel(self):
        """Stop the timer if it hasn't finished yet"""
        self.finished.set()

    def run(self):
        """Wait up to *interval* seconds, then call the function unless cancelled."""
        self.finished.wait(self.interval)
        if not self.finished.is_set():
            self.function(*self.args, **self.kwargs)
        self.finished.set()
- # Special thread class to represent the main thread
- # This is garbage collected through an exit handler
class _MainThread(Thread):
    """Thread object that represents the main thread."""

    def __init__(self):
        Thread.__init__(self, name="MainThread")
        # No start() call happens for this object: flag it as started and
        # record its ident directly, then publish it in _active.
        self._Thread__started.set()
        self._set_ident()
        with _active_limbo_lock:
            _active[_get_ident()] = self

    def _set_daemon(self):
        # The main thread is never a daemon thread.
        return False

    def _exitfunc(self):
        """Stop this thread object, then join non-daemon threads until none remain."""
        self._Thread__stop()
        pending = _pickSomeNonDaemonThread()
        if __debug__:
            if pending:
                self._note("%s: waiting for other threads", self)
        while pending:
            pending.join()
            pending = _pickSomeNonDaemonThread()
        if __debug__:
            self._note("%s: exiting", self)
        self._Thread__delete()
def _pickSomeNonDaemonThread():
    """Return the first alive non-daemon thread from enumerate(), or None."""
    for candidate in enumerate():
        if candidate.daemon:
            continue
        if candidate.is_alive():
            return candidate
    return None
- # Dummy thread class to represent threads not started here.
- # These aren't garbage collected when they die, nor can they be waited for.
- # If they invoke anything in threading.py that calls current_thread(), they
- # leave an entry in the _active dict forever after.
- # Their purpose is to return *something* from current_thread().
- # They are marked as daemon threads so we won't wait for them
- # when we exit (conform previous semantics).
class _DummyThread(Thread):
    """Thread object for a thread of control that was not started here.

    Instances are created by currentThread() when the calling thread has
    no entry in _active.  They report themselves as daemon threads and
    cannot be joined.
    """

    def __init__(self):
        dummy_name = _newname("Dummy-%d")
        Thread.__init__(self, name=dummy_name)

        # Thread.__block consumes an OS-level locking primitive, which
        # can never be used by a _DummyThread.  Since a _DummyThread
        # instance is immortal, that's bad, so release this resource.
        del self._Thread__block

        # Flag the thread as started and record its ident directly, then
        # publish it in _active.
        self._Thread__started.set()
        self._set_ident()
        with _active_limbo_lock:
            _active[_get_ident()] = self

    def _set_daemon(self):
        # Dummy threads always count as daemon threads.
        return True

    def join(self, timeout=None):
        assert False, "cannot join a dummy thread"
- # Global API functions
def currentThread():
    """Return the current Thread object for the caller's thread of control.

    If the caller's thread of control was not created through the
    threading module, a dummy thread object with limited functionality is
    returned.
    """
    try:
        thread = _active[_get_ident()]
    except KeyError:
        # Unknown ident: hand back a freshly created dummy thread.
        thread = _DummyThread()
    return thread

# PEP 8 style spelling of the same function.
current_thread = currentThread
def activeCount():
    """Return the number of Thread objects currently alive.

    The returned count is equal to the length of the list returned by
    enumerate().
    """
    with _active_limbo_lock:
        n_active = len(_active)
        n_limbo = len(_limbo)
        return n_active + n_limbo

# PEP 8 style spelling of the same function.
active_count = activeCount
def _enumerate():
    """Same as enumerate(), but without the lock.  Internal use only."""
    active_threads = _active.values()
    limbo_threads = _limbo.values()
    return active_threads + limbo_threads
def enumerate():
    """Return a list of all Thread objects currently alive.

    The list includes daemonic threads, dummy thread objects created by
    current_thread(), and the main thread.  It excludes terminated threads
    and threads that have not yet been started.
    """
    with _active_limbo_lock:
        active_threads = _active.values()
        limbo_threads = _limbo.values()
        return active_threads + limbo_threads
- from thread import stack_size
# Create the main thread object (its constructor registers it in _active),
# and make its exit handler available for the interpreter
# (Py_Main) as threading._shutdown.
_shutdown = _MainThread()._exitfunc
- # get thread-local implementation, either from the thread
- # module, or from the python fallback
- try:
- from thread import _local as local
- except ImportError:
- from _threading_local import local
def _after_fork():
    """Clean up threading module state that should not exist after a fork.

    This function is called by Python/ceval.c:PyEval_ReInitThreads which
    is called from PyOS_AfterFork.
    """
    # Reset _active_limbo_lock, in case we forked while the lock was held
    # by another (non-forked) thread.  http://bugs.python.org/issue874900
    global _active_limbo_lock
    _active_limbo_lock = _allocate_lock()

    # fork() only copied the current thread; clear references to others.
    survivors = {}
    me = current_thread()
    with _active_limbo_lock:
        for t in _enumerate():
            # Any lock/condition variable may be currently locked or in an
            # invalid state, so we reinitialize them.
            if hasattr(t, '_reset_internal_locks'):
                t._reset_internal_locks()

            if t is not me:
                # All the others are already stopped.
                t._Thread__stop()
                continue

            # There is only one active thread.  We reset the ident to
            # its new value since it can have changed.
            new_ident = _get_ident()
            t._Thread__ident = new_ident
            survivors[new_ident] = t

        _limbo.clear()
        _active.clear()
        _active.update(survivors)
        assert len(_active) == 1
- # Self-test code
def _test():
    """Self-test: three producers feed one consumer through a bounded queue.

    Each producer puts NI items; the single consumer gets and prints
    NI*NP items.  The function returns once every thread has been joined.
    """

    class BoundedQueue(_Verbose):
        """Queue of at most *limit* items guarded by one re-entrant lock."""

        def __init__(self, limit):
            _Verbose.__init__(self)
            self.mon = RLock()
            # rc is notified by put(), wc by get(); both share the monitor.
            self.rc = Condition(self.mon)
            self.wc = Condition(self.mon)
            self.limit = limit
            self.queue = _deque()

        def put(self, item):
            """Append *item*, waiting while the queue is full."""
            # Context manager: the monitor is released even on error.
            with self.mon:
                while len(self.queue) >= self.limit:
                    self._note("put(%s): queue full", item)
                    self.wc.wait()
                self.queue.append(item)
                self._note("put(%s): appended, length now %d",
                           item, len(self.queue))
                self.rc.notify()

        def get(self):
            """Remove and return the oldest item, waiting while empty."""
            with self.mon:
                while not self.queue:
                    self._note("get(): queue empty")
                    self.rc.wait()
                item = self.queue.popleft()
                self._note("get(): got %s, %d left", item, len(self.queue))
                self.wc.notify()
                return item

    class ProducerThread(Thread):
        """Put *quota* numbered items on the queue, pausing briefly between."""

        def __init__(self, queue, quota):
            Thread.__init__(self, name="Producer")
            self.queue = queue
            self.quota = quota

        def run(self):
            from random import random
            counter = 0
            while counter < self.quota:
                counter += 1
                self.queue.put("%s.%d" % (self.name, counter))
                _sleep(random() * 0.00001)

    class ConsumerThread(Thread):
        """Get and print *count* items from the queue."""

        def __init__(self, queue, count):
            Thread.__init__(self, name="Consumer")
            self.queue = queue
            self.count = count

        def run(self):
            while self.count > 0:
                item = self.queue.get()
                print(item)
                self.count -= 1

    NP = 3  # number of producers
    QL = 4  # queue limit
    NI = 5  # items per producer

    Q = BoundedQueue(QL)
    P = []
    for i in range(NP):
        t = ProducerThread(Q, NI)
        t.name = ("Producer-%d" % (i+1))
        P.append(t)
    C = ConsumerThread(Q, NI*NP)
    for t in P:
        t.start()
        _sleep(0.000001)
    C.start()
    for t in P:
        t.join()
    C.join()
# Run the self-test when this module is executed as a script.
if __name__ == '__main__':
    _test()
|