123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- #!/usr/bin/env python3
- # Test whether a PUBLISH to a topic with an offline subscriber results in a queued message
- import Queue
- import random
- import string
- import subprocess
- import socket
- import threading
- import time
- try:
- import paho.mqtt.client
- import paho.mqtt.publish
- except ImportError:
- print("WARNING: paho.mqtt module not available, skipping byte count test.")
- exit(0)
- from mosq_test_helper import *
- rc = 1
- port = mosq_test.get_port()
- def registerOfflineSubscriber():
- """Just a durable client to trigger queuing"""
- client = paho.mqtt.client.Client("sub-qos1-offline", clean_session=False)
- client.connect("localhost", port=port)
- client.subscribe("test/publish/queueing/#", 1)
- client.loop()
- client.disconnect()
- broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)
- class BrokerMonitor(threading.Thread):
- def __init__(self, group=None, target=None, name=None, args=(), kwargs=None, verbose=None):
- threading.Thread.__init__(self, group=group, target=target, name=name, verbose=verbose)
- self.rq, self.cq = args
- self.stored = -1
- self.stored_bytes = -1
- self.dropped = -1
- def store_count(self, client, userdata, message):
- self.stored = int(message.payload)
- def store_bytes(self, client, userdata, message):
- self.stored_bytes = int(message.payload)
- def publish_dropped(self, client, userdata, message):
- self.dropped = int(message.payload)
- def run(self):
- client = paho.mqtt.client.Client("broker-monitor")
- client.connect("localhost", port=port)
- client.message_callback_add("$SYS/broker/store/messages/count", self.store_count)
- client.message_callback_add("$SYS/broker/store/messages/bytes", self.store_bytes)
- client.message_callback_add("$SYS/broker/publish/messages/dropped", self.publish_dropped)
- client.subscribe("$SYS/broker/store/messages/#")
- client.subscribe("$SYS/broker/publish/messages/dropped")
- while True:
- expect_drops = cq.get()
- self.cq.task_done()
- if expect_drops == "quit":
- break
- first = time.time()
- while self.stored < 0 or self.stored_bytes < 0 or (expect_drops and self.dropped < 0):
- client.loop(timeout=0.5)
- if time.time() - 10 > first:
- print("ABORT TIMEOUT")
- break
- if expect_drops:
- self.rq.put((self.stored, self.stored_bytes, self.dropped))
- else:
- self.rq.put((self.stored, self.stored_bytes, 0))
- self.stored = -1
- self.stored_bytes = -1
- self.dropped = -1
- client.disconnect()
- rq = Queue.Queue()
- cq = Queue.Queue()
- brokerMonitor = BrokerMonitor(args=(rq,cq))
- class StoreCounts():
- def __init__(self):
- self.stored = 0
- self.bstored = 0
- self.drops = 0
- self.diff_stored = 0
- self.diff_bstored = 0
- self.diff_drops = 0
- def update(self, tup):
- self.diff_stored = tup[0] - self.stored
- self.stored = tup[0]
- self.diff_bstored = tup[1] - self.bstored
- self.bstored = tup[1]
- self.diff_drops = tup[2] - self.drops
- self.drops = tup[2]
- def __repr__(self):
- return "s: %d (%d) b: %d (%d) d: %d (%d)" % (self.stored, self.diff_stored, self.bstored, self.diff_bstored, self.drops, self.diff_drops)
- try:
- registerOfflineSubscriber()
- time.sleep(2.5) # Wait for first proper dump of stats
- brokerMonitor.start()
- counts = StoreCounts()
- cq.put(True) # Expect a dropped count (of 0, initial)
- counts.update(rq.get()) # Initial start
- print("rq.get (INITIAL) gave us: ", counts)
- rq.task_done()
- # publish 10 short messages, should be no drops
- print("publishing 10 short")
- cq.put(False) # expect no updated drop count
- msgs_short10 = [("test/publish/queueing/%d" % x,
- ''.join(random.choice(string.hexdigits) for _ in range(10)),
- 1, False) for x in range(1, 10 + 1)]
- paho.mqtt.publish.multiple(msgs_short10, port=port)
- counts.update(rq.get()) # Initial start
- print("rq.get (short) gave us: ", counts)
- rq.task_done()
- if counts.diff_stored != 10 or counts.diff_bstored < 100:
- raise ValueError
- if counts.diff_drops != 0:
- raise ValueError
- # publish 10 mediums (40bytes). should fail after 8, when it finally crosses 400
- print("publishing 10 medium")
- cq.put(True) # expect a drop count
- msgs_medium10 = [("test/publish/queueing/%d" % x,
- ''.join(random.choice(string.hexdigits) for _ in range(40)),
- 1, False) for x in range(1, 10 + 1)]
- paho.mqtt.publish.multiple(msgs_medium10, port=port)
- counts.update(rq.get()) # Initial start
- print("rq.get (medium) gave us: ", counts)
- rq.task_done()
- if counts.diff_stored != 8 or counts.diff_bstored < 320:
- raise ValueError
- if counts.diff_drops != 2:
- raise ValueError
- rc = 0
- except mosq_test.TestError:
- pass
- finally:
- cq.put("quit")
- brokerMonitor.join()
- rq.join()
- cq.join()
- broker.terminate()
- (stdo, stde) = broker.communicate()
- if rc:
- print(stde.decode('utf-8'))
- exit(rc)
|