/*
This file is part of TALER
Copyright (C) 2016-2022 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free Software
Foundation; either version 3, or (at your option) any later version.
TALER is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License along with
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
/**
* @file taler-exchange-aggregator.c
* @brief Process that aggregates outgoing transactions and prepares their execution
* @author Christian Grothoff
*/
#include "platform.h"
#include
#include
#include
#include "taler_exchangedb_lib.h"
#include "taler_exchangedb_plugin.h"
#include "taler_json_lib.h"
#include "taler_kyclogic_lib.h"
#include "taler_bank_service.h"
#include "taler_dbevents.h"
/**
* Information about one aggregation process to be executed. There is
* at most one of these around at any given point in time.
* Note that this limits parallelism, and we might want
* to revise this decision at a later point.
*
* Instances live on the stack of #run_aggregation() and
* #drain_kyc_alerts(); #cleanup_au() releases the owned members.
*/
struct AggregationUnit
{
/**
* Public key of the merchant.
*/
struct TALER_MerchantPublicKeyP merchant_pub;
/**
* Transient amount already found aggregated,
* set only if @e have_transient is true.
* Added to @e total_amount by #do_aggregate().
*/
struct TALER_Amount trans;
/**
* Total amount to be transferred, before subtraction of @e fees.wire and rounding down.
*/
struct TALER_Amount total_amount;
/**
* Final amount to be transferred (after fee and rounding down).
*/
struct TALER_Amount final_amount;
/**
* Wire fees looked up for the wire method of @e wa at @e execution_time.
*/
struct TALER_WireFeeSet fees;
/**
* Wire transfer identifier we use. Either taken from an existing
* transient aggregation or freshly generated at random by
* #do_aggregate().
*/
struct TALER_WireTransferIdentifierRawP wtid;
/**
* The current time (which triggered the aggregation and
* defines the wire fee).
*/
struct GNUNET_TIME_Timestamp execution_time;
/**
* Wire details of the merchant. Heap-allocated string that is
* released via GNUNET_free() (see #cleanup_au()).
*/
char *payto_uri;
/**
* Selected wire target for the aggregation.
*/
struct TALER_PaytoHashP h_payto;
/**
* Exchange wire account to be used for the preparation and
* eventual execution of the aggregate wire transfer.
* Looked up from @e payto_uri in #do_aggregate(); never freed by
* this file.
*/
const struct TALER_EXCHANGEDB_AccountInfo *wa;
/**
* Row in KYC table for legitimization requirements
* that are pending for this aggregation, or 0 if none.
* Filled in by #kyc_satisfied() when a requirement is persisted.
*/
uint64_t requirement_row;
/**
* Set to #GNUNET_OK during transient checking
* while everything is OK. Otherwise see return
* value of #do_aggregate().
* Only used by #drain_kyc_alerts() and #handle_transient_cb().
*/
enum GNUNET_GenericReturnValue ret;
/**
* Do we have an entry in the transient table for
* this aggregation?
*/
bool have_transient;
};
/**
* Work shard we are processing. Allocated by #run_shard() and
* released (including the memory) by #release_shard().
*/
struct Shard
{
/**
* When did we start processing the shard?
* Only used to report the processing duration in the log.
*/
struct GNUNET_TIME_Timestamp start_time;
/**
* Starting row of the shard.
*/
uint32_t shard_start;
/**
* Inclusive end row of the shard.
*/
uint32_t shard_end;
/**
* Number of starting points found in the shard, that is the number
* of times #run_aggregation() found a ready deposit in it.
*/
uint64_t work_counter;
};
/**
* What is the smallest unit we support for wire transfers?
* We will need to round down to a multiple of this amount.
* Must be non-zero; read from option CURRENCY_ROUND_UNIT in
* section "taler".
*/
static struct TALER_Amount currency_round_unit;
/**
* What is the largest amount we transfer before triggering
* an AML check? Read from option AML_THRESHOLD in section
* "exchange"; used as the default when the database has no
* account-specific threshold.
*/
static struct TALER_Amount aml_threshold;
/**
* What is the base URL of this exchange? Used in the
* wire transfer subjects so that merchants and governments
* can ask for the list of aggregated deposits.
*/
static char *exchange_base_url;
/**
* Set to #GNUNET_YES if this exchange does not support KYC checks
* and thus deposits are to be aggregated regardless of the
* KYC status of the target account.
* Controlled by the "-y" / "--kyc-off" command-line flag.
*/
static int kyc_off;
/**
* The exchange's configuration.
*/
static const struct GNUNET_CONFIGURATION_Handle *cfg;
/**
* Our database plugin.
*/
static struct TALER_EXCHANGEDB_Plugin *db_plugin;
/**
* Next task to run, if any.
*/
static struct GNUNET_SCHEDULER_Task *task;
/**
* How long should we sleep when idle before trying to find more work?
*/
static struct GNUNET_TIME_Relative aggregator_idle_sleep_interval;
/**
* How big are the shards we are processing? According to the
* original note, a shard ranges over [X,X+shard_size), i.e. the
* upper bound X+shard_size is excluded, so a shard covers
* shard_size slots. The maximum value for shard_size is INT32_MAX+1.
* NOTE(review): `struct Shard` documents its end row as inclusive;
* confirm against the database plugin how the two relate.
*/
static uint32_t shard_size;
/**
* Value to return from main(). 0 on success, non-zero on errors.
*/
static int global_ret;
/**
* #GNUNET_YES if we are in test mode and should exit when idle.
* Controlled by the "-t" / "--test" command-line flag.
*/
static int test_mode;
/**
* Main work function that queries the DB and aggregates transactions
* into larger wire transfers. Declared early because the task
* functions of this file schedule each other.
*
* @param cls a `struct Shard *`
*/
static void
run_aggregation (void *cls);
/**
* Work on transactions unlocked by KYC. Declared early because
* #run_aggregation() schedules it once a shard is finished.
*
* @param cls NULL
*/
static void
drain_kyc_alerts (void *cls);
/**
 * Release the members of @a au that are owned by it and reset the
 * whole structure to zero.  The structure itself is left alone, as
 * callers keep it on their stack.
 *
 * @param[in,out] au aggregation unit to wipe, must not be NULL
 */
