From e3a0bc0d1f1d14123b56b041b475c8090d20ec1c Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Wed, 13 Jan 2021 19:47:45 +0100 Subject: fix sync issues, add rudimentary test --- src/auditor/taler-auditor-sync.c | 150 +++++++++++++++++++++++++++++++++++---- 1 file changed, 137 insertions(+), 13 deletions(-) (limited to 'src/auditor/taler-auditor-sync.c') diff --git a/src/auditor/taler-auditor-sync.c b/src/auditor/taler-auditor-sync.c index 3a57c37ba..84562c5b2 100644 --- a/src/auditor/taler-auditor-sync.c +++ b/src/auditor/taler-auditor-sync.c @@ -50,15 +50,45 @@ static unsigned int transaction_size = 512; /** * Number of records copied in this transaction. */ -static unsigned int actual_size; +static unsigned long long actual_size; -static struct Table +/** + * Terminate once synchronization is achieved. + */ +static int exit_if_synced; + + +/** + * Information we track per replicated table. + */ +struct Table { + /** + * Which table is this record about? + */ enum TALER_EXCHANGEDB_ReplicatedTable rt; + + /** + * Up to which record is the destination table synchronized. + */ uint64_t start_serial; + + /** + * Highest serial in the source table. + */ uint64_t end_serial; + + /** + * Marker for the end of the list of #tables. + */ bool end; -} tables[] = { +}; + + +/** + * Information about replicated tables. + */ +static struct Table tables[] = { { .rt = TALER_EXCHANGEDB_RT_DENOMINATIONS}, { .rt = TALER_EXCHANGEDB_RT_DENOMINATION_REVOCATIONS}, { .rt = TALER_EXCHANGEDB_RT_RESERVES}, @@ -94,6 +124,11 @@ struct InsertContext */ struct TALER_EXCHANGEDB_Session *ds; + /** + * Table we are replicating. + */ + struct Table *table; + /** * Set to error if insertion created an error. */ @@ -123,10 +158,32 @@ do_insert (void *cls, td); if (0 >= qs) { + switch (qs) + { + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + GNUNET_assert (0); + break; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to insert record into table %d: no change\n", + td->table); + break; + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Serialization error inserting record into table %d (will retry)\n", + td->table); + break; + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to insert record into table %d: hard error\n", + td->table); + break; + } ctx->qs = qs; return GNUNET_SYSERR; } actual_size++; + ctx->table->start_serial = td->serial; return GNUNET_OK; } @@ -175,9 +232,17 @@ transact (struct TALER_EXCHANGEDB_Session *ss, return GNUNET_SYSERR; for (unsigned int i = 0; ! tables[i].end; i++) { - printf ("%d ", i); - fflush (stdout); - while (tables[i].start_serial < tables[i].end_serial) + struct Table *table = &tables[i]; + + if (table->start_serial == table->end_serial) + continue; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Replicating table %d from %llu to %llu\n", + i, + (unsigned long long) table->start_serial, + (unsigned long long) table->end_serial); + ctx.table = table; + while (table->start_serial < table->end_serial) { enum GNUNET_DB_QueryStatus qs; @@ -193,21 +258,32 @@ transact (struct TALER_EXCHANGEDB_Session *ss, return GNUNET_SYSERR; qs = src->lookup_records_by_table (src->cls, ss, - tables[i].rt, - tables[i].start_serial, + table->rt, + table->start_serial, &do_insert, &ctx); if (ctx.qs < 0) qs = ctx.qs; if (GNUNET_DB_STATUS_HARD_ERROR == qs) { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to lookup records from table %d: hard error\n", + i); global_ret = 3; return GNUNET_SYSERR; } if (GNUNET_DB_STATUS_SOFT_ERROR == qs) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Serialization error looking up records from table %d (will retry)\n", + i); return GNUNET_SYSERR; /* will retry */ + } if (0 == qs) { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to lookup records from table %d: no results\n", + i); GNUNET_break (0); /* should be impossible */ global_ret = 4; return GNUNET_SYSERR; @@ -219,16 +295,26 @@ transact (struct TALER_EXCHANGEDB_Session *ss, qs = dst->commit (dst->cls, ds); if (GNUNET_DB_STATUS_SOFT_ERROR == qs) + { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Serialization error committing transaction on table %d (will retry)\n", + i); continue; + } if (GNUNET_DB_STATUS_HARD_ERROR == qs) { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Hard error committing transaction on table %d\n", + i); global_ret = 5; return GNUNET_SYSERR; } } } /* we do not care about conflicting UPDATEs to src table, so safe to just rollback */ - printf ("\n"); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Sync pass completed successfully with %llu updates\n", + actual_size); return GNUNET_OK; } @@ -248,18 +334,43 @@ do_sync (void *cls) sync_task = NULL; actual_size = 0; ss = src->get_session (src->cls); + if (NULL == ss) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to begin transaction with data source. Exiting\n"); + return; + } ds = dst->get_session (dst->cls); + if (NULL == ds) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to begin transaction with data destination. Exiting\n"); + return; + } if (GNUNET_OK != transact (ss, ds)) { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Transaction failed, rolling back\n"); src->rollback (src->cls, ss); dst->rollback (dst->cls, ds); } if (0 != global_ret) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Transaction failed permanently, exiting\n"); + return; + } + if ( (0 == actual_size) && + (exit_if_synced) ) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Databases are synchronized. Exiting\n"); return; + } if (actual_size < transaction_size / 2) { delay = GNUNET_TIME_STD_BACKOFF (delay); @@ -268,6 +379,10 @@ do_sync (void *cls) { delay = GNUNET_TIME_UNIT_ZERO; } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Next sync pass in %s\n", + GNUNET_STRINGS_relative_time_to_string (delay, + GNUNET_YES)); sync_task = GNUNET_SCHEDULER_add_delayed (delay, &do_sync, NULL); @@ -450,6 +565,7 @@ main (int argc, { char *src_cfgfile = NULL; char *dst_cfgfile = NULL; + char *level = GNUNET_strdup ("WARNING"); struct GNUNET_CONFIGURATION_Handle *src_cfg; struct GNUNET_CONFIGURATION_Handle *dst_cfg; const struct GNUNET_GETOPT_CommandLineOption options[] = { @@ -466,15 +582,18 @@ main (int argc, gettext_noop ( "target SIZE for a the number of records to copy in one transaction"), &transaction_size), + GNUNET_GETOPT_option_flag ( + 't', + "terminate-when-synchronized", + gettext_noop ( + "terminate as soon as the databases are synchronized"), + &exit_if_synced), GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION), + GNUNET_GETOPT_option_loglevel (&level), GNUNET_GETOPT_OPTION_END }; TALER_gcrypt_init (); /* must trigger initialization manually at this point! */ - GNUNET_assert (GNUNET_OK == - GNUNET_log_setup ("taler-auditor-sync", - "WARNING", - NULL)); { int ret; @@ -486,6 +605,11 @@ main (int argc, if (GNUNET_SYSERR == ret) return 1; } + GNUNET_assert (GNUNET_OK == + GNUNET_log_setup ("taler-auditor-sync", + level, + NULL)); + GNUNET_free (level); if (0 == strcmp (src_cfgfile, dst_cfgfile)) { -- cgit v1.2.3