diff options
Diffstat (limited to 'src/auditor/taler-helper-auditor-wire.c')
-rw-r--r-- | src/auditor/taler-helper-auditor-wire.c | 1327 |
1 files changed, 1018 insertions, 309 deletions
diff --git a/src/auditor/taler-helper-auditor-wire.c b/src/auditor/taler-helper-auditor-wire.c index 89758c2fc..d48ac1f18 100644 --- a/src/auditor/taler-helper-auditor-wire.c +++ b/src/auditor/taler-helper-auditor-wire.c @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2017-2021 Taler Systems SA + Copyright (C) 2017-2023 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -53,6 +53,20 @@ /** + * Run in test mode. Exit when idle instead of + * going to sleep and waiting for more work. + * + * FIXME: not yet implemented! + */ +static int test_mode; + +struct TALER_AUDITORDB_WireAccountProgressPoint +{ + uint64_t last_reserve_in_serial_id; + uint64_t last_wire_out_serial_id; +}; + +/** * Information we keep for each supported account. */ struct WireAccount @@ -93,14 +107,34 @@ struct WireAccount struct TALER_AUDITORDB_WireAccountProgressPoint start_pp; /** - * Where we are in the inbound (CREDIT) transaction history. + * Where we are in the inbound transaction history. + */ + uint64_t wire_off_in; + + /** + * Where we are in the outbound transaction history. + */ + uint64_t wire_off_out; + + /** + * Label under which we store our pp's reserve_in_serial_id. + */ + char *label_reserve_in_serial_id; + + /** + * Label under which we store our pp's reserve_in_serial_id. */ - uint64_t in_wire_off; + char *label_wire_out_serial_id; /** - * Where we are in the inbound (DEBIT) transaction history. + * Label under which we store our wire_off_in. */ - uint64_t out_wire_off; + char *label_wire_off_in; + + /** + * Label under which we store our wire_off_out. + */ + char *label_wire_off_out; /** * Return value when we got this account's progress point. @@ -183,20 +217,17 @@ static enum GNUNET_DB_QueryStatus qsx_gwap; /** * Last reserve_in / wire_out serial IDs seen. */ -static struct TALER_AUDITORDB_WireProgressPoint pp; - -/** - * Last reserve_in / wire_out serial IDs seen. - */ -static struct TALER_AUDITORDB_WireProgressPoint start_pp; +static TALER_ARL_DEF_PP (wire_reserve_close_id); +static TALER_ARL_DEF_PP (wire_batch_deposit_id); +static TALER_ARL_DEF_PP (wire_aggregation_id); /** - * Array of reports about row inconsitencies in wire_out table. + * Array of reports about row inconsistencies in wire_out table. */ static json_t *report_wire_out_inconsistencies; /** - * Array of reports about row inconsitencies in reserves_in table. + * Array of reports about row inconsistencies in reserves_in table. */ static json_t *report_reserve_in_inconsistencies; @@ -204,7 +235,7 @@ static json_t *report_reserve_in_inconsistencies; * Array of reports about wrong bank account being recorded for * incoming wire transfers. */ -static json_t *report_missattribution_in_inconsistencies; +static json_t *report_misattribution_in_inconsistencies; /** * Array of reports about row inconsistencies. @@ -228,6 +259,16 @@ static json_t *report_row_minor_inconsistencies; static json_t *report_lags; /** + * Array of reports about lagging transactions from deposits due to missing KYC. + */ +static json_t *report_kyc_lags; + +/** + * Array of reports about lagging transactions from deposits due to pending or frozen AML decisions. + */ +static json_t *report_aml_lags; + +/** * Array of reports about lagging transactions from reserve closures. */ static json_t *report_closure_lags; @@ -267,7 +308,7 @@ static struct TALER_Amount total_bad_amount_in_minus; * for incoming funds and may thus wire funds to the wrong * destination when closing the reserve. */ -static struct TALER_Amount total_missattribution_in; +static struct TALER_Amount total_misattribution_in; /** * Total amount which the exchange did not transfer in time. @@ -285,6 +326,36 @@ static struct TALER_Amount total_closure_amount_lag; static struct TALER_Amount total_wire_format_amount; /** + * Total amount credited to exchange accounts. + */ +static struct TALER_Amount total_wire_in; + +/** + * Total amount debited to exchange accounts. + */ +static struct TALER_Amount total_wire_out; + +/** + * Total amount of profits drained. + */ +static TALER_ARL_DEF_AB (total_drained); + +/** + * Final balance at the end of this iteration. + */ +static TALER_ARL_DEF_AB (final_balance); + +/** + * Starting balance at the beginning of this iteration. + */ +static struct TALER_Amount start_balance; + +/** + * True if #start_balance was initialized. + */ +static bool had_start_balance; + +/** * Amount of zero in our currency. */ static struct TALER_Amount zero; @@ -304,6 +375,12 @@ static struct GNUNET_CURL_RescheduleContext *rc; */ static int internal_checks; +/** + * Should we ignore if the bank does not know our bank + * account? + */ +static int ignore_account_404; + /* ***************************** Shutdown **************************** */ /** @@ -361,7 +438,7 @@ struct ReserveOutInfo * @param value the `struct ReserveInInfo` to free * @return #GNUNET_OK */ -static int +static enum GNUNET_GenericReturnValue free_rii (void *cls, const struct GNUNET_HashCode *key, void *value) @@ -386,7 +463,7 @@ free_rii (void *cls, * @param value the `struct ReserveOutInfo` to free * @return #GNUNET_OK */ -static int +static enum GNUNET_GenericReturnValue free_roi (void *cls, const struct GNUNET_HashCode *key, void *value) @@ -411,7 +488,7 @@ free_roi (void *cls, * @param value the `struct ReserveClosure` to free * @return #GNUNET_OK */ -static int +static enum GNUNET_GenericReturnValue free_rc (void *cls, const struct GNUNET_HashCode *key, void *value) @@ -463,11 +540,11 @@ do_shutdown (void *cls) TALER_JSON_pack_amount ("total_wire_in_delta_minus", &total_bad_amount_in_minus), /* Tested in test-auditor.sh #9 */ - GNUNET_JSON_pack_array_steal ("missattribution_in_inconsistencies", - report_missattribution_in_inconsistencies), + GNUNET_JSON_pack_array_steal ("misattribution_in_inconsistencies", + report_misattribution_in_inconsistencies), /* Tested in test-auditor.sh #9 */ - TALER_JSON_pack_amount ("total_missattribution_in", - &total_missattribution_in), + TALER_JSON_pack_amount ("total_misattribution_in", + &total_misattribution_in), GNUNET_JSON_pack_array_steal ("row_inconsistencies", report_row_inconsistencies), /* Tested in test-auditor.sh #10/#17 */ @@ -479,12 +556,24 @@ do_shutdown (void *cls) /* Tested in test-auditor.sh #19 */ GNUNET_JSON_pack_array_steal ("wire_format_inconsistencies", report_wire_format_inconsistencies), + TALER_JSON_pack_amount ("total_wire_in", + &total_wire_in), + TALER_JSON_pack_amount ("total_wire_out", + &total_wire_out), + TALER_JSON_pack_amount ("total_drained", + &TALER_ARL_USE_AB (total_drained)), + TALER_JSON_pack_amount ("final_balance", + &TALER_ARL_USE_AB (final_balance)), /* Tested in test-auditor.sh #1 */ TALER_JSON_pack_amount ("total_amount_lag", &total_amount_lag), /* Tested in test-auditor.sh #1 */ GNUNET_JSON_pack_array_steal ("lag_details", report_lags), + GNUNET_JSON_pack_array_steal ("lag_aml_details", + report_aml_lags), + GNUNET_JSON_pack_array_steal ("lag_kyc_details", + report_kyc_lags), /* Tested in test-auditor.sh #22 */ TALER_JSON_pack_amount ("total_closure_amount_lag", &total_closure_amount_lag), @@ -495,22 +584,28 @@ do_shutdown (void *cls) start_time), TALER_JSON_pack_time_abs_human ("wire_auditor_end_time", GNUNET_TIME_absolute_get ()), - GNUNET_JSON_pack_uint64 ("start_pp_reserve_close_uuid", - start_pp.last_reserve_close_uuid), - GNUNET_JSON_pack_uint64 ("end_pp_reserve_close_uuid", - pp.last_reserve_close_uuid), - TALER_JSON_pack_time_abs_human ("start_pp_last_timestamp", - start_pp.last_timestamp.abs_time), - TALER_JSON_pack_time_abs_human ("end_pp_last_timestamp", - pp.last_timestamp.abs_time), + GNUNET_JSON_pack_uint64 ("start_pp_reserve_close_id", + 0 /* no longer supported */), + GNUNET_JSON_pack_uint64 ("end_pp_reserve_close_id", + TALER_ARL_USE_PP (wire_reserve_close_id)), + GNUNET_JSON_pack_uint64 ("start_pp_last_batch_deposit_id", + 0 /* no longer supported */), + GNUNET_JSON_pack_uint64 ("end_pp_last_batch_deposit_id", + TALER_ARL_USE_PP (wire_batch_deposit_id)), + GNUNET_JSON_pack_uint64 ("start_pp_last_aggregation_serial_id", + 0 /* no longer supported */), + GNUNET_JSON_pack_uint64 ("end_pp_last_aggregation_serial_id", + TALER_ARL_USE_PP (wire_aggregation_id)), GNUNET_JSON_pack_array_steal ("account_progress", report_account_progress))); report_wire_out_inconsistencies = NULL; report_reserve_in_inconsistencies = NULL; report_row_inconsistencies = NULL; report_row_minor_inconsistencies = NULL; - report_missattribution_in_inconsistencies = NULL; + report_misattribution_in_inconsistencies = NULL; report_lags = NULL; + report_kyc_lags = NULL; + report_aml_lags = NULL; report_closure_lags = NULL; report_account_progress = NULL; report_wire_format_inconsistencies = NULL; @@ -558,6 +653,10 @@ do_shutdown (void *cls) GNUNET_CONTAINER_DLL_remove (wa_head, wa_tail, wa); + GNUNET_free (wa->label_reserve_in_serial_id); + GNUNET_free (wa->label_wire_out_serial_id); + GNUNET_free (wa->label_wire_off_in); + GNUNET_free (wa->label_wire_off_out); GNUNET_free (wa); } if (NULL != ctx) @@ -585,7 +684,7 @@ do_shutdown (void *cls) * @param value the `struct ReserveClosure` to free * @return #GNUNET_OK */ -static int +static enum GNUNET_GenericReturnValue check_pending_rc (void *cls, const struct GNUNET_HashCode *key, void *value) @@ -612,8 +711,8 @@ check_pending_rc (void *cls, &rc->wtid), GNUNET_JSON_pack_string ("account", rc->receiver_account))); - pp.last_reserve_close_uuid - = GNUNET_MIN (pp.last_reserve_close_uuid, + TALER_ARL_USE_PP (wire_reserve_close_id) + = GNUNET_MIN (TALER_ARL_USE_PP (wire_reserve_close_id), rc->rowid); return GNUNET_OK; } @@ -635,12 +734,12 @@ hash_rc (const char *receiver_account, size_t slen = strlen (receiver_account); char buf[sizeof (struct TALER_WireTransferIdentifierRawP) + slen]; - memcpy (buf, - wtid, - sizeof (*wtid)); - memcpy (&buf[sizeof (*wtid)], - receiver_account, - slen); + GNUNET_memcpy (buf, + wtid, + sizeof (*wtid)); + GNUNET_memcpy (&buf[sizeof (*wtid)], + receiver_account, + slen); GNUNET_CRYPTO_hash (buf, sizeof (buf), key); @@ -656,6 +755,42 @@ hash_rc (const char *receiver_account, static enum GNUNET_DB_QueryStatus commit (enum GNUNET_DB_QueryStatus qs) { + if (qs >= 0) + { + if (had_start_balance) + { + struct TALER_Amount sum; + + TALER_ARL_amount_add (&sum, + &total_wire_in, + &start_balance); + TALER_ARL_amount_subtract (&TALER_ARL_USE_AB (final_balance), + &sum, + &total_wire_out); + qs = TALER_ARL_adb->update_balance ( + TALER_ARL_adb->cls, + TALER_ARL_SET_AB (total_drained), + TALER_ARL_SET_AB (final_balance), + NULL); + } + else + { + TALER_ARL_amount_subtract (&TALER_ARL_USE_AB (final_balance), + &total_wire_in, + &total_wire_out); + qs = TALER_ARL_adb->insert_balance ( + TALER_ARL_adb->cls, + TALER_ARL_SET_AB (total_drained), + TALER_ARL_SET_AB (final_balance), + NULL); + } + } + else + { + GNUNET_assert (GNUNET_OK == + TALER_amount_set_zero (TALER_ARL_currency, + &TALER_ARL_USE_AB (final_balance))); + } if (0 > qs) { if (GNUNET_DB_STATUS_SOFT_ERROR == qs) @@ -684,26 +819,33 @@ commit (enum GNUNET_DB_QueryStatus qs) GNUNET_JSON_pack_uint64 ("end_reserve_in", wa->pp.last_reserve_in_serial_id), GNUNET_JSON_pack_uint64 ("start_wire_out", - wa->start_pp. - last_wire_out_serial_id), + wa->start_pp.last_wire_out_serial_id), GNUNET_JSON_pack_uint64 ("end_wire_out", wa->pp.last_wire_out_serial_id)))); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == wa->qsx) - qs = TALER_ARL_adb->update_wire_auditor_account_progress ( + qs = TALER_ARL_adb->update_auditor_progress ( TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - wa->ai->section_name, - &wa->pp, - wa->in_wire_off, - wa->out_wire_off); + wa->label_reserve_in_serial_id, + wa->pp.last_reserve_in_serial_id, + wa->label_wire_out_serial_id, + wa->pp.last_wire_out_serial_id, + wa->label_wire_off_in, + wa->wire_off_in, + wa->label_wire_off_out, + wa->wire_off_out, + NULL); else - qs = TALER_ARL_adb->insert_wire_auditor_account_progress ( + qs = TALER_ARL_adb->insert_auditor_progress ( TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - wa->ai->section_name, - &wa->pp, - wa->in_wire_off, - wa->out_wire_off); + wa->label_reserve_in_serial_id, + wa->pp.last_reserve_in_serial_id, + wa->label_wire_out_serial_id, + wa->pp.last_wire_out_serial_id, + wa->label_wire_off_in, + wa->wire_off_in, + wa->label_wire_off_out, + wa->wire_off_out, + NULL); if (0 >= qs) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -716,13 +858,19 @@ commit (enum GNUNET_DB_QueryStatus qs) &check_pending_rc, NULL); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qsx_gwap) - qs = TALER_ARL_adb->update_wire_auditor_progress (TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - &pp); + qs = TALER_ARL_adb->update_auditor_progress ( + TALER_ARL_adb->cls, + TALER_ARL_SET_PP (wire_reserve_close_id), + TALER_ARL_SET_PP (wire_batch_deposit_id), + TALER_ARL_SET_PP (wire_aggregation_id), + NULL); else - qs = TALER_ARL_adb->insert_wire_auditor_progress (TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - &pp); + qs = TALER_ARL_adb->insert_auditor_progress ( + TALER_ARL_adb->cls, + TALER_ARL_SET_PP (wire_reserve_close_id), + TALER_ARL_SET_PP (wire_batch_deposit_id), + TALER_ARL_SET_PP (wire_aggregation_id), + NULL); if (0 >= qs) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -731,8 +879,9 @@ commit (enum GNUNET_DB_QueryStatus qs) return qs; } GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Concluded audit step at %s\n", - GNUNET_TIME_timestamp2s (pp.last_timestamp)); + "Concluded audit step at %llu/%llu\n", + (unsigned long long) TALER_ARL_USE_PP (wire_aggregation_id), + (unsigned long long) TALER_ARL_USE_PP (wire_batch_deposit_id)); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) { @@ -769,74 +918,341 @@ commit (enum GNUNET_DB_QueryStatus qs) /* ***************************** Analyze required transfers ************************ */ /** - * Function called on deposits that are past their due date - * and have not yet seen a wire transfer. + * Closure for import_wire_missing_cb(). + */ +struct ImportMissingWireContext +{ + /** + * Set to maximum row ID encountered. + */ + uint64_t max_batch_deposit_uuid; + + /** + * Set to database errors in callback. + */ + enum GNUNET_DB_QueryStatus err; +}; + + +/** + * Function called on deposits that need to be checked for their + * wire transfer. * - * @param cls closure - * @param rowid deposit table row of the coin's deposit - * @param coin_pub public key of the coin - * @param amount value of the deposit, including fee - * @param payto_uri where should the funds be wired - * @param deadline what was the requested wire transfer deadline - * @param tiny did the exchange defer this transfer because it is too small? - * NOTE: only valid in internal audit mode! - * @param done did the exchange claim that it made a transfer? - * NOTE: only valid in internal audit mode! + * @param cls closure, points to a `struct ImportMissingWireContext` + * @param batch_deposit_serial_id serial of the entry in the batch deposits table + * @param total_amount value of the missing deposits, including fee + * @param wire_target_h_payto where should the funds be wired + * @param deadline what was the earliest requested wire transfer deadline */ static void -wire_missing_cb (void *cls, - uint64_t rowid, - const struct TALER_CoinSpendPublicKeyP *coin_pub, - const struct TALER_Amount *amount, - const char *payto_uri, - struct GNUNET_TIME_Timestamp deadline, - bool tiny, - bool done) +import_wire_missing_cb (void *cls, + uint64_t batch_deposit_serial_id, + const struct TALER_Amount *total_amount, + const struct TALER_PaytoHashP *wire_target_h_payto, + struct GNUNET_TIME_Timestamp deadline) { - json_t *rep; + struct ImportMissingWireContext *wc = cls; + enum GNUNET_DB_QueryStatus qs; - (void) cls; + if (wc->err < 0) + return; /* already failed */ + GNUNET_assert (batch_deposit_serial_id > wc->max_batch_deposit_uuid); + wc->max_batch_deposit_uuid = batch_deposit_serial_id; + qs = TALER_ARL_adb->insert_pending_deposit ( + TALER_ARL_adb->cls, + batch_deposit_serial_id, + wire_target_h_payto, + total_amount, + deadline); + if (qs < 0) + wc->err = qs; +} + + +/** + * Information about a delayed wire transfer and the possible + * reasons for the delay. + */ +struct ReasonDetail +{ + /** + * Total amount that should have been transferred. + */ + struct TALER_Amount total_amount; + + /** + * Earliest deadline for an expected transfer to the account. + */ + struct GNUNET_TIME_Timestamp deadline; + + /** + * Target account, NULL if even that is not known (due to + * exchange lacking required entry in wire_targets table). + */ + char *payto_uri; + + /** + * Reasons due to pending KYC requests. + */ + char *kyc_pending; + + /** + * AML decision state for the target account. + */ + enum TALER_AmlDecisionState status; + + /** + * Current AML threshold for the account, may be an invalid account if the + * default threshold applies. + */ + struct TALER_Amount aml_limit; +}; + +/** + * Closure for report_wire_missing_cb(). + */ +struct ReportMissingWireContext +{ + /** + * Map from wire_target_h_payto to `struct ReasonDetail`. + */ + struct GNUNET_CONTAINER_MultiShortmap *map; + + /** + * Set to database errors in callback. + */ + enum GNUNET_DB_QueryStatus err; +}; + + +/** + * Closure for #clear_finished_transfer_cb(). + */ +struct AggregationContext +{ + /** + * Set to maximum row ID encountered. + */ + uint64_t max_aggregation_serial; + + /** + * Set to database errors in callback. + */ + enum GNUNET_DB_QueryStatus err; +}; + + +/** + * Free memory allocated in @a value. + * + * @param cls unused + * @param key unused + * @param value must be a `struct ReasonDetail` + * @return #GNUNET_YES if we should continue to + * iterate, + * #GNUNET_NO if not. + */ +static enum GNUNET_GenericReturnValue +free_report_entry (void *cls, + const struct GNUNET_ShortHashCode *key, + void *value) +{ + struct ReasonDetail *rd = value; + + GNUNET_free (rd->kyc_pending); + GNUNET_free (rd->payto_uri); + GNUNET_free (rd); + return GNUNET_YES; +} + + +/** + * We had an entry in our map of wire transfers that + * should have been performed. Generate report. + * + * @param cls unused + * @param key unused + * @param value must be a `struct ReasonDetail` + * @return #GNUNET_YES if we should continue to + * iterate, + * #GNUNET_NO if not. + */ +static enum GNUNET_GenericReturnValue +generate_report (void *cls, + const struct GNUNET_ShortHashCode *key, + void *value) +{ + struct ReasonDetail *rd = value; + + /* For now, we simplify and only check that the + amount was tiny */ + if (0 > TALER_amount_cmp (&rd->total_amount, + &tiny_amount)) + return free_report_entry (cls, + key, + value); /* acceptable, amount was tiny */ + // TODO: maybe split total_amount_lag up by category below? TALER_ARL_amount_add (&total_amount_lag, &total_amount_lag, - amount); - if (internal_checks) + &rd->total_amount); + if (NULL != rd->kyc_pending) { - /* In internal mode, we insist that the entry was - actually marked as tiny. */ - if (tiny && - (0 > TALER_amount_cmp (amount, - &tiny_amount)) ) - return; /* acceptable, amount was tiny */ + json_t *rep; + + rep = GNUNET_JSON_PACK ( + TALER_JSON_pack_amount ("total_amount", + &rd->total_amount), + TALER_JSON_pack_time_abs_human ("deadline", + rd->deadline.abs_time), + GNUNET_JSON_pack_string ("kyc_pending", + rd->kyc_pending), + GNUNET_JSON_pack_allow_null ( + GNUNET_JSON_pack_string ("account", + rd->payto_uri))); + TALER_ARL_report (report_kyc_lags, + rep); + } + else if (TALER_AML_NORMAL != rd->status) + { + const char *sstatus = "<undefined>"; + json_t *rep; + + switch (rd->status) + { + case TALER_AML_NORMAL: + GNUNET_assert (0); + break; + case TALER_AML_PENDING: + sstatus = "pending"; + break; + case TALER_AML_FROZEN: + sstatus = "frozen"; + break; + } + rep = GNUNET_JSON_PACK ( + TALER_JSON_pack_amount ("total_amount", + &rd->total_amount), + GNUNET_JSON_pack_allow_null ( + TALER_JSON_pack_amount ("aml_limit", + TALER_amount_is_valid (&rd->aml_limit) + ? &rd->aml_limit + : NULL)), + TALER_JSON_pack_time_abs_human ("deadline", + rd->deadline.abs_time), + GNUNET_JSON_pack_string ("aml_status", + sstatus), + GNUNET_JSON_pack_allow_null ( + GNUNET_JSON_pack_string ("account", + rd->payto_uri))); + TALER_ARL_report (report_aml_lags, + rep); } else { - /* External auditors do not replicate tiny, so they - only check that the amount is tiny */ - if (0 > TALER_amount_cmp (amount, - &tiny_amount)) - return; /* acceptable, amount was tiny */ + json_t *rep; + + rep = GNUNET_JSON_PACK ( + TALER_JSON_pack_amount ("total_amount", + &rd->total_amount), + TALER_JSON_pack_time_abs_human ("deadline", + rd->deadline.abs_time), + GNUNET_JSON_pack_allow_null ( + GNUNET_JSON_pack_string ("account", + rd->payto_uri))); + TALER_ARL_report (report_lags, + rep); } - rep = GNUNET_JSON_PACK ( - GNUNET_JSON_pack_uint64 ("row", - rowid), - TALER_JSON_pack_amount ("amount", - amount), - TALER_JSON_pack_time_abs_human ("deadline", - deadline.abs_time), - GNUNET_JSON_pack_data_auto ("coin_pub", - coin_pub), - GNUNET_JSON_pack_string ("account", - payto_uri)); - if (internal_checks) + + return free_report_entry (cls, + key, + value); +} + + +/** + * Function called on deposits that are past their due date + * and have not yet seen a wire transfer. + * + * @param cls closure, points to a `struct ReportMissingWireContext` + * @param batch_deposit_serial_id row in the database for which the wire transfer is missing + * @param total_amount value of the missing deposits, including fee + * @param wire_target_h_payto hash of payto-URI where the funds should have been wired + * @param deadline what was the earliest requested wire transfer deadline + */ +static void +report_wire_missing_cb (void *cls, + uint64_t batch_deposit_serial_id, + const struct TALER_Amount *total_amount, + const struct TALER_PaytoHashP *wire_target_h_payto, + struct GNUNET_TIME_Timestamp deadline) +{ + struct ReportMissingWireContext *rc = cls; + struct ReasonDetail *rd; + + rd = GNUNET_CONTAINER_multishortmap_get (rc->map, + &wire_target_h_payto->hash); + if (NULL == rd) + { + rd = GNUNET_new (struct ReasonDetail); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multishortmap_put ( + rc->map, + &wire_target_h_payto->hash, + rd, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + rc->err = TALER_ARL_edb->select_justification_for_missing_wire ( + TALER_ARL_edb->cls, + wire_target_h_payto, + &rd->payto_uri, + &rd->kyc_pending, + &rd->status, + &rd->aml_limit); + rd->total_amount = *total_amount; + rd->deadline = deadline; + } + else { - /* the 'done' bit is only useful in 'internal' mode */ - GNUNET_assert (0 == - json_object_set (rep, - "claimed_done", - json_string ((done) ? "yes" : "no"))); + TALER_ARL_amount_add (&rd->total_amount, + &rd->total_amount, + total_amount); + rd->deadline = GNUNET_TIME_timestamp_min (rd->deadline, + deadline); } - TALER_ARL_report (report_lags, - rep); +} + + +/** + * Function called on aggregations that were done for + * a (batch) deposit. + * + * @param cls closure + * @param tracking_serial_id where in the table are we + * @param batch_deposit_serial_id which batch deposit was aggregated + */ +static void +clear_finished_transfer_cb ( + void *cls, + uint64_t tracking_serial_id, + uint64_t batch_deposit_serial_id) +{ + struct AggregationContext *ac = cls; + enum GNUNET_DB_QueryStatus qs; + + if (0 > ac->err) + return; /* already failed */ + GNUNET_assert (ac->max_aggregation_serial < tracking_serial_id); + ac->max_aggregation_serial = tracking_serial_id; + qs = TALER_ARL_adb->delete_pending_deposit ( + TALER_ARL_adb->cls, + batch_deposit_serial_id); + if (0 == qs) + { + /* Aggregated something twice or other error, report! */ + GNUNET_break (0); + // FIXME: report more nicely! + } + if (0 > qs) + ac->err = qs; } @@ -847,30 +1263,78 @@ wire_missing_cb (void *cls, static void check_for_required_transfers (void) { - struct GNUNET_TIME_Timestamp next_timestamp; + struct ImportMissingWireContext wc = { + .max_batch_deposit_uuid = TALER_ARL_USE_PP (wire_batch_deposit_id), + .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT + }; + struct GNUNET_TIME_Absolute deadline; enum GNUNET_DB_QueryStatus qs; + struct ReportMissingWireContext rc = { + .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT + }; + struct AggregationContext ac = { + .max_aggregation_serial = TALER_ARL_USE_PP (wire_aggregation_id), + .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT + }; + qs = TALER_ARL_edb->select_batch_deposits_missing_wire ( + TALER_ARL_edb->cls, + TALER_ARL_USE_PP (wire_batch_deposit_id), + &import_wire_missing_cb, + &wc); + if ( (0 > qs) || (0 > wc.err) ) + { + GNUNET_break (0); + GNUNET_break ( (GNUNET_DB_STATUS_SOFT_ERROR == qs) || + (GNUNET_DB_STATUS_SOFT_ERROR == wc.err) ); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + } + TALER_ARL_USE_PP (wire_batch_deposit_id) = wc.max_batch_deposit_uuid; + qs = TALER_ARL_edb->select_aggregations_above_serial ( + TALER_ARL_edb->cls, + TALER_ARL_USE_PP (wire_aggregation_id), + &clear_finished_transfer_cb, + &ac); + if ( (0 > qs) || (0 > ac.err) ) + { + GNUNET_break (0); + GNUNET_break ( (GNUNET_DB_STATUS_SOFT_ERROR == qs) || + (GNUNET_DB_STATUS_SOFT_ERROR == ac.err) ); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + } + TALER_ARL_USE_PP (wire_aggregation_id) = ac.max_aggregation_serial; /* Subtract #GRACE_PERIOD, so we can be a bit behind in processing without immediately raising undue concern */ - next_timestamp = GNUNET_TIME_absolute_to_timestamp ( - GNUNET_TIME_absolute_subtract (GNUNET_TIME_absolute_get (), - GRACE_PERIOD)); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Analyzing exchange's unfinished deposits (deadline: %s)\n", - GNUNET_TIME_timestamp2s (next_timestamp)); - qs = TALER_ARL_edb->select_deposits_missing_wire (TALER_ARL_edb->cls, - pp.last_timestamp, - next_timestamp, - &wire_missing_cb, - &next_timestamp); - if (0 > qs) + deadline = GNUNET_TIME_absolute_subtract (GNUNET_TIME_absolute_get (), + GRACE_PERIOD); + rc.map = GNUNET_CONTAINER_multishortmap_create (1024, + GNUNET_NO); + qs = TALER_ARL_adb->select_pending_deposits ( + TALER_ARL_adb->cls, + deadline, + &report_wire_missing_cb, + &rc); + if ( (0 > qs) || (0 > rc.err) ) { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + GNUNET_break (0); + GNUNET_break ( (GNUNET_DB_STATUS_SOFT_ERROR == qs) || + (GNUNET_DB_STATUS_SOFT_ERROR == rc.err) ); + GNUNET_CONTAINER_multishortmap_iterate (rc.map, + &free_report_entry, + NULL); + GNUNET_CONTAINER_multishortmap_destroy (rc.map); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; } - pp.last_timestamp = next_timestamp; + GNUNET_CONTAINER_multishortmap_iterate (rc.map, + &generate_report, + NULL); + GNUNET_CONTAINER_multishortmap_destroy (rc.map); /* conclude with success */ commit (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT); GNUNET_SCHEDULER_shutdown (); @@ -965,6 +1429,9 @@ wire_out_cb (void *cls, GNUNET_TIME_timestamp2s (date), TALER_amount2s (amount), TALER_B2S (wtid)); + TALER_ARL_amount_add (&total_wire_out, + &total_wire_out, + amount); GNUNET_CRYPTO_hash (wtid, sizeof (struct TALER_WireTransferIdentifierRawP), &key); @@ -974,7 +1441,7 @@ wire_out_cb (void *cls, { /* Wire transfer was not made (yet) at all (but would have been justified), so the entire amount is missing / still to be done. - This is moderately harmless, it might just be that the aggreator + This is moderately harmless, it might just be that the aggregator has not yet fully caught up with the transfers it should do. */ TALER_ARL_report ( report_wire_out_inconsistencies, @@ -1142,7 +1609,7 @@ struct CheckMatchContext * @param key key of @a value in #reserve_closures * @param value a `struct ReserveClosure` */ -static int +static enum GNUNET_GenericReturnValue check_rc_matches (void *cls, const struct GNUNET_HashCode *key, void *value) @@ -1172,16 +1639,17 @@ check_rc_matches (void *cls, /** - * Check whether the given transfer was justified by a reserve closure. If - * not, complain that we failed to match an entry from #out_map. This means a - * wire transfer was made without proper justification. + * Check whether the given transfer was justified by a reserve closure or + * profit drain. If not, complain that we failed to match an entry from + * #out_map. This means a wire transfer was made without proper + * justification. * * @param cls a `struct WireAccount` * @param key unused key * @param value the `struct ReserveOutInfo` to report - * @return #GNUNET_OK + * @return #GNUNET_OK on success */ -static int +static enum GNUNET_GenericReturnValue complain_out_not_found (void *cls, const struct GNUNET_HashCode *key, void *value) @@ -1204,6 +1672,131 @@ complain_out_not_found (void *cls, &ctx); if (ctx.found) return GNUNET_OK; + /* check for profit drain */ + { + enum GNUNET_DB_QueryStatus qs; + uint64_t serial; + char *account_section; + char *payto_uri; + struct GNUNET_TIME_Timestamp request_timestamp; + struct TALER_Amount amount; + struct TALER_MasterSignatureP master_sig; + + qs = TALER_ARL_edb->get_drain_profit (TALER_ARL_edb->cls, + &roi->details.wtid, + &serial, + &account_section, + &payto_uri, + &request_timestamp, + &amount, + &master_sig); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return GNUNET_SYSERR; + case GNUNET_DB_STATUS_SOFT_ERROR: + /* should fail on commit later ... */ + GNUNET_break (0); + return GNUNET_NO; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + /* not a profit drain */ + break; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Profit drain of %s to %s found!\n", + TALER_amount2s (&amount), + payto_uri); + if (GNUNET_OK != + TALER_exchange_offline_profit_drain_verify ( + &roi->details.wtid, + request_timestamp, + &amount, + account_section, + payto_uri, + &TALER_ARL_master_pub, + &master_sig)) + { + GNUNET_break (0); + TALER_ARL_report (report_row_inconsistencies, + GNUNET_JSON_PACK ( + GNUNET_JSON_pack_string ("table", + "profit_drains"), + GNUNET_JSON_pack_uint64 ("row", + serial), + GNUNET_JSON_pack_data_auto ("id", + &roi->details.wtid), + GNUNET_JSON_pack_string ("diagnostic", + "invalid signature"))); + TALER_ARL_amount_add (&total_bad_amount_out_plus, + &total_bad_amount_out_plus, + &amount); + } + else if (0 != + strcasecmp (payto_uri, + roi->details.credit_account_uri)) + { + TALER_ARL_report ( + report_wire_out_inconsistencies, + GNUNET_JSON_PACK ( + GNUNET_JSON_pack_uint64 ("row", + serial), + TALER_JSON_pack_amount ("amount_wired", + &roi->details.amount), + TALER_JSON_pack_amount ("amount_wired", + &amount), + GNUNET_JSON_pack_data_auto ("wtid", + &roi->details.wtid), + TALER_JSON_pack_time_abs_human ("timestamp", + roi->details.execution_date.abs_time), + GNUNET_JSON_pack_string ("account", + wa->ai->section_name), + GNUNET_JSON_pack_string ("diagnostic", + "wrong target account"))); + TALER_ARL_amount_add (&total_bad_amount_out_plus, + &total_bad_amount_out_plus, + &amount); + } + else if (0 != + TALER_amount_cmp (&amount, + &roi->details.amount)) + { + TALER_ARL_report ( + report_wire_out_inconsistencies, + GNUNET_JSON_PACK ( + GNUNET_JSON_pack_uint64 ("row", + serial), + TALER_JSON_pack_amount ("amount_justified", + &roi->details.amount), + TALER_JSON_pack_amount ("amount_wired", + &amount), + GNUNET_JSON_pack_data_auto ("wtid", + &roi->details.wtid), + TALER_JSON_pack_time_abs_human ("timestamp", + roi->details.execution_date.abs_time), + GNUNET_JSON_pack_string ("account", + wa->ai->section_name), + GNUNET_JSON_pack_string ("diagnostic", + "profit drain amount incorrect"))); + TALER_ARL_amount_add (&total_bad_amount_out_minus, + &total_bad_amount_out_minus, + &roi->details.amount); + TALER_ARL_amount_add (&total_bad_amount_out_plus, + &total_bad_amount_out_plus, + &amount); + } + GNUNET_free (account_section); + GNUNET_free (payto_uri); + /* profit drain was correct */ + TALER_ARL_amount_add (&TALER_ARL_USE_AB (total_drained), + &TALER_ARL_USE_AB (total_drained), + &amount); + return GNUNET_OK; + } + } + TALER_ARL_report ( report_wire_out_inconsistencies, GNUNET_JSON_PACK ( @@ -1250,6 +1843,7 @@ check_exchange_wire_out (struct WireAccount *wa) { enum GNUNET_DB_QueryStatus qs; + GNUNET_assert (NULL == wa->dhh); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Analyzing exchange's wire OUT table for account `%s'\n", wa->ai->section_name); @@ -1283,90 +1877,92 @@ check_exchange_wire_out (struct WireAccount *wa) * transactions). * * @param cls `struct WireAccount` with current wire account to process - * @param http_status_code http status of the request - * @param ec error code in case something went wrong - * @param row_off identification of the position at which we are querying - * @param details details about the wire transfer - * @param json original response in JSON format - * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration - */ -static int + * @param dhr HTTP response details + */ +static void history_debit_cb (void *cls, - unsigned int http_status_code, - enum TALER_ErrorCode ec, - uint64_t row_off, - const struct TALER_BANK_DebitDetails *details, - const json_t *json) + const struct TALER_BANK_DebitHistoryResponse *dhr) { struct WireAccount *wa = cls; struct ReserveOutInfo *roi; size_t slen; - (void) json; - if (NULL == details) + wa->dhh = NULL; + switch (dhr->http_status) { - wa->dhh = NULL; - if (TALER_EC_NONE != ec) + case MHD_HTTP_OK: + for (unsigned int i = 0; i<dhr->details.ok.details_length; i++) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Error fetching debit history of account %s: %u/%u!\n", - wa->ai->section_name, - http_status_code, - (unsigned int) ec); - commit (GNUNET_DB_STATUS_HARD_ERROR); - global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - return GNUNET_SYSERR; + const struct TALER_BANK_DebitDetails *dd + = &dhr->details.ok.details[i]; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Analyzing bank DEBIT at %s of %s with WTID %s\n", + GNUNET_TIME_timestamp2s (dd->execution_date), + TALER_amount2s (&dd->amount), + TALER_B2S (&dd->wtid)); + /* Update offset */ + wa->wire_off_out = dd->serial_id; + slen = strlen (dd->credit_account_uri) + 1; + roi = GNUNET_malloc (sizeof (struct ReserveOutInfo) + + slen); + GNUNET_CRYPTO_hash (&dd->wtid, + sizeof (dd->wtid), + &roi->subject_hash); + roi->details.amount = dd->amount; + roi->details.execution_date = dd->execution_date; + roi->details.wtid = dd->wtid; + roi->details.credit_account_uri = (const char *) &roi[1]; + GNUNET_memcpy (&roi[1], + dd->credit_account_uri, + slen); + if (GNUNET_OK != + GNUNET_CONTAINER_multihashmap_put (out_map, + &roi->subject_hash, + roi, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)) + { + char *diagnostic; + + GNUNET_asprintf (&diagnostic, + "duplicate subject hash `%s'", + TALER_B2S (&roi->subject_hash)); + TALER_ARL_amount_add (&total_wire_format_amount, + &total_wire_format_amount, + &dd->amount); + TALER_ARL_report (report_wire_format_inconsistencies, + GNUNET_JSON_PACK ( + TALER_JSON_pack_amount ("amount", + &dd->amount), + GNUNET_JSON_pack_uint64 ("wire_offset", + dd->serial_id), + GNUNET_JSON_pack_string ("diagnostic", + diagnostic))); + GNUNET_free (diagnostic); + } } check_exchange_wire_out (wa); - return GNUNET_OK; - } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Analyzing bank DEBIT at %s of %s with WTID %s\n", - GNUNET_TIME_timestamp2s (details->execution_date), - TALER_amount2s (&details->amount), - TALER_B2S (&details->wtid)); - /* Update offset */ - wa->out_wire_off = row_off; - slen = strlen (details->credit_account_uri) + 1; - roi = GNUNET_malloc (sizeof (struct ReserveOutInfo) - + slen); - GNUNET_CRYPTO_hash (&details->wtid, - sizeof (details->wtid), - &roi->subject_hash); - roi->details.amount = details->amount; - roi->details.execution_date = details->execution_date; - roi->details.wtid = details->wtid; - roi->details.credit_account_uri = (const char *) &roi[1]; - memcpy (&roi[1], - details->credit_account_uri, - slen); - if (GNUNET_OK != - GNUNET_CONTAINER_multihashmap_put (out_map, - &roi->subject_hash, - roi, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)) - { - char *diagnostic; - - GNUNET_asprintf (&diagnostic, - "duplicate subject hash `%s'", - TALER_B2S (&roi->subject_hash)); - TALER_ARL_amount_add (&total_wire_format_amount, - &total_wire_format_amount, - &details->amount); - TALER_ARL_report (report_wire_format_inconsistencies, - GNUNET_JSON_PACK ( - TALER_JSON_pack_amount ("amount", - &details->amount), - GNUNET_JSON_pack_uint64 ("wire_offset", - row_off), - GNUNET_JSON_pack_string ("diagnostic", - diagnostic))); - GNUNET_free (diagnostic); - return GNUNET_OK; + return; + case MHD_HTTP_NO_CONTENT: + check_exchange_wire_out (wa); + return; + case MHD_HTTP_NOT_FOUND: + if (ignore_account_404) + { + check_exchange_wire_out (wa); + return; + } + break; + default: + break; } - return GNUNET_OK; + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Error fetching debit history of account %s: %u/%u!\n", + wa->ai->section_name, + dhr->http_status, + (unsigned int) dhr->ec); + commit (GNUNET_DB_STATUS_HARD_ERROR); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); } @@ -1397,10 +1993,12 @@ process_debits (void *cls) "Checking bank DEBIT records of account `%s'\n", wa->ai->section_name); GNUNET_assert (NULL == wa->dhh); + // FIXME: handle the case where more than INT32_MAX transactions exist. + // (CG: used to be INT64_MAX, changed by MS to INT32_MAX, why? To be discussed with him!) wa->dhh = TALER_BANK_debit_history (ctx, wa->ai->auth, - wa->out_wire_off, - INT64_MAX, + wa->wire_off_out, + INT32_MAX, GNUNET_TIME_UNIT_ZERO, &history_debit_cb, wa); @@ -1423,8 +2021,9 @@ process_debits (void *cls) static void begin_debit_audit (void) { + GNUNET_assert (NULL == out_map); out_map = GNUNET_CONTAINER_multihashmap_create (1024, - GNUNET_YES); + true); process_debits (wa_head); } @@ -1439,8 +2038,11 @@ begin_debit_audit (void) static void conclude_credit_history (void) { - GNUNET_CONTAINER_multihashmap_destroy (in_map); - in_map = NULL; + if (NULL != in_map) + { + GNUNET_CONTAINER_multihashmap_destroy (in_map); + in_map = NULL; + } /* credit done, now check debits */ begin_debit_audit (); } @@ -1459,7 +2061,7 @@ conclude_credit_history (void) * @param execution_date when did we receive the funds * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop */ -static int +static enum GNUNET_GenericReturnValue reserve_in_cb (void *cls, uint64_t rowid, const struct TALER_ReservePublicKeyP *reserve_pub, @@ -1478,6 +2080,9 @@ reserve_in_cb (void *cls, GNUNET_TIME_timestamp2s (execution_date), TALER_amount2s (credit), TALER_B2S (reserve_pub)); + TALER_ARL_amount_add (&total_wire_in, + &total_wire_in, + credit); slen = strlen (sender_account_details) + 1; rii = GNUNET_malloc (sizeof (struct ReserveInInfo) + slen); rii->rowid = rowid; @@ -1485,9 +2090,9 @@ reserve_in_cb (void *cls, rii->details.execution_date = execution_date; rii->details.reserve_pub = *reserve_pub; rii->details.debit_account_uri = (const char *) &rii[1]; - memcpy (&rii[1], - sender_account_details, - slen); + GNUNET_memcpy (&rii[1], + sender_account_details, + slen); GNUNET_CRYPTO_hash (&wire_reference, sizeof (uint64_t), &rii->row_off_hash); @@ -1503,7 +2108,7 @@ reserve_in_cb (void *cls, "reserves_in"), GNUNET_JSON_pack_uint64 ("row", rowid), - GNUNET_JSON_pack_data_auto ("wire_offset_hash", + GNUNET_JSON_pack_data_auto ("id", &rii->row_off_hash), GNUNET_JSON_pack_string ("diagnostic", "duplicate wire offset"))); @@ -1527,7 +2132,7 @@ reserve_in_cb (void *cls, * @param value the `struct ReserveInInfo` to free * @return #GNUNET_OK */ -static int +static enum GNUNET_GenericReturnValue complain_in_not_found (void *cls, const struct GNUNET_HashCode *key, void *value) @@ -1571,50 +2176,19 @@ process_credits (void *cls); /** - * This function is called for all transactions that - * are credited to the exchange's account (incoming - * transactions). + * We got all of the incoming transactions for @a wa, + * finish processing the account. * - * @param cls `struct WireAccount` we are processing - * @param http_status HTTP status returned by the bank - * @param ec error code in case something went wrong - * @param row_off identification of the position at which we are querying - * @param details details about the wire transfer - * @param json raw response - * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration - */ -static int -history_credit_cb (void *cls, - unsigned int http_status, - enum TALER_ErrorCode ec, - uint64_t row_off, - const struct TALER_BANK_CreditDetails *details, - const json_t *json) + * @param[in,out] wa wire account to process + */ +static void +conclude_account (struct WireAccount *wa) { - struct WireAccount *wa = cls; - struct ReserveInInfo *rii; - struct GNUNET_HashCode key; - - (void) json; - if (NULL == details) + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Reconciling CREDIT processing of account `%s'\n", + wa->ai->section_name); + if (NULL != in_map) { - wa->chh = NULL; - if (TALER_EC_NONE != ec) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Error fetching credit history of account %s: %u/%s!\n", - wa->ai->section_name, - http_status, - TALER_ErrorCode_get_hint (ec)); - commit (GNUNET_DB_STATUS_HARD_ERROR); - global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - return GNUNET_SYSERR; - } - /* end of operation */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Reconciling CREDIT processing of account `%s'\n", - wa->ai->section_name); GNUNET_CONTAINER_multihashmap_iterate (in_map, &complain_in_not_found, wa); @@ -1622,16 +2196,32 @@ history_credit_cb (void *cls, GNUNET_CONTAINER_multihashmap_iterate (in_map, &free_rii, NULL); - process_credits (wa->next); - return GNUNET_OK; } + process_credits (wa->next); +} + + +/** + * Analyze credit transaction @a details into @a wa. + * + * @param[in,out] wa account that received the transfer + * @param details transfer details + * @return true on success, false to stop loop at this point + */ +static bool +analyze_credit (struct WireAccount *wa, + const struct TALER_BANK_CreditDetails *details) +{ + struct ReserveInInfo *rii; + struct GNUNET_HashCode key; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Analyzing bank CREDIT at %s of %s with Reserve-pub %s\n", GNUNET_TIME_timestamp2s (details->execution_date), TALER_amount2s (&details->amount), TALER_B2S (&details->reserve_pub)); - GNUNET_CRYPTO_hash (&row_off, - sizeof (row_off), + GNUNET_CRYPTO_hash (&details->serial_id, + sizeof (details->serial_id), &key); rii = GNUNET_CONTAINER_multihashmap_get (in_map, &key); @@ -1640,13 +2230,12 @@ history_credit_cb (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Failed to find wire transfer at `%s' in exchange database. Audit ends at this point in time.\n", GNUNET_TIME_timestamp2s (details->execution_date)); - wa->chh = NULL; process_credits (wa->next); - return GNUNET_SYSERR; /* not an error, just end of processing */ + return false; /* not an error, just end of processing */ } /* Update offset */ - wa->in_wire_off = row_off; + wa->wire_off_in = details->serial_id; /* compare records with expected data */ if (0 != GNUNET_memcmp (&details->reserve_pub, &rii->details.reserve_pub)) @@ -1657,7 +2246,7 @@ history_credit_cb (void *cls, GNUNET_JSON_pack_uint64 ("row", rii->rowid), GNUNET_JSON_pack_uint64 ("bank_row", - row_off), + details->serial_id), TALER_JSON_pack_amount ("amount_exchange_expected", &rii->details.amount), TALER_JSON_pack_amount ("amount_wired", @@ -1677,7 +2266,7 @@ history_credit_cb (void *cls, GNUNET_JSON_pack_uint64 ("row", rii->rowid), GNUNET_JSON_pack_uint64 ("bank_row", - row_off), + details->serial_id), TALER_JSON_pack_amount ("amount_exchange_expected", &zero), TALER_JSON_pack_amount ("amount_wired", @@ -1703,7 +2292,7 @@ history_credit_cb (void *cls, GNUNET_JSON_pack_uint64 ("row", rii->rowid), GNUNET_JSON_pack_uint64 ("bank_row", - row_off), + details->serial_id), TALER_JSON_pack_amount ("amount_exchange_expected", &rii->details.amount), TALER_JSON_pack_amount ("amount_wired", @@ -1744,19 +2333,19 @@ history_credit_cb (void *cls, if (0 != strcasecmp (details->debit_account_uri, rii->details.debit_account_uri)) { - TALER_ARL_report (report_missattribution_in_inconsistencies, + TALER_ARL_report (report_misattribution_in_inconsistencies, GNUNET_JSON_PACK ( TALER_JSON_pack_amount ("amount", &rii->details.amount), GNUNET_JSON_pack_uint64 ("row", rii->rowid), GNUNET_JSON_pack_uint64 ("bank_row", - row_off), + details->serial_id), GNUNET_JSON_pack_data_auto ( "reserve_pub", &rii->details.reserve_pub))); - TALER_ARL_amount_add (&total_missattribution_in, - &total_missattribution_in, + TALER_ARL_amount_add (&total_misattribution_in, + &total_misattribution_in, &rii->details.amount); } if (GNUNET_TIME_timestamp_cmp (details->execution_date, @@ -1770,7 +2359,7 @@ history_credit_cb (void *cls, GNUNET_JSON_pack_uint64 ("row", rii->rowid), GNUNET_JSON_pack_uint64 ("bank_row", - row_off), + details->serial_id), GNUNET_JSON_pack_string ("diagnostic", "execution date mismatch"))); } @@ -1779,7 +2368,60 @@ cleanup: free_rii (NULL, &key, rii)); - return GNUNET_OK; + return true; +} + + +/** + * This function is called for all transactions that + * are credited to the exchange's account (incoming + * transactions). + * + * @param cls `struct WireAccount` we are processing + * @param chr HTTP response returned by the bank + */ +static void +history_credit_cb (void *cls, + const struct TALER_BANK_CreditHistoryResponse *chr) +{ + struct WireAccount *wa = cls; + + wa->chh = NULL; + switch (chr->http_status) + { + case MHD_HTTP_OK: + for (unsigned int i = 0; i<chr->details.ok.details_length; i++) + { + const struct TALER_BANK_CreditDetails *cd + = &chr->details.ok.details[i]; + + if (! analyze_credit (wa, + cd)) + return; + } + conclude_account (wa); + return; + case MHD_HTTP_NO_CONTENT: + conclude_account (wa); + return; + case MHD_HTTP_NOT_FOUND: + if (ignore_account_404) + { + conclude_account (wa); + return; + } + break; + default: + break; + } + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Error fetching credit history of account %s: %u/%s!\n", + wa->ai->section_name, + chr->http_status, + TALER_ErrorCode_get_hint (chr->ec)); + commit (GNUNET_DB_STATUS_HARD_ERROR); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); } @@ -1828,10 +2470,12 @@ process_credits (void *cls) GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Starting bank CREDIT history of account `%s'\n", wa->ai->section_name); + // NOTE: handle the case where more than INT32_MAX transactions exist. + // (CG: used to be INT64_MAX, changed by MS to INT32_MAX, why? To be discussed with him!) wa->chh = TALER_BANK_credit_history (ctx, wa->ai->auth, - wa->in_wire_off, - INT64_MAX, + wa->wire_off_in, + INT32_MAX, GNUNET_TIME_UNIT_ZERO, &history_credit_cb, wa); @@ -1853,6 +2497,7 @@ process_credits (void *cls) static void begin_credit_audit (void) { + GNUNET_assert (NULL == in_map); in_map = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_YES); /* now go over all bank accounts and check delta with in_map */ @@ -1861,8 +2506,7 @@ begin_credit_audit (void) /** - * Function called about reserve closing operations - * the aggregator triggered. + * Function called about reserve closing operations the aggregator triggered. * * @param cls closure * @param rowid row identifier used to uniquely identify the reserve closing operation @@ -1872,9 +2516,11 @@ begin_credit_audit (void) * @param reserve_pub public key of the reserve * @param receiver_account where did we send the funds, in payto://-format * @param wtid identifier used for the wire transfer + * @param close_request_row which close request triggered the operation? + * 0 if it was a timeout (not used) * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop */ -static int +static enum GNUNET_GenericReturnValue reserve_closed_cb (void *cls, uint64_t rowid, struct GNUNET_TIME_Timestamp execution_date, @@ -1882,12 +2528,14 @@ reserve_closed_cb (void *cls, const struct TALER_Amount *closing_fee, const struct TALER_ReservePublicKeyP *reserve_pub, const char *receiver_account, - const struct TALER_WireTransferIdentifierRawP *wtid) + const struct TALER_WireTransferIdentifierRawP *wtid, + uint64_t close_request_row) { struct ReserveClosure *rc; struct GNUNET_HashCode key; (void) cls; + (void) close_request_row; rc = GNUNET_new (struct ReserveClosure); if (TALER_ARL_SR_INVALID_NEGATIVE == TALER_ARL_amount_subtract_neg (&rc->amount, @@ -1900,7 +2548,7 @@ reserve_closed_cb (void *cls, "reserves_closures"), GNUNET_JSON_pack_uint64 ("row", rowid), - GNUNET_JSON_pack_data_auto ("reserve_pub", + GNUNET_JSON_pack_data_auto ("id", reserve_pub), TALER_JSON_pack_amount ("amount_with_fee", amount_with_fee), @@ -1913,8 +2561,8 @@ reserve_closed_cb (void *cls, return GNUNET_SYSERR; return GNUNET_OK; } - pp.last_reserve_close_uuid - = GNUNET_MAX (pp.last_reserve_close_uuid, + TALER_ARL_USE_PP (wire_reserve_close_id) + = GNUNET_MAX (TALER_ARL_USE_PP (wire_reserve_close_id), rowid + 1); rc->receiver_account = GNUNET_strdup (receiver_account); rc->wtid = *wtid; @@ -1941,6 +2589,8 @@ reserve_closed_cb (void *cls, static enum GNUNET_DB_QueryStatus begin_transaction (void) { + enum GNUNET_DB_QueryStatus qs; + if (GNUNET_SYSERR == TALER_ARL_edb->preflight (TALER_ARL_edb->cls)) { @@ -1969,17 +2619,66 @@ begin_transaction (void) GNUNET_break (0); return GNUNET_DB_STATUS_HARD_ERROR; } + GNUNET_assert (GNUNET_OK == + TALER_amount_set_zero (TALER_ARL_currency, + &TALER_ARL_USE_AB (total_drained))); + GNUNET_assert (GNUNET_OK == + TALER_amount_set_zero (TALER_ARL_currency, + &total_wire_in)); + GNUNET_assert (GNUNET_OK == + TALER_amount_set_zero (TALER_ARL_currency, + &total_wire_out)); + qs = TALER_ARL_adb->get_balance ( + TALER_ARL_adb->cls, + TALER_ARL_GET_AB (total_drained), + TALER_ARL_GET_AB (final_balance), + NULL); + switch (qs) + { + case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); + return qs; + case GNUNET_DB_STATUS_SOFT_ERROR: + GNUNET_break (0); + return qs; + case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: + had_start_balance = false; + break; + case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + had_start_balance = true; + break; + } for (struct WireAccount *wa = wa_head; NULL != wa; wa = wa->next) { - wa->qsx = TALER_ARL_adb->get_wire_auditor_account_progress ( + GNUNET_asprintf (&wa->label_reserve_in_serial_id, + "wire-%s-%s", + wa->ai->section_name, + "reserve_in_serial_id"); + GNUNET_asprintf (&wa->label_wire_out_serial_id, + "wire-%s-%s", + wa->ai->section_name, + "wire_out_serial_id"); + GNUNET_asprintf (&wa->label_wire_off_in, + "wire-%s-%s", + wa->ai->section_name, + "wire_off_in"); + GNUNET_asprintf (&wa->label_wire_off_out, + "wire-%s-%s", + wa->ai->section_name, + "wire_off_out"); + wa->qsx = TALER_ARL_adb->get_auditor_progress ( TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - wa->ai->section_name, - &wa->pp, - &wa->in_wire_off, - &wa->out_wire_off); + wa->label_reserve_in_serial_id, + &wa->pp.last_reserve_in_serial_id, + wa->label_wire_out_serial_id, + &wa->pp.last_wire_out_serial_id, + wa->label_wire_off_in, + &wa->wire_off_in, + wa->label_wire_off_out, + &wa->wire_off_out, + NULL); if (0 > wa->qsx) { GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == wa->qsx); @@ -1987,9 +2686,12 @@ begin_transaction (void) } wa->start_pp = wa->pp; } - qsx_gwap = TALER_ARL_adb->get_wire_auditor_progress (TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - &pp); + qsx_gwap = TALER_ARL_adb->get_auditor_progress ( + TALER_ARL_adb->cls, + TALER_ARL_GET_PP (wire_reserve_close_id), + TALER_ARL_GET_PP (wire_batch_deposit_id), + TALER_ARL_GET_PP (wire_aggregation_id), + NULL); if (0 > qsx_gwap) { GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qsx_gwap); @@ -2002,11 +2704,11 @@ begin_transaction (void) } else { - start_pp = pp; GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Resuming wire audit at %s / %llu\n", - GNUNET_TIME_timestamp2s (pp.last_timestamp), - (unsigned long long) pp.last_reserve_close_uuid); + "Resuming wire audit at %llu / %llu / %llu\n", + (unsigned long long) TALER_ARL_USE_PP (wire_reserve_close_id), + (unsigned long long) TALER_ARL_USE_PP (wire_batch_deposit_id), + (unsigned long long) TALER_ARL_USE_PP (wire_aggregation_id)); } { @@ -2014,7 +2716,7 @@ begin_transaction (void) qs = TALER_ARL_edb->select_reserve_closed_above_serial_id ( TALER_ARL_edb->cls, - pp.last_reserve_close_uuid, + TALER_ARL_USE_PP (wire_reserve_close_id), &reserve_closed_cb, NULL); if (0 > qs) @@ -2116,11 +2818,15 @@ run (void *cls, GNUNET_assert (NULL != (report_row_inconsistencies = json_array ())); GNUNET_assert (NULL != - (report_missattribution_in_inconsistencies + (report_misattribution_in_inconsistencies = json_array ())); GNUNET_assert (NULL != (report_lags = json_array ())); GNUNET_assert (NULL != + (report_aml_lags = json_array ())); + GNUNET_assert (NULL != + (report_kyc_lags = json_array ())); + GNUNET_assert (NULL != (report_closure_lags = json_array ())); GNUNET_assert (NULL != (report_account_progress = json_array ())); @@ -2138,7 +2844,7 @@ run (void *cls, &total_bad_amount_in_minus)); GNUNET_assert (GNUNET_OK == TALER_amount_set_zero (TALER_ARL_currency, - &total_missattribution_in)); + &total_misattribution_in)); GNUNET_assert (GNUNET_OK == TALER_amount_set_zero (TALER_ARL_currency, &total_amount_lag)); @@ -2192,11 +2898,14 @@ main (int argc, "internal", "perform checks only applicable for exchange-internal audits", &internal_checks), - GNUNET_GETOPT_option_base32_auto ('m', - "exchange-key", - "KEY", - "public key of the exchange (Crockford base32 encoded)", - &TALER_ARL_master_pub), + GNUNET_GETOPT_option_flag ('I', + "ignore-not-found", + "continue, even if the bank account of the exchange was not found", + &ignore_account_404), + GNUNET_GETOPT_option_flag ('t', + "test", + "run in test mode and exit when idle", + &test_mode), GNUNET_GETOPT_option_timetravel ('T', "timetravel"), GNUNET_GETOPT_OPTION_END |