123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223 |
- #!/usr/bin/env python3
- # Replay the message sequences described in data/*.json against a broker and check each response.
- from mosq_test_helper import *
- import importlib
- from os import walk
- import socket
- import json
- from collections import deque
- import mosq_test
# Action codes stored in SingleMsg.action; MsgSequence dispatches on them.
#   send               - write raw bytes to the socket under test
#   recv               - read bytes from the socket and compare with the expectation
#   disconnected_check - verify the peer has closed the connection
#   connected_check    - verify the connection is still usable
#   publish            - publish a message through a separate helper connection
send, recv, disconnected_check, connected_check, publish = 1, 2, 3, 4, 5
class SingleMsg(object):
    """One scripted step: an action code, its payload and a free-text comment.

    Attributes:
        action: one of the module-level action codes (send, recv, ...).
        message: the payload the action operates on.
        comment: optional label; defaults to the empty string.
    """

    __slots__ = ('action', 'message', 'comment')

    def __init__(self, action, message, comment=''):
        self.action, self.message, self.comment = action, message, comment
class MsgSequence(object):
    """An ordered script of actions to replay against one broker connection.

    Steps are queued with the ``add_*`` methods and consumed in FIFO order by
    ``process_next`` / ``process_all``. Failures are reported by raising
    ``ValueError`` (socket and ``mosq_test`` errors may also propagate).

    Attributes:
        name: sequence name; also used as the client id of the default connect.
        msgs: deque of pending SingleMsg steps.
        expect_disconnect: whether the peer is expected to have closed the
            connection once all steps have run.
    """

    __slots__ = 'name', 'msgs', 'expect_disconnect'

    def __init__(self, name, default_connect=True, proto_ver=4, expect_disconnect=True):
        """Create an empty sequence, optionally pre-loaded with a connect exchange."""
        self.name = name
        self.msgs = deque()
        self.expect_disconnect = expect_disconnect

        if default_connect:
            self.add_default_connect(proto_ver=proto_ver)

    def add_default_connect(self, proto_ver):
        """Queue a connect packet to send and the success connack to expect."""
        self.add_send(mosq_test.gen_connect(self.name, keepalive=60, proto_ver=proto_ver))
        self.add_recv(mosq_test.gen_connack(rc=0, proto_ver=proto_ver), "default connack")

    def add_send(self, message):
        """Queue raw bytes to be written to the socket under test."""
        self._add(send, message)

    def add_recv(self, message, comment):
        """Queue raw bytes that must be read back from the socket under test."""
        self._add(recv, message, comment)

    def add_publish(self, message, comment):
        """Queue a helper publish; ``message`` is a dict with 'qos', 'topic', 'payload'."""
        self._add(publish, message, comment)

    def add_connected_check(self):
        """Queue a check that the connection is still alive."""
        self._add(connected_check, b"")

    def add_disconnected_check(self):
        """Queue a check that the peer has closed the connection."""
        self._add(disconnected_check, b"")

    def _add(self, action, message, comment=""):
        """Append one step to the end of the queue."""
        self.msgs.append(SingleMsg(action, message, comment))

    def _connected_check(self, sock):
        """Ping over ``sock``; raise ValueError if ``mosq_test`` reports a failure."""
        try:
            mosq_test.do_ping(sock)
        except mosq_test.TestError as err:
            # Keep the original cause attached for easier diagnosis.
            raise ValueError("connection failed") from err

    def _send_message(self, sock, msg):
        """Write the step's payload to ``sock``."""
        sock.send(msg.message)

    @staticmethod
    def _broker_port(sock, default=1888):
        """Return the peer port of ``sock``, or ``default`` if it cannot be determined."""
        if sock is None:
            return default
        try:
            # Index 1 is the port for both AF_INET and AF_INET6 peer addresses.
            return sock.getpeername()[1]
        except (AttributeError, OSError):
            # Not a connected inet socket; fall back to the historical default.
            return default

    def _publish_message(self, msg, sock=None):
        """Publish ``msg.message`` through a separate, short-lived "helper" client.

        The helper connects to the same port as ``sock`` (the connection under
        test) so that it reaches the broker actually being tested; without
        ``sock`` it falls back to port 1888. QoS 1 and 2 complete their
        acknowledgement handshake before returning; other QoS values besides
        0/1/2 publish nothing. The helper socket is always closed.
        """
        helper = mosq_test.client_connect_only(
            hostname="localhost", port=self._broker_port(sock), timeout=2)
        try:
            helper.send(mosq_test.gen_connect("helper", keepalive=60))
            mosq_test.expect_packet(helper, "connack", mosq_test.gen_connack(rc=0))

            m = msg.message
            if m['qos'] == 0:
                helper.send(mosq_test.gen_publish(topic=m['topic'], payload=m['payload']))
            elif m['qos'] == 1:
                helper.send(mosq_test.gen_publish(mid=1, qos=1, topic=m['topic'], payload=m['payload']))
                mosq_test.expect_packet(helper, "helper puback", mosq_test.gen_puback(mid=1))
            elif m['qos'] == 2:
                helper.send(mosq_test.gen_publish(mid=1, qos=2, topic=m['topic'], payload=m['payload']))
                mosq_test.expect_packet(helper, "helper pubrec", mosq_test.gen_pubrec(mid=1))
                helper.send(mosq_test.gen_pubrel(mid=1))
                mosq_test.expect_packet(helper, "helper pubcomp", mosq_test.gen_pubcomp(mid=1))
        finally:
            # Close even when an expectation fails so the helper never leaks.
            helper.close()

    def _recv_message(self, sock, msg):
        """Read ``len(msg.message)`` bytes from ``sock`` and compare them.

        A stream socket may deliver the reply in several pieces, so keep
        reading until the expected length is reached, the peer closes the
        connection, or the bytes received so far already differ from the
        expectation (which fails immediately instead of waiting for a timeout).

        Raises:
            ValueError: the received bytes differ from ``msg.message``.
        """
        expected = msg.message
        data = b""
        while len(data) < len(expected) and expected.startswith(data):
            chunk = sock.recv(len(expected) - len(data))
            if not chunk:
                # Peer closed the connection before sending everything.
                break
            data += chunk

        if data != expected:
            raise ValueError("Receive message %s | %s | %s" % (msg.comment, data, msg.message))

    def _disconnected_check(self, sock):
        """Verify the peer closed the connection when a disconnect is expected.

        Reading a byte means the connection is still open. A connection reset
        counts as a disconnect and is only re-raised when no disconnect was
        expected.
        """
        try:
            data = sock.recv(1)
        except ConnectionResetError:
            if not self.expect_disconnect:
                raise
            return

        if len(data) == 1 and self.expect_disconnect:
            raise ValueError("Still connected")

    def _process_message(self, sock, msg):
        """Run a single step; unknown action codes are ignored."""
        if msg.action == send:
            self._send_message(sock, msg)
        elif msg.action == recv:
            self._recv_message(sock, msg)
        elif msg.action == publish:
            # Pass the socket so the helper targets the same broker port.
            self._publish_message(msg, sock)
        elif msg.action == disconnected_check:
            self._disconnected_check(sock)
        elif msg.action == connected_check:
            self._connected_check(sock)

    def process_next(self, sock):
        """Pop and run the oldest queued step (IndexError if the queue is empty)."""
        msg = self.msgs.popleft()
        self._process_message(sock, msg)

    def process_all(self, sock):
        """Run every queued step, then check the final connection state."""
        while self.msgs:
            self.process_next(sock)

        if self.expect_disconnect:
            self._disconnected_check(sock)
        else:
            self._connected_check(sock)
