06-bridge-clean-session-core.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. #!/usr/bin/env python3
  2. # Test whether a broker handles cleansession and local_cleansession correctly on bridges
  3. #
# Test cases (with settings for broker A (edge)). The settings on broker B (core)
# are irrelevant, though you'll need persistence enabled to test, unless you
# can simulate network interruptions.
# Similarly, you'll need persistence on A, _purely_ to simplify the testing with a client.
#
# LCS = local_cleansession, CS = cleansession; "-(x)" means LCS is left unset
# and is expected to default to the value x of CS.
#
# t# | LCS  | CS | queued from (expected)
#    |      |    | A->B | B->A
# 1  | -(t) | t  | no   | no
# 2  | -(f) | f  | yes  | yes
# 3  | t    | t  | no   | no  (as per #1)
# 4  | t    | f  | no   | yes
# 5  | f    | t  | yes  | no
# 6  | f    | f  | yes  | yes (as per #2)
  16. #
  17. # Test setup is two (real) brokers, so that messages can be published and subscribed in both
  18. # directions, with two test clients, one at each end.
  19. # Disable on Travis for now, too unreliable
  20. import os
  21. if os.environ.get('TRAVIS') is not None:
  22. exit(0)
  23. from mosq_test_helper import *
  24. from collections import namedtuple
  25. # Normally we don't want tests to spew debug, but if you're working on a test, it's useful
  26. VERBOSE_TEST=False
  27. def tprint(*args, **kwargs):
  28. if VERBOSE_TEST:
  29. print(" ".join(map(str,args)), **kwargs)
  30. # this is our "A" broker
  31. def write_config_edge(filename, persistence_file, remote_port, listen_port, protocol_version, cs=False, lcs=None):
  32. with open(filename, 'w') as f:
  33. f.write("port %d\n" % (listen_port))
  34. f.write("allow_anonymous true\n")
  35. f.write("\n")
  36. f.write("persistence true\n")
  37. f.write("persistence_file %s\n" % (persistence_file))
  38. f.write("\n")
  39. f.write("connection bridge_sample\n")
  40. f.write("address 127.0.0.1:%d\n" % (remote_port))
  41. f.write("topic br_out/# out 1\n")
  42. f.write("topic br_in/# in 1\n")
  43. f.write("notifications false\n")
  44. # We need to ensure connections break fast enough to keep test times sane
  45. f.write("keepalive_interval 5\n")
  46. f.write("restart_timeout 5\n")
  47. f.write("cleansession %s\n" % ("true" if cs else "false"))
  48. # Ensure defaults are tested
  49. if lcs is not None:
  50. f.write("local_cleansession %s\n" % ("true" if lcs else "false"))
  51. f.write("bridge_protocol_version %s\n" % (protocol_version))
  52. # this is our "B" broker
  53. def write_config_core(filename, listen_port, persistence_file):
  54. with open(filename, 'w') as f:
  55. f.write("port %d\n" % (listen_port))
  56. f.write("allow_anonymous true\n")
  57. f.write("\n")
  58. f.write("persistence true\n")
  59. f.write("persistence_file %s\n" % (persistence_file))
  60. def do_test(proto_ver, cs, lcs=None):
  61. tprint("Running test with cs:%s, lcs: %s and proto: %d" % (cs, lcs, proto_ver))
  62. if proto_ver == 4:
  63. bridge_protocol = "mqttv311"
  64. else:
  65. bridge_protocol = "mqttv50"
  66. # Match default behaviour of broker
  67. expect_queued_ab = True
  68. expect_queued_ba = True
  69. if lcs is None:
  70. lcs = cs
  71. if lcs:
  72. expect_queued_ab = False
  73. if cs:
  74. expect_queued_ba = False
  75. (port_a_listen, port_b_listen) = mosq_test.get_port(2)
  76. conf_file_a = os.path.basename(__file__).replace('.py', '%d_a_edge.conf'%(port_a_listen))
  77. persistence_file_a = os.path.basename(__file__).replace('.py', '%d_a_edge.db'%(port_a_listen))
  78. write_config_edge(conf_file_a, persistence_file_a, port_b_listen, port_a_listen, bridge_protocol, cs=cs, lcs=lcs)
  79. conf_file_b = os.path.basename(__file__).replace('.py', '%d_b_core.conf'%(port_b_listen))
  80. persistence_file_b = os.path.basename(__file__).replace('.py', '%d_b_core.db'%(port_b_listen))
  81. write_config_core(conf_file_b, port_b_listen, persistence_file_b)
  82. AckedPair = namedtuple("AckedPair", "p ack")
  83. def make_conn(client_tag, proto, cs, session_present=False):
  84. client_id = socket.gethostname() + "." + client_tag
  85. keepalive = 60
  86. conn = mosq_test.gen_connect(client_id, keepalive=keepalive, clean_session=cs, proto_ver=proto, session_expiry=0 if cs else 5000)
  87. connack = mosq_test.gen_connack(rc=0, proto_ver=proto_ver, flags=1 if session_present else 0)
  88. return AckedPair(conn, connack)
  89. def make_sub(topic, mid, qos, proto):
  90. if proto_ver == 5:
  91. opts = mqtt5_opts.MQTT_SUB_OPT_NO_LOCAL | mqtt5_opts.MQTT_SUB_OPT_RETAIN_AS_PUBLISHED
  92. else:
  93. opts = 0
  94. sub = mosq_test.gen_subscribe(mid, topic, qos | opts, proto_ver=proto)
  95. suback = mosq_test.gen_suback(mid, qos, proto_ver=proto)
  96. return AckedPair(sub, suback)
  97. def make_pub(topic, mid, proto, qos=1, payload_tag="message", rc=-1):
  98. # Using the mid automatically makes it hard to verify messages that might have been retransmitted.
  99. # encourage users to put sequence numbers in topics instead....
  100. pub = mosq_test.gen_publish(topic, mid=mid, qos=qos, retain=False, payload=payload_tag + "-from-" + topic, proto_ver=proto)
  101. puback = mosq_test.gen_puback(mid, proto_ver=proto, reason_code=rc)
  102. return AckedPair(pub, puback)
  103. # Clients are testing messages in both directions, they need to be durable
  104. conn_a = make_conn("client_a_edge", proto_ver, False)
  105. conn_b = make_conn("client_b_core", proto_ver, False)
  106. # We expect session present when we reconnect
  107. reconn_a = make_conn("client_a_edge", proto_ver, False, session_present=True)
  108. reconn_b = make_conn("client_b_core", proto_ver, False, session_present=True)
  109. # remember, mids are from each broker's point of view, not the "world"
  110. sub_a = make_sub("br_in/#", qos=1, mid=1, proto=proto_ver)
  111. sub_b = make_sub("br_out/#", qos=1, mid=1, proto=proto_ver)
  112. pub_a1 = make_pub("br_out/test-queued1", mid=1, proto=proto_ver)
  113. pub_a2 = make_pub("br_out/test-queued2", mid=2, proto=proto_ver)
  114. pub_a3 = make_pub("br_out/test-queued3", mid=3, proto=proto_ver)
  115. pub_a3r = make_pub("br_out/test-queued3", mid=2, proto=proto_ver) # without queueing, there is no a2
  116. pub_b1 = make_pub("br_in/test-queued1", mid=1, proto=proto_ver)
  117. pub_b2 = make_pub("br_in/test-queued2", mid=2, proto=proto_ver)
  118. pub_b3 = make_pub("br_in/test-queued3", mid=3, proto=proto_ver)
  119. pub_b3r = make_pub("br_in/test-queued3", mid=2, proto=proto_ver) # without queueing, there is no b2
  120. success = False
  121. stde_a1 = stde_b1 = None
  122. try:
  123. # b must start first, as it's the destination of a
  124. broker_b = mosq_test.start_broker(filename=conf_file_b, port=port_b_listen, use_conf=True)
  125. broker_a = mosq_test.start_broker(filename=conf_file_a, port=port_a_listen, use_conf=True)
  126. client_a = mosq_test.do_client_connect(conn_a.p, conn_a.ack, port=port_a_listen)
  127. mosq_test.do_send_receive(client_a, sub_a.p, sub_a.ack, "suback_a")
  128. client_b = mosq_test.do_client_connect(conn_b.p, conn_b.ack, port=port_b_listen)
  129. mosq_test.do_send_receive(client_b, sub_b.p, sub_b.ack, "suback_b")
  130. mosq_test.do_send_receive(client_a, pub_a1.p, pub_a1.ack, "puback_a1")
  131. mosq_test.do_receive_send(client_b, pub_a1.p, pub_a1.ack, "a->b1 (b-side)")
  132. mosq_test.do_send_receive(client_b, pub_b1.p, pub_b1.ack, "puback_b1")
  133. mosq_test.do_receive_send(client_a, pub_b1.p, pub_b1.ack, "b->a1 (a-side)")
  134. tprint("Normal bi-dir bridging works. continuing")
  135. broker_b.terminate()
  136. broker_b.wait()
  137. (stdo_b1, stde_b1) = broker_b.communicate()
  138. # as we're _terminating_ the connections should close ~straight away
  139. tprint("terminated B", time.time())
  140. time.sleep(0.5)
  141. # should be queued (or not)
  142. mosq_test.do_send_receive(client_a, pub_a2.p, pub_a2.ack, "puback_a2")
  143. broker_b = mosq_test.start_broker(filename=conf_file_b, port=port_b_listen, use_conf=True)
  144. # client b needs to reconnect now!
  145. client_b = mosq_test.do_client_connect(reconn_b.p, reconn_b.ack, port=port_b_listen)
  146. tprint("client b reconnected after restarting broker b at ", time.time())
  147. # Need to sleep long enough to be sure of a re-connection...
  148. time.sleep(10) # yuck, this makes the test run for ages!
  149. # should go through
  150. tprint("(B should be alive again now!) sending (after reconn!) a3 at ", time.time())
  151. mosq_test.do_send_receive(client_a, pub_a3.p, pub_a3.ack, "puback_a3")
  152. if expect_queued_ab:
  153. tprint("1.expecting a->b queueing")
  154. mosq_test.do_receive_send(client_b, pub_a2.p, pub_a2.ack, "a->b_2")
  155. mosq_test.do_receive_send(client_b, pub_a3.p, pub_a3.ack, "a->b_3")
  156. else:
  157. tprint("not expecting a->b queueing")
  158. mosq_test.do_receive_send(client_b, pub_a3r.p, pub_a3r.ack, "a->b_3(r)")
  159. tprint("Stage 1 complete, repeating in other direction")
  160. # ok, now repeat in the other direction...
  161. broker_a.terminate()
  162. broker_a.wait()
  163. (stdo_a1, stde_a1) = broker_a.communicate()
  164. time.sleep(0.5)
  165. mosq_test.do_send_receive(client_b, pub_b2.p, pub_b2.ack, "puback_b2")
  166. broker_a = mosq_test.start_broker(filename=conf_file_a, port=port_a_listen, use_conf=True)
  167. # client a needs to reconnect now!
  168. client_a = mosq_test.do_client_connect(reconn_a.p, reconn_a.ack, port=port_a_listen)
  169. tprint("client A reconnected after restarting broker A at ", time.time())
  170. # Need to sleep long enough to be sure of a re-connection...
  171. time.sleep(10) # yuck, this makes the test run for ages!
  172. # should go through
  173. mosq_test.do_send_receive(client_b, pub_b3.p, pub_b3.ack, "puback_b3")
  174. if expect_queued_ba:
  175. tprint("2.expecting b->a queueueing")
  176. mosq_test.do_receive_send(client_a, pub_b2.p, pub_b2.ack, "b->a_2")
  177. mosq_test.do_receive_send(client_a, pub_b3.p, pub_b3.ack, "b->a_3")
  178. else:
  179. tprint("not expecting message b->a_2")
  180. mosq_test.do_receive_send(client_a, pub_b3r.p, pub_b3r.ack, "b->a_3(r)")
  181. success = True
  182. except mosq_test.TestError:
  183. pass
  184. finally:
  185. os.remove(conf_file_a)
  186. os.remove(conf_file_b)
  187. broker_a.terminate()
  188. broker_b.terminate()
  189. broker_a.wait()
  190. broker_b.wait()
  191. (stdo_a, stde_a) = broker_a.communicate()
  192. (stdo_b, stde_b) = broker_b.communicate()
  193. # Must be after terminating!
  194. try:
  195. os.remove(persistence_file_a)
  196. except FileNotFoundError:
  197. print("persistence file a didn't exist, skipping remove")
  198. try:
  199. os.remove(persistence_file_b)
  200. except FileNotFoundError:
  201. print("persistence file b didn't exist, skipping remove")
  202. if not success:
  203. print("Test failed, dumping broker A logs: ")
  204. if stde_a1:
  205. print(stde_a1.decode('utf-8'))
  206. print(stde_a.decode('utf-8'))
  207. print("Test failed, dumping broker B logs: ")
  208. if stde_b1:
  209. print(stde_b1.decode('utf-8'))
  210. print(stde_b.decode('utf-8'))
  211. exit(1)
  212. if sys.argv[3] == "True":
  213. cs = True
  214. elif sys.argv[3] == "False":
  215. cs = False
  216. else:
  217. raise ValueError("cs")
  218. if sys.argv[4] == "True":
  219. lcs = True
  220. elif sys.argv[4] == "False":
  221. lcs = False
  222. elif sys.argv[4] == "None":
  223. lcs = None
  224. else:
  225. raise ValueError("lcs")
  226. do_test(proto_ver=4, cs=cs, lcs=lcs)
  227. # FIXME - v5 clean session bridging doesn't work: see
  228. # https://github.com/eclipse/mosquitto/issues/1632
  229. #do_test(proto_ver=5, cs=cs, lcs=lcs)
  230. exit(0)