merchant

Merchant backend to process payments, run by merchants
Log | Files | Refs | Submodules | README | LICENSE

taler-merchant-reconciliation.c (37602B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2023-2025 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 /**
     17  * @file src/backend/taler-merchant-reconciliation.c
     18  * @brief Process that reconciles information about incoming bank transfers with orders by asking the exchange
     19  * @author Christian Grothoff
     20  */
     21 #include "platform.h"
     22 struct Inquiry;
     23 #define TALER_EXCHANGE_GET_TRANSFERS_RESULT_CLOSURE struct Inquiry
     24 #include "microhttpd.h"
     25 #include <gnunet/gnunet_util_lib.h>
     26 #include <jansson.h>
     27 #include <pthread.h>
     28 #include <taler/taler_dbevents.h>
     29 #include "taler/taler_merchant_util.h"
     30 #include "taler/taler_merchant_bank_lib.h"
     31 #include "merchantdb_lib.h"
     32 #include "merchantdb_lib.h"
     33 #include "merchant-database/finalize_transfer_status.h"
     34 #include "merchant-database/insert_transfer_details.h"
     35 #include "merchant-database/lookup_deposits_by_contract_and_coin.h"
     36 #include "merchant-database/lookup_wire_fee.h"
     37 #include "merchant-database/select_exchange_keys.h"
     38 #include "merchant-database/select_open_transfers.h"
     39 #include "merchant-database/set_instance.h"
     40 #include "merchant-database/update_transfer_status.h"
     41 #include "merchant-database/event_listen.h"
     42 #include "merchant-database/preflight.h"
     43 
/**
 * Timeout for the exchange interaction.  Rather long as we should do
 * long-polling and do not want to wake up too often.
 * Evaluates to 30 times #GNUNET_TIME_UNIT_MINUTES.
 */
#define EXCHANGE_TIMEOUT GNUNET_TIME_relative_multiply ( \
          GNUNET_TIME_UNIT_MINUTES, \
          30)

/**
 * How many inquiries do we process concurrently at most (across all
 * exchanges).  Once the number of active inquiries drops below half of
 * this value while #at_limit is set, #end_inquiry() re-schedules
 * #find_work().
 */
#define OPEN_INQUIRY_LIMIT 1024

/**
 * How many inquiries do we process concurrently per exchange at most.
 * Enforced by #launch_inquiries_at_exchange() against the exchange's
 * `exchange_inquiries` counter.
 */
#define EXCHANGE_INQUIRY_LIMIT 16
     61 
     62 
     63 /**
     64  * Information about an inquiry job.
     65  */
     66 struct Inquiry;
     67 
     68 
/**
 * Information about an exchange, including the list of inquiries
 * that are directed at it.
 */
struct Exchange
{
  /**
   * Kept in a DLL (anchored at #e_head / #e_tail).
   */
  struct Exchange *next;

  /**
   * Kept in a DLL (anchored at #e_head / #e_tail).
   */
  struct Exchange *prev;

  /**
   * Head of active inquiries.
   */
  struct Inquiry *w_head;

  /**
   * Tail of active inquiries.
   */
  struct Inquiry *w_tail;

  /**
   * Which exchange are we tracking here (base URL, owned by this struct
   * and released in #shutdown_task()).
   */
  char *exchange_url;

  /**
   * The keys of this exchange, as last loaded from the database by
   * #sync_keys(); NULL if we do not have them.
   */
  struct TALER_EXCHANGE_Keys *keys;

  /**
   * How many active inquiries do we have right now with this exchange.
   * Incremented when a request task is scheduled, decremented when the
   * exchange's response arrives.
   */
  unsigned int exchange_inquiries;

  /**
   * How long should we wait between requests
   * for transfer details?  Reset to zero on a successful response and
   * increased via #GNUNET_TIME_STD_BACKOFF on server-side failures.
   */
  struct GNUNET_TIME_Relative transfer_delay;

};
    116 
    117 
/**
 * Information about an inquiry job: one wire transfer for which we
 * ask the exchange for the transfer details.
 */
struct Inquiry
{
  /**
   * Kept in a DLL (anchored at the exchange's w_head / w_tail).
   */
  struct Inquiry *next;

  /**
   * Kept in a DLL (anchored at the exchange's w_head / w_tail).
   */
  struct Inquiry *prev;

  /**
   * Handle to the exchange that made the transfer.
   */
  struct Exchange *exchange;

  /**
   * Task where we retry fetching transfer details from the exchange.
   * Set by #launch_inquiries_at_exchange() when #exchange_request()
   * is scheduled for this inquiry.
   */
  struct GNUNET_SCHEDULER_Task *task;

  /**
   * For which merchant instance is this tracking request?
   * Owned by this struct, released in #end_inquiry().
   */
  char *instance_id;

  /**
   * payto:// URI used for the transfer.
   * The contained string is released in #end_inquiry().
   */
  struct TALER_FullPayto payto_uri;

  /**
   * Handle for the GET /transfers request; NULL while no request
   * is in flight.
   */
  struct TALER_EXCHANGE_GetTransfersHandle *wdh;

  /**
   * When did the transfer happen?  Filled in from the exchange's
   * response once we got an HTTP 200 reply.
   */
  struct GNUNET_TIME_Timestamp execution_time;

  /**
   * Wire transfer identifier; argument for the
   * GET /transfers/$WTID request.
   */
  struct TALER_WireTransferIdentifierRawP wtid;

  /**
   * Row of the wire transfer in our database.
   */
  uint64_t rowid;

};
    174 
    175 
/**
 * Head of known exchanges.
 */
static struct Exchange *e_head;

/**
 * Tail of known exchanges.
 */
static struct Exchange *e_tail;

/**
 * The merchant's configuration.
 */
static const struct GNUNET_CONFIGURATION_Handle *cfg;

/**
 * Our database connection.
 */
