test_hotshot.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. import os
  2. import pprint
  3. import unittest
  4. import tempfile
  5. import _hotshot
  6. import gc
  7. from test import test_support
  8. # Silence Py3k warning
  9. hotshot = test_support.import_module('hotshot', deprecated=True)
  10. from hotshot.log import ENTER, EXIT, LINE
  11. from hotshot import stats
  12. def shortfilename(fn):
  13. # We use a really shortened filename since an exact match is made,
  14. # and the source may be either a Python source file or a
  15. # pre-compiled bytecode file.
  16. if fn:
  17. return os.path.splitext(os.path.basename(fn))[0]
  18. else:
  19. return fn
  20. class UnlinkingLogReader(hotshot.log.LogReader):
  21. """Extend the LogReader so the log file is unlinked when we're
  22. done with it."""
  23. def __init__(self, logfn):
  24. self.__logfn = logfn
  25. hotshot.log.LogReader.__init__(self, logfn)
  26. def next(self, index=None):
  27. try:
  28. return hotshot.log.LogReader.next(self)
  29. except StopIteration:
  30. self.close()
  31. os.unlink(self.__logfn)
  32. raise
  33. class HotShotTestCase(unittest.TestCase):
  34. def new_profiler(self, lineevents=0, linetimings=1):
  35. self.logfn = test_support.TESTFN
  36. return hotshot.Profile(self.logfn, lineevents, linetimings)
  37. def get_logreader(self):
  38. return UnlinkingLogReader(self.logfn)
  39. def get_events_wotime(self):
  40. L = []
  41. for event in self.get_logreader():
  42. what, (filename, lineno, funcname), tdelta = event
  43. L.append((what, (shortfilename(filename), lineno, funcname)))
  44. return L
  45. def check_events(self, expected):
  46. events = self.get_events_wotime()
  47. if events != expected:
  48. self.fail(
  49. "events did not match expectation; got:\n%s\nexpected:\n%s"
  50. % (pprint.pformat(events), pprint.pformat(expected)))
  51. def run_test(self, callable, events, profiler=None):
  52. if profiler is None:
  53. profiler = self.new_profiler()
  54. self.assertTrue(not profiler._prof.closed)
  55. profiler.runcall(callable)
  56. self.assertTrue(not profiler._prof.closed)
  57. profiler.close()
  58. self.assertTrue(profiler._prof.closed)
  59. self.check_events(events)
  60. def test_addinfo(self):
  61. def f(p):
  62. p.addinfo("test-key", "test-value")
  63. profiler = self.new_profiler()
  64. profiler.runcall(f, profiler)
  65. profiler.close()
  66. log = self.get_logreader()
  67. info = log._info
  68. list(log)
  69. self.assertTrue(info["test-key"] == ["test-value"])
  70. def test_line_numbers(self):
  71. def f():
  72. y = 2
  73. x = 1
  74. def g():
  75. f()
  76. f_lineno = f.func_code.co_firstlineno
  77. g_lineno = g.func_code.co_firstlineno
  78. events = [(ENTER, ("test_hotshot", g_lineno, "g")),
  79. (LINE, ("test_hotshot", g_lineno+1, "g")),
  80. (ENTER, ("test_hotshot", f_lineno, "f")),
  81. (LINE, ("test_hotshot", f_lineno+1, "f")),
  82. (LINE, ("test_hotshot", f_lineno+2, "f")),
  83. (EXIT, ("test_hotshot", f_lineno, "f")),
  84. (EXIT, ("test_hotshot", g_lineno, "g")),
  85. ]
  86. self.run_test(g, events, self.new_profiler(lineevents=1))
  87. def test_start_stop(self):
  88. # Make sure we don't return NULL in the start() and stop()
  89. # methods when there isn't an error. Bug in 2.2 noted by
  90. # Anthony Baxter.
  91. profiler = self.new_profiler()
  92. profiler.start()
  93. profiler.stop()
  94. profiler.close()
  95. os.unlink(self.logfn)
  96. def test_bad_sys_path(self):
  97. import sys
  98. import os
  99. orig_path = sys.path
  100. coverage = hotshot._hotshot.coverage
  101. try:
  102. # verify we require a list for sys.path
  103. sys.path = 'abc'
  104. self.assertRaises(RuntimeError, coverage, test_support.TESTFN)
  105. # verify that we require sys.path exists
  106. del sys.path
  107. self.assertRaises(RuntimeError, coverage, test_support.TESTFN)
  108. finally:
  109. sys.path = orig_path
  110. if os.path.exists(test_support.TESTFN):
  111. os.remove(test_support.TESTFN)
  112. def test_logreader_eof_error(self):
  113. emptyfile = tempfile.NamedTemporaryFile()
  114. try:
  115. self.assertRaises((IOError, EOFError), _hotshot.logreader,
  116. emptyfile.name)
  117. finally:
  118. emptyfile.close()
  119. gc.collect()
  120. def test_load_stats(self):
  121. def start(prof):
  122. prof.start()
  123. # Make sure stats can be loaded when start and stop of profiler
  124. # are not executed in the same stack frame.
  125. profiler = self.new_profiler()
  126. start(profiler)
  127. profiler.stop()
  128. profiler.close()
  129. stats.load(self.logfn)
  130. os.unlink(self.logfn)
  131. def test_large_info(self):
  132. p = self.new_profiler()
  133. self.assertRaises(ValueError, p.addinfo, "A", "A" * 0xfceb)
  134. def test_main():
  135. test_support.run_unittest(HotShotTestCase)
  136. if __name__ == "__main__":
  137. test_main()