summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2022-11-17 21:50:20 +0100
committerChristian Grothoff <christian@grothoff.org>2022-11-17 21:50:20 +0100
commit30997afc7fc81f1fa6af85c754390209d0200a67 (patch)
tree0bf8634ba6e33657d2da71187cdb49425e9c22e7
parenta2371912ee7fffe4d4cbb8ba1146b2c3d72ff7be (diff)
downloadexchange-30997afc7fc81f1fa6af85c754390209d0200a67.tar.gz
exchange-30997afc7fc81f1fa6af85c754390209d0200a67.tar.bz2
exchange-30997afc7fc81f1fa6af85c754390209d0200a67.zip
-more work on wirewatch revision
-rw-r--r--src/exchange/taler-exchange-wirewatch.c814
1 files changed, 358 insertions, 456 deletions
diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c
index d84344fc8..0d902bf25 100644
--- a/src/exchange/taler-exchange-wirewatch.c
+++ b/src/exchange/taler-exchange-wirewatch.c
@@ -13,7 +13,6 @@
You should have received a copy of the GNU Affero General Public License along with
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
-
/**
* @file taler-exchange-wirewatch.c
* @brief Process that watches for wire transfers to the exchange's bank account
@@ -43,122 +42,88 @@
#define MAXIMUM_BATCH_SIZE 1024
/**
- * Information we keep for each supported account.
+ * Information about our account.
*/
-struct WireAccount
-{
- /**
- * Accounts are kept in a DLL.
- */
- struct WireAccount *next;
-
- /**
- * Plugins are kept in a DLL.
- */
- struct WireAccount *prev;
-
- /**
- * Information about this account.
- */
- const struct TALER_EXCHANGEDB_AccountInfo *ai;
-
- /**
- * Active request for history.
- */
- struct TALER_BANK_CreditHistoryHandle *hh;
-
- /**
- * Until when is processing this wire plugin delayed?
- */
- struct GNUNET_TIME_Absolute delayed_until;
-
- /**
- * Encoded offset in the wire transfer list from where
- * to start the next query with the bank.
- */
- uint64_t batch_start;
-
- /**
- * Latest row offset seen in this transaction, becomes
- * the new #batch_start upon commit.
- */
- uint64_t latest_row_off;
-
- /**
- * Maximum row offset this transaction may yield. If we got the
- * maximum number of rows, we must not @e delay before running
- * the next transaction.
- */
- uint64_t max_row_off;
-
- /**
- * Offset where our current shard begins (inclusive).
- */
- uint64_t shard_start;
-
- /**
- * Offset where our current shard ends (exclusive).
- */
- uint64_t shard_end;
-
- /**
- * When did we start with the shard?
- */
- struct GNUNET_TIME_Absolute shard_start_time;
-
- /**
- * For how long did we lock the shard?
- */
- struct GNUNET_TIME_Absolute shard_end_time;
-
- /**
- * How long did we take to finish the last shard
- * for this account?
- */
- struct GNUNET_TIME_Relative shard_delay;
-
- /**
- * Name of our job in the shard table.
- */
- char *job_name;
-
- /**
- * How many transactions do we retrieve per batch?
- */
- unsigned int batch_size;
-
- /**
- * How much do we increment @e batch_size on success?
- */
- unsigned int batch_thresh;
-
- /**
- * Should we delay the next request to the wire plugin a bit? Set to
- * false if we actually did some work.
- */
- bool delay;
-
- /**
- * Did we start a transaction yet?
- */
- bool started_transaction;
-
- /**
- * Is this shard still open for processing.
- */
- bool shard_open;
-};
+static const struct TALER_EXCHANGEDB_AccountInfo *ai;
+
+/**
+ * Active request for history.
+ */
+static struct TALER_BANK_CreditHistoryHandle *hh;
+
+/**
+ * Until when is processing this wire plugin delayed?
+ */
+static struct GNUNET_TIME_Absolute delayed_until;
+
+/**
+ * Encoded offset in the wire transfer list from where
+ * to start the next query with the bank.
+ */
+static uint64_t batch_start;
+
+/**
+ * Latest row offset seen in this transaction, becomes
+ * the new #batch_start upon commit.
+ */
+static uint64_t latest_row_off;
+
+/**
+ * Offset where our current shard begins (inclusive).
+ */
+static uint64_t shard_start;
+
+/**
+ * Offset where our current shard ends (exclusive).
+ */
+static uint64_t shard_end;
+
+/**
+ * When did we start with the shard?
+ */
+static struct GNUNET_TIME_Absolute shard_start_time;
+
+/**
+ * For how long did we lock the shard?
+ */
+static struct GNUNET_TIME_Absolute shard_end_time;
+
+/**
+ * How long did we take to finish the last shard
+ * for this account?
+ */
+static struct GNUNET_TIME_Relative shard_delay;
+
+/**
+ * Name of our job in the shard table.
+ */
+static char *job_name;
+
+/**
+ * How many transactions do we retrieve per batch?
+ */
+static unsigned int batch_size;
+/**
+ * How much do we increment @e batch_size on success?
+ */
+static unsigned int batch_thresh;
/**
- * Head of list of loaded wire plugins.
+ * Did work remain in the transaction queue? Set to true
+ * if we did some work and thus there might be more.
*/
-static struct WireAccount *wa_head;
+static bool progress;
/**
- * Tail of list of loaded wire plugins.
+ * Did we start a transaction yet?
*/
-static struct WireAccount *wa_tail;
+static bool started_transaction;
+
+/**
+ * Is this shard still open for processing.
+ */
+static bool shard_open;
/**
* Handle to the context for interacting with the bank.
@@ -227,6 +192,10 @@ static int ignore_account_404;
*/
static struct GNUNET_SCHEDULER_Task *task;
+/**
+ * Name of the configuration section with the account we should watch.
+ */
+static char *account_section;
/**
* We're being aborted with CTRL-C (or SIGTERM). Shut down.
@@ -236,38 +205,27 @@ static struct GNUNET_SCHEDULER_Task *task;
static void
shutdown_task (void *cls)
{
+ enum GNUNET_DB_QueryStatus qs;
(void) cls;
- {
- struct WireAccount *wa;
- while (NULL != (wa = wa_head))
- {
- enum GNUNET_DB_QueryStatus qs;
-
- if (NULL != wa->hh)
- {
- TALER_BANK_credit_history_cancel (wa->hh);
- wa->hh = NULL;
- }
- GNUNET_CONTAINER_DLL_remove (wa_head,
- wa_tail,
- wa);
- if (wa->started_transaction)
- {
- db_plugin->rollback (db_plugin->cls);
- wa->started_transaction = false;
- }
- qs = db_plugin->abort_shard (db_plugin->cls,
- wa->job_name,
- wa->shard_start,
- wa->shard_end);
- if (qs <= 0)
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Failed to abort work shard on shutdown\n");
- GNUNET_free (wa->job_name);
- GNUNET_free (wa);
- }
+ if (NULL != hh)
+ {
+ TALER_BANK_credit_history_cancel (hh);
+ hh = NULL;
+ }
+ if (started_transaction)
+ {
+ db_plugin->rollback (db_plugin->cls);
+ started_transaction = false;
}
+ qs = db_plugin->abort_shard (db_plugin->cls,
+ job_name,
+ shard_start,
+ shard_end);
+ if (qs <= 0)
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to abort work shard on shutdown\n");
+ GNUNET_free (job_name);
if (NULL != ctx)
{
GNUNET_CURL_fini (ctx);
@@ -295,28 +253,36 @@ shutdown_task (void *cls)
* account to our list (if it is enabled and we can load the plugin).
*
* @param cls closure, NULL
- * @param ai account information
+ * @param in_ai account information
*/
static void
add_account_cb (void *cls,
- const struct TALER_EXCHANGEDB_AccountInfo *ai)
+ const struct TALER_EXCHANGEDB_AccountInfo *in_ai)
{
- struct WireAccount *wa;
-
(void) cls;
- if (! ai->credit_enabled)
+ if (! in_ai->credit_enabled)
+ return; /* not enabled for us, skip */
+ if ( (NULL != account_section) &&
+ (0 != strcasecmp (ai->section_name,
+ account_section)) )
return; /* not enabled for us, skip */
- wa = GNUNET_new (struct WireAccount);
- wa->ai = ai;
- GNUNET_asprintf (&wa->job_name,
+ if (NULL != ai)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Multiple accounts enabled (%s and %s), use '-a' command-line option to select one!\n",
+ ai->section_name,
+ in_ai->section_name);
+ GNUNET_SCHEDULER_shutdown ();
+ global_ret = EXIT_INVALIDARGUMENT;
+ return;
+ }
+ ai = in_ai;
+ GNUNET_asprintf (&job_name,
"wirewatch-%s",
ai->section_name);
- wa->batch_size = MAXIMUM_BATCH_SIZE;
- if (0 != shard_size % wa->batch_size)
- wa->batch_size = shard_size;
- GNUNET_CONTAINER_DLL_insert (wa_head,
- wa_tail,
- wa);
+ batch_size = MAXIMUM_BATCH_SIZE;
+ if (0 != shard_size % batch_size)
+ batch_size = shard_size;
}
@@ -360,7 +326,16 @@ exchange_serve_process_config (void)
}
TALER_EXCHANGEDB_find_accounts (&add_account_cb,
NULL);
- GNUNET_assert (NULL != wa_head);
+ if (NULL == ai)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "No accounts enabled for credit!\n");
+ GNUNET_SCHEDULER_shutdown ();
+ global_ret = EXIT_INVALIDARGUMENT;
+ TALER_EXCHANGEDB_plugin_unload (db_plugin);
+ db_plugin = NULL;
+ return GNUNET_SYSERR;
+ }
return GNUNET_OK;
}
@@ -368,240 +343,111 @@ exchange_serve_process_config (void)
/**
* Lock a shard and then begin to query for incoming wire transfers.
*
- * @param cls a `struct WireAccount` to operate on
+ * @param cls NULL
*/
static void
lock_shard (void *cls);
/**
- * Continue with the credit history of the shard
- * reserved as @a wa.
+ * Continue with the credit history of the shard.
*
- * @param[in,out] cls `struct WireAccount *` account with shard to continue processing
+ * @param cls NULL
*/
static void
continue_with_shard (void *cls);
/**
- * We encountered a serialization error.
- * Rollback the transaction and try again
- *
- * @param wa account we are transacting on
+ * We encountered a serialization error. Rollback the transaction and try
+ * again.
*/
static void
-handle_soft_error (struct WireAccount *wa)
+handle_soft_error (void)
{
db_plugin->rollback (db_plugin->cls);
- wa->started_transaction = false;
- if (1 < wa->batch_size)
+ started_transaction = false;
+ if (1 < batch_size)
{
- wa->batch_thresh = wa->batch_size;
- wa->batch_size /= 2;
+ batch_thresh = batch_size;
+ batch_size /= 2;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Reduced batch size to %llu due to serialization issue\n",
- (unsigned long long) wa->batch_size);
+ (unsigned long long) batch_size);
}
/* Reset to beginning of transaction, and go again
from there. */
- wa->latest_row_off = wa->batch_start;
+ latest_row_off = batch_start;
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
- wa);
+ NULL);
}
/**
- * Schedule the #lock_shard() operation for
- * @a wa. If @a wa is NULL, start with #wa_head.
- *
- * @param wa account to schedule #lock_shard() for,
- * possibly NULL (!).
+ * Schedule the #lock_shard() operation.
*/
static void
-schedule_transfers (struct WireAccount *wa)
+schedule_transfers (void)
{
- if (NULL == wa)
- {
- wa = wa_head;
- GNUNET_assert (NULL != wa);
- }
- if (wa->shard_open)
+ if (shard_open)
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Will retry my shard (%llu,%llu] of %s in %s\n",
- (unsigned long long) wa->shard_start,
- (unsigned long long) wa->shard_end,
- wa->job_name,
+ (unsigned long long) shard_start,
+ (unsigned long long) shard_end,
+ job_name,
GNUNET_STRINGS_relative_time_to_string (
- GNUNET_TIME_absolute_get_remaining (wa->delayed_until),
- GNUNET_YES));
+ GNUNET_TIME_absolute_get_remaining (delayed_until),
+ true));
else
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Will try to lock next shard of %s in %s\n",
- wa->job_name,
+ job_name,
GNUNET_STRINGS_relative_time_to_string (
- GNUNET_TIME_absolute_get_remaining (wa->delayed_until),
- GNUNET_YES));
+ GNUNET_TIME_absolute_get_remaining (delayed_until),
+ true));
GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_at (wa->delayed_until,
+ task = GNUNET_SCHEDULER_add_at (delayed_until,
&lock_shard,
- wa);
+ NULL);
}
/**
- * We are done with the work that is possible on @a wa right now (and the
- * transaction was committed, if there was one to commit). Move on to the next
- * account.
- *
- * @param wa wire account for which we completed a shard
+ * We are done with the work that is possible right now (and the transaction
+ * was committed, if there was one to commit). Move on to the next shard.
*/
static void
-account_completed (struct WireAccount *wa)
+transaction_completed (void)
{
- GNUNET_assert (! wa->started_transaction);
- if ( (wa->batch_start + wa->batch_size ==
- wa->latest_row_off) &&
- (wa->batch_size < MAXIMUM_BATCH_SIZE) )
+ GNUNET_assert (! started_transaction);
+ if ( (batch_start + batch_size ==
+ latest_row_off) &&
+ (batch_size < MAXIMUM_BATCH_SIZE) )
{
/* The current batch size worked without serialization
issues, and we are allowed to grow. Do so slowly. */
int delta;
- delta = ((int) wa->batch_thresh - (int) wa->batch_size) / 4;
+ delta = ((int) batch_thresh - (int) batch_size) / 4;
if (delta < 0)
delta = -delta;
- wa->batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE,
- wa->batch_size + delta + 1);
+ batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE,
+ batch_size + delta + 1);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Increasing batch size to %llu\n",
- (unsigned long long) wa->batch_size);
- }
-
- if (wa->delay)
- {
- /* This account was finished, block this one for the
- #wirewatch_idle_sleep_interval and move on to the next one. */
- wa->delayed_until
- = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval);
- wa = wa->next;
+ (unsigned long long) batch_size);
}
- GNUNET_assert (NULL == task);
- schedule_transfers (wa);
-}
-
-/**
- * Check if we are finished with the current shard. If so, update the
- * database, marking the shard as finished.
- *
- * @param wa wire account to commit for
- * @return true if we were indeed done with the shard
- */
-static bool
-check_shard_done (struct WireAccount *wa)
-{
- enum GNUNET_DB_QueryStatus qs;
-
- if (wa->shard_end > wa->latest_row_off)
+ if ( (! progress) && test_mode)
{
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Shard %s (%llu,%llu] at %llu\n",
- wa->job_name,
- (unsigned long long) wa->shard_start,
- (unsigned long long) wa->shard_end,
- (unsigned long long) wa->latest_row_off);
- return false; /* actually, not done! */
- }
- /* shard is complete, mark this as well */
- qs = db_plugin->complete_shard (db_plugin->cls,
- wa->job_name,
- wa->shard_start,
- wa->shard_end);
- switch (qs)
- {
- case GNUNET_DB_STATUS_HARD_ERROR:
- GNUNET_break (0);
- db_plugin->rollback (db_plugin->cls);
- GNUNET_SCHEDULER_shutdown ();
- return false;
- case GNUNET_DB_STATUS_SOFT_ERROR:
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Got DB soft error for complete_shard. Rolling back.\n");
- handle_soft_error (wa);
- return false;
- case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
- GNUNET_break (0);
- /* Not expected, but let's just continue */
- break;
- case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
- /* normal case */
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Completed shard %s (%llu,%llu] after %s\n",
- wa->job_name,
- (unsigned long long) wa->shard_start,
- (unsigned long long) wa->shard_end,
- GNUNET_STRINGS_relative_time_to_string (
- GNUNET_TIME_absolute_get_duration (wa->shard_start_time),
- GNUNET_YES));
- break;
- }
- return true;
-}
-
-
-/**
- * We are finished with the current transaction, try
- * to commit and then schedule the next iteration.
- *
- * @param wa wire account to commit for
- */
-static void
-do_commit (struct WireAccount *wa)
-{
- enum GNUNET_DB_QueryStatus qs;
- bool shard_done;
-
- GNUNET_assert (NULL == task);
- shard_done = check_shard_done (wa);
- wa->started_transaction = false;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Committing %s progress (%llu,%llu] at %llu\n (%s)",
- wa->job_name,
- (unsigned long long) wa->shard_start,
- (unsigned long long) wa->shard_end,
- (unsigned long long) wa->latest_row_off,
- shard_done
- ? "shard done"
- : "shard incomplete");
- qs = db_plugin->commit (db_plugin->cls);
- switch (qs)
- {
- case GNUNET_DB_STATUS_HARD_ERROR:
- GNUNET_break (0);
+ /* Transaction list was drained and we are in
+ test mode. So we are done. */
GNUNET_SCHEDULER_shutdown ();
return;
- case GNUNET_DB_STATUS_SOFT_ERROR:
- /* reduce transaction size to reduce rollback probability */
- handle_soft_error (wa);
- return;
- case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
- case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
- /* normal case */
- break;
- }
- if (shard_done)
- {
- wa->shard_delay = GNUNET_TIME_absolute_get_duration (wa->shard_start_time);
- wa->shard_open = false;
- account_completed (wa);
- }
- else
- {
- task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
- wa);
}
+ GNUNET_assert (NULL == task);
+ schedule_transfers ();
}
@@ -609,18 +455,24 @@ do_commit (struct WireAccount *wa)
* We got incoming transaction details from the bank. Add them
* to the database.
*
- * @param wa wire account we are handling
* @param details array of transaction details
* @param details_length length of the @a details array
- * @return true on success
*/
-static bool
-process_reply (struct WireAccount *wa,
- const struct TALER_BANK_CreditDetails *details,
+static void
+process_reply (const struct TALER_BANK_CreditDetails *details,
unsigned int details_length)
{
- uint64_t lroff = wa->latest_row_off;
+ enum GNUNET_DB_QueryStatus qs;
+ bool shard_done;
+ uint64_t lroff = latest_row_off;
+ if (0 == details_length)
+ {
+ /* Server should have used 204, not 200! */
+ GNUNET_break_op (0);
+ transaction_completed ();
+ return;
+ }
/* check serial IDs for range constraints */
for (unsigned int i = 0; i<details_length; i++)
{
@@ -634,16 +486,9 @@ process_reply (struct WireAccount *wa,
(unsigned long long) lroff);
db_plugin->rollback (db_plugin->cls);
GNUNET_SCHEDULER_shutdown ();
- wa->hh = NULL;
- return false;
- }
- if (cd->serial_id >= wa->max_row_off)
- {
- /* We got 'limit' transactions back from the bank, so we should not
- introduce any delay before the next call. */
- wa->delay = false;
+ return;
}
- if (cd->serial_id > wa->shard_end)
+ if (cd->serial_id > shard_end)
{
/* we are *past* the current shard (likely because the serial_id of the
shard_end happens to not exist in the DB). So commit and stop this
@@ -651,19 +496,14 @@ process_reply (struct WireAccount *wa,
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Serial ID %llu past shard end at %llu, ending iteration early!\n",
(unsigned long long) cd->serial_id,
- (unsigned long long) wa->shard_end);
+ (unsigned long long) shard_end);
details_length = i;
- wa->delay = false;
+ progress = true;
+ lroff = cd->serial_id - 1;
break;
}
lroff = cd->serial_id;
}
- if (0 == details_length)
- {
- /* Server should have used 204, not 200! */
- GNUNET_break_op (0);
- return true;
- }
if (GNUNET_OK !=
db_plugin->start_read_committed (db_plugin->cls,
"wirewatch check for incoming wire transfers"))
@@ -672,15 +512,13 @@ process_reply (struct WireAccount *wa,
"Failed to start database transaction!\n");
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
- wa->hh = NULL;
- return false;
+ return;
}
- wa->started_transaction = true;
+ started_transaction = true;
for (unsigned int i = 0; i<details_length; i++)
{
const struct TALER_BANK_CreditDetails *cd = &details[i];
- enum GNUNET_DB_QueryStatus qs;
/* FIXME #7276: Consider using Postgres multi-valued insert here,
for up to 15x speed-up according to
@@ -692,23 +530,19 @@ process_reply (struct WireAccount *wa,
&cd->amount,
cd->execution_date,
cd->debit_account_uri,
- wa->ai->section_name,
+ ai->section_name,
cd->serial_id);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
- db_plugin->rollback (db_plugin->cls);
- wa->started_transaction = false;
GNUNET_SCHEDULER_shutdown ();
- wa->hh = NULL;
- return false;
+ return;
case GNUNET_DB_STATUS_SOFT_ERROR:
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Got DB soft error for reserves_in_insert. Rolling back.\n");
- handle_soft_error (wa);
- wa->hh = NULL;
- return true;
+ handle_soft_error ();
+ return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
/* Either wirewatch was freshly started after the system was
shutdown and we're going over an incomplete shard again
@@ -720,25 +554,92 @@ process_reply (struct WireAccount *wa,
"Attempted to import transaction %llu (%s) twice. "
"This should happen rarely (if not, ask for support).\n",
(unsigned long long) cd->serial_id,
- wa->job_name);
+ job_name);
db_plugin->rollback (db_plugin->cls);
- wa->latest_row_off = cd->serial_id;
- wa->started_transaction = false;
+ started_transaction = false;
/* already existed, ok, let's just continue */
- return true;
+ return;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
- wa->latest_row_off = cd->serial_id;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Imported transaction %llu.",
+ (unsigned long long) cd->serial_id);
/* normal case */
break;
}
}
- do_commit (wa);
- if (check_shard_done (wa))
- account_completed (wa);
- else
- task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
- wa);
- return true;
+ latest_row_off = lroff;
+ shard_done = (shard_end <= latest_row_off);
+ if (shard_done)
+ {
+ /* shard is complete, mark this as well */
+ qs = db_plugin->complete_shard (db_plugin->cls,
+ job_name,
+ shard_start,
+ shard_end);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Got DB soft error for complete_shard. Rolling back.\n");
+ handle_soft_error ();
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ GNUNET_break (0);
+ /* Not expected, but let's just continue */
+ break;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ /* normal case */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Completed shard %s (%llu,%llu] after %s\n",
+ job_name,
+ (unsigned long long) shard_start,
+ (unsigned long long) shard_end,
+ GNUNET_STRINGS_relative_time_to_string (
+ GNUNET_TIME_absolute_get_duration (shard_start_time),
+ true));
+ break;
+ }
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Committing %s progress (%llu,%llu] at %llu\n (%s)",
+ job_name,
+ (unsigned long long) shard_start,
+ (unsigned long long) shard_end,
+ (unsigned long long) latest_row_off,
+ shard_done
+ ? "shard done"
+ : "shard incomplete");
+ qs = db_plugin->commit (db_plugin->cls);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ /* reduce transaction size to reduce rollback probability */
+ handle_soft_error ();
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ started_transaction = false;
+ /* normal case */
+ break;
+ }
+ if (shard_done)
+ {
+ shard_delay = GNUNET_TIME_absolute_get_duration (shard_start_time);
+ shard_open = false;
+ transaction_completed ();
+ return;
+ }
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
+ NULL);
}
@@ -746,76 +647,75 @@ process_reply (struct WireAccount *wa,
* Callbacks of this type are used to serve the result of asking
* the bank for the transaction history.
*
- * @param cls closure with the `struct WireAccount *` we are processing
+ * @param cls NULL
* @param reply response we got from the bank
*/
static void
history_cb (void *cls,
const struct TALER_BANK_CreditHistoryResponse *reply)
{
- struct WireAccount *wa = cls;
- bool ok;
-
+ (void) cls;
GNUNET_assert (NULL == task);
- wa->hh = NULL;
+ hh = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "History request returned with HTTP status %u\n",
+ reply->http_status);
switch (reply->http_status)
{
- case 0:
- ok = false;
case MHD_HTTP_OK:
- ok = process_reply (wa,
- reply->details.success.details,
- reply->details.success.details_length);
- break;
+ process_reply (reply->details.success.details,
+ reply->details.success.details_length);
+ return;
case MHD_HTTP_NO_CONTENT:
- ok = true;
- break;
+ transaction_completed ();
+ return;
case MHD_HTTP_NOT_FOUND:
- ok = ignore_account_404;
+ if (ignore_account_404)
+ {
+ transaction_completed ();
+ return;
+ }
break;
default:
- ok = false;
break;
}
-
- if (! ok)
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Error fetching history: %s (%u)\n",
+ TALER_ErrorCode_get_hint (reply->ec),
+ reply->http_status);
+ if (! exit_on_error)
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Error fetching history: %s (%u)\n",
- TALER_ErrorCode_get_hint (reply->ec),
- reply->http_status);
- if (! (exit_on_error || test_mode) )
- {
- account_completed (wa);
- return;
- }
- GNUNET_SCHEDULER_shutdown ();
+ transaction_completed ();
return;
}
+ GNUNET_SCHEDULER_shutdown ();
}
static void
continue_with_shard (void *cls)
{
- struct WireAccount *wa = cls;
unsigned int limit;
+ (void) cls;
task = NULL;
- limit = GNUNET_MIN (wa->batch_size,
- wa->shard_end - wa->latest_row_off);
- wa->max_row_off = wa->latest_row_off + limit;
- GNUNET_assert (NULL == wa->hh);
- wa->hh = TALER_BANK_credit_history (ctx,
- wa->ai->auth,
- wa->latest_row_off,
- limit,
- test_mode
- ? GNUNET_TIME_UNIT_ZERO
- : LONGPOLL_TIMEOUT,
- &history_cb,
- wa);
- if (NULL == wa->hh)
+ GNUNET_assert (shard_end > latest_row_off);
+ limit = GNUNET_MIN (batch_size,
+ shard_end - latest_row_off);
+ GNUNET_assert (NULL == hh);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Requesting credit history staring from %llu\n",
+ (unsigned long long) latest_row_off);
+ hh = TALER_BANK_credit_history (ctx,
+ ai->auth,
+ latest_row_off,
+ limit,
+ test_mode
+ ? GNUNET_TIME_UNIT_ZERO
+ : LONGPOLL_TIMEOUT,
+ &history_cb,
+ NULL);
+ if (NULL == hh)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start request for account history!\n");
@@ -829,12 +729,12 @@ continue_with_shard (void *cls)
static void
lock_shard (void *cls)
{
- struct WireAccount *wa = cls;
enum GNUNET_DB_QueryStatus qs;
struct GNUNET_TIME_Relative delay;
- uint64_t last_shard_start = wa->shard_start;
- uint64_t last_shard_end = wa->shard_end;
+ uint64_t last_shard_start = shard_start;
+ uint64_t last_shard_end = shard_end;
+ (void) cls;
task = NULL;
if (GNUNET_SYSERR ==
db_plugin->preflight (db_plugin->cls))
@@ -845,17 +745,16 @@ lock_shard (void *cls)
GNUNET_SCHEDULER_shutdown ();
return;
}
- if ( (wa->shard_open) &&
- (GNUNET_TIME_absolute_is_future (wa->shard_end_time)) )
+ if ( (shard_open) &&
+ (GNUNET_TIME_absolute_is_future (shard_end_time)) )
{
- wa->delay = true; /* default is to delay, unless
- we find out that we're really busy */
- wa->batch_start = wa->latest_row_off;
+ progress = false;
+ batch_start = latest_row_off;
task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
- wa);
+ NULL);
return;
}
- if (wa->shard_open)
+ if (shard_open)
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Shard not completed in time, will try to re-acquire\n");
/* How long we lock a shard depends on the number of
@@ -868,15 +767,15 @@ lock_shard (void *cls)
GNUNET_CRYPTO_QUALITY_WEAK,
4 * GNUNET_TIME_relative_max (
wirewatch_idle_sleep_interval,
- GNUNET_TIME_relative_multiply (wa->shard_delay,
+ GNUNET_TIME_relative_multiply (shard_delay,
max_workers)).rel_value_us);
- wa->shard_start_time = GNUNET_TIME_absolute_get ();
+ shard_start_time = GNUNET_TIME_absolute_get ();
qs = db_plugin->begin_shard (db_plugin->cls,
- wa->job_name,
+ job_name,
delay,
shard_size,
- &wa->shard_start,
- &wa->shard_end);
+ &shard_start,
+ &shard_end);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
@@ -893,52 +792,51 @@ lock_shard (void *cls)
rdelay = GNUNET_TIME_randomize (wirewatch_idle_sleep_interval);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Serialization error tying to obtain shard %s, will try again in %s!\n",
- wa->job_name,
+ job_name,
GNUNET_STRINGS_relative_time_to_string (rdelay,
- GNUNET_YES));
- wa->delayed_until = GNUNET_TIME_relative_to_absolute (rdelay);
+ true));
+ delayed_until = GNUNET_TIME_relative_to_absolute (rdelay);
}
GNUNET_assert (NULL == task);
- schedule_transfers (wa->next);
+ schedule_transfers ();
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
GNUNET_break (0);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"No shard available, will try again for %s in %s!\n",
- wa->job_name,
+ job_name,
GNUNET_STRINGS_relative_time_to_string (
wirewatch_idle_sleep_interval,
GNUNET_YES));
- wa->delayed_until = GNUNET_TIME_relative_to_absolute (
+ delayed_until = GNUNET_TIME_relative_to_absolute (
wirewatch_idle_sleep_interval);
- wa->shard_open = false;
+ shard_open = false;
GNUNET_assert (NULL == task);
- schedule_transfers (wa->next);
+ schedule_transfers ();
return;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
/* continued below */
break;
}
- wa->shard_end_time = GNUNET_TIME_relative_to_absolute (delay);
+ shard_end_time = GNUNET_TIME_relative_to_absolute (delay);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Starting with shard %s at (%llu,%llu] locked for %s\n",
- wa->job_name,
- (unsigned long long) wa->shard_start,
- (unsigned long long) wa->shard_end,
+ job_name,
+ (unsigned long long) shard_start,
+ (unsigned long long) shard_end,
GNUNET_STRINGS_relative_time_to_string (delay,
- GNUNET_YES));
- wa->delay = true; /* default is to delay, unless
- we find out that we're really busy */
- wa->batch_start = wa->shard_start;
- if ( (wa->shard_open) &&
- (wa->shard_start == last_shard_start) &&
- (wa->shard_end == last_shard_end) )
- GNUNET_break (wa->latest_row_off >= wa->batch_start); /* resume where we left things */
+ true));
+ progress = false;
+ batch_start = shard_start;
+ if ( (shard_open) &&
+ (shard_start == last_shard_start) &&
+ (shard_end == last_shard_end) )
+ GNUNET_break (latest_row_off >= batch_start); /* resume where we left things */
else
- wa->latest_row_off = wa->batch_start;
- wa->shard_open = true;
+ latest_row_off = batch_start;
+ shard_open = true;
task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
- wa);
+ NULL);
}
@@ -961,14 +859,15 @@ run (void *cls,
(void) cfgfile;
cfg = c;
+ GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
+ cls);
if (GNUNET_OK !=
exchange_serve_process_config ())
{
global_ret = EXIT_NOTCONFIGURED;
+ GNUNET_SCHEDULER_shutdown ();
return;
}
- GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
- cls);
ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,
&rc);
if (NULL == ctx)
@@ -978,9 +877,7 @@ run (void *cls,
return;
}
rc = GNUNET_CURL_gnunet_rc_create (ctx);
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&lock_shard,
- wa_head);
+ schedule_transfers ();
}
@@ -996,6 +893,11 @@ main (int argc,
char *const *argv)
{
struct GNUNET_GETOPT_CommandLineOption options[] = {
+ GNUNET_GETOPT_option_string ('a',
+ "account",
+ "SECTION_NAME",
+ "name of the configuration section with the account we should watch (needed if more than one is enabled for crediting)",
+ &account_section),
GNUNET_GETOPT_option_flag ('e',
"exit-on-error",
"terminate wirewatch if we failed to download information from the bank",