test_gzip.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346
  1. """Test script for the gzip module.
  2. """
  3. import unittest
  4. from test import test_support
  5. import os
  6. import io
  7. import struct
  8. gzip = test_support.import_module('gzip')
# Sample payloads (C source excerpts) that the tests compress and read back.
# NOTE(review): test_metadata checks a CRC32 computed over data1 and
# test_seek_whence slices data1 by offset, so the exact bytes matter.  The
# whitespace inside these literals looks like it may have been collapsed when
# this listing was extracted -- confirm against the upstream file.
data1 = """ int length=DEFAULTALLOC, err = Z_OK;
PyObject *RetVal;
int flushmode = Z_FINISH;
unsigned long start_total_out;
"""

data2 = """/* zlibmodule.c -- gzip-compatible data compression */
/* See http://www.gzip.org/zlib/
/* See http://www.winimage.com/zLibDll for Windows */
"""
  18. class TestGzip(unittest.TestCase):
  19. filename = test_support.TESTFN
  20. def setUp(self):
  21. test_support.unlink(self.filename)
  22. def tearDown(self):
  23. test_support.unlink(self.filename)
  24. def write_and_read_back(self, data, mode='b'):
  25. b_data = memoryview(data).tobytes()
  26. with gzip.GzipFile(self.filename, 'w'+mode) as f:
  27. l = f.write(data)
  28. self.assertEqual(l, len(b_data))
  29. with gzip.GzipFile(self.filename, 'r'+mode) as f:
  30. self.assertEqual(f.read(), b_data)
  31. @test_support.requires_unicode
  32. def test_unicode_filename(self):
  33. unicode_filename = test_support.TESTFN_UNICODE
  34. try:
  35. unicode_filename.encode(test_support.TESTFN_ENCODING)
  36. except (UnicodeError, TypeError):
  37. self.skipTest("Requires unicode filenames support")
  38. self.filename = unicode_filename
  39. with gzip.GzipFile(unicode_filename, "wb") as f:
  40. f.write(data1 * 50)
  41. with gzip.GzipFile(unicode_filename, "rb") as f:
  42. self.assertEqual(f.read(), data1 * 50)
  43. # Sanity check that we are actually operating on the right file.
  44. with open(unicode_filename, 'rb') as fobj, \
  45. gzip.GzipFile(fileobj=fobj, mode="rb") as f:
  46. self.assertEqual(f.read(), data1 * 50)
  47. def test_write(self):
  48. with gzip.GzipFile(self.filename, 'wb') as f:
  49. f.write(data1 * 50)
  50. # Try flush and fileno.
  51. f.flush()
  52. f.fileno()
  53. if hasattr(os, 'fsync'):
  54. os.fsync(f.fileno())
  55. f.close()
  56. # Test multiple close() calls.
  57. f.close()
  58. # The following test_write_xy methods test that write accepts
  59. # the corresponding bytes-like object type as input
  60. # and that the data written equals bytes(xy) in all cases.
  61. def test_write_memoryview(self):
  62. self.write_and_read_back(memoryview(data1 * 50))
  63. def test_write_incompatible_type(self):
  64. # Test that non-bytes-like types raise TypeError.
  65. # Issue #21560: attempts to write incompatible types
  66. # should not affect the state of the fileobject
  67. with gzip.GzipFile(self.filename, 'wb') as f:
  68. with self.assertRaises(UnicodeEncodeError):
  69. f.write(u'\xff')
  70. with self.assertRaises(TypeError):
  71. f.write([1])
  72. f.write(data1)
  73. with gzip.GzipFile(self.filename, 'rb') as f:
  74. self.assertEqual(f.read(), data1)
  75. def test_read(self):
  76. self.test_write()
  77. # Try reading.
  78. with gzip.GzipFile(self.filename, 'r') as f:
  79. d = f.read()
  80. self.assertEqual(d, data1*50)
  81. def test_read_universal_newlines(self):
  82. # Issue #5148: Reading breaks when mode contains 'U'.
  83. self.test_write()
  84. with gzip.GzipFile(self.filename, 'rU') as f:
  85. d = f.read()
  86. self.assertEqual(d, data1*50)
  87. def test_io_on_closed_object(self):
  88. # Test that I/O operations on closed GzipFile objects raise a
  89. # ValueError, just like the corresponding functions on file objects.
  90. # Write to a file, open it for reading, then close it.
  91. self.test_write()
  92. f = gzip.GzipFile(self.filename, 'r')
  93. f.close()
  94. with self.assertRaises(ValueError):
  95. f.read(1)
  96. with self.assertRaises(ValueError):
  97. f.seek(0)
  98. with self.assertRaises(ValueError):
  99. f.tell()
  100. # Open the file for writing, then close it.
  101. f = gzip.GzipFile(self.filename, 'w')
  102. f.close()
  103. with self.assertRaises(ValueError):
  104. f.write('')
  105. with self.assertRaises(ValueError):
  106. f.flush()
  107. def test_append(self):
  108. self.test_write()
  109. # Append to the previous file
  110. with gzip.GzipFile(self.filename, 'ab') as f:
  111. f.write(data2 * 15)
  112. with gzip.GzipFile(self.filename, 'rb') as f:
  113. d = f.read()
  114. self.assertEqual(d, (data1*50) + (data2*15))
  115. def test_many_append(self):
  116. # Bug #1074261 was triggered when reading a file that contained
  117. # many, many members. Create such a file and verify that reading it
  118. # works.
  119. with gzip.open(self.filename, 'wb', 9) as f:
  120. f.write('a')
  121. for i in range(0, 200):
  122. with gzip.open(self.filename, "ab", 9) as f: # append
  123. f.write('a')
  124. # Try reading the file
  125. with gzip.open(self.filename, "rb") as zgfile:
  126. contents = ""
  127. while 1:
  128. ztxt = zgfile.read(8192)
  129. contents += ztxt
  130. if not ztxt: break
  131. self.assertEqual(contents, 'a'*201)
  132. def test_buffered_reader(self):
  133. # Issue #7471: a GzipFile can be wrapped in a BufferedReader for
  134. # performance.
  135. self.test_write()
  136. with gzip.GzipFile(self.filename, 'rb') as f:
  137. with io.BufferedReader(f) as r:
  138. lines = [line for line in r]
  139. self.assertEqual(lines, 50 * data1.splitlines(True))
  140. def test_readline(self):
  141. self.test_write()
  142. # Try .readline() with varying line lengths
  143. with gzip.GzipFile(self.filename, 'rb') as f:
  144. line_length = 0
  145. while 1:
  146. L = f.readline(line_length)
  147. if not L and line_length != 0: break
  148. self.assertTrue(len(L) <= line_length)
  149. line_length = (line_length + 1) % 50
  150. def test_readlines(self):
  151. self.test_write()
  152. # Try .readlines()
  153. with gzip.GzipFile(self.filename, 'rb') as f:
  154. L = f.readlines()
  155. with gzip.GzipFile(self.filename, 'rb') as f:
  156. while 1:
  157. L = f.readlines(150)
  158. if L == []: break
  159. def test_seek_read(self):
  160. self.test_write()
  161. # Try seek, read test
  162. with gzip.GzipFile(self.filename) as f:
  163. while 1:
  164. oldpos = f.tell()
  165. line1 = f.readline()
  166. if not line1: break
  167. newpos = f.tell()
  168. f.seek(oldpos) # negative seek
  169. if len(line1)>10:
  170. amount = 10
  171. else:
  172. amount = len(line1)
  173. line2 = f.read(amount)
  174. self.assertEqual(line1[:amount], line2)
  175. f.seek(newpos) # positive seek
  176. def test_seek_whence(self):
  177. self.test_write()
  178. # Try seek(whence=1), read test
  179. with gzip.GzipFile(self.filename) as f:
  180. f.read(10)
  181. f.seek(10, whence=1)
  182. y = f.read(10)
  183. self.assertEqual(y, data1[20:30])
  184. def test_seek_write(self):
  185. # Try seek, write test
  186. with gzip.GzipFile(self.filename, 'w') as f:
  187. for pos in range(0, 256, 16):
  188. f.seek(pos)
  189. f.write('GZ\n')
  190. def test_mode(self):
  191. self.test_write()
  192. with gzip.GzipFile(self.filename, 'r') as f:
  193. self.assertEqual(f.myfileobj.mode, 'rb')
  194. def test_1647484(self):
  195. for mode in ('wb', 'rb'):
  196. with gzip.GzipFile(self.filename, mode) as f:
  197. self.assertTrue(hasattr(f, "name"))
  198. self.assertEqual(f.name, self.filename)
  199. def test_mtime(self):
  200. mtime = 123456789
  201. with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
  202. fWrite.write(data1)
  203. with gzip.GzipFile(self.filename) as fRead:
  204. dataRead = fRead.read()
  205. self.assertEqual(dataRead, data1)
  206. self.assertTrue(hasattr(fRead, 'mtime'))
  207. self.assertEqual(fRead.mtime, mtime)
  208. def test_metadata(self):
  209. mtime = 123456789
  210. with gzip.GzipFile(self.filename, 'w', mtime = mtime) as fWrite:
  211. fWrite.write(data1)
  212. with open(self.filename, 'rb') as fRead:
  213. # see RFC 1952: http://www.faqs.org/rfcs/rfc1952.html
  214. idBytes = fRead.read(2)
  215. self.assertEqual(idBytes, '\x1f\x8b') # gzip ID
  216. cmByte = fRead.read(1)
  217. self.assertEqual(cmByte, '\x08') # deflate
  218. flagsByte = fRead.read(1)
  219. self.assertEqual(flagsByte, '\x08') # only the FNAME flag is set
  220. mtimeBytes = fRead.read(4)
  221. self.assertEqual(mtimeBytes, struct.pack('<i', mtime)) # little-endian
  222. xflByte = fRead.read(1)
  223. self.assertEqual(xflByte, '\x02') # maximum compression
  224. osByte = fRead.read(1)
  225. self.assertEqual(osByte, '\xff') # OS "unknown" (OS-independent)
  226. # Since the FNAME flag is set, the zero-terminated filename follows.
  227. # RFC 1952 specifies that this is the name of the input file, if any.
  228. # However, the gzip module defaults to storing the name of the output
  229. # file in this field.
  230. expected = self.filename.encode('Latin-1') + '\x00'
  231. nameBytes = fRead.read(len(expected))
  232. self.assertEqual(nameBytes, expected)
  233. # Since no other flags were set, the header ends here.
  234. # Rather than process the compressed data, let's seek to the trailer.
  235. fRead.seek(os.stat(self.filename).st_size - 8)
  236. crc32Bytes = fRead.read(4) # CRC32 of uncompressed data [data1]
  237. self.assertEqual(crc32Bytes, '\xaf\xd7d\x83')
  238. isizeBytes = fRead.read(4)
  239. self.assertEqual(isizeBytes, struct.pack('<i', len(data1)))
  240. def test_with_open(self):
  241. # GzipFile supports the context management protocol
  242. with gzip.GzipFile(self.filename, "wb") as f:
  243. f.write(b"xxx")
  244. f = gzip.GzipFile(self.filename, "rb")
  245. f.close()
  246. try:
  247. with f:
  248. pass
  249. except ValueError:
  250. pass
  251. else:
  252. self.fail("__enter__ on a closed file didn't raise an exception")
  253. try:
  254. with gzip.GzipFile(self.filename, "wb") as f:
  255. 1 // 0
  256. except ZeroDivisionError:
  257. pass
  258. else:
  259. self.fail("1 // 0 didn't raise an exception")
  260. def test_zero_padded_file(self):
  261. with gzip.GzipFile(self.filename, "wb") as f:
  262. f.write(data1 * 50)
  263. # Pad the file with zeroes
  264. with open(self.filename, "ab") as f:
  265. f.write("\x00" * 50)
  266. with gzip.GzipFile(self.filename, "rb") as f:
  267. d = f.read()
  268. self.assertEqual(d, data1 * 50, "Incorrect data in file")
  269. def test_fileobj_from_fdopen(self):
  270. # Issue #13781: Creating a GzipFile using a fileobj from os.fdopen()
  271. # should not embed the fake filename "<fdopen>" in the output file.
  272. fd = os.open(self.filename, os.O_WRONLY | os.O_CREAT)
  273. with os.fdopen(fd, "wb") as f:
  274. with gzip.GzipFile(fileobj=f, mode="w") as g:
  275. self.assertEqual(g.name, "")
  276. def test_read_with_extra(self):
  277. # Gzip data with an extra field
  278. gzdata = (b'\x1f\x8b\x08\x04\xb2\x17cQ\x02\xff'
  279. b'\x05\x00Extra'
  280. b'\x0bI-.\x01\x002\xd1Mx\x04\x00\x00\x00')
  281. with gzip.GzipFile(fileobj=io.BytesIO(gzdata)) as f:
  282. self.assertEqual(f.read(), b'Test')
  283. def test_main(verbose=None):
  284. test_support.run_unittest(TestGzip)
# Allow this test module to be run directly as a script.
if __name__ == "__main__":
    test_main(verbose=True)