taler-merchant-reconciliation.c (35580B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2023-2025 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 /** 17 * @file taler-merchant-reconciliation.c 18 * @brief Process that reconciles information about incoming bank transfers with orders by asking the exchange 19 * @author Christian Grothoff 20 */ 21 #include "taler/platform.h" 22 #include "microhttpd.h" 23 #include <gnunet/gnunet_util_lib.h> 24 #include <jansson.h> 25 #include <pthread.h> 26 #include <taler/taler_dbevents.h> 27 #include "taler/taler_merchant_util.h" 28 #include "taler/taler_merchant_bank_lib.h" 29 #include "taler/taler_merchantdb_lib.h" 30 #include "taler/taler_merchantdb_plugin.h" 31 32 /** 33 * Timeout for the exchange interaction. Rather long as we should do 34 * long-polling and do not want to wake up too often. 35 */ 36 #define EXCHANGE_TIMEOUT GNUNET_TIME_relative_multiply ( \ 37 GNUNET_TIME_UNIT_MINUTES, \ 38 30) 39 40 /** 41 * How many inquiries do we process concurrently at most. 42 */ 43 #define OPEN_INQUIRY_LIMIT 1024 44 45 /** 46 * How many inquiries do we process concurrently per exchange at most. 47 */ 48 #define EXCHANGE_INQUIRY_LIMIT 16 49 50 51 /** 52 * Information about an inquiry job. 53 */ 54 struct Inquiry; 55 56 57 /** 58 * Information about an exchange. 59 */ 60 struct Exchange 61 { 62 /** 63 * Kept in a DLL. 64 */ 65 struct Exchange *next; 66 67 /** 68 * Kept in a DLL. 69 */ 70 struct Exchange *prev; 71 72 /** 73 * Head of active inquiries. 74 */ 75 struct Inquiry *w_head; 76 77 /** 78 * Tail of active inquiries. 79 */ 80 struct Inquiry *w_tail; 81 82 /** 83 * Which exchange are we tracking here. 84 */ 85 char *exchange_url; 86 87 /** 88 * The keys of this exchange 89 */ 90 struct TALER_EXCHANGE_Keys *keys; 91 92 /** 93 * How many active inquiries do we have right now with this exchange. 94 */ 95 unsigned int exchange_inquiries; 96 97 /** 98 * How long should we wait between requests 99 * for transfer details? 100 */ 101 struct GNUNET_TIME_Relative transfer_delay; 102 103 }; 104 105 106 /** 107 * Information about an inquiry job. 108 */ 109 struct Inquiry 110 { 111 /** 112 * Kept in a DLL. 113 */ 114 struct Inquiry *next; 115 116 /** 117 * Kept in a DLL. 118 */ 119 struct Inquiry *prev; 120 121 /** 122 * Handle to the exchange that made the transfer. 123 */ 124 struct Exchange *exchange; 125 126 /** 127 * Task where we retry fetching transfer details from the exchange. 128 */ 129 struct GNUNET_SCHEDULER_Task *task; 130 131 /** 132 * For which merchant instance is this tracking request? 133 */ 134 char *instance_id; 135 136 /** 137 * payto:// URI used for the transfer. 138 */ 139 struct TALER_FullPayto payto_uri; 140 141 /** 142 * Handle for the GET /transfers request. 143 */ 144 struct TALER_EXCHANGE_GetTransfersHandle *wdh; 145 146 /** 147 * When did the transfer happen? 148 */ 149 struct GNUNET_TIME_Timestamp execution_time; 150 151 /** 152 * Argument for the /wire/transfers request. 153 */ 154 struct TALER_WireTransferIdentifierRawP wtid; 155 156 /** 157 * Row of the wire transfer in our database. 158 */ 159 uint64_t rowid; 160 161 }; 162 163 164 /** 165 * Head of known exchanges. 166 */ 167 static struct Exchange *e_head; 168 169 /** 170 * Tail of known exchanges. 171 */ 172 static struct Exchange *e_tail; 173 174 /** 175 * The merchant's configuration. 176 */ 177 static const struct GNUNET_CONFIGURATION_Handle *cfg; 178 179 /** 180 * Our database plugin. 181 */ 182 static struct TALER_MERCHANTDB_Plugin *db_plugin; 183 184 /** 185 * Handle to the context for interacting with the bank. 186 */ 187 static struct GNUNET_CURL_Context *ctx; 188 189 /** 190 * Scheduler context for running the @e ctx. 191 */ 192 static struct GNUNET_CURL_RescheduleContext *rc; 193 194 /** 195 * Main task for #find_work(). 196 */ 197 static struct GNUNET_SCHEDULER_Task *task; 198 199 /** 200 * Event handler to learn that there are new transfers 201 * to check. 202 */ 203 static struct GNUNET_DB_EventHandler *eh; 204 205 /** 206 * Event handler to learn that there may be new exchange 207 * keys to check. 208 */ 209 static struct GNUNET_DB_EventHandler *eh_keys; 210 211 /** 212 * How many active inquiries do we have right now. 213 */ 214 static unsigned int active_inquiries; 215 216 /** 217 * Set to true if we ever encountered any problem. 218 */ 219 static bool found_problem; 220 221 /** 222 * Value to return from main(). 0 on success, non-zero on errors. 223 */ 224 static int global_ret; 225 226 /** 227 * #GNUNET_YES if we are in test mode and should exit when idle. 228 */ 229 static int test_mode; 230 231 /** 232 * True if the last DB query was limited by the 233 * #OPEN_INQUIRY_LIMIT and we thus should check again 234 * as soon as we are substantially below that limit, 235 * and not only when we get a DB notification. 236 */ 237 static bool at_limit; 238 239 240 /** 241 * Initiate download from exchange. 242 * 243 * @param cls a `struct Inquiry *` 244 */ 245 static void 246 exchange_request (void *cls); 247 248 249 /** 250 * The exchange @a e is ready to handle more inquiries, 251 * prepare to launch them. 252 * 253 * @param[in,out] e exchange to potentially launch inquiries on 254 */ 255 static void 256 launch_inquiries_at_exchange (struct Exchange *e) 257 { 258 for (struct Inquiry *w = e->w_head; 259 NULL != w; 260 w = w->next) 261 { 262 if (e->exchange_inquiries >= EXCHANGE_INQUIRY_LIMIT) 263 break; 264 if ( (NULL == w->task) && 265 (NULL == w->wdh) ) 266 { 267 e->exchange_inquiries++; 268 w->task = GNUNET_SCHEDULER_add_now (&exchange_request, 269 w); 270 } 271 } 272 } 273 274 275 /** 276 * Updates the transaction status for inquiry @a w to the given values. 277 * 278 * @param w inquiry to update status for 279 * @param next_attempt when should we retry @a w (if ever) 280 * @param http_status HTTP status of the response 281 * @param ec error code to use (if any) 282 * @param last_hint hint delivered with the response (if any, possibly NULL) 283 * @param needs_retry true if we should try the HTTP request again 284 */ 285 static void 286 update_transaction_status (const struct Inquiry *w, 287 struct GNUNET_TIME_Absolute next_attempt, 288 unsigned int http_status, 289 enum TALER_ErrorCode ec, 290 const char *last_hint, 291 bool needs_retry) 292 { 293 enum GNUNET_DB_QueryStatus qs; 294 295 qs = db_plugin->update_transfer_status (db_plugin->cls, 296 w->exchange->exchange_url, 297 &w->wtid, 298 next_attempt, 299 http_status, 300 ec, 301 last_hint, 302 needs_retry); 303 if (qs < 0) 304 { 305 GNUNET_break (0); 306 global_ret = EXIT_FAILURE; 307 GNUNET_SCHEDULER_shutdown (); 308 return; 309 } 310 } 311 312 313 /** 314 * Interact with the database to get the current set 315 * of exchange keys known to us. 316 * 317 * @param e the exchange to check 318 */ 319 static void 320 sync_keys (struct Exchange *e) 321 { 322 enum GNUNET_DB_QueryStatus qs; 323 struct TALER_EXCHANGE_Keys *keys; 324 struct GNUNET_TIME_Absolute first_retry; 325 326 qs = db_plugin->select_exchange_keys (db_plugin->cls, 327 e->exchange_url, 328 &first_retry, 329 &keys); 330 if (qs < 0) 331 { 332 GNUNET_break (0); 333 return; 334 } 335 if ( (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) || 336 (NULL == keys) ) 337 { 338 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 339 "Cannot launch inquiries at `%s': lacking /keys response\n", 340 e->exchange_url); 341 return; 342 } 343 TALER_EXCHANGE_keys_decref (e->keys); 344 e->keys = keys; 345 launch_inquiries_at_exchange (e); 346 } 347 348 349 /** 350 * Lookup our internal data structure for the given 351 * @a exchange_url or create one if we do not yet have 352 * one. 353 * 354 * @param exchange_url base URL of the exchange 355 * @return our state for this exchange 356 */ 357 static struct Exchange * 358 find_exchange (const char *exchange_url) 359 { 360 struct Exchange *e; 361 362 for (e = e_head; NULL != e; e = e->next) 363 if (0 == strcmp (exchange_url, 364 e->exchange_url)) 365 return e; 366 e = GNUNET_new (struct Exchange); 367 e->exchange_url = GNUNET_strdup (exchange_url); 368 GNUNET_CONTAINER_DLL_insert (e_head, 369 e_tail, 370 e); 371 sync_keys (e); 372 return e; 373 } 374 375 376 /** 377 * Finds new transfers that require work in the merchant database. 378 * 379 * @param cls NULL 380 */ 381 static void 382 find_work (void *cls); 383 384 385 /** 386 * Free resources of @a w. 387 * 388 * @param[in] w inquiry job to terminate 389 */ 390 static void 391 end_inquiry (struct Inquiry *w) 392 { 393 struct Exchange *e = w->exchange; 394 395 GNUNET_assert (active_inquiries > 0); 396 active_inquiries--; 397 if (NULL != w->wdh) 398 { 399 TALER_EXCHANGE_get_transfers_cancel (w->wdh); 400 w->wdh = NULL; 401 } 402 GNUNET_free (w->instance_id); 403 GNUNET_free (w->payto_uri.full_payto); 404 GNUNET_CONTAINER_DLL_remove (e->w_head, 405 e->w_tail, 406 w); 407 GNUNET_free (w); 408 if ( (active_inquiries < OPEN_INQUIRY_LIMIT / 2) && 409 (NULL == task) && 410 (at_limit) ) 411 { 412 at_limit = false; 413 GNUNET_assert (NULL == task); 414 task = GNUNET_SCHEDULER_add_now (&find_work, 415 NULL); 416 } 417 if ( (NULL == task) && 418 (! at_limit) && 419 (0 == active_inquiries) && 420 (test_mode) ) 421 { 422 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 423 "No more open inquiries and in test mode. Exiting.\n"); 424 GNUNET_SCHEDULER_shutdown (); 425 return; 426 } 427 } 428 429 430 /** 431 * We're being aborted with CTRL-C (or SIGTERM). Shut down. 432 * 433 * @param cls closure (NULL) 434 */ 435 static void 436 shutdown_task (void *cls) 437 { 438 (void) cls; 439 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 440 "Running shutdown\n"); 441 while (NULL != e_head) 442 { 443 struct Exchange *e = e_head; 444 445 while (NULL != e->w_head) 446 { 447 struct Inquiry *w = e->w_head; 448 449 end_inquiry (w); 450 } 451 GNUNET_free (e->exchange_url); 452 if (NULL != e->keys) 453 { 454 TALER_EXCHANGE_keys_decref (e->keys); 455 e->keys = NULL; 456 } 457 GNUNET_CONTAINER_DLL_remove (e_head, 458 e_tail, 459 e); 460 GNUNET_free (e); 461 } 462 if (NULL != eh) 463 { 464 db_plugin->event_listen_cancel (eh); 465 eh = NULL; 466 } 467 if (NULL != eh_keys) 468 { 469 db_plugin->event_listen_cancel (eh_keys); 470 eh_keys = NULL; 471 } 472 if (NULL != task) 473 { 474 GNUNET_SCHEDULER_cancel (task); 475 task = NULL; 476 } 477 TALER_MERCHANTDB_plugin_unload (db_plugin); 478 db_plugin = NULL; 479 cfg = NULL; 480 if (NULL != ctx) 481 { 482 GNUNET_CURL_fini (ctx); 483 ctx = NULL; 484 } 485 if (NULL != rc) 486 { 487 GNUNET_CURL_gnunet_rc_destroy (rc); 488 rc = NULL; 489 } 490 } 491 492 493 /** 494 * Check that the given @a wire_fee is what the @a e should charge 495 * at the @a execution_time. If the fee is correct (according to our 496 * database), return #GNUNET_OK. If we do not have the fee structure in our 497 * DB, we just accept it and return #GNUNET_NO; if we have proof that the fee 498 * is bogus, we respond with the proof to the client and return 499 * #GNUNET_SYSERR. 500 * 501 * @param w inquiry to check fees of 502 * @param execution_time time of the wire transfer 503 * @param wire_fee fee claimed by the exchange 504 * @return #GNUNET_SYSERR if we returned hard proof of 505 * missbehavior from the exchange to the client 506 */ 507 static enum GNUNET_GenericReturnValue 508 check_wire_fee (struct Inquiry *w, 509 struct GNUNET_TIME_Timestamp execution_time, 510 const struct TALER_Amount *wire_fee) 511 { 512 struct Exchange *e = w->exchange; 513 const struct TALER_EXCHANGE_Keys *keys = e->keys; 514 struct TALER_WireFeeSet fees; 515 struct TALER_MasterSignatureP master_sig; 516 struct GNUNET_TIME_Timestamp start_date; 517 struct GNUNET_TIME_Timestamp end_date; 518 enum GNUNET_DB_QueryStatus qs; 519 char *wire_method; 520 521 if (NULL == keys) 522 { 523 GNUNET_break (0); 524 return GNUNET_NO; 525 } 526 wire_method = TALER_payto_get_method (w->payto_uri.full_payto); 527 qs = db_plugin->lookup_wire_fee (db_plugin->cls, 528 &keys->master_pub, 529 wire_method, 530 execution_time, 531 &fees, 532 &start_date, 533 &end_date, 534 &master_sig); 535 switch (qs) 536 { 537 case GNUNET_DB_STATUS_HARD_ERROR: 538 GNUNET_break (0); 539 GNUNET_free (wire_method); 540 return GNUNET_SYSERR; 541 case GNUNET_DB_STATUS_SOFT_ERROR: 542 GNUNET_free (wire_method); 543 return GNUNET_NO; 544 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 545 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 546 "Failed to find wire fee for `%s' and method `%s' at %s in DB, accepting blindly that the fee is %s\n", 547 TALER_B2S (&keys->master_pub), 548 wire_method, 549 GNUNET_TIME_timestamp2s (execution_time), 550 TALER_amount2s (wire_fee)); 551 GNUNET_free (wire_method); 552 return GNUNET_OK; 553 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 554 break; 555 } 556 if ( (GNUNET_OK != 557 TALER_amount_cmp_currency (&fees.wire, 558 wire_fee)) || 559 (0 > TALER_amount_cmp (&fees.wire, 560 wire_fee)) ) 561 { 562 GNUNET_break_op (0); 563 GNUNET_free (wire_method); 564 return GNUNET_SYSERR; /* expected_fee >= wire_fee */ 565 } 566 GNUNET_free (wire_method); 567 return GNUNET_OK; 568 } 569 570 571 /** 572 * Closure for #check_transfer() 573 */ 574 struct CheckTransferContext 575 { 576 577 /** 578 * Pointer to the detail that we are currently 579 * checking in #check_transfer(). 580 */ 581 const struct TALER_TrackTransferDetails *current_detail; 582 583 /** 584 * Which transaction detail are we currently looking at? 585 */ 586 unsigned int current_offset; 587 588 /** 589 * #GNUNET_NO if we did not find a matching coin. 590 * #GNUNET_SYSERR if we found a matching coin, but the amounts do not match. 591 * #GNUNET_OK if we did find a matching coin. 592 */ 593 enum GNUNET_GenericReturnValue check_transfer_result; 594 595 /** 596 * Set to error code, if any. 597 */ 598 enum TALER_ErrorCode ec; 599 600 /** 601 * Set to true if @e ec indicates a permanent failure. 602 */ 603 bool failure; 604 }; 605 606 607 /** 608 * This function checks that the information about the coin which 609 * was paid back by _this_ wire transfer matches what _we_ (the merchant) 610 * knew about this coin. 611 * 612 * @param cls closure with our `struct CheckTransferContext *` 613 * @param exchange_url URL of the exchange that issued @a coin_pub 614 * @param amount_with_fee amount the exchange will transfer for this coin 615 * @param deposit_fee fee the exchange will charge for this coin 616 * @param refund_fee fee the exchange will charge for refunding this coin 617 * @param wire_fee paid wire fee 618 * @param h_wire hash of merchant's wire details 619 * @param deposit_timestamp when did the exchange receive the deposit 620 * @param refund_deadline until when are refunds allowed 621 * @param exchange_sig signature by the exchange 622 * @param exchange_pub exchange signing key used for @a exchange_sig 623 */ 624 static void 625 check_transfer (void *cls, 626 const char *exchange_url, 627 const struct TALER_Amount *amount_with_fee, 628 const struct TALER_Amount *deposit_fee, 629 const struct TALER_Amount *refund_fee, 630 const struct TALER_Amount *wire_fee, 631 const struct TALER_MerchantWireHashP *h_wire, 632 struct GNUNET_TIME_Timestamp deposit_timestamp, 633 struct GNUNET_TIME_Timestamp refund_deadline, 634 const struct TALER_ExchangeSignatureP *exchange_sig, 635 const struct TALER_ExchangePublicKeyP *exchange_pub) 636 { 637 struct CheckTransferContext *ctc = cls; 638 const struct TALER_TrackTransferDetails *ttd = ctc->current_detail; 639 640 if (GNUNET_SYSERR == ctc->check_transfer_result) 641 { 642 GNUNET_break (0); 643 return; /* already had a serious issue; odd that we're called more than once as well... */ 644 } 645 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 646 "Checking coin with value %s\n", 647 TALER_amount2s (amount_with_fee)); 648 if ( (GNUNET_OK != 649 TALER_amount_cmp_currency (amount_with_fee, 650 &ttd->coin_value)) || 651 (0 != TALER_amount_cmp (amount_with_fee, 652 &ttd->coin_value)) ) 653 { 654 /* Disagreement between the exchange and us about how much this 655 coin is worth! */ 656 GNUNET_break_op (0); 657 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 658 "Disagreement about coin value %s\n", 659 TALER_amount2s (amount_with_fee)); 660 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 661 "Exchange gave it a value of %s\n", 662 TALER_amount2s (&ttd->coin_value)); 663 ctc->check_transfer_result = GNUNET_SYSERR; 664 /* Build the `TrackTransferConflictDetails` */ 665 ctc->ec = TALER_EC_MERCHANT_PRIVATE_POST_TRANSFERS_CONFLICTING_REPORTS; 666 ctc->failure = true; 667 /* FIXME-#9426: this should be reported to the auditor (once the auditor has an API for this) */ 668 return; 669 } 670 if ( (GNUNET_OK != 671 TALER_amount_cmp_currency (deposit_fee, 672 &ttd->coin_fee)) || 673 (0 != TALER_amount_cmp (deposit_fee, 674 &ttd->coin_fee)) ) 675 { 676 /* Disagreement between the exchange and us about how much this 677 coin is worth! */ 678 GNUNET_break_op (0); 679 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 680 "Expected fee is %s\n", 681 TALER_amount2s (&ttd->coin_fee)); 682 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 683 "Fee claimed by exchange is %s\n", 684 TALER_amount2s (deposit_fee)); 685 ctc->check_transfer_result = GNUNET_SYSERR; 686 /* Build the `TrackTransferConflictDetails` */ 687 ctc->ec = TALER_EC_MERCHANT_PRIVATE_POST_TRANSFERS_CONFLICTING_REPORTS; 688 ctc->failure = true; 689 /* FIXME-#9426: this should be reported to the auditor (once the auditor has an API for this) */ 690 return; 691 } 692 ctc->check_transfer_result = GNUNET_OK; 693 } 694 695 696 /** 697 * Function called with detailed wire transfer data, including all 698 * of the coin transactions that were combined into the wire transfer. 699 * 700 * @param cls closure a `struct Inquiry *` 701 * @param tgr response details 702 */ 703 static void 704 wire_transfer_cb (void *cls, 705 const struct TALER_EXCHANGE_GetTransfersResponse *tgr) 706 { 707 struct Inquiry *w = cls; 708 struct Exchange *e = w->exchange; 709 const struct TALER_EXCHANGE_TransferData *td = NULL; 710 711 e->exchange_inquiries--; 712 w->wdh = NULL; 713 if (EXCHANGE_INQUIRY_LIMIT - 1 == e->exchange_inquiries) 714 launch_inquiries_at_exchange (e); 715 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 716 "Got response code %u from exchange for GET /transfers/$WTID\n", 717 tgr->hr.http_status); 718 switch (tgr->hr.http_status) 719 { 720 case MHD_HTTP_OK: 721 td = &tgr->details.ok.td; 722 w->execution_time = td->execution_time; 723 e->transfer_delay = GNUNET_TIME_UNIT_ZERO; 724 break; 725 case MHD_HTTP_BAD_REQUEST: 726 case MHD_HTTP_FORBIDDEN: 727 case MHD_HTTP_NOT_FOUND: 728 found_problem = true; 729 update_transaction_status (w, 730 GNUNET_TIME_UNIT_FOREVER_ABS, 731 tgr->hr.http_status, 732 tgr->hr.ec, 733 tgr->hr.hint, 734 false); 735 end_inquiry (w); 736 return; 737 case MHD_HTTP_INTERNAL_SERVER_ERROR: 738 case MHD_HTTP_BAD_GATEWAY: 739 case MHD_HTTP_GATEWAY_TIMEOUT: 740 e->transfer_delay = GNUNET_TIME_STD_BACKOFF (e->transfer_delay); 741 update_transaction_status (w, 742 GNUNET_TIME_relative_to_absolute ( 743 e->transfer_delay), 744 tgr->hr.http_status, 745 tgr->hr.ec, 746 tgr->hr.hint, 747 true); 748 end_inquiry (w); 749 return; 750 default: 751 found_problem = true; 752 e->transfer_delay = GNUNET_TIME_STD_BACKOFF (e->transfer_delay); 753 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 754 "Unexpected HTTP status %u\n", 755 tgr->hr.http_status); 756 update_transaction_status (w, 757 GNUNET_TIME_relative_to_absolute ( 758 e->transfer_delay), 759 tgr->hr.http_status, 760 tgr->hr.ec, 761 tgr->hr.hint, 762 true); 763 end_inquiry (w); 764 return; 765 } 766 db_plugin->preflight (db_plugin->cls); 767 768 { 769 enum GNUNET_DB_QueryStatus qs; 770 771 qs = db_plugin->insert_transfer_details (db_plugin->cls, 772 w->instance_id, 773 w->exchange->exchange_url, 774 w->payto_uri, 775 &w->wtid, 776 td); 777 if (0 > qs) 778 { 779 /* Always report on DB error as well to enable diagnostics */ 780 GNUNET_break (GNUNET_DB_STATUS_HARD_ERROR == qs); 781 global_ret = EXIT_FAILURE; 782 GNUNET_SCHEDULER_shutdown (); 783 return; 784 } 785 // FIXME: insert_transfer_details has more complex 786 // error possibilities inside, expose them here 787 // and persist them with the transaction status 788 // if they arise (especially no_account, no_exchange, conflict) 789 // -- not sure how no_instance could happen... 790 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) 791 { 792 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 793 "Transfer already known. Ignoring duplicate.\n"); 794 return; 795 } 796 } 797 798 { 799 struct CheckTransferContext ctc = { 800 .ec = TALER_EC_NONE, 801 .failure = false 802 }; 803 804 for (unsigned int i = 0; i<td->details_length; i++) 805 { 806 const struct TALER_TrackTransferDetails *ttd = &td->details[i]; 807 enum GNUNET_DB_QueryStatus qs; 808 809 if (TALER_EC_NONE != ctc.ec) 810 break; /* already encountered an error */ 811 ctc.current_offset = i; 812 ctc.current_detail = ttd; 813 /* Set the coin as "never seen" before. */ 814 ctc.check_transfer_result = GNUNET_NO; 815 qs = db_plugin->lookup_deposits_by_contract_and_coin ( 816 db_plugin->cls, 817 w->instance_id, 818 &ttd->h_contract_terms, 819 &ttd->coin_pub, 820 &check_transfer, 821 &ctc); 822 switch (qs) 823 { 824 case GNUNET_DB_STATUS_SOFT_ERROR: 825 GNUNET_break (0); 826 ctc.ec = TALER_EC_GENERIC_DB_FETCH_FAILED; 827 break; 828 case GNUNET_DB_STATUS_HARD_ERROR: 829 GNUNET_break (0); 830 ctc.ec = TALER_EC_GENERIC_DB_FETCH_FAILED; 831 break; 832 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 833 /* The exchange says we made this deposit, but WE do not 834 recall making it (corrupted / unreliable database?)! 835 Well, let's say thanks and accept the money! */ 836 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 837 "Failed to find payment data in DB\n"); 838 ctc.check_transfer_result = GNUNET_OK; 839 break; 840 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 841 break; 842 } 843 switch (ctc.check_transfer_result) 844 { 845 case GNUNET_NO: 846 /* Internal error: how can we have called #check_transfer() 847 but still have no result? */ 848 GNUNET_break (0); 849 ctc.ec = TALER_EC_GENERIC_INTERNAL_INVARIANT_FAILURE; 850 return; 851 case GNUNET_SYSERR: 852 /* #check_transfer() failed, report conflict! */ 853 GNUNET_break_op (0); 854 GNUNET_assert (TALER_EC_NONE != ctc.ec); 855 break; 856 case GNUNET_OK: 857 break; 858 } 859 } 860 if (TALER_EC_NONE != ctc.ec) 861 { 862 update_transaction_status ( 863 w, 864 ctc.failure 865 ? GNUNET_TIME_UNIT_FOREVER_ABS 866 : GNUNET_TIME_relative_to_absolute ( 867 GNUNET_TIME_UNIT_MINUTES), 868 MHD_HTTP_OK, 869 ctc.ec, 870 NULL /* no hint */, 871 ! ctc.failure); 872 end_inquiry (w); 873 return; 874 } 875 } 876 877 if (GNUNET_SYSERR == 878 check_wire_fee (w, 879 td->execution_time, 880 &td->wire_fee)) 881 { 882 GNUNET_break_op (0); 883 update_transaction_status (w, 884 GNUNET_TIME_UNIT_FOREVER_ABS, 885 MHD_HTTP_OK, 886 TALER_EC_MERCHANT_PRIVATE_POST_TRANSFERS_BAD_WIRE_FEE, 887 TALER_amount2s (&td->wire_fee), 888 false); 889 end_inquiry (w); 890 return; 891 } 892 893 { 894 enum GNUNET_DB_QueryStatus qs; 895 896 qs = db_plugin->finalize_transfer_status (db_plugin->cls, 897 w->exchange->exchange_url, 898 &w->wtid, 899 &td->h_details, 900 &td->total_amount, 901 &td->wire_fee, 902 &td->exchange_pub, 903 &td->exchange_sig); 904 if (qs < 0) 905 { 906 GNUNET_break (0); 907 global_ret = EXIT_FAILURE; 908 GNUNET_SCHEDULER_shutdown (); 909 return; 910 } 911 } 912 end_inquiry (w); 913 } 914 915 916 /** 917 * Initiate download from an exchange for a given inquiry. 918 * 919 * @param cls a `struct Inquiry *` 920 */ 921 static void 922 exchange_request (void *cls) 923 { 924 struct Inquiry *w = cls; 925 struct Exchange *e = w->exchange; 926 927 w->task = NULL; 928 if (NULL == e->keys) 929 return; 930 w->wdh = TALER_EXCHANGE_get_transfers_create ( 931 ctx, 932 e->exchange_url, 933 e->keys, 934 &w->wtid); 935 if (NULL == w->wdh) 936 { 937 GNUNET_break (0); 938 e->exchange_inquiries--; 939 e->transfer_delay = GNUNET_TIME_STD_BACKOFF (e->transfer_delay); 940 update_transaction_status (w, 941 GNUNET_TIME_relative_to_absolute ( 942 e->transfer_delay), 943 0 /* failed to begin */, 944 TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_TRANSIENT_FAILURE, 945 "Failed to initiate GET request at exchange", 946 true); 947 end_inquiry (w); 948 return; 949 } 950 GNUNET_assert (TALER_EC_NONE == 951 TALER_EXCHANGE_get_transfers_start (w->wdh, 952 &wire_transfer_cb, 953 w)); 954 955 /* Wait at least 1m for the network transfer */ 956 update_transaction_status (w, 957 GNUNET_TIME_relative_to_absolute ( 958 GNUNET_TIME_UNIT_MINUTES), 959 0 /* timeout */, 960 TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_AWAITING_LIST, 961 "Initiated GET with exchange", 962 true); 963 } 964 965 966 /** 967 * Function called with information about a transfer we 968 * should ask the exchange about. 969 * 970 * @param cls closure (NULL) 971 * @param rowid row of the transfer in the merchant database 972 * @param instance_id instance that received the transfer 973 * @param exchange_url base URL of the exchange that initiated the transfer 974 * @param payto_uri account of the merchant that received the transfer 975 * @param wtid wire transfer subject identifying the aggregation 976 * @param next_attempt when should we next try to interact with the exchange 977 */ 978 static void 979 start_inquiry ( 980 void *cls, 981 uint64_t rowid, 982 const char *instance_id, 983 const char *exchange_url, 984 struct TALER_FullPayto payto_uri, 985 const struct TALER_WireTransferIdentifierRawP *wtid, 986 struct GNUNET_TIME_Absolute next_attempt) 987 { 988 struct Exchange *e; 989 struct Inquiry *w; 990 991 (void) cls; 992 if (GNUNET_TIME_absolute_is_future (next_attempt)) 993 { 994 if (NULL == task) 995 task = GNUNET_SCHEDULER_add_at (next_attempt, 996 &find_work, 997 NULL); 998 return; 999 } 1000 active_inquiries++; 1001 1002 e = find_exchange (exchange_url); 1003 for (w = e->w_head; NULL != w; w = w->next) 1004 { 1005 if (0 == GNUNET_memcmp (&w->wtid, 1006 wtid)) 1007 { 1008 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 1009 "Already processing inquiry. Aborting ongoing inquiry\n"); 1010 end_inquiry (w); 1011 break; 1012 } 1013 } 1014 1015 w = GNUNET_new (struct Inquiry); 1016 w->payto_uri.full_payto = GNUNET_strdup (payto_uri.full_payto); 1017 w->instance_id = GNUNET_strdup (instance_id); 1018 w->rowid = rowid; 1019 w->wtid = *wtid; 1020 GNUNET_CONTAINER_DLL_insert (e->w_head, 1021 e->w_tail, 1022 w); 1023 w->exchange = e; 1024 if (NULL != w->exchange->keys) 1025 w->task = GNUNET_SCHEDULER_add_now (&exchange_request, 1026 w); 1027 /* Wait at least 1 minute for /keys */ 1028 update_transaction_status (w, 1029 GNUNET_TIME_relative_to_absolute ( 1030 GNUNET_TIME_UNIT_MINUTES), 1031 0 /* timeout */, 1032 TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_AWAITING_KEYS, 1033 exchange_url, 1034 true); 1035 } 1036 1037 1038 static void 1039 find_work (void *cls) 1040 { 1041 enum GNUNET_DB_QueryStatus qs; 1042 int limit; 1043 1044 (void) cls; 1045 task = NULL; 1046 GNUNET_assert (OPEN_INQUIRY_LIMIT >= active_inquiries); 1047 limit = OPEN_INQUIRY_LIMIT - active_inquiries; 1048 if (0 == limit) 1049 { 1050 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1051 "Not looking for work: at limit\n"); 1052 at_limit = true; 1053 return; 1054 } 1055 at_limit = false; 1056 qs = db_plugin->select_open_transfers (db_plugin->cls, 1057 limit, 1058 &start_inquiry, 1059 NULL); 1060 if (qs < 0) 1061 { 1062 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1063 "Failed to obtain open transfers from database\n"); 1064 GNUNET_SCHEDULER_shutdown (); 1065 return; 1066 } 1067 if (qs >= limit) 1068 { 1069 /* DB limited response, re-trigger DB interaction 1070 the moment we significantly fall below the 1071 limit */ 1072 at_limit = true; 1073 } 1074 if (0 == active_inquiries) 1075 { 1076 if (test_mode) 1077 { 1078 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1079 "No more open inquiries and in test mode. Existing.\n"); 1080 GNUNET_SCHEDULER_shutdown (); 1081 return; 1082 } 1083 GNUNET_log ( 1084 GNUNET_ERROR_TYPE_INFO, 1085 "No open inquiries found, waiting for notification to resume\n"); 1086 } 1087 } 1088 1089 1090 /** 1091 * Function called when transfers are added to the merchant database. We look 1092 * for more work. 1093 * 1094 * @param cls closure (NULL) 1095 * @param extra additional event data provided 1096 * @param extra_size number of bytes in @a extra 1097 */ 1098 static void 1099 transfer_added (void *cls, 1100 const void *extra, 1101 size_t extra_size) 1102 { 1103 (void) cls; 1104 (void) extra; 1105 (void) extra_size; 1106 if (active_inquiries > OPEN_INQUIRY_LIMIT / 2) 1107 { 1108 /* Trigger DB only once we are substantially below the limit */ 1109 at_limit = true; 1110 return; 1111 } 1112 if (NULL != task) 1113 return; 1114 task = GNUNET_SCHEDULER_add_now (&find_work, 1115 NULL); 1116 } 1117 1118 1119 /** 1120 * Function called when keys were changed in the 1121 * merchant database. Updates ours. 1122 * 1123 * @param cls closure (NULL) 1124 * @param extra additional event data provided 1125 * @param extra_size number of bytes in @a extra 1126 */ 1127 static void 1128 keys_changed (void *cls, 1129 const void *extra, 1130 size_t extra_size) 1131 { 1132 const char *url = extra; 1133 struct Exchange *e; 1134 1135 (void) cls; 1136 if ( (NULL == extra) || 1137 (0 == extra_size) ) 1138 { 1139 GNUNET_break (0); 1140 return; 1141 } 1142 if ('\0' != url[extra_size - 1]) 1143 { 1144 GNUNET_break (0); 1145 return; 1146 } 1147 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 1148 "Received keys change notification: reload `%s'\n", 1149 url); 1150 e = find_exchange (url); 1151 sync_keys (e); 1152 } 1153 1154 1155 /** 1156 * First task. 1157 * 1158 * @param cls closure, NULL 1159 * @param args remaining command-line arguments 1160 * @param cfgfile name of the configuration file used (for saving, can be NULL!) 1161 * @param c configuration 1162 */ 1163 static void 1164 run (void *cls, 1165 char *const *args, 1166 const char *cfgfile, 1167 const struct GNUNET_CONFIGURATION_Handle *c) 1168 { 1169 (void) args; 1170 (void) cfgfile; 1171 1172 cfg = c; 1173 GNUNET_SCHEDULER_add_shutdown (&shutdown_task, 1174 NULL); 1175 ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule, 1176 &rc); 1177 rc = GNUNET_CURL_gnunet_rc_create (ctx); 1178 if (NULL == ctx) 1179 { 1180 GNUNET_break (0); 1181 GNUNET_SCHEDULER_shutdown (); 1182 global_ret = EXIT_FAILURE; 1183 return; 1184 } 1185 if (NULL == 1186 (db_plugin = TALER_MERCHANTDB_plugin_load (cfg))) 1187 { 1188 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1189 "Failed to initialize DB subsystem\n"); 1190 GNUNET_SCHEDULER_shutdown (); 1191 global_ret = EXIT_NOTCONFIGURED; 1192 return; 1193 } 1194 if (GNUNET_OK != 1195 db_plugin->connect (db_plugin->cls)) 1196 { 1197 GNUNET_log (GNUNET_ERROR_TYPE_ERROR, 1198 "Failed to connect to database. Consider running taler-merchant-dbinit!\n"); 1199 GNUNET_SCHEDULER_shutdown (); 1200 global_ret = EXIT_FAILURE; 1201 return; 1202 } 1203 { 1204 struct GNUNET_DB_EventHeaderP es = { 1205 .size = htons (sizeof (es)), 1206 .type = htons (TALER_DBEVENT_MERCHANT_WIRE_TRANSFER_EXPECTED) 1207 }; 1208 1209 eh = db_plugin->event_listen (db_plugin->cls, 1210 &es, 1211 GNUNET_TIME_UNIT_FOREVER_REL, 1212 &transfer_added, 1213 NULL); 1214 } 1215 { 1216 struct GNUNET_DB_EventHeaderP es = { 1217 .size = htons (sizeof (es)), 1218 .type = htons (TALER_DBEVENT_MERCHANT_EXCHANGE_KEYS) 1219 }; 1220 1221 eh_keys 1222 = db_plugin->event_listen (db_plugin->cls, 1223 &es, 1224 GNUNET_TIME_UNIT_FOREVER_REL, 1225 &keys_changed, 1226 NULL); 1227 } 1228 1229 GNUNET_assert (NULL == task); 1230 task = GNUNET_SCHEDULER_add_now (&find_work, 1231 NULL); 1232 } 1233 1234 1235 /** 1236 * The main function of taler-merchant-reconciliation 1237 * 1238 * @param argc number of arguments from the command line 1239 * @param argv command line arguments 1240 * @return 0 ok, 1 on error 1241 */ 1242 int 1243 main (int argc, 1244 char *const *argv) 1245 { 1246 struct GNUNET_GETOPT_CommandLineOption options[] = { 1247 GNUNET_GETOPT_option_timetravel ('T', 1248 "timetravel"), 1249 GNUNET_GETOPT_option_flag ('t', 1250 "test", 1251 "run in test mode and exit when idle", 1252 &test_mode), 1253 GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION), 1254 GNUNET_GETOPT_OPTION_END 1255 }; 1256 enum GNUNET_GenericReturnValue ret; 1257 1258 ret = GNUNET_PROGRAM_run ( 1259 TALER_MERCHANT_project_data (), 1260 argc, argv, 1261 "taler-merchant-reconciliation", 1262 gettext_noop ( 1263 "background process that reconciles bank transfers with orders by asking the exchange"), 1264 options, 1265 &run, NULL); 1266 if (GNUNET_SYSERR == ret) 1267 return EXIT_INVALIDARGUMENT; 1268 if (GNUNET_NO == ret) 1269 return EXIT_SUCCESS; 1270 if ( (found_problem) && 1271 (0 == global_ret) ) 1272 global_ret = 7; 1273 return global_ret; 1274 } 1275 1276 1277 /* end of taler-merchant-reconciliation.c */