test_capi.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
# Run the _testcapi module tests (tests for the Python/C API): by definition,
# these are all functions _testcapi exports whose name begins with 'test_'.
  3. from __future__ import with_statement
  4. import sys
  5. import time
  6. import random
  7. import unittest
  8. from test import test_support as support
  9. try:
  10. import thread
  11. import threading
  12. except ImportError:
  13. thread = None
  14. threading = None
  15. # Skip this test if the _testcapi module isn't available.
  16. _testcapi = support.import_module('_testcapi')
  17. class CAPITest(unittest.TestCase):
  18. def test_buildvalue_N(self):
  19. _testcapi.test_buildvalue_N()
  20. @unittest.skipUnless(threading, 'Threading required for this test.')
  21. class TestPendingCalls(unittest.TestCase):
  22. def pendingcalls_submit(self, l, n):
  23. def callback():
  24. #this function can be interrupted by thread switching so let's
  25. #use an atomic operation
  26. l.append(None)
  27. for i in range(n):
  28. time.sleep(random.random()*0.02) #0.01 secs on average
  29. #try submitting callback until successful.
  30. #rely on regular interrupt to flush queue if we are
  31. #unsuccessful.
  32. while True:
  33. if _testcapi._pending_threadfunc(callback):
  34. break;
  35. def pendingcalls_wait(self, l, n, context = None):
  36. #now, stick around until l[0] has grown to 10
  37. count = 0;
  38. while len(l) != n:
  39. #this busy loop is where we expect to be interrupted to
  40. #run our callbacks. Note that callbacks are only run on the
  41. #main thread
  42. if False and support.verbose:
  43. print "(%i)"%(len(l),),
  44. for i in xrange(1000):
  45. a = i*i
  46. if context and not context.event.is_set():
  47. continue
  48. count += 1
  49. self.assertTrue(count < 10000,
  50. "timeout waiting for %i callbacks, got %i"%(n, len(l)))
  51. if False and support.verbose:
  52. print "(%i)"%(len(l),)
  53. def test_pendingcalls_threaded(self):
  54. #do every callback on a separate thread
  55. n = 32 #total callbacks
  56. threads = []
  57. class foo(object):pass
  58. context = foo()
  59. context.l = []
  60. context.n = 2 #submits per thread
  61. context.nThreads = n // context.n
  62. context.nFinished = 0
  63. context.lock = threading.Lock()
  64. context.event = threading.Event()
  65. threads = [threading.Thread(target=self.pendingcalls_thread,
  66. args=(context,))
  67. for i in range(context.nThreads)]
  68. with support.start_threads(threads):
  69. self.pendingcalls_wait(context.l, n, context)
  70. def pendingcalls_thread(self, context):
  71. try:
  72. self.pendingcalls_submit(context.l, context.n)
  73. finally:
  74. with context.lock:
  75. context.nFinished += 1
  76. nFinished = context.nFinished
  77. if False and support.verbose:
  78. print "finished threads: ", nFinished
  79. if nFinished == context.nThreads:
  80. context.event.set()
  81. def test_pendingcalls_non_threaded(self):
  82. #again, just using the main thread, likely they will all be dispatched at
  83. #once. It is ok to ask for too many, because we loop until we find a slot.
  84. #the loop can be interrupted to dispatch.
  85. #there are only 32 dispatch slots, so we go for twice that!
  86. l = []
  87. n = 64
  88. self.pendingcalls_submit(l, n)
  89. self.pendingcalls_wait(l, n)
  90. @unittest.skipUnless(threading and thread, 'Threading required for this test.')
  91. class TestThreadState(unittest.TestCase):
  92. @support.reap_threads
  93. def test_thread_state(self):
  94. # some extra thread-state tests driven via _testcapi
  95. def target():
  96. idents = []
  97. def callback():
  98. idents.append(thread.get_ident())
  99. _testcapi._test_thread_state(callback)
  100. a = b = callback
  101. time.sleep(1)
  102. # Check our main thread is in the list exactly 3 times.
  103. self.assertEqual(idents.count(thread.get_ident()), 3,
  104. "Couldn't find main thread correctly in the list")
  105. target()
  106. t = threading.Thread(target=target)
  107. t.start()
  108. t.join()
  109. def test_main():
  110. for name in dir(_testcapi):
  111. if name.startswith('test_'):
  112. test = getattr(_testcapi, name)
  113. if support.verbose:
  114. print "internal", name
  115. try:
  116. test()
  117. except _testcapi.error:
  118. raise support.TestFailed, sys.exc_info()[1]
  119. support.run_unittest(CAPITest, TestPendingCalls, TestThreadState)
  120. if __name__ == "__main__":
  121. test_main()