import unittest
from test import test_support
from test.test_urllib2 import sanepathname2url
import socket
import urllib2
import os
import sys

TIMEOUT = 60  # seconds


def _retry_thrice(func, exc, *args, **kwargs):
    for i in range(3):
        try:
            return func(*args, **kwargs)
        except exc, last_exc:
            continue
        except:
            raise
    raise last_exc

def _wrap_with_retry_thrice(func, exc):
    def wrapped(*args, **kwargs):
        return _retry_thrice(func, exc, *args, **kwargs)
    return wrapped

# Connecting to remote hosts is flaky.  Make it more robust by retrying
# the connection several times.
_urlopen_with_retry = _wrap_with_retry_thrice(urllib2.urlopen, urllib2.URLError)
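# Note: _retry_thrice only retries failures of the given exception type; any
# other exception propagates immediately, and after three failed attempts the
# last caught exception is re-raised.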


class AuthTests(unittest.TestCase):
    """Tests urllib2 authentication features."""

## Disabled at the moment since there is no page under python.org which
## could be used for HTTP authentication.
#
#    def test_basic_auth(self):
#        import httplib
#
#        test_url = "http://www.python.org/test/test_urllib2/basic_auth"
#        test_hostport = "www.python.org"
#        test_realm = 'Test Realm'
#        test_user = 'test.test_urllib2net'
#        test_password = 'blah'
#
#        # failure
#        try:
#            _urlopen_with_retry(test_url)
#        except urllib2.HTTPError, exc:
#            self.assertEqual(exc.code, 401)
#        else:
#            self.fail("urlopen() should have failed with 401")
#
#        # success
#        auth_handler = urllib2.HTTPBasicAuthHandler()
#        auth_handler.add_password(test_realm, test_hostport,
#                                  test_user, test_password)
#        opener = urllib2.build_opener(auth_handler)
#        f = opener.open('http://localhost/')
#        response = _urlopen_with_retry("http://www.python.org/")
#
#        # The 'userinfo' URL component is deprecated by RFC 3986 for security
#        # reasons, let's not implement it!  (it's already implemented for proxy
#        # specification strings (that is, URLs or authorities specifying a
#        # proxy), so we must keep that)
#        self.assertRaises(httplib.InvalidURL,
#                          urllib2.urlopen, "http://evil:thing@example.com")


class CloseSocketTest(unittest.TestCase):

    def test_close(self):
        import httplib

        # calling .close() on urllib2's response objects should close the
        # underlying socket

        # delve deep into response to fetch socket._socketobject
        response = _urlopen_with_retry("http://www.example.com/")
        abused_fileobject = response.fp
        self.assertIs(abused_fileobject.__class__, socket._fileobject)
        httpresponse = abused_fileobject._sock
        self.assertIs(httpresponse.__class__, httplib.HTTPResponse)
        fileobject = httpresponse.fp
        self.assertIs(fileobject.__class__, socket._fileobject)

        self.assertTrue(not fileobject.closed)
        response.close()
        self.assertTrue(fileobject.closed)


class OtherNetworkTests(unittest.TestCase):
    def setUp(self):
        if 0:  # for debugging
            import logging
            logger = logging.getLogger("test_urllib2net")
            logger.addHandler(logging.StreamHandler())

    # XXX The rest of these tests aren't very good -- they don't check much.
    # They do sometimes catch some major disasters, though.

    def test_ftp(self):
        urls = [
            'ftp://ftp.debian.org/debian/README',
            ('ftp://ftp.debian.org/debian/non-existent-file',
             None, urllib2.URLError),
            ]
        self._test_urls(urls, self._extra_handlers())

    def test_file(self):
        TESTFN = test_support.TESTFN
        f = open(TESTFN, 'w')
        try:
            f.write('hi there\n')
            f.close()
            urls = [
                'file:' + sanepathname2url(os.path.abspath(TESTFN)),
                ('file:///nonsensename/etc/passwd', None, urllib2.URLError),
                ]
            self._test_urls(urls, self._extra_handlers(), retry=True)
        finally:
            os.remove(TESTFN)

        self.assertRaises(ValueError, urllib2.urlopen, './relative_path/to/file')

    # XXX Following test depends on machine configurations that are internal
    # to CNRI.  Need to set up a public server with the right authentication
    # configuration for test purposes.

##     def test_cnri(self):
##         if socket.gethostname() == 'bitdiddle':
##             localhost = 'bitdiddle.cnri.reston.va.us'
##         elif socket.gethostname() == 'bitdiddle.concentric.net':
##             localhost = 'localhost'
##         else:
##             localhost = None
##         if localhost is not None:
##             urls = [
##                 'file://%s/etc/passwd' % localhost,
##                 'http://%s/simple/' % localhost,
##                 'http://%s/digest/' % localhost,
##                 'http://%s/not/found.h' % localhost,
##                 ]
##             bauth = HTTPBasicAuthHandler()
##             bauth.add_password('basic_test_realm', localhost, 'jhylton',
##                                'password')
##             dauth = HTTPDigestAuthHandler()
##             dauth.add_password('digest_test_realm', localhost, 'jhylton',
##                                'password')
##             self._test_urls(urls, self._extra_handlers()+[bauth, dauth])

    def test_urlwithfrag(self):
        urlwith_frag = "http://www.pythontest.net/index.html#frag"
        with test_support.transient_internet(urlwith_frag):
            req = urllib2.Request(urlwith_frag)
            res = urllib2.urlopen(req)
            self.assertEqual(res.geturl(),
                             "http://www.pythontest.net/index.html#frag")

    def test_fileno(self):
        req = urllib2.Request("http://www.example.com")
        opener = urllib2.build_opener()
        res = opener.open(req)
        try:
            res.fileno()
        except AttributeError:
            self.fail("HTTPResponse object should return a valid fileno")
        finally:
            res.close()

    def test_custom_headers(self):
        url = "http://www.example.com"
        with test_support.transient_internet(url):
            opener = urllib2.build_opener()
            request = urllib2.Request(url)
            self.assertFalse(request.header_items())
            opener.open(request)
            self.assertTrue(request.header_items())
            self.assertTrue(request.has_header('User-agent'))
            request.add_header('User-Agent', 'Test-Agent')
            opener.open(request)
            self.assertEqual(request.get_header('User-agent'), 'Test-Agent')

    def test_sites_no_connection_close(self):
        # Some sites do not send a Connection: close header.
        # Verify that those work properly. (#issue12576)

        URL = 'http://www.imdb.com'  # no Connection:close

        with test_support.transient_internet(URL):
            req = urllib2.urlopen(URL)
            res = req.read()
            self.assertTrue(res)

    def _test_urls(self, urls, handlers, retry=True):
        import time
        import logging
        debug = logging.getLogger("test_urllib2").debug

        urlopen = urllib2.build_opener(*handlers).open
        if retry:
            urlopen = _wrap_with_retry_thrice(urlopen, urllib2.URLError)

        # Each entry in `urls` is either a plain URL string or a
        # (url, request_data, expected_exception) tuple.
        for url in urls:
            if isinstance(url, tuple):
                url, req, expected_err = url
            else:
                req = expected_err = None

            with test_support.transient_internet(url):
                debug(url)
                try:
                    f = urlopen(url, req, TIMEOUT)
                except EnvironmentError as err:
                    debug(err)
                    if expected_err:
                        msg = ("Didn't get expected error(s) %s for %s %s, got %s: %s" %
                               (expected_err, url, req, type(err), err))
                        self.assertIsInstance(err, expected_err, msg)
                except urllib2.URLError as err:
                    if isinstance(err[0], socket.timeout):
                        print >>sys.stderr, "<timeout: %s>" % url
                        continue
                    else:
                        raise
                else:
                    try:
                        with test_support.transient_internet(url):
                            buf = f.read()
                            debug("read %d bytes" % len(buf))
                    except socket.timeout:
                        print >>sys.stderr, "<timeout: %s>" % url
                    f.close()
                debug("******** next url coming up...")
                time.sleep(0.1)

    def _extra_handlers(self):
        handlers = []

        cfh = urllib2.CacheFTPHandler()
        # make sure cached FTP connections are cleared when the test finishes
        self.addCleanup(cfh.clear_cache)
        cfh.setTimeout(1)
        handlers.append(cfh)

        return handlers


class TimeoutTest(unittest.TestCase):
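    # These tests reach through private attributes (u.fp._sock.fp._sock for
    # HTTP, u.fp.fp._sock for FTP) to inspect the timeout that was actually
    # set on the underlying socket object.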
    def test_http_basic(self):
        self.assertIsNone(socket.getdefaulttimeout())
        url = "http://www.example.com"
        with test_support.transient_internet(url, timeout=None):
            u = _urlopen_with_retry(url)
            self.assertIsNone(u.fp._sock.fp._sock.gettimeout())

    def test_http_default_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        url = "http://www.example.com"
        with test_support.transient_internet(url):
            socket.setdefaulttimeout(60)
            try:
                u = _urlopen_with_retry(url)
            finally:
                socket.setdefaulttimeout(None)
            self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 60)

    def test_http_no_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        url = "http://www.example.com"
        with test_support.transient_internet(url):
            socket.setdefaulttimeout(60)
            try:
                u = _urlopen_with_retry(url, timeout=None)
            finally:
                socket.setdefaulttimeout(None)
            self.assertIsNone(u.fp._sock.fp._sock.gettimeout())

    def test_http_timeout(self):
        url = "http://www.example.com"
        with test_support.transient_internet(url):
            u = _urlopen_with_retry(url, timeout=120)
            self.assertEqual(u.fp._sock.fp._sock.gettimeout(), 120)

    FTP_HOST = 'ftp://ftp.debian.org/debian/'

    def test_ftp_basic(self):
        self.assertIsNone(socket.getdefaulttimeout())
        with test_support.transient_internet(self.FTP_HOST, timeout=None):
            u = _urlopen_with_retry(self.FTP_HOST)
            self.assertIsNone(u.fp.fp._sock.gettimeout())

    def test_ftp_default_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        with test_support.transient_internet(self.FTP_HOST):
            socket.setdefaulttimeout(60)
            try:
                u = _urlopen_with_retry(self.FTP_HOST)
            finally:
                socket.setdefaulttimeout(None)
            self.assertEqual(u.fp.fp._sock.gettimeout(), 60)

    def test_ftp_no_timeout(self):
        self.assertIsNone(socket.getdefaulttimeout())
        with test_support.transient_internet(self.FTP_HOST):
            socket.setdefaulttimeout(60)
            try:
                u = _urlopen_with_retry(self.FTP_HOST, timeout=None)
            finally:
                socket.setdefaulttimeout(None)
            self.assertIsNone(u.fp.fp._sock.gettimeout())

    def test_ftp_timeout(self):
        with test_support.transient_internet(self.FTP_HOST):
            u = _urlopen_with_retry(self.FTP_HOST, timeout=60)
            self.assertEqual(u.fp.fp._sock.gettimeout(), 60)


def test_main():
    test_support.requires("network")
    test_support.run_unittest(AuthTests,
                              OtherNetworkTests,
                              CloseSocketTest,
                              TimeoutTest,
                              )

if __name__ == "__main__":
    test_main()