#!/usr/bin/env python3

# 13-malformed-unsubscribe-v5.py
# Test whether the broker handles malformed packets correctly - UNSUBSCRIBE
# MQTTv5
  4. from mosq_test_helper import *
  5. rc = 1
  6. def do_test(unsubscribe_packet, reason_code, error_string):
  7. global rc
  8. rc = 1
  9. keepalive = 10
  10. connect_packet = mosq_test.gen_connect("test", proto_ver=5, keepalive=keepalive)
  11. connack_packet = mosq_test.gen_connack(rc=0, proto_ver=5)
  12. mid = 0
  13. disconnect_packet = mosq_test.gen_disconnect(proto_ver=5, reason_code=reason_code)
  14. sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port)
  15. mosq_test.do_send_receive(sock, unsubscribe_packet, disconnect_packet, error_string=error_string)
  16. rc = 0
  17. port = mosq_test.get_port()
  18. broker = mosq_test.start_broker(filename=os.path.basename(__file__), port=port)
  19. try:
  20. # mid == 0
  21. unsubscribe_packet = mosq_test.gen_unsubscribe(topic="test/topic", mid=0, proto_ver=5)
  22. do_test(unsubscribe_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "mid == 0")
  23. # command flags != 0x02
  24. unsubscribe_packet = mosq_test.gen_unsubscribe(topic="test/topic", mid=1, proto_ver=5, cmd=160)
  25. do_test(unsubscribe_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "command flags != 0x02")
  26. # Incorrect property
  27. props = mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, 0)
  28. unsubscribe_packet = mosq_test.gen_unsubscribe(topic="test/topic", mid=1, proto_ver=5, properties=props)
  29. do_test(unsubscribe_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "Incorrect property")
  30. # Truncated packet, no mid
  31. unsubscribe_packet = struct.pack("!BB", 162, 0)
  32. do_test(unsubscribe_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "Truncated packet, no mid")
  33. # Truncated packet, no properties
  34. unsubscribe_packet = struct.pack("!BBH", 162, 2, 1)
  35. do_test(unsubscribe_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "Truncated packet, no properties")
  36. # Truncated packet, with properties field, no topic
  37. unsubscribe_packet = struct.pack("!BBHH", 162, 4, 1, 0)
  38. do_test(unsubscribe_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "Truncated packet, with properties field, no topic")
  39. # Truncated packet, with properties field, empty topic
  40. unsubscribe_packet = struct.pack("!BBHHH", 162, 5, 1, 0, 0)
  41. do_test(unsubscribe_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "Truncated packet, with properties field, empty topic")
  42. # Bad topic
  43. unsubscribe_packet = mosq_test.gen_unsubscribe(topic="#/test/topic", mid=1, proto_ver=5)
  44. do_test(unsubscribe_packet, mqtt5_rc.MQTT_RC_MALFORMED_PACKET, "Bad topic")
  45. except mosq_test.TestError:
  46. pass
  47. finally:
  48. broker.terminate()
  49. broker.wait()
  50. (stdo, stde) = broker.communicate()
  51. if rc:
  52. print(stde.decode('utf-8'))
  53. exit(rc)