summaryrefslogtreecommitdiff
path: root/src/auditor
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2018-10-27 21:27:23 +0200
committerChristian Grothoff <christian@grothoff.org>2018-10-27 21:27:23 +0200
commit7b62deabac967c2ba94502211ffd553eda572622 (patch)
treed3293140c44c17ee77071062bec2bd1a49588ccd /src/auditor
parent2024ccd2f9cc3465a008a88c836be9ce26694b19 (diff)
downloadexchange-7b62deabac967c2ba94502211ffd553eda572622.tar.gz
exchange-7b62deabac967c2ba94502211ffd553eda572622.tar.bz2
exchange-7b62deabac967c2ba94502211ffd553eda572622.zip
split up progress points of auditor by coin/reserve/aggregation to minimize DB conflicts
Diffstat (limited to 'src/auditor')
-rw-r--r--src/auditor/taler-auditor.c292
1 files changed, 179 insertions, 113 deletions
diff --git a/src/auditor/taler-auditor.c b/src/auditor/taler-auditor.c
index 83f87ae0a..12ababc55 100644
--- a/src/auditor/taler-auditor.c
+++ b/src/auditor/taler-auditor.c
@@ -109,9 +109,19 @@ static struct GNUNET_TIME_Relative idle_reserve_expiration_time;
static struct TALER_MasterPublicKeyP master_pub;
/**
- * Last reserve_in serial ID seen.
+ * Checkpointing our progress for reserves.
*/
-static struct TALER_AUDITORDB_ProgressPoint pp;
+static struct TALER_AUDITORDB_ProgressPointReserve ppr;
+
+/**
+ * Checkpointing our progress for aggregations.
+ */
+static struct TALER_AUDITORDB_ProgressPointAggregation ppa;
+
+/**
+ * Checkpointing our progress for coins.
+ */
+static struct TALER_AUDITORDB_ProgressPointCoin ppc;
/**
* Array of reports about denomination keys with an
@@ -773,8 +783,8 @@ handle_reserve_in (void *cls,
enum GNUNET_DB_QueryStatus qs;
/* should be monotonically increasing */
- GNUNET_assert (rowid >= pp.last_reserve_in_serial_id);
- pp.last_reserve_in_serial_id = rowid + 1;
+ GNUNET_assert (rowid >= ppr.last_reserve_in_serial_id);
+ ppr.last_reserve_in_serial_id = rowid + 1;
GNUNET_CRYPTO_hash (reserve_pub,
sizeof (*reserve_pub),
@@ -861,8 +871,8 @@ handle_reserve_out (void *cls,
enum GNUNET_DB_QueryStatus qs;
/* should be monotonically increasing */
- GNUNET_assert (rowid >= pp.last_reserve_out_serial_id);
- pp.last_reserve_out_serial_id = rowid + 1;
+ GNUNET_assert (rowid >= ppr.last_reserve_out_serial_id);
+ ppr.last_reserve_out_serial_id = rowid + 1;
/* lookup denomination pub data (make sure denom_pub is valid, establish fees) */
qs = get_denomination_info (denom_pub,
@@ -1011,8 +1021,8 @@ handle_payback_by_reserve (void *cls,
const char *rev;
/* should be monotonically increasing */
- GNUNET_assert (rowid >= pp.last_reserve_payback_serial_id);
- pp.last_reserve_payback_serial_id = rowid + 1;
+ GNUNET_assert (rowid >= ppr.last_reserve_payback_serial_id);
+ ppr.last_reserve_payback_serial_id = rowid + 1;
GNUNET_CRYPTO_rsa_public_key_hash (coin->denom_pub.rsa_public_key,
&pr.h_denom_pub);
@@ -1202,8 +1212,8 @@ handle_reserve_closed (void *cls,
enum GNUNET_DB_QueryStatus qs;
/* should be monotonically increasing */
- GNUNET_assert (rowid >= pp.last_reserve_close_serial_id);
- pp.last_reserve_close_serial_id = rowid + 1;
+ GNUNET_assert (rowid >= ppr.last_reserve_close_serial_id);
+ ppr.last_reserve_close_serial_id = rowid + 1;
GNUNET_CRYPTO_hash (reserve_pub,
sizeof (*reserve_pub),
@@ -1507,9 +1517,33 @@ analyze_reserves (void *cls)
struct ReserveContext rc;
enum GNUNET_DB_QueryStatus qsx;
enum GNUNET_DB_QueryStatus qs;
+ enum GNUNET_DB_QueryStatus qsp;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Analyzing reserves\n");
+ qsp = adb->get_auditor_progress_reserve (adb->cls,
+ asession,
+ &master_pub,
+ &ppr);
+ if (0 > qsp)
+ {
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qsp);
+ return qsp;
+ }
+ if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qsp)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
+ _("First analysis using this auditor, starting audit from scratch\n"));
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ _("Resuming reserve audit at %llu/%llu/%llu/%llu\n"),
+ (unsigned long long) ppr.last_reserve_in_serial_id,
+ (unsigned long long) ppr.last_reserve_out_serial_id,
+ (unsigned long long) ppr.last_reserve_payback_serial_id,
+ (unsigned long long) ppr.last_reserve_close_serial_id);
+ }
rc.qs = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
qsx = adb->get_reserve_summary (adb->cls,
asession,
@@ -1528,7 +1562,7 @@ analyze_reserves (void *cls)
qs = edb->select_reserves_in_above_serial_id (edb->cls,
esession,
- pp.last_reserve_in_serial_id,
+ ppr.last_reserve_in_serial_id,
&handle_reserve_in,
&rc);
if (qs < 0)
@@ -1538,7 +1572,7 @@ analyze_reserves (void *cls)
}
qs = edb->select_reserves_out_above_serial_id (edb->cls,
esession,
- pp.last_reserve_out_serial_id,
+ ppr.last_reserve_out_serial_id,
&handle_reserve_out,
&rc);
if (qs < 0)
@@ -1548,7 +1582,7 @@ analyze_reserves (void *cls)
}
qs = edb->select_payback_above_serial_id (edb->cls,
esession,
- pp.last_reserve_payback_serial_id,
+ ppr.last_reserve_payback_serial_id,
&handle_payback_by_reserve,
&rc);
if (qs < 0)
@@ -1558,7 +1592,7 @@ analyze_reserves (void *cls)
}
qs = edb->select_reserve_closed_above_serial_id (edb->cls,
esession,
- pp.last_reserve_close_serial_id,
+ ppr.last_reserve_close_serial_id,
&handle_reserve_closed,
&rc);
if (qs < 0)
@@ -1599,6 +1633,29 @@ analyze_reserves (void *cls)
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
return qs;
}
+ if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qsp)
+ qs = adb->update_auditor_progress_reserve (adb->cls,
+ asession,
+ &master_pub,
+ &ppr);
+ else
+ qs = adb->insert_auditor_progress_reserve (adb->cls,
+ asession,
+ &master_pub,
+ &ppr);
+ if (0 >= qs)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Failed to update auditor DB, not recording progress\n");
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ return qs;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ _("Concluded reserve audit step at %llu/%llu/%llu/%llu\n"),
+ (unsigned long long) ppr.last_reserve_in_serial_id,
+ (unsigned long long) ppr.last_reserve_out_serial_id,
+ (unsigned long long) ppr.last_reserve_payback_serial_id,
+ (unsigned long long) ppr.last_reserve_close_serial_id);
return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
}
@@ -2388,8 +2445,8 @@ check_wire_out_cb (void *cls,
char *method;
/* should be monotonically increasing */
- GNUNET_assert (rowid >= pp.last_wire_out_serial_id);
- pp.last_wire_out_serial_id = rowid + 1;
+ GNUNET_assert (rowid >= ppa.last_wire_out_serial_id);
+ ppa.last_wire_out_serial_id = rowid + 1;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Checking wire transfer %s over %s performed on %s\n",
@@ -2565,9 +2622,31 @@ analyze_aggregations (void *cls)
struct WireFeeInfo *wfi;
enum GNUNET_DB_QueryStatus qsx;
enum GNUNET_DB_QueryStatus qs;
+ enum GNUNET_DB_QueryStatus qsp;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Analyzing aggregations\n");
+ qsp = adb->get_auditor_progress_aggregation (adb->cls,
+ asession,
+ &master_pub,
+ &ppa);
+ if (0 > qsp)
+ {
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qsp);
+ return qsp;
+ }
+ if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qsp)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
+ _("First analysis using this auditor, starting audit from scratch\n"));
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ _("Resuming aggregation audit at %llu\n"),
+ (unsigned long long) ppa.last_wire_out_serial_id);
+ }
+
memset (&ac,
0,
sizeof (ac));
@@ -2583,7 +2662,7 @@ analyze_aggregations (void *cls)
ac.qs = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
qs = edb->select_wire_out_above_serial_id (edb->cls,
esession,
- pp.last_wire_out_serial_id,
+ ppa.last_wire_out_serial_id,
&check_wire_out_cb,
&ac);
if (0 > qs)
@@ -2627,6 +2706,27 @@ analyze_aggregations (void *cls)
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == ac.qs);
return ac.qs;
}
+ if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qsp)
+ qs = adb->update_auditor_progress_aggregation (adb->cls,
+ asession,
+ &master_pub,
+ &ppa);
+ else
+ qs = adb->insert_auditor_progress_aggregation (adb->cls,
+ asession,
+ &master_pub,
+ &ppa);
+ if (0 >= qs)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Failed to update auditor DB, not recording progress\n");
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ return qs;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ _("Concluded aggregation audit step at %llu\n"),
+ (unsigned long long) ppa.last_wire_out_serial_id);
+
return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
}
@@ -2925,8 +3025,8 @@ withdraw_cb (void *cls,
struct TALER_Amount value;
enum GNUNET_DB_QueryStatus qs;
- GNUNET_assert (rowid >= pp.last_withdraw_serial_id); /* should be monotonically increasing */
- pp.last_withdraw_serial_id = rowid + 1;
+ GNUNET_assert (rowid >= ppc.last_withdraw_serial_id); /* should be monotonically increasing */
+ ppc.last_withdraw_serial_id = rowid + 1;
qs = get_denomination_info (denom_pub,
&dki,
@@ -3078,8 +3178,8 @@ refresh_session_cb (void *cls,
struct TALER_Amount tmp;
enum GNUNET_DB_QueryStatus qs;
- GNUNET_assert (rowid >= pp.last_melt_serial_id); /* should be monotonically increasing */
- pp.last_melt_serial_id = rowid + 1;
+ GNUNET_assert (rowid >= ppc.last_melt_serial_id); /* should be monotonically increasing */
+ ppc.last_melt_serial_id = rowid + 1;
qs = get_denomination_info (denom_pub,
&dki,
@@ -3412,8 +3512,8 @@ deposit_cb (void *cls,
struct TALER_Amount tmp;
enum GNUNET_DB_QueryStatus qs;
- GNUNET_assert (rowid >= pp.last_deposit_serial_id); /* should be monotonically increasing */
- pp.last_deposit_serial_id = rowid + 1;
+ GNUNET_assert (rowid >= ppc.last_deposit_serial_id); /* should be monotonically increasing */
+ ppc.last_deposit_serial_id = rowid + 1;
qs = get_denomination_info (denom_pub,
&dki,
@@ -3562,8 +3662,8 @@ refund_cb (void *cls,
struct TALER_Amount refund_fee;
enum GNUNET_DB_QueryStatus qs;
- GNUNET_assert (rowid >= pp.last_refund_serial_id); /* should be monotonically increasing */
- pp.last_refund_serial_id = rowid + 1;
+ GNUNET_assert (rowid >= ppc.last_refund_serial_id); /* should be monotonically increasing */
+ ppc.last_refund_serial_id = rowid + 1;
qs = get_denomination_info (denom_pub,
&dki,
@@ -3702,9 +3802,34 @@ analyze_coins (void *cls)
struct CoinContext cc;
enum GNUNET_DB_QueryStatus qs;
enum GNUNET_DB_QueryStatus qsx;
+ enum GNUNET_DB_QueryStatus qsp;
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Analyzing coins\n");
+ qsp = adb->get_auditor_progress_coin (adb->cls,
+ asession,
+ &master_pub,
+ &ppc);
+ if (0 > qsp)
+ {
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qsp);
+ return qsp;
+ }
+ if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qsp)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
+ _("First analysis using this auditor, starting audit from scratch\n"));
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ _("Resuming coin audit at %llu/%llu/%llu/%llu\n"),
+ (unsigned long long) ppc.last_deposit_serial_id,
+ (unsigned long long) ppc.last_melt_serial_id,
+ (unsigned long long) ppc.last_refund_serial_id,
+ (unsigned long long) ppc.last_withdraw_serial_id);
+ }
+
/* setup 'cc' */
cc.qs = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
cc.denom_summaries = GNUNET_CONTAINER_multihashmap_create (256,
@@ -3727,7 +3852,7 @@ analyze_coins (void *cls)
if (0 >
(qs = edb->select_reserves_out_above_serial_id (edb->cls,
esession,
- pp.last_withdraw_serial_id,
+ ppc.last_withdraw_serial_id,
&withdraw_cb,
&cc)) )
{
@@ -3739,7 +3864,7 @@ analyze_coins (void *cls)
if (0 >
(qs = edb->select_refunds_above_serial_id (edb->cls,
esession,
- pp.last_refund_serial_id,
+ ppc.last_refund_serial_id,
&refund_cb,
&cc)))
{
@@ -3751,7 +3876,7 @@ analyze_coins (void *cls)
if (0 >
(qs = edb->select_refreshs_above_serial_id (edb->cls,
esession,
- pp.last_melt_serial_id,
+ ppc.last_melt_serial_id,
&refresh_session_cb,
&cc)))
{
@@ -3763,7 +3888,7 @@ analyze_coins (void *cls)
if (0 >
(qs = edb->select_deposits_above_serial_id (edb->cls,
esession,
- pp.last_deposit_serial_id,
+ ppc.last_deposit_serial_id,
&deposit_cb,
&cc)))
{
@@ -3805,6 +3930,30 @@ analyze_coins (void *cls)
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
return qs;
}
+
+ if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qsp)
+ qs = adb->update_auditor_progress_coin (adb->cls,
+ asession,
+ &master_pub,
+ &ppc);
+ else
+ qs = adb->insert_auditor_progress_coin (adb->cls,
+ asession,
+ &master_pub,
+ &ppc);
+ if (0 >= qs)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Failed to update auditor DB, not recording progress\n");
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ return qs;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ _("Concluded coin audit step at %llu/%llu/%llu/%llu\n"),
+ (unsigned long long) ppc.last_deposit_serial_id,
+ (unsigned long long) ppc.last_melt_serial_id,
+ (unsigned long long) ppc.last_refund_serial_id,
+ (unsigned long long) ppc.last_withdraw_serial_id);
return qs;
}
@@ -3823,88 +3972,6 @@ typedef enum GNUNET_DB_QueryStatus
/**
- * Perform the given @a analysis incrementally, checkpointing our
- * progress in the auditor DB.
- *
- * @param analysis analysis to run
- * @param analysis_cls closure for @a analysis
- * @return transaction status code
- */
-static enum GNUNET_DB_QueryStatus
-incremental_processing (Analysis analysis,
- void *analysis_cls)
-{
- enum GNUNET_DB_QueryStatus qs;
- enum GNUNET_DB_QueryStatus qsx;
-
- qsx = adb->get_auditor_progress (adb->cls,
- asession,
- &master_pub,
- &pp);
- if (0 > qsx)
- {
- GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qsx);
- return qsx;
- }
- if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qsx)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
- _("First analysis using this auditor, starting audit from scratch\n"));
- }
- else
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- _("Resuming audit at %llu/%llu/%llu/%llu/%llu/%llu/%llu\n"),
- (unsigned long long) pp.last_reserve_in_serial_id,
- (unsigned long long) pp.last_reserve_out_serial_id,
- (unsigned long long) pp.last_withdraw_serial_id,
- (unsigned long long) pp.last_deposit_serial_id,
- (unsigned long long) pp.last_melt_serial_id,
- (unsigned long long) pp.last_refund_serial_id,
- (unsigned long long) pp.last_wire_out_serial_id);
- }
- qs = analysis (analysis_cls);
- if (0 > qs)
- {
- if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Serialization issue, not recording progress\n");
- else
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Hard database error, not recording progress\n");
- return qs;
- }
- if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qsx)
- qs = adb->update_auditor_progress (adb->cls,
- asession,
- &master_pub,
- &pp);
- else
- qs = adb->insert_auditor_progress (adb->cls,
- asession,
- &master_pub,
- &pp);
- if (0 >= qs)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Failed to update auditor DB, not recording progress\n");
- GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
- return qs;
- }
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- _("Concluded audit step at %llu/%llu/%llu/%llu/%llu/%llu/%llu\n\n"),
- (unsigned long long) pp.last_reserve_in_serial_id,
- (unsigned long long) pp.last_reserve_out_serial_id,
- (unsigned long long) pp.last_withdraw_serial_id,
- (unsigned long long) pp.last_deposit_serial_id,
- (unsigned long long) pp.last_melt_serial_id,
- (unsigned long long) pp.last_refund_serial_id,
- (unsigned long long) pp.last_wire_out_serial_id);
- return qs;
-}
-
-
-/**
* Perform the given @a analysis within a transaction scope.
* Commit on success.
*
@@ -3938,8 +4005,7 @@ transact (Analysis analysis,
GNUNET_break (0);
return GNUNET_SYSERR;
}
- qs = incremental_processing (analysis,
- analysis_cls);
+ qs = analysis (analysis_cls);
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs)
{
qs = edb->commit (edb->cls,