merchant

Merchant backend to process payments, run by merchants
Log | Files | Refs | Submodules | README | LICENSE

taler-merchant-wirewatch.c (19574B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2023, 2025 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 /**
     17  * @file taler-merchant-wirewatch.c
     18  * @brief Process that imports information about incoming bank transfers into the merchant backend
     19  * @author Christian Grothoff
     20  */
     21 #include "platform.h"
     22 #include "microhttpd.h"
     23 #include <gnunet/gnunet_util_lib.h>
     24 #include <jansson.h>
     25 #include <pthread.h>
     26 #include <taler/taler_dbevents.h>
     27 #include "taler_merchant_util.h"
     28 #include "taler_merchant_bank_lib.h"
     29 #include "taler_merchantdb_lib.h"
     30 #include "taler_merchantdb_plugin.h"
     31 
/**
 * Initial timeout for the bank interaction.  Rather long as we should do
 * long-polling and do not want to wake up too often.  Every watch job
 * starts out with this value as its `bank_timeout`; the per-watch value may
 * be reduced later if the bank answers with a gateway timeout.
 */
#define BANK_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT_MINUTES, \
                                                    5)
     38 
     39 
/**
 * Information about a watch job.  One watch job is created for each
 * account passed to #start_watch(); the job repeatedly asks the bank
 * for the credit history of that account.
 */
struct Watch
{
  /**
   * Next entry in the doubly-linked list (DLL) of active watches.
   */
  struct Watch *next;

  /**
   * Previous entry in the doubly-linked list (DLL) of active watches.
   */
  struct Watch *prev;

  /**
   * Task that will run the next iteration (#do_work()), if any.
   * NULL while no iteration is scheduled.
   */
  struct GNUNET_SCHEDULER_Task *task;

  /**
   * Dynamically adjusted long polling time-out.  Starts at
   * #BANK_TIMEOUT and is lowered (but never below one second)
   * when the bank replies with a gateway timeout.
   */
  struct GNUNET_TIME_Relative bank_timeout;

  /**
   * For which instance are we importing bank transfers?
   * Heap-allocated copy owned by this struct.
   */
  char *instance_id;

  /**
   * For which account are we importing bank transfers?
   * The contained string is a heap-allocated copy owned by this struct.
   */
  struct TALER_FullPayto payto_uri;

  /**
   * Bank history request currently in flight, NULL if none.
   */
  struct TALER_MERCHANT_BANK_CreditHistoryHandle *hh;

  /**
   * Start row for the bank interaction. Exclusive.
   * Updated to the serial of each transaction we have handled
   * and persisted in the database via #save().
   */
  uint64_t start_row;

  /**
   * Artificial delay to use between API calls. Used to
   * throttle on failures; reset to zero after a transfer
   * was stored successfully.
   */
  struct GNUNET_TIME_Relative delay;

  /**
   * When did we start our last HTTP request?
   */
  struct GNUNET_TIME_Absolute start_time;

  /**
   * Point in time before which the current long-polling request is
   * not expected to return without results (request start plus
   * @e bank_timeout).  If the bank reports "no content" earlier and we
   * found nothing, we wait for the remaining time before asking again.
   */
  struct GNUNET_TIME_Absolute long_poll_timeout;

  /**
   * Login data for the bank.
   */
  struct TALER_MERCHANT_BANK_AuthenticationData ad;

  /**
   * Set to true if we found a transaction in the last iteration.
   * Cleared at the start of every iteration.
   */
  bool found;

};
    112 
    113 
/**
 * Head of the doubly-linked list of active watches.
 */
static struct Watch *w_head;

/**
 * Tail of the doubly-linked list of active watches.
 */
static struct Watch *w_tail;

/**
 * The merchant's configuration.  Set in #run(), cleared on shutdown.
 */
static const struct GNUNET_CONFIGURATION_Handle *cfg;

/**
 * Our database plugin.  NULL until loaded in #run() and again
 * after shutdown.
 */
static struct TALER_MERCHANTDB_Plugin *db_plugin;

