synchronize.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344
  1. #
  2. # Module implementing synchronization primitives
  3. #
  4. # multiprocessing/synchronize.py
  5. #
  6. # Copyright (c) 2006-2008, R Oudkerk
  7. # All rights reserved.
  8. #
  9. # Redistribution and use in source and binary forms, with or without
  10. # modification, are permitted provided that the following conditions
  11. # are met:
  12. #
  13. # 1. Redistributions of source code must retain the above copyright
  14. # notice, this list of conditions and the following disclaimer.
  15. # 2. Redistributions in binary form must reproduce the above copyright
  16. # notice, this list of conditions and the following disclaimer in the
  17. # documentation and/or other materials provided with the distribution.
  18. # 3. Neither the name of author nor the names of any contributors may be
  19. # used to endorse or promote products derived from this software
  20. # without specific prior written permission.
  21. #
  22. # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
  23. # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  24. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  25. # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
  26. # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  27. # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
  28. # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  29. # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  30. # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
  31. # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  32. # SUCH DAMAGE.
  33. #
  34. __all__ = [
  35. 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
  36. ]
  37. import threading
  38. import os
  39. import sys
  40. from time import time as _time, sleep as _sleep
  41. import _multiprocessing
  42. from multiprocessing.process import current_process
  43. from multiprocessing.util import Finalize, register_after_fork, debug
  44. from multiprocessing.forking import assert_spawning, Popen
  45. # Try to import the mp.synchronize module cleanly, if it fails
  46. # raise ImportError for platforms lacking a working sem_open implementation.
  47. # See issue 3770
  48. try:
  49. from _multiprocessing import SemLock
  50. except (ImportError):
  51. raise ImportError("This platform lacks a functioning sem_open" +
  52. " implementation, therefore, the required" +
  53. " synchronization primitives needed will not" +
  54. " function, see issue 3770.")
  55. #
  56. # Constants
  57. #
  58. RECURSIVE_MUTEX, SEMAPHORE = range(2)
  59. SEM_VALUE_MAX = _multiprocessing.SemLock.SEM_VALUE_MAX
  60. #
  61. # Base class for semaphores and mutexes; wraps `_multiprocessing.SemLock`
  62. #
  63. class SemLock(object):
  64. def __init__(self, kind, value, maxvalue):
  65. sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
  66. debug('created semlock with handle %s' % sl.handle)
  67. self._make_methods()
  68. if sys.platform != 'win32':
  69. def _after_fork(obj):
  70. obj._semlock._after_fork()
  71. register_after_fork(self, _after_fork)
  72. def _make_methods(self):
  73. self.acquire = self._semlock.acquire
  74. self.release = self._semlock.release
  75. def __enter__(self):
  76. return self._semlock.__enter__()
  77. def __exit__(self, *args):
  78. return self._semlock.__exit__(*args)
  79. def __getstate__(self):
  80. assert_spawning(self)
  81. sl = self._semlock
  82. return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)
  83. def __setstate__(self, state):
  84. self._semlock = _multiprocessing.SemLock._rebuild(*state)
  85. debug('recreated blocker with handle %r' % state[0])
  86. self._make_methods()
  87. #
  88. # Semaphore
  89. #
  90. class Semaphore(SemLock):
  91. def __init__(self, value=1):
  92. SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX)
  93. def get_value(self):
  94. return self._semlock._get_value()
  95. def __repr__(self):
  96. try:
  97. value = self._semlock._get_value()
  98. except Exception:
  99. value = 'unknown'
  100. return '<Semaphore(value=%s)>' % value
  101. #
  102. # Bounded semaphore
  103. #
  104. class BoundedSemaphore(Semaphore):
  105. def __init__(self, value=1):
  106. SemLock.__init__(self, SEMAPHORE, value, value)
  107. def __repr__(self):
  108. try:
  109. value = self._semlock._get_value()
  110. except Exception:
  111. value = 'unknown'
  112. return '<BoundedSemaphore(value=%s, maxvalue=%s)>' % \
  113. (value, self._semlock.maxvalue)
  114. #
  115. # Non-recursive lock
  116. #
  117. class Lock(SemLock):
  118. def __init__(self):
  119. SemLock.__init__(self, SEMAPHORE, 1, 1)
  120. def __repr__(self):
  121. try:
  122. if self._semlock._is_mine():
  123. name = current_process().name
  124. if threading.current_thread().name != 'MainThread':
  125. name += '|' + threading.current_thread().name
  126. elif self._semlock._get_value() == 1:
  127. name = 'None'
  128. elif self._semlock._count() > 0:
  129. name = 'SomeOtherThread'
  130. else:
  131. name = 'SomeOtherProcess'
  132. except Exception:
  133. name = 'unknown'
  134. return '<Lock(owner=%s)>' % name
  135. #
  136. # Recursive lock
  137. #
  138. class RLock(SemLock):
  139. def __init__(self):
  140. SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1)
  141. def __repr__(self):
  142. try:
  143. if self._semlock._is_mine():
  144. name = current_process().name
  145. if threading.current_thread().name != 'MainThread':
  146. name += '|' + threading.current_thread().name
  147. count = self._semlock._count()
  148. elif self._semlock._get_value() == 1:
  149. name, count = 'None', 0
  150. elif self._semlock._count() > 0:
  151. name, count = 'SomeOtherThread', 'nonzero'
  152. else:
  153. name, count = 'SomeOtherProcess', 'nonzero'
  154. except Exception:
  155. name, count = 'unknown', 'unknown'
  156. return '<RLock(%s, %s)>' % (name, count)
  157. #
  158. # Condition variable
  159. #
  160. class Condition(object):
  161. def __init__(self, lock=None):
  162. self._lock = lock or RLock()
  163. self._sleeping_count = Semaphore(0)
  164. self._woken_count = Semaphore(0)
  165. self._wait_semaphore = Semaphore(0)
  166. self._make_methods()
  167. def __getstate__(self):
  168. assert_spawning(self)
  169. return (self._lock, self._sleeping_count,
  170. self._woken_count, self._wait_semaphore)
  171. def __setstate__(self, state):
  172. (self._lock, self._sleeping_count,
  173. self._woken_count, self._wait_semaphore) = state
  174. self._make_methods()
  175. def __enter__(self):
  176. return self._lock.__enter__()
  177. def __exit__(self, *args):
  178. return self._lock.__exit__(*args)
  179. def _make_methods(self):
  180. self.acquire = self._lock.acquire
  181. self.release = self._lock.release
  182. def __repr__(self):
  183. try:
  184. num_waiters = (self._sleeping_count._semlock._get_value() -
  185. self._woken_count._semlock._get_value())
  186. except Exception:
  187. num_waiters = 'unknown'
  188. return '<Condition(%s, %s)>' % (self._lock, num_waiters)
  189. def wait(self, timeout=None):
  190. assert self._lock._semlock._is_mine(), \
  191. 'must acquire() condition before using wait()'
  192. # indicate that this thread is going to sleep
  193. self._sleeping_count.release()
  194. # release lock
  195. count = self._lock._semlock._count()
  196. for i in xrange(count):
  197. self._lock.release()
  198. try:
  199. # wait for notification or timeout
  200. self._wait_semaphore.acquire(True, timeout)
  201. finally:
  202. # indicate that this thread has woken
  203. self._woken_count.release()
  204. # reacquire lock
  205. for i in xrange(count):
  206. self._lock.acquire()
  207. def notify(self):
  208. assert self._lock._semlock._is_mine(), 'lock is not owned'
  209. assert not self._wait_semaphore.acquire(False)
  210. # to take account of timeouts since last notify() we subtract
  211. # woken_count from sleeping_count and rezero woken_count
  212. while self._woken_count.acquire(False):
  213. res = self._sleeping_count.acquire(False)
  214. assert res
  215. if self._sleeping_count.acquire(False): # try grabbing a sleeper
  216. self._wait_semaphore.release() # wake up one sleeper
  217. self._woken_count.acquire() # wait for the sleeper to wake
  218. # rezero _wait_semaphore in case a timeout just happened
  219. self._wait_semaphore.acquire(False)
  220. def notify_all(self):
  221. assert self._lock._semlock._is_mine(), 'lock is not owned'
  222. assert not self._wait_semaphore.acquire(False)
  223. # to take account of timeouts since last notify*() we subtract
  224. # woken_count from sleeping_count and rezero woken_count
  225. while self._woken_count.acquire(False):
  226. res = self._sleeping_count.acquire(False)
  227. assert res
  228. sleepers = 0
  229. while self._sleeping_count.acquire(False):
  230. self._wait_semaphore.release() # wake up one sleeper
  231. sleepers += 1
  232. if sleepers:
  233. for i in xrange(sleepers):
  234. self._woken_count.acquire() # wait for a sleeper to wake
  235. # rezero wait_semaphore in case some timeouts just happened
  236. while self._wait_semaphore.acquire(False):
  237. pass
  238. #
  239. # Event
  240. #
  241. class Event(object):
  242. def __init__(self):
  243. self._cond = Condition(Lock())
  244. self._flag = Semaphore(0)
  245. def is_set(self):
  246. self._cond.acquire()
  247. try:
  248. if self._flag.acquire(False):
  249. self._flag.release()
  250. return True
  251. return False
  252. finally:
  253. self._cond.release()
  254. def set(self):
  255. self._cond.acquire()
  256. try:
  257. self._flag.acquire(False)
  258. self._flag.release()
  259. self._cond.notify_all()
  260. finally:
  261. self._cond.release()
  262. def clear(self):
  263. self._cond.acquire()
  264. try:
  265. self._flag.acquire(False)
  266. finally:
  267. self._cond.release()
  268. def wait(self, timeout=None):
  269. self._cond.acquire()
  270. try:
  271. if self._flag.acquire(False):
  272. self._flag.release()
  273. else:
  274. self._cond.wait(timeout)
  275. if self._flag.acquire(False):
  276. self._flag.release()
  277. return True
  278. return False
  279. finally:
  280. self._cond.release()