test_os.py 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901
  1. # As a test suite for the os module, this is woefully inadequate, but this
  2. # does add tests for a few functions which have been determined to be more
  3. # portable than they had been thought to be.
  4. import os
  5. import errno
  6. import unittest
  7. import warnings
  8. import sys
  9. import signal
  10. import subprocess
  11. import sysconfig
  12. import textwrap
  13. import time
  14. try:
  15. import resource
  16. except ImportError:
  17. resource = None
  18. from test import test_support
  19. from test.script_helper import assert_python_ok
  20. import mmap
  21. import uuid
# Ignore RuntimeWarnings whose message starts with "tempnam" / "tmpnam" when
# they are attributed to this module (the last argument is the module regex).
warnings.filterwarnings("ignore", "tempnam", RuntimeWarning, __name__)
warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning, __name__)
  24. # Tests creating TESTFN
  25. class FileTests(unittest.TestCase):
  26. def setUp(self):
  27. if os.path.exists(test_support.TESTFN):
  28. os.unlink(test_support.TESTFN)
  29. tearDown = setUp
  30. def test_access(self):
  31. f = os.open(test_support.TESTFN, os.O_CREAT|os.O_RDWR)
  32. os.close(f)
  33. self.assertTrue(os.access(test_support.TESTFN, os.W_OK))
  34. def test_closerange(self):
  35. first = os.open(test_support.TESTFN, os.O_CREAT|os.O_RDWR)
  36. # We must allocate two consecutive file descriptors, otherwise
  37. # it will mess up other file descriptors (perhaps even the three
  38. # standard ones).
  39. second = os.dup(first)
  40. try:
  41. retries = 0
  42. while second != first + 1:
  43. os.close(first)
  44. retries += 1
  45. if retries > 10:
  46. # XXX test skipped
  47. self.skipTest("couldn't allocate two consecutive fds")
  48. first, second = second, os.dup(second)
  49. finally:
  50. os.close(second)
  51. # close a fd that is open, and one that isn't
  52. os.closerange(first, first + 2)
  53. self.assertRaises(OSError, os.write, first, "a")
  54. @test_support.cpython_only
  55. def test_rename(self):
  56. path = unicode(test_support.TESTFN)
  57. old = sys.getrefcount(path)
  58. self.assertRaises(TypeError, os.rename, path, 0)
  59. new = sys.getrefcount(path)
  60. self.assertEqual(old, new)
  61. class TemporaryFileTests(unittest.TestCase):
  62. def setUp(self):
  63. self.files = []
  64. os.mkdir(test_support.TESTFN)
  65. def tearDown(self):
  66. for name in self.files:
  67. os.unlink(name)
  68. os.rmdir(test_support.TESTFN)
  69. def check_tempfile(self, name):
  70. # make sure it doesn't already exist:
  71. self.assertFalse(os.path.exists(name),
  72. "file already exists for temporary file")
  73. # make sure we can create the file
  74. open(name, "w")
  75. self.files.append(name)
  76. @unittest.skipUnless(hasattr(os, 'tempnam'), 'test needs os.tempnam()')
  77. def test_tempnam(self):
  78. with warnings.catch_warnings():
  79. warnings.filterwarnings("ignore", "tempnam", RuntimeWarning,
  80. r"test_os$")
  81. warnings.filterwarnings("ignore", "tempnam", DeprecationWarning)
  82. self.check_tempfile(os.tempnam())
  83. name = os.tempnam(test_support.TESTFN)
  84. self.check_tempfile(name)
  85. name = os.tempnam(test_support.TESTFN, "pfx")
  86. self.assertTrue(os.path.basename(name)[:3] == "pfx")
  87. self.check_tempfile(name)
  88. @unittest.skipUnless(hasattr(os, 'tmpfile'), 'test needs os.tmpfile()')
  89. def test_tmpfile(self):
  90. # As with test_tmpnam() below, the Windows implementation of tmpfile()
  91. # attempts to create a file in the root directory of the current drive.
  92. # On Vista and Server 2008, this test will always fail for normal users
  93. # as writing to the root directory requires elevated privileges. With
  94. # XP and below, the semantics of tmpfile() are the same, but the user
  95. # running the test is more likely to have administrative privileges on
  96. # their account already. If that's the case, then os.tmpfile() should
  97. # work. In order to make this test as useful as possible, rather than
  98. # trying to detect Windows versions or whether or not the user has the
  99. # right permissions, just try and create a file in the root directory
  100. # and see if it raises a 'Permission denied' OSError. If it does, then
  101. # test that a subsequent call to os.tmpfile() raises the same error. If
  102. # it doesn't, assume we're on XP or below and the user running the test
  103. # has administrative privileges, and proceed with the test as normal.
  104. with warnings.catch_warnings():
  105. warnings.filterwarnings("ignore", "tmpfile", DeprecationWarning)
  106. if sys.platform == 'win32':
  107. name = '\\python_test_os_test_tmpfile.txt'
  108. if os.path.exists(name):
  109. os.remove(name)
  110. try:
  111. fp = open(name, 'w')
  112. except IOError, first:
  113. # open() failed, assert tmpfile() fails in the same way.
  114. # Although open() raises an IOError and os.tmpfile() raises an
  115. # OSError(), 'args' will be (13, 'Permission denied') in both
  116. # cases.
  117. try:
  118. fp = os.tmpfile()
  119. except OSError, second:
  120. self.assertEqual(first.args, second.args)
  121. else:
  122. self.fail("expected os.tmpfile() to raise OSError")
  123. return
  124. else:
  125. # open() worked, therefore, tmpfile() should work. Close our
  126. # dummy file and proceed with the test as normal.
  127. fp.close()
  128. os.remove(name)
  129. fp = os.tmpfile()
  130. fp.write("foobar")
  131. fp.seek(0,0)
  132. s = fp.read()
  133. fp.close()
  134. self.assertTrue(s == "foobar")
  135. @unittest.skipUnless(hasattr(os, 'tmpnam'), 'test needs os.tmpnam()')
  136. def test_tmpnam(self):
  137. with warnings.catch_warnings():
  138. warnings.filterwarnings("ignore", "tmpnam", RuntimeWarning,
  139. r"test_os$")
  140. warnings.filterwarnings("ignore", "tmpnam", DeprecationWarning)
  141. name = os.tmpnam()
  142. if sys.platform in ("win32",):
  143. # The Windows tmpnam() seems useless. From the MS docs:
  144. #
  145. # The character string that tmpnam creates consists of
  146. # the path prefix, defined by the entry P_tmpdir in the
  147. # file STDIO.H, followed by a sequence consisting of the
  148. # digit characters '0' through '9'; the numerical value
  149. # of this string is in the range 1 - 65,535. Changing the
  150. # definitions of L_tmpnam or P_tmpdir in STDIO.H does not
  151. # change the operation of tmpnam.
  152. #
  153. # The really bizarre part is that, at least under MSVC6,
  154. # P_tmpdir is "\\". That is, the path returned refers to
  155. # the root of the current drive. That's a terrible place to
  156. # put temp files, and, depending on privileges, the user
  157. # may not even be able to open a file in the root directory.
  158. self.assertFalse(os.path.exists(name),
  159. "file already exists for temporary file")
  160. else:
  161. self.check_tempfile(name)
  162. # Test attributes on return values from os.*stat* family.
  163. class StatAttributeTests(unittest.TestCase):
  164. def setUp(self):
  165. os.mkdir(test_support.TESTFN)
  166. self.fname = os.path.join(test_support.TESTFN, "f1")
  167. f = open(self.fname, 'wb')
  168. f.write("ABC")
  169. f.close()
  170. def tearDown(self):
  171. os.unlink(self.fname)
  172. os.rmdir(test_support.TESTFN)
  173. @unittest.skipUnless(hasattr(os, 'stat'), 'test needs os.stat()')
  174. def test_stat_attributes(self):
  175. import stat
  176. result = os.stat(self.fname)
  177. # Make sure direct access works
  178. self.assertEqual(result[stat.ST_SIZE], 3)
  179. self.assertEqual(result.st_size, 3)
  180. # Make sure all the attributes are there
  181. members = dir(result)
  182. for name in dir(stat):
  183. if name[:3] == 'ST_':
  184. attr = name.lower()
  185. if name.endswith("TIME"):
  186. def trunc(x): return int(x)
  187. else:
  188. def trunc(x): return x
  189. self.assertEqual(trunc(getattr(result, attr)),
  190. result[getattr(stat, name)])
  191. self.assertIn(attr, members)
  192. try:
  193. result[200]
  194. self.fail("No exception raised")
  195. except IndexError:
  196. pass
  197. # Make sure that assignment fails
  198. try:
  199. result.st_mode = 1
  200. self.fail("No exception raised")
  201. except (AttributeError, TypeError):
  202. pass
  203. try:
  204. result.st_rdev = 1
  205. self.fail("No exception raised")
  206. except (AttributeError, TypeError):
  207. pass
  208. try:
  209. result.parrot = 1
  210. self.fail("No exception raised")
  211. except AttributeError:
  212. pass
  213. # Use the stat_result constructor with a too-short tuple.
  214. try:
  215. result2 = os.stat_result((10,))
  216. self.fail("No exception raised")
  217. except TypeError:
  218. pass
  219. # Use the constructor with a too-long tuple.
  220. try:
  221. result2 = os.stat_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
  222. except TypeError:
  223. pass
  224. @unittest.skipUnless(hasattr(os, 'statvfs'), 'test needs os.statvfs()')
  225. def test_statvfs_attributes(self):
  226. try:
  227. result = os.statvfs(self.fname)
  228. except OSError, e:
  229. # On AtheOS, glibc always returns ENOSYS
  230. if e.errno == errno.ENOSYS:
  231. self.skipTest('glibc always returns ENOSYS on AtheOS')
  232. # Make sure direct access works
  233. self.assertEqual(result.f_bfree, result[3])
  234. # Make sure all the attributes are there.
  235. members = ('bsize', 'frsize', 'blocks', 'bfree', 'bavail', 'files',
  236. 'ffree', 'favail', 'flag', 'namemax')
  237. for value, member in enumerate(members):
  238. self.assertEqual(getattr(result, 'f_' + member), result[value])
  239. # Make sure that assignment really fails
  240. try:
  241. result.f_bfree = 1
  242. self.fail("No exception raised")
  243. except TypeError:
  244. pass
  245. try:
  246. result.parrot = 1
  247. self.fail("No exception raised")
  248. except AttributeError:
  249. pass
  250. # Use the constructor with a too-short tuple.
  251. try:
  252. result2 = os.statvfs_result((10,))
  253. self.fail("No exception raised")
  254. except TypeError:
  255. pass
  256. # Use the constructor with a too-long tuple.
  257. try:
  258. result2 = os.statvfs_result((0,1,2,3,4,5,6,7,8,9,10,11,12,13,14))
  259. except TypeError:
  260. pass
  261. def test_utime_dir(self):
  262. delta = 1000000
  263. st = os.stat(test_support.TESTFN)
  264. # round to int, because some systems may support sub-second
  265. # time stamps in stat, but not in utime.
  266. os.utime(test_support.TESTFN, (st.st_atime, int(st.st_mtime-delta)))
  267. st2 = os.stat(test_support.TESTFN)
  268. self.assertEqual(st2.st_mtime, int(st.st_mtime-delta))
  269. # Restrict tests to Win32, since there is no guarantee other
  270. # systems support centiseconds
  271. def get_file_system(path):
  272. if sys.platform == 'win32':
  273. root = os.path.splitdrive(os.path.abspath(path))[0] + '\\'
  274. import ctypes
  275. kernel32 = ctypes.windll.kernel32
  276. buf = ctypes.create_string_buffer("", 100)
  277. if kernel32.GetVolumeInformationA(root, None, 0, None, None, None, buf, len(buf)):
  278. return buf.value
  279. @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
  280. @unittest.skipUnless(get_file_system(test_support.TESTFN) == "NTFS",
  281. "requires NTFS")
  282. def test_1565150(self):
  283. t1 = 1159195039.25
  284. os.utime(self.fname, (t1, t1))
  285. self.assertEqual(os.stat(self.fname).st_mtime, t1)
  286. @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
  287. @unittest.skipUnless(get_file_system(test_support.TESTFN) == "NTFS",
  288. "requires NTFS")
  289. def test_large_time(self):
  290. t1 = 5000000000 # some day in 2128
  291. os.utime(self.fname, (t1, t1))
  292. self.assertEqual(os.stat(self.fname).st_mtime, t1)
  293. @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
  294. def test_1686475(self):
  295. # Verify that an open file can be stat'ed
  296. try:
  297. os.stat(r"c:\pagefile.sys")
  298. except WindowsError, e:
  299. if e.errno == 2: # file does not exist; cannot run test
  300. self.skipTest(r'c:\pagefile.sys does not exist')
  301. self.fail("Could not stat pagefile.sys")
  302. from test import mapping_tests
  303. class EnvironTests(mapping_tests.BasicTestMappingProtocol):
  304. """check that os.environ object conform to mapping protocol"""
  305. type2test = None
  306. def _reference(self):
  307. return {"KEY1":"VALUE1", "KEY2":"VALUE2", "KEY3":"VALUE3"}
  308. def _empty_mapping(self):
  309. os.environ.clear()
  310. return os.environ
  311. def setUp(self):
  312. self.__save = dict(os.environ)
  313. os.environ.clear()
  314. def tearDown(self):
  315. os.environ.clear()
  316. os.environ.update(self.__save)
  317. # Bug 1110478
  318. def test_update2(self):
  319. if os.path.exists("/bin/sh"):
  320. os.environ.update(HELLO="World")
  321. with os.popen("/bin/sh -c 'echo $HELLO'") as popen:
  322. value = popen.read().strip()
  323. self.assertEqual(value, "World")
  324. # On FreeBSD < 7 and OS X < 10.6, unsetenv() doesn't return a value (issue
  325. # #13415).
  326. @unittest.skipIf(sys.platform.startswith(('freebsd', 'darwin')),
  327. "due to known OS bug: see issue #13415")
  328. def test_unset_error(self):
  329. if sys.platform == "win32":
  330. # an environment variable is limited to 32,767 characters
  331. key = 'x' * 50000
  332. self.assertRaises(ValueError, os.environ.__delitem__, key)
  333. else:
  334. # "=" is not allowed in a variable name
  335. key = 'key='
  336. self.assertRaises(OSError, os.environ.__delitem__, key)
  337. class WalkTests(unittest.TestCase):
  338. """Tests for os.walk()."""
  339. def test_traversal(self):
  340. import os
  341. from os.path import join
  342. # Build:
  343. # TESTFN/
  344. # TEST1/ a file kid and two directory kids
  345. # tmp1
  346. # SUB1/ a file kid and a directory kid
  347. # tmp2
  348. # SUB11/ no kids
  349. # SUB2/ a file kid and a dirsymlink kid
  350. # tmp3
  351. # link/ a symlink to TESTFN.2
  352. # TEST2/
  353. # tmp4 a lone file
  354. walk_path = join(test_support.TESTFN, "TEST1")
  355. sub1_path = join(walk_path, "SUB1")
  356. sub11_path = join(sub1_path, "SUB11")
  357. sub2_path = join(walk_path, "SUB2")
  358. tmp1_path = join(walk_path, "tmp1")
  359. tmp2_path = join(sub1_path, "tmp2")
  360. tmp3_path = join(sub2_path, "tmp3")
  361. link_path = join(sub2_path, "link")
  362. t2_path = join(test_support.TESTFN, "TEST2")
  363. tmp4_path = join(test_support.TESTFN, "TEST2", "tmp4")
  364. # Create stuff.
  365. os.makedirs(sub11_path)
  366. os.makedirs(sub2_path)
  367. os.makedirs(t2_path)
  368. for path in tmp1_path, tmp2_path, tmp3_path, tmp4_path:
  369. f = file(path, "w")
  370. f.write("I'm " + path + " and proud of it. Blame test_os.\n")
  371. f.close()
  372. if hasattr(os, "symlink"):
  373. os.symlink(os.path.abspath(t2_path), link_path)
  374. sub2_tree = (sub2_path, ["link"], ["tmp3"])
  375. else:
  376. sub2_tree = (sub2_path, [], ["tmp3"])
  377. # Walk top-down.
  378. all = list(os.walk(walk_path))
  379. self.assertEqual(len(all), 4)
  380. # We can't know which order SUB1 and SUB2 will appear in.
  381. # Not flipped: TESTFN, SUB1, SUB11, SUB2
  382. # flipped: TESTFN, SUB2, SUB1, SUB11
  383. flipped = all[0][1][0] != "SUB1"
  384. all[0][1].sort()
  385. self.assertEqual(all[0], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
  386. self.assertEqual(all[1 + flipped], (sub1_path, ["SUB11"], ["tmp2"]))
  387. self.assertEqual(all[2 + flipped], (sub11_path, [], []))
  388. self.assertEqual(all[3 - 2 * flipped], sub2_tree)
  389. # Prune the search.
  390. all = []
  391. for root, dirs, files in os.walk(walk_path):
  392. all.append((root, dirs, files))
  393. # Don't descend into SUB1.
  394. if 'SUB1' in dirs:
  395. # Note that this also mutates the dirs we appended to all!
  396. dirs.remove('SUB1')
  397. self.assertEqual(len(all), 2)
  398. self.assertEqual(all[0], (walk_path, ["SUB2"], ["tmp1"]))
  399. self.assertEqual(all[1], sub2_tree)
  400. # Walk bottom-up.
  401. all = list(os.walk(walk_path, topdown=False))
  402. self.assertEqual(len(all), 4)
  403. # We can't know which order SUB1 and SUB2 will appear in.
  404. # Not flipped: SUB11, SUB1, SUB2, TESTFN
  405. # flipped: SUB2, SUB11, SUB1, TESTFN
  406. flipped = all[3][1][0] != "SUB1"
  407. all[3][1].sort()
  408. self.assertEqual(all[3], (walk_path, ["SUB1", "SUB2"], ["tmp1"]))
  409. self.assertEqual(all[flipped], (sub11_path, [], []))
  410. self.assertEqual(all[flipped + 1], (sub1_path, ["SUB11"], ["tmp2"]))
  411. self.assertEqual(all[2 - 2 * flipped], sub2_tree)
  412. if hasattr(os, "symlink"):
  413. # Walk, following symlinks.
  414. for root, dirs, files in os.walk(walk_path, followlinks=True):
  415. if root == link_path:
  416. self.assertEqual(dirs, [])
  417. self.assertEqual(files, ["tmp4"])
  418. break
  419. else:
  420. self.fail("Didn't follow symlink with followlinks=True")
  421. def tearDown(self):
  422. # Tear everything down. This is a decent use for bottom-up on
  423. # Windows, which doesn't have a recursive delete command. The
  424. # (not so) subtlety is that rmdir will fail unless the dir's
  425. # kids are removed first, so bottom up is essential.
  426. for root, dirs, files in os.walk(test_support.TESTFN, topdown=False):
  427. for name in files:
  428. os.remove(os.path.join(root, name))
  429. for name in dirs:
  430. dirname = os.path.join(root, name)
  431. if not os.path.islink(dirname):
  432. os.rmdir(dirname)
  433. else:
  434. os.remove(dirname)
  435. os.rmdir(test_support.TESTFN)
  436. class MakedirTests (unittest.TestCase):
  437. def setUp(self):
  438. os.mkdir(test_support.TESTFN)
  439. def test_makedir(self):
  440. base = test_support.TESTFN
  441. path = os.path.join(base, 'dir1', 'dir2', 'dir3')
  442. os.makedirs(path) # Should work
  443. path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4')
  444. os.makedirs(path)
  445. # Try paths with a '.' in them
  446. self.assertRaises(OSError, os.makedirs, os.curdir)
  447. path = os.path.join(base, 'dir1', 'dir2', 'dir3', 'dir4', 'dir5', os.curdir)
  448. os.makedirs(path)
  449. path = os.path.join(base, 'dir1', os.curdir, 'dir2', 'dir3', 'dir4',
  450. 'dir5', 'dir6')
  451. os.makedirs(path)
  452. def tearDown(self):
  453. path = os.path.join(test_support.TESTFN, 'dir1', 'dir2', 'dir3',
  454. 'dir4', 'dir5', 'dir6')
  455. # If the tests failed, the bottom-most directory ('../dir6')
  456. # may not have been created, so we look for the outermost directory
  457. # that exists.
  458. while not os.path.exists(path) and path != test_support.TESTFN:
  459. path = os.path.dirname(path)
  460. os.removedirs(path)
  461. class DevNullTests (unittest.TestCase):
  462. def test_devnull(self):
  463. f = file(os.devnull, 'w')
  464. f.write('hello')
  465. f.close()
  466. f = file(os.devnull, 'r')
  467. self.assertEqual(f.read(), '')
  468. f.close()
  469. class URandomTests (unittest.TestCase):
  470. def test_urandom_length(self):
  471. self.assertEqual(len(os.urandom(0)), 0)
  472. self.assertEqual(len(os.urandom(1)), 1)
  473. self.assertEqual(len(os.urandom(10)), 10)
  474. self.assertEqual(len(os.urandom(100)), 100)
  475. self.assertEqual(len(os.urandom(1000)), 1000)
  476. def test_urandom_value(self):
  477. data1 = os.urandom(16)
  478. data2 = os.urandom(16)
  479. self.assertNotEqual(data1, data2)
  480. def get_urandom_subprocess(self, count):
  481. # We need to use repr() and eval() to avoid line ending conversions
  482. # under Windows.
  483. code = '\n'.join((
  484. 'import os, sys',
  485. 'data = os.urandom(%s)' % count,
  486. 'sys.stdout.write(repr(data))',
  487. 'sys.stdout.flush()',
  488. 'print >> sys.stderr, (len(data), data)'))
  489. cmd_line = [sys.executable, '-c', code]
  490. p = subprocess.Popen(cmd_line, stdin=subprocess.PIPE,
  491. stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  492. out, err = p.communicate()
  493. self.assertEqual(p.wait(), 0, (p.wait(), err))
  494. out = eval(out)
  495. self.assertEqual(len(out), count, err)
  496. return out
  497. def test_urandom_subprocess(self):
  498. data1 = self.get_urandom_subprocess(16)
  499. data2 = self.get_urandom_subprocess(16)
  500. self.assertNotEqual(data1, data2)
# True when the build configuration reports HAVE_GETENTROPY == 1; the class
# below is skipped in that case.
HAVE_GETENTROPY = (sysconfig.get_config_var('HAVE_GETENTROPY') == 1)

@unittest.skipIf(HAVE_GETENTROPY,
                 "getentropy() does not use a file descriptor")
class URandomFDTests(unittest.TestCase):
    """Tests for os.urandom() when no file descriptor can be opened."""

    @unittest.skipUnless(resource, "test requires the resource module")
    def test_urandom_failure(self):
        """os.urandom() raises OSError(EMFILE) under a 1-descriptor limit.

        The child script lowers the soft RLIMIT_NOFILE to 1, calls
        os.urandom(16) and asserts that the resulting OSError has errno
        EMFILE; assert_python_ok() runs it with '-c'.
        """
        # Check urandom() failing when it is not able to open /dev/random.
        # We spawn a new process to make the test more robust (if getrlimit()
        # failed to restore the file descriptor limit after this, the whole
        # test suite would crash; this actually happened on the OS X Tiger
        # buildbot).
        # NOTE(review): the indentation inside this string was reconstructed;
        # confirm against the upstream file.
        code = """if 1:
            import errno
            import os
            import resource
            soft_limit, hard_limit = resource.getrlimit(resource.RLIMIT_NOFILE)
            resource.setrlimit(resource.RLIMIT_NOFILE, (1, hard_limit))
            try:
                os.urandom(16)
            except OSError as e:
                assert e.errno == errno.EMFILE, e.errno
            else:
                raise AssertionError("OSError not raised")
            """
        assert_python_ok('-c', code)
  526. class ExecvpeTests(unittest.TestCase):
  527. def test_execvpe_with_bad_arglist(self):
  528. self.assertRaises(ValueError, os.execvpe, 'notepad', [], None)
  529. @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
  530. class Win32ErrorTests(unittest.TestCase):
  531. def test_rename(self):
  532. self.assertRaises(WindowsError, os.rename, test_support.TESTFN, test_support.TESTFN+".bak")
  533. def test_remove(self):
  534. self.assertRaises(WindowsError, os.remove, test_support.TESTFN)
  535. def test_chdir(self):
  536. self.assertRaises(WindowsError, os.chdir, test_support.TESTFN)
  537. def test_mkdir(self):
  538. f = open(test_support.TESTFN, "w")
  539. try:
  540. self.assertRaises(WindowsError, os.mkdir, test_support.TESTFN)
  541. finally:
  542. f.close()
  543. os.unlink(test_support.TESTFN)
  544. def test_utime(self):
  545. self.assertRaises(WindowsError, os.utime, test_support.TESTFN, None)
  546. def test_chmod(self):
  547. self.assertRaises(WindowsError, os.chmod, test_support.TESTFN, 0)
  548. class TestInvalidFD(unittest.TestCase):
  549. singles = ["fchdir", "fdopen", "dup", "fdatasync", "fstat",
  550. "fstatvfs", "fsync", "tcgetpgrp", "ttyname"]
  551. #singles.append("close")
  552. #We omit close because it doesn'r raise an exception on some platforms
  553. def get_single(f):
  554. def helper(self):
  555. if hasattr(os, f):
  556. self.check(getattr(os, f))
  557. return helper
  558. for f in singles:
  559. locals()["test_"+f] = get_single(f)
  560. def check(self, f, *args):
  561. try:
  562. f(test_support.make_bad_fd(), *args)
  563. except OSError as e:
  564. self.assertEqual(e.errno, errno.EBADF)
  565. else:
  566. self.fail("%r didn't raise a OSError with a bad file descriptor"
  567. % f)
  568. @unittest.skipUnless(hasattr(os, 'isatty'), 'test needs os.isatty()')
  569. def test_isatty(self):
  570. self.assertEqual(os.isatty(test_support.make_bad_fd()), False)
  571. @unittest.skipUnless(hasattr(os, 'closerange'), 'test needs os.closerange()')
  572. def test_closerange(self):
  573. fd = test_support.make_bad_fd()
  574. # Make sure none of the descriptors we are about to close are
  575. # currently valid (issue 6542).
  576. for i in range(10):
  577. try: os.fstat(fd+i)
  578. except OSError:
  579. pass
  580. else:
  581. break
  582. if i < 2:
  583. raise unittest.SkipTest(
  584. "Unable to acquire a range of invalid file descriptors")
  585. self.assertEqual(os.closerange(fd, fd + i-1), None)
  586. @unittest.skipUnless(hasattr(os, 'dup2'), 'test needs os.dup2()')
  587. def test_dup2(self):
  588. self.check(os.dup2, 20)
  589. @unittest.skipUnless(hasattr(os, 'fchmod'), 'test needs os.fchmod()')
  590. def test_fchmod(self):
  591. self.check(os.fchmod, 0)
  592. @unittest.skipUnless(hasattr(os, 'fchown'), 'test needs os.fchown()')
  593. def test_fchown(self):
  594. self.check(os.fchown, -1, -1)
  595. @unittest.skipUnless(hasattr(os, 'fpathconf'), 'test needs os.fpathconf()')
  596. def test_fpathconf(self):
  597. self.check(os.fpathconf, "PC_NAME_MAX")
  598. @unittest.skipUnless(hasattr(os, 'ftruncate'), 'test needs os.ftruncate()')
  599. def test_ftruncate(self):
  600. self.check(os.ftruncate, 0)
  601. @unittest.skipUnless(hasattr(os, 'lseek'), 'test needs os.lseek()')
  602. def test_lseek(self):
  603. self.check(os.lseek, 0, 0)
  604. @unittest.skipUnless(hasattr(os, 'read'), 'test needs os.read()')
  605. def test_read(self):
  606. self.check(os.read, 1)
  607. @unittest.skipUnless(hasattr(os, 'tcsetpgrp'), 'test needs os.tcsetpgrp()')
  608. def test_tcsetpgrpt(self):
  609. self.check(os.tcsetpgrp, 0)
  610. @unittest.skipUnless(hasattr(os, 'write'), 'test needs os.write()')
  611. def test_write(self):
  612. self.check(os.write, " ")
  613. @unittest.skipIf(sys.platform == "win32", "Posix specific tests")
  614. class PosixUidGidTests(unittest.TestCase):
  615. @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()')
  616. def test_setuid(self):
  617. if os.getuid() != 0:
  618. self.assertRaises(os.error, os.setuid, 0)
  619. self.assertRaises(OverflowError, os.setuid, 1<<32)
  620. @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()')
  621. def test_setgid(self):
  622. if os.getuid() != 0:
  623. self.assertRaises(os.error, os.setgid, 0)
  624. self.assertRaises(OverflowError, os.setgid, 1<<32)
  625. @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()')
  626. def test_seteuid(self):
  627. if os.getuid() != 0:
  628. self.assertRaises(os.error, os.seteuid, 0)
  629. self.assertRaises(OverflowError, os.seteuid, 1<<32)
  630. @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()')
  631. def test_setegid(self):
  632. if os.getuid() != 0:
  633. self.assertRaises(os.error, os.setegid, 0)
  634. self.assertRaises(OverflowError, os.setegid, 1<<32)
  635. @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
  636. def test_setreuid(self):
  637. if os.getuid() != 0:
  638. self.assertRaises(os.error, os.setreuid, 0, 0)
  639. self.assertRaises(OverflowError, os.setreuid, 1<<32, 0)
  640. self.assertRaises(OverflowError, os.setreuid, 0, 1<<32)
  641. @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()')
  642. def test_setreuid_neg1(self):
  643. # Needs to accept -1. We run this in a subprocess to avoid
  644. # altering the test runner's process state (issue8045).
  645. subprocess.check_call([
  646. sys.executable, '-c',
  647. 'import os,sys;os.setreuid(-1,-1);sys.exit(0)'])
  648. @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
  649. def test_setregid(self):
  650. if os.getuid() != 0:
  651. self.assertRaises(os.error, os.setregid, 0, 0)
  652. self.assertRaises(OverflowError, os.setregid, 1<<32, 0)
  653. self.assertRaises(OverflowError, os.setregid, 0, 1<<32)
  654. @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()')
  655. def test_setregid_neg1(self):
  656. # Needs to accept -1. We run this in a subprocess to avoid
  657. # altering the test runner's process state (issue8045).
  658. subprocess.check_call([
  659. sys.executable, '-c',
  660. 'import os,sys;os.setregid(-1,-1);sys.exit(0)'])
@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests")
class Win32KillTests(unittest.TestCase):
    """Tests for os.kill() on Windows, using child interpreter processes."""

    def _kill(self, sig):
        """Kill a ready child with *sig* and check its exit code equals it."""
        # Start sys.executable as a subprocess and communicate from the
        # subprocess to the parent that the interpreter is ready. When it
        # becomes ready, send *sig* via os.kill to the subprocess and check
        # that the return code is equal to *sig*.
        import ctypes
        from ctypes import wintypes
        import msvcrt

        # Since we can't access the contents of the process' stdout until the
        # process has exited, use PeekNamedPipe to see what's inside stdout
        # without waiting. This is done so we can tell that the interpreter
        # is started and running at a point where it could handle a signal.
        PeekNamedPipe = ctypes.windll.kernel32.PeekNamedPipe
        PeekNamedPipe.restype = wintypes.BOOL
        PeekNamedPipe.argtypes = (wintypes.HANDLE, # Pipe handle
                                  ctypes.POINTER(ctypes.c_char), # stdout buf
                                  wintypes.DWORD, # Buffer size
                                  ctypes.POINTER(wintypes.DWORD), # bytes read
                                  ctypes.POINTER(wintypes.DWORD), # bytes avail
                                  ctypes.POINTER(wintypes.DWORD)) # bytes left
        msg = "running"
        # The child writes msg to stdout, flushes, then blocks in input().
        proc = subprocess.Popen([sys.executable, "-c",
                                 "import sys;"
                                 "sys.stdout.write('{}');"
                                 "sys.stdout.flush();"
                                 "input()".format(msg)],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                stdin=subprocess.PIPE)
        self.addCleanup(proc.stdout.close)
        self.addCleanup(proc.stderr.close)
        self.addCleanup(proc.stdin.close)

        # Poll up to 100 times, 0.1s apart, while the child is still running.
        count, max = 0, 100
        while count < max and proc.poll() is None:
            # Create a string buffer to store the result of stdout from the pipe
            buf = ctypes.create_string_buffer(len(msg))
            # Obtain the text currently in proc.stdout
            # Bytes read/avail/left are left as NULL and unused
            rslt = PeekNamedPipe(msvcrt.get_osfhandle(proc.stdout.fileno()),
                                 buf, ctypes.sizeof(buf), None, None, None)
            self.assertNotEqual(rslt, 0, "PeekNamedPipe failed")
            if buf.value:
                self.assertEqual(msg, buf.value)
                break
            time.sleep(0.1)
            count += 1
        else:
            # Reached when the loop condition turns false without a break.
            self.fail("Did not receive communication from the subprocess")

        os.kill(proc.pid, sig)
        self.assertEqual(proc.wait(), sig)

    def test_kill_sigterm(self):
        # SIGTERM doesn't mean anything special, but make sure it works
        self._kill(signal.SIGTERM)

    def test_kill_int(self):
        # os.kill on Windows can take an int which gets set as the exit code
        self._kill(100)

    def _kill_with_event(self, event, name):
        """Send console *event* to a child running win_console_handler.py.

        *name* is only used in the failure message.
        """
        # The tag name is passed to the child on its command line; the loop
        # below waits for this byte to become '1' before sending the event.
        tagname = "test_os_%s" % uuid.uuid1()
        m = mmap.mmap(-1, 1, tagname)
        m[0] = '0'
        # Run a script which has console control handling enabled.
        proc = subprocess.Popen([sys.executable,
                   os.path.join(os.path.dirname(__file__),
                                "win_console_handler.py"), tagname],
                   creationflags=subprocess.CREATE_NEW_PROCESS_GROUP)
        # Let the interpreter startup before we send signals. See #3137.
        count, max = 0, 20
        while count < max and proc.poll() is None:
            if m[0] == '1':
                break
            time.sleep(0.5)
            count += 1
        else:
            self.fail("Subprocess didn't finish initialization")
        os.kill(proc.pid, event)
        # proc.send_signal(event) could also be done here.
        # Allow time for the signal to be passed and the process to exit.
        time.sleep(0.5)
        # NOTE(review): `not proc.poll()` is also true for an exit code of 0,
        # not only for a still-running child -- confirm that is intended.
        if not proc.poll():
            # Forcefully kill the process if we weren't able to signal it.
            os.kill(proc.pid, signal.SIGINT)
            self.fail("subprocess did not stop on {}".format(name))

    @unittest.skip("subprocesses aren't inheriting Ctrl+C property")
    def test_CTRL_C_EVENT(self):
        from ctypes import wintypes
        import ctypes

        # Make a NULL value by creating a pointer with no argument.
        NULL = ctypes.POINTER(ctypes.c_int)()
        SetConsoleCtrlHandler = ctypes.windll.kernel32.SetConsoleCtrlHandler
        SetConsoleCtrlHandler.argtypes = (ctypes.POINTER(ctypes.c_int),
                                          wintypes.BOOL)
        SetConsoleCtrlHandler.restype = wintypes.BOOL

        # Calling this with NULL and FALSE causes the calling process to
        # handle Ctrl+C, rather than ignore it. This property is inherited
        # by subprocesses.
        SetConsoleCtrlHandler(NULL, 0)

        self._kill_with_event(signal.CTRL_C_EVENT, "CTRL_C_EVENT")

    def test_CTRL_BREAK_EVENT(self):
        self._kill_with_event(signal.CTRL_BREAK_EVENT, "CTRL_BREAK_EVENT")
  762. def test_main():
  763. test_support.run_unittest(
  764. FileTests,
  765. TemporaryFileTests,
  766. StatAttributeTests,
  767. EnvironTests,
  768. WalkTests,
  769. MakedirTests,
  770. DevNullTests,
  771. URandomTests,
  772. URandomFDTests,
  773. ExecvpeTests,
  774. Win32ErrorTests,
  775. TestInvalidFD,
  776. PosixUidGidTests,
  777. Win32KillTests
  778. )
  779. if __name__ == "__main__":
  780. test_main()