  1. """
  2. Overview
  3. ========
  4. The multiprocess plugin enables you to distribute your test run among a set of
  5. worker processes that run tests in parallel. This can speed up CPU-bound test
  6. runs (as long as the number of work processeses is around the number of
  7. processors or cores available), but is mainly useful for IO-bound tests that
  8. spend most of their time waiting for data to arrive from someplace else.
  9. .. note ::
  10. See :doc:`../doc_tests/test_multiprocess/multiprocess` for
  11. additional documentation and examples. Use of this plugin on python
  12. 2.5 or earlier requires the multiprocessing_ module, also available
  13. from PyPI.
  14. .. _multiprocessing : http://code.google.com/p/python-multiprocessing/
  15. How tests are distributed
  16. =========================
  17. The ideal case would be to dispatch each test to a worker process
  18. separately. This ideal is not attainable in all cases, however, because many
  19. test suites depend on context (class, module or package) fixtures.
  20. The plugin can't know (unless you tell it -- see below!) if a context fixture
  21. can be called many times concurrently (is re-entrant), or if it can be shared
  22. among tests running in different processes. Therefore, if a context has
  23. fixtures, the default behavior is to dispatch the entire suite to a worker as
  24. a unit.
  25. Controlling distribution
  26. ^^^^^^^^^^^^^^^^^^^^^^^^
  27. There are two context-level variables that you can use to control this default
  28. behavior.
  29. If a context's fixtures are re-entrant, set ``_multiprocess_can_split_ = True``
  30. in the context, and the plugin will dispatch tests in suites bound to that
  31. context as if the context had no fixtures. This means that the fixtures will
  32. execute concurrently and multiple times, typically once per test.
  33. If a context's fixtures can be shared by tests running in different processes
  34. -- such as a package-level fixture that starts an external http server or
  35. initializes a shared database -- then set ``_multiprocess_shared_ = True`` in
  36. the context. These fixtures will then execute in the primary nose process, and
  37. tests in those contexts will be individually dispatched to run in parallel.
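For example, a hypothetical test module might mark its contexts like this
(the class and fixture names below are illustrative, not part of nose):

```python
# Hypothetical test classes showing the two context-level flags.
class TestReentrantFixtures:
    # Fixtures here are cheap and re-entrant, so the plugin may dispatch
    # each test in this class to a different worker, running setup per test.
    _multiprocess_can_split_ = True

    def setup(self):
        self.resource = []  # per-test, side-effect-free fixture

    def test_append(self):
        self.resource.append(1)
        assert self.resource == [1]


class TestSharedServer:
    # Fixtures here stand in for a shared external resource; nose runs them
    # once in the primary process and dispatches the tests individually.
    _multiprocess_shared_ = True

    @classmethod
    def setup_class(cls):
        cls.server = 'fake-server'  # stand-in for starting a real server

    def test_server(self):
        assert self.server == 'fake-server'
```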
How results are collected and reported
======================================

As each test or suite executes in a worker process, results (failures, errors,
and specially handled exceptions like SkipTest) are collected in that
process. When the worker process finishes, it returns results to the main
nose process. There, any progress output is printed (dots!), and the
results from the test run are combined into a consolidated result
set. When results have been received for all dispatched tests, or all
workers have died, the result summary is output as normal.
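Concretely, each worker returns its batch as a plain tuple that the main
process unpacks; this mirrors the shapes built by ``batch()`` and consumed by
``consolidate()`` later in this module (the literal values below are only
illustrative):

```python
# Illustrative shape of one worker's result batch; the values are made up.
batch_result = (
    '...',   # captured output from the worker's stream (the progress dots)
    3,       # testsRun in this batch
    [],      # failures: list of (TestLet, err) pairs
    [],      # errors: list of (TestLet, err) pairs
    {},      # errorClasses: key -> (storage, label, isfail)
)

# The main process unpacks the tuple and folds it into its own result set.
output, testsRun, failures, errors, errorClasses = batch_result
total_run = 0
total_run += testsRun
```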
Beware!
=======

Not all test suites will benefit from, or even operate correctly using, this
plugin. For example, CPU-bound tests will run more slowly if you don't have
multiple processors. There are also some differences in plugin
interactions and behaviors due to the way in which tests are dispatched and
loaded. In general, test loading under this plugin operates as if it were
always in directed mode instead of discovered mode. For instance, doctests
in test modules will always be found when using this plugin with the doctest
plugin.

But the biggest issue you will face is probably concurrency. Unless you
have kept your tests as religiously pure unit tests, with no side-effects, no
ordering issues, and no external dependencies, chances are you will experience
odd, intermittent and unexplainable failures and errors when using this
plugin. This doesn't necessarily mean the plugin is broken; it may mean that
your test suite is not safe for concurrency.

New Features in 1.1.0
=====================

* functions generated by test generators are now added to the worker queue,
  making them multi-threaded.

* fixed timeout functionality: functions will now be terminated with a
  TimedOutException exception when they exceed their execution time. The
  worker processes are not terminated.

* added ``--process-restartworker`` option to restart workers once they are
  done; this helps control memory usage, since memory leaks can accumulate
  and make long runs very difficult.

* added the global ``_instantiate_plugins`` to configure which plugins are
  started in the worker processes.
  75. """
  76. import logging
  77. import os
  78. import sys
  79. import time
  80. import traceback
  81. import unittest
  82. import pickle
  83. import signal
  84. import nose.case
  85. from nose.core import TextTestRunner
  86. from nose import failure
  87. from nose import loader
  88. from nose.plugins.base import Plugin
  89. from nose.pyversion import bytes_
  90. from nose.result import TextTestResult
  91. from nose.suite import ContextSuite
  92. from nose.util import test_address
  93. try:
  94. # 2.7+
  95. from unittest.runner import _WritelnDecorator
  96. except ImportError:
  97. from unittest import _WritelnDecorator
  98. from Queue import Empty
  99. from warnings import warn
  100. try:
  101. from cStringIO import StringIO
  102. except ImportError:
  103. import StringIO
  104. # this is a list of plugin classes that will be checked for and created inside
  105. # each worker process
  106. _instantiate_plugins = None
  107. log = logging.getLogger(__name__)
  108. Process = Queue = Pool = Event = Value = Array = None
  109. # have to inherit KeyboardInterrupt to it will interrupt process properly
  110. class TimedOutException(KeyboardInterrupt):
  111. def __init__(self, value = "Timed Out"):
  112. self.value = value
  113. def __str__(self):
  114. return repr(self.value)
def _import_mp():
    global Process, Queue, Pool, Event, Value, Array
    try:
        from multiprocessing import Manager, Process
        # prevent the server process created in the manager (which holds
        # Python objects and allows other processes to manipulate them using
        # proxies) from being interrupted on SIGINT (KeyboardInterrupt), so
        # that the communication channel between subprocesses and the main
        # process is still usable after ctrl+C is received in the main
        # process.
        old = signal.signal(signal.SIGINT, signal.SIG_IGN)
        m = Manager()
        # reset it back so the main process will receive a KeyboardInterrupt
        # exception on ctrl+c
        signal.signal(signal.SIGINT, old)
        Queue, Pool, Event, Value, Array = (
            m.Queue, m.Pool, m.Event, m.Value, m.Array
        )
    except ImportError:
        warn("multiprocessing module is not available, multiprocess plugin "
             "cannot be used", RuntimeWarning)
class TestLet:
    def __init__(self, case):
        try:
            self._id = case.id()
        except AttributeError:
            pass
        self._short_description = case.shortDescription()
        self._str = str(case)

    def id(self):
        return self._id

    def shortDescription(self):
        return self._short_description

    def __str__(self):
        return self._str
class MultiProcess(Plugin):
    """
    Run tests in multiple processes. Requires the multiprocessing module.
    """
    score = 1000
    status = {}

    def options(self, parser, env):
        """
        Register command-line options.
        """
        parser.add_option("--processes", action="store",
                          default=env.get('NOSE_PROCESSES', 0),
                          dest="multiprocess_workers",
                          metavar="NUM",
                          help="Spread test run among this many processes. "
                          "Set a number equal to the number of processors "
                          "or cores in your machine for best results. "
                          "Pass a negative number to have the number of "
                          "processes automatically set to the number of "
                          "cores. Passing 0 means to disable parallel "
                          "testing. Default is 0 unless NOSE_PROCESSES is "
                          "set. "
                          "[NOSE_PROCESSES]")
        parser.add_option("--process-timeout", action="store",
                          default=env.get('NOSE_PROCESS_TIMEOUT', 10),
                          dest="multiprocess_timeout",
                          metavar="SECONDS",
                          help="Set timeout for return of results from each "
                          "test runner process. Default is 10. "
                          "[NOSE_PROCESS_TIMEOUT]")
        parser.add_option("--process-restartworker", action="store_true",
                          default=env.get('NOSE_PROCESS_RESTARTWORKER', False),
                          dest="multiprocess_restartworker",
                          help="If set, will restart each worker process once "
                          "its tests are done; this helps keep memory "
                          "leaks from killing the system. "
                          "[NOSE_PROCESS_RESTARTWORKER]")

    def configure(self, options, config):
        """
        Configure plugin.
        """
        try:
            self.status.pop('active')
        except KeyError:
            pass
        if not hasattr(options, 'multiprocess_workers'):
            self.enabled = False
            return
        # don't start inside of a worker process
        if config.worker:
            return
        self.config = config
        try:
            workers = int(options.multiprocess_workers)
        except (TypeError, ValueError):
            workers = 0
        if workers:
            _import_mp()
            if Process is None:
                self.enabled = False
                return
            # Negative number of workers will cause multiprocessing to hang.
            # Set the number of workers to the CPU count to avoid this.
            if workers < 0:
                try:
                    import multiprocessing
                    workers = multiprocessing.cpu_count()
                except NotImplementedError:
                    self.enabled = False
                    return
            self.enabled = True
            self.config.multiprocess_workers = workers
            t = float(options.multiprocess_timeout)
            self.config.multiprocess_timeout = t
            r = int(options.multiprocess_restartworker)
            self.config.multiprocess_restartworker = r
            self.status['active'] = True

    def prepareTestLoader(self, loader):
        """Remember loader class so MultiProcessTestRunner can instantiate
        the right loader.
        """
        self.loaderClass = loader.__class__

    def prepareTestRunner(self, runner):
        """Replace test runner with MultiProcessTestRunner.
        """
        # replace with our runner class
        return MultiProcessTestRunner(stream=runner.stream,
                                      verbosity=self.config.verbosity,
                                      config=self.config,
                                      loaderClass=self.loaderClass)


def signalhandler(sig, frame):
    raise TimedOutException()
class MultiProcessTestRunner(TextTestRunner):
    waitkilltime = 5.0  # max time to wait to terminate a process that does
                        # not respond to SIGILL

    def __init__(self, **kw):
        self.loaderClass = kw.pop('loaderClass', loader.defaultTestLoader)
        super(MultiProcessTestRunner, self).__init__(**kw)

    def collect(self, test, testQueue, tasks, to_teardown, result):
        # dispatch and collect results
        # put indexes only on queue because tests aren't picklable
        for case in self.nextBatch(test):
            log.debug("Next batch %s (%s)", case, type(case))
            if (isinstance(case, nose.case.Test) and
                    isinstance(case.test, failure.Failure)):
                log.debug("Case is a Failure")
                case(result)  # run here to capture the failure
                continue
            # handle shared fixtures
            if isinstance(case, ContextSuite) and case.context is failure.Failure:
                log.debug("Case is a Failure")
                case(result)  # run here to capture the failure
                continue
            elif isinstance(case, ContextSuite) and self.sharedFixtures(case):
                log.debug("%s has shared fixtures", case)
                try:
                    case.setUp()
                except (KeyboardInterrupt, SystemExit):
                    raise
                except:
                    log.debug("%s setup failed", sys.exc_info())
                    result.addError(case, sys.exc_info())
                else:
                    to_teardown.append(case)
                    if case.factory:
                        ancestors = case.factory.context.get(case, [])
                        for an in ancestors[:2]:
                            #log.debug('reset ancestor %s', an)
                            if getattr(an, '_multiprocess_shared_', False):
                                an._multiprocess_can_split_ = True
                            #an._multiprocess_shared_ = False
                    self.collect(case, testQueue, tasks, to_teardown, result)
            else:
                test_addr = self.addtask(testQueue, tasks, case)
                log.debug("Queued test %s (%s) to %s",
                          len(tasks), test_addr, testQueue)

    def startProcess(self, iworker, testQueue, resultQueue, shouldStop, result):
        currentaddr = Value('c', bytes_(''))
        currentstart = Value('d', time.time())
        keyboardCaught = Event()
        p = Process(target=runner,
                    args=(iworker, testQueue,
                          resultQueue,
                          currentaddr,
                          currentstart,
                          keyboardCaught,
                          shouldStop,
                          self.loaderClass,
                          result.__class__,
                          pickle.dumps(self.config)))
        p.currentaddr = currentaddr
        p.currentstart = currentstart
        p.keyboardCaught = keyboardCaught
        old = signal.signal(signal.SIGILL, signalhandler)
        p.start()
        signal.signal(signal.SIGILL, old)
        return p
    def run(self, test):
        """
        Execute the test (which may be a test suite). If the test is a suite,
        distribute it out among as many processes as have been configured, at
        as fine a level as is possible given the context fixtures defined in
        the suite or any sub-suites.
        """
        log.debug("%s.run(%s) (%s)", self, test, os.getpid())
        wrapper = self.config.plugins.prepareTest(test)
        if wrapper is not None:
            test = wrapper

        # plugins can decorate or capture the output stream
        wrapped = self.config.plugins.setOutputStream(self.stream)
        if wrapped is not None:
            self.stream = wrapped

        testQueue = Queue()
        resultQueue = Queue()
        tasks = []
        completed = []
        workers = []
        to_teardown = []
        shouldStop = Event()

        result = self._makeResult()
        start = time.time()

        self.collect(test, testQueue, tasks, to_teardown, result)

        log.debug("Starting %s workers", self.config.multiprocess_workers)
        for i in range(self.config.multiprocess_workers):
            p = self.startProcess(i, testQueue, resultQueue, shouldStop, result)
            workers.append(p)
            log.debug("Started worker process %s", i + 1)

        total_tasks = len(tasks)
        # need to keep track of the next time to check for timeouts in case
        # more than one process times out at the same time.
        nexttimeout = self.config.multiprocess_timeout
        thrownError = None

        try:
            while tasks:
                log.debug("Waiting for results (%s/%s tasks), next timeout=%.3fs",
                          len(completed), total_tasks, nexttimeout)
                try:
                    iworker, addr, newtask_addrs, batch_result = resultQueue.get(
                        timeout=nexttimeout)
                    log.debug('Results received for worker %d, %s, new tasks: %d',
                              iworker, addr, len(newtask_addrs))
                    try:
                        try:
                            tasks.remove(addr)
                        except ValueError:
                            log.warn('worker %s failed to remove from tasks: %s',
                                     iworker, addr)
                        total_tasks += len(newtask_addrs)
                        tasks.extend(newtask_addrs)
                    except KeyError:
                        log.debug("Got result for unknown task? %s", addr)
                        log.debug("current: %s", str(list(tasks)[0]))
                    else:
                        completed.append([addr, batch_result])
                    self.consolidate(result, batch_result)
                    if (self.config.stopOnError
                            and not result.wasSuccessful()):
                        # set the stop condition
                        shouldStop.set()
                        break
                    if self.config.multiprocess_restartworker:
                        log.debug('joining worker %s', iworker)
                        # wait for the worker; it is not that important if the
                        # worker cannot be joined: in fact, workers that add
                        # to testQueue will not terminate until all their
                        # items are read
                        workers[iworker].join(timeout=1)
                        if not shouldStop.is_set() and not testQueue.empty():
                            log.debug('starting new process on worker %s',
                                      iworker)
                            workers[iworker] = self.startProcess(
                                iworker, testQueue, resultQueue, shouldStop,
                                result)
                except Empty:
                    log.debug("Timed out with %s tasks pending "
                              "(empty testQueue=%r): %s",
                              len(tasks), testQueue.empty(), str(tasks))
                    any_alive = False
                    for iworker, w in enumerate(workers):
                        if w.is_alive():
                            worker_addr = bytes_(w.currentaddr.value, 'ascii')
                            timeprocessing = time.time() - w.currentstart.value
                            if (len(worker_addr) == 0
                                    and timeprocessing > self.config.multiprocess_timeout - 0.1):
                                log.debug('worker %d has finished its work item, '
                                          'but is not exiting? do we wait for it?',
                                          iworker)
                            else:
                                any_alive = True
                            if (len(worker_addr) > 0
                                    and timeprocessing > self.config.multiprocess_timeout - 0.1):
                                log.debug('timed out worker %s: %s',
                                          iworker, worker_addr)
                                w.currentaddr.value = bytes_('')
                                # If the process is in C++ code, sending a SIGILL
                                # might not raise a python KeyboardInterrupt
                                # exception; therefore, send multiple signals
                                # until an exception is caught. If this takes
                                # too long, then terminate the process
                                w.keyboardCaught.clear()
                                startkilltime = time.time()
                                while not w.keyboardCaught.is_set() and w.is_alive():
                                    if time.time() - startkilltime > self.waitkilltime:
                                        # have to terminate...
                                        log.error("terminating worker %s",
                                                  iworker)
                                        w.terminate()
                                        # there is a small probability that the
                                        # terminated process might send a result,
                                        # which has to be specially handled or
                                        # else processes might get orphaned.
                                        workers[iworker] = w = self.startProcess(
                                            iworker, testQueue, resultQueue,
                                            shouldStop, result)
                                        break
                                    os.kill(w.pid, signal.SIGILL)
                                    time.sleep(0.1)
                    if not any_alive and testQueue.empty():
                        log.debug("All workers dead")
                        break
                nexttimeout = self.config.multiprocess_timeout
                for w in workers:
                    if w.is_alive() and len(w.currentaddr.value) > 0:
                        timeprocessing = time.time() - w.currentstart.value
                        if timeprocessing <= self.config.multiprocess_timeout:
                            nexttimeout = min(nexttimeout,
                                self.config.multiprocess_timeout - timeprocessing)
            log.debug("Completed %s tasks (%s remain)", len(completed),
                      len(tasks))
        except (KeyboardInterrupt, SystemExit), e:
            log.info('parent received ctrl-c when waiting for test results')
            thrownError = e
            #resultQueue.get(False)
            result.addError(test, sys.exc_info())

        try:
            for case in to_teardown:
                log.debug("Tearing down shared fixtures for %s", case)
                try:
                    case.tearDown()
                except (KeyboardInterrupt, SystemExit):
                    raise
                except:
                    result.addError(case, sys.exc_info())

            stop = time.time()

            # write results first, since shutting down processes can freeze
            result.printErrors()
            result.printSummary(start, stop)
            self.config.plugins.finalize(result)

            if thrownError is None:
                log.debug("Tell all workers to stop")
                for w in workers:
                    if w.is_alive():
                        testQueue.put('STOP', block=False)

            # wait for the workers to end
            for iworker, worker in enumerate(workers):
                if worker.is_alive():
                    log.debug('joining worker %s', iworker)
                    worker.join()
                    if worker.is_alive():
                        log.debug('failed to join worker %s', iworker)
        except (KeyboardInterrupt, SystemExit):
            log.info('parent received ctrl-c when shutting down: '
                     'stop all processes')
            for worker in workers:
                if worker.is_alive():
                    worker.terminate()

            if thrownError:
                raise thrownError
            else:
                raise

        return result
    def addtask(testQueue, tasks, case):
        arg = None
        if isinstance(case, nose.case.Test) and hasattr(case.test, 'arg'):
            # this removes the top level descriptor and allows real function
            # name to be returned
            case.test.descriptor = None
            arg = case.test.arg
        test_addr = MultiProcessTestRunner.address(case)
        testQueue.put((test_addr, arg), block=False)
        if arg is not None:
            test_addr += str(arg)
        if tasks is not None:
            tasks.append(test_addr)
        return test_addr
    addtask = staticmethod(addtask)

    def address(case):
        if hasattr(case, 'address'):
            file, mod, call = case.address()
        elif hasattr(case, 'context'):
            file, mod, call = test_address(case.context)
        else:
            raise Exception("Unable to convert %s to address" % case)
        parts = []
        if file is None:
            if mod is None:
                raise Exception("Unaddressable case %s" % case)
            else:
                parts.append(mod)
        else:
            # strip __init__.py(c) from end of file part
            # if present, having it there confuses loader
            dirname, basename = os.path.split(file)
            if basename.startswith('__init__'):
                file = dirname
            parts.append(file)
        if call is not None:
            parts.append(call)
        return ':'.join(map(str, parts))
    address = staticmethod(address)
    def nextBatch(self, test):
        # allows tests or suites to mark themselves as not safe
        # for multiprocess execution
        if hasattr(test, 'context'):
            if not getattr(test.context, '_multiprocess_', True):
                return

        if ((isinstance(test, ContextSuite)
             and test.hasFixtures(self.checkCanSplit))
                or not getattr(test, 'can_split', True)
                or not isinstance(test, unittest.TestSuite)):
            # regular test case, or a suite with context fixtures

            # special case: when run like nosetests path/to/module.py
            # the top-level suite has only one item, and it shares
            # the same context as that item. In that case, we want the
            # item, not the top-level suite
            if isinstance(test, ContextSuite):
                contained = list(test)
                if (len(contained) == 1
                        and getattr(contained[0],
                                    'context', None) == test.context):
                    test = contained[0]
            yield test
        else:
            # Suite is without fixtures at this level; but it may have
            # fixtures at any deeper level, so we need to examine it all
            # the way down to the case level
            for case in test:
                for batch in self.nextBatch(case):
                    yield batch

    def checkCanSplit(context, fixt):
        """
        Callback that we use to check whether the fixtures found in a
        context or ancestor are ones we care about.

        Contexts can tell us that their fixtures are reentrant by setting
        _multiprocess_can_split_. So if we see that, we return False to
        disregard those fixtures.
        """
        if not fixt:
            return False
        if getattr(context, '_multiprocess_can_split_', False):
            return False
        return True
    checkCanSplit = staticmethod(checkCanSplit)

    def sharedFixtures(self, case):
        context = getattr(case, 'context', None)
        if not context:
            return False
        return getattr(context, '_multiprocess_shared_', False)
    def consolidate(self, result, batch_result):
        log.debug("batch result is %s", batch_result)
        try:
            output, testsRun, failures, errors, errorClasses = batch_result
        except ValueError:
            log.debug("result in unexpected format %s", batch_result)
            failure.Failure(*sys.exc_info())(result)
            return
        self.stream.write(output)
        result.testsRun += testsRun
        result.failures.extend(failures)
        result.errors.extend(errors)
        for key, (storage, label, isfail) in errorClasses.items():
            if key not in result.errorClasses:
                # Ordinarily storage is result attribute
                # but it's only processed through the errorClasses
                # dict, so it's ok to fake it here
                result.errorClasses[key] = ([], label, isfail)
            mystorage, _junk, _junk = result.errorClasses[key]
            mystorage.extend(storage)
        log.debug("Ran %s tests (total: %s)", testsRun, result.testsRun)
def runner(ix, testQueue, resultQueue, currentaddr, currentstart,
           keyboardCaught, shouldStop, loaderClass, resultClass, config):
    try:
        try:
            return __runner(ix, testQueue, resultQueue, currentaddr,
                            currentstart, keyboardCaught, shouldStop,
                            loaderClass, resultClass, config)
        except KeyboardInterrupt:
            log.debug('Worker %s keyboard interrupt, stopping', ix)
    except Empty:
        log.debug("Worker %s timed out waiting for tasks", ix)


def __runner(ix, testQueue, resultQueue, currentaddr, currentstart,
             keyboardCaught, shouldStop, loaderClass, resultClass, config):
    config = pickle.loads(config)
    dummy_parser = config.parserClass()
    if _instantiate_plugins is not None:
        for pluginclass in _instantiate_plugins:
            plugin = pluginclass()
            plugin.addOptions(dummy_parser, {})
            config.plugins.addPlugin(plugin)
    config.plugins.configure(config.options, config)
    config.plugins.begin()
    log.debug("Worker %s executing, pid=%d", ix, os.getpid())
    loader = loaderClass(config=config)
    loader.suiteClass.suiteClass = NoSharedFixtureContextSuite

    def get():
        return testQueue.get(timeout=config.multiprocess_timeout)

    def makeResult():
        stream = _WritelnDecorator(StringIO())
        result = resultClass(stream, descriptions=1,
                             verbosity=config.verbosity,
                             config=config)
        plug_result = config.plugins.prepareTestResult(result)
        if plug_result:
            return plug_result
        return result

    def batch(result):
        failures = [(TestLet(c), err) for c, err in result.failures]
        errors = [(TestLet(c), err) for c, err in result.errors]
        errorClasses = {}
        for key, (storage, label, isfail) in result.errorClasses.items():
            errorClasses[key] = ([(TestLet(c), err) for c, err in storage],
                                 label, isfail)
        return (
            result.stream.getvalue(),
            result.testsRun,
            failures,
            errors,
            errorClasses)

    for test_addr, arg in iter(get, 'STOP'):
        if shouldStop.is_set():
            log.exception('Worker %d STOPPED', ix)
            break
        result = makeResult()
        test = loader.loadTestsFromNames([test_addr])
        test.testQueue = testQueue
        test.tasks = []
        test.arg = arg
        log.debug("Worker %s Test is %s (%s)", ix, test_addr, test)
        try:
            if arg is not None:
                test_addr = test_addr + str(arg)
            currentaddr.value = bytes_(test_addr)
            currentstart.value = time.time()
            test(result)
            currentaddr.value = bytes_('')
            resultQueue.put((ix, test_addr, test.tasks, batch(result)))
        except KeyboardInterrupt, e:  # TimedOutException
            timeout = isinstance(e, TimedOutException)
            if timeout:
                keyboardCaught.set()
            if len(currentaddr.value):
                if timeout:
                    msg = 'Worker %s timed out, failing current test %s'
                else:
                    msg = 'Worker %s keyboard interrupt, failing current test %s'
                log.exception(msg, ix, test_addr)
                currentaddr.value = bytes_('')
                failure.Failure(*sys.exc_info())(result)
                resultQueue.put((ix, test_addr, test.tasks, batch(result)))
            else:
                if timeout:
                    msg = 'Worker %s test %s timed out'
                else:
                    msg = 'Worker %s test %s keyboard interrupt'
                log.debug(msg, ix, test_addr)
                resultQueue.put((ix, test_addr, test.tasks, batch(result)))
            if not timeout:
                raise
        except SystemExit:
            currentaddr.value = bytes_('')
            log.exception('Worker %s system exit', ix)
            raise
        except:
            currentaddr.value = bytes_('')
            log.exception("Worker %s error running test or returning "
                          "results", ix)
            failure.Failure(*sys.exc_info())(result)
            resultQueue.put((ix, test_addr, test.tasks, batch(result)))
        if config.multiprocess_restartworker:
            break
    log.debug("Worker %s ending", ix)
class NoSharedFixtureContextSuite(ContextSuite):
    """
    Context suite that never fires shared fixtures.

    When a context sets _multiprocess_shared_, fixtures in that context
    are executed by the main process. Using this suite class prevents them
    from executing in the runner process as well.
    """
    testQueue = None
    tasks = None
    arg = None

    def setupContext(self, context):
        if getattr(context, '_multiprocess_shared_', False):
            return
        super(NoSharedFixtureContextSuite, self).setupContext(context)

    def teardownContext(self, context):
        if getattr(context, '_multiprocess_shared_', False):
            return
        super(NoSharedFixtureContextSuite, self).teardownContext(context)

    def run(self, result):
        """Run tests in suite inside of suite fixtures.
        """
        # proxy the result for myself
        log.debug("suite %s (%s) run called, tests: %s",
                  id(self), self, self._tests)
        if self.resultProxy:
            result, orig = self.resultProxy(result, self), result
        else:
            result, orig = result, result
        try:
            #log.debug('setUp for %s', id(self))
            self.setUp()
        except KeyboardInterrupt:
            raise
        except:
            self.error_context = 'setup'
            result.addError(self, self._exc_info())
            return
        try:
            for test in self._tests:
                if (isinstance(test, nose.case.Test)
                        and self.arg is not None):
                    test.test.arg = self.arg
                else:
                    test.arg = self.arg
                test.testQueue = self.testQueue
                test.tasks = self.tasks
                if result.shouldStop:
                    log.debug("stopping")
                    break
                # each nose.case.Test will create its own result proxy
                # so the cases need the original result, to avoid proxy
                # chains
                #log.debug('running test %s in suite %s', test, self)
                try:
                    test(orig)
                except KeyboardInterrupt, e:
                    timeout = isinstance(e, TimedOutException)
                    if timeout:
                        msg = 'Timeout when running test %s in suite %s'
                    else:
                        msg = 'KeyboardInterrupt when running test %s in suite %s'
                    log.debug(msg, test, self)
                    err = (TimedOutException, TimedOutException(str(test)),
                           sys.exc_info()[2])
                    test.config.plugins.addError(test, err)
                    orig.addError(test, err)
                    if not timeout:
                        raise
        finally:
            self.has_run = True
            try:
                #log.debug('tearDown for %s', id(self))
                self.tearDown()
            except KeyboardInterrupt:
                raise
            except:
                self.error_context = 'teardown'
                result.addError(self, self._exc_info())