From 48c2edc28dc54c21ecb30b9aec31a3699c3f9bd0 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 27 Jan 2016 16:42:24 +0100 Subject: working on mintdb for #4141 --- src/include/taler_mintdb_plugin.h | 54 +++---- src/mint/taler-mint-aggregator.c | 21 +-- src/mintdb/plugin_mintdb_postgres.c | 295 ++++++++++++++++++++++++++++++------ 3 files changed, 278 insertions(+), 92 deletions(-) (limited to 'src') diff --git a/src/include/taler_mintdb_plugin.h b/src/include/taler_mintdb_plugin.h index 65c694a72..11b305839 100644 --- a/src/include/taler_mintdb_plugin.h +++ b/src/include/taler_mintdb_plugin.h @@ -530,22 +530,24 @@ struct TALER_MINTDB_Session; * corresponding wire transaction. * * @param cls closure - * @param id transaction ID (used as future `min_id` to avoid - * iterating over transactions more than once) + * @param rowid unique ID for the deposit in our DB, used for marking + * it as 'tiny' or 'done' + * @param merchant_pub public key of the merchant + * @param coin_pub public key of the coin * @param amount_with_fee amount that was deposited including fee * @param deposit_fee amount the mint gets to keep as transaction fees * @param transaction_id unique transaction ID chosen by the merchant * @param h_contract hash of the contract between merchant and customer * @param wire_deadline by which the merchant adviced that he would like the * wire transfer to be executed - * @param wire wire details for the merchant + * @param wire wire details for the merchant, NULL from iterate_matching_deposits() * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop */ typedef int (*TALER_MINTDB_DepositIterator)(void *cls, - // unsigned long long rowid, /* ? */ - // May also need/want Merchant pub!? - uint64_t id, + unsigned long long rowid, + const struct TALER_MerchantPublicKeyP *merchant_pub, + const struct TALER_CoinSpendPublicKeyP *coin_pub, const struct TALER_Amount *amount_with_fee, const struct TALER_Amount *deposit_fee, uint64_t transaction_id, @@ -940,10 +942,10 @@ struct TALER_MINTDB_Plugin * #GNUNET_SYSERR on error */ int - (*iterate_ready_deposits) (void *cls, - struct TALER_MINTDB_Session *session, - TALER_MINTDB_DepositIterator deposit_cb, - void *deposit_cb_cls); + (*get_ready_deposit) (void *cls, + struct TALER_MINTDB_Session *session, + TALER_MINTDB_DepositIterator deposit_cb, + void *deposit_cb_cls); /** @@ -953,9 +955,10 @@ struct TALER_MINTDB_Plugin * @param cls the @e cls of this struct with the plugin-specific state * @param session connection to the database * @param h_wire destination of the wire transfer - * @param FIXME: do we also need merchant_pub here? + * @param merchant_pub public key of the merchant * @param deposit_cb function to call for each deposit * @param deposit_cb_cls closure for @a deposit_cb + * @param limit maximum number of matching deposits to return * @return number of rows processed, 0 if none exist, * #GNUNET_SYSERR on error */ @@ -963,33 +966,10 @@ struct TALER_MINTDB_Plugin (*iterate_matching_deposits) (void *cls, struct TALER_MINTDB_Session *session, const struct GNUNET_HashCode *h_wire, + const struct TALER_MerchantPublicKeyP *merchant_pub, TALER_MINTDB_DepositIterator deposit_cb, - void *deposit_cb_cls); - - - /** - * Obtain information about deposits. Iterates over all deposits - * above a certain ID. Use a @a min_id of 0 to start at the beginning. - * This operation is executed in its own transaction in transaction - * mode "REPEATABLE READ", i.e. we should only see valid deposits. - * - * @param cls the @e cls of this struct with the plugin-specific state - * @param session connection to the database - * @param min_id deposit to start at - * @param limit maximum number of transactions to fetch - * @param deposit_cb function to call for each deposit - * @param deposit_cb_cls closure for @a deposit_cb - * @return number of rows processed, 0 if none exist, - * #GNUNET_SYSERR on error - * @deprecated this is likely dead - */ - int - (*iterate_deposits) (void *cls, - struct TALER_MINTDB_Session *session, - uint64_t min_id, - uint32_t limit, - TALER_MINTDB_DepositIterator deposit_cb, - void *deposit_cb_cls); + void *deposit_cb_cls, + uint32_t limit); /** diff --git a/src/mint/taler-mint-aggregator.c b/src/mint/taler-mint-aggregator.c index 70a68d007..ee0f6ab22 100644 --- a/src/mint/taler-mint-aggregator.c +++ b/src/mint/taler-mint-aggregator.c @@ -148,8 +148,9 @@ mint_serve_process_config (const char *mint_directory) * with the goal of executing the corresponding wire transaction. * * @param cls closure - * @param id transaction ID (used as future `min_id` to avoid - * iterating over transactions more than once) + * @param row_id identifies database entry + * @param merchant_pub public key of the merchant + * @param coin_pub public key of the coin * @param amount_with_fee amount that was deposited including fee * @param deposit_fee amount the mint gets to keep as transaction fees * @param transaction_id unique transaction ID chosen by the merchant @@ -161,7 +162,9 @@ mint_serve_process_config (const char *mint_directory) */ static int deposit_cb (void *cls, - uint64_t id, + unsigned long long row_id, + const struct TALER_MerchantPublicKeyP *merchant_pub, + const struct TALER_CoinSpendPublicKeyP *coin_pub, const struct TALER_Amount *amount_with_fee, const struct TALER_Amount *deposit_fee, uint64_t transaction_id, @@ -204,12 +207,12 @@ run (void *cls, *global_ret = GNUNET_SYSERR; return; } - ret = db_plugin->iterate_deposits (db_plugin->cls, - session, - 0 /* FIXME: remove? */, - 128 /* FIXME: make configurable? */, - &deposit_cb, - NULL); + ret = db_plugin->get_ready_deposit (db_plugin->cls, + session, + &deposit_cb, + NULL); + // FIXME: handle 0 == ret... + if (GNUNET_OK != ret) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, diff --git a/src/mintdb/plugin_mintdb_postgres.c b/src/mintdb/plugin_mintdb_postgres.c index e9a9466b2..5ebf8dc7b 100644 --- a/src/mintdb/plugin_mintdb_postgres.c +++ b/src/mintdb/plugin_mintdb_postgres.c @@ -425,7 +425,7 @@ postgres_create_tables (void *cls, /* This table contains the wire transfers the mint is supposed to execute to transmit funds to the merchants (and manage refunds). */ SQLEXEC("CREATE TABLE IF NOT EXISTS deposits " - "(serial_id BIGSERIAL" + "(serial_id BIGSERIAL PRIMARY KEY" ",coin_pub BYTEA NOT NULL CHECK (LENGTH(coin_pub)=32)" ",denom_pub BYTEA NOT NULL REFERENCES denominations (pub)" ",denom_sig BYTEA NOT NULL" @@ -444,6 +444,8 @@ postgres_create_tables (void *cls, ",h_wire BYTEA NOT NULL CHECK (LENGTH(h_wire)=64)" ",coin_sig BYTEA NOT NULL CHECK (LENGTH(coin_sig)=64)" ",wire TEXT NOT NULL" + ",tiny BOOLEAN NOT NULL DEFAULT false" + ",done BOOLEAN NOT NULL DEFAULT false" ")"); /* Index for get_deposit statement on coin_pub, transaction_id and merchant_pub */ SQLEXEC_INDEX("CREATE INDEX deposits_coin_pub_index " @@ -899,8 +901,8 @@ postgres_prepare (PGconn *db_conn) " )", 5, NULL); - /* Used in #postgres_iterate_deposits() */ - PREPARE ("deposits_iterate", + /* Used in #postgres_get_ready_deposit() */ + PREPARE ("deposits_get_ready", "SELECT" " serial_id" ",amount_with_fee_val" @@ -913,11 +915,50 @@ postgres_prepare (PGconn *db_conn) ",transaction_id" ",h_contract" ",wire" + ",merchant_pub" + ",coin_pub" " FROM deposits" - " WHERE serial_id>=$1" - " ORDER BY serial_id ASC" - " LIMIT $2;", - 2, NULL); + " WHERE" + " tiny=false AND" + " done=false" + " ORDER BY execution_time ASC" + " LIMIT 1;", + 0, NULL); + /* Used in #postgres_iterate_matching_deposits() */ + PREPARE ("deposits_iterate_matching", + "SELECT" + " serial_id" + ",amount_with_fee_val" + ",amount_with_fee_frac" + ",amount_with_fee_curr" + ",deposit_fee_val" + ",deposit_fee_frac" + ",deposit_fee_curr" + ",wire_deadline" + ",transaction_id" + ",h_contract" + ",coin_pub" + " FROM deposits" + " WHERE" + " merchant_pub=$1 AND" + " h_wire=$2 AND" + " done=false" + " ORDER BY execution_time ASC" + " LIMIT $3", + 3, NULL); + /* Used in #postgres_mark_deposit_tiny() */ + PREPARE ("mark_deposit_tiny", + "UPDATE deposits" + " SET tiny=true" + " WHERE serial_id=$1", + 1, NULL); + /* Used in #postgres_mark_deposit_done() */ + PREPARE ("mark_deposit_done", + "UPDATE deposits" + " SET done=true" + " WHERE serial_id=$1", + 1, NULL); + /* Used in #postgres_get_coin_transactions() to obtain information about how a coin has been spend with /deposit requests. */ PREPARE ("get_deposit_with_coin_pub", @@ -2039,82 +2080,239 @@ postgres_have_deposit (void *cls, /** - * Obtain information about deposits. Iterates over all deposits - * above a certain ID. Use a @a min_id of 0 to start at the beginning. - * This operation is executed in its own transaction in transaction - * mode "REPEATABLE READ", i.e. we should only see valid deposits. + * Mark a deposit as tiny, thereby declaring that it cannot be + * executed by itself and should no longer be returned by + * @e iterate_ready_deposits() * * @param cls the @e cls of this struct with the plugin-specific state * @param session connection to the database - * @param min_id deposit to start at - * @param limit maximum number of transactions to fetch - * @param deposit_cb function to call for each deposit + * @param deposit_rowid identifies the deposit row to modify + * @return #GNUNET_OK on success, #GNUNET_SYSERR on error + */ +static int +postgres_mark_deposit_tiny (void *cls, + struct TALER_MINTDB_Session *session, + unsigned long long rowid) +{ + uint64_t serial_id = rowid; + struct TALER_PQ_QueryParam params[] = { + TALER_PQ_query_param_uint64 (&serial_id), + TALER_PQ_query_param_end + }; + PGresult *result; + + result = TALER_PQ_exec_prepared (session->conn, + "mark_deposit_tiny", + params); + if (PGRES_COMMAND_OK != + PQresultStatus (result)) + { + BREAK_DB_ERR (result); + PQclear (result); + return GNUNET_SYSERR; + } + PQclear (result); + return GNUNET_OK; +} + + +/** + * Mark a deposit as done, thereby declaring that it cannot be + * executed at all anymore, and should no longer be returned by + * @e iterate_ready_deposits() or @e iterate_matching_deposits(). + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param session connection to the database + * @param deposit_rowid identifies the deposit row to modify + * @return #GNUNET_OK on success, #GNUNET_SYSERR on error + */ +static int +postgres_mark_deposit_done (void *cls, + struct TALER_MINTDB_Session *session, + unsigned long long rowid) +{ + uint64_t serial_id = rowid; + struct TALER_PQ_QueryParam params[] = { + TALER_PQ_query_param_uint64 (&serial_id), + TALER_PQ_query_param_end + }; + PGresult *result; + + result = TALER_PQ_exec_prepared (session->conn, + "mark_deposit_done", + params); + if (PGRES_COMMAND_OK != + PQresultStatus (result)) + { + BREAK_DB_ERR (result); + PQclear (result); + return GNUNET_SYSERR; + } + PQclear (result); + return GNUNET_OK; +} + + +/** + * Obtain information about deposits that are ready to be executed. + * Such deposits must not be marked as "tiny" or "done", and the + * execution time must be in the past. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param session connection to the database + * @param deposit_cb function to call for ONE such deposit * @param deposit_cb_cls closure for @a deposit_cb * @return number of rows processed, 0 if none exist, * #GNUNET_SYSERR on error */ static int -postgres_iterate_deposits (void *cls, - struct TALER_MINTDB_Session *session, - uint64_t min_id, - uint32_t limit, - TALER_MINTDB_DepositIterator deposit_cb, - void *deposit_cb_cls) +postgres_get_ready_deposit (void *cls, + struct TALER_MINTDB_Session *session, + TALER_MINTDB_DepositIterator deposit_cb, + void *deposit_cb_cls) { struct TALER_PQ_QueryParam params[] = { - TALER_PQ_query_param_uint64 (&min_id), - TALER_PQ_query_param_uint32 (&limit), TALER_PQ_query_param_end }; PGresult *result; - unsigned int i; unsigned int n; + int ret; - if (GNUNET_OK != - postgres_start (cls, session)) - return GNUNET_SYSERR; - result = PQexec (session->conn, - "SET TRANSACTION REPEATABLE READ"); - if (PGRES_COMMAND_OK != + result = TALER_PQ_exec_prepared (session->conn, + "deposits_get_ready", + params); + if (PGRES_TUPLES_OK != PQresultStatus (result)) { - TALER_LOG_ERROR ("Failed to set transaction to REPEATABL EREAD: %s\n", - PQresultErrorMessage (result)); - GNUNET_break (0); + BREAK_DB_ERR (result); PQclear (result); return GNUNET_SYSERR; } + if (0 == (n = PQntuples (result))) + { + PQclear (result); + return 0; + } + GNUNET_break (1 == n); + { + struct TALER_Amount amount_with_fee; + struct TALER_Amount deposit_fee; + struct GNUNET_TIME_Absolute wire_deadline; + struct GNUNET_HashCode h_contract; + struct TALER_MerchantPublicKeyP merchant_pub; + struct TALER_CoinSpendPublicKeyP coin_pub; + uint64_t transaction_id; + uint64_t serial_id; + json_t *wire; + struct TALER_PQ_ResultSpec rs[] = { + TALER_PQ_result_spec_uint64 ("serial_id", + &serial_id), + TALER_PQ_result_spec_uint64 ("transaction_id", + &transaction_id), + TALER_PQ_result_spec_amount ("amount_with_fee", + &amount_with_fee), + TALER_PQ_result_spec_amount ("deposit_fee", + &deposit_fee), + TALER_PQ_result_spec_absolute_time ("wire_deadline", + &wire_deadline), + TALER_PQ_result_spec_auto_from_type ("h_contract", + &h_contract), + TALER_PQ_result_spec_auto_from_type ("merchant_pub", + &merchant_pub), + TALER_PQ_result_spec_auto_from_type ("coin_pub", + &coin_pub), + TALER_PQ_result_spec_json ("wire", + &wire), + TALER_PQ_result_spec_end + }; + if (GNUNET_OK != + TALER_PQ_extract_result (result, rs, 0)) + { + GNUNET_break (0); + PQclear (result); + return GNUNET_SYSERR; + } + ret = deposit_cb (deposit_cb_cls, + serial_id, + &merchant_pub, + &coin_pub, + &amount_with_fee, + &deposit_fee, + transaction_id, + &h_contract, + wire_deadline, + wire); + TALER_PQ_cleanup_result (rs); + PQclear (result); + } + return (GNUNET_OK == ret) ? 1 : 0; +} + + +/** + * Obtain information about other pending deposits for the same + * destination. Those deposits must not already be "done". + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param session connection to the database + * @param h_wire destination of the wire transfer + * @param merchant_pub public key of the merchant + * @param deposit_cb function to call for each deposit + * @param deposit_cb_cls closure for @a deposit_cb + * @param limit maximum number of matching deposits to return + * @return number of rows processed, 0 if none exist, + * #GNUNET_SYSERR on error + */ +static int +postgres_iterate_matching_deposits (void *cls, + struct TALER_MINTDB_Session *session, + const struct GNUNET_HashCode *h_wire, + const struct TALER_MerchantPublicKeyP *merchant_pub, + TALER_MINTDB_DepositIterator deposit_cb, + void *deposit_cb_cls, + uint32_t limit) +{ + struct TALER_PQ_QueryParam params[] = { + TALER_PQ_query_param_auto_from_type (merchant_pub), + TALER_PQ_query_param_auto_from_type (h_wire), + TALER_PQ_query_param_uint32 (&limit), + TALER_PQ_query_param_end + }; + PGresult *result; + unsigned int i; + unsigned int n; result = TALER_PQ_exec_prepared (session->conn, - "deposits_iterate", + "deposits_iterate_matching", params); if (PGRES_TUPLES_OK != PQresultStatus (result)) { BREAK_DB_ERR (result); PQclear (result); - postgres_rollback (cls, session); return GNUNET_SYSERR; } if (0 == (n = PQntuples (result))) { PQclear (result); - postgres_rollback (cls, session); return 0; } + if (n > limit) + n = limit; for (i=0;iget_reserve_history = &postgres_get_reserve_history; plugin->free_reserve_history = &common_free_reserve_history; plugin->have_deposit = &postgres_have_deposit; - plugin->iterate_deposits = &postgres_iterate_deposits; + plugin->mark_deposit_tiny = &postgres_mark_deposit_tiny; + plugin->mark_deposit_done = &postgres_mark_deposit_done; + plugin->get_ready_deposit = &postgres_get_ready_deposit; + plugin->iterate_matching_deposits = &postgres_iterate_matching_deposits; plugin->insert_deposit = &postgres_insert_deposit; plugin->get_refresh_session = &postgres_get_refresh_session; plugin->create_refresh_session = &postgres_create_refresh_session; -- cgit v1.2.3