From 0271e848138a94e27f472196f5341879fd3ab8ba Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sat, 19 Jun 2021 18:20:19 +0200 Subject: -bugfix, preparations for sharding --- src/exchange/taler-exchange-wirewatch.c | 85 +++++++++++++++++++-------------- 1 file changed, 50 insertions(+), 35 deletions(-) (limited to 'src/exchange/taler-exchange-wirewatch.c') diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c index 40b962f8a..760dbe10b 100644 --- a/src/exchange/taler-exchange-wirewatch.c +++ b/src/exchange/taler-exchange-wirewatch.c @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2016--2020 Taler Systems SA + Copyright (C) 2016--2021 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -89,6 +89,11 @@ struct WireAccount */ uint64_t latest_row_off; + /** + * Offset where our current shard ends. + */ + uint64_t shard_end; + /** * How many transactions do we retrieve per batch? */ @@ -103,19 +108,14 @@ struct WireAccount * Are we running from scratch and should re-process all transactions * for this account? */ - int reset_mode; + bool reset_mode; /** * Should we delay the next request to the wire plugin a bit? Set to - * #GNUNET_NO if we actually did some work. + * false if we actually did some work. */ - int delay; + bool delay; - /** - * Did we experience a soft failure during the current - * transaction? - */ - bool soft_fail; }; @@ -160,6 +160,11 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin; */ static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval; +/** + * Modulus to apply to group shards. + */ +static unsigned int shard_size = 1024; + /** * Value to return from main(). 0 on success, non-zero on * on serious errors. @@ -363,20 +368,10 @@ history_cb (void *cls, (unsigned int) ec, http_status); } - if (wa->soft_fail) - { - /* no point to commit, transaction was already rolled - back after we encountered a soft failure */ - wa->soft_fail = false; - qs = GNUNET_DB_STATUS_SOFT_ERROR; - } - else - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "End of list. Committing progress!\n"); - qs = db_plugin->commit (db_plugin->cls, - session); - } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "End of list. Committing progress!\n"); + qs = db_plugin->commit (db_plugin->cls, + session); if (GNUNET_DB_STATUS_HARD_ERROR == qs) { GNUNET_SCHEDULER_shutdown (); @@ -410,7 +405,7 @@ history_cb (void *cls, "Increasing batch size to %llu\n", (unsigned long long) wa->batch_size); } - if ( (GNUNET_YES == wa->delay) && + if ( (wa->delay) && (test_mode) && (NULL == wa->next) ) { @@ -419,7 +414,7 @@ history_cb (void *cls, GNUNET_SCHEDULER_shutdown (); return GNUNET_OK; } - if (GNUNET_YES == wa->delay) + if (wa->delay) { wa->delayed_until = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval); @@ -477,6 +472,7 @@ history_cb (void *cls, db_plugin->rollback (db_plugin->cls, session); GNUNET_SCHEDULER_shutdown (); + wa->hh = NULL; return GNUNET_SYSERR; } if (GNUNET_DB_STATUS_SOFT_ERROR == qs) @@ -485,10 +481,13 @@ history_cb (void *cls, "Got DB soft error for reserves_in_insert. Rolling back.\n"); db_plugin->rollback (db_plugin->cls, session); - wa->soft_fail = true; + wa->hh = NULL; + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&find_transfers, + NULL); return GNUNET_SYSERR; } - wa->delay = GNUNET_NO; + wa->delay = false; wa->latest_row_off = serial_id; return GNUNET_OK; } @@ -504,6 +503,7 @@ find_transfers (void *cls) { struct TALER_EXCHANGEDB_Session *session; enum GNUNET_DB_QueryStatus qs; + unsigned int limit; (void) cls; task = NULL; @@ -555,13 +555,21 @@ find_transfers (void *cls) } wa_pos->reset_mode = GNUNET_NO; } - wa_pos->delay = GNUNET_YES; + wa_pos->delay = true; wa_pos->current_batch_size = 0; /* reset counter */ wa_pos->session = session; + if (wa_pos->shard_end == wa_pos->last_row_off) + { + /* advance to next shard */ + wa_pos->shard_end += shard_size; + } + limit = GNUNET_MIN (wa_pos->batch_size, + wa_pos->shard_end - wa_pos->last_row_off); + GNUNET_assert (NULL == wa_pos->hh); wa_pos->hh = TALER_BANK_credit_history (ctx, &wa_pos->auth, wa_pos->last_row_off, - wa_pos->batch_size, + limit, &history_cb, wa_pos); if (NULL == wa_pos->hh) @@ -594,6 +602,7 @@ run (void *cls, (void) cls; (void) args; (void) cfgfile; + cfg = c; if (GNUNET_OK != exchange_serve_process_config ()) @@ -603,8 +612,6 @@ run (void *cls, } wa_pos = wa_head; GNUNET_assert (NULL != wa_pos); - task = GNUNET_SCHEDULER_add_now (&find_transfers, - NULL); GNUNET_SCHEDULER_add_shutdown (&shutdown_task, cls); ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, @@ -615,6 +622,9 @@ run (void *cls, GNUNET_break (0); return; } + + task = GNUNET_SCHEDULER_add_now (&find_transfers, + NULL); } @@ -630,16 +640,21 @@ main (int argc, char *const *argv) { struct GNUNET_GETOPT_CommandLineOption options[] = { + GNUNET_GETOPT_option_flag ('r', + "reset", + "start fresh with all transactions in the history", + &reset_mode), + GNUNET_GETOPT_option_uint ('S', + "size", + "SIZE", + "Size to process per shard (default: 1024)", + &shard_size), GNUNET_GETOPT_option_timetravel ('T', "timetravel"), GNUNET_GETOPT_option_flag ('t', "test", "run in test mode and exit when idle", &test_mode), - GNUNET_GETOPT_option_flag ('r', - "reset", - "start fresh with all transactions in the history", - &reset_mode), GNUNET_GETOPT_OPTION_END }; enum GNUNET_GenericReturnValue ret; -- cgit v1.2.3