summaryrefslogtreecommitdiff
path: root/src/exchange/taler-exchange-aggregator.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/exchange/taler-exchange-aggregator.c')
-rw-r--r--src/exchange/taler-exchange-aggregator.c220
1 files changed, 201 insertions, 19 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c
index 2c2795358..691d65ae3 100644
--- a/src/exchange/taler-exchange-aggregator.c
+++ b/src/exchange/taler-exchange-aggregator.c
@@ -28,6 +28,7 @@
#include "taler_json_lib.h"
#include "taler_kyclogic_lib.h"
#include "taler_bank_service.h"
+#include "taler_dbevents.h"
/**
@@ -92,6 +93,12 @@ struct AggregationUnit
const struct TALER_EXCHANGEDB_AccountInfo *wa;
/**
+ * Row in KYC table for legitimization requirements
+ * that are pending for this aggregation, or 0 if none.
+ */
+ uint64_t requirement_row;
+
+ /**
* Set to #GNUNET_OK during transient checking
* while everything is OK. Otherwise see return
* value of #do_aggregate().
@@ -143,6 +150,12 @@ struct Shard
static struct TALER_Amount currency_round_unit;
/**
+ * What is the largest amount we transfer before triggering
+ * an AML check?
+ */
+static struct TALER_Amount aml_threshold;
+
+/**
* What is the base URL of this exchange? Used in the
* wire transfer subjects so that merchants and governments
* can ask for the list of aggregated deposits.
@@ -288,11 +301,20 @@ parse_aggregator_config (void)
"taler",
"CURRENCY_ROUND_UNIT",
&currency_round_unit)) ||
- ( (0 != currency_round_unit.fraction) &&
- (0 != currency_round_unit.value) ) )
+ (TALER_amount_is_zero (&currency_round_unit)) )
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Need non-zero amount in section `taler' under `CURRENCY_ROUND_UNIT'\n");
+ return GNUNET_SYSERR;
+ }
+ if (GNUNET_OK !=
+ TALER_config_get_amount (cfg,
+ "exchange",
+ "AML_THRESHOLD",
+ &aml_threshold))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Need non-zero value in section `TALER' under `CURRENCY_ROUND_UNIT'\n");
+ "Need amount in section `exchange' under `AML_THRESHOLD'\n");
return GNUNET_SYSERR;
}
@@ -361,6 +383,7 @@ release_shard (struct Shard *s)
case GNUNET_DB_STATUS_SOFT_ERROR:
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR != qs);
GNUNET_break (0);
+ global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
@@ -469,34 +492,42 @@ return_relevant_amounts (void *cls,
/**
* Test if KYC is required for a transfer to @a h_payto.
*
- * @param au_active aggregation unit to check for
+ * @param[in,out] au_active aggregation unit to check for
* @return true if KYC checks are satisfied
*/
static bool
-kyc_satisfied (const struct AggregationUnit *au_active)
+kyc_satisfied (struct AggregationUnit *au_active)
{
- const char *requirement;
- uint64_t legi_row;
+ char *requirement;
enum GNUNET_DB_QueryStatus qs;
- requirement = TALER_KYCLOGIC_kyc_test_required (
+ if (kyc_off)
+ return true;
+ qs = TALER_KYCLOGIC_kyc_test_required (
TALER_KYCLOGIC_KYC_TRIGGER_DEPOSIT,
&au_active->h_payto,
db_plugin->select_satisfied_kyc_processes,
db_plugin->cls,
&return_relevant_amounts,
- (void *) au_active);
+ (void *) au_active,
+ &requirement);
+ if (qs < 0)
+ {
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ return false;
+ }
+ if (NULL == requirement)
+ return true;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"KYC requirement for %s is %s\n",
TALER_amount2s (&au_active->total_amount),
requirement);
- if (NULL == requirement)
- return true;
qs = db_plugin->insert_kyc_requirement_for_account (
db_plugin->cls,
requirement,
&au_active->h_payto,
- &legi_row);
+ NULL, /* not a reserve */
+ &au_active->requirement_row);
if (qs < 0)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
@@ -506,9 +537,117 @@ kyc_satisfied (const struct AggregationUnit *au_active)
else
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "New legitimization process %llu started\n",
- (unsigned long long) legi_row);
+ "Legitimization process %llu started\n",
+ (unsigned long long) au_active->requirement_row);
}
+ GNUNET_free (requirement);
+ return false;
+}
+
+
+/**
+ * Function called on each @a amount that was found to
+ * be relevant for an AML check.
+ *
+ * @param cls closure with the `struct TALER_Amount *` where we store the sum
+ * @param amount encountered transaction amount
+ * @param date when was the amount encountered
+ * @return #GNUNET_OK to continue to iterate,
+ * #GNUNET_NO to abort iteration
+ * #GNUNET_SYSERR on internal error (also abort itaration)
+ */
+static enum GNUNET_GenericReturnValue
+sum_for_aml (
+ void *cls,
+ const struct TALER_Amount *amount,
+ struct GNUNET_TIME_Absolute date)
+{
+ struct TALER_Amount *sum = cls;
+
+ (void) date;
+ if (0 >
+ TALER_amount_add (sum,
+ sum,
+ amount))
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Test if AML is required for a transfer to @a h_payto.
+ *
+ * @param[in,out] au_active aggregation unit to check for
+ * @return true if AML checks are satisfied
+ */
+static bool
+aml_satisfied (struct AggregationUnit *au_active)
+{
+ enum GNUNET_DB_QueryStatus qs;
+ struct TALER_Amount total;
+ struct TALER_Amount threshold;
+ enum TALER_AmlDecisionState decision;
+ struct TALER_EXCHANGEDB_KycStatus kyc;
+
+ total = au_active->final_amount;
+ qs = db_plugin->select_aggregation_amounts_for_kyc_check (
+ db_plugin->cls,
+ &au_active->h_payto,
+ GNUNET_TIME_absolute_subtract (GNUNET_TIME_absolute_get (),
+ GNUNET_TIME_UNIT_MONTHS),
+ &sum_for_aml,
+ &total);
+ if (qs < 0)
+ {
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ return false;
+ }
+ qs = db_plugin->select_aml_threshold (db_plugin->cls,
+ &au_active->h_payto,
+ &decision,
+ &kyc,
+ &threshold);
+ if (qs < 0)
+ {
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ return false;
+ }
+ if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
+ {
+ threshold = aml_threshold; /* use default */
+ decision = TALER_AML_NORMAL;
+ }
+ switch (decision)
+ {
+ case TALER_AML_NORMAL:
+ if (0 >= TALER_amount_cmp (&total,
+ &threshold))
+ {
+ /* total <= threshold, do nothing */
+ return true;
+ }
+ qs = db_plugin->trigger_aml_process (db_plugin->cls,
+ &au_active->h_payto,
+ &total);
+ if (qs < 0)
+ {
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ return false;
+ }
+ return false;
+ case TALER_AML_PENDING:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "AML already pending, doing nothing\n");
+ return false;
+ case TALER_AML_FROZEN:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Account frozen, doing nothing\n");
+ return false;
+ }
+ GNUNET_assert (0);
return false;
}
@@ -589,10 +728,16 @@ do_aggregate (struct AggregationUnit *au)
GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
&au->wtid,
sizeof (au->wtid));
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "No transient aggregation found, starting %s\n",
+ TALER_B2S (&au->wtid));
au->have_transient = false;
break;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
au->have_transient = true;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Transient aggregation found, resuming %s\n",
+ TALER_B2S (&au->wtid));
break;
}
qs = db_plugin->aggregate (db_plugin->cls,
@@ -614,10 +759,9 @@ do_aggregate (struct AggregationUnit *au)
"Serialization issue, trying again later!\n");
return GNUNET_NO;
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Aggregation total is %s.\n",
TALER_amount2s (&au->total_amount));
-
/* Subtract wire transfer fee and round to the unit supported by the
wire transfer method; Check if after rounding down, we still have
an amount to transfer, and if not mark as 'tiny'. */
@@ -639,7 +783,8 @@ do_aggregate (struct AggregationUnit *au)
TALER_amount_round_down (&au->final_amount,
&currency_round_unit)) ||
(TALER_amount_is_zero (&au->final_amount)) ||
- (! kyc_satisfied (au)) )
+ (! kyc_satisfied (au)) ||
+ (! aml_satisfied (au)) )
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Not ready for wire transfer (%d/%s)\n",
@@ -649,6 +794,7 @@ do_aggregate (struct AggregationUnit *au)
qs = db_plugin->update_aggregation_transient (db_plugin->cls,
&au->h_payto,
&au->wtid,
+ au->requirement_row,
&au->total_amount);
else
qs = db_plugin->create_aggregation_transient (db_plugin->cls,
@@ -656,6 +802,7 @@ do_aggregate (struct AggregationUnit *au)
au->wa->section_name,
&au->merchant_pub,
&au->wtid,
+ au->requirement_row,
&au->total_amount);
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
{
@@ -678,15 +825,30 @@ do_aggregate (struct AggregationUnit *au)
{
case GNUNET_DB_STATUS_SOFT_ERROR:
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Serialization issue during aggregation; trying again later!\n");
+ "Serialization issue during aggregation; trying again later!\n")
+ ;
return GNUNET_NO;
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
global_ret = EXIT_FAILURE;
return GNUNET_SYSERR;
default:
- return GNUNET_OK;
+ break;
+ }
+ {
+ struct TALER_CoinDepositEventP rep = {
+ .header.size = htons (sizeof (rep)),
+ .header.type = htons (TALER_DBEVENT_EXCHANGE_DEPOSIT_STATUS_CHANGED),
+ .merchant_pub = au->merchant_pub
+ };
+
+ db_plugin->event_notify (db_plugin->cls,
+ &rep.header,
+ NULL,
+ 0);
}
+ return GNUNET_OK;
+
}
@@ -760,22 +922,34 @@ run_aggregation (void *cls)
(0 == counter) )
{
/* in test mode, shutdown after a shard is done with 0 work */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "No work done and in test mode, shutting down\n");
GNUNET_SCHEDULER_shutdown ();
return;
}
GNUNET_assert (NULL == task);
/* If we ended up doing zero work, sleep a bit */
if (0 == counter)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Going to sleep for %s before trying again\n",
+ GNUNET_TIME_relative2s (aggregator_idle_sleep_interval,
+ true));
task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval,
&drain_kyc_alerts,
NULL);
+ }
else
+ {
task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
NULL);
+ }
return;
}
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
s->work_counter++;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Found ready deposit!\n");
/* continued below */
break;
}
@@ -787,6 +961,7 @@ run_aggregation (void *cls)
switch (ret)
{
case GNUNET_SYSERR:
+ global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
db_plugin->rollback (db_plugin->cls);
release_shard (s);
@@ -855,6 +1030,8 @@ run_shard (void *cls)
(void) cls;
task = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Running aggregation shard\n");
if (GNUNET_SYSERR ==
db_plugin->preflight (db_plugin->cls))
{
@@ -881,6 +1058,7 @@ run_shard (void *cls)
GNUNET_free (s);
delay = GNUNET_TIME_randomized_backoff (delay,
GNUNET_TIME_UNIT_SECONDS);
+ GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_delayed (delay,
&run_shard,
NULL);
@@ -898,6 +1076,7 @@ run_shard (void *cls)
"Starting shard [%u:%u]!\n",
(unsigned int) s->shard_start,
(unsigned int) s->shard_end);
+ GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
s);
}
@@ -948,6 +1127,8 @@ drain_kyc_alerts (void *cls)
(void) cls;
task = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Draining KYC alerts\n");
memset (&au,
0,
sizeof (au));
@@ -1046,6 +1227,7 @@ drain_kyc_alerts (void *cls)
{
case GNUNET_SYSERR:
GNUNET_break (0);
+ global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
db_plugin->rollback (db_plugin->cls); /* just in case */
return;