taler-merchant-wirewatch.c (20203B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2023, 2025 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 /** 17 * @file taler-merchant-wirewatch.c 18 * @brief Process that imports information about incoming bank transfers into the merchant backend 19 * @author Christian Grothoff 20 */ 21 #include "taler/platform.h" 22 #include "microhttpd.h" 23 #include <gnunet/gnunet_util_lib.h> 24 #include <jansson.h> 25 #include <pthread.h> 26 #include <taler/taler_dbevents.h> 27 #include "taler/taler_merchant_util.h" 28 #include "taler/taler_merchant_bank_lib.h" 29 #include "taler/taler_merchantdb_lib.h" 30 #include "taler/taler_merchantdb_plugin.h" 31 32 33 /** 34 * Timeout for the bank interaction. Rather long as we should do long-polling 35 * and do not want to wake up too often. 36 */ 37 #define BANK_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, \ 38 5) 39 40 41 /** 42 * Information about a watch job. 43 */ 44 struct Watch 45 { 46 /** 47 * Kept in a DLL. 48 */ 49 struct Watch *next; 50 51 /** 52 * Kept in a DLL. 53 */ 54 struct Watch *prev; 55 56 /** 57 * Next task to run, if any. 58 */ 59 struct GNUNET_SCHEDULER_Task *task; 60 61 /** 62 * Dynamically adjusted long polling time-out. 63 */ 64 struct GNUNET_TIME_Relative bank_timeout; 65 66 /** 67 * For which instance are we importing bank transfers? 68 */ 69 char *instance_id; 70 71 /** 72 * For which account are we importing bank transfers? 73 */ 74 struct TALER_FullPayto payto_uri; 75 76 /** 77 * Bank history request. 78 */ 79 struct TALER_MERCHANT_BANK_CreditHistoryHandle *hh; 80 81 /** 82 * Start row for the bank interaction. Exclusive. 83 */ 84 uint64_t start_row; 85 86 /** 87 * Artificial delay to use between API calls. Used to 88 * throttle on failures. 89 */ 90 struct GNUNET_TIME_Relative delay; 91 92 /** 93 * When did we start our last HTTP request? 94 */ 95 struct GNUNET_TIME_Absolute start_time; 96 97 /** 98 * How long should long-polling take at least? 99 */ 100 struct GNUNET_TIME_Absolute long_poll_timeout; 101 102 /** 103 * Login data for the bank. 104 */ 105 struct TALER_MERCHANT_BANK_AuthenticationData ad; 106 107 /** 108 * Set to true if we found a transaction in the last iteration. 109 */ 110 bool found; 111 112 }; 113 114 115 /** 116 * Head of active watches. 117 */ 118 static struct Watch *w_head; 119 120 /** 121 * Tail of active watches. 122 */ 123 static struct Watch *w_tail; 124 125 /** 126 * The merchant's configuration. 127 */ 128 static const struct GNUNET_CONFIGURATION_Handle *cfg; 129 130 /** 131 * Our database plugin. 132 */ 133 static struct TALER_MERCHANTDB_Plugin *db_plugin; 134 135 /** 136 * Handle to the context for interacting with the bank. 137 */ 138 static struct GNUNET_CURL_Context *ctx; 139 140 /** 141 * Scheduler context for running the @e ctx. 142 */ 143 static struct GNUNET_CURL_RescheduleContext *rc; 144 145 /** 146 * Event handler to learn that the configuration changed 147 * and we should shutdown (to be restarted). 148 */ 149 static struct GNUNET_DB_EventHandler *eh; 150 151 /** 152 * Value to return from main(). 0 on success, non-zero on errors. 153 */ 154 static int global_ret; 155 156 /** 157 * How many transactions should we fetch at most per batch? 158 */ 159 static unsigned int batch_size = 32; 160 161 /** 162 * #GNUNET_YES if we are in test mode and should exit when idle. 163 */ 164 static int test_mode; 165 166 /** 167 * #GNUNET_YES if we are in persistent mode and do 168 * not exit on #config_changed. 169 */ 170 static int persist_mode; 171 172 /** 173 * Set to true if we are shutting down due to a 174 * configuration change. 175 */ 176 static bool config_changed_flag; 177 178 /** 179 * Save progress in DB. 180 */ 181 static void 182 save (struct Watch *w) 183 { 184 enum GNUNET_DB_QueryStatus qs; 185 186 qs = db_plugin->update_wirewatch_progress (db_plugin->cls, 187 w->instance_id, 188 w->payto_uri, 189 w->start_row); 190 if (qs < 0) 191 { 192 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 193 "Failed to persist wirewatch progress for %s/%s (%d)\n", 194 w->instance_id, 195 w->payto_uri.full_payto, 196 qs); 197 GNUNET_SCHEDULER_shutdown (); 198 global_ret = EXIT_FAILURE; 199 } 200 } 201 202 203 /** 204 * Free resources of @a w. 205 * 206 * @param w watch job to terminate 207 */ 208 static void 209 end_watch (struct Watch *w) 210 { 211 if (NULL != w->task) 212 { 213 GNUNET_SCHEDULER_cancel (w->task); 214 w->task = NULL; 215 } 216 if (NULL != w->hh) 217 { 218 TALER_MERCHANT_BANK_credit_history_cancel (w->hh); 219 w->hh = NULL; 220 } 221 GNUNET_free (w->instance_id); 222 GNUNET_free (w->payto_uri.full_payto); 223 TALER_MERCHANT_BANK_auth_free (&w->ad); 224 GNUNET_CONTAINER_DLL_remove (w_head, 225 w_tail, 226 w); 227 GNUNET_free (w); 228 } 229 230 231 /** 232 * We're being aborted with CTRL-C (or SIGTERM). Shut down. 233 * 234 * @param cls closure 235 */ 236 static void 237 shutdown_task (void *cls) 238 { 239 (void) cls; 240 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 241 "Running shutdown\n"); 242 while (NULL != w_head) 243 { 244 struct Watch *w = w_head; 245 246 save (w); 247 end_watch (w); 248 } 249 if (NULL != eh) 250 { 251 db_plugin->event_listen_cancel (eh); 252 eh = NULL; 253 } 254 TALER_MERCHANTDB_plugin_unload (db_plugin); 255 db_plugin = NULL; 256 cfg = NULL; 257 if (NULL != ctx) 258 { 259 GNUNET_CURL_fini (ctx); 260 ctx = NULL; 261 } 262 if (NULL != rc) 263 { 264 GNUNET_CURL_gnunet_rc_destroy (rc); 265 rc = NULL; 266 } 267 } 268 269 270 /** 271 * Parse @a subject from wire transfer into @a wtid and @a exchange_url. 272 * 273 * @param subject wire transfer subject to parse; 274 * format is "$WTID $URL" 275 * @param[out] wtid wire transfer ID to extract 276 * @param[out] exchange_url set to exchange URL 277 * @return #GNUNET_OK on success 278 */ 279 static enum GNUNET_GenericReturnValue 280 parse_subject (const char *subject, 281 struct TALER_WireTransferIdentifierRawP *wtid, 282 char **exchange_url) 283 { 284 const char *space; 285 286 space = strchr (subject, ' '); 287 if (NULL == space) 288 return GNUNET_NO; 289 if (GNUNET_OK != 290 GNUNET_STRINGS_string_to_data (subject, 291 space - subject, 292 wtid, 293 sizeof (*wtid))) 294 return GNUNET_NO; 295 space++; 296 if (! TALER_url_valid_charset (space)) 297 return GNUNET_NO; 298 if ( (0 != strncasecmp ("http://", 299 space, 300 strlen ("http://"))) && 301 (0 != strncasecmp ("https://", 302 space, 303 strlen ("https://"))) ) 304 return GNUNET_NO; 305 *exchange_url = GNUNET_strdup (space); 306 return GNUNET_OK; 307 } 308 309 310 /** 311 * Run next iteration. 312 * 313 * @param cls a `struct Watch *` 314 */ 315 static void 316 do_work (void *cls); 317 318 319 /** 320 * Callbacks of this type are used to serve the result of asking 321 * the bank for the credit transaction history. 322 * 323 * @param cls a `struct Watch *` 324 * @param http_status HTTP response code, #MHD_HTTP_OK (200) for successful status request 325 * 0 if the bank's reply is bogus (fails to follow the protocol), 326 * #MHD_HTTP_NO_CONTENT if there are no more results; on success the 327 * last callback is always of this status (even if `abs(num_results)` were 328 * already returned). 329 * @param ec detailed error code 330 * @param serial_id monotonically increasing counter corresponding to the transaction 331 * @param details details about the wire transfer 332 * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration 333 */ 334 static enum GNUNET_GenericReturnValue 335 credit_cb ( 336 void *cls, 337 unsigned int http_status, 338 enum TALER_ErrorCode ec, 339 uint64_t serial_id, 340 const struct TALER_MERCHANT_BANK_CreditDetails *details) 341 { 342 struct Watch *w = cls; 343 344 switch (http_status) 345 { 346 case 0: 347 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 348 "Invalid HTTP response (HTTP status: 0, %d) from bank\n", 349 ec); 350 w->delay = GNUNET_TIME_STD_BACKOFF (w->delay); 351 break; 352 case MHD_HTTP_OK: 353 { 354 enum GNUNET_DB_QueryStatus qs; 355 char *exchange_url; 356 struct TALER_WireTransferIdentifierRawP wtid; 357 bool no_instance; 358 bool no_account; 359 bool conflict; 360 361 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 362 "Received wire transfer `%s' over %s\n", 363 details->wire_subject, 364 TALER_amount2s (&details->amount)); 365 w->found = true; 366 if (GNUNET_OK != 367 parse_subject (details->wire_subject, 368 &wtid, 369 &exchange_url)) 370 { 371 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 372 "Skipping transfer %llu (%s): not from exchange\n", 373 (unsigned long long) serial_id, 374 details->wire_subject); 375 w->start_row = serial_id; 376 return GNUNET_OK; 377 } 378 /* FIXME-Performance-Optimization: consider grouping multiple inserts 379 into one bigger transaction with just one notify. */ 380 qs = db_plugin->insert_transfer (db_plugin->cls, 381 w->instance_id, 382 exchange_url, 383 &wtid, 384 &details->amount, 385 details->credit_account_uri, 386 serial_id, 387 &no_instance, 388 &no_account, 389 &conflict); 390 391 GNUNET_free (exchange_url); 392 if (qs < 0) 393 { 394 GNUNET_break (0); 395 GNUNET_SCHEDULER_shutdown (); 396 w->hh = NULL; 397 return GNUNET_SYSERR; 398 } 399 if (no_instance) 400 { 401 GNUNET_break (0); 402 GNUNET_SCHEDULER_shutdown (); 403 w->hh = NULL; 404 return GNUNET_SYSERR; 405 } 406 if (no_account) 407 { 408 GNUNET_break (0); 409 GNUNET_SCHEDULER_shutdown (); 410 w->hh = NULL; 411 return GNUNET_SYSERR; 412 } 413 if (conflict) 414 { 415 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 416 "Fatal: new wire transfer #%llu has same WTID but different amount %s compared to a previous transfer\n", 417 (unsigned long long) serial_id, 418 TALER_amount2s (&details->amount)); 419 GNUNET_SCHEDULER_shutdown (); 420 w->hh = NULL; 421 return GNUNET_SYSERR; 422 } 423 /* Success => reset back-off timer! */ 424 w->delay = GNUNET_TIME_UNIT_ZERO; 425 } 426 w->start_row = serial_id; 427 return GNUNET_OK; 428 case MHD_HTTP_NO_CONTENT: 429 save (w); 430 /* Delay artificially if server returned before long-poll timeout */ 431 if (! w->found) 432 w->delay = GNUNET_TIME_absolute_get_remaining (w->long_poll_timeout); 433 break; 434 case MHD_HTTP_NOT_FOUND: 435 /* configuration likely wrong, wait at least 1 minute, backoff up to 15 minutes! */ 436 w->delay = GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_MINUTES, 437 GNUNET_TIME_STD_BACKOFF (w->delay)); 438 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 439 "Bank claims account is unknown, waiting for %s before trying again\n", 440 GNUNET_TIME_relative2s (w->delay, 441 true)); 442 break; 443 case MHD_HTTP_GATEWAY_TIMEOUT: 444 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 445 "Gateway timeout, adjusting long polling threshold\n"); 446 /* Limit new timeout at request delay */ 447 w->bank_timeout 448 = GNUNET_TIME_relative_min (GNUNET_TIME_absolute_get_duration ( 449 w->start_time), 450 w->bank_timeout); 451 /* set the timeout a bit earlier */ 452 w->bank_timeout 453 = GNUNET_TIME_relative_subtract (w->bank_timeout, 454 GNUNET_TIME_UNIT_SECONDS); 455 /* do not allow it to go to zero */ 456 w->bank_timeout 457 = GNUNET_TIME_relative_max (w->bank_timeout, 458 GNUNET_TIME_UNIT_SECONDS); 459 w->delay = GNUNET_TIME_STD_BACKOFF (w->delay); 460 break; 461 default: 462 /* Something went wrong, try again, but with back-off */ 463 w->delay = GNUNET_TIME_STD_BACKOFF (w->delay); 464 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 465 "Unexpected HTTP status code %u(%d) from wire gateway\n", 466 http_status, 467 ec); 468 break; 469 } 470 w->hh = NULL; 471 if (test_mode && (! w->found)) 472 { 473 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 474 "No transactions found and in test mode. Ending watch!\n"); 475 end_watch (w); 476 if (NULL == w_head) 477 GNUNET_SCHEDULER_shutdown (); 478 return GNUNET_OK; 479 } 480 w->task = GNUNET_SCHEDULER_add_delayed (w->delay, 481 &do_work, 482 w); 483 return GNUNET_OK; 484 } 485 486 487 static void 488 do_work (void *cls) 489 { 490 struct Watch *w = cls; 491 492 w->task = NULL; 493 w->found = false; 494 w->long_poll_timeout 495 = GNUNET_TIME_relative_to_absolute (w->bank_timeout); 496 w->start_time 497 = GNUNET_TIME_absolute_get (); 498 w->hh = TALER_MERCHANT_BANK_credit_history (ctx, 499 &w->ad, 500 w->start_row, 501 batch_size, 502 test_mode 503 ? GNUNET_TIME_UNIT_ZERO 504 : w->bank_timeout, 505 &credit_cb, 506 w); 507 if (NULL == w->hh) 508 { 509 GNUNET_break (0); 510 GNUNET_SCHEDULER_shutdown (); 511 return; 512 } 513 } 514 515 516 /** 517 * Function called with information about a accounts 518 * the wirewatcher should monitor. 519 * 520 * @param cls closure (NULL) 521 * @param instance instance that owns the account 522 * @param payto_uri account URI 523 * @param credit_facade_url URL for the credit facade 524 * @param credit_facade_credentials account access credentials 525 * @param last_serial last transaction serial (inclusive) we have seen from this account 526 */ 527 static void 528 start_watch ( 529 void *cls, 530 const char *instance, 531 struct TALER_FullPayto payto_uri, 532 const char *credit_facade_url, 533 const json_t *credit_facade_credentials, 534 uint64_t last_serial) 535 { 536 struct Watch *w = GNUNET_new (struct Watch); 537 538 (void) cls; 539 w->bank_timeout = BANK_TIMEOUT; 540 if (GNUNET_OK != 541 TALER_MERCHANT_BANK_auth_parse_json (credit_facade_credentials, 542 credit_facade_url, 543 &w->ad)) 544 { 545 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 546 "Failed to parse authentication data of `%s/%s'\n", 547 instance, 548 payto_uri.full_payto); 549 GNUNET_free (w); 550 GNUNET_SCHEDULER_shutdown (); 551 global_ret = EXIT_NOTCONFIGURED; 552 return; 553 } 554 555 GNUNET_CONTAINER_DLL_insert (w_head, 556 w_tail, 557 w); 558 w->instance_id = GNUNET_strdup (instance); 559 w->payto_uri.full_payto = GNUNET_strdup (payto_uri.full_payto); 560 w->start_row = last_serial; 561 w->task = GNUNET_SCHEDULER_add_now (&do_work, 562 w); 563 } 564 565 566 /** 567 * Function called on configuration change events received from Postgres. We 568 * shutdown (and systemd should restart us). 569 * 570 * @param cls closure (NULL) 571 * @param extra additional event data provided 572 * @param extra_size number of bytes in @a extra 573 */ 574 static void 575 config_changed (void *cls, 576 const void *extra, 577 size_t extra_size) 578 { 579 (void) cls; 580 (void) extra; 581 (void) extra_size; 582 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 583 "Configuration changed, %s\n", 584 0 == persist_mode 585 ? "restarting" 586 : "reinitializing"); 587 config_changed_flag = true; 588 GNUNET_SCHEDULER_shutdown (); 589 } 590 591 592 /** 593 * First task. 594 * 595 * @param cls closure, NULL 596 * @param args remaining command-line arguments 597 * @param cfgfile name of the configuration file used (for saving, can be NULL!) 598 * @param c configuration 599 */ 600 static void 601 run (void *cls, 602 char *const *args, 603 const char *cfgfile, 604 const struct GNUNET_CONFIGURATION_Handle *c) 605 { 606 (void) args; 607 (void) cfgfile; 608 609 cfg = c; 610 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, 611 NULL); 612 ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, 613 &rc); 614 rc = GNUNET_CURL_gnunet_rc_create (ctx); 615 if (NULL == ctx) 616 { 617 GNUNET_break (0); 618 GNUNET_SCHEDULER_shutdown (); 619 global_ret = EXIT_FAILURE; 620 return; 621 } 622 if (NULL == 623 (db_plugin = TALER_MERCHANTDB_plugin_load (cfg))) 624 { 625 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 626 "Failed to initialize DB subsystem\n"); 627 GNUNET_SCHEDULER_shutdown (); 628 global_ret = EXIT_NOTCONFIGURED; 629 return; 630 } 631 if (GNUNET_OK != 632 db_plugin->connect (db_plugin->cls)) 633 { 634 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 635 "Failed to connect to database. Consider running taler-merchant-dbinit!\n"); 636 GNUNET_SCHEDULER_shutdown (); 637 global_ret = EXIT_FAILURE; 638 return; 639 } 640 { 641 struct GNUNET_DB_EventHeaderP es = { 642 .size = htons (sizeof (es)), 643 .type = htons (TALER_DBEVENT_MERCHANT_ACCOUNTS_CHANGED) 644 }; 645 646 eh = db_plugin->event_listen (db_plugin->cls, 647 &es, 648 GNUNET_TIME_UNIT_FOREVER_REL, 649 &config_changed, 650 NULL); 651 } 652 { 653 enum GNUNET_DB_QueryStatus qs; 654 655 qs = db_plugin->select_wirewatch_accounts (db_plugin->cls, 656 &start_watch, 657 NULL); 658 if (qs < 0) 659 { 660 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 661 "Failed to obtain wirewatch accounts from database\n"); 662 GNUNET_SCHEDULER_shutdown (); 663 global_ret = EXIT_NO_RESTART; 664 return; 665 } 666 if ( (NULL == w_head) && 667 (GNUNET_YES == test_mode) ) 668 { 669 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 670 "No active wirewatch accounts in database and in test mode. Exiting.\n"); 671 GNUNET_SCHEDULER_shutdown (); 672 global_ret = EXIT_SUCCESS; 673 return; 674 } 675 } 676 } 677 678 679 /** 680 * The main function of taler-merchant-wirewatch 681 * 682 * @param argc number of arguments from the command line 683 * @param argv command line arguments 684 * @return 0 ok, 1 on error 685 */ 686 int 687 main (int argc, 688 char *const *argv) 689 { 690 struct GNUNET_GETOPT_CommandLineOption options[] = { 691 GNUNET_GETOPT_option_flag ('p', 692 "persist", 693 "run in persist mode and do not exit on configuration changes", 694 &persist_mode), 695 GNUNET_GETOPT_option_timetravel ('T', 696 "timetravel"), 697 GNUNET_GETOPT_option_flag ('t', 698 "test", 699 "run in test mode and exit when idle", 700 &test_mode), 701 GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION), 702 GNUNET_GETOPT_OPTION_END 703 }; 704 enum GNUNET_GenericReturnValue ret; 705 706 do { 707 config_changed_flag = false; 708 ret = GNUNET_PROGRAM_run ( 709 TALER_MERCHANT_project_data (), 710 argc, argv, 711 "taler-merchant-wirewatch", 712 gettext_noop ( 713 "background process that watches for incoming wire transfers to the merchant bank account"), 714 options, 715 &run, NULL); 716 } while ( (1 == persist_mode) && 717 config_changed_flag); 718 if (GNUNET_SYSERR == ret) 719 return EXIT_INVALIDARGUMENT; 720 if (GNUNET_NO == ret) 721 return EXIT_SUCCESS; 722 return global_ret; 723 } 724 725 726 /* end of taler-exchange-wirewatch.c */