diff options
Diffstat (limited to 'src/exchange/taler-exchange-aggregator.c')
-rw-r--r-- | src/exchange/taler-exchange-aggregator.c | 190 |
1 files changed, 180 insertions, 10 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index dce6b2df4..691d65ae3 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -28,6 +28,7 @@ #include "taler_json_lib.h" #include "taler_kyclogic_lib.h" #include "taler_bank_service.h" +#include "taler_dbevents.h" /** @@ -149,6 +150,12 @@ struct Shard static struct TALER_Amount currency_round_unit; /** + * What is the largest amount we transfer before triggering + * an AML check? + */ +static struct TALER_Amount aml_threshold; + +/** * What is the base URL of this exchange? Used in the * wire transfer subjects so that merchants and governments * can ask for the list of aggregated deposits. @@ -294,11 +301,20 @@ parse_aggregator_config (void) "taler", "CURRENCY_ROUND_UNIT", ¤cy_round_unit)) || - ( (0 != currency_round_unit.fraction) && - (0 != currency_round_unit.value) ) ) + (TALER_amount_is_zero (¤cy_round_unit)) ) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Need non-zero value in section `TALER' under `CURRENCY_ROUND_UNIT'\n"); + "Need non-zero amount in section `taler' under `CURRENCY_ROUND_UNIT'\n"); + return GNUNET_SYSERR; + } + if (GNUNET_OK != + TALER_config_get_amount (cfg, + "exchange", + "AML_THRESHOLD", + &aml_threshold)) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Need amount in section `exchange' under `AML_THRESHOLD'\n"); return GNUNET_SYSERR; } @@ -367,6 +383,7 @@ release_shard (struct Shard *s) case GNUNET_DB_STATUS_SOFT_ERROR: GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR != qs); GNUNET_break (0); + global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: @@ -481,16 +498,24 @@ return_relevant_amounts (void *cls, static bool kyc_satisfied (struct AggregationUnit *au_active) { - const char *requirement; + char *requirement; enum GNUNET_DB_QueryStatus qs; - requirement = TALER_KYCLOGIC_kyc_test_required ( + if (kyc_off) + return true; + qs = TALER_KYCLOGIC_kyc_test_required ( TALER_KYCLOGIC_KYC_TRIGGER_DEPOSIT, &au_active->h_payto, db_plugin->select_satisfied_kyc_processes, db_plugin->cls, &return_relevant_amounts, - (void *) au_active); + (void *) au_active, + &requirement); + if (qs < 0) + { + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return false; + } if (NULL == requirement) return true; GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -501,6 +526,7 @@ kyc_satisfied (struct AggregationUnit *au_active) db_plugin->cls, requirement, &au_active->h_payto, + NULL, /* not a reserve */ &au_active->requirement_row); if (qs < 0) { @@ -514,6 +540,114 @@ kyc_satisfied (struct AggregationUnit *au_active) "Legitimization process %llu started\n", (unsigned long long) au_active->requirement_row); } + GNUNET_free (requirement); + return false; +} + + +/** + * Function called on each @a amount that was found to + * be relevant for an AML check. + * + * @param cls closure with the `struct TALER_Amount *` where we store the sum + * @param amount encountered transaction amount + * @param date when was the amount encountered + * @return #GNUNET_OK to continue to iterate, + * #GNUNET_NO to abort iteration + * #GNUNET_SYSERR on internal error (also abort itaration) + */ +static enum GNUNET_GenericReturnValue +sum_for_aml ( + void *cls, + const struct TALER_Amount *amount, + struct GNUNET_TIME_Absolute date) +{ + struct TALER_Amount *sum = cls; + + (void) date; + if (0 > + TALER_amount_add (sum, + sum, + amount)) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Test if AML is required for a transfer to @a h_payto. + * + * @param[in,out] au_active aggregation unit to check for + * @return true if AML checks are satisfied + */ +static bool +aml_satisfied (struct AggregationUnit *au_active) +{ + enum GNUNET_DB_QueryStatus qs; + struct TALER_Amount total; + struct TALER_Amount threshold; + enum TALER_AmlDecisionState decision; + struct TALER_EXCHANGEDB_KycStatus kyc; + + total = au_active->final_amount; + qs = db_plugin->select_aggregation_amounts_for_kyc_check ( + db_plugin->cls, + &au_active->h_payto, + GNUNET_TIME_absolute_subtract (GNUNET_TIME_absolute_get (), + GNUNET_TIME_UNIT_MONTHS), + &sum_for_aml, + &total); + if (qs < 0) + { + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return false; + } + qs = db_plugin->select_aml_threshold (db_plugin->cls, + &au_active->h_payto, + &decision, + &kyc, + &threshold); + if (qs < 0) + { + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return false; + } + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) + { + threshold = aml_threshold; /* use default */ + decision = TALER_AML_NORMAL; + } + switch (decision) + { + case TALER_AML_NORMAL: + if (0 >= TALER_amount_cmp (&total, + &threshold)) + { + /* total <= threshold, do nothing */ + return true; + } + qs = db_plugin->trigger_aml_process (db_plugin->cls, + &au_active->h_payto, + &total); + if (qs < 0) + { + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return false; + } + return false; + case TALER_AML_PENDING: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "AML already pending, doing nothing\n"); + return false; + case TALER_AML_FROZEN: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Account frozen, doing nothing\n"); + return false; + } + GNUNET_assert (0); return false; } @@ -594,10 +728,16 @@ do_aggregate (struct AggregationUnit *au) GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, &au->wtid, sizeof (au->wtid)); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "No transient aggregation found, starting %s\n", + TALER_B2S (&au->wtid)); au->have_transient = false; break; case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: au->have_transient = true; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Transient aggregation found, resuming %s\n", + TALER_B2S (&au->wtid)); break; } qs = db_plugin->aggregate (db_plugin->cls, @@ -619,7 +759,7 @@ do_aggregate (struct AggregationUnit *au) "Serialization issue, trying again later!\n"); return GNUNET_NO; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Aggregation total is %s.\n", TALER_amount2s (&au->total_amount)); /* Subtract wire transfer fee and round to the unit supported by the @@ -643,7 +783,8 @@ do_aggregate (struct AggregationUnit *au) TALER_amount_round_down (&au->final_amount, ¤cy_round_unit)) || (TALER_amount_is_zero (&au->final_amount)) || - (! kyc_satisfied (au)) ) + (! kyc_satisfied (au)) || + (! aml_satisfied (au)) ) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Not ready for wire transfer (%d/%s)\n", @@ -684,15 +825,30 @@ do_aggregate (struct AggregationUnit *au) { case GNUNET_DB_STATUS_SOFT_ERROR: GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Serialization issue during aggregation; trying again later!\n"); + "Serialization issue during aggregation; trying again later!\n") + ; return GNUNET_NO; case GNUNET_DB_STATUS_HARD_ERROR: GNUNET_break (0); global_ret = EXIT_FAILURE; return GNUNET_SYSERR; default: - return GNUNET_OK; + break; + } + { + struct TALER_CoinDepositEventP rep = { + .header.size = htons (sizeof (rep)), + .header.type = htons (TALER_DBEVENT_EXCHANGE_DEPOSIT_STATUS_CHANGED), + .merchant_pub = au->merchant_pub + }; + + db_plugin->event_notify (db_plugin->cls, + &rep.header, + NULL, + 0); } + return GNUNET_OK; + } @@ -766,18 +922,28 @@ run_aggregation (void *cls) (0 == counter) ) { /* in test mode, shutdown after a shard is done with 0 work */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "No work done and in test mode, shutting down\n"); GNUNET_SCHEDULER_shutdown (); return; } GNUNET_assert (NULL == task); /* If we ended up doing zero work, sleep a bit */ if (0 == counter) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Going to sleep for %s before trying again\n", + GNUNET_TIME_relative2s (aggregator_idle_sleep_interval, + true)); task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval, &drain_kyc_alerts, NULL); + } else + { task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, NULL); + } return; } case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: @@ -795,6 +961,7 @@ run_aggregation (void *cls) switch (ret) { case GNUNET_SYSERR: + global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); db_plugin->rollback (db_plugin->cls); release_shard (s); @@ -891,6 +1058,7 @@ run_shard (void *cls) GNUNET_free (s); delay = GNUNET_TIME_randomized_backoff (delay, GNUNET_TIME_UNIT_SECONDS); + GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_delayed (delay, &run_shard, NULL); @@ -908,6 +1076,7 @@ run_shard (void *cls) "Starting shard [%u:%u]!\n", (unsigned int) s->shard_start, (unsigned int) s->shard_end); + GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, s); } @@ -1058,6 +1227,7 @@ drain_kyc_alerts (void *cls) { case GNUNET_SYSERR: GNUNET_break (0); + global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); db_plugin->rollback (db_plugin->cls); /* just in case */ return; |