diff options
Diffstat (limited to 'src/exchange/taler-exchange-aggregator.c')
-rw-r--r-- | src/exchange/taler-exchange-aggregator.c | 220 |
1 files changed, 201 insertions, 19 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c index 2c2795358..691d65ae3 100644 --- a/src/exchange/taler-exchange-aggregator.c +++ b/src/exchange/taler-exchange-aggregator.c @@ -28,6 +28,7 @@ #include "taler_json_lib.h" #include "taler_kyclogic_lib.h" #include "taler_bank_service.h" +#include "taler_dbevents.h" /** @@ -92,6 +93,12 @@ struct AggregationUnit const struct TALER_EXCHANGEDB_AccountInfo *wa; /** + * Row in KYC table for legitimization requirements + * that are pending for this aggregation, or 0 if none. + */ + uint64_t requirement_row; + + /** * Set to #GNUNET_OK during transient checking * while everything is OK. Otherwise see return * value of #do_aggregate(). @@ -143,6 +150,12 @@ struct Shard static struct TALER_Amount currency_round_unit; /** + * What is the largest amount we transfer before triggering + * an AML check? + */ +static struct TALER_Amount aml_threshold; + +/** * What is the base URL of this exchange? Used in the * wire transfer subjects so that merchants and governments * can ask for the list of aggregated deposits. @@ -288,11 +301,20 @@ parse_aggregator_config (void) "taler", "CURRENCY_ROUND_UNIT", ¤cy_round_unit)) || - ( (0 != currency_round_unit.fraction) && - (0 != currency_round_unit.value) ) ) + (TALER_amount_is_zero (¤cy_round_unit)) ) + { + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Need non-zero amount in section `taler' under `CURRENCY_ROUND_UNIT'\n"); + return GNUNET_SYSERR; + } + if (GNUNET_OK != + TALER_config_get_amount (cfg, + "exchange", + "AML_THRESHOLD", + &aml_threshold)) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Need non-zero value in section `TALER' under `CURRENCY_ROUND_UNIT'\n"); + "Need amount in section `exchange' under `AML_THRESHOLD'\n"); return GNUNET_SYSERR; } @@ -361,6 +383,7 @@ release_shard (struct Shard *s) case GNUNET_DB_STATUS_SOFT_ERROR: GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR != qs); GNUNET_break (0); + global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: @@ -469,34 +492,42 @@ return_relevant_amounts (void *cls, /** * Test if KYC is required for a transfer to @a h_payto. * - * @param au_active aggregation unit to check for + * @param[in,out] au_active aggregation unit to check for * @return true if KYC checks are satisfied */ static bool -kyc_satisfied (const struct AggregationUnit *au_active) +kyc_satisfied (struct AggregationUnit *au_active) { - const char *requirement; - uint64_t legi_row; + char *requirement; enum GNUNET_DB_QueryStatus qs; - requirement = TALER_KYCLOGIC_kyc_test_required ( + if (kyc_off) + return true; + qs = TALER_KYCLOGIC_kyc_test_required ( TALER_KYCLOGIC_KYC_TRIGGER_DEPOSIT, &au_active->h_payto, db_plugin->select_satisfied_kyc_processes, db_plugin->cls, &return_relevant_amounts, - (void *) au_active); + (void *) au_active, + &requirement); + if (qs < 0) + { + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return false; + } + if (NULL == requirement) + return true; GNUNET_log (GNUNET_ERROR_TYPE_INFO, "KYC requirement for %s is %s\n", TALER_amount2s (&au_active->total_amount), requirement); - if (NULL == requirement) - return true; qs = db_plugin->insert_kyc_requirement_for_account ( db_plugin->cls, requirement, &au_active->h_payto, - &legi_row); + NULL, /* not a reserve */ + &au_active->requirement_row); if (qs < 0) { GNUNET_log (GNUNET_ERROR_TYPE_ERROR, @@ -506,9 +537,117 @@ kyc_satisfied (const struct AggregationUnit *au_active) else { GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "New legitimization process %llu started\n", - (unsigned long long) legi_row); + "Legitimization process %llu started\n", + (unsigned long long) au_active->requirement_row); } + GNUNET_free (requirement); + return false; +} + + +/** + * Function called on each @a amount that was found to + * be relevant for an AML check. + * + * @param cls closure with the `struct TALER_Amount *` where we store the sum + * @param amount encountered transaction amount + * @param date when was the amount encountered + * @return #GNUNET_OK to continue to iterate, + * #GNUNET_NO to abort iteration + * #GNUNET_SYSERR on internal error (also abort itaration) + */ +static enum GNUNET_GenericReturnValue +sum_for_aml ( + void *cls, + const struct TALER_Amount *amount, + struct GNUNET_TIME_Absolute date) +{ + struct TALER_Amount *sum = cls; + + (void) date; + if (0 > + TALER_amount_add (sum, + sum, + amount)) + { + GNUNET_break (0); + return GNUNET_SYSERR; + } + return GNUNET_OK; +} + + +/** + * Test if AML is required for a transfer to @a h_payto. + * + * @param[in,out] au_active aggregation unit to check for + * @return true if AML checks are satisfied + */ +static bool +aml_satisfied (struct AggregationUnit *au_active) +{ + enum GNUNET_DB_QueryStatus qs; + struct TALER_Amount total; + struct TALER_Amount threshold; + enum TALER_AmlDecisionState decision; + struct TALER_EXCHANGEDB_KycStatus kyc; + + total = au_active->final_amount; + qs = db_plugin->select_aggregation_amounts_for_kyc_check ( + db_plugin->cls, + &au_active->h_payto, + GNUNET_TIME_absolute_subtract (GNUNET_TIME_absolute_get (), + GNUNET_TIME_UNIT_MONTHS), + &sum_for_aml, + &total); + if (qs < 0) + { + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return false; + } + qs = db_plugin->select_aml_threshold (db_plugin->cls, + &au_active->h_payto, + &decision, + &kyc, + &threshold); + if (qs < 0) + { + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return false; + } + if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) + { + threshold = aml_threshold; /* use default */ + decision = TALER_AML_NORMAL; + } + switch (decision) + { + case TALER_AML_NORMAL: + if (0 >= TALER_amount_cmp (&total, + &threshold)) + { + /* total <= threshold, do nothing */ + return true; + } + qs = db_plugin->trigger_aml_process (db_plugin->cls, + &au_active->h_payto, + &total); + if (qs < 0) + { + GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + return false; + } + return false; + case TALER_AML_PENDING: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "AML already pending, doing nothing\n"); + return false; + case TALER_AML_FROZEN: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Account frozen, doing nothing\n"); + return false; + } + GNUNET_assert (0); return false; } @@ -589,10 +728,16 @@ do_aggregate (struct AggregationUnit *au) GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE, &au->wtid, sizeof (au->wtid)); + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "No transient aggregation found, starting %s\n", + TALER_B2S (&au->wtid)); au->have_transient = false; break; case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: au->have_transient = true; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Transient aggregation found, resuming %s\n", + TALER_B2S (&au->wtid)); break; } qs = db_plugin->aggregate (db_plugin->cls, @@ -614,10 +759,9 @@ do_aggregate (struct AggregationUnit *au) "Serialization issue, trying again later!\n"); return GNUNET_NO; } - GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Aggregation total is %s.\n", TALER_amount2s (&au->total_amount)); - /* Subtract wire transfer fee and round to the unit supported by the wire transfer method; Check if after rounding down, we still have an amount to transfer, and if not mark as 'tiny'. */ @@ -639,7 +783,8 @@ do_aggregate (struct AggregationUnit *au) TALER_amount_round_down (&au->final_amount, ¤cy_round_unit)) || (TALER_amount_is_zero (&au->final_amount)) || - (! kyc_satisfied (au)) ) + (! kyc_satisfied (au)) || + (! aml_satisfied (au)) ) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Not ready for wire transfer (%d/%s)\n", @@ -649,6 +794,7 @@ do_aggregate (struct AggregationUnit *au) qs = db_plugin->update_aggregation_transient (db_plugin->cls, &au->h_payto, &au->wtid, + au->requirement_row, &au->total_amount); else qs = db_plugin->create_aggregation_transient (db_plugin->cls, @@ -656,6 +802,7 @@ do_aggregate (struct AggregationUnit *au) au->wa->section_name, &au->merchant_pub, &au->wtid, + au->requirement_row, &au->total_amount); if (GNUNET_DB_STATUS_SOFT_ERROR == qs) { @@ -678,15 +825,30 @@ do_aggregate (struct AggregationUnit *au) { case GNUNET_DB_STATUS_SOFT_ERROR: GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Serialization issue during aggregation; trying again later!\n"); + "Serialization issue during aggregation; trying again later!\n") + ; return GNUNET_NO; case GNUNET_DB_STATUS_HARD_ERROR: GNUNET_break (0); global_ret = EXIT_FAILURE; return GNUNET_SYSERR; default: - return GNUNET_OK; + break; + } + { + struct TALER_CoinDepositEventP rep = { + .header.size = htons (sizeof (rep)), + .header.type = htons (TALER_DBEVENT_EXCHANGE_DEPOSIT_STATUS_CHANGED), + .merchant_pub = au->merchant_pub + }; + + db_plugin->event_notify (db_plugin->cls, + &rep.header, + NULL, + 0); } + return GNUNET_OK; + } @@ -760,22 +922,34 @@ run_aggregation (void *cls) (0 == counter) ) { /* in test mode, shutdown after a shard is done with 0 work */ + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "No work done and in test mode, shutting down\n"); GNUNET_SCHEDULER_shutdown (); return; } GNUNET_assert (NULL == task); /* If we ended up doing zero work, sleep a bit */ if (0 == counter) + { + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Going to sleep for %s before trying again\n", + GNUNET_TIME_relative2s (aggregator_idle_sleep_interval, + true)); task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval, &drain_kyc_alerts, NULL); + } else + { task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts, NULL); + } return; } case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: s->work_counter++; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Found ready deposit!\n"); /* continued below */ break; } @@ -787,6 +961,7 @@ run_aggregation (void *cls) switch (ret) { case GNUNET_SYSERR: + global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); db_plugin->rollback (db_plugin->cls); release_shard (s); @@ -855,6 +1030,8 @@ run_shard (void *cls) (void) cls; task = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Running aggregation shard\n"); if (GNUNET_SYSERR == db_plugin->preflight (db_plugin->cls)) { @@ -881,6 +1058,7 @@ run_shard (void *cls) GNUNET_free (s); delay = GNUNET_TIME_randomized_backoff (delay, GNUNET_TIME_UNIT_SECONDS); + GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_delayed (delay, &run_shard, NULL); @@ -898,6 +1076,7 @@ run_shard (void *cls) "Starting shard [%u:%u]!\n", (unsigned int) s->shard_start, (unsigned int) s->shard_end); + GNUNET_assert (NULL == task); task = GNUNET_SCHEDULER_add_now (&run_aggregation, s); } @@ -948,6 +1127,8 @@ drain_kyc_alerts (void *cls) (void) cls; task = NULL; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Draining KYC alerts\n"); memset (&au, 0, sizeof (au)); @@ -1046,6 +1227,7 @@ drain_kyc_alerts (void *cls) { case GNUNET_SYSERR: GNUNET_break (0); + global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); db_plugin->rollback (db_plugin->cls); /* just in case */ return; |