From 5149af93147c54055d99af688993de3fb4c36ddf Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Fri, 3 Sep 2021 19:08:02 +0200 Subject: preliminary work on supporting sharding/parallel aggregation (undertested, but tests pass again) --- src/exchangedb/Makefile.am | 4 +- src/exchangedb/drop0002.sql | 2 - src/exchangedb/drop0003.sql | 26 ++ src/exchangedb/exchange-0003.sql | 75 ++++++ src/exchangedb/plugin_exchangedb_postgres.c | 397 ++++++++++++++++++++++++++-- src/exchangedb/test_exchangedb.c | 15 +- 6 files changed, 493 insertions(+), 26 deletions(-) create mode 100644 src/exchangedb/drop0003.sql create mode 100644 src/exchangedb/exchange-0003.sql (limited to 'src/exchangedb') diff --git a/src/exchangedb/Makefile.am b/src/exchangedb/Makefile.am index 5114fb9cb..eae037267 100644 --- a/src/exchangedb/Makefile.am +++ b/src/exchangedb/Makefile.am @@ -18,8 +18,10 @@ sql_DATA = \ exchange-0000.sql \ exchange-0001.sql \ exchange-0002.sql \ + exchange-0003.sql \ drop0001.sql \ - drop0002.sql + drop0002.sql \ + drop0003.sql EXTRA_DIST = \ exchangedb.conf \ diff --git a/src/exchangedb/drop0002.sql b/src/exchangedb/drop0002.sql index 5bffab667..12db64c54 100644 --- a/src/exchangedb/drop0002.sql +++ b/src/exchangedb/drop0002.sql @@ -17,8 +17,6 @@ -- Everything in one big transaction BEGIN; --- exchange-0002 did not create new tables, so nothing to do here. - -- Unregister patch (0002.sql) SELECT _v.unregister_patch('exchange-0002'); diff --git a/src/exchangedb/drop0003.sql b/src/exchangedb/drop0003.sql new file mode 100644 index 000000000..fbdab04c6 --- /dev/null +++ b/src/exchangedb/drop0003.sql @@ -0,0 +1,26 @@ +-- +-- This file is part of TALER +-- Copyright (C) 2020 Taler Systems SA +-- +-- TALER is free software; you can redistribute it and/or modify it under the +-- terms of the GNU General Public License as published by the Free Software +-- Foundation; either version 3, or (at your option) any later version. +-- +-- TALER is distributed in the hope that it will be useful, but WITHOUT ANY +-- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +-- A PARTICULAR PURPOSE. See the GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License along with +-- TALER; see the file COPYING. If not, see +-- + +-- Everything in one big transaction +BEGIN; + +-- Unregister patch (0003.sql) +SELECT _v.unregister_patch('exchange-0003'); + +DROP TABLE IF EXISTS revolving_work_shards CASCADE; + +-- And we're out of here... +COMMIT; diff --git a/src/exchangedb/exchange-0003.sql b/src/exchangedb/exchange-0003.sql new file mode 100644 index 000000000..e1c9273fb --- /dev/null +++ b/src/exchangedb/exchange-0003.sql @@ -0,0 +1,75 @@ +-- +-- This file is part of TALER +-- Copyright (C) 2021 Taler Systems SA +-- +-- TALER is free software; you can redistribute it and/or modify it under the +-- terms of the GNU General Public License as published by the Free Software +-- Foundation; either version 3, or (at your option) any later version. +-- +-- TALER is distributed in the hope that it will be useful, but WITHOUT ANY +-- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +-- A PARTICULAR PURPOSE. See the GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License along with +-- TALER; see the file COPYING. If not, see +-- + +-- Everything in one big transaction +BEGIN; + +-- Check patch versioning is in place. +SELECT _v.register_patch('exchange-0003', NULL, NULL); + + + +ALTER TABLE deposits + ADD COLUMN shard INT4 NOT NULL DEFAULT 0; +COMMENT ON COLUMN deposits.shard + IS 'Used for load sharding. Should be set based on h_wire, merchant_pub and a service salt. Default of 0 onlyapplies for colums migrated from a previous version without sharding support. 64-bit value because we need an *unsigned* 32-bit value.'; + +DROP INDEX deposits_get_ready_index; +CREATE INDEX deposits_get_ready_index + ON deposits + (shard + ,tiny + ,done + ,wire_deadline + ,refund_deadline + ); +COMMENT ON INDEX deposits_get_ready_index + IS 'for deposits_get_ready'; + + + +CREATE UNLOGGED TABLE IF NOT EXISTS revolving_work_shards + (shard_serial_id BIGSERIAL UNIQUE + ,last_attempt INT8 NOT NULL + ,start_row INT4 NOT NULL + ,end_row INT4 NOT NULL + ,active BOOLEAN NOT NULL DEFAULT FALSE + ,job_name VARCHAR NOT NULL + ,PRIMARY KEY (job_name, start_row) + ); +CREATE INDEX IF NOT EXISTS revolving_work_shards_index + ON revolving_work_shards + (job_name + ,active + ,last_attempt + ); +COMMENT ON TABLE revolving_work_shards + IS 'coordinates work between multiple processes working on the same job with partitions that need to be repeatedly processed; unlogged because on system crashes the locks represented by this table will have to be cleared anyway, typically using "taler-exchange-dbinit -s"'; +COMMENT ON COLUMN revolving_work_shards.shard_serial_id + IS 'unique serial number identifying the shard'; +COMMENT ON COLUMN revolving_work_shards.last_attempt + IS 'last time a worker attempted to work on the shard'; +COMMENT ON COLUMN revolving_work_shards.active + IS 'set to TRUE when a worker is active on the shard'; +COMMENT ON COLUMN revolving_work_shards.start_row + IS 'row at which the shard scope starts, inclusive'; +COMMENT ON COLUMN revolving_work_shards.end_row + IS 'row at which the shard scope ends, exclusive'; +COMMENT ON COLUMN revolving_work_shards.job_name + IS 'unique name of the job the workers on this shard are performing'; + +-- Complete transaction +COMMIT; diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c index ae090baf7..70c337c57 100644 --- a/src/exchangedb/plugin_exchangedb_postgres.c +++ b/src/exchangedb/plugin_exchangedb_postgres.c @@ -874,11 +874,12 @@ prepare_statements (struct PostgresClosure *pg) ",coin_sig" ",wire" ",exchange_timestamp" + ",shard" ") SELECT known_coin_id, $2, $3, $4, $5, $6, " - " $7, $8, $9, $10, $11, $12" + " $7, $8, $9, $10, $11, $12, $13" " FROM known_coins" " WHERE coin_pub=$1;", - 12), + 13), /* Fetch an existing deposit request, used to ensure idempotency during /deposit processing. Used in #postgres_have_deposit(). */ GNUNET_PQ_make_prepare ("get_deposit", @@ -958,13 +959,18 @@ prepare_statements (struct PostgresClosure *pg) " FROM deposits" " JOIN known_coins kc USING (known_coin_id)" " JOIN denominations denom USING (denominations_serial)" - " WHERE tiny=FALSE" - " AND done=FALSE" - " AND wire_deadline<=$1" - " AND refund_deadline<$1" - " ORDER BY wire_deadline ASC" + " WHERE " + " shard >= $2" + " AND shard <= $3" + " AND tiny=FALSE" + " AND done=FALSE" + " AND wire_deadline<=$1" + " AND refund_deadline<$1" + " ORDER BY " + " shard ASC" + " ,wire_deadline ASC" " LIMIT 1;", - 1), + 3), /* Used in #postgres_iterate_matching_deposits() */ GNUNET_PQ_make_prepare ("deposits_iterate_matching", "SELECT" @@ -2399,6 +2405,18 @@ prepare_statements (struct PostgresClosure *pg) " ORDER BY last_attempt ASC" " LIMIT 1;", 2), + /* Used in #postgres_begin_revolving_shard() */ + GNUNET_PQ_make_prepare ("get_open_revolving_shard", + "SELECT" + " start_row" + ",end_row" + " FROM revolving_work_shards" + " WHERE job_name=$1" + " AND active=FALSE" + " ORDER BY last_attempt ASC" + " LIMIT 1;", + 2), + /* Used in #postgres_begin_shard() */ GNUNET_PQ_make_prepare ("reclaim_shard", "UPDATE work_shards" " SET last_attempt=$2" @@ -2406,6 +2424,16 @@ prepare_statements (struct PostgresClosure *pg) " AND start_row=$3" " AND end_row=$4", 4), + /* Used in #postgres_begin_revolving_shard() */ + GNUNET_PQ_make_prepare ("reclaim_revolving_shard", + "UPDATE revolving_work_shards" + " SET last_attempt=$2" + " ,active=TRUE" + " WHERE job_name=$1" + " AND start_row=$3" + " AND end_row=$4", + 4), + /* Used in #postgres_begin_shard() */ GNUNET_PQ_make_prepare ("get_last_shard", "SELECT" " end_row" @@ -2414,6 +2442,16 @@ prepare_statements (struct PostgresClosure *pg) " ORDER BY end_row DESC" " LIMIT 1;", 1), + /* Used in #postgres_begin_revolving_shard() */ + GNUNET_PQ_make_prepare ("get_last_revolving_shard", + "SELECT" + " end_row" + " FROM revolving_work_shards" + " WHERE job_name=$1" + " ORDER BY end_row DESC" + " LIMIT 1;", + 1), + /* Used in #postgres_begin_shard() */ GNUNET_PQ_make_prepare ("claim_next_shard", "INSERT INTO work_shards" "(job_name" @@ -2423,6 +2461,17 @@ prepare_statements (struct PostgresClosure *pg) ") VALUES " "($1, $2, $3, $4);", 4), + /* Used in #postgres_claim_revolving_shard() */ + GNUNET_PQ_make_prepare ("create_revolving_shard", + "INSERT INTO revolving_work_shards" + "(job_name" + ",last_attempt" + ",start_row" + ",end_row" + ",active" + ") VALUES " + "($1, $2, $3, $4, TRUE);", + 4), /* Used in #postgres_complete_shard() */ GNUNET_PQ_make_prepare ("complete_shard", "UPDATE work_shards" @@ -2431,6 +2480,18 @@ prepare_statements (struct PostgresClosure *pg) " AND start_row=$2" " AND end_row=$3", 3), + /* Used in #postgres_complete_shard() */ + GNUNET_PQ_make_prepare ("release_revolving_shard", + "UPDATE revolving_work_shards" + " SET active=FALSE" + " WHERE job_name=$1" + " AND start_row=$2" + " AND end_row=$3", + 3), + /* Used in #postgres_delete_revolving_shards() */ + GNUNET_PQ_make_prepare ("delete_revolving_shards", + "DELETE FROM revolving_work_shards", + 0), GNUNET_PQ_PREPARED_STATEMENT_END }; @@ -4462,12 +4523,16 @@ postgres_mark_deposit_done (void *cls, * execution time must be in the past. * * @param cls the @e cls of this struct with the plugin-specific state + * @param start_shard_row minimum shard row to select + * @param end_shard_row maximum shard row to select (inclusive) * @param deposit_cb function to call for ONE such deposit * @param deposit_cb_cls closure for @a deposit_cb * @return transaction status code */ static enum GNUNET_DB_QueryStatus postgres_get_ready_deposit (void *cls, + uint32_t start_shard_row, + uint32_t end_shard_row, TALER_EXCHANGEDB_DepositIterator deposit_cb, void *deposit_cb_cls) { @@ -4475,6 +4540,8 @@ postgres_get_ready_deposit (void *cls, struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get (); struct GNUNET_PQ_QueryParam params[] = { TALER_PQ_query_param_absolute_time (&now), + GNUNET_PQ_query_param_uint32 (&start_shard_row), + GNUNET_PQ_query_param_uint32 (&end_shard_row), GNUNET_PQ_query_param_end }; struct TALER_Amount amount_with_fee; @@ -4504,6 +4571,8 @@ postgres_get_ready_deposit (void *cls, enum GNUNET_DB_QueryStatus qs; (void) GNUNET_TIME_round_abs (&now); + GNUNET_assert (start_shard_row < end_shard_row); + GNUNET_assert (end_shard_row <= INT32_MAX); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Finding ready deposits by deadline %s (%llu)\n", GNUNET_STRINGS_absolute_time_to_string (now), @@ -4900,6 +4969,35 @@ postgres_ensure_coin_known (void *cls, } +/** + * Compute the shard number of a given @a deposit + * + * @param deposit deposit to compute shard for + * @return shard number + */ +static uint32_t +compute_shard (const struct TALER_EXCHANGEDB_Deposit *deposit) +{ + uint32_t res; + + GNUNET_assert (GNUNET_YES == + GNUNET_CRYPTO_kdf (&res, + sizeof (res), + &deposit->h_wire, + sizeof (deposit->h_wire), + &deposit->merchant_pub, + sizeof (deposit->merchant_pub), + NULL, 0)); + /* interpret hash result as NBO for platform independence, + convert to HBO and map to [0..2^31-1] range */ + res = ntohl (res); + if (res > INT32_MAX) + res += INT32_MIN; + GNUNET_assert (res <= INT32_MAX); + return res; +} + + /** * Insert information about deposited coin into the database. * @@ -4914,6 +5012,7 @@ postgres_insert_deposit (void *cls, const struct TALER_EXCHANGEDB_Deposit *deposit) { struct PostgresClosure *pg = cls; + uint32_t shard = compute_shard (deposit); struct GNUNET_PQ_QueryParam params[] = { GNUNET_PQ_query_param_auto_from_type (&deposit->coin.coin_pub), TALER_PQ_query_param_amount (&deposit->amount_with_fee), @@ -4926,9 +5025,11 @@ postgres_insert_deposit (void *cls, GNUNET_PQ_query_param_auto_from_type (&deposit->csig), TALER_PQ_query_param_json (deposit->receiver_wire_account), TALER_PQ_query_param_absolute_time (&exchange_timestamp), + GNUNET_PQ_query_param_uint32 (&shard), GNUNET_PQ_query_param_end }; + GNUNET_assert (shard <= INT32_MAX); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Inserting deposit to be executed at %s (%llu/%llu)\n", GNUNET_STRINGS_absolute_time_to_string (deposit->wire_deadline), @@ -6933,18 +7034,19 @@ postgres_wire_prepare_data_get (void *cls, /** - * Start a transaction where we transiently violate the foreign + * Starts a READ COMMITTED transaction where we transiently violate the foreign * constraints on the "wire_out" table as we insert aggregations * and only add the wire transfer out at the end. * * @param cls the @e cls of this struct with the plugin-specific state * @return #GNUNET_OK on success */ -static int +static enum GNUNET_GenericReturnValue postgres_start_deferred_wire_out (void *cls) { struct PostgresClosure *pg = cls; struct GNUNET_PQ_ExecuteStatement es[] = { + GNUNET_PQ_make_execute ("START TRANSACTION ISOLATION LEVEL READ COMMITTED"), GNUNET_PQ_make_execute ("SET CONSTRAINTS wire_out_ref DEFERRED"), GNUNET_PQ_EXECUTE_STATEMENT_END }; @@ -6952,10 +7054,6 @@ postgres_start_deferred_wire_out (void *cls) if (GNUNET_SYSERR == postgres_preflight (pg)) return GNUNET_SYSERR; - if (GNUNET_OK != - postgres_start (pg, - "deferred wire out")) - return GNUNET_SYSERR; if (GNUNET_OK != GNUNET_PQ_exec_statements (pg->conn, es)) @@ -6966,6 +7064,7 @@ postgres_start_deferred_wire_out (void *cls) postgres_rollback (pg); return GNUNET_SYSERR; } + pg->transaction_name = "deferred wire out"; return GNUNET_OK; } @@ -8041,7 +8140,7 @@ struct RecoupSerialContext /** * Status code, set to #GNUNET_SYSERR on hard errors. */ - int status; + enum GNUNET_GenericReturnValue status; }; @@ -10380,6 +10479,268 @@ postgres_complete_shard (void *cls, } +/** + * Function called to grab a revolving work shard on an operation @a op. Runs + * in its own transaction. Returns the oldest inactive shard. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param job_name name of the operation to grab a revolving shard for + * @param shard_size desired shard size + * @param shard_limit exclusive end of the shard range + * @param[out] start_row inclusive start row of the shard (returned) + * @param[out] end_row exclusive end row of the shard (returned) + * @return transaction status code + */ +static enum GNUNET_DB_QueryStatus +postgres_begin_revolving_shard (void *cls, + const char *job_name, + uint32_t shard_size, + uint32_t shard_limit, + uint32_t *start_row, + uint32_t *end_row) +{ + struct PostgresClosure *pg = cls; + + GNUNET_assert (shard_limit <= 1U + (uint32_t) INT32_MAX); + GNUNET_assert (shard_limit > 0); + GNUNET_assert (shard_size > 0); + for (unsigned int retries = 0; retries<3; retries++) + { + if (GNUNET_OK != + postgres_start (pg, + "begin_revolving_shard")) + { + GNUNET_break (0); + return GNUNET_DB_STATUS_HARD_ERROR; + } + + /* First, find last 'end_row' */ + { + enum GNUNET_DB_QueryStatus qs; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_string (job_name), + GNUNET_PQ_query_param_end + }; + struct GNUNET_PQ_ResultSpec rs[] = { + GNUNET_PQ_result_spec_uint32 ("end_row", + start_row), + GNUNET_PQ_result_spec_end + }; + + qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, + "get_last_revolving_shard", + params, + rs); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + postgres_rollback (pg); + return qs; + case GNUNET_DB_STATUS_SOFT_ERROR: + postgres_rollback (pg); + continue; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + break; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + *start_row = 0; /* base-case: no shards yet */ + break; /* continued below */ + } + } /* get_last_shard */ + + if (*start_row < shard_limit) + { + /* Claim fresh shard */ + enum GNUNET_DB_QueryStatus qs; + struct GNUNET_TIME_Absolute now; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_string (job_name), + GNUNET_PQ_query_param_absolute_time (&now), + GNUNET_PQ_query_param_uint32 (start_row), + GNUNET_PQ_query_param_uint32 (end_row), + GNUNET_PQ_query_param_end + }; + + *end_row = GNUNET_MIN (shard_limit, + *start_row + shard_size); + now = GNUNET_TIME_absolute_get (); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Trying to claim shard %llu-%llu\n", + (unsigned long long) *start_row, + (unsigned long long) *end_row); + qs = GNUNET_PQ_eval_prepared_non_select (pg->conn, + "create_revolving_shard", + params); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + postgres_rollback (pg); + return qs; + case GNUNET_DB_STATUS_SOFT_ERROR: + postgres_rollback (pg); + continue; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* continued below (with commit) */ + break; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + /* someone else got this shard already, + try again */ + postgres_rollback (pg); + continue; + } + } /* end create fresh reovlving shard */ + else + { + /* claim oldest existing shard */ + enum GNUNET_DB_QueryStatus qs; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_string (job_name), + GNUNET_PQ_query_param_end + }; + struct GNUNET_PQ_ResultSpec rs[] = { + GNUNET_PQ_result_spec_uint32 ("start_row", + start_row), + GNUNET_PQ_result_spec_uint32 ("end_row", + end_row), + GNUNET_PQ_result_spec_end + }; + + qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, + "get_open_revolving_shard", + params, + rs); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + postgres_rollback (pg); + return qs; + case GNUNET_DB_STATUS_SOFT_ERROR: + postgres_rollback (pg); + continue; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + /* no open shards available */ + postgres_rollback (pg); + return qs; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + { + enum GNUNET_DB_QueryStatus qs; + struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get (); + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_string (job_name), + GNUNET_PQ_query_param_absolute_time (&now), + GNUNET_PQ_query_param_uint32 (start_row), + GNUNET_PQ_query_param_uint32 (end_row), + GNUNET_PQ_query_param_end + }; + + qs = GNUNET_PQ_eval_prepared_non_select (pg->conn, + "reclaim_revolving_shard", + params); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + postgres_rollback (pg); + return qs; + case GNUNET_DB_STATUS_SOFT_ERROR: + postgres_rollback (pg); + continue; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + break; /* continue with commit */ + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + GNUNET_break (0); /* logic error, should be impossible */ + postgres_rollback (pg); + return GNUNET_DB_STATUS_HARD_ERROR; + } + } + break; /* continue with commit */ + } + } /* end claim oldest existing shard */ + + /* commit */ + { + enum GNUNET_DB_QueryStatus qs; + + qs = postgres_commit (pg); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + postgres_rollback (pg); + return qs; + case GNUNET_DB_STATUS_SOFT_ERROR: + postgres_rollback (pg); + continue; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; + } + } + } /* retry 'for' loop */ + return GNUNET_DB_STATUS_SOFT_ERROR; +} + + +/** + * Function called to release a revolving shard + * back into the work pool. Clears the + * "completed" flag. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @param job_name name of the operation to grab a word shard for + * @param start_row inclusive start row of the shard + * @param end_row exclusive end row of the shard + * @return transaction status code + */ +enum GNUNET_DB_QueryStatus +postgres_release_revolving_shard (void *cls, + const char *job_name, + uint32_t start_row, + uint32_t end_row) +{ + struct PostgresClosure *pg = cls; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_string (job_name), + GNUNET_PQ_query_param_uint32 (&start_row), + GNUNET_PQ_query_param_uint32 (&end_row), + GNUNET_PQ_query_param_end + }; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Releasing revolving shard %s %u-%u\n", + job_name, + (unsigned int) start_row, + (unsigned int) end_row); + return GNUNET_PQ_eval_prepared_non_select (pg->conn, + "release_revolving_shard", + params); +} + + +/** + * Function called to delete all revolving shards. + * To be used after a crash or when the shard size is + * changed. + * + * @param cls the @e cls of this struct with the plugin-specific state + * @return transaction status code + */ +enum GNUNET_DB_QueryStatus +postgres_delete_revolving_shards (void *cls) +{ + struct PostgresClosure *pg = cls; + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_end + }; + + return GNUNET_PQ_eval_prepared_non_select (pg->conn, + "delete_revolving_shards", + params); +} + + /** * Initialize Postgres database subsystem. * @@ -10592,6 +10953,12 @@ libtaler_plugin_exchangedb_postgres_init (void *cls) = &postgres_begin_shard; plugin->complete_shard = &postgres_complete_shard; + plugin->begin_revolving_shard + = &postgres_begin_revolving_shard; + plugin->release_revolving_shard + = &postgres_release_revolving_shard; + plugin->delete_revolving_shards + = &postgres_delete_revolving_shards; return plugin; } diff --git a/src/exchangedb/test_exchangedb.c b/src/exchangedb/test_exchangedb.c index 1cccb23c2..8478fac0d 100644 --- a/src/exchangedb/test_exchangedb.c +++ b/src/exchangedb/test_exchangedb.c @@ -804,29 +804,22 @@ static uint64_t deposit_rowid; * @param cls closure a `struct TALER_EXCHANGEDB_Deposit *` * @param rowid unique ID for the deposit in our DB, used for marking * it as 'tiny' or 'done' - * @param exchange_timestamp when did the deposit happen - * @param wallet_timestamp when did the wallet sign the contract * @param merchant_pub public key of the merchant * @param coin_pub public key of the coin * @param amount_with_fee amount that was deposited including fee * @param deposit_fee amount the exchange gets to keep as transaction fees * @param h_contract_terms hash of the proposal data known to merchant and customer - * @param wire_deadline by which the merchant advised that he would like the - * wire transfer to be executed * @param wire wire details for the merchant * @return transaction status code, #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT to continue to iterate */ static enum GNUNET_DB_QueryStatus deposit_cb (void *cls, uint64_t rowid, - struct GNUNET_TIME_Absolute exchange_timestamp, - struct GNUNET_TIME_Absolute wallet_timestamp, const struct TALER_MerchantPublicKeyP *merchant_pub, const struct TALER_CoinSpendPublicKeyP *coin_pub, const struct TALER_Amount *amount_with_fee, const struct TALER_Amount *deposit_fee, const struct GNUNET_HashCode *h_contract_terms, - struct GNUNET_TIME_Absolute wire_deadline, const json_t *wire) { struct TALER_EXCHANGEDB_Deposit *deposit = cls; @@ -1896,9 +1889,11 @@ run (void *cls) &matching_deposit_cb, &deposit, 2)); - sleep (2); /* giv deposit time to be ready */ + sleep (2); /* give deposit time to be ready */ FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != plugin->get_ready_deposit (plugin->cls, + 0, + INT32_MAX, &deposit_cb, &deposit)); FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != @@ -1911,11 +1906,15 @@ run (void *cls) deposit_rowid)); FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != plugin->get_ready_deposit (plugin->cls, + 0, + INT32_MAX, &deposit_cb, &deposit)); plugin->rollback (plugin->cls); FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != plugin->get_ready_deposit (plugin->cls, + 0, + INT32_MAX, &deposit_cb, &deposit)); FAILIF (GNUNET_OK != -- cgit v1.2.3