12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835 |
- """
- Overview
- ========
- The multiprocess plugin enables you to distribute your test run among a set of
- worker processes that run tests in parallel. This can speed up CPU-bound test
- runs (as long as the number of worker processes is around the number of
- processors or cores available), but is mainly useful for IO-bound tests that
- spend most of their time waiting for data to arrive from someplace else.
- .. note ::
- See :doc:`../doc_tests/test_multiprocess/multiprocess` for
- additional documentation and examples. Use of this plugin on python
- 2.5 or earlier requires the multiprocessing_ module, also available
- from PyPI.
- .. _multiprocessing : http://code.google.com/p/python-multiprocessing/
- How tests are distributed
- =========================
- The ideal case would be to dispatch each test to a worker process
- separately. This ideal is not attainable in all cases, however, because many
- test suites depend on context (class, module or package) fixtures.
- The plugin can't know (unless you tell it -- see below!) if a context fixture
- can be called many times concurrently (is re-entrant), or if it can be shared
- among tests running in different processes. Therefore, if a context has
- fixtures, the default behavior is to dispatch the entire suite to a worker as
- a unit.
- Controlling distribution
- ^^^^^^^^^^^^^^^^^^^^^^^^
- There are two context-level variables that you can use to control this default
- behavior.
- If a context's fixtures are re-entrant, set ``_multiprocess_can_split_ = True``
- in the context, and the plugin will dispatch tests in suites bound to that
- context as if the context had no fixtures. This means that the fixtures will
- execute concurrently and multiple times, typically once per test.
- If a context's fixtures can be shared by tests running in different processes
- -- such as a package-level fixture that starts an external http server or
- initializes a shared database -- then set ``_multiprocess_shared_ = True`` in
- the context. These fixtures will then execute in the primary nose process, and
- tests in those contexts will be individually dispatched to run in parallel.
- How results are collected and reported
- ======================================
- As each test or suite executes in a worker process, results (failures, errors,
- and specially handled exceptions like SkipTest) are collected in that
- process. When the worker process finishes, it returns results to the main
- nose process. There, any progress output is printed (dots!), and the
- results from the test run are combined into a consolidated result
- set. When results have been received for all dispatched tests, or all
- workers have died, the result summary is output as normal.
- Beware!
- =======
- Not all test suites will benefit from, or even operate correctly using, this
- plugin. For example, CPU-bound tests will run more slowly if you don't have
- multiple processors. There are also some differences in plugin
- interactions and behaviors due to the way in which tests are dispatched and
- loaded. In general, test loading under this plugin operates as if it were
- always in directed mode instead of discovered mode. For instance, doctests
- in test modules will always be found when using this plugin with the doctest
- plugin.
- But the biggest issue you will face is probably concurrency. Unless you
- have kept your tests as religiously pure unit tests, with no side-effects, no
- ordering issues, and no external dependencies, chances are you will experience
- odd, intermittent and unexplainable failures and errors when using this
- plugin. This doesn't necessarily mean the plugin is broken; it may mean that
- your test suite is not safe for concurrency.
- New Features in 1.1.0
- =====================
- * functions generated by test generators are now added to the worker queue
- making them multi-threaded.
- * fixed timeout functionality, now functions will be terminated with a
- TimedOutException exception when they exceed their execution time. The
- worker processes are not terminated.
- * added ``--process-restartworker`` option to restart workers once they are
- done, this helps control memory usage. Sometimes memory leaks can accumulate
- making long runs very difficult.
- * added global _instantiate_plugins to configure which plugins are started
- on the worker processes.
- """
- import logging
- import os
- import sys
- import time
- import traceback
- import unittest
- import pickle
- import signal
- import nose.case
- from nose.core import TextTestRunner
- from nose import failure
- from nose import loader
- from nose.plugins.base import Plugin
- from nose.pyversion import bytes_
- from nose.result import TextTestResult
- from nose.suite import ContextSuite
- from nose.util import test_address
- try:
- # 2.7+
- from unittest.runner import _WritelnDecorator
- except ImportError:
- from unittest import _WritelnDecorator
- from Queue import Empty
- from warnings import warn
- try:
- from cStringIO import StringIO
- except ImportError:
- import StringIO
# Optional list of plugin classes. When it is not None, every worker process
# instantiates each class in it and registers the instance with its config.
_instantiate_plugins = None

