summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2023-07-30 16:28:47 +0200
committerChristian Grothoff <christian@grothoff.org>2023-07-30 16:28:47 +0200
commit09ec056da4ebbbc3437df05344075a4d109ec204 (patch)
tree3df12b85e54f986c676954dcafb0693fe6e47f23
parenta29f04189c8b7457ede0b3ac01222b88853325a8 (diff)
downloadexchange-09ec056da4ebbbc3437df05344075a4d109ec204.tar.gz
exchange-09ec056da4ebbbc3437df05344075a4d109ec204.tar.bz2
exchange-09ec056da4ebbbc3437df05344075a4d109ec204.zip
misc bug fixes in batch insert
-rw-r--r--src/benchmark/benchmark-common.conf3
-rw-r--r--src/benchmark/taler-bank-benchmark.c14
-rw-r--r--src/exchangedb/exchange_do_reserves_in_insert.sql79
-rw-r--r--src/exchangedb/pg_reserves_in_insert.c396
4 files changed, 267 insertions, 225 deletions
diff --git a/src/benchmark/benchmark-common.conf b/src/benchmark/benchmark-common.conf
index 11e106d2f..3a11ea317 100644
--- a/src/benchmark/benchmark-common.conf
+++ b/src/benchmark/benchmark-common.conf
@@ -75,6 +75,9 @@ CONFIG="postgres:///talercheck"
[syncdb-postgres]
CONFIG="postgres:///talercheck"
+[exchange]
+WIREWATCH_IDLE_SLEEP_INTERVAL = 5000 ms
+
[bank]
HTTP_PORT=8082
SERVE=http
diff --git a/src/benchmark/taler-bank-benchmark.c b/src/benchmark/taler-bank-benchmark.c
index 224b6d5d1..cde6217e4 100644
--- a/src/benchmark/taler-bank-benchmark.c
+++ b/src/benchmark/taler-bank-benchmark.c
@@ -38,6 +38,7 @@
#include "taler_testing_lib.h"
#include "taler_error_codes.h"
+#define SHARD_SIZE "1024"
/**
* Credentials to use for the benchmark.
@@ -75,11 +76,6 @@ static unsigned int howmany_reserves = 1;
static unsigned int howmany_clients = 1;
/**
- * How many bank worker threads do we want to create.
- */
-static unsigned int howmany_threads;
-
-/**
* How many wirewatch processes do we want to create.
*/
static unsigned int start_wirewatch;
@@ -339,6 +335,7 @@ parallel_benchmark (void)
"taler-exchange-wirewatch",
"-c", cfg_filename,
"-a", exchange_bank_section,
+ "-S", SHARD_SIZE,
(NULL != loglev) ? "-L" : NULL,
loglev,
NULL);
@@ -376,6 +373,7 @@ parallel_benchmark (void)
"taler-exchange-wirewatch",
"-c", cfg_filename,
"-a", exchange_bank_section,
+ "-S", SHARD_SIZE,
"-t",
(NULL != loglev) ? "-L" : NULL,
loglev,
@@ -405,6 +403,7 @@ parallel_benchmark (void)
"taler-exchange-wirewatch",
"-c", cfg_filename,
"-a", exchange_bank_section,
+ "-S", SHARD_SIZE,
"-t",
(NULL != loglev) ? "-L" : NULL,
loglev,
@@ -451,11 +450,6 @@ main (int argc,
"NPROCS",
"How many client processes we should run",
&howmany_clients),
- GNUNET_GETOPT_option_uint ('P',
- "service-parallelism",
- "NTHREADS",
- "How many service threads we should create",
- &howmany_threads),
GNUNET_GETOPT_option_uint ('r',
"reserves",
"NRESERVES",
diff --git a/src/exchangedb/exchange_do_reserves_in_insert.sql b/src/exchangedb/exchange_do_reserves_in_insert.sql
index b88aaa44d..f8c7f1971 100644
--- a/src/exchangedb/exchange_do_reserves_in_insert.sql
+++ b/src/exchangedb/exchange_do_reserves_in_insert.sql
@@ -922,47 +922,47 @@ DECLARE
conflict BOOL;
dup BOOL;
uuid INT8;
- i RECORD;
+ i INT4;
+ ini_reserve_pub BYTEA;
+ ini_wire_ref INT8;
+ ini_credit taler_amount;
+ ini_exchange_account_name TEXT;
+ ini_execution_date INT8;
+ ini_wire_source_h_payto BYTEA;
+ ini_payto_uri TEXT;
+ ini_notify TEXT;
BEGIN
- INSERT INTO wire_targets
- (wire_target_h_payto
- ,payto_uri)
- SELECT
- wire_source_h_payto
- ,payto_uri
- FROM
- UNNEST (ina_wire_source_h_payto) AS wire_source_h_payto
- ,UNNEST (ina_payto_uri) AS payto_uri
- ON CONFLICT DO NOTHING;
-
- FOR i IN
- SELECT
- reserve_pub
- ,wire_ref
- ,credit
- ,exchange_account_name
- ,execution_date
- ,wire_source_h_payto
- ,payto_uri
- ,notify
- FROM
- UNNEST (ina_reserve_pub) AS reserve_pub
- ,UNNEST (ina_wire_ref) AS wire_ref
- ,UNNEST (ina_credit) AS credit
- ,UNNEST (ina_exchange_account_name) AS exchange_account_name
- ,UNNEST (ina_execution_date) AS execution_date
- ,UNNEST (ina_wire_source_h_payto) AS wire_source_h_payto
- ,UNNEST (ina_notify) AS notify
+ FOR i IN 1..array_length(ina_reserve_pub,1)
LOOP
+ ini_reserve_pub = ina_reserve_pub[i];
+ ini_wire_ref = ina_wire_ref[i];
+ ini_credit = ina_credit[i];
+ ini_exchange_account_name = ina_exchange_account_name[i];
+ ini_execution_date = ina_execution_date[i];
+ ini_wire_source_h_payto = ina_wire_source_h_payto[i];
+ ini_payto_uri = ina_payto_uri[i];
+ ini_notify = ina_notify[i];
+
+-- RAISE WARNING 'Starting loop on %', ini_notify;
+
+ INSERT INTO wire_targets
+ (wire_target_h_payto
+ ,payto_uri
+ ) VALUES (
+ ini_wire_source_h_payto
+ ,ini_payto_uri
+ )
+ ON CONFLICT DO NOTHING;
+
INSERT INTO reserves
(reserve_pub
,current_balance
,expiration_date
,gc_date
) VALUES (
- i.reserve_pub
- ,i.credit
+ ini_reserve_pub
+ ,ini_credit
,in_reserve_expiration
,in_gc_date
)
@@ -979,12 +979,12 @@ BEGIN
,wire_source_h_payto
,execution_date
) VALUES (
- i.reserve_pub
- ,i.wire_reference
- ,i.credit
- ,i.exchange_account_section
- ,i.wire_source_h_payto
- ,i.execution_date
+ ini_reserve_pub
+ ,ini_wire_ref
+ ,ini_credit
+ ,ini_exchange_account_name
+ ,ini_wire_source_h_payto
+ ,ini_execution_date
)
ON CONFLICT DO NOTHING;
@@ -1001,12 +1001,11 @@ BEGIN
THEN
EXECUTE FORMAT (
'NOTIFY %s'
- ,i.notify);
+ ,ini_notify);
END IF;
dup = FALSE;
END IF;
RETURN NEXT (dup,uuid);
END LOOP;
-
RETURN;
END $$;
diff --git a/src/exchangedb/pg_reserves_in_insert.c b/src/exchangedb/pg_reserves_in_insert.c
index f4fe655ec..77a68c90d 100644
--- a/src/exchangedb/pg_reserves_in_insert.c
+++ b/src/exchangedb/pg_reserves_in_insert.c
@@ -513,6 +513,8 @@ transact (
if (! need_update)
return reserves_length;
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "updated needed!\n");
if (GNUNET_OK !=
TEH_PG_start (pg,
"reserve-insert-continued"))
@@ -689,7 +691,8 @@ helper_cb (void *cls,
ctx->status = GNUNET_SYSERR;
return;
}
- ctx->needs_update |= ctx->conflicts[i];
+ if (! ctx->transaction_duplicates[i])
+ ctx->needs_update |= ctx->conflicts[i];
}
}
@@ -697,208 +700,251 @@ helper_cb (void *cls,
enum GNUNET_DB_QueryStatus
TEH_PG_reserves_in_insert (
void *cls,
- const struct TALER_EXCHANGEDB_ReserveInInfo *reserves,
- unsigned int reserves_length,
+ const struct TALER_EXCHANGEDB_ReserveInInfo *real_reserves,
+ unsigned int real_reserves_length,
unsigned int batch_size,
enum GNUNET_DB_QueryStatus *results)
{
- (void) batch_size;
struct PostgresClosure *pg = cls;
- struct TALER_PaytoHashP h_paytos[GNUNET_NZL (reserves_length)];
- char *notify_s[GNUNET_NZL (reserves_length)];
- struct TALER_ReservePublicKeyP reserve_pubs[GNUNET_NZL (reserves_length)];
- struct TALER_Amount balances[GNUNET_NZL (reserves_length)];
- struct GNUNET_TIME_Timestamp execution_times[GNUNET_NZL (reserves_length)];
- const char *sender_account_details[GNUNET_NZL (reserves_length)];
- const char *exchange_account_names[GNUNET_NZL (reserves_length)];
- uint64_t wire_references[GNUNET_NZL (reserves_length)];
- uint64_t reserve_uuids[GNUNET_NZL (reserves_length)];
- bool transaction_duplicates[GNUNET_NZL (reserves_length)];
- bool conflicts[GNUNET_NZL (reserves_length)];
- struct GNUNET_TIME_Timestamp reserve_expiration
- = GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time);
- struct GNUNET_TIME_Timestamp gc
- = GNUNET_TIME_relative_to_timestamp (pg->legal_reserve_expiration_time);
- enum GNUNET_DB_QueryStatus qs;
- bool need_update;
-
- for (unsigned int i = 0; i<reserves_length; i++)
- {
- const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i];
-
- TALER_payto_hash (reserve->sender_account_details,
- &h_paytos[i]);
- notify_s[i] = compute_notify_on_reserve (reserve->reserve_pub);
- reserve_pubs[i] = *reserve->reserve_pub;
- balances[i] = *reserve->balance;
- execution_times[i] = reserve->execution_time;
- sender_account_details[i] = reserve->sender_account_details;
- exchange_account_names[i] = reserve->exchange_account_name;
- wire_references[i] = reserve->wire_reference;
- }
+ unsigned int dups = 0;
+ batch_size = real_reserves_length;
+ enum GNUNET_DB_QueryStatus rqs = 0;
- /* NOTE: kind-of pointless to explicitly start a transaction here... */
- if (GNUNET_OK !=
- TEH_PG_preflight (pg))
+ for (unsigned int batch = 0; batch < real_reserves_length;
+ batch += batch_size)
{
- GNUNET_break (0);
- qs = GNUNET_DB_STATUS_HARD_ERROR;
- goto finished;
- }
-
- if (GNUNET_OK !=
- TEH_PG_start_read_committed (pg,
- "READ_COMMITED"))
- {
- GNUNET_break (0);
- qs = GNUNET_DB_STATUS_HARD_ERROR;
- goto finished;
- }
-
- PREPARE (pg,
- "reserves_insert_with_array",
- "SELECT"
- " transaction_duplicate"
- ",ruuid"
- " FROM exchange_do_array_reserves_insert"
- " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10);");
- {
- struct GNUNET_PQ_QueryParam params[] = {
- GNUNET_PQ_query_param_timestamp (&gc),
- GNUNET_PQ_query_param_timestamp (&reserve_expiration),
- GNUNET_PQ_query_param_array_auto_from_type (reserves_length,
- reserve_pubs,
- pg->conn),
- GNUNET_PQ_query_param_array_uint64 (reserves_length,
- wire_references,
- pg->conn),
- TALER_PQ_query_param_array_amount (
- reserves_length,
- balances,
- pg->conn),
- GNUNET_PQ_query_param_array_ptrs_string (
- reserves_length,
- (const char **) exchange_account_names,
- pg->conn),
- GNUNET_PQ_query_param_array_timestamp (
- reserves_length,
- execution_times,
- pg->conn),
- GNUNET_PQ_query_param_array_auto_from_type (
- reserves_length,
- h_paytos,
- pg->conn),
- GNUNET_PQ_query_param_array_ptrs_string (
- reserves_length,
- (const char **) sender_account_details,
- pg->conn),
- GNUNET_PQ_query_param_array_ptrs_string (
- reserves_length,
- (const char **) notify_s,
- pg->conn),
- GNUNET_PQ_query_param_end
- };
- struct Context ctx = {
- .reserve_uuids = reserve_uuids,
- .transaction_duplicates = transaction_duplicates,
- .conflicts = conflicts,
- .needs_update = false,
- .status = GNUNET_OK
- };
+ const struct TALER_EXCHANGEDB_ReserveInInfo *reserves =
+ &real_reserves[batch];
+ unsigned int reserves_length = GNUNET_MIN (batch_size,
+ real_reserves_length - batch);
+ struct TALER_PaytoHashP h_paytos[GNUNET_NZL (reserves_length)];
+ char *notify_s[GNUNET_NZL (reserves_length)];
+ struct TALER_ReservePublicKeyP reserve_pubs[GNUNET_NZL (reserves_length)];
+ struct TALER_Amount balances[GNUNET_NZL (reserves_length)];
+ struct GNUNET_TIME_Timestamp execution_times[GNUNET_NZL (reserves_length)];
+ const char *sender_account_details[GNUNET_NZL (reserves_length)];
+ const char *exchange_account_names[GNUNET_NZL (reserves_length)];
+ uint64_t wire_references[GNUNET_NZL (reserves_length)];
+ uint64_t reserve_uuids[GNUNET_NZL (reserves_length)];
+ bool transaction_duplicates[GNUNET_NZL (reserves_length)];
+ bool conflicts[GNUNET_NZL (reserves_length)];
+ struct GNUNET_TIME_Timestamp reserve_expiration
+ = GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time);
+ struct GNUNET_TIME_Timestamp gc
+ = GNUNET_TIME_relative_to_timestamp (pg->legal_reserve_expiration_time);
+ enum GNUNET_DB_QueryStatus qs;
+ bool need_update;
+
+ for (unsigned int i = 0; i<reserves_length; i++)
+ {
+ const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i];
+
+ TALER_payto_hash (reserve->sender_account_details,
+ &h_paytos[i]);
+ notify_s[i] = compute_notify_on_reserve (reserve->reserve_pub);
+ reserve_pubs[i] = *reserve->reserve_pub;
+ balances[i] = *reserve->balance;
+ execution_times[i] = reserve->execution_time;
+ sender_account_details[i] = reserve->sender_account_details;
+ exchange_account_names[i] = reserve->exchange_account_name;
+ wire_references[i] = reserve->wire_reference;
+ }
- qs = GNUNET_PQ_eval_prepared_multi_select (pg->conn,
- "reserves_insert_with_array",
- params,
- &helper_cb,
- &ctx);
- if ( (qs < 0) ||
- (GNUNET_OK != ctx.status) )
+ /* NOTE: kind-of pointless to explicitly start a transaction here... */
+ if (GNUNET_OK !=
+ TEH_PG_preflight (pg))
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Failed to insert into reserves (%d)\n",
- qs);
+ GNUNET_break (0);
+ qs = GNUNET_DB_STATUS_HARD_ERROR;
goto finished;
}
- need_update = ctx.needs_update;
- }
-
- {
- enum GNUNET_DB_QueryStatus cs;
-
- cs = TEH_PG_commit (pg);
- if (cs < 0)
+ if (GNUNET_OK !=
+ TEH_PG_start_read_committed (pg,
+ "READ_COMMITED"))
{
- GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Failed to commit\n");
- qs = cs;
+ GNUNET_break (0);
+ qs = GNUNET_DB_STATUS_HARD_ERROR;
goto finished;
}
- }
- for (unsigned int i = 0; i<reserves_length; i++)
- {
- results[i] = transaction_duplicates[i]
- ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS
- : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
- }
-
- if (! need_update)
- {
- qs = reserves_length;
- goto finished;
- }
- if (GNUNET_OK !=
- TEH_PG_start (pg,
- "reserve-insert-continued"))
- {
- GNUNET_break (0);
- qs = GNUNET_DB_STATUS_HARD_ERROR;
- goto finished;
- }
-
- for (unsigned int i = 0; i<reserves_length; i++)
- {
- if (! conflicts[i])
- continue;
+ PREPARE (pg,
+ "reserves_insert_with_array",
+ "SELECT"
+ " transaction_duplicate"
+ ",ruuid"
+ " FROM exchange_do_array_reserves_insert"
+ " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10);");
{
- bool duplicate;
struct GNUNET_PQ_QueryParam params[] = {
- GNUNET_PQ_query_param_auto_from_type (&reserve_pubs[i]),
+ GNUNET_PQ_query_param_timestamp (&gc),
GNUNET_PQ_query_param_timestamp (&reserve_expiration),
- GNUNET_PQ_query_param_uint64 (&wire_references[i]),
- TALER_PQ_query_param_amount (pg->conn,
- &balances[i]),
- GNUNET_PQ_query_param_string (exchange_account_names[i]),
- GNUNET_PQ_query_param_auto_from_type (&h_paytos[i]),
- GNUNET_PQ_query_param_string (notify_s[i]),
+ GNUNET_PQ_query_param_array_auto_from_type (reserves_length,
+ reserve_pubs,
+ pg->conn),
+ GNUNET_PQ_query_param_array_uint64 (reserves_length,
+ wire_references,
+ pg->conn),
+ TALER_PQ_query_param_array_amount (
+ reserves_length,
+ balances,
+ pg->conn),
+ GNUNET_PQ_query_param_array_ptrs_string (
+ reserves_length,
+ (const char **) exchange_account_names,
+ pg->conn),
+ GNUNET_PQ_query_param_array_timestamp (
+ reserves_length,
+ execution_times,
+ pg->conn),
+ GNUNET_PQ_query_param_array_auto_from_type (
+ reserves_length,
+ h_paytos,
+ pg->conn),
+ GNUNET_PQ_query_param_array_ptrs_string (
+ reserves_length,
+ (const char **) sender_account_details,
+ pg->conn),
+ GNUNET_PQ_query_param_array_ptrs_string (
+ reserves_length,
+ (const char **) notify_s,
+ pg->conn),
GNUNET_PQ_query_param_end
};
- struct GNUNET_PQ_ResultSpec rs[] = {
- GNUNET_PQ_result_spec_bool ("duplicate",
- &duplicate),
- GNUNET_PQ_result_spec_end
+ struct Context ctx = {
+ .reserve_uuids = reserve_uuids,
+ .transaction_duplicates = transaction_duplicates,
+ .conflicts = conflicts,
+ .needs_update = false,
+ .status = GNUNET_OK
};
- enum GNUNET_DB_QueryStatus qs;
- qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
- "reserves_update",
- params,
- rs);
- if (qs < 0)
+ qs = GNUNET_PQ_eval_prepared_multi_select (pg->conn,
+ "reserves_insert_with_array",
+ params,
+ &helper_cb,
+ &ctx);
+ if ( (qs < 0) ||
+ (GNUNET_OK != ctx.status) )
{
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
- "Failed to update reserves (%d)\n",
+ "Failed to insert into reserves (%d)\n",
qs);
- results[i] = qs;
goto finished;
}
- results[i] = duplicate
+ need_update = ctx.needs_update;
+ }
+
+ {
+ enum GNUNET_DB_QueryStatus cs;
+
+ cs = TEH_PG_commit (pg);
+ if (cs < 0)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to commit\n");
+ qs = cs;
+ goto finished;
+ }
+ }
+
+ for (unsigned int i = 0; i<reserves_length; i++)
+ {
+ if (transaction_duplicates[i])
+ dups++;
+ results[i] = transaction_duplicates[i]
+ ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS
+ : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
+ }
+
+ if (! need_update)
+ {
+ qs = reserves_length;
+ goto finished;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Reserve update needed for some reserves in the batch\n");
+ PREPARE (pg,
+ "reserves_update",
+ "SELECT"
+ " out_duplicate AS duplicate "
+ "FROM exchange_do_batch_reserves_update"
+ " ($1,$2,$3,$4,$5,$6,$7);");
+
+ if (GNUNET_OK !=
+ TEH_PG_start (pg,
+ "reserve-insert-continued"))
+ {
+ GNUNET_break (0);
+ qs = GNUNET_DB_STATUS_HARD_ERROR;
+ goto finished;
+ }
+
+ for (unsigned int i = 0; i<reserves_length; i++)
+ {
+ if (transaction_duplicates[i])
+ continue;
+ if (! conflicts[i])
+ continue;
+ {
+ bool duplicate;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_auto_from_type (&reserve_pubs[i]),
+ GNUNET_PQ_query_param_timestamp (&reserve_expiration),
+ GNUNET_PQ_query_param_uint64 (&wire_references[i]),
+ TALER_PQ_query_param_amount (pg->conn,
+ &balances[i]),
+ GNUNET_PQ_query_param_string (exchange_account_names[i]),
+ GNUNET_PQ_query_param_auto_from_type (&h_paytos[i]),
+ GNUNET_PQ_query_param_string (notify_s[i]),
+ GNUNET_PQ_query_param_end
+ };
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ GNUNET_PQ_result_spec_bool ("duplicate",
+ &duplicate),
+ GNUNET_PQ_result_spec_end
+ };
+ enum GNUNET_DB_QueryStatus qs;
+
+ qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
+ "reserves_update",
+ params,
+ rs);
+ if (qs < 0)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to update reserves (%d)\n",
+ qs);
+ results[i] = qs;
+ goto finished;
+ }
+ results[i] = duplicate
? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS
: GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
+ }
+ }
+ {
+ enum GNUNET_DB_QueryStatus cs;
+
+ cs = TEH_PG_commit (pg);
+ if (cs < 0)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to commit\n");
+ qs = cs;
+ goto finished;
+ }
}
- }
finished:
+ for (unsigned int i = 0; i<reserves_length; i++)
+ GNUNET_free (notify_s[i]);
+ if (qs < 0)
+ return qs;
+ rqs += qs;
+ }
GNUNET_PQ_event_do_poll (pg->conn);
- for (unsigned int i = 0; i<reserves_length; i++)
- GNUNET_free (notify_s[i]);
- return qs;
+ if (0 != dups)
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "%u/%u duplicates among incoming transactions. Try increasing WIREWATCH_IDLE_SLEEP_INTERVAL in the [exchange] configuration section (if this happens a lot).\n",
+ dups,
+ real_reserves_length);
+ return rqs;
}