prop_subpub_helper.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950
  1. #!/usr/bin/env python3
  2. # Test whether a client subscribed to a topic receives its own message sent to that topic.
  3. # Does a given property get sent through?
  4. # MQTT v5
  5. from mosq_test_helper import *
  6. def prop_subpub_helper(props_out, props_in, expect_proto_error=False):
  7. rc = 1
  8. mid = 53
  9. keepalive = 60
  10. connect_packet = mosq_test.gen_connect("subpub-qos0-test", keepalive=keepalive, proto_ver=5)
  11. connack_packet = mosq_test.gen_connack(rc=0, proto_ver=5)
  12. subscribe_packet = mosq_test.gen_subscribe(mid, "subpub/qos0", 0, proto_ver=5)
  13. suback_packet = mosq_test.gen_suback(mid, 0, proto_ver=5)
  14. publish_packet_out = mosq_test.gen_publish("subpub/qos0", qos=0, payload="message", proto_ver=5, properties=props_out)
  15. publish_packet_expected = mosq_test.gen_publish("subpub/qos0", qos=0, payload="message", proto_ver=5, properties=props_in)
  16. disconnect_packet = mosq_test.gen_disconnect(reason_code=mqtt5_rc.MQTT_RC_PROTOCOL_ERROR, proto_ver=5)
  17. port = mosq_test.get_port()
  18. broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)
  19. try:
  20. sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=20, port=port)
  21. mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
  22. if expect_proto_error:
  23. mosq_test.do_send_receive(sock, publish_packet_out, disconnect_packet, "publish")
  24. else:
  25. mosq_test.do_send_receive(sock, publish_packet_out, publish_packet_expected, "publish")
  26. rc = 0
  27. sock.close()
  28. except mosq_test.TestError:
  29. pass
  30. finally:
  31. broker.terminate()
  32. broker.wait()
  33. (stdo, stde) = broker.communicate()
  34. if rc:
  35. print(stde.decode('utf-8'))
  36. exit(rc)