log = logging.getLogger(__name__)

# Placeholders for the multiprocessing primitives. They stay None until
# _import_mp() has imported multiprocessing and rebinds them.
Process = None
Queue = None
Pool = None
Event = None
Value = None
Array = None
# Has to inherit from KeyboardInterrupt so that it interrupts the process
# properly.
class TimedOutException(KeyboardInterrupt):
    """Exception raised when a test runs longer than its allowed time.

    The message is stored on ``value``; ``str()`` of the exception is the
    ``repr`` of that message.
    """

    def __init__(self, value="Timed Out"):
        self.value = value

    def __str__(self):
        return "%r" % (self.value,)
def _import_mp():
    """Import multiprocessing and bind its primitives to module globals.

    On success ``Process`` is the multiprocessing class and ``Queue``,
    ``Pool``, ``Event``, ``Value`` and ``Array`` are the factory methods of a
    freshly started ``Manager``. If multiprocessing cannot be imported a
    ``RuntimeWarning`` is issued instead.
    """
    global Process, Queue, Pool, Event, Value, Array
    try:
        from multiprocessing import Manager, Process
        # Prevent the server process created by the manager (which holds
        # Python objects and lets other processes manipulate them through
        # proxies) from being interrupted by SIGINT, so that the
        # communication channel between the subprocesses and the main
        # process is still usable after ctrl+C is received in the main
        # process.
        old = signal.signal(signal.SIGINT, signal.SIG_IGN)
        try:
            m = Manager()
        finally:
            # Always put the previous handler back -- even when the manager
            # fails to start -- so the main process keeps receiving
            # KeyboardInterrupt on ctrl+c.
            signal.signal(signal.SIGINT, old)
        Queue, Pool, Event, Value, Array = (
            m.Queue, m.Pool, m.Event, m.Value, m.Array
        )
    except ImportError:
        warn("multiprocessing module is not available, multiprocess plugin "
             "cannot be used", RuntimeWarning)
class TestLet:
    """Snapshot of a test case's id, short description and string form.

    The values are read from the case once, at construction time, and are
    returned unchanged by the accessor methods afterwards.
    """

    def __init__(self, case):
        try:
            case_id = case.id()
        except AttributeError:
            # No usable id() on the case: ``_id`` is deliberately left unset,
            # so a later call to id() raises AttributeError.
            pass
        else:
            self._id = case_id
        self._short_description = case.shortDescription()
        self._str = str(case)

    def id(self):
        """Return the id captured from the wrapped case."""
        return self._id

    def shortDescription(self):
        """Return the short description captured from the wrapped case."""
        return self._short_description

    def __str__(self):
        return self._str
