mosq_test.py 26 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758
  1. import errno
  2. import os
  3. import socket
  4. import subprocess
  5. import struct
  6. import sys
  7. import time
  8. import mqtt5_props
  9. import __main__
  10. import atexit
  11. vg_index = 1
  12. vg_logfiles = []
  class TestError(Exception):
      """Raised when a received packet does not match the expected packet.

      The text is available both as ``self.message`` and through ``str(err)``.
      """

      def __init__(self, message="Mismatched packets"):
          # Hand the message to Exception so str(err) and tracebacks show it.
          super().__init__(message)
          self.message = message
  def start_broker(filename, cmd=None, port=0, use_conf=False, expect_fail=False, nolog=False):
      """Start a broker process and wait until it accepts TCP connections.

      filename    -- name of the calling test script; used to derive the
                     ``.conf`` file name and the valgrind log file name.
      cmd         -- explicit command line (list), or None to build one.
      port        -- port to poll; 0 selects the default of 1888.
      use_conf    -- force use of the ``.conf`` file derived from filename.
      expect_fail -- if true, return None instead of raising when the broker
                     never becomes reachable.
      nolog       -- if true, discard the broker's stderr instead of piping it.

      Returns the ``subprocess.Popen`` object once a connection to
      ``localhost:port`` succeeds. Raises IOError if the broker cannot be
      reached after 20 attempts and expect_fail is false.

      When the MOSQ_USE_VALGRIND environment variable is set, the command is
      wrapped in valgrind and the log file name is recorded in vg_logfiles.
      """
      global vg_index
      global vg_logfiles

      delay = 0.1
      conf_cmd = ['../../src/mosquitto', '-v', '-c', filename.replace('.py', '.conf')]

      if use_conf:
          cmd = conf_cmd
          if port == 0:
              port = 1888
      elif cmd is None and port != 0:
          cmd = ['../../src/mosquitto', '-v', '-p', str(port)]
      elif cmd is None:
          port = 1888
          cmd = conf_cmd
      elif port == 0:
          port = 1888

      if os.environ.get('MOSQ_USE_VALGRIND') is not None:
          logfile = filename+'.'+str(vg_index)+'.vglog'
          cmd = ['valgrind', '-q', '--trace-children=yes', '--leak-check=full', '--show-leak-kinds=all', '--log-file='+logfile] + cmd
          vg_logfiles.append(logfile)
          vg_index += 1
          # Poll more slowly when running under valgrind.
          delay = 1

      stderr_target = subprocess.DEVNULL if nolog else subprocess.PIPE
      broker = subprocess.Popen(cmd, stderr=stderr_target)

      try:
          for _ in range(20):
              time.sleep(delay)
              try:
                  conn = socket.create_connection(("localhost", port))
              except socket.error as err:
                  # Connection refused just means "not listening yet".
                  if err.errno != errno.ECONNREFUSED:
                      raise
              else:
                  conn.close()
                  return broker
      except Exception:
          # Do not leave an orphaned broker behind on an unexpected error.
          broker.terminate()
          broker.wait()
          raise

      if expect_fail:
          return None

      # Stop the broker first so communicate() cannot hang on a live process.
      broker.terminate()
      try:
          outs, errs = broker.communicate(timeout=1)
      except subprocess.TimeoutExpired:
          broker.kill()
          outs, errs = broker.communicate()
      print("FAIL: unable to start broker: %s" % errs)
      raise IOError("unable to start broker")
  def start_client(filename, cmd, env, port=1888):
      """Launch a client process with the port appended to its arguments.

      cmd must be a list; ValueError is raised when it is None. When the
      MOSQ_USE_VALGRIND environment variable is set the command is run under
      valgrind, logging to ``<filename>.vglog``. Returns the Popen object.
      """
      if cmd is None:
          raise ValueError

      under_valgrind = os.environ.get('MOSQ_USE_VALGRIND') is not None
      prefix = ['valgrind', '-q', '--log-file='+filename+'.vglog'] if under_valgrind else []
      argv = prefix + cmd + [str(port)]
      return subprocess.Popen(argv, env=env)
  def expect_packet(sock, name, expected):
      """Read one packet from sock and compare it with expected.

      Reads until len(expected) bytes have arrived (at least 1 byte is
      requested when expected is empty), the peer closes the connection, or
      the socket times out. A stream socket may deliver a packet in several
      pieces, so a single recv() is not enough.

      Returns True on a match; raises TestError otherwise. name is only used
      in the failure report printed by packet_matches().
      """
      if len(expected) > 0:
          rlen = len(expected)
      else:
          rlen = 1

      packet_recvd = b""
      try:
          while len(packet_recvd) < rlen:
              data = sock.recv(rlen - len(packet_recvd))
              if len(data) == 0:
                  # Peer closed the connection; compare what we have.
                  break
              packet_recvd += data
      except socket.timeout:
          # Report a mismatch on the partial data rather than a timeout.
          pass

      if packet_matches(name, packet_recvd, expected):
          return True
      else:
          raise TestError
  def _print_packet(label, packet):
      """Print packet decoded by to_string(), or raw if it cannot be decoded."""
      try:
          print(label + ": " + to_string(packet))
      except (struct.error, TypeError, ValueError, IndexError):
          # Malformed or unsupported packets must not abort the failure report.
          print("%s (not decoded, len=%d): %s" % (label, len(packet), packet))


  def packet_matches(name, recvd, expected):
      """Return True if recvd equals expected.

      On a mismatch a FAIL line naming the packet is printed, followed by the
      received and expected packets, and False is returned.
      """
      if recvd == expected:
          return True

      print("FAIL: Received incorrect "+name+".")
      _print_packet("Received", recvd)
      _print_packet("Expected", expected)
      return False
  def receive_unordered(sock, recv1_packet, recv2_packet, error_string):
      """Receive two packets that may arrive in either order.

      Reads one byte at a time until the combined length of both packets has
      arrived. Returns None if the data equals either ordering; otherwise
      reports the mismatch and raises ValueError(error_string). ValueError is
      also raised if the peer closes the connection early.
      """
      either_order = (recv1_packet + recv2_packet, recv2_packet + recv1_packet)
      wanted = len(either_order[0])

      buf = b''
      while len(buf) < wanted:
          chunk = sock.recv(1)
          if not chunk:
              raise ValueError(error_string)
          buf += chunk

      if buf in either_order:
          return

      packet_matches(error_string, buf, either_order[1])
      raise ValueError(error_string)
  def do_send_receive(sock, send_packet, receive_packet, error_string="send receive error"):
      """Send send_packet in full, then expect receive_packet in reply.

      Returns sock on success. Raises RuntimeError if a send() call writes
      nothing; a packet mismatch surfaces as whatever expect_packet() raises.
      """
      unsent = send_packet
      while len(unsent) > 0:
          written = sock.send(unsent)
          if written == 0:
              raise RuntimeError("socket connection broken")
          unsent = unsent[written:]

      if not expect_packet(sock, error_string, receive_packet):
          sock.close()
          raise ValueError
      return sock
  # Useful for mocking a client receiving (with ack) a qos1 publish
  def do_receive_send(sock, receive_packet, send_packet, error_string="receive send error"):
      """Expect receive_packet from sock, then send send_packet in full.

      Returns sock on success. Raises RuntimeError if a send() call writes
      nothing; a packet mismatch surfaces as whatever expect_packet() raises.
      """
      if not expect_packet(sock, error_string, receive_packet):
          sock.close()
          raise ValueError

      unsent = send_packet
      while len(unsent) > 0:
          written = sock.send(unsent)
          if written == 0:
              raise RuntimeError("socket connection broken")
          unsent = unsent[written:]
      return sock
  def client_connect_only(hostname="localhost", port=1888, timeout=10):
      """Open an IPv4 TCP connection and return the connected socket.

      timeout (seconds) is applied to the socket before connecting and stays
      in effect for later operations. If connecting fails the socket is
      closed before the exception propagates, so no descriptor is leaked.
      """
      sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      try:
          sock.settimeout(timeout)
          sock.connect((hostname, port))
      except Exception:
          sock.close()
          raise
      return sock
  def do_client_connect(connect_packet, connack_packet, hostname="localhost", port=1888, timeout=10, connack_error="connack"):
      """Connect to a broker, send connect_packet and expect connack_packet.

      Returns the connected socket on success. If the handshake fails, the
      socket created here is closed before the exception propagates, because
      the caller never receives a handle to it.
      """
      sock = client_connect_only(hostname, port, timeout)
      try:
          return do_send_receive(sock, connect_packet, connack_packet, connack_error)
      except Exception:
          sock.close()
          raise
  def remaining_length(packet):
      """Decode the MQTT remaining-length field and strip the fixed header.

      Returns ``(rest, rl)``, where rl is the decoded remaining length and
      rest is the packet with the command byte and length bytes removed. If
      no terminating length byte is found within the first five bytes of the
      packet, the packet is returned unchanged.
      """
      # The command byte plus at most four length bytes.
      header_len = min(5, len(packet))
      header = struct.unpack("!"+"B"*header_len, packet[:header_len])
      mult = 1
      rl = 0
      # Every byte after the command byte may be a length byte, including the
      # last one examined.
      for i in range(1, header_len):
          byte = header[i]
          rl += (byte & 127) * mult
          mult *= 128
          if byte & 128 == 0:
              # Top bit clear: this was the final length byte.
              packet = packet[i+1:]
              break
      return (packet, rl)
  def to_hex_string(packet):
      """Return the bytes of packet as space-terminated hex literals.

      For example ``b"\\x10\\xff"`` becomes ``"0x10 0xff "``. An empty packet
      gives an empty string.
      """
      # Iterating over bytes yields ints on Python 3, so no unpacking is needed.
      return "".join(hex(byte) + " " for byte in bytes(packet))
  def to_string(packet):
      """Return a human readable description of an MQTT packet.

      The packet type is taken from the top four bits of the first byte and
      the fields are decoded with fixed layouts. Packets that do not fit the
      expected layout make struct.unpack raise struct.error. An empty packet
      gives an empty string.
      """
      if len(packet) == 0:
          return ""

      def take_uint16(data):
          # Split a big-endian 16-bit integer off the front of data.
          return struct.unpack("!H" + str(len(data)-2) + 's', data)

      def take_string(data):
          # Split a 2-byte length-prefixed string off the front of data.
          (slen, data) = take_uint16(data)
          return struct.unpack("!" + str(slen)+'s' + str(len(data)-slen) + 's', data)

      packet0 = struct.unpack("!B%ds" % (len(packet)-1), bytes(packet))[0]
      cmd = packet0 & 0xF0

      if cmd == 0x00:
          # Reserved
          return "0x00"
      elif cmd == 0x10:
          # CONNECT
          (packet, rl) = remaining_length(packet)
          (slen, packet) = take_uint16(packet)
          pack_format = "!" + str(slen)+'sBBH' + str(len(packet)-slen-4) + 's'
          (protocol, proto_ver, flags, keepalive, packet) = struct.unpack(pack_format, packet)
          s = "CONNECT, proto="+str(protocol)+str(proto_ver)+", keepalive="+str(keepalive)
          if flags&2:
              s = s+", clean-session"
          else:
              s = s+", durable"

          (client_id, packet) = take_string(packet)
          s = s+", id="+str(client_id)

          if flags&4:
              (will_topic, packet) = take_string(packet)
              s = s+", will-topic="+str(will_topic)

              (will_message, packet) = take_string(packet)
              # will_message is bytes, so it must be converted like the other fields.
              s = s+", will-message="+str(will_message)

              s = s+", will-qos="+str((flags&24)>>3)
              s = s+", will-retain="+str((flags&32)>>5)

          if flags&128:
              (username, packet) = take_string(packet)
              s = s+", username="+str(username)

          if flags&64:
              (password, packet) = take_string(packet)
              s = s+", password="+str(password)

          if flags&1:
              s = s+", reserved=1"

          return s
      elif cmd == 0x20:
          # CONNACK
          (cmd, rl, resv, rc) = struct.unpack('!BBBB', packet)
          return "CONNACK, rl="+str(rl)+", res="+str(resv)+", rc="+str(rc)
      elif cmd == 0x30:
          # PUBLISH
          dup = (packet0 & 0x08)>>3
          qos = (packet0 & 0x06)>>1
          retain = (packet0 & 0x01)
          (packet, rl) = remaining_length(packet)
          (topic, packet) = take_string(packet)
          s = "PUBLISH, rl="+str(rl)+", topic="+str(topic)+", qos="+str(qos)+", retain="+str(retain)+", dup="+str(dup)
          if qos > 0:
              (mid, packet) = take_uint16(packet)
              s = s + ", mid="+str(mid)

          s = s + ", payload="+str(packet)
          return s
      elif cmd == 0x40:
          # PUBACK
          if len(packet) == 5:
              (cmd, rl, mid, reason_code) = struct.unpack('!BBHB', packet)
              return "PUBACK, rl="+str(rl)+", mid="+str(mid)+", reason_code="+str(reason_code)
          else:
              (cmd, rl, mid) = struct.unpack('!BBH', packet)
              return "PUBACK, rl="+str(rl)+", mid="+str(mid)
      elif cmd == 0x50:
          # PUBREC
          if len(packet) == 5:
              (cmd, rl, mid, reason_code) = struct.unpack('!BBHB', packet)
              return "PUBREC, rl="+str(rl)+", mid="+str(mid)+", reason_code="+str(reason_code)
          else:
              (cmd, rl, mid) = struct.unpack('!BBH', packet)
              return "PUBREC, rl="+str(rl)+", mid="+str(mid)
      elif cmd == 0x60:
          # PUBREL
          dup = (packet0 & 0x08)>>3
          (cmd, rl, mid) = struct.unpack('!BBH', packet)
          return "PUBREL, rl="+str(rl)+", mid="+str(mid)+", dup="+str(dup)
      elif cmd == 0x70:
          # PUBCOMP
          (cmd, rl, mid) = struct.unpack('!BBH', packet)
          return "PUBCOMP, rl="+str(rl)+", mid="+str(mid)
      elif cmd == 0x80:
          # SUBSCRIBE
          (packet, rl) = remaining_length(packet)
          (mid, packet) = take_uint16(packet)
          s = "SUBSCRIBE, rl="+str(rl)+", mid="+str(mid)
          topic_index = 0
          while len(packet) > 0:
              (tlen, packet) = take_uint16(packet)
              pack_format = "!" + str(tlen)+'sB' + str(len(packet)-tlen-1) + 's'
              (topic, qos, packet) = struct.unpack(pack_format, packet)
              s = s + ", topic"+str(topic_index)+"="+str(topic)+","+str(qos)
              # Number each topic so multiple subscriptions can be told apart.
              topic_index += 1
          return s
      elif cmd == 0x90:
          # SUBACK
          (packet, rl) = remaining_length(packet)
          (mid, packet) = take_uint16(packet)
          granted_qos = struct.unpack("!" + "B"*len(packet), packet)

          # List every granted qos, including the last one.
          s = "SUBACK, rl="+str(rl)+", mid="+str(mid)+", granted_qos="
          s = s + ", ".join(str(q) for q in granted_qos)
          return s
      elif cmd == 0xA0:
          # UNSUBSCRIBE
          (packet, rl) = remaining_length(packet)
          (mid, packet) = take_uint16(packet)
          s = "UNSUBSCRIBE, rl="+str(rl)+", mid="+str(mid)
          topic_index = 0
          while len(packet) > 0:
              (topic, packet) = take_string(packet)
              s = s + ", topic"+str(topic_index)+"="+str(topic)
              topic_index += 1
          return s
      elif cmd == 0xB0:
          # UNSUBACK
          (cmd, rl, mid) = struct.unpack('!BBH', packet)
          return "UNSUBACK, rl="+str(rl)+", mid="+str(mid)
      elif cmd == 0xC0:
          # PINGREQ
          (cmd, rl) = struct.unpack('!BB', packet)
          return "PINGREQ, rl="+str(rl)
      elif cmd == 0xD0:
          # PINGRESP
          (cmd, rl) = struct.unpack('!BB', packet)
          return "PINGRESP, rl="+str(rl)
      elif cmd == 0xE0:
          # DISCONNECT
          if len(packet) == 3:
              (cmd, rl, reason_code) = struct.unpack('!BBB', packet)
              return "DISCONNECT, rl="+str(rl)+", reason_code="+str(reason_code)
          else:
              (cmd, rl) = struct.unpack('!BB', packet)
              return "DISCONNECT, rl="+str(rl)
      elif cmd == 0xF0:
          # AUTH
          (cmd, rl) = struct.unpack('!BB', packet)
          return "AUTH, rl="+str(rl)
  def read_varint(sock, rl):
      """Read an MQTT variable-length integer from sock, one byte at a time.

      Returns ``(value, rl)``, where the returned rl is the rl passed in,
      reduced by the number of bytes consumed.
      """
      value = 0
      shift = 0
      while True:
          (octet,) = struct.unpack("!B", sock.recv(1))
          # Each byte carries 7 value bits, least significant group first.
          value += (octet & 127) << shift
          shift += 7
          rl -= 1
          if octet & 128 == 0x00:
              return (value, rl)
  def mqtt_read_string(sock, rl):
      """Read a 2-byte length-prefixed string from sock.

      Returns ``(payload, rl)``, where payload is the raw bytes of the string
      and the returned rl is the rl passed in, reduced by the bytes consumed.
      """
      (length,) = struct.unpack("!H", sock.recv(2))
      raw = sock.recv(length)
      # Unpacking with an exact size raises struct.error on a short read.
      (text,) = struct.unpack("!%ds" % (length), raw)
      return (text, rl - (2 + length))
  def read_publish(sock, proto_ver=4):
      """Read one PUBLISH packet from sock and return its payload as str.

      Raises ValueError if the next packet is not a PUBLISH. The topic, the
      message id (when qos > 0) and, for proto_ver 5, the properties are read
      and discarded. The payload is decoded as UTF-8.
      """
      cmd, = struct.unpack("!B", sock.recv(1))
      if cmd & 0xF0 != 0x30:
          raise ValueError("expected PUBLISH, got command byte 0x%02x" % cmd)

      qos = (cmd & 0x06) >> 1
      # The first value returned is the decoded remaining length.
      rl, t = read_varint(sock, 0)
      topic, rl = mqtt_read_string(sock, rl)

      if qos > 0:
          # The message id is two bytes, so two bytes leave the budget.
          sock.recv(2)
          rl -= 2

      if proto_ver == 5:
          proplen, rl = read_varint(sock, rl)
          sock.recv(proplen)
          rl -= proplen

      payload = sock.recv(rl).decode('utf-8')
      return payload
  def gen_fixed_hdr(command, remaining_length):
      """Return a fixed header: the command byte followed by the encoded length."""
      command_byte = struct.pack("B", command)
      length_bytes = pack_remaining_length(remaining_length)
      return command_byte + length_bytes
  def gen_variable_hdr(mid=None):
      """Return mid packed as a big-endian 16-bit integer, or b"" when mid is None."""
      return b"" if mid is None else struct.pack("!H", mid)
  def gen_connect(client_id, clean_session=True, keepalive=60, username=None, password=None, will_topic=None, will_qos=0, will_retain=False, will_payload=b"", proto_ver=4, connect_reserved=False, properties=b"", will_properties=b"", session_expiry=-1):
      """Build a CONNECT packet.

      Protocol versions whose low 7 bits are 3 (or proto_ver 0) use the
      "MQIsdp" protocol name; those whose low 7 bits are 4, and version 5,
      use "MQTT". Any other version raises ValueError.

      client_id, will_topic, username and password are str (or None) and are
      UTF-8 encoded; will_payload is packed as given. For proto_ver 5 a
      receive-maximum property of 20 is added when properties is empty, and a
      session-expiry property is added when session_expiry is not -1.
      """
      def prefixed(data):
          # 2-byte big-endian length followed by the bytes themselves.
          return struct.pack("!H"+str(len(data))+"s", len(data), data)

      legacy_proto = (proto_ver&0x7F) == 3 or proto_ver == 0
      if legacy_proto:
          remaining_length = 12
      elif (proto_ver&0x7F) == 4 or proto_ver == 5:
          remaining_length = 10
      else:
          raise ValueError

      if client_id is not None:
          client_id = client_id.encode("utf-8")
          remaining_length += 2+len(client_id)
      else:
          remaining_length += 2

      connect_flags = 0
      if connect_reserved:
          connect_flags |= 0x01
      if clean_session:
          connect_flags |= 0x02

      if proto_ver == 5:
          if properties == b"":
              properties += mqtt5_props.gen_uint16_prop(mqtt5_props.PROP_RECEIVE_MAXIMUM, 20)
          if session_expiry != -1:
              properties += mqtt5_props.gen_uint32_prop(mqtt5_props.PROP_SESSION_EXPIRY_INTERVAL, session_expiry)
          properties = mqtt5_props.prop_finalise(properties)
          remaining_length += len(properties)

      if will_topic is not None:
          will_topic = will_topic.encode("utf-8")
          remaining_length += 2+len(will_topic) + 2+len(will_payload)
          connect_flags |= 0x04 | ((will_qos&0x03) << 3)
          if will_retain:
              connect_flags |= 32
          if proto_ver == 5:
              will_properties = mqtt5_props.prop_finalise(will_properties)
              remaining_length += len(will_properties)

      if username is not None:
          username = username.encode("utf-8")
          remaining_length += 2+len(username)
          connect_flags |= 0x80
          # A password is only emitted together with a username.
          if password is not None:
              password = password.encode("utf-8")
              connect_flags |= 0x40
              remaining_length += 2+len(password)

      rl = pack_remaining_length(remaining_length)
      pieces = [struct.pack("!B"+str(len(rl))+"s", 0x10, rl)]

      if legacy_proto:
          pieces.append(struct.pack("!H6sBBH", len(b"MQIsdp"), b"MQIsdp", proto_ver, connect_flags, keepalive))
      else:
          pieces.append(struct.pack("!H4sBBH", len(b"MQTT"), b"MQTT", proto_ver, connect_flags, keepalive))

      if proto_ver == 5:
          pieces.append(properties)

      if client_id is not None:
          pieces.append(prefixed(bytes(client_id)))
      else:
          pieces.append(struct.pack("!H", 0))

      if will_topic is not None:
          pieces.append(will_properties)
          pieces.append(prefixed(will_topic))
          if len(will_payload) > 0:
              pieces.append(prefixed(will_payload))
          else:
              pieces.append(struct.pack("!H", 0))

      if username is not None:
          pieces.append(prefixed(username))
          if password is not None:
              pieces.append(prefixed(password))

      return b"".join(pieces)
  def gen_connack(flags=0, rc=0, proto_ver=4, properties=b"", property_helper=True):
      """Build a CONNACK packet.

      For proto_ver 5 with property_helper enabled, a topic-alias-maximum
      property of 10 is put before the given properties and a
      receive-maximum property of 20 after them; passing properties=None
      yields an empty property set instead. Other protocol versions give the
      fixed four byte packet.
      """
      if proto_ver == 5:
          if property_helper:
              if properties is not None:
                  properties = mqtt5_props.gen_uint16_prop(mqtt5_props.PROP_TOPIC_ALIAS_MAXIMUM, 10) \
                          + properties + mqtt5_props.gen_uint16_prop(mqtt5_props.PROP_RECEIVE_MAXIMUM, 20)
              else:
                  properties = b""
          properties = mqtt5_props.prop_finalise(properties)

          # Encode the length as a variable-length integer so that property
          # sets longer than 125 bytes still produce a valid header.
          return struct.pack('!B', 32) + pack_remaining_length(2+len(properties)) \
                  + struct.pack('!BB', flags, rc) + properties

      return struct.pack('!BBBB', 32, 2, flags, rc)
  def gen_publish(topic, qos, payload=None, retain=False, dup=False, mid=0, proto_ver=4, properties=b""):
      """Build a PUBLISH packet.

      topic is a str and is UTF-8 encoded. payload may be None (empty
      payload), a str (UTF-8 encoded) or bytes (used as is). mid is only
      included when qos > 0, and properties only for proto_ver 5.
      """
      topic = topic.encode("utf-8")

      if payload is None:
          payload = b""
      elif isinstance(payload, str):
          payload = payload.encode("utf-8")

      body = struct.pack("!H", len(topic)) + topic
      if qos > 0:
          body += struct.pack("!H", mid)
      if proto_ver == 5:
          # NOTE(review): the original carried the warning "This will break if
          # len(properties) > 127" -- confirm against mqtt5_props.prop_finalise.
          body += mqtt5_props.prop_finalise(properties)
      body += payload

      cmd = 48 | (qos<<1)
      if retain:
          cmd = cmd + 1
      if dup:
          cmd = cmd + 8

      # The remaining length is derived from the assembled body so the two
      # cannot drift apart.
      return struct.pack("!B", cmd) + pack_remaining_length(len(body)) + body
  480. def _gen_command_with_mid(cmd, mid, proto_ver=4, reason_code=-1, properties=None):
  481. if proto_ver == 5 and (reason_code != -1 or properties is not None):
  482. if reason_code == -1:
  483. reason_code = 0
  484. if properties is None:
  485. return struct.pack('!BBHB', cmd, 3, mid, reason_code)
  486. elif properties == "":
  487. return struct.pack('!BBHBB', cmd, 4, mid, reason_code, 0)
  488. else:
  489. properties = mqtt5_props.prop_finalise(properties)
  490. pack_format = "!BBHB"+str(len(properties))+"s"
  491. return struct.pack(pack_format, cmd, 2+1+len(properties), mid, reason_code, properties)
  492. else:
  493. return struct.pack('!BBH', cmd, 2, mid)
  def gen_puback(mid, proto_ver=4, reason_code=-1, properties=None):
      """Build a PUBACK packet (command byte 64) for message id mid."""
      puback_cmd = 64
      return _gen_command_with_mid(puback_cmd, mid, proto_ver=proto_ver,
              reason_code=reason_code, properties=properties)
  def gen_pubrec(mid, proto_ver=4, reason_code=-1, properties=None):
      """Build a PUBREC packet (command byte 80) for message id mid."""
      pubrec_cmd = 80
      return _gen_command_with_mid(pubrec_cmd, mid, proto_ver=proto_ver,
              reason_code=reason_code, properties=properties)
  def gen_pubrel(mid, dup=False, proto_ver=4, reason_code=-1, properties=None):
      """Build a PUBREL packet for message id mid.

      The command byte is 96+2, with 8 added when dup is true.
      """
      cmd = 96+2
      if dup:
          cmd += 8
      return _gen_command_with_mid(cmd, mid, proto_ver=proto_ver,
              reason_code=reason_code, properties=properties)
  def gen_pubcomp(mid, proto_ver=4, reason_code=-1, properties=None):
      """Build a PUBCOMP packet (command byte 112) for message id mid."""
      pubcomp_cmd = 112
      return _gen_command_with_mid(pubcomp_cmd, mid, proto_ver=proto_ver,
              reason_code=reason_code, properties=properties)
  def gen_subscribe(mid, topic, qos, cmd=130, proto_ver=4, properties=b""):
      """Build a SUBSCRIBE packet containing a single topic filter.

      topic is a str and is UTF-8 encoded. For proto_ver 5 the properties are
      finalised and placed between the message id and the topic; empty
      properties are written as a single zero byte.
      """
      topic = topic.encode("utf-8")
      header = struct.pack("!B", cmd)

      if proto_ver != 5:
          length = pack_remaining_length(2+2+len(topic)+1)
          body = struct.pack("!HH"+str(len(topic))+"sB", mid, len(topic), topic, qos)
          return header + length + body

      if properties == b"":
          length = pack_remaining_length(2+1+2+len(topic)+1)
          body = struct.pack("!HBH"+str(len(topic))+"sB", mid, 0, len(topic), topic, qos)
          return header + length + body

      properties = mqtt5_props.prop_finalise(properties)
      length = pack_remaining_length(2+1+2+len(topic)+len(properties))
      body_format = "!H"+str(len(properties))+"s"+"H"+str(len(topic))+"sB"
      body = struct.pack(body_format, mid, properties, len(topic), topic, qos)
      return header + length + body
  def gen_suback(mid, qos, proto_ver=4):
      """Build a SUBACK packet carrying a single granted qos / reason code.

      For proto_ver 5 a zero property-length byte is inserted after the
      message id.
      """
      if proto_ver != 5:
          return struct.pack('!BBHB', 144, 2+1, mid, qos)
      return struct.pack('!BBHBB', 144, 2+1+1, mid, 0, qos)
  def gen_unsubscribe(mid, topic, cmd=162, proto_ver=4, properties=b""):
      """Build an UNSUBSCRIBE packet containing a single topic filter.

      topic is a str and is UTF-8 encoded. For proto_ver 5 the properties
      follow the message id; empty properties are written as a single zero
      byte.
      """
      topic = topic.encode("utf-8")

      body = struct.pack("!H", mid)
      if proto_ver == 5:
          if properties == b"":
              body += struct.pack("!B", 0)
          else:
              # Finalised properties are appended as they are, matching
              # gen_subscribe() and the other v5 generators in this file. The
              # previous code added an extra length byte in front of them.
              body += mqtt5_props.prop_finalise(properties)
      body += struct.pack("!H"+str(len(topic))+"s", len(topic), topic)

      # Variable-length encoding keeps the header valid beyond 127 bytes.
      return struct.pack("!B", cmd) + pack_remaining_length(len(body)) + body
  def gen_unsubscribe_multiple(mid, topics, proto_ver=4):
      """Build an UNSUBSCRIBE packet (command byte 162) for several topics.

      topics is an iterable of str, each UTF-8 encoded and length-prefixed.
      For proto_ver 5 a zero property-length byte follows the message id.
      """
      body = struct.pack("!H", mid)
      if proto_ver == 5:
          body += struct.pack("!B", 0)

      for t in topics:
          t = t.encode("utf-8")
          body += struct.pack("!H"+str(len(t))+"s", len(t), t)

      # Variable-length encoding: a single length byte is wrong once the
      # topics add up to more than 127 bytes.
      return struct.pack("!B", 162) + pack_remaining_length(len(body)) + body
  def gen_unsuback(mid, reason_code=0, proto_ver=4):
      """Build an UNSUBACK packet.

      For proto_ver 5, reason_code may be a single value or a list of values;
      a zero property-length byte comes before the reason codes. Other
      protocol versions give the plain four byte packet and ignore reason_code.
      """
      if proto_ver != 5:
          return struct.pack('!BBH', 176, 2, mid)

      if not isinstance(reason_code, list):
          return struct.pack('!BBHBB', 176, 4, mid, 0, reason_code)

      packet = struct.pack('!BBHB', 176, 3+len(reason_code), mid, 0)
      for code in reason_code:
          packet += struct.pack('B', code)
      return packet
  def gen_pingreq():
      """Build a PINGREQ packet: command byte 192 with a zero remaining length."""
      pingreq_cmd = 192
      return struct.pack('!BB', pingreq_cmd, 0)
  def gen_pingresp():
      """Build a PINGRESP packet: command byte 208 with a zero remaining length."""
      pingresp_cmd = 208
      return struct.pack('!BB', pingresp_cmd, 0)
  574. def _gen_short(cmd, reason_code=-1, proto_ver=5, properties=None):
  575. if proto_ver == 5 and (reason_code != -1 or properties is not None):
  576. if reason_code == -1:
  577. reason_code = 0
  578. if properties is None:
  579. return struct.pack('!BBB', cmd, 1, reason_code)
  580. elif properties == "":
  581. return struct.pack('!BBBB', cmd, 2, reason_code, 0)
  582. else:
  583. properties = mqtt5_props.prop_finalise(properties)
  584. return struct.pack("!BBB", cmd, 1+len(properties), reason_code) + properties
  585. else:
  586. return struct.pack('!BB', cmd, 0)
  def gen_disconnect(reason_code=-1, proto_ver=4, properties=None):
      """Build a DISCONNECT packet (command byte 0xE0)."""
      disconnect_cmd = 0xE0
      return _gen_short(disconnect_cmd, reason_code=reason_code,
              proto_ver=proto_ver, properties=properties)
  def gen_auth(reason_code=-1, properties=None):
      """Build an AUTH packet (command byte 0xF0); always uses protocol version 5."""
      auth_cmd = 0xF0
      return _gen_short(auth_cmd, reason_code=reason_code, proto_ver=5,
              properties=properties)
  def pack_remaining_length(remaining_length):
      """Encode remaining_length as an MQTT variable-length integer.

      Each output byte carries 7 bits of the value, least significant group
      first, with the top bit set on every byte except the last. No upper
      bound is enforced. Raises ValueError for a negative length, which would
      otherwise never reach the terminating condition.
      """
      if remaining_length < 0:
          raise ValueError("remaining length must not be negative")

      encoded = b""
      while True:
          remaining_length, digit = divmod(remaining_length, 128)
          # If there are more digits to encode, set the top bit of this digit
          if remaining_length > 0:
              digit = digit | 0x80

          encoded = encoded + struct.pack("!B", digit)
          if remaining_length == 0:
              return encoded
  def get_port(count=1):
      """Return the port(s) a test should use, taken from the command line.

      With count == 1 an int is returned: sys.argv[1] when exactly one
      argument was given, otherwise 1888. For any other count a tuple of
      count ports is returned: the first count arguments when at least that
      many were given, otherwise consecutive ports starting at 1888.
      """
      argv = sys.argv

      if count == 1:
          return int(argv[1]) if len(argv) == 2 else 1888

      if len(argv) >= 1+count:
          return tuple(int(argv[1+i]) for i in range(0, count))
      return tuple(range(1888, 1888+count))
  def get_lib_port():
      """Return sys.argv[2] as an int when exactly two arguments were given, else 1888."""
      argv = sys.argv
      return int(argv[2]) if len(argv) == 3 else 1888
  def do_ping(sock, error_string="pingresp"):
      """Send a PINGREQ on sock and expect a PINGRESP in reply. Returns None."""
      request = gen_pingreq()
      response = gen_pingresp()
      do_send_receive(sock, request, response, error_string)
  @atexit.register
  def test_cleanup():
      """Remove empty valgrind log files when the interpreter exits.

      Does nothing unless the MOSQ_USE_VALGRIND environment variable is set.
      Files that cannot be inspected or removed are silently skipped.
      """
      global vg_logfiles

      if os.environ.get('MOSQ_USE_VALGRIND') is None:
          return

      for logfile in vg_logfiles:
          try:
              # A zero-size log is deleted; a non-empty one is left in place.
              if os.stat(logfile).st_size == 0:
                  os.remove(logfile)
          except OSError:
              pass