summaryrefslogtreecommitdiff
path: root/src/exchange
diff options
context:
space:
mode:
author Christian Grothoff <christian@grothoff.org> 2021-09-03 19:08:02 +0200
committer Christian Grothoff <christian@grothoff.org> 2021-09-03 19:08:02 +0200
commit 5149af93147c54055d99af688993de3fb4c36ddf (patch)
tree 8652e56bed70976309412e11c35305a352576a9b /src/exchange
parent 6e1877b142d4819a248b01aebfdd6f337f82a509 (diff)
download exchange-5149af93147c54055d99af688993de3fb4c36ddf.tar.gz
exchange-5149af93147c54055d99af688993de3fb4c36ddf.tar.bz2
exchange-5149af93147c54055d99af688993de3fb4c36ddf.zip
preliminary work on supporting sharding/parallel aggregation (undertested, but tests pass again)
Diffstat (limited to 'src/exchange')
-rw-r--r-- src/exchange/exchange.conf 11
-rw-r--r-- src/exchange/taler-exchange-aggregator.c 273
2 files changed, 223 insertions, 61 deletions
diff --git a/src/exchange/exchange.conf b/src/exchange/exchange.conf
index f8d99f75..68c1556d 100644
--- a/src/exchange/exchange.conf
+++ b/src/exchange/exchange.conf
@@ -45,6 +45,17 @@ BASE_URL = http://localhost:8081/
# sleep if it has nothing to do?
AGGREGATOR_IDLE_SLEEP_INTERVAL = 60 s
+# Values of 0, or of 2^31 (2147483648) and above, disable
+# sharding, which is a sane default for most use-cases.
+# When changing this value, you MUST stop all
+# aggregators and manually run
+#
+# $ taler-exchange-dbinit -s
+#
+# against the exchange's database. Otherwise, the
+# aggregation logic will break badly!
+AGGREGATOR_SHARD_SIZE = 2147483648
+
# How long should wirewatch sleep if it has nothing to do?
# (Set very aggressively here for the demonstrators to be
# super fast.)
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c
index e202290d..0fc13c14 100644
--- a/src/exchange/taler-exchange-aggregator.c
+++ b/src/exchange/taler-exchange-aggregator.c
@@ -108,6 +108,35 @@ struct AggregationUnit
/**
+ * Work shard we are processing. Describes one range of rows
+ * obtained via begin_revolving_shard() in run_shard(), plus
+ * bookkeeping about the work done on that range so far.
+ */
+struct Shard
+{
+
+ /**
+ * When did we start processing the shard? Set in run_shard()
+ * and used to log how long the shard took once it is completed.
+ */
+ struct GNUNET_TIME_Absolute start_time;
+
+ /**
+ * Starting row of the shard. Passed as the lower bound
+ * to get_ready_deposit().
+ */
+ uint32_t shard_start;
+
+ /**
+ * Exclusive end row of the shard. run_aggregation() passes
+ * shard_end - 1 to get_ready_deposit() to turn this into an
+ * inclusive upper bound.
+ */
+ uint32_t shard_end;
+
+ /**
+ * Number of starting points found in the shard. Incremented
+ * each time get_ready_deposit() returns a result; if this is
+ * still zero when the shard is completed, the aggregator sleeps
+ * before selecting the next shard.
+ */
+ uint64_t work_counter;
+
+};
+
+
+/**
* What is the smallest unit we support for wire transfers?
* We will need to round down to a multiple of this amount.
*/
@@ -135,12 +164,20 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin;
*/
static struct GNUNET_SCHEDULER_Task *task;
+
/**
* How long should we sleep when idle before trying to find more work?
*/
static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval;
/**
+ * How big are the shards we are processing? Every shard covers the
+ * half-open range [X,X+shard_size), that is, X is included and
+ * X+shard_size is excluded. So a shard covers shard_size slots.
+ * The maximum value for shard_size is INT32_MAX+1.
+ */
+static uint32_t shard_size;
+
+/**
* Value to return from main(). 0 on success, non-zero on errors.
*/
static int global_ret;
@@ -162,6 +199,15 @@ run_aggregation (void *cls);
/**
+ * Select a shard to work on and schedule run_aggregation()
+ * for it. Declared here because run_aggregation() schedules
+ * this function again once it has completed a shard.
+ *
+ * @param cls NULL (unused)
+ */
+static void
+run_shard (void *cls);
+
+
+/**
* Free data stored in @a au, but not @a au itself (stack allocated).
*
* @param au aggregation unit to clean up
@@ -612,30 +658,56 @@ commit_or_warn (void)
/**
+ * Release lock on shard @a s in the database and free @a s.
+ * On database errors, sets #global_ret to #EXIT_FAILURE and
+ * initiates the shutdown of the scheduler, terminating this process.
+ *
+ * @param[in] s shard to release; its memory is always freed,
+ * so @a s must not be used after this call
+ */
+static void
+release_shard (struct Shard *s)
+{
+ enum GNUNET_DB_QueryStatus qs;
+
+ qs = db_plugin->release_revolving_shard (
+ db_plugin->cls,
+ "aggregator",
+ s->shard_start,
+ s->shard_end);
+ GNUNET_free (s);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR != qs);
+ GNUNET_break (0);
+ /* Not every caller has set global_ret before getting here
+ (e.g. after completing a shard), so make sure main()
+ reports the failure. */
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ /* Strange, but let's just continue */
+ break;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ /* normal case */
+ break;
+ }
+}
+
+
+/**
* Main work function that queries the DB and aggregates transactions
* into larger wire transfers.
*
- * @param cls NULL
+ * @param cls a `struct Shard *`
*/
static void
run_aggregation (void *cls)
{
+ struct Shard *s = cls;
struct AggregationUnit au_active;
enum GNUNET_DB_QueryStatus qs;
- (void) cls;
task = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Checking for ready deposits to aggregate\n");
- if (GNUNET_SYSERR ==
- db_plugin->preflight (db_plugin->cls))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to obtain database connection!\n");
- global_ret = EXIT_FAILURE;
- GNUNET_SCHEDULER_shutdown ();
- return;
- }
if (GNUNET_OK !=
db_plugin->start_deferred_wire_out (db_plugin->cls))
{
@@ -643,50 +715,70 @@ run_aggregation (void *cls)
"Failed to start database transaction!\n");
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
+ release_shard (s);
return;
}
memset (&au_active,
0,
sizeof (au_active));
- qs = db_plugin->get_ready_deposit (db_plugin->cls,
- &deposit_cb,
- &au_active);
- if (0 >= qs)
+ qs = db_plugin->get_ready_deposit (
+ db_plugin->cls,
+ s->shard_start,
+ s->shard_end - 1, /* -1: exclusive->inclusive */
+ &deposit_cb,
+ &au_active);
+ switch (qs)
{
+ case GNUNET_DB_STATUS_HARD_ERROR:
cleanup_au (&au_active);
db_plugin->rollback (db_plugin->cls);
- if (GNUNET_DB_STATUS_HARD_ERROR == qs)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to execute deposit iteration!\n");
- global_ret = EXIT_FAILURE;
- GNUNET_SCHEDULER_shutdown ();
- return;
- }
- if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to begin deposit iteration!\n");
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ release_shard (s);
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ cleanup_au (&au_active);
+ db_plugin->rollback (db_plugin->cls);
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+ s);
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
{
- /* should re-try immediately */
+ uint64_t counter = s->work_counter;
+ struct GNUNET_TIME_Relative duration
+ = GNUNET_TIME_absolute_get_duration (s->start_time);
+
+ cleanup_au (&au_active);
+ db_plugin->rollback (db_plugin->cls);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Completed shard after %s\n",
+ GNUNET_STRINGS_relative_time_to_string (duration,
+ GNUNET_YES));
+ release_shard (s);
+ if (GNUNET_YES == test_mode)
+ {
+ /* in test mode, shutdown after a shard is done */
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- NULL);
+ /* If we ended up doing zero work, sleep a bit */
+ if (0 == counter)
+ task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval,
+ &run_shard,
+ NULL);
+ else
+ task = GNUNET_SCHEDULER_add_now (&run_shard,
+ NULL);
return;
}
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "No more ready deposits, going to sleep\n");
- if (GNUNET_YES == test_mode)
- {
- /* in test mode, shutdown if we end up being idle */
- GNUNET_SCHEDULER_shutdown ();
- }
- else
- {
- /* nothing to do, sleep for a minute and try again */
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval,
- &run_aggregation,
- NULL);
- }
- return;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ s->work_counter++;
+ /* continued below */
+ break;
}
/* Now try to find other deposits to aggregate */
@@ -707,6 +799,7 @@ run_aggregation (void *cls)
db_plugin->rollback (db_plugin->cls);
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
+ release_shard (s);
return;
}
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
@@ -718,7 +811,7 @@ run_aggregation (void *cls)
cleanup_au (&au_active);
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- NULL);
+ s);
return;
}
@@ -754,6 +847,7 @@ run_aggregation (void *cls)
global_ret = EXIT_FAILURE;
cleanup_au (&au_active);
GNUNET_SCHEDULER_shutdown ();
+ release_shard (s);
return;
}
/* Mark transactions by row_id as minor */
@@ -778,7 +872,7 @@ run_aggregation (void *cls)
/* start again */
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- NULL);
+ s);
return;
}
if (GNUNET_DB_STATUS_HARD_ERROR == qs)
@@ -787,6 +881,7 @@ run_aggregation (void *cls)
cleanup_au (&au_active);
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
+ release_shard (s);
return;
}
/* commit */
@@ -796,20 +891,13 @@ run_aggregation (void *cls)
/* start again */
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- NULL);
+ s);
return;
}
- {
- char *amount_s;
-
- amount_s = TALER_amount_to_string (&au_active.final_amount);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Preparing wire transfer of %s to %s\n",
- amount_s,
- TALER_B2S (&au_active.merchant_pub));
- GNUNET_free (amount_s);
- }
-
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Preparing wire transfer of %s to %s\n",
+ TALER_amount2s (&au_active.final_amount),
+ TALER_B2S (&au_active.merchant_pub));
{
void *buf;
size_t buf_size;
@@ -856,7 +944,7 @@ run_aggregation (void *cls)
/* start again */
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- NULL);
+ s);
return;
}
if (GNUNET_DB_STATUS_HARD_ERROR == qs)
@@ -866,6 +954,7 @@ run_aggregation (void *cls)
/* die hard */
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
+ release_shard (s);
return;
}
@@ -882,26 +971,72 @@ run_aggregation (void *cls)
"Commit issue for prepared wire data; trying again later!\n");
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- NULL);
+ s);
return;
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
+ release_shard (s);
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Preparation complete, going again\n");
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- NULL);
+ s);
return;
default:
GNUNET_break (0);
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
+ release_shard (s);
+ return;
+ }
+}
+
+
+/**
+ * Select a shard to work on. Checks the database connection,
+ * obtains the next shard via begin_revolving_shard() and then
+ * schedules run_aggregation() for that shard. On failure,
+ * sets #global_ret to #EXIT_FAILURE and shuts down the scheduler.
+ *
+ * @param cls NULL (unused)
+ */
+static void
+run_shard (void *cls)
+{
+ struct Shard *s;
+ enum GNUNET_DB_QueryStatus qs;
+
+ (void) cls;
+ task = NULL;
+ if (GNUNET_SYSERR ==
+ db_plugin->preflight (db_plugin->cls))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to obtain database connection!\n");
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
return;
}
+ s = GNUNET_new (struct Shard);
+ s->start_time = GNUNET_TIME_absolute_get ();
+ qs = db_plugin->begin_revolving_shard (db_plugin->cls,
+ "aggregator",
+ shard_size,
+ 1U + INT32_MAX,
+ &s->shard_start,
+ &s->shard_end);
+ if (0 >= qs)
+ {
+ /* We did not obtain a shard, so release_shard() will never
+ be called for s; free it here to avoid leaking it. */
+ GNUNET_free (s);
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to begin shard!\n");
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR != qs);
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+ s);
}
@@ -919,6 +1054,7 @@ run (void *cls,
const char *cfgfile,
const struct GNUNET_CONFIGURATION_Handle *c)
{
+ unsigned long long ass;
(void) cls;
(void) args;
(void) cfgfile;
@@ -930,8 +1066,23 @@ run (void *cls,
global_ret = EXIT_NOTCONFIGURED;
return;
}
+ if (GNUNET_OK !=
+ GNUNET_CONFIGURATION_get_value_number (cfg,
+ "exchange",
+ "AGGREGATOR_SHARD_SIZE",
+ &ass))
+ {
+ cfg = NULL;
+ global_ret = EXIT_NOTCONFIGURED;
+ return;
+ }
+ if ( (0 == ass) ||
+ (ass > INT32_MAX) )
+ shard_size = 1U + INT32_MAX;
+ else
+ shard_size = (uint32_t) ass;
GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+ task = GNUNET_SCHEDULER_add_now (&run_shard,
NULL);
GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
cls);