summaryrefslogtreecommitdiff
path: root/src/exchange
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2020-03-15 21:20:56 +0100
committerChristian Grothoff <christian@grothoff.org>2020-03-15 21:20:56 +0100
commitd3f7cc11842a3e2a574431919179d037e56715ea (patch)
tree8b4c14787de5815432ed28ed6d550dbd4c58eed2 /src/exchange
parentc898a1e13b1f06dafec35051dfd232510bd28de3 (diff)
downloadexchange-d3f7cc11842a3e2a574431919179d037e56715ea.tar.gz
exchange-d3f7cc11842a3e2a574431919179d037e56715ea.tar.bz2
exchange-d3f7cc11842a3e2a574431919179d037e56715ea.zip
clean up wirewatch logic
Diffstat (limited to 'src/exchange')
-rw-r--r--src/exchange/exchange.conf8
-rw-r--r--src/exchange/taler-exchange-wirewatch.c249
2 files changed, 151 insertions, 106 deletions
diff --git a/src/exchange/exchange.conf b/src/exchange/exchange.conf
index 8144bddcd..9de198949 100644
--- a/src/exchange/exchange.conf
+++ b/src/exchange/exchange.conf
@@ -54,9 +54,15 @@ PORT = 8081
BASE_URL = http://localhost:8081/
-# How long should the aggregator sleep if it has nothing to do?
+# How long should the aggregator (and closer, and transfer)
+# sleep if it has nothing to do?
AGGREGATOR_IDLE_SLEEP_INTERVAL = 60 s
+# How long should wirewatch sleep if it has nothing to do?
+# (Set very aggressively here for the demonstrators to be
+# super fast.)
+WIREWATCH_IDLE_SLEEP_INTERVAL = 1 s
+
# how long is one signkey valid?
SIGNKEY_DURATION = 4 weeks
diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c
index 3731f6633..04bf21698 100644
--- a/src/exchange/taler-exchange-wirewatch.c
+++ b/src/exchange/taler-exchange-wirewatch.c
@@ -1,6 +1,6 @@
/*
This file is part of TALER
- Copyright (C) 2016, 2017, 2018 Taler Systems SA
+ Copyright (C) 2016--2020 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free Software
@@ -29,12 +29,13 @@
#include "taler_json_lib.h"
#include "taler_bank_service.h"
+#define DEBUG_LOGGING 0
+
/**
- * How long do we sleep before trying again if there
- * are no transactions returned by the wire plugin?
+ * What is the initial batch size we use for credit history
+ * requests with the bank. See `batch_size` below.
*/
-#define DELAY GNUNET_TIME_UNIT_SECONDS
-
+#define INITIAL_BATCH_SIZE 1024
/**
* Information we keep for each supported account.
@@ -57,20 +58,58 @@ struct WireAccount
char *section_name;
/**
+ * Database session we are using for the current transaction.
+ */
+ struct TALER_EXCHANGEDB_Session *session;
+
+ /**
+ * Active request for history.
+ */
+ struct TALER_BANK_CreditHistoryHandle *hh;
+
+ /**
* Authentication data.
*/
struct TALER_BANK_AuthenticationData auth;
/**
+ * Until when is processing this wire plugin delayed?
+ */
+ struct GNUNET_TIME_Absolute delayed_until;
+
+ /**
+ * Encoded offset in the wire transfer list from where
+ * to start the next query with the bank.
+ */
+ uint64_t last_row_off;
+
+ /**
+ * Latest row offset seen in this transaction, becomes
+ * the new #last_row_off upon commit.
+ */
+ uint64_t latest_row_off;
+
+ /**
+ * How many transactions do we retrieve per batch?
+ */
+ unsigned int batch_size;
+
+ /**
+ * How many transactions did we see in the current batch?
+ */
+ unsigned int current_batch_size;
+
+ /**
* Are we running from scratch and should re-process all transactions
* for this account?
*/
int reset_mode;
/**
- * Until when is processing this wire plugin delayed?
+ * Should we delay the next request to the wire plugin a bit? Set to
+ * #GNUNET_NO if we actually did some work.
*/
- struct GNUNET_TIME_Absolute delayed_until;
+ int delay;
};
@@ -86,7 +125,8 @@ static struct WireAccount *wa_head;
static struct WireAccount *wa_tail;
/**
- * Wire plugin we are currently using.
+ * Wire account we are currently processing. This would go away
+ * if we ever start processing all accounts in parallel.
*/
static struct WireAccount *wa_pos;
@@ -111,27 +151,25 @@ static const struct GNUNET_CONFIGURATION_Handle *cfg;
static struct TALER_EXCHANGEDB_Plugin *db_plugin;
/**
- * Value to return from main(). #GNUNET_OK on success, #GNUNET_SYSERR
- * on serious errors.
- */
-static int global_ret;
-
-/**
- * Encoded offset in the wire transfer list from where
- * to start the next query with the bank.
+ * How long should we sleep when idle before trying to find more work?
*/
-static uint64_t last_row_off;
+static struct GNUNET_TIME_Relative wirewatch_idle_sleep_interval;
/**
- * Latest row offset seen in this transaction, becomes
- * the new #last_row_off upon commit.
- */
-static uint64_t latest_row_off;
-
-/**
- * Should we delay the next request to the wire plugin a bit?
+ * Value to return from main(). 0 on success, non-zero on
+ * on serious errors.
*/
-static int delay;
+static enum
+{
+ GR_SUCCESS = 0,
+ GR_DATABASE_SESSION_FAIL = 1,
+ GR_DATABASE_TRANSACTION_BEGIN_FAIL = 2,
+ GR_DATABASE_SELECT_LATEST_HARD_FAIL = 3,
+ GR_BANK_REQUEST_HISTORY_FAIL = 4,
+ GR_CONFIGURATION_INVALID = 5,
+ GR_CMD_LINE_UTF8_ERROR = 6,
+ GR_CMD_LINE_OPTIONS_WRONG = 7,
+} global_ret;
/**
* Are we run in testing mode and should only do one pass?
@@ -144,25 +182,10 @@ static int test_mode;
static int reset_mode;
/**
- * How many transactions do we retrieve per batch?
- */
-static unsigned int batch_size = 1024;
-
-/**
- * How many transactions did we see in the current batch?
- */
-static unsigned int current_batch_size;
-
-/**
- * Next task to run, if any.
+ * Current task waiting for execution, if any.
*/
static struct GNUNET_SCHEDULER_Task *task;
-/**
- * Active request for history.
- */
-static struct TALER_BANK_CreditHistoryHandle *hh;
-
/**
* We're being aborted with CTRL-C (or SIGTERM). Shut down.
@@ -173,11 +196,26 @@ static void
shutdown_task (void *cls)
{
(void) cls;
- if (NULL != hh)
{
- TALER_BANK_credit_history_cancel (hh);
- hh = NULL;
+ struct WireAccount *wa;
+
+ while (NULL != (wa = wa_head))
+ {
+ if (NULL != wa->hh)
+ {
+ TALER_BANK_credit_history_cancel (wa->hh);
+ wa->hh = NULL;
+ }
+ GNUNET_CONTAINER_DLL_remove (wa_head,
+ wa_tail,
+ wa);
+ TALER_BANK_auth_free (&wa->auth);
+ GNUNET_free (wa->section_name);
+ GNUNET_free (wa);
+ }
}
+ wa_pos = NULL;
+
if (NULL != ctx)
{
GNUNET_CURL_fini (ctx);
@@ -195,21 +233,6 @@ shutdown_task (void *cls)
}
TALER_EXCHANGEDB_plugin_unload (db_plugin);
db_plugin = NULL;
- {
- struct WireAccount *wa;
-
- while (NULL != (wa = wa_head))
- {
- GNUNET_CONTAINER_DLL_remove (wa_head,
- wa_tail,
- wa);
- TALER_BANK_auth_free (&wa->auth);
- GNUNET_free (wa->section_name);
- GNUNET_free (wa);
- }
- }
- wa_pos = NULL;
- last_row_off = 0;
}
@@ -243,6 +266,7 @@ add_account_cb (void *cls,
return;
}
wa->section_name = GNUNET_strdup (ai->section_name);
+ wa->batch_size = INITIAL_BATCH_SIZE;
GNUNET_CONTAINER_DLL_insert (wa_head,
wa_tail,
wa);
@@ -258,6 +282,17 @@ add_account_cb (void *cls,
static int
exchange_serve_process_config (void)
{
+ if (GNUNET_OK !=
+ GNUNET_CONFIGURATION_get_value_time (cfg,
+ "exchange",
+ "WIREWATCH_IDLE_SLEEP_INTERVAL",
+ &wirewatch_idle_sleep_interval))
+ {
+ GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
+ "exchange",
+ "WIREWATCH_IDLE_SLEEP_INTERVAL");
+ return GNUNET_SYSERR;
+ }
if (NULL ==
(db_plugin = TALER_EXCHANGEDB_plugin_load (cfg)))
{
@@ -292,7 +327,7 @@ find_transfers (void *cls);
* Callbacks of this type are used to serve the result of asking
* the bank for the transaction history.
*
- * @param cls closure with the `struct TALER_EXCHANGEDB_Session *`
+ * @param cls closure with the `struct WioreAccount *` we are processing
* @param http_status HTTP status code from the server
* @param ec taler error code
* @param serial_id identification of the position at which we are querying
@@ -308,13 +343,14 @@ history_cb (void *cls,
const struct TALER_BANK_CreditDetails *details,
const json_t *json)
{
- struct TALER_EXCHANGEDB_Session *session = cls;
+ struct WireAccount *wa = cls;
+ struct TALER_EXCHANGEDB_Session *session = wa->session;
enum GNUNET_DB_QueryStatus qs;
(void) json;
if (NULL == details)
{
- hh = NULL;
+ wa->hh = NULL;
if (TALER_EC_NONE != ec)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -331,8 +367,8 @@ history_cb (void *cls,
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Got DB soft error for commit\n");
/* reduce transaction size to reduce rollback probability */
- if (2 > current_batch_size)
- current_batch_size /= 2;
+ if (2 > wa->current_batch_size)
+ wa->current_batch_size /= 2;
/* try again */
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&find_transfers,
@@ -342,27 +378,28 @@ history_cb (void *cls,
if (0 < qs)
{
/* transaction success, update #last_row_off */
- last_row_off = latest_row_off;
- latest_row_off = 0;
-
+ wa->last_row_off = wa->latest_row_off;
+ wa->latest_row_off = 0; /* should not be needed */
+ wa->session = NULL; /* should not be needed */
/* if successful at limit, try increasing transaction batch size (AIMD) */
- if (current_batch_size == batch_size)
- batch_size++;
+ if ( (wa->current_batch_size == wa->batch_size) &&
+ (UINT_MAX > wa->batch_size) )
+ wa->batch_size++;
}
GNUNET_break (0 <= qs);
- if ( (GNUNET_YES == delay) &&
+ if ( (GNUNET_YES == wa->delay) &&
(test_mode) &&
- (NULL == wa_pos->next) )
+ (NULL == wa->next) )
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Shutdown due to test mode!\n");
GNUNET_SCHEDULER_shutdown ();
return GNUNET_OK;
}
- if (GNUNET_YES == delay)
+ if (GNUNET_YES == wa->delay)
{
- wa_pos->delayed_until
- = GNUNET_TIME_relative_to_absolute (DELAY);
+ wa->delayed_until
+ = GNUNET_TIME_relative_to_absolute (wirewatch_idle_sleep_interval);
wa_pos = wa_pos->next;
if (NULL == wa_pos)
wa_pos = wa_head;
@@ -381,29 +418,32 @@ history_cb (void *cls,
/**
* Debug block.
*/
+#if DEBUG_LOGGING
{
-/* Should be 53, give 80 just to be redundant. */
+ /** Should be 53, give 80 just to be extra conservative (and aligned). */
#define PUBSIZE 80
char wtid_s[PUBSIZE];
- GNUNET_break
- (NULL != GNUNET_STRINGS_data_to_string (&details->reserve_pub,
- sizeof (details->reserve_pub),
- &wtid_s[0],
- PUBSIZE));
+ GNUNET_break (NULL !=
+ GNUNET_STRINGS_data_to_string (&details->reserve_pub,
+ sizeof (details->reserve_pub),
+ &wtid_s[0],
+ PUBSIZE));
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Plain text subject (= reserve_pub): %s\n",
wtid_s);
}
+#endif
- current_batch_size++;
+ if (wa->current_batch_size < UINT_MAX)
+ wa->current_batch_size++;
qs = db_plugin->reserves_in_insert (db_plugin->cls,
session,
&details->reserve_pub,
&details->amount,
details->execution_date,
details->debit_account_url,
- wa_pos->section_name,
+ wa->section_name,
serial_id);
if (GNUNET_DB_STATUS_HARD_ERROR == qs)
{
@@ -425,8 +465,8 @@ history_cb (void *cls,
NULL);
return GNUNET_SYSERR;
}
-
- latest_row_off = serial_id;
+ wa->delay = GNUNET_NO;
+ wa->latest_row_off = serial_id;
return GNUNET_OK;
}
@@ -446,12 +486,11 @@ find_transfers (void *cls)
task = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Checking for incoming wire transfers\n");
-
if (NULL == (session = db_plugin->get_session (db_plugin->cls)))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to obtain database session!\n");
- global_ret = GNUNET_SYSERR;
+ global_ret = GR_DATABASE_SESSION_FAIL;
GNUNET_SCHEDULER_shutdown ();
return;
}
@@ -464,7 +503,7 @@ find_transfers (void *cls)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start database transaction!\n");
- global_ret = GNUNET_SYSERR;
+ global_ret = GR_DATABASE_TRANSACTION_BEGIN_FAIL;
GNUNET_SCHEDULER_shutdown ();
return;
}
@@ -473,14 +512,14 @@ find_transfers (void *cls)
qs = db_plugin->get_latest_reserve_in_reference (db_plugin->cls,
session,
wa_pos->section_name,
- &last_row_off);
+ &wa_pos->last_row_off);
if (GNUNET_DB_STATUS_HARD_ERROR == qs)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to obtain starting point for montoring from database!\n");
db_plugin->rollback (db_plugin->cls,
session);
- global_ret = GNUNET_SYSERR;
+ global_ret = GR_DATABASE_SELECT_LATEST_HARD_FAIL;
GNUNET_SCHEDULER_shutdown ();
return;
}
@@ -493,28 +532,28 @@ find_transfers (void *cls)
NULL);
return;
}
+ wa_pos->reset_mode = GNUNET_NO;
}
- wa_pos->reset_mode = GNUNET_NO;
- delay = GNUNET_YES;
- current_batch_size = 0;
+ wa_pos->delay = GNUNET_YES;
+ wa_pos->current_batch_size = 0; /* reset counter */
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"wirewatch: requesting incoming history from %s\n",
wa_pos->auth.wire_gateway_url);
-
- hh = TALER_BANK_credit_history (ctx,
- &wa_pos->auth,
- last_row_off,
- batch_size,
- &history_cb,
- session);
- if (NULL == hh)
+ wa_pos->session = session;
+ wa_pos->hh = TALER_BANK_credit_history (ctx,
+ &wa_pos->auth,
+ wa_pos->last_row_off,
+ wa_pos->batch_size,
+ &history_cb,
+ wa_pos);
+ if (NULL == wa_pos->hh)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start request for account history!\n");
db_plugin->rollback (db_plugin->cls,
session);
- global_ret = GNUNET_SYSERR;
+ global_ret = GR_BANK_REQUEST_HISTORY_FAIL;
GNUNET_SCHEDULER_shutdown ();
return;
}
@@ -542,7 +581,7 @@ run (void *cls,
if (GNUNET_OK !=
exchange_serve_process_config ())
{
- global_ret = 1;
+ global_ret = GR_CONFIGURATION_INVALID;
return;
}
wa_pos = wa_head;
@@ -567,7 +606,7 @@ run (void *cls,
*
* @param argc number of arguments from the command line
* @param argv command line arguments
- * @return 0 ok, 1 on error
+ * @return 0 ok, non-zero on error
*/
int
main (int argc,
@@ -590,7 +629,7 @@ main (int argc,
if (GNUNET_OK !=
GNUNET_STRINGS_get_utf8_args (argc, argv,
&argc, &argv))
- return 2;
+ return GR_CMD_LINE_UTF8_ERROR;
if (GNUNET_OK !=
GNUNET_PROGRAM_run (argc, argv,
"taler-exchange-wirewatch",
@@ -600,7 +639,7 @@ main (int argc,
&run, NULL))
{
GNUNET_free ((void *) argv);
- return 1;
+ return GR_CMD_LINE_OPTIONS_WRONG;
}
GNUNET_free ((void *) argv);
return global_ret;