#!/usr/bin/env python3
# 09-acl-empty-file.py
# Test for CVE-2018-xxxxx
from mosq_test_helper import *
import signal
import sys
  5. def write_config(filename, port, per_listener):
  6. with open(filename, 'w') as f:
  7. f.write("per_listener_settings %s\n" % (per_listener))
  8. f.write("port %d\n" % (port))
  9. f.write("allow_anonymous true\n")
  10. f.write("acl_file %s\n" % (filename.replace('.conf', '.acl')))
  11. def write_acl(filename):
  12. with open(filename, 'w') as f:
  13. f.write('#comment\n')
  14. f.write('\n')
  15. def do_test(port, per_listener):
  16. conf_file = os.path.basename(__file__).replace('.py', '.conf')
  17. write_config(conf_file, port, per_listener)
  18. acl_file = os.path.basename(__file__).replace('.py', '.acl')
  19. write_acl(acl_file)
  20. rc = 1
  21. keepalive = 60
  22. connect_packet = mosq_test.gen_connect("acl-check", keepalive=keepalive)
  23. connack_packet = mosq_test.gen_connack(rc=0)
  24. mid = 1
  25. publish_packet = mosq_test.gen_publish("test/topic", qos=0, payload="message")
  26. subscribe_packet = mosq_test.gen_subscribe(mid, "test/topic", 0)
  27. suback_packet = mosq_test.gen_suback(mid, 0)
  28. broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
  29. try:
  30. sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port)
  31. mosq_test.do_send_receive(sock, subscribe_packet, suback_packet, "suback")
  32. sock.send(publish_packet)
  33. # If we receive the message, this will fail.
  34. mosq_test.do_ping(sock)
  35. rc = 0
  36. sock.close()
  37. except mosq_test.TestError:
  38. pass
  39. finally:
  40. os.remove(conf_file)
  41. os.remove(acl_file)
  42. broker.terminate()
  43. broker.wait()
  44. (stdo, stde) = broker.communicate()
  45. if rc:
  46. print(stde.decode('utf-8'))
  47. exit(rc)
  48. port = mosq_test.get_port()
  49. do_test(port, "false")
  50. do_test(port, "true")