taler-merchant-reconciliation.c (35187B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2023-2025 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 /** 17 * @file taler-merchant-reconciliation.c 18 * @brief Process that reconciles information about incoming bank transfers with orders by asking the exchange 19 * @author Christian Grothoff 20 */ 21 #include "platform.h" 22 #include "microhttpd.h" 23 #include <gnunet/gnunet_util_lib.h> 24 #include <jansson.h> 25 #include <pthread.h> 26 #include <taler/taler_dbevents.h> 27 #include "taler_merchant_util.h" 28 #include "taler_merchant_bank_lib.h" 29 #include "taler_merchantdb_lib.h" 30 #include "taler_merchantdb_plugin.h" 31 32 /** 33 * Timeout for the exchange interaction. Rather long as we should do 34 * long-polling and do not want to wake up too often. 35 */ 36 #define EXCHANGE_TIMEOUT GNUNET_TIME_relative_multiply ( \ 37 GNUNET_TIME_UNIT_MINUTES, \ 38 30) 39 40 /** 41 * How many inquiries do we process concurrently at most. 42 */ 43 #define OPEN_INQUIRY_LIMIT 1024 44 45 /** 46 * How many inquiries do we process concurrently per exchange at most. 47 */ 48 #define EXCHANGE_INQUIRY_LIMIT 16 49 50 51 /** 52 * Information about an inquiry job. 53 */ 54 struct Inquiry; 55 56 57 /** 58 * Information about an exchange. 59 */ 60 struct Exchange 61 { 62 /** 63 * Kept in a DLL. 64 */ 65 struct Exchange *next; 66 67 /** 68 * Kept in a DLL. 69 */ 70 struct Exchange *prev; 71 72 /** 73 * Head of active inquiries. 74 */ 75 struct Inquiry *w_head; 76 77 /** 78 * Tail of active inquiries. 79 */ 80 struct Inquiry *w_tail; 81 82 /** 83 * Which exchange are we tracking here. 84 */ 85 char *exchange_url; 86 87 /** 88 * The keys of this exchange 89 */ 90 struct TALER_EXCHANGE_Keys *keys; 91 92 /** 93 * How many active inquiries do we have right now with this exchange. 94 */ 95 unsigned int exchange_inquiries; 96 97 /** 98 * How long should we wait between requests 99 * for transfer details? 100 */ 101 struct GNUNET_TIME_Relative transfer_delay; 102 103 }; 104 105 106 /** 107 * Information about an inquiry job. 108 */ 109 struct Inquiry 110 { 111 /** 112 * Kept in a DLL. 113 */ 114 struct Inquiry *next; 115 116 /** 117 * Kept in a DLL. 118 */ 119 struct Inquiry *prev; 120 121 /** 122 * Handle to the exchange that made the transfer. 123 */ 124 struct Exchange *exchange; 125 126 /** 127 * Task where we retry fetching transfer details from the exchange. 128 */ 129 struct GNUNET_SCHEDULER_Task *task; 130 131 /** 132 * For which merchant instance is this tracking request? 133 */ 134 char *instance_id; 135 136 /** 137 * payto:// URI used for the transfer. 138 */ 139 struct TALER_FullPayto payto_uri; 140 141 /** 142 * Handle for the /wire/transfers request. 143 */ 144 struct TALER_EXCHANGE_TransfersGetHandle *wdh; 145 146 /** 147 * When did the transfer happen? 148 */ 149 struct GNUNET_TIME_Timestamp execution_time; 150 151 /** 152 * Argument for the /wire/transfers request. 153 */ 154 struct TALER_WireTransferIdentifierRawP wtid; 155 156 /** 157 * Row of the wire transfer in our database. 158 */ 159 uint64_t rowid; 160 161 }; 162 163 164 /** 165 * Head of known exchanges. 166 */ 167 static struct Exchange *e_head; 168 169 /** 170 * Tail of known exchanges. 171 */ 172 static struct Exchange *e_tail; 173 174 /** 175 * The merchant's configuration. 176 */ 177 static const struct GNUNET_CONFIGURATION_Handle *cfg; 178 179 /** 180 * Our database plugin. 181 */ 182 static struct TALER_MERCHANTDB_Plugin *db_plugin; 183 184 /** 185 * Handle to the context for interacting with the bank. 186 */ 187 static struct GNUNET_CURL_Context *ctx; 188 189 /** 190 * Scheduler context for running the @e ctx. 191 */ 192 static struct GNUNET_CURL_RescheduleContext *rc; 193 194 /** 195 * Main task for #find_work(). 196 */ 197 static struct GNUNET_SCHEDULER_Task *task; 198 199 /** 200 * Event handler to learn that there are new transfers 201 * to check. 202 */ 203 static struct GNUNET_DB_EventHandler *eh; 204 205 /** 206 * Event handler to learn that there may be new exchange 207 * keys to check. 208 */ 209 static struct GNUNET_DB_EventHandler *eh_keys; 210 211 /** 212 * How many active inquiries do we have right now. 213 */ 214 static unsigned int active_inquiries; 215 216 /** 217 * Set to true if we ever encountered any problem. 218 */ 219 static bool found_problem; 220 221 /** 222 * Value to return from main(). 0 on success, non-zero on errors. 223 */ 224 static int global_ret; 225 226 /** 227 * #GNUNET_YES if we are in test mode and should exit when idle. 228 */ 229 static int test_mode; 230 231 /** 232 * True if the last DB query was limited by the 233 * #OPEN_INQUIRY_LIMIT and we thus should check again 234 * as soon as we are substantially below that limit, 235 * and not only when we get a DB notification. 236 */ 237 static bool at_limit; 238 239 240 /** 241 * Initiate download from exchange. 242 * 243 * @param cls a `struct Inquiry *` 244 */ 245 static void 246 exchange_request (void *cls); 247 248 249 /** 250 * The exchange @a e is ready to handle more inquiries, 251 * prepare to launch them. 252 * 253 * @param[in,out] e exchange to potentially launch inquiries on 254 */ 255 static void 256 launch_inquiries_at_exchange (struct Exchange *e) 257 { 258 for (struct Inquiry *w = e->w_head; 259 NULL != w; 260 w = w->next) 261 { 262 if (e->exchange_inquiries > EXCHANGE_INQUIRY_LIMIT) 263 break; 264 if ( (NULL == w->task) && 265 (NULL == w->wdh) ) 266 { 267 e->exchange_inquiries++; 268 w->task = GNUNET_SCHEDULER_add_now (&exchange_request, 269 w); 270 } 271 } 272 } 273 274 275 /** 276 * Updates the transaction status for inquiry @a w to the given values. 277 * 278 * @param w inquiry to update status for 279 * @param next_attempt when should we retry @a w (if ever) 280 * @param http_status HTTP status of the response 281 * @param ec error code to use (if any) 282 * @param last_hint hint delivered with the response (if any, possibly NULL) 283 * @param needs_retry true if we should try the HTTP request again 284 */ 285 static void 286 update_transaction_status (const struct Inquiry *w, 287 struct GNUNET_TIME_Absolute next_attempt, 288 unsigned int http_status, 289 enum TALER_ErrorCode ec, 290 const char *last_hint, 291 bool needs_retry) 292 { 293 enum GNUNET_DB_QueryStatus qs; 294 295 qs = db_plugin->update_transfer_status (db_plugin->cls, 296 w->exchange->exchange_url, 297 &w->wtid, 298 next_attempt, 299 http_status, 300 ec, 301 last_hint, 302 needs_retry); 303 if (qs < 0) 304 { 305 GNUNET_break (0); 306 global_ret = EXIT_FAILURE; 307 GNUNET_SCHEDULER_shutdown (); 308 return; 309 } 310 } 311 312 313 /** 314 * Interact with the database to get the current set 315 * of exchange keys known to us. 316 * 317 * @param e the exchange to check 318 */ 319 static void 320 sync_keys (struct Exchange *e) 321 { 322 enum GNUNET_DB_QueryStatus qs; 323 struct TALER_EXCHANGE_Keys *keys; 324 struct GNUNET_TIME_Absolute first_retry; 325 326 qs = db_plugin->select_exchange_keys (db_plugin->cls, 327 e->exchange_url, 328 &first_retry, 329 &keys); 330 if (qs < 0) 331 { 332 GNUNET_break (0); 333 return; 334 } 335 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) 336 { 337 GNUNET_break (0); 338 return; 339 } 340 TALER_EXCHANGE_keys_decref (e->keys); 341 e->keys = keys; 342 launch_inquiries_at_exchange (e); 343 } 344 345 346 /** 347 * Lookup our internal data structure for the given 348 * @a exchange_url or create one if we do not yet have 349 * one. 350 * 351 * @param exchange_url base URL of the exchange 352 * @return our state for this exchange 353 */ 354 static struct Exchange * 355 find_exchange (const char *exchange_url) 356 { 357 struct Exchange *e; 358 359 for (e = e_head; NULL != e; e = e->next) 360 if (0 == strcmp (exchange_url, 361 e->exchange_url)) 362 return e; 363 e = GNUNET_new (struct Exchange); 364 e->exchange_url = GNUNET_strdup (exchange_url); 365 GNUNET_CONTAINER_DLL_insert (e_head, 366 e_tail, 367 e); 368 sync_keys (e); 369 return e; 370 } 371 372 373 /** 374 * Finds new transfers that require work in the merchant database. 375 * 376 * @param cls NULL 377 */ 378 static void 379 find_work (void *cls); 380 381 382 /** 383 * Free resources of @a w. 384 * 385 * @param[in] w inquiry job to terminate 386 */ 387 static void 388 end_inquiry (struct Inquiry *w) 389 { 390 struct Exchange *e = w->exchange; 391 392 GNUNET_assert (active_inquiries > 0); 393 active_inquiries--; 394 if (NULL != w->wdh) 395 { 396 TALER_EXCHANGE_transfers_get_cancel (w->wdh); 397 w->wdh = NULL; 398 } 399 GNUNET_free (w->instance_id); 400 GNUNET_free (w->payto_uri.full_payto); 401 GNUNET_CONTAINER_DLL_remove (e->w_head, 402 e->w_tail, 403 w); 404 GNUNET_free (w); 405 if ( (active_inquiries < OPEN_INQUIRY_LIMIT / 2) && 406 (NULL == task) && 407 (at_limit) ) 408 { 409 at_limit = false; 410 GNUNET_assert (NULL == task); 411 task = GNUNET_SCHEDULER_add_now (&find_work, 412 NULL); 413 } 414 if ( (NULL == task) && 415 (! at_limit) && 416 (0 == active_inquiries) && 417 (test_mode) ) 418 { 419 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 420 "No more open inquiries and in test mode. Existing.\n"); 421 GNUNET_SCHEDULER_shutdown (); 422 return; 423 } 424 } 425 426 427 /** 428 * We're being aborted with CTRL-C (or SIGTERM). Shut down. 429 * 430 * @param cls closure (NULL) 431 */ 432 static void 433 shutdown_task (void *cls) 434 { 435 (void) cls; 436 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 437 "Running shutdown\n"); 438 while (NULL != e_head) 439 { 440 struct Exchange *e = e_head; 441 442 while (NULL != e->w_head) 443 { 444 struct Inquiry *w = e->w_head; 445 446 end_inquiry (w); 447 } 448 GNUNET_free (e->exchange_url); 449 if (NULL != e->keys) 450 { 451 TALER_EXCHANGE_keys_decref (e->keys); 452 e->keys = NULL; 453 } 454 GNUNET_CONTAINER_DLL_remove (e_head, 455 e_tail, 456 e); 457 GNUNET_free (e); 458 } 459 if (NULL != eh) 460 { 461 db_plugin->event_listen_cancel (eh); 462 eh = NULL; 463 } 464 if (NULL != eh_keys) 465 { 466 db_plugin->event_listen_cancel (eh_keys); 467 eh_keys = NULL; 468 } 469 if (NULL != task) 470 { 471 GNUNET_SCHEDULER_cancel (task); 472 task = NULL; 473 } 474 TALER_MERCHANTDB_plugin_unload (db_plugin); 475 db_plugin = NULL; 476 cfg = NULL; 477 if (NULL != ctx) 478 { 479 GNUNET_CURL_fini (ctx); 480 ctx = NULL; 481 } 482 if (NULL != rc) 483 { 484 GNUNET_CURL_gnunet_rc_destroy (rc); 485 rc = NULL; 486 } 487 } 488 489 490 /** 491 * Check that the given @a wire_fee is what the @a e should charge 492 * at the @a execution_time. If the fee is correct (according to our 493 * database), return #GNUNET_OK. If we do not have the fee structure in our 494 * DB, we just accept it and return #GNUNET_NO; if we have proof that the fee 495 * is bogus, we respond with the proof to the client and return 496 * #GNUNET_SYSERR. 497 * 498 * @param w inquiry to check fees of 499 * @param execution_time time of the wire transfer 500 * @param wire_fee fee claimed by the exchange 501 * @return #GNUNET_SYSERR if we returned hard proof of 502 * missbehavior from the exchange to the client 503 */ 504 static enum GNUNET_GenericReturnValue 505 check_wire_fee (struct Inquiry *w, 506 struct GNUNET_TIME_Timestamp execution_time, 507 const struct TALER_Amount *wire_fee) 508 { 509 struct Exchange *e = w->exchange; 510 const struct TALER_EXCHANGE_Keys *keys = e->keys; 511 struct TALER_WireFeeSet fees; 512 struct TALER_MasterSignatureP master_sig; 513 struct GNUNET_TIME_Timestamp start_date; 514 struct GNUNET_TIME_Timestamp end_date; 515 enum GNUNET_DB_QueryStatus qs; 516 char *wire_method; 517 518 if (NULL == keys) 519 { 520 GNUNET_break (0); 521 return GNUNET_NO; 522 } 523 wire_method = TALER_payto_get_method (w->payto_uri.full_payto); 524 qs = db_plugin->lookup_wire_fee (db_plugin->cls, 525 &keys->master_pub, 526 wire_method, 527 execution_time, 528 &fees, 529 &start_date, 530 &end_date, 531 &master_sig); 532 switch (qs) 533 { 534 case GNUNET_DB_STATUS_HARD_ERROR: 535 GNUNET_break (0); 536 GNUNET_free (wire_method); 537 return GNUNET_SYSERR; 538 case GNUNET_DB_STATUS_SOFT_ERROR: 539 GNUNET_free (wire_method); 540 return GNUNET_NO; 541 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 542 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 543 "Failed to find wire fee for `%s' and method `%s' at %s in DB, accepting blindly that the fee is %s\n", 544 TALER_B2S (&keys->master_pub), 545 wire_method, 546 GNUNET_TIME_timestamp2s (execution_time), 547 TALER_amount2s (wire_fee)); 548 GNUNET_free (wire_method); 549 return GNUNET_OK; 550 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 551 break; 552 } 553 if ( (GNUNET_OK != 554 TALER_amount_cmp_currency (&fees.wire, 555 wire_fee)) || 556 (0 > TALER_amount_cmp (&fees.wire, 557 wire_fee)) ) 558 { 559 GNUNET_break_op (0); 560 GNUNET_free (wire_method); 561 return GNUNET_SYSERR; /* expected_fee >= wire_fee */ 562 } 563 GNUNET_free (wire_method); 564 return GNUNET_OK; 565 } 566 567 568 /** 569 * Closure for #check_transfer() 570 */ 571 struct CheckTransferContext 572 { 573 574 /** 575 * Pointer to the detail that we are currently 576 * checking in #check_transfer(). 577 */ 578 const struct TALER_TrackTransferDetails *current_detail; 579 580 /** 581 * Which transaction detail are we currently looking at? 582 */ 583 unsigned int current_offset; 584 585 /** 586 * #GNUNET_NO if we did not find a matching coin. 587 * #GNUNET_SYSERR if we found a matching coin, but the amounts do not match. 588 * #GNUNET_OK if we did find a matching coin. 589 */ 590 enum GNUNET_GenericReturnValue check_transfer_result; 591 592 /** 593 * Set to error code, if any. 594 */ 595 enum TALER_ErrorCode ec; 596 597 /** 598 * Set to true if @e ec indicates a permanent failure. 599 */ 600 bool failure; 601 }; 602 603 604 /** 605 * This function checks that the information about the coin which 606 * was paid back by _this_ wire transfer matches what _we_ (the merchant) 607 * knew about this coin. 608 * 609 * @param cls closure with our `struct CheckTransferContext *` 610 * @param exchange_url URL of the exchange that issued @a coin_pub 611 * @param amount_with_fee amount the exchange will transfer for this coin 612 * @param deposit_fee fee the exchange will charge for this coin 613 * @param refund_fee fee the exchange will charge for refunding this coin 614 * @param wire_fee paid wire fee 615 * @param h_wire hash of merchant's wire details 616 * @param deposit_timestamp when did the exchange receive the deposit 617 * @param refund_deadline until when are refunds allowed 618 * @param exchange_sig signature by the exchange 619 * @param exchange_pub exchange signing key used for @a exchange_sig 620 */ 621 static void 622 check_transfer (void *cls, 623 const char *exchange_url, 624 const struct TALER_Amount *amount_with_fee, 625 const struct TALER_Amount *deposit_fee, 626 const struct TALER_Amount *refund_fee, 627 const struct TALER_Amount *wire_fee, 628 const struct TALER_MerchantWireHashP *h_wire, 629 struct GNUNET_TIME_Timestamp deposit_timestamp, 630 struct GNUNET_TIME_Timestamp refund_deadline, 631 const struct TALER_ExchangeSignatureP *exchange_sig, 632 const struct TALER_ExchangePublicKeyP *exchange_pub) 633 { 634 struct CheckTransferContext *ctc = cls; 635 const struct TALER_TrackTransferDetails *ttd = ctc->current_detail; 636 637 if (GNUNET_SYSERR == ctc->check_transfer_result) 638 { 639 GNUNET_break (0); 640 return; /* already had a serious issue; odd that we're called more than once as well... */ 641 } 642 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 643 "Checking coin with value %s\n", 644 TALER_amount2s (amount_with_fee)); 645 if ( (GNUNET_OK != 646 TALER_amount_cmp_currency (amount_with_fee, 647 &ttd->coin_value)) || 648 (0 != TALER_amount_cmp (amount_with_fee, 649 &ttd->coin_value)) ) 650 { 651 /* Disagreement between the exchange and us about how much this 652 coin is worth! */ 653 GNUNET_break_op (0); 654 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 655 "Disagreement about coin value %s\n", 656 TALER_amount2s (amount_with_fee)); 657 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 658 "Exchange gave it a value of %s\n", 659 TALER_amount2s (&ttd->coin_value)); 660 ctc->check_transfer_result = GNUNET_SYSERR; 661 /* Build the `TrackTransferConflictDetails` */ 662 ctc->ec = TALER_EC_MERCHANT_PRIVATE_POST_TRANSFERS_CONFLICTING_REPORTS; 663 ctc->failure = true; 664 /* FIXME-#9426: this should be reported to the auditor (once the auditor has an API for this) */ 665 return; 666 } 667 if ( (GNUNET_OK != 668 TALER_amount_cmp_currency (deposit_fee, 669 &ttd->coin_fee)) || 670 (0 != TALER_amount_cmp (deposit_fee, 671 &ttd->coin_fee)) ) 672 { 673 /* Disagreement between the exchange and us about how much this 674 coin is worth! */ 675 GNUNET_break_op (0); 676 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 677 "Expected fee is %s\n", 678 TALER_amount2s (&ttd->coin_fee)); 679 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 680 "Fee claimed by exchange is %s\n", 681 TALER_amount2s (deposit_fee)); 682 ctc->check_transfer_result = GNUNET_SYSERR; 683 /* Build the `TrackTransferConflictDetails` */ 684 ctc->ec = TALER_EC_MERCHANT_PRIVATE_POST_TRANSFERS_CONFLICTING_REPORTS; 685 ctc->failure = true; 686 /* FIXME-#9426: this should be reported to the auditor (once the auditor has an API for this) */ 687 return; 688 } 689 ctc->check_transfer_result = GNUNET_OK; 690 } 691 692 693 /** 694 * Function called with detailed wire transfer data, including all 695 * of the coin transactions that were combined into the wire transfer. 696 * 697 * @param cls closure a `struct Inquiry *` 698 * @param tgr response details 699 */ 700 static void 701 wire_transfer_cb (void *cls, 702 const struct TALER_EXCHANGE_TransfersGetResponse *tgr) 703 { 704 struct Inquiry *w = cls; 705 struct Exchange *e = w->exchange; 706 const struct TALER_EXCHANGE_TransferData *td = NULL; 707 708 e->exchange_inquiries--; 709 w->wdh = NULL; 710 if (EXCHANGE_INQUIRY_LIMIT - 1 == e->exchange_inquiries) 711 launch_inquiries_at_exchange (e); 712 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 713 "Got response code %u from exchange for GET /transfers/$WTID\n", 714 tgr->hr.http_status); 715 switch (tgr->hr.http_status) 716 { 717 case MHD_HTTP_OK: 718 td = &tgr->details.ok.td; 719 w->execution_time = td->execution_time; 720 e->transfer_delay = GNUNET_TIME_UNIT_ZERO; 721 break; 722 case MHD_HTTP_BAD_REQUEST: 723 case MHD_HTTP_FORBIDDEN: 724 case MHD_HTTP_NOT_FOUND: 725 found_problem = true; 726 update_transaction_status (w, 727 GNUNET_TIME_UNIT_FOREVER_ABS, 728 tgr->hr.http_status, 729 tgr->hr.ec, 730 tgr->hr.hint, 731 false); 732 end_inquiry (w); 733 return; 734 case MHD_HTTP_INTERNAL_SERVER_ERROR: 735 case MHD_HTTP_BAD_GATEWAY: 736 case MHD_HTTP_GATEWAY_TIMEOUT: 737 e->transfer_delay = GNUNET_TIME_STD_BACKOFF (e->transfer_delay); 738 update_transaction_status (w, 739 GNUNET_TIME_relative_to_absolute ( 740 e->transfer_delay), 741 tgr->hr.http_status, 742 tgr->hr.ec, 743 tgr->hr.hint, 744 true); 745 end_inquiry (w); 746 return; 747 default: 748 found_problem = true; 749 e->transfer_delay = GNUNET_TIME_STD_BACKOFF (e->transfer_delay); 750 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 751 "Unexpected HTTP status %u\n", 752 tgr->hr.http_status); 753 update_transaction_status (w, 754 GNUNET_TIME_relative_to_absolute ( 755 e->transfer_delay), 756 tgr->hr.http_status, 757 tgr->hr.ec, 758 tgr->hr.hint, 759 true); 760 end_inquiry (w); 761 return; 762 } 763 db_plugin->preflight (db_plugin->cls); 764 765 { 766 enum GNUNET_DB_QueryStatus qs; 767 768 qs = db_plugin->insert_transfer_details (db_plugin->cls, 769 w->instance_id, 770 w->exchange->exchange_url, 771 w->payto_uri, 772 &w->wtid, 773 td); 774 if (0 > qs) 775 { 776 /* Always report on DB error as well to enable diagnostics */ 777 GNUNET_break (GNUNET_DB_STATUS_HARD_ERROR == qs); 778 global_ret = EXIT_FAILURE; 779 GNUNET_SCHEDULER_shutdown (); 780 return; 781 } 782 // FIXME: insert_transfer_details has more complex 783 // error possibilities inside, expose them here 784 // and persist them with the transaction status 785 // if they arise (especially no_account, no_exchange, conflict) 786 // -- not sure how no_instance could happen... 787 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) 788 { 789 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 790 "Transfer already known. Ignoring duplicate.\n"); 791 return; 792 } 793 } 794 795 { 796 struct CheckTransferContext ctc = { 797 .ec = TALER_EC_NONE, 798 .failure = false 799 }; 800 801 for (unsigned int i = 0; i<td->details_length; i++) 802 { 803 const struct TALER_TrackTransferDetails *ttd = &td->details[i]; 804 enum GNUNET_DB_QueryStatus qs; 805 806 if (TALER_EC_NONE != ctc.ec) 807 break; /* already encountered an error */ 808 ctc.current_offset = i; 809 ctc.current_detail = ttd; 810 /* Set the coin as "never seen" before. */ 811 ctc.check_transfer_result = GNUNET_NO; 812 qs = db_plugin->lookup_deposits_by_contract_and_coin ( 813 db_plugin->cls, 814 w->instance_id, 815 &ttd->h_contract_terms, 816 &ttd->coin_pub, 817 &check_transfer, 818 &ctc); 819 switch (qs) 820 { 821 case GNUNET_DB_STATUS_SOFT_ERROR: 822 GNUNET_break (0); 823 ctc.ec = TALER_EC_GENERIC_DB_FETCH_FAILED; 824 break; 825 case GNUNET_DB_STATUS_HARD_ERROR: 826 GNUNET_break (0); 827 ctc.ec = TALER_EC_GENERIC_DB_FETCH_FAILED; 828 break; 829 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 830 /* The exchange says we made this deposit, but WE do not 831 recall making it (corrupted / unreliable database?)! 832 Well, let's say thanks and accept the money! */ 833 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 834 "Failed to find payment data in DB\n"); 835 ctc.check_transfer_result = GNUNET_OK; 836 break; 837 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 838 break; 839 } 840 switch (ctc.check_transfer_result) 841 { 842 case GNUNET_NO: 843 /* Internal error: how can we have called #check_transfer() 844 but still have no result? */ 845 GNUNET_break (0); 846 ctc.ec = TALER_EC_GENERIC_INTERNAL_INVARIANT_FAILURE; 847 return; 848 case GNUNET_SYSERR: 849 /* #check_transfer() failed, report conflict! */ 850 GNUNET_break_op (0); 851 GNUNET_assert (TALER_EC_NONE != ctc.ec); 852 break; 853 case GNUNET_OK: 854 break; 855 } 856 } 857 if (TALER_EC_NONE != ctc.ec) 858 { 859 update_transaction_status ( 860 w, 861 ctc.failure 862 ? GNUNET_TIME_UNIT_FOREVER_ABS 863 : GNUNET_TIME_relative_to_absolute ( 864 GNUNET_TIME_UNIT_MINUTES), 865 MHD_HTTP_OK, 866 ctc.ec, 867 NULL /* no hint */, 868 ! ctc.failure); 869 end_inquiry (w); 870 return; 871 } 872 } 873 874 if (GNUNET_SYSERR == 875 check_wire_fee (w, 876 td->execution_time, 877 &td->wire_fee)) 878 { 879 GNUNET_break_op (0); 880 update_transaction_status (w, 881 GNUNET_TIME_UNIT_FOREVER_ABS, 882 MHD_HTTP_OK, 883 TALER_EC_MERCHANT_PRIVATE_POST_TRANSFERS_BAD_WIRE_FEE, 884 TALER_amount2s (&td->wire_fee), 885 false); 886 end_inquiry (w); 887 return; 888 } 889 890 { 891 enum GNUNET_DB_QueryStatus qs; 892 893 qs = db_plugin->finalize_transfer_status (db_plugin->cls, 894 w->exchange->exchange_url, 895 &w->wtid, 896 &td->h_details, 897 &td->total_amount, 898 &td->wire_fee, 899 &td->exchange_pub, 900 &td->exchange_sig); 901 if (qs < 0) 902 { 903 GNUNET_break (0); 904 global_ret = EXIT_FAILURE; 905 GNUNET_SCHEDULER_shutdown (); 906 return; 907 } 908 } 909 end_inquiry (w); 910 } 911 912 913 /** 914 * Initiate download from an exchange for a given inquiry. 915 * 916 * @param cls a `struct Inquiry *` 917 */ 918 static void 919 exchange_request (void *cls) 920 { 921 struct Inquiry *w = cls; 922 struct Exchange *e = w->exchange; 923 924 w->task = NULL; 925 if (NULL == e->keys) 926 return; 927 w->wdh = TALER_EXCHANGE_transfers_get ( 928 ctx, 929 e->exchange_url, 930 e->keys, 931 &w->wtid, 932 &wire_transfer_cb, 933 w); 934 if (NULL == w->wdh) 935 { 936 GNUNET_break (0); 937 e->exchange_inquiries--; 938 e->transfer_delay = GNUNET_TIME_STD_BACKOFF (e->transfer_delay); 939 update_transaction_status (w, 940 GNUNET_TIME_relative_to_absolute ( 941 e->transfer_delay), 942 0 /* failed to begin */, 943 TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_TRANSIENT_FAILURE, 944 "Failed to initiate GET request at exchange", 945 true); 946 end_inquiry (w); 947 return; 948 } 949 /* Wait at least 1m for the network transfer */ 950 update_transaction_status (w, 951 GNUNET_TIME_relative_to_absolute ( 952 GNUNET_TIME_UNIT_MINUTES), 953 0 /* timeout */, 954 TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_AWAITING_LIST, 955 "Initiated GET with exchange", 956 true); 957 } 958 959 960 /** 961 * Function called with information about a transfer we 962 * should ask the exchange about. 963 * 964 * @param cls closure (NULL) 965 * @param rowid row of the transfer in the merchant database 966 * @param instance_id instance that received the transfer 967 * @param exchange_url base URL of the exchange that initiated the transfer 968 * @param payto_uri account of the merchant that received the transfer 969 * @param wtid wire transfer subject identifying the aggregation 970 * @param next_attempt when should we next try to interact with the exchange 971 */ 972 static void 973 start_inquiry ( 974 void *cls, 975 uint64_t rowid, 976 const char *instance_id, 977 const char *exchange_url, 978 struct TALER_FullPayto payto_uri, 979 const struct TALER_WireTransferIdentifierRawP *wtid, 980 struct GNUNET_TIME_Absolute next_attempt) 981 { 982 struct Exchange *e; 983 struct Inquiry *w; 984 985 (void) cls; 986 if (GNUNET_TIME_absolute_is_future (next_attempt)) 987 { 988 if (NULL == task) 989 task = GNUNET_SCHEDULER_add_at (next_attempt, 990 &find_work, 991 NULL); 992 return; 993 } 994 active_inquiries++; 995 996 e = find_exchange (exchange_url); 997 for (w = e->w_head; NULL != w; w = w->next) 998 { 999 if (0 == GNUNET_memcmp (&w->wtid, 1000 wtid)) 1001 { 1002 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1003 "Already processing inquiry. Aborting ongoing inquiry\n"); 1004 end_inquiry (w); 1005 break; 1006 } 1007 } 1008 1009 w = GNUNET_new (struct Inquiry); 1010 w->payto_uri.full_payto = GNUNET_strdup (payto_uri.full_payto); 1011 w->instance_id = GNUNET_strdup (instance_id); 1012 w->rowid = rowid; 1013 w->wtid = *wtid; 1014 GNUNET_CONTAINER_DLL_insert (e->w_head, 1015 e->w_tail, 1016 w); 1017 w->exchange = e; 1018 if (NULL != w->exchange->keys) 1019 w->task = GNUNET_SCHEDULER_add_now (&exchange_request, 1020 w); 1021 /* Wait at least 1 minute for /keys */ 1022 update_transaction_status (w, 1023 GNUNET_TIME_relative_to_absolute ( 1024 GNUNET_TIME_UNIT_MINUTES), 1025 0 /* timeout */, 1026 TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_AWAITING_KEYS, 1027 exchange_url, 1028 true); 1029 } 1030 1031 1032 static void 1033 find_work (void *cls) 1034 { 1035 enum GNUNET_DB_QueryStatus qs; 1036 int limit; 1037 1038 (void) cls; 1039 task = NULL; 1040 GNUNET_assert (OPEN_INQUIRY_LIMIT >= active_inquiries); 1041 limit = OPEN_INQUIRY_LIMIT - active_inquiries; 1042 if (0 == limit) 1043 { 1044 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1045 "Not looking for work: at limit\n"); 1046 at_limit = true; 1047 return; 1048 } 1049 at_limit = false; 1050 qs = db_plugin->select_open_transfers (db_plugin->cls, 1051 limit, 1052 &start_inquiry, 1053 NULL); 1054 if (qs < 0) 1055 { 1056 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1057 "Failed to obtain open transfers from database\n"); 1058 GNUNET_SCHEDULER_shutdown (); 1059 return; 1060 } 1061 if (qs == limit) 1062 { 1063 /* DB limited response, re-trigger DB interaction 1064 the moment we significantly fall below the 1065 limit */ 1066 at_limit = true; 1067 } 1068 if (0 == active_inquiries) 1069 { 1070 if (test_mode) 1071 { 1072 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1073 "No more open inquiries and in test mode. Existing.\n"); 1074 GNUNET_SCHEDULER_shutdown (); 1075 return; 1076 } 1077 GNUNET_log ( 1078 GNUNET_ERROR_TYPE_INFO, 1079 "No open inquiries found, waiting for notification to resume\n"); 1080 } 1081 } 1082 1083 1084 /** 1085 * Function called when transfers are added to the merchant database. We look 1086 * for more work. 1087 * 1088 * @param cls closure (NULL) 1089 * @param extra additional event data provided 1090 * @param extra_size number of bytes in @a extra 1091 */ 1092 static void 1093 transfer_added (void *cls, 1094 const void *extra, 1095 size_t extra_size) 1096 { 1097 (void) cls; 1098 (void) extra; 1099 (void) extra_size; 1100 if (active_inquiries > OPEN_INQUIRY_LIMIT / 2) 1101 { 1102 /* Trigger DB only once we are substantially below the limit */ 1103 at_limit = true; 1104 return; 1105 } 1106 if (NULL != task) 1107 return; 1108 task = GNUNET_SCHEDULER_add_now (&find_work, 1109 NULL); 1110 } 1111 1112 1113 /** 1114 * Function called when keys were changed in the 1115 * merchant database. Updates ours. 1116 * 1117 * @param cls closure (NULL) 1118 * @param extra additional event data provided 1119 * @param extra_size number of bytes in @a extra 1120 */ 1121 static void 1122 keys_changed (void *cls, 1123 const void *extra, 1124 size_t extra_size) 1125 { 1126 const char *url = extra; 1127 struct Exchange *e; 1128 1129 (void) cls; 1130 if ( (NULL == extra) || 1131 (0 == extra_size) ) 1132 { 1133 GNUNET_break (0); 1134 return; 1135 } 1136 if ('\0' != url[extra_size - 1]) 1137 { 1138 GNUNET_break (0); 1139 return; 1140 } 1141 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1142 "Received keys change notification: reload `%s'\n", 1143 url); 1144 e = find_exchange (url); 1145 sync_keys (e); 1146 } 1147 1148 1149 /** 1150 * First task. 1151 * 1152 * @param cls closure, NULL 1153 * @param args remaining command-line arguments 1154 * @param cfgfile name of the configuration file used (for saving, can be NULL!) 1155 * @param c configuration 1156 */ 1157 static void 1158 run (void *cls, 1159 char *const *args, 1160 const char *cfgfile, 1161 const struct GNUNET_CONFIGURATION_Handle *c) 1162 { 1163 (void) args; 1164 (void) cfgfile; 1165 1166 cfg = c; 1167 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, 1168 NULL); 1169 ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, 1170 &rc); 1171 rc = GNUNET_CURL_gnunet_rc_create (ctx); 1172 if (NULL == ctx) 1173 { 1174 GNUNET_break (0); 1175 GNUNET_SCHEDULER_shutdown (); 1176 global_ret = EXIT_FAILURE; 1177 return; 1178 } 1179 if (NULL == 1180 (db_plugin = TALER_MERCHANTDB_plugin_load (cfg))) 1181 { 1182 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1183 "Failed to initialize DB subsystem\n"); 1184 GNUNET_SCHEDULER_shutdown (); 1185 global_ret = EXIT_NOTCONFIGURED; 1186 return; 1187 } 1188 if (GNUNET_OK != 1189 db_plugin->connect (db_plugin->cls)) 1190 { 1191 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1192 "Failed to connect to database. Consider running taler-merchant-dbinit!\n"); 1193 GNUNET_SCHEDULER_shutdown (); 1194 global_ret = EXIT_FAILURE; 1195 return; 1196 } 1197 { 1198 struct GNUNET_DB_EventHeaderP es = { 1199 .size = htons (sizeof (es)), 1200 .type = htons (TALER_DBEVENT_MERCHANT_WIRE_TRANSFER_EXPECTED) 1201 }; 1202 1203 eh = db_plugin->event_listen (db_plugin->cls, 1204 &es, 1205 GNUNET_TIME_UNIT_FOREVER_REL, 1206 &transfer_added, 1207 NULL); 1208 } 1209 { 1210 struct GNUNET_DB_EventHeaderP es = { 1211 .size = htons (sizeof (es)), 1212 .type = htons (TALER_DBEVENT_MERCHANT_EXCHANGE_KEYS) 1213 }; 1214 1215 eh_keys 1216 = db_plugin->event_listen (db_plugin->cls, 1217 &es, 1218 GNUNET_TIME_UNIT_FOREVER_REL, 1219 &keys_changed, 1220 NULL); 1221 } 1222 1223 GNUNET_assert (NULL == task); 1224 task = GNUNET_SCHEDULER_add_now (&find_work, 1225 NULL); 1226 } 1227 1228 1229 /** 1230 * The main function of taler-merchant-reconciliation 1231 * 1232 * @param argc number of arguments from the command line 1233 * @param argv command line arguments 1234 * @return 0 ok, 1 on error 1235 */ 1236 int 1237 main (int argc, 1238 char *const *argv) 1239 { 1240 struct GNUNET_GETOPT_CommandLineOption options[] = { 1241 GNUNET_GETOPT_option_timetravel ('T', 1242 "timetravel"), 1243 GNUNET_GETOPT_option_flag ('t', 1244 "test", 1245 "run in test mode and exit when idle", 1246 &test_mode), 1247 GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION), 1248 GNUNET_GETOPT_OPTION_END 1249 }; 1250 enum GNUNET_GenericReturnValue ret; 1251 1252 ret = GNUNET_PROGRAM_run ( 1253 TALER_MERCHANT_project_data (), 1254 argc, argv, 1255 "taler-merchant-reconciliation", 1256 gettext_noop ( 1257 "background process that reconciles bank transfers with orders by asking the exchange"), 1258 options, 1259 &run, NULL); 1260 if (GNUNET_SYSERR == ret) 1261 return EXIT_INVALIDARGUMENT; 1262 if (GNUNET_NO == ret) 1263 return EXIT_SUCCESS; 1264 if ( (found_problem) && 1265 (0 == global_ret) ) 1266 global_ret = 7; 1267 return global_ret; 1268 } 1269 1270 1271 /* end of taler-merchant-reconciliation.c */