summaryrefslogtreecommitdiff
path: root/src/exchange/taler-exchange-transfer.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2021-09-05 15:25:46 +0200
committerChristian Grothoff <christian@grothoff.org>2021-09-05 15:25:57 +0200
commitae8d481e1ce9f694a42619809d2c9b6e6acf3497 (patch)
tree1b0554139c53f7dde177d5cd74a9b3800b3adb33 /src/exchange/taler-exchange-transfer.c
parentadc6c53b5c7e08ff6eba180f872a8a8f8f94cd65 (diff)
downloadexchange-ae8d481e1ce9f694a42619809d2c9b6e6acf3497.tar.gz
exchange-ae8d481e1ce9f694a42619809d2c9b6e6acf3497.tar.bz2
exchange-ae8d481e1ce9f694a42619809d2c9b6e6acf3497.zip
implement taler-exchange-transfer DB sharding logic
Diffstat (limited to 'src/exchange/taler-exchange-transfer.c')
-rw-r--r--src/exchange/taler-exchange-transfer.c405
1 files changed, 325 insertions, 80 deletions
diff --git a/src/exchange/taler-exchange-transfer.c b/src/exchange/taler-exchange-transfer.c
index d6d44eb05..b93d14600 100644
--- a/src/exchange/taler-exchange-transfer.c
+++ b/src/exchange/taler-exchange-transfer.c
@@ -1,6 +1,6 @@
/*
This file is part of TALER
- Copyright (C) 2016-2020 Taler Systems SA
+ Copyright (C) 2016-2021 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free Software
@@ -27,6 +27,46 @@
#include "taler_json_lib.h"
#include "taler_bank_service.h"
+/**
+ * What is the maximum batch size we use for credit history
+ * requests with the bank. See `batch_size` below.
+ */
+#define MAXIMUM_BATCH_SIZE 1024
+
+
+/**
+ * Information about our work shard.
+ */
+struct Shard
+{
+
+ /**
+ * Time when we started to work on this shard.
+ */
+ struct GNUNET_TIME_Absolute shard_start_time;
+
+ /**
+ * Offset the shard begins at.
+ */
+ uint64_t shard_start;
+
+ /**
+ * Exclusive offset where the shard ends.
+ */
+ uint64_t shard_end;
+
+ /**
+ * Offset where our current batch begins.
+ */
+ uint64_t batch_start;
+
+ /**
+ * Highest row processed in the current batch.
+ */
+ uint64_t batch_end;
+
+};
+
/**
* Data we keep to #run_transfers(). There is at most
@@ -38,6 +78,18 @@ struct WirePrepareData
{
/**
+ * All transfers done in the same transaction
+ * are kept in a DLL.
+ */
+ struct WirePrepareData *next;
+
+ /**
+ * All transfers done in the same transaction
+ * are kept in a DLL.
+ */
+ struct WirePrepareData *prev;
+
+ /**
* Wire execution handle.
*/
struct TALER_BANK_TransferHandle *eh;
@@ -71,10 +123,21 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin;
static struct GNUNET_SCHEDULER_Task *task;
/**
- * If we are currently executing a transfer, information about
- * the active transfer is here. Otherwise, this variable is NULL.
+ * If we are currently executing transfers, information about
+ * the active transfers is here. Otherwise, this variable is NULL.
+ */
+static struct WirePrepareData *wpd_head;
+
+/**
+ * If we are currently executing transfers, information about
+ * the active transfers is here. Otherwise, this variable is NULL.
+ */
+static struct WirePrepareData *wpd_tail;
+
+/**
+ * Information about our work shard.
*/
-static struct WirePrepareData *wpd;
+static struct Shard *shard;
/**
* Handle to the context for interacting with the bank / wire gateway.
@@ -87,11 +150,6 @@ static struct GNUNET_CURL_Context *ctx;
static struct GNUNET_CURL_RescheduleContext *rc;
/**
- * How long should we sleep when idle before trying to find more work?
- */
-static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval;
-
-/**
* Value to return from main(). 0 on success, non-zero on errors.
*/
static int global_ret;
@@ -101,6 +159,54 @@ static int global_ret;
*/
static int test_mode;
+/**
+ * How long should we sleep when idle before trying to find more work?
+ * Also used for how long we wait to grab a shard before trying it again.
+ * The value should be set to a bit above the average time it takes to
+ * process a shard.
+ */
+static struct GNUNET_TIME_Relative transfer_idle_sleep_interval;
+
+/**
+ * How long did we take to finish the last shard?
+ */
+static struct GNUNET_TIME_Relative shard_delay;
+
+/**
+ * Modulus to apply to group shards. The shard size must ultimately be a
+ * multiple of the batch size. Thus, if this is not a multiple of the
+ * #MAXIMUM_BATCH_SIZE, the batch size will be set to the #shard_size.
+ */
+static unsigned int shard_size = MAXIMUM_BATCH_SIZE;
+
+/**
+ * How many workers should we plan our scheduling with?
+ */
+static unsigned int max_workers = 16;
+
+
+/**
+ * Clean up all active bank interactions.
+ */
+static void
+cleanup_wpd (void)
+{
+ struct WirePrepareData *wpd;
+
+ while (NULL != (wpd = wpd_head))
+ {
+ GNUNET_CONTAINER_DLL_remove (wpd_head,
+ wpd_tail,
+ wpd);
+ if (NULL != wpd->eh)
+ {
+ TALER_BANK_transfer_cancel (wpd->eh);
+ wpd->eh = NULL;
+ }
+ GNUNET_free (wpd);
+ }
+}
+
/**
* We're being aborted with CTRL-C (or SIGTERM). Shut down.
@@ -128,17 +234,9 @@ shutdown_task (void *cls)
GNUNET_SCHEDULER_cancel (task);
task = NULL;
}
- if (NULL != wpd)
- {
- if (NULL != wpd->eh)
- {
- TALER_BANK_transfer_cancel (wpd->eh);
- wpd->eh = NULL;
- }
- db_plugin->rollback (db_plugin->cls);
- GNUNET_free (wpd);
- wpd = NULL;
- }
+ cleanup_wpd ();
+ GNUNET_free (shard);
+ db_plugin->rollback (db_plugin->cls); /* just in case */
TALER_EXCHANGEDB_plugin_unload (db_plugin);
db_plugin = NULL;
TALER_EXCHANGEDB_unload_accounts ();
@@ -151,18 +249,18 @@ shutdown_task (void *cls)
*
* @return #GNUNET_OK on success
*/
-static int
-parse_wirewatch_config (void)
+static enum GNUNET_GenericReturnValue
+parse_transfer_config (void)
{
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_time (cfg,
"exchange",
- "AGGREGATOR_IDLE_SLEEP_INTERVAL",
- &aggregator_idle_sleep_interval))
+ "TRANSFER_IDLE_SLEEP_INTERVAL",
+ &transfer_idle_sleep_interval))
{
GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
"exchange",
- "AGGREGATOR_IDLE_SLEEP_INTERVAL");
+ "TRANSFER_IDLE_SLEEP_INTERVAL");
return GNUNET_SYSERR;
}
if (NULL ==
@@ -219,12 +317,21 @@ run_transfers (void *cls);
/**
+ * Select shard to process.
+ *
+ * @param cls NULL
+ */
+static void
+select_shard (void *cls);
+
+
+/**
* Function called with the result from the execute step.
* On success, we mark the respective wire transfer as finished,
* and in general we afterwards continue to #run_transfers(),
* except for irrecoverable errors.
*
- * @param cls NULL
+ * @param cls `struct WirePrepareData` we are working on
* @param http_status_code #MHD_HTTP_OK on success
* @param ec taler error code
* @param row_id unique ID of the wire transfer in the bank's records
@@ -237,15 +344,18 @@ wire_confirm_cb (void *cls,
uint64_t row_id,
struct GNUNET_TIME_Absolute wire_timestamp)
{
+ struct WirePrepareData *wpd = cls;
enum GNUNET_DB_QueryStatus qs;
- (void) cls;
(void) row_id;
(void) wire_timestamp;
wpd->eh = NULL;
switch (http_status_code)
{
case MHD_HTTP_OK:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Wire transfer %llu completed successfully\n",
+ (unsigned long long) wpd->row_id);
qs = db_plugin->wire_prepare_data_mark_finished (db_plugin->cls,
wpd->row_id);
/* continued below */
@@ -262,38 +372,43 @@ wire_confirm_cb (void *cls,
break;
default:
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Wire transaction failed: %u/%d\n",
+ "Wire transfer %llu failed: %u/%d\n",
+ (unsigned long long) wpd->row_id,
http_status_code,
ec);
db_plugin->rollback (db_plugin->cls);
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
- GNUNET_free (wpd);
- wpd = NULL;
return;
}
- if (0 >= qs)
+ shard->batch_end = GNUNET_MAX (wpd->row_id,
+ shard->batch_end);
+ switch (qs)
{
- GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ case GNUNET_DB_STATUS_SOFT_ERROR:
db_plugin->rollback (db_plugin->cls);
- if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
- {
- /* try again */
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&run_transfers,
- NULL);
- }
- else
- {
- global_ret = EXIT_FAILURE;
- GNUNET_SCHEDULER_shutdown ();
- }
- GNUNET_free (wpd);
- wpd = NULL;
+ cleanup_wpd ();
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&run_transfers,
+ NULL);
return;
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ db_plugin->rollback (db_plugin->cls);
+ cleanup_wpd ();
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ GNUNET_CONTAINER_DLL_remove (wpd_head,
+ wpd_tail,
+ wpd);
+ GNUNET_free (wpd);
+ break;
}
- GNUNET_free (wpd);
- wpd = NULL;
+ if (NULL != wpd_head)
+ return; /* wait for other queries to complete */
+ /* batch done */
switch (commit_or_warn ())
{
case GNUNET_DB_STATUS_SOFT_ERROR:
@@ -308,8 +423,9 @@ wire_confirm_cb (void *cls,
GNUNET_SCHEDULER_shutdown ();
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ shard->batch_start = shard->batch_end + 1;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Wire transfer complete\n");
+ "Batch complete\n");
/* continue with #run_transfers(), just to guard
against the unlikely case that there are more. */
GNUNET_assert (NULL == task);
@@ -343,6 +459,7 @@ wire_prepare_cb (void *cls,
size_t buf_size)
{
const struct TALER_EXCHANGEDB_AccountInfo *wa;
+ struct WirePrepareData *wpd;
(void) cls;
if ( (NULL == wire_method) ||
@@ -351,9 +468,14 @@ wire_prepare_cb (void *cls,
GNUNET_break (0);
db_plugin->rollback (db_plugin->cls);
global_ret = EXIT_FAILURE;
- goto cleanup;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
}
+ wpd = GNUNET_new (struct WirePrepareData);
wpd->row_id = rowid;
+ GNUNET_CONTAINER_DLL_insert (wpd_head,
+ wpd_tail,
+ wpd);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Starting wire transfer %llu\n",
(unsigned long long) rowid);
@@ -365,7 +487,8 @@ wire_prepare_cb (void *cls,
GNUNET_break (0);
db_plugin->rollback (db_plugin->cls);
global_ret = EXIT_NOTCONFIGURED;
- goto cleanup;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
}
wa = wpd->wa;
wpd->eh = TALER_BANK_transfer (ctx,
@@ -373,19 +496,15 @@ wire_prepare_cb (void *cls,
buf,
buf_size,
&wire_confirm_cb,
- NULL);
+ wpd);
if (NULL == wpd->eh)
{
GNUNET_break (0); /* Irrecoverable */
db_plugin->rollback (db_plugin->cls);
global_ret = EXIT_FAILURE;
- goto cleanup;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
}
- return;
-cleanup:
- GNUNET_SCHEDULER_shutdown ();
- GNUNET_free (wpd);
- wpd = NULL;
}
@@ -399,23 +518,55 @@ static void
run_transfers (void *cls)
{
enum GNUNET_DB_QueryStatus qs;
+ int64_t limit;
(void) cls;
task = NULL;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Checking for pending wire transfers\n");
- if (GNUNET_SYSERR ==
- db_plugin->preflight (db_plugin->cls))
+ limit = shard->shard_end - shard->batch_start;
+ if (0 >= limit)
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to obtain database connection!\n");
- global_ret = EXIT_FAILURE;
- GNUNET_SCHEDULER_shutdown ();
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Shard [%llu,%llu) completed\n",
+ (unsigned long long) shard->shard_start,
+ (unsigned long long) shard->batch_end);
+ qs = db_plugin->complete_shard (db_plugin->cls,
+ "transfer",
+ shard->shard_start,
+ shard->batch_end + 1);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ GNUNET_free (shard);
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Got DB soft error for complete_shard. Rolling back.\n");
+ GNUNET_free (shard);
+ task = GNUNET_SCHEDULER_add_now (&select_shard,
+ NULL);
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ /* already existed, ok, let's just continue */
+ break;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ /* normal case */
+ break;
+ }
+ shard_delay = GNUNET_TIME_absolute_get_duration (shard->shard_start_time);
+ GNUNET_free (shard);
+ task = GNUNET_SCHEDULER_add_now (&select_shard,
+ NULL);
return;
}
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Checking for %lld pending wire transfers [%llu-...)\n",
+ (long long) limit,
+ (unsigned long long) shard->batch_start);
if (GNUNET_OK !=
- db_plugin->start (db_plugin->cls,
- "aggregator run transfer"))
+ db_plugin->start_read_committed (db_plugin->cls,
+ "aggregator run transfer"))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start database transaction!\n");
@@ -423,30 +574,29 @@ run_transfers (void *cls)
GNUNET_SCHEDULER_shutdown ();
return;
}
- wpd = GNUNET_new (struct WirePrepareData);
qs = db_plugin->wire_prepare_data_get (db_plugin->cls,
+ shard->batch_start,
+ limit,
&wire_prepare_cb,
NULL);
- if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs)
- return; /* continued via continuation set in #wire_prepare_cb() */
- db_plugin->rollback (db_plugin->cls);
- GNUNET_free (wpd);
- wpd = NULL;
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
+ db_plugin->rollback (db_plugin->cls);
GNUNET_break (0);
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
case GNUNET_DB_STATUS_SOFT_ERROR:
/* try again */
+ db_plugin->rollback (db_plugin->cls);
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_transfers,
NULL);
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
/* no more prepared wire transfers, go sleep a bit! */
+ db_plugin->rollback (db_plugin->cls);
GNUNET_assert (NULL == task);
if (GNUNET_YES == test_mode)
{
@@ -458,15 +608,92 @@ run_transfers (void *cls)
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"No more pending wire transfers, going idle\n");
- task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval,
+ task = GNUNET_SCHEDULER_add_delayed (transfer_idle_sleep_interval,
&run_transfers,
NULL);
}
return;
+ default:
+ /* continued in wire_prepare_cb() */
+ return;
+ }
+}
+
+
+/**
+ * Select shard to process.
+ *
+ * @param cls NULL
+ */
+static void
+select_shard (void *cls)
+{
+ enum GNUNET_DB_QueryStatus qs;
+ struct GNUNET_TIME_Relative delay;
+ uint64_t start;
+ uint64_t end;
+
+ (void) cls;
+ task = NULL;
+ if (GNUNET_SYSERR ==
+ db_plugin->preflight (db_plugin->cls))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to obtain database connection!\n");
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ if (0 == max_workers)
+ delay = GNUNET_TIME_UNIT_ZERO;
+ else
+ delay.rel_value_us = GNUNET_CRYPTO_random_u64 (
+ GNUNET_CRYPTO_QUALITY_WEAK,
+ 4 * GNUNET_TIME_relative_max (
+ transfer_idle_sleep_interval,
+ GNUNET_TIME_relative_multiply (shard_delay,
+ max_workers)).rel_value_us);
+ qs = db_plugin->begin_shard (db_plugin->cls,
+ "transfer",
+ delay,
+ shard_size,
+ &start,
+ &end);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to obtain starting point for montoring from database!\n");
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ /* try again */
+ task = GNUNET_SCHEDULER_add_delayed (transfer_idle_sleep_interval,
+ &select_shard,
+ NULL);
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ GNUNET_break (0);
+ task = GNUNET_SCHEDULER_add_delayed (transfer_idle_sleep_interval,
+ &select_shard,
+ NULL);
+ return;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
- /* should be impossible */
- GNUNET_assert (0);
+ /* continued below */
+ break;
}
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Starting with shard [%llu,%llu)\n",
+ (unsigned long long) start,
+ (unsigned long long) end);
+ shard = GNUNET_new (struct Shard);
+ shard->shard_start_time = GNUNET_TIME_absolute_get ();
+ shard->shard_start = start;
+ shard->shard_end = end;
+ shard->batch_start = start;
+ task = GNUNET_SCHEDULER_add_now (&run_transfers,
+ NULL);
}
@@ -489,7 +716,7 @@ run (void *cls,
(void) cfgfile;
cfg = c;
- if (GNUNET_OK != parse_wirewatch_config ())
+ if (GNUNET_OK != parse_transfer_config ())
{
cfg = NULL;
global_ret = EXIT_NOTCONFIGURED;
@@ -503,9 +730,17 @@ run (void *cls,
GNUNET_break (0);
return;
}
-
+ if (GNUNET_SYSERR ==
+ db_plugin->preflight (db_plugin->cls))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to obtain database connection!\n");
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&run_transfers,
+ task = GNUNET_SCHEDULER_add_now (&select_shard,
NULL);
GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
cls);
@@ -524,12 +759,22 @@ main (int argc,
char *const *argv)
{
struct GNUNET_GETOPT_CommandLineOption options[] = {
+ GNUNET_GETOPT_option_uint ('S',
+ "size",
+ "SIZE",
+ "Size to process per shard (default: 1024)",
+ &shard_size),
GNUNET_GETOPT_option_timetravel ('T',
"timetravel"),
GNUNET_GETOPT_option_flag ('t',
"test",
"run in test mode and exit when idle",
&test_mode),
+ GNUNET_GETOPT_option_uint ('w',
+ "workers",
+ "COUNT",
+ "Plan work load with up to COUNT worker processes (default: 16)",
+ &max_workers),
GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION),
GNUNET_GETOPT_OPTION_END
};