test_kqueue.py 7.3 KB


  1. """
  2. Tests for kqueue wrapper.
  3. """
  4. import socket
  5. import errno
  6. import time
  7. import select
  8. import sys
  9. import unittest
  10. from test import test_support
# Every test below calls select.kqueue / select.kevent.  When the select
# module does not provide kqueue, raise SkipTest at import time so the whole
# module is skipped (per the message: BSD only) rather than failing.
if not hasattr(select, "kqueue"):
    raise unittest.SkipTest("test works only on BSD")
  13. class TestKQueue(unittest.TestCase):
  14. def test_create_queue(self):
  15. kq = select.kqueue()
  16. self.assertTrue(kq.fileno() > 0, kq.fileno())
  17. self.assertTrue(not kq.closed)
  18. kq.close()
  19. self.assertTrue(kq.closed)
  20. self.assertRaises(ValueError, kq.fileno)
  21. def test_create_event(self):
  22. fd = sys.stderr.fileno()
  23. ev = select.kevent(fd)
  24. other = select.kevent(1000)
  25. self.assertEqual(ev.ident, fd)
  26. self.assertEqual(ev.filter, select.KQ_FILTER_READ)
  27. self.assertEqual(ev.flags, select.KQ_EV_ADD)
  28. self.assertEqual(ev.fflags, 0)
  29. self.assertEqual(ev.data, 0)
  30. self.assertEqual(ev.udata, 0)
  31. self.assertEqual(ev, ev)
  32. self.assertNotEqual(ev, other)
  33. self.assertEqual(cmp(ev, other), -1)
  34. self.assertTrue(ev < other)
  35. self.assertTrue(other >= ev)
  36. self.assertRaises(TypeError, cmp, ev, None)
  37. self.assertRaises(TypeError, cmp, ev, 1)
  38. self.assertRaises(TypeError, cmp, ev, "ev")
  39. ev = select.kevent(fd, select.KQ_FILTER_WRITE)
  40. self.assertEqual(ev.ident, fd)
  41. self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
  42. self.assertEqual(ev.flags, select.KQ_EV_ADD)
  43. self.assertEqual(ev.fflags, 0)
  44. self.assertEqual(ev.data, 0)
  45. self.assertEqual(ev.udata, 0)
  46. self.assertEqual(ev, ev)
  47. self.assertNotEqual(ev, other)
  48. ev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT)
  49. self.assertEqual(ev.ident, fd)
  50. self.assertEqual(ev.filter, select.KQ_FILTER_WRITE)
  51. self.assertEqual(ev.flags, select.KQ_EV_ONESHOT)
  52. self.assertEqual(ev.fflags, 0)
  53. self.assertEqual(ev.data, 0)
  54. self.assertEqual(ev.udata, 0)
  55. self.assertEqual(ev, ev)
  56. self.assertNotEqual(ev, other)
  57. ev = select.kevent(1, 2, 3, 4, 5, 6)
  58. self.assertEqual(ev.ident, 1)
  59. self.assertEqual(ev.filter, 2)
  60. self.assertEqual(ev.flags, 3)
  61. self.assertEqual(ev.fflags, 4)
  62. self.assertEqual(ev.data, 5)
  63. self.assertEqual(ev.udata, 6)
  64. self.assertEqual(ev, ev)
  65. self.assertNotEqual(ev, other)
  66. bignum = 0x7fff
  67. ev = select.kevent(bignum, 1, 2, 3, bignum - 1, bignum)
  68. self.assertEqual(ev.ident, bignum)
  69. self.assertEqual(ev.filter, 1)
  70. self.assertEqual(ev.flags, 2)
  71. self.assertEqual(ev.fflags, 3)
  72. self.assertEqual(ev.data, bignum - 1)
  73. self.assertEqual(ev.udata, bignum)
  74. self.assertEqual(ev, ev)
  75. self.assertNotEqual(ev, other)
  76. # Issue 11973
  77. bignum = 0xffff
  78. ev = select.kevent(0, 1, bignum)
  79. self.assertEqual(ev.ident, 0)
  80. self.assertEqual(ev.filter, 1)
  81. self.assertEqual(ev.flags, bignum)
  82. self.assertEqual(ev.fflags, 0)
  83. self.assertEqual(ev.data, 0)
  84. self.assertEqual(ev.udata, 0)
  85. self.assertEqual(ev, ev)
  86. self.assertNotEqual(ev, other)
  87. # Issue 11973
  88. bignum = 0xffffffff
  89. ev = select.kevent(0, 1, 2, bignum)
  90. self.assertEqual(ev.ident, 0)
  91. self.assertEqual(ev.filter, 1)
  92. self.assertEqual(ev.flags, 2)
  93. self.assertEqual(ev.fflags, bignum)
  94. self.assertEqual(ev.data, 0)
  95. self.assertEqual(ev.udata, 0)
  96. self.assertEqual(ev, ev)
  97. self.assertNotEqual(ev, other)
  98. def test_queue_event(self):
  99. serverSocket = socket.socket()
  100. serverSocket.bind(('127.0.0.1', 0))
  101. serverSocket.listen(1)
  102. client = socket.socket()
  103. client.setblocking(False)
  104. try:
  105. client.connect(('127.0.0.1', serverSocket.getsockname()[1]))
  106. except socket.error, e:
  107. self.assertEqual(e.args[0], errno.EINPROGRESS)
  108. else:
  109. #raise AssertionError("Connect should have raised EINPROGRESS")
  110. pass # FreeBSD doesn't raise an exception here
  111. server, addr = serverSocket.accept()
  112. kq = select.kqueue()
  113. kq2 = select.kqueue.fromfd(kq.fileno())
  114. ev = select.kevent(server.fileno(),
  115. select.KQ_FILTER_WRITE,
  116. select.KQ_EV_ADD | select.KQ_EV_ENABLE)
  117. kq.control([ev], 0)
  118. ev = select.kevent(server.fileno(),
  119. select.KQ_FILTER_READ,
  120. select.KQ_EV_ADD | select.KQ_EV_ENABLE)
  121. kq.control([ev], 0)
  122. ev = select.kevent(client.fileno(),
  123. select.KQ_FILTER_WRITE,
  124. select.KQ_EV_ADD | select.KQ_EV_ENABLE)
  125. kq2.control([ev], 0)
  126. ev = select.kevent(client.fileno(),
  127. select.KQ_FILTER_READ,
  128. select.KQ_EV_ADD | select.KQ_EV_ENABLE)
  129. kq2.control([ev], 0)
  130. events = kq.control(None, 4, 1)
  131. events = set((e.ident, e.filter) for e in events)
  132. self.assertEqual(events, set([
  133. (client.fileno(), select.KQ_FILTER_WRITE),
  134. (server.fileno(), select.KQ_FILTER_WRITE)]))
  135. client.send("Hello!")
  136. server.send("world!!!")
  137. # We may need to call it several times
  138. for i in range(10):
  139. events = kq.control(None, 4, 1)
  140. if len(events) == 4:
  141. break
  142. time.sleep(1.0)
  143. else:
  144. self.fail('timeout waiting for event notifications')
  145. events = set((e.ident, e.filter) for e in events)
  146. self.assertEqual(events, set([
  147. (client.fileno(), select.KQ_FILTER_WRITE),
  148. (client.fileno(), select.KQ_FILTER_READ),
  149. (server.fileno(), select.KQ_FILTER_WRITE),
  150. (server.fileno(), select.KQ_FILTER_READ)]))
  151. # Remove completely client, and server read part
  152. ev = select.kevent(client.fileno(),
  153. select.KQ_FILTER_WRITE,
  154. select.KQ_EV_DELETE)
  155. kq.control([ev], 0)
  156. ev = select.kevent(client.fileno(),
  157. select.KQ_FILTER_READ,
  158. select.KQ_EV_DELETE)
  159. kq.control([ev], 0)
  160. ev = select.kevent(server.fileno(),
  161. select.KQ_FILTER_READ,
  162. select.KQ_EV_DELETE)
  163. kq.control([ev], 0, 0)
  164. events = kq.control([], 4, 0.99)
  165. events = set((e.ident, e.filter) for e in events)
  166. self.assertEqual(events, set([
  167. (server.fileno(), select.KQ_FILTER_WRITE)]))
  168. client.close()
  169. server.close()
  170. serverSocket.close()
  171. def testPair(self):
  172. kq = select.kqueue()
  173. a, b = socket.socketpair()
  174. a.send(b'foo')
  175. event1 = select.kevent(a, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE)
  176. event2 = select.kevent(b, select.KQ_FILTER_READ, select.KQ_EV_ADD | select.KQ_EV_ENABLE)
  177. r = kq.control([event1, event2], 1, 1)
  178. self.assertTrue(r)
  179. self.assertFalse(r[0].flags & select.KQ_EV_ERROR)
  180. self.assertEqual(b.recv(r[0].data), b'foo')
  181. a.close()
  182. b.close()
  183. kq.close()
  184. def test_main():
  185. test_support.run_unittest(TestKQueue)
  186. if __name__ == "__main__":
  187. test_main()