summaryrefslogtreecommitdiff
path: root/src/exchangedb
diff options
context:
space:
mode:
Diffstat (limited to 'src/exchangedb')
-rw-r--r--src/exchangedb/exchange-0002.sql34
-rw-r--r--src/exchangedb/plugin_exchangedb_postgres.c297
2 files changed, 329 insertions, 2 deletions
diff --git a/src/exchangedb/exchange-0002.sql b/src/exchangedb/exchange-0002.sql
index b03a7b512..361b69b8d 100644
--- a/src/exchangedb/exchange-0002.sql
+++ b/src/exchangedb/exchange-0002.sql
@@ -1,6 +1,6 @@
--
-- This file is part of TALER
--- Copyright (C) 2020 Taler Systems SA
+-- Copyright (C) 2020-2021 Taler Systems SA
--
-- TALER is free software; you can redistribute it and/or modify it under the
-- terms of the GNU General Public License as published by the Free Software
@@ -374,5 +374,37 @@ COMMENT ON TABLE signkey_revocations
IS 'remembering which online signing keys have been revoked';
+
+CREATE TABLE IF NOT EXISTS work_shards
+ (shard_serial_id BIGSERIAL UNIQUE
+ ,last_attempt INT8 NOT NULL
+ ,start_row INT8 NOT NULL
+ ,end_row INT8 NOT NULL
+ ,completed BOOLEAN NOT NULL
+ ,job_name VARCHAR NOT NULL
+ ,PRIMARY KEY (job_name, start_row)
+ );
+CREATE INDEX IF NOT EXISTS work_shards_index
+ ON work_shards
+ (job_name
+ ,completed
+ ,last_attempt
+ );
+COMMENT ON TABLE work_shards
+ IS 'coordinates work between multiple processes working on the same job';
+COMMENT ON COLUMN work_shards.shard_serial_id
+ IS 'unique serial number identifying the shard';
+COMMENT ON COLUMN work_shards.last_attempt
+ IS 'last time a worker attempted to work on the shard';
+COMMENT ON COLUMN work_shards.completed
+ IS 'set to TRUE once the shard is finished by a worker';
+COMMENT ON COLUMN work_shards.start_row
+ IS 'row at which the shard scope starts, inclusive';
+COMMENT ON COLUMN work_shards.end_row
+ IS 'row at which the shard scope ends, exclusive';
+COMMENT ON COLUMN work_shards.job_name
+ IS 'unique name of the job the workers on this shard are performing';
+
+
-- Complete transaction
COMMIT;
diff --git a/src/exchangedb/plugin_exchangedb_postgres.c b/src/exchangedb/plugin_exchangedb_postgres.c
index 04dc03cdd..e61a1ac7a 100644
--- a/src/exchangedb/plugin_exchangedb_postgres.c
+++ b/src/exchangedb/plugin_exchangedb_postgres.c
@@ -1,6 +1,6 @@
/*
This file is part of TALER
- Copyright (C) 2014--2020 Taler Systems SA
+ Copyright (C) 2014--2021 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
@@ -2438,6 +2438,52 @@ postgres_get_session (void *cls)
") VALUES "
"($1, $2, $3, $4, $5, $6, $7, $8);",
8),
+
+ /* Used in #postgres_begin_shard() */
+ GNUNET_PQ_make_prepare ("get_open_shard",
+ "SELECT"
+ " start_row"
+ ",end_row"
+ " FROM work_shards"
+ " WHERE job_name=$1"
+ " AND last_attempt<$2"
+ " AND completed=FALSE"
+ " ORDER BY last_attempt ASC"
+ " LIMIT 1;",
+ 2),
+ GNUNET_PQ_make_prepare ("reclaim_shard",
+ "UPDATE work_shards"
+ " SET last_attempt=$2"
+ " WHERE job_name=$1"
+ " AND start_row=$3"
+ " AND end_row=$4",
+ 4),
+ GNUNET_PQ_make_prepare ("get_last_shard",
+ "SELECT"
+ " end_row"
+ " FROM work_shards"
+ " WHERE job_name=$1"
+ " AND completed=FALSE"
+ " ORDER BY end_row DESC"
+ " LIMIT 1;",
+ 1),
+ GNUNET_PQ_make_prepare ("claim_next_shard",
+ "INSERT INTO work_shards"
+ "(job_name"
+ ",last_attempt"
+ ",start_row"
+ ",end_row"
+ ") VALUES "
+ "($1, $2, $3, $4);",
+ 4),
+ /* Used in #postgres_complete_shard() */
+ GNUNET_PQ_make_prepare ("complete_shard",
+ "UPDATE work_shards"
+ " SET completed=TRUE"
+ " WHERE job_name=$1"
+ " AND start_row=$2"
+ " AND end_row=$3",
+ 3),
GNUNET_PQ_PREPARED_STATEMENT_END
};
@@ -10150,6 +10196,251 @@ postgres_insert_records_by_table (void *cls,
/**
+ * Function called to grab a work shard on an operation @a op. Runs in its
+ * own transaction (hence no session provided).
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param job_name name of the operation to grab a word shard for
+ * @param delay minimum age of a shard to grab
+ * @param size desired shard size
+ * @param[out] start_row inclusive start row of the shard (returned)
+ * @param[out] end_row exclusive end row of the shard (returned)
+ * @return transaction status code
+ */
+static enum GNUNET_DB_QueryStatus
+postgres_begin_shard (void *cls,
+ const char *job_name,
+ struct GNUNET_TIME_Relative delay,
+ uint64_t shard_size,
+ uint64_t *start_row,
+ uint64_t *end_row)
+{
+ struct TALER_EXCHANGEDB_Session *session;
+
+ session = postgres_get_session (cls);
+ if (NULL == session)
+ return GNUNET_DB_STATUS_HARD_ERROR;
+ for (unsigned int retries = 0; retries<3; retries++)
+ {
+ if (GNUNET_OK !=
+ postgres_start (cls,
+ session,
+ "begin_shard"))
+ {
+ GNUNET_break (0);
+ return GNUNET_DB_STATUS_HARD_ERROR;
+ }
+
+ {
+ struct GNUNET_TIME_Absolute past;
+ enum GNUNET_DB_QueryStatus qs;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_string (job_name),
+ GNUNET_PQ_query_param_absolute_time (&past),
+ GNUNET_PQ_query_param_end
+ };
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ GNUNET_PQ_result_spec_uint64 ("start_row",
+ start_row),
+ GNUNET_PQ_result_spec_uint64 ("end_row",
+ end_row),
+ GNUNET_PQ_result_spec_end
+ };
+
+ past = GNUNET_TIME_absolute_subtract (GNUNET_TIME_absolute_get (),
+ delay);
+ qs = GNUNET_PQ_eval_prepared_singleton_select (session->conn,
+ "get_open_shard",
+ params,
+ rs);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ postgres_rollback (cls,
+ session);
+ return qs;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ postgres_rollback (cls,
+ session);
+ continue;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ {
+ enum GNUNET_DB_QueryStatus qs;
+ struct GNUNET_TIME_Absolute now;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_string (job_name),
+ GNUNET_PQ_query_param_absolute_time (&now),
+ GNUNET_PQ_query_param_uint64 (start_row),
+ GNUNET_PQ_query_param_uint64 (end_row),
+ GNUNET_PQ_query_param_end
+ };
+
+ now = GNUNET_TIME_absolute_get ();
+ qs = GNUNET_PQ_eval_prepared_non_select (session->conn,
+ "reclaim_shard",
+ params);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ postgres_rollback (cls,
+ session);
+ return qs;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ postgres_rollback (cls,
+ session);
+ continue;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ goto commit;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ GNUNET_break (0); /* logic error, should be impossible */
+ postgres_rollback (cls,
+ session);
+ return GNUNET_DB_STATUS_HARD_ERROR;
+ }
+ }
+ break; /* actually unreachable */
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ break; /* continued below */
+ }
+ } /* get_open_shard */
+
+ /* No open shard, find last 'end_row' */
+ {
+ enum GNUNET_DB_QueryStatus qs;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_string (job_name),
+ GNUNET_PQ_query_param_end
+ };
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ GNUNET_PQ_result_spec_uint64 ("end_row",
+ start_row),
+ GNUNET_PQ_result_spec_end
+ };
+
+ qs = GNUNET_PQ_eval_prepared_singleton_select (session->conn,
+ "get_last_shard",
+ params,
+ rs);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ postgres_rollback (cls,
+ session);
+ return qs;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ postgres_rollback (cls,
+ session);
+ continue;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ break;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ *start_row = 0; /* base-case: no shards yet */
+ break; /* continued below */
+ }
+ *end_row = *start_row + shard_size;
+ } /* get_last_shard */
+
+ /* Claim fresh shard */
+ {
+ enum GNUNET_DB_QueryStatus qs;
+ struct GNUNET_TIME_Absolute now;
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_string (job_name),
+ GNUNET_PQ_query_param_absolute_time (&now),
+ GNUNET_PQ_query_param_uint64 (start_row),
+ GNUNET_PQ_query_param_uint64 (end_row),
+ GNUNET_PQ_query_param_end
+ };
+
+ now = GNUNET_TIME_absolute_get ();
+ qs = GNUNET_PQ_eval_prepared_non_select (session->conn,
+ "claim_next_shard",
+ params);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ postgres_rollback (cls,
+ session);
+ return qs;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ postgres_rollback (cls,
+ session);
+ continue;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ /* continued below */
+ break;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ GNUNET_break (0);
+ postgres_rollback (cls,
+ session);
+ continue;
+ }
+ } /* claim_next_shard */
+
+ /* commit */
+commit:
+ {
+ enum GNUNET_DB_QueryStatus qs;
+
+ qs = postgres_commit (cls,
+ session);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ postgres_rollback (cls,
+ session);
+ return qs;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ postgres_rollback (cls,
+ session);
+ continue;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
+ }
+ }
+ } /* retry 'for' loop */
+ return GNUNET_DB_STATUS_SOFT_ERROR;
+}
+
+
+/**
+ * Function called to persist that work on a shard was completed.
+ *
+ * @param cls the @e cls of this struct with the plugin-specific state
+ * @param session a session
+ * @param job_name name of the operation to grab a word shard for
+ * @param start_row inclusive start row of the shard
+ * @param end_row exclusive end row of the shard
+ * @return transaction status code
+ */
+enum GNUNET_DB_QueryStatus
+postgres_complete_shard (void *cls,
+ struct TALER_EXCHANGEDB_Session *session,
+ const char *job_name,
+ uint64_t start_row,
+ uint64_t end_row)
+{
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_string (job_name),
+ GNUNET_PQ_query_param_uint64 (&start_row),
+ GNUNET_PQ_query_param_uint64 (&end_row),
+ GNUNET_PQ_query_param_end
+ };
+
+ (void) cls;
+ return GNUNET_PQ_eval_prepared_non_select (session->conn,
+ "complete_shard",
+ params);
+}
+
+
+/**
* Initialize Postgres database subsystem.
*
* @param cls a configuration instance
@@ -10353,6 +10644,10 @@ libtaler_plugin_exchangedb_postgres_init (void *cls)
= &postgres_lookup_records_by_table;
plugin->insert_records_by_table
= &postgres_insert_records_by_table;
+ plugin->begin_shard
+ = &postgres_begin_shard;
+ plugin->complete_shard
+ = &postgres_complete_shard;
return plugin;
}