taler-merchant-reconciliation.c (37602B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2023-2025 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 /** 17 * @file src/backend/taler-merchant-reconciliation.c 18 * @brief Process that reconciles information about incoming bank transfers with orders by asking the exchange 19 * @author Christian Grothoff 20 */ 21 #include "platform.h" 22 struct Inquiry; 23 #define TALER_EXCHANGE_GET_TRANSFERS_RESULT_CLOSURE struct Inquiry 24 #include "microhttpd.h" 25 #include <gnunet/gnunet_util_lib.h> 26 #include <jansson.h> 27 #include <pthread.h> 28 #include <taler/taler_dbevents.h> 29 #include "taler/taler_merchant_util.h" 30 #include "taler/taler_merchant_bank_lib.h" 31 #include "merchantdb_lib.h" 32 #include "merchantdb_lib.h" 33 #include "merchant-database/finalize_transfer_status.h" 34 #include "merchant-database/insert_transfer_details.h" 35 #include "merchant-database/lookup_deposits_by_contract_and_coin.h" 36 #include "merchant-database/lookup_wire_fee.h" 37 #include "merchant-database/select_exchange_keys.h" 38 #include "merchant-database/select_open_transfers.h" 39 #include "merchant-database/set_instance.h" 40 #include "merchant-database/update_transfer_status.h" 41 #include "merchant-database/event_listen.h" 42 #include "merchant-database/preflight.h" 43 44 /** 45 * Timeout for the exchange interaction. Rather long as we should do 46 * long-polling and do not want to wake up too often. 47 */ 48 #define EXCHANGE_TIMEOUT GNUNET_TIME_relative_multiply ( \ 49 GNUNET_TIME_UNIT_MINUTES, \ 50 30) 51 52 /** 53 * How many inquiries do we process concurrently at most. 54 */ 55 #define OPEN_INQUIRY_LIMIT 1024 56 57 /** 58 * How many inquiries do we process concurrently per exchange at most. 59 */ 60 #define EXCHANGE_INQUIRY_LIMIT 16 61 62 63 /** 64 * Information about an inquiry job. 65 */ 66 struct Inquiry; 67 68 69 /** 70 * Information about an exchange. 71 */ 72 struct Exchange 73 { 74 /** 75 * Kept in a DLL. 76 */ 77 struct Exchange *next; 78 79 /** 80 * Kept in a DLL. 81 */ 82 struct Exchange *prev; 83 84 /** 85 * Head of active inquiries. 86 */ 87 struct Inquiry *w_head; 88 89 /** 90 * Tail of active inquiries. 91 */ 92 struct Inquiry *w_tail; 93 94 /** 95 * Which exchange are we tracking here. 96 */ 97 char *exchange_url; 98 99 /** 100 * The keys of this exchange 101 */ 102 struct TALER_EXCHANGE_Keys *keys; 103 104 /** 105 * How many active inquiries do we have right now with this exchange. 106 */ 107 unsigned int exchange_inquiries; 108 109 /** 110 * How long should we wait between requests 111 * for transfer details? 112 */ 113 struct GNUNET_TIME_Relative transfer_delay; 114 115 }; 116 117 118 /** 119 * Information about an inquiry job. 120 */ 121 struct Inquiry 122 { 123 /** 124 * Kept in a DLL. 125 */ 126 struct Inquiry *next; 127 128 /** 129 * Kept in a DLL. 130 */ 131 struct Inquiry *prev; 132 133 /** 134 * Handle to the exchange that made the transfer. 135 */ 136 struct Exchange *exchange; 137 138 /** 139 * Task where we retry fetching transfer details from the exchange. 140 */ 141 struct GNUNET_SCHEDULER_Task *task; 142 143 /** 144 * For which merchant instance is this tracking request? 145 */ 146 char *instance_id; 147 148 /** 149 * payto:// URI used for the transfer. 150 */ 151 struct TALER_FullPayto payto_uri; 152 153 /** 154 * Handle for the GET /transfers request. 155 */ 156 struct TALER_EXCHANGE_GetTransfersHandle *wdh; 157 158 /** 159 * When did the transfer happen? 160 */ 161 struct GNUNET_TIME_Timestamp execution_time; 162 163 /** 164 * Argument for the /wire/transfers request. 165 */ 166 struct TALER_WireTransferIdentifierRawP wtid; 167 168 /** 169 * Row of the wire transfer in our database. 170 */ 171 uint64_t rowid; 172 173 }; 174 175 176 /** 177 * Head of known exchanges. 178 */ 179 static struct Exchange *e_head; 180 181 /** 182 * Tail of known exchanges. 183 */ 184 static struct Exchange *e_tail; 185 186 /** 187 * The merchant's configuration. 188 */ 189 static const struct GNUNET_CONFIGURATION_Handle *cfg; 190 191 /** 192 * Our database connection. 193 */ 194 static struct TALER_MERCHANTDB_PostgresContext *pg; 195 196 /** 197 * Handle to the context for interacting with the bank. 198 */ 199 static struct GNUNET_CURL_Context *ctx; 200 201 /** 202 * Scheduler context for running the @e ctx. 203 */ 204 static struct GNUNET_CURL_RescheduleContext *rc; 205 206 /** 207 * Main task for #find_work(). 208 */ 209 static struct GNUNET_SCHEDULER_Task *task; 210 211 /** 212 * Event handler to learn that there are new transfers 213 * to check. 214 */ 215 static struct GNUNET_DB_EventHandler *eh; 216 217 /** 218 * Event handler to learn that there may be new exchange 219 * keys to check. 220 */ 221 static struct GNUNET_DB_EventHandler *eh_keys; 222 223 /** 224 * How many active inquiries do we have right now. 225 */ 226 static unsigned int active_inquiries; 227 228 /** 229 * Set to true if we ever encountered any problem. 230 */ 231 static bool found_problem; 232 233 /** 234 * Value to return from main(). 0 on success, non-zero on errors. 235 */ 236 static int global_ret; 237 238 /** 239 * #GNUNET_YES if we are in test mode and should exit when idle. 240 */ 241 static int test_mode; 242 243 /** 244 * True if the last DB query was limited by the 245 * #OPEN_INQUIRY_LIMIT and we thus should check again 246 * as soon as we are substantially below that limit, 247 * and not only when we get a DB notification. 248 */ 249 static bool at_limit; 250 251 252 /** 253 * Initiate download from exchange. 254 * 255 * @param cls a `struct Inquiry *` 256 */ 257 static void 258 exchange_request (void *cls); 259 260 261 /** 262 * The exchange @a e is ready to handle more inquiries, 263 * prepare to launch them. 264 * 265 * @param[in,out] e exchange to potentially launch inquiries on 266 */ 267 static void 268 launch_inquiries_at_exchange (struct Exchange *e) 269 { 270 for (struct Inquiry *w = e->w_head; 271 NULL != w; 272 w = w->next) 273 { 274 if (e->exchange_inquiries >= EXCHANGE_INQUIRY_LIMIT) 275 break; 276 if ( (NULL == w->task) && 277 (NULL == w->wdh) ) 278 { 279 e->exchange_inquiries++; 280 w->task = GNUNET_SCHEDULER_add_now (&exchange_request, 281 w); 282 } 283 } 284 } 285 286 287 /** 288 * Updates the transaction status for inquiry @a w to the given values. 289 * 290 * @param w inquiry to update status for 291 * @param next_attempt when should we retry @a w (if ever) 292 * @param http_status HTTP status of the response 293 * @param ec error code to use (if any) 294 * @param last_hint hint delivered with the response (if any, possibly NULL) 295 * @param needs_retry true if we should try the HTTP request again 296 */ 297 static void 298 update_transaction_status (const struct Inquiry *w, 299 struct GNUNET_TIME_Absolute next_attempt, 300 unsigned int http_status, 301 enum TALER_ErrorCode ec, 302 const char *last_hint, 303 bool needs_retry) 304 { 305 enum GNUNET_DB_QueryStatus qs; 306 307 qs = TALER_MERCHANTDB_set_instance (pg, 308 w->instance_id); 309 if (qs <= 0) 310 { 311 GNUNET_break (0); 312 global_ret = EXIT_FAILURE; 313 GNUNET_SCHEDULER_shutdown (); 314 return; 315 } 316 qs = TALER_MERCHANTDB_update_transfer_status (pg, 317 w->exchange->exchange_url, 318 &w->wtid, 319 next_attempt, 320 http_status, 321 ec, 322 last_hint, 323 needs_retry); 324 GNUNET_break (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == 325 TALER_MERCHANTDB_set_instance ( 326 pg, 327 NULL)); 328 if (qs < 0) 329 { 330 GNUNET_break (0); 331 global_ret = EXIT_FAILURE; 332 GNUNET_SCHEDULER_shutdown (); 333 return; 334 } 335 } 336 337 338 /** 339 * Interact with the database to get the current set 340 * of exchange keys known to us. 341 * 342 * @param e the exchange to check 343 */ 344 static void 345 sync_keys (struct Exchange *e) 346 { 347 enum GNUNET_DB_QueryStatus qs; 348 struct TALER_EXCHANGE_Keys *keys; 349 struct GNUNET_TIME_Absolute first_retry; 350 351 qs = TALER_MERCHANTDB_select_exchange_keys (pg, 352 e->exchange_url, 353 &first_retry, 354 &keys); 355 if (qs < 0) 356 { 357 GNUNET_break (0); 358 return; 359 } 360 if ( (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) || 361 (NULL == keys) ) 362 { 363 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 364 "Cannot launch inquiries at `%s': lacking /keys response\n", 365 e->exchange_url); 366 return; 367 } 368 TALER_EXCHANGE_keys_decref (e->keys); 369 e->keys = keys; 370 launch_inquiries_at_exchange (e); 371 } 372 373 374 /** 375 * Lookup our internal data structure for the given 376 * @a exchange_url or create one if we do not yet have 377 * one. 378 * 379 * @param exchange_url base URL of the exchange 380 * @return our state for this exchange 381 */ 382 static struct Exchange * 383 find_exchange (const char *exchange_url) 384 { 385 struct Exchange *e; 386 387 for (e = e_head; NULL != e; e = e->next) 388 if (0 == strcmp (exchange_url, 389 e->exchange_url)) 390 return e; 391 e = GNUNET_new (struct Exchange); 392 e->exchange_url = GNUNET_strdup (exchange_url); 393 GNUNET_CONTAINER_DLL_insert (e_head, 394 e_tail, 395 e); 396 sync_keys (e); 397 return e; 398 } 399 400 401 /** 402 * Finds new transfers that require work in the merchant database. 403 * 404 * @param cls NULL 405 */ 406 static void 407 find_work (void *cls); 408 409 410 /** 411 * Free resources of @a w. 412 * 413 * @param[in] w inquiry job to terminate 414 */ 415 static void 416 end_inquiry (struct Inquiry *w) 417 { 418 struct Exchange *e = w->exchange; 419 420 GNUNET_assert (active_inquiries > 0); 421 active_inquiries--; 422 if (NULL != w->wdh) 423 { 424 TALER_EXCHANGE_get_transfers_cancel (w->wdh); 425 w->wdh = NULL; 426 } 427 GNUNET_free (w->instance_id); 428 GNUNET_free (w->payto_uri.full_payto); 429 GNUNET_CONTAINER_DLL_remove (e->w_head, 430 e->w_tail, 431 w); 432 GNUNET_free (w); 433 if ( (active_inquiries < OPEN_INQUIRY_LIMIT / 2) && 434 (NULL == task) && 435 (at_limit) ) 436 { 437 at_limit = false; 438 GNUNET_assert (NULL == task); 439 task = GNUNET_SCHEDULER_add_now (&find_work, 440 NULL); 441 } 442 if ( (NULL == task) && 443 (! at_limit) && 444 (0 == active_inquiries) && 445 (test_mode) ) 446 { 447 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 448 "No more open inquiries and in test mode. Exiting.\n"); 449 GNUNET_SCHEDULER_shutdown (); 450 return; 451 } 452 } 453 454 455 /** 456 * We're being aborted with CTRL-C (or SIGTERM). Shut down. 457 * 458 * @param cls closure (NULL) 459 */ 460 static void 461 shutdown_task (void *cls) 462 { 463 (void) cls; 464 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 465 "Running shutdown\n"); 466 while (NULL != e_head) 467 { 468 struct Exchange *e = e_head; 469 470 while (NULL != e->w_head) 471 { 472 struct Inquiry *w = e->w_head; 473 474 end_inquiry (w); 475 } 476 GNUNET_free (e->exchange_url); 477 if (NULL != e->keys) 478 { 479 TALER_EXCHANGE_keys_decref (e->keys); 480 e->keys = NULL; 481 } 482 GNUNET_CONTAINER_DLL_remove (e_head, 483 e_tail, 484 e); 485 GNUNET_free (e); 486 } 487 if (NULL != eh) 488 { 489 TALER_MERCHANTDB_event_listen_cancel (eh); 490 eh = NULL; 491 } 492 if (NULL != eh_keys) 493 { 494 TALER_MERCHANTDB_event_listen_cancel (eh_keys); 495 eh_keys = NULL; 496 } 497 if (NULL != task) 498 { 499 GNUNET_SCHEDULER_cancel (task); 500 task = NULL; 501 } 502 if (NULL != pg) 503 { 504 TALER_MERCHANTDB_disconnect (pg); 505 pg = NULL; 506 } 507 cfg = NULL; 508 if (NULL != ctx) 509 { 510 GNUNET_CURL_fini (ctx); 511 ctx = NULL; 512 } 513 if (NULL != rc) 514 { 515 GNUNET_CURL_gnunet_rc_destroy (rc); 516 rc = NULL; 517 } 518 } 519 520 521 /** 522 * Check that the given @a wire_fee is what the @a e should charge 523 * at the @a execution_time. If the fee is correct (according to our 524 * database), return #GNUNET_OK. If we do not have the fee structure in our 525 * DB, we just accept it and return #GNUNET_NO; if we have proof that the fee 526 * is bogus, we respond with the proof to the client and return 527 * #GNUNET_SYSERR. 528 * 529 * @param w inquiry to check fees of 530 * @param execution_time time of the wire transfer 531 * @param wire_fee fee claimed by the exchange 532 * @return #GNUNET_SYSERR if we returned hard proof of 533 * missbehavior from the exchange to the client 534 */ 535 static enum GNUNET_GenericReturnValue 536 check_wire_fee (struct Inquiry *w, 537 struct GNUNET_TIME_Timestamp execution_time, 538 const struct TALER_Amount *wire_fee) 539 { 540 struct Exchange *e = w->exchange; 541 const struct TALER_EXCHANGE_Keys *keys = e->keys; 542 struct TALER_WireFeeSet fees; 543 struct TALER_MasterSignatureP master_sig; 544 struct GNUNET_TIME_Timestamp start_date; 545 struct GNUNET_TIME_Timestamp end_date; 546 enum GNUNET_DB_QueryStatus qs; 547 char *wire_method; 548 549 if (NULL == keys) 550 { 551 GNUNET_break (0); 552 return GNUNET_NO; 553 } 554 wire_method = TALER_payto_get_method (w->payto_uri.full_payto); 555 qs = TALER_MERCHANTDB_lookup_wire_fee (pg, 556 &keys->master_pub, 557 wire_method, 558 execution_time, 559 &fees, 560 &start_date, 561 &end_date, 562 &master_sig); 563 switch (qs) 564 { 565 case GNUNET_DB_STATUS_HARD_ERROR: 566 GNUNET_break (0); 567 GNUNET_free (wire_method); 568 return GNUNET_SYSERR; 569 case GNUNET_DB_STATUS_SOFT_ERROR: 570 GNUNET_free (wire_method); 571 return GNUNET_NO; 572 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 573 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 574 "Failed to find wire fee for `%s' and method `%s' at %s in DB, accepting blindly that the fee is %s\n", 575 TALER_B2S (&keys->master_pub), 576 wire_method, 577 GNUNET_TIME_timestamp2s (execution_time), 578 TALER_amount2s (wire_fee)); 579 GNUNET_free (wire_method); 580 return GNUNET_OK; 581 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 582 break; 583 } 584 if ( (GNUNET_OK != 585 TALER_amount_cmp_currency (&fees.wire, 586 wire_fee)) || 587 (0 > TALER_amount_cmp (&fees.wire, 588 wire_fee)) ) 589 { 590 GNUNET_break_op (0); 591 GNUNET_free (wire_method); 592 return GNUNET_SYSERR; /* expected_fee >= wire_fee */ 593 } 594 GNUNET_free (wire_method); 595 return GNUNET_OK; 596 } 597 598 599 /** 600 * Closure for #check_transfer() 601 */ 602 struct CheckTransferContext 603 { 604 605 /** 606 * Pointer to the detail that we are currently 607 * checking in #check_transfer(). 608 */ 609 const struct TALER_TrackTransferDetails *current_detail; 610 611 /** 612 * Which transaction detail are we currently looking at? 613 */ 614 unsigned int current_offset; 615 616 /** 617 * #GNUNET_NO if we did not find a matching coin. 618 * #GNUNET_SYSERR if we found a matching coin, but the amounts do not match. 619 * #GNUNET_OK if we did find a matching coin. 620 */ 621 enum GNUNET_GenericReturnValue check_transfer_result; 622 623 /** 624 * Set to error code, if any. 625 */ 626 enum TALER_ErrorCode ec; 627 628 /** 629 * Set to true if @e ec indicates a permanent failure. 630 */ 631 bool failure; 632 }; 633 634 635 /** 636 * This function checks that the information about the coin which 637 * was paid back by _this_ wire transfer matches what _we_ (the merchant) 638 * knew about this coin. 639 * 640 * @param cls closure with our `struct CheckTransferContext *` 641 * @param exchange_url URL of the exchange that issued @a coin_pub 642 * @param amount_with_fee amount the exchange will transfer for this coin 643 * @param deposit_fee fee the exchange will charge for this coin 644 * @param refund_fee fee the exchange will charge for refunding this coin 645 * @param wire_fee paid wire fee 646 * @param h_wire hash of merchant's wire details 647 * @param deposit_timestamp when did the exchange receive the deposit 648 * @param refund_deadline until when are refunds allowed 649 * @param exchange_sig signature by the exchange 650 * @param exchange_pub exchange signing key used for @a exchange_sig 651 */ 652 static void 653 check_transfer (void *cls, 654 const char *exchange_url, 655 const struct TALER_Amount *amount_with_fee, 656 const struct TALER_Amount *deposit_fee, 657 const struct TALER_Amount *refund_fee, 658 const struct TALER_Amount *wire_fee, 659 const struct TALER_MerchantWireHashP *h_wire, 660 struct GNUNET_TIME_Timestamp deposit_timestamp, 661 struct GNUNET_TIME_Timestamp refund_deadline, 662 const struct TALER_ExchangeSignatureP *exchange_sig, 663 const struct TALER_ExchangePublicKeyP *exchange_pub) 664 { 665 struct CheckTransferContext *ctc = cls; 666 const struct TALER_TrackTransferDetails *ttd = ctc->current_detail; 667 668 if (GNUNET_SYSERR == ctc->check_transfer_result) 669 { 670 GNUNET_break (0); 671 return; /* already had a serious issue; odd that we're called more than once as well... */ 672 } 673 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 674 "Checking coin with value %s\n", 675 TALER_amount2s (amount_with_fee)); 676 if ( (GNUNET_OK != 677 TALER_amount_cmp_currency (amount_with_fee, 678 &ttd->coin_value)) || 679 (0 != TALER_amount_cmp (amount_with_fee, 680 &ttd->coin_value)) ) 681 { 682 /* Disagreement between the exchange and us about how much this 683 coin is worth! */ 684 GNUNET_break_op (0); 685 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 686 "Disagreement about coin value %s\n", 687 TALER_amount2s (amount_with_fee)); 688 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 689 "Exchange gave it a value of %s\n", 690 TALER_amount2s (&ttd->coin_value)); 691 ctc->check_transfer_result = GNUNET_SYSERR; 692 /* Build the `TrackTransferConflictDetails` */ 693 ctc->ec = TALER_EC_MERCHANT_PRIVATE_POST_TRANSFERS_CONFLICTING_REPORTS; 694 ctc->failure = true; 695 /* FIXME-#9426: this should be reported to the auditor (once the auditor has an API for this) */ 696 return; 697 } 698 if ( (GNUNET_OK != 699 TALER_amount_cmp_currency (deposit_fee, 700 &ttd->coin_fee)) || 701 (0 != TALER_amount_cmp (deposit_fee, 702 &ttd->coin_fee)) ) 703 { 704 /* Disagreement between the exchange and us about how much this 705 coin is worth! */ 706 GNUNET_break_op (0); 707 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 708 "Expected fee is %s\n", 709 TALER_amount2s (&ttd->coin_fee)); 710 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 711 "Fee claimed by exchange is %s\n", 712 TALER_amount2s (deposit_fee)); 713 ctc->check_transfer_result = GNUNET_SYSERR; 714 /* Build the `TrackTransferConflictDetails` */ 715 ctc->ec = TALER_EC_MERCHANT_PRIVATE_POST_TRANSFERS_CONFLICTING_REPORTS; 716 ctc->failure = true; 717 /* FIXME-#9426: this should be reported to the auditor (once the auditor has an API for this) */ 718 return; 719 } 720 ctc->check_transfer_result = GNUNET_OK; 721 } 722 723 724 /** 725 * Function called with detailed wire transfer data, including all 726 * of the coin transactions that were combined into the wire transfer. 727 * 728 * @param cls closure a `struct Inquiry *` 729 * @param tgr response details 730 */ 731 static void 732 wire_transfer_cb (struct Inquiry *w, 733 const struct TALER_EXCHANGE_GetTransfersResponse *tgr) 734 { 735 struct Exchange *e = w->exchange; 736 const struct TALER_EXCHANGE_TransferData *td = NULL; 737 738 e->exchange_inquiries--; 739 w->wdh = NULL; 740 if (EXCHANGE_INQUIRY_LIMIT - 1 == e->exchange_inquiries) 741 launch_inquiries_at_exchange (e); 742 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 743 "Got response code %u from exchange for GET /transfers/$WTID\n", 744 tgr->hr.http_status); 745 switch (tgr->hr.http_status) 746 { 747 case MHD_HTTP_OK: 748 td = &tgr->details.ok.td; 749 w->execution_time = td->execution_time; 750 e->transfer_delay = GNUNET_TIME_UNIT_ZERO; 751 break; 752 case MHD_HTTP_BAD_REQUEST: 753 case MHD_HTTP_FORBIDDEN: 754 case MHD_HTTP_NOT_FOUND: 755 found_problem = true; 756 update_transaction_status (w, 757 GNUNET_TIME_UNIT_FOREVER_ABS, 758 tgr->hr.http_status, 759 tgr->hr.ec, 760 tgr->hr.hint, 761 false); 762 end_inquiry (w); 763 return; 764 case MHD_HTTP_INTERNAL_SERVER_ERROR: 765 case MHD_HTTP_BAD_GATEWAY: 766 case MHD_HTTP_GATEWAY_TIMEOUT: 767 e->transfer_delay = GNUNET_TIME_STD_BACKOFF (e->transfer_delay); 768 update_transaction_status (w, 769 GNUNET_TIME_relative_to_absolute ( 770 e->transfer_delay), 771 tgr->hr.http_status, 772 tgr->hr.ec, 773 tgr->hr.hint, 774 true); 775 end_inquiry (w); 776 return; 777 default: 778 found_problem = true; 779 e->transfer_delay = GNUNET_TIME_STD_BACKOFF (e->transfer_delay); 780 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 781 "Unexpected HTTP status %u\n", 782 tgr->hr.http_status); 783 update_transaction_status (w, 784 GNUNET_TIME_relative_to_absolute ( 785 e->transfer_delay), 786 tgr->hr.http_status, 787 tgr->hr.ec, 788 tgr->hr.hint, 789 true); 790 end_inquiry (w); 791 return; 792 } 793 TALER_MERCHANTDB_preflight (pg); 794 795 { 796 enum GNUNET_DB_QueryStatus qs; 797 798 qs = TALER_MERCHANTDB_set_instance (pg, 799 w->instance_id); 800 if (0 > qs) 801 { 802 /* Always report on DB error as well to enable diagnostics */ 803 GNUNET_break (GNUNET_DB_STATUS_HARD_ERROR == qs); 804 global_ret = EXIT_FAILURE; 805 GNUNET_SCHEDULER_shutdown (); 806 return; 807 } 808 qs = TALER_MERCHANTDB_insert_transfer_details (pg, 809 w->instance_id, 810 w->exchange->exchange_url, 811 w->payto_uri, 812 &w->wtid, 813 td); 814 if (0 > qs) 815 { 816 /* Always report on DB error as well to enable diagnostics */ 817 GNUNET_break (GNUNET_DB_STATUS_HARD_ERROR == qs); 818 global_ret = EXIT_FAILURE; 819 GNUNET_SCHEDULER_shutdown (); 820 return; 821 } 822 // FIXME: insert_transfer_details has more complex 823 // error possibilities inside, expose them here 824 // and persist them with the transaction status 825 // if they arise (especially no_account, no_exchange, conflict) 826 // -- not sure how no_instance could happen... 827 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) 828 { 829 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 830 "Transfer already known. Ignoring duplicate.\n"); 831 GNUNET_break (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == 832 TALER_MERCHANTDB_set_instance ( 833 pg, 834 NULL)); 835 return; 836 } 837 } 838 839 { 840 struct CheckTransferContext ctc = { 841 .ec = TALER_EC_NONE, 842 .failure = false 843 }; 844 845 for (unsigned int i = 0; i<td->details_length; i++) 846 { 847 const struct TALER_TrackTransferDetails *ttd = &td->details[i]; 848 enum GNUNET_DB_QueryStatus qs; 849 850 if (TALER_EC_NONE != ctc.ec) 851 break; /* already encountered an error */ 852 ctc.current_offset = i; 853 ctc.current_detail = ttd; 854 /* Set the coin as "never seen" before. */ 855 ctc.check_transfer_result = GNUNET_NO; 856 qs = TALER_MERCHANTDB_lookup_deposits_by_contract_and_coin ( 857 pg, 858 w->instance_id, 859 &ttd->h_contract_terms, 860 &ttd->coin_pub, 861 &check_transfer, 862 &ctc); 863 switch (qs) 864 { 865 case GNUNET_DB_STATUS_SOFT_ERROR: 866 GNUNET_break (0); 867 ctc.ec = TALER_EC_GENERIC_DB_FETCH_FAILED; 868 break; 869 case GNUNET_DB_STATUS_HARD_ERROR: 870 GNUNET_break (0); 871 ctc.ec = TALER_EC_GENERIC_DB_FETCH_FAILED; 872 break; 873 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 874 /* The exchange says we made this deposit, but WE do not 875 recall making it (corrupted / unreliable database?)! 876 Well, let's say thanks and accept the money! */ 877 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 878 "Failed to find payment data in DB\n"); 879 ctc.check_transfer_result = GNUNET_OK; 880 break; 881 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 882 break; 883 } 884 switch (ctc.check_transfer_result) 885 { 886 case GNUNET_NO: 887 /* Internal error: how can we have called #check_transfer() 888 but still have no result? */ 889 GNUNET_break (0); 890 ctc.ec = TALER_EC_GENERIC_INTERNAL_INVARIANT_FAILURE; 891 GNUNET_break (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == 892 TALER_MERCHANTDB_set_instance ( 893 pg, 894 NULL)); 895 return; 896 case GNUNET_SYSERR: 897 /* #check_transfer() failed, report conflict! */ 898 GNUNET_break_op (0); 899 GNUNET_assert (TALER_EC_NONE != ctc.ec); 900 break; 901 case GNUNET_OK: 902 break; 903 } 904 } 905 if (TALER_EC_NONE != ctc.ec) 906 { 907 update_transaction_status ( 908 w, 909 ctc.failure 910 ? GNUNET_TIME_UNIT_FOREVER_ABS 911 : GNUNET_TIME_relative_to_absolute ( 912 GNUNET_TIME_UNIT_MINUTES), 913 MHD_HTTP_OK, 914 ctc.ec, 915 NULL /* no hint */, 916 ! ctc.failure); 917 GNUNET_break (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == 918 TALER_MERCHANTDB_set_instance ( 919 pg, 920 NULL)); 921 end_inquiry (w); 922 return; 923 } 924 } 925 926 if (GNUNET_SYSERR == 927 check_wire_fee (w, 928 td->execution_time, 929 &td->wire_fee)) 930 { 931 GNUNET_break_op (0); 932 update_transaction_status (w, 933 GNUNET_TIME_UNIT_FOREVER_ABS, 934 MHD_HTTP_OK, 935 TALER_EC_MERCHANT_PRIVATE_POST_TRANSFERS_BAD_WIRE_FEE, 936 TALER_amount2s (&td->wire_fee), 937 false); 938 GNUNET_break (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == 939 TALER_MERCHANTDB_set_instance ( 940 pg, 941 NULL)); 942 end_inquiry (w); 943 return; 944 } 945 946 { 947 enum GNUNET_DB_QueryStatus qs; 948 949 qs = TALER_MERCHANTDB_finalize_transfer_status (pg, 950 w->exchange->exchange_url, 951 &w->wtid, 952 &td->h_details, 953 &td->total_amount, 954 &td->wire_fee, 955 &td->exchange_pub, 956 &td->exchange_sig); 957 if (qs < 0) 958 { 959 GNUNET_break (0); 960 global_ret = EXIT_FAILURE; 961 GNUNET_SCHEDULER_shutdown (); 962 return; 963 } 964 } 965 GNUNET_break (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT == 966 TALER_MERCHANTDB_set_instance ( 967 pg, 968 NULL)); 969 end_inquiry (w); 970 } 971 972 973 /** 974 * Initiate download from an exchange for a given inquiry. 975 * 976 * @param cls a `struct Inquiry *` 977 */ 978 static void 979 exchange_request (void *cls) 980 { 981 struct Inquiry *w = cls; 982 struct Exchange *e = w->exchange; 983 984 w->task = NULL; 985 if (NULL == e->keys) 986 return; 987 w->wdh = TALER_EXCHANGE_get_transfers_create ( 988 ctx, 989 e->exchange_url, 990 e->keys, 991 &w->wtid); 992 if (NULL == w->wdh) 993 { 994 GNUNET_break (0); 995 e->exchange_inquiries--; 996 e->transfer_delay = GNUNET_TIME_STD_BACKOFF (e->transfer_delay); 997 update_transaction_status (w, 998 GNUNET_TIME_relative_to_absolute ( 999 e->transfer_delay), 1000 0 /* failed to begin */, 1001 TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_TRANSIENT_FAILURE, 1002 "Failed to initiate GET request at exchange", 1003 true); 1004 end_inquiry (w); 1005 return; 1006 } 1007 GNUNET_assert (TALER_EC_NONE == 1008 TALER_EXCHANGE_get_transfers_start (w->wdh, 1009 &wire_transfer_cb, 1010 w)); 1011 1012 /* Wait at least 1m for the network transfer */ 1013 update_transaction_status (w, 1014 GNUNET_TIME_relative_to_absolute ( 1015 GNUNET_TIME_UNIT_MINUTES), 1016 0 /* timeout */, 1017 TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_AWAITING_LIST, 1018 "Initiated GET with exchange", 1019 true); 1020 } 1021 1022 1023 /** 1024 * Function called with information about a transfer we 1025 * should ask the exchange about. 1026 * 1027 * @param cls closure (NULL) 1028 * @param rowid row of the transfer in the merchant database 1029 * @param instance_id instance that received the transfer 1030 * @param exchange_url base URL of the exchange that initiated the transfer 1031 * @param payto_uri account of the merchant that received the transfer 1032 * @param wtid wire transfer subject identifying the aggregation 1033 * @param next_attempt when should we next try to interact with the exchange 1034 */ 1035 static void 1036 start_inquiry ( 1037 void *cls, 1038 uint64_t rowid, 1039 const char *instance_id, 1040 const char *exchange_url, 1041 struct TALER_FullPayto payto_uri, 1042 const struct TALER_WireTransferIdentifierRawP *wtid, 1043 struct GNUNET_TIME_Absolute next_attempt) 1044 { 1045 struct Exchange *e; 1046 struct Inquiry *w; 1047 1048 (void) cls; 1049 if (GNUNET_TIME_absolute_is_future (next_attempt)) 1050 { 1051 if (NULL == task) 1052 task = GNUNET_SCHEDULER_add_at (next_attempt, 1053 &find_work, 1054 NULL); 1055 return; 1056 } 1057 active_inquiries++; 1058 1059 e = find_exchange (exchange_url); 1060 for (w = e->w_head; NULL != w; w = w->next) 1061 { 1062 if (0 == GNUNET_memcmp (&w->wtid, 1063 wtid)) 1064 { 1065 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1066 "Already processing inquiry. Aborting ongoing inquiry\n"); 1067 end_inquiry (w); 1068 break; 1069 } 1070 } 1071 1072 w = GNUNET_new (struct Inquiry); 1073 w->payto_uri.full_payto = GNUNET_strdup (payto_uri.full_payto); 1074 w->instance_id = GNUNET_strdup (instance_id); 1075 w->rowid = rowid; 1076 w->wtid = *wtid; 1077 GNUNET_CONTAINER_DLL_insert (e->w_head, 1078 e->w_tail, 1079 w); 1080 w->exchange = e; 1081 if (NULL != w->exchange->keys) 1082 w->task = GNUNET_SCHEDULER_add_now (&exchange_request, 1083 w); 1084 /* Wait at least 1 minute for /keys */ 1085 update_transaction_status (w, 1086 GNUNET_TIME_relative_to_absolute ( 1087 GNUNET_TIME_UNIT_MINUTES), 1088 0 /* timeout */, 1089 TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_AWAITING_KEYS, 1090 exchange_url, 1091 true); 1092 } 1093 1094 1095 static void 1096 find_work (void *cls) 1097 { 1098 enum GNUNET_DB_QueryStatus qs; 1099 int limit; 1100 1101 (void) cls; 1102 task = NULL; 1103 GNUNET_assert (OPEN_INQUIRY_LIMIT >= active_inquiries); 1104 limit = OPEN_INQUIRY_LIMIT - active_inquiries; 1105 if (0 == limit) 1106 { 1107 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1108 "Not looking for work: at limit\n"); 1109 at_limit = true; 1110 return; 1111 } 1112 at_limit = false; 1113 qs = TALER_MERCHANTDB_select_open_transfers (pg, 1114 limit, 1115 &start_inquiry, 1116 NULL); 1117 if (qs < 0) 1118 { 1119 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1120 "Failed to obtain open transfers from database\n"); 1121 GNUNET_SCHEDULER_shutdown (); 1122 return; 1123 } 1124 if (qs >= limit) 1125 { 1126 /* DB limited response, re-trigger DB interaction 1127 the moment we significantly fall below the 1128 limit */ 1129 at_limit = true; 1130 } 1131 if (0 == active_inquiries) 1132 { 1133 if (test_mode) 1134 { 1135 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1136 "No more open inquiries and in test mode. Existing.\n"); 1137 GNUNET_SCHEDULER_shutdown (); 1138 return; 1139 } 1140 GNUNET_log ( 1141 GNUNET_ERROR_TYPE_INFO, 1142 "No open inquiries found, waiting for notification to resume\n"); 1143 } 1144 } 1145 1146 1147 /** 1148 * Function called when transfers are added to the merchant database. We look 1149 * for more work. 1150 * 1151 * @param cls closure (NULL) 1152 * @param extra additional event data provided 1153 * @param extra_size number of bytes in @a extra 1154 */ 1155 static void 1156 transfer_added (void *cls, 1157 const void *extra, 1158 size_t extra_size) 1159 { 1160 (void) cls; 1161 (void) extra; 1162 (void) extra_size; 1163 if (active_inquiries > OPEN_INQUIRY_LIMIT / 2) 1164 { 1165 /* Trigger DB only once we are substantially below the limit */ 1166 at_limit = true; 1167 return; 1168 } 1169 if (NULL != task) 1170 return; 1171 task = GNUNET_SCHEDULER_add_now (&find_work, 1172 NULL); 1173 } 1174 1175 1176 /** 1177 * Function called when keys were changed in the 1178 * merchant database. Updates ours. 1179 * 1180 * @param cls closure (NULL) 1181 * @param extra additional event data provided 1182 * @param extra_size number of bytes in @a extra 1183 */ 1184 static void 1185 keys_changed (void *cls, 1186 const void *extra, 1187 size_t extra_size) 1188 { 1189 const char *url = extra; 1190 struct Exchange *e; 1191 1192 (void) cls; 1193 if ( (NULL == extra) || 1194 (0 == extra_size) ) 1195 { 1196 GNUNET_break (0); 1197 return; 1198 } 1199 if ('\0' != url[extra_size - 1]) 1200 { 1201 GNUNET_break (0); 1202 return; 1203 } 1204 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1205 "Received keys change notification: reload `%s'\n", 1206 url); 1207 e = find_exchange (url); 1208 sync_keys (e); 1209 } 1210 1211 1212 /** 1213 * First task. 1214 * 1215 * @param cls closure, NULL 1216 * @param args remaining command-line arguments 1217 * @param cfgfile name of the configuration file used (for saving, can be NULL!) 1218 * @param c configuration 1219 */ 1220 static void 1221 run (void *cls, 1222 char *const *args, 1223 const char *cfgfile, 1224 const struct GNUNET_CONFIGURATION_Handle *c) 1225 { 1226 (void) args; 1227 (void) cfgfile; 1228 1229 cfg = c; 1230 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, 1231 NULL); 1232 ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, 1233 &rc); 1234 rc = GNUNET_CURL_gnunet_rc_create (ctx); 1235 if (NULL == ctx) 1236 { 1237 GNUNET_break (0); 1238 GNUNET_SCHEDULER_shutdown (); 1239 global_ret = EXIT_FAILURE; 1240 return; 1241 } 1242 if (NULL == 1243 (pg = TALER_MERCHANTDB_connect (cfg))) 1244 { 1245 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1246 "Failed to initialize DB subsystem. Consider running taler-merchant-dbconfig!\n"); 1247 GNUNET_SCHEDULER_shutdown (); 1248 global_ret = EXIT_FAILURE; 1249 return; 1250 } 1251 { 1252 struct GNUNET_DB_EventHeaderP es = { 1253 .size = htons (sizeof (es)), 1254 .type = htons (TALER_DBEVENT_MERCHANT_WIRE_TRANSFER_EXPECTED) 1255 }; 1256 1257 eh = TALER_MERCHANTDB_event_listen (pg, 1258 &es, 1259 GNUNET_TIME_UNIT_FOREVER_REL, 1260 &transfer_added, 1261 NULL); 1262 } 1263 { 1264 struct GNUNET_DB_EventHeaderP es = { 1265 .size = htons (sizeof (es)), 1266 .type = htons (TALER_DBEVENT_MERCHANT_EXCHANGE_KEYS) 1267 }; 1268 1269 eh_keys 1270 = TALER_MERCHANTDB_event_listen (pg, 1271 &es, 1272 GNUNET_TIME_UNIT_FOREVER_REL, 1273 &keys_changed, 1274 NULL); 1275 } 1276 1277 GNUNET_assert (NULL == task); 1278 task = GNUNET_SCHEDULER_add_now (&find_work, 1279 NULL); 1280 } 1281 1282 1283 /** 1284 * The main function of taler-merchant-reconciliation 1285 * 1286 * @param argc number of arguments from the command line 1287 * @param argv command line arguments 1288 * @return 0 ok, 1 on error 1289 */ 1290 int 1291 main (int argc, 1292 char *const *argv) 1293 { 1294 struct GNUNET_GETOPT_CommandLineOption options[] = { 1295 GNUNET_GETOPT_option_timetravel ('T', 1296 "timetravel"), 1297 GNUNET_GETOPT_option_flag ('t', 1298 "test", 1299 "run in test mode and exit when idle", 1300 &test_mode), 1301 GNUNET_GETOPT_option_version (VERSION), 1302 GNUNET_GETOPT_OPTION_END 1303 }; 1304 enum GNUNET_GenericReturnValue ret; 1305 1306 ret = GNUNET_PROGRAM_run ( 1307 TALER_MERCHANT_project_data (), 1308 argc, argv, 1309 "taler-merchant-reconciliation", 1310 gettext_noop ( 1311 "background process that reconciles bank transfers with orders by asking the exchange"), 1312 options, 1313 &run, NULL); 1314 if (GNUNET_SYSERR == ret) 1315 return EXIT_INVALIDARGUMENT; 1316 if (GNUNET_NO == ret) 1317 return EXIT_SUCCESS; 1318 if ( (found_problem) && 1319 (0 == global_ret) ) 1320 global_ret = 7; 1321 return global_ret; 1322 } 1323 1324 1325 /* end of taler-merchant-reconciliation.c */