02-subpub-qos0-retain-as-publish.py 2.1 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. #!/usr/bin/env python3
  2. # Test whether a client subscribed to a topic with retain-as-published set works as expected.
  3. # MQTT v5
  4. from mosq_test_helper import *
  5. def do_test():
  6. rc = 1
  7. keepalive = 60
  8. connect_packet = mosq_test.gen_connect("subpub-qos1-test", keepalive=keepalive, proto_ver=5)
  9. connack_packet = mosq_test.gen_connack(rc=0, proto_ver=5)
  10. mid = 530
  11. subscribe1_packet = mosq_test.gen_subscribe(mid, "subpub/normal", 0, proto_ver=5)
  12. suback1_packet = mosq_test.gen_suback(mid, 0, proto_ver=5)
  13. mid = 531
  14. subscribe2_packet = mosq_test.gen_subscribe(mid, "subpub/rap", 0 | mqtt5_opts.MQTT_SUB_OPT_RETAIN_AS_PUBLISHED, proto_ver=5)
  15. suback2_packet = mosq_test.gen_suback(mid, 0, proto_ver=5)
  16. publish1_packet = mosq_test.gen_publish("subpub/normal", qos=0, retain=True, payload="message", proto_ver=5)
  17. publish2_packet = mosq_test.gen_publish("subpub/rap", qos=0, retain=True, payload="message", proto_ver=5)
  18. publish1r_packet = mosq_test.gen_publish("subpub/normal", qos=0, retain=False, payload="message", proto_ver=5)
  19. publish2r_packet = mosq_test.gen_publish("subpub/rap", qos=0, retain=True, payload="message", proto_ver=5)
  20. mid = 1
  21. publish3_packet = mosq_test.gen_publish("subpub/receive", qos=1, mid=mid, payload="success", proto_ver=5)
  22. port = mosq_test.get_port()
  23. broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)
  24. try:
  25. sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=20, port=port)
  26. mosq_test.do_send_receive(sock, subscribe1_packet, suback1_packet, "suback1")
  27. mosq_test.do_send_receive(sock, subscribe2_packet, suback2_packet, "suback2")
  28. mosq_test.do_send_receive(sock, publish1_packet, publish1r_packet, "publish1")
  29. mosq_test.do_send_receive(sock, publish2_packet, publish2r_packet, "publish2")
  30. rc = 0
  31. sock.close()
  32. except mosq_test.TestError:
  33. pass
  34. finally:
  35. broker.terminate()
  36. broker.wait()
  37. (stdo, stde) = broker.communicate()
  38. if rc:
  39. print(stde.decode('utf-8'))
  40. exit(rc)
  41. do_test()
  42. exit(0)