/**
 * Handle to the context for interacting with the bank.
 */
static struct GNUNET_CURL_Context *ctx;

/**
 * Scheduler context for running the @e ctx.
 */
static struct GNUNET_CURL_RescheduleContext *rc;

/**
 * Event handler to learn that the configuration changed
 * and we should shutdown (to be restarted).
 */
static struct GNUNET_DB_EventHandler *eh;

/**
 * Value to return from main(). 0 on success, non-zero on errors.
 */
static int global_ret;

/**
 * How many transactions should we fetch at most per batch?
 */
static unsigned int batch_size = 32;

/**
 * #GNUNET_YES if we are in test mode and should exit when idle.
 * Set via the "-t" / "--test" command-line flag.
 */
static int test_mode;

/**
 * #GNUNET_YES if we are in persistent mode and do
 * not exit on #config_changed.
 * Set via the "-p" / "--persist" command-line flag.
 */
static int persist_mode;

/**
 * Set to true if we are shutting down due to a
 * configuration change.  Checked in main() to decide
 * whether to run again in persistent mode.
 */
static bool config_changed_flag;
    176 
    177 /**
    178  * Save progress in DB.
    179  */
    180 static void
    181 save (struct Watch *w)
    182 {
    183   enum GNUNET_DB_QueryStatus qs;
    184 
    185   qs = db_plugin->update_wirewatch_progress (db_plugin->cls,
    186                                              w->instance_id,
    187                                              w->payto_uri,
    188                                              w->start_row);
    189   if (qs < 0)
    190   {
    191     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
    192                 "Failed to persist wirewatch progress for %s/%s (%d)\n",
    193                 w->instance_id,
    194                 w->payto_uri.full_payto,
    195                 qs);
    196     GNUNET_SCHEDULER_shutdown ();
    197     global_ret = EXIT_FAILURE;
    198   }
    199 }
    200 
    201 
    202 /**
    203  * Free resources of @a w.
    204  *
    205  * @param w watch job to terminate
    206  */
    207 static void
    208 end_watch (struct Watch *w)
    209 {
    210   if (NULL != w->task)
    211   {
    212     GNUNET_SCHEDULER_cancel (w->task);
    213     w->task = NULL;
    214   }
    215   if (NULL != w->hh)
    216   {
    217     TALER_MERCHANT_BANK_credit_history_cancel (w->hh);
    218     w->hh = NULL;
    219   }
    220   GNUNET_free (w->instance_id);
    221   GNUNET_free (w->payto_uri.full_payto);
    222   TALER_MERCHANT_BANK_auth_free (&w->ad);
    223   GNUNET_CONTAINER_DLL_remove (w_head,
    224                                w_tail,
    225                                w);
    226   GNUNET_free (w);
    227 }
    228 
    229 
    230 /**
    231  * We're being aborted with CTRL-C (or SIGTERM). Shut down.
    232  *
    233  * @param cls closure
    234  */
    235 static void
    236 shutdown_task (void *cls)
    237 {
    238   (void) cls;
    239   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    240               "Running shutdown\n");
    241   while (NULL != w_head)
    242   {
    243     struct Watch *w = w_head;
    244 
    245     save (w);
    246     end_watch (w);
    247   }
    248   if (NULL != eh)
    249   {
    250     db_plugin->event_listen_cancel (eh);
    251     eh = NULL;
    252   }
    253   TALER_MERCHANTDB_plugin_unload (db_plugin);
    254   db_plugin = NULL;
    255   cfg = NULL;
    256   if (NULL != ctx)
    257   {
    258     GNUNET_CURL_fini (ctx);
    259     ctx = NULL;
    260   }
    261   if (NULL != rc)
    262   {
    263     GNUNET_CURL_gnunet_rc_destroy (rc);
    264     rc = NULL;
    265   }
    266 }
    267 
    268 
    269 /**
    270  * Parse @a subject from wire transfer into @a wtid and @a exchange_url.
    271  *
    272  * @param subject wire transfer subject to parse;
    273  *        format is "$WTID $URL"
    274  * @param[out] wtid wire transfer ID to extract
    275  * @param[out] exchange_url set to exchange URL
    276  * @return #GNUNET_OK on success
    277  */
    278 static enum GNUNET_GenericReturnValue
    279 parse_subject (const char *subject,
    280                struct TALER_WireTransferIdentifierRawP *wtid,
    281                char **exchange_url)
    282 {
    283   const char *space;
    284 
    285   space = strchr (subject, ' ');
    286   if (NULL == space)
    287     return GNUNET_NO;
    288   if (GNUNET_OK !=
    289       GNUNET_STRINGS_string_to_data (subject,
    290                                      space - subject,
    291                                      wtid,
    292                                      sizeof (*wtid)))
    293     return GNUNET_NO;
    294   space++;
    295   if (! TALER_url_valid_charset (space))
    296     return GNUNET_NO;
    297   if ( (0 != strncasecmp ("http://",
    298                           space,
    299                           strlen ("http://"))) &&
    300        (0 != strncasecmp ("https://",
    301                           space,
    302                           strlen ("https://"))) )
    303     return GNUNET_NO;
    304   *exchange_url = GNUNET_strdup (space);
    305   return GNUNET_OK;
    306 }
    307 
    308 
