From b9a9af3a59f3abdb09afb9d0f9e4c0d83df789b7 Mon Sep 17 00:00:00 2001 From: Christian Grothoff Date: Sun, 27 Mar 2022 13:48:25 +0200 Subject: new aggregator mega transaction logic --- src/exchange/taler-exchange-aggregator.c | 545 +++++++------------------------ 1 file changed, 119 insertions(+), 426 deletions(-) (limited to 'src/exchange/taler-exchange-aggregator.c') diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index c34d47f9c..04cf426de 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -28,18 +28,6 @@ #include "taler_json_lib.h" #include "taler_bank_service.h" -struct AdditionalDeposit -{ - /** - * Public key of the coin. - */ - struct TALER_CoinSpendPublicKeyP coin_pub; - - /** - * Row of the deposit. - */ - uint64_t row; -}; /** * Information about one aggregation process to be executed. There is @@ -54,11 +42,6 @@ struct AggregationUnit */ struct TALER_MerchantPublicKeyP merchant_pub; - /** - * Public key of the coin. - */ - struct TALER_CoinSpendPublicKeyP coin_pub; - /** * Total amount to be transferred, before subtraction of @e fees.wire and rounding down. */ @@ -79,11 +62,6 @@ struct AggregationUnit */ struct TALER_WireTransferIdentifierRawP wtid; - /** - * Row ID of the transaction that started it all. - */ - uint64_t row_id; - /** * The current time (which triggered the aggregation and * defines the wire fee). @@ -100,33 +78,12 @@ struct AggregationUnit */ struct TALER_PaytoHashP h_payto; - /** - * Serial number of the wire target. - */ - uint64_t wire_target; - /** * Exchange wire account to be used for the preparation and * eventual execution of the aggregate wire transfer. */ const struct TALER_EXCHANGEDB_AccountInfo *wa; - /** - * Array of row_ids from the aggregation. - */ - struct AdditionalDeposit - additional_rows[TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT]; - - /** - * Offset specifying how many @e additional_rows are in use. - */ - unsigned int rows_offset; - - /** - * Set to true if we encountered a refund during #refund_by_coin_cb. - * Used to wave the deposit fee. - */ - bool have_refund; }; @@ -340,331 +297,6 @@ parse_wirewatch_config (void) } -/** - * Callback invoked with information about refunds applicable - * to a particular coin. Subtract refunded amount(s) from - * the aggregation unit's total amount. - * - * @param cls closure with a `struct AggregationUnit *` - * @param amount_with_fee what was the refunded amount with the fee - * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop - */ -static enum GNUNET_GenericReturnValue -refund_by_coin_cb (void *cls, - const struct TALER_Amount *amount_with_fee) -{ - struct AggregationUnit *aux = cls; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Aggregator subtracts applicable refund of amount %s\n", - TALER_amount2s (amount_with_fee)); - aux->have_refund = true; - if (0 > - TALER_amount_subtract (&aux->total_amount, - &aux->total_amount, - amount_with_fee)) - { - GNUNET_break (0); - return GNUNET_SYSERR; - } - return GNUNET_OK; -} - - -/** - * Function called with details about deposits that have been made, - * with the goal of executing the corresponding wire transaction. - * - * @param cls a `struct AggregationUnit` - * @param row_id identifies database entry - * @param merchant_pub public key of the merchant - * @param coin_pub public key of the coin - * @param amount_with_fee amount that was deposited including fee - * @param deposit_fee amount the exchange gets to keep as transaction fees - * @param h_contract_terms hash of the proposal data known to merchant and customer - * @param wire_target target account for the wire transfer - * @param payto_uri URI of the target account - * @return transaction status code, #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT to continue to iterate - */ -static enum GNUNET_DB_QueryStatus -deposit_cb (void *cls, - uint64_t row_id, - const struct TALER_MerchantPublicKeyP *merchant_pub, - const struct TALER_CoinSpendPublicKeyP *coin_pub, - const struct TALER_Amount *amount_with_fee, - const struct TALER_Amount *deposit_fee, - const struct TALER_PrivateContractHashP *h_contract_terms, - uint64_t wire_target, - const char *payto_uri) -{ - struct AggregationUnit *au = cls; - enum GNUNET_DB_QueryStatus qs; - - au->merchant_pub = *merchant_pub; - au->coin_pub = *coin_pub; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Aggregator processing payment %s with amount %s\n", - TALER_B2S (coin_pub), - TALER_amount2s (amount_with_fee)); - au->row_id = row_id; - au->total_amount = *amount_with_fee; - au->have_refund = false; - qs = db_plugin->select_refunds_by_coin (db_plugin->cls, - coin_pub, - &au->merchant_pub, - h_contract_terms, - &refund_by_coin_cb, - au); - if (0 > qs) - { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - return qs; - } - if (! au->have_refund) - { - struct TALER_Amount ntotal; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Non-refunded transaction, subtracting deposit fee %s\n", - TALER_amount2s (deposit_fee)); - if (0 > - TALER_amount_subtract (&ntotal, - amount_with_fee, - deposit_fee)) - { - /* This should never happen, issue a warning, but continue processing - with an amount of zero, least we hang here for good. */ - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Fatally malformed record at row %llu over %s (deposit fee exceeds deposited value)\n", - (unsigned long long) row_id, - TALER_amount2s (amount_with_fee)); - GNUNET_assert (GNUNET_OK == - TALER_amount_set_zero (au->total_amount.currency, - &au->total_amount)); - } - else - { - au->total_amount = ntotal; - } - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Amount after fee is %s\n", - TALER_amount2s (&au->total_amount)); - - GNUNET_assert (NULL == au->payto_uri); - au->payto_uri = GNUNET_strdup (payto_uri); - TALER_payto_hash (payto_uri, - &au->h_payto); - au->wire_target = wire_target; - GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, - &au->wtid, - sizeof (au->wtid)); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Starting aggregation under H(WTID)=%s, starting amount %s at %llu\n", - TALER_B2S (&au->wtid), - TALER_amount2s (amount_with_fee), - (unsigned long long) row_id); - au->wa = TALER_EXCHANGEDB_find_account_by_payto_uri (payto_uri); - if (NULL == au->wa) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "No exchange account configured for `%s', please fix your setup to continue!\n", - payto_uri); - return GNUNET_DB_STATUS_HARD_ERROR; - } - - /* make sure we have current fees */ - au->execution_time = GNUNET_TIME_timestamp_get (); - { - struct GNUNET_TIME_Timestamp start_date; - struct GNUNET_TIME_Timestamp end_date; - struct TALER_MasterSignatureP master_sig; - enum GNUNET_DB_QueryStatus qs; - - qs = db_plugin->get_wire_fee (db_plugin->cls, - au->wa->method, - au->execution_time, - &start_date, - &end_date, - &au->fees, - &master_sig); - if (0 >= qs) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Could not get wire fees for %s at %s. Aborting run.\n", - au->wa->method, - GNUNET_TIME_timestamp2s (au->execution_time)); - return GNUNET_DB_STATUS_HARD_ERROR; - } - } - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Aggregator starts aggregation for deposit %llu to %s with wire fee %s\n", - (unsigned long long) row_id, - TALER_B2S (&au->wtid), - TALER_amount2s (&au->fees.wire)); - qs = db_plugin->insert_aggregation_tracking (db_plugin->cls, - &au->wtid, - row_id); - if (qs <= 0) - { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - return qs; - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Aggregator marks deposit %llu as done\n", - (unsigned long long) row_id); - qs = db_plugin->mark_deposit_done (db_plugin->cls, - coin_pub, - row_id); - if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) - { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - return qs; - } - return qs; -} - - -/** - * Function called with details about another deposit we - * can aggregate into an existing aggregation unit. - * - * @param cls a `struct AggregationUnit` - * @param row_id identifies database entry - * @param coin_pub public key of the coin - * @param amount_with_fee amount that was deposited including fee - * @param deposit_fee amount the exchange gets to keep as transaction fees - * @param h_contract_terms hash of the proposal data known to merchant and customer - * @return transaction status code - */ -static enum GNUNET_DB_QueryStatus -aggregate_cb (void *cls, - uint64_t row_id, - const struct TALER_CoinSpendPublicKeyP *coin_pub, - const struct TALER_Amount *amount_with_fee, - const struct TALER_Amount *deposit_fee, - const struct TALER_PrivateContractHashP *h_contract_terms) -{ - struct AggregationUnit *au = cls; - struct TALER_Amount old; - enum GNUNET_DB_QueryStatus qs; - - if (row_id == au->row_id) - return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; - if (au->rows_offset >= TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT) - { - /* Bug: we asked for at most #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT results! */ - GNUNET_break (0); - /* Skip this one, but keep going with the overall transaction */ - return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; - } - - /* add to total */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Adding transaction amount %s from row %llu to aggregation\n", - TALER_amount2s (amount_with_fee), - (unsigned long long) row_id); - /* save the existing total aggregate in 'old', for later */ - old = au->total_amount; - /* we begin with the total contribution of the current coin */ - au->total_amount = *amount_with_fee; - /* compute contribution of this coin (after fees) */ - au->have_refund = false; - qs = db_plugin->select_refunds_by_coin (db_plugin->cls, - coin_pub, - &au->merchant_pub, - h_contract_terms, - &refund_by_coin_cb, - au); - if (0 > qs) - { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - return qs; - } - if (! au->have_refund) - { - struct TALER_Amount tmp; - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Subtracting deposit fee %s for non-refunded coin\n", - TALER_amount2s (deposit_fee)); - if (0 > - TALER_amount_subtract (&tmp, - &au->total_amount, - deposit_fee)) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Fatally malformed record at %llu over amount %s (deposit fee exceeds deposited value)\n", - (unsigned long long) row_id, - TALER_amount2s (&au->total_amount)); - GNUNET_assert (GNUNET_OK == - TALER_amount_set_zero (old.currency, - &au->total_amount)); - } - else - { - au->total_amount = tmp; - } - } - - /* now add the au->total_amount with the (remaining) contribution of - the current coin to the 'old' value with the current aggregate value */ - { - struct TALER_Amount tmp; - - if (0 > - TALER_amount_add (&tmp, - &au->total_amount, - &old)) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Overflow or currency incompatibility during aggregation at %llu\n", - (unsigned long long) row_id); - /* Skip this one, but keep going! */ - au->total_amount = old; - return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; - } - au->total_amount = tmp; - } - - /* "append" to our list of rows */ - au->additional_rows[au->rows_offset].coin_pub = *coin_pub; - au->additional_rows[au->rows_offset].row = row_id; - au->rows_offset++; - /* insert into aggregation tracking table */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Adding %llu to aggregate %s\n", - (unsigned long long) row_id, - TALER_B2S (&au->wtid)); - qs = db_plugin->insert_aggregation_tracking (db_plugin->cls, - &au->wtid, - row_id); - if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) - { - GNUNET_log (GNUNET_ERROR_TYPE_WARNING, - "Failed to add %llu to aggregate %s: %d\n", - (unsigned long long) row_id, - TALER_B2S (&au->wtid), - qs); - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - return qs; - } - qs = db_plugin->mark_deposit_done (db_plugin->cls, - coin_pub, - row_id); - if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) - { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - return qs; - } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Aggregator marked deposit %llu as DONE\n", - (unsigned long long) row_id); - return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; -} - - /** * Perform a database commit. If it fails, print a warning. * @@ -727,10 +359,17 @@ run_aggregation (void *cls) struct Shard *s = cls; struct AggregationUnit au_active; enum GNUNET_DB_QueryStatus qs; + struct TALER_Amount trans; + bool have_transient; task = NULL; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Checking for ready deposits to aggregate\n"); + /* make sure we have current fees */ + memset (&au_active, + 0, + sizeof (au_active)); + au_active.execution_time = GNUNET_TIME_timestamp_get (); if (GNUNET_OK != db_plugin->start_deferred_wire_out (db_plugin->cls)) { @@ -741,16 +380,13 @@ run_aggregation (void *cls) release_shard (s); return; } - memset (&au_active, - 0, - sizeof (au_active)); qs = db_plugin->get_ready_deposit ( db_plugin->cls, s->shard_start, s->shard_end, kyc_off ? true : false, - &deposit_cb, - &au_active); + &au_active.merchant_pub, + &au_active.payto_uri); switch (qs) { case GNUNET_DB_STATUS_HARD_ERROR: @@ -808,22 +444,98 @@ run_aggregation (void *cls) /* continued below */ break; } + au_active.wa = TALER_EXCHANGEDB_find_account_by_payto_uri ( + au_active.payto_uri); + if (NULL == au_active.wa) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "No exchange account configured for `%s', please fix your setup to continue!\n", + au_active.payto_uri); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + release_shard (s); + return; + } + + { + struct GNUNET_TIME_Timestamp start_date; + struct GNUNET_TIME_Timestamp end_date; + struct TALER_MasterSignatureP master_sig; + + qs = db_plugin->get_wire_fee (db_plugin->cls, + au_active.wa->method, + au_active.execution_time, + &start_date, + &end_date, + &au_active.fees, + &master_sig); + if (0 >= qs) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Could not get wire fees for %s at %s. Aborting run.\n", + au_active.wa->method, + GNUNET_TIME_timestamp2s (au_active.execution_time)); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + release_shard (s); + return; + } + } + /* Now try to find other deposits to aggregate */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Found ready deposit for %s, aggregating by target %llu\n", + "Found ready deposit for %s, aggregating by target %s\n", TALER_B2S (&au_active.merchant_pub), - (unsigned long long) au_active.wire_target); - qs = db_plugin->iterate_matching_deposits (db_plugin->cls, - &au_active.h_payto, - &au_active.merchant_pub, - &aggregate_cb, - &au_active, - TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT); + au_active.payto_uri); + TALER_payto_hash (au_active.payto_uri, + &au_active.h_payto); + + qs = db_plugin->select_aggregation_transient (db_plugin->cls, + &au_active.h_payto, + au_active.wa->section_name, + &au_active.wtid, + &trans); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to lookup transient aggregates!\n"); + cleanup_au (&au_active); + db_plugin->rollback (db_plugin->cls); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + release_shard (s); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + /* serializiability issue, try again */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Serialization issue, trying again later!\n"); + db_plugin->rollback (db_plugin->cls); + cleanup_au (&au_active); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&run_aggregation, + s); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, + &au_active.wtid, + sizeof (au_active.wtid)); + have_transient = false; + break; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + have_transient = true; + break; + } + qs = db_plugin->aggregate (db_plugin->cls, + &au_active.h_payto, + &au_active.merchant_pub, + &au_active.wtid, + &au_active.total_amount); if (GNUNET_DB_STATUS_HARD_ERROR == qs) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to execute deposit iteration!\n"); + "Failed to execute aggregation!\n"); cleanup_au (&au_active); db_plugin->rollback (db_plugin->cls); global_ret = EXIT_FAILURE; @@ -844,13 +556,17 @@ run_aggregation (void *cls) return; } GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Found %d other deposits to combine into wire transfer with fee %s.\n", - qs, - TALER_amount2s (&au_active.fees.wire)); + "Aggregation total is %s.\n", + TALER_amount2s (&au_active.total_amount)); /* Subtract wire transfer fee and round to the unit supported by the wire transfer method; Check if after rounding down, we still have an amount to transfer, and if not mark as 'tiny'. */ + if (have_transient) + GNUNET_assert (0 <= + TALER_amount_add (&au_active.total_amount, + &au_active.total_amount, + &trans)); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Rounding aggregate of %s\n", TALER_amount2s (&au_active.total_amount)); @@ -867,45 +583,17 @@ run_aggregation (void *cls) "Aggregate value too low for transfer (%d/%s)\n", qs, TALER_amount2s (&au_active.final_amount)); - /* Rollback ongoing transaction, as we will not use the respective - WTID and thus need to remove the tracking data */ - db_plugin->rollback (db_plugin->cls); - - /* There were results, just the value was too low. Start another - transaction to mark all* of the selected deposits as minor! */ - if (GNUNET_OK != - db_plugin->start (db_plugin->cls, - "aggregator mark tiny transactions")) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to start database transaction!\n"); - global_ret = EXIT_FAILURE; - cleanup_au (&au_active); - GNUNET_SCHEDULER_shutdown (); - release_shard (s); - return; - } - /* Mark transactions by row_id as minor */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Marking %s (%llu) as tiny\n", - TALER_B2S (&au_active.coin_pub), - (unsigned long long) au_active.row_id); - qs = db_plugin->mark_deposit_tiny (db_plugin->cls, - &au_active.coin_pub, - au_active.row_id); - if (0 < qs) - { - for (unsigned int i = 0; imark_deposit_tiny (db_plugin->cls, - &au_active.additional_rows[i]. - coin_pub, - au_active.additional_rows[i].row); - if (0 >= qs) - break; - } - } - GNUNET_break (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS != qs); + if (have_transient) + qs = db_plugin->update_aggregation_transient (db_plugin->cls, + &au_active.h_payto, + &au_active.wtid, + &au_active.total_amount); + else + qs = db_plugin->create_aggregation_transient (db_plugin->cls, + &au_active.h_payto, + au_active.wa->section_name, + &au_active.wtid, + &au_active.total_amount); if (GNUNET_DB_STATUS_SOFT_ERROR == qs) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -962,8 +650,7 @@ run_aggregation (void *cls) buf_size); GNUNET_free (buf); } - /* Commit the WTID data to 'wire_out' to finally satisfy aggregation - table constraints */ + /* Commit the WTID data to 'wire_out' */ if (qs >= 0) qs = db_plugin->store_wire_transfer_out (db_plugin->cls, au_active.execution_time, @@ -971,6 +658,12 @@ run_aggregation (void *cls) &au_active.h_payto, au_active.wa->section_name, &au_active.final_amount); + + if ( (qs >= 0) && + have_transient) + qs = db_plugin->delete_aggregation_transient (db_plugin->cls, + &au_active.h_payto, + &au_active.wtid); cleanup_au (&au_active); if (GNUNET_DB_STATUS_SOFT_ERROR == qs) -- cgit v1.2.3