static void
cleanup_au (struct AggregationUnit *au)
{
  GNUNET_assert (NULL != au);
  /* the payto URI is the only member released here */
  GNUNET_free (au->payto_uri);
  memset (au, 0, sizeof (struct AggregationUnit));
}
/**
 * Shutdown handler (CTRL-C or SIGTERM): cancels the pending task,
 * if any, and releases the KYC logic, the database plugin and the
 * loaded exchange accounts.
 *
 * @param cls closure, unused
 */
static void
shutdown_task (void *cls)
{
  struct GNUNET_SCHEDULER_Task *pending = task;

  (void) cls;
  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
              "Running shutdown\n");
  task = NULL;
  if (NULL != pending)
    GNUNET_SCHEDULER_cancel (pending);
  TALER_KYCLOGIC_kyc_done ();
  TALER_EXCHANGEDB_plugin_unload (db_plugin);
  db_plugin = NULL;
  TALER_EXCHANGEDB_unload_accounts ();
  cfg = NULL;
}
/**
* Parse the configuration for aggregator.
*
* @return #GNUNET_OK on success
*/
static enum GNUNET_GenericReturnValue
parse_aggregator_config (void)
{
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_string (cfg,
"exchange",
"BASE_URL",
&exchange_base_url))
{
GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
"exchange",
"BASE_URL");
return GNUNET_SYSERR;
}
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_time (cfg,
"exchange",
"AGGREGATOR_IDLE_SLEEP_INTERVAL",
&aggregator_idle_sleep_interval))
{
GNUNET_log_config_missing (GNUNET_ERROR_TYPE_ERROR,
"exchange",
"AGGREGATOR_IDLE_SLEEP_INTERVAL");
return GNUNET_SYSERR;
}
if ( (GNUNET_OK !=
TALER_config_get_amount (cfg,
"taler",
"CURRENCY_ROUND_UNIT",
¤cy_round_unit)) ||
(TALER_amount_is_zero (¤cy_round_unit)) )
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Need non-zero amount in section `taler' under `CURRENCY_ROUND_UNIT'\n");
return GNUNET_SYSERR;
}
if (GNUNET_OK !=
TALER_config_get_amount (cfg,
"exchange",
"AML_THRESHOLD",
&aml_threshold))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Need amount in section `exchange' under `AML_THRESHOLD'\n");
return GNUNET_SYSERR;
}
if (NULL ==
(db_plugin = TALER_EXCHANGEDB_plugin_load (cfg)))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to initialize DB subsystem\n");
return GNUNET_SYSERR;
}
if (GNUNET_OK !=
TALER_EXCHANGEDB_load_accounts (cfg,
TALER_EXCHANGEDB_ALO_DEBIT))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"No wire accounts configured for debit!\n");
TALER_EXCHANGEDB_plugin_unload (db_plugin);
db_plugin = NULL;
return GNUNET_SYSERR;
}
return GNUNET_OK;
}
/**
* Perform a database commit. If it fails, print a warning.
*
* @return status of commit
*/
static enum GNUNET_DB_QueryStatus
commit_or_warn (void)
{
enum GNUNET_DB_QueryStatus qs;
qs = db_plugin->commit (db_plugin->cls);
if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
return qs;
GNUNET_log ((GNUNET_DB_STATUS_SOFT_ERROR == qs)
? GNUNET_ERROR_TYPE_INFO
: GNUNET_ERROR_TYPE_ERROR,
"Failed to commit database transaction!\n");
return qs;
}
/**
* Release lock on shard @a s in the database.
* On error, terminates this process.
*
* @param[in] s shard to free (and memory to release)
*/
static void
release_shard (struct Shard *s)
{
enum GNUNET_DB_QueryStatus qs;
qs = db_plugin->release_revolving_shard (
db_plugin->cls,
"aggregator",
s->shard_start,
s->shard_end);
GNUNET_free (s);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
case GNUNET_DB_STATUS_SOFT_ERROR:
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR != qs);
GNUNET_break (0);
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
/* Strange, but let's just continue */
break;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
/* normal case */
break;
}
}
/**
* Trigger the wire transfer for the @a au_active
* and delete the record of the aggregation.
*
* @param au_active information about the aggregation
*/
static enum GNUNET_DB_QueryStatus
trigger_wire_transfer (const struct AggregationUnit *au_active)
{
enum GNUNET_DB_QueryStatus qs;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Preparing wire transfer of %s to %s\n",
TALER_amount2s (&au_active->final_amount),
TALER_B2S (&au_active->merchant_pub));
{
void *buf;
size_t buf_size;
TALER_BANK_prepare_transfer (au_active->payto_uri,
&au_active->final_amount,
exchange_base_url,
&au_active->wtid,
&buf,
&buf_size);
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Storing %u bytes of wire prepare data\n",
(unsigned int) buf_size);
/* Commit our intention to execute the wire transfer! */
qs = db_plugin->wire_prepare_data_insert (db_plugin->cls,
au_active->wa->method,
buf,
buf_size);
GNUNET_free (buf);
}
/* Commit the WTID data to 'wire_out' */
if (qs >= 0)
qs = db_plugin->store_wire_transfer_out (db_plugin->cls,
au_active->execution_time,
&au_active->wtid,
&au_active->h_payto,
au_active->wa->section_name,
&au_active->final_amount);
if ( (qs >= 0) &&
au_active->have_transient)
qs = db_plugin->delete_aggregation_transient (db_plugin->cls,
&au_active->h_payto,
&au_active->wtid);
return qs;
}
/**
 * Report all amounts that matter for the KYC decision to @a cb:
 * first the total of the aggregation at hand (with the current
 * time), then the amounts the database returns for the same wire
 * target.  The database is not queried if @a cb does not return
 * #GNUNET_OK for the first amount.
 *
 * @param cls a `struct AggregationUnit *`
 * @param limit time limit for the iteration
 * @param cb function to call with the amounts
 * @param cb_cls closure for @a cb
 */