class MultiProcess(Plugin):
    """
    Run tests in multiple processes. Requires multiprocessing module.
    """
    score = 1000
    # Shared by all instances (class-level dict): holds the key 'active'
    # while the plugin is configured with a non-zero worker count.
    status = {}

    def options(self, parser, env):
        """
        Register command-line options.

        Defaults are taken from the NOSE_PROCESSES, NOSE_PROCESS_TIMEOUT and
        NOSE_PROCESS_RESTARTWORKER entries of ``env`` when present.
        """
        parser.add_option("--processes", action="store",
                          default=env.get('NOSE_PROCESSES', 0),
                          dest="multiprocess_workers",
                          metavar="NUM",
                          help="Spread test run among this many processes. "
                          "Set a number equal to the number of processors "
                          "or cores in your machine for best results. "
                          "Pass a negative number to have the number of "
                          "processes automatically set to the number of "
                          "cores. Passing 0 means to disable parallel "
                          "testing. Default is 0 unless NOSE_PROCESSES is "
                          "set. "
                          "[NOSE_PROCESSES]")
        parser.add_option("--process-timeout", action="store",
                          default=env.get('NOSE_PROCESS_TIMEOUT', 10),
                          dest="multiprocess_timeout",
                          metavar="SECONDS",
                          help="Set timeout for return of results from each "
                          "test runner process. Default is 10. "
                          "[NOSE_PROCESS_TIMEOUT]")
        parser.add_option("--process-restartworker", action="store_true",
                          default=env.get('NOSE_PROCESS_RESTARTWORKER', False),
                          dest="multiprocess_restartworker",
                          help="If set, will restart each worker process once"
                          " their tests are done, this helps control memory "
                          "leaks from killing the system. "
                          "[NOSE_PROCESS_RESTARTWORKER]")

    def configure(self, options, config):
        """
        Configure plugin.

        Enables the plugin only when a non-zero worker count was requested
        and multiprocessing could be imported; a negative count is replaced
        by the machine's CPU count. The worker count, timeout and
        restart-worker flag are stored on ``config``.
        """
        # forget any state left over from a previous configuration
        self.status.pop('active', None)
        if not hasattr(options, 'multiprocess_workers'):
            self.enabled = False
            return
        # don't start inside of a worker process
        if config.worker:
            return
        self.config = config
        try:
            workers = int(options.multiprocess_workers)
        except (TypeError, ValueError):
            workers = 0
        if workers:
            _import_mp()
            if Process is None:
                # multiprocessing is unavailable
                self.enabled = False
                return
            # Negative number of workers will cause multiprocessing to hang.
            # Set the number of workers to the CPU count to avoid this.
            if workers < 0:
                try:
                    import multiprocessing
                    workers = multiprocessing.cpu_count()
                except NotImplementedError:
                    self.enabled = False
                    return
            self.enabled = True
            self.config.multiprocess_workers = workers
            self.config.multiprocess_timeout = float(
                options.multiprocess_timeout)
            self.config.multiprocess_restartworker = int(
                options.multiprocess_restartworker)
            self.status['active'] = True

    def prepareTestLoader(self, loader):
        """Remember loader class so MultiProcessTestRunner can instantiate
        the right loader.
        """
        self.loaderClass = loader.__class__

    def prepareTestRunner(self, runner):
        """Replace test runner with MultiProcessTestRunner.

        The replacement reuses the original runner's stream and this
        plugin's config and remembered loader class.
        """
        return MultiProcessTestRunner(stream=runner.stream,
                                      verbosity=self.config.verbosity,
                                      config=self.config,
                                      loaderClass=self.loaderClass)
def signalhandler(sig, frame):
    """Signal handler that turns a delivered signal into a TimedOutException.

    Both arguments (the signal number and the interrupted frame) are ignored.
    """
    raise TimedOutException()
