summaryrefslogtreecommitdiff
path: root/src/exchange/taler-exchange-aggregator.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/exchange/taler-exchange-aggregator.c')
-rw-r--r--src/exchange/taler-exchange-aggregator.c967
1 files changed, 730 insertions, 237 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c
index 9568f011e..691d65ae3 100644
--- a/src/exchange/taler-exchange-aggregator.c
+++ b/src/exchange/taler-exchange-aggregator.c
@@ -1,6 +1,6 @@
/*
This file is part of TALER
- Copyright (C) 2016-2021 Taler Systems SA
+ Copyright (C) 2016-2022 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free Software
@@ -26,7 +26,9 @@
#include "taler_exchangedb_lib.h"
#include "taler_exchangedb_plugin.h"
#include "taler_json_lib.h"
+#include "taler_kyclogic_lib.h"
#include "taler_bank_service.h"
+#include "taler_dbevents.h"
/**
@@ -43,6 +45,12 @@ struct AggregationUnit
struct TALER_MerchantPublicKeyP merchant_pub;
/**
+ * Transient amount already found aggregated,
+ * set only if @e have_transient is true.
+ */
+ struct TALER_Amount trans;
+
+ /**
* Total amount to be transferred, before subtraction of @e fees.wire and rounding down.
*/
struct TALER_Amount total_amount;
@@ -84,6 +92,25 @@ struct AggregationUnit
*/
const struct TALER_EXCHANGEDB_AccountInfo *wa;
+ /**
+ * Row in KYC table for legitimization requirements
+ * that are pending for this aggregation, or 0 if none.
+ */
+ uint64_t requirement_row;
+
+ /**
+ * Set to #GNUNET_OK during transient checking
+ * while everything is OK. Otherwise see return
+ * value of #do_aggregate().
+ */
+ enum GNUNET_GenericReturnValue ret;
+
+ /**
+ * Do we have an entry in the transient table for
+ * this aggregation?
+ */
+ bool have_transient;
+
};
@@ -123,6 +150,12 @@ struct Shard
static struct TALER_Amount currency_round_unit;
/**
+ * What is the largest amount we transfer before triggering
+ * an AML check?
+ */
+static struct TALER_Amount aml_threshold;
+
+/**
* What is the base URL of this exchange? Used in the
* wire transfer subjects so that merchants and governments
* can ask for the list of aggregated deposits.
@@ -151,7 +184,6 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin;
*/
static struct GNUNET_SCHEDULER_Task *task;
-
/**
* How long should we sleep when idle before trying to find more work?
*/
@@ -186,12 +218,12 @@ run_aggregation (void *cls);
/**
- * Select a shard to work on.
+ * Work on transactions unlocked by KYC.
*
* @param cls NULL
*/
static void
-run_shard (void *cls);
+drain_kyc_alerts (void *cls);
/**
@@ -226,6 +258,7 @@ shutdown_task (void *cls)
GNUNET_SCHEDULER_cancel (task);
task = NULL;
}
+ TALER_KYCLOGIC_kyc_done ();
TALER_EXCHANGEDB_plugin_unload (db_plugin);
db_plugin = NULL;
TALER_EXCHANGEDB_unload_accounts ();
@@ -234,12 +267,12 @@ shutdown_task (void *cls)
/**
- * Parse the configuration for wirewatch.
+ * Parse the configuration for aggregator.
*
* @return #GNUNET_OK on success
*/
static enum GNUNET_GenericReturnValue
-parse_wirewatch_config (void)
+parse_aggregator_config (void)
{
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_string (cfg,
@@ -268,11 +301,20 @@ parse_wirewatch_config (void)
"taler",
"CURRENCY_ROUND_UNIT",
&currency_round_unit)) ||
- ( (0 != currency_round_unit.fraction) &&
- (0 != currency_round_unit.value) ) )
+ (TALER_amount_is_zero (&currency_round_unit)) )
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Need non-zero amount in section `taler' under `CURRENCY_ROUND_UNIT'\n");
+ return GNUNET_SYSERR;
+ }
+ if (GNUNET_OK !=
+ TALER_config_get_amount (cfg,
+ "exchange",
+ "AML_THRESHOLD",
+ &aml_threshold))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Need non-zero value in section `TALER' under `CURRENCY_ROUND_UNIT'\n");
+ "Need amount in section `exchange' under `AML_THRESHOLD'\n");
return GNUNET_SYSERR;
}
@@ -341,6 +383,7 @@ release_shard (struct Shard *s)
case GNUNET_DB_STATUS_SOFT_ERROR:
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR != qs);
GNUNET_break (0);
+ global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
@@ -353,109 +396,286 @@ release_shard (struct Shard *s)
}
+/**
+ * Trigger the wire transfer for the @a au_active
+ * and delete the record of the aggregation.
+ *
+ * @param au_active information about the aggregation
+ */
+static enum GNUNET_DB_QueryStatus
+trigger_wire_transfer (const struct AggregationUnit *au_active)
+{
+ enum GNUNET_DB_QueryStatus qs;
+
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Preparing wire transfer of %s to %s\n",
+ TALER_amount2s (&au_active->final_amount),
+ TALER_B2S (&au_active->merchant_pub));
+ {
+ void *buf;
+ size_t buf_size;
+
+ TALER_BANK_prepare_transfer (au_active->payto_uri,
+ &au_active->final_amount,
+ exchange_base_url,
+ &au_active->wtid,
+ &buf,
+ &buf_size);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Storing %u bytes of wire prepare data\n",
+ (unsigned int) buf_size);
+ /* Commit our intention to execute the wire transfer! */
+ qs = db_plugin->wire_prepare_data_insert (db_plugin->cls,
+ au_active->wa->method,
+ buf,
+ buf_size);
+ GNUNET_free (buf);
+ }
+ /* Commit the WTID data to 'wire_out' */
+ if (qs >= 0)
+ qs = db_plugin->store_wire_transfer_out (db_plugin->cls,
+ au_active->execution_time,
+ &au_active->wtid,
+ &au_active->h_payto,
+ au_active->wa->section_name,
+ &au_active->final_amount);
+
+ if ( (qs >= 0) &&
+ au_active->have_transient)
+ qs = db_plugin->delete_aggregation_transient (db_plugin->cls,
+ &au_active->h_payto,
+ &au_active->wtid);
+ return qs;
+}
+
+
+/**
+ * Callback to return all applicable amounts for the KYC
+ * decision to @ a cb.
+ *
+ * @param cls a `struct AggregationUnit *`
+ * @param limit time limit for the iteration
+ * @param cb function to call with the amounts
+ * @param cb_cls closure for @a cb
+ */
static void
-run_aggregation (void *cls)
+return_relevant_amounts (void *cls,
+ struct GNUNET_TIME_Absolute limit,
+ TALER_EXCHANGEDB_KycAmountCallback cb,
+ void *cb_cls)
{
- struct Shard *s = cls;
- struct AggregationUnit au_active;
+ const struct AggregationUnit *au_active = cls;
enum GNUNET_DB_QueryStatus qs;
- struct TALER_Amount trans;
- bool have_transient = true; /* squash compiler warning */
- task = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Checking for ready deposits to aggregate\n");
- /* make sure we have current fees */
- memset (&au_active,
- 0,
- sizeof (au_active));
- au_active.execution_time = GNUNET_TIME_timestamp_get ();
+ "Returning amount %s in KYC check\n",
+ TALER_amount2s (&au_active->total_amount));
if (GNUNET_OK !=
- db_plugin->start_deferred_wire_out (db_plugin->cls))
+ cb (cb_cls,
+ &au_active->total_amount,
+ GNUNET_TIME_absolute_get ()))
+ return;
+ qs = db_plugin->select_aggregation_amounts_for_kyc_check (
+ db_plugin->cls,
+ &au_active->h_payto,
+ limit,
+ cb,
+ cb_cls);
+ if (GNUNET_DB_STATUS_HARD_ERROR == qs)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to start database transaction!\n");
- global_ret = EXIT_FAILURE;
- GNUNET_SCHEDULER_shutdown ();
- release_shard (s);
- return;
+ "Failed to select aggregation amounts for KYC limit check!\n");
}
- qs = db_plugin->get_ready_deposit (
+}
+
+
+/**
+ * Test if KYC is required for a transfer to @a h_payto.
+ *
+ * @param[in,out] au_active aggregation unit to check for
+ * @return true if KYC checks are satisfied
+ */
+static bool
+kyc_satisfied (struct AggregationUnit *au_active)
+{
+ char *requirement;
+ enum GNUNET_DB_QueryStatus qs;
+
+ if (kyc_off)
+ return true;
+ qs = TALER_KYCLOGIC_kyc_test_required (
+ TALER_KYCLOGIC_KYC_TRIGGER_DEPOSIT,
+ &au_active->h_payto,
+ db_plugin->select_satisfied_kyc_processes,
db_plugin->cls,
- s->shard_start,
- s->shard_end,
- kyc_off ? true : false,
- &au_active.merchant_pub,
- &au_active.payto_uri);
- switch (qs)
+ &return_relevant_amounts,
+ (void *) au_active,
+ &requirement);
+ if (qs < 0)
+ {
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ return false;
+ }
+ if (NULL == requirement)
+ return true;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "KYC requirement for %s is %s\n",
+ TALER_amount2s (&au_active->total_amount),
+ requirement);
+ qs = db_plugin->insert_kyc_requirement_for_account (
+ db_plugin->cls,
+ requirement,
+ &au_active->h_payto,
+ NULL, /* not a reserve */
+ &au_active->requirement_row);
+ if (qs < 0)
{
- case GNUNET_DB_STATUS_HARD_ERROR:
- cleanup_au (&au_active);
- db_plugin->rollback (db_plugin->cls);
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to begin deposit iteration!\n");
- global_ret = EXIT_FAILURE;
- GNUNET_SCHEDULER_shutdown ();
- release_shard (s);
- return;
- case GNUNET_DB_STATUS_SOFT_ERROR:
- cleanup_au (&au_active);
- db_plugin->rollback (db_plugin->cls);
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- s);
- return;
- case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
- {
- uint64_t counter = s->work_counter;
- struct GNUNET_TIME_Relative duration
- = GNUNET_TIME_absolute_get_duration (s->start_time.abs_time);
+ "Failed to persist KYC requirement `%s' in DB!\n",
+ requirement);
+ }
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Legitimization process %llu started\n",
+ (unsigned long long) au_active->requirement_row);
+ }
+ GNUNET_free (requirement);
+ return false;
+}
- cleanup_au (&au_active);
- db_plugin->rollback (db_plugin->cls);
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Completed shard [%u,%u] after %s with %llu deposits\n",
- (unsigned int) s->shard_start,
- (unsigned int) s->shard_end,
- GNUNET_TIME_relative2s (duration,
- true),
- (unsigned long long) counter);
- release_shard (s);
- if ( (GNUNET_YES == test_mode) &&
- (0 == counter) )
- {
- /* in test mode, shutdown after a shard is done with 0 work */
- GNUNET_SCHEDULER_shutdown ();
- return;
- }
- GNUNET_assert (NULL == task);
- /* If we ended up doing zero work, sleep a bit */
- if (0 == counter)
- task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval,
- &run_shard,
- NULL);
- else
- task = GNUNET_SCHEDULER_add_now (&run_shard,
- NULL);
- return;
+
+/**
+ * Function called on each @a amount that was found to
+ * be relevant for an AML check.
+ *
+ * @param cls closure with the `struct TALER_Amount *` where we store the sum
+ * @param amount encountered transaction amount
+ * @param date when was the amount encountered
+ * @return #GNUNET_OK to continue to iterate,
+ * #GNUNET_NO to abort iteration
+ * #GNUNET_SYSERR on internal error (also abort itaration)
+ */
+static enum GNUNET_GenericReturnValue
+sum_for_aml (
+ void *cls,
+ const struct TALER_Amount *amount,
+ struct GNUNET_TIME_Absolute date)
+{
+ struct TALER_Amount *sum = cls;
+
+ (void) date;
+ if (0 >
+ TALER_amount_add (sum,
+ sum,
+ amount))
+ {
+ GNUNET_break (0);
+ return GNUNET_SYSERR;
+ }
+ return GNUNET_OK;
+}
+
+
+/**
+ * Test if AML is required for a transfer to @a h_payto.
+ *
+ * @param[in,out] au_active aggregation unit to check for
+ * @return true if AML checks are satisfied
+ */
+static bool
+aml_satisfied (struct AggregationUnit *au_active)
+{
+ enum GNUNET_DB_QueryStatus qs;
+ struct TALER_Amount total;
+ struct TALER_Amount threshold;
+ enum TALER_AmlDecisionState decision;
+ struct TALER_EXCHANGEDB_KycStatus kyc;
+
+ total = au_active->final_amount;
+ qs = db_plugin->select_aggregation_amounts_for_kyc_check (
+ db_plugin->cls,
+ &au_active->h_payto,
+ GNUNET_TIME_absolute_subtract (GNUNET_TIME_absolute_get (),
+ GNUNET_TIME_UNIT_MONTHS),
+ &sum_for_aml,
+ &total);
+ if (qs < 0)
+ {
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ return false;
+ }
+ qs = db_plugin->select_aml_threshold (db_plugin->cls,
+ &au_active->h_payto,
+ &decision,
+ &kyc,
+ &threshold);
+ if (qs < 0)
+ {
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ return false;
+ }
+ if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
+ {
+ threshold = aml_threshold; /* use default */
+ decision = TALER_AML_NORMAL;
+ }
+ switch (decision)
+ {
+ case TALER_AML_NORMAL:
+ if (0 >= TALER_amount_cmp (&total,
+ &threshold))
+ {
+ /* total <= threshold, do nothing */
+ return true;
}
- case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
- s->work_counter++;
- /* continued below */
- break;
+ qs = db_plugin->trigger_aml_process (db_plugin->cls,
+ &au_active->h_payto,
+ &total);
+ if (qs < 0)
+ {
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ return false;
+ }
+ return false;
+ case TALER_AML_PENDING:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "AML already pending, doing nothing\n");
+ return false;
+ case TALER_AML_FROZEN:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Account frozen, doing nothing\n");
+ return false;
}
- au_active.wa = TALER_EXCHANGEDB_find_account_by_payto_uri (
- au_active.payto_uri);
- if (NULL == au_active.wa)
+ GNUNET_assert (0);
+ return false;
+}
+
+
+/**
+ * Perform the main aggregation work for @a au. Expects to be in
+ * a working transaction, which the caller must also ultimately commit
+ * (or rollback) depending on our return value.
+ *
+ * @param[in,out] au aggregation unit to work on
+ * @return #GNUNET_OK if aggregation succeeded,
+ * #GNUNET_NO to rollback and try again (serialization issue)
+ * #GNUNET_SYSERR hard error, terminate aggregator process
+ */
+static enum GNUNET_GenericReturnValue
+do_aggregate (struct AggregationUnit *au)
+{
+ enum GNUNET_DB_QueryStatus qs;
+
+ au->wa = TALER_EXCHANGEDB_find_account_by_payto_uri (
+ au->payto_uri);
+ if (NULL == au->wa)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"No exchange account configured for `%s', please fix your setup to continue!\n",
- au_active.payto_uri);
+ au->payto_uri);
global_ret = EXIT_FAILURE;
- GNUNET_SCHEDULER_shutdown ();
- db_plugin->rollback (db_plugin->cls);
- release_shard (s);
- return;
+ return GNUNET_SYSERR;
}
{
@@ -464,234 +684,301 @@ run_aggregation (void *cls)
struct TALER_MasterSignatureP master_sig;
qs = db_plugin->get_wire_fee (db_plugin->cls,
- au_active.wa->method,
- au_active.execution_time,
+ au->wa->method,
+ au->execution_time,
&start_date,
&end_date,
- &au_active.fees,
+ &au->fees,
&master_sig);
if (0 >= qs)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Could not get wire fees for %s at %s. Aborting run.\n",
- au_active.wa->method,
- GNUNET_TIME_timestamp2s (au_active.execution_time));
+ au->wa->method,
+ GNUNET_TIME_timestamp2s (au->execution_time));
global_ret = EXIT_FAILURE;
- GNUNET_SCHEDULER_shutdown ();
- db_plugin->rollback (db_plugin->cls);
- release_shard (s);
- return;
+ return GNUNET_SYSERR;
}
}
-
/* Now try to find other deposits to aggregate */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Found ready deposit for %s, aggregating by target %s\n",
- TALER_B2S (&au_active.merchant_pub),
- au_active.payto_uri);
- TALER_payto_hash (au_active.payto_uri,
- &au_active.h_payto);
-
+ TALER_B2S (&au->merchant_pub),
+ au->payto_uri);
qs = db_plugin->select_aggregation_transient (db_plugin->cls,
- &au_active.h_payto,
- au_active.wa->section_name,
- &au_active.wtid,
- &trans);
+ &au->h_payto,
+ &au->merchant_pub,
+ au->wa->section_name,
+ &au->wtid,
+ &au->trans);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to lookup transient aggregates!\n");
- cleanup_au (&au_active);
- db_plugin->rollback (db_plugin->cls);
global_ret = EXIT_FAILURE;
- GNUNET_SCHEDULER_shutdown ();
- release_shard (s);
- return;
+ return GNUNET_SYSERR;
case GNUNET_DB_STATUS_SOFT_ERROR:
/* serializiability issue, try again */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Serialization issue, trying again later!\n");
- db_plugin->rollback (db_plugin->cls);
- cleanup_au (&au_active);
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- s);
- return;
+ return GNUNET_NO;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
- &au_active.wtid,
- sizeof (au_active.wtid));
- have_transient = false;
+ &au->wtid,
+ sizeof (au->wtid));
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "No transient aggregation found, starting %s\n",
+ TALER_B2S (&au->wtid));
+ au->have_transient = false;
break;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
- have_transient = true;
+ au->have_transient = true;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Transient aggregation found, resuming %s\n",
+ TALER_B2S (&au->wtid));
break;
}
qs = db_plugin->aggregate (db_plugin->cls,
- &au_active.h_payto,
- &au_active.merchant_pub,
- &au_active.wtid,
- &au_active.total_amount);
+ &au->h_payto,
+ &au->merchant_pub,
+ &au->wtid,
+ &au->total_amount);
if (GNUNET_DB_STATUS_HARD_ERROR == qs)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to execute aggregation!\n");
- cleanup_au (&au_active);
- db_plugin->rollback (db_plugin->cls);
global_ret = EXIT_FAILURE;
- GNUNET_SCHEDULER_shutdown ();
- release_shard (s);
- return;
+ return GNUNET_SYSERR;
}
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
{
/* serializiability issue, try again */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Serialization issue, trying again later!\n");
- db_plugin->rollback (db_plugin->cls);
- cleanup_au (&au_active);
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- s);
- return;
+ return GNUNET_NO;
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Aggregation total is %s.\n",
- TALER_amount2s (&au_active.total_amount));
-
+ TALER_amount2s (&au->total_amount));
/* Subtract wire transfer fee and round to the unit supported by the
wire transfer method; Check if after rounding down, we still have
an amount to transfer, and if not mark as 'tiny'. */
- if (have_transient)
+ if (au->have_transient)
GNUNET_assert (0 <=
- TALER_amount_add (&au_active.total_amount,
- &au_active.total_amount,
- &trans));
+ TALER_amount_add (&au->total_amount,
+ &au->total_amount,
+ &au->trans));
+
+
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Rounding aggregate of %s\n",
- TALER_amount2s (&au_active.total_amount));
+ TALER_amount2s (&au->total_amount));
if ( (0 >=
- TALER_amount_subtract (&au_active.final_amount,
- &au_active.total_amount,
- &au_active.fees.wire)) ||
+ TALER_amount_subtract (&au->final_amount,
+ &au->total_amount,
+ &au->fees.wire)) ||
(GNUNET_SYSERR ==
- TALER_amount_round_down (&au_active.final_amount,
+ TALER_amount_round_down (&au->final_amount,
&currency_round_unit)) ||
- (TALER_amount_is_zero (&au_active.final_amount)) )
+ (TALER_amount_is_zero (&au->final_amount)) ||
+ (! kyc_satisfied (au)) ||
+ (! aml_satisfied (au)) )
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Aggregate value too low for transfer (%d/%s)\n",
+ "Not ready for wire transfer (%d/%s)\n",
qs,
- TALER_amount2s (&au_active.final_amount));
- if (have_transient)
+ TALER_amount2s (&au->final_amount));
+ if (au->have_transient)
qs = db_plugin->update_aggregation_transient (db_plugin->cls,
- &au_active.h_payto,
- &au_active.wtid,
- &au_active.total_amount);
+ &au->h_payto,
+ &au->wtid,
+ au->requirement_row,
+ &au->total_amount);
else
qs = db_plugin->create_aggregation_transient (db_plugin->cls,
- &au_active.h_payto,
- au_active.wa->section_name,
- &au_active.wtid,
- &au_active.total_amount);
+ &au->h_payto,
+ au->wa->section_name,
+ &au->merchant_pub,
+ &au->wtid,
+ au->requirement_row,
+ &au->total_amount);
if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Serialization issue, trying again later!\n");
- db_plugin->rollback (db_plugin->cls);
- cleanup_au (&au_active);
- /* start again */
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- s);
- return;
+ return GNUNET_NO;
}
if (GNUNET_DB_STATUS_HARD_ERROR == qs)
{
GNUNET_break (0);
- db_plugin->rollback (db_plugin->cls);
- cleanup_au (&au_active);
global_ret = EXIT_FAILURE;
- GNUNET_SCHEDULER_shutdown ();
- release_shard (s);
- return;
+ return GNUNET_SYSERR;
}
/* commit */
- (void) commit_or_warn ();
- cleanup_au (&au_active);
+ return GNUNET_OK;
+ }
- /* start again */
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- s);
- return;
+ qs = trigger_wire_transfer (au);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Serialization issue during aggregation; trying again later!\n")
+ ;
+ return GNUNET_NO;
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ global_ret = EXIT_FAILURE;
+ return GNUNET_SYSERR;
+ default:
+ break;
}
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Preparing wire transfer of %s to %s\n",
- TALER_amount2s (&au_active.final_amount),
- TALER_B2S (&au_active.merchant_pub));
{
- void *buf;
- size_t buf_size;
-
- TALER_BANK_prepare_transfer (au_active.payto_uri,
- &au_active.final_amount,
- exchange_base_url,
- &au_active.wtid,
- &buf,
- &buf_size);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Storing %u bytes of wire prepare data\n",
- (unsigned int) buf_size);
- /* Commit our intention to execute the wire transfer! */
- qs = db_plugin->wire_prepare_data_insert (db_plugin->cls,
- au_active.wa->method,
- buf,
- buf_size);
- GNUNET_free (buf);
+ struct TALER_CoinDepositEventP rep = {
+ .header.size = htons (sizeof (rep)),
+ .header.type = htons (TALER_DBEVENT_EXCHANGE_DEPOSIT_STATUS_CHANGED),
+ .merchant_pub = au->merchant_pub
+ };
+
+ db_plugin->event_notify (db_plugin->cls,
+ &rep.header,
+ NULL,
+ 0);
}
- /* Commit the WTID data to 'wire_out' */
- if (qs >= 0)
- qs = db_plugin->store_wire_transfer_out (db_plugin->cls,
- au_active.execution_time,
- &au_active.wtid,
- &au_active.h_payto,
- au_active.wa->section_name,
- &au_active.final_amount);
+ return GNUNET_OK;
- if ( (qs >= 0) &&
- have_transient)
- qs = db_plugin->delete_aggregation_transient (db_plugin->cls,
- &au_active.h_payto,
- &au_active.wtid);
- cleanup_au (&au_active);
+}
- if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
+
+static void
+run_aggregation (void *cls)
+{
+ struct Shard *s = cls;
+ struct AggregationUnit au_active;
+ enum GNUNET_DB_QueryStatus qs;
+ enum GNUNET_GenericReturnValue ret;
+
+ task = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Checking for ready deposits to aggregate\n");
+ /* make sure we have current fees */
+ memset (&au_active,
+ 0,
+ sizeof (au_active));
+ au_active.execution_time = GNUNET_TIME_timestamp_get ();
+ if (GNUNET_OK !=
+ db_plugin->start_deferred_wire_out (db_plugin->cls))
{
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Serialization issue for prepared wire data; trying again later!\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to start database transaction!\n");
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ release_shard (s);
+ return;
+ }
+ qs = db_plugin->get_ready_deposit (
+ db_plugin->cls,
+ s->shard_start,
+ s->shard_end,
+ &au_active.merchant_pub,
+ &au_active.payto_uri);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ cleanup_au (&au_active);
+ db_plugin->rollback (db_plugin->cls);
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to begin deposit iteration!\n");
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ release_shard (s);
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ cleanup_au (&au_active);
db_plugin->rollback (db_plugin->cls);
- /* start again */
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
s);
return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ {
+ uint64_t counter = s->work_counter;
+ struct GNUNET_TIME_Relative duration
+ = GNUNET_TIME_absolute_get_duration (s->start_time.abs_time);
+
+ cleanup_au (&au_active);
+ db_plugin->rollback (db_plugin->cls);
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Completed shard [%u,%u] after %s with %llu deposits\n",
+ (unsigned int) s->shard_start,
+ (unsigned int) s->shard_end,
+ GNUNET_TIME_relative2s (duration,
+ true),
+ (unsigned long long) counter);
+ release_shard (s);
+ if ( (GNUNET_YES == test_mode) &&
+ (0 == counter) )
+ {
+ /* in test mode, shutdown after a shard is done with 0 work */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "No work done and in test mode, shutting down\n");
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ GNUNET_assert (NULL == task);
+ /* If we ended up doing zero work, sleep a bit */
+ if (0 == counter)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Going to sleep for %s before trying again\n",
+ GNUNET_TIME_relative2s (aggregator_idle_sleep_interval,
+ true));
+ task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval,
+ &drain_kyc_alerts,
+ NULL);
+ }
+ else
+ {
+ task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
+ NULL);
+ }
+ return;
+ }
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ s->work_counter++;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Found ready deposit!\n");
+ /* continued below */
+ break;
}
- if (GNUNET_DB_STATUS_HARD_ERROR == qs)
+
+ TALER_payto_hash (au_active.payto_uri,
+ &au_active.h_payto);
+ ret = do_aggregate (&au_active);
+ cleanup_au (&au_active);
+ switch (ret)
{
- GNUNET_break (0);
- db_plugin->rollback (db_plugin->cls);
- /* die hard */
+ case GNUNET_SYSERR:
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
+ db_plugin->rollback (db_plugin->cls);
release_shard (s);
return;
+ case GNUNET_NO:
+ db_plugin->rollback (db_plugin->cls);
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&run_aggregation,
+ s);
+ return;
+ case GNUNET_OK:
+ /* continued below */
+ break;
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Stored wire transfer out instructions\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Committing aggregation result\n");
/* Now we can finally commit the overall transaction, as we are
again consistent if all of this passes. */
@@ -699,8 +986,8 @@ run_aggregation (void *cls)
{
case GNUNET_DB_STATUS_SOFT_ERROR:
/* try again */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Commit issue for prepared wire data; trying again later!\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Serialization issue on commit; trying again later!\n");
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
s);
@@ -714,7 +1001,7 @@ run_aggregation (void *cls)
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Preparation complete, going again\n");
+ "Commit complete, going again\n");
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
s);
@@ -743,6 +1030,8 @@ run_shard (void *cls)
(void) cls;
task = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Running aggregation shard\n");
if (GNUNET_SYSERR ==
db_plugin->preflight (db_plugin->cls))
{
@@ -769,6 +1058,7 @@ run_shard (void *cls)
GNUNET_free (s);
delay = GNUNET_TIME_randomized_backoff (delay,
GNUNET_TIME_UNIT_SECONDS);
+ GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_delayed (delay,
&run_shard,
NULL);
@@ -786,12 +1076,207 @@ run_shard (void *cls)
"Starting shard [%u:%u]!\n",
(unsigned int) s->shard_start,
(unsigned int) s->shard_end);
+ GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
s);
}
/**
+ * Function called on transient aggregations matching
+ * a particular hash of a payto URI.
+ *
+ * @param cls
+ * @param payto_uri corresponding payto URI
+ * @param wtid wire transfer identifier of transient aggregation
+ * @param merchant_pub public key of the merchant
+ * @param total amount aggregated so far
+ * @return true to continue to iterate
+ */
+static bool
+handle_transient_cb (
+ void *cls,
+ const char *payto_uri,
+ const struct TALER_WireTransferIdentifierRawP *wtid,
+ const struct TALER_MerchantPublicKeyP *merchant_pub,
+ const struct TALER_Amount *total)
+{
+ struct AggregationUnit *au = cls;
+
+ if (GNUNET_OK != au->ret)
+ {
+ GNUNET_break (0);
+ return false;
+ }
+ au->payto_uri = GNUNET_strdup (payto_uri);
+ au->wtid = *wtid;
+ au->merchant_pub = *merchant_pub;
+ au->trans = *total;
+ au->have_transient = true;
+ au->ret = do_aggregate (au);
+ GNUNET_free (au->payto_uri);
+ return (GNUNET_OK == au->ret);
+}
+
+
+static void
+drain_kyc_alerts (void *cls)
+{
+ enum GNUNET_DB_QueryStatus qs;
+ struct AggregationUnit au;
+
+ (void) cls;
+ task = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Draining KYC alerts\n");
+ memset (&au,
+ 0,
+ sizeof (au));
+ au.execution_time = GNUNET_TIME_timestamp_get ();
+ if (GNUNET_SYSERR ==
+ db_plugin->preflight (db_plugin->cls))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to obtain database connection!\n");
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ if (GNUNET_OK !=
+ db_plugin->start (db_plugin->cls,
+ "handle kyc alerts"))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to start database transaction!\n");
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ while (1)
+ {
+ qs = db_plugin->drain_kyc_alert (db_plugin->cls,
+ 1,
+ &au.h_payto);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ db_plugin->rollback (db_plugin->cls);
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
+ NULL);
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ db_plugin->rollback (db_plugin->cls);
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
+ NULL);
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ qs = db_plugin->commit (db_plugin->cls);
+ if (qs < 0)
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to commit KYC drain\n");
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&run_shard,
+ NULL);
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ /* handled below */
+ break;
+ }
+
+ au.ret = GNUNET_OK;
+ qs = db_plugin->find_aggregation_transient (db_plugin->cls,
+ &au.h_payto,
+ &handle_transient_cb,
+ &au);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to lookup transient aggregates!\n");
+ db_plugin->rollback (db_plugin->cls);
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
+ NULL);
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ /* serializiability issue, try again */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Serialization issue, trying again later!\n");
+ db_plugin->rollback (db_plugin->cls);
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
+ NULL);
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ continue; /* while (1) */
+ default:
+ break;
+ }
+ break;
+ } /* while(1) */
+
+ {
+ enum GNUNET_GenericReturnValue ret;
+
+ ret = au.ret;
+ cleanup_au (&au);
+ switch (ret)
+ {
+ case GNUNET_SYSERR:
+ GNUNET_break (0);
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ db_plugin->rollback (db_plugin->cls); /* just in case */
+ return;
+ case GNUNET_NO:
+ db_plugin->rollback (db_plugin->cls);
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
+ NULL);
+ return;
+ case GNUNET_OK:
+ /* continued below */
+ break;
+ }
+ }
+
+ switch (commit_or_warn ())
+ {
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ /* try again */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Serialization issue on commit; trying again later!\n");
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
+ NULL);
+ return;
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ db_plugin->rollback (db_plugin->cls); /* just in case */
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Commit complete, going again\n");
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
+ NULL);
+ return;
+ default:
+ GNUNET_break (0);
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ db_plugin->rollback (db_plugin->cls); /* just in case */
+ return;
+ }
+}
+
+
+/**
* First task.
*
* @param cls closure, NULL
@@ -811,7 +1296,8 @@ run (void *cls,
(void) cfgfile;
cfg = c;
- if (GNUNET_OK != parse_wirewatch_config ())
+ if (GNUNET_OK !=
+ parse_aggregator_config ())
{
cfg = NULL;
global_ret = EXIT_NOTCONFIGURED;
@@ -832,11 +1318,18 @@ run (void *cls,
shard_size = 1U + INT32_MAX;
else
shard_size = (uint32_t) ass;
+ if (GNUNET_OK !=
+ TALER_KYCLOGIC_kyc_init (cfg))
+ {
+ cfg = NULL;
+ global_ret = EXIT_NOTCONFIGURED;
+ return;
+ }
+ GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
+ NULL);
GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&run_shard,
+ task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
NULL);
- GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
- cls);
}