123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219 |
- # Copyright (c) 2001-2006 Twisted Matrix Laboratories.
- #
- # Permission is hereby granted, free of charge, to any person obtaining
- # a copy of this software and associated documentation files (the
- # "Software"), to deal in the Software without restriction, including
- # without limitation the rights to use, copy, modify, merge, publish,
- # distribute, sublicense, and/or sell copies of the Software, and to
- # permit persons to whom the Software is furnished to do so, subject to
- # the following conditions:
- #
- # The above copyright notice and this permission notice shall be
- # included in all copies or substantial portions of the Software.
- #
- # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
- # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
- # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
- # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
- # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
- # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
- # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
- """
- Tests for epoll wrapper.
- """
- import socket
- import errno
- import time
- import select
- import unittest
- from test import test_support
# Skip the whole module when the platform's select module has no epoll
# wrapper at all.
if not hasattr(select, "epoll"):
    raise unittest.SkipTest("test works only on Linux 2.6")

# The wrapper can exist while the running kernel lacks the syscall; probe
# once and skip on ENOSYS.  Any other failure is unexpected and propagates.
try:
    _probe = select.epoll()
except IOError as e:
    if e.errno == errno.ENOSYS:
        raise unittest.SkipTest("kernel doesn't support epoll()")
    raise
else:
    # Release the probe's descriptor instead of leaving it to the garbage
    # collector.
    _probe.close()
    del _probe
