summaryrefslogtreecommitdiff
path: root/src/mintdb/plugin_mintdb_postgres.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/mintdb/plugin_mintdb_postgres.c')
-rw-r--r--src/mintdb/plugin_mintdb_postgres.c919
1 files changed, 863 insertions, 56 deletions
diff --git a/src/mintdb/plugin_mintdb_postgres.c b/src/mintdb/plugin_mintdb_postgres.c
index 0f32cfb8e..2ab3e81ac 100644
--- a/src/mintdb/plugin_mintdb_postgres.c
+++ b/src/mintdb/plugin_mintdb_postgres.c
@@ -1,6 +1,6 @@
/*
This file is part of TALER
- Copyright (C) 2014, 2015 Christian Grothoff (and other contributing authors)
+ Copyright (C) 2014, 2015, 2016 GNUnet e.V.
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
@@ -425,7 +425,7 @@ postgres_create_tables (void *cls,
/* This table contains the wire transfers the mint is supposed to
execute to transmit funds to the merchants (and manage refunds). */
SQLEXEC("CREATE TABLE IF NOT EXISTS deposits "
- "(serial_id BIGSERIAL"
+ "(serial_id BIGSERIAL PRIMARY KEY"
",coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)"
",denom_pub BYTEA NOT NULL REFERENCES denominations (pub)"
",denom_sig BYTEA NOT NULL"
@@ -444,10 +444,49 @@ postgres_create_tables (void *cls,
",h_wire BYTEA NOT NULL CHECK (LENGTH(h_wire)=64)"
",coin_sig BYTEA NOT NULL CHECK (LENGTH(coin_sig)=64)"
",wire TEXT NOT NULL"
+ ",tiny BOOLEAN NOT NULL DEFAULT false"
+ ",done BOOLEAN NOT NULL DEFAULT false"
")");
- /* Index for get_deposit statement on coin_pub, transactiojn_id and merchant_pub */
+ /* Index for get_deposit statement on coin_pub, transaction_id and merchant_pub */
SQLEXEC_INDEX("CREATE INDEX deposits_coin_pub_index "
"ON deposits(coin_pub, transaction_id, merchant_pub)");
+ /* Table for the tracking API, mapping from wire transfer identifiers
+ to transactions and back */
+ SQLEXEC("CREATE TABLE IF NOT EXISTS aggregation_tracking "
+ "(h_contract BYTEA CHECK (LENGTH(h_contract)=64)"
+ ",h_wire BYTEA CHECK (LENGTH(h_wire)=64)"
+ ",coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)"
+ ",merchant_pub BYTEA NOT NULL CHECK (LENGTH(merchant_pub)=32)"
+ ",transaction_id INT8 NOT NULL"
+ ",wtid_raw BYTEA NOT NULL CHECK (LENGTH(merchant_pub)=" TALER_WIRE_TRANSFER_IDENTIFIER_LEN_STR ")"
+ ",execution_time INT8 NOT NULL"
+ ",coin_amount_val INT8 NOT NULL"
+ ",coin_amount_frac INT4 NOT NULL"
+ ",coin_amount_curr VARCHAR("TALER_CURRENCY_LEN_STR") NOT NULL"
+ ",coin_fee_val INT8 NOT NULL"
+ ",coin_fee_frac INT4 NOT NULL"
+ ",coin_fee_curr VARCHAR("TALER_CURRENCY_LEN_STR") NOT NULL"
+ ")");
+ /* Index for lookup_transactions statement on wtid */
+ SQLEXEC_INDEX("CREATE INDEX aggregation_tracking_wtid_index "
+ "ON aggregation_tracking(wtid_raw)");
+ /* Index for lookup_deposit_wtid statement */
+ SQLEXEC_INDEX("CREATE INDEX aggregation_tracking_deposit_index "
+ "ON aggregation_tracking(coin_pub,h_contract,h_wire,transaction_id,merchant_pub)");
+
+ /* This table contains the pre-commit data for
+ wire transfers the mint is about to execute. */
+ SQLEXEC("CREATE TABLE IF NOT EXISTS prewire "
+ "(serial_id BIGSERIAL PRIMARY KEY"
+ ",type TEXT NOT NULL"
+ ",finished BOOLEAN NOT NULL DEFAULT false"
+ ",buf BYTEA NOT NULL"
+ ")");
+ /* Index for prepare_data_iterate statement */
+ SQLEXEC_INDEX("CREATE INDEX prepare_iteration_index "
+ "ON prewire(type,finished)");
+
+
#undef SQLEXEC
#undef SQLEXEC_INDEX
@@ -558,6 +597,7 @@ postgres_prepare (PGconn *db_conn)
") VALUES "
"($1, $2, $3, $4, $5);",
5, NULL);
+
/* Used in #postgres_reserves_update() when the reserve is updated */
PREPARE ("reserve_update",
"UPDATE reserves"
@@ -567,6 +607,7 @@ postgres_prepare (PGconn *db_conn)
",current_balance_frac=$3 "
"WHERE current_balance_curr=$4 AND reserve_pub=$5",
5, NULL);
+
/* Used in #postgres_reserves_in_insert() to store transaction details */
PREPARE ("reserves_in_add_transaction",
"INSERT INTO reserves_in "
@@ -579,6 +620,7 @@ postgres_prepare (PGconn *db_conn)
") VALUES "
"($1, $2, $3, $4, $5, $6);",
6, NULL);
+
/* Used in #postgres_get_reserve_history() to obtain inbound transactions
for a reserve */
PREPARE ("reserves_in_get_transactions",
@@ -591,6 +633,7 @@ postgres_prepare (PGconn *db_conn)
" FROM reserves_in"
" WHERE reserve_pub=$1",
1, NULL);
+
/* Used in #postgres_insert_withdraw_info() to store
the signature of a blinded coin with the blinded coin's
details before returning it during /reserve/withdraw. We store
@@ -615,6 +658,7 @@ postgres_prepare (PGconn *db_conn)
") VALUES "
"($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12);",
12, NULL);
+
/* Used in #postgres_get_withdraw_info() to
locate the response for a /reserve/withdraw request
using the hash of the blinded message. Used to
@@ -635,6 +679,7 @@ postgres_prepare (PGconn *db_conn)
" FROM reserves_out"
" WHERE h_blind_ev=$1",
1, NULL);
+
/* Used during #postgres_get_reserve_history() to
obtain all of the /reserve/withdraw operations that
have been performed on a given reserve. (i.e. to
@@ -655,6 +700,7 @@ postgres_prepare (PGconn *db_conn)
" FROM reserves_out"
" WHERE reserve_pub=$1;",
1, NULL);
+
/* Used in #postgres_get_refresh_session() to fetch
high-level information about a refresh session */
PREPARE ("get_refresh_session",
@@ -665,6 +711,7 @@ postgres_prepare (PGconn *db_conn)
" FROM refresh_sessions "
" WHERE session_hash=$1 ",
1, NULL);
+
/* Used in #postgres_create_refresh_session() to store
high-level information about a refresh session */
PREPARE ("insert_refresh_session",
@@ -676,6 +723,7 @@ postgres_prepare (PGconn *db_conn)
") VALUES "
"($1, $2, $3, $4);",
4, NULL);
+
/* Used in #postgres_get_known_coin() to fetch
the denomination public key and signature for
a coin known to the mint. */
@@ -683,9 +731,10 @@ postgres_prepare (PGconn *db_conn)
"SELECT"
" denom_pub"
",denom_sig"
- " FROM known_coins "
+ " FROM known_coins"
" WHERE coin_pub=$1",
1, NULL);
+
/* Used in #postgres_insert_known_coin() to store
the denomination public key and signature for
a coin known to the mint. */
@@ -697,6 +746,7 @@ postgres_prepare (PGconn *db_conn)
") VALUES "
"($1,$2,$3);",
3, NULL);
+
/* Store information about the desired denominations for a
refresh operation, used in #postgres_insert_refresh_order() */
PREPARE ("insert_refresh_order",
@@ -707,6 +757,7 @@ postgres_prepare (PGconn *db_conn)
") VALUES "
"($1, $2, $3);",
3, NULL);
+
/* Obtain information about the desired denominations for a
refresh operation, used in #postgres_get_refresh_order() */
PREPARE ("get_refresh_order",
@@ -732,6 +783,7 @@ postgres_prepare (PGconn *db_conn)
") VALUES "
"($1, $2, $3, $4, $5, $6, $7, $8, $9, $10);",
10, NULL);
+
/* Used in #postgres_get_refresh_melt to obtain information
about melted coins */
PREPARE ("get_refresh_melt",
@@ -747,6 +799,7 @@ postgres_prepare (PGconn *db_conn)
" FROM refresh_melts"
" WHERE session_hash=$1 AND oldcoin_index=$2",
2, NULL);
+
/* Query the 'refresh_melts' by coin public key */
PREPARE ("get_refresh_melt_by_coin",
"SELECT"
@@ -762,6 +815,7 @@ postgres_prepare (PGconn *db_conn)
" FROM refresh_melts"
" WHERE coin_pub=$1",
1, NULL);
+
/* Used in #postgres_insert_refresh_commit_links() to
store commitments */
PREPARE ("insert_refresh_commit_link",
@@ -774,6 +828,7 @@ postgres_prepare (PGconn *db_conn)
") VALUES "
"($1, $2, $3, $4, $5);",
5, NULL);
+
/* Used in #postgres_get_refresh_commit_links() to
retrieve original commitments during /refresh/reveal */
PREPARE ("get_refresh_commit_link",
@@ -783,6 +838,7 @@ postgres_prepare (PGconn *db_conn)
" FROM refresh_commit_link"
" WHERE session_hash=$1 AND cnc_index=$2 AND oldcoin_index=$3",
3, NULL);
+
/* Used in #postgres_insert_refresh_commit_coins() to
store coin commitments. */
PREPARE ("insert_refresh_commit_coin",
@@ -795,6 +851,7 @@ postgres_prepare (PGconn *db_conn)
") VALUES "
"($1, $2, $3, $4, $5);",
5, NULL);
+
/* Used in #postgres_get_refresh_commit_coins() to
retrieve the original coin envelopes, to either be
verified or signed. */
@@ -805,6 +862,7 @@ postgres_prepare (PGconn *db_conn)
" FROM refresh_commit_coin"
" WHERE session_hash=$1 AND cnc_index=$2 AND newcoin_index=$3",
3, NULL);
+
/* Store information about a /deposit the mint is to execute.
Used in #postgres_insert_deposit(). */
PREPARE ("insert_deposit",
@@ -831,6 +889,7 @@ postgres_prepare (PGconn *db_conn)
"($1, $2, $3, $4, $5, $6, $7, $8, $9, $10,"
" $11, $12, $13, $14, $15, $16, $17, $18);",
18, NULL);
+
/* Fetch an existing deposit request, used to ensure idempotency
during /deposit processing. Used in #postgres_have_deposit(). */
PREPARE ("get_deposit",
@@ -851,8 +910,29 @@ postgres_prepare (PGconn *db_conn)
" )",
3, NULL);
- /* Used in #postgres_iterate_deposits() */
- PREPARE ("deposits_iterate",
+ /* Fetch an existing deposit request.
+ Used in #postgres_wire_lookup_deposit_wtid(). */
+ PREPARE ("get_deposit_for_wtid",
+ "SELECT"
+ " amount_with_fee_val"
+ ",amount_with_fee_frac"
+ ",amount_with_fee_curr"
+ ",deposit_fee_val"
+ ",deposit_fee_frac"
+ ",deposit_fee_curr"
+ ",wire_deadline"
+ " FROM deposits"
+ " WHERE ("
+ " (coin_pub=$1) AND"
+ " (transaction_id=$2) AND"
+ " (merchant_pub=$3) AND"
+ " (h_contract=$4) AND"
+ " (h_wire=$5)"
+ " )",
+ 5, NULL);
+
+ /* Used in #postgres_get_ready_deposit() */
+ PREPARE ("deposits_get_ready",
"SELECT"
" serial_id"
",amount_with_fee_val"
@@ -865,11 +945,53 @@ postgres_prepare (PGconn *db_conn)
",transaction_id"
",h_contract"
",wire"
+ ",merchant_pub"
+ ",coin_pub"
" FROM deposits"
- " WHERE serial_id>=$1"
- " ORDER BY serial_id ASC"
- " LIMIT $2;",
- 2, NULL);
+ " WHERE"
+ " tiny=false AND"
+ " done=false"
+ " ORDER BY wire_deadline ASC"
+ " LIMIT 1;",
+ 0, NULL);
+
+ /* Used in #postgres_iterate_matching_deposits() */
+ PREPARE ("deposits_iterate_matching",
+ "SELECT"
+ " serial_id"
+ ",amount_with_fee_val"
+ ",amount_with_fee_frac"
+ ",amount_with_fee_curr"
+ ",deposit_fee_val"
+ ",deposit_fee_frac"
+ ",deposit_fee_curr"
+ ",wire_deadline"
+ ",transaction_id"
+ ",h_contract"
+ ",coin_pub"
+ " FROM deposits"
+ " WHERE"
+ " merchant_pub=$1 AND"
+ " h_wire=$2 AND"
+ " done=false"
+ " ORDER BY wire_deadline ASC"
+ " LIMIT $3",
+ 3, NULL);
+
+ /* Used in #postgres_mark_deposit_tiny() */
+ PREPARE ("mark_deposit_tiny",
+ "UPDATE deposits"
+ " SET tiny=true"
+ " WHERE serial_id=$1",
+ 1, NULL);
+
+ /* Used in #postgres_mark_deposit_done() */
+ PREPARE ("mark_deposit_done",
+ "UPDATE deposits"
+ " SET done=true"
+ " WHERE serial_id=$1",
+ 1, NULL);
+
/* Used in #postgres_get_coin_transactions() to obtain information
about how a coin has been spend with /deposit requests. */
PREPARE ("get_deposit_with_coin_pub",
@@ -904,6 +1026,7 @@ postgres_prepare (PGconn *db_conn)
") VALUES "
"($1, $2, $3)",
3, NULL);
+
/* Used in #postgres_get_link_data_list(). We use the session_hash
to obtain the "noreveal_index" for that session, and then select
the encrypted link vectors (link_vector_enc) and the
@@ -929,6 +1052,7 @@ postgres_prepare (PGconn *db_conn)
" AND ro.newcoin_index=rc.newcoin_index"
" AND rcc.cnc_index=rs.noreveal_index",
1, NULL);
+
/* Used in #postgres_get_transfer(). Given the public key of a
melted coin, we obtain the corresponding encrypted link secret
and the transfer public key. This is done by first finding
@@ -947,6 +1071,97 @@ postgres_prepare (PGconn *db_conn)
" AND rm.oldcoin_index = rcl.oldcoin_index"
" AND rcl.cnc_index=rs.noreveal_index",
1, NULL);
+
+ /* Used in #postgres_lookup_wire_transfer */
+ PREPARE ("lookup_transactions",
+ "SELECT"
+ " h_contract"
+ ",h_wire"
+ ",coin_pub"
+ ",merchant_pub"
+ ",transaction_id"
+ ",execution_time"
+ ",coin_amount_val"
+ ",coin_amount_frac"
+ ",coin_amount_curr"
+ ",coin_fee_val"
+ ",coin_fee_frac"
+ ",coin_fee_curr"
+ " FROM aggregation_tracking"
+ " WHERE wtid_raw=$1",
+ 1, NULL);
+
+ /* Used in #postgres_wire_lookup_deposit_wtid */
+ PREPARE ("lookup_deposit_wtid",
+ "SELECT"
+ " wtid_raw"
+ ",execution_time"
+ ",coin_amount_val"
+ ",coin_amount_frac"
+ ",coin_amount_curr"
+ ",coin_fee_val"
+ ",coin_fee_frac"
+ ",coin_fee_curr"
+ " FROM aggregation_tracking"
+ " WHERE"
+ " coin_pub=$1 AND"
+ " h_contract=$2 AND"
+ " h_wire=$3 AND"
+ " transaction_id=$4 AND"
+ " merchant_pub=$5",
+ 5, NULL);
+
+ /* Used in #postgres_insert_aggregation_tracking */
+ PREPARE ("insert_aggregation_tracking",
+ "INSERT INTO aggregation_tracking "
+ "(h_contract"
+ ",h_wire"
+ ",coin_pub"
+ ",merchant_pub"
+ ",transaction_id"
+ ",wtid_raw"
+ ",execution_time"
+ ",coin_amount_val"
+ ",coin_amount_frac"
+ ",coin_amount_curr"
+ ",coin_fee_val"
+ ",coin_fee_frac"
+ ",coin_fee_curr"
+ ") VALUES "
+ "($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13)",
+ 13, NULL);
+
+
+ /* Used in #postgres_wire_prepare_data_insert() to store
+ wire transfer information before actually committing it with the bank */
+ PREPARE ("wire_prepare_data_insert",
+ "INSERT INTO prewire "
+ "(type"
+ ",buf"
+ ") VALUES "
+ "($1, $2)",
+ 2, NULL);
+
+ /* Used in #postgres_wire_prepare_data_mark_finished() */
+ PREPARE ("wire_prepare_data_mark_done",
+ "UPDATE prewire"
+ " SET finished=true"
+ " WHERE serial_id=$1",
+ 1, NULL);
+
+ /* Used in #postgres_wire_prepare_data_get() */
+ PREPARE ("wire_prepare_data_get",
+ "SELECT"
+ " serial_id"
+ ",buf"
+ " FROM prewire"
+ " WHERE"
+ " type=$1 AND"
+ " finished=false"
+ " ORDER BY serial_id ASC"
+ " LIMIT 1",
+ 1, NULL);
+
return GNUNET_OK;
#undef PREPARE
}
@@ -1925,82 +2140,239 @@ postgres_have_deposit (void *cls,
/**
- * Obtain information about deposits. Iterates over all deposits
- * above a certain ID. Use a @a min_id of 0 to start at the beginning.
- * This operation is executed in its own transaction in transaction
- * mode "REPEATABLE READ", i.e. we should only see valid deposits.
+ * Mark a deposit as tiny, thereby declaring that it cannot be
+ * executed by itself and should no longer be returned by
+ * @e iterate_ready_deposits()
*
* @param cls the @e cls of this struct with the plugin-specific state
* @param session connection to the database
- * @param min_id deposit to start at
- * @param limit maximum number of transactions to fetch
- * @param deposit_cb function to call for each deposit
+ * @param deposit_rowid identifies the deposit row to modify
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
+ */
+static int
+postgres_mark_deposit_tiny (void *cls,
+ struct TALER_MINTDB_Session *session,
+ unsigned long long rowid)
+{
+ uint64_t serial_id = rowid;
+ struct TALER_PQ_QueryParam params[] = {
+ TALER_PQ_query_param_uint64 (&serial_id),
+ TALER_PQ_query_param_end
+ };
+ PGresult *result;
+
+ result = TALER_PQ_exec_prepared (session->conn,
+ "mark_deposit_tiny",
+ params);
+ if (PGRES_COMMAND_OK !=
+ PQresultStatus (result))
+ {
+ BREAK_DB_ERR (result);
+ PQclear (result);
+ return GNUNET_SYSERR;
+ }
+ PQclear (result);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Mark a deposit as done, thereby declaring that it cannot be
+ * executed at all anymore, and should no longer be returned by
+ * @e iterate_ready_deposits() or @e iterate_matching_deposits().
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param session connection to the database
+ * @param deposit_rowid identifies the deposit row to modify
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on error
+ */
+static int
+postgres_mark_deposit_done (void *cls,
+ struct TALER_MINTDB_Session *session,
+ unsigned long long rowid)
+{
+ uint64_t serial_id = rowid;
+ struct TALER_PQ_QueryParam params[] = {
+ TALER_PQ_query_param_uint64 (&serial_id),
+ TALER_PQ_query_param_end
+ };
+ PGresult *result;
+
+ result = TALER_PQ_exec_prepared (session->conn,
+ "mark_deposit_done",
+ params);
+ if (PGRES_COMMAND_OK !=
+ PQresultStatus (result))
+ {
+ BREAK_DB_ERR (result);
+ PQclear (result);
+ return GNUNET_SYSERR;
+ }
+ PQclear (result);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Obtain information about deposits that are ready to be executed.
+ * Such deposits must not be marked as "tiny" or "done", and the
+ * execution time must be in the past.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param session connection to the database
+ * @param deposit_cb function to call for ONE such deposit
* @param deposit_cb_cls closure for @a deposit_cb
* @return number of rows processed, 0 if none exist,
* #GNUNET_SYSERR on error
*/
static int
-postgres_iterate_deposits (void *cls,
- struct TALER_MINTDB_Session *session,
- uint64_t min_id,
- uint32_t limit,
- TALER_MINTDB_DepositIterator deposit_cb,
- void *deposit_cb_cls)
+postgres_get_ready_deposit (void *cls,
+ struct TALER_MINTDB_Session *session,
+ TALER_MINTDB_DepositIterator deposit_cb,
+ void *deposit_cb_cls)
{
struct TALER_PQ_QueryParam params[] = {
- TALER_PQ_query_param_uint64 (&min_id),
- TALER_PQ_query_param_uint32 (&limit),
TALER_PQ_query_param_end
};
PGresult *result;
- unsigned int i;
unsigned int n;
+ int ret;
- if (GNUNET_OK !=
- postgres_start (cls, session))
- return GNUNET_SYSERR;
- result = PQexec (session->conn,
- "SET TRANSACTION REPEATABLE READ");
- if (PGRES_COMMAND_OK !=
+ result = TALER_PQ_exec_prepared (session->conn,
+ "deposits_get_ready",
+ params);
+ if (PGRES_TUPLES_OK !=
PQresultStatus (result))
{
- TALER_LOG_ERROR ("Failed to set transaction to REPEATABL EREAD: %s\n",
- PQresultErrorMessage (result));
- GNUNET_break (0);
+ BREAK_DB_ERR (result);
PQclear (result);
return GNUNET_SYSERR;
}
+ if (0 == (n = PQntuples (result)))
+ {
+ PQclear (result);
+ return 0;
+ }
+ GNUNET_break (1 == n);
+ {
+ struct TALER_Amount amount_with_fee;
+ struct TALER_Amount deposit_fee;
+ struct GNUNET_TIME_Absolute wire_deadline;
+ struct GNUNET_HashCode h_contract;
+ struct TALER_MerchantPublicKeyP merchant_pub;
+ struct TALER_CoinSpendPublicKeyP coin_pub;
+ uint64_t transaction_id;
+ uint64_t serial_id;
+ json_t *wire;
+ struct TALER_PQ_ResultSpec rs[] = {
+ TALER_PQ_result_spec_uint64 ("serial_id",
+ &serial_id),
+ TALER_PQ_result_spec_uint64 ("transaction_id",
+ &transaction_id),
+ TALER_PQ_result_spec_amount ("amount_with_fee",
+ &amount_with_fee),
+ TALER_PQ_result_spec_amount ("deposit_fee",
+ &deposit_fee),
+ TALER_PQ_result_spec_absolute_time ("wire_deadline",
+ &wire_deadline),
+ TALER_PQ_result_spec_auto_from_type ("h_contract",
+ &h_contract),
+ TALER_PQ_result_spec_auto_from_type ("merchant_pub",
+ &merchant_pub),
+ TALER_PQ_result_spec_auto_from_type ("coin_pub",
+ &coin_pub),
+ TALER_PQ_result_spec_json ("wire",
+ &wire),
+ TALER_PQ_result_spec_end
+ };
+ if (GNUNET_OK !=
+ TALER_PQ_extract_result (result, rs, 0))
+ {
+ GNUNET_break (0);
+ PQclear (result);
+ return GNUNET_SYSERR;
+ }
+ ret = deposit_cb (deposit_cb_cls,
+ serial_id,
+ &merchant_pub,
+ &coin_pub,
+ &amount_with_fee,
+ &deposit_fee,
+ transaction_id,
+ &h_contract,
+ wire_deadline,
+ wire);
+ TALER_PQ_cleanup_result (rs);
+ PQclear (result);
+ }
+ return (GNUNET_OK == ret) ? 1 : 0;
+}
+
+
+/**
+ * Obtain information about other pending deposits for the same
+ * destination. Those deposits must not already be "done".
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param session connection to the database
+ * @param h_wire destination of the wire transfer
+ * @param merchant_pub public key of the merchant
+ * @param deposit_cb function to call for each deposit
+ * @param deposit_cb_cls closure for @a deposit_cb
+ * @param limit maximum number of matching deposits to return
+ * @return number of rows processed, 0 if none exist,
+ * #GNUNET_SYSERR on error
+ */
+static int
+postgres_iterate_matching_deposits (void *cls,
+ struct TALER_MINTDB_Session *session,
+ const struct GNUNET_HashCode *h_wire,
+ const struct TALER_MerchantPublicKeyP *merchant_pub,
+ TALER_MINTDB_DepositIterator deposit_cb,
+ void *deposit_cb_cls,
+ uint32_t limit)
+{
+ struct TALER_PQ_QueryParam params[] = {
+ TALER_PQ_query_param_auto_from_type (merchant_pub),
+ TALER_PQ_query_param_auto_from_type (h_wire),
+ TALER_PQ_query_param_uint32 (&limit),
+ TALER_PQ_query_param_end
+ };
+ PGresult *result;
+ unsigned int i;
+ unsigned int n;
result = TALER_PQ_exec_prepared (session->conn,
- "deposits_iterate",
+ "deposits_iterate_matching",
params);
if (PGRES_TUPLES_OK !=
PQresultStatus (result))
{
BREAK_DB_ERR (result);
PQclear (result);
- postgres_rollback (cls, session);
return GNUNET_SYSERR;
}
if (0 == (n = PQntuples (result)))
{
PQclear (result);
- postgres_rollback (cls, session);
return 0;
}
+ if (n > limit)
+ n = limit;
for (i=0;i<n;i++)
{
struct TALER_Amount amount_with_fee;
struct TALER_Amount deposit_fee;
struct GNUNET_TIME_Absolute wire_deadline;
struct GNUNET_HashCode h_contract;
- json_t *wire;
+ struct TALER_MerchantPublicKeyP merchant_pub;
+ struct TALER_CoinSpendPublicKeyP coin_pub;
uint64_t transaction_id;
- uint64_t id;
+ uint64_t serial_id;
int ret;
struct TALER_PQ_ResultSpec rs[] = {
- TALER_PQ_result_spec_uint64 ("id",
- &id),
+ TALER_PQ_result_spec_uint64 ("serial_id",
+ &serial_id),
TALER_PQ_result_spec_uint64 ("transaction_id",
&transaction_id),
TALER_PQ_result_spec_amount ("amount_with_fee",
@@ -2011,8 +2383,10 @@ postgres_iterate_deposits (void *cls,
&wire_deadline),
TALER_PQ_result_spec_auto_from_type ("h_contract",
&h_contract),
- TALER_PQ_result_spec_json ("wire",
- &wire),
+ TALER_PQ_result_spec_auto_from_type ("merchant_pub",
+ &merchant_pub),
+ TALER_PQ_result_spec_auto_from_type ("coin_pub",
+ &coin_pub),
TALER_PQ_result_spec_end
};
if (GNUNET_OK !=
@@ -2020,23 +2394,23 @@ postgres_iterate_deposits (void *cls,
{
GNUNET_break (0);
PQclear (result);
- postgres_rollback (cls, session);
return GNUNET_SYSERR;
}
ret = deposit_cb (deposit_cb_cls,
- id,
+ serial_id,
+ &merchant_pub,
+ &coin_pub,
&amount_with_fee,
&deposit_fee,
transaction_id,
&h_contract,
wire_deadline,
- wire);
+ NULL);
TALER_PQ_cleanup_result (rs);
PQclear (result);
if (GNUNET_OK != ret)
break;
}
- postgres_rollback (cls, session);
return i;
}
@@ -3365,11 +3739,101 @@ postgres_get_coin_transactions (void *cls,
/**
+ * Lookup the list of Taler transactions that was aggregated
+ * into a wire transfer by the respective @a wtid.
+ *
+ * @param cls closure
+ * @param session database connection
+ * @param wtid the raw wire transfer identifier we used
+ * @param cb function to call on each transaction found
+ * @param cb_cls closure for @a cb
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on database errors,
+ * #GNUNET_NO if we found no results
+ */
+static int
+postgres_lookup_wire_transfer (void *cls,
+ struct TALER_MINTDB_Session *session,
+ const struct TALER_WireTransferIdentifierRawP *wtid,
+ TALER_MINTDB_WireTransferDataCallback cb,
+ void *cb_cls)
+{
+ PGresult *result;
+ struct TALER_PQ_QueryParam params[] = {
+ TALER_PQ_query_param_auto_from_type (wtid),
+ TALER_PQ_query_param_end
+ };
+ int nrows;
+ int i;
+
+ /* check if the melt record exists and get it */
+ result = TALER_PQ_exec_prepared (session->conn,
+ "lookup_transactions",
+ params);
+ if (PGRES_TUPLES_OK != PQresultStatus (result))
+ {
+ BREAK_DB_ERR (result);
+ PQclear (result);
+ return GNUNET_SYSERR;
+ }
+ nrows = PQntuples (result);
+ if (0 == nrows)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "lookup_wire_transfer() returned 0 matching rows\n");
+ PQclear (result);
+ return GNUNET_NO;
+ }
+ for (i=0;i<nrows;i++)
+ {
+ struct GNUNET_HashCode h_contract;
+ struct GNUNET_HashCode h_wire;
+ struct TALER_CoinSpendPublicKeyP coin_pub;
+ struct TALER_MerchantPublicKeyP merchant_pub;
+ uint64_t transaction_id;
+ struct GNUNET_TIME_Absolute exec_time;
+ struct TALER_Amount coin_amount;
+ struct TALER_Amount coin_fee;
+ struct TALER_Amount transfer_amount;
+ struct TALER_PQ_ResultSpec rs[] = {
+ TALER_PQ_result_spec_auto_from_type ("h_contract", &h_contract),
+ TALER_PQ_result_spec_auto_from_type ("h_wire", &h_wire),
+ TALER_PQ_result_spec_auto_from_type ("coin_pub", &coin_pub),
+ TALER_PQ_result_spec_auto_from_type ("merchant_pub", &merchant_pub),
+ TALER_PQ_result_spec_uint64 ("transaction_id", &transaction_id),
+ TALER_PQ_result_spec_absolute_time ("execution_time", &exec_time),
+ TALER_PQ_result_spec_amount ("coin_amount", &coin_amount),
+ TALER_PQ_result_spec_amount ("coin_fee", &coin_fee),
+ TALER_PQ_result_spec_amount ("transfer_total", &transfer_amount),
+ TALER_PQ_result_spec_end
+ };
+ if (GNUNET_OK != TALER_PQ_extract_result (result, rs, i))
+ {
+ GNUNET_break (0);
+ PQclear (result);
+ return GNUNET_SYSERR;
+ }
+ cb (cb_cls,
+ &merchant_pub,
+ &h_wire,
+ &h_contract,
+ transaction_id,
+ &coin_pub,
+ &coin_amount,
+ &coin_fee,
+ &transfer_amount);
+ }
+ PQclear (result);
+ return GNUNET_OK;
+}
+
+
+/**
* Try to find the wire transfer details for a deposit operation.
* If we did not execute the deposit yet, return when it is supposed
* to be executed.
- *
+ *
* @param cls closure
+ * @param session database connection
* @param h_contract hash of the contract
* @param h_wire hash of merchant wire details
* @param coin_pub public key of deposited coin
@@ -3377,10 +3841,12 @@ postgres_get_coin_transactions (void *cls,
* @param transaction_id transaction identifier
* @param cb function to call with the result
* @param cb_cls closure to pass to @a cb
- * @return #GNUNET_OK on success, #GNUNET_SYSERR on DB errors
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on DB errors,
+ * #GNUNET_NO if nothing was found
*/
static int
postgres_wire_lookup_deposit_wtid (void *cls,
+ struct TALER_MINTDB_Session *session,
const struct GNUNET_HashCode *h_contract,
const struct GNUNET_HashCode *h_wire,
const struct TALER_CoinSpendPublicKeyP *coin_pub,
@@ -3389,8 +3855,341 @@ postgres_wire_lookup_deposit_wtid (void *cls,
TALER_MINTDB_DepositWtidCallback cb,
void *cb_cls)
{
- GNUNET_break (0); // not implemented
- return GNUNET_SYSERR;
+ PGresult *result;
+ struct TALER_PQ_QueryParam params[] = {
+ TALER_PQ_query_param_auto_from_type (coin_pub),
+ TALER_PQ_query_param_auto_from_type (h_contract),
+ TALER_PQ_query_param_auto_from_type (h_wire),
+ TALER_PQ_query_param_uint64 (&transaction_id),
+ TALER_PQ_query_param_auto_from_type (merchant_pub),
+ TALER_PQ_query_param_end
+ };
+ int nrows;
+
+ /* check if the melt record exists and get it */
+ result = TALER_PQ_exec_prepared (session->conn,
+ "lookup_deposit_wtid",
+ params);
+ if (PGRES_TUPLES_OK != PQresultStatus (result))
+ {
+ BREAK_DB_ERR (result);
+ PQclear (result);
+ return GNUNET_SYSERR;
+ }
+ nrows = PQntuples (result);
+ if (0 == nrows)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "lookup_deposit_wtid returned 0 matching rows\n");
+ PQclear (result);
+
+ /* Check if transaction exists in deposits, so that we just
+ do not have a WTID yet, if so, do call the CB with a NULL wtid
+ and return GNUNET_YES! */
+ {
+ struct TALER_PQ_QueryParam params2[] = {
+ TALER_PQ_query_param_auto_from_type (coin_pub),
+ TALER_PQ_query_param_uint64 (&transaction_id),
+ TALER_PQ_query_param_auto_from_type (merchant_pub),
+ TALER_PQ_query_param_auto_from_type (h_contract),
+ TALER_PQ_query_param_auto_from_type (h_wire),
+ TALER_PQ_query_param_end
+ };
+
+ result = TALER_PQ_exec_prepared (session->conn,
+ "get_deposit_for_wtid",
+ params2);
+ if (PGRES_TUPLES_OK != PQresultStatus (result))
+ {
+ BREAK_DB_ERR (result);
+ PQclear (result);
+ return GNUNET_SYSERR;
+ }
+ }
+ nrows = PQntuples (result);
+ if (0 == nrows)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "get_deposit_for_wtid returned 0 matching rows\n");
+ PQclear (result);
+ return GNUNET_NO;
+ }
+
+ /* Ok, we're aware of the transaction, but it has not yet been
+ executed */
+ {
+ struct GNUNET_TIME_Absolute exec_time;
+ struct TALER_Amount coin_amount;
+ struct TALER_Amount coin_fee;
+ struct TALER_PQ_ResultSpec rs[] = {
+ TALER_PQ_result_spec_amount ("amount_with_fee", &coin_amount),
+ TALER_PQ_result_spec_amount ("deposit_fee", &coin_fee),
+ TALER_PQ_result_spec_absolute_time ("wire_deadline", &exec_time),
+ TALER_PQ_result_spec_end
+ };
+
+ if (GNUNET_OK != TALER_PQ_extract_result (result, rs, 0))
+ {
+ GNUNET_break (0);
+ PQclear (result);
+ return GNUNET_SYSERR;
+ }
+ cb (cb_cls,
+ NULL,
+ &coin_amount,
+ &coin_fee,
+ exec_time);
+ PQclear (result);
+ return GNUNET_YES;
+ }
+ }
+ if (1 != nrows)
+ {
+ GNUNET_break (0);
+ PQclear (result);
+ return GNUNET_SYSERR;
+ }
+ {
+ struct TALER_WireTransferIdentifierRawP wtid;
+ struct GNUNET_TIME_Absolute exec_time;
+ struct TALER_Amount coin_amount;
+ struct TALER_Amount coin_fee;
+ struct TALER_PQ_ResultSpec rs[] = {
+ TALER_PQ_result_spec_auto_from_type ("wtid_raw", &wtid),
+ TALER_PQ_result_spec_absolute_time ("execution_time", &exec_time),
+ TALER_PQ_result_spec_amount ("coin_amount", &coin_amount),
+ TALER_PQ_result_spec_amount ("coin_fee", &coin_fee),
+ TALER_PQ_result_spec_end
+ };
+ if (GNUNET_OK != TALER_PQ_extract_result (result, rs, 0))
+ {
+ GNUNET_break (0);
+ PQclear (result);
+ return GNUNET_SYSERR;
+ }
+ cb (cb_cls,
+ &wtid,
+ &coin_amount,
+ &coin_fee,
+ exec_time);
+ }
+ PQclear (result);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Function called to insert aggregation information into the DB.
+ *
+ * @param cls closure
+ * @param session database connection
+ * @param wtid the raw wire transfer identifier we used
+ * @param merchant_pub public key of the merchant (should be same for all callbacks with the same @e cls)
+ * @param h_wire hash of wire transfer details of the merchant (should be same for all callbacks with the same @e cls)
+ * @param h_contract which contract was this payment about
+ * @param transaction_id merchant's transaction ID for the payment
+ * @param coin_pub which public key was this payment about
+ * @param coin_value amount contributed by this coin in total
+ * @param coin_fee deposit fee charged by mint for this coin
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on DB errors
+ */
+static int
+postgres_insert_aggregation_tracking (void *cls,
+ struct TALER_MINTDB_Session *session,
+ const struct TALER_WireTransferIdentifierRawP *wtid,
+ const struct TALER_MerchantPublicKeyP *merchant_pub,
+ const struct GNUNET_HashCode *h_wire,
+ const struct GNUNET_HashCode *h_contract,
+ uint64_t transaction_id,
+ struct GNUNET_TIME_Absolute execution_time,
+ const struct TALER_CoinSpendPublicKeyP *coin_pub,
+ const struct TALER_Amount *coin_value,
+ const struct TALER_Amount *coin_fee)
+{
+ struct TALER_PQ_QueryParam params[] = {
+ TALER_PQ_query_param_auto_from_type (h_contract),
+ TALER_PQ_query_param_auto_from_type (h_wire),
+ TALER_PQ_query_param_auto_from_type (coin_pub),
+ TALER_PQ_query_param_auto_from_type (merchant_pub),
+ TALER_PQ_query_param_uint64 (&transaction_id),
+ TALER_PQ_query_param_auto_from_type (wtid),
+ TALER_PQ_query_param_absolute_time (&execution_time),
+ TALER_PQ_query_param_amount (coin_value),
+ TALER_PQ_query_param_amount (coin_fee),
+ TALER_PQ_query_param_end
+ };
+ PGresult *result;
+
+ result = TALER_PQ_exec_prepared (session->conn,
+ "insert_aggregation_tracking",
+ params);
+ if (PGRES_COMMAND_OK != PQresultStatus (result))
+ {
+ BREAK_DB_ERR (result);
+ PQclear (result);
+ return GNUNET_SYSERR;
+ }
+ if (0 != strcmp ("1", PQcmdTuples (result)))
+ {
+ GNUNET_break (0);
+ PQclear (result);
+ return GNUNET_SYSERR;
+ }
+ PQclear (result);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Function called to insert wire transfer commit data into the DB.
+ *
+ * @param cls closure
+ * @param session database connection
+ * @param type type of the wire transfer (i.e. "sepa")
+ * @param buf buffer with wire transfer preparation data
+ * @param buf_size number of bytes in @a buf
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on DB errors
+ */
+static int
+postgres_wire_prepare_data_insert (void *cls,
+ struct TALER_MINTDB_Session *session,
+ const char *type,
+ const char *buf,
+ size_t buf_size)
+{
+ PGresult *result;
+ struct TALER_PQ_QueryParam params[] = {
+ TALER_PQ_query_param_fixed_size (type, strlen (type) + 1),
+ TALER_PQ_query_param_fixed_size (buf, buf_size),
+ TALER_PQ_query_param_end
+ };
+
+ result = TALER_PQ_exec_prepared (session->conn,
+ "wire_prepare_data_insert",
+ params);
+ if (PGRES_COMMAND_OK != PQresultStatus (result))
+ {
+ BREAK_DB_ERR (result);
+ PQclear (result);
+ return GNUNET_SYSERR;
+ }
+ PQclear (result);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Function called to mark wire transfer commit data as finished.
+ *
+ * @param cls closure
+ * @param session database connection
+ * @param rowid which entry to mark as finished
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR on DB errors
+ */
+static int
+postgres_wire_prepare_data_mark_finished (void *cls,
+ struct TALER_MINTDB_Session *session,
+ unsigned long long rowid)
+{
+ uint64_t serial_id = rowid;
+ struct TALER_PQ_QueryParam params[] = {
+ TALER_PQ_query_param_uint64 (&serial_id),
+ TALER_PQ_query_param_end
+ };
+ PGresult *result;
+
+ result = TALER_PQ_exec_prepared (session->conn,
+ "wire_prepare_data_mark_done",
+ params);
+ if (PGRES_COMMAND_OK !=
+ PQresultStatus (result))
+ {
+ BREAK_DB_ERR (result);
+ PQclear (result);
+ return GNUNET_SYSERR;
+ }
+ PQclear (result);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Function called to get an unfinished wire transfer
+ * preparation data. Fetches at most one item.
+ *
+ * @param cls closure
+ * @param session database connection
+ * @param type type fo the wire transfer (i.e. "sepa")
+ * @param cb function to call for ONE unfinished item
+ * @param cb_cls closure for @a cb
+ * @return #GNUNET_OK on success,
+ * #GNUNET_NO if there are no entries,
+ * #GNUNET_SYSERR on DB errors
+ */
+static int
+postgres_wire_prepare_data_get (void *cls,
+ struct TALER_MINTDB_Session *session,
+ const char *type,
+ TALER_MINTDB_WirePreparationCallback cb,
+ void *cb_cls)
+{
+ PGresult *result;
+ struct TALER_PQ_QueryParam params[] = {
+ TALER_PQ_query_param_fixed_size (type, strlen (type) + 1),
+ TALER_PQ_query_param_end
+ };
+
+ result = TALER_PQ_exec_prepared (session->conn,
+ "wire_prepare_data_get",
+ params);
+ if (PGRES_TUPLES_OK != PQresultStatus (result))
+ {
+ QUERY_ERR (result);
+ PQclear (result);
+ return GNUNET_SYSERR;
+ }
+ if (0 == PQntuples (result))
+ {
+ PQclear (result);
+ return GNUNET_NO;
+ }
+ if (1 != PQntuples (result))
+ {
+ GNUNET_break (0);
+ PQclear (result);
+ return GNUNET_SYSERR;
+ }
+
+ {
+ uint64_t serial_id;
+ void *buf = NULL;
+ size_t buf_size;
+ struct TALER_PQ_ResultSpec rs[] = {
+ TALER_PQ_result_spec_uint64 ("serial_id",
+ &serial_id),
+ TALER_PQ_result_spec_variable_size ("buf",
+ &buf,
+ &buf_size),
+ TALER_PQ_result_spec_end
+ };
+
+ if (GNUNET_OK !=
+ TALER_PQ_extract_result (result,
+ rs,
+ 0))
+ {
+ GNUNET_break (0);
+ PQclear (result);
+ return GNUNET_SYSERR;
+ }
+ cb (cb_cls,
+ serial_id,
+ buf,
+ buf_size);
+ TALER_PQ_cleanup_result (rs);
+ }
+ PQclear (result);
+ return GNUNET_OK;
}
@@ -3413,6 +4212,7 @@ libtaler_plugin_mintdb_postgres_init (void *cls)
&db_conn_destroy))
{
TALER_LOG_ERROR ("Cannnot create pthread key.\n");
+ GNUNET_free (pg);
return NULL;
}
if (GNUNET_OK !=
@@ -3422,8 +4222,9 @@ libtaler_plugin_mintdb_postgres_init (void *cls)
&pg->connection_cfg_str))
{
GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
- "mint",
+ "mintdb-postgres",
"db_conn_str");
+ GNUNET_free (pg);
return NULL;
}
plugin = GNUNET_new (struct TALER_MINTDB_Plugin);
@@ -3443,9 +4244,11 @@ libtaler_plugin_mintdb_postgres_init (void *cls)
plugin->get_reserve_history = &postgres_get_reserve_history;
plugin->free_reserve_history = &common_free_reserve_history;
plugin->have_deposit = &postgres_have_deposit;
- plugin->iterate_deposits = &postgres_iterate_deposits;
+ plugin->mark_deposit_tiny = &postgres_mark_deposit_tiny;
+ plugin->mark_deposit_done = &postgres_mark_deposit_done;
+ plugin->get_ready_deposit = &postgres_get_ready_deposit;
+ plugin->iterate_matching_deposits = &postgres_iterate_matching_deposits;
plugin->insert_deposit = &postgres_insert_deposit;
-
plugin->get_refresh_session = &postgres_get_refresh_session;
plugin->create_refresh_session = &postgres_create_refresh_session;
plugin->insert_refresh_melt = &postgres_insert_refresh_melt;
@@ -3456,7 +4259,6 @@ libtaler_plugin_mintdb_postgres_init (void *cls)
plugin->get_refresh_commit_coins = &postgres_get_refresh_commit_coins;
plugin->insert_refresh_commit_links = &postgres_insert_refresh_commit_links;
plugin->get_refresh_commit_links = &postgres_get_refresh_commit_links;
-
plugin->get_melt_commitment = &postgres_get_melt_commitment;
plugin->free_melt_commitment = &common_free_melt_commitment;
plugin->insert_refresh_out = &postgres_insert_refresh_out;
@@ -3465,7 +4267,12 @@ libtaler_plugin_mintdb_postgres_init (void *cls)
plugin->get_transfer = &postgres_get_transfer;
plugin->get_coin_transactions = &postgres_get_coin_transactions;
plugin->free_coin_transaction_list = &common_free_coin_transaction_list;
+ plugin->lookup_wire_transfer = &postgres_lookup_wire_transfer;
plugin->wire_lookup_deposit_wtid = &postgres_wire_lookup_deposit_wtid;
+ plugin->insert_aggregation_tracking = &postgres_insert_aggregation_tracking;
+ plugin->wire_prepare_data_insert = &postgres_wire_prepare_data_insert;
+ plugin->wire_prepare_data_mark_finished = &postgres_wire_prepare_data_mark_finished;
+ plugin->wire_prepare_data_get = &postgres_wire_prepare_data_get;
return plugin;
}