taler-merchant-wirewatch.c (19574B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2023, 2025 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 /** 17 * @file taler-merchant-wirewatch.c 18 * @brief Process that imports information about incoming bank transfers into the merchant backend 19 * @author Christian Grothoff 20 */ 21 #include "platform.h" 22 #include "microhttpd.h" 23 #include <gnunet/gnunet_util_lib.h> 24 #include <jansson.h> 25 #include <pthread.h> 26 #include <taler/taler_dbevents.h> 27 #include "taler_merchant_util.h" 28 #include "taler_merchant_bank_lib.h" 29 #include "taler_merchantdb_lib.h" 30 #include "taler_merchantdb_plugin.h" 31 32 /** 33 * Timeout for the bank interaction. Rather long as we should do long-polling 34 * and do not want to wake up too often. 35 */ 36 #define BANK_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, \ 37 5) 38 39 40 /** 41 * Information about a watch job. 42 */ 43 struct Watch 44 { 45 /** 46 * Kept in a DLL. 47 */ 48 struct Watch *next; 49 50 /** 51 * Kept in a DLL. 52 */ 53 struct Watch *prev; 54 55 /** 56 * Next task to run, if any. 57 */ 58 struct GNUNET_SCHEDULER_Task *task; 59 60 /** 61 * Dynamically adjusted long polling time-out. 62 */ 63 struct GNUNET_TIME_Relative bank_timeout; 64 65 /** 66 * For which instance are we importing bank transfers? 67 */ 68 char *instance_id; 69 70 /** 71 * For which account are we importing bank transfers? 72 */ 73 struct TALER_FullPayto payto_uri; 74 75 /** 76 * Bank history request. 77 */ 78 struct TALER_MERCHANT_BANK_CreditHistoryHandle *hh; 79 80 /** 81 * Start row for the bank interaction. Exclusive. 82 */ 83 uint64_t start_row; 84 85 /** 86 * Artificial delay to use between API calls. Used to 87 * throttle on failures. 88 */ 89 struct GNUNET_TIME_Relative delay; 90 91 /** 92 * When did we start our last HTTP request? 93 */ 94 struct GNUNET_TIME_Absolute start_time; 95 96 /** 97 * How long should long-polling take at least? 98 */ 99 struct GNUNET_TIME_Absolute long_poll_timeout; 100 101 /** 102 * Login data for the bank. 103 */ 104 struct TALER_MERCHANT_BANK_AuthenticationData ad; 105 106 /** 107 * Set to true if we found a transaction in the last iteration. 108 */ 109 bool found; 110 111 }; 112 113 114 /** 115 * Head of active watches. 116 */ 117 static struct Watch *w_head; 118 119 /** 120 * Tail of active watches. 121 */ 122 static struct Watch *w_tail; 123 124 /** 125 * The merchant's configuration. 126 */ 127 static const struct GNUNET_CONFIGURATION_Handle *cfg; 128 129 /** 130 * Our database plugin. 131 */ 132 static struct TALER_MERCHANTDB_Plugin *db_plugin; 133 134 /** 135 * Handle to the context for interacting with the bank. 136 */ 137 static struct GNUNET_CURL_Context *ctx; 138 139 /** 140 * Scheduler context for running the @e ctx. 141 */ 142 static struct GNUNET_CURL_RescheduleContext *rc; 143 144 /** 145 * Event handler to learn that the configuration changed 146 * and we should shutdown (to be restarted). 147 */ 148 static struct GNUNET_DB_EventHandler *eh; 149 150 /** 151 * Value to return from main(). 0 on success, non-zero on errors. 152 */ 153 static int global_ret; 154 155 /** 156 * How many transactions should we fetch at most per batch? 157 */ 158 static unsigned int batch_size = 32; 159 160 /** 161 * #GNUNET_YES if we are in test mode and should exit when idle. 162 */ 163 static int test_mode; 164 165 /** 166 * #GNUNET_YES if we are in persistent mode and do 167 * not exit on #config_changed. 168 */ 169 static int persist_mode; 170 171 /** 172 * Set to true if we are shutting down due to a 173 * configuration change. 174 */ 175 static bool config_changed_flag; 176 177 /** 178 * Save progress in DB. 179 */ 180 static void 181 save (struct Watch *w) 182 { 183 enum GNUNET_DB_QueryStatus qs; 184 185 qs = db_plugin->update_wirewatch_progress (db_plugin->cls, 186 w->instance_id, 187 w->payto_uri, 188 w->start_row); 189 if (qs < 0) 190 { 191 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 192 "Failed to persist wirewatch progress for %s/%s (%d)\n", 193 w->instance_id, 194 w->payto_uri.full_payto, 195 qs); 196 GNUNET_SCHEDULER_shutdown (); 197 global_ret = EXIT_FAILURE; 198 } 199 } 200 201 202 /** 203 * Free resources of @a w. 204 * 205 * @param w watch job to terminate 206 */ 207 static void 208 end_watch (struct Watch *w) 209 { 210 if (NULL != w->task) 211 { 212 GNUNET_SCHEDULER_cancel (w->task); 213 w->task = NULL; 214 } 215 if (NULL != w->hh) 216 { 217 TALER_MERCHANT_BANK_credit_history_cancel (w->hh); 218 w->hh = NULL; 219 } 220 GNUNET_free (w->instance_id); 221 GNUNET_free (w->payto_uri.full_payto); 222 TALER_MERCHANT_BANK_auth_free (&w->ad); 223 GNUNET_CONTAINER_DLL_remove (w_head, 224 w_tail, 225 w); 226 GNUNET_free (w); 227 } 228 229 230 /** 231 * We're being aborted with CTRL-C (or SIGTERM). Shut down. 232 * 233 * @param cls closure 234 */ 235 static void 236 shutdown_task (void *cls) 237 { 238 (void) cls; 239 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 240 "Running shutdown\n"); 241 while (NULL != w_head) 242 { 243 struct Watch *w = w_head; 244 245 save (w); 246 end_watch (w); 247 } 248 if (NULL != eh) 249 { 250 db_plugin->event_listen_cancel (eh); 251 eh = NULL; 252 } 253 TALER_MERCHANTDB_plugin_unload (db_plugin); 254 db_plugin = NULL; 255 cfg = NULL; 256 if (NULL != ctx) 257 { 258 GNUNET_CURL_fini (ctx); 259 ctx = NULL; 260 } 261 if (NULL != rc) 262 { 263 GNUNET_CURL_gnunet_rc_destroy (rc); 264 rc = NULL; 265 } 266 } 267 268 269 /** 270 * Parse @a subject from wire transfer into @a wtid and @a exchange_url. 271 * 272 * @param subject wire transfer subject to parse; 273 * format is "$WTID $URL" 274 * @param[out] wtid wire transfer ID to extract 275 * @param[out] exchange_url set to exchange URL 276 * @return #GNUNET_OK on success 277 */ 278 static enum GNUNET_GenericReturnValue 279 parse_subject (const char *subject, 280 struct TALER_WireTransferIdentifierRawP *wtid, 281 char **exchange_url) 282 { 283 const char *space; 284 285 space = strchr (subject, ' '); 286 if (NULL == space) 287 return GNUNET_NO; 288 if (GNUNET_OK != 289 GNUNET_STRINGS_string_to_data (subject, 290 space - subject, 291 wtid, 292 sizeof (*wtid))) 293 return GNUNET_NO; 294 space++; 295 if (! TALER_url_valid_charset (space)) 296 return GNUNET_NO; 297 if ( (0 != strncasecmp ("http://", 298 space, 299 strlen ("http://"))) && 300 (0 != strncasecmp ("https://", 301 space, 302 strlen ("https://"))) ) 303 return GNUNET_NO; 304 *exchange_url = GNUNET_strdup (space); 305 return GNUNET_OK; 306 } 307 308 309 /** 310 * Run next iteration. 311 * 312 * @param cls a `struct Watch *` 313 */ 314 static void 315 do_work (void *cls); 316 317 318 /** 319 * Callbacks of this type are used to serve the result of asking 320 * the bank for the credit transaction history. 321 * 322 * @param cls a `struct Watch *` 323 * @param http_status HTTP response code, #MHD_HTTP_OK (200) for successful status request 324 * 0 if the bank's reply is bogus (fails to follow the protocol), 325 * #MHD_HTTP_NO_CONTENT if there are no more results; on success the 326 * last callback is always of this status (even if `abs(num_results)` were 327 * already returned). 328 * @param ec detailed error code 329 * @param serial_id monotonically increasing counter corresponding to the transaction 330 * @param details details about the wire transfer 331 * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration 332 */ 333 static enum GNUNET_GenericReturnValue 334 credit_cb ( 335 void *cls, 336 unsigned int http_status, 337 enum TALER_ErrorCode ec, 338 uint64_t serial_id, 339 const struct TALER_MERCHANT_BANK_CreditDetails *details) 340 { 341 struct Watch *w = cls; 342 343 switch (http_status) 344 { 345 case 0: 346 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 347 "Invalid HTTP response (HTTP status: 0, %d) from bank\n", 348 ec); 349 w->delay = GNUNET_TIME_STD_BACKOFF (w->delay); 350 break; 351 case MHD_HTTP_OK: 352 { 353 enum GNUNET_DB_QueryStatus qs; 354 char *exchange_url; 355 struct TALER_WireTransferIdentifierRawP wtid; 356 357 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 358 "Received wire transfer `%s' over %s\n", 359 details->wire_subject, 360 TALER_amount2s (&details->amount)); 361 w->found = true; 362 if (GNUNET_OK != 363 parse_subject (details->wire_subject, 364 &wtid, 365 &exchange_url)) 366 { 367 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 368 "Skipping transfer %llu (%s): not from exchange\n", 369 (unsigned long long) serial_id, 370 details->wire_subject); 371 w->start_row = serial_id; 372 return GNUNET_OK; 373 } 374 /* FIXME-Performance-Optimization: consider grouping multiple inserts 375 into one bigger transaction with just one notify. */ 376 qs = db_plugin->insert_transfer (db_plugin->cls, 377 w->instance_id, 378 exchange_url, 379 &wtid, 380 &details->amount, 381 details->credit_account_uri, 382 serial_id); 383 GNUNET_free (exchange_url); 384 if (qs < 0) 385 { 386 GNUNET_break (0); 387 GNUNET_SCHEDULER_shutdown (); 388 w->hh = NULL; 389 return GNUNET_SYSERR; 390 } 391 /* Success => reset back-off timer! */ 392 w->delay = GNUNET_TIME_UNIT_ZERO; 393 { 394 struct GNUNET_DB_EventHeaderP es = { 395 .size = htons (sizeof (es)), 396 .type = htons (TALER_DBEVENT_MERCHANT_WIRE_TRANSFER_CONFIRMED) 397 }; 398 399 db_plugin->event_notify (db_plugin->cls, 400 &es, 401 NULL, 402 0); 403 } 404 } 405 w->start_row = serial_id; 406 return GNUNET_OK; 407 case MHD_HTTP_NO_CONTENT: 408 save (w); 409 /* Delay artificially if server returned before long-poll timeout */ 410 if (! w->found) 411 w->delay = GNUNET_TIME_absolute_get_remaining (w->long_poll_timeout); 412 break; 413 case MHD_HTTP_NOT_FOUND: 414 /* configuration likely wrong, wait at least 1 minute, backoff up to 15 minutes! */ 415 w->delay = GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_MINUTES, 416 GNUNET_TIME_STD_BACKOFF (w->delay)); 417 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 418 "Bank claims account is unknown, waiting for %s before trying again\n", 419 GNUNET_TIME_relative2s (w->delay, 420 true)); 421 break; 422 case MHD_HTTP_GATEWAY_TIMEOUT: 423 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 424 "Gateway timeout, adjusting long polling threshold\n"); 425 /* Limit new timeout at request delay */ 426 w->bank_timeout 427 = GNUNET_TIME_relative_min (GNUNET_TIME_absolute_get_duration ( 428 w->start_time), 429 w->bank_timeout); 430 /* set the timeout a bit earlier */ 431 w->bank_timeout 432 = GNUNET_TIME_relative_subtract (w->bank_timeout, 433 GNUNET_TIME_UNIT_SECONDS); 434 /* do not allow it to go to zero */ 435 w->bank_timeout 436 = GNUNET_TIME_relative_max (w->bank_timeout, 437 GNUNET_TIME_UNIT_SECONDS); 438 w->delay = GNUNET_TIME_STD_BACKOFF (w->delay); 439 break; 440 default: 441 /* Something went wrong, try again, but with back-off */ 442 w->delay = GNUNET_TIME_STD_BACKOFF (w->delay); 443 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 444 "Unexpected HTTP status code %u(%d) from bank\n", 445 http_status, 446 ec); 447 break; 448 } 449 w->hh = NULL; 450 if (test_mode && (! w->found)) 451 { 452 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 453 "No transactions found and in test mode. Ending watch!\n"); 454 end_watch (w); 455 if (NULL == w_head) 456 GNUNET_SCHEDULER_shutdown (); 457 return GNUNET_OK; 458 } 459 w->task = GNUNET_SCHEDULER_add_delayed (w->delay, 460 &do_work, 461 w); 462 return GNUNET_OK; 463 } 464 465 466 static void 467 do_work (void *cls) 468 { 469 struct Watch *w = cls; 470 471 w->task = NULL; 472 w->found = false; 473 w->long_poll_timeout 474 = GNUNET_TIME_relative_to_absolute (w->bank_timeout); 475 w->start_time 476 = GNUNET_TIME_absolute_get (); 477 w->hh = TALER_MERCHANT_BANK_credit_history (ctx, 478 &w->ad, 479 w->start_row, 480 batch_size, 481 test_mode 482 ? GNUNET_TIME_UNIT_ZERO 483 : w->bank_timeout, 484 &credit_cb, 485 w); 486 if (NULL == w->hh) 487 { 488 GNUNET_break (0); 489 GNUNET_SCHEDULER_shutdown (); 490 return; 491 } 492 } 493 494 495 /** 496 * Function called with information about a accounts 497 * the wirewatcher should monitor. 498 * 499 * @param cls closure (NULL) 500 * @param instance instance that owns the account 501 * @param payto_uri account URI 502 * @param credit_facade_url URL for the credit facade 503 * @param credit_facade_credentials account access credentials 504 * @param last_serial last transaction serial (inclusive) we have seen from this account 505 */ 506 static void 507 start_watch ( 508 void *cls, 509 const char *instance, 510 struct TALER_FullPayto payto_uri, 511 const char *credit_facade_url, 512 const json_t *credit_facade_credentials, 513 uint64_t last_serial) 514 { 515 struct Watch *w = GNUNET_new (struct Watch); 516 517 (void) cls; 518 w->bank_timeout = BANK_TIMEOUT; 519 if (GNUNET_OK != 520 TALER_MERCHANT_BANK_auth_parse_json (credit_facade_credentials, 521 credit_facade_url, 522 &w->ad)) 523 { 524 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 525 "Failed to parse authentication data of `%s/%s'\n", 526 instance, 527 payto_uri.full_payto); 528 GNUNET_free (w); 529 GNUNET_SCHEDULER_shutdown (); 530 global_ret = EXIT_NOTCONFIGURED; 531 return; 532 } 533 534 GNUNET_CONTAINER_DLL_insert (w_head, 535 w_tail, 536 w); 537 w->instance_id = GNUNET_strdup (instance); 538 w->payto_uri.full_payto = GNUNET_strdup (payto_uri.full_payto); 539 w->start_row = last_serial; 540 w->task = GNUNET_SCHEDULER_add_now (&do_work, 541 w); 542 } 543 544 545 /** 546 * Function called on configuration change events received from Postgres. We 547 * shutdown (and systemd should restart us). 548 * 549 * @param cls closure (NULL) 550 * @param extra additional event data provided 551 * @param extra_size number of bytes in @a extra 552 */ 553 static void 554 config_changed (void *cls, 555 const void *extra, 556 size_t extra_size) 557 { 558 (void) cls; 559 (void) extra; 560 (void) extra_size; 561 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 562 "Configuration changed, %s\n", 563 0 == persist_mode 564 ? "restarting" 565 : "reinitializing"); 566 config_changed_flag = true; 567 GNUNET_SCHEDULER_shutdown (); 568 } 569 570 571 /** 572 * First task. 573 * 574 * @param cls closure, NULL 575 * @param args remaining command-line arguments 576 * @param cfgfile name of the configuration file used (for saving, can be NULL!) 577 * @param c configuration 578 */ 579 static void 580 run (void *cls, 581 char *const *args, 582 const char *cfgfile, 583 const struct GNUNET_CONFIGURATION_Handle *c) 584 { 585 (void) args; 586 (void) cfgfile; 587 588 cfg = c; 589 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, 590 NULL); 591 ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, 592 &rc); 593 rc = GNUNET_CURL_gnunet_rc_create (ctx); 594 if (NULL == ctx) 595 { 596 GNUNET_break (0); 597 GNUNET_SCHEDULER_shutdown (); 598 global_ret = EXIT_FAILURE; 599 return; 600 } 601 if (NULL == 602 (db_plugin = TALER_MERCHANTDB_plugin_load (cfg))) 603 { 604 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 605 "Failed to initialize DB subsystem\n"); 606 GNUNET_SCHEDULER_shutdown (); 607 global_ret = EXIT_NOTCONFIGURED; 608 return; 609 } 610 if (GNUNET_OK != 611 db_plugin->connect (db_plugin->cls)) 612 { 613 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 614 "Failed to connect to database. Consider running taler-merchant-dbinit!\n"); 615 GNUNET_SCHEDULER_shutdown (); 616 global_ret = EXIT_FAILURE; 617 return; 618 } 619 { 620 struct GNUNET_DB_EventHeaderP es = { 621 .size = htons (sizeof (es)), 622 .type = htons (TALER_DBEVENT_MERCHANT_ACCOUNTS_CHANGED) 623 }; 624 625 eh = db_plugin->event_listen (db_plugin->cls, 626 &es, 627 GNUNET_TIME_UNIT_FOREVER_REL, 628 &config_changed, 629 NULL); 630 } 631 { 632 enum GNUNET_DB_QueryStatus qs; 633 634 qs = db_plugin->select_wirewatch_accounts (db_plugin->cls, 635 &start_watch, 636 NULL); 637 if (qs < 0) 638 { 639 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 640 "Failed to obtain wirewatch accounts from database\n"); 641 GNUNET_SCHEDULER_shutdown (); 642 global_ret = EXIT_NO_RESTART; 643 return; 644 } 645 if ( (NULL == w_head) && 646 (GNUNET_YES == test_mode) ) 647 { 648 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 649 "No active wirewatch accounts in database and in test mode. Exiting.\n"); 650 GNUNET_SCHEDULER_shutdown (); 651 global_ret = EXIT_SUCCESS; 652 return; 653 } 654 } 655 } 656 657 658 /** 659 * The main function of taler-merchant-wirewatch 660 * 661 * @param argc number of arguments from the command line 662 * @param argv command line arguments 663 * @return 0 ok, 1 on error 664 */ 665 int 666 main (int argc, 667 char *const *argv) 668 { 669 struct GNUNET_GETOPT_CommandLineOption options[] = { 670 GNUNET_GETOPT_option_flag ('p', 671 "persist", 672 "run in persist mode and do not exit on configuration changes", 673 &persist_mode), 674 GNUNET_GETOPT_option_timetravel ('T', 675 "timetravel"), 676 GNUNET_GETOPT_option_flag ('t', 677 "test", 678 "run in test mode and exit when idle", 679 &test_mode), 680 GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION), 681 GNUNET_GETOPT_OPTION_END 682 }; 683 enum GNUNET_GenericReturnValue ret; 684 685 do { 686 config_changed_flag = false; 687 ret = GNUNET_PROGRAM_run ( 688 TALER_MERCHANT_project_data (), 689 argc, argv, 690 "taler-merchant-wirewatch", 691 gettext_noop ( 692 "background process that watches for incoming wire transfers to the merchant bank account"), 693 options, 694 &run, NULL); 695 } while ( (1 == persist_mode) && 696 config_changed_flag); 697 if (GNUNET_SYSERR == ret) 698 return EXIT_INVALIDARGUMENT; 699 if (GNUNET_NO == ret) 700 return EXIT_SUCCESS; 701 return global_ret; 702 } 703 704 705 /* end of taler-exchange-wirewatch.c */