summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2022-12-20 12:32:42 +0100
committerChristian Grothoff <christian@grothoff.org>2022-12-20 12:32:42 +0100
commitb0f746cf3ea08961a0494451b0aa6996b6830526 (patch)
tree01a8e27f3cf545d17d289692a1e485fe596f475a
parent4cf0d8580f1cb84a962b23f88b0ad7dc0fa37480 (diff)
downloadexchange-b0f746cf3ea08961a0494451b0aa6996b6830526.tar.gz
exchange-b0f746cf3ea08961a0494451b0aa6996b6830526.tar.bz2
exchange-b0f746cf3ea08961a0494451b0aa6996b6830526.zip
-enable batch testing and no DB rest in bank benchmark
-rw-r--r--src/benchmark/taler-bank-benchmark.c10
-rw-r--r--src/exchange/taler-exchange-wirewatch.c217
2 files changed, 221 insertions, 6 deletions
diff --git a/src/benchmark/taler-bank-benchmark.c b/src/benchmark/taler-bank-benchmark.c
index 7c2e51f15..b17bb9411 100644
--- a/src/benchmark/taler-bank-benchmark.c
+++ b/src/benchmark/taler-bank-benchmark.c
@@ -168,6 +168,11 @@ static enum BenchmarkMode mode;
static int linger;
/**
+ * Do not initialize or reset the database.
+ */
+static int incremental;
+
+/**
* Configuration.
*/
static struct GNUNET_CONFIGURATION_Handle *cfg;
@@ -550,6 +555,7 @@ parallel_benchmark (void)
return GNUNET_SYSERR;
}
+ if (0 == incremental)
{
struct GNUNET_OS_Process *dbinit;
@@ -769,6 +775,10 @@ main (int argc,
"linger",
"linger around until key press",
&linger),
+ GNUNET_GETOPT_option_flag ('i',
+ "incremental",
+ "skip initializing and resetting the database",
+ &incremental),
GNUNET_GETOPT_option_string ('l',
"logfile",
"LF",
diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c
index d7eaa7e05..4acf8a16a 100644
--- a/src/exchange/taler-exchange-wirewatch.c
+++ b/src/exchange/taler-exchange-wirewatch.c
@@ -836,6 +836,188 @@ process_reply_batched (const struct TALER_BANK_CreditDetails *details,
/**
+ * We got incoming transaction details from the bank. Add them
+ * to the database.
+ *
+ * @param batch_size desired batch size
+ * @param details array of transaction details
+ * @param details_length length of the @a details array
+ */
+static void
+process_reply_batched2 (unsigned int batch_size,
+ const struct TALER_BANK_CreditDetails *details,
+ unsigned int details_length)
+{
+ enum GNUNET_DB_QueryStatus qs;
+ bool shard_done;
+ uint64_t lroff = latest_row_off;
+
+ if (0 == details_length)
+ {
+ /* Server should have used 204, not 200! */
+ GNUNET_break_op (0);
+ transaction_completed ();
+ return;
+ }
+ /* check serial IDs for range constraints */
+ for (unsigned int i = 0; i<details_length; i++)
+ {
+ const struct TALER_BANK_CreditDetails *cd = &details[i];
+
+ if (cd->serial_id < lroff)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Serial ID %llu not monotonic (got %llu before). Failing!\n",
+ (unsigned long long) cd->serial_id,
+ (unsigned long long) lroff);
+ db_plugin->rollback (db_plugin->cls);
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ if (cd->serial_id > shard_end)
+ {
+ /* we are *past* the current shard (likely because the serial_id of the
+ shard_end happens to not exist in the DB). So commit and stop this
+ iteration! */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Serial ID %llu past shard end at %llu, ending iteration early!\n",
+ (unsigned long long) cd->serial_id,
+ (unsigned long long) shard_end);
+ details_length = i;
+ progress = true;
+ lroff = cd->serial_id - 1;
+ break;
+ }
+ lroff = cd->serial_id;
+ }
+ if (0 != details_length)
+ {
+ enum GNUNET_DB_QueryStatus qss[details_length];
+ struct TALER_EXCHANGEDB_ReserveInInfo reserves[details_length];
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Importing %u transactions\n",
+ details_length);
+ for (unsigned int i = 0; i<details_length; i++)
+ {
+ const struct TALER_BANK_CreditDetails *cd = &details[i];
+ struct TALER_EXCHANGEDB_ReserveInInfo *res = &reserves[i];
+
+ res->reserve_pub = &cd->reserve_pub;
+ res->balance = &cd->amount;
+ res->execution_time = cd->execution_date;
+ res->sender_account_details = cd->debit_account_uri;
+ res->exchange_account_name = ai->section_name;
+ res->wire_reference = cd->serial_id;
+ }
+ qs = db_plugin->batch2_reserves_in_insert (db_plugin->cls,
+ reserves,
+ details_length,
+ batch_size,
+ qss);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Got DB soft error for batch_reserves_in_insert. Rolling back.\n");
+ handle_soft_error ();
+ return;
+ default:
+ break;
+ }
+ for (unsigned int i = 0; i<details_length; i++)
+ {
+ const struct TALER_BANK_CreditDetails *cd = &details[i];
+
+ switch (qss[i])
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Got DB soft error for batch_reserves_in_insert(%u). Rolling back.\n",
+ i);
+ handle_soft_error ();
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ /* Either wirewatch was freshly started after the system was
+ shutdown and we're going over an incomplete shard again
+ after being restarted, or the shard lock period was too
+ short (number of workers set incorrectly?) and a 2nd
+ wirewatcher has been stealing our work while we are still
+ at it. */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Attempted to import transaction %llu (%s) twice. "
+ "This should happen rarely (if not, ask for support).\n",
+ (unsigned long long) cd->serial_id,
+ job_name);
+ break;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Imported transaction %llu.",
+ (unsigned long long) cd->serial_id);
+ /* normal case */
+ progress = true;
+ break;
+ }
+ }
+ }
+
+ latest_row_off = lroff;
+ shard_done = (shard_end <= latest_row_off);
+ if (shard_done)
+ {
+ /* shard is complete, mark this as well */
+ qs = db_plugin->complete_shard (db_plugin->cls,
+ job_name,
+ shard_start,
+ shard_end);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Got DB soft error for complete_shard. Rolling back.\n");
+ handle_soft_error ();
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ GNUNET_break (0);
+ /* Not expected, but let's just continue */
+ break;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ /* normal case */
+ progress = true;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Completed shard %s (%llu,%llu] after %s\n",
+ job_name,
+ (unsigned long long) shard_start,
+ (unsigned long long) shard_end,
+ GNUNET_STRINGS_relative_time_to_string (
+ GNUNET_TIME_absolute_get_duration (shard_start_time),
+ true));
+ break;
+ }
+ shard_delay = GNUNET_TIME_absolute_get_duration (shard_start_time);
+ shard_open = false;
+ transaction_completed ();
+ return;
+ }
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
+ NULL);
+}
+
+
+/**
* Callbacks of this type are used to serve the result of asking
* the bank for the transaction history.
*
@@ -846,11 +1028,25 @@ static void
history_cb (void *cls,
const struct TALER_BANK_CreditHistoryResponse *reply)
{
- static int batch_mode = -1;
+ static int batch_mode = -2;
(void) cls;
- if (-1 == batch_mode)
- batch_mode = (NULL != getenv ("TALER_USE_BATCH"));
+ if (-2 == batch_mode)
+ {
+ const char *mode = getenv ("TALER_USE_BATCH");
+ char dummy;
+
+ if (1 != sscanf (mode,
+ "%d%c",
+ &batch_mode,
+ &dummy))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Bad batch mode `%s' specified\n",
+ batch_mode);
+ batch_mode = -1;
+ }
+ }
GNUNET_assert (NULL == task);
hh = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
@@ -859,13 +1055,22 @@ history_cb (void *cls,
switch (reply->http_status)
{
case MHD_HTTP_OK:
- if (0 == batch_mode)
+ switch (batch_mode)
+ {
+ case -1:
process_reply (reply->details.success.details,
reply->details.success.details_length);
- else
+ break;
+ case 0:
process_reply_batched (reply->details.success.details,
reply->details.success.details_length);
- return;
+ break;
+ default:
+ process_reply_batched2 ((unsigned int) batch_mode,
+ reply->details.success.details,
+ reply->details.success.details_length);
+ break;
+ }
case MHD_HTTP_NO_CONTENT:
transaction_completed ();
return;