class MultiProcessTestRunner(TextTestRunner):
    """Test runner that dispatches tests to worker processes.

    Test addresses are placed on a shared task queue, worker processes run
    them and send their results back on a result queue, and the results are
    merged into a single result object in this (the parent) process.
    """
    # max time (in seconds) to wait before terminating a process that does
    # not respond to SIGILL
    waitkilltime = 5.0

    def __init__(self, **kw):
        """Accept the TextTestRunner keywords plus an optional ``loaderClass``
        (defaults to ``loader.defaultTestLoader``)."""
        self.loaderClass = kw.pop('loaderClass', loader.defaultTestLoader)
        super(MultiProcessTestRunner, self).__init__(**kw)

    def collect(self, test, testQueue, tasks, to_teardown, result):
        """Queue every batch of ``test`` for the workers.

        Failures are run immediately against ``result``. Suites whose
        context declares shared fixtures are set up here, remembered in
        ``to_teardown`` and then collected recursively. Everything else is
        put on ``testQueue`` and recorded in ``tasks``.
        """
        # dispatch and collect results
        # put indexes only on queue because tests aren't picklable
        for case in self.nextBatch(test):
            log.debug("Next batch %s (%s)", case, type(case))
            is_suite = isinstance(case, ContextSuite)
            if ((isinstance(case, nose.case.Test)
                 and isinstance(case.test, failure.Failure))
                    or (is_suite and case.context is failure.Failure)):
                log.debug("Case is a Failure")
                case(result)  # run here to capture the failure
                continue
            # handle shared fixtures
            if is_suite and self.sharedFixtures(case):
                log.debug("%s has shared fixtures", case)
                try:
                    case.setUp()
                except (KeyboardInterrupt, SystemExit):
                    raise
                except:
                    log.debug("%s setup failed", sys.exc_info())
                    result.addError(case, sys.exc_info())
                else:
                    to_teardown.append(case)
                    if case.factory:
                        ancestors = case.factory.context.get(case, [])
                        for an in ancestors[:2]:
                            # the shared fixture has already run here, so the
                            # context may now be split among the workers
                            if getattr(an, '_multiprocess_shared_', False):
                                an._multiprocess_can_split_ = True
                    self.collect(case, testQueue, tasks, to_teardown, result)
            else:
                test_addr = self.addtask(testQueue, tasks, case)
                log.debug("Queued test %s (%s) to %s",
                          len(tasks), test_addr, testQueue)

    def startProcess(self, iworker, testQueue, resultQueue, shouldStop, result):
        """Start and return worker process number ``iworker``.

        The shared ``currentaddr``, ``currentstart`` and ``keyboardCaught``
        objects handed to the worker are also attached to the returned
        process object so the parent can inspect them later.
        """
        currentaddr = Value('c', bytes_(''))
        currentstart = Value('d', time.time())
        keyboardCaught = Event()
        p = Process(target=runner,
                    args=(iworker, testQueue,
                          resultQueue,
                          currentaddr,
                          currentstart,
                          keyboardCaught,
                          shouldStop,
                          self.loaderClass,
                          result.__class__,
                          pickle.dumps(self.config)))
        p.currentaddr = currentaddr
        p.currentstart = currentstart
        p.keyboardCaught = keyboardCaught
        # The timeout handler is installed only while the process is being
        # started; the previous handler is restored even if start() fails.
        old = signal.signal(signal.SIGILL, signalhandler)
        try:
            p.start()
        finally:
            signal.signal(signal.SIGILL, old)
        return p

    def _interruptTimedOutWorkers(self, workers, testQueue, resultQueue,
                                  shouldStop, result):
        """Signal every live worker whose current test exceeded the timeout.

        A worker that does not acknowledge the signal within
        ``waitkilltime`` seconds is terminated and replaced in ``workers``.
        Returns True when at least one worker is still considered alive.
        """
        limit = self.config.multiprocess_timeout - 0.1
        any_alive = False
        for iworker, w in enumerate(workers):
            if not w.is_alive():
                continue
            worker_addr = bytes_(w.currentaddr.value, 'ascii')
            timeprocessing = time.time() - w.currentstart.value
            overdue = timeprocessing > limit
            if len(worker_addr) == 0 and overdue:
                log.debug('worker %d has finished its work item, '
                          'but is not exiting? do we wait for it?',
                          iworker)
            else:
                any_alive = True
            if len(worker_addr) > 0 and overdue:
                log.debug('timed out worker %s: %s',
                          iworker, worker_addr)
                w.currentaddr.value = bytes_('')
                # If the process is in C++ code, sending a SIGILL
                # might not send a python KeybordInterrupt exception
                # therefore, send multiple signals until an
                # exception is caught. If this takes too long, then
                # terminate the process
                w.keyboardCaught.clear()
                startkilltime = time.time()
                while not w.keyboardCaught.is_set() and w.is_alive():
                    if time.time() - startkilltime > self.waitkilltime:
                        # have to terminate...
                        log.error("terminating worker %s", iworker)
                        w.terminate()
                        # there is a small probability that the
                        # terminated process might send a result,
                        # which has to be specially handled or
                        # else processes might get orphaned.
                        workers[iworker] = w = self.startProcess(
                            iworker, testQueue, resultQueue, shouldStop,
                            result)
                        break
                    os.kill(w.pid, signal.SIGILL)
                    time.sleep(0.1)
        return any_alive

    def _nextTimeout(self, workers):
        """Return how long to wait for the next result: the configured
        timeout, shortened to the smallest time remaining for any live
        worker that is currently running a test."""
        timeout = self.config.multiprocess_timeout
        nexttimeout = timeout
        for w in workers:
            if w.is_alive() and len(w.currentaddr.value) > 0:
                timeprocessing = time.time() - w.currentstart.value
                if timeprocessing <= timeout:
                    nexttimeout = min(nexttimeout, timeout - timeprocessing)
        return nexttimeout

    def run(self, test):
        """
        Execute the test (which may be a test suite). If the test is a suite,
        distribute it out among as many processes as have been configured, at
        as fine a level as is possible given the context fixtures defined in
        the suite or any sub-suites.

        Returns the consolidated result object. A KeyboardInterrupt or
        SystemExit received while waiting for results is recorded as an
        error; it is re-raised only if shutting down is interrupted as well.
        """
        log.debug("%s.run(%s) (%s)", self, test, os.getpid())
        wrapper = self.config.plugins.prepareTest(test)
        if wrapper is not None:
            test = wrapper

        # plugins can decorate or capture the output stream
        wrapped = self.config.plugins.setOutputStream(self.stream)
        if wrapped is not None:
            self.stream = wrapped

        testQueue = Queue()
        resultQueue = Queue()
        tasks = []
        completed = []
        workers = []
        to_teardown = []
        shouldStop = Event()

        result = self._makeResult()
        start = time.time()

        self.collect(test, testQueue, tasks, to_teardown, result)

        log.debug("Starting %s workers", self.config.multiprocess_workers)
        for i in range(self.config.multiprocess_workers):
            p = self.startProcess(i, testQueue, resultQueue, shouldStop, result)
            workers.append(p)
            log.debug("Started worker process %s", i+1)

        total_tasks = len(tasks)
        # need to keep track of the next time to check for timeouts in case
        # more than one process times out at the same time.
        nexttimeout = self.config.multiprocess_timeout
        thrownError = None

        try:
            while tasks:
                log.debug("Waiting for results (%s/%s tasks), next timeout=%.3fs",
                          len(completed), total_tasks, nexttimeout)
                try:
                    iworker, addr, newtask_addrs, batch_result = resultQueue.get(
                        timeout=nexttimeout)
                    log.debug('Results received for worker %d, %s, new tasks: %d',
                              iworker, addr, len(newtask_addrs))
                    try:
                        tasks.remove(addr)
                    except ValueError:
                        log.warning('worker %s failed to remove from tasks: %s',
                                    iworker, addr)
                    # tests generated by the worker become new pending tasks
                    total_tasks += len(newtask_addrs)
                    tasks.extend(newtask_addrs)
                    completed.append([addr, batch_result])
                    self.consolidate(result, batch_result)
                    if (self.config.stopOnError
                            and not result.wasSuccessful()):
                        # set the stop condition
                        shouldStop.set()
                        break
                    if self.config.multiprocess_restartworker:
                        log.debug('joining worker %s', iworker)
                        # wait for working, but not that important if worker
                        # cannot be joined in fact, for workers that add to
                        # testQueue, they will not terminate until all their
                        # items are read
                        workers[iworker].join(timeout=1)
                        if not shouldStop.is_set() and not testQueue.empty():
                            log.debug('starting new process on worker %s',
                                      iworker)
                            workers[iworker] = self.startProcess(
                                iworker, testQueue, resultQueue, shouldStop,
                                result)
                except Empty:
                    log.debug("Timed out with %s tasks pending "
                              "(empty testQueue=%r): %s",
                              len(tasks), testQueue.empty(), str(tasks))
                    any_alive = self._interruptTimedOutWorkers(
                        workers, testQueue, resultQueue, shouldStop, result)
                    if not any_alive and testQueue.empty():
                        log.debug("All workers dead")
                        break
                nexttimeout = self._nextTimeout(workers)
            log.debug("Completed %s tasks (%s remain)",
                      len(completed), len(tasks))
        except (KeyboardInterrupt, SystemExit):
            log.info('parent received ctrl-c when waiting for test results')
            # read the exception from sys.exc_info() so this parses on every
            # Python version
            thrownError = sys.exc_info()[1]
            result.addError(test, sys.exc_info())

        try:
            for case in to_teardown:
                log.debug("Tearing down shared fixtures for %s", case)
                try:
                    case.tearDown()
                except (KeyboardInterrupt, SystemExit):
                    raise
                except:
                    result.addError(case, sys.exc_info())

            stop = time.time()

            # first write since can freeze on shutting down processes
            result.printErrors()
            result.printSummary(start, stop)
            self.config.plugins.finalize(result)

            if thrownError is None:
                log.debug("Tell all workers to stop")
                for w in workers:
                    if w.is_alive():
                        testQueue.put('STOP', block=False)

            # wait for the workers to end
            for iworker, worker in enumerate(workers):
                if worker.is_alive():
                    log.debug('joining worker %s', iworker)
                    worker.join()
                    if worker.is_alive():
                        log.debug('failed to join worker %s', iworker)
        except (KeyboardInterrupt, SystemExit):
            log.info('parent received ctrl-c when shutting down: stop all processes')
            for worker in workers:
                if worker.is_alive():
                    worker.terminate()

            # prefer the interrupt that arrived first, if there was one
            if thrownError:
                raise thrownError
            else:
                raise

        return result

    @staticmethod
    def addtask(testQueue, tasks, case):
        """Put ``case`` on ``testQueue`` and return the task identifier.

        The queue receives ``(address, arg)``. The identifier is the address
        with ``str(arg)`` appended when ``arg`` is not None; it is appended
        to ``tasks`` unless ``tasks`` is None.
        """
        arg = None
        if isinstance(case, nose.case.Test) and hasattr(case.test, 'arg'):
            # this removes the top level descriptor and allows real function
            # name to be returned
            case.test.descriptor = None
            arg = case.test.arg
        test_addr = MultiProcessTestRunner.address(case)
        testQueue.put((test_addr, arg), block=False)
        if arg is not None:
            test_addr += str(arg)
        if tasks is not None:
            tasks.append(test_addr)
        return test_addr

    @staticmethod
    def address(case):
        """Return ``case``'s address as a ``file-or-module[:call]`` string.

        Raises Exception when the case offers neither ``address()`` nor a
        ``context``, or when both its file and its module are None.
        """
        if hasattr(case, 'address'):
            path, mod, call = case.address()
        elif hasattr(case, 'context'):
            path, mod, call = test_address(case.context)
        else:
            raise Exception("Unable to convert %s to address" % (case,))
        parts = []
        if path is None:
            if mod is None:
                raise Exception("Unaddressable case %s" % (case,))
            parts.append(mod)
        else:
            # strip __init__.py(c) from end of file part
            # if present, having it there confuses loader
            dirname, basename = os.path.split(path)
            if basename.startswith('__init__'):
                path = dirname
            parts.append(path)
        if call is not None:
            parts.append(call)
        return ':'.join(map(str, parts))

    def nextBatch(self, test):
        """Yield the units of ``test`` that are dispatched as a whole.

        Yields nothing when the test's context sets ``_multiprocess_`` to a
        false value.
        """
        # allows tests or suites to mark themselves as not safe
        # for multiprocess execution
        if hasattr(test, 'context'):
            if not getattr(test.context, '_multiprocess_', True):
                return

        if ((isinstance(test, ContextSuite)
             and test.hasFixtures(self.checkCanSplit))
                or not getattr(test, 'can_split', True)
                or not isinstance(test, unittest.TestSuite)):
            # regular test case, or a suite with context fixtures

            # special case: when run like nosetests path/to/module.py
            # the top-level suite has only one item, and it shares
            # the same context as that item. In that case, we want the
            # item, not the top-level suite
            if isinstance(test, ContextSuite):
                contained = list(test)
                if (len(contained) == 1
                        and getattr(contained[0],
                                    'context', None) == test.context):
                    test = contained[0]
            yield test
        else:
            # Suite is without fixtures at this level; but it may have
            # fixtures at any deeper level, so we need to examine it all
            # the way down to the case level
            for case in test:
                for batch in self.nextBatch(case):
                    yield batch

    @staticmethod
    def checkCanSplit(context, fixt):
        """
        Callback that we use to check whether the fixtures found in a
        context or ancestor are ones we care about.

        Contexts can tell us that their fixtures are reentrant by setting
        _multiprocess_can_split_. So if we see that, we return False to
        disregard those fixtures.
        """
        if not fixt:
            return False
        if getattr(context, '_multiprocess_can_split_', False):
            return False
        return True

    def sharedFixtures(self, case):
        """Return the ``_multiprocess_shared_`` flag of ``case``'s context,
        or False when the case has no context."""
        context = getattr(case, 'context', None)
        if not context:
            return False
        return getattr(context, '_multiprocess_shared_', False)

    def consolidate(self, result, batch_result):
        """Merge one worker's ``batch_result`` into ``result``.

        ``batch_result`` is expected to be the 5-tuple ``(output, testsRun,
        failures, errors, errorClasses)``; anything else is recorded on
        ``result`` as a Failure.
        """
        log.debug("batch result is %s", batch_result)
        try:
            output, testsRun, failures, errors, errorClasses = batch_result
        except (TypeError, ValueError):
            # TypeError covers a non-iterable result (e.g. None)
            log.debug("result in unexpected format %s", batch_result)
            failure.Failure(*sys.exc_info())(result)
            return
        self.stream.write(output)
        result.testsRun += testsRun
        result.failures.extend(failures)
        result.errors.extend(errors)
        for key, (storage, label, isfail) in errorClasses.items():
            if key not in result.errorClasses:
                # Ordinarily storage is result attribute
                # but it's only processed through the errorClasses
                # dict, so it's ok to fake it here
                result.errorClasses[key] = ([], label, isfail)
            result.errorClasses[key][0].extend(storage)
        log.debug("Ran %s tests (total: %s)", testsRun, result.testsRun)
