loop.c 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427
  1. /*
  2. Copyright (c) 2010-2020 Roger Light <roger@atchoo.org>
  3. All rights reserved. This program and the accompanying materials
  4. are made available under the terms of the Eclipse Public License 2.0
  5. and Eclipse Distribution License v1.0 which accompany this distribution.
  6. The Eclipse Public License is available at
  7. https://www.eclipse.org/legal/epl-2.0/
  8. and the Eclipse Distribution License is available at
  9. http://www.eclipse.org/org/documents/edl-v10.php.
  10. SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause
  11. Contributors:
  12. Roger Light - initial implementation and documentation.
  13. */
  14. #include "config.h"
  15. #include <errno.h>
  16. #ifndef WIN32
  17. #include <sys/select.h>
  18. #include <time.h>
  19. #endif
  20. #include "mosquitto.h"
  21. #include "mosquitto_internal.h"
  22. #include "net_mosq.h"
  23. #include "packet_mosq.h"
  24. #include "socks_mosq.h"
  25. #include "tls_mosq.h"
  26. #include "util_mosq.h"
  /* Selects pselect() over select() in the code below on every platform
   * except Windows, Symbian and QNX builds. */
  #if !(defined(WIN32) || defined(__SYMBIAN32__) || defined(__QNX__))
  #  define HAVE_PSELECT
  #endif
  30. int mosquitto_loop(struct mosquitto *mosq, int timeout, int max_packets)
  31. {
  32. #ifdef HAVE_PSELECT
  33. struct timespec local_timeout;
  34. #else
  35. struct timeval local_timeout;
  36. #endif
  37. fd_set readfds, writefds;
  38. int fdcount;
  39. int rc;
  40. char pairbuf;
  41. int maxfd = 0;
  42. time_t now;
  43. time_t timeout_ms;
  44. if(!mosq || max_packets < 1) return MOSQ_ERR_INVAL;
  45. #ifndef WIN32
  46. if(mosq->sock >= FD_SETSIZE || mosq->sockpairR >= FD_SETSIZE){
  47. return MOSQ_ERR_INVAL;
  48. }
  49. #endif
  50. FD_ZERO(&readfds);
  51. FD_ZERO(&writefds);
  52. if(mosq->sock != INVALID_SOCKET){
  53. maxfd = mosq->sock;
  54. FD_SET(mosq->sock, &readfds);
  55. pthread_mutex_lock(&mosq->current_out_packet_mutex);
  56. pthread_mutex_lock(&mosq->out_packet_mutex);
  57. if(mosq->out_packet || mosq->current_out_packet){
  58. FD_SET(mosq->sock, &writefds);
  59. }
  60. #ifdef WITH_TLS
  61. if(mosq->ssl){
  62. if(mosq->want_write){
  63. FD_SET(mosq->sock, &writefds);
  64. }else if(mosq->want_connect){
  65. /* Remove possible FD_SET from above, we don't want to check
  66. * for writing if we are still connecting, unless want_write is
  67. * definitely set. The presence of outgoing packets does not
  68. * matter yet. */
  69. FD_CLR(mosq->sock, &writefds);
  70. }
  71. }
  72. #endif
  73. pthread_mutex_unlock(&mosq->out_packet_mutex);
  74. pthread_mutex_unlock(&mosq->current_out_packet_mutex);
  75. }else{
  76. #ifdef WITH_SRV
  77. if(mosq->achan){
  78. if(mosquitto__get_state(mosq) == mosq_cs_connect_srv){
  79. rc = ares_fds(mosq->achan, &readfds, &writefds);
  80. if(rc > maxfd){
  81. maxfd = rc;
  82. }
  83. }else{
  84. return MOSQ_ERR_NO_CONN;
  85. }
  86. }
  87. #else
  88. return MOSQ_ERR_NO_CONN;
  89. #endif
  90. }
  91. if(mosq->sockpairR != INVALID_SOCKET){
  92. /* sockpairR is used to break out of select() before the timeout, on a
  93. * call to publish() etc. */
  94. FD_SET(mosq->sockpairR, &readfds);
  95. if((int)mosq->sockpairR > maxfd){
  96. maxfd = mosq->sockpairR;
  97. }
  98. }
  99. timeout_ms = timeout;
  100. if(timeout_ms < 0){
  101. timeout_ms = 1000;
  102. }
  103. now = mosquitto_time();
  104. if(mosq->next_msg_out && now + timeout_ms/1000 > mosq->next_msg_out){
  105. timeout_ms = (mosq->next_msg_out - now)*1000;
  106. }
  107. if(timeout_ms < 0){
  108. /* There has been a delay somewhere which means we should have already
  109. * sent a message. */
  110. timeout_ms = 0;
  111. }
  112. local_timeout.tv_sec = timeout_ms/1000;
  113. #ifdef HAVE_PSELECT
  114. local_timeout.tv_nsec = (timeout_ms-local_timeout.tv_sec*1000)*1000000;
  115. #else
  116. local_timeout.tv_usec = (timeout_ms-local_timeout.tv_sec*1000)*1000;
  117. #endif
  118. #ifdef HAVE_PSELECT
  119. fdcount = pselect(maxfd+1, &readfds, &writefds, NULL, &local_timeout, NULL);
  120. #else
  121. fdcount = select(maxfd+1, &readfds, &writefds, NULL, &local_timeout);
  122. #endif
  123. if(fdcount == -1){
  124. #ifdef WIN32
  125. errno = WSAGetLastError();
  126. #endif
  127. if(errno == EINTR){
  128. return MOSQ_ERR_SUCCESS;
  129. }else{
  130. return MOSQ_ERR_ERRNO;
  131. }
  132. }else{
  133. if(mosq->sock != INVALID_SOCKET){
  134. if(FD_ISSET(mosq->sock, &readfds)){
  135. rc = mosquitto_loop_read(mosq, max_packets);
  136. if(rc || mosq->sock == INVALID_SOCKET){
  137. return rc;
  138. }
  139. }
  140. if(mosq->sockpairR != INVALID_SOCKET && FD_ISSET(mosq->sockpairR, &readfds)){
  141. #ifndef WIN32
  142. if(read(mosq->sockpairR, &pairbuf, 1) == 0){
  143. }
  144. #else
  145. recv(mosq->sockpairR, &pairbuf, 1, 0);
  146. #endif
  147. /* Fake write possible, to stimulate output write even though
  148. * we didn't ask for it, because at that point the publish or
  149. * other command wasn't present. */
  150. if(mosq->sock != INVALID_SOCKET)
  151. FD_SET(mosq->sock, &writefds);
  152. }
  153. if(mosq->sock != INVALID_SOCKET && FD_ISSET(mosq->sock, &writefds)){
  154. #ifdef WITH_TLS
  155. if(mosq->want_connect){
  156. rc = net__socket_connect_tls(mosq);
  157. if(rc) return rc;
  158. }else
  159. #endif
  160. {
  161. rc = mosquitto_loop_write(mosq, max_packets);
  162. if(rc || mosq->sock == INVALID_SOCKET){
  163. return rc;
  164. }
  165. }
  166. }
  167. }
  168. #ifdef WITH_SRV
  169. if(mosq->achan){
  170. ares_process(mosq->achan, &readfds, &writefds);
  171. }
  172. #endif
  173. }
  174. return mosquitto_loop_misc(mosq);
  175. }
  176. static int interruptible_sleep(struct mosquitto *mosq, time_t reconnect_delay)
  177. {
  178. #ifdef HAVE_PSELECT
  179. struct timespec local_timeout;
  180. #else
  181. struct timeval local_timeout;
  182. #endif
  183. fd_set readfds;
  184. int fdcount;
  185. char pairbuf;
  186. int maxfd = 0;
  187. #ifndef WIN32
  188. while(mosq->sockpairR != INVALID_SOCKET && read(mosq->sockpairR, &pairbuf, 1) > 0);
  189. #else
  190. while(mosq->sockpairR != INVALID_SOCKET && recv(mosq->sockpairR, &pairbuf, 1, 0) > 0);
  191. #endif
  192. local_timeout.tv_sec = reconnect_delay;
  193. #ifdef HAVE_PSELECT
  194. local_timeout.tv_nsec = 0;
  195. #else
  196. local_timeout.tv_usec = 0;
  197. #endif
  198. FD_ZERO(&readfds);
  199. maxfd = 0;
  200. if(mosq->sockpairR != INVALID_SOCKET){
  201. /* sockpairR is used to break out of select() before the
  202. * timeout, when mosquitto_loop_stop() is called */
  203. FD_SET(mosq->sockpairR, &readfds);
  204. maxfd = mosq->sockpairR;
  205. }
  206. #ifdef HAVE_PSELECT
  207. fdcount = pselect(maxfd+1, &readfds, NULL, NULL, &local_timeout, NULL);
  208. #else
  209. fdcount = select(maxfd+1, &readfds, NULL, NULL, &local_timeout);
  210. #endif
  211. if(fdcount == -1){
  212. #ifdef WIN32
  213. errno = WSAGetLastError();
  214. #endif
  215. if(errno == EINTR){
  216. return MOSQ_ERR_SUCCESS;
  217. }else{
  218. return MOSQ_ERR_ERRNO;
  219. }
  220. }else if(mosq->sockpairR != INVALID_SOCKET && FD_ISSET(mosq->sockpairR, &readfds)){
  221. #ifndef WIN32
  222. if(read(mosq->sockpairR, &pairbuf, 1) == 0){
  223. }
  224. #else
  225. recv(mosq->sockpairR, &pairbuf, 1, 0);
  226. #endif
  227. }
  228. return MOSQ_ERR_SUCCESS;
  229. }
  230. int mosquitto_loop_forever(struct mosquitto *mosq, int timeout, int max_packets)
  231. {
  232. int run = 1;
  233. int rc = MOSQ_ERR_SUCCESS;
  234. unsigned long reconnect_delay;
  235. enum mosquitto_client_state state;
  236. if(!mosq) return MOSQ_ERR_INVAL;
  237. mosq->reconnects = 0;
  238. while(run){
  239. do{
  240. #ifdef HAVE_PTHREAD_CANCEL
  241. pthread_testcancel();
  242. #endif
  243. rc = mosquitto_loop(mosq, timeout, max_packets);
  244. }while(run && rc == MOSQ_ERR_SUCCESS);
  245. /* Quit after fatal errors. */
  246. switch(rc){
  247. case MOSQ_ERR_NOMEM:
  248. case MOSQ_ERR_PROTOCOL:
  249. case MOSQ_ERR_INVAL:
  250. case MOSQ_ERR_NOT_FOUND:
  251. case MOSQ_ERR_TLS:
  252. case MOSQ_ERR_PAYLOAD_SIZE:
  253. case MOSQ_ERR_NOT_SUPPORTED:
  254. case MOSQ_ERR_AUTH:
  255. case MOSQ_ERR_ACL_DENIED:
  256. case MOSQ_ERR_UNKNOWN:
  257. case MOSQ_ERR_EAI:
  258. case MOSQ_ERR_PROXY:
  259. return rc;
  260. case MOSQ_ERR_ERRNO:
  261. break;
  262. }
  263. if(errno == EPROTO){
  264. return rc;
  265. }
  266. do{
  267. #ifdef HAVE_PTHREAD_CANCEL
  268. pthread_testcancel();
  269. #endif
  270. rc = MOSQ_ERR_SUCCESS;
  271. state = mosquitto__get_state(mosq);
  272. if(state == mosq_cs_disconnecting || state == mosq_cs_disconnected){
  273. run = 0;
  274. }else{
  275. if(mosq->reconnect_delay_max > mosq->reconnect_delay){
  276. if(mosq->reconnect_exponential_backoff){
  277. reconnect_delay = mosq->reconnect_delay*(mosq->reconnects+1)*(mosq->reconnects+1);
  278. }else{
  279. reconnect_delay = mosq->reconnect_delay*(mosq->reconnects+1);
  280. }
  281. }else{
  282. reconnect_delay = mosq->reconnect_delay;
  283. }
  284. if(reconnect_delay > mosq->reconnect_delay_max){
  285. reconnect_delay = mosq->reconnect_delay_max;
  286. }else{
  287. mosq->reconnects++;
  288. }
  289. rc = interruptible_sleep(mosq, (time_t)reconnect_delay);
  290. if(rc) return rc;
  291. state = mosquitto__get_state(mosq);
  292. if(state == mosq_cs_disconnecting || state == mosq_cs_disconnected){
  293. run = 0;
  294. }else{
  295. rc = mosquitto_reconnect(mosq);
  296. }
  297. }
  298. }while(run && rc != MOSQ_ERR_SUCCESS);
  299. }
  300. return rc;
  301. }
  302. int mosquitto_loop_misc(struct mosquitto *mosq)
  303. {
  304. if(!mosq) return MOSQ_ERR_INVAL;
  305. if(mosq->sock == INVALID_SOCKET) return MOSQ_ERR_NO_CONN;
  306. return mosquitto__check_keepalive(mosq);
  307. }
  308. static int mosquitto__loop_rc_handle(struct mosquitto *mosq, int rc)
  309. {
  310. enum mosquitto_client_state state;
  311. if(rc){
  312. net__socket_close(mosq);
  313. state = mosquitto__get_state(mosq);
  314. if(state == mosq_cs_disconnecting || state == mosq_cs_disconnected){
  315. rc = MOSQ_ERR_SUCCESS;
  316. }
  317. pthread_mutex_lock(&mosq->callback_mutex);
  318. if(mosq->on_disconnect){
  319. mosq->in_callback = true;
  320. mosq->on_disconnect(mosq, mosq->userdata, rc);
  321. mosq->in_callback = false;
  322. }
  323. if(mosq->on_disconnect_v5){
  324. mosq->in_callback = true;
  325. mosq->on_disconnect_v5(mosq, mosq->userdata, rc, NULL);
  326. mosq->in_callback = false;
  327. }
  328. pthread_mutex_unlock(&mosq->callback_mutex);
  329. }
  330. return rc;
  331. }
  332. int mosquitto_loop_read(struct mosquitto *mosq, int max_packets)
  333. {
  334. int rc = MOSQ_ERR_SUCCESS;
  335. int i;
  336. if(max_packets < 1) return MOSQ_ERR_INVAL;
  337. #ifdef WITH_TLS
  338. if(mosq->want_connect){
  339. rc = net__socket_connect_tls(mosq);
  340. if (MOSQ_ERR_TLS == rc){
  341. rc = mosquitto__loop_rc_handle(mosq, rc);
  342. }
  343. return rc;
  344. }
  345. #endif
  346. pthread_mutex_lock(&mosq->msgs_out.mutex);
  347. max_packets = mosq->msgs_out.queue_len;
  348. pthread_mutex_unlock(&mosq->msgs_out.mutex);
  349. pthread_mutex_lock(&mosq->msgs_in.mutex);
  350. max_packets += mosq->msgs_in.queue_len;
  351. pthread_mutex_unlock(&mosq->msgs_in.mutex);
  352. if(max_packets < 1) max_packets = 1;
  353. /* Queue len here tells us how many messages are awaiting processing and
  354. * have QoS > 0. We should try to deal with that many in this loop in order
  355. * to keep up. */
  356. for(i=0; i<max_packets || SSL_DATA_PENDING(mosq); i++){
  357. #ifdef WITH_SOCKS
  358. if(mosq->socks5_host){
  359. rc = socks5__read(mosq);
  360. }else
  361. #endif
  362. {
  363. rc = packet__read(mosq);
  364. }
  365. if(rc || errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
  366. return mosquitto__loop_rc_handle(mosq, rc);
  367. }
  368. }
  369. return rc;
  370. }
  371. int mosquitto_loop_write(struct mosquitto *mosq, int max_packets)
  372. {
  373. int rc = MOSQ_ERR_SUCCESS;
  374. int i;
  375. if(max_packets < 1) return MOSQ_ERR_INVAL;
  376. for(i=0; i<max_packets; i++){
  377. rc = packet__write(mosq);
  378. if(rc || errno == EAGAIN || errno == COMPAT_EWOULDBLOCK){
  379. return mosquitto__loop_rc_handle(mosq, rc);
  380. }
  381. }
  382. return rc;
  383. }