/*
   This file is part of TALER
   Copyright (C) 2022 Taler Systems SA

   TALER is free software; you can redistribute it and/or modify it under the
   terms of the GNU General Public License as published by the Free Software
   Foundation; either version 3, or (at your option) any later version.

   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
   A PARTICULAR PURPOSE.  See the GNU General Public License for more details.

   You should have received a copy of the GNU General Public License along with
   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
 */
/**
 * @file exchangedb/pg_aggregate.c
 * @brief Implementation of the aggregate function for Postgres
 * @author Christian Grothoff
 */
#include "platform.h"
#include "taler_error_codes.h"
#include "taler_dbevents.h"
#include "taler_pq_lib.h"
#include "pg_aggregate.h"
#include "pg_helper.h"


/**
 * Aggregate the ready deposits of one merchant account into a single total.
 *
 * Runs one SQL statement that (1) selects up to
 * TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT coins from "deposits_for_matching"
 * for @a merchant_pub whose refund deadline lies before the (rounded) current
 * time, (2) marks the matching not-yet-done rows in "deposits" for
 * @a merchant_pub and @a h_payto as done, (3) records each of those deposits
 * in "aggregation_tracking" under @a wtid, and (4) returns the summed deposit
 * amounts, refund amounts and deposit fees.  The result stored in @a total is
 * deposits minus refunds minus fees; fees are not charged for deposits whose
 * refunds equal the full deposit amount.
 *
 * @param cls the `struct PostgresClosure` with the database connection
 * @param h_payto compared against "wire_target_h_payto" of the deposits
 * @param merchant_pub public key of the merchant whose deposits are aggregated
 * @param wtid wire transfer identifier stored as "wtid_raw" for each
 *        aggregated deposit
 * @param[out] total set to the net amount (deposits - refunds - fees); set to
 *        zero in the currency of the closure if the query yields no row
 * @return status returned by GNUNET_PQ_eval_prepared_singleton_select()
 */
enum GNUNET_DB_QueryStatus
TEH_PG_aggregate (
  void *cls,
  const struct TALER_PaytoHashP *h_payto,
  const struct TALER_MerchantPublicKeyP *merchant_pub,
  const struct TALER_WireTransferIdentifierRawP *wtid,
  struct TALER_Amount *total)
{
  struct PostgresClosure *pg = cls;
  /* 'params' captures the address of 'now'; the actual value is assigned
     further below, before the query is executed. */
  struct GNUNET_TIME_Absolute now = {0};
  struct GNUNET_PQ_QueryParam params[] = {
    GNUNET_PQ_query_param_absolute_time (&now),       /* $1 */
    GNUNET_PQ_query_param_auto_from_type (merchant_pub), /* $2 */
    GNUNET_PQ_query_param_auto_from_type (h_payto),   /* $3 */
    GNUNET_PQ_query_param_auto_from_type (wtid),      /* $4 */
    GNUNET_PQ_query_param_end
  };
  /* Raw (not yet normalized) sums as returned by the query. */
  uint64_t sum_deposit_value;
  uint64_t sum_deposit_frac;
  uint64_t sum_refund_value;
  uint64_t sum_refund_frac;
  uint64_t sum_fee_value;
  uint64_t sum_fee_frac;
  struct GNUNET_PQ_ResultSpec rs[] = {
    GNUNET_PQ_result_spec_uint64 ("sum_deposit_value",
                                  &sum_deposit_value),
    GNUNET_PQ_result_spec_uint64 ("sum_deposit_fraction",
                                  &sum_deposit_frac),
    GNUNET_PQ_result_spec_uint64 ("sum_refund_value",
                                  &sum_refund_value),
    GNUNET_PQ_result_spec_uint64 ("sum_refund_fraction",
                                  &sum_refund_frac),
    GNUNET_PQ_result_spec_uint64 ("sum_fee_value",
                                  &sum_fee_value),
    GNUNET_PQ_result_spec_uint64 ("sum_fee_fraction",
                                  &sum_fee_frac),
    GNUNET_PQ_result_spec_end
  };
  enum GNUNET_DB_QueryStatus qs;
  struct TALER_Amount sum_deposit;
  struct TALER_Amount sum_refund;
  struct TALER_Amount sum_fee;
  struct TALER_Amount delta;

