123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151 |
- #include "config.h"
- #ifndef WIN32
- #include <time.h>
- #endif
- #if defined(WITH_THREADING)
- #if defined(__linux__) || defined(__NetBSD__)
- # include <pthread.h>
- #elif defined(__FreeBSD__) || defined(__OpenBSD__)
- # include <pthread_np.h>
- #endif
- #endif
- #include "mosquitto_internal.h"
- #include "net_mosq.h"
- #include "util_mosq.h"
- void *mosquitto__thread_main(void *obj);
- int mosquitto_loop_start(struct mosquitto *mosq)
- {
- #if defined(WITH_THREADING)
- if(!mosq || mosq->threaded != mosq_ts_none) return MOSQ_ERR_INVAL;
- mosq->threaded = mosq_ts_self;
- if(!pthread_create(&mosq->thread_id, NULL, mosquitto__thread_main, mosq)){
- #if defined(__linux__)
- pthread_setname_np(mosq->thread_id, "mosquitto loop");
- #elif defined(__NetBSD__)
- pthread_setname_np(mosq->thread_id, "%s", "mosquitto loop");
- #elif defined(__FreeBSD__) || defined(__OpenBSD__)
- pthread_set_name_np(mosq->thread_id, "mosquitto loop");
- #endif
- return MOSQ_ERR_SUCCESS;
- }else{
- return MOSQ_ERR_ERRNO;
- }
- #else
- UNUSED(mosq);
- return MOSQ_ERR_NOT_SUPPORTED;
- #endif
- }
- int mosquitto_loop_stop(struct mosquitto *mosq, bool force)
- {
- #if defined(WITH_THREADING)
- # ifndef WITH_BROKER
- char sockpair_data = 0;
- # endif
- if(!mosq || mosq->threaded != mosq_ts_self) return MOSQ_ERR_INVAL;
-
- if(mosq->sockpairW != INVALID_SOCKET){
- #ifndef WIN32
- if(write(mosq->sockpairW, &sockpair_data, 1)){
- }
- #else
- send(mosq->sockpairW, &sockpair_data, 1, 0);
- #endif
- }
- #ifdef HAVE_PTHREAD_CANCEL
- if(force){
- pthread_cancel(mosq->thread_id);
- }
- #endif
- pthread_join(mosq->thread_id, NULL);
- mosq->thread_id = pthread_self();
- mosq->threaded = mosq_ts_none;
- return MOSQ_ERR_SUCCESS;
- #else
- UNUSED(mosq);
- UNUSED(force);
- return MOSQ_ERR_NOT_SUPPORTED;
- #endif
- }
#ifdef WITH_THREADING
/*
 * Thread entry point used by mosquitto_loop_start().
 *
 * Polls the client state every 10 ms until it is no longer mosq_cs_new, then
 * runs mosquitto_loop_forever() until that call returns. If the instance is
 * still marked mosq_ts_self at that point, it is reset to mosq_ts_none.
 *
 * Parameters:
 *   obj - the struct mosquitto instance, passed as void * by pthread_create().
 *
 * Returns:
 *   obj unchanged, or NULL when obj is NULL.
 */
void *mosquitto__thread_main(void *obj)
{
	struct mosquitto *mosq = obj;
#ifndef WIN32
	/* 10,000,000 ns = 10 ms between state polls. */
	struct timespec poll_delay;

	poll_delay.tv_sec = 0;
	poll_delay.tv_nsec = 10000000;
#endif

	if(mosq == NULL){
		return NULL;
	}

	/* Wait while the client is still in its initial state. */
	while(mosquitto__get_state(mosq) == mosq_cs_new){
#ifdef WIN32
		Sleep(10);
#else
		nanosleep(&poll_delay, NULL);
#endif
	}

	if(mosq->keepalive){
		/* Loop timeout derived from the configured keepalive. */
		mosquitto_loop_forever(mosq, mosq->keepalive*1000, 1);
	}else{
		/* Keepalive is zero: use a fixed timeout of 1000*86400
		 * (presumably milliseconds, i.e. one day - confirm against
		 * mosquitto_loop_forever). */
		mosquitto_loop_forever(mosq, 1000*86400, 1);
	}

	/* The loop has ended; clear the library-threaded marker unless the
	 * state has been changed in the meantime. */
	if(mosq->threaded == mosq_ts_self){
		mosq->threaded = mosq_ts_none;
	}

	return obj;
}
#endif
- int mosquitto_threaded_set(struct mosquitto *mosq, bool threaded)
- {
- if(!mosq) return MOSQ_ERR_INVAL;
- if(threaded){
- mosq->threaded = mosq_ts_external;
- }else{
- mosq->threaded = mosq_ts_none;
- }
- return MOSQ_ERR_SUCCESS;
- }
|