From 3a01b8aba03d16220c94a85b5074717291225fba Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Tue, 24 Aug 2021 15:45:55 +0200 Subject: -begin with new backend event subscription logic --- src/backend/taler-merchant-httpd.h | 44 ++++++ src/backend/taler-merchant-httpd_get-orders-ID.c | 148 ++++++++++++++++----- .../taler-merchant-httpd_private-get-orders-ID.c | 112 +++++++++++++--- 3 files changed, 251 insertions(+), 53 deletions(-) (limited to 'src') diff --git a/src/backend/taler-merchant-httpd.h b/src/backend/taler-merchant-httpd.h index 474957aa..06f29e7a 100644 --- a/src/backend/taler-merchant-httpd.h +++ b/src/backend/taler-merchant-httpd.h @@ -155,6 +155,50 @@ struct TMH_MerchantInstance }; +/** + * Event triggered when an order is paid. + */ +struct TMH_OrderPayEvent +{ + /** + * Type is #TALER_DBEVENT_MERCHANT_ORDER_PAID + */ + struct GNUNET_DB_EventHeaderP header; + + /** + * Always zero (for alignment). + */ + uint32_t reserved; + + /** + * Hash of the order ID. + */ + struct GNUNET_HashCode h_order_id; +}; + + +/** + * Event triggered when an order's refund is increased. + */ +struct TMH_OrderRefundEvent +{ + /** + * Type is #TALER_DBEVENT_MERCHANT_ORDER_REFUND + */ + struct GNUNET_DB_EventHeaderP header; + + /** + * Always zero (for alignment). + */ + uint32_t reserved; + + /** + * Hash of the order ID. + */ + struct GNUNET_HashCode h_order_id; +}; + + /** * @brief Struct describing an URL and the handler for it. * diff --git a/src/backend/taler-merchant-httpd_get-orders-ID.c b/src/backend/taler-merchant-httpd_get-orders-ID.c index 957e32c9..72b96eb8 100644 --- a/src/backend/taler-merchant-httpd_get-orders-ID.c +++ b/src/backend/taler-merchant-httpd_get-orders-ID.c @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include "taler-merchant-httpd_exchanges.h" @@ -74,6 +75,11 @@ struct GetOrderData */ struct TMH_SuspendedConnection sc; + /** + * Database event we are waiting on to be resuming. + */ + struct GNUNET_DB_EventHandler *eh; + /** * Which merchant instance is this for? */ @@ -175,10 +181,39 @@ TMH_force_wallet_get_order_resume (void) GNUNET_assert (god->suspended); god->suspended = false; MHD_resume_connection (god->sc.con); + TALER_MHD_daemon_trigger (); /* we resumed, kick MHD */ } } +/** + * We have received a trigger from the database + * that we should (possibly) resume the request. + * + * @param cls a `struct GetOrderData` to resume + * @param extra string encoding refund amount (or NULL) + * @param extra_size number of bytes in @a extra + */ +static void +resume_by_event (void *cls, + const void *extra, + size_t extra_size) +{ + struct GetOrderData *god = cls; + + (void) extra; + (void) extra_size; + if (! god->suspended) + return; /* duplicate event is possible */ + god->suspended = false; + GNUNET_CONTAINER_DLL_insert (god_head, + god_tail, + god); + MHD_resume_connection (god->sc.con); + TALER_MHD_daemon_trigger (); /* we resumed, kick MHD */ +} + + /** * Suspend this @a god until the trigger is satisfied. * @@ -691,6 +726,11 @@ god_cleanup (void *cls) json_decref (god->contract_terms); god->contract_terms = NULL; } + if (NULL != god->eh) + { + TMH_db->event_listen_cancel (god->eh); + god->eh = NULL; + } GNUNET_free (god); } @@ -770,37 +810,8 @@ TMH_get_orders_ID (const struct TMH_RequestHandler *rh, god->session_id = MHD_lookup_connection_value (connection, MHD_GET_ARGUMENT_KIND, "session_id"); - { - const char *long_poll_timeout_ms; - - long_poll_timeout_ms = MHD_lookup_connection_value (connection, - MHD_GET_ARGUMENT_KIND, - "timeout_ms"); - if (NULL != long_poll_timeout_ms) - { - unsigned int timeout; - char dummy; - - if (1 != sscanf (long_poll_timeout_ms, - "%u%c", - &timeout, - &dummy)) - { - GNUNET_break_op (0); - return TALER_MHD_reply_with_error (connection, - MHD_HTTP_BAD_REQUEST, - TALER_EC_GENERIC_PARAMETER_MALFORMED, - "timeout_ms (must be non-negative number)"); - } - /* If HTML is requested, we never long poll. Makes no sense */ - if (! god->generate_html) - god->sc.long_poll_timeout - = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply ( - GNUNET_TIME_UNIT_MILLISECONDS, - timeout)); - } - } + /* process await_refund_obtained argument */ { const char *await_refund_obtained_s; @@ -810,7 +821,8 @@ TMH_get_orders_ID (const struct TMH_RequestHandler *rh, "await_refund_obtained"); god->sc.awaiting_refund_obtained = (NULL != await_refund_obtained_s) - ? 0 == strcasecmp (await_refund_obtained_s, "yes") + ? 0 == strcasecmp (await_refund_obtained_s, + "yes") : false; } @@ -837,6 +849,80 @@ TMH_get_orders_ID (const struct TMH_RequestHandler *rh, god->sc.awaiting_refund = true; } } + + + /* process timeout_ms argument */ + { + const char *long_poll_timeout_ms; + + long_poll_timeout_ms = MHD_lookup_connection_value (connection, + MHD_GET_ARGUMENT_KIND, + "timeout_ms"); + if (NULL != long_poll_timeout_ms) + { + unsigned int timeout_ms; + char dummy; + + if (1 != sscanf (long_poll_timeout_ms, + "%u%c", + &timeout_ms, + &dummy)) + { + GNUNET_break_op (0); + return TALER_MHD_reply_with_error (connection, + MHD_HTTP_BAD_REQUEST, + TALER_EC_GENERIC_PARAMETER_MALFORMED, + "timeout_ms (must be non-negative number)"); + } + /* If HTML is requested, we never long poll. Makes no sense */ + if (! god->generate_html) + { + struct GNUNET_TIME_Relative timeout; + + timeout = GNUNET_TIME_relative_multiply ( + GNUNET_TIME_UNIT_MILLISECONDS, + timeout_ms); + god->sc.long_poll_timeout + = GNUNET_TIME_relative_to_absolute (timeout); + if (! GNUNET_TIME_relative_is_zero (timeout)) + { + if (god->sc.awaiting_refund) + { + struct TMH_OrderPayEvent refund_eh = { + .header.size = htons (sizeof (refund_eh)), + .header.type = htons (TALER_DBEVENT_MERCHANT_ORDER_REFUND) + }; + + GNUNET_CRYPTO_hash (god->order_id, + strlen (god->order_id), + &refund_eh.h_order_id); + god->eh = TMH_db->event_listen (TMH_db->cls, + &refund_eh.header, + timeout, + &resume_by_event, + god); + } + else /* ! waiting for refund */ + { + struct TMH_OrderPayEvent pay_eh = { + .header.size = htons (sizeof (pay_eh)), + .header.type = htons (TALER_DBEVENT_MERCHANT_ORDER_PAID) + }; + + GNUNET_CRYPTO_hash (god->order_id, + strlen (god->order_id), + &pay_eh.h_order_id); + god->eh = TMH_db->event_listen (TMH_db->cls, + &pay_eh.header, + timeout, + &resume_by_event, + god); + } + } /* end of timeout non-zero */ + } /* end of HTML generation NOT requested */ + } /* end of timeout_ms argument provided */ + } /* end of timeout_ms argument handling */ + } /* end of first-time initialization / sanity checks */ if (god->suspended) diff --git a/src/backend/taler-merchant-httpd_private-get-orders-ID.c b/src/backend/taler-merchant-httpd_private-get-orders-ID.c index e11ce9e9..6bba091b 100644 --- a/src/backend/taler-merchant-httpd_private-get-orders-ID.c +++ b/src/backend/taler-merchant-httpd_private-get-orders-ID.c @@ -23,6 +23,7 @@ #include "taler-merchant-httpd_private-get-orders-ID.h" #include "taler-merchant-httpd_get-orders-ID.h" #include +#include #include "taler-merchant-httpd_mhd.h" #include "taler-merchant-httpd_exchanges.h" #include "taler-merchant-httpd_helper.h" @@ -141,11 +142,6 @@ struct GetOrderRequestContext */ struct GetOrderRequestContext *prev; - /** - * Handle to the exchange, only valid while the @e fo succeeds. - */ - struct TALER_EXCHANGE_Handle *eh; - /** * Head of DLL of individual queries for transfer data. */ @@ -161,6 +157,11 @@ struct GetOrderRequestContext */ struct GNUNET_SCHEDULER_Task *tt; + /** + * Database event we are waiting on to be resuming. + */ + struct GNUNET_DB_EventHandler *eh; + /** * Contract terms of the payment we are checking. NULL when they * are not (yet) known. @@ -245,6 +246,11 @@ struct GetOrderRequestContext */ unsigned int wire_hc; + /** + * Set to true if this request is currently suspended. + */ + bool suspended; + /** * Set to true if this payment has been refunded and * @e refund_amount is initialized. @@ -292,6 +298,7 @@ gorc_resume (struct GetOrderRequestContext *gorc, { struct TransferQuery *tq; + GNUNET_assert (gorc->suspended); if (NULL != gorc->tt) { GNUNET_SCHEDULER_cancel (gorc->tt); @@ -315,6 +322,35 @@ gorc_resume (struct GetOrderRequestContext *gorc, GNUNET_CONTAINER_DLL_remove (gorc_head, gorc_tail, gorc); + gorc->suspended = false; + MHD_resume_connection (gorc->sc.con); + TALER_MHD_daemon_trigger (); /* we resumed, kick MHD */ +} + + +/** + * We have received a trigger from the database + * that we should (possibly) resume the request. + * + * @param cls a `struct GetOrderRequestContext` to resume + * @param extra string encoding refund amount (or NULL) + * @param extra_size number of bytes in @a extra + */ +static void +resume_by_event (void *cls, + const void *extra, + size_t extra_size) +{ + struct GetOrderRequestContext *gorc = cls; + + (void) extra; + (void) extra_size; + if (! gorc->suspended) + return; /* duplicate event is possible */ + gorc->suspended = false; + GNUNET_CONTAINER_DLL_insert (gorc_head, + gorc_tail, + gorc); MHD_resume_connection (gorc->sc.con); TALER_MHD_daemon_trigger (); /* we resumed, kick MHD */ } @@ -615,6 +651,11 @@ gorc_cleanup (void *cls) if (NULL != gorc->wire_reports) json_decref (gorc->wire_reports); GNUNET_assert (NULL == gorc->tt); + if (NULL != gorc->eh) + { + TMH_db->event_listen_cancel (gorc->eh); + gorc->eh = NULL; + } GNUNET_free (gorc); } @@ -772,6 +813,20 @@ TMH_private_get_orders_ID (const struct TMH_RequestHandler *rh, gorc->session_id = MHD_lookup_connection_value (connection, MHD_GET_ARGUMENT_KIND, "session_id"); + /* process 'transfer' argument */ + { + const char *transfer_s; + + transfer_s = MHD_lookup_connection_value (connection, + MHD_GET_ARGUMENT_KIND, + "transfer"); + if ( (NULL != transfer_s) && + (0 == strcasecmp (transfer_s, + "yes")) ) + gorc->transfer_status_requested = true; + } + + /* process 'timeout_ms' argument */ { const char *long_poll_timeout_s; @@ -780,11 +835,14 @@ TMH_private_get_orders_ID (const struct TMH_RequestHandler *rh, "timeout_ms"); if (NULL != long_poll_timeout_s) { - unsigned long long timeout; + unsigned int timeout_ms; + char dummy; + struct GNUNET_TIME_Relative timeout; if (1 != sscanf (long_poll_timeout_s, - "%llu", - &timeout)) + "%u%c", + &timeout_ms, + &dummy)) { GNUNET_break_op (0); return TALER_MHD_reply_with_error (connection, @@ -792,10 +850,27 @@ TMH_private_get_orders_ID (const struct TMH_RequestHandler *rh, TALER_EC_GENERIC_PARAMETER_MALFORMED, "timeout_ms must be non-negative number"); } + timeout = GNUNET_TIME_relative_multiply ( + GNUNET_TIME_UNIT_MILLISECONDS, + timeout_ms); gorc->sc.long_poll_timeout - = GNUNET_TIME_relative_to_absolute (GNUNET_TIME_relative_multiply ( - GNUNET_TIME_UNIT_MILLISECONDS, - timeout)); + = GNUNET_TIME_relative_to_absolute (timeout); + if (! GNUNET_TIME_relative_is_zero (timeout)) + { + struct TMH_OrderPayEvent pay_eh = { + .header.size = htons (sizeof (pay_eh)), + .header.type = htons (TALER_DBEVENT_MERCHANT_ORDER_PAID) + }; + + GNUNET_CRYPTO_hash (hc->infix, + strlen (hc->infix), + &pay_eh.h_order_id); + gorc->eh = TMH_db->event_listen (TMH_db->cls, + &pay_eh.header, + timeout, + &resume_by_event, + gorc); + } } else { @@ -803,18 +878,8 @@ TMH_private_get_orders_ID (const struct TMH_RequestHandler *rh, } } - { - const char *transfer_s; - transfer_s = MHD_lookup_connection_value (connection, - MHD_GET_ARGUMENT_KIND, - "transfer"); - if ( (NULL != transfer_s) && - (0 == strcasecmp (transfer_s, - "yes")) ) - gorc->transfer_status_requested = true; - } - } + } /* end first-time per-request initialization */ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Starting GET /private/orders/%s processing with timeout %s\n", @@ -1073,6 +1138,7 @@ TMH_private_get_orders_ID (const struct TMH_RequestHandler *rh, GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Suspending GET /private/orders/%s\n", hc->infix); + gorc->suspended = true; TMH_long_poll_suspend (hc->infix, gorc->session_id, gorc->fulfillment_url, @@ -1116,6 +1182,7 @@ TMH_private_get_orders_ID (const struct TMH_RequestHandler *rh, gorc->tt = GNUNET_SCHEDULER_add_delayed (EXCHANGE_TIMEOUT, &exchange_timeout_cb, gorc); + gorc->suspended = true; MHD_suspend_connection (connection); return MHD_YES; } @@ -1127,6 +1194,7 @@ TMH_private_get_orders_ID (const struct TMH_RequestHandler *rh, GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Suspending GET /private/orders/%s\n", hc->infix); + gorc->suspended = true; TMH_long_poll_suspend (hc->infix, gorc->session_id, gorc->fulfillment_url, -- cgit v1.2.3