exchange

Base system with REST service to issue digital coins, run by the payment service provider
Log | Files | Refs | Submodules | README | LICENSE

pg_reserves_in_insert.c (11751B)


      1 /*
      2    This file is part of TALER
      3    Copyright (C) 2022-2024 Taler Systems SA
      4 
      5    TALER is free software; you can redistribute it and/or modify it under the
      6    terms of the GNU General Public License as published by the Free Software
      7    Foundation; either version 3, or (at your option) any later version.
      8 
      9    TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10    WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11    A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
     12 
     13    You should have received a copy of the GNU General Public License along with
     14    TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15  */
     16 /**
     17  * @file exchangedb/pg_reserves_in_insert.c
     18  * @brief Implementation of the reserves_in_insert function for Postgres
     19  * @author Christian Grothoff
     20  * @author Joseph Xu
     21  */
     22 #include "taler/platform.h"
     23 #include "taler/taler_error_codes.h"
     24 #include "taler/taler_dbevents.h"
     25 #include "taler/taler_pq_lib.h"
     26 #include "pg_reserves_in_insert.h"
     27 #include "pg_helper.h"
     28 #include "pg_start.h"
     29 #include "pg_start_read_committed.h"
     30 #include "pg_commit.h"
     31 #include "pg_preflight.h"
     32 #include "pg_rollback.h"
     33 #include "pg_event_notify.h"
     34 
     35 
     36 /**
     37  * Generate event notification for the reserve change.
     38  *
     39  * @param reserve_pub reserve to notfiy on
     40  * @return string to pass to postgres for the notification
     41  */
     42 static char *
     43 compute_notify_on_reserve (const struct TALER_ReservePublicKeyP *reserve_pub)
     44 {
     45   struct TALER_ReserveEventP rep = {
     46     .header.size = htons (sizeof (rep)),
     47     .header.type = htons (TALER_DBEVENT_EXCHANGE_RESERVE_INCOMING),
     48     .reserve_pub = *reserve_pub
     49   };
     50 
     51   return GNUNET_PQ_get_event_notify_channel (&rep.header);
     52 }
     53 
     54 
/**
 * Closure for our helper_cb(). Holds the per-row output arrays that
 * helper_cb() fills (entry i corresponds to result row i) plus the
 * aggregated outcome of processing all rows.
 */
struct Context
{
  /**
   * Array of reserve UUIDs to initialize. Entry i receives the
   * "ruuid" column of result row i.
   */
  uint64_t *reserve_uuids;

  /**
   * Array with entries set to 'true' for duplicate transactions.
   * Entry i receives the "transaction_duplicate" column of row i.
   */
  bool *transaction_duplicates;

  /**
   * Array with entries set to 'true' for rows with conflicts.
   * Entry i is the flag passed to GNUNET_PQ_result_spec_allow_null()
   * for the "ruuid" column of row i (presumably 'true' iff "ruuid"
   * was NULL -- confirm against the GNUnet PQ API).
   */
  bool *conflicts;

  /**
   * Set to #GNUNET_SYSERR on failures, that is when extracting
   * a result row failed in helper_cb().
   */
  enum GNUNET_GenericReturnValue status;

