diff options
Diffstat (limited to 'src/exchange/taler-exchange-aggregator.c')
-rw-r--r-- | src/exchange/taler-exchange-aggregator.c | 448 |
1 files changed, 199 insertions, 249 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index 71a3efd7d..2704f5910 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2016-2018 Taler Systems SA + Copyright (C) 2016-2020 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -26,6 +26,7 @@ #include "taler_exchangedb_lib.h" #include "taler_exchangedb_plugin.h" #include "taler_json_lib.h" +#include "taler_bank_service.h" #include "taler_wire_lib.h" @@ -45,9 +46,14 @@ struct WireAccount struct WireAccount *prev; /** - * Handle to the plugin. + * Account information. */ - struct TALER_WIRE_Plugin *wire_plugin; + struct TALER_Account account; + + /** + * Authentication data. + */ + struct TALER_BANK_AuthenticationData auth; /** * Wire transfer fee structure. @@ -59,6 +65,11 @@ struct WireAccount */ char *section_name; + /** + * Name of the wire method underlying the account. + */ + char *method; + }; @@ -77,7 +88,7 @@ struct WirePrepareData /** * Wire execution handle. */ - struct TALER_WIRE_ExecuteHandle *eh; + struct TALER_BANK_WireExecuteHandle *eh; /** * Wire plugin used for this preparation. @@ -187,10 +198,6 @@ struct AggregationUnit */ struct CloseTransferContext { - /** - * Handle for preparing the wire transfer. - */ - struct TALER_WIRE_PrepareHandle *ph; /** * Our database session. @@ -263,6 +270,16 @@ static struct WirePrepareData *wpd; static struct AggregationUnit *au; /** + * Handle to the context for interacting with the bank. + */ +static struct GNUNET_CURL_Context *ctx; + +/** + * Scheduler context for running the @e ctx. + */ +static struct GNUNET_CURL_RescheduleContext *rc; + +/** * Value to return from main(). #GNUNET_OK on success, #GNUNET_SYSERR * on serious errors. */ @@ -339,7 +356,7 @@ update_fees (struct WireAccount *wa, return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; /* Let's try to load it from disk... */ wa->af = TALER_EXCHANGEDB_fees_read (cfg, - wa->wire_plugin->method); + wa->method); advance_fees (wa, now); for (struct TALER_EXCHANGEDB_AggregateFees *p = wa->af; @@ -348,7 +365,7 @@ update_fees (struct WireAccount *wa, { qs = db_plugin->insert_wire_fee (db_plugin->cls, session, - wa->wire_plugin->method, + wa->method, p->start_date, p->end_date, &p->wire_fee, @@ -365,7 +382,7 @@ update_fees (struct WireAccount *wa, return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to find current wire transfer fees for `%s'\n", - wa->wire_plugin->method); + wa->method); return GNUNET_DB_STATUS_SUCCESS_NO_RESULTS; } @@ -381,7 +398,7 @@ find_account_by_method (const char *method) { for (struct WireAccount *wa = wa_head; NULL != wa; wa = wa->next) if (0 == strcmp (method, - wa->wire_plugin->method)) + wa->method)) return wa; return NULL; } @@ -431,13 +448,40 @@ add_account_cb (void *cls, if (GNUNET_YES != ai->debit_enabled) return; /* not enabled for us, skip */ wa = GNUNET_new (struct WireAccount); - wa->wire_plugin = TALER_WIRE_plugin_load (cfg, - ai->plugin_name); - if (NULL == wa->wire_plugin) + if (GNUNET_OK != + GNUNET_CONFIGURATION_get_value_string (cfg, + ai->section_name, + "METHOD", + &wa->method)) { - fprintf (stderr, - "Failed to load wire plugin for `%s'\n", - ai->plugin_name); + GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, + ai->section_name, + "METHOD"); + GNUNET_free (wa); + return; + } + if (GNUNET_OK != + TALER_BANK_auth_parse_cfg (cfg, + ai->section_name, + &wa->auth)) + { + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, + "Failed to load account `%s'\n", + ai->section_name); + GNUNET_free (wa->method); + GNUNET_free (wa); + return; + } + if (GNUNET_OK != + TALER_BANK_account_parse_cfg (cfg, + ai->section_name, + &wa->account)) + { + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, + "Failed to load account `%s'\n", + ai->section_name); + TALER_BANK_auth_free (&wa->auth); + GNUNET_free (wa->method); GNUNET_free (wa); return; } @@ -476,6 +520,16 @@ static void shutdown_task (void *cls) { (void) cls; + if (NULL != ctx) + { + GNUNET_CURL_fini (ctx); + ctx = NULL; + } + if (NULL != rc) + { + GNUNET_CURL_gnunet_rc_destroy (rc); + rc = NULL; + } GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Running shutdown\n"); if (NULL != task) @@ -487,9 +541,7 @@ shutdown_task (void *cls) { if (NULL != wpd->eh) { - wpd->wa->wire_plugin->execute_wire_transfer_cancel ( - wpd->wa->wire_plugin->cls, - wpd->eh); + TALER_BANK_execute_wire_transfer_cancel (wpd->eh); wpd->eh = NULL; } db_plugin->rollback (db_plugin->cls, @@ -499,23 +551,12 @@ shutdown_task (void *cls) } if (NULL != au) { - if (NULL != au->ph) - { - au->wa->wire_plugin->prepare_wire_transfer_cancel ( - au->wa->wire_plugin->cls, - au->ph); - au->ph = NULL; - } db_plugin->rollback (db_plugin->cls, au->session); cleanup_au (); } if (NULL != ctc) { - ctc->wa->wire_plugin->prepare_wire_transfer_cancel ( - ctc->wa->wire_plugin->cls, - ctc->ph); - ctc->ph = NULL; db_plugin->rollback (db_plugin->cls, ctc->session); GNUNET_free (ctc->method); @@ -532,9 +573,11 @@ shutdown_task (void *cls) GNUNET_CONTAINER_DLL_remove (wa_head, wa_tail, wa); - TALER_WIRE_plugin_unload (wa->wire_plugin); + TALER_WIRE_account_free (&wa->account); + TALER_BANK_auth_free (&wa->auth); TALER_EXCHANGEDB_fees_free (wa->af); GNUNET_free (wa->section_name); + GNUNET_free (wa->method); GNUNET_free (wa); } } @@ -922,20 +965,6 @@ aggregate_cb (void *cls, /** - * Function to be called with the prepared transfer data - * when running an aggregation on a merchant. - * - * @param cls closure with the `struct AggregationUnit` - * @param buf transaction data to persist, NULL on error - * @param buf_size number of bytes in @a buf, 0 on error - */ -static void -prepare_cb (void *cls, - const char *buf, - size_t buf_size); - - -/** * Main work function that finds and triggers transfers for reserves * closures. * @@ -989,83 +1018,6 @@ commit_or_warn (struct TALER_EXCHANGEDB_Session *session) /** - * Function to be called with the prepared transfer data - * when closing a reserve. - * - * @param cls closure with a `struct CloseTransferContext` - * @param buf transaction data to persist, NULL on error - * @param buf_size number of bytes in @a buf, 0 on error - */ -static void -prepare_close_cb (void *cls, - const char *buf, - size_t buf_size) -{ - enum GNUNET_DB_QueryStatus qs; - - GNUNET_assert (cls == ctc); - - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Prepared for reserve closing\n"); - ctc->ph = NULL; - if (NULL == buf) - { - GNUNET_break (0); /* why? how to best recover? */ - db_plugin->rollback (db_plugin->cls, - ctc->session); - /* start again */ - GNUNET_free (ctc->method); - GNUNET_free (ctc); - ctc = NULL; - task = GNUNET_SCHEDULER_add_now (&run_aggregation, - NULL); - return; - } - - /* Commit our intention to execute the wire transfer! */ - qs = db_plugin->wire_prepare_data_insert (db_plugin->cls, - ctc->session, - ctc->method, - buf, - buf_size); - if (GNUNET_DB_STATUS_HARD_ERROR == qs) - { - GNUNET_break (0); - db_plugin->rollback (db_plugin->cls, - ctc->session); - global_ret = GNUNET_SYSERR; - GNUNET_SCHEDULER_shutdown (); - GNUNET_free (ctc->method); - GNUNET_free (ctc); - ctc = NULL; - return; - } - if (GNUNET_DB_STATUS_SOFT_ERROR == qs) - { - db_plugin->rollback (db_plugin->cls, - ctc->session); - /* start again */ - task = GNUNET_SCHEDULER_add_now (&run_aggregation, - NULL); - GNUNET_free (ctc->method); - GNUNET_free (ctc); - ctc = NULL; - return; - } - - /* finally commit */ - (void) commit_or_warn (ctc->session); - GNUNET_free (ctc->method); - GNUNET_free (ctc); - ctc = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Reserve closure committed, running transfer\n"); - task = GNUNET_SCHEDULER_add_now (&run_transfers, - NULL); -} - - -/** * Closure for #expired_reserve_cb(). */ struct ExpiredReserveContext @@ -1113,6 +1065,8 @@ expired_reserve_cb (void *cls, int ret; enum GNUNET_DB_QueryStatus qs; struct WireAccount *wa; + void *buf; + size_t buf_size; /* NOTE: potential optimization: use custom SQL API to not fetch this: */ @@ -1121,7 +1075,7 @@ expired_reserve_cb (void *cls, now = GNUNET_TIME_absolute_get (); (void) GNUNET_TIME_round_abs (&now); - /* lookup wire plugin */ + /* lookup account we should use */ wa = find_account_by_url (account_details); if (NULL == wa) { @@ -1161,6 +1115,18 @@ expired_reserve_cb (void *cls, TALER_amount_get_zero (left->currency, &amount_without_fee)); } + /* round down to enable transfer */ + if (GNUNET_SYSERR == + TALER_WIRE_amount_round (&amount_without_fee)) + { + GNUNET_break (0); + global_ret = GNUNET_SYSERR; + GNUNET_SCHEDULER_shutdown (); + return GNUNET_DB_STATUS_HARD_ERROR; + } + if ( (0 == amount_without_fee.value) && + (0 == amount_without_fee.fraction) ) + ret = GNUNET_NO; /* NOTE: sizeof (*reserve_pub) == sizeof (wtid) right now, but to be future-compatible, we use the memset + min construction */ @@ -1171,61 +1137,23 @@ expired_reserve_cb (void *cls, reserve_pub, GNUNET_MIN (sizeof (wtid), sizeof (*reserve_pub))); - - qs = db_plugin->insert_reserve_closed (db_plugin->cls, - session, - reserve_pub, - now, - account_details, - &wtid, - left, - closing_fee); - + if (GNUNET_SYSERR != ret) + qs = db_plugin->insert_reserve_closed (db_plugin->cls, + session, + reserve_pub, + now, + account_details, + &wtid, + left, + closing_fee); + else + ret = GNUNET_DB_STATUS_HARD_ERROR; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Closing reserve %s over %s (%d, %d)\n", TALER_B2S (reserve_pub), TALER_amount2s (left), ret, qs); - if ( (GNUNET_OK == ret) && - (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) ) - { - /* success, perform wire transfer */ - if (GNUNET_SYSERR == - wa->wire_plugin->amount_round (wa->wire_plugin->cls, - &amount_without_fee)) - { - GNUNET_break (0); - global_ret = GNUNET_SYSERR; - GNUNET_SCHEDULER_shutdown (); - return GNUNET_DB_STATUS_HARD_ERROR; - } - ctc = GNUNET_new (struct CloseTransferContext); - ctc->wa = wa; - ctc->session = session; - ctc->method = TALER_WIRE_payto_get_method (account_details); - ctc->ph - = wa->wire_plugin->prepare_wire_transfer (wa->wire_plugin->cls, - wa->section_name, - account_details, - &amount_without_fee, - exchange_base_url, - &wtid, - &prepare_close_cb, - ctc); - if (NULL == ctc->ph) - { - GNUNET_break (0); - global_ret = GNUNET_SYSERR; - GNUNET_SCHEDULER_shutdown (); - GNUNET_free (ctc->method); - GNUNET_free (ctc); - ctc = NULL; - return GNUNET_DB_STATUS_HARD_ERROR; - } - erc->async_cont = GNUNET_YES; - return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; - } /* Check for hard failure */ if ( (GNUNET_SYSERR == ret) || (GNUNET_DB_STATUS_HARD_ERROR == qs) ) @@ -1235,10 +1163,59 @@ expired_reserve_cb (void *cls, GNUNET_SCHEDULER_shutdown (); return GNUNET_DB_STATUS_HARD_ERROR; } - /* Reserve balance was almost zero OR soft error */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Reserve was virtually empty, moving on\n"); - return qs; + if ( (GNUNET_OK != ret) || + (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) ) + { + /* Reserve balance was almost zero OR soft error */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Reserve was virtually empty, moving on\n"); + (void) commit_or_warn (ctc->session); + GNUNET_free (ctc->method); + GNUNET_free (ctc); + ctc = NULL; + task = GNUNET_SCHEDULER_add_now (&run_transfers, + NULL); + return qs; + } + + /* success, perform wire transfer */ + ctc = GNUNET_new (struct CloseTransferContext); + ctc->wa = wa; + ctc->session = session; + ctc->method = TALER_WIRE_payto_get_method (account_details); + TALER_BANK_prepare_wire_transfer (account_details, + &amount_without_fee, + exchange_base_url, + &wtid, + &buf, + &buf_size); + /* Commit our intention to execute the wire transfer! */ + qs = db_plugin->wire_prepare_data_insert (db_plugin->cls, + ctc->session, + ctc->method, + buf, + buf_size); + GNUNET_free (buf); + if (GNUNET_DB_STATUS_HARD_ERROR == qs) + { + GNUNET_break (0); + GNUNET_free (ctc->method); + GNUNET_free (ctc); + ctc = NULL; + return GNUNET_DB_STATUS_HARD_ERROR; + } + if (GNUNET_DB_STATUS_SOFT_ERROR == qs) + { + /* start again */ + GNUNET_free (ctc->method); + GNUNET_free (ctc); + ctc = NULL; + return GNUNET_DB_STATUS_SUCCESS_NO_RESULTS; + } + erc->async_cont = GNUNET_YES; + task = GNUNET_SCHEDULER_add_now (&run_transfers, + NULL); + return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; } @@ -1344,6 +1321,8 @@ run_aggregation (void *cls) struct TALER_EXCHANGEDB_Session *session; enum GNUNET_DB_QueryStatus qs; const struct GNUNET_SCHEDULER_TaskContext *tc; + void *buf; + size_t buf_size; (void) cls; task = NULL; @@ -1470,8 +1449,7 @@ run_aggregation (void *cls) &au->total_amount, &au->wire_fee)) || (GNUNET_SYSERR == - au->wa->wire_plugin->amount_round (au->wa->wire_plugin->cls, - &au->final_amount)) || + TALER_WIRE_amount_round (&au->final_amount)) || ( (0 == au->final_amount.value) && (0 == au->final_amount.fraction) ) ) { @@ -1555,70 +1533,26 @@ run_aggregation (void *cls) char *url; url = TALER_JSON_wire_to_payto (au->wire); - au->ph = au->wa->wire_plugin->prepare_wire_transfer ( - au->wa->wire_plugin->cls, - au->wa->section_name, - url, - &au->final_amount, - exchange_base_url, - &au->wtid, - &prepare_cb, - au); + TALER_BANK_prepare_wire_transfer (url, + &au->final_amount, + exchange_base_url, + &au->wtid, + &buf, + &buf_size); GNUNET_free (url); } - if (NULL == au->ph) - { - /* something went very wrong, likely bad configuration, - abort */ - db_plugin->rollback (db_plugin->cls, - session); - cleanup_au (); - GNUNET_SCHEDULER_shutdown (); - return; - } - /* otherwise we continue with #prepare_cb(), see below */ -} - - -/** - * Function to be called with the prepared transfer data. - * - * @param cls NULL - * @param buf transaction data to persist, NULL on error - * @param buf_size number of bytes in @a buf, 0 on error - */ -static void -prepare_cb (void *cls, - const char *buf, - size_t buf_size) -{ - struct TALER_EXCHANGEDB_Session *session = au->session; - enum GNUNET_DB_QueryStatus qs; - - (void) cls; GNUNET_free_non_null (au->additional_rows); au->additional_rows = NULL; - if (NULL == buf) - { - GNUNET_break (0); /* why? how to best recover? */ - db_plugin->rollback (db_plugin->cls, - session); - /* start again */ - task = GNUNET_SCHEDULER_add_now (&run_aggregation, - NULL); - cleanup_au (); - return; - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Storing %u bytes of wire prepare data\n", (unsigned int) buf_size); /* Commit our intention to execute the wire transfer! */ qs = db_plugin->wire_prepare_data_insert (db_plugin->cls, session, - au->wa->wire_plugin->method, + au->wa->method, buf, buf_size); + GNUNET_free (buf); /* Commit the WTID data to 'wire_out' to finally satisfy aggregation table constraints */ if (qs >= 0) @@ -1691,29 +1625,30 @@ prepare_cb (void *cls, * Function called with the result from the execute step. * * @param cls NULL - * @param success #GNUNET_OK on success, #GNUNET_SYSERR on failure - * @param serial_id unique ID of the wire transfer in the bank's records; UINT64_MAX on error - * @param emsg NULL on success, otherwise an error message + * @param http_status_code #MHD_HTTP_OK on success + * @param ec taler error code + * @param row_id unique ID of the wire transfer in the bank's records + * @param wire_timestamp when did the transfer happen */ static void wire_confirm_cb (void *cls, - int success, - const void *row_id, - size_t row_id_size, - const char *emsg) + unsigned int http_status_code, + enum TALER_ErrorCode ec, + uint64_t row_id, + struct GNUNET_TIME_Absolute wire_timestamp) { struct TALER_EXCHANGEDB_Session *session = wpd->session; enum GNUNET_DB_QueryStatus qs; (void) cls; (void) row_id; - (void) row_id_size; wpd->eh = NULL; - if (GNUNET_SYSERR == success) + if (MHD_HTTP_OK != http_status_code) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Wire transaction failed: %s\n", - emsg); + "Wire transaction failed: %u/%d\n", + http_status_code, + ec); db_plugin->rollback (db_plugin->cls, session); global_ret = GNUNET_SYSERR; @@ -1792,6 +1727,8 @@ wire_prepare_cb (void *cls, const char *buf, size_t buf_size) { + struct WireAccount *wa; + (void) cls; wpd->row_id = rowid; GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -1811,12 +1748,15 @@ wire_prepare_cb (void *cls, wpd = NULL; return; } - wpd->eh = wpd->wa->wire_plugin->execute_wire_transfer ( - wpd->wa->wire_plugin->cls, - buf, - buf_size, - &wire_confirm_cb, - NULL); + wa = wpd->wa; + wpd->eh = TALER_BANK_execute_wire_transfer (ctx, + wa->account.details.x_taler_bank. + account_base_url, + &wa->auth, + buf, + buf_size, + &wire_confirm_cb, + NULL); if (NULL == wpd->eh) { GNUNET_break (0); /* why? how to best recover? */ @@ -1927,6 +1867,7 @@ run (void *cls, (void) cls; (void) args; (void) cfgfile; + if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (c, "exchange", @@ -1947,6 +1888,15 @@ run (void *cls, global_ret = 1; return; } + ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, + &rc); + rc = GNUNET_CURL_gnunet_rc_create (ctx); + if (NULL == ctx) + { + GNUNET_break (0); + return; + } + task = GNUNET_SCHEDULER_add_now (&run_transfers, NULL); GNUNET_SCHEDULER_add_shutdown (&shutdown_task, |