def do_test(hostname, port):
    """Run every message sequence described by the JSON files in ``data/``.

    Each ``data/*.json`` file holds a list of groups; a group has a "group"
    name, an optional "disable" flag and a list of "tests". A test has a
    "name", a list of "msgs" and optional "ver" (default 4), "connect"
    (default True) and "expect_disconnect" (default True) settings. Each msg
    has a "type" of "send", "recv" or "publish" and an optional "comment";
    "send"/"recv" carry a hex string "payload".

    Every test gets its own connection to ``hostname``:``port``, which is
    closed again once the test has finished. A per-test result line and a
    final summary are printed.

    Args:
        hostname: host the broker under test listens on.
        port: port the broker under test listens on.

    Returns:
        0 if every test succeeded, 1 if at least one failed.
    """
    rc = 0
    sequences = []
    # Only the top level of "data" is scanned; a missing directory yields nothing.
    for (_, _, filenames) in walk("data"):
        sequences.extend(filenames)
        break

    total = 0
    succeeded = 0
    for seq in sorted(sequences):
        if not seq.endswith(".json"):
            continue

        with open("data/" + seq, "r") as f:
            test_file = json.load(f)

        for g in test_file:
            group_name = g["group"]
            if g.get("disable", False):
                continue

            for t in g["tests"]:
                tname = group_name + " " + t["name"]
                this_test = MsgSequence(tname,
                        proto_ver=t.get("ver", 4),
                        expect_disconnect=t.get("expect_disconnect", True),
                        default_connect=t.get("connect", True))

                for m in t["msgs"]:
                    c = m.get("comment", "")
                    if m["type"] == "send":
                        this_test.add_send(bytes.fromhex(m["payload"].replace(" ", "")))
                    elif m["type"] == "recv":
                        this_test.add_recv(bytes.fromhex(m["payload"].replace(" ", "")), c)
                    elif m["type"] == "publish":
                        # The whole msg dict is kept; the sequence reads its fields.
                        this_test.add_publish(m, c)

                total += 1
                sock = None
                try:
                    sock = mosq_test.client_connect_only(hostname=hostname, port=port, timeout=2)
                    this_test.process_all(sock)
                    print("\033[32m" + tname + "\033[0m")
                    succeeded += 1
                except (ValueError, ConnectionResetError, socket.timeout, mosq_test.TestError) as e:
                    print("\033[31m" + tname + " failed: " + str(e) + "\033[0m")
                    rc = 1
                finally:
                    # Release the per-test connection so descriptors do not
                    # pile up across a large test corpus.
                    if sock is not None:
                        sock.close()

    print("%d tests total\n%d tests succeeded" % (total, succeeded))
    return rc
# Script body: start a broker on the configured port, replay all sequences
# against it, and always shut the broker down before reporting the result.
rc = 0
hostname = "localhost"
port = mosq_test.get_port()
broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port, nolog=True)

try:
    rc = do_test(hostname=hostname, port=port)
finally:
    # Runs on failure too, so the broker process never outlives the test.
    broker.terminate()
    broker.wait()
    stdo, stde = broker.communicate()

if rc:
    # Debug aid: uncomment to dump what the broker wrote to stderr.
    #print(stde.decode('utf-8'))
    exit(rc)
|