pg_reserves_in_insert.c (11751B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2022-2024 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU General Public License for more details. 12 13 You should have received a copy of the GNU General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 /** 17 * @file exchangedb/pg_reserves_in_insert.c 18 * @brief Implementation of the reserves_in_insert function for Postgres 19 * @author Christian Grothoff 20 * @author Joseph Xu 21 */ 22 #include "taler/platform.h" 23 #include "taler/taler_error_codes.h" 24 #include "taler/taler_dbevents.h" 25 #include "taler/taler_pq_lib.h" 26 #include "pg_reserves_in_insert.h" 27 #include "pg_helper.h" 28 #include "pg_start.h" 29 #include "pg_start_read_committed.h" 30 #include "pg_commit.h" 31 #include "pg_preflight.h" 32 #include "pg_rollback.h" 33 #include "pg_event_notify.h" 34 35 36 /** 37 * Generate event notification for the reserve change. 38 * 39 * @param reserve_pub reserve to notfiy on 40 * @return string to pass to postgres for the notification 41 */ 42 static char * 43 compute_notify_on_reserve (const struct TALER_ReservePublicKeyP *reserve_pub) 44 { 45 struct TALER_ReserveEventP rep = { 46 .header.size = htons (sizeof (rep)), 47 .header.type = htons (TALER_DBEVENT_EXCHANGE_RESERVE_INCOMING), 48 .reserve_pub = *reserve_pub 49 }; 50 51 return GNUNET_PQ_get_event_notify_channel (&rep.header); 52 } 53 54 55 /** 56 * Closure for our helper_cb() 57 */ 58 struct Context 59 { 60 /** 61 * Array of reserve UUIDs to initialize. 62 */ 63 uint64_t *reserve_uuids; 64 65 /** 66 * Array with entries set to 'true' for duplicate transactions. 67 */ 68 bool *transaction_duplicates; 69 70 /** 71 * Array with entries set to 'true' for rows with conflicts. 72 */ 73 bool *conflicts; 74 75 /** 76 * Set to #GNUNET_SYSERR on failures. 77 */ 78 enum GNUNET_GenericReturnValue status; 79 80 /** 81 * Single value (no array) set to true if we need 82 * to follow-up with an update. 83 */ 84 bool needs_update; 85 }; 86 87 88 /** 89 * Helper function to be called with the results of a SELECT statement 90 * that has returned @a num_results results. 91 * 92 * @param cls closure of type `struct Context *` 93 * @param result the postgres result 94 * @param num_results the number of results in @a result 95 */ 96 static void 97 helper_cb (void *cls, 98 PGresult *result, 99 unsigned int num_results) 100 { 101 struct Context *ctx = cls; 102 103 for (unsigned int i = 0; i<num_results; i++) 104 { 105 struct GNUNET_PQ_ResultSpec rs[] = { 106 GNUNET_PQ_result_spec_bool ( 107 "transaction_duplicate", 108 &ctx->transaction_duplicates[i]), 109 GNUNET_PQ_result_spec_allow_null ( 110 GNUNET_PQ_result_spec_uint64 ("ruuid", 111 &ctx->reserve_uuids[i]), 112 &ctx->conflicts[i]), 113 GNUNET_PQ_result_spec_end 114 }; 115 116 if (GNUNET_OK != 117 GNUNET_PQ_extract_result (result, 118 rs, 119 i)) 120 { 121 GNUNET_break (0); 122 ctx->status = GNUNET_SYSERR; 123 return; 124 } 125 if (! ctx->transaction_duplicates[i]) 126 ctx->needs_update |= ctx->conflicts[i]; 127 } 128 } 129 130 131 enum GNUNET_DB_QueryStatus 132 TEH_PG_reserves_in_insert ( 133 void *cls, 134 const struct TALER_EXCHANGEDB_ReserveInInfo *reserves, 135 unsigned int reserves_length, 136 enum GNUNET_DB_QueryStatus *results) 137 { 138 struct PostgresClosure *pg = cls; 139 unsigned int dups = 0; 140 141 struct TALER_FullPaytoHashP h_full_paytos[ 142 GNUNET_NZL (reserves_length)]; 143 struct TALER_NormalizedPaytoHashP h_normalized_paytos[ 144 GNUNET_NZL (reserves_length)]; 145 char *notify_s[GNUNET_NZL (reserves_length)]; 146 struct TALER_ReservePublicKeyP reserve_pubs[GNUNET_NZL (reserves_length)]; 147 struct TALER_Amount balances[GNUNET_NZL (reserves_length)]; 148 struct GNUNET_TIME_Timestamp execution_times[GNUNET_NZL (reserves_length)]; 149 const char *sender_account_details[GNUNET_NZL (reserves_length)]; 150 const char *exchange_account_names[GNUNET_NZL (reserves_length)]; 151 uint64_t wire_references[GNUNET_NZL (reserves_length)]; 152 uint64_t reserve_uuids[GNUNET_NZL (reserves_length)]; 153 bool transaction_duplicates[GNUNET_NZL (reserves_length)]; 154 bool conflicts[GNUNET_NZL (reserves_length)]; 155 struct GNUNET_TIME_Timestamp reserve_expiration 156 = GNUNET_TIME_relative_to_timestamp (pg->idle_reserve_expiration_time); 157 struct GNUNET_TIME_Timestamp gc 158 = GNUNET_TIME_relative_to_timestamp (pg->legal_reserve_expiration_time); 159 enum GNUNET_DB_QueryStatus qs; 160 bool need_update; 161 162 for (unsigned int i = 0; i<reserves_length; i++) 163 { 164 const struct TALER_EXCHANGEDB_ReserveInInfo *reserve = &reserves[i]; 165 166 TALER_full_payto_hash (reserve->sender_account_details, 167 &h_full_paytos[i]); 168 TALER_full_payto_normalize_and_hash (reserve->sender_account_details, 169 &h_normalized_paytos[i]); 170 notify_s[i] = compute_notify_on_reserve (reserve->reserve_pub); 171 reserve_pubs[i] = *reserve->reserve_pub; 172 balances[i] = *reserve->balance; 173 execution_times[i] = reserve->execution_time; 174 sender_account_details[i] = reserve->sender_account_details.full_payto; 175 exchange_account_names[i] = reserve->exchange_account_name; 176 wire_references[i] = reserve->wire_reference; 177 } 178 179 /* NOTE: kind-of pointless to explicitly start a transaction here... */ 180 if (GNUNET_OK != 181 TEH_PG_preflight (pg)) 182 { 183 GNUNET_break (0); 184 qs = GNUNET_DB_STATUS_HARD_ERROR; 185 goto finished; 186 } 187 if (GNUNET_OK != 188 TEH_PG_start_read_committed (pg, 189 "READ_COMMITED")) 190 { 191 GNUNET_break (0); 192 qs = GNUNET_DB_STATUS_HARD_ERROR; 193 goto finished; 194 } 195 PREPARE (pg, 196 "reserves_insert_with_array", 197 "SELECT" 198 " transaction_duplicate" 199 ",ruuid" 200 " FROM exchange_do_array_reserves_insert" 201 " ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10,$11);"); 202 { 203 struct GNUNET_PQ_QueryParam params[] = { 204 GNUNET_PQ_query_param_timestamp (&gc), 205 GNUNET_PQ_query_param_timestamp (&reserve_expiration), 206 GNUNET_PQ_query_param_array_auto_from_type (reserves_length, 207 reserve_pubs, 208 pg->conn), 209 GNUNET_PQ_query_param_array_uint64 (reserves_length, 210 wire_references, 211 pg->conn), 212 TALER_PQ_query_param_array_amount ( 213 reserves_length, 214 balances, 215 pg->conn), 216 GNUNET_PQ_query_param_array_ptrs_string ( 217 reserves_length, 218 (const char **) exchange_account_names, 219 pg->conn), 220 GNUNET_PQ_query_param_array_timestamp ( 221 reserves_length, 222 execution_times, 223 pg->conn), 224 GNUNET_PQ_query_param_array_auto_from_type ( 225 reserves_length, 226 h_full_paytos, 227 pg->conn), 228 GNUNET_PQ_query_param_array_auto_from_type ( 229 reserves_length, 230 h_normalized_paytos, 231 pg->conn), 232 GNUNET_PQ_query_param_array_ptrs_string ( 233 reserves_length, 234 (const char **) sender_account_details, 235 pg->conn), 236 GNUNET_PQ_query_param_array_ptrs_string ( 237 reserves_length, 238 (const char **) notify_s, 239 pg->conn), 240 GNUNET_PQ_query_param_end 241 }; 242 struct Context ctx = { 243 .reserve_uuids = reserve_uuids, 244 .transaction_duplicates = transaction_duplicates, 245 .conflicts = conflicts, 246 .needs_update = false, 247 .status = GNUNET_OK 248 }; 249 250 qs = GNUNET_PQ_eval_prepared_multi_select (pg->conn, 251 "reserves_insert_with_array", 252 params, 253 &helper_cb, 254 &ctx); 255 GNUNET_PQ_cleanup_query_params_closures (params); 256 if ( (qs < 0) || 257 (GNUNET_OK != ctx.status) ) 258 { 259 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 260 "Failed to insert into reserves (%d)\n", 261 qs); 262 goto finished; 263 } 264 need_update = ctx.needs_update; 265 } 266 267 { 268 enum GNUNET_DB_QueryStatus cs; 269 270 cs = TEH_PG_commit (pg); 271 if (cs < 0) 272 { 273 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 274 "Failed to commit\n"); 275 qs = cs; 276 goto finished; 277 } 278 } 279 280 for (unsigned int i = 0; i<reserves_length; i++) 281 { 282 if (transaction_duplicates[i]) 283 dups++; 284 results[i] = transaction_duplicates[i] 285 ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS 286 : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; 287 } 288 289 if (! need_update) 290 { 291 qs = reserves_length; 292 goto finished; 293 } 294 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 295 "Reserve update needed for some reserves in the batch\n"); 296 PREPARE (pg, 297 "reserves_update", 298 "SELECT" 299 " out_duplicate AS duplicate " 300 "FROM exchange_do_batch_reserves_update" 301 " ($1,$2,$3,$4,$5,$6,$7);"); 302 303 if (GNUNET_OK != 304 TEH_PG_start (pg, 305 "reserve-insert-continued")) 306 { 307 GNUNET_break (0); 308 qs = GNUNET_DB_STATUS_HARD_ERROR; 309 goto finished; 310 } 311 312 for (unsigned int i = 0; i<reserves_length; i++) 313 { 314 if (transaction_duplicates[i]) 315 continue; 316 if (! conflicts[i]) 317 continue; 318 { 319 bool duplicate; 320 struct GNUNET_PQ_QueryParam params[] = { 321 GNUNET_PQ_query_param_auto_from_type (&reserve_pubs[i]), 322 GNUNET_PQ_query_param_timestamp (&reserve_expiration), 323 GNUNET_PQ_query_param_uint64 (&wire_references[i]), 324 TALER_PQ_query_param_amount (pg->conn, 325 &balances[i]), 326 GNUNET_PQ_query_param_string (exchange_account_names[i]), 327 GNUNET_PQ_query_param_auto_from_type (&h_full_paytos[i]), 328 GNUNET_PQ_query_param_string (notify_s[i]), 329 GNUNET_PQ_query_param_end 330 }; 331 struct GNUNET_PQ_ResultSpec rs[] = { 332 GNUNET_PQ_result_spec_bool ("duplicate", 333 &duplicate), 334 GNUNET_PQ_result_spec_end 335 }; 336 enum GNUNET_DB_QueryStatus qsi; 337 338 qsi = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, 339 "reserves_update", 340 params, 341 rs); 342 if (qsi < 0) 343 { 344 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 345 "Failed to update reserves (%d)\n", 346 qsi); 347 results[i] = qsi; 348 goto finished; 349 } 350 results[i] = duplicate 351 ? GNUNET_DB_STATUS_SUCCESS_NO_RESULTS 352 : GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; 353 } 354 } 355 { 356 enum GNUNET_DB_QueryStatus cs; 357 358 cs = TEH_PG_commit (pg); 359 if (cs < 0) 360 { 361 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 362 "Failed to commit\n"); 363 qs = cs; 364 goto finished; 365 } 366 } 367 finished: 368 for (unsigned int i = 0; i<reserves_length; i++) 369 GNUNET_free (notify_s[i]); 370 if (qs < 0) 371 return qs; 372 GNUNET_PQ_event_do_poll (pg->conn); 373 if (0 != dups) 374 GNUNET_log (GNUNET_ERROR_TYPE_WARNING, 375 "%u/%u duplicates among incoming transactions. Try increasing WIREWATCH_IDLE_SLEEP_INTERVAL in the [exchange] configuration section (if this happens a lot).\n", 376 dups, 377 reserves_length); 378 return qs; 379 }