# test_fileio.py
#
# Adapted from test_file.py by Daniel Stutzbach

from __future__ import unicode_literals

import sys
import os
import errno
import unittest
from array import array
from weakref import proxy
from functools import wraps
from UserList import UserList

from test.test_support import TESTFN, check_warnings, run_unittest, make_bad_fd
from test.test_support import py3k_bytes as bytes, cpython_only
from test.script_helper import run_python

from _io import FileIO as _FileIO


  15. class AutoFileTests(unittest.TestCase):
  16. # file tests for which a test file is automatically set up
  17. def setUp(self):
  18. self.f = _FileIO(TESTFN, 'w')
  19. def tearDown(self):
  20. if self.f:
  21. self.f.close()
  22. os.remove(TESTFN)
  23. def testWeakRefs(self):
  24. # verify weak references
  25. p = proxy(self.f)
  26. p.write(bytes(range(10)))
  27. self.assertEqual(self.f.tell(), p.tell())
  28. self.f.close()
  29. self.f = None
  30. self.assertRaises(ReferenceError, getattr, p, 'tell')
  31. def testSeekTell(self):
  32. self.f.write(bytes(range(20)))
  33. self.assertEqual(self.f.tell(), 20)
  34. self.f.seek(0)
  35. self.assertEqual(self.f.tell(), 0)
  36. self.f.seek(10)
  37. self.assertEqual(self.f.tell(), 10)
  38. self.f.seek(5, 1)
  39. self.assertEqual(self.f.tell(), 15)
  40. self.f.seek(-5, 1)
  41. self.assertEqual(self.f.tell(), 10)
  42. self.f.seek(-5, 2)
  43. self.assertEqual(self.f.tell(), 15)
  44. def testAttributes(self):
  45. # verify expected attributes exist
  46. f = self.f
  47. self.assertEqual(f.mode, "wb")
  48. self.assertEqual(f.closed, False)
  49. # verify the attributes are readonly
  50. for attr in 'mode', 'closed':
  51. self.assertRaises((AttributeError, TypeError),
  52. setattr, f, attr, 'oops')
  53. def testReadinto(self):
  54. # verify readinto
  55. self.f.write(b"\x01\x02")
  56. self.f.close()
  57. a = array(b'b', b'x'*10)
  58. self.f = _FileIO(TESTFN, 'r')
  59. n = self.f.readinto(a)
  60. self.assertEqual(array(b'b', [1, 2]), a[:n])
  61. def testWritelinesList(self):
  62. l = [b'123', b'456']
  63. self.f.writelines(l)
  64. self.f.close()
  65. self.f = _FileIO(TESTFN, 'rb')
  66. buf = self.f.read()
  67. self.assertEqual(buf, b'123456')
  68. def testWritelinesUserList(self):
  69. l = UserList([b'123', b'456'])
  70. self.f.writelines(l)
  71. self.f.close()
  72. self.f = _FileIO(TESTFN, 'rb')
  73. buf = self.f.read()
  74. self.assertEqual(buf, b'123456')
  75. def testWritelinesError(self):
  76. self.assertRaises(TypeError, self.f.writelines, [1, 2, 3])
  77. self.assertRaises(TypeError, self.f.writelines, None)
  78. def test_none_args(self):
  79. self.f.write(b"hi\nbye\nabc")
  80. self.f.close()
  81. self.f = _FileIO(TESTFN, 'r')
  82. self.assertEqual(self.f.read(None), b"hi\nbye\nabc")
  83. self.f.seek(0)
  84. self.assertEqual(self.f.readline(None), b"hi\n")
  85. self.assertEqual(self.f.readlines(None), [b"bye\n", b"abc"])
  86. def testRepr(self):
  87. self.assertEqual(repr(self.f), "<_io.FileIO name=%r mode='%s'>"
  88. % (self.f.name, self.f.mode))
  89. del self.f.name
  90. self.assertEqual(repr(self.f), "<_io.FileIO fd=%r mode='%s'>"
  91. % (self.f.fileno(), self.f.mode))
  92. self.f.close()
  93. self.assertEqual(repr(self.f), "<_io.FileIO [closed]>")
  94. def testErrors(self):
  95. f = self.f
  96. self.assertFalse(f.isatty())
  97. self.assertFalse(f.closed)
  98. #self.assertEqual(f.name, TESTFN)
  99. self.assertRaises(ValueError, f.read, 10) # Open for reading
  100. f.close()
  101. self.assertTrue(f.closed)
  102. f = _FileIO(TESTFN, 'r')
  103. self.assertRaises(TypeError, f.readinto, "")
  104. self.assertFalse(f.closed)
  105. f.close()
  106. self.assertTrue(f.closed)
  107. def testMethods(self):
  108. methods = ['fileno', 'isatty', 'seekable', 'readable', 'writable',
  109. 'read', 'readall', 'readline', 'readlines',
  110. 'tell', 'truncate', 'flush']
  111. if sys.platform.startswith('atheos'):
  112. methods.remove('truncate')
  113. self.f.close()
  114. self.assertTrue(self.f.closed)
  115. for methodname in methods:
  116. method = getattr(self.f, methodname)
  117. # should raise on closed file
  118. self.assertRaises(ValueError, method)
  119. self.assertRaises(ValueError, self.f.readinto) # XXX should be TypeError?
  120. self.assertRaises(ValueError, self.f.readinto, bytearray(1))
  121. self.assertRaises(ValueError, self.f.seek)
  122. self.assertRaises(ValueError, self.f.seek, 0)
  123. self.assertRaises(ValueError, self.f.write)
  124. self.assertRaises(ValueError, self.f.write, b'')
  125. self.assertRaises(TypeError, self.f.writelines)
  126. self.assertRaises(ValueError, self.f.writelines, b'')
  127. def testOpendir(self):
  128. # Issue 3703: opening a directory should fill the errno
  129. # Windows always returns "[Errno 13]: Permission denied
  130. # Unix calls dircheck() and returns "[Errno 21]: Is a directory"
  131. try:
  132. _FileIO('.', 'r')
  133. except IOError as e:
  134. self.assertNotEqual(e.errno, 0)
  135. self.assertEqual(e.filename, ".")
  136. else:
  137. self.fail("Should have raised IOError")
  138. @unittest.skipIf(os.name == 'nt', "test only works on a POSIX-like system")
  139. def testOpenDirFD(self):
  140. fd = os.open('.', os.O_RDONLY)
  141. with self.assertRaises(IOError) as cm:
  142. _FileIO(fd, 'r')
  143. os.close(fd)
  144. self.assertEqual(cm.exception.errno, errno.EISDIR)
  145. #A set of functions testing that we get expected behaviour if someone has
  146. #manually closed the internal file descriptor. First, a decorator:
  147. def ClosedFD(func):
  148. @wraps(func)
  149. def wrapper(self):
  150. #forcibly close the fd before invoking the problem function
  151. f = self.f
  152. os.close(f.fileno())
  153. try:
  154. func(self, f)
  155. finally:
  156. try:
  157. self.f.close()
  158. except IOError:
  159. pass
  160. return wrapper
  161. def ClosedFDRaises(func):
  162. @wraps(func)
  163. def wrapper(self):
  164. #forcibly close the fd before invoking the problem function
  165. f = self.f
  166. os.close(f.fileno())
  167. try:
  168. func(self, f)
  169. except IOError as e:
  170. self.assertEqual(e.errno, errno.EBADF)
  171. else:
  172. self.fail("Should have raised IOError")
  173. finally:
  174. try:
  175. self.f.close()
  176. except IOError:
  177. pass
  178. return wrapper
  179. @ClosedFDRaises
  180. def testErrnoOnClose(self, f):
  181. f.close()
  182. @ClosedFDRaises
  183. def testErrnoOnClosedWrite(self, f):
  184. f.write('a')
  185. @ClosedFDRaises
  186. def testErrnoOnClosedSeek(self, f):
  187. f.seek(0)
  188. @ClosedFDRaises
  189. def testErrnoOnClosedTell(self, f):
  190. f.tell()
  191. @ClosedFDRaises
  192. def testErrnoOnClosedTruncate(self, f):
  193. f.truncate(0)
  194. @ClosedFD
  195. def testErrnoOnClosedSeekable(self, f):
  196. f.seekable()
  197. @ClosedFD
  198. def testErrnoOnClosedReadable(self, f):
  199. f.readable()
  200. @ClosedFD
  201. def testErrnoOnClosedWritable(self, f):
  202. f.writable()
  203. @ClosedFD
  204. def testErrnoOnClosedFileno(self, f):
  205. f.fileno()
  206. @ClosedFD
  207. def testErrnoOnClosedIsatty(self, f):
  208. self.assertEqual(f.isatty(), False)
  209. def ReopenForRead(self):
  210. try:
  211. self.f.close()
  212. except IOError:
  213. pass
  214. self.f = _FileIO(TESTFN, 'r')
  215. os.close(self.f.fileno())
  216. return self.f
  217. @ClosedFDRaises
  218. def testErrnoOnClosedRead(self, f):
  219. f = self.ReopenForRead()
  220. f.read(1)
  221. @ClosedFDRaises
  222. def testErrnoOnClosedReadall(self, f):
  223. f = self.ReopenForRead()
  224. f.readall()
  225. @ClosedFDRaises
  226. def testErrnoOnClosedReadinto(self, f):
  227. f = self.ReopenForRead()
  228. a = array(b'b', b'x'*10)
  229. f.readinto(a)
  230. class OtherFileTests(unittest.TestCase):
  231. def testAbles(self):
  232. try:
  233. f = _FileIO(TESTFN, "w")
  234. self.assertEqual(f.readable(), False)
  235. self.assertEqual(f.writable(), True)
  236. self.assertEqual(f.seekable(), True)
  237. f.close()
  238. f = _FileIO(TESTFN, "r")
  239. self.assertEqual(f.readable(), True)
  240. self.assertEqual(f.writable(), False)
  241. self.assertEqual(f.seekable(), True)
  242. f.close()
  243. f = _FileIO(TESTFN, "a+")
  244. self.assertEqual(f.readable(), True)
  245. self.assertEqual(f.writable(), True)
  246. self.assertEqual(f.seekable(), True)
  247. self.assertEqual(f.isatty(), False)
  248. f.close()
  249. finally:
  250. os.unlink(TESTFN)
  251. @unittest.skipIf(sys.platform == 'win32', 'no ttys on Windows')
  252. def testAblesOnTTY(self):
  253. try:
  254. f = _FileIO("/dev/tty", "a")
  255. except EnvironmentError:
  256. # When run in a cron job there just aren't any
  257. # ttys, so skip the test. This also handles other
  258. # OS'es that don't support /dev/tty.
  259. self.skipTest('need /dev/tty')
  260. else:
  261. self.assertEqual(f.readable(), False)
  262. self.assertEqual(f.writable(), True)
  263. if sys.platform != "darwin" and \
  264. 'bsd' not in sys.platform and \
  265. not sys.platform.startswith('sunos'):
  266. # Somehow /dev/tty appears seekable on some BSDs
  267. self.assertEqual(f.seekable(), False)
  268. self.assertEqual(f.isatty(), True)
  269. f.close()
  270. def testInvalidModeStrings(self):
  271. # check invalid mode strings
  272. for mode in ("", "aU", "wU+", "rw", "rt"):
  273. try:
  274. f = _FileIO(TESTFN, mode)
  275. except ValueError:
  276. pass
  277. else:
  278. f.close()
  279. self.fail('%r is an invalid file mode' % mode)
  280. def testModeStrings(self):
  281. # test that the mode attribute is correct for various mode strings
  282. # given as init args
  283. try:
  284. for modes in [('w', 'wb'), ('wb', 'wb'), ('wb+', 'rb+'),
  285. ('w+b', 'rb+'), ('a', 'ab'), ('ab', 'ab'),
  286. ('ab+', 'ab+'), ('a+b', 'ab+'), ('r', 'rb'),
  287. ('rb', 'rb'), ('rb+', 'rb+'), ('r+b', 'rb+')]:
  288. # read modes are last so that TESTFN will exist first
  289. with _FileIO(TESTFN, modes[0]) as f:
  290. self.assertEqual(f.mode, modes[1])
  291. finally:
  292. if os.path.exists(TESTFN):
  293. os.unlink(TESTFN)
  294. def testUnicodeOpen(self):
  295. # verify repr works for unicode too
  296. f = _FileIO(str(TESTFN), "w")
  297. f.close()
  298. os.unlink(TESTFN)
  299. def testBytesOpen(self):
  300. # Opening a bytes filename
  301. try:
  302. fn = TESTFN.encode("ascii")
  303. except UnicodeEncodeError:
  304. self.skipTest('could not encode %r to ascii' % TESTFN)
  305. f = _FileIO(fn, "w")
  306. try:
  307. f.write(b"abc")
  308. f.close()
  309. with open(TESTFN, "rb") as f:
  310. self.assertEqual(f.read(), b"abc")
  311. finally:
  312. os.unlink(TESTFN)
  313. def testInvalidFd(self):
  314. self.assertRaises(ValueError, _FileIO, -10)
  315. self.assertRaises(OSError, _FileIO, make_bad_fd())
  316. if sys.platform == 'win32':
  317. import msvcrt
  318. self.assertRaises(IOError, msvcrt.get_osfhandle, make_bad_fd())
  319. @cpython_only
  320. def testInvalidFd_overflow(self):
  321. # Issue 15989
  322. import _testcapi
  323. self.assertRaises(TypeError, _FileIO, _testcapi.INT_MAX + 1)
  324. self.assertRaises(TypeError, _FileIO, _testcapi.INT_MIN - 1)
  325. def testBadModeArgument(self):
  326. # verify that we get a sensible error message for bad mode argument
  327. bad_mode = "qwerty"
  328. try:
  329. f = _FileIO(TESTFN, bad_mode)
  330. except ValueError as msg:
  331. if msg.args[0] != 0:
  332. s = str(msg)
  333. if TESTFN in s or bad_mode not in s:
  334. self.fail("bad error message for invalid mode: %s" % s)
  335. # if msg.args[0] == 0, we're probably on Windows where there may be
  336. # no obvious way to discover why open() failed.
  337. else:
  338. f.close()
  339. self.fail("no error for invalid mode: %s" % bad_mode)
  340. def testTruncate(self):
  341. f = _FileIO(TESTFN, 'w')
  342. f.write(bytes(bytearray(range(10))))
  343. self.assertEqual(f.tell(), 10)
  344. f.truncate(5)
  345. self.assertEqual(f.tell(), 10)
  346. self.assertEqual(f.seek(0, os.SEEK_END), 5)
  347. f.truncate(15)
  348. self.assertEqual(f.tell(), 5)
  349. self.assertEqual(f.seek(0, os.SEEK_END), 15)
  350. f.close()
  351. def testTruncateOnWindows(self):
  352. def bug801631():
  353. # SF bug <http://www.python.org/sf/801631>
  354. # "file.truncate fault on windows"
  355. f = _FileIO(TESTFN, 'w')
  356. f.write(bytes(range(11)))
  357. f.close()
  358. f = _FileIO(TESTFN,'r+')
  359. data = f.read(5)
  360. if data != bytes(range(5)):
  361. self.fail("Read on file opened for update failed %r" % data)
  362. if f.tell() != 5:
  363. self.fail("File pos after read wrong %d" % f.tell())
  364. f.truncate()
  365. if f.tell() != 5:
  366. self.fail("File pos after ftruncate wrong %d" % f.tell())
  367. f.close()
  368. size = os.path.getsize(TESTFN)
  369. if size != 5:
  370. self.fail("File size after ftruncate wrong %d" % size)
  371. try:
  372. bug801631()
  373. finally:
  374. os.unlink(TESTFN)
  375. def testAppend(self):
  376. try:
  377. f = open(TESTFN, 'wb')
  378. f.write(b'spam')
  379. f.close()
  380. f = open(TESTFN, 'ab')
  381. f.write(b'eggs')
  382. f.close()
  383. f = open(TESTFN, 'rb')
  384. d = f.read()
  385. f.close()
  386. self.assertEqual(d, b'spameggs')
  387. finally:
  388. try:
  389. os.unlink(TESTFN)
  390. except:
  391. pass
  392. def testInvalidInit(self):
  393. self.assertRaises(TypeError, _FileIO, "1", 0, 0)
  394. def testWarnings(self):
  395. with check_warnings(quiet=True) as w:
  396. self.assertEqual(w.warnings, [])
  397. self.assertRaises(TypeError, _FileIO, [])
  398. self.assertEqual(w.warnings, [])
  399. self.assertRaises(ValueError, _FileIO, "/some/invalid/name", "rt")
  400. self.assertEqual(w.warnings, [])
  401. def test_surrogates(self):
  402. # Issue #8438: try to open a filename containing surrogates.
  403. # It should either fail because the file doesn't exist or the filename
  404. # can't be represented using the filesystem encoding, but not because
  405. # of a LookupError for the error handler "surrogateescape".
  406. filename = u'\udc80.txt'
  407. try:
  408. with _FileIO(filename):
  409. pass
  410. except (UnicodeEncodeError, IOError):
  411. pass
  412. # Spawn a separate Python process with a different "file system
  413. # default encoding", to exercise this further.
  414. env = dict(os.environ)
  415. env[b'LC_CTYPE'] = b'C'
  416. _, out = run_python('-c', 'import _io; _io.FileIO(%r)' % filename, env=env)
  417. if ('UnicodeEncodeError' not in out and not
  418. ( ('IOError: [Errno 2] No such file or directory' in out) or
  419. ('IOError: [Errno 22] Invalid argument' in out) ) ):
  420. self.fail('Bad output: %r' % out)
  421. def testUnclosedFDOnException(self):
  422. class MyException(Exception): pass
  423. class MyFileIO(_FileIO):
  424. def __setattr__(self, name, value):
  425. if name == "name":
  426. raise MyException("blocked setting name")
  427. return super(MyFileIO, self).__setattr__(name, value)
  428. fd = os.open(__file__, os.O_RDONLY)
  429. self.assertRaises(MyException, MyFileIO, fd)
  430. os.close(fd) # should not raise OSError(EBADF)
  431. def test_main():
  432. # Historically, these tests have been sloppy about removing TESTFN.
  433. # So get rid of it no matter what.
  434. try:
  435. run_unittest(AutoFileTests, OtherFileTests)
  436. finally:
  437. if os.path.exists(TESTFN):
  438. os.unlink(TESTFN)
  439. if __name__ == '__main__':
  440. test_main()