static void
return_relevant_amounts (void *cls,
                         struct GNUNET_TIME_Absolute limit,
                         TALER_EXCHANGEDB_KycAmountCallback cb,
                         void *cb_cls)
{
  const struct AggregationUnit *unit = cls;

  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
              "Returning amount %s in KYC check\n",
              TALER_amount2s (&unit->total_amount));
  if (GNUNET_OK ==
      cb (cb_cls,
          &unit->total_amount,
          GNUNET_TIME_absolute_get ()))
  {
    enum GNUNET_DB_QueryStatus status;

    status = db_plugin->select_aggregation_amounts_for_kyc_check (
      db_plugin->cls,
      &unit->h_payto,
      limit,
      cb,
      cb_cls);
    if (GNUNET_DB_STATUS_HARD_ERROR == status)
      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                  "Failed to select aggregation amounts for KYC limit check!\n");
  }
}
/**
* Test if KYC is required for a transfer to @a h_payto.
*
* @param[in,out] au_active aggregation unit to check for
* @return true if KYC checks are satisfied
*/
static bool
kyc_satisfied (struct AggregationUnit *au_active)
{
char *requirement;
enum GNUNET_DB_QueryStatus qs;
if (kyc_off)
return true;
qs = TALER_KYCLOGIC_kyc_test_required (
TALER_KYCLOGIC_KYC_TRIGGER_DEPOSIT,
&au_active->h_payto,
db_plugin->select_satisfied_kyc_processes,
db_plugin->cls,
&return_relevant_amounts,
(void *) au_active,
&requirement);
if (qs < 0)
{
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
return false;
}
if (NULL == requirement)
return true;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"KYC requirement for %s is %s\n",
TALER_amount2s (&au_active->total_amount),
requirement);
qs = db_plugin->insert_kyc_requirement_for_account (
db_plugin->cls,
requirement,
&au_active->h_payto,
NULL, /* not a reserve */
&au_active->requirement_row);
if (qs < 0)
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to persist KYC requirement `%s' in DB!\n",
requirement);
}
else
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Legitimization process %llu started\n",
(unsigned long long) au_active->requirement_row);
}
GNUNET_free (requirement);
return false;
}
/**
 * Iterator adding each @a amount that is relevant for an AML check
 * to a running sum.
 *
 * @param cls closure with the `struct TALER_Amount *` where we store the sum
 * @param amount encountered transaction amount
 * @param date when was the amount encountered (ignored)
 * @return #GNUNET_OK to continue to iterate,
 *         #GNUNET_SYSERR if the addition failed (aborts the iteration)
 */
static enum GNUNET_GenericReturnValue
sum_for_aml (
  void *cls,
  const struct TALER_Amount *amount,
  struct GNUNET_TIME_Absolute date)
{
  struct TALER_Amount *running = cls;

  (void) date;
  if (0 <=
      TALER_amount_add (running,
                        running,
                        amount))
    return GNUNET_OK;
  GNUNET_break (0);
  return GNUNET_SYSERR;
}
/**
* Test if AML is required for a transfer to @a h_payto.
*
* @param[in,out] au_active aggregation unit to check for
* @return true if AML checks are satisfied
*/
static bool
aml_satisfied (struct AggregationUnit *au_active)
{
enum GNUNET_DB_QueryStatus qs;
struct TALER_Amount total;
struct TALER_Amount threshold;
enum TALER_AmlDecisionState decision;
struct TALER_EXCHANGEDB_KycStatus kyc;
total = au_active->final_amount;
qs = db_plugin->select_aggregation_amounts_for_kyc_check (
db_plugin->cls,
&au_active->h_payto,
GNUNET_TIME_absolute_subtract (GNUNET_TIME_absolute_get (),
GNUNET_TIME_UNIT_MONTHS),
&sum_for_aml,
&total);
if (qs < 0)
{
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
return false;
}
qs = db_plugin->select_aml_threshold (db_plugin->cls,
&au_active->h_payto,
&decision,
&kyc,
&threshold);
if (qs < 0)
{
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
return false;
}
if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
{
threshold = aml_threshold; /* use default */
decision = TALER_AML_NORMAL;
}
switch (decision)
{
case TALER_AML_NORMAL:
if (0 >= TALER_amount_cmp (&total,
&threshold))
{
/* total <= threshold, do nothing */
return true;
}
qs = db_plugin->trigger_aml_process (db_plugin->cls,
&au_active->h_payto,
&total);
if (qs < 0)
{
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
return false;
}
return false;
case TALER_AML_PENDING:
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"AML already pending, doing nothing\n");
return false;
case TALER_AML_FROZEN:
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Account frozen, doing nothing\n");
return false;
}
GNUNET_assert (0);
return false;
}
/**
 * Perform the main aggregation work for @a au. Expects to be in
 * a working transaction, which the caller must also ultimately commit
 * (or rollback) depending on our return value.
 *
 * Steps: find the exchange account and the wire fee for the target,
 * pick up (or start) the transient aggregation, let the database
 * aggregate the ready deposits, and then either persist the amount
 * as a transient aggregation (amount too small, KYC or AML pending)
 * or trigger the wire transfer and notify listeners.
 *
 * @param[in,out] au aggregation unit to work on
 * @return #GNUNET_OK if aggregation succeeded,
 *         #GNUNET_NO to rollback and try again (serialization issue)
 *         #GNUNET_SYSERR hard error, terminate aggregator process
 */
static enum GNUNET_GenericReturnValue
do_aggregate (struct AggregationUnit *au)
{
  enum GNUNET_DB_QueryStatus qs;

  au->wa = TALER_EXCHANGEDB_find_account_by_payto_uri (
    au->payto_uri);
  if (NULL == au->wa)
  {
    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                "No exchange account configured for `%s', please fix your setup to continue!\n",
                au->payto_uri);
    global_ret = EXIT_FAILURE;
    return GNUNET_SYSERR;
  }

  {
    struct GNUNET_TIME_Timestamp start_date;
    struct GNUNET_TIME_Timestamp end_date;
    struct TALER_MasterSignatureP master_sig;

    qs = db_plugin->get_wire_fee (db_plugin->cls,
                                  au->wa->method,
                                  au->execution_time,
                                  &start_date,
                                  &end_date,
                                  &au->fees,
                                  &master_sig);
    /* not finding a fee is treated like an error */
    if (0 >= qs)
    {
      GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                  "Could not get wire fees for %s at %s. Aborting run.\n",
                  au->wa->method,
                  GNUNET_TIME_timestamp2s (au->execution_time));
      global_ret = EXIT_FAILURE;
      return GNUNET_SYSERR;
    }
  }

  /* Now try to find other deposits to aggregate */
  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Found ready deposit for %s, aggregating by target %s\n",
              TALER_B2S (&au->merchant_pub),
              au->payto_uri);
  qs = db_plugin->select_aggregation_transient (db_plugin->cls,
                                                &au->h_payto,
                                                &au->merchant_pub,
                                                au->wa->section_name,
                                                &au->wtid,
                                                &au->trans);
  switch (qs)
  {
  case GNUNET_DB_STATUS_HARD_ERROR:
    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                "Failed to lookup transient aggregates!\n");
    global_ret = EXIT_FAILURE;
    return GNUNET_SYSERR;
  case GNUNET_DB_STATUS_SOFT_ERROR:
    /* serializability issue, try again */
    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                "Serialization issue, trying again later!\n");
    return GNUNET_NO;
  case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    /* nothing to resume, so pick a fresh wire transfer identifier */
    GNUNET_CRYPTO_random_block (GNUNET_CRYPTO_QUALITY_NONCE,
                                &au->wtid,
                                sizeof (au->wtid));
    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                "No transient aggregation found, starting %s\n",
                TALER_B2S (&au->wtid));
    au->have_transient = false;
    break;
  case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    au->have_transient = true;
    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                "Transient aggregation found, resuming %s\n",
                TALER_B2S (&au->wtid));
    break;
  }
  qs = db_plugin->aggregate (db_plugin->cls,
                             &au->h_payto,
                             &au->merchant_pub,
                             &au->wtid,
                             &au->total_amount);
  if (GNUNET_DB_STATUS_HARD_ERROR == qs)
  {
    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                "Failed to execute aggregation!\n");
    global_ret = EXIT_FAILURE;
    return GNUNET_SYSERR;
  }
  if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
  {
    /* serializability issue, try again */
    GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
                "Serialization issue, trying again later!\n");
    return GNUNET_NO;
  }
  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
              "Aggregation total is %s.\n",
              TALER_amount2s (&au->total_amount));
  /* Include what was already aggregated in the transient entry */
  if (au->have_transient)
    GNUNET_assert (0 <=
                   TALER_amount_add (&au->total_amount,
                                     &au->total_amount,
                                     &au->trans));
  GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
              "Rounding aggregate of %s\n",
              TALER_amount2s (&au->total_amount));
  /* Subtract wire transfer fee and round to the unit supported by the
     wire transfer method; check if after rounding down we still have
     an amount to transfer and whether KYC and AML permit it.  If not,
     the total is (re)stored as a transient aggregation.  Note that the
     checks short-circuit: KYC and AML are only consulted if there is
     a non-zero amount to transfer. */
  if ( (0 >=
        TALER_amount_subtract (&au->final_amount,
                               &au->total_amount,
                               &au->fees.wire)) ||
       (GNUNET_SYSERR ==
        TALER_amount_round_down (&au->final_amount,
                                 &currency_round_unit)) ||
       (TALER_amount_is_zero (&au->final_amount)) ||
       (! kyc_satisfied (au)) ||
       (! aml_satisfied (au)) )
  {
    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                "Not ready for wire transfer (%d/%s)\n",
                qs,
                TALER_amount2s (&au->final_amount));
    if (au->have_transient)
      qs = db_plugin->update_aggregation_transient (db_plugin->cls,
                                                    &au->h_payto,
                                                    &au->wtid,
                                                    au->requirement_row,
                                                    &au->total_amount);
    else
      qs = db_plugin->create_aggregation_transient (db_plugin->cls,
                                                    &au->h_payto,
                                                    au->wa->section_name,
                                                    &au->merchant_pub,
                                                    &au->wtid,
                                                    au->requirement_row,
                                                    &au->total_amount);
    if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
    {
      GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                  "Serialization issue, trying again later!\n");
      return GNUNET_NO;
    }
    if (GNUNET_DB_STATUS_HARD_ERROR == qs)
    {
      GNUNET_break (0);
      global_ret = EXIT_FAILURE;
      return GNUNET_SYSERR;
    }
    /* commit */
    return GNUNET_OK;
  }

  qs = trigger_wire_transfer (au);
  switch (qs)
  {
  case GNUNET_DB_STATUS_SOFT_ERROR:
    GNUNET_log (GNUNET_ERROR_TYPE_INFO,
                "Serialization issue during aggregation; trying again later!\n");
    return GNUNET_NO;
  case GNUNET_DB_STATUS_HARD_ERROR:
    GNUNET_break (0);
    global_ret = EXIT_FAILURE;
    return GNUNET_SYSERR;
  default:
    break;
  }

  /* Tell listeners that the deposit status of this merchant changed */
  {
    struct TALER_CoinDepositEventP rep = {
      .header.size = htons (sizeof (rep)),
      .header.type = htons (TALER_DBEVENT_EXCHANGE_DEPOSIT_STATUS_CHANGED),
      .merchant_pub = au->merchant_pub
    };

    db_plugin->event_notify (db_plugin->cls,
                             &rep.header,
                             NULL,
                             0);
  }
  return GNUNET_OK;
}
/**
* Main work function: within a fresh database transaction, looks for
* one ready deposit in the shard, aggregates it via #do_aggregate()
* and commits. Re-schedules itself for the same shard until the
* database reports no more ready deposits; then the shard is released
* and #drain_kyc_alerts() is scheduled (or, in test mode without any
* work done, the process is shut down).
*
* @param cls a `struct Shard *`; released via #release_shard() on
*        every path that does not re-schedule this function
*/
static void
run_aggregation (void *cls)
{
struct Shard *s = cls;
struct AggregationUnit au_active;
enum GNUNET_DB_QueryStatus qs;
enum GNUNET_GenericReturnValue ret;
task = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Checking for ready deposits to aggregate\n");
/* start from a clean aggregation unit; the execution time set here
   is also the time for which the wire fee is looked up */
memset (&au_active,
0,
sizeof (au_active));
au_active.execution_time = GNUNET_TIME_timestamp_get ();
if (GNUNET_OK !=
db_plugin->start_deferred_wire_out (db_plugin->cls))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start database transaction!\n");
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
release_shard (s);
return;
}
/* on success this fills in the merchant key and the payto URI */
qs = db_plugin->get_ready_deposit (
db_plugin->cls,
s->shard_start,
s->shard_end,
&au_active.merchant_pub,
&au_active.payto_uri);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
/* fatal: give up on the shard and terminate */
cleanup_au (&au_active);
db_plugin->rollback (db_plugin->cls);
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to begin deposit iteration!\n");
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
release_shard (s);
return;
case GNUNET_DB_STATUS_SOFT_ERROR:
/* transient failure: retry immediately with the same shard */
cleanup_au (&au_active);
db_plugin->rollback (db_plugin->cls);
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
s);
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
{
/* shard is done; remember the counter now, as release_shard()
   frees s */
uint64_t counter = s->work_counter;
struct GNUNET_TIME_Relative duration
= GNUNET_TIME_absolute_get_duration (s->start_time.abs_time);
cleanup_au (&au_active);
db_plugin->rollback (db_plugin->cls);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Completed shard [%u,%u] after %s with %llu deposits\n",
(unsigned int) s->shard_start,
(unsigned int) s->shard_end,
GNUNET_TIME_relative2s (duration,
true),
(unsigned long long) counter);
release_shard (s);
if ( (GNUNET_YES == test_mode) &&
(0 == counter) )
{
/* in test mode, shutdown after a shard is done with 0 work */
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"No work done and in test mode, shutting down\n");
GNUNET_SCHEDULER_shutdown ();
return;
}
GNUNET_assert (NULL == task);
/* If we ended up doing zero work, sleep a bit */
if (0 == counter)
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Going to sleep for %s before trying again\n",
GNUNET_TIME_relative2s (aggregator_idle_sleep_interval,
true));
task = GNUNET_SCHEDULER_add_delayed (aggregator_idle_sleep_interval,
&drain_kyc_alerts,
NULL);
}
else
{
task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
NULL);
}
return;
}
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
s->work_counter++;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Found ready deposit!\n");
/* continued below */
break;
}
/* the aggregation is keyed by the hash of the merchant's payto URI */
TALER_payto_hash (au_active.payto_uri,
&au_active.h_payto);
ret = do_aggregate (&au_active);
cleanup_au (&au_active);
switch (ret)
{
case GNUNET_SYSERR:
/* hard error: terminate the aggregator */
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
db_plugin->rollback (db_plugin->cls);
release_shard (s);
return;
case GNUNET_NO:
/* serialization issue: roll back and retry with the same shard */
db_plugin->rollback (db_plugin->cls);
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
s);
return;
case GNUNET_OK:
/* continued below */
break;
}
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Committing aggregation result\n");
/* Now we can finally commit the overall transaction, as we are
again consistent if all of this passes. */
switch (commit_or_warn ())
{
case GNUNET_DB_STATUS_SOFT_ERROR:
/* try again */
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Serialization issue on commit; trying again later!\n");
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
s);
return;
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
db_plugin->rollback (db_plugin->cls); /* just in case */
release_shard (s);
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
/* committed: look for the next ready deposit in the same shard */
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Commit complete, going again\n");
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_aggregation,
s);
return;
default:
/* any other commit status is unexpected and treated as fatal */
GNUNET_break (0);
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
db_plugin->rollback (db_plugin->cls); /* just in case */
release_shard (s);
return;
}
}
/**
 * Select a shard to work on and, on success, schedule
 * #run_aggregation() for it.  If the database reports a soft error,
 * the attempt is repeated after a randomized backoff; any other
 * failure to obtain a shard terminates the process.
 *
 * @param cls NULL
 */
