07-will-invalid-utf8.py 1.0 KB

123456789101112131415161718192021222324252627282930313233343536373839
  1. #!/usr/bin/env python3
  2. # Test whether a will topic with invalid UTF-8 fails
  3. from mosq_test_helper import *
  4. def do_test(proto_ver):
  5. rc = 1
  6. mid = 53
  7. keepalive = 60
  8. connect_packet = mosq_test.gen_connect("will-invalid-utf8", keepalive=keepalive, will_topic="invalid/utf8", proto_ver=proto_ver)
  9. b = list(struct.unpack("B"*len(connect_packet), connect_packet))
  10. b[40] = 0 # Topic should never have a 0x0000
  11. connect_packet = struct.pack("B"*len(b), *b)
  12. port = mosq_test.get_port()
  13. broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)
  14. try:
  15. sock = mosq_test.do_client_connect(connect_packet, b"", timeout=30, port=port)
  16. rc = 0
  17. sock.close()
  18. except mosq_test.TestError:
  19. pass
  20. finally:
  21. broker.terminate()
  22. broker.wait()
  23. (stdo, stde) = broker.communicate()
  24. if rc:
  25. print(stde.decode('utf-8'))
  26. print("proto_ver=%d" % (proto_ver))
  27. exit(rc)
  28. do_test(proto_ver=4)
  29. do_test(proto_ver=5)
  30. exit(0)