pg_select_early_aggregations.c (4915B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU General Public License for more details. 12 13 You should have received a copy of the GNU General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 /** 17 * @file auditordb/pg_select_early_aggregations.c 18 * @brief Implementation of the select_early_aggregations function for Postgres 19 * @author Christian Grothoff 20 */ 21 #include "taler/platform.h" 22 #include "taler/taler_error_codes.h" 23 #include "taler/taler_dbevents.h" 24 #include "taler/taler_pq_lib.h" 25 #include "pg_select_early_aggregations.h" 26 #include "pg_helper.h" 27 28 29 /** 30 * Closure for #early_aggregation_cb(). 31 */ 32 struct EarlyAggregationContext 33 { 34 35 /** 36 * Function to call for each early aggregation. 37 */ 38 TALER_AUDITORDB_EarlyAggregationsCallback cb; 39 40 /** 41 * Closure for @e cb 42 */ 43 void *cb_cls; 44 45 /** 46 * Plugin context. 47 */ 48 struct PostgresClosure *pg; 49 50 /** 51 * Query status to return. 52 */ 53 enum GNUNET_DB_QueryStatus qs; 54 }; 55 56 57 /** 58 * Helper function for #TAH_PG_select_purse_expired(). 59 * To be called with the results of a SELECT statement 60 * that has returned @a num_results results. 61 * 62 * @param cls closure of type `struct EarlyAggregationContext *` 63 * @param result the postgres result 64 * @param num_results the number of results in @a result 65 */ 66 static void 67 early_aggregation_cb (void *cls, 68 PGresult *result, 69 unsigned int num_results) 70 { 71 struct EarlyAggregationContext *eic = cls; 72 struct PostgresClosure *pg = eic->pg; 73 74 for (unsigned int i = 0; i < num_results; i++) 75 { 76 struct TALER_AUDITORDB_EarlyAggregation ea; 77 struct GNUNET_PQ_ResultSpec rs[] = { 78 GNUNET_PQ_result_spec_uint64 ("row_id", 79 &ea.row_id), 80 GNUNET_PQ_result_spec_uint64 ("batch_deposit_serial_id", 81 &ea.batch_deposit_serial_id), 82 GNUNET_PQ_result_spec_uint64 ("tracking_serial_id", 83 &ea.tracking_serial_id), 84 TALER_PQ_RESULT_SPEC_AMOUNT ("amount", 85 &ea.total), 86 GNUNET_PQ_result_spec_bool ("suppressed", 87 &ea.suppressed), 88 GNUNET_PQ_result_spec_end 89 }; 90 91 /* just to be safe in case the structure changes */ 92 memset (&ea, 93 0, 94 sizeof (ea)); 95 if (GNUNET_OK != 96 GNUNET_PQ_extract_result (result, 97 rs, 98 i)) 99 { 100 GNUNET_break (0); 101 eic->qs = GNUNET_DB_STATUS_HARD_ERROR; 102 return; 103 } 104 eic->cb (eic->cb_cls, 105 &ea); 106 } 107 eic->qs = num_results; 108 } 109 110 111 enum GNUNET_DB_QueryStatus 112 TAH_PG_select_early_aggregations ( 113 void *cls, 114 int64_t limit, 115 uint64_t offset, 116 bool return_suppressed, 117 TALER_AUDITORDB_EarlyAggregationsCallback cb, 118 void *cb_cls) 119 { 120 struct PostgresClosure *pg = cls; 121 uint64_t ulimit = (limit < 0) ? -limit : limit; 122 struct GNUNET_PQ_QueryParam params[] = { 123 GNUNET_PQ_query_param_uint64 (&offset), 124 GNUNET_PQ_query_param_uint64 (&ulimit), 125 GNUNET_PQ_query_param_bool (return_suppressed), 126 GNUNET_PQ_query_param_end 127 }; 128 struct EarlyAggregationContext eic = { 129 .cb = cb, 130 .cb_cls = cb_cls, 131 .pg = pg 132 }; 133 enum GNUNET_DB_QueryStatus qs; 134 135 PREPARE (pg, 136 "auditor_select_early_aggregations_asc", 137 "SELECT" 138 " row_id" 139 ",batch_deposit_serial_id" 140 ",tracking_serial_id" 141 ",amount" 142 ",suppressed" 143 " FROM auditor_early_aggregations" 144 " WHERE row_id > $1" 145 " AND ($3 OR NOT suppressed)" 146 " ORDER BY row_id ASC" 147 " LIMIT $2;"); 148 PREPARE (pg, 149 "auditor_select_early_aggregations_desc", 150 "SELECT" 151 " row_id" 152 ",batch_deposit_serial_id" 153 ",tracking_serial_id" 154 ",amount" 155 ",suppressed" 156 " FROM auditor_early_aggregations" 157 " WHERE row_id < $1" 158 " AND ($3 OR NOT suppressed)" 159 " ORDER BY row_id DESC" 160 " LIMIT $2;"); 161 qs = GNUNET_PQ_eval_prepared_multi_select ( 162 pg->conn, 163 (limit < 0) 164 ? "auditor_select_early_aggregations_desc" 165 : "auditor_select_early_aggregations_asc ", 166 params, 167 &early_aggregation_cb, 168 &eic); 169 if (0 > qs) 170 return qs; 171 GNUNET_break (GNUNET_DB_STATUS_HARD_ERROR != eic.qs); 172 return eic.qs; 173 }