From 5149af93147c54055d99af688993de3fb4c36ddf Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Fri, 3 Sep 2021 19:08:02 +0200 Subject: preliminary work on supporting sharding/parallel aggregation (undertested, but tests pass again) --- src/exchange/taler-exchange-aggregator.c | 273 ++++++++++++++++++++++++------- 1 file changed, 212 insertions(+), 61 deletions(-) (limited to 'src/exchange/taler-exchange-aggregator.c') diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index e202290d9..0fc13c145 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -107,6 +107,35 @@ struct AggregationUnit }; +/** + * Work shard we are processing. + */ +struct Shard +{ + + /** + * When did we start processing the shard? + */ + struct GNUNET_TIME_Absolute start_time; + + /** + * Starting row of the shard. + */ + uint32_t shard_start; + + /** + * Exclusive end row of the shard. + */ + uint32_t shard_end; + + /** + * Number of starting points found in the shard. + */ + uint64_t work_counter; + +}; + + /** * What is the smallest unit we support for wire transfers? * We will need to round down to a multiple of this amount. @@ -135,11 +164,19 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin; */ static struct GNUNET_SCHEDULER_Task *task; + /** * How long should we sleep when idle before trying to find more work? */ static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval; +/** + * How big are the shards we are processing? Is an inclusive offset, so every + * shard ranges from [X,X+shard_size) exclusive. So a shard covers + * shard_size slots. The maximum value for shard_size is INT32_MAX+1. + */ +static uint32_t shard_size; + /** * Value to return from main(). 0 on success, non-zero on errors. */ @@ -161,6 +198,15 @@ static void run_aggregation (void *cls); +/** + * Select a shard to work on. + * + * @param cls NULL + */ +static void +run_shard (void *cls); + + /** * Free data stored in @a au, but not @a au itself (stack allocated). * @@ -611,31 +657,57 @@ commit_or_warn (void) } +/** + * Release lock on shard @a s in the database. + * On error, terminates this process. + * + * @param[in] s shard to free (and memory to release) + */ +static void +release_shard (struct Shard *s) +{ + enum GNUNET_DB_QueryStatus qs; + + qs = db_plugin->release_revolving_shard ( + db_plugin->cls, + "aggregator", + s->shard_start, + s->shard_end); + GNUNET_free (s); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR != qs); + GNUNET_break (0); + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + /* Strange, but let's just continue */ + break; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* normal case */ + break; + } +} + + /** * Main work function that queries the DB and aggregates transactions * into larger wire transfers. * - * @param cls NULL + * @param cls a `struct Shard *` */ static void run_aggregation (void *cls) { + struct Shard *s = cls; struct AggregationUnit au_active; enum GNUNET_DB_QueryStatus qs; - (void) cls; task = NULL; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Checking for ready deposits to aggregate\n"); - if (GNUNET_SYSERR == - db_plugin->preflight (db_plugin->cls)) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to obtain database connection!\n"); - global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - return; - } if (GNUNET_OK != db_plugin->start_deferred_wire_out (db_plugin->cls)) { @@ -643,50 +715,70 @@ run_aggregation (void *cls) "Failed to start database transaction!\n"); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); + release_shard (s); return; } memset (&au_active, 0, sizeof (au_active)); - qs = db_plugin->get_ready_deposit (db_plugin->cls, - &deposit_cb, - &au_active); - if (0 >= qs) + qs = db_plugin->get_ready_deposit ( + db_plugin->cls, + s->shard_start, + s->shard_end - 1, /* -1: exclusive->inclusive */ + &deposit_cb, + &au_active); + switch (qs) { + case GNUNET_DB_STATUS_HARD_ERROR: cleanup_au (&au_active); db_plugin->rollback (db_plugin->cls); - if (GNUNET_DB_STATUS_HARD_ERROR == qs) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to execute deposit iteration!\n"); - global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - return; - } - if (GNUNET_DB_STATUS_SOFT_ERROR == qs) + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to begin deposit iteration!\n"); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + release_shard (s); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + cleanup_au (&au_active); + db_plugin->rollback (db_plugin->cls); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&run_aggregation, + s); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: { - /* should re-try immediately */ + uint64_t counter = s->work_counter; + struct GNUNET_TIME_Relative duration + = GNUNET_TIME_absolute_get_duration (s->start_time); + + cleanup_au (&au_active); + db_plugin->rollback (db_plugin->cls); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Completed shard after %s\n", + GNUNET_STRINGS_relative_time_to_string (duration, + GNUNET_YES)); + release_shard (s); + if (GNUNET_YES == test_mode) + { + /* in test mode, shutdown after a shard is done */ + GNUNET_SCHEDULER_shutdown (); + return; + } GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_aggregation, - NULL); + /* If we ended up doing zero work, sleep a bit */ + if (0 == counter) + task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval, + &run_shard, + NULL); + else + task = GNUNET_SCHEDULER_add_now (&run_shard, + NULL); return; } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "No more ready deposits, going to sleep\n"); - if (GNUNET_YES == test_mode) - { - /* in test mode, shutdown if we end up being idle */ - GNUNET_SCHEDULER_shutdown (); - } - else - { - /* nothing to do, sleep for a minute and try again */ - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval, - &run_aggregation, - NULL); - } - return; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + s->work_counter++; + /* continued below */ + break; } /* Now try to find other deposits to aggregate */ @@ -707,6 +799,7 @@ run_aggregation (void *cls) db_plugin->rollback (db_plugin->cls); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); + release_shard (s); return; } if (GNUNET_DB_STATUS_SOFT_ERROR == qs) @@ -718,7 +811,7 @@ run_aggregation (void *cls) cleanup_au (&au_active); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, - NULL); + s); return; } @@ -754,6 +847,7 @@ run_aggregation (void *cls) global_ret = EXIT_FAILURE; cleanup_au (&au_active); GNUNET_SCHEDULER_shutdown (); + release_shard (s); return; } /* Mark transactions by row_id as minor */ @@ -778,7 +872,7 @@ run_aggregation (void *cls) /* start again */ GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, - NULL); + s); return; } if (GNUNET_DB_STATUS_HARD_ERROR == qs) @@ -787,6 +881,7 @@ run_aggregation (void *cls) cleanup_au (&au_active); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); + release_shard (s); return; } /* commit */ @@ -796,20 +891,13 @@ run_aggregation (void *cls) /* start again */ GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, - NULL); + s); return; } - { - char *amount_s; - - amount_s = TALER_amount_to_string (&au_active.final_amount); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Preparing wire transfer of %s to %s\n", - amount_s, - TALER_B2S (&au_active.merchant_pub)); - GNUNET_free (amount_s); - } - + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Preparing wire transfer of %s to %s\n", + TALER_amount2s (&au_active.final_amount), + TALER_B2S (&au_active.merchant_pub)); { void *buf; size_t buf_size; @@ -856,7 +944,7 @@ run_aggregation (void *cls) /* start again */ GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, - NULL); + s); return; } if (GNUNET_DB_STATUS_HARD_ERROR == qs) @@ -866,6 +954,7 @@ run_aggregation (void *cls) /* die hard */ global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); + release_shard (s); return; } @@ -882,26 +971,72 @@ run_aggregation (void *cls) "Commit issue for prepared wire data; trying again later!\n"); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, - NULL); + s); return; case GNUNET_DB_STATUS_HARD_ERROR: GNUNET_break (0); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); + release_shard (s); return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Preparation complete, going again\n"); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, - NULL); + s); return; default: GNUNET_break (0); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); + release_shard (s); + return; + } +} + + +/** + * Select a shard to work on. + * + * @param cls NULL + */ +static void +run_shard (void *cls) +{ + struct Shard *s; + enum GNUNET_DB_QueryStatus qs; + + (void) cls; + task = NULL; + if (GNUNET_SYSERR == + db_plugin->preflight (db_plugin->cls)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to obtain database connection!\n"); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); return; } + s = GNUNET_new (struct Shard); + s->start_time = GNUNET_TIME_absolute_get (); + qs = db_plugin->begin_revolving_shard (db_plugin->cls, + "aggregator", + shard_size, + 1U + INT32_MAX, + &s->shard_start, + &s->shard_end); + if (0 >= qs) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to begin shard!\n"); + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR != qs); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + } + task = GNUNET_SCHEDULER_add_now (&run_aggregation, + s); } @@ -919,6 +1054,7 @@ run (void *cls, const char *cfgfile, const struct GNUNET_CONFIGURATION_Handle *c) { + unsigned long long ass; (void) cls; (void) args; (void) cfgfile; @@ -930,8 +1066,23 @@ run (void *cls, global_ret = EXIT_NOTCONFIGURED; return; } + if (GNUNET_OK != + GNUNET_CONFIGURATION_get_value_number (cfg, + "exchange", + "AGGREGATOR_SHARD_SIZE", + &ass)) + { + cfg = NULL; + global_ret = EXIT_NOTCONFIGURED; + return; + } + if ( (0 == ass) || + (ass > INT32_MAX) ) + shard_size = 1U + INT32_MAX; + else + shard_size = (uint32_t) ass; GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_aggregation, + task = GNUNET_SCHEDULER_add_now (&run_shard, NULL); GNUNET_SCHEDULER_add_shutdown (&shutdown_task, cls); -- cgit v1.2.3