summaryrefslogtreecommitdiff
path: root/src/exchangedb/plugin_exchangedb_postgres.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/exchangedb/plugin_exchangedb_postgres.c')
-rw-r--r--src/exchangedb/plugin_exchangedb_postgres.c188
1 files changed, 122 insertions, 66 deletions
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c
index b7a3b5f70..f686a8c52 100644
--- a/src/exchangedb/plugin_exchangedb_postgres.c
+++ b/src/exchangedb/plugin_exchangedb_postgres.c
@@ -40,13 +40,23 @@
/**
- * Log a really unexpected PQ error.
+ * Log a really unexpected PQ error with all the details we can get hold of.
*
* @param result PQ result object of the PQ operation that failed
+ * @param conn SQL connection that was used
*/
-#define BREAK_DB_ERR(result) do { \
+#define BREAK_DB_ERR(result,conn) do { \
+ char *err = PQresultVerboseErrorMessage (result, PQERRORS_VERBOSE, PQSHOW_CONTEXT_ALWAYS); \
GNUNET_break (0); \
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Database failure: %s (%s)\n", PQresultErrorMessage (result), PQresStatus (PQresultStatus (result))); \
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR, \
+ "Database failure: %s/%s/%s/%s/%s/%s", \
+ PQresultErrorField (result, PG_DIAG_MESSAGE_PRIMARY), \
+ PQresultErrorField (result, PG_DIAG_MESSAGE_DETAIL), \
+ PQresultErrorMessage (result), \
+ PQresStatus (PQresultStatus (result)), \
+ PQerrorMessage(conn), \
+ err); \
+ PQfreemem (err); \
} while (0)
@@ -75,7 +85,7 @@
PGresult *result = PQexec (conn, sql); \
if (PGRES_COMMAND_OK != PQresultStatus (result)) \
{ \
- BREAK_DB_ERR (result); \
+ BREAK_DB_ERR (result, conn); \
PQclear (result); \
goto SQLEXEC_fail; \
} \
@@ -134,7 +144,7 @@ struct PostgresClosure
* We already log whenever we care, so this function does nothing
* and merely exists to silence the libpq logging.
*
- * @param arg NULL
+ * @param arg the SQL connection that was used
* @param res information about some libpq event
*/
static void
@@ -149,7 +159,7 @@ pq_notice_receiver_cb (void *arg,
* Function called by libpq whenever it wants to log something.
* We log those using the Taler logger.
*
- * @param arg NULL
+ * @param arg the SQL connection that was used
* @param message information about some libpq event
*/
static void
@@ -186,10 +196,10 @@ connect_to_postgres (struct PostgresClosure *pc)
}
PQsetNoticeReceiver (conn,
&pq_notice_receiver_cb,
- NULL);
+ conn);
PQsetNoticeProcessor (conn,
&pq_notice_processor_cb,
- NULL);
+ conn);
return conn;
}
@@ -465,11 +475,23 @@ postgres_create_tables (void *cls)
",PRIMARY KEY (coin_pub, merchant_pub, h_proposal_data, rtransaction_id)" /* this combo must be unique, and we usually select by coin_pub */
") ");
+ /* This table contains the data for
+ wire transfers the exchange has executed. */
+ SQLEXEC("CREATE TABLE IF NOT EXISTS wire_out "
+ "(wireout_uuid BIGSERIAL PRIMARY KEY"
+ ",execution_date INT8 NOT NULL"
+ ",wtid_raw BYTEA UNIQUE NOT NULL CHECK (LENGTH(wtid_raw)=" TALER_WIRE_TRANSFER_IDENTIFIER_LEN_STR ")"
+ ",wire_target TEXT NOT NULL"
+ ",amount_val INT8 NOT NULL"
+ ",amount_frac INT4 NOT NULL"
+ ",amount_curr VARCHAR("TALER_CURRENCY_LEN_STR") NOT NULL"
+ ")");
+
/* Table for the tracking API, mapping from wire transfer identifiers
to transactions and back */
SQLEXEC("CREATE TABLE IF NOT EXISTS aggregation_tracking "
"(deposit_serial_id INT8 PRIMARY KEY REFERENCES deposits (deposit_serial_id) ON DELETE CASCADE"
- ",wtid_raw BYTEA NOT NULL CHECK (LENGTH(wtid_raw)=" TALER_WIRE_TRANSFER_IDENTIFIER_LEN_STR ")"
+ ",wtid_raw BYTEA CONSTRAINT wire_out_ref REFERENCES wire_out(wtid_raw) ON DELETE CASCADE DEFERRABLE"
",execution_time INT8 NOT NULL"
")");
/* Index for lookup_transactions statement on wtid */
@@ -505,18 +527,6 @@ postgres_create_tables (void *cls)
SQLEXEC_INDEX("CREATE INDEX prepare_iteration_index "
"ON prewire(type,finished)");
- /* This table contains the data for
- wire transfers the exchange has executed. */
- SQLEXEC("CREATE TABLE IF NOT EXISTS wire_out "
- "(wireout_uuid BIGSERIAL PRIMARY KEY"
- ",execution_date INT8 NOT NULL"
- ",wtid_raw BYTEA NOT NULL CHECK (LENGTH(wtid_raw)=" TALER_WIRE_TRANSFER_IDENTIFIER_LEN_STR ")"
- ",wire_target TEXT NOT NULL"
- ",amount_val INT8 NOT NULL"
- ",amount_frac INT4 NOT NULL"
- ",amount_curr VARCHAR("TALER_CURRENCY_LEN_STR") NOT NULL"
- ")");
-
#undef SQLEXEC
#undef SQLEXEC_INDEX
@@ -545,7 +555,7 @@ postgres_prepare (PGconn *db_conn)
result = PQprepare (db_conn, name, sql, __VA_ARGS__); \
if (PGRES_COMMAND_OK != PQresultStatus (result)) \
{ \
- BREAK_DB_ERR (result); \
+ BREAK_DB_ERR (result, db_conn); \
PQclear (result); result = NULL; \
return GNUNET_SYSERR; \
} \
@@ -1270,7 +1280,7 @@ postgres_prepare (PGconn *db_conn)
"INSERT INTO aggregation_tracking "
"(deposit_serial_id"
",wtid_raw"
- ",execution_time"
+ ",execution_time" /* TODO: this field should be eliminated and obtained from wire_out */
") VALUES "
"($1, $2, $3)",
3, NULL);
@@ -1601,7 +1611,7 @@ postgres_insert_denomination_info (void *cls,
if (PGRES_COMMAND_OK != PQresultStatus (result))
{
ret = GNUNET_SYSERR;
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
}
else
{
@@ -2311,7 +2321,7 @@ postgres_have_deposit (void *cls,
if (PGRES_TUPLES_OK !=
PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -2402,7 +2412,7 @@ postgres_mark_deposit_tiny (void *cls,
if (PGRES_COMMAND_OK !=
PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -2441,7 +2451,7 @@ postgres_test_deposit_done (void *cls,
if (PGRES_TUPLES_OK !=
PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -2507,7 +2517,7 @@ postgres_mark_deposit_done (void *cls,
if (PGRES_COMMAND_OK !=
PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -2549,7 +2559,7 @@ postgres_get_ready_deposit (void *cls,
if (PGRES_TUPLES_OK !=
PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -2651,7 +2661,7 @@ postgres_iterate_matching_deposits (void *cls,
if (PGRES_TUPLES_OK !=
PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -2742,7 +2752,7 @@ get_known_coin (void *cls,
params);
if (PGRES_TUPLES_OK != PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -2809,7 +2819,7 @@ insert_known_coin (void *cls,
params);
if (PGRES_COMMAND_OK != PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -2874,7 +2884,7 @@ postgres_insert_deposit (void *cls,
params);
if (PGRES_COMMAND_OK != PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
ret = GNUNET_SYSERR;
}
else
@@ -2919,7 +2929,7 @@ postgres_insert_refund (void *cls,
if (PGRES_COMMAND_OK != PQresultStatus (result))
{
ret = GNUNET_SYSERR;
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
}
else
{
@@ -2959,7 +2969,7 @@ postgres_get_refresh_session (void *cls,
params);
if (PGRES_TUPLES_OK != PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -3076,7 +3086,7 @@ postgres_create_refresh_session (void *cls,
params);
if (PGRES_COMMAND_OK != PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -3124,7 +3134,7 @@ postgres_insert_refresh_order (void *cls,
}
if (PGRES_COMMAND_OK != PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -3199,7 +3209,7 @@ postgres_get_refresh_order (void *cls,
}
if (PGRES_TUPLES_OK != PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
free_dpk_result (denom_pubs, i);
return GNUNET_SYSERR;
@@ -3275,7 +3285,7 @@ postgres_insert_refresh_commit_coins (void *cls,
}
if (PGRES_COMMAND_OK != PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -3354,7 +3364,7 @@ postgres_get_refresh_commit_coins (void *cls,
params);
if (PGRES_TUPLES_OK != PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
postgres_free_refresh_commit_coins (cls,
i,
@@ -3426,7 +3436,7 @@ postgres_insert_refresh_transfer_public_key (void *cls,
params);
if (PGRES_COMMAND_OK != PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -3470,7 +3480,7 @@ postgres_get_refresh_transfer_public_key (void *cls,
params);
if (PGRES_TUPLES_OK != PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -3538,7 +3548,7 @@ postgres_get_refresh_out (void *cls,
params);
if (PGRES_TUPLES_OK != PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -3595,7 +3605,7 @@ postgres_insert_refresh_out (void *cls,
params);
if (PGRES_COMMAND_OK != PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -3633,7 +3643,7 @@ postgres_get_link_data_list (void *cls,
ldl = NULL;
if (PGRES_TUPLES_OK != PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return NULL;
}
@@ -3719,7 +3729,7 @@ postgres_get_transfer (void *cls,
if (PGRES_TUPLES_OK !=
PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -3872,7 +3882,7 @@ postgres_get_coin_transactions (void *cls,
params);
if (PGRES_TUPLES_OK != PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
goto cleanup;
}
@@ -3943,7 +3953,7 @@ postgres_get_coin_transactions (void *cls,
params);
if (PGRES_TUPLES_OK != PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
goto cleanup;
}
@@ -4043,7 +4053,7 @@ postgres_lookup_wire_transfer (void *cls,
params);
if (PGRES_TUPLES_OK != PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -4159,7 +4169,7 @@ postgres_wire_lookup_deposit_wtid (void *cls,
params);
if (PGRES_TUPLES_OK != PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -4187,7 +4197,7 @@ postgres_wire_lookup_deposit_wtid (void *cls,
params2);
if (PGRES_TUPLES_OK != PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -4301,7 +4311,7 @@ postgres_insert_aggregation_tracking (void *cls,
params);
if (PGRES_COMMAND_OK != PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -4361,7 +4371,7 @@ postgres_get_wire_fee (void *cls,
if (PGRES_TUPLES_OK !=
PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -4465,7 +4475,7 @@ postgres_insert_wire_fee (void *cls,
params);
if (PGRES_COMMAND_OK != PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -4509,7 +4519,7 @@ postgres_wire_prepare_data_insert (void *cls,
params);
if (PGRES_COMMAND_OK != PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -4543,7 +4553,7 @@ postgres_wire_prepare_data_mark_finished (void *cls,
if (PGRES_COMMAND_OK !=
PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -4634,6 +4644,51 @@ postgres_wire_prepare_data_get (void *cls,
/**
+ * Start a transaction where we transiently violate the foreign
+ * constraints on the "wire_out" table as we insert aggregations
+ * and only add the wire transfer out at the end.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param session connection to use
+ * @return #GNUNET_OK on success
+ */
+static int
+postgres_start_deferred_wire_out (void *cls,
+ struct TALER_EXCHANGEDB_Session *session)
+{
+ PGresult *result;
+ ExecStatusType ex;
+
+ result = PQexec (session->conn,
+ "START TRANSACTION ISOLATION LEVEL SERIALIZABLE");
+ if (PGRES_COMMAND_OK !=
+ (ex = PQresultStatus (result)))
+ {
+ TALER_LOG_ERROR ("Failed to start transaction (%s): %s\n",
+ PQresStatus (ex),
+ PQerrorMessage (session->conn));
+ GNUNET_break (0);
+ PQclear (result);
+ return GNUNET_SYSERR;
+ }
+ result = PQexec (session->conn,
+ "SET CONSTRAINTS wire_out_ref DEFERRED");
+ if (PGRES_COMMAND_OK !=
+ (ex = PQresultStatus (result)))
+ {
+ TALER_LOG_ERROR ("Failed to defer wire_out_ref constraint on transaction (%s): %s\n",
+ PQresStatus (ex),
+ PQerrorMessage (session->conn));
+ GNUNET_break (0);
+ PQclear (result);
+ return GNUNET_SYSERR;
+ }
+ PQclear (result);
+ return GNUNET_OK;
+}
+
+
+/**
* Store information about an outgoing wire transfer that was executed.
*
* @param cls closure
@@ -4667,7 +4722,7 @@ postgres_store_wire_transfer_out (void *cls,
params);
if (PGRES_COMMAND_OK != PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -4714,7 +4769,7 @@ postgres_gc (void *cls)
params_none);
if (PGRES_COMMAND_OK != PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, conn);
PQclear (result);
PQfinish (conn);
return GNUNET_SYSERR;
@@ -4725,7 +4780,7 @@ postgres_gc (void *cls)
params_time);
if (PGRES_COMMAND_OK != PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, conn);
PQclear (result);
PQfinish (conn);
return GNUNET_SYSERR;
@@ -4736,7 +4791,7 @@ postgres_gc (void *cls)
params_time);
if (PGRES_COMMAND_OK != PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, conn);
PQclear (result);
PQfinish (conn);
return GNUNET_SYSERR;
@@ -4777,7 +4832,7 @@ postgres_select_deposits_above_serial_id (void *cls,
if (PGRES_TUPLES_OK !=
PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -4881,7 +4936,7 @@ postgres_select_refreshs_above_serial_id (void *cls,
if (PGRES_TUPLES_OK !=
PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -4978,7 +5033,7 @@ postgres_select_refunds_above_serial_id (void *cls,
if (PGRES_TUPLES_OK !=
PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -5069,7 +5124,7 @@ postgres_select_reserves_in_above_serial_id (void *cls,
if (PGRES_TUPLES_OK !=
PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -5164,7 +5219,7 @@ postgres_select_reserves_out_above_serial_id (void *cls,
if (PGRES_TUPLES_OK !=
PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -5266,7 +5321,7 @@ postgres_select_wire_out_above_serial_id (void *cls,
if (PGRES_TUPLES_OK !=
PQresultStatus (result))
{
- BREAK_DB_ERR (result);
+ BREAK_DB_ERR (result, session->conn);
PQclear (result);
return GNUNET_SYSERR;
}
@@ -5417,6 +5472,7 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)
plugin->wire_prepare_data_insert = &postgres_wire_prepare_data_insert;
plugin->wire_prepare_data_mark_finished = &postgres_wire_prepare_data_mark_finished;
plugin->wire_prepare_data_get = &postgres_wire_prepare_data_get;
+ plugin->start_deferred_wire_out = &postgres_start_deferred_wire_out;
plugin->store_wire_transfer_out = &postgres_store_wire_transfer_out;
plugin->gc = &postgres_gc;
plugin->select_deposits_above_serial_id = &postgres_select_deposits_above_serial_id;