diff options
Diffstat (limited to 'src/auditor/taler-helper-auditor-wire.c')
-rw-r--r-- | src/auditor/taler-helper-auditor-wire.c | 747 |
1 files changed, 594 insertions, 153 deletions
diff --git a/src/auditor/taler-helper-auditor-wire.c b/src/auditor/taler-helper-auditor-wire.c index bfc465b05..d48ac1f18 100644 --- a/src/auditor/taler-helper-auditor-wire.c +++ b/src/auditor/taler-helper-auditor-wire.c @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2017-2022 Taler Systems SA + Copyright (C) 2017-2023 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -53,6 +53,20 @@ /** + * Run in test mode. Exit when idle instead of + * going to sleep and waiting for more work. + * + * FIXME: not yet implemented! + */ +static int test_mode; + +struct TALER_AUDITORDB_WireAccountProgressPoint +{ + uint64_t last_reserve_in_serial_id; + uint64_t last_wire_out_serial_id; +}; + +/** * Information we keep for each supported account. */ struct WireAccount @@ -93,9 +107,34 @@ struct WireAccount struct TALER_AUDITORDB_WireAccountProgressPoint start_pp; /** - * Where we are in the transaction history. + * Where we are in the inbound transaction history. + */ + uint64_t wire_off_in; + + /** + * Where we are in the outbound transaction history. + */ + uint64_t wire_off_out; + + /** + * Label under which we store our pp's reserve_in_serial_id. */ - struct TALER_AUDITORDB_BankAccountProgressPoint wire_off; + char *label_reserve_in_serial_id; + + /** + * Label under which we store our pp's reserve_in_serial_id. + */ + char *label_wire_out_serial_id; + + /** + * Label under which we store our wire_off_in. + */ + char *label_wire_off_in; + + /** + * Label under which we store our wire_off_out. + */ + char *label_wire_off_out; /** * Return value when we got this account's progress point. @@ -178,20 +217,17 @@ static enum GNUNET_DB_QueryStatus qsx_gwap; /** * Last reserve_in / wire_out serial IDs seen. */ -static struct TALER_AUDITORDB_WireProgressPoint pp; - -/** - * Last reserve_in / wire_out serial IDs seen. - */ -static struct TALER_AUDITORDB_WireProgressPoint start_pp; +static TALER_ARL_DEF_PP (wire_reserve_close_id); +static TALER_ARL_DEF_PP (wire_batch_deposit_id); +static TALER_ARL_DEF_PP (wire_aggregation_id); /** - * Array of reports about row inconsitencies in wire_out table. + * Array of reports about row inconsistencies in wire_out table. */ static json_t *report_wire_out_inconsistencies; /** - * Array of reports about row inconsitencies in reserves_in table. + * Array of reports about row inconsistencies in reserves_in table. */ static json_t *report_reserve_in_inconsistencies; @@ -223,6 +259,16 @@ static json_t *report_row_minor_inconsistencies; static json_t *report_lags; /** + * Array of reports about lagging transactions from deposits due to missing KYC. + */ +static json_t *report_kyc_lags; + +/** + * Array of reports about lagging transactions from deposits due to pending or frozen AML decisions. + */ +static json_t *report_aml_lags; + +/** * Array of reports about lagging transactions from reserve closures. */ static json_t *report_closure_lags; @@ -292,17 +338,17 @@ static struct TALER_Amount total_wire_out; /** * Total amount of profits drained. */ -static struct TALER_Amount total_drained; +static TALER_ARL_DEF_AB (total_drained); /** - * Starting balance at the beginning of this iteration. + * Final balance at the end of this iteration. */ -static struct TALER_Amount start_balance; +static TALER_ARL_DEF_AB (final_balance); /** - * Final balance at the end of this iteration. + * Starting balance at the beginning of this iteration. */ -static struct TALER_Amount final_balance; +static struct TALER_Amount start_balance; /** * True if #start_balance was initialized. @@ -515,15 +561,19 @@ do_shutdown (void *cls) TALER_JSON_pack_amount ("total_wire_out", &total_wire_out), TALER_JSON_pack_amount ("total_drained", - &total_drained), + &TALER_ARL_USE_AB (total_drained)), TALER_JSON_pack_amount ("final_balance", - &final_balance), + &TALER_ARL_USE_AB (final_balance)), /* Tested in test-auditor.sh #1 */ TALER_JSON_pack_amount ("total_amount_lag", &total_amount_lag), /* Tested in test-auditor.sh #1 */ GNUNET_JSON_pack_array_steal ("lag_details", report_lags), + GNUNET_JSON_pack_array_steal ("lag_aml_details", + report_aml_lags), + GNUNET_JSON_pack_array_steal ("lag_kyc_details", + report_kyc_lags), /* Tested in test-auditor.sh #22 */ TALER_JSON_pack_amount ("total_closure_amount_lag", &total_closure_amount_lag), @@ -534,14 +584,18 @@ do_shutdown (void *cls) start_time), TALER_JSON_pack_time_abs_human ("wire_auditor_end_time", GNUNET_TIME_absolute_get ()), - GNUNET_JSON_pack_uint64 ("start_pp_reserve_close_uuid", - start_pp.last_reserve_close_uuid), - GNUNET_JSON_pack_uint64 ("end_pp_reserve_close_uuid", - pp.last_reserve_close_uuid), - TALER_JSON_pack_time_abs_human ("start_pp_last_timestamp", - start_pp.last_timestamp.abs_time), - TALER_JSON_pack_time_abs_human ("end_pp_last_timestamp", - pp.last_timestamp.abs_time), + GNUNET_JSON_pack_uint64 ("start_pp_reserve_close_id", + 0 /* no longer supported */), + GNUNET_JSON_pack_uint64 ("end_pp_reserve_close_id", + TALER_ARL_USE_PP (wire_reserve_close_id)), + GNUNET_JSON_pack_uint64 ("start_pp_last_batch_deposit_id", + 0 /* no longer supported */), + GNUNET_JSON_pack_uint64 ("end_pp_last_batch_deposit_id", + TALER_ARL_USE_PP (wire_batch_deposit_id)), + GNUNET_JSON_pack_uint64 ("start_pp_last_aggregation_serial_id", + 0 /* no longer supported */), + GNUNET_JSON_pack_uint64 ("end_pp_last_aggregation_serial_id", + TALER_ARL_USE_PP (wire_aggregation_id)), GNUNET_JSON_pack_array_steal ("account_progress", report_account_progress))); report_wire_out_inconsistencies = NULL; @@ -550,6 +604,8 @@ do_shutdown (void *cls) report_row_minor_inconsistencies = NULL; report_misattribution_in_inconsistencies = NULL; report_lags = NULL; + report_kyc_lags = NULL; + report_aml_lags = NULL; report_closure_lags = NULL; report_account_progress = NULL; report_wire_format_inconsistencies = NULL; @@ -597,6 +653,10 @@ do_shutdown (void *cls) GNUNET_CONTAINER_DLL_remove (wa_head, wa_tail, wa); + GNUNET_free (wa->label_reserve_in_serial_id); + GNUNET_free (wa->label_wire_out_serial_id); + GNUNET_free (wa->label_wire_off_in); + GNUNET_free (wa->label_wire_off_out); GNUNET_free (wa); } if (NULL != ctx) @@ -651,8 +711,8 @@ check_pending_rc (void *cls, &rc->wtid), GNUNET_JSON_pack_string ("account", rc->receiver_account))); - pp.last_reserve_close_uuid - = GNUNET_MIN (pp.last_reserve_close_uuid, + TALER_ARL_USE_PP (wire_reserve_close_id) + = GNUNET_MIN (TALER_ARL_USE_PP (wire_reserve_close_id), rc->rowid); return GNUNET_OK; } @@ -704,30 +764,32 @@ commit (enum GNUNET_DB_QueryStatus qs) TALER_ARL_amount_add (&sum, &total_wire_in, &start_balance); - TALER_ARL_amount_subtract (&final_balance, + TALER_ARL_amount_subtract (&TALER_ARL_USE_AB (final_balance), &sum, &total_wire_out); - qs = TALER_ARL_adb->update_predicted_result (TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - &final_balance, - &total_drained); + qs = TALER_ARL_adb->update_balance ( + TALER_ARL_adb->cls, + TALER_ARL_SET_AB (total_drained), + TALER_ARL_SET_AB (final_balance), + NULL); } else { - TALER_ARL_amount_subtract (&final_balance, + TALER_ARL_amount_subtract (&TALER_ARL_USE_AB (final_balance), &total_wire_in, &total_wire_out); - qs = TALER_ARL_adb->insert_predicted_result (TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - &final_balance, - &total_drained); + qs = TALER_ARL_adb->insert_balance ( + TALER_ARL_adb->cls, + TALER_ARL_SET_AB (total_drained), + TALER_ARL_SET_AB (final_balance), + NULL); } } else { GNUNET_assert (GNUNET_OK == TALER_amount_set_zero (TALER_ARL_currency, - &final_balance)); + &TALER_ARL_USE_AB (final_balance))); } if (0 > qs) { @@ -757,24 +819,33 @@ commit (enum GNUNET_DB_QueryStatus qs) GNUNET_JSON_pack_uint64 ("end_reserve_in", wa->pp.last_reserve_in_serial_id), GNUNET_JSON_pack_uint64 ("start_wire_out", - wa->start_pp. - last_wire_out_serial_id), + wa->start_pp.last_wire_out_serial_id), GNUNET_JSON_pack_uint64 ("end_wire_out", wa->pp.last_wire_out_serial_id)))); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == wa->qsx) - qs = TALER_ARL_adb->update_wire_auditor_account_progress ( + qs = TALER_ARL_adb->update_auditor_progress ( TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - wa->ai->section_name, - &wa->pp, - &wa->wire_off); + wa->label_reserve_in_serial_id, + wa->pp.last_reserve_in_serial_id, + wa->label_wire_out_serial_id, + wa->pp.last_wire_out_serial_id, + wa->label_wire_off_in, + wa->wire_off_in, + wa->label_wire_off_out, + wa->wire_off_out, + NULL); else - qs = TALER_ARL_adb->insert_wire_auditor_account_progress ( + qs = TALER_ARL_adb->insert_auditor_progress ( TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - wa->ai->section_name, - &wa->pp, - &wa->wire_off); + wa->label_reserve_in_serial_id, + wa->pp.last_reserve_in_serial_id, + wa->label_wire_out_serial_id, + wa->pp.last_wire_out_serial_id, + wa->label_wire_off_in, + wa->wire_off_in, + wa->label_wire_off_out, + wa->wire_off_out, + NULL); if (0 >= qs) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -787,13 +858,19 @@ commit (enum GNUNET_DB_QueryStatus qs) &check_pending_rc, NULL); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qsx_gwap) - qs = TALER_ARL_adb->update_wire_auditor_progress (TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - &pp); + qs = TALER_ARL_adb->update_auditor_progress ( + TALER_ARL_adb->cls, + TALER_ARL_SET_PP (wire_reserve_close_id), + TALER_ARL_SET_PP (wire_batch_deposit_id), + TALER_ARL_SET_PP (wire_aggregation_id), + NULL); else - qs = TALER_ARL_adb->insert_wire_auditor_progress (TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - &pp); + qs = TALER_ARL_adb->insert_auditor_progress ( + TALER_ARL_adb->cls, + TALER_ARL_SET_PP (wire_reserve_close_id), + TALER_ARL_SET_PP (wire_batch_deposit_id), + TALER_ARL_SET_PP (wire_aggregation_id), + NULL); if (0 >= qs) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -802,8 +879,9 @@ commit (enum GNUNET_DB_QueryStatus qs) return qs; } GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Concluded audit step at %s\n", - GNUNET_TIME_timestamp2s (pp.last_timestamp)); + "Concluded audit step at %llu/%llu\n", + (unsigned long long) TALER_ARL_USE_PP (wire_aggregation_id), + (unsigned long long) TALER_ARL_USE_PP (wire_batch_deposit_id)); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) { @@ -840,59 +918,341 @@ commit (enum GNUNET_DB_QueryStatus qs) /* ***************************** Analyze required transfers ************************ */ /** - * Function called on deposits that are past their due date - * and have not yet seen a wire transfer. + * Closure for import_wire_missing_cb(). + */ +struct ImportMissingWireContext +{ + /** + * Set to maximum row ID encountered. + */ + uint64_t max_batch_deposit_uuid; + + /** + * Set to database errors in callback. + */ + enum GNUNET_DB_QueryStatus err; +}; + + +/** + * Function called on deposits that need to be checked for their + * wire transfer. * - * @param cls closure - * @param rowid deposit table row of the coin's deposit - * @param coin_pub public key of the coin - * @param amount value of the deposit, including fee - * @param payto_uri where should the funds be wired - * @param deadline what was the requested wire transfer deadline - * @param done did the exchange claim that it made a transfer? - * NOTE: only valid in internal audit mode! + * @param cls closure, points to a `struct ImportMissingWireContext` + * @param batch_deposit_serial_id serial of the entry in the batch deposits table + * @param total_amount value of the missing deposits, including fee + * @param wire_target_h_payto where should the funds be wired + * @param deadline what was the earliest requested wire transfer deadline */ static void -wire_missing_cb (void *cls, - uint64_t rowid, - const struct TALER_CoinSpendPublicKeyP *coin_pub, - const struct TALER_Amount *amount, - const char *payto_uri, - struct GNUNET_TIME_Timestamp deadline, - bool done) +import_wire_missing_cb (void *cls, + uint64_t batch_deposit_serial_id, + const struct TALER_Amount *total_amount, + const struct TALER_PaytoHashP *wire_target_h_payto, + struct GNUNET_TIME_Timestamp deadline) { - json_t *rep; + struct ImportMissingWireContext *wc = cls; + enum GNUNET_DB_QueryStatus qs; + + if (wc->err < 0) + return; /* already failed */ + GNUNET_assert (batch_deposit_serial_id > wc->max_batch_deposit_uuid); + wc->max_batch_deposit_uuid = batch_deposit_serial_id; + qs = TALER_ARL_adb->insert_pending_deposit ( + TALER_ARL_adb->cls, + batch_deposit_serial_id, + wire_target_h_payto, + total_amount, + deadline); + if (qs < 0) + wc->err = qs; +} + + +/** + * Information about a delayed wire transfer and the possible + * reasons for the delay. + */ +struct ReasonDetail +{ + /** + * Total amount that should have been transferred. + */ + struct TALER_Amount total_amount; + + /** + * Earliest deadline for an expected transfer to the account. + */ + struct GNUNET_TIME_Timestamp deadline; + + /** + * Target account, NULL if even that is not known (due to + * exchange lacking required entry in wire_targets table). + */ + char *payto_uri; + + /** + * Reasons due to pending KYC requests. + */ + char *kyc_pending; + + /** + * AML decision state for the target account. + */ + enum TALER_AmlDecisionState status; + + /** + * Current AML threshold for the account, may be an invalid account if the + * default threshold applies. + */ + struct TALER_Amount aml_limit; +}; + +/** + * Closure for report_wire_missing_cb(). + */ +struct ReportMissingWireContext +{ + /** + * Map from wire_target_h_payto to `struct ReasonDetail`. + */ + struct GNUNET_CONTAINER_MultiShortmap *map; + + /** + * Set to database errors in callback. + */ + enum GNUNET_DB_QueryStatus err; +}; + + +/** + * Closure for #clear_finished_transfer_cb(). + */ +struct AggregationContext +{ + /** + * Set to maximum row ID encountered. + */ + uint64_t max_aggregation_serial; + + /** + * Set to database errors in callback. + */ + enum GNUNET_DB_QueryStatus err; +}; + + +/** + * Free memory allocated in @a value. + * + * @param cls unused + * @param key unused + * @param value must be a `struct ReasonDetail` + * @return #GNUNET_YES if we should continue to + * iterate, + * #GNUNET_NO if not. + */ +static enum GNUNET_GenericReturnValue +free_report_entry (void *cls, + const struct GNUNET_ShortHashCode *key, + void *value) +{ + struct ReasonDetail *rd = value; + + GNUNET_free (rd->kyc_pending); + GNUNET_free (rd->payto_uri); + GNUNET_free (rd); + return GNUNET_YES; +} + + +/** + * We had an entry in our map of wire transfers that + * should have been performed. Generate report. + * + * @param cls unused + * @param key unused + * @param value must be a `struct ReasonDetail` + * @return #GNUNET_YES if we should continue to + * iterate, + * #GNUNET_NO if not. + */ +static enum GNUNET_GenericReturnValue +generate_report (void *cls, + const struct GNUNET_ShortHashCode *key, + void *value) +{ + struct ReasonDetail *rd = value; - (void) cls; - TALER_ARL_amount_add (&total_amount_lag, - &total_amount_lag, - amount); /* For now, we simplify and only check that the amount was tiny */ - if (0 > TALER_amount_cmp (amount, + if (0 > TALER_amount_cmp (&rd->total_amount, &tiny_amount)) - return; /* acceptable, amount was tiny */ - rep = GNUNET_JSON_PACK ( - GNUNET_JSON_pack_uint64 ("row", - rowid), - TALER_JSON_pack_amount ("amount", - amount), - TALER_JSON_pack_time_abs_human ("deadline", - deadline.abs_time), - GNUNET_JSON_pack_data_auto ("coin_pub", - coin_pub), - GNUNET_JSON_pack_string ("account", - payto_uri)); - if (internal_checks) + return free_report_entry (cls, + key, + value); /* acceptable, amount was tiny */ + // TODO: maybe split total_amount_lag up by category below? + TALER_ARL_amount_add (&total_amount_lag, + &total_amount_lag, + &rd->total_amount); + if (NULL != rd->kyc_pending) + { + json_t *rep; + + rep = GNUNET_JSON_PACK ( + TALER_JSON_pack_amount ("total_amount", + &rd->total_amount), + TALER_JSON_pack_time_abs_human ("deadline", + rd->deadline.abs_time), + GNUNET_JSON_pack_string ("kyc_pending", + rd->kyc_pending), + GNUNET_JSON_pack_allow_null ( + GNUNET_JSON_pack_string ("account", + rd->payto_uri))); + TALER_ARL_report (report_kyc_lags, + rep); + } + else if (TALER_AML_NORMAL != rd->status) + { + const char *sstatus = "<undefined>"; + json_t *rep; + + switch (rd->status) + { + case TALER_AML_NORMAL: + GNUNET_assert (0); + break; + case TALER_AML_PENDING: + sstatus = "pending"; + break; + case TALER_AML_FROZEN: + sstatus = "frozen"; + break; + } + rep = GNUNET_JSON_PACK ( + TALER_JSON_pack_amount ("total_amount", + &rd->total_amount), + GNUNET_JSON_pack_allow_null ( + TALER_JSON_pack_amount ("aml_limit", + TALER_amount_is_valid (&rd->aml_limit) + ? &rd->aml_limit + : NULL)), + TALER_JSON_pack_time_abs_human ("deadline", + rd->deadline.abs_time), + GNUNET_JSON_pack_string ("aml_status", + sstatus), + GNUNET_JSON_pack_allow_null ( + GNUNET_JSON_pack_string ("account", + rd->payto_uri))); + TALER_ARL_report (report_aml_lags, + rep); + } + else + { + json_t *rep; + + rep = GNUNET_JSON_PACK ( + TALER_JSON_pack_amount ("total_amount", + &rd->total_amount), + TALER_JSON_pack_time_abs_human ("deadline", + rd->deadline.abs_time), + GNUNET_JSON_pack_allow_null ( + GNUNET_JSON_pack_string ("account", + rd->payto_uri))); + TALER_ARL_report (report_lags, + rep); + } + + return free_report_entry (cls, + key, + value); +} + + +/** + * Function called on deposits that are past their due date + * and have not yet seen a wire transfer. + * + * @param cls closure, points to a `struct ReportMissingWireContext` + * @param batch_deposit_serial_id row in the database for which the wire transfer is missing + * @param total_amount value of the missing deposits, including fee + * @param wire_target_h_payto hash of payto-URI where the funds should have been wired + * @param deadline what was the earliest requested wire transfer deadline + */ +static void +report_wire_missing_cb (void *cls, + uint64_t batch_deposit_serial_id, + const struct TALER_Amount *total_amount, + const struct TALER_PaytoHashP *wire_target_h_payto, + struct GNUNET_TIME_Timestamp deadline) +{ + struct ReportMissingWireContext *rc = cls; + struct ReasonDetail *rd; + + rd = GNUNET_CONTAINER_multishortmap_get (rc->map, + &wire_target_h_payto->hash); + if (NULL == rd) + { + rd = GNUNET_new (struct ReasonDetail); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multishortmap_put ( + rc->map, + &wire_target_h_payto->hash, + rd, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + rc->err = TALER_ARL_edb->select_justification_for_missing_wire ( + TALER_ARL_edb->cls, + wire_target_h_payto, + &rd->payto_uri, + &rd->kyc_pending, + &rd->status, + &rd->aml_limit); + rd->total_amount = *total_amount; + rd->deadline = deadline; + } + else + { + TALER_ARL_amount_add (&rd->total_amount, + &rd->total_amount, + total_amount); + rd->deadline = GNUNET_TIME_timestamp_min (rd->deadline, + deadline); + } +} + + +/** + * Function called on aggregations that were done for + * a (batch) deposit. + * + * @param cls closure + * @param tracking_serial_id where in the table are we + * @param batch_deposit_serial_id which batch deposit was aggregated + */ +static void +clear_finished_transfer_cb ( + void *cls, + uint64_t tracking_serial_id, + uint64_t batch_deposit_serial_id) +{ + struct AggregationContext *ac = cls; + enum GNUNET_DB_QueryStatus qs; + + if (0 > ac->err) + return; /* already failed */ + GNUNET_assert (ac->max_aggregation_serial < tracking_serial_id); + ac->max_aggregation_serial = tracking_serial_id; + qs = TALER_ARL_adb->delete_pending_deposit ( + TALER_ARL_adb->cls, + batch_deposit_serial_id); + if (0 == qs) { - /* the 'done' bit is only useful in 'internal' mode */ - GNUNET_assert (0 == - json_object_set (rep, - "claimed_done", - json_string ((done) ? "yes" : "no"))); + /* Aggregated something twice or other error, report! */ + GNUNET_break (0); + // FIXME: report more nicely! } - TALER_ARL_report (report_lags, - rep); + if (0 > qs) + ac->err = qs; } @@ -903,31 +1263,78 @@ wire_missing_cb (void *cls, static void check_for_required_transfers (void) { - struct GNUNET_TIME_Timestamp next_timestamp; + struct ImportMissingWireContext wc = { + .max_batch_deposit_uuid = TALER_ARL_USE_PP (wire_batch_deposit_id), + .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT + }; + struct GNUNET_TIME_Absolute deadline; enum GNUNET_DB_QueryStatus qs; + struct ReportMissingWireContext rc = { + .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT + }; + struct AggregationContext ac = { + .max_aggregation_serial = TALER_ARL_USE_PP (wire_aggregation_id), + .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT + }; + qs = TALER_ARL_edb->select_batch_deposits_missing_wire ( + TALER_ARL_edb->cls, + TALER_ARL_USE_PP (wire_batch_deposit_id), + &import_wire_missing_cb, + &wc); + if ( (0 > qs) || (0 > wc.err) ) + { + GNUNET_break (0); + GNUNET_break ( (GNUNET_DB_STATUS_SOFT_ERROR == qs) || + (GNUNET_DB_STATUS_SOFT_ERROR == wc.err) ); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + } + TALER_ARL_USE_PP (wire_batch_deposit_id) = wc.max_batch_deposit_uuid; + qs = TALER_ARL_edb->select_aggregations_above_serial ( + TALER_ARL_edb->cls, + TALER_ARL_USE_PP (wire_aggregation_id), + &clear_finished_transfer_cb, + &ac); + if ( (0 > qs) || (0 > ac.err) ) + { + GNUNET_break (0); + GNUNET_break ( (GNUNET_DB_STATUS_SOFT_ERROR == qs) || + (GNUNET_DB_STATUS_SOFT_ERROR == ac.err) ); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + } + TALER_ARL_USE_PP (wire_aggregation_id) = ac.max_aggregation_serial; /* Subtract #GRACE_PERIOD, so we can be a bit behind in processing without immediately raising undue concern */ - next_timestamp = GNUNET_TIME_absolute_to_timestamp ( - GNUNET_TIME_absolute_subtract (GNUNET_TIME_absolute_get (), - GRACE_PERIOD)); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Analyzing exchange's unfinished deposits (deadline: %s)\n", - GNUNET_TIME_timestamp2s (next_timestamp)); - qs = TALER_ARL_edb->select_deposits_missing_wire (TALER_ARL_edb->cls, - pp.last_timestamp, - next_timestamp, - &wire_missing_cb, - &next_timestamp); - if (0 > qs) + deadline = GNUNET_TIME_absolute_subtract (GNUNET_TIME_absolute_get (), + GRACE_PERIOD); + rc.map = GNUNET_CONTAINER_multishortmap_create (1024, + GNUNET_NO); + qs = TALER_ARL_adb->select_pending_deposits ( + TALER_ARL_adb->cls, + deadline, + &report_wire_missing_cb, + &rc); + if ( (0 > qs) || (0 > rc.err) ) { GNUNET_break (0); - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + GNUNET_break ( (GNUNET_DB_STATUS_SOFT_ERROR == qs) || + (GNUNET_DB_STATUS_SOFT_ERROR == rc.err) ); + GNUNET_CONTAINER_multishortmap_iterate (rc.map, + &free_report_entry, + NULL); + GNUNET_CONTAINER_multishortmap_destroy (rc.map); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; } - pp.last_timestamp = next_timestamp; + GNUNET_CONTAINER_multishortmap_iterate (rc.map, + &generate_report, + NULL); + GNUNET_CONTAINER_multishortmap_destroy (rc.map); /* conclude with success */ commit (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT); GNUNET_SCHEDULER_shutdown (); @@ -1034,7 +1441,7 @@ wire_out_cb (void *cls, { /* Wire transfer was not made (yet) at all (but would have been justified), so the entire amount is missing / still to be done. - This is moderately harmless, it might just be that the aggreator + This is moderately harmless, it might just be that the aggregator has not yet fully caught up with the transfers it should do. */ TALER_ARL_report ( report_wire_out_inconsistencies, @@ -1383,8 +1790,8 @@ complain_out_not_found (void *cls, GNUNET_free (account_section); GNUNET_free (payto_uri); /* profit drain was correct */ - TALER_ARL_amount_add (&total_drained, - &total_drained, + TALER_ARL_amount_add (&TALER_ARL_USE_AB (total_drained), + &TALER_ARL_USE_AB (total_drained), &amount); return GNUNET_OK; } @@ -1436,6 +1843,7 @@ check_exchange_wire_out (struct WireAccount *wa) { enum GNUNET_DB_QueryStatus qs; + GNUNET_assert (NULL == wa->dhh); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Analyzing exchange's wire OUT table for account `%s'\n", wa->ai->section_name); @@ -1493,7 +1901,7 @@ history_debit_cb (void *cls, TALER_amount2s (&dd->amount), TALER_B2S (&dd->wtid)); /* Update offset */ - wa->wire_off.out_wire_off = dd->serial_id; + wa->wire_off_out = dd->serial_id; slen = strlen (dd->credit_account_uri) + 1; roi = GNUNET_malloc (sizeof (struct ReserveOutInfo) + slen); @@ -1589,7 +1997,7 @@ process_debits (void *cls) // (CG: used to be INT64_MAX, changed by MS to INT32_MAX, why? To be discussed with him!) wa->dhh = TALER_BANK_debit_history (ctx, wa->ai->auth, - wa->wire_off.out_wire_off, + wa->wire_off_out, INT32_MAX, GNUNET_TIME_UNIT_ZERO, &history_debit_cb, @@ -1613,8 +2021,9 @@ process_debits (void *cls) static void begin_debit_audit (void) { + GNUNET_assert (NULL == out_map); out_map = GNUNET_CONTAINER_multihashmap_create (1024, - GNUNET_YES); + true); process_debits (wa_head); } @@ -1629,8 +2038,11 @@ begin_debit_audit (void) static void conclude_credit_history (void) { - GNUNET_CONTAINER_multihashmap_destroy (in_map); - in_map = NULL; + if (NULL != in_map) + { + GNUNET_CONTAINER_multihashmap_destroy (in_map); + in_map = NULL; + } /* credit done, now check debits */ begin_debit_audit (); } @@ -1823,7 +2235,7 @@ analyze_credit (struct WireAccount *wa, } /* Update offset */ - wa->wire_off.in_wire_off = details->serial_id; + wa->wire_off_in = details->serial_id; /* compare records with expected data */ if (0 != GNUNET_memcmp (&details->reserve_pub, &rii->details.reserve_pub)) @@ -1985,7 +2397,7 @@ history_credit_cb (void *cls, if (! analyze_credit (wa, cd)) - break; + return; } conclude_account (wa); return; @@ -2062,7 +2474,7 @@ process_credits (void *cls) // (CG: used to be INT64_MAX, changed by MS to INT32_MAX, why? To be discussed with him!) wa->chh = TALER_BANK_credit_history (ctx, wa->ai->auth, - wa->wire_off.in_wire_off, + wa->wire_off_in, INT32_MAX, GNUNET_TIME_UNIT_ZERO, &history_credit_cb, @@ -2085,6 +2497,7 @@ process_credits (void *cls) static void begin_credit_audit (void) { + GNUNET_assert (NULL == in_map); in_map = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_YES); /* now go over all bank accounts and check delta with in_map */ @@ -2148,8 +2561,8 @@ reserve_closed_cb (void *cls, return GNUNET_SYSERR; return GNUNET_OK; } - pp.last_reserve_close_uuid - = GNUNET_MAX (pp.last_reserve_close_uuid, + TALER_ARL_USE_PP (wire_reserve_close_id) + = GNUNET_MAX (TALER_ARL_USE_PP (wire_reserve_close_id), rowid + 1); rc->receiver_account = GNUNET_strdup (receiver_account); rc->wtid = *wtid; @@ -2208,17 +2621,18 @@ begin_transaction (void) } GNUNET_assert (GNUNET_OK == TALER_amount_set_zero (TALER_ARL_currency, - &total_drained)); + &TALER_ARL_USE_AB (total_drained))); GNUNET_assert (GNUNET_OK == TALER_amount_set_zero (TALER_ARL_currency, &total_wire_in)); GNUNET_assert (GNUNET_OK == TALER_amount_set_zero (TALER_ARL_currency, &total_wire_out)); - qs = TALER_ARL_adb->get_predicted_balance (TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - &start_balance, - &total_drained); + qs = TALER_ARL_adb->get_balance ( + TALER_ARL_adb->cls, + TALER_ARL_GET_AB (total_drained), + TALER_ARL_GET_AB (final_balance), + NULL); switch (qs) { case GNUNET_DB_STATUS_HARD_ERROR: @@ -2238,12 +2652,33 @@ begin_transaction (void) NULL != wa; wa = wa->next) { - wa->qsx = TALER_ARL_adb->get_wire_auditor_account_progress ( + GNUNET_asprintf (&wa->label_reserve_in_serial_id, + "wire-%s-%s", + wa->ai->section_name, + "reserve_in_serial_id"); + GNUNET_asprintf (&wa->label_wire_out_serial_id, + "wire-%s-%s", + wa->ai->section_name, + "wire_out_serial_id"); + GNUNET_asprintf (&wa->label_wire_off_in, + "wire-%s-%s", + wa->ai->section_name, + "wire_off_in"); + GNUNET_asprintf (&wa->label_wire_off_out, + "wire-%s-%s", + wa->ai->section_name, + "wire_off_out"); + wa->qsx = TALER_ARL_adb->get_auditor_progress ( TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - wa->ai->section_name, - &wa->pp, - &wa->wire_off); + wa->label_reserve_in_serial_id, + &wa->pp.last_reserve_in_serial_id, + wa->label_wire_out_serial_id, + &wa->pp.last_wire_out_serial_id, + wa->label_wire_off_in, + &wa->wire_off_in, + wa->label_wire_off_out, + &wa->wire_off_out, + NULL); if (0 > wa->qsx) { GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == wa->qsx); @@ -2251,9 +2686,12 @@ begin_transaction (void) } wa->start_pp = wa->pp; } - qsx_gwap = TALER_ARL_adb->get_wire_auditor_progress (TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - &pp); + qsx_gwap = TALER_ARL_adb->get_auditor_progress ( + TALER_ARL_adb->cls, + TALER_ARL_GET_PP (wire_reserve_close_id), + TALER_ARL_GET_PP (wire_batch_deposit_id), + TALER_ARL_GET_PP (wire_aggregation_id), + NULL); if (0 > qsx_gwap) { GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qsx_gwap); @@ -2266,11 +2704,11 @@ begin_transaction (void) } else { - start_pp = pp; GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Resuming wire audit at %s / %llu\n", - GNUNET_TIME_timestamp2s (pp.last_timestamp), - (unsigned long long) pp.last_reserve_close_uuid); + "Resuming wire audit at %llu / %llu / %llu\n", + (unsigned long long) TALER_ARL_USE_PP (wire_reserve_close_id), + (unsigned long long) TALER_ARL_USE_PP (wire_batch_deposit_id), + (unsigned long long) TALER_ARL_USE_PP (wire_aggregation_id)); } { @@ -2278,7 +2716,7 @@ begin_transaction (void) qs = TALER_ARL_edb->select_reserve_closed_above_serial_id ( TALER_ARL_edb->cls, - pp.last_reserve_close_uuid, + TALER_ARL_USE_PP (wire_reserve_close_id), &reserve_closed_cb, NULL); if (0 > qs) @@ -2385,6 +2823,10 @@ run (void *cls, GNUNET_assert (NULL != (report_lags = json_array ())); GNUNET_assert (NULL != + (report_aml_lags = json_array ())); + GNUNET_assert (NULL != + (report_kyc_lags = json_array ())); + GNUNET_assert (NULL != (report_closure_lags = json_array ())); GNUNET_assert (NULL != (report_account_progress = json_array ())); @@ -2460,11 +2902,10 @@ main (int argc, "ignore-not-found", "continue, even if the bank account of the exchange was not found", &ignore_account_404), - GNUNET_GETOPT_option_base32_auto ('m', - "exchange-key", - "KEY", - "public key of the exchange (Crockford base32 encoded)", - &TALER_ARL_master_pub), + GNUNET_GETOPT_option_flag ('t', + "test", + "run in test mode and exit when idle", + &test_mode), GNUNET_GETOPT_option_timetravel ('T', "timetravel"), GNUNET_GETOPT_OPTION_END |