  /**
   * Single value (no array) set to true if we need
   * to follow-up with an update. helper_cb() raises it for any
   * row that is flagged as a conflict but not as a duplicate.
   */
  bool needs_update;
};
     86 
     87 
     88 /**
     89  * Helper function to be called with the results of a SELECT statement
     90  * that has returned @a num_results results.
     91  *
     92  * @param cls closure of type `struct Context *`
     93  * @param result the postgres result
     94  * @param num_results the number of results in @a result
     95  */
     96 static void
     97 helper_cb (void *cls,
     98            PGresult *result,
     99            unsigned int num_results)
    100 {
    101   struct Context *ctx = cls;
    102 
    103   for (unsigned int i = 0; i<num_results; i++)
    104   {
    105     struct GNUNET_PQ_ResultSpec rs[] = {
    106       GNUNET_PQ_result_spec_bool (
    107         "transaction_duplicate",
    108         &ctx->transaction_duplicates[i]),
    109       GNUNET_PQ_result_spec_allow_null (
    110         GNUNET_PQ_result_spec_uint64 ("ruuid",
    111                                       &ctx->reserve_uuids[i]),
    112         &ctx->conflicts[i]),
    113       GNUNET_PQ_result_spec_end
    114     };
    115 
    116     if (GNUNET_OK !=
    117         GNUNET_PQ_extract_result (result,
    118                                   rs,
    119                                   i))
    120     {
    121       GNUNET_break (0);
    122       ctx->status = GNUNET_SYSERR;
    123       return;
    124     }
    125     if (! ctx->transaction_duplicates[i])
    126       ctx->needs_update |= ctx->conflicts[i];
    127   }
    128 }
    129 
    130 
    131 enum GNUNET_DB_QueryStatus
    132 TEH_PG_reserves_in_insert (
    133   void *cls,
    134   const struct TALER_EXCHANGEDB_ReserveInInfo *reserves,
    135   unsigned int reserves_length,
    136   enum GNUNET_DB_QueryStatus *results)
    137 {
    138   struct PostgresClosure *pg = cls;
    139   unsigned int dups = 0;
    140 
    141   struct TALER_FullPaytoHashP h_full_paytos[
    142     GNUNET_NZL (reserves_length)];
    143   struct TALER_NormalizedPaytoHashP h_normalized_paytos[
    144     GNUNET_NZL (reserves_length)];
    145   char *notify_s[GNUNET_NZL (reserves_length)];
    146   struct TALER_ReservePublicKeyP reserve_pubs[GNUNET_NZL (reserves_length)];
    147   struct TALER_Amount balances[GNUNET_NZL (reserves_length)];
    148   struct GNUNET_TIME_Timestamp execution_times[GNUNET_NZL (reserves_length)];
    149   const char *sender_account_details[GNUNET_NZL (reserves_length)];
    150   const char *exchange_account_names[GNUNET_NZL (reserves_length)];
    151   uint64_t wire_references[GNUNET_NZL (reserves_length)];
    152   uint64_t reserve_uuids[GNUNET_NZL (reserves_length)];
    153   bool transaction_duplicates[GNUNET_NZL (reserves_length)];
    154   bool conflicts[GNUNET_NZL (reserves_length)];
    155   struct GNUNET_TIME_Timestamp reserve_expiration
    156     = GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time);
    157   struct GNUNET_TIME_Timestamp gc
    158     = GNUNET_TIME_relative_to_timestamp (pg->legal_reserve_expiration_time);
    159   enum GNUNET_DB_QueryStatus qs;
    160   bool need_update;
    161 
    162   for (unsigned int i = 0; i<reserves_length; i++)
    163   {
    164     const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i];
    165 
    166     TALER_full_payto_hash (reserve->sender_account_details,
    167                            &h_full_paytos[i]);
    168     TALER_full_payto_normalize_and_hash (reserve->sender_account_details,
    169                                          &h_normalized_paytos[i]);
    170     notify_s[i] = compute_notify_on_reserve (reserve->reserve_pub);
    171     reserve_pubs[i] = *reserve->reserve_pub;
    172     balances[i] = *reserve->balance;
    173     execution_times[i] = reserve->execution_time;
    174     sender_account_details[i] = reserve->sender_account_details.full_payto;
    175     exchange_account_names[i] = reserve->exchange_account_name;
    176     wire_references[i] = reserve->wire_reference;
    177   }
    178 
    179   /* NOTE: kind-of pointless to explicitly start a transaction here... */
    180   if (GNUNET_OK !=
    181       TEH_PG_preflight (pg))
    182   {
    183     GNUNET_break (0);
    184     qs = GNUNET_DB_STATUS_HARD_ERROR;
    185     goto finished;
    186   }
    187   if (GNUNET_OK !=
    188       TEH_PG_start_read_committed (pg,
    189                                    "READ_COMMITED"))
    190   {
    191     GNUNET_break (0);
    192     qs = GNUNET_DB_STATUS_HARD_ERROR;
    193     goto finished;
    194   }
    195   PREPARE (pg,
    196            "reserves_insert_with_array",
    197            "SELECT"
    198            " transaction_duplicate"
    199            ",ruuid"
    200            " FROM exchange_do_array_reserves_insert"
    201            " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11);");
    202   {
    203     struct GNUNET_PQ_QueryParam params[] = {
    204       GNUNET_PQ_query_param_timestamp (&gc),
    205       GNUNET_PQ_query_param_timestamp (&reserve_expiration),
    206       GNUNET_PQ_query_param_array_auto_from_type (reserves_length,
    207                                                   reserve_pubs,
    208                                                   pg->conn),
    209       GNUNET_PQ_query_param_array_uint64 (reserves_length,
    210                                           wire_references,
    211                                           pg->conn),
    212       TALER_PQ_query_param_array_amount (
    213         reserves_length,
    214         balances,
    215         pg->conn),
    216       GNUNET_PQ_query_param_array_ptrs_string (
    217         reserves_length,
    218         (const char **) exchange_account_names,
    219         pg->conn),
    220       GNUNET_PQ_query_param_array_timestamp (
    221         reserves_length,
    222         execution_times,
    223         pg->conn),
    224       GNUNET_PQ_query_param_array_auto_from_type (
    225         reserves_length,
    226         h_full_paytos,
    227         pg->conn),
    228       GNUNET_PQ_query_param_array_auto_from_type (
    229         reserves_length,
    230         h_normalized_paytos,
    231         pg->conn),
    232       GNUNET_PQ_query_param_array_ptrs_string (
    233         reserves_length,
    234         (const char **) sender_account_details,
    235         pg->conn),
    236       GNUNET_PQ_query_param_array_ptrs_string (
    237         reserves_length,
    238         (const char **) notify_s,
    239         pg->conn),
    240       GNUNET_PQ_query_param_end
    241     };
    242     struct Context ctx = {
    243       .reserve_uuids = reserve_uuids,
    244       .transaction_duplicates = transaction_duplicates,
    245       .conflicts = conflicts,
    246       .needs_update = false,
    247       .status = GNUNET_OK
    248     };
    249 
    250     qs = GNUNET_PQ_eval_prepared_multi_select (pg->conn,
    251                                                "reserves_insert_with_array",
    252                                                params,
    253                                                &helper_cb,
    254                                                &ctx);
    255     GNUNET_PQ_cleanup_query_params_closures (params);
    256     if ( (qs < 0) ||
    257          (GNUNET_OK != ctx.status) )
    258     {
    259       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
    260                   "Failed to insert into reserves (%d)\n",
    261                   qs);
    262       goto finished;
    263     }
    264     need_update = ctx.needs_update;
    265   }
    266 
    267   {
    268     enum GNUNET_DB_QueryStatus cs;
    269 
    270     cs = TEH_PG_commit (pg);
    271     if (cs < 0)
    272     {
    273       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
    274                   "Failed to commit\n");
    275       qs = cs;
    276       goto finished;
    277     }
    278   }
    279 
    280   for (unsigned int i = 0; i<reserves_length; i++)
    281   {
    282     if (transaction_duplicates[i])
    283       dups++;
    284     results[i] = transaction_duplicates[i]
    285       ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS
    286       : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
    287   }
    288 
    289   if (! need_update)
    290   {
    291     qs = reserves_length;
    292     goto finished;
    293   }
    294   GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    295               "Reserve update needed for some reserves in the batch\n");
    296   PREPARE (pg,
    297            "reserves_update",
    298            "SELECT"
    299            " out_duplicate AS duplicate "
    300            "FROM exchange_do_batch_reserves_update"
    301            " ($1,$2,$3,$4,$5,$6,$7);");
    302 
    303   if (GNUNET_OK !=
    304       TEH_PG_start (pg,
    305                     "reserve-insert-continued"))
    306   {
    307     GNUNET_break (0);
    308     qs = GNUNET_DB_STATUS_HARD_ERROR;
    309     goto finished;
    310   }
    311 
    312   for (unsigned int i = 0; i<reserves_length; i++)
    313   {
    314     if (transaction_duplicates[i])
    315       continue;
    316     if (! conflicts[i])
    317       continue;
    318     {
    319       bool duplicate;
    320       struct GNUNET_PQ_QueryParam params[] = {
    321         GNUNET_PQ_query_param_auto_from_type (&reserve_pubs[i]),
    322         GNUNET_PQ_query_param_timestamp (&reserve_expiration),
    323         GNUNET_PQ_query_param_uint64 (&wire_references[i]),
    324         TALER_PQ_query_param_amount (pg->conn,
    325                                      &balances[i]),
    326         GNUNET_PQ_query_param_string (exchange_account_names[i]),
    327         GNUNET_PQ_query_param_auto_from_type (&h_full_paytos[i]),
    328         GNUNET_PQ_query_param_string (notify_s[i]),
    329         GNUNET_PQ_query_param_end
    330       };
    331       struct GNUNET_PQ_ResultSpec rs[] = {
    332         GNUNET_PQ_result_spec_bool ("duplicate",
    333                                     &duplicate),
    334         GNUNET_PQ_result_spec_end
    335       };
    336       enum GNUNET_DB_QueryStatus qsi;
    337 
    338       qsi = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
    339                                                       "reserves_update",
    340                                                       params,
    341                                                       rs);
    342       if (qsi < 0)
    343       {
    344         GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
    345                     "Failed to update reserves (%d)\n",
    346                     qsi);
    347         results[i] = qsi;
    348         goto finished;
    349       }
    350       results[i] = duplicate
    351           ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS
    352           : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
    353     }
    354   }
    355   {
    356     enum GNUNET_DB_QueryStatus cs;
    357 
    358     cs = TEH_PG_commit (pg);
    359     if (cs < 0)
    360     {
    361       GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
    362                   "Failed to commit\n");
    363       qs = cs;
    364       goto finished;
    365     }
    366   }
    367 finished:
    368   for (unsigned int i = 0; i<reserves_length; i++)
    369     GNUNET_free (notify_s[i]);
    370   if (qs < 0)
    371     return qs;
    372   GNUNET_PQ_event_do_poll (pg->conn);
    373   if (0 != dups)
    374     GNUNET_log (GNUNET_ERROR_TYPE_WARNING,
    375                 "%u/%u duplicates among incoming transactions. Try increasing WIREWATCH_IDLE_SLEEP_INTERVAL in the [exchange] configuration section (if this happens a lot).\n",
    376                 dups,
    377                 reserves_length);
    378   return qs;
    379 }