test_tempfile.py 35 KB


  1. # tempfile.py unit tests.
  2. import tempfile
  3. import errno
  4. import io
  5. import os
  6. import signal
  7. import shutil
  8. import sys
  9. import re
  10. import warnings
  11. import contextlib
  12. import unittest
  13. from test import test_support as support
  14. warnings.filterwarnings("ignore",
  15. category=RuntimeWarning,
  16. message="mktemp", module=__name__)
  17. if hasattr(os, 'stat'):
  18. import stat
  19. has_stat = 1
  20. else:
  21. has_stat = 0
  22. has_textmode = (tempfile._text_openflags != tempfile._bin_openflags)
  23. has_spawnl = hasattr(os, 'spawnl')
  24. # TEST_FILES may need to be tweaked for systems depending on the maximum
  25. # number of files that can be opened at one time (see ulimit -n)
  26. if sys.platform in ('openbsd3', 'openbsd4'):
  27. TEST_FILES = 48
  28. else:
  29. TEST_FILES = 100
  30. # This is organized as one test for each chunk of code in tempfile.py,
  31. # in order of their appearance in the file. Testing which requires
  32. # threads is not done here.
  33. # Common functionality.
  34. class TC(unittest.TestCase):
  35. str_check = re.compile(r"[a-zA-Z0-9_-]{6}$")
  36. def failOnException(self, what, ei=None):
  37. if ei is None:
  38. ei = sys.exc_info()
  39. self.fail("%s raised %s: %s" % (what, ei[0], ei[1]))
  40. def nameCheck(self, name, dir, pre, suf):
  41. (ndir, nbase) = os.path.split(name)
  42. npre = nbase[:len(pre)]
  43. nsuf = nbase[len(nbase)-len(suf):]
  44. # check for equality of the absolute paths!
  45. self.assertEqual(os.path.abspath(ndir), os.path.abspath(dir),
  46. "file '%s' not in directory '%s'" % (name, dir))
  47. self.assertEqual(npre, pre,
  48. "file '%s' does not begin with '%s'" % (nbase, pre))
  49. self.assertEqual(nsuf, suf,
  50. "file '%s' does not end with '%s'" % (nbase, suf))
  51. nbase = nbase[len(pre):len(nbase)-len(suf)]
  52. self.assertTrue(self.str_check.match(nbase),
  53. "random string '%s' does not match /^[a-zA-Z0-9_-]{6}$/"
  54. % nbase)
  55. test_classes = []
  56. class test_exports(TC):
  57. def test_exports(self):
  58. # There are no surprising symbols in the tempfile module
  59. dict = tempfile.__dict__
  60. expected = {
  61. "NamedTemporaryFile" : 1,
  62. "TemporaryFile" : 1,
  63. "mkstemp" : 1,
  64. "mkdtemp" : 1,
  65. "mktemp" : 1,
  66. "TMP_MAX" : 1,
  67. "gettempprefix" : 1,
  68. "gettempdir" : 1,
  69. "tempdir" : 1,
  70. "template" : 1,
  71. "SpooledTemporaryFile" : 1
  72. }
  73. unexp = []
  74. for key in dict:
  75. if key[0] != '_' and key not in expected:
  76. unexp.append(key)
  77. self.assertTrue(len(unexp) == 0,
  78. "unexpected keys: %s" % unexp)
  79. test_classes.append(test_exports)
  80. class test__RandomNameSequence(TC):
  81. """Test the internal iterator object _RandomNameSequence."""
  82. def setUp(self):
  83. self.r = tempfile._RandomNameSequence()
  84. def test_get_six_char_str(self):
  85. # _RandomNameSequence returns a six-character string
  86. s = self.r.next()
  87. self.nameCheck(s, '', '', '')
  88. def test_many(self):
  89. # _RandomNameSequence returns no duplicate strings (stochastic)
  90. dict = {}
  91. r = self.r
  92. for i in xrange(TEST_FILES):
  93. s = r.next()
  94. self.nameCheck(s, '', '', '')
  95. self.assertNotIn(s, dict)
  96. dict[s] = 1
  97. def test_supports_iter(self):
  98. # _RandomNameSequence supports the iterator protocol
  99. i = 0
  100. r = self.r
  101. try:
  102. for s in r:
  103. i += 1
  104. if i == 20:
  105. break
  106. except:
  107. self.failOnException("iteration")
  108. @unittest.skipUnless(hasattr(os, 'fork'),
  109. "os.fork is required for this test")
  110. def test_process_awareness(self):
  111. # ensure that the random source differs between
  112. # child and parent.
  113. read_fd, write_fd = os.pipe()
  114. pid = None
  115. try:
  116. pid = os.fork()
  117. if not pid:
  118. os.close(read_fd)
  119. os.write(write_fd, next(self.r).encode("ascii"))
  120. os.close(write_fd)
  121. # bypass the normal exit handlers- leave those to
  122. # the parent.
  123. os._exit(0)
  124. parent_value = next(self.r)
  125. child_value = os.read(read_fd, len(parent_value)).decode("ascii")
  126. finally:
  127. if pid:
  128. # best effort to ensure the process can't bleed out
  129. # via any bugs above
  130. try:
  131. os.kill(pid, signal.SIGKILL)
  132. except EnvironmentError:
  133. pass
  134. os.close(read_fd)
  135. os.close(write_fd)
  136. self.assertNotEqual(child_value, parent_value)
  137. test_classes.append(test__RandomNameSequence)
  138. class test__candidate_tempdir_list(TC):
  139. """Test the internal function _candidate_tempdir_list."""
  140. def test_nonempty_list(self):
  141. # _candidate_tempdir_list returns a nonempty list of strings
  142. cand = tempfile._candidate_tempdir_list()
  143. self.assertFalse(len(cand) == 0)
  144. for c in cand:
  145. self.assertIsInstance(c, basestring)
  146. def test_wanted_dirs(self):
  147. # _candidate_tempdir_list contains the expected directories
  148. # Make sure the interesting environment variables are all set.
  149. with support.EnvironmentVarGuard() as env:
  150. for envname in 'TMPDIR', 'TEMP', 'TMP':
  151. dirname = os.getenv(envname)
  152. if not dirname:
  153. env[envname] = os.path.abspath(envname)
  154. cand = tempfile._candidate_tempdir_list()
  155. for envname in 'TMPDIR', 'TEMP', 'TMP':
  156. dirname = os.getenv(envname)
  157. if not dirname: raise ValueError
  158. self.assertIn(dirname, cand)
  159. try:
  160. dirname = os.getcwd()
  161. except (AttributeError, os.error):
  162. dirname = os.curdir
  163. self.assertIn(dirname, cand)
  164. # Not practical to try to verify the presence of OS-specific
  165. # paths in this list.
  166. test_classes.append(test__candidate_tempdir_list)
  167. # We test _get_default_tempdir some more by testing gettempdir.
  168. class TestGetDefaultTempdir(TC):
  169. """Test _get_default_tempdir()."""
  170. def test_no_files_left_behind(self):
  171. # use a private empty directory
  172. our_temp_directory = tempfile.mkdtemp()
  173. try:
  174. # force _get_default_tempdir() to consider our empty directory
  175. def our_candidate_list():
  176. return [our_temp_directory]
  177. with support.swap_attr(tempfile, "_candidate_tempdir_list",
  178. our_candidate_list):
  179. # verify our directory is empty after _get_default_tempdir()
  180. tempfile._get_default_tempdir()
  181. self.assertEqual(os.listdir(our_temp_directory), [])
  182. def raise_OSError(*args, **kwargs):
  183. raise OSError(-1)
  184. with support.swap_attr(io, "open", raise_OSError):
  185. # test again with failing io.open()
  186. with self.assertRaises(IOError) as cm:
  187. tempfile._get_default_tempdir()
  188. self.assertEqual(cm.exception.errno, errno.ENOENT)
  189. self.assertEqual(os.listdir(our_temp_directory), [])
  190. open = io.open
  191. def bad_writer(*args, **kwargs):
  192. fp = open(*args, **kwargs)
  193. fp.write = raise_OSError
  194. return fp
  195. with support.swap_attr(io, "open", bad_writer):
  196. # test again with failing write()
  197. with self.assertRaises(IOError) as cm:
  198. tempfile._get_default_tempdir()
  199. self.assertEqual(cm.exception.errno, errno.ENOENT)
  200. self.assertEqual(os.listdir(our_temp_directory), [])
  201. finally:
  202. shutil.rmtree(our_temp_directory)
  203. test_classes.append(TestGetDefaultTempdir)
  204. class test__get_candidate_names(TC):
  205. """Test the internal function _get_candidate_names."""
  206. def test_retval(self):
  207. # _get_candidate_names returns a _RandomNameSequence object
  208. obj = tempfile._get_candidate_names()
  209. self.assertIsInstance(obj, tempfile._RandomNameSequence)
  210. def test_same_thing(self):
  211. # _get_candidate_names always returns the same object
  212. a = tempfile._get_candidate_names()
  213. b = tempfile._get_candidate_names()
  214. self.assertTrue(a is b)
  215. test_classes.append(test__get_candidate_names)
  216. @contextlib.contextmanager
  217. def _inside_empty_temp_dir():
  218. dir = tempfile.mkdtemp()
  219. try:
  220. with support.swap_attr(tempfile, 'tempdir', dir):
  221. yield
  222. finally:
  223. support.rmtree(dir)
  224. def _mock_candidate_names(*names):
  225. return support.swap_attr(tempfile,
  226. '_get_candidate_names',
  227. lambda: iter(names))
  228. class TestBadTempdir:
  229. def test_read_only_directory(self):
  230. with _inside_empty_temp_dir():
  231. oldmode = mode = os.stat(tempfile.tempdir).st_mode
  232. mode &= ~(stat.S_IWUSR | stat.S_IWGRP | stat.S_IWOTH)
  233. os.chmod(tempfile.tempdir, mode)
  234. try:
  235. if os.access(tempfile.tempdir, os.W_OK):
  236. self.skipTest("can't set the directory read-only")
  237. with self.assertRaises(OSError) as cm:
  238. self.make_temp()
  239. self.assertIn(cm.exception.errno, (errno.EPERM, errno.EACCES))
  240. self.assertEqual(os.listdir(tempfile.tempdir), [])
  241. finally:
  242. os.chmod(tempfile.tempdir, oldmode)
  243. def test_nonexisting_directory(self):
  244. with _inside_empty_temp_dir():
  245. tempdir = os.path.join(tempfile.tempdir, 'nonexistent')
  246. with support.swap_attr(tempfile, 'tempdir', tempdir):
  247. with self.assertRaises(OSError) as cm:
  248. self.make_temp()
  249. self.assertEqual(cm.exception.errno, errno.ENOENT)
  250. def test_non_directory(self):
  251. with _inside_empty_temp_dir():
  252. tempdir = os.path.join(tempfile.tempdir, 'file')
  253. open(tempdir, 'wb').close()
  254. with support.swap_attr(tempfile, 'tempdir', tempdir):
  255. with self.assertRaises(OSError) as cm:
  256. self.make_temp()
  257. self.assertIn(cm.exception.errno, (errno.ENOTDIR, errno.ENOENT))
  258. class test__mkstemp_inner(TestBadTempdir, TC):
  259. """Test the internal function _mkstemp_inner."""
  260. class mkstemped:
  261. _bflags = tempfile._bin_openflags
  262. _tflags = tempfile._text_openflags
  263. _close = os.close
  264. _unlink = os.unlink
  265. def __init__(self, dir, pre, suf, bin):
  266. if bin: flags = self._bflags
  267. else: flags = self._tflags
  268. (self.fd, self.name) = tempfile._mkstemp_inner(dir, pre, suf, flags)
  269. def write(self, str):
  270. os.write(self.fd, str)
  271. def __del__(self):
  272. self._close(self.fd)
  273. self._unlink(self.name)
  274. def do_create(self, dir=None, pre="", suf="", bin=1):
  275. if dir is None:
  276. dir = tempfile.gettempdir()
  277. try:
  278. file = self.mkstemped(dir, pre, suf, bin)
  279. except:
  280. self.failOnException("_mkstemp_inner")
  281. self.nameCheck(file.name, dir, pre, suf)
  282. return file
  283. def test_basic(self):
  284. # _mkstemp_inner can create files
  285. self.do_create().write("blat")
  286. self.do_create(pre="a").write("blat")
  287. self.do_create(suf="b").write("blat")
  288. self.do_create(pre="a", suf="b").write("blat")
  289. self.do_create(pre="aa", suf=".txt").write("blat")
  290. def test_basic_many(self):
  291. # _mkstemp_inner can create many files (stochastic)
  292. extant = range(TEST_FILES)
  293. for i in extant:
  294. extant[i] = self.do_create(pre="aa")
  295. def test_choose_directory(self):
  296. # _mkstemp_inner can create files in a user-selected directory
  297. dir = tempfile.mkdtemp()
  298. try:
  299. self.do_create(dir=dir).write("blat")
  300. finally:
  301. os.rmdir(dir)
  302. @unittest.skipUnless(has_stat, 'os.stat not available')
  303. def test_file_mode(self):
  304. # _mkstemp_inner creates files with the proper mode
  305. file = self.do_create()
  306. mode = stat.S_IMODE(os.stat(file.name).st_mode)
  307. expected = 0600
  308. if sys.platform in ('win32', 'os2emx'):
  309. # There's no distinction among 'user', 'group' and 'world';
  310. # replicate the 'user' bits.
  311. user = expected >> 6
  312. expected = user * (1 + 8 + 64)
  313. self.assertEqual(mode, expected)
  314. @unittest.skipUnless(has_spawnl, 'os.spawnl not available')
  315. def test_noinherit(self):
  316. # _mkstemp_inner file handles are not inherited by child processes
  317. if support.verbose:
  318. v="v"
  319. else:
  320. v="q"
  321. file = self.do_create()
  322. fd = "%d" % file.fd
  323. try:
  324. me = __file__
  325. except NameError:
  326. me = sys.argv[0]
  327. # We have to exec something, so that FD_CLOEXEC will take
  328. # effect. The core of this test is therefore in
  329. # tf_inherit_check.py, which see.
  330. tester = os.path.join(os.path.dirname(os.path.abspath(me)),
  331. "tf_inherit_check.py")
  332. # On Windows a spawn* /path/ with embedded spaces shouldn't be quoted,
  333. # but an arg with embedded spaces should be decorated with double
  334. # quotes on each end
  335. if sys.platform in ('win32',):
  336. decorated = '"%s"' % sys.executable
  337. tester = '"%s"' % tester
  338. else:
  339. decorated = sys.executable
  340. retval = os.spawnl(os.P_WAIT, sys.executable, decorated, tester, v, fd)
  341. self.assertFalse(retval < 0,
  342. "child process caught fatal signal %d" % -retval)
  343. self.assertFalse(retval > 0, "child process reports failure %d"%retval)
  344. @unittest.skipUnless(has_textmode, "text mode not available")
  345. def test_textmode(self):
  346. # _mkstemp_inner can create files in text mode
  347. self.do_create(bin=0).write("blat\n")
  348. # XXX should test that the file really is a text file
  349. def make_temp(self):
  350. return tempfile._mkstemp_inner(tempfile.gettempdir(),
  351. tempfile.template,
  352. '',
  353. tempfile._bin_openflags)
  354. def test_collision_with_existing_file(self):
  355. # _mkstemp_inner tries another name when a file with
  356. # the chosen name already exists
  357. with _inside_empty_temp_dir(), \
  358. _mock_candidate_names('aaa', 'aaa', 'bbb'):
  359. (fd1, name1) = self.make_temp()
  360. os.close(fd1)
  361. self.assertTrue(name1.endswith('aaa'))
  362. (fd2, name2) = self.make_temp()
  363. os.close(fd2)
  364. self.assertTrue(name2.endswith('bbb'))
  365. def test_collision_with_existing_directory(self):
  366. # _mkstemp_inner tries another name when a directory with
  367. # the chosen name already exists
  368. with _inside_empty_temp_dir(), \
  369. _mock_candidate_names('aaa', 'aaa', 'bbb'):
  370. dir = tempfile.mkdtemp()
  371. self.assertTrue(dir.endswith('aaa'))
  372. (fd, name) = self.make_temp()
  373. os.close(fd)
  374. self.assertTrue(name.endswith('bbb'))
  375. test_classes.append(test__mkstemp_inner)
  376. class test_gettempprefix(TC):
  377. """Test gettempprefix()."""
  378. def test_sane_template(self):
  379. # gettempprefix returns a nonempty prefix string
  380. p = tempfile.gettempprefix()
  381. self.assertIsInstance(p, basestring)
  382. self.assertTrue(len(p) > 0)
  383. def test_usable_template(self):
  384. # gettempprefix returns a usable prefix string
  385. # Create a temp directory, avoiding use of the prefix.
  386. # Then attempt to create a file whose name is
  387. # prefix + 'xxxxxx.xxx' in that directory.
  388. p = tempfile.gettempprefix() + "xxxxxx.xxx"
  389. d = tempfile.mkdtemp(prefix="")
  390. try:
  391. p = os.path.join(d, p)
  392. try:
  393. fd = os.open(p, os.O_RDWR | os.O_CREAT)
  394. except:
  395. self.failOnException("os.open")
  396. os.close(fd)
  397. os.unlink(p)
  398. finally:
  399. os.rmdir(d)
  400. test_classes.append(test_gettempprefix)
  401. class test_gettempdir(TC):
  402. """Test gettempdir()."""
  403. def test_directory_exists(self):
  404. # gettempdir returns a directory which exists
  405. dir = tempfile.gettempdir()
  406. self.assertTrue(os.path.isabs(dir) or dir == os.curdir,
  407. "%s is not an absolute path" % dir)
  408. self.assertTrue(os.path.isdir(dir),
  409. "%s is not a directory" % dir)
  410. def test_directory_writable(self):
  411. # gettempdir returns a directory writable by the user
  412. # sneaky: just instantiate a NamedTemporaryFile, which
  413. # defaults to writing into the directory returned by
  414. # gettempdir.
  415. try:
  416. file = tempfile.NamedTemporaryFile()
  417. file.write("blat")
  418. file.close()
  419. except:
  420. self.failOnException("create file in %s" % tempfile.gettempdir())
  421. def test_same_thing(self):
  422. # gettempdir always returns the same object
  423. a = tempfile.gettempdir()
  424. b = tempfile.gettempdir()
  425. self.assertTrue(a is b)
  426. test_classes.append(test_gettempdir)
  427. class test_mkstemp(TC):
  428. """Test mkstemp()."""
  429. def do_create(self, dir=None, pre="", suf=""):
  430. if dir is None:
  431. dir = tempfile.gettempdir()
  432. try:
  433. (fd, name) = tempfile.mkstemp(dir=dir, prefix=pre, suffix=suf)
  434. (ndir, nbase) = os.path.split(name)
  435. adir = os.path.abspath(dir)
  436. self.assertEqual(adir, ndir,
  437. "Directory '%s' incorrectly returned as '%s'" % (adir, ndir))
  438. except:
  439. self.failOnException("mkstemp")
  440. try:
  441. self.nameCheck(name, dir, pre, suf)
  442. finally:
  443. os.close(fd)
  444. os.unlink(name)
  445. def test_basic(self):
  446. # mkstemp can create files
  447. self.do_create()
  448. self.do_create(pre="a")
  449. self.do_create(suf="b")
  450. self.do_create(pre="a", suf="b")
  451. self.do_create(pre="aa", suf=".txt")
  452. self.do_create(dir=".")
  453. def test_choose_directory(self):
  454. # mkstemp can create directories in a user-selected directory
  455. dir = tempfile.mkdtemp()
  456. try:
  457. self.do_create(dir=dir)
  458. finally:
  459. os.rmdir(dir)
  460. test_classes.append(test_mkstemp)
  461. class test_mkdtemp(TestBadTempdir, TC):
  462. """Test mkdtemp()."""
  463. def make_temp(self):
  464. return tempfile.mkdtemp()
  465. def do_create(self, dir=None, pre="", suf=""):
  466. if dir is None:
  467. dir = tempfile.gettempdir()
  468. try:
  469. name = tempfile.mkdtemp(dir=dir, prefix=pre, suffix=suf)
  470. except:
  471. self.failOnException("mkdtemp")
  472. try:
  473. self.nameCheck(name, dir, pre, suf)
  474. return name
  475. except:
  476. os.rmdir(name)
  477. raise
  478. def test_basic(self):
  479. # mkdtemp can create directories
  480. os.rmdir(self.do_create())
  481. os.rmdir(self.do_create(pre="a"))
  482. os.rmdir(self.do_create(suf="b"))
  483. os.rmdir(self.do_create(pre="a", suf="b"))
  484. os.rmdir(self.do_create(pre="aa", suf=".txt"))
  485. def test_basic_many(self):
  486. # mkdtemp can create many directories (stochastic)
  487. extant = range(TEST_FILES)
  488. try:
  489. for i in extant:
  490. extant[i] = self.do_create(pre="aa")
  491. finally:
  492. for i in extant:
  493. if(isinstance(i, basestring)):
  494. os.rmdir(i)
  495. def test_choose_directory(self):
  496. # mkdtemp can create directories in a user-selected directory
  497. dir = tempfile.mkdtemp()
  498. try:
  499. os.rmdir(self.do_create(dir=dir))
  500. finally:
  501. os.rmdir(dir)
  502. @unittest.skipUnless(has_stat, 'os.stat not available')
  503. def test_mode(self):
  504. # mkdtemp creates directories with the proper mode
  505. dir = self.do_create()
  506. try:
  507. mode = stat.S_IMODE(os.stat(dir).st_mode)
  508. mode &= 0777 # Mask off sticky bits inherited from /tmp
  509. expected = 0700
  510. if sys.platform in ('win32', 'os2emx'):
  511. # There's no distinction among 'user', 'group' and 'world';
  512. # replicate the 'user' bits.
  513. user = expected >> 6
  514. expected = user * (1 + 8 + 64)
  515. self.assertEqual(mode, expected)
  516. finally:
  517. os.rmdir(dir)
  518. def test_collision_with_existing_file(self):
  519. # mkdtemp tries another name when a file with
  520. # the chosen name already exists
  521. with _inside_empty_temp_dir(), \
  522. _mock_candidate_names('aaa', 'aaa', 'bbb'):
  523. file = tempfile.NamedTemporaryFile(delete=False)
  524. file.close()
  525. self.assertTrue(file.name.endswith('aaa'))
  526. dir = tempfile.mkdtemp()
  527. self.assertTrue(dir.endswith('bbb'))
  528. def test_collision_with_existing_directory(self):
  529. # mkdtemp tries another name when a directory with
  530. # the chosen name already exists
  531. with _inside_empty_temp_dir(), \
  532. _mock_candidate_names('aaa', 'aaa', 'bbb'):
  533. dir1 = tempfile.mkdtemp()
  534. self.assertTrue(dir1.endswith('aaa'))
  535. dir2 = tempfile.mkdtemp()
  536. self.assertTrue(dir2.endswith('bbb'))
  537. test_classes.append(test_mkdtemp)
  538. class test_mktemp(TC):
  539. """Test mktemp()."""
  540. # For safety, all use of mktemp must occur in a private directory.
  541. # We must also suppress the RuntimeWarning it generates.
  542. def setUp(self):
  543. self.dir = tempfile.mkdtemp()
  544. def tearDown(self):
  545. if self.dir:
  546. os.rmdir(self.dir)
  547. self.dir = None
  548. class mktemped:
  549. _unlink = os.unlink
  550. _bflags = tempfile._bin_openflags
  551. def __init__(self, dir, pre, suf):
  552. self.name = tempfile.mktemp(dir=dir, prefix=pre, suffix=suf)
  553. # Create the file. This will raise an exception if it's
  554. # mysteriously appeared in the meanwhile.
  555. os.close(os.open(self.name, self._bflags, 0600))
  556. def __del__(self):
  557. self._unlink(self.name)
  558. def do_create(self, pre="", suf=""):
  559. try:
  560. file = self.mktemped(self.dir, pre, suf)
  561. except:
  562. self.failOnException("mktemp")
  563. self.nameCheck(file.name, self.dir, pre, suf)
  564. return file
  565. def test_basic(self):
  566. # mktemp can choose usable file names
  567. self.do_create()
  568. self.do_create(pre="a")
  569. self.do_create(suf="b")
  570. self.do_create(pre="a", suf="b")
  571. self.do_create(pre="aa", suf=".txt")
  572. def test_many(self):
  573. # mktemp can choose many usable file names (stochastic)
  574. extant = range(TEST_FILES)
  575. for i in extant:
  576. extant[i] = self.do_create(pre="aa")
  577. ## def test_warning(self):
  578. ## # mktemp issues a warning when used
  579. ## warnings.filterwarnings("error",
  580. ## category=RuntimeWarning,
  581. ## message="mktemp")
  582. ## self.assertRaises(RuntimeWarning,
  583. ## tempfile.mktemp, dir=self.dir)
  584. test_classes.append(test_mktemp)
  585. # We test _TemporaryFileWrapper by testing NamedTemporaryFile.
  586. class test_NamedTemporaryFile(TC):
  587. """Test NamedTemporaryFile()."""
  588. def do_create(self, dir=None, pre="", suf="", delete=True):
  589. if dir is None:
  590. dir = tempfile.gettempdir()
  591. try:
  592. file = tempfile.NamedTemporaryFile(dir=dir, prefix=pre, suffix=suf,
  593. delete=delete)
  594. except:
  595. self.failOnException("NamedTemporaryFile")
  596. self.nameCheck(file.name, dir, pre, suf)
  597. return file
  598. def test_basic(self):
  599. # NamedTemporaryFile can create files
  600. self.do_create()
  601. self.do_create(pre="a")
  602. self.do_create(suf="b")
  603. self.do_create(pre="a", suf="b")
  604. self.do_create(pre="aa", suf=".txt")
  605. def test_creates_named(self):
  606. # NamedTemporaryFile creates files with names
  607. f = tempfile.NamedTemporaryFile()
  608. self.assertTrue(os.path.exists(f.name),
  609. "NamedTemporaryFile %s does not exist" % f.name)
  610. def test_del_on_close(self):
  611. # A NamedTemporaryFile is deleted when closed
  612. dir = tempfile.mkdtemp()
  613. try:
  614. f = tempfile.NamedTemporaryFile(dir=dir)
  615. f.write('blat')
  616. f.close()
  617. self.assertFalse(os.path.exists(f.name),
  618. "NamedTemporaryFile %s exists after close" % f.name)
  619. finally:
  620. os.rmdir(dir)
  621. def test_dis_del_on_close(self):
  622. # Tests that delete-on-close can be disabled
  623. dir = tempfile.mkdtemp()
  624. tmp = None
  625. try:
  626. f = tempfile.NamedTemporaryFile(dir=dir, delete=False)
  627. tmp = f.name
  628. f.write('blat')
  629. f.close()
  630. self.assertTrue(os.path.exists(f.name),
  631. "NamedTemporaryFile %s missing after close" % f.name)
  632. finally:
  633. if tmp is not None:
  634. os.unlink(tmp)
  635. os.rmdir(dir)
  636. def test_multiple_close(self):
  637. # A NamedTemporaryFile can be closed many times without error
  638. f = tempfile.NamedTemporaryFile()
  639. f.write('abc\n')
  640. f.close()
  641. try:
  642. f.close()
  643. f.close()
  644. except:
  645. self.failOnException("close")
  646. def test_context_manager(self):
  647. # A NamedTemporaryFile can be used as a context manager
  648. with tempfile.NamedTemporaryFile() as f:
  649. self.assertTrue(os.path.exists(f.name))
  650. self.assertFalse(os.path.exists(f.name))
  651. def use_closed():
  652. with f:
  653. pass
  654. self.assertRaises(ValueError, use_closed)
  655. def test_no_leak_fd(self):
  656. # Issue #21058: don't leak file descriptor when fdopen() fails
  657. old_close = os.close
  658. old_fdopen = os.fdopen
  659. closed = []
  660. def close(fd):
  661. closed.append(fd)
  662. def fdopen(*args):
  663. raise ValueError()
  664. os.close = close
  665. os.fdopen = fdopen
  666. try:
  667. self.assertRaises(ValueError, tempfile.NamedTemporaryFile)
  668. self.assertEqual(len(closed), 1)
  669. finally:
  670. os.close = old_close
  671. os.fdopen = old_fdopen
  672. def test_bad_mode(self):
  673. dir = tempfile.mkdtemp()
  674. self.addCleanup(support.rmtree, dir)
  675. with self.assertRaises(TypeError):
  676. tempfile.NamedTemporaryFile(mode=(), dir=dir)
  677. self.assertEqual(os.listdir(dir), [])
  678. # How to test the mode and bufsize parameters?
  679. test_classes.append(test_NamedTemporaryFile)
  680. class test_SpooledTemporaryFile(TC):
  681. """Test SpooledTemporaryFile()."""
  682. def do_create(self, max_size=0, dir=None, pre="", suf=""):
  683. if dir is None:
  684. dir = tempfile.gettempdir()
  685. try:
  686. file = tempfile.SpooledTemporaryFile(max_size=max_size, dir=dir, prefix=pre, suffix=suf)
  687. except:
  688. self.failOnException("SpooledTemporaryFile")
  689. return file
  690. def test_basic(self):
  691. # SpooledTemporaryFile can create files
  692. f = self.do_create()
  693. self.assertFalse(f._rolled)
  694. f = self.do_create(max_size=100, pre="a", suf=".txt")
  695. self.assertFalse(f._rolled)
  696. def test_del_on_close(self):
  697. # A SpooledTemporaryFile is deleted when closed
  698. dir = tempfile.mkdtemp()
  699. try:
  700. f = tempfile.SpooledTemporaryFile(max_size=10, dir=dir)
  701. self.assertFalse(f._rolled)
  702. f.write('blat ' * 5)
  703. self.assertTrue(f._rolled)
  704. filename = f.name
  705. f.close()
  706. self.assertFalse(os.path.exists(filename),
  707. "SpooledTemporaryFile %s exists after close" % filename)
  708. finally:
  709. os.rmdir(dir)
  710. def test_rewrite_small(self):
  711. # A SpooledTemporaryFile can be written to multiple within the max_size
  712. f = self.do_create(max_size=30)
  713. self.assertFalse(f._rolled)
  714. for i in range(5):
  715. f.seek(0, 0)
  716. f.write('x' * 20)
  717. self.assertFalse(f._rolled)
  718. def test_write_sequential(self):
  719. # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
  720. # over afterward
  721. f = self.do_create(max_size=30)
  722. self.assertFalse(f._rolled)
  723. f.write('x' * 20)
  724. self.assertFalse(f._rolled)
  725. f.write('x' * 10)
  726. self.assertFalse(f._rolled)
  727. f.write('x')
  728. self.assertTrue(f._rolled)
  729. def test_writelines(self):
  730. # Verify writelines with a SpooledTemporaryFile
  731. f = self.do_create()
  732. f.writelines((b'x', b'y', b'z'))
  733. f.seek(0)
  734. buf = f.read()
  735. self.assertEqual(buf, b'xyz')
  736. def test_writelines_sequential(self):
  737. # A SpooledTemporaryFile should hold exactly max_size bytes, and roll
  738. # over afterward
  739. f = self.do_create(max_size=35)
  740. f.writelines((b'x' * 20, b'x' * 10, b'x' * 5))
  741. self.assertFalse(f._rolled)
  742. f.write(b'x')
  743. self.assertTrue(f._rolled)
  744. def test_xreadlines(self):
  745. f = self.do_create(max_size=20)
  746. f.write(b'abc\n' * 5)
  747. f.seek(0)
  748. self.assertFalse(f._rolled)
  749. self.assertEqual(list(f.xreadlines()), [b'abc\n'] * 5)
  750. f.write(b'x\ny')
  751. self.assertTrue(f._rolled)
  752. f.seek(0)
  753. self.assertEqual(list(f.xreadlines()), [b'abc\n'] * 5 + [b'x\n', b'y'])
  754. def test_sparse(self):
  755. # A SpooledTemporaryFile that is written late in the file will extend
  756. # when that occurs
  757. f = self.do_create(max_size=30)
  758. self.assertFalse(f._rolled)
  759. f.seek(100, 0)
  760. self.assertFalse(f._rolled)
  761. f.write('x')
  762. self.assertTrue(f._rolled)
  763. def test_fileno(self):
  764. # A SpooledTemporaryFile should roll over to a real file on fileno()
  765. f = self.do_create(max_size=30)
  766. self.assertFalse(f._rolled)
  767. self.assertTrue(f.fileno() > 0)
  768. self.assertTrue(f._rolled)
  769. def test_multiple_close_before_rollover(self):
  770. # A SpooledTemporaryFile can be closed many times without error
  771. f = tempfile.SpooledTemporaryFile()
  772. f.write('abc\n')
  773. self.assertFalse(f._rolled)
  774. f.close()
  775. try:
  776. f.close()
  777. f.close()
  778. except:
  779. self.failOnException("close")
  780. def test_multiple_close_after_rollover(self):
  781. # A SpooledTemporaryFile can be closed many times without error
  782. f = tempfile.SpooledTemporaryFile(max_size=1)
  783. f.write('abc\n')
  784. self.assertTrue(f._rolled)
  785. f.close()
  786. try:
  787. f.close()
  788. f.close()
  789. except:
  790. self.failOnException("close")
  791. def test_bound_methods(self):
  792. # It should be OK to steal a bound method from a SpooledTemporaryFile
  793. # and use it independently; when the file rolls over, those bound
  794. # methods should continue to function
  795. f = self.do_create(max_size=30)
  796. read = f.read
  797. write = f.write
  798. seek = f.seek
  799. write("a" * 35)
  800. write("b" * 35)
  801. seek(0, 0)
  802. self.assertTrue(read(70) == 'a'*35 + 'b'*35)
  803. def test_properties(self):
  804. f = tempfile.SpooledTemporaryFile(max_size=10)
  805. f.write(b'x' * 10)
  806. self.assertFalse(f._rolled)
  807. self.assertEqual(f.mode, 'w+b')
  808. self.assertIsNone(f.name)
  809. with self.assertRaises(AttributeError):
  810. f.newlines
  811. with self.assertRaises(AttributeError):
  812. f.encoding
  813. f.write(b'x')
  814. self.assertTrue(f._rolled)
  815. self.assertEqual(f.mode, 'w+b')
  816. self.assertIsNotNone(f.name)
  817. with self.assertRaises(AttributeError):
  818. f.newlines
  819. with self.assertRaises(AttributeError):
  820. f.encoding
  821. def test_context_manager_before_rollover(self):
  822. # A SpooledTemporaryFile can be used as a context manager
  823. with tempfile.SpooledTemporaryFile(max_size=1) as f:
  824. self.assertFalse(f._rolled)
  825. self.assertFalse(f.closed)
  826. self.assertTrue(f.closed)
  827. def use_closed():
  828. with f:
  829. pass
  830. self.assertRaises(ValueError, use_closed)
  831. def test_context_manager_during_rollover(self):
  832. # A SpooledTemporaryFile can be used as a context manager
  833. with tempfile.SpooledTemporaryFile(max_size=1) as f:
  834. self.assertFalse(f._rolled)
  835. f.write('abc\n')
  836. f.flush()
  837. self.assertTrue(f._rolled)
  838. self.assertFalse(f.closed)
  839. self.assertTrue(f.closed)
  840. def use_closed():
  841. with f:
  842. pass
  843. self.assertRaises(ValueError, use_closed)
  844. def test_context_manager_after_rollover(self):
  845. # A SpooledTemporaryFile can be used as a context manager
  846. f = tempfile.SpooledTemporaryFile(max_size=1)
  847. f.write('abc\n')
  848. f.flush()
  849. self.assertTrue(f._rolled)
  850. with f:
  851. self.assertFalse(f.closed)
  852. self.assertTrue(f.closed)
  853. def use_closed():
  854. with f:
  855. pass
  856. self.assertRaises(ValueError, use_closed)
  857. test_classes.append(test_SpooledTemporaryFile)
  858. class test_TemporaryFile(TC):
  859. """Test TemporaryFile()."""
  860. def test_basic(self):
  861. # TemporaryFile can create files
  862. # No point in testing the name params - the file has no name.
  863. try:
  864. tempfile.TemporaryFile()
  865. except:
  866. self.failOnException("TemporaryFile")
  867. def test_has_no_name(self):
  868. # TemporaryFile creates files with no names (on this system)
  869. dir = tempfile.mkdtemp()
  870. f = tempfile.TemporaryFile(dir=dir)
  871. f.write('blat')
  872. # Sneaky: because this file has no name, it should not prevent
  873. # us from removing the directory it was created in.
  874. try:
  875. os.rmdir(dir)
  876. except:
  877. ei = sys.exc_info()
  878. # cleanup
  879. f.close()
  880. os.rmdir(dir)
  881. self.failOnException("rmdir", ei)
  882. def test_multiple_close(self):
  883. # A TemporaryFile can be closed many times without error
  884. f = tempfile.TemporaryFile()
  885. f.write('abc\n')
  886. f.close()
  887. try:
  888. f.close()
  889. f.close()
  890. except:
  891. self.failOnException("close")
  892. # How to test the mode and bufsize parameters?
  893. if tempfile.NamedTemporaryFile is not tempfile.TemporaryFile:
  894. test_classes.append(test_TemporaryFile)
  895. def test_main():
  896. support.run_unittest(*test_classes)
  897. if __name__ == "__main__":
  898. test_main()