summaryrefslogtreecommitdiff
path: root/src/exchange/taler-exchange-aggregator.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/exchange/taler-exchange-aggregator.c')
-rw-r--r--src/exchange/taler-exchange-aggregator.c190
1 files changed, 180 insertions, 10 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c
index dce6b2df4..691d65ae3 100644
--- a/src/exchange/taler-exchange-aggregator.c
+++ b/src/exchange/taler-exchange-aggregator.c
@@ -28,6 +28,7 @@
#include "taler_json_lib.h"
#include "taler_kyclogic_lib.h"
#include "taler_bank_service.h"
+#include "taler_dbevents.h"
/**
@@ -149,6 +150,12 @@ struct Shard
static struct TALER_Amount currency_round_unit;
/**
+ * What is the largest amount we transfer before triggering
+ * an AML check?
+ */
+static struct TALER_Amount aml_threshold;
+
+/**
* What is the base URL of this exchange? Used in the
* wire transfer subjects so that merchants and governments
* can ask for the list of aggregated deposits.
@@ -294,11 +301,20 @@ parse_aggregator_config (void)
"taler",
"CURRENCY_ROUND_UNIT",
&currency_round_unit)) ||
- ( (0 != currency_round_unit.fraction) &&
- (0 != currency_round_unit.value) ) )
+ (TALER_amount_is_zero (&currency_round_unit)) )
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Need non-zero value in section `TALER' under `CURRENCY_ROUND_UNIT'\n");
+ "Need non-zero amount in section `taler' under `CURRENCY_ROUND_UNIT'\n");
+ return GNUNET_SYSERR;
+ }
+ if (GNUNET_OK !=
+ TALER_config_get_amount (cfg,
+ "exchange",
+ "AML_THRESHOLD",
+ &aml_threshold))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Need amount in section `exchange' under `AML_THRESHOLD'\n");
return GNUNET_SYSERR;
}
@@ -367,6 +383,7 @@ release_shard (struct Shard *s)
case GNUNET_DB_STATUS_SOFT_ERROR:
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR != qs);
GNUNET_break (0);
+ global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
@@ -481,16 +498,24 @@ return_relevant_amounts (void *cls,
static bool
kyc_satisfied (struct AggregationUnit *au_active)
{
- const char *requirement;
+ char *requirement;
enum GNUNET_DB_QueryStatus qs;
- requirement = TALER_KYCLOGIC_kyc_test_required (
+ if (kyc_off)
+ return true;
+ qs = TALER_KYCLOGIC_kyc_test_required (
TALER_KYCLOGIC_KYC_TRIGGER_DEPOSIT,
&au_active->h_payto,
db_plugin->select_satisfied_kyc_processes,
db_plugin->cls,
&return_relevant_amounts,
- (void *) au_active);
+ (void *) au_active,
+ &requirement);
+ if (qs < 0)
+ {
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ return false;
+ }
if (NULL == requirement)
return true;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
@@ -501,6 +526,7 @@ kyc_satisfied (struct AggregationUnit *au_active)
db_plugin->cls,
requirement,
&au_active->h_payto,
+ NULL, /* not a reserve */
&au_active->requirement_row);
if (qs < 0)
{
@@ -514,6 +540,114 @@ kyc_satisfied (struct AggregationUnit *au_active)
"Legitimization process %llu started\n",
(unsigned long long) au_active->requirement_row);
}
+ GNUNET_free (requirement);
+ return false;
+}
+
+
+/**
+ * Function called on each @a amount that was found to
+ * be relevant for an AML check.
+ *
+ * @param cls closure with the `struct TALER_Amount *` where we store the sum
+ * @param amount encountered transaction amount
+ * @param date when was the amount encountered
+ * @return #GNUNET_OK to continue to iterate,
+ * #GNUNET_NO to abort iteration
+ * #GNUNET_SYSERR on internal error (also abort itaration)
+ */
+static enum GNUNET_GenericReturnValue
+sum_for_aml (
+ void *cls,
+ const struct TALER_Amount *amount,
+ struct GNUNET_TIME_Absolute date)
+{
+ struct TALER_Amount *sum = cls;
+
+ (void) date;
+ if (0 >
+ TALER_amount_add (sum,
+ sum,
+ amount))
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Test if AML is required for a transfer to @a h_payto.
+ *
+ * @param[in,out] au_active aggregation unit to check for
+ * @return true if AML checks are satisfied
+ */
+static bool
+aml_satisfied (struct AggregationUnit *au_active)
+{
+ enum GNUNET_DB_QueryStatus qs;
+ struct TALER_Amount total;
+ struct TALER_Amount threshold;
+ enum TALER_AmlDecisionState decision;
+ struct TALER_EXCHANGEDB_KycStatus kyc;
+
+ total = au_active->final_amount;
+ qs = db_plugin->select_aggregation_amounts_for_kyc_check (
+ db_plugin->cls,
+ &au_active->h_payto,
+ GNUNET_TIME_absolute_subtract (GNUNET_TIME_absolute_get (),
+ GNUNET_TIME_UNIT_MONTHS),
+ &sum_for_aml,
+ &total);
+ if (qs < 0)
+ {
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ return false;
+ }
+ qs = db_plugin->select_aml_threshold (db_plugin->cls,
+ &au_active->h_payto,
+ &decision,
+ &kyc,
+ &threshold);
+ if (qs < 0)
+ {
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ return false;
+ }
+ if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
+ {
+ threshold = aml_threshold; /* use default */
+ decision = TALER_AML_NORMAL;
+ }
+ switch (decision)
+ {
+ case TALER_AML_NORMAL:
+ if (0 >= TALER_amount_cmp (&total,
+ &threshold))
+ {
+ /* total <= threshold, do nothing */
+ return true;
+ }
+ qs = db_plugin->trigger_aml_process (db_plugin->cls,
+ &au_active->h_payto,
+ &total);
+ if (qs < 0)
+ {
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ return false;
+ }
+ return false;
+ case TALER_AML_PENDING:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "AML already pending, doing nothing\n");
+ return false;
+ case TALER_AML_FROZEN:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Account frozen, doing nothing\n");
+ return false;
+ }
+ GNUNET_assert (0);
return false;
}
@@ -594,10 +728,16 @@ do_aggregate (struct AggregationUnit *au)
GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
&au->wtid,
sizeof (au->wtid));
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "No transient aggregation found, starting %s\n",
+ TALER_B2S (&au->wtid));
au->have_transient = false;
break;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
au->have_transient = true;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Transient aggregation found, resuming %s\n",
+ TALER_B2S (&au->wtid));
break;
}
qs = db_plugin->aggregate (db_plugin->cls,
@@ -619,7 +759,7 @@ do_aggregate (struct AggregationUnit *au)
"Serialization issue, trying again later!\n");
return GNUNET_NO;
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Aggregation total is %s.\n",
TALER_amount2s (&au->total_amount));
/* Subtract wire transfer fee and round to the unit supported by the
@@ -643,7 +783,8 @@ do_aggregate (struct AggregationUnit *au)
TALER_amount_round_down (&au->final_amount,
&currency_round_unit)) ||
(TALER_amount_is_zero (&au->final_amount)) ||
- (! kyc_satisfied (au)) )
+ (! kyc_satisfied (au)) ||
+ (! aml_satisfied (au)) )
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Not ready for wire transfer (%d/%s)\n",
@@ -684,15 +825,30 @@ do_aggregate (struct AggregationUnit *au)
{
case GNUNET_DB_STATUS_SOFT_ERROR:
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Serialization issue during aggregation; trying again later!\n");
+ "Serialization issue during aggregation; trying again later!\n")
+ ;
return GNUNET_NO;
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
global_ret = EXIT_FAILURE;
return GNUNET_SYSERR;
default:
- return GNUNET_OK;
+ break;
+ }
+ {
+ struct TALER_CoinDepositEventP rep = {
+ .header.size = htons (sizeof (rep)),
+ .header.type = htons (TALER_DBEVENT_EXCHANGE_DEPOSIT_STATUS_CHANGED),
+ .merchant_pub = au->merchant_pub
+ };
+
+ db_plugin->event_notify (db_plugin->cls,
+ &rep.header,
+ NULL,
+ 0);
}
+ return GNUNET_OK;
+
}
@@ -766,18 +922,28 @@ run_aggregation (void *cls)
(0 == counter) )
{
/* in test mode, shutdown after a shard is done with 0 work */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "No work done and in test mode, shutting down\n");
GNUNET_SCHEDULER_shutdown ();
return;
}
GNUNET_assert (NULL == task);
/* If we ended up doing zero work, sleep a bit */
if (0 == counter)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Going to sleep for %s before trying again\n",
+ GNUNET_TIME_relative2s (aggregator_idle_sleep_interval,
+ true));
task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval,
&drain_kyc_alerts,
NULL);
+ }
else
+ {
task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
NULL);
+ }
return;
}
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
@@ -795,6 +961,7 @@ run_aggregation (void *cls)
switch (ret)
{
case GNUNET_SYSERR:
+ global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
db_plugin->rollback (db_plugin->cls);
release_shard (s);
@@ -891,6 +1058,7 @@ run_shard (void *cls)
GNUNET_free (s);
delay = GNUNET_TIME_randomized_backoff (delay,
GNUNET_TIME_UNIT_SECONDS);
+ GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_delayed (delay,
&run_shard,
NULL);
@@ -908,6 +1076,7 @@ run_shard (void *cls)
"Starting shard [%u:%u]!\n",
(unsigned int) s->shard_start,
(unsigned int) s->shard_end);
+ GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
s);
}
@@ -1058,6 +1227,7 @@ drain_kyc_alerts (void *cls)
{
case GNUNET_SYSERR:
GNUNET_break (0);
+ global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
db_plugin->rollback (db_plugin->cls); /* just in case */
return;