# 09-extended-auth-multistep-reauth.py (3.8 KB)
#!/usr/bin/env python3

import sys

from mosq_test_helper import *
  3. def write_config(filename, port):
  4. with open(filename, 'w') as f:
  5. f.write("port %d\n" % (port))
  6. f.write("allow_anonymous true\n")
  7. f.write("auth_plugin c/auth_plugin_extended_multiple.so\n")
  8. port = mosq_test.get_port()
  9. conf_file = os.path.basename(__file__).replace('.py', '.conf')
  10. write_config(conf_file, port)
  11. rc = 1
  12. # First auth
  13. # ==========
  14. props = mqtt5_props.gen_string_prop(mqtt5_props.PROP_AUTHENTICATION_METHOD, "mirror")
  15. props += mqtt5_props.gen_string_prop(mqtt5_props.PROP_AUTHENTICATION_DATA, "step1")
  16. connect1_packet = mosq_test.gen_connect("client-params-test", keepalive=42, proto_ver=5, properties=props)
  17. # Server to client
  18. props = mqtt5_props.gen_string_prop(mqtt5_props.PROP_AUTHENTICATION_METHOD, "mirror")
  19. props += mqtt5_props.gen_string_prop(mqtt5_props.PROP_AUTHENTICATION_DATA, "1pets")
  20. auth1_1_packet = mosq_test.gen_auth(reason_code=mqtt5_rc.MQTT_RC_CONTINUE_AUTHENTICATION, properties=props)
  21. # Client to server
  22. props = mqtt5_props.gen_string_prop(mqtt5_props.PROP_AUTHENTICATION_METHOD, "mirror")
  23. props += mqtt5_props.gen_string_prop(mqtt5_props.PROP_AUTHENTICATION_DATA, "supercalifragilisticexpialidocious")
  24. auth1_2_packet = mosq_test.gen_auth(reason_code=mqtt5_rc.MQTT_RC_CONTINUE_AUTHENTICATION, properties=props)
  25. props = mqtt5_props.gen_string_prop(mqtt5_props.PROP_AUTHENTICATION_METHOD, "mirror")
  26. connack1_packet = mosq_test.gen_connack(rc=0, proto_ver=5, properties=props)
  27. # Second auth
  28. # ===========
  29. props = mqtt5_props.gen_string_prop(mqtt5_props.PROP_AUTHENTICATION_METHOD, "mirror")
  30. props += mqtt5_props.gen_string_prop(mqtt5_props.PROP_AUTHENTICATION_DATA, "step1")
  31. reauth2_packet = mosq_test.gen_auth(reason_code=mqtt5_rc.MQTT_RC_REAUTHENTICATE, properties=props)
  32. # Server to client
  33. props = mqtt5_props.gen_string_prop(mqtt5_props.PROP_AUTHENTICATION_METHOD, "mirror")
  34. props += mqtt5_props.gen_string_prop(mqtt5_props.PROP_AUTHENTICATION_DATA, "1pets")
  35. auth2_1_packet = mosq_test.gen_auth(reason_code=mqtt5_rc.MQTT_RC_CONTINUE_AUTHENTICATION, properties=props)
  36. # Client to server
  37. props = mqtt5_props.gen_string_prop(mqtt5_props.PROP_AUTHENTICATION_METHOD, "mirror")
  38. props += mqtt5_props.gen_string_prop(mqtt5_props.PROP_AUTHENTICATION_DATA, "supercalifragilisticexpialidocious")
  39. auth2_2_packet = mosq_test.gen_auth(reason_code=mqtt5_rc.MQTT_RC_CONTINUE_AUTHENTICATION, properties=props)
  40. props = mqtt5_props.gen_string_prop(mqtt5_props.PROP_AUTHENTICATION_METHOD, "mirror")
  41. auth2_3_packet = mosq_test.gen_auth(reason_code=0, properties=props)
  42. # Third auth - bad due to different method
  43. # ========================================
  44. props = mqtt5_props.gen_string_prop(mqtt5_props.PROP_AUTHENTICATION_METHOD, "badmethod")
  45. props += mqtt5_props.gen_string_prop(mqtt5_props.PROP_AUTHENTICATION_DATA, "step1")
  46. reauth3_packet = mosq_test.gen_auth(reason_code=mqtt5_rc.MQTT_RC_REAUTHENTICATE, properties=props)
  47. # Server to client
  48. disconnect3_packet = mosq_test.gen_disconnect(reason_code=mqtt5_rc.MQTT_RC_PROTOCOL_ERROR, proto_ver=5)
  49. broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
  50. try:
  51. sock = mosq_test.do_client_connect(connect1_packet, auth1_1_packet, timeout=20, port=port, connack_error="auth1")
  52. mosq_test.do_send_receive(sock, auth1_2_packet, connack1_packet, "connack1")
  53. mosq_test.do_ping(sock, "pingresp1")
  54. mosq_test.do_send_receive(sock, reauth2_packet, auth2_1_packet, "auth2_1")
  55. mosq_test.do_send_receive(sock, auth2_2_packet, auth2_3_packet, "auth2_3")
  56. mosq_test.do_ping(sock, "pingresp2")
  57. mosq_test.do_send_receive(sock, reauth3_packet, disconnect3_packet, "disconnect3")
  58. rc = 0
  59. sock.close()
  60. except mosq_test.TestError:
  61. pass
  62. finally:
  63. os.remove(conf_file)
  64. broker.terminate()
  65. broker.wait()
  66. (stdo, stde) = broker.communicate()
  67. if rc:
  68. print(stde.decode('utf-8'))
  69. exit(rc)