ssl_servers.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209
  1. import os
  2. import sys
  3. import ssl
  4. import pprint
  5. import urllib
  6. import urlparse
  7. # Rename HTTPServer to _HTTPServer so as to avoid confusion with HTTPSServer.
  8. from BaseHTTPServer import HTTPServer as _HTTPServer, BaseHTTPRequestHandler
  9. from SimpleHTTPServer import SimpleHTTPRequestHandler
  10. from test import test_support as support
  # Obtain the threading module through the test-support import helper
  # rather than through a plain ``import`` statement.
  threading = support.import_module("threading")

  # Default host for servers created by this module, taken from test support.
  HOST = support.HOST

  # Directory containing this module; data files are resolved relative to it.
  here = os.path.dirname(__file__)

  # Default PEM file passed to load_cert_chain(); located next to this module.
  CERTFILE = os.path.join(here, 'keycert.pem')
  15. # This one's based on HTTPServer, which is based on SocketServer
  class HTTPSServer(_HTTPServer):
      """HTTP server whose accepted connections are wrapped with SSL.

      Each socket returned by ``accept()`` is passed through
      ``context.wrap_socket(sock, server_side=True)`` before it is handed
      to the request handler.
      """

      def __init__(self, server_address, handler_class, context):
          """Create the server.

          server_address and handler_class are forwarded unchanged to the
          base HTTP server constructor.  context is stored on the instance;
          its wrap_socket() method is used by get_request().
          """
          _HTTPServer.__init__(self, server_address, handler_class)
          self.context = context

      def __str__(self):
          """Return a string of the form '<ClassName server_name:server_port>'."""
          return ('<%s %s:%s>' %
                  (self.__class__.__name__,
                   self.server_name,
                   self.server_port))

      def get_request(self):
          """Accept one connection and return ``(ssl_socket, address)``.

          Overrides the base implementation to wrap the socket with SSL.
          Errors raised while accepting or wrapping are re-raised; when
          support.verbose is set they are first written to stderr.
          """
          try:
              sock, addr = self.socket.accept()
              sslconn = self.context.wrap_socket(sock, server_side=True)
          except EnvironmentError as e:
              # Socket errors are silenced by the caller, so print them here.
              # EnvironmentError is caught instead of OSError because on
              # Python 2 socket.error (and ssl.SSLError, which derives from
              # it) is an IOError subclass that an OSError handler misses.
              if support.verbose:
                  sys.stderr.write("Got an error:\n%s\n" % e)
              raise
          return sslconn, addr
  class RootedHTTPRequestHandler(SimpleHTTPRequestHandler):
      """File-serving request handler resolved against a fixed root.

      translate_path is overridden to get a known root instead of using
      os.curdir, since the test could be run from anywhere.
      """

      server_version = "TestHTTPS/1.0"
      # Directory that request paths are resolved against.
      root = here
      # Avoid hanging when a request gets interrupted by the client
      timeout = 5

      def translate_path(self, path):
          """Translate a /-separated PATH to the local filename syntax.

          The query string is dropped and the path is URL-unquoted and
          normalised.  Each remaining component is reduced to its final
          name (drive and directory parts are discarded) and joined onto
          ``self.root``.  Empty components and components equal to
          os.curdir or os.pardir are skipped, so a request such as
          ``../../x`` cannot climb out of the root through '..' entries.

          Returns the resulting local path as a string.
          """
          # abandon query parameters
          path = urlparse.urlparse(path)[2]
          path = os.path.normpath(urllib.unquote(path))
          result = self.root
          for word in path.split('/'):
              if not word:
                  continue
              # Keep only the last name of the component: strip any drive
              # prefix, then any leading directory part.
              word = os.path.splitdrive(word)[1]
              word = os.path.split(word)[1]
              if word in (os.curdir, os.pardir):
                  # normpath() leaves leading '..' entries in place when the
                  # request path does not start with '/'; ignore them rather
                  # than letting them walk above the root.
                  continue
              result = os.path.join(result, word)
          return result

      def log_message(self, format, *args):
          """Write a log line to stdout, but only when support.verbose is set.

          The line includes the server address and port, the cipher reported
          by the request socket, the handler's log timestamp and the
          formatted message (``format % args``).
          """
          # we override this to suppress logging unless "verbose"
          if support.verbose:
              sys.stdout.write(" server (%s:%d %s):\n [%s] %s\n" %
                               (self.server.server_address,
                                self.server.server_port,
                                self.request.cipher(),
                                self.log_date_time_string(),
                                format%args))
  class StatsRequestHandler(BaseHTTPRequestHandler):
      """Example HTTP request handler which returns SSL statistics on GET
      requests.
      """

      server_version = "StatsHTTPS/1.0"

      def do_GET(self, send_body=True):
          """Serve a GET request.

          Responds with status 200 and a plain-text, UTF-8 encoded
          pretty-printed dict holding the context's session statistics and
          the connection's cipher and compression.  When send_body is false
          only the headers are sent (used by do_HEAD).
          """
          # self.request is the SSL-wrapped connection socket; it is the same
          # object RootedHTTPRequestHandler.log_message() queries for its
          # cipher.  Using it directly avoids reaching into the private
          # layout of the rfile object (rfile.raw._sock), which is not
          # guaranteed to exist on every file-object implementation.
          sock = self.request
          context = sock.context
          stats = {
              'session_cache': context.session_stats(),
              'cipher': sock.cipher(),
              'compression': sock.compression(),
          }
          body = pprint.pformat(stats)
          body = body.encode('utf-8')
          self.send_response(200)
          self.send_header("Content-type", "text/plain; charset=utf-8")
          # Content-Length is the length of the encoded body, sent for both
          # GET and HEAD.
          self.send_header("Content-Length", str(len(body)))
          self.end_headers()
          if send_body:
              self.wfile.write(body)

      def do_HEAD(self):
          """Serve a HEAD request: same headers as GET, without the body."""
          self.do_GET(send_body=False)

      def log_request(self, format, *args):
          """Forward to the base class log_request only when support.verbose is set."""
          if support.verbose:
              BaseHTTPRequestHandler.log_request(self, format, *args)
  class HTTPSServerThread(threading.Thread):
      """Daemon thread that owns an HTTPSServer and runs its serve loop."""

      def __init__(self, context, host=HOST, handler_class=None):
          """Build the server on ``(host, 0)`` and prepare the thread.

          context is handed to HTTPSServer.  handler_class falls back to
          RootedHTTPRequestHandler when it is not given (or is falsy).  The
          port the server reports after construction is stored in
          ``self.port``.
          """
          self.flag = None
          if not handler_class:
              handler_class = RootedHTTPRequestHandler
          https_server = HTTPSServer((host, 0), handler_class, context)
          self.server = https_server
          self.port = https_server.server_port
          threading.Thread.__init__(self)
          self.daemon = True

      def __str__(self):
          """Return '<ClassName <server>>' using the server's own str()."""
          return "<%s %s>" % (self.__class__.__name__, self.server)

      def start(self, flag=None):
          """Start the thread; flag, if given, is set once run() begins."""
          self.flag = flag
          threading.Thread.start(self)

      def run(self):
          """Signal the start flag (if any), then serve until shut down.

          The server socket is closed when the serve loop exits, whether
          normally or through an exception.
          """
          started = self.flag
          if started:
              started.set()
          try:
              self.server.serve_forever(0.05)
          finally:
              self.server.server_close()

      def stop(self):
          """Ask the server's serve loop to shut down."""
          self.server.shutdown()
  def make_https_server(case, context=None, certfile=CERTFILE,
                        host=HOST, handler_class=None):
      """Start an HTTPSServerThread for a test case and return it.

      case          -- object providing addCleanup(); a callback that stops
                       and joins the server thread is registered on it.
      context       -- SSL context to use; when None one is created with
                       ssl.create_default_context(ssl.Purpose.CLIENT_AUTH).
      certfile      -- file loaded into the context via load_cert_chain().
      host          -- passed to HTTPSServerThread.
      handler_class -- passed to HTTPSServerThread.

      The function waits until the thread has signalled that it started
      before returning the thread object.
      """
      if context is None:
          context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
      # We assume the certfile contains both private key and certificate
      context.load_cert_chain(certfile)

      server = HTTPSServerThread(context, host, handler_class)
      started = threading.Event()
      server.start(started)
      started.wait()

      def report(text):
          # Progress messages are only emitted in verbose mode.
          if support.verbose:
              sys.stdout.write(text)

      def cleanup():
          report('stopping HTTPS server\n')
          server.stop()
          report('joining HTTPS thread\n')
          server.join()

      case.addCleanup(cleanup)
      return server
  if __name__ == "__main__":
      # Command-line entry point: serve the current directory (or the stats
      # page) over HTTPS until interrupted.
      import argparse

      parser = argparse.ArgumentParser(
          description='Run a test HTTPS server. '
                      'By default, the current directory is served.')
      parser.add_argument('-p', '--port', type=int, default=4433,
                          help='port to listen on (default: %(default)s)')
      parser.add_argument('-q', '--quiet', dest='verbose', default=True,
                          action='store_false', help='be less verbose')
      parser.add_argument('-s', '--stats', dest='use_stats_handler',
                          default=False, action='store_true',
                          help='always return stats page')
      parser.add_argument('--curve-name', dest='curve_name', type=str,
                          action='store',
                          help='curve name for EC-based Diffie-Hellman')
      parser.add_argument('--ciphers', dest='ciphers', type=str,
                          help='allowed cipher list')
      parser.add_argument('--dh', dest='dh_file', type=str, action='store',
                          help='PEM file containing DH parameters')
      args = parser.parse_args()

      # The handlers and servers above consult support.verbose for logging.
      support.verbose = args.verbose

      if not args.use_stats_handler:
          handler_class = RootedHTTPRequestHandler
          # Serve files from wherever the script was launched.
          handler_class.root = os.getcwd()
      else:
          handler_class = StatsRequestHandler

      context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
      context.load_cert_chain(CERTFILE)

      # Optional context tweaks, applied in this order and only when the
      # matching option was supplied.  Methods are looked up lazily so an
      # unused option never touches the context.
      optional_settings = (
          ('set_ecdh_curve', args.curve_name),
          ('load_dh_params', args.dh_file),
          ('set_ciphers', args.ciphers),
      )
      for method_name, value in optional_settings:
          if value:
              getattr(context, method_name)(value)

      server = HTTPSServer(("", args.port), handler_class, context)
      if args.verbose:
          print("Listening on https://localhost:{0.port}".format(args))
      server.serve_forever(0.1)