diff options
author | Christian Grothoff <christian@grothoff.org> | 2022-12-19 21:41:32 +0100 |
---|---|---|
committer | Christian Grothoff <christian@grothoff.org> | 2022-12-19 21:41:32 +0100 |
commit | b6b80e61f49db3d5a4a796d95093c1b6784d3f3f (patch) | |
tree | a06335764bc3ed9edc42236b62c29fcda18b50c8 /src/exchange/taler-exchange-wirewatch.c | |
parent | 709ca561d27d801f405b49d886e9db24b073a785 (diff) | |
download | exchange-b6b80e61f49db3d5a4a796d95093c1b6784d3f3f.tar.gz exchange-b6b80e61f49db3d5a4a796d95093c1b6784d3f3f.tar.bz2 exchange-b6b80e61f49db3d5a4a796d95093c1b6784d3f3f.zip |
refactor wirewatch to enable use of batch API
Diffstat (limited to 'src/exchange/taler-exchange-wirewatch.c')
-rw-r--r-- | src/exchange/taler-exchange-wirewatch.c | 191 |
1 files changed, 189 insertions, 2 deletions
diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c index 47ecba684..d7eaa7e05 100644 --- a/src/exchange/taler-exchange-wirewatch.c +++ b/src/exchange/taler-exchange-wirewatch.c @@ -657,6 +657,185 @@ process_reply (const struct TALER_BANK_CreditDetails *details, /** + * We got incoming transaction details from the bank. Add them + * to the database. + * + * @param details array of transaction details + * @param details_length length of the @a details array + */ +static void +process_reply_batched (const struct TALER_BANK_CreditDetails *details, + unsigned int details_length) +{ + enum GNUNET_DB_QueryStatus qs; + bool shard_done; + uint64_t lroff = latest_row_off; + + if (0 == details_length) + { + /* Server should have used 204, not 200! */ + GNUNET_break_op (0); + transaction_completed (); + return; + } + /* check serial IDs for range constraints */ + for (unsigned int i = 0; i<details_length; i++) + { + const struct TALER_BANK_CreditDetails *cd = &details[i]; + + if (cd->serial_id < lroff) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Serial ID %llu not monotonic (got %llu before). Failing!\n", + (unsigned long long) cd->serial_id, + (unsigned long long) lroff); + db_plugin->rollback (db_plugin->cls); + GNUNET_SCHEDULER_shutdown (); + return; + } + if (cd->serial_id > shard_end) + { + /* we are *past* the current shard (likely because the serial_id of the + shard_end happens to not exist in the DB). So commit and stop this + iteration! */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Serial ID %llu past shard end at %llu, ending iteration early!\n", + (unsigned long long) cd->serial_id, + (unsigned long long) shard_end); + details_length = i; + progress = true; + lroff = cd->serial_id - 1; + break; + } + lroff = cd->serial_id; + } + if (0 != details_length) + { + enum GNUNET_DB_QueryStatus qss[details_length]; + struct TALER_EXCHANGEDB_ReserveInInfo reserves[details_length]; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Importing %u transactions\n", + details_length); + for (unsigned int i = 0; i<details_length; i++) + { + const struct TALER_BANK_CreditDetails *cd = &details[i]; + struct TALER_EXCHANGEDB_ReserveInInfo *res = &reserves[i]; + + res->reserve_pub = &cd->reserve_pub; + res->balance = &cd->amount; + res->execution_time = cd->execution_date; + res->sender_account_details = cd->debit_account_uri; + res->exchange_account_name = ai->section_name; + res->wire_reference = cd->serial_id; + } + qs = db_plugin->batch_reserves_in_insert (db_plugin->cls, + reserves, + details_length, + qss); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Got DB soft error for batch_reserves_in_insert. Rolling back.\n"); + handle_soft_error (); + return; + default: + break; + } + for (unsigned int i = 0; i<details_length; i++) + { + const struct TALER_BANK_CreditDetails *cd = &details[i]; + + switch (qss[i]) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Got DB soft error for batch_reserves_in_insert(%u). Rolling back.\n", + i); + handle_soft_error (); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + /* Either wirewatch was freshly started after the system was + shutdown and we're going over an incomplete shard again + after being restarted, or the shard lock period was too + short (number of workers set incorrectly?) and a 2nd + wirewatcher has been stealing our work while we are still + at it. */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Attempted to import transaction %llu (%s) twice. " + "This should happen rarely (if not, ask for support).\n", + (unsigned long long) cd->serial_id, + job_name); + break; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Imported transaction %llu.", + (unsigned long long) cd->serial_id); + /* normal case */ + progress = true; + break; + } + } + } + + latest_row_off = lroff; + shard_done = (shard_end <= latest_row_off); + if (shard_done) + { + /* shard is complete, mark this as well */ + qs = db_plugin->complete_shard (db_plugin->cls, + job_name, + shard_start, + shard_end); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Got DB soft error for complete_shard. Rolling back.\n"); + handle_soft_error (); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + GNUNET_break (0); + /* Not expected, but let's just continue */ + break; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* normal case */ + progress = true; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Completed shard %s (%llu,%llu] after %s\n", + job_name, + (unsigned long long) shard_start, + (unsigned long long) shard_end, + GNUNET_STRINGS_relative_time_to_string ( + GNUNET_TIME_absolute_get_duration (shard_start_time), + true)); + break; + } + shard_delay = GNUNET_TIME_absolute_get_duration (shard_start_time); + shard_open = false; + transaction_completed (); + return; + } + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&continue_with_shard, + NULL); +} + + +/** * Callbacks of this type are used to serve the result of asking * the bank for the transaction history. * @@ -667,7 +846,11 @@ static void history_cb (void *cls, const struct TALER_BANK_CreditHistoryResponse *reply) { + static int batch_mode = -1; + (void) cls; + if (-1 == batch_mode) + batch_mode = (NULL != getenv ("TALER_USE_BATCH")); GNUNET_assert (NULL == task); hh = NULL; GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -676,8 +859,12 @@ history_cb (void *cls, switch (reply->http_status) { case MHD_HTTP_OK: - process_reply (reply->details.success.details, - reply->details.success.details_length); + if (0 == batch_mode) + process_reply (reply->details.success.details, + reply->details.success.details_length); + else + process_reply_batched (reply->details.success.details, + reply->details.success.details_length); return; case MHD_HTTP_NO_CONTENT: transaction_completed (); |