diff options
Diffstat (limited to 'src/auditor/taler-helper-auditor-wire.c')
-rw-r--r-- | src/auditor/taler-helper-auditor-wire.c | 1105 |
1 files changed, 796 insertions, 309 deletions
diff --git a/src/auditor/taler-helper-auditor-wire.c b/src/auditor/taler-helper-auditor-wire.c index f9c87b6f0..d48ac1f18 100644 --- a/src/auditor/taler-helper-auditor-wire.c +++ b/src/auditor/taler-helper-auditor-wire.c @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2017-2022 Taler Systems SA + Copyright (C) 2017-2023 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -53,6 +53,20 @@ /** + * Run in test mode. Exit when idle instead of + * going to sleep and waiting for more work. + * + * FIXME: not yet implemented! + */ +static int test_mode; + +struct TALER_AUDITORDB_WireAccountProgressPoint +{ + uint64_t last_reserve_in_serial_id; + uint64_t last_wire_out_serial_id; +}; + +/** * Information we keep for each supported account. */ struct WireAccount @@ -93,14 +107,34 @@ struct WireAccount struct TALER_AUDITORDB_WireAccountProgressPoint start_pp; /** - * Where we are in the inbound (CREDIT) transaction history. + * Where we are in the inbound transaction history. + */ + uint64_t wire_off_in; + + /** + * Where we are in the outbound transaction history. */ - uint64_t in_wire_off; + uint64_t wire_off_out; /** - * Where we are in the inbound (DEBIT) transaction history. + * Label under which we store our pp's reserve_in_serial_id. */ - uint64_t out_wire_off; + char *label_reserve_in_serial_id; + + /** + * Label under which we store our pp's reserve_in_serial_id. + */ + char *label_wire_out_serial_id; + + /** + * Label under which we store our wire_off_in. + */ + char *label_wire_off_in; + + /** + * Label under which we store our wire_off_out. + */ + char *label_wire_off_out; /** * Return value when we got this account's progress point. @@ -183,20 +217,17 @@ static enum GNUNET_DB_QueryStatus qsx_gwap; /** * Last reserve_in / wire_out serial IDs seen. */ -static struct TALER_AUDITORDB_WireProgressPoint pp; +static TALER_ARL_DEF_PP (wire_reserve_close_id); +static TALER_ARL_DEF_PP (wire_batch_deposit_id); +static TALER_ARL_DEF_PP (wire_aggregation_id); /** - * Last reserve_in / wire_out serial IDs seen. - */ -static struct TALER_AUDITORDB_WireProgressPoint start_pp; - -/** - * Array of reports about row inconsitencies in wire_out table. + * Array of reports about row inconsistencies in wire_out table. */ static json_t *report_wire_out_inconsistencies; /** - * Array of reports about row inconsitencies in reserves_in table. + * Array of reports about row inconsistencies in reserves_in table. */ static json_t *report_reserve_in_inconsistencies; @@ -204,7 +235,7 @@ static json_t *report_reserve_in_inconsistencies; * Array of reports about wrong bank account being recorded for * incoming wire transfers. */ -static json_t *report_missattribution_in_inconsistencies; +static json_t *report_misattribution_in_inconsistencies; /** * Array of reports about row inconsistencies. @@ -228,6 +259,16 @@ static json_t *report_row_minor_inconsistencies; static json_t *report_lags; /** + * Array of reports about lagging transactions from deposits due to missing KYC. + */ +static json_t *report_kyc_lags; + +/** + * Array of reports about lagging transactions from deposits due to pending or frozen AML decisions. + */ +static json_t *report_aml_lags; + +/** * Array of reports about lagging transactions from reserve closures. */ static json_t *report_closure_lags; @@ -267,7 +308,7 @@ static struct TALER_Amount total_bad_amount_in_minus; * for incoming funds and may thus wire funds to the wrong * destination when closing the reserve. */ -static struct TALER_Amount total_missattribution_in; +static struct TALER_Amount total_misattribution_in; /** * Total amount which the exchange did not transfer in time. @@ -297,17 +338,17 @@ static struct TALER_Amount total_wire_out; /** * Total amount of profits drained. */ -static struct TALER_Amount total_drained; +static TALER_ARL_DEF_AB (total_drained); /** - * Starting balance at the beginning of this iteration. + * Final balance at the end of this iteration. */ -static struct TALER_Amount start_balance; +static TALER_ARL_DEF_AB (final_balance); /** - * Final balance at the end of this iteration. + * Starting balance at the beginning of this iteration. */ -static struct TALER_Amount final_balance; +static struct TALER_Amount start_balance; /** * True if #start_balance was initialized. @@ -499,11 +540,11 @@ do_shutdown (void *cls) TALER_JSON_pack_amount ("total_wire_in_delta_minus", &total_bad_amount_in_minus), /* Tested in test-auditor.sh #9 */ - GNUNET_JSON_pack_array_steal ("missattribution_in_inconsistencies", - report_missattribution_in_inconsistencies), + GNUNET_JSON_pack_array_steal ("misattribution_in_inconsistencies", + report_misattribution_in_inconsistencies), /* Tested in test-auditor.sh #9 */ - TALER_JSON_pack_amount ("total_missattribution_in", - &total_missattribution_in), + TALER_JSON_pack_amount ("total_misattribution_in", + &total_misattribution_in), GNUNET_JSON_pack_array_steal ("row_inconsistencies", report_row_inconsistencies), /* Tested in test-auditor.sh #10/#17 */ @@ -520,15 +561,19 @@ do_shutdown (void *cls) TALER_JSON_pack_amount ("total_wire_out", &total_wire_out), TALER_JSON_pack_amount ("total_drained", - &total_drained), + &TALER_ARL_USE_AB (total_drained)), TALER_JSON_pack_amount ("final_balance", - &final_balance), + &TALER_ARL_USE_AB (final_balance)), /* Tested in test-auditor.sh #1 */ TALER_JSON_pack_amount ("total_amount_lag", &total_amount_lag), /* Tested in test-auditor.sh #1 */ GNUNET_JSON_pack_array_steal ("lag_details", report_lags), + GNUNET_JSON_pack_array_steal ("lag_aml_details", + report_aml_lags), + GNUNET_JSON_pack_array_steal ("lag_kyc_details", + report_kyc_lags), /* Tested in test-auditor.sh #22 */ TALER_JSON_pack_amount ("total_closure_amount_lag", &total_closure_amount_lag), @@ -539,22 +584,28 @@ do_shutdown (void *cls) start_time), TALER_JSON_pack_time_abs_human ("wire_auditor_end_time", GNUNET_TIME_absolute_get ()), - GNUNET_JSON_pack_uint64 ("start_pp_reserve_close_uuid", - start_pp.last_reserve_close_uuid), - GNUNET_JSON_pack_uint64 ("end_pp_reserve_close_uuid", - pp.last_reserve_close_uuid), - TALER_JSON_pack_time_abs_human ("start_pp_last_timestamp", - start_pp.last_timestamp.abs_time), - TALER_JSON_pack_time_abs_human ("end_pp_last_timestamp", - pp.last_timestamp.abs_time), + GNUNET_JSON_pack_uint64 ("start_pp_reserve_close_id", + 0 /* no longer supported */), + GNUNET_JSON_pack_uint64 ("end_pp_reserve_close_id", + TALER_ARL_USE_PP (wire_reserve_close_id)), + GNUNET_JSON_pack_uint64 ("start_pp_last_batch_deposit_id", + 0 /* no longer supported */), + GNUNET_JSON_pack_uint64 ("end_pp_last_batch_deposit_id", + TALER_ARL_USE_PP (wire_batch_deposit_id)), + GNUNET_JSON_pack_uint64 ("start_pp_last_aggregation_serial_id", + 0 /* no longer supported */), + GNUNET_JSON_pack_uint64 ("end_pp_last_aggregation_serial_id", + TALER_ARL_USE_PP (wire_aggregation_id)), GNUNET_JSON_pack_array_steal ("account_progress", report_account_progress))); report_wire_out_inconsistencies = NULL; report_reserve_in_inconsistencies = NULL; report_row_inconsistencies = NULL; report_row_minor_inconsistencies = NULL; - report_missattribution_in_inconsistencies = NULL; + report_misattribution_in_inconsistencies = NULL; report_lags = NULL; + report_kyc_lags = NULL; + report_aml_lags = NULL; report_closure_lags = NULL; report_account_progress = NULL; report_wire_format_inconsistencies = NULL; @@ -602,6 +653,10 @@ do_shutdown (void *cls) GNUNET_CONTAINER_DLL_remove (wa_head, wa_tail, wa); + GNUNET_free (wa->label_reserve_in_serial_id); + GNUNET_free (wa->label_wire_out_serial_id); + GNUNET_free (wa->label_wire_off_in); + GNUNET_free (wa->label_wire_off_out); GNUNET_free (wa); } if (NULL != ctx) @@ -656,8 +711,8 @@ check_pending_rc (void *cls, &rc->wtid), GNUNET_JSON_pack_string ("account", rc->receiver_account))); - pp.last_reserve_close_uuid - = GNUNET_MIN (pp.last_reserve_close_uuid, + TALER_ARL_USE_PP (wire_reserve_close_id) + = GNUNET_MIN (TALER_ARL_USE_PP (wire_reserve_close_id), rc->rowid); return GNUNET_OK; } @@ -679,12 +734,12 @@ hash_rc (const char *receiver_account, size_t slen = strlen (receiver_account); char buf[sizeof (struct TALER_WireTransferIdentifierRawP) + slen]; - memcpy (buf, - wtid, - sizeof (*wtid)); - memcpy (&buf[sizeof (*wtid)], - receiver_account, - slen); + GNUNET_memcpy (buf, + wtid, + sizeof (*wtid)); + GNUNET_memcpy (&buf[sizeof (*wtid)], + receiver_account, + slen); GNUNET_CRYPTO_hash (buf, sizeof (buf), key); @@ -709,25 +764,33 @@ commit (enum GNUNET_DB_QueryStatus qs) TALER_ARL_amount_add (&sum, &total_wire_in, &start_balance); - TALER_ARL_amount_subtract (&final_balance, + TALER_ARL_amount_subtract (&TALER_ARL_USE_AB (final_balance), &sum, &total_wire_out); - qs = TALER_ARL_adb->update_predicted_result (TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - &final_balance, - &total_drained); + qs = TALER_ARL_adb->update_balance ( + TALER_ARL_adb->cls, + TALER_ARL_SET_AB (total_drained), + TALER_ARL_SET_AB (final_balance), + NULL); } else { - TALER_ARL_amount_subtract (&final_balance, + TALER_ARL_amount_subtract (&TALER_ARL_USE_AB (final_balance), &total_wire_in, &total_wire_out); - qs = TALER_ARL_adb->insert_predicted_result (TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - &final_balance, - &total_drained); + qs = TALER_ARL_adb->insert_balance ( + TALER_ARL_adb->cls, + TALER_ARL_SET_AB (total_drained), + TALER_ARL_SET_AB (final_balance), + NULL); } } + else + { + GNUNET_assert (GNUNET_OK == + TALER_amount_set_zero (TALER_ARL_currency, + &TALER_ARL_USE_AB (final_balance))); + } if (0 > qs) { if (GNUNET_DB_STATUS_SOFT_ERROR == qs) @@ -756,26 +819,33 @@ commit (enum GNUNET_DB_QueryStatus qs) GNUNET_JSON_pack_uint64 ("end_reserve_in", wa->pp.last_reserve_in_serial_id), GNUNET_JSON_pack_uint64 ("start_wire_out", - wa->start_pp. - last_wire_out_serial_id), + wa->start_pp.last_wire_out_serial_id), GNUNET_JSON_pack_uint64 ("end_wire_out", wa->pp.last_wire_out_serial_id)))); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == wa->qsx) - qs = TALER_ARL_adb->update_wire_auditor_account_progress ( + qs = TALER_ARL_adb->update_auditor_progress ( TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - wa->ai->section_name, - &wa->pp, - wa->in_wire_off, - wa->out_wire_off); + wa->label_reserve_in_serial_id, + wa->pp.last_reserve_in_serial_id, + wa->label_wire_out_serial_id, + wa->pp.last_wire_out_serial_id, + wa->label_wire_off_in, + wa->wire_off_in, + wa->label_wire_off_out, + wa->wire_off_out, + NULL); else - qs = TALER_ARL_adb->insert_wire_auditor_account_progress ( + qs = TALER_ARL_adb->insert_auditor_progress ( TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - wa->ai->section_name, - &wa->pp, - wa->in_wire_off, - wa->out_wire_off); + wa->label_reserve_in_serial_id, + wa->pp.last_reserve_in_serial_id, + wa->label_wire_out_serial_id, + wa->pp.last_wire_out_serial_id, + wa->label_wire_off_in, + wa->wire_off_in, + wa->label_wire_off_out, + wa->wire_off_out, + NULL); if (0 >= qs) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -788,13 +858,19 @@ commit (enum GNUNET_DB_QueryStatus qs) &check_pending_rc, NULL); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qsx_gwap) - qs = TALER_ARL_adb->update_wire_auditor_progress (TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - &pp); + qs = TALER_ARL_adb->update_auditor_progress ( + TALER_ARL_adb->cls, + TALER_ARL_SET_PP (wire_reserve_close_id), + TALER_ARL_SET_PP (wire_batch_deposit_id), + TALER_ARL_SET_PP (wire_aggregation_id), + NULL); else - qs = TALER_ARL_adb->insert_wire_auditor_progress (TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - &pp); + qs = TALER_ARL_adb->insert_auditor_progress ( + TALER_ARL_adb->cls, + TALER_ARL_SET_PP (wire_reserve_close_id), + TALER_ARL_SET_PP (wire_batch_deposit_id), + TALER_ARL_SET_PP (wire_aggregation_id), + NULL); if (0 >= qs) { GNUNET_log (GNUNET_ERROR_TYPE_INFO, @@ -803,8 +879,9 @@ commit (enum GNUNET_DB_QueryStatus qs) return qs; } GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Concluded audit step at %s\n", - GNUNET_TIME_timestamp2s (pp.last_timestamp)); + "Concluded audit step at %llu/%llu\n", + (unsigned long long) TALER_ARL_USE_PP (wire_aggregation_id), + (unsigned long long) TALER_ARL_USE_PP (wire_batch_deposit_id)); if (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == qs) { @@ -841,59 +918,341 @@ commit (enum GNUNET_DB_QueryStatus qs) /* ***************************** Analyze required transfers ************************ */ /** - * Function called on deposits that are past their due date - * and have not yet seen a wire transfer. + * Closure for import_wire_missing_cb(). + */ +struct ImportMissingWireContext +{ + /** + * Set to maximum row ID encountered. + */ + uint64_t max_batch_deposit_uuid; + + /** + * Set to database errors in callback. + */ + enum GNUNET_DB_QueryStatus err; +}; + + +/** + * Function called on deposits that need to be checked for their + * wire transfer. * - * @param cls closure - * @param rowid deposit table row of the coin's deposit - * @param coin_pub public key of the coin - * @param amount value of the deposit, including fee - * @param payto_uri where should the funds be wired - * @param deadline what was the requested wire transfer deadline - * @param done did the exchange claim that it made a transfer? - * NOTE: only valid in internal audit mode! + * @param cls closure, points to a `struct ImportMissingWireContext` + * @param batch_deposit_serial_id serial of the entry in the batch deposits table + * @param total_amount value of the missing deposits, including fee + * @param wire_target_h_payto where should the funds be wired + * @param deadline what was the earliest requested wire transfer deadline */ static void -wire_missing_cb (void *cls, - uint64_t rowid, - const struct TALER_CoinSpendPublicKeyP *coin_pub, - const struct TALER_Amount *amount, - const char *payto_uri, - struct GNUNET_TIME_Timestamp deadline, - bool done) +import_wire_missing_cb (void *cls, + uint64_t batch_deposit_serial_id, + const struct TALER_Amount *total_amount, + const struct TALER_PaytoHashP *wire_target_h_payto, + struct GNUNET_TIME_Timestamp deadline) { - json_t *rep; + struct ImportMissingWireContext *wc = cls; + enum GNUNET_DB_QueryStatus qs; + + if (wc->err < 0) + return; /* already failed */ + GNUNET_assert (batch_deposit_serial_id > wc->max_batch_deposit_uuid); + wc->max_batch_deposit_uuid = batch_deposit_serial_id; + qs = TALER_ARL_adb->insert_pending_deposit ( + TALER_ARL_adb->cls, + batch_deposit_serial_id, + wire_target_h_payto, + total_amount, + deadline); + if (qs < 0) + wc->err = qs; +} + + +/** + * Information about a delayed wire transfer and the possible + * reasons for the delay. + */ +struct ReasonDetail +{ + /** + * Total amount that should have been transferred. + */ + struct TALER_Amount total_amount; + + /** + * Earliest deadline for an expected transfer to the account. + */ + struct GNUNET_TIME_Timestamp deadline; + + /** + * Target account, NULL if even that is not known (due to + * exchange lacking required entry in wire_targets table). + */ + char *payto_uri; + + /** + * Reasons due to pending KYC requests. + */ + char *kyc_pending; + + /** + * AML decision state for the target account. + */ + enum TALER_AmlDecisionState status; + + /** + * Current AML threshold for the account, may be an invalid account if the + * default threshold applies. + */ + struct TALER_Amount aml_limit; +}; + +/** + * Closure for report_wire_missing_cb(). + */ +struct ReportMissingWireContext +{ + /** + * Map from wire_target_h_payto to `struct ReasonDetail`. + */ + struct GNUNET_CONTAINER_MultiShortmap *map; + + /** + * Set to database errors in callback. + */ + enum GNUNET_DB_QueryStatus err; +}; + + +/** + * Closure for #clear_finished_transfer_cb(). + */ +struct AggregationContext +{ + /** + * Set to maximum row ID encountered. + */ + uint64_t max_aggregation_serial; + + /** + * Set to database errors in callback. + */ + enum GNUNET_DB_QueryStatus err; +}; + + +/** + * Free memory allocated in @a value. + * + * @param cls unused + * @param key unused + * @param value must be a `struct ReasonDetail` + * @return #GNUNET_YES if we should continue to + * iterate, + * #GNUNET_NO if not. + */ +static enum GNUNET_GenericReturnValue +free_report_entry (void *cls, + const struct GNUNET_ShortHashCode *key, + void *value) +{ + struct ReasonDetail *rd = value; + + GNUNET_free (rd->kyc_pending); + GNUNET_free (rd->payto_uri); + GNUNET_free (rd); + return GNUNET_YES; +} + + +/** + * We had an entry in our map of wire transfers that + * should have been performed. Generate report. + * + * @param cls unused + * @param key unused + * @param value must be a `struct ReasonDetail` + * @return #GNUNET_YES if we should continue to + * iterate, + * #GNUNET_NO if not. + */ +static enum GNUNET_GenericReturnValue +generate_report (void *cls, + const struct GNUNET_ShortHashCode *key, + void *value) +{ + struct ReasonDetail *rd = value; - (void) cls; - TALER_ARL_amount_add (&total_amount_lag, - &total_amount_lag, - amount); /* For now, we simplify and only check that the amount was tiny */ - if (0 > TALER_amount_cmp (amount, + if (0 > TALER_amount_cmp (&rd->total_amount, &tiny_amount)) - return; /* acceptable, amount was tiny */ - rep = GNUNET_JSON_PACK ( - GNUNET_JSON_pack_uint64 ("row", - rowid), - TALER_JSON_pack_amount ("amount", - amount), - TALER_JSON_pack_time_abs_human ("deadline", - deadline.abs_time), - GNUNET_JSON_pack_data_auto ("coin_pub", - coin_pub), - GNUNET_JSON_pack_string ("account", - payto_uri)); - if (internal_checks) + return free_report_entry (cls, + key, + value); /* acceptable, amount was tiny */ + // TODO: maybe split total_amount_lag up by category below? + TALER_ARL_amount_add (&total_amount_lag, + &total_amount_lag, + &rd->total_amount); + if (NULL != rd->kyc_pending) + { + json_t *rep; + + rep = GNUNET_JSON_PACK ( + TALER_JSON_pack_amount ("total_amount", + &rd->total_amount), + TALER_JSON_pack_time_abs_human ("deadline", + rd->deadline.abs_time), + GNUNET_JSON_pack_string ("kyc_pending", + rd->kyc_pending), + GNUNET_JSON_pack_allow_null ( + GNUNET_JSON_pack_string ("account", + rd->payto_uri))); + TALER_ARL_report (report_kyc_lags, + rep); + } + else if (TALER_AML_NORMAL != rd->status) + { + const char *sstatus = "<undefined>"; + json_t *rep; + + switch (rd->status) + { + case TALER_AML_NORMAL: + GNUNET_assert (0); + break; + case TALER_AML_PENDING: + sstatus = "pending"; + break; + case TALER_AML_FROZEN: + sstatus = "frozen"; + break; + } + rep = GNUNET_JSON_PACK ( + TALER_JSON_pack_amount ("total_amount", + &rd->total_amount), + GNUNET_JSON_pack_allow_null ( + TALER_JSON_pack_amount ("aml_limit", + TALER_amount_is_valid (&rd->aml_limit) + ? &rd->aml_limit + : NULL)), + TALER_JSON_pack_time_abs_human ("deadline", + rd->deadline.abs_time), + GNUNET_JSON_pack_string ("aml_status", + sstatus), + GNUNET_JSON_pack_allow_null ( + GNUNET_JSON_pack_string ("account", + rd->payto_uri))); + TALER_ARL_report (report_aml_lags, + rep); + } + else { - /* the 'done' bit is only useful in 'internal' mode */ - GNUNET_assert (0 == - json_object_set (rep, - "claimed_done", - json_string ((done) ? "yes" : "no"))); + json_t *rep; + + rep = GNUNET_JSON_PACK ( + TALER_JSON_pack_amount ("total_amount", + &rd->total_amount), + TALER_JSON_pack_time_abs_human ("deadline", + rd->deadline.abs_time), + GNUNET_JSON_pack_allow_null ( + GNUNET_JSON_pack_string ("account", + rd->payto_uri))); + TALER_ARL_report (report_lags, + rep); } - TALER_ARL_report (report_lags, - rep); + + return free_report_entry (cls, + key, + value); +} + + +/** + * Function called on deposits that are past their due date + * and have not yet seen a wire transfer. + * + * @param cls closure, points to a `struct ReportMissingWireContext` + * @param batch_deposit_serial_id row in the database for which the wire transfer is missing + * @param total_amount value of the missing deposits, including fee + * @param wire_target_h_payto hash of payto-URI where the funds should have been wired + * @param deadline what was the earliest requested wire transfer deadline + */ +static void +report_wire_missing_cb (void *cls, + uint64_t batch_deposit_serial_id, + const struct TALER_Amount *total_amount, + const struct TALER_PaytoHashP *wire_target_h_payto, + struct GNUNET_TIME_Timestamp deadline) +{ + struct ReportMissingWireContext *rc = cls; + struct ReasonDetail *rd; + + rd = GNUNET_CONTAINER_multishortmap_get (rc->map, + &wire_target_h_payto->hash); + if (NULL == rd) + { + rd = GNUNET_new (struct ReasonDetail); + GNUNET_assert (GNUNET_YES == + GNUNET_CONTAINER_multishortmap_put ( + rc->map, + &wire_target_h_payto->hash, + rd, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)); + rc->err = TALER_ARL_edb->select_justification_for_missing_wire ( + TALER_ARL_edb->cls, + wire_target_h_payto, + &rd->payto_uri, + &rd->kyc_pending, + &rd->status, + &rd->aml_limit); + rd->total_amount = *total_amount; + rd->deadline = deadline; + } + else + { + TALER_ARL_amount_add (&rd->total_amount, + &rd->total_amount, + total_amount); + rd->deadline = GNUNET_TIME_timestamp_min (rd->deadline, + deadline); + } +} + + +/** + * Function called on aggregations that were done for + * a (batch) deposit. + * + * @param cls closure + * @param tracking_serial_id where in the table are we + * @param batch_deposit_serial_id which batch deposit was aggregated + */ +static void +clear_finished_transfer_cb ( + void *cls, + uint64_t tracking_serial_id, + uint64_t batch_deposit_serial_id) +{ + struct AggregationContext *ac = cls; + enum GNUNET_DB_QueryStatus qs; + + if (0 > ac->err) + return; /* already failed */ + GNUNET_assert (ac->max_aggregation_serial < tracking_serial_id); + ac->max_aggregation_serial = tracking_serial_id; + qs = TALER_ARL_adb->delete_pending_deposit ( + TALER_ARL_adb->cls, + batch_deposit_serial_id); + if (0 == qs) + { + /* Aggregated something twice or other error, report! */ + GNUNET_break (0); + // FIXME: report more nicely! + } + if (0 > qs) + ac->err = qs; } @@ -904,30 +1263,78 @@ wire_missing_cb (void *cls, static void check_for_required_transfers (void) { - struct GNUNET_TIME_Timestamp next_timestamp; + struct ImportMissingWireContext wc = { + .max_batch_deposit_uuid = TALER_ARL_USE_PP (wire_batch_deposit_id), + .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT + }; + struct GNUNET_TIME_Absolute deadline; enum GNUNET_DB_QueryStatus qs; + struct ReportMissingWireContext rc = { + .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT + }; + struct AggregationContext ac = { + .max_aggregation_serial = TALER_ARL_USE_PP (wire_aggregation_id), + .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT + }; + qs = TALER_ARL_edb->select_batch_deposits_missing_wire ( + TALER_ARL_edb->cls, + TALER_ARL_USE_PP (wire_batch_deposit_id), + &import_wire_missing_cb, + &wc); + if ( (0 > qs) || (0 > wc.err) ) + { + GNUNET_break (0); + GNUNET_break ( (GNUNET_DB_STATUS_SOFT_ERROR == qs) || + (GNUNET_DB_STATUS_SOFT_ERROR == wc.err) ); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + } + TALER_ARL_USE_PP (wire_batch_deposit_id) = wc.max_batch_deposit_uuid; + qs = TALER_ARL_edb->select_aggregations_above_serial ( + TALER_ARL_edb->cls, + TALER_ARL_USE_PP (wire_aggregation_id), + &clear_finished_transfer_cb, + &ac); + if ( (0 > qs) || (0 > ac.err) ) + { + GNUNET_break (0); + GNUNET_break ( (GNUNET_DB_STATUS_SOFT_ERROR == qs) || + (GNUNET_DB_STATUS_SOFT_ERROR == ac.err) ); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); + return; + } + TALER_ARL_USE_PP (wire_aggregation_id) = ac.max_aggregation_serial; /* Subtract #GRACE_PERIOD, so we can be a bit behind in processing without immediately raising undue concern */ - next_timestamp = GNUNET_TIME_absolute_to_timestamp ( - GNUNET_TIME_absolute_subtract (GNUNET_TIME_absolute_get (), - GRACE_PERIOD)); - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Analyzing exchange's unfinished deposits (deadline: %s)\n", - GNUNET_TIME_timestamp2s (next_timestamp)); - qs = TALER_ARL_edb->select_deposits_missing_wire (TALER_ARL_edb->cls, - pp.last_timestamp, - next_timestamp, - &wire_missing_cb, - &next_timestamp); - if (0 > qs) + deadline = GNUNET_TIME_absolute_subtract (GNUNET_TIME_absolute_get (), + GRACE_PERIOD); + rc.map = GNUNET_CONTAINER_multishortmap_create (1024, + GNUNET_NO); + qs = TALER_ARL_adb->select_pending_deposits ( + TALER_ARL_adb->cls, + deadline, + &report_wire_missing_cb, + &rc); + if ( (0 > qs) || (0 > rc.err) ) { - GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); + GNUNET_break (0); + GNUNET_break ( (GNUNET_DB_STATUS_SOFT_ERROR == qs) || + (GNUNET_DB_STATUS_SOFT_ERROR == rc.err) ); + GNUNET_CONTAINER_multishortmap_iterate (rc.map, + &free_report_entry, + NULL); + GNUNET_CONTAINER_multishortmap_destroy (rc.map); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return; } - pp.last_timestamp = next_timestamp; + GNUNET_CONTAINER_multishortmap_iterate (rc.map, + &generate_report, + NULL); + GNUNET_CONTAINER_multishortmap_destroy (rc.map); /* conclude with success */ commit (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT); GNUNET_SCHEDULER_shutdown (); @@ -1034,7 +1441,7 @@ wire_out_cb (void *cls, { /* Wire transfer was not made (yet) at all (but would have been justified), so the entire amount is missing / still to be done. - This is moderately harmless, it might just be that the aggreator + This is moderately harmless, it might just be that the aggregator has not yet fully caught up with the transfers it should do. */ TALER_ARL_report ( report_wire_out_inconsistencies, @@ -1286,6 +1693,7 @@ complain_out_not_found (void *cls, switch (qs) { case GNUNET_DB_STATUS_HARD_ERROR: + GNUNET_break (0); global_ret = EXIT_FAILURE; GNUNET_SCHEDULER_shutdown (); return GNUNET_SYSERR; @@ -1297,6 +1705,10 @@ complain_out_not_found (void *cls, /* not a profit drain */ break; case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Profit drain of %s to %s found!\n", + TALER_amount2s (&amount), + payto_uri); if (GNUNET_OK != TALER_exchange_offline_profit_drain_verify ( &roi->details.wtid, @@ -1314,7 +1726,7 @@ complain_out_not_found (void *cls, "profit_drains"), GNUNET_JSON_pack_uint64 ("row", serial), - GNUNET_JSON_pack_data_auto ("wtid", + GNUNET_JSON_pack_data_auto ("id", &roi->details.wtid), GNUNET_JSON_pack_string ("diagnostic", "invalid signature"))); @@ -1377,7 +1789,11 @@ complain_out_not_found (void *cls, } GNUNET_free (account_section); GNUNET_free (payto_uri); - break; + /* profit drain was correct */ + TALER_ARL_amount_add (&TALER_ARL_USE_AB (total_drained), + &TALER_ARL_USE_AB (total_drained), + &amount); + return GNUNET_OK; } } @@ -1427,6 +1843,7 @@ check_exchange_wire_out (struct WireAccount *wa) { enum GNUNET_DB_QueryStatus qs; + GNUNET_assert (NULL == wa->dhh); GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Analyzing exchange's wire OUT table for account `%s'\n", wa->ai->section_name); @@ -1460,92 +1877,92 @@ check_exchange_wire_out (struct WireAccount *wa) * transactions). * * @param cls `struct WireAccount` with current wire account to process - * @param http_status_code http status of the request - * @param ec error code in case something went wrong - * @param row_off identification of the position at which we are querying - * @param details details about the wire transfer - * @param json original response in JSON format - * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration + * @param dhr HTTP response details */ -static enum GNUNET_GenericReturnValue +static void history_debit_cb (void *cls, - unsigned int http_status_code, - enum TALER_ErrorCode ec, - uint64_t row_off, - const struct TALER_BANK_DebitDetails *details, - const json_t *json) + const struct TALER_BANK_DebitHistoryResponse *dhr) { struct WireAccount *wa = cls; struct ReserveOutInfo *roi; size_t slen; - (void) json; - if (NULL == details) + wa->dhh = NULL; + switch (dhr->http_status) { - wa->dhh = NULL; - if ( (TALER_EC_NONE != ec) && - ( (! ignore_account_404) || - (MHD_HTTP_NOT_FOUND != http_status_code) ) ) + case MHD_HTTP_OK: + for (unsigned int i = 0; i<dhr->details.ok.details_length; i++) { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Error fetching debit history of account %s: %u/%u!\n", - wa->ai->section_name, - http_status_code, - (unsigned int) ec); - commit (GNUNET_DB_STATUS_HARD_ERROR); - global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - return GNUNET_SYSERR; + const struct TALER_BANK_DebitDetails *dd + = &dhr->details.ok.details[i]; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Analyzing bank DEBIT at %s of %s with WTID %s\n", + GNUNET_TIME_timestamp2s (dd->execution_date), + TALER_amount2s (&dd->amount), + TALER_B2S (&dd->wtid)); + /* Update offset */ + wa->wire_off_out = dd->serial_id; + slen = strlen (dd->credit_account_uri) + 1; + roi = GNUNET_malloc (sizeof (struct ReserveOutInfo) + + slen); + GNUNET_CRYPTO_hash (&dd->wtid, + sizeof (dd->wtid), + &roi->subject_hash); + roi->details.amount = dd->amount; + roi->details.execution_date = dd->execution_date; + roi->details.wtid = dd->wtid; + roi->details.credit_account_uri = (const char *) &roi[1]; + GNUNET_memcpy (&roi[1], + dd->credit_account_uri, + slen); + if (GNUNET_OK != + GNUNET_CONTAINER_multihashmap_put (out_map, + &roi->subject_hash, + roi, + GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)) + { + char *diagnostic; + + GNUNET_asprintf (&diagnostic, + "duplicate subject hash `%s'", + TALER_B2S (&roi->subject_hash)); + TALER_ARL_amount_add (&total_wire_format_amount, + &total_wire_format_amount, + &dd->amount); + TALER_ARL_report (report_wire_format_inconsistencies, + GNUNET_JSON_PACK ( + TALER_JSON_pack_amount ("amount", + &dd->amount), + GNUNET_JSON_pack_uint64 ("wire_offset", + dd->serial_id), + GNUNET_JSON_pack_string ("diagnostic", + diagnostic))); + GNUNET_free (diagnostic); + } } check_exchange_wire_out (wa); - return GNUNET_OK; - } - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Analyzing bank DEBIT at %s of %s with WTID %s\n", - GNUNET_TIME_timestamp2s (details->execution_date), - TALER_amount2s (&details->amount), - TALER_B2S (&details->wtid)); - /* Update offset */ - wa->out_wire_off = row_off; - slen = strlen (details->credit_account_uri) + 1; - roi = GNUNET_malloc (sizeof (struct ReserveOutInfo) - + slen); - GNUNET_CRYPTO_hash (&details->wtid, - sizeof (details->wtid), - &roi->subject_hash); - roi->details.amount = details->amount; - roi->details.execution_date = details->execution_date; - roi->details.wtid = details->wtid; - roi->details.credit_account_uri = (const char *) &roi[1]; - memcpy (&roi[1], - details->credit_account_uri, - slen); - if (GNUNET_OK != - GNUNET_CONTAINER_multihashmap_put (out_map, - &roi->subject_hash, - roi, - GNUNET_CONTAINER_MULTIHASHMAPOPTION_UNIQUE_ONLY)) - { - char *diagnostic; - - GNUNET_asprintf (&diagnostic, - "duplicate subject hash `%s'", - TALER_B2S (&roi->subject_hash)); - TALER_ARL_amount_add (&total_wire_format_amount, - &total_wire_format_amount, - &details->amount); - TALER_ARL_report (report_wire_format_inconsistencies, - GNUNET_JSON_PACK ( - TALER_JSON_pack_amount ("amount", - &details->amount), - GNUNET_JSON_pack_uint64 ("wire_offset", - row_off), - GNUNET_JSON_pack_string ("diagnostic", - diagnostic))); - GNUNET_free (diagnostic); - return GNUNET_OK; + return; + case MHD_HTTP_NO_CONTENT: + check_exchange_wire_out (wa); + return; + case MHD_HTTP_NOT_FOUND: + if (ignore_account_404) + { + check_exchange_wire_out (wa); + return; + } + break; + default: + break; } - return GNUNET_OK; + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Error fetching debit history of account %s: %u/%u!\n", + wa->ai->section_name, + dhr->http_status, + (unsigned int) dhr->ec); + commit (GNUNET_DB_STATUS_HARD_ERROR); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); } @@ -1580,7 +1997,7 @@ process_debits (void *cls) // (CG: used to be INT64_MAX, changed by MS to INT32_MAX, why? To be discussed with him!) wa->dhh = TALER_BANK_debit_history (ctx, wa->ai->auth, - wa->out_wire_off, + wa->wire_off_out, INT32_MAX, GNUNET_TIME_UNIT_ZERO, &history_debit_cb, @@ -1604,8 +2021,9 @@ process_debits (void *cls) static void begin_debit_audit (void) { + GNUNET_assert (NULL == out_map); out_map = GNUNET_CONTAINER_multihashmap_create (1024, - GNUNET_YES); + true); process_debits (wa_head); } @@ -1620,8 +2038,11 @@ begin_debit_audit (void) static void conclude_credit_history (void) { - GNUNET_CONTAINER_multihashmap_destroy (in_map); - in_map = NULL; + if (NULL != in_map) + { + GNUNET_CONTAINER_multihashmap_destroy (in_map); + in_map = NULL; + } /* credit done, now check debits */ begin_debit_audit (); } @@ -1669,9 +2090,9 @@ reserve_in_cb (void *cls, rii->details.execution_date = execution_date; rii->details.reserve_pub = *reserve_pub; rii->details.debit_account_uri = (const char *) &rii[1]; - memcpy (&rii[1], - sender_account_details, - slen); + GNUNET_memcpy (&rii[1], + sender_account_details, + slen); GNUNET_CRYPTO_hash (&wire_reference, sizeof (uint64_t), &rii->row_off_hash); @@ -1687,7 +2108,7 @@ reserve_in_cb (void *cls, "reserves_in"), GNUNET_JSON_pack_uint64 ("row", rowid), - GNUNET_JSON_pack_data_auto ("wire_offset_hash", + GNUNET_JSON_pack_data_auto ("id", &rii->row_off_hash), GNUNET_JSON_pack_string ("diagnostic", "duplicate wire offset"))); @@ -1755,52 +2176,19 @@ process_credits (void *cls); /** - * This function is called for all transactions that - * are credited to the exchange's account (incoming - * transactions). + * We got all of the incoming transactions for @a wa, + * finish processing the account. * - * @param cls `struct WireAccount` we are processing - * @param http_status HTTP status returned by the bank - * @param ec error code in case something went wrong - * @param row_off identification of the position at which we are querying - * @param details details about the wire transfer - * @param json raw response - * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration + * @param[in,out] wa wire account to process */ -static enum GNUNET_GenericReturnValue -history_credit_cb (void *cls, - unsigned int http_status, - enum TALER_ErrorCode ec, - uint64_t row_off, - const struct TALER_BANK_CreditDetails *details, - const json_t *json) +static void +conclude_account (struct WireAccount *wa) { - struct WireAccount *wa = cls; - struct ReserveInInfo *rii; - struct GNUNET_HashCode key; - - (void) json; - if (NULL == details) + GNUNET_log (GNUNET_ERROR_TYPE_INFO, + "Reconciling CREDIT processing of account `%s'\n", + wa->ai->section_name); + if (NULL != in_map) { - wa->chh = NULL; - if ( (TALER_EC_NONE != ec) && - ( (! ignore_account_404) || - (MHD_HTTP_NOT_FOUND != http_status) ) ) - { - GNUNET_log (GNUNET_ERROR_TYPE_ERROR, - "Error fetching credit history of account %s: %u/%s!\n", - wa->ai->section_name, - http_status, - TALER_ErrorCode_get_hint (ec)); - commit (GNUNET_DB_STATUS_HARD_ERROR); - global_ret = EXIT_FAILURE; - GNUNET_SCHEDULER_shutdown (); - return GNUNET_SYSERR; - } - /* end of operation */ - GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Reconciling CREDIT processing of account `%s'\n", - wa->ai->section_name); GNUNET_CONTAINER_multihashmap_iterate (in_map, &complain_in_not_found, wa); @@ -1808,16 +2196,32 @@ history_credit_cb (void *cls, GNUNET_CONTAINER_multihashmap_iterate (in_map, &free_rii, NULL); - process_credits (wa->next); - return GNUNET_OK; } + process_credits (wa->next); +} + + +/** + * Analyze credit transaction @a details into @a wa. + * + * @param[in,out] wa account that received the transfer + * @param details transfer details + * @return true on success, false to stop loop at this point + */ +static bool +analyze_credit (struct WireAccount *wa, + const struct TALER_BANK_CreditDetails *details) +{ + struct ReserveInInfo *rii; + struct GNUNET_HashCode key; + GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Analyzing bank CREDIT at %s of %s with Reserve-pub %s\n", GNUNET_TIME_timestamp2s (details->execution_date), TALER_amount2s (&details->amount), TALER_B2S (&details->reserve_pub)); - GNUNET_CRYPTO_hash (&row_off, - sizeof (row_off), + GNUNET_CRYPTO_hash (&details->serial_id, + sizeof (details->serial_id), &key); rii = GNUNET_CONTAINER_multihashmap_get (in_map, &key); @@ -1826,13 +2230,12 @@ history_credit_cb (void *cls, GNUNET_log (GNUNET_ERROR_TYPE_INFO, "Failed to find wire transfer at `%s' in exchange database. Audit ends at this point in time.\n", GNUNET_TIME_timestamp2s (details->execution_date)); - wa->chh = NULL; process_credits (wa->next); - return GNUNET_SYSERR; /* not an error, just end of processing */ + return false; /* not an error, just end of processing */ } /* Update offset */ - wa->in_wire_off = row_off; + wa->wire_off_in = details->serial_id; /* compare records with expected data */ if (0 != GNUNET_memcmp (&details->reserve_pub, &rii->details.reserve_pub)) @@ -1843,7 +2246,7 @@ history_credit_cb (void *cls, GNUNET_JSON_pack_uint64 ("row", rii->rowid), GNUNET_JSON_pack_uint64 ("bank_row", - row_off), + details->serial_id), TALER_JSON_pack_amount ("amount_exchange_expected", &rii->details.amount), TALER_JSON_pack_amount ("amount_wired", @@ -1863,7 +2266,7 @@ history_credit_cb (void *cls, GNUNET_JSON_pack_uint64 ("row", rii->rowid), GNUNET_JSON_pack_uint64 ("bank_row", - row_off), + details->serial_id), TALER_JSON_pack_amount ("amount_exchange_expected", &zero), TALER_JSON_pack_amount ("amount_wired", @@ -1889,7 +2292,7 @@ history_credit_cb (void *cls, GNUNET_JSON_pack_uint64 ("row", rii->rowid), GNUNET_JSON_pack_uint64 ("bank_row", - row_off), + details->serial_id), TALER_JSON_pack_amount ("amount_exchange_expected", &rii->details.amount), TALER_JSON_pack_amount ("amount_wired", @@ -1930,19 +2333,19 @@ history_credit_cb (void *cls, if (0 != strcasecmp (details->debit_account_uri, rii->details.debit_account_uri)) { - TALER_ARL_report (report_missattribution_in_inconsistencies, + TALER_ARL_report (report_misattribution_in_inconsistencies, GNUNET_JSON_PACK ( TALER_JSON_pack_amount ("amount", &rii->details.amount), GNUNET_JSON_pack_uint64 ("row", rii->rowid), GNUNET_JSON_pack_uint64 ("bank_row", - row_off), + details->serial_id), GNUNET_JSON_pack_data_auto ( "reserve_pub", &rii->details.reserve_pub))); - TALER_ARL_amount_add (&total_missattribution_in, - &total_missattribution_in, + TALER_ARL_amount_add (&total_misattribution_in, + &total_misattribution_in, &rii->details.amount); } if (GNUNET_TIME_timestamp_cmp (details->execution_date, @@ -1956,7 +2359,7 @@ history_credit_cb (void *cls, GNUNET_JSON_pack_uint64 ("row", rii->rowid), GNUNET_JSON_pack_uint64 ("bank_row", - row_off), + details->serial_id), GNUNET_JSON_pack_string ("diagnostic", "execution date mismatch"))); } @@ -1965,7 +2368,60 @@ cleanup: free_rii (NULL, &key, rii)); - return GNUNET_OK; + return true; +} + + +/** + * This function is called for all transactions that + * are credited to the exchange's account (incoming + * transactions). + * + * @param cls `struct WireAccount` we are processing + * @param chr HTTP response returned by the bank + */ +static void +history_credit_cb (void *cls, + const struct TALER_BANK_CreditHistoryResponse *chr) +{ + struct WireAccount *wa = cls; + + wa->chh = NULL; + switch (chr->http_status) + { + case MHD_HTTP_OK: + for (unsigned int i = 0; i<chr->details.ok.details_length; i++) + { + const struct TALER_BANK_CreditDetails *cd + = &chr->details.ok.details[i]; + + if (! analyze_credit (wa, + cd)) + return; + } + conclude_account (wa); + return; + case MHD_HTTP_NO_CONTENT: + conclude_account (wa); + return; + case MHD_HTTP_NOT_FOUND: + if (ignore_account_404) + { + conclude_account (wa); + return; + } + break; + default: + break; + } + GNUNET_log (GNUNET_ERROR_TYPE_ERROR, + "Error fetching credit history of account %s: %u/%s!\n", + wa->ai->section_name, + chr->http_status, + TALER_ErrorCode_get_hint (chr->ec)); + commit (GNUNET_DB_STATUS_HARD_ERROR); + global_ret = EXIT_FAILURE; + GNUNET_SCHEDULER_shutdown (); } @@ -2018,7 +2474,7 @@ process_credits (void *cls) // (CG: used to be INT64_MAX, changed by MS to INT32_MAX, why? To be discussed with him!) wa->chh = TALER_BANK_credit_history (ctx, wa->ai->auth, - wa->in_wire_off, + wa->wire_off_in, INT32_MAX, GNUNET_TIME_UNIT_ZERO, &history_credit_cb, @@ -2041,6 +2497,7 @@ process_credits (void *cls) static void begin_credit_audit (void) { + GNUNET_assert (NULL == in_map); in_map = GNUNET_CONTAINER_multihashmap_create (1024, GNUNET_YES); /* now go over all bank accounts and check delta with in_map */ @@ -2049,8 +2506,7 @@ begin_credit_audit (void) /** - * Function called about reserve closing operations - * the aggregator triggered. + * Function called about reserve closing operations the aggregator triggered. * * @param cls closure * @param rowid row identifier used to uniquely identify the reserve closing operation @@ -2060,6 +2516,8 @@ begin_credit_audit (void) * @param reserve_pub public key of the reserve * @param receiver_account where did we send the funds, in payto://-format * @param wtid identifier used for the wire transfer + * @param close_request_row which close request triggered the operation? + * 0 if it was a timeout (not used) * @return #GNUNET_OK to continue to iterate, #GNUNET_SYSERR to stop */ static enum GNUNET_GenericReturnValue @@ -2070,12 +2528,14 @@ reserve_closed_cb (void *cls, const struct TALER_Amount *closing_fee, const struct TALER_ReservePublicKeyP *reserve_pub, const char *receiver_account, - const struct TALER_WireTransferIdentifierRawP *wtid) + const struct TALER_WireTransferIdentifierRawP *wtid, + uint64_t close_request_row) { struct ReserveClosure *rc; struct GNUNET_HashCode key; (void) cls; + (void) close_request_row; rc = GNUNET_new (struct ReserveClosure); if (TALER_ARL_SR_INVALID_NEGATIVE == TALER_ARL_amount_subtract_neg (&rc->amount, @@ -2088,7 +2548,7 @@ reserve_closed_cb (void *cls, "reserves_closures"), GNUNET_JSON_pack_uint64 ("row", rowid), - GNUNET_JSON_pack_data_auto ("reserve_pub", + GNUNET_JSON_pack_data_auto ("id", reserve_pub), TALER_JSON_pack_amount ("amount_with_fee", amount_with_fee), @@ -2101,8 +2561,8 @@ reserve_closed_cb (void *cls, return GNUNET_SYSERR; return GNUNET_OK; } - pp.last_reserve_close_uuid - = GNUNET_MAX (pp.last_reserve_close_uuid, + TALER_ARL_USE_PP (wire_reserve_close_id) + = GNUNET_MAX (TALER_ARL_USE_PP (wire_reserve_close_id), rowid + 1); rc->receiver_account = GNUNET_strdup (receiver_account); rc->wtid = *wtid; @@ -2161,17 +2621,18 @@ begin_transaction (void) } GNUNET_assert (GNUNET_OK == TALER_amount_set_zero (TALER_ARL_currency, - &total_drained)); + &TALER_ARL_USE_AB (total_drained))); GNUNET_assert (GNUNET_OK == TALER_amount_set_zero (TALER_ARL_currency, &total_wire_in)); GNUNET_assert (GNUNET_OK == TALER_amount_set_zero (TALER_ARL_currency, &total_wire_out)); - qs = TALER_ARL_adb->get_predicted_balance (TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - &start_balance, - &total_drained); + qs = TALER_ARL_adb->get_balance ( + TALER_ARL_adb->cls, + TALER_ARL_GET_AB (total_drained), + TALER_ARL_GET_AB (final_balance), + NULL); switch (qs) { case GNUNET_DB_STATUS_HARD_ERROR: @@ -2191,13 +2652,33 @@ begin_transaction (void) NULL != wa; wa = wa->next) { - wa->qsx = TALER_ARL_adb->get_wire_auditor_account_progress ( + GNUNET_asprintf (&wa->label_reserve_in_serial_id, + "wire-%s-%s", + wa->ai->section_name, + "reserve_in_serial_id"); + GNUNET_asprintf (&wa->label_wire_out_serial_id, + "wire-%s-%s", + wa->ai->section_name, + "wire_out_serial_id"); + GNUNET_asprintf (&wa->label_wire_off_in, + "wire-%s-%s", + wa->ai->section_name, + "wire_off_in"); + GNUNET_asprintf (&wa->label_wire_off_out, + "wire-%s-%s", + wa->ai->section_name, + "wire_off_out"); + wa->qsx = TALER_ARL_adb->get_auditor_progress ( TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - wa->ai->section_name, - &wa->pp, - &wa->in_wire_off, - &wa->out_wire_off); + wa->label_reserve_in_serial_id, + &wa->pp.last_reserve_in_serial_id, + wa->label_wire_out_serial_id, + &wa->pp.last_wire_out_serial_id, + wa->label_wire_off_in, + &wa->wire_off_in, + wa->label_wire_off_out, + &wa->wire_off_out, + NULL); if (0 > wa->qsx) { GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == wa->qsx); @@ -2205,9 +2686,12 @@ begin_transaction (void) } wa->start_pp = wa->pp; } - qsx_gwap = TALER_ARL_adb->get_wire_auditor_progress (TALER_ARL_adb->cls, - &TALER_ARL_master_pub, - &pp); + qsx_gwap = TALER_ARL_adb->get_auditor_progress ( + TALER_ARL_adb->cls, + TALER_ARL_GET_PP (wire_reserve_close_id), + TALER_ARL_GET_PP (wire_batch_deposit_id), + TALER_ARL_GET_PP (wire_aggregation_id), + NULL); if (0 > qsx_gwap) { GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qsx_gwap); @@ -2220,11 +2704,11 @@ begin_transaction (void) } else { - start_pp = pp; GNUNET_log (GNUNET_ERROR_TYPE_INFO, - "Resuming wire audit at %s / %llu\n", - GNUNET_TIME_timestamp2s (pp.last_timestamp), - (unsigned long long) pp.last_reserve_close_uuid); + "Resuming wire audit at %llu / %llu / %llu\n", + (unsigned long long) TALER_ARL_USE_PP (wire_reserve_close_id), + (unsigned long long) TALER_ARL_USE_PP (wire_batch_deposit_id), + (unsigned long long) TALER_ARL_USE_PP (wire_aggregation_id)); } { @@ -2232,7 +2716,7 @@ begin_transaction (void) qs = TALER_ARL_edb->select_reserve_closed_above_serial_id ( TALER_ARL_edb->cls, - pp.last_reserve_close_uuid, + TALER_ARL_USE_PP (wire_reserve_close_id), &reserve_closed_cb, NULL); if (0 > qs) @@ -2334,11 +2818,15 @@ run (void *cls, GNUNET_assert (NULL != (report_row_inconsistencies = json_array ())); GNUNET_assert (NULL != - (report_missattribution_in_inconsistencies + (report_misattribution_in_inconsistencies = json_array ())); GNUNET_assert (NULL != (report_lags = json_array ())); GNUNET_assert (NULL != + (report_aml_lags = json_array ())); + GNUNET_assert (NULL != + (report_kyc_lags = json_array ())); + GNUNET_assert (NULL != (report_closure_lags = json_array ())); GNUNET_assert (NULL != (report_account_progress = json_array ())); @@ -2356,7 +2844,7 @@ run (void *cls, &total_bad_amount_in_minus)); GNUNET_assert (GNUNET_OK == TALER_amount_set_zero (TALER_ARL_currency, - &total_missattribution_in)); + &total_misattribution_in)); GNUNET_assert (GNUNET_OK == TALER_amount_set_zero (TALER_ARL_currency, &total_amount_lag)); @@ -2414,11 +2902,10 @@ main (int argc, "ignore-not-found", "continue, even if the bank account of the exchange was not found", &ignore_account_404), - GNUNET_GETOPT_option_base32_auto ('m', - "exchange-key", - "KEY", - "public key of the exchange (Crockford base32 encoded)", - &TALER_ARL_master_pub), + GNUNET_GETOPT_option_flag ('t', + "test", + "run in test mode and exit when idle", + &test_mode), GNUNET_GETOPT_option_timetravel ('T', "timetravel"), GNUNET_GETOPT_OPTION_END |