exchange

Base system with REST service to issue digital coins, run by the payment service provider
Log | Files | Refs | Submodules | README | LICENSE

taler-helper-auditor-transfer.c (16117B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2017-2024 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
     12 
     13   You should have received a copy of the GNU General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 /**
     17  * @file auditor/taler-helper-auditor-transfer.c
     18  * @brief audits that deposits past due date are
     19  *    aggregated and have a matching wire transfer
     20  * database.
     21  * @author Christian Grothoff
     22  */
     23 #include "taler/platform.h"
     24 #include <gnunet/gnunet_util_lib.h>
     25 #include <gnunet/gnunet_curl_lib.h>
     26 #include "taler/taler_auditordb_plugin.h"
     27 #include "taler/taler_exchangedb_lib.h"
     28 #include "taler/taler_json_lib.h"
     29 #include "taler/taler_signatures.h"
     30 #include "report-lib.h"
     31 #include "taler/taler_dbevents.h"
     32 
     33 
     34 /**
     35  * Run in test mode. Exit when idle instead of
     36  * going to sleep and waiting for more work.
     37  */
     38 static int test_mode;
     39 
     40 /**
     41  * Return value from main().
     42  */
     43 static int global_ret;
     44 
     45 /**
     46  * Last reserve_out / wire_out serial IDs seen.
     47  */
     48 static TALER_ARL_DEF_PP (wire_batch_deposit_id);
     49 static TALER_ARL_DEF_PP (wire_aggregation_id);
     50 
     51 /**
     52  * Total amount which the exchange did not aggregate/transfer in time.
     53  */
     54 static TALER_ARL_DEF_AB (total_amount_lag);
     55 
     56 /**
     57  * Total amount which the exchange did aggregate/transfer too early.
     58  */
     59 static TALER_ARL_DEF_AB (total_early_aggregation);
     60 
     61 /**
     62  * Should we run checks that only work for exchange-internal audits?
     63  */
     64 static int internal_checks;
     65 
     66 /**
     67  * Database event handler to wake us up again.
     68  */
     69 static struct GNUNET_DB_EventHandler *eh;
     70 
     71 /**
     72  * The auditors's configuration.
     73  */
     74 static const struct GNUNET_CONFIGURATION_Handle *cfg;
     75 
     76 
     77 /**
     78  * Task run on shutdown.
     79  *
     80  * @param cls NULL
     81  */
     82 static void
     83 do_shutdown (void *cls)
     84 {
     85   (void) cls;
     86   if (NULL != eh)
     87   {
     88     TALER_ARL_adb->event_listen_cancel (eh);
     89     eh = NULL;
     90   }
     91   TALER_ARL_done ();
     92   TALER_EXCHANGEDB_unload_accounts ();
     93   TALER_ARL_cfg = NULL;
     94 }
     95 
     96 
     97 /**
     98  * Closure for import_wire_missing_cb().
     99  */
    100 struct ImportMissingWireContext
    101 {
    102   /**
    103    * Set to maximum row ID encountered.
    104    */
    105   uint64_t max_batch_deposit_uuid;
    106 
    107   /**
    108    * Set to database errors in callback.
    109    */
    110   enum GNUNET_DB_QueryStatus err;
    111 };
    112 
    113 
    114 /**
    115  * Function called on deposits that need to be checked for their
    116  * wire transfer.
    117  *
    118  * @param cls closure, points to a `struct ImportMissingWireContext`
    119  * @param batch_deposit_serial_id serial of the entry in the batch deposits table
    120  * @param total_amount value of the missing deposits, including fee
    121  * @param wire_target_h_payto where should the funds be wired
    122  * @param deadline what was the earliest requested wire transfer deadline
    123  */
    124 static void
    125 import_wire_missing_cb (
    126   void *cls,
    127   uint64_t batch_deposit_serial_id,
    128   const struct TALER_Amount *total_amount,
    129   const struct TALER_FullPaytoHashP *wire_target_h_payto,
    130   struct GNUNET_TIME_Timestamp deadline)
    131 {
    132   struct ImportMissingWireContext *wc = cls;
    133   enum GNUNET_DB_QueryStatus qs;
    134 
    135   if (wc->err < 0)
    136     return; /* already failed */
    137   GNUNET_assert (batch_deposit_serial_id >= wc->max_batch_deposit_uuid);
    138   wc->max_batch_deposit_uuid = batch_deposit_serial_id + 1;
    139   qs = TALER_ARL_adb->delete_early_aggregation (
    140     TALER_ARL_adb->cls,
    141     batch_deposit_serial_id);
    142   switch (qs)
    143   {
    144   case GNUNET_DB_STATUS_SOFT_ERROR:
    145     GNUNET_break (0);
    146     wc->err = qs;
    147     return;
    148   case GNUNET_DB_STATUS_HARD_ERROR:
    149     GNUNET_break (0);
    150     wc->err = qs;
    151     return;
    152   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    153     qs = TALER_ARL_adb->insert_pending_deposit (
    154       TALER_ARL_adb->cls,
    155       batch_deposit_serial_id,
    156       wire_target_h_payto,
    157       total_amount,
    158       deadline);
    159     if (0 > qs)
    160     {
    161       GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
    162       wc->err = qs;
    163     }
    164     TALER_ARL_amount_add (&TALER_ARL_USE_AB (total_amount_lag),
    165                           &TALER_ARL_USE_AB (total_amount_lag),
    166                           total_amount);
    167     break;
    168   case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    169     TALER_ARL_amount_subtract (&TALER_ARL_USE_AB (total_early_aggregation),
    170                                &TALER_ARL_USE_AB (total_early_aggregation),
    171                                total_amount);
    172     break;
    173   default:
    174     GNUNET_assert (0);
    175   }
    176 }
    177 
    178 
    179 /**
    180  * Checks for wire transfers that should have happened.
    181  *
    182  * @return transaction status
    183  */
    184 static enum GNUNET_DB_QueryStatus
    185 check_for_required_transfers (void)
    186 {
    187   enum GNUNET_DB_QueryStatus qs;
    188   struct ImportMissingWireContext wc = {
    189     .max_batch_deposit_uuid = TALER_ARL_USE_PP (wire_batch_deposit_id),
    190     .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT
    191   };
    192 
    193   qs = TALER_ARL_edb->select_batch_deposits_missing_wire (
    194     TALER_ARL_edb->cls,
    195     TALER_ARL_USE_PP (wire_batch_deposit_id),
    196     &import_wire_missing_cb,
    197     &wc);
    198   if (0 > qs)
    199   {
    200     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
    201     return qs;
    202   }
    203   if (0 > wc.err)
    204   {
    205     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == wc.err);
    206     return wc.err;
    207   }
    208   TALER_ARL_USE_PP (wire_batch_deposit_id) = wc.max_batch_deposit_uuid;
    209   return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
    210 }
    211 
    212 
    213 /**
    214  * Closure for #clear_finished_transfer_cb().
    215  */
    216 struct AggregationContext
    217 {
    218   /**
    219    * Set to maximum row ID encountered.
    220    */
    221   uint64_t max_aggregation_serial;
    222 
    223   /**
    224    * Set to database errors in callback.
    225    */
    226   enum GNUNET_DB_QueryStatus err;
    227 };
    228 
    229 
    230 /**
    231  * Function called on aggregations that were done for
    232  * a (batch) deposit.
    233  *
    234  * @param cls closure
    235  * @param amount affected amount
    236  * @param tracking_serial_id where in the table are we
    237  * @param batch_deposit_serial_id which batch deposit was aggregated
    238  */
    239 static void
    240 clear_finished_transfer_cb (
    241   void *cls,
    242   const struct TALER_Amount *amount,
    243   uint64_t tracking_serial_id,
    244   uint64_t batch_deposit_serial_id)
    245 {
    246   struct AggregationContext *ac = cls;
    247   enum GNUNET_DB_QueryStatus qs;
    248 
    249   if (0 > ac->err)
    250     return; /* already failed */
    251   GNUNET_assert (ac->max_aggregation_serial <= tracking_serial_id);
    252   ac->max_aggregation_serial = tracking_serial_id + 1;
    253   qs = TALER_ARL_adb->delete_pending_deposit (
    254     TALER_ARL_adb->cls,
    255     batch_deposit_serial_id);
    256   switch (qs)
    257   {
    258   case GNUNET_DB_STATUS_SOFT_ERROR:
    259     GNUNET_break (0);
    260     ac->err = qs;
    261     return;
    262   case GNUNET_DB_STATUS_HARD_ERROR:
    263     GNUNET_break (0);
    264     ac->err = qs;
    265     return;
    266   case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    267     qs = TALER_ARL_adb->insert_early_aggregation (
    268       TALER_ARL_adb->cls,
    269       batch_deposit_serial_id,
    270       tracking_serial_id,
    271       amount);
    272     if (0 > qs)
    273     {
    274       GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
    275       ac->err = qs;
    276       return;
    277     }
    278     TALER_ARL_amount_add (&TALER_ARL_USE_AB (total_early_aggregation),
    279                           &TALER_ARL_USE_AB (total_early_aggregation),
    280                           amount);
    281     break;
    282   case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    283     TALER_ARL_amount_subtract (&TALER_ARL_USE_AB (total_amount_lag),
    284                                &TALER_ARL_USE_AB (total_amount_lag),
    285                                amount);
    286     break;
    287   default:
    288     GNUNET_assert (0);
    289   }
    290 }
    291 
    292 
    293 /**
    294  * Checks that all wire transfers that should have happened
    295  * (based on deposits) have indeed happened.
    296  *
    297  * @return transaction status
    298  */
    299 static enum GNUNET_DB_QueryStatus
    300 check_for_completed_transfers (void)
    301 {
    302   struct AggregationContext ac = {
    303     .max_aggregation_serial = TALER_ARL_USE_PP (wire_aggregation_id),
    304     .err = GNUNET_DB_STATUS_SUCCESS_ONE_RESULT
    305   };
    306   enum GNUNET_DB_QueryStatus qs;
    307 
    308   qs = TALER_ARL_edb->select_aggregations_above_serial (
    309     TALER_ARL_edb->cls,
    310     TALER_ARL_USE_PP (wire_aggregation_id),
    311     &clear_finished_transfer_cb,
    312     &ac);
    313   if (0 > qs)
    314   {
    315     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
    316     return qs;
    317   }
    318   if (0 > ac.err)
    319   {
    320     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == ac.err);
    321     return ac.err;
    322   }
    323   TALER_ARL_USE_PP (wire_aggregation_id) = ac.max_aggregation_serial;
    324   return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
    325 }
    326 
    327 
    328 /**
    329  * Start the database transactions and begin the audit.
    330  *
    331  * @return transaction status
    332  */
    333 static enum GNUNET_DB_QueryStatus
    334 begin_transaction (void)
    335 {
    336   enum GNUNET_DB_QueryStatus qs;
    337 
    338   if (GNUNET_SYSERR ==
    339       TALER_ARL_edb->preflight (TALER_ARL_edb->cls))
    340   {
    341     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    342                 "Failed to initialize exchange database connection.\n");
    343     return GNUNET_DB_STATUS_HARD_ERROR;
    344   }
    345   if (GNUNET_SYSERR ==
    346       TALER_ARL_adb->preflight (TALER_ARL_adb->cls))
    347   {
    348     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    349                 "Failed to initialize auditor database session.\n");
    350     return GNUNET_DB_STATUS_HARD_ERROR;
    351   }
    352   if (GNUNET_OK !=
    353       TALER_ARL_adb->start (TALER_ARL_adb->cls))
    354   {
    355     GNUNET_break (0);
    356     return GNUNET_DB_STATUS_HARD_ERROR;
    357   }
    358   if (GNUNET_OK !=
    359       TALER_ARL_edb->start_read_only (TALER_ARL_edb->cls,
    360                                       "transfer auditor"))
    361   {
    362     GNUNET_break (0);
    363     TALER_ARL_adb->rollback (TALER_ARL_adb->cls);
    364     return GNUNET_DB_STATUS_HARD_ERROR;
    365   }
    366   qs = TALER_ARL_adb->get_auditor_progress (
    367     TALER_ARL_adb->cls,
    368     TALER_ARL_GET_PP (wire_batch_deposit_id),
    369     TALER_ARL_GET_PP (wire_aggregation_id),
    370     NULL);
    371   if (0 > qs)
    372     goto handle_db_error;
    373 
    374   qs = TALER_ARL_adb->get_balance (
    375     TALER_ARL_adb->cls,
    376     TALER_ARL_GET_AB (total_amount_lag),
    377     TALER_ARL_GET_AB (total_early_aggregation),
    378     NULL);
    379   if (0 > qs)
    380     goto handle_db_error;
    381   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
    382   {
    383     GNUNET_log (GNUNET_ERROR_TYPE_MESSAGE,
    384                 "First analysis of with transfer auditor, starting audit from scratch\n");
    385   }
    386   else
    387   {
    388     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    389                 "Resuming transfer audit at %llu / %llu\n",
    390                 (unsigned long long) TALER_ARL_USE_PP (wire_batch_deposit_id),
    391                 (unsigned long long) TALER_ARL_USE_PP (wire_aggregation_id));
    392   }
    393 
    394   qs = check_for_required_transfers ();
    395   if (0 > qs)
    396     goto handle_db_error;
    397   qs = check_for_completed_transfers ();
    398   if (0 > qs)
    399     goto handle_db_error;
    400 
    401   qs = TALER_ARL_adb->update_auditor_progress (
    402     TALER_ARL_adb->cls,
    403     TALER_ARL_SET_PP (wire_batch_deposit_id),
    404     TALER_ARL_SET_PP (wire_aggregation_id),
    405     NULL);
    406   if (0 > qs)
    407     goto handle_db_error;
    408   qs = TALER_ARL_adb->insert_auditor_progress (
    409     TALER_ARL_adb->cls,
    410     TALER_ARL_SET_PP (wire_batch_deposit_id),
    411     TALER_ARL_SET_PP (wire_aggregation_id),
    412     NULL);
    413   if (0 > qs)
    414     goto handle_db_error;
    415   qs = TALER_ARL_adb->update_balance (
    416     TALER_ARL_adb->cls,
    417     TALER_ARL_SET_AB (total_amount_lag),
    418     TALER_ARL_SET_AB (total_early_aggregation),
    419     NULL);
    420   if (0 > qs)
    421     goto handle_db_error;
    422   qs = TALER_ARL_adb->insert_balance (
    423     TALER_ARL_adb->cls,
    424     TALER_ARL_SET_AB (total_amount_lag),
    425     TALER_ARL_SET_AB (total_early_aggregation),
    426     NULL);
    427   if (0 > qs)
    428     goto handle_db_error;
    429   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    430               "Concluded audit step at %llu/%llu\n",
    431               (unsigned long long) TALER_ARL_USE_PP (wire_aggregation_id),
    432               (unsigned long long) TALER_ARL_USE_PP (wire_batch_deposit_id));
    433   TALER_ARL_edb->rollback (TALER_ARL_edb->cls);
    434   qs = TALER_ARL_adb->commit (TALER_ARL_adb->cls);
    435   if (0 > qs)
    436     goto handle_db_error;
    437   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    438               "Transaction concluded!\n");
    439   return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
    440 handle_db_error:
    441   TALER_ARL_adb->rollback (TALER_ARL_adb->cls);
    442   TALER_ARL_edb->rollback (TALER_ARL_edb->cls);
    443   GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
    444   return qs;
    445 }
    446 
    447 
    448 /**
    449  * Start auditor process.
    450  */
    451 static void
    452 start (void)
    453 {
    454   enum GNUNET_DB_QueryStatus qs;
    455 
    456   for (unsigned int max_retries = 3; max_retries>0; max_retries--)
    457   {
    458     GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    459                 "Trying again (%u attempts left)\n",
    460                 max_retries);
    461     qs = begin_transaction ();
    462     if (GNUNET_DB_STATUS_SOFT_ERROR != qs)
    463       break;
    464   }
    465   if (0 > qs)
    466   {
    467     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    468                 "Audit failed\n");
    469     GNUNET_break (0);
    470     global_ret = EXIT_FAILURE;
    471     GNUNET_SCHEDULER_shutdown ();
    472     return;
    473   }
    474 }
    475 
    476 
    477 /**
    478  * Function called on events received from Postgres.
    479  *
    480  * @param cls closure, NULL
    481  * @param extra additional event data provided
    482  * @param extra_size number of bytes in @a extra
    483  */
    484 static void
    485 db_notify (void *cls,
    486            const void *extra,
    487            size_t extra_size)
    488 {
    489   (void) cls;
    490   (void) extra;
    491   (void) extra_size;
    492 
    493   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    494               "Received notification to wake transfer helper\n");
    495   start ();
    496 }
    497 
    498 
    499 /**
    500  * Main function that will be run.
    501  *
    502  * @param cls closure
    503  * @param args remaining command-line arguments
    504  * @param cfgfile name of the configuration file used (for saving, can be NULL!)
    505  * @param c configuration
    506  */
    507 static void
    508 run (void *cls,
    509      char *const *args,
    510      const char *cfgfile,
    511      const struct GNUNET_CONFIGURATION_Handle *c)
    512 {
    513   (void) cls;
    514   (void) args;
    515   (void) cfgfile;
    516   cfg = c;
    517   if (GNUNET_OK !=
    518       TALER_ARL_init (c))
    519   {
    520     global_ret = EXIT_FAILURE;
    521     return;
    522   }
    523   GNUNET_SCHEDULER_add_shutdown (&do_shutdown,
    524                                  NULL);
    525   if (GNUNET_OK !=
    526       TALER_EXCHANGEDB_load_accounts (TALER_ARL_cfg,
    527                                       TALER_EXCHANGEDB_ALO_DEBIT
    528                                       | TALER_EXCHANGEDB_ALO_CREDIT
    529                                       | TALER_EXCHANGEDB_ALO_AUTHDATA))
    530   {
    531     GNUNET_log (GNUNET_ERROR_TYPE_ERROR,
    532                 "No bank accounts configured\n");
    533     global_ret = EXIT_NOTCONFIGURED;
    534     GNUNET_SCHEDULER_shutdown ();
    535     return;
    536   }
    537   if (0 == test_mode)
    538   {
    539     // FIXME-Optimization: use different event type in the future!
    540     struct GNUNET_DB_EventHeaderP es = {
    541       .size = htons (sizeof (es)),
    542       .type = htons (TALER_DBEVENT_EXCHANGE_AUDITOR_WAKE_HELPER_WIRE)
    543     };
    544 
    545     eh = TALER_ARL_adb->event_listen (TALER_ARL_adb->cls,
    546                                       &es,
    547                                       GNUNET_TIME_UNIT_FOREVER_REL,
    548                                       &db_notify,
    549                                       NULL);
    550     GNUNET_assert (NULL != eh);
    551   }
    552   start ();
    553 }
    554 
    555 
    556 /**
    557  * The main function of the wire auditing tool. Checks that
    558  * the exchange's records of wire transfers match that of
    559  * the wire gateway.
    560  *
    561  * @param argc number of arguments from the command line
    562  * @param argv command line arguments
    563  * @return 0 ok, 1 on error
    564  */
    565 int
    566 main (int argc,
    567       char *const *argv)
    568 {
    569   const struct GNUNET_GETOPT_CommandLineOption options[] = {
    570     GNUNET_GETOPT_option_flag ('i',
    571                                "internal",
    572                                "perform checks only applicable for exchange-internal audits",
    573                                &internal_checks),
    574     GNUNET_GETOPT_option_flag ('t',
    575                                "test",
    576                                "run in test mode and exit when idle",
    577                                &test_mode),
    578     GNUNET_GETOPT_option_timetravel ('T',
    579                                      "timetravel"),
    580     GNUNET_GETOPT_OPTION_END
    581   };
    582   enum GNUNET_GenericReturnValue ret;
    583 
    584   ret = GNUNET_PROGRAM_run (
    585     TALER_AUDITOR_project_data (),
    586     argc,
    587     argv,
    588     "taler-helper-auditor-transfer",
    589     gettext_noop (
    590       "Audit exchange database for consistency of aggregations/transfers with respect to deposit deadlines"),
    591     options,
    592     &run,
    593     NULL);
    594   if (GNUNET_SYSERR == ret)
    595     return EXIT_INVALIDARGUMENT;
    596   if (GNUNET_NO == ret)
    597     return EXIT_SUCCESS;
    598   return global_ret;
    599 }
    600 
    601 
    602 /* end of taler-helper-auditor-transfer.c */