test_bsddb.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367
  1. """Test script for the bsddb C module by Roger E. Masse
  2. Adapted to unittest format and expanded scope by Raymond Hettinger
  3. """
  4. import os, sys
  5. import unittest
  6. from test import test_support
  7. # Skip test if _bsddb wasn't built.
  8. test_support.import_module('_bsddb')
  9. bsddb = test_support.import_module('bsddb', deprecated=True)
  10. # Just so we know it's imported:
  11. test_support.import_module('dbhash', deprecated=True)
  12. class TestBSDDB(unittest.TestCase):
  13. openflag = 'c'
  14. def setUp(self):
  15. self.f = self.openmethod[0](self.fname, self.openflag, cachesize=32768)
  16. self.d = dict(q='Guido', w='van', e='Rossum', r='invented', t='Python', y='')
  17. for k, v in self.d.iteritems():
  18. self.f[k] = v
  19. def tearDown(self):
  20. self.f.sync()
  21. self.f.close()
  22. if self.fname is None:
  23. return
  24. try:
  25. os.remove(self.fname)
  26. except os.error:
  27. pass
  28. def test_getitem(self):
  29. for k, v in self.d.iteritems():
  30. self.assertEqual(self.f[k], v)
  31. def test_len(self):
  32. self.assertEqual(len(self.f), len(self.d))
  33. def test_change(self):
  34. self.f['r'] = 'discovered'
  35. self.assertEqual(self.f['r'], 'discovered')
  36. self.assertIn('r', self.f.keys())
  37. self.assertIn('discovered', self.f.values())
  38. def test_close_and_reopen(self):
  39. self.assertIsNotNone(self.fname)
  40. self.f.close()
  41. self.f = self.openmethod[0](self.fname, 'w')
  42. for k, v in self.d.iteritems():
  43. self.assertEqual(self.f[k], v)
  44. def assertSetEquals(self, seqn1, seqn2):
  45. self.assertEqual(set(seqn1), set(seqn2))
  46. def test_mapping_iteration_methods(self):
  47. f = self.f
  48. d = self.d
  49. self.assertSetEquals(d, f)
  50. self.assertSetEquals(d.keys(), f.keys())
  51. self.assertSetEquals(d.values(), f.values())
  52. self.assertSetEquals(d.items(), f.items())
  53. self.assertSetEquals(d.iterkeys(), f.iterkeys())
  54. self.assertSetEquals(d.itervalues(), f.itervalues())
  55. self.assertSetEquals(d.iteritems(), f.iteritems())
  56. def test_iter_while_modifying_values(self):
  57. di = iter(self.d)
  58. while 1:
  59. try:
  60. key = di.next()
  61. self.d[key] = 'modified '+key
  62. except StopIteration:
  63. break
  64. # it should behave the same as a dict. modifying values
  65. # of existing keys should not break iteration. (adding
  66. # or removing keys should)
  67. loops_left = len(self.f)
  68. fi = iter(self.f)
  69. while 1:
  70. try:
  71. key = fi.next()
  72. self.f[key] = 'modified '+key
  73. loops_left -= 1
  74. except StopIteration:
  75. break
  76. self.assertEqual(loops_left, 0)
  77. self.test_mapping_iteration_methods()
  78. def test_iter_abort_on_changed_size(self):
  79. def DictIterAbort():
  80. di = iter(self.d)
  81. while 1:
  82. try:
  83. di.next()
  84. self.d['newkey'] = 'SPAM'
  85. except StopIteration:
  86. break
  87. self.assertRaises(RuntimeError, DictIterAbort)
  88. def DbIterAbort():
  89. fi = iter(self.f)
  90. while 1:
  91. try:
  92. fi.next()
  93. self.f['newkey'] = 'SPAM'
  94. except StopIteration:
  95. break
  96. self.assertRaises(RuntimeError, DbIterAbort)
  97. def test_iteritems_abort_on_changed_size(self):
  98. def DictIteritemsAbort():
  99. di = self.d.iteritems()
  100. while 1:
  101. try:
  102. di.next()
  103. self.d['newkey'] = 'SPAM'
  104. except StopIteration:
  105. break
  106. self.assertRaises(RuntimeError, DictIteritemsAbort)
  107. def DbIteritemsAbort():
  108. fi = self.f.iteritems()
  109. while 1:
  110. try:
  111. key, value = fi.next()
  112. del self.f[key]
  113. except StopIteration:
  114. break
  115. self.assertRaises(RuntimeError, DbIteritemsAbort)
  116. def test_iteritems_while_modifying_values(self):
  117. di = self.d.iteritems()
  118. while 1:
  119. try:
  120. k, v = di.next()
  121. self.d[k] = 'modified '+v
  122. except StopIteration:
  123. break
  124. # it should behave the same as a dict. modifying values
  125. # of existing keys should not break iteration. (adding
  126. # or removing keys should)
  127. loops_left = len(self.f)
  128. fi = self.f.iteritems()
  129. while 1:
  130. try:
  131. k, v = fi.next()
  132. self.f[k] = 'modified '+v
  133. loops_left -= 1
  134. except StopIteration:
  135. break
  136. self.assertEqual(loops_left, 0)
  137. self.test_mapping_iteration_methods()
  138. def test_first_next_looping(self):
  139. items = [self.f.first()]
  140. for i in xrange(1, len(self.f)):
  141. items.append(self.f.next())
  142. self.assertSetEquals(items, self.d.items())
  143. def test_previous_last_looping(self):
  144. items = [self.f.last()]
  145. for i in xrange(1, len(self.f)):
  146. items.append(self.f.previous())
  147. self.assertSetEquals(items, self.d.items())
  148. def test_first_while_deleting(self):
  149. # Test for bug 1725856
  150. self.assertTrue(len(self.d) >= 2, "test requires >=2 items")
  151. for _ in self.d:
  152. key = self.f.first()[0]
  153. del self.f[key]
  154. self.assertEqual([], self.f.items(), "expected empty db after test")
  155. def test_last_while_deleting(self):
  156. # Test for bug 1725856's evil twin
  157. self.assertTrue(len(self.d) >= 2, "test requires >=2 items")
  158. for _ in self.d:
  159. key = self.f.last()[0]
  160. del self.f[key]
  161. self.assertEqual([], self.f.items(), "expected empty db after test")
  162. def test_set_location(self):
  163. self.assertEqual(self.f.set_location('e'), ('e', self.d['e']))
  164. def test_contains(self):
  165. for k in self.d:
  166. self.assertIn(k, self.f)
  167. self.assertNotIn('not here', self.f)
  168. def test_has_key(self):
  169. for k in self.d:
  170. self.assertTrue(self.f.has_key(k))
  171. self.assertTrue(not self.f.has_key('not here'))
  172. def test_clear(self):
  173. self.f.clear()
  174. self.assertEqual(len(self.f), 0)
  175. def test__no_deadlock_first(self, debug=0):
  176. # do this so that testers can see what function we're in in
  177. # verbose mode when we deadlock.
  178. sys.stdout.flush()
  179. # in pybsddb's _DBWithCursor this causes an internal DBCursor
  180. # object is created. Other test_ methods in this class could
  181. # inadvertently cause the deadlock but an explicit test is needed.
  182. if debug: print "A"
  183. k,v = self.f.first()
  184. if debug: print "B", k
  185. self.f[k] = "deadlock. do not pass go. do not collect $200."
  186. if debug: print "C"
  187. # if the bsddb implementation leaves the DBCursor open during
  188. # the database write and locking+threading support is enabled
  189. # the cursor's read lock will deadlock the write lock request..
  190. # test the iterator interface
  191. if True:
  192. if debug: print "D"
  193. i = self.f.iteritems()
  194. k,v = i.next()
  195. if debug: print "E"
  196. self.f[k] = "please don't deadlock"
  197. if debug: print "F"
  198. while 1:
  199. try:
  200. k,v = i.next()
  201. except StopIteration:
  202. break
  203. if debug: print "F2"
  204. i = iter(self.f)
  205. if debug: print "G"
  206. while i:
  207. try:
  208. if debug: print "H"
  209. k = i.next()
  210. if debug: print "I"
  211. self.f[k] = "deadlocks-r-us"
  212. if debug: print "J"
  213. except StopIteration:
  214. i = None
  215. if debug: print "K"
  216. # test the legacy cursor interface mixed with writes
  217. self.assertIn(self.f.first()[0], self.d)
  218. k = self.f.next()[0]
  219. self.assertIn(k, self.d)
  220. self.f[k] = "be gone with ye deadlocks"
  221. self.assertTrue(self.f[k], "be gone with ye deadlocks")
  222. def test_for_cursor_memleak(self):
  223. # do the bsddb._DBWithCursor iterator internals leak cursors?
  224. nc1 = len(self.f._cursor_refs)
  225. # create iterator
  226. i = self.f.iteritems()
  227. nc2 = len(self.f._cursor_refs)
  228. # use the iterator (should run to the first yield, creating the cursor)
  229. k, v = i.next()
  230. nc3 = len(self.f._cursor_refs)
  231. # destroy the iterator; this should cause the weakref callback
  232. # to remove the cursor object from self.f._cursor_refs
  233. del i
  234. nc4 = len(self.f._cursor_refs)
  235. self.assertEqual(nc1, nc2)
  236. self.assertEqual(nc1, nc4)
  237. self.assertTrue(nc3 == nc1+1)
  238. def test_popitem(self):
  239. k, v = self.f.popitem()
  240. self.assertIn(k, self.d)
  241. self.assertIn(v, self.d.values())
  242. self.assertNotIn(k, self.f)
  243. self.assertEqual(len(self.d)-1, len(self.f))
  244. def test_pop(self):
  245. k = 'w'
  246. v = self.f.pop(k)
  247. self.assertEqual(v, self.d[k])
  248. self.assertNotIn(k, self.f)
  249. self.assertNotIn(v, self.f.values())
  250. self.assertEqual(len(self.d)-1, len(self.f))
  251. def test_get(self):
  252. self.assertEqual(self.f.get('NotHere'), None)
  253. self.assertEqual(self.f.get('NotHere', 'Default'), 'Default')
  254. self.assertEqual(self.f.get('q', 'Default'), self.d['q'])
  255. def test_setdefault(self):
  256. self.assertEqual(self.f.setdefault('new', 'dog'), 'dog')
  257. self.assertEqual(self.f.setdefault('r', 'cat'), self.d['r'])
  258. def test_update(self):
  259. new = dict(y='life', u='of', i='brian')
  260. self.f.update(new)
  261. self.d.update(new)
  262. for k, v in self.d.iteritems():
  263. self.assertEqual(self.f[k], v)
  264. def test_keyordering(self):
  265. self.assertIs(self.openmethod[0], bsddb.btopen)
  266. keys = self.d.keys()
  267. keys.sort()
  268. self.assertEqual(self.f.first()[0], keys[0])
  269. self.assertEqual(self.f.next()[0], keys[1])
  270. self.assertEqual(self.f.last()[0], keys[-1])
  271. self.assertEqual(self.f.previous()[0], keys[-2])
  272. self.assertEqual(list(self.f), keys)
  273. class TestBTree(TestBSDDB):
  274. fname = test_support.TESTFN
  275. openmethod = [bsddb.btopen]
  276. class TestBTree_InMemory(TestBSDDB):
  277. fname = None
  278. openmethod = [bsddb.btopen]
  279. # if we're using an in-memory only db, we can't reopen it
  280. test_close_and_reopen = None
  281. class TestBTree_InMemory_Truncate(TestBSDDB):
  282. fname = None
  283. openflag = 'n'
  284. openmethod = [bsddb.btopen]
  285. # if we're using an in-memory only db, we can't reopen it
  286. test_close_and_reopen = None
  287. class TestHashTable(TestBSDDB):
  288. fname = test_support.TESTFN
  289. openmethod = [bsddb.hashopen]
  290. # keyordering is specific to btopen method
  291. test_keyordering = None
  292. class TestHashTable_InMemory(TestBSDDB):
  293. fname = None
  294. openmethod = [bsddb.hashopen]
  295. # if we're using an in-memory only db, we can't reopen it
  296. test_close_and_reopen = None
  297. # keyordering is specific to btopen method
  298. test_keyordering = None
  299. ## # (bsddb.rnopen,'Record Numbers'), 'put' for RECNO for bsddb 1.85
  300. ## # appears broken... at least on
  301. ## # Solaris Intel - rmasse 1/97
  302. def test_main(verbose=None):
  303. test_support.run_unittest(
  304. TestBTree,
  305. TestHashTable,
  306. TestBTree_InMemory,
  307. TestHashTable_InMemory,
  308. TestBTree_InMemory_Truncate,
  309. )
  310. if __name__ == "__main__":
  311. test_main(verbose=True)