util.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. #
  2. # Module providing various facilities to other parts of the package
  3. #
  4. # multiprocessing/util.py
  5. #
  6. # Copyright (c) 2006-2008, R Oudkerk
  7. # All rights reserved.
  8. #
  9. # Redistribution and use in source and binary forms, with or without
  10. # modification, are permitted provided that the following conditions
  11. # are met:
  12. #
  13. # 1. Redistributions of source code must retain the above copyright
  14. # notice, this list of conditions and the following disclaimer.
  15. # 2. Redistributions in binary form must reproduce the above copyright
  16. # notice, this list of conditions and the following disclaimer in the
  17. # documentation and/or other materials provided with the distribution.
  18. # 3. Neither the name of author nor the names of any contributors may be
  19. # used to endorse or promote products derived from this software
  20. # without specific prior written permission.
  21. #
  22. # THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS "AS IS" AND
  23. # ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
  24. # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
  25. # ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
  26. # FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
  27. # DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
  28. # OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
  29. # HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
  30. # LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
  31. # OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  32. # SUCH DAMAGE.
  33. #
  34. import os
  35. import itertools
  36. import weakref
  37. import atexit
  38. import threading # we want threading to install it's
  39. # cleanup function before multiprocessing does
  40. from subprocess import _args_from_interpreter_flags
  41. from multiprocessing.process import current_process, active_children
  42. __all__ = [
  43. 'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
  44. 'log_to_stderr', 'get_temp_dir', 'register_after_fork',
  45. 'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
  46. 'SUBDEBUG', 'SUBWARNING',
  47. ]
  48. #
  49. # Logging
  50. #
  51. NOTSET = 0
  52. SUBDEBUG = 5
  53. DEBUG = 10
  54. INFO = 20
  55. SUBWARNING = 25
  56. LOGGER_NAME = 'multiprocessing'
  57. DEFAULT_LOGGING_FORMAT = '[%(levelname)s/%(processName)s] %(message)s'
  58. _logger = None
  59. _log_to_stderr = False
  60. def sub_debug(msg, *args):
  61. if _logger:
  62. _logger.log(SUBDEBUG, msg, *args)
  63. def debug(msg, *args):
  64. if _logger:
  65. _logger.log(DEBUG, msg, *args)
  66. def info(msg, *args):
  67. if _logger:
  68. _logger.log(INFO, msg, *args)
  69. def sub_warning(msg, *args):
  70. if _logger:
  71. _logger.log(SUBWARNING, msg, *args)
  72. def get_logger():
  73. '''
  74. Returns logger used by multiprocessing
  75. '''
  76. global _logger
  77. import logging, atexit
  78. logging._acquireLock()
  79. try:
  80. if not _logger:
  81. _logger = logging.getLogger(LOGGER_NAME)
  82. _logger.propagate = 0
  83. logging.addLevelName(SUBDEBUG, 'SUBDEBUG')
  84. logging.addLevelName(SUBWARNING, 'SUBWARNING')
  85. # XXX multiprocessing should cleanup before logging
  86. if hasattr(atexit, 'unregister'):
  87. atexit.unregister(_exit_function)
  88. atexit.register(_exit_function)
  89. else:
  90. atexit._exithandlers.remove((_exit_function, (), {}))
  91. atexit._exithandlers.append((_exit_function, (), {}))
  92. finally:
  93. logging._releaseLock()
  94. return _logger
  95. def log_to_stderr(level=None):
  96. '''
  97. Turn on logging and add a handler which prints to stderr
  98. '''
  99. global _log_to_stderr
  100. import logging
  101. logger = get_logger()
  102. formatter = logging.Formatter(DEFAULT_LOGGING_FORMAT)
  103. handler = logging.StreamHandler()
  104. handler.setFormatter(formatter)
  105. logger.addHandler(handler)
  106. if level:
  107. logger.setLevel(level)
  108. _log_to_stderr = True
  109. return _logger
  110. #
  111. # Function returning a temp directory which will be removed on exit
  112. #
  113. def get_temp_dir():
  114. # get name of a temp directory which will be automatically cleaned up
  115. if current_process()._tempdir is None:
  116. import shutil, tempfile
  117. tempdir = tempfile.mkdtemp(prefix='pymp-')
  118. info('created temp directory %s', tempdir)
  119. Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
  120. current_process()._tempdir = tempdir
  121. return current_process()._tempdir
  122. #
  123. # Support for reinitialization of objects when bootstrapping a child process
  124. #
  125. _afterfork_registry = weakref.WeakValueDictionary()
  126. _afterfork_counter = itertools.count()
  127. def _run_after_forkers():
  128. items = list(_afterfork_registry.items())
  129. items.sort()
  130. for (index, ident, func), obj in items:
  131. try:
  132. func(obj)
  133. except Exception, e:
  134. info('after forker raised exception %s', e)
  135. def register_after_fork(obj, func):
  136. _afterfork_registry[(_afterfork_counter.next(), id(obj), func)] = obj
  137. #
  138. # Finalization using weakrefs
  139. #
  140. _finalizer_registry = {}
  141. _finalizer_counter = itertools.count()
  142. class Finalize(object):
  143. '''
  144. Class which supports object finalization using weakrefs
  145. '''
  146. def __init__(self, obj, callback, args=(), kwargs=None, exitpriority=None):
  147. assert exitpriority is None or type(exitpriority) is int
  148. if obj is not None:
  149. self._weakref = weakref.ref(obj, self)
  150. else:
  151. assert exitpriority is not None
  152. self._callback = callback
  153. self._args = args
  154. self._kwargs = kwargs or {}
  155. self._key = (exitpriority, _finalizer_counter.next())
  156. self._pid = os.getpid()
  157. _finalizer_registry[self._key] = self
  158. def __call__(self, wr=None):
  159. '''
  160. Run the callback unless it has already been called or cancelled
  161. '''
  162. try:
  163. del _finalizer_registry[self._key]
  164. except KeyError:
  165. sub_debug('finalizer no longer registered')
  166. else:
  167. if self._pid != os.getpid():
  168. sub_debug('finalizer ignored because different process')
  169. res = None
  170. else:
  171. sub_debug('finalizer calling %s with args %s and kwargs %s',
  172. self._callback, self._args, self._kwargs)
  173. res = self._callback(*self._args, **self._kwargs)
  174. self._weakref = self._callback = self._args = \
  175. self._kwargs = self._key = None
  176. return res
  177. def cancel(self):
  178. '''
  179. Cancel finalization of the object
  180. '''
  181. try:
  182. del _finalizer_registry[self._key]
  183. except KeyError:
  184. pass
  185. else:
  186. self._weakref = self._callback = self._args = \
  187. self._kwargs = self._key = None
  188. def still_active(self):
  189. '''
  190. Return whether this finalizer is still waiting to invoke callback
  191. '''
  192. return self._key in _finalizer_registry
  193. def __repr__(self):
  194. try:
  195. obj = self._weakref()
  196. except (AttributeError, TypeError):
  197. obj = None
  198. if obj is None:
  199. return '<Finalize object, dead>'
  200. x = '<Finalize object, callback=%s' % \
  201. getattr(self._callback, '__name__', self._callback)
  202. if self._args:
  203. x += ', args=' + str(self._args)
  204. if self._kwargs:
  205. x += ', kwargs=' + str(self._kwargs)
  206. if self._key[0] is not None:
  207. x += ', exitprority=' + str(self._key[0])
  208. return x + '>'
  209. def _run_finalizers(minpriority=None):
  210. '''
  211. Run all finalizers whose exit priority is not None and at least minpriority
  212. Finalizers with highest priority are called first; finalizers with
  213. the same priority will be called in reverse order of creation.
  214. '''
  215. if _finalizer_registry is None:
  216. # This function may be called after this module's globals are
  217. # destroyed. See the _exit_function function in this module for more
  218. # notes.
  219. return
  220. if minpriority is None:
  221. f = lambda p : p[0][0] is not None
  222. else:
  223. f = lambda p : p[0][0] is not None and p[0][0] >= minpriority
  224. items = [x for x in _finalizer_registry.items() if f(x)]
  225. items.sort(reverse=True)
  226. for key, finalizer in items:
  227. sub_debug('calling %s', finalizer)
  228. try:
  229. finalizer()
  230. except Exception:
  231. import traceback
  232. traceback.print_exc()
  233. if minpriority is None:
  234. _finalizer_registry.clear()
  235. #
  236. # Clean up on exit
  237. #
  238. def is_exiting():
  239. '''
  240. Returns true if the process is shutting down
  241. '''
  242. return _exiting or _exiting is None
  243. _exiting = False
  244. def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
  245. active_children=active_children,
  246. current_process=current_process):
  247. # NB: we hold on to references to functions in the arglist due to the
  248. # situation described below, where this function is called after this
  249. # module's globals are destroyed.
  250. global _exiting
  251. info('process shutting down')
  252. debug('running all "atexit" finalizers with priority >= 0')
  253. _run_finalizers(0)
  254. if current_process() is not None:
  255. # NB: we check if the current process is None here because if
  256. # it's None, any call to ``active_children()`` will throw an
  257. # AttributeError (active_children winds up trying to get
  258. # attributes from util._current_process). This happens in a
  259. # variety of shutdown circumstances that are not well-understood
  260. # because module-scope variables are not apparently supposed to
  261. # be destroyed until after this function is called. However,
  262. # they are indeed destroyed before this function is called. See
  263. # issues 9775 and 15881. Also related: 4106, 9205, and 9207.
  264. for p in active_children():
  265. if p._daemonic:
  266. info('calling terminate() for daemon %s', p.name)
  267. p._popen.terminate()
  268. for p in active_children():
  269. info('calling join() for process %s', p.name)
  270. p.join()
  271. debug('running the remaining "atexit" finalizers')
  272. _run_finalizers()
  273. atexit.register(_exit_function)
  274. #
  275. # Some fork aware types
  276. #
  277. class ForkAwareThreadLock(object):
  278. def __init__(self):
  279. self._reset()
  280. register_after_fork(self, ForkAwareThreadLock._reset)
  281. def _reset(self):
  282. self._lock = threading.Lock()
  283. self.acquire = self._lock.acquire
  284. self.release = self._lock.release
  285. class ForkAwareLocal(threading.local):
  286. def __init__(self):
  287. register_after_fork(self, lambda obj : obj.__dict__.clear())
  288. def __reduce__(self):
  289. return type(self), ()