quickjs-tart

quickjs-based runtime for wallet-core logic
Log | Files | Refs | README | LICENSE

ares_event_thread.c (16067B)


      1 /* MIT License
      2  *
      3  * Copyright (c) 2024 Brad House
      4  *
      5  * Permission is hereby granted, free of charge, to any person obtaining a copy
      6  * of this software and associated documentation files (the "Software"), to deal
      7  * in the Software without restriction, including without limitation the rights
      8  * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
      9  * copies of the Software, and to permit persons to whom the Software is
     10  * furnished to do so, subject to the following conditions:
     11  *
     12  * The above copyright notice and this permission notice (including the next
     13  * paragraph) shall be included in all copies or substantial portions of the
     14  * Software.
     15  *
     16  * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
     17  * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
     18  * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
     19  * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
     20  * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
     21  * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
     22  * SOFTWARE.
     23  *
     24  * SPDX-License-Identifier: MIT
     25  */
     26 #include "ares_private.h"
     27 #include "ares_event.h"
     28 
     29 #ifdef CARES_THREADS
     30 static void ares_event_destroy_cb(void *arg)
     31 {
     32   ares_event_t *event = arg;
     33   if (event == NULL) {
     34     return; /* LCOV_EXCL_LINE: DefensiveCoding */
     35   }
     36 
     37   /* Unregister from the event thread if it was registered with one */
     38   if (event->e) {
     39     const ares_event_thread_t *e = event->e;
     40     e->ev_sys->event_del(event);
     41     event->e = NULL;
     42   }
     43 
     44   if (event->free_data_cb && event->data) {
     45     event->free_data_cb(event->data);
     46   }
     47 
     48   ares_free(event);
     49 }
     50 
     51 static void ares_event_signal(const ares_event_t *event)
     52 {
     53   if (event == NULL || event->signal_cb == NULL) {
     54     return; /* LCOV_EXCL_LINE: DefensiveCoding */
     55   }
     56   event->signal_cb(event);
     57 }
     58 
     59 static void ares_event_thread_wake(const ares_event_thread_t *e)
     60 {
     61   if (e == NULL) {
     62     return; /* LCOV_EXCL_LINE: DefensiveCoding */
     63   }
     64 
     65   ares_event_signal(e->ev_signal);
     66 }
     67 
     68 /* See if a pending update already exists. We don't want to enqueue multiple
     69  * updates for the same event handle. Right now this is O(n) based on number
     70  * of updates already enqueued.  In the future, it might make sense to make
     71  * this O(1) with a hashtable.
     72  * NOTE: in some cases a delete then re-add of the same fd, but really pointing
     73  *       to a different destination can happen due to a quick close of a
     74  *       connection then creation of a new one.  So we need to look at the
     75  *       flags and ignore any delete events when finding a match since we
     76  *       need to process the delete always, it can't be combined with other
     77  *       updates. */
     78 static ares_event_t *ares_event_update_find(ares_event_thread_t *e,
     79                                             ares_socket_t fd, const void *data)
     80 {
     81   ares_llist_node_t *node;
     82 
     83   for (node = ares_llist_node_first(e->ev_updates); node != NULL;
     84        node = ares_llist_node_next(node)) {
     85     ares_event_t *ev = ares_llist_node_val(node);
     86 
     87     if (fd != ARES_SOCKET_BAD && fd == ev->fd && ev->flags != 0) {
     88       return ev;
     89     }
     90 
     91     if (fd == ARES_SOCKET_BAD && ev->fd == ARES_SOCKET_BAD &&
     92         data == ev->data && ev->flags != 0) {
     93       return ev;
     94     }
     95   }
     96 
     97   return NULL;
     98 }
     99 
    100 ares_status_t ares_event_update(ares_event_t **event, ares_event_thread_t *e,
    101                                 ares_event_flags_t flags, ares_event_cb_t cb,
    102                                 ares_socket_t fd, void *data,
    103                                 ares_event_free_data_t free_data_cb,
    104                                 ares_event_signal_cb_t signal_cb)
    105 {
    106   ares_event_t *ev = NULL;
    107   ares_status_t status;
    108 
    109   if (e == NULL) {
    110     return ARES_EFORMERR; /* LCOV_EXCL_LINE: DefensiveCoding */
    111   }
    112 
    113   /* Callback must be specified if not a removal event. */
    114   if (flags != ARES_EVENT_FLAG_NONE && cb == NULL) {
    115     return ARES_EFORMERR;
    116   }
    117 
    118   if (event != NULL) {
    119     *event = NULL;
    120   }
    121 
    122   /* Validate flags */
    123   if (fd == ARES_SOCKET_BAD) {
    124     if (flags & (ARES_EVENT_FLAG_READ | ARES_EVENT_FLAG_WRITE)) {
    125       return ARES_EFORMERR;
    126     }
    127     if (!(flags & ARES_EVENT_FLAG_OTHER)) {
    128       return ARES_EFORMERR;
    129     }
    130   } else {
    131     if (flags & ARES_EVENT_FLAG_OTHER) {
    132       return ARES_EFORMERR;
    133     }
    134   }
    135 
    136   /* That's all the validation we can really do */
    137 
    138   ares_thread_mutex_lock(e->mutex);
    139 
    140   /* See if we have a queued update already */
    141   ev = ares_event_update_find(e, fd, data);
    142   if (ev == NULL) {
    143     /* Allocate a new one */
    144     ev = ares_malloc_zero(sizeof(*ev));
    145     if (ev == NULL) {
    146       status = ARES_ENOMEM; /* LCOV_EXCL_LINE: OutOfMemory */
    147       goto done;            /* LCOV_EXCL_LINE: OutOfMemory */
    148     }
    149 
    150     if (ares_llist_insert_last(e->ev_updates, ev) == NULL) {
    151       ares_free(ev);        /* LCOV_EXCL_LINE: OutOfMemory */
    152       status = ARES_ENOMEM; /* LCOV_EXCL_LINE: OutOfMemory */
    153       goto done;            /* LCOV_EXCL_LINE: OutOfMemory */
    154     }
    155   }
    156 
    157   ev->flags = flags;
    158   ev->fd    = fd;
    159   if (ev->cb == NULL) {
    160     ev->cb = cb;
    161   }
    162   if (ev->data == NULL) {
    163     ev->data = data;
    164   }
    165   if (ev->free_data_cb == NULL) {
    166     ev->free_data_cb = free_data_cb;
    167   }
    168   if (ev->signal_cb == NULL) {
    169     ev->signal_cb = signal_cb;
    170   }
    171 
    172   if (event != NULL) {
    173     *event = ev;
    174   }
    175 
    176   status = ARES_SUCCESS;
    177 
    178 done:
    179   if (status == ARES_SUCCESS) {
    180     /* Wake event thread if successful so it can pull the updates */
    181     ares_event_thread_wake(e);
    182   }
    183 
    184   ares_thread_mutex_unlock(e->mutex);
    185 
    186   return status;
    187 }
    188 
    189 static void ares_event_thread_process_fd(ares_event_thread_t *e,
    190                                          ares_socket_t fd, void *data,
    191                                          ares_event_flags_t flags)
    192 {
    193   ares_fd_events_t event;
    194   (void)data;
    195 
    196   event.fd     = fd;
    197   event.events = 0;
    198   if (flags & ARES_EVENT_FLAG_READ) {
    199     event.events |= ARES_FD_EVENT_READ;
    200   }
    201   if (flags & ARES_EVENT_FLAG_WRITE) {
    202     event.events |= ARES_FD_EVENT_WRITE;
    203   }
    204   ares_process_fds(e->channel, &event, 1, ARES_PROCESS_FLAG_SKIP_NON_FD);
    205 }
    206 
    207 static void ares_event_thread_sockstate_cb(void *data, ares_socket_t socket_fd,
    208                                            int readable, int writable)
    209 {
    210   ares_event_thread_t *e     = data;
    211   ares_event_flags_t   flags = ARES_EVENT_FLAG_NONE;
    212 
    213   if (readable) {
    214     flags |= ARES_EVENT_FLAG_READ;
    215   }
    216 
    217   if (writable) {
    218     flags |= ARES_EVENT_FLAG_WRITE;
    219   }
    220 
    221   /* Update channel fd.  This function will lock e->mutex and also wake the
    222    * event thread to process the update */
    223   ares_event_update(NULL, e, flags, ares_event_thread_process_fd, socket_fd,
    224                     NULL, NULL, NULL);
    225 }
    226 
    227 static void notifywrite_cb(void *data)
    228 {
    229   ares_event_thread_t *e = data;
    230 
    231   ares_thread_mutex_lock(e->mutex);
    232   e->process_pending_write = ARES_TRUE;
    233   ares_thread_mutex_unlock(e->mutex);
    234 
    235   ares_event_thread_wake(e);
    236 }
    237 
    238 static void ares_event_process_updates(ares_event_thread_t *e)
    239 {
    240   ares_llist_node_t *node;
    241 
    242   /* Iterate across all updates and apply to internal list, removing from update
    243    * list */
    244   while ((node = ares_llist_node_first(e->ev_updates)) != NULL) {
    245     ares_event_t *newev = ares_llist_node_claim(node);
    246     ares_event_t *oldev;
    247 
    248     if (newev->fd == ARES_SOCKET_BAD) {
    249       oldev = ares_htable_vpvp_get_direct(e->ev_cust_handles, newev->data);
    250     } else {
    251       oldev = ares_htable_asvp_get_direct(e->ev_sock_handles, newev->fd);
    252     }
    253 
    254     /* Adding new */
    255     if (oldev == NULL) {
    256       newev->e = e;
    257       /* Don't try to add a new event if all flags are cleared, that's basically
    258        * someone trying to delete something already deleted.  Also if it fails
    259        * to add, cleanup. */
    260       if (newev->flags == ARES_EVENT_FLAG_NONE ||
    261           !e->ev_sys->event_add(newev)) {
    262         newev->e = NULL;
    263         ares_event_destroy_cb(newev);
    264       } else {
    265         if (newev->fd == ARES_SOCKET_BAD) {
    266           ares_htable_vpvp_insert(e->ev_cust_handles, newev->data, newev);
    267         } else {
    268           ares_htable_asvp_insert(e->ev_sock_handles, newev->fd, newev);
    269         }
    270       }
    271       continue;
    272     }
    273 
    274     /* Removal request */
    275     if (newev->flags == ARES_EVENT_FLAG_NONE) {
    276       /* the callback for the removal will call e->ev_sys->event_del(e, event)
    277        */
    278       if (newev->fd == ARES_SOCKET_BAD) {
    279         ares_htable_vpvp_remove(e->ev_cust_handles, newev->data);
    280       } else {
    281         ares_htable_asvp_remove(e->ev_sock_handles, newev->fd);
    282       }
    283       ares_free(newev);
    284       continue;
    285     }
    286 
    287     /* Modify request -- only flags can be changed */
    288     e->ev_sys->event_mod(oldev, newev->flags);
    289     oldev->flags = newev->flags;
    290     ares_free(newev);
    291   }
    292 }
    293 
    294 static void ares_event_thread_cleanup(ares_event_thread_t *e)
    295 {
    296   /* Manually free any updates that weren't processed */
    297   if (e->ev_updates != NULL) {
    298     ares_llist_node_t *node;
    299 
    300     while ((node = ares_llist_node_first(e->ev_updates)) != NULL) {
    301       ares_event_destroy_cb(ares_llist_node_claim(node));
    302     }
    303     ares_llist_destroy(e->ev_updates);
    304     e->ev_updates = NULL;
    305   }
    306 
    307   if (e->ev_sock_handles != NULL) {
    308     ares_htable_asvp_destroy(e->ev_sock_handles);
    309     e->ev_sock_handles = NULL;
    310   }
    311 
    312   if (e->ev_cust_handles != NULL) {
    313     ares_htable_vpvp_destroy(e->ev_cust_handles);
    314     e->ev_cust_handles = NULL;
    315   }
    316 
    317   if (e->ev_sys != NULL && e->ev_sys->destroy != NULL) {
    318     e->ev_sys->destroy(e);
    319     e->ev_sys = NULL;
    320   }
    321 }
    322 
    323 static void *ares_event_thread(void *arg)
    324 {
    325   ares_event_thread_t *e = arg;
    326   ares_thread_mutex_lock(e->mutex);
    327 
    328   while (e->isup) {
    329     struct timeval        tv;
    330     const struct timeval *tvout;
    331     unsigned long         timeout_ms = 0; /* 0 = unlimited */
    332     ares_bool_t           process_pending_write;
    333 
    334     ares_event_process_updates(e);
    335 
    336     /* Don't hold a mutex while waiting on events or calling into anything
    337      * that might require a c-ares channel lock since a callback could be
    338      * triggered cross-thread */
    339     ares_thread_mutex_unlock(e->mutex);
    340 
    341     tvout = ares_timeout(e->channel, NULL, &tv);
    342     if (tvout != NULL) {
    343       timeout_ms =
    344         (unsigned long)((tvout->tv_sec * 1000) + (tvout->tv_usec / 1000) + 1);
    345     }
    346 
    347     e->ev_sys->wait(e, timeout_ms);
    348 
    349     /* Process pending write operation */
    350     ares_thread_mutex_lock(e->mutex);
    351     process_pending_write    = e->process_pending_write;
    352     e->process_pending_write = ARES_FALSE;
    353     ares_thread_mutex_unlock(e->mutex);
    354     if (process_pending_write) {
    355       ares_process_pending_write(e->channel);
    356     }
    357 
    358     /* Relock before we loop again */
    359     ares_thread_mutex_lock(e->mutex);
    360 
    361     /* Each iteration should do timeout processing and any other cleanup
    362      * that may not have been performed */
    363     if (e->isup) {
    364       ares_thread_mutex_unlock(e->mutex);
    365       ares_process_fds(e->channel, NULL, 0, ARES_PROCESS_FLAG_NONE);
    366       ares_thread_mutex_lock(e->mutex);
    367     }
    368   }
    369 
    370   /* Lets cleanup while we're in the thread itself */
    371   ares_event_thread_cleanup(e);
    372 
    373   ares_thread_mutex_unlock(e->mutex);
    374 
    375   return NULL;
    376 }
    377 
    378 static void ares_event_thread_destroy_int(ares_event_thread_t *e)
    379 {
    380   /* Wake thread and tell it to shutdown if it exists */
    381   ares_thread_mutex_lock(e->mutex);
    382   if (e->isup) {
    383     e->isup = ARES_FALSE;
    384     ares_event_thread_wake(e);
    385   }
    386   ares_thread_mutex_unlock(e->mutex);
    387 
    388   /* Wait for thread to shutdown */
    389   if (e->thread) {
    390     void *rv = NULL;
    391     ares_thread_join(e->thread, &rv);
    392     e->thread = NULL;
    393   }
    394 
    395   /* If the event thread ever got to the point of starting, this is a no-op
    396    * as it runs this same cleanup when it shuts down */
    397   ares_event_thread_cleanup(e);
    398 
    399   ares_thread_mutex_destroy(e->mutex);
    400   e->mutex = NULL;
    401 
    402   ares_free(e);
    403 }
    404 
    405 void ares_event_thread_destroy(ares_channel_t *channel)
    406 {
    407   ares_event_thread_t *e = channel->sock_state_cb_data;
    408 
    409   if (e == NULL) {
    410     return; /* LCOV_EXCL_LINE: DefensiveCoding */
    411   }
    412 
    413   ares_event_thread_destroy_int(e);
    414   channel->sock_state_cb_data           = NULL;
    415   channel->sock_state_cb                = NULL;
    416   channel->notify_pending_write_cb      = NULL;
    417   channel->notify_pending_write_cb_data = NULL;
    418 }
    419 
    420 static const ares_event_sys_t *ares_event_fetch_sys(ares_evsys_t evsys)
    421 {
    422   switch (evsys) {
    423     case ARES_EVSYS_WIN32:
    424 #if defined(USE_WINSOCK)
    425       return &ares_evsys_win32;
    426 #else
    427       return NULL;
    428 #endif
    429 
    430     case ARES_EVSYS_EPOLL:
    431 #if defined(HAVE_EPOLL)
    432       return &ares_evsys_epoll;
    433 #else
    434       return NULL;
    435 #endif
    436 
    437     case ARES_EVSYS_KQUEUE:
    438 #if defined(HAVE_KQUEUE)
    439       return &ares_evsys_kqueue;
    440 #else
    441       return NULL;
    442 #endif
    443 
    444     case ARES_EVSYS_POLL:
    445 #if defined(HAVE_POLL)
    446       return &ares_evsys_poll;
    447 #else
    448       return NULL;
    449 #endif
    450 
    451     case ARES_EVSYS_SELECT:
    452 #if defined(HAVE_PIPE)
    453       return &ares_evsys_select;
    454 #else
    455       return NULL;
    456 #endif
    457 
    458     /* case ARES_EVSYS_DEFAULT: */
    459     default:
    460       break;
    461   }
    462 
    463     /* default */
    464 #if defined(USE_WINSOCK)
    465   return &ares_evsys_win32;
    466 #elif defined(HAVE_KQUEUE)
    467   return &ares_evsys_kqueue;
    468 #elif defined(HAVE_EPOLL)
    469   return &ares_evsys_epoll;
    470 #elif defined(HAVE_POLL)
    471   return &ares_evsys_poll;
    472 #elif defined(HAVE_PIPE)
    473   return &ares_evsys_select;
    474 #else
    475   return NULL;
    476 #endif
    477 }
    478 
    479 ares_status_t ares_event_thread_init(ares_channel_t *channel)
    480 {
    481   ares_event_thread_t *e;
    482 
    483   e = ares_malloc_zero(sizeof(*e));
    484   if (e == NULL) {
    485     return ARES_ENOMEM; /* LCOV_EXCL_LINE: OutOfMemory */
    486   }
    487 
    488   e->mutex = ares_thread_mutex_create();
    489   if (e->mutex == NULL) {
    490     ares_event_thread_destroy_int(e); /* LCOV_EXCL_LINE: OutOfMemory */
    491     return ARES_ENOMEM;               /* LCOV_EXCL_LINE: OutOfMemory */
    492   }
    493 
    494   e->ev_updates = ares_llist_create(NULL);
    495   if (e->ev_updates == NULL) {
    496     ares_event_thread_destroy_int(e); /* LCOV_EXCL_LINE: OutOfMemory */
    497     return ARES_ENOMEM;               /* LCOV_EXCL_LINE: OutOfMemory */
    498   }
    499 
    500   e->ev_sock_handles = ares_htable_asvp_create(ares_event_destroy_cb);
    501   if (e->ev_sock_handles == NULL) {
    502     ares_event_thread_destroy_int(e); /* LCOV_EXCL_LINE: OutOfMemory */
    503     return ARES_ENOMEM;               /* LCOV_EXCL_LINE: OutOfMemory */
    504   }
    505 
    506   e->ev_cust_handles = ares_htable_vpvp_create(NULL, ares_event_destroy_cb);
    507   if (e->ev_cust_handles == NULL) {
    508     ares_event_thread_destroy_int(e); /* LCOV_EXCL_LINE: OutOfMemory */
    509     return ARES_ENOMEM;               /* LCOV_EXCL_LINE: OutOfMemory */
    510   }
    511 
    512   e->channel = channel;
    513   e->isup    = ARES_TRUE;
    514   e->ev_sys  = ares_event_fetch_sys(channel->evsys);
    515   if (e->ev_sys == NULL) {
    516     ares_event_thread_destroy_int(e); /* LCOV_EXCL_LINE: UntestablePath */
    517     return ARES_ENOTIMP;              /* LCOV_EXCL_LINE: UntestablePath */
    518   }
    519 
    520   channel->sock_state_cb                = ares_event_thread_sockstate_cb;
    521   channel->sock_state_cb_data           = e;
    522   channel->notify_pending_write_cb      = notifywrite_cb;
    523   channel->notify_pending_write_cb_data = e;
    524 
    525   if (!e->ev_sys->init(e)) {
    526     /* LCOV_EXCL_START: UntestablePath */
    527     ares_event_thread_destroy_int(e);
    528     channel->sock_state_cb      = NULL;
    529     channel->sock_state_cb_data = NULL;
    530     return ARES_ESERVFAIL;
    531     /* LCOV_EXCL_STOP */
    532   }
    533 
    534   /* Before starting the thread, process any possible events the initialization
    535    * might have enqueued as we may actually depend on these being valid
    536    * immediately upon return, which may mean before the thread is fully spawned
    537    * and processed the list itself. We don't want any sort of race conditions
    538    * (like the event system wake handle itself). */
    539   ares_event_process_updates(e);
    540 
    541   /* Start thread */
    542   if (ares_thread_create(&e->thread, ares_event_thread, e) != ARES_SUCCESS) {
    543     /* LCOV_EXCL_START: UntestablePath */
    544     ares_event_thread_destroy_int(e);
    545     channel->sock_state_cb      = NULL;
    546     channel->sock_state_cb_data = NULL;
    547     return ARES_ESERVFAIL;
    548     /* LCOV_EXCL_STOP */
    549   }
    550 
    551   return ARES_SUCCESS;
    552 }
    553 
    554 #else
    555 
    556 ares_status_t ares_event_thread_init(ares_channel_t *channel)
    557 {
    558   (void)channel;
    559   return ARES_ENOTIMP;
    560 }
    561 
    562 void ares_event_thread_destroy(ares_channel_t *channel)
    563 {
    564   (void)channel;
    565 }
    566 
    567 #endif