From f60e38077c7e2ddd2ac6ae3e35c4b27f3225512a Mon Sep 17 00:00:00 2001 From: Joseph Date: Wed, 16 Nov 2022 11:44:48 -0500 Subject: some modifications on batch_test for reserves_in --- src/exchangedb/Makefile.am | 18 +- src/exchangedb/pg_batch_reserves_in_insert.c | 284 +++++++++++++++++++++++++++ src/exchangedb/pg_batch_reserves_in_insert.h | 34 ++++ src/exchangedb/pg_reserves_in_insert.c | 14 +- src/exchangedb/plugin_exchangedb_postgres.c | 5 +- src/exchangedb/procedures.sql | 58 ++++++ src/exchangedb/test_exchangedb_by_j.c | 39 ++-- 7 files changed, 423 insertions(+), 29 deletions(-) create mode 100644 src/exchangedb/pg_batch_reserves_in_insert.c create mode 100644 src/exchangedb/pg_batch_reserves_in_insert.h (limited to 'src/exchangedb') diff --git a/src/exchangedb/Makefile.am b/src/exchangedb/Makefile.am index 4d892efef..59aeb3212 100644 --- a/src/exchangedb/Makefile.am +++ b/src/exchangedb/Makefile.am @@ -239,6 +239,7 @@ libtaler_plugin_exchangedb_postgres_la_SOURCES = \ pg_select_purse_deposits_above_serial_id.h pg_select_purse_deposits_above_serial_id.c \ pg_select_account_merges_above_serial_id.h pg_select_account_merges_above_serial_id.c \ pg_select_all_purse_decisions_above_serial_id.h pg_select_all_purse_decisions_above_serial_id.c \ + pg_batch_reserves_in_insert.h pg_batch_reserves_in_insert.c \ pg_select_reserve_open_above_serial_id.c pg_select_reserve_open_above_serial_id.h libtaler_plugin_exchangedb_postgres_la_LIBADD = \ $(LTLIBINTL) @@ -275,11 +276,13 @@ libtalerexchangedb_la_LDFLAGS = \ check_PROGRAMS = \ test-exchangedb-postgres \ bench-db-postgres\ + perf-exchangedb-reserves-in-insert-postgres\ test-exchangedb-by-j-postgres AM_TESTS_ENVIRONMENT=export TALER_PREFIX=$${TALER_PREFIX:-@libdir@};export PATH=$${TALER_PREFIX:-@prefix@}/bin:$$PATH; TESTS = \ test-exchangedb-postgres\ - test-exchangedb-by-j-postgres + test-exchangedb-by-j-postgres\ + perf-exchangedb-reserves-in-insert-postgres test_exchangedb_postgres_SOURCES = \ test_exchangedb.c test_exchangedb_postgres_LDADD = \ @@ -304,6 +307,19 @@ test_exchangedb_by_j_postgres_LDADD = \ -lgnunetutil \ $(XLIB) + +perf_exchangedb_reserves_in_insert_postgres_SOURCES = \ + perf_exchangedb_reserves_in_insert.c +perf_exchangedb_reserves_in_insert_postgres_LDADD = \ + libtalerexchangedb.la \ + $(top_builddir)/src/json/libtalerjson.la \ + $(top_builddir)/src/util/libtalerutil.la \ + $(top_builddir)/src/pq/libtalerpq.la \ + -ljansson \ + -lgnunetjson \ + -lgnunetutil \ + $(XLIB) + bench_db_postgres_SOURCES = \ bench_db.c bench_db_postgres_LDADD = \ diff --git a/src/exchangedb/pg_batch_reserves_in_insert.c b/src/exchangedb/pg_batch_reserves_in_insert.c new file mode 100644 index 000000000..d7ce47dc2 --- /dev/null +++ b/src/exchangedb/pg_batch_reserves_in_insert.c @@ -0,0 +1,284 @@ +/* + This file is part of TALER + Copyright (C) 2022 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + TALER; see the file COPYING. If not, see + */ +/** + * @file exchangedb/pg_bash_reserves_in_insert.c + * @brief Implementation of the reserves_in_insert function for Postgres + * @author JOSEPHxu + */ +#include "platform.h" +#include "taler_error_codes.h" +#include "taler_dbevents.h" +#include "taler_pq_lib.h" +#include "pg_batch_reserves_in_insert.h" +#include "pg_helper.h" +#include "pg_start.h" +#include "pg_start_read_committed.h" +#include "pg_commit.h" +#include "pg_reserves_get.h" +#include "pg_reserves_update.h" +#include "pg_setup_wire_target.h" +#include "pg_event_notify.h" + + +/** + * Generate event notification for the reserve + * change. + * + * @param pg plugin state + * @param reserve_pub reserve to notfiy on + */ +static void +notify_on_reserve (struct PostgresClosure *pg, + const struct TALER_ReservePublicKeyP *reserve_pub) +{ + struct TALER_ReserveEventP rep = { + .header.size = htons (sizeof (rep)), + .header.type = htons (TALER_DBEVENT_EXCHANGE_RESERVE_INCOMING), + .reserve_pub = *reserve_pub + }; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Notifying on reserve!\n"); + TEH_PG_event_notify (pg, + &rep.header, + NULL, + 0); +} + + +enum GNUNET_DB_QueryStatus +TEH_PG_batch_reserves_in_insert (void *cls, + const struct TALER_EXCHANGEDB_ReserveInInfo *reserves, + unsigned int reserves_length, + enum GNUNET_DB_QueryStatus *results) +{ + struct PostgresClosure *pg = cls; + enum GNUNET_DB_QueryStatus qs1; + struct TALER_EXCHANGEDB_Reserve reserve; + struct GNUNET_TIME_Timestamp expiry; + struct GNUNET_TIME_Timestamp gc; + uint64_t reserve_uuid; + + reserve.pub = reserves->reserve_pub; + expiry = GNUNET_TIME_absolute_to_timestamp ( + GNUNET_TIME_absolute_add (reserves->execution_time.abs_time, + pg->idle_reserve_expiration_time)); + gc = GNUNET_TIME_absolute_to_timestamp ( + GNUNET_TIME_absolute_add (GNUNET_TIME_absolute_get (), + pg->legal_reserve_expiration_time)); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Creating reserve %s with expiration in %s\n", + TALER_B2S (&(reserves->reserve_pub)), + GNUNET_STRINGS_relative_time_to_string ( + pg->idle_reserve_expiration_time, + GNUNET_NO)); + /* Optimistically assume this is a new reserve, create balance for the first + time; we do this before adding the actual transaction to "reserves_in", + as for a new reserve it can't be a duplicate 'add' operation, and as + the 'add' operation needs the reserve entry as a foreign key. */ + { + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_auto_from_type (&(reserves->reserve_pub)), + TALER_PQ_query_param_amount (&(reserves->balance)), + GNUNET_PQ_query_param_timestamp (&expiry), + GNUNET_PQ_query_param_timestamp (&gc), + GNUNET_PQ_query_param_end + }; + struct GNUNET_PQ_ResultSpec rs[] = { + GNUNET_PQ_result_spec_uint64 ("reserve_uuid", + &reserve_uuid), + GNUNET_PQ_result_spec_end + }; + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Reserve does not exist; creating a new one\n"); + /* Note: query uses 'on conflict do nothing' */ + PREPARE (pg, + "reserve_create", + "SELECT bash_reserves_in('34', '20','//asdddfs3', '60', '20'),bash_reserves_in('24', '10','//dfs3', '40', '50'),bash_reserves_in('42', '40','//d43', '40', '50'),bash_reserves_in('44', '10','//ghs3', '40', '50') AS existed from reserves;"); + + qs1 = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, + "reserve_create", + params, + rs); + if (qs1 < 0) + return qs1; + } + + /* Create new incoming transaction, "ON CONFLICT DO NOTHING" + is again used to guard against duplicates. */ + { + enum GNUNET_DB_QueryStatus qs2; + enum GNUNET_DB_QueryStatus qs3; + struct TALER_PaytoHashP h_payto; + + qs3 = TEH_PG_setup_wire_target (pg, + reserves->sender_account_details, + &h_payto); + if (qs3 < 0) + return qs3; + /* We do not have the UUID, so insert by public key */ + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_auto_from_type (&reserve.pub), + GNUNET_PQ_query_param_uint64 (&(reserves->wire_reference)), + TALER_PQ_query_param_amount (&(reserves->balance)), + GNUNET_PQ_query_param_string (reserves->exchange_account_name), + GNUNET_PQ_query_param_auto_from_type (&h_payto), + GNUNET_PQ_query_param_timestamp (&reserves->execution_time), + GNUNET_PQ_query_param_end + }; + + PREPARE (pg, + "reserves_in_add_transaction", + "INSERT INTO reserves_in " + "(reserve_pub" + ",wire_reference" + ",credit_val" + ",credit_frac" + ",exchange_account_section" + ",wire_source_h_payto" + ",execution_date" + ") VALUES ($1, $2, $3, $4, $5, $6, $7)" + " ON CONFLICT DO NOTHING;"); + qs2 = GNUNET_PQ_eval_prepared_non_select (pg->conn, + "reserves_in_add_transaction", + params); + /* qs2 could be 0 as statement used 'ON CONFLICT DO NOTHING' */ + if (0 >= qs2) + { + if ( (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs2) && + (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs1) ) + { + /* Conflict for the transaction, but the reserve was + just now created, that should be impossible. */ + GNUNET_break (0); /* should be impossible: reserve was fresh, + but transaction already known */ + return GNUNET_DB_STATUS_HARD_ERROR; + } + /* Transaction was already known or error. We are finished. */ + return qs2; + } + } + if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs1) + { + /* New reserve, we are finished */ + notify_on_reserve (pg, + &(reserves->reserve_pub)); + return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; + } + + /* we were wrong with our optimistic assumption: + reserve did already exist, need to do an update instead */ + { + /* We need to move away from 'read committed' to serializable. + Also, we know that it should be safe to commit at this point. + (We are only run in a larger transaction for performance.) */ + enum GNUNET_DB_QueryStatus cs; + + cs = TEH_PG_commit(pg); + if (cs < 0) + return cs; + if (GNUNET_OK != + TEH_PG_start (pg, + "reserve-update-serializable")) + { + GNUNET_break (0); + return GNUNET_DB_STATUS_HARD_ERROR; + } + } + { + enum GNUNET_DB_QueryStatus reserve_exists; + + reserve_exists = TEH_PG_reserves_get (pg, + &reserve); + switch (reserve_exists) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + return reserve_exists; + case GNUNET_DB_STATUS_SOFT_ERROR: + return reserve_exists; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + /* First we got a conflict, but then we cannot select? Very strange. */ + GNUNET_break (0); + return GNUNET_DB_STATUS_SOFT_ERROR; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* continued below */ + break; + } + } + + { + struct TALER_EXCHANGEDB_Reserve updated_reserve; + enum GNUNET_DB_QueryStatus qs3; + + /* If the reserve already existed, we need to still update the + balance; we do this after checking for duplication, as + otherwise we might have to actually pay the cost to roll this + back for duplicate transactions; like this, we should virtually + never actually have to rollback anything. */ + updated_reserve.pub = reserve.pub; + if (0 > + TALER_amount_add (&updated_reserve.balance, + &reserve.balance, + &reserves->balance)) + { + /* currency overflow or incompatible currency */ + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Attempt to deposit incompatible amount into reserve\n"); + return GNUNET_DB_STATUS_HARD_ERROR; + } + updated_reserve.expiry = GNUNET_TIME_timestamp_max (expiry, + reserve.expiry); + updated_reserve.gc = GNUNET_TIME_timestamp_max (gc, + reserve.gc); + qs3 = TEH_PG_reserves_update (pg, + &updated_reserve); + switch (qs3) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + return qs3; + case GNUNET_DB_STATUS_SOFT_ERROR: + return qs3; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + /* How can the UPDATE not work here? Very strange. */ + GNUNET_break (0); + return GNUNET_DB_STATUS_HARD_ERROR; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* continued below */ + break; + } + } + notify_on_reserve (pg, + &reserves->reserve_pub); + /* Go back to original transaction mode */ + { + enum GNUNET_DB_QueryStatus cs; + + cs = TEH_PG_commit (pg); + if (cs < 0) + return cs; + if (GNUNET_OK != + TEH_PG_start_read_committed (pg, + "reserve-insert-continued")) + { + GNUNET_break (0); + return GNUNET_DB_STATUS_HARD_ERROR; + } + } + return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; +} diff --git a/src/exchangedb/pg_batch_reserves_in_insert.h b/src/exchangedb/pg_batch_reserves_in_insert.h new file mode 100644 index 000000000..9422096db --- /dev/null +++ b/src/exchangedb/pg_batch_reserves_in_insert.h @@ -0,0 +1,34 @@ +/* + This file is part of TALER + Copyright (C) 2022 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + TALER; see the file COPYING. If not, see + */ +/** + * @file exchangedb/pg_batch_reserves_in_insert.h + * @brief implementation of the batch_reserves_in_insert function for Postgres + * @author Christian Grothoff + */ +#ifndef PG_BATCH_RESERVES_IN_INSERT_H +#define PG_BATCH_RESERVES_IN_INSERT_H + +#include "taler_util.h" +#include "taler_json_lib.h" +#include "taler_exchangedb_plugin.h" + + +enum GNUNET_DB_QueryStatus +TEH_PG_batch_reserves_in_insert (void *cls, + const struct TALER_EXCHANGEDB_ReserveInInfo *reserves, + unsigned int reserves_length, + enum GNUNET_DB_QueryStatus *results); +#endif diff --git a/src/exchangedb/pg_reserves_in_insert.c b/src/exchangedb/pg_reserves_in_insert.c index 2fcca241d..428e19231 100644 --- a/src/exchangedb/pg_reserves_in_insert.c +++ b/src/exchangedb/pg_reserves_in_insert.c @@ -31,6 +31,8 @@ #include "pg_reserves_update.h" #include "pg_setup_wire_target.h" #include "pg_event_notify.h" + + /** * Generate event notification for the reserve * change. @@ -38,7 +40,6 @@ * @param pg plugin state * @param reserve_pub reserve to notfiy on */ - static void notify_on_reserve (struct PostgresClosure *pg, const struct TALER_ReservePublicKeyP *reserve_pub) @@ -52,11 +53,12 @@ notify_on_reserve (struct PostgresClosure *pg, GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Notifying on reserve!\n"); TEH_PG_event_notify (pg, - &rep.header, - NULL, - 0); + &rep.header, + NULL, + 0); } + enum GNUNET_DB_QueryStatus TEH_PG_reserves_in_insert (void *cls, const struct TALER_ReservePublicKeyP *reserve_pub, @@ -135,8 +137,8 @@ TEH_PG_reserves_in_insert (void *cls, struct TALER_PaytoHashP h_payto; qs3 = TEH_PG_setup_wire_target (pg, - sender_account_details, - &h_payto); + sender_account_details, + &h_payto); if (qs3 < 0) return qs3; /* We do not have the UUID, so insert by public key */ diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index 6a2a473c1..114217004 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c @@ -207,7 +207,7 @@ #include "pg_reserves_update.h" #include "pg_setup_wire_target.h" #include "pg_compute_shard.h" - +#include "pg_batch_reserves_in_insert.h" /** * Set to 1 to enable Postgres auto_explain module. This will * slow down things a _lot_, but also provide extensive logging @@ -5446,7 +5446,8 @@ libtaler_plugin_exchangedb_postgres_init (void *cls) = &TEH_PG_select_purse_by_merge_pub; plugin->set_purse_balance = &TEH_PG_set_purse_balance; - + plugin->batch_reserves_in_insert + = &TEH_PG_batch_reserves_in_insert; return plugin; } diff --git a/src/exchangedb/procedures.sql b/src/exchangedb/procedures.sql index a732ef75f..a9d90294e 100644 --- a/src/exchangedb/procedures.sql +++ b/src/exchangedb/procedures.sql @@ -2521,3 +2521,61 @@ BEGIN END $$; COMMIT; + +/*************************************************************/ + + +CREATE OR REPLACE FUNCTION bash_reserves_in( + IN amount_val INT8, + IN amount_frac INT4, + IN rpub BYTEA, + IN now INT8, + IN min_reserve_gc INT8, + OUT reserve_found BOOLEAN, + OUT ruuid INT8) +LANGUAGE plpgsql +AS $$ +DECLARE + existed BOOLEAN; + not_existed BOOLEAN; +BEGIN + SELECT reserves.reserve_uuid into ruuid from reserves + where reserves.reserve_pub = rpub; + IF ruuid IS NOT NULL + THEN + existed = TRUE; + UPDATE reserves + SET (current_balance_val + ,current_balance_frac + ,expiration_date + ,gc_date) = + (amount_val + ,amount_frac + ,now + ,min_reserve_gc) + WHERE + reserve_pub = rpub + RETURNING existed into reserve_found; + END IF; + IF NOT FOUND + THEN + SELECT MAX(reserve_uuid)+1 into ruuid from reserves; + existed = FALSE; + INSERT INTO reserves + (reserve_uuid + ,reserve_pub + ,current_balance_val + ,current_balance_frac + ,expiration_date + ,gc_date) + VALUES + (ruuid + ,rpub + ,amount_val + ,amount_frac + ,now + ,min_reserve_gc) RETURNING existed into reserve_found; + + END IF; + +END $$; diff --git a/src/exchangedb/test_exchangedb_by_j.c b/src/exchangedb/test_exchangedb_by_j.c index b81d82d65..175691e92 100644 --- a/src/exchangedb/test_exchangedb_by_j.c +++ b/src/exchangedb/test_exchangedb_by_j.c @@ -98,46 +98,46 @@ run (void *cls) goto cleanup; } - for (unsigned int i = 0; i< 7; i++) + for (unsigned int i = 0; i< 8; i++) { - static unsigned int batches[] = {1, 1, 0, 2, 4, 16, 64}; + static unsigned int batches[] = {1, 1, 0, 2, 4, 16, 64, 256}; const char *sndr = "payto://x-taler-bank/localhost:8080/1"; struct TALER_Amount value; unsigned int batch_size = batches[i]; struct GNUNET_TIME_Absolute now; struct GNUNET_TIME_Timestamp ts; struct GNUNET_TIME_Relative duration; - struct TALER_ReservePublicKeyP reserve_pub; - + struct TALER_EXCHANGEDB_ReserveInInfo reserves[batch_size]; + enum GNUNET_DB_QueryStatus *results; GNUNET_assert (GNUNET_OK == TALER_string_to_amount (CURRENCY ":1.000010", &value)); now = GNUNET_TIME_absolute_get (); ts = GNUNET_TIME_timestamp_get (); - fprintf (stdout, - "Now: %llu\n", - now.abs_value_us); plugin->start (plugin->cls, "test_by_exchange_j"); for (unsigned int k = 0; kreserves_in_insert (plugin->cls, - &reserve_pub, - &value, - ts, - sndr, - "section", - 4)); + RND_BLK (&reserves[k].reserve_pub); + reserves[k].balance = value; + reserves[k].execution_time = ts; + reserves[k].sender_account_details = sndr; + reserves[k].exchange_account_name = "name"; } + FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != + plugin->batch_reserves_in_insert (plugin->cls, + reserves, + batch_size, + &results)); + + plugin->commit (plugin->cls); duration = GNUNET_TIME_absolute_get_duration (now); fprintf (stdout, "for a batchsize equal to %d it took %s\n", batch_size, GNUNET_STRINGS_relative_time_to_string (duration, - GNUNET_YES) ); + GNUNET_NO) ); } drop: GNUNET_break (GNUNET_OK == @@ -174,7 +174,7 @@ main (int argc, (void) GNUNET_asprintf (&config_filename, "%s.conf", testname); - fprintf (stderr, + fprintf (stdout, "Using config: %s\n", config_filename); cfg = GNUNET_CONFIGURATION_create (); @@ -195,5 +195,4 @@ main (int argc, return result; } - -/* end of test_exchangedb.c */ +/* end of test_exchangedb_by_j.c */ -- cgit v1.2.3 From b9ccfbd66b192e11766f4129bae65c16fddffc5a Mon Sep 17 00:00:00 2001 From: Joseph Date: Fri, 18 Nov 2022 11:18:45 -0500 Subject: some modifications, there is one error which display (no function matches the given name and argument types) --- .../perf_exchangedb_reserves_in_insert.c | 195 +++++++++++++++++++++ src/exchangedb/pg_batch_reserves_in_insert.c | 195 ++++----------------- src/exchangedb/pg_batch_reserves_in_insert.h | 1 + src/exchangedb/plugin_exchangedb_postgres.c | 2 + src/exchangedb/procedures.sql | 150 +++++++++++----- src/exchangedb/test_exchangedb_by_j.c | 11 +- 6 files changed, 338 insertions(+), 216 deletions(-) create mode 100644 src/exchangedb/perf_exchangedb_reserves_in_insert.c (limited to 'src/exchangedb') diff --git a/src/exchangedb/perf_exchangedb_reserves_in_insert.c b/src/exchangedb/perf_exchangedb_reserves_in_insert.c new file mode 100644 index 000000000..6c91b6bca --- /dev/null +++ b/src/exchangedb/perf_exchangedb_reserves_in_insert.c @@ -0,0 +1,195 @@ +/* + This file is part of TALER + Copyright (C) 2014-2022 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU General Public License for more details. + + You should have received a copy of the GNU General Public License along with + TALER; see the file COPYING. If not, see +*/ +/** + * @file exchangedb/test_exchangedb_by_j.c + * @brief test cases for DB interaction functions + * @author Joseph Xu + */ +#include "platform.h" +#include "taler_exchangedb_lib.h" +#include "taler_json_lib.h" +#include "taler_exchangedb_plugin.h" + +/** + * Global result from the testcase. + */ +static int result; + +/** + * Report line of error if @a cond is true, and jump to label "drop". + */ +#define FAILIF(cond) \ + do { \ + if (! (cond)) { break;} \ + GNUNET_break (0); \ + goto drop; \ + } while (0) + + +/** + * Initializes @a ptr with random data. + */ +#define RND_BLK(ptr) \ + GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_WEAK, ptr, sizeof (*ptr)) + +/** + * Initializes @a ptr with zeros. + */ +#define ZR_BLK(ptr) \ + memset (ptr, 0, sizeof (*ptr)) + + +/** + * Currency we use. Must match test-exchange-db-*.conf. + */ +#define CURRENCY "EUR" + +/** + * Database plugin under test. + */ +static struct TALER_EXCHANGEDB_Plugin *plugin; + + +/** + * Main function that will be run by the scheduler. + * + * @param cls closure with config + */ +static void +run (void *cls) +{ + struct GNUNET_CONFIGURATION_Handle *cfg = cls; + const uint32_t num_partitions = 10; + + if (NULL == + (plugin = TALER_EXCHANGEDB_plugin_load (cfg))) + { + GNUNET_break (0); + result = 77; + return; + } + (void) plugin->drop_tables (plugin->cls); + if (GNUNET_OK != + plugin->create_tables (plugin->cls)) + { + GNUNET_break (0); + result = 77; + goto cleanup; + } + if (GNUNET_OK != + plugin->setup_partitions (plugin->cls, + num_partitions)) + { + GNUNET_break (0); + result = 77; + goto cleanup; + } + + for (unsigned int i = 0; i< 8; i++) + { + static unsigned int batches[] = {1, 1, 0, 2, 4, 16, 64, 256}; + const char *sndr = "payto://x-taler-bank/localhost:8080/1"; + struct TALER_Amount value; + unsigned int batch_size = batches[i]; + struct GNUNET_TIME_Absolute now; + struct GNUNET_TIME_Timestamp ts; + struct GNUNET_TIME_Relative duration; + struct TALER_ReservePublicKeyP reserve_pub; + + GNUNET_assert (GNUNET_OK == + TALER_string_to_amount (CURRENCY ":1.000010", + &value)); + now = GNUNET_TIME_absolute_get (); + ts = GNUNET_TIME_timestamp_get (); + plugin->start (plugin->cls, + "test_by_exchange_j"); + for (unsigned int k = 0; kreserves_in_insert (plugin->cls, + &reserve_pub, + &value, + ts, + sndr, + "section", + 4)); + } + plugin->commit (plugin->cls); + duration = GNUNET_TIME_absolute_get_duration (now); + fprintf (stdout, + "for a batchsize equal to %d it took %s\n", + batch_size, + GNUNET_STRINGS_relative_time_to_string (duration, + GNUNET_NO) ); + } +drop: + GNUNET_break (GNUNET_OK == + plugin->drop_tables (plugin->cls)); +cleanup: + TALER_EXCHANGEDB_plugin_unload (plugin); + plugin = NULL; +} + + +int +main (int argc, + char *const argv[]) +{ + const char *plugin_name; + char *config_filename; + char *testname; + struct GNUNET_CONFIGURATION_Handle *cfg; + + (void) argc; + result = -1; + if (NULL == (plugin_name = strrchr (argv[0], (int) '-'))) + { + GNUNET_break (0); + return -1; + } + GNUNET_log_setup (argv[0], + "WARNING", + NULL); + plugin_name++; + (void) GNUNET_asprintf (&testname, + "test-exchange-db-%s", + plugin_name); + (void) GNUNET_asprintf (&config_filename, + "%s.conf", + testname); + fprintf (stdout, + "Using config: %s\n", + config_filename); + cfg = GNUNET_CONFIGURATION_create (); + if (GNUNET_OK != + GNUNET_CONFIGURATION_parse (cfg, + config_filename)) + { + GNUNET_break (0); + GNUNET_free (config_filename); + GNUNET_free (testname); + return 2; + } + GNUNET_SCHEDULER_run (&run, + cfg); + GNUNET_CONFIGURATION_destroy (cfg); + GNUNET_free (config_filename); + GNUNET_free (testname); + return result; +} + +/* end of test_exchangedb_by_j.c */ diff --git a/src/exchangedb/pg_batch_reserves_in_insert.c b/src/exchangedb/pg_batch_reserves_in_insert.c index d7ce47dc2..216de96be 100644 --- a/src/exchangedb/pg_batch_reserves_in_insert.c +++ b/src/exchangedb/pg_batch_reserves_in_insert.c @@ -14,7 +14,7 @@ TALER; see the file COPYING. If not, see */ /** - * @file exchangedb/pg_bash_reserves_in_insert.c + * @file exchangedb/pg_batch_reserves_in_insert.c * @brief Implementation of the reserves_in_insert function for Postgres * @author JOSEPHxu */ @@ -70,7 +70,12 @@ TEH_PG_batch_reserves_in_insert (void *cls, struct TALER_EXCHANGEDB_Reserve reserve; struct GNUNET_TIME_Timestamp expiry; struct GNUNET_TIME_Timestamp gc; + struct TALER_PaytoHashP h_payto; uint64_t reserve_uuid; + bool conflicted; + bool transaction_duplicate; + struct GNUNET_TIME_Timestamp reserve_expiration + = GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time); reserve.pub = reserves->reserve_pub; expiry = GNUNET_TIME_absolute_to_timestamp ( @@ -91,15 +96,28 @@ TEH_PG_batch_reserves_in_insert (void *cls, the 'add' operation needs the reserve entry as a foreign key. */ { struct GNUNET_PQ_QueryParam params[] = { - GNUNET_PQ_query_param_auto_from_type (&(reserves->reserve_pub)), - TALER_PQ_query_param_amount (&(reserves->balance)), - GNUNET_PQ_query_param_timestamp (&expiry), - GNUNET_PQ_query_param_timestamp (&gc), + GNUNET_PQ_query_param_auto_from_type (&reserves->reserve_pub), /*$1*/ + TALER_PQ_query_param_amount (&reserves->balance), /*$2+3*/ + GNUNET_PQ_query_param_timestamp (&expiry), /*$4*/ + GNUNET_PQ_query_param_timestamp (&gc), /*$5*/ + GNUNET_PQ_query_param_uint64 (&reserves->wire_reference), /*6*/ + TALER_PQ_query_param_amount (&reserves->balance), /*7+8*/ + GNUNET_PQ_query_param_string (reserves->exchange_account_name), /*9*/ + GNUNET_PQ_query_param_timestamp (&reserves->execution_time), /*10*/ + GNUNET_PQ_query_param_auto_from_type (&h_payto), /*11*/ + GNUNET_PQ_query_param_string (reserves->sender_account_details),/*12*/ + GNUNET_PQ_query_param_timestamp (&reserve_expiration),/*13*/ GNUNET_PQ_query_param_end }; + + /* We should get all our results into results[]*/ struct GNUNET_PQ_ResultSpec rs[] = { GNUNET_PQ_result_spec_uint64 ("reserve_uuid", &reserve_uuid), + GNUNET_PQ_result_spec_bool ("conflicted", + &conflicted), + GNUNET_PQ_result_spec_bool ("transaction_duplicate", + &transaction_duplicate), GNUNET_PQ_result_spec_end }; @@ -108,7 +126,12 @@ TEH_PG_batch_reserves_in_insert (void *cls, /* Note: query uses 'on conflict do nothing' */ PREPARE (pg, "reserve_create", - "SELECT bash_reserves_in('34', '20','//asdddfs3', '60', '20'),bash_reserves_in('24', '10','//dfs3', '40', '50'),bash_reserves_in('42', '40','//d43', '40', '50'),bash_reserves_in('44', '10','//ghs3', '40', '50') AS existed from reserves;"); + "SELECT " + "out_reserve_found AS conflicted" + ",transaction_duplicate" + ",ruuid" + " FROM batch_reserves_in" + " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12);"); qs1 = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, "reserve_create", @@ -118,167 +141,9 @@ TEH_PG_batch_reserves_in_insert (void *cls, return qs1; } - /* Create new incoming transaction, "ON CONFLICT DO NOTHING" - is again used to guard against duplicates. */ - { - enum GNUNET_DB_QueryStatus qs2; - enum GNUNET_DB_QueryStatus qs3; - struct TALER_PaytoHashP h_payto; - - qs3 = TEH_PG_setup_wire_target (pg, - reserves->sender_account_details, - &h_payto); - if (qs3 < 0) - return qs3; - /* We do not have the UUID, so insert by public key */ - struct GNUNET_PQ_QueryParam params[] = { - GNUNET_PQ_query_param_auto_from_type (&reserve.pub), - GNUNET_PQ_query_param_uint64 (&(reserves->wire_reference)), - TALER_PQ_query_param_amount (&(reserves->balance)), - GNUNET_PQ_query_param_string (reserves->exchange_account_name), - GNUNET_PQ_query_param_auto_from_type (&h_payto), - GNUNET_PQ_query_param_timestamp (&reserves->execution_time), - GNUNET_PQ_query_param_end - }; - - PREPARE (pg, - "reserves_in_add_transaction", - "INSERT INTO reserves_in " - "(reserve_pub" - ",wire_reference" - ",credit_val" - ",credit_frac" - ",exchange_account_section" - ",wire_source_h_payto" - ",execution_date" - ") VALUES ($1, $2, $3, $4, $5, $6, $7)" - " ON CONFLICT DO NOTHING;"); - qs2 = GNUNET_PQ_eval_prepared_non_select (pg->conn, - "reserves_in_add_transaction", - params); - /* qs2 could be 0 as statement used 'ON CONFLICT DO NOTHING' */ - if (0 >= qs2) - { - if ( (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs2) && - (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs1) ) - { - /* Conflict for the transaction, but the reserve was - just now created, that should be impossible. */ - GNUNET_break (0); /* should be impossible: reserve was fresh, - but transaction already known */ - return GNUNET_DB_STATUS_HARD_ERROR; - } - /* Transaction was already known or error. We are finished. */ - return qs2; - } - } - if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs1) - { - /* New reserve, we are finished */ - notify_on_reserve (pg, - &(reserves->reserve_pub)); - return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; - } - - /* we were wrong with our optimistic assumption: - reserve did already exist, need to do an update instead */ - { - /* We need to move away from 'read committed' to serializable. - Also, we know that it should be safe to commit at this point. - (We are only run in a larger transaction for performance.) */ - enum GNUNET_DB_QueryStatus cs; - - cs = TEH_PG_commit(pg); - if (cs < 0) - return cs; - if (GNUNET_OK != - TEH_PG_start (pg, - "reserve-update-serializable")) - { - GNUNET_break (0); - return GNUNET_DB_STATUS_HARD_ERROR; - } - } - { - enum GNUNET_DB_QueryStatus reserve_exists; - - reserve_exists = TEH_PG_reserves_get (pg, - &reserve); - switch (reserve_exists) - { - case GNUNET_DB_STATUS_HARD_ERROR: - GNUNET_break (0); - return reserve_exists; - case GNUNET_DB_STATUS_SOFT_ERROR: - return reserve_exists; - case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: - /* First we got a conflict, but then we cannot select? Very strange. */ - GNUNET_break (0); - return GNUNET_DB_STATUS_SOFT_ERROR; - case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: - /* continued below */ - break; - } - } - - { - struct TALER_EXCHANGEDB_Reserve updated_reserve; - enum GNUNET_DB_QueryStatus qs3; - - /* If the reserve already existed, we need to still update the - balance; we do this after checking for duplication, as - otherwise we might have to actually pay the cost to roll this - back for duplicate transactions; like this, we should virtually - never actually have to rollback anything. */ - updated_reserve.pub = reserve.pub; - if (0 > - TALER_amount_add (&updated_reserve.balance, - &reserve.balance, - &reserves->balance)) - { - /* currency overflow or incompatible currency */ - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Attempt to deposit incompatible amount into reserve\n"); - return GNUNET_DB_STATUS_HARD_ERROR; - } - updated_reserve.expiry = GNUNET_TIME_timestamp_max (expiry, - reserve.expiry); - updated_reserve.gc = GNUNET_TIME_timestamp_max (gc, - reserve.gc); - qs3 = TEH_PG_reserves_update (pg, - &updated_reserve); - switch (qs3) - { - case GNUNET_DB_STATUS_HARD_ERROR: - GNUNET_break (0); - return qs3; - case GNUNET_DB_STATUS_SOFT_ERROR: - return qs3; - case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: - /* How can the UPDATE not work here? Very strange. */ - GNUNET_break (0); - return GNUNET_DB_STATUS_HARD_ERROR; - case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: - /* continued below */ - break; - } - } notify_on_reserve (pg, &reserves->reserve_pub); - /* Go back to original transaction mode */ - { - enum GNUNET_DB_QueryStatus cs; - cs = TEH_PG_commit (pg); - if (cs < 0) - return cs; - if (GNUNET_OK != - TEH_PG_start_read_committed (pg, - "reserve-insert-continued")) - { - GNUNET_break (0); - return GNUNET_DB_STATUS_HARD_ERROR; - } - } + return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; } diff --git a/src/exchangedb/pg_batch_reserves_in_insert.h b/src/exchangedb/pg_batch_reserves_in_insert.h index 9422096db..766795672 100644 --- a/src/exchangedb/pg_batch_reserves_in_insert.h +++ b/src/exchangedb/pg_batch_reserves_in_insert.h @@ -31,4 +31,5 @@ TEH_PG_batch_reserves_in_insert (void *cls, const struct TALER_EXCHANGEDB_ReserveInInfo *reserves, unsigned int reserves_length, enum GNUNET_DB_QueryStatus *results); + #endif diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index 114217004..5b59d4b05 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c @@ -5446,8 +5446,10 @@ libtaler_plugin_exchangedb_postgres_init (void *cls) = &TEH_PG_select_purse_by_merge_pub; plugin->set_purse_balance = &TEH_PG_set_purse_balance; + plugin->batch_reserves_in_insert = &TEH_PG_batch_reserves_in_insert; + return plugin; } diff --git a/src/exchangedb/procedures.sql b/src/exchangedb/procedures.sql index a9d90294e..1e96301a4 100644 --- a/src/exchangedb/procedures.sql +++ b/src/exchangedb/procedures.sql @@ -2520,62 +2520,120 @@ BEGIN policy_details_serial_id = out_policy_details_serial_id; END $$; -COMMIT; - -/*************************************************************/ - - -CREATE OR REPLACE FUNCTION bash_reserves_in( - IN amount_val INT8, - IN amount_frac INT4, - IN rpub BYTEA, - IN now INT8, - IN min_reserve_gc INT8, - OUT reserve_found BOOLEAN, +CREATE OR REPLACE FUNCTION batch_reserves_in( + IN in_reserve_pub BYTEA, + IN in_current_balance_val INT8, + IN in_current_balance_frac INT4, + IN in_expiration_date INT8, + IN in_gc_date INT8, + IN in_wire_ref INT8, + IN in_credit_val INT8, + IN in_credit_frac INT4, + IN in_exchange_account_name VARCHAR, + IN in_exectution_date INT4, + IN in_wire_source_h_payto BYTEA, ---h_payto + IN in_payto_uri VARCHAR, + IN in_reserve_expiration INT8, + OUT out_reserve_found BOOLEAN, + OUT transaction_duplicate BOOLEAN, OUT ruuid INT8) LANGUAGE plpgsql AS $$ DECLARE - existed BOOLEAN; - not_existed BOOLEAN; + my_amount_val INT8; +DECLARE + my_amount_frac INT4; BEGIN - SELECT reserves.reserve_uuid into ruuid from reserves - where reserves.reserve_pub = rpub; - IF ruuid IS NOT NULL - THEN - existed = TRUE; - UPDATE reserves - SET (current_balance_val - ,current_balance_frac - ,expiration_date - ,gc_date) = - (amount_val - ,amount_frac - ,now - ,min_reserve_gc) - WHERE - reserve_pub = rpub - RETURNING existed into reserve_found; - END IF; - IF NOT FOUND - THEN - SELECT MAX(reserve_uuid)+1 into ruuid from reserves; - existed = FALSE; - INSERT INTO reserves - (reserve_uuid - ,reserve_pub + + SELECT + current_balance_val + ,current_balance_frac + INTO + my_amount_val + ,my_amount_frac + FROM reserves + WHERE reserves.reserve_pub = in_reserve_pub; + + INSERT INTO reserves + (reserve_pub ,current_balance_val ,current_balance_frac ,expiration_date ,gc_date) VALUES - (ruuid - ,rpub - ,amount_val - ,amount_frac - ,now - ,min_reserve_gc) RETURNING existed into reserve_found; - + (in_reserve_pub + ,in_current_balance_val + ,in_current_balance_frac + ,in_expiration_date + ,in_gc_date) + ON CONFLICT DO NOTHING + RETURNING reserves.reserve_uuid INTO ruuid; + + --IF THE INSERT WAS NOT SUCCESSFUL, REMEMBER IT + IF NOT FOUND + THEN + out_reserve_found = FALSE; + ELSE + out_reserve_found = TRUE; END IF; + --SIMPLE INSERT ON CONFLICT DO NOTHING + INSERT INTO wire_targets + (wire_target_h_payto + ,payto_uri) + VALUES + (in_wire_source_h_payto + ,in_payto_uri) + ON CONFLICT DO NOTHING; + + INSERT INTO reserves_in + (reserve_pub + ,wire_reference + ,credit_val + ,credit_frac + ,exchange_account_section + ,wire_source_h_payto + ,execution_date) + VALUES + (in_reserve_pub + ,in_wire_ref + ,in_current_balance_val + ,in_credit_frac + ,in_exchange_account_section + ,in_wire_source_h_payto + ,in_execution_date); + + --IF THE INSERTION WAS A SUCCESS IT MEANS NO DUPLICATED TRANSACTION + IF FOUND + THEN + transaction_duplicate = FALSE; + IF out_reserve_found = TRUE + THEN + UPDATE reserves + SET + in_current_balance_frac=in_current_balance_frac+my_amount_frac + - CASE + WHEN in_current_balance_frac + my_amount_frac >= 100000000 + THEN 100000000 + ELSE 0 + END + ,in_current_balance_val=in_current_balance_val+my_amount_val + + CASE + WHEN in_current_balance_frac + my_amount_frac >= 100000000 + THEN 1 + ELSE 0 + END + ,expiration_date=GREATEST(in_expiration_date,in_reserve_expiration) + ,gc_date=GREATEST(in_gc_date,in_reserve_expiration) + WHERE reserves.reserve_pub=in_reserve_pub; + RETURN; + ELSE + RETURN; + END IF; + ELSE + transaction_duplicate = TRUE; + RETURN; + END IF; END $$; + +COMMIT; diff --git a/src/exchangedb/test_exchangedb_by_j.c b/src/exchangedb/test_exchangedb_by_j.c index 175691e92..eb600103e 100644 --- a/src/exchangedb/test_exchangedb_by_j.c +++ b/src/exchangedb/test_exchangedb_by_j.c @@ -33,7 +33,7 @@ static int result; */ #define FAILIF(cond) \ do { \ - if (! (cond)) { break;} \ + if (! (cond)) {break;} \ GNUNET_break (0); \ goto drop; \ } while (0) @@ -108,14 +108,15 @@ run (void *cls) struct GNUNET_TIME_Timestamp ts; struct GNUNET_TIME_Relative duration; struct TALER_EXCHANGEDB_ReserveInInfo reserves[batch_size]; - enum GNUNET_DB_QueryStatus *results; + enum GNUNET_DB_QueryStatus results[batch_size]; GNUNET_assert (GNUNET_OK == TALER_string_to_amount (CURRENCY ":1.000010", &value)); now = GNUNET_TIME_absolute_get (); ts = GNUNET_TIME_timestamp_get (); plugin->start (plugin->cls, - "test_by_exchange_j"); + "test_by_j"); + for (unsigned int k = 0; kbatch_reserves_in_insert (plugin->cls, reserves, batch_size, - &results)); - + results)); plugin->commit (plugin->cls); duration = GNUNET_TIME_absolute_get_duration (now); -- cgit v1.2.3 From dbfd4e252a1bdbc7e4c8c97fd4780a13369b872c Mon Sep 17 00:00:00 2001 From: Joseph Date: Mon, 21 Nov 2022 09:45:16 -0500 Subject: batch test for reserves-in-insert --- src/exchangedb/pg_batch_reserves_in_insert.c | 13 +++--- src/exchangedb/procedures.sql | 60 ++++++++++++++-------------- src/exchangedb/test_exchangedb_by_j.c | 5 ++- 3 files changed, 41 insertions(+), 37 deletions(-) (limited to 'src/exchangedb') diff --git a/src/exchangedb/pg_batch_reserves_in_insert.c b/src/exchangedb/pg_batch_reserves_in_insert.c index 216de96be..fd056e0ca 100644 --- a/src/exchangedb/pg_batch_reserves_in_insert.c +++ b/src/exchangedb/pg_batch_reserves_in_insert.c @@ -25,6 +25,7 @@ #include "pg_batch_reserves_in_insert.h" #include "pg_helper.h" #include "pg_start.h" +#include "pg_rollback.h" #include "pg_start_read_committed.h" #include "pg_commit.h" #include "pg_reserves_get.h" @@ -97,7 +98,6 @@ TEH_PG_batch_reserves_in_insert (void *cls, { struct GNUNET_PQ_QueryParam params[] = { GNUNET_PQ_query_param_auto_from_type (&reserves->reserve_pub), /*$1*/ - TALER_PQ_query_param_amount (&reserves->balance), /*$2+3*/ GNUNET_PQ_query_param_timestamp (&expiry), /*$4*/ GNUNET_PQ_query_param_timestamp (&gc), /*$5*/ GNUNET_PQ_query_param_uint64 (&reserves->wire_reference), /*6*/ @@ -121,6 +121,7 @@ TEH_PG_batch_reserves_in_insert (void *cls, GNUNET_PQ_result_spec_end }; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Reserve does not exist; creating a new one\n"); /* Note: query uses 'on conflict do nothing' */ @@ -129,21 +130,23 @@ TEH_PG_batch_reserves_in_insert (void *cls, "SELECT " "out_reserve_found AS conflicted" ",transaction_duplicate" - ",ruuid" + ",ruuid AS reserve_uuid" " FROM batch_reserves_in" - " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11,$12);"); + " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11);"); qs1 = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, "reserve_create", params, rs); + + if (qs1 < 0) return qs1; } - + if ((int)conflicted == 0 && (int)transaction_duplicate == 1) + TEH_PG_rollback(pg); notify_on_reserve (pg, &reserves->reserve_pub); - return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; } diff --git a/src/exchangedb/procedures.sql b/src/exchangedb/procedures.sql index 1e96301a4..ff282735c 100644 --- a/src/exchangedb/procedures.sql +++ b/src/exchangedb/procedures.sql @@ -2522,15 +2522,13 @@ END $$; CREATE OR REPLACE FUNCTION batch_reserves_in( IN in_reserve_pub BYTEA, - IN in_current_balance_val INT8, - IN in_current_balance_frac INT4, IN in_expiration_date INT8, IN in_gc_date INT8, IN in_wire_ref INT8, IN in_credit_val INT8, IN in_credit_frac INT4, IN in_exchange_account_name VARCHAR, - IN in_exectution_date INT4, + IN in_exectution_date INT8, IN in_wire_source_h_payto BYTEA, ---h_payto IN in_payto_uri VARCHAR, IN in_reserve_expiration INT8, @@ -2545,15 +2543,6 @@ DECLARE my_amount_frac INT4; BEGIN - SELECT - current_balance_val - ,current_balance_frac - INTO - my_amount_val - ,my_amount_frac - FROM reserves - WHERE reserves.reserve_pub = in_reserve_pub; - INSERT INTO reserves (reserve_pub ,current_balance_val @@ -2562,18 +2551,19 @@ BEGIN ,gc_date) VALUES (in_reserve_pub - ,in_current_balance_val - ,in_current_balance_frac + ,in_credit_val + ,in_credit_frac ,in_expiration_date ,in_gc_date) - ON CONFLICT DO NOTHING - RETURNING reserves.reserve_uuid INTO ruuid; + ON CONFLICT DO NOTHING + RETURNING reserve_uuid INTO ruuid; - --IF THE INSERT WAS NOT SUCCESSFUL, REMEMBER IT - IF NOT FOUND + IF FOUND THEN + -- We made a change, so the reserve did not previously exist. out_reserve_found = FALSE; ELSE + -- We made no change, which means the reserve existed. out_reserve_found = TRUE; END IF; @@ -2597,42 +2587,52 @@ BEGIN VALUES (in_reserve_pub ,in_wire_ref - ,in_current_balance_val + ,in_credit_val ,in_credit_frac - ,in_exchange_account_section + ,in_exchange_account_name ,in_wire_source_h_payto - ,in_execution_date); + ,in_expiration_date); --IF THE INSERTION WAS A SUCCESS IT MEANS NO DUPLICATED TRANSACTION IF FOUND THEN transaction_duplicate = FALSE; - IF out_reserve_found = TRUE + IF out_reserve_found THEN UPDATE reserves SET - in_current_balance_frac=in_current_balance_frac+my_amount_frac + current_balance_frac = current_balance_frac+in_credit_frac - CASE - WHEN in_current_balance_frac + my_amount_frac >= 100000000 + WHEN current_balance_frac + in_credit_frac >= 100000000 THEN 100000000 - ELSE 0 + ELSE 1 END - ,in_current_balance_val=in_current_balance_val+my_amount_val + ,current_balance_val = current_balance_val+in_credit_val + CASE - WHEN in_current_balance_frac + my_amount_frac >= 100000000 + WHEN current_balance_frac + in_credit_frac >= 100000000 THEN 1 ELSE 0 END - ,expiration_date=GREATEST(in_expiration_date,in_reserve_expiration) - ,gc_date=GREATEST(in_gc_date,in_reserve_expiration) + ,expiration_date=GREATEST(expiration_date,in_expiration_date) + ,gc_date=GREATEST(gc_date,in_expiration_date) WHERE reserves.reserve_pub=in_reserve_pub; + out_reserve_found = TRUE; RETURN; ELSE + out_reserve_found=FALSE; RETURN; END IF; + out_reserve_found = TRUE; ELSE transaction_duplicate = TRUE; - RETURN; + IF out_reserve_found + THEN + out_reserve_found = TRUE; + RETURN; + ELSE + out_reserve_found = FALSE; + RETURN; + END IF; END IF; END $$; diff --git a/src/exchangedb/test_exchangedb_by_j.c b/src/exchangedb/test_exchangedb_by_j.c index eb600103e..831416b4d 100644 --- a/src/exchangedb/test_exchangedb_by_j.c +++ b/src/exchangedb/test_exchangedb_by_j.c @@ -98,9 +98,9 @@ run (void *cls) goto cleanup; } - for (unsigned int i = 0; i< 8; i++) + for (unsigned int i = 0; i< 7; i++) { - static unsigned int batches[] = {1, 1, 0, 2, 4, 16, 64, 256}; + static unsigned int batches[] = {1, 1, 2, 4, 16, 64, 256}; const char *sndr = "payto://x-taler-bank/localhost:8080/1"; struct TALER_Amount value; unsigned int batch_size = batches[i]; @@ -124,6 +124,7 @@ run (void *cls) reserves[k].execution_time = ts; reserves[k].sender_account_details = sndr; reserves[k].exchange_account_name = "name"; + reserves[k].wire_reference = k; } FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != -- cgit v1.2.3 From eba2a5d90c1dbc2e5121e404ac4154dc70d9cace Mon Sep 17 00:00:00 2001 From: Joseph Date: Mon, 21 Nov 2022 10:37:50 -0500 Subject: new batch insertion code --- .../perf_exchangedb_reserves_in_insert.c | 4 ++ src/exchangedb/pg_batch_reserves_in_insert.c | 64 +++++++++++----------- src/exchangedb/test-exchange-db-postgres.conf | 2 +- src/exchangedb/test_exchangedb_by_j.c | 10 +++- 4 files changed, 43 insertions(+), 37 deletions(-) (limited to 'src/exchangedb') diff --git a/src/exchangedb/perf_exchangedb_reserves_in_insert.c b/src/exchangedb/perf_exchangedb_reserves_in_insert.c index 6c91b6bca..9a0ec0944 100644 --- a/src/exchangedb/perf_exchangedb_reserves_in_insert.c +++ b/src/exchangedb/perf_exchangedb_reserves_in_insert.c @@ -114,6 +114,8 @@ run (void *cls) &value)); now = GNUNET_TIME_absolute_get (); ts = GNUNET_TIME_timestamp_get (); + for (unsigned int r=0;r<10;r++) + { plugin->start (plugin->cls, "test_by_exchange_j"); for (unsigned int k = 0; kcommit (plugin->cls); + } duration = GNUNET_TIME_absolute_get_duration (now); fprintf (stdout, "for a batchsize equal to %d it took %s\n", @@ -136,6 +139,7 @@ run (void *cls) GNUNET_STRINGS_relative_time_to_string (duration, GNUNET_NO) ); } + result = 0; drop: GNUNET_break (GNUNET_OK == plugin->drop_tables (plugin->cls)); diff --git a/src/exchangedb/pg_batch_reserves_in_insert.c b/src/exchangedb/pg_batch_reserves_in_insert.c index fd056e0ca..4a1a2792e 100644 --- a/src/exchangedb/pg_batch_reserves_in_insert.c +++ b/src/exchangedb/pg_batch_reserves_in_insert.c @@ -62,13 +62,12 @@ notify_on_reserve (struct PostgresClosure *pg, enum GNUNET_DB_QueryStatus TEH_PG_batch_reserves_in_insert (void *cls, - const struct TALER_EXCHANGEDB_ReserveInInfo *reserves, - unsigned int reserves_length, - enum GNUNET_DB_QueryStatus *results) + const struct TALER_EXCHANGEDB_ReserveInInfo *reserves, + unsigned int reserves_length, + enum GNUNET_DB_QueryStatus *results) { struct PostgresClosure *pg = cls; enum GNUNET_DB_QueryStatus qs1; - struct TALER_EXCHANGEDB_Reserve reserve; struct GNUNET_TIME_Timestamp expiry; struct GNUNET_TIME_Timestamp gc; struct TALER_PaytoHashP h_payto; @@ -78,7 +77,14 @@ TEH_PG_batch_reserves_in_insert (void *cls, struct GNUNET_TIME_Timestamp reserve_expiration = GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time); - reserve.pub = reserves->reserve_pub; + PREPARE (pg, + "reserve_create", + "SELECT " + "out_reserve_found AS conflicted" + ",transaction_duplicate" + ",ruuid AS reserve_uuid" + " FROM batch_reserves_in" + " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11);"); expiry = GNUNET_TIME_absolute_to_timestamp ( GNUNET_TIME_absolute_add (reserves->execution_time.abs_time, pg->idle_reserve_expiration_time)); @@ -95,21 +101,22 @@ TEH_PG_batch_reserves_in_insert (void *cls, time; we do this before adding the actual transaction to "reserves_in", as for a new reserve it can't be a duplicate 'add' operation, and as the 'add' operation needs the reserve entry as a foreign key. */ + for (unsigned int i=0;ireserve_pub), /*$1*/ + GNUNET_PQ_query_param_auto_from_type (&reserve->reserve_pub), /*$1*/ GNUNET_PQ_query_param_timestamp (&expiry), /*$4*/ GNUNET_PQ_query_param_timestamp (&gc), /*$5*/ - GNUNET_PQ_query_param_uint64 (&reserves->wire_reference), /*6*/ - TALER_PQ_query_param_amount (&reserves->balance), /*7+8*/ - GNUNET_PQ_query_param_string (reserves->exchange_account_name), /*9*/ - GNUNET_PQ_query_param_timestamp (&reserves->execution_time), /*10*/ + GNUNET_PQ_query_param_uint64 (&reserve->wire_reference), /*6*/ + TALER_PQ_query_param_amount (&reserve->balance), /*7+8*/ + GNUNET_PQ_query_param_string (reserve->exchange_account_name), /*9*/ + GNUNET_PQ_query_param_timestamp (&reserve->execution_time), /*10*/ GNUNET_PQ_query_param_auto_from_type (&h_payto), /*11*/ - GNUNET_PQ_query_param_string (reserves->sender_account_details),/*12*/ + GNUNET_PQ_query_param_string (reserve->sender_account_details),/*12*/ GNUNET_PQ_query_param_timestamp (&reserve_expiration),/*13*/ GNUNET_PQ_query_param_end }; - /* We should get all our results into results[]*/ struct GNUNET_PQ_ResultSpec rs[] = { GNUNET_PQ_result_spec_uint64 ("reserve_uuid", @@ -121,32 +128,23 @@ TEH_PG_batch_reserves_in_insert (void *cls, GNUNET_PQ_result_spec_end }; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Reserve does not exist; creating a new one\n"); + TALER_payto_hash (reserve->sender_account_details, + &h_payto); /* Note: query uses 'on conflict do nothing' */ - PREPARE (pg, - "reserve_create", - "SELECT " - "out_reserve_found AS conflicted" - ",transaction_duplicate" - ",ruuid AS reserve_uuid" - " FROM batch_reserves_in" - " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11);"); - qs1 = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, "reserve_create", params, rs); - - if (qs1 < 0) - return qs1; + return qs1; + notify_on_reserve (pg, + &reserve->reserve_pub); + GNUNET_assert (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs1); + results[i] = (transaction_duplicate) + ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS + : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; + if ( (! conflicted) && transaction_duplicate) + TEH_PG_rollback(pg); } - if ((int)conflicted == 0 && (int)transaction_duplicate == 1) - TEH_PG_rollback(pg); - notify_on_reserve (pg, - &reserves->reserve_pub); - - return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; + return reserves_length; } diff --git a/src/exchangedb/test-exchange-db-postgres.conf b/src/exchangedb/test-exchange-db-postgres.conf index 05104cb7f..92bdde393 100644 --- a/src/exchangedb/test-exchange-db-postgres.conf +++ b/src/exchangedb/test-exchange-db-postgres.conf @@ -7,7 +7,7 @@ BASE_URL = http://localhost/ [exchangedb-postgres] #The connection string the plugin has to use for connecting to the database -CONFIG = postgres:///talercheck +CONFIG = postgres://dab:test@localhost/talercheck # Where are the SQL files to setup our tables? SQL_DIR = $DATADIR/sql/exchange/ diff --git a/src/exchangedb/test_exchangedb_by_j.c b/src/exchangedb/test_exchangedb_by_j.c index 831416b4d..b2f6ddeb5 100644 --- a/src/exchangedb/test_exchangedb_by_j.c +++ b/src/exchangedb/test_exchangedb_by_j.c @@ -114,8 +114,10 @@ run (void *cls) &value)); now = GNUNET_TIME_absolute_get (); ts = GNUNET_TIME_timestamp_get (); - plugin->start (plugin->cls, - "test_by_j"); + for (unsigned int r=0;r<10;r++) + { + plugin->start_read_committed (plugin->cls, + "test_by_j"); for (unsigned int k = 0; kbatch_reserves_in_insert (plugin->cls, reserves, batch_size, results)); plugin->commit (plugin->cls); + } duration = GNUNET_TIME_absolute_get_duration (now); fprintf (stdout, "for a batchsize equal to %d it took %s\n", @@ -141,6 +144,7 @@ run (void *cls) GNUNET_STRINGS_relative_time_to_string (duration, GNUNET_NO) ); } + result = 0; drop: GNUNET_break (GNUNET_OK == plugin->drop_tables (plugin->cls)); -- cgit v1.2.3