# test_shutil.py
  1. # Copyright (C) 2003 Python Software Foundation
  2. import unittest
  3. import shutil
  4. import tempfile
  5. import sys
  6. import stat
  7. import os
  8. import os.path
  9. import errno
  10. import subprocess
  11. from distutils.spawn import find_executable
  12. from shutil import (make_archive,
  13. register_archive_format, unregister_archive_format,
  14. get_archive_formats)
  15. import tarfile
  16. import warnings
  17. from test import test_support as support
  18. from test.test_support import TESTFN, check_warnings, captured_stdout
  19. TESTFN2 = TESTFN + "2"
  20. try:
  21. import grp
  22. import pwd
  23. UID_GID_SUPPORT = True
  24. except ImportError:
  25. UID_GID_SUPPORT = False
  26. try:
  27. import zlib
  28. except ImportError:
  29. zlib = None
  30. try:
  31. import zipfile
  32. ZIP_SUPPORT = True
  33. except ImportError:
  34. ZIP_SUPPORT = find_executable('zip')
  35. class TestShutil(unittest.TestCase):
  36. def setUp(self):
  37. super(TestShutil, self).setUp()
  38. self.tempdirs = []
  39. def tearDown(self):
  40. super(TestShutil, self).tearDown()
  41. while self.tempdirs:
  42. d = self.tempdirs.pop()
  43. shutil.rmtree(d, os.name in ('nt', 'cygwin'))
  44. def write_file(self, path, content='xxx'):
  45. """Writes a file in the given path.
  46. path can be a string or a sequence.
  47. """
  48. if isinstance(path, (list, tuple)):
  49. path = os.path.join(*path)
  50. f = open(path, 'w')
  51. try:
  52. f.write(content)
  53. finally:
  54. f.close()
  55. def mkdtemp(self):
  56. """Create a temporary directory that will be cleaned up.
  57. Returns the path of the directory.
  58. """
  59. d = tempfile.mkdtemp()
  60. self.tempdirs.append(d)
  61. return d
  62. def test_rmtree_errors(self):
  63. # filename is guaranteed not to exist
  64. filename = tempfile.mktemp()
  65. self.assertRaises(OSError, shutil.rmtree, filename)
  66. @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod()')
  67. @unittest.skipIf(sys.platform[:6] == 'cygwin',
  68. "This test can't be run on Cygwin (issue #1071513).")
  69. @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
  70. "This test can't be run reliably as root (issue #1076467).")
  71. def test_on_error(self):
  72. self.errorState = 0
  73. os.mkdir(TESTFN)
  74. self.childpath = os.path.join(TESTFN, 'a')
  75. f = open(self.childpath, 'w')
  76. f.close()
  77. old_dir_mode = os.stat(TESTFN).st_mode
  78. old_child_mode = os.stat(self.childpath).st_mode
  79. # Make unwritable.
  80. os.chmod(self.childpath, stat.S_IREAD)
  81. os.chmod(TESTFN, stat.S_IREAD)
  82. shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
  83. # Test whether onerror has actually been called.
  84. self.assertEqual(self.errorState, 2,
  85. "Expected call to onerror function did not happen.")
  86. # Make writable again.
  87. os.chmod(TESTFN, old_dir_mode)
  88. os.chmod(self.childpath, old_child_mode)
  89. # Clean up.
  90. shutil.rmtree(TESTFN)
  91. def check_args_to_onerror(self, func, arg, exc):
  92. # test_rmtree_errors deliberately runs rmtree
  93. # on a directory that is chmod 400, which will fail.
  94. # This function is run when shutil.rmtree fails.
  95. # 99.9% of the time it initially fails to remove
  96. # a file in the directory, so the first time through
  97. # func is os.remove.
  98. # However, some Linux machines running ZFS on
  99. # FUSE experienced a failure earlier in the process
  100. # at os.listdir. The first failure may legally
  101. # be either.
  102. if self.errorState == 0:
  103. if func is os.remove:
  104. self.assertEqual(arg, self.childpath)
  105. else:
  106. self.assertIs(func, os.listdir,
  107. "func must be either os.remove or os.listdir")
  108. self.assertEqual(arg, TESTFN)
  109. self.assertTrue(issubclass(exc[0], OSError))
  110. self.errorState = 1
  111. else:
  112. self.assertEqual(func, os.rmdir)
  113. self.assertEqual(arg, TESTFN)
  114. self.assertTrue(issubclass(exc[0], OSError))
  115. self.errorState = 2
  116. def test_rmtree_dont_delete_file(self):
  117. # When called on a file instead of a directory, don't delete it.
  118. handle, path = tempfile.mkstemp()
  119. os.fdopen(handle).close()
  120. self.assertRaises(OSError, shutil.rmtree, path)
  121. os.remove(path)
  122. def test_copytree_simple(self):
  123. def write_data(path, data):
  124. f = open(path, "w")
  125. f.write(data)
  126. f.close()
  127. def read_data(path):
  128. f = open(path)
  129. data = f.read()
  130. f.close()
  131. return data
  132. src_dir = tempfile.mkdtemp()
  133. dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')
  134. write_data(os.path.join(src_dir, 'test.txt'), '123')
  135. os.mkdir(os.path.join(src_dir, 'test_dir'))
  136. write_data(os.path.join(src_dir, 'test_dir', 'test.txt'), '456')
  137. try:
  138. shutil.copytree(src_dir, dst_dir)
  139. self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt')))
  140. self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir')))
  141. self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir',
  142. 'test.txt')))
  143. actual = read_data(os.path.join(dst_dir, 'test.txt'))
  144. self.assertEqual(actual, '123')
  145. actual = read_data(os.path.join(dst_dir, 'test_dir', 'test.txt'))
  146. self.assertEqual(actual, '456')
  147. finally:
  148. for path in (
  149. os.path.join(src_dir, 'test.txt'),
  150. os.path.join(dst_dir, 'test.txt'),
  151. os.path.join(src_dir, 'test_dir', 'test.txt'),
  152. os.path.join(dst_dir, 'test_dir', 'test.txt'),
  153. ):
  154. if os.path.exists(path):
  155. os.remove(path)
  156. for path in (src_dir,
  157. os.path.dirname(dst_dir)
  158. ):
  159. if os.path.exists(path):
  160. shutil.rmtree(path)
  161. def test_copytree_with_exclude(self):
  162. def write_data(path, data):
  163. f = open(path, "w")
  164. f.write(data)
  165. f.close()
  166. def read_data(path):
  167. f = open(path)
  168. data = f.read()
  169. f.close()
  170. return data
  171. # creating data
  172. join = os.path.join
  173. exists = os.path.exists
  174. src_dir = tempfile.mkdtemp()
  175. try:
  176. dst_dir = join(tempfile.mkdtemp(), 'destination')
  177. write_data(join(src_dir, 'test.txt'), '123')
  178. write_data(join(src_dir, 'test.tmp'), '123')
  179. os.mkdir(join(src_dir, 'test_dir'))
  180. write_data(join(src_dir, 'test_dir', 'test.txt'), '456')
  181. os.mkdir(join(src_dir, 'test_dir2'))
  182. write_data(join(src_dir, 'test_dir2', 'test.txt'), '456')
  183. os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
  184. os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
  185. write_data(join(src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
  186. write_data(join(src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')
  187. # testing glob-like patterns
  188. try:
  189. patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
  190. shutil.copytree(src_dir, dst_dir, ignore=patterns)
  191. # checking the result: some elements should not be copied
  192. self.assertTrue(exists(join(dst_dir, 'test.txt')))
  193. self.assertTrue(not exists(join(dst_dir, 'test.tmp')))
  194. self.assertTrue(not exists(join(dst_dir, 'test_dir2')))
  195. finally:
  196. if os.path.exists(dst_dir):
  197. shutil.rmtree(dst_dir)
  198. try:
  199. patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
  200. shutil.copytree(src_dir, dst_dir, ignore=patterns)
  201. # checking the result: some elements should not be copied
  202. self.assertTrue(not exists(join(dst_dir, 'test.tmp')))
  203. self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir2')))
  204. self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir')))
  205. finally:
  206. if os.path.exists(dst_dir):
  207. shutil.rmtree(dst_dir)
  208. # testing callable-style
  209. try:
  210. def _filter(src, names):
  211. res = []
  212. for name in names:
  213. path = os.path.join(src, name)
  214. if (os.path.isdir(path) and
  215. path.split()[-1] == 'subdir'):
  216. res.append(name)
  217. elif os.path.splitext(path)[-1] in ('.py'):
  218. res.append(name)
  219. return res
  220. shutil.copytree(src_dir, dst_dir, ignore=_filter)
  221. # checking the result: some elements should not be copied
  222. self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir2',
  223. 'test.py')))
  224. self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir')))
  225. finally:
  226. if os.path.exists(dst_dir):
  227. shutil.rmtree(dst_dir)
  228. finally:
  229. shutil.rmtree(src_dir)
  230. shutil.rmtree(os.path.dirname(dst_dir))
  231. if hasattr(os, "symlink"):
  232. def test_dont_copy_file_onto_link_to_itself(self):
  233. # bug 851123.
  234. os.mkdir(TESTFN)
  235. src = os.path.join(TESTFN, 'cheese')
  236. dst = os.path.join(TESTFN, 'shop')
  237. try:
  238. f = open(src, 'w')
  239. f.write('cheddar')
  240. f.close()
  241. os.link(src, dst)
  242. self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
  243. with open(src, 'r') as f:
  244. self.assertEqual(f.read(), 'cheddar')
  245. os.remove(dst)
  246. # Using `src` here would mean we end up with a symlink pointing
  247. # to TESTFN/TESTFN/cheese, while it should point at
  248. # TESTFN/cheese.
  249. os.symlink('cheese', dst)
  250. self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
  251. with open(src, 'r') as f:
  252. self.assertEqual(f.read(), 'cheddar')
  253. os.remove(dst)
  254. finally:
  255. try:
  256. shutil.rmtree(TESTFN)
  257. except OSError:
  258. pass
  259. def test_rmtree_on_symlink(self):
  260. # bug 1669.
  261. os.mkdir(TESTFN)
  262. try:
  263. src = os.path.join(TESTFN, 'cheese')
  264. dst = os.path.join(TESTFN, 'shop')
  265. os.mkdir(src)
  266. os.symlink(src, dst)
  267. self.assertRaises(OSError, shutil.rmtree, dst)
  268. finally:
  269. shutil.rmtree(TESTFN, ignore_errors=True)
  270. # Issue #3002: copyfile and copytree block indefinitely on named pipes
  271. @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
  272. def test_copyfile_named_pipe(self):
  273. os.mkfifo(TESTFN)
  274. try:
  275. self.assertRaises(shutil.SpecialFileError,
  276. shutil.copyfile, TESTFN, TESTFN2)
  277. self.assertRaises(shutil.SpecialFileError,
  278. shutil.copyfile, __file__, TESTFN)
  279. finally:
  280. os.remove(TESTFN)
  281. @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
  282. def test_copytree_named_pipe(self):
  283. os.mkdir(TESTFN)
  284. try:
  285. subdir = os.path.join(TESTFN, "subdir")
  286. os.mkdir(subdir)
  287. pipe = os.path.join(subdir, "mypipe")
  288. os.mkfifo(pipe)
  289. try:
  290. shutil.copytree(TESTFN, TESTFN2)
  291. except shutil.Error as e:
  292. errors = e.args[0]
  293. self.assertEqual(len(errors), 1)
  294. src, dst, error_msg = errors[0]
  295. self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
  296. else:
  297. self.fail("shutil.Error should have been raised")
  298. finally:
  299. shutil.rmtree(TESTFN, ignore_errors=True)
  300. shutil.rmtree(TESTFN2, ignore_errors=True)
  301. @unittest.skipUnless(hasattr(os, 'chflags') and
  302. hasattr(errno, 'EOPNOTSUPP') and
  303. hasattr(errno, 'ENOTSUP'),
  304. "requires os.chflags, EOPNOTSUPP & ENOTSUP")
  305. def test_copystat_handles_harmless_chflags_errors(self):
  306. tmpdir = self.mkdtemp()
  307. file1 = os.path.join(tmpdir, 'file1')
  308. file2 = os.path.join(tmpdir, 'file2')
  309. self.write_file(file1, 'xxx')
  310. self.write_file(file2, 'xxx')
  311. def make_chflags_raiser(err):
  312. ex = OSError()
  313. def _chflags_raiser(path, flags):
  314. ex.errno = err
  315. raise ex
  316. return _chflags_raiser
  317. old_chflags = os.chflags
  318. try:
  319. for err in errno.EOPNOTSUPP, errno.ENOTSUP:
  320. os.chflags = make_chflags_raiser(err)
  321. shutil.copystat(file1, file2)
  322. # assert others errors break it
  323. os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP)
  324. self.assertRaises(OSError, shutil.copystat, file1, file2)
  325. finally:
  326. os.chflags = old_chflags
  327. @unittest.skipUnless(zlib, "requires zlib")
  328. def test_make_tarball(self):
  329. # creating something to tar
  330. root_dir, base_dir = self._create_files('')
  331. tmpdir2 = self.mkdtemp()
  332. # force shutil to create the directory
  333. os.rmdir(tmpdir2)
  334. # working with relative paths
  335. work_dir = os.path.dirname(tmpdir2)
  336. rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
  337. with support.change_cwd(work_dir):
  338. base_name = os.path.abspath(rel_base_name)
  339. tarball = make_archive(rel_base_name, 'gztar', root_dir, '.')
  340. # check if the compressed tarball was created
  341. self.assertEqual(tarball, base_name + '.tar.gz')
  342. self.assertTrue(os.path.isfile(tarball))
  343. self.assertTrue(tarfile.is_tarfile(tarball))
  344. with tarfile.open(tarball, 'r:gz') as tf:
  345. self.assertEqual(sorted(tf.getnames()),
  346. ['.', './file1', './file2',
  347. './sub', './sub/file3', './sub2'])
  348. # trying an uncompressed one
  349. with support.change_cwd(work_dir):
  350. tarball = make_archive(rel_base_name, 'tar', root_dir, '.')
  351. self.assertEqual(tarball, base_name + '.tar')
  352. self.assertTrue(os.path.isfile(tarball))
  353. self.assertTrue(tarfile.is_tarfile(tarball))
  354. with tarfile.open(tarball, 'r') as tf:
  355. self.assertEqual(sorted(tf.getnames()),
  356. ['.', './file1', './file2',
  357. './sub', './sub/file3', './sub2'])
  358. def _tarinfo(self, path):
  359. with tarfile.open(path) as tar:
  360. names = tar.getnames()
  361. names.sort()
  362. return tuple(names)
  363. def _create_files(self, base_dir='dist'):
  364. # creating something to tar
  365. root_dir = self.mkdtemp()
  366. dist = os.path.join(root_dir, base_dir)
  367. if not os.path.isdir(dist):
  368. os.makedirs(dist)
  369. self.write_file((dist, 'file1'), 'xxx')
  370. self.write_file((dist, 'file2'), 'xxx')
  371. os.mkdir(os.path.join(dist, 'sub'))
  372. self.write_file((dist, 'sub', 'file3'), 'xxx')
  373. os.mkdir(os.path.join(dist, 'sub2'))
  374. if base_dir:
  375. self.write_file((root_dir, 'outer'), 'xxx')
  376. return root_dir, base_dir
  377. @unittest.skipUnless(zlib, "Requires zlib")
  378. @unittest.skipUnless(find_executable('tar'),
  379. 'Need the tar command to run')
  380. def test_tarfile_vs_tar(self):
  381. root_dir, base_dir = self._create_files()
  382. base_name = os.path.join(self.mkdtemp(), 'archive')
  383. tarball = make_archive(base_name, 'gztar', root_dir, base_dir)
  384. # check if the compressed tarball was created
  385. self.assertEqual(tarball, base_name + '.tar.gz')
  386. self.assertTrue(os.path.isfile(tarball))
  387. # now create another tarball using `tar`
  388. tarball2 = os.path.join(root_dir, 'archive2.tar')
  389. tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir]
  390. subprocess.check_call(tar_cmd, cwd=root_dir)
  391. self.assertTrue(os.path.isfile(tarball2))
  392. # let's compare both tarballs
  393. self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))
  394. # trying an uncompressed one
  395. tarball = make_archive(base_name, 'tar', root_dir, base_dir)
  396. self.assertEqual(tarball, base_name + '.tar')
  397. self.assertTrue(os.path.isfile(tarball))
  398. # now for a dry_run
  399. tarball = make_archive(base_name, 'tar', root_dir, base_dir,
  400. dry_run=True)
  401. self.assertEqual(tarball, base_name + '.tar')
  402. self.assertTrue(os.path.isfile(tarball))
  403. @unittest.skipUnless(zlib, "Requires zlib")
  404. @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')
  405. def test_make_zipfile(self):
  406. # creating something to zip
  407. root_dir, base_dir = self._create_files()
  408. tmpdir2 = self.mkdtemp()
  409. # force shutil to create the directory
  410. os.rmdir(tmpdir2)
  411. # working with relative paths
  412. work_dir = os.path.dirname(tmpdir2)
  413. rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
  414. with support.change_cwd(work_dir):
  415. base_name = os.path.abspath(rel_base_name)
  416. res = make_archive(rel_base_name, 'zip', root_dir, base_dir)
  417. self.assertEqual(res, base_name + '.zip')
  418. self.assertTrue(os.path.isfile(res))
  419. self.assertTrue(zipfile.is_zipfile(res))
  420. with zipfile.ZipFile(res) as zf:
  421. self.assertEqual(sorted(zf.namelist()),
  422. ['dist/', 'dist/file1', 'dist/file2',
  423. 'dist/sub/', 'dist/sub/file3', 'dist/sub2/'])
  424. @unittest.skipUnless(zlib, "Requires zlib")
  425. @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')
  426. @unittest.skipUnless(find_executable('zip'),
  427. 'Need the zip command to run')
  428. def test_zipfile_vs_zip(self):
  429. root_dir, base_dir = self._create_files()
  430. base_name = os.path.join(self.mkdtemp(), 'archive')
  431. archive = make_archive(base_name, 'zip', root_dir, base_dir)
  432. # check if ZIP file was created
  433. self.assertEqual(archive, base_name + '.zip')
  434. self.assertTrue(os.path.isfile(archive))
  435. # now create another ZIP file using `zip`
  436. archive2 = os.path.join(root_dir, 'archive2.zip')
  437. zip_cmd = ['zip', '-q', '-r', 'archive2.zip', base_dir]
  438. subprocess.check_call(zip_cmd, cwd=root_dir)
  439. self.assertTrue(os.path.isfile(archive2))
  440. # let's compare both ZIP files
  441. with zipfile.ZipFile(archive) as zf:
  442. names = zf.namelist()
  443. with zipfile.ZipFile(archive2) as zf:
  444. names2 = zf.namelist()
  445. self.assertEqual(sorted(names), sorted(names2))
  446. @unittest.skipUnless(zlib, "Requires zlib")
  447. @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')
  448. @unittest.skipUnless(find_executable('unzip'),
  449. 'Need the unzip command to run')
  450. def test_unzip_zipfile(self):
  451. root_dir, base_dir = self._create_files()
  452. base_name = os.path.join(self.mkdtemp(), 'archive')
  453. archive = make_archive(base_name, 'zip', root_dir, base_dir)
  454. # check if ZIP file was created
  455. self.assertEqual(archive, base_name + '.zip')
  456. self.assertTrue(os.path.isfile(archive))
  457. # now check the ZIP file using `unzip -t`
  458. zip_cmd = ['unzip', '-t', archive]
  459. with support.change_cwd(root_dir):
  460. try:
  461. subprocess.check_output(zip_cmd, stderr=subprocess.STDOUT)
  462. except subprocess.CalledProcessError as exc:
  463. details = exc.output
  464. msg = "{}\n\n**Unzip Output**\n{}"
  465. self.fail(msg.format(exc, details))
  466. def test_make_archive(self):
  467. tmpdir = self.mkdtemp()
  468. base_name = os.path.join(tmpdir, 'archive')
  469. self.assertRaises(ValueError, make_archive, base_name, 'xxx')
  470. @unittest.skipUnless(zlib, "Requires zlib")
  471. def test_make_archive_owner_group(self):
  472. # testing make_archive with owner and group, with various combinations
  473. # this works even if there's not gid/uid support
  474. if UID_GID_SUPPORT:
  475. group = grp.getgrgid(0)[0]
  476. owner = pwd.getpwuid(0)[0]
  477. else:
  478. group = owner = 'root'
  479. root_dir, base_dir = self._create_files()
  480. base_name = os.path.join(self.mkdtemp(), 'archive')
  481. res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
  482. group=group)
  483. self.assertTrue(os.path.isfile(res))
  484. res = make_archive(base_name, 'zip', root_dir, base_dir)
  485. self.assertTrue(os.path.isfile(res))
  486. res = make_archive(base_name, 'tar', root_dir, base_dir,
  487. owner=owner, group=group)
  488. self.assertTrue(os.path.isfile(res))
  489. res = make_archive(base_name, 'tar', root_dir, base_dir,
  490. owner='kjhkjhkjg', group='oihohoh')
  491. self.assertTrue(os.path.isfile(res))
  492. @unittest.skipUnless(zlib, "Requires zlib")
  493. @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
  494. def test_tarfile_root_owner(self):
  495. root_dir, base_dir = self._create_files()
  496. base_name = os.path.join(self.mkdtemp(), 'archive')
  497. group = grp.getgrgid(0)[0]
  498. owner = pwd.getpwuid(0)[0]
  499. with support.change_cwd(root_dir):
  500. archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
  501. owner=owner, group=group)
  502. # check if the compressed tarball was created
  503. self.assertTrue(os.path.isfile(archive_name))
  504. # now checks the rights
  505. archive = tarfile.open(archive_name)
  506. try:
  507. for member in archive.getmembers():
  508. self.assertEqual(member.uid, 0)
  509. self.assertEqual(member.gid, 0)
  510. finally:
  511. archive.close()
  512. def test_make_archive_cwd(self):
  513. current_dir = os.getcwd()
  514. def _breaks(*args, **kw):
  515. raise RuntimeError()
  516. register_archive_format('xxx', _breaks, [], 'xxx file')
  517. try:
  518. try:
  519. make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
  520. except Exception:
  521. pass
  522. self.assertEqual(os.getcwd(), current_dir)
  523. finally:
  524. unregister_archive_format('xxx')
  525. def test_make_tarfile_in_curdir(self):
  526. # Issue #21280
  527. root_dir = self.mkdtemp()
  528. saved_dir = os.getcwd()
  529. try:
  530. os.chdir(root_dir)
  531. self.assertEqual(make_archive('test', 'tar'), 'test.tar')
  532. self.assertTrue(os.path.isfile('test.tar'))
  533. finally:
  534. os.chdir(saved_dir)
  535. @unittest.skipUnless(zlib, "Requires zlib")
  536. def test_make_zipfile_in_curdir(self):
  537. # Issue #21280
  538. root_dir = self.mkdtemp()
  539. saved_dir = os.getcwd()
  540. try:
  541. os.chdir(root_dir)
  542. self.assertEqual(make_archive('test', 'zip'), 'test.zip')
  543. self.assertTrue(os.path.isfile('test.zip'))
  544. finally:
  545. os.chdir(saved_dir)
  546. def test_register_archive_format(self):
  547. self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
  548. self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
  549. 1)
  550. self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
  551. [(1, 2), (1, 2, 3)])
  552. register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
  553. formats = [name for name, params in get_archive_formats()]
  554. self.assertIn('xxx', formats)
  555. unregister_archive_format('xxx')
  556. formats = [name for name, params in get_archive_formats()]
  557. self.assertNotIn('xxx', formats)
  558. class TestMove(unittest.TestCase):
  559. def setUp(self):
  560. filename = "foo"
  561. self.src_dir = tempfile.mkdtemp()
  562. self.dst_dir = tempfile.mkdtemp()
  563. self.src_file = os.path.join(self.src_dir, filename)
  564. self.dst_file = os.path.join(self.dst_dir, filename)
  565. # Try to create a dir in the current directory, hoping that it is
  566. # not located on the same filesystem as the system tmp dir.
  567. try:
  568. self.dir_other_fs = tempfile.mkdtemp(
  569. dir=os.path.dirname(__file__))
  570. self.file_other_fs = os.path.join(self.dir_other_fs,
  571. filename)
  572. except OSError:
  573. self.dir_other_fs = None
  574. with open(self.src_file, "wb") as f:
  575. f.write("spam")
  576. def tearDown(self):
  577. for d in (self.src_dir, self.dst_dir, self.dir_other_fs):
  578. try:
  579. if d:
  580. shutil.rmtree(d)
  581. except:
  582. pass
  583. def _check_move_file(self, src, dst, real_dst):
  584. with open(src, "rb") as f:
  585. contents = f.read()
  586. shutil.move(src, dst)
  587. with open(real_dst, "rb") as f:
  588. self.assertEqual(contents, f.read())
  589. self.assertFalse(os.path.exists(src))
  590. def _check_move_dir(self, src, dst, real_dst):
  591. contents = sorted(os.listdir(src))
  592. shutil.move(src, dst)
  593. self.assertEqual(contents, sorted(os.listdir(real_dst)))
  594. self.assertFalse(os.path.exists(src))
  595. def test_move_file(self):
  596. # Move a file to another location on the same filesystem.
  597. self._check_move_file(self.src_file, self.dst_file, self.dst_file)
  598. def test_move_file_to_dir(self):
  599. # Move a file inside an existing dir on the same filesystem.
  600. self._check_move_file(self.src_file, self.dst_dir, self.dst_file)
  601. def test_move_file_other_fs(self):
  602. # Move a file to an existing dir on another filesystem.
  603. if not self.dir_other_fs:
  604. self.skipTest('dir on other filesystem not available')
  605. self._check_move_file(self.src_file, self.file_other_fs,
  606. self.file_other_fs)
  607. def test_move_file_to_dir_other_fs(self):
  608. # Move a file to another location on another filesystem.
  609. if not self.dir_other_fs:
  610. self.skipTest('dir on other filesystem not available')
  611. self._check_move_file(self.src_file, self.dir_other_fs,
  612. self.file_other_fs)
  613. def test_move_dir(self):
  614. # Move a dir to another location on the same filesystem.
  615. dst_dir = tempfile.mktemp()
  616. try:
  617. self._check_move_dir(self.src_dir, dst_dir, dst_dir)
  618. finally:
  619. try:
  620. shutil.rmtree(dst_dir)
  621. except:
  622. pass
  623. def test_move_dir_other_fs(self):
  624. # Move a dir to another location on another filesystem.
  625. if not self.dir_other_fs:
  626. self.skipTest('dir on other filesystem not available')
  627. dst_dir = tempfile.mktemp(dir=self.dir_other_fs)
  628. try:
  629. self._check_move_dir(self.src_dir, dst_dir, dst_dir)
  630. finally:
  631. try:
  632. shutil.rmtree(dst_dir)
  633. except:
  634. pass
  635. def test_move_dir_to_dir(self):
  636. # Move a dir inside an existing dir on the same filesystem.
  637. self._check_move_dir(self.src_dir, self.dst_dir,
  638. os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
  639. def test_move_dir_to_dir_other_fs(self):
  640. # Move a dir inside an existing dir on another filesystem.
  641. if not self.dir_other_fs:
  642. self.skipTest('dir on other filesystem not available')
  643. self._check_move_dir(self.src_dir, self.dir_other_fs,
  644. os.path.join(self.dir_other_fs, os.path.basename(self.src_dir)))
  645. def test_move_dir_sep_to_dir(self):
  646. self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir,
  647. os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
  648. @unittest.skipUnless(os.path.altsep, 'requires os.path.altsep')
  649. def test_move_dir_altsep_to_dir(self):
  650. self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir,
  651. os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
  652. def test_existing_file_inside_dest_dir(self):
  653. # A file with the same name inside the destination dir already exists.
  654. with open(self.dst_file, "wb"):
  655. pass
  656. self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir)
  657. def test_dont_move_dir_in_itself(self):
  658. # Moving a dir inside itself raises an Error.
  659. dst = os.path.join(self.src_dir, "bar")
  660. self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst)
  661. def test_destinsrc_false_negative(self):
  662. os.mkdir(TESTFN)
  663. try:
  664. for src, dst in [('srcdir', 'srcdir/dest')]:
  665. src = os.path.join(TESTFN, src)
  666. dst = os.path.join(TESTFN, dst)
  667. self.assertTrue(shutil._destinsrc(src, dst),
  668. msg='_destinsrc() wrongly concluded that '
  669. 'dst (%s) is not in src (%s)' % (dst, src))
  670. finally:
  671. shutil.rmtree(TESTFN, ignore_errors=True)
  672. def test_destinsrc_false_positive(self):
  673. os.mkdir(TESTFN)
  674. try:
  675. for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]:
  676. src = os.path.join(TESTFN, src)
  677. dst = os.path.join(TESTFN, dst)
  678. self.assertFalse(shutil._destinsrc(src, dst),
  679. msg='_destinsrc() wrongly concluded that '
  680. 'dst (%s) is in src (%s)' % (dst, src))
  681. finally:
  682. shutil.rmtree(TESTFN, ignore_errors=True)
  683. class TestCopyFile(unittest.TestCase):
  684. _delete = False
  685. class Faux(object):
  686. _entered = False
  687. _exited_with = None
  688. _raised = False
  689. def __init__(self, raise_in_exit=False, suppress_at_exit=True):
  690. self._raise_in_exit = raise_in_exit
  691. self._suppress_at_exit = suppress_at_exit
  692. def read(self, *args):
  693. return ''
  694. def __enter__(self):
  695. self._entered = True
  696. def __exit__(self, exc_type, exc_val, exc_tb):
  697. self._exited_with = exc_type, exc_val, exc_tb
  698. if self._raise_in_exit:
  699. self._raised = True
  700. raise IOError("Cannot close")
  701. return self._suppress_at_exit
  702. def tearDown(self):
  703. if self._delete:
  704. del shutil.open
  705. def _set_shutil_open(self, func):
  706. shutil.open = func
  707. self._delete = True
  708. def test_w_source_open_fails(self):
  709. def _open(filename, mode='r'):
  710. if filename == 'srcfile':
  711. raise IOError('Cannot open "srcfile"')
  712. assert 0 # shouldn't reach here.
  713. self._set_shutil_open(_open)
  714. self.assertRaises(IOError, shutil.copyfile, 'srcfile', 'destfile')
  715. def test_w_dest_open_fails(self):
  716. srcfile = self.Faux()
  717. def _open(filename, mode='r'):
  718. if filename == 'srcfile':
  719. return srcfile
  720. if filename == 'destfile':
  721. raise IOError('Cannot open "destfile"')
  722. assert 0 # shouldn't reach here.
  723. self._set_shutil_open(_open)
  724. shutil.copyfile('srcfile', 'destfile')
  725. self.assertTrue(srcfile._entered)
  726. self.assertTrue(srcfile._exited_with[0] is IOError)
  727. self.assertEqual(srcfile._exited_with[1].args,
  728. ('Cannot open "destfile"',))
  729. def test_w_dest_close_fails(self):
  730. srcfile = self.Faux()
  731. destfile = self.Faux(True)
  732. def _open(filename, mode='r'):
  733. if filename == 'srcfile':
  734. return srcfile
  735. if filename == 'destfile':
  736. return destfile
  737. assert 0 # shouldn't reach here.
  738. self._set_shutil_open(_open)
  739. shutil.copyfile('srcfile', 'destfile')
  740. self.assertTrue(srcfile._entered)
  741. self.assertTrue(destfile._entered)
  742. self.assertTrue(destfile._raised)
  743. self.assertTrue(srcfile._exited_with[0] is IOError)
  744. self.assertEqual(srcfile._exited_with[1].args,
  745. ('Cannot close',))
  746. def test_w_source_close_fails(self):
  747. srcfile = self.Faux(True)
  748. destfile = self.Faux()
  749. def _open(filename, mode='r'):
  750. if filename == 'srcfile':
  751. return srcfile
  752. if filename == 'destfile':
  753. return destfile
  754. assert 0 # shouldn't reach here.
  755. self._set_shutil_open(_open)
  756. self.assertRaises(IOError,
  757. shutil.copyfile, 'srcfile', 'destfile')
  758. self.assertTrue(srcfile._entered)
  759. self.assertTrue(destfile._entered)
  760. self.assertFalse(destfile._raised)
  761. self.assertTrue(srcfile._exited_with[0] is None)
  762. self.assertTrue(srcfile._raised)
  763. def test_move_dir_caseinsensitive(self):
  764. # Renames a folder to the same name
  765. # but a different case.
  766. self.src_dir = tempfile.mkdtemp()
  767. dst_dir = os.path.join(
  768. os.path.dirname(self.src_dir),
  769. os.path.basename(self.src_dir).upper())
  770. self.assertNotEqual(self.src_dir, dst_dir)
  771. try:
  772. shutil.move(self.src_dir, dst_dir)
  773. self.assertTrue(os.path.isdir(dst_dir))
  774. finally:
  775. if os.path.exists(dst_dir):
  776. os.rmdir(dst_dir)
  777. def test_main():
  778. support.run_unittest(TestShutil, TestMove, TestCopyFile)
  779. if __name__ == '__main__':
  780. test_main()