From b856d56d95f92eb9dedb0af49493350ea8ea2268 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Thu, 24 Mar 2022 17:33:29 +0100 Subject: rework deposits sharding, towards making aggregator faster (not necessarily done) --- src/exchange/taler-exchange-aggregator.c | 83 ++++++++++++++++++++++++-------- 1 file changed, 64 insertions(+), 19 deletions(-) (limited to 'src/exchange') diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index abab347fe..c34d47f9c 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -28,6 +28,18 @@ #include "taler_json_lib.h" #include "taler_bank_service.h" +struct AdditionalDeposit +{ + /** + * Public key of the coin. + */ + struct TALER_CoinSpendPublicKeyP coin_pub; + + /** + * Row of the deposit. + */ + uint64_t row; +}; /** * Information about one aggregation process to be executed. There is @@ -42,6 +54,11 @@ struct AggregationUnit */ struct TALER_MerchantPublicKeyP merchant_pub; + /** + * Public key of the coin. + */ + struct TALER_CoinSpendPublicKeyP coin_pub; + /** * Total amount to be transferred, before subtraction of @e fees.wire and rounding down. */ @@ -97,7 +114,8 @@ struct AggregationUnit /** * Array of row_ids from the aggregation. */ - uint64_t additional_rows[TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT]; + struct AdditionalDeposit + additional_rows[TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT]; /** * Offset specifying how many @e additional_rows are in use. @@ -383,7 +401,8 @@ deposit_cb (void *cls, enum GNUNET_DB_QueryStatus qs; au->merchant_pub = *merchant_pub; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + au->coin_pub = *coin_pub; + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Aggregator processing payment %s with amount %s\n", TALER_B2S (coin_pub), TALER_amount2s (amount_with_fee)); @@ -405,7 +424,7 @@ deposit_cb (void *cls, { struct TALER_Amount ntotal; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Non-refunded transaction, subtracting deposit fee %s\n", TALER_amount2s (deposit_fee)); if (0 > @@ -428,6 +447,9 @@ deposit_cb (void *cls, au->total_amount = ntotal; } } + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Amount after fee is %s\n", + TALER_amount2s (&au->total_amount)); GNUNET_assert (NULL == au->payto_uri); au->payto_uri = GNUNET_strdup (payto_uri); @@ -437,7 +459,7 @@ deposit_cb (void *cls, GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, &au->wtid, sizeof (au->wtid)); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting aggregation under H(WTID)=%s, starting amount %s at %llu\n", TALER_B2S (&au->wtid), TALER_amount2s (amount_with_fee), @@ -493,7 +515,7 @@ deposit_cb (void *cls, "Aggregator marks deposit %llu as done\n", (unsigned long long) row_id); qs = db_plugin->mark_deposit_done (db_plugin->cls, - merchant_pub, + coin_pub, row_id); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) { @@ -528,6 +550,8 @@ aggregate_cb (void *cls, struct TALER_Amount old; enum GNUNET_DB_QueryStatus qs; + if (row_id == au->row_id) + return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; if (au->rows_offset >= TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT) { /* Bug: we asked for at most #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT results! */ @@ -605,18 +629,29 @@ aggregate_cb (void *cls, } /* "append" to our list of rows */ - au->additional_rows[au->rows_offset++] = row_id; + au->additional_rows[au->rows_offset].coin_pub = *coin_pub; + au->additional_rows[au->rows_offset].row = row_id; + au->rows_offset++; /* insert into aggregation tracking table */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Adding %llu to aggregate %s\n", + (unsigned long long) row_id, + TALER_B2S (&au->wtid)); qs = db_plugin->insert_aggregation_tracking (db_plugin->cls, &au->wtid, row_id); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) { + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to add %llu to aggregate %s: %d\n", + (unsigned long long) row_id, + TALER_B2S (&au->wtid), + qs); GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); return qs; } qs = db_plugin->mark_deposit_done (db_plugin->cls, - &au->merchant_pub, + coin_pub, row_id); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) { @@ -775,7 +810,7 @@ run_aggregation (void *cls) } /* Now try to find other deposits to aggregate */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found ready deposit for %s, aggregating by target %llu\n", TALER_B2S (&au_active.merchant_pub), (unsigned long long) au_active.wire_target); @@ -808,13 +843,17 @@ run_aggregation (void *cls) s); return; } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Found %d other deposits to combine into wire transfer.\n", - qs); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Found %d other deposits to combine into wire transfer with fee %s.\n", + qs, + TALER_amount2s (&au_active.fees.wire)); /* Subtract wire transfer fee and round to the unit supported by the wire transfer method; Check if after rounding down, we still have an amount to transfer, and if not mark as 'tiny'. */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Rounding aggregate of %s\n", + TALER_amount2s (&au_active.total_amount)); if ( (0 >= TALER_amount_subtract (&au_active.final_amount, &au_active.total_amount, @@ -822,8 +861,7 @@ run_aggregation (void *cls) (GNUNET_SYSERR == TALER_amount_round_down (&au_active.final_amount, ¤cy_round_unit)) || - ( (0 == au_active.final_amount.value) && - (0 == au_active.final_amount.fraction) ) ) + (TALER_amount_is_zero (&au_active.final_amount)) ) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Aggregate value too low for transfer (%d/%s)\n", @@ -848,23 +886,29 @@ run_aggregation (void *cls) return; } /* Mark transactions by row_id as minor */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Marking %s (%llu) as tiny\n", + TALER_B2S (&au_active.coin_pub), + (unsigned long long) au_active.row_id); qs = db_plugin->mark_deposit_tiny (db_plugin->cls, - &au_active.merchant_pub, + &au_active.coin_pub, au_active.row_id); - if (0 <= qs) + if (0 < qs) { for (unsigned int i = 0; imark_deposit_tiny (db_plugin->cls, - &au_active.merchant_pub, - au_active.additional_rows[i]); - if (0 > qs) + &au_active.additional_rows[i]. + coin_pub, + au_active.additional_rows[i].row); + if (0 >= qs) break; } } + GNUNET_break (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs); if (GNUNET_DB_STATUS_SOFT_ERROR == qs) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Serialization issue, trying again later!\n"); db_plugin->rollback (db_plugin->cls); cleanup_au (&au_active); @@ -876,6 +920,7 @@ run_aggregation (void *cls) } if (GNUNET_DB_STATUS_HARD_ERROR == qs) { + GNUNET_break (0); db_plugin->rollback (db_plugin->cls); cleanup_au (&au_active); global_ret = EXIT_FAILURE; -- cgit v1.2.3