static void
run_shard (void *cls)
{
  struct Shard *s;
  enum GNUNET_DB_QueryStatus qs;

  (void) cls;
  task = NULL;
  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
              "Running aggregation shard\n");
  if (GNUNET_SYSERR ==
      db_plugin->preflight (db_plugin->cls))
  {
    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                "Failed to obtain database connection!\n");
    global_ret = EXIT_FAILURE;
    GNUNET_SCHEDULER_shutdown ();
    return;
  }
  s = GNUNET_new (struct Shard);
  s->start_time = GNUNET_TIME_timestamp_get ();
  qs = db_plugin->begin_revolving_shard (db_plugin->cls,
                                         "aggregator",
                                         shard_size,
                                         1U + INT32_MAX,
                                         &s->shard_start,
                                         &s->shard_end);
  if (0 >= qs)
  {
    /* We did not obtain a shard, so nothing will ever take ownership
       of s: release it on every failure path, not just on the soft
       error one. */
    GNUNET_free (s);
    if (GNUNET_DB_STATUS_SOFT_ERROR == qs)
    {
      /* static, so that the backoff grows across repeated failures */
      static struct GNUNET_TIME_Relative delay;

      delay = GNUNET_TIME_randomized_backoff (delay,
                                              GNUNET_TIME_UNIT_SECONDS);
      GNUNET_assert (NULL == task);
      task = GNUNET_SCHEDULER_add_delayed (delay,
                                           &run_shard,
                                           NULL);
      return;
    }
    GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
                "Failed to begin shard (%d)!\n",
                qs);
    GNUNET_break (GNUNET_DB_STATUS_HARD_ERROR != qs);
    global_ret = EXIT_FAILURE;
    GNUNET_SCHEDULER_shutdown ();
    return;
  }
  GNUNET_log (GNUNET_ERROR_TYPE_INFO,
              "Starting shard [%u:%u]!\n",
              (unsigned int) s->shard_start,
              (unsigned int) s->shard_end);
  GNUNET_assert (NULL == task);
  task = GNUNET_SCHEDULER_add_now (&run_aggregation,
                                   s);
}
/**
 * Callback invoked for each transient aggregation that matches a
 * particular hash of a payto URI.  Copies the details of the
 * transient aggregation into the aggregation unit and runs
 * #do_aggregate() on it; the result is kept in the `ret` member of
 * the aggregation unit.
 *
 * @param cls the `struct AggregationUnit *` to work with
 * @param payto_uri corresponding payto URI
 * @param wtid wire transfer identifier of transient aggregation
 * @param merchant_pub public key of the merchant
 * @param total amount aggregated so far
 * @return true to continue to iterate
 */