def runner(ix, testQueue, resultQueue, currentaddr, currentstart,
           keyboardCaught, shouldStop, loaderClass, resultClass, config):
    """Entry point executed in each worker process.

    Delegates to ``__runner`` and returns its result. A KeyboardInterrupt or
    a queue ``Empty`` escaping from it is logged at debug level and
    swallowed, in which case None is returned.
    """
    try:
        return __runner(ix, testQueue, resultQueue, currentaddr,
                        currentstart, keyboardCaught, shouldStop,
                        loaderClass, resultClass, config)
    except KeyboardInterrupt:
        log.debug('Worker %s keyboard interrupt, stopping', ix)
    except Empty:
        log.debug("Worker %s timed out waiting for tasks", ix)
def __runner(ix, testQueue, resultQueue, currentaddr, currentstart,
             keyboardCaught, shouldStop, loaderClass, resultClass, config):
    """Worker loop: take test addresses from ``testQueue`` and run them.

    ``config`` arrives pickled. For every ``(test_addr, arg)`` item the test
    is loaded and run against a fresh result, and the tuple ``(ix,
    test_addr, test.tasks, batch)`` is put on ``resultQueue``. The loop ends
    on the ``'STOP'`` sentinel, when ``shouldStop`` is set, or after one test
    when ``config.multiprocess_restartworker`` is true. ``currentaddr`` and
    ``currentstart`` are updated around each test run.
    """
    # NOTE(review): the config is unpickled from data handed over by the
    # parent process; never feed this function data from an untrusted source.
    config = pickle.loads(config)
    dummy_parser = config.parserClass()
    if _instantiate_plugins is not None:
        for pluginclass in _instantiate_plugins:
            plugin = pluginclass()
            plugin.addOptions(dummy_parser, {})
            config.plugins.addPlugin(plugin)
    config.plugins.configure(config.options, config)
    config.plugins.begin()
    log.debug("Worker %s executing, pid=%d", ix, os.getpid())
    loader = loaderClass(config=config)
    loader.suiteClass.suiteClass = NoSharedFixtureContextSuite

    # ``StringIO`` is the module rather than the class when cStringIO was
    # unavailable and the plain ``import StringIO`` fallback was taken.
    new_buffer = getattr(StringIO, 'StringIO', StringIO)

    def get():
        return testQueue.get(timeout=config.multiprocess_timeout)

    def makeResult():
        stream = _WritelnDecorator(new_buffer())
        result = resultClass(stream, descriptions=1,
                             verbosity=config.verbosity,
                             config=config)
        plug_result = config.plugins.prepareTestResult(result)
        if plug_result:
            return plug_result
        return result

    def batch(result):
        # replace the test objects by TestLet instances before the result is
        # put on the result queue
        failures = [(TestLet(c), err) for c, err in result.failures]
        errors = [(TestLet(c), err) for c, err in result.errors]
        errorClasses = {}
        for key, (storage, label, isfail) in result.errorClasses.items():
            errorClasses[key] = ([(TestLet(c), err) for c, err in storage],
                                 label, isfail)
        return (
            result.stream.getvalue(),
            result.testsRun,
            failures,
            errors,
            errorClasses)

    for test_addr, arg in iter(get, 'STOP'):
        if shouldStop.is_set():
            # not inside an exception handler, so log.exception() would only
            # attach an empty traceback here
            log.debug('Worker %d STOPPED', ix)
            break
        result = makeResult()
        test = loader.loadTestsFromNames([test_addr])
        test.testQueue = testQueue
        test.tasks = []
        test.arg = arg
        log.debug("Worker %s Test is %s (%s)", ix, test_addr, test)
        try:
            if arg is not None:
                test_addr = test_addr + str(arg)
            currentaddr.value = bytes_(test_addr)
            currentstart.value = time.time()
            test(result)
            currentaddr.value = bytes_('')
            resultQueue.put((ix, test_addr, test.tasks, batch(result)))
        except KeyboardInterrupt:
            # TimedOutException is a KeyboardInterrupt subclass; read the
            # exception from sys.exc_info() so this parses on every Python
            # version
            timeout = isinstance(sys.exc_info()[1], TimedOutException)
            if timeout:
                keyboardCaught.set()
            if len(currentaddr.value):
                # the interrupt hit while a test was still marked as running
                if timeout:
                    msg = 'Worker %s timed out, failing current test %s'
                else:
                    msg = 'Worker %s keyboard interrupt, failing current test %s'
                log.exception(msg, ix, test_addr)
                currentaddr.value = bytes_('')
                failure.Failure(*sys.exc_info())(result)
            else:
                if timeout:
                    msg = 'Worker %s test %s timed out'
                else:
                    msg = 'Worker %s test %s keyboard interrupt'
                log.debug(msg, ix, test_addr)
            resultQueue.put((ix, test_addr, test.tasks, batch(result)))
            if not timeout:
                raise
        except SystemExit:
            currentaddr.value = bytes_('')
            log.exception('Worker %s system exit', ix)
            raise
        except:
            currentaddr.value = bytes_('')
            log.exception("Worker %s error running test or returning "
                          "results", ix)
            failure.Failure(*sys.exc_info())(result)
            resultQueue.put((ix, test_addr, test.tasks, batch(result)))
        if config.multiprocess_restartworker:
            break
    log.debug("Worker %s ending", ix)
