03-publish-qos1-queued-bytes.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. #!/usr/bin/env python3
  2. # Test whether a PUBLISH to a topic with an offline subscriber results in a queued message
  3. import Queue
  4. import random
  5. import string
  6. import subprocess
  7. import socket
  8. import threading
  9. import time
  10. try:
  11. import paho.mqtt.client
  12. import paho.mqtt.publish
  13. except ImportError:
  14. print("WARNING: paho.mqtt module not available, skipping byte count test.")
  15. exit(0)
  16. from mosq_test_helper import *
  17. rc = 1
  18. port = mosq_test.get_port()
  19. def registerOfflineSubscriber():
  20. """Just a durable client to trigger queuing"""
  21. client = paho.mqtt.client.Client("sub-qos1-offline", clean_session=False)
  22. client.connect("localhost", port=port)
  23. client.subscribe("test/publish/queueing/#", 1)
  24. client.loop()
  25. client.disconnect()
  26. broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)
  27. class BrokerMonitor(threading.Thread):
  28. def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, verbose=None):
  29. threading.Thread.__init__(self, group=group, target=target, name=name, verbose=verbose)
  30. self.rq, self.cq = args
  31. self.stored = -1
  32. self.stored_bytes = -1
  33. self.dropped = -1
  34. def store_count(self, client, userdata, message):
  35. self.stored = int(message.payload)
  36. def store_bytes(self, client, userdata, message):
  37. self.stored_bytes = int(message.payload)
  38. def publish_dropped(self, client, userdata, message):
  39. self.dropped = int(message.payload)
  40. def run(self):
  41. client = paho.mqtt.client.Client("broker-monitor")
  42. client.connect("localhost", port=port)
  43. client.message_callback_add("$SYS/broker/store/messages/count", self.store_count)
  44. client.message_callback_add("$SYS/broker/store/messages/bytes", self.store_bytes)
  45. client.message_callback_add("$SYS/broker/publish/messages/dropped", self.publish_dropped)
  46. client.subscribe("$SYS/broker/store/messages/#")
  47. client.subscribe("$SYS/broker/publish/messages/dropped")
  48. while True:
  49. expect_drops = cq.get()
  50. self.cq.task_done()
  51. if expect_drops == "quit":
  52. break
  53. first = time.time()
  54. while self.stored < 0 or self.stored_bytes < 0 or (expect_drops and self.dropped < 0):
  55. client.loop(timeout=0.5)
  56. if time.time() - 10 > first:
  57. print("ABORT TIMEOUT")
  58. break
  59. if expect_drops:
  60. self.rq.put((self.stored, self.stored_bytes, self.dropped))
  61. else:
  62. self.rq.put((self.stored, self.stored_bytes, 0))
  63. self.stored = -1
  64. self.stored_bytes = -1
  65. self.dropped = -1
  66. client.disconnect()
  67. rq = Queue.Queue()
  68. cq = Queue.Queue()
  69. brokerMonitor = BrokerMonitor(args=(rq,cq))
  70. class StoreCounts():
  71. def __init__(self):
  72. self.stored = 0
  73. self.bstored = 0
  74. self.drops = 0
  75. self.diff_stored = 0
  76. self.diff_bstored = 0
  77. self.diff_drops = 0
  78. def update(self, tup):
  79. self.diff_stored = tup[0] - self.stored
  80. self.stored = tup[0]
  81. self.diff_bstored = tup[1] - self.bstored
  82. self.bstored = tup[1]
  83. self.diff_drops = tup[2] - self.drops
  84. self.drops = tup[2]
  85. def __repr__(self):
  86. return "s: %d (%d) b: %d (%d) d: %d (%d)" % (self.stored, self.diff_stored, self.bstored, self.diff_bstored, self.drops, self.diff_drops)
  87. try:
  88. registerOfflineSubscriber()
  89. time.sleep(2.5) # Wait for first proper dump of stats
  90. brokerMonitor.start()
  91. counts = StoreCounts()
  92. cq.put(True) # Expect a dropped count (of 0, initial)
  93. counts.update(rq.get()) # Initial start
  94. print("rq.get (INITIAL) gave us: ", counts)
  95. rq.task_done()
  96. # publish 10 short messages, should be no drops
  97. print("publishing 10 short")
  98. cq.put(False) # expect no updated drop count
  99. msgs_short10 = [("test/publish/queueing/%d" % x,
  100. ''.join(random.choice(string.hexdigits) for _ in range(10)),
  101. 1, False) for x in range(1, 10 + 1)]
  102. paho.mqtt.publish.multiple(msgs_short10, port=port)
  103. counts.update(rq.get()) # Initial start
  104. print("rq.get (short) gave us: ", counts)
  105. rq.task_done()
  106. if counts.diff_stored != 10 or counts.diff_bstored < 100:
  107. raise ValueError
  108. if counts.diff_drops != 0:
  109. raise ValueError
  110. # publish 10 mediums (40bytes). should fail after 8, when it finally crosses 400
  111. print("publishing 10 medium")
  112. cq.put(True) # expect a drop count
  113. msgs_medium10 = [("test/publish/queueing/%d" % x,
  114. ''.join(random.choice(string.hexdigits) for _ in range(40)),
  115. 1, False) for x in range(1, 10 + 1)]
  116. paho.mqtt.publish.multiple(msgs_medium10, port=port)
  117. counts.update(rq.get()) # Initial start
  118. print("rq.get (medium) gave us: ", counts)
  119. rq.task_done()
  120. if counts.diff_stored != 8 or counts.diff_bstored < 320:
  121. raise ValueError
  122. if counts.diff_drops != 2:
  123. raise ValueError
  124. rc = 0
  125. except mosq_test.TestError:
  126. pass
  127. finally:
  128. cq.put("quit")
  129. brokerMonitor.join()
  130. rq.join()
  131. cq.join()
  132. broker.terminate()
  133. (stdo, stde) = broker.communicate()
  134. if rc:
  135. print(stde.decode('utf-8'))
  136. exit(rc)