09-plugin-acl-change.py 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. #!/usr/bin/env python3
# A client connects with clean start=False and publishes with QoS 2 to a topic
# it has access to, but does not send the PUBREL. It closes the connection. The
# access to the topic is then revoked; the client reconnects and attempts to
# complete the flow. Is the publish allowed? It should not be.
  6. from mosq_test_helper import *
  7. def write_config(filename, port, plugin_ver):
  8. with open(filename, 'w') as f:
  9. f.write("listener %d\n" % (port))
  10. f.write("auth_plugin c/auth_plugin_acl_change.so\n")
  11. f.write("allow_anonymous true\n")
  12. def do_test(plugin_ver):
  13. port = mosq_test.get_port()
  14. conf_file = os.path.basename(__file__).replace('.py', '.conf')
  15. write_config(conf_file, port, plugin_ver)
  16. rc = 1
  17. connect1_packet = mosq_test.gen_connect("acl-change-test", clean_session=False)
  18. connack1_packet = mosq_test.gen_connack(rc=0)
  19. connect2_packet = mosq_test.gen_connect("acl-change-test", clean_session=False)
  20. connack2_packet = mosq_test.gen_connack(rc=0,flags=1)
  21. mid = 1
  22. subscribe_packet = mosq_test.gen_subscribe(mid, "#", 0)
  23. suback_packet = mosq_test.gen_suback(mid, 0)
  24. mid = 2
  25. publish1_packet = mosq_test.gen_publish("publish/topic", qos=2, mid=mid, payload="message")
  26. pubrec1_packet = mosq_test.gen_pubrec(mid)
  27. pubrel1_packet = mosq_test.gen_pubrel(mid)
  28. pubcomp1_packet = mosq_test.gen_pubcomp(mid)
  29. broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
  30. try:
  31. sock = mosq_test.do_client_connect(connect1_packet, connack1_packet, timeout=20, port=port)
  32. mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback 1")
  33. mosq_test.do_send_receive(sock, publish1_packet, pubrec1_packet, "pubrec")
  34. sock.close()
  35. # ACL has changed
  36. sock = mosq_test.do_client_connect(connect2_packet, connack2_packet, timeout=20, port=port)
  37. mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback 2")
  38. mosq_test.do_send_receive(sock, pubrel1_packet, pubcomp1_packet, "pubcomp")
  39. mosq_test.do_ping(sock)
  40. rc = 0
  41. sock.close()
  42. except mosq_test.TestError:
  43. pass
  44. except Exception as err:
  45. print(err)
  46. finally:
  47. os.remove(conf_file)
  48. broker.terminate()
  49. broker.wait()
  50. (stdo, stde) = broker.communicate()
  51. if rc:
  52. print(stde.decode('utf-8'))
  53. exit(rc)
  54. do_test(4)