static bool
handle_transient_cb (
  void *cls,
  const char *payto_uri,
  const struct TALER_WireTransferIdentifierRawP *wtid,
  const struct TALER_MerchantPublicKeyP *merchant_pub,
  const struct TALER_Amount *total)
{
  struct AggregationUnit *unit = cls;

  if (GNUNET_OK == unit->ret)
  {
    unit->payto_uri = GNUNET_strdup (payto_uri);
    unit->wtid = *wtid;
    unit->merchant_pub = *merchant_pub;
    unit->trans = *total;
    unit->have_transient = true;
    unit->ret = do_aggregate (unit);
    /* the copy of the URI is only needed while aggregating */
    GNUNET_free (unit->payto_uri);
    return (GNUNET_OK == unit->ret);
  }
  /* we should not be called again after an earlier failure */
  GNUNET_break (0);
  return false;
}
/**
* Work on transactions unlocked by KYC: within one database
* transaction, fetches KYC alerts from the database and runs
* #do_aggregate() (via #handle_transient_cb()) on the transient
* aggregations of the affected wire target. Once no alert is left,
* commits and hands over to #run_shard(); after processing an alert
* that had transient aggregations, commits and re-schedules itself.
*
* @param cls NULL
*/
static void
drain_kyc_alerts (void *cls)
{
enum GNUNET_DB_QueryStatus qs;
struct AggregationUnit au;
(void) cls;
task = NULL;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Draining KYC alerts\n");
memset (&au,
0,
sizeof (au));
au.execution_time = GNUNET_TIME_timestamp_get ();
if (GNUNET_SYSERR ==
db_plugin->preflight (db_plugin->cls))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to obtain database connection!\n");
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
}
if (GNUNET_OK !=
db_plugin->start (db_plugin->cls,
"handle kyc alerts"))
{
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to start database transaction!\n");
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
}
/* Fetch alerts until we find one that has transient aggregations,
   or until none is left. */
