exchange

Base system with REST service to issue digital coins, run by the payment service provider
Log | Files | Refs | Submodules | README | LICENSE

pg_begin_revolving_shard.c (8987B)


      1 /*
      2    This file is part of TALER
      3    Copyright (C) 2022 Taler Systems SA
      4 
      5    TALER is free software; you can redistribute it and/or modify it under the
      6    terms of the GNU General Public License as published by the Free Software
      7    Foundation; either version 3, or (at your option) any later version.
      8 
      9    TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10    WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11    A PARTICULAR PURPOSE.  See the GNU General Public License for more details.
     12 
     13    You should have received a copy of the GNU General Public License along with
     14    TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15  */
     16 /**
     17  * @file exchangedb/pg_begin_revolving_shard.c
     18  * @brief Implementation of the begin_revolving_shard function for Postgres
     19  * @author Christian Grothoff
     20  */
     21 #include "taler/platform.h"
     22 #include "taler/taler_error_codes.h"
     23 #include "taler/taler_dbevents.h"
     24 #include "taler/taler_pq_lib.h"
     25 #include "pg_begin_revolving_shard.h"
     26 #include "pg_commit.h"
     27 #include "pg_helper.h"
     28 #include "pg_start.h"
     29 #include "pg_rollback.h"
     30 
     31 enum GNUNET_DB_QueryStatus
     32 TEH_PG_begin_revolving_shard (void *cls,
     33                               const char *job_name,
     34                               uint32_t shard_size,
     35                               uint32_t shard_limit,
     36                               uint32_t *start_row,
     37                               uint32_t *end_row)
     38 {
     39   struct PostgresClosure *pg = cls;
     40 
     41   GNUNET_assert (shard_limit <= 1U + (uint32_t) INT_MAX);
     42   GNUNET_assert (shard_limit > 0);
     43   GNUNET_assert (shard_size > 0);
     44   for (unsigned int retries = 0; retries<3; retries++)
     45   {
     46     if (GNUNET_OK !=
     47         TEH_PG_start (pg,
     48                       "begin_revolving_shard"))
     49     {
     50       GNUNET_break (0);
     51       return GNUNET_DB_STATUS_HARD_ERROR;
     52     }
     53 
     54     /* First, find last 'end_row' */
     55     {
     56       enum GNUNET_DB_QueryStatus qs;
     57       uint32_t last_end;
     58       struct GNUNET_PQ_QueryParam params[] = {
     59         GNUNET_PQ_query_param_string (job_name),
     60         GNUNET_PQ_query_param_end
     61       };
     62       struct GNUNET_PQ_ResultSpec rs[] = {
     63         GNUNET_PQ_result_spec_uint32 ("end_row",
     64                                       &last_end),
     65         GNUNET_PQ_result_spec_end
     66       };
     67       /* Used in #postgres_begin_revolving_shard() */
     68       PREPARE (pg,
     69                "get_last_revolving_shard",
     70                "SELECT"
     71                " end_row"
     72                " FROM revolving_work_shards"
     73                " WHERE job_name=$1"
     74                " ORDER BY end_row DESC"
     75                " LIMIT 1;");
     76       qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
     77                                                      "get_last_revolving_shard",
     78                                                      params,
     79                                                      rs);
     80       switch (qs)
     81       {
     82       case GNUNET_DB_STATUS_HARD_ERROR:
     83         GNUNET_break (0);
     84         TEH_PG_rollback (pg);
     85         return qs;
     86       case GNUNET_DB_STATUS_SOFT_ERROR:
     87         TEH_PG_rollback (pg);
     88         continue;
     89       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
     90         *start_row = 1U + last_end;
     91         break;
     92       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
     93         *start_row = 0; /* base-case: no shards yet */
     94         break; /* continued below */
     95       }
     96     } /* get_last_shard */
     97 
     98     if (*start_row < shard_limit)
     99     {
    100       /* Claim fresh shard */
    101       enum GNUNET_DB_QueryStatus qs;
    102       struct GNUNET_TIME_Absolute now;
    103       struct GNUNET_PQ_QueryParam params[] = {
    104         GNUNET_PQ_query_param_string (job_name),
    105         GNUNET_PQ_query_param_absolute_time (&now),
    106         GNUNET_PQ_query_param_uint32 (start_row),
    107         GNUNET_PQ_query_param_uint32 (end_row),
    108         GNUNET_PQ_query_param_end
    109       };
    110 
    111       *end_row = GNUNET_MIN (shard_limit,
    112                              *start_row + shard_size - 1);
    113       now = GNUNET_TIME_absolute_get ();
    114       GNUNET_log (GNUNET_ERROR_TYPE_INFO,
    115                   "Trying to claim shard %llu-%llu\n",
    116                   (unsigned long long) *start_row,
    117                   (unsigned long long) *end_row);
    118 
    119       /* Used in #postgres_claim_revolving_shard() */
    120       PREPARE (pg,
    121                "create_revolving_shard",
    122                "INSERT INTO revolving_work_shards"
    123                "(job_name"
    124                ",last_attempt"
    125                ",start_row"
    126                ",end_row"
    127                ",active"
    128                ") VALUES "
    129                "($1, $2, $3, $4, TRUE);");
    130       qs = GNUNET_PQ_eval_prepared_non_select (pg->conn,
    131                                                "create_revolving_shard",
    132                                                params);
    133       switch (qs)
    134       {
    135       case GNUNET_DB_STATUS_HARD_ERROR:
    136         GNUNET_break (0);
    137         TEH_PG_rollback (pg);
    138         return qs;
    139       case GNUNET_DB_STATUS_SOFT_ERROR:
    140         TEH_PG_rollback (pg);
    141         continue;
    142       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    143         /* continued below (with commit) */
    144         break;
    145       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    146         /* someone else got this shard already,
    147            try again */
    148         TEH_PG_rollback (pg);
    149         continue;
    150       }
    151     } /* end create fresh reovlving shard */
    152     else
    153     {
    154       /* claim oldest existing shard */
    155       enum GNUNET_DB_QueryStatus qs;
    156       struct GNUNET_PQ_QueryParam params[] = {
    157         GNUNET_PQ_query_param_string (job_name),
    158         GNUNET_PQ_query_param_end
    159       };
    160       struct GNUNET_PQ_ResultSpec rs[] = {
    161         GNUNET_PQ_result_spec_uint32 ("start_row",
    162                                       start_row),
    163         GNUNET_PQ_result_spec_uint32 ("end_row",
    164                                       end_row),
    165         GNUNET_PQ_result_spec_end
    166       };
    167 
    168       PREPARE (pg,
    169                "get_open_revolving_shard",
    170                "SELECT"
    171                " start_row"
    172                ",end_row"
    173                " FROM revolving_work_shards"
    174                " WHERE job_name=$1"
    175                "   AND active=FALSE"
    176                " ORDER BY last_attempt ASC"
    177                " LIMIT 1;");
    178       qs = GNUNET_PQ_eval_prepared_singleton_select (pg->conn,
    179                                                      "get_open_revolving_shard",
    180                                                      params,
    181                                                      rs);
    182       switch (qs)
    183       {
    184       case GNUNET_DB_STATUS_HARD_ERROR:
    185         GNUNET_break (0);
    186         TEH_PG_rollback (pg);
    187         return qs;
    188       case GNUNET_DB_STATUS_SOFT_ERROR:
    189         TEH_PG_rollback (pg);
    190         continue;
    191       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    192         /* no open shards available */
    193         TEH_PG_rollback (pg);
    194         return qs;
    195       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    196         {
    197           enum GNUNET_DB_QueryStatus qsz;
    198           struct GNUNET_TIME_Timestamp now;
    199           struct GNUNET_PQ_QueryParam iparams[] = {
    200             GNUNET_PQ_query_param_string (job_name),
    201             GNUNET_PQ_query_param_timestamp (&now),
    202             GNUNET_PQ_query_param_uint32 (start_row),
    203             GNUNET_PQ_query_param_uint32 (end_row),
    204             GNUNET_PQ_query_param_end
    205           };
    206 
    207           now = GNUNET_TIME_timestamp_get ();
    208           PREPARE (pg,
    209                    "reclaim_revolving_shard",
    210                    "UPDATE revolving_work_shards"
    211                    " SET last_attempt=$2"
    212                    "    ,active=TRUE"
    213                    " WHERE job_name=$1"
    214                    "   AND start_row=$3"
    215                    "   AND end_row=$4");
    216           qsz = GNUNET_PQ_eval_prepared_non_select (pg->conn,
    217                                                     "reclaim_revolving_shard",
    218                                                     iparams);
    219           switch (qsz)
    220           {
    221           case GNUNET_DB_STATUS_HARD_ERROR:
    222             GNUNET_break (0);
    223             TEH_PG_rollback (pg);
    224             return qs;
    225           case GNUNET_DB_STATUS_SOFT_ERROR:
    226             TEH_PG_rollback (pg);
    227             continue;
    228           case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    229             break; /* continue with commit */
    230           case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    231             GNUNET_break (0); /* logic error, should be impossible */
    232             TEH_PG_rollback (pg);
    233             return GNUNET_DB_STATUS_HARD_ERROR;
    234           }
    235         }
    236         break; /* continue with commit */
    237       }
    238     } /* end claim oldest existing shard */
    239 
    240     /* commit */
    241     {
    242       enum GNUNET_DB_QueryStatus qs;
    243 
    244       qs = TEH_PG_commit (pg);
    245       switch (qs)
    246       {
    247       case GNUNET_DB_STATUS_HARD_ERROR:
    248         GNUNET_break (0);
    249         TEH_PG_rollback (pg);
    250         return qs;
    251       case GNUNET_DB_STATUS_SOFT_ERROR:
    252         TEH_PG_rollback (pg);
    253         continue;
    254       case GNUNET_DB_STATUS_SUCCESS_NO_RESULTS:
    255       case GNUNET_DB_STATUS_SUCCESS_ONE_RESULT:
    256         return GNUNET_DB_STATUS_SUCCESS_ONE_RESULT;
    257       }
    258     }
    259   } /* retry 'for' loop */
    260   return GNUNET_DB_STATUS_SOFT_ERROR;
    261 }