static struct TALER_MERCHANTDB_PostgresContext *pg;

/**
 * CURL context used for our HTTP requests; destroyed via
 * GNUNET_CURL_fini() during shutdown.
 * NOTE(review): this comment used to say "interacting with the bank",
 * but the HTTP interaction visible in this file is with the exchange --
 * confirm.
 */
static struct GNUNET_CURL_Context *ctx;

/**
 * Scheduler context for running the @e ctx.
 */
static struct GNUNET_CURL_RescheduleContext *rc;

/**
 * Main task for #find_work(); NULL if not scheduled.
 */
static struct GNUNET_SCHEDULER_Task *task;

/**
 * Event handler to learn that there are new transfers
 * to check.
 */
static struct GNUNET_DB_EventHandler *eh;

/**
 * Event handler to learn that there may be new exchange
 * keys to check.
 */
static struct GNUNET_DB_EventHandler *eh_keys;

/**
 * How many active inquiries do we have right now
 * (over all exchanges).
 */
static unsigned int active_inquiries;

/**
 * Set to true if we ever encountered any problem
 * (for example, an unexpected HTTP status from an exchange).
 */
static bool found_problem;

/**
 * Value to return from main(). 0 on success, non-zero on errors.
 */
static int global_ret;

/**
 * #GNUNET_YES if we are in test mode and should exit when idle.
 */
static int test_mode;

/**
 * True if the last DB query was limited by the
 * #OPEN_INQUIRY_LIMIT and we thus should check again
 * as soon as we are substantially below that limit,
 * and not only when we get a DB notification.
 * Cleared by #end_inquiry() when it re-schedules #find_work().
 */
