summaryrefslogtreecommitdiff
path: root/src/exchangedb/pg_aggregate.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/exchangedb/pg_aggregate.c')
-rw-r--r--src/exchangedb/pg_aggregate.c205
1 files changed, 205 insertions, 0 deletions
diff --git a/src/exchangedb/pg_aggregate.c b/src/exchangedb/pg_aggregate.c
new file mode 100644
index 000000000..f1c4d6776
--- /dev/null
+++ b/src/exchangedb/pg_aggregate.c
@@ -0,0 +1,205 @@
+/*
+ This file is part of TALER
+ Copyright (C) 2022 Taler Systems SA
+
+ TALER is free software; you can redistribute it and/or modify it under the
+ terms of the GNU General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License along with
+ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+ */
+/**
+ * @file exchangedb/pg_aggregate.c
+ * @brief Implementation of the aggregate function for Postgres
+ * @author Christian Grothoff
+ */
+#include "platform.h"
+#include "taler_error_codes.h"
+#include "taler_dbevents.h"
+#include "taler_pq_lib.h"
+#include "pg_aggregate.h"
+#include "pg_helper.h"
+
+enum GNUNET_DB_QueryStatus
+TEH_PG_aggregate (
+ void *cls,
+ const struct TALER_PaytoHashP *h_payto,
+ const struct TALER_MerchantPublicKeyP *merchant_pub,
+ const struct TALER_WireTransferIdentifierRawP *wtid,
+ struct TALER_Amount *total)
+{
+ struct PostgresClosure *pg = cls;
+ struct GNUNET_TIME_Absolute now = {0};
+ struct GNUNET_PQ_QueryParam params[] = {
+ GNUNET_PQ_query_param_absolute_time (&now),
+ GNUNET_PQ_query_param_auto_from_type (merchant_pub),
+ GNUNET_PQ_query_param_auto_from_type (h_payto),
+ GNUNET_PQ_query_param_auto_from_type (wtid),
+ GNUNET_PQ_query_param_end
+ };
+ uint64_t sum_deposit_value;
+ uint64_t sum_deposit_frac;
+ uint64_t sum_refund_value;
+ uint64_t sum_refund_frac;
+ uint64_t sum_fee_value;
+ uint64_t sum_fee_frac;
+ struct GNUNET_PQ_ResultSpec rs[] = {
+ GNUNET_PQ_result_spec_uint64 ("sum_deposit_value",
+ &sum_deposit_value),
+ GNUNET_PQ_result_spec_uint64 ("sum_deposit_fraction",
+ &sum_deposit_frac),
+ GNUNET_PQ_result_spec_uint64 ("sum_refund_value",
+ &sum_refund_value),
+ GNUNET_PQ_result_spec_uint64 ("sum_refund_fraction",
+ &sum_refund_frac),
+ GNUNET_PQ_result_spec_uint64 ("sum_fee_value",
+ &sum_fee_value),
+ GNUNET_PQ_result_spec_uint64 ("sum_fee_fraction",
+ &sum_fee_frac),
+ GNUNET_PQ_result_spec_end
+ };
+ enum GNUNET_DB_QueryStatus qs;
+ struct TALER_Amount sum_deposit;
+ struct TALER_Amount sum_refund;
+ struct TALER_Amount sum_fee;
+ struct TALER_Amount delta;
+
+ now = GNUNET_TIME_absolute_round_down (GNUNET_TIME_absolute_get (),
+ pg->aggregator_shift);
+
+ /* Used in #postgres_aggregate() */
+ PREPARE (pg,
+ "aggregate",
+ "WITH rdy AS (" /* find deposits ready by merchant */
+ " SELECT"
+ " coin_pub"
+ " FROM deposits_for_matching"
+ " WHERE refund_deadline<$1" /* filter by shard, only actually executable deposits */
+ " AND merchant_pub=$2" /* filter by target merchant */
+ " ORDER BY refund_deadline ASC" /* ordering is not critical */
+ " LIMIT "
+ TALER_QUOTE (TALER_EXCHANGEDB_MATCHING_DEPOSITS_LIMIT) /* limits transaction size */
+ " )"
+ " ,dep AS (" /* restrict to our merchant and account and mark as done */
+ " UPDATE deposits"
+ " SET done=TRUE"
+ " WHERE coin_pub IN (SELECT coin_pub FROM rdy)"
+ " AND merchant_pub=$2" /* theoretically, same coin could be spent at another merchant */
+ " AND wire_target_h_payto=$3" /* merchant could have a 2nd bank account */
+ " AND done=FALSE" /* theoretically, same coin could be spend at the same merchant a 2nd time */
+ " RETURNING"
+ " deposit_serial_id"
+ " ,coin_pub"
+ " ,amount_with_fee_val AS amount_val"
+ " ,amount_with_fee_frac AS amount_frac)"
+ " ,ref AS (" /* find applicable refunds -- NOTE: may do a full join on the master, maybe find a left-join way to integrate with query above to push it to the shards? */
+ " SELECT"
+ " amount_with_fee_val AS refund_val"
+ " ,amount_with_fee_frac AS refund_frac"
+ " ,coin_pub"
+ " ,deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */
+ " FROM refunds"
+ " WHERE coin_pub IN (SELECT coin_pub FROM dep)"
+ " AND deposit_serial_id IN (SELECT deposit_serial_id FROM dep))"
+ " ,ref_by_coin AS (" /* total up refunds by coin */
+ " SELECT"
+ " SUM(refund_val) AS sum_refund_val"
+ " ,SUM(refund_frac) AS sum_refund_frac"
+ " ,coin_pub"
+ " ,deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */
+ " FROM ref"
+ " GROUP BY coin_pub, deposit_serial_id)"
+ " ,norm_ref_by_coin AS (" /* normalize */
+ " SELECT"
+ " sum_refund_val + sum_refund_frac / 100000000 AS norm_refund_val"
+ " ,sum_refund_frac % 100000000 AS norm_refund_frac"
+ " ,coin_pub"
+ " ,deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */
+ " FROM ref_by_coin)"
+ " ,fully_refunded_coins AS (" /* find applicable refunds -- NOTE: may do a full join on the master, maybe find a left-join way to integrate with query above to push it to the shards? */
+ " SELECT"
+ " dep.coin_pub"
+ " FROM norm_ref_by_coin norm"
+ " JOIN dep"
+ " ON (norm.coin_pub = dep.coin_pub"
+ " AND norm.deposit_serial_id = dep.deposit_Serial_id"
+ " AND norm.norm_refund_val = dep.amount_val"
+ " AND norm.norm_refund_frac = dep.amount_frac))"
+ " ,fees AS (" /* find deposit fees for not fully refunded deposits */
+ " SELECT"
+ " denom.fee_deposit_val AS fee_val"
+ " ,denom.fee_deposit_frac AS fee_frac"
+ " ,cs.deposit_serial_id" /* ensures we get the fee for each coin, not once per denomination */
+ " FROM dep cs"
+ " JOIN known_coins kc" /* NOTE: may do a full join on the master, maybe find a left-join way to integrate with query above to push it to the shards? */
+ " USING (coin_pub)"
+ " JOIN denominations denom"
+ " USING (denominations_serial)"
+ " WHERE coin_pub NOT IN (SELECT coin_pub FROM fully_refunded_coins))"
+ " ,dummy AS (" /* add deposits to aggregation_tracking */
+ " INSERT INTO aggregation_tracking"
+ " (deposit_serial_id"
+ " ,wtid_raw)"
+ " SELECT deposit_serial_id,$4"
+ " FROM dep)"
+ "SELECT" /* calculate totals (deposits, refunds and fees) */
+ " CAST(COALESCE(SUM(dep.amount_val),0) AS INT8) AS sum_deposit_value" /* cast needed, otherwise we get NUMBER */
+ " ,COALESCE(SUM(dep.amount_frac),0) AS sum_deposit_fraction" /* SUM over INT returns INT8 */
+ " ,CAST(COALESCE(SUM(ref.refund_val),0) AS INT8) AS sum_refund_value"
+ " ,COALESCE(SUM(ref.refund_frac),0) AS sum_refund_fraction"
+ " ,CAST(COALESCE(SUM(fees.fee_val),0) AS INT8) AS sum_fee_value"
+ " ,COALESCE(SUM(fees.fee_frac),0) AS sum_fee_fraction"
+ " FROM dep "
+ " FULL OUTER JOIN ref ON (FALSE)" /* We just want all sums */
+ " FULL OUTER JOIN fees ON (FALSE);");
+
+
+ qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
+ "aggregate",
+ params,
+ rs);
+ if (qs < 0)
+ {
+ GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs);
+ return qs;
+ }
+ if (GNUNET_DB_STATUS_SUCCESS_NO_RESULTS == qs)
+ {
+ GNUNET_assert (GNUNET_OK ==
+ TALER_amount_set_zero (pg->currency,
+ total));
+ return qs;
+ }
+ GNUNET_assert (GNUNET_OK ==
+ TALER_amount_set_zero (pg->currency,
+ &sum_deposit));
+ GNUNET_assert (GNUNET_OK ==
+ TALER_amount_set_zero (pg->currency,
+ &sum_refund));
+ GNUNET_assert (GNUNET_OK ==
+ TALER_amount_set_zero (pg->currency,
+ &sum_fee));
+ sum_deposit.value = sum_deposit_frac / TALER_AMOUNT_FRAC_BASE
+ + sum_deposit_value;
+ sum_deposit.fraction = sum_deposit_frac % TALER_AMOUNT_FRAC_BASE;
+ sum_refund.value = sum_refund_frac / TALER_AMOUNT_FRAC_BASE
+ + sum_refund_value;
+ sum_refund.fraction = sum_refund_frac % TALER_AMOUNT_FRAC_BASE;
+ sum_fee.value = sum_fee_frac / TALER_AMOUNT_FRAC_BASE
+ + sum_fee_value;
+ sum_fee.fraction = sum_fee_frac % TALER_AMOUNT_FRAC_BASE; \
+ GNUNET_assert (0 <=
+ TALER_amount_subtract (&delta,
+ &sum_deposit,
+ &sum_refund));
+ GNUNET_assert (0 <=
+ TALER_amount_subtract (total,
+ &delta,
+ &sum_fee));
+ return qs;
+}