summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorChristian Grothoff <grothoff@gnunet.org>2023-05-18 14:45:28 +0200
committerChristian Grothoff <grothoff@gnunet.org>2023-05-18 14:45:28 +0200
commitbac71237632d31487c075a29f9e95d95ee7909bd (patch)
tree9b52aab659751e228e6989b48af450b2353e9673 /src
parent9f081d28d73357af41ae469028f0ef7548071a48 (diff)
downloadexchange-bac71237632d31487c075a29f9e95d95ee7909bd.tar.gz
exchange-bac71237632d31487c075a29f9e95d95ee7909bd.tar.bz2
exchange-bac71237632d31487c075a29f9e95d95ee7909bd.zip
array-based do_reserves_in_insert
Diffstat (limited to 'src')
-rw-r--r--src/exchangedb/exchange_do_reserves_in_insert.sql141
-rw-r--r--src/exchangedb/pg_reserves_in_insert.c89
2 files changed, 222 insertions, 8 deletions
diff --git a/src/exchangedb/exchange_do_reserves_in_insert.sql b/src/exchangedb/exchange_do_reserves_in_insert.sql
index dffcd8b55..bc1431ad5 100644
--- a/src/exchangedb/exchange_do_reserves_in_insert.sql
+++ b/src/exchangedb/exchange_do_reserves_in_insert.sql
@@ -963,3 +963,144 @@ BEGIN
CLOSE curs_transaction_exist;
RETURN;
END $$;
+
+
+
+
+
+
+
+
+
+
+
+
+
+CREATE OR REPLACE FUNCTION exchange_do_array_reserves_insert(
+ IN in_gc_date INT8,
+ IN in_reserve_expiration INT8,
+ IN ina_reserve_pub BYTEA[],
+ IN ina_wire_ref INT8[],
+ IN ina_credit_val INT8[],
+ IN ina_credit_frac INT4[],
+ IN ina_exchange_account_name VARCHAR[],
+ IN ina_execution_date INT8[],
+ IN ina_wire_source_h_payto BYTEA[],
+ IN ina_payto_uri VARCHAR[],
+ IN ina_notify TEXT[],
+ OUT transaction_duplicate BOOLEAN,
+ OUT ruuid INT8)
+LANGUAGE plpgsql
+AS $$
+DECLARE
+ curs REFCURSOR;
+DECLARE
+ conflict BOOL;
+DECLARE
+ dup BOOL;
+DECLARE
+ uuid INT8;
+DECLARE
+ i RECORD;
+BEGIN
+
+ INSERT INTO wire_targets
+ (wire_target_h_payto
+ ,payto_uri)
+ SELECT
+ wire_source_h_payto
+ ,payto_uri
+ FROM
+ UNNEST (ina_wire_source_h_payto) AS wire_source_h_payto
+ ,UNNEST (ina_payto_uri) AS payto_uri
+ ON CONFLICT DO NOTHING;
+
+ OPEN curs FOR
+ WITH reserve_changes AS (
+ SELECT
+ reserve_pub
+ ,wire_ref
+ ,credit_val
+ ,credit_frac
+ ,exchange_account_name
+ ,execution_date
+ ,wire_source_h_payto
+ ,payto_uri
+ ,notify
+ FROM
+ UNNEST (ina_reserve_pub) AS reserve_pub
+ ,UNNEST (ina_wire_ref) AS wire_ref
+ ,UNNEST (ina_credit_val) AS credit_val
+ ,UNNEST (ina_credit_frac) AS credit_frac
+ ,UNNEST (ina_exchange_account_name) AS exchange_account_name
+ ,UNNEST (ina_execution_date) AS execution_date
+ ,UNNEST (ina_wire_source_h_payto) AS wire_source_h_payto
+ ,UNNEST (ina_notify) AS notify;
+
+
+ <<loop>> LOOP
+ FETCH FROM curs INTO i;
+ IF NOT FOUND
+ THEN
+ EXIT loop;
+ END IF;
+
+ INSERT INTO reserves
+ (reserve_pub
+ ,current_balance_val
+ ,current_balance_frac
+ ,expiration_date
+ ,gc_date
+ ) VALUES (
+ i.reserve_pub
+ ,i.credit_val
+ ,i.credit_frac
+ ,in_reserve_expiration
+ ,in_gc_date
+ )
+ ON CONFLICT DO NOTHING
+ RETURNING reserve_uuid
+ INTO uuid;
+ conflict = NOT FOUND;
+
+ INSERT INTO reserves_in
+ (reserve_pub
+ ,wire_reference
+ ,credit_val
+ ,credit_frac
+ ,exchange_account_section
+ ,wire_source_h_payto
+ ,execution_date
+ ) VALUES (
+ i.reserve_pub
+ ,i.wire_reference
+ ,i.credit_val
+ ,i.credit_frac
+ ,i.exchange_account_section
+ ,i.wire_source_h_payto
+ ,i.execution_date
+ ON CONFLICT DO NOTHING;
+
+ IF NOT FOUND
+ THEN
+ IF conflict
+ THEN
+ dup = TRUE;
+ else
+ dup = FALSE;
+ END IF;
+ ELSE
+ IF NOT conflict
+ THEN
+ EXECUTE FORMAT (
+ 'NOTIFY %s'
+ ,i.notify);
+ END IF;
+ dup = FALSE;
+ END IF;
+ RETURN (dup,uuid);
+ END LOOP loop_reserve;
+ CLOSE curs;
+
+ RETURN;
+END $$;
diff --git a/src/exchangedb/pg_reserves_in_insert.c b/src/exchangedb/pg_reserves_in_insert.c
index 1b7e62d99..72fde7499 100644
--- a/src/exchangedb/pg_reserves_in_insert.c
+++ b/src/exchangedb/pg_reserves_in_insert.c
@@ -619,12 +619,36 @@ TEH_PG_reserves_in_insert (
#if 0
+/**
+ * Closure for our helper_cb()
+ */
struct Context
{
+ /**
+ * Array of reserve UUIDs to initialize.
+ */
uint64_t *reserve_uuids;
+
+ /**
+ * Array with entries set to 'true' for duplicate transactions.
+ */
bool *transaction_duplicates;
+
+ /**
+ * Array with entries set to 'true' for rows with conflicts.
+ */
bool *conflicts;
+
+ /**
+ * Set to #GNUNET_SYSERR on failures.
+ */
struct GNUNET_GenericReturnValue status;
+
+ /**
+ * Single value (no array) set to true if we need
+ * to follow-up with an update.
+ */
+ bool *needs_update;
};
@@ -665,6 +689,7 @@ helper_cb (void *cls,
ctx->status = GNUNET_SYSERR;
return;
}
+ *ctx->need_update |= ctx->conflicts[i];
}
}
@@ -685,7 +710,6 @@ TEH_PG_reserves_in_insertN (
const char *sender_account_details[GNUNET_NZL (reserves_length)];
const char *exchange_account_names[GNUNET_NZL (reserves_length)];
uint64_t wire_references[GNUNET_NZL (reserves_length)];
-
uint64_t reserve_uuids[GNUNET_NZL (reserves_length)];
bool transaction_duplicates[GNUNET_NZL (reserves_length)];
bool conflicts[GNUNET_NZL (reserves_length)];
@@ -693,6 +717,7 @@ TEH_PG_reserves_in_insertN (
= GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time);
struct GNUNET_TIME_Timestamp gc
= GNUNET_TIME_relative_to_timestamp (pg->legal_reserve_expiration_time);
+ bool needs_update = false;
enum GNUNET_DB_QueryStatus qs;
for (unsigned int i = 0; i<reserves_length; i++)
@@ -709,6 +734,25 @@ TEH_PG_reserves_in_insertN (
exchange_account_names[i] = reserve->exchange_account_name;
wire_references[i] = reserve->wire_reference;
}
+
+ /* NOTE: kind-of pointless to explicitly start a transaction here... */
+ if (GNUNET_OK !=
+ TEH_PG_preflight (pg))
+ {
+ GNUNET_break (0);
+ qs = GNUNET_DB_STATUS_HARD_ERROR;
+ goto finished;
+ }
+
+ if (GNUNET_OK !=
+ TEH_PG_start_read_committed (pg,
+ "READ_COMMITED"))
+ {
+ GNUNET_break (0);
+ qs = GNUNET_DB_STATUS_HARD_ERROR;
+ goto finished;
+ }
+
PREPARE (pg,
"reserves_insert_with_array",
"SELECT"
@@ -752,6 +796,7 @@ TEH_PG_reserves_in_insertN (
.reserve_uuids = reserve_uuids,
.transaction_duplicates = transaction_duplicates,
.conflicts = conflicts,
+ .needs_update = &needs_update,
.status = GNUNET_OK
};
@@ -766,12 +811,42 @@ TEH_PG_reserves_in_insertN (
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Failed to insert into reserves (%d)\n",
qs);
- for (unsigned int i = 0; i<reserves_length; i++)
- GNUNET_free (rrs[i].notify_s);
- return qs;
+ goto finished;
}
}
+ {
+ enum GNUNET_DB_QueryStatus cs;
+
+ cs = TEH_PG_commit (pg);
+ if (cs < 0)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to commit\n");
+ qs = cs;
+ goto finished;
+ }
+ }
+ for (unsigned int i = 0; i<reserves_length; i++)
+ {
+ results[i] = transaction_duplicates[i]
+ ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS
+ : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
+ }
+
+ if (! need_update)
+ {
+ qs = reserves_length;
+ goto finished;
+ }
+ if (GNUNET_OK !=
+ TEH_PG_start (pg,
+ "reserve-insert-continued"))
+ {
+ GNUNET_break (0);
+ qs = GNUNET_DB_STATUS_HARD_ERROR;
+ goto finished;
+ }
for (unsigned int i = 0; i<reserves_length; i++)
{
@@ -806,16 +881,14 @@ TEH_PG_reserves_in_insertN (
"Failed to update reserves (%d)\n",
qs);
results[i] = qs;
- for (unsigned int i = 0; i<reserves_length; i++)
- GNUNET_free (rrs[i].notify_s);
- return qs;
+ goto finished;
}
results[i] = duplicate
? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS
: GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
}
}
- // FIXME: convert results back for caller, too!
+finished:
for (unsigned int i = 0; i<reserves_length; i++)
GNUNET_free (rrs[i].notify_s);
return qs;