summaryrefslogtreecommitdiff
path: root/src/auditor/taler-helper-auditor-wire.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/auditor/taler-helper-auditor-wire.c')
-rw-r--r--src/auditor/taler-helper-auditor-wire.c747
1 files changed, 594 insertions, 153 deletions
diff --git a/src/auditor/taler-helper-auditor-wire.c b/src/auditor/taler-helper-auditor-wire.c
index bfc465b05..d48ac1f18 100644
--- a/src/auditor/taler-helper-auditor-wire.c
+++ b/src/auditor/taler-helper-auditor-wire.c
@@ -1,6 +1,6 @@
/*
This file is part of TALER
- Copyright (C) 2017-2022 Taler Systems SA
+ Copyright (C) 2017-2023 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU General Public License as published by the Free Software
@@ -53,6 +53,20 @@
/**
+ * Run in test mode. Exit when idle instead of
+ * going to sleep and waiting for more work.
+ *
+ * FIXME: not yet implemented!
+ */
+static int test_mode;
+
+struct TALER_AUDITORDB_WireAccountProgressPoint
+{
+ uint64_t last_reserve_in_serial_id;
+ uint64_t last_wire_out_serial_id;
+};
+
+/**
* Information we keep for each supported account.
*/
struct WireAccount
@@ -93,9 +107,34 @@ struct WireAccount
struct TALER_AUDITORDB_WireAccountProgressPoint start_pp;
/**
- * Where we are in the transaction history.
+ * Where we are in the inbound transaction history.
+ */
+ uint64_t wire_off_in;
+
+ /**
+ * Where we are in the outbound transaction history.
+ */
+ uint64_t wire_off_out;
+
+ /**
+ * Label under which we store our pp's reserve_in_serial_id.
*/
- struct TALER_AUDITORDB_BankAccountProgressPoint wire_off;
+ char *label_reserve_in_serial_id;
+
+ /**
+ * Label under which we store our pp's reserve_in_serial_id.
+ */
+ char *label_wire_out_serial_id;
+
+ /**
+ * Label under which we store our wire_off_in.
+ */
+ char *label_wire_off_in;
+
+ /**
+ * Label under which we store our wire_off_out.
+ */
+ char *label_wire_off_out;
/**
* Return value when we got this account's progress point.
@@ -178,20 +217,17 @@ static enum GNUNET_DB_QueryStatus qsx_gwap;
/**
* Last reserve_in / wire_out serial IDs seen.
*/
-static struct TALER_AUDITORDB_WireProgressPoint pp;
-
-/**
- * Last reserve_in / wire_out serial IDs seen.
- */
-static struct TALER_AUDITORDB_WireProgressPoint start_pp;
+static TALER_ARL_DEF_PP (wire_reserve_close_id);
+static TALER_ARL_DEF_PP (wire_batch_deposit_id);
+static TALER_ARL_DEF_PP (wire_aggregation_id);
/**
- * Array of reports about row inconsitencies in wire_out table.
+ * Array of reports about row inconsistencies in wire_out table.
*/
static json_t *report_wire_out_inconsistencies;
/**
- * Array of reports about row inconsitencies in reserves_in table.
+ * Array of reports about row inconsistencies in reserves_in table.
*/
static json_t *report_reserve_in_inconsistencies;
@@ -223,6 +259,16 @@ static json_t *report_row_minor_inconsistencies;
static json_t *report_lags;
/**
+ * Array of reports about lagging transactions from deposits due to missing KYC.
+ */
+static json_t *report_kyc_lags;
+
+/**
+ * Array of reports about lagging transactions from deposits due to pending or frozen AML decisions.
+ */
+static json_t *report_aml_lags;
+
+/**
* Array of reports about lagging transactions from reserve closures.
*/
static json_t *report_closure_lags;
@@ -292,17 +338,17 @@ static struct TALER_Amount total_wire_out;
/**
* Total amount of profits drained.
*/
-static struct TALER_Amount total_drained;
+static TALER_ARL_DEF_AB (total_drained);
/**
- * Starting balance at the beginning of this iteration.
+ * Final balance at the end of this iteration.
*/
-static struct TALER_Amount start_balance;
+static TALER_ARL_DEF_AB (final_balance);
/**
- * Final balance at the end of this iteration.
+ * Starting balance at the beginning of this iteration.
*/
-static struct TALER_Amount final_balance;
+static struct TALER_Amount start_balance;
/**
* True if #start_balance was initialized.
@@ -515,15 +561,19 @@ do_shutdown (void *cls)
TALER_JSON_pack_amount ("total_wire_out",
&total_wire_out),
TALER_JSON_pack_amount ("total_drained",
- &total_drained),
+ &TALER_ARL_USE_AB (total_drained)),
TALER_JSON_pack_amount ("final_balance",
- &final_balance),
+ &TALER_ARL_USE_AB (final_balance)),
/* Tested in test-auditor.sh #1 */
TALER_JSON_pack_amount ("total_amount_lag",
&total_amount_lag),
/* Tested in test-auditor.sh #1 */
GNUNET_JSON_pack_array_steal ("lag_details",
report_lags),
+ GNUNET_JSON_pack_array_steal ("lag_aml_details",
+ report_aml_lags),
+ GNUNET_JSON_pack_array_steal ("lag_kyc_details",
+ report_kyc_lags),
/* Tested in test-auditor.sh #22 */
TALER_JSON_pack_amount ("total_closure_amount_lag",
&total_closure_amount_lag),
@@ -534,14 +584,18 @@ do_shutdown (void *cls)
start_time),
TALER_JSON_pack_time_abs_human ("wire_auditor_end_time",
GNUNET_TIME_absolute_get ()),
- GNUNET_JSON_pack_uint64 ("start_pp_reserve_close_uuid",
- start_pp.last_reserve_close_uuid),
- GNUNET_JSON_pack_uint64 ("end_pp_reserve_close_uuid",
- pp.last_reserve_close_uuid),
- TALER_JSON_pack_time_abs_human ("start_pp_last_timestamp",
- start_pp.last_timestamp.abs_time),
- TALER_JSON_pack_time_abs_human ("end_pp_last_timestamp",
- pp.last_timestamp.abs_time),
+ GNUNET_JSON_pack_uint64 ("start_pp_reserve_close_id",
+ 0 /* no longer supported */),
+ GNUNET_JSON_pack_uint64 ("end_pp_reserve_close_id",
+ TALER_ARL_USE_PP (wire_reserve_close_id)),
+ GNUNET_JSON_pack_uint64 ("start_pp_last_batch_deposit_id",
+ 0 /* no longer supported */),
+ GNUNET_JSON_pack_uint64 ("end_pp_last_batch_deposit_id",
+ TALER_ARL_USE_PP (wire_batch_deposit_id)),
+ GNUNET_JSON_pack_uint64 ("start_pp_last_aggregation_serial_id",
+ 0 /* no longer supported */),
+ GNUNET_JSON_pack_uint64 ("end_pp_last_aggregation_serial_id",
+ TALER_ARL_USE_PP (wire_aggregation_id)),
GNUNET_JSON_pack_array_steal ("account_progress",
report_account_progress)));
report_wire_out_inconsistencies = NULL;
@@ -550,6 +604,8 @@ do_shutdown (void *cls)
report_row_minor_inconsistencies = NULL;
report_misattribution_in_inconsistencies = NULL;
report_lags = NULL;
+ report_kyc_lags = NULL;
+ report_aml_lags = NULL;
report_closure_lags = NULL;
report_account_progress = NULL;
report_wire_format_inconsistencies = NULL;
@@ -597,6 +653,10 @@ do_shutdown (void *cls)
GNUNET_CONTAINER_DLL_remove (wa_head,
wa_tail,
wa);
+ GNUNET_free (wa->label_reserve_in_serial_id);
+ GNUNET_free (wa->label_wire_out_serial_id);
+ GNUNET_free (wa->label_wire_off_in);
+ GNUNET_free (wa->label_wire_off_out);
GNUNET_free (wa);
}
if (NULL != ctx)
@@ -651,8 +711,8 @@ check_pending_rc (void *cls,
&rc->wtid),
GNUNET_JSON_pack_string ("account",
rc->receiver_account)));
- pp.last_reserve_close_uuid
- = GNUNET_MIN (pp.last_reserve_close_uuid,
+ TALER_ARL_USE_PP (wire_reserve_close_id)
+ = GNUNET_MIN (TALER_ARL_USE_PP (wire_reserve_close_id),
rc->rowid);
return GNUNET_OK;
}
@@ -704,30 +764,32 @@ commit (enum GNUNET_DB_QueryStatus qs)
TALER_ARL_amount_add (&sum,
&total_wire_in,
&start_balance);
- TALER_ARL_amount_subtract (&final_balance,
+ TALER_ARL_amount_subtract (&TALER_ARL_USE_AB (final_balance),
&sum,
&total_wire_out);
- qs = TALER_ARL_adb->update_predicted_result (TALER_ARL_adb->cls,
- &TALER_ARL_master_pub,
- &final_balance,
- &total_drained);
+ qs = TALER_ARL_adb->update_balance (
+ TALER_ARL_adb->cls,
+ TALER_ARL_SET_AB (total_drained),
+ TALER_ARL_SET_AB (final_balance),
+ NULL);
}
else
{
- TALER_ARL_amount_subtract (&final_balance,
+ TALER_ARL_amount_subtract (&TALER_ARL_USE_AB (final_balance),
&total_wire_in,
&total_wire_out);
- qs = TALER_ARL_adb->insert_predicted_result (TALER_ARL_adb->cls,
- &TALER_ARL_master_pub,
- &final_balance,
- &total_drained);
+ qs = TALER_ARL_adb->insert_balance (
+ TALER_ARL_adb->cls,
+ TALER_ARL_SET_AB (total_drained),
+ TALER_ARL_SET_AB (final_balance),
+ NULL);
}
}
else
{
GNUNET_assert (GNUNET_OK ==
TALER_amount_set_zero (TALER_ARL_currency,
- &final_balance));
+ &TALER_ARL_USE_AB (final_balance)));
}
if (0 > qs)
{
@@ -757,24 +819,33 @@ commit (enum GNUNET_DB_QueryStatus qs)
GNUNET_JSON_pack_uint64 ("end_reserve_in",
wa->pp.last_reserve_in_serial_id),
GNUNET_JSON_pack_uint64 ("start_wire_out",
- wa->start_pp.
- last_wire_out_serial_id),
+ wa->start_pp.last_wire_out_serial_id),
GNUNET_JSON_pack_uint64 ("end_wire_out",
wa->pp.last_wire_out_serial_id))));
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == wa->qsx)
- qs = TALER_ARL_adb->update_wire_auditor_account_progress (
+ qs = TALER_ARL_adb->update_auditor_progress (
TALER_ARL_adb->cls,
- &TALER_ARL_master_pub,
- wa->ai->section_name,
- &wa->pp,
- &wa->wire_off);
+ wa->label_reserve_in_serial_id,
+ wa->pp.last_reserve_in_serial_id,
+ wa->label_wire_out_serial_id,
+ wa->pp.last_wire_out_serial_id,
+ wa->label_wire_off_in,
+ wa->wire_off_in,
+ wa->label_wire_off_out,
+ wa->wire_off_out,
+ NULL);
else
- qs = TALER_ARL_adb->insert_wire_auditor_account_progress (
+ qs = TALER_ARL_adb->insert_auditor_progress (
TALER_ARL_adb->cls,
- &TALER_ARL_master_pub,
- wa->ai->section_name,
- &wa->pp,
- &wa->wire_off);
+ wa->label_reserve_in_serial_id,
+ wa->pp.last_reserve_in_serial_id,
+ wa->label_wire_out_serial_id,
+ wa->pp.last_wire_out_serial_id,
+ wa->label_wire_off_in,
+ wa->wire_off_in,
+ wa->label_wire_off_out,
+ wa->wire_off_out,
+ NULL);
if (0 >= qs)
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
@@ -787,13 +858,19 @@ commit (enum GNUNET_DB_QueryStatus qs)
&check_pending_rc,
NULL);
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qsx_gwap)
- qs = TALER_ARL_adb->update_wire_auditor_progress (TALER_ARL_adb->cls,
- &TALER_ARL_master_pub,
- &pp);
+ qs = TALER_ARL_adb->update_auditor_progress (
+ TALER_ARL_adb->cls,
+ TALER_ARL_SET_PP (wire_reserve_close_id),
+ TALER_ARL_SET_PP (wire_batch_deposit_id),
+ TALER_ARL_SET_PP (wire_aggregation_id),
+ NULL);
else
- qs = TALER_ARL_adb->insert_wire_auditor_progress (TALER_ARL_adb->cls,
- &TALER_ARL_master_pub,
- &pp);
+ qs = TALER_ARL_adb->insert_auditor_progress (
+ TALER_ARL_adb->cls,
+ TALER_ARL_SET_PP (wire_reserve_close_id),
+ TALER_ARL_SET_PP (wire_batch_deposit_id),
+ TALER_ARL_SET_PP (wire_aggregation_id),
+ NULL);
if (0 >= qs)
{
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
@@ -802,8 +879,9 @@ commit (enum GNUNET_DB_QueryStatus qs)
return qs;
}
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Concluded audit step at %s\n",
- GNUNET_TIME_timestamp2s (pp.last_timestamp));
+ "Concluded audit step at %llu/%llu\n",
+ (unsigned long long) TALER_ARL_USE_PP (wire_aggregation_id),
+ (unsigned long long) TALER_ARL_USE_PP (wire_batch_deposit_id));
if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs)
{
@@ -840,59 +918,341 @@ commit (enum GNUNET_DB_QueryStatus qs)
/* ***************************** Analyze required transfers ************************ */
/**
- * Function called on deposits that are past their due date
- * and have not yet seen a wire transfer.
+ * Closure for import_wire_missing_cb().
+ */
+struct ImportMissingWireContext
+{
+ /**
+ * Set to maximum row ID encountered.
+ */
+ uint64_t max_batch_deposit_uuid;
+
+ /**
+ * Set to database errors in callback.
+ */
+ enum GNUNET_DB_QueryStatus err;
+};
+
+
+/**
+ * Function called on deposits that need to be checked for their
+ * wire transfer.
*
- * @param cls closure
- * @param rowid deposit table row of the coin's deposit
- * @param coin_pub public key of the coin
- * @param amount value of the deposit, including fee
- * @param payto_uri where should the funds be wired
- * @param deadline what was the requested wire transfer deadline
- * @param done did the exchange claim that it made a transfer?
- * NOTE: only valid in internal audit mode!
+ * @param cls closure, points to a `struct ImportMissingWireContext`
+ * @param batch_deposit_serial_id serial of the entry in the batch deposits table
+ * @param total_amount value of the missing deposits, including fee
+ * @param wire_target_h_payto where should the funds be wired
+ * @param deadline what was the earliest requested wire transfer deadline
*/
static void
-wire_missing_cb (void *cls,
- uint64_t rowid,
- const struct TALER_CoinSpendPublicKeyP *coin_pub,
- const struct TALER_Amount *amount,
- const char *payto_uri,
- struct GNUNET_TIME_Timestamp deadline,
- bool done)
+import_wire_missing_cb (void *cls,
+ uint64_t batch_deposit_serial_id,
+ const struct TALER_Amount *total_amount,
+ const struct TALER_PaytoHashP *wire_target_h_payto,
+ struct GNUNET_TIME_Timestamp deadline)
{
- json_t *rep;
+ struct ImportMissingWireContext *wc = cls;
+ enum GNUNET_DB_QueryStatus qs;
+
+ if (wc->err < 0)
+ return; /* already failed */
+ GNUNET_assert (batch_deposit_serial_id > wc->max_batch_deposit_uuid);
+ wc->max_batch_deposit_uuid = batch_deposit_serial_id;
+ qs = TALER_ARL_adb->insert_pending_deposit (
+ TALER_ARL_adb->cls,
+ batch_deposit_serial_id,
+ wire_target_h_payto,
+ total_amount,
+ deadline);
+ if (qs < 0)
+ wc->err = qs;
+}
+
+
+/**
+ * Information about a delayed wire transfer and the possible
+ * reasons for the delay.
+ */
+struct ReasonDetail
+{
+ /**
+ * Total amount that should have been transferred.
+ */
+ struct TALER_Amount total_amount;
+
+ /**
+ * Earliest deadline for an expected transfer to the account.
+ */
+ struct GNUNET_TIME_Timestamp deadline;
+
+ /**
+ * Target account, NULL if even that is not known (due to
+ * exchange lacking required entry in wire_targets table).
+ */
+ char *payto_uri;
+
+ /**
+ * Reasons due to pending KYC requests.
+ */
+ char *kyc_pending;
+
+ /**
+ * AML decision state for the target account.
+ */
+ enum TALER_AmlDecisionState status;
+
+ /**
+ * Current AML threshold for the account, may be an invalid account if the
+ * default threshold applies.
+ */
+ struct TALER_Amount aml_limit;
+};
+
+/**
+ * Closure for report_wire_missing_cb().
+ */
+struct ReportMissingWireContext
+{
+ /**
+ * Map from wire_target_h_payto to `struct ReasonDetail`.
+ */
+ struct GNUNET_CONTAINER_MultiShortmap *map;
+
+ /**
+ * Set to database errors in callback.
+ */
+ enum GNUNET_DB_QueryStatus err;
+};
+
+
+/**
+ * Closure for #clear_finished_transfer_cb().
+ */
+struct AggregationContext
+{
+ /**
+ * Set to maximum row ID encountered.
+ */
+ uint64_t max_aggregation_serial;
+
+ /**
+ * Set to database errors in callback.
+ */
+ enum GNUNET_DB_QueryStatus err;
+};
+
+
+/**
+ * Free memory allocated in @a value.
+ *
+ * @param cls unused
+ * @param key unused
+ * @param value must be a `struct ReasonDetail`
+ * @return #GNUNET_YES if we should continue to
+ * iterate,
+ * #GNUNET_NO if not.
+ */
+static enum GNUNET_GenericReturnValue
+free_report_entry (void *cls,
+ const struct GNUNET_ShortHashCode *key,
+ void *value)
+{
+ struct ReasonDetail *rd = value;
+
+ GNUNET_free (rd->kyc_pending);
+ GNUNET_free (rd->payto_uri);
+ GNUNET_free (rd);
+ return GNUNET_YES;
+}
+
+
+/**
+ * We had an entry in our map of wire transfers that
+ * should have been performed. Generate report.
+ *
+ * @param cls unused
+ * @param key unused
+ * @param value must be a `struct ReasonDetail`
+ * @return #GNUNET_YES if we should continue to
+ * iterate,
+ * #GNUNET_NO if not.
+ */
+static enum GNUNET_GenericReturnValue
+generate_report (void *cls,
+ const struct GNUNET_ShortHashCode *key,
+ void *value)
+{
+ struct ReasonDetail *rd = value;
- (void) cls;
- TALER_ARL_amount_add (&total_amount_lag,
- &total_amount_lag,
- amount);
/* For now, we simplify and only check that the
amount was tiny */
- if (0 > TALER_amount_cmp (amount,
+ if (0 > TALER_amount_cmp (&rd->total_amount,
&tiny_amount))
- return; /* acceptable, amount was tiny */
- rep = GNUNET_JSON_PACK (
- GNUNET_JSON_pack_uint64 ("row",
- rowid),
- TALER_JSON_pack_amount ("amount",
- amount),
- TALER_JSON_pack_time_abs_human ("deadline",
- deadline.abs_time),
- GNUNET_JSON_pack_data_auto ("coin_pub",
- coin_pub),
- GNUNET_JSON_pack_string ("account",
- payto_uri));
- if (internal_checks)
+ return free_report_entry (cls,
+ key,
+ value); /* acceptable, amount was tiny */
+ // TODO: maybe split total_amount_lag up by category below?
+ TALER_ARL_amount_add (&total_amount_lag,
+ &total_amount_lag,
+ &rd->total_amount);
+ if (NULL != rd->kyc_pending)
+ {
+ json_t *rep;
+
+ rep = GNUNET_JSON_PACK (
+ TALER_JSON_pack_amount ("total_amount",
+ &rd->total_amount),
+ TALER_JSON_pack_time_abs_human ("deadline",
+ rd->deadline.abs_time),
+ GNUNET_JSON_pack_string ("kyc_pending",
+ rd->kyc_pending),
+ GNUNET_JSON_pack_allow_null (
+ GNUNET_JSON_pack_string ("account",
+ rd->payto_uri)));
+ TALER_ARL_report (report_kyc_lags,
+ rep);
+ }
+ else if (TALER_AML_NORMAL != rd->status)
+ {
+ const char *sstatus = "<undefined>";
+ json_t *rep;
+
+ switch (rd->status)
+ {
+ case TALER_AML_NORMAL:
+ GNUNET_assert (0);
+ break;
+ case TALER_AML_PENDING:
+ sstatus = "pending";
+ break;
+ case TALER_AML_FROZEN:
+ sstatus = "frozen";
+ break;
+ }
+ rep = GNUNET_JSON_PACK (
+ TALER_JSON_pack_amount ("total_amount",
+ &rd->total_amount),
+ GNUNET_JSON_pack_allow_null (
+ TALER_JSON_pack_amount ("aml_limit",
+ TALER_amount_is_valid (&rd->aml_limit)
+ ? &rd->aml_limit
+ : NULL)),
+ TALER_JSON_pack_time_abs_human ("deadline",
+ rd->deadline.abs_time),
+ GNUNET_JSON_pack_string ("aml_status",
+ sstatus),
+ GNUNET_JSON_pack_allow_null (
+ GNUNET_JSON_pack_string ("account",
+ rd->payto_uri)));
+ TALER_ARL_report (report_aml_lags,
+ rep);
+ }
+ else
+ {
+ json_t *rep;
+
+ rep = GNUNET_JSON_PACK (
+ TALER_JSON_pack_amount ("total_amount",
+ &rd->total_amount),
+ TALER_JSON_pack_time_abs_human ("deadline",
+ rd->deadline.abs_time),
+ GNUNET_JSON_pack_allow_null (
+ GNUNET_JSON_pack_string ("account",
+ rd->payto_uri)));
+ TALER_ARL_report (report_lags,
+ rep);
+ }
+
+ return free_report_entry (cls,
+ key,
+ value);
+}
+
+
+/**
+ * Function called on deposits that are past their due date
+ * and have not yet seen a wire transfer.
+ *
+ * @param cls closure, points to a `struct ReportMissingWireContext`
+ * @param batch_deposit_serial_id row in the database for which the wire transfer is missing
+ * @param total_amount value of the missing deposits, including fee
+ * @param wire_target_h_payto hash of payto-URI where the funds should have been wired
+ * @param deadline what was the earliest requested wire transfer deadline
+ */
+static void
+report_wire_missing_cb (void *cls,
+ uint64_t batch_deposit_serial_id,
+ const struct TALER_Amount *total_amount,
+ const struct TALER_PaytoHashP *wire_target_h_payto,
+ struct GNUNET_TIME_Timestamp deadline)
+{
+ struct ReportMissingWireContext *rc = cls;
+ struct ReasonDetail *rd;
+
+ rd = GNUNET_CONTAINER_multishortmap_get (rc->map,
+ &wire_target_h_payto->hash);
+ if (NULL == rd)
+ {
+ rd = GNUNET_new (struct ReasonDetail);
+ GNUNET_assert (GNUNET_YES ==
+ GNUNET_CONTAINER_multishortmap_put (
+ rc->map,
+ &wire_target_h_payto->hash,
+ rd,
+ GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY));
+ rc->err = TALER_ARL_edb->select_justification_for_missing_wire (
+ TALER_ARL_edb->cls,
+ wire_target_h_payto,
+ &rd->payto_uri,
+ &rd->kyc_pending,
+ &rd->status,
+ &rd->aml_limit);
+ rd->total_amount = *total_amount;
+ rd->deadline = deadline;
+ }
+ else
+ {
+ TALER_ARL_amount_add (&rd->total_amount,
+ &rd->total_amount,
+ total_amount);
+ rd->deadline = GNUNET_TIME_timestamp_min (rd->deadline,
+ deadline);
+ }
+}
+
+
+/**
+ * Function called on aggregations that were done for
+ * a (batch) deposit.
+ *
+ * @param cls closure
+ * @param tracking_serial_id where in the table are we
+ * @param batch_deposit_serial_id which batch deposit was aggregated
+ */
+static void
+clear_finished_transfer_cb (
+ void *cls,
+ uint64_t tracking_serial_id,
+ uint64_t batch_deposit_serial_id)
+{
+ struct AggregationContext *ac = cls;
+ enum GNUNET_DB_QueryStatus qs;
+
+ if (0 > ac->err)
+ return; /* already failed */
+ GNUNET_assert (ac->max_aggregation_serial < tracking_serial_id);
+ ac->max_aggregation_serial = tracking_serial_id;
+ qs = TALER_ARL_adb->delete_pending_deposit (
+ TALER_ARL_adb->cls,
+ batch_deposit_serial_id);
+ if (0 == qs)
{
- /* the 'done' bit is only useful in 'internal' mode */
- GNUNET_assert (0 ==
- json_object_set (rep,
- "claimed_done",
- json_string ((done) ? "yes" : "no")));
+ /* Aggregated something twice or other error, report! */
+ GNUNET_break (0);
+ // FIXME: report more nicely!
}
- TALER_ARL_report (report_lags,
- rep);
+ if (0 > qs)
+ ac->err = qs;
}
@@ -903,31 +1263,78 @@ wire_missing_cb (void *cls,
static void
check_for_required_transfers (void)
{
- struct GNUNET_TIME_Timestamp next_timestamp;
+ struct ImportMissingWireContext wc = {
+ .max_batch_deposit_uuid = TALER_ARL_USE_PP (wire_batch_deposit_id),
+ .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT
+ };
+ struct GNUNET_TIME_Absolute deadline;
enum GNUNET_DB_QueryStatus qs;
+ struct ReportMissingWireContext rc = {
+ .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT
+ };
+ struct AggregationContext ac = {
+ .max_aggregation_serial = TALER_ARL_USE_PP (wire_aggregation_id),
+ .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT
+ };
+ qs = TALER_ARL_edb->select_batch_deposits_missing_wire (
+ TALER_ARL_edb->cls,
+ TALER_ARL_USE_PP (wire_batch_deposit_id),
+ &import_wire_missing_cb,
+ &wc);
+ if ( (0 > qs) || (0 > wc.err) )
+ {
+ GNUNET_break (0);
+ GNUNET_break ( (GNUNET_DB_STATUS_SOFT_ERROR == qs) ||
+ (GNUNET_DB_STATUS_SOFT_ERROR == wc.err) );
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ TALER_ARL_USE_PP (wire_batch_deposit_id) = wc.max_batch_deposit_uuid;
+ qs = TALER_ARL_edb->select_aggregations_above_serial (
+ TALER_ARL_edb->cls,
+ TALER_ARL_USE_PP (wire_aggregation_id),
+ &clear_finished_transfer_cb,
+ &ac);
+ if ( (0 > qs) || (0 > ac.err) )
+ {
+ GNUNET_break (0);
+ GNUNET_break ( (GNUNET_DB_STATUS_SOFT_ERROR == qs) ||
+ (GNUNET_DB_STATUS_SOFT_ERROR == ac.err) );
+ global_ret = EXIT_FAILURE;
+ GNUNET_SCHEDULER_shutdown ();
+ return;
+ }
+ TALER_ARL_USE_PP (wire_aggregation_id) = ac.max_aggregation_serial;
/* Subtract #GRACE_PERIOD, so we can be a bit behind in processing
without immediately raising undue concern */
- next_timestamp = GNUNET_TIME_absolute_to_timestamp (
- GNUNET_TIME_absolute_subtract (GNUNET_TIME_absolute_get (),
- GRACE_PERIOD));
- GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Analyzing exchange's unfinished deposits (deadline: %s)\n",
- GNUNET_TIME_timestamp2s (next_timestamp));
- qs = TALER_ARL_edb->select_deposits_missing_wire (TALER_ARL_edb->cls,
- pp.last_timestamp,
- next_timestamp,
- &wire_missing_cb,
- &next_timestamp);
- if (0 > qs)
+ deadline = GNUNET_TIME_absolute_subtract (GNUNET_TIME_absolute_get (),
+ GRACE_PERIOD);
+ rc.map = GNUNET_CONTAINER_multishortmap_create (1024,
+ GNUNET_NO);
+ qs = TALER_ARL_adb->select_pending_deposits (
+ TALER_ARL_adb->cls,
+ deadline,
+ &report_wire_missing_cb,
+ &rc);
+ if ( (0 > qs) || (0 > rc.err) )
{
GNUNET_break (0);
- GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ GNUNET_break ( (GNUNET_DB_STATUS_SOFT_ERROR == qs) ||
+ (GNUNET_DB_STATUS_SOFT_ERROR == rc.err) );
+ GNUNET_CONTAINER_multishortmap_iterate (rc.map,
+ &free_report_entry,
+ NULL);
+ GNUNET_CONTAINER_multishortmap_destroy (rc.map);
global_ret = EXIT_FAILURE;
GNUNET_SCHEDULER_shutdown ();
return;
}
- pp.last_timestamp = next_timestamp;
+ GNUNET_CONTAINER_multishortmap_iterate (rc.map,
+ &generate_report,
+ NULL);
+ GNUNET_CONTAINER_multishortmap_destroy (rc.map);
/* conclude with success */
commit (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT);
GNUNET_SCHEDULER_shutdown ();
@@ -1034,7 +1441,7 @@ wire_out_cb (void *cls,
{
/* Wire transfer was not made (yet) at all (but would have been
justified), so the entire amount is missing / still to be done.
- This is moderately harmless, it might just be that the aggreator
+ This is moderately harmless, it might just be that the aggregator
has not yet fully caught up with the transfers it should do. */
TALER_ARL_report (
report_wire_out_inconsistencies,
@@ -1383,8 +1790,8 @@ complain_out_not_found (void *cls,
GNUNET_free (account_section);
GNUNET_free (payto_uri);
/* profit drain was correct */
- TALER_ARL_amount_add (&total_drained,
- &total_drained,
+ TALER_ARL_amount_add (&TALER_ARL_USE_AB (total_drained),
+ &TALER_ARL_USE_AB (total_drained),
&amount);
return GNUNET_OK;
}
@@ -1436,6 +1843,7 @@ check_exchange_wire_out (struct WireAccount *wa)
{
enum GNUNET_DB_QueryStatus qs;
+ GNUNET_assert (NULL == wa->dhh);
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
"Analyzing exchange's wire OUT table for account `%s'\n",
wa->ai->section_name);
@@ -1493,7 +1901,7 @@ history_debit_cb (void *cls,
TALER_amount2s (&dd->amount),
TALER_B2S (&dd->wtid));
/* Update offset */
- wa->wire_off.out_wire_off = dd->serial_id;
+ wa->wire_off_out = dd->serial_id;
slen = strlen (dd->credit_account_uri) + 1;
roi = GNUNET_malloc (sizeof (struct ReserveOutInfo)
+ slen);
@@ -1589,7 +1997,7 @@ process_debits (void *cls)
// (CG: used to be INT64_MAX, changed by MS to INT32_MAX, why? To be discussed with him!)
wa->dhh = TALER_BANK_debit_history (ctx,
wa->ai->auth,
- wa->wire_off.out_wire_off,
+ wa->wire_off_out,
INT32_MAX,
GNUNET_TIME_UNIT_ZERO,
&history_debit_cb,
@@ -1613,8 +2021,9 @@ process_debits (void *cls)
static void
begin_debit_audit (void)
{
+ GNUNET_assert (NULL == out_map);
out_map = GNUNET_CONTAINER_multihashmap_create (1024,
- GNUNET_YES);
+ true);
process_debits (wa_head);
}
@@ -1629,8 +2038,11 @@ begin_debit_audit (void)
static void
conclude_credit_history (void)
{
- GNUNET_CONTAINER_multihashmap_destroy (in_map);
- in_map = NULL;
+ if (NULL != in_map)
+ {
+ GNUNET_CONTAINER_multihashmap_destroy (in_map);
+ in_map = NULL;
+ }
/* credit done, now check debits */
begin_debit_audit ();
}
@@ -1823,7 +2235,7 @@ analyze_credit (struct WireAccount *wa,
}
/* Update offset */
- wa->wire_off.in_wire_off = details->serial_id;
+ wa->wire_off_in = details->serial_id;
/* compare records with expected data */
if (0 != GNUNET_memcmp (&details->reserve_pub,
&rii->details.reserve_pub))
@@ -1985,7 +2397,7 @@ history_credit_cb (void *cls,
if (! analyze_credit (wa,
cd))
- break;
+ return;
}
conclude_account (wa);
return;
@@ -2062,7 +2474,7 @@ process_credits (void *cls)
// (CG: used to be INT64_MAX, changed by MS to INT32_MAX, why? To be discussed with him!)
wa->chh = TALER_BANK_credit_history (ctx,
wa->ai->auth,
- wa->wire_off.in_wire_off,
+ wa->wire_off_in,
INT32_MAX,
GNUNET_TIME_UNIT_ZERO,
&history_credit_cb,
@@ -2085,6 +2497,7 @@ process_credits (void *cls)
static void
begin_credit_audit (void)
{
+ GNUNET_assert (NULL == in_map);
in_map = GNUNET_CONTAINER_multihashmap_create (1024,
GNUNET_YES);
/* now go over all bank accounts and check delta with in_map */
@@ -2148,8 +2561,8 @@ reserve_closed_cb (void *cls,
return GNUNET_SYSERR;
return GNUNET_OK;
}
- pp.last_reserve_close_uuid
- = GNUNET_MAX (pp.last_reserve_close_uuid,
+ TALER_ARL_USE_PP (wire_reserve_close_id)
+ = GNUNET_MAX (TALER_ARL_USE_PP (wire_reserve_close_id),
rowid + 1);
rc->receiver_account = GNUNET_strdup (receiver_account);
rc->wtid = *wtid;
@@ -2208,17 +2621,18 @@ begin_transaction (void)
}
GNUNET_assert (GNUNET_OK ==
TALER_amount_set_zero (TALER_ARL_currency,
- &total_drained));
+ &TALER_ARL_USE_AB (total_drained)));
GNUNET_assert (GNUNET_OK ==
TALER_amount_set_zero (TALER_ARL_currency,
&total_wire_in));
GNUNET_assert (GNUNET_OK ==
TALER_amount_set_zero (TALER_ARL_currency,
&total_wire_out));
- qs = TALER_ARL_adb->get_predicted_balance (TALER_ARL_adb->cls,
- &TALER_ARL_master_pub,
- &start_balance,
- &total_drained);
+ qs = TALER_ARL_adb->get_balance (
+ TALER_ARL_adb->cls,
+ TALER_ARL_GET_AB (total_drained),
+ TALER_ARL_GET_AB (final_balance),
+ NULL);
switch (qs)
{
case GNUNET_DB_STATUS_HARD_ERROR:
@@ -2238,12 +2652,33 @@ begin_transaction (void)
NULL != wa;
wa = wa->next)
{
- wa->qsx = TALER_ARL_adb->get_wire_auditor_account_progress (
+ GNUNET_asprintf (&wa->label_reserve_in_serial_id,
+ "wire-%s-%s",
+ wa->ai->section_name,
+ "reserve_in_serial_id");
+ GNUNET_asprintf (&wa->label_wire_out_serial_id,
+ "wire-%s-%s",
+ wa->ai->section_name,
+ "wire_out_serial_id");
+ GNUNET_asprintf (&wa->label_wire_off_in,
+ "wire-%s-%s",
+ wa->ai->section_name,
+ "wire_off_in");
+ GNUNET_asprintf (&wa->label_wire_off_out,
+ "wire-%s-%s",
+ wa->ai->section_name,
+ "wire_off_out");
+ wa->qsx = TALER_ARL_adb->get_auditor_progress (
TALER_ARL_adb->cls,
- &TALER_ARL_master_pub,
- wa->ai->section_name,
- &wa->pp,
- &wa->wire_off);
+ wa->label_reserve_in_serial_id,
+ &wa->pp.last_reserve_in_serial_id,
+ wa->label_wire_out_serial_id,
+ &wa->pp.last_wire_out_serial_id,
+ wa->label_wire_off_in,
+ &wa->wire_off_in,
+ wa->label_wire_off_out,
+ &wa->wire_off_out,
+ NULL);
if (0 > wa->qsx)
{
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == wa->qsx);
@@ -2251,9 +2686,12 @@ begin_transaction (void)
}
wa->start_pp = wa->pp;
}
- qsx_gwap = TALER_ARL_adb->get_wire_auditor_progress (TALER_ARL_adb->cls,
- &TALER_ARL_master_pub,
- &pp);
+ qsx_gwap = TALER_ARL_adb->get_auditor_progress (
+ TALER_ARL_adb->cls,
+ TALER_ARL_GET_PP (wire_reserve_close_id),
+ TALER_ARL_GET_PP (wire_batch_deposit_id),
+ TALER_ARL_GET_PP (wire_aggregation_id),
+ NULL);
if (0 > qsx_gwap)
{
GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qsx_gwap);
@@ -2266,11 +2704,11 @@ begin_transaction (void)
}
else
{
- start_pp = pp;
GNUNET_log (GNUNET_ERROR_TYPE_INFO,
- "Resuming wire audit at %s / %llu\n",
- GNUNET_TIME_timestamp2s (pp.last_timestamp),
- (unsigned long long) pp.last_reserve_close_uuid);
+ "Resuming wire audit at %llu / %llu / %llu\n",
+ (unsigned long long) TALER_ARL_USE_PP (wire_reserve_close_id),
+ (unsigned long long) TALER_ARL_USE_PP (wire_batch_deposit_id),
+ (unsigned long long) TALER_ARL_USE_PP (wire_aggregation_id));
}
{
@@ -2278,7 +2716,7 @@ begin_transaction (void)
qs = TALER_ARL_edb->select_reserve_closed_above_serial_id (
TALER_ARL_edb->cls,
- pp.last_reserve_close_uuid,
+ TALER_ARL_USE_PP (wire_reserve_close_id),
&reserve_closed_cb,
NULL);
if (0 > qs)
@@ -2385,6 +2823,10 @@ run (void *cls,
GNUNET_assert (NULL !=
(report_lags = json_array ()));
GNUNET_assert (NULL !=
+ (report_aml_lags = json_array ()));
+ GNUNET_assert (NULL !=
+ (report_kyc_lags = json_array ()));
+ GNUNET_assert (NULL !=
(report_closure_lags = json_array ()));
GNUNET_assert (NULL !=
(report_account_progress = json_array ()));
@@ -2460,11 +2902,10 @@ main (int argc,
"ignore-not-found",
"continue, even if the bank account of the exchange was not found",
&ignore_account_404),
- GNUNET_GETOPT_option_base32_auto ('m',
- "exchange-key",
- "KEY",
- "public key of the exchange (Crockford base32 encoded)",
- &TALER_ARL_master_pub),
+ GNUNET_GETOPT_option_flag ('t',
+ "test",
+ "run in test mode and exit when idle",
+ &test_mode),
GNUNET_GETOPT_option_timetravel ('T',
"timetravel"),
GNUNET_GETOPT_OPTION_END