  /* Round the current time down by the configured aggregator shift; this is
     the cut-off compared against "refund_deadline" ($1). */
  now = GNUNET_TIME_absolute_round_down (GNUNET_TIME_absolute_get (),
                                         pg->aggregator_shift);

  /* Used in #postgres_aggregate() */
  /* NOTE(review): PREPARE is defined outside this file (pg_helper.h);
     presumably it prepares the statement on first use -- confirm. */
  PREPARE (pg,
           "aggregate",
           "WITH rdy AS (" /* find deposits ready by merchant */
           " SELECT"
           " coin_pub"
           " FROM deposits_for_matching"
           " WHERE refund_deadline<$1" /* filter by shard, only actually executable deposits */
           " AND merchant_pub=$2" /* filter by target merchant */
           " ORDER BY refund_deadline ASC" /* ordering is not critical */
           " LIMIT "
           TALER_QUOTE (TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT) /* limits transaction size */
           " )"
           " ,dep AS (" /* restrict to our merchant and account and mark as done */
           " UPDATE deposits"
           " SET done=TRUE"
           " WHERE coin_pub IN (SELECT coin_pub FROM rdy)"
           " AND merchant_pub=$2" /* theoretically, same coin could be spent at another merchant */
           " AND wire_target_h_payto=$3" /* merchant could have a 2nd bank account */
           " AND done=FALSE" /* theoretically, same coin could be spent at the same merchant a 2nd time */
           " RETURNING"
           " deposit_serial_id"
           " ,coin_pub"
           " ,amount_with_fee_val AS amount_val"
           " ,amount_with_fee_frac AS amount_frac)"
           " ,ref AS (" /* find applicable refunds -- NOTE: may do a full join on the master, maybe find a left-join way to integrate with query above to push it to the shards? */
           " SELECT"
           " amount_with_fee_val AS refund_val"
           " ,amount_with_fee_frac AS refund_frac"
           " ,coin_pub"
           " ,deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */
           " FROM refunds"
           " WHERE coin_pub IN (SELECT coin_pub FROM dep)"
           " AND deposit_serial_id IN (SELECT deposit_serial_id FROM dep))"
           " ,ref_by_coin AS (" /* total up refunds by coin */
           " SELECT"
           " SUM(refund_val) AS sum_refund_val"
           " ,SUM(refund_frac) AS sum_refund_frac"
           " ,coin_pub"
           " ,deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */
           " FROM ref"
           " GROUP BY coin_pub, deposit_serial_id)"
           " ,norm_ref_by_coin AS (" /* normalize; NOTE(review): 100000000 presumably must match TALER_AMOUNT_FRAC_BASE -- confirm */
           " SELECT"
           " sum_refund_val + sum_refund_frac / 100000000 AS norm_refund_val"
           " ,sum_refund_frac % 100000000 AS norm_refund_frac"
           " ,coin_pub"
           " ,deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */
           " FROM ref_by_coin)"
           " ,fully_refunded_coins AS (" /* coins whose normalized refund total equals the deposit amount -- NOTE: may do a full join on the master, maybe find a left-join way to integrate with query above to push it to the shards? */
           " SELECT"
           " dep.coin_pub"
           " FROM norm_ref_by_coin norm"
           " JOIN dep"
           " ON (norm.coin_pub = dep.coin_pub"
           " AND norm.deposit_serial_id = dep.deposit_Serial_id"
           " AND norm.norm_refund_val = dep.amount_val"
           " AND norm.norm_refund_frac = dep.amount_frac))"
           " ,fees AS (" /* find deposit fees for not fully refunded deposits */
           " SELECT"
           " denom.fee_deposit_val AS fee_val"
           " ,denom.fee_deposit_frac AS fee_frac"
           " ,cs.deposit_serial_id" /* ensures we get the fee for each coin, not once per denomination */
           " FROM dep cs"
           " JOIN known_coins kc" /* NOTE: may do a full join on the master, maybe find a left-join way to integrate with query above to push it to the shards? */
           " USING (coin_pub)"
           " JOIN denominations denom"
           " USING (denominations_serial)"
           " WHERE coin_pub NOT IN (SELECT coin_pub FROM fully_refunded_coins))"
           " ,dummy AS (" /* add deposits to aggregation_tracking */
           " INSERT INTO aggregation_tracking"
           " (deposit_serial_id"
           " ,wtid_raw)"
           " SELECT deposit_serial_id,$4"
           " FROM dep)"
           "SELECT" /* calculate totals (deposits, refunds and fees) */
           " CAST(COALESCE(SUM(dep.amount_val),0) AS INT8) AS sum_deposit_value" /* cast needed, otherwise we get NUMBER */
           " ,COALESCE(SUM(dep.amount_frac),0) AS sum_deposit_fraction" /* SUM over INT returns INT8 */
           " ,CAST(COALESCE(SUM(ref.refund_val),0) AS INT8) AS sum_refund_value"
           " ,COALESCE(SUM(ref.refund_frac),0) AS sum_refund_fraction"
           " ,CAST(COALESCE(SUM(fees.fee_val),0) AS INT8) AS sum_fee_value"
           " ,COALESCE(SUM(fees.fee_frac),0) AS sum_fee_fraction"
           " FROM dep "
           " FULL OUTER JOIN ref ON (FALSE)" /* We just want all sums */
           " FULL OUTER JOIN fees ON (FALSE);");
  qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
                                                 "aggregate",
                                                 params,
                                                 rs);
  if (qs < 0)
  {
    /* Only a soft error is expected here; flag anything else. */
    GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
    return qs;
  }
  if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
  {
    /* No row returned: report a zero total in our currency. */
    GNUNET_assert (GNUNET_OK ==
                   TALER_amount_set_zero (pg->currency,
                                          total));
    return qs;
  }
  /* Initialize the amounts (sets the currency), then fill in the sums. */
  GNUNET_assert (GNUNET_OK ==
                 TALER_amount_set_zero (pg->currency,
                                        &sum_deposit));
  GNUNET_assert (GNUNET_OK ==
                 TALER_amount_set_zero (pg->currency,
                                        &sum_refund));
  GNUNET_assert (GNUNET_OK ==
                 TALER_amount_set_zero (pg->currency,
                                        &sum_fee));
  /* The fraction sums may exceed TALER_AMOUNT_FRAC_BASE: carry the overflow
     into the value part so that each amount is normalized. */
  sum_deposit.value = sum_deposit_frac / TALER_AMOUNT_FRAC_BASE
                      + sum_deposit_value;
  sum_deposit.fraction = sum_deposit_frac % TALER_AMOUNT_FRAC_BASE;
  sum_refund.value = sum_refund_frac / TALER_AMOUNT_FRAC_BASE
                     + sum_refund_value;
  sum_refund.fraction = sum_refund_frac % TALER_AMOUNT_FRAC_BASE;
  sum_fee.value = sum_fee_frac / TALER_AMOUNT_FRAC_BASE
                  + sum_fee_value;
  sum_fee.fraction = sum_fee_frac % TALER_AMOUNT_FRAC_BASE;
  /* total = deposits - refunds - fees; a negative arithmetic result from
     either subtraction is treated as a violated invariant. */
  GNUNET_assert (0 <=
                 TALER_amount_subtract (&delta,
                                        &sum_deposit,
                                        &sum_refund));
  GNUNET_assert (0 <=
                 TALER_amount_subtract (total,
                                        &delta,
                                        &sum_fee));
  return qs;
}