/* This file is part of TALER (C) 2019-2021 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation; either version 3, or (at your option) any later version. TALER is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with TALER; see the file COPYING. If not, see */ /** * @file taler-merchant-httpd_private-get-orders.c * @brief implement GET /orders * @author Christian Grothoff */ #include "platform.h" #include "taler-merchant-httpd_private-get-orders.h" #include #include /** * Sensible bound on TALER_MERCHANTDB_OrderFilter.delta */ #define MAX_DELTA 1024 /** * A pending GET /orders request. */ struct TMH_PendingOrder { /** * Kept in a DLL. */ struct TMH_PendingOrder *prev; /** * Kept in a DLL. */ struct TMH_PendingOrder *next; /** * Which connection was suspended. */ struct MHD_Connection *con; /** * Associated heap node. */ struct GNUNET_CONTAINER_HeapNode *hn; /** * Which instance is this client polling? This also defines * which DLL this struct is part of. */ struct TMH_MerchantInstance *mi; /** * At what time does this request expire? If set in the future, we * may wait this long for a payment to arrive before responding. */ struct GNUNET_TIME_Absolute long_poll_timeout; /** * Filter to apply. */ struct TALER_MERCHANTDB_OrderFilter of; /** * The array of orders. */ json_t *pa; /** * The name of the instance we are querying for. */ const char *instance_id; /** * The result after adding the orders (#TALER_EC_NONE for okay, anything else for an error). */ enum TALER_ErrorCode result; /** * Is the structure in the DLL */ bool in_dll; }; /** * Task to timeout pending orders. */ static struct GNUNET_SCHEDULER_Task *order_timeout_task; /** * Heap for orders in long polling awaiting timeout. */ static struct GNUNET_CONTAINER_Heap *order_timeout_heap; /** * We are shutting down (or an instance is being deleted), force resume of all * GET /orders requests. * * @param mi instance to force resuming for */ void TMH_force_get_orders_resume (struct TMH_MerchantInstance *mi) { struct TMH_PendingOrder *po; while (NULL != (po = mi->po_head)) { GNUNET_assert (po->in_dll); GNUNET_CONTAINER_DLL_remove (mi->po_head, mi->po_tail, po); GNUNET_assert (po == GNUNET_CONTAINER_heap_remove_root (order_timeout_heap)); MHD_resume_connection (po->con); po->in_dll = false; } if (NULL != mi->po_eh) { TMH_db->event_listen_cancel (mi->po_eh); mi->po_eh = NULL; } if (NULL != order_timeout_task) { GNUNET_SCHEDULER_cancel (order_timeout_task); order_timeout_task = NULL; } if (NULL != order_timeout_heap) { GNUNET_CONTAINER_heap_destroy (order_timeout_heap); order_timeout_heap = NULL; } } /** * Task run to trigger timeouts on GET /orders requests with long polling. * * @param cls unused */ static void order_timeout (void *cls) { struct TMH_PendingOrder *po; struct TMH_MerchantInstance *mi; (void) cls; order_timeout_task = NULL; while (1) { po = GNUNET_CONTAINER_heap_peek (order_timeout_heap); if (NULL == po) { /* release data structure, we don't need it right now */ GNUNET_CONTAINER_heap_destroy (order_timeout_heap); order_timeout_heap = NULL; return; } if (GNUNET_TIME_absolute_is_future (po->long_poll_timeout)) break; GNUNET_assert (po == GNUNET_CONTAINER_heap_remove_root (order_timeout_heap)); po->hn = NULL; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Resuming long polled job due to timeout\n"); mi = po->mi; GNUNET_assert (po->in_dll); GNUNET_CONTAINER_DLL_remove (mi->po_head, mi->po_tail, po); if ( (NULL == mi->po_head) && (NULL != mi->po_eh) ) { TMH_db->event_listen_cancel (mi->po_eh); mi->po_eh = NULL; } po->in_dll = false; MHD_resume_connection (po->con); TALER_MHD_daemon_trigger (); /* we resumed, kick MHD */ } order_timeout_task = GNUNET_SCHEDULER_add_at (po->long_poll_timeout, &order_timeout, NULL); } /** * Cleanup our "context", where we stored the JSON array * we are building for the response. * * @param ctx context to clean up, must be a `struct AddOrderState *` */ static void cleanup (void *ctx) { struct TMH_PendingOrder *po = ctx; if (po->in_dll) { struct TMH_MerchantInstance *mi = po->mi; GNUNET_CONTAINER_DLL_remove (mi->po_head, mi->po_tail, po); } if (NULL != po->hn) GNUNET_assert (po == GNUNET_CONTAINER_heap_remove_node (po->hn)); json_decref (po->pa); GNUNET_free (po); } /** * Function called with information about a refund. * It is responsible for summing up the refund amount. * * @param cls closure * @param refund_serial unique serial number of the refund * @param timestamp time of the refund (for grouping of refunds in the wallet UI) * @param coin_pub public coin from which the refund comes from * @param exchange_url URL of the exchange that issued @a coin_pub * @param rtransaction_id identificator of the refund * @param reason human-readable explanation of the refund * @param refund_amount refund amount which is being taken from @a coin_pub * @param pending true if the this refund was not yet processed by the wallet/exchange */ static void process_refunds_cb (void *cls, uint64_t refund_serial, struct GNUNET_TIME_Timestamp timestamp, const struct TALER_CoinSpendPublicKeyP *coin_pub, const char *exchange_url, uint64_t rtransaction_id, const char *reason, const struct TALER_Amount *refund_amount, bool pending) { struct TALER_Amount *total_refund_amount = cls; GNUNET_assert (0 <= TALER_amount_add (total_refund_amount, total_refund_amount, refund_amount)); } /** * Add order details to our JSON array. * * @param cls some closure * @param orig_order_id the order this is about * @param order_serial serial ID of the order * @param creation_time when was the order created */ static void add_order (void *cls, const char *orig_order_id, uint64_t order_serial, struct GNUNET_TIME_Timestamp creation_time) { struct TMH_PendingOrder *po = cls; json_t *contract_terms = NULL; struct TALER_PrivateContractHashP h_contract_terms; enum GNUNET_DB_QueryStatus qs; const char *summary; char *order_id = NULL; bool refundable = false; bool paid; struct TALER_Amount order_amount; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Adding order `%s' (%llu) to result set\n", orig_order_id, (unsigned long long) order_serial); qs = TMH_db->lookup_order_status_by_serial (TMH_db->cls, po->instance_id, order_serial, &order_id, &h_contract_terms, &paid); if (qs < 0) { GNUNET_break (0); po->result = TALER_EC_GENERIC_DB_FETCH_FAILED; return; } if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) { /* Contract terms don't exist, so the order cannot be paid. */ paid = false; if (NULL == orig_order_id) { /* cannot be via DB trigger, and the other code path should have passed an orig_order_id */ GNUNET_break (0); po->result = TALER_EC_GENERIC_DB_FETCH_FAILED; return; } order_id = GNUNET_strdup (orig_order_id); } { /* First try to find the order in the contracts */ uint64_t os; bool paid = false; qs = TMH_db->lookup_contract_terms (TMH_db->cls, po->instance_id, order_id, &contract_terms, &os, &paid, NULL); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) GNUNET_break (os == order_serial); } if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) { /* Might still be unclaimed, so try order table */ struct TALER_MerchantPostDataHashP unused; qs = TMH_db->lookup_order (TMH_db->cls, po->instance_id, order_id, NULL, &unused, &contract_terms); } if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Order %llu disappeared during iteration. Skipping.\n", (unsigned long long) order_serial); json_decref (contract_terms); /* should still be NULL */ GNUNET_free (order_id); return; } if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) { GNUNET_break (0); po->result = TALER_EC_GENERIC_DB_FETCH_FAILED; json_decref (contract_terms); GNUNET_free (order_id); return; } { struct GNUNET_TIME_Timestamp rd; struct GNUNET_JSON_Specification spec[] = { TALER_JSON_spec_amount ("amount", TMH_currency, &order_amount), GNUNET_JSON_spec_timestamp ("refund_deadline", &rd), GNUNET_JSON_spec_string ("summary", &summary), GNUNET_JSON_spec_end () }; if (GNUNET_OK != GNUNET_JSON_parse (contract_terms, spec, NULL, NULL)) { GNUNET_break (0); po->result = TALER_EC_MERCHANT_GENERIC_DB_CONTRACT_CONTENT_INVALID; json_decref (contract_terms); GNUNET_free (order_id); return; } if (GNUNET_TIME_absolute_is_future (rd.abs_time) && paid) { struct TALER_Amount refund_amount; GNUNET_assert (GNUNET_OK == TALER_amount_set_zero (TMH_currency, &refund_amount)); qs = TMH_db->lookup_refunds_detailed (TMH_db->cls, po->instance_id, &h_contract_terms, &process_refunds_cb, &refund_amount); if (0 > qs) { GNUNET_break (0); po->result = TALER_EC_GENERIC_DB_FETCH_FAILED; json_decref (contract_terms); GNUNET_free (order_id); return; } if (0 > TALER_amount_cmp (&refund_amount, &order_amount)) refundable = true; } } GNUNET_assert (0 == json_array_append_new ( po->pa, GNUNET_JSON_PACK ( GNUNET_JSON_pack_string ("order_id", order_id), GNUNET_JSON_pack_uint64 ("row_id", order_serial), GNUNET_JSON_pack_timestamp ("timestamp", creation_time), TALER_JSON_pack_amount ("amount", &order_amount), GNUNET_JSON_pack_string ("summary", summary), GNUNET_JSON_pack_bool ("refundable", refundable), GNUNET_JSON_pack_bool ("paid", paid)))); json_decref (contract_terms); GNUNET_free (order_id); } /** * We have received a trigger from the database * that we should (possibly) resume some requests. * * @param cls a `struct TMH_MerchantInstance` * @param extra a `struct TMH_OrderChangeEventP` * @param extra_size number of bytes in @a extra */ static void resume_by_event (void *cls, const void *extra, size_t extra_size) { struct TMH_MerchantInstance *mi = cls; const struct TMH_OrderChangeEventDetailsP *oce = extra; struct TMH_PendingOrder *pn; enum TMH_OrderStateFlags osf; uint64_t order_serial_id; struct GNUNET_TIME_Timestamp date; if (sizeof (*oce) != extra_size) { GNUNET_break (0); return; } osf = (enum TMH_OrderStateFlags) ntohl (oce->order_state); order_serial_id = GNUNET_ntohll (oce->order_serial_id); date = GNUNET_TIME_timestamp_ntoh (oce->execution_date); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Received notification about new order %llu\n", (unsigned long long) order_serial_id); for (struct TMH_PendingOrder *po = mi->po_head; NULL != po; po = pn) { pn = po->next; if (! ( ( ((TALER_EXCHANGE_YNA_YES == po->of.paid) == (0 != (osf & TMH_OSF_PAID))) || (TALER_EXCHANGE_YNA_ALL == po->of.paid) ) && ( ((TALER_EXCHANGE_YNA_YES == po->of.refunded) == (0 != (osf & TMH_OSF_REFUNDED))) || (TALER_EXCHANGE_YNA_ALL == po->of.refunded) ) && ( ((TALER_EXCHANGE_YNA_YES == po->of.wired) == (0 != (osf & TMH_OSF_WIRED))) || (TALER_EXCHANGE_YNA_ALL == po->of.wired) ) ) ) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Client %p waits on different order type\n", po); continue; } if (po->of.delta > 0) { if (order_serial_id < po->of.start_row) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Client %p waits on different order row\n", po); continue; } if (GNUNET_TIME_timestamp_cmp (date, <, po->of.date)) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Client %p waits on different order date\n", po); continue; } po->of.delta--; } else { if (order_serial_id > po->of.start_row) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Client %p waits on different order row\n", po); continue; } if (GNUNET_TIME_timestamp_cmp (date, >, po->of.date)) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Client %p waits on different order date\n", po); continue; } po->of.delta++; } GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Waking up client %p!\n", po); add_order (po, NULL, order_serial_id, date); GNUNET_assert (po->in_dll); GNUNET_CONTAINER_DLL_remove (mi->po_head, mi->po_tail, po); po->in_dll = false; GNUNET_assert (po == GNUNET_CONTAINER_heap_remove_node (po->hn)); po->hn = NULL; MHD_resume_connection (po->con); TALER_MHD_daemon_trigger (); /* we resumed, kick MHD */ } if (NULL == mi->po_head) { TMH_db->event_listen_cancel (mi->po_eh); mi->po_eh = NULL; } } /** * There has been a change or addition of a new @a order_id. Wake up * long-polling clients that may have been waiting for this event. * * @param mi the instance where the order changed * @param osf order state flags * @param date execution date of the order * @param order_serial_id serial ID of the order in the database */ void TMH_notify_order_change (struct TMH_MerchantInstance *mi, enum TMH_OrderStateFlags osf, struct GNUNET_TIME_Timestamp date, uint64_t order_serial_id) { struct TMH_OrderChangeEventDetailsP oce = { .order_serial_id = GNUNET_htonll (order_serial_id), .execution_date = GNUNET_TIME_timestamp_hton (date), .order_state = htonl (osf) }; struct TMH_OrderChangeEventP eh = { .header.type = htons (TALER_DBEVENT_MERCHANT_ORDERS_CHANGE), .header.size = htons (sizeof (eh)), .merchant_pub = mi->merchant_pub }; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Notifying clients of new order %llu at %s\n", (unsigned long long) order_serial_id, TALER_B2S (&mi->merchant_pub)); TMH_db->event_notify (TMH_db->cls, &eh.header, &oce, sizeof (oce)); } /** * Handle a GET "/orders" request. * * @param rh context of the handler * @param connection the MHD connection to handle * @param[in,out] hc context with further information about the request * @return MHD result code */ MHD_RESULT TMH_private_get_orders (const struct TMH_RequestHandler *rh, struct MHD_Connection *connection, struct TMH_HandlerContext *hc) { struct TMH_PendingOrder *po = hc->ctx; enum GNUNET_DB_QueryStatus qs; struct TALER_MERCHANTDB_OrderFilter of; if (NULL != po) { /* resumed from long-polling, return answer we already have in 'hc->ctx' */ if (TALER_EC_NONE != po->result) { GNUNET_break (0); return TALER_MHD_reply_with_error (connection, MHD_HTTP_INTERNAL_SERVER_ERROR, po->result, NULL); } return TALER_MHD_REPLY_JSON_PACK ( connection, MHD_HTTP_OK, GNUNET_JSON_pack_array_incref ("orders", po->pa)); } if (! (TALER_arg_to_yna (connection, "paid", TALER_EXCHANGE_YNA_ALL, &of.paid)) ) return TALER_MHD_reply_with_error (connection, MHD_HTTP_BAD_REQUEST, TALER_EC_GENERIC_PARAMETER_MALFORMED, "paid"); if (! (TALER_arg_to_yna (connection, "refunded", TALER_EXCHANGE_YNA_ALL, &of.refunded)) ) return TALER_MHD_reply_with_error (connection, MHD_HTTP_BAD_REQUEST, TALER_EC_GENERIC_PARAMETER_MALFORMED, "refunded"); if (! (TALER_arg_to_yna (connection, "wired", TALER_EXCHANGE_YNA_ALL, &of.wired)) ) return TALER_MHD_reply_with_error (connection, MHD_HTTP_BAD_REQUEST, TALER_EC_GENERIC_PARAMETER_MALFORMED, "wired"); { const char *delta_str; delta_str = MHD_lookup_connection_value (connection, MHD_GET_ARGUMENT_KIND, "delta"); if (NULL == delta_str) { of.delta = -20; } else { char dummy; long long ll; if (1 != sscanf (delta_str, "%lld%c", &ll, &dummy)) { GNUNET_break_op (0); return TALER_MHD_reply_with_error (connection, MHD_HTTP_BAD_REQUEST, TALER_EC_GENERIC_PARAMETER_MALFORMED, "delta"); } of.delta = (int64_t) ll; if ( (-MAX_DELTA > of.delta) || (of.delta > MAX_DELTA) ) { GNUNET_break_op (0); return TALER_MHD_reply_with_error (connection, MHD_HTTP_BAD_REQUEST, TALER_EC_GENERIC_PARAMETER_MALFORMED, "delta"); } } } { const char *date_s_str; date_s_str = MHD_lookup_connection_value (connection, MHD_GET_ARGUMENT_KIND, "date_s"); if (NULL == date_s_str) { if (of.delta > 0) of.date = GNUNET_TIME_UNIT_ZERO_TS; else of.date = GNUNET_TIME_UNIT_FOREVER_TS; } else { char dummy; unsigned long long ll; if (1 != sscanf (date_s_str, "%llu%c", &ll, &dummy)) { GNUNET_break_op (0); return TALER_MHD_reply_with_error (connection, MHD_HTTP_BAD_REQUEST, TALER_EC_GENERIC_PARAMETER_MALFORMED, "date_ms"); } of.date = GNUNET_TIME_absolute_to_timestamp ( GNUNET_TIME_absolute_from_s (ll)); if (GNUNET_TIME_absolute_is_never (of.date.abs_time)) { GNUNET_break_op (0); return TALER_MHD_reply_with_error (connection, MHD_HTTP_BAD_REQUEST, TALER_EC_GENERIC_PARAMETER_MALFORMED, "date_s"); } } } { const char *start_row_str; start_row_str = MHD_lookup_connection_value (connection, MHD_GET_ARGUMENT_KIND, "start"); if (NULL == start_row_str) { if (of.delta > 0) of.start_row = 0; else of.start_row = INT64_MAX; } else { char dummy; unsigned long long ull; if (1 != sscanf (start_row_str, "%llu%c", &ull, &dummy)) return TALER_MHD_reply_with_error (connection, MHD_HTTP_BAD_REQUEST, TALER_EC_GENERIC_PARAMETER_MALFORMED, "start"); of.start_row = (uint64_t) ull; if (INT64_MAX < of.start_row) { GNUNET_break_op (0); return TALER_MHD_reply_with_error (connection, MHD_HTTP_BAD_REQUEST, TALER_EC_GENERIC_PARAMETER_MALFORMED, "start"); } } } { const char *timeout_ms_str; timeout_ms_str = MHD_lookup_connection_value (connection, MHD_GET_ARGUMENT_KIND, "timeout_ms"); if (NULL == timeout_ms_str) { of.timeout = GNUNET_TIME_UNIT_ZERO; } else { char dummy; unsigned long long ull; if (1 != sscanf (timeout_ms_str, "%lld%c", &ull, &dummy)) return TALER_MHD_reply_with_error (connection, MHD_HTTP_BAD_REQUEST, TALER_EC_GENERIC_PARAMETER_MALFORMED, "timeout_ms"); of.timeout = GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MILLISECONDS, ull); if (GNUNET_TIME_relative_is_forever (of.timeout)) { GNUNET_break_op (0); return TALER_MHD_reply_with_error (connection, MHD_HTTP_BAD_REQUEST, TALER_EC_GENERIC_PARAMETER_MALFORMED, "timeout_ms"); } } if ( (0 >= of.delta) && (! GNUNET_TIME_relative_is_zero (of.timeout)) ) { GNUNET_break_op (0); of.timeout = GNUNET_TIME_UNIT_ZERO; } } po = GNUNET_new (struct TMH_PendingOrder); hc->ctx = po; hc->cc = &cleanup; po->con = connection; po->pa = json_array (); GNUNET_assert (NULL != po->pa); po->instance_id = hc->instance->settings.id; po->mi = hc->instance; qs = TMH_db->lookup_orders (TMH_db->cls, po->instance_id, &of, &add_order, po); if (0 > qs) { GNUNET_break (0); po->result = TALER_EC_GENERIC_DB_FETCH_FAILED; } if (TALER_EC_NONE != po->result) { GNUNET_break (0); return TALER_MHD_reply_with_error (connection, MHD_HTTP_INTERNAL_SERVER_ERROR, po->result, NULL); } if ( (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) && (! GNUNET_TIME_relative_is_zero (of.timeout)) ) { struct TMH_MerchantInstance *mi = hc->instance; /* setup timeout heap (if not yet exists) */ if (NULL == order_timeout_heap) order_timeout_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MIN); po->hn = GNUNET_CONTAINER_heap_insert (order_timeout_heap, po, po->long_poll_timeout.abs_value_us); po->long_poll_timeout = GNUNET_TIME_relative_to_absolute (of.timeout); po->of = of; GNUNET_CONTAINER_DLL_insert (mi->po_head, mi->po_tail, po); po->in_dll = true; if (NULL == mi->po_eh) { struct TMH_OrderChangeEventP change_eh = { .header.type = htons (TALER_DBEVENT_MERCHANT_ORDERS_CHANGE), .header.size = htons (sizeof (change_eh)), .merchant_pub = mi->merchant_pub }; mi->po_eh = TMH_db->event_listen (TMH_db->cls, &change_eh.header, GNUNET_TIME_UNIT_FOREVER_REL, &resume_by_event, mi); } MHD_suspend_connection (connection); { struct TMH_PendingOrder *pot; /* start timeout task */ pot = GNUNET_CONTAINER_heap_peek (order_timeout_heap); if (NULL != order_timeout_task) GNUNET_SCHEDULER_cancel (order_timeout_task); order_timeout_task = GNUNET_SCHEDULER_add_at (pot->long_poll_timeout, &order_timeout, NULL); } return MHD_YES; } return TALER_MHD_REPLY_JSON_PACK ( connection, MHD_HTTP_OK, GNUNET_JSON_pack_array_incref ("orders", po->pa)); } /* end of taler-merchant-httpd_private-get-orders.c */