summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2021-08-22 00:12:18 +0200
committerChristian Grothoff <christian@grothoff.org>2021-08-22 00:12:18 +0200
commit10f9272e45ea97d1b8f8059c9d285049ff4b606d (patch)
treedbb087c7ce3c4681d75252d79b6170460a2e2ec4 /src
parent9ad3469f07cfd944c2012a44851cdabf46703e22 (diff)
downloadexchange-10f9272e45ea97d1b8f8059c9d285049ff4b606d.tar.gz
exchange-10f9272e45ea97d1b8f8059c9d285049ff4b606d.tar.bz2
exchange-10f9272e45ea97d1b8f8059c9d285049ff4b606d.zip
-implement long polling support on reserve status (but not yet in C client library)
Diffstat (limited to 'src')
-rw-r--r--src/exchange/taler-exchange-httpd.c26
-rw-r--r--src/exchange/taler-exchange-httpd_reserves_get.c176
-rw-r--r--src/exchange/taler-exchange-httpd_reserves_get.h9
-rw-r--r--src/exchangedb/plugin_exchangedb_postgres.c102
-rw-r--r--src/include/taler_exchangedb_plugin.h31
5 files changed, 312 insertions, 32 deletions
diff --git a/src/exchange/taler-exchange-httpd.c b/src/exchange/taler-exchange-httpd.c
index 80649c0bc..c06695e4d 100644
--- a/src/exchange/taler-exchange-httpd.c
+++ b/src/exchange/taler-exchange-httpd.c
@@ -1430,8 +1430,14 @@ run_single_request (void)
}
MHD_run (mhd);
}
- TEH_resume_keys_requests (true);
- MHD_stop_daemon (mhd);
+ {
+ MHD_socket sock = MHD_quiesce_daemon (mhd);
+
+ TEH_resume_keys_requests (true);
+ TEH_reserves_get_cleanup ();
+ MHD_stop_daemon (mhd);
+ GNUNET_break (0 == close (sock));
+ }
mhd = NULL;
if (cld != waitpid (cld,
&status,
@@ -1494,8 +1500,15 @@ run_main_loop (int fh,
{
case GNUNET_OK:
case GNUNET_SYSERR:
- TEH_resume_keys_requests (true);
- MHD_stop_daemon (mhd);
+ {
+ MHD_socket sock = MHD_quiesce_daemon (mhd);
+
+ TEH_resume_keys_requests (true);
+ TEH_reserves_get_cleanup ();
+ MHD_stop_daemon (mhd);
+ GNUNET_break (0 == close (sock));
+ }
+ mhd = NULL;
break;
case GNUNET_NO:
{
@@ -1507,7 +1520,9 @@ run_main_loop (int fh,
flags = fcntl (sock, F_GETFD);
GNUNET_assert (-1 != flags);
flags &= ~FD_CLOEXEC;
- GNUNET_assert (-1 != fcntl (sock, F_SETFD, flags));
+ GNUNET_assert (-1 != fcntl (sock,
+ F_SETFD,
+ flags));
chld = fork ();
if (-1 == chld)
{
@@ -1551,6 +1566,7 @@ run_main_loop (int fh,
sleep (1);
/* Now we're really done, practice clean shutdown */
TEH_resume_keys_requests (true);
+ TEH_reserves_get_cleanup ();
MHD_stop_daemon (mhd);
}
break;
diff --git a/src/exchange/taler-exchange-httpd_reserves_get.c b/src/exchange/taler-exchange-httpd_reserves_get.c
index d08543a4e..6501f600a 100644
--- a/src/exchange/taler-exchange-httpd_reserves_get.c
+++ b/src/exchange/taler-exchange-httpd_reserves_get.c
@@ -1,6 +1,6 @@
/*
This file is part of TALER
- Copyright (C) 2014-2020 Taler Systems SA
+ Copyright (C) 2014-2021 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free Software
@@ -25,6 +25,7 @@
#include <jansson.h>
#include "taler_mhd_lib.h"
#include "taler_json_lib.h"
+#include "taler_dbevents.h"
#include "taler-exchange-httpd_reserves_get.h"
#include "taler-exchange-httpd_responses.h"
@@ -50,25 +51,113 @@ struct ReservePoller
struct MHD_Connection *connection;
/**
- * Entry in the timeout heap.
- */
- struct GNUNET_CONTAINER_HeapNode *hn;
-
- /**
* Subscription for the database event we are
* waiting for.
*/
- struct GNUNET_DB_EventHandler *eh;
+ struct TALER_EXCHANGEDB_EventHandler *eh;
/**
* When will this request time out?
*/
struct GNUNET_TIME_Absolute timeout;
+ /**
+ * True if we are still suspended.
+ */
+ bool suspended;
+
};
/**
+ * Head of list of requests in long polling.
+ */
+static struct ReservePoller *rp_head;
+
+/**
+ * Tail of list of requests in long polling.
+ */
+static struct ReservePoller *rp_tail;
+
+
+void
+TEH_reserves_get_cleanup ()
+{
+ struct ReservePoller *rp;
+
+ while (NULL != (rp = rp_head))
+ {
+ GNUNET_CONTAINER_DLL_remove (rp_head,
+ rp_tail,
+ rp);
+ if (rp->suspended)
+ {
+ rp->suspended = false;
+ MHD_resume_connection (rp->connection);
+ }
+ }
+}
+
+
+/**
+ * Function called once a connection is done to
+ * clean up the `struct ReservePoller` state.
+ *
+ * @param rc context to clean up for
+ */
+static void
+rp_cleanup (struct TEH_RequestContext *rc)
+{
+ struct ReservePoller *rp = rc->rh_ctx;
+
+ if (NULL != rp->eh)
+ {
+ TEH_plugin->event_listen_cancel (TEH_plugin->cls,
+ rp->eh);
+ rp->eh = NULL;
+ }
+ GNUNET_free (rp);
+}
+
+
+/**
+ * Function called on events received from Postgres.
+ * Wakes up long pollers.
+ *
+ * @param cls the `struct TEH_RequestContext *`
+ * @param extra additional event data provided
+ * @param extra_size number of bytes in @a extra
+ */
+static void
+db_event_cb (void *cls,
+ const void *extra,
+ size_t extra_size)
+{
+ struct TEH_RequestContext *rc = cls;
+ struct ReservePoller *rp = rc->rh_ctx;
+ struct GNUNET_AsyncScopeSave old_scope;
+
+ (void) extra;
+ (void) extra_size;
+ if (NULL == rp)
+ return; /* event triggered while main transaction
+ was still running */
+ if (! rp->suspended)
+ return; /* might get multiple wake-up events */
+ rp->suspended = false;
+ GNUNET_async_scope_enter (&rc->async_scope_id,
+ &old_scope);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Resuming from long-polling on reserve\n");
+ GNUNET_CONTAINER_DLL_remove (rp_head,
+ rp_tail,
+ rp);
+ MHD_resume_connection (rp->connection);
+ GNUNET_async_scope_restore (&old_scope);
+}
+
+
+/**
* Send reserve history to client.
*
* @param connection connection to the client
@@ -157,6 +246,8 @@ TEH_handler_reserves_get (struct TEH_RequestContext *rc,
{
struct ReserveHistoryContext rsc;
MHD_RESULT mhd_ret;
+ struct GNUNET_TIME_Relative timeout;
+ struct TALER_EXCHANGEDB_EventHandler *eh = NULL;
if (GNUNET_OK !=
GNUNET_STRINGS_string_to_data (args[0],
@@ -170,6 +261,47 @@ TEH_handler_reserves_get (struct TEH_RequestContext *rc,
TALER_EC_MERCHANT_GENERIC_RESERVE_PUB_MALFORMED,
args[0]);
}
+ {
+ const char *long_poll_timeout_ms;
+
+ long_poll_timeout_ms
+ = MHD_lookup_connection_value (rc->connection,
+ MHD_GET_ARGUMENT_KIND,
+ "timeout_ms");
+ if (NULL != long_poll_timeout_ms)
+ {
+ unsigned int timeout_ms;
+ char dummy;
+
+ if (1 != sscanf (long_poll_timeout_ms,
+ "%u%c",
+ &timeout_ms,
+ &dummy))
+ {
+ GNUNET_break_op (0);
+ return TALER_MHD_reply_with_error (rc->connection,
+ MHD_HTTP_BAD_REQUEST,
+ TALER_EC_GENERIC_PARAMETER_MALFORMED,
+ "timeout_ms (must be non-negative number)");
+ }
+ timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS,
+ timeout_ms);
+ }
+ }
+ if (! GNUNET_TIME_relative_is_zero (timeout))
+ {
+ struct TALER_ReserveEventP rep = {
+ .header.size = htons (sizeof (rep)),
+ .header.type = htons (TALER_DBEVENT_EXCHANGE_RESERVE_INCOMING),
+ .reserve_pub = rsc.reserve_pub
+ };
+
+ eh = TEH_plugin->event_listen (TEH_plugin->cls,
+ timeout,
+ &rep.header,
+ &db_event_cb,
+ rc);
+ }
rsc.rh = NULL;
if (GNUNET_OK !=
TEH_DB_run_transaction (rc->connection,
@@ -178,13 +310,33 @@ TEH_handler_reserves_get (struct TEH_RequestContext *rc,
&reserve_history_transaction,
&rsc))
return mhd_ret;
-
/* generate proper response */
if (NULL == rsc.rh)
- return TALER_MHD_reply_with_error (rc->connection,
- MHD_HTTP_NOT_FOUND,
- TALER_EC_EXCHANGE_RESERVES_GET_STATUS_UNKNOWN,
- args[0]);
+ {
+ struct ReservePoller *rp = rc->rh_ctx;
+
+ if ( (NULL != rp) ||
+ (GNUNET_TIME_relative_is_zero (timeout)) )
+ {
+ return TALER_MHD_reply_with_error (rc->connection,
+ MHD_HTTP_NOT_FOUND,
+ TALER_EC_EXCHANGE_RESERVES_GET_STATUS_UNKNOWN,
+ args[0]);
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Long-polling on reserve for %s\n",
+ GNUNET_STRINGS_relative_time_to_string (timeout,
+ GNUNET_YES));
+ rp = GNUNET_new (struct ReservePoller);
+ rp->connection = rc->connection;
+ rp->timeout = GNUNET_TIME_relative_to_absolute (timeout);
+ rp->eh = eh;
+ rc->rh_ctx = rp;
+ rc->rh_cleaner = &rp_cleanup;
+ rp->suspended = true;
+ MHD_suspend_connection (rc->connection);
+ return MHD_YES;
+ }
mhd_ret = reply_reserve_history_success (rc->connection,
rsc.rh);
TEH_plugin->free_reserve_history (TEH_plugin->cls,
diff --git a/src/exchange/taler-exchange-httpd_reserves_get.h b/src/exchange/taler-exchange-httpd_reserves_get.h
index 1eb9ab60e..30c6559f6 100644
--- a/src/exchange/taler-exchange-httpd_reserves_get.h
+++ b/src/exchange/taler-exchange-httpd_reserves_get.h
@@ -28,6 +28,15 @@
/**
+ * Shutdown reserves-get subsystem. Resumes all
+ * suspended long-polling clients and cleans up
+ * data structures.
+ */
+void
+TEH_reserves_get_cleanup (void);
+
+
+/**
* Handle a GET "/reserves/" request. Parses the
* given "reserve_pub" in @a args (which should contain the
* EdDSA public key of a reserve) and then respond with the
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c
index 1d05fb499..2d7ca0573 100644
--- a/src/exchangedb/plugin_exchangedb_postgres.c
+++ b/src/exchangedb/plugin_exchangedb_postgres.c
@@ -110,6 +110,39 @@ struct TALER_EXCHANGEDB_Session
/**
+ * Event registration record.
+ */
+struct TALER_EXCHANGEDB_EventHandler
+{
+ /**
+ * Underlying GNUnet event handler.
+ */
+ struct GNUNET_DB_EventHandler *geh;
+
+ /**
+ * Entry in the heap.
+ */
+ struct GNUNET_CONTAINER_HeapNode *hn;
+
+ /**
+ * Our timeout.
+ */
+ struct GNUNET_TIME_Absolute timeout;
+
+ /**
+ * Callback to invoke (on @e timeout).
+ */
+ GNUNET_DB_EventCallback cb;
+
+ /**
+ * Closure for @e cb.
+ */
+ void *cb_cls;
+
+};
+
+
+/**
* Type of the "cls" argument given to each of the functions in
* our API.
*/
@@ -133,6 +166,12 @@ struct PostgresClosure
char *sql_dir;
/**
+ * Heap of `struct TALER_EXCHANGEDB_EventHandler`
+ * by timeout.
+ */
+ struct GNUNET_CONTAINER_Heap *event_heap;
+
+ /**
* After how long should idle reserves be closed?
*/
struct GNUNET_TIME_Relative idle_reserve_expiration_time;
@@ -2832,18 +2871,41 @@ handle_events (void *cls)
}
};
nfds_t nfds = (-1 == pg->pg_sock) ? 1 : 2;
+ struct TALER_EXCHANGEDB_EventHandler *r;
GNUNET_assert (0 ==
pthread_mutex_lock (&pg->event_lock));
while (0 != pg->listener_count)
{
int ret;
+ int timeout = -1; /* no timeout */
GNUNET_assert (0 ==
pthread_mutex_unlock (&pg->event_lock));
+ while (1)
+ {
+ r = GNUNET_CONTAINER_heap_peek (pg->event_heap);
+ if (NULL == r)
+ break;
+ if (GNUNET_TIME_absolute_is_future (r->timeout))
+ break;
+ GNUNET_assert (r ==
+ GNUNET_CONTAINER_heap_remove_root (pg->event_heap));
+ r->hn = NULL;
+ r->cb (r->cb_cls,
+ NULL,
+ 0);
+ }
+ if (NULL != r)
+ {
+ struct GNUNET_TIME_Relative rem;
+
+ rem = GNUNET_TIME_absolute_get_remaining (r->timeout);
+ timeout = rem.rel_value_us / GNUNET_TIME_UNIT_MILLISECONDS.rel_value_us;
+ }
ret = poll (pfds,
nfds,
- -1 /* no timeout */);
+ timeout);
if (-1 == ret)
GNUNET_log_strerror (GNUNET_ERROR_TYPE_WARNING,
"poll");
@@ -2909,16 +2971,30 @@ pq_socket_cb (void *cls,
* @param cb_cls closure for @a cb
* @return handle useful to cancel the listener
*/
-static struct GNUNET_DB_EventHandler *
+static struct TALER_EXCHANGEDB_EventHandler *
postgres_event_listen (void *cls,
- struct TALER_EXCHANGEDB_Session *session,
+ struct GNUNET_TIME_Relative timeout,
const struct GNUNET_DB_EventHeaderP *es,
GNUNET_DB_EventCallback cb,
void *cb_cls)
{
struct PostgresClosure *pg = cls;
- struct GNUNET_DB_EventHandler *eh;
+ struct TALER_EXCHANGEDB_EventHandler *eh;
+ struct TALER_EXCHANGEDB_Session *session;
+ session = postgres_get_session (pg);
+ eh = GNUNET_new (struct TALER_EXCHANGEDB_EventHandler);
+ eh->cb = cb;
+ eh->cb_cls = cb_cls;
+ eh->timeout = GNUNET_TIME_relative_to_absolute (timeout);
+ eh->geh = GNUNET_PQ_event_listen (session->conn,
+ es,
+ cb,
+ cb_cls);
+ GNUNET_assert (NULL != eh->geh);
+ eh->hn = GNUNET_CONTAINER_heap_insert (pg->event_heap,
+ eh,
+ eh->timeout.abs_value_us);
GNUNET_assert (0 ==
pthread_mutex_lock (&pg->event_lock));
pg->listener_count++;
@@ -2932,11 +3008,6 @@ postgres_event_listen (void *cls,
}
GNUNET_assert (0 ==
pthread_mutex_unlock (&pg->event_lock));
- eh = GNUNET_PQ_event_listen (session->conn,
- es,
- cb,
- cb_cls);
- GNUNET_assert (NULL != eh);
return eh;
}
@@ -2949,7 +3020,7 @@ postgres_event_listen (void *cls,
*/
static void
postgres_event_listen_cancel (void *cls,
- struct GNUNET_DB_EventHandler *eh)
+ struct TALER_EXCHANGEDB_EventHandler *eh)
{
struct PostgresClosure *pg = cls;
@@ -2971,7 +3042,13 @@ postgres_event_listen_cancel (void *cls,
}
GNUNET_assert (0 ==
pthread_mutex_unlock (&pg->event_lock));
- GNUNET_PQ_event_listen_cancel (eh);
+ if (NULL != eh->hn)
+ {
+ GNUNET_CONTAINER_heap_remove_node (eh->hn);
+ eh->hn = NULL;
+ }
+ GNUNET_PQ_event_listen_cancel (eh->geh);
+ GNUNET_free (eh);
}
@@ -10917,6 +10994,8 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)
pg = GNUNET_new (struct PostgresClosure);
pg->cfg = cfg;
+ pg->event_heap = GNUNET_CONTAINER_heap_create (
+ GNUNET_CONTAINER_HEAP_ORDER_MIN);
pg->main_self = pthread_self (); /* loaded while single-threaded! */
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_filename (cfg,
@@ -11166,6 +11245,7 @@ libtaler_plugin_exchangedb_postgres_done (void *cls)
GNUNET_break (0 ==
close (pg->event_fd));
pthread_mutex_destroy (&pg->event_lock);
+ GNUNET_CONTAINER_heap_destroy (pg->event_heap);
GNUNET_free (pg->sql_dir);
GNUNET_free (pg->currency);
GNUNET_free (pg);
diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h
index 61c764a53..4cf6514f3 100644
--- a/src/include/taler_exchangedb_plugin.h
+++ b/src/include/taler_exchangedb_plugin.h
@@ -73,9 +73,32 @@ struct TALER_EXCHANGEDB_DenominationKeyInformationP
};
+/**
+ * Signature of events signalling a reseve got funding.
+ */
+struct TALER_ReserveEventP
+{
+ /**
+ * Of type #TALER_DBEVENT_EXCHANGE_RESERVE_INCOMING.
+ */
+ struct GNUNET_DB_EventHeaderP header;
+
+ /**
+ * Public key of the reserve the event is about.
+ */
+ struct TALER_ReservePublicKeyP reserve_pub;
+};
+
+
GNUNET_NETWORK_STRUCT_END
/**
+ * Event registration record.
+ */
+struct TALER_EXCHANGEDB_EventHandler;
+
+
+/**
* Meta data about an exchange online signing key.
*/
struct TALER_EXCHANGEDB_SignkeyMetaData
@@ -2149,16 +2172,16 @@ struct TALER_EXCHANGEDB_Plugin
* Register callback to be invoked on events of type @a es.
*
* @param cls database context to use
- * @param session connection to use
+ * @param timeout how long to wait at most
* @param es specification of the event to listen for
* @param cb function to call when the event happens, possibly
* multiple times (until cancel is invoked)
* @param cb_cls closure for @a cb
* @return handle useful to cancel the listener
*/
- struct GNUNET_DB_EventHandler *
+ struct TALER_EXCHANGEDB_EventHandler *
(*event_listen)(void *cls,
- struct TALER_EXCHANGEDB_Session *session,
+ struct GNUNET_TIME_Relative timeout,
const struct GNUNET_DB_EventHeaderP *es,
GNUNET_DB_EventCallback cb,
void *cb_cls);
@@ -2171,7 +2194,7 @@ struct TALER_EXCHANGEDB_Plugin
*/
void
(*event_listen_cancel)(void *cls,
- struct GNUNET_DB_EventHandler *eh);
+ struct TALER_EXCHANGEDB_EventHandler *eh);
/**