while (1)
{
/* NOTE(review): meaning of the literal 1 is defined by the database
   plugin and not visible here -- confirm */
qs = db_plugin->drain_kyc_alert (db_plugin->cls,
1,
&au.h_payto);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
/* NOTE(review): unlike most hard errors in this file, this one
   retries immediately instead of terminating -- confirm intent */
GNUNET_break (0);
db_plugin->rollback (db_plugin->cls);
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
NULL);
return;
case GNUNET_DB_STATUS_SOFT_ERROR:
db_plugin->rollback (db_plugin->cls);
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
NULL);
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
/* no alert left: commit and continue with regular shard work;
   a failed commit is only logged */
qs = db_plugin->commit (db_plugin->cls);
if (qs < 0)
GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
"Failed to commit KYC drain\n");
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&run_shard,
NULL);
return;
case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
/* handled below */
break;
}
/* handle_transient_cb() only works while au.ret is GNUNET_OK and
   stores the result of do_aggregate() there */
au.ret = GNUNET_OK;
qs = db_plugin->find_aggregation_transient (db_plugin->cls,
&au.h_payto,
&handle_transient_cb,
&au);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
"Failed to lookup transient aggregates!\n");
db_plugin->rollback (db_plugin->cls);
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
NULL);
return;
case GNUNET_DB_STATUS_SOFT_ERROR:
/* serializability issue, try again */
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Serialization issue, trying again later!\n");
db_plugin->rollback (db_plugin->cls);
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
NULL);
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
/* nothing transient for this alert, move on to the next alert */
continue; /* while (1) */
default:
break;
}
/* transient aggregations were processed, evaluate au.ret below */
break;
} /* while(1) */
{
enum GNUNET_GenericReturnValue ret;
/* save the result first, as cleanup_au() zeroes the whole unit */
ret = au.ret;
cleanup_au (&au);
switch (ret)
{
case GNUNET_SYSERR:
GNUNET_break (0);
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
db_plugin->rollback (db_plugin->cls); /* just in case */
return;
case GNUNET_NO:
/* serialization issue in do_aggregate(): roll back and retry */
db_plugin->rollback (db_plugin->cls);
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
NULL);
return;
case GNUNET_OK:
/* continued below */
break;
}
}
switch (commit_or_warn ())
{
case GNUNET_DB_STATUS_SOFT_ERROR:
/* try again */
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Serialization issue on commit; trying again later!\n");
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
NULL);
return;
case GNUNET_DB_STATUS_HARD_ERROR:
GNUNET_break (0);
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
db_plugin->rollback (db_plugin->cls); /* just in case */
return;
case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
/* committed: check for further alerts */
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Commit complete, going again\n");
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
NULL);
return;
default:
/* any other commit status is unexpected and treated as fatal */
GNUNET_break (0);
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
db_plugin->rollback (db_plugin->cls); /* just in case */
return;
}
}
/**
* First task.
*
* @param cls closure, NULL
* @param args remaining command-line arguments
* @param cfgfile name of the configuration file used (for saving, can be NULL!)
* @param c configuration
*/
static void
run (void *cls,
char *const *args,
const char *cfgfile,
const struct GNUNET_CONFIGURATION_Handle *c)
{
unsigned long long ass;
(void) cls;
(void) args;
(void) cfgfile;
cfg = c;
if (GNUNET_OK !=
parse_aggregator_config ())
{
cfg = NULL;
global_ret = EXIT_NOTCONFIGURED;
return;
}
if (GNUNET_OK !=
GNUNET_CONFIGURATION_get_value_number (cfg,
"exchange",
"AGGREGATOR_SHARD_SIZE",
&ass))
{
cfg = NULL;
global_ret = EXIT_NOTCONFIGURED;
return;
}
if ( (0 == ass) ||
(ass > INT32_MAX) )
shard_size = 1U + INT32_MAX;
else
shard_size = (uint32_t) ass;
if (GNUNET_OK !=
TALER_KYCLOGIC_kyc_init (cfg))
{
cfg = NULL;
global_ret = EXIT_NOTCONFIGURED;
return;
}
GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
NULL);
GNUNET_assert (NULL == task);
task = GNUNET_SCHEDULER_add_now (&drain_kyc_alerts,
NULL);
}
/**
 * Entry point of the taler-exchange-aggregator: parses the command
 * line and runs the GNUnet program loop with #run() as first task.
 *
 * @param argc number of arguments from the command line
 * @param argv command line arguments
 * @return 0 ok, non-zero on error, see #global_ret
 */
