summaryrefslogtreecommitdiff
path: root/src/exchangedb/spi
diff options
context:
space:
mode:
Diffstat (limited to 'src/exchangedb/spi')
-rw-r--r--src/exchangedb/spi/Makefile9
-rw-r--r--src/exchangedb/spi/README.md37
-rw-r--r--src/exchangedb/spi/own_test.c873
-rw-r--r--src/exchangedb/spi/own_test.control4
-rw-r--r--src/exchangedb/spi/own_test.sql201
-rw-r--r--src/exchangedb/spi/perf_own_test.c25
-rw-r--r--src/exchangedb/spi/pg_aggregate.c411
7 files changed, 1560 insertions, 0 deletions
diff --git a/src/exchangedb/spi/Makefile b/src/exchangedb/spi/Makefile
new file mode 100644
index 000000000..d654d91e9
--- /dev/null
+++ b/src/exchangedb/spi/Makefile
@@ -0,0 +1,9 @@
+EXTENSION = own_test
+MODULES = own_test
+DATA = own_test.sql
+PG_CPPFLAGS = -I /usr/include/postgresql
+
+# postgresql build stuff
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
diff --git a/src/exchangedb/spi/README.md b/src/exchangedb/spi/README.md
new file mode 100644
index 000000000..47eb37b94
--- /dev/null
+++ b/src/exchangedb/spi/README.md
@@ -0,0 +1,37 @@
+ Server Programming Interface (SPI)
+
+
+Overview
+========
+
+This folder contains results from an experiment by Joseph Xu
+to use the Postgres SPI. They are not currently used at all
+by the GNU Taler exchange.
+
+
+Dependencies
+============
+
+These are the direct dependencies for compiling the code:
+
+# apt-get install libpq-dev postgresql-server-dev-13
+# apt-get install libkrb5-dev
+# apt-get install libssl-dev
+
+
+Compilation
+===========
+
+$ make
+
+Loading functions
+=================
+
+# make install
+$ psql "$DB_NAME" < own_test.sql
+
+
+Calling functions
+==================
+
+$ psql -c "SELECT $FUNCTION_NAME($ARGS);" "$DB_NAME"
diff --git a/src/exchangedb/spi/own_test.c b/src/exchangedb/spi/own_test.c
new file mode 100644
index 000000000..ac72fad7b
--- /dev/null
+++ b/src/exchangedb/spi/own_test.c
@@ -0,0 +1,873 @@
+#include "postgres.h"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <postgresql/libpq-fe.h>
+#include <libpq-int.h>
+#include <catalog/pg_type.h>
+#include <executor/spi.h>
+#include <funcapi.h>
+#include <fmgr.h>
+#include <utils/builtins.h>
+#include <utils/array.h>
+#include <sys/time.h>
+#include <utils/numeric.h>
+#include <utils/timestamp.h>
+#include <utils/bytea.h>
+
+#ifdef PG_MODULE_MAGIC
+PG_MODULE_MAGIC;
+#endif
+
+typedef struct
+{
+ Datum col1;
+ Datum col2;
+} valuest;
+
+void _PG_init (void);
+
+void _PG_fini (void);
+
+void
+_PG_init (void)
+{
+}
+
+
+PG_FUNCTION_INFO_V1 (pg_spi_insert_int);
+PG_FUNCTION_INFO_V1 (pg_spi_select_from_x);
+PG_FUNCTION_INFO_V1 (pg_spi_select_pair_from_y);
+// PG_FUNCTION_INFO_V1(pg_spi_select_with_cond);
+PG_FUNCTION_INFO_V1 (pg_spi_update_y);
+PG_FUNCTION_INFO_V1 (pg_spi_prepare_example);
+PG_FUNCTION_INFO_V1 (pg_spi_prepare_example_without_saveplan);
+PG_FUNCTION_INFO_V1 (pg_spi_prepare_insert);
+PG_FUNCTION_INFO_V1 (pg_spi_prepare_insert_without_saveplan);
+// PG_FUNCTION_INFO_V1(pg_spi_prepare_select_with_cond);
+PG_FUNCTION_INFO_V1 (pg_spi_prepare_select_with_cond_without_saveplan);
+PG_FUNCTION_INFO_V1 (pg_spi_prepare_update);
+PG_FUNCTION_INFO_V1 (pg_spi_get_dep_ref_fees);
+// SIMPLE SELECT
+Datum
+pg_spi_prepare_example (PG_FUNCTION_ARGS)
+{
+ static SPIPlanPtr prepared_plan;
+ int ret;
+ int64 result;
+ char *value;
+ SPIPlanPtr new_plan;
+
+ ret = SPI_connect ();
+ if (ret != SPI_OK_CONNECT)
+ {
+ elog (ERROR, "DB connection failed ! \n");
+ }
+ {
+ if (prepared_plan == NULL)
+ {
+ new_plan = SPI_prepare ("SELECT 1 FROM X", 0, NULL);
+ prepared_plan = SPI_saveplan (new_plan);
+
+ if (prepared_plan == NULL)
+ {
+ elog (ERROR, "FAIL TO SAVE !\n");
+ }
+ }
+
+ ret = SPI_execute_plan (prepared_plan, NULL, 0,false, 0);
+ if (ret != SPI_OK_SELECT)
+ {
+ elog (ERROR, "SELECT FAILED %d !\n", ret);
+ }
+
+ if (SPI_tuptable != NULL && SPI_tuptable->vals != NULL &&
+ SPI_tuptable->tupdesc != NULL)
+ {
+ value = SPI_getvalue (SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1);
+ result = atoi (value);
+ }
+ else
+ {
+ elog (ERROR, "EMPTY TABLE !\n");
+ }
+ }
+ SPI_finish ();
+ PG_RETURN_INT64 (result);
+}
+
+
+Datum
+pg_spi_prepare_example_without_saveplan (PG_FUNCTION_ARGS)
+{
+ int ret;
+ int64 result;
+ char *value;
+ SPIPlanPtr new_plan;
+
+ ret = SPI_connect ();
+ if (ret != SPI_OK_CONNECT)
+ {
+ elog (ERROR, "DB connection failed ! \n");
+ }
+
+ {
+ new_plan = SPI_prepare ("SELECT 1 FROM X", 0, NULL);
+ ret = SPI_execute_plan (new_plan, NULL, 0,false, 0);
+ if (ret != SPI_OK_SELECT)
+ {
+ elog (ERROR, "SELECT FAILED %d !\n", ret);
+ }
+
+ if (SPI_tuptable != NULL
+ && SPI_tuptable->vals != NULL
+ && SPI_tuptable->tupdesc != NULL)
+ {
+ value = SPI_getvalue (SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1);
+ result = atoi (value);
+ }
+ else
+ {
+ elog (ERROR, "EMPTY TABLE !\n");
+ }
+ }
+ SPI_finish ();
+ PG_RETURN_INT64 (result);// PG_RETURN_INT64(result);
+}
+
+
+// SELECT 1 FROM X
+// V1
+Datum
+pg_spi_select_from_x (PG_FUNCTION_ARGS)
+{
+ int ret;
+ char *query = "SELECT 1 FROM X";
+ uint64 proc;
+ ret = SPI_connect ();
+
+ if (ret != SPI_OK_CONNECT)
+ {
+ elog (ERROR, "SPI_connect failed");
+ }
+
+ ret = SPI_exec (query, 10);
+ proc = SPI_processed;
+ if (ret != SPI_OK_SELECT)
+ {
+ elog (ERROR, "SPI_exec failed");
+ }
+
+ SPI_finish ();
+
+ PG_RETURN_INT64 (proc);
+}
+
+
+// INSERT INTO X VALUES (1)
+Datum
+pg_spi_insert_int (PG_FUNCTION_ARGS)
+{
+ int ret;
+ int nargs;
+ Oid argtypes[1];
+ Datum values[1];
+ char *query = "INSERT INTO X (a) VALUES ($1)";
+
+ ret = SPI_connect ();
+ if (ret != SPI_OK_CONNECT)
+ {
+ elog (ERROR, "SPI_connect failed");
+ }
+
+ nargs = 1;
+ argtypes[0] = INT4OID;
+ values[0] = Int32GetDatum (3);
+
+ ret = SPI_execute_with_args (query, nargs, argtypes, values, NULL, false, 0);
+ if (ret != SPI_OK_INSERT)
+ {
+ elog (ERROR, "SPI_execute_with_args failed");
+ }
+
+ SPI_finish ();
+
+ PG_RETURN_VOID ();
+}
+
+
+Datum
+pg_spi_prepare_insert (PG_FUNCTION_ARGS)
+{
+ static SPIPlanPtr prepared_plan = NULL;
+ int ret;
+ int nargs;
+ Oid argtypes[1];
+ Datum values[1];
+ const char *query = "INSERT INTO X (a) VALUES ($1)";
+ SPIPlanPtr new_plan;
+ ret = SPI_connect ();
+ if (ret != SPI_OK_CONNECT)
+ {
+ elog (ERROR, "SPI_connect failed ! \n");
+ }
+ if (prepared_plan == NULL)
+ {
+
+ argtypes[0] = INT4OID;
+ nargs = 1;
+ values[0] = Int32GetDatum (3);
+ new_plan = SPI_prepare (query, nargs, argtypes);
+ if (new_plan== NULL)
+ {
+ elog (ERROR, "SPI_prepare failed ! \n");
+ }
+ prepared_plan = SPI_saveplan (new_plan);
+ if (prepared_plan == NULL)
+ {
+ elog (ERROR, "SPI_saveplan failed ! \n");
+ }
+ }
+
+ ret = SPI_execute_plan (prepared_plan, values, NULL, false, 0);
+ if (ret != SPI_OK_INSERT)
+ {
+ elog (ERROR, "SPI_execute_plan failed ! \n");
+ }
+
+ SPI_finish ();
+
+ PG_RETURN_VOID ();
+}
+
+
+/*
+Datum
+pg_spi_prepare_insert_bytea(PG_FUNCTION_ARGS)
+{
+ static SPIPlanPtr prepared_plan = NULL;
+ int ret;
+ int nargs;
+ Oid argtypes[1];
+ Datum values[1];
+ Oid argtypes2[1];
+ Datum val[1];
+ char *query = "INSERT INTO X (a) VALUES ($1)";
+ SPIPlanPtr new_plan;
+ ret = SPI_connect();
+ if (ret != SPI_OK_CONNECT)
+ {
+ elog(ERROR, "SPI_connect failed ! \n");
+ }
+ if (prepared_plan == NULL) {
+ argtypes2[0] = BOOLOID;
+ val[0] = BoolGetDatum();
+ argtypes[0] = BYTEAOID;
+ nargs = 1;
+ values[0] = Int32GetDatum(3);
+ new_plan = SPI_prepare(query, nargs, argtypes);
+ if (new_plan== NULL)
+ {
+ elog(ERROR, "SPI_prepare failed ! \n");
+ }
+ prepared_plan = SPI_saveplan(new_plan);
+ if (prepared_plan == NULL)
+ {
+ elog(ERROR, "SPI_saveplan failed ! \n");
+ }
+ }
+
+ ret = SPI_execute_plan(prepared_plan, values, NULL, false, 0);
+ if (ret != SPI_OK_INSERT)
+ {
+ elog(ERROR, "SPI_execute_plan failed ! \n");
+ }
+
+ SPI_finish();
+
+ PG_RETURN_VOID();
+}
+*/
+
+Datum
+pg_spi_prepare_insert_without_saveplan (PG_FUNCTION_ARGS)
+{
+ int ret;
+ int nargs;
+ Oid argtypes[1];
+ Datum values[1];
+ const char *query = "INSERT INTO X (a) VALUES ($1)";
+ SPIPlanPtr new_plan;
+ ret = SPI_connect ();
+ if (ret != SPI_OK_CONNECT)
+ {
+ elog (ERROR, "SPI_connect failed");
+ }
+ {
+ argtypes[0] = INT4OID;
+ nargs = 1;
+ values[0] = Int32GetDatum (3);
+ new_plan = SPI_prepare (query, nargs, argtypes);
+ if (new_plan== NULL)
+ {
+ elog (ERROR, "SPI_prepare failed");
+ }
+ }
+
+ ret = SPI_execute_plan (new_plan, values, NULL, false, 0);
+ if (ret != SPI_OK_INSERT)
+ {
+ elog (ERROR, "SPI_execute_plan failed");
+ }
+
+ SPI_finish ();
+
+ PG_RETURN_VOID ();
+}
+
+
+/*
+Datum
+pg_spi_select_pair_from_y(PG_FUNCTION_ARGS)
+{
+ int ret;
+ valuest result;
+ bool isnull;
+ char *query = "SELECT 1,1 FROM Y";
+ result.col1 = 0;
+ result.col2 = 0;
+
+ if ((ret = SPI_connect()) < 0) {
+ fprintf(stderr, "SPI_connect returned %d\n", ret);
+ exit(1);
+ }
+ ret = SPI_exec(query, 0);
+ if (ret == SPI_OK_SELECT && SPI_processed > 0) {
+ int i;
+ SPITupleTable *tuptable = SPI_tuptable;
+ TupleDesc tupdesc = tuptable->tupdesc;
+ for (i = 0; i < SPI_processed; i++) {
+ HeapTuple tuple = tuptable->vals[i];
+ result.col1 = SPI_getbinval(tuple, tupdesc, 1, &isnull);
+ result.col2 = SPI_getbinval(tuple, tupdesc, 2, &isnull);
+ }
+ }
+ SPI_finish();
+ PG_RETURN_TEXT_P(result);
+}
+*/
+
+// SELECT X FROM Y WHERE Z=$1
+/*
+Datum
+pg_spi_select_with_cond(PG_FUNCTION_ARGS)
+{
+ int ret;
+ char *query;
+ int nargs;
+ Oid argtypes[1];
+ Datum values[1];
+ uint64 proc;
+ query = "SELECT col1 FROM Y WHERE col2 = $1";
+
+ ret = SPI_connect();
+ if (ret != SPI_OK_CONNECT) {
+ elog(ERROR, "SPI_connect failed: %d", ret);
+ }
+ nargs = 1;
+ argtypes[0] = INT4OID;
+ values[0] = Int32GetDatum(2);
+
+ ret = SPI_execute_with_args(query, nargs, argtypes, values, NULL, false, 0);
+ proc = SPI_processed;
+ if (ret != SPI_OK_SELECT)
+ {
+ elog(ERROR, "SPI_execute_with_args failed");
+ }
+
+ SPI_finish();
+
+
+ PG_RETURN_INT64(proc);
+ }*/
+
+////////SELECT WITH COND
+/*
+Datum pg_spi_prepare_select_with_cond(PG_FUNCTION_ARGS) {
+ static SPIPlanPtr prepared_plan = NULL;
+ SPIPlanPtr new_plan;
+ int ret;
+ Datum values[1];
+ uint64 proc;
+ int nargs;
+ Oid argtypes[1];
+ char *query = "SELECT col1 FROM Y WHERE col1 = $1";
+ int result = 0;
+
+ ret = SPI_connect();
+ if (ret != SPI_OK_CONNECT)
+ elog(ERROR, "SPI_connect failed ! \n");
+
+ if (prepared_plan == NULL) {
+
+ argtypes[0] = INT4OID;
+ nargs = 1;
+ values[0] = DatumGetByteaP(SPI_getbinval(tuptable->vals[0], tupdesc, 1, &isnull)); //Value col2
+
+ new_plan = SPI_prepare(query, nargs, argtypes);
+ if (new_plan == NULL)
+ elog(ERROR, "SPI_prepare failed ! \n");
+
+ prepared_plan = SPI_saveplan(new_plan);
+ if (prepared_plan == NULL)
+ elog(ERROR, "SPI_saveplan failed ! \n");
+ }
+
+
+ ret = SPI_execute_plan(prepared_plan, values, NULL, false, 0);
+
+ if (ret != SPI_OK_SELECT) {
+ elog(ERROR, "SPI_execute_plan failed: %d \n", ret);
+ }
+
+ proc = SPI_processed;
+
+ if (proc > 0) {
+ SPITupleTable *tuptable = SPI_tuptable;
+ TupleDesc tupdesc = tuptable->tupdesc;
+ HeapTuple tuple;
+ int i;
+
+ for (i = 0; i < proc; i++) {
+ tuple = tuptable->vals[i];
+ for (int j = 1; j <= tupdesc->natts; j++) {
+ char * value = SPI_getvalue(tuple, tupdesc, j);
+ result += atoi(value);
+ }
+ }
+ }
+ SPI_finish();
+ PG_RETURN_INT64(result);
+}
+*/
+
+Datum
+pg_spi_prepare_select_with_cond_without_saveplan (PG_FUNCTION_ARGS)
+{
+
+ SPIPlanPtr new_plan;
+ int ret;
+ Datum values[1];
+ uint64 proc;
+ int nargs;
+ Oid argtypes[1];
+ char *query = "SELECT col1 FROM Y WHERE col2 = $1";
+ int result = 0;
+
+ ret = SPI_connect ();
+ if (ret != SPI_OK_CONNECT)
+ elog (ERROR, "SPI_connect failed ! \n");
+
+ {
+
+ argtypes[0] = INT4OID;
+ nargs = 1;
+ values[0] = Int32GetDatum (2); // Value col2
+
+ new_plan = SPI_prepare (query, nargs, argtypes);
+ if (new_plan == NULL)
+ elog (ERROR, "SPI_prepare failed ! \n");
+
+ }
+
+
+ ret = SPI_execute_plan (new_plan, values, NULL, false, 0);
+
+ if (ret != SPI_OK_SELECT)
+ {
+ elog (ERROR, "SPI_execute_plan failed: %d \n", ret);
+ }
+
+ proc = SPI_processed;
+
+ if (proc > 0)
+ {
+ SPITupleTable *tuptable = SPI_tuptable;
+ TupleDesc tupdesc = tuptable->tupdesc;
+ HeapTuple tuple;
+ int i;
+
+ for (i = 0; i < proc; i++)
+ {
+ tuple = tuptable->vals[i];
+ for (int j = 1; j <= tupdesc->natts; j++)
+ {
+ char *value = SPI_getvalue (tuple, tupdesc, j);
+ result += atoi (value);
+ }
+ }
+ }
+ SPI_finish ();
+ PG_RETURN_INT64 (result);
+}
+
+
+Datum
+pg_spi_update_y (PG_FUNCTION_ARGS)
+{
+ int ret;
+ int nargs;
+ Oid argtypes[1];
+ Datum values[1];
+ const char *query = "UPDATE Y SET col1 = 4 WHERE (col2 = $1)";
+
+ ret = SPI_connect ();
+ if (ret != SPI_OK_CONNECT)
+ {
+ elog (ERROR, "SPI_connect failed ! \n");
+ }
+
+ nargs = 1;
+ argtypes[0] = INT4OID;
+ values[0] = Int32GetDatum (0);
+
+ ret = SPI_execute_with_args (query, nargs, argtypes, values, NULL, false, 0);
+ if (ret != SPI_OK_UPDATE)
+ {
+ elog (ERROR, "SPI_execute_with_args failed ! \n");
+ }
+
+ SPI_finish ();
+
+ PG_RETURN_VOID ();
+}
+
+
+Datum
+pg_spi_prepare_update (PG_FUNCTION_ARGS)
+{
+ static SPIPlanPtr prepared_plan = NULL;
+ SPIPlanPtr new_plan;
+ int ret;
+ int nargs;
+ Oid argtypes[1];
+ Datum values[1];
+ const char *query = "UPDATE Y SET col1 = 4 WHERE (col2 = $1)";
+
+ ret = SPI_connect ();
+ if (ret != SPI_OK_CONNECT)
+ {
+ elog (ERROR, "SPI_connect failed ! \n");
+ }
+
+ if (prepared_plan == NULL)
+ {
+ argtypes[0] = INT4OID;
+ nargs = 1;
+ values[0] = Int32GetDatum (3);
+ // PREPARE
+ new_plan = SPI_prepare (query, nargs, argtypes);
+ if (new_plan == NULL)
+ elog (ERROR, "SPI_prepare failed ! \n");
+ // SAVEPLAN
+ prepared_plan = SPI_saveplan (new_plan);
+ if (prepared_plan == NULL)
+ elog (ERROR, "SPI_saveplan failed ! \n");
+ }
+ ret = SPI_execute_plan (prepared_plan, values, NULL, false, 0);
+ if (ret != SPI_OK_UPDATE)
+ elog (ERROR, "SPI_execute_plan failed ! \n");
+
+ SPI_finish ();
+ PG_RETURN_VOID ();
+}
+
+
+/*
+Datum
+pg_spi_prepare_update_without_saveplan(PG_FUNCTION_ARGS)
+{}*/
+void
+_PG_fini (void)
+{
+}
+
+
+/*
+
+*/
+
+
+Datum
+pg_spi_get_dep_ref_fees (PG_FUNCTION_ARGS)
+{
+ /* Define plan to save */
+ static SPIPlanPtr deposit_plan;
+ static SPIPlanPtr ref_plan;
+ static SPIPlanPtr fees_plan;
+ static SPIPlanPtr dummy_plan;
+ /* Define variables to update */
+ Timestamp refund_deadline = PG_GETARG_TIMESTAMP (0);
+ bytea *merchant_pub = PG_GETARG_BYTEA_P (1);
+ bytea *wire_target_h_payto = PG_GETARG_BYTEA_P (2);
+ bytea *wtid_raw = PG_GETARG_BYTEA_P (3);
+ bool is_null;
+ /* Define variables to store the results of each SPI query */
+ uint64_t sum_deposit_val = 0;
+ uint32_t sum_deposit_frac = 0;
+ uint64_t s_refund_val = 0;
+ uint32_t s_refund_frac = 0;
+ uint64_t sum_dep_fee_val = 0;
+ uint32_t sum_dep_fee_frac = 0;
+ uint64_t norm_refund_val = 0;
+ uint32_t norm_refund_frac = 0;
+ uint64_t sum_refund_val = 0;
+ uint32_t sum_refund_frac = 0;
+ /* Define variables to store the Tuptable */
+ SPITupleTable *dep_res;
+ SPITupleTable *ref_res;
+ SPITupleTable *ref_by_coin_res;
+ SPITupleTable *norm_ref_by_coin_res;
+ SPITupleTable *fully_refunded_coins_res;
+ SPITupleTable *fees_res;
+ SPITupleTable *dummys_res;
+ /* Define variable to update */
+ Datum values_refund[2];
+ Datum values_deposit[3];
+ Datum values_fees[2];
+ Datum values_dummys[2];
+ TupleDesc tupdesc;
+ /* Define variables to replace some tables */
+ bytea *ref_by_coin_coin_pub;
+ int64 ref_by_coin_deposit_serial_id = 0;
+ bytea *norm_ref_by_coin_coin_pub;
+ int64_t norm_ref_by_coin_deposit_serial_id = 0;
+ bytea *new_dep_coin_pub = NULL;
+ int res = SPI_connect ();
+
+ /* Connect to SPI */
+ if (res < 0)
+ {
+ elog (ERROR, "Could not connect to SPI manager");
+ }
+ if (deposit_plan == NULL)
+ {
+ const char *dep_sql;
+ SPIPlanPtr new_plan;
+
+ // Execute first query and store results in variables
+ dep_sql =
+ "UPDATE deposits SET done=TRUE "
+ "WHERE NOT (done OR policy_blocked) "
+ "AND refund_deadline=$1 "
+ "AND merchant_pub=$2 "
+ "AND wire_target_h_payto=$3 "
+ "RETURNING "
+ "deposit_serial_id,"
+ "coin_pub,"
+ "amount_with_fee_val,"
+ "amount_with_fee_frac;";
+ fprintf (stderr, "dep sql %d\n", 1);
+ new_plan =
+ SPI_prepare (dep_sql, 4,(Oid[]){INT8OID, BYTEAOID, BYTEAOID});
+ fprintf (stderr, "dep sql %d\n", 2);
+ if (new_plan == NULL)
+ elog (ERROR, "SPI_prepare failed for dep \n");
+ deposit_plan = SPI_saveplan (new_plan);
+ if (deposit_plan == NULL)
+ elog (ERROR, "SPI_saveplan failed for dep \n");
+ }
+ fprintf (stdout, "dep sql %d\n", 3);
+
+ values_deposit[0] = Int64GetDatum (refund_deadline);
+ values_deposit[1] = PointerGetDatum (merchant_pub);
+ values_deposit[2] = PointerGetDatum (wire_target_h_payto);
+
+ res = SPI_execute_plan (deposit_plan,
+ values_deposit,
+ NULL,
+ true,
+ 0);
+ fprintf (stdout, "dep sql %d\n", 4);
+ if (res != SPI_OK_UPDATE)
+ {
+ elog (ERROR, "Failed to execute subquery 1 \n");
+ }
+ // STORE TUPTABLE deposit
+ dep_res = SPI_tuptable;
+
+ for (unsigned int i = 0; i < SPI_processed; i++)
+ {
+ int64 dep_deposit_serial_ids = DatumGetInt64 (SPI_getbinval (
+ SPI_tuptable->vals[i],
+ SPI_tuptable->tupdesc, 1,
+ &is_null));
+ bytea *dep_coin_pub = DatumGetByteaP (SPI_getbinval (SPI_tuptable->vals[i],
+ SPI_tuptable->tupdesc,
+ 2, &is_null));
+ int64 dep_amount_val = DatumGetInt64 (SPI_getbinval (SPI_tuptable->vals[i],
+ SPI_tuptable->tupdesc,
+ 3, &is_null));
+ int32 dep_amount_frac = DatumGetInt32 (SPI_getbinval (SPI_tuptable->vals[i],
+ SPI_tuptable->tupdesc,
+ 4, &is_null));
+
+ if (is_null)
+ elog (ERROR, "Failed to retrieve data from deposit \n");
+ if (ref_plan == NULL)
+ {
+ // Execute second query with parameters from first query and store results in variables
+ const char *ref_sql =
+ "SELECT amount_with_fee_val, amount_with_fee_frac, coin_pub, deposit_serial_id "
+ "FROM refunds "
+ "WHERE coin_pub=$1 "
+ "AND deposit_serial_id=$2;";
+ SPIPlanPtr new_plan = SPI_prepare (ref_sql, 3, (Oid[]){BYTEAOID,
+ INT8OID});
+ if (new_plan == NULL)
+ elog (ERROR, "SPI_prepare failed for refund\n");
+ ref_plan = SPI_saveplan (new_plan);
+ if (ref_plan == NULL)
+ elog (ERROR, "SPI_saveplan failed for refund\n");
+ }
+ values_refund[0] = PointerGetDatum (dep_coin_pub);
+ values_refund[1] = Int64GetDatum (dep_deposit_serial_ids);
+ res = SPI_execute_plan (ref_plan,
+ values_refund,
+ NULL,
+ false,
+ 0);
+ if (res != SPI_OK_SELECT)
+ elog (ERROR, "Failed to execute subquery 2\n");
+ // STORE TUPTABLE refund
+ ref_res = SPI_tuptable;
+ for (unsigned int j = 0; j < SPI_processed; j++)
+ {
+ int64 ref_refund_val = DatumGetInt64 (SPI_getbinval (
+ SPI_tuptable->vals[j],
+ SPI_tuptable->tupdesc, 1,
+ &is_null));
+ int32 ref_refund_frac = DatumGetInt32 (SPI_getbinval (
+ SPI_tuptable->vals[j],
+ SPI_tuptable->tupdesc, 2,
+ &is_null));
+ bytea *ref_coin_pub = DatumGetByteaP (SPI_getbinval (
+ SPI_tuptable->vals[j],
+ SPI_tuptable->tupdesc, 3,
+ &is_null));
+ int64 ref_deposit_serial_id = DatumGetInt64 (SPI_getbinval (
+ SPI_tuptable->vals[j],
+ SPI_tuptable->tupdesc, 4,
+ &is_null));
+ // Execute third query with parameters from second query and store results in variables
+ ref_by_coin_coin_pub = ref_coin_pub;
+ ref_by_coin_deposit_serial_id = ref_deposit_serial_id;
+ // LOOP TO GET THE SUM FROM REFUND BY COIN
+ for (unsigned int i = 0; i<SPI_processed; i++)
+ {
+ if ((ref_by_coin_coin_pub ==
+ DatumGetByteaP (SPI_getbinval (SPI_tuptable->vals[i],
+ SPI_tuptable->tupdesc, 1,
+ &is_null)))
+ &&
+ (ref_by_coin_deposit_serial_id ==
+ DatumGetUInt64 (SPI_getbinval (SPI_tuptable->vals[i],
+ SPI_tuptable->tupdesc, 2,
+ &is_null)))
+ )
+ {
+ sum_refund_val += ref_refund_val;
+ sum_refund_frac += ref_refund_frac;
+ norm_ref_by_coin_coin_pub = ref_by_coin_coin_pub;
+ norm_ref_by_coin_deposit_serial_id = ref_by_coin_deposit_serial_id;
+ }
+ }// END SUM CALCULATION
+ // NORMALIZE REFUND VAL FRAC
+ norm_refund_val =
+ (sum_refund_val + sum_refund_frac) / 100000000;
+ norm_refund_frac =
+ sum_refund_frac % 100000000;
+ // Get refund values
+ s_refund_val += sum_refund_val;
+ s_refund_frac = sum_refund_frac;
+ }// END REFUND
+ if (norm_ref_by_coin_coin_pub == dep_coin_pub
+ && ref_by_coin_deposit_serial_id == dep_deposit_serial_ids
+ && norm_refund_val == dep_amount_val
+ && norm_refund_frac == dep_amount_frac)
+ {
+ new_dep_coin_pub = dep_coin_pub;
+ }
+ // Ensure we get the fee for each coin and not only once per denomination
+ if (fees_plan == NULL)
+ {
+ const char *fees_sql =
+ "SELECT "
+ " denom.fee_deposit_val AS fee_val, "
+ " denom.fee_deposit_frac AS fee_frac, "
+ "FROM known_coins kc"
+ "JOIN denominations denom USING (denominations_serial) "
+ "WHERE kc.coin_pub = $1 AND kc.coin_pub != $2;";
+ SPIPlanPtr new_plan = SPI_prepare (fees_sql, 3, (Oid[]){BYTEAOID,
+ BYTEAOID});
+ if (new_plan == NULL)
+ {
+ elog (ERROR, "SPI_prepare for fees failed ! \n");
+ }
+ fees_plan = SPI_saveplan (new_plan);
+ if (fees_plan == NULL)
+ {
+ elog (ERROR, "SPI_saveplan for fees failed ! \n");
+ }
+ }
+ values_fees[0] = PointerGetDatum (dep_coin_pub);
+ values_fees[1] = PointerGetDatum (new_dep_coin_pub);
+ res = SPI_execute_plan (fees_plan, values_fees, NULL, false, 0);
+ if (res != SPI_OK_SELECT)
+ elog (ERROR, "SPI_execute_plan failed for fees \n");
+ fees_res = SPI_tuptable;
+ tupdesc = fees_res->tupdesc;
+ for (unsigned int i = 0; i<SPI_processed; i++)
+ {
+ HeapTuple tuple = fees_res->vals[i];
+ bool is_null;
+ uint64_t fee_val = DatumGetUInt64 (SPI_getbinval (tuple, tupdesc, 1,
+ &is_null));
+ uint32_t fee_frac = DatumGetUInt32 (SPI_getbinval (tuple, tupdesc, 2,
+ &is_null));
+ uint64_t fees_deposit_serial_id = DatumGetUInt64 (SPI_getbinval (tuple,
+ tupdesc,
+ 3,
+ &is_null));
+ if (dummy_plan == NULL)
+ {
+ const char *insert_dummy_sql =
+ "INSERT INTO "
+ "aggregation_tracking(deposit_serial_id, wtid_raw)"
+ " VALUES ($1, $2)";
+
+ SPIPlanPtr new_plan = SPI_prepare (insert_dummy_sql, 2, (Oid[]){INT8OID,
+ BYTEAOID});
+ if (new_plan == NULL)
+ elog (ERROR, "FAILED to prepare aggregation tracking \n");
+ dummy_plan = SPI_saveplan (new_plan);
+ if (dummy_plan == NULL)
+ elog (ERROR, "FAILED to saveplan aggregation tracking\n");
+ }
+ values_dummys[0] = Int64GetDatum (dep_deposit_serial_ids);
+ values_dummys[1] = PointerGetDatum (wtid_raw);
+ res = SPI_execute_plan (dummy_plan, values_dummys, NULL, false, 0);
+ if (res != SPI_OK_INSERT)
+ elog (ERROR, "Failed to insert dummy\n");
+ dummys_res = SPI_tuptable;
+ // Calculation of deposit fees for not fully refunded deposits
+ sum_dep_fee_val += fee_val;
+ sum_dep_fee_frac += fee_frac;
+ }
+ // Get deposit values
+ sum_deposit_val += dep_amount_val;
+ sum_deposit_frac += dep_amount_frac;
+ }// END DEPOSIT
+ SPI_finish ();
+ PG_RETURN_VOID ();
+}
diff --git a/src/exchangedb/spi/own_test.control b/src/exchangedb/spi/own_test.control
new file mode 100644
index 000000000..4e73e207f
--- /dev/null
+++ b/src/exchangedb/spi/own_test.control
@@ -0,0 +1,4 @@
+comment = 'Example extension for testing purposes'
+default_version = '1.0'
+module_pathname = '$libdir/own_test'
+relocatable = true
diff --git a/src/exchangedb/spi/own_test.sql b/src/exchangedb/spi/own_test.sql
new file mode 100644
index 000000000..12729d068
--- /dev/null
+++ b/src/exchangedb/spi/own_test.sql
@@ -0,0 +1,201 @@
+DROP TABLE IF EXISTS X;
+CREATE TABLE X (
+ a integer
+);
+
+INSERT INTO X (a)
+ VALUES (1), (2), (3), (4), (5), (6), (7);
+
+DROP TABLE IF EXISTS Y;
+CREATE TABLE Y (col1 INT, col2 INT);
+INSERT INTO Y (col1,col2)
+ VALUES (1,2), (2,0), (0,4), (4,0), (0,6), (6,7), (7,8);
+
+DROP TABLE IF EXISTS Z;
+CREATE TABLE Z (col1 BYTEA);
+
+DROP TABLE IF EXISTS deposits;
+CREATE TABLE deposits(
+ deposit_serial_id BIGINT GENERATED BY DEFAULT AS IDENTITY
+ ,shard INT8 NOT NULL
+ ,coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)
+ ,known_coin_id INT8 NOT NULL
+ ,amount_with_fee_val INT8 NOT NULL
+ ,amount_with_fee_frac INT4 NOT NULL
+ ,wallet_timestamp INT8 NOT NULL
+ ,exchange_timestamp INT8 NOT NULL
+ ,refund_deadline INT8 NOT NULL
+ ,wire_deadline INT8 NOT NULL
+ ,merchant_pub BYTEA NOT NULL CHECK (LENGTH(merchant_pub)=32)
+ ,h_contract_terms BYTEA NOT NULL CHECK (LENGTH(h_contract_terms)=64)
+ ,coin_sig BYTEA NOT NULL CHECK (LENGTH(coin_sig)=64)
+ ,wire_salt BYTEA NOT NULL CHECK (LENGTH(wire_salt)=16)
+ ,wire_target_h_payto BYTEA CHECK (LENGTH(wire_target_h_payto)=32)
+ ,done BOOLEAN NOT NULL DEFAULT FALSE
+ ,policy_blocked BOOLEAN NOT NULL DEFAULT FALSE
+ ,policy_details_serial_id INT8);
+
+
+DROP FUNCTION IF EXISTS pg_spi_insert_int;
+CREATE FUNCTION pg_spi_insert_int()
+ RETURNS VOID
+ LANGUAGE c VOLATILE COST 100
+AS '$libdir/own_test', 'pg_spi_insert_int';
+
+DROP FUNCTION IF EXISTS pg_spi_select_from_x;
+CREATE FUNCTION pg_spi_select_from_x()
+ RETURNS INT8
+ LANGUAGE c COST 100
+AS '$libdir/own_test', 'pg_spi_select_from_x';
+
+/*
+CREATE FUNCTION pg_spi_select_pair_from_y()
+ RETURNS valuest
+ LANGUAGE c COST 100
+AS '$libdir/own_test', 'pg_spi_select_pair_from_y';
+*/
+/*CREATE FUNCTION pg_spi_select_with_cond()
+ RETURNS INT8
+ LANGUAGE c COST 100
+AS '$libdir/own_test', 'pg_spi_select_with_cond';
+*/
+
+DROP FUNCTION IF EXISTS pg_spi_update_y;
+CREATE FUNCTION pg_spi_update_y()
+ RETURNS VOID
+ LANGUAGE c VOLATILE COST 100
+AS '$libdir/own_test', 'pg_spi_update_y';
+
+DROP FUNCTION IF EXISTS pg_spi_prepare_example;
+CREATE FUNCTION pg_spi_prepare_example()
+ RETURNS INT8
+ LANGUAGE c COST 100
+AS '$libdir/own_test', 'pg_spi_prepare_example';
+
+DROP FUNCTION IF EXISTS pg_spi_prepare_example_without_saveplan;
+CREATE FUNCTION pg_spi_prepare_example_without_saveplan()
+ RETURNS INT8
+ LANGUAGE c COST 100
+AS '$libdir/own_test', 'pg_spi_prepare_example_without_saveplan';
+
+DROP FUNCTION IF EXISTS pg_spi_prepare_insert;
+CREATE FUNCTION pg_spi_prepare_insert()
+ RETURNS VOID
+ LANGUAGE c VOLATILE COST 100
+AS '$libdir/own_test', 'pg_spi_prepare_insert';
+
+DROP FUNCTION IF EXISTS pg_spi_prepare_insert_without_saveplan;
+CREATE FUNCTION pg_spi_prepare_insert_without_saveplan()
+ RETURNS VOID
+ LANGUAGE c VOLATILE COST 100
+AS '$libdir/own_test', 'pg_spi_prepare_insert_without_saveplan';
+
+/*
+CREATE FUNCTION pg_spi_prepare_select_with_cond()
+ RETURNS INT8
+ LANGUAGE c COST 100
+AS '$libdir/own_test', 'pg_spi_prepare_select_with_cond';
+*/
+
+DROP FUNCTION IF EXISTS pg_spi_prepare_select_with_cond_without_saveplan;
+CREATE FUNCTION pg_spi_prepare_select_with_cond_without_saveplan()
+ RETURNS INT8
+ LANGUAGE c COST 100
+AS '$libdir/own_test', 'pg_spi_prepare_select_with_cond_without_saveplan';
+
+DROP FUNCTION IF EXISTS pg_spi_prepare_update;
+CREATE FUNCTION pg_spi_prepare_update()
+ RETURNS VOID
+ LANGUAGE c VOLATILE COST 100
+AS '$libdir/own_test', 'pg_spi_prepare_update';
+
+DROP FUNCTION IF EXISTS pg_spi_get_dep_ref_fees;
+CREATE FUNCTION pg_spi_get_dep_ref_fees(
+ IN in_timestamp INT8
+ ,IN merchant_pub BYTEA
+ ,IN wire_target_h_payto BYTEA
+ ,IN wtid BYTEA
+)
+ RETURNS VOID
+ LANGUAGE c VOLATILE COST 100
+AS '$libdir/own_test', 'pg_spi_get_dep_ref_fees';
+
+DROP FUNCTION IF EXISTS update_pg_spi_get_dep_ref_fees;
+CREATE FUNCTION update_pg_spi_get_dep_ref_fees(
+ IN in_refund_deadline INT8,
+ IN in_merchant_pub BYTEA,
+ IN in_wire_target_h_payto BYTEA
+)
+RETURNS SETOF record
+LANGUAGE plpgsql VOLATILE
+AS $$
+DECLARE
+
+BEGIN
+RETURN QUERY
+ UPDATE deposits
+ SET done = TRUE
+ WHERE NOT (done OR policy_blocked)
+ AND refund_deadline < in_refund_deadline
+ AND merchant_pub = in_merchant_pub
+ AND wire_target_h_payto = in_wire_target_h_payto
+ RETURNING
+ deposit_serial_id,
+ coin_pub,
+ amount_with_fee_val,
+ amount_with_fee_frac;
+END $$;
+
+DROP FUNCTION IF EXISTS stored_procedure_update;
+CREATE FUNCTION stored_procedure_update(
+IN in_number INT8
+)
+RETURNS VOID
+LANGUAGE plpgsql
+AS $$
+BEGIN
+ UPDATE Y
+ SET col1 = 4
+ WHERE col2 = in_number;
+END $$;
+
+DROP FUNCTION IF EXISTS stored_procedure_select;
+CREATE FUNCTION stored_procedure_select(OUT out_value INT8)
+RETURNS INT8
+LANGUAGE plpgsql
+AS $$
+BEGIN
+ SELECT 1
+ INTO out_value
+ FROM X;
+ RETURN;
+END $$;
+
+
+DROP FUNCTION IF EXISTS stored_procedure_insert;
+CREATE FUNCTION stored_procedure_insert(
+IN in_number INT8,
+OUT out_number INT8)
+RETURNS INT8
+LANGUAGE plpgsql
+AS $$
+BEGIN
+ INSERT INTO X (a)
+ VALUES (in_number)
+ RETURNING a INTO out_number;
+END $$;
+
+DROP FUNCTION IF EXISTS stored_procedure_select_with_cond;
+CREATE FUNCTION stored_procedure_select_with_cond(
+IN in_number INT8,
+OUT out_number INT8
+)
+RETURNS INT8
+LANGUAGE plpgsql
+AS $$
+BEGIN
+ SELECT col1 INTO out_number
+ FROM Y
+ WHERE col2 = in_number;
+ RETURN;
+END $$;
diff --git a/src/exchangedb/spi/perf_own_test.c b/src/exchangedb/spi/perf_own_test.c
new file mode 100644
index 000000000..92be2235e
--- /dev/null
+++ b/src/exchangedb/spi/perf_own_test.c
@@ -0,0 +1,25 @@
+/*
+ This file is part of TALER
+ Copyright (C) 2014-2023 Taler Systems SA
+
+ TALER is free software; you can redistribute it and/or modify it under the
+ terms of the GNU General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along with
+ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+*/
+/**
+ * @file exchangedb/spi/perf_own_test.c
+ * @brief benchmark for 'own_test'
+ * @author Joseph Xu
+ */
+#include "exchangedb/platform.h"
+#include "exchangedb/taler_exchangedb_lib.h"
+#include "exchangedb/taler_json_lib.h"
+#include "exchangedb/taler_exchangedb_plugin.h"
+#include "own_test.sql"
diff --git a/src/exchangedb/spi/pg_aggregate.c b/src/exchangedb/spi/pg_aggregate.c
new file mode 100644
index 000000000..721f247c7
--- /dev/null
+++ b/src/exchangedb/spi/pg_aggregate.c
@@ -0,0 +1,411 @@
+#include "postgres.h"
+#include "fmgr.h"
+#include "utils/numeric.h"
+#include "utils/builtins.h"
+#include "executor/spi.h"
+
+PG_MODULE_MAGIC;
+
+PG_FUNCTION_INFO_V1 (get_deposit_summary);
+
+Datum
+get_deposit_summary (PG_FUNCTION_ARGS)
+{
+
+ static SPIPlanPtr deposit_plan;
+ static SPIPlanPtr refund_plan;
+ static SPIPlanPtr refund_by_coin_plan;
+ static SPIPlanPtr norm_refund_by_coin_plan;
+ static SPIPlanPtr fully_refunded_by_coins_plan;
+ static SPIPlanPtr fees_plan;
+
+ int shard = PG_GETARG_INT32 (0);
+ char *sql;
+ char *merchant_pub = text_to_cstring (PG_GETARG_TEXT_P (1));
+ char *wire_target_h_payto = text_to_cstring (PG_GETARG_TEXT_P (2));
+ char *wtid_raw = text_to_cstring (PG_GETARG_TEXT_P (3));
+ int refund_deadline = PG_GETARG_INT32 (4);
+ int conn = SPI_connect ();
+ if (conn != SPI_OK_CONNECT)
+ {
+ elog (ERROR, "DB connection failed ! \n");
+ }
+
+ if (deposit_plan == NULL
+ || refund_plan == NULL
+ || refund_by_coin_plan == NULL
+ || norm_refund_by_coin_plan = NULL
+ || fully_refunded_coins_plan = NULL
+ || fees_plan
+ == NULL)
+ {
+ if (deposit_plan == NULL)
+ {
+ int nargs = 3;
+ Oid argtypes[3];
+ argtypes[0] = INT8OID;
+ argtypes[1] = BYTEAOID;
+ argtypes[2] = BYTEAOID;
+ const char *dep_sql =
+ " UPDATE deposits"
+ " SET done=TRUE"
+ " WHERE NOT (done OR policy_blocked)"
+ " AND refund_deadline < $1"
+ " AND merchant_pub = $2"
+ " AND wire_target_h_payto = $3"
+ " RETURNING"
+ " deposit_serial_id"
+ " ,coin_pub"
+ " ,amount_with_fee_val AS amount_val"
+ " ,amount_with_fee_frac AS amount_frac";
+ SPIPlanPtr new_plan =
+ SPI_prepare (dep_sql, 4, argtypes);
+ if (new_plan == NULL)
+ {
+ elog (ERROR, "SPI_prepare for deposit failed ! \n");
+ }
+ deposit_plan = SPI_saveplan (new_plan);
+ if (deposit_plan == NULL)
+ {
+ elog (ERROR, "SPI_saveplan for deposit failed ! \n");
+ }
+ }
+
+ Datum values[4];
+ values[0] = Int64GetDatum (refund_deadline);
+ values[1] = CStringGetDatum (merchant_pub);
+ values[2] = CStringGetDatum (wire_target_h_payto);
+ int ret = SPI_execute_plan (deposit_plan,
+ values,
+ NULL,
+ true,
+ 0);
+ if (ret != SPI_OK_UPDATE)
+ {
+ elog (ERROR, "Failed to execute subquery 1\n");
+ }
+ uint64_t *dep_deposit_serial_ids = palloc (sizeof(uint64_t)
+ * SPI_processed);
+ BYTEA **dep_coin_pubs = palloc (sizeof(BYTEA *) * SPI_processed);
+ uint64_t *dep_amount_vals = palloc (sizeof(uint64_t) * SPI_processed);
+ uint32_t *dep_amount_fracs = palloc (sizeof(uint32_t) * SPI_processed);
+ for (unsigned int i = 0; i < SPI_processed; i++)
+ {
+ HeapTuple tuple = SPI_tuptable->vals[i];
+ dep_deposit_serial_ids[i] =
+ DatumGetInt64 (SPI_getbinval (tuple, SPI_tuptable->tupdesc, 1, &ret));
+ dep_coin_pubs[i] =
+ DatumGetByteaP (SPI_getbinval (tuple, SPI_tuptable->tupdesc, 2, &ret));
+ dep_amount_vals[i] =
+ DatumGetInt64 (SPI_getbinval (tuple, SPI_tuptable->tupdesc, 3, &ret));
+ dep_amount_fracs[i] =
+ DatumGetInt32 (SPI_getbinval (tuple, SPI_tuptable->tupdesc, 4, &ret));
+ }
+
+
+ if (refund_plan == NULL)
+ {
+ const char *ref_sql =
+ "ref AS ("
+ " SELECT"
+ " amount_with_fee_val AS refund_val"
+ " ,amount_with_fee_frac AS refund_frac"
+ " ,coin_pub"
+ " ,deposit_serial_id"
+ " FROM refunds"
+ " WHERE coin_pub IN (SELECT coin_pub FROM dep)"
+ " AND deposit_serial_id IN (SELECT deposit_serial_id FROM dep)) ";
+ SPIPlanPtr new_plan = SPI_prepare (ref_sql, 0, NULL);
+ if (new_plan == NULL)
+ elog (ERROR, "SPI_prepare for refund failed ! \n");
+ refund_plan = SPI_saveplan (new_plan);
+ if (refund_plan == NULL)
+ {
+ elog (ERROR, "SPI_saveplan for refund failed ! \n");
+ }
+ }
+
+ int64t_t *ref_deposit_serial_ids = palloc (sizeof(int64_t) * SPI_processed);
+
+ int res = SPI_execute_plan (refund_plan, NULL, NULL, false, 0);
+ if (res != SPI_OK_SELECT)
+ {
+ elog (ERROR, "Failed to execute subquery 2\n");
+ }
+ SPITupleTable *tuptable = SPI_tuptable;
+ TupleDesc tupdesc = tuptable->tupdesc;
+ for (unsigned int i = 0; i < SPI_processed; i++)
+ {
+ HeapTuple tuple = tuptable->vals[i];
+ Datum refund_val = SPI_getbinval (tuple, tupdesc, 1, &refund_val_isnull);
+ Datum refund_frac = SPI_getbinval (tuple, tupdesc, 2,
+ &refund_frac_isnull);
+ Datum coin_pub = SPI_getbinval (tuple, tupdesc, 3, &coin_pub_isnull);
+ Datum deposit_serial_id = SPI_getbinval (tuple, tupdesc, 4,
+ &deposit_serial_id_isnull);
+ if (refund_val_isnull
+ || refund_frac_isnull
+ || coin_pub_isnull
+ || deposit_serial_id_isnull)
+ {
+ elog (ERROR, "Failed to retrieve data from subquery 2");
+ }
+ uint64_t refund_val_int = DatumGetUInt64 (refund_val);
+ uint32_t refund_frac_int = DatumGetUInt32 (refund_frac);
+ BYTEA coin_pub = DatumGetByteaP (coin_pub);
+ ref_deposit_serial_ids = DatumGetInt64 (deposit_serial_id);
+
+ refund *new_refund = (refund*) palloc (sizeof(refund));
+ new_refund->coin_pub = coin_pub_str;
+ new_refund->deposit_serial_id = deposit_serial_id_int;
+ new_refund->amount_with_fee_val = refund_val_int;
+ new_refund->amount_with_fee_frac = refund_frac_int;
+ }
+
+
+ if (refund_by_coin_plan == NULL)
+ {
+ const char *ref_by_coin_sql =
+ "ref_by_coin AS ("
+ " SELECT"
+ " SUM(refund_val) AS sum_refund_val"
+ " ,SUM(refund_frac) AS sum_refund_frac"
+ " ,coin_pub"
+ " ,deposit_serial_id"
+ " FROM ref"
+ " GROUP BY coin_pub, deposit_serial_id) ";
+ SPIPlanPtr new_plan = SPI_prepare (ref_by_coin_sql, 0, NULL);
+ if (new_plan == NULL)
+ elog (ERROR, "SPI_prepare for refund by coin failed ! \n");
+ refund_by_coin_plan = SPI_saveplan (new_plan);
+ if (refund_by_coin_plan == NULL)
+ elog (ERROR, "SPI_saveplan for refund failed");
+ }
+
+
+ int res = SPI_execute_plan (refund_by_coin_plan, NULL, NULL, false, 0);
+ if (res != SPI_OK_SELECT)
+ {
+ elog (ERROR, "Failed to execute subquery 2\n");
+ }
+
+ SPITupleTable *tuptable = SPI_tuptable;
+ TupleDesc tupdesc = tuptable->tupdesc;
+ for (unsigned int i = 0; i < SPI_processed; i++)
+ {
+ HeapTuple tuple = tuptable->vals[i];
+ Datum sum_refund_val = SPI_getbinval (tuple, tupdesc, 1,
+ &refund_val_isnull);
+ Datum sum_refund_frac = SPI_getbinval (tuple, tupdesc, 2,
+ &refund_frac_isnull);
+ Datum coin_pub = SPI_getbinval (tuple, tupdesc, 3, &coin_pub_isnull);
+ Datum deposit_serial_id_int = SPI_getbinval (tuple, tupdesc, 4,
+ &deposit_serial_id_isnull);
+ if (refund_val_isnull
+ || refund_frac_isnull
+ || coin_pub_isnull
+ || deposit_serial_id_isnull)
+ {
+ elog (ERROR, "Failed to retrieve data from subquery 2");
+ }
+ uint64_t s_refund_val_int = DatumGetUInt64 (sum_refund_val);
+ uint32_t s_refund_frac_int = DatumGetUInt32 (sum_refund_frac);
+ BYTEA coin_pub = DatumGetByteaP (coin_pub);
+ uint64_t deposit_serial_id_int = DatumGetInt64 (deposit_serial_id_int);
+ refund *new_refund_by_coin = (refund*) palloc (sizeof(refund));
+ new_refund_by_coin->coin_pub = coin_pub;
+ new_refund_by_coin->deposit_serial_id = deposit_serial_id_int;
+ new_refund_by_coin->refund_amount_with_fee_val = s_refund_val_int;
+ new_refund_by_coin->refund_amount_with_fee_frac = s_refund_frac_int;
+ }
+
+
+ if (norm_refund_by_coin_plan == NULL)
+ {
+ const char *norm_ref_by_coin_sql =
+ "norm_ref_by_coin AS ("
+ " SELECT"
+ " coin_pub"
+ " ,deposit_serial_id"
+ " FROM ref_by_coin) ";
+ SPIPlanPtr new_plan = SPI_prepare (norm_ref_by_coin_sql, 0, NULL);
+ if (new_plan == NULL)
+ elog (ERROR, "SPI_prepare for norm refund by coin failed ! \n");
+ norm_refund_by_coin_plan = SPI_saveplan (new_plan);
+ if (norm_refund_by_coin_plan == NULL)
+ elog (ERROR, "SPI_saveplan for norm refund by coin failed ! \n");
+ }
+
+ double norm_refund_val =
+ ((double) new_refund_by_coin->refund_amount_with_fee_val
+ + (double) new_refund_by_coin->refund_amount_with_fee_frac) / 100000000;
+ double norm_refund_frac =
+ (double) new_refund_by_coin->refund_amount_with_fee_frac % 100000000;
+
+ if (fully_refunded_coins_plan == NULL)
+ {
+ const char *fully_refunded_coins_sql =
+ "fully_refunded_coins AS ("
+ " SELECT"
+ " dep.coin_pub"
+ " FROM norm_ref_by_coin norm"
+ " JOIN dep"
+ " ON (norm.coin_pub = dep.coin_pub"
+ " AND norm.deposit_serial_id = dep.deposit_serial_id"
+ " AND norm.norm_refund_val = dep.amount_val"
+ " AND norm.norm_refund_frac = dep.amount_frac)) ";
+ SPIPlanPtr new_plan =
+ SPI_prepare (fully_refunded_coins_sql, 0, NULL);
+ if (new_plan == NULL)
+ elog (ERROR, "SPI_prepare for fully refunded coins failed ! \n");
+ fully_refunded_coins_plan = SPI_saveplan (new_plan);
+ if (fully_refunded_coins_plan == NULL)
+ elog (ERROR, "SPI_saveplan for fully refunded coins failed ! \n");
+ }
+
+ int res = SPI_execute_plan (fully_refunded_coins_sql);
+ if (res != SPI_OK_SELECT)
+ elog (ERROR, "Failed to execute subquery 4\n");
+ SPITupleTable *tuptable = SPI_tuptable;
+ TupleDesc tupdesc = tuptable->tupdesc;
+
+ BYTEA coin_pub = SPI_getbinval (tuple, tupdesc, 1, &coin_pub_isnull);
+ if (fees_plan == NULL)
+ {
+ const char *fees_sql =
+ "SELECT "
+ " denom.fee_deposit_val AS fee_val, "
+ " denom.fee_deposit_frac AS fee_frac, "
+ " cs.deposit_serial_id "
+ "FROM dep cs "
+ "JOIN known_coins kc USING (coin_pub) "
+ "JOIN denominations denom USING (denominations_serial) "
+ "WHERE coin_pub NOT IN (SELECT coin_pub FROM fully_refunded_coins)";
+ SPIPlanPtr new_plan =
+ SPI_prepare (fees_sql, 0, NULL);
+ if (new_plan == NULL)
+ {
+ elog (ERROR, "SPI_prepare for fees failed ! \n");
+ }
+ fees_plan = SPI_saveplan (new_plan);
+ if (fees_plan == NULL)
+ {
+ elog (ERROR, "SPI_saveplan for fees failed ! \n");
+ }
+ }
+ }
+ int fees_ntuples;
+ SPI_execute (fees_sql, true, 0);
+ if (SPI_result_code () != SPI_OK_SELECT)
+ {
+ ereport (
+ ERROR,
+ (errcode (ERRCODE_INTERNAL_ERROR),
+ errmsg ("deposit fee query failed: error code %d \n",
+ SPI_result_code ())));
+ }
+ fees_ntuples = SPI_processed;
+
+ if (fees_ntuples > 0)
+ {
+ for (i = 0; i < fees_ntuples; i++)
+ {
+ Datum fee_val_datum =
+ SPI_getbinval (SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 1,
+ &fee_null);
+ Datum fee_frac_datum =
+ SPI_getbinval (SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 2,
+ &fee_null);
+ Datum deposit_id_datum =
+ SPI_getbinval (SPI_tuptable->vals[i], SPI_tuptable->tupdesc, 3,
+ &deposit_null);
+ if (! fee_null && ! deposit_null)
+ {
+ int64 fee_val = DatumGetInt64 (fee_val_datum);
+ int32 fee_frac = DatumGetInt32 (fee_frac_datum);
+ int64 deposit_id = DatumGetInt64 (deposit_id_datum);
+ sum_fee_value += fee_val;
+ sum_fee_fraction += fee_frac;
+ char *insert_agg_sql =
+ psprintf (
+ "INSERT INTO "
+ "aggregation_tracking(deposit_serial_id, wtid_raw)"
+ " VALUES (%lld, '%s')",
+ deposit_id, wtid_raw);
+ SPI_execute (insert_agg_sql, false, 0);
+ }
+ }
+ }
+
+ TupleDesc tupdesc;
+ SPITupleTable *tuptable = SPI_tuptable;
+ HeapTuple tuple;
+ Datum result;
+
+ if (tuptable == NULL || SPI_processed != 1)
+ {
+ ereport (
+ ERROR,
+ (errcode (ERRCODE_INTERNAL_ERROR),
+ errmsg ("Unexpected result \n")));
+ }
+ tupdesc = SPI_tuptable->tupdesc;
+ tuple = SPI_tuptable->vals[0];
+ result = HeapTupleGetDatum (tuple);
+
+ TupleDesc result_desc = CreateTemplateTupleDesc (6, false);
+ TupleDescInitEntry (result_desc, (AttrNumber) 1, "sum_deposit_value", INT8OID,
+ -1, 0);
+ TupleDescInitEntry (result_desc, (AttrNumber) 2, "sum_deposit_fraction",
+ INT4OID, -1, 0);
+ TupleDescInitEntry (result_desc, (AttrNumber) 3, "sum_refund_value", INT8OID,
+ -1, 0);
+ TupleDescInitEntry (result_desc, (AttrNumber) 4, "sum_refund_fraction",
+ INT4OID, -1, 0);
+ TupleDescInitEntry (result_desc, (AttrNumber) 5, "sum_fee_value", INT8OID, -1,
+ 0);
+ TupleDescInitEntry (result_desc, (AttrNumber) 6, "sum_fee_fraction", INT4OID,
+ -1, 0);
+
+ int ret = SPI_prepare (sql, 4, argtypes);
+ if (ret != SPI_OK_PREPARE)
+ {
+ elog (ERROR, "Failed to prepare statement: %s \n", sql);
+ }
+
+ ret = SPI_execute_plan (plan, args, nulls, true, 0);
+ if (ret != SPI_OK_SELECT)
+ {
+ elog (ERROR, "Failed to execute statement: %s \n", sql);
+ }
+
+ if (SPI_processed > 0)
+ {
+ HeapTuple tuple;
+ Datum values[6];
+ bool nulls[6] = {false};
+ values[0] =
+ SPI_getbinval (SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 1,
+ &nulls[0]);
+ values[1] =
+ SPI_getbinval (SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 2,
+ &nulls[1]);
+ values[2] =
+ SPI_getbinval (SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 3,
+ &nulls[2]);
+ values[3] =
+ SPI_getbinval (SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 4,
+ &nulls[3]);
+ values[4] =
+ SPI_getbinval (SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 5,
+ &nulls[4]);
+ values[5] =
+ SPI_getbinval (SPI_tuptable->vals[0], SPI_tuptable->tupdesc, 6,
+ &nulls[5]);
+ tuple = heap_form_tuple (result_desc, values, nulls);
+ PG_RETURN_DATUM (HeapTupleGetDatum (tuple));
+ }
+ SPI_finish ();
+
+ PG_RETURN_NULL ();
+}