From 83631bc98fe70dd73f212581fb54ab3a82560686 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Thu, 12 Mar 2020 10:11:24 +0100 Subject: split reserve closing from main aggregation logic --- src/exchange/taler-exchange-aggregator.c | 666 +------------------------------ 1 file changed, 19 insertions(+), 647 deletions(-) (limited to 'src/exchange/taler-exchange-aggregator.c') diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index 5f99a472b..431abea4d 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -42,44 +42,6 @@ #include "taler_bank_service.h" -/** - * Information we keep for each supported account of the exchange. - */ -struct WireAccount -{ - /** - * Accounts are kept in a DLL. - */ - struct WireAccount *next; - - /** - * Plugins are kept in a DLL. - */ - struct WireAccount *prev; - - /** - * Authentication data. - */ - struct TALER_BANK_AuthenticationData auth; - - /** - * Wire transfer fee structure. - */ - struct TALER_EXCHANGEDB_AggregateFees *af; - - /** - * Name of the section that configures this account. - */ - char *section_name; - - /** - * Name of the wire method underlying the account. - */ - char *method; - -}; - - /** * Data we keep to #run_transfers(). There is at most * one of these around at any given point in time. @@ -102,7 +64,7 @@ struct WirePrepareData /** * Wire account used for this preparation. */ - struct WireAccount *wa; + struct TALER_EXCHANGEDB_WireAccount *wa; /** * Row ID of the transfer. @@ -170,7 +132,7 @@ struct AggregationUnit * Exchange wire account to be used for the preparation and * eventual execution of the aggregate wire transfer. */ - struct WireAccount *wa; + struct TALER_EXCHANGEDB_WireAccount *wa; /** * Database session for all of our transactions. @@ -206,35 +168,6 @@ struct AggregationUnit }; -/** - * Context we use while closing a reserve. - */ -struct CloseTransferContext -{ - - /** - * Our database session. - */ - struct TALER_EXCHANGEDB_Session *session; - - /** - * Wire transfer method. - */ - char *method; - - /** - * Wire account used for closing the reserve. - */ - struct WireAccount *wa; -}; - - -/** - * Active context while processing reserve closing, - * or NULL. - */ -static struct CloseTransferContext *ctc; - /** * Which currency is used by this exchange? */ @@ -263,16 +196,6 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg; */ static struct TALER_EXCHANGEDB_Plugin *db_plugin; -/** - * Head of list of wire accounts of the exchange. - */ -static struct WireAccount *wa_head; - -/** - * Tail of list of wire accounts of the exchange. - */ -static struct WireAccount *wa_tail; - /** * Next task to run, if any. */ @@ -310,23 +233,6 @@ static int global_ret; */ static int test_mode; -/** - * Did #run_reserve_closures() have any work during its last run? - * Used to detect when we should go to sleep for a while to avoid - * busy waiting. - */ -static int reserves_idle; - - -/** - * Main work function that finds and triggers transfers for reserves - * closures. - * - * @param cls closure - */ -static void -run_reserve_closures (void *cls); - /** * Main work function that queries the DB and aggregates transactions @@ -348,191 +254,6 @@ static void run_transfers (void *cls); -/** - * Find the record valid at time @a now in the fee structure. - * - * @param wa wire transfer fee data structure to update - * @param now timestamp to update fees to - * @return fee valid at @a now, or NULL if unknown - */ -static struct TALER_EXCHANGEDB_AggregateFees * -advance_fees (struct WireAccount *wa, - struct GNUNET_TIME_Absolute now) -{ - struct TALER_EXCHANGEDB_AggregateFees *af; - - af = wa->af; - while ( (NULL != af) && - (af->end_date.abs_value_us < now.abs_value_us) ) - af = af->next; - return af; -} - - -/** - * Update wire transfer fee data structure in @a wa. - * - * @param wa wire account data structure to update - * @param now timestamp to update fees to - * @param session DB session to use - * @return fee valid at @a now, or NULL if unknown - */ -static struct TALER_EXCHANGEDB_AggregateFees * -update_fees (struct WireAccount *wa, - struct GNUNET_TIME_Absolute now, - struct TALER_EXCHANGEDB_Session *session) -{ - enum GNUNET_DB_QueryStatus qs; - struct TALER_EXCHANGEDB_AggregateFees *af; - - af = advance_fees (wa, - now); - if (NULL != af) - return af; - /* Let's try to load it from disk... */ - wa->af = TALER_EXCHANGEDB_fees_read (cfg, - wa->method); - for (struct TALER_EXCHANGEDB_AggregateFees *p = wa->af; - NULL != p; - p = p->next) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Persisting fees starting at %s in database\n", - GNUNET_STRINGS_absolute_time_to_string (p->start_date)); - qs = db_plugin->insert_wire_fee (db_plugin->cls, - session, - wa->method, - p->start_date, - p->end_date, - &p->wire_fee, - &p->closing_fee, - &p->master_sig); - if (qs < 0) - { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - TALER_EXCHANGEDB_fees_free (wa->af); - wa->af = NULL; - return NULL; - } - } - af = advance_fees (wa, - now); - if (NULL != af) - return af; - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to find current wire transfer fees for `%s' at %s\n", - wa->method, - GNUNET_STRINGS_absolute_time_to_string (now)); - return NULL; -} - - -/** - * Find the wire plugin for the given payto:// URL - * - * @param method wire method we need an account for - * @return NULL on error - */ -static struct WireAccount * -find_account_by_method (const char *method) -{ - for (struct WireAccount *wa = wa_head; NULL != wa; wa = wa->next) - if (0 == strcmp (method, - wa->method)) - return wa; - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "No wire account known for method `%s'\n", - method); - return NULL; -} - - -/** - * Find the wire plugin for the given payto:// URL - * - * @param url wire address we need an account for - * @return NULL on error - */ -static struct WireAccount * -find_account_by_payto_uri (const char *url) -{ - char *method; - struct WireAccount *wa; - - method = TALER_payto_get_method (url); - if (NULL == method) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Invalid payto:// URL `%s'\n", - url); - return NULL; - } - wa = find_account_by_method (method); - GNUNET_free (method); - return wa; -} - - -/** - * Function called with information about a wire account. Adds - * the account to our list. - * - * @param cls closure, NULL - * @param ai account information - */ -static void -add_account_cb (void *cls, - const struct TALER_EXCHANGEDB_AccountInfo *ai) -{ - struct WireAccount *wa; - char *payto_uri; - - (void) cls; - if (GNUNET_YES != ai->debit_enabled) - return; /* not enabled for us, skip */ - wa = GNUNET_new (struct WireAccount); - if (GNUNET_OK != - GNUNET_CONFIGURATION_get_value_string (cfg, - ai->section_name, - "PAYTO_URI", - &payto_uri)) - { - GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR, - ai->section_name, - "PAYTO_URI"); - GNUNET_free (wa); - return; - } - wa->method = TALER_payto_get_method (payto_uri); - if (NULL == wa->method) - { - GNUNET_log_config_invalid (GNUNET_ERROR_TYPE_ERROR, - ai->section_name, - "PAYTO_URI", - "could not obtain wire method from URI"); - GNUNET_free (wa); - return; - } - GNUNET_free (payto_uri); - if (GNUNET_OK != - TALER_BANK_auth_parse_cfg (cfg, - ai->section_name, - &wa->auth)) - { - GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, - "Failed to load exchange account `%s'\n", - ai->section_name); - GNUNET_free (wa->method); - GNUNET_free (wa); - return; - } - wa->section_name = GNUNET_strdup (ai->section_name); - GNUNET_CONTAINER_DLL_insert (wa_head, - wa_tail, - wa); -} - - /** * Free data stored in @a au, but not @a au itself (stack allocated). * @@ -589,32 +310,9 @@ shutdown_task (void *cls) GNUNET_free (wpd); wpd = NULL; } - if (NULL != ctc) - { - db_plugin->rollback (db_plugin->cls, - ctc->session); - GNUNET_free (ctc->method); - GNUNET_free (ctc); - ctc = NULL; - } TALER_EXCHANGEDB_plugin_unload (db_plugin); db_plugin = NULL; - - { - struct WireAccount *wa; - - while (NULL != (wa = wa_head)) - { - GNUNET_CONTAINER_DLL_remove (wa_head, - wa_tail, - wa); - TALER_BANK_auth_free (&wa->auth); - TALER_EXCHANGEDB_fees_free (wa->af); - GNUNET_free (wa->section_name); - GNUNET_free (wa->method); - GNUNET_free (wa); - } - } + TALER_EXCHANGEDB_unload_accounts (); cfg = NULL; } @@ -691,10 +389,8 @@ parse_wirewatch_config () "Failed to initialize DB subsystem\n"); return GNUNET_SYSERR; } - TALER_EXCHANGEDB_find_accounts (cfg, - &add_account_cb, - NULL); - if (NULL == wa_head) + if (GNUNET_OK != + TALER_EXCHANGEDB_load_accounts (cfg)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "No wire accounts configured for debit!\n"); @@ -836,7 +532,7 @@ deposit_cb (void *cls, char *url; url = TALER_JSON_wire_to_payto (au->wire); - au->wa = find_account_by_payto_uri (url); + au->wa = TALER_EXCHANGEDB_find_account_by_payto_uri (url); GNUNET_free (url); } if (NULL == au->wa) @@ -851,9 +547,11 @@ deposit_cb (void *cls, { struct TALER_EXCHANGEDB_AggregateFees *af; - af = update_fees (au->wa, - au->execution_time, - au->session); + af = TALER_EXCHANGEDB_update_fees (cfg, + db_plugin, + au->wa, + au->execution_time, + au->session); if (NULL == af) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, @@ -1042,311 +740,6 @@ commit_or_warn (struct TALER_EXCHANGEDB_Session *session) } -/** - * Closure for #expired_reserve_cb(). - */ -struct ExpiredReserveContext -{ - - /** - * Database session we are using. - */ - struct TALER_EXCHANGEDB_Session *session; - - /** - * Set to #GNUNET_YES if the transaction continues - * asynchronously. - */ - int async_cont; -}; - - -/** - * Function called with details about expired reserves. - * We trigger the reserve closure by inserting the respective - * closing record and prewire instructions into the respective - * tables. - * - * @param cls a `struct ExpiredReserveContext *` - * @param reserve_pub public key of the reserve - * @param left amount left in the reserve - * @param account_payto_uri information about the bank account that initially - * caused the reserve to be created - * @param expiration_date when did the reserve expire - * @return transaction status code - */ -static enum GNUNET_DB_QueryStatus -expired_reserve_cb (void *cls, - const struct TALER_ReservePublicKeyP *reserve_pub, - const struct TALER_Amount *left, - const char *account_payto_uri, - struct GNUNET_TIME_Absolute expiration_date) -{ - struct ExpiredReserveContext *erc = cls; - struct TALER_EXCHANGEDB_Session *session = erc->session; - struct GNUNET_TIME_Absolute now; - struct TALER_WireTransferIdentifierRawP wtid; - struct TALER_Amount amount_without_fee; - const struct TALER_Amount *closing_fee; - int ret; - enum GNUNET_DB_QueryStatus qs; - struct WireAccount *wa; - void *buf; - size_t buf_size; - - /* NOTE: potential optimization: use custom SQL API to not - fetch this: */ - GNUNET_assert (NULL == ctc); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Processing reserve closure at %s\n", - GNUNET_STRINGS_absolute_time_to_string (expiration_date)); - now = GNUNET_TIME_absolute_get (); - (void) GNUNET_TIME_round_abs (&now); - - /* lookup account we should use */ - wa = find_account_by_payto_uri (account_payto_uri); - if (NULL == wa) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "No wire account configured to deal with target URI `%s'\n", - account_payto_uri); - global_ret = GNUNET_SYSERR; - GNUNET_SCHEDULER_shutdown (); - return GNUNET_DB_STATUS_HARD_ERROR; - } - - /* lookup `closing_fee` from time of actual reserve expiration - (we may be lagging behind!) */ - { - struct TALER_EXCHANGEDB_AggregateFees *af; - - af = update_fees (wa, - expiration_date, - session); - if (NULL == af) - { - global_ret = GNUNET_SYSERR; - GNUNET_SCHEDULER_shutdown (); - return GNUNET_DB_STATUS_HARD_ERROR; - } - closing_fee = &af->closing_fee; - } - - /* calculate transfer amount */ - ret = TALER_amount_subtract (&amount_without_fee, - left, - closing_fee); - if ( (GNUNET_SYSERR == ret) || - (GNUNET_NO == ret) ) - { - /* Closing fee higher than or equal to remaining balance, close - without wire transfer. */ - closing_fee = left; - GNUNET_assert (GNUNET_OK == - TALER_amount_get_zero (left->currency, - &amount_without_fee)); - } - /* round down to enable transfer */ - if (GNUNET_SYSERR == - TALER_amount_round_down (&amount_without_fee, - ¤cy_round_unit)) - { - GNUNET_break (0); - global_ret = GNUNET_SYSERR; - GNUNET_SCHEDULER_shutdown (); - return GNUNET_DB_STATUS_HARD_ERROR; - } - if ( (0 == amount_without_fee.value) && - (0 == amount_without_fee.fraction) ) - ret = GNUNET_NO; - - /* NOTE: sizeof (*reserve_pub) == sizeof (wtid) right now, but to - be future-compatible, we use the memset + min construction */ - memset (&wtid, - 0, - sizeof (wtid)); - memcpy (&wtid, - reserve_pub, - GNUNET_MIN (sizeof (wtid), - sizeof (*reserve_pub))); - if (GNUNET_SYSERR != ret) - qs = db_plugin->insert_reserve_closed (db_plugin->cls, - session, - reserve_pub, - now, - account_payto_uri, - &wtid, - left, - closing_fee); - else - qs = GNUNET_DB_STATUS_HARD_ERROR; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Closing reserve %s over %s (%d, %d)\n", - TALER_B2S (reserve_pub), - TALER_amount2s (left), - ret, - qs); - /* Check for hard failure */ - if ( (GNUNET_SYSERR == ret) || - (GNUNET_DB_STATUS_HARD_ERROR == qs) ) - { - GNUNET_break (0); - global_ret = GNUNET_SYSERR; - GNUNET_SCHEDULER_shutdown (); - return GNUNET_DB_STATUS_HARD_ERROR; - } - if ( (GNUNET_OK != ret) || - (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) ) - { - /* Reserve balance was almost zero OR soft error */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Reserve was virtually empty, moving on\n"); - (void) commit_or_warn (session); - erc->async_cont = GNUNET_YES; - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_transfers, - NULL); - return qs; - } - - /* success, perform wire transfer */ - ctc = GNUNET_new (struct CloseTransferContext); - ctc->wa = wa; - ctc->session = session; - ctc->method = TALER_payto_get_method (account_payto_uri); - TALER_BANK_prepare_transfer (account_payto_uri, - &amount_without_fee, - exchange_base_url, - &wtid, - &buf, - &buf_size); - /* Commit our intention to execute the wire transfer! */ - qs = db_plugin->wire_prepare_data_insert (db_plugin->cls, - ctc->session, - ctc->method, - buf, - buf_size); - GNUNET_free (buf); - if (GNUNET_DB_STATUS_HARD_ERROR == qs) - { - GNUNET_break (0); - GNUNET_free (ctc->method); - GNUNET_free (ctc); - ctc = NULL; - return GNUNET_DB_STATUS_HARD_ERROR; - } - if (GNUNET_DB_STATUS_SOFT_ERROR == qs) - { - /* start again */ - GNUNET_free (ctc->method); - GNUNET_free (ctc); - ctc = NULL; - return GNUNET_DB_STATUS_SUCCESS_NO_RESULTS; - } - erc->async_cont = GNUNET_YES; - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_transfers, - NULL); - GNUNET_free (ctc->method); - GNUNET_free (ctc); - ctc = NULL; - - return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; -} - - -/** - * Main work function that finds and triggers transfers for reserves - * closures. - * - * @param cls closure - */ -static void -run_reserve_closures (void *cls) -{ - struct TALER_EXCHANGEDB_Session *session; - enum GNUNET_DB_QueryStatus qs; - const struct GNUNET_SCHEDULER_TaskContext *tc; - struct ExpiredReserveContext erc; - struct GNUNET_TIME_Absolute now; - - (void) cls; - task = NULL; - reserves_idle = GNUNET_NO; - tc = GNUNET_SCHEDULER_get_task_context (); - if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) - return; - if (NULL == (session = db_plugin->get_session (db_plugin->cls))) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to obtain database session!\n"); - global_ret = GNUNET_SYSERR; - GNUNET_SCHEDULER_shutdown (); - return; - } - - if (GNUNET_OK != - db_plugin->start (db_plugin->cls, - session, - "aggregator reserve closures")) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to start database transaction!\n"); - global_ret = GNUNET_SYSERR; - GNUNET_SCHEDULER_shutdown (); - return; - } - erc.session = session; - erc.async_cont = GNUNET_NO; - now = GNUNET_TIME_absolute_get (); - (void) GNUNET_TIME_round_abs (&now); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Checking for reserves to close by date %s\n", - GNUNET_STRINGS_absolute_time_to_string (now)); - qs = db_plugin->get_expired_reserves (db_plugin->cls, - session, - now, - &expired_reserve_cb, - &erc); - GNUNET_assert (1 >= qs); - switch (qs) - { - case GNUNET_DB_STATUS_HARD_ERROR: - GNUNET_break (0); - db_plugin->rollback (db_plugin->cls, - session); - global_ret = GNUNET_SYSERR; - GNUNET_SCHEDULER_shutdown (); - return; - case GNUNET_DB_STATUS_SOFT_ERROR: - db_plugin->rollback (db_plugin->cls, - session); - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_reserve_closures, - NULL); - return; - case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "No more idle reserves, going back to aggregation\n"); - reserves_idle = GNUNET_YES; - db_plugin->rollback (db_plugin->cls, - session); - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_aggregation, - NULL); - return; - case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: - (void) commit_or_warn (session); - if (GNUNET_YES == erc.async_cont) - break; - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_reserve_closures, - NULL); - return; - } -} - - /** * Main work function that queries the DB and aggregates transactions * into larger wire transfers. @@ -1356,7 +749,6 @@ run_reserve_closures (void *cls) static void run_aggregation (void *cls) { - static unsigned int swap; struct AggregationUnit au_active; struct TALER_EXCHANGEDB_Session *session; enum GNUNET_DB_QueryStatus qs; @@ -1369,13 +761,6 @@ run_aggregation (void *cls) tc = GNUNET_SCHEDULER_get_task_context (); if (0 != (tc->reason & GNUNET_SCHEDULER_REASON_SHUTDOWN)) return; - if (0 == (++swap % 2)) - { - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_reserve_closures, - NULL); - return; - } GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Checking for ready deposits to aggregate\n"); if (NULL == (session = db_plugin->get_session (db_plugin->cls))) @@ -1420,7 +805,6 @@ run_aggregation (void *cls) if (GNUNET_DB_STATUS_SOFT_ERROR == qs) { /* should re-try immediately */ - swap--; /* do not count failed attempts */ GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, NULL); @@ -1428,30 +812,18 @@ run_aggregation (void *cls) } GNUNET_log (GNUNET_ERROR_TYPE_INFO, "No more ready deposits, going to sleep\n"); - if ( (GNUNET_YES == test_mode) && - (swap >= 2) ) + if (GNUNET_YES == test_mode) { /* in test mode, shutdown if we end up being idle */ GNUNET_SCHEDULER_shutdown (); } else { - if ( (GNUNET_NO == reserves_idle) || - (GNUNET_YES == test_mode) ) - { - /* Possibly more to on reserves, go for it immediately */ - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_reserve_closures, - NULL); - } - else - { - /* nothing to do, sleep for a minute and try again */ - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval, - &run_aggregation, - NULL); - } + /* nothing to do, sleep for a minute and try again */ + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval, + &run_aggregation, + NULL); } return; } @@ -1791,14 +1163,14 @@ wire_prepare_cb (void *cls, const char *buf, size_t buf_size) { - struct WireAccount *wa; + struct TALER_EXCHANGEDB_WireAccount *wa; (void) cls; wpd->row_id = rowid; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Starting wire transfer %llu\n", (unsigned long long) rowid); - wpd->wa = find_account_by_method (wire_method); + wpd->wa = TALER_EXCHANGEDB_find_account_by_method (wire_method); if (NULL == wpd->wa) { /* Should really never happen here, as when we get -- cgit v1.2.3