mosquitto.c 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419
  1. /*
  2. Copyright (c) 2010-2020 Roger Light <roger@atchoo.org>
  3. All rights reserved. This program and the accompanying materials
  4. are made available under the terms of the Eclipse Public License 2.0
  5. and Eclipse Distribution License v1.0 which accompany this distribution.
  6. The Eclipse Public License is available at
  7. https://www.eclipse.org/legal/epl-2.0/
  8. and the Eclipse Distribution License is available at
  9. http://www.eclipse.org/org/documents/edl-v10.php.
  10. SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
  11. Contributors:
  12. Roger Light - initial implementation and documentation.
  13. */
  14. #include "config.h"
  15. #include <errno.h>
  16. #include <signal.h>
  17. #include <string.h>
  18. #ifndef WIN32
  19. #include <sys/time.h>
  20. #include <strings.h>
  21. #endif
  22. #if defined(__APPLE__)
  23. # include <mach/mach_time.h>
  24. #endif
  25. #include "logging_mosq.h"
  26. #include "mosquitto.h"
  27. #include "mosquitto_internal.h"
  28. #include "memory_mosq.h"
  29. #include "messages_mosq.h"
  30. #include "mqtt_protocol.h"
  31. #include "net_mosq.h"
  32. #include "packet_mosq.h"
  33. #include "will_mosq.h"
  34. static unsigned int init_refcount = 0;
  35. void mosquitto__destroy(struct mosquitto *mosq);
  36. int mosquitto_lib_version(int *major, int *minor, int *revision)
  37. {
  38. if(major) *major = LIBMOSQUITTO_MAJOR;
  39. if(minor) *minor = LIBMOSQUITTO_MINOR;
  40. if(revision) *revision = LIBMOSQUITTO_REVISION;
  41. return LIBMOSQUITTO_VERSION_NUMBER;
  42. }
  43. int mosquitto_lib_init(void)
  44. {
  45. int rc;
  46. if (init_refcount == 0) {
  47. #ifdef WIN32
  48. srand((unsigned int)GetTickCount64());
  49. #elif _POSIX_TIMERS>0 && defined(_POSIX_MONOTONIC_CLOCK)
  50. struct timespec tp;
  51. clock_gettime(CLOCK_MONOTONIC, &tp);
  52. srand((unsigned int)tp.tv_nsec);
  53. #elif defined(__APPLE__)
  54. uint64_t ticks;
  55. ticks = mach_absolute_time();
  56. srand((unsigned int)ticks);
  57. #else
  58. struct timeval tv;
  59. gettimeofday(&tv, NULL);
  60. srand(tv.tv_sec*1000 + tv.tv_usec/1000);
  61. #endif
  62. rc = net__init();
  63. if (rc != MOSQ_ERR_SUCCESS) {
  64. return rc;
  65. }
  66. }
  67. init_refcount++;
  68. return MOSQ_ERR_SUCCESS;
  69. }
  70. int mosquitto_lib_cleanup(void)
  71. {
  72. if (init_refcount == 1) {
  73. net__cleanup();
  74. }
  75. if (init_refcount > 0) {
  76. --init_refcount;
  77. }
  78. return MOSQ_ERR_SUCCESS;
  79. }
  80. struct mosquitto *mosquitto_new(const char *id, bool clean_start, void *userdata)
  81. {
  82. struct mosquitto *mosq = NULL;
  83. int rc;
  84. if(clean_start == false && id == NULL){
  85. errno = EINVAL;
  86. return NULL;
  87. }
  88. #ifndef WIN32
  89. signal(SIGPIPE, SIG_IGN);
  90. #endif
  91. mosq = (struct mosquitto *)mosquitto__calloc(1, sizeof(struct mosquitto));
  92. if(mosq){
  93. mosq->sock = INVALID_SOCKET;
  94. #ifdef WITH_THREADING
  95. mosq->thread_id = pthread_self();
  96. #endif
  97. mosq->sockpairR = INVALID_SOCKET;
  98. mosq->sockpairW = INVALID_SOCKET;
  99. rc = mosquitto_reinitialise(mosq, id, clean_start, userdata);
  100. if(rc){
  101. mosquitto_destroy(mosq);
  102. if(rc == MOSQ_ERR_INVAL){
  103. errno = EINVAL;
  104. }else if(rc == MOSQ_ERR_NOMEM){
  105. errno = ENOMEM;
  106. }
  107. return NULL;
  108. }
  109. }else{
  110. errno = ENOMEM;
  111. }
  112. return mosq;
  113. }
  114. int mosquitto_reinitialise(struct mosquitto *mosq, const char *id, bool clean_start, void *userdata)
  115. {
  116. if(!mosq) return MOSQ_ERR_INVAL;
  117. if(clean_start == false && id == NULL){
  118. return MOSQ_ERR_INVAL;
  119. }
  120. mosquitto__destroy(mosq);
  121. memset(mosq, 0, sizeof(struct mosquitto));
  122. if(userdata){
  123. mosq->userdata = userdata;
  124. }else{
  125. mosq->userdata = mosq;
  126. }
  127. mosq->protocol = mosq_p_mqtt311;
  128. mosq->sock = INVALID_SOCKET;
  129. mosq->sockpairR = INVALID_SOCKET;
  130. mosq->sockpairW = INVALID_SOCKET;
  131. mosq->keepalive = 60;
  132. mosq->clean_start = clean_start;
  133. if(id){
  134. if(STREMPTY(id)){
  135. return MOSQ_ERR_INVAL;
  136. }
  137. if(mosquitto_validate_utf8(id, (int)strlen(id))){
  138. return MOSQ_ERR_MALFORMED_UTF8;
  139. }
  140. mosq->id = mosquitto__strdup(id);
  141. }
  142. mosq->in_packet.payload = NULL;
  143. packet__cleanup(&mosq->in_packet);
  144. mosq->out_packet = NULL;
  145. mosq->out_packet_count = 0;
  146. mosq->current_out_packet = NULL;
  147. mosq->last_msg_in = mosquitto_time();
  148. mosq->next_msg_out = mosquitto_time() + mosq->keepalive;
  149. mosq->ping_t = 0;
  150. mosq->last_mid = 0;
  151. mosq->state = mosq_cs_new;
  152. mosq->max_qos = 2;
  153. mosq->msgs_in.inflight_maximum = 20;
  154. mosq->msgs_out.inflight_maximum = 20;
  155. mosq->msgs_in.inflight_quota = 20;
  156. mosq->msgs_out.inflight_quota = 20;
  157. mosq->will = NULL;
  158. mosq->on_connect = NULL;
  159. mosq->on_publish = NULL;
  160. mosq->on_message = NULL;
  161. mosq->on_subscribe = NULL;
  162. mosq->on_unsubscribe = NULL;
  163. mosq->host = NULL;
  164. mosq->port = 1883;
  165. mosq->in_callback = false;
  166. mosq->reconnect_delay = 1;
  167. mosq->reconnect_delay_max = 1;
  168. mosq->reconnect_exponential_backoff = false;
  169. mosq->threaded = mosq_ts_none;
  170. #ifdef WITH_TLS
  171. mosq->ssl = NULL;
  172. mosq->ssl_ctx = NULL;
  173. mosq->ssl_ctx_defaults = true;
  174. mosq->tls_cert_reqs = SSL_VERIFY_PEER;
  175. mosq->tls_insecure = false;
  176. mosq->want_write = false;
  177. mosq->tls_ocsp_required = false;
  178. #endif
  179. #ifdef WITH_THREADING
  180. pthread_mutex_init(&mosq->callback_mutex, NULL);
  181. pthread_mutex_init(&mosq->log_callback_mutex, NULL);
  182. pthread_mutex_init(&mosq->state_mutex, NULL);
  183. pthread_mutex_init(&mosq->out_packet_mutex, NULL);
  184. pthread_mutex_init(&mosq->current_out_packet_mutex, NULL);
  185. pthread_mutex_init(&mosq->msgtime_mutex, NULL);
  186. pthread_mutex_init(&mosq->msgs_in.mutex, NULL);
  187. pthread_mutex_init(&mosq->msgs_out.mutex, NULL);
  188. pthread_mutex_init(&mosq->mid_mutex, NULL);
  189. mosq->thread_id = pthread_self();
  190. #endif
  191. /* This must be after pthread_mutex_init(), otherwise the log mutex may be
  192. * used before being initialised. */
  193. if(net__socketpair(&mosq->sockpairR, &mosq->sockpairW)){
  194. log__printf(mosq, MOSQ_LOG_WARNING,
  195. "Warning: Unable to open socket pair, outgoing publish commands may be delayed.");
  196. }
  197. return MOSQ_ERR_SUCCESS;
  198. }
  199. void mosquitto__destroy(struct mosquitto *mosq)
  200. {
  201. if(!mosq) return;
  202. #ifdef WITH_THREADING
  203. # ifdef HAVE_PTHREAD_CANCEL
  204. if(mosq->threaded == mosq_ts_self && !pthread_equal(mosq->thread_id, pthread_self())){
  205. pthread_cancel(mosq->thread_id);
  206. pthread_join(mosq->thread_id, NULL);
  207. mosq->threaded = mosq_ts_none;
  208. }
  209. # endif
  210. if(mosq->id){
  211. /* If mosq->id is not NULL then the client has already been initialised
  212. * and so the mutexes need destroying. If mosq->id is NULL, the mutexes
  213. * haven't been initialised. */
  214. pthread_mutex_destroy(&mosq->callback_mutex);
  215. pthread_mutex_destroy(&mosq->log_callback_mutex);
  216. pthread_mutex_destroy(&mosq->state_mutex);
  217. pthread_mutex_destroy(&mosq->out_packet_mutex);
  218. pthread_mutex_destroy(&mosq->current_out_packet_mutex);
  219. pthread_mutex_destroy(&mosq->msgtime_mutex);
  220. pthread_mutex_destroy(&mosq->msgs_in.mutex);
  221. pthread_mutex_destroy(&mosq->msgs_out.mutex);
  222. pthread_mutex_destroy(&mosq->mid_mutex);
  223. }
  224. #endif
  225. if(mosq->sock != INVALID_SOCKET){
  226. net__socket_close(mosq);
  227. }
  228. message__cleanup_all(mosq);
  229. will__clear(mosq);
  230. #ifdef WITH_TLS
  231. if(mosq->ssl){
  232. SSL_free(mosq->ssl);
  233. }
  234. if(mosq->ssl_ctx){
  235. SSL_CTX_free(mosq->ssl_ctx);
  236. }
  237. mosquitto__free(mosq->tls_cafile);
  238. mosquitto__free(mosq->tls_capath);
  239. mosquitto__free(mosq->tls_certfile);
  240. mosquitto__free(mosq->tls_keyfile);
  241. if(mosq->tls_pw_callback) mosq->tls_pw_callback = NULL;
  242. mosquitto__free(mosq->tls_version);
  243. mosquitto__free(mosq->tls_ciphers);
  244. mosquitto__free(mosq->tls_psk);
  245. mosquitto__free(mosq->tls_psk_identity);
  246. mosquitto__free(mosq->tls_alpn);
  247. #endif
  248. mosquitto__free(mosq->address);
  249. mosq->address = NULL;
  250. mosquitto__free(mosq->id);
  251. mosq->id = NULL;
  252. mosquitto__free(mosq->username);
  253. mosq->username = NULL;
  254. mosquitto__free(mosq->password);
  255. mosq->password = NULL;
  256. mosquitto__free(mosq->host);
  257. mosq->host = NULL;
  258. mosquitto__free(mosq->bind_address);
  259. mosq->bind_address = NULL;
  260. mosquitto_property_free_all(&mosq->connect_properties);
  261. packet__cleanup_all_no_locks(mosq);
  262. packet__cleanup(&mosq->in_packet);
  263. if(mosq->sockpairR != INVALID_SOCKET){
  264. COMPAT_CLOSE(mosq->sockpairR);
  265. mosq->sockpairR = INVALID_SOCKET;
  266. }
  267. if(mosq->sockpairW != INVALID_SOCKET){
  268. COMPAT_CLOSE(mosq->sockpairW);
  269. mosq->sockpairW = INVALID_SOCKET;
  270. }
  271. }
  272. void mosquitto_destroy(struct mosquitto *mosq)
  273. {
  274. if(!mosq) return;
  275. mosquitto__destroy(mosq);
  276. mosquitto__free(mosq);
  277. }
  278. int mosquitto_socket(struct mosquitto *mosq)
  279. {
  280. if(!mosq) return INVALID_SOCKET;
  281. return mosq->sock;
  282. }
  283. bool mosquitto_want_write(struct mosquitto *mosq)
  284. {
  285. bool result = false;
  286. if(mosq->out_packet || mosq->current_out_packet){
  287. result = true;
  288. }
  289. #ifdef WITH_TLS
  290. if(mosq->ssl){
  291. if (mosq->want_write) {
  292. result = true;
  293. }else if(mosq->want_connect){
  294. result = false;
  295. }
  296. }
  297. #endif
  298. return result;
  299. }
  300. int mosquitto_sub_topic_tokenise(const char *subtopic, char ***topics, int *count)
  301. {
  302. size_t len;
  303. size_t hier_count = 1;
  304. size_t start, stop;
  305. size_t hier;
  306. size_t tlen;
  307. size_t i, j;
  308. if(!subtopic || !topics || !count) return MOSQ_ERR_INVAL;
  309. len = strlen(subtopic);
  310. for(i=0; i<len; i++){
  311. if(subtopic[i] == '/'){
  312. if(i > len-1){
  313. /* Separator at end of line */
  314. }else{
  315. hier_count++;
  316. }
  317. }
  318. }
  319. (*topics) = mosquitto__calloc(hier_count, sizeof(char *));
  320. if(!(*topics)) return MOSQ_ERR_NOMEM;
  321. start = 0;
  322. hier = 0;
  323. for(i=0; i<len+1; i++){
  324. if(subtopic[i] == '/' || subtopic[i] == '\0'){
  325. stop = i;
  326. if(start != stop){
  327. tlen = stop-start + 1;
  328. (*topics)[hier] = mosquitto__calloc(tlen, sizeof(char));
  329. if(!(*topics)[hier]){
  330. for(j=0; j<hier; j++){
  331. mosquitto__free((*topics)[j]);
  332. }
  333. mosquitto__free((*topics));
  334. return MOSQ_ERR_NOMEM;
  335. }
  336. for(j=start; j<stop; j++){
  337. (*topics)[hier][j-start] = subtopic[j];
  338. }
  339. }
  340. start = i+1;
  341. hier++;
  342. }
  343. }
  344. *count = (int)hier_count;
  345. return MOSQ_ERR_SUCCESS;
  346. }
  347. int mosquitto_sub_topic_tokens_free(char ***topics, int count)
  348. {
  349. int i;
  350. if(!topics || !(*topics) || count<1) return MOSQ_ERR_INVAL;
  351. for(i=0; i<count; i++){
  352. mosquitto__free((*topics)[i]);
  353. }
  354. mosquitto__free(*topics);
  355. return MOSQ_ERR_SUCCESS;
  356. }