test_tarfile.py 66 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898
  1. import sys
  2. import os
  3. import shutil
  4. import StringIO
  5. from hashlib import md5
  6. import errno
  7. import unittest
  8. import tarfile
  9. from test import test_support
  10. from test import test_support as support
  11. # Check for our compression modules.
  12. try:
  13. import gzip
  14. gzip.GzipFile
  15. except (ImportError, AttributeError):
  16. gzip = None
  17. try:
  18. import bz2
  19. except ImportError:
  20. bz2 = None
  21. def md5sum(data):
  22. return md5(data).hexdigest()
  23. TEMPDIR = os.path.abspath(test_support.TESTFN)
  24. tarname = test_support.findfile("testtar.tar")
  25. gzipname = os.path.join(TEMPDIR, "testtar.tar.gz")
  26. bz2name = os.path.join(TEMPDIR, "testtar.tar.bz2")
  27. tmpname = os.path.join(TEMPDIR, "tmp.tar")
  28. md5_regtype = "65f477c818ad9e15f7feab0c6d37742f"
  29. md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6"
  30. class ReadTest(unittest.TestCase):
  31. tarname = tarname
  32. mode = "r:"
  33. def setUp(self):
  34. self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1")
  35. def tearDown(self):
  36. self.tar.close()
  37. class UstarReadTest(ReadTest):
  38. def test_fileobj_regular_file(self):
  39. tarinfo = self.tar.getmember("ustar/regtype")
  40. fobj = self.tar.extractfile(tarinfo)
  41. data = fobj.read()
  42. self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype),
  43. "regular file extraction failed")
  44. def test_fileobj_readlines(self):
  45. self.tar.extract("ustar/regtype", TEMPDIR)
  46. tarinfo = self.tar.getmember("ustar/regtype")
  47. fobj1 = open(os.path.join(TEMPDIR, "ustar/regtype"), "rU")
  48. with open(os.path.join(TEMPDIR, "ustar/regtype"), "rU") as fobj1:
  49. lines1 = fobj1.readlines()
  50. fobj2 = self.tar.extractfile(tarinfo)
  51. lines2 = fobj2.readlines()
  52. self.assertTrue(lines1 == lines2,
  53. "fileobj.readlines() failed")
  54. self.assertTrue(len(lines2) == 114,
  55. "fileobj.readlines() failed")
  56. self.assertTrue(lines2[83] ==
  57. "I will gladly admit that Python is not the fastest running scripting language.\n",
  58. "fileobj.readlines() failed")
  59. def test_fileobj_iter(self):
  60. self.tar.extract("ustar/regtype", TEMPDIR)
  61. tarinfo = self.tar.getmember("ustar/regtype")
  62. with open(os.path.join(TEMPDIR, "ustar/regtype"), "rU") as fobj1:
  63. lines1 = fobj1.readlines()
  64. fobj2 = self.tar.extractfile(tarinfo)
  65. lines2 = [line for line in fobj2]
  66. self.assertTrue(lines1 == lines2,
  67. "fileobj.__iter__() failed")
  68. def test_fileobj_seek(self):
  69. self.tar.extract("ustar/regtype", TEMPDIR)
  70. with open(os.path.join(TEMPDIR, "ustar/regtype"), "rb") as fobj:
  71. data = fobj.read()
  72. tarinfo = self.tar.getmember("ustar/regtype")
  73. fobj = self.tar.extractfile(tarinfo)
  74. text = fobj.read()
  75. fobj.seek(0)
  76. self.assertTrue(0 == fobj.tell(),
  77. "seek() to file's start failed")
  78. fobj.seek(2048, 0)
  79. self.assertTrue(2048 == fobj.tell(),
  80. "seek() to absolute position failed")
  81. fobj.seek(-1024, 1)
  82. self.assertTrue(1024 == fobj.tell(),
  83. "seek() to negative relative position failed")
  84. fobj.seek(1024, 1)
  85. self.assertTrue(2048 == fobj.tell(),
  86. "seek() to positive relative position failed")
  87. s = fobj.read(10)
  88. self.assertTrue(s == data[2048:2058],
  89. "read() after seek failed")
  90. fobj.seek(0, 2)
  91. self.assertTrue(tarinfo.size == fobj.tell(),
  92. "seek() to file's end failed")
  93. self.assertTrue(fobj.read() == "",
  94. "read() at file's end did not return empty string")
  95. fobj.seek(-tarinfo.size, 2)
  96. self.assertTrue(0 == fobj.tell(),
  97. "relative seek() to file's start failed")
  98. fobj.seek(512)
  99. s1 = fobj.readlines()
  100. fobj.seek(512)
  101. s2 = fobj.readlines()
  102. self.assertTrue(s1 == s2,
  103. "readlines() after seek failed")
  104. fobj.seek(0)
  105. self.assertTrue(len(fobj.readline()) == fobj.tell(),
  106. "tell() after readline() failed")
  107. fobj.seek(512)
  108. self.assertTrue(len(fobj.readline()) + 512 == fobj.tell(),
  109. "tell() after seek() and readline() failed")
  110. fobj.seek(0)
  111. line = fobj.readline()
  112. self.assertTrue(fobj.read() == data[len(line):],
  113. "read() after readline() failed")
  114. fobj.close()
  115. # Test if symbolic and hard links are resolved by extractfile(). The
  116. # test link members each point to a regular member whose data is
  117. # supposed to be exported.
  118. def _test_fileobj_link(self, lnktype, regtype):
  119. a = self.tar.extractfile(lnktype)
  120. b = self.tar.extractfile(regtype)
  121. self.assertEqual(a.name, b.name)
  122. def test_fileobj_link1(self):
  123. self._test_fileobj_link("ustar/lnktype", "ustar/regtype")
  124. def test_fileobj_link2(self):
  125. self._test_fileobj_link("./ustar/linktest2/lnktype", "ustar/linktest1/regtype")
  126. def test_fileobj_symlink1(self):
  127. self._test_fileobj_link("ustar/symtype", "ustar/regtype")
  128. def test_fileobj_symlink2(self):
  129. self._test_fileobj_link("./ustar/linktest2/symtype", "ustar/linktest1/regtype")
  130. def test_issue14160(self):
  131. self._test_fileobj_link("symtype2", "ustar/regtype")
  132. class ListTest(ReadTest, unittest.TestCase):
  133. # Override setUp to use default encoding (UTF-8)
  134. def setUp(self):
  135. self.tar = tarfile.open(self.tarname, mode=self.mode)
  136. def test_list(self):
  137. with test_support.captured_stdout() as t:
  138. self.tar.list(verbose=False)
  139. out = t.getvalue()
  140. self.assertIn('ustar/conttype', out)
  141. self.assertIn('ustar/regtype', out)
  142. self.assertIn('ustar/lnktype', out)
  143. self.assertIn('ustar' + ('/12345' * 40) + '67/longname', out)
  144. self.assertIn('./ustar/linktest2/symtype', out)
  145. self.assertIn('./ustar/linktest2/lnktype', out)
  146. # Make sure it puts trailing slash for directory
  147. self.assertIn('ustar/dirtype/', out)
  148. self.assertIn('ustar/dirtype-with-size/', out)
  149. # Make sure it is able to print non-ASCII characters
  150. self.assertIn('ustar/umlauts-'
  151. '\xc4\xd6\xdc\xe4\xf6\xfc\xdf', out)
  152. self.assertIn('misc/regtype-hpux-signed-chksum-'
  153. '\xc4\xd6\xdc\xe4\xf6\xfc\xdf', out)
  154. self.assertIn('misc/regtype-old-v7-signed-chksum-'
  155. '\xc4\xd6\xdc\xe4\xf6\xfc\xdf', out)
  156. # Make sure it prints files separated by one newline without any
  157. # 'ls -l'-like accessories if verbose flag is not being used
  158. # ...
  159. # ustar/conttype
  160. # ustar/regtype
  161. # ...
  162. self.assertRegexpMatches(out, r'ustar/conttype ?\r?\n'
  163. r'ustar/regtype ?\r?\n')
  164. # Make sure it does not print the source of link without verbose flag
  165. self.assertNotIn('link to', out)
  166. self.assertNotIn('->', out)
  167. def test_list_verbose(self):
  168. with test_support.captured_stdout() as t:
  169. self.tar.list(verbose=True)
  170. out = t.getvalue()
  171. # Make sure it prints files separated by one newline with 'ls -l'-like
  172. # accessories if verbose flag is being used
  173. # ...
  174. # ?rw-r--r-- tarfile/tarfile 7011 2003-01-06 07:19:43 ustar/conttype
  175. # ?rw-r--r-- tarfile/tarfile 7011 2003-01-06 07:19:43 ustar/regtype
  176. # ...
  177. self.assertRegexpMatches(out, (r'-rw-r--r-- tarfile/tarfile\s+7011 '
  178. r'\d{4}-\d\d-\d\d\s+\d\d:\d\d:\d\d '
  179. r'ustar/\w+type ?\r?\n') * 2)
  180. # Make sure it prints the source of link with verbose flag
  181. self.assertIn('ustar/symtype -> regtype', out)
  182. self.assertIn('./ustar/linktest2/symtype -> ../linktest1/regtype', out)
  183. self.assertIn('./ustar/linktest2/lnktype link to '
  184. './ustar/linktest1/regtype', out)
  185. self.assertIn('gnu' + ('/123' * 125) + '/longlink link to gnu' +
  186. ('/123' * 125) + '/longname', out)
  187. self.assertIn('pax' + ('/123' * 125) + '/longlink link to pax' +
  188. ('/123' * 125) + '/longname', out)
  189. class GzipListTest(ListTest):
  190. tarname = gzipname
  191. mode = "r:gz"
  192. taropen = tarfile.TarFile.gzopen
  193. class Bz2ListTest(ListTest):
  194. tarname = bz2name
  195. mode = "r:bz2"
  196. taropen = tarfile.TarFile.bz2open
  197. class CommonReadTest(ReadTest):
  198. def test_empty_tarfile(self):
  199. # Test for issue6123: Allow opening empty archives.
  200. # This test checks if tarfile.open() is able to open an empty tar
  201. # archive successfully. Note that an empty tar archive is not the
  202. # same as an empty file!
  203. with tarfile.open(tmpname, self.mode.replace("r", "w")):
  204. pass
  205. try:
  206. tar = tarfile.open(tmpname, self.mode)
  207. tar.getnames()
  208. except tarfile.ReadError:
  209. self.fail("tarfile.open() failed on empty archive")
  210. else:
  211. self.assertListEqual(tar.getmembers(), [])
  212. finally:
  213. tar.close()
  214. def test_null_tarfile(self):
  215. # Test for issue6123: Allow opening empty archives.
  216. # This test guarantees that tarfile.open() does not treat an empty
  217. # file as an empty tar archive.
  218. with open(tmpname, "wb"):
  219. pass
  220. self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, self.mode)
  221. self.assertRaises(tarfile.ReadError, tarfile.open, tmpname)
  222. def test_non_existent_tarfile(self):
  223. # Test for issue11513: prevent non-existent gzipped tarfiles raising
  224. # multiple exceptions.
  225. exctype = OSError if '|' in self.mode else IOError
  226. with self.assertRaisesRegexp(exctype, "xxx") as ex:
  227. tarfile.open("xxx", self.mode)
  228. self.assertEqual(ex.exception.errno, errno.ENOENT)
  229. def test_ignore_zeros(self):
  230. # Test TarFile's ignore_zeros option.
  231. if self.mode.endswith(":gz"):
  232. _open = gzip.GzipFile
  233. elif self.mode.endswith(":bz2"):
  234. _open = bz2.BZ2File
  235. else:
  236. _open = open
  237. for char in ('\0', 'a'):
  238. # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a')
  239. # are ignored correctly.
  240. with _open(tmpname, "wb") as fobj:
  241. fobj.write(char * 1024)
  242. fobj.write(tarfile.TarInfo("foo").tobuf())
  243. tar = tarfile.open(tmpname, mode="r", ignore_zeros=True)
  244. try:
  245. self.assertListEqual(tar.getnames(), ["foo"],
  246. "ignore_zeros=True should have skipped the %r-blocks" % char)
  247. finally:
  248. tar.close()
  249. def test_premature_end_of_archive(self):
  250. for size in (512, 600, 1024, 1200):
  251. with tarfile.open(tmpname, "w:") as tar:
  252. t = tarfile.TarInfo("foo")
  253. t.size = 1024
  254. tar.addfile(t, StringIO.StringIO("a" * 1024))
  255. with open(tmpname, "r+b") as fobj:
  256. fobj.truncate(size)
  257. with tarfile.open(tmpname) as tar:
  258. with self.assertRaisesRegexp(tarfile.ReadError, "unexpected end of data"):
  259. for t in tar:
  260. pass
  261. with tarfile.open(tmpname) as tar:
  262. t = tar.next()
  263. with self.assertRaisesRegexp(tarfile.ReadError, "unexpected end of data"):
  264. tar.extract(t, TEMPDIR)
  265. with self.assertRaisesRegexp(tarfile.ReadError, "unexpected end of data"):
  266. tar.extractfile(t).read()
  267. class MiscReadTest(CommonReadTest):
  268. taropen = tarfile.TarFile.taropen
  269. def test_no_name_argument(self):
  270. with open(self.tarname, "rb") as fobj:
  271. tar = tarfile.open(fileobj=fobj, mode=self.mode)
  272. self.assertEqual(tar.name, os.path.abspath(fobj.name))
  273. def test_no_name_attribute(self):
  274. with open(self.tarname, "rb") as fobj:
  275. data = fobj.read()
  276. fobj = StringIO.StringIO(data)
  277. self.assertRaises(AttributeError, getattr, fobj, "name")
  278. tar = tarfile.open(fileobj=fobj, mode=self.mode)
  279. self.assertEqual(tar.name, None)
  280. def test_empty_name_attribute(self):
  281. with open(self.tarname, "rb") as fobj:
  282. data = fobj.read()
  283. fobj = StringIO.StringIO(data)
  284. fobj.name = ""
  285. tar = tarfile.open(fileobj=fobj, mode=self.mode)
  286. self.assertEqual(tar.name, None)
  287. def test_illegal_mode_arg(self):
  288. with open(tmpname, 'wb'):
  289. pass
  290. self.addCleanup(os.unlink, tmpname)
  291. with self.assertRaisesRegexp(ValueError, 'mode must be '):
  292. tar = self.taropen(tmpname, 'q')
  293. with self.assertRaisesRegexp(ValueError, 'mode must be '):
  294. tar = self.taropen(tmpname, 'rw')
  295. with self.assertRaisesRegexp(ValueError, 'mode must be '):
  296. tar = self.taropen(tmpname, '')
  297. def test_fileobj_with_offset(self):
  298. # Skip the first member and store values from the second member
  299. # of the testtar.
  300. tar = tarfile.open(self.tarname, mode=self.mode)
  301. try:
  302. tar.next()
  303. t = tar.next()
  304. name = t.name
  305. offset = t.offset
  306. data = tar.extractfile(t).read()
  307. finally:
  308. tar.close()
  309. # Open the testtar and seek to the offset of the second member.
  310. if self.mode.endswith(":gz"):
  311. _open = gzip.GzipFile
  312. elif self.mode.endswith(":bz2"):
  313. _open = bz2.BZ2File
  314. else:
  315. _open = open
  316. fobj = _open(self.tarname, "rb")
  317. try:
  318. fobj.seek(offset)
  319. # Test if the tarfile starts with the second member.
  320. tar = tar.open(self.tarname, mode="r:", fileobj=fobj)
  321. t = tar.next()
  322. self.assertEqual(t.name, name)
  323. # Read to the end of fileobj and test if seeking back to the
  324. # beginning works.
  325. tar.getmembers()
  326. self.assertEqual(tar.extractfile(t).read(), data,
  327. "seek back did not work")
  328. tar.close()
  329. finally:
  330. fobj.close()
  331. def test_fail_comp(self):
  332. # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file.
  333. if self.mode == "r:":
  334. self.skipTest('needs a gz or bz2 mode')
  335. self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode)
  336. with open(tarname, "rb") as fobj:
  337. self.assertRaises(tarfile.ReadError, tarfile.open,
  338. fileobj=fobj, mode=self.mode)
  339. def test_v7_dirtype(self):
  340. # Test old style dirtype member (bug #1336623):
  341. # Old V7 tars create directory members using an AREGTYPE
  342. # header with a "/" appended to the filename field.
  343. tarinfo = self.tar.getmember("misc/dirtype-old-v7")
  344. self.assertTrue(tarinfo.type == tarfile.DIRTYPE,
  345. "v7 dirtype failed")
  346. def test_xstar_type(self):
  347. # The xstar format stores extra atime and ctime fields inside the
  348. # space reserved for the prefix field. The prefix field must be
  349. # ignored in this case, otherwise it will mess up the name.
  350. try:
  351. self.tar.getmember("misc/regtype-xstar")
  352. except KeyError:
  353. self.fail("failed to find misc/regtype-xstar (mangled prefix?)")
  354. def test_check_members(self):
  355. for tarinfo in self.tar:
  356. self.assertTrue(int(tarinfo.mtime) == 07606136617,
  357. "wrong mtime for %s" % tarinfo.name)
  358. if not tarinfo.name.startswith("ustar/"):
  359. continue
  360. self.assertTrue(tarinfo.uname == "tarfile",
  361. "wrong uname for %s" % tarinfo.name)
  362. def test_find_members(self):
  363. self.assertTrue(self.tar.getmembers()[-1].name == "misc/eof",
  364. "could not find all members")
  365. def test_extract_hardlink(self):
  366. # Test hardlink extraction (e.g. bug #857297).
  367. with tarfile.open(tarname, errorlevel=1, encoding="iso8859-1") as tar:
  368. tar.extract("ustar/regtype", TEMPDIR)
  369. self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/regtype"))
  370. tar.extract("ustar/lnktype", TEMPDIR)
  371. self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/lnktype"))
  372. with open(os.path.join(TEMPDIR, "ustar/lnktype"), "rb") as f:
  373. data = f.read()
  374. self.assertEqual(md5sum(data), md5_regtype)
  375. tar.extract("ustar/symtype", TEMPDIR)
  376. self.addCleanup(os.remove, os.path.join(TEMPDIR, "ustar/symtype"))
  377. with open(os.path.join(TEMPDIR, "ustar/symtype"), "rb") as f:
  378. data = f.read()
  379. self.assertEqual(md5sum(data), md5_regtype)
  380. def test_extractall(self):
  381. # Test if extractall() correctly restores directory permissions
  382. # and times (see issue1735).
  383. tar = tarfile.open(tarname, encoding="iso8859-1")
  384. try:
  385. directories = [t for t in tar if t.isdir()]
  386. tar.extractall(TEMPDIR, directories)
  387. for tarinfo in directories:
  388. path = os.path.join(TEMPDIR, tarinfo.name)
  389. if sys.platform != "win32":
  390. # Win32 has no support for fine grained permissions.
  391. self.assertEqual(tarinfo.mode & 0777, os.stat(path).st_mode & 0777)
  392. self.assertEqual(tarinfo.mtime, os.path.getmtime(path))
  393. finally:
  394. tar.close()
  395. def test_init_close_fobj(self):
  396. # Issue #7341: Close the internal file object in the TarFile
  397. # constructor in case of an error. For the test we rely on
  398. # the fact that opening an empty file raises a ReadError.
  399. empty = os.path.join(TEMPDIR, "empty")
  400. with open(empty, "wb") as fobj:
  401. fobj.write("")
  402. try:
  403. tar = object.__new__(tarfile.TarFile)
  404. try:
  405. tar.__init__(empty)
  406. except tarfile.ReadError:
  407. self.assertTrue(tar.fileobj.closed)
  408. else:
  409. self.fail("ReadError not raised")
  410. finally:
  411. support.unlink(empty)
  412. def test_parallel_iteration(self):
  413. # Issue #16601: Restarting iteration over tarfile continued
  414. # from where it left off.
  415. with tarfile.open(self.tarname) as tar:
  416. for m1, m2 in zip(tar, tar):
  417. self.assertEqual(m1.offset, m2.offset)
  418. self.assertEqual(m1.name, m2.name)
  419. class StreamReadTest(CommonReadTest):
  420. mode="r|"
  421. def test_fileobj_regular_file(self):
  422. tarinfo = self.tar.next() # get "regtype" (can't use getmember)
  423. fobj = self.tar.extractfile(tarinfo)
  424. data = fobj.read()
  425. self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype),
  426. "regular file extraction failed")
  427. def test_provoke_stream_error(self):
  428. tarinfos = self.tar.getmembers()
  429. f = self.tar.extractfile(tarinfos[0]) # read the first member
  430. self.assertRaises(tarfile.StreamError, f.read)
  431. def test_compare_members(self):
  432. tar1 = tarfile.open(tarname, encoding="iso8859-1")
  433. try:
  434. tar2 = self.tar
  435. while True:
  436. t1 = tar1.next()
  437. t2 = tar2.next()
  438. if t1 is None:
  439. break
  440. self.assertTrue(t2 is not None, "stream.next() failed.")
  441. if t2.islnk() or t2.issym():
  442. self.assertRaises(tarfile.StreamError, tar2.extractfile, t2)
  443. continue
  444. v1 = tar1.extractfile(t1)
  445. v2 = tar2.extractfile(t2)
  446. if v1 is None:
  447. continue
  448. self.assertTrue(v2 is not None, "stream.extractfile() failed")
  449. self.assertTrue(v1.read() == v2.read(), "stream extraction failed")
  450. finally:
  451. tar1.close()
  452. class DetectReadTest(unittest.TestCase):
  453. def _testfunc_file(self, name, mode):
  454. try:
  455. tar = tarfile.open(name, mode)
  456. except tarfile.ReadError:
  457. self.fail()
  458. else:
  459. tar.close()
  460. def _testfunc_fileobj(self, name, mode):
  461. try:
  462. tar = tarfile.open(name, mode, fileobj=open(name, "rb"))
  463. except tarfile.ReadError:
  464. self.fail()
  465. else:
  466. tar.close()
  467. def _test_modes(self, testfunc):
  468. testfunc(tarname, "r")
  469. testfunc(tarname, "r:")
  470. testfunc(tarname, "r:*")
  471. testfunc(tarname, "r|")
  472. testfunc(tarname, "r|*")
  473. if gzip:
  474. self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:gz")
  475. self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|gz")
  476. self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r:")
  477. self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r|")
  478. testfunc(gzipname, "r")
  479. testfunc(gzipname, "r:*")
  480. testfunc(gzipname, "r:gz")
  481. testfunc(gzipname, "r|*")
  482. testfunc(gzipname, "r|gz")
  483. if bz2:
  484. self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:bz2")
  485. self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|bz2")
  486. self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r:")
  487. self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r|")
  488. testfunc(bz2name, "r")
  489. testfunc(bz2name, "r:*")
  490. testfunc(bz2name, "r:bz2")
  491. testfunc(bz2name, "r|*")
  492. testfunc(bz2name, "r|bz2")
  493. def test_detect_file(self):
  494. self._test_modes(self._testfunc_file)
  495. def test_detect_fileobj(self):
  496. self._test_modes(self._testfunc_fileobj)
  497. @unittest.skipUnless(bz2, 'requires bz2')
  498. def test_detect_stream_bz2(self):
  499. # Originally, tarfile's stream detection looked for the string
  500. # "BZh91" at the start of the file. This is incorrect because
  501. # the '9' represents the blocksize (900kB). If the file was
  502. # compressed using another blocksize autodetection fails.
  503. with open(tarname, "rb") as fobj:
  504. data = fobj.read()
  505. # Compress with blocksize 100kB, the file starts with "BZh11".
  506. with bz2.BZ2File(tmpname, "wb", compresslevel=1) as fobj:
  507. fobj.write(data)
  508. self._testfunc_file(tmpname, "r|*")
  509. class MemberReadTest(ReadTest):
  510. def _test_member(self, tarinfo, chksum=None, **kwargs):
  511. if chksum is not None:
  512. self.assertTrue(md5sum(self.tar.extractfile(tarinfo).read()) == chksum,
  513. "wrong md5sum for %s" % tarinfo.name)
  514. kwargs["mtime"] = 07606136617
  515. kwargs["uid"] = 1000
  516. kwargs["gid"] = 100
  517. if "old-v7" not in tarinfo.name:
  518. # V7 tar can't handle alphabetic owners.
  519. kwargs["uname"] = "tarfile"
  520. kwargs["gname"] = "tarfile"
  521. for k, v in kwargs.iteritems():
  522. self.assertTrue(getattr(tarinfo, k) == v,
  523. "wrong value in %s field of %s" % (k, tarinfo.name))
  524. def test_find_regtype(self):
  525. tarinfo = self.tar.getmember("ustar/regtype")
  526. self._test_member(tarinfo, size=7011, chksum=md5_regtype)
  527. def test_find_conttype(self):
  528. tarinfo = self.tar.getmember("ustar/conttype")
  529. self._test_member(tarinfo, size=7011, chksum=md5_regtype)
  530. def test_find_dirtype(self):
  531. tarinfo = self.tar.getmember("ustar/dirtype")
  532. self._test_member(tarinfo, size=0)
  533. def test_find_dirtype_with_size(self):
  534. tarinfo = self.tar.getmember("ustar/dirtype-with-size")
  535. self._test_member(tarinfo, size=255)
  536. def test_find_lnktype(self):
  537. tarinfo = self.tar.getmember("ustar/lnktype")
  538. self._test_member(tarinfo, size=0, linkname="ustar/regtype")
  539. def test_find_symtype(self):
  540. tarinfo = self.tar.getmember("ustar/symtype")
  541. self._test_member(tarinfo, size=0, linkname="regtype")
  542. def test_find_blktype(self):
  543. tarinfo = self.tar.getmember("ustar/blktype")
  544. self._test_member(tarinfo, size=0, devmajor=3, devminor=0)
  545. def test_find_chrtype(self):
  546. tarinfo = self.tar.getmember("ustar/chrtype")
  547. self._test_member(tarinfo, size=0, devmajor=1, devminor=3)
  548. def test_find_fifotype(self):
  549. tarinfo = self.tar.getmember("ustar/fifotype")
  550. self._test_member(tarinfo, size=0)
  551. def test_find_sparse(self):
  552. tarinfo = self.tar.getmember("ustar/sparse")
  553. self._test_member(tarinfo, size=86016, chksum=md5_sparse)
  554. def test_find_umlauts(self):
  555. tarinfo = self.tar.getmember("ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
  556. self._test_member(tarinfo, size=7011, chksum=md5_regtype)
  557. def test_find_ustar_longname(self):
  558. name = "ustar/" + "12345/" * 39 + "1234567/longname"
  559. self.assertIn(name, self.tar.getnames())
  560. def test_find_regtype_oldv7(self):
  561. tarinfo = self.tar.getmember("misc/regtype-old-v7")
  562. self._test_member(tarinfo, size=7011, chksum=md5_regtype)
  563. def test_find_pax_umlauts(self):
  564. self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1")
  565. tarinfo = self.tar.getmember("pax/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
  566. self._test_member(tarinfo, size=7011, chksum=md5_regtype)
  567. class LongnameTest(ReadTest):
  568. def test_read_longname(self):
  569. # Test reading of longname (bug #1471427).
  570. longname = self.subdir + "/" + "123/" * 125 + "longname"
  571. try:
  572. tarinfo = self.tar.getmember(longname)
  573. except KeyError:
  574. self.fail("longname not found")
  575. self.assertTrue(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype")
  576. def test_read_longlink(self):
  577. longname = self.subdir + "/" + "123/" * 125 + "longname"
  578. longlink = self.subdir + "/" + "123/" * 125 + "longlink"
  579. try:
  580. tarinfo = self.tar.getmember(longlink)
  581. except KeyError:
  582. self.fail("longlink not found")
  583. self.assertTrue(tarinfo.linkname == longname, "linkname wrong")
  584. def test_truncated_longname(self):
  585. longname = self.subdir + "/" + "123/" * 125 + "longname"
  586. tarinfo = self.tar.getmember(longname)
  587. offset = tarinfo.offset
  588. self.tar.fileobj.seek(offset)
  589. fobj = StringIO.StringIO(self.tar.fileobj.read(3 * 512))
  590. self.assertRaises(tarfile.ReadError, tarfile.open, name="foo.tar", fileobj=fobj)
  591. def test_header_offset(self):
  592. # Test if the start offset of the TarInfo object includes
  593. # the preceding extended header.
  594. longname = self.subdir + "/" + "123/" * 125 + "longname"
  595. offset = self.tar.getmember(longname).offset
  596. fobj = open(tarname)
  597. fobj.seek(offset)
  598. tarinfo = tarfile.TarInfo.frombuf(fobj.read(512))
  599. self.assertEqual(tarinfo.type, self.longnametype)
  600. class GNUReadTest(LongnameTest):
  601. subdir = "gnu"
  602. longnametype = tarfile.GNUTYPE_LONGNAME
  603. def test_sparse_file(self):
  604. tarinfo1 = self.tar.getmember("ustar/sparse")
  605. fobj1 = self.tar.extractfile(tarinfo1)
  606. tarinfo2 = self.tar.getmember("gnu/sparse")
  607. fobj2 = self.tar.extractfile(tarinfo2)
  608. self.assertTrue(fobj1.read() == fobj2.read(),
  609. "sparse file extraction failed")
  610. class PaxReadTest(LongnameTest):
  611. subdir = "pax"
  612. longnametype = tarfile.XHDTYPE
  613. def test_pax_global_headers(self):
  614. tar = tarfile.open(tarname, encoding="iso8859-1")
  615. try:
  616. tarinfo = tar.getmember("pax/regtype1")
  617. self.assertEqual(tarinfo.uname, "foo")
  618. self.assertEqual(tarinfo.gname, "bar")
  619. self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
  620. tarinfo = tar.getmember("pax/regtype2")
  621. self.assertEqual(tarinfo.uname, "")
  622. self.assertEqual(tarinfo.gname, "bar")
  623. self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
  624. tarinfo = tar.getmember("pax/regtype3")
  625. self.assertEqual(tarinfo.uname, "tarfile")
  626. self.assertEqual(tarinfo.gname, "tarfile")
  627. self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), u"\xc4\xd6\xdc\xe4\xf6\xfc\xdf")
  628. finally:
  629. tar.close()
  630. def test_pax_number_fields(self):
  631. # All following number fields are read from the pax header.
  632. tar = tarfile.open(tarname, encoding="iso8859-1")
  633. try:
  634. tarinfo = tar.getmember("pax/regtype4")
  635. self.assertEqual(tarinfo.size, 7011)
  636. self.assertEqual(tarinfo.uid, 123)
  637. self.assertEqual(tarinfo.gid, 123)
  638. self.assertEqual(tarinfo.mtime, 1041808783.0)
  639. self.assertEqual(type(tarinfo.mtime), float)
  640. self.assertEqual(float(tarinfo.pax_headers["atime"]), 1041808783.0)
  641. self.assertEqual(float(tarinfo.pax_headers["ctime"]), 1041808783.0)
  642. finally:
  643. tar.close()
  644. class WriteTestBase(unittest.TestCase):
  645. # Put all write tests in here that are supposed to be tested
  646. # in all possible mode combinations.
  647. def test_fileobj_no_close(self):
  648. fobj = StringIO.StringIO()
  649. tar = tarfile.open(fileobj=fobj, mode=self.mode)
  650. tar.addfile(tarfile.TarInfo("foo"))
  651. tar.close()
  652. self.assertTrue(fobj.closed is False, "external fileobjs must never closed")
  653. # Issue #20238: Incomplete gzip output with mode="w:gz"
  654. data = fobj.getvalue()
  655. del tar
  656. test_support.gc_collect()
  657. self.assertFalse(fobj.closed)
  658. self.assertEqual(data, fobj.getvalue())
  659. class WriteTest(WriteTestBase):
  660. mode = "w:"
  661. def test_100_char_name(self):
  662. # The name field in a tar header stores strings of at most 100 chars.
  663. # If a string is shorter than 100 chars it has to be padded with '\0',
  664. # which implies that a string of exactly 100 chars is stored without
  665. # a trailing '\0'.
  666. name = "0123456789" * 10
  667. tar = tarfile.open(tmpname, self.mode)
  668. try:
  669. t = tarfile.TarInfo(name)
  670. tar.addfile(t)
  671. finally:
  672. tar.close()
  673. tar = tarfile.open(tmpname)
  674. try:
  675. self.assertTrue(tar.getnames()[0] == name,
  676. "failed to store 100 char filename")
  677. finally:
  678. tar.close()
  679. def test_tar_size(self):
  680. # Test for bug #1013882.
  681. tar = tarfile.open(tmpname, self.mode)
  682. try:
  683. path = os.path.join(TEMPDIR, "file")
  684. with open(path, "wb") as fobj:
  685. fobj.write("aaa")
  686. tar.add(path)
  687. finally:
  688. tar.close()
  689. self.assertTrue(os.path.getsize(tmpname) > 0,
  690. "tarfile is empty")
  691. # The test_*_size tests test for bug #1167128.
  692. def test_file_size(self):
  693. tar = tarfile.open(tmpname, self.mode)
  694. try:
  695. path = os.path.join(TEMPDIR, "file")
  696. with open(path, "wb"):
  697. pass
  698. tarinfo = tar.gettarinfo(path)
  699. self.assertEqual(tarinfo.size, 0)
  700. with open(path, "wb") as fobj:
  701. fobj.write("aaa")
  702. tarinfo = tar.gettarinfo(path)
  703. self.assertEqual(tarinfo.size, 3)
  704. finally:
  705. tar.close()
  706. def test_directory_size(self):
  707. path = os.path.join(TEMPDIR, "directory")
  708. os.mkdir(path)
  709. try:
  710. tar = tarfile.open(tmpname, self.mode)
  711. try:
  712. tarinfo = tar.gettarinfo(path)
  713. self.assertEqual(tarinfo.size, 0)
  714. finally:
  715. tar.close()
  716. finally:
  717. os.rmdir(path)
  718. def test_link_size(self):
  719. if hasattr(os, "link"):
  720. link = os.path.join(TEMPDIR, "link")
  721. target = os.path.join(TEMPDIR, "link_target")
  722. with open(target, "wb") as fobj:
  723. fobj.write("aaa")
  724. os.link(target, link)
  725. try:
  726. tar = tarfile.open(tmpname, self.mode)
  727. try:
  728. # Record the link target in the inodes list.
  729. tar.gettarinfo(target)
  730. tarinfo = tar.gettarinfo(link)
  731. self.assertEqual(tarinfo.size, 0)
  732. finally:
  733. tar.close()
  734. finally:
  735. os.remove(target)
  736. os.remove(link)
  737. def test_symlink_size(self):
  738. if hasattr(os, "symlink"):
  739. path = os.path.join(TEMPDIR, "symlink")
  740. os.symlink("link_target", path)
  741. try:
  742. tar = tarfile.open(tmpname, self.mode)
  743. try:
  744. tarinfo = tar.gettarinfo(path)
  745. self.assertEqual(tarinfo.size, 0)
  746. finally:
  747. tar.close()
  748. finally:
  749. os.remove(path)
  750. def test_add_self(self):
  751. # Test for #1257255.
  752. dstname = os.path.abspath(tmpname)
  753. tar = tarfile.open(tmpname, self.mode)
  754. try:
  755. self.assertTrue(tar.name == dstname, "archive name must be absolute")
  756. tar.add(dstname)
  757. self.assertTrue(tar.getnames() == [], "added the archive to itself")
  758. cwd = os.getcwd()
  759. os.chdir(TEMPDIR)
  760. tar.add(dstname)
  761. os.chdir(cwd)
  762. self.assertTrue(tar.getnames() == [], "added the archive to itself")
  763. finally:
  764. tar.close()
  765. def test_exclude(self):
  766. tempdir = os.path.join(TEMPDIR, "exclude")
  767. os.mkdir(tempdir)
  768. try:
  769. for name in ("foo", "bar", "baz"):
  770. name = os.path.join(tempdir, name)
  771. open(name, "wb").close()
  772. exclude = os.path.isfile
  773. tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
  774. try:
  775. with test_support.check_warnings(("use the filter argument",
  776. DeprecationWarning)):
  777. tar.add(tempdir, arcname="empty_dir", exclude=exclude)
  778. finally:
  779. tar.close()
  780. tar = tarfile.open(tmpname, "r")
  781. try:
  782. self.assertEqual(len(tar.getmembers()), 1)
  783. self.assertEqual(tar.getnames()[0], "empty_dir")
  784. finally:
  785. tar.close()
  786. finally:
  787. shutil.rmtree(tempdir)
  788. def test_filter(self):
  789. tempdir = os.path.join(TEMPDIR, "filter")
  790. os.mkdir(tempdir)
  791. try:
  792. for name in ("foo", "bar", "baz"):
  793. name = os.path.join(tempdir, name)
  794. open(name, "wb").close()
  795. def filter(tarinfo):
  796. if os.path.basename(tarinfo.name) == "bar":
  797. return
  798. tarinfo.uid = 123
  799. tarinfo.uname = "foo"
  800. return tarinfo
  801. tar = tarfile.open(tmpname, self.mode, encoding="iso8859-1")
  802. try:
  803. tar.add(tempdir, arcname="empty_dir", filter=filter)
  804. finally:
  805. tar.close()
  806. tar = tarfile.open(tmpname, "r")
  807. try:
  808. for tarinfo in tar:
  809. self.assertEqual(tarinfo.uid, 123)
  810. self.assertEqual(tarinfo.uname, "foo")
  811. self.assertEqual(len(tar.getmembers()), 3)
  812. finally:
  813. tar.close()
  814. finally:
  815. shutil.rmtree(tempdir)
  816. # Guarantee that stored pathnames are not modified. Don't
  817. # remove ./ or ../ or double slashes. Still make absolute
  818. # pathnames relative.
  819. # For details see bug #6054.
  820. def _test_pathname(self, path, cmp_path=None, dir=False):
  821. # Create a tarfile with an empty member named path
  822. # and compare the stored name with the original.
  823. foo = os.path.join(TEMPDIR, "foo")
  824. if not dir:
  825. open(foo, "w").close()
  826. else:
  827. os.mkdir(foo)
  828. tar = tarfile.open(tmpname, self.mode)
  829. try:
  830. tar.add(foo, arcname=path)
  831. finally:
  832. tar.close()
  833. tar = tarfile.open(tmpname, "r")
  834. try:
  835. t = tar.next()
  836. finally:
  837. tar.close()
  838. if not dir:
  839. os.remove(foo)
  840. else:
  841. os.rmdir(foo)
  842. self.assertEqual(t.name, cmp_path or path.replace(os.sep, "/"))
  843. def test_pathnames(self):
  844. self._test_pathname("foo")
  845. self._test_pathname(os.path.join("foo", ".", "bar"))
  846. self._test_pathname(os.path.join("foo", "..", "bar"))
  847. self._test_pathname(os.path.join(".", "foo"))
  848. self._test_pathname(os.path.join(".", "foo", "."))
  849. self._test_pathname(os.path.join(".", "foo", ".", "bar"))
  850. self._test_pathname(os.path.join(".", "foo", "..", "bar"))
  851. self._test_pathname(os.path.join(".", "foo", "..", "bar"))
  852. self._test_pathname(os.path.join("..", "foo"))
  853. self._test_pathname(os.path.join("..", "foo", ".."))
  854. self._test_pathname(os.path.join("..", "foo", ".", "bar"))
  855. self._test_pathname(os.path.join("..", "foo", "..", "bar"))
  856. self._test_pathname("foo" + os.sep + os.sep + "bar")
  857. self._test_pathname("foo" + os.sep + os.sep, "foo", dir=True)
  858. def test_abs_pathnames(self):
  859. if sys.platform == "win32":
  860. self._test_pathname("C:\\foo", "foo")
  861. else:
  862. self._test_pathname("/foo", "foo")
  863. self._test_pathname("///foo", "foo")
  864. def test_cwd(self):
  865. # Test adding the current working directory.
  866. with support.change_cwd(TEMPDIR):
  867. tar = tarfile.open(tmpname, self.mode)
  868. try:
  869. tar.add(".")
  870. finally:
  871. tar.close()
  872. tar = tarfile.open(tmpname, "r")
  873. try:
  874. for t in tar:
  875. self.assertTrue(t.name == "." or t.name.startswith("./"))
  876. finally:
  877. tar.close()
  878. @unittest.skipUnless(hasattr(os, 'symlink'), "needs os.symlink")
  879. def test_extractall_symlinks(self):
  880. # Test if extractall works properly when tarfile contains symlinks
  881. tempdir = os.path.join(TEMPDIR, "testsymlinks")
  882. temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
  883. os.mkdir(tempdir)
  884. try:
  885. source_file = os.path.join(tempdir,'source')
  886. target_file = os.path.join(tempdir,'symlink')
  887. with open(source_file,'w') as f:
  888. f.write('something\n')
  889. os.symlink(source_file, target_file)
  890. tar = tarfile.open(temparchive,'w')
  891. tar.add(source_file, arcname=os.path.basename(source_file))
  892. tar.add(target_file, arcname=os.path.basename(target_file))
  893. tar.close()
  894. # Let's extract it to the location which contains the symlink
  895. tar = tarfile.open(temparchive,'r')
  896. # this should not raise OSError: [Errno 17] File exists
  897. try:
  898. tar.extractall(path=tempdir)
  899. except OSError:
  900. self.fail("extractall failed with symlinked files")
  901. finally:
  902. tar.close()
  903. finally:
  904. os.unlink(temparchive)
  905. shutil.rmtree(tempdir)
  906. @unittest.skipUnless(hasattr(os, 'symlink'), "needs os.symlink")
  907. def test_extractall_broken_symlinks(self):
  908. # Test if extractall works properly when tarfile contains broken
  909. # symlinks
  910. tempdir = os.path.join(TEMPDIR, "testsymlinks")
  911. temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
  912. os.mkdir(tempdir)
  913. try:
  914. source_file = os.path.join(tempdir,'source')
  915. target_file = os.path.join(tempdir,'symlink')
  916. with open(source_file,'w') as f:
  917. f.write('something\n')
  918. os.symlink(source_file, target_file)
  919. tar = tarfile.open(temparchive,'w')
  920. tar.add(target_file, arcname=os.path.basename(target_file))
  921. tar.close()
  922. # remove the real file
  923. os.unlink(source_file)
  924. # Let's extract it to the location which contains the symlink
  925. tar = tarfile.open(temparchive,'r')
  926. # this should not raise OSError: [Errno 17] File exists
  927. try:
  928. tar.extractall(path=tempdir)
  929. except OSError:
  930. self.fail("extractall failed with broken symlinked files")
  931. finally:
  932. tar.close()
  933. finally:
  934. os.unlink(temparchive)
  935. shutil.rmtree(tempdir)
  936. @unittest.skipUnless(hasattr(os, 'link'), "needs os.link")
  937. def test_extractall_hardlinks(self):
  938. # Test if extractall works properly when tarfile contains symlinks
  939. tempdir = os.path.join(TEMPDIR, "testsymlinks")
  940. temparchive = os.path.join(TEMPDIR, "testsymlinks.tar")
  941. os.mkdir(tempdir)
  942. try:
  943. source_file = os.path.join(tempdir,'source')
  944. target_file = os.path.join(tempdir,'symlink')
  945. with open(source_file,'w') as f:
  946. f.write('something\n')
  947. os.link(source_file, target_file)
  948. tar = tarfile.open(temparchive,'w')
  949. tar.add(source_file, arcname=os.path.basename(source_file))
  950. tar.add(target_file, arcname=os.path.basename(target_file))
  951. tar.close()
  952. # Let's extract it to the location which contains the symlink
  953. tar = tarfile.open(temparchive,'r')
  954. # this should not raise OSError: [Errno 17] File exists
  955. try:
  956. tar.extractall(path=tempdir)
  957. except OSError:
  958. self.fail("extractall failed with linked files")
  959. finally:
  960. tar.close()
  961. finally:
  962. os.unlink(temparchive)
  963. shutil.rmtree(tempdir)
  964. def test_open_nonwritable_fileobj(self):
  965. for exctype in IOError, EOFError, RuntimeError:
  966. class BadFile(StringIO.StringIO):
  967. first = True
  968. def write(self, data):
  969. if self.first:
  970. self.first = False
  971. raise exctype
  972. f = BadFile()
  973. with self.assertRaises(exctype):
  974. tar = tarfile.open(tmpname, self.mode, fileobj=f,
  975. format=tarfile.PAX_FORMAT,
  976. pax_headers={'non': 'empty'})
  977. self.assertFalse(f.closed)
  978. class StreamWriteTest(WriteTestBase):
  979. mode = "w|"
  980. def test_stream_padding(self):
  981. # Test for bug #1543303.
  982. tar = tarfile.open(tmpname, self.mode)
  983. tar.close()
  984. if self.mode.endswith("gz"):
  985. with gzip.GzipFile(tmpname) as fobj:
  986. data = fobj.read()
  987. elif self.mode.endswith("bz2"):
  988. dec = bz2.BZ2Decompressor()
  989. with open(tmpname, "rb") as fobj:
  990. data = fobj.read()
  991. data = dec.decompress(data)
  992. self.assertTrue(len(dec.unused_data) == 0,
  993. "found trailing data")
  994. else:
  995. with open(tmpname, "rb") as fobj:
  996. data = fobj.read()
  997. self.assertTrue(data.count("\0") == tarfile.RECORDSIZE,
  998. "incorrect zero padding")
  999. @unittest.skipIf(sys.platform == 'win32', 'not appropriate for Windows')
  1000. @unittest.skipUnless(hasattr(os, 'umask'), 'requires os.umask')
  1001. def test_file_mode(self):
  1002. # Test for issue #8464: Create files with correct
  1003. # permissions.
  1004. if os.path.exists(tmpname):
  1005. os.remove(tmpname)
  1006. original_umask = os.umask(0022)
  1007. try:
  1008. tar = tarfile.open(tmpname, self.mode)
  1009. tar.close()
  1010. mode = os.stat(tmpname).st_mode & 0777
  1011. self.assertEqual(mode, 0644, "wrong file permissions")
  1012. finally:
  1013. os.umask(original_umask)
  1014. def test_issue13639(self):
  1015. try:
  1016. with tarfile.open(unicode(tmpname, sys.getfilesystemencoding()), self.mode):
  1017. pass
  1018. except UnicodeDecodeError:
  1019. self.fail("_Stream failed to write unicode filename")
  1020. class GNUWriteTest(unittest.TestCase):
  1021. # This testcase checks for correct creation of GNU Longname
  1022. # and Longlink extended headers (cp. bug #812325).
  1023. def _length(self, s):
  1024. blocks, remainder = divmod(len(s) + 1, 512)
  1025. if remainder:
  1026. blocks += 1
  1027. return blocks * 512
  1028. def _calc_size(self, name, link=None):
  1029. # Initial tar header
  1030. count = 512
  1031. if len(name) > tarfile.LENGTH_NAME:
  1032. # GNU longname extended header + longname
  1033. count += 512
  1034. count += self._length(name)
  1035. if link is not None and len(link) > tarfile.LENGTH_LINK:
  1036. # GNU longlink extended header + longlink
  1037. count += 512
  1038. count += self._length(link)
  1039. return count
  1040. def _test(self, name, link=None):
  1041. tarinfo = tarfile.TarInfo(name)
  1042. if link:
  1043. tarinfo.linkname = link
  1044. tarinfo.type = tarfile.LNKTYPE
  1045. tar = tarfile.open(tmpname, "w")
  1046. try:
  1047. tar.format = tarfile.GNU_FORMAT
  1048. tar.addfile(tarinfo)
  1049. v1 = self._calc_size(name, link)
  1050. v2 = tar.offset
  1051. self.assertTrue(v1 == v2, "GNU longname/longlink creation failed")
  1052. finally:
  1053. tar.close()
  1054. tar = tarfile.open(tmpname)
  1055. try:
  1056. member = tar.next()
  1057. self.assertIsNotNone(member,
  1058. "unable to read longname member")
  1059. self.assertEqual(tarinfo.name, member.name,
  1060. "unable to read longname member")
  1061. self.assertEqual(tarinfo.linkname, member.linkname,
  1062. "unable to read longname member")
  1063. finally:
  1064. tar.close()
  1065. def test_longname_1023(self):
  1066. self._test(("longnam/" * 127) + "longnam")
  1067. def test_longname_1024(self):
  1068. self._test(("longnam/" * 127) + "longname")
  1069. def test_longname_1025(self):
  1070. self._test(("longnam/" * 127) + "longname_")
  1071. def test_longlink_1023(self):
  1072. self._test("name", ("longlnk/" * 127) + "longlnk")
  1073. def test_longlink_1024(self):
  1074. self._test("name", ("longlnk/" * 127) + "longlink")
  1075. def test_longlink_1025(self):
  1076. self._test("name", ("longlnk/" * 127) + "longlink_")
  1077. def test_longnamelink_1023(self):
  1078. self._test(("longnam/" * 127) + "longnam",
  1079. ("longlnk/" * 127) + "longlnk")
  1080. def test_longnamelink_1024(self):
  1081. self._test(("longnam/" * 127) + "longname",
  1082. ("longlnk/" * 127) + "longlink")
  1083. def test_longnamelink_1025(self):
  1084. self._test(("longnam/" * 127) + "longname_",
  1085. ("longlnk/" * 127) + "longlink_")
  1086. class HardlinkTest(unittest.TestCase):
  1087. # Test the creation of LNKTYPE (hardlink) members in an archive.
  1088. def setUp(self):
  1089. self.foo = os.path.join(TEMPDIR, "foo")
  1090. self.bar = os.path.join(TEMPDIR, "bar")
  1091. with open(self.foo, "wb") as fobj:
  1092. fobj.write("foo")
  1093. os.link(self.foo, self.bar)
  1094. self.tar = tarfile.open(tmpname, "w")
  1095. self.tar.add(self.foo)
  1096. def tearDown(self):
  1097. self.tar.close()
  1098. support.unlink(self.foo)
  1099. support.unlink(self.bar)
  1100. def test_add_twice(self):
  1101. # The same name will be added as a REGTYPE every
  1102. # time regardless of st_nlink.
  1103. tarinfo = self.tar.gettarinfo(self.foo)
  1104. self.assertTrue(tarinfo.type == tarfile.REGTYPE,
  1105. "add file as regular failed")
  1106. def test_add_hardlink(self):
  1107. tarinfo = self.tar.gettarinfo(self.bar)
  1108. self.assertTrue(tarinfo.type == tarfile.LNKTYPE,
  1109. "add file as hardlink failed")
  1110. def test_dereference_hardlink(self):
  1111. self.tar.dereference = True
  1112. tarinfo = self.tar.gettarinfo(self.bar)
  1113. self.assertTrue(tarinfo.type == tarfile.REGTYPE,
  1114. "dereferencing hardlink failed")
  1115. class PaxWriteTest(GNUWriteTest):
  1116. def _test(self, name, link=None):
  1117. # See GNUWriteTest.
  1118. tarinfo = tarfile.TarInfo(name)
  1119. if link:
  1120. tarinfo.linkname = link
  1121. tarinfo.type = tarfile.LNKTYPE
  1122. tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT)
  1123. try:
  1124. tar.addfile(tarinfo)
  1125. finally:
  1126. tar.close()
  1127. tar = tarfile.open(tmpname)
  1128. try:
  1129. if link:
  1130. l = tar.getmembers()[0].linkname
  1131. self.assertTrue(link == l, "PAX longlink creation failed")
  1132. else:
  1133. n = tar.getmembers()[0].name
  1134. self.assertTrue(name == n, "PAX longname creation failed")
  1135. finally:
  1136. tar.close()
  1137. def test_pax_global_header(self):
  1138. pax_headers = {
  1139. u"foo": u"bar",
  1140. u"uid": u"0",
  1141. u"mtime": u"1.23",
  1142. u"test": u"\xe4\xf6\xfc",
  1143. u"\xe4\xf6\xfc": u"test"}
  1144. tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT,
  1145. pax_headers=pax_headers)
  1146. try:
  1147. tar.addfile(tarfile.TarInfo("test"))
  1148. finally:
  1149. tar.close()
  1150. # Test if the global header was written correctly.
  1151. tar = tarfile.open(tmpname, encoding="iso8859-1")
  1152. try:
  1153. self.assertEqual(tar.pax_headers, pax_headers)
  1154. self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers)
  1155. # Test if all the fields are unicode.
  1156. for key, val in tar.pax_headers.iteritems():
  1157. self.assertTrue(type(key) is unicode)
  1158. self.assertTrue(type(val) is unicode)
  1159. if key in tarfile.PAX_NUMBER_FIELDS:
  1160. try:
  1161. tarfile.PAX_NUMBER_FIELDS[key](val)
  1162. except (TypeError, ValueError):
  1163. self.fail("unable to convert pax header field")
  1164. finally:
  1165. tar.close()
  1166. def test_pax_extended_header(self):
  1167. # The fields from the pax header have priority over the
  1168. # TarInfo.
  1169. pax_headers = {u"path": u"foo", u"uid": u"123"}
  1170. tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1")
  1171. try:
  1172. t = tarfile.TarInfo()
  1173. t.name = u"\xe4\xf6\xfc" # non-ASCII
  1174. t.uid = 8**8 # too large
  1175. t.pax_headers = pax_headers
  1176. tar.addfile(t)
  1177. finally:
  1178. tar.close()
  1179. tar = tarfile.open(tmpname, encoding="iso8859-1")
  1180. try:
  1181. t = tar.getmembers()[0]
  1182. self.assertEqual(t.pax_headers, pax_headers)
  1183. self.assertEqual(t.name, "foo")
  1184. self.assertEqual(t.uid, 123)
  1185. finally:
  1186. tar.close()
  1187. class UstarUnicodeTest(unittest.TestCase):
  1188. # All *UnicodeTests FIXME
  1189. format = tarfile.USTAR_FORMAT
  1190. def test_iso8859_1_filename(self):
  1191. self._test_unicode_filename("iso8859-1")
  1192. def test_utf7_filename(self):
  1193. self._test_unicode_filename("utf7")
  1194. def test_utf8_filename(self):
  1195. self._test_unicode_filename("utf8")
  1196. def _test_unicode_filename(self, encoding):
  1197. tar = tarfile.open(tmpname, "w", format=self.format, encoding=encoding, errors="strict")
  1198. try:
  1199. name = u"\xe4\xf6\xfc"
  1200. tar.addfile(tarfile.TarInfo(name))
  1201. finally:
  1202. tar.close()
  1203. tar = tarfile.open(tmpname, encoding=encoding)
  1204. try:
  1205. self.assertTrue(type(tar.getnames()[0]) is not unicode)
  1206. self.assertEqual(tar.getmembers()[0].name, name.encode(encoding))
  1207. finally:
  1208. tar.close()
  1209. def test_unicode_filename_error(self):
  1210. tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict")
  1211. try:
  1212. tarinfo = tarfile.TarInfo()
  1213. tarinfo.name = "\xe4\xf6\xfc"
  1214. if self.format == tarfile.PAX_FORMAT:
  1215. self.assertRaises(UnicodeError, tar.addfile, tarinfo)
  1216. else:
  1217. tar.addfile(tarinfo)
  1218. tarinfo.name = u"\xe4\xf6\xfc"
  1219. self.assertRaises(UnicodeError, tar.addfile, tarinfo)
  1220. tarinfo.name = "foo"
  1221. tarinfo.uname = u"\xe4\xf6\xfc"
  1222. self.assertRaises(UnicodeError, tar.addfile, tarinfo)
  1223. finally:
  1224. tar.close()
  1225. def test_unicode_argument(self):
  1226. tar = tarfile.open(tarname, "r", encoding="iso8859-1", errors="strict")
  1227. try:
  1228. for t in tar:
  1229. self.assertTrue(type(t.name) is str)
  1230. self.assertTrue(type(t.linkname) is str)
  1231. self.assertTrue(type(t.uname) is str)
  1232. self.assertTrue(type(t.gname) is str)
  1233. finally:
  1234. tar.close()
  1235. def test_uname_unicode(self):
  1236. for name in (u"\xe4\xf6\xfc", "\xe4\xf6\xfc"):
  1237. t = tarfile.TarInfo("foo")
  1238. t.uname = name
  1239. t.gname = name
  1240. fobj = StringIO.StringIO()
  1241. tar = tarfile.open("foo.tar", mode="w", fileobj=fobj, format=self.format, encoding="iso8859-1")
  1242. try:
  1243. tar.addfile(t)
  1244. finally:
  1245. tar.close()
  1246. fobj.seek(0)
  1247. tar = tarfile.open("foo.tar", fileobj=fobj, encoding="iso8859-1")
  1248. t = tar.getmember("foo")
  1249. self.assertEqual(t.uname, "\xe4\xf6\xfc")
  1250. self.assertEqual(t.gname, "\xe4\xf6\xfc")
  1251. class GNUUnicodeTest(UstarUnicodeTest):
  1252. format = tarfile.GNU_FORMAT
  1253. class PaxUnicodeTest(UstarUnicodeTest):
  1254. format = tarfile.PAX_FORMAT
  1255. def _create_unicode_name(self, name):
  1256. tar = tarfile.open(tmpname, "w", format=self.format)
  1257. t = tarfile.TarInfo()
  1258. t.pax_headers["path"] = name
  1259. tar.addfile(t)
  1260. tar.close()
  1261. def test_error_handlers(self):
  1262. # Test if the unicode error handlers work correctly for characters
  1263. # that cannot be expressed in a given encoding.
  1264. self._create_unicode_name(u"\xe4\xf6\xfc")
  1265. for handler, name in (("utf-8", u"\xe4\xf6\xfc".encode("utf8")),
  1266. ("replace", "???"), ("ignore", "")):
  1267. tar = tarfile.open(tmpname, format=self.format, encoding="ascii",
  1268. errors=handler)
  1269. self.assertEqual(tar.getnames()[0], name)
  1270. self.assertRaises(UnicodeError, tarfile.open, tmpname,
  1271. encoding="ascii", errors="strict")
  1272. def test_error_handler_utf8(self):
  1273. # Create a pathname that has one component representable using
  1274. # iso8859-1 and the other only in iso8859-15.
  1275. self._create_unicode_name(u"\xe4\xf6\xfc/\u20ac")
  1276. tar = tarfile.open(tmpname, format=self.format, encoding="iso8859-1",
  1277. errors="utf-8")
  1278. self.assertEqual(tar.getnames()[0], "\xe4\xf6\xfc/" + u"\u20ac".encode("utf8"))
  1279. class AppendTest(unittest.TestCase):
  1280. # Test append mode (cp. patch #1652681).
  1281. def setUp(self):
  1282. self.tarname = tmpname
  1283. if os.path.exists(self.tarname):
  1284. os.remove(self.tarname)
  1285. def _add_testfile(self, fileobj=None):
  1286. with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar:
  1287. tar.addfile(tarfile.TarInfo("bar"))
  1288. def _create_testtar(self, mode="w:"):
  1289. with tarfile.open(tarname, encoding="iso8859-1") as src:
  1290. t = src.getmember("ustar/regtype")
  1291. t.name = "foo"
  1292. f = src.extractfile(t)
  1293. with tarfile.open(self.tarname, mode) as tar:
  1294. tar.addfile(t, f)
  1295. def _test(self, names=["bar"], fileobj=None):
  1296. with tarfile.open(self.tarname, fileobj=fileobj) as tar:
  1297. self.assertEqual(tar.getnames(), names)
  1298. def test_non_existing(self):
  1299. self._add_testfile()
  1300. self._test()
  1301. def test_empty(self):
  1302. tarfile.open(self.tarname, "w:").close()
  1303. self._add_testfile()
  1304. self._test()
  1305. def test_empty_fileobj(self):
  1306. fobj = StringIO.StringIO("\0" * 1024)
  1307. self._add_testfile(fobj)
  1308. fobj.seek(0)
  1309. self._test(fileobj=fobj)
  1310. def test_fileobj(self):
  1311. self._create_testtar()
  1312. with open(self.tarname) as fobj:
  1313. data = fobj.read()
  1314. fobj = StringIO.StringIO(data)
  1315. self._add_testfile(fobj)
  1316. fobj.seek(0)
  1317. self._test(names=["foo", "bar"], fileobj=fobj)
  1318. def test_existing(self):
  1319. self._create_testtar()
  1320. self._add_testfile()
  1321. self._test(names=["foo", "bar"])
  1322. @unittest.skipUnless(gzip, 'requires gzip')
  1323. def test_append_gz(self):
  1324. self._create_testtar("w:gz")
  1325. self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a")
  1326. @unittest.skipUnless(bz2, 'requires bz2')
  1327. def test_append_bz2(self):
  1328. self._create_testtar("w:bz2")
  1329. self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a")
  1330. # Append mode is supposed to fail if the tarfile to append to
  1331. # does not end with a zero block.
  1332. def _test_error(self, data):
  1333. with open(self.tarname, "wb") as fobj:
  1334. fobj.write(data)
  1335. self.assertRaises(tarfile.ReadError, self._add_testfile)
  1336. def test_null(self):
  1337. self._test_error("")
  1338. def test_incomplete(self):
  1339. self._test_error("\0" * 13)
  1340. def test_premature_eof(self):
  1341. data = tarfile.TarInfo("foo").tobuf()
  1342. self._test_error(data)
  1343. def test_trailing_garbage(self):
  1344. data = tarfile.TarInfo("foo").tobuf()
  1345. self._test_error(data + "\0" * 13)
  1346. def test_invalid(self):
  1347. self._test_error("a" * 512)
  1348. class LimitsTest(unittest.TestCase):
  1349. def test_ustar_limits(self):
  1350. # 100 char name
  1351. tarinfo = tarfile.TarInfo("0123456789" * 10)
  1352. tarinfo.tobuf(tarfile.USTAR_FORMAT)
  1353. # 101 char name that cannot be stored
  1354. tarinfo = tarfile.TarInfo("0123456789" * 10 + "0")
  1355. self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
  1356. # 256 char name with a slash at pos 156
  1357. tarinfo = tarfile.TarInfo("123/" * 62 + "longname")
  1358. tarinfo.tobuf(tarfile.USTAR_FORMAT)
  1359. # 256 char name that cannot be stored
  1360. tarinfo = tarfile.TarInfo("1234567/" * 31 + "longname")
  1361. self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
  1362. # 512 char name
  1363. tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
  1364. self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
  1365. # 512 char linkname
  1366. tarinfo = tarfile.TarInfo("longlink")
  1367. tarinfo.linkname = "123/" * 126 + "longname"
  1368. self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
  1369. # uid > 8 digits
  1370. tarinfo = tarfile.TarInfo("name")
  1371. tarinfo.uid = 010000000
  1372. self.assertRaises(ValueError, tarinfo.tobuf, tarfile.USTAR_FORMAT)
  1373. def test_gnu_limits(self):
  1374. tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
  1375. tarinfo.tobuf(tarfile.GNU_FORMAT)
  1376. tarinfo = tarfile.TarInfo("longlink")
  1377. tarinfo.linkname = "123/" * 126 + "longname"
  1378. tarinfo.tobuf(tarfile.GNU_FORMAT)
  1379. # uid >= 256 ** 7
  1380. tarinfo = tarfile.TarInfo("name")
  1381. tarinfo.uid = 04000000000000000000L
  1382. self.assertRaises(ValueError, tarinfo.tobuf, tarfile.GNU_FORMAT)
  1383. def test_pax_limits(self):
  1384. tarinfo = tarfile.TarInfo("123/" * 126 + "longname")
  1385. tarinfo.tobuf(tarfile.PAX_FORMAT)
  1386. tarinfo = tarfile.TarInfo("longlink")
  1387. tarinfo.linkname = "123/" * 126 + "longname"
  1388. tarinfo.tobuf(tarfile.PAX_FORMAT)
  1389. tarinfo = tarfile.TarInfo("name")
  1390. tarinfo.uid = 04000000000000000000L
  1391. tarinfo.tobuf(tarfile.PAX_FORMAT)
  1392. class MiscTest(unittest.TestCase):
  1393. def test_read_number_fields(self):
  1394. # Issue 24514: Test if empty number fields are converted to zero.
  1395. self.assertEqual(tarfile.nti("\0"), 0)
  1396. self.assertEqual(tarfile.nti(" \0"), 0)
  1397. class ContextManagerTest(unittest.TestCase):
  1398. def test_basic(self):
  1399. with tarfile.open(tarname) as tar:
  1400. self.assertFalse(tar.closed, "closed inside runtime context")
  1401. self.assertTrue(tar.closed, "context manager failed")
  1402. def test_closed(self):
  1403. # The __enter__() method is supposed to raise IOError
  1404. # if the TarFile object is already closed.
  1405. tar = tarfile.open(tarname)
  1406. tar.close()
  1407. with self.assertRaises(IOError):
  1408. with tar:
  1409. pass
  1410. def test_exception(self):
  1411. # Test if the IOError exception is passed through properly.
  1412. with self.assertRaises(Exception) as exc:
  1413. with tarfile.open(tarname) as tar:
  1414. raise IOError
  1415. self.assertIsInstance(exc.exception, IOError,
  1416. "wrong exception raised in context manager")
  1417. self.assertTrue(tar.closed, "context manager failed")
  1418. def test_no_eof(self):
  1419. # __exit__() must not write end-of-archive blocks if an
  1420. # exception was raised.
  1421. try:
  1422. with tarfile.open(tmpname, "w") as tar:
  1423. raise Exception
  1424. except:
  1425. pass
  1426. self.assertEqual(os.path.getsize(tmpname), 0,
  1427. "context manager wrote an end-of-archive block")
  1428. self.assertTrue(tar.closed, "context manager failed")
  1429. def test_eof(self):
  1430. # __exit__() must write end-of-archive blocks, i.e. call
  1431. # TarFile.close() if there was no error.
  1432. with tarfile.open(tmpname, "w"):
  1433. pass
  1434. self.assertNotEqual(os.path.getsize(tmpname), 0,
  1435. "context manager wrote no end-of-archive block")
  1436. def test_fileobj(self):
  1437. # Test that __exit__() did not close the external file
  1438. # object.
  1439. with open(tmpname, "wb") as fobj:
  1440. try:
  1441. with tarfile.open(fileobj=fobj, mode="w") as tar:
  1442. raise Exception
  1443. except:
  1444. pass
  1445. self.assertFalse(fobj.closed, "external file object was closed")
  1446. self.assertTrue(tar.closed, "context manager failed")
  1447. class LinkEmulationTest(ReadTest):
  1448. # Test for issue #8741 regression. On platforms that do not support
  1449. # symbolic or hard links tarfile tries to extract these types of members as
  1450. # the regular files they point to.
  1451. def _test_link_extraction(self, name):
  1452. self.tar.extract(name, TEMPDIR)
  1453. data = open(os.path.join(TEMPDIR, name), "rb").read()
  1454. self.assertEqual(md5sum(data), md5_regtype)
  1455. def test_hardlink_extraction1(self):
  1456. self._test_link_extraction("ustar/lnktype")
  1457. def test_hardlink_extraction2(self):
  1458. self._test_link_extraction("./ustar/linktest2/lnktype")
  1459. def test_symlink_extraction1(self):
  1460. self._test_link_extraction("ustar/symtype")
  1461. def test_symlink_extraction2(self):
  1462. self._test_link_extraction("./ustar/linktest2/symtype")
  1463. class GzipMiscReadTest(MiscReadTest):
  1464. tarname = gzipname
  1465. mode = "r:gz"
  1466. taropen = tarfile.TarFile.gzopen
  1467. class GzipUstarReadTest(UstarReadTest):
  1468. tarname = gzipname
  1469. mode = "r:gz"
  1470. class GzipStreamReadTest(StreamReadTest):
  1471. tarname = gzipname
  1472. mode = "r|gz"
  1473. class GzipWriteTest(WriteTest):
  1474. mode = "w:gz"
  1475. class GzipStreamWriteTest(StreamWriteTest):
  1476. mode = "w|gz"
  1477. class Bz2MiscReadTest(MiscReadTest):
  1478. tarname = bz2name
  1479. mode = "r:bz2"
  1480. taropen = tarfile.TarFile.bz2open
  1481. class Bz2UstarReadTest(UstarReadTest):
  1482. tarname = bz2name
  1483. mode = "r:bz2"
  1484. class Bz2StreamReadTest(StreamReadTest):
  1485. tarname = bz2name
  1486. mode = "r|bz2"
  1487. class Bz2WriteTest(WriteTest):
  1488. mode = "w:bz2"
  1489. class Bz2StreamWriteTest(StreamWriteTest):
  1490. mode = "w|bz2"
  1491. class Bz2PartialReadTest(unittest.TestCase):
  1492. # Issue5068: The _BZ2Proxy.read() method loops forever
  1493. # on an empty or partial bzipped file.
  1494. def _test_partial_input(self, mode):
  1495. class MyStringIO(StringIO.StringIO):
  1496. hit_eof = False
  1497. def read(self, n):
  1498. if self.hit_eof:
  1499. raise AssertionError("infinite loop detected in tarfile.open()")
  1500. self.hit_eof = self.pos == self.len
  1501. return StringIO.StringIO.read(self, n)
  1502. def seek(self, *args):
  1503. self.hit_eof = False
  1504. return StringIO.StringIO.seek(self, *args)
  1505. data = bz2.compress(tarfile.TarInfo("foo").tobuf())
  1506. for x in range(len(data) + 1):
  1507. try:
  1508. tarfile.open(fileobj=MyStringIO(data[:x]), mode=mode)
  1509. except tarfile.ReadError:
  1510. pass # we have no interest in ReadErrors
  1511. def test_partial_input(self):
  1512. self._test_partial_input("r")
  1513. def test_partial_input_bz2(self):
  1514. self._test_partial_input("r:bz2")
  1515. def test_main():
  1516. support.unlink(TEMPDIR)
  1517. os.makedirs(TEMPDIR)
  1518. tests = [
  1519. UstarReadTest,
  1520. MiscReadTest,
  1521. StreamReadTest,
  1522. DetectReadTest,
  1523. MemberReadTest,
  1524. GNUReadTest,
  1525. PaxReadTest,
  1526. ListTest,
  1527. WriteTest,
  1528. StreamWriteTest,
  1529. GNUWriteTest,
  1530. PaxWriteTest,
  1531. UstarUnicodeTest,
  1532. GNUUnicodeTest,
  1533. PaxUnicodeTest,
  1534. AppendTest,
  1535. LimitsTest,
  1536. MiscTest,
  1537. ContextManagerTest,
  1538. ]
  1539. if hasattr(os, "link"):
  1540. tests.append(HardlinkTest)
  1541. else:
  1542. tests.append(LinkEmulationTest)
  1543. with open(tarname, "rb") as fobj:
  1544. data = fobj.read()
  1545. if gzip:
  1546. # Create testtar.tar.gz and add gzip-specific tests.
  1547. support.unlink(gzipname)
  1548. with gzip.open(gzipname, "wb") as tar:
  1549. tar.write(data)
  1550. tests += [
  1551. GzipMiscReadTest,
  1552. GzipUstarReadTest,
  1553. GzipStreamReadTest,
  1554. GzipListTest,
  1555. GzipWriteTest,
  1556. GzipStreamWriteTest,
  1557. ]
  1558. if bz2:
  1559. # Create testtar.tar.bz2 and add bz2-specific tests.
  1560. support.unlink(bz2name)
  1561. tar = bz2.BZ2File(bz2name, "wb")
  1562. try:
  1563. tar.write(data)
  1564. finally:
  1565. tar.close()
  1566. tests += [
  1567. Bz2MiscReadTest,
  1568. Bz2UstarReadTest,
  1569. Bz2StreamReadTest,
  1570. Bz2ListTest,
  1571. Bz2WriteTest,
  1572. Bz2StreamWriteTest,
  1573. Bz2PartialReadTest,
  1574. ]
  1575. try:
  1576. test_support.run_unittest(*tests)
  1577. finally:
  1578. if os.path.exists(TEMPDIR):
  1579. shutil.rmtree(TEMPDIR)
  1580. if __name__ == "__main__":
  1581. test_main()