08-ssl-hup-disconnect.py 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. #!/usr/bin/env python3
  2. # Test whether a client connected with a client certificate when
  3. # use_identity_as_username is true is then disconnected when a SIGHUP is
  4. # received.
  5. # https://github.com/eclipse/mosquitto/issues/1402
  6. from mosq_test_helper import *
  7. import signal
  8. if sys.version < '2.7':
  9. print("WARNING: SSL not supported on Python 2.6")
  10. exit(0)
  11. def write_config(filename, pw_file, port, option):
  12. with open(filename, 'w') as f:
  13. f.write("listener %d\n" % (port))
  14. f.write("cafile ../ssl/all-ca.crt\n")
  15. f.write("certfile ../ssl/server.crt\n")
  16. f.write("keyfile ../ssl/server.key\n")
  17. f.write("require_certificate true\n")
  18. f.write("%s true\n" % (option))
  19. f.write("password_file %s\n" % (pw_file))
  20. def write_pwfile(filename):
  21. with open(filename, 'w') as f:
  22. # Username "test client", password test
  23. f.write('test client:$6$njERlZMi/7DzNB9E$iiavfuXvUm8iyDZArTy7smTxh07GXXOrOsqxfW6gkOYVXHGk+W+i/8d3xDxrMwEPygEBhoA8A/gjQC0N2M4Lkw==\n')
  24. def do_test(option):
  25. port = mosq_test.get_port()
  26. conf_file = os.path.basename(__file__).replace('.py', '.conf')
  27. pw_file = os.path.basename(__file__).replace('.py', '.pwfile')
  28. write_config(conf_file, pw_file, port, option)
  29. write_pwfile(pw_file)
  30. rc = 1
  31. keepalive = 10
  32. connect_packet = mosq_test.gen_connect("connect-success-test", keepalive=keepalive)
  33. connack_packet = mosq_test.gen_connack(rc=0)
  34. broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port, use_conf=True)
  35. try:
  36. sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  37. ssock = ssl.wrap_socket(sock, ca_certs="../ssl/test-root-ca.crt", certfile="../ssl/client.crt", keyfile="../ssl/client.key", cert_reqs=ssl.CERT_REQUIRED)
  38. ssock.settimeout(20)
  39. ssock.connect(("localhost", port))
  40. mosq_test.do_send_receive(ssock, connect_packet, connack_packet, "connack")
  41. broker.send_signal(signal.SIGHUP)
  42. time.sleep(1)
  43. # This will fail if we've been disconnected
  44. mosq_test.do_ping(ssock)
  45. rc = 0
  46. ssock.close()
  47. except mosq_test.TestError:
  48. pass
  49. finally:
  50. os.remove(conf_file)
  51. os.remove(pw_file)
  52. broker.terminate()
  53. broker.wait()
  54. (stdo, stde) = broker.communicate()
  55. if rc:
  56. print(stde.decode('utf-8'))
  57. exit(rc)
  58. do_test("use_identity_as_username")
  59. do_test("use_subject_as_username")
  60. exit(0)