class NoSharedFixtureContextSuite(ContextSuite):
    """
    Context suite that never fires shared fixtures.

    When a context sets _multiprocess_shared_, fixtures in that context
    are executed by the main process. Using this suite class prevents them
    from executing in the runner process as well.
    """
    # copied onto every contained test by run()
    testQueue = None
    tasks = None
    arg = None

    def setupContext(self, context):
        """Set up ``context`` unless it is marked ``_multiprocess_shared_``."""
        if not getattr(context, '_multiprocess_shared_', False):
            super(NoSharedFixtureContextSuite, self).setupContext(context)

    def teardownContext(self, context):
        """Tear down ``context`` unless it is marked
        ``_multiprocess_shared_``."""
        if not getattr(context, '_multiprocess_shared_', False):
            super(NoSharedFixtureContextSuite, self).teardownContext(context)

    def run(self, result):
        """Run tests in suite inside of suite fixtures.

        A KeyboardInterrupt raised by a test is recorded as a
        TimedOutException error on that test; it is swallowed when it really
        was a TimedOutException and re-raised otherwise.
        """
        log.debug("suite %s (%s) run called, tests: %s",
                  id(self), self, self._tests)
        # proxy the result for myself, keeping the unproxied one for the tests
        orig = result
        if self.resultProxy:
            result = self.resultProxy(orig, self)
        try:
            self.setUp()
        except KeyboardInterrupt:
            raise
        except:
            self.error_context = 'setup'
            result.addError(self, self._exc_info())
            return
        try:
            for test in self._tests:
                if (isinstance(test, nose.case.Test)
                        and self.arg is not None):
                    test.test.arg = self.arg
                else:
                    test.arg = self.arg
                test.testQueue = self.testQueue
                test.tasks = self.tasks
                if result.shouldStop:
                    log.debug("stopping")
                    break
                # each nose.case.Test will create its own result proxy
                # so the cases need the original result, to avoid proxy
                # chains
                try:
                    test(orig)
                except KeyboardInterrupt:
                    timed_out = isinstance(sys.exc_info()[1],
                                           TimedOutException)
                    if timed_out:
                        msg = 'Timeout when running test %s in suite %s'
                    else:
                        msg = 'KeyboardInterrupt when running test %s in suite %s'
                    log.debug(msg, test, self)
                    err = (TimedOutException, TimedOutException(str(test)),
                           sys.exc_info()[2])
                    test.config.plugins.addError(test, err)
                    orig.addError(test, err)
                    if not timed_out:
                        raise
        finally:
            self.has_run = True
            try:
                self.tearDown()
            except KeyboardInterrupt:
                raise
            except:
                self.error_context = 'teardown'
                result.addError(self, self._exc_info())
|