static bool at_limit;
    250 
    251 
    252 /**
    253  * Initiate download from exchange.
    254  *
    255  * @param cls a `struct Inquiry *`
    256  */
    257 static void
    258 exchange_request (void *cls);
    259 
    260 
    261 /**
    262  * The exchange @a e is ready to handle more inquiries,
    263  * prepare to launch them.
    264  *
    265  * @param[in,out] e exchange to potentially launch inquiries on
    266  */
    267 static void
    268 launch_inquiries_at_exchange (struct Exchange *e)
    269 {
    270   for (struct Inquiry *w = e->w_head;
    271        NULL != w;
    272        w = w->next)
    273   {
    274     if (e->exchange_inquiries >= EXCHANGE_INQUIRY_LIMIT)
    275       break;
    276     if ( (NULL == w->task) &&
    277          (NULL == w->wdh) )
    278     {
    279       e->exchange_inquiries++;
    280       w->task = GNUNET_SCHEDULER_add_now (&exchange_request,
    281                                           w);
    282     }
    283   }
    284 }
    285 
    286 
    287 /**
    288  * Updates the transaction status for inquiry @a w to the given values.
    289  *
    290  * @param w inquiry to update status for
    291  * @param next_attempt when should we retry @a w (if ever)
    292  * @param http_status HTTP status of the response
    293  * @param ec error code to use (if any)
    294  * @param last_hint hint delivered with the response (if any, possibly NULL)
    295  * @param needs_retry true if we should try the HTTP request again
    296  */
    297 static void
    298 update_transaction_status (const struct Inquiry *w,
    299                            struct GNUNET_TIME_Absolute next_attempt,
    300                            unsigned int http_status,
    301                            enum TALER_ErrorCode ec,
    302                            const char *last_hint,
    303                            bool needs_retry)
    304 {
    305   enum GNUNET_DB_QueryStatus qs;
    306 
    307   qs = TALER_MERCHANTDB_set_instance (pg,
    308                                       w->instance_id);
    309   if (qs <= 0)
    310   {
    311     GNUNET_break (0);
    312     global_ret = EXIT_FAILURE;
    313     GNUNET_SCHEDULER_shutdown ();
    314     return;
    315   }
    316   qs = TALER_MERCHANTDB_update_transfer_status (pg,
    317                                                 w->exchange->exchange_url,
    318                                                 &w->wtid,
    319                                                 next_attempt,
    320                                                 http_status,
    321                                                 ec,
    322                                                 last_hint,
    323                                                 needs_retry);
    324   GNUNET_break (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT ==
    325                 TALER_MERCHANTDB_set_instance (
    326                   pg,
    327                   NULL));
    328   if (qs < 0)
    329   {
    330     GNUNET_break (0);
    331     global_ret = EXIT_FAILURE;
    332     GNUNET_SCHEDULER_shutdown ();
    333     return;
    334   }
    335 }
    336 
    337 
    338 /**
    339  * Interact with the database to get the current set
    340  * of exchange keys known to us.
    341  *
    342  * @param e the exchange to check
    343  */
    344 static void
    345 sync_keys (struct Exchange *e)
    346 {
    347   enum GNUNET_DB_QueryStatus qs;
    348   struct TALER_EXCHANGE_Keys *keys;
    349   struct GNUNET_TIME_Absolute first_retry;
    350 
    351   qs = TALER_MERCHANTDB_select_exchange_keys (pg,
    352                                               e->exchange_url,
    353                                               &first_retry,
    354                                               &keys);
    355   if (qs < 0)
    356   {
    357     GNUNET_break (0);
    358     return;
    359   }
    360   if ( (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) ||
    361        (NULL == keys) )
    362   {
    363     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
    364                 "Cannot launch inquiries at `%s': lacking /keys response\n",
    365                 e->exchange_url);
    366     return;
    367   }
    368   TALER_EXCHANGE_keys_decref (e->keys);
    369   e->keys = keys;
    370   launch_inquiries_at_exchange (e);
    371 }
    372 
    373 
    374 /**
    375  * Lookup our internal data structure for the given
    376  * @a exchange_url or create one if we do not yet have
    377  * one.
    378  *
    379  * @param exchange_url base URL of the exchange
    380  * @return our state for this exchange
    381  */
    382 static struct Exchange *
    383 find_exchange (const char *exchange_url)
    384 {
    385   struct Exchange *e;
    386 
    387   for (e = e_head; NULL != e; e = e->next)
    388     if (0 == strcmp (exchange_url,
    389                      e->exchange_url))
    390       return e;
    391   e = GNUNET_new (struct Exchange);
    392   e->exchange_url = GNUNET_strdup (exchange_url);
    393   GNUNET_CONTAINER_DLL_insert (e_head,
    394                                e_tail,
    395                                e);
    396   sync_keys (e);
    397   return e;
    398 }
    399 
    400 
    401 /**
    402  * Finds new transfers that require work in the merchant database.
    403  *
    404  * @param cls NULL
    405  */
    406 static void
    407 find_work (void *cls);
    408 
    409 
    410 /**
    411  * Free resources of @a w.
    412  *
    413  * @param[in] w inquiry job to terminate
    414  */
    415 static void
    416 end_inquiry (struct Inquiry *w)
    417 {
    418   struct Exchange *e = w->exchange;
    419 
    420   GNUNET_assert (active_inquiries > 0);
    421   active_inquiries--;
    422   if (NULL != w->wdh)
    423   {
    424     TALER_EXCHANGE_get_transfers_cancel (w->wdh);
    425     w->wdh = NULL;
    426   }
    427   GNUNET_free (w->instance_id);
    428   GNUNET_free (w->payto_uri.full_payto);
    429   GNUNET_CONTAINER_DLL_remove (e->w_head,
    430                                e->w_tail,
    431                                w);
    432   GNUNET_free (w);
    433   if ( (active_inquiries < OPEN_INQUIRY_LIMIT / 2) &&
    434        (NULL == task) &&
    435        (at_limit) )
    436   {
    437     at_limit = false;
    438     GNUNET_assert (NULL == task);
    439     task = GNUNET_SCHEDULER_add_now (&find_work,
    440                                      NULL);
    441   }
    442   if ( (NULL == task) &&
    443        (! at_limit) &&
    444        (0 == active_inquiries) &&
    445        (test_mode) )
    446   {
    447     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    448                 "No more open inquiries and in test mode. Exiting.\n");
    449     GNUNET_SCHEDULER_shutdown ();
    450     return;
    451   }
    452 }
    453 
    454 
    455 /**
    456  * We're being aborted with CTRL-C (or SIGTERM). Shut down.
    457  *
    458  * @param cls closure (NULL)
    459  */
    460 static void
    461 shutdown_task (void *cls)
    462 {
    463   (void) cls;
    464   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    465               "Running shutdown\n");
    466   while (NULL != e_head)
    467   {
    468     struct Exchange *e = e_head;
    469 
    470     while (NULL != e->w_head)
    471     {
    472       struct Inquiry *w = e->w_head;
    473 
    474       end_inquiry (w);
    475     }
    476     GNUNET_free (e->exchange_url);
    477     if (NULL != e->keys)
    478     {
    479       TALER_EXCHANGE_keys_decref (e->keys);
    480       e->keys = NULL;
    481     }
    482     GNUNET_CONTAINER_DLL_remove (e_head,
    483                                  e_tail,
    484                                  e);
    485     GNUNET_free (e);
    486   }
    487   if (NULL != eh)
    488   {
    489     TALER_MERCHANTDB_event_listen_cancel (eh);
    490     eh = NULL;
    491   }
    492   if (NULL != eh_keys)
    493   {
    494     TALER_MERCHANTDB_event_listen_cancel (eh_keys);
    495     eh_keys = NULL;
    496   }
    497   if (NULL != task)
    498   {
    499     GNUNET_SCHEDULER_cancel (task);
    500     task = NULL;
    501   }
    502   if (NULL != pg)
    503   {
    504     TALER_MERCHANTDB_disconnect (pg);
    505     pg = NULL;
    506   }
    507   cfg = NULL;
    508   if (NULL != ctx)
    509   {
    510     GNUNET_CURL_fini (ctx);
    511     ctx = NULL;
    512   }
    513   if (NULL != rc)
    514   {
    515     GNUNET_CURL_gnunet_rc_destroy (rc);
    516     rc = NULL;
    517   }
    518 }
    519 
    520 
    521 /**
    522  * Check that the given @a wire_fee is what the @a e should charge
    523  * at the @a execution_time.  If the fee is correct (according to our
    524  * database), return #GNUNET_OK.  If we do not have the fee structure in our
    525  * DB, we just accept it and return #GNUNET_NO; if we have proof that the fee
    526  * is bogus, we respond with the proof to the client and return
    527  * #GNUNET_SYSERR.
    528  *
    529  * @param w inquiry to check fees of
    530  * @param execution_time time of the wire transfer
    531  * @param wire_fee fee claimed by the exchange
    532  * @return #GNUNET_SYSERR if we returned hard proof of
    533  *   missbehavior from the exchange to the client
    534  */
    535 static enum GNUNET_GenericReturnValue
    536 check_wire_fee (struct Inquiry *w,
    537                 struct GNUNET_TIME_Timestamp execution_time,
    538                 const struct TALER_Amount *wire_fee)
    539 {
    540   struct Exchange *e = w->exchange;
    541   const struct TALER_EXCHANGE_Keys *keys = e->keys;
    542   struct TALER_WireFeeSet fees;
    543   struct TALER_MasterSignatureP master_sig;
    544   struct GNUNET_TIME_Timestamp start_date;
    545   struct GNUNET_TIME_Timestamp end_date;
    546   enum GNUNET_DB_QueryStatus qs;
    547   char *wire_method;
    548 
    549   if (NULL == keys)
    550   {
    551     GNUNET_break (0);
    552     return GNUNET_NO;
    553   }
    554   wire_method = TALER_payto_get_method (w->payto_uri.full_payto);
    555   qs = TALER_MERCHANTDB_lookup_wire_fee (pg,
    556                                          &keys->master_pub,
    557                                          wire_method,
    558                                          execution_time,
    559                                          &fees,
    560                                          &start_date,
    561                                          &end_date,
    562                                          &master_sig);
    563   switch (qs)
    564   {
    565   case GNUNET_DB_STATUS_HARD_ERROR:
    566     GNUNET_break (0);
    567     GNUNET_free (wire_method);
    568     return GNUNET_SYSERR;
    569   case GNUNET_DB_STATUS_SOFT_ERROR:
    570     GNUNET_free (wire_method);
    571     return GNUNET_NO;
    572   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    573     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
    574                 "Failed to find wire fee for `%s' and method `%s' at %s in DB, accepting blindly that the fee is %s\n",
    575                 TALER_B2S (&keys->master_pub),
    576                 wire_method,
    577                 GNUNET_TIME_timestamp2s (execution_time),
    578                 TALER_amount2s (wire_fee));
    579     GNUNET_free (wire_method);
    580     return GNUNET_OK;
    581   case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    582     break;
    583   }
    584   if ( (GNUNET_OK !=
    585         TALER_amount_cmp_currency (&fees.wire,
    586                                    wire_fee)) ||
    587        (0 > TALER_amount_cmp (&fees.wire,
    588                               wire_fee)) )
    589   {
    590     GNUNET_break_op (0);
    591     GNUNET_free (wire_method);
    592     return GNUNET_SYSERR;   /* expected_fee >= wire_fee */
    593   }
    594   GNUNET_free (wire_method);
    595   return GNUNET_OK;
    596 }
    597 
    598 
