123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506 |
- # Adapted from test_file.py by Daniel Stutzbach
- from __future__ import unicode_literals
- import sys
- import os
- import errno
- import unittest
- from array import array
- from weakref import proxy
- from functools import wraps
- from UserList import UserList
- from test.test_support import TESTFN, check_warnings, run_unittest, make_bad_fd
- from test.test_support import py3k_bytes as bytes, cpython_only
- from test.script_helper import run_python
- from _io import FileIO as _FileIO
class AutoFileTests(unittest.TestCase):
    # File tests for which a test file is automatically set up: setUp()
    # opens TESTFN for writing as self.f, tearDown() closes it and removes
    # the file again.

    def setUp(self):
        self.f = _FileIO(TESTFN, 'w')

    def tearDown(self):
        # Remove the scratch file even if closing the handle fails, so a
        # broken close() cannot leave TESTFN behind for the next test.
        try:
            if self.f:
                self.f.close()
        finally:
            os.remove(TESTFN)

    def testWeakRefs(self):
        # verify weak references
        p = proxy(self.f)
        p.write(bytes(range(10)))
        self.assertEqual(self.f.tell(), p.tell())
        self.f.close()
        # Drop the only strong reference so the proxy becomes dead
        # (tearDown skips the close because self.f is None).
        self.f = None
        self.assertRaises(ReferenceError, getattr, p, 'tell')

    def testSeekTell(self):
        # Write 20 bytes, then check tell() after absolute (whence 0),
        # relative (whence 1) and from-end (whence 2) seeks.
        self.f.write(bytes(range(20)))
        self.assertEqual(self.f.tell(), 20)
        self.f.seek(0)
        self.assertEqual(self.f.tell(), 0)
        self.f.seek(10)
        self.assertEqual(self.f.tell(), 10)
        self.f.seek(5, 1)
        self.assertEqual(self.f.tell(), 15)
        self.f.seek(-5, 1)
        self.assertEqual(self.f.tell(), 10)
        self.f.seek(-5, 2)
        self.assertEqual(self.f.tell(), 15)

    def testAttributes(self):
        # verify expected attributes exist
        f = self.f

        self.assertEqual(f.mode, "wb")
        self.assertEqual(f.closed, False)

        # verify the attributes are readonly
        for attr in 'mode', 'closed':
            self.assertRaises((AttributeError, TypeError),
                              setattr, f, attr, 'oops')

    def testReadinto(self):
        # verify readinto
        self.f.write(b"\x01\x02")
        self.f.close()
        # The buffer is larger than the file; only the first n items are
        # compared below.
        a = array(b'b', b'x'*10)
        self.f = _FileIO(TESTFN, 'r')
        n = self.f.readinto(a)
        self.assertEqual(array(b'b', [1, 2]), a[:n])

    def testWritelinesList(self):
        # writelines() with a plain list of bytes objects
        l = [b'123', b'456']
        self.f.writelines(l)
        self.f.close()
        self.f = _FileIO(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEqual(buf, b'123456')

    def testWritelinesUserList(self):
        # writelines() with a UserList instead of a real list
        l = UserList([b'123', b'456'])
        self.f.writelines(l)
        self.f.close()
        self.f = _FileIO(TESTFN, 'rb')
        buf = self.f.read()
        self.assertEqual(buf, b'123456')

    def testWritelinesError(self):
        # writelines() must reject non-bytes items and a non-iterable
        self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
        self.assertRaises(TypeError, self.f.writelines, None)

    def test_none_args(self):
        # read(), readline() and readlines() accept None as their argument
        self.f.write(b"hi\nbye\nabc")
        self.f.close()
        self.f = _FileIO(TESTFN, 'r')
        self.assertEqual(self.f.read(None), b"hi\nbye\nabc")
        self.f.seek(0)
        self.assertEqual(self.f.readline(None), b"hi\n")
        self.assertEqual(self.f.readlines(None), [b"bye\n", b"abc"])

    def testRepr(self):
        # repr() shows the name while it is set, the fd once the name has
        # been deleted, and "[closed]" after close().
        self.assertEqual(repr(self.f), "<_io.FileIO name=%r mode='%s'>"
                                       % (self.f.name, self.f.mode))
        del self.f.name
        self.assertEqual(repr(self.f), "<_io.FileIO fd=%r mode='%s'>"
                                       % (self.f.fileno(), self.f.mode))
        self.f.close()
        self.assertEqual(repr(self.f), "<_io.FileIO [closed]>")

    def testErrors(self):
        f = self.f
        self.assertFalse(f.isatty())
        self.assertFalse(f.closed)
        #self.assertEqual(f.name, TESTFN)
        # self.f was opened write-only in setUp, so reading must fail
        self.assertRaises(ValueError, f.read, 10)
        f.close()
        self.assertTrue(f.closed)
        f = _FileIO(TESTFN, 'r')
        self.assertRaises(TypeError, f.readinto, "")
        self.assertFalse(f.closed)
        f.close()
        self.assertTrue(f.closed)

    def testMethods(self):
        methods = ['fileno', 'isatty', 'seekable', 'readable', 'writable',
                   'read', 'readall', 'readline', 'readlines',
                   'tell', 'truncate', 'flush']
        if sys.platform.startswith('atheos'):
            methods.remove('truncate')

        self.f.close()
        self.assertTrue(self.f.closed)

        for methodname in methods:
            method = getattr(self.f, methodname)
            # should raise on closed file
            self.assertRaises(ValueError, method)

        # Methods that take arguments are checked both without and with them.
        self.assertRaises(ValueError, self.f.readinto) # XXX should be TypeError?
        self.assertRaises(ValueError, self.f.readinto, bytearray(1))
        self.assertRaises(ValueError, self.f.seek)
        self.assertRaises(ValueError, self.f.seek, 0)
        self.assertRaises(ValueError, self.f.write)
        self.assertRaises(ValueError, self.f.write, b'')
        self.assertRaises(TypeError, self.f.writelines)
        self.assertRaises(ValueError, self.f.writelines, b'')

    def testOpendir(self):
        # Issue 3703: opening a directory should fill the errno
        # Windows always returns "[Errno 13]: Permission denied
        # Unix calls dircheck() and returns "[Errno 21]: Is a directory"
        try:
            _FileIO('.', 'r')
        except IOError as e:
            self.assertNotEqual(e.errno, 0)
            self.assertEqual(e.filename, ".")
        else:
            self.fail("Should have raised IOError")

    @unittest.skipIf(os.name == 'nt', "test only works on a POSIX-like system")
    def testOpenDirFD(self):
        # Wrapping a directory file descriptor must fail with EISDIR.
        fd = os.open('.', os.O_RDONLY)
        with self.assertRaises(IOError) as cm:
            _FileIO(fd, 'r')
        os.close(fd)
        self.assertEqual(cm.exception.errno, errno.EISDIR)

    #A set of functions testing that we get expected behaviour if someone has
    #manually closed the internal file descriptor.  First, a decorator:
    def ClosedFD(func):
        # Decorator: run func(self, f) after the fd behind self.f has been
        # closed with os.close(); func is expected to complete normally.
        @wraps(func)
        def wrapper(self):
            #forcibly close the fd before invoking the problem function
            f = self.f
            os.close(f.fileno())
            try:
                func(self, f)
            finally:
                # Best-effort close: the fd is already gone, so an IOError
                # here is expected and ignored.
                try:
                    self.f.close()
                except IOError:
                    pass
        return wrapper

    def ClosedFDRaises(func):
        # Decorator: like ClosedFD, but func(self, f) must raise an IOError
        # whose errno is EBADF; anything else fails the test.
        @wraps(func)
        def wrapper(self):
            #forcibly close the fd before invoking the problem function
            f = self.f
            os.close(f.fileno())
            try:
                func(self, f)
            except IOError as e:
                self.assertEqual(e.errno, errno.EBADF)
            else:
                self.fail("Should have raised IOError")
            finally:
                # Best-effort close, see ClosedFD.
                try:
                    self.f.close()
                except IOError:
                    pass
        return wrapper

    @ClosedFDRaises
    def testErrnoOnClose(self, f):
        f.close()

    @ClosedFDRaises
    def testErrnoOnClosedWrite(self, f):
        # Write bytes, not text: the failure under test must come from the
        # closed descriptor, not from converting the argument.
        f.write(b'a')

    @ClosedFDRaises
    def testErrnoOnClosedSeek(self, f):
        f.seek(0)

    @ClosedFDRaises
    def testErrnoOnClosedTell(self, f):
        f.tell()

    @ClosedFDRaises
    def testErrnoOnClosedTruncate(self, f):
        f.truncate(0)

    @ClosedFD
    def testErrnoOnClosedSeekable(self, f):
        f.seekable()

    @ClosedFD
    def testErrnoOnClosedReadable(self, f):
        f.readable()

    @ClosedFD
    def testErrnoOnClosedWritable(self, f):
        f.writable()

    @ClosedFD
    def testErrnoOnClosedFileno(self, f):
        f.fileno()

    @ClosedFD
    def testErrnoOnClosedIsatty(self, f):
        self.assertEqual(f.isatty(), False)

    def ReopenForRead(self):
        # Replace self.f with a read-mode FileIO on TESTFN whose fd has
        # already been closed behind its back, and return it.
        try:
            self.f.close()
        except IOError:
            pass
        self.f = _FileIO(TESTFN, 'r')
        os.close(self.f.fileno())
        return self.f

    @ClosedFDRaises
    def testErrnoOnClosedRead(self, f):
        # The write-mode f passed in by the decorator is replaced by a
        # read-mode file with a closed fd.
        f = self.ReopenForRead()
        f.read(1)

    @ClosedFDRaises
    def testErrnoOnClosedReadall(self, f):
        f = self.ReopenForRead()
        f.readall()

    @ClosedFDRaises
    def testErrnoOnClosedReadinto(self, f):
        f = self.ReopenForRead()
        a = array(b'b', b'x'*10)
        f.readinto(a)
class OtherFileTests(unittest.TestCase):
    # FileIO tests that open (and clean up) their own files instead of
    # relying on a setUp/tearDown pair.

    def testAbles(self):
        # readable()/writable()/seekable() for "w", "r" and "a+" modes.
        # Each file is opened in a with-block so it is closed before the
        # unlink below even when an assertion fails.
        try:
            with _FileIO(TESTFN, "w") as f:
                self.assertEqual(f.readable(), False)
                self.assertEqual(f.writable(), True)
                self.assertEqual(f.seekable(), True)

            with _FileIO(TESTFN, "r") as f:
                self.assertEqual(f.readable(), True)
                self.assertEqual(f.writable(), False)
                self.assertEqual(f.seekable(), True)

            with _FileIO(TESTFN, "a+") as f:
                self.assertEqual(f.readable(), True)
                self.assertEqual(f.writable(), True)
                self.assertEqual(f.seekable(), True)
                self.assertEqual(f.isatty(), False)
        finally:
            os.unlink(TESTFN)

    @unittest.skipIf(sys.platform == 'win32', 'no ttys on Windows')
    def testAblesOnTTY(self):
        try:
            f = _FileIO("/dev/tty", "a")
        except EnvironmentError:
            # When run in a cron job there just aren't any
            # ttys, so skip the test.  This also handles other
            # OS'es that don't support /dev/tty.
            self.skipTest('need /dev/tty')
        else:
            # with-block: close the tty handle even if an assertion fails
            with f:
                self.assertEqual(f.readable(), False)
                self.assertEqual(f.writable(), True)
                if sys.platform != "darwin" and \
                   'bsd' not in sys.platform and \
                   not sys.platform.startswith('sunos'):
                    # Somehow /dev/tty appears seekable on some BSDs
                    self.assertEqual(f.seekable(), False)
                self.assertEqual(f.isatty(), True)

    def testInvalidModeStrings(self):
        # check invalid mode strings
        for mode in ("", "aU", "wU+", "rw", "rt"):
            try:
                f = _FileIO(TESTFN, mode)
            except ValueError:
                pass
            else:
                f.close()
                self.fail('%r is an invalid file mode' % mode)

    def testModeStrings(self):
        # test that the mode attribute is correct for various mode strings
        # given as init args
        try:
            for modes in [('w', 'wb'), ('wb', 'wb'), ('wb+', 'rb+'),
                          ('w+b', 'rb+'), ('a', 'ab'), ('ab', 'ab'),
                          ('ab+', 'ab+'), ('a+b', 'ab+'), ('r', 'rb'),
                          ('rb', 'rb'), ('rb+', 'rb+'), ('r+b', 'rb+')]:
                # read modes are last so that TESTFN will exist first
                with _FileIO(TESTFN, modes[0]) as f:
                    self.assertEqual(f.mode, modes[1])
        finally:
            if os.path.exists(TESTFN):
                os.unlink(TESTFN)

    def testUnicodeOpen(self):
        # Open TESTFN passed through str() for writing; the test only
        # requires that opening, closing and unlinking succeed.
        f = _FileIO(str(TESTFN), "w")
        f.close()
        os.unlink(TESTFN)

    def testBytesOpen(self):
        # Opening a bytes filename
        try:
            fn = TESTFN.encode("ascii")
        except UnicodeEncodeError:
            self.skipTest('could not encode %r to ascii' % TESTFN)
        f = _FileIO(fn, "w")
        try:
            # Close the writer via the with-block even if write() fails,
            # so the unlink in the finally clause is not blocked by it.
            with f:
                f.write(b"abc")
            with open(TESTFN, "rb") as reader:
                self.assertEqual(reader.read(), b"abc")
        finally:
            os.unlink(TESTFN)

    def testInvalidFd(self):
        # A negative fd is rejected with ValueError, a bad fd with OSError.
        self.assertRaises(ValueError, _FileIO, -10)
        self.assertRaises(OSError, _FileIO, make_bad_fd())
        if sys.platform == 'win32':
            import msvcrt
            self.assertRaises(IOError, msvcrt.get_osfhandle, make_bad_fd())

    @cpython_only
    def testInvalidFd_overflow(self):
        # Issue 15989
        import _testcapi
        self.assertRaises(TypeError, _FileIO, _testcapi.INT_MAX + 1)
        self.assertRaises(TypeError, _FileIO, _testcapi.INT_MIN - 1)

    def testBadModeArgument(self):
        # verify that we get a sensible error message for bad mode argument
        bad_mode = "qwerty"
        try:
            f = _FileIO(TESTFN, bad_mode)
        except ValueError as msg:
            if msg.args[0] != 0:
                s = str(msg)
                # The message must mention the bad mode, not the filename.
                if TESTFN in s or bad_mode not in s:
                    self.fail("bad error message for invalid mode: %s" % s)
            # if msg.args[0] == 0, we're probably on Windows where there may be
            # no obvious way to discover why open() failed.
        else:
            f.close()
            self.fail("no error for invalid mode: %s" % bad_mode)

    def testTruncate(self):
        # truncate() must change the file size but leave the file
        # position where it was.
        f = _FileIO(TESTFN, 'w')
        try:
            f.write(bytes(bytearray(range(10))))
            self.assertEqual(f.tell(), 10)
            f.truncate(5)
            self.assertEqual(f.tell(), 10)
            self.assertEqual(f.seek(0, os.SEEK_END), 5)
            f.truncate(15)
            self.assertEqual(f.tell(), 5)
            self.assertEqual(f.seek(0, os.SEEK_END), 15)
        finally:
            # Close and remove the file on failure too, instead of leaving
            # an open handle and TESTFN behind.
            f.close()
            os.unlink(TESTFN)

    def testTruncateOnWindows(self):
        def bug801631():
            # SF bug <http://www.python.org/sf/801631>
            # "file.truncate fault on windows"
            f = _FileIO(TESTFN, 'w')
            f.write(bytes(range(11)))
            f.close()

            f = _FileIO(TESTFN,'r+')
            data = f.read(5)
            if data != bytes(range(5)):
                self.fail("Read on file opened for update failed %r" % data)
            if f.tell() != 5:
                self.fail("File pos after read wrong %d" % f.tell())

            f.truncate()
            if f.tell() != 5:
                self.fail("File pos after ftruncate wrong %d" % f.tell())

            f.close()
            size = os.path.getsize(TESTFN)
            if size != 5:
                self.fail("File size after ftruncate wrong %d" % size)

        try:
            bug801631()
        finally:
            os.unlink(TESTFN)

    def testAppend(self):
        # Data written in 'ab' mode must land after the existing content.
        try:
            f = open(TESTFN, 'wb')
            f.write(b'spam')
            f.close()
            f = open(TESTFN, 'ab')
            f.write(b'eggs')
            f.close()
            f = open(TESTFN, 'rb')
            d = f.read()
            f.close()
            self.assertEqual(d, b'spameggs')
        finally:
            # Best-effort cleanup; only filesystem errors are ignored so
            # that KeyboardInterrupt and programming errors still surface.
            try:
                os.unlink(TESTFN)
            except OSError:
                pass

    def testInvalidInit(self):
        # A non-string mode argument is a TypeError.
        self.assertRaises(TypeError, _FileIO, "1", 0, 0)

    def testWarnings(self):
        # Failing constructor calls must not emit any warnings.
        with check_warnings(quiet=True) as w:
            self.assertEqual(w.warnings, [])
            self.assertRaises(TypeError, _FileIO, [])
            self.assertEqual(w.warnings, [])
            self.assertRaises(ValueError, _FileIO, "/some/invalid/name", "rt")
            self.assertEqual(w.warnings, [])

    def test_surrogates(self):
        # Issue #8438: try to open a filename containing surrogates.
        # It should either fail because the file doesn't exist or the filename
        # can't be represented using the filesystem encoding, but not because
        # of a LookupError for the error handler "surrogateescape".
        filename = u'\udc80.txt'
        try:
            with _FileIO(filename):
                pass
        except (UnicodeEncodeError, IOError):
            pass
        # Spawn a separate Python process with a different "file system
        # default encoding", to exercise this further.
        env = dict(os.environ)
        env[b'LC_CTYPE'] = b'C'
        _, out = run_python('-c', 'import _io; _io.FileIO(%r)' % filename, env=env)
        if ('UnicodeEncodeError' not in out and not
                ( ('IOError: [Errno 2] No such file or directory' in out) or
                  ('IOError: [Errno 22] Invalid argument' in out) ) ):
            self.fail('Bad output: %r' % out)

    def testUnclosedFDOnException(self):
        # If the constructor fails while setting the name attribute, the
        # caller-supplied fd must be left open for the caller to close.
        class MyException(Exception): pass
        class MyFileIO(_FileIO):
            def __setattr__(self, name, value):
                if name == "name":
                    raise MyException("blocked setting name")
                return super(MyFileIO, self).__setattr__(name, value)
        fd = os.open(__file__, os.O_RDONLY)
        self.assertRaises(MyException, MyFileIO, fd)
        os.close(fd)  # should not raise OSError(EBADF)
def test_main():
    # Run both FileIO test classes.  These tests have a history of leaving
    # TESTFN behind, so it is removed afterwards whatever the outcome.
    test_classes = (AutoFileTests, OtherFileTests)
    try:
        run_unittest(*test_classes)
    finally:
        leftover = os.path.exists(TESTFN)
        if leftover:
            os.unlink(TESTFN)


if __name__ == '__main__':
    test_main()
|