diff options
Diffstat (limited to 'src/exchange/taler-exchange-aggregator.c')
-rw-r--r-- | src/exchange/taler-exchange-aggregator.c | 1423 |
1 files changed, 902 insertions, 521 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index c82b66669..691d65ae3 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2016-2021 Taler Systems SA + Copyright (C) 2016-2022 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -26,7 +26,9 @@ #include "taler_exchangedb_lib.h" #include "taler_exchangedb_plugin.h" #include "taler_json_lib.h" +#include "taler_kyclogic_lib.h" #include "taler_bank_service.h" +#include "taler_dbevents.h" /** @@ -43,7 +45,13 @@ struct AggregationUnit struct TALER_MerchantPublicKeyP merchant_pub; /** - * Total amount to be transferred, before subtraction of @e wire_fee and rounding down. + * Transient amount already found aggregated, + * set only if @e have_transient is true. + */ + struct TALER_Amount trans; + + /** + * Total amount to be transferred, before subtraction of @e fees.wire and rounding down. */ struct TALER_Amount total_amount; @@ -55,12 +63,7 @@ struct AggregationUnit /** * Wire fee we charge for @e wp at @e execution_time. */ - struct TALER_Amount wire_fee; - - /** - * Hash of @e wire. - */ - struct GNUNET_HashCode h_wire; + struct TALER_WireFeeSet fees; /** * Wire transfer identifier we use. @@ -68,20 +71,20 @@ struct AggregationUnit struct TALER_WireTransferIdentifierRawP wtid; /** - * Row ID of the transaction that started it all. - */ - uint64_t row_id; - - /** * The current time (which triggered the aggregation and * defines the wire fee). */ - struct GNUNET_TIME_Absolute execution_time; + struct GNUNET_TIME_Timestamp execution_time; /** * Wire details of the merchant. */ - json_t *wire; + char *payto_uri; + + /** + * Selected wire target for the aggregation. + */ + struct TALER_PaytoHashP h_payto; /** * Exchange wire account to be used for the preparation and @@ -90,25 +93,53 @@ struct AggregationUnit const struct TALER_EXCHANGEDB_AccountInfo *wa; /** - * Array of row_ids from the aggregation. + * Row in KYC table for legitimization requirements + * that are pending for this aggregation, or 0 if none. */ - uint64_t additional_rows[TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT]; + uint64_t requirement_row; /** - * Offset specifying how many @e additional_rows are in use. + * Set to #GNUNET_OK during transient checking + * while everything is OK. Otherwise see return + * value of #do_aggregate(). */ - unsigned int rows_offset; + enum GNUNET_GenericReturnValue ret; /** - * Set to #GNUNET_YES if we have to abort due to failure. + * Do we have an entry in the transient table for + * this aggregation? */ - int failed; + bool have_transient; + +}; + + +/** + * Work shard we are processing. + */ +struct Shard +{ + + /** + * When did we start processing the shard? + */ + struct GNUNET_TIME_Timestamp start_time; + + /** + * Starting row of the shard. + */ + uint32_t shard_start; + + /** + * Inclusive end row of the shard. + */ + uint32_t shard_end; /** - * Set to #GNUNET_YES if we encountered a refund during #refund_by_coin_cb. - * Used to wave the deposit fee. + * Number of starting points found in the shard. */ - int have_refund; + uint64_t work_counter; + }; @@ -119,13 +150,26 @@ struct AggregationUnit static struct TALER_Amount currency_round_unit; /** + * What is the largest amount we transfer before triggering + * an AML check? + */ +static struct TALER_Amount aml_threshold; + +/** * What is the base URL of this exchange? Used in the - * wire transfer subjects to that merchants and governments + * wire transfer subjects so that merchants and governments * can ask for the list of aggregated deposits. */ static char *exchange_base_url; /** + * Set to #GNUNET_YES if this exchange does not support KYC checks + * and thus deposits are to be aggregated regardless of the + * KYC status of the target account. + */ +static int kyc_off; + +/** * The exchange's configuration. */ static const struct GNUNET_CONFIGURATION_Handle *cfg; @@ -146,6 +190,13 @@ static struct GNUNET_SCHEDULER_Task *task; static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval; /** + * How big are the shards we are processing? Is an inclusive offset, so every + * shard ranges from [X,X+shard_size) exclusive. So a shard covers + * shard_size slots. The maximum value for shard_size is INT32_MAX+1. + */ +static uint32_t shard_size; + +/** * Value to return from main(). 0 on success, non-zero on errors. */ static int global_ret; @@ -160,13 +211,22 @@ static int test_mode; * Main work function that queries the DB and aggregates transactions * into larger wire transfers. * - * @param cls NULL + * @param cls a `struct Shard *` */ static void run_aggregation (void *cls); /** + * Work on transactions unlocked by KYC. + * + * @param cls NULL + */ +static void +drain_kyc_alerts (void *cls); + + +/** * Free data stored in @a au, but not @a au itself (stack allocated). * * @param au aggregation unit to clean up @@ -175,8 +235,7 @@ static void cleanup_au (struct AggregationUnit *au) { GNUNET_assert (NULL != au); - if (NULL != au->wire) - json_decref (au->wire); + GNUNET_free (au->payto_uri); memset (au, 0, sizeof (*au)); @@ -199,6 +258,7 @@ shutdown_task (void *cls) GNUNET_SCHEDULER_cancel (task); task = NULL; } + TALER_KYCLOGIC_kyc_done (); TALER_EXCHANGEDB_plugin_unload (db_plugin); db_plugin = NULL; TALER_EXCHANGEDB_unload_accounts (); @@ -207,12 +267,12 @@ shutdown_task (void *cls) /** - * Parse the configuration for wirewatch. + * Parse the configuration for aggregator. * * @return #GNUNET_OK on success */ -static int -parse_wirewatch_config (void) +static enum GNUNET_GenericReturnValue +parse_aggregator_config (void) { if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, @@ -241,11 +301,20 @@ parse_wirewatch_config (void) "taler", "CURRENCY_ROUND_UNIT", ¤cy_round_unit)) || - ( (0 != currency_round_unit.fraction) && - (0 != currency_round_unit.value) ) ) + (TALER_amount_is_zero (¤cy_round_unit)) ) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Need non-zero value in section `TALER' under `CURRENCY_ROUND_UNIT'\n"); + "Need non-zero amount in section `taler' under `CURRENCY_ROUND_UNIT'\n"); + return GNUNET_SYSERR; + } + if (GNUNET_OK != + TALER_config_get_amount (cfg, + "exchange", + "AML_THRESHOLD", + &aml_threshold)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Need amount in section `exchange' under `AML_THRESHOLD'\n"); return GNUNET_SYSERR; } @@ -271,379 +340,799 @@ parse_wirewatch_config (void) /** - * Callback invoked with information about refunds applicable - * to a particular coin. Subtract refunded amount(s) from - * the aggregation unit's total amount. + * Perform a database commit. If it fails, print a warning. * - * @param cls closure with a `struct AggregationUnit *` - * @param amount_with_fee what was the refunded amount with the fee - * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop + * @return status of commit */ -static int -refund_by_coin_cb (void *cls, - const struct TALER_Amount *amount_with_fee) +static enum GNUNET_DB_QueryStatus +commit_or_warn (void) { - struct AggregationUnit *aux = cls; + enum GNUNET_DB_QueryStatus qs; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Aggregator subtracts applicable refund of amount %s\n", - TALER_amount2s (amount_with_fee)); - aux->have_refund = GNUNET_YES; - if (0 > - TALER_amount_subtract (&aux->total_amount, - &aux->total_amount, - amount_with_fee)) + qs = db_plugin->commit (db_plugin->cls); + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) + return qs; + GNUNET_log ((GNUNET_DB_STATUS_SOFT_ERROR == qs) + ? GNUNET_ERROR_TYPE_INFO + : GNUNET_ERROR_TYPE_ERROR, + "Failed to commit database transaction!\n"); + return qs; +} + + +/** + * Release lock on shard @a s in the database. + * On error, terminates this process. + * + * @param[in] s shard to free (and memory to release) + */ +static void +release_shard (struct Shard *s) +{ + enum GNUNET_DB_QueryStatus qs; + + qs = db_plugin->release_revolving_shard ( + db_plugin->cls, + "aggregator", + s->shard_start, + s->shard_end); + GNUNET_free (s); + switch (qs) { + case GNUNET_DB_STATUS_HARD_ERROR: + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR != qs); GNUNET_break (0); - return GNUNET_SYSERR; + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + /* Strange, but let's just continue */ + break; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* normal case */ + break; } - return GNUNET_OK; } /** - * Function called with details about deposits that have been made, - * with the goal of executing the corresponding wire transaction. + * Trigger the wire transfer for the @a au_active + * and delete the record of the aggregation. * - * @param cls a `struct AggregationUnit` - * @param row_id identifies database entry - * @param exchange_timestamp when did the deposit happen - * @param wallet_timestamp when did the contract happen - * @param merchant_pub public key of the merchant - * @param coin_pub public key of the coin - * @param amount_with_fee amount that was deposited including fee - * @param deposit_fee amount the exchange gets to keep as transaction fees - * @param h_contract_terms hash of the proposal data known to merchant and customer - * @param wire_deadline by which the merchant advised that he would like the - * wire transfer to be executed - * @param wire wire details for the merchant - * @return transaction status code, #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT to continue to iterate + * @param au_active information about the aggregation */ static enum GNUNET_DB_QueryStatus -deposit_cb (void *cls, - uint64_t row_id, - struct GNUNET_TIME_Absolute exchange_timestamp, - struct GNUNET_TIME_Absolute wallet_timestamp, - const struct TALER_MerchantPublicKeyP *merchant_pub, - const struct TALER_CoinSpendPublicKeyP *coin_pub, - const struct TALER_Amount *amount_with_fee, - const struct TALER_Amount *deposit_fee, - const struct GNUNET_HashCode *h_contract_terms, - struct GNUNET_TIME_Absolute wire_deadline, - const json_t *wire) +trigger_wire_transfer (const struct AggregationUnit *au_active) { - struct AggregationUnit *au = cls; enum GNUNET_DB_QueryStatus qs; - (void) cls; - /* NOTE: potential optimization: use custom SQL API to not - fetch this one: */ - (void) wire_deadline; /* already checked by SQL query */ - (void) exchange_timestamp; - (void) wallet_timestamp; - au->merchant_pub = *merchant_pub; GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Aggregator processing payment %s with amount %s\n", - TALER_B2S (coin_pub), - TALER_amount2s (amount_with_fee)); - au->row_id = row_id; - au->total_amount = *amount_with_fee; - au->have_refund = GNUNET_NO; - qs = db_plugin->select_refunds_by_coin (db_plugin->cls, - coin_pub, - &au->merchant_pub, - h_contract_terms, - &refund_by_coin_cb, - au); - if (0 > qs) + "Preparing wire transfer of %s to %s\n", + TALER_amount2s (&au_active->final_amount), + TALER_B2S (&au_active->merchant_pub)); { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - return qs; + void *buf; + size_t buf_size; + + TALER_BANK_prepare_transfer (au_active->payto_uri, + &au_active->final_amount, + exchange_base_url, + &au_active->wtid, + &buf, + &buf_size); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Storing %u bytes of wire prepare data\n", + (unsigned int) buf_size); + /* Commit our intention to execute the wire transfer! */ + qs = db_plugin->wire_prepare_data_insert (db_plugin->cls, + au_active->wa->method, + buf, + buf_size); + GNUNET_free (buf); } - if (GNUNET_NO == au->have_refund) + /* Commit the WTID data to 'wire_out' */ + if (qs >= 0) + qs = db_plugin->store_wire_transfer_out (db_plugin->cls, + au_active->execution_time, + &au_active->wtid, + &au_active->h_payto, + au_active->wa->section_name, + &au_active->final_amount); + + if ( (qs >= 0) && + au_active->have_transient) + qs = db_plugin->delete_aggregation_transient (db_plugin->cls, + &au_active->h_payto, + &au_active->wtid); + return qs; +} + + +/** + * Callback to return all applicable amounts for the KYC + * decision to @ a cb. + * + * @param cls a `struct AggregationUnit *` + * @param limit time limit for the iteration + * @param cb function to call with the amounts + * @param cb_cls closure for @a cb + */ +static void +return_relevant_amounts (void *cls, + struct GNUNET_TIME_Absolute limit, + TALER_EXCHANGEDB_KycAmountCallback cb, + void *cb_cls) +{ + const struct AggregationUnit *au_active = cls; + enum GNUNET_DB_QueryStatus qs; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Returning amount %s in KYC check\n", + TALER_amount2s (&au_active->total_amount)); + if (GNUNET_OK != + cb (cb_cls, + &au_active->total_amount, + GNUNET_TIME_absolute_get ())) + return; + qs = db_plugin->select_aggregation_amounts_for_kyc_check ( + db_plugin->cls, + &au_active->h_payto, + limit, + cb, + cb_cls); + if (GNUNET_DB_STATUS_HARD_ERROR == qs) { - struct TALER_Amount ntotal; + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to select aggregation amounts for KYC limit check!\n"); + } +} + + +/** + * Test if KYC is required for a transfer to @a h_payto. + * + * @param[in,out] au_active aggregation unit to check for + * @return true if KYC checks are satisfied + */ +static bool +kyc_satisfied (struct AggregationUnit *au_active) +{ + char *requirement; + enum GNUNET_DB_QueryStatus qs; + if (kyc_off) + return true; + qs = TALER_KYCLOGIC_kyc_test_required ( + TALER_KYCLOGIC_KYC_TRIGGER_DEPOSIT, + &au_active->h_payto, + db_plugin->select_satisfied_kyc_processes, + db_plugin->cls, + &return_relevant_amounts, + (void *) au_active, + &requirement); + if (qs < 0) + { + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return false; + } + if (NULL == requirement) + return true; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "KYC requirement for %s is %s\n", + TALER_amount2s (&au_active->total_amount), + requirement); + qs = db_plugin->insert_kyc_requirement_for_account ( + db_plugin->cls, + requirement, + &au_active->h_payto, + NULL, /* not a reserve */ + &au_active->requirement_row); + if (qs < 0) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to persist KYC requirement `%s' in DB!\n", + requirement); + } + else + { GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Non-refunded transaction, subtracting deposit fee %s\n", - TALER_amount2s (deposit_fee)); - if (0 > - TALER_amount_subtract (&ntotal, - amount_with_fee, - deposit_fee)) - { - /* This should never happen, issue a warning, but continue processing - with an amount of zero, least we hang here for good. */ - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Fatally malformed record at row %llu over %s (deposit fee exceeds deposited value)\n", - (unsigned long long) row_id, - TALER_amount2s (amount_with_fee)); - GNUNET_assert (GNUNET_OK == - TALER_amount_set_zero (au->total_amount.currency, - &au->total_amount)); - } - else - { - au->total_amount = ntotal; - } + "Legitimization process %llu started\n", + (unsigned long long) au_active->requirement_row); } + GNUNET_free (requirement); + return false; +} + + +/** + * Function called on each @a amount that was found to + * be relevant for an AML check. + * + * @param cls closure with the `struct TALER_Amount *` where we store the sum + * @param amount encountered transaction amount + * @param date when was the amount encountered + * @return #GNUNET_OK to continue to iterate, + * #GNUNET_NO to abort iteration + * #GNUNET_SYSERR on internal error (also abort itaration) + */ +static enum GNUNET_GenericReturnValue +sum_for_aml ( + void *cls, + const struct TALER_Amount *amount, + struct GNUNET_TIME_Absolute date) +{ + struct TALER_Amount *sum = cls; - GNUNET_assert (NULL == au->wire); - if (NULL == (au->wire = json_incref ((json_t *) wire))) + (void) date; + if (0 > + TALER_amount_add (sum, + sum, + amount)) { GNUNET_break (0); - return GNUNET_DB_STATUS_HARD_ERROR; + return GNUNET_SYSERR; } - if (GNUNET_OK != - TALER_JSON_merchant_wire_signature_hash (wire, - &au->h_wire)) + return GNUNET_OK; +} + + +/** + * Test if AML is required for a transfer to @a h_payto. + * + * @param[in,out] au_active aggregation unit to check for + * @return true if AML checks are satisfied + */ +static bool +aml_satisfied (struct AggregationUnit *au_active) +{ + enum GNUNET_DB_QueryStatus qs; + struct TALER_Amount total; + struct TALER_Amount threshold; + enum TALER_AmlDecisionState decision; + struct TALER_EXCHANGEDB_KycStatus kyc; + + total = au_active->final_amount; + qs = db_plugin->select_aggregation_amounts_for_kyc_check ( + db_plugin->cls, + &au_active->h_payto, + GNUNET_TIME_absolute_subtract (GNUNET_TIME_absolute_get (), + GNUNET_TIME_UNIT_MONTHS), + &sum_for_aml, + &total); + if (qs < 0) { - GNUNET_break (0); - json_decref (au->wire); - au->wire = NULL; - return GNUNET_DB_STATUS_HARD_ERROR; + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return false; } - GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, - &au->wtid, - sizeof (au->wtid)); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Starting aggregation under H(WTID)=%s, starting amount %s at %llu\n", - TALER_B2S (&au->wtid), - TALER_amount2s (amount_with_fee), - (unsigned long long) row_id); + qs = db_plugin->select_aml_threshold (db_plugin->cls, + &au_active->h_payto, + &decision, + &kyc, + &threshold); + if (qs < 0) { - char *url; - - url = TALER_JSON_wire_to_payto (au->wire); - au->wa = TALER_EXCHANGEDB_find_account_by_payto_uri (url); - if (NULL == au->wa) + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return false; + } + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) + { + threshold = aml_threshold; /* use default */ + decision = TALER_AML_NORMAL; + } + switch (decision) + { + case TALER_AML_NORMAL: + if (0 >= TALER_amount_cmp (&total, + &threshold)) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "No exchange account configured for `%s', please fix your setup to continue!\n", - url); - GNUNET_free (url); - return GNUNET_DB_STATUS_HARD_ERROR; + /* total <= threshold, do nothing */ + return true; + } + qs = db_plugin->trigger_aml_process (db_plugin->cls, + &au_active->h_payto, + &total); + if (qs < 0) + { + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return false; } - GNUNET_free (url); + return false; + case TALER_AML_PENDING: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "AML already pending, doing nothing\n"); + return false; + case TALER_AML_FROZEN: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Account frozen, doing nothing\n"); + return false; + } + GNUNET_assert (0); + return false; +} + + +/** + * Perform the main aggregation work for @a au. Expects to be in + * a working transaction, which the caller must also ultimately commit + * (or rollback) depending on our return value. + * + * @param[in,out] au aggregation unit to work on + * @return #GNUNET_OK if aggregation succeeded, + * #GNUNET_NO to rollback and try again (serialization issue) + * #GNUNET_SYSERR hard error, terminate aggregator process + */ +static enum GNUNET_GenericReturnValue +do_aggregate (struct AggregationUnit *au) +{ + enum GNUNET_DB_QueryStatus qs; + + au->wa = TALER_EXCHANGEDB_find_account_by_payto_uri ( + au->payto_uri); + if (NULL == au->wa) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "No exchange account configured for `%s', please fix your setup to continue!\n", + au->payto_uri); + global_ret = EXIT_FAILURE; + return GNUNET_SYSERR; } - /* make sure we have current fees */ - au->execution_time = GNUNET_TIME_absolute_get (); - (void) GNUNET_TIME_round_abs (&au->execution_time); { - struct TALER_Amount closing_fee; - struct GNUNET_TIME_Absolute start_date; - struct GNUNET_TIME_Absolute end_date; + struct GNUNET_TIME_Timestamp start_date; + struct GNUNET_TIME_Timestamp end_date; struct TALER_MasterSignatureP master_sig; - enum GNUNET_DB_QueryStatus qs; qs = db_plugin->get_wire_fee (db_plugin->cls, au->wa->method, au->execution_time, &start_date, &end_date, - &au->wire_fee, - &closing_fee, + &au->fees, &master_sig); if (0 >= qs) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Could not get wire fees for %s at %s. Aborting run.\n", au->wa->method, - GNUNET_STRINGS_absolute_time_to_string (au->execution_time)); - return GNUNET_DB_STATUS_HARD_ERROR; + GNUNET_TIME_timestamp2s (au->execution_time)); + global_ret = EXIT_FAILURE; + return GNUNET_SYSERR; } } + /* Now try to find other deposits to aggregate */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Aggregator starts aggregation for deposit %llu to %s with wire fee %s\n", - (unsigned long long) row_id, - TALER_B2S (&au->wtid), - TALER_amount2s (&au->wire_fee)); - qs = db_plugin->insert_aggregation_tracking (db_plugin->cls, - &au->wtid, - row_id); - if (qs <= 0) + "Found ready deposit for %s, aggregating by target %s\n", + TALER_B2S (&au->merchant_pub), + au->payto_uri); + qs = db_plugin->select_aggregation_transient (db_plugin->cls, + &au->h_payto, + &au->merchant_pub, + au->wa->section_name, + &au->wtid, + &au->trans); + switch (qs) { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - return qs; + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to lookup transient aggregates!\n"); + global_ret = EXIT_FAILURE; + return GNUNET_SYSERR; + case GNUNET_DB_STATUS_SOFT_ERROR: + /* serializiability issue, try again */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Serialization issue, trying again later!\n"); + return GNUNET_NO; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, + &au->wtid, + sizeof (au->wtid)); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "No transient aggregation found, starting %s\n", + TALER_B2S (&au->wtid)); + au->have_transient = false; + break; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + au->have_transient = true; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Transient aggregation found, resuming %s\n", + TALER_B2S (&au->wtid)); + break; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Aggregator marks deposit %llu as done\n", - (unsigned long long) row_id); - qs = db_plugin->mark_deposit_done (db_plugin->cls, - row_id); - if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) + qs = db_plugin->aggregate (db_plugin->cls, + &au->h_payto, + &au->merchant_pub, + &au->wtid, + &au->total_amount); + if (GNUNET_DB_STATUS_HARD_ERROR == qs) { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - return qs; + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to execute aggregation!\n"); + global_ret = EXIT_FAILURE; + return GNUNET_SYSERR; } - return qs; -} - - -/** - * Function called with details about another deposit we - * can aggregate into an existing aggregation unit. - * - * @param cls a `struct AggregationUnit` - * @param row_id identifies database entry - * @param coin_pub public key of the coin - * @param amount_with_fee amount that was deposited including fee - * @param deposit_fee amount the exchange gets to keep as transaction fees - * @param h_contract_terms hash of the proposal data known to merchant and customer - * @return transaction status code - */ -static enum GNUNET_DB_QueryStatus -aggregate_cb (void *cls, - uint64_t row_id, - const struct TALER_CoinSpendPublicKeyP *coin_pub, - const struct TALER_Amount *amount_with_fee, - const struct TALER_Amount *deposit_fee, - const struct GNUNET_HashCode *h_contract_terms) -{ - struct AggregationUnit *au = cls; - struct TALER_Amount old; - enum GNUNET_DB_QueryStatus qs; - - if (au->rows_offset >= TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT) + if (GNUNET_DB_STATUS_SOFT_ERROR == qs) { - /* Bug: we asked for at most #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT results! */ - GNUNET_break (0); - /* Skip this one, but keep going with the overall transaction */ - return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; + /* serializiability issue, try again */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Serialization issue, trying again later!\n"); + return GNUNET_NO; } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Aggregation total is %s.\n", + TALER_amount2s (&au->total_amount)); + /* Subtract wire transfer fee and round to the unit supported by the + wire transfer method; Check if after rounding down, we still have + an amount to transfer, and if not mark as 'tiny'. */ + if (au->have_transient) + GNUNET_assert (0 <= + TALER_amount_add (&au->total_amount, + &au->total_amount, + &au->trans)); - /* add to total */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Adding transaction amount %s from row %llu to aggregation\n", - TALER_amount2s (amount_with_fee), - (unsigned long long) row_id); - /* save the existing total aggregate in 'old', for later */ - old = au->total_amount; - /* we begin with the total contribution of the current coin */ - au->total_amount = *amount_with_fee; - /* compute contribution of this coin (after fees) */ - au->have_refund = GNUNET_NO; - qs = db_plugin->select_refunds_by_coin (db_plugin->cls, - coin_pub, - &au->merchant_pub, - h_contract_terms, - &refund_by_coin_cb, - au); - if (0 > qs) - { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - return qs; - } - if (GNUNET_NO == au->have_refund) - { - struct TALER_Amount tmp; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Subtracting deposit fee %s for non-refunded coin\n", - TALER_amount2s (deposit_fee)); - if (0 > - TALER_amount_subtract (&tmp, + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Rounding aggregate of %s\n", + TALER_amount2s (&au->total_amount)); + if ( (0 >= + TALER_amount_subtract (&au->final_amount, &au->total_amount, - deposit_fee)) + &au->fees.wire)) || + (GNUNET_SYSERR == + TALER_amount_round_down (&au->final_amount, + ¤cy_round_unit)) || + (TALER_amount_is_zero (&au->final_amount)) || + (! kyc_satisfied (au)) || + (! aml_satisfied (au)) ) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Not ready for wire transfer (%d/%s)\n", + qs, + TALER_amount2s (&au->final_amount)); + if (au->have_transient) + qs = db_plugin->update_aggregation_transient (db_plugin->cls, + &au->h_payto, + &au->wtid, + au->requirement_row, + &au->total_amount); + else + qs = db_plugin->create_aggregation_transient (db_plugin->cls, + &au->h_payto, + au->wa->section_name, + &au->merchant_pub, + &au->wtid, + au->requirement_row, + &au->total_amount); + if (GNUNET_DB_STATUS_SOFT_ERROR == qs) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Fatally malformed record at %llu over amount %s (deposit fee exceeds deposited value)\n", - (unsigned long long) row_id, - TALER_amount2s (&au->total_amount)); - GNUNET_assert (GNUNET_OK == - TALER_amount_set_zero (old.currency, - &au->total_amount)); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Serialization issue, trying again later!\n"); + return GNUNET_NO; } - else + if (GNUNET_DB_STATUS_HARD_ERROR == qs) { - au->total_amount = tmp; + GNUNET_break (0); + global_ret = EXIT_FAILURE; + return GNUNET_SYSERR; } + /* commit */ + return GNUNET_OK; } - /* now add the au->total_amount with the (remaining) contribution of - the current coin to the 'old' value with the current aggregate value */ + qs = trigger_wire_transfer (au); + switch (qs) { - struct TALER_Amount tmp; + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Serialization issue during aggregation; trying again later!\n") + ; + return GNUNET_NO; + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + global_ret = EXIT_FAILURE; + return GNUNET_SYSERR; + default: + break; + } + { + struct TALER_CoinDepositEventP rep = { + .header.size = htons (sizeof (rep)), + .header.type = htons (TALER_DBEVENT_EXCHANGE_DEPOSIT_STATUS_CHANGED), + .merchant_pub = au->merchant_pub + }; + + db_plugin->event_notify (db_plugin->cls, + &rep.header, + NULL, + 0); + } + return GNUNET_OK; + +} + + +static void +run_aggregation (void *cls) +{ + struct Shard *s = cls; + struct AggregationUnit au_active; + enum GNUNET_DB_QueryStatus qs; + enum GNUNET_GenericReturnValue ret; - if (0 > - TALER_amount_add (&tmp, - &au->total_amount, - &old)) + task = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Checking for ready deposits to aggregate\n"); + /* make sure we have current fees */ + memset (&au_active, + 0, + sizeof (au_active)); + au_active.execution_time = GNUNET_TIME_timestamp_get (); + if (GNUNET_OK != + db_plugin->start_deferred_wire_out (db_plugin->cls)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to start database transaction!\n"); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + release_shard (s); + return; + } + qs = db_plugin->get_ready_deposit ( + db_plugin->cls, + s->shard_start, + s->shard_end, + &au_active.merchant_pub, + &au_active.payto_uri); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + cleanup_au (&au_active); + db_plugin->rollback (db_plugin->cls); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to begin deposit iteration!\n"); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + release_shard (s); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + cleanup_au (&au_active); + db_plugin->rollback (db_plugin->cls); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&run_aggregation, + s); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Overflow or currency incompatibility during aggregation at %llu\n", - (unsigned long long) row_id); - /* Skip this one, but keep going! */ - au->total_amount = old; - return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; + uint64_t counter = s->work_counter; + struct GNUNET_TIME_Relative duration + = GNUNET_TIME_absolute_get_duration (s->start_time.abs_time); + + cleanup_au (&au_active); + db_plugin->rollback (db_plugin->cls); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Completed shard [%u,%u] after %s with %llu deposits\n", + (unsigned int) s->shard_start, + (unsigned int) s->shard_end, + GNUNET_TIME_relative2s (duration, + true), + (unsigned long long) counter); + release_shard (s); + if ( (GNUNET_YES == test_mode) && + (0 == counter) ) + { + /* in test mode, shutdown after a shard is done with 0 work */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "No work done and in test mode, shutting down\n"); + GNUNET_SCHEDULER_shutdown (); + return; + } + GNUNET_assert (NULL == task); + /* If we ended up doing zero work, sleep a bit */ + if (0 == counter) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Going to sleep for %s before trying again\n", + GNUNET_TIME_relative2s (aggregator_idle_sleep_interval, + true)); + task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval, + &drain_kyc_alerts, + NULL); + } + else + { + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, + NULL); + } + return; } - au->total_amount = tmp; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + s->work_counter++; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Found ready deposit!\n"); + /* continued below */ + break; } - /* "append" to our list of rows */ - au->additional_rows[au->rows_offset++] = row_id; - /* insert into aggregation tracking table */ - qs = db_plugin->insert_aggregation_tracking (db_plugin->cls, - &au->wtid, - row_id); - if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) + TALER_payto_hash (au_active.payto_uri, + &au_active.h_payto); + ret = do_aggregate (&au_active); + cleanup_au (&au_active); + switch (ret) { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - return qs; + case GNUNET_SYSERR: + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + db_plugin->rollback (db_plugin->cls); + release_shard (s); + return; + case GNUNET_NO: + db_plugin->rollback (db_plugin->cls); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&run_aggregation, + s); + return; + case GNUNET_OK: + /* continued below */ + break; } - qs = db_plugin->mark_deposit_done (db_plugin->cls, - row_id); - if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Committing aggregation result\n"); + + /* Now we can finally commit the overall transaction, as we are + again consistent if all of this passes. */ + switch (commit_or_warn ()) { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - return qs; + case GNUNET_DB_STATUS_SOFT_ERROR: + /* try again */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Serialization issue on commit; trying again later!\n"); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&run_aggregation, + s); + return; + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + db_plugin->rollback (db_plugin->cls); /* just in case */ + release_shard (s); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Commit complete, going again\n"); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&run_aggregation, + s); + return; + default: + GNUNET_break (0); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + db_plugin->rollback (db_plugin->cls); /* just in case */ + release_shard (s); + return; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Aggregator marked deposit %llu as DONE\n", - (unsigned long long) row_id); - return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; } /** - * Perform a database commit. If it fails, print a warning. + * Select a shard to work on. * - * @return status of commit + * @param cls NULL */ -static enum GNUNET_DB_QueryStatus -commit_or_warn (void) +static void +run_shard (void *cls) { + struct Shard *s; enum GNUNET_DB_QueryStatus qs; - qs = db_plugin->commit (db_plugin->cls); - if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) - return qs; - GNUNET_log ((GNUNET_DB_STATUS_SOFT_ERROR == qs) - ? GNUNET_ERROR_TYPE_INFO - : GNUNET_ERROR_TYPE_ERROR, - "Failed to commit database transaction!\n"); - return qs; + (void) cls; + task = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Running aggregation shard\n"); + if (GNUNET_SYSERR == + db_plugin->preflight (db_plugin->cls)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to obtain database connection!\n"); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + } + s = GNUNET_new (struct Shard); + s->start_time = GNUNET_TIME_timestamp_get (); + qs = db_plugin->begin_revolving_shard (db_plugin->cls, + "aggregator", + shard_size, + 1U + INT32_MAX, + &s->shard_start, + &s->shard_end); + if (0 >= qs) + { + if (GNUNET_DB_STATUS_SOFT_ERROR == qs) + { + static struct GNUNET_TIME_Relative delay; + + GNUNET_free (s); + delay = GNUNET_TIME_randomized_backoff (delay, + GNUNET_TIME_UNIT_SECONDS); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_delayed (delay, + &run_shard, + NULL); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to begin shard (%d)!\n", + qs); + GNUNET_break (GNUNET_DB_STATUS_HARD_ERROR != qs); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Starting shard [%u:%u]!\n", + (unsigned int) s->shard_start, + (unsigned int) s->shard_end); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&run_aggregation, + s); } /** - * Main work function that queries the DB and aggregates transactions - * into larger wire transfers. + * Function called on transient aggregations matching + * a particular hash of a payto URI. * - * @param cls NULL + * @param cls + * @param payto_uri corresponding payto URI + * @param wtid wire transfer identifier of transient aggregation + * @param merchant_pub public key of the merchant + * @param total amount aggregated so far + * @return true to continue to iterate */ +static bool +handle_transient_cb ( + void *cls, + const char *payto_uri, + const struct TALER_WireTransferIdentifierRawP *wtid, + const struct TALER_MerchantPublicKeyP *merchant_pub, + const struct TALER_Amount *total) +{ + struct AggregationUnit *au = cls; + + if (GNUNET_OK != au->ret) + { + GNUNET_break (0); + return false; + } + au->payto_uri = GNUNET_strdup (payto_uri); + au->wtid = *wtid; + au->merchant_pub = *merchant_pub; + au->trans = *total; + au->have_transient = true; + au->ret = do_aggregate (au); + GNUNET_free (au->payto_uri); + return (GNUNET_OK == au->ret); +} + + static void -run_aggregation (void *cls) +drain_kyc_alerts (void *cls) { - struct AggregationUnit au_active; enum GNUNET_DB_QueryStatus qs; + struct AggregationUnit au; (void) cls; task = NULL; GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Checking for ready deposits to aggregate\n"); + "Draining KYC alerts\n"); + memset (&au, + 0, + sizeof (au)); + au.execution_time = GNUNET_TIME_timestamp_get (); if (GNUNET_SYSERR == db_plugin->preflight (db_plugin->cls)) { @@ -654,7 +1143,8 @@ run_aggregation (void *cls) return; } if (GNUNET_OK != - db_plugin->start_deferred_wire_out (db_plugin->cls)) + db_plugin->start (db_plugin->cls, + "handle kyc alerts")) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to start database transaction!\n"); @@ -662,262 +1152,125 @@ run_aggregation (void *cls) GNUNET_SCHEDULER_shutdown (); return; } - memset (&au_active, - 0, - sizeof (au_active)); - qs = db_plugin->get_ready_deposit (db_plugin->cls, - &deposit_cb, - &au_active); - if (0 >= qs) + while (1) { - cleanup_au (&au_active); - db_plugin->rollback (db_plugin->cls); - if (GNUNET_DB_STATUS_HARD_ERROR == qs) + qs = db_plugin->drain_kyc_alert (db_plugin->cls, + 1, + &au.h_payto); + switch (qs) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to execute deposit iteration!\n"); - global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + db_plugin->rollback (db_plugin->cls); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, + NULL); return; - } - if (GNUNET_DB_STATUS_SOFT_ERROR == qs) - { - /* should re-try immediately */ + case GNUNET_DB_STATUS_SOFT_ERROR: + db_plugin->rollback (db_plugin->cls); GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_aggregation, + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, NULL); return; - } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "No more ready deposits, going to sleep\n"); - if (GNUNET_YES == test_mode) - { - /* in test mode, shutdown if we end up being idle */ - GNUNET_SCHEDULER_shutdown (); - } - else - { - /* nothing to do, sleep for a minute and try again */ + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + qs = db_plugin->commit (db_plugin->cls); + if (qs < 0) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to commit KYC drain\n"); GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval, - &run_aggregation, - NULL); + task = GNUNET_SCHEDULER_add_now (&run_shard, + NULL); + return; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* handled below */ + break; } - return; - } - - /* Now try to find other deposits to aggregate */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Found ready deposit for %s, aggregating\n", - TALER_B2S (&au_active.merchant_pub)); - qs = db_plugin->iterate_matching_deposits (db_plugin->cls, - &au_active.h_wire, - &au_active.merchant_pub, - &aggregate_cb, - &au_active, - TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT); - if ( (GNUNET_DB_STATUS_HARD_ERROR == qs) || - (GNUNET_YES == au_active.failed) ) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to execute deposit iteration!\n"); - cleanup_au (&au_active); - db_plugin->rollback (db_plugin->cls); - global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - return; - } - if (GNUNET_DB_STATUS_SOFT_ERROR == qs) - { - /* serializiability issue, try again */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Serialization issue, trying again later!\n"); - db_plugin->rollback (db_plugin->cls); - cleanup_au (&au_active); - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_aggregation, - NULL); - return; - } - /* Subtract wire transfer fee and round to the unit supported by the - wire transfer method; Check if after rounding down, we still have - an amount to transfer, and if not mark as 'tiny'. */ - if ( (0 >= - TALER_amount_subtract (&au_active.final_amount, - &au_active.total_amount, - &au_active.wire_fee)) || - (GNUNET_SYSERR == - TALER_amount_round_down (&au_active.final_amount, - ¤cy_round_unit)) || - ( (0 == au_active.final_amount.value) && - (0 == au_active.final_amount.fraction) ) ) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Aggregate value too low for transfer (%d/%s)\n", - qs, - TALER_amount2s (&au_active.final_amount)); - /* Rollback ongoing transaction, as we will not use the respective - WTID and thus need to remove the tracking data */ - db_plugin->rollback (db_plugin->cls); - - /* There were results, just the value was too low. Start another - transaction to mark all* of the selected deposits as minor! */ - if (GNUNET_OK != - db_plugin->start (db_plugin->cls, - "aggregator mark tiny transactions")) + au.ret = GNUNET_OK; + qs = db_plugin->find_aggregation_transient (db_plugin->cls, + &au.h_payto, + &handle_transient_cb, + &au); + switch (qs) { + case GNUNET_DB_STATUS_HARD_ERROR: GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to start database transaction!\n"); - global_ret = EXIT_FAILURE; - cleanup_au (&au_active); - GNUNET_SCHEDULER_shutdown (); + "Failed to lookup transient aggregates!\n"); + db_plugin->rollback (db_plugin->cls); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, + NULL); return; - } - /* Mark transactions by row_id as minor */ - qs = db_plugin->mark_deposit_tiny (db_plugin->cls, - au_active.row_id); - if (0 <= qs) - { - for (unsigned int i = 0; i<au_active.rows_offset; i++) - { - qs = db_plugin->mark_deposit_tiny (db_plugin->cls, - au_active.additional_rows[i]); - if (0 > qs) - break; - } - } - if (GNUNET_DB_STATUS_SOFT_ERROR == qs) - { + case GNUNET_DB_STATUS_SOFT_ERROR: + /* serializiability issue, try again */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Serialization issue, trying again later!\n"); db_plugin->rollback (db_plugin->cls); - cleanup_au (&au_active); - /* start again */ GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_aggregation, + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, NULL); return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + continue; /* while (1) */ + default: + break; } - if (GNUNET_DB_STATUS_HARD_ERROR == qs) - { - db_plugin->rollback (db_plugin->cls); - cleanup_au (&au_active); - global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - return; - } - /* commit */ - (void) commit_or_warn (); - cleanup_au (&au_active); - - /* start again */ - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_aggregation, - NULL); - return; - } - { - char *amount_s; - - amount_s = TALER_amount_to_string (&au_active.final_amount); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Preparing wire transfer of %s to %s\n", - amount_s, - TALER_B2S (&au_active.merchant_pub)); - GNUNET_free (amount_s); - } + break; + } /* while(1) */ { - void *buf; - size_t buf_size; + enum GNUNET_GenericReturnValue ret; + ret = au.ret; + cleanup_au (&au); + switch (ret) { - char *url; - - url = TALER_JSON_wire_to_payto (au_active.wire); - TALER_BANK_prepare_transfer (url, - &au_active.final_amount, - exchange_base_url, - &au_active.wtid, - &buf, - &buf_size); - GNUNET_free (url); + case GNUNET_SYSERR: + GNUNET_break (0); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + db_plugin->rollback (db_plugin->cls); /* just in case */ + return; + case GNUNET_NO: + db_plugin->rollback (db_plugin->cls); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, + NULL); + return; + case GNUNET_OK: + /* continued below */ + break; } - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Storing %u bytes of wire prepare data\n", - (unsigned int) buf_size); - /* Commit our intention to execute the wire transfer! */ - qs = db_plugin->wire_prepare_data_insert (db_plugin->cls, - au_active.wa->method, - buf, - buf_size); - GNUNET_free (buf); } - /* Commit the WTID data to 'wire_out' to finally satisfy aggregation - table constraints */ - if (qs >= 0) - qs = db_plugin->store_wire_transfer_out (db_plugin->cls, - au_active.execution_time, - &au_active.wtid, - au_active.wire, - au_active.wa->section_name, - &au_active.final_amount); - cleanup_au (&au_active); - if (GNUNET_DB_STATUS_SOFT_ERROR == qs) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Serialization issue for prepared wire data; trying again later!\n"); - db_plugin->rollback (db_plugin->cls); - /* start again */ - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_aggregation, - NULL); - return; - } - if (GNUNET_DB_STATUS_HARD_ERROR == qs) - { - GNUNET_break (0); - db_plugin->rollback (db_plugin->cls); - /* die hard */ - global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - return; - } - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Stored wire transfer out instructions\n"); - - /* Now we can finally commit the overall transaction, as we are - again consistent if all of this passes. */ switch (commit_or_warn ()) { case GNUNET_DB_STATUS_SOFT_ERROR: /* try again */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Commit issue for prepared wire data; trying again later!\n"); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Serialization issue on commit; trying again later!\n"); GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_aggregation, + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, NULL); return; case GNUNET_DB_STATUS_HARD_ERROR: GNUNET_break (0); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); + db_plugin->rollback (db_plugin->cls); /* just in case */ return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Preparation complete, going again\n"); + "Commit complete, going again\n"); GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_aggregation, + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, NULL); return; default: GNUNET_break (0); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); + db_plugin->rollback (db_plugin->cls); /* just in case */ return; } } @@ -937,22 +1290,46 @@ run (void *cls, const char *cfgfile, const struct GNUNET_CONFIGURATION_Handle *c) { + unsigned long long ass; (void) cls; (void) args; (void) cfgfile; cfg = c; - if (GNUNET_OK != parse_wirewatch_config ()) + if (GNUNET_OK != + parse_aggregator_config ()) + { + cfg = NULL; + global_ret = EXIT_NOTCONFIGURED; + return; + } + if (GNUNET_OK != + GNUNET_CONFIGURATION_get_value_number (cfg, + "exchange", + "AGGREGATOR_SHARD_SIZE", + &ass)) { cfg = NULL; global_ret = EXIT_NOTCONFIGURED; return; } + if ( (0 == ass) || + (ass > INT32_MAX) ) + shard_size = 1U + INT32_MAX; + else + shard_size = (uint32_t) ass; + if (GNUNET_OK != + TALER_KYCLOGIC_kyc_init (cfg)) + { + cfg = NULL; + global_ret = EXIT_NOTCONFIGURED; + return; + } + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, + NULL); GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_aggregation, + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, NULL); - GNUNET_SCHEDULER_add_shutdown (&shutdown_task, - cls); } @@ -974,6 +1351,10 @@ main (int argc, "test", "run in test mode and exit when idle", &test_mode), + GNUNET_GETOPT_option_flag ('y', + "kyc-off", + "perform wire transfers without KYC checks", + &kyc_off), GNUNET_GETOPT_OPTION_END }; enum GNUNET_GenericReturnValue ret; |