01-connect-windows-line-endings.py 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. #!/usr/bin/env python3
# Test whether config files with Windows line endings (CRLF) are accepted.
# This just connects anonymously - if the config file causes a failure, the
# broker won't start, so the connection would fail.
  5. from mosq_test_helper import *
  6. def write_config(filename, port):
  7. with open(filename, 'w') as f:
  8. f.write("listener %d\r\n" % (port))
  9. f.write("allow_anonymous true\r\n")
  10. def do_test():
  11. port = mosq_test.get_port()
  12. conf_file = os.path.basename(__file__).replace('.py', '.conf')
  13. write_config(conf_file, port)
  14. broker = mosq_test.start_broker(filename=os.path.basename(__file__), use_conf=True, port=port)
  15. try:
  16. for proto_ver in [4, 5]:
  17. rc = 1
  18. keepalive = 10
  19. connect_packet = mosq_test.gen_connect("connect-anon-test-%d" % (proto_ver), keepalive=keepalive, proto_ver=proto_ver)
  20. if proto_ver == 5:
  21. connack_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
  22. else:
  23. connack_packet = mosq_test.gen_connack(rc=0, proto_ver=proto_ver)
  24. sock = mosq_test.do_client_connect(connect_packet, connack_packet, port=port)
  25. sock.close()
  26. rc = 0
  27. except mosq_test.TestError:
  28. pass
  29. finally:
  30. os.remove(conf_file)
  31. broker.terminate()
  32. broker.wait()
  33. (stdo, stde) = broker.communicate()
  34. if rc:
  35. print(stde.decode('utf-8'))
  36. print("proto_ver=%d" % (proto_ver))
  37. exit(rc)
# Run the test when the script is executed. do_test() calls exit(rc) itself
# with a non-zero status on failure, so reaching exit(0) means it returned
# without detecting a failure.
do_test()
exit(0)