From 3a01b8aba03d16220c94a85b5074717291225fba Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Tue, 24 Aug 2021 15:45:55 +0200 Subject: -begin with new backend event subscription logic --- .../taler-merchant-httpd_private-get-orders-ID.c | 112 +++++++++++++++++---- 1 file changed, 90 insertions(+), 22 deletions(-) (limited to 'src/backend/taler-merchant-httpd_private-get-orders-ID.c') diff --git a/src/backend/taler-merchant-httpd_private-get-orders-ID.c b/src/backend/taler-merchant-httpd_private-get-orders-ID.c index e11ce9e9..6bba091b 100644 --- a/src/backend/taler-merchant-httpd_private-get-orders-ID.c +++ b/src/backend/taler-merchant-httpd_private-get-orders-ID.c @@ -23,6 +23,7 @@ #include "taler-merchant-httpd_private-get-orders-ID.h" #include "taler-merchant-httpd_get-orders-ID.h" #include +#include #include "taler-merchant-httpd_mhd.h" #include "taler-merchant-httpd_exchanges.h" #include "taler-merchant-httpd_helper.h" @@ -141,11 +142,6 @@ struct GetOrderRequestContext */ struct GetOrderRequestContext *prev; - /** - * Handle to the exchange, only valid while the @e fo succeeds. - */ - struct TALER_EXCHANGE_Handle *eh; - /** * Head of DLL of individual queries for transfer data. */ @@ -161,6 +157,11 @@ struct GetOrderRequestContext */ struct GNUNET_SCHEDULER_Task *tt; + /** + * Database event we are waiting on to be resuming. + */ + struct GNUNET_DB_EventHandler *eh; + /** * Contract terms of the payment we are checking. NULL when they * are not (yet) known. @@ -245,6 +246,11 @@ struct GetOrderRequestContext */ unsigned int wire_hc; + /** + * Set to true if this request is currently suspended. + */ + bool suspended; + /** * Set to true if this payment has been refunded and * @e refund_amount is initialized. @@ -292,6 +298,7 @@ gorc_resume (struct GetOrderRequestContext *gorc, { struct TransferQuery *tq; + GNUNET_assert (gorc->suspended); if (NULL != gorc->tt) { GNUNET_SCHEDULER_cancel (gorc->tt); @@ -315,6 +322,35 @@ gorc_resume (struct GetOrderRequestContext *gorc, GNUNET_CONTAINER_DLL_remove (gorc_head, gorc_tail, gorc); + gorc->suspended = false; + MHD_resume_connection (gorc->sc.con); + TALER_MHD_daemon_trigger (); /* we resumed, kick MHD */ +} + + +/** + * We have received a trigger from the database + * that we should (possibly) resume the request. + * + * @param cls a `struct GetOrderRequestContext` to resume + * @param extra string encoding refund amount (or NULL) + * @param extra_size number of bytes in @a extra + */ +static void +resume_by_event (void *cls, + const void *extra, + size_t extra_size) +{ + struct GetOrderRequestContext *gorc = cls; + + (void) extra; + (void) extra_size; + if (! gorc->suspended) + return; /* duplicate event is possible */ + gorc->suspended = false; + GNUNET_CONTAINER_DLL_insert (gorc_head, + gorc_tail, + gorc); MHD_resume_connection (gorc->sc.con); TALER_MHD_daemon_trigger (); /* we resumed, kick MHD */ } @@ -615,6 +651,11 @@ gorc_cleanup (void *cls) if (NULL != gorc->wire_reports) json_decref (gorc->wire_reports); GNUNET_assert (NULL == gorc->tt); + if (NULL != gorc->eh) + { + TMH_db->event_listen_cancel (gorc->eh); + gorc->eh = NULL; + } GNUNET_free (gorc); } @@ -772,6 +813,20 @@ TMH_private_get_orders_ID (const struct TMH_RequestHandler *rh, gorc->session_id = MHD_lookup_connection_value (connection, MHD_GET_ARGUMENT_KIND, "session_id"); + /* process 'transfer' argument */ + { + const char *transfer_s; + + transfer_s = MHD_lookup_connection_value (connection, + MHD_GET_ARGUMENT_KIND, + "transfer"); + if ( (NULL != transfer_s) && + (0 == strcasecmp (transfer_s, + "yes")) ) + gorc->transfer_status_requested = true; + } + + /* process 'timeout_ms' argument */ { const char *long_poll_timeout_s; @@ -780,11 +835,14 @@ TMH_private_get_orders_ID (const struct TMH_RequestHandler *rh, "timeout_ms"); if (NULL != long_poll_timeout_s) { - unsigned long long timeout; + unsigned int timeout_ms; + char dummy; + struct GNUNET_TIME_Relative timeout; if (1 != sscanf (long_poll_timeout_s, - "%llu", - &timeout)) + "%u%c", + &timeout_ms, + &dummy)) { GNUNET_break_op (0); return TALER_MHD_reply_with_error (connection, @@ -792,10 +850,27 @@ TMH_private_get_orders_ID (const struct TMH_RequestHandler *rh, TALER_EC_GENERIC_PARAMETER_MALFORMED, "timeout_ms must be non-negative number"); } + timeout = GNUNET_TIME_relative_multiply ( + GNUNET_TIME_UNIT_MILLISECONDS, + timeout_ms); gorc->sc.long_poll_timeout - = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply ( - GNUNET_TIME_UNIT_MILLISECONDS, - timeout)); + = GNUNET_TIME_relative_to_absolute (timeout); + if (! GNUNET_TIME_relative_is_zero (timeout)) + { + struct TMH_OrderPayEvent pay_eh = { + .header.size = htons (sizeof (pay_eh)), + .header.type = htons (TALER_DBEVENT_MERCHANT_ORDER_PAID) + }; + + GNUNET_CRYPTO_hash (hc->infix, + strlen (hc->infix), + &pay_eh.h_order_id); + gorc->eh = TMH_db->event_listen (TMH_db->cls, + &pay_eh.header, + timeout, + &resume_by_event, + gorc); + } } else { @@ -803,18 +878,8 @@ TMH_private_get_orders_ID (const struct TMH_RequestHandler *rh, } } - { - const char *transfer_s; - transfer_s = MHD_lookup_connection_value (connection, - MHD_GET_ARGUMENT_KIND, - "transfer"); - if ( (NULL != transfer_s) && - (0 == strcasecmp (transfer_s, - "yes")) ) - gorc->transfer_status_requested = true; - } - } + } /* end first-time per-request initialization */ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Starting GET /private/orders/%s processing with timeout %s\n", @@ -1073,6 +1138,7 @@ TMH_private_get_orders_ID (const struct TMH_RequestHandler *rh, GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Suspending GET /private/orders/%s\n", hc->infix); + gorc->suspended = true; TMH_long_poll_suspend (hc->infix, gorc->session_id, gorc->fulfillment_url, @@ -1116,6 +1182,7 @@ TMH_private_get_orders_ID (const struct TMH_RequestHandler *rh, gorc->tt = GNUNET_SCHEDULER_add_delayed (EXCHANGE_TIMEOUT, &exchange_timeout_cb, gorc); + gorc->suspended = true; MHD_suspend_connection (connection); return MHD_YES; } @@ -1127,6 +1194,7 @@ TMH_private_get_orders_ID (const struct TMH_RequestHandler *rh, GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Suspending GET /private/orders/%s\n", hc->infix); + gorc->suspended = true; TMH_long_poll_suspend (hc->infix, gorc->session_id, gorc->fulfillment_url, -- cgit v1.2.3