01-connect-uname-password-denied-no-will.py 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. #!/usr/bin/env python3
  2. # Test whether a connection is denied if it provides a correct username but
  3. # incorrect password. The client has a will, but it should not be sent. Check that.
  4. from mosq_test_helper import *
  5. def write_config(filename, port, pw_file):
  6. with open(filename, 'w') as f:
  7. f.write("port %d\n" % (port))
  8. f.write("password_file %s\n" % (pw_file))
  9. f.write("allow_anonymous false\n")
  10. def write_pwfile(filename):
  11. with open(filename, 'w') as f:
  12. # Username user, password password
  13. f.write('user:$6$vZY4TS+/HBxHw38S$vvjVFECzb8dyuu/mruD2QKTfdFn0WmKxbc+1TsdB0L8EdHk3v9JRmfjHd56+VaTnUcSZOZ/hzkdvWCtxlX7AUQ==\n')
  14. def do_test(proto_ver):
  15. pw_file = os.path.basename(__file__).replace('.py', '.pwfile')
  16. port = mosq_test.get_port()
  17. conf_file = os.path.basename(__file__).replace('.py', '.conf')
  18. write_config(conf_file, port, pw_file)
  19. write_pwfile(pw_file)
  20. rc = 1
  21. keepalive = 10
  22. connect1_packet = mosq_test.gen_connect("connect-uname-pwd-test", keepalive=keepalive, username="user", password="password", will_topic="will/test", will_payload=b"will msg", proto_ver=proto_ver)
  23. connack1_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
  24. mid = 1
  25. subscribe_packet = mosq_test.gen_subscribe(mid, topic="will/test", qos=0, proto_ver=proto_ver)
  26. suback_packet = mosq_test.gen_suback(mid, 0, proto_ver=proto_ver)
  27. connect2_packet = mosq_test.gen_connect("connect-uname-pwd-test", keepalive=keepalive, username="user", password="password9", proto_ver=proto_ver)
  28. if proto_ver == 5:
  29. connack2_packet = mosq_test.gen_connack(rc=mqtt5_rc.MQTT_RC_NOT_AUTHORIZED, proto_ver=proto_ver, properties=None)
  30. else:
  31. connack2_packet = mosq_test.gen_connack(rc=5, proto_ver=proto_ver)
  32. broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
  33. try:
  34. sock1 = mosq_test.do_client_connect(connect1_packet, connack1_packet, port=port)
  35. mosq_test.do_send_receive(sock1, subscribe_packet, suback_packet)
  36. sock2 = mosq_test.do_client_connect(connect2_packet, connack2_packet, port=port)
  37. sock2.close()
  38. # If we receive a will here, this is an error
  39. mosq_test.do_ping(sock1)
  40. sock1.close()
  41. rc = 0
  42. except mosq_test.TestError:
  43. pass
  44. finally:
  45. os.remove(conf_file)
  46. os.remove(pw_file)
  47. broker.terminate()
  48. broker.wait()
  49. (stdo, stde) = broker.communicate()
  50. if rc:
  51. print(stde.decode('utf-8'))
  52. print("proto_ver=%d" % (proto_ver))
  53. exit(rc)
  54. do_test(proto_ver=4)
  55. do_test(proto_ver=5)
  56. exit(0)