From 696278ce80c7613e39c24e138dd6c99116080adb Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 25 Aug 2021 17:23:35 +0200 Subject: complete implementation of #6956: long polling triggers via database backend --- src/backend/taler-merchant-httpd.c | 474 +-------------------- src/backend/taler-merchant-httpd.h | 195 +++++---- src/backend/taler-merchant-httpd_get-orders-ID.c | 154 +++++-- .../taler-merchant-httpd_post-orders-ID-paid.c | 27 +- .../taler-merchant-httpd_post-orders-ID-pay.c | 44 +- .../taler-merchant-httpd_post-orders-ID-refund.c | 43 +- ...er-merchant-httpd_private-delete-instances-ID.c | 8 +- .../taler-merchant-httpd_private-get-orders-ID.c | 72 ++-- .../taler-merchant-httpd_private-get-orders.c | 153 +++++-- .../taler-merchant-httpd_private-get-orders.h | 10 +- ...-merchant-httpd_private-post-orders-ID-refund.c | 38 +- src/backenddb/plugin_merchantdb_postgres.c | 69 ++- src/include/taler_merchantdb_plugin.h | 19 + src/testing/testing_api_cmd_merchant_get_order.c | 14 +- src/testing/testing_api_cmd_wallet_get_order.c | 9 + 15 files changed, 578 insertions(+), 751 deletions(-) (limited to 'src') diff --git a/src/backend/taler-merchant-httpd.c b/src/backend/taler-merchant-httpd.c index d48c2f92..f95beb3e 100644 --- a/src/backend/taler-merchant-httpd.c +++ b/src/backend/taler-merchant-httpd.c @@ -136,23 +136,6 @@ static int merchant_connection_close; */ static int result; -/** - * MIN-Heap of suspended connections to resume when the timeout expires, - * ordered by timeout. Values are of type `struct MHD_Connection` - */ -static struct GNUNET_CONTAINER_Heap *resume_timeout_heap; - -/** - * Hash map from H(order_id,merchant_pub) to `struct MHD_Connection` - * entries to resume when a payment is made for the given order. - */ -static struct GNUNET_CONTAINER_MultiHashMap *payment_trigger_map; - -/** - * Task responsible for timeouts in the #resume_timeout_heap. - */ -static struct GNUNET_SCHEDULER_Task *resume_timeout_task; - /** * Our configuration. */ @@ -164,24 +147,6 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg; char *TMH_default_auth; -/** - * Holds data needed to determine when to resume a connection for - * GET /orders/$ORDER_ID - */ -struct ResumeData -{ - /** - * How much of the order has been refunded. - */ - const struct TALER_Amount *refund_amount; - - /** - * Whether the refunds for the order were obtained. - */ - bool obtained; -}; - - int TMH_check_auth (const char *token, const struct GNUNET_ShortHashCode *salt, @@ -291,393 +256,6 @@ TMH_instance_free_cb (void *cls, } -/** - * Callback that frees all the elements in the #payment_trigger_map. - * This function should actually never be called, as by the time we - * get to it, all payment triggers should have been cleaned up! - * - * @param cls closure, NULL - * @param key current key - * @param value a `struct TMH_SuspendedConnection` - * @return #GNUNET_OK - */ -static int -payment_trigger_free (void *cls, - const struct GNUNET_HashCode *key, - void *value) -{ - struct TMH_SuspendedConnection *sc = value; - - (void) cls; - (void) key; - (void) sc; /* cannot really 'clean up' */ - GNUNET_break (0); - return GNUNET_OK; -} - - -/** - * Compute @a key to use for @a order_id and @a mpub in our - * #payment_trigger_map. - * - * @param order_id an order ID - * @param mpub an instance public key - * @param[out] key set to the hash map key to use - */ -static void -compute_pay_key (const char *order_id, - const struct TALER_MerchantPublicKeyP *mpub, - struct GNUNET_HashCode *key) -{ - size_t olen = strlen (order_id); - char buf[sizeof (*mpub) + olen]; - - /* sanity check for arithmetic overflow */ - GNUNET_assert (olen < 1024 * 1024); - memcpy (buf, - mpub, - sizeof (*mpub)); - memcpy (&buf[sizeof (*mpub)], - order_id, - olen); - GNUNET_CRYPTO_hash (buf, - sizeof (buf), - key); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Pay key for `%s' is %s\n", - order_id, - GNUNET_h2s (key)); -} - - -/** - * Compute @a key to use for @a session_id and @a fulfillment_url in our - * #payment_trigger_map. - * - * @param session_id the session for which @a fulfillment_url matters - * @param fulfillment_url fullfillment URL of an order - * @param[out] key set to the hash map key to use - */ -static void -compute_pay_key2 (const char *session_id, - const char *fulfillment_url, - struct GNUNET_HashCode *key) -{ - size_t slen = strlen (session_id) + 1; - size_t ulen = strlen (fulfillment_url) + 1; - char buf[slen + ulen]; - - memcpy (buf, - session_id, - slen); - memcpy (&buf[slen], - fulfillment_url, - ulen); - GNUNET_CRYPTO_hash (buf, - sizeof (buf), - key); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Pay key2 for %s/%s is %s\n", - session_id, - fulfillment_url, - GNUNET_h2s (key)); -} - - -/** - * Resume processing all suspended connections past timeout. - * - * @param cls unused - */ -static void -do_resume (void *cls) -{ - struct TMH_SuspendedConnection *sc; - - (void) cls; - resume_timeout_task = NULL; - while (1) - { - sc = GNUNET_CONTAINER_heap_peek (resume_timeout_heap); - if (NULL == sc) - return; - if (GNUNET_TIME_absolute_is_future (sc->long_poll_timeout)) - break; - GNUNET_assert (sc == - GNUNET_CONTAINER_heap_remove_root (resume_timeout_heap)); - sc->hn = NULL; - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (payment_trigger_map, - &sc->key, - sc)); - if (sc->has_key2) - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (payment_trigger_map, - &sc->key2, - sc)); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Resuming long polled job due to timeout\n"); - MHD_resume_connection (sc->con); - TALER_MHD_daemon_trigger (); /* we resumed, kick MHD */ - } - if (NULL != resume_timeout_task) - GNUNET_SCHEDULER_cancel (resume_timeout_task); - resume_timeout_task = GNUNET_SCHEDULER_add_at (sc->long_poll_timeout, - &do_resume, - NULL); -} - - -/** - * Suspend connection from @a sc until payment has been received. - * - * @param order_id the order that we are waiting on - * @param session_id session ID of the requester - * @param fulfillment_url fulfillment URL of the contract - * @param mi the merchant instance we are waiting on - * @param sc connection to suspend - * @param min_refund refund amount we are waiting on to be exceeded before resuming, - * NULL if we are not waiting for refunds - */ -void -TMH_long_poll_suspend (const char *order_id, - const char *session_id, - const char *fulfillment_url, - const struct TMH_MerchantInstance *mi, - struct TMH_SuspendedConnection *sc, - const struct TALER_Amount *min_refund) -{ - compute_pay_key (order_id, - &mi->merchant_pub, - &sc->key); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Suspending operation on key %s\n", - GNUNET_h2s (&sc->key)); - GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multihashmap_put (payment_trigger_map, - &sc->key, - sc, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); - if ( (NULL != session_id) && - (NULL != fulfillment_url) ) - { - sc->has_key2 = true; - compute_pay_key2 (session_id, - fulfillment_url, - &sc->key2); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Suspending operation on %s/%s key2 %s\n", - session_id, - fulfillment_url, - GNUNET_h2s (&sc->key2)); - GNUNET_assert (GNUNET_OK == - GNUNET_CONTAINER_multihashmap_put (payment_trigger_map, - &sc->key2, - sc, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_MULTIPLE)); - } - if (NULL != min_refund) - { - sc->awaiting_refund = true; - sc->refund_expected = *min_refund; - } - sc->hn = GNUNET_CONTAINER_heap_insert (resume_timeout_heap, - sc, - sc->long_poll_timeout.abs_value_us); - MHD_suspend_connection (sc->con); - if (NULL != resume_timeout_task) - { - GNUNET_SCHEDULER_cancel (resume_timeout_task); - resume_timeout_task = NULL; - } - sc = GNUNET_CONTAINER_heap_peek (resume_timeout_heap); - resume_timeout_task = GNUNET_SCHEDULER_add_at (sc->long_poll_timeout, - &do_resume, - NULL); -} - - -/** - * Function called to resume suspended connections. - * - * @param cls pointer to a `struct TALER_Amount` indicating the refund amount, or NULL - * @param key key in the #payment_trigger_map - * @param value a `struct TMH_SuspendedConnection` to resume - * @return #GNUNET_OK (continue to iterate) - */ -static int -resume_operation (void *cls, - const struct GNUNET_HashCode *key, - void *value) -{ - const struct ResumeData *rd = cls; - struct TMH_SuspendedConnection *sc = value; - - GNUNET_assert (0 == - GNUNET_memcmp (key, - &sc->key)); - /* If the conditions are satisfied partially, turn them off for future - calls. */ - if ( (sc->awaiting_refund_obtained) && - (rd->obtained)) - sc->awaiting_refund_obtained = false; - if ( (sc->awaiting_refund) && - ( (NULL != rd->refund_amount) && - (1 == TALER_amount_cmp (rd->refund_amount, - &sc->refund_expected)) ) ) - sc->awaiting_refund = false; - - if ( (sc->awaiting_refund_obtained) && - (! rd->obtained)) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Not awaking client, refunds not yet obtained\n"); - return GNUNET_OK; - } - if ( (sc->awaiting_refund) && - ( (NULL == rd->refund_amount) || - (1 != TALER_amount_cmp (rd->refund_amount, - &sc->refund_expected)) ) ) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Not awaking client, refund amount of %s not yet satisfied\n", - TALER_amount2s (&sc->refund_expected)); - return GNUNET_OK; /* skip */ - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Resuming operation suspended pending payment on key %s\n", - GNUNET_h2s (key)); - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (payment_trigger_map, - key, - sc)); - if (sc->has_key2) - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (payment_trigger_map, - &sc->key2, - sc)); - GNUNET_assert (sc == - GNUNET_CONTAINER_heap_remove_node (sc->hn)); - sc->hn = NULL; - MHD_resume_connection (sc->con); - TALER_MHD_daemon_trigger (); - return GNUNET_OK; -} - - -/** - * Find out if we have any clients long-polling for @a order_id to be - * confirmed at merchant @a mpub, and if so, tell them to resume. - * - * @param order_id the order that was paid or refunded - * @param mi the merchant instance where the payment or refund happened - * @param refund_amount refunded amount, if the trigger was a refund, otherwise NULL - * @param obtained if true, the wallet has obtained the refunds for the order - */ -void -TMH_long_poll_resume (const char *order_id, - const struct TMH_MerchantInstance *mi, - const struct TALER_Amount *refund_amount, - bool obtained) -{ - struct GNUNET_HashCode key; - struct ResumeData rd = { - .refund_amount = refund_amount, - .obtained = obtained - }; - int ret; - - compute_pay_key (order_id, - &mi->merchant_pub, - &key); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Resuming operations suspended pending payment on key %s up to refund %s\n", - GNUNET_h2s (&key), - (NULL != refund_amount) - ? TALER_amount2s (refund_amount) - : ""); - ret = GNUNET_CONTAINER_multihashmap_get_multiple (payment_trigger_map, - &key, - &resume_operation, - (void *) &rd); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%u operations remain suspended pending payment (%d)\n", - GNUNET_CONTAINER_multihashmap_size (payment_trigger_map), - ret); -} - - -/** - * Function called to resume suspended connections. - * - * @param cls NULL - * @param key key in the #payment_trigger_map - * @param value a `struct TMH_SuspendedConnection` to resume - * @return #GNUNET_OK (continue to iterate) - */ -static int -resume_operation2 (void *cls, - const struct GNUNET_HashCode *key, - void *value) -{ - struct TMH_SuspendedConnection *sc = value; - - GNUNET_assert (sc->has_key2); - GNUNET_assert (0 == GNUNET_memcmp (key, - &sc->key2)); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Resuming operation suspended pending payment on key %s\n", - GNUNET_h2s (key)); - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (payment_trigger_map, - &sc->key, - sc)); - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (payment_trigger_map, - &sc->key2, - sc)); - GNUNET_assert (sc == - GNUNET_CONTAINER_heap_remove_node (sc->hn)); - sc->hn = NULL; - MHD_resume_connection (sc->con); - TALER_MHD_daemon_trigger (); - return GNUNET_OK; -} - - -/** - * Find out if we have any clients long-polling for @a order_id to be - * confirmed at merchant @a mpub, and if so, tell them to resume. - * - * @param session_id the session for which @a fulfillment_url became paid - * @param fulfillment_url fullfillment URL of which an order was paid - */ -void -TMH_long_poll_resume2 (const char *session_id, - const char *fulfillment_url) -{ - struct GNUNET_HashCode key; - int ret; - - compute_pay_key2 (session_id, - fulfillment_url, - &key); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Resuming operations suspended pending payment on %s/%s with key2 %s\n", - session_id, - fulfillment_url, - GNUNET_h2s (&key)); - ret = GNUNET_CONTAINER_multihashmap_get_multiple (payment_trigger_map, - &key, - &resume_operation2, - NULL); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "%u operations remain suspended pending payment (%d)\n", - GNUNET_CONTAINER_multihashmap_size (payment_trigger_map), - ret); -} - - /** * Shutdown task (magically invoked when the application is being * quit) @@ -687,9 +265,6 @@ TMH_long_poll_resume2 (const char *session_id, static void do_shutdown (void *cls) { - struct TMH_SuspendedConnection *sc; - struct MHD_Daemon *mhd; - (void) cls; TMH_force_ac_resume (); TMH_force_pc_resume (); @@ -698,27 +273,12 @@ do_shutdown (void *cls) TMH_force_tip_pickup_resume (); TMH_force_wallet_get_order_resume (); TMH_force_wallet_refund_order_resume (); - mhd = TALER_MHD_daemon_stop (); - /* resume all suspended connections, must be done before stopping #mhd */ - if (NULL != resume_timeout_heap) { - while (NULL != (sc = GNUNET_CONTAINER_heap_remove_root ( - resume_timeout_heap))) - { - sc->hn = NULL; - GNUNET_assert (GNUNET_YES == - GNUNET_CONTAINER_multihashmap_remove (payment_trigger_map, - &sc->key, - sc)); - MHD_resume_connection (sc->con); - } - GNUNET_CONTAINER_heap_destroy (resume_timeout_heap); - resume_timeout_heap = NULL; - } - if (NULL != mhd) - { - MHD_stop_daemon (mhd); - mhd = NULL; + struct MHD_Daemon *mhd; + + mhd = TALER_MHD_daemon_stop (); + if (NULL != mhd) + MHD_stop_daemon (mhd); } TMH_RESERVES_done (); if (NULL != instance_eh) @@ -733,19 +293,6 @@ do_shutdown (void *cls) } TMH_EXCHANGES_done (); TMH_AUDITORS_done (); - if (NULL != resume_timeout_task) - { - GNUNET_SCHEDULER_cancel (resume_timeout_task); - resume_timeout_task = NULL; - } - if (NULL != payment_trigger_map) - { - GNUNET_CONTAINER_multihashmap_iterate (payment_trigger_map, - &payment_trigger_free, - NULL); - GNUNET_CONTAINER_multihashmap_destroy (payment_trigger_map); - payment_trigger_map = NULL; - } if (NULL != TMH_by_id_map) { GNUNET_CONTAINER_multihashmap_iterate (TMH_by_id_map, @@ -2019,7 +1566,7 @@ load_instances (void *cls, ( (0 == extra_len) || ('\0' != id[extra_len - 1]) ) ) { - GNUNET_break (0); /* bogus notification */ + GNUNET_break (0 == extra_len); extra = NULL; } GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -2073,7 +1620,7 @@ TMH_reload_instances (const char *id) { struct GNUNET_DB_EventHeaderP es = { es.size = ntohs (sizeof (es)), - es.type = ntohs (sizeof (TALER_DBEVENT_MERCHANT_INSTANCE_SETTINGS)) + es.type = ntohs (TALER_DBEVENT_MERCHANT_INSTANCE_SETTINGS) }; GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -2127,11 +1674,6 @@ run (void *cls, result = GNUNET_SYSERR; GNUNET_SCHEDULER_add_shutdown (&do_shutdown, NULL); - resume_timeout_heap - = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); - payment_trigger_map - = GNUNET_CONTAINER_multihashmap_create (16, - GNUNET_YES); if (GNUNET_OK != TALER_config_get_currency (cfg, &TMH_currency)) @@ -2209,7 +1751,7 @@ run (void *cls, { struct GNUNET_DB_EventHeaderP es = { es.size = ntohs (sizeof (es)), - es.type = ntohs (sizeof (TALER_DBEVENT_MERCHANT_INSTANCE_SETTINGS)) + es.type = ntohs (TALER_DBEVENT_MERCHANT_INSTANCE_SETTINGS) }; instance_eh = TMH_db->event_listen (TMH_db->cls, diff --git a/src/backend/taler-merchant-httpd.h b/src/backend/taler-merchant-httpd.h index 6e8362a4..48547e84 100644 --- a/src/backend/taler-merchant-httpd.h +++ b/src/backend/taler-merchant-httpd.h @@ -122,6 +122,12 @@ struct TMH_MerchantInstance */ struct TMH_PendingOrder *po_tail; + /** + * Database event we are waiting on to be resuming + * long-polling requests from the @e po_head. + */ + struct GNUNET_DB_EventHandler *po_eh; + /** * Merchant's private key. */ @@ -155,10 +161,13 @@ struct TMH_MerchantInstance }; +GNUNET_NETWORK_STRUCT_BEGIN + + /** * Event triggered when an order is paid. */ -struct TMH_OrderPayEvent +struct TMH_OrderPayEventP { /** * Type is #TALER_DBEVENT_MERCHANT_ORDER_PAID @@ -168,7 +177,12 @@ struct TMH_OrderPayEvent /** * Always zero (for alignment). */ - uint32_t reserved; + uint32_t reserved GNUNET_PACKED; + + /** + * Merchant's public key + */ + struct TALER_MerchantPublicKeyP merchant_pub; /** * Hash of the order ID. @@ -181,7 +195,7 @@ struct TMH_OrderPayEvent * Event triggered when a fulfillment URL is * bound to a session (as paid). */ -struct TMH_SessionEvent +struct TMH_SessionEventP { /** * Type is #TALER_DBEVENT_MERCHANT_SESSION_CAPTURED @@ -191,7 +205,12 @@ struct TMH_SessionEvent /** * Always zero (for alignment). */ - uint32_t reserved; + uint32_t reserved GNUNET_PACKED; + + /** + * Merchant's public key + */ + struct TALER_MerchantPublicKeyP merchant_pub; /** * Hash of the fulfillment URL. @@ -206,19 +225,28 @@ struct TMH_SessionEvent /** - * Event triggered when an order's refund is increased. + * Event triggered when an order's refund is increased + * or obtained by the respective wallet. + * + * Extra arguments are the amount (as a string). */ -struct TMH_OrderRefundEvent +struct TMH_OrderRefundEventP { /** - * Type is #TALER_DBEVENT_MERCHANT_ORDER_REFUND + * Type is #TALER_DBEVENT_MERCHANT_ORDER_REFUND or + * #TALER_DBEVENT_MERCHANT_REFUND_OBTAINED */ struct GNUNET_DB_EventHeaderP header; /** * Always zero (for alignment). */ - uint32_t reserved; + uint32_t reserved GNUNET_PACKED; + + /** + * Merchant's public key + */ + struct TALER_MerchantPublicKeyP merchant_pub; /** * Hash of the order ID. @@ -227,6 +255,86 @@ struct TMH_OrderRefundEvent }; +/** + * Possible flags indicating the state of an order. + */ +enum TMH_OrderStateFlags +{ + TMH_OSF_NONE = 0, + + /** + * Not yet used. + */ + TMH_OSF_CLAIMED = 1, + + /** + * Customer paid the order. + */ + TMH_OSF_PAID = 2, + + /** + * Merchant granted (possibly partial) refund. + */ + TMH_OSF_REFUNDED = 4, + + /** + * Merchant received the payment from the exchange. + */ + TMH_OSF_WIRED = 8 +}; + + +/** + * Extra information passed for a + * #TALER_DBEVENT_MERCHANT_ORDERS_CHANGE. + */ +struct TMH_OrderChangeEventDetailsP +{ + /** + * Order ID, in NBO. + */ + uint64_t order_serial_id GNUNET_PACKED; + + /** + * Execution date of the order. + */ + struct GNUNET_TIME_AbsoluteNBO execution_date; + + /** + * See `enum TMH_OrderStateFlags`. In NBO. + */ + uint32_t order_state GNUNET_PACKED; + +}; + + +/** + * Event triggered when an order's refund is increased + * or obtained by the respective wallet. + * + * Extra arguments are the amount (as a string). + */ +struct TMH_OrderChangeEventP +{ + /** + * Type is #TALER_DBEVENT_MERCHANT_ORDERS_CHANGE. + */ + struct GNUNET_DB_EventHeaderP header; + + /** + * Always zero (for alignment). + */ + uint32_t reserved GNUNET_PACKED; + + /** + * Merchant's public key + */ + struct TALER_MerchantPublicKeyP merchant_pub; +}; + + +GNUNET_NETWORK_STRUCT_END + /** * @brief Struct describing an URL and the handler for it. * @@ -442,25 +550,6 @@ struct TMH_SuspendedConnection */ struct MHD_Connection *con; - /** - * Associated heap node. Used internally by #TMH_long_poll_suspend() - * and TMH_long_poll_resume(). - */ - struct GNUNET_CONTAINER_HeapNode *hn; - - /** - * Key of this entry in the #payment_trigger_map. Used internally by - * #TMH_long_poll_suspend() and TMH_long_poll_resume(). - */ - struct GNUNET_HashCode key; - - /** - * Optional session/fulfillment URI-based key - * of this entry in the #payment_trigger_map. Used internally by - * TMH_long_poll_resume2(). - */ - struct GNUNET_HashCode key2; - /** * At what time does this request expire? If set in the future, we * may wait this long for a payment to arrive before responding. @@ -482,10 +571,6 @@ struct TMH_SuspendedConnection */ bool awaiting_refund_obtained; - /** - * True if @a key2 is set. - */ - bool has_key2; }; @@ -524,54 +609,6 @@ extern struct GNUNET_TIME_Relative TMH_legal_expiration; extern char *TMH_default_auth; -/** - * Suspend connection from @a sc until payment has been received. - * - * @param order_id the order that we are waiting on - * @param session_id session ID of the requester - * @param fulfillment_url fulfillment URL of the contract - * @param mi the merchant instance we are waiting on - * @param sc connection to suspend - * @param min_refund refund amount we are waiting on to be exceeded before resuming, - * NULL if we are not waiting for refunds - */ -void -TMH_long_poll_suspend (const char *order_id, - const char *session_id, - const char *fulfillment_url, - const struct TMH_MerchantInstance *mi, - struct TMH_SuspendedConnection *sc, - const struct TALER_Amount *min_refund); - - -/** - * Find out if we have any clients long-polling for @a order_id to be - * confirmed at merchant @a mpub, and if so, tell them to resume. - * - * @param order_id the order that was paid or refunded - * @param mi the merchant instance where the payment or refund happened - * @param refund_amount refunded amount, if the trigger was a refund, otherwise NULL - * @param obtained if true, the wallet has obtained the refunds for the order - */ -void -TMH_long_poll_resume (const char *order_id, - const struct TMH_MerchantInstance *mi, - const struct TALER_Amount *refund_amount, - bool obtained); - - -/** - * Find out if we have any clients long-polling for @a order_id to be - * confirmed at merchant @a mpub, and if so, tell them to resume. - * - * @param session_id the session for which @a fulfillment_url became paid - * @param fulfillment_url fullfillment URL of which an order was paid - */ -void -TMH_long_poll_resume2 (const char *session_id, - const char *fulfillment_url); - - /** * Callback that frees an instances removing * it from the global hashmap. diff --git a/src/backend/taler-merchant-httpd_get-orders-ID.c b/src/backend/taler-merchant-httpd_get-orders-ID.c index 72b96eb8..0025a6ce 100644 --- a/src/backend/taler-merchant-httpd_get-orders-ID.c +++ b/src/backend/taler-merchant-httpd_get-orders-ID.c @@ -78,7 +78,12 @@ struct GetOrderData /** * Database event we are waiting on to be resuming. */ - struct GNUNET_DB_EventHandler *eh; + struct GNUNET_DB_EventHandler *pay_eh; + + /** + * Database event we are waiting on to be resuming. + */ + struct GNUNET_DB_EventHandler *refund_eh; /** * Which merchant instance is this for? @@ -200,17 +205,75 @@ resume_by_event (void *cls, size_t extra_size) { struct GetOrderData *god = cls; + struct GNUNET_AsyncScopeSave old; - (void) extra; - (void) extra_size; + GNUNET_async_scope_enter (&god->hc->async_scope_id, + &old); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Received event for %s with argument `%.*s`\n", + god->order_id, + (int) extra_size, + (const char *) extra); if (! god->suspended) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Not suspended, ignoring event\n"); + GNUNET_async_scope_restore (&old); return; /* duplicate event is possible */ + } + if (GNUNET_TIME_absolute_is_future (god->sc.long_poll_timeout) && + god->sc.awaiting_refund) + { + char *as; + struct TALER_Amount a; + + if (0 == extra_size) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "No amount given, but need refund above threshold\n"); + GNUNET_async_scope_restore (&old); + return; /* not relevant */ + } + as = GNUNET_strndup (extra, + extra_size); + if (GNUNET_OK != + TALER_string_to_amount (as, + &a)) + { + GNUNET_break (0); + GNUNET_async_scope_restore (&old); + return; + } + if (GNUNET_OK != + TALER_amount_cmp_currency (&god->sc.refund_expected, + &a)) + { + GNUNET_break (0); + GNUNET_async_scope_restore (&old); + return; /* bad currency!? */ + } + if (1 == TALER_amount_cmp (&god->sc.refund_expected, + &a)) + { + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Amount too small to trigger resuming\n"); + GNUNET_async_scope_restore (&old); + return; /* refund too small */ + } + } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Resuming (%d/%d) by event with argument `%.*s`\n", + (int) GNUNET_TIME_absolute_is_future (god->sc.long_poll_timeout), + god->sc.awaiting_refund, + (int) extra_size, + (const char *) extra); god->suspended = false; - GNUNET_CONTAINER_DLL_insert (god_head, + GNUNET_CONTAINER_DLL_remove (god_head, god_tail, god); MHD_resume_connection (god->sc.con); TALER_MHD_daemon_trigger (); /* we resumed, kick MHD */ + GNUNET_async_scope_restore (&old); } @@ -222,7 +285,7 @@ resume_by_event (void *cls, static void suspend_god (struct GetOrderData *god) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Suspending GET /orders/%s\n", god->order_id); if (NULL != god->contract_terms) @@ -236,14 +299,7 @@ suspend_god (struct GetOrderData *god) GNUNET_CONTAINER_DLL_insert (god_head, god_tail, god); - TMH_long_poll_suspend (god->order_id, - god->session_id, - god->fulfillment_url, - god->hc->instance, - &god->sc, - god->sc.awaiting_refund - ? &god->sc.refund_expected - : NULL); + MHD_suspend_connection (god->sc.con); } @@ -536,7 +592,7 @@ send_pay_request (struct GetOrderData *god, (NULL == already_paid_order_id) ) { /* long polling: do not queue a response, suspend connection instead */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Suspending request: long polling for payment\n"); suspend_god (god); return MHD_YES; @@ -726,10 +782,15 @@ god_cleanup (void *cls) json_decref (god->contract_terms); god->contract_terms = NULL; } - if (NULL != god->eh) + if (NULL != god->refund_eh) + { + TMH_db->event_listen_cancel (god->refund_eh); + god->refund_eh = NULL; + } + if (NULL != god->pay_eh) { - TMH_db->event_listen_cancel (god->eh); - god->eh = NULL; + TMH_db->event_listen_cancel (god->pay_eh); + god->pay_eh = NULL; } GNUNET_free (god); } @@ -824,6 +885,9 @@ TMH_get_orders_ID (const struct TMH_RequestHandler *rh, ? 0 == strcasecmp (await_refund_obtained_s, "yes") : false; + if (god->sc.awaiting_refund_obtained) + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Awaiting refund obtained\n"); } { @@ -847,6 +911,9 @@ TMH_get_orders_ID (const struct TMH_RequestHandler *rh, "refund"); } god->sc.awaiting_refund = true; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Awaiting minimum refund of %s\n", + min_refund); } } @@ -886,37 +953,50 @@ TMH_get_orders_ID (const struct TMH_RequestHandler *rh, = GNUNET_TIME_relative_to_absolute (timeout); if (! GNUNET_TIME_relative_is_zero (timeout)) { - if (god->sc.awaiting_refund) + if (god->sc.awaiting_refund || + god->sc.awaiting_refund_obtained) { - struct TMH_OrderPayEvent refund_eh = { +#ifndef TALER_API_VERSION +#define TALER_DBEVENT_MERCHANT_REFUND_OBTAINED 1104 +#endif + struct TMH_OrderPayEventP refund_eh = { .header.size = htons (sizeof (refund_eh)), - .header.type = htons (TALER_DBEVENT_MERCHANT_ORDER_REFUND) + .header.type = htons (god->sc.awaiting_refund_obtained + ? TALER_DBEVENT_MERCHANT_REFUND_OBTAINED + : TALER_DBEVENT_MERCHANT_ORDER_REFUND), + .merchant_pub = hc->instance->merchant_pub }; GNUNET_CRYPTO_hash (god->order_id, strlen (god->order_id), &refund_eh.h_order_id); - god->eh = TMH_db->event_listen (TMH_db->cls, - &refund_eh.header, - timeout, - &resume_by_event, - god); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Subscribing to refunds on %s\n", + god->order_id); + god->refund_eh = TMH_db->event_listen (TMH_db->cls, + &refund_eh.header, + timeout, + &resume_by_event, + god); } - else /* ! waiting for refund */ { - struct TMH_OrderPayEvent pay_eh = { + struct TMH_OrderPayEventP pay_eh = { .header.size = htons (sizeof (pay_eh)), - .header.type = htons (TALER_DBEVENT_MERCHANT_ORDER_PAID) + .header.type = htons (TALER_DBEVENT_MERCHANT_ORDER_PAID), + .merchant_pub = hc->instance->merchant_pub }; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Subscribing to payments on %s\n", + god->order_id); GNUNET_CRYPTO_hash (god->order_id, strlen (god->order_id), &pay_eh.h_order_id); - god->eh = TMH_db->event_listen (TMH_db->cls, - &pay_eh.header, - timeout, - &resume_by_event, - god); + god->pay_eh = TMH_db->event_listen (TMH_db->cls, + &pay_eh.header, + timeout, + &resume_by_event, + god); } } /* end of timeout non-zero */ } /* end of HTML generation NOT requested */ @@ -1156,7 +1236,7 @@ TMH_get_orders_ID (const struct TMH_RequestHandler *rh, public_reorder_url)); } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Claim token or contract matched\n"); if ( (NULL != god->session_id) && @@ -1276,8 +1356,8 @@ TMH_get_orders_ID (const struct TMH_RequestHandler *rh, ( (! god->refunded) || (1 != TALER_amount_cmp (&god->refund_amount, &god->sc.refund_expected)) )) || - ((god->sc.awaiting_refund_obtained) && - (god->refund_available)) ) + ( (god->sc.awaiting_refund_obtained) && + (god->refund_available) ) ) { /* Client is waiting for a refund larger than what we have, suspend until timeout */ @@ -1292,7 +1372,7 @@ TMH_get_orders_ID (const struct TMH_RequestHandler *rh, "Awaiting refund exceeding %s\n", TALER_amount2s (&god->sc.refund_expected)); if (god->sc.awaiting_refund_obtained) - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Awaiting pending refunds\n"); suspend_god (god); return MHD_YES; diff --git a/src/backend/taler-merchant-httpd_post-orders-ID-paid.c b/src/backend/taler-merchant-httpd_post-orders-ID-paid.c index d274c1f6..52f233eb 100644 --- a/src/backend/taler-merchant-httpd_post-orders-ID-paid.c +++ b/src/backend/taler-merchant-httpd_post-orders-ID-paid.c @@ -23,6 +23,7 @@ * @author Christian Grothoff */ #include "platform.h" +#include #include #include #include @@ -33,19 +34,19 @@ * Use database to notify other clients about the * session being captured. * + * @param hc http context * @param session_id the captured session * @param fulfillment_url the URL that is now paid for by @a session_id */ static void -trigger_session_notification (const char *session_id, +trigger_session_notification (struct TMH_HandlerContext *hc, + const char *session_id, const char *fulfillment_url) { -#ifndef TALER_API_VERSION -#define TALER_DBEVENT_MERCHANT_SESSION_CAPTURED 1103 -#endif - struct TMH_SessionEvent session_eh = { + struct TMH_SessionEventP session_eh = { .header.size = htons (sizeof (session_eh)), - .header.type = htons (TALER_DBEVENT_MERCHANT_SESSION_CAPTURED) + .header.type = htons (TALER_DBEVENT_MERCHANT_SESSION_CAPTURED), + .merchant_pub = hc->instance->merchant_pub }; GNUNET_CRYPTO_hash (session_id, @@ -201,21 +202,11 @@ TMH_post_orders_ID_paid (const struct TMH_RequestHandler *rh, /* Wake everybody up who waits for this fulfillment_url and session_id */ if ( (NULL != fulfillment_url) && (NULL != session_id) ) - trigger_session_notification (session_id, + trigger_session_notification (hc, + session_id, fulfillment_url); - if (NULL != fulfillment_url) - TMH_long_poll_resume2 (session_id, - fulfillment_url); - /* fulfillment_url is part of the contract_terms */ json_decref (contract_terms); - /* Resume clients waiting on the order - (NOTE: should never be needed, as /pay - would have triggered those, right?) */ - TMH_long_poll_resume (order_id, - hc->instance, - NULL, - false); return TALER_MHD_reply_static (connection, MHD_HTTP_NO_CONTENT, NULL, diff --git a/src/backend/taler-merchant-httpd_post-orders-ID-pay.c b/src/backend/taler-merchant-httpd_post-orders-ID-pay.c index ea8dd200..7ec76fc5 100644 --- a/src/backend/taler-merchant-httpd_post-orders-ID-pay.c +++ b/src/backend/taler-merchant-httpd_post-orders-ID-pay.c @@ -1300,11 +1300,15 @@ static void trigger_payment_notification (struct PayContext *pc) { { - struct TMH_OrderPayEvent pay_eh = { + struct TMH_OrderPayEventP pay_eh = { .header.size = htons (sizeof (pay_eh)), - .header.type = htons (TALER_DBEVENT_MERCHANT_ORDER_PAID) + .header.type = htons (TALER_DBEVENT_MERCHANT_ORDER_PAID), + .merchant_pub = pc->hc->instance->merchant_pub }; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Notifying clients about payment of order %s\n", + pc->order_id); GNUNET_CRYPTO_hash (pc->order_id, strlen (pc->order_id), &pay_eh.h_order_id); @@ -1316,14 +1320,16 @@ trigger_payment_notification (struct PayContext *pc) if ( (NULL != pc->session_id) && (NULL != pc->fulfillment_url) ) { -#ifndef TALER_API_VERSION -#define TALER_DBEVENT_MERCHANT_SESSION_CAPTURED 1103 -#endif - struct TMH_SessionEvent session_eh = { + struct TMH_SessionEventP session_eh = { .header.size = htons (sizeof (session_eh)), - .header.type = htons (TALER_DBEVENT_MERCHANT_SESSION_CAPTURED) + .header.type = htons (TALER_DBEVENT_MERCHANT_SESSION_CAPTURED), + .merchant_pub = pc->hc->instance->merchant_pub }; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Notifying clients about session change to %s for %s\n", + pc->session_id, + pc->fulfillment_url); GNUNET_CRYPTO_hash (pc->session_id, strlen (pc->session_id), &session_eh.h_session_id); @@ -1347,7 +1353,6 @@ execute_pay_transaction (struct PayContext *pc) { struct TMH_HandlerContext *hc = pc->hc; const char *instance_id = hc->instance->settings.id; - bool refunded; /* Avoid re-trying transactions on soft errors forever! */ if (pc->retry_counter++ > MAX_RETRIES) @@ -1421,6 +1426,7 @@ execute_pay_transaction (struct PayContext *pc) { enum GNUNET_DB_QueryStatus qs; + /* Check if we refunded some of the coins */ qs = TMH_db->lookup_refunds (TMH_db->cls, instance_id, @@ -1443,10 +1449,9 @@ execute_pay_transaction (struct PayContext *pc) "lookup refunds"); return; } - refunded = (qs > 0); } - /* Check if there are coins that still need to be processed */ + /* Check if there are coins that still need to be processed */ if (0 != pc->pending) { /* we made no DB changes, so we can just rollback */ @@ -1494,7 +1499,6 @@ execute_pay_transaction (struct PayContext *pc) "mark contract paid"); return; } - trigger_payment_notification (pc); } { @@ -1518,25 +1522,9 @@ execute_pay_transaction (struct PayContext *pc) NULL); return; } + trigger_payment_notification (pc); } - /* Notify clients that have been waiting for the payment to succeed */ - if ( (NULL != pc->session_id) && - (NULL != pc->fulfillment_url) ) - TMH_long_poll_resume2 (pc->session_id, - pc->fulfillment_url); - TMH_long_poll_resume (pc->order_id, - hc->instance, - NULL, - false); - TMH_notify_order_change (hc->instance, - pc->order_id, - true, /* paid */ - refunded, - false, /* wired */ - pc->timestamp, - pc->order_serial); - /* Generate response (payment successful) */ { struct GNUNET_CRYPTO_EddsaSignature sig; diff --git a/src/backend/taler-merchant-httpd_post-orders-ID-refund.c b/src/backend/taler-merchant-httpd_post-orders-ID-refund.c index 6eeb4182..ff64d9a2 100644 --- a/src/backend/taler-merchant-httpd_post-orders-ID-refund.c +++ b/src/backend/taler-merchant-httpd_post-orders-ID-refund.c @@ -23,9 +23,11 @@ * @author Jonathan Buchanan */ #include "platform.h" +#include #include #include #include +#include "taler-merchant-httpd.h" #include "taler-merchant-httpd_auditors.h" #include "taler-merchant-httpd_exchanges.h" #include "taler-merchant-httpd_post-orders-ID-refund.h" @@ -165,11 +167,6 @@ struct PostRefundData */ struct TMH_SuspendedConnection sc; - /** - * Which merchant instance is this for? - */ - struct MerchantInstance *mi; - /** * order ID for the payment */ @@ -288,9 +285,7 @@ refund_cleanup (void *ctx) } GNUNET_free (cr); } - json_decref (prd->contract_terms); - GNUNET_free (prd); } @@ -357,6 +352,31 @@ check_resume_prd (struct PostRefundData *prd) } +/** + * Notify applications waiting for a client to obtain + * a refund. + * + * @param prd refund request with the change + */ +static void +notify_refund_obtained (struct PostRefundData *prd) +{ + struct TMH_OrderPayEventP refund_eh = { + .header.size = htons (sizeof (refund_eh)), + .header.type = htons (TALER_DBEVENT_MERCHANT_REFUND_OBTAINED), + .merchant_pub = prd->hc->instance->merchant_pub + }; + + GNUNET_CRYPTO_hash (prd->order_id, + strlen (prd->order_id), + &refund_eh.h_order_id); + TMH_db->event_notify (TMH_db->cls, + &refund_eh.header, + NULL, + 0); +} + + /** * Callbacks of this type are used to serve the result of submitting a * refund request to an exchange. @@ -403,6 +423,10 @@ refund_cb (void *cls, "Failed to persist exchange response to /refund in database: %d\n", qs); } + else + { + notify_refund_obtained (cr->prd); + } } check_resume_prd (cr->prd); } @@ -691,11 +715,6 @@ TMH_post_orders_ID_refund (const struct TMH_RequestHandler *rh, if (NULL == cr->exchange_reply) { /* We need to talk to the exchange */ - /* Notify clients waiting for the refund to be obtained. */ - TMH_long_poll_resume (hc->infix, - hc->instance, - &prd->refund_amount, - true); cr->fo = TMH_EXCHANGES_find_exchange (cr->exchange_url, NULL, GNUNET_NO, diff --git a/src/backend/taler-merchant-httpd_private-delete-instances-ID.c b/src/backend/taler-merchant-httpd_private-delete-instances-ID.c index afcea0c9..9926d0b3 100644 --- a/src/backend/taler-merchant-httpd_private-delete-instances-ID.c +++ b/src/backend/taler-merchant-httpd_private-delete-instances-ID.c @@ -45,7 +45,7 @@ delete_instances_ID (struct TMH_MerchantInstance *mi, if (NULL == purge_s) purge_s = "no"; purge = (0 == strcasecmp (purge_s, - "yes")); + "yes")); if (purge) qs = TMH_db->purge_instance (TMH_db->cls, mi->settings.id); @@ -74,12 +74,6 @@ delete_instances_ID (struct TMH_MerchantInstance *mi, : "Private key unknown"); case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: TMH_reload_instances (mi->settings.id); - if (purge) - TMH_instance_free_cb (NULL, - &mi->h_instance, - mi); - else - mi->deleted = true; return TALER_MHD_reply_static (connection, MHD_HTTP_NO_CONTENT, NULL, diff --git a/src/backend/taler-merchant-httpd_private-get-orders-ID.c b/src/backend/taler-merchant-httpd_private-get-orders-ID.c index 3688bb87..b272e4c0 100644 --- a/src/backend/taler-merchant-httpd_private-get-orders-ID.c +++ b/src/backend/taler-merchant-httpd_private-get-orders-ID.c @@ -167,7 +167,7 @@ struct GetOrderRequestContext * Database event we are waiting on to be resuming * for session capture. */ - struct GNUNET_DB_EventHandler *seh; + struct GNUNET_DB_EventHandler *session_eh; /** * Contract terms of the payment we are checking. NULL when they @@ -305,7 +305,6 @@ gorc_resume (struct GetOrderRequestContext *gorc, { struct TransferQuery *tq; - GNUNET_assert (gorc->suspended); if (NULL != gorc->tt) { GNUNET_SCHEDULER_cancel (gorc->tt); @@ -326,6 +325,7 @@ gorc_resume (struct GetOrderRequestContext *gorc, } gorc->wire_hc = http_status; gorc->wire_ec = ec; + GNUNET_assert (gorc->suspended); GNUNET_CONTAINER_DLL_remove (gorc_head, gorc_tail, gorc); @@ -355,7 +355,7 @@ resume_by_event (void *cls, if (! gorc->suspended) return; /* duplicate event is possible */ gorc->suspended = false; - GNUNET_CONTAINER_DLL_insert (gorc_head, + GNUNET_CONTAINER_DLL_remove (gorc_head, gorc_tail, gorc); MHD_resume_connection (gorc->sc.con); @@ -663,10 +663,10 @@ gorc_cleanup (void *cls) TMH_db->event_listen_cancel (gorc->eh); gorc->eh = NULL; } - if (NULL != gorc->seh) + if (NULL != gorc->session_eh) { - TMH_db->event_listen_cancel (gorc->seh); - gorc->seh = NULL; + TMH_db->event_listen_cancel (gorc->session_eh); + gorc->session_eh = NULL; } GNUNET_free (gorc); } @@ -869,9 +869,10 @@ TMH_private_get_orders_ID (const struct TMH_RequestHandler *rh, = GNUNET_TIME_relative_to_absolute (timeout); if (! GNUNET_TIME_relative_is_zero (timeout)) { - struct TMH_OrderPayEvent pay_eh = { + struct TMH_OrderPayEventP pay_eh = { .header.size = htons (sizeof (pay_eh)), - .header.type = htons (TALER_DBEVENT_MERCHANT_ORDER_PAID) + .header.type = htons (TALER_DBEVENT_MERCHANT_ORDER_PAID), + .merchant_pub = hc->instance->merchant_pub }; GNUNET_CRYPTO_hash (hc->infix, @@ -885,12 +886,10 @@ TMH_private_get_orders_ID (const struct TMH_RequestHandler *rh, if ( (NULL != gorc->session_id) && (NULL != gorc->fulfillment_url) ) { -#ifndef TALER_API_VERSION -#define TALER_DBEVENT_MERCHANT_SESSION_CAPTURED 1103 -#endif - struct TMH_SessionEvent session_eh = { + struct TMH_SessionEventP session_eh = { .header.size = htons (sizeof (session_eh)), - .header.type = htons (TALER_DBEVENT_MERCHANT_SESSION_CAPTURED) + .header.type = htons (TALER_DBEVENT_MERCHANT_SESSION_CAPTURED), + .merchant_pub = hc->instance->merchant_pub }; GNUNET_CRYPTO_hash (gorc->session_id, @@ -899,11 +898,11 @@ TMH_private_get_orders_ID (const struct TMH_RequestHandler *rh, GNUNET_CRYPTO_hash (gorc->fulfillment_url, strlen (gorc->fulfillment_url), &session_eh.h_fulfillment_url); - gorc->seh = TMH_db->event_listen (TMH_db->cls, - &session_eh.header, - timeout, - &resume_by_event, - gorc); + gorc->session_eh = TMH_db->event_listen (TMH_db->cls, + &session_eh.header, + timeout, + &resume_by_event, + gorc); } } } @@ -916,7 +915,7 @@ TMH_private_get_orders_ID (const struct TMH_RequestHandler *rh, } /* end first-time per-request initialization */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting GET /private/orders/%s processing with timeout %s\n", hc->infix, GNUNET_STRINGS_absolute_time_to_string ( @@ -1170,16 +1169,14 @@ TMH_private_get_orders_ID (const struct TMH_RequestHandler *rh, { if (GNUNET_TIME_absolute_is_future (gorc->sc.long_poll_timeout)) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Suspending GET /private/orders/%s\n", hc->infix); + GNUNET_CONTAINER_DLL_insert (gorc_head, + gorc_tail, + gorc); gorc->suspended = true; - TMH_long_poll_suspend (hc->infix, - gorc->session_id, - gorc->fulfillment_url, - hc->instance, - &gorc->sc, - NULL); + MHD_suspend_connection (gorc->sc.con); return MHD_YES; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -1214,11 +1211,11 @@ TMH_private_get_orders_ID (const struct TMH_RequestHandler *rh, GNUNET_CONTAINER_DLL_insert (gorc_head, gorc_tail, gorc); + gorc->suspended = true; + MHD_suspend_connection (connection); gorc->tt = GNUNET_SCHEDULER_add_delayed (EXCHANGE_TIMEOUT, &exchange_timeout_cb, gorc); - gorc->suspended = true; - MHD_suspend_connection (connection); return MHD_YES; } } @@ -1226,16 +1223,15 @@ TMH_private_get_orders_ID (const struct TMH_RequestHandler *rh, if ( (! paid) && (GNUNET_TIME_absolute_is_future (gorc->sc.long_poll_timeout)) ) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Suspending GET /private/orders/%s\n", hc->infix); + GNUNET_assert (! gorc->suspended); + GNUNET_CONTAINER_DLL_insert (gorc_head, + gorc_tail, + gorc); gorc->suspended = true; - TMH_long_poll_suspend (hc->infix, - gorc->session_id, - gorc->fulfillment_url, - hc->instance, - &gorc->sc, - NULL); + MHD_suspend_connection (gorc->sc.con); return MHD_YES; } @@ -1364,10 +1360,8 @@ TMH_private_get_orders_ID (const struct TMH_RequestHandler *rh, gorc->order_serial); GNUNET_break (qs >= 0); /* just warn if transaction failed */ TMH_notify_order_change (hc->instance, - hc->infix, - true, /* paid */ - false, /* technically unknown, but OK here */ - true, /* wired */ + TMH_OSF_PAID + | TMH_OSF_WIRED, timestamp, gorc->order_serial); } diff --git a/src/backend/taler-merchant-httpd_private-get-orders.c b/src/backend/taler-merchant-httpd_private-get-orders.c index 0f9e9f13..e2796610 100644 --- a/src/backend/taler-merchant-httpd_private-get-orders.c +++ b/src/backend/taler-merchant-httpd_private-get-orders.c @@ -21,6 +21,7 @@ #include "platform.h" #include "taler-merchant-httpd_private-get-orders.h" #include +#include /** @@ -127,6 +128,11 @@ TMH_force_get_orders_resume (struct TMH_MerchantInstance *mi) MHD_resume_connection (po->con); po->in_dll = false; } + if (NULL != mi->po_eh) + { + TMH_db->event_listen_cancel (mi->po_eh); + mi->po_eh = NULL; + } if (NULL != order_timeout_task) { GNUNET_SCHEDULER_cancel (order_timeout_task); @@ -175,6 +181,12 @@ order_timeout (void *cls) GNUNET_CONTAINER_DLL_remove (mi->po_head, mi->po_tail, po); + if ( (NULL == mi->po_head) && + (NULL != mi->po_eh) ) + { + TMH_db->event_listen_cancel (mi->po_eh); + mi->po_eh = NULL; + } po->in_dll = false; MHD_resume_connection (po->con); TALER_MHD_daemon_trigger (); /* we resumed, kick MHD */ @@ -249,14 +261,14 @@ process_refunds_cb (void *cls, /** * Add order details to our JSON array. * - * @param[in,out] cls a `json_t *` JSON array to build - * @param order_id ID of the order + * @param cls some closure + * @param orig_order_id the order this is about * @param order_serial serial ID of the order * @param creation_time when was the order created */ static void add_order (void *cls, - const char *order_id, + const char *orig_order_id, uint64_t order_serial, struct GNUNET_TIME_Absolute creation_time) { @@ -265,24 +277,37 @@ add_order (void *cls, struct GNUNET_HashCode h_contract_terms; enum GNUNET_DB_QueryStatus qs; const char *summary; + char *order_id = NULL; bool refundable = false; bool paid; struct TALER_Amount order_amount; - qs = TMH_db->lookup_order_status (TMH_db->cls, - po->instance_id, - order_id, - &h_contract_terms, - &paid); - /* qs == 0: contract terms don't exist, so the order cannot be paid. */ - if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) - paid = false; + qs = TMH_db->lookup_order_status_by_serial (TMH_db->cls, + po->instance_id, + order_serial, + &order_id, + &h_contract_terms, + &paid); if (qs < 0) { GNUNET_break (0); po->result = TALER_EC_GENERIC_DB_FETCH_FAILED; return; } + /* qs == 0: contract terms don't exist, so the order cannot be paid. */ + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) + { + paid = false; + if (NULL == orig_order_id) + { + /* cannot be via DB trigger, and the other code + path should have passed an orig_order_id */ + GNUNET_break (0); + po->result = TALER_EC_GENERIC_DB_FETCH_FAILED; + return; + } + order_id = GNUNET_strdup (orig_order_id); + } if (paid) { @@ -314,6 +339,7 @@ add_order (void *cls, GNUNET_break (0); po->result = TALER_EC_GENERIC_DB_FETCH_FAILED; json_decref (contract_terms); + GNUNET_free (order_id); return; } @@ -338,6 +364,7 @@ add_order (void *cls, GNUNET_break (0); po->result = TALER_EC_MERCHANT_GENERIC_DB_CONTRACT_CONTENT_INVALID; json_decref (contract_terms); + GNUNET_free (order_id); return; } @@ -359,6 +386,7 @@ add_order (void *cls, GNUNET_break (0); po->result = TALER_EC_GENERIC_DB_FETCH_FAILED; json_decref (contract_terms); + GNUNET_free (order_id); return; } if (0 > TALER_amount_cmp (&refund_amount, @@ -386,42 +414,51 @@ add_order (void *cls, GNUNET_JSON_pack_bool ("paid", paid)))); json_decref (contract_terms); + GNUNET_free (order_id); } /** - * There has been a change or addition of a new @a order_id. Wake up - * long-polling clients that may have been waiting for this event. + * We have received a trigger from the database + * that we should (possibly) resume some requests. * - * @param mi the instance where the order changed - * @param order_id the order that changed - * @param paid is the order paid by the customer? - * @param refunded was the order refunded? - * @param wired was the merchant paid via wire transfer? - * @param date execution date of the order - * @param order_serial_id serial ID of the order in the database + * @param cls a `struct TMH_MerchantInstance` + * @param extra a `struct TMH_OrderChangeEventP` + * @param extra_size number of bytes in @a extra */ -void -TMH_notify_order_change (struct TMH_MerchantInstance *mi, - const char *order_id, - bool paid, - bool refunded, - bool wired, - struct GNUNET_TIME_Absolute date, - uint64_t order_serial_id) +static void +resume_by_event (void *cls, + const void *extra, + size_t extra_size) { + struct TMH_MerchantInstance *mi = cls; + const struct TMH_OrderChangeEventDetailsP *oce = extra; struct TMH_PendingOrder *pn; + enum TMH_OrderStateFlags osf; + uint64_t order_serial_id; + struct GNUNET_TIME_Absolute date; + if (sizeof (*oce) != extra_size) + { + GNUNET_break (0); + return; + } + osf = (enum TMH_OrderStateFlags) ntohl (oce->order_state); + order_serial_id = GNUNET_ntohll (oce->order_serial_id); + date = GNUNET_TIME_absolute_ntoh (oce->execution_date); for (struct TMH_PendingOrder *po = mi->po_head; NULL != po; po = pn) { pn = po->next; - if (! ( ( ((TALER_EXCHANGE_YNA_YES == po->of.paid) == paid) || + if (! ( ( ((TALER_EXCHANGE_YNA_YES == po->of.paid) == + (0 != (osf & TMH_OSF_PAID))) || (TALER_EXCHANGE_YNA_ALL == po->of.paid) ) && - ( ((TALER_EXCHANGE_YNA_YES == po->of.refunded) == refunded) || + ( ((TALER_EXCHANGE_YNA_YES == po->of.refunded) == + (0 != (osf & TMH_OSF_REFUNDED))) || (TALER_EXCHANGE_YNA_ALL == po->of.refunded) ) && - ( ((TALER_EXCHANGE_YNA_YES == po->of.wired) == wired) || + ( ((TALER_EXCHANGE_YNA_YES == po->of.wired) == + (0 != (osf & TMH_OSF_WIRED))) || (TALER_EXCHANGE_YNA_ALL == po->of.wired) ) ) ) continue; if (po->of.delta > 0) @@ -441,7 +478,7 @@ TMH_notify_order_change (struct TMH_MerchantInstance *mi, po->of.delta++; } add_order (po, - order_id, + NULL, order_serial_id, date); GNUNET_assert (po->in_dll); @@ -454,6 +491,44 @@ TMH_notify_order_change (struct TMH_MerchantInstance *mi, MHD_resume_connection (po->con); TALER_MHD_daemon_trigger (); /* we resumed, kick MHD */ } + if (NULL == mi->po_head) + { + TMH_db->event_listen_cancel (mi->po_eh); + mi->po_eh = NULL; + } +} + + +/** + * There has been a change or addition of a new @a order_id. Wake up + * long-polling clients that may have been waiting for this event. + * + * @param mi the instance where the order changed + * @param osf order state flags + * @param date execution date of the order + * @param order_serial_id serial ID of the order in the database + */ +void +TMH_notify_order_change (struct TMH_MerchantInstance *mi, + enum TMH_OrderStateFlags osf, + struct GNUNET_TIME_Absolute date, + uint64_t order_serial_id) +{ + struct TMH_OrderChangeEventDetailsP oce = { + .order_serial_id = GNUNET_htonll (order_serial_id), + .execution_date = GNUNET_TIME_absolute_hton (date), + .order_state = htonl (osf) + }; + struct TMH_OrderChangeEventP eh = { + .header.type = htons (TALER_DBEVENT_MERCHANT_ORDERS_CHANGE), + .header.size = htons (sizeof (eh)), + .merchant_pub = mi->merchant_pub + }; + + TMH_db->event_notify (TMH_db->cls, + &eh.header, + &oce, + sizeof (oce)); } @@ -688,7 +763,6 @@ TMH_private_get_orders (const struct TMH_RequestHandler *rh, GNUNET_assert (NULL != po->pa); po->instance_id = hc->instance->settings.id; po->mi = hc->instance; - qs = TMH_db->lookup_orders (TMH_db->cls, po->instance_id, &of, @@ -724,6 +798,19 @@ TMH_private_get_orders (const struct TMH_RequestHandler *rh, mi->po_tail, po); po->in_dll = true; + if (NULL == mi->po_eh) + { + struct GNUNET_DB_EventHeaderP change_eh = { + .type = htons (TALER_DBEVENT_MERCHANT_ORDERS_CHANGE), + .size = htons (sizeof (change_eh)) + }; + + mi->po_eh = TMH_db->event_listen (TMH_db->cls, + &change_eh, + GNUNET_TIME_UNIT_FOREVER_REL, + &resume_by_event, + mi); + } MHD_suspend_connection (connection); { struct TMH_PendingOrder *pot; diff --git a/src/backend/taler-merchant-httpd_private-get-orders.h b/src/backend/taler-merchant-httpd_private-get-orders.h index cdd88d54..53a20b17 100644 --- a/src/backend/taler-merchant-httpd_private-get-orders.h +++ b/src/backend/taler-merchant-httpd_private-get-orders.h @@ -43,19 +43,13 @@ TMH_private_get_orders (const struct TMH_RequestHandler *rh, * long-polling clients that may have been waiting for this event. * * @param mi the instance where the order changed - * @param order_id the order that changed - * @param paid is the order paid by the customer? - * @param refunded was the order refunded? - * @param wired was the merchant paid via wire transfer? + * @param osf order state flags * @param date execution date of the order * @param order_serial_id serial ID of the order in the database */ void TMH_notify_order_change (struct TMH_MerchantInstance *mi, - const char *order_id, - bool paid, - bool refunded, - bool wired, + enum TMH_OrderStateFlags osf, struct GNUNET_TIME_Absolute date, uint64_t order_serial_id); diff --git a/src/backend/taler-merchant-httpd_private-post-orders-ID-refund.c b/src/backend/taler-merchant-httpd_private-post-orders-ID-refund.c index 46f30668..ebbfdf0a 100644 --- a/src/backend/taler-merchant-httpd_private-post-orders-ID-refund.c +++ b/src/backend/taler-merchant-httpd_private-post-orders-ID-refund.c @@ -39,22 +39,28 @@ * Use database to notify other clients about the * @a order_id being refunded * - * @param order_id the order receiving a refund + * @param hc handler context we operate in * @param amount the (total) refunded amount */ static void -trigger_refund_notification (const char *order_id, +trigger_refund_notification (struct TMH_HandlerContext *hc, const struct TALER_Amount *amount) { const char *as; - struct TMH_OrderRefundEvent refund_eh = { + struct TMH_OrderRefundEventP refund_eh = { .header.size = htons (sizeof (refund_eh)), - .header.type = htons (TALER_DBEVENT_MERCHANT_ORDER_REFUND) + .header.type = htons (TALER_DBEVENT_MERCHANT_ORDER_REFUND), + .merchant_pub = hc->instance->merchant_pub }; + /* Resume clients that may wait for this refund */ as = TALER_amount2s (amount); - GNUNET_CRYPTO_hash (order_id, - strlen (order_id), + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Awakening clients on %s waiting for refund of no more than %s\n", + hc->infix, + as); + GNUNET_CRYPTO_hash (hc->infix, + strlen (hc->infix), &refund_eh.h_order_id); TMH_db->event_notify (TMH_db->cls, &refund_eh.header, @@ -83,7 +89,6 @@ make_taler_refund_uri (struct MHD_Connection *connection, GNUNET_assert (NULL != instance_id); GNUNET_assert (NULL != order_id); - host = MHD_lookup_connection_value (connection, MHD_HEADER_KIND, "Host"); @@ -251,8 +256,6 @@ TMH_private_post_orders_ID_refund (const struct TMH_RequestHandler *rh, { enum GNUNET_DB_QueryStatus qs; - trigger_refund_notification (hc->infix, - &refund); qs = TMH_db->commit (TMH_db->cls); if (GNUNET_DB_STATUS_HARD_ERROR == qs) { @@ -262,6 +265,8 @@ TMH_private_post_orders_ID_refund (const struct TMH_RequestHandler *rh, } if (GNUNET_DB_STATUS_SOFT_ERROR == qs) continue; + trigger_refund_notification (hc, + &refund); } break; } /* retries loop */ @@ -329,15 +334,6 @@ TMH_private_post_orders_ID_refund (const struct TMH_RequestHandler *rh, break; } - /* Resume clients that may wait for this refund */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Awakeing clients on %s waiting for refund of less than %s\n", - hc->infix, - TALER_amount2s (&refund)); - TMH_long_poll_resume (hc->infix, - hc->instance, - &refund, - false); { struct GNUNET_TIME_Absolute timestamp; uint64_t order_serial; @@ -357,10 +353,8 @@ TMH_private_post_orders_ID_refund (const struct TMH_RequestHandler *rh, NULL); } TMH_notify_order_change (hc->instance, - hc->infix, - true, /* paid */ - true, /* refunded */ - false, /* wired, cannot be if we could still do refunds */ + TMH_OSF_PAID + | TMH_OSF_REFUNDED, timestamp, order_serial); } diff --git a/src/backenddb/plugin_merchantdb_postgres.c b/src/backenddb/plugin_merchantdb_postgres.c index 3036a439..d56c4b8b 100644 --- a/src/backenddb/plugin_merchantdb_postgres.c +++ b/src/backenddb/plugin_merchantdb_postgres.c @@ -2500,6 +2500,56 @@ postgres_lookup_order_status (void *cls, } +/** + * Retrieve contract terms given its @a order_serial + * + * @param cls closure + * @param instance_id instance's identifier + * @param order_serial serial ID of the order to look up + * @param[out] order_id set to ID of the order + * @param[out] h_contract_terms set to the hash of the contract. + * @param[out] paid set to the payment status of the contract + * @return transaction status + */ +static enum GNUNET_DB_QueryStatus +postgres_lookup_order_status_by_serial (void *cls, + const char *instance_id, + uint64_t order_serial, + char **order_id, + struct GNUNET_HashCode *h_contract_terms, + bool *paid) +{ + struct PostgresClosure *pg = cls; + uint8_t paid8; + enum GNUNET_DB_QueryStatus qs; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_string (instance_id), + GNUNET_PQ_query_param_uint64 (&order_serial), + GNUNET_PQ_query_param_end + }; + struct GNUNET_PQ_ResultSpec rs[] = { + GNUNET_PQ_result_spec_auto_from_type ("h_contract_terms", + h_contract_terms), + GNUNET_PQ_result_spec_auto_from_type ("paid", + &paid8), + GNUNET_PQ_result_spec_string ("order_id", + order_id), + GNUNET_PQ_result_spec_end + }; + + check_connection (pg); + qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, + "lookup_order_status_by_serial", + params, + rs); + if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) + *paid = (0 != paid8); + else + *paid = false; /* just to be safe(r) */ + return qs; +} + + /** * Retrieve payment and wire status for a given @a order_serial and session ID. * @@ -7898,12 +7948,27 @@ postgres_connect (void *cls) " h_contract_terms" ",paid" " FROM merchant_contract_terms" - " WHERE merchant_contract_terms.merchant_serial=" + " WHERE merchant_serial=" " (SELECT merchant_serial " " FROM merchant_instances" " WHERE merchant_id=$1)" " AND order_id=$2", 2), + + /* for postgres_lookup_order_status_by_serial() */ + GNUNET_PQ_make_prepare ("lookup_order_status_by_serial", + "SELECT" + " h_contract_terms" + ",order_id" + ",paid" + " FROM merchant_contract_terms" + " WHERE merchant_serial=" + " (SELECT merchant_serial " + " FROM merchant_instances" + " WHERE merchant_id=$1)" + " AND order_serial=$2", + 2), + /* for postgres_lookup_payment_status() */ GNUNET_PQ_make_prepare ("lookup_payment_status", "SELECT" @@ -9046,6 +9111,8 @@ libtaler_plugin_merchantdb_postgres_init (void *cls) plugin->mark_contract_paid = &postgres_mark_contract_paid; plugin->refund_coin = &postgres_refund_coin; plugin->lookup_order_status = &postgres_lookup_order_status; + plugin->lookup_order_status_by_serial = + &postgres_lookup_order_status_by_serial; plugin->lookup_payment_status = &postgres_lookup_payment_status; plugin->lookup_deposits_by_order = &postgres_lookup_deposits_by_order; plugin->lookup_transfer_details_by_order = diff --git a/src/include/taler_merchantdb_plugin.h b/src/include/taler_merchantdb_plugin.h index 36f28094..97079680 100644 --- a/src/include/taler_merchantdb_plugin.h +++ b/src/include/taler_merchantdb_plugin.h @@ -1445,6 +1445,25 @@ struct TALER_MERCHANTDB_Plugin struct GNUNET_HashCode *h_contract_terms, bool *paid); + /** + * Retrieve contract terms given its @a order_serial + * + * @param cls closure + * @param instance_id instance's identifier + * @param order_serial serial ID of the order to look up + * @param[out] order_id set to ID of the order + * @param[out] h_contract_terms set to the hash of the contract. + * @param[out] paid set to the payment status of the contract + * @return transaction status + */ + enum GNUNET_DB_QueryStatus + (*lookup_order_status_by_serial)(void *cls, + const char *instance_id, + uint64_t order_serial, + char **order_id, + struct GNUNET_HashCode *h_contract_terms, + bool *paid); + /** * Retrieve payment and wire status for a given @a order_serial and diff --git a/src/testing/testing_api_cmd_merchant_get_order.c b/src/testing/testing_api_cmd_merchant_get_order.c index 26dd0399..34c2b0b0 100644 --- a/src/testing/testing_api_cmd_merchant_get_order.c +++ b/src/testing/testing_api_cmd_merchant_get_order.c @@ -143,6 +143,9 @@ merchant_get_order_cb ( struct MerchantGetOrderState *gos = cls; gos->ogh = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "GET /private/orders/$ID completed with status %u\n", + hr->http_status); if (gos->http_status != hr->http_status) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, @@ -870,7 +873,7 @@ conclude_task (void *cls) /** - * Callback to process a GET /orders/$ID request + * Callback to process a GET /private/orders/$ID request * * @param cls closure * @param hr HTTP response details @@ -886,6 +889,9 @@ merchant_poll_order_cb ( struct MerchantPollOrderStartState *pos = cls; pos->ogh = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "GET /private/orders/$ID finished with status %u.\n", + hr->http_status); pos->http_status = hr->http_status; switch (hr->http_status) { @@ -1017,6 +1023,12 @@ merchant_poll_order_conclude_run (void *cls, GNUNET_assert (poll_cmd->run == &merchant_poll_order_start_run); pos = poll_cmd->cls; pos->cs = poc; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Waiting on GET /private/orders/$ID of %s (%s)\n", + poc->start_reference, + (NULL == pos->ogh) + ? "finished" + : "active"); if (NULL == pos->ogh) poc->task = GNUNET_SCHEDULER_add_now (&conclude_task, poc); diff --git a/src/testing/testing_api_cmd_wallet_get_order.c b/src/testing/testing_api_cmd_wallet_get_order.c index d0a50a0c..a58dda80 100644 --- a/src/testing/testing_api_cmd_wallet_get_order.c +++ b/src/testing/testing_api_cmd_wallet_get_order.c @@ -600,6 +600,9 @@ wallet_poll_order_cb ( struct WalletPollOrderStartState *pos = cls; pos->ogh = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "GET /orders/$ID finished with status %u.\n", + hr->http_status); pos->http_status = hr->http_status; switch (hr->http_status) { @@ -791,6 +794,12 @@ wallet_poll_order_conclude_run (void *cls, GNUNET_assert (poll_cmd->run == &wallet_poll_order_start_run); pos = poll_cmd->cls; pos->cs = poc; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Waiting on GET /orders/$ID of %s (%s)\n", + poc->start_reference, + (NULL == pos->ogh) + ? "finished" + : "active"); if (NULL == pos->ogh) poc->task = GNUNET_SCHEDULER_add_now (&conclude_task, poc); -- cgit v1.2.3