03-publish-b2c-qos2-len.py 3.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. #!/usr/bin/env python3
  2. # Check whether the broker handles v5 PUBREC and PUBCOMP packets sent with and
  3. # without a reason code and properties.
  4. from mosq_test_helper import *
  def helper(port):
      """Publish one QoS 2 message to "qos2/len/test" from a second client.

      Connects to the broker on *port* as client "test-helper", sends a QoS 2
      PUBLISH with payload "len-message" and mid 1, and drives the publisher
      side of the handshake (expect PUBREC, send PUBREL, expect PUBCOMP).

      No ``proto_ver`` is passed to the packet generators here, so this client
      uses whatever default protocol version ``mosq_test`` applies.

      Args:
          port: TCP port the broker under test is listening on.

      Raises:
          Whatever ``mosq_test.do_client_connect`` / ``do_send_receive`` raise
          when the broker's reply is not the expected packet (the caller in
          this file treats ``mosq_test.TestError`` as a test failure).
      """
      connect_packet = mosq_test.gen_connect("test-helper", keepalive=60)
      connack_packet = mosq_test.gen_connack(rc=0)

      mid = 1
      publish_packet = mosq_test.gen_publish("qos2/len/test", qos=2, mid=mid, payload="len-message")
      pubrec_packet = mosq_test.gen_pubrec(mid)
      pubrel_packet = mosq_test.gen_pubrel(mid)
      pubcomp_packet = mosq_test.gen_pubcomp(mid)

      sock = mosq_test.do_client_connect(connect_packet, connack_packet, connack_error="helper connack", port=port)
      try:
          mosq_test.do_send_receive(sock, publish_packet, pubrec_packet, "helper pubrec")
          mosq_test.do_send_receive(sock, pubrel_packet, pubcomp_packet, "helper pubcomp")
      finally:
          # Close the connection even when an exchange fails, so a failed
          # handshake does not leave the helper's socket open.
          sock.close()
  def len_test(test, pubrec_packet, pubcomp_packet):
      """Run one broker-to-client QoS 2 delivery with the given acknowledgements.

      Starts a broker, connects a v5 subscriber ("pub-test",
      clean_session=False) subscribed to "qos2/len/test" at QoS 2, and has
      ``helper`` publish a message to that topic. The subscriber then answers
      the broker's PUBLISH with *pubrec_packet*, expects a PUBREL, sends
      *pubcomp_packet*, and finally pings the broker to check the connection
      is still usable.

      Args:
          test: Label printed when the case fails.
          pubrec_packet: Encoded PUBREC the subscriber sends to the broker.
          pubcomp_packet: Encoded PUBCOMP the subscriber sends to the broker.

      Side effects:
          On failure, prints the broker's stderr and *test*, then terminates
          the process via ``exit(rc)`` with a non-zero code. Returns ``None``
          on success.
      """
      rc = 1
      keepalive = 60
      connect_packet = mosq_test.gen_connect("pub-test", keepalive=keepalive, clean_session=False, proto_ver=5)
      connack_packet = mosq_test.gen_connack(flags=0, rc=0, proto_ver=5)

      # Packet id used for the SUBSCRIBE/SUBACK exchange.
      subscribe_mid = 3265
      subscribe_packet = mosq_test.gen_subscribe(subscribe_mid, "qos2/len/test", 2, proto_ver=5)
      suback_packet = mosq_test.gen_suback(subscribe_mid, 2, proto_ver=5)

      # Packet id expected on the PUBLISH/PUBREL the broker sends to the subscriber.
      publish_mid = 1
      publish_packet = mosq_test.gen_publish("qos2/len/test", qos=2, mid=publish_mid, payload="len-message", proto_ver=5)
      pubrel_packet = mosq_test.gen_pubrel(publish_mid)

      port = mosq_test.get_port()
      broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)

      sock = None
      try:
          sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port)
          mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")

          helper(port)
          # Should have now received a publish command

          mosq_test.expect_packet(sock, "publish", publish_packet)
          mosq_test.do_send_receive(sock, pubrec_packet, pubrel_packet, "pubrel")
          sock.send(pubcomp_packet)
          # The ping only succeeds if the broker kept the connection open
          # after receiving the PUBCOMP.
          mosq_test.do_ping(sock)
          rc = 0
      except mosq_test.TestError:
          # A failed expectation leaves rc at 1; it is reported below.
          pass
      finally:
          # Close the subscriber socket on every path, not only on success.
          if sock is not None:
              sock.close()
          broker.terminate()
          # communicate() both drains the broker's output and waits for it to
          # exit. Calling wait() first can block forever if the child fills a
          # pipe (presumably start_broker returns a subprocess.Popen with
          # piped stderr, given that stde is decoded below).
          _, stde = broker.communicate()
          if rc:
              print(stde.decode('utf-8'))

      if rc != 0:
          print(test)
          exit(rc)
  # Each entry pairs the label reported on failure with a factory for the extra
  # keyword arguments given to gen_pubrec()/gen_pubcomp(). The factory is called
  # once per packet, so the property bytes are generated afresh for each one.
  ack_variants = (
      # No reason code, no properties
      ("qos2 len 2", lambda: {}),
      # Reason code, no properties
      ("qos2 len 3", lambda: {"proto_ver": 5, "reason_code": 0x00}),
      # Reason code, empty properties
      ("qos2 len 4", lambda: {"proto_ver": 5, "reason_code": 0x00, "properties": ""}),
      # Reason code, one property
      (
          "qos2 len >5",
          lambda: {
              "proto_ver": 5,
              "reason_code": 0x00,
              "properties": mqtt5_props.gen_string_pair_prop(mqtt5_props.PROP_USER_PROPERTY, "key", "value"),
          },
      ),
  )

  # Run the cases in order; len_test() exits the process on the first failure.
  for label, make_kwargs in ack_variants:
      pubrec_packet = mosq_test.gen_pubrec(1, **make_kwargs())
      pubcomp_packet = mosq_test.gen_pubcomp(1, **make_kwargs())
      len_test(label, pubrec_packet, pubcomp_packet)