summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/benchmark/benchmark.conf2
-rw-r--r--src/exchange/exchange.conf11
-rw-r--r--src/exchange/taler-exchange-aggregator.c17
-rw-r--r--src/exchange/taler-exchange-closer.c16
-rw-r--r--src/exchange/taler-exchange-transfer.c405
-rw-r--r--src/exchange/taler-exchange-wirewatch.c1
-rw-r--r--src/exchangedb/plugin_exchangedb_postgres.c123
-rw-r--r--src/exchangedb/test_exchangedb.c6
-rw-r--r--src/include/taler_exchangedb_plugin.h8
-rw-r--r--src/testing/testing_api_cmd_exec_transfer.c3
10 files changed, 468 insertions, 124 deletions
diff --git a/src/benchmark/benchmark.conf b/src/benchmark/benchmark.conf
index 844106cf..c38981dd 100644
--- a/src/benchmark/benchmark.conf
+++ b/src/benchmark/benchmark.conf
@@ -24,7 +24,7 @@ DB = postgres
# exchange (or the twister) is actually listening.
BASE_URL = "http://localhost:8081/"
-AGGREGATOR_SHARD_SIZE = 268435456
+AGGREGATOR_SHARD_SIZE = 67108864
#AGGREGATOR_SHARD_SIZE = 2147483648
diff --git a/src/exchange/exchange.conf b/src/exchange/exchange.conf
index 68c1556d..4b7f5f5a 100644
--- a/src/exchange/exchange.conf
+++ b/src/exchange/exchange.conf
@@ -41,10 +41,17 @@ PORT = 8081
BASE_URL = http://localhost:8081/
-# How long should the aggregator (and closer, and transfer)
-# sleep if it has nothing to do?
+# How long should the aggregator sleep if it has nothing to do?
AGGREGATOR_IDLE_SLEEP_INTERVAL = 60 s
+# How long should the transfer tool
+# sleep if it has nothing to do?
+TRANSFER_IDLE_SLEEP_INTERVAL = 60 s
+
+# How long should the closer tool
+# sleep if it has nothing to do?
+CLOSER_IDLE_SLEEP_INTERVAL = 60 s
+
# Values of 0 or above 2^31 disable sharding, which
# is a sane default for most use-cases.
# When changing this value, you MUST stop all
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c
index 893fa79f..caa4528d 100644
--- a/src/exchange/taler-exchange-aggregator.c
+++ b/src/exchange/taler-exchange-aggregator.c
@@ -1034,9 +1034,22 @@ run_shard (void *cls)
&s->shard_end);
if (0 >= qs)
{
+ if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
+ {
+ static struct GNUNET_TIME_Relative delay;
+
+ GNUNET_free (s);
+ delay = GNUNET_TIME_randomized_backoff (delay,
+ GNUNET_TIME_UNIT_SECONDS);
+ task = GNUNET_SCHEDULER_add_delayed (delay,
+ &run_shard,
+ NULL);
+ return;
+ }
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to begin shard!\n");
- GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR != qs);
+ "Failed to begin shard (%d)!\n",
+ qs);
+ GNUNET_break (GNUNET_DB_STATUS_HARD_ERROR != qs);
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
diff --git a/src/exchange/taler-exchange-closer.c b/src/exchange/taler-exchange-closer.c
index 926c9355..19cc06c7 100644
--- a/src/exchange/taler-exchange-closer.c
+++ b/src/exchange/taler-exchange-closer.c
@@ -60,7 +60,7 @@ static struct GNUNET_SCHEDULER_Task *task;
/**
* How long should we sleep when idle before trying to find more work?
*/
-static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval;
+static struct GNUNET_TIME_Relative closer_idle_sleep_interval;
/**
* Value to return from main(). 0 on success, non-zero
@@ -112,8 +112,8 @@ shutdown_task (void *cls)
*
* @return #GNUNET_OK on success
*/
-static int
-parse_wirewatch_config (void)
+static enum GNUNET_GenericReturnValue
+parse_closer_config (void)
{
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_string (cfg,
@@ -129,12 +129,12 @@ parse_wirewatch_config (void)
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_time (cfg,
"exchange",
- "AGGREGATOR_IDLE_SLEEP_INTERVAL",
- &aggregator_idle_sleep_interval))
+ "CLOSER_IDLE_SLEEP_INTERVAL",
+ &closer_idle_sleep_interval))
{
GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
"exchange",
- "AGGREGATOR_IDLE_SLEEP_INTERVAL");
+ "CLOSER_IDLE_SLEEP_INTERVAL");
return GNUNET_SYSERR;
}
if ( (GNUNET_OK !=
@@ -444,7 +444,7 @@ run_reserve_closures (void *cls)
}
else
{
- task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval,
+ task = GNUNET_SCHEDULER_add_delayed (closer_idle_sleep_interval,
&run_reserve_closures,
NULL);
}
@@ -480,7 +480,7 @@ run (void *cls,
(void) cfgfile;
cfg = c;
- if (GNUNET_OK != parse_wirewatch_config ())
+ if (GNUNET_OK != parse_closer_config ())
{
cfg = NULL;
global_ret = EXIT_NOTCONFIGURED;
diff --git a/src/exchange/taler-exchange-transfer.c b/src/exchange/taler-exchange-transfer.c
index d6d44eb0..b93d1460 100644
--- a/src/exchange/taler-exchange-transfer.c
+++ b/src/exchange/taler-exchange-transfer.c
@@ -1,6 +1,6 @@
/*
This file is part of TALER
- Copyright (C) 2016-2020 Taler Systems SA
+ Copyright (C) 2016-2021 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free Software
@@ -27,6 +27,46 @@
#include "taler_json_lib.h"
#include "taler_bank_service.h"
+/**
+ * What is the maximum batch size we use for credit history
+ * requests with the bank. See `batch_size` below.
+ */
+#define MAXIMUM_BATCH_SIZE 1024
+
+
+/**
+ * Information about our work shard.
+ */
+struct Shard
+{
+
+ /**
+ * Time when we started to work on this shard.
+ */
+ struct GNUNET_TIME_Absolute shard_start_time;
+
+ /**
+ * Offset the shard begins at.
+ */
+ uint64_t shard_start;
+
+ /**
+ * Exclusive offset where the shard ends.
+ */
+ uint64_t shard_end;
+
+ /**
+ * Offset where our current batch begins.
+ */
+ uint64_t batch_start;
+
+ /**
+ * Highest row processed in the current batch.
+ */
+ uint64_t batch_end;
+
+};
+
/**
* Data we keep to #run_transfers(). There is at most
@@ -38,6 +78,18 @@ struct WirePrepareData
{
/**
+ * All transfers done in the same transaction
+ * are kept in a DLL.
+ */
+ struct WirePrepareData *next;
+
+ /**
+ * All transfers done in the same transaction
+ * are kept in a DLL.
+ */
+ struct WirePrepareData *prev;
+
+ /**
* Wire execution handle.
*/
struct TALER_BANK_TransferHandle *eh;
@@ -71,10 +123,21 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin;
static struct GNUNET_SCHEDULER_Task *task;
/**
- * If we are currently executing a transfer, information about
- * the active transfer is here. Otherwise, this variable is NULL.
+ * If we are currently executing transfers, information about
+ * the active transfers is here. Otherwise, this variable is NULL.
+ */
+static struct WirePrepareData *wpd_head;
+
+/**
+ * If we are currently executing transfers, information about
+ * the active transfers is here. Otherwise, this variable is NULL.
+ */
+static struct WirePrepareData *wpd_tail;
+
+/**
+ * Information about our work shard.
*/
-static struct WirePrepareData *wpd;
+static struct Shard *shard;
/**
* Handle to the context for interacting with the bank / wire gateway.
@@ -87,11 +150,6 @@ static struct GNUNET_CURL_Context *ctx;
static struct GNUNET_CURL_RescheduleContext *rc;
/**
- * How long should we sleep when idle before trying to find more work?
- */
-static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval;
-
-/**
* Value to return from main(). 0 on success, non-zero on errors.
*/
static int global_ret;
@@ -101,6 +159,54 @@ static int global_ret;
*/
static int test_mode;
+/**
+ * How long should we sleep when idle before trying to find more work?
+ * Also used for how long we wait to grab a shard before trying it again.
+ * The value should be set to a bit above the average time it takes to
+ * process a shard.
+ */
+static struct GNUNET_TIME_Relative transfer_idle_sleep_interval;
+
+/**
+ * How long did we take to finish the last shard?
+ */
+static struct GNUNET_TIME_Relative shard_delay;
+
+/**
+ * Modulus to apply to group shards. The shard size must ultimately be a
+ * multiple of the batch size. Thus, if this is not a multiple of the
+ * #MAXIMUM_BATCH_SIZE, the batch size will be set to the #shard_size.
+ */
+static unsigned int shard_size = MAXIMUM_BATCH_SIZE;
+
+/**
+ * How many workers should we plan our scheduling with?
+ */
+static unsigned int max_workers = 16;
+
+
+/**
+ * Clean up all active bank interactions.
+ */
+static void
+cleanup_wpd (void)
+{
+ struct WirePrepareData *wpd;
+
+ while (NULL != (wpd = wpd_head))
+ {
+ GNUNET_CONTAINER_DLL_remove (wpd_head,
+ wpd_tail,
+ wpd);
+ if (NULL != wpd->eh)
+ {
+ TALER_BANK_transfer_cancel (wpd->eh);
+ wpd->eh = NULL;
+ }
+ GNUNET_free (wpd);
+ }
+}
+
/**
* We're being aborted with CTRL-C (or SIGTERM). Shut down.
@@ -128,17 +234,9 @@ shutdown_task (void *cls)
GNUNET_SCHEDULER_cancel (task);
task = NULL;
}
- if (NULL != wpd)
- {
- if (NULL != wpd->eh)
- {
- TALER_BANK_transfer_cancel (wpd->eh);
- wpd->eh = NULL;
- }
- db_plugin->rollback (db_plugin->cls);
- GNUNET_free (wpd);
- wpd = NULL;
- }
+ cleanup_wpd ();
+ GNUNET_free (shard);
+ db_plugin->rollback (db_plugin->cls); /* just in case */
TALER_EXCHANGEDB_plugin_unload (db_plugin);
db_plugin = NULL;
TALER_EXCHANGEDB_unload_accounts ();
@@ -151,18 +249,18 @@ shutdown_task (void *cls)
*
* @return #GNUNET_OK on success
*/
-static int
-parse_wirewatch_config (void)
+static enum GNUNET_GenericReturnValue
+parse_transfer_config (void)
{
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_time (cfg,
"exchange",
- "AGGREGATOR_IDLE_SLEEP_INTERVAL",
- &aggregator_idle_sleep_interval))
+ "TRANSFER_IDLE_SLEEP_INTERVAL",
+ &transfer_idle_sleep_interval))
{
GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
"exchange",
- "AGGREGATOR_IDLE_SLEEP_INTERVAL");
+ "TRANSFER_IDLE_SLEEP_INTERVAL");
return GNUNET_SYSERR;
}
if (NULL ==
@@ -219,12 +317,21 @@ run_transfers (void *cls);
/**
+ * Select shard to process.
+ *
+ * @param cls NULL
+ */
+static void
+select_shard (void *cls);
+
+
+/**
* Function called with the result from the execute step.
* On success, we mark the respective wire transfer as finished,
* and in general we afterwards continue to #run_transfers(),
* except for irrecoverable errors.
*
- * @param cls NULL
+ * @param cls `struct WirePrepareData` we are working on
* @param http_status_code #MHD_HTTP_OK on success
* @param ec taler error code
* @param row_id unique ID of the wire transfer in the bank's records
@@ -237,15 +344,18 @@ wire_confirm_cb (void *cls,
uint64_t row_id,
struct GNUNET_TIME_Absolute wire_timestamp)
{
+ struct WirePrepareData *wpd = cls;
enum GNUNET_DB_QueryStatus qs;
- (void) cls;
(void) row_id;
(void) wire_timestamp;
wpd->eh = NULL;
switch (http_status_code)
{
case MHD_HTTP_OK:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Wire transfer %llu completed successfully\n",
+ (unsigned long long) wpd->row_id);
qs = db_plugin->wire_prepare_data_mark_finished (db_plugin->cls,
wpd->row_id);
/* continued below */
@@ -262,38 +372,43 @@ wire_confirm_cb (void *cls,
break;
default:
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Wire transaction failed: %u/%d\n",
+ "Wire transfer %llu failed: %u/%d\n",
+ (unsigned long long) wpd->row_id,
http_status_code,
ec);
db_plugin->rollback (db_plugin->cls);
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
- GNUNET_free (wpd);
- wpd = NULL;
return;
}
- if (0 >= qs)
+ shard->batch_end = GNUNET_MAX (wpd->row_id,
+ shard->batch_end);
+ switch (qs)
{
- GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ case GNUNET_DB_STATUS_SOFT_ERROR:
db_plugin->rollback (db_plugin->cls);
- if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
- {
- /* try again */
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&run_transfers,
- NULL);
- }
- else
- {
- global_ret = EXIT_FAILURE;
- GNUNET_SCHEDULER_shutdown ();
- }
- GNUNET_free (wpd);
- wpd = NULL;
+ cleanup_wpd ();
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&run_transfers,
+ NULL);
return;
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ db_plugin->rollback (db_plugin->cls);
+ cleanup_wpd ();
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ GNUNET_CONTAINER_DLL_remove (wpd_head,
+ wpd_tail,
+ wpd);
+ GNUNET_free (wpd);
+ break;
}
- GNUNET_free (wpd);
- wpd = NULL;
+ if (NULL != wpd_head)
+ return; /* wait for other queries to complete */
+ /* batch done */
switch (commit_or_warn ())
{
case GNUNET_DB_STATUS_SOFT_ERROR:
@@ -308,8 +423,9 @@ wire_confirm_cb (void *cls,
GNUNET_SCHEDULER_shutdown ();
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ shard->batch_start = shard->batch_end + 1;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Wire transfer complete\n");
+ "Batch complete\n");
/* continue with #run_transfers(), just to guard
against the unlikely case that there are more. */
GNUNET_assert (NULL == task);
@@ -343,6 +459,7 @@ wire_prepare_cb (void *cls,
size_t buf_size)
{
const struct TALER_EXCHANGEDB_AccountInfo *wa;
+ struct WirePrepareData *wpd;
(void) cls;
if ( (NULL == wire_method) ||
@@ -351,9 +468,14 @@ wire_prepare_cb (void *cls,
GNUNET_break (0);
db_plugin->rollback (db_plugin->cls);
global_ret = EXIT_FAILURE;
- goto cleanup;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
}
+ wpd = GNUNET_new (struct WirePrepareData);
wpd->row_id = rowid;
+ GNUNET_CONTAINER_DLL_insert (wpd_head,
+ wpd_tail,
+ wpd);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Starting wire transfer %llu\n",
(unsigned long long) rowid);
@@ -365,7 +487,8 @@ wire_prepare_cb (void *cls,
GNUNET_break (0);
db_plugin->rollback (db_plugin->cls);
global_ret = EXIT_NOTCONFIGURED;
- goto cleanup;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
}
wa = wpd->wa;
wpd->eh = TALER_BANK_transfer (ctx,
@@ -373,19 +496,15 @@ wire_prepare_cb (void *cls,
buf,
buf_size,
&wire_confirm_cb,
- NULL);
+ wpd);
if (NULL == wpd->eh)
{
GNUNET_break (0); /* Irrecoverable */
db_plugin->rollback (db_plugin->cls);
global_ret = EXIT_FAILURE;
- goto cleanup;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
}
- return;
-cleanup:
- GNUNET_SCHEDULER_shutdown ();
- GNUNET_free (wpd);
- wpd = NULL;
}
@@ -399,23 +518,55 @@ static void
run_transfers (void *cls)
{
enum GNUNET_DB_QueryStatus qs;
+ int64_t limit;
(void) cls;
task = NULL;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Checking for pending wire transfers\n");
- if (GNUNET_SYSERR ==
- db_plugin->preflight (db_plugin->cls))
+ limit = shard->shard_end - shard->batch_start;
+ if (0 >= limit)
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to obtain database connection!\n");
- global_ret = EXIT_FAILURE;
- GNUNET_SCHEDULER_shutdown ();
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Shard [%llu,%llu) completed\n",
+ (unsigned long long) shard->shard_start,
+ (unsigned long long) shard->batch_end);
+ qs = db_plugin->complete_shard (db_plugin->cls,
+ "transfer",
+ shard->shard_start,
+ shard->batch_end + 1);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ GNUNET_free (shard);
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Got DB soft error for complete_shard. Rolling back.\n");
+ GNUNET_free (shard);
+ task = GNUNET_SCHEDULER_add_now (&select_shard,
+ NULL);
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ /* already existed, ok, let's just continue */
+ break;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ /* normal case */
+ break;
+ }
+ shard_delay = GNUNET_TIME_absolute_get_duration (shard->shard_start_time);
+ GNUNET_free (shard);
+ task = GNUNET_SCHEDULER_add_now (&select_shard,
+ NULL);
return;
}
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Checking for %lld pending wire transfers [%llu-...)\n",
+ (long long) limit,
+ (unsigned long long) shard->batch_start);
if (GNUNET_OK !=
- db_plugin->start (db_plugin->cls,
- "aggregator run transfer"))
+ db_plugin->start_read_committed (db_plugin->cls,
+ "aggregator run transfer"))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start database transaction!\n");
@@ -423,30 +574,29 @@ run_transfers (void *cls)
GNUNET_SCHEDULER_shutdown ();
return;
}
- wpd = GNUNET_new (struct WirePrepareData);
qs = db_plugin->wire_prepare_data_get (db_plugin->cls,
+ shard->batch_start,
+ limit,
&wire_prepare_cb,
NULL);
- if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs)
- return; /* continued via continuation set in #wire_prepare_cb() */
- db_plugin->rollback (db_plugin->cls);
- GNUNET_free (wpd);
- wpd = NULL;
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
+ db_plugin->rollback (db_plugin->cls);
GNUNET_break (0);
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
case GNUNET_DB_STATUS_SOFT_ERROR:
/* try again */
+ db_plugin->rollback (db_plugin->cls);
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_transfers,
NULL);
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
/* no more prepared wire transfers, go sleep a bit! */
+ db_plugin->rollback (db_plugin->cls);
GNUNET_assert (NULL == task);
if (GNUNET_YES == test_mode)
{
@@ -458,15 +608,92 @@ run_transfers (void *cls)
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"No more pending wire transfers, going idle\n");
- task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval,
+ task = GNUNET_SCHEDULER_add_delayed (transfer_idle_sleep_interval,
&run_transfers,
NULL);
}
return;
+ default:
+ /* continued in wire_prepare_cb() */
+ return;
+ }
+}
+
+
+/**
+ * Select shard to process.
+ *
+ * @param cls NULL
+ */
+static void
+select_shard (void *cls)
+{
+ enum GNUNET_DB_QueryStatus qs;
+ struct GNUNET_TIME_Relative delay;
+ uint64_t start;
+ uint64_t end;
+
+ (void) cls;
+ task = NULL;
+ if (GNUNET_SYSERR ==
+ db_plugin->preflight (db_plugin->cls))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to obtain database connection!\n");
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ if (0 == max_workers)
+ delay = GNUNET_TIME_UNIT_ZERO;
+ else
+ delay.rel_value_us = GNUNET_CRYPTO_random_u64 (
+ GNUNET_CRYPTO_QUALITY_WEAK,
+ 4 * GNUNET_TIME_relative_max (
+ transfer_idle_sleep_interval,
+ GNUNET_TIME_relative_multiply (shard_delay,
+ max_workers)).rel_value_us);
+ qs = db_plugin->begin_shard (db_plugin->cls,
+ "transfer",
+ delay,
+ shard_size,
+ &start,
+ &end);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to obtain starting point for montoring from database!\n");
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ /* try again */
+ task = GNUNET_SCHEDULER_add_delayed (transfer_idle_sleep_interval,
+ &select_shard,
+ NULL);
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ GNUNET_break (0);
+ task = GNUNET_SCHEDULER_add_delayed (transfer_idle_sleep_interval,
+ &select_shard,
+ NULL);
+ return;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
- /* should be impossible */
- GNUNET_assert (0);
+ /* continued below */
+ break;
}
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Starting with shard [%llu,%llu)\n",
+ (unsigned long long) start,
+ (unsigned long long) end);
+ shard = GNUNET_new (struct Shard);
+ shard->shard_start_time = GNUNET_TIME_absolute_get ();
+ shard->shard_start = start;
+ shard->shard_end = end;
+ shard->batch_start = start;
+ task = GNUNET_SCHEDULER_add_now (&run_transfers,
+ NULL);
}
@@ -489,7 +716,7 @@ run (void *cls,
(void) cfgfile;
cfg = c;
- if (GNUNET_OK != parse_wirewatch_config ())
+ if (GNUNET_OK != parse_transfer_config ())
{
cfg = NULL;
global_ret = EXIT_NOTCONFIGURED;
@@ -503,9 +730,17 @@ run (void *cls,
GNUNET_break (0);
return;
}
-
+ if (GNUNET_SYSERR ==
+ db_plugin->preflight (db_plugin->cls))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to obtain database connection!\n");
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&run_transfers,
+ task = GNUNET_SCHEDULER_add_now (&select_shard,
NULL);
GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
cls);
@@ -524,12 +759,22 @@ main (int argc,
char *const *argv)
{
struct GNUNET_GETOPT_CommandLineOption options[] = {
+ GNUNET_GETOPT_option_uint ('S',
+ "size",
+ "SIZE",
+ "Size to process per shard (default: 1024)",
+ &shard_size),
GNUNET_GETOPT_option_timetravel ('T',
"timetravel"),
GNUNET_GETOPT_option_flag ('t',
"test",
"run in test mode and exit when idle",
&test_mode),
+ GNUNET_GETOPT_option_uint ('w',
+ "workers",
+ "COUNT",
+ "Plan work load with up to COUNT worker processes (default: 16)",
+ &max_workers),
GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION),
GNUNET_GETOPT_OPTION_END
};
diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c
index fb1fde31..6e2cd1ee 100644
--- a/src/exchange/taler-exchange-wirewatch.c
+++ b/src/exchange/taler-exchange-wirewatch.c
@@ -752,6 +752,7 @@ main (int argc,
"COUNT",
"Plan work load with up to COUNT worker processes (default: 16)",
&max_workers),
+ GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION),
GNUNET_GETOPT_OPTION_END
};
enum GNUNET_GenericReturnValue ret;
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c
index d66370a2..817c1a18 100644
--- a/src/exchangedb/plugin_exchangedb_postgres.c
+++ b/src/exchangedb/plugin_exchangedb_postgres.c
@@ -1188,11 +1188,12 @@ prepare_statements (struct PostgresClosure *pg)
",type"
",buf"
" FROM prewire"
- " WHERE finished=FALSE"
+ " WHERE prewire_uuid >= $1"
+ " AND finished=FALSE"
" AND failed=FALSE"
" ORDER BY prewire_uuid ASC"
- " LIMIT 1;",
- 0),
+ " LIMIT $2;",
+ 2),
/* Used in #postgres_select_deposits_missing_wire */
GNUNET_PQ_make_prepare ("deposits_get_overdue",
"SELECT"
@@ -6985,51 +6986,115 @@ postgres_wire_prepare_data_mark_failed (
/**
+ * Closure for #prewire_cb().
+ */
+struct PrewireContext
+{
+ /**
+ * Function to call on each result.
+ */
+ TALER_EXCHANGEDB_WirePreparationIterator cb;
+
+ /**
+ * Closure for @a cb.
+ */
+ void *cb_cls;
+
+ /**
+ * #GNUNET_OK if everything went fine.
+ */
+ enum GNUNET_GenericReturnValue status;
+};
+
+
+/**
+ * Invoke the callback for each result.
+ *
+ * @param cls a `struct MissingWireContext *`
+ * @param result SQL result
+ * @param num_results number of rows in @a result
+ */
+static void
+prewire_cb (void *cls,
+ PGresult *result,
+ unsigned int num_results)
+{
+ struct PrewireContext *pc = cls;
+
+ for (unsigned int i = 0; i < num_results; i++)
+ {
+ uint64_t prewire_uuid;
+ char *type;
+ void *buf = NULL;
+ size_t buf_size;
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ GNUNET_PQ_result_spec_uint64 ("prewire_uuid",
+ &prewire_uuid),
+ GNUNET_PQ_result_spec_string ("type",
+ &type),
+ GNUNET_PQ_result_spec_variable_size ("buf",
+ &buf,
+ &buf_size),
+ GNUNET_PQ_result_spec_end
+ };
+
+ if (GNUNET_OK !=
+ GNUNET_PQ_extract_result (result,
+ rs,
+ i))
+ {
+ GNUNET_break (0);
+ pc->status = GNUNET_SYSERR;
+ return;
+ }
+ pc->cb (pc->cb_cls,
+ prewire_uuid,
+ type,
+ buf,
+ buf_size);
+ GNUNET_PQ_cleanup_result (rs);
+ }
+}
+
+
+/**
* Function called to get an unfinished wire transfer
* preparation data. Fetches at most one item.
*
* @param cls closure
+ * @param start_row offset to query table at
+ * @param limit maximum number of results to return
* @param cb function to call for ONE unfinished item
* @param cb_cls closure for @a cb
* @return transaction status code
*/
static enum GNUNET_DB_QueryStatus
postgres_wire_prepare_data_get (void *cls,
+ uint64_t start_row,
+ uint64_t limit,
TALER_EXCHANGEDB_WirePreparationIterator cb,
void *cb_cls)
{
struct PostgresClosure *pg = cls;
- enum GNUNET_DB_QueryStatus qs;
struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_uint64 (&start_row),
+ GNUNET_PQ_query_param_uint64 (&limit),
GNUNET_PQ_query_param_end
};
- uint64_t prewire_uuid;
- char *type;
- void *buf = NULL;
- size_t buf_size;
- struct GNUNET_PQ_ResultSpec rs[] = {
- GNUNET_PQ_result_spec_uint64 ("prewire_uuid",
- &prewire_uuid),
- GNUNET_PQ_result_spec_string ("type",
- &type),
- GNUNET_PQ_result_spec_variable_size ("buf",
- &buf,
- &buf_size),
- GNUNET_PQ_result_spec_end
+ struct PrewireContext pc = {
+ .cb = cb,
+ .cb_cls = cb_cls,
+ .status = GNUNET_OK
};
+ enum GNUNET_DB_QueryStatus qs;
- qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
- "wire_prepare_data_get",
- params,
- rs);
- if (0 >= qs)
- return qs;
- cb (cb_cls,
- prewire_uuid,
- type,
- buf,
- buf_size);
- GNUNET_PQ_cleanup_result (rs);
+ qs = GNUNET_PQ_eval_prepared_multi_select (pg->conn,
+ "wire_prepare_data_get",
+ params,
+ &prewire_cb,
+ &pc);
+ if (GNUNET_OK != pc.status)
+ return GNUNET_DB_STATUS_HARD_ERROR;
return qs;
}
diff --git a/src/exchangedb/test_exchangedb.c b/src/exchangedb/test_exchangedb.c
index 8478fac0..b332cd6d 100644
--- a/src/exchangedb/test_exchangedb.c
+++ b/src/exchangedb/test_exchangedb.c
@@ -117,6 +117,8 @@ test_wire_prepare (void)
{
FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
plugin->wire_prepare_data_get (plugin->cls,
+ 0,
+ 1,
&dead_prepare_cb,
NULL));
FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
@@ -126,10 +128,14 @@ test_wire_prepare (void)
11));
FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->wire_prepare_data_get (plugin->cls,
+ 0,
+ 1,
&mark_prepare_cb,
NULL));
FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
plugin->wire_prepare_data_get (plugin->cls,
+ 0,
+ 1,
&dead_prepare_cb,
NULL));
return GNUNET_OK;
diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h
index 163b886c..4037ebac 100644
--- a/src/include/taler_exchangedb_plugin.h
+++ b/src/include/taler_exchangedb_plugin.h
@@ -2964,15 +2964,19 @@ struct TALER_EXCHANGEDB_Plugin
/**
* Function called to get an unfinished wire transfer
- * preparation data. Fetches at most one item.
+ * preparation data.
*
* @param cls closure
- * @param cb function to call for ONE unfinished item
+ * @param start_row offset to query table at
+ * @param limit maximum number of results to return
+ * @param cb function to call for unfinished work
* @param cb_cls closure for @a cb
* @return transaction status code
*/
enum GNUNET_DB_QueryStatus
(*wire_prepare_data_get)(void *cls,
+ uint64_t start_row,
+ uint64_t limit,
TALER_EXCHANGEDB_WirePreparationIterator cb,
void *cb_cls);
diff --git a/src/testing/testing_api_cmd_exec_transfer.c b/src/testing/testing_api_cmd_exec_transfer.c
index 2db445be..796f32d0 100644
--- a/src/testing/testing_api_cmd_exec_transfer.c
+++ b/src/testing/testing_api_cmd_exec_transfer.c
@@ -66,6 +66,9 @@ transfer_run (void *cls,
"taler-exchange-transfer",
"taler-exchange-transfer",
"-c", as->config_filename,
+ "-L", "INFO",
+ "-S", "1",
+ "-w", "0",
"-t", /* exit when done */
NULL);
if (NULL == as->transfer_proc)