summaryrefslogtreecommitdiff
path: root/src/exchange/taler-exchange-aggregator.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/exchange/taler-exchange-aggregator.c')
-rw-r--r--src/exchange/taler-exchange-aggregator.c1233
1 files changed, 735 insertions, 498 deletions
diff --git a/src/exchange/taler-exchange-aggregator.c b/src/exchange/taler-exchange-aggregator.c
index 73bbcc594..691d65ae3 100644
--- a/src/exchange/taler-exchange-aggregator.c
+++ b/src/exchange/taler-exchange-aggregator.c
@@ -1,6 +1,6 @@
/*
This file is part of TALER
- Copyright (C) 2016-2021 Taler Systems SA
+ Copyright (C) 2016-2022 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free Software
@@ -26,7 +26,9 @@
#include "taler_exchangedb_lib.h"
#include "taler_exchangedb_plugin.h"
#include "taler_json_lib.h"
+#include "taler_kyclogic_lib.h"
#include "taler_bank_service.h"
+#include "taler_dbevents.h"
/**
@@ -43,7 +45,13 @@ struct AggregationUnit
struct TALER_MerchantPublicKeyP merchant_pub;
/**
- * Total amount to be transferred, before subtraction of @e wire_fee and rounding down.
+ * Transient amount already found aggregated,
+ * set only if @e have_transient is true.
+ */
+ struct TALER_Amount trans;
+
+ /**
+ * Total amount to be transferred, before subtraction of @e fees.wire and rounding down.
*/
struct TALER_Amount total_amount;
@@ -55,7 +63,7 @@ struct AggregationUnit
/**
* Wire fee we charge for @e wp at @e execution_time.
*/
- struct TALER_Amount wire_fee;
+ struct TALER_WireFeeSet fees;
/**
* Wire transfer identifier we use.
@@ -63,15 +71,10 @@ struct AggregationUnit
struct TALER_WireTransferIdentifierRawP wtid;
/**
- * Row ID of the transaction that started it all.
- */
- uint64_t row_id;
-
- /**
* The current time (which triggered the aggregation and
* defines the wire fee).
*/
- struct GNUNET_TIME_Absolute execution_time;
+ struct GNUNET_TIME_Timestamp execution_time;
/**
* Wire details of the merchant.
@@ -81,7 +84,7 @@ struct AggregationUnit
/**
* Selected wire target for the aggregation.
*/
- uint64_t wire_target;
+ struct TALER_PaytoHashP h_payto;
/**
* Exchange wire account to be used for the preparation and
@@ -90,20 +93,24 @@ struct AggregationUnit
const struct TALER_EXCHANGEDB_AccountInfo *wa;
/**
- * Array of row_ids from the aggregation.
+ * Row in KYC table for legitimization requirements
+ * that are pending for this aggregation, or 0 if none.
*/
- uint64_t additional_rows[TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT];
+ uint64_t requirement_row;
/**
- * Offset specifying how many @e additional_rows are in use.
+ * Set to #GNUNET_OK during transient checking
+ * while everything is OK. Otherwise see return
+ * value of #do_aggregate().
*/
- unsigned int rows_offset;
+ enum GNUNET_GenericReturnValue ret;
/**
- * Set to true if we encountered a refund during #refund_by_coin_cb.
- * Used to wave the deposit fee.
+ * Do we have an entry in the transient table for
+ * this aggregation?
*/
- bool have_refund;
+ bool have_transient;
+
};
@@ -116,7 +123,7 @@ struct Shard
/**
* When did we start processing the shard?
*/
- struct GNUNET_TIME_Absolute start_time;
+ struct GNUNET_TIME_Timestamp start_time;
/**
* Starting row of the shard.
@@ -143,6 +150,12 @@ struct Shard
static struct TALER_Amount currency_round_unit;
/**
+ * What is the largest amount we transfer before triggering
+ * an AML check?
+ */
+static struct TALER_Amount aml_threshold;
+
+/**
* What is the base URL of this exchange? Used in the
* wire transfer subjects so that merchants and governments
* can ask for the list of aggregated deposits.
@@ -171,7 +184,6 @@ static struct TALER_EXCHANGEDB_Plugin *db_plugin;
*/
static struct GNUNET_SCHEDULER_Task *task;
-
/**
* How long should we sleep when idle before trying to find more work?
*/
@@ -199,19 +211,19 @@ static int test_mode;
* Main work function that queries the DB and aggregates transactions
* into larger wire transfers.
*
- * @param cls NULL
+ * @param cls a `struct Shard *`
*/
static void
run_aggregation (void *cls);
/**
- * Select a shard to work on.
+ * Work on transactions unlocked by KYC.
*
* @param cls NULL
*/
static void
-run_shard (void *cls);
+drain_kyc_alerts (void *cls);
/**
@@ -246,6 +258,7 @@ shutdown_task (void *cls)
GNUNET_SCHEDULER_cancel (task);
task = NULL;
}
+ TALER_KYCLOGIC_kyc_done ();
TALER_EXCHANGEDB_plugin_unload (db_plugin);
db_plugin = NULL;
TALER_EXCHANGEDB_unload_accounts ();
@@ -254,12 +267,12 @@ shutdown_task (void *cls)
/**
- * Parse the configuration for wirewatch.
+ * Parse the configuration for aggregator.
*
* @return #GNUNET_OK on success
*/
static enum GNUNET_GenericReturnValue
-parse_wirewatch_config (void)
+parse_aggregator_config (void)
{
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_string (cfg,
@@ -288,11 +301,20 @@ parse_wirewatch_config (void)
"taler",
"CURRENCY_ROUND_UNIT",
&currency_round_unit)) ||
- ( (0 != currency_round_unit.fraction) &&
- (0 != currency_round_unit.value) ) )
+ (TALER_amount_is_zero (&currency_round_unit)) )
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Need non-zero amount in section `taler' under `CURRENCY_ROUND_UNIT'\n");
+ return GNUNET_SYSERR;
+ }
+ if (GNUNET_OK !=
+ TALER_config_get_amount (cfg,
+ "exchange",
+ "AML_THRESHOLD",
+ &aml_threshold))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Need non-zero value in section `TALER' under `CURRENCY_ROUND_UNIT'\n");
+ "Need amount in section `exchange' under `AML_THRESHOLD'\n");
return GNUNET_SYSERR;
}
@@ -318,384 +340,534 @@ parse_wirewatch_config (void)
/**
- * Callback invoked with information about refunds applicable
- * to a particular coin. Subtract refunded amount(s) from
- * the aggregation unit's total amount.
+ * Perform a database commit. If it fails, print a warning.
*
- * @param cls closure with a `struct AggregationUnit *`
- * @param amount_with_fee what was the refunded amount with the fee
- * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop
+ * @return status of commit
*/
-static enum GNUNET_GenericReturnValue
-refund_by_coin_cb (void *cls,
- const struct TALER_Amount *amount_with_fee)
+static enum GNUNET_DB_QueryStatus
+commit_or_warn (void)
{
- struct AggregationUnit *aux = cls;
+ enum GNUNET_DB_QueryStatus qs;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Aggregator subtracts applicable refund of amount %s\n",
- TALER_amount2s (amount_with_fee));
- aux->have_refund = true;
- if (0 >
- TALER_amount_subtract (&aux->total_amount,
- &aux->total_amount,
- amount_with_fee))
+ qs = db_plugin->commit (db_plugin->cls);
+ if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
+ return qs;
+ GNUNET_log ((GNUNET_DB_STATUS_SOFT_ERROR == qs)
+ ? GNUNET_ERROR_TYPE_INFO
+ : GNUNET_ERROR_TYPE_ERROR,
+ "Failed to commit database transaction!\n");
+ return qs;
+}
+
+
+/**
+ * Release lock on shard @a s in the database.
+ * On error, terminates this process.
+ *
+ * @param[in] s shard to free (and memory to release)
+ */
+static void
+release_shard (struct Shard *s)
+{
+ enum GNUNET_DB_QueryStatus qs;
+
+ qs = db_plugin->release_revolving_shard (
+ db_plugin->cls,
+ "aggregator",
+ s->shard_start,
+ s->shard_end);
+ GNUNET_free (s);
+ switch (qs)
{
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR != qs);
GNUNET_break (0);
- return GNUNET_SYSERR;
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ /* Strange, but let's just continue */
+ break;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ /* normal case */
+ break;
}
- return GNUNET_OK;
}
/**
- * Function called with details about deposits that have been made,
- * with the goal of executing the corresponding wire transaction.
+ * Trigger the wire transfer for the @a au_active
+ * and delete the record of the aggregation.
*
- * @param cls a `struct AggregationUnit`
- * @param row_id identifies database entry
- * @param merchant_pub public key of the merchant
- * @param coin_pub public key of the coin
- * @param amount_with_fee amount that was deposited including fee
- * @param deposit_fee amount the exchange gets to keep as transaction fees
- * @param h_contract_terms hash of the proposal data known to merchant and customer
- * @param wire_target target account for the wire transfer
- * @param payto_uri URI of the target account
- * @return transaction status code, #GNUNET_DB_STATUS_SUCCESS_ONE_RESULT to continue to iterate
+ * @param au_active information about the aggregation
*/
static enum GNUNET_DB_QueryStatus
-deposit_cb (void *cls,
- uint64_t row_id,
- const struct TALER_MerchantPublicKeyP *merchant_pub,
- const struct TALER_CoinSpendPublicKeyP *coin_pub,
- const struct TALER_Amount *amount_with_fee,
- const struct TALER_Amount *deposit_fee,
- const struct TALER_PrivateContractHash *h_contract_terms,
- uint64_t wire_target,
- const char *payto_uri)
+trigger_wire_transfer (const struct AggregationUnit *au_active)
{
- struct AggregationUnit *au = cls;
enum GNUNET_DB_QueryStatus qs;
- au->merchant_pub = *merchant_pub;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Aggregator processing payment %s with amount %s\n",
- TALER_B2S (coin_pub),
- TALER_amount2s (amount_with_fee));
- au->row_id = row_id;
- au->total_amount = *amount_with_fee;
- au->have_refund = false;
- qs = db_plugin->select_refunds_by_coin (db_plugin->cls,
- coin_pub,
- &au->merchant_pub,
- h_contract_terms,
- &refund_by_coin_cb,
- au);
- if (0 > qs)
- {
- GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
- return qs;
- }
- if (! au->have_refund)
+ "Preparing wire transfer of %s to %s\n",
+ TALER_amount2s (&au_active->final_amount),
+ TALER_B2S (&au_active->merchant_pub));
{
- struct TALER_Amount ntotal;
+ void *buf;
+ size_t buf_size;
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Non-refunded transaction, subtracting deposit fee %s\n",
- TALER_amount2s (deposit_fee));
- if (0 >
- TALER_amount_subtract (&ntotal,
- amount_with_fee,
- deposit_fee))
- {
- /* This should never happen, issue a warning, but continue processing
- with an amount of zero, least we hang here for good. */
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Fatally malformed record at row %llu over %s (deposit fee exceeds deposited value)\n",
- (unsigned long long) row_id,
- TALER_amount2s (amount_with_fee));
- GNUNET_assert (GNUNET_OK ==
- TALER_amount_set_zero (au->total_amount.currency,
- &au->total_amount));
- }
- else
- {
- au->total_amount = ntotal;
- }
+ TALER_BANK_prepare_transfer (au_active->payto_uri,
+ &au_active->final_amount,
+ exchange_base_url,
+ &au_active->wtid,
+ &buf,
+ &buf_size);
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Storing %u bytes of wire prepare data\n",
+ (unsigned int) buf_size);
+ /* Commit our intention to execute the wire transfer! */
+ qs = db_plugin->wire_prepare_data_insert (db_plugin->cls,
+ au_active->wa->method,
+ buf,
+ buf_size);
+ GNUNET_free (buf);
}
+ /* Commit the WTID data to 'wire_out' */
+ if (qs >= 0)
+ qs = db_plugin->store_wire_transfer_out (db_plugin->cls,
+ au_active->execution_time,
+ &au_active->wtid,
+ &au_active->h_payto,
+ au_active->wa->section_name,
+ &au_active->final_amount);
+
+ if ( (qs >= 0) &&
+ au_active->have_transient)
+ qs = db_plugin->delete_aggregation_transient (db_plugin->cls,
+ &au_active->h_payto,
+ &au_active->wtid);
+ return qs;
+}
+
+
+/**
+ * Callback to return all applicable amounts for the KYC
+ * decision to @ a cb.
+ *
+ * @param cls a `struct AggregationUnit *`
+ * @param limit time limit for the iteration
+ * @param cb function to call with the amounts
+ * @param cb_cls closure for @a cb
+ */
+static void
+return_relevant_amounts (void *cls,
+ struct GNUNET_TIME_Absolute limit,
+ TALER_EXCHANGEDB_KycAmountCallback cb,
+ void *cb_cls)
+{
+ const struct AggregationUnit *au_active = cls;
+ enum GNUNET_DB_QueryStatus qs;
- GNUNET_assert (NULL == au->payto_uri);
- au->payto_uri = GNUNET_strdup (payto_uri);
- au->wire_target = wire_target;
- GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
- &au->wtid,
- sizeof (au->wtid));
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Starting aggregation under H(WTID)=%s, starting amount %s at %llu\n",
- TALER_B2S (&au->wtid),
- TALER_amount2s (amount_with_fee),
- (unsigned long long) row_id);
- au->wa = TALER_EXCHANGEDB_find_account_by_payto_uri (payto_uri);
- if (NULL == au->wa)
+ "Returning amount %s in KYC check\n",
+ TALER_amount2s (&au_active->total_amount));
+ if (GNUNET_OK !=
+ cb (cb_cls,
+ &au_active->total_amount,
+ GNUNET_TIME_absolute_get ()))
+ return;
+ qs = db_plugin->select_aggregation_amounts_for_kyc_check (
+ db_plugin->cls,
+ &au_active->h_payto,
+ limit,
+ cb,
+ cb_cls);
+ if (GNUNET_DB_STATUS_HARD_ERROR == qs)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "No exchange account configured for `%s', please fix your setup to continue!\n",
- payto_uri);
- return GNUNET_DB_STATUS_HARD_ERROR;
+ "Failed to select aggregation amounts for KYC limit check!\n");
}
+}
- /* make sure we have current fees */
- au->execution_time = GNUNET_TIME_absolute_get ();
- (void) GNUNET_TIME_round_abs (&au->execution_time);
- {
- struct TALER_Amount closing_fee;
- struct GNUNET_TIME_Absolute start_date;
- struct GNUNET_TIME_Absolute end_date;
- struct TALER_MasterSignatureP master_sig;
- enum GNUNET_DB_QueryStatus qs;
- qs = db_plugin->get_wire_fee (db_plugin->cls,
- au->wa->method,
- au->execution_time,
- &start_date,
- &end_date,
- &au->wire_fee,
- &closing_fee,
- &master_sig);
- if (0 >= qs)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Could not get wire fees for %s at %s. Aborting run.\n",
- au->wa->method,
- GNUNET_STRINGS_absolute_time_to_string (au->execution_time));
- return GNUNET_DB_STATUS_HARD_ERROR;
- }
- }
+/**
+ * Test if KYC is required for a transfer to @a h_payto.
+ *
+ * @param[in,out] au_active aggregation unit to check for
+ * @return true if KYC checks are satisfied
+ */
+static bool
+kyc_satisfied (struct AggregationUnit *au_active)
+{
+ char *requirement;
+ enum GNUNET_DB_QueryStatus qs;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Aggregator starts aggregation for deposit %llu to %s with wire fee %s\n",
- (unsigned long long) row_id,
- TALER_B2S (&au->wtid),
- TALER_amount2s (&au->wire_fee));
- qs = db_plugin->insert_aggregation_tracking (db_plugin->cls,
- &au->wtid,
- row_id);
- if (qs <= 0)
+ if (kyc_off)
+ return true;
+ qs = TALER_KYCLOGIC_kyc_test_required (
+ TALER_KYCLOGIC_KYC_TRIGGER_DEPOSIT,
+ &au_active->h_payto,
+ db_plugin->select_satisfied_kyc_processes,
+ db_plugin->cls,
+ &return_relevant_amounts,
+ (void *) au_active,
+ &requirement);
+ if (qs < 0)
{
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
- return qs;
+ return false;
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Aggregator marks deposit %llu as done\n",
- (unsigned long long) row_id);
- qs = db_plugin->mark_deposit_done (db_plugin->cls,
- row_id);
- if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
+ if (NULL == requirement)
+ return true;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "KYC requirement for %s is %s\n",
+ TALER_amount2s (&au_active->total_amount),
+ requirement);
+ qs = db_plugin->insert_kyc_requirement_for_account (
+ db_plugin->cls,
+ requirement,
+ &au_active->h_payto,
+ NULL, /* not a reserve */
+ &au_active->requirement_row);
+ if (qs < 0)
{
- GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
- return qs;
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to persist KYC requirement `%s' in DB!\n",
+ requirement);
}
- return qs;
+ else
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Legitimization process %llu started\n",
+ (unsigned long long) au_active->requirement_row);
+ }
+ GNUNET_free (requirement);
+ return false;
}
/**
- * Function called with details about another deposit we
- * can aggregate into an existing aggregation unit.
+ * Function called on each @a amount that was found to
+ * be relevant for an AML check.
*
- * @param cls a `struct AggregationUnit`
- * @param row_id identifies database entry
- * @param coin_pub public key of the coin
- * @param amount_with_fee amount that was deposited including fee
- * @param deposit_fee amount the exchange gets to keep as transaction fees
- * @param h_contract_terms hash of the proposal data known to merchant and customer
- * @return transaction status code
+ * @param cls closure with the `struct TALER_Amount *` where we store the sum
+ * @param amount encountered transaction amount
+ * @param date when was the amount encountered
+ * @return #GNUNET_OK to continue to iterate,
+ * #GNUNET_NO to abort iteration
+ * #GNUNET_SYSERR on internal error (also abort itaration)
*/
-static enum GNUNET_DB_QueryStatus
-aggregate_cb (void *cls,
- uint64_t row_id,
- const struct TALER_CoinSpendPublicKeyP *coin_pub,
- const struct TALER_Amount *amount_with_fee,
- const struct TALER_Amount *deposit_fee,
- const struct TALER_PrivateContractHash *h_contract_terms)
+static enum GNUNET_GenericReturnValue
+sum_for_aml (
+ void *cls,
+ const struct TALER_Amount *amount,
+ struct GNUNET_TIME_Absolute date)
{
- struct AggregationUnit *au = cls;
- struct TALER_Amount old;
- enum GNUNET_DB_QueryStatus qs;
+ struct TALER_Amount *sum = cls;
- if (au->rows_offset >= TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT)
+ (void) date;
+ if (0 >
+ TALER_amount_add (sum,
+ sum,
+ amount))
{
- /* Bug: we asked for at most #TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT results! */
GNUNET_break (0);
- /* Skip this one, but keep going with the overall transaction */
- return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
+ return GNUNET_SYSERR;
}
+ return GNUNET_OK;
+}
- /* add to total */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Adding transaction amount %s from row %llu to aggregation\n",
- TALER_amount2s (amount_with_fee),
- (unsigned long long) row_id);
- /* save the existing total aggregate in 'old', for later */
- old = au->total_amount;
- /* we begin with the total contribution of the current coin */
- au->total_amount = *amount_with_fee;
- /* compute contribution of this coin (after fees) */
- au->have_refund = false;
- qs = db_plugin->select_refunds_by_coin (db_plugin->cls,
- coin_pub,
- &au->merchant_pub,
- h_contract_terms,
- &refund_by_coin_cb,
- au);
- if (0 > qs)
- {
- GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
- return qs;
- }
- if (! au->have_refund)
- {
- struct TALER_Amount tmp;
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Subtracting deposit fee %s for non-refunded coin\n",
- TALER_amount2s (deposit_fee));
- if (0 >
- TALER_amount_subtract (&tmp,
- &au->total_amount,
- deposit_fee))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Fatally malformed record at %llu over amount %s (deposit fee exceeds deposited value)\n",
- (unsigned long long) row_id,
- TALER_amount2s (&au->total_amount));
- GNUNET_assert (GNUNET_OK ==
- TALER_amount_set_zero (old.currency,
- &au->total_amount));
- }
- else
- {
- au->total_amount = tmp;
- }
- }
+/**
+ * Test if AML is required for a transfer to @a h_payto.
+ *
+ * @param[in,out] au_active aggregation unit to check for
+ * @return true if AML checks are satisfied
+ */
+static bool
+aml_satisfied (struct AggregationUnit *au_active)
+{
+ enum GNUNET_DB_QueryStatus qs;
+ struct TALER_Amount total;
+ struct TALER_Amount threshold;
+ enum TALER_AmlDecisionState decision;
+ struct TALER_EXCHANGEDB_KycStatus kyc;
- /* now add the au->total_amount with the (remaining) contribution of
- the current coin to the 'old' value with the current aggregate value */
+ total = au_active->final_amount;
+ qs = db_plugin->select_aggregation_amounts_for_kyc_check (
+ db_plugin->cls,
+ &au_active->h_payto,
+ GNUNET_TIME_absolute_subtract (GNUNET_TIME_absolute_get (),
+ GNUNET_TIME_UNIT_MONTHS),
+ &sum_for_aml,
+ &total);
+ if (qs < 0)
{
- struct TALER_Amount tmp;
-
- if (0 >
- TALER_amount_add (&tmp,
- &au->total_amount,
- &old))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Overflow or currency incompatibility during aggregation at %llu\n",
- (unsigned long long) row_id);
- /* Skip this one, but keep going! */
- au->total_amount = old;
- return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
- }
- au->total_amount = tmp;
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ return false;
}
-
- /* "append" to our list of rows */
- au->additional_rows[au->rows_offset++] = row_id;
- /* insert into aggregation tracking table */
- qs = db_plugin->insert_aggregation_tracking (db_plugin->cls,
- &au->wtid,
- row_id);
- if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
+ qs = db_plugin->select_aml_threshold (db_plugin->cls,
+ &au_active->h_payto,
+ &decision,
+ &kyc,
+ &threshold);
+ if (qs < 0)
{
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
- return qs;
+ return false;
}
- qs = db_plugin->mark_deposit_done (db_plugin->cls,
- row_id);
- if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT != qs)
+ if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
{
- GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
- return qs;
+ threshold = aml_threshold; /* use default */
+ decision = TALER_AML_NORMAL;
}
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Aggregator marked deposit %llu as DONE\n",
- (unsigned long long) row_id);
- return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
+ switch (decision)
+ {
+ case TALER_AML_NORMAL:
+ if (0 >= TALER_amount_cmp (&total,
+ &threshold))
+ {
+ /* total <= threshold, do nothing */
+ return true;
+ }
+ qs = db_plugin->trigger_aml_process (db_plugin->cls,
+ &au_active->h_payto,
+ &total);
+ if (qs < 0)
+ {
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ return false;
+ }
+ return false;
+ case TALER_AML_PENDING:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "AML already pending, doing nothing\n");
+ return false;
+ case TALER_AML_FROZEN:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Account frozen, doing nothing\n");
+ return false;
+ }
+ GNUNET_assert (0);
+ return false;
}
/**
- * Perform a database commit. If it fails, print a warning.
+ * Perform the main aggregation work for @a au. Expects to be in
+ * a working transaction, which the caller must also ultimately commit
+ * (or rollback) depending on our return value.
*
- * @return status of commit
+ * @param[in,out] au aggregation unit to work on
+ * @return #GNUNET_OK if aggregation succeeded,
+ * #GNUNET_NO to rollback and try again (serialization issue)
+ * #GNUNET_SYSERR hard error, terminate aggregator process
*/
-static enum GNUNET_DB_QueryStatus
-commit_or_warn (void)
+static enum GNUNET_GenericReturnValue
+do_aggregate (struct AggregationUnit *au)
{
enum GNUNET_DB_QueryStatus qs;
- qs = db_plugin->commit (db_plugin->cls);
- if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
- return qs;
- GNUNET_log ((GNUNET_DB_STATUS_SOFT_ERROR == qs)
- ? GNUNET_ERROR_TYPE_INFO
- : GNUNET_ERROR_TYPE_ERROR,
- "Failed to commit database transaction!\n");
- return qs;
-}
+ au->wa = TALER_EXCHANGEDB_find_account_by_payto_uri (
+ au->payto_uri);
+ if (NULL == au->wa)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "No exchange account configured for `%s', please fix your setup to continue!\n",
+ au->payto_uri);
+ global_ret = EXIT_FAILURE;
+ return GNUNET_SYSERR;
+ }
+ {
+ struct GNUNET_TIME_Timestamp start_date;
+ struct GNUNET_TIME_Timestamp end_date;
+ struct TALER_MasterSignatureP master_sig;
-/**
- * Release lock on shard @a s in the database.
- * On error, terminates this process.
- *
- * @param[in] s shard to free (and memory to release)
- */
-static void
-release_shard (struct Shard *s)
-{
- enum GNUNET_DB_QueryStatus qs;
+ qs = db_plugin->get_wire_fee (db_plugin->cls,
+ au->wa->method,
+ au->execution_time,
+ &start_date,
+ &end_date,
+ &au->fees,
+ &master_sig);
+ if (0 >= qs)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Could not get wire fees for %s at %s. Aborting run.\n",
+ au->wa->method,
+ GNUNET_TIME_timestamp2s (au->execution_time));
+ global_ret = EXIT_FAILURE;
+ return GNUNET_SYSERR;
+ }
+ }
- qs = db_plugin->release_revolving_shard (
- db_plugin->cls,
- "aggregator",
- s->shard_start,
- s->shard_end);
- GNUNET_free (s);
+ /* Now try to find other deposits to aggregate */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Found ready deposit for %s, aggregating by target %s\n",
+ TALER_B2S (&au->merchant_pub),
+ au->payto_uri);
+ qs = db_plugin->select_aggregation_transient (db_plugin->cls,
+ &au->h_payto,
+ &au->merchant_pub,
+ au->wa->section_name,
+ &au->wtid,
+ &au->trans);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to lookup transient aggregates!\n");
+ global_ret = EXIT_FAILURE;
+ return GNUNET_SYSERR;
case GNUNET_DB_STATUS_SOFT_ERROR:
- GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR != qs);
- GNUNET_break (0);
- GNUNET_SCHEDULER_shutdown ();
- return;
+ /* serializiability issue, try again */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Serialization issue, trying again later!\n");
+ return GNUNET_NO;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
- /* Strange, but let's just continue */
+ GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
+ &au->wtid,
+ sizeof (au->wtid));
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "No transient aggregation found, starting %s\n",
+ TALER_B2S (&au->wtid));
+ au->have_transient = false;
break;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
- /* normal case */
+ au->have_transient = true;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Transient aggregation found, resuming %s\n",
+ TALER_B2S (&au->wtid));
+ break;
+ }
+ qs = db_plugin->aggregate (db_plugin->cls,
+ &au->h_payto,
+ &au->merchant_pub,
+ &au->wtid,
+ &au->total_amount);
+ if (GNUNET_DB_STATUS_HARD_ERROR == qs)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to execute aggregation!\n");
+ global_ret = EXIT_FAILURE;
+ return GNUNET_SYSERR;
+ }
+ if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
+ {
+ /* serializiability issue, try again */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Serialization issue, trying again later!\n");
+ return GNUNET_NO;
+ }
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Aggregation total is %s.\n",
+ TALER_amount2s (&au->total_amount));
+ /* Subtract wire transfer fee and round to the unit supported by the
+ wire transfer method; Check if after rounding down, we still have
+ an amount to transfer, and if not mark as 'tiny'. */
+ if (au->have_transient)
+ GNUNET_assert (0 <=
+ TALER_amount_add (&au->total_amount,
+ &au->total_amount,
+ &au->trans));
+
+
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Rounding aggregate of %s\n",
+ TALER_amount2s (&au->total_amount));
+ if ( (0 >=
+ TALER_amount_subtract (&au->final_amount,
+ &au->total_amount,
+ &au->fees.wire)) ||
+ (GNUNET_SYSERR ==
+ TALER_amount_round_down (&au->final_amount,
+ &currency_round_unit)) ||
+ (TALER_amount_is_zero (&au->final_amount)) ||
+ (! kyc_satisfied (au)) ||
+ (! aml_satisfied (au)) )
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Not ready for wire transfer (%d/%s)\n",
+ qs,
+ TALER_amount2s (&au->final_amount));
+ if (au->have_transient)
+ qs = db_plugin->update_aggregation_transient (db_plugin->cls,
+ &au->h_payto,
+ &au->wtid,
+ au->requirement_row,
+ &au->total_amount);
+ else
+ qs = db_plugin->create_aggregation_transient (db_plugin->cls,
+ &au->h_payto,
+ au->wa->section_name,
+ &au->merchant_pub,
+ &au->wtid,
+ au->requirement_row,
+ &au->total_amount);
+ if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Serialization issue, trying again later!\n");
+ return GNUNET_NO;
+ }
+ if (GNUNET_DB_STATUS_HARD_ERROR == qs)
+ {
+ GNUNET_break (0);
+ global_ret = EXIT_FAILURE;
+ return GNUNET_SYSERR;
+ }
+ /* commit */
+ return GNUNET_OK;
+ }
+
+ qs = trigger_wire_transfer (au);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Serialization issue during aggregation; trying again later!\n")
+ ;
+ return GNUNET_NO;
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ global_ret = EXIT_FAILURE;
+ return GNUNET_SYSERR;
+ default:
break;
}
+ {
+ struct TALER_CoinDepositEventP rep = {
+ .header.size = htons (sizeof (rep)),
+ .header.type = htons (TALER_DBEVENT_EXCHANGE_DEPOSIT_STATUS_CHANGED),
+ .merchant_pub = au->merchant_pub
+ };
+
+ db_plugin->event_notify (db_plugin->cls,
+ &rep.header,
+ NULL,
+ 0);
+ }
+ return GNUNET_OK;
+
}
-/**
- * Main work function that queries the DB and aggregates transactions
- * into larger wire transfers.
- *
- * @param cls a `struct Shard *`
- */
static void
run_aggregation (void *cls)
{
struct Shard *s = cls;
struct AggregationUnit au_active;
enum GNUNET_DB_QueryStatus qs;
+ enum GNUNET_GenericReturnValue ret;
task = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Checking for ready deposits to aggregate\n");
+ /* make sure we have current fees */
+ memset (&au_active,
+ 0,
+ sizeof (au_active));
+ au_active.execution_time = GNUNET_TIME_timestamp_get ();
if (GNUNET_OK !=
db_plugin->start_deferred_wire_out (db_plugin->cls))
{
@@ -706,16 +878,12 @@ run_aggregation (void *cls)
release_shard (s);
return;
}
- memset (&au_active,
- 0,
- sizeof (au_active));
qs = db_plugin->get_ready_deposit (
db_plugin->cls,
s->shard_start,
s->shard_end,
- kyc_off ? true : false,
- &deposit_cb,
- &au_active);
+ &au_active.merchant_pub,
+ &au_active.payto_uri);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
@@ -738,7 +906,7 @@ run_aggregation (void *cls)
{
uint64_t counter = s->work_counter;
struct GNUNET_TIME_Relative duration
- = GNUNET_TIME_absolute_get_duration (s->start_time);
+ = GNUNET_TIME_absolute_get_duration (s->start_time.abs_time);
cleanup_au (&au_active);
db_plugin->rollback (db_plugin->cls);
@@ -746,210 +914,71 @@ run_aggregation (void *cls)
"Completed shard [%u,%u] after %s with %llu deposits\n",
(unsigned int) s->shard_start,
(unsigned int) s->shard_end,
- GNUNET_STRINGS_relative_time_to_string (duration,
- GNUNET_YES),
+ GNUNET_TIME_relative2s (duration,
+ true),
(unsigned long long) counter);
release_shard (s);
if ( (GNUNET_YES == test_mode) &&
(0 == counter) )
{
/* in test mode, shutdown after a shard is done with 0 work */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "No work done and in test mode, shutting down\n");
GNUNET_SCHEDULER_shutdown ();
return;
}
GNUNET_assert (NULL == task);
/* If we ended up doing zero work, sleep a bit */
if (0 == counter)
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Going to sleep for %s before trying again\n",
+ GNUNET_TIME_relative2s (aggregator_idle_sleep_interval,
+ true));
task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval,
- &run_shard,
+ &drain_kyc_alerts,
NULL);
+ }
else
- task = GNUNET_SCHEDULER_add_now (&run_shard,
+ {
+ task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
NULL);
+ }
return;
}
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
s->work_counter++;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Found ready deposit!\n");
/* continued below */
break;
}
- /* Now try to find other deposits to aggregate */
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Found ready deposit for %s, aggregating by target %llu\n",
- TALER_B2S (&au_active.merchant_pub),
- (unsigned long long) au_active.wire_target);
- qs = db_plugin->iterate_matching_deposits (db_plugin->cls,
- au_active.wire_target,
- &au_active.merchant_pub,
- &aggregate_cb,
- &au_active,
- TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT);
- if (GNUNET_DB_STATUS_HARD_ERROR == qs)
+ TALER_payto_hash (au_active.payto_uri,
+ &au_active.h_payto);
+ ret = do_aggregate (&au_active);
+ cleanup_au (&au_active);
+ switch (ret)
{
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to execute deposit iteration!\n");
- cleanup_au (&au_active);
- db_plugin->rollback (db_plugin->cls);
+ case GNUNET_SYSERR:
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
+ db_plugin->rollback (db_plugin->cls);
release_shard (s);
return;
- }
- if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
- {
- /* serializiability issue, try again */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Serialization issue, trying again later!\n");
+ case GNUNET_NO:
db_plugin->rollback (db_plugin->cls);
- cleanup_au (&au_active);
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
s);
return;
+ case GNUNET_OK:
+ /* continued below */
+ break;
}
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Found %d other deposits to combine into wire transfer.\n",
- qs);
-
- /* Subtract wire transfer fee and round to the unit supported by the
- wire transfer method; Check if after rounding down, we still have
- an amount to transfer, and if not mark as 'tiny'. */
- if ( (0 >=
- TALER_amount_subtract (&au_active.final_amount,
- &au_active.total_amount,
- &au_active.wire_fee)) ||
- (GNUNET_SYSERR ==
- TALER_amount_round_down (&au_active.final_amount,
- &currency_round_unit)) ||
- ( (0 == au_active.final_amount.value) &&
- (0 == au_active.final_amount.fraction) ) )
- {
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Aggregate value too low for transfer (%d/%s)\n",
- qs,
- TALER_amount2s (&au_active.final_amount));
- /* Rollback ongoing transaction, as we will not use the respective
- WTID and thus need to remove the tracking data */
- db_plugin->rollback (db_plugin->cls);
- /* There were results, just the value was too low. Start another
- transaction to mark all* of the selected deposits as minor! */
- if (GNUNET_OK !=
- db_plugin->start (db_plugin->cls,
- "aggregator mark tiny transactions"))
- {
- GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
- "Failed to start database transaction!\n");
- global_ret = EXIT_FAILURE;
- cleanup_au (&au_active);
- GNUNET_SCHEDULER_shutdown ();
- release_shard (s);
- return;
- }
- /* Mark transactions by row_id as minor */
- qs = db_plugin->mark_deposit_tiny (db_plugin->cls,
- au_active.row_id);
- if (0 <= qs)
- {
- for (unsigned int i = 0; i<au_active.rows_offset; i++)
- {
- qs = db_plugin->mark_deposit_tiny (db_plugin->cls,
- au_active.additional_rows[i]);
- if (0 > qs)
- break;
- }
- }
- if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Serialization issue, trying again later!\n");
- db_plugin->rollback (db_plugin->cls);
- cleanup_au (&au_active);
- /* start again */
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- s);
- return;
- }
- if (GNUNET_DB_STATUS_HARD_ERROR == qs)
- {
- db_plugin->rollback (db_plugin->cls);
- cleanup_au (&au_active);
- global_ret = EXIT_FAILURE;
- GNUNET_SCHEDULER_shutdown ();
- release_shard (s);
- return;
- }
- /* commit */
- (void) commit_or_warn ();
- cleanup_au (&au_active);
-
- /* start again */
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- s);
- return;
- }
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Preparing wire transfer of %s to %s\n",
- TALER_amount2s (&au_active.final_amount),
- TALER_B2S (&au_active.merchant_pub));
- {
- void *buf;
- size_t buf_size;
-
- TALER_BANK_prepare_transfer (au_active.payto_uri,
- &au_active.final_amount,
- exchange_base_url,
- &au_active.wtid,
- &buf,
- &buf_size);
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Storing %u bytes of wire prepare data\n",
- (unsigned int) buf_size);
- /* Commit our intention to execute the wire transfer! */
- qs = db_plugin->wire_prepare_data_insert (db_plugin->cls,
- au_active.wa->method,
- buf,
- buf_size);
- GNUNET_free (buf);
- }
- /* Commit the WTID data to 'wire_out' to finally satisfy aggregation
- table constraints */
- if (qs >= 0)
- qs = db_plugin->store_wire_transfer_out (db_plugin->cls,
- au_active.execution_time,
- &au_active.wtid,
- au_active.wire_target,
- au_active.wa->section_name,
- &au_active.final_amount);
- cleanup_au (&au_active);
-
- if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
- {
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Serialization issue for prepared wire data; trying again later!\n");
- db_plugin->rollback (db_plugin->cls);
- /* start again */
- GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&run_aggregation,
- s);
- return;
- }
- if (GNUNET_DB_STATUS_HARD_ERROR == qs)
- {
- GNUNET_break (0);
- db_plugin->rollback (db_plugin->cls);
- /* die hard */
- global_ret = EXIT_FAILURE;
- GNUNET_SCHEDULER_shutdown ();
- release_shard (s);
- return;
- }
-
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Stored wire transfer out instructions\n");
+ "Committing aggregation result\n");
/* Now we can finally commit the overall transaction, as we are
again consistent if all of this passes. */
@@ -957,8 +986,8 @@ run_aggregation (void *cls)
{
case GNUNET_DB_STATUS_SOFT_ERROR:
/* try again */
- GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
- "Commit issue for prepared wire data; trying again later!\n");
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Serialization issue on commit; trying again later!\n");
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
s);
@@ -967,11 +996,12 @@ run_aggregation (void *cls)
GNUNET_break (0);
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
+ db_plugin->rollback (db_plugin->cls); /* just in case */
release_shard (s);
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Preparation complete, going again\n");
+ "Commit complete, going again\n");
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
s);
@@ -980,6 +1010,7 @@ run_aggregation (void *cls)
GNUNET_break (0);
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
+ db_plugin->rollback (db_plugin->cls); /* just in case */
release_shard (s);
return;
}
@@ -999,6 +1030,8 @@ run_shard (void *cls)
(void) cls;
task = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Running aggregation shard\n");
if (GNUNET_SYSERR ==
db_plugin->preflight (db_plugin->cls))
{
@@ -1009,7 +1042,7 @@ run_shard (void *cls)
return;
}
s = GNUNET_new (struct Shard);
- s->start_time = GNUNET_TIME_absolute_get ();
+ s->start_time = GNUNET_TIME_timestamp_get ();
qs = db_plugin->begin_revolving_shard (db_plugin->cls,
"aggregator",
shard_size,
@@ -1025,6 +1058,7 @@ run_shard (void *cls)
GNUNET_free (s);
delay = GNUNET_TIME_randomized_backoff (delay,
GNUNET_TIME_UNIT_SECONDS);
+ GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_delayed (delay,
&run_shard,
NULL);
@@ -1042,12 +1076,207 @@ run_shard (void *cls)
"Starting shard [%u:%u]!\n",
(unsigned int) s->shard_start,
(unsigned int) s->shard_end);
+ GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
s);
}
/**
+ * Function called on transient aggregations matching
+ * a particular hash of a payto URI.
+ *
+ * @param cls
+ * @param payto_uri corresponding payto URI
+ * @param wtid wire transfer identifier of transient aggregation
+ * @param merchant_pub public key of the merchant
+ * @param total amount aggregated so far
+ * @return true to continue to iterate
+ */
+static bool
+handle_transient_cb (
+ void *cls,
+ const char *payto_uri,
+ const struct TALER_WireTransferIdentifierRawP *wtid,
+ const struct TALER_MerchantPublicKeyP *merchant_pub,
+ const struct TALER_Amount *total)
+{
+ struct AggregationUnit *au = cls;
+
+ if (GNUNET_OK != au->ret)
+ {
+ GNUNET_break (0);
+ return false;
+ }
+ au->payto_uri = GNUNET_strdup (payto_uri);
+ au->wtid = *wtid;
+ au->merchant_pub = *merchant_pub;
+ au->trans = *total;
+ au->have_transient = true;
+ au->ret = do_aggregate (au);
+ GNUNET_free (au->payto_uri);
+ return (GNUNET_OK == au->ret);
+}
+
+
+static void
+drain_kyc_alerts (void *cls)
+{
+ enum GNUNET_DB_QueryStatus qs;
+ struct AggregationUnit au;
+
+ (void) cls;
+ task = NULL;
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Draining KYC alerts\n");
+ memset (&au,
+ 0,
+ sizeof (au));
+ au.execution_time = GNUNET_TIME_timestamp_get ();
+ if (GNUNET_SYSERR ==
+ db_plugin->preflight (db_plugin->cls))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to obtain database connection!\n");
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ if (GNUNET_OK !=
+ db_plugin->start (db_plugin->cls,
+ "handle kyc alerts"))
+ {
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to start database transaction!\n");
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ while (1)
+ {
+ qs = db_plugin->drain_kyc_alert (db_plugin->cls,
+ 1,
+ &au.h_payto);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ db_plugin->rollback (db_plugin->cls);
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
+ NULL);
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ db_plugin->rollback (db_plugin->cls);
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
+ NULL);
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ qs = db_plugin->commit (db_plugin->cls);
+ if (qs < 0)
+ GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
+ "Failed to commit KYC drain\n");
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&run_shard,
+ NULL);
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
+ /* handled below */
+ break;
+ }
+
+ au.ret = GNUNET_OK;
+ qs = db_plugin->find_aggregation_transient (db_plugin->cls,
+ &au.h_payto,
+ &handle_transient_cb,
+ &au);
+ switch (qs)
+ {
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
+ "Failed to lookup transient aggregates!\n");
+ db_plugin->rollback (db_plugin->cls);
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
+ NULL);
+ return;
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ /* serializiability issue, try again */
+ GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
+ "Serialization issue, trying again later!\n");
+ db_plugin->rollback (db_plugin->cls);
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
+ NULL);
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ continue; /* while (1) */
+ default:
+ break;
+ }
+ break;
+ } /* while(1) */
+
+ {
+ enum GNUNET_GenericReturnValue ret;
+
+ ret = au.ret;
+ cleanup_au (&au);
+ switch (ret)
+ {
+ case GNUNET_SYSERR:
+ GNUNET_break (0);
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ db_plugin->rollback (db_plugin->cls); /* just in case */
+ return;
+ case GNUNET_NO:
+ db_plugin->rollback (db_plugin->cls);
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
+ NULL);
+ return;
+ case GNUNET_OK:
+ /* continued below */
+ break;
+ }
+ }
+
+ switch (commit_or_warn ())
+ {
+ case GNUNET_DB_STATUS_SOFT_ERROR:
+ /* try again */
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Serialization issue on commit; trying again later!\n");
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
+ NULL);
+ return;
+ case GNUNET_DB_STATUS_HARD_ERROR:
+ GNUNET_break (0);
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ db_plugin->rollback (db_plugin->cls); /* just in case */
+ return;
+ case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
+ GNUNET_log (GNUNET_ERROR_TYPE_INFO,
+ "Commit complete, going again\n");
+ GNUNET_assert (NULL == task);
+ task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
+ NULL);
+ return;
+ default:
+ GNUNET_break (0);
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ db_plugin->rollback (db_plugin->cls); /* just in case */
+ return;
+ }
+}
+
+
+/**
* First task.
*
* @param cls closure, NULL
@@ -1067,7 +1296,8 @@ run (void *cls,
(void) cfgfile;
cfg = c;
- if (GNUNET_OK != parse_wirewatch_config ())
+ if (GNUNET_OK !=
+ parse_aggregator_config ())
{
cfg = NULL;
global_ret = EXIT_NOTCONFIGURED;
@@ -1088,11 +1318,18 @@ run (void *cls,
shard_size = 1U + INT32_MAX;
else
shard_size = (uint32_t) ass;
+ if (GNUNET_OK !=
+ TALER_KYCLOGIC_kyc_init (cfg))
+ {
+ cfg = NULL;
+ global_ret = EXIT_NOTCONFIGURED;
+ return;
+ }
+ GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
+ NULL);
GNUNET_assert (NULL == task);
- task = GNUNET_SCHEDULER_add_now (&run_shard,
+ task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
NULL);
- GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
- cls);
}