summaryrefslogtreecommitdiff
path: root/src/exchange/taler-exchange-wirewatch.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2022-12-19 21:41:32 +0100
committerChristian Grothoff <christian@grothoff.org>2022-12-19 21:41:32 +0100
commitb6b80e61f49db3d5a4a796d95093c1b6784d3f3f (patch)
treea06335764bc3ed9edc42236b62c29fcda18b50c8 /src/exchange/taler-exchange-wirewatch.c
parent709ca561d27d801f405b49d886e9db24b073a785 (diff)
downloadexchange-b6b80e61f49db3d5a4a796d95093c1b6784d3f3f.tar.gz
exchange-b6b80e61f49db3d5a4a796d95093c1b6784d3f3f.tar.bz2
exchange-b6b80e61f49db3d5a4a796d95093c1b6784d3f3f.zip
refactor wirewatch to enable use of batch API
Diffstat (limited to 'src/exchange/taler-exchange-wirewatch.c')
-rw-r--r--src/exchange/taler-exchange-wirewatch.c191
1 files changed, 189 insertions, 2 deletions
diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c
index 47ecba684..d7eaa7e05 100644
--- a/src/exchange/taler-exchange-wirewatch.c
+++ b/src/exchange/taler-exchange-wirewatch.c
@@ -657,6 +657,185 @@ process_reply (const struct TALER_BANK_CreditDetails *details,
/**
+ * We got incoming transaction details from the bank. Add them
+ * to the database.
+ *
+ * @param details array of transaction details
+ * @param details_length length of the @a details array
+ */
+static void
+process_reply_batched (const struct TALER_BANK_CreditDetails *details,
+ unsigned int details_length)
+{
+ enum GNUNET_DB_QueryStatus qs;
+ bool shard_done;
+ uint64_t lroff = latest_row_off;
+
+ if (0 == details_length)
+ {
+ /* Server should have used 204, not 200! */
+ GNUNET_break_op (0);
+ transaction_completed ();
+ return;
+ }
+ /* check serial IDs for range constraints */
+ for (unsigned int i = 0; i<details_length; i++)
+ {
+ const struct TALER_BANK_CreditDetails *cd = &details[i];
+
+ if (cd->serial_id < lroff)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Serial ID %llu not monotonic (got %llu before). Failing!\n",
+ (unsigned long long) cd->serial_id,
+ (unsigned long long) lroff);
+ db_plugin->rollback (db_plugin->cls);
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ if (cd->serial_id > shard_end)
+ {
+ /* we are *past* the current shard (likely because the serial_id of the
+ shard_end happens to not exist in the DB). So commit and stop this
+ iteration! */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Serial ID %llu past shard end at %llu, ending iteration early!\n",
+ (unsigned long long) cd->serial_id,
+ (unsigned long long) shard_end);
+ details_length = i;
+ progress = true;
+ lroff = cd->serial_id - 1;
+ break;
+ }
+ lroff = cd->serial_id;
+ }
+ if (0 != details_length)
+ {
+ enum GNUNET_DB_QueryStatus qss[details_length];
+ struct TALER_EXCHANGEDB_ReserveInInfo reserves[details_length];
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Importing %u transactions\n",
+ details_length);
+ for (unsigned int i = 0; i<details_length; i++)
+ {
+ const struct TALER_BANK_CreditDetails *cd = &details[i];
+ struct TALER_EXCHANGEDB_ReserveInInfo *res = &reserves[i];
+
+ res->reserve_pub = &cd->reserve_pub;
+ res->balance = &cd->amount;
+ res->execution_time = cd->execution_date;
+ res->sender_account_details = cd->debit_account_uri;
+ res->exchange_account_name = ai->section_name;
+ res->wire_reference = cd->serial_id;
+ }
+ qs = db_plugin->batch_reserves_in_insert (db_plugin->cls,
+ reserves,
+ details_length,
+ qss);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Got DB soft error for batch_reserves_in_insert. Rolling back.\n");
+ handle_soft_error ();
+ return;
+ default:
+ break;
+ }
+ for (unsigned int i = 0; i<details_length; i++)
+ {
+ const struct TALER_BANK_CreditDetails *cd = &details[i];
+
+ switch (qss[i])
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Got DB soft error for batch_reserves_in_insert(%u). Rolling back.\n",
+ i);
+ handle_soft_error ();
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ /* Either wirewatch was freshly started after the system was
+ shutdown and we're going over an incomplete shard again
+ after being restarted, or the shard lock period was too
+ short (number of workers set incorrectly?) and a 2nd
+ wirewatcher has been stealing our work while we are still
+ at it. */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Attempted to import transaction %llu (%s) twice. "
+ "This should happen rarely (if not, ask for support).\n",
+ (unsigned long long) cd->serial_id,
+ job_name);
+ break;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Imported transaction %llu.",
+ (unsigned long long) cd->serial_id);
+ /* normal case */
+ progress = true;
+ break;
+ }
+ }
+ }
+
+ latest_row_off = lroff;
+ shard_done = (shard_end <= latest_row_off);
+ if (shard_done)
+ {
+ /* shard is complete, mark this as well */
+ qs = db_plugin->complete_shard (db_plugin->cls,
+ job_name,
+ shard_start,
+ shard_end);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Got DB soft error for complete_shard. Rolling back.\n");
+ handle_soft_error ();
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ GNUNET_break (0);
+ /* Not expected, but let's just continue */
+ break;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ /* normal case */
+ progress = true;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Completed shard %s (%llu,%llu] after %s\n",
+ job_name,
+ (unsigned long long) shard_start,
+ (unsigned long long) shard_end,
+ GNUNET_STRINGS_relative_time_to_string (
+ GNUNET_TIME_absolute_get_duration (shard_start_time),
+ true));
+ break;
+ }
+ shard_delay = GNUNET_TIME_absolute_get_duration (shard_start_time);
+ shard_open = false;
+ transaction_completed ();
+ return;
+ }
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&continue_with_shard,
+ NULL);
+}
+
+
+/**
* Callbacks of this type are used to serve the result of asking
* the bank for the transaction history.
*
@@ -667,7 +846,11 @@ static void
history_cb (void *cls,
const struct TALER_BANK_CreditHistoryResponse *reply)
{
+ static int batch_mode = -1;
+
(void) cls;
+ if (-1 == batch_mode)
+ batch_mode = (NULL != getenv ("TALER_USE_BATCH"));
GNUNET_assert (NULL == task);
hh = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
@@ -676,8 +859,12 @@ history_cb (void *cls,
switch (reply->http_status)
{
case MHD_HTTP_OK:
- process_reply (reply->details.success.details,
- reply->details.success.details_length);
+ if (0 == batch_mode)
+ process_reply (reply->details.success.details,
+ reply->details.success.details_length);
+ else
+ process_reply_batched (reply->details.success.details,
+ reply->details.success.details_length);
return;
case MHD_HTTP_NO_CONTENT:
transaction_completed ();