#!/usr/bin/env python3

# 13-malformed-publish-v5.py
# Test whether the broker handles malformed packets correctly - PUBLISH
# MQTTv5
import sys

from mosq_test_helper import *
  5. rc = 1
  6. def write_config(filename, port):
  7. with open(filename, 'w') as f:
  8. f.write("listener %d\n" % (port))
  9. f.write("allow_anonymous true\n")
  10. f.write("maximum_qos 1\n")
  11. f.write("retain_available false\n")
  12. def do_test(publish_packet, reason_code, error_string):
  13. global rc
  14. rc = 1
  15. keepalive = 10
  16. connect_packet = mosq_test.gen_connect("test", proto_ver=5, keepalive=keepalive)
  17. connack_props = mqtt5_props.gen_uint16_prop(mqtt5_props.PROP_TOPIC_ALIAS_MAXIMUM, 10)
  18. connack_props += mqtt5_props.gen_byte_prop(mqtt5_props.PROP_RETAIN_AVAILABLE, 0)
  19. connack_props += mqtt5_props.gen_uint16_prop(mqtt5_props.PROP_RECEIVE_MAXIMUM, 20)
  20. connack_props += mqtt5_props.gen_byte_prop(mqtt5_props.PROP_MAXIMUM_QOS, 1)
  21. connack_packet = mosq_test.gen_connack(rc=0, proto_ver=5, properties=connack_props, property_helper=False)
  22. mid = 0
  23. disconnect_packet = mosq_test.gen_disconnect(proto_ver=5, reason_code=reason_code)
  24. sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port)
  25. mosq_test.do_send_receive(sock, publish_packet, disconnect_packet, error_string=error_string)
  26. rc = 0
  27. port = mosq_test.get_port()
  28. conf_file = os.path.basename(__file__).replace('.py', '.conf')
  29. write_config(conf_file, port)
  30. broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
  31. try:
  32. # mid == 0
  33. publish_packet = mosq_test.gen_publish(topic="test/topic", qos=1, mid=0, proto_ver=5)
  34. do_test(publish_packet, mqtt5_rc.MQTT_RC_PROTOCOL_ERROR, "mid == 0")
  35. # qos > 2
  36. publish_packet = mosq_test.gen_publish(topic="test/topic", qos=3, mid=1, proto_ver=5)
  37. do_test(publish_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "qos > 2")
  38. # qos > maximum qos
  39. publish_packet = mosq_test.gen_publish(topic="test/topic", qos=2, mid=1, proto_ver=5)
  40. do_test(publish_packet, mqtt5_rc.MQTT_RC_QOS_NOT_SUPPORTED, "qos > maximum qos")
  41. # retain not supported
  42. publish_packet = mosq_test.gen_publish(topic="test/topic", qos=0, retain=True, proto_ver=5, payload="a")
  43. do_test(publish_packet, mqtt5_rc.MQTT_RC_RETAIN_NOT_SUPPORTED, "retain not supported")
  44. # Incorrect property
  45. props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, 0)
  46. publish_packet = mosq_test.gen_publish(topic="test/topic", qos=1, mid=1, proto_ver=5, properties=props)
  47. do_test(publish_packet, mqtt5_rc.MQTT_RC_PROTOCOL_ERROR, "Incorrect property")
  48. # Truncated packet, remaining length only
  49. publish_packet = struct.pack("!BB", 48, 0)
  50. do_test(publish_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "Truncated packet, remaining length only")
  51. # Truncated packet, empty topic
  52. publish_packet = struct.pack("!BBH", 48, 2, 0)
  53. do_test(publish_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "Truncated packet, empty topic")
  54. # Truncated packet, with topic, no properties
  55. publish_packet = struct.pack("!BBH1s", 48, 3, 1, b"a")
  56. do_test(publish_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "Truncated packet, with topic, no properties")
  57. # Truncated packet, with topic, no mid
  58. publish_packet = struct.pack("!BBH1s", 48+2, 3, 1, b"a")
  59. do_test(publish_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "Truncated packet, with topic, no mid")
  60. # Truncated packet, with topic, with mid, no properties
  61. publish_packet = struct.pack("!BBH1sH", 48+2, 5, 1, b"a", 1)
  62. do_test(publish_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "Truncated packet, with topic, with mid, no properties")
  63. # Bad topic
  64. publish_packet = mosq_test.gen_publish(topic="#/test/topic", qos=1, mid=1, proto_ver=5)
  65. do_test(publish_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "Bad topic")
  66. except mosq_test.TestError:
  67. pass
  68. finally:
  69. broker.terminate()
  70. broker.wait()
  71. (stdo, stde) = broker.communicate()
  72. os.remove(conf_file)
  73. if rc:
  74. print(stde.decode('utf-8'))
  75. exit(rc)