From 7b62deabac967c2ba94502211ffd553eda572622 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sat, 27 Oct 2018 21:27:23 +0200 Subject: split up progress points of auditor by coin/reserve/aggregation to minimize DB conflicts --- src/auditor/taler-auditor.c | 292 +++++++++++++++++++++++++++----------------- 1 file changed, 179 insertions(+), 113 deletions(-) (limited to 'src/auditor/taler-auditor.c') diff --git a/src/auditor/taler-auditor.c b/src/auditor/taler-auditor.c index 83f87ae0a..12ababc55 100644 --- a/src/auditor/taler-auditor.c +++ b/src/auditor/taler-auditor.c @@ -109,9 +109,19 @@ static struct GNUNET_TIME_Relative idle_reserve_expiration_time; static struct TALER_MasterPublicKeyP master_pub; /** - * Last reserve_in serial ID seen. + * Checkpointing our progress for reserves. */ -static struct TALER_AUDITORDB_ProgressPoint pp; +static struct TALER_AUDITORDB_ProgressPointReserve ppr; + +/** + * Checkpointing our progress for aggregations. + */ +static struct TALER_AUDITORDB_ProgressPointAggregation ppa; + +/** + * Checkpointing our progress for coins. + */ +static struct TALER_AUDITORDB_ProgressPointCoin ppc; /** * Array of reports about denomination keys with an @@ -773,8 +783,8 @@ handle_reserve_in (void *cls, enum GNUNET_DB_QueryStatus qs; /* should be monotonically increasing */ - GNUNET_assert (rowid >= pp.last_reserve_in_serial_id); - pp.last_reserve_in_serial_id = rowid + 1; + GNUNET_assert (rowid >= ppr.last_reserve_in_serial_id); + ppr.last_reserve_in_serial_id = rowid + 1; GNUNET_CRYPTO_hash (reserve_pub, sizeof (*reserve_pub), @@ -861,8 +871,8 @@ handle_reserve_out (void *cls, enum GNUNET_DB_QueryStatus qs; /* should be monotonically increasing */ - GNUNET_assert (rowid >= pp.last_reserve_out_serial_id); - pp.last_reserve_out_serial_id = rowid + 1; + GNUNET_assert (rowid >= ppr.last_reserve_out_serial_id); + ppr.last_reserve_out_serial_id = rowid + 1; /* lookup denomination pub data (make sure denom_pub is valid, establish fees) */ qs = get_denomination_info (denom_pub, @@ -1011,8 +1021,8 @@ handle_payback_by_reserve (void *cls, const char *rev; /* should be monotonically increasing */ - GNUNET_assert (rowid >= pp.last_reserve_payback_serial_id); - pp.last_reserve_payback_serial_id = rowid + 1; + GNUNET_assert (rowid >= ppr.last_reserve_payback_serial_id); + ppr.last_reserve_payback_serial_id = rowid + 1; GNUNET_CRYPTO_rsa_public_key_hash (coin->denom_pub.rsa_public_key, &pr.h_denom_pub); @@ -1202,8 +1212,8 @@ handle_reserve_closed (void *cls, enum GNUNET_DB_QueryStatus qs; /* should be monotonically increasing */ - GNUNET_assert (rowid >= pp.last_reserve_close_serial_id); - pp.last_reserve_close_serial_id = rowid + 1; + GNUNET_assert (rowid >= ppr.last_reserve_close_serial_id); + ppr.last_reserve_close_serial_id = rowid + 1; GNUNET_CRYPTO_hash (reserve_pub, sizeof (*reserve_pub), @@ -1507,9 +1517,33 @@ analyze_reserves (void *cls) struct ReserveContext rc; enum GNUNET_DB_QueryStatus qsx; enum GNUNET_DB_QueryStatus qs; + enum GNUNET_DB_QueryStatus qsp; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Analyzing reserves\n"); + qsp = adb->get_auditor_progress_reserve (adb->cls, + asession, + &master_pub, + &ppr); + if (0 > qsp) + { + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qsp); + return qsp; + } + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qsp) + { + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, + _("First analysis using this auditor, starting audit from scratch\n")); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + _("Resuming reserve audit at %llu/%llu/%llu/%llu\n"), + (unsigned long long) ppr.last_reserve_in_serial_id, + (unsigned long long) ppr.last_reserve_out_serial_id, + (unsigned long long) ppr.last_reserve_payback_serial_id, + (unsigned long long) ppr.last_reserve_close_serial_id); + } rc.qs = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; qsx = adb->get_reserve_summary (adb->cls, asession, @@ -1528,7 +1562,7 @@ analyze_reserves (void *cls) qs = edb->select_reserves_in_above_serial_id (edb->cls, esession, - pp.last_reserve_in_serial_id, + ppr.last_reserve_in_serial_id, &handle_reserve_in, &rc); if (qs < 0) @@ -1538,7 +1572,7 @@ analyze_reserves (void *cls) } qs = edb->select_reserves_out_above_serial_id (edb->cls, esession, - pp.last_reserve_out_serial_id, + ppr.last_reserve_out_serial_id, &handle_reserve_out, &rc); if (qs < 0) @@ -1548,7 +1582,7 @@ analyze_reserves (void *cls) } qs = edb->select_payback_above_serial_id (edb->cls, esession, - pp.last_reserve_payback_serial_id, + ppr.last_reserve_payback_serial_id, &handle_payback_by_reserve, &rc); if (qs < 0) @@ -1558,7 +1592,7 @@ analyze_reserves (void *cls) } qs = edb->select_reserve_closed_above_serial_id (edb->cls, esession, - pp.last_reserve_close_serial_id, + ppr.last_reserve_close_serial_id, &handle_reserve_closed, &rc); if (qs < 0) @@ -1599,6 +1633,29 @@ analyze_reserves (void *cls) GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); return qs; } + if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qsp) + qs = adb->update_auditor_progress_reserve (adb->cls, + asession, + &master_pub, + &ppr); + else + qs = adb->insert_auditor_progress_reserve (adb->cls, + asession, + &master_pub, + &ppr); + if (0 >= qs) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Failed to update auditor DB, not recording progress\n"); + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return qs; + } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + _("Concluded reserve audit step at %llu/%llu/%llu/%llu\n"), + (unsigned long long) ppr.last_reserve_in_serial_id, + (unsigned long long) ppr.last_reserve_out_serial_id, + (unsigned long long) ppr.last_reserve_payback_serial_id, + (unsigned long long) ppr.last_reserve_close_serial_id); return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; } @@ -2388,8 +2445,8 @@ check_wire_out_cb (void *cls, char *method; /* should be monotonically increasing */ - GNUNET_assert (rowid >= pp.last_wire_out_serial_id); - pp.last_wire_out_serial_id = rowid + 1; + GNUNET_assert (rowid >= ppa.last_wire_out_serial_id); + ppa.last_wire_out_serial_id = rowid + 1; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Checking wire transfer %s over %s performed on %s\n", @@ -2565,9 +2622,31 @@ analyze_aggregations (void *cls) struct WireFeeInfo *wfi; enum GNUNET_DB_QueryStatus qsx; enum GNUNET_DB_QueryStatus qs; + enum GNUNET_DB_QueryStatus qsp; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Analyzing aggregations\n"); + qsp = adb->get_auditor_progress_aggregation (adb->cls, + asession, + &master_pub, + &ppa); + if (0 > qsp) + { + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qsp); + return qsp; + } + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qsp) + { + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, + _("First analysis using this auditor, starting audit from scratch\n")); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + _("Resuming aggregation audit at %llu\n"), + (unsigned long long) ppa.last_wire_out_serial_id); + } + memset (&ac, 0, sizeof (ac)); @@ -2583,7 +2662,7 @@ analyze_aggregations (void *cls) ac.qs = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; qs = edb->select_wire_out_above_serial_id (edb->cls, esession, - pp.last_wire_out_serial_id, + ppa.last_wire_out_serial_id, &check_wire_out_cb, &ac); if (0 > qs) @@ -2627,6 +2706,27 @@ analyze_aggregations (void *cls) GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == ac.qs); return ac.qs; } + if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qsp) + qs = adb->update_auditor_progress_aggregation (adb->cls, + asession, + &master_pub, + &ppa); + else + qs = adb->insert_auditor_progress_aggregation (adb->cls, + asession, + &master_pub, + &ppa); + if (0 >= qs) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Failed to update auditor DB, not recording progress\n"); + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return qs; + } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + _("Concluded aggregation audit step at %llu\n"), + (unsigned long long) ppa.last_wire_out_serial_id); + return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; } @@ -2925,8 +3025,8 @@ withdraw_cb (void *cls, struct TALER_Amount value; enum GNUNET_DB_QueryStatus qs; - GNUNET_assert (rowid >= pp.last_withdraw_serial_id); /* should be monotonically increasing */ - pp.last_withdraw_serial_id = rowid + 1; + GNUNET_assert (rowid >= ppc.last_withdraw_serial_id); /* should be monotonically increasing */ + ppc.last_withdraw_serial_id = rowid + 1; qs = get_denomination_info (denom_pub, &dki, @@ -3078,8 +3178,8 @@ refresh_session_cb (void *cls, struct TALER_Amount tmp; enum GNUNET_DB_QueryStatus qs; - GNUNET_assert (rowid >= pp.last_melt_serial_id); /* should be monotonically increasing */ - pp.last_melt_serial_id = rowid + 1; + GNUNET_assert (rowid >= ppc.last_melt_serial_id); /* should be monotonically increasing */ + ppc.last_melt_serial_id = rowid + 1; qs = get_denomination_info (denom_pub, &dki, @@ -3412,8 +3512,8 @@ deposit_cb (void *cls, struct TALER_Amount tmp; enum GNUNET_DB_QueryStatus qs; - GNUNET_assert (rowid >= pp.last_deposit_serial_id); /* should be monotonically increasing */ - pp.last_deposit_serial_id = rowid + 1; + GNUNET_assert (rowid >= ppc.last_deposit_serial_id); /* should be monotonically increasing */ + ppc.last_deposit_serial_id = rowid + 1; qs = get_denomination_info (denom_pub, &dki, @@ -3562,8 +3662,8 @@ refund_cb (void *cls, struct TALER_Amount refund_fee; enum GNUNET_DB_QueryStatus qs; - GNUNET_assert (rowid >= pp.last_refund_serial_id); /* should be monotonically increasing */ - pp.last_refund_serial_id = rowid + 1; + GNUNET_assert (rowid >= ppc.last_refund_serial_id); /* should be monotonically increasing */ + ppc.last_refund_serial_id = rowid + 1; qs = get_denomination_info (denom_pub, &dki, @@ -3702,9 +3802,34 @@ analyze_coins (void *cls) struct CoinContext cc; enum GNUNET_DB_QueryStatus qs; enum GNUNET_DB_QueryStatus qsx; + enum GNUNET_DB_QueryStatus qsp; GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Analyzing coins\n"); + qsp = adb->get_auditor_progress_coin (adb->cls, + asession, + &master_pub, + &ppc); + if (0 > qsp) + { + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qsp); + return qsp; + } + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qsp) + { + GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, + _("First analysis using this auditor, starting audit from scratch\n")); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + _("Resuming coin audit at %llu/%llu/%llu/%llu\n"), + (unsigned long long) ppc.last_deposit_serial_id, + (unsigned long long) ppc.last_melt_serial_id, + (unsigned long long) ppc.last_refund_serial_id, + (unsigned long long) ppc.last_withdraw_serial_id); + } + /* setup 'cc' */ cc.qs = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; cc.denom_summaries = GNUNET_CONTAINER_multihashmap_create (256, @@ -3727,7 +3852,7 @@ analyze_coins (void *cls) if (0 > (qs = edb->select_reserves_out_above_serial_id (edb->cls, esession, - pp.last_withdraw_serial_id, + ppc.last_withdraw_serial_id, &withdraw_cb, &cc)) ) { @@ -3739,7 +3864,7 @@ analyze_coins (void *cls) if (0 > (qs = edb->select_refunds_above_serial_id (edb->cls, esession, - pp.last_refund_serial_id, + ppc.last_refund_serial_id, &refund_cb, &cc))) { @@ -3751,7 +3876,7 @@ analyze_coins (void *cls) if (0 > (qs = edb->select_refreshs_above_serial_id (edb->cls, esession, - pp.last_melt_serial_id, + ppc.last_melt_serial_id, &refresh_session_cb, &cc))) { @@ -3763,7 +3888,7 @@ analyze_coins (void *cls) if (0 > (qs = edb->select_deposits_above_serial_id (edb->cls, esession, - pp.last_deposit_serial_id, + ppc.last_deposit_serial_id, &deposit_cb, &cc))) { @@ -3805,6 +3930,30 @@ analyze_coins (void *cls) GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); return qs; } + + if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qsp) + qs = adb->update_auditor_progress_coin (adb->cls, + asession, + &master_pub, + &ppc); + else + qs = adb->insert_auditor_progress_coin (adb->cls, + asession, + &master_pub, + &ppc); + if (0 >= qs) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Failed to update auditor DB, not recording progress\n"); + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return qs; + } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + _("Concluded coin audit step at %llu/%llu/%llu/%llu\n"), + (unsigned long long) ppc.last_deposit_serial_id, + (unsigned long long) ppc.last_melt_serial_id, + (unsigned long long) ppc.last_refund_serial_id, + (unsigned long long) ppc.last_withdraw_serial_id); return qs; } @@ -3822,88 +3971,6 @@ typedef enum GNUNET_DB_QueryStatus (*Analysis)(void *cls); -/** - * Perform the given @a analysis incrementally, checkpointing our - * progress in the auditor DB. - * - * @param analysis analysis to run - * @param analysis_cls closure for @a analysis - * @return transaction status code - */ -static enum GNUNET_DB_QueryStatus -incremental_processing (Analysis analysis, - void *analysis_cls) -{ - enum GNUNET_DB_QueryStatus qs; - enum GNUNET_DB_QueryStatus qsx; - - qsx = adb->get_auditor_progress (adb->cls, - asession, - &master_pub, - &pp); - if (0 > qsx) - { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qsx); - return qsx; - } - if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qsx) - { - GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, - _("First analysis using this auditor, starting audit from scratch\n")); - } - else - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - _("Resuming audit at %llu/%llu/%llu/%llu/%llu/%llu/%llu\n"), - (unsigned long long) pp.last_reserve_in_serial_id, - (unsigned long long) pp.last_reserve_out_serial_id, - (unsigned long long) pp.last_withdraw_serial_id, - (unsigned long long) pp.last_deposit_serial_id, - (unsigned long long) pp.last_melt_serial_id, - (unsigned long long) pp.last_refund_serial_id, - (unsigned long long) pp.last_wire_out_serial_id); - } - qs = analysis (analysis_cls); - if (0 > qs) - { - if (GNUNET_DB_STATUS_SOFT_ERROR == qs) - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Serialization issue, not recording progress\n"); - else - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Hard database error, not recording progress\n"); - return qs; - } - if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qsx) - qs = adb->update_auditor_progress (adb->cls, - asession, - &master_pub, - &pp); - else - qs = adb->insert_auditor_progress (adb->cls, - asession, - &master_pub, - &pp); - if (0 >= qs) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Failed to update auditor DB, not recording progress\n"); - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - return qs; - } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - _("Concluded audit step at %llu/%llu/%llu/%llu/%llu/%llu/%llu\n\n"), - (unsigned long long) pp.last_reserve_in_serial_id, - (unsigned long long) pp.last_reserve_out_serial_id, - (unsigned long long) pp.last_withdraw_serial_id, - (unsigned long long) pp.last_deposit_serial_id, - (unsigned long long) pp.last_melt_serial_id, - (unsigned long long) pp.last_refund_serial_id, - (unsigned long long) pp.last_wire_out_serial_id); - return qs; -} - - /** * Perform the given @a analysis within a transaction scope. * Commit on success. @@ -3938,8 +4005,7 @@ transact (Analysis analysis, GNUNET_break (0); return GNUNET_SYSERR; } - qs = incremental_processing (analysis, - analysis_cls); + qs = analysis (analysis_cls); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) { qs = edb->commit (edb->cls, -- cgit v1.2.3