ares_event_thread.c (16067B)
1 /* MIT License 2 * 3 * Copyright (c) 2024 Brad House 4 * 5 * Permission is hereby granted, free of charge, to any person obtaining a copy 6 * of this software and associated documentation files (the "Software"), to deal 7 * in the Software without restriction, including without limitation the rights 8 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 9 * copies of the Software, and to permit persons to whom the Software is 10 * furnished to do so, subject to the following conditions: 11 * 12 * The above copyright notice and this permission notice (including the next 13 * paragraph) shall be included in all copies or substantial portions of the 14 * Software. 15 * 16 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 17 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 18 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 19 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 20 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 21 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE 22 * SOFTWARE. 23 * 24 * SPDX-License-Identifier: MIT 25 */ 26 #include "ares_private.h" 27 #include "ares_event.h" 28 29 #ifdef CARES_THREADS 30 static void ares_event_destroy_cb(void *arg) 31 { 32 ares_event_t *event = arg; 33 if (event == NULL) { 34 return; /* LCOV_EXCL_LINE: DefensiveCoding */ 35 } 36 37 /* Unregister from the event thread if it was registered with one */ 38 if (event->e) { 39 const ares_event_thread_t *e = event->e; 40 e->ev_sys->event_del(event); 41 event->e = NULL; 42 } 43 44 if (event->free_data_cb && event->data) { 45 event->free_data_cb(event->data); 46 } 47 48 ares_free(event); 49 } 50 51 static void ares_event_signal(const ares_event_t *event) 52 { 53 if (event == NULL || event->signal_cb == NULL) { 54 return; /* LCOV_EXCL_LINE: DefensiveCoding */ 55 } 56 event->signal_cb(event); 57 } 58 59 static void ares_event_thread_wake(const ares_event_thread_t *e) 60 { 61 if (e == NULL) { 62 return; /* LCOV_EXCL_LINE: DefensiveCoding */ 63 } 64 65 ares_event_signal(e->ev_signal); 66 } 67 68 /* See if a pending update already exists. We don't want to enqueue multiple 69 * updates for the same event handle. Right now this is O(n) based on number 70 * of updates already enqueued. In the future, it might make sense to make 71 * this O(1) with a hashtable. 72 * NOTE: in some cases a delete then re-add of the same fd, but really pointing 73 * to a different destination can happen due to a quick close of a 74 * connection then creation of a new one. So we need to look at the 75 * flags and ignore any delete events when finding a match since we 76 * need to process the delete always, it can't be combined with other 77 * updates. */ 78 static ares_event_t *ares_event_update_find(ares_event_thread_t *e, 79 ares_socket_t fd, const void *data) 80 { 81 ares_llist_node_t *node; 82 83 for (node = ares_llist_node_first(e->ev_updates); node != NULL; 84 node = ares_llist_node_next(node)) { 85 ares_event_t *ev = ares_llist_node_val(node); 86 87 if (fd != ARES_SOCKET_BAD && fd == ev->fd && ev->flags != 0) { 88 return ev; 89 } 90 91 if (fd == ARES_SOCKET_BAD && ev->fd == ARES_SOCKET_BAD && 92 data == ev->data && ev->flags != 0) { 93 return ev; 94 } 95 } 96 97 return NULL; 98 } 99 100 ares_status_t ares_event_update(ares_event_t **event, ares_event_thread_t *e, 101 ares_event_flags_t flags, ares_event_cb_t cb, 102 ares_socket_t fd, void *data, 103 ares_event_free_data_t free_data_cb, 104 ares_event_signal_cb_t signal_cb) 105 { 106 ares_event_t *ev = NULL; 107 ares_status_t status; 108 109 if (e == NULL) { 110 return ARES_EFORMERR; /* LCOV_EXCL_LINE: DefensiveCoding */ 111 } 112 113 /* Callback must be specified if not a removal event. */ 114 if (flags != ARES_EVENT_FLAG_NONE && cb == NULL) { 115 return ARES_EFORMERR; 116 } 117 118 if (event != NULL) { 119 *event = NULL; 120 } 121 122 /* Validate flags */ 123 if (fd == ARES_SOCKET_BAD) { 124 if (flags & (ARES_EVENT_FLAG_READ | ARES_EVENT_FLAG_WRITE)) { 125 return ARES_EFORMERR; 126 } 127 if (!(flags & ARES_EVENT_FLAG_OTHER)) { 128 return ARES_EFORMERR; 129 } 130 } else { 131 if (flags & ARES_EVENT_FLAG_OTHER) { 132 return ARES_EFORMERR; 133 } 134 } 135 136 /* That's all the validation we can really do */ 137 138 ares_thread_mutex_lock(e->mutex); 139 140 /* See if we have a queued update already */ 141 ev = ares_event_update_find(e, fd, data); 142 if (ev == NULL) { 143 /* Allocate a new one */ 144 ev = ares_malloc_zero(sizeof(*ev)); 145 if (ev == NULL) { 146 status = ARES_ENOMEM; /* LCOV_EXCL_LINE: OutOfMemory */ 147 goto done; /* LCOV_EXCL_LINE: OutOfMemory */ 148 } 149 150 if (ares_llist_insert_last(e->ev_updates, ev) == NULL) { 151 ares_free(ev); /* LCOV_EXCL_LINE: OutOfMemory */ 152 status = ARES_ENOMEM; /* LCOV_EXCL_LINE: OutOfMemory */ 153 goto done; /* LCOV_EXCL_LINE: OutOfMemory */ 154 } 155 } 156 157 ev->flags = flags; 158 ev->fd = fd; 159 if (ev->cb == NULL) { 160 ev->cb = cb; 161 } 162 if (ev->data == NULL) { 163 ev->data = data; 164 } 165 if (ev->free_data_cb == NULL) { 166 ev->free_data_cb = free_data_cb; 167 } 168 if (ev->signal_cb == NULL) { 169 ev->signal_cb = signal_cb; 170 } 171 172 if (event != NULL) { 173 *event = ev; 174 } 175 176 status = ARES_SUCCESS; 177 178 done: 179 if (status == ARES_SUCCESS) { 180 /* Wake event thread if successful so it can pull the updates */ 181 ares_event_thread_wake(e); 182 } 183 184 ares_thread_mutex_unlock(e->mutex); 185 186 return status; 187 } 188 189 static void ares_event_thread_process_fd(ares_event_thread_t *e, 190 ares_socket_t fd, void *data, 191 ares_event_flags_t flags) 192 { 193 ares_fd_events_t event; 194 (void)data; 195 196 event.fd = fd; 197 event.events = 0; 198 if (flags & ARES_EVENT_FLAG_READ) { 199 event.events |= ARES_FD_EVENT_READ; 200 } 201 if (flags & ARES_EVENT_FLAG_WRITE) { 202 event.events |= ARES_FD_EVENT_WRITE; 203 } 204 ares_process_fds(e->channel, &event, 1, ARES_PROCESS_FLAG_SKIP_NON_FD); 205 } 206 207 static void ares_event_thread_sockstate_cb(void *data, ares_socket_t socket_fd, 208 int readable, int writable) 209 { 210 ares_event_thread_t *e = data; 211 ares_event_flags_t flags = ARES_EVENT_FLAG_NONE; 212 213 if (readable) { 214 flags |= ARES_EVENT_FLAG_READ; 215 } 216 217 if (writable) { 218 flags |= ARES_EVENT_FLAG_WRITE; 219 } 220 221 /* Update channel fd. This function will lock e->mutex and also wake the 222 * event thread to process the update */ 223 ares_event_update(NULL, e, flags, ares_event_thread_process_fd, socket_fd, 224 NULL, NULL, NULL); 225 } 226 227 static void notifywrite_cb(void *data) 228 { 229 ares_event_thread_t *e = data; 230 231 ares_thread_mutex_lock(e->mutex); 232 e->process_pending_write = ARES_TRUE; 233 ares_thread_mutex_unlock(e->mutex); 234 235 ares_event_thread_wake(e); 236 } 237 238 static void ares_event_process_updates(ares_event_thread_t *e) 239 { 240 ares_llist_node_t *node; 241 242 /* Iterate across all updates and apply to internal list, removing from update 243 * list */ 244 while ((node = ares_llist_node_first(e->ev_updates)) != NULL) { 245 ares_event_t *newev = ares_llist_node_claim(node); 246 ares_event_t *oldev; 247 248 if (newev->fd == ARES_SOCKET_BAD) { 249 oldev = ares_htable_vpvp_get_direct(e->ev_cust_handles, newev->data); 250 } else { 251 oldev = ares_htable_asvp_get_direct(e->ev_sock_handles, newev->fd); 252 } 253 254 /* Adding new */ 255 if (oldev == NULL) { 256 newev->e = e; 257 /* Don't try to add a new event if all flags are cleared, that's basically 258 * someone trying to delete something already deleted. Also if it fails 259 * to add, cleanup. */ 260 if (newev->flags == ARES_EVENT_FLAG_NONE || 261 !e->ev_sys->event_add(newev)) { 262 newev->e = NULL; 263 ares_event_destroy_cb(newev); 264 } else { 265 if (newev->fd == ARES_SOCKET_BAD) { 266 ares_htable_vpvp_insert(e->ev_cust_handles, newev->data, newev); 267 } else { 268 ares_htable_asvp_insert(e->ev_sock_handles, newev->fd, newev); 269 } 270 } 271 continue; 272 } 273 274 /* Removal request */ 275 if (newev->flags == ARES_EVENT_FLAG_NONE) { 276 /* the callback for the removal will call e->ev_sys->event_del(e, event) 277 */ 278 if (newev->fd == ARES_SOCKET_BAD) { 279 ares_htable_vpvp_remove(e->ev_cust_handles, newev->data); 280 } else { 281 ares_htable_asvp_remove(e->ev_sock_handles, newev->fd); 282 } 283 ares_free(newev); 284 continue; 285 } 286 287 /* Modify request -- only flags can be changed */ 288 e->ev_sys->event_mod(oldev, newev->flags); 289 oldev->flags = newev->flags; 290 ares_free(newev); 291 } 292 } 293 294 static void ares_event_thread_cleanup(ares_event_thread_t *e) 295 { 296 /* Manually free any updates that weren't processed */ 297 if (e->ev_updates != NULL) { 298 ares_llist_node_t *node; 299 300 while ((node = ares_llist_node_first(e->ev_updates)) != NULL) { 301 ares_event_destroy_cb(ares_llist_node_claim(node)); 302 } 303 ares_llist_destroy(e->ev_updates); 304 e->ev_updates = NULL; 305 } 306 307 if (e->ev_sock_handles != NULL) { 308 ares_htable_asvp_destroy(e->ev_sock_handles); 309 e->ev_sock_handles = NULL; 310 } 311 312 if (e->ev_cust_handles != NULL) { 313 ares_htable_vpvp_destroy(e->ev_cust_handles); 314 e->ev_cust_handles = NULL; 315 } 316 317 if (e->ev_sys != NULL && e->ev_sys->destroy != NULL) { 318 e->ev_sys->destroy(e); 319 e->ev_sys = NULL; 320 } 321 } 322 323 static void *ares_event_thread(void *arg) 324 { 325 ares_event_thread_t *e = arg; 326 ares_thread_mutex_lock(e->mutex); 327 328 while (e->isup) { 329 struct timeval tv; 330 const struct timeval *tvout; 331 unsigned long timeout_ms = 0; /* 0 = unlimited */ 332 ares_bool_t process_pending_write; 333 334 ares_event_process_updates(e); 335 336 /* Don't hold a mutex while waiting on events or calling into anything 337 * that might require a c-ares channel lock since a callback could be 338 * triggered cross-thread */ 339 ares_thread_mutex_unlock(e->mutex); 340 341 tvout = ares_timeout(e->channel, NULL, &tv); 342 if (tvout != NULL) { 343 timeout_ms = 344 (unsigned long)((tvout->tv_sec * 1000) + (tvout->tv_usec / 1000) + 1); 345 } 346 347 e->ev_sys->wait(e, timeout_ms); 348 349 /* Process pending write operation */ 350 ares_thread_mutex_lock(e->mutex); 351 process_pending_write = e->process_pending_write; 352 e->process_pending_write = ARES_FALSE; 353 ares_thread_mutex_unlock(e->mutex); 354 if (process_pending_write) { 355 ares_process_pending_write(e->channel); 356 } 357 358 /* Relock before we loop again */ 359 ares_thread_mutex_lock(e->mutex); 360 361 /* Each iteration should do timeout processing and any other cleanup 362 * that may not have been performed */ 363 if (e->isup) { 364 ares_thread_mutex_unlock(e->mutex); 365 ares_process_fds(e->channel, NULL, 0, ARES_PROCESS_FLAG_NONE); 366 ares_thread_mutex_lock(e->mutex); 367 } 368 } 369 370 /* Lets cleanup while we're in the thread itself */ 371 ares_event_thread_cleanup(e); 372 373 ares_thread_mutex_unlock(e->mutex); 374 375 return NULL; 376 } 377 378 static void ares_event_thread_destroy_int(ares_event_thread_t *e) 379 { 380 /* Wake thread and tell it to shutdown if it exists */ 381 ares_thread_mutex_lock(e->mutex); 382 if (e->isup) { 383 e->isup = ARES_FALSE; 384 ares_event_thread_wake(e); 385 } 386 ares_thread_mutex_unlock(e->mutex); 387 388 /* Wait for thread to shutdown */ 389 if (e->thread) { 390 void *rv = NULL; 391 ares_thread_join(e->thread, &rv); 392 e->thread = NULL; 393 } 394 395 /* If the event thread ever got to the point of starting, this is a no-op 396 * as it runs this same cleanup when it shuts down */ 397 ares_event_thread_cleanup(e); 398 399 ares_thread_mutex_destroy(e->mutex); 400 e->mutex = NULL; 401 402 ares_free(e); 403 } 404 405 void ares_event_thread_destroy(ares_channel_t *channel) 406 { 407 ares_event_thread_t *e = channel->sock_state_cb_data; 408 409 if (e == NULL) { 410 return; /* LCOV_EXCL_LINE: DefensiveCoding */ 411 } 412 413 ares_event_thread_destroy_int(e); 414 channel->sock_state_cb_data = NULL; 415 channel->sock_state_cb = NULL; 416 channel->notify_pending_write_cb = NULL; 417 channel->notify_pending_write_cb_data = NULL; 418 } 419 420 static const ares_event_sys_t *ares_event_fetch_sys(ares_evsys_t evsys) 421 { 422 switch (evsys) { 423 case ARES_EVSYS_WIN32: 424 #if defined(USE_WINSOCK) 425 return &ares_evsys_win32; 426 #else 427 return NULL; 428 #endif 429 430 case ARES_EVSYS_EPOLL: 431 #if defined(HAVE_EPOLL) 432 return &ares_evsys_epoll; 433 #else 434 return NULL; 435 #endif 436 437 case ARES_EVSYS_KQUEUE: 438 #if defined(HAVE_KQUEUE) 439 return &ares_evsys_kqueue; 440 #else 441 return NULL; 442 #endif 443 444 case ARES_EVSYS_POLL: 445 #if defined(HAVE_POLL) 446 return &ares_evsys_poll; 447 #else 448 return NULL; 449 #endif 450 451 case ARES_EVSYS_SELECT: 452 #if defined(HAVE_PIPE) 453 return &ares_evsys_select; 454 #else 455 return NULL; 456 #endif 457 458 /* case ARES_EVSYS_DEFAULT: */ 459 default: 460 break; 461 } 462 463 /* default */ 464 #if defined(USE_WINSOCK) 465 return &ares_evsys_win32; 466 #elif defined(HAVE_KQUEUE) 467 return &ares_evsys_kqueue; 468 #elif defined(HAVE_EPOLL) 469 return &ares_evsys_epoll; 470 #elif defined(HAVE_POLL) 471 return &ares_evsys_poll; 472 #elif defined(HAVE_PIPE) 473 return &ares_evsys_select; 474 #else 475 return NULL; 476 #endif 477 } 478 479 ares_status_t ares_event_thread_init(ares_channel_t *channel) 480 { 481 ares_event_thread_t *e; 482 483 e = ares_malloc_zero(sizeof(*e)); 484 if (e == NULL) { 485 return ARES_ENOMEM; /* LCOV_EXCL_LINE: OutOfMemory */ 486 } 487 488 e->mutex = ares_thread_mutex_create(); 489 if (e->mutex == NULL) { 490 ares_event_thread_destroy_int(e); /* LCOV_EXCL_LINE: OutOfMemory */ 491 return ARES_ENOMEM; /* LCOV_EXCL_LINE: OutOfMemory */ 492 } 493 494 e->ev_updates = ares_llist_create(NULL); 495 if (e->ev_updates == NULL) { 496 ares_event_thread_destroy_int(e); /* LCOV_EXCL_LINE: OutOfMemory */ 497 return ARES_ENOMEM; /* LCOV_EXCL_LINE: OutOfMemory */ 498 } 499 500 e->ev_sock_handles = ares_htable_asvp_create(ares_event_destroy_cb); 501 if (e->ev_sock_handles == NULL) { 502 ares_event_thread_destroy_int(e); /* LCOV_EXCL_LINE: OutOfMemory */ 503 return ARES_ENOMEM; /* LCOV_EXCL_LINE: OutOfMemory */ 504 } 505 506 e->ev_cust_handles = ares_htable_vpvp_create(NULL, ares_event_destroy_cb); 507 if (e->ev_cust_handles == NULL) { 508 ares_event_thread_destroy_int(e); /* LCOV_EXCL_LINE: OutOfMemory */ 509 return ARES_ENOMEM; /* LCOV_EXCL_LINE: OutOfMemory */ 510 } 511 512 e->channel = channel; 513 e->isup = ARES_TRUE; 514 e->ev_sys = ares_event_fetch_sys(channel->evsys); 515 if (e->ev_sys == NULL) { 516 ares_event_thread_destroy_int(e); /* LCOV_EXCL_LINE: UntestablePath */ 517 return ARES_ENOTIMP; /* LCOV_EXCL_LINE: UntestablePath */ 518 } 519 520 channel->sock_state_cb = ares_event_thread_sockstate_cb; 521 channel->sock_state_cb_data = e; 522 channel->notify_pending_write_cb = notifywrite_cb; 523 channel->notify_pending_write_cb_data = e; 524 525 if (!e->ev_sys->init(e)) { 526 /* LCOV_EXCL_START: UntestablePath */ 527 ares_event_thread_destroy_int(e); 528 channel->sock_state_cb = NULL; 529 channel->sock_state_cb_data = NULL; 530 return ARES_ESERVFAIL; 531 /* LCOV_EXCL_STOP */ 532 } 533 534 /* Before starting the thread, process any possible events the initialization 535 * might have enqueued as we may actually depend on these being valid 536 * immediately upon return, which may mean before the thread is fully spawned 537 * and processed the list itself. We don't want any sort of race conditions 538 * (like the event system wake handle itself). */ 539 ares_event_process_updates(e); 540 541 /* Start thread */ 542 if (ares_thread_create(&e->thread, ares_event_thread, e) != ARES_SUCCESS) { 543 /* LCOV_EXCL_START: UntestablePath */ 544 ares_event_thread_destroy_int(e); 545 channel->sock_state_cb = NULL; 546 channel->sock_state_cb_data = NULL; 547 return ARES_ESERVFAIL; 548 /* LCOV_EXCL_STOP */ 549 } 550 551 return ARES_SUCCESS; 552 } 553 554 #else 555 556 ares_status_t ares_event_thread_init(ares_channel_t *channel) 557 { 558 (void)channel; 559 return ARES_ENOTIMP; 560 } 561 562 void ares_event_thread_destroy(ares_channel_t *channel) 563 { 564 (void)channel; 565 } 566 567 #endif