class TestEPoll(unittest.TestCase):
    """Tests for ``select.epoll`` driven by a loopback TCP connection.

    Every socket opened by a test is recorded in ``self.connections`` so
    that ``tearDown`` can close it regardless of the test outcome.
    """

    def setUp(self):
        # Listening socket on an ephemeral loopback port.
        self.serverSocket = socket.socket()
        self.serverSocket.bind(('127.0.0.1', 0))
        self.serverSocket.listen(1)
        self.connections = [self.serverSocket]

    def tearDown(self):
        for skt in self.connections:
            skt.close()

    def _connected_pair(self):
        """Return a ``(client, server)`` pair of connected sockets.

        The client is non-blocking, so its ``connect()`` is expected to fail
        with ``EINPROGRESS``; the server side is obtained with ``accept()``
        on the listening socket.  Both sockets are registered for cleanup.
        """
        client = socket.socket()
        # Track the client before connecting so that it is closed by
        # tearDown even when one of the assertions below fails.
        self.connections.append(client)
        client.setblocking(False)
        try:
            client.connect(('127.0.0.1', self.serverSocket.getsockname()[1]))
        except socket.error as e:
            self.assertEqual(e.args[0], errno.EINPROGRESS)
        else:
            raise AssertionError("Connect should have raised EINPROGRESS")
        server, _ = self.serverSocket.accept()
        self.connections.append(server)
        return client, server

    def test_create(self):
        # The rest of this file expects epoll failures as IOError; catch
        # both spellings so a creation failure is reported as a test
        # failure rather than an error.
        try:
            ep = select.epoll(16)
        except (IOError, OSError) as e:
            raise AssertionError(str(e))
        self.assertTrue(ep.fileno() > 0, ep.fileno())
        self.assertTrue(not ep.closed)
        ep.close()
        self.assertTrue(ep.closed)
        self.assertRaises(ValueError, ep.fileno)

    def test_badcreate(self):
        self.assertRaises(TypeError, select.epoll, 1, 2, 3)
        self.assertRaises(TypeError, select.epoll, 'foo')
        self.assertRaises(TypeError, select.epoll, None)
        self.assertRaises(TypeError, select.epoll, ())
        self.assertRaises(TypeError, select.epoll, ['foo'])
        self.assertRaises(TypeError, select.epoll, {})

    def test_add(self):
        client, server = self._connected_pair()
        both = select.EPOLLIN | select.EPOLLOUT

        # adding by file descriptor number
        ep = select.epoll(2)
        try:
            ep.register(server.fileno(), both)
            ep.register(client.fileno(), both)
        finally:
            ep.close()

        # adding by object w/ fileno works, too.
        ep = select.epoll(2)
        try:
            ep.register(server, both)
            ep.register(client, both)
        finally:
            ep.close()

        ep = select.epoll(2)
        try:
            # TypeError: argument must be an int, or have a fileno() method.
            self.assertRaises(TypeError, ep.register, object(), both)
            self.assertRaises(TypeError, ep.register, None, both)
            # ValueError: file descriptor cannot be a negative integer (-1)
            self.assertRaises(ValueError, ep.register, -1, both)
            # IOError: [Errno 9] Bad file descriptor
            self.assertRaises(IOError, ep.register, 10000, both)
            # registering twice also raises an exception
            ep.register(server, both)
            self.assertRaises(IOError, ep.register, server, both)
        finally:
            ep.close()

    def test_fromfd(self):
        client, server = self._connected_pair()
        ep = select.epoll(2)
        # ep2 wraps the very same descriptor number as ep.
        ep2 = select.epoll.fromfd(ep.fileno())
        try:
            ep2.register(server.fileno(), select.EPOLLIN | select.EPOLLOUT)
            ep2.register(client.fileno(), select.EPOLLIN | select.EPOLLOUT)

            events = ep.poll(1, 4)
            events2 = ep2.poll(0.9, 4)
            self.assertEqual(len(events), 2)
            self.assertEqual(len(events2), 2)
        finally:
            # Closing ep is part of the scenario below, but it must also
            # happen when one of the assertions above fails.
            ep.close()

        # The shared descriptor is gone, so polling through ep2 must fail.
        try:
            ep2.poll(1, 4)
        except IOError as e:
            self.assertEqual(e.args[0], errno.EBADF, e)
        else:
            self.fail("epoll on closed fd didn't raise EBADF")

    def test_control_and_wait(self):
        client, server = self._connected_pair()
        edge_triggered = select.EPOLLIN | select.EPOLLOUT | select.EPOLLET

        ep = select.epoll(16)
        try:
            ep.register(server.fileno(), edge_triggered)
            ep.register(client.fileno(), edge_triggered)

            # Both sockets are writable right away: EPOLLOUT on each.
            now = time.time()
            events = ep.poll(1, 4)
            then = time.time()
            self.assertFalse(then - now > 0.1, then - now)

            events.sort()
            expected = [(client.fileno(), select.EPOLLOUT),
                        (server.fileno(), select.EPOLLOUT)]
            expected.sort()
            self.assertEqual(events, expected)

            # Edge-triggered registration: nothing changed, so no event.
            events = ep.poll(timeout=2.1, maxevents=4)
            self.assertFalse(events)

            # Traffic in both directions: EPOLLIN and EPOLLOUT on each.
            # Bytes literals keep send() valid where str is text.
            client.send(b"Hello!")
            server.send(b"world!!!")

            now = time.time()
            events = ep.poll(1, 4)
            then = time.time()
            self.assertFalse(then - now > 0.01, then - now)

            events.sort()
            expected = [(client.fileno(), select.EPOLLIN | select.EPOLLOUT),
                        (server.fileno(), select.EPOLLIN | select.EPOLLOUT)]
            expected.sort()
            self.assertEqual(events, expected)

            # Drop the client and watch the server for writability only.
            ep.unregister(client.fileno())
            ep.modify(server.fileno(), select.EPOLLOUT)
            now = time.time()
            events = ep.poll(1, 4)
            then = time.time()
            self.assertFalse(then - now > 0.01, then - now)

            expected = [(server.fileno(), select.EPOLLOUT)]
            self.assertEqual(events, expected)
        finally:
            ep.close()

    def test_errors(self):
        self.assertRaises(ValueError, select.epoll, -2)
        ep = select.epoll()
        try:
            self.assertRaises(ValueError, ep.register, -1, select.EPOLLIN)
        finally:
            ep.close()

    def test_unregister_closed(self):
        client, server = self._connected_pair()
        fd = server.fileno()
        ep = select.epoll(16)
        try:
            ep.register(server)

            now = time.time()
            ep.poll(1, 4)
            then = time.time()
            self.assertFalse(then - now > 0.01, then - now)

            server.close()
            # Unregistering an already-closed descriptor is silently
            # accepted by older interpreters; newer ones (documented change
            # in Python 3.9) report EBADF.  Any other failure is a bug.
            try:
                ep.unregister(fd)
            except (IOError, OSError) as e:
                self.assertEqual(e.errno, errno.EBADF, e)
        finally:
            ep.close()
def test_main():
    """Run the ``TestEPoll`` case through ``test_support.run_unittest``."""
    cases = (TestEPoll,)
    test_support.run_unittest(*cases)


if __name__ == "__main__":
    # Allow running this file directly as a script.
    test_main()
|