1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761777177817791780178117821783178417851786178717881789179017911792179317941795179617971798179918001801180218031804 |
- import unittest
- from test import test_support
- import errno
- import itertools
- import socket
- import select
- import time
- import traceback
- import Queue
- import sys
- import os
- import array
- import contextlib
- import signal
- import math
- import weakref
- try:
- import _socket
- except ImportError:
- _socket = None
- def try_address(host, port=0, family=socket.AF_INET):
- """Try to bind a socket on the given host:port and return True
- if that has been possible."""
- try:
- sock = socket.socket(family, socket.SOCK_STREAM)
- sock.bind((host, port))
- except (socket.error, socket.gaierror):
- return False
- else:
- sock.close()
- return True
- HOST = test_support.HOST
- MSG = b'Michael Gilfix was here\n'
- SUPPORTS_IPV6 = socket.has_ipv6 and try_address('::1', family=socket.AF_INET6)
- try:
- import thread
- import threading
- except ImportError:
- thread = None
- threading = None
- HOST = test_support.HOST
- MSG = 'Michael Gilfix was here\n'
- class SocketTCPTest(unittest.TestCase):
- def setUp(self):
- self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.port = test_support.bind_port(self.serv)
- self.serv.listen(1)
- def tearDown(self):
- self.serv.close()
- self.serv = None
- class SocketUDPTest(unittest.TestCase):
- def setUp(self):
- self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- self.port = test_support.bind_port(self.serv)
- def tearDown(self):
- self.serv.close()
- self.serv = None
- class ThreadableTest:
- """Threadable Test class
- The ThreadableTest class makes it easy to create a threaded
- client/server pair from an existing unit test. To create a
- new threaded class from an existing unit test, use multiple
- inheritance:
- class NewClass (OldClass, ThreadableTest):
- pass
- This class defines two new fixture functions with obvious
- purposes for overriding:
- clientSetUp ()
- clientTearDown ()
- Any new test functions within the class must then define
- tests in pairs, where the test name is preceded with a
- '_' to indicate the client portion of the test. Ex:
- def testFoo(self):
- # Server portion
- def _testFoo(self):
- # Client portion
- Any exceptions raised by the clients during their tests
- are caught and transferred to the main thread to alert
- the testing framework.
- Note, the server setup function cannot call any blocking
- functions that rely on the client thread during setup,
- unless serverExplicitReady() is called just before
- the blocking call (such as in setting up a client/server
- connection and performing the accept() in setUp().
- """
- def __init__(self):
- # Swap the true setup function
- self.__setUp = self.setUp
- self.__tearDown = self.tearDown
- self.setUp = self._setUp
- self.tearDown = self._tearDown
- def serverExplicitReady(self):
- """This method allows the server to explicitly indicate that
- it wants the client thread to proceed. This is useful if the
- server is about to execute a blocking routine that is
- dependent upon the client thread during its setup routine."""
- self.server_ready.set()
- def _setUp(self):
- self.server_ready = threading.Event()
- self.client_ready = threading.Event()
- self.done = threading.Event()
- self.queue = Queue.Queue(1)
- # Do some munging to start the client test.
- methodname = self.id()
- i = methodname.rfind('.')
- methodname = methodname[i+1:]
- test_method = getattr(self, '_' + methodname)
- self.client_thread = thread.start_new_thread(
- self.clientRun, (test_method,))
- self.__setUp()
- if not self.server_ready.is_set():
- self.server_ready.set()
- self.client_ready.wait()
- def _tearDown(self):
- self.__tearDown()
- self.done.wait()
- if not self.queue.empty():
- msg = self.queue.get()
- self.fail(msg)
- def clientRun(self, test_func):
- self.server_ready.wait()
- self.clientSetUp()
- self.client_ready.set()
- if not callable(test_func):
- raise TypeError("test_func must be a callable function.")
- try:
- test_func()
- except Exception, strerror:
- self.queue.put(strerror)
- self.clientTearDown()
- def clientSetUp(self):
- raise NotImplementedError("clientSetUp must be implemented.")
- def clientTearDown(self):
- self.done.set()
- thread.exit()
- class ThreadedTCPSocketTest(SocketTCPTest, ThreadableTest):
- def __init__(self, methodName='runTest'):
- SocketTCPTest.__init__(self, methodName=methodName)
- ThreadableTest.__init__(self)
- def clientSetUp(self):
- self.cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- def clientTearDown(self):
- self.cli.close()
- self.cli = None
- ThreadableTest.clientTearDown(self)
- class ThreadedUDPSocketTest(SocketUDPTest, ThreadableTest):
- def __init__(self, methodName='runTest'):
- SocketUDPTest.__init__(self, methodName=methodName)
- ThreadableTest.__init__(self)
- def clientSetUp(self):
- self.cli = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- def clientTearDown(self):
- self.cli.close()
- self.cli = None
- ThreadableTest.clientTearDown(self)
- class SocketConnectedTest(ThreadedTCPSocketTest):
- def __init__(self, methodName='runTest'):
- ThreadedTCPSocketTest.__init__(self, methodName=methodName)
- def setUp(self):
- ThreadedTCPSocketTest.setUp(self)
- # Indicate explicitly we're ready for the client thread to
- # proceed and then perform the blocking call to accept
- self.serverExplicitReady()
- conn, addr = self.serv.accept()
- self.cli_conn = conn
- def tearDown(self):
- self.cli_conn.close()
- self.cli_conn = None
- ThreadedTCPSocketTest.tearDown(self)
- def clientSetUp(self):
- ThreadedTCPSocketTest.clientSetUp(self)
- self.cli.connect((HOST, self.port))
- self.serv_conn = self.cli
- def clientTearDown(self):
- self.serv_conn.close()
- self.serv_conn = None
- ThreadedTCPSocketTest.clientTearDown(self)
- class SocketPairTest(unittest.TestCase, ThreadableTest):
- def __init__(self, methodName='runTest'):
- unittest.TestCase.__init__(self, methodName=methodName)
- ThreadableTest.__init__(self)
- def setUp(self):
- self.serv, self.cli = socket.socketpair()
- def tearDown(self):
- self.serv.close()
- self.serv = None
- def clientSetUp(self):
- pass
- def clientTearDown(self):
- self.cli.close()
- self.cli = None
- ThreadableTest.clientTearDown(self)
- #######################################################################
- ## Begin Tests
- class GeneralModuleTests(unittest.TestCase):
- @unittest.skipUnless(_socket is not None, 'need _socket module')
- def test_csocket_repr(self):
- s = _socket.socket(_socket.AF_INET, _socket.SOCK_STREAM)
- try:
- expected = ('<socket object, fd=%s, family=%s, type=%s, protocol=%s>'
- % (s.fileno(), s.family, s.type, s.proto))
- self.assertEqual(repr(s), expected)
- finally:
- s.close()
- expected = ('<socket object, fd=-1, family=%s, type=%s, protocol=%s>'
- % (s.family, s.type, s.proto))
- self.assertEqual(repr(s), expected)
- def test_weakref(self):
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- p = weakref.proxy(s)
- self.assertEqual(p.fileno(), s.fileno())
- s.close()
- s = None
- try:
- p.fileno()
- except ReferenceError:
- pass
- else:
- self.fail('Socket proxy still exists')
- def test_weakref__sock(self):
- s = socket.socket()._sock
- w = weakref.ref(s)
- self.assertIs(w(), s)
- del s
- test_support.gc_collect()
- self.assertIsNone(w())
- def testSocketError(self):
- # Testing socket module exceptions
- def raise_error(*args, **kwargs):
- raise socket.error
- def raise_herror(*args, **kwargs):
- raise socket.herror
- def raise_gaierror(*args, **kwargs):
- raise socket.gaierror
- self.assertRaises(socket.error, raise_error,
- "Error raising socket exception.")
- self.assertRaises(socket.error, raise_herror,
- "Error raising socket exception.")
- self.assertRaises(socket.error, raise_gaierror,
- "Error raising socket exception.")
- def testSendtoErrors(self):
- # Testing that sendto doens't masks failures. See #10169.
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- self.addCleanup(s.close)
- s.bind(('', 0))
- sockname = s.getsockname()
- # 2 args
- with self.assertRaises(UnicodeEncodeError):
- s.sendto(u'\u2620', sockname)
- with self.assertRaises(TypeError) as cm:
- s.sendto(5j, sockname)
- self.assertIn('not complex', str(cm.exception))
- with self.assertRaises(TypeError) as cm:
- s.sendto('foo', None)
- self.assertIn('not NoneType', str(cm.exception))
- # 3 args
- with self.assertRaises(UnicodeEncodeError):
- s.sendto(u'\u2620', 0, sockname)
- with self.assertRaises(TypeError) as cm:
- s.sendto(5j, 0, sockname)
- self.assertIn('not complex', str(cm.exception))
- with self.assertRaises(TypeError) as cm:
- s.sendto('foo', 0, None)
- self.assertIn('not NoneType', str(cm.exception))
- with self.assertRaises(TypeError) as cm:
- s.sendto('foo', 'bar', sockname)
- self.assertIn('an integer is required', str(cm.exception))
- with self.assertRaises(TypeError) as cm:
- s.sendto('foo', None, None)
- self.assertIn('an integer is required', str(cm.exception))
- # wrong number of args
- with self.assertRaises(TypeError) as cm:
- s.sendto('foo')
- self.assertIn('(1 given)', str(cm.exception))
- with self.assertRaises(TypeError) as cm:
- s.sendto('foo', 0, sockname, 4)
- self.assertIn('(4 given)', str(cm.exception))
- def testCrucialConstants(self):
- # Testing for mission critical constants
- socket.AF_INET
- socket.SOCK_STREAM
- socket.SOCK_DGRAM
- socket.SOCK_RAW
- socket.SOCK_RDM
- socket.SOCK_SEQPACKET
- socket.SOL_SOCKET
- socket.SO_REUSEADDR
- def testHostnameRes(self):
- # Testing hostname resolution mechanisms
- hostname = socket.gethostname()
- try:
- ip = socket.gethostbyname(hostname)
- except socket.error:
- # Probably name lookup wasn't set up right; skip this test
- self.skipTest('name lookup failure')
- self.assertTrue(ip.find('.') >= 0, "Error resolving host to ip.")
- try:
- hname, aliases, ipaddrs = socket.gethostbyaddr(ip)
- except socket.error:
- # Probably a similar problem as above; skip this test
- self.skipTest('address lookup failure')
- all_host_names = [hostname, hname] + aliases
- fqhn = socket.getfqdn(ip)
- if not fqhn in all_host_names:
- self.fail("Error testing host resolution mechanisms. (fqdn: %s, all: %s)" % (fqhn, repr(all_host_names)))
- @unittest.skipUnless(hasattr(sys, 'getrefcount'),
- 'test needs sys.getrefcount()')
- def testRefCountGetNameInfo(self):
- # Testing reference count for getnameinfo
- try:
- # On some versions, this loses a reference
- orig = sys.getrefcount(__name__)
- socket.getnameinfo(__name__,0)
- except TypeError:
- self.assertEqual(sys.getrefcount(__name__), orig,
- "socket.getnameinfo loses a reference")
- def testInterpreterCrash(self):
- # Making sure getnameinfo doesn't crash the interpreter
- try:
- # On some versions, this crashes the interpreter.
- socket.getnameinfo(('x', 0, 0, 0), 0)
- except socket.error:
- pass
- def testNtoH(self):
- # This just checks that htons etc. are their own inverse,
- # when looking at the lower 16 or 32 bits.
- sizes = {socket.htonl: 32, socket.ntohl: 32,
- socket.htons: 16, socket.ntohs: 16}
- for func, size in sizes.items():
- mask = (1L<<size) - 1
- for i in (0, 1, 0xffff, ~0xffff, 2, 0x01234567, 0x76543210):
- self.assertEqual(i & mask, func(func(i&mask)) & mask)
- swapped = func(mask)
- self.assertEqual(swapped & mask, mask)
- self.assertRaises(OverflowError, func, 1L<<34)
- def testNtoHErrors(self):
- good_values = [ 1, 2, 3, 1L, 2L, 3L ]
- bad_values = [ -1, -2, -3, -1L, -2L, -3L ]
- for k in good_values:
- socket.ntohl(k)
- socket.ntohs(k)
- socket.htonl(k)
- socket.htons(k)
- for k in bad_values:
- self.assertRaises(OverflowError, socket.ntohl, k)
- self.assertRaises(OverflowError, socket.ntohs, k)
- self.assertRaises(OverflowError, socket.htonl, k)
- self.assertRaises(OverflowError, socket.htons, k)
- def testGetServBy(self):
- eq = self.assertEqual
- # Find one service that exists, then check all the related interfaces.
- # I've ordered this by protocols that have both a tcp and udp
- # protocol, at least for modern Linuxes.
- if (sys.platform.startswith('linux') or
- sys.platform.startswith('freebsd') or
- sys.platform.startswith('netbsd') or
- sys.platform == 'darwin'):
- # avoid the 'echo' service on this platform, as there is an
- # assumption breaking non-standard port/protocol entry
- services = ('daytime', 'qotd', 'domain')
- else:
- services = ('echo', 'daytime', 'domain')
- for service in services:
- try:
- port = socket.getservbyname(service, 'tcp')
- break
- except socket.error:
- pass
- else:
- raise socket.error
- # Try same call with optional protocol omitted
- port2 = socket.getservbyname(service)
- eq(port, port2)
- # Try udp, but don't barf if it doesn't exist
- try:
- udpport = socket.getservbyname(service, 'udp')
- except socket.error:
- udpport = None
- else:
- eq(udpport, port)
- # Now make sure the lookup by port returns the same service name
- eq(socket.getservbyport(port2), service)
- eq(socket.getservbyport(port, 'tcp'), service)
- if udpport is not None:
- eq(socket.getservbyport(udpport, 'udp'), service)
- # Make sure getservbyport does not accept out of range ports.
- self.assertRaises(OverflowError, socket.getservbyport, -1)
- self.assertRaises(OverflowError, socket.getservbyport, 65536)
- def testDefaultTimeout(self):
- # Testing default timeout
- # The default timeout should initially be None
- self.assertEqual(socket.getdefaulttimeout(), None)
- s = socket.socket()
- self.assertEqual(s.gettimeout(), None)
- s.close()
- # Set the default timeout to 10, and see if it propagates
- socket.setdefaulttimeout(10)
- self.assertEqual(socket.getdefaulttimeout(), 10)
- s = socket.socket()
- self.assertEqual(s.gettimeout(), 10)
- s.close()
- # Reset the default timeout to None, and see if it propagates
- socket.setdefaulttimeout(None)
- self.assertEqual(socket.getdefaulttimeout(), None)
- s = socket.socket()
- self.assertEqual(s.gettimeout(), None)
- s.close()
- # Check that setting it to an invalid value raises ValueError
- self.assertRaises(ValueError, socket.setdefaulttimeout, -1)
- # Check that setting it to an invalid type raises TypeError
- self.assertRaises(TypeError, socket.setdefaulttimeout, "spam")
- @unittest.skipUnless(hasattr(socket, 'inet_aton'),
- 'test needs socket.inet_aton()')
- def testIPv4_inet_aton_fourbytes(self):
- # Test that issue1008086 and issue767150 are fixed.
- # It must return 4 bytes.
- self.assertEqual('\x00'*4, socket.inet_aton('0.0.0.0'))
- self.assertEqual('\xff'*4, socket.inet_aton('255.255.255.255'))
- @unittest.skipUnless(hasattr(socket, 'inet_pton'),
- 'test needs socket.inet_pton()')
- def testIPv4toString(self):
- from socket import inet_aton as f, inet_pton, AF_INET
- g = lambda a: inet_pton(AF_INET, a)
- self.assertEqual('\x00\x00\x00\x00', f('0.0.0.0'))
- self.assertEqual('\xff\x00\xff\x00', f('255.0.255.0'))
- self.assertEqual('\xaa\xaa\xaa\xaa', f('170.170.170.170'))
- self.assertEqual('\x01\x02\x03\x04', f('1.2.3.4'))
- self.assertEqual('\xff\xff\xff\xff', f('255.255.255.255'))
- self.assertEqual('\x00\x00\x00\x00', g('0.0.0.0'))
- self.assertEqual('\xff\x00\xff\x00', g('255.0.255.0'))
- self.assertEqual('\xaa\xaa\xaa\xaa', g('170.170.170.170'))
- self.assertEqual('\xff\xff\xff\xff', g('255.255.255.255'))
- @unittest.skipUnless(hasattr(socket, 'inet_pton'),
- 'test needs socket.inet_pton()')
- def testIPv6toString(self):
- try:
- from socket import inet_pton, AF_INET6, has_ipv6
- if not has_ipv6:
- self.skipTest('IPv6 not available')
- except ImportError:
- self.skipTest('could not import needed symbols from socket')
- f = lambda a: inet_pton(AF_INET6, a)
- self.assertEqual('\x00' * 16, f('::'))
- self.assertEqual('\x00' * 16, f('0::0'))
- self.assertEqual('\x00\x01' + '\x00' * 14, f('1::'))
- self.assertEqual(
- '\x45\xef\x76\xcb\x00\x1a\x56\xef\xaf\xeb\x0b\xac\x19\x24\xae\xae',
- f('45ef:76cb:1a:56ef:afeb:bac:1924:aeae')
- )
- @unittest.skipUnless(hasattr(socket, 'inet_ntop'),
- 'test needs socket.inet_ntop()')
- def testStringToIPv4(self):
- from socket import inet_ntoa as f, inet_ntop, AF_INET
- g = lambda a: inet_ntop(AF_INET, a)
- self.assertEqual('1.0.1.0', f('\x01\x00\x01\x00'))
- self.assertEqual('170.85.170.85', f('\xaa\x55\xaa\x55'))
- self.assertEqual('255.255.255.255', f('\xff\xff\xff\xff'))
- self.assertEqual('1.2.3.4', f('\x01\x02\x03\x04'))
- self.assertEqual('1.0.1.0', g('\x01\x00\x01\x00'))
- self.assertEqual('170.85.170.85', g('\xaa\x55\xaa\x55'))
- self.assertEqual('255.255.255.255', g('\xff\xff\xff\xff'))
- @unittest.skipUnless(hasattr(socket, 'inet_ntop'),
- 'test needs socket.inet_ntop()')
- def testStringToIPv6(self):
- try:
- from socket import inet_ntop, AF_INET6, has_ipv6
- if not has_ipv6:
- self.skipTest('IPv6 not available')
- except ImportError:
- self.skipTest('could not import needed symbols from socket')
- f = lambda a: inet_ntop(AF_INET6, a)
- self.assertEqual('::', f('\x00' * 16))
- self.assertEqual('::1', f('\x00' * 15 + '\x01'))
- self.assertEqual(
- 'aef:b01:506:1001:ffff:9997:55:170',
- f('\x0a\xef\x0b\x01\x05\x06\x10\x01\xff\xff\x99\x97\x00\x55\x01\x70')
- )
- # XXX The following don't test module-level functionality...
- def _get_unused_port(self, bind_address='0.0.0.0'):
- """Use a temporary socket to elicit an unused ephemeral port.
- Args:
- bind_address: Hostname or IP address to search for a port on.
- Returns: A most likely to be unused port.
- """
- tempsock = socket.socket()
- tempsock.bind((bind_address, 0))
- host, port = tempsock.getsockname()
- tempsock.close()
- return port
- def testSockName(self):
- # Testing getsockname()
- port = self._get_unused_port()
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.addCleanup(sock.close)
- sock.bind(("0.0.0.0", port))
- name = sock.getsockname()
- # XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
- # it reasonable to get the host's addr in addition to 0.0.0.0.
- # At least for eCos. This is required for the S/390 to pass.
- try:
- my_ip_addr = socket.gethostbyname(socket.gethostname())
- except socket.error:
- # Probably name lookup wasn't set up right; skip this test
- self.skipTest('name lookup failure')
- self.assertIn(name[0], ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
- self.assertEqual(name[1], port)
- def testGetSockOpt(self):
- # Testing getsockopt()
- # We know a socket should start without reuse==0
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.addCleanup(sock.close)
- reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
- self.assertFalse(reuse != 0, "initial mode is reuse")
- def testSetSockOpt(self):
- # Testing setsockopt()
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.addCleanup(sock.close)
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- reuse = sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR)
- self.assertFalse(reuse == 0, "failed to set reuse mode")
- def testSendAfterClose(self):
- # testing send() after close() with timeout
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.settimeout(1)
- sock.close()
- self.assertRaises(socket.error, sock.send, "spam")
- def testNewAttributes(self):
- # testing .family, .type and .protocol
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.assertEqual(sock.family, socket.AF_INET)
- self.assertEqual(sock.type, socket.SOCK_STREAM)
- self.assertEqual(sock.proto, 0)
- sock.close()
- def test_getsockaddrarg(self):
- sock = socket.socket()
- self.addCleanup(sock.close)
- port = test_support.find_unused_port()
- big_port = port + 65536
- neg_port = port - 65536
- self.assertRaises(OverflowError, sock.bind, (HOST, big_port))
- self.assertRaises(OverflowError, sock.bind, (HOST, neg_port))
- # Since find_unused_port() is inherently subject to race conditions, we
- # call it a couple times if necessary.
- for i in itertools.count():
- port = test_support.find_unused_port()
- try:
- sock.bind((HOST, port))
- except OSError as e:
- if e.errno != errno.EADDRINUSE or i == 5:
- raise
- else:
- break
- @unittest.skipUnless(os.name == "nt", "Windows specific")
- def test_sock_ioctl(self):
- self.assertTrue(hasattr(socket.socket, 'ioctl'))
- self.assertTrue(hasattr(socket, 'SIO_RCVALL'))
- self.assertTrue(hasattr(socket, 'RCVALL_ON'))
- self.assertTrue(hasattr(socket, 'RCVALL_OFF'))
- self.assertTrue(hasattr(socket, 'SIO_KEEPALIVE_VALS'))
- s = socket.socket()
- self.addCleanup(s.close)
- self.assertRaises(ValueError, s.ioctl, -1, None)
- s.ioctl(socket.SIO_KEEPALIVE_VALS, (1, 100, 100))
- def testGetaddrinfo(self):
- try:
- socket.getaddrinfo('localhost', 80)
- except socket.gaierror as err:
- if err.errno == socket.EAI_SERVICE:
- # see http://bugs.python.org/issue1282647
- self.skipTest("buggy libc version")
- raise
- # len of every sequence is supposed to be == 5
- for info in socket.getaddrinfo(HOST, None):
- self.assertEqual(len(info), 5)
- # host can be a domain name, a string representation of an
- # IPv4/v6 address or None
- socket.getaddrinfo('localhost', 80)
- socket.getaddrinfo('127.0.0.1', 80)
- socket.getaddrinfo(None, 80)
- if SUPPORTS_IPV6:
- socket.getaddrinfo('::1', 80)
- # port can be a string service name such as "http", a numeric
- # port number (int or long), or None
- socket.getaddrinfo(HOST, "http")
- socket.getaddrinfo(HOST, 80)
- socket.getaddrinfo(HOST, 80L)
- socket.getaddrinfo(HOST, None)
- # test family and socktype filters
- infos = socket.getaddrinfo(HOST, None, socket.AF_INET)
- for family, _, _, _, _ in infos:
- self.assertEqual(family, socket.AF_INET)
- infos = socket.getaddrinfo(HOST, None, 0, socket.SOCK_STREAM)
- for _, socktype, _, _, _ in infos:
- self.assertEqual(socktype, socket.SOCK_STREAM)
- # test proto and flags arguments
- socket.getaddrinfo(HOST, None, 0, 0, socket.SOL_TCP)
- socket.getaddrinfo(HOST, None, 0, 0, 0, socket.AI_PASSIVE)
- # a server willing to support both IPv4 and IPv6 will
- # usually do this
- socket.getaddrinfo(None, 0, socket.AF_UNSPEC, socket.SOCK_STREAM, 0,
- socket.AI_PASSIVE)
- # Issue 17269: test workaround for OS X platform bug segfault
- if hasattr(socket, 'AI_NUMERICSERV'):
- try:
- # The arguments here are undefined and the call may succeed
- # or fail. All we care here is that it doesn't segfault.
- socket.getaddrinfo("localhost", None, 0, 0, 0,
- socket.AI_NUMERICSERV)
- except socket.gaierror:
- pass
- def check_sendall_interrupted(self, with_timeout):
- # socketpair() is not strictly required, but it makes things easier.
- if not hasattr(signal, 'alarm') or not hasattr(socket, 'socketpair'):
- self.skipTest("signal.alarm and socket.socketpair required for this test")
- # Our signal handlers clobber the C errno by calling a math function
- # with an invalid domain value.
- def ok_handler(*args):
- self.assertRaises(ValueError, math.acosh, 0)
- def raising_handler(*args):
- self.assertRaises(ValueError, math.acosh, 0)
- 1 // 0
- c, s = socket.socketpair()
- old_alarm = signal.signal(signal.SIGALRM, raising_handler)
- try:
- if with_timeout:
- # Just above the one second minimum for signal.alarm
- c.settimeout(1.5)
- with self.assertRaises(ZeroDivisionError):
- signal.alarm(1)
- c.sendall(b"x" * test_support.SOCK_MAX_SIZE)
- if with_timeout:
- signal.signal(signal.SIGALRM, ok_handler)
- signal.alarm(1)
- self.assertRaises(socket.timeout, c.sendall,
- b"x" * test_support.SOCK_MAX_SIZE)
- finally:
- signal.signal(signal.SIGALRM, old_alarm)
- c.close()
- s.close()
- def test_sendall_interrupted(self):
- self.check_sendall_interrupted(False)
- def test_sendall_interrupted_with_timeout(self):
- self.check_sendall_interrupted(True)
- def test_listen_backlog(self):
- for backlog in 0, -1:
- srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- srv.bind((HOST, 0))
- srv.listen(backlog)
- srv.close()
- @test_support.cpython_only
- def test_listen_backlog_overflow(self):
- # Issue 15989
- import _testcapi
- srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- srv.bind((HOST, 0))
- self.assertRaises(OverflowError, srv.listen, _testcapi.INT_MAX + 1)
- srv.close()
- @unittest.skipUnless(SUPPORTS_IPV6, 'IPv6 required for this test.')
- def test_flowinfo(self):
- self.assertRaises(OverflowError, socket.getnameinfo,
- ('::1',0, 0xffffffff), 0)
- s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
- try:
- self.assertRaises(OverflowError, s.bind, ('::1', 0, -10))
- finally:
- s.close()
- @unittest.skipUnless(thread, 'Threading required for this test.')
- class BasicTCPTest(SocketConnectedTest):
- def __init__(self, methodName='runTest'):
- SocketConnectedTest.__init__(self, methodName=methodName)
- def testRecv(self):
- # Testing large receive over TCP
- msg = self.cli_conn.recv(1024)
- self.assertEqual(msg, MSG)
- def _testRecv(self):
- self.serv_conn.send(MSG)
- def testOverFlowRecv(self):
- # Testing receive in chunks over TCP
- seg1 = self.cli_conn.recv(len(MSG) - 3)
- seg2 = self.cli_conn.recv(1024)
- msg = seg1 + seg2
- self.assertEqual(msg, MSG)
- def _testOverFlowRecv(self):
- self.serv_conn.send(MSG)
- def testRecvFrom(self):
- # Testing large recvfrom() over TCP
- msg, addr = self.cli_conn.recvfrom(1024)
- self.assertEqual(msg, MSG)
- def _testRecvFrom(self):
- self.serv_conn.send(MSG)
- def testOverFlowRecvFrom(self):
- # Testing recvfrom() in chunks over TCP
- seg1, addr = self.cli_conn.recvfrom(len(MSG)-3)
- seg2, addr = self.cli_conn.recvfrom(1024)
- msg = seg1 + seg2
- self.assertEqual(msg, MSG)
- def _testOverFlowRecvFrom(self):
- self.serv_conn.send(MSG)
- def testSendAll(self):
- # Testing sendall() with a 2048 byte string over TCP
- msg = ''
- while 1:
- read = self.cli_conn.recv(1024)
- if not read:
- break
- msg += read
- self.assertEqual(msg, 'f' * 2048)
- def _testSendAll(self):
- big_chunk = 'f' * 2048
- self.serv_conn.sendall(big_chunk)
- @unittest.skipUnless(hasattr(socket, 'fromfd'),
- 'socket.fromfd not available')
- def testFromFd(self):
- # Testing fromfd()
- fd = self.cli_conn.fileno()
- sock = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
- self.addCleanup(sock.close)
- msg = sock.recv(1024)
- self.assertEqual(msg, MSG)
- def _testFromFd(self):
- self.serv_conn.send(MSG)
- def testDup(self):
- # Testing dup()
- sock = self.cli_conn.dup()
- self.addCleanup(sock.close)
- msg = sock.recv(1024)
- self.assertEqual(msg, MSG)
- def _testDup(self):
- self.serv_conn.send(MSG)
- def testShutdown(self):
- # Testing shutdown()
- msg = self.cli_conn.recv(1024)
- self.assertEqual(msg, MSG)
- # wait for _testShutdown to finish: on OS X, when the server
- # closes the connection the client also becomes disconnected,
- # and the client's shutdown call will fail. (Issue #4397.)
- self.done.wait()
- def _testShutdown(self):
- self.serv_conn.send(MSG)
- self.serv_conn.shutdown(2)
- testShutdown_overflow = test_support.cpython_only(testShutdown)
- @test_support.cpython_only
- def _testShutdown_overflow(self):
- import _testcapi
- self.serv_conn.send(MSG)
- # Issue 15989
- self.assertRaises(OverflowError, self.serv_conn.shutdown,
- _testcapi.INT_MAX + 1)
- self.assertRaises(OverflowError, self.serv_conn.shutdown,
- 2 + (_testcapi.UINT_MAX + 1))
- self.serv_conn.shutdown(2)
- @unittest.skipUnless(thread, 'Threading required for this test.')
- class BasicUDPTest(ThreadedUDPSocketTest):
- def __init__(self, methodName='runTest'):
- ThreadedUDPSocketTest.__init__(self, methodName=methodName)
- def testSendtoAndRecv(self):
- # Testing sendto() and Recv() over UDP
- msg = self.serv.recv(len(MSG))
- self.assertEqual(msg, MSG)
- def _testSendtoAndRecv(self):
- self.cli.sendto(MSG, 0, (HOST, self.port))
- def testRecvFrom(self):
- # Testing recvfrom() over UDP
- msg, addr = self.serv.recvfrom(len(MSG))
- self.assertEqual(msg, MSG)
- def _testRecvFrom(self):
- self.cli.sendto(MSG, 0, (HOST, self.port))
- def testRecvFromNegative(self):
- # Negative lengths passed to recvfrom should give ValueError.
- self.assertRaises(ValueError, self.serv.recvfrom, -1)
- def _testRecvFromNegative(self):
- self.cli.sendto(MSG, 0, (HOST, self.port))
- @unittest.skipUnless(thread, 'Threading required for this test.')
- class TCPCloserTest(ThreadedTCPSocketTest):
- def testClose(self):
- conn, addr = self.serv.accept()
- conn.close()
- sd = self.cli
- read, write, err = select.select([sd], [], [], 1.0)
- self.assertEqual(read, [sd])
- self.assertEqual(sd.recv(1), '')
- def _testClose(self):
- self.cli.connect((HOST, self.port))
- time.sleep(1.0)
- @unittest.skipUnless(hasattr(socket, 'socketpair'),
- 'test needs socket.socketpair()')
- @unittest.skipUnless(thread, 'Threading required for this test.')
- class BasicSocketPairTest(SocketPairTest):
- def __init__(self, methodName='runTest'):
- SocketPairTest.__init__(self, methodName=methodName)
- def testRecv(self):
- msg = self.serv.recv(1024)
- self.assertEqual(msg, MSG)
- def _testRecv(self):
- self.cli.send(MSG)
- def testSend(self):
- self.serv.send(MSG)
- def _testSend(self):
- msg = self.cli.recv(1024)
- self.assertEqual(msg, MSG)
- @unittest.skipUnless(thread, 'Threading required for this test.')
- class NonBlockingTCPTests(ThreadedTCPSocketTest):
- def __init__(self, methodName='runTest'):
- ThreadedTCPSocketTest.__init__(self, methodName=methodName)
- def testSetBlocking(self):
- # Testing whether set blocking works
- self.serv.setblocking(True)
- self.assertIsNone(self.serv.gettimeout())
- self.serv.setblocking(False)
- self.assertEqual(self.serv.gettimeout(), 0.0)
- start = time.time()
- try:
- self.serv.accept()
- except socket.error:
- pass
- end = time.time()
- self.assertTrue((end - start) < 1.0, "Error setting non-blocking mode.")
- def _testSetBlocking(self):
- pass
- @test_support.cpython_only
- def testSetBlocking_overflow(self):
- # Issue 15989
- import _testcapi
- if _testcapi.UINT_MAX >= _testcapi.ULONG_MAX:
- self.skipTest('needs UINT_MAX < ULONG_MAX')
- self.serv.setblocking(False)
- self.assertEqual(self.serv.gettimeout(), 0.0)
- self.serv.setblocking(_testcapi.UINT_MAX + 1)
- self.assertIsNone(self.serv.gettimeout())
- _testSetBlocking_overflow = test_support.cpython_only(_testSetBlocking)
- def testAccept(self):
- # Testing non-blocking accept
- self.serv.setblocking(0)
- try:
- conn, addr = self.serv.accept()
- except socket.error:
- pass
- else:
- self.fail("Error trying to do non-blocking accept.")
- read, write, err = select.select([self.serv], [], [])
- if self.serv in read:
- conn, addr = self.serv.accept()
- conn.close()
- else:
- self.fail("Error trying to do accept after select.")
- def _testAccept(self):
- time.sleep(0.1)
- self.cli.connect((HOST, self.port))
- def testConnect(self):
- # Testing non-blocking connect
- conn, addr = self.serv.accept()
- conn.close()
- def _testConnect(self):
- self.cli.settimeout(10)
- self.cli.connect((HOST, self.port))
- def testRecv(self):
- # Testing non-blocking recv
- conn, addr = self.serv.accept()
- conn.setblocking(0)
- try:
- msg = conn.recv(len(MSG))
- except socket.error:
- pass
- else:
- self.fail("Error trying to do non-blocking recv.")
- read, write, err = select.select([conn], [], [])
- if conn in read:
- msg = conn.recv(len(MSG))
- conn.close()
- self.assertEqual(msg, MSG)
- else:
- self.fail("Error during select call to non-blocking socket.")
- def _testRecv(self):
- self.cli.connect((HOST, self.port))
- time.sleep(0.1)
- self.cli.send(MSG)
- @unittest.skipUnless(thread, 'Threading required for this test.')
- class FileObjectClassTestCase(SocketConnectedTest):
- bufsize = -1 # Use default buffer size
- def __init__(self, methodName='runTest'):
- SocketConnectedTest.__init__(self, methodName=methodName)
- def setUp(self):
- SocketConnectedTest.setUp(self)
- self.serv_file = self.cli_conn.makefile('rb', self.bufsize)
- def tearDown(self):
- self.serv_file.close()
- self.assertTrue(self.serv_file.closed)
- SocketConnectedTest.tearDown(self)
- self.serv_file = None
- def clientSetUp(self):
- SocketConnectedTest.clientSetUp(self)
- self.cli_file = self.serv_conn.makefile('wb')
- def clientTearDown(self):
- self.cli_file.close()
- self.assertTrue(self.cli_file.closed)
- self.cli_file = None
- SocketConnectedTest.clientTearDown(self)
- def testSmallRead(self):
- # Performing small file read test
- first_seg = self.serv_file.read(len(MSG)-3)
- second_seg = self.serv_file.read(3)
- msg = first_seg + second_seg
- self.assertEqual(msg, MSG)
- def _testSmallRead(self):
- self.cli_file.write(MSG)
- self.cli_file.flush()
- def testFullRead(self):
- # read until EOF
- msg = self.serv_file.read()
- self.assertEqual(msg, MSG)
- def _testFullRead(self):
- self.cli_file.write(MSG)
- self.cli_file.close()
- def testUnbufferedRead(self):
- # Performing unbuffered file read test
- buf = ''
- while 1:
- char = self.serv_file.read(1)
- if not char:
- break
- buf += char
- self.assertEqual(buf, MSG)
- def _testUnbufferedRead(self):
- self.cli_file.write(MSG)
- self.cli_file.flush()
- def testReadline(self):
- # Performing file readline test
- line = self.serv_file.readline()
- self.assertEqual(line, MSG)
- def _testReadline(self):
- self.cli_file.write(MSG)
- self.cli_file.flush()
- def testReadlineAfterRead(self):
- a_baloo_is = self.serv_file.read(len("A baloo is"))
- self.assertEqual("A baloo is", a_baloo_is)
- _a_bear = self.serv_file.read(len(" a bear"))
- self.assertEqual(" a bear", _a_bear)
- line = self.serv_file.readline()
- self.assertEqual("\n", line)
- line = self.serv_file.readline()
- self.assertEqual("A BALOO IS A BEAR.\n", line)
- line = self.serv_file.readline()
- self.assertEqual(MSG, line)
- def _testReadlineAfterRead(self):
- self.cli_file.write("A baloo is a bear\n")
- self.cli_file.write("A BALOO IS A BEAR.\n")
- self.cli_file.write(MSG)
- self.cli_file.flush()
- def testReadlineAfterReadNoNewline(self):
- end_of_ = self.serv_file.read(len("End Of "))
- self.assertEqual("End Of ", end_of_)
- line = self.serv_file.readline()
- self.assertEqual("Line", line)
- def _testReadlineAfterReadNoNewline(self):
- self.cli_file.write("End Of Line")
- def testClosedAttr(self):
- self.assertTrue(not self.serv_file.closed)
- def _testClosedAttr(self):
- self.assertTrue(not self.cli_file.closed)
- class FileObjectInterruptedTestCase(unittest.TestCase):
- """Test that the file object correctly handles EINTR internally."""
- class MockSocket(object):
- def __init__(self, recv_funcs=()):
- # A generator that returns callables that we'll call for each
- # call to recv().
- self._recv_step = iter(recv_funcs)
- def recv(self, size):
- return self._recv_step.next()()
- @staticmethod
- def _raise_eintr():
- raise socket.error(errno.EINTR)
- def _test_readline(self, size=-1, **kwargs):
- mock_sock = self.MockSocket(recv_funcs=[
- lambda : "This is the first line\nAnd the sec",
- self._raise_eintr,
- lambda : "ond line is here\n",
- lambda : "",
- ])
- fo = socket._fileobject(mock_sock, **kwargs)
- self.assertEqual(fo.readline(size), "This is the first line\n")
- self.assertEqual(fo.readline(size), "And the second line is here\n")
- def _test_read(self, size=-1, **kwargs):
- mock_sock = self.MockSocket(recv_funcs=[
- lambda : "This is the first line\nAnd the sec",
- self._raise_eintr,
- lambda : "ond line is here\n",
- lambda : "",
- ])
- fo = socket._fileobject(mock_sock, **kwargs)
- self.assertEqual(fo.read(size), "This is the first line\n"
- "And the second line is here\n")
- def test_default(self):
- self._test_readline()
- self._test_readline(size=100)
- self._test_read()
- self._test_read(size=100)
- def test_with_1k_buffer(self):
- self._test_readline(bufsize=1024)
- self._test_readline(size=100, bufsize=1024)
- self._test_read(bufsize=1024)
- self._test_read(size=100, bufsize=1024)
- def _test_readline_no_buffer(self, size=-1):
- mock_sock = self.MockSocket(recv_funcs=[
- lambda : "aa",
- lambda : "\n",
- lambda : "BB",
- self._raise_eintr,
- lambda : "bb",
- lambda : "",
- ])
- fo = socket._fileobject(mock_sock, bufsize=0)
- self.assertEqual(fo.readline(size), "aa\n")
- self.assertEqual(fo.readline(size), "BBbb")
- def test_no_buffer(self):
- self._test_readline_no_buffer()
- self._test_readline_no_buffer(size=4)
- self._test_read(bufsize=0)
- self._test_read(size=100, bufsize=0)
- class UnbufferedFileObjectClassTestCase(FileObjectClassTestCase):
- """Repeat the tests from FileObjectClassTestCase with bufsize==0.
- In this case (and in this case only), it should be possible to
- create a file object, read a line from it, create another file
- object, read another line from it, without loss of data in the
- first file object's buffer. Note that httplib relies on this
- when reading multiple requests from the same socket."""
- bufsize = 0 # Use unbuffered mode
- def testUnbufferedReadline(self):
- # Read a line, create a new file object, read another line with it
- line = self.serv_file.readline() # first line
- self.assertEqual(line, "A. " + MSG) # first line
- self.serv_file = self.cli_conn.makefile('rb', 0)
- line = self.serv_file.readline() # second line
- self.assertEqual(line, "B. " + MSG) # second line
- def _testUnbufferedReadline(self):
- self.cli_file.write("A. " + MSG)
- self.cli_file.write("B. " + MSG)
- self.cli_file.flush()
- class LineBufferedFileObjectClassTestCase(FileObjectClassTestCase):
- bufsize = 1 # Default-buffered for reading; line-buffered for writing
- class SocketMemo(object):
- """A wrapper to keep track of sent data, needed to examine write behaviour"""
- def __init__(self, sock):
- self._sock = sock
- self.sent = []
- def send(self, data, flags=0):
- n = self._sock.send(data, flags)
- self.sent.append(data[:n])
- return n
- def sendall(self, data, flags=0):
- self._sock.sendall(data, flags)
- self.sent.append(data)
- def __getattr__(self, attr):
- return getattr(self._sock, attr)
- def getsent(self):
- return [e.tobytes() if isinstance(e, memoryview) else e for e in self.sent]
- def setUp(self):
- FileObjectClassTestCase.setUp(self)
- self.serv_file._sock = self.SocketMemo(self.serv_file._sock)
- def testLinebufferedWrite(self):
- # Write two lines, in small chunks
- msg = MSG.strip()
- print >> self.serv_file, msg,
- print >> self.serv_file, msg
- # second line:
- print >> self.serv_file, msg,
- print >> self.serv_file, msg,
- print >> self.serv_file, msg
- # third line
- print >> self.serv_file, ''
- self.serv_file.flush()
- msg1 = "%s %s\n"%(msg, msg)
- msg2 = "%s %s %s\n"%(msg, msg, msg)
- msg3 = "\n"
- self.assertEqual(self.serv_file._sock.getsent(), [msg1, msg2, msg3])
- def _testLinebufferedWrite(self):
- msg = MSG.strip()
- msg1 = "%s %s\n"%(msg, msg)
- msg2 = "%s %s %s\n"%(msg, msg, msg)
- msg3 = "\n"
- l1 = self.cli_file.readline()
- self.assertEqual(l1, msg1)
- l2 = self.cli_file.readline()
- self.assertEqual(l2, msg2)
- l3 = self.cli_file.readline()
- self.assertEqual(l3, msg3)
- class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
- bufsize = 2 # Exercise the buffering code
- class NetworkConnectionTest(object):
- """Prove network connection."""
- def clientSetUp(self):
- # We're inherited below by BasicTCPTest2, which also inherits
- # BasicTCPTest, which defines self.port referenced below.
- self.cli = socket.create_connection((HOST, self.port))
- self.serv_conn = self.cli
- class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
- """Tests that NetworkConnection does not break existing TCP functionality.
- """
- class NetworkConnectionNoServer(unittest.TestCase):
- class MockSocket(socket.socket):
- def connect(self, *args):
- raise socket.timeout('timed out')
- @contextlib.contextmanager
- def mocked_socket_module(self):
- """Return a socket which times out on connect"""
- old_socket = socket.socket
- socket.socket = self.MockSocket
- try:
- yield
- finally:
- socket.socket = old_socket
- def test_connect(self):
- port = test_support.find_unused_port()
- cli = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.addCleanup(cli.close)
- with self.assertRaises(socket.error) as cm:
- cli.connect((HOST, port))
- self.assertEqual(cm.exception.errno, errno.ECONNREFUSED)
- def test_create_connection(self):
- # Issue #9792: errors raised by create_connection() should have
- # a proper errno attribute.
- port = test_support.find_unused_port()
- with self.assertRaises(socket.error) as cm:
- socket.create_connection((HOST, port))
- # Issue #16257: create_connection() calls getaddrinfo() against
- # 'localhost'. This may result in an IPV6 addr being returned
- # as well as an IPV4 one:
- # >>> socket.getaddrinfo('localhost', port, 0, SOCK_STREAM)
- # >>> [(2, 2, 0, '', ('127.0.0.1', 41230)),
- # (26, 2, 0, '', ('::1', 41230, 0, 0))]
- #
- # create_connection() enumerates through all the addresses returned
- # and if it doesn't successfully bind to any of them, it propagates
- # the last exception it encountered.
- #
- # On Solaris, ENETUNREACH is returned in this circumstance instead
- # of ECONNREFUSED. So, if that errno exists, add it to our list of
- # expected errnos.
- expected_errnos = [ errno.ECONNREFUSED, ]
- if hasattr(errno, 'ENETUNREACH'):
- expected_errnos.append(errno.ENETUNREACH)
- self.assertIn(cm.exception.errno, expected_errnos)
- def test_create_connection_timeout(self):
- # Issue #9792: create_connection() should not recast timeout errors
- # as generic socket errors.
- with self.mocked_socket_module():
- with self.assertRaises(socket.timeout):
- socket.create_connection((HOST, 1234))
- @unittest.skipUnless(thread, 'Threading required for this test.')
- class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
- def __init__(self, methodName='runTest'):
- SocketTCPTest.__init__(self, methodName=methodName)
- ThreadableTest.__init__(self)
- def clientSetUp(self):
- self.source_port = test_support.find_unused_port()
- def clientTearDown(self):
- self.cli.close()
- self.cli = None
- ThreadableTest.clientTearDown(self)
- def _justAccept(self):
- conn, addr = self.serv.accept()
- conn.close()
- testFamily = _justAccept
- def _testFamily(self):
- self.cli = socket.create_connection((HOST, self.port), timeout=30)
- self.addCleanup(self.cli.close)
- self.assertEqual(self.cli.family, 2)
- testSourceAddress = _justAccept
- def _testSourceAddress(self):
- self.cli = socket.create_connection((HOST, self.port), timeout=30,
- source_address=('', self.source_port))
- self.addCleanup(self.cli.close)
- self.assertEqual(self.cli.getsockname()[1], self.source_port)
- # The port number being used is sufficient to show that the bind()
- # call happened.
- testTimeoutDefault = _justAccept
- def _testTimeoutDefault(self):
- # passing no explicit timeout uses socket's global default
- self.assertTrue(socket.getdefaulttimeout() is None)
- socket.setdefaulttimeout(42)
- try:
- self.cli = socket.create_connection((HOST, self.port))
- self.addCleanup(self.cli.close)
- finally:
- socket.setdefaulttimeout(None)
- self.assertEqual(self.cli.gettimeout(), 42)
- testTimeoutNone = _justAccept
- def _testTimeoutNone(self):
- # None timeout means the same as sock.settimeout(None)
- self.assertTrue(socket.getdefaulttimeout() is None)
- socket.setdefaulttimeout(30)
- try:
- self.cli = socket.create_connection((HOST, self.port), timeout=None)
- self.addCleanup(self.cli.close)
- finally:
- socket.setdefaulttimeout(None)
- self.assertEqual(self.cli.gettimeout(), None)
- testTimeoutValueNamed = _justAccept
- def _testTimeoutValueNamed(self):
- self.cli = socket.create_connection((HOST, self.port), timeout=30)
- self.assertEqual(self.cli.gettimeout(), 30)
- testTimeoutValueNonamed = _justAccept
- def _testTimeoutValueNonamed(self):
- self.cli = socket.create_connection((HOST, self.port), 30)
- self.addCleanup(self.cli.close)
- self.assertEqual(self.cli.gettimeout(), 30)
- @unittest.skipUnless(thread, 'Threading required for this test.')
- class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
- def __init__(self, methodName='runTest'):
- SocketTCPTest.__init__(self, methodName=methodName)
- ThreadableTest.__init__(self)
- def clientSetUp(self):
- pass
- def clientTearDown(self):
- self.cli.close()
- self.cli = None
- ThreadableTest.clientTearDown(self)
- def testInsideTimeout(self):
- conn, addr = self.serv.accept()
- self.addCleanup(conn.close)
- time.sleep(3)
- conn.send("done!")
- testOutsideTimeout = testInsideTimeout
- def _testInsideTimeout(self):
- self.cli = sock = socket.create_connection((HOST, self.port))
- data = sock.recv(5)
- self.assertEqual(data, "done!")
- def _testOutsideTimeout(self):
- self.cli = sock = socket.create_connection((HOST, self.port), timeout=1)
- self.assertRaises(socket.timeout, lambda: sock.recv(5))
- class Urllib2FileobjectTest(unittest.TestCase):
- # urllib2.HTTPHandler has "borrowed" socket._fileobject, and requires that
- # it close the socket if the close c'tor argument is true
- def testClose(self):
- class MockSocket:
- closed = False
- def flush(self): pass
- def close(self): self.closed = True
- # must not close unless we request it: the original use of _fileobject
- # by module socket requires that the underlying socket not be closed until
- # the _socketobject that created the _fileobject is closed
- s = MockSocket()
- f = socket._fileobject(s)
- f.close()
- self.assertTrue(not s.closed)
- s = MockSocket()
- f = socket._fileobject(s, close=True)
- f.close()
- self.assertTrue(s.closed)
- class TCPTimeoutTest(SocketTCPTest):
- def testTCPTimeout(self):
- def raise_timeout(*args, **kwargs):
- self.serv.settimeout(1.0)
- self.serv.accept()
- self.assertRaises(socket.timeout, raise_timeout,
- "Error generating a timeout exception (TCP)")
- def testTimeoutZero(self):
- ok = False
- try:
- self.serv.settimeout(0.0)
- foo = self.serv.accept()
- except socket.timeout:
- self.fail("caught timeout instead of error (TCP)")
- except socket.error:
- ok = True
- except:
- self.fail("caught unexpected exception (TCP)")
- if not ok:
- self.fail("accept() returned success when we did not expect it")
- @unittest.skipUnless(hasattr(signal, 'alarm'),
- 'test needs signal.alarm()')
- def testInterruptedTimeout(self):
- # XXX I don't know how to do this test on MSWindows or any other
- # plaform that doesn't support signal.alarm() or os.kill(), though
- # the bug should have existed on all platforms.
- self.serv.settimeout(5.0) # must be longer than alarm
- class Alarm(Exception):
- pass
- def alarm_handler(signal, frame):
- raise Alarm
- old_alarm = signal.signal(signal.SIGALRM, alarm_handler)
- try:
- signal.alarm(2) # POSIX allows alarm to be up to 1 second early
- try:
- foo = self.serv.accept()
- except socket.timeout:
- self.fail("caught timeout instead of Alarm")
- except Alarm:
- pass
- except:
- self.fail("caught other exception instead of Alarm:"
- " %s(%s):\n%s" %
- (sys.exc_info()[:2] + (traceback.format_exc(),)))
- else:
- self.fail("nothing caught")
- finally:
- signal.alarm(0) # shut off alarm
- except Alarm:
- self.fail("got Alarm in wrong place")
- finally:
- # no alarm can be pending. Safe to restore old handler.
- signal.signal(signal.SIGALRM, old_alarm)
- class UDPTimeoutTest(SocketUDPTest):
- def testUDPTimeout(self):
- def raise_timeout(*args, **kwargs):
- self.serv.settimeout(1.0)
- self.serv.recv(1024)
- self.assertRaises(socket.timeout, raise_timeout,
- "Error generating a timeout exception (UDP)")
- def testTimeoutZero(self):
- ok = False
- try:
- self.serv.settimeout(0.0)
- foo = self.serv.recv(1024)
- except socket.timeout:
- self.fail("caught timeout instead of error (UDP)")
- except socket.error:
- ok = True
- except:
- self.fail("caught unexpected exception (UDP)")
- if not ok:
- self.fail("recv() returned success when we did not expect it")
- class TestExceptions(unittest.TestCase):
- def testExceptionTree(self):
- self.assertTrue(issubclass(socket.error, Exception))
- self.assertTrue(issubclass(socket.herror, socket.error))
- self.assertTrue(issubclass(socket.gaierror, socket.error))
- self.assertTrue(issubclass(socket.timeout, socket.error))
- @unittest.skipUnless(sys.platform == 'linux', 'Linux specific test')
- class TestLinuxAbstractNamespace(unittest.TestCase):
- UNIX_PATH_MAX = 108
- def testLinuxAbstractNamespace(self):
- address = "\x00python-test-hello\x00\xff"
- s1 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- s1.bind(address)
- s1.listen(1)
- s2 = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- s2.connect(s1.getsockname())
- s1.accept()
- self.assertEqual(s1.getsockname(), address)
- self.assertEqual(s2.getpeername(), address)
- def testMaxName(self):
- address = "\x00" + "h" * (self.UNIX_PATH_MAX - 1)
- s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- s.bind(address)
- self.assertEqual(s.getsockname(), address)
- def testNameOverflow(self):
- address = "\x00" + "h" * self.UNIX_PATH_MAX
- s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- self.assertRaises(socket.error, s.bind, address)
- @unittest.skipUnless(thread, 'Threading required for this test.')
- class BufferIOTest(SocketConnectedTest):
- """
- Test the buffer versions of socket.recv() and socket.send().
- """
- def __init__(self, methodName='runTest'):
- SocketConnectedTest.__init__(self, methodName=methodName)
- def testRecvIntoArray(self):
- buf = array.array('c', ' '*1024)
- nbytes = self.cli_conn.recv_into(buf)
- self.assertEqual(nbytes, len(MSG))
- msg = buf.tostring()[:len(MSG)]
- self.assertEqual(msg, MSG)
- def _testRecvIntoArray(self):
- with test_support.check_py3k_warnings():
- buf = buffer(MSG)
- self.serv_conn.send(buf)
- def testRecvIntoBytearray(self):
- buf = bytearray(1024)
- nbytes = self.cli_conn.recv_into(buf)
- self.assertEqual(nbytes, len(MSG))
- msg = buf[:len(MSG)]
- self.assertEqual(msg, MSG)
- _testRecvIntoBytearray = _testRecvIntoArray
- def testRecvIntoMemoryview(self):
- buf = bytearray(1024)
- nbytes = self.cli_conn.recv_into(memoryview(buf))
- self.assertEqual(nbytes, len(MSG))
- msg = buf[:len(MSG)]
- self.assertEqual(msg, MSG)
- _testRecvIntoMemoryview = _testRecvIntoArray
- def testRecvFromIntoArray(self):
- buf = array.array('c', ' '*1024)
- nbytes, addr = self.cli_conn.recvfrom_into(buf)
- self.assertEqual(nbytes, len(MSG))
- msg = buf.tostring()[:len(MSG)]
- self.assertEqual(msg, MSG)
- def _testRecvFromIntoArray(self):
- with test_support.check_py3k_warnings():
- buf = buffer(MSG)
- self.serv_conn.send(buf)
- def testRecvFromIntoBytearray(self):
- buf = bytearray(1024)
- nbytes, addr = self.cli_conn.recvfrom_into(buf)
- self.assertEqual(nbytes, len(MSG))
- msg = buf[:len(MSG)]
- self.assertEqual(msg, MSG)
- _testRecvFromIntoBytearray = _testRecvFromIntoArray
- def testRecvFromIntoMemoryview(self):
- buf = bytearray(1024)
- nbytes, addr = self.cli_conn.recvfrom_into(memoryview(buf))
- self.assertEqual(nbytes, len(MSG))
- msg = buf[:len(MSG)]
- self.assertEqual(msg, MSG)
- _testRecvFromIntoMemoryview = _testRecvFromIntoArray
- def testRecvFromIntoSmallBuffer(self):
- # See issue #20246.
- buf = bytearray(8)
- self.assertRaises(ValueError, self.cli_conn.recvfrom_into, buf, 1024)
- def _testRecvFromIntoSmallBuffer(self):
- with test_support.check_py3k_warnings():
- buf = buffer(MSG)
- self.serv_conn.send(buf)
- def testRecvFromIntoEmptyBuffer(self):
- buf = bytearray()
- self.cli_conn.recvfrom_into(buf)
- self.cli_conn.recvfrom_into(buf, 0)
- _testRecvFromIntoEmptyBuffer = _testRecvFromIntoArray
- TIPC_STYPE = 2000
- TIPC_LOWER = 200
- TIPC_UPPER = 210
- def isTipcAvailable():
- """Check if the TIPC module is loaded
- The TIPC module is not loaded automatically on Ubuntu and probably
- other Linux distros.
- """
- if not hasattr(socket, "AF_TIPC"):
- return False
- if not os.path.isfile("/proc/modules"):
- return False
- with open("/proc/modules") as f:
- for line in f:
- if line.startswith("tipc "):
- return True
- return False
- @unittest.skipUnless(isTipcAvailable(),
- "TIPC module is not loaded, please 'sudo modprobe tipc'")
- class TIPCTest(unittest.TestCase):
- def testRDM(self):
- srv = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
- cli = socket.socket(socket.AF_TIPC, socket.SOCK_RDM)
- srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
- TIPC_LOWER, TIPC_UPPER)
- srv.bind(srvaddr)
- sendaddr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
- TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) / 2, 0)
- cli.sendto(MSG, sendaddr)
- msg, recvaddr = srv.recvfrom(1024)
- self.assertEqual(cli.getsockname(), recvaddr)
- self.assertEqual(msg, MSG)
- @unittest.skipUnless(isTipcAvailable(),
- "TIPC module is not loaded, please 'sudo modprobe tipc'")
- class TIPCThreadableTest(unittest.TestCase, ThreadableTest):
- def __init__(self, methodName = 'runTest'):
- unittest.TestCase.__init__(self, methodName = methodName)
- ThreadableTest.__init__(self)
- def setUp(self):
- self.srv = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
- self.srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- srvaddr = (socket.TIPC_ADDR_NAMESEQ, TIPC_STYPE,
- TIPC_LOWER, TIPC_UPPER)
- self.srv.bind(srvaddr)
- self.srv.listen(5)
- self.serverExplicitReady()
- self.conn, self.connaddr = self.srv.accept()
- def clientSetUp(self):
- # There is a hittable race between serverExplicitReady() and the
- # accept() call; sleep a little while to avoid it, otherwise
- # we could get an exception
- time.sleep(0.1)
- self.cli = socket.socket(socket.AF_TIPC, socket.SOCK_STREAM)
- addr = (socket.TIPC_ADDR_NAME, TIPC_STYPE,
- TIPC_LOWER + (TIPC_UPPER - TIPC_LOWER) / 2, 0)
- self.cli.connect(addr)
- self.cliaddr = self.cli.getsockname()
- def testStream(self):
- msg = self.conn.recv(1024)
- self.assertEqual(msg, MSG)
- self.assertEqual(self.cliaddr, self.connaddr)
- def _testStream(self):
- self.cli.send(MSG)
- self.cli.close()
- def test_main():
- tests = [GeneralModuleTests, BasicTCPTest, TCPCloserTest, TCPTimeoutTest,
- TestExceptions, BufferIOTest, BasicTCPTest2, BasicUDPTest,
- UDPTimeoutTest ]
- tests.extend([
- NonBlockingTCPTests,
- FileObjectClassTestCase,
- FileObjectInterruptedTestCase,
- UnbufferedFileObjectClassTestCase,
- LineBufferedFileObjectClassTestCase,
- SmallBufferedFileObjectClassTestCase,
- Urllib2FileobjectTest,
- NetworkConnectionNoServer,
- NetworkConnectionAttributesTest,
- NetworkConnectionBehaviourTest,
- ])
- tests.append(BasicSocketPairTest)
- tests.append(TestLinuxAbstractNamespace)
- tests.extend([TIPCTest, TIPCThreadableTest])
- thread_info = test_support.threading_setup()
- test_support.run_unittest(*tests)
- test_support.threading_cleanup(*thread_info)
- if __name__ == "__main__":
- test_main()
|