diff options
Diffstat (limited to 'src/exchange/taler-exchange-aggregator.c')
-rw-r--r-- | src/exchange/taler-exchange-aggregator.c | 967 |
1 files changed, 730 insertions, 237 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index 9568f011e..691d65ae3 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2016-2021 Taler Systems SA + Copyright (C) 2016-2022 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -26,7 +26,9 @@ #include "taler_exchangedb_lib.h" #include "taler_exchangedb_plugin.h" #include "taler_json_lib.h" +#include "taler_kyclogic_lib.h" #include "taler_bank_service.h" +#include "taler_dbevents.h" /** @@ -43,6 +45,12 @@ struct AggregationUnit struct TALER_MerchantPublicKeyP merchant_pub; /** + * Transient amount already found aggregated, + * set only if @e have_transient is true. + */ + struct TALER_Amount trans; + + /** * Total amount to be transferred, before subtraction of @e fees.wire and rounding down. */ struct TALER_Amount total_amount; @@ -84,6 +92,25 @@ struct AggregationUnit */ const struct TALER_EXCHANGEDB_AccountInfo *wa; + /** + * Row in KYC table for legitimization requirements + * that are pending for this aggregation, or 0 if none. + */ + uint64_t requirement_row; + + /** + * Set to #GNUNET_OK during transient checking + * while everything is OK. Otherwise see return + * value of #do_aggregate(). + */ + enum GNUNET_GenericReturnValue ret; + + /** + * Do we have an entry in the transient table for + * this aggregation? + */ + bool have_transient; + }; @@ -123,6 +150,12 @@ struct Shard static struct TALER_Amount currency_round_unit; /** + * What is the largest amount we transfer before triggering + * an AML check? + */ +static struct TALER_Amount aml_threshold; + +/** * What is the base URL of this exchange? Used in the * wire transfer subjects so that merchants and governments * can ask for the list of aggregated deposits. @@ -151,7 +184,6 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin; */ static struct GNUNET_SCHEDULER_Task *task; - /** * How long should we sleep when idle before trying to find more work? */ @@ -186,12 +218,12 @@ run_aggregation (void *cls); /** - * Select a shard to work on. + * Work on transactions unlocked by KYC. * * @param cls NULL */ static void -run_shard (void *cls); +drain_kyc_alerts (void *cls); /** @@ -226,6 +258,7 @@ shutdown_task (void *cls) GNUNET_SCHEDULER_cancel (task); task = NULL; } + TALER_KYCLOGIC_kyc_done (); TALER_EXCHANGEDB_plugin_unload (db_plugin); db_plugin = NULL; TALER_EXCHANGEDB_unload_accounts (); @@ -234,12 +267,12 @@ shutdown_task (void *cls) /** - * Parse the configuration for wirewatch. + * Parse the configuration for aggregator. * * @return #GNUNET_OK on success */ static enum GNUNET_GenericReturnValue -parse_wirewatch_config (void) +parse_aggregator_config (void) { if (GNUNET_OK != GNUNET_CONFIGURATION_get_value_string (cfg, @@ -268,11 +301,20 @@ parse_wirewatch_config (void) "taler", "CURRENCY_ROUND_UNIT", ¤cy_round_unit)) || - ( (0 != currency_round_unit.fraction) && - (0 != currency_round_unit.value) ) ) + (TALER_amount_is_zero (¤cy_round_unit)) ) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Need non-zero amount in section `taler' under `CURRENCY_ROUND_UNIT'\n"); + return GNUNET_SYSERR; + } + if (GNUNET_OK != + TALER_config_get_amount (cfg, + "exchange", + "AML_THRESHOLD", + &aml_threshold)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Need non-zero value in section `TALER' under `CURRENCY_ROUND_UNIT'\n"); + "Need amount in section `exchange' under `AML_THRESHOLD'\n"); return GNUNET_SYSERR; } @@ -341,6 +383,7 @@ release_shard (struct Shard *s) case GNUNET_DB_STATUS_SOFT_ERROR: GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR != qs); GNUNET_break (0); + global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: @@ -353,109 +396,286 @@ release_shard (struct Shard *s) } +/** + * Trigger the wire transfer for the @a au_active + * and delete the record of the aggregation. + * + * @param au_active information about the aggregation + */ +static enum GNUNET_DB_QueryStatus +trigger_wire_transfer (const struct AggregationUnit *au_active) +{ + enum GNUNET_DB_QueryStatus qs; + + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Preparing wire transfer of %s to %s\n", + TALER_amount2s (&au_active->final_amount), + TALER_B2S (&au_active->merchant_pub)); + { + void *buf; + size_t buf_size; + + TALER_BANK_prepare_transfer (au_active->payto_uri, + &au_active->final_amount, + exchange_base_url, + &au_active->wtid, + &buf, + &buf_size); + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Storing %u bytes of wire prepare data\n", + (unsigned int) buf_size); + /* Commit our intention to execute the wire transfer! */ + qs = db_plugin->wire_prepare_data_insert (db_plugin->cls, + au_active->wa->method, + buf, + buf_size); + GNUNET_free (buf); + } + /* Commit the WTID data to 'wire_out' */ + if (qs >= 0) + qs = db_plugin->store_wire_transfer_out (db_plugin->cls, + au_active->execution_time, + &au_active->wtid, + &au_active->h_payto, + au_active->wa->section_name, + &au_active->final_amount); + + if ( (qs >= 0) && + au_active->have_transient) + qs = db_plugin->delete_aggregation_transient (db_plugin->cls, + &au_active->h_payto, + &au_active->wtid); + return qs; +} + + +/** + * Callback to return all applicable amounts for the KYC + * decision to @ a cb. + * + * @param cls a `struct AggregationUnit *` + * @param limit time limit for the iteration + * @param cb function to call with the amounts + * @param cb_cls closure for @a cb + */ static void -run_aggregation (void *cls) +return_relevant_amounts (void *cls, + struct GNUNET_TIME_Absolute limit, + TALER_EXCHANGEDB_KycAmountCallback cb, + void *cb_cls) { - struct Shard *s = cls; - struct AggregationUnit au_active; + const struct AggregationUnit *au_active = cls; enum GNUNET_DB_QueryStatus qs; - struct TALER_Amount trans; - bool have_transient = true; /* squash compiler warning */ - task = NULL; GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Checking for ready deposits to aggregate\n"); - /* make sure we have current fees */ - memset (&au_active, - 0, - sizeof (au_active)); - au_active.execution_time = GNUNET_TIME_timestamp_get (); + "Returning amount %s in KYC check\n", + TALER_amount2s (&au_active->total_amount)); if (GNUNET_OK != - db_plugin->start_deferred_wire_out (db_plugin->cls)) + cb (cb_cls, + &au_active->total_amount, + GNUNET_TIME_absolute_get ())) + return; + qs = db_plugin->select_aggregation_amounts_for_kyc_check ( + db_plugin->cls, + &au_active->h_payto, + limit, + cb, + cb_cls); + if (GNUNET_DB_STATUS_HARD_ERROR == qs) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to start database transaction!\n"); - global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - release_shard (s); - return; + "Failed to select aggregation amounts for KYC limit check!\n"); } - qs = db_plugin->get_ready_deposit ( +} + + +/** + * Test if KYC is required for a transfer to @a h_payto. + * + * @param[in,out] au_active aggregation unit to check for + * @return true if KYC checks are satisfied + */ +static bool +kyc_satisfied (struct AggregationUnit *au_active) +{ + char *requirement; + enum GNUNET_DB_QueryStatus qs; + + if (kyc_off) + return true; + qs = TALER_KYCLOGIC_kyc_test_required ( + TALER_KYCLOGIC_KYC_TRIGGER_DEPOSIT, + &au_active->h_payto, + db_plugin->select_satisfied_kyc_processes, db_plugin->cls, - s->shard_start, - s->shard_end, - kyc_off ? true : false, - &au_active.merchant_pub, - &au_active.payto_uri); - switch (qs) + &return_relevant_amounts, + (void *) au_active, + &requirement); + if (qs < 0) + { + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return false; + } + if (NULL == requirement) + return true; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "KYC requirement for %s is %s\n", + TALER_amount2s (&au_active->total_amount), + requirement); + qs = db_plugin->insert_kyc_requirement_for_account ( + db_plugin->cls, + requirement, + &au_active->h_payto, + NULL, /* not a reserve */ + &au_active->requirement_row); + if (qs < 0) { - case GNUNET_DB_STATUS_HARD_ERROR: - cleanup_au (&au_active); - db_plugin->rollback (db_plugin->cls); GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Failed to begin deposit iteration!\n"); - global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - release_shard (s); - return; - case GNUNET_DB_STATUS_SOFT_ERROR: - cleanup_au (&au_active); - db_plugin->rollback (db_plugin->cls); - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_aggregation, - s); - return; - case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: - { - uint64_t counter = s->work_counter; - struct GNUNET_TIME_Relative duration - = GNUNET_TIME_absolute_get_duration (s->start_time.abs_time); + "Failed to persist KYC requirement `%s' in DB!\n", + requirement); + } + else + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Legitimization process %llu started\n", + (unsigned long long) au_active->requirement_row); + } + GNUNET_free (requirement); + return false; +} - cleanup_au (&au_active); - db_plugin->rollback (db_plugin->cls); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Completed shard [%u,%u] after %s with %llu deposits\n", - (unsigned int) s->shard_start, - (unsigned int) s->shard_end, - GNUNET_TIME_relative2s (duration, - true), - (unsigned long long) counter); - release_shard (s); - if ( (GNUNET_YES == test_mode) && - (0 == counter) ) - { - /* in test mode, shutdown after a shard is done with 0 work */ - GNUNET_SCHEDULER_shutdown (); - return; - } - GNUNET_assert (NULL == task); - /* If we ended up doing zero work, sleep a bit */ - if (0 == counter) - task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval, - &run_shard, - NULL); - else - task = GNUNET_SCHEDULER_add_now (&run_shard, - NULL); - return; + +/** + * Function called on each @a amount that was found to + * be relevant for an AML check. + * + * @param cls closure with the `struct TALER_Amount *` where we store the sum + * @param amount encountered transaction amount + * @param date when was the amount encountered + * @return #GNUNET_OK to continue to iterate, + * #GNUNET_NO to abort iteration + * #GNUNET_SYSERR on internal error (also abort itaration) + */ +static enum GNUNET_GenericReturnValue +sum_for_aml ( + void *cls, + const struct TALER_Amount *amount, + struct GNUNET_TIME_Absolute date) +{ + struct TALER_Amount *sum = cls; + + (void) date; + if (0 > + TALER_amount_add (sum, + sum, + amount)) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Test if AML is required for a transfer to @a h_payto. + * + * @param[in,out] au_active aggregation unit to check for + * @return true if AML checks are satisfied + */ +static bool +aml_satisfied (struct AggregationUnit *au_active) +{ + enum GNUNET_DB_QueryStatus qs; + struct TALER_Amount total; + struct TALER_Amount threshold; + enum TALER_AmlDecisionState decision; + struct TALER_EXCHANGEDB_KycStatus kyc; + + total = au_active->final_amount; + qs = db_plugin->select_aggregation_amounts_for_kyc_check ( + db_plugin->cls, + &au_active->h_payto, + GNUNET_TIME_absolute_subtract (GNUNET_TIME_absolute_get (), + GNUNET_TIME_UNIT_MONTHS), + &sum_for_aml, + &total); + if (qs < 0) + { + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return false; + } + qs = db_plugin->select_aml_threshold (db_plugin->cls, + &au_active->h_payto, + &decision, + &kyc, + &threshold); + if (qs < 0) + { + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return false; + } + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) + { + threshold = aml_threshold; /* use default */ + decision = TALER_AML_NORMAL; + } + switch (decision) + { + case TALER_AML_NORMAL: + if (0 >= TALER_amount_cmp (&total, + &threshold)) + { + /* total <= threshold, do nothing */ + return true; } - case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: - s->work_counter++; - /* continued below */ - break; + qs = db_plugin->trigger_aml_process (db_plugin->cls, + &au_active->h_payto, + &total); + if (qs < 0) + { + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return false; + } + return false; + case TALER_AML_PENDING: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "AML already pending, doing nothing\n"); + return false; + case TALER_AML_FROZEN: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Account frozen, doing nothing\n"); + return false; } - au_active.wa = TALER_EXCHANGEDB_find_account_by_payto_uri ( - au_active.payto_uri); - if (NULL == au_active.wa) + GNUNET_assert (0); + return false; +} + + +/** + * Perform the main aggregation work for @a au. Expects to be in + * a working transaction, which the caller must also ultimately commit + * (or rollback) depending on our return value. + * + * @param[in,out] au aggregation unit to work on + * @return #GNUNET_OK if aggregation succeeded, + * #GNUNET_NO to rollback and try again (serialization issue) + * #GNUNET_SYSERR hard error, terminate aggregator process + */ +static enum GNUNET_GenericReturnValue +do_aggregate (struct AggregationUnit *au) +{ + enum GNUNET_DB_QueryStatus qs; + + au->wa = TALER_EXCHANGEDB_find_account_by_payto_uri ( + au->payto_uri); + if (NULL == au->wa) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "No exchange account configured for `%s', please fix your setup to continue!\n", - au_active.payto_uri); + au->payto_uri); global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - db_plugin->rollback (db_plugin->cls); - release_shard (s); - return; + return GNUNET_SYSERR; } { @@ -464,234 +684,301 @@ run_aggregation (void *cls) struct TALER_MasterSignatureP master_sig; qs = db_plugin->get_wire_fee (db_plugin->cls, - au_active.wa->method, - au_active.execution_time, + au->wa->method, + au->execution_time, &start_date, &end_date, - &au_active.fees, + &au->fees, &master_sig); if (0 >= qs) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Could not get wire fees for %s at %s. Aborting run.\n", - au_active.wa->method, - GNUNET_TIME_timestamp2s (au_active.execution_time)); + au->wa->method, + GNUNET_TIME_timestamp2s (au->execution_time)); global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - db_plugin->rollback (db_plugin->cls); - release_shard (s); - return; + return GNUNET_SYSERR; } } - /* Now try to find other deposits to aggregate */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Found ready deposit for %s, aggregating by target %s\n", - TALER_B2S (&au_active.merchant_pub), - au_active.payto_uri); - TALER_payto_hash (au_active.payto_uri, - &au_active.h_payto); - + TALER_B2S (&au->merchant_pub), + au->payto_uri); qs = db_plugin->select_aggregation_transient (db_plugin->cls, - &au_active.h_payto, - au_active.wa->section_name, - &au_active.wtid, - &trans); + &au->h_payto, + &au->merchant_pub, + au->wa->section_name, + &au->wtid, + &au->trans); switch (qs) { case GNUNET_DB_STATUS_HARD_ERROR: GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to lookup transient aggregates!\n"); - cleanup_au (&au_active); - db_plugin->rollback (db_plugin->cls); global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - release_shard (s); - return; + return GNUNET_SYSERR; case GNUNET_DB_STATUS_SOFT_ERROR: /* serializiability issue, try again */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Serialization issue, trying again later!\n"); - db_plugin->rollback (db_plugin->cls); - cleanup_au (&au_active); - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_aggregation, - s); - return; + return GNUNET_NO; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, - &au_active.wtid, - sizeof (au_active.wtid)); - have_transient = false; + &au->wtid, + sizeof (au->wtid)); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "No transient aggregation found, starting %s\n", + TALER_B2S (&au->wtid)); + au->have_transient = false; break; case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: - have_transient = true; + au->have_transient = true; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Transient aggregation found, resuming %s\n", + TALER_B2S (&au->wtid)); break; } qs = db_plugin->aggregate (db_plugin->cls, - &au_active.h_payto, - &au_active.merchant_pub, - &au_active.wtid, - &au_active.total_amount); + &au->h_payto, + &au->merchant_pub, + &au->wtid, + &au->total_amount); if (GNUNET_DB_STATUS_HARD_ERROR == qs) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Failed to execute aggregation!\n"); - cleanup_au (&au_active); - db_plugin->rollback (db_plugin->cls); global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - release_shard (s); - return; + return GNUNET_SYSERR; } if (GNUNET_DB_STATUS_SOFT_ERROR == qs) { /* serializiability issue, try again */ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Serialization issue, trying again later!\n"); - db_plugin->rollback (db_plugin->cls); - cleanup_au (&au_active); - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_aggregation, - s); - return; + return GNUNET_NO; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Aggregation total is %s.\n", - TALER_amount2s (&au_active.total_amount)); - + TALER_amount2s (&au->total_amount)); /* Subtract wire transfer fee and round to the unit supported by the wire transfer method; Check if after rounding down, we still have an amount to transfer, and if not mark as 'tiny'. */ - if (have_transient) + if (au->have_transient) GNUNET_assert (0 <= - TALER_amount_add (&au_active.total_amount, - &au_active.total_amount, - &trans)); + TALER_amount_add (&au->total_amount, + &au->total_amount, + &au->trans)); + + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Rounding aggregate of %s\n", - TALER_amount2s (&au_active.total_amount)); + TALER_amount2s (&au->total_amount)); if ( (0 >= - TALER_amount_subtract (&au_active.final_amount, - &au_active.total_amount, - &au_active.fees.wire)) || + TALER_amount_subtract (&au->final_amount, + &au->total_amount, + &au->fees.wire)) || (GNUNET_SYSERR == - TALER_amount_round_down (&au_active.final_amount, + TALER_amount_round_down (&au->final_amount, ¤cy_round_unit)) || - (TALER_amount_is_zero (&au_active.final_amount)) ) + (TALER_amount_is_zero (&au->final_amount)) || + (! kyc_satisfied (au)) || + (! aml_satisfied (au)) ) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Aggregate value too low for transfer (%d/%s)\n", + "Not ready for wire transfer (%d/%s)\n", qs, - TALER_amount2s (&au_active.final_amount)); - if (have_transient) + TALER_amount2s (&au->final_amount)); + if (au->have_transient) qs = db_plugin->update_aggregation_transient (db_plugin->cls, - &au_active.h_payto, - &au_active.wtid, - &au_active.total_amount); + &au->h_payto, + &au->wtid, + au->requirement_row, + &au->total_amount); else qs = db_plugin->create_aggregation_transient (db_plugin->cls, - &au_active.h_payto, - au_active.wa->section_name, - &au_active.wtid, - &au_active.total_amount); + &au->h_payto, + au->wa->section_name, + &au->merchant_pub, + &au->wtid, + au->requirement_row, + &au->total_amount); if (GNUNET_DB_STATUS_SOFT_ERROR == qs) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Serialization issue, trying again later!\n"); - db_plugin->rollback (db_plugin->cls); - cleanup_au (&au_active); - /* start again */ - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_aggregation, - s); - return; + return GNUNET_NO; } if (GNUNET_DB_STATUS_HARD_ERROR == qs) { GNUNET_break (0); - db_plugin->rollback (db_plugin->cls); - cleanup_au (&au_active); global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - release_shard (s); - return; + return GNUNET_SYSERR; } /* commit */ - (void) commit_or_warn (); - cleanup_au (&au_active); + return GNUNET_OK; + } - /* start again */ - GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_aggregation, - s); - return; + qs = trigger_wire_transfer (au); + switch (qs) + { + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Serialization issue during aggregation; trying again later!\n") + ; + return GNUNET_NO; + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + global_ret = EXIT_FAILURE; + return GNUNET_SYSERR; + default: + break; } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Preparing wire transfer of %s to %s\n", - TALER_amount2s (&au_active.final_amount), - TALER_B2S (&au_active.merchant_pub)); { - void *buf; - size_t buf_size; - - TALER_BANK_prepare_transfer (au_active.payto_uri, - &au_active.final_amount, - exchange_base_url, - &au_active.wtid, - &buf, - &buf_size); - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Storing %u bytes of wire prepare data\n", - (unsigned int) buf_size); - /* Commit our intention to execute the wire transfer! */ - qs = db_plugin->wire_prepare_data_insert (db_plugin->cls, - au_active.wa->method, - buf, - buf_size); - GNUNET_free (buf); + struct TALER_CoinDepositEventP rep = { + .header.size = htons (sizeof (rep)), + .header.type = htons (TALER_DBEVENT_EXCHANGE_DEPOSIT_STATUS_CHANGED), + .merchant_pub = au->merchant_pub + }; + + db_plugin->event_notify (db_plugin->cls, + &rep.header, + NULL, + 0); } - /* Commit the WTID data to 'wire_out' */ - if (qs >= 0) - qs = db_plugin->store_wire_transfer_out (db_plugin->cls, - au_active.execution_time, - &au_active.wtid, - &au_active.h_payto, - au_active.wa->section_name, - &au_active.final_amount); + return GNUNET_OK; - if ( (qs >= 0) && - have_transient) - qs = db_plugin->delete_aggregation_transient (db_plugin->cls, - &au_active.h_payto, - &au_active.wtid); - cleanup_au (&au_active); +} - if (GNUNET_DB_STATUS_SOFT_ERROR == qs) + +static void +run_aggregation (void *cls) +{ + struct Shard *s = cls; + struct AggregationUnit au_active; + enum GNUNET_DB_QueryStatus qs; + enum GNUNET_GenericReturnValue ret; + + task = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Checking for ready deposits to aggregate\n"); + /* make sure we have current fees */ + memset (&au_active, + 0, + sizeof (au_active)); + au_active.execution_time = GNUNET_TIME_timestamp_get (); + if (GNUNET_OK != + db_plugin->start_deferred_wire_out (db_plugin->cls)) { - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Serialization issue for prepared wire data; trying again later!\n"); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to start database transaction!\n"); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + release_shard (s); + return; + } + qs = db_plugin->get_ready_deposit ( + db_plugin->cls, + s->shard_start, + s->shard_end, + &au_active.merchant_pub, + &au_active.payto_uri); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + cleanup_au (&au_active); + db_plugin->rollback (db_plugin->cls); + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to begin deposit iteration!\n"); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + release_shard (s); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + cleanup_au (&au_active); db_plugin->rollback (db_plugin->cls); - /* start again */ GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, s); return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + { + uint64_t counter = s->work_counter; + struct GNUNET_TIME_Relative duration + = GNUNET_TIME_absolute_get_duration (s->start_time.abs_time); + + cleanup_au (&au_active); + db_plugin->rollback (db_plugin->cls); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Completed shard [%u,%u] after %s with %llu deposits\n", + (unsigned int) s->shard_start, + (unsigned int) s->shard_end, + GNUNET_TIME_relative2s (duration, + true), + (unsigned long long) counter); + release_shard (s); + if ( (GNUNET_YES == test_mode) && + (0 == counter) ) + { + /* in test mode, shutdown after a shard is done with 0 work */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "No work done and in test mode, shutting down\n"); + GNUNET_SCHEDULER_shutdown (); + return; + } + GNUNET_assert (NULL == task); + /* If we ended up doing zero work, sleep a bit */ + if (0 == counter) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Going to sleep for %s before trying again\n", + GNUNET_TIME_relative2s (aggregator_idle_sleep_interval, + true)); + task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval, + &drain_kyc_alerts, + NULL); + } + else + { + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, + NULL); + } + return; + } + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + s->work_counter++; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Found ready deposit!\n"); + /* continued below */ + break; } - if (GNUNET_DB_STATUS_HARD_ERROR == qs) + + TALER_payto_hash (au_active.payto_uri, + &au_active.h_payto); + ret = do_aggregate (&au_active); + cleanup_au (&au_active); + switch (ret) { - GNUNET_break (0); - db_plugin->rollback (db_plugin->cls); - /* die hard */ + case GNUNET_SYSERR: global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); + db_plugin->rollback (db_plugin->cls); release_shard (s); return; + case GNUNET_NO: + db_plugin->rollback (db_plugin->cls); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&run_aggregation, + s); + return; + case GNUNET_OK: + /* continued below */ + break; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Stored wire transfer out instructions\n"); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Committing aggregation result\n"); /* Now we can finally commit the overall transaction, as we are again consistent if all of this passes. */ @@ -699,8 +986,8 @@ run_aggregation (void *cls) { case GNUNET_DB_STATUS_SOFT_ERROR: /* try again */ - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, - "Commit issue for prepared wire data; trying again later!\n"); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Serialization issue on commit; trying again later!\n"); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, s); @@ -714,7 +1001,7 @@ run_aggregation (void *cls) return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Preparation complete, going again\n"); + "Commit complete, going again\n"); GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, s); @@ -743,6 +1030,8 @@ run_shard (void *cls) (void) cls; task = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Running aggregation shard\n"); if (GNUNET_SYSERR == db_plugin->preflight (db_plugin->cls)) { @@ -769,6 +1058,7 @@ run_shard (void *cls) GNUNET_free (s); delay = GNUNET_TIME_randomized_backoff (delay, GNUNET_TIME_UNIT_SECONDS); + GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_delayed (delay, &run_shard, NULL); @@ -786,12 +1076,207 @@ run_shard (void *cls) "Starting shard [%u:%u]!\n", (unsigned int) s->shard_start, (unsigned int) s->shard_end); + GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, s); } /** + * Function called on transient aggregations matching + * a particular hash of a payto URI. + * + * @param cls + * @param payto_uri corresponding payto URI + * @param wtid wire transfer identifier of transient aggregation + * @param merchant_pub public key of the merchant + * @param total amount aggregated so far + * @return true to continue to iterate + */ +static bool +handle_transient_cb ( + void *cls, + const char *payto_uri, + const struct TALER_WireTransferIdentifierRawP *wtid, + const struct TALER_MerchantPublicKeyP *merchant_pub, + const struct TALER_Amount *total) +{ + struct AggregationUnit *au = cls; + + if (GNUNET_OK != au->ret) + { + GNUNET_break (0); + return false; + } + au->payto_uri = GNUNET_strdup (payto_uri); + au->wtid = *wtid; + au->merchant_pub = *merchant_pub; + au->trans = *total; + au->have_transient = true; + au->ret = do_aggregate (au); + GNUNET_free (au->payto_uri); + return (GNUNET_OK == au->ret); +} + + +static void +drain_kyc_alerts (void *cls) +{ + enum GNUNET_DB_QueryStatus qs; + struct AggregationUnit au; + + (void) cls; + task = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Draining KYC alerts\n"); + memset (&au, + 0, + sizeof (au)); + au.execution_time = GNUNET_TIME_timestamp_get (); + if (GNUNET_SYSERR == + db_plugin->preflight (db_plugin->cls)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to obtain database connection!\n"); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + } + if (GNUNET_OK != + db_plugin->start (db_plugin->cls, + "handle kyc alerts")) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to start database transaction!\n"); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + } + while (1) + { + qs = db_plugin->drain_kyc_alert (db_plugin->cls, + 1, + &au.h_payto); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + db_plugin->rollback (db_plugin->cls); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, + NULL); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + db_plugin->rollback (db_plugin->cls); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, + NULL); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + qs = db_plugin->commit (db_plugin->cls); + if (qs < 0) + GNUNET_log (GNUNET_ERROR_TYPE_WARNING, + "Failed to commit KYC drain\n"); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&run_shard, + NULL); + return; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + /* handled below */ + break; + } + + au.ret = GNUNET_OK; + qs = db_plugin->find_aggregation_transient (db_plugin->cls, + &au.h_payto, + &handle_transient_cb, + &au); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Failed to lookup transient aggregates!\n"); + db_plugin->rollback (db_plugin->cls); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, + NULL); + return; + case GNUNET_DB_STATUS_SOFT_ERROR: + /* serializiability issue, try again */ + GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + "Serialization issue, trying again later!\n"); + db_plugin->rollback (db_plugin->cls); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, + NULL); + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + continue; /* while (1) */ + default: + break; + } + break; + } /* while(1) */ + + { + enum GNUNET_GenericReturnValue ret; + + ret = au.ret; + cleanup_au (&au); + switch (ret) + { + case GNUNET_SYSERR: + GNUNET_break (0); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + db_plugin->rollback (db_plugin->cls); /* just in case */ + return; + case GNUNET_NO: + db_plugin->rollback (db_plugin->cls); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, + NULL); + return; + case GNUNET_OK: + /* continued below */ + break; + } + } + + switch (commit_or_warn ()) + { + case GNUNET_DB_STATUS_SOFT_ERROR: + /* try again */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Serialization issue on commit; trying again later!\n"); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, + NULL); + return; + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + db_plugin->rollback (db_plugin->cls); /* just in case */ + return; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Commit complete, going again\n"); + GNUNET_assert (NULL == task); + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, + NULL); + return; + default: + GNUNET_break (0); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + db_plugin->rollback (db_plugin->cls); /* just in case */ + return; + } +} + + +/** * First task. * * @param cls closure, NULL @@ -811,7 +1296,8 @@ run (void *cls, (void) cfgfile; cfg = c; - if (GNUNET_OK != parse_wirewatch_config ()) + if (GNUNET_OK != + parse_aggregator_config ()) { cfg = NULL; global_ret = EXIT_NOTCONFIGURED; @@ -832,11 +1318,18 @@ run (void *cls, shard_size = 1U + INT32_MAX; else shard_size = (uint32_t) ass; + if (GNUNET_OK != + TALER_KYCLOGIC_kyc_init (cfg)) + { + cfg = NULL; + global_ret = EXIT_NOTCONFIGURED; + return; + } + GNUNET_SCHEDULER_add_shutdown (&shutdown_task, + NULL); GNUNET_assert (NULL == task); - task = GNUNET_SCHEDULER_add_now (&run_shard, + task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, NULL); - GNUNET_SCHEDULER_add_shutdown (&shutdown_task, - cls); } |