summaryrefslogtreecommitdiff
path: root/src/exchange
diff options
context:
space:
mode:
Diffstat (limited to 'src/exchange')
-rw-r--r--src/exchange/taler-exchange-wirewatch.c85
1 files changed, 50 insertions, 35 deletions
diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c
index 40b962f8a..760dbe10b 100644
--- a/src/exchange/taler-exchange-wirewatch.c
+++ b/src/exchange/taler-exchange-wirewatch.c
@@ -1,6 +1,6 @@
/*
This file is part of TALER
- Copyright (C) 2016--2020 Taler Systems SA
+ Copyright (C) 2016--2021 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free Software
@@ -90,6 +90,11 @@ struct WireAccount
uint64_t latest_row_off;
/**
+ * Offset where our current shard ends.
+ */
+ uint64_t shard_end;
+
+ /**
* How many transactions do we retrieve per batch?
*/
unsigned int batch_size;
@@ -103,19 +108,14 @@ struct WireAccount
* Are we running from scratch and should re-process all transactions
* for this account?
*/
- int reset_mode;
+ bool reset_mode;
/**
* Should we delay the next request to the wire plugin a bit? Set to
- * #GNUNET_NO if we actually did some work.
+ * false if we actually did some work.
*/
- int delay;
+ bool delay;
- /**
- * Did we experience a soft failure during the current
- * transaction?
- */
- bool soft_fail;
};
@@ -161,6 +161,11 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin;
static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval;
/**
+ * Modulus to apply to group shards.
+ */
+static unsigned int shard_size = 1024;
+
+/**
* Value to return from main(). 0 on success, non-zero on
* on serious errors.
*/
@@ -363,20 +368,10 @@ history_cb (void *cls,
(unsigned int) ec,
http_status);
}
- if (wa->soft_fail)
- {
- /* no point to commit, transaction was already rolled
- back after we encountered a soft failure */
- wa->soft_fail = false;
- qs = GNUNET_DB_STATUS_SOFT_ERROR;
- }
- else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "End of list. Committing progress!\n");
- qs = db_plugin->commit (db_plugin->cls,
- session);
- }
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "End of list. Committing progress!\n");
+ qs = db_plugin->commit (db_plugin->cls,
+ session);
if (GNUNET_DB_STATUS_HARD_ERROR == qs)
{
GNUNET_SCHEDULER_shutdown ();
@@ -410,7 +405,7 @@ history_cb (void *cls,
"Increasing batch size to %llu\n",
(unsigned long long) wa->batch_size);
}
- if ( (GNUNET_YES == wa->delay) &&
+ if ( (wa->delay) &&
(test_mode) &&
(NULL == wa->next) )
{
@@ -419,7 +414,7 @@ history_cb (void *cls,
GNUNET_SCHEDULER_shutdown ();
return GNUNET_OK;
}
- if (GNUNET_YES == wa->delay)
+ if (wa->delay)
{
wa->delayed_until
= GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval);
@@ -477,6 +472,7 @@ history_cb (void *cls,
db_plugin->rollback (db_plugin->cls,
session);
GNUNET_SCHEDULER_shutdown ();
+ wa->hh = NULL;
return GNUNET_SYSERR;
}
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
@@ -485,10 +481,13 @@ history_cb (void *cls,
"Got DB soft error for reserves_in_insert. Rolling back.\n");
db_plugin->rollback (db_plugin->cls,
session);
- wa->soft_fail = true;
+ wa->hh = NULL;
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&find_transfers,
+ NULL);
return GNUNET_SYSERR;
}
- wa->delay = GNUNET_NO;
+ wa->delay = false;
wa->latest_row_off = serial_id;
return GNUNET_OK;
}
@@ -504,6 +503,7 @@ find_transfers (void *cls)
{
struct TALER_EXCHANGEDB_Session *session;
enum GNUNET_DB_QueryStatus qs;
+ unsigned int limit;
(void) cls;
task = NULL;
@@ -555,13 +555,21 @@ find_transfers (void *cls)
}
wa_pos->reset_mode = GNUNET_NO;
}
- wa_pos->delay = GNUNET_YES;
+ wa_pos->delay = true;
wa_pos->current_batch_size = 0; /* reset counter */
wa_pos->session = session;
+ if (wa_pos->shard_end == wa_pos->last_row_off)
+ {
+ /* advance to next shard */
+ wa_pos->shard_end += shard_size;
+ }
+ limit = GNUNET_MIN (wa_pos->batch_size,
+ wa_pos->shard_end - wa_pos->last_row_off);
+ GNUNET_assert (NULL == wa_pos->hh);
wa_pos->hh = TALER_BANK_credit_history (ctx,
&wa_pos->auth,
wa_pos->last_row_off,
- wa_pos->batch_size,
+ limit,
&history_cb,
wa_pos);
if (NULL == wa_pos->hh)
@@ -594,6 +602,7 @@ run (void *cls,
(void) cls;
(void) args;
(void) cfgfile;
+
cfg = c;
if (GNUNET_OK !=
exchange_serve_process_config ())
@@ -603,8 +612,6 @@ run (void *cls,
}
wa_pos = wa_head;
GNUNET_assert (NULL != wa_pos);
- task = GNUNET_SCHEDULER_add_now (&find_transfers,
- NULL);
GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
cls);
ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,
@@ -615,6 +622,9 @@ run (void *cls,
GNUNET_break (0);
return;
}
+
+ task = GNUNET_SCHEDULER_add_now (&find_transfers,
+ NULL);
}
@@ -630,16 +640,21 @@ main (int argc,
char *const *argv)
{
struct GNUNET_GETOPT_CommandLineOption options[] = {
+ GNUNET_GETOPT_option_flag ('r',
+ "reset",
+ "start fresh with all transactions in the history",
+ &reset_mode),
+ GNUNET_GETOPT_option_uint ('S',
+ "size",
+ "SIZE",
+ "Size to process per shard (default: 1024)",
+ &shard_size),
GNUNET_GETOPT_option_timetravel ('T',
"timetravel"),
GNUNET_GETOPT_option_flag ('t',
"test",
"run in test mode and exit when idle",
&test_mode),
- GNUNET_GETOPT_option_flag ('r',
- "reset",
- "start fresh with all transactions in the history",
- &reset_mode),
GNUNET_GETOPT_OPTION_END
};
enum GNUNET_GenericReturnValue ret;