test_io.py 119 KB


  1. """Unit tests for the io module."""
  2. # Tests of io are scattered over the test suite:
  3. # * test_bufio - tests file buffering
  4. # * test_memoryio - tests BytesIO and StringIO
  5. # * test_fileio - tests FileIO
  6. # * test_file - tests the file interface
  7. # * test_io - tests everything else in the io module
  8. # * test_univnewlines - tests universal newline support
  9. # * test_largefile - tests operations on a file greater than 2**32 bytes
  10. # (only enabled with -ulargefile)
  11. ################################################################################
  12. # ATTENTION TEST WRITERS!!!
  13. ################################################################################
  14. # When writing tests for io, it's important to test both the C and Python
  15. # implementations. This is usually done by writing a base test that refers to
  16. # the type it is testing as an attribute. Then it provides custom subclasses to
  17. # test both implementations. This file has lots of examples.
  18. ################################################################################
  19. from __future__ import print_function
  20. from __future__ import unicode_literals
  21. import os
  22. import sys
  23. import time
  24. import array
  25. import random
  26. import unittest
  27. import weakref
  28. import warnings
  29. import abc
  30. import signal
  31. import errno
  32. from itertools import cycle, count
  33. from collections import deque
  34. from UserList import UserList
  35. from test import test_support as support
  36. import contextlib
  37. import codecs
  38. import io # C implementation of io
  39. import _pyio as pyio # Python implementation of io
  40. try:
  41. import threading
  42. except ImportError:
  43. threading = None
  44. try:
  45. import fcntl
  46. except ImportError:
  47. fcntl = None
  48. __metaclass__ = type
  49. bytes = support.py3k_bytes
  50. def byteslike(*pos, **kw):
  51. return memoryview(bytearray(*pos, **kw))
  52. def _default_chunk_size():
  53. """Get the default TextIOWrapper chunk size"""
  54. with io.open(__file__, "r", encoding="latin1") as f:
  55. return f._CHUNK_SIZE
  56. class MockRawIOWithoutRead:
  57. """A RawIO implementation without read(), so as to exercise the default
  58. RawIO.read() which calls readinto()."""
  59. def __init__(self, read_stack=()):
  60. self._read_stack = list(read_stack)
  61. self._write_stack = []
  62. self._reads = 0
  63. self._extraneous_reads = 0
  64. def write(self, b):
  65. self._write_stack.append(bytes(b))
  66. return len(b)
  67. def writable(self):
  68. return True
  69. def fileno(self):
  70. return 42
  71. def readable(self):
  72. return True
  73. def seekable(self):
  74. return True
  75. def seek(self, pos, whence):
  76. return 0 # wrong but we gotta return something
  77. def tell(self):
  78. return 0 # same comment as above
  79. def readinto(self, buf):
  80. self._reads += 1
  81. max_len = len(buf)
  82. try:
  83. data = self._read_stack[0]
  84. except IndexError:
  85. self._extraneous_reads += 1
  86. return 0
  87. if data is None:
  88. del self._read_stack[0]
  89. return None
  90. n = len(data)
  91. if len(data) <= max_len:
  92. del self._read_stack[0]
  93. buf[:n] = data
  94. return n
  95. else:
  96. buf[:] = data[:max_len]
  97. self._read_stack[0] = data[max_len:]
  98. return max_len
  99. def truncate(self, pos=None):
  100. return pos
  101. class CMockRawIOWithoutRead(MockRawIOWithoutRead, io.RawIOBase):
  102. pass
  103. class PyMockRawIOWithoutRead(MockRawIOWithoutRead, pyio.RawIOBase):
  104. pass
  105. class MockRawIO(MockRawIOWithoutRead):
  106. def read(self, n=None):
  107. self._reads += 1
  108. try:
  109. return self._read_stack.pop(0)
  110. except:
  111. self._extraneous_reads += 1
  112. return b""
  113. class CMockRawIO(MockRawIO, io.RawIOBase):
  114. pass
  115. class PyMockRawIO(MockRawIO, pyio.RawIOBase):
  116. pass
  117. class MisbehavedRawIO(MockRawIO):
  118. def write(self, b):
  119. return MockRawIO.write(self, b) * 2
  120. def read(self, n=None):
  121. return MockRawIO.read(self, n) * 2
  122. def seek(self, pos, whence):
  123. return -123
  124. def tell(self):
  125. return -456
  126. def readinto(self, buf):
  127. MockRawIO.readinto(self, buf)
  128. return len(buf) * 5
  129. class CMisbehavedRawIO(MisbehavedRawIO, io.RawIOBase):
  130. pass
  131. class PyMisbehavedRawIO(MisbehavedRawIO, pyio.RawIOBase):
  132. pass
  133. class CloseFailureIO(MockRawIO):
  134. closed = 0
  135. def close(self):
  136. if not self.closed:
  137. self.closed = 1
  138. raise IOError
  139. class CCloseFailureIO(CloseFailureIO, io.RawIOBase):
  140. pass
  141. class PyCloseFailureIO(CloseFailureIO, pyio.RawIOBase):
  142. pass
  143. class MockFileIO:
  144. def __init__(self, data):
  145. self.read_history = []
  146. super(MockFileIO, self).__init__(data)
  147. def read(self, n=None):
  148. res = super(MockFileIO, self).read(n)
  149. self.read_history.append(None if res is None else len(res))
  150. return res
  151. def readinto(self, b):
  152. res = super(MockFileIO, self).readinto(b)
  153. self.read_history.append(res)
  154. return res
  155. class CMockFileIO(MockFileIO, io.BytesIO):
  156. pass
  157. class PyMockFileIO(MockFileIO, pyio.BytesIO):
  158. pass
  159. class MockNonBlockWriterIO:
  160. def __init__(self):
  161. self._write_stack = []
  162. self._blocker_char = None
  163. def pop_written(self):
  164. s = b"".join(self._write_stack)
  165. self._write_stack[:] = []
  166. return s
  167. def block_on(self, char):
  168. """Block when a given char is encountered."""
  169. self._blocker_char = char
  170. def readable(self):
  171. return True
  172. def seekable(self):
  173. return True
  174. def writable(self):
  175. return True
  176. def write(self, b):
  177. b = bytes(b)
  178. n = -1
  179. if self._blocker_char:
  180. try:
  181. n = b.index(self._blocker_char)
  182. except ValueError:
  183. pass
  184. else:
  185. if n > 0:
  186. # write data up to the first blocker
  187. self._write_stack.append(b[:n])
  188. return n
  189. else:
  190. # cancel blocker and indicate would block
  191. self._blocker_char = None
  192. return None
  193. self._write_stack.append(b)
  194. return len(b)
  195. class CMockNonBlockWriterIO(MockNonBlockWriterIO, io.RawIOBase):
  196. BlockingIOError = io.BlockingIOError
  197. class PyMockNonBlockWriterIO(MockNonBlockWriterIO, pyio.RawIOBase):
  198. BlockingIOError = pyio.BlockingIOError
  199. class IOTest(unittest.TestCase):
  200. def setUp(self):
  201. support.unlink(support.TESTFN)
  202. def tearDown(self):
  203. support.unlink(support.TESTFN)
  204. def write_ops(self, f):
  205. self.assertEqual(f.write(b"blah."), 5)
  206. f.truncate(0)
  207. self.assertEqual(f.tell(), 5)
  208. f.seek(0)
  209. self.assertEqual(f.write(b"blah."), 5)
  210. self.assertEqual(f.seek(0), 0)
  211. self.assertEqual(f.write(b"Hello."), 6)
  212. self.assertEqual(f.tell(), 6)
  213. self.assertEqual(f.seek(-1, 1), 5)
  214. self.assertEqual(f.tell(), 5)
  215. buffer = bytearray(b" world\n\n\n")
  216. self.assertEqual(f.write(buffer), 9)
  217. buffer[:] = b"*" * 9 # Overwrite our copy of the data
  218. self.assertEqual(f.seek(0), 0)
  219. self.assertEqual(f.write(b"h"), 1)
  220. self.assertEqual(f.seek(-1, 2), 13)
  221. self.assertEqual(f.tell(), 13)
  222. self.assertEqual(f.truncate(12), 12)
  223. self.assertEqual(f.tell(), 13)
  224. self.assertRaises(TypeError, f.seek, 0.0)
  225. def read_ops(self, f, buffered=False):
  226. data = f.read(5)
  227. self.assertEqual(data, b"hello")
  228. data = byteslike(data)
  229. self.assertEqual(f.readinto(data), 5)
  230. self.assertEqual(data.tobytes(), b" worl")
  231. data = bytearray(5)
  232. self.assertEqual(f.readinto(data), 2)
  233. self.assertEqual(len(data), 5)
  234. self.assertEqual(data[:2], b"d\n")
  235. self.assertEqual(f.seek(0), 0)
  236. self.assertEqual(f.read(20), b"hello world\n")
  237. self.assertEqual(f.read(1), b"")
  238. self.assertEqual(f.readinto(byteslike(b"x")), 0)
  239. self.assertEqual(f.seek(-6, 2), 6)
  240. self.assertEqual(f.read(5), b"world")
  241. self.assertEqual(f.read(0), b"")
  242. self.assertEqual(f.readinto(byteslike()), 0)
  243. self.assertEqual(f.seek(-6, 1), 5)
  244. self.assertEqual(f.read(5), b" worl")
  245. self.assertEqual(f.tell(), 10)
  246. self.assertRaises(TypeError, f.seek, 0.0)
  247. if buffered:
  248. f.seek(0)
  249. self.assertEqual(f.read(), b"hello world\n")
  250. f.seek(6)
  251. self.assertEqual(f.read(), b"world\n")
  252. self.assertEqual(f.read(), b"")
  253. LARGE = 2**31
  254. def large_file_ops(self, f):
  255. assert f.readable()
  256. assert f.writable()
  257. self.assertEqual(f.seek(self.LARGE), self.LARGE)
  258. self.assertEqual(f.tell(), self.LARGE)
  259. self.assertEqual(f.write(b"xxx"), 3)
  260. self.assertEqual(f.tell(), self.LARGE + 3)
  261. self.assertEqual(f.seek(-1, 1), self.LARGE + 2)
  262. self.assertEqual(f.truncate(), self.LARGE + 2)
  263. self.assertEqual(f.tell(), self.LARGE + 2)
  264. self.assertEqual(f.seek(0, 2), self.LARGE + 2)
  265. self.assertEqual(f.truncate(self.LARGE + 1), self.LARGE + 1)
  266. self.assertEqual(f.tell(), self.LARGE + 2)
  267. self.assertEqual(f.seek(0, 2), self.LARGE + 1)
  268. self.assertEqual(f.seek(-1, 2), self.LARGE)
  269. self.assertEqual(f.read(2), b"x")
  270. def test_invalid_operations(self):
  271. # Try writing on a file opened in read mode and vice-versa.
  272. for mode in ("w", "wb"):
  273. with self.open(support.TESTFN, mode) as fp:
  274. self.assertRaises(IOError, fp.read)
  275. self.assertRaises(IOError, fp.readline)
  276. with self.open(support.TESTFN, "rb") as fp:
  277. self.assertRaises(IOError, fp.write, b"blah")
  278. self.assertRaises(IOError, fp.writelines, [b"blah\n"])
  279. with self.open(support.TESTFN, "r") as fp:
  280. self.assertRaises(IOError, fp.write, "blah")
  281. self.assertRaises(IOError, fp.writelines, ["blah\n"])
  282. def test_raw_file_io(self):
  283. with self.open(support.TESTFN, "wb", buffering=0) as f:
  284. self.assertEqual(f.readable(), False)
  285. self.assertEqual(f.writable(), True)
  286. self.assertEqual(f.seekable(), True)
  287. self.write_ops(f)
  288. with self.open(support.TESTFN, "rb", buffering=0) as f:
  289. self.assertEqual(f.readable(), True)
  290. self.assertEqual(f.writable(), False)
  291. self.assertEqual(f.seekable(), True)
  292. self.read_ops(f)
  293. def test_buffered_file_io(self):
  294. with self.open(support.TESTFN, "wb") as f:
  295. self.assertEqual(f.readable(), False)
  296. self.assertEqual(f.writable(), True)
  297. self.assertEqual(f.seekable(), True)
  298. self.write_ops(f)
  299. with self.open(support.TESTFN, "rb") as f:
  300. self.assertEqual(f.readable(), True)
  301. self.assertEqual(f.writable(), False)
  302. self.assertEqual(f.seekable(), True)
  303. self.read_ops(f, True)
  304. def test_readline(self):
  305. with self.open(support.TESTFN, "wb") as f:
  306. f.write(b"abc\ndef\nxyzzy\nfoo\x00bar\nanother line")
  307. with self.open(support.TESTFN, "rb") as f:
  308. self.assertEqual(f.readline(), b"abc\n")
  309. self.assertEqual(f.readline(10), b"def\n")
  310. self.assertEqual(f.readline(2), b"xy")
  311. self.assertEqual(f.readline(4), b"zzy\n")
  312. self.assertEqual(f.readline(), b"foo\x00bar\n")
  313. self.assertEqual(f.readline(None), b"another line")
  314. self.assertRaises(TypeError, f.readline, 5.3)
  315. with self.open(support.TESTFN, "r") as f:
  316. self.assertRaises(TypeError, f.readline, 5.3)
  317. def test_raw_bytes_io(self):
  318. f = self.BytesIO()
  319. self.write_ops(f)
  320. data = f.getvalue()
  321. self.assertEqual(data, b"hello world\n")
  322. f = self.BytesIO(data)
  323. self.read_ops(f, True)
  324. def test_large_file_ops(self):
  325. # On Windows and Mac OSX this test comsumes large resources; It takes
  326. # a long time to build the >2GB file and takes >2GB of disk space
  327. # therefore the resource must be enabled to run this test.
  328. if sys.platform[:3] == 'win' or sys.platform == 'darwin':
  329. support.requires(
  330. 'largefile',
  331. 'test requires %s bytes and a long time to run' % self.LARGE)
  332. with self.open(support.TESTFN, "w+b", 0) as f:
  333. self.large_file_ops(f)
  334. with self.open(support.TESTFN, "w+b") as f:
  335. self.large_file_ops(f)
  336. def test_with_open(self):
  337. for bufsize in (0, 1, 100):
  338. f = None
  339. with self.open(support.TESTFN, "wb", bufsize) as f:
  340. f.write(b"xxx")
  341. self.assertEqual(f.closed, True)
  342. f = None
  343. try:
  344. with self.open(support.TESTFN, "wb", bufsize) as f:
  345. 1 // 0
  346. except ZeroDivisionError:
  347. self.assertEqual(f.closed, True)
  348. else:
  349. self.fail("1 // 0 didn't raise an exception")
  350. # issue 5008
  351. def test_append_mode_tell(self):
  352. with self.open(support.TESTFN, "wb") as f:
  353. f.write(b"xxx")
  354. with self.open(support.TESTFN, "ab", buffering=0) as f:
  355. self.assertEqual(f.tell(), 3)
  356. with self.open(support.TESTFN, "ab") as f:
  357. self.assertEqual(f.tell(), 3)
  358. with self.open(support.TESTFN, "a") as f:
  359. self.assertGreater(f.tell(), 0)
  360. def test_destructor(self):
  361. record = []
  362. class MyFileIO(self.FileIO):
  363. def __del__(self):
  364. record.append(1)
  365. try:
  366. f = super(MyFileIO, self).__del__
  367. except AttributeError:
  368. pass
  369. else:
  370. f()
  371. def close(self):
  372. record.append(2)
  373. super(MyFileIO, self).close()
  374. def flush(self):
  375. record.append(3)
  376. super(MyFileIO, self).flush()
  377. f = MyFileIO(support.TESTFN, "wb")
  378. f.write(b"xxx")
  379. del f
  380. support.gc_collect()
  381. self.assertEqual(record, [1, 2, 3])
  382. with self.open(support.TESTFN, "rb") as f:
  383. self.assertEqual(f.read(), b"xxx")
  384. def _check_base_destructor(self, base):
  385. record = []
  386. class MyIO(base):
  387. def __init__(self):
  388. # This exercises the availability of attributes on object
  389. # destruction.
  390. # (in the C version, close() is called by the tp_dealloc
  391. # function, not by __del__)
  392. self.on_del = 1
  393. self.on_close = 2
  394. self.on_flush = 3
  395. def __del__(self):
  396. record.append(self.on_del)
  397. try:
  398. f = super(MyIO, self).__del__
  399. except AttributeError:
  400. pass
  401. else:
  402. f()
  403. def close(self):
  404. record.append(self.on_close)
  405. super(MyIO, self).close()
  406. def flush(self):
  407. record.append(self.on_flush)
  408. super(MyIO, self).flush()
  409. f = MyIO()
  410. del f
  411. support.gc_collect()
  412. self.assertEqual(record, [1, 2, 3])
  413. def test_IOBase_destructor(self):
  414. self._check_base_destructor(self.IOBase)
  415. def test_RawIOBase_destructor(self):
  416. self._check_base_destructor(self.RawIOBase)
  417. def test_BufferedIOBase_destructor(self):
  418. self._check_base_destructor(self.BufferedIOBase)
  419. def test_TextIOBase_destructor(self):
  420. self._check_base_destructor(self.TextIOBase)
  421. def test_close_flushes(self):
  422. with self.open(support.TESTFN, "wb") as f:
  423. f.write(b"xxx")
  424. with self.open(support.TESTFN, "rb") as f:
  425. self.assertEqual(f.read(), b"xxx")
  426. def test_array_writes(self):
  427. a = array.array(b'i', range(10))
  428. n = len(a.tostring())
  429. with self.open(support.TESTFN, "wb", 0) as f:
  430. self.assertEqual(f.write(a), n)
  431. with self.open(support.TESTFN, "wb") as f:
  432. self.assertEqual(f.write(a), n)
  433. def test_closefd(self):
  434. self.assertRaises(ValueError, self.open, support.TESTFN, 'w',
  435. closefd=False)
  436. def test_read_closed(self):
  437. with self.open(support.TESTFN, "w") as f:
  438. f.write("egg\n")
  439. with self.open(support.TESTFN, "r") as f:
  440. file = self.open(f.fileno(), "r", closefd=False)
  441. self.assertEqual(file.read(), "egg\n")
  442. file.seek(0)
  443. file.close()
  444. self.assertRaises(ValueError, file.read)
  445. def test_no_closefd_with_filename(self):
  446. # can't use closefd in combination with a file name
  447. self.assertRaises(ValueError, self.open, support.TESTFN, "r", closefd=False)
  448. def test_closefd_attr(self):
  449. with self.open(support.TESTFN, "wb") as f:
  450. f.write(b"egg\n")
  451. with self.open(support.TESTFN, "r") as f:
  452. self.assertEqual(f.buffer.raw.closefd, True)
  453. file = self.open(f.fileno(), "r", closefd=False)
  454. self.assertEqual(file.buffer.raw.closefd, False)
  455. def test_garbage_collection(self):
  456. # FileIO objects are collected, and collecting them flushes
  457. # all data to disk.
  458. f = self.FileIO(support.TESTFN, "wb")
  459. f.write(b"abcxxx")
  460. f.f = f
  461. wr = weakref.ref(f)
  462. del f
  463. support.gc_collect()
  464. self.assertIsNone(wr(), wr)
  465. with self.open(support.TESTFN, "rb") as f:
  466. self.assertEqual(f.read(), b"abcxxx")
  467. def test_unbounded_file(self):
  468. # Issue #1174606: reading from an unbounded stream such as /dev/zero.
  469. zero = "/dev/zero"
  470. if not os.path.exists(zero):
  471. self.skipTest("{0} does not exist".format(zero))
  472. if sys.maxsize > 0x7FFFFFFF:
  473. self.skipTest("test can only run in a 32-bit address space")
  474. if support.real_max_memuse < support._2G:
  475. self.skipTest("test requires at least 2GB of memory")
  476. with self.open(zero, "rb", buffering=0) as f:
  477. self.assertRaises(OverflowError, f.read)
  478. with self.open(zero, "rb") as f:
  479. self.assertRaises(OverflowError, f.read)
  480. with self.open(zero, "r") as f:
  481. self.assertRaises(OverflowError, f.read)
  482. def check_flush_error_on_close(self, *args, **kwargs):
  483. # Test that the file is closed despite failed flush
  484. # and that flush() is called before file closed.
  485. f = self.open(*args, **kwargs)
  486. closed = []
  487. def bad_flush():
  488. closed[:] = [f.closed]
  489. raise IOError()
  490. f.flush = bad_flush
  491. self.assertRaises(IOError, f.close) # exception not swallowed
  492. self.assertTrue(f.closed)
  493. self.assertTrue(closed) # flush() called
  494. self.assertFalse(closed[0]) # flush() called before file closed
  495. f.flush = lambda: None # break reference loop
  496. def test_flush_error_on_close(self):
  497. # raw file
  498. # Issue #5700: io.FileIO calls flush() after file closed
  499. self.check_flush_error_on_close(support.TESTFN, 'wb', buffering=0)
  500. fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
  501. self.check_flush_error_on_close(fd, 'wb', buffering=0)
  502. fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
  503. self.check_flush_error_on_close(fd, 'wb', buffering=0, closefd=False)
  504. os.close(fd)
  505. # buffered io
  506. self.check_flush_error_on_close(support.TESTFN, 'wb')
  507. fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
  508. self.check_flush_error_on_close(fd, 'wb')
  509. fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
  510. self.check_flush_error_on_close(fd, 'wb', closefd=False)
  511. os.close(fd)
  512. # text io
  513. self.check_flush_error_on_close(support.TESTFN, 'w')
  514. fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
  515. self.check_flush_error_on_close(fd, 'w')
  516. fd = os.open(support.TESTFN, os.O_WRONLY|os.O_CREAT)
  517. self.check_flush_error_on_close(fd, 'w', closefd=False)
  518. os.close(fd)
  519. def test_multi_close(self):
  520. f = self.open(support.TESTFN, "wb", buffering=0)
  521. f.close()
  522. f.close()
  523. f.close()
  524. self.assertRaises(ValueError, f.flush)
  525. def test_RawIOBase_read(self):
  526. # Exercise the default RawIOBase.read() implementation (which calls
  527. # readinto() internally).
  528. rawio = self.MockRawIOWithoutRead((b"abc", b"d", None, b"efg", None))
  529. self.assertEqual(rawio.read(2), b"ab")
  530. self.assertEqual(rawio.read(2), b"c")
  531. self.assertEqual(rawio.read(2), b"d")
  532. self.assertEqual(rawio.read(2), None)
  533. self.assertEqual(rawio.read(2), b"ef")
  534. self.assertEqual(rawio.read(2), b"g")
  535. self.assertEqual(rawio.read(2), None)
  536. self.assertEqual(rawio.read(2), b"")
  537. def test_fileio_closefd(self):
  538. # Issue #4841
  539. with self.open(__file__, 'rb') as f1, \
  540. self.open(__file__, 'rb') as f2:
  541. fileio = self.FileIO(f1.fileno(), closefd=False)
  542. # .__init__() must not close f1
  543. fileio.__init__(f2.fileno(), closefd=False)
  544. f1.readline()
  545. # .close() must not close f2
  546. fileio.close()
  547. f2.readline()
  548. def test_nonbuffered_textio(self):
  549. with warnings.catch_warnings(record=True) as recorded:
  550. with self.assertRaises(ValueError):
  551. self.open(support.TESTFN, 'w', buffering=0)
  552. support.gc_collect()
  553. self.assertEqual(recorded, [])
  554. def test_invalid_newline(self):
  555. with warnings.catch_warnings(record=True) as recorded:
  556. with self.assertRaises(ValueError):
  557. self.open(support.TESTFN, 'w', newline='invalid')
  558. support.gc_collect()
  559. self.assertEqual(recorded, [])
  560. def test_buffered_readinto_mixin(self):
  561. # Test the implementation provided by BufferedIOBase
  562. class Stream(self.BufferedIOBase):
  563. def read(self, size):
  564. return b"12345"
  565. stream = Stream()
  566. buffer = byteslike(5)
  567. self.assertEqual(stream.readinto(buffer), 5)
  568. self.assertEqual(buffer.tobytes(), b"12345")
  569. class CIOTest(IOTest):
  570. def test_IOBase_finalize(self):
  571. # Issue #12149: segmentation fault on _PyIOBase_finalize when both a
  572. # class which inherits IOBase and an object of this class are caught
  573. # in a reference cycle and close() is already in the method cache.
  574. class MyIO(self.IOBase):
  575. def close(self):
  576. pass
  577. # create an instance to populate the method cache
  578. MyIO()
  579. obj = MyIO()
  580. obj.obj = obj
  581. wr = weakref.ref(obj)
  582. del MyIO
  583. del obj
  584. support.gc_collect()
  585. self.assertIsNone(wr(), wr)
  586. class PyIOTest(IOTest):
  587. test_array_writes = unittest.skip(
  588. "len(array.array) returns number of elements rather than bytelength"
  589. )(IOTest.test_array_writes)
  590. class CommonBufferedTests:
  591. # Tests common to BufferedReader, BufferedWriter and BufferedRandom
  592. def test_detach(self):
  593. raw = self.MockRawIO()
  594. buf = self.tp(raw)
  595. self.assertIs(buf.detach(), raw)
  596. self.assertRaises(ValueError, buf.detach)
  597. repr(buf) # Should still work
  598. def test_fileno(self):
  599. rawio = self.MockRawIO()
  600. bufio = self.tp(rawio)
  601. self.assertEqual(42, bufio.fileno())
  602. def test_invalid_args(self):
  603. rawio = self.MockRawIO()
  604. bufio = self.tp(rawio)
  605. # Invalid whence
  606. self.assertRaises(ValueError, bufio.seek, 0, -1)
  607. self.assertRaises(ValueError, bufio.seek, 0, 3)
  608. def test_override_destructor(self):
  609. tp = self.tp
  610. record = []
  611. class MyBufferedIO(tp):
  612. def __del__(self):
  613. record.append(1)
  614. try:
  615. f = super(MyBufferedIO, self).__del__
  616. except AttributeError:
  617. pass
  618. else:
  619. f()
  620. def close(self):
  621. record.append(2)
  622. super(MyBufferedIO, self).close()
  623. def flush(self):
  624. record.append(3)
  625. super(MyBufferedIO, self).flush()
  626. rawio = self.MockRawIO()
  627. bufio = MyBufferedIO(rawio)
  628. writable = bufio.writable()
  629. del bufio
  630. support.gc_collect()
  631. if writable:
  632. self.assertEqual(record, [1, 2, 3])
  633. else:
  634. self.assertEqual(record, [1, 2])
  635. def test_context_manager(self):
  636. # Test usability as a context manager
  637. rawio = self.MockRawIO()
  638. bufio = self.tp(rawio)
  639. def _with():
  640. with bufio:
  641. pass
  642. _with()
  643. # bufio should now be closed, and using it a second time should raise
  644. # a ValueError.
  645. self.assertRaises(ValueError, _with)
  646. def test_error_through_destructor(self):
  647. # Test that the exception state is not modified by a destructor,
  648. # even if close() fails.
  649. rawio = self.CloseFailureIO()
  650. def f():
  651. self.tp(rawio).xyzzy
  652. with support.captured_output("stderr") as s:
  653. self.assertRaises(AttributeError, f)
  654. s = s.getvalue().strip()
  655. if s:
  656. # The destructor *may* have printed an unraisable error, check it
  657. self.assertEqual(len(s.splitlines()), 1)
  658. self.assertTrue(s.startswith("Exception IOError: "), s)
  659. self.assertTrue(s.endswith(" ignored"), s)
  660. def test_repr(self):
  661. raw = self.MockRawIO()
  662. b = self.tp(raw)
  663. clsname = "%s.%s" % (self.tp.__module__, self.tp.__name__)
  664. self.assertEqual(repr(b), "<%s>" % clsname)
  665. raw.name = "dummy"
  666. self.assertEqual(repr(b), "<%s name=u'dummy'>" % clsname)
  667. raw.name = b"dummy"
  668. self.assertEqual(repr(b), "<%s name='dummy'>" % clsname)
  669. def test_flush_error_on_close(self):
  670. # Test that buffered file is closed despite failed flush
  671. # and that flush() is called before file closed.
  672. raw = self.MockRawIO()
  673. closed = []
  674. def bad_flush():
  675. closed[:] = [b.closed, raw.closed]
  676. raise IOError()
  677. raw.flush = bad_flush
  678. b = self.tp(raw)
  679. self.assertRaises(IOError, b.close) # exception not swallowed
  680. self.assertTrue(b.closed)
  681. self.assertTrue(raw.closed)
  682. self.assertTrue(closed) # flush() called
  683. self.assertFalse(closed[0]) # flush() called before file closed
  684. self.assertFalse(closed[1])
  685. raw.flush = lambda: None # break reference loop
  686. def test_close_error_on_close(self):
  687. raw = self.MockRawIO()
  688. def bad_flush():
  689. raise IOError('flush')
  690. def bad_close():
  691. raise IOError('close')
  692. raw.close = bad_close
  693. b = self.tp(raw)
  694. b.flush = bad_flush
  695. with self.assertRaises(IOError) as err: # exception not swallowed
  696. b.close()
  697. self.assertEqual(err.exception.args, ('close',))
  698. self.assertFalse(b.closed)
  699. def test_multi_close(self):
  700. raw = self.MockRawIO()
  701. b = self.tp(raw)
  702. b.close()
  703. b.close()
  704. b.close()
  705. self.assertRaises(ValueError, b.flush)
  706. def test_readonly_attributes(self):
  707. raw = self.MockRawIO()
  708. buf = self.tp(raw)
  709. x = self.MockRawIO()
  710. with self.assertRaises((AttributeError, TypeError)):
  711. buf.raw = x
  712. class SizeofTest:
  713. @support.cpython_only
  714. def test_sizeof(self):
  715. bufsize1 = 4096
  716. bufsize2 = 8192
  717. rawio = self.MockRawIO()
  718. bufio = self.tp(rawio, buffer_size=bufsize1)
  719. size = sys.getsizeof(bufio) - bufsize1
  720. rawio = self.MockRawIO()
  721. bufio = self.tp(rawio, buffer_size=bufsize2)
  722. self.assertEqual(sys.getsizeof(bufio), size + bufsize2)
  723. class BufferedReaderTest(unittest.TestCase, CommonBufferedTests):
  724. read_mode = "rb"
  725. def test_constructor(self):
  726. rawio = self.MockRawIO([b"abc"])
  727. bufio = self.tp(rawio)
  728. bufio.__init__(rawio)
  729. bufio.__init__(rawio, buffer_size=1024)
  730. bufio.__init__(rawio, buffer_size=16)
  731. self.assertEqual(b"abc", bufio.read())
  732. self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
  733. self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
  734. self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
  735. rawio = self.MockRawIO([b"abc"])
  736. bufio.__init__(rawio)
  737. self.assertEqual(b"abc", bufio.read())
  738. def test_uninitialized(self):
  739. bufio = self.tp.__new__(self.tp)
  740. del bufio
  741. bufio = self.tp.__new__(self.tp)
  742. self.assertRaisesRegexp((ValueError, AttributeError),
  743. 'uninitialized|has no attribute',
  744. bufio.read, 0)
  745. bufio.__init__(self.MockRawIO())
  746. self.assertEqual(bufio.read(0), b'')
  747. def test_read(self):
  748. for arg in (None, 7):
  749. rawio = self.MockRawIO((b"abc", b"d", b"efg"))
  750. bufio = self.tp(rawio)
  751. self.assertEqual(b"abcdefg", bufio.read(arg))
  752. # Invalid args
  753. self.assertRaises(ValueError, bufio.read, -2)
  754. def test_read1(self):
  755. rawio = self.MockRawIO((b"abc", b"d", b"efg"))
  756. bufio = self.tp(rawio)
  757. self.assertEqual(b"a", bufio.read(1))
  758. self.assertEqual(b"b", bufio.read1(1))
  759. self.assertEqual(rawio._reads, 1)
  760. self.assertEqual(b"c", bufio.read1(100))
  761. self.assertEqual(rawio._reads, 1)
  762. self.assertEqual(b"d", bufio.read1(100))
  763. self.assertEqual(rawio._reads, 2)
  764. self.assertEqual(b"efg", bufio.read1(100))
  765. self.assertEqual(rawio._reads, 3)
  766. self.assertEqual(b"", bufio.read1(100))
  767. self.assertEqual(rawio._reads, 4)
  768. # Invalid args
  769. self.assertRaises(ValueError, bufio.read1, -1)
  770. def test_readinto(self):
  771. rawio = self.MockRawIO((b"abc", b"d", b"efg"))
  772. bufio = self.tp(rawio)
  773. b = bytearray(2)
  774. self.assertEqual(bufio.readinto(b), 2)
  775. self.assertEqual(b, b"ab")
  776. self.assertEqual(bufio.readinto(b), 2)
  777. self.assertEqual(b, b"cd")
  778. self.assertEqual(bufio.readinto(b), 2)
  779. self.assertEqual(b, b"ef")
  780. self.assertEqual(bufio.readinto(b), 1)
  781. self.assertEqual(b, b"gf")
  782. self.assertEqual(bufio.readinto(b), 0)
  783. self.assertEqual(b, b"gf")
  784. def test_readlines(self):
  785. def bufio():
  786. rawio = self.MockRawIO((b"abc\n", b"d\n", b"ef"))
  787. return self.tp(rawio)
  788. self.assertEqual(bufio().readlines(), [b"abc\n", b"d\n", b"ef"])
  789. self.assertEqual(bufio().readlines(5), [b"abc\n", b"d\n"])
  790. self.assertEqual(bufio().readlines(None), [b"abc\n", b"d\n", b"ef"])
  791. def test_buffering(self):
  792. data = b"abcdefghi"
  793. dlen = len(data)
  794. tests = [
  795. [ 100, [ 3, 1, 4, 8 ], [ dlen, 0 ] ],
  796. [ 100, [ 3, 3, 3], [ dlen ] ],
  797. [ 4, [ 1, 2, 4, 2 ], [ 4, 4, 1 ] ],
  798. ]
  799. for bufsize, buf_read_sizes, raw_read_sizes in tests:
  800. rawio = self.MockFileIO(data)
  801. bufio = self.tp(rawio, buffer_size=bufsize)
  802. pos = 0
  803. for nbytes in buf_read_sizes:
  804. self.assertEqual(bufio.read(nbytes), data[pos:pos+nbytes])
  805. pos += nbytes
  806. # this is mildly implementation-dependent
  807. self.assertEqual(rawio.read_history, raw_read_sizes)
  808. def test_read_non_blocking(self):
  809. # Inject some None's in there to simulate EWOULDBLOCK
  810. rawio = self.MockRawIO((b"abc", b"d", None, b"efg", None, None, None))
  811. bufio = self.tp(rawio)
  812. self.assertEqual(b"abcd", bufio.read(6))
  813. self.assertEqual(b"e", bufio.read(1))
  814. self.assertEqual(b"fg", bufio.read())
  815. self.assertEqual(b"", bufio.peek(1))
  816. self.assertIsNone(bufio.read())
  817. self.assertEqual(b"", bufio.read())
  818. rawio = self.MockRawIO((b"a", None, None))
  819. self.assertEqual(b"a", rawio.readall())
  820. self.assertIsNone(rawio.readall())
  821. def test_read_past_eof(self):
  822. rawio = self.MockRawIO((b"abc", b"d", b"efg"))
  823. bufio = self.tp(rawio)
  824. self.assertEqual(b"abcdefg", bufio.read(9000))
  825. def test_read_all(self):
  826. rawio = self.MockRawIO((b"abc", b"d", b"efg"))
  827. bufio = self.tp(rawio)
  828. self.assertEqual(b"abcdefg", bufio.read())
  829. @unittest.skipUnless(threading, 'Threading required for this test.')
  830. @support.requires_resource('cpu')
  831. def test_threads(self):
  832. try:
  833. # Write out many bytes with exactly the same number of 0's,
  834. # 1's... 255's. This will help us check that concurrent reading
  835. # doesn't duplicate or forget contents.
  836. N = 1000
  837. l = list(range(256)) * N
  838. random.shuffle(l)
  839. s = bytes(bytearray(l))
  840. with self.open(support.TESTFN, "wb") as f:
  841. f.write(s)
  842. with self.open(support.TESTFN, self.read_mode, buffering=0) as raw:
  843. bufio = self.tp(raw, 8)
  844. errors = []
  845. results = []
  846. def f():
  847. try:
  848. # Intra-buffer read then buffer-flushing read
  849. for n in cycle([1, 19]):
  850. s = bufio.read(n)
  851. if not s:
  852. break
  853. # list.append() is atomic
  854. results.append(s)
  855. except Exception as e:
  856. errors.append(e)
  857. raise
  858. threads = [threading.Thread(target=f) for x in range(20)]
  859. with support.start_threads(threads):
  860. time.sleep(0.02) # yield
  861. self.assertFalse(errors,
  862. "the following exceptions were caught: %r" % errors)
  863. s = b''.join(results)
  864. for i in range(256):
  865. c = bytes(bytearray([i]))
  866. self.assertEqual(s.count(c), N)
  867. finally:
  868. support.unlink(support.TESTFN)
  869. def test_misbehaved_io(self):
  870. rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
  871. bufio = self.tp(rawio)
  872. self.assertRaises(IOError, bufio.seek, 0)
  873. self.assertRaises(IOError, bufio.tell)
  874. def test_no_extraneous_read(self):
  875. # Issue #9550; when the raw IO object has satisfied the read request,
  876. # we should not issue any additional reads, otherwise it may block
  877. # (e.g. socket).
  878. bufsize = 16
  879. for n in (2, bufsize - 1, bufsize, bufsize + 1, bufsize * 2):
  880. rawio = self.MockRawIO([b"x" * n])
  881. bufio = self.tp(rawio, bufsize)
  882. self.assertEqual(bufio.read(n), b"x" * n)
  883. # Simple case: one raw read is enough to satisfy the request.
  884. self.assertEqual(rawio._extraneous_reads, 0,
  885. "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
  886. # A more complex case where two raw reads are needed to satisfy
  887. # the request.
  888. rawio = self.MockRawIO([b"x" * (n - 1), b"x"])
  889. bufio = self.tp(rawio, bufsize)
  890. self.assertEqual(bufio.read(n), b"x" * n)
  891. self.assertEqual(rawio._extraneous_reads, 0,
  892. "failed for {}: {} != 0".format(n, rawio._extraneous_reads))
  893. class CBufferedReaderTest(BufferedReaderTest, SizeofTest):
  894. tp = io.BufferedReader
  895. def test_constructor(self):
  896. BufferedReaderTest.test_constructor(self)
  897. # The allocation can succeed on 32-bit builds, e.g. with more
  898. # than 2GB RAM and a 64-bit kernel.
  899. if sys.maxsize > 0x7FFFFFFF:
  900. rawio = self.MockRawIO()
  901. bufio = self.tp(rawio)
  902. self.assertRaises((OverflowError, MemoryError, ValueError),
  903. bufio.__init__, rawio, sys.maxsize)
  904. def test_initialization(self):
  905. rawio = self.MockRawIO([b"abc"])
  906. bufio = self.tp(rawio)
  907. self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
  908. self.assertRaises(ValueError, bufio.read)
  909. self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
  910. self.assertRaises(ValueError, bufio.read)
  911. self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
  912. self.assertRaises(ValueError, bufio.read)
  913. def test_misbehaved_io_read(self):
  914. rawio = self.MisbehavedRawIO((b"abc", b"d", b"efg"))
  915. bufio = self.tp(rawio)
  916. # _pyio.BufferedReader seems to implement reading different, so that
  917. # checking this is not so easy.
  918. self.assertRaises(IOError, bufio.read, 10)
  919. def test_garbage_collection(self):
  920. # C BufferedReader objects are collected.
  921. # The Python version has __del__, so it ends into gc.garbage instead
  922. rawio = self.FileIO(support.TESTFN, "w+b")
  923. f = self.tp(rawio)
  924. f.f = f
  925. wr = weakref.ref(f)
  926. del f
  927. support.gc_collect()
  928. self.assertIsNone(wr(), wr)
  929. def test_args_error(self):
  930. # Issue #17275
  931. with self.assertRaisesRegexp(TypeError, "BufferedReader"):
  932. self.tp(io.BytesIO(), 1024, 1024, 1024)
  933. class PyBufferedReaderTest(BufferedReaderTest):
  934. tp = pyio.BufferedReader
  935. class BufferedWriterTest(unittest.TestCase, CommonBufferedTests):
  936. write_mode = "wb"
  937. def test_constructor(self):
  938. rawio = self.MockRawIO()
  939. bufio = self.tp(rawio)
  940. bufio.__init__(rawio)
  941. bufio.__init__(rawio, buffer_size=1024)
  942. bufio.__init__(rawio, buffer_size=16)
  943. self.assertEqual(3, bufio.write(b"abc"))
  944. bufio.flush()
  945. self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
  946. self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
  947. self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
  948. bufio.__init__(rawio)
  949. self.assertEqual(3, bufio.write(b"ghi"))
  950. bufio.flush()
  951. self.assertEqual(b"".join(rawio._write_stack), b"abcghi")
  952. def test_uninitialized(self):
  953. bufio = self.tp.__new__(self.tp)
  954. del bufio
  955. bufio = self.tp.__new__(self.tp)
  956. self.assertRaisesRegexp((ValueError, AttributeError),
  957. 'uninitialized|has no attribute',
  958. bufio.write, b'')
  959. bufio.__init__(self.MockRawIO())
  960. self.assertEqual(bufio.write(b''), 0)
  961. def test_detach_flush(self):
  962. raw = self.MockRawIO()
  963. buf = self.tp(raw)
  964. buf.write(b"howdy!")
  965. self.assertFalse(raw._write_stack)
  966. buf.detach()
  967. self.assertEqual(raw._write_stack, [b"howdy!"])
  968. def test_write(self):
  969. # Write to the buffered IO but don't overflow the buffer.
  970. writer = self.MockRawIO()
  971. bufio = self.tp(writer, 8)
  972. bufio.write(b"abc")
  973. self.assertFalse(writer._write_stack)
  974. buffer = bytearray(b"def")
  975. bufio.write(buffer)
  976. buffer[:] = b"***" # Overwrite our copy of the data
  977. bufio.flush()
  978. self.assertEqual(b"".join(writer._write_stack), b"abcdef")
  979. def test_write_overflow(self):
  980. writer = self.MockRawIO()
  981. bufio = self.tp(writer, 8)
  982. contents = b"abcdefghijklmnop"
  983. for n in range(0, len(contents), 3):
  984. bufio.write(contents[n:n+3])
  985. flushed = b"".join(writer._write_stack)
  986. # At least (total - 8) bytes were implicitly flushed, perhaps more
  987. # depending on the implementation.
  988. self.assertTrue(flushed.startswith(contents[:-8]), flushed)
  989. def check_writes(self, intermediate_func):
  990. # Lots of writes, test the flushed output is as expected.
  991. contents = bytes(range(256)) * 1000
  992. n = 0
  993. writer = self.MockRawIO()
  994. bufio = self.tp(writer, 13)
  995. # Generator of write sizes: repeat each N 15 times then proceed to N+1
  996. def gen_sizes():
  997. for size in count(1):
  998. for i in range(15):
  999. yield size
  1000. sizes = gen_sizes()
  1001. while n < len(contents):
  1002. size = min(next(sizes), len(contents) - n)
  1003. self.assertEqual(bufio.write(contents[n:n+size]), size)
  1004. intermediate_func(bufio)
  1005. n += size
  1006. bufio.flush()
  1007. self.assertEqual(contents,
  1008. b"".join(writer._write_stack))
  1009. def test_writes(self):
  1010. self.check_writes(lambda bufio: None)
  1011. def test_writes_and_flushes(self):
  1012. self.check_writes(lambda bufio: bufio.flush())
  1013. def test_writes_and_seeks(self):
  1014. def _seekabs(bufio):
  1015. pos = bufio.tell()
  1016. bufio.seek(pos + 1, 0)
  1017. bufio.seek(pos - 1, 0)
  1018. bufio.seek(pos, 0)
  1019. self.check_writes(_seekabs)
  1020. def _seekrel(bufio):
  1021. pos = bufio.seek(0, 1)
  1022. bufio.seek(+1, 1)
  1023. bufio.seek(-1, 1)
  1024. bufio.seek(pos, 0)
  1025. self.check_writes(_seekrel)
  1026. def test_writes_and_truncates(self):
  1027. self.check_writes(lambda bufio: bufio.truncate(bufio.tell()))
  1028. def test_write_non_blocking(self):
  1029. raw = self.MockNonBlockWriterIO()
  1030. bufio = self.tp(raw, 8)
  1031. self.assertEqual(bufio.write(b"abcd"), 4)
  1032. self.assertEqual(bufio.write(b"efghi"), 5)
  1033. # 1 byte will be written, the rest will be buffered
  1034. raw.block_on(b"k")
  1035. self.assertEqual(bufio.write(b"jklmn"), 5)
  1036. # 8 bytes will be written, 8 will be buffered and the rest will be lost
  1037. raw.block_on(b"0")
  1038. try:
  1039. bufio.write(b"opqrwxyz0123456789")
  1040. except self.BlockingIOError as e:
  1041. written = e.characters_written
  1042. else:
  1043. self.fail("BlockingIOError should have been raised")
  1044. self.assertEqual(written, 16)
  1045. self.assertEqual(raw.pop_written(),
  1046. b"abcdefghijklmnopqrwxyz")
  1047. self.assertEqual(bufio.write(b"ABCDEFGHI"), 9)
  1048. s = raw.pop_written()
  1049. # Previously buffered bytes were flushed
  1050. self.assertTrue(s.startswith(b"01234567A"), s)
  1051. def test_write_and_rewind(self):
  1052. raw = io.BytesIO()
  1053. bufio = self.tp(raw, 4)
  1054. self.assertEqual(bufio.write(b"abcdef"), 6)
  1055. self.assertEqual(bufio.tell(), 6)
  1056. bufio.seek(0, 0)
  1057. self.assertEqual(bufio.write(b"XY"), 2)
  1058. bufio.seek(6, 0)
  1059. self.assertEqual(raw.getvalue(), b"XYcdef")
  1060. self.assertEqual(bufio.write(b"123456"), 6)
  1061. bufio.flush()
  1062. self.assertEqual(raw.getvalue(), b"XYcdef123456")
  1063. def test_flush(self):
  1064. writer = self.MockRawIO()
  1065. bufio = self.tp(writer, 8)
  1066. bufio.write(b"abc")
  1067. bufio.flush()
  1068. self.assertEqual(b"abc", writer._write_stack[0])
  1069. def test_writelines(self):
  1070. l = [b'ab', b'cd', b'ef']
  1071. writer = self.MockRawIO()
  1072. bufio = self.tp(writer, 8)
  1073. bufio.writelines(l)
  1074. bufio.flush()
  1075. self.assertEqual(b''.join(writer._write_stack), b'abcdef')
  1076. def test_writelines_userlist(self):
  1077. l = UserList([b'ab', b'cd', b'ef'])
  1078. writer = self.MockRawIO()
  1079. bufio = self.tp(writer, 8)
  1080. bufio.writelines(l)
  1081. bufio.flush()
  1082. self.assertEqual(b''.join(writer._write_stack), b'abcdef')
  1083. def test_writelines_error(self):
  1084. writer = self.MockRawIO()
  1085. bufio = self.tp(writer, 8)
  1086. self.assertRaises(TypeError, bufio.writelines, [1, 2, 3])
  1087. self.assertRaises(TypeError, bufio.writelines, None)
  1088. def test_destructor(self):
  1089. writer = self.MockRawIO()
  1090. bufio = self.tp(writer, 8)
  1091. bufio.write(b"abc")
  1092. del bufio
  1093. support.gc_collect()
  1094. self.assertEqual(b"abc", writer._write_stack[0])
  1095. def test_truncate(self):
  1096. # Truncate implicitly flushes the buffer.
  1097. with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
  1098. bufio = self.tp(raw, 8)
  1099. bufio.write(b"abcdef")
  1100. self.assertEqual(bufio.truncate(3), 3)
  1101. self.assertEqual(bufio.tell(), 6)
  1102. with self.open(support.TESTFN, "rb", buffering=0) as f:
  1103. self.assertEqual(f.read(), b"abc")
  1104. @unittest.skipUnless(threading, 'Threading required for this test.')
  1105. @support.requires_resource('cpu')
  1106. def test_threads(self):
  1107. try:
  1108. # Write out many bytes from many threads and test they were
  1109. # all flushed.
  1110. N = 1000
  1111. contents = bytes(range(256)) * N
  1112. sizes = cycle([1, 19])
  1113. n = 0
  1114. queue = deque()
  1115. while n < len(contents):
  1116. size = next(sizes)
  1117. queue.append(contents[n:n+size])
  1118. n += size
  1119. del contents
  1120. # We use a real file object because it allows us to
  1121. # exercise situations where the GIL is released before
  1122. # writing the buffer to the raw streams. This is in addition
  1123. # to concurrency issues due to switching threads in the middle
  1124. # of Python code.
  1125. with self.open(support.TESTFN, self.write_mode, buffering=0) as raw:
  1126. bufio = self.tp(raw, 8)
  1127. errors = []
  1128. def f():
  1129. try:
  1130. while True:
  1131. try:
  1132. s = queue.popleft()
  1133. except IndexError:
  1134. return
  1135. bufio.write(s)
  1136. except Exception as e:
  1137. errors.append(e)
  1138. raise
  1139. threads = [threading.Thread(target=f) for x in range(20)]
  1140. with support.start_threads(threads):
  1141. time.sleep(0.02) # yield
  1142. self.assertFalse(errors,
  1143. "the following exceptions were caught: %r" % errors)
  1144. bufio.close()
  1145. with self.open(support.TESTFN, "rb") as f:
  1146. s = f.read()
  1147. for i in range(256):
  1148. self.assertEqual(s.count(bytes([i])), N)
  1149. finally:
  1150. support.unlink(support.TESTFN)
  1151. def test_misbehaved_io(self):
  1152. rawio = self.MisbehavedRawIO()
  1153. bufio = self.tp(rawio, 5)
  1154. self.assertRaises(IOError, bufio.seek, 0)
  1155. self.assertRaises(IOError, bufio.tell)
  1156. self.assertRaises(IOError, bufio.write, b"abcdef")
  1157. def test_max_buffer_size_deprecation(self):
  1158. with support.check_warnings(("max_buffer_size is deprecated",
  1159. DeprecationWarning)):
  1160. self.tp(self.MockRawIO(), 8, 12)
  1161. def test_write_error_on_close(self):
  1162. raw = self.MockRawIO()
  1163. def bad_write(b):
  1164. raise IOError()
  1165. raw.write = bad_write
  1166. b = self.tp(raw)
  1167. b.write(b'spam')
  1168. self.assertRaises(IOError, b.close) # exception not swallowed
  1169. self.assertTrue(b.closed)
  1170. class CBufferedWriterTest(BufferedWriterTest, SizeofTest):
  1171. tp = io.BufferedWriter
  1172. def test_constructor(self):
  1173. BufferedWriterTest.test_constructor(self)
  1174. # The allocation can succeed on 32-bit builds, e.g. with more
  1175. # than 2GB RAM and a 64-bit kernel.
  1176. if sys.maxsize > 0x7FFFFFFF:
  1177. rawio = self.MockRawIO()
  1178. bufio = self.tp(rawio)
  1179. self.assertRaises((OverflowError, MemoryError, ValueError),
  1180. bufio.__init__, rawio, sys.maxsize)
  1181. def test_initialization(self):
  1182. rawio = self.MockRawIO()
  1183. bufio = self.tp(rawio)
  1184. self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=0)
  1185. self.assertRaises(ValueError, bufio.write, b"def")
  1186. self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-16)
  1187. self.assertRaises(ValueError, bufio.write, b"def")
  1188. self.assertRaises(ValueError, bufio.__init__, rawio, buffer_size=-1)
  1189. self.assertRaises(ValueError, bufio.write, b"def")
  1190. def test_garbage_collection(self):
  1191. # C BufferedWriter objects are collected, and collecting them flushes
  1192. # all data to disk.
  1193. # The Python version has __del__, so it ends into gc.garbage instead
  1194. rawio = self.FileIO(support.TESTFN, "w+b")
  1195. f = self.tp(rawio)
  1196. f.write(b"123xxx")
  1197. f.x = f
  1198. wr = weakref.ref(f)
  1199. del f
  1200. support.gc_collect()
  1201. self.assertIsNone(wr(), wr)
  1202. with self.open(support.TESTFN, "rb") as f:
  1203. self.assertEqual(f.read(), b"123xxx")
  1204. def test_args_error(self):
  1205. # Issue #17275
  1206. with self.assertRaisesRegexp(TypeError, "BufferedWriter"):
  1207. self.tp(io.BytesIO(), 1024, 1024, 1024)
  1208. class PyBufferedWriterTest(BufferedWriterTest):
  1209. tp = pyio.BufferedWriter
  1210. class BufferedRWPairTest(unittest.TestCase):
  1211. def test_constructor(self):
  1212. pair = self.tp(self.MockRawIO(), self.MockRawIO())
  1213. self.assertFalse(pair.closed)
  1214. def test_uninitialized(self):
  1215. pair = self.tp.__new__(self.tp)
  1216. del pair
  1217. pair = self.tp.__new__(self.tp)
  1218. self.assertRaisesRegexp((ValueError, AttributeError),
  1219. 'uninitialized|has no attribute',
  1220. pair.read, 0)
  1221. self.assertRaisesRegexp((ValueError, AttributeError),
  1222. 'uninitialized|has no attribute',
  1223. pair.write, b'')
  1224. pair.__init__(self.MockRawIO(), self.MockRawIO())
  1225. self.assertEqual(pair.read(0), b'')
  1226. self.assertEqual(pair.write(b''), 0)
  1227. def test_detach(self):
  1228. pair = self.tp(self.MockRawIO(), self.MockRawIO())
  1229. self.assertRaises(self.UnsupportedOperation, pair.detach)
  1230. def test_constructor_max_buffer_size_deprecation(self):
  1231. with support.check_warnings(("max_buffer_size is deprecated",
  1232. DeprecationWarning)):
  1233. self.tp(self.MockRawIO(), self.MockRawIO(), 8, 12)
  1234. def test_constructor_with_not_readable(self):
  1235. class NotReadable(MockRawIO):
  1236. def readable(self):
  1237. return False
  1238. self.assertRaises(IOError, self.tp, NotReadable(), self.MockRawIO())
  1239. def test_constructor_with_not_writeable(self):
  1240. class NotWriteable(MockRawIO):
  1241. def writable(self):
  1242. return False
  1243. self.assertRaises(IOError, self.tp, self.MockRawIO(), NotWriteable())
  1244. def test_read(self):
  1245. pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
  1246. self.assertEqual(pair.read(3), b"abc")
  1247. self.assertEqual(pair.read(1), b"d")
  1248. self.assertEqual(pair.read(), b"ef")
  1249. pair = self.tp(self.BytesIO(b"abc"), self.MockRawIO())
  1250. self.assertEqual(pair.read(None), b"abc")
  1251. def test_readlines(self):
  1252. pair = lambda: self.tp(self.BytesIO(b"abc\ndef\nh"), self.MockRawIO())
  1253. self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
  1254. self.assertEqual(pair().readlines(), [b"abc\n", b"def\n", b"h"])
  1255. self.assertEqual(pair().readlines(5), [b"abc\n", b"def\n"])
  1256. def test_read1(self):
  1257. # .read1() is delegated to the underlying reader object, so this test
  1258. # can be shallow.
  1259. pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
  1260. self.assertEqual(pair.read1(3), b"abc")
  1261. def test_readinto(self):
  1262. pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
  1263. data = byteslike(5)
  1264. self.assertEqual(pair.readinto(data), 5)
  1265. self.assertEqual(data.tobytes(), b"abcde")
  1266. def test_write(self):
  1267. w = self.MockRawIO()
  1268. pair = self.tp(self.MockRawIO(), w)
  1269. pair.write(b"abc")
  1270. pair.flush()
  1271. buffer = bytearray(b"def")
  1272. pair.write(buffer)
  1273. buffer[:] = b"***" # Overwrite our copy of the data
  1274. pair.flush()
  1275. self.assertEqual(w._write_stack, [b"abc", b"def"])
  1276. def test_peek(self):
  1277. pair = self.tp(self.BytesIO(b"abcdef"), self.MockRawIO())
  1278. self.assertTrue(pair.peek(3).startswith(b"abc"))
  1279. self.assertEqual(pair.read(3), b"abc")
  1280. def test_readable(self):
  1281. pair = self.tp(self.MockRawIO(), self.MockRawIO())
  1282. self.assertTrue(pair.readable())
  1283. def test_writeable(self):
  1284. pair = self.tp(self.MockRawIO(), self.MockRawIO())
  1285. self.assertTrue(pair.writable())
  1286. def test_seekable(self):
  1287. # BufferedRWPairs are never seekable, even if their readers and writers
  1288. # are.
  1289. pair = self.tp(self.MockRawIO(), self.MockRawIO())
  1290. self.assertFalse(pair.seekable())
  1291. # .flush() is delegated to the underlying writer object and has been
  1292. # tested in the test_write method.
  1293. def test_close_and_closed(self):
  1294. pair = self.tp(self.MockRawIO(), self.MockRawIO())
  1295. self.assertFalse(pair.closed)
  1296. pair.close()
  1297. self.assertTrue(pair.closed)
  1298. def test_reader_close_error_on_close(self):
  1299. def reader_close():
  1300. reader_non_existing
  1301. reader = self.MockRawIO()
  1302. reader.close = reader_close
  1303. writer = self.MockRawIO()
  1304. pair = self.tp(reader, writer)
  1305. with self.assertRaises(NameError) as err:
  1306. pair.close()
  1307. self.assertIn('reader_non_existing', str(err.exception))
  1308. self.assertTrue(pair.closed)
  1309. self.assertFalse(reader.closed)
  1310. self.assertTrue(writer.closed)
  1311. def test_writer_close_error_on_close(self):
  1312. def writer_close():
  1313. writer_non_existing
  1314. reader = self.MockRawIO()
  1315. writer = self.MockRawIO()
  1316. writer.close = writer_close
  1317. pair = self.tp(reader, writer)
  1318. with self.assertRaises(NameError) as err:
  1319. pair.close()
  1320. self.assertIn('writer_non_existing', str(err.exception))
  1321. self.assertFalse(pair.closed)
  1322. self.assertTrue(reader.closed)
  1323. self.assertFalse(writer.closed)
  1324. def test_reader_writer_close_error_on_close(self):
  1325. def reader_close():
  1326. reader_non_existing
  1327. def writer_close():
  1328. writer_non_existing
  1329. reader = self.MockRawIO()
  1330. reader.close = reader_close
  1331. writer = self.MockRawIO()
  1332. writer.close = writer_close
  1333. pair = self.tp(reader, writer)
  1334. with self.assertRaises(NameError) as err:
  1335. pair.close()
  1336. self.assertIn('reader_non_existing', str(err.exception))
  1337. self.assertFalse(pair.closed)
  1338. self.assertFalse(reader.closed)
  1339. self.assertFalse(writer.closed)
  1340. def test_isatty(self):
  1341. class SelectableIsAtty(MockRawIO):
  1342. def __init__(self, isatty):
  1343. MockRawIO.__init__(self)
  1344. self._isatty = isatty
  1345. def isatty(self):
  1346. return self._isatty
  1347. pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(False))
  1348. self.assertFalse(pair.isatty())
  1349. pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(False))
  1350. self.assertTrue(pair.isatty())
  1351. pair = self.tp(SelectableIsAtty(False), SelectableIsAtty(True))
  1352. self.assertTrue(pair.isatty())
  1353. pair = self.tp(SelectableIsAtty(True), SelectableIsAtty(True))
  1354. self.assertTrue(pair.isatty())
  1355. def test_weakref_clearing(self):
  1356. brw = self.tp(self.MockRawIO(), self.MockRawIO())
  1357. ref = weakref.ref(brw)
  1358. brw = None
  1359. ref = None # Shouldn't segfault.
  1360. class CBufferedRWPairTest(BufferedRWPairTest):
  1361. tp = io.BufferedRWPair
  1362. class PyBufferedRWPairTest(BufferedRWPairTest):
  1363. tp = pyio.BufferedRWPair
  1364. class BufferedRandomTest(BufferedReaderTest, BufferedWriterTest):
  1365. read_mode = "rb+"
  1366. write_mode = "wb+"
  1367. def test_constructor(self):
  1368. BufferedReaderTest.test_constructor(self)
  1369. BufferedWriterTest.test_constructor(self)
  1370. def test_uninitialized(self):
  1371. BufferedReaderTest.test_uninitialized(self)
  1372. BufferedWriterTest.test_uninitialized(self)
  1373. def test_read_and_write(self):
  1374. raw = self.MockRawIO((b"asdf", b"ghjk"))
  1375. rw = self.tp(raw, 8)
  1376. self.assertEqual(b"as", rw.read(2))
  1377. rw.write(b"ddd")
  1378. rw.write(b"eee")
  1379. self.assertFalse(raw._write_stack) # Buffer writes
  1380. self.assertEqual(b"ghjk", rw.read())
  1381. self.assertEqual(b"dddeee", raw._write_stack[0])
  1382. def test_seek_and_tell(self):
  1383. raw = self.BytesIO(b"asdfghjkl")
  1384. rw = self.tp(raw)
  1385. self.assertEqual(b"as", rw.read(2))
  1386. self.assertEqual(2, rw.tell())
  1387. rw.seek(0, 0)
  1388. self.assertEqual(b"asdf", rw.read(4))
  1389. rw.write(b"123f")
  1390. rw.seek(0, 0)
  1391. self.assertEqual(b"asdf123fl", rw.read())
  1392. self.assertEqual(9, rw.tell())
  1393. rw.seek(-4, 2)
  1394. self.assertEqual(5, rw.tell())
  1395. rw.seek(2, 1)
  1396. self.assertEqual(7, rw.tell())
  1397. self.assertEqual(b"fl", rw.read(11))
  1398. rw.flush()
  1399. self.assertEqual(b"asdf123fl", raw.getvalue())
  1400. self.assertRaises(TypeError, rw.seek, 0.0)
  1401. def check_flush_and_read(self, read_func):
  1402. raw = self.BytesIO(b"abcdefghi")
  1403. bufio = self.tp(raw)
  1404. self.assertEqual(b"ab", read_func(bufio, 2))
  1405. bufio.write(b"12")
  1406. self.assertEqual(b"ef", read_func(bufio, 2))
  1407. self.assertEqual(6, bufio.tell())
  1408. bufio.flush()
  1409. self.assertEqual(6, bufio.tell())
  1410. self.assertEqual(b"ghi", read_func(bufio))
  1411. raw.seek(0, 0)
  1412. raw.write(b"XYZ")
  1413. # flush() resets the read buffer
  1414. bufio.flush()
  1415. bufio.seek(0, 0)
  1416. self.assertEqual(b"XYZ", read_func(bufio, 3))
  1417. def test_flush_and_read(self):
  1418. self.check_flush_and_read(lambda bufio, *args: bufio.read(*args))
  1419. def test_flush_and_readinto(self):
  1420. def _readinto(bufio, n=-1):
  1421. b = bytearray(n if n >= 0 else 9999)
  1422. n = bufio.readinto(b)
  1423. return bytes(b[:n])
  1424. self.check_flush_and_read(_readinto)
  1425. def test_flush_and_peek(self):
  1426. def _peek(bufio, n=-1):
  1427. # This relies on the fact that the buffer can contain the whole
  1428. # raw stream, otherwise peek() can return less.
  1429. b = bufio.peek(n)
  1430. if n != -1:
  1431. b = b[:n]
  1432. bufio.seek(len(b), 1)
  1433. return b
  1434. self.check_flush_and_read(_peek)
  1435. def test_flush_and_write(self):
  1436. raw = self.BytesIO(b"abcdefghi")
  1437. bufio = self.tp(raw)
  1438. bufio.write(b"123")
  1439. bufio.flush()
  1440. bufio.write(b"45")
  1441. bufio.flush()
  1442. bufio.seek(0, 0)
  1443. self.assertEqual(b"12345fghi", raw.getvalue())
  1444. self.assertEqual(b"12345fghi", bufio.read())
  1445. def test_threads(self):
  1446. BufferedReaderTest.test_threads(self)
  1447. BufferedWriterTest.test_threads(self)
  1448. def test_writes_and_peek(self):
  1449. def _peek(bufio):
  1450. bufio.peek(1)
  1451. self.check_writes(_peek)
  1452. def _peek(bufio):
  1453. pos = bufio.tell()
  1454. bufio.seek(-1, 1)
  1455. bufio.peek(1)
  1456. bufio.seek(pos, 0)
  1457. self.check_writes(_peek)
  1458. def test_writes_and_reads(self):
  1459. def _read(bufio):
  1460. bufio.seek(-1, 1)
  1461. bufio.read(1)
  1462. self.check_writes(_read)
  1463. def test_writes_and_read1s(self):
  1464. def _read1(bufio):
  1465. bufio.seek(-1, 1)
  1466. bufio.read1(1)
  1467. self.check_writes(_read1)
  1468. def test_writes_and_readintos(self):
  1469. def _read(bufio):
  1470. bufio.seek(-1, 1)
  1471. bufio.readinto(bytearray(1))
  1472. self.check_writes(_read)
  1473. def test_write_after_readahead(self):
  1474. # Issue #6629: writing after the buffer was filled by readahead should
  1475. # first rewind the raw stream.
  1476. for overwrite_size in [1, 5]:
  1477. raw = self.BytesIO(b"A" * 10)
  1478. bufio = self.tp(raw, 4)
  1479. # Trigger readahead
  1480. self.assertEqual(bufio.read(1), b"A")
  1481. self.assertEqual(bufio.tell(), 1)
  1482. # Overwriting should rewind the raw stream if it needs so
  1483. bufio.write(b"B" * overwrite_size)
  1484. self.assertEqual(bufio.tell(), overwrite_size + 1)
  1485. # If the write size was smaller than the buffer size, flush() and
  1486. # check that rewind happens.
  1487. bufio.flush()
  1488. self.assertEqual(bufio.tell(), overwrite_size + 1)
  1489. s = raw.getvalue()
  1490. self.assertEqual(s,
  1491. b"A" + b"B" * overwrite_size + b"A" * (9 - overwrite_size))
  1492. def test_write_rewind_write(self):
  1493. # Various combinations of reading / writing / seeking backwards / writing again
  1494. def mutate(bufio, pos1, pos2):
  1495. assert pos2 >= pos1
  1496. # Fill the buffer
  1497. bufio.seek(pos1)
  1498. bufio.read(pos2 - pos1)
  1499. bufio.write(b'\x02')
  1500. # This writes earlier than the previous write, but still inside
  1501. # the buffer.
  1502. bufio.seek(pos1)
  1503. bufio.write(b'\x01')
  1504. b = b"\x80\x81\x82\x83\x84"
  1505. for i in range(0, len(b)):
  1506. for j in range(i, len(b)):
  1507. raw = self.BytesIO(b)
  1508. bufio = self.tp(raw, 100)
  1509. mutate(bufio, i, j)
  1510. bufio.flush()
  1511. expected = bytearray(b)
  1512. expected[j] = 2
  1513. expected[i] = 1
  1514. self.assertEqual(raw.getvalue(), expected,
  1515. "failed result for i=%d, j=%d" % (i, j))
  1516. def test_truncate_after_read_or_write(self):
  1517. raw = self.BytesIO(b"A" * 10)
  1518. bufio = self.tp(raw, 100)
  1519. self.assertEqual(bufio.read(2), b"AA") # the read buffer gets filled
  1520. self.assertEqual(bufio.truncate(), 2)
  1521. self.assertEqual(bufio.write(b"BB"), 2) # the write buffer increases
  1522. self.assertEqual(bufio.truncate(), 4)
  1523. def test_misbehaved_io(self):
  1524. BufferedReaderTest.test_misbehaved_io(self)
  1525. BufferedWriterTest.test_misbehaved_io(self)
  1526. def test_interleaved_read_write(self):
  1527. # Test for issue #12213
  1528. with self.BytesIO(b'abcdefgh') as raw:
  1529. with self.tp(raw, 100) as f:
  1530. f.write(b"1")
  1531. self.assertEqual(f.read(1), b'b')
  1532. f.write(b'2')
  1533. self.assertEqual(f.read1(1), b'd')
  1534. f.write(b'3')
  1535. buf = bytearray(1)
  1536. f.readinto(buf)
  1537. self.assertEqual(buf, b'f')
  1538. f.write(b'4')
  1539. self.assertEqual(f.peek(1), b'h')
  1540. f.flush()
  1541. self.assertEqual(raw.getvalue(), b'1b2d3f4h')
  1542. with self.BytesIO(b'abc') as raw:
  1543. with self.tp(raw, 100) as f:
  1544. self.assertEqual(f.read(1), b'a')
  1545. f.write(b"2")
  1546. self.assertEqual(f.read(1), b'c')
  1547. f.flush()
  1548. self.assertEqual(raw.getvalue(), b'a2c')
  1549. def test_interleaved_readline_write(self):
  1550. with self.BytesIO(b'ab\ncdef\ng\n') as raw:
  1551. with self.tp(raw) as f:
  1552. f.write(b'1')
  1553. self.assertEqual(f.readline(), b'b\n')
  1554. f.write(b'2')
  1555. self.assertEqual(f.readline(), b'def\n')
  1556. f.write(b'3')
  1557. self.assertEqual(f.readline(), b'\n')
  1558. f.flush()
  1559. self.assertEqual(raw.getvalue(), b'1b\n2def\n3\n')
  1560. class CBufferedRandomTest(CBufferedReaderTest, CBufferedWriterTest,
  1561. BufferedRandomTest, SizeofTest):
  1562. tp = io.BufferedRandom
  1563. def test_constructor(self):
  1564. BufferedRandomTest.test_constructor(self)
  1565. # The allocation can succeed on 32-bit builds, e.g. with more
  1566. # than 2GB RAM and a 64-bit kernel.
  1567. if sys.maxsize > 0x7FFFFFFF:
  1568. rawio = self.MockRawIO()
  1569. bufio = self.tp(rawio)
  1570. self.assertRaises((OverflowError, MemoryError, ValueError),
  1571. bufio.__init__, rawio, sys.maxsize)
  1572. def test_garbage_collection(self):
  1573. CBufferedReaderTest.test_garbage_collection(self)
  1574. CBufferedWriterTest.test_garbage_collection(self)
  1575. def test_args_error(self):
  1576. # Issue #17275
  1577. with self.assertRaisesRegexp(TypeError, "BufferedRandom"):
  1578. self.tp(io.BytesIO(), 1024, 1024, 1024)
  1579. class PyBufferedRandomTest(BufferedRandomTest):
  1580. tp = pyio.BufferedRandom
  1581. # To fully exercise seek/tell, the StatefulIncrementalDecoder has these
  1582. # properties:
  1583. # - A single output character can correspond to many bytes of input.
  1584. # - The number of input bytes to complete the character can be
  1585. # undetermined until the last input byte is received.
  1586. # - The number of input bytes can vary depending on previous input.
  1587. # - A single input byte can correspond to many characters of output.
  1588. # - The number of output characters can be undetermined until the
  1589. # last input byte is received.
  1590. # - The number of output characters can vary depending on previous input.
  1591. class StatefulIncrementalDecoder(codecs.IncrementalDecoder):
  1592. """
  1593. For testing seek/tell behavior with a stateful, buffering decoder.
  1594. Input is a sequence of words. Words may be fixed-length (length set
  1595. by input) or variable-length (period-terminated). In variable-length
  1596. mode, extra periods are ignored. Possible words are:
  1597. - 'i' followed by a number sets the input length, I (maximum 99).
  1598. When I is set to 0, words are space-terminated.
  1599. - 'o' followed by a number sets the output length, O (maximum 99).
  1600. - Any other word is converted into a word followed by a period on
  1601. the output. The output word consists of the input word truncated
  1602. or padded out with hyphens to make its length equal to O. If O
  1603. is 0, the word is output verbatim without truncating or padding.
  1604. I and O are initially set to 1. When I changes, any buffered input is
  1605. re-scanned according to the new I. EOF also terminates the last word.
  1606. """
  1607. def __init__(self, errors='strict'):
  1608. codecs.IncrementalDecoder.__init__(self, errors)
  1609. self.reset()
  1610. def __repr__(self):
  1611. return '<SID %x>' % id(self)
  1612. def reset(self):
  1613. self.i = 1
  1614. self.o = 1
  1615. self.buffer = bytearray()
  1616. def getstate(self):
  1617. i, o = self.i ^ 1, self.o ^ 1 # so that flags = 0 after reset()
  1618. return bytes(self.buffer), i*100 + o
  1619. def setstate(self, state):
  1620. buffer, io = state
  1621. self.buffer = bytearray(buffer)
  1622. i, o = divmod(io, 100)
  1623. self.i, self.o = i ^ 1, o ^ 1
  1624. def decode(self, input, final=False):
  1625. output = ''
  1626. for b in input:
  1627. if self.i == 0: # variable-length, terminated with period
  1628. if b == '.':
  1629. if self.buffer:
  1630. output += self.process_word()
  1631. else:
  1632. self.buffer.append(b)
  1633. else: # fixed-length, terminate after self.i bytes
  1634. self.buffer.append(b)
  1635. if len(self.buffer) == self.i:
  1636. output += self.process_word()
  1637. if final and self.buffer: # EOF terminates the last word
  1638. output += self.process_word()
  1639. return output
  1640. def process_word(self):
  1641. output = ''
  1642. if self.buffer[0] == ord('i'):
  1643. self.i = min(99, int(self.buffer[1:] or 0)) # set input length
  1644. elif self.buffer[0] == ord('o'):
  1645. self.o = min(99, int(self.buffer[1:] or 0)) # set output length
  1646. else:
  1647. output = self.buffer.decode('ascii')
  1648. if len(output) < self.o:
  1649. output += '-'*self.o # pad out with hyphens
  1650. if self.o:
  1651. output = output[:self.o] # truncate to output length
  1652. output += '.'
  1653. self.buffer = bytearray()
  1654. return output
  1655. codecEnabled = False
  1656. @classmethod
  1657. def lookupTestDecoder(cls, name):
  1658. if cls.codecEnabled and name == 'test_decoder':
  1659. latin1 = codecs.lookup('latin-1')
  1660. return codecs.CodecInfo(
  1661. name='test_decoder', encode=latin1.encode, decode=None,
  1662. incrementalencoder=None,
  1663. streamreader=None, streamwriter=None,
  1664. incrementaldecoder=cls)
  1665. # Register the previous decoder for testing.
  1666. # Disabled by default, tests will enable it.
  1667. codecs.register(StatefulIncrementalDecoder.lookupTestDecoder)
  1668. class StatefulIncrementalDecoderTest(unittest.TestCase):
  1669. """
  1670. Make sure the StatefulIncrementalDecoder actually works.
  1671. """
  1672. test_cases = [
  1673. # I=1, O=1 (fixed-length input == fixed-length output)
  1674. (b'abcd', False, 'a.b.c.d.'),
  1675. # I=0, O=0 (variable-length input, variable-length output)
  1676. (b'oiabcd', True, 'abcd.'),
  1677. # I=0, O=0 (should ignore extra periods)
  1678. (b'oi...abcd...', True, 'abcd.'),
  1679. # I=0, O=6 (variable-length input, fixed-length output)
  1680. (b'i.o6.x.xyz.toolongtofit.', False, 'x-----.xyz---.toolon.'),
  1681. # I=2, O=6 (fixed-length input < fixed-length output)
  1682. (b'i.i2.o6xyz', True, 'xy----.z-----.'),
  1683. # I=6, O=3 (fixed-length input > fixed-length output)
  1684. (b'i.o3.i6.abcdefghijklmnop', True, 'abc.ghi.mno.'),
  1685. # I=0, then 3; O=29, then 15 (with longer output)
  1686. (b'i.o29.a.b.cde.o15.abcdefghijabcdefghij.i3.a.b.c.d.ei00k.l.m', True,
  1687. 'a----------------------------.' +
  1688. 'b----------------------------.' +
  1689. 'cde--------------------------.' +
  1690. 'abcdefghijabcde.' +
  1691. 'a.b------------.' +
  1692. '.c.------------.' +
  1693. 'd.e------------.' +
  1694. 'k--------------.' +
  1695. 'l--------------.' +
  1696. 'm--------------.')
  1697. ]
  1698. def test_decoder(self):
  1699. # Try a few one-shot test cases.
  1700. for input, eof, output in self.test_cases:
  1701. d = StatefulIncrementalDecoder()
  1702. self.assertEqual(d.decode(input, eof), output)
  1703. # Also test an unfinished decode, followed by forcing EOF.
  1704. d = StatefulIncrementalDecoder()
  1705. self.assertEqual(d.decode(b'oiabcd'), '')
  1706. self.assertEqual(d.decode(b'', 1), 'abcd.')
  1707. class TextIOWrapperTest(unittest.TestCase):
  1708. def setUp(self):
  1709. self.testdata = b"AAA\r\nBBB\rCCC\r\nDDD\nEEE\r\n"
  1710. self.normalized = b"AAA\nBBB\nCCC\nDDD\nEEE\n".decode("ascii")
  1711. support.unlink(support.TESTFN)
  1712. def tearDown(self):
  1713. support.unlink(support.TESTFN)
  1714. def test_constructor(self):
  1715. r = self.BytesIO(b"\xc3\xa9\n\n")
  1716. b = self.BufferedReader(r, 1000)
  1717. t = self.TextIOWrapper(b)
  1718. t.__init__(b, encoding="latin1", newline="\r\n")
  1719. self.assertEqual(t.encoding, "latin1")
  1720. self.assertEqual(t.line_buffering, False)
  1721. t.__init__(b, encoding="utf8", line_buffering=True)
  1722. self.assertEqual(t.encoding, "utf8")
  1723. self.assertEqual(t.line_buffering, True)
  1724. self.assertEqual("\xe9\n", t.readline())
  1725. self.assertRaises(TypeError, t.__init__, b, newline=42)
  1726. self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
  1727. def test_uninitialized(self):
  1728. t = self.TextIOWrapper.__new__(self.TextIOWrapper)
  1729. del t
  1730. t = self.TextIOWrapper.__new__(self.TextIOWrapper)
  1731. self.assertRaises(Exception, repr, t)
  1732. self.assertRaisesRegexp((ValueError, AttributeError),
  1733. 'uninitialized|has no attribute',
  1734. t.read, 0)
  1735. t.__init__(self.MockRawIO())
  1736. self.assertEqual(t.read(0), u'')
  1737. def test_non_text_encoding_codecs_are_rejected(self):
  1738. # Ensure the constructor complains if passed a codec that isn't
  1739. # marked as a text encoding
  1740. # http://bugs.python.org/issue20404
  1741. r = self.BytesIO()
  1742. b = self.BufferedWriter(r)
  1743. with support.check_py3k_warnings():
  1744. self.TextIOWrapper(b, encoding="hex_codec")
  1745. def test_detach(self):
  1746. r = self.BytesIO()
  1747. b = self.BufferedWriter(r)
  1748. t = self.TextIOWrapper(b)
  1749. self.assertIs(t.detach(), b)
  1750. t = self.TextIOWrapper(b, encoding="ascii")
  1751. t.write("howdy")
  1752. self.assertFalse(r.getvalue())
  1753. t.detach()
  1754. self.assertEqual(r.getvalue(), b"howdy")
  1755. self.assertRaises(ValueError, t.detach)
  1756. # Operations independent of the detached stream should still work
  1757. repr(t)
  1758. self.assertEqual(t.encoding, "ascii")
  1759. self.assertEqual(t.errors, "strict")
  1760. self.assertFalse(t.line_buffering)
  1761. def test_repr(self):
  1762. raw = self.BytesIO("hello".encode("utf-8"))
  1763. b = self.BufferedReader(raw)
  1764. t = self.TextIOWrapper(b, encoding="utf-8")
  1765. modname = self.TextIOWrapper.__module__
  1766. self.assertEqual(repr(t),
  1767. "<%s.TextIOWrapper encoding='utf-8'>" % modname)
  1768. raw.name = "dummy"
  1769. self.assertEqual(repr(t),
  1770. "<%s.TextIOWrapper name=u'dummy' encoding='utf-8'>" % modname)
  1771. raw.name = b"dummy"
  1772. self.assertEqual(repr(t),
  1773. "<%s.TextIOWrapper name='dummy' encoding='utf-8'>" % modname)
  1774. t.buffer.detach()
  1775. repr(t) # Should not raise an exception
  1776. def test_line_buffering(self):
  1777. r = self.BytesIO()
  1778. b = self.BufferedWriter(r, 1000)
  1779. t = self.TextIOWrapper(b, newline="\n", line_buffering=True)
  1780. t.write("X")
  1781. self.assertEqual(r.getvalue(), b"") # No flush happened
  1782. t.write("Y\nZ")
  1783. self.assertEqual(r.getvalue(), b"XY\nZ") # All got flushed
  1784. t.write("A\rB")
  1785. self.assertEqual(r.getvalue(), b"XY\nZA\rB")
  1786. def test_encoding(self):
  1787. # Check the encoding attribute is always set, and valid
  1788. b = self.BytesIO()
  1789. t = self.TextIOWrapper(b, encoding="utf8")
  1790. self.assertEqual(t.encoding, "utf8")
  1791. t = self.TextIOWrapper(b)
  1792. self.assertIsNotNone(t.encoding)
  1793. codecs.lookup(t.encoding)
  1794. def test_encoding_errors_reading(self):
  1795. # (1) default
  1796. b = self.BytesIO(b"abc\n\xff\n")
  1797. t = self.TextIOWrapper(b, encoding="ascii")
  1798. self.assertRaises(UnicodeError, t.read)
  1799. # (2) explicit strict
  1800. b = self.BytesIO(b"abc\n\xff\n")
  1801. t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
  1802. self.assertRaises(UnicodeError, t.read)
  1803. # (3) ignore
  1804. b = self.BytesIO(b"abc\n\xff\n")
  1805. t = self.TextIOWrapper(b, encoding="ascii", errors="ignore")
  1806. self.assertEqual(t.read(), "abc\n\n")
  1807. # (4) replace
  1808. b = self.BytesIO(b"abc\n\xff\n")
  1809. t = self.TextIOWrapper(b, encoding="ascii", errors="replace")
  1810. self.assertEqual(t.read(), "abc\n\ufffd\n")
  1811. def test_encoding_errors_writing(self):
  1812. # (1) default
  1813. b = self.BytesIO()
  1814. t = self.TextIOWrapper(b, encoding="ascii")
  1815. self.assertRaises(UnicodeError, t.write, "\xff")
  1816. # (2) explicit strict
  1817. b = self.BytesIO()
  1818. t = self.TextIOWrapper(b, encoding="ascii", errors="strict")
  1819. self.assertRaises(UnicodeError, t.write, "\xff")
  1820. # (3) ignore
  1821. b = self.BytesIO()
  1822. t = self.TextIOWrapper(b, encoding="ascii", errors="ignore",
  1823. newline="\n")
  1824. t.write("abc\xffdef\n")
  1825. t.flush()
  1826. self.assertEqual(b.getvalue(), b"abcdef\n")
  1827. # (4) replace
  1828. b = self.BytesIO()
  1829. t = self.TextIOWrapper(b, encoding="ascii", errors="replace",
  1830. newline="\n")
  1831. t.write("abc\xffdef\n")
  1832. t.flush()
  1833. self.assertEqual(b.getvalue(), b"abc?def\n")
  1834. def test_newlines(self):
  1835. input_lines = [ "unix\n", "windows\r\n", "os9\r", "last\n", "nonl" ]
  1836. tests = [
  1837. [ None, [ 'unix\n', 'windows\n', 'os9\n', 'last\n', 'nonl' ] ],
  1838. [ '', input_lines ],
  1839. [ '\n', [ "unix\n", "windows\r\n", "os9\rlast\n", "nonl" ] ],
  1840. [ '\r\n', [ "unix\nwindows\r\n", "os9\rlast\nnonl" ] ],
  1841. [ '\r', [ "unix\nwindows\r", "\nos9\r", "last\nnonl" ] ],
  1842. ]
  1843. encodings = (
  1844. 'utf-8', 'latin-1',
  1845. 'utf-16', 'utf-16-le', 'utf-16-be',
  1846. 'utf-32', 'utf-32-le', 'utf-32-be',
  1847. )
  1848. # Try a range of buffer sizes to test the case where \r is the last
  1849. # character in TextIOWrapper._pending_line.
  1850. for encoding in encodings:
  1851. # XXX: str.encode() should return bytes
  1852. data = bytes(''.join(input_lines).encode(encoding))
  1853. for do_reads in (False, True):
  1854. for bufsize in range(1, 10):
  1855. for newline, exp_lines in tests:
  1856. bufio = self.BufferedReader(self.BytesIO(data), bufsize)
  1857. textio = self.TextIOWrapper(bufio, newline=newline,
  1858. encoding=encoding)
  1859. if do_reads:
  1860. got_lines = []
  1861. while True:
  1862. c2 = textio.read(2)
  1863. if c2 == '':
  1864. break
  1865. self.assertEqual(len(c2), 2)
  1866. got_lines.append(c2 + textio.readline())
  1867. else:
  1868. got_lines = list(textio)
  1869. for got_line, exp_line in zip(got_lines, exp_lines):
  1870. self.assertEqual(got_line, exp_line)
  1871. self.assertEqual(len(got_lines), len(exp_lines))
  1872. def test_newlines_input(self):
  1873. testdata = b"AAA\nBB\x00B\nCCC\rDDD\rEEE\r\nFFF\r\nGGG"
  1874. normalized = testdata.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
  1875. for newline, expected in [
  1876. (None, normalized.decode("ascii").splitlines(True)),
  1877. ("", testdata.decode("ascii").splitlines(True)),
  1878. ("\n", ["AAA\n", "BB\x00B\n", "CCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
  1879. ("\r\n", ["AAA\nBB\x00B\nCCC\rDDD\rEEE\r\n", "FFF\r\n", "GGG"]),
  1880. ("\r", ["AAA\nBB\x00B\nCCC\r", "DDD\r", "EEE\r", "\nFFF\r", "\nGGG"]),
  1881. ]:
  1882. buf = self.BytesIO(testdata)
  1883. txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
  1884. self.assertEqual(txt.readlines(), expected)
  1885. txt.seek(0)
  1886. self.assertEqual(txt.read(), "".join(expected))
  1887. def test_newlines_output(self):
  1888. testdict = {
  1889. "": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
  1890. "\n": b"AAA\nBBB\nCCC\nX\rY\r\nZ",
  1891. "\r": b"AAA\rBBB\rCCC\rX\rY\r\rZ",
  1892. "\r\n": b"AAA\r\nBBB\r\nCCC\r\nX\rY\r\r\nZ",
  1893. }
  1894. tests = [(None, testdict[os.linesep])] + sorted(testdict.items())
  1895. for newline, expected in tests:
  1896. buf = self.BytesIO()
  1897. txt = self.TextIOWrapper(buf, encoding="ascii", newline=newline)
  1898. txt.write("AAA\nB")
  1899. txt.write("BB\nCCC\n")
  1900. txt.write("X\rY\r\nZ")
  1901. txt.flush()
  1902. self.assertEqual(buf.closed, False)
  1903. self.assertEqual(buf.getvalue(), expected)
  1904. def test_destructor(self):
  1905. l = []
  1906. base = self.BytesIO
  1907. class MyBytesIO(base):
  1908. def close(self):
  1909. l.append(self.getvalue())
  1910. base.close(self)
  1911. b = MyBytesIO()
  1912. t = self.TextIOWrapper(b, encoding="ascii")
  1913. t.write("abc")
  1914. del t
  1915. support.gc_collect()
  1916. self.assertEqual([b"abc"], l)
  1917. def test_override_destructor(self):
  1918. record = []
  1919. class MyTextIO(self.TextIOWrapper):
  1920. def __del__(self):
  1921. record.append(1)
  1922. try:
  1923. f = super(MyTextIO, self).__del__
  1924. except AttributeError:
  1925. pass
  1926. else:
  1927. f()
  1928. def close(self):
  1929. record.append(2)
  1930. super(MyTextIO, self).close()
  1931. def flush(self):
  1932. record.append(3)
  1933. super(MyTextIO, self).flush()
  1934. b = self.BytesIO()
  1935. t = MyTextIO(b, encoding="ascii")
  1936. del t
  1937. support.gc_collect()
  1938. self.assertEqual(record, [1, 2, 3])
  1939. def test_error_through_destructor(self):
  1940. # Test that the exception state is not modified by a destructor,
  1941. # even if close() fails.
  1942. rawio = self.CloseFailureIO()
  1943. def f():
  1944. self.TextIOWrapper(rawio).xyzzy
  1945. with support.captured_output("stderr") as s:
  1946. self.assertRaises(AttributeError, f)
  1947. s = s.getvalue().strip()
  1948. if s:
  1949. # The destructor *may* have printed an unraisable error, check it
  1950. self.assertEqual(len(s.splitlines()), 1)
  1951. self.assertTrue(s.startswith("Exception IOError: "), s)
  1952. self.assertTrue(s.endswith(" ignored"), s)
  1953. # Systematic tests of the text I/O API
  1954. def test_basic_io(self):
  1955. for chunksize in (1, 2, 3, 4, 5, 15, 16, 17, 31, 32, 33, 63, 64, 65):
  1956. for enc in "ascii", "latin1", "utf8" :# , "utf-16-be", "utf-16-le":
  1957. f = self.open(support.TESTFN, "w+", encoding=enc)
  1958. f._CHUNK_SIZE = chunksize
  1959. self.assertEqual(f.write("abc"), 3)
  1960. f.close()
  1961. f = self.open(support.TESTFN, "r+", encoding=enc)
  1962. f._CHUNK_SIZE = chunksize
  1963. self.assertEqual(f.tell(), 0)
  1964. self.assertEqual(f.read(), "abc")
  1965. cookie = f.tell()
  1966. self.assertEqual(f.seek(0), 0)
  1967. self.assertEqual(f.read(None), "abc")
  1968. f.seek(0)
  1969. self.assertEqual(f.read(2), "ab")
  1970. self.assertEqual(f.read(1), "c")
  1971. self.assertEqual(f.read(1), "")
  1972. self.assertEqual(f.read(), "")
  1973. self.assertEqual(f.tell(), cookie)
  1974. self.assertEqual(f.seek(0), 0)
  1975. self.assertEqual(f.seek(0, 2), cookie)
  1976. self.assertEqual(f.write("def"), 3)
  1977. self.assertEqual(f.seek(cookie), cookie)
  1978. self.assertEqual(f.read(), "def")
  1979. if enc.startswith("utf"):
  1980. self.multi_line_test(f, enc)
  1981. f.close()
  1982. def multi_line_test(self, f, enc):
  1983. f.seek(0)
  1984. f.truncate()
  1985. sample = "s\xff\u0fff\uffff"
  1986. wlines = []
  1987. for size in (0, 1, 2, 3, 4, 5, 30, 31, 32, 33, 62, 63, 64, 65, 1000):
  1988. chars = []
  1989. for i in range(size):
  1990. chars.append(sample[i % len(sample)])
  1991. line = "".join(chars) + "\n"
  1992. wlines.append((f.tell(), line))
  1993. f.write(line)
  1994. f.seek(0)
  1995. rlines = []
  1996. while True:
  1997. pos = f.tell()
  1998. line = f.readline()
  1999. if not line:
  2000. break
  2001. rlines.append((pos, line))
  2002. self.assertEqual(rlines, wlines)
  2003. def test_telling(self):
  2004. f = self.open(support.TESTFN, "w+", encoding="utf8")
  2005. p0 = f.tell()
  2006. f.write("\xff\n")
  2007. p1 = f.tell()
  2008. f.write("\xff\n")
  2009. p2 = f.tell()
  2010. f.seek(0)
  2011. self.assertEqual(f.tell(), p0)
  2012. self.assertEqual(f.readline(), "\xff\n")
  2013. self.assertEqual(f.tell(), p1)
  2014. self.assertEqual(f.readline(), "\xff\n")
  2015. self.assertEqual(f.tell(), p2)
  2016. f.seek(0)
  2017. for line in f:
  2018. self.assertEqual(line, "\xff\n")
  2019. self.assertRaises(IOError, f.tell)
  2020. self.assertEqual(f.tell(), p2)
  2021. f.close()
  2022. def test_seeking(self):
  2023. chunk_size = _default_chunk_size()
  2024. prefix_size = chunk_size - 2
  2025. u_prefix = "a" * prefix_size
  2026. prefix = bytes(u_prefix.encode("utf-8"))
  2027. self.assertEqual(len(u_prefix), len(prefix))
  2028. u_suffix = "\u8888\n"
  2029. suffix = bytes(u_suffix.encode("utf-8"))
  2030. line = prefix + suffix
  2031. f = self.open(support.TESTFN, "wb")
  2032. f.write(line*2)
  2033. f.close()
  2034. f = self.open(support.TESTFN, "r", encoding="utf-8")
  2035. s = f.read(prefix_size)
  2036. self.assertEqual(s, prefix.decode("ascii"))
  2037. self.assertEqual(f.tell(), prefix_size)
  2038. self.assertEqual(f.readline(), u_suffix)
  2039. def test_seeking_too(self):
  2040. # Regression test for a specific bug
  2041. data = b'\xe0\xbf\xbf\n'
  2042. f = self.open(support.TESTFN, "wb")
  2043. f.write(data)
  2044. f.close()
  2045. f = self.open(support.TESTFN, "r", encoding="utf-8")
  2046. f._CHUNK_SIZE # Just test that it exists
  2047. f._CHUNK_SIZE = 2
  2048. f.readline()
  2049. f.tell()
  2050. def test_seek_and_tell(self):
  2051. #Test seek/tell using the StatefulIncrementalDecoder.
  2052. # Make test faster by doing smaller seeks
  2053. CHUNK_SIZE = 128
  2054. def test_seek_and_tell_with_data(data, min_pos=0):
  2055. """Tell/seek to various points within a data stream and ensure
  2056. that the decoded data returned by read() is consistent."""
  2057. f = self.open(support.TESTFN, 'wb')
  2058. f.write(data)
  2059. f.close()
  2060. f = self.open(support.TESTFN, encoding='test_decoder')
  2061. f._CHUNK_SIZE = CHUNK_SIZE
  2062. decoded = f.read()
  2063. f.close()
  2064. for i in range(min_pos, len(decoded) + 1): # seek positions
  2065. for j in [1, 5, len(decoded) - i]: # read lengths
  2066. f = self.open(support.TESTFN, encoding='test_decoder')
  2067. self.assertEqual(f.read(i), decoded[:i])
  2068. cookie = f.tell()
  2069. self.assertEqual(f.read(j), decoded[i:i + j])
  2070. f.seek(cookie)
  2071. self.assertEqual(f.read(), decoded[i:])
  2072. f.close()
  2073. # Enable the test decoder.
  2074. StatefulIncrementalDecoder.codecEnabled = 1
  2075. # Run the tests.
  2076. try:
  2077. # Try each test case.
  2078. for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
  2079. test_seek_and_tell_with_data(input)
  2080. # Position each test case so that it crosses a chunk boundary.
  2081. for input, _, _ in StatefulIncrementalDecoderTest.test_cases:
  2082. offset = CHUNK_SIZE - len(input)//2
  2083. prefix = b'.'*offset
  2084. # Don't bother seeking into the prefix (takes too long).
  2085. min_pos = offset*2
  2086. test_seek_and_tell_with_data(prefix + input, min_pos)
  2087. # Ensure our test decoder won't interfere with subsequent tests.
  2088. finally:
  2089. StatefulIncrementalDecoder.codecEnabled = 0
  2090. def test_encoded_writes(self):
  2091. data = "1234567890"
  2092. tests = ("utf-16",
  2093. "utf-16-le",
  2094. "utf-16-be",
  2095. "utf-32",
  2096. "utf-32-le",
  2097. "utf-32-be")
  2098. for encoding in tests:
  2099. buf = self.BytesIO()
  2100. f = self.TextIOWrapper(buf, encoding=encoding)
  2101. # Check if the BOM is written only once (see issue1753).
  2102. f.write(data)
  2103. f.write(data)
  2104. f.seek(0)
  2105. self.assertEqual(f.read(), data * 2)
  2106. f.seek(0)
  2107. self.assertEqual(f.read(), data * 2)
  2108. self.assertEqual(buf.getvalue(), (data * 2).encode(encoding))
  2109. def test_unreadable(self):
  2110. class UnReadable(self.BytesIO):
  2111. def readable(self):
  2112. return False
  2113. txt = self.TextIOWrapper(UnReadable())
  2114. self.assertRaises(IOError, txt.read)
  2115. def test_read_one_by_one(self):
  2116. txt = self.TextIOWrapper(self.BytesIO(b"AA\r\nBB"))
  2117. reads = ""
  2118. while True:
  2119. c = txt.read(1)
  2120. if not c:
  2121. break
  2122. reads += c
  2123. self.assertEqual(reads, "AA\nBB")
  2124. def test_readlines(self):
  2125. txt = self.TextIOWrapper(self.BytesIO(b"AA\nBB\nCC"))
  2126. self.assertEqual(txt.readlines(), ["AA\n", "BB\n", "CC"])
  2127. txt.seek(0)
  2128. self.assertEqual(txt.readlines(None), ["AA\n", "BB\n", "CC"])
  2129. txt.seek(0)
  2130. self.assertEqual(txt.readlines(5), ["AA\n", "BB\n"])
  2131. # read in amounts equal to TextIOWrapper._CHUNK_SIZE which is 128.
  2132. def test_read_by_chunk(self):
  2133. # make sure "\r\n" straddles 128 char boundary.
  2134. txt = self.TextIOWrapper(self.BytesIO(b"A" * 127 + b"\r\nB"))
  2135. reads = ""
  2136. while True:
  2137. c = txt.read(128)
  2138. if not c:
  2139. break
  2140. reads += c
  2141. self.assertEqual(reads, "A"*127+"\nB")
  2142. def test_writelines(self):
  2143. l = ['ab', 'cd', 'ef']
  2144. buf = self.BytesIO()
  2145. txt = self.TextIOWrapper(buf)
  2146. txt.writelines(l)
  2147. txt.flush()
  2148. self.assertEqual(buf.getvalue(), b'abcdef')
  2149. def test_writelines_userlist(self):
  2150. l = UserList(['ab', 'cd', 'ef'])
  2151. buf = self.BytesIO()
  2152. txt = self.TextIOWrapper(buf)
  2153. txt.writelines(l)
  2154. txt.flush()
  2155. self.assertEqual(buf.getvalue(), b'abcdef')
  2156. def test_writelines_error(self):
  2157. txt = self.TextIOWrapper(self.BytesIO())
  2158. self.assertRaises(TypeError, txt.writelines, [1, 2, 3])
  2159. self.assertRaises(TypeError, txt.writelines, None)
  2160. self.assertRaises(TypeError, txt.writelines, b'abc')
  2161. def test_issue1395_1(self):
  2162. txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
  2163. # read one char at a time
  2164. reads = ""
  2165. while True:
  2166. c = txt.read(1)
  2167. if not c:
  2168. break
  2169. reads += c
  2170. self.assertEqual(reads, self.normalized)
  2171. def test_issue1395_2(self):
  2172. txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
  2173. txt._CHUNK_SIZE = 4
  2174. reads = ""
  2175. while True:
  2176. c = txt.read(4)
  2177. if not c:
  2178. break
  2179. reads += c
  2180. self.assertEqual(reads, self.normalized)
  2181. def test_issue1395_3(self):
  2182. txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
  2183. txt._CHUNK_SIZE = 4
  2184. reads = txt.read(4)
  2185. reads += txt.read(4)
  2186. reads += txt.readline()
  2187. reads += txt.readline()
  2188. reads += txt.readline()
  2189. self.assertEqual(reads, self.normalized)
  2190. def test_issue1395_4(self):
  2191. txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
  2192. txt._CHUNK_SIZE = 4
  2193. reads = txt.read(4)
  2194. reads += txt.read()
  2195. self.assertEqual(reads, self.normalized)
  2196. def test_issue1395_5(self):
  2197. txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
  2198. txt._CHUNK_SIZE = 4
  2199. reads = txt.read(4)
  2200. pos = txt.tell()
  2201. txt.seek(0)
  2202. txt.seek(pos)
  2203. self.assertEqual(txt.read(4), "BBB\n")
  2204. def test_issue2282(self):
  2205. buffer = self.BytesIO(self.testdata)
  2206. txt = self.TextIOWrapper(buffer, encoding="ascii")
  2207. self.assertEqual(buffer.seekable(), txt.seekable())
  2208. def test_append_bom(self):
  2209. # The BOM is not written again when appending to a non-empty file
  2210. filename = support.TESTFN
  2211. for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
  2212. with self.open(filename, 'w', encoding=charset) as f:
  2213. f.write('aaa')
  2214. pos = f.tell()
  2215. with self.open(filename, 'rb') as f:
  2216. self.assertEqual(f.read(), 'aaa'.encode(charset))
  2217. with self.open(filename, 'a', encoding=charset) as f:
  2218. f.write('xxx')
  2219. with self.open(filename, 'rb') as f:
  2220. self.assertEqual(f.read(), 'aaaxxx'.encode(charset))
  2221. def test_seek_bom(self):
  2222. # Same test, but when seeking manually
  2223. filename = support.TESTFN
  2224. for charset in ('utf-8-sig', 'utf-16', 'utf-32'):
  2225. with self.open(filename, 'w', encoding=charset) as f:
  2226. f.write('aaa')
  2227. pos = f.tell()
  2228. with self.open(filename, 'r+', encoding=charset) as f:
  2229. f.seek(pos)
  2230. f.write('zzz')
  2231. f.seek(0)
  2232. f.write('bbb')
  2233. with self.open(filename, 'rb') as f:
  2234. self.assertEqual(f.read(), 'bbbzzz'.encode(charset))
  2235. def test_errors_property(self):
  2236. with self.open(support.TESTFN, "w") as f:
  2237. self.assertEqual(f.errors, "strict")
  2238. with self.open(support.TESTFN, "w", errors="replace") as f:
  2239. self.assertEqual(f.errors, "replace")
  2240. @unittest.skipUnless(threading, 'Threading required for this test.')
  2241. def test_threads_write(self):
  2242. # Issue6750: concurrent writes could duplicate data
  2243. event = threading.Event()
  2244. with self.open(support.TESTFN, "w", buffering=1) as f:
  2245. def run(n):
  2246. text = "Thread%03d\n" % n
  2247. event.wait()
  2248. f.write(text)
  2249. threads = [threading.Thread(target=run, args=(x,))
  2250. for x in range(20)]
  2251. with support.start_threads(threads, event.set):
  2252. time.sleep(0.02)
  2253. with self.open(support.TESTFN) as f:
  2254. content = f.read()
  2255. for n in range(20):
  2256. self.assertEqual(content.count("Thread%03d\n" % n), 1)
  2257. def test_flush_error_on_close(self):
  2258. # Test that text file is closed despite failed flush
  2259. # and that flush() is called before file closed.
  2260. txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
  2261. closed = []
  2262. def bad_flush():
  2263. closed[:] = [txt.closed, txt.buffer.closed]
  2264. raise IOError()
  2265. txt.flush = bad_flush
  2266. self.assertRaises(IOError, txt.close) # exception not swallowed
  2267. self.assertTrue(txt.closed)
  2268. self.assertTrue(txt.buffer.closed)
  2269. self.assertTrue(closed) # flush() called
  2270. self.assertFalse(closed[0]) # flush() called before file closed
  2271. self.assertFalse(closed[1])
  2272. txt.flush = lambda: None # break reference loop
  2273. def test_multi_close(self):
  2274. txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
  2275. txt.close()
  2276. txt.close()
  2277. txt.close()
  2278. self.assertRaises(ValueError, txt.flush)
  2279. def test_readonly_attributes(self):
  2280. txt = self.TextIOWrapper(self.BytesIO(self.testdata), encoding="ascii")
  2281. buf = self.BytesIO(self.testdata)
  2282. with self.assertRaises((AttributeError, TypeError)):
  2283. txt.buffer = buf
  2284. def test_read_nonbytes(self):
  2285. # Issue #17106
  2286. # Crash when underlying read() returns non-bytes
  2287. class NonbytesStream(self.StringIO):
  2288. read1 = self.StringIO.read
  2289. class NonbytesStream(self.StringIO):
  2290. read1 = self.StringIO.read
  2291. t = self.TextIOWrapper(NonbytesStream('a'))
  2292. with self.maybeRaises(TypeError):
  2293. t.read(1)
  2294. t = self.TextIOWrapper(NonbytesStream('a'))
  2295. with self.maybeRaises(TypeError):
  2296. t.readline()
  2297. t = self.TextIOWrapper(NonbytesStream('a'))
  2298. self.assertEqual(t.read(), u'a')
  2299. def test_illegal_decoder(self):
  2300. # Issue #17106
  2301. # Bypass the early encoding check added in issue 20404
  2302. def _make_illegal_wrapper():
  2303. quopri = codecs.lookup("quopri_codec")
  2304. quopri._is_text_encoding = True
  2305. try:
  2306. t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'),
  2307. newline='\n', encoding="quopri_codec")
  2308. finally:
  2309. quopri._is_text_encoding = False
  2310. return t
  2311. # Crash when decoder returns non-string
  2312. with support.check_py3k_warnings():
  2313. t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
  2314. encoding='quopri_codec')
  2315. with self.maybeRaises(TypeError):
  2316. t.read(1)
  2317. with support.check_py3k_warnings():
  2318. t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
  2319. encoding='quopri_codec')
  2320. with self.maybeRaises(TypeError):
  2321. t.readline()
  2322. with support.check_py3k_warnings():
  2323. t = self.TextIOWrapper(self.BytesIO(b'aaaaaa'), newline='\n',
  2324. encoding='quopri_codec')
  2325. with self.maybeRaises(TypeError):
  2326. t.read()
  2327. #else:
  2328. #t = _make_illegal_wrapper()
  2329. #self.assertRaises(TypeError, t.read, 1)
  2330. #t = _make_illegal_wrapper()
  2331. #self.assertRaises(TypeError, t.readline)
  2332. #t = _make_illegal_wrapper()
  2333. #self.assertRaises(TypeError, t.read)
  2334. class CTextIOWrapperTest(TextIOWrapperTest):
  2335. def test_initialization(self):
  2336. r = self.BytesIO(b"\xc3\xa9\n\n")
  2337. b = self.BufferedReader(r, 1000)
  2338. t = self.TextIOWrapper(b)
  2339. self.assertRaises(TypeError, t.__init__, b, newline=42)
  2340. self.assertRaises(ValueError, t.read)
  2341. self.assertRaises(ValueError, t.__init__, b, newline='xyzzy')
  2342. self.assertRaises(ValueError, t.read)
  2343. t = self.TextIOWrapper.__new__(self.TextIOWrapper)
  2344. self.assertRaises(Exception, repr, t)
  2345. def test_garbage_collection(self):
  2346. # C TextIOWrapper objects are collected, and collecting them flushes
  2347. # all data to disk.
  2348. # The Python version has __del__, so it ends in gc.garbage instead.
  2349. rawio = io.FileIO(support.TESTFN, "wb")
  2350. b = self.BufferedWriter(rawio)
  2351. t = self.TextIOWrapper(b, encoding="ascii")
  2352. t.write("456def")
  2353. t.x = t
  2354. wr = weakref.ref(t)
  2355. del t
  2356. support.gc_collect()
  2357. self.assertIsNone(wr(), wr)
  2358. with self.open(support.TESTFN, "rb") as f:
  2359. self.assertEqual(f.read(), b"456def")
  2360. def test_rwpair_cleared_before_textio(self):
  2361. # Issue 13070: TextIOWrapper's finalization would crash when called
  2362. # after the reference to the underlying BufferedRWPair's writer got
  2363. # cleared by the GC.
  2364. for i in range(1000):
  2365. b1 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
  2366. t1 = self.TextIOWrapper(b1, encoding="ascii")
  2367. b2 = self.BufferedRWPair(self.MockRawIO(), self.MockRawIO())
  2368. t2 = self.TextIOWrapper(b2, encoding="ascii")
  2369. # circular references
  2370. t1.buddy = t2
  2371. t2.buddy = t1
  2372. support.gc_collect()
  2373. maybeRaises = unittest.TestCase.assertRaises
  2374. class PyTextIOWrapperTest(TextIOWrapperTest):
  2375. @contextlib.contextmanager
  2376. def maybeRaises(self, *args, **kwds):
  2377. yield
  2378. class IncrementalNewlineDecoderTest(unittest.TestCase):
  2379. def check_newline_decoding_utf8(self, decoder):
  2380. # UTF-8 specific tests for a newline decoder
  2381. def _check_decode(b, s, **kwargs):
  2382. # We exercise getstate() / setstate() as well as decode()
  2383. state = decoder.getstate()
  2384. self.assertEqual(decoder.decode(b, **kwargs), s)
  2385. decoder.setstate(state)
  2386. self.assertEqual(decoder.decode(b, **kwargs), s)
  2387. _check_decode(b'\xe8\xa2\x88', "\u8888")
  2388. _check_decode(b'\xe8', "")
  2389. _check_decode(b'\xa2', "")
  2390. _check_decode(b'\x88', "\u8888")
  2391. _check_decode(b'\xe8', "")
  2392. _check_decode(b'\xa2', "")
  2393. _check_decode(b'\x88', "\u8888")
  2394. _check_decode(b'\xe8', "")
  2395. self.assertRaises(UnicodeDecodeError, decoder.decode, b'', final=True)
  2396. decoder.reset()
  2397. _check_decode(b'\n', "\n")
  2398. _check_decode(b'\r', "")
  2399. _check_decode(b'', "\n", final=True)
  2400. _check_decode(b'\r', "\n", final=True)
  2401. _check_decode(b'\r', "")
  2402. _check_decode(b'a', "\na")
  2403. _check_decode(b'\r\r\n', "\n\n")
  2404. _check_decode(b'\r', "")
  2405. _check_decode(b'\r', "\n")
  2406. _check_decode(b'\na', "\na")
  2407. _check_decode(b'\xe8\xa2\x88\r\n', "\u8888\n")
  2408. _check_decode(b'\xe8\xa2\x88', "\u8888")
  2409. _check_decode(b'\n', "\n")
  2410. _check_decode(b'\xe8\xa2\x88\r', "\u8888")
  2411. _check_decode(b'\n', "\n")
  2412. def check_newline_decoding(self, decoder, encoding):
  2413. result = []
  2414. if encoding is not None:
  2415. encoder = codecs.getincrementalencoder(encoding)()
  2416. def _decode_bytewise(s):
  2417. # Decode one byte at a time
  2418. for b in encoder.encode(s):
  2419. result.append(decoder.decode(b))
  2420. else:
  2421. encoder = None
  2422. def _decode_bytewise(s):
  2423. # Decode one char at a time
  2424. for c in s:
  2425. result.append(decoder.decode(c))
  2426. self.assertEqual(decoder.newlines, None)
  2427. _decode_bytewise("abc\n\r")
  2428. self.assertEqual(decoder.newlines, '\n')
  2429. _decode_bytewise("\nabc")
  2430. self.assertEqual(decoder.newlines, ('\n', '\r\n'))
  2431. _decode_bytewise("abc\r")
  2432. self.assertEqual(decoder.newlines, ('\n', '\r\n'))
  2433. _decode_bytewise("abc")
  2434. self.assertEqual(decoder.newlines, ('\r', '\n', '\r\n'))
  2435. _decode_bytewise("abc\r")
  2436. self.assertEqual("".join(result), "abc\n\nabcabc\nabcabc")
  2437. decoder.reset()
  2438. input = "abc"
  2439. if encoder is not None:
  2440. encoder.reset()
  2441. input = encoder.encode(input)
  2442. self.assertEqual(decoder.decode(input), "abc")
  2443. self.assertEqual(decoder.newlines, None)
  2444. def test_newline_decoder(self):
  2445. encodings = (
  2446. # None meaning the IncrementalNewlineDecoder takes unicode input
  2447. # rather than bytes input
  2448. None, 'utf-8', 'latin-1',
  2449. 'utf-16', 'utf-16-le', 'utf-16-be',
  2450. 'utf-32', 'utf-32-le', 'utf-32-be',
  2451. )
  2452. for enc in encodings:
  2453. decoder = enc and codecs.getincrementaldecoder(enc)()
  2454. decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
  2455. self.check_newline_decoding(decoder, enc)
  2456. decoder = codecs.getincrementaldecoder("utf-8")()
  2457. decoder = self.IncrementalNewlineDecoder(decoder, translate=True)
  2458. self.check_newline_decoding_utf8(decoder)
  2459. def test_newline_bytes(self):
  2460. # Issue 5433: Excessive optimization in IncrementalNewlineDecoder
  2461. def _check(dec):
  2462. self.assertEqual(dec.newlines, None)
  2463. self.assertEqual(dec.decode("\u0D00"), "\u0D00")
  2464. self.assertEqual(dec.newlines, None)
  2465. self.assertEqual(dec.decode("\u0A00"), "\u0A00")
  2466. self.assertEqual(dec.newlines, None)
  2467. dec = self.IncrementalNewlineDecoder(None, translate=False)
  2468. _check(dec)
  2469. dec = self.IncrementalNewlineDecoder(None, translate=True)
  2470. _check(dec)
  2471. class CIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
  2472. pass
  2473. class PyIncrementalNewlineDecoderTest(IncrementalNewlineDecoderTest):
  2474. pass
  2475. # XXX Tests for open()
  2476. class MiscIOTest(unittest.TestCase):
  2477. def tearDown(self):
  2478. support.unlink(support.TESTFN)
  2479. def test___all__(self):
  2480. for name in self.io.__all__:
  2481. obj = getattr(self.io, name, None)
  2482. self.assertIsNotNone(obj, name)
  2483. if name == "open":
  2484. continue
  2485. elif "error" in name.lower() or name == "UnsupportedOperation":
  2486. self.assertTrue(issubclass(obj, Exception), name)
  2487. elif not name.startswith("SEEK_"):
  2488. self.assertTrue(issubclass(obj, self.IOBase))
  2489. def test_attributes(self):
  2490. f = self.open(support.TESTFN, "wb", buffering=0)
  2491. self.assertEqual(f.mode, "wb")
  2492. f.close()
  2493. f = self.open(support.TESTFN, "U")
  2494. self.assertEqual(f.name, support.TESTFN)
  2495. self.assertEqual(f.buffer.name, support.TESTFN)
  2496. self.assertEqual(f.buffer.raw.name, support.TESTFN)
  2497. self.assertEqual(f.mode, "U")
  2498. self.assertEqual(f.buffer.mode, "rb")
  2499. self.assertEqual(f.buffer.raw.mode, "rb")
  2500. f.close()
  2501. f = self.open(support.TESTFN, "w+")
  2502. self.assertEqual(f.mode, "w+")
  2503. self.assertEqual(f.buffer.mode, "rb+") # Does it really matter?
  2504. self.assertEqual(f.buffer.raw.mode, "rb+")
  2505. g = self.open(f.fileno(), "wb", closefd=False)
  2506. self.assertEqual(g.mode, "wb")
  2507. self.assertEqual(g.raw.mode, "wb")
  2508. self.assertEqual(g.name, f.fileno())
  2509. self.assertEqual(g.raw.name, f.fileno())
  2510. f.close()
  2511. g.close()
  2512. def test_io_after_close(self):
  2513. for kwargs in [
  2514. {"mode": "w"},
  2515. {"mode": "wb"},
  2516. {"mode": "w", "buffering": 1},
  2517. {"mode": "w", "buffering": 2},
  2518. {"mode": "wb", "buffering": 0},
  2519. {"mode": "r"},
  2520. {"mode": "rb"},
  2521. {"mode": "r", "buffering": 1},
  2522. {"mode": "r", "buffering": 2},
  2523. {"mode": "rb", "buffering": 0},
  2524. {"mode": "w+"},
  2525. {"mode": "w+b"},
  2526. {"mode": "w+", "buffering": 1},
  2527. {"mode": "w+", "buffering": 2},
  2528. {"mode": "w+b", "buffering": 0},
  2529. ]:
  2530. f = self.open(support.TESTFN, **kwargs)
  2531. f.close()
  2532. self.assertRaises(ValueError, f.flush)
  2533. self.assertRaises(ValueError, f.fileno)
  2534. self.assertRaises(ValueError, f.isatty)
  2535. self.assertRaises(ValueError, f.__iter__)
  2536. if hasattr(f, "peek"):
  2537. self.assertRaises(ValueError, f.peek, 1)
  2538. self.assertRaises(ValueError, f.read)
  2539. if hasattr(f, "read1"):
  2540. self.assertRaises(ValueError, f.read1, 1024)
  2541. if hasattr(f, "readall"):
  2542. self.assertRaises(ValueError, f.readall)
  2543. if hasattr(f, "readinto"):
  2544. self.assertRaises(ValueError, f.readinto, bytearray(1024))
  2545. self.assertRaises(ValueError, f.readline)
  2546. self.assertRaises(ValueError, f.readlines)
  2547. self.assertRaises(ValueError, f.seek, 0)
  2548. self.assertRaises(ValueError, f.tell)
  2549. self.assertRaises(ValueError, f.truncate)
  2550. self.assertRaises(ValueError, f.write,
  2551. b"" if "b" in kwargs['mode'] else "")
  2552. self.assertRaises(ValueError, f.writelines, [])
  2553. self.assertRaises(ValueError, next, f)
  2554. def test_blockingioerror(self):
  2555. # Various BlockingIOError issues
  2556. self.assertRaises(TypeError, self.BlockingIOError)
  2557. self.assertRaises(TypeError, self.BlockingIOError, 1)
  2558. self.assertRaises(TypeError, self.BlockingIOError, 1, 2, 3, 4)
  2559. self.assertRaises(TypeError, self.BlockingIOError, 1, "", None)
  2560. b = self.BlockingIOError(1, "")
  2561. self.assertEqual(b.characters_written, 0)
  2562. class C(unicode):
  2563. pass
  2564. c = C("")
  2565. b = self.BlockingIOError(1, c)
  2566. c.b = b
  2567. b.c = c
  2568. wr = weakref.ref(c)
  2569. del c, b
  2570. support.gc_collect()
  2571. self.assertIsNone(wr(), wr)
  2572. def test_abcs(self):
  2573. # Test the visible base classes are ABCs.
  2574. self.assertIsInstance(self.IOBase, abc.ABCMeta)
  2575. self.assertIsInstance(self.RawIOBase, abc.ABCMeta)
  2576. self.assertIsInstance(self.BufferedIOBase, abc.ABCMeta)
  2577. self.assertIsInstance(self.TextIOBase, abc.ABCMeta)
  2578. def _check_abc_inheritance(self, abcmodule):
  2579. with self.open(support.TESTFN, "wb", buffering=0) as f:
  2580. self.assertIsInstance(f, abcmodule.IOBase)
  2581. self.assertIsInstance(f, abcmodule.RawIOBase)
  2582. self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
  2583. self.assertNotIsInstance(f, abcmodule.TextIOBase)
  2584. with self.open(support.TESTFN, "wb") as f:
  2585. self.assertIsInstance(f, abcmodule.IOBase)
  2586. self.assertNotIsInstance(f, abcmodule.RawIOBase)
  2587. self.assertIsInstance(f, abcmodule.BufferedIOBase)
  2588. self.assertNotIsInstance(f, abcmodule.TextIOBase)
  2589. with self.open(support.TESTFN, "w") as f:
  2590. self.assertIsInstance(f, abcmodule.IOBase)
  2591. self.assertNotIsInstance(f, abcmodule.RawIOBase)
  2592. self.assertNotIsInstance(f, abcmodule.BufferedIOBase)
  2593. self.assertIsInstance(f, abcmodule.TextIOBase)
  2594. def test_abc_inheritance(self):
  2595. # Test implementations inherit from their respective ABCs
  2596. self._check_abc_inheritance(self)
  2597. def test_abc_inheritance_official(self):
  2598. # Test implementations inherit from the official ABCs of the
  2599. # baseline "io" module.
  2600. self._check_abc_inheritance(io)
  2601. @unittest.skipUnless(fcntl, 'fcntl required for this test')
  2602. def test_nonblock_pipe_write_bigbuf(self):
  2603. self._test_nonblock_pipe_write(16*1024)
  2604. @unittest.skipUnless(fcntl, 'fcntl required for this test')
  2605. def test_nonblock_pipe_write_smallbuf(self):
  2606. self._test_nonblock_pipe_write(1024)
  2607. def _set_non_blocking(self, fd):
  2608. flags = fcntl.fcntl(fd, fcntl.F_GETFL)
  2609. self.assertNotEqual(flags, -1)
  2610. res = fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)
  2611. self.assertEqual(res, 0)
  2612. def _test_nonblock_pipe_write(self, bufsize):
  2613. sent = []
  2614. received = []
  2615. r, w = os.pipe()
  2616. self._set_non_blocking(r)
  2617. self._set_non_blocking(w)
  2618. # To exercise all code paths in the C implementation we need
  2619. # to play with buffer sizes. For instance, if we choose a
  2620. # buffer size less than or equal to _PIPE_BUF (4096 on Linux)
  2621. # then we will never get a partial write of the buffer.
  2622. rf = self.open(r, mode='rb', closefd=True, buffering=bufsize)
  2623. wf = self.open(w, mode='wb', closefd=True, buffering=bufsize)
  2624. with rf, wf:
  2625. for N in 9999, 73, 7574:
  2626. try:
  2627. i = 0
  2628. while True:
  2629. msg = bytes([i % 26 + 97]) * N
  2630. sent.append(msg)
  2631. wf.write(msg)
  2632. i += 1
  2633. except self.BlockingIOError as e:
  2634. self.assertEqual(e.args[0], errno.EAGAIN)
  2635. sent[-1] = sent[-1][:e.characters_written]
  2636. received.append(rf.read())
  2637. msg = b'BLOCKED'
  2638. wf.write(msg)
  2639. sent.append(msg)
  2640. while True:
  2641. try:
  2642. wf.flush()
  2643. break
  2644. except self.BlockingIOError as e:
  2645. self.assertEqual(e.args[0], errno.EAGAIN)
  2646. self.assertEqual(e.characters_written, 0)
  2647. received.append(rf.read())
  2648. received += iter(rf.read, None)
  2649. sent, received = b''.join(sent), b''.join(received)
  2650. self.assertEqual(sent, received)
  2651. self.assertTrue(wf.closed)
  2652. self.assertTrue(rf.closed)
  2653. class CMiscIOTest(MiscIOTest):
  2654. io = io
  2655. shutdown_error = "RuntimeError: could not find io module state"
  2656. class PyMiscIOTest(MiscIOTest):
  2657. io = pyio
  2658. shutdown_error = "LookupError: unknown encoding: ascii"
  2659. @unittest.skipIf(os.name == 'nt', 'POSIX signals required for this test.')
  2660. class SignalsTest(unittest.TestCase):
  2661. def setUp(self):
  2662. self.oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt)
  2663. def tearDown(self):
  2664. signal.signal(signal.SIGALRM, self.oldalrm)
  2665. def alarm_interrupt(self, sig, frame):
  2666. 1 // 0
  2667. @unittest.skipUnless(threading, 'Threading required for this test.')
  2668. @unittest.skipIf(sys.platform in ('freebsd5', 'freebsd6', 'freebsd7'),
  2669. 'issue #12429: skip test on FreeBSD <= 7')
  2670. def check_interrupted_write(self, item, bytes, **fdopen_kwargs):
  2671. """Check that a partial write, when it gets interrupted, properly
  2672. invokes the signal handler, and bubbles up the exception raised
  2673. in the latter."""
  2674. read_results = []
  2675. def _read():
  2676. s = os.read(r, 1)
  2677. read_results.append(s)
  2678. t = threading.Thread(target=_read)
  2679. t.daemon = True
  2680. r, w = os.pipe()
  2681. try:
  2682. wio = self.io.open(w, **fdopen_kwargs)
  2683. t.start()
  2684. signal.alarm(1)
  2685. # Fill the pipe enough that the write will be blocking.
  2686. # It will be interrupted by the timer armed above. Since the
  2687. # other thread has read one byte, the low-level write will
  2688. # return with a successful (partial) result rather than an EINTR.
  2689. # The buffered IO layer must check for pending signal
  2690. # handlers, which in this case will invoke alarm_interrupt().
  2691. try:
  2692. with self.assertRaises(ZeroDivisionError):
  2693. wio.write(item * (support.PIPE_MAX_SIZE // len(item) + 1))
  2694. finally:
  2695. t.join()
  2696. # We got one byte, get another one and check that it isn't a
  2697. # repeat of the first one.
  2698. read_results.append(os.read(r, 1))
  2699. self.assertEqual(read_results, [bytes[0:1], bytes[1:2]])
  2700. finally:
  2701. os.close(w)
  2702. os.close(r)
  2703. # This is deliberate. If we didn't close the file descriptor
  2704. # before closing wio, wio would try to flush its internal
  2705. # buffer, and block again.
  2706. try:
  2707. wio.close()
  2708. except IOError as e:
  2709. if e.errno != errno.EBADF:
  2710. raise
  2711. def test_interrupted_write_unbuffered(self):
  2712. self.check_interrupted_write(b"xy", b"xy", mode="wb", buffering=0)
  2713. def test_interrupted_write_buffered(self):
  2714. self.check_interrupted_write(b"xy", b"xy", mode="wb")
  2715. def test_interrupted_write_text(self):
  2716. self.check_interrupted_write("xy", b"xy", mode="w", encoding="ascii")
  2717. def check_reentrant_write(self, data, **fdopen_kwargs):
  2718. def on_alarm(*args):
  2719. # Will be called reentrantly from the same thread
  2720. wio.write(data)
  2721. 1//0
  2722. signal.signal(signal.SIGALRM, on_alarm)
  2723. r, w = os.pipe()
  2724. wio = self.io.open(w, **fdopen_kwargs)
  2725. try:
  2726. signal.alarm(1)
  2727. # Either the reentrant call to wio.write() fails with RuntimeError,
  2728. # or the signal handler raises ZeroDivisionError.
  2729. with self.assertRaises((ZeroDivisionError, RuntimeError)) as cm:
  2730. while 1:
  2731. for i in range(100):
  2732. wio.write(data)
  2733. wio.flush()
  2734. # Make sure the buffer doesn't fill up and block further writes
  2735. os.read(r, len(data) * 100)
  2736. exc = cm.exception
  2737. if isinstance(exc, RuntimeError):
  2738. self.assertTrue(str(exc).startswith("reentrant call"), str(exc))
  2739. finally:
  2740. wio.close()
  2741. os.close(r)
  2742. def test_reentrant_write_buffered(self):
  2743. self.check_reentrant_write(b"xy", mode="wb")
  2744. def test_reentrant_write_text(self):
  2745. self.check_reentrant_write("xy", mode="w", encoding="ascii")
  2746. def check_interrupted_read_retry(self, decode, **fdopen_kwargs):
  2747. """Check that a buffered read, when it gets interrupted (either
  2748. returning a partial result or EINTR), properly invokes the signal
  2749. handler and retries if the latter returned successfully."""
  2750. r, w = os.pipe()
  2751. fdopen_kwargs["closefd"] = False
  2752. def alarm_handler(sig, frame):
  2753. os.write(w, b"bar")
  2754. signal.signal(signal.SIGALRM, alarm_handler)
  2755. try:
  2756. rio = self.io.open(r, **fdopen_kwargs)
  2757. os.write(w, b"foo")
  2758. signal.alarm(1)
  2759. # Expected behaviour:
  2760. # - first raw read() returns partial b"foo"
  2761. # - second raw read() returns EINTR
  2762. # - third raw read() returns b"bar"
  2763. self.assertEqual(decode(rio.read(6)), "foobar")
  2764. finally:
  2765. rio.close()
  2766. os.close(w)
  2767. os.close(r)
  2768. def test_interrupterd_read_retry_buffered(self):
  2769. self.check_interrupted_read_retry(lambda x: x.decode('latin1'),
  2770. mode="rb")
  2771. def test_interrupterd_read_retry_text(self):
  2772. self.check_interrupted_read_retry(lambda x: x,
  2773. mode="r")
  2774. @unittest.skipUnless(threading, 'Threading required for this test.')
  2775. def check_interrupted_write_retry(self, item, **fdopen_kwargs):
  2776. """Check that a buffered write, when it gets interrupted (either
  2777. returning a partial result or EINTR), properly invokes the signal
  2778. handler and retries if the latter returned successfully."""
  2779. select = support.import_module("select")
  2780. # A quantity that exceeds the buffer size of an anonymous pipe's
  2781. # write end.
  2782. N = support.PIPE_MAX_SIZE
  2783. r, w = os.pipe()
  2784. fdopen_kwargs["closefd"] = False
  2785. # We need a separate thread to read from the pipe and allow the
  2786. # write() to finish. This thread is started after the SIGALRM is
  2787. # received (forcing a first EINTR in write()).
  2788. read_results = []
  2789. write_finished = False
  2790. error = [None]
  2791. def _read():
  2792. try:
  2793. while not write_finished:
  2794. while r in select.select([r], [], [], 1.0)[0]:
  2795. s = os.read(r, 1024)
  2796. read_results.append(s)
  2797. except BaseException as exc:
  2798. error[0] = exc
  2799. t = threading.Thread(target=_read)
  2800. t.daemon = True
  2801. def alarm1(sig, frame):
  2802. signal.signal(signal.SIGALRM, alarm2)
  2803. signal.alarm(1)
  2804. def alarm2(sig, frame):
  2805. t.start()
  2806. signal.signal(signal.SIGALRM, alarm1)
  2807. try:
  2808. wio = self.io.open(w, **fdopen_kwargs)
  2809. signal.alarm(1)
  2810. # Expected behaviour:
  2811. # - first raw write() is partial (because of the limited pipe buffer
  2812. # and the first alarm)
  2813. # - second raw write() returns EINTR (because of the second alarm)
  2814. # - subsequent write()s are successful (either partial or complete)
  2815. self.assertEqual(N, wio.write(item * N))
  2816. wio.flush()
  2817. write_finished = True
  2818. t.join()
  2819. self.assertIsNone(error[0])
  2820. self.assertEqual(N, sum(len(x) for x in read_results))
  2821. finally:
  2822. write_finished = True
  2823. os.close(w)
  2824. os.close(r)
  2825. # This is deliberate. If we didn't close the file descriptor
  2826. # before closing wio, wio would try to flush its internal
  2827. # buffer, and could block (in case of failure).
  2828. try:
  2829. wio.close()
  2830. except IOError as e:
  2831. if e.errno != errno.EBADF:
  2832. raise
  2833. def test_interrupterd_write_retry_buffered(self):
  2834. self.check_interrupted_write_retry(b"x", mode="wb")
  2835. def test_interrupterd_write_retry_text(self):
  2836. self.check_interrupted_write_retry("x", mode="w", encoding="latin1")
  2837. class CSignalsTest(SignalsTest):
  2838. io = io
  2839. class PySignalsTest(SignalsTest):
  2840. io = pyio
  2841. # Handling reentrancy issues would slow down _pyio even more, so the
  2842. # tests are disabled.
  2843. test_reentrant_write_buffered = None
  2844. test_reentrant_write_text = None
  2845. def test_main():
  2846. tests = (CIOTest, PyIOTest,
  2847. CBufferedReaderTest, PyBufferedReaderTest,
  2848. CBufferedWriterTest, PyBufferedWriterTest,
  2849. CBufferedRWPairTest, PyBufferedRWPairTest,
  2850. CBufferedRandomTest, PyBufferedRandomTest,
  2851. StatefulIncrementalDecoderTest,
  2852. CIncrementalNewlineDecoderTest, PyIncrementalNewlineDecoderTest,
  2853. CTextIOWrapperTest, PyTextIOWrapperTest,
  2854. CMiscIOTest, PyMiscIOTest,
  2855. CSignalsTest, PySignalsTest,
  2856. )
  2857. # Put the namespaces of the IO module we are testing and some useful mock
  2858. # classes in the __dict__ of each test.
  2859. mocks = (MockRawIO, MisbehavedRawIO, MockFileIO, CloseFailureIO,
  2860. MockNonBlockWriterIO, MockRawIOWithoutRead)
  2861. all_members = io.__all__ + ["IncrementalNewlineDecoder"]
  2862. c_io_ns = dict((name, getattr(io, name)) for name in all_members)
  2863. py_io_ns = dict((name, getattr(pyio, name)) for name in all_members)
  2864. globs = globals()
  2865. c_io_ns.update((x.__name__, globs["C" + x.__name__]) for x in mocks)
  2866. py_io_ns.update((x.__name__, globs["Py" + x.__name__]) for x in mocks)
  2867. # Avoid turning open into a bound method.
  2868. py_io_ns["open"] = pyio.OpenWrapper
  2869. for test in tests:
  2870. if test.__name__.startswith("C"):
  2871. for name, obj in c_io_ns.items():
  2872. setattr(test, name, obj)
  2873. elif test.__name__.startswith("Py"):
  2874. for name, obj in py_io_ns.items():
  2875. setattr(test, name, obj)
  2876. support.run_unittest(*tests)
  2877. if __name__ == "__main__":
  2878. test_main()