taler-merchant-wirewatch.c (21003B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2023, 2025 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 /** 17 * @file src/backend/taler-merchant-wirewatch.c 18 * @brief Process that imports information about incoming bank transfers into the merchant backend 19 * @author Christian Grothoff 20 */ 21 #include "platform.h" 22 #include "microhttpd.h" 23 #include <gnunet/gnunet_util_lib.h> 24 #include <jansson.h> 25 #include <pthread.h> 26 #include <taler/taler_dbevents.h> 27 #include "taler/taler_merchant_util.h" 28 #include "taler/taler_merchant_bank_lib.h" 29 #include "merchantdb_lib.h" 30 #include "merchantdb_lib.h" 31 #include "merchant-database/insert_transfer.h" 32 #include "merchant-database/set_instance.h" 33 #include "merchant-database/select_wirewatch_accounts.h" 34 #include "merchant-database/update_wirewatch_progress.h" 35 #include "merchant-database/event_listen.h" 36 37 38 /** 39 * Timeout for the bank interaction. Rather long as we should do long-polling 40 * and do not want to wake up too often. 41 */ 42 #define BANK_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, \ 43 5) 44 45 46 /** 47 * Information about a watch job. 48 */ 49 struct Watch 50 { 51 /** 52 * Kept in a DLL. 53 */ 54 struct Watch *next; 55 56 /** 57 * Kept in a DLL. 58 */ 59 struct Watch *prev; 60 61 /** 62 * Next task to run, if any. 63 */ 64 struct GNUNET_SCHEDULER_Task *task; 65 66 /** 67 * Dynamically adjusted long polling time-out. 68 */ 69 struct GNUNET_TIME_Relative bank_timeout; 70 71 /** 72 * For which instance are we importing bank transfers? 73 */ 74 char *instance_id; 75 76 /** 77 * For which account are we importing bank transfers? 78 */ 79 struct TALER_FullPayto payto_uri; 80 81 /** 82 * Bank history request. 83 */ 84 struct TALER_MERCHANT_BANK_CreditHistoryHandle *hh; 85 86 /** 87 * Start row for the bank interaction. Exclusive. 88 */ 89 uint64_t start_row; 90 91 /** 92 * Artificial delay to use between API calls. Used to 93 * throttle on failures. 94 */ 95 struct GNUNET_TIME_Relative delay; 96 97 /** 98 * When did we start our last HTTP request? 99 */ 100 struct GNUNET_TIME_Absolute start_time; 101 102 /** 103 * How long should long-polling take at least? 104 */ 105 struct GNUNET_TIME_Absolute long_poll_timeout; 106 107 /** 108 * Login data for the bank. 109 */ 110 struct TALER_MERCHANT_BANK_AuthenticationData ad; 111 112 /** 113 * Set to true if we found a transaction in the last iteration. 114 */ 115 bool found; 116 117 }; 118 119 120 /** 121 * Head of active watches. 122 */ 123 static struct Watch *w_head; 124 125 /** 126 * Tail of active watches. 127 */ 128 static struct Watch *w_tail; 129 130 /** 131 * The merchant's configuration. 132 */ 133 static const struct GNUNET_CONFIGURATION_Handle *cfg; 134 135 /** 136 * Our database connection. 137 */ 138 static struct TALER_MERCHANTDB_PostgresContext *pg; 139 140 /** 141 * Handle to the context for interacting with the bank. 142 */ 143 static struct GNUNET_CURL_Context *ctx; 144 145 /** 146 * Scheduler context for running the @e ctx. 147 */ 148 static struct GNUNET_CURL_RescheduleContext *rc; 149 150 /** 151 * Event handler to learn that the configuration changed 152 * and we should shutdown (to be restarted). 153 */ 154 static struct GNUNET_DB_EventHandler *eh; 155 156 /** 157 * Value to return from main(). 0 on success, non-zero on errors. 158 */ 159 static int global_ret; 160 161 /** 162 * How many transactions should we fetch at most per batch? 163 */ 164 static unsigned int batch_size = 32; 165 166 /** 167 * #GNUNET_YES if we are in test mode and should exit when idle. 168 */ 169 static int test_mode; 170 171 /** 172 * #GNUNET_YES if we are in persistent mode and do 173 * not exit on #config_changed. 174 */ 175 static int persist_mode; 176 177 /** 178 * Set to true if we are shutting down due to a 179 * configuration change. 180 */ 181 static bool config_changed_flag; 182 183 /** 184 * Save progress in DB. 185 */ 186 static void 187 save (struct Watch *w) 188 { 189 enum GNUNET_DB_QueryStatus qs; 190 191 qs = TALER_MERCHANTDB_set_instance (pg, 192 w->instance_id); 193 if (qs < 0) 194 { 195 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 196 "Failed to set instance to %s (%d)\n", 197 w->instance_id, 198 qs); 199 GNUNET_SCHEDULER_shutdown (); 200 global_ret = EXIT_FAILURE; 201 return; 202 } 203 204 qs = TALER_MERCHANTDB_update_wirewatch_progress (pg, 205 w->instance_id, 206 w->payto_uri, 207 w->start_row); 208 GNUNET_break (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == 209 TALER_MERCHANTDB_set_instance (pg, 210 NULL)); 211 if (qs < 0) 212 { 213 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 214 "Failed to persist wirewatch progress for %s/%s (%d)\n", 215 w->instance_id, 216 w->payto_uri.full_payto, 217 qs); 218 GNUNET_SCHEDULER_shutdown (); 219 global_ret = EXIT_FAILURE; 220 } 221 } 222 223 224 /** 225 * Free resources of @a w. 226 * 227 * @param w watch job to terminate 228 */ 229 static void 230 end_watch (struct Watch *w) 231 { 232 if (NULL != w->task) 233 { 234 GNUNET_SCHEDULER_cancel (w->task); 235 w->task = NULL; 236 } 237 if (NULL != w->hh) 238 { 239 TALER_MERCHANT_BANK_credit_history_cancel (w->hh); 240 w->hh = NULL; 241 } 242 GNUNET_free (w->instance_id); 243 GNUNET_free (w->payto_uri.full_payto); 244 TALER_MERCHANT_BANK_auth_free (&w->ad); 245 GNUNET_CONTAINER_DLL_remove (w_head, 246 w_tail, 247 w); 248 GNUNET_free (w); 249 } 250 251 252 /** 253 * We're being aborted with CTRL-C (or SIGTERM). Shut down. 254 * 255 * @param cls closure 256 */ 257 static void 258 shutdown_task (void *cls) 259 { 260 (void) cls; 261 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 262 "Running shutdown\n"); 263 while (NULL != w_head) 264 { 265 struct Watch *w = w_head; 266 267 save (w); 268 end_watch (w); 269 } 270 if (NULL != eh) 271 { 272 TALER_MERCHANTDB_event_listen_cancel (eh); 273 eh = NULL; 274 } 275 if (NULL != pg) 276 { 277 TALER_MERCHANTDB_disconnect (pg); 278 pg = NULL; 279 } 280 cfg = NULL; 281 if (NULL != ctx) 282 { 283 GNUNET_CURL_fini (ctx); 284 ctx = NULL; 285 } 286 if (NULL != rc) 287 { 288 GNUNET_CURL_gnunet_rc_destroy (rc); 289 rc = NULL; 290 } 291 } 292 293 294 /** 295 * Parse @a subject from wire transfer into @a wtid and @a exchange_url. 296 * 297 * @param subject wire transfer subject to parse; 298 * format is "$WTID $URL" 299 * @param[out] wtid wire transfer ID to extract 300 * @param[out] exchange_url set to exchange URL 301 * @return #GNUNET_OK on success 302 */ 303 static enum GNUNET_GenericReturnValue 304 parse_subject (const char *subject, 305 struct TALER_WireTransferIdentifierRawP *wtid, 306 char **exchange_url) 307 { 308 const char *space; 309 310 space = strchr (subject, ' '); 311 if (NULL == space) 312 return GNUNET_NO; 313 if (GNUNET_OK != 314 GNUNET_STRINGS_string_to_data (subject, 315 space - subject, 316 wtid, 317 sizeof (*wtid))) 318 return GNUNET_NO; 319 space++; 320 if (! TALER_url_valid_charset (space)) 321 return GNUNET_NO; 322 if ( (0 != strncasecmp ("http://", 323 space, 324 strlen ("http://"))) && 325 (0 != strncasecmp ("https://", 326 space, 327 strlen ("https://"))) ) 328 return GNUNET_NO; 329 *exchange_url = GNUNET_strdup (space); 330 return GNUNET_OK; 331 } 332 333 334 /** 335 * Run next iteration. 336 * 337 * @param cls a `struct Watch *` 338 */ 339 static void 340 do_work (void *cls); 341 342 343 /** 344 * Callbacks of this type are used to serve the result of asking 345 * the bank for the credit transaction history. 346 * 347 * @param cls a `struct Watch *` 348 * @param http_status HTTP response code, #MHD_HTTP_OK (200) for successful status request 349 * 0 if the bank's reply is bogus (fails to follow the protocol), 350 * #MHD_HTTP_NO_CONTENT if there are no more results; on success the 351 * last callback is always of this status (even if `abs(num_results)` were 352 * already returned). 353 * @param ec detailed error code 354 * @param serial_id monotonically increasing counter corresponding to the transaction 355 * @param details details about the wire transfer 356 * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration 357 */ 358 static enum GNUNET_GenericReturnValue 359 credit_cb ( 360 void *cls, 361 unsigned int http_status, 362 enum TALER_ErrorCode ec, 363 uint64_t serial_id, 364 const struct TALER_MERCHANT_BANK_CreditDetails *details) 365 { 366 struct Watch *w = cls; 367 368 switch (http_status) 369 { 370 case 0: 371 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 372 "Invalid HTTP response (HTTP status: 0, %d) from bank\n", 373 ec); 374 w->delay = GNUNET_TIME_STD_BACKOFF (w->delay); 375 break; 376 case MHD_HTTP_OK: 377 { 378 enum GNUNET_DB_QueryStatus qs; 379 char *exchange_url; 380 struct TALER_WireTransferIdentifierRawP wtid; 381 bool no_account; 382 bool conflict; 383 384 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 385 "Received wire transfer `%s' over %s\n", 386 details->wire_subject, 387 TALER_amount2s (&details->amount)); 388 w->found = true; 389 if (GNUNET_OK != 390 parse_subject (details->wire_subject, 391 &wtid, 392 &exchange_url)) 393 { 394 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 395 "Skipping transfer %llu (%s): not from exchange\n", 396 (unsigned long long) serial_id, 397 details->wire_subject); 398 w->start_row = serial_id; 399 return GNUNET_OK; 400 } 401 /* FIXME-Performance-Optimization: consider grouping multiple inserts 402 into one bigger transaction with just one notify. */ 403 qs = TALER_MERCHANTDB_set_instance (pg, 404 w->instance_id); 405 if (qs < 0) 406 { 407 GNUNET_break (0); 408 GNUNET_SCHEDULER_shutdown (); 409 w->hh = NULL; 410 GNUNET_free (exchange_url); 411 return GNUNET_SYSERR; 412 } 413 qs = TALER_MERCHANTDB_insert_transfer (pg, 414 w->instance_id, 415 exchange_url, 416 &wtid, 417 &details->amount, 418 details->credit_account_uri, 419 serial_id, 420 &no_account, 421 &conflict); 422 423 GNUNET_break (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == 424 TALER_MERCHANTDB_set_instance (pg, 425 NULL)); 426 GNUNET_free (exchange_url); 427 if (qs < 0) 428 { 429 GNUNET_break (0); 430 GNUNET_SCHEDULER_shutdown (); 431 w->hh = NULL; 432 return GNUNET_SYSERR; 433 } 434 if (no_account) 435 { 436 GNUNET_break (0); 437 GNUNET_SCHEDULER_shutdown (); 438 w->hh = NULL; 439 return GNUNET_SYSERR; 440 } 441 if (conflict) 442 { 443 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 444 "Fatal: new wire transfer #%llu has same WTID but different amount %s compared to a previous transfer\n", 445 (unsigned long long) serial_id, 446 TALER_amount2s (&details->amount)); 447 GNUNET_SCHEDULER_shutdown (); 448 w->hh = NULL; 449 return GNUNET_SYSERR; 450 } 451 /* Success => reset back-off timer! */ 452 w->delay = GNUNET_TIME_UNIT_ZERO; 453 } 454 w->start_row = serial_id; 455 return GNUNET_OK; 456 case MHD_HTTP_NO_CONTENT: 457 save (w); 458 /* Delay artificially if server returned before long-poll timeout */ 459 if (! w->found) 460 w->delay = GNUNET_TIME_absolute_get_remaining (w->long_poll_timeout); 461 break; 462 case MHD_HTTP_NOT_FOUND: 463 /* configuration likely wrong, wait at least 1 minute, backoff up to 15 minutes! */ 464 w->delay = GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_MINUTES, 465 GNUNET_TIME_STD_BACKOFF (w->delay)); 466 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 467 "Bank claims account is unknown, waiting for %s before trying again\n", 468 GNUNET_TIME_relative2s (w->delay, 469 true)); 470 break; 471 case MHD_HTTP_GATEWAY_TIMEOUT: 472 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 473 "Gateway timeout, adjusting long polling threshold\n"); 474 /* Limit new timeout at request delay */ 475 w->bank_timeout 476 = GNUNET_TIME_relative_min (GNUNET_TIME_absolute_get_duration ( 477 w->start_time), 478 w->bank_timeout); 479 /* set the timeout a bit earlier */ 480 w->bank_timeout 481 = GNUNET_TIME_relative_subtract (w->bank_timeout, 482 GNUNET_TIME_UNIT_SECONDS); 483 /* do not allow it to go to zero */ 484 w->bank_timeout 485 = GNUNET_TIME_relative_max (w->bank_timeout, 486 GNUNET_TIME_UNIT_SECONDS); 487 w->delay = GNUNET_TIME_STD_BACKOFF (w->delay); 488 break; 489 default: 490 /* Something went wrong, try again, but with back-off */ 491 w->delay = GNUNET_TIME_STD_BACKOFF (w->delay); 492 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 493 "Unexpected HTTP status code %u(%d) from wire gateway\n", 494 http_status, 495 ec); 496 break; 497 } 498 w->hh = NULL; 499 if (test_mode && (! w->found)) 500 { 501 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 502 "No transactions found and in test mode. Ending watch!\n"); 503 end_watch (w); 504 if (NULL == w_head) 505 GNUNET_SCHEDULER_shutdown (); 506 return GNUNET_OK; 507 } 508 w->task = GNUNET_SCHEDULER_add_delayed (w->delay, 509 &do_work, 510 w); 511 return GNUNET_OK; 512 } 513 514 515 static void 516 do_work (void *cls) 517 { 518 struct Watch *w = cls; 519 520 w->task = NULL; 521 w->found = false; 522 w->long_poll_timeout 523 = GNUNET_TIME_relative_to_absolute (w->bank_timeout); 524 w->start_time 525 = GNUNET_TIME_absolute_get (); 526 w->hh = TALER_MERCHANT_BANK_credit_history (ctx, 527 &w->ad, 528 w->start_row, 529 batch_size, 530 test_mode 531 ? GNUNET_TIME_UNIT_ZERO 532 : w->bank_timeout, 533 &credit_cb, 534 w); 535 if (NULL == w->hh) 536 { 537 GNUNET_break (0); 538 GNUNET_SCHEDULER_shutdown (); 539 return; 540 } 541 } 542 543 544 /** 545 * Function called with information about a accounts 546 * the wirewatcher should monitor. 547 * 548 * @param cls closure (NULL) 549 * @param instance instance that owns the account 550 * @param payto_uri account URI 551 * @param credit_facade_url URL for the credit facade 552 * @param credit_facade_credentials account access credentials 553 * @param last_serial last transaction serial (inclusive) we have seen from this account 554 */ 555 static void 556 start_watch ( 557 void *cls, 558 const char *instance, 559 struct TALER_FullPayto payto_uri, 560 const char *credit_facade_url, 561 const json_t *credit_facade_credentials, 562 uint64_t last_serial) 563 { 564 struct Watch *w = GNUNET_new (struct Watch); 565 566 (void) cls; 567 w->bank_timeout = BANK_TIMEOUT; 568 if (GNUNET_OK != 569 TALER_MERCHANT_BANK_auth_parse_json (credit_facade_credentials, 570 credit_facade_url, 571 &w->ad)) 572 { 573 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 574 "Failed to parse authentication data of `%s/%s'\n", 575 instance, 576 payto_uri.full_payto); 577 GNUNET_free (w); 578 GNUNET_SCHEDULER_shutdown (); 579 global_ret = EXIT_NOTCONFIGURED; 580 return; 581 } 582 583 GNUNET_CONTAINER_DLL_insert (w_head, 584 w_tail, 585 w); 586 w->instance_id = GNUNET_strdup (instance); 587 w->payto_uri.full_payto = GNUNET_strdup (payto_uri.full_payto); 588 w->start_row = last_serial; 589 w->task = GNUNET_SCHEDULER_add_now (&do_work, 590 w); 591 } 592 593 594 /** 595 * Function called on configuration change events received from Postgres. We 596 * shutdown (and systemd should restart us). 597 * 598 * @param cls closure (NULL) 599 * @param extra additional event data provided 600 * @param extra_size number of bytes in @a extra 601 */ 602 static void 603 config_changed (void *cls, 604 const void *extra, 605 size_t extra_size) 606 { 607 (void) cls; 608 (void) extra; 609 (void) extra_size; 610 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 611 "Configuration changed, %s\n", 612 0 == persist_mode 613 ? "restarting" 614 : "reinitializing"); 615 config_changed_flag = true; 616 GNUNET_SCHEDULER_shutdown (); 617 } 618 619 620 /** 621 * First task. 622 * 623 * @param cls closure, NULL 624 * @param args remaining command-line arguments 625 * @param cfgfile name of the configuration file used (for saving, can be NULL!) 626 * @param c configuration 627 */ 628 static void 629 run (void *cls, 630 char *const *args, 631 const char *cfgfile, 632 const struct GNUNET_CONFIGURATION_Handle *c) 633 { 634 (void) args; 635 (void) cfgfile; 636 637 cfg = c; 638 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, 639 NULL); 640 ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, 641 &rc); 642 rc = GNUNET_CURL_gnunet_rc_create (ctx); 643 if (NULL == ctx) 644 { 645 GNUNET_break (0); 646 GNUNET_SCHEDULER_shutdown (); 647 global_ret = EXIT_FAILURE; 648 return; 649 } 650 if (NULL == 651 (pg = TALER_MERCHANTDB_connect (cfg))) 652 { 653 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 654 "Failed to initialize DB subsystem. Consider running taler-merchant-dbconfig\n"); 655 GNUNET_SCHEDULER_shutdown (); 656 global_ret = EXIT_FAILURE; 657 return; 658 } 659 { 660 struct GNUNET_DB_EventHeaderP es = { 661 .size = htons (sizeof (es)), 662 .type = htons (TALER_DBEVENT_MERCHANT_ACCOUNTS_CHANGED) 663 }; 664 665 eh = TALER_MERCHANTDB_event_listen (pg, 666 &es, 667 GNUNET_TIME_UNIT_FOREVER_REL, 668 &config_changed, 669 NULL); 670 } 671 { 672 enum GNUNET_DB_QueryStatus qs; 673 674 qs = TALER_MERCHANTDB_select_wirewatch_accounts (pg, 675 &start_watch, 676 NULL); 677 if (qs < 0) 678 { 679 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 680 "Failed to obtain wirewatch accounts from database\n"); 681 GNUNET_SCHEDULER_shutdown (); 682 global_ret = EXIT_NO_RESTART; 683 return; 684 } 685 if ( (NULL == w_head) && 686 (GNUNET_YES == test_mode) ) 687 { 688 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 689 "No active wirewatch accounts in database and in test mode. Exiting.\n"); 690 GNUNET_SCHEDULER_shutdown (); 691 global_ret = EXIT_SUCCESS; 692 return; 693 } 694 } 695 } 696 697 698 /** 699 * The main function of taler-merchant-wirewatch 700 * 701 * @param argc number of arguments from the command line 702 * @param argv command line arguments 703 * @return 0 ok, 1 on error 704 */ 705 int 706 main (int argc, 707 char *const *argv) 708 { 709 struct GNUNET_GETOPT_CommandLineOption options[] = { 710 GNUNET_GETOPT_option_flag ('p', 711 "persist", 712 "run in persist mode and do not exit on configuration changes", 713 &persist_mode), 714 GNUNET_GETOPT_option_timetravel ('T', 715 "timetravel"), 716 GNUNET_GETOPT_option_flag ('t', 717 "test", 718 "run in test mode and exit when idle", 719 &test_mode), 720 GNUNET_GETOPT_option_version (VERSION), 721 GNUNET_GETOPT_OPTION_END 722 }; 723 enum GNUNET_GenericReturnValue ret; 724 725 do { 726 config_changed_flag = false; 727 ret = GNUNET_PROGRAM_run ( 728 TALER_MERCHANT_project_data (), 729 argc, argv, 730 "taler-merchant-wirewatch", 731 gettext_noop ( 732 "background process that watches for incoming wire transfers to the merchant bank account"), 733 options, 734 &run, NULL); 735 } while ( (1 == persist_mode) && 736 config_changed_flag); 737 if (GNUNET_SYSERR == ret) 738 return EXIT_INVALIDARGUMENT; 739 if (GNUNET_NO == ret) 740 return EXIT_SUCCESS; 741 return global_ret; 742 } 743 744 745 /* end of taler-exchange-wirewatch.c */