pollfd.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487
  1. /*
  2. * libwebsockets - small server side websockets and web server implementation
  3. *
  4. * Copyright (C) 2010-2015 Andy Green <andy@warmcat.com>
  5. *
  6. * This library is free software; you can redistribute it and/or
  7. * modify it under the terms of the GNU Lesser General Public
  8. * License as published by the Free Software Foundation:
  9. * version 2.1 of the License.
  10. *
  11. * This library is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  14. * Lesser General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU Lesser General Public
  17. * License along with this library; if not, write to the Free Software
  18. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
  19. * MA 02110-1301 USA
  20. */
  21. #include "private-libwebsockets.h"
  22. int
  23. _lws_change_pollfd(struct lws *wsi, int _and, int _or, struct lws_pollargs *pa)
  24. {
  25. struct lws_context_per_thread *pt;
  26. struct lws_context *context;
  27. int ret = 0, pa_events = 1;
  28. struct lws_pollfd *pfd;
  29. int sampled_tid, tid;
  30. if (!wsi || wsi->position_in_fds_table < 0)
  31. return 0;
  32. if (wsi->handling_pollout && !_and && _or == LWS_POLLOUT) {
  33. /*
  34. * Happening alongside service thread handling POLLOUT.
  35. * The danger is when he is finished, he will disable POLLOUT,
  36. * countermanding what we changed here.
  37. *
  38. * Instead of changing the fds, inform the service thread
  39. * what happened, and ask it to leave POLLOUT active on exit
  40. */
  41. wsi->leave_pollout_active = 1;
  42. /*
  43. * by definition service thread is not in poll wait, so no need
  44. * to cancel service
  45. */
  46. lwsl_debug("%s: using leave_pollout_active\n", __func__);
  47. return 0;
  48. }
  49. context = wsi->context;
  50. pt = &context->pt[(int)wsi->tsi];
  51. assert(wsi->position_in_fds_table >= 0 &&
  52. wsi->position_in_fds_table < pt->fds_count);
  53. pfd = &pt->fds[wsi->position_in_fds_table];
  54. pa->fd = wsi->desc.sockfd;
  55. pa->prev_events = pfd->events;
  56. pa->events = pfd->events = (pfd->events & ~_and) | _or;
  57. //lwsl_notice("%s: wsi %p, posin %d. from %d -> %d\n", __func__, wsi, wsi->position_in_fds_table, pa->prev_events, pa->events);
  58. if (wsi->http2_substream)
  59. return 0;
  60. if (wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_CHANGE_MODE_POLL_FD,
  61. wsi->user_space, (void *)pa, 0)) {
  62. ret = -1;
  63. goto bail;
  64. }
  65. if (_and & LWS_POLLIN) {
  66. lws_libev_io(wsi, LWS_EV_STOP | LWS_EV_READ);
  67. lws_libuv_io(wsi, LWS_EV_STOP | LWS_EV_READ);
  68. }
  69. if (_or & LWS_POLLIN) {
  70. lws_libev_io(wsi, LWS_EV_START | LWS_EV_READ);
  71. lws_libuv_io(wsi, LWS_EV_START | LWS_EV_READ);
  72. }
  73. if (_and & LWS_POLLOUT) {
  74. lws_libev_io(wsi, LWS_EV_STOP | LWS_EV_WRITE);
  75. lws_libuv_io(wsi, LWS_EV_STOP | LWS_EV_WRITE);
  76. }
  77. if (_or & LWS_POLLOUT) {
  78. lws_libev_io(wsi, LWS_EV_START | LWS_EV_WRITE);
  79. lws_libuv_io(wsi, LWS_EV_START | LWS_EV_WRITE);
  80. }
  81. /*
  82. * if we changed something in this pollfd...
  83. * ... and we're running in a different thread context
  84. * than the service thread...
  85. * ... and the service thread is waiting ...
  86. * then cancel it to force a restart with our changed events
  87. */
  88. #if LWS_POSIX
  89. pa_events = pa->prev_events != pa->events;
  90. #endif
  91. if (pa_events) {
  92. if (lws_plat_change_pollfd(context, wsi, pfd)) {
  93. lwsl_info("%s failed\n", __func__);
  94. ret = -1;
  95. goto bail;
  96. }
  97. sampled_tid = context->service_tid;
  98. if (sampled_tid) {
  99. tid = wsi->vhost->protocols[0].callback(wsi,
  100. LWS_CALLBACK_GET_THREAD_ID, NULL, NULL, 0);
  101. if (tid == -1) {
  102. ret = -1;
  103. goto bail;
  104. }
  105. if (tid != sampled_tid)
  106. lws_cancel_service_pt(wsi);
  107. }
  108. }
  109. bail:
  110. return ret;
  111. }
  #ifndef LWS_NO_SERVER
  /*
   * lws_accept_modulation() - allow or defeat accepting on a service thread
   *
   * pt:    per-thread context whose listening wsi would be affected
   * allow: nonzero to enable POLLIN on the listen socket, 0 to disable it
   *
   * The implementation is currently compiled out ("multithread listen seems
   * broken"), so calling this has no effect.
   *
   * NOTE(review): the disabled code refers to a `context` variable that is
   * not declared in this function; it needs attention before re-enabling.
   */
  static void
  lws_accept_modulation(struct lws_context_per_thread *pt, int allow)
  {
  #if 0
      struct lws_vhost *vh = context->vhost_list;
      struct lws_pollargs pa1;

      while (vh) {
          if (allow)
              _lws_change_pollfd(pt->wsi_listening,
                                 0, LWS_POLLIN, &pa1);
          else
              _lws_change_pollfd(pt->wsi_listening,
                                 LWS_POLLIN, 0, &pa1);
          vh = vh->vhost_next;
      }
  #else
      /* body is disabled: reference the parameters to avoid warnings */
      (void)pt;
      (void)allow;
  #endif
  }
  #endif
  132. int
  133. insert_wsi_socket_into_fds(struct lws_context *context, struct lws *wsi)
  134. {
  135. struct lws_pollargs pa = { wsi->desc.sockfd, LWS_POLLIN, 0 };
  136. struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
  137. int ret = 0;
  138. lwsl_debug("%s: %p: tsi=%d, sock=%d, pos-in-fds=%d\n",
  139. __func__, wsi, wsi->tsi, wsi->desc.sockfd, pt->fds_count);
  140. if ((unsigned int)pt->fds_count >= context->fd_limit_per_thread) {
  141. lwsl_err("Too many fds (%d vs %d)\n", context->max_fds,
  142. context->fd_limit_per_thread );
  143. return 1;
  144. }
  145. #if !defined(_WIN32) && !defined(LWS_WITH_ESP8266)
  146. if (wsi->desc.sockfd >= context->max_fds) {
  147. lwsl_err("Socket fd %d is too high (%d)\n",
  148. wsi->desc.sockfd, context->max_fds);
  149. return 1;
  150. }
  151. #endif
  152. assert(wsi);
  153. assert(wsi->vhost);
  154. assert(lws_socket_is_valid(wsi->desc.sockfd));
  155. if (wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL,
  156. wsi->user_space, (void *) &pa, 1))
  157. return -1;
  158. lws_pt_lock(pt);
  159. pt->count_conns++;
  160. insert_wsi(context, wsi);
  161. #if defined(LWS_WITH_ESP8266)
  162. if (wsi->position_in_fds_table == -1)
  163. #endif
  164. wsi->position_in_fds_table = pt->fds_count;
  165. // lwsl_notice("%s: %p: setting posinfds %d\n", __func__, wsi, wsi->position_in_fds_table);
  166. pt->fds[wsi->position_in_fds_table].fd = wsi->desc.sockfd;
  167. #if LWS_POSIX
  168. pt->fds[wsi->position_in_fds_table].events = LWS_POLLIN;
  169. #else
  170. pt->fds[wsi->position_in_fds_table].events = 0; // LWS_POLLIN;
  171. #endif
  172. pa.events = pt->fds[pt->fds_count].events;
  173. lws_plat_insert_socket_into_fds(context, wsi);
  174. /* external POLL support via protocol 0 */
  175. if (wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_ADD_POLL_FD,
  176. wsi->user_space, (void *) &pa, 0))
  177. ret = -1;
  178. #ifndef LWS_NO_SERVER
  179. /* if no more room, defeat accepts on this thread */
  180. if ((unsigned int)pt->fds_count == context->fd_limit_per_thread - 1)
  181. lws_accept_modulation(pt, 0);
  182. #endif
  183. lws_pt_unlock(pt);
  184. if (wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL,
  185. wsi->user_space, (void *)&pa, 1))
  186. ret = -1;
  187. return ret;
  188. }
  189. int
  190. remove_wsi_socket_from_fds(struct lws *wsi)
  191. {
  192. struct lws_context *context = wsi->context;
  193. struct lws_pollargs pa = { wsi->desc.sockfd, 0, 0 };
  194. #if !defined(LWS_WITH_ESP8266)
  195. struct lws_context_per_thread *pt = &context->pt[(int)wsi->tsi];
  196. struct lws *end_wsi;
  197. int v;
  198. #endif
  199. int m, ret = 0;
  200. #if !defined(_WIN32) && !defined(LWS_WITH_ESP8266)
  201. if (wsi->desc.sockfd > context->max_fds) {
  202. lwsl_err("fd %d too high (%d)\n", wsi->desc.sockfd, context->max_fds);
  203. return 1;
  204. }
  205. #endif
  206. if (wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL,
  207. wsi->user_space, (void *)&pa, 1))
  208. return -1;
  209. /*
  210. * detach ourselves from vh protocol list if we're on one
  211. * A -> B -> C
  212. * A -> C , or, B -> C, or A -> B
  213. */
  214. lwsl_info("%s: removing same prot wsi %p\n", __func__, wsi);
  215. if (wsi->same_vh_protocol_prev) {
  216. assert (*(wsi->same_vh_protocol_prev) == wsi);
  217. lwsl_info("have prev %p, setting him to our next %p\n",
  218. wsi->same_vh_protocol_prev,
  219. wsi->same_vh_protocol_next);
  220. /* guy who pointed to us should point to our next */
  221. *(wsi->same_vh_protocol_prev) = wsi->same_vh_protocol_next;
  222. } //else
  223. //lwsl_err("null wsi->prev\n");
  224. /* our next should point back to our prev */
  225. if (wsi->same_vh_protocol_next) {
  226. wsi->same_vh_protocol_next->same_vh_protocol_prev =
  227. wsi->same_vh_protocol_prev;
  228. } //else
  229. //lwsl_err("null wsi->next\n");
  230. wsi->same_vh_protocol_prev = NULL;
  231. wsi->same_vh_protocol_next = NULL;
  232. /* the guy who is to be deleted's slot index in pt->fds */
  233. m = wsi->position_in_fds_table;
  234. #if !defined(LWS_WITH_ESP8266)
  235. lws_libev_io(wsi, LWS_EV_STOP | LWS_EV_READ | LWS_EV_WRITE | LWS_EV_PREPARE_DELETION);
  236. lws_libuv_io(wsi, LWS_EV_STOP | LWS_EV_READ | LWS_EV_WRITE | LWS_EV_PREPARE_DELETION);
  237. lws_pt_lock(pt);
  238. lwsl_debug("%s: wsi=%p, sock=%d, fds pos=%d, end guy pos=%d, endfd=%d\n",
  239. __func__, wsi, wsi->desc.sockfd, wsi->position_in_fds_table,
  240. pt->fds_count, pt->fds[pt->fds_count].fd);
  241. /* have the last guy take up the now vacant slot */
  242. pt->fds[m] = pt->fds[pt->fds_count - 1];
  243. #endif
  244. /* this decrements pt->fds_count */
  245. lws_plat_delete_socket_from_fds(context, wsi, m);
  246. #if !defined(LWS_WITH_ESP8266)
  247. v = (int) pt->fds[m].fd;
  248. /* end guy's "position in fds table" is now the deletion guy's old one */
  249. end_wsi = wsi_from_fd(context, v);
  250. if (!end_wsi) {
  251. lwsl_err("no wsi found for sock fd %d at pos %d, pt->fds_count=%d\n", (int)pt->fds[m].fd, m, pt->fds_count);
  252. assert(0);
  253. } else
  254. end_wsi->position_in_fds_table = m;
  255. /* deletion guy's lws_lookup entry needs nuking */
  256. delete_from_fd(context, wsi->desc.sockfd);
  257. /* removed wsi has no position any more */
  258. wsi->position_in_fds_table = -1;
  259. /* remove also from external POLL support via protocol 0 */
  260. if (lws_socket_is_valid(wsi->desc.sockfd))
  261. if (wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_DEL_POLL_FD,
  262. wsi->user_space, (void *) &pa, 0))
  263. ret = -1;
  264. #ifndef LWS_NO_SERVER
  265. if (!context->being_destroyed)
  266. /* if this made some room, accept connects on this thread */
  267. if ((unsigned int)pt->fds_count < context->fd_limit_per_thread - 1)
  268. lws_accept_modulation(pt, 1);
  269. #endif
  270. lws_pt_unlock(pt);
  271. if (wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL,
  272. wsi->user_space, (void *) &pa, 1))
  273. ret = -1;
  274. #endif
  275. return ret;
  276. }
  277. int
  278. lws_change_pollfd(struct lws *wsi, int _and, int _or)
  279. {
  280. struct lws_context_per_thread *pt;
  281. struct lws_context *context;
  282. struct lws_pollargs pa;
  283. int ret = 0;
  284. if (!wsi || !wsi->protocol || wsi->position_in_fds_table < 0)
  285. return 1;
  286. context = lws_get_context(wsi);
  287. if (!context)
  288. return 1;
  289. if (wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_LOCK_POLL,
  290. wsi->user_space, (void *) &pa, 0))
  291. return -1;
  292. pt = &context->pt[(int)wsi->tsi];
  293. lws_pt_lock(pt);
  294. ret = _lws_change_pollfd(wsi, _and, _or, &pa);
  295. lws_pt_unlock(pt);
  296. if (wsi->vhost->protocols[0].callback(wsi, LWS_CALLBACK_UNLOCK_POLL,
  297. wsi->user_space, (void *) &pa, 0))
  298. ret = -1;
  299. return ret;
  300. }
  301. LWS_VISIBLE int
  302. lws_callback_on_writable(struct lws *wsi)
  303. {
  304. #ifdef LWS_USE_HTTP2
  305. struct lws *network_wsi, *wsi2;
  306. int already;
  307. #endif
  308. if (wsi->state == LWSS_SHUTDOWN)
  309. return 0;
  310. if (wsi->socket_is_permanently_unusable)
  311. return 0;
  312. #ifdef LWS_USE_HTTP2
  313. lwsl_info("%s: %p\n", __func__, wsi);
  314. if (wsi->mode != LWSCM_HTTP2_SERVING)
  315. goto network_sock;
  316. if (wsi->u.http2.requested_POLLOUT) {
  317. lwsl_info("already pending writable\n");
  318. return 1;
  319. }
  320. if (wsi->u.http2.tx_credit <= 0) {
  321. /*
  322. * other side is not able to cope with us sending
  323. * anything so no matter if we have POLLOUT on our side.
  324. *
  325. * Delay waiting for our POLLOUT until peer indicates he has
  326. * space for more using tx window command in http2 layer
  327. */
  328. lwsl_info("%s: %p: waiting_tx_credit (%d)\n", __func__, wsi,
  329. wsi->u.http2.tx_credit);
  330. wsi->u.http2.waiting_tx_credit = 1;
  331. return 0;
  332. }
  333. network_wsi = lws_http2_get_network_wsi(wsi);
  334. already = network_wsi->u.http2.requested_POLLOUT;
  335. /* mark everybody above him as requesting pollout */
  336. wsi2 = wsi;
  337. while (wsi2) {
  338. wsi2->u.http2.requested_POLLOUT = 1;
  339. lwsl_info("mark %p pending writable\n", wsi2);
  340. wsi2 = wsi2->u.http2.parent_wsi;
  341. }
  342. /* for network action, act only on the network wsi */
  343. wsi = network_wsi;
  344. if (already)
  345. return 1;
  346. network_sock:
  347. #endif
  348. if (lws_ext_cb_active(wsi, LWS_EXT_CB_REQUEST_ON_WRITEABLE, NULL, 0))
  349. return 1;
  350. if (wsi->position_in_fds_table < 0) {
  351. lwsl_err("%s: failed to find socket %d\n", __func__, wsi->desc.sockfd);
  352. return -1;
  353. }
  354. if (lws_change_pollfd(wsi, 0, LWS_POLLOUT))
  355. return -1;
  356. return 1;
  357. }
  358. LWS_VISIBLE int
  359. lws_callback_on_writable_all_protocol_vhost(const struct lws_vhost *vhost,
  360. const struct lws_protocols *protocol)
  361. {
  362. struct lws *wsi;
  363. if (protocol < vhost->protocols ||
  364. protocol >= (vhost->protocols + vhost->count_protocols)) {
  365. lwsl_err("%s: protocol %p is not from vhost %p (%p - %p)\n",
  366. __func__, protocol, vhost->protocols, vhost,
  367. (vhost->protocols + vhost->count_protocols));
  368. return -1;
  369. }
  370. wsi = vhost->same_vh_protocol_list[protocol - vhost->protocols];
  371. //lwsl_notice("%s: protocol %p, start wsi %p\n", __func__, protocol, wsi);
  372. while (wsi) {
  373. //lwsl_notice("%s: protocol %p, this wsi %p (wsi->protocol=%p)\n",
  374. // __func__, protocol, wsi, wsi->protocol);
  375. assert(wsi->protocol == protocol);
  376. assert(*wsi->same_vh_protocol_prev == wsi);
  377. if (wsi->same_vh_protocol_next) {
  378. // lwsl_err("my next says %p\n", wsi->same_vh_protocol_next);
  379. // lwsl_err("my next's prev says %p\n",
  380. // wsi->same_vh_protocol_next->same_vh_protocol_prev);
  381. assert(wsi->same_vh_protocol_next->same_vh_protocol_prev == &wsi->same_vh_protocol_next);
  382. }
  383. //lwsl_notice(" apv: %p\n", wsi);
  384. lws_callback_on_writable(wsi);
  385. wsi = wsi->same_vh_protocol_next;
  386. }
  387. return 0;
  388. }
  389. LWS_VISIBLE int
  390. lws_callback_on_writable_all_protocol(const struct lws_context *context,
  391. const struct lws_protocols *protocol)
  392. {
  393. struct lws_vhost *vhost = context->vhost_list;
  394. int n;
  395. while (vhost) {
  396. for (n = 0; n < vhost->count_protocols; n++)
  397. if (protocol->callback ==
  398. vhost->protocols[n].callback &&
  399. !strcmp(protocol->name, vhost->protocols[n].name))
  400. break;
  401. if (n != vhost->count_protocols)
  402. lws_callback_on_writable_all_protocol_vhost(
  403. vhost, &vhost->protocols[n]);
  404. vhost = vhost->vhost_next;
  405. }
  406. return 0;
  407. }