/**
 * Run next iteration: start a new request for the credit history
 * of the watched account at the bank.  Forward declaration, needed
 * because #credit_cb() schedules this function again.
 *
 * @param cls a `struct Watch *`
 */
static void
do_work (void *cls);
    316 
    317 
    318 /**
    319  * Callbacks of this type are used to serve the result of asking
    320  * the bank for the credit transaction history.
    321  *
    322  * @param cls a `struct Watch *`
    323  * @param http_status HTTP response code, #MHD_HTTP_OK (200) for successful status request
    324  *                    0 if the bank's reply is bogus (fails to follow the protocol),
    325  *                    #MHD_HTTP_NO_CONTENT if there are no more results; on success the
    326  *                    last callback is always of this status (even if `abs(num_results)` were
    327  *                    already returned).
    328  * @param ec detailed error code
    329  * @param serial_id monotonically increasing counter corresponding to the transaction
    330  * @param details details about the wire transfer
    331  * @return #GNUNET_OK to continue, #GNUNET_SYSERR to abort iteration
    332  */
    333 static enum GNUNET_GenericReturnValue
    334 credit_cb (
    335   void *cls,
    336   unsigned int http_status,
    337   enum TALER_ErrorCode ec,
    338   uint64_t serial_id,
    339   const struct TALER_MERCHANT_BANK_CreditDetails *details)
    340 {
    341   struct Watch *w = cls;
    342 
    343   switch (http_status)
    344   {
    345   case 0:
    346     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
    347                 "Invalid HTTP response (HTTP status: 0, %d) from bank\n",
    348                 ec);
    349     w->delay = GNUNET_TIME_STD_BACKOFF (w->delay);
    350     break;
    351   case MHD_HTTP_OK:
    352     {
    353       enum GNUNET_DB_QueryStatus qs;
    354       char *exchange_url;
    355       struct TALER_WireTransferIdentifierRawP wtid;
    356 
    357       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    358                   "Received wire transfer `%s' over %s\n",
    359                   details->wire_subject,
    360                   TALER_amount2s (&details->amount));
    361       w->found = true;
    362       if (GNUNET_OK !=
    363           parse_subject (details->wire_subject,
    364                          &wtid,
    365                          &exchange_url))
    366       {
    367         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    368                     "Skipping transfer %llu (%s): not from exchange\n",
    369                     (unsigned long long) serial_id,
    370                     details->wire_subject);
    371         w->start_row = serial_id;
    372         return GNUNET_OK;
    373       }
    374       /* FIXME-Performance-Optimization: consider grouping multiple inserts
    375          into one bigger transaction with just one notify. */
    376       qs = db_plugin->insert_transfer (db_plugin->cls,
    377                                        w->instance_id,
    378                                        exchange_url,
    379                                        &wtid,
    380                                        &details->amount,
    381                                        details->credit_account_uri,
    382                                        serial_id);
    383       GNUNET_free (exchange_url);
    384       if (qs < 0)
    385       {
    386         GNUNET_break (0);
    387         GNUNET_SCHEDULER_shutdown ();
    388         w->hh = NULL;
    389         return GNUNET_SYSERR;
    390       }
    391       /* Success => reset back-off timer! */
    392       w->delay = GNUNET_TIME_UNIT_ZERO;
    393       {
    394         struct GNUNET_DB_EventHeaderP es = {
    395           .size = htons (sizeof (es)),
    396           .type = htons (TALER_DBEVENT_MERCHANT_WIRE_TRANSFER_CONFIRMED)
    397         };
    398 
    399         db_plugin->event_notify (db_plugin->cls,
    400                                  &es,
    401                                  NULL,
    402                                  0);
    403       }
    404     }
    405     w->start_row = serial_id;
    406     return GNUNET_OK;
    407   case MHD_HTTP_NO_CONTENT:
    408     save (w);
    409     /* Delay artificially if server returned before long-poll timeout */
    410     if (! w->found)
    411       w->delay = GNUNET_TIME_absolute_get_remaining (w->long_poll_timeout);
    412     break;
    413   case MHD_HTTP_NOT_FOUND:
    414     /* configuration likely wrong, wait at least 1 minute, backoff up to 15 minutes! */
    415     w->delay = GNUNET_TIME_relative_max (GNUNET_TIME_UNIT_MINUTES,
    416                                          GNUNET_TIME_STD_BACKOFF (w->delay));
    417     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
    418                 "Bank claims account is unknown, waiting for %s before trying again\n",
    419                 GNUNET_TIME_relative2s (w->delay,
    420                                         true));
    421     break;
    422   case MHD_HTTP_GATEWAY_TIMEOUT:
    423     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    424                 "Gateway timeout, adjusting long polling threshold\n");
    425     /* Limit new timeout at request delay */
    426     w->bank_timeout
    427       = GNUNET_TIME_relative_min (GNUNET_TIME_absolute_get_duration (
    428                                     w->start_time),
    429                                   w->bank_timeout);
    430     /* set the timeout a bit earlier */
    431     w->bank_timeout
    432       = GNUNET_TIME_relative_subtract (w->bank_timeout,
    433                                        GNUNET_TIME_UNIT_SECONDS);
    434     /* do not allow it to go to zero */
    435     w->bank_timeout
    436       = GNUNET_TIME_relative_max (w->bank_timeout,
    437                                   GNUNET_TIME_UNIT_SECONDS);
    438     w->delay = GNUNET_TIME_STD_BACKOFF (w->delay);
    439     break;
    440   default:
    441     /* Something went wrong, try again, but with back-off */
    442     w->delay = GNUNET_TIME_STD_BACKOFF (w->delay);
    443     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
    444                 "Unexpected HTTP status code %u(%d) from bank\n",
    445                 http_status,
    446                 ec);
    447     break;
    448   }
    449   w->hh = NULL;
    450   if (test_mode && (! w->found))
    451   {
    452     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    453                 "No transactions found and in test mode. Ending watch!\n");
    454     end_watch (w);
    455     if (NULL == w_head)
    456       GNUNET_SCHEDULER_shutdown ();
    457     return GNUNET_OK;
    458   }
    459   w->task = GNUNET_SCHEDULER_add_delayed (w->delay,
    460                                           &do_work,
    461                                           w);
    462   return GNUNET_OK;
    463 }
    464 
    465 
    466 static void
    467 do_work (void *cls)
    468 {
    469   struct Watch *w = cls;
    470 
    471   w->task = NULL;
    472   w->found = false;
    473   w->long_poll_timeout
    474     = GNUNET_TIME_relative_to_absolute (w->bank_timeout);
    475   w->start_time
    476     = GNUNET_TIME_absolute_get ();
    477   w->hh = TALER_MERCHANT_BANK_credit_history (ctx,
    478                                               &w->ad,
    479                                               w->start_row,
    480                                               batch_size,
    481                                               test_mode
    482                                               ? GNUNET_TIME_UNIT_ZERO
    483                                               : w->bank_timeout,
    484                                               &credit_cb,
    485                                               w);
    486   if (NULL == w->hh)
    487   {
    488     GNUNET_break (0);
    489     GNUNET_SCHEDULER_shutdown ();
    490     return;
    491   }
    492 }
    493 
    494 
    495 /**
    496  * Function called with information about a accounts
    497  * the wirewatcher should monitor.
    498  *
    499  * @param cls closure (NULL)
    500  * @param instance instance that owns the account
    501  * @param payto_uri account URI
    502  * @param credit_facade_url URL for the credit facade
    503  * @param credit_facade_credentials account access credentials
    504  * @param last_serial last transaction serial (inclusive) we have seen from this account
    505  */
    506 static void
    507 start_watch (
    508   void *cls,
    509   const char *instance,
    510   struct TALER_FullPayto payto_uri,
    511   const char *credit_facade_url,
    512   const json_t *credit_facade_credentials,
    513   uint64_t last_serial)
    514 {
    515   struct Watch *w = GNUNET_new (struct Watch);
    516 
    517   (void) cls;
    518   w->bank_timeout = BANK_TIMEOUT;
    519   if (GNUNET_OK !=
    520       TALER_MERCHANT_BANK_auth_parse_json (credit_facade_credentials,
    521                                            credit_facade_url,
    522                                            &w->ad))
    523   {
    524     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    525                 "Failed to parse authentication data of `%s/%s'\n",
    526                 instance,
    527                 payto_uri.full_payto);
    528     GNUNET_free (w);
    529     GNUNET_SCHEDULER_shutdown ();
    530     global_ret = EXIT_NOTCONFIGURED;
    531     return;
    532   }
    533 
    534   GNUNET_CONTAINER_DLL_insert (w_head,
    535                                w_tail,
    536                                w);
    537   w->instance_id = GNUNET_strdup (instance);
    538   w->payto_uri.full_payto = GNUNET_strdup (payto_uri.full_payto);
    539   w->start_row = last_serial;
    540   w->task = GNUNET_SCHEDULER_add_now (&do_work,
    541                                       w);
    542 }
    543 
    544 
    545 /**
    546  * Function called on configuration change events received from Postgres.  We
    547  * shutdown (and systemd should restart us).
    548  *
    549  * @param cls closure (NULL)
    550  * @param extra additional event data provided
    551  * @param extra_size number of bytes in @a extra
    552  */
    553 static void
    554 config_changed (void *cls,
    555                 const void *extra,
    556                 size_t extra_size)
    557 {
    558   (void) cls;
    559   (void) extra;
    560   (void) extra_size;
    561   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    562               "Configuration changed, %s\n",
    563               0 == persist_mode
    564               ? "restarting"
    565               : "reinitializing");
    566   config_changed_flag = true;
    567   GNUNET_SCHEDULER_shutdown ();
    568 }
    569 
    570 
    571 /**
    572  * First task.
    573  *
    574  * @param cls closure, NULL
    575  * @param args remaining command-line arguments
    576  * @param cfgfile name of the configuration file used (for saving, can be NULL!)
    577  * @param c configuration
    578  */
    579 static void
    580 run (void *cls,
    581      char *const *args,
    582      const char *cfgfile,
    583      const struct GNUNET_CONFIGURATION_Handle *c)
    584 {
    585   (void) args;
    586   (void) cfgfile;
    587 
    588   cfg = c;
    589   GNUNET_SCHEDULER_add_shutdown (&shutdown_task,
    590                                  NULL);
    591   ctx = GNUNET_CURL_init (&GNUNET_CURL_gnunet_scheduler_reschedule,
    592                           &rc);
    593   rc = GNUNET_CURL_gnunet_rc_create (ctx);
    594   if (NULL == ctx)
    595   {
    596     GNUNET_break (0);
    597     GNUNET_SCHEDULER_shutdown ();
    598     global_ret = EXIT_FAILURE;
    599     return;
    600   }
    601   if (NULL ==
    602       (db_plugin = TALER_MERCHANTDB_plugin_load (cfg)))
    603   {
    604     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    605                 "Failed to initialize DB subsystem\n");
    606     GNUNET_SCHEDULER_shutdown ();
    607     global_ret = EXIT_NOTCONFIGURED;
    608     return;
    609   }
    610   if (GNUNET_OK !=
    611       db_plugin->connect (db_plugin->cls))
    612   {
    613     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    614                 "Failed to connect to database. Consider running taler-merchant-dbinit!\n");
    615     GNUNET_SCHEDULER_shutdown ();
    616     global_ret = EXIT_FAILURE;
    617     return;
    618   }
    619   {
    620     struct GNUNET_DB_EventHeaderP es = {
    621       .size = htons (sizeof (es)),
    622       .type = htons (TALER_DBEVENT_MERCHANT_ACCOUNTS_CHANGED)
    623     };
    624 
    625     eh = db_plugin->event_listen (db_plugin->cls,
    626                                   &es,
    627                                   GNUNET_TIME_UNIT_FOREVER_REL,
    628                                   &config_changed,
    629                                   NULL);
    630   }
    631   {
    632     enum GNUNET_DB_QueryStatus qs;
    633 
    634     qs = db_plugin->select_wirewatch_accounts (db_plugin->cls,
    635                                                &start_watch,
    636                                                NULL);
    637     if (qs < 0)
    638     {
    639       GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    640                   "Failed to obtain wirewatch accounts from database\n");
    641       GNUNET_SCHEDULER_shutdown ();
    642       global_ret = EXIT_NO_RESTART;
    643       return;
    644     }
    645     if ( (NULL == w_head) &&
    646          (GNUNET_YES == test_mode) )
    647     {
    648       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    649                   "No active wirewatch accounts in database and in test mode. Exiting.\n");
    650       GNUNET_SCHEDULER_shutdown ();
    651       global_ret = EXIT_SUCCESS;
    652       return;
    653     }
    654   }
    655 }
    656 
    657 
    658 /**
    659  * The main function of taler-merchant-wirewatch
    660  *
    661  * @param argc number of arguments from the command line
    662  * @param argv command line arguments
    663  * @return 0 ok, 1 on error
    664  */
    665 int
    666 main (int argc,
    667       char *const *argv)
    668 {
    669   struct GNUNET_GETOPT_CommandLineOption options[] = {
    670     GNUNET_GETOPT_option_flag ('p',
    671                                "persist",
    672                                "run in persist mode and do not exit on configuration changes",
    673                                &persist_mode),
    674     GNUNET_GETOPT_option_timetravel ('T',
    675                                      "timetravel"),
    676     GNUNET_GETOPT_option_flag ('t',
    677                                "test",
    678                                "run in test mode and exit when idle",
    679                                &test_mode),
    680     GNUNET_GETOPT_option_version (VERSION "-" VCS_VERSION),
    681     GNUNET_GETOPT_OPTION_END
    682   };
    683   enum GNUNET_GenericReturnValue ret;
    684 
    685   do {
    686     config_changed_flag = false;
    687     ret = GNUNET_PROGRAM_run (
    688       TALER_MERCHANT_project_data (),
    689       argc, argv,
    690       "taler-merchant-wirewatch",
    691       gettext_noop (
    692         "background process that watches for incoming wire transfers to the merchant bank account"),
    693       options,
    694       &run, NULL);
    695   } while ( (1 == persist_mode) &&
    696             config_changed_flag);
    697   if (GNUNET_SYSERR == ret)
    698     return EXIT_INVALIDARGUMENT;
    699   if (GNUNET_NO == ret)
    700     return EXIT_SUCCESS;
    701   return global_ret;
    702 }
    703 
    704 
    705 /* end of taler-merchant-wirewatch.c */