taler-helper-auditor-transfer.c (16117B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2017-2024 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU General Public License for more details. 12 13 You should have received a copy of the GNU General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 /** 17 * @file auditor/taler-helper-auditor-transfer.c 18 * @brief audits that deposits past due date are 19 * aggregated and have a matching wire transfer 20 * database. 21 * @author Christian Grothoff 22 */ 23 #include "taler/platform.h" 24 #include <gnunet/gnunet_util_lib.h> 25 #include <gnunet/gnunet_curl_lib.h> 26 #include "taler/taler_auditordb_plugin.h" 27 #include "taler/taler_exchangedb_lib.h" 28 #include "taler/taler_json_lib.h" 29 #include "taler/taler_signatures.h" 30 #include "report-lib.h" 31 #include "taler/taler_dbevents.h" 32 33 34 /** 35 * Run in test mode. Exit when idle instead of 36 * going to sleep and waiting for more work. 37 */ 38 static int test_mode; 39 40 /** 41 * Return value from main(). 42 */ 43 static int global_ret; 44 45 /** 46 * Last reserve_out / wire_out serial IDs seen. 47 */ 48 static TALER_ARL_DEF_PP (wire_batch_deposit_id); 49 static TALER_ARL_DEF_PP (wire_aggregation_id); 50 51 /** 52 * Total amount which the exchange did not aggregate/transfer in time. 53 */ 54 static TALER_ARL_DEF_AB (total_amount_lag); 55 56 /** 57 * Total amount which the exchange did aggregate/transfer too early. 58 */ 59 static TALER_ARL_DEF_AB (total_early_aggregation); 60 61 /** 62 * Should we run checks that only work for exchange-internal audits? 63 */ 64 static int internal_checks; 65 66 /** 67 * Database event handler to wake us up again. 68 */ 69 static struct GNUNET_DB_EventHandler *eh; 70 71 /** 72 * The auditors's configuration. 73 */ 74 static const struct GNUNET_CONFIGURATION_Handle *cfg; 75 76 77 /** 78 * Task run on shutdown. 79 * 80 * @param cls NULL 81 */ 82 static void 83 do_shutdown (void *cls) 84 { 85 (void) cls; 86 if (NULL != eh) 87 { 88 TALER_ARL_adb->event_listen_cancel (eh); 89 eh = NULL; 90 } 91 TALER_ARL_done (); 92 TALER_EXCHANGEDB_unload_accounts (); 93 TALER_ARL_cfg = NULL; 94 } 95 96 97 /** 98 * Closure for import_wire_missing_cb(). 99 */ 100 struct ImportMissingWireContext 101 { 102 /** 103 * Set to maximum row ID encountered. 104 */ 105 uint64_t max_batch_deposit_uuid; 106 107 /** 108 * Set to database errors in callback. 109 */ 110 enum GNUNET_DB_QueryStatus err; 111 }; 112 113 114 /** 115 * Function called on deposits that need to be checked for their 116 * wire transfer. 117 * 118 * @param cls closure, points to a `struct ImportMissingWireContext` 119 * @param batch_deposit_serial_id serial of the entry in the batch deposits table 120 * @param total_amount value of the missing deposits, including fee 121 * @param wire_target_h_payto where should the funds be wired 122 * @param deadline what was the earliest requested wire transfer deadline 123 */ 124 static void 125 import_wire_missing_cb ( 126 void *cls, 127 uint64_t batch_deposit_serial_id, 128 const struct TALER_Amount *total_amount, 129 const struct TALER_FullPaytoHashP *wire_target_h_payto, 130 struct GNUNET_TIME_Timestamp deadline) 131 { 132 struct ImportMissingWireContext *wc = cls; 133 enum GNUNET_DB_QueryStatus qs; 134 135 if (wc->err < 0) 136 return; /* already failed */ 137 GNUNET_assert (batch_deposit_serial_id >= wc->max_batch_deposit_uuid); 138 wc->max_batch_deposit_uuid = batch_deposit_serial_id + 1; 139 qs = TALER_ARL_adb->delete_early_aggregation ( 140 TALER_ARL_adb->cls, 141 batch_deposit_serial_id); 142 switch (qs) 143 { 144 case GNUNET_DB_STATUS_SOFT_ERROR: 145 GNUNET_break (0); 146 wc->err = qs; 147 return; 148 case GNUNET_DB_STATUS_HARD_ERROR: 149 GNUNET_break (0); 150 wc->err = qs; 151 return; 152 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 153 qs = TALER_ARL_adb->insert_pending_deposit ( 154 TALER_ARL_adb->cls, 155 batch_deposit_serial_id, 156 wire_target_h_payto, 157 total_amount, 158 deadline); 159 if (0 > qs) 160 { 161 GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); 162 wc->err = qs; 163 } 164 TALER_ARL_amount_add (&TALER_ARL_USE_AB (total_amount_lag), 165 &TALER_ARL_USE_AB (total_amount_lag), 166 total_amount); 167 break; 168 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 169 TALER_ARL_amount_subtract (&TALER_ARL_USE_AB (total_early_aggregation), 170 &TALER_ARL_USE_AB (total_early_aggregation), 171 total_amount); 172 break; 173 default: 174 GNUNET_assert (0); 175 } 176 } 177 178 179 /** 180 * Checks for wire transfers that should have happened. 181 * 182 * @return transaction status 183 */ 184 static enum GNUNET_DB_QueryStatus 185 check_for_required_transfers (void) 186 { 187 enum GNUNET_DB_QueryStatus qs; 188 struct ImportMissingWireContext wc = { 189 .max_batch_deposit_uuid = TALER_ARL_USE_PP (wire_batch_deposit_id), 190 .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT 191 }; 192 193 qs = TALER_ARL_edb->select_batch_deposits_missing_wire ( 194 TALER_ARL_edb->cls, 195 TALER_ARL_USE_PP (wire_batch_deposit_id), 196 &import_wire_missing_cb, 197 &wc); 198 if (0 > qs) 199 { 200 GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); 201 return qs; 202 } 203 if (0 > wc.err) 204 { 205 GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == wc.err); 206 return wc.err; 207 } 208 TALER_ARL_USE_PP (wire_batch_deposit_id) = wc.max_batch_deposit_uuid; 209 return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; 210 } 211 212 213 /** 214 * Closure for #clear_finished_transfer_cb(). 215 */ 216 struct AggregationContext 217 { 218 /** 219 * Set to maximum row ID encountered. 220 */ 221 uint64_t max_aggregation_serial; 222 223 /** 224 * Set to database errors in callback. 225 */ 226 enum GNUNET_DB_QueryStatus err; 227 }; 228 229 230 /** 231 * Function called on aggregations that were done for 232 * a (batch) deposit. 233 * 234 * @param cls closure 235 * @param amount affected amount 236 * @param tracking_serial_id where in the table are we 237 * @param batch_deposit_serial_id which batch deposit was aggregated 238 */ 239 static void 240 clear_finished_transfer_cb ( 241 void *cls, 242 const struct TALER_Amount *amount, 243 uint64_t tracking_serial_id, 244 uint64_t batch_deposit_serial_id) 245 { 246 struct AggregationContext *ac = cls; 247 enum GNUNET_DB_QueryStatus qs; 248 249 if (0 > ac->err) 250 return; /* already failed */ 251 GNUNET_assert (ac->max_aggregation_serial <= tracking_serial_id); 252 ac->max_aggregation_serial = tracking_serial_id + 1; 253 qs = TALER_ARL_adb->delete_pending_deposit ( 254 TALER_ARL_adb->cls, 255 batch_deposit_serial_id); 256 switch (qs) 257 { 258 case GNUNET_DB_STATUS_SOFT_ERROR: 259 GNUNET_break (0); 260 ac->err = qs; 261 return; 262 case GNUNET_DB_STATUS_HARD_ERROR: 263 GNUNET_break (0); 264 ac->err = qs; 265 return; 266 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 267 qs = TALER_ARL_adb->insert_early_aggregation ( 268 TALER_ARL_adb->cls, 269 batch_deposit_serial_id, 270 tracking_serial_id, 271 amount); 272 if (0 > qs) 273 { 274 GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); 275 ac->err = qs; 276 return; 277 } 278 TALER_ARL_amount_add (&TALER_ARL_USE_AB (total_early_aggregation), 279 &TALER_ARL_USE_AB (total_early_aggregation), 280 amount); 281 break; 282 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 283 TALER_ARL_amount_subtract (&TALER_ARL_USE_AB (total_amount_lag), 284 &TALER_ARL_USE_AB (total_amount_lag), 285 amount); 286 break; 287 default: 288 GNUNET_assert (0); 289 } 290 } 291 292 293 /** 294 * Checks that all wire transfers that should have happened 295 * (based on deposits) have indeed happened. 296 * 297 * @return transaction status 298 */ 299 static enum GNUNET_DB_QueryStatus 300 check_for_completed_transfers (void) 301 { 302 struct AggregationContext ac = { 303 .max_aggregation_serial = TALER_ARL_USE_PP (wire_aggregation_id), 304 .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT 305 }; 306 enum GNUNET_DB_QueryStatus qs; 307 308 qs = TALER_ARL_edb->select_aggregations_above_serial ( 309 TALER_ARL_edb->cls, 310 TALER_ARL_USE_PP (wire_aggregation_id), 311 &clear_finished_transfer_cb, 312 &ac); 313 if (0 > qs) 314 { 315 GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); 316 return qs; 317 } 318 if (0 > ac.err) 319 { 320 GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == ac.err); 321 return ac.err; 322 } 323 TALER_ARL_USE_PP (wire_aggregation_id) = ac.max_aggregation_serial; 324 return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; 325 } 326 327 328 /** 329 * Start the database transactions and begin the audit. 330 * 331 * @return transaction status 332 */ 333 static enum GNUNET_DB_QueryStatus 334 begin_transaction (void) 335 { 336 enum GNUNET_DB_QueryStatus qs; 337 338 if (GNUNET_SYSERR == 339 TALER_ARL_edb->preflight (TALER_ARL_edb->cls)) 340 { 341 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 342 "Failed to initialize exchange database connection.\n"); 343 return GNUNET_DB_STATUS_HARD_ERROR; 344 } 345 if (GNUNET_SYSERR == 346 TALER_ARL_adb->preflight (TALER_ARL_adb->cls)) 347 { 348 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 349 "Failed to initialize auditor database session.\n"); 350 return GNUNET_DB_STATUS_HARD_ERROR; 351 } 352 if (GNUNET_OK != 353 TALER_ARL_adb->start (TALER_ARL_adb->cls)) 354 { 355 GNUNET_break (0); 356 return GNUNET_DB_STATUS_HARD_ERROR; 357 } 358 if (GNUNET_OK != 359 TALER_ARL_edb->start_read_only (TALER_ARL_edb->cls, 360 "transfer auditor")) 361 { 362 GNUNET_break (0); 363 TALER_ARL_adb->rollback (TALER_ARL_adb->cls); 364 return GNUNET_DB_STATUS_HARD_ERROR; 365 } 366 qs = TALER_ARL_adb->get_auditor_progress ( 367 TALER_ARL_adb->cls, 368 TALER_ARL_GET_PP (wire_batch_deposit_id), 369 TALER_ARL_GET_PP (wire_aggregation_id), 370 NULL); 371 if (0 > qs) 372 goto handle_db_error; 373 374 qs = TALER_ARL_adb->get_balance ( 375 TALER_ARL_adb->cls, 376 TALER_ARL_GET_AB (total_amount_lag), 377 TALER_ARL_GET_AB (total_early_aggregation), 378 NULL); 379 if (0 > qs) 380 goto handle_db_error; 381 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) 382 { 383 GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE, 384 "First analysis of with transfer auditor, starting audit from scratch\n"); 385 } 386 else 387 { 388 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 389 "Resuming transfer audit at %llu / %llu\n", 390 (unsigned long long) TALER_ARL_USE_PP (wire_batch_deposit_id), 391 (unsigned long long) TALER_ARL_USE_PP (wire_aggregation_id)); 392 } 393 394 qs = check_for_required_transfers (); 395 if (0 > qs) 396 goto handle_db_error; 397 qs = check_for_completed_transfers (); 398 if (0 > qs) 399 goto handle_db_error; 400 401 qs = TALER_ARL_adb->update_auditor_progress ( 402 TALER_ARL_adb->cls, 403 TALER_ARL_SET_PP (wire_batch_deposit_id), 404 TALER_ARL_SET_PP (wire_aggregation_id), 405 NULL); 406 if (0 > qs) 407 goto handle_db_error; 408 qs = TALER_ARL_adb->insert_auditor_progress ( 409 TALER_ARL_adb->cls, 410 TALER_ARL_SET_PP (wire_batch_deposit_id), 411 TALER_ARL_SET_PP (wire_aggregation_id), 412 NULL); 413 if (0 > qs) 414 goto handle_db_error; 415 qs = TALER_ARL_adb->update_balance ( 416 TALER_ARL_adb->cls, 417 TALER_ARL_SET_AB (total_amount_lag), 418 TALER_ARL_SET_AB (total_early_aggregation), 419 NULL); 420 if (0 > qs) 421 goto handle_db_error; 422 qs = TALER_ARL_adb->insert_balance ( 423 TALER_ARL_adb->cls, 424 TALER_ARL_SET_AB (total_amount_lag), 425 TALER_ARL_SET_AB (total_early_aggregation), 426 NULL); 427 if (0 > qs) 428 goto handle_db_error; 429 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 430 "Concluded audit step at %llu/%llu\n", 431 (unsigned long long) TALER_ARL_USE_PP (wire_aggregation_id), 432 (unsigned long long) TALER_ARL_USE_PP (wire_batch_deposit_id)); 433 TALER_ARL_edb->rollback (TALER_ARL_edb->cls); 434 qs = TALER_ARL_adb->commit (TALER_ARL_adb->cls); 435 if (0 > qs) 436 goto handle_db_error; 437 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 438 "Transaction concluded!\n"); 439 return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; 440 handle_db_error: 441 TALER_ARL_adb->rollback (TALER_ARL_adb->cls); 442 TALER_ARL_edb->rollback (TALER_ARL_edb->cls); 443 GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); 444 return qs; 445 } 446 447 448 /** 449 * Start auditor process. 450 */ 451 static void 452 start (void) 453 { 454 enum GNUNET_DB_QueryStatus qs; 455 456 for (unsigned int max_retries = 3; max_retries>0; max_retries--) 457 { 458 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 459 "Trying again (%u attempts left)\n", 460 max_retries); 461 qs = begin_transaction (); 462 if (GNUNET_DB_STATUS_SOFT_ERROR != qs) 463 break; 464 } 465 if (0 > qs) 466 { 467 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 468 "Audit failed\n"); 469 GNUNET_break (0); 470 global_ret = EXIT_FAILURE; 471 GNUNET_SCHEDULER_shutdown (); 472 return; 473 } 474 } 475 476 477 /** 478 * Function called on events received from Postgres. 479 * 480 * @param cls closure, NULL 481 * @param extra additional event data provided 482 * @param extra_size number of bytes in @a extra 483 */ 484 static void 485 db_notify (void *cls, 486 const void *extra, 487 size_t extra_size) 488 { 489 (void) cls; 490 (void) extra; 491 (void) extra_size; 492 493 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 494 "Received notification to wake transfer helper\n"); 495 start (); 496 } 497 498 499 /** 500 * Main function that will be run. 501 * 502 * @param cls closure 503 * @param args remaining command-line arguments 504 * @param cfgfile name of the configuration file used (for saving, can be NULL!) 505 * @param c configuration 506 */ 507 static void 508 run (void *cls, 509 char *const *args, 510 const char *cfgfile, 511 const struct GNUNET_CONFIGURATION_Handle *c) 512 { 513 (void) cls; 514 (void) args; 515 (void) cfgfile; 516 cfg = c; 517 if (GNUNET_OK != 518 TALER_ARL_init (c)) 519 { 520 global_ret = EXIT_FAILURE; 521 return; 522 } 523 GNUNET_SCHEDULER_add_shutdown (&do_shutdown, 524 NULL); 525 if (GNUNET_OK != 526 TALER_EXCHANGEDB_load_accounts (TALER_ARL_cfg, 527 TALER_EXCHANGEDB_ALO_DEBIT 528 | TALER_EXCHANGEDB_ALO_CREDIT 529 | TALER_EXCHANGEDB_ALO_AUTHDATA)) 530 { 531 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 532 "No bank accounts configured\n"); 533 global_ret = EXIT_NOTCONFIGURED; 534 GNUNET_SCHEDULER_shutdown (); 535 return; 536 } 537 if (0 == test_mode) 538 { 539 // FIXME-Optimization: use different event type in the future! 540 struct GNUNET_DB_EventHeaderP es = { 541 .size = htons (sizeof (es)), 542 .type = htons (TALER_DBEVENT_EXCHANGE_AUDITOR_WAKE_HELPER_WIRE) 543 }; 544 545 eh = TALER_ARL_adb->event_listen (TALER_ARL_adb->cls, 546 &es, 547 GNUNET_TIME_UNIT_FOREVER_REL, 548 &db_notify, 549 NULL); 550 GNUNET_assert (NULL != eh); 551 } 552 start (); 553 } 554 555 556 /** 557 * The main function of the wire auditing tool. Checks that 558 * the exchange's records of wire transfers match that of 559 * the wire gateway. 560 * 561 * @param argc number of arguments from the command line 562 * @param argv command line arguments 563 * @return 0 ok, 1 on error 564 */ 565 int 566 main (int argc, 567 char *const *argv) 568 { 569 const struct GNUNET_GETOPT_CommandLineOption options[] = { 570 GNUNET_GETOPT_option_flag ('i', 571 "internal", 572 "perform checks only applicable for exchange-internal audits", 573 &internal_checks), 574 GNUNET_GETOPT_option_flag ('t', 575 "test", 576 "run in test mode and exit when idle", 577 &test_mode), 578 GNUNET_GETOPT_option_timetravel ('T', 579 "timetravel"), 580 GNUNET_GETOPT_OPTION_END 581 }; 582 enum GNUNET_GenericReturnValue ret; 583 584 ret = GNUNET_PROGRAM_run ( 585 TALER_AUDITOR_project_data (), 586 argc, 587 argv, 588 "taler-helper-auditor-transfer", 589 gettext_noop ( 590 "Audit exchange database for consistency of aggregations/transfers with respect to deposit deadlines"), 591 options, 592 &run, 593 NULL); 594 if (GNUNET_SYSERR == ret) 595 return EXIT_INVALIDARGUMENT; 596 if (GNUNET_NO == ret) 597 return EXIT_SUCCESS; 598 return global_ret; 599 } 600 601 602 /* end of taler-helper-auditor-transfer.c */