int
main (int argc,
      char *const *argv)
{
  struct GNUNET_GETOPT_CommandLineOption options[] = {
    GNUNET_GETOPT_option_timetravel ('T',
                                     "timetravel"),
    GNUNET_GETOPT_option_flag ('t',
                               "test",
                               "run in test mode and exit when idle",
                               &test_mode),
    GNUNET_GETOPT_option_flag ('y',
                               "kyc-off",
                               "perform wire transfers without KYC checks",
                               &kyc_off),
    GNUNET_GETOPT_OPTION_END
  };
  enum GNUNET_GenericReturnValue ret;

  if (GNUNET_OK !=
      GNUNET_STRINGS_get_utf8_args (argc, argv,
                                    &argc, &argv))
    return EXIT_INVALIDARGUMENT;
  TALER_OS_init ();
  ret = GNUNET_PROGRAM_run (
    argc, argv,
    "taler-exchange-aggregator",
    gettext_noop (
      "background process that aggregates and executes wire transfers"),
    options,
    &run, NULL);
  /* release the argument vector obtained from
     GNUNET_STRINGS_get_utf8_args() */
  GNUNET_free_nz ((void *) argv);
  switch (ret)
  {
  case GNUNET_SYSERR:
    return EXIT_INVALIDARGUMENT;
  case GNUNET_NO:
    return EXIT_SUCCESS;
  default:
    return global_ret;
  }
}
/* end of taler-exchange-aggregator.c */