plugintest.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416
  1. """
  2. Testing Plugins
  3. ===============
  4. The plugin interface is well-tested enough to safely unit test your
  5. use of its hooks with some level of confidence. However, there is also
  6. a mixin for unittest.TestCase called PluginTester that's designed to
  7. test plugins in their native runtime environment.
  8. Here's a simple example with a do-nothing plugin and a composed suite.
  9. >>> import unittest
  10. >>> from nose.plugins import Plugin, PluginTester
  11. >>> class FooPlugin(Plugin):
  12. ... pass
  13. >>> class TestPluginFoo(PluginTester, unittest.TestCase):
  14. ... activate = '--with-foo'
  15. ... plugins = [FooPlugin()]
  16. ... def test_foo(self):
  17. ... for line in self.output:
  18. ... # i.e. check for patterns
  19. ... pass
  20. ...
  21. ... # or check for a line containing ...
  22. ... assert "ValueError" in self.output
  23. ... def makeSuite(self):
  24. ... class TC(unittest.TestCase):
  25. ... def runTest(self):
  26. ... raise ValueError("I hate foo")
  27. ... return [TC('runTest')]
  28. ...
  29. >>> res = unittest.TestResult()
  30. >>> case = TestPluginFoo('test_foo')
  31. >>> _ = case(res)
  32. >>> res.errors
  33. []
  34. >>> res.failures
  35. []
  36. >>> res.wasSuccessful()
  37. True
  38. >>> res.testsRun
  39. 1
  40. And here is a more complex example of testing a plugin that has extra
  41. arguments and reads environment variables.
  42. >>> import unittest, os
  43. >>> from nose.plugins import Plugin, PluginTester
  44. >>> class FancyOutputter(Plugin):
  45. ... name = "fancy"
  46. ... def configure(self, options, conf):
  47. ... Plugin.configure(self, options, conf)
  48. ... if not self.enabled:
  49. ... return
  50. ... self.fanciness = 1
  51. ... if options.more_fancy:
  52. ... self.fanciness = 2
  53. ... if 'EVEN_FANCIER' in self.env:
  54. ... self.fanciness = 3
  55. ...
  56. ... def options(self, parser, env=os.environ):
  57. ... self.env = env
  58. ... parser.add_option('--more-fancy', action='store_true')
  59. ... Plugin.options(self, parser, env=env)
  60. ...
  61. ... def report(self, stream):
  62. ... stream.write("FANCY " * self.fanciness)
  63. ...
  64. >>> class TestFancyOutputter(PluginTester, unittest.TestCase):
  65. ... activate = '--with-fancy' # enables the plugin
  66. ... plugins = [FancyOutputter()]
  67. ... args = ['--more-fancy']
  68. ... env = {'EVEN_FANCIER': '1'}
  69. ...
  70. ... def test_fancy_output(self):
  71. ... assert "FANCY FANCY FANCY" in self.output, (
  72. ... "got: %s" % self.output)
  73. ... def makeSuite(self):
  74. ... class TC(unittest.TestCase):
  75. ... def runTest(self):
  76. ... raise ValueError("I hate fancy stuff")
  77. ... return [TC('runTest')]
  78. ...
  79. >>> res = unittest.TestResult()
  80. >>> case = TestFancyOutputter('test_fancy_output')
  81. >>> _ = case(res)
  82. >>> res.errors
  83. []
  84. >>> res.failures
  85. []
  86. >>> res.wasSuccessful()
  87. True
  88. >>> res.testsRun
  89. 1
  90. """
  91. import re
  92. import sys
  93. from warnings import warn
  94. try:
  95. from cStringIO import StringIO
  96. except ImportError:
  97. from StringIO import StringIO
  98. __all__ = ['PluginTester', 'run']
  99. from os import getpid
  100. class MultiProcessFile(object):
  101. """
  102. helper for testing multiprocessing
  103. multiprocessing poses a problem for doctests, since the strategy
  104. of replacing sys.stdout/stderr with file-like objects then
  105. inspecting the results won't work: the child processes will
  106. write to the objects, but the data will not be reflected
  107. in the parent doctest-ing process.
  108. The solution is to create file-like objects which will interact with
  109. multiprocessing in a more desirable way.
  110. All processes can write to this object, but only the creator can read.
  111. This allows the testing system to see a unified picture of I/O.
  112. """
  113. def __init__(self):
  114. # per advice at:
  115. # http://docs.python.org/library/multiprocessing.html#all-platforms
  116. self.__master = getpid()
  117. self.__queue = Manager().Queue()
  118. self.__buffer = StringIO()
  119. self.softspace = 0
  120. def buffer(self):
  121. if getpid() != self.__master:
  122. return
  123. from Queue import Empty
  124. from collections import defaultdict
  125. cache = defaultdict(str)
  126. while True:
  127. try:
  128. pid, data = self.__queue.get_nowait()
  129. except Empty:
  130. break
  131. if pid == ():
  132. #show parent output after children
  133. #this is what users see, usually
  134. pid = ( 1e100, ) # googol!
  135. cache[pid] += data
  136. for pid in sorted(cache):
  137. #self.__buffer.write( '%s wrote: %r\n' % (pid, cache[pid]) ) #DEBUG
  138. self.__buffer.write( cache[pid] )
  139. def write(self, data):
  140. # note that these pids are in the form of current_process()._identity
  141. # rather than OS pids
  142. from multiprocessing import current_process
  143. pid = current_process()._identity
  144. self.__queue.put((pid, data))
  145. def __iter__(self):
  146. "getattr doesn't work for iter()"
  147. self.buffer()
  148. return self.__buffer
  149. def seek(self, offset, whence=0):
  150. self.buffer()
  151. return self.__buffer.seek(offset, whence)
  152. def getvalue(self):
  153. self.buffer()
  154. return self.__buffer.getvalue()
  155. def __getattr__(self, attr):
  156. return getattr(self.__buffer, attr)
  157. try:
  158. from multiprocessing import Manager
  159. Buffer = MultiProcessFile
  160. except ImportError:
  161. Buffer = StringIO
  162. class PluginTester(object):
  163. """A mixin for testing nose plugins in their runtime environment.
  164. Subclass this and mix in unittest.TestCase to run integration/functional
  165. tests on your plugin. When setUp() is called, the stub test suite is
  166. executed with your plugin so that during an actual test you can inspect the
  167. artifacts of how your plugin interacted with the stub test suite.
  168. - activate
  169. - the argument to send nosetests to activate the plugin
  170. - suitepath
  171. - if set, this is the path of the suite to test. Otherwise, you
  172. will need to use the hook, makeSuite()
  173. - plugins
  174. - the list of plugins to make available during the run. Note
  175. that this does not mean these plugins will be *enabled* during
  176. the run -- only the plugins enabled by the activate argument
  177. or other settings in argv or env will be enabled.
  178. - args
  179. - a list of arguments to add to the nosetests command, in addition to
  180. the activate argument
  181. - env
  182. - optional dict of environment variables to send nosetests
  183. """
  184. activate = None
  185. suitepath = None
  186. args = None
  187. env = {}
  188. argv = None
  189. plugins = []
  190. ignoreFiles = None
  191. def makeSuite(self):
  192. """returns a suite object of tests to run (unittest.TestSuite())
  193. If self.suitepath is None, this must be implemented. The returned suite
  194. object will be executed with all plugins activated. It may return
  195. None.
  196. Here is an example of a basic suite object you can return ::
  197. >>> import unittest
  198. >>> class SomeTest(unittest.TestCase):
  199. ... def runTest(self):
  200. ... raise ValueError("Now do something, plugin!")
  201. ...
  202. >>> unittest.TestSuite([SomeTest()]) # doctest: +ELLIPSIS
  203. <unittest...TestSuite tests=[<...SomeTest testMethod=runTest>]>
  204. """
  205. raise NotImplementedError
  206. def _execPlugin(self):
  207. """execute the plugin on the internal test suite.
  208. """
  209. from nose.config import Config
  210. from nose.core import TestProgram
  211. from nose.plugins.manager import PluginManager
  212. suite = None
  213. stream = Buffer()
  214. conf = Config(env=self.env,
  215. stream=stream,
  216. plugins=PluginManager(plugins=self.plugins))
  217. if self.ignoreFiles is not None:
  218. conf.ignoreFiles = self.ignoreFiles
  219. if not self.suitepath:
  220. suite = self.makeSuite()
  221. self.nose = TestProgram(argv=self.argv, config=conf, suite=suite,
  222. exit=False)
  223. self.output = AccessDecorator(stream)
  224. def setUp(self):
  225. """runs nosetests with the specified test suite, all plugins
  226. activated.
  227. """
  228. self.argv = ['nosetests', self.activate]
  229. if self.args:
  230. self.argv.extend(self.args)
  231. if self.suitepath:
  232. self.argv.append(self.suitepath)
  233. self._execPlugin()
  234. class AccessDecorator(object):
  235. stream = None
  236. _buf = None
  237. def __init__(self, stream):
  238. self.stream = stream
  239. stream.seek(0)
  240. self._buf = stream.read()
  241. stream.seek(0)
  242. def __contains__(self, val):
  243. return val in self._buf
  244. def __iter__(self):
  245. return iter(self.stream)
  246. def __str__(self):
  247. return self._buf
  248. def blankline_separated_blocks(text):
  249. "a bunch of === characters is also considered a blank line"
  250. block = []
  251. for line in text.splitlines(True):
  252. block.append(line)
  253. line = line.strip()
  254. if not line or line.startswith('===') and not line.strip('='):
  255. yield "".join(block)
  256. block = []
  257. if block:
  258. yield "".join(block)
  259. def remove_stack_traces(out):
  260. # this regexp taken from Python 2.5's doctest
  261. traceback_re = re.compile(r"""
  262. # Grab the traceback header. Different versions of Python have
  263. # said different things on the first traceback line.
  264. ^(?P<hdr> Traceback\ \(
  265. (?: most\ recent\ call\ last
  266. | innermost\ last
  267. ) \) :
  268. )
  269. \s* $ # toss trailing whitespace on the header.
  270. (?P<stack> .*?) # don't blink: absorb stuff until...
  271. ^(?=\w) # a line *starts* with alphanum.
  272. .*?(?P<exception> \w+ ) # exception name
  273. (?P<msg> [:\n] .*) # the rest
  274. """, re.VERBOSE | re.MULTILINE | re.DOTALL)
  275. blocks = []
  276. for block in blankline_separated_blocks(out):
  277. blocks.append(traceback_re.sub(r"\g<hdr>\n...\n\g<exception>\g<msg>", block))
  278. return "".join(blocks)
  279. def simplify_warnings(out):
  280. warn_re = re.compile(r"""
  281. # Cut the file and line no, up to the warning name
  282. ^.*:\d+:\s
  283. (?P<category>\w+): \s+ # warning category
  284. (?P<detail>.+) $ \n? # warning message
  285. ^ .* $ # stack frame
  286. """, re.VERBOSE | re.MULTILINE)
  287. return warn_re.sub(r"\g<category>: \g<detail>", out)
  288. def remove_timings(out):
  289. return re.sub(
  290. r"Ran (\d+ tests?) in [0-9.]+s", r"Ran \1 in ...s", out)
  291. def munge_nose_output_for_doctest(out):
  292. """Modify nose output to make it easy to use in doctests."""
  293. out = remove_stack_traces(out)
  294. out = simplify_warnings(out)
  295. out = remove_timings(out)
  296. return out.strip()
  297. def run(*arg, **kw):
  298. """
  299. Specialized version of nose.run for use inside of doctests that
  300. test test runs.
  301. This version of run() prints the result output to stdout. Before
  302. printing, the output is processed by replacing the timing
  303. information with an ellipsis (...), removing traceback stacks, and
  304. removing trailing whitespace.
  305. Use this version of run wherever you are writing a doctest that
  306. tests nose (or unittest) test result output.
  307. Note: do not use doctest: +ELLIPSIS when testing nose output,
  308. since ellipses ("test_foo ... ok") in your expected test runner
  309. output may match multiple lines of output, causing spurious test
  310. passes!
  311. """
  312. from nose import run
  313. from nose.config import Config
  314. from nose.plugins.manager import PluginManager
  315. buffer = Buffer()
  316. if 'config' not in kw:
  317. plugins = kw.pop('plugins', [])
  318. if isinstance(plugins, list):
  319. plugins = PluginManager(plugins=plugins)
  320. env = kw.pop('env', {})
  321. kw['config'] = Config(env=env, plugins=plugins)
  322. if 'argv' not in kw:
  323. kw['argv'] = ['nosetests', '-v']
  324. kw['config'].stream = buffer
  325. # Set up buffering so that all output goes to our buffer,
  326. # or warn user if deprecated behavior is active. If this is not
  327. # done, prints and warnings will either be out of place or
  328. # disappear.
  329. stderr = sys.stderr
  330. stdout = sys.stdout
  331. if kw.pop('buffer_all', False):
  332. sys.stdout = sys.stderr = buffer
  333. restore = True
  334. else:
  335. restore = False
  336. warn("The behavior of nose.plugins.plugintest.run() will change in "
  337. "the next release of nose. The current behavior does not "
  338. "correctly account for output to stdout and stderr. To enable "
  339. "correct behavior, use run_buffered() instead, or pass "
  340. "the keyword argument buffer_all=True to run().",
  341. DeprecationWarning, stacklevel=2)
  342. try:
  343. run(*arg, **kw)
  344. finally:
  345. if restore:
  346. sys.stderr = stderr
  347. sys.stdout = stdout
  348. out = buffer.getvalue()
  349. print munge_nose_output_for_doctest(out)
  350. def run_buffered(*arg, **kw):
  351. kw['buffer_all'] = True
  352. run(*arg, **kw)
  353. if __name__ == '__main__':
  354. import doctest
  355. doctest.testmod()