# Unit tests for the standard-library signal module.
- import unittest
- from test import test_support
- from contextlib import closing
- import gc
- import pickle
- import select
- import signal
- import subprocess
- import traceback
- import sys, os, time, errno
# Skip the whole module on platforms where these signal tests cannot run.
if sys.platform in ('os2', 'riscos'):
    raise unittest.SkipTest("Can't test signal on %s" % sys.platform)
class HandlerBCalled(Exception):
    """Raised from inside a signal handler to interrupt the running code.

    InterProcessSignalTests.handlerB raises it with the signal number and
    the formatted stack of the interrupted frame as arguments.
    """
def exit_subprocess():
    """Use os._exit(0) to exit the current subprocess.

    Otherwise, the test catches the SystemExit and continues executing
    in parallel with the original test, so you wind up with an
    exponential number of tests running concurrently.
    """
    os._exit(0)
def ignoring_eintr(__func, *args, **kwargs):
    """Call ``__func(*args, **kwargs)`` and swallow an EINTR failure.

    Returns:
        Whatever ``__func`` returns, or None when it raised an
        EnvironmentError whose errno is EINTR.

    Raises:
        EnvironmentError: re-raised unchanged for any errno other than EINTR.
    """
    try:
        return __func(*args, **kwargs)
    except EnvironmentError as e:
        if e.errno != errno.EINTR:
            raise
        return None
