logcapture.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. """
  2. This plugin captures logging statements issued during test execution. When an
  3. error or failure occurs, the captured log messages are attached to the running
  4. test in the test.capturedLogging attribute, and displayed with the error/failure
  5. output. It is enabled by default but can be turned off with the option
  6. ``--nologcapture``.
  7. You can filter captured logging statements with the ``--logging-filter`` option.
  8. If set, it specifies which logger(s) will be captured; loggers that do not match
  9. will be passed. Example: specifying ``--logging-filter=sqlalchemy,myapp``
  10. will ensure that only statements logged via sqlalchemy.engine, myapp
  11. or myapp.foo.bar logger will be logged.
  12. You can remove other installed logging handlers with the
  13. ``--logging-clear-handlers`` option.
  14. """
  15. import logging
  16. from logging import Handler
  17. import threading
  18. from nose.plugins.base import Plugin
  19. from nose.util import anyp, ln, safe_str
  20. try:
  21. from cStringIO import StringIO
  22. except ImportError:
  23. from StringIO import StringIO
  24. log = logging.getLogger(__name__)
  25. class FilterSet(object):
  26. def __init__(self, filter_components):
  27. self.inclusive, self.exclusive = self._partition(filter_components)
  28. # @staticmethod
  29. def _partition(components):
  30. inclusive, exclusive = [], []
  31. for component in components:
  32. if component.startswith('-'):
  33. exclusive.append(component[1:])
  34. else:
  35. inclusive.append(component)
  36. return inclusive, exclusive
  37. _partition = staticmethod(_partition)
  38. def allow(self, record):
  39. """returns whether this record should be printed"""
  40. if not self:
  41. # nothing to filter
  42. return True
  43. return self._allow(record) and not self._deny(record)
  44. # @staticmethod
  45. def _any_match(matchers, record):
  46. """return the bool of whether `record` starts with
  47. any item in `matchers`"""
  48. def record_matches_key(key):
  49. return record == key or record.startswith(key + '.')
  50. return anyp(bool, map(record_matches_key, matchers))
  51. _any_match = staticmethod(_any_match)
  52. def _allow(self, record):
  53. if not self.inclusive:
  54. return True
  55. return self._any_match(self.inclusive, record)
  56. def _deny(self, record):
  57. if not self.exclusive:
  58. return False
  59. return self._any_match(self.exclusive, record)
  60. class MyMemoryHandler(Handler):
  61. def __init__(self, logformat, logdatefmt, filters):
  62. Handler.__init__(self)
  63. fmt = logging.Formatter(logformat, logdatefmt)
  64. self.setFormatter(fmt)
  65. self.filterset = FilterSet(filters)
  66. self.buffer = []
  67. def emit(self, record):
  68. self.buffer.append(self.format(record))
  69. def flush(self):
  70. pass # do nothing
  71. def truncate(self):
  72. self.buffer = []
  73. def filter(self, record):
  74. if self.filterset.allow(record.name):
  75. return Handler.filter(self, record)
  76. def __getstate__(self):
  77. state = self.__dict__.copy()
  78. del state['lock']
  79. return state
  80. def __setstate__(self, state):
  81. self.__dict__.update(state)
  82. self.lock = threading.RLock()
  83. class LogCapture(Plugin):
  84. """
  85. Log capture plugin. Enabled by default. Disable with --nologcapture.
  86. This plugin captures logging statements issued during test execution,
  87. appending any output captured to the error or failure output,
  88. should the test fail or raise an error.
  89. """
  90. enabled = True
  91. env_opt = 'NOSE_NOLOGCAPTURE'
  92. name = 'logcapture'
  93. score = 500
  94. logformat = '%(name)s: %(levelname)s: %(message)s'
  95. logdatefmt = None
  96. clear = False
  97. filters = ['-nose']
  98. def options(self, parser, env):
  99. """Register commandline options.
  100. """
  101. parser.add_option(
  102. "--nologcapture", action="store_false",
  103. default=not env.get(self.env_opt), dest="logcapture",
  104. help="Disable logging capture plugin. "
  105. "Logging configuration will be left intact."
  106. " [NOSE_NOLOGCAPTURE]")
  107. parser.add_option(
  108. "--logging-format", action="store", dest="logcapture_format",
  109. default=env.get('NOSE_LOGFORMAT') or self.logformat,
  110. metavar="FORMAT",
  111. help="Specify custom format to print statements. "
  112. "Uses the same format as used by standard logging handlers."
  113. " [NOSE_LOGFORMAT]")
  114. parser.add_option(
  115. "--logging-datefmt", action="store", dest="logcapture_datefmt",
  116. default=env.get('NOSE_LOGDATEFMT') or self.logdatefmt,
  117. metavar="FORMAT",
  118. help="Specify custom date/time format to print statements. "
  119. "Uses the same format as used by standard logging handlers."
  120. " [NOSE_LOGDATEFMT]")
  121. parser.add_option(
  122. "--logging-filter", action="store", dest="logcapture_filters",
  123. default=env.get('NOSE_LOGFILTER'),
  124. metavar="FILTER",
  125. help="Specify which statements to filter in/out. "
  126. "By default, everything is captured. If the output is too"
  127. " verbose,\nuse this option to filter out needless output.\n"
  128. "Example: filter=foo will capture statements issued ONLY to\n"
  129. " foo or foo.what.ever.sub but not foobar or other logger.\n"
  130. "Specify multiple loggers with comma: filter=foo,bar,baz.\n"
  131. "If any logger name is prefixed with a minus, eg filter=-foo,\n"
  132. "it will be excluded rather than included. Default: "
  133. "exclude logging messages from nose itself (-nose)."
  134. " [NOSE_LOGFILTER]\n")
  135. parser.add_option(
  136. "--logging-clear-handlers", action="store_true",
  137. default=False, dest="logcapture_clear",
  138. help="Clear all other logging handlers")
  139. parser.add_option(
  140. "--logging-level", action="store",
  141. default='NOTSET', dest="logcapture_level",
  142. help="Set the log level to capture")
  143. def configure(self, options, conf):
  144. """Configure plugin.
  145. """
  146. self.conf = conf
  147. # Disable if explicitly disabled, or if logging is
  148. # configured via logging config file
  149. if not options.logcapture or conf.loggingConfig:
  150. self.enabled = False
  151. self.logformat = options.logcapture_format
  152. self.logdatefmt = options.logcapture_datefmt
  153. self.clear = options.logcapture_clear
  154. self.loglevel = options.logcapture_level
  155. if options.logcapture_filters:
  156. self.filters = options.logcapture_filters.split(',')
  157. def setupLoghandler(self):
  158. # setup our handler with root logger
  159. root_logger = logging.getLogger()
  160. if self.clear:
  161. if hasattr(root_logger, "handlers"):
  162. for handler in root_logger.handlers:
  163. root_logger.removeHandler(handler)
  164. for logger in logging.Logger.manager.loggerDict.values():
  165. if hasattr(logger, "handlers"):
  166. for handler in logger.handlers:
  167. logger.removeHandler(handler)
  168. # make sure there isn't one already
  169. # you can't simply use "if self.handler not in root_logger.handlers"
  170. # since at least in unit tests this doesn't work --
  171. # LogCapture() is instantiated for each test case while root_logger
  172. # is module global
  173. # so we always add new MyMemoryHandler instance
  174. for handler in root_logger.handlers[:]:
  175. if isinstance(handler, MyMemoryHandler):
  176. root_logger.handlers.remove(handler)
  177. root_logger.addHandler(self.handler)
  178. # to make sure everything gets captured
  179. loglevel = getattr(self, "loglevel", "NOTSET")
  180. root_logger.setLevel(getattr(logging, loglevel))
  181. def begin(self):
  182. """Set up logging handler before test run begins.
  183. """
  184. self.start()
  185. def start(self):
  186. self.handler = MyMemoryHandler(self.logformat, self.logdatefmt,
  187. self.filters)
  188. self.setupLoghandler()
  189. def end(self):
  190. pass
  191. def beforeTest(self, test):
  192. """Clear buffers and handlers before test.
  193. """
  194. self.setupLoghandler()
  195. def afterTest(self, test):
  196. """Clear buffers after test.
  197. """
  198. self.handler.truncate()
  199. def formatFailure(self, test, err):
  200. """Add captured log messages to failure output.
  201. """
  202. return self.formatError(test, err)
  203. def formatError(self, test, err):
  204. """Add captured log messages to error output.
  205. """
  206. # logic flow copied from Capture.formatError
  207. test.capturedLogging = records = self.formatLogRecords()
  208. if not records:
  209. return err
  210. ec, ev, tb = err
  211. return (ec, self.addCaptureToErr(ev, records), tb)
  212. def formatLogRecords(self):
  213. return map(safe_str, self.handler.buffer)
  214. def addCaptureToErr(self, ev, records):
  215. return '\n'.join([safe_str(ev), ln('>> begin captured logging <<')] + \
  216. records + \
  217. [ln('>> end captured logging <<')])