summaryrefslogtreecommitdiff
path: root/src/auditor/taler-auditor-sync.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/auditor/taler-auditor-sync.c')
-rw-r--r--src/auditor/taler-auditor-sync.c150
1 files changed, 137 insertions, 13 deletions
diff --git a/src/auditor/taler-auditor-sync.c b/src/auditor/taler-auditor-sync.c
index 3a57c37ba..84562c5b2 100644
--- a/src/auditor/taler-auditor-sync.c
+++ b/src/auditor/taler-auditor-sync.c
@@ -50,15 +50,45 @@ static unsigned int transaction_size = 512;
/**
* Number of records copied in this transaction.
*/
-static unsigned int actual_size;
+static unsigned long long actual_size;
-static struct Table
+/**
+ * Terminate once synchronization is achieved.
+ */
+static int exit_if_synced;
+
+
+/**
+ * Information we track per replicated table.
+ */
+struct Table
{
+ /**
+ * Which table is this record about?
+ */
enum TALER_EXCHANGEDB_ReplicatedTable rt;
+
+ /**
+ * Up to which record is the destination table synchronized.
+ */
uint64_t start_serial;
+
+ /**
+ * Highest serial in the source table.
+ */
uint64_t end_serial;
+
+ /**
+ * Marker for the end of the list of #tables.
+ */
bool end;
-} tables[] = {
+};
+
+
+/**
+ * Information about replicated tables.
+ */
+static struct Table tables[] = {
{ .rt = TALER_EXCHANGEDB_RT_DENOMINATIONS},
{ .rt = TALER_EXCHANGEDB_RT_DENOMINATION_REVOCATIONS},
{ .rt = TALER_EXCHANGEDB_RT_RESERVES},
@@ -95,6 +125,11 @@ struct InsertContext
struct TALER_EXCHANGEDB_Session *ds;
/**
+ * Table we are replicating.
+ */
+ struct Table *table;
+
+ /**
* Set to error if insertion created an error.
*/
enum GNUNET_DB_QueryStatus qs;
@@ -123,10 +158,32 @@ do_insert (void *cls,
td);
if (0 >= qs)
{
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ GNUNET_assert (0);
+ break;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to insert record into table %d: no change\n",
+ td->table);
+ break;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Serialization error inserting record into table %d (will retry)\n",
+ td->table);
+ break;
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to insert record into table %d: hard error\n",
+ td->table);
+ break;
+ }
ctx->qs = qs;
return GNUNET_SYSERR;
}
actual_size++;
+ ctx->table->start_serial = td->serial;
return GNUNET_OK;
}
@@ -175,9 +232,17 @@ transact (struct TALER_EXCHANGEDB_Session *ss,
return GNUNET_SYSERR;
for (unsigned int i = 0; ! tables[i].end; i++)
{
- printf ("%d ", i);
- fflush (stdout);
- while (tables[i].start_serial < tables[i].end_serial)
+ struct Table *table = &tables[i];
+
+ if (table->start_serial == table->end_serial)
+ continue;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Replicating table %d from %llu to %llu\n",
+ i,
+ (unsigned long long) table->start_serial,
+ (unsigned long long) table->end_serial);
+ ctx.table = table;
+ while (table->start_serial < table->end_serial)
{
enum GNUNET_DB_QueryStatus qs;
@@ -193,21 +258,32 @@ transact (struct TALER_EXCHANGEDB_Session *ss,
return GNUNET_SYSERR;
qs = src->lookup_records_by_table (src->cls,
ss,
- tables[i].rt,
- tables[i].start_serial,
+ table->rt,
+ table->start_serial,
&do_insert,
&ctx);
if (ctx.qs < 0)
qs = ctx.qs;
if (GNUNET_DB_STATUS_HARD_ERROR == qs)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to lookup records from table %d: hard error\n",
+ i);
global_ret = 3;
return GNUNET_SYSERR;
}
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Serialization error looking up records from table %d (will retry)\n",
+ i);
return GNUNET_SYSERR; /* will retry */
+ }
if (0 == qs)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to lookup records from table %d: no results\n",
+ i);
GNUNET_break (0); /* should be impossible */
global_ret = 4;
return GNUNET_SYSERR;
@@ -219,16 +295,26 @@ transact (struct TALER_EXCHANGEDB_Session *ss,
qs = dst->commit (dst->cls,
ds);
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Serialization error committing transaction on table %d (will retry)\n",
+ i);
continue;
+ }
if (GNUNET_DB_STATUS_HARD_ERROR == qs)
{
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Hard error committing transaction on table %d\n",
+ i);
global_ret = 5;
return GNUNET_SYSERR;
}
}
}
/* we do not care about conflicting UPDATEs to src table, so safe to just rollback */
- printf ("\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Sync pass completed successfully with %llu updates\n",
+ actual_size);
return GNUNET_OK;
}
@@ -248,18 +334,43 @@ do_sync (void *cls)
sync_task = NULL;
actual_size = 0;
ss = src->get_session (src->cls);
+ if (NULL == ss)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to begin transaction with data source. Exiting\n");
+ return;
+ }
ds = dst->get_session (dst->cls);
+ if (NULL == ds)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to begin transaction with data destination. Exiting\n");
+ return;
+ }
if (GNUNET_OK !=
transact (ss,
ds))
{
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Transaction failed, rolling back\n");
src->rollback (src->cls,
ss);
dst->rollback (dst->cls,
ds);
}
if (0 != global_ret)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Transaction failed permanently, exiting\n");
+ return;
+ }
+ if ( (0 == actual_size) &&
+ (exit_if_synced) )
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Databases are synchronized. Exiting\n");
return;
+ }
if (actual_size < transaction_size / 2)
{
delay = GNUNET_TIME_STD_BACKOFF (delay);
@@ -268,6 +379,10 @@ do_sync (void *cls)
{
delay = GNUNET_TIME_UNIT_ZERO;
}
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Next sync pass in %s\n",
+ GNUNET_STRINGS_relative_time_to_string (delay,
+ GNUNET_YES));
sync_task = GNUNET_SCHEDULER_add_delayed (delay,
&do_sync,
NULL);
@@ -450,6 +565,7 @@ main (int argc,
{
char *src_cfgfile = NULL;
char *dst_cfgfile = NULL;
+ char *level = GNUNET_strdup ("WARNING");
struct GNUNET_CONFIGURATION_Handle *src_cfg;
struct GNUNET_CONFIGURATION_Handle *dst_cfg;
const struct GNUNET_GETOPT_CommandLineOption options[] = {
@@ -466,15 +582,18 @@ main (int argc,
gettext_noop (
"target SIZE for a the number of records to copy in one transaction"),
&transaction_size),
+ GNUNET_GETOPT_option_flag (
+ 't',
+ "terminate-when-synchronized",
+ gettext_noop (
+ "terminate as soon as the databases are synchronized"),
+ &exit_if_synced),
GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION),
+ GNUNET_GETOPT_option_loglevel (&level),
GNUNET_GETOPT_OPTION_END
};
TALER_gcrypt_init (); /* must trigger initialization manually at this point! */
- GNUNET_assert (GNUNET_OK ==
- GNUNET_log_setup ("taler-auditor-sync",
- "WARNING",
- NULL));
{
int ret;
@@ -486,6 +605,11 @@ main (int argc,
if (GNUNET_SYSERR == ret)
return 1;
}
+ GNUNET_assert (GNUNET_OK ==
+ GNUNET_log_setup ("taler-auditor-sync",
+ level,
+ NULL));
+ GNUNET_free (level);
if (0 == strcmp (src_cfgfile,
dst_cfgfile))
{