summaryrefslogtreecommitdiff
path: root/src/exchangedb
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2021-09-03 19:08:02 +0200
committerChristian Grothoff <christian@grothoff.org>2021-09-03 19:08:02 +0200
commit5149af93147c54055d99af688993de3fb4c36ddf (patch)
tree8652e56bed70976309412e11c35305a352576a9b /src/exchangedb
parent6e1877b142d4819a248b01aebfdd6f337f82a509 (diff)
downloadexchange-5149af93147c54055d99af688993de3fb4c36ddf.tar.gz
exchange-5149af93147c54055d99af688993de3fb4c36ddf.tar.bz2
exchange-5149af93147c54055d99af688993de3fb4c36ddf.zip
preliminary work on supporting sharding/parallel aggregation (undertested, but tests pass again)
Diffstat (limited to 'src/exchangedb')
-rw-r--r--src/exchangedb/Makefile.am4
-rw-r--r--src/exchangedb/drop0002.sql2
-rw-r--r--src/exchangedb/drop0003.sql26
-rw-r--r--src/exchangedb/exchange-0003.sql75
-rw-r--r--src/exchangedb/plugin_exchangedb_postgres.c397
-rw-r--r--src/exchangedb/test_exchangedb.c15
6 files changed, 493 insertions, 26 deletions
diff --git a/src/exchangedb/Makefile.am b/src/exchangedb/Makefile.am
index 5114fb9cb..eae037267 100644
--- a/src/exchangedb/Makefile.am
+++ b/src/exchangedb/Makefile.am
@@ -18,8 +18,10 @@ sql_DATA = \
exchange-0000.sql \
exchange-0001.sql \
exchange-0002.sql \
+ exchange-0003.sql \
drop0001.sql \
- drop0002.sql
+ drop0002.sql \
+ drop0003.sql
EXTRA_DIST = \
exchangedb.conf \
diff --git a/src/exchangedb/drop0002.sql b/src/exchangedb/drop0002.sql
index 5bffab667..12db64c54 100644
--- a/src/exchangedb/drop0002.sql
+++ b/src/exchangedb/drop0002.sql
@@ -17,8 +17,6 @@
-- Everything in one big transaction
BEGIN;
--- exchange-0002 did not create new tables, so nothing to do here.
-
-- Unregister patch (0002.sql)
SELECT _v.unregister_patch('exchange-0002');
diff --git a/src/exchangedb/drop0003.sql b/src/exchangedb/drop0003.sql
new file mode 100644
index 000000000..fbdab04c6
--- /dev/null
+++ b/src/exchangedb/drop0003.sql
@@ -0,0 +1,26 @@
+--
+-- This file is part of TALER
+-- Copyright (C) 2020 Taler Systems SA
+--
+-- TALER is free software; you can redistribute it and/or modify it under the
+-- terms of the GNU General Public License as published by the Free Software
+-- Foundation; either version 3, or (at your option) any later version.
+--
+-- TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+-- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+-- A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+--
+-- You should have received a copy of the GNU General Public License along with
+-- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+--
+
+-- Everything in one big transaction
+BEGIN;
+
+-- Unregister patch (0003.sql)
+SELECT _v.unregister_patch('exchange-0003');
+
+DROP TABLE IF EXISTS revolving_work_shards CASCADE;
+
+-- And we're out of here...
+COMMIT;
diff --git a/src/exchangedb/exchange-0003.sql b/src/exchangedb/exchange-0003.sql
new file mode 100644
index 000000000..e1c9273fb
--- /dev/null
+++ b/src/exchangedb/exchange-0003.sql
@@ -0,0 +1,75 @@
+--
+-- This file is part of TALER
+-- Copyright (C) 2021 Taler Systems SA
+--
+-- TALER is free software; you can redistribute it and/or modify it under the
+-- terms of the GNU General Public License as published by the Free Software
+-- Foundation; either version 3, or (at your option) any later version.
+--
+-- TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+-- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+-- A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+--
+-- You should have received a copy of the GNU General Public License along with
+-- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+--
+
+-- Everything in one big transaction
+BEGIN;
+
+-- Check patch versioning is in place.
+SELECT _v.register_patch('exchange-0003', NULL, NULL);
+
+
+
+ALTER TABLE deposits
+ ADD COLUMN shard INT4 NOT NULL DEFAULT 0;
+COMMENT ON COLUMN deposits.shard
+ IS 'Used for load sharding. Should be set based on h_wire, merchant_pub and a service salt. Default of 0 onlyapplies for colums migrated from a previous version without sharding support. 64-bit value because we need an *unsigned* 32-bit value.';
+
+DROP INDEX deposits_get_ready_index;
+CREATE INDEX deposits_get_ready_index
+ ON deposits
+ (shard
+ ,tiny
+ ,done
+ ,wire_deadline
+ ,refund_deadline
+ );
+COMMENT ON INDEX deposits_get_ready_index
+ IS 'for deposits_get_ready';
+
+
+
+CREATE UNLOGGED TABLE IF NOT EXISTS revolving_work_shards
+ (shard_serial_id BIGSERIAL UNIQUE
+ ,last_attempt INT8 NOT NULL
+ ,start_row INT4 NOT NULL
+ ,end_row INT4 NOT NULL
+ ,active BOOLEAN NOT NULL DEFAULT FALSE
+ ,job_name VARCHAR NOT NULL
+ ,PRIMARY KEY (job_name, start_row)
+ );
+CREATE INDEX IF NOT EXISTS revolving_work_shards_index
+ ON revolving_work_shards
+ (job_name
+ ,active
+ ,last_attempt
+ );
+COMMENT ON TABLE revolving_work_shards
+ IS 'coordinates work between multiple processes working on the same job with partitions that need to be repeatedly processed; unlogged because on system crashes the locks represented by this table will have to be cleared anyway, typically using "taler-exchange-dbinit -s"';
+COMMENT ON COLUMN revolving_work_shards.shard_serial_id
+ IS 'unique serial number identifying the shard';
+COMMENT ON COLUMN revolving_work_shards.last_attempt
+ IS 'last time a worker attempted to work on the shard';
+COMMENT ON COLUMN revolving_work_shards.active
+ IS 'set to TRUE when a worker is active on the shard';
+COMMENT ON COLUMN revolving_work_shards.start_row
+ IS 'row at which the shard scope starts, inclusive';
+COMMENT ON COLUMN revolving_work_shards.end_row
+ IS 'row at which the shard scope ends, exclusive';
+COMMENT ON COLUMN revolving_work_shards.job_name
+ IS 'unique name of the job the workers on this shard are performing';
+
+-- Complete transaction
+COMMIT;
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c
index ae090baf7..70c337c57 100644
--- a/src/exchangedb/plugin_exchangedb_postgres.c
+++ b/src/exchangedb/plugin_exchangedb_postgres.c
@@ -874,11 +874,12 @@ prepare_statements (struct PostgresClosure *pg)
",coin_sig"
",wire"
",exchange_timestamp"
+ ",shard"
") SELECT known_coin_id, $2, $3, $4, $5, $6, "
- " $7, $8, $9, $10, $11, $12"
+ " $7, $8, $9, $10, $11, $12, $13"
" FROM known_coins"
" WHERE coin_pub=$1;",
- 12),
+ 13),
/* Fetch an existing deposit request, used to ensure idempotency
during /deposit processing. Used in #postgres_have_deposit(). */
GNUNET_PQ_make_prepare ("get_deposit",
@@ -958,13 +959,18 @@ prepare_statements (struct PostgresClosure *pg)
" FROM deposits"
" JOIN known_coins kc USING (known_coin_id)"
" JOIN denominations denom USING (denominations_serial)"
- " WHERE tiny=FALSE"
- " AND done=FALSE"
- " AND wire_deadline<=$1"
- " AND refund_deadline<$1"
- " ORDER BY wire_deadline ASC"
+ " WHERE "
+ " shard >= $2"
+ " AND shard <= $3"
+ " AND tiny=FALSE"
+ " AND done=FALSE"
+ " AND wire_deadline<=$1"
+ " AND refund_deadline<$1"
+ " ORDER BY "
+ " shard ASC"
+ " ,wire_deadline ASC"
" LIMIT 1;",
- 1),
+ 3),
/* Used in #postgres_iterate_matching_deposits() */
GNUNET_PQ_make_prepare ("deposits_iterate_matching",
"SELECT"
@@ -2399,6 +2405,18 @@ prepare_statements (struct PostgresClosure *pg)
" ORDER BY last_attempt ASC"
" LIMIT 1;",
2),
+ /* Used in #postgres_begin_revolving_shard() */
+ GNUNET_PQ_make_prepare ("get_open_revolving_shard",
+ "SELECT"
+ " start_row"
+ ",end_row"
+ " FROM revolving_work_shards"
+ " WHERE job_name=$1"
+ " AND active=FALSE"
+ " ORDER BY last_attempt ASC"
+ " LIMIT 1;",
+ 2),
+ /* Used in #postgres_begin_shard() */
GNUNET_PQ_make_prepare ("reclaim_shard",
"UPDATE work_shards"
" SET last_attempt=$2"
@@ -2406,6 +2424,16 @@ prepare_statements (struct PostgresClosure *pg)
" AND start_row=$3"
" AND end_row=$4",
4),
+ /* Used in #postgres_begin_revolving_shard() */
+ GNUNET_PQ_make_prepare ("reclaim_revolving_shard",
+ "UPDATE revolving_work_shards"
+ " SET last_attempt=$2"
+ " ,active=TRUE"
+ " WHERE job_name=$1"
+ " AND start_row=$3"
+ " AND end_row=$4",
+ 4),
+ /* Used in #postgres_begin_shard() */
GNUNET_PQ_make_prepare ("get_last_shard",
"SELECT"
" end_row"
@@ -2414,6 +2442,16 @@ prepare_statements (struct PostgresClosure *pg)
" ORDER BY end_row DESC"
" LIMIT 1;",
1),
+ /* Used in #postgres_begin_revolving_shard() */
+ GNUNET_PQ_make_prepare ("get_last_revolving_shard",
+ "SELECT"
+ " end_row"
+ " FROM revolving_work_shards"
+ " WHERE job_name=$1"
+ " ORDER BY end_row DESC"
+ " LIMIT 1;",
+ 1),
+ /* Used in #postgres_begin_shard() */
GNUNET_PQ_make_prepare ("claim_next_shard",
"INSERT INTO work_shards"
"(job_name"
@@ -2423,6 +2461,17 @@ prepare_statements (struct PostgresClosure *pg)
") VALUES "
"($1, $2, $3, $4);",
4),
+ /* Used in #postgres_claim_revolving_shard() */
+ GNUNET_PQ_make_prepare ("create_revolving_shard",
+ "INSERT INTO revolving_work_shards"
+ "(job_name"
+ ",last_attempt"
+ ",start_row"
+ ",end_row"
+ ",active"
+ ") VALUES "
+ "($1, $2, $3, $4, TRUE);",
+ 4),
/* Used in #postgres_complete_shard() */
GNUNET_PQ_make_prepare ("complete_shard",
"UPDATE work_shards"
@@ -2431,6 +2480,18 @@ prepare_statements (struct PostgresClosure *pg)
" AND start_row=$2"
" AND end_row=$3",
3),
+ /* Used in #postgres_complete_shard() */
+ GNUNET_PQ_make_prepare ("release_revolving_shard",
+ "UPDATE revolving_work_shards"
+ " SET active=FALSE"
+ " WHERE job_name=$1"
+ " AND start_row=$2"
+ " AND end_row=$3",
+ 3),
+ /* Used in #postgres_delete_revolving_shards() */
+ GNUNET_PQ_make_prepare ("delete_revolving_shards",
+ "DELETE FROM revolving_work_shards",
+ 0),
GNUNET_PQ_PREPARED_STATEMENT_END
};
@@ -4462,12 +4523,16 @@ postgres_mark_deposit_done (void *cls,
* execution time must be in the past.
*
* @param cls the @e cls of this struct with the plugin-specific state
+ * @param start_shard_row minimum shard row to select
+ * @param end_shard_row maximum shard row to select (inclusive)
* @param deposit_cb function to call for ONE such deposit
* @param deposit_cb_cls closure for @a deposit_cb
* @return transaction status code
*/
static enum GNUNET_DB_QueryStatus
postgres_get_ready_deposit (void *cls,
+ uint32_t start_shard_row,
+ uint32_t end_shard_row,
TALER_EXCHANGEDB_DepositIterator deposit_cb,
void *deposit_cb_cls)
{
@@ -4475,6 +4540,8 @@ postgres_get_ready_deposit (void *cls,
struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get ();
struct GNUNET_PQ_QueryParam params[] = {
TALER_PQ_query_param_absolute_time (&now),
+ GNUNET_PQ_query_param_uint32 (&start_shard_row),
+ GNUNET_PQ_query_param_uint32 (&end_shard_row),
GNUNET_PQ_query_param_end
};
struct TALER_Amount amount_with_fee;
@@ -4504,6 +4571,8 @@ postgres_get_ready_deposit (void *cls,
enum GNUNET_DB_QueryStatus qs;
(void) GNUNET_TIME_round_abs (&now);
+ GNUNET_assert (start_shard_row < end_shard_row);
+ GNUNET_assert (end_shard_row <= INT32_MAX);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Finding ready deposits by deadline %s (%llu)\n",
GNUNET_STRINGS_absolute_time_to_string (now),
@@ -4901,6 +4970,35 @@ postgres_ensure_coin_known (void *cls,
/**
+ * Compute the shard number of a given @a deposit
+ *
+ * @param deposit deposit to compute shard for
+ * @return shard number
+ */
+static uint32_t
+compute_shard (const struct TALER_EXCHANGEDB_Deposit *deposit)
+{
+ uint32_t res;
+
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CRYPTO_kdf (&res,
+ sizeof (res),
+ &deposit->h_wire,
+ sizeof (deposit->h_wire),
+ &deposit->merchant_pub,
+ sizeof (deposit->merchant_pub),
+ NULL, 0));
+ /* interpret hash result as NBO for platform independence,
+ convert to HBO and map to [0..2^31-1] range */
+ res = ntohl (res);
+ if (res > INT32_MAX)
+ res += INT32_MIN;
+ GNUNET_assert (res <= INT32_MAX);
+ return res;
+}
+
+
+/**
* Insert information about deposited coin into the database.
*
* @param cls the `struct PostgresClosure` with the plugin-specific state
@@ -4914,6 +5012,7 @@ postgres_insert_deposit (void *cls,
const struct TALER_EXCHANGEDB_Deposit *deposit)
{
struct PostgresClosure *pg = cls;
+ uint32_t shard = compute_shard (deposit);
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_auto_from_type (&deposit->coin.coin_pub),
TALER_PQ_query_param_amount (&deposit->amount_with_fee),
@@ -4926,9 +5025,11 @@ postgres_insert_deposit (void *cls,
GNUNET_PQ_query_param_auto_from_type (&deposit->csig),
TALER_PQ_query_param_json (deposit->receiver_wire_account),
TALER_PQ_query_param_absolute_time (&exchange_timestamp),
+ GNUNET_PQ_query_param_uint32 (&shard),
GNUNET_PQ_query_param_end
};
+ GNUNET_assert (shard <= INT32_MAX);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Inserting deposit to be executed at %s (%llu/%llu)\n",
GNUNET_STRINGS_absolute_time_to_string (deposit->wire_deadline),
@@ -6933,18 +7034,19 @@ postgres_wire_prepare_data_get (void *cls,
/**
- * Start a transaction where we transiently violate the foreign
+ * Starts a READ COMMITTED transaction where we transiently violate the foreign
* constraints on the "wire_out" table as we insert aggregations
* and only add the wire transfer out at the end.
*
* @param cls the @e cls of this struct with the plugin-specific state
* @return #GNUNET_OK on success
*/
-static int
+static enum GNUNET_GenericReturnValue
postgres_start_deferred_wire_out (void *cls)
{
struct PostgresClosure *pg = cls;
struct GNUNET_PQ_ExecuteStatement es[] = {
+ GNUNET_PQ_make_execute ("START TRANSACTION ISOLATION LEVEL READ COMMITTED"),
GNUNET_PQ_make_execute ("SET CONSTRAINTS wire_out_ref DEFERRED"),
GNUNET_PQ_EXECUTE_STATEMENT_END
};
@@ -6953,10 +7055,6 @@ postgres_start_deferred_wire_out (void *cls)
postgres_preflight (pg))
return GNUNET_SYSERR;
if (GNUNET_OK !=
- postgres_start (pg,
- "deferred wire out"))
- return GNUNET_SYSERR;
- if (GNUNET_OK !=
GNUNET_PQ_exec_statements (pg->conn,
es))
{
@@ -6966,6 +7064,7 @@ postgres_start_deferred_wire_out (void *cls)
postgres_rollback (pg);
return GNUNET_SYSERR;
}
+ pg->transaction_name = "deferred wire out";
return GNUNET_OK;
}
@@ -8041,7 +8140,7 @@ struct RecoupSerialContext
/**
* Status code, set to #GNUNET_SYSERR on hard errors.
*/
- int status;
+ enum GNUNET_GenericReturnValue status;
};
@@ -10381,6 +10480,268 @@ postgres_complete_shard (void *cls,
/**
+ * Function called to grab a revolving work shard on an operation @a op. Runs
+ * in its own transaction. Returns the oldest inactive shard.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param job_name name of the operation to grab a revolving shard for
+ * @param shard_size desired shard size
+ * @param shard_limit exclusive end of the shard range
+ * @param[out] start_row inclusive start row of the shard (returned)
+ * @param[out] end_row exclusive end row of the shard (returned)
+ * @return transaction status code
+ */
+static enum GNUNET_DB_QueryStatus
+postgres_begin_revolving_shard (void *cls,
+ const char *job_name,
+ uint32_t shard_size,
+ uint32_t shard_limit,
+ uint32_t *start_row,
+ uint32_t *end_row)
+{
+ struct PostgresClosure *pg = cls;
+
+ GNUNET_assert (shard_limit <= 1U + (uint32_t) INT32_MAX);
+ GNUNET_assert (shard_limit > 0);
+ GNUNET_assert (shard_size > 0);
+ for (unsigned int retries = 0; retries<3; retries++)
+ {
+ if (GNUNET_OK !=
+ postgres_start (pg,
+ "begin_revolving_shard"))
+ {
+ GNUNET_break (0);
+ return GNUNET_DB_STATUS_HARD_ERROR;
+ }
+
+ /* First, find last 'end_row' */
+ {
+ enum GNUNET_DB_QueryStatus qs;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_string (job_name),
+ GNUNET_PQ_query_param_end
+ };
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ GNUNET_PQ_result_spec_uint32 ("end_row",
+ start_row),
+ GNUNET_PQ_result_spec_end
+ };
+
+ qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
+ "get_last_revolving_shard",
+ params,
+ rs);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ postgres_rollback (pg);
+ return qs;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ postgres_rollback (pg);
+ continue;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ break;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ *start_row = 0; /* base-case: no shards yet */
+ break; /* continued below */
+ }
+ } /* get_last_shard */
+
+ if (*start_row < shard_limit)
+ {
+ /* Claim fresh shard */
+ enum GNUNET_DB_QueryStatus qs;
+ struct GNUNET_TIME_Absolute now;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_string (job_name),
+ GNUNET_PQ_query_param_absolute_time (&now),
+ GNUNET_PQ_query_param_uint32 (start_row),
+ GNUNET_PQ_query_param_uint32 (end_row),
+ GNUNET_PQ_query_param_end
+ };
+
+ *end_row = GNUNET_MIN (shard_limit,
+ *start_row + shard_size);
+ now = GNUNET_TIME_absolute_get ();
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Trying to claim shard %llu-%llu\n",
+ (unsigned long long) *start_row,
+ (unsigned long long) *end_row);
+ qs = GNUNET_PQ_eval_prepared_non_select (pg->conn,
+ "create_revolving_shard",
+ params);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ postgres_rollback (pg);
+ return qs;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ postgres_rollback (pg);
+ continue;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ /* continued below (with commit) */
+ break;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ /* someone else got this shard already,
+ try again */
+ postgres_rollback (pg);
+ continue;
+ }
+ } /* end create fresh reovlving shard */
+ else
+ {
+ /* claim oldest existing shard */
+ enum GNUNET_DB_QueryStatus qs;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_string (job_name),
+ GNUNET_PQ_query_param_end
+ };
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ GNUNET_PQ_result_spec_uint32 ("start_row",
+ start_row),
+ GNUNET_PQ_result_spec_uint32 ("end_row",
+ end_row),
+ GNUNET_PQ_result_spec_end
+ };
+
+ qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
+ "get_open_revolving_shard",
+ params,
+ rs);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ postgres_rollback (pg);
+ return qs;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ postgres_rollback (pg);
+ continue;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ /* no open shards available */
+ postgres_rollback (pg);
+ return qs;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ {
+ enum GNUNET_DB_QueryStatus qs;
+ struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get ();
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_string (job_name),
+ GNUNET_PQ_query_param_absolute_time (&now),
+ GNUNET_PQ_query_param_uint32 (start_row),
+ GNUNET_PQ_query_param_uint32 (end_row),
+ GNUNET_PQ_query_param_end
+ };
+
+ qs = GNUNET_PQ_eval_prepared_non_select (pg->conn,
+ "reclaim_revolving_shard",
+ params);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ postgres_rollback (pg);
+ return qs;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ postgres_rollback (pg);
+ continue;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ break; /* continue with commit */
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ GNUNET_break (0); /* logic error, should be impossible */
+ postgres_rollback (pg);
+ return GNUNET_DB_STATUS_HARD_ERROR;
+ }
+ }
+ break; /* continue with commit */
+ }
+ } /* end claim oldest existing shard */
+
+ /* commit */
+ {
+ enum GNUNET_DB_QueryStatus qs;
+
+ qs = postgres_commit (pg);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ postgres_rollback (pg);
+ return qs;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ postgres_rollback (pg);
+ continue;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
+ }
+ }
+ } /* retry 'for' loop */
+ return GNUNET_DB_STATUS_SOFT_ERROR;
+}
+
+
+/**
+ * Function called to release a revolving shard
+ * back into the work pool. Clears the
+ * "completed" flag.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param job_name name of the operation to grab a word shard for
+ * @param start_row inclusive start row of the shard
+ * @param end_row exclusive end row of the shard
+ * @return transaction status code
+ */
+enum GNUNET_DB_QueryStatus
+postgres_release_revolving_shard (void *cls,
+ const char *job_name,
+ uint32_t start_row,
+ uint32_t end_row)
+{
+ struct PostgresClosure *pg = cls;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_string (job_name),
+ GNUNET_PQ_query_param_uint32 (&start_row),
+ GNUNET_PQ_query_param_uint32 (&end_row),
+ GNUNET_PQ_query_param_end
+ };
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Releasing revolving shard %s %u-%u\n",
+ job_name,
+ (unsigned int) start_row,
+ (unsigned int) end_row);
+ return GNUNET_PQ_eval_prepared_non_select (pg->conn,
+ "release_revolving_shard",
+ params);
+}
+
+
+/**
+ * Function called to delete all revolving shards.
+ * To be used after a crash or when the shard size is
+ * changed.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @return transaction status code
+ */
+enum GNUNET_DB_QueryStatus
+postgres_delete_revolving_shards (void *cls)
+{
+ struct PostgresClosure *pg = cls;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_end
+ };
+
+ return GNUNET_PQ_eval_prepared_non_select (pg->conn,
+ "delete_revolving_shards",
+ params);
+}
+
+
+/**
* Initialize Postgres database subsystem.
*
* @param cls a configuration instance
@@ -10592,6 +10953,12 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)
= &postgres_begin_shard;
plugin->complete_shard
= &postgres_complete_shard;
+ plugin->begin_revolving_shard
+ = &postgres_begin_revolving_shard;
+ plugin->release_revolving_shard
+ = &postgres_release_revolving_shard;
+ plugin->delete_revolving_shards
+ = &postgres_delete_revolving_shards;
return plugin;
}
diff --git a/src/exchangedb/test_exchangedb.c b/src/exchangedb/test_exchangedb.c
index 1cccb23c2..8478fac0d 100644
--- a/src/exchangedb/test_exchangedb.c
+++ b/src/exchangedb/test_exchangedb.c
@@ -804,29 +804,22 @@ static uint64_t deposit_rowid;
* @param cls closure a `struct TALER_EXCHANGEDB_Deposit *`
* @param rowid unique ID for the deposit in our DB, used for marking
* it as 'tiny' or 'done'
- * @param exchange_timestamp when did the deposit happen
- * @param wallet_timestamp when did the wallet sign the contract
* @param merchant_pub public key of the merchant
* @param coin_pub public key of the coin
* @param amount_with_fee amount that was deposited including fee
* @param deposit_fee amount the exchange gets to keep as transaction fees
* @param h_contract_terms hash of the proposal data known to merchant and customer
- * @param wire_deadline by which the merchant advised that he would like the
- * wire transfer to be executed
* @param wire wire details for the merchant
* @return transaction status code, #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT to continue to iterate
*/
static enum GNUNET_DB_QueryStatus
deposit_cb (void *cls,
uint64_t rowid,
- struct GNUNET_TIME_Absolute exchange_timestamp,
- struct GNUNET_TIME_Absolute wallet_timestamp,
const struct TALER_MerchantPublicKeyP *merchant_pub,
const struct TALER_CoinSpendPublicKeyP *coin_pub,
const struct TALER_Amount *amount_with_fee,
const struct TALER_Amount *deposit_fee,
const struct GNUNET_HashCode *h_contract_terms,
- struct GNUNET_TIME_Absolute wire_deadline,
const json_t *wire)
{
struct TALER_EXCHANGEDB_Deposit *deposit = cls;
@@ -1896,9 +1889,11 @@ run (void *cls)
&matching_deposit_cb,
&deposit,
2));
- sleep (2); /* giv deposit time to be ready */
+ sleep (2); /* give deposit time to be ready */
FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->get_ready_deposit (plugin->cls,
+ 0,
+ INT32_MAX,
&deposit_cb,
&deposit));
FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
@@ -1911,11 +1906,15 @@ run (void *cls)
deposit_rowid));
FAILIF (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS !=
plugin->get_ready_deposit (plugin->cls,
+ 0,
+ INT32_MAX,
&deposit_cb,
&deposit));
plugin->rollback (plugin->cls);
FAILIF (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT !=
plugin->get_ready_deposit (plugin->cls,
+ 0,
+ INT32_MAX,
&deposit_cb,
&deposit));
FAILIF (GNUNET_OK !=