summaryrefslogtreecommitdiff
path: root/src/exchangedb/plugin_exchangedb_postgres.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2021-09-03 19:08:02 +0200
committerChristian Grothoff <christian@grothoff.org>2021-09-03 19:08:02 +0200
commit5149af93147c54055d99af688993de3fb4c36ddf (patch)
tree8652e56bed70976309412e11c35305a352576a9b /src/exchangedb/plugin_exchangedb_postgres.c
parent6e1877b142d4819a248b01aebfdd6f337f82a509 (diff)
downloadexchange-5149af93147c54055d99af688993de3fb4c36ddf.tar.gz
exchange-5149af93147c54055d99af688993de3fb4c36ddf.tar.bz2
exchange-5149af93147c54055d99af688993de3fb4c36ddf.zip
preliminary work on supporting sharding/parallel aggregation (undertested, but tests pass again)
Diffstat (limited to 'src/exchangedb/plugin_exchangedb_postgres.c')
-rw-r--r--src/exchangedb/plugin_exchangedb_postgres.c397
1 files changed, 382 insertions, 15 deletions
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c
index ae090baf7..70c337c57 100644
--- a/src/exchangedb/plugin_exchangedb_postgres.c
+++ b/src/exchangedb/plugin_exchangedb_postgres.c
@@ -874,11 +874,12 @@ prepare_statements (struct PostgresClosure *pg)
",coin_sig"
",wire"
",exchange_timestamp"
+ ",shard"
") SELECT known_coin_id, $2, $3, $4, $5, $6, "
- " $7, $8, $9, $10, $11, $12"
+ " $7, $8, $9, $10, $11, $12, $13"
" FROM known_coins"
" WHERE coin_pub=$1;",
- 12),
+ 13),
/* Fetch an existing deposit request, used to ensure idempotency
during /deposit processing. Used in #postgres_have_deposit(). */
GNUNET_PQ_make_prepare ("get_deposit",
@@ -958,13 +959,18 @@ prepare_statements (struct PostgresClosure *pg)
" FROM deposits"
" JOIN known_coins kc USING (known_coin_id)"
" JOIN denominations denom USING (denominations_serial)"
- " WHERE tiny=FALSE"
- " AND done=FALSE"
- " AND wire_deadline<=$1"
- " AND refund_deadline<$1"
- " ORDER BY wire_deadline ASC"
+ " WHERE "
+ " shard >= $2"
+ " AND shard <= $3"
+ " AND tiny=FALSE"
+ " AND done=FALSE"
+ " AND wire_deadline<=$1"
+ " AND refund_deadline<$1"
+ " ORDER BY "
+ " shard ASC"
+ " ,wire_deadline ASC"
" LIMIT 1;",
- 1),
+ 3),
/* Used in #postgres_iterate_matching_deposits() */
GNUNET_PQ_make_prepare ("deposits_iterate_matching",
"SELECT"
@@ -2399,6 +2405,18 @@ prepare_statements (struct PostgresClosure *pg)
" ORDER BY last_attempt ASC"
" LIMIT 1;",
2),
+ /* Used in #postgres_begin_revolving_shard() */
+ GNUNET_PQ_make_prepare ("get_open_revolving_shard",
+ "SELECT"
+ " start_row"
+ ",end_row"
+ " FROM revolving_work_shards"
+ " WHERE job_name=$1"
+ " AND active=FALSE"
+ " ORDER BY last_attempt ASC"
+ " LIMIT 1;",
+ 2),
+ /* Used in #postgres_begin_shard() */
GNUNET_PQ_make_prepare ("reclaim_shard",
"UPDATE work_shards"
" SET last_attempt=$2"
@@ -2406,6 +2424,16 @@ prepare_statements (struct PostgresClosure *pg)
" AND start_row=$3"
" AND end_row=$4",
4),
+ /* Used in #postgres_begin_revolving_shard() */
+ GNUNET_PQ_make_prepare ("reclaim_revolving_shard",
+ "UPDATE revolving_work_shards"
+ " SET last_attempt=$2"
+ " ,active=TRUE"
+ " WHERE job_name=$1"
+ " AND start_row=$3"
+ " AND end_row=$4",
+ 4),
+ /* Used in #postgres_begin_shard() */
GNUNET_PQ_make_prepare ("get_last_shard",
"SELECT"
" end_row"
@@ -2414,6 +2442,16 @@ prepare_statements (struct PostgresClosure *pg)
" ORDER BY end_row DESC"
" LIMIT 1;",
1),
+ /* Used in #postgres_begin_revolving_shard() */
+ GNUNET_PQ_make_prepare ("get_last_revolving_shard",
+ "SELECT"
+ " end_row"
+ " FROM revolving_work_shards"
+ " WHERE job_name=$1"
+ " ORDER BY end_row DESC"
+ " LIMIT 1;",
+ 1),
+ /* Used in #postgres_begin_shard() */
GNUNET_PQ_make_prepare ("claim_next_shard",
"INSERT INTO work_shards"
"(job_name"
@@ -2423,6 +2461,17 @@ prepare_statements (struct PostgresClosure *pg)
") VALUES "
"($1, $2, $3, $4);",
4),
+ /* Used in #postgres_claim_revolving_shard() */
+ GNUNET_PQ_make_prepare ("create_revolving_shard",
+ "INSERT INTO revolving_work_shards"
+ "(job_name"
+ ",last_attempt"
+ ",start_row"
+ ",end_row"
+ ",active"
+ ") VALUES "
+ "($1, $2, $3, $4, TRUE);",
+ 4),
/* Used in #postgres_complete_shard() */
GNUNET_PQ_make_prepare ("complete_shard",
"UPDATE work_shards"
@@ -2431,6 +2480,18 @@ prepare_statements (struct PostgresClosure *pg)
" AND start_row=$2"
" AND end_row=$3",
3),
+ /* Used in #postgres_complete_shard() */
+ GNUNET_PQ_make_prepare ("release_revolving_shard",
+ "UPDATE revolving_work_shards"
+ " SET active=FALSE"
+ " WHERE job_name=$1"
+ " AND start_row=$2"
+ " AND end_row=$3",
+ 3),
+ /* Used in #postgres_delete_revolving_shards() */
+ GNUNET_PQ_make_prepare ("delete_revolving_shards",
+ "DELETE FROM revolving_work_shards",
+ 0),
GNUNET_PQ_PREPARED_STATEMENT_END
};
@@ -4462,12 +4523,16 @@ postgres_mark_deposit_done (void *cls,
* execution time must be in the past.
*
* @param cls the @e cls of this struct with the plugin-specific state
+ * @param start_shard_row minimum shard row to select
+ * @param end_shard_row maximum shard row to select (inclusive)
* @param deposit_cb function to call for ONE such deposit
* @param deposit_cb_cls closure for @a deposit_cb
* @return transaction status code
*/
static enum GNUNET_DB_QueryStatus
postgres_get_ready_deposit (void *cls,
+ uint32_t start_shard_row,
+ uint32_t end_shard_row,
TALER_EXCHANGEDB_DepositIterator deposit_cb,
void *deposit_cb_cls)
{
@@ -4475,6 +4540,8 @@ postgres_get_ready_deposit (void *cls,
struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get ();
struct GNUNET_PQ_QueryParam params[] = {
TALER_PQ_query_param_absolute_time (&now),
+ GNUNET_PQ_query_param_uint32 (&start_shard_row),
+ GNUNET_PQ_query_param_uint32 (&end_shard_row),
GNUNET_PQ_query_param_end
};
struct TALER_Amount amount_with_fee;
@@ -4504,6 +4571,8 @@ postgres_get_ready_deposit (void *cls,
enum GNUNET_DB_QueryStatus qs;
(void) GNUNET_TIME_round_abs (&now);
+ GNUNET_assert (start_shard_row < end_shard_row);
+ GNUNET_assert (end_shard_row <= INT32_MAX);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Finding ready deposits by deadline %s (%llu)\n",
GNUNET_STRINGS_absolute_time_to_string (now),
@@ -4901,6 +4970,35 @@ postgres_ensure_coin_known (void *cls,
/**
+ * Compute the shard number of a given @a deposit
+ *
+ * @param deposit deposit to compute shard for
+ * @return shard number
+ */
+static uint32_t
+compute_shard (const struct TALER_EXCHANGEDB_Deposit *deposit)
+{
+ uint32_t res;
+
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CRYPTO_kdf (&res,
+ sizeof (res),
+ &deposit->h_wire,
+ sizeof (deposit->h_wire),
+ &deposit->merchant_pub,
+ sizeof (deposit->merchant_pub),
+ NULL, 0));
+ /* interpret hash result as NBO for platform independence,
+ convert to HBO and map to [0..2^31-1] range */
+ res = ntohl (res);
+ if (res > INT32_MAX)
+ res += INT32_MIN;
+ GNUNET_assert (res <= INT32_MAX);
+ return res;
+}
+
+
+/**
* Insert information about deposited coin into the database.
*
* @param cls the `struct PostgresClosure` with the plugin-specific state
@@ -4914,6 +5012,7 @@ postgres_insert_deposit (void *cls,
const struct TALER_EXCHANGEDB_Deposit *deposit)
{
struct PostgresClosure *pg = cls;
+ uint32_t shard = compute_shard (deposit);
struct GNUNET_PQ_QueryParam params[] = {
GNUNET_PQ_query_param_auto_from_type (&deposit->coin.coin_pub),
TALER_PQ_query_param_amount (&deposit->amount_with_fee),
@@ -4926,9 +5025,11 @@ postgres_insert_deposit (void *cls,
GNUNET_PQ_query_param_auto_from_type (&deposit->csig),
TALER_PQ_query_param_json (deposit->receiver_wire_account),
TALER_PQ_query_param_absolute_time (&exchange_timestamp),
+ GNUNET_PQ_query_param_uint32 (&shard),
GNUNET_PQ_query_param_end
};
+ GNUNET_assert (shard <= INT32_MAX);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Inserting deposit to be executed at %s (%llu/%llu)\n",
GNUNET_STRINGS_absolute_time_to_string (deposit->wire_deadline),
@@ -6933,18 +7034,19 @@ postgres_wire_prepare_data_get (void *cls,
/**
- * Start a transaction where we transiently violate the foreign
+ * Starts a READ COMMITTED transaction where we transiently violate the foreign
* constraints on the "wire_out" table as we insert aggregations
* and only add the wire transfer out at the end.
*
* @param cls the @e cls of this struct with the plugin-specific state
* @return #GNUNET_OK on success
*/
-static int
+static enum GNUNET_GenericReturnValue
postgres_start_deferred_wire_out (void *cls)
{
struct PostgresClosure *pg = cls;
struct GNUNET_PQ_ExecuteStatement es[] = {
+ GNUNET_PQ_make_execute ("START TRANSACTION ISOLATION LEVEL READ COMMITTED"),
GNUNET_PQ_make_execute ("SET CONSTRAINTS wire_out_ref DEFERRED"),
GNUNET_PQ_EXECUTE_STATEMENT_END
};
@@ -6953,10 +7055,6 @@ postgres_start_deferred_wire_out (void *cls)
postgres_preflight (pg))
return GNUNET_SYSERR;
if (GNUNET_OK !=
- postgres_start (pg,
- "deferred wire out"))
- return GNUNET_SYSERR;
- if (GNUNET_OK !=
GNUNET_PQ_exec_statements (pg->conn,
es))
{
@@ -6966,6 +7064,7 @@ postgres_start_deferred_wire_out (void *cls)
postgres_rollback (pg);
return GNUNET_SYSERR;
}
+ pg->transaction_name = "deferred wire out";
return GNUNET_OK;
}
@@ -8041,7 +8140,7 @@ struct RecoupSerialContext
/**
* Status code, set to #GNUNET_SYSERR on hard errors.
*/
- int status;
+ enum GNUNET_GenericReturnValue status;
};
@@ -10381,6 +10480,268 @@ postgres_complete_shard (void *cls,
/**
+ * Function called to grab a revolving work shard on an operation @a op. Runs
+ * in its own transaction. Returns the oldest inactive shard.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param job_name name of the operation to grab a revolving shard for
+ * @param shard_size desired shard size
+ * @param shard_limit exclusive end of the shard range
+ * @param[out] start_row inclusive start row of the shard (returned)
+ * @param[out] end_row exclusive end row of the shard (returned)
+ * @return transaction status code
+ */
+static enum GNUNET_DB_QueryStatus
+postgres_begin_revolving_shard (void *cls,
+ const char *job_name,
+ uint32_t shard_size,
+ uint32_t shard_limit,
+ uint32_t *start_row,
+ uint32_t *end_row)
+{
+ struct PostgresClosure *pg = cls;
+
+ GNUNET_assert (shard_limit <= 1U + (uint32_t) INT32_MAX);
+ GNUNET_assert (shard_limit > 0);
+ GNUNET_assert (shard_size > 0);
+ for (unsigned int retries = 0; retries<3; retries++)
+ {
+ if (GNUNET_OK !=
+ postgres_start (pg,
+ "begin_revolving_shard"))
+ {
+ GNUNET_break (0);
+ return GNUNET_DB_STATUS_HARD_ERROR;
+ }
+
+ /* First, find last 'end_row' */
+ {
+ enum GNUNET_DB_QueryStatus qs;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_string (job_name),
+ GNUNET_PQ_query_param_end
+ };
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ GNUNET_PQ_result_spec_uint32 ("end_row",
+ start_row),
+ GNUNET_PQ_result_spec_end
+ };
+
+ qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
+ "get_last_revolving_shard",
+ params,
+ rs);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ postgres_rollback (pg);
+ return qs;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ postgres_rollback (pg);
+ continue;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ break;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ *start_row = 0; /* base-case: no shards yet */
+ break; /* continued below */
+ }
+ } /* get_last_shard */
+
+ if (*start_row < shard_limit)
+ {
+ /* Claim fresh shard */
+ enum GNUNET_DB_QueryStatus qs;
+ struct GNUNET_TIME_Absolute now;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_string (job_name),
+ GNUNET_PQ_query_param_absolute_time (&now),
+ GNUNET_PQ_query_param_uint32 (start_row),
+ GNUNET_PQ_query_param_uint32 (end_row),
+ GNUNET_PQ_query_param_end
+ };
+
+ *end_row = GNUNET_MIN (shard_limit,
+ *start_row + shard_size);
+ now = GNUNET_TIME_absolute_get ();
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Trying to claim shard %llu-%llu\n",
+ (unsigned long long) *start_row,
+ (unsigned long long) *end_row);
+ qs = GNUNET_PQ_eval_prepared_non_select (pg->conn,
+ "create_revolving_shard",
+ params);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ postgres_rollback (pg);
+ return qs;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ postgres_rollback (pg);
+ continue;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ /* continued below (with commit) */
+ break;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ /* someone else got this shard already,
+ try again */
+ postgres_rollback (pg);
+ continue;
+ }
+ } /* end create fresh reovlving shard */
+ else
+ {
+ /* claim oldest existing shard */
+ enum GNUNET_DB_QueryStatus qs;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_string (job_name),
+ GNUNET_PQ_query_param_end
+ };
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ GNUNET_PQ_result_spec_uint32 ("start_row",
+ start_row),
+ GNUNET_PQ_result_spec_uint32 ("end_row",
+ end_row),
+ GNUNET_PQ_result_spec_end
+ };
+
+ qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
+ "get_open_revolving_shard",
+ params,
+ rs);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ postgres_rollback (pg);
+ return qs;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ postgres_rollback (pg);
+ continue;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ /* no open shards available */
+ postgres_rollback (pg);
+ return qs;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ {
+ enum GNUNET_DB_QueryStatus qs;
+ struct GNUNET_TIME_Absolute now = GNUNET_TIME_absolute_get ();
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_string (job_name),
+ GNUNET_PQ_query_param_absolute_time (&now),
+ GNUNET_PQ_query_param_uint32 (start_row),
+ GNUNET_PQ_query_param_uint32 (end_row),
+ GNUNET_PQ_query_param_end
+ };
+
+ qs = GNUNET_PQ_eval_prepared_non_select (pg->conn,
+ "reclaim_revolving_shard",
+ params);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ postgres_rollback (pg);
+ return qs;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ postgres_rollback (pg);
+ continue;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ break; /* continue with commit */
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ GNUNET_break (0); /* logic error, should be impossible */
+ postgres_rollback (pg);
+ return GNUNET_DB_STATUS_HARD_ERROR;
+ }
+ }
+ break; /* continue with commit */
+ }
+ } /* end claim oldest existing shard */
+
+ /* commit */
+ {
+ enum GNUNET_DB_QueryStatus qs;
+
+ qs = postgres_commit (pg);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ postgres_rollback (pg);
+ return qs;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ postgres_rollback (pg);
+ continue;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
+ }
+ }
+ } /* retry 'for' loop */
+ return GNUNET_DB_STATUS_SOFT_ERROR;
+}
+
+
+/**
+ * Function called to release a revolving shard
+ * back into the work pool. Clears the
+ * "completed" flag.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param job_name name of the operation to grab a word shard for
+ * @param start_row inclusive start row of the shard
+ * @param end_row exclusive end row of the shard
+ * @return transaction status code
+ */
+enum GNUNET_DB_QueryStatus
+postgres_release_revolving_shard (void *cls,
+ const char *job_name,
+ uint32_t start_row,
+ uint32_t end_row)
+{
+ struct PostgresClosure *pg = cls;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_string (job_name),
+ GNUNET_PQ_query_param_uint32 (&start_row),
+ GNUNET_PQ_query_param_uint32 (&end_row),
+ GNUNET_PQ_query_param_end
+ };
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Releasing revolving shard %s %u-%u\n",
+ job_name,
+ (unsigned int) start_row,
+ (unsigned int) end_row);
+ return GNUNET_PQ_eval_prepared_non_select (pg->conn,
+ "release_revolving_shard",
+ params);
+}
+
+
+/**
+ * Function called to delete all revolving shards.
+ * To be used after a crash or when the shard size is
+ * changed.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @return transaction status code
+ */
+enum GNUNET_DB_QueryStatus
+postgres_delete_revolving_shards (void *cls)
+{
+ struct PostgresClosure *pg = cls;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_end
+ };
+
+ return GNUNET_PQ_eval_prepared_non_select (pg->conn,
+ "delete_revolving_shards",
+ params);
+}
+
+
+/**
* Initialize Postgres database subsystem.
*
* @param cls a configuration instance
@@ -10592,6 +10953,12 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)
= &postgres_begin_shard;
plugin->complete_shard
= &postgres_complete_shard;
+ plugin->begin_revolving_shard
+ = &postgres_begin_revolving_shard;
+ plugin->release_revolving_shard
+ = &postgres_release_revolving_shard;
+ plugin->delete_revolving_shards
+ = &postgres_delete_revolving_shards;
return plugin;
}