From 737937291cceddd81e0dac676d3cb909250f628a Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sat, 21 May 2022 21:07:24 +0200 Subject: wirewatch spring cleaning --- src/exchange/taler-exchange-wirewatch.c | 445 +++++++++++++++++--------------- 1 file changed, 244 insertions(+), 201 deletions(-) (limited to 'src') diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c index 898d678a3..21d2df150 100644 --- a/src/exchange/taler-exchange-wirewatch.c +++ b/src/exchange/taler-exchange-wirewatch.c @@ -106,6 +106,12 @@ struct WireAccount */ struct GNUNET_TIME_Absolute shard_start_time; + /** + * How long did we take to finish the last shard + * for this account? + */ + struct GNUNET_TIME_Relative shard_delay; + /** * Name of our job in the shard table. */ @@ -117,15 +123,10 @@ struct WireAccount unsigned int batch_size; /** - * How much do we incremnt @e batch_size on success? + * How much do we increment @e batch_size on success? */ unsigned int batch_thresh; - /** - * How many transactions did we see in the current batch? - */ - unsigned int current_batch_size; - /** * Should we delay the next request to the wire plugin a bit? Set to * false if we actually did some work. @@ -150,12 +151,6 @@ static struct WireAccount *wa_head; */ static struct WireAccount *wa_tail; -/** - * Wire account we are currently processing. This would go away - * if we ever start processing all accounts in parallel. - */ -static struct WireAccount *wa_pos; - /** * Handle to the context for interacting with the bank. */ @@ -184,11 +179,6 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin; */ static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval; -/** - * How long did we take to finish the last shard? - */ -static struct GNUNET_TIME_Relative shard_delay; - /** * Modulus to apply to group shards. The shard size must ultimately be a * multiple of the batch size. Thus, if this is not a multiple of the @@ -249,9 +239,9 @@ shutdown_task (void *cls) wa->started_transaction = false; } qs = db_plugin->abort_shard (db_plugin->cls, - wa_pos->job_name, - wa_pos->shard_start, - wa_pos->shard_end); + wa->job_name, + wa->shard_start, + wa->shard_end); if (qs <= 0) GNUNET_log (GNUNET_ERROR_TYPE_WARNING, "Failed to abort work shard on shutdown\n"); @@ -259,8 +249,6 @@ shutdown_task (void *cls) GNUNET_free (wa); } } - wa_pos = NULL; - if (NULL != ctx) { GNUNET_CURL_fini (ctx); @@ -359,12 +347,22 @@ exchange_serve_process_config (void) /** - * Query for incoming wire transfers. + * Lock a shard and then begin to query for incoming wire transfers. * - * @param cls NULL + * @param cls a `struct WireAccount` to operate on */ static void -find_transfers (void *cls); +lock_shard (void *cls); + + +/** + * Continue with the credit history of the shard + * reserved as @a wa. + * + * @param[in,out] cls `struct WireAccount *` account with shard to continue processing + */ +static void +continue_with_shard (void *cls); /** @@ -387,23 +385,59 @@ handle_soft_error (struct WireAccount *wa) (unsigned long long) wa->batch_size); } GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&find_transfers, - NULL); + /* Reset to beginning of transaction, and go again + from there. */ + wa->latest_row_off = wa->batch_start; + task = GNUNET_SCHEDULER_add_now (&continue_with_shard, + wa); } /** - * We are done with a shard, move on to the next one. + * Schedule the #lock_shard() operation for + * @a wa. If @a wa is NULL, start with #wa_head. + * + * @param wa account to schedule #lock_shard() for, + * possibly NULL (!). + */ +static void +schedule_transfers (struct WireAccount *wa) +{ + if (NULL == wa) + { + wa = wa_head; + GNUNET_assert (NULL != wa); + } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Will try to lock next shard of %s in %s\n", + wa->job_name, + GNUNET_STRINGS_relative_time_to_string ( + GNUNET_TIME_absolute_get_remaining (wa->delayed_until), + GNUNET_YES)); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_at (wa->delayed_until, + &lock_shard, + wa); +} + + +/** + * We are done with the work that is possible on @a wa right now (and the + * transaction was committed, if there was one to commit). Move on to the next + * account. * * @param wa wire account for which we completed a shard */ static void -shard_completed (struct WireAccount *wa) +account_completed (struct WireAccount *wa) { - /* transaction success, update #last_row_off */ - wa->batch_start = wa->latest_row_off; - if (wa->batch_size < MAXIMUM_BATCH_SIZE) + GNUNET_assert (! wa->started_transaction); + if ( (wa->batch_start + wa->batch_size == + wa->latest_row_off) && + (wa->batch_size < MAXIMUM_BATCH_SIZE) ) { + /* The current batch size worked without serialization + issues, and we are allowed to grow. Do so slowly. */ int delta; delta = ((int) wa->batch_thresh - (int) wa->batch_size) / 4; @@ -411,45 +445,45 @@ shard_completed (struct WireAccount *wa) delta = -delta; wa->batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE, wa->batch_size + delta + 1); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Increasing batch size to %llu\n", (unsigned long long) wa->batch_size); } + if (wa->delay) { + /* This account was finished, block this one for the + #wirewatch_idle_sleep_interval and move on to the next one. */ wa->delayed_until = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval); - wa_pos = wa_pos->next; - if (NULL == wa_pos) - wa_pos = wa_head; - GNUNET_assert (NULL != wa_pos); + wa = wa->next; } - GNUNET_assert (NULL == task); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Will look for more transfers in %s\n", - GNUNET_STRINGS_relative_time_to_string ( - GNUNET_TIME_absolute_get_remaining (wa_pos->delayed_until), - GNUNET_YES)); - task = GNUNET_SCHEDULER_add_at (wa_pos->delayed_until, - &find_transfers, - NULL); + schedule_transfers (wa); } /** - * We are finished with the current shard. Update the database, marking the - * shard as finished. + * Check if we are finished with the current shard. If so, update the + * database, marking the shard as finished. * * @param wa wire account to commit for - * @return true on success + * @return true if we were indeed done with the shard */ static bool -mark_shard_done (struct WireAccount *wa) +check_shard_done (struct WireAccount *wa) { enum GNUNET_DB_QueryStatus qs; if (wa->shard_end > wa->latest_row_off) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Shard %s (%llu,%llu] at %llu\n", + wa->job_name, + (unsigned long long) wa->shard_start, + (unsigned long long) wa->shard_end, + (unsigned long long) wa->latest_row_off); return false; /* actually, not done! */ + } /* shard is complete, mark this as well */ qs = db_plugin->complete_shard (db_plugin->cls, wa->job_name, @@ -468,28 +502,25 @@ mark_shard_done (struct WireAccount *wa) handle_soft_error (wa); return false; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: - /* already existed, ok, let's just continue */ + GNUNET_break (0); + /* Not expected, but let's just continue */ break; case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: /* normal case */ - shard_delay = GNUNET_TIME_absolute_get_duration (wa->shard_start_time); - + wa->shard_delay = GNUNET_TIME_absolute_get_duration (wa->shard_start_time); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Completed shard %s (%llu,%llu] after %s\n", + wa->job_name, + (unsigned long long) wa->shard_start, + (unsigned long long) wa->shard_end, + GNUNET_STRINGS_relative_time_to_string (wa->shard_delay, + GNUNET_YES)); break; } return true; } -/** - * Continue with the credit history of the shard - * reserved as @a wa_pos. - * - * @param[in,out] wa_pos shard to continue processing - */ -static void -continue_with_shard (struct WireAccount *wa_pos); - - /** * We are finished with the current transaction, try * to commit and then schedule the next iteration. @@ -502,8 +533,17 @@ do_commit (struct WireAccount *wa) enum GNUNET_DB_QueryStatus qs; bool shard_done; + shard_done = check_shard_done (wa); wa->started_transaction = false; - shard_done = mark_shard_done (wa); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Committing %s progress (%llu,%llu] at %llu\n (%s)", + wa->job_name, + (unsigned long long) wa->shard_start, + (unsigned long long) wa->shard_end, + (unsigned long long) wa->latest_row_off, + shard_done + ? "shard done" + : "shard incomplete"); qs = db_plugin->commit (db_plugin->cls); switch (qs) { @@ -521,7 +561,7 @@ do_commit (struct WireAccount *wa) break; } if (shard_done) - shard_completed (wa); + account_completed (wa); else continue_with_shard (wa); } @@ -568,63 +608,67 @@ history_cb (void *cls, } if (wa->started_transaction) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "End of list. Committing progress!\n"); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "End of list. Committing progress on %s of (%llu,%llu]!\n", + wa->job_name, + (unsigned long long) wa->batch_start, + (unsigned long long) wa->latest_row_off); do_commit (wa); + return GNUNET_OK; /* will be ignored anyway */ } - else + /* We did not even start a transaction. */ + if ( (wa->delay) && + (test_mode) && + (NULL == wa->next) ) { - if ( (wa->delay) && - (test_mode) && - (NULL == wa->next) ) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Shutdown due to test mode!\n"); - GNUNET_SCHEDULER_shutdown (); - return GNUNET_OK; - } - else - { - shard_completed (wa); - } + /* We exit on idle */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Shutdown due to test mode!\n"); + GNUNET_SCHEDULER_shutdown (); + return GNUNET_OK; } + account_completed (wa); return GNUNET_OK; /* will be ignored anyway */ } + + /* We did get 'details' from the bank. Do sanity checks before inserting. */ if (serial_id < wa->latest_row_off) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Serial ID %llu not monotonic (got %llu before). Failing!\n", (unsigned long long) serial_id, (unsigned long long) wa->latest_row_off); - if (wa->started_transaction) - { - wa->started_transaction = false; - db_plugin->rollback (db_plugin->cls); - } GNUNET_SCHEDULER_shutdown (); wa->hh = NULL; return GNUNET_SYSERR; } + /* If we got 'limit' transactions back from the bank, + we should not introduce any delay before the next + call. */ if (serial_id >= wa->max_row_off) wa->delay = false; if (serial_id > wa->shard_end) { - /* we are done with the current shard, commit and stop this iteration! */ + /* we are *past* the current shard (likely because the serial_id of the + shard_end happens to not exist in the DB). So commit and stop this + iteration! */ GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Serial ID %llu past shard end at %llu, ending iteration early!\n", (unsigned long long) serial_id, (unsigned long long) wa->shard_end); - wa->latest_row_off = serial_id; + wa->latest_row_off = serial_id - 1; /* excluding serial_id! */ + wa->hh = NULL; if (wa->started_transaction) { do_commit (wa); } else { - if (mark_shard_done (wa)) - shard_completed (wa); + if (check_shard_done (wa)) + account_completed (wa); + else + continue_with_shard (wa); } - wa->hh = NULL; return GNUNET_SYSERR; } if (! wa->started_transaction) @@ -640,7 +684,6 @@ history_cb (void *cls, wa->hh = NULL; return GNUNET_SYSERR; } - wa_pos->shard_start_time = GNUNET_TIME_absolute_get (); wa->started_transaction = true; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, @@ -675,6 +718,17 @@ history_cb (void *cls, wa->hh = NULL; return GNUNET_SYSERR; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + /* Either wirewatch was freshly started after the system was + shutdown and we're going over an incomplete shard again + after being restarted, or the shard lock period was too + short (number of workers set incorrectly?) and a 2nd + wirewatcher has been stealing our work while we are still + at it. */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Attempted to import transaction %llu (%s) twice. " + "This should happen rarely (if not, ask for support).\n", + (unsigned long long) serial_id, + wa->job_name); /* already existed, ok, let's just continue */ break; case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: @@ -686,130 +740,121 @@ history_cb (void *cls, } -/** - * Query for incoming wire transfers. - * - * @param cls NULL - */ static void -find_transfers (void *cls) +continue_with_shard (void *cls) { - enum GNUNET_DB_QueryStatus qs; + struct WireAccount *wa = cls; + unsigned int limit; - (void) cls; - task = NULL; - if (GNUNET_SYSERR == - db_plugin->preflight (db_plugin->cls)) + limit = GNUNET_MIN (wa->batch_size, + wa->shard_end - wa->latest_row_off); + wa->max_row_off = wa->latest_row_off + limit; + GNUNET_assert (NULL == wa->hh); + wa->hh = TALER_BANK_credit_history (ctx, + wa->ai->auth, + wa->latest_row_off, + limit, + test_mode + ? GNUNET_TIME_UNIT_ZERO + : LONGPOLL_TIMEOUT, + &history_cb, + wa); + if (NULL == wa->hh) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to obtain database connection!\n"); + "Failed to start request for account history!\n"); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; } - wa_pos->delay = true; - wa_pos->current_batch_size = 0; /* reset counter */ - if (wa_pos->shard_end <= wa_pos->batch_start) - { - uint64_t start; - uint64_t end; - struct GNUNET_TIME_Relative delay; - /* advance to next shard */ - - if (0 == max_workers) - delay = GNUNET_TIME_UNIT_ZERO; - else - delay.rel_value_us = GNUNET_CRYPTO_random_u64 ( - GNUNET_CRYPTO_QUALITY_WEAK, - 4 * GNUNET_TIME_relative_max ( - wirewatch_idle_sleep_interval, - GNUNET_TIME_relative_multiply (shard_delay, - max_workers)).rel_value_us); - qs = db_plugin->begin_shard (db_plugin->cls, - wa_pos->job_name, - delay, - shard_size, - &start, - &end); - switch (qs) - { - case GNUNET_DB_STATUS_HARD_ERROR: - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to obtain starting point for montoring from database!\n"); - global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - return; - case GNUNET_DB_STATUS_SOFT_ERROR: - /* try again */ - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Serialization error tying to obtain shard, will try again in %s!\n", - GNUNET_STRINGS_relative_time_to_string ( - wirewatch_idle_sleep_interval, - GNUNET_YES)); - task = GNUNET_SCHEDULER_add_delayed (wirewatch_idle_sleep_interval, - &find_transfers, - NULL); - return; - case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: - GNUNET_break (0); - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "No shard available, will try again in %s!\n", - GNUNET_STRINGS_relative_time_to_string ( - wirewatch_idle_sleep_interval, - GNUNET_YES)); - task = GNUNET_SCHEDULER_add_delayed (wirewatch_idle_sleep_interval, - &find_transfers, - NULL); - return; - case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: - wa_pos->shard_start = start; - wa_pos->shard_end = end; - wa_pos->batch_start = start; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Starting with shard at [%llu,%llu) locked for %s\n", - (unsigned long long) start, - (unsigned long long) end, - GNUNET_STRINGS_relative_time_to_string (delay, - GNUNET_YES)); - break; - } - } - wa_pos->latest_row_off = wa_pos->batch_start; - continue_with_shard (wa_pos); } static void -continue_with_shard (struct WireAccount *wa_pos) +lock_shard (void *cls) { - unsigned int limit; + struct WireAccount *wa = cls; + enum GNUNET_DB_QueryStatus qs; + struct GNUNET_TIME_Relative delay; - limit = GNUNET_MIN (wa_pos->batch_size, - wa_pos->shard_end - wa_pos->latest_row_off); - GNUNET_assert (NULL == wa_pos->hh); - wa_pos->max_row_off = wa_pos->latest_row_off + limit - 1; - wa_pos->hh = TALER_BANK_credit_history (ctx, - wa_pos->ai->auth, - wa_pos->latest_row_off, - limit, - test_mode - ? GNUNET_TIME_UNIT_ZERO - : LONGPOLL_TIMEOUT, - &history_cb, - wa_pos); - if (NULL == wa_pos->hh) + task = NULL; + if (GNUNET_SYSERR == + db_plugin->preflight (db_plugin->cls)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to start request for account history!\n"); - if (wa_pos->started_transaction) - { - db_plugin->rollback (db_plugin->cls); - wa_pos->started_transaction = false; - } + "Failed to obtain database connection!\n"); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + } + /* How long we lock a shard depends on the number of + workers expected, and how long we usually took to + process a shard. */ + if (0 == max_workers) + delay = GNUNET_TIME_UNIT_ZERO; + else + delay.rel_value_us = GNUNET_CRYPTO_random_u64 ( + GNUNET_CRYPTO_QUALITY_WEAK, + 4 * GNUNET_TIME_relative_max ( + wirewatch_idle_sleep_interval, + GNUNET_TIME_relative_multiply (wa->shard_delay, + max_workers)).rel_value_us); + wa->shard_start_time = GNUNET_TIME_absolute_get (); + qs = db_plugin->begin_shard (db_plugin->cls, + wa->job_name, + delay, + shard_size, + &wa->shard_start, + &wa->shard_end); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to obtain starting point for montoring from database!\n"); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; + case GNUNET_DB_STATUS_SOFT_ERROR: + /* try again */ + GNUNET_break (0); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Serialization error tying to obtain shard %s, will try again in %s!\n", + wa->job_name, + GNUNET_STRINGS_relative_time_to_string ( + wirewatch_idle_sleep_interval, + GNUNET_YES)); + wa->delayed_until = GNUNET_TIME_relative_to_absolute ( + wirewatch_idle_sleep_interval); + schedule_transfers (wa->next); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + GNUNET_break (0); + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "No shard available, will try again for %s in %s!\n", + wa->job_name, + GNUNET_STRINGS_relative_time_to_string ( + wirewatch_idle_sleep_interval, + GNUNET_YES)); + wa->delayed_until = GNUNET_TIME_relative_to_absolute ( + wirewatch_idle_sleep_interval); + schedule_transfers (wa->next); + return; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* continued below */ + break; } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Starting with shard %s at (%llu,%llu] locked for %s\n", + wa->job_name, + (unsigned long long) wa->shard_start, + (unsigned long long) wa->shard_end, + GNUNET_STRINGS_relative_time_to_string (delay, + GNUNET_YES)); + wa->delay = true; /* default is to delay, unless + we find out that we're really busy */ + wa->batch_start = wa->shard_start; + wa->latest_row_off = wa->batch_start; + continue_with_shard (wa); } @@ -838,21 +883,19 @@ run (void *cls, global_ret = EXIT_NOTCONFIGURED; return; } - wa_pos = wa_head; - GNUNET_assert (NULL != wa_pos); GNUNET_SCHEDULER_add_shutdown (&shutdown_task, cls); ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, &rc); - rc = GNUNET_CURL_gnunet_rc_create (ctx); if (NULL == ctx) { GNUNET_break (0); + GNUNET_SCHEDULER_shutdown (); return; } - - task = GNUNET_SCHEDULER_add_now (&find_transfers, - NULL); + rc = GNUNET_CURL_gnunet_rc_create (ctx); + task = GNUNET_SCHEDULER_add_now (&lock_shard, + wa_head); } -- cgit v1.2.3