123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918 |
- # Copyright (C) 2003 Python Software Foundation
- import unittest
- import shutil
- import tempfile
- import sys
- import stat
- import os
- import os.path
- import errno
- import subprocess
- from distutils.spawn import find_executable
- from shutil import (make_archive,
- register_archive_format, unregister_archive_format,
- get_archive_formats)
- import tarfile
- import warnings
- from test import test_support as support
- from test.test_support import TESTFN, check_warnings, captured_stdout
- TESTFN2 = TESTFN + "2"
- try:
- import grp
- import pwd
- UID_GID_SUPPORT = True
- except ImportError:
- UID_GID_SUPPORT = False
- try:
- import zlib
- except ImportError:
- zlib = None
- try:
- import zipfile
- ZIP_SUPPORT = True
- except ImportError:
- ZIP_SUPPORT = find_executable('zip')
- class TestShutil(unittest.TestCase):
- def setUp(self):
- super(TestShutil, self).setUp()
- self.tempdirs = []
- def tearDown(self):
- super(TestShutil, self).tearDown()
- while self.tempdirs:
- d = self.tempdirs.pop()
- shutil.rmtree(d, os.name in ('nt', 'cygwin'))
- def write_file(self, path, content='xxx'):
- """Writes a file in the given path.
- path can be a string or a sequence.
- """
- if isinstance(path, (list, tuple)):
- path = os.path.join(*path)
- f = open(path, 'w')
- try:
- f.write(content)
- finally:
- f.close()
- def mkdtemp(self):
- """Create a temporary directory that will be cleaned up.
- Returns the path of the directory.
- """
- d = tempfile.mkdtemp()
- self.tempdirs.append(d)
- return d
- def test_rmtree_errors(self):
- # filename is guaranteed not to exist
- filename = tempfile.mktemp()
- self.assertRaises(OSError, shutil.rmtree, filename)
- @unittest.skipUnless(hasattr(os, 'chmod'), 'requires os.chmod()')
- @unittest.skipIf(sys.platform[:6] == 'cygwin',
- "This test can't be run on Cygwin (issue #1071513).")
- @unittest.skipIf(hasattr(os, 'geteuid') and os.geteuid() == 0,
- "This test can't be run reliably as root (issue #1076467).")
- def test_on_error(self):
- self.errorState = 0
- os.mkdir(TESTFN)
- self.childpath = os.path.join(TESTFN, 'a')
- f = open(self.childpath, 'w')
- f.close()
- old_dir_mode = os.stat(TESTFN).st_mode
- old_child_mode = os.stat(self.childpath).st_mode
- # Make unwritable.
- os.chmod(self.childpath, stat.S_IREAD)
- os.chmod(TESTFN, stat.S_IREAD)
- shutil.rmtree(TESTFN, onerror=self.check_args_to_onerror)
- # Test whether onerror has actually been called.
- self.assertEqual(self.errorState, 2,
- "Expected call to onerror function did not happen.")
- # Make writable again.
- os.chmod(TESTFN, old_dir_mode)
- os.chmod(self.childpath, old_child_mode)
- # Clean up.
- shutil.rmtree(TESTFN)
- def check_args_to_onerror(self, func, arg, exc):
- # test_rmtree_errors deliberately runs rmtree
- # on a directory that is chmod 400, which will fail.
- # This function is run when shutil.rmtree fails.
- # 99.9% of the time it initially fails to remove
- # a file in the directory, so the first time through
- # func is os.remove.
- # However, some Linux machines running ZFS on
- # FUSE experienced a failure earlier in the process
- # at os.listdir. The first failure may legally
- # be either.
- if self.errorState == 0:
- if func is os.remove:
- self.assertEqual(arg, self.childpath)
- else:
- self.assertIs(func, os.listdir,
- "func must be either os.remove or os.listdir")
- self.assertEqual(arg, TESTFN)
- self.assertTrue(issubclass(exc[0], OSError))
- self.errorState = 1
- else:
- self.assertEqual(func, os.rmdir)
- self.assertEqual(arg, TESTFN)
- self.assertTrue(issubclass(exc[0], OSError))
- self.errorState = 2
- def test_rmtree_dont_delete_file(self):
- # When called on a file instead of a directory, don't delete it.
- handle, path = tempfile.mkstemp()
- os.fdopen(handle).close()
- self.assertRaises(OSError, shutil.rmtree, path)
- os.remove(path)
- def test_copytree_simple(self):
- def write_data(path, data):
- f = open(path, "w")
- f.write(data)
- f.close()
- def read_data(path):
- f = open(path)
- data = f.read()
- f.close()
- return data
- src_dir = tempfile.mkdtemp()
- dst_dir = os.path.join(tempfile.mkdtemp(), 'destination')
- write_data(os.path.join(src_dir, 'test.txt'), '123')
- os.mkdir(os.path.join(src_dir, 'test_dir'))
- write_data(os.path.join(src_dir, 'test_dir', 'test.txt'), '456')
- try:
- shutil.copytree(src_dir, dst_dir)
- self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test.txt')))
- self.assertTrue(os.path.isdir(os.path.join(dst_dir, 'test_dir')))
- self.assertTrue(os.path.isfile(os.path.join(dst_dir, 'test_dir',
- 'test.txt')))
- actual = read_data(os.path.join(dst_dir, 'test.txt'))
- self.assertEqual(actual, '123')
- actual = read_data(os.path.join(dst_dir, 'test_dir', 'test.txt'))
- self.assertEqual(actual, '456')
- finally:
- for path in (
- os.path.join(src_dir, 'test.txt'),
- os.path.join(dst_dir, 'test.txt'),
- os.path.join(src_dir, 'test_dir', 'test.txt'),
- os.path.join(dst_dir, 'test_dir', 'test.txt'),
- ):
- if os.path.exists(path):
- os.remove(path)
- for path in (src_dir,
- os.path.dirname(dst_dir)
- ):
- if os.path.exists(path):
- shutil.rmtree(path)
- def test_copytree_with_exclude(self):
- def write_data(path, data):
- f = open(path, "w")
- f.write(data)
- f.close()
- def read_data(path):
- f = open(path)
- data = f.read()
- f.close()
- return data
- # creating data
- join = os.path.join
- exists = os.path.exists
- src_dir = tempfile.mkdtemp()
- try:
- dst_dir = join(tempfile.mkdtemp(), 'destination')
- write_data(join(src_dir, 'test.txt'), '123')
- write_data(join(src_dir, 'test.tmp'), '123')
- os.mkdir(join(src_dir, 'test_dir'))
- write_data(join(src_dir, 'test_dir', 'test.txt'), '456')
- os.mkdir(join(src_dir, 'test_dir2'))
- write_data(join(src_dir, 'test_dir2', 'test.txt'), '456')
- os.mkdir(join(src_dir, 'test_dir2', 'subdir'))
- os.mkdir(join(src_dir, 'test_dir2', 'subdir2'))
- write_data(join(src_dir, 'test_dir2', 'subdir', 'test.txt'), '456')
- write_data(join(src_dir, 'test_dir2', 'subdir2', 'test.py'), '456')
- # testing glob-like patterns
- try:
- patterns = shutil.ignore_patterns('*.tmp', 'test_dir2')
- shutil.copytree(src_dir, dst_dir, ignore=patterns)
- # checking the result: some elements should not be copied
- self.assertTrue(exists(join(dst_dir, 'test.txt')))
- self.assertTrue(not exists(join(dst_dir, 'test.tmp')))
- self.assertTrue(not exists(join(dst_dir, 'test_dir2')))
- finally:
- if os.path.exists(dst_dir):
- shutil.rmtree(dst_dir)
- try:
- patterns = shutil.ignore_patterns('*.tmp', 'subdir*')
- shutil.copytree(src_dir, dst_dir, ignore=patterns)
- # checking the result: some elements should not be copied
- self.assertTrue(not exists(join(dst_dir, 'test.tmp')))
- self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir2')))
- self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir')))
- finally:
- if os.path.exists(dst_dir):
- shutil.rmtree(dst_dir)
- # testing callable-style
- try:
- def _filter(src, names):
- res = []
- for name in names:
- path = os.path.join(src, name)
- if (os.path.isdir(path) and
- path.split()[-1] == 'subdir'):
- res.append(name)
- elif os.path.splitext(path)[-1] in ('.py'):
- res.append(name)
- return res
- shutil.copytree(src_dir, dst_dir, ignore=_filter)
- # checking the result: some elements should not be copied
- self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir2',
- 'test.py')))
- self.assertTrue(not exists(join(dst_dir, 'test_dir2', 'subdir')))
- finally:
- if os.path.exists(dst_dir):
- shutil.rmtree(dst_dir)
- finally:
- shutil.rmtree(src_dir)
- shutil.rmtree(os.path.dirname(dst_dir))
- if hasattr(os, "symlink"):
- def test_dont_copy_file_onto_link_to_itself(self):
- # bug 851123.
- os.mkdir(TESTFN)
- src = os.path.join(TESTFN, 'cheese')
- dst = os.path.join(TESTFN, 'shop')
- try:
- f = open(src, 'w')
- f.write('cheddar')
- f.close()
- os.link(src, dst)
- self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
- with open(src, 'r') as f:
- self.assertEqual(f.read(), 'cheddar')
- os.remove(dst)
- # Using `src` here would mean we end up with a symlink pointing
- # to TESTFN/TESTFN/cheese, while it should point at
- # TESTFN/cheese.
- os.symlink('cheese', dst)
- self.assertRaises(shutil.Error, shutil.copyfile, src, dst)
- with open(src, 'r') as f:
- self.assertEqual(f.read(), 'cheddar')
- os.remove(dst)
- finally:
- try:
- shutil.rmtree(TESTFN)
- except OSError:
- pass
- def test_rmtree_on_symlink(self):
- # bug 1669.
- os.mkdir(TESTFN)
- try:
- src = os.path.join(TESTFN, 'cheese')
- dst = os.path.join(TESTFN, 'shop')
- os.mkdir(src)
- os.symlink(src, dst)
- self.assertRaises(OSError, shutil.rmtree, dst)
- finally:
- shutil.rmtree(TESTFN, ignore_errors=True)
- # Issue #3002: copyfile and copytree block indefinitely on named pipes
- @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
- def test_copyfile_named_pipe(self):
- os.mkfifo(TESTFN)
- try:
- self.assertRaises(shutil.SpecialFileError,
- shutil.copyfile, TESTFN, TESTFN2)
- self.assertRaises(shutil.SpecialFileError,
- shutil.copyfile, __file__, TESTFN)
- finally:
- os.remove(TESTFN)
- @unittest.skipUnless(hasattr(os, "mkfifo"), 'requires os.mkfifo()')
- def test_copytree_named_pipe(self):
- os.mkdir(TESTFN)
- try:
- subdir = os.path.join(TESTFN, "subdir")
- os.mkdir(subdir)
- pipe = os.path.join(subdir, "mypipe")
- os.mkfifo(pipe)
- try:
- shutil.copytree(TESTFN, TESTFN2)
- except shutil.Error as e:
- errors = e.args[0]
- self.assertEqual(len(errors), 1)
- src, dst, error_msg = errors[0]
- self.assertEqual("`%s` is a named pipe" % pipe, error_msg)
- else:
- self.fail("shutil.Error should have been raised")
- finally:
- shutil.rmtree(TESTFN, ignore_errors=True)
- shutil.rmtree(TESTFN2, ignore_errors=True)
- @unittest.skipUnless(hasattr(os, 'chflags') and
- hasattr(errno, 'EOPNOTSUPP') and
- hasattr(errno, 'ENOTSUP'),
- "requires os.chflags, EOPNOTSUPP & ENOTSUP")
- def test_copystat_handles_harmless_chflags_errors(self):
- tmpdir = self.mkdtemp()
- file1 = os.path.join(tmpdir, 'file1')
- file2 = os.path.join(tmpdir, 'file2')
- self.write_file(file1, 'xxx')
- self.write_file(file2, 'xxx')
- def make_chflags_raiser(err):
- ex = OSError()
- def _chflags_raiser(path, flags):
- ex.errno = err
- raise ex
- return _chflags_raiser
- old_chflags = os.chflags
- try:
- for err in errno.EOPNOTSUPP, errno.ENOTSUP:
- os.chflags = make_chflags_raiser(err)
- shutil.copystat(file1, file2)
- # assert others errors break it
- os.chflags = make_chflags_raiser(errno.EOPNOTSUPP + errno.ENOTSUP)
- self.assertRaises(OSError, shutil.copystat, file1, file2)
- finally:
- os.chflags = old_chflags
- @unittest.skipUnless(zlib, "requires zlib")
- def test_make_tarball(self):
- # creating something to tar
- root_dir, base_dir = self._create_files('')
- tmpdir2 = self.mkdtemp()
- # force shutil to create the directory
- os.rmdir(tmpdir2)
- # working with relative paths
- work_dir = os.path.dirname(tmpdir2)
- rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
- with support.change_cwd(work_dir):
- base_name = os.path.abspath(rel_base_name)
- tarball = make_archive(rel_base_name, 'gztar', root_dir, '.')
- # check if the compressed tarball was created
- self.assertEqual(tarball, base_name + '.tar.gz')
- self.assertTrue(os.path.isfile(tarball))
- self.assertTrue(tarfile.is_tarfile(tarball))
- with tarfile.open(tarball, 'r:gz') as tf:
- self.assertEqual(sorted(tf.getnames()),
- ['.', './file1', './file2',
- './sub', './sub/file3', './sub2'])
- # trying an uncompressed one
- with support.change_cwd(work_dir):
- tarball = make_archive(rel_base_name, 'tar', root_dir, '.')
- self.assertEqual(tarball, base_name + '.tar')
- self.assertTrue(os.path.isfile(tarball))
- self.assertTrue(tarfile.is_tarfile(tarball))
- with tarfile.open(tarball, 'r') as tf:
- self.assertEqual(sorted(tf.getnames()),
- ['.', './file1', './file2',
- './sub', './sub/file3', './sub2'])
- def _tarinfo(self, path):
- with tarfile.open(path) as tar:
- names = tar.getnames()
- names.sort()
- return tuple(names)
- def _create_files(self, base_dir='dist'):
- # creating something to tar
- root_dir = self.mkdtemp()
- dist = os.path.join(root_dir, base_dir)
- if not os.path.isdir(dist):
- os.makedirs(dist)
- self.write_file((dist, 'file1'), 'xxx')
- self.write_file((dist, 'file2'), 'xxx')
- os.mkdir(os.path.join(dist, 'sub'))
- self.write_file((dist, 'sub', 'file3'), 'xxx')
- os.mkdir(os.path.join(dist, 'sub2'))
- if base_dir:
- self.write_file((root_dir, 'outer'), 'xxx')
- return root_dir, base_dir
- @unittest.skipUnless(zlib, "Requires zlib")
- @unittest.skipUnless(find_executable('tar'),
- 'Need the tar command to run')
- def test_tarfile_vs_tar(self):
- root_dir, base_dir = self._create_files()
- base_name = os.path.join(self.mkdtemp(), 'archive')
- tarball = make_archive(base_name, 'gztar', root_dir, base_dir)
- # check if the compressed tarball was created
- self.assertEqual(tarball, base_name + '.tar.gz')
- self.assertTrue(os.path.isfile(tarball))
- # now create another tarball using `tar`
- tarball2 = os.path.join(root_dir, 'archive2.tar')
- tar_cmd = ['tar', '-cf', 'archive2.tar', base_dir]
- subprocess.check_call(tar_cmd, cwd=root_dir)
- self.assertTrue(os.path.isfile(tarball2))
- # let's compare both tarballs
- self.assertEqual(self._tarinfo(tarball), self._tarinfo(tarball2))
- # trying an uncompressed one
- tarball = make_archive(base_name, 'tar', root_dir, base_dir)
- self.assertEqual(tarball, base_name + '.tar')
- self.assertTrue(os.path.isfile(tarball))
- # now for a dry_run
- tarball = make_archive(base_name, 'tar', root_dir, base_dir,
- dry_run=True)
- self.assertEqual(tarball, base_name + '.tar')
- self.assertTrue(os.path.isfile(tarball))
- @unittest.skipUnless(zlib, "Requires zlib")
- @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')
- def test_make_zipfile(self):
- # creating something to zip
- root_dir, base_dir = self._create_files()
- tmpdir2 = self.mkdtemp()
- # force shutil to create the directory
- os.rmdir(tmpdir2)
- # working with relative paths
- work_dir = os.path.dirname(tmpdir2)
- rel_base_name = os.path.join(os.path.basename(tmpdir2), 'archive')
- with support.change_cwd(work_dir):
- base_name = os.path.abspath(rel_base_name)
- res = make_archive(rel_base_name, 'zip', root_dir, base_dir)
- self.assertEqual(res, base_name + '.zip')
- self.assertTrue(os.path.isfile(res))
- self.assertTrue(zipfile.is_zipfile(res))
- with zipfile.ZipFile(res) as zf:
- self.assertEqual(sorted(zf.namelist()),
- ['dist/', 'dist/file1', 'dist/file2',
- 'dist/sub/', 'dist/sub/file3', 'dist/sub2/'])
- @unittest.skipUnless(zlib, "Requires zlib")
- @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')
- @unittest.skipUnless(find_executable('zip'),
- 'Need the zip command to run')
- def test_zipfile_vs_zip(self):
- root_dir, base_dir = self._create_files()
- base_name = os.path.join(self.mkdtemp(), 'archive')
- archive = make_archive(base_name, 'zip', root_dir, base_dir)
- # check if ZIP file was created
- self.assertEqual(archive, base_name + '.zip')
- self.assertTrue(os.path.isfile(archive))
- # now create another ZIP file using `zip`
- archive2 = os.path.join(root_dir, 'archive2.zip')
- zip_cmd = ['zip', '-q', '-r', 'archive2.zip', base_dir]
- subprocess.check_call(zip_cmd, cwd=root_dir)
- self.assertTrue(os.path.isfile(archive2))
- # let's compare both ZIP files
- with zipfile.ZipFile(archive) as zf:
- names = zf.namelist()
- with zipfile.ZipFile(archive2) as zf:
- names2 = zf.namelist()
- self.assertEqual(sorted(names), sorted(names2))
- @unittest.skipUnless(zlib, "Requires zlib")
- @unittest.skipUnless(ZIP_SUPPORT, 'Need zip support to run')
- @unittest.skipUnless(find_executable('unzip'),
- 'Need the unzip command to run')
- def test_unzip_zipfile(self):
- root_dir, base_dir = self._create_files()
- base_name = os.path.join(self.mkdtemp(), 'archive')
- archive = make_archive(base_name, 'zip', root_dir, base_dir)
- # check if ZIP file was created
- self.assertEqual(archive, base_name + '.zip')
- self.assertTrue(os.path.isfile(archive))
- # now check the ZIP file using `unzip -t`
- zip_cmd = ['unzip', '-t', archive]
- with support.change_cwd(root_dir):
- try:
- subprocess.check_output(zip_cmd, stderr=subprocess.STDOUT)
- except subprocess.CalledProcessError as exc:
- details = exc.output
- msg = "{}\n\n**Unzip Output**\n{}"
- self.fail(msg.format(exc, details))
- def test_make_archive(self):
- tmpdir = self.mkdtemp()
- base_name = os.path.join(tmpdir, 'archive')
- self.assertRaises(ValueError, make_archive, base_name, 'xxx')
- @unittest.skipUnless(zlib, "Requires zlib")
- def test_make_archive_owner_group(self):
- # testing make_archive with owner and group, with various combinations
- # this works even if there's not gid/uid support
- if UID_GID_SUPPORT:
- group = grp.getgrgid(0)[0]
- owner = pwd.getpwuid(0)[0]
- else:
- group = owner = 'root'
- root_dir, base_dir = self._create_files()
- base_name = os.path.join(self.mkdtemp(), 'archive')
- res = make_archive(base_name, 'zip', root_dir, base_dir, owner=owner,
- group=group)
- self.assertTrue(os.path.isfile(res))
- res = make_archive(base_name, 'zip', root_dir, base_dir)
- self.assertTrue(os.path.isfile(res))
- res = make_archive(base_name, 'tar', root_dir, base_dir,
- owner=owner, group=group)
- self.assertTrue(os.path.isfile(res))
- res = make_archive(base_name, 'tar', root_dir, base_dir,
- owner='kjhkjhkjg', group='oihohoh')
- self.assertTrue(os.path.isfile(res))
- @unittest.skipUnless(zlib, "Requires zlib")
- @unittest.skipUnless(UID_GID_SUPPORT, "Requires grp and pwd support")
- def test_tarfile_root_owner(self):
- root_dir, base_dir = self._create_files()
- base_name = os.path.join(self.mkdtemp(), 'archive')
- group = grp.getgrgid(0)[0]
- owner = pwd.getpwuid(0)[0]
- with support.change_cwd(root_dir):
- archive_name = make_archive(base_name, 'gztar', root_dir, 'dist',
- owner=owner, group=group)
- # check if the compressed tarball was created
- self.assertTrue(os.path.isfile(archive_name))
- # now checks the rights
- archive = tarfile.open(archive_name)
- try:
- for member in archive.getmembers():
- self.assertEqual(member.uid, 0)
- self.assertEqual(member.gid, 0)
- finally:
- archive.close()
- def test_make_archive_cwd(self):
- current_dir = os.getcwd()
- def _breaks(*args, **kw):
- raise RuntimeError()
- register_archive_format('xxx', _breaks, [], 'xxx file')
- try:
- try:
- make_archive('xxx', 'xxx', root_dir=self.mkdtemp())
- except Exception:
- pass
- self.assertEqual(os.getcwd(), current_dir)
- finally:
- unregister_archive_format('xxx')
- def test_make_tarfile_in_curdir(self):
- # Issue #21280
- root_dir = self.mkdtemp()
- saved_dir = os.getcwd()
- try:
- os.chdir(root_dir)
- self.assertEqual(make_archive('test', 'tar'), 'test.tar')
- self.assertTrue(os.path.isfile('test.tar'))
- finally:
- os.chdir(saved_dir)
- @unittest.skipUnless(zlib, "Requires zlib")
- def test_make_zipfile_in_curdir(self):
- # Issue #21280
- root_dir = self.mkdtemp()
- saved_dir = os.getcwd()
- try:
- os.chdir(root_dir)
- self.assertEqual(make_archive('test', 'zip'), 'test.zip')
- self.assertTrue(os.path.isfile('test.zip'))
- finally:
- os.chdir(saved_dir)
- def test_register_archive_format(self):
- self.assertRaises(TypeError, register_archive_format, 'xxx', 1)
- self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
- 1)
- self.assertRaises(TypeError, register_archive_format, 'xxx', lambda: x,
- [(1, 2), (1, 2, 3)])
- register_archive_format('xxx', lambda: x, [(1, 2)], 'xxx file')
- formats = [name for name, params in get_archive_formats()]
- self.assertIn('xxx', formats)
- unregister_archive_format('xxx')
- formats = [name for name, params in get_archive_formats()]
- self.assertNotIn('xxx', formats)
- class TestMove(unittest.TestCase):
- def setUp(self):
- filename = "foo"
- self.src_dir = tempfile.mkdtemp()
- self.dst_dir = tempfile.mkdtemp()
- self.src_file = os.path.join(self.src_dir, filename)
- self.dst_file = os.path.join(self.dst_dir, filename)
- # Try to create a dir in the current directory, hoping that it is
- # not located on the same filesystem as the system tmp dir.
- try:
- self.dir_other_fs = tempfile.mkdtemp(
- dir=os.path.dirname(__file__))
- self.file_other_fs = os.path.join(self.dir_other_fs,
- filename)
- except OSError:
- self.dir_other_fs = None
- with open(self.src_file, "wb") as f:
- f.write("spam")
- def tearDown(self):
- for d in (self.src_dir, self.dst_dir, self.dir_other_fs):
- try:
- if d:
- shutil.rmtree(d)
- except:
- pass
- def _check_move_file(self, src, dst, real_dst):
- with open(src, "rb") as f:
- contents = f.read()
- shutil.move(src, dst)
- with open(real_dst, "rb") as f:
- self.assertEqual(contents, f.read())
- self.assertFalse(os.path.exists(src))
- def _check_move_dir(self, src, dst, real_dst):
- contents = sorted(os.listdir(src))
- shutil.move(src, dst)
- self.assertEqual(contents, sorted(os.listdir(real_dst)))
- self.assertFalse(os.path.exists(src))
- def test_move_file(self):
- # Move a file to another location on the same filesystem.
- self._check_move_file(self.src_file, self.dst_file, self.dst_file)
- def test_move_file_to_dir(self):
- # Move a file inside an existing dir on the same filesystem.
- self._check_move_file(self.src_file, self.dst_dir, self.dst_file)
- def test_move_file_other_fs(self):
- # Move a file to an existing dir on another filesystem.
- if not self.dir_other_fs:
- self.skipTest('dir on other filesystem not available')
- self._check_move_file(self.src_file, self.file_other_fs,
- self.file_other_fs)
- def test_move_file_to_dir_other_fs(self):
- # Move a file to another location on another filesystem.
- if not self.dir_other_fs:
- self.skipTest('dir on other filesystem not available')
- self._check_move_file(self.src_file, self.dir_other_fs,
- self.file_other_fs)
- def test_move_dir(self):
- # Move a dir to another location on the same filesystem.
- dst_dir = tempfile.mktemp()
- try:
- self._check_move_dir(self.src_dir, dst_dir, dst_dir)
- finally:
- try:
- shutil.rmtree(dst_dir)
- except:
- pass
- def test_move_dir_other_fs(self):
- # Move a dir to another location on another filesystem.
- if not self.dir_other_fs:
- self.skipTest('dir on other filesystem not available')
- dst_dir = tempfile.mktemp(dir=self.dir_other_fs)
- try:
- self._check_move_dir(self.src_dir, dst_dir, dst_dir)
- finally:
- try:
- shutil.rmtree(dst_dir)
- except:
- pass
- def test_move_dir_to_dir(self):
- # Move a dir inside an existing dir on the same filesystem.
- self._check_move_dir(self.src_dir, self.dst_dir,
- os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
- def test_move_dir_to_dir_other_fs(self):
- # Move a dir inside an existing dir on another filesystem.
- if not self.dir_other_fs:
- self.skipTest('dir on other filesystem not available')
- self._check_move_dir(self.src_dir, self.dir_other_fs,
- os.path.join(self.dir_other_fs, os.path.basename(self.src_dir)))
- def test_move_dir_sep_to_dir(self):
- self._check_move_dir(self.src_dir + os.path.sep, self.dst_dir,
- os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
- @unittest.skipUnless(os.path.altsep, 'requires os.path.altsep')
- def test_move_dir_altsep_to_dir(self):
- self._check_move_dir(self.src_dir + os.path.altsep, self.dst_dir,
- os.path.join(self.dst_dir, os.path.basename(self.src_dir)))
- def test_existing_file_inside_dest_dir(self):
- # A file with the same name inside the destination dir already exists.
- with open(self.dst_file, "wb"):
- pass
- self.assertRaises(shutil.Error, shutil.move, self.src_file, self.dst_dir)
- def test_dont_move_dir_in_itself(self):
- # Moving a dir inside itself raises an Error.
- dst = os.path.join(self.src_dir, "bar")
- self.assertRaises(shutil.Error, shutil.move, self.src_dir, dst)
- def test_destinsrc_false_negative(self):
- os.mkdir(TESTFN)
- try:
- for src, dst in [('srcdir', 'srcdir/dest')]:
- src = os.path.join(TESTFN, src)
- dst = os.path.join(TESTFN, dst)
- self.assertTrue(shutil._destinsrc(src, dst),
- msg='_destinsrc() wrongly concluded that '
- 'dst (%s) is not in src (%s)' % (dst, src))
- finally:
- shutil.rmtree(TESTFN, ignore_errors=True)
- def test_destinsrc_false_positive(self):
- os.mkdir(TESTFN)
- try:
- for src, dst in [('srcdir', 'src/dest'), ('srcdir', 'srcdir.new')]:
- src = os.path.join(TESTFN, src)
- dst = os.path.join(TESTFN, dst)
- self.assertFalse(shutil._destinsrc(src, dst),
- msg='_destinsrc() wrongly concluded that '
- 'dst (%s) is in src (%s)' % (dst, src))
- finally:
- shutil.rmtree(TESTFN, ignore_errors=True)
- class TestCopyFile(unittest.TestCase):
- _delete = False
- class Faux(object):
- _entered = False
- _exited_with = None
- _raised = False
- def __init__(self, raise_in_exit=False, suppress_at_exit=True):
- self._raise_in_exit = raise_in_exit
- self._suppress_at_exit = suppress_at_exit
- def read(self, *args):
- return ''
- def __enter__(self):
- self._entered = True
- def __exit__(self, exc_type, exc_val, exc_tb):
- self._exited_with = exc_type, exc_val, exc_tb
- if self._raise_in_exit:
- self._raised = True
- raise IOError("Cannot close")
- return self._suppress_at_exit
- def tearDown(self):
- if self._delete:
- del shutil.open
- def _set_shutil_open(self, func):
- shutil.open = func
- self._delete = True
- def test_w_source_open_fails(self):
- def _open(filename, mode='r'):
- if filename == 'srcfile':
- raise IOError('Cannot open "srcfile"')
- assert 0 # shouldn't reach here.
- self._set_shutil_open(_open)
- self.assertRaises(IOError, shutil.copyfile, 'srcfile', 'destfile')
- def test_w_dest_open_fails(self):
- srcfile = self.Faux()
- def _open(filename, mode='r'):
- if filename == 'srcfile':
- return srcfile
- if filename == 'destfile':
- raise IOError('Cannot open "destfile"')
- assert 0 # shouldn't reach here.
- self._set_shutil_open(_open)
- shutil.copyfile('srcfile', 'destfile')
- self.assertTrue(srcfile._entered)
- self.assertTrue(srcfile._exited_with[0] is IOError)
- self.assertEqual(srcfile._exited_with[1].args,
- ('Cannot open "destfile"',))
- def test_w_dest_close_fails(self):
- srcfile = self.Faux()
- destfile = self.Faux(True)
- def _open(filename, mode='r'):
- if filename == 'srcfile':
- return srcfile
- if filename == 'destfile':
- return destfile
- assert 0 # shouldn't reach here.
- self._set_shutil_open(_open)
- shutil.copyfile('srcfile', 'destfile')
- self.assertTrue(srcfile._entered)
- self.assertTrue(destfile._entered)
- self.assertTrue(destfile._raised)
- self.assertTrue(srcfile._exited_with[0] is IOError)
- self.assertEqual(srcfile._exited_with[1].args,
- ('Cannot close',))
- def test_w_source_close_fails(self):
- srcfile = self.Faux(True)
- destfile = self.Faux()
- def _open(filename, mode='r'):
- if filename == 'srcfile':
- return srcfile
- if filename == 'destfile':
- return destfile
- assert 0 # shouldn't reach here.
- self._set_shutil_open(_open)
- self.assertRaises(IOError,
- shutil.copyfile, 'srcfile', 'destfile')
- self.assertTrue(srcfile._entered)
- self.assertTrue(destfile._entered)
- self.assertFalse(destfile._raised)
- self.assertTrue(srcfile._exited_with[0] is None)
- self.assertTrue(srcfile._raised)
- def test_move_dir_caseinsensitive(self):
- # Renames a folder to the same name
- # but a different case.
- self.src_dir = tempfile.mkdtemp()
- dst_dir = os.path.join(
- os.path.dirname(self.src_dir),
- os.path.basename(self.src_dir).upper())
- self.assertNotEqual(self.src_dir, dst_dir)
- try:
- shutil.move(self.src_dir, dst_dir)
- self.assertTrue(os.path.isdir(dst_dir))
- finally:
- if os.path.exists(dst_dir):
- os.rmdir(dst_dir)
- def test_main():
- support.run_unittest(TestShutil, TestMove, TestCopyFile)
- if __name__ == '__main__':
- test_main()
|