/**
 * Closure for #check_transfer(), also used to report the result of
 * the check back to the caller.
 */
struct CheckTransferContext
{

  /**
   * Pointer to the detail that we are currently
   * checking in #check_transfer().
   */
  const struct TALER_TrackTransferDetails *current_detail;

  /**
   * Which transaction detail are we currently looking at?
   * (Index of @e current_detail in the exchange's list of details.)
   */
  unsigned int current_offset;

  /**
   * #GNUNET_NO if we did not find a matching coin.
   * #GNUNET_SYSERR if we found a matching coin, but the amounts do not match.
   * #GNUNET_OK if we did find a matching coin.
   */
  enum GNUNET_GenericReturnValue check_transfer_result;

  /**
   * Set to error code, if any; #TALER_EC_NONE while no error was found.
   */
  enum TALER_ErrorCode ec;

  /**
   * Set to true if @e ec indicates a permanent failure
   * (that is, retrying will not help).
   */
  bool failure;
};
    633 
    634 
    635 /**
    636  * This function checks that the information about the coin which
    637  * was paid back by _this_ wire transfer matches what _we_ (the merchant)
    638  * knew about this coin.
    639  *
    640  * @param cls closure with our `struct CheckTransferContext  *`
    641  * @param exchange_url URL of the exchange that issued @a coin_pub
    642  * @param amount_with_fee amount the exchange will transfer for this coin
    643  * @param deposit_fee fee the exchange will charge for this coin
    644  * @param refund_fee fee the exchange will charge for refunding this coin
    645  * @param wire_fee paid wire fee
    646  * @param h_wire hash of merchant's wire details
    647  * @param deposit_timestamp when did the exchange receive the deposit
    648  * @param refund_deadline until when are refunds allowed
    649  * @param exchange_sig signature by the exchange
    650  * @param exchange_pub exchange signing key used for @a exchange_sig
    651  */
    652 static void
    653 check_transfer (void *cls,
    654                 const char *exchange_url,
    655                 const struct TALER_Amount *amount_with_fee,
    656                 const struct TALER_Amount *deposit_fee,
    657                 const struct TALER_Amount *refund_fee,
    658                 const struct TALER_Amount *wire_fee,
    659                 const struct TALER_MerchantWireHashP *h_wire,
    660                 struct GNUNET_TIME_Timestamp deposit_timestamp,
    661                 struct GNUNET_TIME_Timestamp refund_deadline,
    662                 const struct TALER_ExchangeSignatureP *exchange_sig,
    663                 const struct TALER_ExchangePublicKeyP *exchange_pub)
    664 {
    665   struct CheckTransferContext *ctc = cls;
    666   const struct TALER_TrackTransferDetails *ttd = ctc->current_detail;
    667 
    668   if (GNUNET_SYSERR == ctc->check_transfer_result)
    669   {
    670     GNUNET_break (0);
    671     return;   /* already had a serious issue; odd that we're called more than once as well... */
    672   }
    673   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    674               "Checking coin with value %s\n",
    675               TALER_amount2s (amount_with_fee));
    676   if ( (GNUNET_OK !=
    677         TALER_amount_cmp_currency (amount_with_fee,
    678                                    &ttd->coin_value)) ||
    679        (0 != TALER_amount_cmp (amount_with_fee,
    680                                &ttd->coin_value)) )
    681   {
    682     /* Disagreement between the exchange and us about how much this
    683        coin is worth! */
    684     GNUNET_break_op (0);
    685     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
    686                 "Disagreement about coin value %s\n",
    687                 TALER_amount2s (amount_with_fee));
    688     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
    689                 "Exchange gave it a value of %s\n",
    690                 TALER_amount2s (&ttd->coin_value));
    691     ctc->check_transfer_result = GNUNET_SYSERR;
    692     /* Build the `TrackTransferConflictDetails` */
    693     ctc->ec = TALER_EC_MERCHANT_PRIVATE_POST_TRANSFERS_CONFLICTING_REPORTS;
    694     ctc->failure = true;
    695     /* FIXME-#9426: this should be reported to the auditor (once the auditor has an API for this) */
    696     return;
    697   }
    698   if ( (GNUNET_OK !=
    699         TALER_amount_cmp_currency (deposit_fee,
    700                                    &ttd->coin_fee)) ||
    701        (0 != TALER_amount_cmp (deposit_fee,
    702                                &ttd->coin_fee)) )
    703   {
    704     /* Disagreement between the exchange and us about how much this
    705        coin is worth! */
    706     GNUNET_break_op (0);
    707     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
    708                 "Expected fee is %s\n",
    709                 TALER_amount2s (&ttd->coin_fee));
    710     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
    711                 "Fee claimed by exchange is %s\n",
    712                 TALER_amount2s (deposit_fee));
    713     ctc->check_transfer_result = GNUNET_SYSERR;
    714     /* Build the `TrackTransferConflictDetails` */
    715     ctc->ec = TALER_EC_MERCHANT_PRIVATE_POST_TRANSFERS_CONFLICTING_REPORTS;
    716     ctc->failure = true;
    717     /* FIXME-#9426: this should be reported to the auditor (once the auditor has an API for this) */
    718     return;
    719   }
    720   ctc->check_transfer_result = GNUNET_OK;
    721 }
    722 
    723 
    724 /**
    725  * Function called with detailed wire transfer data, including all
    726  * of the coin transactions that were combined into the wire transfer.
    727  *
    728  * @param cls closure a `struct Inquiry *`
    729  * @param tgr response details
    730  */
    731 static void
    732 wire_transfer_cb (struct Inquiry *w,
    733                   const struct TALER_EXCHANGE_GetTransfersResponse *tgr)
    734 {
    735   struct Exchange *e = w->exchange;
    736   const struct TALER_EXCHANGE_TransferData *td = NULL;
    737 
    738   e->exchange_inquiries--;
    739   w->wdh = NULL;
    740   if (EXCHANGE_INQUIRY_LIMIT - 1 == e->exchange_inquiries)
    741     launch_inquiries_at_exchange (e);
    742   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    743               "Got response code %u from exchange for GET /transfers/$WTID\n",
    744               tgr->hr.http_status);
    745   switch (tgr->hr.http_status)
    746   {
    747   case MHD_HTTP_OK:
    748     td = &tgr->details.ok.td;
    749     w->execution_time = td->execution_time;
    750     e->transfer_delay = GNUNET_TIME_UNIT_ZERO;
    751     break;
    752   case MHD_HTTP_BAD_REQUEST:
    753   case MHD_HTTP_FORBIDDEN:
    754   case MHD_HTTP_NOT_FOUND:
    755     found_problem = true;
    756     update_transaction_status (w,
    757                                GNUNET_TIME_UNIT_FOREVER_ABS,
    758                                tgr->hr.http_status,
    759                                tgr->hr.ec,
    760                                tgr->hr.hint,
    761                                false);
    762     end_inquiry (w);
    763     return;
    764   case MHD_HTTP_INTERNAL_SERVER_ERROR:
    765   case MHD_HTTP_BAD_GATEWAY:
    766   case MHD_HTTP_GATEWAY_TIMEOUT:
    767     e->transfer_delay = GNUNET_TIME_STD_BACKOFF (e->transfer_delay);
    768     update_transaction_status (w,
    769                                GNUNET_TIME_relative_to_absolute (
    770                                  e->transfer_delay),
    771                                tgr->hr.http_status,
    772                                tgr->hr.ec,
    773                                tgr->hr.hint,
    774                                true);
    775     end_inquiry (w);
    776     return;
    777   default:
    778     found_problem = true;
    779     e->transfer_delay = GNUNET_TIME_STD_BACKOFF (e->transfer_delay);
    780     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    781                 "Unexpected HTTP status %u\n",
    782                 tgr->hr.http_status);
    783     update_transaction_status (w,
    784                                GNUNET_TIME_relative_to_absolute (
    785                                  e->transfer_delay),
    786                                tgr->hr.http_status,
    787                                tgr->hr.ec,
    788                                tgr->hr.hint,
    789                                true);
    790     end_inquiry (w);
    791     return;
    792   }
    793   TALER_MERCHANTDB_preflight (pg);
    794 
    795   {
    796     enum GNUNET_DB_QueryStatus qs;
    797 
    798     qs = TALER_MERCHANTDB_set_instance (pg,
    799                                         w->instance_id);
    800     if (0 > qs)
    801     {
    802       /* Always report on DB error as well to enable diagnostics */
    803       GNUNET_break (GNUNET_DB_STATUS_HARD_ERROR == qs);
    804       global_ret = EXIT_FAILURE;
    805       GNUNET_SCHEDULER_shutdown ();
    806       return;
    807     }
    808     qs = TALER_MERCHANTDB_insert_transfer_details (pg,
    809                                                    w->instance_id,
    810                                                    w->exchange->exchange_url,
    811                                                    w->payto_uri,
    812                                                    &w->wtid,
    813                                                    td);
    814     if (0 > qs)
    815     {
    816       /* Always report on DB error as well to enable diagnostics */
    817       GNUNET_break (GNUNET_DB_STATUS_HARD_ERROR == qs);
    818       global_ret = EXIT_FAILURE;
    819       GNUNET_SCHEDULER_shutdown ();
    820       return;
    821     }
    822     // FIXME: insert_transfer_details has more complex
    823     // error possibilities inside, expose them here
    824     // and persist them with the transaction status
    825     // if they arise (especially no_account, no_exchange, conflict)
    826     // -- not sure how no_instance could happen...
    827     if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
    828     {
    829       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    830                   "Transfer already known. Ignoring duplicate.\n");
    831       GNUNET_break (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT ==
    832                     TALER_MERCHANTDB_set_instance (
    833                       pg,
    834                       NULL));
    835       return;
    836     }
    837   }
    838 
    839   {
    840     struct CheckTransferContext ctc = {
    841       .ec = TALER_EC_NONE,
    842       .failure = false
    843     };
    844 
    845     for (unsigned int i = 0; i<td->details_length; i++)
    846     {
    847       const struct TALER_TrackTransferDetails *ttd = &td->details[i];
    848       enum GNUNET_DB_QueryStatus qs;
    849 
    850       if (TALER_EC_NONE != ctc.ec)
    851         break; /* already encountered an error */
    852       ctc.current_offset = i;
    853       ctc.current_detail = ttd;
    854       /* Set the coin as "never seen" before. */
    855       ctc.check_transfer_result = GNUNET_NO;
    856       qs = TALER_MERCHANTDB_lookup_deposits_by_contract_and_coin (
    857         pg,
    858         w->instance_id,
    859         &ttd->h_contract_terms,
    860         &ttd->coin_pub,
    861         &check_transfer,
    862         &ctc);
    863       switch (qs)
    864       {
    865       case GNUNET_DB_STATUS_SOFT_ERROR:
    866         GNUNET_break (0);
    867         ctc.ec = TALER_EC_GENERIC_DB_FETCH_FAILED;
    868         break;
    869       case GNUNET_DB_STATUS_HARD_ERROR:
    870         GNUNET_break (0);
    871         ctc.ec = TALER_EC_GENERIC_DB_FETCH_FAILED;
    872         break;
    873       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    874         /* The exchange says we made this deposit, but WE do not
    875            recall making it (corrupted / unreliable database?)!
    876            Well, let's say thanks and accept the money! */
    877         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
    878                     "Failed to find payment data in DB\n");
    879         ctc.check_transfer_result = GNUNET_OK;
    880         break;
    881       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    882         break;
    883       }
    884       switch (ctc.check_transfer_result)
    885       {
    886       case GNUNET_NO:
    887         /* Internal error: how can we have called #check_transfer()
    888            but still have no result? */
    889         GNUNET_break (0);
    890         ctc.ec = TALER_EC_GENERIC_INTERNAL_INVARIANT_FAILURE;
    891         GNUNET_break (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT ==
    892                       TALER_MERCHANTDB_set_instance (
    893                         pg,
    894                         NULL));
    895         return;
    896       case GNUNET_SYSERR:
    897         /* #check_transfer() failed, report conflict! */
    898         GNUNET_break_op (0);
    899         GNUNET_assert (TALER_EC_NONE != ctc.ec);
    900         break;
    901       case GNUNET_OK:
    902         break;
    903       }
    904     }
    905     if (TALER_EC_NONE != ctc.ec)
    906     {
    907       update_transaction_status (
    908         w,
    909         ctc.failure
    910         ? GNUNET_TIME_UNIT_FOREVER_ABS
    911         : GNUNET_TIME_relative_to_absolute (
    912           GNUNET_TIME_UNIT_MINUTES),
    913         MHD_HTTP_OK,
    914         ctc.ec,
    915         NULL /* no hint */,
    916         ! ctc.failure);
    917       GNUNET_break (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT ==
    918                     TALER_MERCHANTDB_set_instance (
    919                       pg,
    920                       NULL));
    921       end_inquiry (w);
    922       return;
    923     }
    924   }
    925 
    926   if (GNUNET_SYSERR ==
    927       check_wire_fee (w,
    928                       td->execution_time,
    929                       &td->wire_fee))
    930   {
    931     GNUNET_break_op (0);
    932     update_transaction_status (w,
    933                                GNUNET_TIME_UNIT_FOREVER_ABS,
    934                                MHD_HTTP_OK,
    935                                TALER_EC_MERCHANT_PRIVATE_POST_TRANSFERS_BAD_WIRE_FEE,
    936                                TALER_amount2s (&td->wire_fee),
    937                                false);
    938     GNUNET_break (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT ==
    939                   TALER_MERCHANTDB_set_instance (
    940                     pg,
    941                     NULL));
    942     end_inquiry (w);
    943     return;
    944   }
    945 
    946   {
    947     enum GNUNET_DB_QueryStatus qs;
    948 
    949     qs = TALER_MERCHANTDB_finalize_transfer_status (pg,
    950                                                     w->exchange->exchange_url,
    951                                                     &w->wtid,
    952                                                     &td->h_details,
    953                                                     &td->total_amount,
    954                                                     &td->wire_fee,
    955                                                     &td->exchange_pub,
    956                                                     &td->exchange_sig);
    957     if (qs < 0)
    958     {
    959       GNUNET_break (0);
    960       global_ret = EXIT_FAILURE;
    961       GNUNET_SCHEDULER_shutdown ();
    962       return;
    963     }
    964   }
    965   GNUNET_break (GNUNET_DB_STATUS_SUCCESS_ONE_RESULT ==
    966                 TALER_MERCHANTDB_set_instance (
    967                   pg,
    968                   NULL));
    969   end_inquiry (w);
    970 }
    971 
    972 
    973 /**
    974  * Initiate download from an exchange for a given inquiry.
    975  *
    976  * @param cls a `struct Inquiry *`
    977  */
    978 static void
    979 exchange_request (void *cls)
    980 {
    981   struct Inquiry *w = cls;
    982   struct Exchange *e = w->exchange;
    983 
    984   w->task = NULL;
    985   if (NULL == e->keys)
    986     return;
    987   w->wdh = TALER_EXCHANGE_get_transfers_create (
    988     ctx,
    989     e->exchange_url,
    990     e->keys,
    991     &w->wtid);
    992   if (NULL == w->wdh)
    993   {
    994     GNUNET_break (0);
    995     e->exchange_inquiries--;
    996     e->transfer_delay = GNUNET_TIME_STD_BACKOFF (e->transfer_delay);
    997     update_transaction_status (w,
    998                                GNUNET_TIME_relative_to_absolute (
    999                                  e->transfer_delay),
   1000                                0 /* failed to begin */,
   1001                                TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_TRANSIENT_FAILURE,
   1002                                "Failed to initiate GET request at exchange",
   1003                                true);
   1004     end_inquiry (w);
   1005     return;
   1006   }
   1007   GNUNET_assert (TALER_EC_NONE ==
   1008                  TALER_EXCHANGE_get_transfers_start (w->wdh,
   1009                                                      &wire_transfer_cb,
   1010                                                      w));
   1011 
   1012   /* Wait at least 1m for the network transfer */
   1013   update_transaction_status (w,
   1014                              GNUNET_TIME_relative_to_absolute (
   1015                                GNUNET_TIME_UNIT_MINUTES),
   1016                              0 /* timeout */,
   1017                              TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_AWAITING_LIST,
   1018                              "Initiated GET with exchange",
   1019                              true);
   1020 }
   1021 
   1022 
   1023 /**
   1024  * Function called with information about a transfer we
   1025  * should ask the exchange about.
   1026  *
   1027  * @param cls closure (NULL)
   1028  * @param rowid row of the transfer in the merchant database
   1029  * @param instance_id instance that received the transfer
   1030  * @param exchange_url base URL of the exchange that initiated the transfer
   1031  * @param payto_uri account of the merchant that received the transfer
   1032  * @param wtid wire transfer subject identifying the aggregation
   1033  * @param next_attempt when should we next try to interact with the exchange
   1034  */
   1035 static void
   1036 start_inquiry (
   1037   void *cls,
   1038   uint64_t rowid,
   1039   const char *instance_id,
   1040   const char *exchange_url,
   1041   struct TALER_FullPayto payto_uri,
   1042   const struct TALER_WireTransferIdentifierRawP *wtid,
   1043   struct GNUNET_TIME_Absolute next_attempt)
   1044 {
   1045   struct Exchange *e;
   1046   struct Inquiry *w;
   1047 
   1048   (void) cls;
   1049   if (GNUNET_TIME_absolute_is_future (next_attempt))
   1050   {
   1051     if (NULL == task)
   1052       task = GNUNET_SCHEDULER_add_at (next_attempt,
   1053                                       &find_work,
   1054                                       NULL);
   1055     return;
   1056   }
   1057   active_inquiries++;
   1058 
   1059   e = find_exchange (exchange_url);
   1060   for (w = e->w_head; NULL != w; w = w->next)
   1061   {
   1062     if (0 == GNUNET_memcmp (&w->wtid,
   1063                             wtid))
   1064     {
   1065       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
   1066                   "Already processing inquiry. Aborting ongoing inquiry\n");
   1067       end_inquiry (w);
   1068       break;
   1069     }
   1070   }
   1071 
   1072   w = GNUNET_new (struct Inquiry);
   1073   w->payto_uri.full_payto = GNUNET_strdup (payto_uri.full_payto);
   1074   w->instance_id = GNUNET_strdup (instance_id);
   1075   w->rowid = rowid;
   1076   w->wtid = *wtid;
   1077   GNUNET_CONTAINER_DLL_insert (e->w_head,
   1078                                e->w_tail,
   1079                                w);
   1080   w->exchange = e;
   1081   if (NULL != w->exchange->keys)
   1082     w->task = GNUNET_SCHEDULER_add_now (&exchange_request,
   1083                                         w);
   1084   /* Wait at least 1 minute for /keys */
   1085   update_transaction_status (w,
   1086                              GNUNET_TIME_relative_to_absolute (
   1087                                GNUNET_TIME_UNIT_MINUTES),
   1088                              0 /* timeout */,
   1089                              TALER_EC_MERCHANT_EXCHANGE_TRANSFERS_AWAITING_KEYS,
   1090                              exchange_url,
   1091                              true);
   1092 }
   1093 
   1094 
   1095 static void
   1096 find_work (void *cls)
   1097 {
   1098   enum GNUNET_DB_QueryStatus qs;
   1099   int limit;
   1100 
   1101   (void) cls;
   1102   task = NULL;
   1103   GNUNET_assert (OPEN_INQUIRY_LIMIT >= active_inquiries);
   1104   limit = OPEN_INQUIRY_LIMIT - active_inquiries;
   1105   if (0 == limit)
   1106   {
   1107     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
   1108                 "Not looking for work: at limit\n");
   1109     at_limit = true;
   1110     return;
   1111   }
   1112   at_limit = false;
   1113   qs = TALER_MERCHANTDB_select_open_transfers (pg,
   1114                                                limit,
   1115                                                &start_inquiry,
   1116                                                NULL);
   1117   if (qs < 0)
   1118   {
   1119     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
   1120                 "Failed to obtain open transfers from database\n");
   1121     GNUNET_SCHEDULER_shutdown ();
   1122     return;
   1123   }
   1124   if (qs >= limit)
   1125   {
   1126     /* DB limited response, re-trigger DB interaction
   1127        the moment we significantly fall below the
   1128        limit */
   1129     at_limit = true;
   1130   }
   1131   if (0 == active_inquiries)
   1132   {
   1133     if (test_mode)
   1134     {
   1135       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
   1136                   "No more open inquiries and in test mode. Existing.\n");
   1137       GNUNET_SCHEDULER_shutdown ();
   1138       return;
   1139     }
   1140     GNUNET_log (
   1141       GNUNET_ERROR_TYPE_INFO,
   1142       "No open inquiries found, waiting for notification to resume\n");
   1143   }
   1144 }
   1145 
   1146 
   1147 /**
   1148  * Function called when transfers are added to the merchant database.  We look
   1149  * for more work.
   1150  *
   1151  * @param cls closure (NULL)
   1152  * @param extra additional event data provided
   1153  * @param extra_size number of bytes in @a extra
   1154  */
   1155 static void
   1156 transfer_added (void *cls,
   1157                 const void *extra,
   1158                 size_t extra_size)
   1159 {
   1160   (void) cls;
   1161   (void) extra;
   1162   (void) extra_size;
   1163   if (active_inquiries > OPEN_INQUIRY_LIMIT / 2)
   1164   {
   1165     /* Trigger DB only once we are substantially below the limit */
   1166     at_limit = true;
   1167     return;
   1168   }
   1169   if (NULL != task)
   1170     return;
   1171   task = GNUNET_SCHEDULER_add_now (&find_work,
   1172                                    NULL);
   1173 }
   1174 
   1175 
   1176 /**
   1177  * Function called when keys were changed in the
   1178  * merchant database. Updates ours.
   1179  *
   1180  * @param cls closure (NULL)
   1181  * @param extra additional event data provided
   1182  * @param extra_size number of bytes in @a extra
   1183  */
   1184 static void
   1185 keys_changed (void *cls,
   1186               const void *extra,
   1187               size_t extra_size)
   1188 {
   1189   const char *url = extra;
   1190   struct Exchange *e;
   1191 
   1192   (void) cls;
   1193   if ( (NULL == extra) ||
   1194        (0 == extra_size) )
   1195   {
   1196     GNUNET_break (0);
   1197     return;
   1198   }
   1199   if ('\0' != url[extra_size - 1])
   1200   {
   1201     GNUNET_break (0);
   1202     return;
   1203   }
   1204   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
   1205               "Received keys change notification: reload `%s'\n",
   1206               url);
   1207   e = find_exchange (url);
   1208   sync_keys (e);
   1209 }
   1210 
   1211 
   1212 /**
   1213  * First task.
   1214  *
   1215  * @param cls closure, NULL
   1216  * @param args remaining command-line arguments
   1217  * @param cfgfile name of the configuration file used (for saving, can be NULL!)
   1218  * @param c configuration
   1219  */
   1220 static void
   1221 run (void *cls,
   1222      char *const *args,
   1223      const char *cfgfile,
   1224      const struct GNUNET_CONFIGURATION_Handle *c)
   1225 {
   1226   (void) args;
   1227   (void) cfgfile;
   1228 
   1229   cfg = c;
   1230   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
   1231                                  NULL);
   1232   ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,
   1233                           &rc);
   1234   rc = GNUNET_CURL_gnunet_rc_create (ctx);
   1235   if (NULL == ctx)
   1236   {
   1237     GNUNET_break (0);
   1238     GNUNET_SCHEDULER_shutdown ();
   1239     global_ret = EXIT_FAILURE;
   1240     return;
   1241   }
   1242   if (NULL ==
   1243       (pg = TALER_MERCHANTDB_connect (cfg)))
   1244   {
   1245     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
   1246                 "Failed to initialize DB subsystem. Consider running taler-merchant-dbconfig!\n");
   1247     GNUNET_SCHEDULER_shutdown ();
   1248     global_ret = EXIT_FAILURE;
   1249     return;
   1250   }
   1251   {
   1252     struct GNUNET_DB_EventHeaderP es = {
   1253       .size = htons (sizeof (es)),
   1254       .type = htons (TALER_DBEVENT_MERCHANT_WIRE_TRANSFER_EXPECTED)
   1255     };
   1256 
   1257     eh = TALER_MERCHANTDB_event_listen (pg,
   1258                                         &es,
   1259                                         GNUNET_TIME_UNIT_FOREVER_REL,
   1260                                         &transfer_added,
   1261                                         NULL);
   1262   }
   1263   {
   1264     struct GNUNET_DB_EventHeaderP es = {
   1265       .size = htons (sizeof (es)),
   1266       .type = htons (TALER_DBEVENT_MERCHANT_EXCHANGE_KEYS)
   1267     };
   1268 
   1269     eh_keys
   1270       = TALER_MERCHANTDB_event_listen (pg,
   1271                                        &es,
   1272                                        GNUNET_TIME_UNIT_FOREVER_REL,
   1273                                        &keys_changed,
   1274                                        NULL);
   1275   }
   1276 
   1277   GNUNET_assert (NULL == task);
   1278   task = GNUNET_SCHEDULER_add_now (&find_work,
   1279                                    NULL);
   1280 }
   1281 
   1282 
   1283 /**
   1284  * The main function of taler-merchant-reconciliation
   1285  *
   1286  * @param argc number of arguments from the command line
   1287  * @param argv command line arguments
   1288  * @return 0 ok, 1 on error
   1289  */
   1290 int
   1291 main (int argc,
   1292       char *const *argv)
   1293 {
   1294   struct GNUNET_GETOPT_CommandLineOption options[] = {
   1295     GNUNET_GETOPT_option_timetravel ('T',
   1296                                      "timetravel"),
   1297     GNUNET_GETOPT_option_flag ('t',
   1298                                "test",
   1299                                "run in test mode and exit when idle",
   1300                                &test_mode),
   1301     GNUNET_GETOPT_option_version (VERSION),
   1302     GNUNET_GETOPT_OPTION_END
   1303   };
   1304   enum GNUNET_GenericReturnValue ret;
   1305 
   1306   ret = GNUNET_PROGRAM_run (
   1307     TALER_MERCHANT_project_data (),
   1308     argc, argv,
   1309     "taler-merchant-reconciliation",
   1310     gettext_noop (
   1311       "background process that reconciles bank transfers with orders by asking the exchange"),
   1312     options,
   1313     &run, NULL);
   1314   if (GNUNET_SYSERR == ret)
   1315     return EXIT_INVALIDARGUMENT;
   1316   if (GNUNET_NO == ret)
   1317     return EXIT_SUCCESS;
   1318   if ( (found_problem) &&
   1319        (0 == global_ret) )
   1320     global_ret = 7;
   1321   return global_ret;
   1322 }
   1323 
   1324 
   1325 /* end of taler-merchant-reconciliation.c */