summaryrefslogtreecommitdiff
path: root/src/exchange/taler-exchange-wirewatch.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/exchange/taler-exchange-wirewatch.c')
-rw-r--r--src/exchange/taler-exchange-wirewatch.c1061
1 files changed, 723 insertions, 338 deletions
diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c
index 47f2bdb6b..da5d9c098 100644
--- a/src/exchange/taler-exchange-wirewatch.c
+++ b/src/exchange/taler-exchange-wirewatch.c
@@ -1,6 +1,6 @@
/*
This file is part of TALER
- Copyright (C) 2016--2020 Taler Systems SA
+ Copyright (C) 2016--2023 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free Software
@@ -13,7 +13,6 @@
You should have received a copy of the GNU Affero General Public License along with
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
-
/**
* @file taler-exchange-wirewatch.c
* @brief Process that watches for wire transfers to the exchange's bank account
@@ -29,106 +28,125 @@
#include "taler_json_lib.h"
#include "taler_bank_service.h"
-#define DEBUG_LOGGING 0
/**
- * What is the initial batch size we use for credit history
+ * How long to wait for an HTTP reply if there
+ * are no transactions pending at the server?
+ */
+#define LONGPOLL_TIMEOUT GNUNET_TIME_UNIT_MINUTES
+
+/**
+ * What is the maximum batch size we use for credit history
* requests with the bank. See `batch_size` below.
*/
-#define INITIAL_BATCH_SIZE 1024
+#define MAXIMUM_BATCH_SIZE 1024
/**
- * Information we keep for each supported account.
+ * Information about our account.
*/
-struct WireAccount
-{
- /**
- * Accounts are kept in a DLL.
- */
- struct WireAccount *next;
+static const struct TALER_EXCHANGEDB_AccountInfo *ai;
- /**
- * Plugins are kept in a DLL.
- */
- struct WireAccount *prev;
+/**
+ * Active request for history.
+ */
+static struct TALER_BANK_CreditHistoryHandle *hh;
- /**
- * Name of the section that configures this account.
- */
- char *section_name;
+/**
+ * Set to true if the request for history did actually
+ * return transaction items.
+ */
+static bool hh_returned_data;
- /**
- * Database session we are using for the current transaction.
- */
- struct TALER_EXCHANGEDB_Session *session;
+/**
+ * Set to true if the request for history did not
+ * succeed because the account was unknown.
+ */
+static bool hh_account_404;
- /**
- * Active request for history.
- */
- struct TALER_BANK_CreditHistoryHandle *hh;
+/**
+ * When did we start the last @e hh request?
+ */
+static struct GNUNET_TIME_Absolute hh_start_time;
- /**
- * Authentication data.
- */
- struct TALER_BANK_AuthenticationData auth;
+/**
+ * Until when is processing this wire plugin delayed?
+ */
+static struct GNUNET_TIME_Absolute delayed_until;
- /**
- * Until when is processing this wire plugin delayed?
- */
- struct GNUNET_TIME_Absolute delayed_until;
+/**
+ * Encoded offset in the wire transfer list from where
+ * to start the next query with the bank.
+ */
+static uint64_t batch_start;
- /**
- * Encoded offset in the wire transfer list from where
- * to start the next query with the bank.
- */
- uint64_t last_row_off;
+/**
+ * Latest row offset seen in this transaction, becomes
+ * the new #batch_start upon commit.
+ */
+static uint64_t latest_row_off;
- /**
- * Latest row offset seen in this transaction, becomes
- * the new #last_row_off upon commit.
- */
- uint64_t latest_row_off;
+/**
+ * Offset where our current shard begins (inclusive).
+ */
+static uint64_t shard_start;
- /**
- * How many transactions do we retrieve per batch?
- */
- unsigned int batch_size;
+/**
+ * Offset where our current shard ends (exclusive).
+ */
+static uint64_t shard_end;
- /**
- * How many transactions did we see in the current batch?
- */
- unsigned int current_batch_size;
+/**
+ * When did we start with the shard?
+ */
+static struct GNUNET_TIME_Absolute shard_start_time;
- /**
- * Are we running from scratch and should re-process all transactions
- * for this account?
- */
- int reset_mode;
+/**
+ * For how long did we lock the shard?
+ */
+static struct GNUNET_TIME_Absolute shard_end_time;
- /**
- * Should we delay the next request to the wire plugin a bit? Set to
- * #GNUNET_NO if we actually did some work.
- */
- int delay;
+/**
+ * How long did we take to finish the last shard
+ * for this account?
+ */
+static struct GNUNET_TIME_Relative shard_delay;
-};
+/**
+ * How long did we take to finish the last shard
+ * for this account?
+ */
+static struct GNUNET_TIME_Relative longpoll_timeout;
+/**
+ * Name of our job in the shard table.
+ */
+static char *job_name;
/**
- * Head of list of loaded wire plugins.
+ * How many transactions do we retrieve per batch?
*/
-static struct WireAccount *wa_head;
+static unsigned int batch_size;
/**
- * Tail of list of loaded wire plugins.
+ * How much do we increment @e batch_size on success?
*/
-static struct WireAccount *wa_tail;
+static unsigned int batch_thresh;
/**
- * Wire account we are currently processing. This would go away
- * if we ever start processing all accounts in parallel.
+ * Did work remain in the transaction queue? Set to true
+ * if we did some work and thus there might be more.
*/
-static struct WireAccount *wa_pos;
+static bool progress;
+
+/**
+ * Did we start a transaction yet?
+ */
+static bool started_transaction;
+
+/**
+ * Is this shard still open for processing.
+ */
+static bool shard_open;
/**
* Handle to the context for interacting with the bank.
@@ -152,24 +170,39 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin;
/**
* How long should we sleep when idle before trying to find more work?
+ * Also used for how long we wait to grab a shard before trying it again.
+ * The value should be set to a bit above the average time it takes to
+ * process a shard.
*/
static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval;
/**
+ * How long do we sleep on serialization conflicts?
+ */
+static struct GNUNET_TIME_Relative wirewatch_conflict_sleep_interval;
+
+/**
+ * Modulus to apply to group shards. The shard size must ultimately be a
+ * multiple of the batch size. Thus, if this is not a multiple of the
+ * #MAXIMUM_BATCH_SIZE, the batch size will be set to the #shard_size.
+ */
+static unsigned int shard_size = MAXIMUM_BATCH_SIZE;
+
+/**
+ * How many workers should we plan our scheduling with?
+ */
+static unsigned int max_workers = 16;
+
+/**
+ * -e command-line option: exit on errors talking to the bank?
+ */
+static int exit_on_error;
+
+/**
* Value to return from main(). 0 on success, non-zero on
* on serious errors.
*/
-static enum
-{
- GR_SUCCESS = 0,
- GR_DATABASE_SESSION_FAIL = 1,
- GR_DATABASE_TRANSACTION_BEGIN_FAIL = 2,
- GR_DATABASE_SELECT_LATEST_HARD_FAIL = 3,
- GR_BANK_REQUEST_HISTORY_FAIL = 4,
- GR_CONFIGURATION_INVALID = 5,
- GR_CMD_LINE_UTF8_ERROR = 6,
- GR_CMD_LINE_OPTIONS_WRONG = 7,
-} global_ret;
+static int global_ret;
/**
* Are we run in testing mode and should only do one pass?
@@ -177,15 +210,20 @@ static enum
static int test_mode;
/**
- * Are we running from scratch and should re-process all transactions?
+ * Should we ignore if the bank does not know our bank
+ * account?
*/
-static int reset_mode;
+static int ignore_account_404;
/**
* Current task waiting for execution, if any.
*/
static struct GNUNET_SCHEDULER_Task *task;
+/**
+ * Name of the configuration section with the account we should watch.
+ */
+static char *account_section;
/**
* We're being aborted with CTRL-C (or SIGTERM). Shut down.
@@ -195,27 +233,32 @@ static struct GNUNET_SCHEDULER_Task *task;
static void
shutdown_task (void *cls)
{
+ enum GNUNET_DB_QueryStatus qs;
(void) cls;
- {
- struct WireAccount *wa;
- while (NULL != (wa = wa_head))
- {
- if (NULL != wa->hh)
- {
- TALER_BANK_credit_history_cancel (wa->hh);
- wa->hh = NULL;
- }
- GNUNET_CONTAINER_DLL_remove (wa_head,
- wa_tail,
- wa);
- TALER_BANK_auth_free (&wa->auth);
- GNUNET_free (wa->section_name);
- GNUNET_free (wa);
- }
+ if (NULL != hh)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "History request cancelled on shutdown\n");
+ TALER_BANK_credit_history_cancel (hh);
+ hh = NULL;
}
- wa_pos = NULL;
-
+ if (started_transaction)
+ {
+ db_plugin->rollback (db_plugin->cls);
+ started_transaction = false;
+ }
+ if (shard_open)
+ {
+ qs = db_plugin->abort_shard (db_plugin->cls,
+ job_name,
+ shard_start,
+ shard_end);
+ if (qs <= 0)
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to abort work shard on shutdown\n");
+ }
+ GNUNET_free (job_name);
if (NULL != ctx)
{
GNUNET_CURL_fini (ctx);
@@ -233,6 +276,8 @@ shutdown_task (void *cls)
}
TALER_EXCHANGEDB_plugin_unload (db_plugin);
db_plugin = NULL;
+ TALER_EXCHANGEDB_unload_accounts ();
+ cfg = NULL;
}
@@ -241,35 +286,36 @@ shutdown_task (void *cls)
* account to our list (if it is enabled and we can load the plugin).
*
* @param cls closure, NULL
- * @param ai account information
+ * @param in_ai account information
*/
static void
add_account_cb (void *cls,
- const struct TALER_EXCHANGEDB_AccountInfo *ai)
+ const struct TALER_EXCHANGEDB_AccountInfo *in_ai)
{
- struct WireAccount *wa;
-
(void) cls;
- if (GNUNET_YES != ai->credit_enabled)
+ if (! in_ai->credit_enabled)
return; /* not enabled for us, skip */
- wa = GNUNET_new (struct WireAccount);
- wa->reset_mode = reset_mode;
- if (GNUNET_OK !=
- TALER_BANK_auth_parse_cfg (cfg,
- ai->section_name,
- &wa->auth))
+ if ( (NULL != account_section) &&
+ (0 != strcasecmp (in_ai->section_name,
+ account_section)) )
+ return; /* not enabled for us, skip */
+ if (NULL != ai)
{
- GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
- "Failed to load account `%s'\n",
- ai->section_name);
- GNUNET_free (wa);
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Multiple accounts enabled (%s and %s), use '-a' command-line option to select one!\n",
+ ai->section_name,
+ in_ai->section_name);
+ GNUNET_SCHEDULER_shutdown ();
+ global_ret = EXIT_INVALIDARGUMENT;
return;
}
- wa->section_name = GNUNET_strdup (ai->section_name);
- wa->batch_size = INITIAL_BATCH_SIZE;
- GNUNET_CONTAINER_DLL_insert (wa_head,
- wa_tail,
- wa);
+ ai = in_ai;
+ GNUNET_asprintf (&job_name,
+ "wirewatch-%s",
+ ai->section_name);
+ batch_size = MAXIMUM_BATCH_SIZE;
+ if (0 != shard_size % batch_size)
+ batch_size = shard_size;
}
@@ -279,7 +325,7 @@ add_account_cb (void *cls,
*
* @return #GNUNET_OK on success
*/
-static int
+static enum GNUNET_GenericReturnValue
exchange_serve_process_config (void)
{
if (GNUNET_OK !=
@@ -296,18 +342,26 @@ exchange_serve_process_config (void)
if (NULL ==
(db_plugin = TALER_EXCHANGEDB_plugin_load (cfg)))
{
- fprintf (stderr,
- "Failed to initialize DB subsystem\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to initialize DB subsystem\n");
+ return GNUNET_SYSERR;
+ }
+ if (GNUNET_OK !=
+ TALER_EXCHANGEDB_load_accounts (cfg,
+ TALER_EXCHANGEDB_ALO_CREDIT
+ | TALER_EXCHANGEDB_ALO_AUTHDATA))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "No wire accounts configured for credit!\n");
return GNUNET_SYSERR;
}
- TALER_EXCHANGEDB_find_accounts (cfg,
- &add_account_cb,
+ TALER_EXCHANGEDB_find_accounts (&add_account_cb,
NULL);
- if (NULL == wa_head)
+ if (NULL == ai)
{
- fprintf (stderr,
- "No wire accounts configured for credit!\n");
- TALER_EXCHANGEDB_plugin_unload (db_plugin);
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "No accounts enabled for credit!\n");
+ GNUNET_SCHEDULER_shutdown ();
return GNUNET_SYSERR;
}
return GNUNET_OK;
@@ -315,248 +369,550 @@ exchange_serve_process_config (void)
/**
- * Query for incoming wire transfers.
+ * Lock a shard and then begin to query for incoming wire transfers.
*
* @param cls NULL
*/
static void
-find_transfers (void *cls);
+lock_shard (void *cls);
/**
- * Callbacks of this type are used to serve the result of asking
- * the bank for the transaction history.
+ * Continue with the credit history of the shard.
*
- * @param cls closure with the `struct WioreAccount *` we are processing
- * @param http_status HTTP status code from the server
- * @param ec taler error code
- * @param serial_id identification of the position at which we are querying
- * @param details details about the wire transfer
- * @param json raw JSON response
- * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
- */
-static int
-history_cb (void *cls,
- unsigned int http_status,
- enum TALER_ErrorCode ec,
- uint64_t serial_id,
- const struct TALER_BANK_CreditDetails *details,
- const json_t *json)
+ * @param cls NULL
+ */
+static void
+continue_with_shard (void *cls);
+
+
+/**
+ * We encountered a serialization error. Rollback the transaction and try
+ * again.
+ */
+static void
+handle_soft_error (void)
+{
+ db_plugin->rollback (db_plugin->cls);
+ started_transaction = false;
+ if (1 < batch_size)
+ {
+ batch_thresh = batch_size;
+ batch_size /= 2;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Reduced batch size to %llu due to serialization issue\n",
+ (unsigned long long) batch_size);
+ }
+ /* Reset to beginning of transaction, and go again
+ from there. */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Encountered soft error, resetting start point to batch start\n");
+ latest_row_off = batch_start;
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
+ NULL);
+}
+
+
+/**
+ * Schedule the #lock_shard() operation.
+ */
+static void
+schedule_transfers (void)
+{
+ if (shard_open)
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Will retry my shard (%llu,%llu] of %s in %s\n",
+ (unsigned long long) shard_start,
+ (unsigned long long) shard_end,
+ job_name,
+ GNUNET_STRINGS_relative_time_to_string (
+ GNUNET_TIME_absolute_get_remaining (delayed_until),
+ true));
+ else
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Will try to lock next shard of %s in %s\n",
+ job_name,
+ GNUNET_STRINGS_relative_time_to_string (
+ GNUNET_TIME_absolute_get_remaining (delayed_until),
+ true));
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_at (delayed_until,
+ &lock_shard,
+ NULL);
+}
+
+
+/**
+ * We are done with the work that is possible right now (and the transaction
+ * was committed, if there was one to commit). Move on to the next shard.
+ */
+static void
+transaction_completed (void)
+{
+ if ( (batch_start + batch_size ==
+ latest_row_off) &&
+ (batch_size < MAXIMUM_BATCH_SIZE) )
+ {
+ /* The current batch size worked without serialization
+ issues, and we are allowed to grow. Do so slowly. */
+ int delta;
+
+ delta = ((int) batch_thresh - (int) batch_size) / 4;
+ if (delta < 0)
+ delta = -delta;
+ batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE,
+ batch_size + delta + 1);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Increasing batch size to %llu\n",
+ (unsigned long long) batch_size);
+ }
+
+ if ( (! progress) && test_mode)
+ {
+ /* Transaction list was drained and we are in
+ test mode. So we are done. */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Transaction list drained and in test mode. Exiting\n");
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ if (! (hh_returned_data || hh_account_404) )
+ {
+ /* Enforce long-polling delay even if the server ignored it
+ and returned earlier */
+ struct GNUNET_TIME_Relative latency;
+ struct GNUNET_TIME_Relative left;
+
+ latency = GNUNET_TIME_absolute_get_duration (hh_start_time);
+ left = GNUNET_TIME_relative_subtract (longpoll_timeout,
+ latency);
+ if (! (test_mode ||
+ GNUNET_TIME_relative_is_zero (left)) )
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Server did not respect long-polling, enforcing client-side by sleeping for %s\n",
+ GNUNET_TIME_relative2s (left,
+ true));
+ delayed_until = GNUNET_TIME_relative_to_absolute (left);
+ }
+ if (hh_account_404)
+ delayed_until = GNUNET_TIME_relative_to_absolute (
+ GNUNET_TIME_UNIT_MILLISECONDS);
+ if (test_mode)
+ delayed_until = GNUNET_TIME_UNIT_ZERO_ABS;
+ GNUNET_assert (NULL == task);
+ schedule_transfers ();
+}
+
+
+/**
+ * We got incoming transaction details from the bank. Add them
+ * to the database.
+ *
+ * @param details array of transaction details
+ * @param details_length length of the @a details array
+ */
+static void
+process_reply (const struct TALER_BANK_CreditDetails *details,
+ unsigned int details_length)
{
- struct WireAccount *wa = cls;
- struct TALER_EXCHANGEDB_Session *session = wa->session;
enum GNUNET_DB_QueryStatus qs;
+ bool shard_done;
+ uint64_t lroff = latest_row_off;
- (void) json;
- if (NULL == details)
+ if (0 == details_length)
{
- wa->hh = NULL;
- if (TALER_EC_NONE != ec)
+ /* Server should have used 204, not 200! */
+ GNUNET_break_op (0);
+ transaction_completed ();
+ return;
+ }
+ hh_returned_data = true;
+ /* check serial IDs for range constraints */
+ for (unsigned int i = 0; i<details_length; i++)
+ {
+ const struct TALER_BANK_CreditDetails *cd = &details[i];
+
+ if (cd->serial_id < lroff)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Error fetching history: ec=%u, http_status=%u\n",
- (unsigned int) ec,
- http_status);
+ "Serial ID %llu not monotonic (got %llu before). Failing!\n",
+ (unsigned long long) cd->serial_id,
+ (unsigned long long) lroff);
+ db_plugin->rollback (db_plugin->cls);
+ GNUNET_SCHEDULER_shutdown ();
+ return;
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "End of list. Committing progress!\n");
- qs = db_plugin->commit (db_plugin->cls,
- session);
- if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
+ if (cd->serial_id > shard_end)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Got DB soft error for commit\n");
- /* reduce transaction size to reduce rollback probability */
- if (2 > wa->current_batch_size)
- wa->current_batch_size /= 2;
- /* try again */
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&find_transfers,
- NULL);
- return GNUNET_OK; /* will be ignored anyway */
+ /* we are *past* the current shard (likely because the serial_id of the
+ shard_end happens to not exist in the DB). So commit and stop this
+ iteration! */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Serial ID %llu past shard end at %llu, ending iteration early!\n",
+ (unsigned long long) cd->serial_id,
+ (unsigned long long) shard_end);
+ details_length = i;
+ progress = true;
+ lroff = cd->serial_id - 1;
+ break;
}
- if (0 < qs)
+ lroff = cd->serial_id;
+ }
+ if (0 != details_length)
+ {
+ enum GNUNET_DB_QueryStatus qss[details_length];
+ struct TALER_EXCHANGEDB_ReserveInInfo reserves[details_length];
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Importing %u transactions\n",
+ details_length);
+ for (unsigned int i = 0; i<details_length; i++)
{
- /* transaction success, update #last_row_off */
- wa->last_row_off = wa->latest_row_off;
- wa->latest_row_off = 0; /* should not be needed */
- wa->session = NULL; /* should not be needed */
- /* if successful at limit, try increasing transaction batch size (AIMD) */
- if ( (wa->current_batch_size == wa->batch_size) &&
- (UINT_MAX > wa->batch_size) )
- wa->batch_size++;
+ const struct TALER_BANK_CreditDetails *cd = &details[i];
+ struct TALER_EXCHANGEDB_ReserveInInfo *res = &reserves[i];
+
+ res->reserve_pub = &cd->reserve_pub;
+ res->balance = &cd->amount;
+ res->execution_time = cd->execution_date;
+ res->sender_account_details = cd->debit_account_uri;
+ res->exchange_account_name = ai->section_name;
+ res->wire_reference = cd->serial_id;
}
- GNUNET_break (0 <= qs);
- if ( (GNUNET_YES == wa->delay) &&
- (test_mode) &&
- (NULL == wa->next) )
+ qs = db_plugin->reserves_in_insert (db_plugin->cls,
+ reserves,
+ details_length,
+ qss);
+ switch (qs)
{
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Shutdown due to test mode!\n");
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
GNUNET_SCHEDULER_shutdown ();
- return GNUNET_OK;
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Got DB soft error for reserves_in_insert (%u). Rolling back.\n",
+ details_length);
+ handle_soft_error ();
+ return;
+ default:
+ break;
}
- if (GNUNET_YES == wa->delay)
+ for (unsigned int i = 0; i<details_length; i++)
{
- wa->delayed_until
- = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval);
- wa_pos = wa_pos->next;
- if (NULL == wa_pos)
- wa_pos = wa_head;
- GNUNET_assert (NULL != wa_pos);
+ const struct TALER_BANK_CreditDetails *cd = &details[i];
+
+ switch (qss[i])
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Got DB soft error for batch_reserves_in_insert(%u). Rolling back.\n",
+ i);
+ handle_soft_error ();
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ /* Either wirewatch was freshly started after the system was
+ shutdown and we're going over an incomplete shard again
+ after being restarted, or the shard lock period was too
+ short (number of workers set incorrectly?) and a 2nd
+ wirewatcher has been stealing our work while we are still
+ at it. */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Attempted to import transaction %llu (%s) twice. "
+ "This should happen rarely (if not, ask for support).\n",
+ (unsigned long long) cd->serial_id,
+ job_name);
+ break;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Imported transaction %llu.\n",
+ (unsigned long long) cd->serial_id);
+ /* normal case */
+ progress = true;
+ break;
+ }
}
- task = GNUNET_SCHEDULER_add_at (wa_pos->delayed_until,
- &find_transfers,
- NULL);
- return GNUNET_OK; /* will be ignored anyway */
}
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Adding wire transfer over %s with (hashed) subject `%s'\n",
- TALER_amount2s (&details->amount),
- TALER_B2S (&details->reserve_pub));
-
- /**
- * Debug block.
- */
-#if DEBUG_LOGGING
+
+ latest_row_off = lroff;
+ shard_done = (shard_end <= latest_row_off);
+ if (shard_done)
{
- /** Should be 53, give 80 just to be extra conservative (and aligned). */
-#define PUBSIZE 80
- char wtid_s[PUBSIZE];
-
- GNUNET_break (NULL !=
- GNUNET_STRINGS_data_to_string (&details->reserve_pub,
- sizeof (details->reserve_pub),
- &wtid_s[0],
- PUBSIZE));
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Plain text subject (= reserve_pub): %s\n",
- wtid_s);
+ /* shard is complete, mark this as well */
+ qs = db_plugin->complete_shard (db_plugin->cls,
+ job_name,
+ shard_start,
+ shard_end);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Got DB soft error for complete_shard. Rolling back.\n");
+ handle_soft_error ();
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ GNUNET_break (0);
+ /* Not expected, but let's just continue */
+ break;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ /* normal case */
+ progress = true;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Completed shard %s (%llu,%llu] after %s\n",
+ job_name,
+ (unsigned long long) shard_start,
+ (unsigned long long) shard_end,
+ GNUNET_STRINGS_relative_time_to_string (
+ GNUNET_TIME_absolute_get_duration (shard_start_time),
+ true));
+ break;
+ }
+ shard_delay = GNUNET_TIME_absolute_get_duration (shard_start_time);
+ shard_open = false;
+ transaction_completed ();
+ return;
}
-#endif
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
+ NULL);
+}
+
- if (wa->current_batch_size < UINT_MAX)
- wa->current_batch_size++;
- qs = db_plugin->reserves_in_insert (db_plugin->cls,
- session,
- &details->reserve_pub,
- &details->amount,
- details->execution_date,
- details->debit_account_url,
- wa->section_name,
- serial_id);
- if (GNUNET_DB_STATUS_HARD_ERROR == qs)
+/**
+ * Callbacks of this type are used to serve the result of asking
+ * the bank for the transaction history.
+ *
+ * @param cls NULL
+ * @param reply response we got from the bank
+ */
+static void
+history_cb (void *cls,
+ const struct TALER_BANK_CreditHistoryResponse *reply)
+{
+ (void) cls;
+ GNUNET_assert (NULL == task);
+ hh = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "History request returned with HTTP status %u\n",
+ reply->http_status);
+ switch (reply->http_status)
{
- GNUNET_break (0);
- db_plugin->rollback (db_plugin->cls,
- session);
- GNUNET_SCHEDULER_shutdown ();
- return GNUNET_SYSERR;
+ case MHD_HTTP_OK:
+ process_reply (reply->details.ok.details,
+ reply->details.ok.details_length);
+ return;
+ case MHD_HTTP_NO_CONTENT:
+ transaction_completed ();
+ return;
+ case MHD_HTTP_NOT_FOUND:
+ hh_account_404 = true;
+ if (ignore_account_404)
+ {
+ transaction_completed ();
+ return;
+ }
+ break;
+ default:
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Error fetching history: %s (%u)\n",
+ TALER_ErrorCode_get_hint (reply->ec),
+ reply->http_status);
+ break;
}
- if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
+ if (! exit_on_error)
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Got DB soft error for reserves_in_insert. Rolling back.\n");
- db_plugin->rollback (db_plugin->cls,
- session);
- /* try again */
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&find_transfers,
- NULL);
- return GNUNET_SYSERR;
+ transaction_completed ();
+ return;
+ }
+ GNUNET_SCHEDULER_shutdown ();
+}
+
+
+static void
+continue_with_shard (void *cls)
+{
+ unsigned int limit;
+
+ (void) cls;
+ task = NULL;
+ GNUNET_assert (shard_end > latest_row_off);
+ limit = GNUNET_MIN (batch_size,
+ shard_end - latest_row_off);
+ GNUNET_assert (NULL == hh);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Requesting credit history starting from %llu\n",
+ (unsigned long long) latest_row_off);
+ hh_start_time = GNUNET_TIME_absolute_get ();
+ hh_returned_data = false;
+ hh_account_404 = false;
+ hh = TALER_BANK_credit_history (ctx,
+ ai->auth,
+ latest_row_off,
+ limit,
+ test_mode
+ ? GNUNET_TIME_UNIT_ZERO
+ : longpoll_timeout,
+ &history_cb,
+ NULL);
+ if (NULL == hh)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to start request for account history!\n");
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
}
- wa->delay = GNUNET_NO;
- wa->latest_row_off = serial_id;
- return GNUNET_OK;
}
/**
- * Query for incoming wire transfers.
+ * Reserve a shard for us to work on.
*
* @param cls NULL
*/
static void
-find_transfers (void *cls)
+lock_shard (void *cls)
{
- struct TALER_EXCHANGEDB_Session *session;
enum GNUNET_DB_QueryStatus qs;
+ struct GNUNET_TIME_Relative delay;
+ uint64_t last_shard_start = shard_start;
+ uint64_t last_shard_end = shard_end;
(void) cls;
task = NULL;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Checking for incoming wire transfers\n");
- if (NULL == (session = db_plugin->get_session (db_plugin->cls)))
+ if (GNUNET_SYSERR ==
+ db_plugin->preflight (db_plugin->cls))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to obtain database session!\n");
- global_ret = GR_DATABASE_SESSION_FAIL;
+ "Failed to obtain database connection!\n");
+ global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
}
- db_plugin->preflight (db_plugin->cls,
- session);
- if (GNUNET_OK !=
- db_plugin->start (db_plugin->cls,
- session,
- "wirewatch check for incoming wire transfers"))
+ if ( (shard_open) &&
+ (GNUNET_TIME_absolute_is_future (shard_end_time)) )
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to start database transaction!\n");
- global_ret = GR_DATABASE_TRANSACTION_BEGIN_FAIL;
- GNUNET_SCHEDULER_shutdown ();
+ progress = false;
+ batch_start = latest_row_off;
+ task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
+ NULL);
return;
}
- if (! wa_pos->reset_mode)
+ if (shard_open)
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Shard not completed in time, will try to re-acquire\n");
+ /* How long we lock a shard depends on the number of
+ workers expected, and how long we usually took to
+ process a shard. */
+ if (0 == max_workers)
+ delay = GNUNET_TIME_UNIT_ZERO;
+ else
+ delay.rel_value_us = GNUNET_CRYPTO_random_u64 (
+ GNUNET_CRYPTO_QUALITY_WEAK,
+ 4 * GNUNET_TIME_relative_max (
+ wirewatch_idle_sleep_interval,
+ GNUNET_TIME_relative_multiply (shard_delay,
+ max_workers)).rel_value_us);
+ shard_start_time = GNUNET_TIME_absolute_get ();
+ qs = db_plugin->begin_shard (db_plugin->cls,
+ job_name,
+ delay,
+ shard_size,
+ &shard_start,
+ &shard_end);
+ switch (qs)
{
- qs = db_plugin->get_latest_reserve_in_reference (db_plugin->cls,
- session,
- wa_pos->section_name,
- &wa_pos->last_row_off);
- if (GNUNET_DB_STATUS_HARD_ERROR == qs)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to obtain starting point for montoring from database!\n");
- db_plugin->rollback (db_plugin->cls,
- session);
- global_ret = GR_DATABASE_SELECT_LATEST_HARD_FAIL;
- GNUNET_SCHEDULER_shutdown ();
- return;
- }
- if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to obtain starting point for montoring from database!\n");
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ /* try again */
{
- /* try again */
- db_plugin->rollback (db_plugin->cls,
- session);
- task = GNUNET_SCHEDULER_add_now (&find_transfers,
- NULL);
- return;
+ struct GNUNET_TIME_Relative rdelay;
+
+ wirewatch_conflict_sleep_interval
+ = GNUNET_TIME_STD_BACKOFF (wirewatch_conflict_sleep_interval);
+ rdelay = GNUNET_TIME_randomize (wirewatch_conflict_sleep_interval);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Serialization error tying to obtain shard %s, will try again in %s!\n",
+ job_name,
+ GNUNET_STRINGS_relative_time_to_string (rdelay,
+ true));
+#if 1
+ if (GNUNET_TIME_relative_cmp (rdelay,
+ >,
+ GNUNET_TIME_UNIT_SECONDS))
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Delay would have been for %s\n",
+ GNUNET_TIME_relative2s (rdelay,
+ true));
+ rdelay = GNUNET_TIME_relative_min (rdelay,
+ GNUNET_TIME_UNIT_SECONDS);
+#endif
+ delayed_until = GNUNET_TIME_relative_to_absolute (rdelay);
}
- wa_pos->reset_mode = GNUNET_NO;
+ GNUNET_assert (NULL == task);
+ schedule_transfers ();
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ GNUNET_break (0);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "No shard available, will try again for %s in %s!\n",
+ job_name,
+ GNUNET_STRINGS_relative_time_to_string (
+ wirewatch_idle_sleep_interval,
+ true));
+ delayed_until = GNUNET_TIME_relative_to_absolute (
+ wirewatch_idle_sleep_interval);
+ shard_open = false;
+ GNUNET_assert (NULL == task);
+ schedule_transfers ();
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ /* continued below */
+ wirewatch_conflict_sleep_interval = GNUNET_TIME_UNIT_ZERO;
+ break;
}
- wa_pos->delay = GNUNET_YES;
- wa_pos->current_batch_size = 0; /* reset counter */
-
+ shard_end_time = GNUNET_TIME_relative_to_absolute (delay);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "wirewatch: requesting incoming history from %s\n",
- wa_pos->auth.wire_gateway_url);
- wa_pos->session = session;
- wa_pos->hh = TALER_BANK_credit_history (ctx,
- &wa_pos->auth,
- wa_pos->last_row_off,
- wa_pos->batch_size,
- &history_cb,
- wa_pos);
- if (NULL == wa_pos->hh)
+ "Starting with shard %s at (%llu,%llu] locked for %s\n",
+ job_name,
+ (unsigned long long) shard_start,
+ (unsigned long long) shard_end,
+ GNUNET_STRINGS_relative_time_to_string (delay,
+ true));
+ progress = false;
+ batch_start = shard_start;
+ if ( (shard_open) &&
+ (shard_start == last_shard_start) &&
+ (shard_end == last_shard_end) )
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to start request for account history!\n");
- db_plugin->rollback (db_plugin->cls,
- session);
- global_ret = GR_BANK_REQUEST_HISTORY_FAIL;
- GNUNET_SCHEDULER_shutdown ();
- return;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Continuing from %llu\n",
+ (unsigned long long) latest_row_off);
+ GNUNET_break (latest_row_off >= batch_start); /* resume where we left things */
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Resetting shard start to original start point (%d)\n",
+ shard_open ? 1 : 0);
+ latest_row_off = batch_start;
}
+ shard_open = true;
+ task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
+ NULL);
}
@@ -577,27 +933,28 @@ run (void *cls,
(void) cls;
(void) args;
(void) cfgfile;
+
cfg = c;
+ GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
+ cls);
if (GNUNET_OK !=
exchange_serve_process_config ())
{
- global_ret = GR_CONFIGURATION_INVALID;
+ global_ret = EXIT_NOTCONFIGURED;
+ GNUNET_SCHEDULER_shutdown ();
return;
}
- wa_pos = wa_head;
- GNUNET_assert (NULL != wa_pos);
- task = GNUNET_SCHEDULER_add_now (&find_transfers,
- NULL);
- GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
- cls);
ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,
&rc);
- rc = GNUNET_CURL_gnunet_rc_create (ctx);
if (NULL == ctx)
{
GNUNET_break (0);
+ GNUNET_SCHEDULER_shutdown ();
+ global_ret = EXIT_NO_RESTART;
return;
}
+ rc = GNUNET_CURL_gnunet_rc_create (ctx);
+ schedule_transfers ();
}
@@ -613,35 +970,63 @@ main (int argc,
char *const *argv)
{
struct GNUNET_GETOPT_CommandLineOption options[] = {
+ GNUNET_GETOPT_option_string ('a',
+ "account",
+ "SECTION_NAME",
+ "name of the configuration section with the account we should watch (needed if more than one is enabled for crediting)",
+ &account_section),
+ GNUNET_GETOPT_option_flag ('e',
+ "exit-on-error",
+ "terminate wirewatch if we failed to download information from the bank",
+ &exit_on_error),
+ GNUNET_GETOPT_option_relative_time ('f',
+ "longpoll-timeout",
+ "DELAY",
+ "what is the timeout when asking the bank about new transactions, specify with unit (e.g. --longpoll-timeout=30s)",
+ &longpoll_timeout),
+ GNUNET_GETOPT_option_flag ('I',
+ "ignore-not-found",
+ "continue, even if the bank account of the exchange was not found",
+ &ignore_account_404),
+ GNUNET_GETOPT_option_uint ('S',
+ "size",
+ "SIZE",
+ "Size to process per shard (default: 1024)",
+ &shard_size),
GNUNET_GETOPT_option_timetravel ('T',
"timetravel"),
GNUNET_GETOPT_option_flag ('t',
"test",
"run in test mode and exit when idle",
&test_mode),
- GNUNET_GETOPT_option_flag ('r',
- "reset",
- "start fresh with all transactions in the history",
- &reset_mode),
+ GNUNET_GETOPT_option_uint ('w',
+ "workers",
+ "COUNT",
+ "Plan work load with up to COUNT worker processes (default: 16)",
+ &max_workers),
+ GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION),
GNUNET_GETOPT_OPTION_END
};
+ enum GNUNET_GenericReturnValue ret;
+ longpoll_timeout = LONGPOLL_TIMEOUT;
if (GNUNET_OK !=
GNUNET_STRINGS_get_utf8_args (argc, argv,
&argc, &argv))
- return GR_CMD_LINE_UTF8_ERROR;
- if (GNUNET_OK !=
- GNUNET_PROGRAM_run (argc, argv,
- "taler-exchange-wirewatch",
- gettext_noop (
- "background process that watches for incoming wire transfers from customers"),
- options,
- &run, NULL))
- {
- GNUNET_free_nz ((void *) argv);
- return GR_CMD_LINE_OPTIONS_WRONG;
- }
+ return EXIT_INVALIDARGUMENT;
+ TALER_OS_init ();
+ ret = GNUNET_PROGRAM_run (
+ argc, argv,
+ "taler-exchange-wirewatch",
+ gettext_noop (
+ "background process that watches for incoming wire transfers from customers"),
+ options,
+ &run, NULL);
GNUNET_free_nz ((void *) argv);
+ if (GNUNET_SYSERR == ret)
+ return EXIT_INVALIDARGUMENT;
+ if (GNUNET_NO == ret)
+ return EXIT_SUCCESS;
return global_ret;
}