From b6b80e61f49db3d5a4a796d95093c1b6784d3f3f Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Mon, 19 Dec 2022 21:41:32 +0100 Subject: refactor wirewatch to enable use of batch API --- src/exchange/taler-exchange-wirewatch.c | 191 +++++++++++++++++++++++++- src/exchangedb/pg_batch2_reserves_in_insert.c | 75 +++++----- src/exchangedb/pg_batch_reserves_in_insert.c | 62 ++++----- src/include/taler_exchangedb_plugin.h | 26 ++-- 4 files changed, 273 insertions(+), 81 deletions(-) (limited to 'src') diff --git a/src/exchange/taler-exchange-wirewatch.c b/src/exchange/taler-exchange-wirewatch.c index 47ecba684..d7eaa7e05 100644 --- a/src/exchange/taler-exchange-wirewatch.c +++ b/src/exchange/taler-exchange-wirewatch.c @@ -656,6 +656,185 @@ process_reply (const struct TALER_BANK_CreditDetails *details, } +/** + * We got incoming transaction details from the bank. Add them + * to the database. + * + * @param details array of transaction details + * @param details_length length of the @a details array + */ +static void +process_reply_batched (const struct TALER_BANK_CreditDetails *details, + unsigned int details_length) +{ + enum GNUNET_DB_QueryStatus qs; + bool shard_done; + uint64_t lroff = latest_row_off; + + if (0 == details_length) + { + /* Server should have used 204, not 200! */ + GNUNET_break_op (0); + transaction_completed (); + return; + } + /* check serial IDs for range constraints */ + for (unsigned int i = 0; iserial_id < lroff) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Serial ID %llu not monotonic (got %llu before). Failing!\n", + (unsigned long long) cd->serial_id, + (unsigned long long) lroff); + db_plugin->rollback (db_plugin->cls); + GNUNET_SCHEDULER_shutdown (); + return; + } + if (cd->serial_id > shard_end) + { + /* we are *past* the current shard (likely because the serial_id of the + shard_end happens to not exist in the DB). So commit and stop this + iteration! */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Serial ID %llu past shard end at %llu, ending iteration early!\n", + (unsigned long long) cd->serial_id, + (unsigned long long) shard_end); + details_length = i; + progress = true; + lroff = cd->serial_id - 1; + break; + } + lroff = cd->serial_id; + } + if (0 != details_length) + { + enum GNUNET_DB_QueryStatus qss[details_length]; + struct TALER_EXCHANGEDB_ReserveInInfo reserves[details_length]; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Importing %u transactions\n", + details_length); + for (unsigned int i = 0; ireserve_pub = &cd->reserve_pub; + res->balance = &cd->amount; + res->execution_time = cd->execution_date; + res->sender_account_details = cd->debit_account_uri; + res->exchange_account_name = ai->section_name; + res->wire_reference = cd->serial_id; + } + qs = db_plugin->batch_reserves_in_insert (db_plugin->cls, + reserves, + details_length, + qss); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Got DB soft error for batch_reserves_in_insert. Rolling back.\n"); + handle_soft_error (); + return; + default: + break; + } + for (unsigned int i = 0; iserial_id, + job_name); + break; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Imported transaction %llu.", + (unsigned long long) cd->serial_id); + /* normal case */ + progress = true; + break; + } + } + } + + latest_row_off = lroff; + shard_done = (shard_end <= latest_row_off); + if (shard_done) + { + /* shard is complete, mark this as well */ + qs = db_plugin->complete_shard (db_plugin->cls, + job_name, + shard_start, + shard_end); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Got DB soft error for complete_shard. Rolling back.\n"); + handle_soft_error (); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + GNUNET_break (0); + /* Not expected, but let's just continue */ + break; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* normal case */ + progress = true; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Completed shard %s (%llu,%llu] after %s\n", + job_name, + (unsigned long long) shard_start, + (unsigned long long) shard_end, + GNUNET_STRINGS_relative_time_to_string ( + GNUNET_TIME_absolute_get_duration (shard_start_time), + true)); + break; + } + shard_delay = GNUNET_TIME_absolute_get_duration (shard_start_time); + shard_open = false; + transaction_completed (); + return; + } + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&continue_with_shard, + NULL); +} + + /** * Callbacks of this type are used to serve the result of asking * the bank for the transaction history. @@ -667,7 +846,11 @@ static void history_cb (void *cls, const struct TALER_BANK_CreditHistoryResponse *reply) { + static int batch_mode = -1; + (void) cls; + if (-1 == batch_mode) + batch_mode = (NULL != getenv ("TALER_USE_BATCH")); GNUNET_assert (NULL == task); hh = NULL; GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -676,8 +859,12 @@ history_cb (void *cls, switch (reply->http_status) { case MHD_HTTP_OK: - process_reply (reply->details.success.details, - reply->details.success.details_length); + if (0 == batch_mode) + process_reply (reply->details.success.details, + reply->details.success.details_length); + else + process_reply_batched (reply->details.success.details, + reply->details.success.details_length); return; case MHD_HTTP_NO_CONTENT: transaction_completed (); diff --git a/src/exchangedb/pg_batch2_reserves_in_insert.c b/src/exchangedb/pg_batch2_reserves_in_insert.c index 8aca11de0..77120254c 100644 --- a/src/exchangedb/pg_batch2_reserves_in_insert.c +++ b/src/exchangedb/pg_batch2_reserves_in_insert.c @@ -54,9 +54,10 @@ compute_notify_on_reserve (const struct TALER_ReservePublicKeyP *reserve_pub) enum GNUNET_DB_QueryStatus TEH_PG_batch2_reserves_in_insert (void *cls, - const struct TALER_EXCHANGEDB_ReserveInInfo *reserves, - unsigned int reserves_length, - enum GNUNET_DB_QueryStatus *results) + const struct + TALER_EXCHANGEDB_ReserveInInfo *reserves, + unsigned int reserves_length, + enum GNUNET_DB_QueryStatus *results) { struct PostgresClosure *pg = cls; enum GNUNET_DB_QueryStatus qs1; @@ -102,15 +103,15 @@ TEH_PG_batch2_reserves_in_insert (void *cls, pg->legal_reserve_expiration_time)); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Creating reserve %s with expiration in %s\n", - TALER_B2S (&(reserves->reserve_pub)), + TALER_B2S (reserves->reserve_pub), GNUNET_STRINGS_relative_time_to_string ( pg->idle_reserve_expiration_time, GNUNET_NO)); { if (GNUNET_OK != - TEH_PG_start_read_committed(pg, - "READ_COMMITED")) + TEH_PG_start_read_committed (pg, + "READ_COMMITED")) { GNUNET_break (0); return GNUNET_DB_STATUS_HARD_ERROR; @@ -120,31 +121,31 @@ TEH_PG_batch2_reserves_in_insert (void *cls, time; we do this before adding the actual transaction to "reserves_in", as for a new reserve it can't be a duplicate 'add' operation, and as the 'add' operation needs the reserve entry as a foreign key. */ - for (unsigned int i=0;ireserve_pub); + notify_s[i] = compute_notify_on_reserve (reserve->reserve_pub); } - for (unsigned int i=0;i<(reserves_length & ~1);i+=2) + for (unsigned int i = 0; i<(reserves_length & ~1); i += 2) { const struct TALER_EXCHANGEDB_ReserveInInfo *reserve0 = &reserves[i]; - const struct TALER_EXCHANGEDB_ReserveInInfo *reserve1 = &reserves[i+1]; + const struct TALER_EXCHANGEDB_ReserveInInfo *reserve1 = &reserves[i + 1]; struct GNUNET_PQ_QueryParam params[] = { - GNUNET_PQ_query_param_auto_from_type (&reserve0->reserve_pub), + GNUNET_PQ_query_param_auto_from_type (reserve0->reserve_pub), GNUNET_PQ_query_param_timestamp (&expiry), GNUNET_PQ_query_param_timestamp (&gc), GNUNET_PQ_query_param_uint64 (&reserve0->wire_reference), - TALER_PQ_query_param_amount (&reserve0->balance), + TALER_PQ_query_param_amount (reserve0->balance), GNUNET_PQ_query_param_string (reserve0->exchange_account_name), GNUNET_PQ_query_param_timestamp (&reserve0->execution_time), GNUNET_PQ_query_param_auto_from_type (&h_payto), GNUNET_PQ_query_param_string (reserve0->sender_account_details), GNUNET_PQ_query_param_timestamp (&reserve_expiration), GNUNET_PQ_query_param_string (notify_s[i]), - GNUNET_PQ_query_param_auto_from_type (&reserve1->reserve_pub), + GNUNET_PQ_query_param_auto_from_type (reserve1->reserve_pub), GNUNET_PQ_query_param_uint64 (&reserve1->wire_reference), - TALER_PQ_query_param_amount (&reserve1->balance), + TALER_PQ_query_param_amount (reserve1->balance), GNUNET_PQ_query_param_string (reserve1->exchange_account_name), GNUNET_PQ_query_param_timestamp (&reserve1->execution_time), GNUNET_PQ_query_param_auto_from_type (&h_payto), @@ -190,24 +191,26 @@ TEH_PG_batch2_reserves_in_insert (void *cls, } if (reserves_length & 1) { - const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[reserves_length-1]; + const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = + &reserves[reserves_length - 1]; // single insert logic here } - GNUNET_assert (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs1); - results[i] = (transaction_duplicate) + GNUNET_assert (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs1); + results[i] = (transaction_duplicate) ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; - conflicts[i] = conflicted; - conflicts2[i] = conflicted2; - // fprintf(stdout, "%d", conflicts[i]); - // fprintf(stdout, "%d", conflicts2[i]); - if ((!conflicts[i] && transaction_duplicate) ||(!conflicts2[i] && transaction_duplicate2)) - { - GNUNET_break (0); - TEH_PG_rollback (pg); - return GNUNET_DB_STATUS_HARD_ERROR; - } - need_update |= conflicted |= conflicted2; + conflicts[i] = conflicted; + conflicts2[i] = conflicted2; + // fprintf(stdout, "%d", conflicts[i]); + // fprintf(stdout, "%d", conflicts2[i]); + if ((! conflicts[i] && transaction_duplicate) || (! conflicts2[i] && + transaction_duplicate2)) + { + GNUNET_break (0); + TEH_PG_rollback (pg); + return GNUNET_DB_STATUS_HARD_ERROR; + } + need_update |= conflicted |= conflicted2; } // commit { @@ -218,13 +221,13 @@ TEH_PG_batch2_reserves_in_insert (void *cls, return cs; } - if (!need_update) + if (! need_update) goto exit; // begin serializable { if (GNUNET_OK != - TEH_PG_start(pg, - "reserve-insert-continued")) + TEH_PG_start (pg, + "reserve-insert-continued")) { GNUNET_break (0); return GNUNET_DB_STATUS_HARD_ERROR; @@ -236,17 +239,17 @@ TEH_PG_batch2_reserves_in_insert (void *cls, "reserves_in_add_transaction", "SELECT batch_reserves_update" " ($1,$2,$3,$4,$5,$6,$7,$8,$9);"); - for (unsigned int i=0;ireserve_pub), + GNUNET_PQ_query_param_auto_from_type (reserve->reserve_pub), GNUNET_PQ_query_param_timestamp (&expiry), GNUNET_PQ_query_param_uint64 (&reserve->wire_reference), - TALER_PQ_query_param_amount (&reserve->balance), + TALER_PQ_query_param_amount (reserve->balance), GNUNET_PQ_query_param_string (reserve->exchange_account_name), GNUNET_PQ_query_param_bool (conflicted), GNUNET_PQ_query_param_auto_from_type (&h_payto), @@ -274,8 +277,8 @@ TEH_PG_batch2_reserves_in_insert (void *cls, return cs; } - exit: - for (unsigned int i=0;ilegal_reserve_expiration_time)); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Creating reserve %s with expiration in %s\n", - TALER_B2S (&(reserves->reserve_pub)), + TALER_B2S (reserves->reserve_pub), GNUNET_STRINGS_relative_time_to_string ( pg->idle_reserve_expiration_time, GNUNET_NO)); { if (GNUNET_OK != - TEH_PG_start_read_committed(pg, - "READ_COMMITED")) + TEH_PG_start_read_committed (pg, + "READ_COMMITED")) { GNUNET_break (0); return GNUNET_DB_STATUS_HARD_ERROR; @@ -117,18 +117,18 @@ TEH_PG_batch_reserves_in_insert (void *cls, for (unsigned int i = 0; ireserve_pub); + notify_s[i] = compute_notify_on_reserve (reserve->reserve_pub); } - for (unsigned int i=0;ireserve_pub), + GNUNET_PQ_query_param_auto_from_type (reserve->reserve_pub), GNUNET_PQ_query_param_timestamp (&expiry), GNUNET_PQ_query_param_timestamp (&gc), GNUNET_PQ_query_param_uint64 (&reserve->wire_reference), - TALER_PQ_query_param_amount (&reserve->balance), + TALER_PQ_query_param_amount (reserve->balance), GNUNET_PQ_query_param_string (reserve->exchange_account_name), GNUNET_PQ_query_param_timestamp (&reserve->execution_time), GNUNET_PQ_query_param_auto_from_type (&h_payto), @@ -164,19 +164,19 @@ TEH_PG_batch_reserves_in_insert (void *cls, qs1); return qs1; } - GNUNET_assert (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs1); - results[i] = (transaction_duplicate) + GNUNET_assert (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs1); + results[i] = (transaction_duplicate) ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; - conflicts[i] = conflicted; - // fprintf(stdout, "%d", conflicts[i]); - if (!conflicts[i] && transaction_duplicate) - { - GNUNET_break (0); - TEH_PG_rollback (pg); - return GNUNET_DB_STATUS_HARD_ERROR; - } - need_update |= conflicted; + conflicts[i] = conflicted; + // fprintf(stdout, "%d", conflicts[i]); + if (! conflicts[i] && transaction_duplicate) + { + GNUNET_break (0); + TEH_PG_rollback (pg); + return GNUNET_DB_STATUS_HARD_ERROR; + } + need_update |= conflicted; } // commit { @@ -187,13 +187,13 @@ TEH_PG_batch_reserves_in_insert (void *cls, return cs; } - if (!need_update) + if (! need_update) goto exit; // begin serializable { if (GNUNET_OK != - TEH_PG_start(pg, - "reserve-insert-continued")) + TEH_PG_start (pg, + "reserve-insert-continued")) { GNUNET_break (0); return GNUNET_DB_STATUS_HARD_ERROR; @@ -205,17 +205,17 @@ TEH_PG_batch_reserves_in_insert (void *cls, "reserves_in_add_transaction", "SELECT batch_reserves_update" " ($1,$2,$3,$4,$5,$6,$7,$8,$9);"); - for (unsigned int i=0;ireserve_pub), + GNUNET_PQ_query_param_auto_from_type (reserve->reserve_pub), GNUNET_PQ_query_param_timestamp (&expiry), GNUNET_PQ_query_param_uint64 (&reserve->wire_reference), - TALER_PQ_query_param_amount (&reserve->balance), + TALER_PQ_query_param_amount (reserve->balance), GNUNET_PQ_query_param_string (reserve->exchange_account_name), GNUNET_PQ_query_param_bool (conflicted), GNUNET_PQ_query_param_auto_from_type (&h_payto), @@ -243,8 +243,8 @@ TEH_PG_batch_reserves_in_insert (void *cls, return cs; } - exit: - for (unsigned int i=0;i