From 10f9272e45ea97d1b8f8059c9d285049ff4b606d Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 22 Aug 2021 00:12:18 +0200 Subject: -implement long polling support on reserve status (but not yet in C client library) --- contrib/gana | 2 +- src/exchange/taler-exchange-httpd.c | 26 +++- src/exchange/taler-exchange-httpd_reserves_get.c | 176 +++++++++++++++++++++-- src/exchange/taler-exchange-httpd_reserves_get.h | 9 ++ src/exchangedb/plugin_exchangedb_postgres.c | 102 +++++++++++-- src/include/taler_exchangedb_plugin.h | 31 +++- 6 files changed, 313 insertions(+), 33 deletions(-) diff --git a/contrib/gana b/contrib/gana index 2e967c48b..efb2a1fd6 160000 --- a/contrib/gana +++ b/contrib/gana @@ -1 +1 @@ -Subproject commit 2e967c48b395a3edb85982e2e349cb82e76dcb27 +Subproject commit efb2a1fd64e17159c56ff3674083837b5a657a64 diff --git a/src/exchange/taler-exchange-httpd.c b/src/exchange/taler-exchange-httpd.c index 80649c0bc..c06695e4d 100644 --- a/src/exchange/taler-exchange-httpd.c +++ b/src/exchange/taler-exchange-httpd.c @@ -1430,8 +1430,14 @@ run_single_request (void) } MHD_run (mhd); } - TEH_resume_keys_requests (true); - MHD_stop_daemon (mhd); + { + MHD_socket sock = MHD_quiesce_daemon (mhd); + + TEH_resume_keys_requests (true); + TEH_reserves_get_cleanup (); + MHD_stop_daemon (mhd); + GNUNET_break (0 == close (sock)); + } mhd = NULL; if (cld != waitpid (cld, &status, @@ -1494,8 +1500,15 @@ run_main_loop (int fh, { case GNUNET_OK: case GNUNET_SYSERR: - TEH_resume_keys_requests (true); - MHD_stop_daemon (mhd); + { + MHD_socket sock = MHD_quiesce_daemon (mhd); + + TEH_resume_keys_requests (true); + TEH_reserves_get_cleanup (); + MHD_stop_daemon (mhd); + GNUNET_break (0 == close (sock)); + } + mhd = NULL; break; case GNUNET_NO: { @@ -1507,7 +1520,9 @@ run_main_loop (int fh, flags = fcntl (sock, F_GETFD); GNUNET_assert (-1 != flags); flags &= ~FD_CLOEXEC; - GNUNET_assert (-1 != fcntl (sock, F_SETFD, flags)); + GNUNET_assert (-1 != fcntl (sock, + F_SETFD, + flags)); chld = fork (); if (-1 == chld) { @@ -1551,6 +1566,7 @@ run_main_loop (int fh, sleep (1); /* Now we're really done, practice clean shutdown */ TEH_resume_keys_requests (true); + TEH_reserves_get_cleanup (); MHD_stop_daemon (mhd); } break; diff --git a/src/exchange/taler-exchange-httpd_reserves_get.c b/src/exchange/taler-exchange-httpd_reserves_get.c index d08543a4e..6501f600a 100644 --- a/src/exchange/taler-exchange-httpd_reserves_get.c +++ b/src/exchange/taler-exchange-httpd_reserves_get.c @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2014-2020 Taler Systems SA + Copyright (C) 2014-2021 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -25,6 +25,7 @@ #include #include "taler_mhd_lib.h" #include "taler_json_lib.h" +#include "taler_dbevents.h" #include "taler-exchange-httpd_reserves_get.h" #include "taler-exchange-httpd_responses.h" @@ -49,25 +50,113 @@ struct ReservePoller */ struct MHD_Connection *connection; - /** - * Entry in the timeout heap. - */ - struct GNUNET_CONTAINER_HeapNode *hn; - /** * Subscription for the database event we are * waiting for. */ - struct GNUNET_DB_EventHandler *eh; + struct TALER_EXCHANGEDB_EventHandler *eh; /** * When will this request time out? */ struct GNUNET_TIME_Absolute timeout; + /** + * True if we are still suspended. + */ + bool suspended; + }; +/** + * Head of list of requests in long polling. + */ +static struct ReservePoller *rp_head; + +/** + * Tail of list of requests in long polling. + */ +static struct ReservePoller *rp_tail; + + +void +TEH_reserves_get_cleanup () +{ + struct ReservePoller *rp; + + while (NULL != (rp = rp_head)) + { + GNUNET_CONTAINER_DLL_remove (rp_head, + rp_tail, + rp); + if (rp->suspended) + { + rp->suspended = false; + MHD_resume_connection (rp->connection); + } + } +} + + +/** + * Function called once a connection is done to + * clean up the `struct ReservePoller` state. + * + * @param rc context to clean up for + */ +static void +rp_cleanup (struct TEH_RequestContext *rc) +{ + struct ReservePoller *rp = rc->rh_ctx; + + if (NULL != rp->eh) + { + TEH_plugin->event_listen_cancel (TEH_plugin->cls, + rp->eh); + rp->eh = NULL; + } + GNUNET_free (rp); +} + + +/** + * Function called on events received from Postgres. + * Wakes up long pollers. + * + * @param cls the `struct TEH_RequestContext *` + * @param extra additional event data provided + * @param extra_size number of bytes in @a extra + */ +static void +db_event_cb (void *cls, + const void *extra, + size_t extra_size) +{ + struct TEH_RequestContext *rc = cls; + struct ReservePoller *rp = rc->rh_ctx; + struct GNUNET_AsyncScopeSave old_scope; + + (void) extra; + (void) extra_size; + if (NULL == rp) + return; /* event triggered while main transaction + was still running */ + if (! rp->suspended) + return; /* might get multiple wake-up events */ + rp->suspended = false; + GNUNET_async_scope_enter (&rc->async_scope_id, + &old_scope); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Resuming from long-polling on reserve\n"); + GNUNET_CONTAINER_DLL_remove (rp_head, + rp_tail, + rp); + MHD_resume_connection (rp->connection); + GNUNET_async_scope_restore (&old_scope); +} + + /** * Send reserve history to client. * @@ -157,6 +246,8 @@ TEH_handler_reserves_get (struct TEH_RequestContext *rc, { struct ReserveHistoryContext rsc; MHD_RESULT mhd_ret; + struct GNUNET_TIME_Relative timeout; + struct TALER_EXCHANGEDB_EventHandler *eh = NULL; if (GNUNET_OK != GNUNET_STRINGS_string_to_data (args[0], @@ -170,6 +261,47 @@ TEH_handler_reserves_get (struct TEH_RequestContext *rc, TALER_EC_MERCHANT_GENERIC_RESERVE_PUB_MALFORMED, args[0]); } + { + const char *long_poll_timeout_ms; + + long_poll_timeout_ms + = MHD_lookup_connection_value (rc->connection, + MHD_GET_ARGUMENT_KIND, + "timeout_ms"); + if (NULL != long_poll_timeout_ms) + { + unsigned int timeout_ms; + char dummy; + + if (1 != sscanf (long_poll_timeout_ms, + "%u%c", + &timeout_ms, + &dummy)) + { + GNUNET_break_op (0); + return TALER_MHD_reply_with_error (rc->connection, + MHD_HTTP_BAD_REQUEST, + TALER_EC_GENERIC_PARAMETER_MALFORMED, + "timeout_ms (must be non-negative number)"); + } + timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, + timeout_ms); + } + } + if (! GNUNET_TIME_relative_is_zero (timeout)) + { + struct TALER_ReserveEventP rep = { + .header.size = htons (sizeof (rep)), + .header.type = htons (TALER_DBEVENT_EXCHANGE_RESERVE_INCOMING), + .reserve_pub = rsc.reserve_pub + }; + + eh = TEH_plugin->event_listen (TEH_plugin->cls, + timeout, + &rep.header, + &db_event_cb, + rc); + } rsc.rh = NULL; if (GNUNET_OK != TEH_DB_run_transaction (rc->connection, @@ -178,13 +310,33 @@ TEH_handler_reserves_get (struct TEH_RequestContext *rc, &reserve_history_transaction, &rsc)) return mhd_ret; - /* generate proper response */ if (NULL == rsc.rh) - return TALER_MHD_reply_with_error (rc->connection, - MHD_HTTP_NOT_FOUND, - TALER_EC_EXCHANGE_RESERVES_GET_STATUS_UNKNOWN, - args[0]); + { + struct ReservePoller *rp = rc->rh_ctx; + + if ( (NULL != rp) || + (GNUNET_TIME_relative_is_zero (timeout)) ) + { + return TALER_MHD_reply_with_error (rc->connection, + MHD_HTTP_NOT_FOUND, + TALER_EC_EXCHANGE_RESERVES_GET_STATUS_UNKNOWN, + args[0]); + } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Long-polling on reserve for %s\n", + GNUNET_STRINGS_relative_time_to_string (timeout, + GNUNET_YES)); + rp = GNUNET_new (struct ReservePoller); + rp->connection = rc->connection; + rp->timeout = GNUNET_TIME_relative_to_absolute (timeout); + rp->eh = eh; + rc->rh_ctx = rp; + rc->rh_cleaner = &rp_cleanup; + rp->suspended = true; + MHD_suspend_connection (rc->connection); + return MHD_YES; + } mhd_ret = reply_reserve_history_success (rc->connection, rsc.rh); TEH_plugin->free_reserve_history (TEH_plugin->cls, diff --git a/src/exchange/taler-exchange-httpd_reserves_get.h b/src/exchange/taler-exchange-httpd_reserves_get.h index 1eb9ab60e..30c6559f6 100644 --- a/src/exchange/taler-exchange-httpd_reserves_get.h +++ b/src/exchange/taler-exchange-httpd_reserves_get.h @@ -27,6 +27,15 @@ #include "taler-exchange-httpd.h" +/** + * Shutdown reserves-get subsystem. Resumes all + * suspended long-polling clients and cleans up + * data structures. + */ +void +TEH_reserves_get_cleanup (void); + + /** * Handle a GET "/reserves/" request. Parses the * given "reserve_pub" in @a args (which should contain the diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index 1d05fb499..2d7ca0573 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c @@ -109,6 +109,39 @@ struct TALER_EXCHANGEDB_Session }; +/** + * Event registration record. + */ +struct TALER_EXCHANGEDB_EventHandler +{ + /** + * Underlying GNUnet event handler. + */ + struct GNUNET_DB_EventHandler *geh; + + /** + * Entry in the heap. + */ + struct GNUNET_CONTAINER_HeapNode *hn; + + /** + * Our timeout. + */ + struct GNUNET_TIME_Absolute timeout; + + /** + * Callback to invoke (on @e timeout). + */ + GNUNET_DB_EventCallback cb; + + /** + * Closure for @e cb. + */ + void *cb_cls; + +}; + + /** * Type of the "cls" argument given to each of the functions in * our API. @@ -132,6 +165,12 @@ struct PostgresClosure */ char *sql_dir; + /** + * Heap of `struct TALER_EXCHANGEDB_EventHandler` + * by timeout. + */ + struct GNUNET_CONTAINER_Heap *event_heap; + /** * After how long should idle reserves be closed? */ @@ -2832,18 +2871,41 @@ handle_events (void *cls) } }; nfds_t nfds = (-1 == pg->pg_sock) ? 1 : 2; + struct TALER_EXCHANGEDB_EventHandler *r; GNUNET_assert (0 == pthread_mutex_lock (&pg->event_lock)); while (0 != pg->listener_count) { int ret; + int timeout = -1; /* no timeout */ GNUNET_assert (0 == pthread_mutex_unlock (&pg->event_lock)); + while (1) + { + r = GNUNET_CONTAINER_heap_peek (pg->event_heap); + if (NULL == r) + break; + if (GNUNET_TIME_absolute_is_future (r->timeout)) + break; + GNUNET_assert (r == + GNUNET_CONTAINER_heap_remove_root (pg->event_heap)); + r->hn = NULL; + r->cb (r->cb_cls, + NULL, + 0); + } + if (NULL != r) + { + struct GNUNET_TIME_Relative rem; + + rem = GNUNET_TIME_absolute_get_remaining (r->timeout); + timeout = rem.rel_value_us / GNUNET_TIME_UNIT_MILLISECONDS.rel_value_us; + } ret = poll (pfds, nfds, - -1 /* no timeout */); + timeout); if (-1 == ret) GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING, "poll"); @@ -2909,16 +2971,30 @@ pq_socket_cb (void *cls, * @param cb_cls closure for @a cb * @return handle useful to cancel the listener */ -static struct GNUNET_DB_EventHandler * +static struct TALER_EXCHANGEDB_EventHandler * postgres_event_listen (void *cls, - struct TALER_EXCHANGEDB_Session *session, + struct GNUNET_TIME_Relative timeout, const struct GNUNET_DB_EventHeaderP *es, GNUNET_DB_EventCallback cb, void *cb_cls) { struct PostgresClosure *pg = cls; - struct GNUNET_DB_EventHandler *eh; + struct TALER_EXCHANGEDB_EventHandler *eh; + struct TALER_EXCHANGEDB_Session *session; + session = postgres_get_session (pg); + eh = GNUNET_new (struct TALER_EXCHANGEDB_EventHandler); + eh->cb = cb; + eh->cb_cls = cb_cls; + eh->timeout = GNUNET_TIME_relative_to_absolute (timeout); + eh->geh = GNUNET_PQ_event_listen (session->conn, + es, + cb, + cb_cls); + GNUNET_assert (NULL != eh->geh); + eh->hn = GNUNET_CONTAINER_heap_insert (pg->event_heap, + eh, + eh->timeout.abs_value_us); GNUNET_assert (0 == pthread_mutex_lock (&pg->event_lock)); pg->listener_count++; @@ -2932,11 +3008,6 @@ postgres_event_listen (void *cls, } GNUNET_assert (0 == pthread_mutex_unlock (&pg->event_lock)); - eh = GNUNET_PQ_event_listen (session->conn, - es, - cb, - cb_cls); - GNUNET_assert (NULL != eh); return eh; } @@ -2949,7 +3020,7 @@ postgres_event_listen (void *cls, */ static void postgres_event_listen_cancel (void *cls, - struct GNUNET_DB_EventHandler *eh) + struct TALER_EXCHANGEDB_EventHandler *eh) { struct PostgresClosure *pg = cls; @@ -2971,7 +3042,13 @@ postgres_event_listen_cancel (void *cls, } GNUNET_assert (0 == pthread_mutex_unlock (&pg->event_lock)); - GNUNET_PQ_event_listen_cancel (eh); + if (NULL != eh->hn) + { + GNUNET_CONTAINER_heap_remove_node (eh->hn); + eh->hn = NULL; + } + GNUNET_PQ_event_listen_cancel (eh->geh); + GNUNET_free (eh); } @@ -10917,6 +10994,8 @@ libtaler_plugin_exchangedb_postgres_init (void *cls) pg = GNUNET_new (struct PostgresClosure); pg->cfg = cfg; + pg->event_heap = GNUNET_CONTAINER_heap_create ( + GNUNET_CONTAINER_HEAP_ORDER_MIN); pg->main_self = pthread_self (); /* loaded while single-threaded! */ if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_filename (cfg, @@ -11166,6 +11245,7 @@ libtaler_plugin_exchangedb_postgres_done (void *cls) GNUNET_break (0 == close (pg->event_fd)); pthread_mutex_destroy (&pg->event_lock); + GNUNET_CONTAINER_heap_destroy (pg->event_heap); GNUNET_free (pg->sql_dir); GNUNET_free (pg->currency); GNUNET_free (pg); diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h index 61c764a53..4cf6514f3 100644 --- a/src/include/taler_exchangedb_plugin.h +++ b/src/include/taler_exchangedb_plugin.h @@ -73,8 +73,31 @@ struct TALER_EXCHANGEDB_DenominationKeyInformationP }; +/** + * Signature of events signalling a reseve got funding. + */ +struct TALER_ReserveEventP +{ + /** + * Of type #TALER_DBEVENT_EXCHANGE_RESERVE_INCOMING. + */ + struct GNUNET_DB_EventHeaderP header; + + /** + * Public key of the reserve the event is about. + */ + struct TALER_ReservePublicKeyP reserve_pub; +}; + + GNUNET_NETWORK_STRUCT_END +/** + * Event registration record. + */ +struct TALER_EXCHANGEDB_EventHandler; + + /** * Meta data about an exchange online signing key. */ @@ -2149,16 +2172,16 @@ struct TALER_EXCHANGEDB_Plugin * Register callback to be invoked on events of type @a es. * * @param cls database context to use - * @param session connection to use + * @param timeout how long to wait at most * @param es specification of the event to listen for * @param cb function to call when the event happens, possibly * multiple times (until cancel is invoked) * @param cb_cls closure for @a cb * @return handle useful to cancel the listener */ - struct GNUNET_DB_EventHandler * + struct TALER_EXCHANGEDB_EventHandler * (*event_listen)(void *cls, - struct TALER_EXCHANGEDB_Session *session, + struct GNUNET_TIME_Relative timeout, const struct GNUNET_DB_EventHeaderP *es, GNUNET_DB_EventCallback cb, void *cb_cls); @@ -2171,7 +2194,7 @@ struct TALER_EXCHANGEDB_Plugin */ void (*event_listen_cancel)(void *cls, - struct GNUNET_DB_EventHandler *eh); + struct TALER_EXCHANGEDB_EventHandler *eh); /** -- cgit v1.2.3