test_signal.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501
  1. import unittest
  2. from test import test_support
  3. from contextlib import closing
  4. import gc
  5. import pickle
  6. import select
  7. import signal
  8. import subprocess
  9. import traceback
  10. import sys, os, time, errno
  11. if sys.platform in ('os2', 'riscos'):
  12. raise unittest.SkipTest("Can't test signal on %s" % sys.platform)
  13. class HandlerBCalled(Exception):
  14. pass
  15. def exit_subprocess():
  16. """Use os._exit(0) to exit the current subprocess.
  17. Otherwise, the test catches the SystemExit and continues executing
  18. in parallel with the original test, so you wind up with an
  19. exponential number of tests running concurrently.
  20. """
  21. os._exit(0)
  22. def ignoring_eintr(__func, *args, **kwargs):
  23. try:
  24. return __func(*args, **kwargs)
  25. except EnvironmentError as e:
  26. if e.errno != errno.EINTR:
  27. raise
  28. return None
  29. @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
  30. class InterProcessSignalTests(unittest.TestCase):
  31. MAX_DURATION = 20 # Entire test should last at most 20 sec.
  32. def setUp(self):
  33. self.using_gc = gc.isenabled()
  34. gc.disable()
  35. def tearDown(self):
  36. if self.using_gc:
  37. gc.enable()
  38. def format_frame(self, frame, limit=None):
  39. return ''.join(traceback.format_stack(frame, limit=limit))
  40. def handlerA(self, signum, frame):
  41. self.a_called = True
  42. if test_support.verbose:
  43. print "handlerA invoked from signal %s at:\n%s" % (
  44. signum, self.format_frame(frame, limit=1))
  45. def handlerB(self, signum, frame):
  46. self.b_called = True
  47. if test_support.verbose:
  48. print "handlerB invoked from signal %s at:\n%s" % (
  49. signum, self.format_frame(frame, limit=1))
  50. raise HandlerBCalled(signum, self.format_frame(frame))
  51. def wait(self, child):
  52. """Wait for child to finish, ignoring EINTR."""
  53. while True:
  54. try:
  55. child.wait()
  56. return
  57. except OSError as e:
  58. if e.errno != errno.EINTR:
  59. raise
  60. def run_test(self):
  61. # Install handlers. This function runs in a sub-process, so we
  62. # don't worry about re-setting the default handlers.
  63. signal.signal(signal.SIGHUP, self.handlerA)
  64. signal.signal(signal.SIGUSR1, self.handlerB)
  65. signal.signal(signal.SIGUSR2, signal.SIG_IGN)
  66. signal.signal(signal.SIGALRM, signal.default_int_handler)
  67. # Variables the signals will modify:
  68. self.a_called = False
  69. self.b_called = False
  70. # Let the sub-processes know who to send signals to.
  71. pid = os.getpid()
  72. if test_support.verbose:
  73. print "test runner's pid is", pid
  74. child = ignoring_eintr(subprocess.Popen, ['kill', '-HUP', str(pid)])
  75. if child:
  76. self.wait(child)
  77. if not self.a_called:
  78. time.sleep(1) # Give the signal time to be delivered.
  79. self.assertTrue(self.a_called)
  80. self.assertFalse(self.b_called)
  81. self.a_called = False
  82. # Make sure the signal isn't delivered while the previous
  83. # Popen object is being destroyed, because __del__ swallows
  84. # exceptions.
  85. del child
  86. try:
  87. child = subprocess.Popen(['kill', '-USR1', str(pid)])
  88. # This wait should be interrupted by the signal's exception.
  89. self.wait(child)
  90. time.sleep(1) # Give the signal time to be delivered.
  91. self.fail('HandlerBCalled exception not raised')
  92. except HandlerBCalled:
  93. self.assertTrue(self.b_called)
  94. self.assertFalse(self.a_called)
  95. if test_support.verbose:
  96. print "HandlerBCalled exception caught"
  97. child = ignoring_eintr(subprocess.Popen, ['kill', '-USR2', str(pid)])
  98. if child:
  99. self.wait(child) # Nothing should happen.
  100. try:
  101. signal.alarm(1)
  102. # The race condition in pause doesn't matter in this case,
  103. # since alarm is going to raise a KeyboardException, which
  104. # will skip the call.
  105. signal.pause()
  106. # But if another signal arrives before the alarm, pause
  107. # may return early.
  108. time.sleep(1)
  109. except KeyboardInterrupt:
  110. if test_support.verbose:
  111. print "KeyboardInterrupt (the alarm() went off)"
  112. except:
  113. self.fail("Some other exception woke us from pause: %s" %
  114. traceback.format_exc())
  115. else:
  116. self.fail("pause returned of its own accord, and the signal"
  117. " didn't arrive after another second.")
  118. # Issue 3864. Unknown if this affects earlier versions of freebsd also.
  119. @unittest.skipIf(sys.platform=='freebsd6',
  120. 'inter process signals not reliable (do not mix well with threading) '
  121. 'on freebsd6')
  122. def test_main(self):
  123. # This function spawns a child process to insulate the main
  124. # test-running process from all the signals. It then
  125. # communicates with that child process over a pipe and
  126. # re-raises information about any exceptions the child
  127. # raises. The real work happens in self.run_test().
  128. os_done_r, os_done_w = os.pipe()
  129. with closing(os.fdopen(os_done_r)) as done_r, \
  130. closing(os.fdopen(os_done_w, 'w')) as done_w:
  131. child = os.fork()
  132. if child == 0:
  133. # In the child process; run the test and report results
  134. # through the pipe.
  135. try:
  136. done_r.close()
  137. # Have to close done_w again here because
  138. # exit_subprocess() will skip the enclosing with block.
  139. with closing(done_w):
  140. try:
  141. self.run_test()
  142. except:
  143. pickle.dump(traceback.format_exc(), done_w)
  144. else:
  145. pickle.dump(None, done_w)
  146. except:
  147. print 'Uh oh, raised from pickle.'
  148. traceback.print_exc()
  149. finally:
  150. exit_subprocess()
  151. done_w.close()
  152. # Block for up to MAX_DURATION seconds for the test to finish.
  153. r, w, x = select.select([done_r], [], [], self.MAX_DURATION)
  154. if done_r in r:
  155. tb = pickle.load(done_r)
  156. if tb:
  157. self.fail(tb)
  158. else:
  159. os.kill(child, signal.SIGKILL)
  160. self.fail('Test deadlocked after %d seconds.' %
  161. self.MAX_DURATION)
  162. @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
  163. class BasicSignalTests(unittest.TestCase):
  164. def trivial_signal_handler(self, *args):
  165. pass
  166. def test_out_of_range_signal_number_raises_error(self):
  167. self.assertRaises(ValueError, signal.getsignal, 4242)
  168. self.assertRaises(ValueError, signal.signal, 4242,
  169. self.trivial_signal_handler)
  170. def test_setting_signal_handler_to_none_raises_error(self):
  171. self.assertRaises(TypeError, signal.signal,
  172. signal.SIGUSR1, None)
  173. def test_getsignal(self):
  174. hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
  175. self.assertEqual(signal.getsignal(signal.SIGHUP),
  176. self.trivial_signal_handler)
  177. signal.signal(signal.SIGHUP, hup)
  178. self.assertEqual(signal.getsignal(signal.SIGHUP), hup)
  179. @unittest.skipUnless(sys.platform == "win32", "Windows specific")
  180. class WindowsSignalTests(unittest.TestCase):
  181. def test_issue9324(self):
  182. # Updated for issue #10003, adding SIGBREAK
  183. handler = lambda x, y: None
  184. for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
  185. signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
  186. signal.SIGTERM):
  187. # Set and then reset a handler for signals that work on windows
  188. signal.signal(sig, signal.signal(sig, handler))
  189. with self.assertRaises(ValueError):
  190. signal.signal(-1, handler)
  191. with self.assertRaises(ValueError):
  192. signal.signal(7, handler)
  193. class WakeupFDTests(unittest.TestCase):
  194. def test_invalid_fd(self):
  195. fd = test_support.make_bad_fd()
  196. self.assertRaises(ValueError, signal.set_wakeup_fd, fd)
  197. @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
  198. class WakeupSignalTests(unittest.TestCase):
  199. TIMEOUT_FULL = 10
  200. TIMEOUT_HALF = 5
  201. def test_wakeup_fd_early(self):
  202. import select
  203. signal.alarm(1)
  204. before_time = time.time()
  205. # We attempt to get a signal during the sleep,
  206. # before select is called
  207. time.sleep(self.TIMEOUT_FULL)
  208. mid_time = time.time()
  209. self.assertTrue(mid_time - before_time < self.TIMEOUT_HALF)
  210. select.select([self.read], [], [], self.TIMEOUT_FULL)
  211. after_time = time.time()
  212. self.assertTrue(after_time - mid_time < self.TIMEOUT_HALF)
  213. def test_wakeup_fd_during(self):
  214. import select
  215. signal.alarm(1)
  216. before_time = time.time()
  217. # We attempt to get a signal during the select call
  218. self.assertRaises(select.error, select.select,
  219. [self.read], [], [], self.TIMEOUT_FULL)
  220. after_time = time.time()
  221. self.assertTrue(after_time - before_time < self.TIMEOUT_HALF)
  222. def setUp(self):
  223. import fcntl
  224. self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None)
  225. self.read, self.write = os.pipe()
  226. flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
  227. flags = flags | os.O_NONBLOCK
  228. fcntl.fcntl(self.write, fcntl.F_SETFL, flags)
  229. self.old_wakeup = signal.set_wakeup_fd(self.write)
  230. def tearDown(self):
  231. signal.set_wakeup_fd(self.old_wakeup)
  232. os.close(self.read)
  233. os.close(self.write)
  234. signal.signal(signal.SIGALRM, self.alrm)
  235. @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
  236. class SiginterruptTest(unittest.TestCase):
  237. def setUp(self):
  238. """Install a no-op signal handler that can be set to allow
  239. interrupts or not, and arrange for the original signal handler to be
  240. re-installed when the test is finished.
  241. """
  242. self.signum = signal.SIGUSR1
  243. oldhandler = signal.signal(self.signum, lambda x,y: None)
  244. self.addCleanup(signal.signal, self.signum, oldhandler)
  245. def readpipe_interrupted(self):
  246. """Perform a read during which a signal will arrive. Return True if the
  247. read is interrupted by the signal and raises an exception. Return False
  248. if it returns normally.
  249. """
  250. # Create a pipe that can be used for the read. Also clean it up
  251. # when the test is over, since nothing else will (but see below for
  252. # the write end).
  253. r, w = os.pipe()
  254. self.addCleanup(os.close, r)
  255. # Create another process which can send a signal to this one to try
  256. # to interrupt the read.
  257. ppid = os.getpid()
  258. pid = os.fork()
  259. if pid == 0:
  260. # Child code: sleep to give the parent enough time to enter the
  261. # read() call (there's a race here, but it's really tricky to
  262. # eliminate it); then signal the parent process. Also, sleep
  263. # again to make it likely that the signal is delivered to the
  264. # parent process before the child exits. If the child exits
  265. # first, the write end of the pipe will be closed and the test
  266. # is invalid.
  267. try:
  268. time.sleep(0.2)
  269. os.kill(ppid, self.signum)
  270. time.sleep(0.2)
  271. finally:
  272. # No matter what, just exit as fast as possible now.
  273. exit_subprocess()
  274. else:
  275. # Parent code.
  276. # Make sure the child is eventually reaped, else it'll be a
  277. # zombie for the rest of the test suite run.
  278. self.addCleanup(os.waitpid, pid, 0)
  279. # Close the write end of the pipe. The child has a copy, so
  280. # it's not really closed until the child exits. We need it to
  281. # close when the child exits so that in the non-interrupt case
  282. # the read eventually completes, otherwise we could just close
  283. # it *after* the test.
  284. os.close(w)
  285. # Try the read and report whether it is interrupted or not to
  286. # the caller.
  287. try:
  288. d = os.read(r, 1)
  289. return False
  290. except OSError, err:
  291. if err.errno != errno.EINTR:
  292. raise
  293. return True
  294. def test_without_siginterrupt(self):
  295. """If a signal handler is installed and siginterrupt is not called
  296. at all, when that signal arrives, it interrupts a syscall that's in
  297. progress.
  298. """
  299. i = self.readpipe_interrupted()
  300. self.assertTrue(i)
  301. # Arrival of the signal shouldn't have changed anything.
  302. i = self.readpipe_interrupted()
  303. self.assertTrue(i)
  304. def test_siginterrupt_on(self):
  305. """If a signal handler is installed and siginterrupt is called with
  306. a true value for the second argument, when that signal arrives, it
  307. interrupts a syscall that's in progress.
  308. """
  309. signal.siginterrupt(self.signum, 1)
  310. i = self.readpipe_interrupted()
  311. self.assertTrue(i)
  312. # Arrival of the signal shouldn't have changed anything.
  313. i = self.readpipe_interrupted()
  314. self.assertTrue(i)
  315. def test_siginterrupt_off(self):
  316. """If a signal handler is installed and siginterrupt is called with
  317. a false value for the second argument, when that signal arrives, it
  318. does not interrupt a syscall that's in progress.
  319. """
  320. signal.siginterrupt(self.signum, 0)
  321. i = self.readpipe_interrupted()
  322. self.assertFalse(i)
  323. # Arrival of the signal shouldn't have changed anything.
  324. i = self.readpipe_interrupted()
  325. self.assertFalse(i)
  326. @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
  327. class ItimerTest(unittest.TestCase):
  328. def setUp(self):
  329. self.hndl_called = False
  330. self.hndl_count = 0
  331. self.itimer = None
  332. self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)
  333. def tearDown(self):
  334. signal.signal(signal.SIGALRM, self.old_alarm)
  335. if self.itimer is not None: # test_itimer_exc doesn't change this attr
  336. # just ensure that itimer is stopped
  337. signal.setitimer(self.itimer, 0)
  338. def sig_alrm(self, *args):
  339. self.hndl_called = True
  340. if test_support.verbose:
  341. print("SIGALRM handler invoked", args)
  342. def sig_vtalrm(self, *args):
  343. self.hndl_called = True
  344. if self.hndl_count > 3:
  345. # it shouldn't be here, because it should have been disabled.
  346. raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
  347. "timer.")
  348. elif self.hndl_count == 3:
  349. # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
  350. signal.setitimer(signal.ITIMER_VIRTUAL, 0)
  351. if test_support.verbose:
  352. print("last SIGVTALRM handler call")
  353. self.hndl_count += 1
  354. if test_support.verbose:
  355. print("SIGVTALRM handler invoked", args)
  356. def sig_prof(self, *args):
  357. self.hndl_called = True
  358. signal.setitimer(signal.ITIMER_PROF, 0)
  359. if test_support.verbose:
  360. print("SIGPROF handler invoked", args)
  361. def test_itimer_exc(self):
  362. # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
  363. # defines it ?
  364. self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
  365. # Negative times are treated as zero on some platforms.
  366. if 0:
  367. self.assertRaises(signal.ItimerError,
  368. signal.setitimer, signal.ITIMER_REAL, -1)
  369. def test_itimer_real(self):
  370. self.itimer = signal.ITIMER_REAL
  371. signal.setitimer(self.itimer, 1.0)
  372. if test_support.verbose:
  373. print("\ncall pause()...")
  374. signal.pause()
  375. self.assertEqual(self.hndl_called, True)
  376. # Issue 3864. Unknown if this affects earlier versions of freebsd also.
  377. @unittest.skipIf(sys.platform in ('freebsd6', 'netbsd5'),
  378. 'itimer not reliable (does not mix well with threading) on some BSDs.')
  379. def test_itimer_virtual(self):
  380. self.itimer = signal.ITIMER_VIRTUAL
  381. signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
  382. signal.setitimer(self.itimer, 0.3, 0.2)
  383. start_time = time.time()
  384. while time.time() - start_time < 60.0:
  385. # use up some virtual time by doing real work
  386. _ = pow(12345, 67890, 10000019)
  387. if signal.getitimer(self.itimer) == (0.0, 0.0):
  388. break # sig_vtalrm handler stopped this itimer
  389. else: # Issue 8424
  390. self.skipTest("timeout: likely cause: machine too slow or load too "
  391. "high")
  392. # virtual itimer should be (0.0, 0.0) now
  393. self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
  394. # and the handler should have been called
  395. self.assertEqual(self.hndl_called, True)
  396. # Issue 3864. Unknown if this affects earlier versions of freebsd also.
  397. @unittest.skipIf(sys.platform=='freebsd6',
  398. 'itimer not reliable (does not mix well with threading) on freebsd6')
  399. def test_itimer_prof(self):
  400. self.itimer = signal.ITIMER_PROF
  401. signal.signal(signal.SIGPROF, self.sig_prof)
  402. signal.setitimer(self.itimer, 0.2, 0.2)
  403. start_time = time.time()
  404. while time.time() - start_time < 60.0:
  405. # do some work
  406. _ = pow(12345, 67890, 10000019)
  407. if signal.getitimer(self.itimer) == (0.0, 0.0):
  408. break # sig_prof handler stopped this itimer
  409. else: # Issue 8424
  410. self.skipTest("timeout: likely cause: machine too slow or load too "
  411. "high")
  412. # profiling itimer should be (0.0, 0.0) now
  413. self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
  414. # and the handler should have been called
  415. self.assertEqual(self.hndl_called, True)
  416. def test_main():
  417. test_support.run_unittest(BasicSignalTests, InterProcessSignalTests,
  418. WakeupFDTests, WakeupSignalTests,
  419. SiginterruptTest, ItimerTest,
  420. WindowsSignalTests)
  421. if __name__ == "__main__":
  422. test_main()