123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566 |
- /*
- Copyright (c) 2009-2020 Roger Light <roger@atchoo.org>
- All rights reserved. This program and the accompanying materials
- are made available under the terms of the Eclipse Public License 2.0
- and Eclipse Distribution License v1.0 which accompany this distribution.
- The Eclipse Public License is available at
- https://www.eclipse.org/legal/epl-2.0/
- and the Eclipse Distribution License is available at
- http://www.eclipse.org/org/documents/edl-v10.php.
- SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
- Contributors:
- Roger Light - initial implementation and documentation.
- */
- #include "config.h"
- #include <assert.h>
- #include <errno.h>
- #include <string.h>
- #ifdef WITH_BROKER
- # include "mosquitto_broker_internal.h"
- # ifdef WITH_WEBSOCKETS
- # include <libwebsockets.h>
- # endif
- #else
- # include "read_handle.h"
- #endif
- #include "memory_mosq.h"
- #include "mqtt_protocol.h"
- #include "net_mosq.h"
- #include "packet_mosq.h"
- #include "read_handle.h"
- #include "util_mosq.h"
- #ifdef WITH_BROKER
- # include "sys_tree.h"
- # include "send_mosq.h"
- #else
- # define G_BYTES_RECEIVED_INC(A)
- # define G_BYTES_SENT_INC(A)
- # define G_MSGS_SENT_INC(A)
- # define G_PUB_MSGS_SENT_INC(A)
- #endif
- int packet__alloc(struct mosquitto__packet *packet)
- {
- uint8_t remaining_bytes[5], byte;
- uint32_t remaining_length;
- int i;
- assert(packet);
- remaining_length = packet->remaining_length;
- packet->payload = NULL;
- packet->remaining_count = 0;
- do{
- byte = remaining_length % 128;
- remaining_length = remaining_length / 128;
- /* If there are more digits to encode, set the top bit of this digit */
- if(remaining_length > 0){
- byte = byte | 0x80;
- }
- remaining_bytes[packet->remaining_count] = byte;
- packet->remaining_count++;
- }while(remaining_length > 0 && packet->remaining_count < 5);
- if(packet->remaining_count == 5) return MOSQ_ERR_PAYLOAD_SIZE;
- packet->packet_length = packet->remaining_length + 1 + (uint8_t)packet->remaining_count;
- #ifdef WITH_WEBSOCKETS
- packet->payload = mosquitto__malloc(sizeof(uint8_t)*packet->packet_length + LWS_PRE);
- #else
- packet->payload = mosquitto__malloc(sizeof(uint8_t)*packet->packet_length);
- #endif
- if(!packet->payload) return MOSQ_ERR_NOMEM;
- packet->payload[0] = packet->command;
- for(i=0; i<packet->remaining_count; i++){
- packet->payload[i+1] = remaining_bytes[i];
- }
- packet->pos = 1U + (uint8_t)packet->remaining_count;
- return MOSQ_ERR_SUCCESS;
- }
- void packet__cleanup(struct mosquitto__packet *packet)
- {
- if(!packet) return;
- /* Free data and reset values */
- packet->command = 0;
- packet->remaining_count = 0;
- packet->remaining_mult = 1;
- packet->remaining_length = 0;
- mosquitto__free(packet->payload);
- packet->payload = NULL;
- packet->to_process = 0;
- packet->pos = 0;
- }
- void packet__cleanup_all_no_locks(struct mosquitto *mosq)
- {
- struct mosquitto__packet *packet;
- /* Out packet cleanup */
- if(mosq->out_packet && !mosq->current_out_packet){
- mosq->current_out_packet = mosq->out_packet;
- mosq->out_packet = mosq->out_packet->next;
- }
- while(mosq->current_out_packet){
- packet = mosq->current_out_packet;
- /* Free data and reset values */
- mosq->current_out_packet = mosq->out_packet;
- if(mosq->out_packet){
- mosq->out_packet = mosq->out_packet->next;
- }
- packet__cleanup(packet);
- mosquitto__free(packet);
- }
- mosq->out_packet_count = 0;
- packet__cleanup(&mosq->in_packet);
- }
- void packet__cleanup_all(struct mosquitto *mosq)
- {
- pthread_mutex_lock(&mosq->current_out_packet_mutex);
- pthread_mutex_lock(&mosq->out_packet_mutex);
- packet__cleanup_all_no_locks(mosq);
- pthread_mutex_unlock(&mosq->out_packet_mutex);
- pthread_mutex_unlock(&mosq->current_out_packet_mutex);
- }
- int packet__queue(struct mosquitto *mosq, struct mosquitto__packet *packet)
- {
- #ifndef WITH_BROKER
- char sockpair_data = 0;
- #endif
- assert(mosq);
- assert(packet);
- packet->pos = 0;
- packet->to_process = packet->packet_length;
- packet->next = NULL;
- pthread_mutex_lock(&mosq->out_packet_mutex);
- if(mosq->out_packet){
- mosq->out_packet_last->next = packet;
- }else{
- mosq->out_packet = packet;
- }
- mosq->out_packet_last = packet;
- mosq->out_packet_count++;
- pthread_mutex_unlock(&mosq->out_packet_mutex);
- #ifdef WITH_BROKER
- # ifdef WITH_WEBSOCKETS
- if(mosq->wsi){
- lws_callback_on_writable(mosq->wsi);
- return MOSQ_ERR_SUCCESS;
- }else{
- return packet__write(mosq);
- }
- # else
- return packet__write(mosq);
- # endif
- #else
- /* Write a single byte to sockpairW (connected to sockpairR) to break out
- * of select() if in threaded mode. */
- if(mosq->sockpairW != INVALID_SOCKET){
- #ifndef WIN32
- if(write(mosq->sockpairW, &sockpair_data, 1)){
- }
- #else
- send(mosq->sockpairW, &sockpair_data, 1, 0);
- #endif
- }
- if(mosq->in_callback == false && mosq->threaded == mosq_ts_none){
- return packet__write(mosq);
- }else{
- return MOSQ_ERR_SUCCESS;
- }
- #endif
- }
- int packet__check_oversize(struct mosquitto *mosq, uint32_t remaining_length)
- {
- uint32_t len;
- if(mosq->maximum_packet_size == 0) return MOSQ_ERR_SUCCESS;
- len = remaining_length + packet__varint_bytes(remaining_length);
- if(len > mosq->maximum_packet_size){
- return MOSQ_ERR_OVERSIZE_PACKET;
- }else{
- return MOSQ_ERR_SUCCESS;
- }
- }
- int packet__write(struct mosquitto *mosq)
- {
- ssize_t write_length;
- struct mosquitto__packet *packet;
- enum mosquitto_client_state state;
- if(!mosq) return MOSQ_ERR_INVAL;
- if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
- pthread_mutex_lock(&mosq->current_out_packet_mutex);
- pthread_mutex_lock(&mosq->out_packet_mutex);
- if(mosq->out_packet && !mosq->current_out_packet){
- mosq->current_out_packet = mosq->out_packet;
- mosq->out_packet = mosq->out_packet->next;
- if(!mosq->out_packet){
- mosq->out_packet_last = NULL;
- }
- mosq->out_packet_count--;
- }
- pthread_mutex_unlock(&mosq->out_packet_mutex);
- #ifdef WITH_BROKER
- if(mosq->current_out_packet){
- mux__add_out(mosq);
- }
- #endif
- state = mosquitto__get_state(mosq);
- #if defined(WITH_TLS) && !defined(WITH_BROKER)
- if(state == mosq_cs_connect_pending || mosq->want_connect){
- #else
- if(state == mosq_cs_connect_pending){
- #endif
- pthread_mutex_unlock(&mosq->current_out_packet_mutex);
- return MOSQ_ERR_SUCCESS;
- }
- while(mosq->current_out_packet){
- packet = mosq->current_out_packet;
- while(packet->to_process > 0){
- write_length = net__write(mosq, &(packet->payload[packet->pos]), packet->to_process);
- if(write_length > 0){
- G_BYTES_SENT_INC(write_length);
- packet->to_process -= (uint32_t)write_length;
- packet->pos += (uint32_t)write_length;
- }else{
- #ifdef WIN32
- errno = WSAGetLastError();
- #endif
- if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK
- #ifdef WIN32
- || errno == WSAENOTCONN
- #endif
- ){
- pthread_mutex_unlock(&mosq->current_out_packet_mutex);
- return MOSQ_ERR_SUCCESS;
- }else{
- pthread_mutex_unlock(&mosq->current_out_packet_mutex);
- switch(errno){
- case COMPAT_ECONNRESET:
- return MOSQ_ERR_CONN_LOST;
- case COMPAT_EINTR:
- return MOSQ_ERR_SUCCESS;
- default:
- return MOSQ_ERR_ERRNO;
- }
- }
- }
- }
- G_MSGS_SENT_INC(1);
- if(((packet->command)&0xF6) == CMD_PUBLISH){
- G_PUB_MSGS_SENT_INC(1);
- #ifndef WITH_BROKER
- pthread_mutex_lock(&mosq->callback_mutex);
- if(mosq->on_publish){
- /* This is a QoS=0 message */
- mosq->in_callback = true;
- mosq->on_publish(mosq, mosq->userdata, packet->mid);
- mosq->in_callback = false;
- }
- if(mosq->on_publish_v5){
- /* This is a QoS=0 message */
- mosq->in_callback = true;
- mosq->on_publish_v5(mosq, mosq->userdata, packet->mid, 0, NULL);
- mosq->in_callback = false;
- }
- pthread_mutex_unlock(&mosq->callback_mutex);
- }else if(((packet->command)&0xF0) == CMD_DISCONNECT){
- do_client_disconnect(mosq, MOSQ_ERR_SUCCESS, NULL);
- packet__cleanup(packet);
- mosquitto__free(packet);
- return MOSQ_ERR_SUCCESS;
- #endif
- }else if(((packet->command)&0xF0) == CMD_PUBLISH){
- G_PUB_MSGS_SENT_INC(1);
- }
- /* Free data and reset values */
- pthread_mutex_lock(&mosq->out_packet_mutex);
- mosq->current_out_packet = mosq->out_packet;
- if(mosq->out_packet){
- mosq->out_packet = mosq->out_packet->next;
- if(!mosq->out_packet){
- mosq->out_packet_last = NULL;
- }
- mosq->out_packet_count--;
- }
- pthread_mutex_unlock(&mosq->out_packet_mutex);
- packet__cleanup(packet);
- mosquitto__free(packet);
- #ifdef WITH_BROKER
- mosq->next_msg_out = db.now_s + mosq->keepalive;
- #else
- pthread_mutex_lock(&mosq->msgtime_mutex);
- mosq->next_msg_out = mosquitto_time() + mosq->keepalive;
- pthread_mutex_unlock(&mosq->msgtime_mutex);
- #endif
- }
- #ifdef WITH_BROKER
- if (mosq->current_out_packet == NULL) {
- mux__remove_out(mosq);
- }
- #endif
- pthread_mutex_unlock(&mosq->current_out_packet_mutex);
- return MOSQ_ERR_SUCCESS;
- }
- int packet__read(struct mosquitto *mosq)
- {
- uint8_t byte;
- ssize_t read_length;
- int rc = 0;
- enum mosquitto_client_state state;
- if(!mosq){
- return MOSQ_ERR_INVAL;
- }
- if(mosq->sock == INVALID_SOCKET){
- return MOSQ_ERR_NO_CONN;
- }
- state = mosquitto__get_state(mosq);
- if(state == mosq_cs_connect_pending){
- return MOSQ_ERR_SUCCESS;
- }
- /* This gets called if pselect() indicates that there is network data
- * available - ie. at least one byte. What we do depends on what data we
- * already have.
- * If we've not got a command, attempt to read one and save it. This should
- * always work because it's only a single byte.
- * Then try to read the remaining length. This may fail because it is may
- * be more than one byte - will need to save data pending next read if it
- * does fail.
- * Then try to read the remaining payload, where 'payload' here means the
- * combined variable header and actual payload. This is the most likely to
- * fail due to longer length, so save current data and current position.
- * After all data is read, send to mosquitto__handle_packet() to deal with.
- * Finally, free the memory and reset everything to starting conditions.
- */
- if(!mosq->in_packet.command){
- read_length = net__read(mosq, &byte, 1);
- if(read_length == 1){
- mosq->in_packet.command = byte;
- #ifdef WITH_BROKER
- G_BYTES_RECEIVED_INC(1);
- /* Clients must send CONNECT as their first command. */
- if(!(mosq->bridge) && state == mosq_cs_connected && (byte&0xF0) != CMD_CONNECT){
- return MOSQ_ERR_PROTOCOL;
- }
- #endif
- }else{
- if(read_length == 0){
- return MOSQ_ERR_CONN_LOST; /* EOF */
- }
- #ifdef WIN32
- errno = WSAGetLastError();
- #endif
- if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
- return MOSQ_ERR_SUCCESS;
- }else{
- switch(errno){
- case COMPAT_ECONNRESET:
- return MOSQ_ERR_CONN_LOST;
- case COMPAT_EINTR:
- return MOSQ_ERR_SUCCESS;
- default:
- return MOSQ_ERR_ERRNO;
- }
- }
- }
- }
- /* remaining_count is the number of bytes that the remaining_length
- * parameter occupied in this incoming packet. We don't use it here as such
- * (it is used when allocating an outgoing packet), but we must be able to
- * determine whether all of the remaining_length parameter has been read.
- * remaining_count has three states here:
- * 0 means that we haven't read any remaining_length bytes
- * <0 means we have read some remaining_length bytes but haven't finished
- * >0 means we have finished reading the remaining_length bytes.
- */
- if(mosq->in_packet.remaining_count <= 0){
- do{
- read_length = net__read(mosq, &byte, 1);
- if(read_length == 1){
- mosq->in_packet.remaining_count--;
- /* Max 4 bytes length for remaining length as defined by protocol.
- * Anything more likely means a broken/malicious client.
- */
- if(mosq->in_packet.remaining_count < -4){
- return MOSQ_ERR_MALFORMED_PACKET;
- }
- G_BYTES_RECEIVED_INC(1);
- mosq->in_packet.remaining_length += (byte & 127) * mosq->in_packet.remaining_mult;
- mosq->in_packet.remaining_mult *= 128;
- }else{
- if(read_length == 0){
- return MOSQ_ERR_CONN_LOST; /* EOF */
- }
- #ifdef WIN32
- errno = WSAGetLastError();
- #endif
- if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
- return MOSQ_ERR_SUCCESS;
- }else{
- switch(errno){
- case COMPAT_ECONNRESET:
- return MOSQ_ERR_CONN_LOST;
- case COMPAT_EINTR:
- return MOSQ_ERR_SUCCESS;
- default:
- return MOSQ_ERR_ERRNO;
- }
- }
- }
- }while((byte & 128) != 0);
- /* We have finished reading remaining_length, so make remaining_count
- * positive. */
- mosq->in_packet.remaining_count = (int8_t)(mosq->in_packet.remaining_count * -1);
- #ifdef WITH_BROKER
- switch(mosq->in_packet.command & 0xF0){
- case CMD_CONNECT:
- if(mosq->in_packet.remaining_length > 100000){ /* Arbitrary limit, make configurable */
- return MOSQ_ERR_MALFORMED_PACKET;
- }
- break;
- case CMD_PUBACK:
- case CMD_PUBREC:
- case CMD_PUBREL:
- case CMD_PUBCOMP:
- case CMD_UNSUBACK:
- if(mosq->protocol != mosq_p_mqtt5 && mosq->in_packet.remaining_length != 2){
- return MOSQ_ERR_MALFORMED_PACKET;
- }
- break;
- case CMD_PINGREQ:
- case CMD_PINGRESP:
- if(mosq->in_packet.remaining_length != 0){
- return MOSQ_ERR_MALFORMED_PACKET;
- }
- break;
- case CMD_DISCONNECT:
- if(mosq->protocol != mosq_p_mqtt5 && mosq->in_packet.remaining_length != 0){
- return MOSQ_ERR_MALFORMED_PACKET;
- }
- break;
- }
- if(db.config->max_packet_size > 0 && mosq->in_packet.remaining_length+1 > db.config->max_packet_size){
- if(mosq->protocol == mosq_p_mqtt5){
- send__disconnect(mosq, MQTT_RC_PACKET_TOO_LARGE, NULL);
- }
- return MOSQ_ERR_OVERSIZE_PACKET;
- }
- #else
- /* FIXME - client case for incoming message received from broker too large */
- #endif
- if(mosq->in_packet.remaining_length > 0){
- mosq->in_packet.payload = mosquitto__malloc(mosq->in_packet.remaining_length*sizeof(uint8_t));
- if(!mosq->in_packet.payload){
- return MOSQ_ERR_NOMEM;
- }
- mosq->in_packet.to_process = mosq->in_packet.remaining_length;
- }
- }
- while(mosq->in_packet.to_process>0){
- read_length = net__read(mosq, &(mosq->in_packet.payload[mosq->in_packet.pos]), mosq->in_packet.to_process);
- if(read_length > 0){
- G_BYTES_RECEIVED_INC(read_length);
- mosq->in_packet.to_process -= (uint32_t)read_length;
- mosq->in_packet.pos += (uint32_t)read_length;
- }else{
- #ifdef WIN32
- errno = WSAGetLastError();
- #endif
- if(errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
- if(mosq->in_packet.to_process > 1000){
- /* Update last_msg_in time if more than 1000 bytes left to
- * receive. Helps when receiving large messages.
- * This is an arbitrary limit, but with some consideration.
- * If a client can't send 1000 bytes in a second it
- * probably shouldn't be using a 1 second keep alive. */
- #ifdef WITH_BROKER
- keepalive__update(mosq);
- #else
- pthread_mutex_lock(&mosq->msgtime_mutex);
- mosq->last_msg_in = mosquitto_time();
- pthread_mutex_unlock(&mosq->msgtime_mutex);
- #endif
- }
- return MOSQ_ERR_SUCCESS;
- }else{
- switch(errno){
- case COMPAT_ECONNRESET:
- return MOSQ_ERR_CONN_LOST;
- case COMPAT_EINTR:
- return MOSQ_ERR_SUCCESS;
- default:
- return MOSQ_ERR_ERRNO;
- }
- }
- }
- }
- /* All data for this packet is read. */
- mosq->in_packet.pos = 0;
- #ifdef WITH_BROKER
- G_MSGS_RECEIVED_INC(1);
- if(((mosq->in_packet.command)&0xF5) == CMD_PUBLISH){
- G_PUB_MSGS_RECEIVED_INC(1);
- }
- #endif
- rc = handle__packet(mosq);
- /* Free data and reset values */
- packet__cleanup(&mosq->in_packet);
- #ifdef WITH_BROKER
- keepalive__update(mosq);
- #else
- pthread_mutex_lock(&mosq->msgtime_mutex);
- mosq->last_msg_in = mosquitto_time();
- pthread_mutex_unlock(&mosq->msgtime_mutex);
- #endif
- return rc;
- }
|