summaryrefslogtreecommitdiff
path: root/src/auditor/taler-auditor-sync.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/auditor/taler-auditor-sync.c')
-rw-r--r--src/auditor/taler-auditor-sync.c645
1 files changed, 645 insertions, 0 deletions
diff --git a/src/auditor/taler-auditor-sync.c b/src/auditor/taler-auditor-sync.c
new file mode 100644
index 000000000..e4022d325
--- /dev/null
+++ b/src/auditor/taler-auditor-sync.c
@@ -0,0 +1,645 @@
+/*
+ This file is part of TALER
+ Copyright (C) 2020-2022 Taler Systems SA
+
+ TALER is free software; you can redistribute it and/or modify it under the
+ terms of the GNU General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along with
+ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+*/
+/**
+ * @file taler-auditor-sync.c
+ * @brief Tool used by the auditor to make a 'safe' copy of the exchanges' database.
+ * @author Christian Grothoff
+ */
+#include <platform.h>
+#include "taler_exchangedb_lib.h"
+
+
+/**
+ * Handle to access the exchange's source database.
+ */
+static struct TALER_EXCHANGEDB_Plugin *src;
+
+/**
+ * Handle to access the exchange's destination database.
+ */
+static struct TALER_EXCHANGEDB_Plugin *dst;
+
+/**
+ * Return value from #main().
+ */
+static int global_ret;
+
+/**
+ * Main task to do synchronization.
+ */
+static struct GNUNET_SCHEDULER_Task *sync_task;
+
+/**
+ * What is our target transaction size (number of records)?
+ */
+static unsigned int transaction_size = 512;
+
+/**
+ * Number of records copied in this transaction.
+ */
+static unsigned long long actual_size;
+
+/**
+ * Terminate once synchronization is achieved.
+ */
+static int exit_if_synced;
+
+
+/**
+ * Information we track per replicated table.
+ */
+struct Table
+{
+ /**
+ * Which table is this record about?
+ */
+ enum TALER_EXCHANGEDB_ReplicatedTable rt;
+
+ /**
+ * Up to which record is the destination table synchronized.
+ */
+ uint64_t start_serial;
+
+ /**
+ * Highest serial in the source table.
+ */
+ uint64_t end_serial;
+
+ /**
+ * Marker for the end of the list of #tables.
+ */
+ bool end;
+};
+
+
+/**
+ * Information about replicated tables.
+ */
+static struct Table tables[] = {
+ { .rt = TALER_EXCHANGEDB_RT_DENOMINATIONS},
+ { .rt = TALER_EXCHANGEDB_RT_DENOMINATION_REVOCATIONS},
+ { .rt = TALER_EXCHANGEDB_RT_WIRE_TARGETS},
+ { .rt = TALER_EXCHANGEDB_RT_LEGITIMIZATION_PROCESSES},
+ { .rt = TALER_EXCHANGEDB_RT_LEGITIMIZATION_REQUIREMENTS},
+ { .rt = TALER_EXCHANGEDB_RT_RESERVES},
+ { .rt = TALER_EXCHANGEDB_RT_RESERVES_IN},
+ { .rt = TALER_EXCHANGEDB_RT_RESERVES_CLOSE},
+ { .rt = TALER_EXCHANGEDB_RT_RESERVES_OPEN_REQUESTS},
+ { .rt = TALER_EXCHANGEDB_RT_RESERVES_OPEN_DEPOSITS},
+ { .rt = TALER_EXCHANGEDB_RT_RESERVES_OUT},
+ { .rt = TALER_EXCHANGEDB_RT_AUDITORS},
+ { .rt = TALER_EXCHANGEDB_RT_AUDITOR_DENOM_SIGS},
+ { .rt = TALER_EXCHANGEDB_RT_EXCHANGE_SIGN_KEYS},
+ { .rt = TALER_EXCHANGEDB_RT_SIGNKEY_REVOCATIONS},
+ { .rt = TALER_EXCHANGEDB_RT_KNOWN_COINS},
+ { .rt = TALER_EXCHANGEDB_RT_REFRESH_COMMITMENTS},
+ { .rt = TALER_EXCHANGEDB_RT_REFRESH_REVEALED_COINS},
+ { .rt = TALER_EXCHANGEDB_RT_REFRESH_TRANSFER_KEYS},
+ { .rt = TALER_EXCHANGEDB_RT_BATCH_DEPOSITS},
+ { .rt = TALER_EXCHANGEDB_RT_COIN_DEPOSITS},
+ { .rt = TALER_EXCHANGEDB_RT_REFUNDS},
+ { .rt = TALER_EXCHANGEDB_RT_WIRE_OUT},
+ { .rt = TALER_EXCHANGEDB_RT_AGGREGATION_TRACKING},
+ { .rt = TALER_EXCHANGEDB_RT_WIRE_FEE},
+ { .rt = TALER_EXCHANGEDB_RT_GLOBAL_FEE},
+ { .rt = TALER_EXCHANGEDB_RT_RECOUP},
+ { .rt = TALER_EXCHANGEDB_RT_RECOUP_REFRESH },
+ { .rt = TALER_EXCHANGEDB_RT_EXTENSIONS},
+ { .rt = TALER_EXCHANGEDB_RT_POLICY_DETAILS },
+ { .rt = TALER_EXCHANGEDB_RT_POLICY_FULFILLMENTS },
+ { .rt = TALER_EXCHANGEDB_RT_PURSE_REQUESTS},
+ { .rt = TALER_EXCHANGEDB_RT_PURSE_DECISION},
+ { .rt = TALER_EXCHANGEDB_RT_PURSE_MERGES},
+ { .rt = TALER_EXCHANGEDB_RT_PURSE_DEPOSITS},
+ { .rt = TALER_EXCHANGEDB_RT_ACCOUNT_MERGES},
+ { .rt = TALER_EXCHANGEDB_RT_HISTORY_REQUESTS},
+ { .rt = TALER_EXCHANGEDB_RT_CLOSE_REQUESTS},
+ { .rt = TALER_EXCHANGEDB_RT_WADS_OUT},
+ { .rt = TALER_EXCHANGEDB_RT_WADS_OUT_ENTRIES},
+ { .rt = TALER_EXCHANGEDB_RT_WADS_IN},
+ { .rt = TALER_EXCHANGEDB_RT_WADS_IN_ENTRIES},
+ { .rt = TALER_EXCHANGEDB_RT_PROFIT_DRAINS},
+ { .end = true }
+};
+
+
+/**
+ * Closure for #do_insert.
+ */
+struct InsertContext
+{
+ /**
+ * Table we are replicating.
+ */
+ struct Table *table;
+
+ /**
+ * Set to error if insertion created an error.
+ */
+ enum GNUNET_DB_QueryStatus qs;
+};
+
+
+/**
+ * Function called on data to replicate in the auditor's database.
+ *
+ * @param cls closure, a `struct InsertContext`
+ * @param td record from an exchange table
+ * @return #GNUNET_OK to continue to iterate,
+ * #GNUNET_SYSERR to fail with an error
+ */
+static enum GNUNET_GenericReturnValue
+do_insert (void *cls,
+ const struct TALER_EXCHANGEDB_TableData *td)
+{
+ struct InsertContext *ctx = cls;
+ enum GNUNET_DB_QueryStatus qs;
+
+ if (0 >= ctx->qs)
+ return GNUNET_SYSERR;
+ qs = dst->insert_records_by_table (dst->cls,
+ td);
+ if (0 >= qs)
+ {
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ GNUNET_assert (0);
+ break;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to insert record into table %d: no change\n",
+ td->table);
+ break;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Serialization error inserting record into table %d (will retry)\n",
+ td->table);
+ break;
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to insert record into table %d: hard error\n",
+ td->table);
+ break;
+ }
+ ctx->qs = qs;
+ return GNUNET_SYSERR;
+ }
+ actual_size++;
+ ctx->table->start_serial = td->serial;
+ return GNUNET_OK;
+}
+
+
+/**
+ * Run one replication transaction.
+ *
+ * @return #GNUNET_OK on success, #GNUNET_SYSERR to rollback
+ */
+static enum GNUNET_GenericReturnValue
+transact (void)
+{
+ struct InsertContext ctx = {
+ .qs = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT
+ };
+
+ if (0 >
+ src->start (src->cls,
+ "lookup src serials"))
+ return GNUNET_SYSERR;
+ for (unsigned int i = 0; ! tables[i].end; i++)
+ src->lookup_serial_by_table (src->cls,
+ tables[i].rt,
+ &tables[i].end_serial);
+ src->rollback (src->cls);
+ if (GNUNET_OK !=
+ dst->start (dst->cls,
+ "lookup dst serials"))
+ return GNUNET_SYSERR;
+ for (unsigned int i = 0; ! tables[i].end; i++)
+ dst->lookup_serial_by_table (dst->cls,
+ tables[i].rt,
+ &tables[i].start_serial);
+ dst->rollback (dst->cls);
+ for (unsigned int i = 0; ! tables[i].end; i++)
+ {
+ struct Table *table = &tables[i];
+
+ if (table->start_serial == table->end_serial)
+ continue;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Replicating table %d from %llu to %llu\n",
+ i,
+ (unsigned long long) table->start_serial,
+ (unsigned long long) table->end_serial);
+ ctx.table = table;
+ while (table->start_serial < table->end_serial)
+ {
+ enum GNUNET_DB_QueryStatus qs;
+
+ if (GNUNET_OK !=
+ src->start (src->cls,
+ "copy table (src)"))
+ return GNUNET_SYSERR;
+ if (GNUNET_OK !=
+ dst->start (dst->cls,
+ "copy table (dst)"))
+ return GNUNET_SYSERR;
+ qs = src->lookup_records_by_table (src->cls,
+ table->rt,
+ table->start_serial,
+ &do_insert,
+ &ctx);
+ if (ctx.qs < 0)
+ qs = ctx.qs;
+ if (GNUNET_DB_STATUS_HARD_ERROR == qs)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to lookup records from table %d: hard error\n",
+ i);
+ global_ret = EXIT_FAILURE;
+ return GNUNET_SYSERR;
+ }
+ if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Serialization error looking up records from table %d (will retry)\n",
+ i);
+ return GNUNET_SYSERR; /* will retry */
+ }
+ if (0 == qs)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to lookup records from table %d: no results\n",
+ i);
+ GNUNET_break (0); /* should be impossible */
+ global_ret = EXIT_FAILURE;
+ return GNUNET_SYSERR;
+ }
+ if (0 == ctx.qs)
+ return GNUNET_SYSERR; /* insertion failed, maybe record existed? try again */
+ src->rollback (src->cls);
+ qs = dst->commit (dst->cls);
+ if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Serialization error committing transaction on table %d (will retry)\n",
+ i);
+ continue;
+ }
+ if (GNUNET_DB_STATUS_HARD_ERROR == qs)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Hard error committing transaction on table %d\n",
+ i);
+ global_ret = EXIT_FAILURE;
+ return GNUNET_SYSERR;
+ }
+ }
+ }
+ /* we do not care about conflicting UPDATEs to src table, so safe to just rollback */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Sync pass completed successfully with %llu updates\n",
+ actual_size);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Task to do the actual synchronization work.
+ *
+ * @param cls NULL, unused
+ */
+static void
+do_sync (void *cls)
+{
+ static struct GNUNET_TIME_Relative delay;
+
+ (void) cls;
+ sync_task = NULL;
+ actual_size = 0;
+ if (GNUNET_SYSERR ==
+ src->preflight (src->cls))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to begin transaction with data source. Exiting\n");
+ return;
+ }
+ if (GNUNET_SYSERR ==
+ dst->preflight (dst->cls))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to begin transaction with data destination. Exiting\n");
+ return;
+ }
+ if (GNUNET_OK != transact ())
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Transaction failed, rolling back\n");
+ src->rollback (src->cls);
+ dst->rollback (dst->cls);
+ }
+ if (0 != global_ret)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Transaction failed permanently, exiting\n");
+ return;
+ }
+ if ( (0 == actual_size) &&
+ (exit_if_synced) )
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Databases are synchronized. Exiting\n");
+ return;
+ }
+ if (actual_size < transaction_size / 2)
+ {
+ delay = GNUNET_TIME_STD_BACKOFF (delay);
+ }
+ else if (actual_size >= transaction_size)
+ {
+ delay = GNUNET_TIME_UNIT_ZERO;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Next sync pass in %s\n",
+ GNUNET_STRINGS_relative_time_to_string (delay,
+ GNUNET_YES));
+ sync_task = GNUNET_SCHEDULER_add_delayed (delay,
+ &do_sync,
+ NULL);
+}
+
+
+/**
+ * Set an option of type 'char *' from the command line with
+ * filename expansion a la #GNUNET_STRINGS_filename_expand().
+ *
+ * @param ctx command line processing context
+ * @param scls additional closure (will point to the `char *`,
+ * which will be allocated)
+ * @param option name of the option
+ * @param value actual value of the option (a string)
+ * @return #GNUNET_OK
+ */
+static enum GNUNET_GenericReturnValue
+set_filename (struct GNUNET_GETOPT_CommandLineProcessorContext *ctx,
+ void *scls,
+ const char *option,
+ const char *value)
+{
+ char **val = scls;
+
+ (void) ctx;
+ (void) option;
+ GNUNET_assert (NULL != value);
+ GNUNET_free (*val);
+ *val = GNUNET_STRINGS_filename_expand (value);
+ return GNUNET_OK;
+}
+
+
+/**
+ * Allow user to specify configuration file name (-s option)
+ *
+ * @param[out] fn set to the name of the configuration file
+ */
+static struct GNUNET_GETOPT_CommandLineOption
+option_cfgfile_src (char **fn)
+{
+ struct GNUNET_GETOPT_CommandLineOption clo = {
+ .shortName = 's',
+ .name = "source-configuration",
+ .argumentHelp = "FILENAME",
+ .description = gettext_noop (
+ "use configuration file FILENAME for the SOURCE database"),
+ .require_argument = 1,
+ .processor = &set_filename,
+ .scls = (void *) fn
+ };
+
+ return clo;
+}
+
+
+/**
+ * Allow user to specify configuration file name (-d option)
+ *
+ * @param[out] fn set to the name of the configuration file
+ */
+static struct GNUNET_GETOPT_CommandLineOption
+option_cfgfile_dst (char **fn)
+{
+ struct GNUNET_GETOPT_CommandLineOption clo = {
+ .shortName = 'd',
+ .name = "destination-configuration",
+ .argumentHelp = "FILENAME",
+ .description = gettext_noop (
+ "use configuration file FILENAME for the DESTINATION database"),
+ .require_argument = 1,
+ .processor = &set_filename,
+ .scls = (void *) fn
+ };
+
+ return clo;
+}
+
+
+static struct GNUNET_CONFIGURATION_Handle *
+load_config (const char *cfgfile)
+{
+ struct GNUNET_CONFIGURATION_Handle *cfg;
+
+ cfg = GNUNET_CONFIGURATION_create ();
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Loading config file: %s\n",
+ cfgfile);
+ if (GNUNET_SYSERR ==
+ GNUNET_CONFIGURATION_load (cfg,
+ cfgfile))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Malformed configuration file `%s', exit ...\n",
+ cfgfile);
+ GNUNET_CONFIGURATION_destroy (cfg);
+ return NULL;
+ }
+ return cfg;
+}
+
+
+/**
+ * Shutdown task.
+ *
+ * @param cls NULL, unused
+ */
+static void
+do_shutdown (void *cls)
+{
+ (void) cls;
+ if (NULL != sync_task)
+ {
+ GNUNET_SCHEDULER_cancel (sync_task);
+ sync_task = NULL;
+ }
+}
+
+
+/**
+ * Initial task.
+ *
+ * @param cls NULL, unused
+ */
+static void
+run (void *cls)
+{
+ (void) cls;
+
+ GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
+ NULL);
+ sync_task = GNUNET_SCHEDULER_add_now (&do_sync,
+ NULL);
+}
+
+
+/**
+ * Setup plugins in #src and #dst and #run() the main
+ * logic with those plugins.
+ */
+static void
+setup (struct GNUNET_CONFIGURATION_Handle *src_cfg,
+ struct GNUNET_CONFIGURATION_Handle *dst_cfg)
+{
+ src = TALER_EXCHANGEDB_plugin_load (src_cfg);
+ if (NULL == src)
+ {
+ global_ret = EXIT_NOTINSTALLED;
+ return;
+ }
+ dst = TALER_EXCHANGEDB_plugin_load (dst_cfg);
+ if (NULL == dst)
+ {
+ global_ret = EXIT_NOTINSTALLED;
+ TALER_EXCHANGEDB_plugin_unload (src);
+ src = NULL;
+ return;
+ }
+ GNUNET_SCHEDULER_run (&run,
+ NULL);
+ TALER_EXCHANGEDB_plugin_unload (src);
+ src = NULL;
+ TALER_EXCHANGEDB_plugin_unload (dst);
+ dst = NULL;
+}
+
+
+/**
+ * The main function of the taler-auditor-exchange tool. This tool is used
+ * to add (or remove) an exchange's master key and base URL to the auditor's
+ * database.
+ *
+ * @param argc number of arguments from the command line
+ * @param argv command line arguments
+ * @return 0 ok, non-zero on error
+ */
+int
+main (int argc,
+ char *const *argv)
+{
+ char *src_cfgfile = NULL;
+ char *dst_cfgfile = NULL;
+ char *level = GNUNET_strdup ("WARNING");
+ struct GNUNET_CONFIGURATION_Handle *src_cfg;
+ struct GNUNET_CONFIGURATION_Handle *dst_cfg;
+ const struct GNUNET_GETOPT_CommandLineOption options[] = {
+ GNUNET_GETOPT_option_mandatory (
+ option_cfgfile_src (&src_cfgfile)),
+ GNUNET_GETOPT_option_mandatory (
+ option_cfgfile_dst (&dst_cfgfile)),
+ GNUNET_GETOPT_option_help (
+ gettext_noop ("Make a safe copy of an exchange database")),
+ GNUNET_GETOPT_option_uint (
+ 'b',
+ "batch",
+ "SIZE",
+ gettext_noop (
+ "target SIZE for a the number of records to copy in one transaction"),
+ &transaction_size),
+ GNUNET_GETOPT_option_flag (
+ 't',
+ "terminate-when-synchronized",
+ gettext_noop (
+ "terminate as soon as the databases are synchronized"),
+ &exit_if_synced),
+ GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION),
+ GNUNET_GETOPT_option_loglevel (&level),
+ GNUNET_GETOPT_OPTION_END
+ };
+
+ TALER_OS_init ();
+ TALER_gcrypt_init (); /* must trigger initialization manually at this point! */
+ {
+ int ret;
+
+ ret = GNUNET_GETOPT_run ("taler-auditor-sync",
+ options,
+ argc, argv);
+ if (GNUNET_NO == ret)
+ return EXIT_SUCCESS;
+ if (GNUNET_SYSERR == ret)
+ return EXIT_INVALIDARGUMENT;
+ }
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_log_setup ("taler-auditor-sync",
+ level,
+ NULL));
+ GNUNET_free (level);
+ /* suppress compiler warnings... */
+ GNUNET_assert (NULL != src_cfgfile);
+ GNUNET_assert (NULL != dst_cfgfile);
+ if (0 == strcmp (src_cfgfile,
+ dst_cfgfile))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Source and destination configuration files must differ!\n");
+ return EXIT_INVALIDARGUMENT;
+ }
+ src_cfg = load_config (src_cfgfile);
+ if (NULL == src_cfg)
+ {
+ GNUNET_free (src_cfgfile);
+ GNUNET_free (dst_cfgfile);
+ return EXIT_NOTCONFIGURED;
+ }
+ dst_cfg = load_config (dst_cfgfile);
+ if (NULL == dst_cfg)
+ {
+ GNUNET_CONFIGURATION_destroy (src_cfg);
+ GNUNET_free (src_cfgfile);
+ GNUNET_free (dst_cfgfile);
+ return EXIT_NOTCONFIGURED;
+ }
+ setup (src_cfg,
+ dst_cfg);
+ GNUNET_CONFIGURATION_destroy (src_cfg);
+ GNUNET_CONFIGURATION_destroy (dst_cfg);
+ GNUNET_free (src_cfgfile);
+ GNUNET_free (dst_cfgfile);
+
+ return global_ret;
+}
+
+
+/* end of taler-auditor-sync.c */