From 09ec056da4ebbbc3437df05344075a4d109ec204 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 30 Jul 2023 16:28:47 +0200 Subject: misc bug fixes in batch insert --- src/benchmark/benchmark-common.conf | 3 + src/benchmark/taler-bank-benchmark.c | 14 +- src/exchangedb/exchange_do_reserves_in_insert.sql | 79 +++-- src/exchangedb/pg_reserves_in_insert.c | 396 ++++++++++++---------- 4 files changed, 267 insertions(+), 225 deletions(-) diff --git a/src/benchmark/benchmark-common.conf b/src/benchmark/benchmark-common.conf index 11e106d2f..3a11ea317 100644 --- a/src/benchmark/benchmark-common.conf +++ b/src/benchmark/benchmark-common.conf @@ -75,6 +75,9 @@ CONFIG="postgres:///talercheck" [syncdb-postgres] CONFIG="postgres:///talercheck" +[exchange] +WIREWATCH_IDLE_SLEEP_INTERVAL = 5000 ms + [bank] HTTP_PORT=8082 SERVE=http diff --git a/src/benchmark/taler-bank-benchmark.c b/src/benchmark/taler-bank-benchmark.c index 224b6d5d1..cde6217e4 100644 --- a/src/benchmark/taler-bank-benchmark.c +++ b/src/benchmark/taler-bank-benchmark.c @@ -38,6 +38,7 @@ #include "taler_testing_lib.h" #include "taler_error_codes.h" +#define SHARD_SIZE "1024" /** * Credentials to use for the benchmark. @@ -74,11 +75,6 @@ static unsigned int howmany_reserves = 1; */ static unsigned int howmany_clients = 1; -/** - * How many bank worker threads do we want to create. - */ -static unsigned int howmany_threads; - /** * How many wirewatch processes do we want to create. */ @@ -339,6 +335,7 @@ parallel_benchmark (void) "taler-exchange-wirewatch", "-c", cfg_filename, "-a", exchange_bank_section, + "-S", SHARD_SIZE, (NULL != loglev) ? "-L" : NULL, loglev, NULL); @@ -376,6 +373,7 @@ parallel_benchmark (void) "taler-exchange-wirewatch", "-c", cfg_filename, "-a", exchange_bank_section, + "-S", SHARD_SIZE, "-t", (NULL != loglev) ? "-L" : NULL, loglev, @@ -405,6 +403,7 @@ parallel_benchmark (void) "taler-exchange-wirewatch", "-c", cfg_filename, "-a", exchange_bank_section, + "-S", SHARD_SIZE, "-t", (NULL != loglev) ? "-L" : NULL, loglev, @@ -451,11 +450,6 @@ main (int argc, "NPROCS", "How many client processes we should run", &howmany_clients), - GNUNET_GETOPT_option_uint ('P', - "service-parallelism", - "NTHREADS", - "How many service threads we should create", - &howmany_threads), GNUNET_GETOPT_option_uint ('r', "reserves", "NRESERVES", diff --git a/src/exchangedb/exchange_do_reserves_in_insert.sql b/src/exchangedb/exchange_do_reserves_in_insert.sql index b88aaa44d..f8c7f1971 100644 --- a/src/exchangedb/exchange_do_reserves_in_insert.sql +++ b/src/exchangedb/exchange_do_reserves_in_insert.sql @@ -922,47 +922,47 @@ DECLARE conflict BOOL; dup BOOL; uuid INT8; - i RECORD; + i INT4; + ini_reserve_pub BYTEA; + ini_wire_ref INT8; + ini_credit taler_amount; + ini_exchange_account_name TEXT; + ini_execution_date INT8; + ini_wire_source_h_payto BYTEA; + ini_payto_uri TEXT; + ini_notify TEXT; BEGIN - INSERT INTO wire_targets - (wire_target_h_payto - ,payto_uri) - SELECT - wire_source_h_payto - ,payto_uri - FROM - UNNEST (ina_wire_source_h_payto) AS wire_source_h_payto - ,UNNEST (ina_payto_uri) AS payto_uri - ON CONFLICT DO NOTHING; - - FOR i IN - SELECT - reserve_pub - ,wire_ref - ,credit - ,exchange_account_name - ,execution_date - ,wire_source_h_payto - ,payto_uri - ,notify - FROM - UNNEST (ina_reserve_pub) AS reserve_pub - ,UNNEST (ina_wire_ref) AS wire_ref - ,UNNEST (ina_credit) AS credit - ,UNNEST (ina_exchange_account_name) AS exchange_account_name - ,UNNEST (ina_execution_date) AS execution_date - ,UNNEST (ina_wire_source_h_payto) AS wire_source_h_payto - ,UNNEST (ina_notify) AS notify + FOR i IN 1..array_length(ina_reserve_pub,1) LOOP + ini_reserve_pub = ina_reserve_pub[i]; + ini_wire_ref = ina_wire_ref[i]; + ini_credit = ina_credit[i]; + ini_exchange_account_name = ina_exchange_account_name[i]; + ini_execution_date = ina_execution_date[i]; + ini_wire_source_h_payto = ina_wire_source_h_payto[i]; + ini_payto_uri = ina_payto_uri[i]; + ini_notify = ina_notify[i]; + +-- RAISE WARNING 'Starting loop on %', ini_notify; + + INSERT INTO wire_targets + (wire_target_h_payto + ,payto_uri + ) VALUES ( + ini_wire_source_h_payto + ,ini_payto_uri + ) + ON CONFLICT DO NOTHING; + INSERT INTO reserves (reserve_pub ,current_balance ,expiration_date ,gc_date ) VALUES ( - i.reserve_pub - ,i.credit + ini_reserve_pub + ,ini_credit ,in_reserve_expiration ,in_gc_date ) @@ -979,12 +979,12 @@ BEGIN ,wire_source_h_payto ,execution_date ) VALUES ( - i.reserve_pub - ,i.wire_reference - ,i.credit - ,i.exchange_account_section - ,i.wire_source_h_payto - ,i.execution_date + ini_reserve_pub + ,ini_wire_ref + ,ini_credit + ,ini_exchange_account_name + ,ini_wire_source_h_payto + ,ini_execution_date ) ON CONFLICT DO NOTHING; @@ -1001,12 +1001,11 @@ BEGIN THEN EXECUTE FORMAT ( 'NOTIFY %s' - ,i.notify); + ,ini_notify); END IF; dup = FALSE; END IF; RETURN NEXT (dup,uuid); END LOOP; - RETURN; END $$; diff --git a/src/exchangedb/pg_reserves_in_insert.c b/src/exchangedb/pg_reserves_in_insert.c index f4fe655ec..77a68c90d 100644 --- a/src/exchangedb/pg_reserves_in_insert.c +++ b/src/exchangedb/pg_reserves_in_insert.c @@ -513,6 +513,8 @@ transact ( if (! need_update) return reserves_length; + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "updated needed!\n"); if (GNUNET_OK != TEH_PG_start (pg, "reserve-insert-continued")) @@ -689,7 +691,8 @@ helper_cb (void *cls, ctx->status = GNUNET_SYSERR; return; } - ctx->needs_update |= ctx->conflicts[i]; + if (! ctx->transaction_duplicates[i]) + ctx->needs_update |= ctx->conflicts[i]; } } @@ -697,208 +700,251 @@ helper_cb (void *cls, enum GNUNET_DB_QueryStatus TEH_PG_reserves_in_insert ( void *cls, - const struct TALER_EXCHANGEDB_ReserveInInfo *reserves, - unsigned int reserves_length, + const struct TALER_EXCHANGEDB_ReserveInInfo *real_reserves, + unsigned int real_reserves_length, unsigned int batch_size, enum GNUNET_DB_QueryStatus *results) { - (void) batch_size; struct PostgresClosure *pg = cls; - struct TALER_PaytoHashP h_paytos[GNUNET_NZL (reserves_length)]; - char *notify_s[GNUNET_NZL (reserves_length)]; - struct TALER_ReservePublicKeyP reserve_pubs[GNUNET_NZL (reserves_length)]; - struct TALER_Amount balances[GNUNET_NZL (reserves_length)]; - struct GNUNET_TIME_Timestamp execution_times[GNUNET_NZL (reserves_length)]; - const char *sender_account_details[GNUNET_NZL (reserves_length)]; - const char *exchange_account_names[GNUNET_NZL (reserves_length)]; - uint64_t wire_references[GNUNET_NZL (reserves_length)]; - uint64_t reserve_uuids[GNUNET_NZL (reserves_length)]; - bool transaction_duplicates[GNUNET_NZL (reserves_length)]; - bool conflicts[GNUNET_NZL (reserves_length)]; - struct GNUNET_TIME_Timestamp reserve_expiration - = GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time); - struct GNUNET_TIME_Timestamp gc - = GNUNET_TIME_relative_to_timestamp (pg->legal_reserve_expiration_time); - enum GNUNET_DB_QueryStatus qs; - bool need_update; - - for (unsigned int i = 0; isender_account_details, - &h_paytos[i]); - notify_s[i] = compute_notify_on_reserve (reserve->reserve_pub); - reserve_pubs[i] = *reserve->reserve_pub; - balances[i] = *reserve->balance; - execution_times[i] = reserve->execution_time; - sender_account_details[i] = reserve->sender_account_details; - exchange_account_names[i] = reserve->exchange_account_name; - wire_references[i] = reserve->wire_reference; - } + unsigned int dups = 0; + batch_size = real_reserves_length; + enum GNUNET_DB_QueryStatus rqs = 0; - /* NOTE: kind-of pointless to explicitly start a transaction here... */ - if (GNUNET_OK != - TEH_PG_preflight (pg)) + for (unsigned int batch = 0; batch < real_reserves_length; + batch += batch_size) { - GNUNET_break (0); - qs = GNUNET_DB_STATUS_HARD_ERROR; - goto finished; - } - - if (GNUNET_OK != - TEH_PG_start_read_committed (pg, - "READ_COMMITED")) - { - GNUNET_break (0); - qs = GNUNET_DB_STATUS_HARD_ERROR; - goto finished; - } - - PREPARE (pg, - "reserves_insert_with_array", - "SELECT" - " transaction_duplicate" - ",ruuid" - " FROM exchange_do_array_reserves_insert" - " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10);"); - { - struct GNUNET_PQ_QueryParam params[] = { - GNUNET_PQ_query_param_timestamp (&gc), - GNUNET_PQ_query_param_timestamp (&reserve_expiration), - GNUNET_PQ_query_param_array_auto_from_type (reserves_length, - reserve_pubs, - pg->conn), - GNUNET_PQ_query_param_array_uint64 (reserves_length, - wire_references, - pg->conn), - TALER_PQ_query_param_array_amount ( - reserves_length, - balances, - pg->conn), - GNUNET_PQ_query_param_array_ptrs_string ( - reserves_length, - (const char **) exchange_account_names, - pg->conn), - GNUNET_PQ_query_param_array_timestamp ( - reserves_length, - execution_times, - pg->conn), - GNUNET_PQ_query_param_array_auto_from_type ( - reserves_length, - h_paytos, - pg->conn), - GNUNET_PQ_query_param_array_ptrs_string ( - reserves_length, - (const char **) sender_account_details, - pg->conn), - GNUNET_PQ_query_param_array_ptrs_string ( - reserves_length, - (const char **) notify_s, - pg->conn), - GNUNET_PQ_query_param_end - }; - struct Context ctx = { - .reserve_uuids = reserve_uuids, - .transaction_duplicates = transaction_duplicates, - .conflicts = conflicts, - .needs_update = false, - .status = GNUNET_OK - }; + const struct TALER_EXCHANGEDB_ReserveInInfo *reserves = + &real_reserves[batch]; + unsigned int reserves_length = GNUNET_MIN (batch_size, + real_reserves_length - batch); + struct TALER_PaytoHashP h_paytos[GNUNET_NZL (reserves_length)]; + char *notify_s[GNUNET_NZL (reserves_length)]; + struct TALER_ReservePublicKeyP reserve_pubs[GNUNET_NZL (reserves_length)]; + struct TALER_Amount balances[GNUNET_NZL (reserves_length)]; + struct GNUNET_TIME_Timestamp execution_times[GNUNET_NZL (reserves_length)]; + const char *sender_account_details[GNUNET_NZL (reserves_length)]; + const char *exchange_account_names[GNUNET_NZL (reserves_length)]; + uint64_t wire_references[GNUNET_NZL (reserves_length)]; + uint64_t reserve_uuids[GNUNET_NZL (reserves_length)]; + bool transaction_duplicates[GNUNET_NZL (reserves_length)]; + bool conflicts[GNUNET_NZL (reserves_length)]; + struct GNUNET_TIME_Timestamp reserve_expiration + = GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time); + struct GNUNET_TIME_Timestamp gc + = GNUNET_TIME_relative_to_timestamp (pg->legal_reserve_expiration_time); + enum GNUNET_DB_QueryStatus qs; + bool need_update; + + for (unsigned int i = 0; isender_account_details, + &h_paytos[i]); + notify_s[i] = compute_notify_on_reserve (reserve->reserve_pub); + reserve_pubs[i] = *reserve->reserve_pub; + balances[i] = *reserve->balance; + execution_times[i] = reserve->execution_time; + sender_account_details[i] = reserve->sender_account_details; + exchange_account_names[i] = reserve->exchange_account_name; + wire_references[i] = reserve->wire_reference; + } - qs = GNUNET_PQ_eval_prepared_multi_select (pg->conn, - "reserves_insert_with_array", - params, - &helper_cb, - &ctx); - if ( (qs < 0) || - (GNUNET_OK != ctx.status) ) + /* NOTE: kind-of pointless to explicitly start a transaction here... */ + if (GNUNET_OK != + TEH_PG_preflight (pg)) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Failed to insert into reserves (%d)\n", - qs); + GNUNET_break (0); + qs = GNUNET_DB_STATUS_HARD_ERROR; goto finished; } - need_update = ctx.needs_update; - } - - { - enum GNUNET_DB_QueryStatus cs; - - cs = TEH_PG_commit (pg); - if (cs < 0) + if (GNUNET_OK != + TEH_PG_start_read_committed (pg, + "READ_COMMITED")) { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Failed to commit\n"); - qs = cs; + GNUNET_break (0); + qs = GNUNET_DB_STATUS_HARD_ERROR; goto finished; } - } - for (unsigned int i = 0; iconn, - &balances[i]), - GNUNET_PQ_query_param_string (exchange_account_names[i]), - GNUNET_PQ_query_param_auto_from_type (&h_paytos[i]), - GNUNET_PQ_query_param_string (notify_s[i]), + GNUNET_PQ_query_param_array_auto_from_type (reserves_length, + reserve_pubs, + pg->conn), + GNUNET_PQ_query_param_array_uint64 (reserves_length, + wire_references, + pg->conn), + TALER_PQ_query_param_array_amount ( + reserves_length, + balances, + pg->conn), + GNUNET_PQ_query_param_array_ptrs_string ( + reserves_length, + (const char **) exchange_account_names, + pg->conn), + GNUNET_PQ_query_param_array_timestamp ( + reserves_length, + execution_times, + pg->conn), + GNUNET_PQ_query_param_array_auto_from_type ( + reserves_length, + h_paytos, + pg->conn), + GNUNET_PQ_query_param_array_ptrs_string ( + reserves_length, + (const char **) sender_account_details, + pg->conn), + GNUNET_PQ_query_param_array_ptrs_string ( + reserves_length, + (const char **) notify_s, + pg->conn), GNUNET_PQ_query_param_end }; - struct GNUNET_PQ_ResultSpec rs[] = { - GNUNET_PQ_result_spec_bool ("duplicate", - &duplicate), - GNUNET_PQ_result_spec_end + struct Context ctx = { + .reserve_uuids = reserve_uuids, + .transaction_duplicates = transaction_duplicates, + .conflicts = conflicts, + .needs_update = false, + .status = GNUNET_OK }; - enum GNUNET_DB_QueryStatus qs; - qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, - "reserves_update", - params, - rs); - if (qs < 0) + qs = GNUNET_PQ_eval_prepared_multi_select (pg->conn, + "reserves_insert_with_array", + params, + &helper_cb, + &ctx); + if ( (qs < 0) || + (GNUNET_OK != ctx.status) ) { GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Failed to update reserves (%d)\n", + "Failed to insert into reserves (%d)\n", qs); - results[i] = qs; goto finished; } - results[i] = duplicate + need_update = ctx.needs_update; + } + + { + enum GNUNET_DB_QueryStatus cs; + + cs = TEH_PG_commit (pg); + if (cs < 0) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to commit\n"); + qs = cs; + goto finished; + } + } + + for (unsigned int i = 0; iconn, + &balances[i]), + GNUNET_PQ_query_param_string (exchange_account_names[i]), + GNUNET_PQ_query_param_auto_from_type (&h_paytos[i]), + GNUNET_PQ_query_param_string (notify_s[i]), + GNUNET_PQ_query_param_end + }; + struct GNUNET_PQ_ResultSpec rs[] = { + GNUNET_PQ_result_spec_bool ("duplicate", + &duplicate), + GNUNET_PQ_result_spec_end + }; + enum GNUNET_DB_QueryStatus qs; + + qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, + "reserves_update", + params, + rs); + if (qs < 0) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to update reserves (%d)\n", + qs); + results[i] = qs; + goto finished; + } + results[i] = duplicate ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; + } + } + { + enum GNUNET_DB_QueryStatus cs; + + cs = TEH_PG_commit (pg); + if (cs < 0) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to commit\n"); + qs = cs; + goto finished; + } } - } finished: + for (unsigned int i = 0; iconn); - for (unsigned int i = 0; i