summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2021-08-24 15:45:55 +0200
committerChristian Grothoff <christian@grothoff.org>2021-08-24 15:45:55 +0200
commit3a01b8aba03d16220c94a85b5074717291225fba (patch)
tree4c7c3fa38137cd86fbb334bd7c00ca8b40d28a43 /src
parent287527d0da1777dd1c256ef01e87f43631b6b1d4 (diff)
downloadmerchant-3a01b8aba03d16220c94a85b5074717291225fba.tar.gz
merchant-3a01b8aba03d16220c94a85b5074717291225fba.tar.bz2
merchant-3a01b8aba03d16220c94a85b5074717291225fba.zip
-begin with new backend event subscription logic
Diffstat (limited to 'src')
-rw-r--r--src/backend/taler-merchant-httpd.h44
-rw-r--r--src/backend/taler-merchant-httpd_get-orders-ID.c148
-rw-r--r--src/backend/taler-merchant-httpd_private-get-orders-ID.c112
3 files changed, 251 insertions, 53 deletions
diff --git a/src/backend/taler-merchant-httpd.h b/src/backend/taler-merchant-httpd.h
index 474957aa..06f29e7a 100644
--- a/src/backend/taler-merchant-httpd.h
+++ b/src/backend/taler-merchant-httpd.h
@@ -156,6 +156,50 @@ struct TMH_MerchantInstance
/**
+ * Event triggered when an order is paid.
+ */
+struct TMH_OrderPayEvent
+{
+ /**
+ * Type is #TALER_DBEVENT_MERCHANT_ORDER_PAID
+ */
+ struct GNUNET_DB_EventHeaderP header;
+
+ /**
+ * Always zero (for alignment).
+ */
+ uint32_t reserved;
+
+ /**
+ * Hash of the order ID.
+ */
+ struct GNUNET_HashCode h_order_id;
+};
+
+
+/**
+ * Event triggered when an order's refund is increased.
+ */
+struct TMH_OrderRefundEvent
+{
+ /**
+ * Type is #TALER_DBEVENT_MERCHANT_ORDER_REFUND
+ */
+ struct GNUNET_DB_EventHeaderP header;
+
+ /**
+ * Always zero (for alignment).
+ */
+ uint32_t reserved;
+
+ /**
+ * Hash of the order ID.
+ */
+ struct GNUNET_HashCode h_order_id;
+};
+
+
+/**
* @brief Struct describing an URL and the handler for it.
*
* The overall URL is always @e url_prefix, optionally followed by the
diff --git a/src/backend/taler-merchant-httpd_get-orders-ID.c b/src/backend/taler-merchant-httpd_get-orders-ID.c
index 957e32c9..72b96eb8 100644
--- a/src/backend/taler-merchant-httpd_get-orders-ID.c
+++ b/src/backend/taler-merchant-httpd_get-orders-ID.c
@@ -23,6 +23,7 @@
#include <jansson.h>
#include <gnunet/gnunet_uri_lib.h>
#include <taler/taler_signatures.h>
+#include <taler/taler_dbevents.h>
#include <taler/taler_json_lib.h>
#include <taler/taler_exchange_service.h>
#include "taler-merchant-httpd_exchanges.h"
@@ -75,6 +76,11 @@ struct GetOrderData
struct TMH_SuspendedConnection sc;
/**
+ * Database event we are waiting on to be resuming.
+ */
+ struct GNUNET_DB_EventHandler *eh;
+
+ /**
* Which merchant instance is this for?
*/
struct MerchantInstance *mi;
@@ -175,11 +181,40 @@ TMH_force_wallet_get_order_resume (void)
GNUNET_assert (god->suspended);
god->suspended = false;
MHD_resume_connection (god->sc.con);
+ TALER_MHD_daemon_trigger (); /* we resumed, kick MHD */
}
}
/**
+ * We have received a trigger from the database
+ * that we should (possibly) resume the request.
+ *
+ * @param cls a `struct GetOrderData` to resume
+ * @param extra string encoding refund amount (or NULL)
+ * @param extra_size number of bytes in @a extra
+ */
+static void
+resume_by_event (void *cls,
+ const void *extra,
+ size_t extra_size)
+{
+ struct GetOrderData *god = cls;
+
+ (void) extra;
+ (void) extra_size;
+ if (! god->suspended)
+ return; /* duplicate event is possible */
+ god->suspended = false;
+ GNUNET_CONTAINER_DLL_insert (god_head,
+ god_tail,
+ god);
+ MHD_resume_connection (god->sc.con);
+ TALER_MHD_daemon_trigger (); /* we resumed, kick MHD */
+}
+
+
+/**
* Suspend this @a god until the trigger is satisfied.
*
* @param god request to suspend
@@ -691,6 +726,11 @@ god_cleanup (void *cls)
json_decref (god->contract_terms);
god->contract_terms = NULL;
}
+ if (NULL != god->eh)
+ {
+ TMH_db->event_listen_cancel (god->eh);
+ god->eh = NULL;
+ }
GNUNET_free (god);
}
@@ -770,37 +810,8 @@ TMH_get_orders_ID (const struct TMH_RequestHandler *rh,
god->session_id = MHD_lookup_connection_value (connection,
MHD_GET_ARGUMENT_KIND,
"session_id");
- {
- const char *long_poll_timeout_ms;
-
- long_poll_timeout_ms = MHD_lookup_connection_value (connection,
- MHD_GET_ARGUMENT_KIND,
- "timeout_ms");
- if (NULL != long_poll_timeout_ms)
- {
- unsigned int timeout;
- char dummy;
-
- if (1 != sscanf (long_poll_timeout_ms,
- "%u%c",
- &timeout,
- &dummy))
- {
- GNUNET_break_op (0);
- return TALER_MHD_reply_with_error (connection,
- MHD_HTTP_BAD_REQUEST,
- TALER_EC_GENERIC_PARAMETER_MALFORMED,
- "timeout_ms (must be non-negative number)");
- }
- /* If HTML is requested, we never long poll. Makes no sense */
- if (! god->generate_html)
- god->sc.long_poll_timeout
- = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (
- GNUNET_TIME_UNIT_MILLISECONDS,
- timeout));
- }
- }
+ /* process await_refund_obtained argument */
{
const char *await_refund_obtained_s;
@@ -810,7 +821,8 @@ TMH_get_orders_ID (const struct TMH_RequestHandler *rh,
"await_refund_obtained");
god->sc.awaiting_refund_obtained =
(NULL != await_refund_obtained_s)
- ? 0 == strcasecmp (await_refund_obtained_s, "yes")
+ ? 0 == strcasecmp (await_refund_obtained_s,
+ "yes")
: false;
}
@@ -837,6 +849,80 @@ TMH_get_orders_ID (const struct TMH_RequestHandler *rh,
god->sc.awaiting_refund = true;
}
}
+
+
+ /* process timeout_ms argument */
+ {
+ const char *long_poll_timeout_ms;
+
+ long_poll_timeout_ms = MHD_lookup_connection_value (connection,
+ MHD_GET_ARGUMENT_KIND,
+ "timeout_ms");
+ if (NULL != long_poll_timeout_ms)
+ {
+ unsigned int timeout_ms;
+ char dummy;
+
+ if (1 != sscanf (long_poll_timeout_ms,
+ "%u%c",
+ &timeout_ms,
+ &dummy))
+ {
+ GNUNET_break_op (0);
+ return TALER_MHD_reply_with_error (connection,
+ MHD_HTTP_BAD_REQUEST,
+ TALER_EC_GENERIC_PARAMETER_MALFORMED,
+ "timeout_ms (must be non-negative number)");
+ }
+ /* If HTML is requested, we never long poll. Makes no sense */
+ if (! god->generate_html)
+ {
+ struct GNUNET_TIME_Relative timeout;
+
+ timeout = GNUNET_TIME_relative_multiply (
+ GNUNET_TIME_UNIT_MILLISECONDS,
+ timeout_ms);
+ god->sc.long_poll_timeout
+ = GNUNET_TIME_relative_to_absolute (timeout);
+ if (! GNUNET_TIME_relative_is_zero (timeout))
+ {
+ if (god->sc.awaiting_refund)
+ {
+ struct TMH_OrderPayEvent refund_eh = {
+ .header.size = htons (sizeof (refund_eh)),
+ .header.type = htons (TALER_DBEVENT_MERCHANT_ORDER_REFUND)
+ };
+
+ GNUNET_CRYPTO_hash (god->order_id,
+ strlen (god->order_id),
+ &refund_eh.h_order_id);
+ god->eh = TMH_db->event_listen (TMH_db->cls,
+ &refund_eh.header,
+ timeout,
+ &resume_by_event,
+ god);
+ }
+ else /* ! waiting for refund */
+ {
+ struct TMH_OrderPayEvent pay_eh = {
+ .header.size = htons (sizeof (pay_eh)),
+ .header.type = htons (TALER_DBEVENT_MERCHANT_ORDER_PAID)
+ };
+
+ GNUNET_CRYPTO_hash (god->order_id,
+ strlen (god->order_id),
+ &pay_eh.h_order_id);
+ god->eh = TMH_db->event_listen (TMH_db->cls,
+ &pay_eh.header,
+ timeout,
+ &resume_by_event,
+ god);
+ }
+ } /* end of timeout non-zero */
+ } /* end of HTML generation NOT requested */
+ } /* end of timeout_ms argument provided */
+ } /* end of timeout_ms argument handling */
+
} /* end of first-time initialization / sanity checks */
if (god->suspended)
diff --git a/src/backend/taler-merchant-httpd_private-get-orders-ID.c b/src/backend/taler-merchant-httpd_private-get-orders-ID.c
index e11ce9e9..6bba091b 100644
--- a/src/backend/taler-merchant-httpd_private-get-orders-ID.c
+++ b/src/backend/taler-merchant-httpd_private-get-orders-ID.c
@@ -23,6 +23,7 @@
#include "taler-merchant-httpd_private-get-orders-ID.h"
#include "taler-merchant-httpd_get-orders-ID.h"
#include <taler/taler_json_lib.h>
+#include <taler/taler_dbevents.h>
#include "taler-merchant-httpd_mhd.h"
#include "taler-merchant-httpd_exchanges.h"
#include "taler-merchant-httpd_helper.h"
@@ -142,11 +143,6 @@ struct GetOrderRequestContext
struct GetOrderRequestContext *prev;
/**
- * Handle to the exchange, only valid while the @e fo succeeds.
- */
- struct TALER_EXCHANGE_Handle *eh;
-
- /**
* Head of DLL of individual queries for transfer data.
*/
struct TransferQuery *tq_head;
@@ -162,6 +158,11 @@ struct GetOrderRequestContext
struct GNUNET_SCHEDULER_Task *tt;
/**
+ * Database event we are waiting on to be resuming.
+ */
+ struct GNUNET_DB_EventHandler *eh;
+
+ /**
* Contract terms of the payment we are checking. NULL when they
* are not (yet) known.
*/
@@ -246,6 +247,11 @@ struct GetOrderRequestContext
unsigned int wire_hc;
/**
+ * Set to true if this request is currently suspended.
+ */
+ bool suspended;
+
+ /**
* Set to true if this payment has been refunded and
* @e refund_amount is initialized.
*/
@@ -292,6 +298,7 @@ gorc_resume (struct GetOrderRequestContext *gorc,
{
struct TransferQuery *tq;
+ GNUNET_assert (gorc->suspended);
if (NULL != gorc->tt)
{
GNUNET_SCHEDULER_cancel (gorc->tt);
@@ -315,6 +322,35 @@ gorc_resume (struct GetOrderRequestContext *gorc,
GNUNET_CONTAINER_DLL_remove (gorc_head,
gorc_tail,
gorc);
+ gorc->suspended = false;
+ MHD_resume_connection (gorc->sc.con);
+ TALER_MHD_daemon_trigger (); /* we resumed, kick MHD */
+}
+
+
+/**
+ * We have received a trigger from the database
+ * that we should (possibly) resume the request.
+ *
+ * @param cls a `struct GetOrderRequestContext` to resume
+ * @param extra string encoding refund amount (or NULL)
+ * @param extra_size number of bytes in @a extra
+ */
+static void
+resume_by_event (void *cls,
+ const void *extra,
+ size_t extra_size)
+{
+ struct GetOrderRequestContext *gorc = cls;
+
+ (void) extra;
+ (void) extra_size;
+ if (! gorc->suspended)
+ return; /* duplicate event is possible */
+ gorc->suspended = false;
+ GNUNET_CONTAINER_DLL_insert (gorc_head,
+ gorc_tail,
+ gorc);
MHD_resume_connection (gorc->sc.con);
TALER_MHD_daemon_trigger (); /* we resumed, kick MHD */
}
@@ -615,6 +651,11 @@ gorc_cleanup (void *cls)
if (NULL != gorc->wire_reports)
json_decref (gorc->wire_reports);
GNUNET_assert (NULL == gorc->tt);
+ if (NULL != gorc->eh)
+ {
+ TMH_db->event_listen_cancel (gorc->eh);
+ gorc->eh = NULL;
+ }
GNUNET_free (gorc);
}
@@ -772,6 +813,20 @@ TMH_private_get_orders_ID (const struct TMH_RequestHandler *rh,
gorc->session_id = MHD_lookup_connection_value (connection,
MHD_GET_ARGUMENT_KIND,
"session_id");
+ /* process 'transfer' argument */
+ {
+ const char *transfer_s;
+
+ transfer_s = MHD_lookup_connection_value (connection,
+ MHD_GET_ARGUMENT_KIND,
+ "transfer");
+ if ( (NULL != transfer_s) &&
+ (0 == strcasecmp (transfer_s,
+ "yes")) )
+ gorc->transfer_status_requested = true;
+ }
+
+ /* process 'timeout_ms' argument */
{
const char *long_poll_timeout_s;
@@ -780,11 +835,14 @@ TMH_private_get_orders_ID (const struct TMH_RequestHandler *rh,
"timeout_ms");
if (NULL != long_poll_timeout_s)
{
- unsigned long long timeout;
+ unsigned int timeout_ms;
+ char dummy;
+ struct GNUNET_TIME_Relative timeout;
if (1 != sscanf (long_poll_timeout_s,
- "%llu",
- &timeout))
+ "%u%c",
+ &timeout_ms,
+ &dummy))
{
GNUNET_break_op (0);
return TALER_MHD_reply_with_error (connection,
@@ -792,10 +850,27 @@ TMH_private_get_orders_ID (const struct TMH_RequestHandler *rh,
TALER_EC_GENERIC_PARAMETER_MALFORMED,
"timeout_ms must be non-negative number");
}
+ timeout = GNUNET_TIME_relative_multiply (
+ GNUNET_TIME_UNIT_MILLISECONDS,
+ timeout_ms);
gorc->sc.long_poll_timeout
- = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply (
- GNUNET_TIME_UNIT_MILLISECONDS,
- timeout));
+ = GNUNET_TIME_relative_to_absolute (timeout);
+ if (! GNUNET_TIME_relative_is_zero (timeout))
+ {
+ struct TMH_OrderPayEvent pay_eh = {
+ .header.size = htons (sizeof (pay_eh)),
+ .header.type = htons (TALER_DBEVENT_MERCHANT_ORDER_PAID)
+ };
+
+ GNUNET_CRYPTO_hash (hc->infix,
+ strlen (hc->infix),
+ &pay_eh.h_order_id);
+ gorc->eh = TMH_db->event_listen (TMH_db->cls,
+ &pay_eh.header,
+ timeout,
+ &resume_by_event,
+ gorc);
+ }
}
else
{
@@ -803,18 +878,8 @@ TMH_private_get_orders_ID (const struct TMH_RequestHandler *rh,
}
}
- {
- const char *transfer_s;
- transfer_s = MHD_lookup_connection_value (connection,
- MHD_GET_ARGUMENT_KIND,
- "transfer");
- if ( (NULL != transfer_s) &&
- (0 == strcasecmp (transfer_s,
- "yes")) )
- gorc->transfer_status_requested = true;
- }
- }
+ } /* end first-time per-request initialization */
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Starting GET /private/orders/%s processing with timeout %s\n",
@@ -1073,6 +1138,7 @@ TMH_private_get_orders_ID (const struct TMH_RequestHandler *rh,
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Suspending GET /private/orders/%s\n",
hc->infix);
+ gorc->suspended = true;
TMH_long_poll_suspend (hc->infix,
gorc->session_id,
gorc->fulfillment_url,
@@ -1116,6 +1182,7 @@ TMH_private_get_orders_ID (const struct TMH_RequestHandler *rh,
gorc->tt = GNUNET_SCHEDULER_add_delayed (EXCHANGE_TIMEOUT,
&exchange_timeout_cb,
gorc);
+ gorc->suspended = true;
MHD_suspend_connection (connection);
return MHD_YES;
}
@@ -1127,6 +1194,7 @@ TMH_private_get_orders_ID (const struct TMH_RequestHandler *rh,
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Suspending GET /private/orders/%s\n",
hc->infix);
+ gorc->suspended = true;
TMH_long_poll_suspend (hc->infix,
gorc->session_id,
gorc->fulfillment_url,