test_multiprocessing.py 80 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
7227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679
  1. #
  2. # Unit tests for the multiprocessing package
  3. #
  4. import unittest
  5. import Queue
  6. import time
  7. import sys
  8. import os
  9. import gc
  10. import signal
  11. import array
  12. import socket
  13. import random
  14. import logging
  15. import errno
  16. import test.script_helper
  17. from test import test_support
  18. from StringIO import StringIO
  19. _multiprocessing = test_support.import_module('_multiprocessing')
  20. # import threading after _multiprocessing to raise a more relevant error
  21. # message: "No module named _multiprocessing". _multiprocessing is not compiled
  22. # without thread support.
  23. import threading
  24. # Work around broken sem_open implementations
  25. test_support.import_module('multiprocessing.synchronize')
  26. import multiprocessing.dummy
  27. import multiprocessing.connection
  28. import multiprocessing.managers
  29. import multiprocessing.heap
  30. import multiprocessing.pool
  31. from multiprocessing import util
  32. try:
  33. from multiprocessing import reduction
  34. HAS_REDUCTION = True
  35. except ImportError:
  36. HAS_REDUCTION = False
  37. try:
  38. from multiprocessing.sharedctypes import Value, copy
  39. HAS_SHAREDCTYPES = True
  40. except ImportError:
  41. HAS_SHAREDCTYPES = False
  42. try:
  43. import msvcrt
  44. except ImportError:
  45. msvcrt = None
  46. #
  47. #
  48. #
  49. latin = str
  50. #
  51. # Constants
  52. #
  53. LOG_LEVEL = util.SUBWARNING
  54. #LOG_LEVEL = logging.DEBUG
  55. DELTA = 0.1
  56. CHECK_TIMINGS = False # making true makes tests take a lot longer
  57. # and can sometimes cause some non-serious
  58. # failures because some calls block a bit
  59. # longer than expected
  60. if CHECK_TIMINGS:
  61. TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
  62. else:
  63. TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
  64. HAVE_GETVALUE = not getattr(_multiprocessing,
  65. 'HAVE_BROKEN_SEM_GETVALUE', False)
  66. WIN32 = (sys.platform == "win32")
  67. try:
  68. MAXFD = os.sysconf("SC_OPEN_MAX")
  69. except:
  70. MAXFD = 256
  71. #
  72. # Some tests require ctypes
  73. #
  74. try:
  75. from ctypes import Structure, c_int, c_double
  76. except ImportError:
  77. Structure = object
  78. c_int = c_double = None
  79. def check_enough_semaphores():
  80. """Check that the system supports enough semaphores to run the test."""
  81. # minimum number of semaphores available according to POSIX
  82. nsems_min = 256
  83. try:
  84. nsems = os.sysconf("SC_SEM_NSEMS_MAX")
  85. except (AttributeError, ValueError):
  86. # sysconf not available or setting not available
  87. return
  88. if nsems == -1 or nsems >= nsems_min:
  89. return
  90. raise unittest.SkipTest("The OS doesn't support enough semaphores "
  91. "to run the test (required: %d)." % nsems_min)
  92. #
  93. # Creates a wrapper for a function which records the time it takes to finish
  94. #
  95. class TimingWrapper(object):
  96. def __init__(self, func):
  97. self.func = func
  98. self.elapsed = None
  99. def __call__(self, *args, **kwds):
  100. t = time.time()
  101. try:
  102. return self.func(*args, **kwds)
  103. finally:
  104. self.elapsed = time.time() - t
  105. #
  106. # Base class for test cases
  107. #
  108. class BaseTestCase(object):
  109. ALLOWED_TYPES = ('processes', 'manager', 'threads')
  110. def assertTimingAlmostEqual(self, a, b):
  111. if CHECK_TIMINGS:
  112. self.assertAlmostEqual(a, b, 1)
  113. def assertReturnsIfImplemented(self, value, func, *args):
  114. try:
  115. res = func(*args)
  116. except NotImplementedError:
  117. pass
  118. else:
  119. return self.assertEqual(value, res)
  120. # For the sanity of Windows users, rather than crashing or freezing in
  121. # multiple ways.
  122. def __reduce__(self, *args):
  123. raise NotImplementedError("shouldn't try to pickle a test case")
  124. __reduce_ex__ = __reduce__
  125. #
  126. # Return the value of a semaphore
  127. #
  128. def get_value(self):
  129. try:
  130. return self.get_value()
  131. except AttributeError:
  132. try:
  133. return self._Semaphore__value
  134. except AttributeError:
  135. try:
  136. return self._value
  137. except AttributeError:
  138. raise NotImplementedError
  139. #
  140. # Testcases
  141. #
  142. class _TestProcess(BaseTestCase):
  143. ALLOWED_TYPES = ('processes', 'threads')
  144. def test_current(self):
  145. if self.TYPE == 'threads':
  146. self.skipTest('test not appropriate for {}'.format(self.TYPE))
  147. current = self.current_process()
  148. authkey = current.authkey
  149. self.assertTrue(current.is_alive())
  150. self.assertTrue(not current.daemon)
  151. self.assertIsInstance(authkey, bytes)
  152. self.assertTrue(len(authkey) > 0)
  153. self.assertEqual(current.ident, os.getpid())
  154. self.assertEqual(current.exitcode, None)
  155. @classmethod
  156. def _test(cls, q, *args, **kwds):
  157. current = cls.current_process()
  158. q.put(args)
  159. q.put(kwds)
  160. q.put(current.name)
  161. if cls.TYPE != 'threads':
  162. q.put(bytes(current.authkey))
  163. q.put(current.pid)
  164. def test_process(self):
  165. q = self.Queue(1)
  166. e = self.Event()
  167. args = (q, 1, 2)
  168. kwargs = {'hello':23, 'bye':2.54}
  169. name = 'SomeProcess'
  170. p = self.Process(
  171. target=self._test, args=args, kwargs=kwargs, name=name
  172. )
  173. p.daemon = True
  174. current = self.current_process()
  175. if self.TYPE != 'threads':
  176. self.assertEqual(p.authkey, current.authkey)
  177. self.assertEqual(p.is_alive(), False)
  178. self.assertEqual(p.daemon, True)
  179. self.assertNotIn(p, self.active_children())
  180. self.assertTrue(type(self.active_children()) is list)
  181. self.assertEqual(p.exitcode, None)
  182. p.start()
  183. self.assertEqual(p.exitcode, None)
  184. self.assertEqual(p.is_alive(), True)
  185. self.assertIn(p, self.active_children())
  186. self.assertEqual(q.get(), args[1:])
  187. self.assertEqual(q.get(), kwargs)
  188. self.assertEqual(q.get(), p.name)
  189. if self.TYPE != 'threads':
  190. self.assertEqual(q.get(), current.authkey)
  191. self.assertEqual(q.get(), p.pid)
  192. p.join()
  193. self.assertEqual(p.exitcode, 0)
  194. self.assertEqual(p.is_alive(), False)
  195. self.assertNotIn(p, self.active_children())
  196. @classmethod
  197. def _test_terminate(cls):
  198. time.sleep(1000)
  199. def test_terminate(self):
  200. if self.TYPE == 'threads':
  201. self.skipTest('test not appropriate for {}'.format(self.TYPE))
  202. p = self.Process(target=self._test_terminate)
  203. p.daemon = True
  204. p.start()
  205. self.assertEqual(p.is_alive(), True)
  206. self.assertIn(p, self.active_children())
  207. self.assertEqual(p.exitcode, None)
  208. p.terminate()
  209. join = TimingWrapper(p.join)
  210. self.assertEqual(join(), None)
  211. self.assertTimingAlmostEqual(join.elapsed, 0.0)
  212. self.assertEqual(p.is_alive(), False)
  213. self.assertNotIn(p, self.active_children())
  214. p.join()
  215. # XXX sometimes get p.exitcode == 0 on Windows ...
  216. #self.assertEqual(p.exitcode, -signal.SIGTERM)
  217. def test_cpu_count(self):
  218. try:
  219. cpus = multiprocessing.cpu_count()
  220. except NotImplementedError:
  221. cpus = 1
  222. self.assertTrue(type(cpus) is int)
  223. self.assertTrue(cpus >= 1)
  224. def test_active_children(self):
  225. self.assertEqual(type(self.active_children()), list)
  226. p = self.Process(target=time.sleep, args=(DELTA,))
  227. self.assertNotIn(p, self.active_children())
  228. p.daemon = True
  229. p.start()
  230. self.assertIn(p, self.active_children())
  231. p.join()
  232. self.assertNotIn(p, self.active_children())
  233. @classmethod
  234. def _test_recursion(cls, wconn, id):
  235. from multiprocessing import forking
  236. wconn.send(id)
  237. if len(id) < 2:
  238. for i in range(2):
  239. p = cls.Process(
  240. target=cls._test_recursion, args=(wconn, id+[i])
  241. )
  242. p.start()
  243. p.join()
  244. def test_recursion(self):
  245. rconn, wconn = self.Pipe(duplex=False)
  246. self._test_recursion(wconn, [])
  247. time.sleep(DELTA)
  248. result = []
  249. while rconn.poll():
  250. result.append(rconn.recv())
  251. expected = [
  252. [],
  253. [0],
  254. [0, 0],
  255. [0, 1],
  256. [1],
  257. [1, 0],
  258. [1, 1]
  259. ]
  260. self.assertEqual(result, expected)
  261. @classmethod
  262. def _test_sys_exit(cls, reason, testfn):
  263. sys.stderr = open(testfn, 'w')
  264. sys.exit(reason)
  265. def test_sys_exit(self):
  266. # See Issue 13854
  267. if self.TYPE == 'threads':
  268. self.skipTest('test not appropriate for {}'.format(self.TYPE))
  269. testfn = test_support.TESTFN
  270. self.addCleanup(test_support.unlink, testfn)
  271. for reason, code in (([1, 2, 3], 1), ('ignore this', 1)):
  272. p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
  273. p.daemon = True
  274. p.start()
  275. p.join(5)
  276. self.assertEqual(p.exitcode, code)
  277. with open(testfn, 'r') as f:
  278. self.assertEqual(f.read().rstrip(), str(reason))
  279. for reason in (True, False, 8):
  280. p = self.Process(target=sys.exit, args=(reason,))
  281. p.daemon = True
  282. p.start()
  283. p.join(5)
  284. self.assertEqual(p.exitcode, reason)
  285. #
  286. #
  287. #
  288. class _UpperCaser(multiprocessing.Process):
  289. def __init__(self):
  290. multiprocessing.Process.__init__(self)
  291. self.child_conn, self.parent_conn = multiprocessing.Pipe()
  292. def run(self):
  293. self.parent_conn.close()
  294. for s in iter(self.child_conn.recv, None):
  295. self.child_conn.send(s.upper())
  296. self.child_conn.close()
  297. def submit(self, s):
  298. assert type(s) is str
  299. self.parent_conn.send(s)
  300. return self.parent_conn.recv()
  301. def stop(self):
  302. self.parent_conn.send(None)
  303. self.parent_conn.close()
  304. self.child_conn.close()
  305. class _TestSubclassingProcess(BaseTestCase):
  306. ALLOWED_TYPES = ('processes',)
  307. def test_subclassing(self):
  308. uppercaser = _UpperCaser()
  309. uppercaser.daemon = True
  310. uppercaser.start()
  311. self.assertEqual(uppercaser.submit('hello'), 'HELLO')
  312. self.assertEqual(uppercaser.submit('world'), 'WORLD')
  313. uppercaser.stop()
  314. uppercaser.join()
  315. #
  316. #
  317. #
  318. def queue_empty(q):
  319. if hasattr(q, 'empty'):
  320. return q.empty()
  321. else:
  322. return q.qsize() == 0
  323. def queue_full(q, maxsize):
  324. if hasattr(q, 'full'):
  325. return q.full()
  326. else:
  327. return q.qsize() == maxsize
  328. class _TestQueue(BaseTestCase):
  329. @classmethod
  330. def _test_put(cls, queue, child_can_start, parent_can_continue):
  331. child_can_start.wait()
  332. for i in range(6):
  333. queue.get()
  334. parent_can_continue.set()
  335. def test_put(self):
  336. MAXSIZE = 6
  337. queue = self.Queue(maxsize=MAXSIZE)
  338. child_can_start = self.Event()
  339. parent_can_continue = self.Event()
  340. proc = self.Process(
  341. target=self._test_put,
  342. args=(queue, child_can_start, parent_can_continue)
  343. )
  344. proc.daemon = True
  345. proc.start()
  346. self.assertEqual(queue_empty(queue), True)
  347. self.assertEqual(queue_full(queue, MAXSIZE), False)
  348. queue.put(1)
  349. queue.put(2, True)
  350. queue.put(3, True, None)
  351. queue.put(4, False)
  352. queue.put(5, False, None)
  353. queue.put_nowait(6)
  354. # the values may be in buffer but not yet in pipe so sleep a bit
  355. time.sleep(DELTA)
  356. self.assertEqual(queue_empty(queue), False)
  357. self.assertEqual(queue_full(queue, MAXSIZE), True)
  358. put = TimingWrapper(queue.put)
  359. put_nowait = TimingWrapper(queue.put_nowait)
  360. self.assertRaises(Queue.Full, put, 7, False)
  361. self.assertTimingAlmostEqual(put.elapsed, 0)
  362. self.assertRaises(Queue.Full, put, 7, False, None)
  363. self.assertTimingAlmostEqual(put.elapsed, 0)
  364. self.assertRaises(Queue.Full, put_nowait, 7)
  365. self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
  366. self.assertRaises(Queue.Full, put, 7, True, TIMEOUT1)
  367. self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
  368. self.assertRaises(Queue.Full, put, 7, False, TIMEOUT2)
  369. self.assertTimingAlmostEqual(put.elapsed, 0)
  370. self.assertRaises(Queue.Full, put, 7, True, timeout=TIMEOUT3)
  371. self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
  372. child_can_start.set()
  373. parent_can_continue.wait()
  374. self.assertEqual(queue_empty(queue), True)
  375. self.assertEqual(queue_full(queue, MAXSIZE), False)
  376. proc.join()
  377. @classmethod
  378. def _test_get(cls, queue, child_can_start, parent_can_continue):
  379. child_can_start.wait()
  380. #queue.put(1)
  381. queue.put(2)
  382. queue.put(3)
  383. queue.put(4)
  384. queue.put(5)
  385. parent_can_continue.set()
  386. def test_get(self):
  387. queue = self.Queue()
  388. child_can_start = self.Event()
  389. parent_can_continue = self.Event()
  390. proc = self.Process(
  391. target=self._test_get,
  392. args=(queue, child_can_start, parent_can_continue)
  393. )
  394. proc.daemon = True
  395. proc.start()
  396. self.assertEqual(queue_empty(queue), True)
  397. child_can_start.set()
  398. parent_can_continue.wait()
  399. time.sleep(DELTA)
  400. self.assertEqual(queue_empty(queue), False)
  401. # Hangs unexpectedly, remove for now
  402. #self.assertEqual(queue.get(), 1)
  403. self.assertEqual(queue.get(True, None), 2)
  404. self.assertEqual(queue.get(True), 3)
  405. self.assertEqual(queue.get(timeout=1), 4)
  406. self.assertEqual(queue.get_nowait(), 5)
  407. self.assertEqual(queue_empty(queue), True)
  408. get = TimingWrapper(queue.get)
  409. get_nowait = TimingWrapper(queue.get_nowait)
  410. self.assertRaises(Queue.Empty, get, False)
  411. self.assertTimingAlmostEqual(get.elapsed, 0)
  412. self.assertRaises(Queue.Empty, get, False, None)
  413. self.assertTimingAlmostEqual(get.elapsed, 0)
  414. self.assertRaises(Queue.Empty, get_nowait)
  415. self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
  416. self.assertRaises(Queue.Empty, get, True, TIMEOUT1)
  417. self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
  418. self.assertRaises(Queue.Empty, get, False, TIMEOUT2)
  419. self.assertTimingAlmostEqual(get.elapsed, 0)
  420. self.assertRaises(Queue.Empty, get, timeout=TIMEOUT3)
  421. self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
  422. proc.join()
  423. @classmethod
  424. def _test_fork(cls, queue):
  425. for i in range(10, 20):
  426. queue.put(i)
  427. # note that at this point the items may only be buffered, so the
  428. # process cannot shutdown until the feeder thread has finished
  429. # pushing items onto the pipe.
  430. def test_fork(self):
  431. # Old versions of Queue would fail to create a new feeder
  432. # thread for a forked process if the original process had its
  433. # own feeder thread. This test checks that this no longer
  434. # happens.
  435. queue = self.Queue()
  436. # put items on queue so that main process starts a feeder thread
  437. for i in range(10):
  438. queue.put(i)
  439. # wait to make sure thread starts before we fork a new process
  440. time.sleep(DELTA)
  441. # fork process
  442. p = self.Process(target=self._test_fork, args=(queue,))
  443. p.daemon = True
  444. p.start()
  445. # check that all expected items are in the queue
  446. for i in range(20):
  447. self.assertEqual(queue.get(), i)
  448. self.assertRaises(Queue.Empty, queue.get, False)
  449. p.join()
  450. def test_qsize(self):
  451. q = self.Queue()
  452. try:
  453. self.assertEqual(q.qsize(), 0)
  454. except NotImplementedError:
  455. self.skipTest('qsize method not implemented')
  456. q.put(1)
  457. self.assertEqual(q.qsize(), 1)
  458. q.put(5)
  459. self.assertEqual(q.qsize(), 2)
  460. q.get()
  461. self.assertEqual(q.qsize(), 1)
  462. q.get()
  463. self.assertEqual(q.qsize(), 0)
  464. @classmethod
  465. def _test_task_done(cls, q):
  466. for obj in iter(q.get, None):
  467. time.sleep(DELTA)
  468. q.task_done()
  469. def test_task_done(self):
  470. queue = self.JoinableQueue()
  471. if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
  472. self.skipTest("requires 'queue.task_done()' method")
  473. workers = [self.Process(target=self._test_task_done, args=(queue,))
  474. for i in xrange(4)]
  475. for p in workers:
  476. p.daemon = True
  477. p.start()
  478. for i in xrange(10):
  479. queue.put(i)
  480. queue.join()
  481. for p in workers:
  482. queue.put(None)
  483. for p in workers:
  484. p.join()
  485. def test_no_import_lock_contention(self):
  486. with test_support.temp_cwd():
  487. module_name = 'imported_by_an_imported_module'
  488. with open(module_name + '.py', 'w') as f:
  489. f.write("""if 1:
  490. import multiprocessing
  491. q = multiprocessing.Queue()
  492. q.put('knock knock')
  493. q.get(timeout=3)
  494. q.close()
  495. """)
  496. with test_support.DirsOnSysPath(os.getcwd()):
  497. try:
  498. __import__(module_name)
  499. except Queue.Empty:
  500. self.fail("Probable regression on import lock contention;"
  501. " see Issue #22853")
  502. #
  503. #
  504. #
  505. class _TestLock(BaseTestCase):
  506. def test_lock(self):
  507. lock = self.Lock()
  508. self.assertEqual(lock.acquire(), True)
  509. self.assertEqual(lock.acquire(False), False)
  510. self.assertEqual(lock.release(), None)
  511. self.assertRaises((ValueError, threading.ThreadError), lock.release)
  512. def test_rlock(self):
  513. lock = self.RLock()
  514. self.assertEqual(lock.acquire(), True)
  515. self.assertEqual(lock.acquire(), True)
  516. self.assertEqual(lock.acquire(), True)
  517. self.assertEqual(lock.release(), None)
  518. self.assertEqual(lock.release(), None)
  519. self.assertEqual(lock.release(), None)
  520. self.assertRaises((AssertionError, RuntimeError), lock.release)
  521. def test_lock_context(self):
  522. with self.Lock():
  523. pass
  524. class _TestSemaphore(BaseTestCase):
  525. def _test_semaphore(self, sem):
  526. self.assertReturnsIfImplemented(2, get_value, sem)
  527. self.assertEqual(sem.acquire(), True)
  528. self.assertReturnsIfImplemented(1, get_value, sem)
  529. self.assertEqual(sem.acquire(), True)
  530. self.assertReturnsIfImplemented(0, get_value, sem)
  531. self.assertEqual(sem.acquire(False), False)
  532. self.assertReturnsIfImplemented(0, get_value, sem)
  533. self.assertEqual(sem.release(), None)
  534. self.assertReturnsIfImplemented(1, get_value, sem)
  535. self.assertEqual(sem.release(), None)
  536. self.assertReturnsIfImplemented(2, get_value, sem)
  537. def test_semaphore(self):
  538. sem = self.Semaphore(2)
  539. self._test_semaphore(sem)
  540. self.assertEqual(sem.release(), None)
  541. self.assertReturnsIfImplemented(3, get_value, sem)
  542. self.assertEqual(sem.release(), None)
  543. self.assertReturnsIfImplemented(4, get_value, sem)
  544. def test_bounded_semaphore(self):
  545. sem = self.BoundedSemaphore(2)
  546. self._test_semaphore(sem)
  547. # Currently fails on OS/X
  548. #if HAVE_GETVALUE:
  549. # self.assertRaises(ValueError, sem.release)
  550. # self.assertReturnsIfImplemented(2, get_value, sem)
  551. def test_timeout(self):
  552. if self.TYPE != 'processes':
  553. self.skipTest('test not appropriate for {}'.format(self.TYPE))
  554. sem = self.Semaphore(0)
  555. acquire = TimingWrapper(sem.acquire)
  556. self.assertEqual(acquire(False), False)
  557. self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
  558. self.assertEqual(acquire(False, None), False)
  559. self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
  560. self.assertEqual(acquire(False, TIMEOUT1), False)
  561. self.assertTimingAlmostEqual(acquire.elapsed, 0)
  562. self.assertEqual(acquire(True, TIMEOUT2), False)
  563. self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
  564. self.assertEqual(acquire(timeout=TIMEOUT3), False)
  565. self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
  566. class _TestCondition(BaseTestCase):
  567. @classmethod
  568. def f(cls, cond, sleeping, woken, timeout=None):
  569. cond.acquire()
  570. sleeping.release()
  571. cond.wait(timeout)
  572. woken.release()
  573. cond.release()
  574. def check_invariant(self, cond):
  575. # this is only supposed to succeed when there are no sleepers
  576. if self.TYPE == 'processes':
  577. try:
  578. sleepers = (cond._sleeping_count.get_value() -
  579. cond._woken_count.get_value())
  580. self.assertEqual(sleepers, 0)
  581. self.assertEqual(cond._wait_semaphore.get_value(), 0)
  582. except NotImplementedError:
  583. pass
  584. def test_notify(self):
  585. cond = self.Condition()
  586. sleeping = self.Semaphore(0)
  587. woken = self.Semaphore(0)
  588. p = self.Process(target=self.f, args=(cond, sleeping, woken))
  589. p.daemon = True
  590. p.start()
  591. p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
  592. p.daemon = True
  593. p.start()
  594. # wait for both children to start sleeping
  595. sleeping.acquire()
  596. sleeping.acquire()
  597. # check no process/thread has woken up
  598. time.sleep(DELTA)
  599. self.assertReturnsIfImplemented(0, get_value, woken)
  600. # wake up one process/thread
  601. cond.acquire()
  602. cond.notify()
  603. cond.release()
  604. # check one process/thread has woken up
  605. time.sleep(DELTA)
  606. self.assertReturnsIfImplemented(1, get_value, woken)
  607. # wake up another
  608. cond.acquire()
  609. cond.notify()
  610. cond.release()
  611. # check other has woken up
  612. time.sleep(DELTA)
  613. self.assertReturnsIfImplemented(2, get_value, woken)
  614. # check state is not mucked up
  615. self.check_invariant(cond)
  616. p.join()
  617. def test_notify_all(self):
  618. cond = self.Condition()
  619. sleeping = self.Semaphore(0)
  620. woken = self.Semaphore(0)
  621. # start some threads/processes which will timeout
  622. for i in range(3):
  623. p = self.Process(target=self.f,
  624. args=(cond, sleeping, woken, TIMEOUT1))
  625. p.daemon = True
  626. p.start()
  627. t = threading.Thread(target=self.f,
  628. args=(cond, sleeping, woken, TIMEOUT1))
  629. t.daemon = True
  630. t.start()
  631. # wait for them all to sleep
  632. for i in xrange(6):
  633. sleeping.acquire()
  634. # check they have all timed out
  635. for i in xrange(6):
  636. woken.acquire()
  637. self.assertReturnsIfImplemented(0, get_value, woken)
  638. # check state is not mucked up
  639. self.check_invariant(cond)
  640. # start some more threads/processes
  641. for i in range(3):
  642. p = self.Process(target=self.f, args=(cond, sleeping, woken))
  643. p.daemon = True
  644. p.start()
  645. t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
  646. t.daemon = True
  647. t.start()
  648. # wait for them to all sleep
  649. for i in xrange(6):
  650. sleeping.acquire()
  651. # check no process/thread has woken up
  652. time.sleep(DELTA)
  653. self.assertReturnsIfImplemented(0, get_value, woken)
  654. # wake them all up
  655. cond.acquire()
  656. cond.notify_all()
  657. cond.release()
  658. # check they have all woken
  659. time.sleep(DELTA)
  660. self.assertReturnsIfImplemented(6, get_value, woken)
  661. # check state is not mucked up
  662. self.check_invariant(cond)
  663. def test_timeout(self):
  664. cond = self.Condition()
  665. wait = TimingWrapper(cond.wait)
  666. cond.acquire()
  667. res = wait(TIMEOUT1)
  668. cond.release()
  669. self.assertEqual(res, None)
  670. self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
  671. class _TestEvent(BaseTestCase):
  672. @classmethod
  673. def _test_event(cls, event):
  674. time.sleep(TIMEOUT2)
  675. event.set()
  676. def test_event(self):
  677. event = self.Event()
  678. wait = TimingWrapper(event.wait)
  679. # Removed temporarily, due to API shear, this does not
  680. # work with threading._Event objects. is_set == isSet
  681. self.assertEqual(event.is_set(), False)
  682. # Removed, threading.Event.wait() will return the value of the __flag
  683. # instead of None. API Shear with the semaphore backed mp.Event
  684. self.assertEqual(wait(0.0), False)
  685. self.assertTimingAlmostEqual(wait.elapsed, 0.0)
  686. self.assertEqual(wait(TIMEOUT1), False)
  687. self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
  688. event.set()
  689. # See note above on the API differences
  690. self.assertEqual(event.is_set(), True)
  691. self.assertEqual(wait(), True)
  692. self.assertTimingAlmostEqual(wait.elapsed, 0.0)
  693. self.assertEqual(wait(TIMEOUT1), True)
  694. self.assertTimingAlmostEqual(wait.elapsed, 0.0)
  695. # self.assertEqual(event.is_set(), True)
  696. event.clear()
  697. #self.assertEqual(event.is_set(), False)
  698. p = self.Process(target=self._test_event, args=(event,))
  699. p.daemon = True
  700. p.start()
  701. self.assertEqual(wait(), True)
  702. #
  703. #
  704. #
  705. class _TestValue(BaseTestCase):
  706. ALLOWED_TYPES = ('processes',)
  707. codes_values = [
  708. ('i', 4343, 24234),
  709. ('d', 3.625, -4.25),
  710. ('h', -232, 234),
  711. ('c', latin('x'), latin('y'))
  712. ]
  713. def setUp(self):
  714. if not HAS_SHAREDCTYPES:
  715. self.skipTest("requires multiprocessing.sharedctypes")
  716. @classmethod
  717. def _test(cls, values):
  718. for sv, cv in zip(values, cls.codes_values):
  719. sv.value = cv[2]
  720. def test_value(self, raw=False):
  721. if raw:
  722. values = [self.RawValue(code, value)
  723. for code, value, _ in self.codes_values]
  724. else:
  725. values = [self.Value(code, value)
  726. for code, value, _ in self.codes_values]
  727. for sv, cv in zip(values, self.codes_values):
  728. self.assertEqual(sv.value, cv[1])
  729. proc = self.Process(target=self._test, args=(values,))
  730. proc.daemon = True
  731. proc.start()
  732. proc.join()
  733. for sv, cv in zip(values, self.codes_values):
  734. self.assertEqual(sv.value, cv[2])
  735. def test_rawvalue(self):
  736. self.test_value(raw=True)
  737. def test_getobj_getlock(self):
  738. val1 = self.Value('i', 5)
  739. lock1 = val1.get_lock()
  740. obj1 = val1.get_obj()
  741. val2 = self.Value('i', 5, lock=None)
  742. lock2 = val2.get_lock()
  743. obj2 = val2.get_obj()
  744. lock = self.Lock()
  745. val3 = self.Value('i', 5, lock=lock)
  746. lock3 = val3.get_lock()
  747. obj3 = val3.get_obj()
  748. self.assertEqual(lock, lock3)
  749. arr4 = self.Value('i', 5, lock=False)
  750. self.assertFalse(hasattr(arr4, 'get_lock'))
  751. self.assertFalse(hasattr(arr4, 'get_obj'))
  752. self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
  753. arr5 = self.RawValue('i', 5)
  754. self.assertFalse(hasattr(arr5, 'get_lock'))
  755. self.assertFalse(hasattr(arr5, 'get_obj'))
  756. class _TestArray(BaseTestCase):
  757. ALLOWED_TYPES = ('processes',)
  758. @classmethod
  759. def f(cls, seq):
  760. for i in range(1, len(seq)):
  761. seq[i] += seq[i-1]
  762. @unittest.skipIf(c_int is None, "requires _ctypes")
  763. def test_array(self, raw=False):
  764. seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
  765. if raw:
  766. arr = self.RawArray('i', seq)
  767. else:
  768. arr = self.Array('i', seq)
  769. self.assertEqual(len(arr), len(seq))
  770. self.assertEqual(arr[3], seq[3])
  771. self.assertEqual(list(arr[2:7]), list(seq[2:7]))
  772. arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
  773. self.assertEqual(list(arr[:]), seq)
  774. self.f(seq)
  775. p = self.Process(target=self.f, args=(arr,))
  776. p.daemon = True
  777. p.start()
  778. p.join()
  779. self.assertEqual(list(arr[:]), seq)
  780. @unittest.skipIf(c_int is None, "requires _ctypes")
  781. def test_array_from_size(self):
  782. size = 10
  783. # Test for zeroing (see issue #11675).
  784. # The repetition below strengthens the test by increasing the chances
  785. # of previously allocated non-zero memory being used for the new array
  786. # on the 2nd and 3rd loops.
  787. for _ in range(3):
  788. arr = self.Array('i', size)
  789. self.assertEqual(len(arr), size)
  790. self.assertEqual(list(arr), [0] * size)
  791. arr[:] = range(10)
  792. self.assertEqual(list(arr), range(10))
  793. del arr
  794. @unittest.skipIf(c_int is None, "requires _ctypes")
  795. def test_rawarray(self):
  796. self.test_array(raw=True)
  797. @unittest.skipIf(c_int is None, "requires _ctypes")
  798. def test_array_accepts_long(self):
  799. arr = self.Array('i', 10L)
  800. self.assertEqual(len(arr), 10)
  801. raw_arr = self.RawArray('i', 10L)
  802. self.assertEqual(len(raw_arr), 10)
  803. @unittest.skipIf(c_int is None, "requires _ctypes")
  804. def test_getobj_getlock_obj(self):
  805. arr1 = self.Array('i', range(10))
  806. lock1 = arr1.get_lock()
  807. obj1 = arr1.get_obj()
  808. arr2 = self.Array('i', range(10), lock=None)
  809. lock2 = arr2.get_lock()
  810. obj2 = arr2.get_obj()
  811. lock = self.Lock()
  812. arr3 = self.Array('i', range(10), lock=lock)
  813. lock3 = arr3.get_lock()
  814. obj3 = arr3.get_obj()
  815. self.assertEqual(lock, lock3)
  816. arr4 = self.Array('i', range(10), lock=False)
  817. self.assertFalse(hasattr(arr4, 'get_lock'))
  818. self.assertFalse(hasattr(arr4, 'get_obj'))
  819. self.assertRaises(AttributeError,
  820. self.Array, 'i', range(10), lock='notalock')
  821. arr5 = self.RawArray('i', range(10))
  822. self.assertFalse(hasattr(arr5, 'get_lock'))
  823. self.assertFalse(hasattr(arr5, 'get_obj'))
  824. #
  825. #
  826. #
  827. class _TestContainers(BaseTestCase):
  828. ALLOWED_TYPES = ('manager',)
  829. def test_list(self):
  830. a = self.list(range(10))
  831. self.assertEqual(a[:], range(10))
  832. b = self.list()
  833. self.assertEqual(b[:], [])
  834. b.extend(range(5))
  835. self.assertEqual(b[:], range(5))
  836. self.assertEqual(b[2], 2)
  837. self.assertEqual(b[2:10], [2,3,4])
  838. b *= 2
  839. self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
  840. self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
  841. self.assertEqual(a[:], range(10))
  842. d = [a, b]
  843. e = self.list(d)
  844. self.assertEqual(
  845. e[:],
  846. [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
  847. )
  848. f = self.list([a])
  849. a.append('hello')
  850. self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
  851. def test_dict(self):
  852. d = self.dict()
  853. indices = range(65, 70)
  854. for i in indices:
  855. d[i] = chr(i)
  856. self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
  857. self.assertEqual(sorted(d.keys()), indices)
  858. self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
  859. self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
  860. def test_namespace(self):
  861. n = self.Namespace()
  862. n.name = 'Bob'
  863. n.job = 'Builder'
  864. n._hidden = 'hidden'
  865. self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
  866. del n.job
  867. self.assertEqual(str(n), "Namespace(name='Bob')")
  868. self.assertTrue(hasattr(n, 'name'))
  869. self.assertTrue(not hasattr(n, 'job'))
  870. #
  871. #
  872. #
  873. def sqr(x, wait=0.0):
  874. time.sleep(wait)
  875. return x*x
  876. class SayWhenError(ValueError): pass
  877. def exception_throwing_generator(total, when):
  878. for i in range(total):
  879. if i == when:
  880. raise SayWhenError("Somebody said when")
  881. yield i
  882. class _TestPool(BaseTestCase):
  883. def test_apply(self):
  884. papply = self.pool.apply
  885. self.assertEqual(papply(sqr, (5,)), sqr(5))
  886. self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
  887. def test_map(self):
  888. pmap = self.pool.map
  889. self.assertEqual(pmap(sqr, range(10)), map(sqr, range(10)))
  890. self.assertEqual(pmap(sqr, range(100), chunksize=20),
  891. map(sqr, range(100)))
  892. def test_map_unplicklable(self):
  893. # Issue #19425 -- failure to pickle should not cause a hang
  894. if self.TYPE == 'threads':
  895. self.skipTest('test not appropriate for {}'.format(self.TYPE))
  896. class A(object):
  897. def __reduce__(self):
  898. raise RuntimeError('cannot pickle')
  899. with self.assertRaises(RuntimeError):
  900. self.pool.map(sqr, [A()]*10)
  901. def test_map_chunksize(self):
  902. try:
  903. self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
  904. except multiprocessing.TimeoutError:
  905. self.fail("pool.map_async with chunksize stalled on null list")
  906. def test_async(self):
  907. res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
  908. get = TimingWrapper(res.get)
  909. self.assertEqual(get(), 49)
  910. self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
  911. def test_async_timeout(self):
  912. res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
  913. get = TimingWrapper(res.get)
  914. self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
  915. self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
  916. def test_imap(self):
  917. it = self.pool.imap(sqr, range(10))
  918. self.assertEqual(list(it), map(sqr, range(10)))
  919. it = self.pool.imap(sqr, range(10))
  920. for i in range(10):
  921. self.assertEqual(it.next(), i*i)
  922. self.assertRaises(StopIteration, it.next)
  923. it = self.pool.imap(sqr, range(1000), chunksize=100)
  924. for i in range(1000):
  925. self.assertEqual(it.next(), i*i)
  926. self.assertRaises(StopIteration, it.next)
  927. def test_imap_handle_iterable_exception(self):
  928. if self.TYPE == 'manager':
  929. self.skipTest('test not appropriate for {}'.format(self.TYPE))
  930. it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
  931. for i in range(3):
  932. self.assertEqual(next(it), i*i)
  933. self.assertRaises(SayWhenError, it.next)
  934. # SayWhenError seen at start of problematic chunk's results
  935. it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
  936. for i in range(6):
  937. self.assertEqual(next(it), i*i)
  938. self.assertRaises(SayWhenError, it.next)
  939. it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
  940. for i in range(4):
  941. self.assertEqual(next(it), i*i)
  942. self.assertRaises(SayWhenError, it.next)
  943. def test_imap_unordered(self):
  944. it = self.pool.imap_unordered(sqr, range(1000))
  945. self.assertEqual(sorted(it), map(sqr, range(1000)))
  946. it = self.pool.imap_unordered(sqr, range(1000), chunksize=53)
  947. self.assertEqual(sorted(it), map(sqr, range(1000)))
  948. def test_imap_unordered_handle_iterable_exception(self):
  949. if self.TYPE == 'manager':
  950. self.skipTest('test not appropriate for {}'.format(self.TYPE))
  951. it = self.pool.imap_unordered(sqr,
  952. exception_throwing_generator(10, 3),
  953. 1)
  954. expected_values = map(sqr, range(10))
  955. with self.assertRaises(SayWhenError):
  956. # imap_unordered makes it difficult to anticipate the SayWhenError
  957. for i in range(10):
  958. value = next(it)
  959. self.assertIn(value, expected_values)
  960. expected_values.remove(value)
  961. it = self.pool.imap_unordered(sqr,
  962. exception_throwing_generator(20, 7),
  963. 2)
  964. expected_values = map(sqr, range(20))
  965. with self.assertRaises(SayWhenError):
  966. for i in range(20):
  967. value = next(it)
  968. self.assertIn(value, expected_values)
  969. expected_values.remove(value)
  970. def test_make_pool(self):
  971. self.assertRaises(ValueError, multiprocessing.Pool, -1)
  972. self.assertRaises(ValueError, multiprocessing.Pool, 0)
  973. p = multiprocessing.Pool(3)
  974. self.assertEqual(3, len(p._pool))
  975. p.close()
  976. p.join()
  977. def test_terminate(self):
  978. p = self.Pool(4)
  979. result = p.map_async(
  980. time.sleep, [0.1 for i in range(10000)], chunksize=1
  981. )
  982. p.terminate()
  983. join = TimingWrapper(p.join)
  984. join()
  985. self.assertTrue(join.elapsed < 0.2)
  986. def test_empty_iterable(self):
  987. # See Issue 12157
  988. p = self.Pool(1)
  989. self.assertEqual(p.map(sqr, []), [])
  990. self.assertEqual(list(p.imap(sqr, [])), [])
  991. self.assertEqual(list(p.imap_unordered(sqr, [])), [])
  992. self.assertEqual(p.map_async(sqr, []).get(), [])
  993. p.close()
  994. p.join()
  995. def unpickleable_result():
  996. return lambda: 42
  997. class _TestPoolWorkerErrors(BaseTestCase):
  998. ALLOWED_TYPES = ('processes', )
  999. def test_unpickleable_result(self):
  1000. from multiprocessing.pool import MaybeEncodingError
  1001. p = multiprocessing.Pool(2)
  1002. # Make sure we don't lose pool processes because of encoding errors.
  1003. for iteration in range(20):
  1004. res = p.apply_async(unpickleable_result)
  1005. self.assertRaises(MaybeEncodingError, res.get)
  1006. p.close()
  1007. p.join()
  1008. class _TestPoolWorkerLifetime(BaseTestCase):
  1009. ALLOWED_TYPES = ('processes', )
  1010. def test_pool_worker_lifetime(self):
  1011. p = multiprocessing.Pool(3, maxtasksperchild=10)
  1012. self.assertEqual(3, len(p._pool))
  1013. origworkerpids = [w.pid for w in p._pool]
  1014. # Run many tasks so each worker gets replaced (hopefully)
  1015. results = []
  1016. for i in range(100):
  1017. results.append(p.apply_async(sqr, (i, )))
  1018. # Fetch the results and verify we got the right answers,
  1019. # also ensuring all the tasks have completed.
  1020. for (j, res) in enumerate(results):
  1021. self.assertEqual(res.get(), sqr(j))
  1022. # Refill the pool
  1023. p._repopulate_pool()
  1024. # Wait until all workers are alive
  1025. # (countdown * DELTA = 5 seconds max startup process time)
  1026. countdown = 50
  1027. while countdown and not all(w.is_alive() for w in p._pool):
  1028. countdown -= 1
  1029. time.sleep(DELTA)
  1030. finalworkerpids = [w.pid for w in p._pool]
  1031. # All pids should be assigned. See issue #7805.
  1032. self.assertNotIn(None, origworkerpids)
  1033. self.assertNotIn(None, finalworkerpids)
  1034. # Finally, check that the worker pids have changed
  1035. self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
  1036. p.close()
  1037. p.join()
  1038. def test_pool_worker_lifetime_early_close(self):
  1039. # Issue #10332: closing a pool whose workers have limited lifetimes
  1040. # before all the tasks completed would make join() hang.
  1041. p = multiprocessing.Pool(3, maxtasksperchild=1)
  1042. results = []
  1043. for i in range(6):
  1044. results.append(p.apply_async(sqr, (i, 0.3)))
  1045. p.close()
  1046. p.join()
  1047. # check the results
  1048. for (j, res) in enumerate(results):
  1049. self.assertEqual(res.get(), sqr(j))
  1050. #
  1051. # Test that manager has expected number of shared objects left
  1052. #
  1053. class _TestZZZNumberOfObjects(BaseTestCase):
  1054. # Because test cases are sorted alphabetically, this one will get
  1055. # run after all the other tests for the manager. It tests that
  1056. # there have been no "reference leaks" for the manager's shared
  1057. # objects. Note the comment in _TestPool.test_terminate().
  1058. ALLOWED_TYPES = ('manager',)
  1059. def test_number_of_objects(self):
  1060. EXPECTED_NUMBER = 1 # the pool object is still alive
  1061. multiprocessing.active_children() # discard dead process objs
  1062. gc.collect() # do garbage collection
  1063. refs = self.manager._number_of_objects()
  1064. debug_info = self.manager._debug_info()
  1065. if refs != EXPECTED_NUMBER:
  1066. print self.manager._debug_info()
  1067. print debug_info
  1068. self.assertEqual(refs, EXPECTED_NUMBER)
  1069. #
  1070. # Test of creating a customized manager class
  1071. #
  1072. from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
  1073. class FooBar(object):
  1074. def f(self):
  1075. return 'f()'
  1076. def g(self):
  1077. raise ValueError
  1078. def _h(self):
  1079. return '_h()'
  1080. def baz():
  1081. for i in xrange(10):
  1082. yield i*i
  1083. class IteratorProxy(BaseProxy):
  1084. _exposed_ = ('next', '__next__')
  1085. def __iter__(self):
  1086. return self
  1087. def next(self):
  1088. return self._callmethod('next')
  1089. def __next__(self):
  1090. return self._callmethod('__next__')
  1091. class MyManager(BaseManager):
  1092. pass
  1093. MyManager.register('Foo', callable=FooBar)
  1094. MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
  1095. MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
  1096. class _TestMyManager(BaseTestCase):
  1097. ALLOWED_TYPES = ('manager',)
  1098. def test_mymanager(self):
  1099. manager = MyManager()
  1100. manager.start()
  1101. foo = manager.Foo()
  1102. bar = manager.Bar()
  1103. baz = manager.baz()
  1104. foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
  1105. bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
  1106. self.assertEqual(foo_methods, ['f', 'g'])
  1107. self.assertEqual(bar_methods, ['f', '_h'])
  1108. self.assertEqual(foo.f(), 'f()')
  1109. self.assertRaises(ValueError, foo.g)
  1110. self.assertEqual(foo._callmethod('f'), 'f()')
  1111. self.assertRaises(RemoteError, foo._callmethod, '_h')
  1112. self.assertEqual(bar.f(), 'f()')
  1113. self.assertEqual(bar._h(), '_h()')
  1114. self.assertEqual(bar._callmethod('f'), 'f()')
  1115. self.assertEqual(bar._callmethod('_h'), '_h()')
  1116. self.assertEqual(list(baz), [i*i for i in range(10)])
  1117. manager.shutdown()
  1118. #
  1119. # Test of connecting to a remote server and using xmlrpclib for serialization
  1120. #
  1121. _queue = Queue.Queue()
  1122. def get_queue():
  1123. return _queue
  1124. class QueueManager(BaseManager):
  1125. '''manager class used by server process'''
  1126. QueueManager.register('get_queue', callable=get_queue)
  1127. class QueueManager2(BaseManager):
  1128. '''manager class which specifies the same interface as QueueManager'''
  1129. QueueManager2.register('get_queue')
  1130. SERIALIZER = 'xmlrpclib'
  1131. class _TestRemoteManager(BaseTestCase):
  1132. ALLOWED_TYPES = ('manager',)
  1133. values = ['hello world', None, True, 2.25,
  1134. #'hall\xc3\xa5 v\xc3\xa4rlden'] # UTF-8
  1135. ]
  1136. result = values[:]
  1137. if test_support.have_unicode:
  1138. #result[-1] = u'hall\xe5 v\xe4rlden'
  1139. uvalue = test_support.u(r'\u043f\u0440\u0438\u0432\u0456\u0442 '
  1140. r'\u0441\u0432\u0456\u0442')
  1141. values.append(uvalue)
  1142. result.append(uvalue)
  1143. @classmethod
  1144. def _putter(cls, address, authkey):
  1145. manager = QueueManager2(
  1146. address=address, authkey=authkey, serializer=SERIALIZER
  1147. )
  1148. manager.connect()
  1149. queue = manager.get_queue()
  1150. # Note that xmlrpclib will deserialize object as a list not a tuple
  1151. queue.put(tuple(cls.values))
  1152. def test_remote(self):
  1153. authkey = os.urandom(32)
  1154. manager = QueueManager(
  1155. address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER
  1156. )
  1157. manager.start()
  1158. p = self.Process(target=self._putter, args=(manager.address, authkey))
  1159. p.daemon = True
  1160. p.start()
  1161. manager2 = QueueManager2(
  1162. address=manager.address, authkey=authkey, serializer=SERIALIZER
  1163. )
  1164. manager2.connect()
  1165. queue = manager2.get_queue()
  1166. self.assertEqual(queue.get(), self.result)
  1167. # Because we are using xmlrpclib for serialization instead of
  1168. # pickle this will cause a serialization error.
  1169. self.assertRaises(Exception, queue.put, time.sleep)
  1170. # Make queue finalizer run before the server is stopped
  1171. del queue
  1172. manager.shutdown()
  1173. class _TestManagerRestart(BaseTestCase):
  1174. @classmethod
  1175. def _putter(cls, address, authkey):
  1176. manager = QueueManager(
  1177. address=address, authkey=authkey, serializer=SERIALIZER)
  1178. manager.connect()
  1179. queue = manager.get_queue()
  1180. queue.put('hello world')
  1181. def test_rapid_restart(self):
  1182. authkey = os.urandom(32)
  1183. manager = QueueManager(
  1184. address=(test.test_support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
  1185. srvr = manager.get_server()
  1186. addr = srvr.address
  1187. # Close the connection.Listener socket which gets opened as a part
  1188. # of manager.get_server(). It's not needed for the test.
  1189. srvr.listener.close()
  1190. manager.start()
  1191. p = self.Process(target=self._putter, args=(manager.address, authkey))
  1192. p.daemon = True
  1193. p.start()
  1194. queue = manager.get_queue()
  1195. self.assertEqual(queue.get(), 'hello world')
  1196. del queue
  1197. manager.shutdown()
  1198. manager = QueueManager(
  1199. address=addr, authkey=authkey, serializer=SERIALIZER)
  1200. manager.start()
  1201. manager.shutdown()
  1202. #
  1203. #
  1204. #
  1205. SENTINEL = latin('')
  1206. class _TestConnection(BaseTestCase):
  1207. ALLOWED_TYPES = ('processes', 'threads')
  1208. @classmethod
  1209. def _echo(cls, conn):
  1210. for msg in iter(conn.recv_bytes, SENTINEL):
  1211. conn.send_bytes(msg)
  1212. conn.close()
  1213. def test_connection(self):
  1214. conn, child_conn = self.Pipe()
  1215. p = self.Process(target=self._echo, args=(child_conn,))
  1216. p.daemon = True
  1217. p.start()
  1218. seq = [1, 2.25, None]
  1219. msg = latin('hello world')
  1220. longmsg = msg * 10
  1221. arr = array.array('i', range(4))
  1222. if self.TYPE == 'processes':
  1223. self.assertEqual(type(conn.fileno()), int)
  1224. self.assertEqual(conn.send(seq), None)
  1225. self.assertEqual(conn.recv(), seq)
  1226. self.assertEqual(conn.send_bytes(msg), None)
  1227. self.assertEqual(conn.recv_bytes(), msg)
  1228. if self.TYPE == 'processes':
  1229. buffer = array.array('i', [0]*10)
  1230. expected = list(arr) + [0] * (10 - len(arr))
  1231. self.assertEqual(conn.send_bytes(arr), None)
  1232. self.assertEqual(conn.recv_bytes_into(buffer),
  1233. len(arr) * buffer.itemsize)
  1234. self.assertEqual(list(buffer), expected)
  1235. buffer = array.array('i', [0]*10)
  1236. expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
  1237. self.assertEqual(conn.send_bytes(arr), None)
  1238. self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
  1239. len(arr) * buffer.itemsize)
  1240. self.assertEqual(list(buffer), expected)
  1241. buffer = bytearray(latin(' ' * 40))
  1242. self.assertEqual(conn.send_bytes(longmsg), None)
  1243. try:
  1244. res = conn.recv_bytes_into(buffer)
  1245. except multiprocessing.BufferTooShort, e:
  1246. self.assertEqual(e.args, (longmsg,))
  1247. else:
  1248. self.fail('expected BufferTooShort, got %s' % res)
  1249. poll = TimingWrapper(conn.poll)
  1250. self.assertEqual(poll(), False)
  1251. self.assertTimingAlmostEqual(poll.elapsed, 0)
  1252. self.assertEqual(poll(TIMEOUT1), False)
  1253. self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
  1254. conn.send(None)
  1255. time.sleep(.1)
  1256. self.assertEqual(poll(TIMEOUT1), True)
  1257. self.assertTimingAlmostEqual(poll.elapsed, 0)
  1258. self.assertEqual(conn.recv(), None)
  1259. really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb
  1260. conn.send_bytes(really_big_msg)
  1261. self.assertEqual(conn.recv_bytes(), really_big_msg)
  1262. conn.send_bytes(SENTINEL) # tell child to quit
  1263. child_conn.close()
  1264. if self.TYPE == 'processes':
  1265. self.assertEqual(conn.readable, True)
  1266. self.assertEqual(conn.writable, True)
  1267. self.assertRaises(EOFError, conn.recv)
  1268. self.assertRaises(EOFError, conn.recv_bytes)
  1269. p.join()
  1270. def test_duplex_false(self):
  1271. reader, writer = self.Pipe(duplex=False)
  1272. self.assertEqual(writer.send(1), None)
  1273. self.assertEqual(reader.recv(), 1)
  1274. if self.TYPE == 'processes':
  1275. self.assertEqual(reader.readable, True)
  1276. self.assertEqual(reader.writable, False)
  1277. self.assertEqual(writer.readable, False)
  1278. self.assertEqual(writer.writable, True)
  1279. self.assertRaises(IOError, reader.send, 2)
  1280. self.assertRaises(IOError, writer.recv)
  1281. self.assertRaises(IOError, writer.poll)
  1282. def test_spawn_close(self):
  1283. # We test that a pipe connection can be closed by parent
  1284. # process immediately after child is spawned. On Windows this
  1285. # would have sometimes failed on old versions because
  1286. # child_conn would be closed before the child got a chance to
  1287. # duplicate it.
  1288. conn, child_conn = self.Pipe()
  1289. p = self.Process(target=self._echo, args=(child_conn,))
  1290. p.daemon = True
  1291. p.start()
  1292. child_conn.close() # this might complete before child initializes
  1293. msg = latin('hello')
  1294. conn.send_bytes(msg)
  1295. self.assertEqual(conn.recv_bytes(), msg)
  1296. conn.send_bytes(SENTINEL)
  1297. conn.close()
  1298. p.join()
  1299. def test_sendbytes(self):
  1300. if self.TYPE != 'processes':
  1301. self.skipTest('test not appropriate for {}'.format(self.TYPE))
  1302. msg = latin('abcdefghijklmnopqrstuvwxyz')
  1303. a, b = self.Pipe()
  1304. a.send_bytes(msg)
  1305. self.assertEqual(b.recv_bytes(), msg)
  1306. a.send_bytes(msg, 5)
  1307. self.assertEqual(b.recv_bytes(), msg[5:])
  1308. a.send_bytes(msg, 7, 8)
  1309. self.assertEqual(b.recv_bytes(), msg[7:7+8])
  1310. a.send_bytes(msg, 26)
  1311. self.assertEqual(b.recv_bytes(), latin(''))
  1312. a.send_bytes(msg, 26, 0)
  1313. self.assertEqual(b.recv_bytes(), latin(''))
  1314. self.assertRaises(ValueError, a.send_bytes, msg, 27)
  1315. self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
  1316. self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
  1317. self.assertRaises(ValueError, a.send_bytes, msg, -1)
  1318. self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
  1319. @classmethod
  1320. def _is_fd_assigned(cls, fd):
  1321. try:
  1322. os.fstat(fd)
  1323. except OSError as e:
  1324. if e.errno == errno.EBADF:
  1325. return False
  1326. raise
  1327. else:
  1328. return True
  1329. @classmethod
  1330. def _writefd(cls, conn, data, create_dummy_fds=False):
  1331. if create_dummy_fds:
  1332. for i in range(0, 256):
  1333. if not cls._is_fd_assigned(i):
  1334. os.dup2(conn.fileno(), i)
  1335. fd = reduction.recv_handle(conn)
  1336. if msvcrt:
  1337. fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
  1338. os.write(fd, data)
  1339. os.close(fd)
  1340. @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
  1341. def test_fd_transfer(self):
  1342. if self.TYPE != 'processes':
  1343. self.skipTest("only makes sense with processes")
  1344. conn, child_conn = self.Pipe(duplex=True)
  1345. p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
  1346. p.daemon = True
  1347. p.start()
  1348. with open(test_support.TESTFN, "wb") as f:
  1349. fd = f.fileno()
  1350. if msvcrt:
  1351. fd = msvcrt.get_osfhandle(fd)
  1352. reduction.send_handle(conn, fd, p.pid)
  1353. p.join()
  1354. with open(test_support.TESTFN, "rb") as f:
  1355. self.assertEqual(f.read(), b"foo")
  1356. @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
  1357. @unittest.skipIf(sys.platform == "win32",
  1358. "test semantics don't make sense on Windows")
  1359. @unittest.skipIf(MAXFD <= 256,
  1360. "largest assignable fd number is too small")
  1361. @unittest.skipUnless(hasattr(os, "dup2"),
  1362. "test needs os.dup2()")
  1363. def test_large_fd_transfer(self):
  1364. # With fd > 256 (issue #11657)
  1365. if self.TYPE != 'processes':
  1366. self.skipTest("only makes sense with processes")
  1367. conn, child_conn = self.Pipe(duplex=True)
  1368. p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
  1369. p.daemon = True
  1370. p.start()
  1371. with open(test_support.TESTFN, "wb") as f:
  1372. fd = f.fileno()
  1373. for newfd in range(256, MAXFD):
  1374. if not self._is_fd_assigned(newfd):
  1375. break
  1376. else:
  1377. self.fail("could not find an unassigned large file descriptor")
  1378. os.dup2(fd, newfd)
  1379. try:
  1380. reduction.send_handle(conn, newfd, p.pid)
  1381. finally:
  1382. os.close(newfd)
  1383. p.join()
  1384. with open(test_support.TESTFN, "rb") as f:
  1385. self.assertEqual(f.read(), b"bar")
  1386. @classmethod
  1387. def _send_data_without_fd(self, conn):
  1388. os.write(conn.fileno(), b"\0")
  1389. @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
  1390. @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
  1391. def test_missing_fd_transfer(self):
  1392. # Check that exception is raised when received data is not
  1393. # accompanied by a file descriptor in ancillary data.
  1394. if self.TYPE != 'processes':
  1395. self.skipTest("only makes sense with processes")
  1396. conn, child_conn = self.Pipe(duplex=True)
  1397. p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
  1398. p.daemon = True
  1399. p.start()
  1400. self.assertRaises(RuntimeError, reduction.recv_handle, conn)
  1401. p.join()
  class _TestListenerClient(BaseTestCase):
      """Send a greeting from a client process/thread to a Listener."""

      ALLOWED_TYPES = ('processes', 'threads')

      @classmethod
      def _test(cls, address):
          # Child side: connect to *address*, send one message, hang up.
          conn = cls.connection.Client(address)
          conn.send('hello')
          conn.close()

      def test_listener_client(self):
          # Exercise every address family the connection module offers.
          for family in self.connection.families:
              l = self.connection.Listener(family=family)
              p = self.Process(target=self._test, args=(l.address,))
              p.daemon = True
              p.start()
              conn = l.accept()
              self.assertEqual(conn.recv(), 'hello')
              # Close the accepted connection explicitly (as
              # test_issue14725 does) instead of leaking one per family.
              conn.close()
              p.join()
              l.close()

      def test_issue14725(self):
          l = self.connection.Listener()
          p = self.Process(target=self._test, args=(l.address,))
          p.daemon = True
          p.start()
          time.sleep(1)
          # On Windows the client process should by now have connected,
          # written data and closed the pipe handle.  This causes
          # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
          # 14725.
          conn = l.accept()
          self.assertEqual(conn.recv(), 'hello')
          conn.close()
          p.join()
          l.close()
  1434. #
  1435. # Test of sending connection and socket objects between processes
  1436. #
  1437. """
  1438. class _TestPicklingConnections(BaseTestCase):
  1439. ALLOWED_TYPES = ('processes',)
  1440. def _listener(self, conn, families):
  1441. for fam in families:
  1442. l = self.connection.Listener(family=fam)
  1443. conn.send(l.address)
  1444. new_conn = l.accept()
  1445. conn.send(new_conn)
  1446. if self.TYPE == 'processes':
  1447. l = socket.socket()
  1448. l.bind(('localhost', 0))
  1449. conn.send(l.getsockname())
  1450. l.listen(1)
  1451. new_conn, addr = l.accept()
  1452. conn.send(new_conn)
  1453. conn.recv()
  1454. def _remote(self, conn):
  1455. for (address, msg) in iter(conn.recv, None):
  1456. client = self.connection.Client(address)
  1457. client.send(msg.upper())
  1458. client.close()
  1459. if self.TYPE == 'processes':
  1460. address, msg = conn.recv()
  1461. client = socket.socket()
  1462. client.connect(address)
  1463. client.sendall(msg.upper())
  1464. client.close()
  1465. conn.close()
  1466. def test_pickling(self):
  1467. try:
  1468. multiprocessing.allow_connection_pickling()
  1469. except ImportError:
  1470. return
  1471. families = self.connection.families
  1472. lconn, lconn0 = self.Pipe()
  1473. lp = self.Process(target=self._listener, args=(lconn0, families))
  1474. lp.daemon = True
  1475. lp.start()
  1476. lconn0.close()
  1477. rconn, rconn0 = self.Pipe()
  1478. rp = self.Process(target=self._remote, args=(rconn0,))
  1479. rp.daemon = True
  1480. rp.start()
  1481. rconn0.close()
  1482. for fam in families:
  1483. msg = ('This connection uses family %s' % fam).encode('ascii')
  1484. address = lconn.recv()
  1485. rconn.send((address, msg))
  1486. new_conn = lconn.recv()
  1487. self.assertEqual(new_conn.recv(), msg.upper())
  1488. rconn.send(None)
  1489. if self.TYPE == 'processes':
  1490. msg = latin('This connection uses a normal socket')
  1491. address = lconn.recv()
  1492. rconn.send((address, msg))
  1493. if hasattr(socket, 'fromfd'):
  1494. new_conn = lconn.recv()
  1495. self.assertEqual(new_conn.recv(100), msg.upper())
  1496. else:
  1497. # XXX On Windows with Py2.6 need to backport fromfd()
  1498. discard = lconn.recv_bytes()
  1499. lconn.send(None)
  1500. rconn.close()
  1501. lconn.close()
  1502. lp.join()
  1503. rp.join()
  1504. """
  1505. #
  1506. #
  1507. #
  class _TestHeap(BaseTestCase):
      """Tests for the shared-memory heap behind BufferWrapper."""

      ALLOWED_TYPES = ('processes',)

      def test_heap(self):
          iterations = 5000
          maxblocks = 50
          live = []

          # Allocate lots of randomly sized blocks; once more than
          # ``maxblocks`` are alive, drop a randomly chosen one.
          for _ in xrange(iterations):
              nbytes = int(random.lognormvariate(0, 1) * 1000)
              block = multiprocessing.heap.BufferWrapper(nbytes)
              live.append(block)
              if len(live) > maxblocks:
                  victim = random.randrange(maxblocks)
                  del live[victim]

          # The heap shared by all BufferWrapper instances.
          heap = multiprocessing.heap.BufferWrapper._heap

          # Collect every free and occupied segment while holding the heap
          # lock; the lock is released again during test cleanup.
          segments = []
          occupied = 0
          heap._lock.acquire()
          self.addCleanup(heap._lock.release)
          for free_list in heap._len_to_seq.values():
              for arena, start, stop in free_list:
                  arena_index = heap._arenas.index(arena)
                  segments.append(
                      (arena_index, start, stop, stop - start, 'free'))
          for arena, start, stop in heap._allocated_blocks:
              arena_index = heap._arenas.index(arena)
              segments.append(
                  (arena_index, start, stop, stop - start, 'occupied'))
              occupied += stop - start

          segments.sort()

          # Consecutive segments must either touch, or the later one must
          # open a different arena at offset 0.
          for current, following in zip(segments, segments[1:]):
              arena, start, stop = current[:3]
              narena, nstart, nstop = following[:3]
              starts_new_arena = (arena != narena and nstart == 0)
              self.assertTrue(starts_new_arena or (stop == nstart))

      def test_free_from_gc(self):
          # Check that freeing of blocks by the garbage collector doesn't
          # deadlock (issue #12352).
          # Make sure the GC is enabled, and lower the collection threshold
          # so collections are more frequent (raising the probability of
          # a deadlock).
          if not gc.isenabled():
              gc.enable()
              self.addCleanup(gc.disable)
          saved_thresholds = gc.get_threshold()
          self.addCleanup(gc.set_threshold, *saved_thresholds)
          gc.set_threshold(10)

          # Allocate many block pairs that reference each other, so that
          # they can only be reclaimed asynchronously by the gc.
          for _ in range(5000):
              first = multiprocessing.heap.BufferWrapper(1)
              second = multiprocessing.heap.BufferWrapper(1)
              # circular references
              first.buddy = second
              second.buddy = first
  1563. #
  1564. #
  1565. #
  class _Foo(Structure):
      """Two-field ctypes structure used by the shared-ctypes tests."""

      # An int field ``x`` followed by a double field ``y``.
      _fields_ = [('x', c_int), ('y', c_double)]
  class _TestSharedCTypes(BaseTestCase):
      """Check that shared ctypes objects can be modified by a child."""

      ALLOWED_TYPES = ('processes',)

      def setUp(self):
          if not HAS_SHAREDCTYPES:
              self.skipTest("requires multiprocessing.sharedctypes")

      @classmethod
      def _double(cls, x, y, foo, arr, string):
          # Child side: double every shared object in place.
          x.value *= 2
          y.value *= 2
          foo.x *= 2
          foo.y *= 2
          string.value *= 2
          for index in range(len(arr)):
              arr[index] *= 2

      def test_sharedctypes(self, lock=False):
          # Build one shared object of each flavour ...
          x = Value('i', 7, lock=lock)
          y = Value(c_double, 1.0/3.0, lock=lock)
          foo = Value(_Foo, 3, 2, lock=lock)
          arr = self.Array('d', range(10), lock=lock)
          string = self.Array('c', 20, lock=lock)
          string.value = latin('hello')

          # ... let a child process double them ...
          worker = self.Process(target=self._double,
                                args=(x, y, foo, arr, string))
          worker.daemon = True
          worker.start()
          worker.join()

          # ... and verify the parent observes the doubled values.
          self.assertEqual(x.value, 14)
          self.assertAlmostEqual(y.value, 2.0/3.0)
          self.assertEqual(foo.x, 6)
          self.assertAlmostEqual(foo.y, 4.0)
          for index in range(10):
              self.assertAlmostEqual(arr[index], index*2)
          self.assertEqual(string.value, latin('hellohello'))

      def test_synchronize(self):
          # Same scenario, but with lock-wrapped shared objects.
          self.test_sharedctypes(lock=True)

      def test_copy(self):
          # A copy must keep its values when the source is changed.
          original = _Foo(2, 5.0)
          duplicate = copy(original)
          original.x = 0
          original.y = 0
          self.assertEqual(duplicate.x, 2)
          self.assertAlmostEqual(duplicate.y, 5.0)
  1612. #
  1613. #
  1614. #
  class _TestFinalize(BaseTestCase):
      """Check the order in which util.Finalize callbacks are run."""

      ALLOWED_TYPES = ('processes',)

      @classmethod
      def _test_finalize(cls, conn):
          # Child side: register finalizers that report their tag over
          # *conn*, then run multiprocessing's exit handling.
          class Foo(object):
              pass

          a = Foo()
          util.Finalize(a, conn.send, args=('a',))
          del a           # triggers callback for a

          b = Foo()
          close_b = util.Finalize(b, conn.send, args=('b',))
          close_b()       # triggers callback for b
          close_b()       # no-op: the callback has already been called
          del b           # no-op: the callback has already been called

          c = Foo()
          util.Finalize(c, conn.send, args=('c',))

          d10 = Foo()
          util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

          d01 = Foo()
          util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
          d02 = Foo()
          util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
          d03 = Foo()
          util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

          util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

          util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

          # call multiprocessing's cleanup function then exit process
          # without garbage collecting locals
          util._exit_function()
          conn.close()
          os._exit(0)

      def test_finalize(self):
          conn, child_conn = self.Pipe()

          child = self.Process(target=self._test_finalize,
                               args=(child_conn,))
          child.daemon = True
          child.start()
          child.join()

          # Read tags until the 'STOP' sentinel sent by the child.
          received = list(iter(conn.recv, 'STOP'))
          self.assertEqual(received,
                           ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
  1654. #
  1655. # Test that from ... import * works for each module
  1656. #
  class _TestImportStar(BaseTestCase):
      """Every name listed in a module's __all__ must really exist."""

      ALLOWED_TYPES = ('processes',)

      def test_import(self):
          module_names = [
              'multiprocessing',
              'multiprocessing.connection',
              'multiprocessing.heap',
              'multiprocessing.managers',
              'multiprocessing.pool',
              'multiprocessing.process',
              'multiprocessing.synchronize',
              'multiprocessing.util',
              ]

          if HAS_REDUCTION:
              module_names.append('multiprocessing.reduction')

          if c_int is not None:
              # This module requires _ctypes
              module_names.append('multiprocessing.sharedctypes')

          for module_name in module_names:
              __import__(module_name)
              module = sys.modules[module_name]
              exported = getattr(module, '__all__', ())

              for attr in exported:
                  message = '%r does not have attribute %r' % (module, attr)
                  self.assertTrue(hasattr(module, attr), message)
  1679. #
  1680. # Quick test that logging works -- does not test logging output
  1681. #
  class _TestLogging(BaseTestCase):
      """Quick checks of the multiprocessing logger (output is not tested)."""

      ALLOWED_TYPES = ('processes',)

      def test_enable_logging(self):
          logger = multiprocessing.get_logger()
          logger.setLevel(util.SUBWARNING)
          self.assertTrue(logger is not None)
          logger.debug('this will not be printed')
          logger.info('nor will this')
          logger.setLevel(LOG_LEVEL)

      @classmethod
      def _test_level(cls, conn):
          # Child side: report the effective level of the mp logger.
          logger = multiprocessing.get_logger()
          conn.send(logger.getEffectiveLevel())

      def test_level(self):
          # A child must see the level set on the multiprocessing logger,
          # or the root logger's level when the former is NOTSET.
          LEVEL1 = 32
          LEVEL2 = 37

          logger = multiprocessing.get_logger()
          root_logger = logging.getLogger()
          root_level = root_logger.level

          reader, writer = multiprocessing.Pipe(duplex=False)

          try:
              logger.setLevel(LEVEL1)
              p = self.Process(target=self._test_level, args=(writer,))
              p.daemon = True
              p.start()
              reported = reader.recv()
              # Reap the child before asserting so that a failure does
              # not leave it behind.
              p.join()
              self.assertEqual(LEVEL1, reported)

              logger.setLevel(logging.NOTSET)
              root_logger.setLevel(LEVEL2)
              p = self.Process(target=self._test_level, args=(writer,))
              p.daemon = True
              p.start()
              reported = reader.recv()
              p.join()
              self.assertEqual(LEVEL2, reported)
          finally:
              # Restore global logging state and release the pipe even when
              # an assertion above fails.
              root_logger.setLevel(root_level)
              logger.setLevel(level=LOG_LEVEL)
              reader.close()
              writer.close()
  1715. # class _TestLoggingProcessName(BaseTestCase):
  1716. #
  1717. # def handle(self, record):
  1718. # assert record.processName == multiprocessing.current_process().name
  1719. # self.__handled = True
  1720. #
  1721. # def test_logging(self):
  1722. # handler = logging.Handler()
  1723. # handler.handle = self.handle
  1724. # self.__handled = False
  1725. # # Bypass getLogger() and side-effects
  1726. # logger = logging.getLoggerClass()(
  1727. # 'multiprocessing.test.TestLoggingProcessName')
  1728. # logger.addHandler(handler)
  1729. # logger.propagate = False
  1730. #
  1731. # logger.warn('foo')
  1732. # assert self.__handled
  1733. #
  1734. # Check that Process.join() retries if os.waitpid() fails with EINTR
  1735. #
  class _TestPollEintr(BaseTestCase):
      """Process.join() must keep waiting when a signal interrupts it."""

      ALLOWED_TYPES = ('processes',)

      @classmethod
      def _killer(cls, pid):
          # Give the parent time to block in join(), then signal it.
          time.sleep(0.5)
          os.kill(pid, signal.SIGUSR1)

      @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
      def test_poll_eintr(self):
          # One-element list so the nested handler can record the signal.
          got_signal = [False]

          def record(*args):
              got_signal[0] = True

          own_pid = os.getpid()
          previous_handler = signal.signal(signal.SIGUSR1, record)
          try:
              killer = self.Process(target=self._killer, args=(own_pid,))
              killer.start()
              sleeper = self.Process(target=time.sleep, args=(1,))
              sleeper.start()
              # The killer's signal arrives while this join() is waiting.
              sleeper.join()
              self.assertTrue(got_signal[0])
              self.assertEqual(sleeper.exitcode, 0)
              killer.join()
          finally:
              signal.signal(signal.SIGUSR1, previous_handler)
  1760. #
  1761. # Test to verify handle verification, see issue 3321
  1762. #
  class TestInvalidHandle(unittest.TestCase):
      """Connection objects must reject bogus handles (issue 3321)."""

      @unittest.skipIf(WIN32, "skipped on Windows")
      def test_invalid_handles(self):
          # A Connection built on an arbitrary handle number can be
          # created, but polling it must fail with IOError.
          bogus = _multiprocessing.Connection(44977608)
          with self.assertRaises(IOError):
              bogus.poll()
          # A negative handle must be rejected by the constructor itself.
          with self.assertRaises(IOError):
              _multiprocessing.Connection(-1)
  1769. #
  1770. # Functions used to create test cases from the base ones in this module
  1771. #
  def get_attributes(Source, names):
      """Return a dict mapping each of *names* to that attribute of *Source*.

      Attributes that are plain Python functions are wrapped in
      staticmethod(); every other attribute is stored unchanged.
      """
      function_type = type(get_attributes)
      collected = {}
      for name in names:
          value = getattr(Source, name)
          if type(value) == function_type:
              collected[name] = staticmethod(value)
          else:
              collected[name] = value
      return collected
  def create_test_cases(Mixin, type):
      """Build concrete TestCase classes for one flavour of multiprocessing.

      Every module global whose name starts with '_Test' and whose
      ALLOWED_TYPES contains *type* is combined with unittest.TestCase and
      *Mixin* into a new class named 'With<Type><name without the leading
      underscore>'.  Returns a dict mapping the new names to the classes.
      """
      result = {}
      prefix = 'With' + type.capitalize()

      for name, base in globals().items():
          if not name.startswith('_Test'):
              continue
          if type not in base.ALLOWED_TYPES:
              continue

          class Temp(base, unittest.TestCase, Mixin):
              pass

          newname = prefix + name[1:]
          result[newname] = Temp
          Temp.__name__ = newname
          # Report the class as living in the mixin's module.
          Temp.__module__ = Mixin.__module__

      return result
  1795. #
  1796. # Create test cases
  1797. #
  class ProcessesMixin(object):
      # Mixin binding the tests to real processes: the names listed below
      # are copied from the multiprocessing package into the class
      # namespace (get_attributes wraps plain functions in staticmethod).
      TYPE = 'processes'
      Process = multiprocessing.Process
      locals().update(get_attributes(multiprocessing, (
          'Queue', 'Lock', 'RLock', 'Semaphore',
          'BoundedSemaphore', 'Condition', 'Event',
          'Value', 'Array', 'RawValue', 'RawArray',
          'current_process', 'active_children',
          'Pipe', 'connection', 'JoinableQueue', 'Pool'
          )))

  # Generate the 'WithProcesses...' test classes and publish them as module
  # globals.
  testcases_processes = create_test_cases(ProcessesMixin, type='processes')
  globals().update(testcases_processes)
  class ManagerMixin(object):
      # Mixin binding the tests to manager proxies.  The SyncManager is
      # allocated with object.__new__() only; test_main() calls its
      # __init__() and start() before the tests run.
      TYPE = 'manager'
      Process = multiprocessing.Process
      manager = object.__new__(multiprocessing.managers.SyncManager)
      locals().update(get_attributes(manager, (
          'Queue', 'Lock', 'RLock', 'Semaphore',
          'BoundedSemaphore', 'Condition', 'Event',
          'Value', 'Array', 'list', 'dict',
          'Namespace', 'JoinableQueue', 'Pool'
          )))

  # Generate the 'WithManager...' test classes and publish them as module
  # globals.
  testcases_manager = create_test_cases(ManagerMixin, type='manager')
  globals().update(testcases_manager)
  class ThreadsMixin(object):
      # Mixin binding the tests to multiprocessing.dummy: the names listed
      # below are copied from that module into the class namespace.
      TYPE = 'threads'
      Process = multiprocessing.dummy.Process
      locals().update(get_attributes(multiprocessing.dummy, (
          'Queue', 'Lock', 'RLock', 'Semaphore',
          'BoundedSemaphore', 'Condition', 'Event',
          'Value', 'Array', 'current_process',
          'active_children', 'Pipe', 'connection',
          'dict', 'list', 'Namespace', 'JoinableQueue', 'Pool'
          )))

  # Generate the 'WithThreads...' test classes and publish them as module
  # globals.
  testcases_threads = create_test_cases(ThreadsMixin, type='threads')
  globals().update(testcases_threads)
  class OtherTest(unittest.TestCase):
      # TODO: add more tests for deliver/answer challenge.

      def test_deliver_challenge_auth_failure(self):
          # A peer that answers the challenge with junk must be rejected.
          class _FakeConnection(object):
              def recv_bytes(self, size):
                  return b'something bogus'

              def send_bytes(self, data):
                  pass

          with self.assertRaises(multiprocessing.AuthenticationError):
              multiprocessing.connection.deliver_challenge(
                  _FakeConnection(), b'abc')

      def test_answer_challenge_auth_failure(self):
          # The fake peer first sends the challenge marker, then junk in
          # place of the expected reply, then empty strings.
          class _FakeConnection(object):
              def __init__(self):
                  self.count = 0

              def recv_bytes(self, size):
                  self.count += 1
                  if self.count == 1:
                      return multiprocessing.connection.CHALLENGE
                  if self.count == 2:
                      return b'something bogus'
                  return b''

              def send_bytes(self, data):
                  pass

          with self.assertRaises(multiprocessing.AuthenticationError):
              multiprocessing.connection.answer_challenge(
                  _FakeConnection(), b'abc')
  1858. #
  1859. # Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
  1860. #
  def initializer(ns):
      """Increment ``ns.test`` by one.

      Passed as the initializer to SyncManager.start() and to Pool() by
      the TestInitializers tests.
      """
      ns.test += 1
  class TestInitializers(unittest.TestCase):
      """Manager.start()/Pool() must accept and run an initializer."""

      def setUp(self):
          # Shared namespace whose ``test`` counter the initializer bumps.
          self.mgr = multiprocessing.Manager()
          self.ns = self.mgr.Namespace()
          self.ns.test = 0

      def tearDown(self):
          self.mgr.shutdown()

      def test_manager_initializer(self):
          m = multiprocessing.managers.SyncManager()
          # A non-callable initializer is rejected ...
          self.assertRaises(TypeError, m.start, 1)
          # ... and a callable one is run when the manager starts.
          m.start(initializer, (self.ns,))
          try:
              self.assertEqual(self.ns.test, 1)
          finally:
              # Shut the extra manager down even if the assertion fails,
              # so its server process is not left running.
              m.shutdown()

      def test_pool_initializer(self):
          self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
          p = multiprocessing.Pool(1, initializer, (self.ns,))
          p.close()
          p.join()
          self.assertEqual(self.ns.test, 1)
  1882. #
  1883. # Issue 5155, 5313, 5331: Test process in processes
  1884. # Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
  1885. #
  def _this_sub_process(q):
      """Attempt one non-blocking get on *q*, ignoring an empty queue."""
      try:
          q.get(block=False)
      except Queue.Empty:
          # Nothing queued; that is fine, only the attempt matters.
          pass
  def _test_process(q):
      """Start a daemonic child running _this_sub_process and wait for it.

      The *q* argument is accepted but not used; the child is handed a
      freshly created queue instead.
      """
      inner_queue = multiprocessing.Queue()
      child = multiprocessing.Process(target=_this_sub_process,
                                      args=(inner_queue,))
      child.daemon = True
      child.start()
      child.join()
  def _afunc(x):
      """Return *x* multiplied by itself."""
      squared = x * x
      return squared
  def pool_in_process():
      """Create a four-worker pool, map _afunc over seven ints, tear down.

      Run inside a child process by
      TestStdinBadfiledescriptor.test_pool_in_process.
      """
      pool = multiprocessing.Pool(processes=4)
      try:
          pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
      finally:
          # Shut the workers down explicitly rather than leaving them to
          # be cleaned up when the process exits.
          pool.close()
          pool.join()
  class _file_like(object):
      """Buffering writer that keeps a separate buffer per process id.

      write() appends to the buffer; flush() writes the joined buffer to
      the delegate and starts a new, empty buffer.
      """

      def __init__(self, delegate):
          self._delegate = delegate
          self._pid = None

      @property
      def cache(self):
          # A pid different from the remembered one means this is the
          # first access in this process: start over with an empty buffer.
          # There are no race conditions since fork keeps only the running
          # thread.
          current_pid = os.getpid()
          if current_pid != self._pid:
              self._pid = current_pid
              self._cache = []
          return self._cache

      def write(self, data):
          buffered = self.cache
          buffered.append(data)

      def flush(self):
          pending = ''.join(self.cache)
          self._delegate.write(pending)
          self._cache = []
  class TestStdinBadfiledescriptor(unittest.TestCase):
      """Processes and pools started from within a child process
      (issues 5155, 5313, 5331)."""

      def test_queue_in_process(self):
          queue = multiprocessing.Queue()
          proc = multiprocessing.Process(target=_test_process, args=(queue,))
          proc.start()
          proc.join()

      def test_pool_in_process(self):
          p = multiprocessing.Process(target=pool_in_process)
          p.start()
          p.join()

      def test_flushing(self):
          sio = StringIO()
          flike = _file_like(sio)
          flike.write('foo')
          # NOTE(review): this process is constructed but never started,
          # so the flush below runs in the current process only.
          proc = multiprocessing.Process(target=lambda: flike.flush())
          flike.flush()
          # Use a unittest assertion: a bare ``assert`` is stripped under
          # -O and gives no useful message on failure.
          self.assertEqual(sio.getvalue(), 'foo')
  1936. #
  1937. # Test interaction with socket timeouts - see Issue #6056
  1938. #
  class TestTimeouts(unittest.TestCase):
      """Connections must work while a default socket timeout is set
      (Issue #6056)."""

      @classmethod
      def _test_timeout(cls, child, address):
          # Child side: wait longer than the parent's socket timeout,
          # answer over the pipe, then connect to the listener.
          time.sleep(1)
          child.send(123)
          child.close()
          client = multiprocessing.connection.Client(address)
          client.send(456)
          client.close()

      def test_timeout(self):
          saved_timeout = socket.getdefaulttimeout()
          try:
              socket.setdefaulttimeout(0.1)
              parent, child = multiprocessing.Pipe(duplex=True)
              listener = multiprocessing.connection.Listener(
                  family='AF_INET')
              worker = multiprocessing.Process(
                  target=self._test_timeout,
                  args=(child, listener.address))
              worker.start()
              child.close()

              self.assertEqual(parent.recv(), 123)
              parent.close()

              accepted = listener.accept()
              self.assertEqual(accepted.recv(), 456)
              accepted.close()
              listener.close()
              worker.join(10)
          finally:
              socket.setdefaulttimeout(saved_timeout)
  1967. #
  1968. # Test what happens with no "if __name__ == '__main__'"
  1969. #
  class TestNoForkBomb(unittest.TestCase):
      """Run a script that lacks the ``if __name__ == '__main__'`` guard."""

      def test_noforkbomb(self):
          script_dir = os.path.dirname(__file__)
          name = os.path.join(script_dir, 'mp_fork_bomb.py')
          if not WIN32:
              # Elsewhere the script is expected to succeed and print 123.
              rc, out, err = test.script_helper.assert_python_ok(name)
              self.assertEqual(out.rstrip(), '123')
              self.assertEqual(err, '')
          else:
              # On Windows the script is expected to fail with a
              # RuntimeError and print nothing.
              rc, out, err = test.script_helper.assert_python_failure(name)
              self.assertEqual(out, '')
              self.assertIn('RuntimeError', err)
  1981. #
  1982. # Issue 12098: check sys.flags of child matches that for parent
  1983. #
  class TestFlags(unittest.TestCase):
      """sys.flags of a child process must match its parent's
      (Issue 12098)."""

      @classmethod
      def run_in_grandchild(cls, conn):
          conn.send(tuple(sys.flags))

      @classmethod
      def run_in_child(cls):
          # Print a JSON pair: (this process's flags, a child's flags).
          import json
          reader, writer = multiprocessing.Pipe(duplex=False)
          grandchild = multiprocessing.Process(target=cls.run_in_grandchild,
                                               args=(writer,))
          grandchild.start()
          grandchild_flags = reader.recv()
          grandchild.join()
          reader.close()
          writer.close()
          own_flags = tuple(sys.flags)
          print(json.dumps((own_flags, grandchild_flags)))

      @test_support.requires_unicode  # XXX json needs unicode support
      def test_flags(self):
          import json
          import subprocess
          # start child process using unusual flags
          prog = ('from test.test_multiprocessing import TestFlags; '
                  'TestFlags.run_in_child()')
          command = [sys.executable, '-E', '-B', '-O', '-c', prog]
          data = subprocess.check_output(command)
          child_flags, grandchild_flags = json.loads(data.decode('ascii'))
          self.assertEqual(child_flags, grandchild_flags)
  2010. #
  2011. # Issue #17555: ForkAwareThreadLock
  2012. #
  class TestForkAwareThreadLock(unittest.TestCase):
      # We recursively start processes.  Issue #17555 meant that the
      # after fork registry would get duplicate entries for the same
      # lock.  The size of the registry at generation n was ~2**n.

      @classmethod
      def child(cls, n, conn):
          # Recurse through n generations; only the last one reports the
          # size of its after-fork registry.
          if n <= 1:
              conn.send(len(util._afterfork_registry))
          else:
              descendant = multiprocessing.Process(target=cls.child,
                                                   args=(n-1, conn))
              descendant.start()
              descendant.join()
          conn.close()

      def test_lock(self):
          reader, writer = multiprocessing.Pipe(False)
          l = util.ForkAwareThreadLock()
          size_before = len(util._afterfork_registry)
          first = multiprocessing.Process(target=self.child,
                                          args=(5, writer))
          first.start()
          size_after = reader.recv()
          first.join()
          # The registry must not have grown across the generations.
          self.assertLessEqual(size_after, size_before)
  2035. #
  2036. # Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
  2037. #
  class TestIgnoreEINTR(unittest.TestCase):
      """recv(), send(), accept() etc. must ignore EINTR (Issue #17097)."""

      @classmethod
      def _test_ignore(cls, conn):
          # Child side: install a do-nothing SIGUSR1 handler, then echo
          # one object and send a large byte string.
          def handler(signum, frame):
              pass

          signal.signal(signal.SIGUSR1, handler)
          conn.send('ready')
          echoed = conn.recv()
          conn.send(echoed)
          conn.send_bytes(b'x'*(1024*1024))   # sending 1 MB should block

      @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
      def test_ignore(self):
          conn, child_conn = multiprocessing.Pipe()
          try:
              child = multiprocessing.Process(target=self._test_ignore,
                                              args=(child_conn,))
              child.daemon = True
              child.start()
              child_conn.close()
              self.assertEqual(conn.recv(), 'ready')

              # First signal: sent before the parent sends 1234.
              time.sleep(0.1)
              os.kill(child.pid, signal.SIGUSR1)
              time.sleep(0.1)
              conn.send(1234)
              self.assertEqual(conn.recv(), 1234)

              # Second signal: sent before the parent reads the 1 MB
              # payload.
              time.sleep(0.1)
              os.kill(child.pid, signal.SIGUSR1)
              self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
              time.sleep(0.1)
              child.join()
          finally:
              conn.close()

      @classmethod
      def _test_ignore_listener(cls, conn):
          # Child side: report the listener address, then wait in
          # accept() and greet whoever connects.
          def handler(signum, frame):
              pass

          signal.signal(signal.SIGUSR1, handler)
          listener = multiprocessing.connection.Listener()
          conn.send(listener.address)
          accepted = listener.accept()
          accepted.send('welcome')

      @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
      def test_ignore_listener(self):
          conn, child_conn = multiprocessing.Pipe()
          try:
              child = multiprocessing.Process(
                  target=self._test_ignore_listener, args=(child_conn,))
              child.daemon = True
              child.start()
              child_conn.close()
              address = conn.recv()

              # Signal the child before connecting to its listener.
              time.sleep(0.1)
              os.kill(child.pid, signal.SIGUSR1)
              time.sleep(0.1)
              client = multiprocessing.connection.Client(address)
              self.assertEqual(client.recv(), 'welcome')
              child.join()
          finally:
              conn.close()
  2097. #
  2098. #
  2099. #
  # Stand-alone test classes that are not generated by create_test_cases().
  testcases_other = [
      OtherTest,
      TestInvalidHandle,
      TestInitializers,
      TestStdinBadfiledescriptor,
      TestTimeouts,
      TestNoForkBomb,
      TestFlags,
      TestForkAwareThreadLock,
      TestIgnoreEINTR,
      ]
  2103. #
  2104. #
  2105. #
  def test_main(run=None):
      """Set up shared pools and the manager, run every test, tear down.

      run -- callable that is passed the assembled unittest.TestSuite;
             defaults to test.test_support.run_unittest.

      Raises unittest.SkipTest on Linux when an RLock cannot be created.
      """
      if sys.platform.startswith("linux"):
          try:
              lock = multiprocessing.RLock()
          except OSError:
              raise unittest.SkipTest("OSError raises on RLock creation, see issue 3111!")

      check_enough_semaphores()

      if run is None:
          from test.test_support import run_unittest as run

      util.get_temp_dir()     # creates temp directory for use by all processes

      multiprocessing.get_logger().setLevel(LOG_LEVEL)

      # Resources shared by all generated test classes.
      ProcessesMixin.pool = multiprocessing.Pool(4)
      ThreadsMixin.pool = multiprocessing.dummy.Pool(4)
      ManagerMixin.manager.__init__()
      ManagerMixin.manager.start()
      ManagerMixin.pool = ManagerMixin.manager.Pool(4)

      try:
          testcases = (
              sorted(testcases_processes.values(), key=lambda tc:tc.__name__) +
              sorted(testcases_threads.values(), key=lambda tc:tc.__name__) +
              sorted(testcases_manager.values(), key=lambda tc:tc.__name__) +
              testcases_other
              )

          loadTestsFromTestCase = unittest.defaultTestLoader.loadTestsFromTestCase
          suite = unittest.TestSuite(loadTestsFromTestCase(tc) for tc in testcases)
          # (ncoghlan): Whether or not sys.exc_clear is executed by the
          # threading module during these tests is at least platform
          # dependent and possibly non-deterministic on any given platform.
          # So we don't mind if the listed warnings aren't actually raised.
          with test_support.check_py3k_warnings(
                  (".+__(get|set)slice__ has been removed", DeprecationWarning),
                  (r"sys.exc_clear\(\) not supported", DeprecationWarning),
                  quiet=True):
              run(suite)
      finally:
          # Tear the shared resources down even if the run raised, so that
          # no worker or manager processes are left behind.
          ThreadsMixin.pool.terminate()
          ProcessesMixin.pool.terminate()
          ManagerMixin.pool.terminate()
          ManagerMixin.manager.shutdown()

          del ProcessesMixin.pool, ThreadsMixin.pool, ManagerMixin.pool
  def main():
      """Run the whole suite through a verbose TextTestRunner."""
      runner = unittest.TextTestRunner(verbosity=2)
      test_main(runner.run)

  # Run the suite when this file is executed as a script.
  if __name__ == '__main__':
      main()