diff options
Diffstat (limited to 'src/exchange/taler-exchange-wirewatch.c')
-rw-r--r-- | src/exchange/taler-exchange-wirewatch.c | 286 |
1 files changed, 183 insertions, 103 deletions
diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c index e9b28030b..da5d9c098 100644 --- a/src/exchange/taler-exchange-wirewatch.c +++ b/src/exchange/taler-exchange-wirewatch.c @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2016--2022 Taler Systems SA + Copyright (C) 2016--2023 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -52,6 +52,23 @@ static const struct TALER_EXCHANGEDB_AccountInfo *ai; static struct TALER_BANK_CreditHistoryHandle *hh; /** + * Set to true if the request for history did actually + * return transaction items. + */ +static bool hh_returned_data; + +/** + * Set to true if the request for history did not + * succeed because the account was unknown. + */ +static bool hh_account_404; + +/** + * When did we start the last @e hh request? + */ +static struct GNUNET_TIME_Absolute hh_start_time; + +/** * Until when is processing this wire plugin delayed? */ static struct GNUNET_TIME_Absolute delayed_until; @@ -95,6 +112,12 @@ static struct GNUNET_TIME_Absolute shard_end_time; static struct GNUNET_TIME_Relative shard_delay; /** + * How long did we take to finish the last shard + * for this account? + */ +static struct GNUNET_TIME_Relative longpoll_timeout; + +/** * Name of our job in the shard table. */ static char *job_name; @@ -154,6 +177,11 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin; static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval; /** + * How long do we sleep on serialization conflicts? + */ +static struct GNUNET_TIME_Relative wirewatch_conflict_sleep_interval; + +/** * Modulus to apply to group shards. The shard size must ultimately be a * multiple of the batch size. Thus, if this is not a multiple of the * #MAXIMUM_BATCH_SIZE, the batch size will be set to the #shard_size. @@ -210,6 +238,8 @@ shutdown_task (void *cls) if (NULL != hh) { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "History request cancelled on shutdown\n"); TALER_BANK_credit_history_cancel (hh); hh = NULL; } @@ -218,13 +248,16 @@ shutdown_task (void *cls) db_plugin->rollback (db_plugin->cls); started_transaction = false; } - qs = db_plugin->abort_shard (db_plugin->cls, - job_name, - shard_start, - shard_end); - if (qs <= 0) - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Failed to abort work shard on shutdown\n"); + if (shard_open) + { + qs = db_plugin->abort_shard (db_plugin->cls, + job_name, + shard_start, + shard_end); + if (qs <= 0) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to abort work shard on shutdown\n"); + } GNUNET_free (job_name); if (NULL != ctx) { @@ -263,7 +296,7 @@ add_account_cb (void *cls, if (! in_ai->credit_enabled) return; /* not enabled for us, skip */ if ( (NULL != account_section) && - (0 != strcasecmp (ai->section_name, + (0 != strcasecmp (in_ai->section_name, account_section)) ) return; /* not enabled for us, skip */ if (NULL != ai) @@ -320,8 +353,6 @@ exchange_serve_process_config (void) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "No wire accounts configured for credit!\n"); - TALER_EXCHANGEDB_plugin_unload (db_plugin); - db_plugin = NULL; return GNUNET_SYSERR; } TALER_EXCHANGEDB_find_accounts (&add_account_cb, @@ -331,9 +362,6 @@ exchange_serve_process_config (void) GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "No accounts enabled for credit!\n"); GNUNET_SCHEDULER_shutdown (); - global_ret = EXIT_INVALIDARGUMENT; - TALER_EXCHANGEDB_plugin_unload (db_plugin); - db_plugin = NULL; return GNUNET_SYSERR; } return GNUNET_OK; @@ -377,6 +405,8 @@ handle_soft_error (void) } /* Reset to beginning of transaction, and go again from there. */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Encountered soft error, resetting start point to batch start\n"); latest_row_off = batch_start; GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&continue_with_shard, @@ -420,7 +450,6 @@ schedule_transfers (void) static void transaction_completed (void) { - GNUNET_assert (! started_transaction); if ( (batch_start + batch_size == latest_row_off) && (batch_size < MAXIMUM_BATCH_SIZE) ) @@ -448,6 +477,29 @@ transaction_completed (void) GNUNET_SCHEDULER_shutdown (); return; } + if (! (hh_returned_data || hh_account_404) ) + { + /* Enforce long-polling delay even if the server ignored it + and returned earlier */ + struct GNUNET_TIME_Relative latency; + struct GNUNET_TIME_Relative left; + + latency = GNUNET_TIME_absolute_get_duration (hh_start_time); + left = GNUNET_TIME_relative_subtract (longpoll_timeout, + latency); + if (! (test_mode || + GNUNET_TIME_relative_is_zero (left)) ) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Server did not respect long-polling, enforcing client-side by sleeping for %s\n", + GNUNET_TIME_relative2s (left, + true)); + delayed_until = GNUNET_TIME_relative_to_absolute (left); + } + if (hh_account_404) + delayed_until = GNUNET_TIME_relative_to_absolute ( + GNUNET_TIME_UNIT_MILLISECONDS); + if (test_mode) + delayed_until = GNUNET_TIME_UNIT_ZERO_ABS; GNUNET_assert (NULL == task); schedule_transfers (); } @@ -475,6 +527,7 @@ process_reply (const struct TALER_BANK_CreditDetails *details, transaction_completed (); return; } + hh_returned_data = true; /* check serial IDs for range constraints */ for (unsigned int i = 0; i<details_length; i++) { @@ -506,34 +559,30 @@ process_reply (const struct TALER_BANK_CreditDetails *details, } lroff = cd->serial_id; } - if (GNUNET_OK != - db_plugin->start_read_committed (db_plugin->cls, - "wirewatch check for incoming wire transfers")) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to start database transaction!\n"); - global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - return; - } - started_transaction = true; - - for (unsigned int i = 0; i<details_length; i++) + if (0 != details_length) { - const struct TALER_BANK_CreditDetails *cd = &details[i]; + enum GNUNET_DB_QueryStatus qss[details_length]; + struct TALER_EXCHANGEDB_ReserveInInfo reserves[details_length]; - /* FIXME #7276: Consider using Postgres multi-valued insert here, - for up to 15x speed-up according to - https://dba.stackexchange.com/questions/224989/multi-row-insert-vs-transactional-single-row-inserts#225006 - (Note: this may require changing both the - plugin API as well as modifying how this function is called.) */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Importing %u transactions\n", + details_length); + for (unsigned int i = 0; i<details_length; i++) + { + const struct TALER_BANK_CreditDetails *cd = &details[i]; + struct TALER_EXCHANGEDB_ReserveInInfo *res = &reserves[i]; + + res->reserve_pub = &cd->reserve_pub; + res->balance = &cd->amount; + res->execution_time = cd->execution_date; + res->sender_account_details = cd->debit_account_uri; + res->exchange_account_name = ai->section_name; + res->wire_reference = cd->serial_id; + } qs = db_plugin->reserves_in_insert (db_plugin->cls, - &cd->reserve_pub, - &cd->amount, - cd->execution_date, - cd->debit_account_uri, - ai->section_name, - cd->serial_id); + reserves, + details_length, + qss); switch (qs) { case GNUNET_DB_STATUS_HARD_ERROR: @@ -542,34 +591,53 @@ process_reply (const struct TALER_BANK_CreditDetails *details, return; case GNUNET_DB_STATUS_SOFT_ERROR: GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Got DB soft error for reserves_in_insert. Rolling back.\n"); + "Got DB soft error for reserves_in_insert (%u). Rolling back.\n", + details_length); handle_soft_error (); return; - case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: - /* Either wirewatch was freshly started after the system was - shutdown and we're going over an incomplete shard again - after being restarted, or the shard lock period was too - short (number of workers set incorrectly?) and a 2nd - wirewatcher has been stealing our work while we are still - at it. */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Attempted to import transaction %llu (%s) twice. " - "This should happen rarely (if not, ask for support).\n", - (unsigned long long) cd->serial_id, - job_name); - db_plugin->rollback (db_plugin->cls); - started_transaction = false; - /* already existed, ok, let's just continue */ - return; - case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Imported transaction %llu.", - (unsigned long long) cd->serial_id); - /* normal case */ + default: break; } - progress = true; + for (unsigned int i = 0; i<details_length; i++) + { + const struct TALER_BANK_CreditDetails *cd = &details[i]; + + switch (qss[i]) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Got DB soft error for batch_reserves_in_insert(%u). Rolling back.\n", + i); + handle_soft_error (); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + /* Either wirewatch was freshly started after the system was + shutdown and we're going over an incomplete shard again + after being restarted, or the shard lock period was too + short (number of workers set incorrectly?) and a 2nd + wirewatcher has been stealing our work while we are still + at it. */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Attempted to import transaction %llu (%s) twice. " + "This should happen rarely (if not, ask for support).\n", + (unsigned long long) cd->serial_id, + job_name); + break; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Imported transaction %llu.\n", + (unsigned long long) cd->serial_id); + /* normal case */ + progress = true; + break; + } + } } + latest_row_off = lroff; shard_done = (shard_end <= latest_row_off); if (shard_done) @@ -596,6 +664,7 @@ process_reply (const struct TALER_BANK_CreditDetails *details, break; case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: /* normal case */ + progress = true; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Completed shard %s (%llu,%llu] after %s\n", job_name, @@ -606,35 +675,6 @@ process_reply (const struct TALER_BANK_CreditDetails *details, true)); break; } - } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Committing %s progress (%llu,%llu] at %llu\n (%s)", - job_name, - (unsigned long long) shard_start, - (unsigned long long) shard_end, - (unsigned long long) latest_row_off, - shard_done - ? "shard done" - : "shard incomplete"); - qs = db_plugin->commit (db_plugin->cls); - switch (qs) - { - case GNUNET_DB_STATUS_HARD_ERROR: - GNUNET_break (0); - GNUNET_SCHEDULER_shutdown (); - return; - case GNUNET_DB_STATUS_SOFT_ERROR: - /* reduce transaction size to reduce rollback probability */ - handle_soft_error (); - return; - case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: - case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: - started_transaction = false; - /* normal case */ - break; - } - if (shard_done) - { shard_delay = GNUNET_TIME_absolute_get_duration (shard_start_time); shard_open = false; transaction_completed (); @@ -660,19 +700,20 @@ history_cb (void *cls, (void) cls; GNUNET_assert (NULL == task); hh = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "History request returned with HTTP status %u\n", reply->http_status); switch (reply->http_status) { case MHD_HTTP_OK: - process_reply (reply->details.success.details, - reply->details.success.details_length); + process_reply (reply->details.ok.details, + reply->details.ok.details_length); return; case MHD_HTTP_NO_CONTENT: transaction_completed (); return; case MHD_HTTP_NOT_FOUND: + hh_account_404 = true; if (ignore_account_404) { transaction_completed (); @@ -680,12 +721,12 @@ history_cb (void *cls, } break; default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Error fetching history: %s (%u)\n", + TALER_ErrorCode_get_hint (reply->ec), + reply->http_status); break; } - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Error fetching history: %s (%u)\n", - TALER_ErrorCode_get_hint (reply->ec), - reply->http_status); if (! exit_on_error) { transaction_completed (); @@ -707,15 +748,18 @@ continue_with_shard (void *cls) shard_end - latest_row_off); GNUNET_assert (NULL == hh); GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Requesting credit history staring from %llu\n", + "Requesting credit history starting from %llu\n", (unsigned long long) latest_row_off); + hh_start_time = GNUNET_TIME_absolute_get (); + hh_returned_data = false; + hh_account_404 = false; hh = TALER_BANK_credit_history (ctx, ai->auth, latest_row_off, limit, test_mode ? GNUNET_TIME_UNIT_ZERO - : LONGPOLL_TIMEOUT, + : longpoll_timeout, &history_cb, NULL); if (NULL == hh) @@ -729,6 +773,11 @@ continue_with_shard (void *cls) } +/** + * Reserve a shard for us to work on. + * + * @param cls NULL + */ static void lock_shard (void *cls) { @@ -792,12 +841,25 @@ lock_shard (void *cls) { struct GNUNET_TIME_Relative rdelay; - rdelay = GNUNET_TIME_randomize (wirewatch_idle_sleep_interval); + wirewatch_conflict_sleep_interval + = GNUNET_TIME_STD_BACKOFF (wirewatch_conflict_sleep_interval); + rdelay = GNUNET_TIME_randomize (wirewatch_conflict_sleep_interval); GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Serialization error tying to obtain shard %s, will try again in %s!\n", job_name, GNUNET_STRINGS_relative_time_to_string (rdelay, true)); +#if 1 + if (GNUNET_TIME_relative_cmp (rdelay, + >, + GNUNET_TIME_UNIT_SECONDS)) + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Delay would have been for %s\n", + GNUNET_TIME_relative2s (rdelay, + true)); + rdelay = GNUNET_TIME_relative_min (rdelay, + GNUNET_TIME_UNIT_SECONDS); +#endif delayed_until = GNUNET_TIME_relative_to_absolute (rdelay); } GNUNET_assert (NULL == task); @@ -810,7 +872,7 @@ lock_shard (void *cls) job_name, GNUNET_STRINGS_relative_time_to_string ( wirewatch_idle_sleep_interval, - GNUNET_YES)); + true)); delayed_until = GNUNET_TIME_relative_to_absolute ( wirewatch_idle_sleep_interval); shard_open = false; @@ -819,6 +881,7 @@ lock_shard (void *cls) return; case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: /* continued below */ + wirewatch_conflict_sleep_interval = GNUNET_TIME_UNIT_ZERO; break; } shard_end_time = GNUNET_TIME_relative_to_absolute (delay); @@ -834,9 +897,19 @@ lock_shard (void *cls) if ( (shard_open) && (shard_start == last_shard_start) && (shard_end == last_shard_end) ) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Continuing from %llu\n", + (unsigned long long) latest_row_off); GNUNET_break (latest_row_off >= batch_start); /* resume where we left things */ + } else + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Resetting shard start to original start point (%d)\n", + shard_open ? 1 : 0); latest_row_off = batch_start; + } shard_open = true; task = GNUNET_SCHEDULER_add_now (&continue_with_shard, NULL); @@ -877,6 +950,7 @@ run (void *cls, { GNUNET_break (0); GNUNET_SCHEDULER_shutdown (); + global_ret = EXIT_NO_RESTART; return; } rc = GNUNET_CURL_gnunet_rc_create (ctx); @@ -905,6 +979,11 @@ main (int argc, "exit-on-error", "terminate wirewatch if we failed to download information from the bank", &exit_on_error), + GNUNET_GETOPT_option_relative_time ('f', + "longpoll-timeout", + "DELAY", + "what is the timeout when asking the bank about new transactions, specify with unit (e.g. --longpoll-timeout=30s)", + &longpoll_timeout), GNUNET_GETOPT_option_flag ('I', "ignore-not-found", "continue, even if the bank account of the exchange was not found", @@ -930,6 +1009,7 @@ main (int argc, }; enum GNUNET_GenericReturnValue ret; + longpoll_timeout = LONGPOLL_TIMEOUT; if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, &argc, &argv)) |