From 9c51720cbfb86c89bc1f1872432c4f6a66fba5bd Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 21 Jun 2021 00:17:16 +0200 Subject: fixing parallel fakebank to ensure transactions are ordered, fixing indices/constraint preservation after DB update to 0002 --- src/exchange/taler-exchange-wirewatch.c | 414 ++++++++++++++++++++------------ 1 file changed, 264 insertions(+), 150 deletions(-) (limited to 'src/exchange/taler-exchange-wirewatch.c') diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c index 28fa81e7e..5d35eba57 100644 --- a/src/exchange/taler-exchange-wirewatch.c +++ b/src/exchange/taler-exchange-wirewatch.c @@ -29,13 +29,12 @@ #include "taler_json_lib.h" #include "taler_bank_service.h" -#define DEBUG_LOGGING 0 /** - * What is the initial batch size we use for credit history + * What is the maximum batch size we use for credit history * requests with the bank. See `batch_size` below. */ -#define INITIAL_BATCH_SIZE 1024 +#define MAXIMUM_BATCH_SIZE 1024 /** * Information we keep for each supported account. @@ -81,34 +80,48 @@ struct WireAccount * Encoded offset in the wire transfer list from where * to start the next query with the bank. */ - uint64_t last_row_off; + uint64_t batch_start; /** * Latest row offset seen in this transaction, becomes - * the new #last_row_off upon commit. + * the new #batch_start upon commit. */ uint64_t latest_row_off; /** - * Offset where our current shard ends. + * Offset where our current shard begins (inclusive). + */ + uint64_t shard_start; + + /** + * Offset where our current shard ends (exclusive). */ uint64_t shard_end; + /** + * When did we start with the shard? + */ + struct GNUNET_TIME_Absolute shard_start_time; + + /** + * Name of our job in the shard table. + */ + char *job_name; + /** * How many transactions do we retrieve per batch? */ unsigned int batch_size; /** - * How many transactions did we see in the current batch? 
+ * How much do we increment @e batch_size on success? */ - unsigned int current_batch_size; + unsigned int batch_increment; /** - * Are we running from scratch and should re-process all transactions - * for this account? + * How many transactions did we see in the current batch? */ - bool reset_mode; + unsigned int current_batch_size; /** * Should we delay the next request to the wire plugin a bit? Set to @@ -157,13 +170,29 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin; /** * How long should we sleep when idle before trying to find more work? + * Also used for how long we wait to grab a shard before trying it again. + * The value should be set to a bit above the average time it takes to + * process a shard. */ static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval; /** - * Modulus to apply to group shards. + * How long did we take to finish the last shard? */ -static unsigned int shard_size = 1024; +static struct GNUNET_TIME_Relative shard_delay; + +/** + * Modulus to apply to group shards. The shard size must ultimately be a + * multiple of the batch size. Thus, if this is not a multiple of the + * #MAXIMUM_BATCH_SIZE, the batch size will be set to the #shard_size. + */ +static unsigned int shard_size = MAXIMUM_BATCH_SIZE; + +/** + * How many workers should we plan our scheduling with? + */ +static unsigned int max_workers = 16; + /** * Value to return from main(). 0 on success, non-zero on @@ -186,11 +215,6 @@ static enum */ static int test_mode; -/** - * Are we running from scratch and should re-process all transactions? - */ -static int reset_mode; - /** * Current task waiting for execution, if any. 
*/ @@ -221,6 +245,7 @@ shutdown_task (void *cls) wa); TALER_BANK_auth_free (&wa->auth); GNUNET_free (wa->section_name); + GNUNET_free (wa->job_name); GNUNET_free (wa); } } @@ -263,7 +288,6 @@ add_account_cb (void *cls, if (GNUNET_YES != ai->credit_enabled) return; /* not enabled for us, skip */ wa = GNUNET_new (struct WireAccount); - wa->reset_mode = reset_mode; if (GNUNET_OK != TALER_BANK_auth_parse_cfg (cfg, ai->section_name, @@ -276,7 +300,12 @@ add_account_cb (void *cls, return; } wa->section_name = GNUNET_strdup (ai->section_name); - wa->batch_size = INITIAL_BATCH_SIZE; + GNUNET_asprintf (&wa->job_name, + "wirewatch-%s", + ai->section_name); + wa->batch_size = MAXIMUM_BATCH_SIZE; + if (0 != shard_size % wa->batch_size) + wa->batch_size = shard_size; GNUNET_CONTAINER_DLL_insert (wa_head, wa_tail, wa); @@ -333,6 +362,127 @@ static void find_transfers (void *cls); +/** + * We encountered a serialization error. + * Rollback the transaction and try again + * + * @param wa account we are transacting on + */ +static void +handle_soft_error (struct WireAccount *wa) +{ + db_plugin->rollback (db_plugin->cls, + wa->session); + if (1 < wa->batch_size) + { + wa->batch_size /= 2; + wa->batch_increment = 0; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Reduced batch size to %llu due to serialization issue\n", + (unsigned long long) wa->batch_size); + } + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&find_transfers, + NULL); +} + + +/** + * We are finished with the current transaction, try + * to commit and then schedule the next iteration. 
+ * + * @param wa wire account to commit for + */ +static void +do_commit (struct WireAccount *wa) +{ + enum GNUNET_DB_QueryStatus qs; + + if (wa->shard_end <= wa->latest_row_off) + { + /* shard is complete, mark this as well */ + qs = db_plugin->complete_shard (db_plugin->cls, + wa->session, + wa->job_name, + wa->shard_start, + wa->shard_end); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + db_plugin->rollback (db_plugin->cls, + wa->session); + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Got DB soft error for complete_shard. Rolling back.\n"); + handle_soft_error (wa); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + /* already existed, ok, let's just continue */ + break; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* normal case */ + shard_delay = GNUNET_TIME_absolute_get_duration (wa->shard_start_time); + + break; + } + } + qs = db_plugin->commit (db_plugin->cls, + wa->session); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + /* reduce transaction size to reduce rollback probability */ + handle_soft_error (wa); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* normal case */ + break; + } + /* transaction success, update #batch_start */ + wa->batch_start = wa->latest_row_off; + wa->session = NULL; /* should not be needed */ + if (wa->batch_size < MAXIMUM_BATCH_SIZE) + { + wa->batch_increment++; + wa->batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE, + wa->batch_size + wa->batch_increment); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Increasing batch size to %llu\n", + (unsigned long long) wa->batch_size); + } + if ( (wa->delay) && + (test_mode) && + (NULL == wa->next) ) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Shutdown due to test mode!\n"); + GNUNET_SCHEDULER_shutdown (); + return; + } + if 
(wa->delay) + { + wa->delayed_until + = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval); + wa_pos = wa_pos->next; + if (NULL == wa_pos) + wa_pos = wa_head; + GNUNET_assert (NULL != wa_pos); + } + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_at (wa_pos->delayed_until, + &find_transfers, + NULL); +} + + /** * Callbacks of this type are used to serve the result of asking * the bank for the transaction history. @@ -370,89 +520,38 @@ history_cb (void *cls, } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "End of list. Committing progress!\n"); - qs = db_plugin->commit (db_plugin->cls, - session); - if (GNUNET_DB_STATUS_HARD_ERROR == qs) - { - GNUNET_SCHEDULER_shutdown (); - return GNUNET_OK; - } - if (GNUNET_DB_STATUS_SOFT_ERROR == qs) - { - /* reduce transaction size to reduce rollback probability */ - if (2 > wa->batch_size) - { - wa->batch_size /= 2; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Reduced batch size to %llu due to serialization issue\n", - (unsigned long long) wa->batch_size); - } - /* try again */ - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&find_transfers, - NULL); - return GNUNET_OK; /* will be ignored anyway */ - } - GNUNET_break (0 <= qs); - /* transaction success, update #last_row_off */ - wa->last_row_off = wa->latest_row_off; - wa->latest_row_off = 0; /* should not be needed */ - wa->session = NULL; /* should not be needed */ - if (wa->batch_size < INITIAL_BATCH_SIZE) - { - wa->batch_size += 1; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Increasing batch size to %llu\n", - (unsigned long long) wa->batch_size); - } - if ( (wa->delay) && - (test_mode) && - (NULL == wa->next) ) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Shutdown due to test mode!\n"); - GNUNET_SCHEDULER_shutdown (); - return GNUNET_OK; - } - if (wa->delay) - { - wa->delayed_until - = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval); - wa_pos = wa_pos->next; - if (NULL == wa_pos) - wa_pos = wa_head; - GNUNET_assert (NULL != 
wa_pos); - } - task = GNUNET_SCHEDULER_add_at (wa_pos->delayed_until, - &find_transfers, - NULL); + do_commit (wa); return GNUNET_OK; /* will be ignored anyway */ } + if (serial_id < wa->latest_row_off) + { + /* serial IDs went backwards: this is fatal, roll back and shut down */ + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Serial ID %llu not monotonic (got %llu before). Failing!\n", + (unsigned long long) serial_id, + (unsigned long long) wa->latest_row_off); + db_plugin->rollback (db_plugin->cls, + session); + GNUNET_SCHEDULER_shutdown (); + wa->hh = NULL; + return GNUNET_SYSERR; + } + if (serial_id > wa->shard_end) + { + /* we are done with the current shard, commit and stop this iteration! */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Serial ID %llu past shard end at %llu, ending iteration early!\n", + (unsigned long long) serial_id, + (unsigned long long) wa->shard_end); + wa->latest_row_off = serial_id - 1; + do_commit (wa); + wa->hh = NULL; + return GNUNET_SYSERR; + } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Adding wire transfer over %s with (hashed) subject `%s'\n", TALER_amount2s (&details->amount), TALER_B2S (&details->reserve_pub)); - - /** - * Debug block. - */ -#if DEBUG_LOGGING - { - /** Should be 53, give 80 just to be extra conservative (and aligned). 
*/ -#define PUBSIZE 80 - char wtid_s[PUBSIZE]; - - GNUNET_break (NULL != - GNUNET_STRINGS_data_to_string (&details->reserve_pub, - sizeof (details->reserve_pub), - &wtid_s[0], - PUBSIZE)); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Plain text subject (= reserve_pub): %s\n", - wtid_s); - } -#endif - /* FIXME-PERFORMANCE: Consider using Postgres multi-valued insert here, for up to 15x speed-up according to https://dba.stackexchange.com/questions/224989/multi-row-insert-vs-transactional-single-row-inserts#225006 @@ -466,26 +565,27 @@ history_cb (void *cls, details->debit_account_url, wa->section_name, serial_id); - if (GNUNET_DB_STATUS_HARD_ERROR == qs) + switch (qs) { + case GNUNET_DB_STATUS_HARD_ERROR: GNUNET_break (0); db_plugin->rollback (db_plugin->cls, session); GNUNET_SCHEDULER_shutdown (); wa->hh = NULL; return GNUNET_SYSERR; - } - if (GNUNET_DB_STATUS_SOFT_ERROR == qs) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Got DB soft error for reserves_in_insert. 
Rolling back.\n"); - db_plugin->rollback (db_plugin->cls, - session); + handle_soft_error (wa); wa->hh = NULL; - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&find_transfers, - NULL); return GNUNET_SYSERR; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + /* already existed, ok, let's just continue */ + break; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* normal case */ + break; } wa->delay = false; wa->latest_row_off = serial_id; @@ -515,64 +615,77 @@ find_transfers (void *cls) GNUNET_SCHEDULER_shutdown (); return; } - db_plugin->preflight (db_plugin->cls, - session); - if (GNUNET_OK != - db_plugin->start (db_plugin->cls, - session, - "wirewatch check for incoming wire transfers")) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to start database transaction!\n"); - global_ret = GR_DATABASE_TRANSACTION_BEGIN_FAIL; - GNUNET_SCHEDULER_shutdown (); - return; - } wa_pos->delay = true; wa_pos->current_batch_size = 0; /* reset counter */ wa_pos->session = session; - if (wa_pos->shard_end == wa_pos->last_row_off) + if (wa_pos->shard_end <= wa_pos->batch_start) { + uint64_t start; + uint64_t end; + struct GNUNET_TIME_Relative delay; /* advance to next shard */ - // FIXME: if other processes are running in parallel, - // update 'last_row_off' to next free shard! - wa_pos->shard_end = wa_pos->last_row_off + shard_size; - } - if (! wa_pos->reset_mode) - { - // FIXME: need good way to fetch - // shard data here! 
- qs = db_plugin->get_latest_reserve_in_reference (db_plugin->cls, - session, - wa_pos->section_name, - &wa_pos->last_row_off); - if (GNUNET_DB_STATUS_HARD_ERROR == qs) + + delay.rel_value_us = GNUNET_CRYPTO_random_u64 ( + GNUNET_CRYPTO_QUALITY_WEAK, + 4 * GNUNET_TIME_relative_max ( + wirewatch_idle_sleep_interval, + GNUNET_TIME_relative_multiply (shard_delay, + max_workers)).rel_value_us); + qs = db_plugin->begin_shard (db_plugin->cls, + wa_pos->job_name, + delay, + shard_size, + &start, + &end); + switch (qs) { + case GNUNET_DB_STATUS_HARD_ERROR: GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to obtain starting point for montoring from database!\n"); - db_plugin->rollback (db_plugin->cls, - session); global_ret = GR_DATABASE_SELECT_LATEST_HARD_FAIL; GNUNET_SCHEDULER_shutdown (); return; - } - if (GNUNET_DB_STATUS_SOFT_ERROR == qs) - { + case GNUNET_DB_STATUS_SOFT_ERROR: /* try again */ - db_plugin->rollback (db_plugin->cls, - session); - task = GNUNET_SCHEDULER_add_now (&find_transfers, - NULL); + task = GNUNET_SCHEDULER_add_delayed (wirewatch_idle_sleep_interval, + &find_transfers, + NULL); return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + GNUNET_break (0); + task = GNUNET_SCHEDULER_add_delayed (wirewatch_idle_sleep_interval, + &find_transfers, + NULL); + return; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + wa_pos->shard_start_time = GNUNET_TIME_absolute_get (); + wa_pos->shard_start = start; + wa_pos->shard_end = end; + wa_pos->batch_start = start; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Starting with shard at %llu\n", + (unsigned long long) start); + break; } } - wa_pos->reset_mode = true; + if (GNUNET_OK != + db_plugin->start (db_plugin->cls, + session, + "wirewatch check for incoming wire transfers")) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to start database transaction!\n"); + global_ret = GR_DATABASE_TRANSACTION_BEGIN_FAIL; + GNUNET_SCHEDULER_shutdown (); + return; + } limit = GNUNET_MIN (wa_pos->batch_size, - wa_pos->shard_end - 
wa_pos->last_row_off); + wa_pos->shard_end - wa_pos->batch_start); GNUNET_assert (NULL == wa_pos->hh); + wa_pos->latest_row_off = wa_pos->batch_start; wa_pos->hh = TALER_BANK_credit_history (ctx, &wa_pos->auth, - wa_pos->last_row_off, + wa_pos->batch_start, limit, &history_cb, wa_pos); @@ -644,10 +757,6 @@ main (int argc, char *const *argv) { struct GNUNET_GETOPT_CommandLineOption options[] = { - GNUNET_GETOPT_option_flag ('r', - "reset", - "start fresh with all transactions in the history", - &reset_mode), GNUNET_GETOPT_option_uint ('S', "size", "SIZE", @@ -659,6 +768,11 @@ main (int argc, "test", "run in test mode and exit when idle", &test_mode), + GNUNET_GETOPT_option_uint ('w', + "workers", + "COUNT", + "Plan work load with up to COUNT worker processes (default: 16)", + &max_workers), GNUNET_GETOPT_OPTION_END }; enum GNUNET_GenericReturnValue ret; -- cgit v1.2.3