02-subpub-qos1-message-expiry-retain.py 3.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. #!/usr/bin/env python3
# Test whether the broker reduces the message expiry interval when republishing
# a retained message, and eventually removes it.
# MQTT v5
# Helper publishes a retained message with a medium-length expiry interval,
# then publishes a second retained message with no expiry.
# Client connects, subscribes, gets both messages, disconnects.
# We wait until the expiry interval has elapsed.
# Client connects, subscribes, doesn't get the expired message, does get the
# non-expired message.
  11. from mosq_test_helper import *
  12. def do_test():
  13. rc = 1
  14. keepalive = 60
  15. connect_packet = mosq_test.gen_connect("subpub", keepalive=keepalive, proto_ver=5)
  16. connack_packet = mosq_test.gen_connack(rc=0, proto_ver=5)
  17. mid = 1
  18. subscribe1_packet = mosq_test.gen_subscribe(mid, "subpub/expired", 1, proto_ver=5)
  19. suback1_packet = mosq_test.gen_suback(mid, 1, proto_ver=5)
  20. mid = 2
  21. subscribe2_packet = mosq_test.gen_subscribe(mid, "subpub/kept", 1, proto_ver=5)
  22. suback2_packet = mosq_test.gen_suback(mid, 1, proto_ver=5)
  23. helper_connect = mosq_test.gen_connect("helper", proto_ver=5)
  24. helper_connack = mosq_test.gen_connack(rc=0, proto_ver=5)
  25. mid=1
  26. props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_MESSAGE_EXPIRY_INTERVAL, 4)
  27. publish1_packet = mosq_test.gen_publish("subpub/expired", mid=mid, qos=1, retain=True, payload="message1", proto_ver=5, properties=props)
  28. puback1_packet = mosq_test.gen_puback(mid, proto_ver=5, reason_code=mqtt5_rc.MQTT_RC_NO_MATCHING_SUBSCRIBERS)
  29. mid=2
  30. publish2s_packet = mosq_test.gen_publish("subpub/kept", mid=mid, qos=1, retain=True, payload="message2", proto_ver=5)
  31. puback2s_packet = mosq_test.gen_puback(mid, proto_ver=5, reason_code=mqtt5_rc.MQTT_RC_NO_MATCHING_SUBSCRIBERS)
  32. mid=1
  33. publish2r_packet = mosq_test.gen_publish("subpub/kept", mid=mid, qos=1, retain=True, payload="message2", proto_ver=5)
  34. puback2r_packet = mosq_test.gen_puback(mid, proto_ver=5, reason_code=mqtt5_rc.MQTT_RC_NO_MATCHING_SUBSCRIBERS)
  35. port = mosq_test.get_port()
  36. broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)
  37. try:
  38. helper = mosq_test.do_client_connect(helper_connect, helper_connack, timeout=20, port=port)
  39. mosq_test.do_send_receive(helper, publish1_packet, puback1_packet, "puback 1")
  40. mosq_test.do_send_receive(helper, publish2s_packet, puback2s_packet, "puback 2")
  41. helper.close()
  42. sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=20, port=port)
  43. mosq_test.do_send_receive(sock, subscribe1_packet, suback1_packet, "suback 1-1")
  44. mosq_test.expect_packet(sock, "publish 1", publish1_packet)
  45. sock.send(puback1_packet)
  46. mosq_test.do_send_receive(sock, subscribe2_packet, suback2_packet, "suback 2-1")
  47. mosq_test.expect_packet(sock, "publish 2", publish2s_packet)
  48. sock.send(puback2s_packet)
  49. sock.close()
  50. time.sleep(5)
  51. sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=20, port=port)
  52. mosq_test.do_send_receive(sock, subscribe1_packet, suback1_packet, "suback 1-2")
  53. # We shouldn't receive a publish here
  54. # This will fail if we do receive a publish
  55. mosq_test.do_send_receive(sock, subscribe2_packet, suback2_packet, "suback 2-2")
  56. mosq_test.expect_packet(sock, "publish 2", publish2r_packet)
  57. sock.send(puback2r_packet)
  58. sock.close()
  59. rc = 0
  60. except mosq_test.TestError:
  61. pass
  62. finally:
  63. broker.terminate()
  64. broker.wait()
  65. (stdo, stde) = broker.communicate()
  66. if rc:
  67. print(stde.decode('utf-8'))
  68. print("proto_ver=%d" % (proto_ver))
  69. exit(rc)
  70. do_test()
  71. exit(0)