14-dynsec-disable-client.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. #!/usr/bin/env python3
  2. from mosq_test_helper import *
  3. import json
  4. import shutil
  5. def write_config(filename, port):
  6. with open(filename, 'w') as f:
  7. f.write("listener %d\n" % (port))
  8. f.write("allow_anonymous true\n")
  9. f.write("plugin ../../plugins/dynamic-security/mosquitto_dynamic_security.so\n")
  10. f.write("plugin_opt_config_file %d/dynamic-security.json\n" % (port))
  11. def command_check(sock, command_payload, expected_response):
  12. command_packet = mosq_test.gen_publish(topic="$CONTROL/dynamic-security/v1", qos=0, payload=json.dumps(command_payload))
  13. sock.send(command_packet)
  14. response = json.loads(mosq_test.read_publish(sock))
  15. if response != expected_response:
  16. print(expected_response)
  17. print(response)
  18. raise ValueError(response)
  19. port = mosq_test.get_port()
  20. conf_file = os.path.basename(__file__).replace('.py', '.conf')
  21. write_config(conf_file, port)
  22. add_client_command = { "commands": [{
  23. "command": "createClient", "username": "user_one",
  24. "password": "password", "clientid": "cid",
  25. "textname": "Name", "textdescription": "Description",
  26. "rolename": "", "correlationData": "2" }]
  27. }
  28. add_client_response = {'responses': [{'command': 'createClient', 'correlationData': '2'}]}
  29. add_client_repeat_response = {'responses':[{"command":"createClient","error":"Client already exists", "correlationData":"2"}]}
  30. get_client_command = { "commands": [{
  31. "command": "getClient", "username": "user_one"}]}
  32. get_client_response1 = {'responses':[{'command': 'getClient', 'data': {'client': {'username': 'user_one', 'clientid': 'cid',
  33. 'textname': 'Name', 'textdescription': 'Description', 'groups': [], 'roles': []}}}]}
  34. get_client_response2 = {'responses':[{'command': 'getClient', 'data': {'client': {'username': 'user_one', 'clientid': 'cid',
  35. 'textname': 'Name', 'textdescription': 'Description', 'disabled':True, 'groups': [], 'roles': []}}}]}
  36. disable_client_command = { "commands": [{
  37. "command": "disableClient", "username": "user_one"}]}
  38. disable_client_response = {'responses':[{'command': 'disableClient'}]}
  39. enable_client_command = { "commands": [{
  40. "command": "enableClient", "username": "user_one"}]}
  41. enable_client_response = {'responses':[{'command': 'enableClient'}]}
  42. rc = 1
  43. keepalive = 10
  44. connect_packet = mosq_test.gen_connect("ctrl-test", keepalive=keepalive, username="admin", password="admin")
  45. connack_packet = mosq_test.gen_connack(rc=0)
  46. client_connect_packet = mosq_test.gen_connect("cid", keepalive=keepalive, username="user_one", password="password")
  47. client_connack_packet1 = mosq_test.gen_connack(rc=5)
  48. client_connack_packet2 = mosq_test.gen_connack(rc=0)
  49. mid = 2
  50. subscribe_packet = mosq_test.gen_subscribe(mid, "$CONTROL/dynamic-security/#", 1)
  51. suback_packet = mosq_test.gen_suback(mid, 1)
  52. try:
  53. os.mkdir(str(port))
  54. shutil.copyfile("dynamic-security-init.json", "%d/dynamic-security.json" % (port))
  55. except FileExistsError:
  56. pass
  57. broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
  58. try:
  59. sock = mosq_test.do_client_connect(connect_packet, connack_packet, timeout=5, port=port)
  60. mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
  61. # Add client
  62. command_check(sock, add_client_command, add_client_response)
  63. # Get client
  64. command_check(sock, get_client_command, get_client_response1)
  65. # Disable client
  66. command_check(sock, disable_client_command, disable_client_response)
  67. # Get client - should be disabled
  68. command_check(sock, get_client_command, get_client_response2)
  69. # Try to log in - should fail
  70. client_sock = mosq_test.do_client_connect(client_connect_packet, client_connack_packet1, timeout=5, port=port)
  71. # Enable client
  72. command_check(sock, enable_client_command, enable_client_response)
  73. # Get client - should be enabled
  74. command_check(sock, get_client_command, get_client_response1)
  75. # Try to log in - should succeed
  76. client_sock = mosq_test.do_client_connect(client_connect_packet, client_connack_packet2, timeout=5, port=port)
  77. client_sock.close()
  78. rc = 0
  79. sock.close()
  80. except mosq_test.TestError:
  81. pass
  82. finally:
  83. os.remove(conf_file)
  84. try:
  85. os.remove(f"{port}/dynamic-security.json")
  86. except FileNotFoundError:
  87. pass
  88. os.rmdir(f"{port}")
  89. broker.terminate()
  90. broker.wait()
  91. (stdo, stde) = broker.communicate()
  92. if rc:
  93. print(stde.decode('utf-8'))
  94. exit(rc)