02-subpub-qos2-max-inflight-bytes.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. #!/usr/bin/env python3
  2. # Does the broker respect max_inflight_bytes?
  3. # Also check whether the send quota is dealt with properly when both
  4. # RECEIVE-MAXIMUM and max_inflight_bytes are set.
  5. # MQTT v5
  6. from mosq_test_helper import *
  7. def write_config(filename, port):
  8. with open(filename, 'w') as f:
  9. f.write("listener %d\n" % (port))
  10. f.write("allow_anonymous true\n")
  11. f.write("max_inflight_bytes 16\n")
  12. def send_small(port):
  13. rc = 1
  14. connect_packet = mosq_test.gen_connect("subpub-qos2-test-helper")
  15. connack_packet = mosq_test.gen_connack(rc=0)
  16. sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port)
  17. for i in range(0, 10):
  18. mid = 1+i
  19. publish_packet = mosq_test.gen_publish("subpub/qos2", qos=2, mid=mid, payload=str(i+1))
  20. pubrec_packet = mosq_test.gen_pubrec(mid)
  21. pubrel_packet = mosq_test.gen_pubrel(mid)
  22. pubcomp_packet = mosq_test.gen_pubcomp(mid)
  23. mosq_test.do_send_receive(sock, publish_packet, pubrec_packet, "pubrec")
  24. mosq_test.do_send_receive(sock, pubrel_packet, pubcomp_packet, "pubcomp")
  25. def do_test(proto_ver):
  26. if proto_ver == 4:
  27. exit(0)
  28. rc = 1
  29. keepalive = 60
  30. props = mqtt5_props.gen_uint16_prop(mqtt5_props.PROP_RECEIVE_MAXIMUM, 5)
  31. connect_packet = mosq_test.gen_connect("subpub-qos2-test", keepalive=keepalive, proto_ver=5, properties=props)
  32. connack_packet = mosq_test.gen_connack(rc=0, proto_ver=5)
  33. mid = 1
  34. subscribe_packet = mosq_test.gen_subscribe(mid, "subpub/qos2", 2, proto_ver=5)
  35. suback_packet = mosq_test.gen_suback(mid, 2, proto_ver=5)
  36. port = mosq_test.get_port()
  37. conf_file = os.path.basename(__file__).replace('.py', '.conf')
  38. write_config(conf_file, port)
  39. broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port, use_conf=True)
  40. try:
  41. sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=20, port=port)
  42. mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
  43. # Repeat many times to stress the send quota
  44. mid = 0
  45. for i in range(0, 12):
  46. pub = subprocess.Popen(['./02-subpub-qos2-receive-maximum-helper.py', str(port)], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
  47. pub.wait()
  48. (stdo, stde) = pub.communicate()
  49. mid += 1
  50. publish_packet1 = mosq_test.gen_publish("subpub/qos2", qos=2, mid=mid, payload="message1", proto_ver=5)
  51. pubrec_packet1 = mosq_test.gen_pubrec(mid, proto_ver=5)
  52. pubrel_packet1 = mosq_test.gen_pubrel(mid, proto_ver=5)
  53. pubcomp_packet1 = mosq_test.gen_pubcomp(mid, proto_ver=5)
  54. mid += 1
  55. publish_packet2 = mosq_test.gen_publish("subpub/qos2", qos=2, mid=mid, payload="message2", proto_ver=5)
  56. pubrec_packet2 = mosq_test.gen_pubrec(mid, proto_ver=5)
  57. pubrel_packet2 = mosq_test.gen_pubrel(mid, proto_ver=5)
  58. pubcomp_packet2 = mosq_test.gen_pubcomp(mid, proto_ver=5)
  59. mid += 1
  60. publish_packet3 = mosq_test.gen_publish("subpub/qos2", qos=2, mid=mid, payload="message3", proto_ver=5)
  61. pubrec_packet3 = mosq_test.gen_pubrec(mid, proto_ver=5)
  62. pubrel_packet3 = mosq_test.gen_pubrel(mid, proto_ver=5)
  63. pubcomp_packet3 = mosq_test.gen_pubcomp(mid, proto_ver=5)
  64. mosq_test.expect_packet(sock, "publish1", publish_packet1)
  65. mosq_test.expect_packet(sock, "publish2", publish_packet2)
  66. mosq_test.do_send_receive(sock, pubrec_packet1, pubrel_packet1, "pubrel1")
  67. sock.send(pubcomp_packet1)
  68. mosq_test.expect_packet(sock, "publish3", publish_packet3)
  69. mosq_test.do_send_receive(sock, pubrec_packet2, pubrel_packet2, "pubrel2")
  70. sock.send(pubcomp_packet2)
  71. mosq_test.do_send_receive(sock, pubrec_packet3, pubrel_packet3, "pubrel3")
  72. sock.send(pubcomp_packet3)
  73. # send messages where count will exceed max_inflight_messages, but the
  74. # payload bytes won't exceed max_inflight_bytes
  75. send_small(port)
  76. mid += 1
  77. publish_packet1 = mosq_test.gen_publish("subpub/qos2", qos=2, mid=mid, payload="1", proto_ver=5)
  78. pubrec_packet1 = mosq_test.gen_pubrec(mid, proto_ver=5)
  79. pubrel_packet1 = mosq_test.gen_pubrel(mid, proto_ver=5)
  80. pubcomp_packet1 = mosq_test.gen_pubcomp(mid, proto_ver=5)
  81. mid += 1
  82. publish_packet2 = mosq_test.gen_publish("subpub/qos2", qos=2, mid=mid, payload="2", proto_ver=5)
  83. pubrec_packet2 = mosq_test.gen_pubrec(mid, proto_ver=5)
  84. pubrel_packet2 = mosq_test.gen_pubrel(mid, proto_ver=5)
  85. pubcomp_packet2 = mosq_test.gen_pubcomp(mid, proto_ver=5)
  86. mid += 1
  87. publish_packet3 = mosq_test.gen_publish("subpub/qos2", qos=2, mid=mid, payload="3", proto_ver=5)
  88. pubrec_packet3 = mosq_test.gen_pubrec(mid, proto_ver=5)
  89. pubrel_packet3 = mosq_test.gen_pubrel(mid, proto_ver=5)
  90. pubcomp_packet3 = mosq_test.gen_pubcomp(mid, proto_ver=5)
  91. mid += 1
  92. publish_packet4 = mosq_test.gen_publish("subpub/qos2", qos=2, mid=mid, payload="4", proto_ver=5)
  93. pubrec_packet4 = mosq_test.gen_pubrec(mid, proto_ver=5)
  94. pubrel_packet4 = mosq_test.gen_pubrel(mid, proto_ver=5)
  95. pubcomp_packet4 = mosq_test.gen_pubcomp(mid, proto_ver=5)
  96. mid += 1
  97. publish_packet5 = mosq_test.gen_publish("subpub/qos2", qos=2, mid=mid, payload="5", proto_ver=5)
  98. pubrec_packet5 = mosq_test.gen_pubrec(mid, proto_ver=5)
  99. pubrel_packet5 = mosq_test.gen_pubrel(mid, proto_ver=5)
  100. pubcomp_packet5 = mosq_test.gen_pubcomp(mid, proto_ver=5)
  101. mosq_test.expect_packet(sock, "publish1s", publish_packet1)
  102. mosq_test.expect_packet(sock, "publish2s", publish_packet2)
  103. mosq_test.expect_packet(sock, "publish3s", publish_packet3)
  104. mosq_test.expect_packet(sock, "publish4s", publish_packet4)
  105. mosq_test.expect_packet(sock, "publish5s", publish_packet5)
  106. mosq_test.do_send_receive(sock, pubrec_packet1, pubrel_packet1, "pubrel1s")
  107. mosq_test.do_send_receive(sock, pubrec_packet2, pubrel_packet2, "pubrel2s")
  108. mosq_test.do_send_receive(sock, pubrec_packet3, pubrel_packet3, "pubrel3s")
  109. mosq_test.do_send_receive(sock, pubrec_packet4, pubrel_packet4, "pubrel4s")
  110. mosq_test.do_send_receive(sock, pubrec_packet5, pubrel_packet5, "pubrel5s")
  111. rc = 0
  112. sock.close()
  113. except mosq_test.TestError:
  114. pass
  115. except Exception as e:
  116. print(e)
  117. finally:
  118. os.remove(conf_file)
  119. broker.terminate()
  120. broker.wait()
  121. (stdo, stde) = broker.communicate()
  122. if rc:
  123. #print(stde.decode('utf-8'))
  124. print("proto_ver=%d" % (proto_ver))
  125. exit(rc)
  126. do_test(proto_ver=5)
  127. exit(0)