diff options
Diffstat (limited to 'src/exchange/taler-exchange-aggregator.c')
-rw-r--r-- | src/exchange/taler-exchange-aggregator.c | 1283 |
1 files changed, 754 insertions, 529 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index caa4528db..691d65ae3 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2016-2021 Taler Systems SA + Copyright (C) 2016-2022 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -26,7 +26,9 @@ #include "taler_exchangedb_lib.h" #include "taler_exchangedb_plugin.h" #include "taler_json_lib.h" +#include "taler_kyclogic_lib.h" #include "taler_bank_service.h" +#include "taler_dbevents.h" /** @@ -43,7 +45,13 @@ struct AggregationUnit struct TALER_MerchantPublicKeyP merchant_pub; /** - * Total amount to be transferred, before subtraction of @e wire_fee and rounding down. + * Transient amount already found aggregated, + * set only if @e have_transient is true. + */ + struct TALER_Amount trans; + + /** + * Total amount to be transferred, before subtraction of @e fees.wire and rounding down. */ struct TALER_Amount total_amount; @@ -55,12 +63,7 @@ struct AggregationUnit /** * Wire fee we charge for @e wp at @e execution_time. */ - struct TALER_Amount wire_fee; - - /** - * Hash of @e wire. - */ - struct GNUNET_HashCode h_wire; + struct TALER_WireFeeSet fees; /** * Wire transfer identifier we use. @@ -68,20 +71,20 @@ struct AggregationUnit struct TALER_WireTransferIdentifierRawP wtid; /** - * Row ID of the transaction that started it all. - */ - uint64_t row_id; - - /** * The current time (which triggered the aggregation and * defines the wire fee). */ - struct GNUNET_TIME_Absolute execution_time; + struct GNUNET_TIME_Timestamp execution_time; /** * Wire details of the merchant. */ - json_t *wire; + char *payto_uri; + + /** + * Selected wire target for the aggregation. + */ + struct TALER_PaytoHashP h_payto; /** * Exchange wire account to be used for the preparation and @@ -90,20 +93,24 @@ struct AggregationUnit const struct TALER_EXCHANGEDB_AccountInfo *wa; /** - * Array of row_ids from the aggregation. + * Row in KYC table for legitimization requirements + * that are pending for this aggregation, or 0 if none. */ - uint64_t additional_rows[TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT]; + uint64_t requirement_row; /** - * Offset specifying how many @e additional_rows are in use. + * Set to #GNUNET_OK during transient checking + * while everything is OK. Otherwise see return + * value of #do_aggregate(). */ - unsigned int rows_offset; + enum GNUNET_GenericReturnValue ret; /** - * Set to true if we encountered a refund during #refund_by_coin_cb. - * Used to wave the deposit fee. + * Do we have an entry in the transient table for + * this aggregation? */ - bool have_refund; + bool have_transient; + }; @@ -116,7 +123,7 @@ struct Shard /** * When did we start processing the shard? */ - struct GNUNET_TIME_Absolute start_time; + struct GNUNET_TIME_Timestamp start_time; /** * Starting row of the shard. @@ -143,13 +150,26 @@ struct Shard static struct TALER_Amount currency_round_unit; /** + * What is the largest amount we transfer before triggering + * an AML check? + */ +static struct TALER_Amount aml_threshold; + +/** * What is the base URL of this exchange? Used in the - * wire transfer subjects to that merchants and governments + * wire transfer subjects so that merchants and governments * can ask for the list of aggregated deposits. */ static char *exchange_base_url; /** + * Set to #GNUNET_YES if this exchange does not support KYC checks + * and thus deposits are to be aggregated regardless of the + * KYC status of the target account. + */ +static int kyc_off; + +/** * The exchange's configuration. */ static const struct GNUNET_CONFIGURATION_Handle *cfg; @@ -164,7 +184,6 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin; */ static struct GNUNET_SCHEDULER_Task *task; - /** * How long should we sleep when idle before trying to find more work? */ @@ -192,19 +211,19 @@ static int test_mode; * Main work function that queries the DB and aggregates transactions * into larger wire transfers. * - * @param cls NULL + * @param cls a `struct Shard *` */ static void run_aggregation (void *cls); /** - * Select a shard to work on. + * Work on transactions unlocked by KYC. * * @param cls NULL */ static void -run_shard (void *cls); +drain_kyc_alerts (void *cls); /** @@ -216,8 +235,7 @@ static void cleanup_au (struct AggregationUnit *au) { GNUNET_assert (NULL != au); - if (NULL != au->wire) - json_decref (au->wire); + GNUNET_free (au->payto_uri); memset (au, 0, sizeof (*au)); @@ -240,6 +258,7 @@ shutdown_task (void *cls) GNUNET_SCHEDULER_cancel (task); task = NULL; } + TALER_KYCLOGIC_kyc_done (); TALER_EXCHANGEDB_plugin_unload (db_plugin); db_plugin = NULL; TALER_EXCHANGEDB_unload_accounts (); @@ -248,12 +267,12 @@ shutdown_task (void *cls) /** - * Parse the configuration for wirewatch. + * Parse the configuration for aggregator. * * @return #GNUNET_OK on success */ static enum GNUNET_GenericReturnValue -parse_wirewatch_config (void) +parse_aggregator_config (void) { if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, @@ -282,11 +301,20 @@ parse_wirewatch_config (void) "taler", "CURRENCY_ROUND_UNIT", ¤cy_round_unit)) || - ( (0 != currency_round_unit.fraction) && - (0 != currency_round_unit.value) ) ) + (TALER_amount_is_zero (¤cy_round_unit)) ) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Need non-zero amount in section `taler' under `CURRENCY_ROUND_UNIT'\n"); + return GNUNET_SYSERR; + } + if (GNUNET_OK != + TALER_config_get_amount (cfg, + "exchange", + "AML_THRESHOLD", + &aml_threshold)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Need non-zero value in section `TALER' under `CURRENCY_ROUND_UNIT'\n"); + "Need amount in section `exchange' under `AML_THRESHOLD'\n"); return GNUNET_SYSERR; } @@ -312,401 +340,534 @@ parse_wirewatch_config (void) /** - * Callback invoked with information about refunds applicable - * to a particular coin. Subtract refunded amount(s) from - * the aggregation unit's total amount. + * Perform a database commit. If it fails, print a warning. * - * @param cls closure with a `struct AggregationUnit *` - * @param amount_with_fee what was the refunded amount with the fee - * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop + * @return status of commit */ -static int -refund_by_coin_cb (void *cls, - const struct TALER_Amount *amount_with_fee) +static enum GNUNET_DB_QueryStatus +commit_or_warn (void) { - struct AggregationUnit *aux = cls; + enum GNUNET_DB_QueryStatus qs; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Aggregator subtracts applicable refund of amount %s\n", - TALER_amount2s (amount_with_fee)); - aux->have_refund = true; - if (0 > - TALER_amount_subtract (&aux->total_amount, - &aux->total_amount, - amount_with_fee)) + qs = db_plugin->commit (db_plugin->cls); + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) + return qs; + GNUNET_log ((GNUNET_DB_STATUS_SOFT_ERROR == qs) + ? GNUNET_ERROR_TYPE_INFO + : GNUNET_ERROR_TYPE_ERROR, + "Failed to commit database transaction!\n"); + return qs; +} + + +/** + * Release lock on shard @a s in the database. + * On error, terminates this process. + * + * @param[in] s shard to free (and memory to release) + */ +static void +release_shard (struct Shard *s) +{ + enum GNUNET_DB_QueryStatus qs; + + qs = db_plugin->release_revolving_shard ( + db_plugin->cls, + "aggregator", + s->shard_start, + s->shard_end); + GNUNET_free (s); + switch (qs) { + case GNUNET_DB_STATUS_HARD_ERROR: + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR != qs); GNUNET_break (0); - return GNUNET_SYSERR; + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + /* Strange, but let's just continue */ + break; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* normal case */ + break; } - return GNUNET_OK; } /** - * Function called with details about deposits that have been made, - * with the goal of executing the corresponding wire transaction. + * Trigger the wire transfer for the @a au_active + * and delete the record of the aggregation. * - * @param cls a `struct AggregationUnit` - * @param row_id identifies database entry - * @param merchant_pub public key of the merchant - * @param coin_pub public key of the coin - * @param amount_with_fee amount that was deposited including fee - * @param deposit_fee amount the exchange gets to keep as transaction fees - * @param h_contract_terms hash of the proposal data known to merchant and customer - * @param wire target account for the wire transfer - * @return transaction status code, #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT to continue to iterate + * @param au_active information about the aggregation */ static enum GNUNET_DB_QueryStatus -deposit_cb (void *cls, - uint64_t row_id, - const struct TALER_MerchantPublicKeyP *merchant_pub, - const struct TALER_CoinSpendPublicKeyP *coin_pub, - const struct TALER_Amount *amount_with_fee, - const struct TALER_Amount *deposit_fee, - const struct GNUNET_HashCode *h_contract_terms, - const json_t *wire) +trigger_wire_transfer (const struct AggregationUnit *au_active) { - struct AggregationUnit *au = cls; enum GNUNET_DB_QueryStatus qs; - au->merchant_pub = *merchant_pub; GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Aggregator processing payment %s with amount %s\n", - TALER_B2S (coin_pub), - TALER_amount2s (amount_with_fee)); - au->row_id = row_id; - au->total_amount = *amount_with_fee; - au->have_refund = false; - qs = db_plugin->select_refunds_by_coin (db_plugin->cls, - coin_pub, - &au->merchant_pub, - h_contract_terms, - &refund_by_coin_cb, - au); - if (0 > qs) - { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - return qs; - } - if (! au->have_refund) + "Preparing wire transfer of %s to %s\n", + TALER_amount2s (&au_active->final_amount), + TALER_B2S (&au_active->merchant_pub)); { - struct TALER_Amount ntotal; + void *buf; + size_t buf_size; - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Non-refunded transaction, subtracting deposit fee %s\n", - TALER_amount2s (deposit_fee)); - if (0 > - TALER_amount_subtract (&ntotal, - amount_with_fee, - deposit_fee)) - { - /* This should never happen, issue a warning, but continue processing - with an amount of zero, least we hang here for good. */ - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Fatally malformed record at row %llu over %s (deposit fee exceeds deposited value)\n", - (unsigned long long) row_id, - TALER_amount2s (amount_with_fee)); - GNUNET_assert (GNUNET_OK == - TALER_amount_set_zero (au->total_amount.currency, - &au->total_amount)); - } - else - { - au->total_amount = ntotal; - } + TALER_BANK_prepare_transfer (au_active->payto_uri, + &au_active->final_amount, + exchange_base_url, + &au_active->wtid, + &buf, + &buf_size); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Storing %u bytes of wire prepare data\n", + (unsigned int) buf_size); + /* Commit our intention to execute the wire transfer! */ + qs = db_plugin->wire_prepare_data_insert (db_plugin->cls, + au_active->wa->method, + buf, + buf_size); + GNUNET_free (buf); } + /* Commit the WTID data to 'wire_out' */ + if (qs >= 0) + qs = db_plugin->store_wire_transfer_out (db_plugin->cls, + au_active->execution_time, + &au_active->wtid, + &au_active->h_payto, + au_active->wa->section_name, + &au_active->final_amount); + + if ( (qs >= 0) && + au_active->have_transient) + qs = db_plugin->delete_aggregation_transient (db_plugin->cls, + &au_active->h_payto, + &au_active->wtid); + return qs; +} + + +/** + * Callback to return all applicable amounts for the KYC + * decision to @ a cb. + * + * @param cls a `struct AggregationUnit *` + * @param limit time limit for the iteration + * @param cb function to call with the amounts + * @param cb_cls closure for @a cb + */ +static void +return_relevant_amounts (void *cls, + struct GNUNET_TIME_Absolute limit, + TALER_EXCHANGEDB_KycAmountCallback cb, + void *cb_cls) +{ + const struct AggregationUnit *au_active = cls; + enum GNUNET_DB_QueryStatus qs; - GNUNET_assert (NULL == au->wire); - if (NULL == (au->wire = json_incref ((json_t *) wire))) - { - GNUNET_break (0); - return GNUNET_DB_STATUS_HARD_ERROR; - } - if (GNUNET_OK != - TALER_JSON_merchant_wire_signature_hash (wire, - &au->h_wire)) - { - GNUNET_break (0); - json_decref (au->wire); - au->wire = NULL; - return GNUNET_DB_STATUS_HARD_ERROR; - } - GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, - &au->wtid, - sizeof (au->wtid)); GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Starting aggregation under H(WTID)=%s, starting amount %s at %llu\n", - TALER_B2S (&au->wtid), - TALER_amount2s (amount_with_fee), - (unsigned long long) row_id); + "Returning amount %s in KYC check\n", + TALER_amount2s (&au_active->total_amount)); + if (GNUNET_OK != + cb (cb_cls, + &au_active->total_amount, + GNUNET_TIME_absolute_get ())) + return; + qs = db_plugin->select_aggregation_amounts_for_kyc_check ( + db_plugin->cls, + &au_active->h_payto, + limit, + cb, + cb_cls); + if (GNUNET_DB_STATUS_HARD_ERROR == qs) { - char *url; - - url = TALER_JSON_wire_to_payto (au->wire); - au->wa = TALER_EXCHANGEDB_find_account_by_payto_uri (url); - if (NULL == au->wa) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "No exchange account configured for `%s', please fix your setup to continue!\n", - url); - GNUNET_free (url); - return GNUNET_DB_STATUS_HARD_ERROR; - } - GNUNET_free (url); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to select aggregation amounts for KYC limit check!\n"); } +} - /* make sure we have current fees */ - au->execution_time = GNUNET_TIME_absolute_get (); - (void) GNUNET_TIME_round_abs (&au->execution_time); - { - struct TALER_Amount closing_fee; - struct GNUNET_TIME_Absolute start_date; - struct GNUNET_TIME_Absolute end_date; - struct TALER_MasterSignatureP master_sig; - enum GNUNET_DB_QueryStatus qs; - qs = db_plugin->get_wire_fee (db_plugin->cls, - au->wa->method, - au->execution_time, - &start_date, - &end_date, - &au->wire_fee, - &closing_fee, - &master_sig); - if (0 >= qs) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Could not get wire fees for %s at %s. Aborting run.\n", - au->wa->method, - GNUNET_STRINGS_absolute_time_to_string (au->execution_time)); - return GNUNET_DB_STATUS_HARD_ERROR; - } - } +/** + * Test if KYC is required for a transfer to @a h_payto. + * + * @param[in,out] au_active aggregation unit to check for + * @return true if KYC checks are satisfied + */ +static bool +kyc_satisfied (struct AggregationUnit *au_active) +{ + char *requirement; + enum GNUNET_DB_QueryStatus qs; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Aggregator starts aggregation for deposit %llu to %s with wire fee %s\n", - (unsigned long long) row_id, - TALER_B2S (&au->wtid), - TALER_amount2s (&au->wire_fee)); - qs = db_plugin->insert_aggregation_tracking (db_plugin->cls, - &au->wtid, - row_id); - if (qs <= 0) + if (kyc_off) + return true; + qs = TALER_KYCLOGIC_kyc_test_required ( + TALER_KYCLOGIC_KYC_TRIGGER_DEPOSIT, + &au_active->h_payto, + db_plugin->select_satisfied_kyc_processes, + db_plugin->cls, + &return_relevant_amounts, + (void *) au_active, + &requirement); + if (qs < 0) { GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - return qs; + return false; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Aggregator marks deposit %llu as done\n", - (unsigned long long) row_id); - qs = db_plugin->mark_deposit_done (db_plugin->cls, - row_id); - if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) + if (NULL == requirement) + return true; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "KYC requirement for %s is %s\n", + TALER_amount2s (&au_active->total_amount), + requirement); + qs = db_plugin->insert_kyc_requirement_for_account ( + db_plugin->cls, + requirement, + &au_active->h_payto, + NULL, /* not a reserve */ + &au_active->requirement_row); + if (qs < 0) { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - return qs; + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to persist KYC requirement `%s' in DB!\n", + requirement); } - return qs; + else + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Legitimization process %llu started\n", + (unsigned long long) au_active->requirement_row); + } + GNUNET_free (requirement); + return false; } /** - * Function called with details about another deposit we - * can aggregate into an existing aggregation unit. + * Function called on each @a amount that was found to + * be relevant for an AML check. * - * @param cls a `struct AggregationUnit` - * @param row_id identifies database entry - * @param coin_pub public key of the coin - * @param amount_with_fee amount that was deposited including fee - * @param deposit_fee amount the exchange gets to keep as transaction fees - * @param h_contract_terms hash of the proposal data known to merchant and customer - * @return transaction status code + * @param cls closure with the `struct TALER_Amount *` where we store the sum + * @param amount encountered transaction amount + * @param date when was the amount encountered + * @return #GNUNET_OK to continue to iterate, + * #GNUNET_NO to abort iteration + * #GNUNET_SYSERR on internal error (also abort itaration) */ -static enum GNUNET_DB_QueryStatus -aggregate_cb (void *cls, - uint64_t row_id, - const struct TALER_CoinSpendPublicKeyP *coin_pub, - const struct TALER_Amount *amount_with_fee, - const struct TALER_Amount *deposit_fee, - const struct GNUNET_HashCode *h_contract_terms) +static enum GNUNET_GenericReturnValue +sum_for_aml ( + void *cls, + const struct TALER_Amount *amount, + struct GNUNET_TIME_Absolute date) { - struct AggregationUnit *au = cls; - struct TALER_Amount old; - enum GNUNET_DB_QueryStatus qs; + struct TALER_Amount *sum = cls; - if (au->rows_offset >= TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT) + (void) date; + if (0 > + TALER_amount_add (sum, + sum, + amount)) { - /* Bug: we asked for at most #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT results! */ GNUNET_break (0); - /* Skip this one, but keep going with the overall transaction */ - return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; + return GNUNET_SYSERR; } + return GNUNET_OK; +} - /* add to total */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Adding transaction amount %s from row %llu to aggregation\n", - TALER_amount2s (amount_with_fee), - (unsigned long long) row_id); - /* save the existing total aggregate in 'old', for later */ - old = au->total_amount; - /* we begin with the total contribution of the current coin */ - au->total_amount = *amount_with_fee; - /* compute contribution of this coin (after fees) */ - au->have_refund = false; - qs = db_plugin->select_refunds_by_coin (db_plugin->cls, - coin_pub, - &au->merchant_pub, - h_contract_terms, - &refund_by_coin_cb, - au); - if (0 > qs) - { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - return qs; - } - if (! au->have_refund) - { - struct TALER_Amount tmp; - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Subtracting deposit fee %s for non-refunded coin\n", - TALER_amount2s (deposit_fee)); - if (0 > - TALER_amount_subtract (&tmp, - &au->total_amount, - deposit_fee)) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Fatally malformed record at %llu over amount %s (deposit fee exceeds deposited value)\n", - (unsigned long long) row_id, - TALER_amount2s (&au->total_amount)); - GNUNET_assert (GNUNET_OK == - TALER_amount_set_zero (old.currency, - &au->total_amount)); - } - else - { - au->total_amount = tmp; - } - } +/** + * Test if AML is required for a transfer to @a h_payto. + * + * @param[in,out] au_active aggregation unit to check for + * @return true if AML checks are satisfied + */ +static bool +aml_satisfied (struct AggregationUnit *au_active) +{ + enum GNUNET_DB_QueryStatus qs; + struct TALER_Amount total; + struct TALER_Amount threshold; + enum TALER_AmlDecisionState decision; + struct TALER_EXCHANGEDB_KycStatus kyc; - /* now add the au->total_amount with the (remaining) contribution of - the current coin to the 'old' value with the current aggregate value */ + total = au_active->final_amount; + qs = db_plugin->select_aggregation_amounts_for_kyc_check ( + db_plugin->cls, + &au_active->h_payto, + GNUNET_TIME_absolute_subtract (GNUNET_TIME_absolute_get (), + GNUNET_TIME_UNIT_MONTHS), + &sum_for_aml, + &total); + if (qs < 0) { - struct TALER_Amount tmp; - - if (0 > - TALER_amount_add (&tmp, - &au->total_amount, - &old)) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Overflow or currency incompatibility during aggregation at %llu\n", - (unsigned long long) row_id); - /* Skip this one, but keep going! */ - au->total_amount = old; - return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; - } - au->total_amount = tmp; + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return false; } - - /* "append" to our list of rows */ - au->additional_rows[au->rows_offset++] = row_id; - /* insert into aggregation tracking table */ - qs = db_plugin->insert_aggregation_tracking (db_plugin->cls, - &au->wtid, - row_id); - if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) + qs = db_plugin->select_aml_threshold (db_plugin->cls, + &au_active->h_payto, + &decision, + &kyc, + &threshold); + if (qs < 0) { GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - return qs; + return false; } - qs = db_plugin->mark_deposit_done (db_plugin->cls, - row_id); - if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs) + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); - return qs; + threshold = aml_threshold; /* use default */ + decision = TALER_AML_NORMAL; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Aggregator marked deposit %llu as DONE\n", - (unsigned long long) row_id); - return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; + switch (decision) + { + case TALER_AML_NORMAL: + if (0 >= TALER_amount_cmp (&total, + &threshold)) + { + /* total <= threshold, do nothing */ + return true; + } + qs = db_plugin->trigger_aml_process (db_plugin->cls, + &au_active->h_payto, + &total); + if (qs < 0) + { + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return false; + } + return false; + case TALER_AML_PENDING: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "AML already pending, doing nothing\n"); + return false; + case TALER_AML_FROZEN: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Account frozen, doing nothing\n"); + return false; + } + GNUNET_assert (0); + return false; } /** - * Perform a database commit. If it fails, print a warning. + * Perform the main aggregation work for @a au. Expects to be in + * a working transaction, which the caller must also ultimately commit + * (or rollback) depending on our return value. * - * @return status of commit + * @param[in,out] au aggregation unit to work on + * @return #GNUNET_OK if aggregation succeeded, + * #GNUNET_NO to rollback and try again (serialization issue) + * #GNUNET_SYSERR hard error, terminate aggregator process */ -static enum GNUNET_DB_QueryStatus -commit_or_warn (void) +static enum GNUNET_GenericReturnValue +do_aggregate (struct AggregationUnit *au) { enum GNUNET_DB_QueryStatus qs; - qs = db_plugin->commit (db_plugin->cls); - if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) - return qs; - GNUNET_log ((GNUNET_DB_STATUS_SOFT_ERROR == qs) - ? GNUNET_ERROR_TYPE_INFO - : GNUNET_ERROR_TYPE_ERROR, - "Failed to commit database transaction!\n"); - return qs; -} + au->wa = TALER_EXCHANGEDB_find_account_by_payto_uri ( + au->payto_uri); + if (NULL == au->wa) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "No exchange account configured for `%s', please fix your setup to continue!\n", + au->payto_uri); + global_ret = EXIT_FAILURE; + return GNUNET_SYSERR; + } + { + struct GNUNET_TIME_Timestamp start_date; + struct GNUNET_TIME_Timestamp end_date; + struct TALER_MasterSignatureP master_sig; -/** - * Release lock on shard @a s in the database. - * On error, terminates this process. - * - * @param[in] s shard to free (and memory to release) - */ -static void -release_shard (struct Shard *s) -{ - enum GNUNET_DB_QueryStatus qs; + qs = db_plugin->get_wire_fee (db_plugin->cls, + au->wa->method, + au->execution_time, + &start_date, + &end_date, + &au->fees, + &master_sig); + if (0 >= qs) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Could not get wire fees for %s at %s. Aborting run.\n", + au->wa->method, + GNUNET_TIME_timestamp2s (au->execution_time)); + global_ret = EXIT_FAILURE; + return GNUNET_SYSERR; + } + } - qs = db_plugin->release_revolving_shard ( - db_plugin->cls, - "aggregator", - s->shard_start, - s->shard_end); - GNUNET_free (s); + /* Now try to find other deposits to aggregate */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Found ready deposit for %s, aggregating by target %s\n", + TALER_B2S (&au->merchant_pub), + au->payto_uri); + qs = db_plugin->select_aggregation_transient (db_plugin->cls, + &au->h_payto, + &au->merchant_pub, + au->wa->section_name, + &au->wtid, + &au->trans); switch (qs) { case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to lookup transient aggregates!\n"); + global_ret = EXIT_FAILURE; + return GNUNET_SYSERR; case GNUNET_DB_STATUS_SOFT_ERROR: - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR != qs); - GNUNET_break (0); - GNUNET_SCHEDULER_shutdown (); - return; + /* serializiability issue, try again */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Serialization issue, trying again later!\n"); + return GNUNET_NO; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: - /* Strange, but let's just continue */ + GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, + &au->wtid, + sizeof (au->wtid)); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "No transient aggregation found, starting %s\n", + TALER_B2S (&au->wtid)); + au->have_transient = false; break; case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: - /* normal case */ + au->have_transient = true; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Transient aggregation found, resuming %s\n", + TALER_B2S (&au->wtid)); + break; + } + qs = db_plugin->aggregate (db_plugin->cls, + &au->h_payto, + &au->merchant_pub, + &au->wtid, + &au->total_amount); + if (GNUNET_DB_STATUS_HARD_ERROR == qs) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to execute aggregation!\n"); + global_ret = EXIT_FAILURE; + return GNUNET_SYSERR; + } + if (GNUNET_DB_STATUS_SOFT_ERROR == qs) + { + /* serializiability issue, try again */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Serialization issue, trying again later!\n"); + return GNUNET_NO; + } + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Aggregation total is %s.\n", + TALER_amount2s (&au->total_amount)); + /* Subtract wire transfer fee and round to the unit supported by the + wire transfer method; Check if after rounding down, we still have + an amount to transfer, and if not mark as 'tiny'. */ + if (au->have_transient) + GNUNET_assert (0 <= + TALER_amount_add (&au->total_amount, + &au->total_amount, + &au->trans)); + + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Rounding aggregate of %s\n", + TALER_amount2s (&au->total_amount)); + if ( (0 >= + TALER_amount_subtract (&au->final_amount, + &au->total_amount, + &au->fees.wire)) || + (GNUNET_SYSERR == + TALER_amount_round_down (&au->final_amount, + ¤cy_round_unit)) || + (TALER_amount_is_zero (&au->final_amount)) || + (! kyc_satisfied (au)) || + (! aml_satisfied (au)) ) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Not ready for wire transfer (%d/%s)\n", + qs, + TALER_amount2s (&au->final_amount)); + if (au->have_transient) + qs = db_plugin->update_aggregation_transient (db_plugin->cls, + &au->h_payto, + &au->wtid, + au->requirement_row, + &au->total_amount); + else + qs = db_plugin->create_aggregation_transient (db_plugin->cls, + &au->h_payto, + au->wa->section_name, + &au->merchant_pub, + &au->wtid, + au->requirement_row, + &au->total_amount); + if (GNUNET_DB_STATUS_SOFT_ERROR == qs) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Serialization issue, trying again later!\n"); + return GNUNET_NO; + } + if (GNUNET_DB_STATUS_HARD_ERROR == qs) + { + GNUNET_break (0); + global_ret = EXIT_FAILURE; + return GNUNET_SYSERR; + } + /* commit */ + return GNUNET_OK; + } + + qs = trigger_wire_transfer (au); + switch (qs) + { + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Serialization issue during aggregation; trying again later!\n") + ; + return GNUNET_NO; + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + global_ret = EXIT_FAILURE; + return GNUNET_SYSERR; + default: break; } + { + struct TALER_CoinDepositEventP rep = { + .header.size = htons (sizeof (rep)), + .header.type = htons (TALER_DBEVENT_EXCHANGE_DEPOSIT_STATUS_CHANGED), + .merchant_pub = au->merchant_pub + }; + + db_plugin->event_notify (db_plugin->cls, + &rep.header, + NULL, + 0); + } + return GNUNET_OK; + } -/** - * Main work function that queries the DB and aggregates transactions - * into larger wire transfers. - * - * @param cls a `struct Shard *` - */ static void run_aggregation (void *cls) { struct Shard *s = cls; struct AggregationUnit au_active; enum GNUNET_DB_QueryStatus qs; + enum GNUNET_GenericReturnValue ret; task = NULL; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Checking for ready deposits to aggregate\n"); + /* make sure we have current fees */ + memset (&au_active, + 0, + sizeof (au_active)); + au_active.execution_time = GNUNET_TIME_timestamp_get (); if (GNUNET_OK != db_plugin->start_deferred_wire_out (db_plugin->cls)) { @@ -717,15 +878,12 @@ run_aggregation (void *cls) release_shard (s); return; } - memset (&au_active, - 0, - sizeof (au_active)); qs = db_plugin->get_ready_deposit ( db_plugin->cls, s->shard_start, s->shard_end, - &deposit_cb, - &au_active); + &au_active.merchant_pub, + &au_active.payto_uri); switch (qs) { case GNUNET_DB_STATUS_HARD_ERROR: @@ -748,7 +906,7 @@ run_aggregation (void *cls) { uint64_t counter = s->work_counter; struct GNUNET_TIME_Relative duration - = GNUNET_TIME_absolute_get_duration (s->start_time); + = GNUNET_TIME_absolute_get_duration (s->start_time.abs_time); cleanup_au (&au_active); db_plugin->rollback (db_plugin->cls); @@ -756,216 +914,71 @@ run_aggregation (void *cls) "Completed shard [%u,%u] after %s with %llu deposits\n", (unsigned int) s->shard_start, (unsigned int) s->shard_end, - GNUNET_STRINGS_relative_time_to_string (duration, - GNUNET_YES), + GNUNET_TIME_relative2s (duration, + true), (unsigned long long) counter); release_shard (s); if ( (GNUNET_YES == test_mode) && (0 == counter) ) { /* in test mode, shutdown after a shard is done with 0 work */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "No work done and in test mode, shutting down\n"); GNUNET_SCHEDULER_shutdown (); return; } GNUNET_assert (NULL == task); /* If we ended up doing zero work, sleep a bit */ if (0 == counter) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Going to sleep for %s before trying again\n", + GNUNET_TIME_relative2s (aggregator_idle_sleep_interval, + true)); task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval, - &run_shard, + &drain_kyc_alerts, NULL); + } else - task = GNUNET_SCHEDULER_add_now (&run_shard, + { + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, NULL); + } return; } case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: s->work_counter++; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Found ready deposit!\n"); /* continued below */ break; } - /* Now try to find other deposits to aggregate */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Found ready deposit for %s, aggregating\n", - TALER_B2S (&au_active.merchant_pub)); - qs = db_plugin->iterate_matching_deposits (db_plugin->cls, - &au_active.h_wire, - &au_active.merchant_pub, - &aggregate_cb, - &au_active, - TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT); - if (GNUNET_DB_STATUS_HARD_ERROR == qs) + TALER_payto_hash (au_active.payto_uri, + &au_active.h_payto); + ret = do_aggregate (&au_active); + cleanup_au (&au_active); + switch (ret) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to execute deposit iteration!\n"); - cleanup_au (&au_active); - db_plugin->rollback (db_plugin->cls); + case GNUNET_SYSERR: global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); + db_plugin->rollback (db_plugin->cls); release_shard (s); return; - } - if (GNUNET_DB_STATUS_SOFT_ERROR == qs) - { - /* serializiability issue, try again */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Serialization issue, trying again later!\n"); + case GNUNET_NO: db_plugin->rollback (db_plugin->cls); - cleanup_au (&au_active); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, s); return; + case GNUNET_OK: + /* continued below */ + break; } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Found %d other deposits to combine into wire transfer.\n", - qs); - - /* Subtract wire transfer fee and round to the unit supported by the - wire transfer method; Check if after rounding down, we still have - an amount to transfer, and if not mark as 'tiny'. */ - if ( (0 >= - TALER_amount_subtract (&au_active.final_amount, - &au_active.total_amount, - &au_active.wire_fee)) || - (GNUNET_SYSERR == - TALER_amount_round_down (&au_active.final_amount, - ¤cy_round_unit)) || - ( (0 == au_active.final_amount.value) && - (0 == au_active.final_amount.fraction) ) ) - { - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Aggregate value too low for transfer (%d/%s)\n", - qs, - TALER_amount2s (&au_active.final_amount)); - /* Rollback ongoing transaction, as we will not use the respective - WTID and thus need to remove the tracking data */ - db_plugin->rollback (db_plugin->cls); - - /* There were results, just the value was too low. Start another - transaction to mark all* of the selected deposits as minor! */ - if (GNUNET_OK != - db_plugin->start (db_plugin->cls, - "aggregator mark tiny transactions")) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to start database transaction!\n"); - global_ret = EXIT_FAILURE; - cleanup_au (&au_active); - GNUNET_SCHEDULER_shutdown (); - release_shard (s); - return; - } - /* Mark transactions by row_id as minor */ - qs = db_plugin->mark_deposit_tiny (db_plugin->cls, - au_active.row_id); - if (0 <= qs) - { - for (unsigned int i = 0; i<au_active.rows_offset; i++) - { - qs = db_plugin->mark_deposit_tiny (db_plugin->cls, - au_active.additional_rows[i]); - if (0 > qs) - break; - } - } - if (GNUNET_DB_STATUS_SOFT_ERROR == qs) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Serialization issue, trying again later!\n"); - db_plugin->rollback (db_plugin->cls); - cleanup_au (&au_active); - /* start again */ - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_aggregation, - s); - return; - } - if (GNUNET_DB_STATUS_HARD_ERROR == qs) - { - db_plugin->rollback (db_plugin->cls); - cleanup_au (&au_active); - global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - release_shard (s); - return; - } - /* commit */ - (void) commit_or_warn (); - cleanup_au (&au_active); - /* start again */ - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_aggregation, - s); - return; - } GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Preparing wire transfer of %s to %s\n", - TALER_amount2s (&au_active.final_amount), - TALER_B2S (&au_active.merchant_pub)); - { - void *buf; - size_t buf_size; - - { - char *url; - - url = TALER_JSON_wire_to_payto (au_active.wire); - TALER_BANK_prepare_transfer (url, - &au_active.final_amount, - exchange_base_url, - &au_active.wtid, - &buf, - &buf_size); - GNUNET_free (url); - } - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Storing %u bytes of wire prepare data\n", - (unsigned int) buf_size); - /* Commit our intention to execute the wire transfer! */ - qs = db_plugin->wire_prepare_data_insert (db_plugin->cls, - au_active.wa->method, - buf, - buf_size); - GNUNET_free (buf); - } - /* Commit the WTID data to 'wire_out' to finally satisfy aggregation - table constraints */ - if (qs >= 0) - qs = db_plugin->store_wire_transfer_out (db_plugin->cls, - au_active.execution_time, - &au_active.wtid, - au_active.wire, - au_active.wa->section_name, - &au_active.final_amount); - cleanup_au (&au_active); - - if (GNUNET_DB_STATUS_SOFT_ERROR == qs) - { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Serialization issue for prepared wire data; trying again later!\n"); - db_plugin->rollback (db_plugin->cls); - /* start again */ - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_aggregation, - s); - return; - } - if (GNUNET_DB_STATUS_HARD_ERROR == qs) - { - GNUNET_break (0); - db_plugin->rollback (db_plugin->cls); - /* die hard */ - global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - release_shard (s); - return; - } - - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Stored wire transfer out instructions\n"); + "Committing aggregation result\n"); /* Now we can finally commit the overall transaction, as we are again consistent if all of this passes. */ @@ -973,8 +986,8 @@ run_aggregation (void *cls) { case GNUNET_DB_STATUS_SOFT_ERROR: /* try again */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Commit issue for prepared wire data; trying again later!\n"); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Serialization issue on commit; trying again later!\n"); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, s); @@ -983,11 +996,12 @@ run_aggregation (void *cls) GNUNET_break (0); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); + db_plugin->rollback (db_plugin->cls); /* just in case */ release_shard (s); return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Preparation complete, going again\n"); + "Commit complete, going again\n"); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, s); @@ -996,6 +1010,7 @@ run_aggregation (void *cls) GNUNET_break (0); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); + db_plugin->rollback (db_plugin->cls); /* just in case */ release_shard (s); return; } @@ -1015,6 +1030,8 @@ run_shard (void *cls) (void) cls; task = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Running aggregation shard\n"); if (GNUNET_SYSERR == db_plugin->preflight (db_plugin->cls)) { @@ -1025,7 +1042,7 @@ run_shard (void *cls) return; } s = GNUNET_new (struct Shard); - s->start_time = GNUNET_TIME_absolute_get (); + s->start_time = GNUNET_TIME_timestamp_get (); qs = db_plugin->begin_revolving_shard (db_plugin->cls, "aggregator", shard_size, @@ -1041,6 +1058,7 @@ run_shard (void *cls) GNUNET_free (s); delay = GNUNET_TIME_randomized_backoff (delay, GNUNET_TIME_UNIT_SECONDS); + GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_delayed (delay, &run_shard, NULL); @@ -1058,12 +1076,207 @@ run_shard (void *cls) "Starting shard [%u:%u]!\n", (unsigned int) s->shard_start, (unsigned int) s->shard_end); + GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, s); } /** + * Function called on transient aggregations matching + * a particular hash of a payto URI. + * + * @param cls + * @param payto_uri corresponding payto URI + * @param wtid wire transfer identifier of transient aggregation + * @param merchant_pub public key of the merchant + * @param total amount aggregated so far + * @return true to continue to iterate + */ +static bool +handle_transient_cb ( + void *cls, + const char *payto_uri, + const struct TALER_WireTransferIdentifierRawP *wtid, + const struct TALER_MerchantPublicKeyP *merchant_pub, + const struct TALER_Amount *total) +{ + struct AggregationUnit *au = cls; + + if (GNUNET_OK != au->ret) + { + GNUNET_break (0); + return false; + } + au->payto_uri = GNUNET_strdup (payto_uri); + au->wtid = *wtid; + au->merchant_pub = *merchant_pub; + au->trans = *total; + au->have_transient = true; + au->ret = do_aggregate (au); + GNUNET_free (au->payto_uri); + return (GNUNET_OK == au->ret); +} + + +static void +drain_kyc_alerts (void *cls) +{ + enum GNUNET_DB_QueryStatus qs; + struct AggregationUnit au; + + (void) cls; + task = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Draining KYC alerts\n"); + memset (&au, + 0, + sizeof (au)); + au.execution_time = GNUNET_TIME_timestamp_get (); + if (GNUNET_SYSERR == + db_plugin->preflight (db_plugin->cls)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to obtain database connection!\n"); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + } + if (GNUNET_OK != + db_plugin->start (db_plugin->cls, + "handle kyc alerts")) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to start database transaction!\n"); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + } + while (1) + { + qs = db_plugin->drain_kyc_alert (db_plugin->cls, + 1, + &au.h_payto); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + db_plugin->rollback (db_plugin->cls); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, + NULL); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + db_plugin->rollback (db_plugin->cls); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, + NULL); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + qs = db_plugin->commit (db_plugin->cls); + if (qs < 0) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to commit KYC drain\n"); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&run_shard, + NULL); + return; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* handled below */ + break; + } + + au.ret = GNUNET_OK; + qs = db_plugin->find_aggregation_transient (db_plugin->cls, + &au.h_payto, + &handle_transient_cb, + &au); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to lookup transient aggregates!\n"); + db_plugin->rollback (db_plugin->cls); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, + NULL); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + /* serializiability issue, try again */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Serialization issue, trying again later!\n"); + db_plugin->rollback (db_plugin->cls); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, + NULL); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + continue; /* while (1) */ + default: + break; + } + break; + } /* while(1) */ + + { + enum GNUNET_GenericReturnValue ret; + + ret = au.ret; + cleanup_au (&au); + switch (ret) + { + case GNUNET_SYSERR: + GNUNET_break (0); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + db_plugin->rollback (db_plugin->cls); /* just in case */ + return; + case GNUNET_NO: + db_plugin->rollback (db_plugin->cls); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, + NULL); + return; + case GNUNET_OK: + /* continued below */ + break; + } + } + + switch (commit_or_warn ()) + { + case GNUNET_DB_STATUS_SOFT_ERROR: + /* try again */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Serialization issue on commit; trying again later!\n"); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, + NULL); + return; + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + db_plugin->rollback (db_plugin->cls); /* just in case */ + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Commit complete, going again\n"); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, + NULL); + return; + default: + GNUNET_break (0); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + db_plugin->rollback (db_plugin->cls); /* just in case */ + return; + } +} + + +/** * First task. * * @param cls closure, NULL @@ -1083,7 +1296,8 @@ run (void *cls, (void) cfgfile; cfg = c; - if (GNUNET_OK != parse_wirewatch_config ()) + if (GNUNET_OK != + parse_aggregator_config ()) { cfg = NULL; global_ret = EXIT_NOTCONFIGURED; @@ -1104,11 +1318,18 @@ run (void *cls, shard_size = 1U + INT32_MAX; else shard_size = (uint32_t) ass; + if (GNUNET_OK != + TALER_KYCLOGIC_kyc_init (cfg)) + { + cfg = NULL; + global_ret = EXIT_NOTCONFIGURED; + return; + } + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, + NULL); GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_shard, + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, NULL); - GNUNET_SCHEDULER_add_shutdown (&shutdown_task, - cls); } @@ -1130,6 +1351,10 @@ main (int argc, "test", "run in test mode and exit when idle", &test_mode), + GNUNET_GETOPT_option_flag ('y', + "kyc-off", + "perform wire transfers without KYC checks", + &kyc_off), GNUNET_GETOPT_OPTION_END }; enum GNUNET_GenericReturnValue ret; |