123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209 |
- import os
- import sys
- import ssl
- import pprint
- import urllib
- import urlparse
- # Rename HTTPServer to _HTTPServer so as to avoid confusion with HTTPSServer.
- from BaseHTTPServer import HTTPServer as _HTTPServer, BaseHTTPRequestHandler
- from SimpleHTTPServer import SimpleHTTPRequestHandler
- from test import test_support as support
# threading is obtained through the test-support helper rather than a plain
# import statement.
# NOTE(review): presumably so that the test run is skipped cleanly when
# threading is unavailable -- confirm against test_support.import_module.
threading = support.import_module("threading")

# Host name used by default for the servers created in this module.
HOST = support.HOST

# Directory containing this module; the default certificate file
# (keycert.pem) is looked up inside it.
here = os.path.dirname(__file__)
CERTFILE = os.path.join(here, 'keycert.pem')
- # This one's based on HTTPServer, which is based on SocketServer
- class HTTPSServer(_HTTPServer):
- def __init__(self, server_address, handler_class, context):
- _HTTPServer.__init__(self, server_address, handler_class)
- self.context = context
- def __str__(self):
- return ('<%s %s:%s>' %
- (self.__class__.__name__,
- self.server_name,
- self.server_port))
- def get_request(self):
- # override this to wrap socket with SSL
- try:
- sock, addr = self.socket.accept()
- sslconn = self.context.wrap_socket(sock, server_side=True)
- except OSError as e:
- # socket errors are silenced by the caller, print them here
- if support.verbose:
- sys.stderr.write("Got an error:\n%s\n" % e)
- raise
- return sslconn, addr
class RootedHTTPRequestHandler(SimpleHTTPRequestHandler):
    """Static-file request handler that serves files below a fixed root.

    Request paths are resolved against the class attribute ``root``
    instead of the current working directory.
    """
    # need to override translate_path to get a known root,
    # instead of using os.curdir, since the test could be
    # run from anywhere

    server_version = "TestHTTPS/1.0"
    root = here
    # Avoid hanging when a request gets interrupted by the client
    timeout = 5

    def translate_path(self, path):
        """Translate a /-separated PATH to the local filename syntax.

        The query string is discarded, the path is percent-decoded and
        normalized, and the remaining components are joined onto
        self.root.

        Components that mean special things to the local file system
        (e.g. drive or directory names) are ignored.  (XXX They should
        probably be diagnosed.)  Current-directory and parent-directory
        components are skipped as well, so the result never climbs above
        self.root.
        """
        # abandon query parameters
        path = urlparse.urlparse(path)[2]
        path = os.path.normpath(urllib.unquote(path))
        result = self.root
        for word in path.split('/'):
            if not word:
                # empty component (leading, trailing or doubled slash)
                continue
            # Strip any drive prefix, then keep only the last path element.
            word = os.path.splitdrive(word)[1]
            word = os.path.split(word)[1]
            if word in (os.curdir, os.pardir):
                # normpath() keeps leading ".." components when the request
                # path does not start with "/"; joining them would let the
                # request escape self.root.
                continue
            result = os.path.join(result, word)
        return result

    def log_message(self, format, *args):
        """Write a log entry to stdout, but only when support.verbose is set.

        format and args are combined with the % operator, as in the
        overridden base-class method.
        """
        # we override this to suppress logging unless "verbose"
        if support.verbose:
            sys.stdout.write(" server (%s:%d %s):\n [%s] %s\n" %
                             (self.server.server_address,
                              self.server.server_port,
                              self.request.cipher(),
                              self.log_date_time_string(),
                              format%args))
class StatsRequestHandler(BaseHTTPRequestHandler):
    """Example HTTP request handler which returns SSL statistics on GET
    requests.
    """

    server_version = "StatsHTTPS/1.0"

    def do_GET(self, send_body=True):
        """Serve a GET request.

        The response is a plain-text, pretty-printed dict holding the
        context's session statistics and the connection's cipher and
        compression.  When send_body is false only the status line and
        headers are sent.
        """
        # Use the connection socket directly rather than digging it out of
        # the private internals of self.rfile (rfile.raw._sock), which the
        # Python 2 socket file object does not provide.  When served by
        # HTTPSServer this is the SSL-wrapped socket from get_request().
        sock = self.request
        context = sock.context
        stats = {
            'session_cache': context.session_stats(),
            'cipher': sock.cipher(),
            'compression': sock.compression(),
        }
        body = pprint.pformat(stats)
        body = body.encode('utf-8')
        self.send_response(200)
        self.send_header("Content-type", "text/plain; charset=utf-8")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        if send_body:
            self.wfile.write(body)

    def do_HEAD(self):
        """Serve a HEAD request."""
        # Same headers as GET, without the body.
        self.do_GET(send_body=False)

    def log_request(self, format, *args):
        """Log the request through the base class when support.verbose is set.

        The arguments are forwarded unchanged to
        BaseHTTPRequestHandler.log_request().
        """
        if support.verbose:
            BaseHTTPRequestHandler.log_request(self, format, *args)
