13-malformed-subscribe-v5.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. #!/usr/bin/env python3
  2. # Test whether the broker handles malformed packets correctly - SUBSCRIBE
  3. # MQTTv5
  4. from mosq_test_helper import *
  5. rc = 1
  6. def do_test(subscribe_packet, reason_code, error_string):
  7. global rc
  8. rc = 1
  9. keepalive = 10
  10. connect_packet = mosq_test.gen_connect("test", proto_ver=5, keepalive=keepalive)
  11. connack_packet = mosq_test.gen_connack(rc=0, proto_ver=5)
  12. mid = 0
  13. disconnect_packet = mosq_test.gen_disconnect(proto_ver=5, reason_code=reason_code)
  14. sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port)
  15. mosq_test.do_send_receive(sock, subscribe_packet, disconnect_packet, error_string=error_string)
  16. rc = 0
  17. port = mosq_test.get_port()
  18. broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)
  19. try:
  20. # mid == 0
  21. subscribe_packet = mosq_test.gen_subscribe(topic="test/topic", qos=1, mid=0, proto_ver=5)
  22. do_test(subscribe_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "mid == 0")
  23. # qos > 2
  24. subscribe_packet = mosq_test.gen_subscribe(topic="test/topic", qos=3, mid=1, proto_ver=5)
  25. do_test(subscribe_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "qos > 2")
  26. # retain handling = 0x30
  27. subscribe_packet = mosq_test.gen_subscribe(topic="test/topic", qos=0x30, mid=1, proto_ver=5)
  28. do_test(subscribe_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "retain handling = 0x30")
  29. # subscription options = 0xC0
  30. subscribe_packet = mosq_test.gen_subscribe(topic="test/topic", qos=0xC0, mid=1, proto_ver=5)
  31. do_test(subscribe_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "subscription options = 0xC0")
  32. # command flags != 0x02
  33. subscribe_packet = mosq_test.gen_subscribe(topic="test/topic", qos=1, mid=1, proto_ver=5, cmd=128)
  34. do_test(subscribe_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "command flags != 0x02")
  35. # Incorrect property
  36. props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, 0)
  37. subscribe_packet = mosq_test.gen_subscribe(topic="test/topic", qos=1, mid=1, proto_ver=5, properties=props)
  38. do_test(subscribe_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "Incorrect property")
  39. # Truncated packet, no mid
  40. subscribe_packet = struct.pack("!BB", 130, 0)
  41. do_test(subscribe_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "Truncated packet, no mid")
  42. # Truncated packet, no properties
  43. subscribe_packet = struct.pack("!BBH", 130, 2, 1)
  44. do_test(subscribe_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "Truncated packet, no properties")
  45. # Truncated packet, with properties field
  46. subscribe_packet = struct.pack("!BBHB", 130, 3, 1, 0)
  47. do_test(subscribe_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "Truncated packet, with properties field")
  48. # Truncated packet, with properties field, empty topic
  49. subscribe_packet = struct.pack("!BBHBH", 130, 5, 1, 0, 0)
  50. do_test(subscribe_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "Truncated packet, with properties field, empty topic")
  51. # Truncated packet, with properties field, empty topic, with qos
  52. subscribe_packet = struct.pack("!BBHBHB", 130, 6, 1, 0, 0, 1)
  53. do_test(subscribe_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "Truncated packet, with properties field, empty topic, with qos")
  54. # Truncated packet, with properties field, with topic, no qos
  55. subscribe_packet = struct.pack("!BBHBH1s", 130, 6, 1, 0, 1, b"a")
  56. do_test(subscribe_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "Truncated packet, with properties field, with topic, no qos")
  57. # Truncated packet, with properties field, with 1st topic and qos ok, second topic ok, no second qos
  58. subscribe_packet = struct.pack("!BBHHH1sBH1s", 130, 10, 1, 0, 1, b"a", 0, 1, b"b")
  59. do_test(subscribe_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "Truncated packet, with properties field, with 1st topic and qos ok, second topic ok, no second qos")
  60. # Bad topic
  61. subscribe_packet = mosq_test.gen_subscribe(topic="#/test/topic", qos=1, mid=1, proto_ver=5)
  62. do_test(subscribe_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "Bad topic")
  63. # Subscription ID set to 0
  64. props = mqtt5_props.gen_varint_prop(mqtt5_props.PROP_SUBSCRIPTION_IDENTIFIER, 0)
  65. subscribe_packet = mosq_test.gen_subscribe(topic="test/topic", qos=1, mid=1, proto_ver=5, properties=props)
  66. do_test(subscribe_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "Subscription ID set to 0")
  67. except mosq_test.TestError:
  68. pass
  69. finally:
  70. broker.terminate()
  71. broker.wait()
  72. (stdo, stde) = broker.communicate()
  73. if rc:
  74. print(stde.decode('utf-8'))
  75. exit(rc)