diff options
Diffstat (limited to 'src/exchangedb/pg_aggregate.c')
-rw-r--r-- | src/exchangedb/pg_aggregate.c | 138 |
1 files changed, 74 insertions, 64 deletions
diff --git a/src/exchangedb/pg_aggregate.c b/src/exchangedb/pg_aggregate.c index 6e94cbebb..ba03e4a9c 100644 --- a/src/exchangedb/pg_aggregate.c +++ b/src/exchangedb/pg_aggregate.c @@ -22,9 +22,12 @@ #include "taler_error_codes.h" #include "taler_dbevents.h" #include "taler_pq_lib.h" +#include "pg_compute_shard.h" +#include "pg_event_notify.h" #include "pg_aggregate.h" #include "pg_helper.h" + enum GNUNET_DB_QueryStatus TEH_PG_aggregate ( void *cls, @@ -34,35 +37,14 @@ TEH_PG_aggregate ( struct TALER_Amount *total) { struct PostgresClosure *pg = cls; + uint64_t deposit_shard = TEH_PG_compute_shard (merchant_pub); struct GNUNET_TIME_Absolute now = {0}; - struct GNUNET_PQ_QueryParam params[] = { - GNUNET_PQ_query_param_absolute_time (&now), - GNUNET_PQ_query_param_auto_from_type (merchant_pub), - GNUNET_PQ_query_param_auto_from_type (h_payto), - GNUNET_PQ_query_param_auto_from_type (wtid), - GNUNET_PQ_query_param_end - }; uint64_t sum_deposit_value; uint64_t sum_deposit_frac; uint64_t sum_refund_value; uint64_t sum_refund_frac; uint64_t sum_fee_value; uint64_t sum_fee_frac; - struct GNUNET_PQ_ResultSpec rs[] = { - GNUNET_PQ_result_spec_uint64 ("sum_deposit_value", - &sum_deposit_value), - GNUNET_PQ_result_spec_uint64 ("sum_deposit_fraction", - &sum_deposit_frac), - GNUNET_PQ_result_spec_uint64 ("sum_refund_value", - &sum_refund_value), - GNUNET_PQ_result_spec_uint64 ("sum_refund_fraction", - &sum_refund_frac), - GNUNET_PQ_result_spec_uint64 ("sum_fee_value", - &sum_fee_value), - GNUNET_PQ_result_spec_uint64 ("sum_fee_fraction", - &sum_fee_frac), - GNUNET_PQ_result_spec_end - }; enum GNUNET_DB_QueryStatus qs; struct TALER_Amount sum_deposit; struct TALER_Amount sum_refund; @@ -71,61 +53,63 @@ TEH_PG_aggregate ( now = GNUNET_TIME_absolute_round_down (GNUNET_TIME_absolute_get (), pg->aggregator_shift); - - /* Used in #postgres_aggregate() */ PREPARE (pg, "aggregate", - "WITH dep AS (" /* restrict to our merchant and account and mark as done */ - " UPDATE deposits" + "WITH bdep AS (" /* restrict to our merchant and account and mark as done */ + " UPDATE batch_deposits" " SET done=TRUE" " WHERE NOT (done OR policy_blocked)" /* only actually executable deposits */ - " AND refund_deadline<$1" /* filter by shard */ + " AND refund_deadline<$1" + " AND shard=$5" /* only for efficiency, merchant_pub is what we really filter by */ " AND merchant_pub=$2" /* filter by target merchant */ " AND wire_target_h_payto=$3" /* merchant could have a 2nd bank account */ " RETURNING" - " deposit_serial_id" + " batch_deposit_serial_id)" + " ,cdep AS (" + " SELECT" + " coin_deposit_serial_id" + " ,batch_deposit_serial_id" " ,coin_pub" - " ,amount_with_fee_val AS amount_val" - " ,amount_with_fee_frac AS amount_frac)" + " ,amount_with_fee AS amount" + " FROM coin_deposits" + " WHERE batch_deposit_serial_id IN (SELECT batch_deposit_serial_id FROM bdep))" " ,ref AS (" /* find applicable refunds -- NOTE: may do a full join on the master, maybe find a left-join way to integrate with query above to push it to the shards? */ " SELECT" - " amount_with_fee_val AS refund_val" - " ,amount_with_fee_frac AS refund_frac" + " amount_with_fee AS refund" " ,coin_pub" - " ,deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */ + " ,batch_deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */ " FROM refunds" - " WHERE coin_pub IN (SELECT coin_pub FROM dep)" - " AND deposit_serial_id IN (SELECT deposit_serial_id FROM dep))" + " WHERE coin_pub IN (SELECT coin_pub FROM cdep)" + " AND batch_deposit_serial_id IN (SELECT batch_deposit_serial_id FROM bdep))" " ,ref_by_coin AS (" /* total up refunds by coin */ " SELECT" - " SUM(refund_val) AS sum_refund_val" - " ,SUM(refund_frac) AS sum_refund_frac" + " SUM((ref.refund).val) AS sum_refund_val" + " ,SUM((ref.refund).frac) AS sum_refund_frac" " ,coin_pub" - " ,deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */ + " ,batch_deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */ " FROM ref" - " GROUP BY coin_pub, deposit_serial_id)" + " GROUP BY coin_pub, batch_deposit_serial_id)" " ,norm_ref_by_coin AS (" /* normalize */ " SELECT" " sum_refund_val + sum_refund_frac / 100000000 AS norm_refund_val" " ,sum_refund_frac % 100000000 AS norm_refund_frac" " ,coin_pub" - " ,deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */ + " ,batch_deposit_serial_id" /* theoretically, coin could be in multiple refunded transactions */ " FROM ref_by_coin)" " ,fully_refunded_coins AS (" /* find applicable refunds -- NOTE: may do a full join on the master, maybe find a left-join way to integrate with query above to push it to the shards? */ " SELECT" - " dep.coin_pub" + " cdep.coin_pub" " FROM norm_ref_by_coin norm" - " JOIN dep" - " ON (norm.coin_pub = dep.coin_pub" - " AND norm.deposit_serial_id = dep.deposit_Serial_id" - " AND norm.norm_refund_val = dep.amount_val" - " AND norm.norm_refund_frac = dep.amount_frac))" + " JOIN cdep" + " ON (norm.coin_pub = cdep.coin_pub" + " AND norm.batch_deposit_serial_id = cdep.batch_deposit_serial_id" + " AND norm.norm_refund_val = (cdep.amount).val" + " AND norm.norm_refund_frac = (cdep.amount).frac))" " ,fees AS (" /* find deposit fees for not fully refunded deposits */ " SELECT" - " denom.fee_deposit_val AS fee_val" - " ,denom.fee_deposit_frac AS fee_frac" - " ,cs.deposit_serial_id" /* ensures we get the fee for each coin, not once per denomination */ - " FROM dep cs" + " denom.fee_deposit AS fee" + " ,cs.batch_deposit_serial_id" /* ensures we get the fee for each coin, not once per denomination */ + " FROM cdep cs" " JOIN known_coins kc" /* NOTE: may do a full join on the master, maybe find a left-join way to integrate with query above to push it to the shards? */ " USING (coin_pub)" " JOIN denominations denom" @@ -133,26 +117,52 @@ TEH_PG_aggregate ( " WHERE coin_pub NOT IN (SELECT coin_pub FROM fully_refunded_coins))" " ,dummy AS (" /* add deposits to aggregation_tracking */ " INSERT INTO aggregation_tracking" - " (deposit_serial_id" + " (batch_deposit_serial_id" " ,wtid_raw)" - " SELECT deposit_serial_id,$4" - " FROM dep)" + " SELECT batch_deposit_serial_id,$4" + " FROM bdep)" "SELECT" /* calculate totals (deposits, refunds and fees) */ - " CAST(COALESCE(SUM(dep.amount_val),0) AS INT8) AS sum_deposit_value" /* cast needed, otherwise we get NUMBER */ - " ,COALESCE(SUM(dep.amount_frac),0) AS sum_deposit_fraction" /* SUM over INT returns INT8 */ - " ,CAST(COALESCE(SUM(ref.refund_val),0) AS INT8) AS sum_refund_value" - " ,COALESCE(SUM(ref.refund_frac),0) AS sum_refund_fraction" - " ,CAST(COALESCE(SUM(fees.fee_val),0) AS INT8) AS sum_fee_value" - " ,COALESCE(SUM(fees.fee_frac),0) AS sum_fee_fraction" - " FROM dep " + " CAST(COALESCE(SUM((cdep.amount).val),0) AS INT8) AS sum_deposit_value" + /* cast needed, otherwise we get NUMBER */ + " ,COALESCE(SUM((cdep.amount).frac),0) AS sum_deposit_fraction" /* SUM over INT returns INT8 */ + " ,CAST(COALESCE(SUM((ref.refund).val),0) AS INT8) AS sum_refund_value" + " ,COALESCE(SUM((ref.refund).frac),0) AS sum_refund_fraction" + " ,CAST(COALESCE(SUM((fees.fee).val),0) AS INT8) AS sum_fee_value" + " ,COALESCE(SUM((fees.fee).frac),0) AS sum_fee_fraction" + " FROM cdep " " FULL OUTER JOIN ref ON (FALSE)" /* We just want all sums */ " FULL OUTER JOIN fees ON (FALSE);"); + { + struct GNUNET_PQ_QueryParam params[] = { + GNUNET_PQ_query_param_absolute_time (&now), + GNUNET_PQ_query_param_auto_from_type (merchant_pub), + GNUNET_PQ_query_param_auto_from_type (h_payto), + GNUNET_PQ_query_param_auto_from_type (wtid), + GNUNET_PQ_query_param_uint64 (&deposit_shard), + GNUNET_PQ_query_param_end + }; + struct GNUNET_PQ_ResultSpec rs[] = { + GNUNET_PQ_result_spec_uint64 ("sum_deposit_value", + &sum_deposit_value), + GNUNET_PQ_result_spec_uint64 ("sum_deposit_fraction", + &sum_deposit_frac), + GNUNET_PQ_result_spec_uint64 ("sum_refund_value", + &sum_refund_value), + GNUNET_PQ_result_spec_uint64 ("sum_refund_fraction", + &sum_refund_frac), + GNUNET_PQ_result_spec_uint64 ("sum_fee_value", + &sum_fee_value), + GNUNET_PQ_result_spec_uint64 ("sum_fee_fraction", + &sum_fee_frac), + GNUNET_PQ_result_spec_end + }; - qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, - "aggregate", - params, - rs); + qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, + "aggregate", + params, + rs); + } if (qs < 0) { GNUNET_break (GNUNET_DB_STATUS_SOFT_ERROR == qs); |