class HTTPSServerThread(threading.Thread):
    """Daemon thread that owns an HTTPSServer and runs its serve loop."""

    def __init__(self, context, host=HOST, handler_class=None):
        """Create the server (not yet serving) and prepare the thread.

        context is passed to HTTPSServer; handler_class falls back to
        RootedHTTPRequestHandler when it is not given.
        """
        self.flag = None
        if not handler_class:
            handler_class = RootedHTTPRequestHandler
        # Port 0 is requested; the port actually in use is read back from
        # the server and published as self.port.
        self.server = HTTPSServer((host, 0), handler_class, context)
        self.port = self.server.server_port
        threading.Thread.__init__(self)
        self.daemon = True

    def __str__(self):
        """Return '<ClassName <server description>>'."""
        class_name = self.__class__.__name__
        return "<%s %s>" % (class_name, self.server)

    def start(self, flag=None):
        """Start the thread; *flag*, if given, is set once run() begins."""
        self.flag = flag
        threading.Thread.start(self)

    def run(self):
        """Signal the flag (if any), then serve until shutdown() is called."""
        started = self.flag
        if started:
            started.set()
        try:
            self.server.serve_forever(0.05)
        finally:
            # Always release the listening socket when the loop ends.
            self.server.server_close()

    def stop(self):
        """Ask the serve loop to finish."""
        self.server.shutdown()
def make_https_server(case, context=None, certfile=CERTFILE,
                      host=HOST, handler_class=None):
    """Start an HTTPS server thread for a test case and return it.

    case must provide addCleanup(); a cleanup that stops the server and
    joins its thread is registered on it.  When context is None a default
    server-side context is created.  certfile is loaded into the context
    in either case.  host and handler_class are passed on to
    HTTPSServerThread.  The function returns only after the server thread
    has signalled that it is running.
    """
    def cleanup():
        # Stop the serve loop first, then wait for the thread to finish;
        # each step is announced on stdout when running verbosely.
        steps = (
            ('stopping HTTPS server\n', server.stop),
            ('joining HTTPS thread\n', server.join),
        )
        for message, action in steps:
            if support.verbose:
                sys.stdout.write(message)
            action()

    if context is None:
        context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    # We assume the certfile contains both private key and certificate
    context.load_cert_chain(certfile)

    server = HTTPSServerThread(context, host, handler_class)
    started = threading.Event()
    server.start(started)
    # Block until the thread's run() method has set the event.
    started.wait()

    case.addCleanup(cleanup)
    return server
if __name__ == "__main__":
    # Command-line entry point: run a stand-alone HTTPS server.
    import argparse

    parser = argparse.ArgumentParser(
        description='Run a test HTTPS server. '
                    'By default, the current directory is served.',
    )
    parser.add_argument(
        '-p', '--port',
        type=int,
        default=4433,
        help='port to listen on (default: %(default)s)',
    )
    parser.add_argument(
        '-q', '--quiet',
        dest='verbose',
        action='store_false',
        default=True,
        help='be less verbose',
    )
    parser.add_argument(
        '-s', '--stats',
        dest='use_stats_handler',
        action='store_true',
        default=False,
        help='always return stats page',
    )
    parser.add_argument(
        '--curve-name',
        dest='curve_name',
        type=str,
        action='store',
        help='curve name for EC-based Diffie-Hellman',
    )
    parser.add_argument(
        '--ciphers',
        dest='ciphers',
        type=str,
        help='allowed cipher list',
    )
    parser.add_argument(
        '--dh',
        dest='dh_file',
        type=str,
        action='store',
        help='PEM file containing DH parameters',
    )
    args = parser.parse_args()

    # The handlers and server in this module consult support.verbose.
    support.verbose = args.verbose

    if not args.use_stats_handler:
        # Serve files from wherever the script was launched.
        handler_class = RootedHTTPRequestHandler
        handler_class.root = os.getcwd()
    else:
        handler_class = StatsRequestHandler

    context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
    context.load_cert_chain(CERTFILE)
    # Optional tuning: each setting is applied only when given on the
    # command line.
    if args.curve_name:
        context.set_ecdh_curve(args.curve_name)
    if args.dh_file:
        context.load_dh_params(args.dh_file)
    if args.ciphers:
        context.set_ciphers(args.ciphers)

    server = HTTPSServer(("", args.port), handler_class, context)
    if args.verbose:
        print("Listening on https://localhost:{0.port}".format(args))
    server.serve_forever(0.1)
|