pg_begin_shard.c (9055B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2022 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU General Public License for more details. 12 13 You should have received a copy of the GNU General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 /** 17 * @file exchangedb/pg_begin_shard.c 18 * @brief Implementation of the begin_shard function for Postgres 19 * @author Christian Grothoff 20 */ 21 #include "taler/platform.h" 22 #include "taler/taler_error_codes.h" 23 #include "taler/taler_dbevents.h" 24 #include "taler/taler_pq_lib.h" 25 #include "pg_begin_shard.h" 26 #include "pg_helper.h" 27 #include "pg_start.h" 28 #include "pg_rollback.h" 29 #include "pg_commit.h" 30 31 32 enum GNUNET_DB_QueryStatus 33 TEH_PG_begin_shard (void *cls, 34 const char *job_name, 35 struct GNUNET_TIME_Relative delay, 36 uint64_t shard_size, 37 uint64_t *start_row, 38 uint64_t *end_row) 39 { 40 struct PostgresClosure *pg = cls; 41 42 for (unsigned int retries = 0; retries<10; retries++) 43 { 44 if (GNUNET_OK != 45 TEH_PG_start (pg, 46 "begin_shard")) 47 { 48 GNUNET_break (0); 49 return GNUNET_DB_STATUS_HARD_ERROR; 50 } 51 52 { 53 struct GNUNET_TIME_Absolute past; 54 enum GNUNET_DB_QueryStatus qs; 55 struct GNUNET_PQ_QueryParam params[] = { 56 GNUNET_PQ_query_param_string (job_name), 57 GNUNET_PQ_query_param_absolute_time (&past), 58 GNUNET_PQ_query_param_end 59 }; 60 struct GNUNET_PQ_ResultSpec rs[] = { 61 GNUNET_PQ_result_spec_uint64 ("start_row", 62 start_row), 63 GNUNET_PQ_result_spec_uint64 ("end_row", 64 end_row), 65 GNUNET_PQ_result_spec_end 66 }; 67 68 past = GNUNET_TIME_absolute_get (); 69 PREPARE (pg, 70 "get_open_shard", 71 "SELECT" 72 " start_row" 73 ",end_row" 74 " FROM work_shards" 75 " WHERE job_name=$1" 76 " AND completed=FALSE" 77 " AND last_attempt<$2" 78 " ORDER BY last_attempt ASC" 79 " LIMIT 1;"); 80 qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, 81 "get_open_shard", 82 params, 83 rs); 84 switch (qs) 85 { 86 case GNUNET_DB_STATUS_HARD_ERROR: 87 GNUNET_break (0); 88 TEH_PG_rollback (pg); 89 return qs; 90 case GNUNET_DB_STATUS_SOFT_ERROR: 91 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 92 "Serialization error on getting open shard\n"); 93 TEH_PG_rollback (pg); 94 continue; 95 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 96 { 97 enum GNUNET_DB_QueryStatus qsz; 98 struct GNUNET_TIME_Absolute now; 99 struct GNUNET_PQ_QueryParam iparams[] = { 100 GNUNET_PQ_query_param_string (job_name), 101 GNUNET_PQ_query_param_absolute_time (&now), 102 GNUNET_PQ_query_param_uint64 (start_row), 103 GNUNET_PQ_query_param_uint64 (end_row), 104 GNUNET_PQ_query_param_end 105 }; 106 107 now = GNUNET_TIME_relative_to_absolute (delay); 108 PREPARE (pg, 109 "reclaim_shard", 110 "UPDATE work_shards" 111 " SET last_attempt=$2" 112 " WHERE job_name=$1" 113 " AND start_row=$3" 114 " AND end_row=$4"); 115 qsz = GNUNET_PQ_eval_prepared_non_select (pg->conn, 116 "reclaim_shard", 117 iparams); 118 switch (qsz) 119 { 120 case GNUNET_DB_STATUS_HARD_ERROR: 121 GNUNET_break (0); 122 TEH_PG_rollback (pg); 123 return qsz; 124 case GNUNET_DB_STATUS_SOFT_ERROR: 125 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 126 "Serialization error on claiming open shard\n"); 127 TEH_PG_rollback (pg); 128 continue; 129 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 130 goto commit; 131 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 132 GNUNET_break (0); /* logic error, should be impossible */ 133 TEH_PG_rollback (pg); 134 return GNUNET_DB_STATUS_HARD_ERROR; 135 } 136 } 137 break; /* actually unreachable */ 138 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 139 break; /* continued below */ 140 } 141 } /* get_open_shard */ 142 143 /* No open shard, find last 'end_row' */ 144 { 145 enum GNUNET_DB_QueryStatus qs; 146 struct GNUNET_PQ_QueryParam params[] = { 147 GNUNET_PQ_query_param_string (job_name), 148 GNUNET_PQ_query_param_end 149 }; 150 struct GNUNET_PQ_ResultSpec rs[] = { 151 GNUNET_PQ_result_spec_uint64 ("end_row", 152 start_row), 153 GNUNET_PQ_result_spec_end 154 }; 155 156 PREPARE (pg, 157 "get_last_shard", 158 "SELECT" 159 " end_row" 160 " FROM work_shards" 161 " WHERE job_name=$1" 162 " ORDER BY end_row DESC" 163 " LIMIT 1;"); 164 qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, 165 "get_last_shard", 166 params, 167 rs); 168 switch (qs) 169 { 170 case GNUNET_DB_STATUS_HARD_ERROR: 171 GNUNET_break (0); 172 TEH_PG_rollback (pg); 173 return qs; 174 case GNUNET_DB_STATUS_SOFT_ERROR: 175 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 176 "Serialization error on getting last shard\n"); 177 TEH_PG_rollback (pg); 178 continue; 179 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 180 break; 181 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 182 *start_row = 0; /* base-case: no shards yet */ 183 break; /* continued below */ 184 } 185 *end_row = *start_row + shard_size; 186 } /* get_last_shard */ 187 188 /* Claim fresh shard */ 189 { 190 enum GNUNET_DB_QueryStatus qs; 191 struct GNUNET_TIME_Absolute now; 192 struct GNUNET_PQ_QueryParam params[] = { 193 GNUNET_PQ_query_param_string (job_name), 194 GNUNET_PQ_query_param_absolute_time (&now), 195 GNUNET_PQ_query_param_uint64 (start_row), 196 GNUNET_PQ_query_param_uint64 (end_row), 197 GNUNET_PQ_query_param_end 198 }; 199 200 now = GNUNET_TIME_relative_to_absolute (delay); 201 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 202 "Trying to claim shard (%llu-%llu]\n", 203 (unsigned long long) *start_row, 204 (unsigned long long) *end_row); 205 206 PREPARE (pg, 207 "claim_next_shard", 208 "INSERT INTO work_shards" 209 "(job_name" 210 ",last_attempt" 211 ",start_row" 212 ",end_row" 213 ") VALUES " 214 "($1, $2, $3, $4);"); 215 qs = GNUNET_PQ_eval_prepared_non_select (pg->conn, 216 "claim_next_shard", 217 params); 218 switch (qs) 219 { 220 case GNUNET_DB_STATUS_HARD_ERROR: 221 GNUNET_break (0); 222 TEH_PG_rollback (pg); 223 return qs; 224 case GNUNET_DB_STATUS_SOFT_ERROR: 225 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 226 "Serialization error on claiming next shard\n"); 227 TEH_PG_rollback (pg); 228 continue; 229 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 230 /* continued below */ 231 break; 232 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 233 /* someone else got this shard already, 234 try again */ 235 TEH_PG_rollback (pg); 236 continue; 237 } 238 } /* claim_next_shard */ 239 240 /* commit */ 241 commit: 242 { 243 enum GNUNET_DB_QueryStatus qs; 244 245 qs = TEH_PG_commit (pg); 246 switch (qs) 247 { 248 case GNUNET_DB_STATUS_HARD_ERROR: 249 GNUNET_break (0); 250 TEH_PG_rollback (pg); 251 return qs; 252 case GNUNET_DB_STATUS_SOFT_ERROR: 253 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 254 "Serialization error on commit for beginning shard\n"); 255 TEH_PG_rollback (pg); 256 continue; 257 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 258 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 259 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 260 "Claimed new shard\n"); 261 return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; 262 } 263 } 264 } /* retry 'for' loop */ 265 return GNUNET_DB_STATUS_SOFT_ERROR; 266 }