summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/exchange/taler-exchange-wirewatch.c445
1 files changed, 244 insertions, 201 deletions
diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c
index 898d678a3..21d2df150 100644
--- a/src/exchange/taler-exchange-wirewatch.c
+++ b/src/exchange/taler-exchange-wirewatch.c
@@ -107,6 +107,12 @@ struct WireAccount
struct GNUNET_TIME_Absolute shard_start_time;
/**
+ * How long did we take to finish the last shard
+ * for this account?
+ */
+ struct GNUNET_TIME_Relative shard_delay;
+
+ /**
* Name of our job in the shard table.
*/
char *job_name;
@@ -117,16 +123,11 @@ struct WireAccount
unsigned int batch_size;
/**
- * How much do we incremnt @e batch_size on success?
+ * How much do we increment @e batch_size on success?
*/
unsigned int batch_thresh;
/**
- * How many transactions did we see in the current batch?
- */
- unsigned int current_batch_size;
-
- /**
* Should we delay the next request to the wire plugin a bit? Set to
* false if we actually did some work.
*/
@@ -151,12 +152,6 @@ static struct WireAccount *wa_head;
static struct WireAccount *wa_tail;
/**
- * Wire account we are currently processing. This would go away
- * if we ever start processing all accounts in parallel.
- */
-static struct WireAccount *wa_pos;
-
-/**
* Handle to the context for interacting with the bank.
*/
static struct GNUNET_CURL_Context *ctx;
@@ -185,11 +180,6 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin;
static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval;
/**
- * How long did we take to finish the last shard?
- */
-static struct GNUNET_TIME_Relative shard_delay;
-
-/**
* Modulus to apply to group shards. The shard size must ultimately be a
* multiple of the batch size. Thus, if this is not a multiple of the
* #MAXIMUM_BATCH_SIZE, the batch size will be set to the #shard_size.
@@ -249,9 +239,9 @@ shutdown_task (void *cls)
wa->started_transaction = false;
}
qs = db_plugin->abort_shard (db_plugin->cls,
- wa_pos->job_name,
- wa_pos->shard_start,
- wa_pos->shard_end);
+ wa->job_name,
+ wa->shard_start,
+ wa->shard_end);
if (qs <= 0)
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Failed to abort work shard on shutdown\n");
@@ -259,8 +249,6 @@ shutdown_task (void *cls)
GNUNET_free (wa);
}
}
- wa_pos = NULL;
-
if (NULL != ctx)
{
GNUNET_CURL_fini (ctx);
@@ -359,12 +347,22 @@ exchange_serve_process_config (void)
/**
- * Query for incoming wire transfers.
+ * Lock a shard and then begin to query for incoming wire transfers.
*
- * @param cls NULL
+ * @param cls a `struct WireAccount` to operate on
*/
static void
-find_transfers (void *cls);
+lock_shard (void *cls);
+
+
+/**
+ * Continue with the credit history of the shard
+ * reserved as @a wa.
+ *
+ * @param[in,out] cls `struct WireAccount *` account with shard to continue processing
+ */
+static void
+continue_with_shard (void *cls);
/**
@@ -387,23 +385,59 @@ handle_soft_error (struct WireAccount *wa)
(unsigned long long) wa->batch_size);
}
GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&find_transfers,
- NULL);
+ /* Reset to beginning of transaction, and go again
+ from there. */
+ wa->latest_row_off = wa->batch_start;
+ task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
+ wa);
}
/**
- * We are done with a shard, move on to the next one.
+ * Schedule the #lock_shard() operation for
+ * @a wa. If @a wa is NULL, start with #wa_head.
+ *
+ * @param wa account to schedule #lock_shard() for,
+ * possibly NULL (!).
+ */
+static void
+schedule_transfers (struct WireAccount *wa)
+{
+ if (NULL == wa)
+ {
+ wa = wa_head;
+ GNUNET_assert (NULL != wa);
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Will try to lock next shard of %s in %s\n",
+ wa->job_name,
+ GNUNET_STRINGS_relative_time_to_string (
+ GNUNET_TIME_absolute_get_remaining (wa->delayed_until),
+ GNUNET_YES));
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_at (wa->delayed_until,
+ &lock_shard,
+ wa);
+}
+
+
+/**
+ * We are done with the work that is possible on @a wa right now (and the
+ * transaction was committed, if there was one to commit). Move on to the next
+ * account.
*
* @param wa wire account for which we completed a shard
*/
static void
-shard_completed (struct WireAccount *wa)
+account_completed (struct WireAccount *wa)
{
- /* transaction success, update #last_row_off */
- wa->batch_start = wa->latest_row_off;
- if (wa->batch_size < MAXIMUM_BATCH_SIZE)
+ GNUNET_assert (! wa->started_transaction);
+ if ( (wa->batch_start + wa->batch_size ==
+ wa->latest_row_off) &&
+ (wa->batch_size < MAXIMUM_BATCH_SIZE) )
{
+ /* The current batch size worked without serialization
+ issues, and we are allowed to grow. Do so slowly. */
int delta;
delta = ((int) wa->batch_thresh - (int) wa->batch_size) / 4;
@@ -411,45 +445,45 @@ shard_completed (struct WireAccount *wa)
delta = -delta;
wa->batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE,
wa->batch_size + delta + 1);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Increasing batch size to %llu\n",
(unsigned long long) wa->batch_size);
}
+
if (wa->delay)
{
+ /* This account was finished, block this one for the
+ #wirewatch_idle_sleep_interval and move on to the next one. */
wa->delayed_until
= GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval);
- wa_pos = wa_pos->next;
- if (NULL == wa_pos)
- wa_pos = wa_head;
- GNUNET_assert (NULL != wa_pos);
+ wa = wa->next;
}
- GNUNET_assert (NULL == task);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Will look for more transfers in %s\n",
- GNUNET_STRINGS_relative_time_to_string (
- GNUNET_TIME_absolute_get_remaining (wa_pos->delayed_until),
- GNUNET_YES));
- task = GNUNET_SCHEDULER_add_at (wa_pos->delayed_until,
- &find_transfers,
- NULL);
+ schedule_transfers (wa);
}
/**
- * We are finished with the current shard. Update the database, marking the
- * shard as finished.
+ * Check if we are finished with the current shard. If so, update the
+ * database, marking the shard as finished.
*
* @param wa wire account to commit for
- * @return true on success
+ * @return true if we were indeed done with the shard
*/
static bool
-mark_shard_done (struct WireAccount *wa)
+check_shard_done (struct WireAccount *wa)
{
enum GNUNET_DB_QueryStatus qs;
if (wa->shard_end > wa->latest_row_off)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Shard %s (%llu,%llu] at %llu\n",
+ wa->job_name,
+ (unsigned long long) wa->shard_start,
+ (unsigned long long) wa->shard_end,
+ (unsigned long long) wa->latest_row_off);
return false; /* actually, not done! */
+ }
/* shard is complete, mark this as well */
qs = db_plugin->complete_shard (db_plugin->cls,
wa->job_name,
@@ -468,12 +502,19 @@ mark_shard_done (struct WireAccount *wa)
handle_soft_error (wa);
return false;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
- /* already existed, ok, let's just continue */
+ GNUNET_break (0);
+ /* Not expected, but let's just continue */
break;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
/* normal case */
- shard_delay = GNUNET_TIME_absolute_get_duration (wa->shard_start_time);
-
+ wa->shard_delay = GNUNET_TIME_absolute_get_duration (wa->shard_start_time);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Completed shard %s (%llu,%llu] after %s\n",
+ wa->job_name,
+ (unsigned long long) wa->shard_start,
+ (unsigned long long) wa->shard_end,
+ GNUNET_STRINGS_relative_time_to_string (wa->shard_delay,
+ GNUNET_YES));
break;
}
return true;
@@ -481,16 +522,6 @@ mark_shard_done (struct WireAccount *wa)
/**
- * Continue with the credit history of the shard
- * reserved as @a wa_pos.
- *
- * @param[in,out] wa_pos shard to continue processing
- */
-static void
-continue_with_shard (struct WireAccount *wa_pos);
-
-
-/**
* We are finished with the current transaction, try
* to commit and then schedule the next iteration.
*
@@ -502,8 +533,17 @@ do_commit (struct WireAccount *wa)
enum GNUNET_DB_QueryStatus qs;
bool shard_done;
+ shard_done = check_shard_done (wa);
wa->started_transaction = false;
- shard_done = mark_shard_done (wa);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Committing %s progress (%llu,%llu] at %llu\n (%s)",
+ wa->job_name,
+ (unsigned long long) wa->shard_start,
+ (unsigned long long) wa->shard_end,
+ (unsigned long long) wa->latest_row_off,
+ shard_done
+ ? "shard done"
+ : "shard incomplete");
qs = db_plugin->commit (db_plugin->cls);
switch (qs)
{
@@ -521,7 +561,7 @@ do_commit (struct WireAccount *wa)
break;
}
if (shard_done)
- shard_completed (wa);
+ account_completed (wa);
else
continue_with_shard (wa);
}
@@ -568,63 +608,67 @@ history_cb (void *cls,
}
if (wa->started_transaction)
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "End of list. Committing progress!\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "End of list. Committing progress on %s of (%llu,%llu]!\n",
+ wa->job_name,
+ (unsigned long long) wa->batch_start,
+ (unsigned long long) wa->latest_row_off);
do_commit (wa);
+ return GNUNET_OK; /* will be ignored anyway */
}
- else
+ /* We did not even start a transaction. */
+ if ( (wa->delay) &&
+ (test_mode) &&
+ (NULL == wa->next) )
{
- if ( (wa->delay) &&
- (test_mode) &&
- (NULL == wa->next) )
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Shutdown due to test mode!\n");
- GNUNET_SCHEDULER_shutdown ();
- return GNUNET_OK;
- }
- else
- {
- shard_completed (wa);
- }
+ /* We exit on idle */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Shutdown due to test mode!\n");
+ GNUNET_SCHEDULER_shutdown ();
+ return GNUNET_OK;
}
+ account_completed (wa);
return GNUNET_OK; /* will be ignored anyway */
}
+
+ /* We did get 'details' from the bank. Do sanity checks before inserting. */
if (serial_id < wa->latest_row_off)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Serial ID %llu not monotonic (got %llu before). Failing!\n",
(unsigned long long) serial_id,
(unsigned long long) wa->latest_row_off);
- if (wa->started_transaction)
- {
- wa->started_transaction = false;
- db_plugin->rollback (db_plugin->cls);
- }
GNUNET_SCHEDULER_shutdown ();
wa->hh = NULL;
return GNUNET_SYSERR;
}
+ /* If we got 'limit' transactions back from the bank,
+ we should not introduce any delay before the next
+ call. */
if (serial_id >= wa->max_row_off)
wa->delay = false;
if (serial_id > wa->shard_end)
{
- /* we are done with the current shard, commit and stop this iteration! */
+ /* we are *past* the current shard (likely because the serial_id of the
+ shard_end happens to not exist in the DB). So commit and stop this
+ iteration! */
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Serial ID %llu past shard end at %llu, ending iteration early!\n",
(unsigned long long) serial_id,
(unsigned long long) wa->shard_end);
- wa->latest_row_off = serial_id;
+ wa->latest_row_off = serial_id - 1; /* excluding serial_id! */
+ wa->hh = NULL;
if (wa->started_transaction)
{
do_commit (wa);
}
else
{
- if (mark_shard_done (wa))
- shard_completed (wa);
+ if (check_shard_done (wa))
+ account_completed (wa);
+ else
+ continue_with_shard (wa);
}
- wa->hh = NULL;
return GNUNET_SYSERR;
}
if (! wa->started_transaction)
@@ -640,7 +684,6 @@ history_cb (void *cls,
wa->hh = NULL;
return GNUNET_SYSERR;
}
- wa_pos->shard_start_time = GNUNET_TIME_absolute_get ();
wa->started_transaction = true;
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
@@ -675,6 +718,17 @@ history_cb (void *cls,
wa->hh = NULL;
return GNUNET_SYSERR;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ /* Either wirewatch was freshly started after the system was
+ shutdown and we're going over an incomplete shard again
+ after being restarted, or the shard lock period was too
+ short (number of workers set incorrectly?) and a 2nd
+ wirewatcher has been stealing our work while we are still
+ at it. */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Attempted to import transaction %llu (%s) twice. "
+ "This should happen rarely (if not, ask for support).\n",
+ (unsigned long long) serial_id,
+ wa->job_name);
/* already existed, ok, let's just continue */
break;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
@@ -686,130 +740,121 @@ history_cb (void *cls,
}
-/**
- * Query for incoming wire transfers.
- *
- * @param cls NULL
- */
static void
-find_transfers (void *cls)
+continue_with_shard (void *cls)
{
- enum GNUNET_DB_QueryStatus qs;
+ struct WireAccount *wa = cls;
+ unsigned int limit;
- (void) cls;
- task = NULL;
- if (GNUNET_SYSERR ==
- db_plugin->preflight (db_plugin->cls))
+ limit = GNUNET_MIN (wa->batch_size,
+ wa->shard_end - wa->latest_row_off);
+ wa->max_row_off = wa->latest_row_off + limit;
+ GNUNET_assert (NULL == wa->hh);
+ wa->hh = TALER_BANK_credit_history (ctx,
+ wa->ai->auth,
+ wa->latest_row_off,
+ limit,
+ test_mode
+ ? GNUNET_TIME_UNIT_ZERO
+ : LONGPOLL_TIMEOUT,
+ &history_cb,
+ wa);
+ if (NULL == wa->hh)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to obtain database connection!\n");
+ "Failed to start request for account history!\n");
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
}
- wa_pos->delay = true;
- wa_pos->current_batch_size = 0; /* reset counter */
- if (wa_pos->shard_end <= wa_pos->batch_start)
- {
- uint64_t start;
- uint64_t end;
- struct GNUNET_TIME_Relative delay;
- /* advance to next shard */
-
- if (0 == max_workers)
- delay = GNUNET_TIME_UNIT_ZERO;
- else
- delay.rel_value_us = GNUNET_CRYPTO_random_u64 (
- GNUNET_CRYPTO_QUALITY_WEAK,
- 4 * GNUNET_TIME_relative_max (
- wirewatch_idle_sleep_interval,
- GNUNET_TIME_relative_multiply (shard_delay,
- max_workers)).rel_value_us);
- qs = db_plugin->begin_shard (db_plugin->cls,
- wa_pos->job_name,
- delay,
- shard_size,
- &start,
- &end);
- switch (qs)
- {
- case GNUNET_DB_STATUS_HARD_ERROR:
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to obtain starting point for montoring from database!\n");
- global_ret = EXIT_FAILURE;
- GNUNET_SCHEDULER_shutdown ();
- return;
- case GNUNET_DB_STATUS_SOFT_ERROR:
- /* try again */
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Serialization error tying to obtain shard, will try again in %s!\n",
- GNUNET_STRINGS_relative_time_to_string (
- wirewatch_idle_sleep_interval,
- GNUNET_YES));
- task = GNUNET_SCHEDULER_add_delayed (wirewatch_idle_sleep_interval,
- &find_transfers,
- NULL);
- return;
- case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
- GNUNET_break (0);
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "No shard available, will try again in %s!\n",
- GNUNET_STRINGS_relative_time_to_string (
- wirewatch_idle_sleep_interval,
- GNUNET_YES));
- task = GNUNET_SCHEDULER_add_delayed (wirewatch_idle_sleep_interval,
- &find_transfers,
- NULL);
- return;
- case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
- wa_pos->shard_start = start;
- wa_pos->shard_end = end;
- wa_pos->batch_start = start;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Starting with shard at [%llu,%llu) locked for %s\n",
- (unsigned long long) start,
- (unsigned long long) end,
- GNUNET_STRINGS_relative_time_to_string (delay,
- GNUNET_YES));
- break;
- }
- }
- wa_pos->latest_row_off = wa_pos->batch_start;
- continue_with_shard (wa_pos);
}
static void
-continue_with_shard (struct WireAccount *wa_pos)
+lock_shard (void *cls)
{
- unsigned int limit;
+ struct WireAccount *wa = cls;
+ enum GNUNET_DB_QueryStatus qs;
+ struct GNUNET_TIME_Relative delay;
- limit = GNUNET_MIN (wa_pos->batch_size,
- wa_pos->shard_end - wa_pos->latest_row_off);
- GNUNET_assert (NULL == wa_pos->hh);
- wa_pos->max_row_off = wa_pos->latest_row_off + limit - 1;
- wa_pos->hh = TALER_BANK_credit_history (ctx,
- wa_pos->ai->auth,
- wa_pos->latest_row_off,
- limit,
- test_mode
- ? GNUNET_TIME_UNIT_ZERO
- : LONGPOLL_TIMEOUT,
- &history_cb,
- wa_pos);
- if (NULL == wa_pos->hh)
+ task = NULL;
+ if (GNUNET_SYSERR ==
+ db_plugin->preflight (db_plugin->cls))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to start request for account history!\n");
- if (wa_pos->started_transaction)
- {
- db_plugin->rollback (db_plugin->cls);
- wa_pos->started_transaction = false;
- }
+ "Failed to obtain database connection!\n");
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ /* How long we lock a shard depends on the number of
+ workers expected, and how long we usually took to
+ process a shard. */
+ if (0 == max_workers)
+ delay = GNUNET_TIME_UNIT_ZERO;
+ else
+ delay.rel_value_us = GNUNET_CRYPTO_random_u64 (
+ GNUNET_CRYPTO_QUALITY_WEAK,
+ 4 * GNUNET_TIME_relative_max (
+ wirewatch_idle_sleep_interval,
+ GNUNET_TIME_relative_multiply (wa->shard_delay,
+ max_workers)).rel_value_us);
+ wa->shard_start_time = GNUNET_TIME_absolute_get ();
+ qs = db_plugin->begin_shard (db_plugin->cls,
+ wa->job_name,
+ delay,
+ shard_size,
+ &wa->shard_start,
+ &wa->shard_end);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to obtain starting point for montoring from database!\n");
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ /* try again */
+ GNUNET_break (0);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Serialization error tying to obtain shard %s, will try again in %s!\n",
+ wa->job_name,
+ GNUNET_STRINGS_relative_time_to_string (
+ wirewatch_idle_sleep_interval,
+ GNUNET_YES));
+ wa->delayed_until = GNUNET_TIME_relative_to_absolute (
+ wirewatch_idle_sleep_interval);
+ schedule_transfers (wa->next);
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ GNUNET_break (0);
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "No shard available, will try again for %s in %s!\n",
+ wa->job_name,
+ GNUNET_STRINGS_relative_time_to_string (
+ wirewatch_idle_sleep_interval,
+ GNUNET_YES));
+ wa->delayed_until = GNUNET_TIME_relative_to_absolute (
+ wirewatch_idle_sleep_interval);
+ schedule_transfers (wa->next);
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ /* continued below */
+ break;
}
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Starting with shard %s at (%llu,%llu] locked for %s\n",
+ wa->job_name,
+ (unsigned long long) wa->shard_start,
+ (unsigned long long) wa->shard_end,
+ GNUNET_STRINGS_relative_time_to_string (delay,
+ GNUNET_YES));
+ wa->delay = true; /* default is to delay, unless
+ we find out that we're really busy */
+ wa->batch_start = wa->shard_start;
+ wa->latest_row_off = wa->batch_start;
+ continue_with_shard (wa);
}
@@ -838,21 +883,19 @@ run (void *cls,
global_ret = EXIT_NOTCONFIGURED;
return;
}
- wa_pos = wa_head;
- GNUNET_assert (NULL != wa_pos);
GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
cls);
ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,
&rc);
- rc = GNUNET_CURL_gnunet_rc_create (ctx);
if (NULL == ctx)
{
GNUNET_break (0);
+ GNUNET_SCHEDULER_shutdown ();
return;
}
-
- task = GNUNET_SCHEDULER_add_now (&find_transfers,
- NULL);
+ rc = GNUNET_CURL_gnunet_rc_create (ctx);
+ task = GNUNET_SCHEDULER_add_now (&lock_shard,
+ wa_head);
}