- @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
- class InterProcessSignalTests(unittest.TestCase):
- MAX_DURATION = 20 # Entire test should last at most 20 sec.
- def setUp(self):
- self.using_gc = gc.isenabled()
- gc.disable()
- def tearDown(self):
- if self.using_gc:
- gc.enable()
- def format_frame(self, frame, limit=None):
- return ''.join(traceback.format_stack(frame, limit=limit))
- def handlerA(self, signum, frame):
- self.a_called = True
- if test_support.verbose:
- print "handlerA invoked from signal %s at:\n%s" % (
- signum, self.format_frame(frame, limit=1))
- def handlerB(self, signum, frame):
- self.b_called = True
- if test_support.verbose:
- print "handlerB invoked from signal %s at:\n%s" % (
- signum, self.format_frame(frame, limit=1))
- raise HandlerBCalled(signum, self.format_frame(frame))
- def wait(self, child):
- """Wait for child to finish, ignoring EINTR."""
- while True:
- try:
- child.wait()
- return
- except OSError as e:
- if e.errno != errno.EINTR:
- raise
- def run_test(self):
- # Install handlers. This function runs in a sub-process, so we
- # don't worry about re-setting the default handlers.
- signal.signal(signal.SIGHUP, self.handlerA)
- signal.signal(signal.SIGUSR1, self.handlerB)
- signal.signal(signal.SIGUSR2, signal.SIG_IGN)
- signal.signal(signal.SIGALRM, signal.default_int_handler)
- # Variables the signals will modify:
- self.a_called = False
- self.b_called = False
- # Let the sub-processes know who to send signals to.
- pid = os.getpid()
- if test_support.verbose:
- print "test runner's pid is", pid
- child = ignoring_eintr(subprocess.Popen, ['kill', '-HUP', str(pid)])
- if child:
- self.wait(child)
- if not self.a_called:
- time.sleep(1) # Give the signal time to be delivered.
- self.assertTrue(self.a_called)
- self.assertFalse(self.b_called)
- self.a_called = False
- # Make sure the signal isn't delivered while the previous
- # Popen object is being destroyed, because __del__ swallows
- # exceptions.
- del child
- try:
- child = subprocess.Popen(['kill', '-USR1', str(pid)])
- # This wait should be interrupted by the signal's exception.
- self.wait(child)
- time.sleep(1) # Give the signal time to be delivered.
- self.fail('HandlerBCalled exception not raised')
- except HandlerBCalled:
- self.assertTrue(self.b_called)
- self.assertFalse(self.a_called)
- if test_support.verbose:
- print "HandlerBCalled exception caught"
- child = ignoring_eintr(subprocess.Popen, ['kill', '-USR2', str(pid)])
- if child:
- self.wait(child) # Nothing should happen.
- try:
- signal.alarm(1)
- # The race condition in pause doesn't matter in this case,
- # since alarm is going to raise a KeyboardException, which
- # will skip the call.
- signal.pause()
- # But if another signal arrives before the alarm, pause
- # may return early.
- time.sleep(1)
- except KeyboardInterrupt:
- if test_support.verbose:
- print "KeyboardInterrupt (the alarm() went off)"
- except:
- self.fail("Some other exception woke us from pause: %s" %
- traceback.format_exc())
- else:
- self.fail("pause returned of its own accord, and the signal"
- " didn't arrive after another second.")
- # Issue 3864. Unknown if this affects earlier versions of freebsd also.
- @unittest.skipIf(sys.platform=='freebsd6',
- 'inter process signals not reliable (do not mix well with threading) '
- 'on freebsd6')
- def test_main(self):
- # This function spawns a child process to insulate the main
- # test-running process from all the signals. It then
- # communicates with that child process over a pipe and
- # re-raises information about any exceptions the child
- # raises. The real work happens in self.run_test().
- os_done_r, os_done_w = os.pipe()
- with closing(os.fdopen(os_done_r)) as done_r, \
- closing(os.fdopen(os_done_w, 'w')) as done_w:
- child = os.fork()
- if child == 0:
- # In the child process; run the test and report results
- # through the pipe.
- try:
- done_r.close()
- # Have to close done_w again here because
- # exit_subprocess() will skip the enclosing with block.
- with closing(done_w):
- try:
- self.run_test()
- except:
- pickle.dump(traceback.format_exc(), done_w)
- else:
- pickle.dump(None, done_w)
- except:
- print 'Uh oh, raised from pickle.'
- traceback.print_exc()
- finally:
- exit_subprocess()
- done_w.close()
- # Block for up to MAX_DURATION seconds for the test to finish.
- r, w, x = select.select([done_r], [], [], self.MAX_DURATION)
- if done_r in r:
- tb = pickle.load(done_r)
- if tb:
- self.fail(tb)
- else:
- os.kill(child, signal.SIGKILL)
- self.fail('Test deadlocked after %d seconds.' %
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class BasicSignalTests(unittest.TestCase):
    """Argument validation and round-tripping of signal.signal/getsignal."""

    def trivial_signal_handler(self, *args):
        # Do-nothing handler used purely as an installable callable.
        pass

    def test_out_of_range_signal_number_raises_error(self):
        self.assertRaises(ValueError, signal.getsignal, 4242)

        self.assertRaises(ValueError, signal.signal, 4242,
                          self.trivial_signal_handler)

    def test_setting_signal_handler_to_none_raises_error(self):
        self.assertRaises(TypeError, signal.signal,
                          signal.SIGUSR1, None)

    def test_getsignal(self):
        hup = signal.signal(signal.SIGHUP, self.trivial_signal_handler)
        try:
            self.assertEqual(signal.getsignal(signal.SIGHUP),
                             self.trivial_signal_handler)
        finally:
            # Put the previous handler back even if the check above fails.
            signal.signal(signal.SIGHUP, hup)
        self.assertEqual(signal.getsignal(signal.SIGHUP), hup)
@unittest.skipUnless(sys.platform == "win32", "Windows specific")
class WindowsSignalTests(unittest.TestCase):
    """Checks which signal numbers signal.signal() accepts on Windows."""

    def test_issue9324(self):
        # Updated for issue #10003, adding SIGBREAK
        handler = lambda x, y: None
        for sig in (signal.SIGABRT, signal.SIGBREAK, signal.SIGFPE,
                    signal.SIGILL, signal.SIGINT, signal.SIGSEGV,
                    signal.SIGTERM):
            # Set and then reset a handler for signals that work on windows
            signal.signal(sig, signal.signal(sig, handler))

        with self.assertRaises(ValueError):
            signal.signal(-1, handler)

        with self.assertRaises(ValueError):
            signal.signal(7, handler)
- class WakeupFDTests(unittest.TestCase):
- def test_invalid_fd(self):
- fd = test_support.make_bad_fd()
- self.assertRaises(ValueError, signal.set_wakeup_fd, fd)
- @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
- class WakeupSignalTests(unittest.TestCase):
- def test_wakeup_fd_early(self):
- import select
- signal.alarm(1)
- before_time = time.time()
- # We attempt to get a signal during the sleep,
- # before select is called
- time.sleep(self.TIMEOUT_FULL)
- mid_time = time.time()
- self.assertTrue(mid_time - before_time < self.TIMEOUT_HALF)
- select.select([self.read], [], [], self.TIMEOUT_FULL)
- after_time = time.time()
- self.assertTrue(after_time - mid_time < self.TIMEOUT_HALF)
- def test_wakeup_fd_during(self):
- import select
- signal.alarm(1)
- before_time = time.time()
- # We attempt to get a signal during the select call
- self.assertRaises(select.error, select.select,
- [self.read], [], [], self.TIMEOUT_FULL)
- after_time = time.time()
- self.assertTrue(after_time - before_time < self.TIMEOUT_HALF)
- def setUp(self):
- import fcntl
- self.alrm = signal.signal(signal.SIGALRM, lambda x,y:None)
- self.read, self.write = os.pipe()
- flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
- flags = flags | os.O_NONBLOCK
- fcntl.fcntl(self.write, fcntl.F_SETFL, flags)
- self.old_wakeup = signal.set_wakeup_fd(self.write)
- def tearDown(self):
- signal.set_wakeup_fd(self.old_wakeup)
- os.close(self.read)
- os.close(self.write)
- signal.signal(signal.SIGALRM, self.alrm)
- @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
- class SiginterruptTest(unittest.TestCase):
- def setUp(self):
- """Install a no-op signal handler that can be set to allow
- interrupts or not, and arrange for the original signal handler to be
- re-installed when the test is finished.
- """
- self.signum = signal.SIGUSR1
- oldhandler = signal.signal(self.signum, lambda x,y: None)
- self.addCleanup(signal.signal, self.signum, oldhandler)
- def readpipe_interrupted(self):
- """Perform a read during which a signal will arrive. Return True if the
- read is interrupted by the signal and raises an exception. Return False
- if it returns normally.
- """
- # Create a pipe that can be used for the read. Also clean it up
- # when the test is over, since nothing else will (but see below for
- # the write end).
- r, w = os.pipe()
- self.addCleanup(os.close, r)
- # Create another process which can send a signal to this one to try
- # to interrupt the read.
- ppid = os.getpid()
- pid = os.fork()
- if pid == 0:
- # Child code: sleep to give the parent enough time to enter the
- # read() call (there's a race here, but it's really tricky to
- # eliminate it); then signal the parent process. Also, sleep
- # again to make it likely that the signal is delivered to the
- # parent process before the child exits. If the child exits
- # first, the write end of the pipe will be closed and the test
- # is invalid.
- try:
- time.sleep(0.2)
- os.kill(ppid, self.signum)
- time.sleep(0.2)
- finally:
- # No matter what, just exit as fast as possible now.
- exit_subprocess()
- else:
- # Parent code.
- # Make sure the child is eventually reaped, else it'll be a
- # zombie for the rest of the test suite run.
- self.addCleanup(os.waitpid, pid, 0)
- # Close the write end of the pipe. The child has a copy, so
- # it's not really closed until the child exits. We need it to
- # close when the child exits so that in the non-interrupt case
- # the read eventually completes, otherwise we could just close
- # it *after* the test.
- os.close(w)
- # Try the read and report whether it is interrupted or not to
- # the caller.
- try:
- d = os.read(r, 1)
- return False
- except OSError, err:
- if err.errno != errno.EINTR:
- raise
- return True
- def test_without_siginterrupt(self):
- """If a signal handler is installed and siginterrupt is not called
- at all, when that signal arrives, it interrupts a syscall that's in
- progress.
- """
- i = self.readpipe_interrupted()
- self.assertTrue(i)
- # Arrival of the signal shouldn't have changed anything.
- i = self.readpipe_interrupted()
- self.assertTrue(i)
- def test_siginterrupt_on(self):
- """If a signal handler is installed and siginterrupt is called with
- a true value for the second argument, when that signal arrives, it
- interrupts a syscall that's in progress.
- """
- signal.siginterrupt(self.signum, 1)
- i = self.readpipe_interrupted()
- self.assertTrue(i)
- # Arrival of the signal shouldn't have changed anything.
- i = self.readpipe_interrupted()
- self.assertTrue(i)
- def test_siginterrupt_off(self):
- """If a signal handler is installed and siginterrupt is called with
- a false value for the second argument, when that signal arrives, it
- does not interrupt a syscall that's in progress.
- """
- signal.siginterrupt(self.signum, 0)
- i = self.readpipe_interrupted()
- self.assertFalse(i)
- # Arrival of the signal shouldn't have changed anything.
- i = self.readpipe_interrupted()
- self.assertFalse(i)
- @unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
- class ItimerTest(unittest.TestCase):
- def setUp(self):
- self.hndl_called = False
- self.hndl_count = 0
- self.itimer = None
- self.old_alarm = signal.signal(signal.SIGALRM, self.sig_alrm)
- def tearDown(self):
- signal.signal(signal.SIGALRM, self.old_alarm)
- if self.itimer is not None: # test_itimer_exc doesn't change this attr
- # just ensure that itimer is stopped
- signal.setitimer(self.itimer, 0)
- def sig_alrm(self, *args):
- self.hndl_called = True
- if test_support.verbose:
- print("SIGALRM handler invoked", args)
- def sig_vtalrm(self, *args):
- self.hndl_called = True
- if self.hndl_count > 3:
- # it shouldn't be here, because it should have been disabled.
- raise signal.ItimerError("setitimer didn't disable ITIMER_VIRTUAL "
- "timer.")
- elif self.hndl_count == 3:
- # disable ITIMER_VIRTUAL, this function shouldn't be called anymore
- signal.setitimer(signal.ITIMER_VIRTUAL, 0)
- if test_support.verbose:
- print("last SIGVTALRM handler call")
- self.hndl_count += 1
- if test_support.verbose:
- print("SIGVTALRM handler invoked", args)
- def sig_prof(self, *args):
- self.hndl_called = True
- signal.setitimer(signal.ITIMER_PROF, 0)
- if test_support.verbose:
- print("SIGPROF handler invoked", args)
- def test_itimer_exc(self):
- # XXX I'm assuming -1 is an invalid itimer, but maybe some platform
- # defines it ?
- self.assertRaises(signal.ItimerError, signal.setitimer, -1, 0)
- # Negative times are treated as zero on some platforms.
- if 0:
- self.assertRaises(signal.ItimerError,
- signal.setitimer, signal.ITIMER_REAL, -1)
- def test_itimer_real(self):
- self.itimer = signal.ITIMER_REAL
- signal.setitimer(self.itimer, 1.0)
- if test_support.verbose:
- print("\ncall pause()...")
- signal.pause()
- self.assertEqual(self.hndl_called, True)
- # Issue 3864. Unknown if this affects earlier versions of freebsd also.
- @unittest.skipIf(sys.platform in ('freebsd6', 'netbsd5'),
- 'itimer not reliable (does not mix well with threading) on some BSDs.')
- def test_itimer_virtual(self):
- self.itimer = signal.ITIMER_VIRTUAL
- signal.signal(signal.SIGVTALRM, self.sig_vtalrm)
- signal.setitimer(self.itimer, 0.3, 0.2)
- start_time = time.time()
- while time.time() - start_time < 60.0:
- # use up some virtual time by doing real work
- _ = pow(12345, 67890, 10000019)
- if signal.getitimer(self.itimer) == (0.0, 0.0):
- break # sig_vtalrm handler stopped this itimer
- else: # Issue 8424
- self.skipTest("timeout: likely cause: machine too slow or load too "
- "high")
- # virtual itimer should be (0.0, 0.0) now
- self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
- # and the handler should have been called
- self.assertEqual(self.hndl_called, True)
- # Issue 3864. Unknown if this affects earlier versions of freebsd also.
- @unittest.skipIf(sys.platform=='freebsd6',
- 'itimer not reliable (does not mix well with threading) on freebsd6')
- def test_itimer_prof(self):
- self.itimer = signal.ITIMER_PROF
- signal.signal(signal.SIGPROF, self.sig_prof)
- signal.setitimer(self.itimer, 0.2, 0.2)
- start_time = time.time()
- while time.time() - start_time < 60.0:
- # do some work
- _ = pow(12345, 67890, 10000019)
- if signal.getitimer(self.itimer) == (0.0, 0.0):
- break # sig_prof handler stopped this itimer
- else: # Issue 8424
- self.skipTest("timeout: likely cause: machine too slow or load too "
- "high")
- # profiling itimer should be (0.0, 0.0) now
- self.assertEqual(signal.getitimer(self.itimer), (0.0, 0.0))
- # and the handler should have been called
- self.assertEqual(self.hndl_called, True)
def test_main():
    """Run every test class of this module via test_support.run_unittest."""
    test_support.run_unittest(BasicSignalTests, InterProcessSignalTests,
                              WakeupFDTests, WakeupSignalTests,
                              SiginterruptTest, ItimerTest,
                              WindowsSignalTests)
# Allow running this file directly as a script.
if __name__ == "__main__":
    test_main()