exchange

Base system with REST service to issue digital coins, run by the payment service provider
Log | Files | Refs | Submodules | README | LICENSE

pg_aggregate.c (9570B)


      1 /*
      2    This file is part of TALER
      3    Copyright (C) 2022, 2023 Taler Systems SA
      4 
      5    TALER is free software; you can redistribute it and/or modify it under the
      6    terms of the GNU General Public License as published by the Free Software
      7    Foundation; either version 3, or (at your option) any later version.
      8 
      9    TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10    WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11    A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
     12 
     13    You should have received a copy of the GNU General Public License along with
     14    TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15  */
     16 /**
     17  * @file exchangedb/pg_aggregate.c
     18  * @brief Implementation of the aggregate function for Postgres
     19  * @author Christian Grothoff
     20  */
     21 #include "taler/platform.h"
     22 #include "taler/taler_error_codes.h"
     23 #include "taler/taler_dbevents.h"
     24 #include "taler/taler_pq_lib.h"
     25 #include "pg_compute_shard.h"
     26 #include "pg_event_notify.h"
     27 #include "pg_aggregate.h"
     28 #include "pg_helper.h"
     29 
     30 
     31 enum GNUNET_DB_QueryStatus
     32 TEH_PG_aggregate (
     33   void *cls,
     34   const struct TALER_FullPaytoHashP *h_payto,
     35   const struct TALER_MerchantPublicKeyP *merchant_pub,
     36   const struct TALER_WireTransferIdentifierRawP *wtid,
     37   struct TALER_Amount *total)
     38 {
     39   struct PostgresClosure *pg = cls;
     40   uint64_t deposit_shard = TEH_PG_compute_shard (merchant_pub);
     41   struct GNUNET_TIME_Absolute now = {0};
     42   uint64_t sum_deposit_value;
     43   uint64_t sum_deposit_frac;
     44   uint64_t sum_refund_value;
     45   uint64_t sum_refund_frac;
     46   uint64_t sum_fee_value;
     47   uint64_t sum_fee_frac;
     48   enum GNUNET_DB_QueryStatus qs;
     49   struct TALER_Amount sum_deposit;
     50   struct TALER_Amount sum_refund;
     51   struct TALER_Amount sum_fee;
     52   struct TALER_Amount delta;
     53 
     54   now = GNUNET_TIME_absolute_round_down (GNUNET_TIME_absolute_get (),
     55                                          pg->aggregator_shift);
     56   PREPARE (pg,
     57            "aggregate",
     58            "WITH bdep AS (" /* restrict to our merchant and account and mark as done */
     59            "  UPDATE batch_deposits"
     60            "     SET done=TRUE"
     61            "   WHERE NOT (done OR policy_blocked)" /* only actually executable deposits */
     62            "     AND refund_deadline<$1"
     63            "     AND shard=$5" /* only for efficiency, merchant_pub is what we really filter by */
     64            "     AND merchant_pub=$2" /* filter by target merchant */
     65            "     AND wire_target_h_payto=$3" /* merchant could have a 2nd bank account */
     66            "   RETURNING"
     67            "     batch_deposit_serial_id)"
     68            " ,cdep AS ("
     69            "   SELECT"
     70            "     coin_deposit_serial_id"
     71            "    ,batch_deposit_serial_id"
     72            "    ,coin_pub"
     73            "    ,amount_with_fee AS amount"
     74            "   FROM coin_deposits"
     75            "   WHERE batch_deposit_serial_id IN (SELECT batch_deposit_serial_id FROM bdep))"
     76            " ,ref AS (" /* find applicable refunds -- NOTE: may do a full join on the master, maybe find a left-join way to integrate with query above to push it to the shards? */
     77            "  SELECT"
     78            "    amount_with_fee AS refund"
     79            "   ,coin_pub"
     80            "   ,batch_deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */
     81            "    FROM refunds"
     82            "   WHERE coin_pub IN (SELECT coin_pub FROM cdep)"
     83            "     AND batch_deposit_serial_id IN (SELECT batch_deposit_serial_id FROM bdep))"
     84            " ,ref_by_coin AS (" /* total up refunds by coin */
     85            "  SELECT"
     86            "    SUM((ref.refund).val) AS sum_refund_val"
     87            "   ,SUM((ref.refund).frac) AS sum_refund_frac"
     88            "   ,coin_pub"
     89            "   ,batch_deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */
     90            "    FROM ref"
     91            "   GROUP BY coin_pub, batch_deposit_serial_id)"
     92            " ,norm_ref_by_coin AS (" /* normalize */
     93            "  SELECT"
     94            "    sum_refund_val + sum_refund_frac / 100000000 AS norm_refund_val"
     95            "   ,sum_refund_frac % 100000000 AS norm_refund_frac"
     96            "   ,coin_pub"
     97            "   ,batch_deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */
     98            "    FROM ref_by_coin)"
     99            " ,fully_refunded_coins AS (" /* find applicable refunds -- NOTE: may do a full join on the master, maybe find a left-join way to integrate with query above to push it to the shards? */
    100            "  SELECT"
    101            "    cdep.coin_pub"
    102            "    FROM norm_ref_by_coin norm"
    103            "    JOIN cdep"
    104            "      ON (norm.coin_pub = cdep.coin_pub"
    105            "      AND norm.batch_deposit_serial_id = cdep.batch_deposit_serial_id"
    106            "      AND norm.norm_refund_val = (cdep.amount).val"
    107            "      AND norm.norm_refund_frac = (cdep.amount).frac))"
    108            " ,fees AS (" /* find deposit fees for not fully refunded deposits */
    109            "  SELECT"
    110            "    denom.fee_deposit AS fee"
    111            "   ,cs.batch_deposit_serial_id" /* ensures we get the fee for each coin, not once per denomination */
    112            "    FROM cdep cs"
    113            "    JOIN known_coins kc" /* NOTE: may do a full join on the master, maybe find a left-join way to integrate with query above to push it to the shards? */
    114            "      USING (coin_pub)"
    115            "    JOIN denominations denom"
    116            "      USING (denominations_serial)"
    117            "    WHERE coin_pub NOT IN (SELECT coin_pub FROM fully_refunded_coins))"
    118            " ,dummy AS (" /* add deposits to aggregation_tracking */
    119            "    INSERT INTO aggregation_tracking"
    120            "    (batch_deposit_serial_id"
    121            "    ,wtid_raw)"
    122            "    SELECT batch_deposit_serial_id,$4"
    123            "      FROM bdep)"
    124            "SELECT" /* calculate totals (deposits, refunds and fees) */
    125            "  CAST(COALESCE(SUM((cdep.amount).val),0) AS INT8) AS sum_deposit_value"
    126            /* cast needed, otherwise we get NUMBER */
    127            " ,COALESCE(SUM((cdep.amount).frac),0) AS sum_deposit_fraction" /* SUM over INT returns INT8 */
    128            " ,CAST(COALESCE(SUM((ref.refund).val),0) AS INT8) AS sum_refund_value"
    129            " ,COALESCE(SUM((ref.refund).frac),0) AS sum_refund_fraction"
    130            " ,CAST(COALESCE(SUM((fees.fee).val),0) AS INT8) AS sum_fee_value"
    131            " ,COALESCE(SUM((fees.fee).frac),0) AS sum_fee_fraction"
    132            " FROM cdep "
    133            "   FULL OUTER JOIN ref ON (FALSE)"    /* We just want all sums */
    134            "   FULL OUTER JOIN fees ON (FALSE);");
    135 
    136   {
    137     struct GNUNET_PQ_QueryParam params[] = {
    138       GNUNET_PQ_query_param_absolute_time (&now),
    139       GNUNET_PQ_query_param_auto_from_type (merchant_pub),
    140       GNUNET_PQ_query_param_auto_from_type (h_payto),
    141       GNUNET_PQ_query_param_auto_from_type (wtid),
    142       GNUNET_PQ_query_param_uint64 (&deposit_shard),
    143       GNUNET_PQ_query_param_end
    144     };
    145     struct GNUNET_PQ_ResultSpec rs[] = {
    146       GNUNET_PQ_result_spec_uint64 ("sum_deposit_value",
    147                                     &sum_deposit_value),
    148       GNUNET_PQ_result_spec_uint64 ("sum_deposit_fraction",
    149                                     &sum_deposit_frac),
    150       GNUNET_PQ_result_spec_uint64 ("sum_refund_value",
    151                                     &sum_refund_value),
    152       GNUNET_PQ_result_spec_uint64 ("sum_refund_fraction",
    153                                     &sum_refund_frac),
    154       GNUNET_PQ_result_spec_uint64 ("sum_fee_value",
    155                                     &sum_fee_value),
    156       GNUNET_PQ_result_spec_uint64 ("sum_fee_fraction",
    157                                     &sum_fee_frac),
    158       GNUNET_PQ_result_spec_end
    159     };
    160 
    161     qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
    162                                                    "aggregate",
    163                                                    params,
    164                                                    rs);
    165   }
    166   if (qs < 0)
    167   {
    168     GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
    169     return qs;
    170   }
    171   if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
    172   {
    173     GNUNET_assert (GNUNET_OK ==
    174                    TALER_amount_set_zero (pg->currency,
    175                                           total));
    176     return qs;
    177   }
    178   GNUNET_assert (GNUNET_OK ==
    179                  TALER_amount_set_zero (pg->currency,
    180                                         &sum_deposit));
    181   GNUNET_assert (GNUNET_OK ==
    182                  TALER_amount_set_zero (pg->currency,
    183                                         &sum_refund));
    184   GNUNET_assert (GNUNET_OK ==
    185                  TALER_amount_set_zero (pg->currency,
    186                                         &sum_fee));
    187   sum_deposit.value    = sum_deposit_frac / TALER_AMOUNT_FRAC_BASE
    188                          + sum_deposit_value;
    189   sum_deposit.fraction = sum_deposit_frac % TALER_AMOUNT_FRAC_BASE;
    190   sum_refund.value     = sum_refund_frac  / TALER_AMOUNT_FRAC_BASE
    191                          + sum_refund_value;
    192   sum_refund.fraction  = sum_refund_frac  % TALER_AMOUNT_FRAC_BASE;
    193   sum_fee.value        = sum_fee_frac     / TALER_AMOUNT_FRAC_BASE
    194                          + sum_fee_value;
    195   sum_fee.fraction     = sum_fee_frac     % TALER_AMOUNT_FRAC_BASE; \
    196   GNUNET_assert (0 <=
    197                  TALER_amount_subtract (&delta,
    198                                         &sum_deposit,
    199                                         &sum_refund));
    200   GNUNET_assert (0 <=
    201                  TALER_amount_subtract (total,
    202                                         &delta,
    203                                         &sum_fee));
    204   return qs;
    205 }