123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245 |
- """
- This plugin captures logging statements issued during test execution. When an
- error or failure occurs, the captured log messages are attached to the running
- test in the test.capturedLogging attribute, and displayed with the error failure
- output. It is enabled by default but can be turned off with the option
- ``--nologcapture``.
- You can filter captured logging statements with the ``--logging-filter`` option.
- If set, it specifies which logger(s) will be captured; loggers that do not match
- will be passed. Example: specifying ``--logging-filter=sqlalchemy,myapp``
- will ensure that only statements logged via sqlalchemy.engine, myapp
- or myapp.foo.bar logger will be logged.
- You can remove other installed logging handlers with the
- ``--logging-clear-handlers`` option.
- """
- import logging
- from logging import Handler
- import threading
- from nose.plugins.base import Plugin
- from nose.util import anyp, ln, safe_str
- try:
- from cStringIO import StringIO
- except ImportError:
- from StringIO import StringIO
- log = logging.getLogger(__name__)
- class FilterSet(object):
- def __init__(self, filter_components):
- self.inclusive, self.exclusive = self._partition(filter_components)
- # @staticmethod
- def _partition(components):
- inclusive, exclusive = [], []
- for component in components:
- if component.startswith('-'):
- exclusive.append(component[1:])
- else:
- inclusive.append(component)
- return inclusive, exclusive
- _partition = staticmethod(_partition)
- def allow(self, record):
- """returns whether this record should be printed"""
- if not self:
- # nothing to filter
- return True
- return self._allow(record) and not self._deny(record)
- # @staticmethod
- def _any_match(matchers, record):
- """return the bool of whether `record` starts with
- any item in `matchers`"""
- def record_matches_key(key):
- return record == key or record.startswith(key + '.')
- return anyp(bool, map(record_matches_key, matchers))
- _any_match = staticmethod(_any_match)
- def _allow(self, record):
- if not self.inclusive:
- return True
- return self._any_match(self.inclusive, record)
- def _deny(self, record):
- if not self.exclusive:
- return False
- return self._any_match(self.exclusive, record)
- class MyMemoryHandler(Handler):
- def __init__(self, logformat, logdatefmt, filters):
- Handler.__init__(self)
- fmt = logging.Formatter(logformat, logdatefmt)
- self.setFormatter(fmt)
- self.filterset = FilterSet(filters)
- self.buffer = []
- def emit(self, record):
- self.buffer.append(self.format(record))
- def flush(self):
- pass # do nothing
- def truncate(self):
- self.buffer = []
- def filter(self, record):
- if self.filterset.allow(record.name):
- return Handler.filter(self, record)
- def __getstate__(self):
- state = self.__dict__.copy()
- del state['lock']
- return state
- def __setstate__(self, state):
- self.__dict__.update(state)
- self.lock = threading.RLock()
- class LogCapture(Plugin):
- """
- Log capture plugin. Enabled by default. Disable with --nologcapture.
- This plugin captures logging statements issued during test execution,
- appending any output captured to the error or failure output,
- should the test fail or raise an error.
- """
- enabled = True
- env_opt = 'NOSE_NOLOGCAPTURE'
- name = 'logcapture'
- score = 500
- logformat = '%(name)s: %(levelname)s: %(message)s'
- logdatefmt = None
- clear = False
- filters = ['-nose']
- def options(self, parser, env):
- """Register commandline options.
- """
- parser.add_option(
- "--nologcapture", action="store_false",
- default=not env.get(self.env_opt), dest="logcapture",
- help="Disable logging capture plugin. "
- "Logging configuration will be left intact."
- " [NOSE_NOLOGCAPTURE]")
- parser.add_option(
- "--logging-format", action="store", dest="logcapture_format",
- default=env.get('NOSE_LOGFORMAT') or self.logformat,
- metavar="FORMAT",
- help="Specify custom format to print statements. "
- "Uses the same format as used by standard logging handlers."
- " [NOSE_LOGFORMAT]")
- parser.add_option(
- "--logging-datefmt", action="store", dest="logcapture_datefmt",
- default=env.get('NOSE_LOGDATEFMT') or self.logdatefmt,
- metavar="FORMAT",
- help="Specify custom date/time format to print statements. "
- "Uses the same format as used by standard logging handlers."
- " [NOSE_LOGDATEFMT]")
- parser.add_option(
- "--logging-filter", action="store", dest="logcapture_filters",
- default=env.get('NOSE_LOGFILTER'),
- metavar="FILTER",
- help="Specify which statements to filter in/out. "
- "By default, everything is captured. If the output is too"
- " verbose,\nuse this option to filter out needless output.\n"
- "Example: filter=foo will capture statements issued ONLY to\n"
- " foo or foo.what.ever.sub but not foobar or other logger.\n"
- "Specify multiple loggers with comma: filter=foo,bar,baz.\n"
- "If any logger name is prefixed with a minus, eg filter=-foo,\n"
- "it will be excluded rather than included. Default: "
- "exclude logging messages from nose itself (-nose)."
- " [NOSE_LOGFILTER]\n")
- parser.add_option(
- "--logging-clear-handlers", action="store_true",
- default=False, dest="logcapture_clear",
- help="Clear all other logging handlers")
- parser.add_option(
- "--logging-level", action="store",
- default='NOTSET', dest="logcapture_level",
- help="Set the log level to capture")
- def configure(self, options, conf):
- """Configure plugin.
- """
- self.conf = conf
- # Disable if explicitly disabled, or if logging is
- # configured via logging config file
- if not options.logcapture or conf.loggingConfig:
- self.enabled = False
- self.logformat = options.logcapture_format
- self.logdatefmt = options.logcapture_datefmt
- self.clear = options.logcapture_clear
- self.loglevel = options.logcapture_level
- if options.logcapture_filters:
- self.filters = options.logcapture_filters.split(',')
- def setupLoghandler(self):
- # setup our handler with root logger
- root_logger = logging.getLogger()
- if self.clear:
- if hasattr(root_logger, "handlers"):
- for handler in root_logger.handlers:
- root_logger.removeHandler(handler)
- for logger in logging.Logger.manager.loggerDict.values():
- if hasattr(logger, "handlers"):
- for handler in logger.handlers:
- logger.removeHandler(handler)
- # make sure there isn't one already
- # you can't simply use "if self.handler not in root_logger.handlers"
- # since at least in unit tests this doesn't work --
- # LogCapture() is instantiated for each test case while root_logger
- # is module global
- # so we always add new MyMemoryHandler instance
- for handler in root_logger.handlers[:]:
- if isinstance(handler, MyMemoryHandler):
- root_logger.handlers.remove(handler)
- root_logger.addHandler(self.handler)
- # to make sure everything gets captured
- loglevel = getattr(self, "loglevel", "NOTSET")
- root_logger.setLevel(getattr(logging, loglevel))
- def begin(self):
- """Set up logging handler before test run begins.
- """
- self.start()
- def start(self):
- self.handler = MyMemoryHandler(self.logformat, self.logdatefmt,
- self.filters)
- self.setupLoghandler()
- def end(self):
- pass
- def beforeTest(self, test):
- """Clear buffers and handlers before test.
- """
- self.setupLoghandler()
- def afterTest(self, test):
- """Clear buffers after test.
- """
- self.handler.truncate()
- def formatFailure(self, test, err):
- """Add captured log messages to failure output.
- """
- return self.formatError(test, err)
- def formatError(self, test, err):
- """Add captured log messages to error output.
- """
- # logic flow copied from Capture.formatError
- test.capturedLogging = records = self.formatLogRecords()
- if not records:
- return err
- ec, ev, tb = err
- return (ec, self.addCaptureToErr(ev, records), tb)
- def formatLogRecords(self):
- return map(safe_str, self.handler.buffer)
- def addCaptureToErr(self, ev, records):
- return '\n'.join([safe_str(ev), ln('>> begin captured logging <<')] + \
- records + \
- [ln('>> end captured logging <<')])
|