handle_pubackcomp.c 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163
  1. /*
  2. Copyright (c) 2009-2020 Roger Light <roger@atchoo.org>
  3. All rights reserved. This program and the accompanying materials
  4. are made available under the terms of the Eclipse Public License 2.0
  5. and Eclipse Distribution License v1.0 which accompany this distribution.
  6. The Eclipse Public License is available at
  7. https://www.eclipse.org/legal/epl-2.0/
  8. and the Eclipse Distribution License is available at
  9. http://www.eclipse.org/org/documents/edl-v10.php.
  10. SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
  11. Contributors:
  12. Roger Light - initial implementation and documentation.
  13. */
  14. #include "config.h"
  15. #include <assert.h>
  16. #include <stdio.h>
  17. #include <string.h>
  18. #ifdef WITH_BROKER
  19. # include "mosquitto_broker_internal.h"
  20. #endif
  21. #include "mosquitto.h"
  22. #include "logging_mosq.h"
  23. #include "memory_mosq.h"
  24. #include "messages_mosq.h"
  25. #include "mqtt_protocol.h"
  26. #include "net_mosq.h"
  27. #include "packet_mosq.h"
  28. #include "read_handle.h"
  29. #include "send_mosq.h"
  30. #include "util_mosq.h"
  31. int handle__pubackcomp(struct mosquitto *mosq, const char *type)
  32. {
  33. uint8_t reason_code = 0;
  34. uint16_t mid;
  35. int rc;
  36. mosquitto_property *properties = NULL;
  37. int qos;
  38. assert(mosq);
  39. if(mosquitto__get_state(mosq) != mosq_cs_active){
  40. return MOSQ_ERR_PROTOCOL;
  41. }
  42. if(mosq->protocol != mosq_p_mqtt31){
  43. if((mosq->in_packet.command&0x0F) != 0x00){
  44. return MOSQ_ERR_MALFORMED_PACKET;
  45. }
  46. }
  47. pthread_mutex_lock(&mosq->msgs_out.mutex);
  48. util__increment_send_quota(mosq);
  49. pthread_mutex_unlock(&mosq->msgs_out.mutex);
  50. rc = packet__read_uint16(&mosq->in_packet, &mid);
  51. if(rc) return rc;
  52. if(type[3] == 'A'){ /* pubAck or pubComp */
  53. if(mosq->in_packet.command != CMD_PUBACK){
  54. return MOSQ_ERR_MALFORMED_PACKET;
  55. }
  56. qos = 1;
  57. }else{
  58. if(mosq->in_packet.command != CMD_PUBCOMP){
  59. return MOSQ_ERR_MALFORMED_PACKET;
  60. }
  61. qos = 2;
  62. }
  63. if(mid == 0){
  64. return MOSQ_ERR_PROTOCOL;
  65. }
  66. if(mosq->protocol == mosq_p_mqtt5 && mosq->in_packet.remaining_length > 2){
  67. rc = packet__read_byte(&mosq->in_packet, &reason_code);
  68. if(rc){
  69. return rc;
  70. }
  71. if(mosq->in_packet.remaining_length > 3){
  72. rc = property__read_all(CMD_PUBACK, &mosq->in_packet, &properties);
  73. if(rc) return rc;
  74. }
  75. if(type[3] == 'A'){ /* pubAck or pubComp */
  76. if(reason_code != MQTT_RC_SUCCESS
  77. && reason_code != MQTT_RC_NO_MATCHING_SUBSCRIBERS
  78. && reason_code != MQTT_RC_UNSPECIFIED
  79. && reason_code != MQTT_RC_IMPLEMENTATION_SPECIFIC
  80. && reason_code != MQTT_RC_NOT_AUTHORIZED
  81. && reason_code != MQTT_RC_TOPIC_NAME_INVALID
  82. && reason_code != MQTT_RC_PACKET_ID_IN_USE
  83. && reason_code != MQTT_RC_QUOTA_EXCEEDED
  84. && reason_code != MQTT_RC_PAYLOAD_FORMAT_INVALID
  85. ){
  86. return MOSQ_ERR_PROTOCOL;
  87. }
  88. }else{
  89. if(reason_code != MQTT_RC_SUCCESS
  90. && reason_code != MQTT_RC_PACKET_ID_NOT_FOUND
  91. ){
  92. return MOSQ_ERR_PROTOCOL;
  93. }
  94. }
  95. }
  96. if(mosq->in_packet.pos < mosq->in_packet.remaining_length){
  97. #ifdef WITH_BROKER
  98. mosquitto_property_free_all(&properties);
  99. #endif
  100. return MOSQ_ERR_MALFORMED_PACKET;
  101. }
  102. #ifdef WITH_BROKER
  103. log__printf(NULL, MOSQ_LOG_DEBUG, "Received %s from %s (Mid: %d, RC:%d)", type, mosq->id, mid, reason_code);
  104. /* Immediately free, we don't do anything with Reason String or User Property at the moment */
  105. mosquitto_property_free_all(&properties);
  106. rc = db__message_delete_outgoing(mosq, mid, mosq_ms_wait_for_pubcomp, qos);
  107. if(rc == MOSQ_ERR_NOT_FOUND){
  108. log__printf(mosq, MOSQ_LOG_WARNING, "Warning: Received %s from %s for an unknown packet identifier %d.", type, mosq->id, mid);
  109. return MOSQ_ERR_SUCCESS;
  110. }else{
  111. return rc;
  112. }
  113. #else
  114. log__printf(mosq, MOSQ_LOG_DEBUG, "Client %s received %s (Mid: %d, RC:%d)", mosq->id, type, mid, reason_code);
  115. rc = message__delete(mosq, mid, mosq_md_out, qos);
  116. if(rc == MOSQ_ERR_SUCCESS){
  117. /* Only inform the client the message has been sent once. */
  118. pthread_mutex_lock(&mosq->callback_mutex);
  119. if(mosq->on_publish){
  120. mosq->in_callback = true;
  121. mosq->on_publish(mosq, mosq->userdata, mid);
  122. mosq->in_callback = false;
  123. }
  124. if(mosq->on_publish_v5){
  125. mosq->in_callback = true;
  126. mosq->on_publish_v5(mosq, mosq->userdata, mid, reason_code, properties);
  127. mosq->in_callback = false;
  128. }
  129. pthread_mutex_unlock(&mosq->callback_mutex);
  130. mosquitto_property_free_all(&properties);
  131. }else if(rc != MOSQ_ERR_NOT_FOUND){
  132. return rc;
  133. }
  134. pthread_mutex_lock(&mosq->msgs_out.mutex);
  135. message__release_to_inflight(mosq, mosq_md_out);
  136. pthread_mutex_unlock(&mosq->msgs_out.mutex);
  137. return MOSQ_ERR_SUCCESS;
  138. #endif
  139. }