pg_aggregate.c (9570B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2022, 2023 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU General Public License for more details. 12 13 You should have received a copy of the GNU General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 /** 17 * @file exchangedb/pg_aggregate.c 18 * @brief Implementation of the aggregate function for Postgres 19 * @author Christian Grothoff 20 */ 21 #include "taler/platform.h" 22 #include "taler/taler_error_codes.h" 23 #include "taler/taler_dbevents.h" 24 #include "taler/taler_pq_lib.h" 25 #include "pg_compute_shard.h" 26 #include "pg_event_notify.h" 27 #include "pg_aggregate.h" 28 #include "pg_helper.h" 29 30 31 enum GNUNET_DB_QueryStatus 32 TEH_PG_aggregate ( 33 void *cls, 34 const struct TALER_FullPaytoHashP *h_payto, 35 const struct TALER_MerchantPublicKeyP *merchant_pub, 36 const struct TALER_WireTransferIdentifierRawP *wtid, 37 struct TALER_Amount *total) 38 { 39 struct PostgresClosure *pg = cls; 40 uint64_t deposit_shard = TEH_PG_compute_shard (merchant_pub); 41 struct GNUNET_TIME_Absolute now = {0}; 42 uint64_t sum_deposit_value; 43 uint64_t sum_deposit_frac; 44 uint64_t sum_refund_value; 45 uint64_t sum_refund_frac; 46 uint64_t sum_fee_value; 47 uint64_t sum_fee_frac; 48 enum GNUNET_DB_QueryStatus qs; 49 struct TALER_Amount sum_deposit; 50 struct TALER_Amount sum_refund; 51 struct TALER_Amount sum_fee; 52 struct TALER_Amount delta; 53 54 now = GNUNET_TIME_absolute_round_down (GNUNET_TIME_absolute_get (), 55 pg->aggregator_shift); 56 PREPARE (pg, 57 "aggregate", 58 "WITH bdep AS (" /* restrict to our merchant and account and mark as done */ 59 " UPDATE batch_deposits" 60 " SET done=TRUE" 61 " WHERE NOT (done OR policy_blocked)" /* only actually executable deposits */ 62 " AND refund_deadline<$1" 63 " AND shard=$5" /* only for efficiency, merchant_pub is what we really filter by */ 64 " AND merchant_pub=$2" /* filter by target merchant */ 65 " AND wire_target_h_payto=$3" /* merchant could have a 2nd bank account */ 66 " RETURNING" 67 " batch_deposit_serial_id)" 68 " ,cdep AS (" 69 " SELECT" 70 " coin_deposit_serial_id" 71 " ,batch_deposit_serial_id" 72 " ,coin_pub" 73 " ,amount_with_fee AS amount" 74 " FROM coin_deposits" 75 " WHERE batch_deposit_serial_id IN (SELECT batch_deposit_serial_id FROM bdep))" 76 " ,ref AS (" /* find applicable refunds -- NOTE: may do a full join on the master, maybe find a left-join way to integrate with query above to push it to the shards? */ 77 " SELECT" 78 " amount_with_fee AS refund" 79 " ,coin_pub" 80 " ,batch_deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */ 81 " FROM refunds" 82 " WHERE coin_pub IN (SELECT coin_pub FROM cdep)" 83 " AND batch_deposit_serial_id IN (SELECT batch_deposit_serial_id FROM bdep))" 84 " ,ref_by_coin AS (" /* total up refunds by coin */ 85 " SELECT" 86 " SUM((ref.refund).val) AS sum_refund_val" 87 " ,SUM((ref.refund).frac) AS sum_refund_frac" 88 " ,coin_pub" 89 " ,batch_deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */ 90 " FROM ref" 91 " GROUP BY coin_pub, batch_deposit_serial_id)" 92 " ,norm_ref_by_coin AS (" /* normalize */ 93 " SELECT" 94 " sum_refund_val + sum_refund_frac / 100000000 AS norm_refund_val" 95 " ,sum_refund_frac % 100000000 AS norm_refund_frac" 96 " ,coin_pub" 97 " ,batch_deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */ 98 " FROM ref_by_coin)" 99 " ,fully_refunded_coins AS (" /* find applicable refunds -- NOTE: may do a full join on the master, maybe find a left-join way to integrate with query above to push it to the shards? */ 100 " SELECT" 101 " cdep.coin_pub" 102 " FROM norm_ref_by_coin norm" 103 " JOIN cdep" 104 " ON (norm.coin_pub = cdep.coin_pub" 105 " AND norm.batch_deposit_serial_id = cdep.batch_deposit_serial_id" 106 " AND norm.norm_refund_val = (cdep.amount).val" 107 " AND norm.norm_refund_frac = (cdep.amount).frac))" 108 " ,fees AS (" /* find deposit fees for not fully refunded deposits */ 109 " SELECT" 110 " denom.fee_deposit AS fee" 111 " ,cs.batch_deposit_serial_id" /* ensures we get the fee for each coin, not once per denomination */ 112 " FROM cdep cs" 113 " JOIN known_coins kc" /* NOTE: may do a full join on the master, maybe find a left-join way to integrate with query above to push it to the shards? */ 114 " USING (coin_pub)" 115 " JOIN denominations denom" 116 " USING (denominations_serial)" 117 " WHERE coin_pub NOT IN (SELECT coin_pub FROM fully_refunded_coins))" 118 " ,dummy AS (" /* add deposits to aggregation_tracking */ 119 " INSERT INTO aggregation_tracking" 120 " (batch_deposit_serial_id" 121 " ,wtid_raw)" 122 " SELECT batch_deposit_serial_id,$4" 123 " FROM bdep)" 124 "SELECT" /* calculate totals (deposits, refunds and fees) */ 125 " CAST(COALESCE(SUM((cdep.amount).val),0) AS INT8) AS sum_deposit_value" 126 /* cast needed, otherwise we get NUMBER */ 127 " ,COALESCE(SUM((cdep.amount).frac),0) AS sum_deposit_fraction" /* SUM over INT returns INT8 */ 128 " ,CAST(COALESCE(SUM((ref.refund).val),0) AS INT8) AS sum_refund_value" 129 " ,COALESCE(SUM((ref.refund).frac),0) AS sum_refund_fraction" 130 " ,CAST(COALESCE(SUM((fees.fee).val),0) AS INT8) AS sum_fee_value" 131 " ,COALESCE(SUM((fees.fee).frac),0) AS sum_fee_fraction" 132 " FROM cdep " 133 " FULL OUTER JOIN ref ON (FALSE)" /* We just want all sums */ 134 " FULL OUTER JOIN fees ON (FALSE);"); 135 136 { 137 struct GNUNET_PQ_QueryParam params[] = { 138 GNUNET_PQ_query_param_absolute_time (&now), 139 GNUNET_PQ_query_param_auto_from_type (merchant_pub), 140 GNUNET_PQ_query_param_auto_from_type (h_payto), 141 GNUNET_PQ_query_param_auto_from_type (wtid), 142 GNUNET_PQ_query_param_uint64 (&deposit_shard), 143 GNUNET_PQ_query_param_end 144 }; 145 struct GNUNET_PQ_ResultSpec rs[] = { 146 GNUNET_PQ_result_spec_uint64 ("sum_deposit_value", 147 &sum_deposit_value), 148 GNUNET_PQ_result_spec_uint64 ("sum_deposit_fraction", 149 &sum_deposit_frac), 150 GNUNET_PQ_result_spec_uint64 ("sum_refund_value", 151 &sum_refund_value), 152 GNUNET_PQ_result_spec_uint64 ("sum_refund_fraction", 153 &sum_refund_frac), 154 GNUNET_PQ_result_spec_uint64 ("sum_fee_value", 155 &sum_fee_value), 156 GNUNET_PQ_result_spec_uint64 ("sum_fee_fraction", 157 &sum_fee_frac), 158 GNUNET_PQ_result_spec_end 159 }; 160 161 qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, 162 "aggregate", 163 params, 164 rs); 165 } 166 if (qs < 0) 167 { 168 GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); 169 return qs; 170 } 171 if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs) 172 { 173 GNUNET_assert (GNUNET_OK == 174 TALER_amount_set_zero (pg->currency, 175 total)); 176 return qs; 177 } 178 GNUNET_assert (GNUNET_OK == 179 TALER_amount_set_zero (pg->currency, 180 &sum_deposit)); 181 GNUNET_assert (GNUNET_OK == 182 TALER_amount_set_zero (pg->currency, 183 &sum_refund)); 184 GNUNET_assert (GNUNET_OK == 185 TALER_amount_set_zero (pg->currency, 186 &sum_fee)); 187 sum_deposit.value = sum_deposit_frac / TALER_AMOUNT_FRAC_BASE 188 + sum_deposit_value; 189 sum_deposit.fraction = sum_deposit_frac % TALER_AMOUNT_FRAC_BASE; 190 sum_refund.value = sum_refund_frac / TALER_AMOUNT_FRAC_BASE 191 + sum_refund_value; 192 sum_refund.fraction = sum_refund_frac % TALER_AMOUNT_FRAC_BASE; 193 sum_fee.value = sum_fee_frac / TALER_AMOUNT_FRAC_BASE 194 + sum_fee_value; 195 sum_fee.fraction = sum_fee_frac % TALER_AMOUNT_FRAC_BASE; \ 196 GNUNET_assert (0 <= 197 TALER_amount_subtract (&delta, 198 &sum_deposit, 199 &sum_refund)); 200 GNUNET_assert (0 <= 201 TALER_amount_subtract (total, 202 &delta, 203 &sum_fee)); 204 return qs; 205 }