03-publish-b2c-disconnect-qos2.py 3.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
  1. #!/usr/bin/env python3
  2. # Does an interrupted QoS 1 flow from broker to client get handled correctly?
  3. from mosq_test_helper import *
  4. def helper(port):
  5. connect_packet = mosq_test.gen_connect("test-helper", keepalive=60)
  6. connack_packet = mosq_test.gen_connack(rc=0)
  7. mid = 312
  8. publish_packet = mosq_test.gen_publish("qos2/disconnect/test", qos=2, mid=mid, payload="disconnect-message")
  9. pubrec_packet = mosq_test.gen_pubrec(mid)
  10. pubrel_packet = mosq_test.gen_pubrel(mid)
  11. pubcomp_packet = mosq_test.gen_pubcomp(mid)
  12. sock = mosq_test.do_client_connect(connect_packet, connack_packet, connack_error="helper connack", port=port)
  13. mosq_test.do_send_receive(sock, publish_packet, pubrec_packet, "helper pubrec")
  14. mosq_test.do_send_receive(sock, pubrel_packet, pubcomp_packet, "helper pubcomp")
  15. sock.close()
  16. def do_test(proto_ver):
  17. rc = 1
  18. mid = 3265
  19. keepalive = 60
  20. connect_packet = mosq_test.gen_connect("pub-qos2-disco-test", keepalive=keepalive, clean_session=False, proto_ver=proto_ver, session_expiry=60)
  21. connack1_packet = mosq_test.gen_connack(flags=0, rc=0, proto_ver=proto_ver)
  22. connack2_packet = mosq_test.gen_connack(flags=1, rc=0, proto_ver=proto_ver)
  23. subscribe_packet = mosq_test.gen_subscribe(mid, "qos2/disconnect/test", 2, proto_ver=proto_ver)
  24. suback_packet = mosq_test.gen_suback(mid, 2, proto_ver=proto_ver)
  25. mid = 1
  26. publish_packet = mosq_test.gen_publish("qos2/disconnect/test", qos=2, mid=mid, payload="disconnect-message", proto_ver=proto_ver)
  27. publish_dup_packet = mosq_test.gen_publish("qos2/disconnect/test", qos=2, mid=mid, payload="disconnect-message", dup=True, proto_ver=proto_ver)
  28. pubrec_packet = mosq_test.gen_pubrec(mid, proto_ver=proto_ver)
  29. pubrel_packet = mosq_test.gen_pubrel(mid, proto_ver=proto_ver)
  30. pubcomp_packet = mosq_test.gen_pubcomp(mid, proto_ver=proto_ver)
  31. mid = 3266
  32. publish2_packet = mosq_test.gen_publish("qos1/outgoing", qos=1, mid=mid, payload="outgoing-message", proto_ver=proto_ver)
  33. puback2_packet = mosq_test.gen_puback(mid, proto_ver=proto_ver)
  34. port = mosq_test.get_port()
  35. broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)
  36. try:
  37. sock = mosq_test.do_client_connect(connect_packet, connack1_packet, port=port)
  38. mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
  39. helper(port)
  40. # Should have now received a publish command
  41. mosq_test.expect_packet(sock, "publish", publish_packet)
  42. # Send our outgoing message. When we disconnect the broker
  43. # should get rid of it and assume we're going to retry.
  44. sock.send(publish2_packet)
  45. sock.close()
  46. sock = mosq_test.do_client_connect(connect_packet, connack2_packet, port=port)
  47. mosq_test.expect_packet(sock, "dup publish", publish_dup_packet)
  48. mosq_test.do_send_receive(sock, pubrec_packet, pubrel_packet, "pubrel")
  49. sock.close()
  50. sock = mosq_test.do_client_connect(connect_packet, connack2_packet, port=port)
  51. mosq_test.expect_packet(sock, "dup pubrel", pubrel_packet)
  52. sock.send(pubcomp_packet)
  53. rc = 0
  54. sock.close()
  55. except mosq_test.TestError:
  56. pass
  57. finally:
  58. broker.terminate()
  59. broker.wait()
  60. (stdo, stde) = broker.communicate()
  61. if rc:
  62. print(stde.decode('utf-8'))
  63. print("proto_ver=%d" % (proto_ver))
  64. exit(rc)
  65. do_test(proto_ver=4)
  66. do_test(proto_ver=5)
  67. exit(0)