123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- import os
- import pprint
- import unittest
- import tempfile
- import _hotshot
- import gc
- from test import test_support
- # Silence Py3k warning
- hotshot = test_support.import_module('hotshot', deprecated=True)
- from hotshot.log import ENTER, EXIT, LINE
- from hotshot import stats
- def shortfilename(fn):
- # We use a really shortened filename since an exact match is made,
- # and the source may be either a Python source file or a
- # pre-compiled bytecode file.
- if fn:
- return os.path.splitext(os.path.basename(fn))[0]
- else:
- return fn
- class UnlinkingLogReader(hotshot.log.LogReader):
- """Extend the LogReader so the log file is unlinked when we're
- done with it."""
- def __init__(self, logfn):
- self.__logfn = logfn
- hotshot.log.LogReader.__init__(self, logfn)
- def next(self, index=None):
- try:
- return hotshot.log.LogReader.next(self)
- except StopIteration:
- self.close()
- os.unlink(self.__logfn)
- raise
- class HotShotTestCase(unittest.TestCase):
- def new_profiler(self, lineevents=0, linetimings=1):
- self.logfn = test_support.TESTFN
- return hotshot.Profile(self.logfn, lineevents, linetimings)
- def get_logreader(self):
- return UnlinkingLogReader(self.logfn)
- def get_events_wotime(self):
- L = []
- for event in self.get_logreader():
- what, (filename, lineno, funcname), tdelta = event
- L.append((what, (shortfilename(filename), lineno, funcname)))
- return L
- def check_events(self, expected):
- events = self.get_events_wotime()
- if events != expected:
- self.fail(
- "events did not match expectation; got:\n%s\nexpected:\n%s"
- % (pprint.pformat(events), pprint.pformat(expected)))
- def run_test(self, callable, events, profiler=None):
- if profiler is None:
- profiler = self.new_profiler()
- self.assertTrue(not profiler._prof.closed)
- profiler.runcall(callable)
- self.assertTrue(not profiler._prof.closed)
- profiler.close()
- self.assertTrue(profiler._prof.closed)
- self.check_events(events)
- def test_addinfo(self):
- def f(p):
- p.addinfo("test-key", "test-value")
- profiler = self.new_profiler()
- profiler.runcall(f, profiler)
- profiler.close()
- log = self.get_logreader()
- info = log._info
- list(log)
- self.assertTrue(info["test-key"] == ["test-value"])
- def test_line_numbers(self):
- def f():
- y = 2
- x = 1
- def g():
- f()
- f_lineno = f.func_code.co_firstlineno
- g_lineno = g.func_code.co_firstlineno
- events = [(ENTER, ("test_hotshot", g_lineno, "g")),
- (LINE, ("test_hotshot", g_lineno+1, "g")),
- (ENTER, ("test_hotshot", f_lineno, "f")),
- (LINE, ("test_hotshot", f_lineno+1, "f")),
- (LINE, ("test_hotshot", f_lineno+2, "f")),
- (EXIT, ("test_hotshot", f_lineno, "f")),
- (EXIT, ("test_hotshot", g_lineno, "g")),
- ]
- self.run_test(g, events, self.new_profiler(lineevents=1))
- def test_start_stop(self):
- # Make sure we don't return NULL in the start() and stop()
- # methods when there isn't an error. Bug in 2.2 noted by
- # Anthony Baxter.
- profiler = self.new_profiler()
- profiler.start()
- profiler.stop()
- profiler.close()
- os.unlink(self.logfn)
- def test_bad_sys_path(self):
- import sys
- import os
- orig_path = sys.path
- coverage = hotshot._hotshot.coverage
- try:
- # verify we require a list for sys.path
- sys.path = 'abc'
- self.assertRaises(RuntimeError, coverage, test_support.TESTFN)
- # verify that we require sys.path exists
- del sys.path
- self.assertRaises(RuntimeError, coverage, test_support.TESTFN)
- finally:
- sys.path = orig_path
- if os.path.exists(test_support.TESTFN):
- os.remove(test_support.TESTFN)
- def test_logreader_eof_error(self):
- emptyfile = tempfile.NamedTemporaryFile()
- try:
- self.assertRaises((IOError, EOFError), _hotshot.logreader,
- emptyfile.name)
- finally:
- emptyfile.close()
- gc.collect()
- def test_load_stats(self):
- def start(prof):
- prof.start()
- # Make sure stats can be loaded when start and stop of profiler
- # are not executed in the same stack frame.
- profiler = self.new_profiler()
- start(profiler)
- profiler.stop()
- profiler.close()
- stats.load(self.logfn)
- os.unlink(self.logfn)
- def test_large_info(self):
- p = self.new_profiler()
- self.assertRaises(ValueError, p.addinfo, "A", "A" * 0xfceb)
- def test_main():
- test_support.run_unittest(HotShotTestCase)
- if __name__ == "__main__":
- test_main()
|