summaryrefslogtreecommitdiff
path: root/src/exchange/taler-exchange-transfer.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/exchange/taler-exchange-transfer.c')
-rw-r--r--src/exchange/taler-exchange-transfer.c717
1 files changed, 531 insertions, 186 deletions
diff --git a/src/exchange/taler-exchange-transfer.c b/src/exchange/taler-exchange-transfer.c
index 1793dc988..9724b41fc 100644
--- a/src/exchange/taler-exchange-transfer.c
+++ b/src/exchange/taler-exchange-transfer.c
@@ -1,6 +1,6 @@
/*
This file is part of TALER
- Copyright (C) 2016-2020 Taler Systems SA
+ Copyright (C) 2016-2021 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free Software
@@ -27,6 +27,51 @@
#include "taler_json_lib.h"
#include "taler_bank_service.h"
+/**
+ * What is the default batch size we use for credit history
+ * requests with the bank. See `batch_size` below.
+ */
+#define DEFAULT_BATCH_SIZE (4 * 1024)
+
+/**
+ * How often will we retry a request (given certain
+ * HTTP status codes) before giving up?
+ */
+#define MAX_RETRIES 16
+
+/**
+ * Information about our work shard.
+ */
+struct Shard
+{
+
+ /**
+ * Time when we started to work on this shard.
+ */
+ struct GNUNET_TIME_Absolute shard_start_time;
+
+ /**
+ * Offset the shard begins at.
+ */
+ uint64_t shard_start;
+
+ /**
+ * Exclusive offset where the shard ends.
+ */
+ uint64_t shard_end;
+
+ /**
+ * Offset where our current batch begins.
+ */
+ uint64_t batch_start;
+
+ /**
+ * Highest row processed in the current batch.
+ */
+ uint64_t batch_end;
+
+};
+
/**
* Data we keep to #run_transfers(). There is at most
@@ -38,9 +83,16 @@ struct WirePrepareData
{
/**
- * Database session for all of our transactions.
+ * All transfers done in the same transaction
+ * are kept in a DLL.
+ */
+ struct WirePrepareData *next;
+
+ /**
+ * All transfers done in the same transaction
+ * are kept in a DLL.
*/
- struct TALER_EXCHANGEDB_Session *session;
+ struct WirePrepareData *prev;
/**
* Wire execution handle.
@@ -50,13 +102,24 @@ struct WirePrepareData
/**
* Wire account used for this preparation.
*/
- struct TALER_EXCHANGEDB_WireAccount *wa;
+ const struct TALER_EXCHANGEDB_AccountInfo *wa;
/**
* Row ID of the transfer.
*/
unsigned long long row_id;
+ /**
+ * Number of bytes allocated after this struct
+ * with the prewire data.
+ */
+ size_t buf_size;
+
+ /**
+ * How often did we retry so far?
+ */
+ unsigned int retries;
+
};
@@ -76,10 +139,21 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin;
static struct GNUNET_SCHEDULER_Task *task;
/**
- * If we are currently executing a transfer, information about
- * the active transfer is here. Otherwise, this variable is NULL.
+ * If we are currently executing transfers, information about
+ * the active transfers is here. Otherwise, this variable is NULL.
+ */
+static struct WirePrepareData *wpd_head;
+
+/**
+ * If we are currently executing transfers, information about
+ * the active transfers is here. Otherwise, this variable is NULL.
+ */
+static struct WirePrepareData *wpd_tail;
+
+/**
+ * Information about our work shard.
*/
-static struct WirePrepareData *wpd;
+static struct Shard *shard;
/**
* Handle to the context for interacting with the bank / wire gateway.
@@ -87,39 +161,71 @@ static struct WirePrepareData *wpd;
static struct GNUNET_CURL_Context *ctx;
/**
- * Scheduler context for running the @e ctx.
+ * Randomized back-off we use on serialization errors.
*/
-static struct GNUNET_CURL_RescheduleContext *rc;
+static struct GNUNET_TIME_Relative serialization_delay;
/**
- * How long should we sleep when idle before trying to find more work?
+ * Scheduler context for running the @e ctx.
*/
-static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval;
+static struct GNUNET_CURL_RescheduleContext *rc;
/**
* Value to return from main(). 0 on success, non-zero on errors.
*/
-static enum
-{
- GR_SUCCESS = 0,
- GR_WIRE_TRANSFER_FAILED = 1,
- GR_DATABASE_COMMIT_HARD_FAIL = 2,
- GR_INVARIANT_FAILURE = 3,
- GR_WIRE_ACCOUNT_NOT_CONFIGURED = 4,
- GR_WIRE_TRANSFER_BEGIN_FAIL = 5,
- GR_DATABASE_TRANSACTION_BEGIN_FAIL = 6,
- GR_DATABASE_SESSION_START_FAIL = 7,
- GR_CONFIGURATION_INVALID = 8,
- GR_CMD_LINE_UTF8_ERROR = 9,
- GR_CMD_LINE_OPTIONS_WRONG = 10,
- GR_DATABASE_FETCH_FAILURE = 11,
-} global_ret;
+static int global_ret;
/**
* #GNUNET_YES if we are in test mode and should exit when idle.
*/
static int test_mode;
+/**
+ * How long should we sleep when idle before trying to find more work?
+ * Also used for how long we wait to grab a shard before trying it again.
+ * The value should be set to a bit above the average time it takes to
+ * process a shard.
+ */
+static struct GNUNET_TIME_Relative transfer_idle_sleep_interval;
+
+/**
+ * How long did we take to finish the last shard?
+ */
+static struct GNUNET_TIME_Relative shard_delay;
+
+/**
+ * Size of the shards.
+ */
+static unsigned int shard_size = DEFAULT_BATCH_SIZE;
+
+/**
+ * How many workers should we plan our scheduling with?
+ */
+static unsigned int max_workers = 0;
+
+
+/**
+ * Clean up all active bank interactions.
+ */
+static void
+cleanup_wpd (void)
+{
+ struct WirePrepareData *wpd;
+
+ while (NULL != (wpd = wpd_head))
+ {
+ GNUNET_CONTAINER_DLL_remove (wpd_head,
+ wpd_tail,
+ wpd);
+ if (NULL != wpd->eh)
+ {
+ TALER_BANK_transfer_cancel (wpd->eh);
+ wpd->eh = NULL;
+ }
+ GNUNET_free (wpd);
+ }
+}
+
/**
* We're being aborted with CTRL-C (or SIGTERM). Shut down.
@@ -130,16 +236,6 @@ static void
shutdown_task (void *cls)
{
(void) cls;
- if (NULL != ctx)
- {
- GNUNET_CURL_fini (ctx);
- ctx = NULL;
- }
- if (NULL != rc)
- {
- GNUNET_CURL_gnunet_rc_destroy (rc);
- rc = NULL;
- }
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Running shutdown\n");
if (NULL != task)
@@ -147,42 +243,43 @@ shutdown_task (void *cls)
GNUNET_SCHEDULER_cancel (task);
task = NULL;
}
- if (NULL != wpd)
- {
- if (NULL != wpd->eh)
- {
- TALER_BANK_transfer_cancel (wpd->eh);
- wpd->eh = NULL;
- }
- db_plugin->rollback (db_plugin->cls,
- wpd->session);
- GNUNET_free (wpd);
- wpd = NULL;
- }
+ cleanup_wpd ();
+ GNUNET_free (shard);
+ db_plugin->rollback (db_plugin->cls); /* just in case */
TALER_EXCHANGEDB_plugin_unload (db_plugin);
db_plugin = NULL;
TALER_EXCHANGEDB_unload_accounts ();
cfg = NULL;
+ if (NULL != ctx)
+ {
+ GNUNET_CURL_fini (ctx);
+ ctx = NULL;
+ }
+ if (NULL != rc)
+ {
+ GNUNET_CURL_gnunet_rc_destroy (rc);
+ rc = NULL;
+ }
}
/**
- * Parse the configuration for wirewatch.
+ * Parse the configuration for taler-exchange-transfer.
*
* @return #GNUNET_OK on success
*/
-static int
-parse_wirewatch_config (void)
+static enum GNUNET_GenericReturnValue
+parse_transfer_config (void)
{
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_time (cfg,
"exchange",
- "AGGREGATOR_IDLE_SLEEP_INTERVAL",
- &aggregator_idle_sleep_interval))
+ "TRANSFER_IDLE_SLEEP_INTERVAL",
+ &transfer_idle_sleep_interval))
{
GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
"exchange",
- "AGGREGATOR_IDLE_SLEEP_INTERVAL");
+ "TRANSFER_IDLE_SLEEP_INTERVAL");
return GNUNET_SYSERR;
}
if (NULL ==
@@ -193,7 +290,9 @@ parse_wirewatch_config (void)
return GNUNET_SYSERR;
}
if (GNUNET_OK !=
- TALER_EXCHANGEDB_load_accounts (cfg))
+ TALER_EXCHANGEDB_load_accounts (cfg,
+ TALER_EXCHANGEDB_ALO_DEBIT
+ | TALER_EXCHANGEDB_ALO_AUTHDATA))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"No wire accounts configured for debit!\n");
@@ -208,18 +307,19 @@ parse_wirewatch_config (void)
/**
* Perform a database commit. If it fails, print a warning.
*
- * @param session session to perform the commit for.
* @return status of commit
*/
static enum GNUNET_DB_QueryStatus
-commit_or_warn (struct TALER_EXCHANGEDB_Session *session)
+commit_or_warn (void)
{
enum GNUNET_DB_QueryStatus qs;
- qs = db_plugin->commit (db_plugin->cls,
- session);
+ qs = db_plugin->commit (db_plugin->cls);
if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
+ {
+ serialization_delay = GNUNET_TIME_UNIT_ZERO;
return qs;
+ }
GNUNET_log ((GNUNET_DB_STATUS_SOFT_ERROR == qs)
? GNUNET_ERROR_TYPE_INFO
: GNUNET_ERROR_TYPE_ERROR,
@@ -238,100 +338,178 @@ static void
run_transfers (void *cls);
+static void
+run_transfers_delayed (void *cls)
+{
+ (void) cls;
+ shard->shard_start_time = GNUNET_TIME_absolute_get ();
+ run_transfers (NULL);
+}
+
+
+/**
+ * Select shard to process.
+ *
+ * @param cls NULL
+ */
+static void
+select_shard (void *cls);
+
+
+/**
+ * We are done with the current batch. Commit
+ * and move on.
+ */
+static void
+batch_done (void)
+{
+ /* batch done */
+ GNUNET_assert (NULL == wpd_head);
+ switch (commit_or_warn ())
+ {
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ /* try again */
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Serialization failure, trying again immediately!\n");
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&run_transfers,
+ NULL);
+ return;
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ shard->batch_start = shard->batch_end + 1;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Batch complete\n");
+ /* continue with #run_transfers(), just to guard
+ against the unlikely case that there are more. */
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&run_transfers,
+ NULL);
+ return;
+ default:
+ GNUNET_break (0);
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+}
+
+
/**
* Function called with the result from the execute step.
* On success, we mark the respective wire transfer as finished,
* and in general we afterwards continue to #run_transfers(),
* except for irrecoverable errors.
*
- * @param cls NULL
- * @param http_status_code #MHD_HTTP_OK on success
- * @param ec taler error code
- * @param row_id unique ID of the wire transfer in the bank's records
- * @param wire_timestamp when did the transfer happen
+ * @param cls `struct WirePrepareData` we are working on
+ * @param tr transfer response
*/
static void
wire_confirm_cb (void *cls,
- unsigned int http_status_code,
- enum TALER_ErrorCode ec,
- uint64_t row_id,
- struct GNUNET_TIME_Absolute wire_timestamp)
+ const struct TALER_BANK_TransferResponse *tr)
{
- struct TALER_EXCHANGEDB_Session *session = wpd->session;
+ struct WirePrepareData *wpd = cls;
enum GNUNET_DB_QueryStatus qs;
- (void) cls;
- (void) row_id;
- (void) wire_timestamp;
wpd->eh = NULL;
- if (MHD_HTTP_OK != http_status_code)
+ switch (tr->http_status)
{
+ case MHD_HTTP_OK:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Wire transfer %llu completed successfully\n",
+ (unsigned long long) wpd->row_id);
+ qs = db_plugin->wire_prepare_data_mark_finished (db_plugin->cls,
+ wpd->row_id);
+ /* continued below */
+ break;
+ case MHD_HTTP_NOT_FOUND:
+ case MHD_HTTP_CONFLICT:
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Wire transaction failed: %u/%d\n",
- http_status_code,
- ec);
- db_plugin->rollback (db_plugin->cls,
- session);
- global_ret = GR_WIRE_TRANSFER_FAILED;
- GNUNET_SCHEDULER_shutdown ();
- GNUNET_free (wpd);
- wpd = NULL;
- return;
- }
- qs = db_plugin->wire_prepare_data_mark_finished (db_plugin->cls,
- session,
+ "Wire transaction %llu failed: %u/%d\n",
+ (unsigned long long) wpd->row_id,
+ tr->http_status,
+ tr->ec);
+ qs = db_plugin->wire_prepare_data_mark_failed (db_plugin->cls,
wpd->row_id);
- if (0 >= qs)
- {
- GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
- db_plugin->rollback (db_plugin->cls,
- session);
- if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
- {
- /* try again */
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&run_transfers,
- NULL);
- }
- else
+ /* continued below */
+ break;
+ case 0:
+ case MHD_HTTP_TOO_MANY_REQUESTS:
+ case MHD_HTTP_INTERNAL_SERVER_ERROR:
+ case MHD_HTTP_BAD_GATEWAY:
+ case MHD_HTTP_SERVICE_UNAVAILABLE:
+ case MHD_HTTP_GATEWAY_TIMEOUT:
+ wpd->retries++;
+ if (wpd->retries < MAX_RETRIES)
{
- global_ret = GR_DATABASE_COMMIT_HARD_FAIL;
- GNUNET_SCHEDULER_shutdown ();
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Wire transfer %llu failed (%u), trying again\n",
+ (unsigned long long) wpd->row_id,
+ tr->http_status);
+ wpd->eh = TALER_BANK_transfer (ctx,
+ wpd->wa->auth,
+ &wpd[1],
+ wpd->buf_size,
+ &wire_confirm_cb,
+ wpd);
+ return;
}
- GNUNET_free (wpd);
- wpd = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Wire transaction %llu failed: %u/%d\n",
+ (unsigned long long) wpd->row_id,
+ tr->http_status,
+ tr->ec);
+ cleanup_wpd ();
+ db_plugin->rollback (db_plugin->cls);
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ default:
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Wire transfer %llu failed: %u/%d\n",
+ (unsigned long long) wpd->row_id,
+ tr->http_status,
+ tr->ec);
+ db_plugin->rollback (db_plugin->cls);
+ cleanup_wpd ();
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
return;
}
- GNUNET_free (wpd);
- wpd = NULL;
- switch (commit_or_warn (session))
+ shard->batch_end = GNUNET_MAX (wpd->row_id,
+ shard->batch_end);
+ switch (qs)
{
case GNUNET_DB_STATUS_SOFT_ERROR:
- /* try again */
+ db_plugin->rollback (db_plugin->cls);
+ cleanup_wpd ();
GNUNET_assert (NULL == task);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Serialization failure, trying again immediately!\n");
task = GNUNET_SCHEDULER_add_now (&run_transfers,
NULL);
return;
case GNUNET_DB_STATUS_HARD_ERROR:
- GNUNET_break (0);
- global_ret = GR_DATABASE_COMMIT_HARD_FAIL;
+ db_plugin->rollback (db_plugin->cls);
+ cleanup_wpd ();
+ global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Wire transfer complete\n");
- /* continue with #run_transfers(), just to guard
- against the unlikely case that there are more. */
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&run_transfers,
- NULL);
- return;
- default:
- GNUNET_break (0);
- global_ret = GR_INVARIANT_FAILURE;
- GNUNET_SCHEDULER_shutdown ();
- return;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ GNUNET_CONTAINER_DLL_remove (wpd_head,
+ wpd_tail,
+ wpd);
+ GNUNET_free (wpd);
+ break;
}
+ if (NULL != wpd_head)
+ return; /* wait for other queries to complete */
+ batch_done ();
}
@@ -352,19 +530,40 @@ wire_prepare_cb (void *cls,
const char *buf,
size_t buf_size)
{
- struct TALER_EXCHANGEDB_WireAccount *wa;
+ struct WirePrepareData *wpd;
(void) cls;
+ if ( (NULL != task) ||
+ (EXIT_SUCCESS != global_ret) )
+ return; /* current transaction was aborted */
+ if (rowid >= shard->shard_end)
+ {
+ /* skip */
+ shard->batch_end = shard->shard_end - 1;
+ if (NULL != wpd_head)
+ return;
+ batch_done ();
+ return;
+ }
if ( (NULL == wire_method) ||
(NULL == buf) )
{
GNUNET_break (0);
- db_plugin->rollback (db_plugin->cls,
- wpd->session);
- global_ret = GR_DATABASE_FETCH_FAILURE;
- goto cleanup;
+ db_plugin->rollback (db_plugin->cls);
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
}
+ wpd = GNUNET_malloc (sizeof (struct WirePrepareData)
+ + buf_size);
+ GNUNET_memcpy (&wpd[1],
+ buf,
+ buf_size);
+ wpd->buf_size = buf_size;
wpd->row_id = rowid;
+ GNUNET_CONTAINER_DLL_insert (wpd_head,
+ wpd_tail,
+ wpd);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Starting wire transfer %llu\n",
(unsigned long long) rowid);
@@ -374,31 +573,27 @@ wire_prepare_cb (void *cls,
/* Should really never happen here, as when we get
here the wire account should be in the cache. */
GNUNET_break (0);
- db_plugin->rollback (db_plugin->cls,
- wpd->session);
- global_ret = GR_WIRE_ACCOUNT_NOT_CONFIGURED;
- goto cleanup;
+ cleanup_wpd ();
+ db_plugin->rollback (db_plugin->cls);
+ global_ret = EXIT_NO_RESTART;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
}
- wa = wpd->wa;
wpd->eh = TALER_BANK_transfer (ctx,
- &wa->auth,
+ wpd->wa->auth,
buf,
buf_size,
&wire_confirm_cb,
- NULL);
+ wpd);
if (NULL == wpd->eh)
{
GNUNET_break (0); /* Irrecoverable */
- db_plugin->rollback (db_plugin->cls,
- wpd->session);
- global_ret = GR_WIRE_TRANSFER_BEGIN_FAIL;
- goto cleanup;
+ cleanup_wpd ();
+ db_plugin->rollback (db_plugin->cls);
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
}
- return;
-cleanup:
- GNUNET_SCHEDULER_shutdown ();
- GNUNET_free (wpd);
- wpd = NULL;
}
@@ -412,58 +607,98 @@ static void
run_transfers (void *cls)
{
enum GNUNET_DB_QueryStatus qs;
- struct TALER_EXCHANGEDB_Session *session;
+ int64_t limit;
(void) cls;
task = NULL;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Checking for pending wire transfers\n");
- if (NULL == (session = db_plugin->get_session (db_plugin->cls)))
+ limit = shard->shard_end - shard->batch_start;
+ if (0 >= limit)
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to obtain database session!\n");
- global_ret = GR_DATABASE_SESSION_START_FAIL;
- GNUNET_SCHEDULER_shutdown ();
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Shard [%llu,%llu) completed\n",
+ (unsigned long long) shard->shard_start,
+ (unsigned long long) shard->batch_end);
+ qs = db_plugin->complete_shard (db_plugin->cls,
+ "transfer",
+ shard->shard_start,
+ shard->batch_end + 1);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ GNUNET_free (shard);
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Got DB soft error for complete_shard. Rolling back.\n");
+ GNUNET_free (shard);
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&select_shard,
+ NULL);
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ /* already existed, ok, let's just continue */
+ break;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ /* normal case */
+ break;
+ }
+ shard_delay = GNUNET_TIME_absolute_get_duration (
+ shard->shard_start_time);
+ GNUNET_free (shard);
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&select_shard,
+ NULL);
return;
}
+ /* cap number of parallel connections to a reasonable
+ limit for concurrent requests to the bank */
+ limit = GNUNET_MIN (limit,
+ 256);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Checking for %lld pending wire transfers [%llu-...)\n",
+ (long long) limit,
+ (unsigned long long) shard->batch_start);
if (GNUNET_OK !=
- db_plugin->start (db_plugin->cls,
- session,
- "aggregator run transfer"))
+ db_plugin->start_read_committed (db_plugin->cls,
+ "aggregator run transfer"))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start database transaction!\n");
- global_ret = GR_DATABASE_TRANSACTION_BEGIN_FAIL;
+ global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
}
- wpd = GNUNET_new (struct WirePrepareData);
- wpd->session = session;
+ GNUNET_assert (NULL == task);
qs = db_plugin->wire_prepare_data_get (db_plugin->cls,
- session,
+ shard->batch_start,
+ limit,
&wire_prepare_cb,
NULL);
- if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs)
- return; /* continued via continuation set in #wire_prepare_cb() */
- db_plugin->rollback (db_plugin->cls,
- session);
- GNUNET_free (wpd);
- wpd = NULL;
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
+ cleanup_wpd ();
+ db_plugin->rollback (db_plugin->cls);
GNUNET_break (0);
- global_ret = GR_DATABASE_COMMIT_HARD_FAIL;
+ global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
case GNUNET_DB_STATUS_SOFT_ERROR:
/* try again */
+ db_plugin->rollback (db_plugin->cls);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Serialization failure, trying again immediately!\n");
+ cleanup_wpd ();
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_transfers,
NULL);
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
/* no more prepared wire transfers, go sleep a bit! */
+ db_plugin->rollback (db_plugin->cls);
+ GNUNET_assert (NULL == wpd_head);
GNUNET_assert (NULL == task);
if (GNUNET_YES == test_mode)
{
@@ -475,15 +710,105 @@ run_transfers (void *cls)
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"No more pending wire transfers, going idle\n");
- task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval,
- &run_transfers,
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_delayed (transfer_idle_sleep_interval,
+ &run_transfers_delayed,
+ NULL);
+ }
+ return;
+ default:
+ /* continued in wire_prepare_cb() */
+ return;
+ }
+}
+
+
+/**
+ * Select shard to process.
+ *
+ * @param cls NULL
+ */
+static void
+select_shard (void *cls)
+{
+ enum GNUNET_DB_QueryStatus qs;
+ struct GNUNET_TIME_Relative delay;
+ uint64_t start;
+ uint64_t end;
+
+ (void) cls;
+ task = NULL;
+ GNUNET_assert (NULL == wpd_head);
+ if (GNUNET_SYSERR ==
+ db_plugin->preflight (db_plugin->cls))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to obtain database connection!\n");
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ if (0 == max_workers)
+ delay = GNUNET_TIME_UNIT_ZERO;
+ else
+ delay.rel_value_us = GNUNET_CRYPTO_random_u64 (
+ GNUNET_CRYPTO_QUALITY_WEAK,
+ 4 * GNUNET_TIME_relative_max (
+ transfer_idle_sleep_interval,
+ GNUNET_TIME_relative_multiply (shard_delay,
+ max_workers)).rel_value_us);
+ qs = db_plugin->begin_shard (db_plugin->cls,
+ "transfer",
+ delay,
+ shard_size,
+ &start,
+ &end);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to obtain starting point for montoring from database!\n");
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ /* try again */
+ {
+ serialization_delay = GNUNET_TIME_randomized_backoff (serialization_delay,
+ GNUNET_TIME_UNIT_SECONDS);
+ GNUNET_assert (NULL == task);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Serialization failure, trying again in %s!\n",
+ GNUNET_TIME_relative2s (serialization_delay,
+ true));
+ task = GNUNET_SCHEDULER_add_delayed (serialization_delay,
+ &select_shard,
NULL);
}
return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ GNUNET_break (0);
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_delayed (transfer_idle_sleep_interval,
+ &select_shard,
+ NULL);
+ return;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
- /* should be impossible */
- GNUNET_assert (0);
+ /* continued below */
+ break;
}
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Starting with shard [%llu,%llu)\n",
+ (unsigned long long) start,
+ (unsigned long long) end);
+ shard = GNUNET_new (struct Shard);
+ shard->shard_start_time = GNUNET_TIME_absolute_get ();
+ shard->shard_start = start;
+ shard->shard_end = end;
+ shard->batch_start = start;
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&run_transfers,
+ NULL);
}
@@ -506,10 +831,10 @@ run (void *cls,
(void) cfgfile;
cfg = c;
- if (GNUNET_OK != parse_wirewatch_config ())
+ if (GNUNET_OK != parse_transfer_config ())
{
cfg = NULL;
- global_ret = GR_CONFIGURATION_INVALID;
+ global_ret = EXIT_NOTCONFIGURED;
return;
}
ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,
@@ -520,9 +845,17 @@ run (void *cls,
GNUNET_break (0);
return;
}
-
+ if (GNUNET_SYSERR ==
+ db_plugin->preflight (db_plugin->cls))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to obtain database connection!\n");
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&run_transfers,
+ task = GNUNET_SCHEDULER_add_now (&select_shard,
NULL);
GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
cls);
@@ -541,32 +874,44 @@ main (int argc,
char *const *argv)
{
struct GNUNET_GETOPT_CommandLineOption options[] = {
+ GNUNET_GETOPT_option_uint ('S',
+ "size",
+ "SIZE",
+ "Size to process per shard (default: 1024)",
+ &shard_size),
GNUNET_GETOPT_option_timetravel ('T',
"timetravel"),
GNUNET_GETOPT_option_flag ('t',
"test",
"run in test mode and exit when idle",
&test_mode),
+ GNUNET_GETOPT_option_uint ('w',
+ "workers",
+ "COUNT",
+ "Plan work load with up to COUNT worker processes (default: 16)",
+ &max_workers),
GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION),
GNUNET_GETOPT_OPTION_END
};
+ enum GNUNET_GenericReturnValue ret;
if (GNUNET_OK !=
GNUNET_STRINGS_get_utf8_args (argc, argv,
&argc, &argv))
- return GR_CMD_LINE_UTF8_ERROR;
- if (GNUNET_OK !=
- GNUNET_PROGRAM_run (argc, argv,
- "taler-exchange-transfer",
- gettext_noop (
- "background process that executes outgoing wire transfers"),
- options,
- &run, NULL))
- {
- GNUNET_free ((void *) argv);
- return GR_CMD_LINE_OPTIONS_WRONG;
- }
- GNUNET_free ((void *) argv);
+ return EXIT_INVALIDARGUMENT;
+ TALER_OS_init ();
+ ret = GNUNET_PROGRAM_run (
+ argc, argv,
+ "taler-exchange-transfer",
+ gettext_noop (
+ "background process that executes outgoing wire transfers"),
+ options,
+ &run, NULL);
+ GNUNET_free_nz ((void *) argv);
+ if (GNUNET_SYSERR == ret)
+ return EXIT_INVALIDARGUMENT;
+ if (GNUNET_NO == ret)
+ return EXIT_SUCCESS;
return global_ret;
}