summaryrefslogtreecommitdiff
path: root/src/auditor/taler-helper-auditor-wire.c
diff options
context:
space:
mode:
authorChristian Grothoff <christian@grothoff.org>2023-09-14 17:37:53 +0200
committerChristian Grothoff <christian@grothoff.org>2023-09-14 17:37:53 +0200
commit6e5092d83473dc1b0200d82744cf0f0056b0c110 (patch)
tree3a402d87e500fd4e85efb2ec19dfae6d942d9265 /src/auditor/taler-helper-auditor-wire.c
parent1d088120a5c378ec5fe2d9cfd86353f9b75220c4 (diff)
downloadexchange-6e5092d83473dc1b0200d82744cf0f0056b0c110.tar.gz
exchange-6e5092d83473dc1b0200d82744cf0f0056b0c110.tar.bz2
exchange-6e5092d83473dc1b0200d82744cf0f0056b0c110.zip
more work towards auditor support for AML/KYC
Diffstat (limited to 'src/auditor/taler-helper-auditor-wire.c')
-rw-r--r--src/auditor/taler-helper-auditor-wire.c451
1 files changed, 375 insertions, 76 deletions
diff --git a/src/auditor/taler-helper-auditor-wire.c b/src/auditor/taler-helper-auditor-wire.c
index 71775bd4b..2d2bdbcc2 100644
--- a/src/auditor/taler-helper-auditor-wire.c
+++ b/src/auditor/taler-helper-auditor-wire.c
@@ -1,6 +1,6 @@
/*
This file is part of TALER
- Copyright (C) 2017-2022 Taler Systems SA
+ Copyright (C) 2017-2023 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
@@ -548,14 +548,18 @@ do_shutdown (void *cls)
start_time),
TALER_JSON_pack_time_abs_human ("wire_auditor_end_time",
GNUNET_TIME_absolute_get ()),
- GNUNET_JSON_pack_uint64 ("start_pp_reserve_close_uuid",
+ GNUNET_JSON_pack_uint64 ("start_pp_reserve_close_id",
start_pp.last_reserve_close_uuid),
- GNUNET_JSON_pack_uint64 ("end_pp_reserve_close_uuid",
+ GNUNET_JSON_pack_uint64 ("end_pp_reserve_close_id",
pp.last_reserve_close_uuid),
- TALER_JSON_pack_time_abs_human ("start_pp_last_timestamp",
- start_pp.last_timestamp.abs_time),
- TALER_JSON_pack_time_abs_human ("end_pp_last_timestamp",
- pp.last_timestamp.abs_time),
+ GNUNET_JSON_pack_uint64 ("start_pp_last_batch_deposit_id",
+ start_pp.last_batch_deposit_uuid),
+ GNUNET_JSON_pack_uint64 ("end_pp_last_batch_deposit_id",
+ pp.last_batch_deposit_uuid),
+ GNUNET_JSON_pack_uint64 ("start_pp_last_aggregation_serial_id",
+ start_pp.last_aggregation_serial),
+ GNUNET_JSON_pack_uint64 ("end_pp_last_aggregation_serial_id",
+ pp.last_aggregation_serial),
GNUNET_JSON_pack_array_steal ("account_progress",
report_account_progress)));
report_wire_out_inconsistencies = NULL;
@@ -773,8 +777,7 @@ commit (enum GNUNET_DB_QueryStatus qs)
GNUNET_JSON_pack_uint64 ("end_reserve_in",
wa->pp.last_reserve_in_serial_id),
GNUNET_JSON_pack_uint64 ("start_wire_out",
- wa->start_pp.
- last_wire_out_serial_id),
+ wa->start_pp.last_wire_out_serial_id),
GNUNET_JSON_pack_uint64 ("end_wire_out",
wa->pp.last_wire_out_serial_id))));
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == wa->qsx)
@@ -818,8 +821,9 @@ commit (enum GNUNET_DB_QueryStatus qs)
return qs;
}
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Concluded audit step at %s\n",
- GNUNET_TIME_timestamp2s (pp.last_timestamp));
+ "Concluded audit step at %llu/%llu\n",
+ (unsigned long long) pp.last_aggregation_serial,
+ (unsigned long long) pp.last_batch_deposit_uuid);
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs)
{
@@ -856,60 +860,207 @@ commit (enum GNUNET_DB_QueryStatus qs)
/* ***************************** Analyze required transfers ************************ */
/**
- * Function called on deposits that are past their due date
- * and have not yet seen a wire transfer.
+ * Closure for import_wire_missing_cb().
+ */
+struct ImportMissingWireContext
+{
+ /**
+ * Set to maximum row ID encountered.
+ */
+ uint64_t max_batch_deposit_uuid;
+
+ /**
+ * Set to database errors in callback.
+ */
+ enum GNUNET_DB_QueryStatus err;
+};
+
+
+/**
+ * Function called on deposits that need to be checked for their
+ * wire transfer.
*
- * @param cls closure, points to a `struct GNUNET_TIME_Timestamp`
+ * @param cls closure, points to a `struct ImportMissingWireContext`
+ * @param batch_deposit_serial_id serial of the entry in the batch deposits table
* @param total_amount value of the missing deposits, including fee
- * @param payto_uri where should the funds be wired
+ * @param wire_target_h_payto where should the funds be wired
* @param deadline what was the earliest requested wire transfer deadline
- * @param kyc_pending NULL if no KYC requirement is pending, otherwise text describing the missing KYC requirement
- * @param aml_status status of AML possibly blocking the transfer
- * @param aml_limit current monthly AML limit
*/
static void
-wire_missing_cb (void *cls,
- const struct TALER_Amount *total_amount,
- const char *payto_uri,
- struct GNUNET_TIME_Timestamp deadline,
- const char *kyc_pending,
- enum TALER_AmlDecisionState status,
- const struct TALER_Amount *aml_limit)
+import_wire_missing_cb (void *cls,
+ uint64_t batch_deposit_serial_id,
+ const struct TALER_Amount *total_amount,
+ const struct TALER_PaytoHashP *wire_target_h_payto,
+ struct GNUNET_TIME_Timestamp deadline)
{
- struct GNUNET_TIME_Timestamp *nt = cls;
- json_t *rep;
+ struct ImportMissingWireContext *wc = cls;
+ enum GNUNET_DB_QueryStatus qs;
+
+ if (wc->err < 0)
+ return; /* already failed */
+ GNUNET_assert (batch_deposit_serial_id > wc->max_batch_deposit_uuid);
+ wc->max_batch_deposit_uuid = batch_deposit_serial_id;
+ qs = TALER_ARL_adb->insert_pending_deposit (
+ TALER_ARL_adb->cls,
+ &TALER_ARL_master_pub,
+ batch_deposit_serial_id,
+ wire_target_h_payto,
+ total_amount,
+ deadline);
+ if (qs < 0)
+ wc->err = qs;
+}
+
+
+/**
+ * Information about a delayed wire transfer and the possible
+ * reasons for the delay.
+ */
+struct ReasonDetail
+{
+ /**
+ * Total amount that should have been transferred.
+ */
+ struct TALER_Amount total_amount;
+
+ /**
+ * Earliest deadline for an expected transfer to the account.
+ */
+ struct GNUNET_TIME_Timestamp deadline;
+
+ /**
+ * Target account, NULL if even that is not known (due to
+ * exchange lacking required entry in wire_targets table).
+ */
+ char *payto_uri;
+
+ /**
+ * Reasons due to pending KYC requests.
+ */
+ char *kyc_pending;
+
+ /**
+ * AML decision state for the target account.
+ */
+ enum TALER_AmlDecisionState status;
+
+ /**
+ * Current AML threshold for the account, may be an invalid account if the
+ * default threshold applies.
+ */
+ struct TALER_Amount aml_limit;
+};
+
+/**
+ * Closure for report_wire_missing_cb().
+ */
+struct ReportMissingWireContext
+{
+ /**
+ * Map from wire_target_h_payto to `struct ReasonDetail`.
+ */
+ struct GNUNET_CONTAINER_MultiShortmap *map;
+
+ /**
+ * Set to database errors in callback.
+ */
+ enum GNUNET_DB_QueryStatus err;
+};
+
+
+/**
+ * Closure for #clear_finished_transfer_cb().
+ */
+struct AggregationContext
+{
+ /**
+ * Set to maximum row ID encountered.
+ */
+ uint64_t max_aggregation_serial;
+
+ /**
+ * Set to database errors in callback.
+ */
+ enum GNUNET_DB_QueryStatus err;
+};
+
+
+/**
+ * Free memory allocated in @a value.
+ *
+ * @param cls unused
+ * @param key unused
+ * @param value must be a `struct ReasonDetail`
+ * @return #GNUNET_YES if we should continue to
+ * iterate,
+ * #GNUNET_NO if not.
+ */
+static enum GNUNET_GenericReturnValue
+free_report_entry (void *cls,
+ const struct GNUNET_ShortHashCode *key,
+ void *value)
+{
+ struct ReasonDetail *rd = value;
+
+ GNUNET_free (rd->kyc_pending);
+ GNUNET_free (rd->payto_uri);
+ GNUNET_free (rd);
+ return GNUNET_YES;
+}
+
+
+/**
+ * We had an entry in our map of wire transfers that
+ * should have been performed. Generate report.
+ *
+ * @param cls unused
+ * @param key unused
+ * @param value must be a `struct ReasonDetail`
+ * @return #GNUNET_YES if we should continue to
+ * iterate,
+ * #GNUNET_NO if not.
+ */
+static enum GNUNET_GenericReturnValue
+generate_report (void *cls,
+ const struct GNUNET_ShortHashCode *key,
+ void *value)
+{
+ struct ReasonDetail *rd = value;
- *nt = GNUNET_TIME_timestamp_min (deadline,
- *nt);
- (void) cls;
- // TODO: maybe split up by category?
- TALER_ARL_amount_add (&total_amount_lag,
- &total_amount_lag,
- total_amount);
/* For now, we simplify and only check that the
amount was tiny */
- if (0 > TALER_amount_cmp (total_amount,
+ if (0 > TALER_amount_cmp (&rd->total_amount,
&tiny_amount))
- return; /* acceptable, amount was tiny */
- if (NULL != kyc_pending)
+ return free_report_entry (cls,
+ key,
+ value); /* acceptable, amount was tiny */
+ // TODO: maybe split total_amount_lag up by category below?
+ TALER_ARL_amount_add (&total_amount_lag,
+ &total_amount_lag,
+ &rd->total_amount);
+ if (NULL != rd->kyc_pending)
{
+ json_t *rep;
+
rep = GNUNET_JSON_PACK (
TALER_JSON_pack_amount ("total_amount",
- total_amount),
+ &rd->total_amount),
TALER_JSON_pack_time_abs_human ("deadline",
- deadline.abs_time),
+ rd->deadline.abs_time),
GNUNET_JSON_pack_string ("kyc_pending",
- kyc_pending),
- GNUNET_JSON_pack_string ("account",
- payto_uri));
+ rd->kyc_pending),
+ GNUNET_JSON_pack_allow_null (
+ GNUNET_JSON_pack_string ("account",
+ rd->payto_uri)));
TALER_ARL_report (report_kyc_lags,
rep);
}
- else if (TALER_AML_NORMAL != status)
+ else if (TALER_AML_NORMAL != rd->status)
{
const char *sstatus;
+ json_t *rep;
- switch (status)
+ switch (rd->status)
{
case TALER_AML_NORMAL:
GNUNET_assert (0);
@@ -923,30 +1074,128 @@ wire_missing_cb (void *cls,
}
rep = GNUNET_JSON_PACK (
TALER_JSON_pack_amount ("total_amount",
- total_amount),
- TALER_JSON_pack_amount ("aml_limit",
- aml_limit),
+ &rd->total_amount),
+ GNUNET_JSON_pack_allow_null (
+ TALER_JSON_pack_amount ("aml_limit",
+ TALER_amount_is_valid (&rd->aml_limit)
+ ? &rd->aml_limit
+ : NULL)),
TALER_JSON_pack_time_abs_human ("deadline",
- deadline.abs_time),
+ rd->deadline.abs_time),
GNUNET_JSON_pack_string ("aml_status",
sstatus),
- GNUNET_JSON_pack_string ("account",
- payto_uri));
+ GNUNET_JSON_pack_allow_null (
+ GNUNET_JSON_pack_string ("account",
+ rd->payto_uri)));
TALER_ARL_report (report_aml_lags,
rep);
}
else
{
+ json_t *rep;
+
rep = GNUNET_JSON_PACK (
TALER_JSON_pack_amount ("total_amount",
- total_amount),
+ &rd->total_amount),
TALER_JSON_pack_time_abs_human ("deadline",
- deadline.abs_time),
- GNUNET_JSON_pack_string ("account",
- payto_uri));
+ rd->deadline.abs_time),
+ GNUNET_JSON_pack_allow_null (
+ GNUNET_JSON_pack_string ("account",
+ rd->payto_uri)));
TALER_ARL_report (report_lags,
rep);
}
+
+ return free_report_entry (cls,
+ key,
+ value);
+}
+
+
+/**
+ * Function called on deposits that are past their due date
+ * and have not yet seen a wire transfer.
+ *
+ * @param cls closure, points to a `struct ReportMissingWireContext`
+ * @param total_amount value of the missing deposits, including fee
+ * @param payto_uri where should the funds be wired
+ * @param deadline what was the earliest requested wire transfer deadline
+ */
+static void
+report_wire_missing_cb (void *cls,
+ uint64_t batch_deposit_serial_id,
+ const struct TALER_Amount *total_amount,
+ const struct TALER_PaytoHashP *wire_target_h_payto,
+ struct GNUNET_TIME_Timestamp deadline)
+{
+ struct ReportMissingWireContext *rc = cls;
+ struct ReasonDetail *rd;
+
+ rd = GNUNET_CONTAINER_multishortmap_get (rc->map,
+ &wire_target_h_payto->hash);
+ if (NULL == rd)
+ {
+ rd = GNUNET_new (struct ReasonDetail);
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multishortmap_put (
+ rc->map,
+ &wire_target_h_payto->hash,
+ rd,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ rc->err = TALER_ARL_edb->select_justification_for_missing_wire (
+ TALER_ARL_edb->cls,
+ wire_target_h_payto,
+ &rd->payto_uri,
+ &rd->kyc_pending,
+ &rd->status,
+ &rd->aml_limit);
+ rd->total_amount = *total_amount;
+ rd->deadline = deadline;
+ }
+ else
+ {
+ TALER_ARL_amount_add (&rd->total_amount,
+ &rd->total_amount,
+ total_amount);
+ rd->deadline = GNUNET_TIME_timestamp_min (rd->deadline,
+ deadline);
+ }
+}
+
+
+/**
+ * Function called on aggregations that were done for
+ * a (batch) deposit.
+ *
+ * @param cls closure
+ * @param tracking_serial_id where in the table are we
+ * @param batch_deposit_serial_id which batch deposit was aggregated
+ */
+static void
+clear_finished_transfer_cb (
+ void *cls,
+ uint64_t tracking_serial_id,
+ uint64_t batch_deposit_serial_id)
+{
+ struct AggregationContext *ac = cls;
+ enum GNUNET_DB_QueryStatus qs;
+
+ if (0 > ac->err)
+ return; /* already failed */
+ GNUNET_assert (ac->max_aggregation_serial < tracking_serial_id);
+ ac->max_aggregation_serial = tracking_serial_id;
+ qs = TALER_ARL_adb->delete_pending_deposit (
+ TALER_ARL_adb->cls,
+ &TALER_ARL_master_pub,
+ batch_deposit_serial_id);
+ if (0 == qs)
+ {
+ /* Aggregated something twice or other error, report! */
+ GNUNET_break (0);
+ // FIXME: report more nicely!
+ }
+ if (0 > qs)
+ ac->err = qs;
}
@@ -957,31 +1206,79 @@ wire_missing_cb (void *cls,
static void
check_for_required_transfers (void)
{
- struct GNUNET_TIME_Timestamp next_timestamp;
+ struct ImportMissingWireContext wc = {
+ .max_batch_deposit_uuid = pp.last_batch_deposit_uuid,
+ .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT
+ };
+ struct GNUNET_TIME_Absolute deadline;
enum GNUNET_DB_QueryStatus qs;
+ struct ReportMissingWireContext rc = {
+ .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT
+ };
+ struct AggregationContext ac = {
+ .max_aggregation_serial = pp.last_aggregation_serial,
+ .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT
+ };
+ qs = TALER_ARL_edb->select_batch_deposits_missing_wire (
+ TALER_ARL_edb->cls,
+ pp.last_batch_deposit_uuid,
+ &import_wire_missing_cb,
+ &wc);
+ if ( (0 > qs) || (0 > wc.err) )
+ {
+ GNUNET_break (0);
+ GNUNET_break ( (GNUNET_DB_STATUS_SOFT_ERROR == qs) ||
+ (GNUNET_DB_STATUS_SOFT_ERROR == wc.err) );
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ pp.last_batch_deposit_uuid = wc.max_batch_deposit_uuid;
+ qs = TALER_ARL_edb->select_aggregations_above_serial (
+ TALER_ARL_edb->cls,
+ pp.last_aggregation_serial,
+ &clear_finished_transfer_cb,
+ &ac);
+ if ( (0 > qs) || (0 > ac.err) )
+ {
+ GNUNET_break (0);
+ GNUNET_break ( (GNUNET_DB_STATUS_SOFT_ERROR == qs) ||
+ (GNUNET_DB_STATUS_SOFT_ERROR == ac.err) );
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ pp.last_aggregation_serial = ac.max_aggregation_serial;
/* Subtract #GRACE_PERIOD, so we can be a bit behind in processing
without immediately raising undue concern */
- next_timestamp = GNUNET_TIME_absolute_to_timestamp (
- GNUNET_TIME_absolute_subtract (GNUNET_TIME_absolute_get (),
- GRACE_PERIOD));
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Analyzing exchange's unfinished deposits (deadline: %s)\n",
- GNUNET_TIME_timestamp2s (next_timestamp));
- qs = TALER_ARL_edb->select_batch_deposits_missing_wire (TALER_ARL_edb->cls,
- pp.last_timestamp,
- next_timestamp,
- &wire_missing_cb,
- &next_timestamp);
- if (0 > qs)
+ deadline = GNUNET_TIME_absolute_subtract (GNUNET_TIME_absolute_get (),
+ GRACE_PERIOD);
+ rc.map = GNUNET_CONTAINER_multishortmap_create (1024,
+ GNUNET_NO);
+ qs = TALER_ARL_adb->select_pending_deposits (
+ TALER_ARL_adb->cls,
+ &TALER_ARL_master_pub,
+ deadline,
+ &report_wire_missing_cb,
+ &rc);
+ if ( (0 > qs) || (0 > rc.err) )
{
GNUNET_break (0);
- GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ GNUNET_break ( (GNUNET_DB_STATUS_SOFT_ERROR == qs) ||
+ (GNUNET_DB_STATUS_SOFT_ERROR == rc.err) );
+ GNUNET_CONTAINER_multishortmap_iterate (rc.map,
+ &free_report_entry,
+ NULL);
+ GNUNET_CONTAINER_multishortmap_destroy (rc.map);
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
}
- pp.last_timestamp = next_timestamp;
+ GNUNET_CONTAINER_multishortmap_iterate (rc.map,
+ &generate_report,
+ NULL);
+ GNUNET_CONTAINER_multishortmap_destroy (rc.map);
/* conclude with success */
commit (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT);
GNUNET_SCHEDULER_shutdown ();
@@ -2275,10 +2572,11 @@ begin_transaction (void)
GNUNET_assert (GNUNET_OK ==
TALER_amount_set_zero (TALER_ARL_currency,
&total_wire_out));
- qs = TALER_ARL_adb->get_predicted_balance (TALER_ARL_adb->cls,
- &TALER_ARL_master_pub,
- &start_balance,
- &total_drained);
+ qs = TALER_ARL_adb->get_predicted_balance (
+ TALER_ARL_adb->cls,
+ &TALER_ARL_master_pub,
+ &start_balance,
+ &total_drained);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
@@ -2328,9 +2626,10 @@ begin_transaction (void)
{
start_pp = pp;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Resuming wire audit at %s / %llu\n",
- GNUNET_TIME_timestamp2s (pp.last_timestamp),
- (unsigned long long) pp.last_reserve_close_uuid);
+ "Resuming wire audit at %llu / %llu / %llu\n",
+ (unsigned long long) pp.last_reserve_close_uuid,
+ (unsigned long long) pp.last_batch_deposit_uuid,
+ (unsigned long long) pp.last_aggregation_serial);
}
{