01-connect-uname-or-anon.py 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970
  1. #!/usr/bin/env python3
  2. # Test whether an anonymous connection is correctly denied.
  3. from mosq_test_helper import *
  4. def write_config(filename, port, allow_anonymous, password_file):
  5. with open(filename, 'w') as f:
  6. f.write("listener %d\n" % (port))
  7. if allow_anonymous:
  8. f.write("allow_anonymous true\n")
  9. else:
  10. f.write("allow_anonymous false\n")
  11. if password_file:
  12. f.write("password_file %s\n" % (filename.replace('.conf', '.pwfile')))
  13. def do_test(allow_anonymous, password_file, username, expect_success):
  14. port = mosq_test.get_port()
  15. conf_file = os.path.basename(__file__).replace('.py', '.conf')
  16. write_config(conf_file, port, allow_anonymous, password_file)
  17. broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
  18. try:
  19. for proto_ver in [4, 5]:
  20. rc = 1
  21. keepalive = 10
  22. if username:
  23. connect_packet = mosq_test.gen_connect("connect-test-%d" % (proto_ver), keepalive=keepalive, proto_ver=proto_ver, username="user", password="password")
  24. else:
  25. connect_packet = mosq_test.gen_connect("connect-test-%d" % (proto_ver), keepalive=keepalive, proto_ver=proto_ver)
  26. if proto_ver == 5:
  27. if expect_success == True:
  28. connack_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
  29. else:
  30. connack_packet = mosq_test.gen_connack(rc=mqtt5_rc.MQTT_RC_NOT_AUTHORIZED, proto_ver=proto_ver, properties=None)
  31. else:
  32. if expect_success == True:
  33. connack_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
  34. else:
  35. connack_packet = mosq_test.gen_connack(rc=5, proto_ver=proto_ver)
  36. sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port)
  37. sock.close()
  38. rc = 0
  39. except mosq_test.TestError:
  40. pass
  41. finally:
  42. os.remove(conf_file)
  43. broker.terminate()
  44. broker.wait()
  45. (stdo, stde) = broker.communicate()
  46. if rc:
  47. print(stde.decode('utf-8'))
  48. print("proto_ver=%d, allow_anonymous=%d, password_file=%d, username=%d" % (proto_ver, allow_anonymous, password_file, username))
  49. exit(rc)
  50. do_test(allow_anonymous=True, password_file=True, username=True, expect_success=True)
  51. do_test(allow_anonymous=True, password_file=True, username=False, expect_success=True)
  52. do_test(allow_anonymous=True, password_file=False, username=True, expect_success=True)
  53. do_test(allow_anonymous=True, password_file=False, username=False, expect_success=True)
  54. do_test(allow_anonymous=False, password_file=True, username=True, expect_success=True)
  55. do_test(allow_anonymous=False, password_file=True, username=False, expect_success=False)
  56. do_test(allow_anonymous=False, password_file=False, username=True, expect_success=False)
  57. do_test(allow_anonymous=False, password_file=False, username=False, expect_success=False)
  58. exit(0)