From 10f9272e45ea97d1b8f8059c9d285049ff4b606d Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 22 Aug 2021 00:12:18 +0200 Subject: -implement long polling support on reserve status (but not yet in C client library) --- src/exchange/taler-exchange-httpd.c | 26 +++- src/exchange/taler-exchange-httpd_reserves_get.c | 176 +++++++++++++++++++++-- src/exchange/taler-exchange-httpd_reserves_get.h | 9 ++ 3 files changed, 194 insertions(+), 17 deletions(-) (limited to 'src/exchange') diff --git a/src/exchange/taler-exchange-httpd.c b/src/exchange/taler-exchange-httpd.c index 80649c0bc..c06695e4d 100644 --- a/src/exchange/taler-exchange-httpd.c +++ b/src/exchange/taler-exchange-httpd.c @@ -1430,8 +1430,14 @@ run_single_request (void) } MHD_run (mhd); } - TEH_resume_keys_requests (true); - MHD_stop_daemon (mhd); + { + MHD_socket sock = MHD_quiesce_daemon (mhd); + + TEH_resume_keys_requests (true); + TEH_reserves_get_cleanup (); + MHD_stop_daemon (mhd); + GNUNET_break (0 == close (sock)); + } mhd = NULL; if (cld != waitpid (cld, &status, @@ -1494,8 +1500,15 @@ run_main_loop (int fh, { case GNUNET_OK: case GNUNET_SYSERR: - TEH_resume_keys_requests (true); - MHD_stop_daemon (mhd); + { + MHD_socket sock = MHD_quiesce_daemon (mhd); + + TEH_resume_keys_requests (true); + TEH_reserves_get_cleanup (); + MHD_stop_daemon (mhd); + GNUNET_break (0 == close (sock)); + } + mhd = NULL; break; case GNUNET_NO: { @@ -1507,7 +1520,9 @@ run_main_loop (int fh, flags = fcntl (sock, F_GETFD); GNUNET_assert (-1 != flags); flags &= ~FD_CLOEXEC; - GNUNET_assert (-1 != fcntl (sock, F_SETFD, flags)); + GNUNET_assert (-1 != fcntl (sock, + F_SETFD, + flags)); chld = fork (); if (-1 == chld) { @@ -1551,6 +1566,7 @@ run_main_loop (int fh, sleep (1); /* Now we're really done, practice clean shutdown */ TEH_resume_keys_requests (true); + TEH_reserves_get_cleanup (); MHD_stop_daemon (mhd); } break; diff --git a/src/exchange/taler-exchange-httpd_reserves_get.c b/src/exchange/taler-exchange-httpd_reserves_get.c index d08543a4e..6501f600a 100644 --- a/src/exchange/taler-exchange-httpd_reserves_get.c +++ b/src/exchange/taler-exchange-httpd_reserves_get.c @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2014-2020 Taler Systems SA + Copyright (C) 2014-2021 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -25,6 +25,7 @@ #include #include "taler_mhd_lib.h" #include "taler_json_lib.h" +#include "taler_dbevents.h" #include "taler-exchange-httpd_reserves_get.h" #include "taler-exchange-httpd_responses.h" @@ -49,25 +50,113 @@ struct ReservePoller */ struct MHD_Connection *connection; - /** - * Entry in the timeout heap. - */ - struct GNUNET_CONTAINER_HeapNode *hn; - /** * Subscription for the database event we are * waiting for. */ - struct GNUNET_DB_EventHandler *eh; + struct TALER_EXCHANGEDB_EventHandler *eh; /** * When will this request time out? */ struct GNUNET_TIME_Absolute timeout; + /** + * True if we are still suspended. + */ + bool suspended; + }; +/** + * Head of list of requests in long polling. + */ +static struct ReservePoller *rp_head; + +/** + * Tail of list of requests in long polling. + */ +static struct ReservePoller *rp_tail; + + +void +TEH_reserves_get_cleanup () +{ + struct ReservePoller *rp; + + while (NULL != (rp = rp_head)) + { + GNUNET_CONTAINER_DLL_remove (rp_head, + rp_tail, + rp); + if (rp->suspended) + { + rp->suspended = false; + MHD_resume_connection (rp->connection); + } + } +} + + +/** + * Function called once a connection is done to + * clean up the `struct ReservePoller` state. + * + * @param rc context to clean up for + */ +static void +rp_cleanup (struct TEH_RequestContext *rc) +{ + struct ReservePoller *rp = rc->rh_ctx; + + if (NULL != rp->eh) + { + TEH_plugin->event_listen_cancel (TEH_plugin->cls, + rp->eh); + rp->eh = NULL; + } + GNUNET_free (rp); +} + + +/** + * Function called on events received from Postgres. + * Wakes up long pollers. + * + * @param cls the `struct TEH_RequestContext *` + * @param extra additional event data provided + * @param extra_size number of bytes in @a extra + */ +static void +db_event_cb (void *cls, + const void *extra, + size_t extra_size) +{ + struct TEH_RequestContext *rc = cls; + struct ReservePoller *rp = rc->rh_ctx; + struct GNUNET_AsyncScopeSave old_scope; + + (void) extra; + (void) extra_size; + if (NULL == rp) + return; /* event triggered while main transaction + was still running */ + if (! rp->suspended) + return; /* might get multiple wake-up events */ + rp->suspended = false; + GNUNET_async_scope_enter (&rc->async_scope_id, + &old_scope); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Resuming from long-polling on reserve\n"); + GNUNET_CONTAINER_DLL_remove (rp_head, + rp_tail, + rp); + MHD_resume_connection (rp->connection); + GNUNET_async_scope_restore (&old_scope); +} + + /** * Send reserve history to client. * @@ -157,6 +246,8 @@ TEH_handler_reserves_get (struct TEH_RequestContext *rc, { struct ReserveHistoryContext rsc; MHD_RESULT mhd_ret; + struct GNUNET_TIME_Relative timeout; + struct TALER_EXCHANGEDB_EventHandler *eh = NULL; if (GNUNET_OK != GNUNET_STRINGS_string_to_data (args[0], @@ -170,6 +261,47 @@ TEH_handler_reserves_get (struct TEH_RequestContext *rc, TALER_EC_MERCHANT_GENERIC_RESERVE_PUB_MALFORMED, args[0]); } + { + const char *long_poll_timeout_ms; + + long_poll_timeout_ms + = MHD_lookup_connection_value (rc->connection, + MHD_GET_ARGUMENT_KIND, + "timeout_ms"); + if (NULL != long_poll_timeout_ms) + { + unsigned int timeout_ms; + char dummy; + + if (1 != sscanf (long_poll_timeout_ms, + "%u%c", + &timeout_ms, + &dummy)) + { + GNUNET_break_op (0); + return TALER_MHD_reply_with_error (rc->connection, + MHD_HTTP_BAD_REQUEST, + TALER_EC_GENERIC_PARAMETER_MALFORMED, + "timeout_ms (must be non-negative number)"); + } + timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, + timeout_ms); + } + } + if (! GNUNET_TIME_relative_is_zero (timeout)) + { + struct TALER_ReserveEventP rep = { + .header.size = htons (sizeof (rep)), + .header.type = htons (TALER_DBEVENT_EXCHANGE_RESERVE_INCOMING), + .reserve_pub = rsc.reserve_pub + }; + + eh = TEH_plugin->event_listen (TEH_plugin->cls, + timeout, + &rep.header, + &db_event_cb, + rc); + } rsc.rh = NULL; if (GNUNET_OK != TEH_DB_run_transaction (rc->connection, @@ -178,13 +310,33 @@ TEH_handler_reserves_get (struct TEH_RequestContext *rc, &reserve_history_transaction, &rsc)) return mhd_ret; - /* generate proper response */ if (NULL == rsc.rh) - return TALER_MHD_reply_with_error (rc->connection, - MHD_HTTP_NOT_FOUND, - TALER_EC_EXCHANGE_RESERVES_GET_STATUS_UNKNOWN, - args[0]); + { + struct ReservePoller *rp = rc->rh_ctx; + + if ( (NULL != rp) || + (GNUNET_TIME_relative_is_zero (timeout)) ) + { + return TALER_MHD_reply_with_error (rc->connection, + MHD_HTTP_NOT_FOUND, + TALER_EC_EXCHANGE_RESERVES_GET_STATUS_UNKNOWN, + args[0]); + } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Long-polling on reserve for %s\n", + GNUNET_STRINGS_relative_time_to_string (timeout, + GNUNET_YES)); + rp = GNUNET_new (struct ReservePoller); + rp->connection = rc->connection; + rp->timeout = GNUNET_TIME_relative_to_absolute (timeout); + rp->eh = eh; + rc->rh_ctx = rp; + rc->rh_cleaner = &rp_cleanup; + rp->suspended = true; + MHD_suspend_connection (rc->connection); + return MHD_YES; + } mhd_ret = reply_reserve_history_success (rc->connection, rsc.rh); TEH_plugin->free_reserve_history (TEH_plugin->cls, diff --git a/src/exchange/taler-exchange-httpd_reserves_get.h b/src/exchange/taler-exchange-httpd_reserves_get.h index 1eb9ab60e..30c6559f6 100644 --- a/src/exchange/taler-exchange-httpd_reserves_get.h +++ b/src/exchange/taler-exchange-httpd_reserves_get.h @@ -27,6 +27,15 @@ #include "taler-exchange-httpd.h" +/** + * Shutdown reserves-get subsystem. Resumes all + * suspended long-polling clients and cleans up + * data structures. + */ +void +TEH_reserves_get_cleanup (void); + + /** * Handle a GET "/reserves/" request. Parses the * given "reserve_pub" in @a args (which should contain the -- cgit v1.2.3