pg_begin_revolving_shard.c (8987B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2022 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU General Public License for more details. 12 13 You should have received a copy of the GNU General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 /** 17 * @file exchangedb/pg_begin_revolving_shard.c 18 * @brief Implementation of the begin_revolving_shard function for Postgres 19 * @author Christian Grothoff 20 */ 21 #include "taler/platform.h" 22 #include "taler/taler_error_codes.h" 23 #include "taler/taler_dbevents.h" 24 #include "taler/taler_pq_lib.h" 25 #include "pg_begin_revolving_shard.h" 26 #include "pg_commit.h" 27 #include "pg_helper.h" 28 #include "pg_start.h" 29 #include "pg_rollback.h" 30 31 enum GNUNET_DB_QueryStatus 32 TEH_PG_begin_revolving_shard (void *cls, 33 const char *job_name, 34 uint32_t shard_size, 35 uint32_t shard_limit, 36 uint32_t *start_row, 37 uint32_t *end_row) 38 { 39 struct PostgresClosure *pg = cls; 40 41 GNUNET_assert (shard_limit <= 1U + (uint32_t) INT_MAX); 42 GNUNET_assert (shard_limit > 0); 43 GNUNET_assert (shard_size > 0); 44 for (unsigned int retries = 0; retries<3; retries++) 45 { 46 if (GNUNET_OK != 47 TEH_PG_start (pg, 48 "begin_revolving_shard")) 49 { 50 GNUNET_break (0); 51 return GNUNET_DB_STATUS_HARD_ERROR; 52 } 53 54 /* First, find last 'end_row' */ 55 { 56 enum GNUNET_DB_QueryStatus qs; 57 uint32_t last_end; 58 struct GNUNET_PQ_QueryParam params[] = { 59 GNUNET_PQ_query_param_string (job_name), 60 GNUNET_PQ_query_param_end 61 }; 62 struct GNUNET_PQ_ResultSpec rs[] = { 63 GNUNET_PQ_result_spec_uint32 ("end_row", 64 &last_end), 65 GNUNET_PQ_result_spec_end 66 }; 67 /* Used in #postgres_begin_revolving_shard() */ 68 PREPARE (pg, 69 "get_last_revolving_shard", 70 "SELECT" 71 " end_row" 72 " FROM revolving_work_shards" 73 " WHERE job_name=$1" 74 " ORDER BY end_row DESC" 75 " LIMIT 1;"); 76 qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, 77 "get_last_revolving_shard", 78 params, 79 rs); 80 switch (qs) 81 { 82 case GNUNET_DB_STATUS_HARD_ERROR: 83 GNUNET_break (0); 84 TEH_PG_rollback (pg); 85 return qs; 86 case GNUNET_DB_STATUS_SOFT_ERROR: 87 TEH_PG_rollback (pg); 88 continue; 89 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 90 *start_row = 1U + last_end; 91 break; 92 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 93 *start_row = 0; /* base-case: no shards yet */ 94 break; /* continued below */ 95 } 96 } /* get_last_shard */ 97 98 if (*start_row < shard_limit) 99 { 100 /* Claim fresh shard */ 101 enum GNUNET_DB_QueryStatus qs; 102 struct GNUNET_TIME_Absolute now; 103 struct GNUNET_PQ_QueryParam params[] = { 104 GNUNET_PQ_query_param_string (job_name), 105 GNUNET_PQ_query_param_absolute_time (&now), 106 GNUNET_PQ_query_param_uint32 (start_row), 107 GNUNET_PQ_query_param_uint32 (end_row), 108 GNUNET_PQ_query_param_end 109 }; 110 111 *end_row = GNUNET_MIN (shard_limit, 112 *start_row + shard_size - 1); 113 now = GNUNET_TIME_absolute_get (); 114 GNUNET_log (GNUNET_ERROR_TYPE_INFO, 115 "Trying to claim shard %llu-%llu\n", 116 (unsigned long long) *start_row, 117 (unsigned long long) *end_row); 118 119 /* Used in #postgres_claim_revolving_shard() */ 120 PREPARE (pg, 121 "create_revolving_shard", 122 "INSERT INTO revolving_work_shards" 123 "(job_name" 124 ",last_attempt" 125 ",start_row" 126 ",end_row" 127 ",active" 128 ") VALUES " 129 "($1, $2, $3, $4, TRUE);"); 130 qs = GNUNET_PQ_eval_prepared_non_select (pg->conn, 131 "create_revolving_shard", 132 params); 133 switch (qs) 134 { 135 case GNUNET_DB_STATUS_HARD_ERROR: 136 GNUNET_break (0); 137 TEH_PG_rollback (pg); 138 return qs; 139 case GNUNET_DB_STATUS_SOFT_ERROR: 140 TEH_PG_rollback (pg); 141 continue; 142 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 143 /* continued below (with commit) */ 144 break; 145 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 146 /* someone else got this shard already, 147 try again */ 148 TEH_PG_rollback (pg); 149 continue; 150 } 151 } /* end create fresh reovlving shard */ 152 else 153 { 154 /* claim oldest existing shard */ 155 enum GNUNET_DB_QueryStatus qs; 156 struct GNUNET_PQ_QueryParam params[] = { 157 GNUNET_PQ_query_param_string (job_name), 158 GNUNET_PQ_query_param_end 159 }; 160 struct GNUNET_PQ_ResultSpec rs[] = { 161 GNUNET_PQ_result_spec_uint32 ("start_row", 162 start_row), 163 GNUNET_PQ_result_spec_uint32 ("end_row", 164 end_row), 165 GNUNET_PQ_result_spec_end 166 }; 167 168 PREPARE (pg, 169 "get_open_revolving_shard", 170 "SELECT" 171 " start_row" 172 ",end_row" 173 " FROM revolving_work_shards" 174 " WHERE job_name=$1" 175 " AND active=FALSE" 176 " ORDER BY last_attempt ASC" 177 " LIMIT 1;"); 178 qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn, 179 "get_open_revolving_shard", 180 params, 181 rs); 182 switch (qs) 183 { 184 case GNUNET_DB_STATUS_HARD_ERROR: 185 GNUNET_break (0); 186 TEH_PG_rollback (pg); 187 return qs; 188 case GNUNET_DB_STATUS_SOFT_ERROR: 189 TEH_PG_rollback (pg); 190 continue; 191 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 192 /* no open shards available */ 193 TEH_PG_rollback (pg); 194 return qs; 195 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 196 { 197 enum GNUNET_DB_QueryStatus qsz; 198 struct GNUNET_TIME_Timestamp now; 199 struct GNUNET_PQ_QueryParam iparams[] = { 200 GNUNET_PQ_query_param_string (job_name), 201 GNUNET_PQ_query_param_timestamp (&now), 202 GNUNET_PQ_query_param_uint32 (start_row), 203 GNUNET_PQ_query_param_uint32 (end_row), 204 GNUNET_PQ_query_param_end 205 }; 206 207 now = GNUNET_TIME_timestamp_get (); 208 PREPARE (pg, 209 "reclaim_revolving_shard", 210 "UPDATE revolving_work_shards" 211 " SET last_attempt=$2" 212 " ,active=TRUE" 213 " WHERE job_name=$1" 214 " AND start_row=$3" 215 " AND end_row=$4"); 216 qsz = GNUNET_PQ_eval_prepared_non_select (pg->conn, 217 "reclaim_revolving_shard", 218 iparams); 219 switch (qsz) 220 { 221 case GNUNET_DB_STATUS_HARD_ERROR: 222 GNUNET_break (0); 223 TEH_PG_rollback (pg); 224 return qs; 225 case GNUNET_DB_STATUS_SOFT_ERROR: 226 TEH_PG_rollback (pg); 227 continue; 228 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 229 break; /* continue with commit */ 230 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 231 GNUNET_break (0); /* logic error, should be impossible */ 232 TEH_PG_rollback (pg); 233 return GNUNET_DB_STATUS_HARD_ERROR; 234 } 235 } 236 break; /* continue with commit */ 237 } 238 } /* end claim oldest existing shard */ 239 240 /* commit */ 241 { 242 enum GNUNET_DB_QueryStatus qs; 243 244 qs = TEH_PG_commit (pg); 245 switch (qs) 246 { 247 case GNUNET_DB_STATUS_HARD_ERROR: 248 GNUNET_break (0); 249 TEH_PG_rollback (pg); 250 return qs; 251 case GNUNET_DB_STATUS_SOFT_ERROR: 252 TEH_PG_rollback (pg); 253 continue; 254 case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS: 255 case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT: 256 return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT; 257 } 258 } 259 } /* retry 'for' loop */ 260 return GNUNET_DB_STATUS_SOFT_ERROR; 261 }