#!/usr/bin/env python3

# Test whether message parameters are passed to the plugin acl check function.

# mosq_test_helper is a project-local module; the rest of this script relies on
# its star import to provide names such as `mosq_test` and `os`.
from mosq_test_helper import *

  4. def write_config(filename, port):
  5. with open(filename, 'w') as f:
  6. f.write("port %d\n" % (port))
  7. f.write("auth_plugin c/auth_plugin_context_params.so\n")
  8. f.write("allow_anonymous false\n")
  9. port = mosq_test.get_port()
  10. conf_file = os.path.basename(__file__).replace('.py', '.conf')
  11. write_config(conf_file, port)
  12. rc = 1
  13. connect_packet = mosq_test.gen_connect("client-params-test", keepalive=42, username="client-username")
  14. connack_packet = mosq_test.gen_connack(rc=0)
  15. mid = 2
  16. subscribe_packet = mosq_test.gen_subscribe(mid, "param/topic", 1)
  17. suback_packet = mosq_test.gen_suback(mid, 1)
  18. mid = 3
  19. publish_packet = mosq_test.gen_publish(topic="param/topic", qos=1, payload="payload contents", retain=1, mid=mid)
  20. puback_packet = mosq_test.gen_puback(mid)
  21. mid = 1
  22. publish_packet_recv = mosq_test.gen_publish(topic="param/topic", qos=1, payload="payload contents", retain=0, mid=mid)
  23. broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
  24. try:
  25. sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=20, port=port)
  26. rc = 0
  27. sock.close()
  28. except mosq_test.TestError:
  29. pass
  30. finally:
  31. os.remove(conf_file)
  32. broker.terminate()
  33. broker.wait()
  34. (stdo, stde) = broker.communicate()
  35. if rc:
  36. print(stde.decode('utf-8'))
  37. exit(rc)