summaryrefslogtreecommitdiff
path: root/src/exchange/taler-exchange-wirewatch.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/exchange/taler-exchange-wirewatch.c')
-rw-r--r--src/exchange/taler-exchange-wirewatch.c414
1 files changed, 264 insertions, 150 deletions
diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c
index 28fa81e7e..5d35eba57 100644
--- a/src/exchange/taler-exchange-wirewatch.c
+++ b/src/exchange/taler-exchange-wirewatch.c
@@ -29,13 +29,12 @@
#include "taler_json_lib.h"
#include "taler_bank_service.h"
-#define DEBUG_LOGGING 0
/**
- * What is the initial batch size we use for credit history
+ * What is the maximum batch size we use for credit history
* requests with the bank. See `batch_size` below.
*/
-#define INITIAL_BATCH_SIZE 1024
+#define MAXIMUM_BATCH_SIZE 1024
/**
* Information we keep for each supported account.
@@ -81,34 +80,48 @@ struct WireAccount
* Encoded offset in the wire transfer list from where
* to start the next query with the bank.
*/
- uint64_t last_row_off;
+ uint64_t batch_start;
/**
* Latest row offset seen in this transaction, becomes
- * the new #last_row_off upon commit.
+ * the new #batch_start upon commit.
*/
uint64_t latest_row_off;
/**
- * Offset where our current shard ends.
+ * Offset where our current shard begins (inclusive).
+ */
+ uint64_t shard_start;
+
+ /**
+ * Offset where our current shard ends (exclusive).
*/
uint64_t shard_end;
/**
+ * When did we start with the shard?
+ */
+ struct GNUNET_TIME_Absolute shard_start_time;
+
+ /**
+ * Name of our job in the shard table.
+ */
+ char *job_name;
+
+ /**
* How many transactions do we retrieve per batch?
*/
unsigned int batch_size;
/**
- * How many transactions did we see in the current batch?
+ * How much do we incremnt @e batch_size on success?
*/
- unsigned int current_batch_size;
+ unsigned int batch_increment;
/**
- * Are we running from scratch and should re-process all transactions
- * for this account?
+ * How many transactions did we see in the current batch?
*/
- bool reset_mode;
+ unsigned int current_batch_size;
/**
* Should we delay the next request to the wire plugin a bit? Set to
@@ -157,13 +170,29 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin;
/**
* How long should we sleep when idle before trying to find more work?
+ * Also used for how long we wait to grab a shard before trying it again.
+ * The value should be set to a bit above the average time it takes to
+ * process a shard.
*/
static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval;
/**
- * Modulus to apply to group shards.
+ * How long did we take to finish the last shard?
*/
-static unsigned int shard_size = 1024;
+static struct GNUNET_TIME_Relative shard_delay;
+
+/**
+ * Modulus to apply to group shards. The shard size must ultimately be a
+ * multiple of the batch size. Thus, if this is not a multiple of the
+ * #MAXIMUM_BATCH_SIZE, the batch size will be set to the #shard_size.
+ */
+static unsigned int shard_size = MAXIMUM_BATCH_SIZE;
+
+/**
+ * How many workers should we plan our scheduling with?
+ */
+static unsigned int max_workers = 16;
+
/**
* Value to return from main(). 0 on success, non-zero on
@@ -187,11 +216,6 @@ static enum
static int test_mode;
/**
- * Are we running from scratch and should re-process all transactions?
- */
-static int reset_mode;
-
-/**
* Current task waiting for execution, if any.
*/
static struct GNUNET_SCHEDULER_Task *task;
@@ -221,6 +245,7 @@ shutdown_task (void *cls)
wa);
TALER_BANK_auth_free (&wa->auth);
GNUNET_free (wa->section_name);
+ GNUNET_free (wa->job_name);
GNUNET_free (wa);
}
}
@@ -263,7 +288,6 @@ add_account_cb (void *cls,
if (GNUNET_YES != ai->credit_enabled)
return; /* not enabled for us, skip */
wa = GNUNET_new (struct WireAccount);
- wa->reset_mode = reset_mode;
if (GNUNET_OK !=
TALER_BANK_auth_parse_cfg (cfg,
ai->section_name,
@@ -276,7 +300,12 @@ add_account_cb (void *cls,
return;
}
wa->section_name = GNUNET_strdup (ai->section_name);
- wa->batch_size = INITIAL_BATCH_SIZE;
+ GNUNET_asprintf (&wa->job_name,
+ "wirewatch-%s",
+ ai->section_name);
+ wa->batch_size = MAXIMUM_BATCH_SIZE;
+ if (0 != shard_size % wa->batch_size)
+ wa->batch_size = shard_size;
GNUNET_CONTAINER_DLL_insert (wa_head,
wa_tail,
wa);
@@ -334,6 +363,127 @@ find_transfers (void *cls);
/**
+ * We encountered a serialization error.
+ * Rollback the transaction and try again
+ *
+ * @param wa account we are transacting on
+ */
+static void
+handle_soft_error (struct WireAccount *wa)
+{
+ db_plugin->rollback (db_plugin->cls,
+ wa->session);
+ if (1 < wa->batch_size)
+ {
+ wa->batch_size /= 2;
+ wa->batch_increment = 0;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Reduced batch size to %llu due to serialization issue\n",
+ (unsigned long long) wa->batch_size);
+ }
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&find_transfers,
+ NULL);
+}
+
+
+/**
+ * We are finished with the current transaction, try
+ * to commit and then schedule the next iteration.
+ *
+ * @param wa wire account to commit for
+ */
+static void
+do_commit (struct WireAccount *wa)
+{
+ enum GNUNET_DB_QueryStatus qs;
+
+ if (wa->shard_end <= wa->latest_row_off)
+ {
+ /* shard is complete, mark this as well */
+ qs = db_plugin->complete_shard (db_plugin->cls,
+ wa->session,
+ wa->job_name,
+ wa->shard_start,
+ wa->shard_end);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ db_plugin->rollback (db_plugin->cls,
+ wa->session);
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Got DB soft error for complete_shard. Rolling back.\n");
+ handle_soft_error (wa);
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ /* already existed, ok, let's just continue */
+ break;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ /* normal case */
+ shard_delay = GNUNET_TIME_absolute_get_duration (wa->shard_start_time);
+
+ break;
+ }
+ }
+ qs = db_plugin->commit (db_plugin->cls,
+ wa->session);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ /* reduce transaction size to reduce rollback probability */
+ handle_soft_error (wa);
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ /* normal case */
+ break;
+ }
+ /* transaction success, update #last_row_off */
+ wa->batch_start = wa->latest_row_off;
+ wa->session = NULL; /* should not be needed */
+ if (wa->batch_size < MAXIMUM_BATCH_SIZE)
+ {
+ wa->batch_increment++;
+ wa->batch_size = GNUNET_MIN (MAXIMUM_BATCH_SIZE,
+ wa->batch_size + wa->batch_increment);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Increasing batch size to %llu\n",
+ (unsigned long long) wa->batch_size);
+ }
+ if ( (wa->delay) &&
+ (test_mode) &&
+ (NULL == wa->next) )
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Shutdown due to test mode!\n");
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ if (wa->delay)
+ {
+ wa->delayed_until
+ = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval);
+ wa_pos = wa_pos->next;
+ if (NULL == wa_pos)
+ wa_pos = wa_head;
+ GNUNET_assert (NULL != wa_pos);
+ }
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_at (wa_pos->delayed_until,
+ &find_transfers,
+ NULL);
+}
+
+
+/**
* Callbacks of this type are used to serve the result of asking
* the bank for the transaction history.
*
@@ -370,89 +520,38 @@ history_cb (void *cls,
}
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"End of list. Committing progress!\n");
- qs = db_plugin->commit (db_plugin->cls,
- session);
- if (GNUNET_DB_STATUS_HARD_ERROR == qs)
- {
- GNUNET_SCHEDULER_shutdown ();
- return GNUNET_OK;
- }
- if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
- {
- /* reduce transaction size to reduce rollback probability */
- if (2 > wa->batch_size)
- {
- wa->batch_size /= 2;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Reduced batch size to %llu due to serialization issue\n",
- (unsigned long long) wa->batch_size);
- }
- /* try again */
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&find_transfers,
- NULL);
- return GNUNET_OK; /* will be ignored anyway */
- }
- GNUNET_break (0 <= qs);
- /* transaction success, update #last_row_off */
- wa->last_row_off = wa->latest_row_off;
- wa->latest_row_off = 0; /* should not be needed */
- wa->session = NULL; /* should not be needed */
- if (wa->batch_size < INITIAL_BATCH_SIZE)
- {
- wa->batch_size += 1;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Increasing batch size to %llu\n",
- (unsigned long long) wa->batch_size);
- }
- if ( (wa->delay) &&
- (test_mode) &&
- (NULL == wa->next) )
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Shutdown due to test mode!\n");
- GNUNET_SCHEDULER_shutdown ();
- return GNUNET_OK;
- }
- if (wa->delay)
- {
- wa->delayed_until
- = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval);
- wa_pos = wa_pos->next;
- if (NULL == wa_pos)
- wa_pos = wa_head;
- GNUNET_assert (NULL != wa_pos);
- }
- task = GNUNET_SCHEDULER_add_at (wa_pos->delayed_until,
- &find_transfers,
- NULL);
+ do_commit (wa);
return GNUNET_OK; /* will be ignored anyway */
}
+ if (serial_id < wa->latest_row_off)
+ {
+ /* we are done with the current shard, commit and stop this iteration! */
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Serial ID %llu not monotonic (got %llu before). Failing!\n",
+ (unsigned long long) serial_id,
+ (unsigned long long) wa->latest_row_off);
+ db_plugin->rollback (db_plugin->cls,
+ session);
+ GNUNET_SCHEDULER_shutdown ();
+ wa->hh = NULL;
+ return GNUNET_SYSERR;
+ }
+ if (serial_id > wa->shard_end)
+ {
+ /* we are done with the current shard, commit and stop this iteration! */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Serial ID %llu past shard end at %llu, ending iteration early!\n",
+ (unsigned long long) serial_id,
+ (unsigned long long) wa->shard_end);
+ wa->latest_row_off = serial_id - 1;
+ do_commit (wa);
+ wa->hh = NULL;
+ return GNUNET_SYSERR;
+ }
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Adding wire transfer over %s with (hashed) subject `%s'\n",
TALER_amount2s (&details->amount),
TALER_B2S (&details->reserve_pub));
-
- /**
- * Debug block.
- */
-#if DEBUG_LOGGING
- {
- /** Should be 53, give 80 just to be extra conservative (and aligned). */
-#define PUBSIZE 80
- char wtid_s[PUBSIZE];
-
- GNUNET_break (NULL !=
- GNUNET_STRINGS_data_to_string (&details->reserve_pub,
- sizeof (details->reserve_pub),
- &wtid_s[0],
- PUBSIZE));
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Plain text subject (= reserve_pub): %s\n",
- wtid_s);
- }
-#endif
-
/* FIXME-PERFORMANCE: Consider using Postgres multi-valued insert here,
for up to 15x speed-up according to
https://dba.stackexchange.com/questions/224989/multi-row-insert-vs-transactional-single-row-inserts#225006
@@ -466,26 +565,27 @@ history_cb (void *cls,
details->debit_account_url,
wa->section_name,
serial_id);
- if (GNUNET_DB_STATUS_HARD_ERROR == qs)
+ switch (qs)
{
+ case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
db_plugin->rollback (db_plugin->cls,
session);
GNUNET_SCHEDULER_shutdown ();
wa->hh = NULL;
return GNUNET_SYSERR;
- }
- if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Got DB soft error for reserves_in_insert. Rolling back.\n");
- db_plugin->rollback (db_plugin->cls,
- session);
+ handle_soft_error (wa);
wa->hh = NULL;
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&find_transfers,
- NULL);
return GNUNET_SYSERR;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ /* already existed, ok, let's just continue */
+ break;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ /* normal case */
+ break;
}
wa->delay = false;
wa->latest_row_off = serial_id;
@@ -515,64 +615,77 @@ find_transfers (void *cls)
GNUNET_SCHEDULER_shutdown ();
return;
}
- db_plugin->preflight (db_plugin->cls,
- session);
- if (GNUNET_OK !=
- db_plugin->start (db_plugin->cls,
- session,
- "wirewatch check for incoming wire transfers"))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to start database transaction!\n");
- global_ret = GR_DATABASE_TRANSACTION_BEGIN_FAIL;
- GNUNET_SCHEDULER_shutdown ();
- return;
- }
wa_pos->delay = true;
wa_pos->current_batch_size = 0; /* reset counter */
wa_pos->session = session;
- if (wa_pos->shard_end == wa_pos->last_row_off)
+ if (wa_pos->shard_end <= wa_pos->batch_start)
{
+ uint64_t start;
+ uint64_t end;
+ struct GNUNET_TIME_Relative delay;
/* advance to next shard */
- // FIXME: if other processes are running in parallel,
- // update 'last_row_off' to next free shard!
- wa_pos->shard_end = wa_pos->last_row_off + shard_size;
- }
- if (! wa_pos->reset_mode)
- {
- // FIXME: need good way to fetch
- // shard data here!
- qs = db_plugin->get_latest_reserve_in_reference (db_plugin->cls,
- session,
- wa_pos->section_name,
- &wa_pos->last_row_off);
- if (GNUNET_DB_STATUS_HARD_ERROR == qs)
+
+ delay.rel_value_us = GNUNET_CRYPTO_random_u64 (
+ GNUNET_CRYPTO_QUALITY_WEAK,
+ 4 * GNUNET_TIME_relative_max (
+ wirewatch_idle_sleep_interval,
+ GNUNET_TIME_relative_multiply (shard_delay,
+ max_workers)).rel_value_us);
+ qs = db_plugin->begin_shard (db_plugin->cls,
+ wa_pos->job_name,
+ delay,
+ shard_size,
+ &start,
+ &end);
+ switch (qs)
{
+ case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to obtain starting point for montoring from database!\n");
- db_plugin->rollback (db_plugin->cls,
- session);
global_ret = GR_DATABASE_SELECT_LATEST_HARD_FAIL;
GNUNET_SCHEDULER_shutdown ();
return;
- }
- if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
- {
+ case GNUNET_DB_STATUS_SOFT_ERROR:
/* try again */
- db_plugin->rollback (db_plugin->cls,
- session);
- task = GNUNET_SCHEDULER_add_now (&find_transfers,
- NULL);
+ task = GNUNET_SCHEDULER_add_delayed (wirewatch_idle_sleep_interval,
+ &find_transfers,
+ NULL);
return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ GNUNET_break (0);
+ task = GNUNET_SCHEDULER_add_delayed (wirewatch_idle_sleep_interval,
+ &find_transfers,
+ NULL);
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ wa_pos->shard_start_time = GNUNET_TIME_absolute_get ();
+ wa_pos->shard_start = start;
+ wa_pos->shard_end = end;
+ wa_pos->batch_start = start;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Starting with shard at %llu\n",
+ (unsigned long long) start);
+ break;
}
}
- wa_pos->reset_mode = true;
+ if (GNUNET_OK !=
+ db_plugin->start (db_plugin->cls,
+ session,
+ "wirewatch check for incoming wire transfers"))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to start database transaction!\n");
+ global_ret = GR_DATABASE_TRANSACTION_BEGIN_FAIL;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
limit = GNUNET_MIN (wa_pos->batch_size,
- wa_pos->shard_end - wa_pos->last_row_off);
+ wa_pos->shard_end - wa_pos->batch_start);
GNUNET_assert (NULL == wa_pos->hh);
+ wa_pos->latest_row_off = wa_pos->batch_start;
wa_pos->hh = TALER_BANK_credit_history (ctx,
&wa_pos->auth,
- wa_pos->last_row_off,
+ wa_pos->batch_start,
limit,
&history_cb,
wa_pos);
@@ -644,10 +757,6 @@ main (int argc,
char *const *argv)
{
struct GNUNET_GETOPT_CommandLineOption options[] = {
- GNUNET_GETOPT_option_flag ('r',
- "reset",
- "start fresh with all transactions in the history",
- &reset_mode),
GNUNET_GETOPT_option_uint ('S',
"size",
"SIZE",
@@ -659,6 +768,11 @@ main (int argc,
"test",
"run in test mode and exit when idle",
&test_mode),
+ GNUNET_GETOPT_option_uint ('w',
+ "workers",
+ "COUNT",
+ "Plan work load with up to COUNT worker processes (default: 16)",
+ &max_workers),
GNUNET_GETOPT_OPTION_END
};
enum GNUNET_GenericReturnValue ret;