From 6a98b07ff2e75a429982eaf6b00ce54c95a28e8e Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sat, 18 Mar 2017 03:44:59 +0100 Subject: add wire_out tracking to exchangedb, including deferred constraint, and to aggregator --- ChangeLog | 4 + src/exchange/taler-exchange-aggregator.c | 50 ++++++-- src/exchangedb/plugin_exchangedb_postgres.c | 188 ++++++++++++++++++---------- src/exchangedb/test_exchangedb.c | 41 ++++-- src/include/taler_exchangedb_plugin.h | 15 +++ 5 files changed, 212 insertions(+), 86 deletions(-) diff --git a/ChangeLog b/ChangeLog index 2c512a639..40743eb30 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,7 @@ +Sat Mar 18 03:44:38 CET 2017 + Add 'wire_out' table to exchange DB to track outgoing + wire transfers. -CG + Fri Nov 18 18:53:30 CET 2016 Improved error reporting for bogus wire specifications. Releasing taler-exchange 0.2.0. -CG diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index ae4ee24fd..44154d200 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -705,8 +705,8 @@ run_aggregation (void *cls) return; } if (GNUNET_OK != - db_plugin->start (db_plugin->cls, - session)) + db_plugin->start_deferred_wire_out (db_plugin->cls, + session)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start database transaction!\n"); @@ -908,11 +908,6 @@ prepare_cb (void *cls, { struct TALER_EXCHANGEDB_Session *session = au->session; - if (NULL != au->wire) - { - json_decref (au->wire); - au->wire = NULL; - } GNUNET_free_non_null (au->additional_rows); if (NULL == buf) { @@ -922,6 +917,11 @@ prepare_cb (void *cls, /* start again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, NULL); + if (NULL != au->wire) + { + json_decref (au->wire); + au->wire = NULL; + } GNUNET_free (au); au = NULL; return; @@ -941,10 +941,46 @@ prepare_cb (void *cls, /* start again */ task = GNUNET_SCHEDULER_add_now (&run_aggregation, NULL); + if (NULL != au->wire) + { + json_decref (au->wire); + au->wire = NULL; + } + GNUNET_free (au); + au = NULL; + return; + } + + /* Commit the WTID data to 'wire_out' to finally satisfy aggregation + table constraints */ + if (GNUNET_OK != + db_plugin->store_wire_transfer_out (db_plugin->cls, + session, + au->execution_time, + &au->wtid, + au->wire, + &au->total_amount)) + { + GNUNET_break (0); /* why? how to best recover? */ + db_plugin->rollback (db_plugin->cls, + session); + /* start again */ + task = GNUNET_SCHEDULER_add_now (&run_aggregation, + NULL); + if (NULL != au->wire) + { + json_decref (au->wire); + au->wire = NULL; + } GNUNET_free (au); au = NULL; return; } + if (NULL != au->wire) + { + json_decref (au->wire); + au->wire = NULL; + } GNUNET_free (au); au = NULL; diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index b7a3b5f70..f686a8c52 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c @@ -40,13 +40,23 @@ /** - * Log a really unexpected PQ error. + * Log a really unexpected PQ error with all the details we can get hold of. * * @param result PQ result object of the PQ operation that failed + * @param conn SQL connection that was used */ -#define BREAK_DB_ERR(result) do { \ +#define BREAK_DB_ERR(result,conn) do { \ + char *err = PQresultVerboseErrorMessage (result, PQERRORS_VERBOSE, PQSHOW_CONTEXT_ALWAYS); \ GNUNET_break (0); \ - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Database failure: %s (%s)\n", PQresultErrorMessage (result), PQresStatus (PQresultStatus (result))); \ + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, \ + "Database failure: %s/%s/%s/%s/%s/%s", \ + PQresultErrorField (result, PG_DIAG_MESSAGE_PRIMARY), \ + PQresultErrorField (result, PG_DIAG_MESSAGE_DETAIL), \ + PQresultErrorMessage (result), \ + PQresStatus (PQresultStatus (result)), \ + PQerrorMessage(conn), \ + err); \ + PQfreemem (err); \ } while (0) @@ -75,7 +85,7 @@ PGresult *result = PQexec (conn, sql); \ if (PGRES_COMMAND_OK != PQresultStatus (result)) \ { \ - BREAK_DB_ERR (result); \ + BREAK_DB_ERR (result, conn); \ PQclear (result); \ goto SQLEXEC_fail; \ } \ @@ -134,7 +144,7 @@ struct PostgresClosure * We already log whenever we care, so this function does nothing * and merely exists to silence the libpq logging. * - * @param arg NULL + * @param arg the SQL connection that was used * @param res information about some libpq event */ static void @@ -149,7 +159,7 @@ pq_notice_receiver_cb (void *arg, * Function called by libpq whenever it wants to log something. * We log those using the Taler logger. * - * @param arg NULL + * @param arg the SQL connection that was used * @param message information about some libpq event */ static void @@ -186,10 +196,10 @@ connect_to_postgres (struct PostgresClosure *pc) } PQsetNoticeReceiver (conn, &pq_notice_receiver_cb, - NULL); + conn); PQsetNoticeProcessor (conn, &pq_notice_processor_cb, - NULL); + conn); return conn; } @@ -465,11 +475,23 @@ postgres_create_tables (void *cls) ",PRIMARY KEY (coin_pub, merchant_pub, h_proposal_data, rtransaction_id)" /* this combo must be unique, and we usually select by coin_pub */ ") "); + /* This table contains the data for + wire transfers the exchange has executed. */ + SQLEXEC("CREATE TABLE IF NOT EXISTS wire_out " + "(wireout_uuid BIGSERIAL PRIMARY KEY" + ",execution_date INT8 NOT NULL" + ",wtid_raw BYTEA UNIQUE NOT NULL CHECK (LENGTH(wtid_raw)=" TALER_WIRE_TRANSFER_IDENTIFIER_LEN_STR ")" + ",wire_target TEXT NOT NULL" + ",amount_val INT8 NOT NULL" + ",amount_frac INT4 NOT NULL" + ",amount_curr VARCHAR("TALER_CURRENCY_LEN_STR") NOT NULL" + ")"); + /* Table for the tracking API, mapping from wire transfer identifiers to transactions and back */ SQLEXEC("CREATE TABLE IF NOT EXISTS aggregation_tracking " "(deposit_serial_id INT8 PRIMARY KEY REFERENCES deposits (deposit_serial_id) ON DELETE CASCADE" - ",wtid_raw BYTEA NOT NULL CHECK (LENGTH(wtid_raw)=" TALER_WIRE_TRANSFER_IDENTIFIER_LEN_STR ")" + ",wtid_raw BYTEA CONSTRAINT wire_out_ref REFERENCES wire_out(wtid_raw) ON DELETE CASCADE DEFERRABLE" ",execution_time INT8 NOT NULL" ")"); /* Index for lookup_transactions statement on wtid */ @@ -505,18 +527,6 @@ postgres_create_tables (void *cls) SQLEXEC_INDEX("CREATE INDEX prepare_iteration_index " "ON prewire(type,finished)"); - /* This table contains the data for - wire transfers the exchange has executed. */ - SQLEXEC("CREATE TABLE IF NOT EXISTS wire_out " - "(wireout_uuid BIGSERIAL PRIMARY KEY" - ",execution_date INT8 NOT NULL" - ",wtid_raw BYTEA NOT NULL CHECK (LENGTH(wtid_raw)=" TALER_WIRE_TRANSFER_IDENTIFIER_LEN_STR ")" - ",wire_target TEXT NOT NULL" - ",amount_val INT8 NOT NULL" - ",amount_frac INT4 NOT NULL" - ",amount_curr VARCHAR("TALER_CURRENCY_LEN_STR") NOT NULL" - ")"); - #undef SQLEXEC #undef SQLEXEC_INDEX @@ -545,7 +555,7 @@ postgres_prepare (PGconn *db_conn) result = PQprepare (db_conn, name, sql, __VA_ARGS__); \ if (PGRES_COMMAND_OK != PQresultStatus (result)) \ { \ - BREAK_DB_ERR (result); \ + BREAK_DB_ERR (result, db_conn); \ PQclear (result); result = NULL; \ return GNUNET_SYSERR; \ } \ @@ -1270,7 +1280,7 @@ postgres_prepare (PGconn *db_conn) "INSERT INTO aggregation_tracking " "(deposit_serial_id" ",wtid_raw" - ",execution_time" + ",execution_time" /* TODO: this field should be eliminated and obtained from wire_out */ ") VALUES " "($1, $2, $3)", 3, NULL); @@ -1601,7 +1611,7 @@ postgres_insert_denomination_info (void *cls, if (PGRES_COMMAND_OK != PQresultStatus (result)) { ret = GNUNET_SYSERR; - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); } else { @@ -2311,7 +2321,7 @@ postgres_have_deposit (void *cls, if (PGRES_TUPLES_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -2402,7 +2412,7 @@ postgres_mark_deposit_tiny (void *cls, if (PGRES_COMMAND_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -2441,7 +2451,7 @@ postgres_test_deposit_done (void *cls, if (PGRES_TUPLES_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -2507,7 +2517,7 @@ postgres_mark_deposit_done (void *cls, if (PGRES_COMMAND_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -2549,7 +2559,7 @@ postgres_get_ready_deposit (void *cls, if (PGRES_TUPLES_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -2651,7 +2661,7 @@ postgres_iterate_matching_deposits (void *cls, if (PGRES_TUPLES_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -2742,7 +2752,7 @@ get_known_coin (void *cls, params); if (PGRES_TUPLES_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -2809,7 +2819,7 @@ insert_known_coin (void *cls, params); if (PGRES_COMMAND_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -2874,7 +2884,7 @@ postgres_insert_deposit (void *cls, params); if (PGRES_COMMAND_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); ret = GNUNET_SYSERR; } else @@ -2919,7 +2929,7 @@ postgres_insert_refund (void *cls, if (PGRES_COMMAND_OK != PQresultStatus (result)) { ret = GNUNET_SYSERR; - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); } else { @@ -2959,7 +2969,7 @@ postgres_get_refresh_session (void *cls, params); if (PGRES_TUPLES_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -3076,7 +3086,7 @@ postgres_create_refresh_session (void *cls, params); if (PGRES_COMMAND_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -3124,7 +3134,7 @@ postgres_insert_refresh_order (void *cls, } if (PGRES_COMMAND_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -3199,7 +3209,7 @@ postgres_get_refresh_order (void *cls, } if (PGRES_TUPLES_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); free_dpk_result (denom_pubs, i); return GNUNET_SYSERR; @@ -3275,7 +3285,7 @@ postgres_insert_refresh_commit_coins (void *cls, } if (PGRES_COMMAND_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -3354,7 +3364,7 @@ postgres_get_refresh_commit_coins (void *cls, params); if (PGRES_TUPLES_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); postgres_free_refresh_commit_coins (cls, i, @@ -3426,7 +3436,7 @@ postgres_insert_refresh_transfer_public_key (void *cls, params); if (PGRES_COMMAND_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -3470,7 +3480,7 @@ postgres_get_refresh_transfer_public_key (void *cls, params); if (PGRES_TUPLES_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -3538,7 +3548,7 @@ postgres_get_refresh_out (void *cls, params); if (PGRES_TUPLES_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -3595,7 +3605,7 @@ postgres_insert_refresh_out (void *cls, params); if (PGRES_COMMAND_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -3633,7 +3643,7 @@ postgres_get_link_data_list (void *cls, ldl = NULL; if (PGRES_TUPLES_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return NULL; } @@ -3719,7 +3729,7 @@ postgres_get_transfer (void *cls, if (PGRES_TUPLES_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -3872,7 +3882,7 @@ postgres_get_coin_transactions (void *cls, params); if (PGRES_TUPLES_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); goto cleanup; } @@ -3943,7 +3953,7 @@ postgres_get_coin_transactions (void *cls, params); if (PGRES_TUPLES_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); goto cleanup; } @@ -4043,7 +4053,7 @@ postgres_lookup_wire_transfer (void *cls, params); if (PGRES_TUPLES_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -4159,7 +4169,7 @@ postgres_wire_lookup_deposit_wtid (void *cls, params); if (PGRES_TUPLES_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -4187,7 +4197,7 @@ postgres_wire_lookup_deposit_wtid (void *cls, params2); if (PGRES_TUPLES_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -4301,7 +4311,7 @@ postgres_insert_aggregation_tracking (void *cls, params); if (PGRES_COMMAND_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -4361,7 +4371,7 @@ postgres_get_wire_fee (void *cls, if (PGRES_TUPLES_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -4465,7 +4475,7 @@ postgres_insert_wire_fee (void *cls, params); if (PGRES_COMMAND_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -4509,7 +4519,7 @@ postgres_wire_prepare_data_insert (void *cls, params); if (PGRES_COMMAND_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -4543,7 +4553,7 @@ postgres_wire_prepare_data_mark_finished (void *cls, if (PGRES_COMMAND_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -4633,6 +4643,51 @@ postgres_wire_prepare_data_get (void *cls, } +/** + * Start a transaction where we transiently violate the foreign + * constraints on the "wire_out" table as we insert aggregations + * and only add the wire transfer out at the end. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param session connection to use + * @return #GNUNET_OK on success + */ +static int +postgres_start_deferred_wire_out (void *cls, + struct TALER_EXCHANGEDB_Session *session) +{ + PGresult *result; + ExecStatusType ex; + + result = PQexec (session->conn, + "START TRANSACTION ISOLATION LEVEL SERIALIZABLE"); + if (PGRES_COMMAND_OK != + (ex = PQresultStatus (result))) + { + TALER_LOG_ERROR ("Failed to start transaction (%s): %s\n", + PQresStatus (ex), + PQerrorMessage (session->conn)); + GNUNET_break (0); + PQclear (result); + return GNUNET_SYSERR; + } + result = PQexec (session->conn, + "SET CONSTRAINTS wire_out_ref DEFERRED"); + if (PGRES_COMMAND_OK != + (ex = PQresultStatus (result))) + { + TALER_LOG_ERROR ("Failed to defer wire_out_ref constraint on transaction (%s): %s\n", + PQresStatus (ex), + PQerrorMessage (session->conn)); + GNUNET_break (0); + PQclear (result); + return GNUNET_SYSERR; + } + PQclear (result); + return GNUNET_OK; +} + + /** * Store information about an outgoing wire transfer that was executed. * @@ -4667,7 +4722,7 @@ postgres_store_wire_transfer_out (void *cls, params); if (PGRES_COMMAND_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -4714,7 +4769,7 @@ postgres_gc (void *cls) params_none); if (PGRES_COMMAND_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, conn); PQclear (result); PQfinish (conn); return GNUNET_SYSERR; @@ -4725,7 +4780,7 @@ postgres_gc (void *cls) params_time); if (PGRES_COMMAND_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, conn); PQclear (result); PQfinish (conn); return GNUNET_SYSERR; @@ -4736,7 +4791,7 @@ postgres_gc (void *cls) params_time); if (PGRES_COMMAND_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, conn); PQclear (result); PQfinish (conn); return GNUNET_SYSERR; @@ -4777,7 +4832,7 @@ postgres_select_deposits_above_serial_id (void *cls, if (PGRES_TUPLES_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -4881,7 +4936,7 @@ postgres_select_refreshs_above_serial_id (void *cls, if (PGRES_TUPLES_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -4978,7 +5033,7 @@ postgres_select_refunds_above_serial_id (void *cls, if (PGRES_TUPLES_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -5069,7 +5124,7 @@ postgres_select_reserves_in_above_serial_id (void *cls, if (PGRES_TUPLES_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -5164,7 +5219,7 @@ postgres_select_reserves_out_above_serial_id (void *cls, if (PGRES_TUPLES_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -5266,7 +5321,7 @@ postgres_select_wire_out_above_serial_id (void *cls, if (PGRES_TUPLES_OK != PQresultStatus (result)) { - BREAK_DB_ERR (result); + BREAK_DB_ERR (result, session->conn); PQclear (result); return GNUNET_SYSERR; } @@ -5417,6 +5472,7 @@ libtaler_plugin_exchangedb_postgres_init (void *cls) plugin->wire_prepare_data_insert = &postgres_wire_prepare_data_insert; plugin->wire_prepare_data_mark_finished = &postgres_wire_prepare_data_mark_finished; plugin->wire_prepare_data_get = &postgres_wire_prepare_data_get; + plugin->start_deferred_wire_out = &postgres_start_deferred_wire_out; plugin->store_wire_transfer_out = &postgres_store_wire_transfer_out; plugin->gc = &postgres_gc; plugin->select_deposits_above_serial_id = &postgres_select_deposits_above_serial_id; diff --git a/src/exchangedb/test_exchangedb.c b/src/exchangedb/test_exchangedb.c index fba162592..bfa1e6a23 100644 --- a/src/exchangedb/test_exchangedb.c +++ b/src/exchangedb/test_exchangedb.c @@ -1241,20 +1241,13 @@ test_wire_out (struct TALER_EXCHANGEDB_Session *session, GNUNET_assert (GNUNET_OK == TALER_string_to_amount (CURRENCY ":1", &wire_out_amount)); + + /* we will transiently violate the wtid constraint on + the aggregation table, so we need to start the special + transaction where this is allowed... */ FAILIF (GNUNET_OK != - plugin->store_wire_transfer_out (plugin->cls, - session, - wire_out_date, - &wire_out_wtid, - wire_out_account, - &wire_out_amount)); - FAILIF (GNUNET_OK != - plugin->select_wire_out_above_serial_id (plugin->cls, - session, - 0, - &audit_wire_cb, - NULL)); - FAILIF (1 != auditor_row_cnt); + plugin->start_deferred_wire_out (plugin->cls, + session)); /* setup values for wire transfer aggregation data */ merchant_pub_wt = deposit->merchant_pub; @@ -1289,6 +1282,7 @@ test_wire_out (struct TALER_EXCHANGEDB_Session *session, &cb_wtid_never, NULL)); } + wtid_wt = wire_out_wtid; /* to statisfy foreign constraint */ /* insert WT data */ FAILIF (GNUNET_OK != plugin->insert_aggregation_tracking (plugin->cls, @@ -1312,6 +1306,27 @@ test_wire_out (struct TALER_EXCHANGEDB_Session *session, &cb_wtid_check, &cb_wtid_never)); + /* Now let's fix the transient constraint violation by + putting in the WTID into the wire_out table */ + FAILIF (GNUNET_OK != + plugin->store_wire_transfer_out (plugin->cls, + session, + wire_out_date, + &wire_out_wtid, + wire_out_account, + &wire_out_amount)); + FAILIF (GNUNET_OK != + plugin->select_wire_out_above_serial_id (plugin->cls, + session, + 0, + &audit_wire_cb, + NULL)); + FAILIF (1 != auditor_row_cnt); + + /* And now the commit should still succeed! */ + FAILIF (GNUNET_OK != + plugin->commit (plugin->cls, + session)); return GNUNET_OK; drop: diff --git a/src/include/taler_exchangedb_plugin.h b/src/include/taler_exchangedb_plugin.h index ef49074e4..4ab3e4a54 100644 --- a/src/include/taler_exchangedb_plugin.h +++ b/src/include/taler_exchangedb_plugin.h @@ -1639,6 +1639,21 @@ struct TALER_EXCHANGEDB_Plugin void *cb_cls); + /** + * Start a transaction where we transiently violate the foreign + * constraints on the "wire_out" table as we insert aggregations + * and only add the wire transfer out at the end. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param session connection to use + * @return #GNUNET_OK on success + */ + int + (*start_deferred_wire_out) (void *cls, + struct TALER_EXCHANGEDB_Session *session); + + + /** * Store information about an outgoing wire transfer that was executed. * -- cgit v1.2.3