test_subprocess.py 58 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443
  1. import unittest
  2. from test import test_support
  3. import subprocess
  4. import sys
  5. import signal
  6. import os
  7. import errno
  8. import tempfile
  9. import time
  10. import re
  11. import sysconfig
  12. try:
  13. import resource
  14. except ImportError:
  15. resource = None
  16. try:
  17. import threading
  18. except ImportError:
  19. threading = None
  20. mswindows = (sys.platform == "win32")
  21. #
  22. # Depends on the following external programs: Python
  23. #
  24. if mswindows:
  25. SETBINARY = ('import msvcrt; msvcrt.setmode(sys.stdout.fileno(), '
  26. 'os.O_BINARY);')
  27. else:
  28. SETBINARY = ''
  29. class BaseTestCase(unittest.TestCase):
  30. def setUp(self):
  31. # Try to minimize the number of children we have so this test
  32. # doesn't crash on some buildbots (Alphas in particular).
  33. test_support.reap_children()
  34. def tearDown(self):
  35. for inst in subprocess._active:
  36. inst.wait()
  37. subprocess._cleanup()
  38. self.assertFalse(subprocess._active, "subprocess._active not empty")
  39. def assertStderrEqual(self, stderr, expected, msg=None):
  40. # In a debug build, stuff like "[6580 refs]" is printed to stderr at
  41. # shutdown time. That frustrates tests trying to check stderr produced
  42. # from a spawned Python process.
  43. actual = re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr)
  44. self.assertEqual(actual, expected, msg)
  45. class PopenTestException(Exception):
  46. pass
  47. class PopenExecuteChildRaises(subprocess.Popen):
  48. """Popen subclass for testing cleanup of subprocess.PIPE filehandles when
  49. _execute_child fails.
  50. """
  51. def _execute_child(self, *args, **kwargs):
  52. raise PopenTestException("Forced Exception for Test")
  53. class ProcessTestCase(BaseTestCase):
  54. def test_call_seq(self):
  55. # call() function with sequence argument
  56. rc = subprocess.call([sys.executable, "-c",
  57. "import sys; sys.exit(47)"])
  58. self.assertEqual(rc, 47)
  59. def test_check_call_zero(self):
  60. # check_call() function with zero return code
  61. rc = subprocess.check_call([sys.executable, "-c",
  62. "import sys; sys.exit(0)"])
  63. self.assertEqual(rc, 0)
  64. def test_check_call_nonzero(self):
  65. # check_call() function with non-zero return code
  66. with self.assertRaises(subprocess.CalledProcessError) as c:
  67. subprocess.check_call([sys.executable, "-c",
  68. "import sys; sys.exit(47)"])
  69. self.assertEqual(c.exception.returncode, 47)
  70. def test_check_output(self):
  71. # check_output() function with zero return code
  72. output = subprocess.check_output(
  73. [sys.executable, "-c", "print 'BDFL'"])
  74. self.assertIn('BDFL', output)
  75. def test_check_output_nonzero(self):
  76. # check_call() function with non-zero return code
  77. with self.assertRaises(subprocess.CalledProcessError) as c:
  78. subprocess.check_output(
  79. [sys.executable, "-c", "import sys; sys.exit(5)"])
  80. self.assertEqual(c.exception.returncode, 5)
  81. def test_check_output_stderr(self):
  82. # check_output() function stderr redirected to stdout
  83. output = subprocess.check_output(
  84. [sys.executable, "-c", "import sys; sys.stderr.write('BDFL')"],
  85. stderr=subprocess.STDOUT)
  86. self.assertIn('BDFL', output)
  87. def test_check_output_stdout_arg(self):
  88. # check_output() function stderr redirected to stdout
  89. with self.assertRaises(ValueError) as c:
  90. output = subprocess.check_output(
  91. [sys.executable, "-c", "print 'will not be run'"],
  92. stdout=sys.stdout)
  93. self.fail("Expected ValueError when stdout arg supplied.")
  94. self.assertIn('stdout', c.exception.args[0])
  95. def test_call_kwargs(self):
  96. # call() function with keyword args
  97. newenv = os.environ.copy()
  98. newenv["FRUIT"] = "banana"
  99. rc = subprocess.call([sys.executable, "-c",
  100. 'import sys, os;'
  101. 'sys.exit(os.getenv("FRUIT")=="banana")'],
  102. env=newenv)
  103. self.assertEqual(rc, 1)
  104. def test_invalid_args(self):
  105. # Popen() called with invalid arguments should raise TypeError
  106. # but Popen.__del__ should not complain (issue #12085)
  107. with test_support.captured_stderr() as s:
  108. self.assertRaises(TypeError, subprocess.Popen, invalid_arg_name=1)
  109. argcount = subprocess.Popen.__init__.__code__.co_argcount
  110. too_many_args = [0] * (argcount + 1)
  111. self.assertRaises(TypeError, subprocess.Popen, *too_many_args)
  112. self.assertEqual(s.getvalue(), '')
  113. def test_stdin_none(self):
  114. # .stdin is None when not redirected
  115. p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
  116. stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  117. self.addCleanup(p.stdout.close)
  118. self.addCleanup(p.stderr.close)
  119. p.wait()
  120. self.assertEqual(p.stdin, None)
  121. def test_stdout_none(self):
  122. # .stdout is None when not redirected, and the child's stdout will
  123. # be inherited from the parent. In order to test this we run a
  124. # subprocess in a subprocess:
  125. # this_test
  126. # \-- subprocess created by this test (parent)
  127. # \-- subprocess created by the parent subprocess (child)
  128. # The parent doesn't specify stdout, so the child will use the
  129. # parent's stdout. This test checks that the message printed by the
  130. # child goes to the parent stdout. The parent also checks that the
  131. # child's stdout is None. See #11963.
  132. code = ('import sys; from subprocess import Popen, PIPE;'
  133. 'p = Popen([sys.executable, "-c", "print \'test_stdout_none\'"],'
  134. ' stdin=PIPE, stderr=PIPE);'
  135. 'p.wait(); assert p.stdout is None;')
  136. p = subprocess.Popen([sys.executable, "-c", code],
  137. stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  138. self.addCleanup(p.stdout.close)
  139. self.addCleanup(p.stderr.close)
  140. out, err = p.communicate()
  141. self.assertEqual(p.returncode, 0, err)
  142. self.assertEqual(out.rstrip(), 'test_stdout_none')
  143. def test_stderr_none(self):
  144. # .stderr is None when not redirected
  145. p = subprocess.Popen([sys.executable, "-c", 'print "banana"'],
  146. stdin=subprocess.PIPE, stdout=subprocess.PIPE)
  147. self.addCleanup(p.stdout.close)
  148. self.addCleanup(p.stdin.close)
  149. p.wait()
  150. self.assertEqual(p.stderr, None)
  151. def test_executable_with_cwd(self):
  152. python_dir = os.path.dirname(os.path.realpath(sys.executable))
  153. p = subprocess.Popen(["somethingyoudonthave", "-c",
  154. "import sys; sys.exit(47)"],
  155. executable=sys.executable, cwd=python_dir)
  156. p.wait()
  157. self.assertEqual(p.returncode, 47)
  158. @unittest.skipIf(sysconfig.is_python_build(),
  159. "need an installed Python. See #7774")
  160. def test_executable_without_cwd(self):
  161. # For a normal installation, it should work without 'cwd'
  162. # argument. For test runs in the build directory, see #7774.
  163. p = subprocess.Popen(["somethingyoudonthave", "-c",
  164. "import sys; sys.exit(47)"],
  165. executable=sys.executable)
  166. p.wait()
  167. self.assertEqual(p.returncode, 47)
  168. def test_stdin_pipe(self):
  169. # stdin redirection
  170. p = subprocess.Popen([sys.executable, "-c",
  171. 'import sys; sys.exit(sys.stdin.read() == "pear")'],
  172. stdin=subprocess.PIPE)
  173. p.stdin.write("pear")
  174. p.stdin.close()
  175. p.wait()
  176. self.assertEqual(p.returncode, 1)
  177. def test_stdin_filedes(self):
  178. # stdin is set to open file descriptor
  179. tf = tempfile.TemporaryFile()
  180. d = tf.fileno()
  181. os.write(d, "pear")
  182. os.lseek(d, 0, 0)
  183. p = subprocess.Popen([sys.executable, "-c",
  184. 'import sys; sys.exit(sys.stdin.read() == "pear")'],
  185. stdin=d)
  186. p.wait()
  187. self.assertEqual(p.returncode, 1)
  188. def test_stdin_fileobj(self):
  189. # stdin is set to open file object
  190. tf = tempfile.TemporaryFile()
  191. tf.write("pear")
  192. tf.seek(0)
  193. p = subprocess.Popen([sys.executable, "-c",
  194. 'import sys; sys.exit(sys.stdin.read() == "pear")'],
  195. stdin=tf)
  196. p.wait()
  197. self.assertEqual(p.returncode, 1)
  198. def test_stdout_pipe(self):
  199. # stdout redirection
  200. p = subprocess.Popen([sys.executable, "-c",
  201. 'import sys; sys.stdout.write("orange")'],
  202. stdout=subprocess.PIPE)
  203. self.addCleanup(p.stdout.close)
  204. self.assertEqual(p.stdout.read(), "orange")
  205. def test_stdout_filedes(self):
  206. # stdout is set to open file descriptor
  207. tf = tempfile.TemporaryFile()
  208. d = tf.fileno()
  209. p = subprocess.Popen([sys.executable, "-c",
  210. 'import sys; sys.stdout.write("orange")'],
  211. stdout=d)
  212. p.wait()
  213. os.lseek(d, 0, 0)
  214. self.assertEqual(os.read(d, 1024), "orange")
  215. def test_stdout_fileobj(self):
  216. # stdout is set to open file object
  217. tf = tempfile.TemporaryFile()
  218. p = subprocess.Popen([sys.executable, "-c",
  219. 'import sys; sys.stdout.write("orange")'],
  220. stdout=tf)
  221. p.wait()
  222. tf.seek(0)
  223. self.assertEqual(tf.read(), "orange")
  224. def test_stderr_pipe(self):
  225. # stderr redirection
  226. p = subprocess.Popen([sys.executable, "-c",
  227. 'import sys; sys.stderr.write("strawberry")'],
  228. stderr=subprocess.PIPE)
  229. self.addCleanup(p.stderr.close)
  230. self.assertStderrEqual(p.stderr.read(), "strawberry")
  231. def test_stderr_filedes(self):
  232. # stderr is set to open file descriptor
  233. tf = tempfile.TemporaryFile()
  234. d = tf.fileno()
  235. p = subprocess.Popen([sys.executable, "-c",
  236. 'import sys; sys.stderr.write("strawberry")'],
  237. stderr=d)
  238. p.wait()
  239. os.lseek(d, 0, 0)
  240. self.assertStderrEqual(os.read(d, 1024), "strawberry")
  241. def test_stderr_fileobj(self):
  242. # stderr is set to open file object
  243. tf = tempfile.TemporaryFile()
  244. p = subprocess.Popen([sys.executable, "-c",
  245. 'import sys; sys.stderr.write("strawberry")'],
  246. stderr=tf)
  247. p.wait()
  248. tf.seek(0)
  249. self.assertStderrEqual(tf.read(), "strawberry")
  250. def test_stderr_redirect_with_no_stdout_redirect(self):
  251. # test stderr=STDOUT while stdout=None (not set)
  252. # - grandchild prints to stderr
  253. # - child redirects grandchild's stderr to its stdout
  254. # - the parent should get grandchild's stderr in child's stdout
  255. p = subprocess.Popen([sys.executable, "-c",
  256. 'import sys, subprocess;'
  257. 'rc = subprocess.call([sys.executable, "-c",'
  258. ' "import sys;"'
  259. ' "sys.stderr.write(\'42\')"],'
  260. ' stderr=subprocess.STDOUT);'
  261. 'sys.exit(rc)'],
  262. stdout=subprocess.PIPE,
  263. stderr=subprocess.PIPE)
  264. stdout, stderr = p.communicate()
  265. #NOTE: stdout should get stderr from grandchild
  266. self.assertStderrEqual(stdout, b'42')
  267. self.assertStderrEqual(stderr, b'') # should be empty
  268. self.assertEqual(p.returncode, 0)
  269. def test_stdout_stderr_pipe(self):
  270. # capture stdout and stderr to the same pipe
  271. p = subprocess.Popen([sys.executable, "-c",
  272. 'import sys;'
  273. 'sys.stdout.write("apple");'
  274. 'sys.stdout.flush();'
  275. 'sys.stderr.write("orange")'],
  276. stdout=subprocess.PIPE,
  277. stderr=subprocess.STDOUT)
  278. self.addCleanup(p.stdout.close)
  279. self.assertStderrEqual(p.stdout.read(), "appleorange")
  280. def test_stdout_stderr_file(self):
  281. # capture stdout and stderr to the same open file
  282. tf = tempfile.TemporaryFile()
  283. p = subprocess.Popen([sys.executable, "-c",
  284. 'import sys;'
  285. 'sys.stdout.write("apple");'
  286. 'sys.stdout.flush();'
  287. 'sys.stderr.write("orange")'],
  288. stdout=tf,
  289. stderr=tf)
  290. p.wait()
  291. tf.seek(0)
  292. self.assertStderrEqual(tf.read(), "appleorange")
  293. def test_stdout_filedes_of_stdout(self):
  294. # stdout is set to 1 (#1531862).
  295. # To avoid printing the text on stdout, we do something similar to
  296. # test_stdout_none (see above). The parent subprocess calls the child
  297. # subprocess passing stdout=1, and this test uses stdout=PIPE in
  298. # order to capture and check the output of the parent. See #11963.
  299. code = ('import sys, subprocess; '
  300. 'rc = subprocess.call([sys.executable, "-c", '
  301. ' "import os, sys; sys.exit(os.write(sys.stdout.fileno(), '
  302. '\'test with stdout=1\'))"], stdout=1); '
  303. 'assert rc == 18')
  304. p = subprocess.Popen([sys.executable, "-c", code],
  305. stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  306. self.addCleanup(p.stdout.close)
  307. self.addCleanup(p.stderr.close)
  308. out, err = p.communicate()
  309. self.assertEqual(p.returncode, 0, err)
  310. self.assertEqual(out.rstrip(), 'test with stdout=1')
  311. def test_cwd(self):
  312. tmpdir = tempfile.gettempdir()
  313. # We cannot use os.path.realpath to canonicalize the path,
  314. # since it doesn't expand Tru64 {memb} strings. See bug 1063571.
  315. cwd = os.getcwd()
  316. os.chdir(tmpdir)
  317. tmpdir = os.getcwd()
  318. os.chdir(cwd)
  319. p = subprocess.Popen([sys.executable, "-c",
  320. 'import sys,os;'
  321. 'sys.stdout.write(os.getcwd())'],
  322. stdout=subprocess.PIPE,
  323. cwd=tmpdir)
  324. self.addCleanup(p.stdout.close)
  325. normcase = os.path.normcase
  326. self.assertEqual(normcase(p.stdout.read()), normcase(tmpdir))
  327. def test_env(self):
  328. newenv = os.environ.copy()
  329. newenv["FRUIT"] = "orange"
  330. p = subprocess.Popen([sys.executable, "-c",
  331. 'import sys,os;'
  332. 'sys.stdout.write(os.getenv("FRUIT"))'],
  333. stdout=subprocess.PIPE,
  334. env=newenv)
  335. self.addCleanup(p.stdout.close)
  336. self.assertEqual(p.stdout.read(), "orange")
  337. def test_communicate_stdin(self):
  338. p = subprocess.Popen([sys.executable, "-c",
  339. 'import sys;'
  340. 'sys.exit(sys.stdin.read() == "pear")'],
  341. stdin=subprocess.PIPE)
  342. p.communicate("pear")
  343. self.assertEqual(p.returncode, 1)
  344. def test_communicate_stdout(self):
  345. p = subprocess.Popen([sys.executable, "-c",
  346. 'import sys; sys.stdout.write("pineapple")'],
  347. stdout=subprocess.PIPE)
  348. (stdout, stderr) = p.communicate()
  349. self.assertEqual(stdout, "pineapple")
  350. self.assertEqual(stderr, None)
  351. def test_communicate_stderr(self):
  352. p = subprocess.Popen([sys.executable, "-c",
  353. 'import sys; sys.stderr.write("pineapple")'],
  354. stderr=subprocess.PIPE)
  355. (stdout, stderr) = p.communicate()
  356. self.assertEqual(stdout, None)
  357. self.assertStderrEqual(stderr, "pineapple")
  358. def test_communicate(self):
  359. p = subprocess.Popen([sys.executable, "-c",
  360. 'import sys,os;'
  361. 'sys.stderr.write("pineapple");'
  362. 'sys.stdout.write(sys.stdin.read())'],
  363. stdin=subprocess.PIPE,
  364. stdout=subprocess.PIPE,
  365. stderr=subprocess.PIPE)
  366. self.addCleanup(p.stdout.close)
  367. self.addCleanup(p.stderr.close)
  368. self.addCleanup(p.stdin.close)
  369. (stdout, stderr) = p.communicate("banana")
  370. self.assertEqual(stdout, "banana")
  371. self.assertStderrEqual(stderr, "pineapple")
  372. # This test is Linux specific for simplicity to at least have
  373. # some coverage. It is not a platform specific bug.
  374. @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
  375. "Linux specific")
  376. # Test for the fd leak reported in http://bugs.python.org/issue2791.
  377. def test_communicate_pipe_fd_leak(self):
  378. fd_directory = '/proc/%d/fd' % os.getpid()
  379. num_fds_before_popen = len(os.listdir(fd_directory))
  380. p = subprocess.Popen([sys.executable, "-c", "print()"],
  381. stdout=subprocess.PIPE)
  382. p.communicate()
  383. num_fds_after_communicate = len(os.listdir(fd_directory))
  384. del p
  385. num_fds_after_destruction = len(os.listdir(fd_directory))
  386. self.assertEqual(num_fds_before_popen, num_fds_after_destruction)
  387. self.assertEqual(num_fds_before_popen, num_fds_after_communicate)
  388. def test_communicate_returns(self):
  389. # communicate() should return None if no redirection is active
  390. p = subprocess.Popen([sys.executable, "-c",
  391. "import sys; sys.exit(47)"])
  392. (stdout, stderr) = p.communicate()
  393. self.assertEqual(stdout, None)
  394. self.assertEqual(stderr, None)
  395. def test_communicate_pipe_buf(self):
  396. # communicate() with writes larger than pipe_buf
  397. # This test will probably deadlock rather than fail, if
  398. # communicate() does not work properly.
  399. x, y = os.pipe()
  400. if mswindows:
  401. pipe_buf = 512
  402. else:
  403. pipe_buf = os.fpathconf(x, "PC_PIPE_BUF")
  404. os.close(x)
  405. os.close(y)
  406. p = subprocess.Popen([sys.executable, "-c",
  407. 'import sys,os;'
  408. 'sys.stdout.write(sys.stdin.read(47));'
  409. 'sys.stderr.write("xyz"*%d);'
  410. 'sys.stdout.write(sys.stdin.read())' % pipe_buf],
  411. stdin=subprocess.PIPE,
  412. stdout=subprocess.PIPE,
  413. stderr=subprocess.PIPE)
  414. self.addCleanup(p.stdout.close)
  415. self.addCleanup(p.stderr.close)
  416. self.addCleanup(p.stdin.close)
  417. string_to_write = "abc"*pipe_buf
  418. (stdout, stderr) = p.communicate(string_to_write)
  419. self.assertEqual(stdout, string_to_write)
  420. def test_writes_before_communicate(self):
  421. # stdin.write before communicate()
  422. p = subprocess.Popen([sys.executable, "-c",
  423. 'import sys,os;'
  424. 'sys.stdout.write(sys.stdin.read())'],
  425. stdin=subprocess.PIPE,
  426. stdout=subprocess.PIPE,
  427. stderr=subprocess.PIPE)
  428. self.addCleanup(p.stdout.close)
  429. self.addCleanup(p.stderr.close)
  430. self.addCleanup(p.stdin.close)
  431. p.stdin.write("banana")
  432. (stdout, stderr) = p.communicate("split")
  433. self.assertEqual(stdout, "bananasplit")
  434. self.assertStderrEqual(stderr, "")
  435. def test_universal_newlines(self):
  436. p = subprocess.Popen([sys.executable, "-c",
  437. 'import sys,os;' + SETBINARY +
  438. 'sys.stdout.write("line1\\n");'
  439. 'sys.stdout.flush();'
  440. 'sys.stdout.write("line2\\r");'
  441. 'sys.stdout.flush();'
  442. 'sys.stdout.write("line3\\r\\n");'
  443. 'sys.stdout.flush();'
  444. 'sys.stdout.write("line4\\r");'
  445. 'sys.stdout.flush();'
  446. 'sys.stdout.write("\\nline5");'
  447. 'sys.stdout.flush();'
  448. 'sys.stdout.write("\\nline6");'],
  449. stdout=subprocess.PIPE,
  450. universal_newlines=1)
  451. self.addCleanup(p.stdout.close)
  452. stdout = p.stdout.read()
  453. if hasattr(file, 'newlines'):
  454. # Interpreter with universal newline support
  455. self.assertEqual(stdout,
  456. "line1\nline2\nline3\nline4\nline5\nline6")
  457. else:
  458. # Interpreter without universal newline support
  459. self.assertEqual(stdout,
  460. "line1\nline2\rline3\r\nline4\r\nline5\nline6")
  461. def test_universal_newlines_communicate(self):
  462. # universal newlines through communicate()
  463. p = subprocess.Popen([sys.executable, "-c",
  464. 'import sys,os;' + SETBINARY +
  465. 'sys.stdout.write("line1\\n");'
  466. 'sys.stdout.flush();'
  467. 'sys.stdout.write("line2\\r");'
  468. 'sys.stdout.flush();'
  469. 'sys.stdout.write("line3\\r\\n");'
  470. 'sys.stdout.flush();'
  471. 'sys.stdout.write("line4\\r");'
  472. 'sys.stdout.flush();'
  473. 'sys.stdout.write("\\nline5");'
  474. 'sys.stdout.flush();'
  475. 'sys.stdout.write("\\nline6");'],
  476. stdout=subprocess.PIPE, stderr=subprocess.PIPE,
  477. universal_newlines=1)
  478. self.addCleanup(p.stdout.close)
  479. self.addCleanup(p.stderr.close)
  480. (stdout, stderr) = p.communicate()
  481. if hasattr(file, 'newlines'):
  482. # Interpreter with universal newline support
  483. self.assertEqual(stdout,
  484. "line1\nline2\nline3\nline4\nline5\nline6")
  485. else:
  486. # Interpreter without universal newline support
  487. self.assertEqual(stdout,
  488. "line1\nline2\rline3\r\nline4\r\nline5\nline6")
  489. def test_no_leaking(self):
  490. # Make sure we leak no resources
  491. if not mswindows:
  492. max_handles = 1026 # too much for most UNIX systems
  493. else:
  494. max_handles = 2050 # too much for (at least some) Windows setups
  495. handles = []
  496. try:
  497. for i in range(max_handles):
  498. try:
  499. handles.append(os.open(test_support.TESTFN,
  500. os.O_WRONLY | os.O_CREAT))
  501. except OSError as e:
  502. if e.errno != errno.EMFILE:
  503. raise
  504. break
  505. else:
  506. self.skipTest("failed to reach the file descriptor limit "
  507. "(tried %d)" % max_handles)
  508. # Close a couple of them (should be enough for a subprocess)
  509. for i in range(10):
  510. os.close(handles.pop())
  511. # Loop creating some subprocesses. If one of them leaks some fds,
  512. # the next loop iteration will fail by reaching the max fd limit.
  513. for i in range(15):
  514. p = subprocess.Popen([sys.executable, "-c",
  515. "import sys;"
  516. "sys.stdout.write(sys.stdin.read())"],
  517. stdin=subprocess.PIPE,
  518. stdout=subprocess.PIPE,
  519. stderr=subprocess.PIPE)
  520. data = p.communicate(b"lime")[0]
  521. self.assertEqual(data, b"lime")
  522. finally:
  523. for h in handles:
  524. os.close(h)
  525. test_support.unlink(test_support.TESTFN)
  526. def test_list2cmdline(self):
  527. self.assertEqual(subprocess.list2cmdline(['a b c', 'd', 'e']),
  528. '"a b c" d e')
  529. self.assertEqual(subprocess.list2cmdline(['ab"c', '\\', 'd']),
  530. 'ab\\"c \\ d')
  531. self.assertEqual(subprocess.list2cmdline(['ab"c', ' \\', 'd']),
  532. 'ab\\"c " \\\\" d')
  533. self.assertEqual(subprocess.list2cmdline(['a\\\\\\b', 'de fg', 'h']),
  534. 'a\\\\\\b "de fg" h')
  535. self.assertEqual(subprocess.list2cmdline(['a\\"b', 'c', 'd']),
  536. 'a\\\\\\"b c d')
  537. self.assertEqual(subprocess.list2cmdline(['a\\\\b c', 'd', 'e']),
  538. '"a\\\\b c" d e')
  539. self.assertEqual(subprocess.list2cmdline(['a\\\\b\\ c', 'd', 'e']),
  540. '"a\\\\b\\ c" d e')
  541. self.assertEqual(subprocess.list2cmdline(['ab', '']),
  542. 'ab ""')
  543. def test_poll(self):
  544. p = subprocess.Popen([sys.executable,
  545. "-c", "import time; time.sleep(1)"])
  546. count = 0
  547. while p.poll() is None:
  548. time.sleep(0.1)
  549. count += 1
  550. # We expect that the poll loop probably went around about 10 times,
  551. # but, based on system scheduling we can't control, it's possible
  552. # poll() never returned None. It "should be" very rare that it
  553. # didn't go around at least twice.
  554. self.assertGreaterEqual(count, 2)
  555. # Subsequent invocations should just return the returncode
  556. self.assertEqual(p.poll(), 0)
  557. def test_wait(self):
  558. p = subprocess.Popen([sys.executable,
  559. "-c", "import time; time.sleep(2)"])
  560. self.assertEqual(p.wait(), 0)
  561. # Subsequent invocations should just return the returncode
  562. self.assertEqual(p.wait(), 0)
  563. def test_invalid_bufsize(self):
  564. # an invalid type of the bufsize argument should raise
  565. # TypeError.
  566. with self.assertRaises(TypeError):
  567. subprocess.Popen([sys.executable, "-c", "pass"], "orange")
  568. def test_leaking_fds_on_error(self):
  569. # see bug #5179: Popen leaks file descriptors to PIPEs if
  570. # the child fails to execute; this will eventually exhaust
  571. # the maximum number of open fds. 1024 seems a very common
  572. # value for that limit, but Windows has 2048, so we loop
  573. # 1024 times (each call leaked two fds).
  574. for i in range(1024):
  575. # Windows raises IOError. Others raise OSError.
  576. with self.assertRaises(EnvironmentError) as c:
  577. subprocess.Popen(['nonexisting_i_hope'],
  578. stdout=subprocess.PIPE,
  579. stderr=subprocess.PIPE)
  580. # ignore errors that indicate the command was not found
  581. if c.exception.errno not in (errno.ENOENT, errno.EACCES):
  582. raise c.exception
  583. @unittest.skipIf(threading is None, "threading required")
  584. def test_double_close_on_error(self):
  585. # Issue #18851
  586. fds = []
  587. def open_fds():
  588. for i in range(20):
  589. fds.extend(os.pipe())
  590. time.sleep(0.001)
  591. t = threading.Thread(target=open_fds)
  592. t.start()
  593. try:
  594. with self.assertRaises(EnvironmentError):
  595. subprocess.Popen(['nonexisting_i_hope'],
  596. stdin=subprocess.PIPE,
  597. stdout=subprocess.PIPE,
  598. stderr=subprocess.PIPE)
  599. finally:
  600. t.join()
  601. exc = None
  602. for fd in fds:
  603. # If a double close occurred, some of those fds will
  604. # already have been closed by mistake, and os.close()
  605. # here will raise.
  606. try:
  607. os.close(fd)
  608. except OSError as e:
  609. exc = e
  610. if exc is not None:
  611. raise exc
  612. def test_handles_closed_on_exception(self):
  613. # If CreateProcess exits with an error, ensure the
  614. # duplicate output handles are released
  615. ifhandle, ifname = tempfile.mkstemp()
  616. ofhandle, ofname = tempfile.mkstemp()
  617. efhandle, efname = tempfile.mkstemp()
  618. try:
  619. subprocess.Popen (["*"], stdin=ifhandle, stdout=ofhandle,
  620. stderr=efhandle)
  621. except OSError:
  622. os.close(ifhandle)
  623. os.remove(ifname)
  624. os.close(ofhandle)
  625. os.remove(ofname)
  626. os.close(efhandle)
  627. os.remove(efname)
  628. self.assertFalse(os.path.exists(ifname))
  629. self.assertFalse(os.path.exists(ofname))
  630. self.assertFalse(os.path.exists(efname))
  631. def test_communicate_epipe(self):
  632. # Issue 10963: communicate() should hide EPIPE
  633. p = subprocess.Popen([sys.executable, "-c", 'pass'],
  634. stdin=subprocess.PIPE,
  635. stdout=subprocess.PIPE,
  636. stderr=subprocess.PIPE)
  637. self.addCleanup(p.stdout.close)
  638. self.addCleanup(p.stderr.close)
  639. self.addCleanup(p.stdin.close)
  640. p.communicate("x" * 2**20)
  641. def test_communicate_epipe_only_stdin(self):
  642. # Issue 10963: communicate() should hide EPIPE
  643. p = subprocess.Popen([sys.executable, "-c", 'pass'],
  644. stdin=subprocess.PIPE)
  645. self.addCleanup(p.stdin.close)
  646. time.sleep(2)
  647. p.communicate("x" * 2**20)
  648. # This test is Linux-ish specific for simplicity to at least have
  649. # some coverage. It is not a platform specific bug.
  650. @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
  651. "Linux specific")
  652. def test_failed_child_execute_fd_leak(self):
  653. """Test for the fork() failure fd leak reported in issue16327."""
  654. fd_directory = '/proc/%d/fd' % os.getpid()
  655. fds_before_popen = os.listdir(fd_directory)
  656. with self.assertRaises(PopenTestException):
  657. PopenExecuteChildRaises(
  658. [sys.executable, '-c', 'pass'], stdin=subprocess.PIPE,
  659. stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  660. # NOTE: This test doesn't verify that the real _execute_child
  661. # does not close the file descriptors itself on the way out
  662. # during an exception. Code inspection has confirmed that.
  663. fds_after_exception = os.listdir(fd_directory)
  664. self.assertEqual(fds_before_popen, fds_after_exception)
  665. # context manager
  666. class _SuppressCoreFiles(object):
  667. """Try to prevent core files from being created."""
  668. old_limit = None
  669. def __enter__(self):
  670. """Try to save previous ulimit, then set it to (0, 0)."""
  671. if resource is not None:
  672. try:
  673. self.old_limit = resource.getrlimit(resource.RLIMIT_CORE)
  674. resource.setrlimit(resource.RLIMIT_CORE, (0, 0))
  675. except (ValueError, resource.error):
  676. pass
  677. if sys.platform == 'darwin':
  678. # Check if the 'Crash Reporter' on OSX was configured
  679. # in 'Developer' mode and warn that it will get triggered
  680. # when it is.
  681. #
  682. # This assumes that this context manager is used in tests
  683. # that might trigger the next manager.
  684. value = subprocess.Popen(['/usr/bin/defaults', 'read',
  685. 'com.apple.CrashReporter', 'DialogType'],
  686. stdout=subprocess.PIPE).communicate()[0]
  687. if value.strip() == b'developer':
  688. print "this tests triggers the Crash Reporter, that is intentional"
  689. sys.stdout.flush()
  690. def __exit__(self, *args):
  691. """Return core file behavior to default."""
  692. if self.old_limit is None:
  693. return
  694. if resource is not None:
  695. try:
  696. resource.setrlimit(resource.RLIMIT_CORE, self.old_limit)
  697. except (ValueError, resource.error):
  698. pass
  699. @unittest.skipUnless(hasattr(signal, 'SIGALRM'),
  700. "Requires signal.SIGALRM")
  701. def test_communicate_eintr(self):
  702. # Issue #12493: communicate() should handle EINTR
  703. def handler(signum, frame):
  704. pass
  705. old_handler = signal.signal(signal.SIGALRM, handler)
  706. self.addCleanup(signal.signal, signal.SIGALRM, old_handler)
  707. # the process is running for 2 seconds
  708. args = [sys.executable, "-c", 'import time; time.sleep(2)']
  709. for stream in ('stdout', 'stderr'):
  710. kw = {stream: subprocess.PIPE}
  711. with subprocess.Popen(args, **kw) as process:
  712. signal.alarm(1)
  713. # communicate() will be interrupted by SIGALRM
  714. process.communicate()
@unittest.skipIf(mswindows, "POSIX specific tests")
class POSIXProcessTestCase(BaseTestCase):
    """subprocess tests that are skipped when ``mswindows`` is true."""
  717. def test_exceptions(self):
  718. # caught & re-raised exceptions
  719. with self.assertRaises(OSError) as c:
  720. p = subprocess.Popen([sys.executable, "-c", ""],
  721. cwd="/this/path/does/not/exist")
  722. # The attribute child_traceback should contain "os.chdir" somewhere.
  723. self.assertIn("os.chdir", c.exception.child_traceback)
  724. def test_run_abort(self):
  725. # returncode handles signal termination
  726. with _SuppressCoreFiles():
  727. p = subprocess.Popen([sys.executable, "-c",
  728. "import os; os.abort()"])
  729. p.wait()
  730. self.assertEqual(-p.returncode, signal.SIGABRT)
  731. def test_preexec(self):
  732. # preexec function
  733. p = subprocess.Popen([sys.executable, "-c",
  734. "import sys, os;"
  735. "sys.stdout.write(os.getenv('FRUIT'))"],
  736. stdout=subprocess.PIPE,
  737. preexec_fn=lambda: os.putenv("FRUIT", "apple"))
  738. self.addCleanup(p.stdout.close)
  739. self.assertEqual(p.stdout.read(), "apple")
    class _TestExecuteChildPopen(subprocess.Popen):
        """Used to test behavior at the end of _execute_child."""
        def __init__(self, testcase, *args, **kwargs):
            # Keep the test case so _execute_child can run assertions on it.
            self._testcase = testcase
            subprocess.Popen.__init__(self, *args, **kwargs)

        def _execute_child(
                self, args, executable, preexec_fn, close_fds, cwd, env,
                universal_newlines, startupinfo, creationflags, shell, to_close,
                p2cread, p2cwrite,
                c2pread, c2pwrite,
                errread, errwrite):
            # Delegate to the real implementation with the same arguments;
            # the finally clause runs the check whether or not it raises.
            try:
                subprocess.Popen._execute_child(
                        self, args, executable, preexec_fn, close_fds,
                        cwd, env, universal_newlines,
                        startupinfo, creationflags, shell, to_close,
                        p2cread, p2cwrite,
                        c2pread, c2pwrite,
                        errread, errwrite)
            finally:
                # Open a bunch of file descriptors and verify that
                # none of them are the same as the ones the Popen
                # instance is using for stdin/stdout/stderr.
                devzero_fds = [os.open("/dev/zero", os.O_RDONLY)
                               for _ in range(8)]
                try:
                    for fd in devzero_fds:
                        self._testcase.assertNotIn(
                                fd, (p2cwrite, c2pread, errread))
                finally:
                    # Always release the probe descriptors.
                    for fd in devzero_fds:
                        os.close(fd)
  772. @unittest.skipIf(not os.path.exists("/dev/zero"), "/dev/zero required.")
  773. def test_preexec_errpipe_does_not_double_close_pipes(self):
  774. """Issue16140: Don't double close pipes on preexec error."""
  775. def raise_it():
  776. raise RuntimeError("force the _execute_child() errpipe_data path.")
  777. with self.assertRaises(RuntimeError):
  778. self._TestExecuteChildPopen(
  779. self, [sys.executable, "-c", "pass"],
  780. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  781. stderr=subprocess.PIPE, preexec_fn=raise_it)
  782. def test_args_string(self):
  783. # args is a string
  784. f, fname = tempfile.mkstemp()
  785. os.write(f, "#!/bin/sh\n")
  786. os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
  787. sys.executable)
  788. os.close(f)
  789. os.chmod(fname, 0o700)
  790. p = subprocess.Popen(fname)
  791. p.wait()
  792. os.remove(fname)
  793. self.assertEqual(p.returncode, 47)
  794. def test_invalid_args(self):
  795. # invalid arguments should raise ValueError
  796. self.assertRaises(ValueError, subprocess.call,
  797. [sys.executable, "-c",
  798. "import sys; sys.exit(47)"],
  799. startupinfo=47)
  800. self.assertRaises(ValueError, subprocess.call,
  801. [sys.executable, "-c",
  802. "import sys; sys.exit(47)"],
  803. creationflags=47)
  804. def test_shell_sequence(self):
  805. # Run command through the shell (sequence)
  806. newenv = os.environ.copy()
  807. newenv["FRUIT"] = "apple"
  808. p = subprocess.Popen(["echo $FRUIT"], shell=1,
  809. stdout=subprocess.PIPE,
  810. env=newenv)
  811. self.addCleanup(p.stdout.close)
  812. self.assertEqual(p.stdout.read().strip(), "apple")
  813. def test_shell_string(self):
  814. # Run command through the shell (string)
  815. newenv = os.environ.copy()
  816. newenv["FRUIT"] = "apple"
  817. p = subprocess.Popen("echo $FRUIT", shell=1,
  818. stdout=subprocess.PIPE,
  819. env=newenv)
  820. self.addCleanup(p.stdout.close)
  821. self.assertEqual(p.stdout.read().strip(), "apple")
  822. def test_call_string(self):
  823. # call() function with string argument on UNIX
  824. f, fname = tempfile.mkstemp()
  825. os.write(f, "#!/bin/sh\n")
  826. os.write(f, "exec '%s' -c 'import sys; sys.exit(47)'\n" %
  827. sys.executable)
  828. os.close(f)
  829. os.chmod(fname, 0700)
  830. rc = subprocess.call(fname)
  831. os.remove(fname)
  832. self.assertEqual(rc, 47)
  833. def test_specific_shell(self):
  834. # Issue #9265: Incorrect name passed as arg[0].
  835. shells = []
  836. for prefix in ['/bin', '/usr/bin/', '/usr/local/bin']:
  837. for name in ['bash', 'ksh']:
  838. sh = os.path.join(prefix, name)
  839. if os.path.isfile(sh):
  840. shells.append(sh)
  841. if not shells: # Will probably work for any shell but csh.
  842. self.skipTest("bash or ksh required for this test")
  843. sh = '/bin/sh'
  844. if os.path.isfile(sh) and not os.path.islink(sh):
  845. # Test will fail if /bin/sh is a symlink to csh.
  846. shells.append(sh)
  847. for sh in shells:
  848. p = subprocess.Popen("echo $0", executable=sh, shell=True,
  849. stdout=subprocess.PIPE)
  850. self.addCleanup(p.stdout.close)
  851. self.assertEqual(p.stdout.read().strip(), sh)
  852. def _kill_process(self, method, *args):
  853. # Do not inherit file handles from the parent.
  854. # It should fix failures on some platforms.
  855. p = subprocess.Popen([sys.executable, "-c", """if 1:
  856. import sys, time
  857. sys.stdout.write('x\\n')
  858. sys.stdout.flush()
  859. time.sleep(30)
  860. """],
  861. close_fds=True,
  862. stdin=subprocess.PIPE,
  863. stdout=subprocess.PIPE,
  864. stderr=subprocess.PIPE)
  865. # Wait for the interpreter to be completely initialized before
  866. # sending any signal.
  867. p.stdout.read(1)
  868. getattr(p, method)(*args)
  869. return p
  870. @unittest.skipIf(sys.platform.startswith(('netbsd', 'openbsd')),
  871. "Due to known OS bug (issue #16762)")
  872. def _kill_dead_process(self, method, *args):
  873. # Do not inherit file handles from the parent.
  874. # It should fix failures on some platforms.
  875. p = subprocess.Popen([sys.executable, "-c", """if 1:
  876. import sys, time
  877. sys.stdout.write('x\\n')
  878. sys.stdout.flush()
  879. """],
  880. close_fds=True,
  881. stdin=subprocess.PIPE,
  882. stdout=subprocess.PIPE,
  883. stderr=subprocess.PIPE)
  884. # Wait for the interpreter to be completely initialized before
  885. # sending any signal.
  886. p.stdout.read(1)
  887. # The process should end after this
  888. time.sleep(1)
  889. # This shouldn't raise even though the child is now dead
  890. getattr(p, method)(*args)
  891. p.communicate()
  892. def test_send_signal(self):
  893. p = self._kill_process('send_signal', signal.SIGINT)
  894. _, stderr = p.communicate()
  895. self.assertIn('KeyboardInterrupt', stderr)
  896. self.assertNotEqual(p.wait(), 0)
  897. def test_kill(self):
  898. p = self._kill_process('kill')
  899. _, stderr = p.communicate()
  900. self.assertStderrEqual(stderr, '')
  901. self.assertEqual(p.wait(), -signal.SIGKILL)
  902. def test_terminate(self):
  903. p = self._kill_process('terminate')
  904. _, stderr = p.communicate()
  905. self.assertStderrEqual(stderr, '')
  906. self.assertEqual(p.wait(), -signal.SIGTERM)
    # Each test below runs _kill_dead_process with one Popen method name.
    def test_send_signal_dead(self):
        # Sending a signal to a dead process
        self._kill_dead_process('send_signal', signal.SIGINT)

    def test_kill_dead(self):
        # Killing a dead process
        self._kill_dead_process('kill')

    def test_terminate_dead(self):
        # Terminating a dead process
        self._kill_dead_process('terminate')
    def check_close_std_fds(self, fds):
        # Issue #9905: test that subprocess pipes still work properly with
        # some standard fds closed
        #
        # fds lists the descriptors to close while the child is spawned;
        # callers in this file pass subsets of 0, 1 and 2.
        stdin = 0
        newfds = []
        # Duplicate every descriptor first so it can be restored later.
        for a in fds:
            b = os.dup(a)
            newfds.append(b)
            if a == 0:
                # fd 0 is about to be closed: give the child the duplicate.
                stdin = b
        try:
            for fd in fds:
                os.close(fd)
            out, err = subprocess.Popen([sys.executable, "-c",
                                   'import sys;'
                                   'sys.stdout.write("apple");'
                                   'sys.stdout.flush();'
                                   'sys.stderr.write("orange")'],
                                   stdin=stdin,
                                   stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE).communicate()
            err = test_support.strip_python_stderr(err)
            self.assertEqual((out, err), (b'apple', b'orange'))
        finally:
            # Put the original descriptors back, then drop the duplicates.
            for b, a in zip(newfds, fds):
                os.dup2(b, a)
            for b in newfds:
                os.close(b)
    # Each test below passes one combination of standard fds to
    # check_close_std_fds.
    def test_close_fd_0(self):
        self.check_close_std_fds([0])

    def test_close_fd_1(self):
        self.check_close_std_fds([1])

    def test_close_fd_2(self):
        self.check_close_std_fds([2])

    def test_close_fds_0_1(self):
        self.check_close_std_fds([0, 1])

    def test_close_fds_0_2(self):
        self.check_close_std_fds([0, 2])

    def test_close_fds_1_2(self):
        self.check_close_std_fds([1, 2])

    def test_close_fds_0_1_2(self):
        # Issue #10806: test that subprocess pipes still work properly with
        # all standard fds closed.
        self.check_close_std_fds([0, 1, 2])
    def check_swap_fds(self, stdin_no, stdout_no, stderr_no):
        # Run a child whose stdin/stdout/stderr are the descriptor numbers
        # given, after fds 0, 1 and 2 have been pointed at temporary files,
        # and check what the child read and wrote.
        # open up some temporary files
        temps = [tempfile.mkstemp() for i in range(3)]
        temp_fds = [fd for fd, fname in temps]
        try:
            # unlink the files -- we won't need to reopen them
            for fd, fname in temps:
                os.unlink(fname)

            # save a copy of the standard file descriptors
            saved_fds = [os.dup(fd) for fd in range(3)]
            try:
                # duplicate the temp files over the standard fd's 0, 1, 2
                for fd, temp_fd in enumerate(temp_fds):
                    os.dup2(temp_fd, fd)

                # write some data to what will become stdin, and rewind
                os.write(stdin_no, b"STDIN")
                os.lseek(stdin_no, 0, 0)

                # now use those files in the given order, so that subprocess
                # has to rearrange them in the child
                p = subprocess.Popen([sys.executable, "-c",
                    'import sys; got = sys.stdin.read();'
                    'sys.stdout.write("got %s"%got); sys.stderr.write("err")'],
                    stdin=stdin_no,
                    stdout=stdout_no,
                    stderr=stderr_no)
                p.wait()

                # Rewind every temp file before reading the child's output.
                for fd in temp_fds:
                    os.lseek(fd, 0, 0)

                out = os.read(stdout_no, 1024)
                err = test_support.strip_python_stderr(os.read(stderr_no, 1024))
            finally:
                # Restore the real standard descriptors and drop the copies.
                for std, saved in enumerate(saved_fds):
                    os.dup2(saved, std)
                    os.close(saved)

            self.assertEqual(out, b"got STDIN")
            self.assertEqual(err, b"err")

        finally:
            for fd in temp_fds:
                os.close(fd)
  999. # When duping fds, if there arises a situation where one of the fds is
  1000. # either 0, 1 or 2, it is possible that it is overwritten (#12607).
  1001. # This tests all combinations of this.
  1002. def test_swap_fds(self):
  1003. self.check_swap_fds(0, 1, 2)
  1004. self.check_swap_fds(0, 2, 1)
  1005. self.check_swap_fds(1, 0, 2)
  1006. self.check_swap_fds(1, 2, 0)
  1007. self.check_swap_fds(2, 0, 1)
  1008. self.check_swap_fds(2, 1, 0)
  1009. def test_wait_when_sigchild_ignored(self):
  1010. # NOTE: sigchild_ignore.py may not be an effective test on all OSes.
  1011. sigchild_ignore = test_support.findfile("sigchild_ignore.py",
  1012. subdir="subprocessdata")
  1013. p = subprocess.Popen([sys.executable, sigchild_ignore],
  1014. stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  1015. stdout, stderr = p.communicate()
  1016. self.assertEqual(0, p.returncode, "sigchild_ignore.py exited"
  1017. " non-zero with this error:\n%s" % stderr)
    def test_zombie_fast_process_del(self):
        # Issue #12650: on Unix, if Popen.__del__() was called before the
        # process exited, it wouldn't be added to subprocess._active, and would
        # remain a zombie.
        # spawn a Popen, and delete its reference before it exits
        p = subprocess.Popen([sys.executable, "-c",
                              'import sys, time;'
                              'time.sleep(0.2)'],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        # Record the object's identity now; p itself is deleted below.
        ident = id(p)
        pid = p.pid
        del p
        # check that p is in the active processes list
        self.assertIn(ident, [id(o) for o in subprocess._active])
    def test_leak_fast_process_del_killed(self):
        # Issue #12650: on Unix, if Popen.__del__() was called before the
        # process exited, and the process got killed by a signal, it would never
        # be removed from subprocess._active, which triggered a FD and memory
        # leak.
        # spawn a Popen, delete its reference and kill it
        p = subprocess.Popen([sys.executable, "-c",
                              'import time;'
                              'time.sleep(3)'],
                             stdout=subprocess.PIPE,
                             stderr=subprocess.PIPE)
        self.addCleanup(p.stdout.close)
        self.addCleanup(p.stderr.close)
        # Record identity and pid now; p itself is deleted below.
        ident = id(p)
        pid = p.pid
        del p
        os.kill(pid, signal.SIGKILL)
        # check that p is in the active processes list
        self.assertIn(ident, [id(o) for o in subprocess._active])

        # let some time for the process to exit, and create a new Popen: this
        # should trigger the wait() of p
        time.sleep(0.2)
        # The Popen() call is expected to raise, so the with-body is a no-op.
        with self.assertRaises(EnvironmentError) as c:
            with subprocess.Popen(['nonexisting_i_hope'],
                                  stdout=subprocess.PIPE,
                                  stderr=subprocess.PIPE) as proc:
                pass
        # p should have been wait()ed on, and removed from the _active list
        self.assertRaises(OSError, os.waitpid, pid, 0)
        self.assertNotIn(ident, [id(o) for o in subprocess._active])
  1065. def test_pipe_cloexec(self):
  1066. # Issue 12786: check that the communication pipes' FDs are set CLOEXEC,
  1067. # and are not inherited by another child process.
  1068. p1 = subprocess.Popen([sys.executable, "-c",
  1069. 'import os;'
  1070. 'os.read(0, 1)'
  1071. ],
  1072. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  1073. stderr=subprocess.PIPE)
  1074. p2 = subprocess.Popen([sys.executable, "-c", """if True:
  1075. import os, errno, sys
  1076. for fd in %r:
  1077. try:
  1078. os.close(fd)
  1079. except OSError as e:
  1080. if e.errno != errno.EBADF:
  1081. raise
  1082. else:
  1083. sys.exit(1)
  1084. sys.exit(0)
  1085. """ % [f.fileno() for f in (p1.stdin, p1.stdout,
  1086. p1.stderr)]
  1087. ],
  1088. stdin=subprocess.PIPE, stdout=subprocess.PIPE,
  1089. stderr=subprocess.PIPE, close_fds=False)
  1090. p1.communicate('foo')
  1091. _, stderr = p2.communicate()
  1092. self.assertEqual(p2.returncode, 0, "Unexpected error: " + repr(stderr))
  1093. @unittest.skipUnless(mswindows, "Windows specific tests")
  1094. class Win32ProcessTestCase(BaseTestCase):
  1095. def test_startupinfo(self):
  1096. # startupinfo argument
  1097. # We uses hardcoded constants, because we do not want to
  1098. # depend on win32all.
  1099. STARTF_USESHOWWINDOW = 1
  1100. SW_MAXIMIZE = 3
  1101. startupinfo = subprocess.STARTUPINFO()
  1102. startupinfo.dwFlags = STARTF_USESHOWWINDOW
  1103. startupinfo.wShowWindow = SW_MAXIMIZE
  1104. # Since Python is a console process, it won't be affected
  1105. # by wShowWindow, but the argument should be silently
  1106. # ignored
  1107. subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
  1108. startupinfo=startupinfo)
  1109. def test_creationflags(self):
  1110. # creationflags argument
  1111. CREATE_NEW_CONSOLE = 16
  1112. sys.stderr.write(" a DOS box should flash briefly ...\n")
  1113. subprocess.call(sys.executable +
  1114. ' -c "import time; time.sleep(0.25)"',
  1115. creationflags=CREATE_NEW_CONSOLE)
  1116. def test_invalid_args(self):
  1117. # invalid arguments should raise ValueError
  1118. self.assertRaises(ValueError, subprocess.call,
  1119. [sys.executable, "-c",
  1120. "import sys; sys.exit(47)"],
  1121. preexec_fn=lambda: 1)
  1122. self.assertRaises(ValueError, subprocess.call,
  1123. [sys.executable, "-c",
  1124. "import sys; sys.exit(47)"],
  1125. stdout=subprocess.PIPE,
  1126. close_fds=True)
  1127. def test_close_fds(self):
  1128. # close file descriptors
  1129. rc = subprocess.call([sys.executable, "-c",
  1130. "import sys; sys.exit(47)"],
  1131. close_fds=True)
  1132. self.assertEqual(rc, 47)
  1133. def test_shell_sequence(self):
  1134. # Run command through the shell (sequence)
  1135. newenv = os.environ.copy()
  1136. newenv["FRUIT"] = "physalis"
  1137. p = subprocess.Popen(["set"], shell=1,
  1138. stdout=subprocess.PIPE,
  1139. env=newenv)
  1140. self.addCleanup(p.stdout.close)
  1141. self.assertIn("physalis", p.stdout.read())
  1142. def test_shell_string(self):
  1143. # Run command through the shell (string)
  1144. newenv = os.environ.copy()
  1145. newenv["FRUIT"] = "physalis"
  1146. p = subprocess.Popen("set", shell=1,
  1147. stdout=subprocess.PIPE,
  1148. env=newenv)
  1149. self.addCleanup(p.stdout.close)
  1150. self.assertIn("physalis", p.stdout.read())
  1151. def test_call_string(self):
  1152. # call() function with string argument on Windows
  1153. rc = subprocess.call(sys.executable +
  1154. ' -c "import sys; sys.exit(47)"')
  1155. self.assertEqual(rc, 47)
  1156. def _kill_process(self, method, *args):
  1157. # Some win32 buildbot raises EOFError if stdin is inherited
  1158. p = subprocess.Popen([sys.executable, "-c", """if 1:
  1159. import sys, time
  1160. sys.stdout.write('x\\n')
  1161. sys.stdout.flush()
  1162. time.sleep(30)
  1163. """],
  1164. stdin=subprocess.PIPE,
  1165. stdout=subprocess.PIPE,
  1166. stderr=subprocess.PIPE)
  1167. self.addCleanup(p.stdout.close)
  1168. self.addCleanup(p.stderr.close)
  1169. self.addCleanup(p.stdin.close)
  1170. # Wait for the interpreter to be completely initialized before
  1171. # sending any signal.
  1172. p.stdout.read(1)
  1173. getattr(p, method)(*args)
  1174. _, stderr = p.communicate()
  1175. self.assertStderrEqual(stderr, '')
  1176. returncode = p.wait()
  1177. self.assertNotEqual(returncode, 0)
  1178. def _kill_dead_process(self, method, *args):
  1179. p = subprocess.Popen([sys.executable, "-c", """if 1:
  1180. import sys, time
  1181. sys.stdout.write('x\\n')
  1182. sys.stdout.flush()
  1183. sys.exit(42)
  1184. """],
  1185. stdin=subprocess.PIPE,
  1186. stdout=subprocess.PIPE,
  1187. stderr=subprocess.PIPE)
  1188. self.addCleanup(p.stdout.close)
  1189. self.addCleanup(p.stderr.close)
  1190. self.addCleanup(p.stdin.close)
  1191. # Wait for the interpreter to be completely initialized before
  1192. # sending any signal.
  1193. p.stdout.read(1)
  1194. # The process should end after this
  1195. time.sleep(1)
  1196. # This shouldn't raise even though the child is now dead
  1197. getattr(p, method)(*args)
  1198. _, stderr = p.communicate()
  1199. self.assertStderrEqual(stderr, b'')
  1200. rc = p.wait()
  1201. self.assertEqual(rc, 42)
  1202. def test_send_signal(self):
  1203. self._kill_process('send_signal', signal.SIGTERM)
  1204. def test_kill(self):
  1205. self._kill_process('kill')
  1206. def test_terminate(self):
  1207. self._kill_process('terminate')
  1208. def test_send_signal_dead(self):
  1209. self._kill_dead_process('send_signal', signal.SIGTERM)
  1210. def test_kill_dead(self):
  1211. self._kill_dead_process('kill')
  1212. def test_terminate_dead(self):
  1213. self._kill_dead_process('terminate')
  1214. @unittest.skipUnless(getattr(subprocess, '_has_poll', False),
  1215. "poll system call not supported")
  1216. class ProcessTestCaseNoPoll(ProcessTestCase):
  1217. def setUp(self):
  1218. subprocess._has_poll = False
  1219. ProcessTestCase.setUp(self)
  1220. def tearDown(self):
  1221. subprocess._has_poll = True
  1222. ProcessTestCase.tearDown(self)
  1223. class HelperFunctionTests(unittest.TestCase):
  1224. @unittest.skipIf(mswindows, "errno and EINTR make no sense on windows")
  1225. def test_eintr_retry_call(self):
  1226. record_calls = []
  1227. def fake_os_func(*args):
  1228. record_calls.append(args)
  1229. if len(record_calls) == 2:
  1230. raise OSError(errno.EINTR, "fake interrupted system call")
  1231. return tuple(reversed(args))
  1232. self.assertEqual((999, 256),
  1233. subprocess._eintr_retry_call(fake_os_func, 256, 999))
  1234. self.assertEqual([(256, 999)], record_calls)
  1235. # This time there will be an EINTR so it will loop once.
  1236. self.assertEqual((666,),
  1237. subprocess._eintr_retry_call(fake_os_func, 666))
  1238. self.assertEqual([(256, 999), (666,), (666,)], record_calls)
  1239. @unittest.skipUnless(mswindows, "mswindows only")
  1240. class CommandsWithSpaces (BaseTestCase):
  1241. def setUp(self):
  1242. super(CommandsWithSpaces, self).setUp()
  1243. f, fname = tempfile.mkstemp(".py", "te st")
  1244. self.fname = fname.lower ()
  1245. os.write(f, b"import sys;"
  1246. b"sys.stdout.write('%d %s' % (len(sys.argv), [a.lower () for a in sys.argv]))"
  1247. )
  1248. os.close(f)
  1249. def tearDown(self):
  1250. os.remove(self.fname)
  1251. super(CommandsWithSpaces, self).tearDown()
  1252. def with_spaces(self, *args, **kwargs):
  1253. kwargs['stdout'] = subprocess.PIPE
  1254. p = subprocess.Popen(*args, **kwargs)
  1255. self.addCleanup(p.stdout.close)
  1256. self.assertEqual(
  1257. p.stdout.read ().decode("mbcs"),
  1258. "2 [%r, 'ab cd']" % self.fname
  1259. )
  1260. def test_shell_string_with_spaces(self):
  1261. # call() function with string argument with spaces on Windows
  1262. self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
  1263. "ab cd"), shell=1)
  1264. def test_shell_sequence_with_spaces(self):
  1265. # call() function with sequence argument with spaces on Windows
  1266. self.with_spaces([sys.executable, self.fname, "ab cd"], shell=1)
  1267. def test_noshell_string_with_spaces(self):
  1268. # call() function with string argument with spaces on Windows
  1269. self.with_spaces('"%s" "%s" "%s"' % (sys.executable, self.fname,
  1270. "ab cd"))
  1271. def test_noshell_sequence_with_spaces(self):
  1272. # call() function with sequence argument with spaces on Windows
  1273. self.with_spaces([sys.executable, self.fname, "ab cd"])
  1274. def test_main():
  1275. unit_tests = (ProcessTestCase,
  1276. POSIXProcessTestCase,
  1277. Win32ProcessTestCase,
  1278. ProcessTestCaseNoPoll,
  1279. HelperFunctionTests,
  1280. CommandsWithSpaces)
  1281. test_support.run_unittest(*unit_tests)
  1282. test_support.reap_children()
  1283. if __name__ == "__main__":
  1284. test_main()