hooks.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. #-*- coding: ISO-8859-1 -*-
  2. # pysqlite2/test/hooks.py: tests for various SQLite-specific hooks
  3. #
  4. # Copyright (C) 2006-2007 Gerhard Häring <gh@ghaering.de>
  5. #
  6. # This file is part of pysqlite.
  7. #
  8. # This software is provided 'as-is', without any express or implied
  9. # warranty. In no event will the authors be held liable for any damages
  10. # arising from the use of this software.
  11. #
  12. # Permission is granted to anyone to use this software for any purpose,
  13. # including commercial applications, and to alter it and redistribute it
  14. # freely, subject to the following restrictions:
  15. #
  16. # 1. The origin of this software must not be misrepresented; you must not
  17. # claim that you wrote the original software. If you use this software
  18. # in a product, an acknowledgment in the product documentation would be
  19. # appreciated but is not required.
  20. # 2. Altered source versions must be plainly marked as such, and must not be
  21. # misrepresented as being the original software.
  22. # 3. This notice may not be removed or altered from any source distribution.
  23. import os, unittest
  24. import sqlite3 as sqlite
  25. class CollationTests(unittest.TestCase):
  26. def setUp(self):
  27. pass
  28. def tearDown(self):
  29. pass
  30. def CheckCreateCollationNotCallable(self):
  31. con = sqlite.connect(":memory:")
  32. try:
  33. con.create_collation("X", 42)
  34. self.fail("should have raised a TypeError")
  35. except TypeError, e:
  36. self.assertEqual(e.args[0], "parameter must be callable")
  37. def CheckCreateCollationNotAscii(self):
  38. con = sqlite.connect(":memory:")
  39. try:
  40. con.create_collation("collä", cmp)
  41. self.fail("should have raised a ProgrammingError")
  42. except sqlite.ProgrammingError, e:
  43. pass
  44. def CheckCollationIsUsed(self):
  45. if sqlite.version_info < (3, 2, 1): # old SQLite versions crash on this test
  46. return
  47. def mycoll(x, y):
  48. # reverse order
  49. return -cmp(x, y)
  50. con = sqlite.connect(":memory:")
  51. con.create_collation("mycoll", mycoll)
  52. sql = """
  53. select x from (
  54. select 'a' as x
  55. union
  56. select 'b' as x
  57. union
  58. select 'c' as x
  59. ) order by x collate mycoll
  60. """
  61. result = con.execute(sql).fetchall()
  62. if result[0][0] != "c" or result[1][0] != "b" or result[2][0] != "a":
  63. self.fail("the expected order was not returned")
  64. con.create_collation("mycoll", None)
  65. try:
  66. result = con.execute(sql).fetchall()
  67. self.fail("should have raised an OperationalError")
  68. except sqlite.OperationalError, e:
  69. self.assertEqual(e.args[0].lower(), "no such collation sequence: mycoll")
  70. def CheckCollationReturnsLargeInteger(self):
  71. def mycoll(x, y):
  72. # reverse order
  73. return -((x > y) - (x < y)) * 2**32
  74. con = sqlite.connect(":memory:")
  75. con.create_collation("mycoll", mycoll)
  76. sql = """
  77. select x from (
  78. select 'a' as x
  79. union
  80. select 'b' as x
  81. union
  82. select 'c' as x
  83. ) order by x collate mycoll
  84. """
  85. result = con.execute(sql).fetchall()
  86. self.assertEqual(result, [('c',), ('b',), ('a',)],
  87. msg="the expected order was not returned")
  88. def CheckCollationRegisterTwice(self):
  89. """
  90. Register two different collation functions under the same name.
  91. Verify that the last one is actually used.
  92. """
  93. con = sqlite.connect(":memory:")
  94. con.create_collation("mycoll", cmp)
  95. con.create_collation("mycoll", lambda x, y: -cmp(x, y))
  96. result = con.execute("""
  97. select x from (select 'a' as x union select 'b' as x) order by x collate mycoll
  98. """).fetchall()
  99. if result[0][0] != 'b' or result[1][0] != 'a':
  100. self.fail("wrong collation function is used")
  101. def CheckDeregisterCollation(self):
  102. """
  103. Register a collation, then deregister it. Make sure an error is raised if we try
  104. to use it.
  105. """
  106. con = sqlite.connect(":memory:")
  107. con.create_collation("mycoll", cmp)
  108. con.create_collation("mycoll", None)
  109. try:
  110. con.execute("select 'a' as x union select 'b' as x order by x collate mycoll")
  111. self.fail("should have raised an OperationalError")
  112. except sqlite.OperationalError, e:
  113. if not e.args[0].startswith("no such collation sequence"):
  114. self.fail("wrong OperationalError raised")
  115. class ProgressTests(unittest.TestCase):
  116. def CheckProgressHandlerUsed(self):
  117. """
  118. Test that the progress handler is invoked once it is set.
  119. """
  120. con = sqlite.connect(":memory:")
  121. progress_calls = []
  122. def progress():
  123. progress_calls.append(None)
  124. return 0
  125. con.set_progress_handler(progress, 1)
  126. con.execute("""
  127. create table foo(a, b)
  128. """)
  129. self.assertTrue(progress_calls)
  130. def CheckOpcodeCount(self):
  131. """
  132. Test that the opcode argument is respected.
  133. """
  134. con = sqlite.connect(":memory:")
  135. progress_calls = []
  136. def progress():
  137. progress_calls.append(None)
  138. return 0
  139. con.set_progress_handler(progress, 1)
  140. curs = con.cursor()
  141. curs.execute("""
  142. create table foo (a, b)
  143. """)
  144. first_count = len(progress_calls)
  145. progress_calls = []
  146. con.set_progress_handler(progress, 2)
  147. curs.execute("""
  148. create table bar (a, b)
  149. """)
  150. second_count = len(progress_calls)
  151. self.assertGreaterEqual(first_count, second_count)
  152. def CheckCancelOperation(self):
  153. """
  154. Test that returning a non-zero value stops the operation in progress.
  155. """
  156. con = sqlite.connect(":memory:")
  157. progress_calls = []
  158. def progress():
  159. progress_calls.append(None)
  160. return 1
  161. con.set_progress_handler(progress, 1)
  162. curs = con.cursor()
  163. self.assertRaises(
  164. sqlite.OperationalError,
  165. curs.execute,
  166. "create table bar (a, b)")
  167. def CheckClearHandler(self):
  168. """
  169. Test that setting the progress handler to None clears the previously set handler.
  170. """
  171. con = sqlite.connect(":memory:")
  172. action = []
  173. def progress():
  174. action.append(1)
  175. return 0
  176. con.set_progress_handler(progress, 1)
  177. con.set_progress_handler(None, 1)
  178. con.execute("select 1 union select 2 union select 3").fetchall()
  179. self.assertEqual(len(action), 0, "progress handler was not cleared")
  180. def suite():
  181. collation_suite = unittest.makeSuite(CollationTests, "Check")
  182. progress_suite = unittest.makeSuite(ProgressTests, "Check")
  183. return unittest.TestSuite((collation_suite, progress_suite))
  184. def test():
  185. runner = unittest.TextTestRunner()
  186. runner.run(suite())
  187. if __name__ == "__main__":
  188. test()