summaryrefslogtreecommitdiff
path: root/src/exchange/taler-exchange-wirewatch.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/exchange/taler-exchange-wirewatch.c')
-rw-r--r--src/exchange/taler-exchange-wirewatch.c286
1 files changed, 183 insertions, 103 deletions
diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c
index e9b28030b..da5d9c098 100644
--- a/src/exchange/taler-exchange-wirewatch.c
+++ b/src/exchange/taler-exchange-wirewatch.c
@@ -1,6 +1,6 @@
/*
This file is part of TALER
- Copyright (C) 2016--2022 Taler Systems SA
+ Copyright (C) 2016--2023 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free Software
@@ -52,6 +52,23 @@ static const struct TALER_EXCHANGEDB_AccountInfo *ai;
static struct TALER_BANK_CreditHistoryHandle *hh;
/**
+ * Set to true if the request for history did actually
+ * return transaction items.
+ */
+static bool hh_returned_data;
+
+/**
+ * Set to true if the request for history did not
+ * succeed because the account was unknown.
+ */
+static bool hh_account_404;
+
+/**
+ * When did we start the last @e hh request?
+ */
+static struct GNUNET_TIME_Absolute hh_start_time;
+
+/**
* Until when is processing this wire plugin delayed?
*/
static struct GNUNET_TIME_Absolute delayed_until;
@@ -95,6 +112,12 @@ static struct GNUNET_TIME_Absolute shard_end_time;
static struct GNUNET_TIME_Relative shard_delay;
/**
+ * How long did we take to finish the last shard
+ * for this account?
+ */
+static struct GNUNET_TIME_Relative longpoll_timeout;
+
+/**
* Name of our job in the shard table.
*/
static char *job_name;
@@ -154,6 +177,11 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin;
static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval;
/**
+ * How long do we sleep on serialization conflicts?
+ */
+static struct GNUNET_TIME_Relative wirewatch_conflict_sleep_interval;
+
+/**
* Modulus to apply to group shards. The shard size must ultimately be a
* multiple of the batch size. Thus, if this is not a multiple of the
* #MAXIMUM_BATCH_SIZE, the batch size will be set to the #shard_size.
@@ -210,6 +238,8 @@ shutdown_task (void *cls)
if (NULL != hh)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "History request cancelled on shutdown\n");
TALER_BANK_credit_history_cancel (hh);
hh = NULL;
}
@@ -218,13 +248,16 @@ shutdown_task (void *cls)
db_plugin->rollback (db_plugin->cls);
started_transaction = false;
}
- qs = db_plugin->abort_shard (db_plugin->cls,
- job_name,
- shard_start,
- shard_end);
- if (qs <= 0)
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Failed to abort work shard on shutdown\n");
+ if (shard_open)
+ {
+ qs = db_plugin->abort_shard (db_plugin->cls,
+ job_name,
+ shard_start,
+ shard_end);
+ if (qs <= 0)
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to abort work shard on shutdown\n");
+ }
GNUNET_free (job_name);
if (NULL != ctx)
{
@@ -263,7 +296,7 @@ add_account_cb (void *cls,
if (! in_ai->credit_enabled)
return; /* not enabled for us, skip */
if ( (NULL != account_section) &&
- (0 != strcasecmp (ai->section_name,
+ (0 != strcasecmp (in_ai->section_name,
account_section)) )
return; /* not enabled for us, skip */
if (NULL != ai)
@@ -320,8 +353,6 @@ exchange_serve_process_config (void)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"No wire accounts configured for credit!\n");
- TALER_EXCHANGEDB_plugin_unload (db_plugin);
- db_plugin = NULL;
return GNUNET_SYSERR;
}
TALER_EXCHANGEDB_find_accounts (&add_account_cb,
@@ -331,9 +362,6 @@ exchange_serve_process_config (void)
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"No accounts enabled for credit!\n");
GNUNET_SCHEDULER_shutdown ();
- global_ret = EXIT_INVALIDARGUMENT;
- TALER_EXCHANGEDB_plugin_unload (db_plugin);
- db_plugin = NULL;
return GNUNET_SYSERR;
}
return GNUNET_OK;
@@ -377,6 +405,8 @@ handle_soft_error (void)
}
/* Reset to beginning of transaction, and go again
from there. */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Encountered soft error, resetting start point to batch start\n");
latest_row_off = batch_start;
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
@@ -420,7 +450,6 @@ schedule_transfers (void)
static void
transaction_completed (void)
{
- GNUNET_assert (! started_transaction);
if ( (batch_start + batch_size ==
latest_row_off) &&
(batch_size < MAXIMUM_BATCH_SIZE) )
@@ -448,6 +477,29 @@ transaction_completed (void)
GNUNET_SCHEDULER_shutdown ();
return;
}
+ if (! (hh_returned_data || hh_account_404) )
+ {
+ /* Enforce long-polling delay even if the server ignored it
+ and returned earlier */
+ struct GNUNET_TIME_Relative latency;
+ struct GNUNET_TIME_Relative left;
+
+ latency = GNUNET_TIME_absolute_get_duration (hh_start_time);
+ left = GNUNET_TIME_relative_subtract (longpoll_timeout,
+ latency);
+ if (! (test_mode ||
+ GNUNET_TIME_relative_is_zero (left)) )
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Server did not respect long-polling, enforcing client-side by sleeping for %s\n",
+ GNUNET_TIME_relative2s (left,
+ true));
+ delayed_until = GNUNET_TIME_relative_to_absolute (left);
+ }
+ if (hh_account_404)
+ delayed_until = GNUNET_TIME_relative_to_absolute (
+ GNUNET_TIME_UNIT_MILLISECONDS);
+ if (test_mode)
+ delayed_until = GNUNET_TIME_UNIT_ZERO_ABS;
GNUNET_assert (NULL == task);
schedule_transfers ();
}
@@ -475,6 +527,7 @@ process_reply (const struct TALER_BANK_CreditDetails *details,
transaction_completed ();
return;
}
+ hh_returned_data = true;
/* check serial IDs for range constraints */
for (unsigned int i = 0; i<details_length; i++)
{
@@ -506,34 +559,30 @@ process_reply (const struct TALER_BANK_CreditDetails *details,
}
lroff = cd->serial_id;
}
- if (GNUNET_OK !=
- db_plugin->start_read_committed (db_plugin->cls,
- "wirewatch check for incoming wire transfers"))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to start database transaction!\n");
- global_ret = EXIT_FAILURE;
- GNUNET_SCHEDULER_shutdown ();
- return;
- }
- started_transaction = true;
-
- for (unsigned int i = 0; i<details_length; i++)
+ if (0 != details_length)
{
- const struct TALER_BANK_CreditDetails *cd = &details[i];
+ enum GNUNET_DB_QueryStatus qss[details_length];
+ struct TALER_EXCHANGEDB_ReserveInInfo reserves[details_length];
- /* FIXME #7276: Consider using Postgres multi-valued insert here,
- for up to 15x speed-up according to
- https://dba.stackexchange.com/questions/224989/multi-row-insert-vs-transactional-single-row-inserts#225006
- (Note: this may require changing both the
- plugin API as well as modifying how this function is called.) */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Importing %u transactions\n",
+ details_length);
+ for (unsigned int i = 0; i<details_length; i++)
+ {
+ const struct TALER_BANK_CreditDetails *cd = &details[i];
+ struct TALER_EXCHANGEDB_ReserveInInfo *res = &reserves[i];
+
+ res->reserve_pub = &cd->reserve_pub;
+ res->balance = &cd->amount;
+ res->execution_time = cd->execution_date;
+ res->sender_account_details = cd->debit_account_uri;
+ res->exchange_account_name = ai->section_name;
+ res->wire_reference = cd->serial_id;
+ }
qs = db_plugin->reserves_in_insert (db_plugin->cls,
- &cd->reserve_pub,
- &cd->amount,
- cd->execution_date,
- cd->debit_account_uri,
- ai->section_name,
- cd->serial_id);
+ reserves,
+ details_length,
+ qss);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
@@ -542,34 +591,53 @@ process_reply (const struct TALER_BANK_CreditDetails *details,
return;
case GNUNET_DB_STATUS_SOFT_ERROR:
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Got DB soft error for reserves_in_insert. Rolling back.\n");
+ "Got DB soft error for reserves_in_insert (%u). Rolling back.\n",
+ details_length);
handle_soft_error ();
return;
- case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
- /* Either wirewatch was freshly started after the system was
- shutdown and we're going over an incomplete shard again
- after being restarted, or the shard lock period was too
- short (number of workers set incorrectly?) and a 2nd
- wirewatcher has been stealing our work while we are still
- at it. */
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Attempted to import transaction %llu (%s) twice. "
- "This should happen rarely (if not, ask for support).\n",
- (unsigned long long) cd->serial_id,
- job_name);
- db_plugin->rollback (db_plugin->cls);
- started_transaction = false;
- /* already existed, ok, let's just continue */
- return;
- case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Imported transaction %llu.",
- (unsigned long long) cd->serial_id);
- /* normal case */
+ default:
break;
}
- progress = true;
+ for (unsigned int i = 0; i<details_length; i++)
+ {
+ const struct TALER_BANK_CreditDetails *cd = &details[i];
+
+ switch (qss[i])
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Got DB soft error for batch_reserves_in_insert(%u). Rolling back.\n",
+ i);
+ handle_soft_error ();
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ /* Either wirewatch was freshly started after the system was
+ shutdown and we're going over an incomplete shard again
+ after being restarted, or the shard lock period was too
+ short (number of workers set incorrectly?) and a 2nd
+ wirewatcher has been stealing our work while we are still
+ at it. */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Attempted to import transaction %llu (%s) twice. "
+ "This should happen rarely (if not, ask for support).\n",
+ (unsigned long long) cd->serial_id,
+ job_name);
+ break;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Imported transaction %llu.\n",
+ (unsigned long long) cd->serial_id);
+ /* normal case */
+ progress = true;
+ break;
+ }
+ }
}
+
latest_row_off = lroff;
shard_done = (shard_end <= latest_row_off);
if (shard_done)
@@ -596,6 +664,7 @@ process_reply (const struct TALER_BANK_CreditDetails *details,
break;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
/* normal case */
+ progress = true;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Completed shard %s (%llu,%llu] after %s\n",
job_name,
@@ -606,35 +675,6 @@ process_reply (const struct TALER_BANK_CreditDetails *details,
true));
break;
}
- }
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Committing %s progress (%llu,%llu] at %llu\n (%s)",
- job_name,
- (unsigned long long) shard_start,
- (unsigned long long) shard_end,
- (unsigned long long) latest_row_off,
- shard_done
- ? "shard done"
- : "shard incomplete");
- qs = db_plugin->commit (db_plugin->cls);
- switch (qs)
- {
- case GNUNET_DB_STATUS_HARD_ERROR:
- GNUNET_break (0);
- GNUNET_SCHEDULER_shutdown ();
- return;
- case GNUNET_DB_STATUS_SOFT_ERROR:
- /* reduce transaction size to reduce rollback probability */
- handle_soft_error ();
- return;
- case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
- case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
- started_transaction = false;
- /* normal case */
- break;
- }
- if (shard_done)
- {
shard_delay = GNUNET_TIME_absolute_get_duration (shard_start_time);
shard_open = false;
transaction_completed ();
@@ -660,19 +700,20 @@ history_cb (void *cls,
(void) cls;
GNUNET_assert (NULL == task);
hh = NULL;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"History request returned with HTTP status %u\n",
reply->http_status);
switch (reply->http_status)
{
case MHD_HTTP_OK:
- process_reply (reply->details.success.details,
- reply->details.success.details_length);
+ process_reply (reply->details.ok.details,
+ reply->details.ok.details_length);
return;
case MHD_HTTP_NO_CONTENT:
transaction_completed ();
return;
case MHD_HTTP_NOT_FOUND:
+ hh_account_404 = true;
if (ignore_account_404)
{
transaction_completed ();
@@ -680,12 +721,12 @@ history_cb (void *cls,
}
break;
default:
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Error fetching history: %s (%u)\n",
+ TALER_ErrorCode_get_hint (reply->ec),
+ reply->http_status);
break;
}
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Error fetching history: %s (%u)\n",
- TALER_ErrorCode_get_hint (reply->ec),
- reply->http_status);
if (! exit_on_error)
{
transaction_completed ();
@@ -707,15 +748,18 @@ continue_with_shard (void *cls)
shard_end - latest_row_off);
GNUNET_assert (NULL == hh);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Requesting credit history staring from %llu\n",
+ "Requesting credit history starting from %llu\n",
(unsigned long long) latest_row_off);
+ hh_start_time = GNUNET_TIME_absolute_get ();
+ hh_returned_data = false;
+ hh_account_404 = false;
hh = TALER_BANK_credit_history (ctx,
ai->auth,
latest_row_off,
limit,
test_mode
? GNUNET_TIME_UNIT_ZERO
- : LONGPOLL_TIMEOUT,
+ : longpoll_timeout,
&history_cb,
NULL);
if (NULL == hh)
@@ -729,6 +773,11 @@ continue_with_shard (void *cls)
}
+/**
+ * Reserve a shard for us to work on.
+ *
+ * @param cls NULL
+ */
static void
lock_shard (void *cls)
{
@@ -792,12 +841,25 @@ lock_shard (void *cls)
{
struct GNUNET_TIME_Relative rdelay;
- rdelay = GNUNET_TIME_randomize (wirewatch_idle_sleep_interval);
+ wirewatch_conflict_sleep_interval
+ = GNUNET_TIME_STD_BACKOFF (wirewatch_conflict_sleep_interval);
+ rdelay = GNUNET_TIME_randomize (wirewatch_conflict_sleep_interval);
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Serialization error tying to obtain shard %s, will try again in %s!\n",
job_name,
GNUNET_STRINGS_relative_time_to_string (rdelay,
true));
+#if 1
+ if (GNUNET_TIME_relative_cmp (rdelay,
+ >,
+ GNUNET_TIME_UNIT_SECONDS))
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Delay would have been for %s\n",
+ GNUNET_TIME_relative2s (rdelay,
+ true));
+ rdelay = GNUNET_TIME_relative_min (rdelay,
+ GNUNET_TIME_UNIT_SECONDS);
+#endif
delayed_until = GNUNET_TIME_relative_to_absolute (rdelay);
}
GNUNET_assert (NULL == task);
@@ -810,7 +872,7 @@ lock_shard (void *cls)
job_name,
GNUNET_STRINGS_relative_time_to_string (
wirewatch_idle_sleep_interval,
- GNUNET_YES));
+ true));
delayed_until = GNUNET_TIME_relative_to_absolute (
wirewatch_idle_sleep_interval);
shard_open = false;
@@ -819,6 +881,7 @@ lock_shard (void *cls)
return;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
/* continued below */
+ wirewatch_conflict_sleep_interval = GNUNET_TIME_UNIT_ZERO;
break;
}
shard_end_time = GNUNET_TIME_relative_to_absolute (delay);
@@ -834,9 +897,19 @@ lock_shard (void *cls)
if ( (shard_open) &&
(shard_start == last_shard_start) &&
(shard_end == last_shard_end) )
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Continuing from %llu\n",
+ (unsigned long long) latest_row_off);
GNUNET_break (latest_row_off >= batch_start); /* resume where we left things */
+ }
else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Resetting shard start to original start point (%d)\n",
+ shard_open ? 1 : 0);
latest_row_off = batch_start;
+ }
shard_open = true;
task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
NULL);
@@ -877,6 +950,7 @@ run (void *cls,
{
GNUNET_break (0);
GNUNET_SCHEDULER_shutdown ();
+ global_ret = EXIT_NO_RESTART;
return;
}
rc = GNUNET_CURL_gnunet_rc_create (ctx);
@@ -905,6 +979,11 @@ main (int argc,
"exit-on-error",
"terminate wirewatch if we failed to download information from the bank",
&exit_on_error),
+ GNUNET_GETOPT_option_relative_time ('f',
+ "longpoll-timeout",
+ "DELAY",
+ "what is the timeout when asking the bank about new transactions, specify with unit (e.g. --longpoll-timeout=30s)",
+ &longpoll_timeout),
GNUNET_GETOPT_option_flag ('I',
"ignore-not-found",
"continue, even if the bank account of the exchange was not found",
@@ -930,6 +1009,7 @@ main (int argc,
};
enum GNUNET_GenericReturnValue ret;
+ longpoll_timeout = LONGPOLL_TIMEOUT;
if (GNUNET_OK !=
GNUNET_STRINGS_get_utf8_args (argc, argv,
&argc, &argv))