diff options
Diffstat (limited to 'src/exchange/taler-exchange-wirewatch.c')
-rw-r--r-- | src/exchange/taler-exchange-wirewatch.c | 1061 |
1 files changed, 723 insertions, 338 deletions
diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c index 47f2bdb6b..da5d9c098 100644 --- a/src/exchange/taler-exchange-wirewatch.c +++ b/src/exchange/taler-exchange-wirewatch.c @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2016--2020 Taler Systems SA + Copyright (C) 2016--2023 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -13,7 +13,6 @@ You should have received a copy of the GNU Affero General Public License along with TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ - /** * @file taler-exchange-wirewatch.c * @brief Process that watches for wire transfers to the exchange's bank account @@ -29,106 +28,125 @@ #include "taler_json_lib.h" #include "taler_bank_service.h" -#define DEBUG_LOGGING 0 /** - * What is the initial batch size we use for credit history + * How long to wait for an HTTP reply if there + * are no transactions pending at the server? + */ +#define LONGPOLL_TIMEOUT GNUNET_TIME_UNIT_MINUTES + +/** + * What is the maximum batch size we use for credit history * requests with the bank. See `batch_size` below. */ -#define INITIAL_BATCH_SIZE 1024 +#define MAXIMUM_BATCH_SIZE 1024 /** - * Information we keep for each supported account. + * Information about our account. */ -struct WireAccount -{ - /** - * Accounts are kept in a DLL. - */ - struct WireAccount *next; +static const struct TALER_EXCHANGEDB_AccountInfo *ai; - /** - * Plugins are kept in a DLL. - */ - struct WireAccount *prev; +/** + * Active request for history. + */ +static struct TALER_BANK_CreditHistoryHandle *hh; - /** - * Name of the section that configures this account. - */ - char *section_name; +/** + * Set to true if the request for history did actually + * return transaction items. + */ +static bool hh_returned_data; - /** - * Database session we are using for the current transaction. - */ - struct TALER_EXCHANGEDB_Session *session; +/** + * Set to true if the request for history did not + * succeed because the account was unknown. + */ +static bool hh_account_404; - /** - * Active request for history. - */ - struct TALER_BANK_CreditHistoryHandle *hh; +/** + * When did we start the last @e hh request? + */ +static struct GNUNET_TIME_Absolute hh_start_time; - /** - * Authentication data. - */ - struct TALER_BANK_AuthenticationData auth; +/** + * Until when is processing this wire plugin delayed? + */ +static struct GNUNET_TIME_Absolute delayed_until; - /** - * Until when is processing this wire plugin delayed? - */ - struct GNUNET_TIME_Absolute delayed_until; +/** + * Encoded offset in the wire transfer list from where + * to start the next query with the bank. + */ +static uint64_t batch_start; - /** - * Encoded offset in the wire transfer list from where - * to start the next query with the bank. - */ - uint64_t last_row_off; +/** + * Latest row offset seen in this transaction, becomes + * the new #batch_start upon commit. + */ +static uint64_t latest_row_off; - /** - * Latest row offset seen in this transaction, becomes - * the new #last_row_off upon commit. - */ - uint64_t latest_row_off; +/** + * Offset where our current shard begins (inclusive). + */ +static uint64_t shard_start; - /** - * How many transactions do we retrieve per batch? - */ - unsigned int batch_size; +/** + * Offset where our current shard ends (exclusive). + */ +static uint64_t shard_end; - /** - * How many transactions did we see in the current batch? - */ - unsigned int current_batch_size; +/** + * When did we start with the shard? + */ +static struct GNUNET_TIME_Absolute shard_start_time; - /** - * Are we running from scratch and should re-process all transactions - * for this account? - */ - int reset_mode; +/** + * For how long did we lock the shard? + */ +static struct GNUNET_TIME_Absolute shard_end_time; - /** - * Should we delay the next request to the wire plugin a bit? Set to - * #GNUNET_NO if we actually did some work. - */ - int delay; +/** + * How long did we take to finish the last shard + * for this account? + */ +static struct GNUNET_TIME_Relative shard_delay; -}; +/** + * How long did we take to finish the last shard + * for this account? + */ +static struct GNUNET_TIME_Relative longpoll_timeout; +/** + * Name of our job in the shard table. + */ +static char *job_name; /** - * Head of list of loaded wire plugins. + * How many transactions do we retrieve per batch? */ -static struct WireAccount *wa_head; +static unsigned int batch_size; /** - * Tail of list of loaded wire plugins. + * How much do we increment @e batch_size on success? */ -static struct WireAccount *wa_tail; +static unsigned int batch_thresh; /** - * Wire account we are currently processing. This would go away - * if we ever start processing all accounts in parallel. + * Did work remain in the transaction queue? Set to true + * if we did some work and thus there might be more. */ -static struct WireAccount *wa_pos; +static bool progress; + +/** + * Did we start a transaction yet? + */ +static bool started_transaction; + +/** + * Is this shard still open for processing. + */ +static bool shard_open; /** * Handle to the context for interacting with the bank. @@ -152,24 +170,39 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin; /** * How long should we sleep when idle before trying to find more work? + * Also used for how long we wait to grab a shard before trying it again. + * The value should be set to a bit above the average time it takes to + * process a shard. */ static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval; /** + * How long do we sleep on serialization conflicts? + */ +static struct GNUNET_TIME_Relative wirewatch_conflict_sleep_interval; + +/** + * Modulus to apply to group shards. The shard size must ultimately be a + * multiple of the batch size. Thus, if this is not a multiple of the + * #MAXIMUM_BATCH_SIZE, the batch size will be set to the #shard_size. + */ +static unsigned int shard_size = MAXIMUM_BATCH_SIZE; + +/** + * How many workers should we plan our scheduling with? + */ +static unsigned int max_workers = 16; + +/** + * -e command-line option: exit on errors talking to the bank? + */ +static int exit_on_error; + +/** * Value to return from main(). 0 on success, non-zero on * on serious errors. */ -static enum -{ - GR_SUCCESS = 0, - GR_DATABASE_SESSION_FAIL = 1, - GR_DATABASE_TRANSACTION_BEGIN_FAIL = 2, - GR_DATABASE_SELECT_LATEST_HARD_FAIL = 3, - GR_BANK_REQUEST_HISTORY_FAIL = 4, - GR_CONFIGURATION_INVALID = 5, - GR_CMD_LINE_UTF8_ERROR = 6, - GR_CMD_LINE_OPTIONS_WRONG = 7, -} global_ret; +static int global_ret; /** * Are we run in testing mode and should only do one pass? @@ -177,15 +210,20 @@ static enum static int test_mode; /** - * Are we running from scratch and should re-process all transactions? + * Should we ignore if the bank does not know our bank + * account? */ -static int reset_mode; +static int ignore_account_404; /** * Current task waiting for execution, if any. */ static struct GNUNET_SCHEDULER_Task *task; +/** + * Name of the configuration section with the account we should watch. + */ +static char *account_section; /** * We're being aborted with CTRL-C (or SIGTERM). Shut down. @@ -195,27 +233,32 @@ static struct GNUNET_SCHEDULER_Task *task; static void shutdown_task (void *cls) { + enum GNUNET_DB_QueryStatus qs; (void) cls; - { - struct WireAccount *wa; - while (NULL != (wa = wa_head)) - { - if (NULL != wa->hh) - { - TALER_BANK_credit_history_cancel (wa->hh); - wa->hh = NULL; - } - GNUNET_CONTAINER_DLL_remove (wa_head, - wa_tail, - wa); - TALER_BANK_auth_free (&wa->auth); - GNUNET_free (wa->section_name); - GNUNET_free (wa); - } + if (NULL != hh) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "History request cancelled on shutdown\n"); + TALER_BANK_credit_history_cancel (hh); + hh = NULL; } - wa_pos = NULL; - + if (started_transaction) + { + db_plugin->rollback (db_plugin->cls); + started_transaction = false; + } + if (shard_open) + { + qs = db_plugin->abort_shard (db_plugin->cls, + job_name, + shard_start, + shard_end); + if (qs <= 0) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to abort work shard on shutdown\n"); + } + GNUNET_free (job_name); if (NULL != ctx) { GNUNET_CURL_fini (ctx); @@ -233,6 +276,8 @@ shutdown_task (void *cls) } TALER_EXCHANGEDB_plugin_unload (db_plugin); db_plugin = NULL; + TALER_EXCHANGEDB_unload_accounts (); + cfg = NULL; } @@ -241,35 +286,36 @@ shutdown_task (void *cls) * account to our list (if it is enabled and we can load the plugin). * * @param cls closure, NULL - * @param ai account information + * @param in_ai account information */ static void add_account_cb (void *cls, - const struct TALER_EXCHANGEDB_AccountInfo *ai) + const struct TALER_EXCHANGEDB_AccountInfo *in_ai) { - struct WireAccount *wa; - (void) cls; - if (GNUNET_YES != ai->credit_enabled) + if (! in_ai->credit_enabled) return; /* not enabled for us, skip */ - wa = GNUNET_new (struct WireAccount); - wa->reset_mode = reset_mode; - if (GNUNET_OK != - TALER_BANK_auth_parse_cfg (cfg, - ai->section_name, - &wa->auth)) + if ( (NULL != account_section) && + (0 != strcasecmp (in_ai->section_name, + account_section)) ) + return; /* not enabled for us, skip */ + if (NULL != ai) { - GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, - "Failed to load account `%s'\n", - ai->section_name); - GNUNET_free (wa); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Multiple accounts enabled (%s and %s), use '-a' command-line option to select one!\n", + ai->section_name, + in_ai->section_name); + GNUNET_SCHEDULER_shutdown (); + global_ret = EXIT_INVALIDARGUMENT; return; } - wa->section_name = GNUNET_strdup (ai->section_name); - wa->batch_size = INITIAL_BATCH_SIZE; - GNUNET_CONTAINER_DLL_insert (wa_head, - wa_tail, - wa); + ai = in_ai; + GNUNET_asprintf (&job_name, + "wirewatch-%s", + ai->section_name); + batch_size = MAXIMUM_BATCH_SIZE; + if (0 != shard_size % batch_size) + batch_size = shard_size; } @@ -279,7 +325,7 @@ add_account_cb (void *cls, * * @return #GNUNET_OK on success */ -static int +static enum GNUNET_GenericReturnValue exchange_serve_process_config (void) { if (GNUNET_OK != @@ -296,18 +342,26 @@ exchange_serve_process_config (void) if (NULL == (db_plugin = TALER_EXCHANGEDB_plugin_load (cfg))) { - fprintf (stderr, - "Failed to initialize DB subsystem\n"); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to initialize DB subsystem\n"); + return GNUNET_SYSERR; + } + if (GNUNET_OK != + TALER_EXCHANGEDB_load_accounts (cfg, + TALER_EXCHANGEDB_ALO_CREDIT + | TALER_EXCHANGEDB_ALO_AUTHDATA)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "No wire accounts configured for credit!\n"); return GNUNET_SYSERR; } - TALER_EXCHANGEDB_find_accounts (cfg, - &add_account_cb, + TALER_EXCHANGEDB_find_accounts (&add_account_cb, NULL); - if (NULL == wa_head) + if (NULL == ai) { - fprintf (stderr, - "No wire accounts configured for credit!\n"); - TALER_EXCHANGEDB_plugin_unload (db_plugin); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "No accounts enabled for credit!\n"); + GNUNET_SCHEDULER_shutdown (); return GNUNET_SYSERR; } return GNUNET_OK; @@ -315,248 +369,550 @@ exchange_serve_process_config (void) /** - * Query for incoming wire transfers. + * Lock a shard and then begin to query for incoming wire transfers. * * @param cls NULL */ static void -find_transfers (void *cls); +lock_shard (void *cls); /** - * Callbacks of this type are used to serve the result of asking - * the bank for the transaction history. + * Continue with the credit history of the shard. * - * @param cls closure with the `struct WioreAccount *` we are processing - * @param http_status HTTP status code from the server - * @param ec taler error code - * @param serial_id identification of the position at which we are querying - * @param details details about the wire transfer - * @param json raw JSON response - * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration - */ -static int -history_cb (void *cls, - unsigned int http_status, - enum TALER_ErrorCode ec, - uint64_t serial_id, - const struct TALER_BANK_CreditDetails *details, - const json_t *json) + * @param cls NULL + */ +static void +continue_with_shard (void *cls); + + +/** + * We encountered a serialization error. Rollback the transaction and try + * again. + */ +static void +handle_soft_error (void) +{ + db_plugin->rollback (db_plugin->cls); + started_transaction = false; + if (1 < batch_size) + { + batch_thresh = batch_size; + batch_size /= 2; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Reduced batch size to %llu due to serialization issue\n", + (unsigned long long) batch_size); + } + /* Reset to beginning of transaction, and go again + from there. */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Encountered soft error, resetting start point to batch start\n"); + latest_row_off = batch_start; + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&continue_with_shard, + NULL); +} + + +/** + * Schedule the #lock_shard() operation. + */ +static void +schedule_transfers (void) +{ + if (shard_open) + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Will retry my shard (%llu,%llu] of %s in %s\n", + (unsigned long long) shard_start, + (unsigned long long) shard_end, + job_name, + GNUNET_STRINGS_relative_time_to_string ( + GNUNET_TIME_absolute_get_remaining (delayed_until), + true)); + else + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Will try to lock next shard of %s in %s\n", + job_name, + GNUNET_STRINGS_relative_time_to_string ( + GNUNET_TIME_absolute_get_remaining (delayed_until), + true)); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_at (delayed_until, + &lock_shard, + NULL); +} + + +/** + * We are done with the work that is possible right now (and the transaction + * was committed, if there was one to commit). Move on to the next shard. + */ +static void +transaction_completed (void) +{ + if ( (batch_start + batch_size == + latest_row_off) && + (batch_size < MAXIMUM_BATCH_SIZE) ) + { + /* The current batch size worked without serialization + issues, and we are allowed to grow. Do so slowly. */ + int delta; + + delta = ((int) batch_thresh - (int) batch_size) / 4; + if (delta < 0) + delta = -delta; + batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE, + batch_size + delta + 1); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Increasing batch size to %llu\n", + (unsigned long long) batch_size); + } + + if ( (! progress) && test_mode) + { + /* Transaction list was drained and we are in + test mode. So we are done. */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Transaction list drained and in test mode. Exiting\n"); + GNUNET_SCHEDULER_shutdown (); + return; + } + if (! (hh_returned_data || hh_account_404) ) + { + /* Enforce long-polling delay even if the server ignored it + and returned earlier */ + struct GNUNET_TIME_Relative latency; + struct GNUNET_TIME_Relative left; + + latency = GNUNET_TIME_absolute_get_duration (hh_start_time); + left = GNUNET_TIME_relative_subtract (longpoll_timeout, + latency); + if (! (test_mode || + GNUNET_TIME_relative_is_zero (left)) ) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Server did not respect long-polling, enforcing client-side by sleeping for %s\n", + GNUNET_TIME_relative2s (left, + true)); + delayed_until = GNUNET_TIME_relative_to_absolute (left); + } + if (hh_account_404) + delayed_until = GNUNET_TIME_relative_to_absolute ( + GNUNET_TIME_UNIT_MILLISECONDS); + if (test_mode) + delayed_until = GNUNET_TIME_UNIT_ZERO_ABS; + GNUNET_assert (NULL == task); + schedule_transfers (); +} + + +/** + * We got incoming transaction details from the bank. Add them + * to the database. + * + * @param details array of transaction details + * @param details_length length of the @a details array + */ +static void +process_reply (const struct TALER_BANK_CreditDetails *details, + unsigned int details_length) { - struct WireAccount *wa = cls; - struct TALER_EXCHANGEDB_Session *session = wa->session; enum GNUNET_DB_QueryStatus qs; + bool shard_done; + uint64_t lroff = latest_row_off; - (void) json; - if (NULL == details) + if (0 == details_length) { - wa->hh = NULL; - if (TALER_EC_NONE != ec) + /* Server should have used 204, not 200! */ + GNUNET_break_op (0); + transaction_completed (); + return; + } + hh_returned_data = true; + /* check serial IDs for range constraints */ + for (unsigned int i = 0; i<details_length; i++) + { + const struct TALER_BANK_CreditDetails *cd = &details[i]; + + if (cd->serial_id < lroff) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Error fetching history: ec=%u, http_status=%u\n", - (unsigned int) ec, - http_status); + "Serial ID %llu not monotonic (got %llu before). Failing!\n", + (unsigned long long) cd->serial_id, + (unsigned long long) lroff); + db_plugin->rollback (db_plugin->cls); + GNUNET_SCHEDULER_shutdown (); + return; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "End of list. Committing progress!\n"); - qs = db_plugin->commit (db_plugin->cls, - session); - if (GNUNET_DB_STATUS_SOFT_ERROR == qs) + if (cd->serial_id > shard_end) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Got DB soft error for commit\n"); - /* reduce transaction size to reduce rollback probability */ - if (2 > wa->current_batch_size) - wa->current_batch_size /= 2; - /* try again */ - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&find_transfers, - NULL); - return GNUNET_OK; /* will be ignored anyway */ + /* we are *past* the current shard (likely because the serial_id of the + shard_end happens to not exist in the DB). So commit and stop this + iteration! */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Serial ID %llu past shard end at %llu, ending iteration early!\n", + (unsigned long long) cd->serial_id, + (unsigned long long) shard_end); + details_length = i; + progress = true; + lroff = cd->serial_id - 1; + break; } - if (0 < qs) + lroff = cd->serial_id; + } + if (0 != details_length) + { + enum GNUNET_DB_QueryStatus qss[details_length]; + struct TALER_EXCHANGEDB_ReserveInInfo reserves[details_length]; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Importing %u transactions\n", + details_length); + for (unsigned int i = 0; i<details_length; i++) { - /* transaction success, update #last_row_off */ - wa->last_row_off = wa->latest_row_off; - wa->latest_row_off = 0; /* should not be needed */ - wa->session = NULL; /* should not be needed */ - /* if successful at limit, try increasing transaction batch size (AIMD) */ - if ( (wa->current_batch_size == wa->batch_size) && - (UINT_MAX > wa->batch_size) ) - wa->batch_size++; + const struct TALER_BANK_CreditDetails *cd = &details[i]; + struct TALER_EXCHANGEDB_ReserveInInfo *res = &reserves[i]; + + res->reserve_pub = &cd->reserve_pub; + res->balance = &cd->amount; + res->execution_time = cd->execution_date; + res->sender_account_details = cd->debit_account_uri; + res->exchange_account_name = ai->section_name; + res->wire_reference = cd->serial_id; } - GNUNET_break (0 <= qs); - if ( (GNUNET_YES == wa->delay) && - (test_mode) && - (NULL == wa->next) ) + qs = db_plugin->reserves_in_insert (db_plugin->cls, + reserves, + details_length, + qss); + switch (qs) { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Shutdown due to test mode!\n"); + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); GNUNET_SCHEDULER_shutdown (); - return GNUNET_OK; + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Got DB soft error for reserves_in_insert (%u). Rolling back.\n", + details_length); + handle_soft_error (); + return; + default: + break; } - if (GNUNET_YES == wa->delay) + for (unsigned int i = 0; i<details_length; i++) { - wa->delayed_until - = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval); - wa_pos = wa_pos->next; - if (NULL == wa_pos) - wa_pos = wa_head; - GNUNET_assert (NULL != wa_pos); + const struct TALER_BANK_CreditDetails *cd = &details[i]; + + switch (qss[i]) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Got DB soft error for batch_reserves_in_insert(%u). Rolling back.\n", + i); + handle_soft_error (); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + /* Either wirewatch was freshly started after the system was + shutdown and we're going over an incomplete shard again + after being restarted, or the shard lock period was too + short (number of workers set incorrectly?) and a 2nd + wirewatcher has been stealing our work while we are still + at it. */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Attempted to import transaction %llu (%s) twice. " + "This should happen rarely (if not, ask for support).\n", + (unsigned long long) cd->serial_id, + job_name); + break; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Imported transaction %llu.\n", + (unsigned long long) cd->serial_id); + /* normal case */ + progress = true; + break; + } } - task = GNUNET_SCHEDULER_add_at (wa_pos->delayed_until, - &find_transfers, - NULL); - return GNUNET_OK; /* will be ignored anyway */ } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Adding wire transfer over %s with (hashed) subject `%s'\n", - TALER_amount2s (&details->amount), - TALER_B2S (&details->reserve_pub)); - - /** - * Debug block. - */ -#if DEBUG_LOGGING + + latest_row_off = lroff; + shard_done = (shard_end <= latest_row_off); + if (shard_done) { - /** Should be 53, give 80 just to be extra conservative (and aligned). */ -#define PUBSIZE 80 - char wtid_s[PUBSIZE]; - - GNUNET_break (NULL != - GNUNET_STRINGS_data_to_string (&details->reserve_pub, - sizeof (details->reserve_pub), - &wtid_s[0], - PUBSIZE)); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Plain text subject (= reserve_pub): %s\n", - wtid_s); + /* shard is complete, mark this as well */ + qs = db_plugin->complete_shard (db_plugin->cls, + job_name, + shard_start, + shard_end); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Got DB soft error for complete_shard. Rolling back.\n"); + handle_soft_error (); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + GNUNET_break (0); + /* Not expected, but let's just continue */ + break; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* normal case */ + progress = true; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Completed shard %s (%llu,%llu] after %s\n", + job_name, + (unsigned long long) shard_start, + (unsigned long long) shard_end, + GNUNET_STRINGS_relative_time_to_string ( + GNUNET_TIME_absolute_get_duration (shard_start_time), + true)); + break; + } + shard_delay = GNUNET_TIME_absolute_get_duration (shard_start_time); + shard_open = false; + transaction_completed (); + return; } -#endif + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&continue_with_shard, + NULL); +} + - if (wa->current_batch_size < UINT_MAX) - wa->current_batch_size++; - qs = db_plugin->reserves_in_insert (db_plugin->cls, - session, - &details->reserve_pub, - &details->amount, - details->execution_date, - details->debit_account_url, - wa->section_name, - serial_id); - if (GNUNET_DB_STATUS_HARD_ERROR == qs) +/** + * Callbacks of this type are used to serve the result of asking + * the bank for the transaction history. + * + * @param cls NULL + * @param reply response we got from the bank + */ +static void +history_cb (void *cls, + const struct TALER_BANK_CreditHistoryResponse *reply) +{ + (void) cls; + GNUNET_assert (NULL == task); + hh = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "History request returned with HTTP status %u\n", + reply->http_status); + switch (reply->http_status) { - GNUNET_break (0); - db_plugin->rollback (db_plugin->cls, - session); - GNUNET_SCHEDULER_shutdown (); - return GNUNET_SYSERR; + case MHD_HTTP_OK: + process_reply (reply->details.ok.details, + reply->details.ok.details_length); + return; + case MHD_HTTP_NO_CONTENT: + transaction_completed (); + return; + case MHD_HTTP_NOT_FOUND: + hh_account_404 = true; + if (ignore_account_404) + { + transaction_completed (); + return; + } + break; + default: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Error fetching history: %s (%u)\n", + TALER_ErrorCode_get_hint (reply->ec), + reply->http_status); + break; } - if (GNUNET_DB_STATUS_SOFT_ERROR == qs) + if (! exit_on_error) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Got DB soft error for reserves_in_insert. Rolling back.\n"); - db_plugin->rollback (db_plugin->cls, - session); - /* try again */ - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&find_transfers, - NULL); - return GNUNET_SYSERR; + transaction_completed (); + return; + } + GNUNET_SCHEDULER_shutdown (); +} + + +static void +continue_with_shard (void *cls) +{ + unsigned int limit; + + (void) cls; + task = NULL; + GNUNET_assert (shard_end > latest_row_off); + limit = GNUNET_MIN (batch_size, + shard_end - latest_row_off); + GNUNET_assert (NULL == hh); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Requesting credit history starting from %llu\n", + (unsigned long long) latest_row_off); + hh_start_time = GNUNET_TIME_absolute_get (); + hh_returned_data = false; + hh_account_404 = false; + hh = TALER_BANK_credit_history (ctx, + ai->auth, + latest_row_off, + limit, + test_mode + ? GNUNET_TIME_UNIT_ZERO + : longpoll_timeout, + &history_cb, + NULL); + if (NULL == hh) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to start request for account history!\n"); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; } - wa->delay = GNUNET_NO; - wa->latest_row_off = serial_id; - return GNUNET_OK; } /** - * Query for incoming wire transfers. + * Reserve a shard for us to work on. * * @param cls NULL */ static void -find_transfers (void *cls) +lock_shard (void *cls) { - struct TALER_EXCHANGEDB_Session *session; enum GNUNET_DB_QueryStatus qs; + struct GNUNET_TIME_Relative delay; + uint64_t last_shard_start = shard_start; + uint64_t last_shard_end = shard_end; (void) cls; task = NULL; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Checking for incoming wire transfers\n"); - if (NULL == (session = db_plugin->get_session (db_plugin->cls))) + if (GNUNET_SYSERR == + db_plugin->preflight (db_plugin->cls)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to obtain database session!\n"); - global_ret = GR_DATABASE_SESSION_FAIL; + "Failed to obtain database connection!\n"); + global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; } - db_plugin->preflight (db_plugin->cls, - session); - if (GNUNET_OK != - db_plugin->start (db_plugin->cls, - session, - "wirewatch check for incoming wire transfers")) + if ( (shard_open) && + (GNUNET_TIME_absolute_is_future (shard_end_time)) ) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to start database transaction!\n"); - global_ret = GR_DATABASE_TRANSACTION_BEGIN_FAIL; - GNUNET_SCHEDULER_shutdown (); + progress = false; + batch_start = latest_row_off; + task = GNUNET_SCHEDULER_add_now (&continue_with_shard, + NULL); return; } - if (! wa_pos->reset_mode) + if (shard_open) + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Shard not completed in time, will try to re-acquire\n"); + /* How long we lock a shard depends on the number of + workers expected, and how long we usually took to + process a shard. */ + if (0 == max_workers) + delay = GNUNET_TIME_UNIT_ZERO; + else + delay.rel_value_us = GNUNET_CRYPTO_random_u64 ( + GNUNET_CRYPTO_QUALITY_WEAK, + 4 * GNUNET_TIME_relative_max ( + wirewatch_idle_sleep_interval, + GNUNET_TIME_relative_multiply (shard_delay, + max_workers)).rel_value_us); + shard_start_time = GNUNET_TIME_absolute_get (); + qs = db_plugin->begin_shard (db_plugin->cls, + job_name, + delay, + shard_size, + &shard_start, + &shard_end); + switch (qs) { - qs = db_plugin->get_latest_reserve_in_reference (db_plugin->cls, - session, - wa_pos->section_name, - &wa_pos->last_row_off); - if (GNUNET_DB_STATUS_HARD_ERROR == qs) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to obtain starting point for montoring from database!\n"); - db_plugin->rollback (db_plugin->cls, - session); - global_ret = GR_DATABASE_SELECT_LATEST_HARD_FAIL; - GNUNET_SCHEDULER_shutdown (); - return; - } - if (GNUNET_DB_STATUS_SOFT_ERROR == qs) + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to obtain starting point for montoring from database!\n"); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + /* try again */ { - /* try again */ - db_plugin->rollback (db_plugin->cls, - session); - task = GNUNET_SCHEDULER_add_now (&find_transfers, - NULL); - return; + struct GNUNET_TIME_Relative rdelay; + + wirewatch_conflict_sleep_interval + = GNUNET_TIME_STD_BACKOFF (wirewatch_conflict_sleep_interval); + rdelay = GNUNET_TIME_randomize (wirewatch_conflict_sleep_interval); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Serialization error tying to obtain shard %s, will try again in %s!\n", + job_name, + GNUNET_STRINGS_relative_time_to_string (rdelay, + true)); +#if 1 + if (GNUNET_TIME_relative_cmp (rdelay, + >, + GNUNET_TIME_UNIT_SECONDS)) + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Delay would have been for %s\n", + GNUNET_TIME_relative2s (rdelay, + true)); + rdelay = GNUNET_TIME_relative_min (rdelay, + GNUNET_TIME_UNIT_SECONDS); +#endif + delayed_until = GNUNET_TIME_relative_to_absolute (rdelay); } - wa_pos->reset_mode = GNUNET_NO; + GNUNET_assert (NULL == task); + schedule_transfers (); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + GNUNET_break (0); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "No shard available, will try again for %s in %s!\n", + job_name, + GNUNET_STRINGS_relative_time_to_string ( + wirewatch_idle_sleep_interval, + true)); + delayed_until = GNUNET_TIME_relative_to_absolute ( + wirewatch_idle_sleep_interval); + shard_open = false; + GNUNET_assert (NULL == task); + schedule_transfers (); + return; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* continued below */ + wirewatch_conflict_sleep_interval = GNUNET_TIME_UNIT_ZERO; + break; } - wa_pos->delay = GNUNET_YES; - wa_pos->current_batch_size = 0; /* reset counter */ - + shard_end_time = GNUNET_TIME_relative_to_absolute (delay); GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "wirewatch: requesting incoming history from %s\n", - wa_pos->auth.wire_gateway_url); - wa_pos->session = session; - wa_pos->hh = TALER_BANK_credit_history (ctx, - &wa_pos->auth, - wa_pos->last_row_off, - wa_pos->batch_size, - &history_cb, - wa_pos); - if (NULL == wa_pos->hh) + "Starting with shard %s at (%llu,%llu] locked for %s\n", + job_name, + (unsigned long long) shard_start, + (unsigned long long) shard_end, + GNUNET_STRINGS_relative_time_to_string (delay, + true)); + progress = false; + batch_start = shard_start; + if ( (shard_open) && + (shard_start == last_shard_start) && + (shard_end == last_shard_end) ) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to start request for account history!\n"); - db_plugin->rollback (db_plugin->cls, - session); - global_ret = GR_BANK_REQUEST_HISTORY_FAIL; - GNUNET_SCHEDULER_shutdown (); - return; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Continuing from %llu\n", + (unsigned long long) latest_row_off); + GNUNET_break (latest_row_off >= batch_start); /* resume where we left things */ + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Resetting shard start to original start point (%d)\n", + shard_open ? 1 : 0); + latest_row_off = batch_start; } + shard_open = true; + task = GNUNET_SCHEDULER_add_now (&continue_with_shard, + NULL); } @@ -577,27 +933,28 @@ run (void *cls, (void) cls; (void) args; (void) cfgfile; + cfg = c; + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, + cls); if (GNUNET_OK != exchange_serve_process_config ()) { - global_ret = GR_CONFIGURATION_INVALID; + global_ret = EXIT_NOTCONFIGURED; + GNUNET_SCHEDULER_shutdown (); return; } - wa_pos = wa_head; - GNUNET_assert (NULL != wa_pos); - task = GNUNET_SCHEDULER_add_now (&find_transfers, - NULL); - GNUNET_SCHEDULER_add_shutdown (&shutdown_task, - cls); ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, &rc); - rc = GNUNET_CURL_gnunet_rc_create (ctx); if (NULL == ctx) { GNUNET_break (0); + GNUNET_SCHEDULER_shutdown (); + global_ret = EXIT_NO_RESTART; return; } + rc = GNUNET_CURL_gnunet_rc_create (ctx); + schedule_transfers (); } @@ -613,35 +970,63 @@ main (int argc, char *const *argv) { struct GNUNET_GETOPT_CommandLineOption options[] = { + GNUNET_GETOPT_option_string ('a', + "account", + "SECTION_NAME", + "name of the configuration section with the account we should watch (needed if more than one is enabled for crediting)", + &account_section), + GNUNET_GETOPT_option_flag ('e', + "exit-on-error", + "terminate wirewatch if we failed to download information from the bank", + &exit_on_error), + GNUNET_GETOPT_option_relative_time ('f', + "longpoll-timeout", + "DELAY", + "what is the timeout when asking the bank about new transactions, specify with unit (e.g. --longpoll-timeout=30s)", + &longpoll_timeout), + GNUNET_GETOPT_option_flag ('I', + "ignore-not-found", + "continue, even if the bank account of the exchange was not found", + &ignore_account_404), + GNUNET_GETOPT_option_uint ('S', + "size", + "SIZE", + "Size to process per shard (default: 1024)", + &shard_size), GNUNET_GETOPT_option_timetravel ('T', "timetravel"), GNUNET_GETOPT_option_flag ('t', "test", "run in test mode and exit when idle", &test_mode), - GNUNET_GETOPT_option_flag ('r', - "reset", - "start fresh with all transactions in the history", - &reset_mode), + GNUNET_GETOPT_option_uint ('w', + "workers", + "COUNT", + "Plan work load with up to COUNT worker processes (default: 16)", + &max_workers), + GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION), GNUNET_GETOPT_OPTION_END }; + enum GNUNET_GenericReturnValue ret; + longpoll_timeout = LONGPOLL_TIMEOUT; if (GNUNET_OK != GNUNET_STRINGS_get_utf8_args (argc, argv, &argc, &argv)) - return GR_CMD_LINE_UTF8_ERROR; - if (GNUNET_OK != - GNUNET_PROGRAM_run (argc, argv, - "taler-exchange-wirewatch", - gettext_noop ( - "background process that watches for incoming wire transfers from customers"), - options, - &run, NULL)) - { - GNUNET_free_nz ((void *) argv); - return GR_CMD_LINE_OPTIONS_WRONG; - } + return EXIT_INVALIDARGUMENT; + TALER_OS_init (); + ret = GNUNET_PROGRAM_run ( + argc, argv, + "taler-exchange-wirewatch", + gettext_noop ( + "background process that watches for incoming wire transfers from customers"), + options, + &run, NULL); GNUNET_free_nz ((void *) argv); + if (GNUNET_SYSERR == ret) + return EXIT_INVALIDARGUMENT; + if (GNUNET_NO == ret) + return EXIT_SUCCESS; return global_ret; } |