exchange

Base system with REST service to issue digital coins, run by the payment service provider
Log | Files | Refs | Submodules | README | LICENSE

pg_begin_shard.c (9055B)


      1 /*
      2    This file is part of TALER
      3    Copyright (C) 2022 Taler Systems SA
      4 
      5    TALER is free software; you can redistribute it and/or modify it under the
      6    terms of the GNU General Public License as published by the Free Software
      7    Foundation; either version 3, or (at your option) any later version.
      8 
      9    TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10    WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11    A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
     12 
     13    You should have received a copy of the GNU General Public License along with
     14    TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15  */
     16 /**
     17  * @file exchangedb/pg_begin_shard.c
     18  * @brief Implementation of the begin_shard function for Postgres
     19  * @author Christian Grothoff
     20  */
     21 #include "taler/platform.h"
     22 #include "taler/taler_error_codes.h"
     23 #include "taler/taler_dbevents.h"
     24 #include "taler/taler_pq_lib.h"
     25 #include "pg_begin_shard.h"
     26 #include "pg_helper.h"
     27 #include "pg_start.h"
     28 #include "pg_rollback.h"
     29 #include "pg_commit.h"
     30 
     31 
     32 enum GNUNET_DB_QueryStatus
     33 TEH_PG_begin_shard (void *cls,
     34                     const char *job_name,
     35                     struct GNUNET_TIME_Relative delay,
     36                     uint64_t shard_size,
     37                     uint64_t *start_row,
     38                     uint64_t *end_row)
     39 {
     40   struct PostgresClosure *pg = cls;
     41 
     42   for (unsigned int retries = 0; retries<10; retries++)
     43   {
     44     if (GNUNET_OK !=
     45         TEH_PG_start (pg,
     46                       "begin_shard"))
     47     {
     48       GNUNET_break (0);
     49       return GNUNET_DB_STATUS_HARD_ERROR;
     50     }
     51 
     52     {
     53       struct GNUNET_TIME_Absolute past;
     54       enum GNUNET_DB_QueryStatus qs;
     55       struct GNUNET_PQ_QueryParam params[] = {
     56         GNUNET_PQ_query_param_string (job_name),
     57         GNUNET_PQ_query_param_absolute_time (&past),
     58         GNUNET_PQ_query_param_end
     59       };
     60       struct GNUNET_PQ_ResultSpec rs[] = {
     61         GNUNET_PQ_result_spec_uint64 ("start_row",
     62                                       start_row),
     63         GNUNET_PQ_result_spec_uint64 ("end_row",
     64                                       end_row),
     65         GNUNET_PQ_result_spec_end
     66       };
     67 
     68       past = GNUNET_TIME_absolute_get ();
     69       PREPARE (pg,
     70                "get_open_shard",
     71                "SELECT"
     72                " start_row"
     73                ",end_row"
     74                " FROM work_shards"
     75                " WHERE job_name=$1"
     76                "   AND completed=FALSE"
     77                "   AND last_attempt<$2"
     78                " ORDER BY last_attempt ASC"
     79                " LIMIT 1;");
     80       qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
     81                                                      "get_open_shard",
     82                                                      params,
     83                                                      rs);
     84       switch (qs)
     85       {
     86       case GNUNET_DB_STATUS_HARD_ERROR:
     87         GNUNET_break (0);
     88         TEH_PG_rollback (pg);
     89         return qs;
     90       case GNUNET_DB_STATUS_SOFT_ERROR:
     91         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
     92                     "Serialization error on getting open shard\n");
     93         TEH_PG_rollback (pg);
     94         continue;
     95       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     96         {
     97           enum GNUNET_DB_QueryStatus qsz;
     98           struct GNUNET_TIME_Absolute now;
     99           struct GNUNET_PQ_QueryParam iparams[] = {
    100             GNUNET_PQ_query_param_string (job_name),
    101             GNUNET_PQ_query_param_absolute_time (&now),
    102             GNUNET_PQ_query_param_uint64 (start_row),
    103             GNUNET_PQ_query_param_uint64 (end_row),
    104             GNUNET_PQ_query_param_end
    105           };
    106 
    107           now = GNUNET_TIME_relative_to_absolute (delay);
    108           PREPARE (pg,
    109                    "reclaim_shard",
    110                    "UPDATE work_shards"
    111                    " SET last_attempt=$2"
    112                    " WHERE job_name=$1"
    113                    "   AND start_row=$3"
    114                    "   AND end_row=$4");
    115           qsz = GNUNET_PQ_eval_prepared_non_select (pg->conn,
    116                                                     "reclaim_shard",
    117                                                     iparams);
    118           switch (qsz)
    119           {
    120           case GNUNET_DB_STATUS_HARD_ERROR:
    121             GNUNET_break (0);
    122             TEH_PG_rollback (pg);
    123             return qsz;
    124           case GNUNET_DB_STATUS_SOFT_ERROR:
    125             GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    126                         "Serialization error on claiming open shard\n");
    127             TEH_PG_rollback (pg);
    128             continue;
    129           case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    130             goto commit;
    131           case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    132             GNUNET_break (0); /* logic error, should be impossible */
    133             TEH_PG_rollback (pg);
    134             return GNUNET_DB_STATUS_HARD_ERROR;
    135           }
    136         }
    137         break; /* actually unreachable */
    138       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    139         break; /* continued below */
    140       }
    141     } /* get_open_shard */
    142 
    143     /* No open shard, find last 'end_row' */
    144     {
    145       enum GNUNET_DB_QueryStatus qs;
    146       struct GNUNET_PQ_QueryParam params[] = {
    147         GNUNET_PQ_query_param_string (job_name),
    148         GNUNET_PQ_query_param_end
    149       };
    150       struct GNUNET_PQ_ResultSpec rs[] = {
    151         GNUNET_PQ_result_spec_uint64 ("end_row",
    152                                       start_row),
    153         GNUNET_PQ_result_spec_end
    154       };
    155 
    156       PREPARE (pg,
    157                "get_last_shard",
    158                "SELECT"
    159                " end_row"
    160                " FROM work_shards"
    161                " WHERE job_name=$1"
    162                " ORDER BY end_row DESC"
    163                " LIMIT 1;");
    164       qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
    165                                                      "get_last_shard",
    166                                                      params,
    167                                                      rs);
    168       switch (qs)
    169       {
    170       case GNUNET_DB_STATUS_HARD_ERROR:
    171         GNUNET_break (0);
    172         TEH_PG_rollback (pg);
    173         return qs;
    174       case GNUNET_DB_STATUS_SOFT_ERROR:
    175         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    176                     "Serialization error on getting last shard\n");
    177         TEH_PG_rollback (pg);
    178         continue;
    179       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    180         break;
    181       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    182         *start_row = 0; /* base-case: no shards yet */
    183         break; /* continued below */
    184       }
    185       *end_row = *start_row + shard_size;
    186     } /* get_last_shard */
    187 
    188     /* Claim fresh shard */
    189     {
    190       enum GNUNET_DB_QueryStatus qs;
    191       struct GNUNET_TIME_Absolute now;
    192       struct GNUNET_PQ_QueryParam params[] = {
    193         GNUNET_PQ_query_param_string (job_name),
    194         GNUNET_PQ_query_param_absolute_time (&now),
    195         GNUNET_PQ_query_param_uint64 (start_row),
    196         GNUNET_PQ_query_param_uint64 (end_row),
    197         GNUNET_PQ_query_param_end
    198       };
    199 
    200       now = GNUNET_TIME_relative_to_absolute (delay);
    201       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    202                   "Trying to claim shard (%llu-%llu]\n",
    203                   (unsigned long long) *start_row,
    204                   (unsigned long long) *end_row);
    205 
    206       PREPARE (pg,
    207                "claim_next_shard",
    208                "INSERT INTO work_shards"
    209                "(job_name"
    210                ",last_attempt"
    211                ",start_row"
    212                ",end_row"
    213                ") VALUES "
    214                "($1, $2, $3, $4);");
    215       qs = GNUNET_PQ_eval_prepared_non_select (pg->conn,
    216                                                "claim_next_shard",
    217                                                params);
    218       switch (qs)
    219       {
    220       case GNUNET_DB_STATUS_HARD_ERROR:
    221         GNUNET_break (0);
    222         TEH_PG_rollback (pg);
    223         return qs;
    224       case GNUNET_DB_STATUS_SOFT_ERROR:
    225         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    226                     "Serialization error on claiming next shard\n");
    227         TEH_PG_rollback (pg);
    228         continue;
    229       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    230         /* continued below */
    231         break;
    232       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    233         /* someone else got this shard already,
    234            try again */
    235         TEH_PG_rollback (pg);
    236         continue;
    237       }
    238     } /* claim_next_shard */
    239 
    240     /* commit */
    241 commit:
    242     {
    243       enum GNUNET_DB_QueryStatus qs;
    244 
    245       qs = TEH_PG_commit (pg);
    246       switch (qs)
    247       {
    248       case GNUNET_DB_STATUS_HARD_ERROR:
    249         GNUNET_break (0);
    250         TEH_PG_rollback (pg);
    251         return qs;
    252       case GNUNET_DB_STATUS_SOFT_ERROR:
    253         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    254                     "Serialization error on commit for beginning shard\n");
    255         TEH_PG_rollback (pg);
    256         continue;
    257       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    258       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    259         GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    260                     "Claimed new shard\n");
    261         return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
    262       }
    263     }
    264   } /* retry 'for' loop */
    265   return GNUNET_DB_STATUS_SOFT_ERROR;
    266 }