cashless2ecash

cashless2ecash: pay with cards for digital cash (experimental)
Log | Files | Refs | README

db-postgres.go (27209B)


      1 // This file is part of taler-cashless2ecash.
      2 // Copyright (C) 2024 Joel Häberli
      3 //
      4 // taler-cashless2ecash is free software: you can redistribute it and/or modify it
      5 // under the terms of the GNU Affero General Public License as published
      6 // by the Free Software Foundation, either version 3 of the License,
      7 // or (at your option) any later version.
      8 //
      9 // taler-cashless2ecash is distributed in the hope that it will be useful, but
     10 // WITHOUT ANY WARRANTY; without even the implied warranty of
     11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
     12 // Affero General Public License for more details.
     13 //
     14 // You should have received a copy of the GNU Affero General Public License
     15 // along with this program.  If not, see <http://www.gnu.org/licenses/>.
     16 //
     17 // SPDX-License-Identifier: AGPL-3.0-or-later
     18 
     19 package internal_postgres
     20 
     21 import (
     22 	internal_db "c2ec/internal/db"
     23 	internal_utils "c2ec/internal/utils"
     24 	"c2ec/pkg/config"
     25 	"c2ec/pkg/db"
     26 	public_db "c2ec/pkg/db"
     27 	"context"
     28 	"errors"
     29 	"fmt"
     30 	"math"
     31 	"os"
     32 	"strconv"
     33 	"strings"
     34 	"time"
     35 
     36 	"github.com/jackc/pgx/v5"
     37 	"github.com/jackc/pgx/v5/pgconn"
     38 	"github.com/jackc/pgx/v5/pgxpool"
     39 	"github.com/jackc/pgxlisten"
     40 )
     41 
     42 const PS_INSERT_WITHDRAWAL = "INSERT INTO " + internal_db.WITHDRAWAL_TABLE_NAME + " (" +
     43 	internal_db.WITHDRAWAL_FIELD_NAME_WOPID + ", " + internal_db.WITHDRAWAL_FIELD_NAME_RUID + ", " +
     44 	internal_db.WITHDRAWAL_FIELD_NAME_SUGGESTED_AMOUNT + ", " + internal_db.WITHDRAWAL_FIELD_NAME_AMOUNT + ", " +
     45 	internal_db.WITHDRAWAL_FIELD_NAME_TRANSACTION_ID + ", " + internal_db.WITHDRAWAL_FIELD_NAME_FEES + ", " +
     46 	internal_db.WITHDRAWAL_FIELD_NAME_TS + ", " + internal_db.WITHDRAWAL_FIELD_NAME_TERMINAL_ID +
     47 	") VALUES ($1,$2,($3,$4,$5),($6,$7,$8),$9,($10,$11,$12),$13,$14)"
     48 
     49 const PS_REGISTER_WITHDRAWAL_PARAMS = "UPDATE " + internal_db.WITHDRAWAL_TABLE_NAME + " SET (" +
     50 	internal_db.WITHDRAWAL_FIELD_NAME_RESPUBKEY + "," +
     51 	internal_db.WITHDRAWAL_FIELD_NAME_STATUS + "," +
     52 	internal_db.WITHDRAWAL_FIELD_NAME_TS + ")" +
     53 	" = ($1,$2,$3)" +
     54 	" WHERE " + internal_db.WITHDRAWAL_FIELD_NAME_WOPID + "=$4"
     55 
     56 const PS_GET_UNCONFIRMED_WITHDRAWALS = "SELECT * FROM " + internal_db.WITHDRAWAL_TABLE_NAME +
     57 	" WHERE " + internal_db.WITHDRAWAL_FIELD_NAME_STATUS + " = '" + string(internal_utils.SELECTED) + "'"
     58 
     59 const PS_PAYMENT_NOTIFICATION = "UPDATE " + internal_db.WITHDRAWAL_TABLE_NAME + " SET (" +
     60 	internal_db.WITHDRAWAL_FIELD_NAME_FEES + "," + internal_db.WITHDRAWAL_FIELD_NAME_TRANSACTION_ID + "," +
     61 	internal_db.WITHDRAWAL_FIELD_NAME_TERMINAL_ID + ")" +
     62 	" = (($1,$2,$3),$4,$5)" +
     63 	" WHERE " + internal_db.WITHDRAWAL_FIELD_NAME_WOPID + "=$6"
     64 
     65 const PS_FINALISE_PAYMENT = "UPDATE " + internal_db.WITHDRAWAL_TABLE_NAME + " SET (" +
     66 	internal_db.WITHDRAWAL_FIELD_NAME_STATUS + "," +
     67 	internal_db.WITHDRAWAL_FIELD_NAME_COMPLETION_PROOF + "," +
     68 	internal_db.WITHDRAWAL_FIELD_NAME_CONFIRMED_ROW_ID + ")" +
     69 	" = ($1, $2, (SELECT ((SELECT MAX(" + internal_db.WITHDRAWAL_FIELD_NAME_CONFIRMED_ROW_ID + ") FROM " + internal_db.WITHDRAWAL_TABLE_NAME + " WHERE " + internal_db.WITHDRAWAL_FIELD_NAME_STATUS + "='" + string(internal_utils.CONFIRMED) + "')+1)))" +
     70 	" WHERE " + internal_db.WITHDRAWAL_FIELD_NAME_ID + "=$3"
     71 
     72 const PS_SET_LAST_RETRY = "UPDATE " + internal_db.WITHDRAWAL_TABLE_NAME +
     73 	" SET " + internal_db.WITHDRAWAL_FIELD_NAME_LAST_RETRY + "=$1" +
     74 	" WHERE " + internal_db.WITHDRAWAL_FIELD_NAME_ID + "=$2"
     75 
     76 const PS_SET_RETRY_COUNTER = "UPDATE " + internal_db.WITHDRAWAL_TABLE_NAME +
     77 	" SET " + internal_db.WITHDRAWAL_FIELD_NAME_RETRY_COUNTER + "=$1" +
     78 	" WHERE " + internal_db.WITHDRAWAL_FIELD_NAME_ID + "=$2"
     79 
     80 const PS_GET_WITHDRAWAL_BY_RUID = "SELECT * FROM " + internal_db.WITHDRAWAL_TABLE_NAME +
     81 	" WHERE " + internal_db.WITHDRAWAL_FIELD_NAME_RUID + "=$1"
     82 
     83 const PS_GET_WITHDRAWAL_BY_ID = "SELECT * FROM " + internal_db.WITHDRAWAL_TABLE_NAME +
     84 	" WHERE " + internal_db.WITHDRAWAL_FIELD_NAME_ID + "=$1"
     85 
     86 const PS_GET_WITHDRAWAL_BY_WOPID = "SELECT * FROM " + internal_db.WITHDRAWAL_TABLE_NAME +
     87 	" WHERE " + internal_db.WITHDRAWAL_FIELD_NAME_WOPID + "=$1"
     88 
     89 const PS_GET_WITHDRAWAL_BY_PTID = "SELECT * FROM " + internal_db.WITHDRAWAL_TABLE_NAME +
     90 	" WHERE " + internal_db.WITHDRAWAL_FIELD_NAME_TRANSACTION_ID + "=$1"
     91 
     92 const PS_GET_PROVIDER_BY_TERMINAL = "SELECT * FROM " + internal_db.PROVIDER_TABLE_NAME +
     93 	" WHERE " + internal_db.PROVIDER_FIELD_NAME_ID +
     94 	" = (SELECT " + internal_db.TERMINAL_FIELD_NAME_PROVIDER_ID + " FROM " + internal_db.TERMINAL_TABLE_NAME +
     95 	" WHERE " + internal_db.TERMINAL_FIELD_NAME_ID + "=$1)"
     96 
     97 const PS_GET_PROVIDER_BY_NAME = "SELECT * FROM " + internal_db.PROVIDER_TABLE_NAME +
     98 	" WHERE " + internal_db.PROVIDER_FIELD_NAME_NAME + "=$1"
     99 
    100 const PS_GET_PROVIDER_BY_PAYTO_TARGET_TYPE = "SELECT * FROM " + internal_db.PROVIDER_TABLE_NAME +
    101 	" WHERE " + internal_db.PROVIDER_FIELD_NAME_PAYTO_TARGET_TYPE + "=$1"
    102 
    103 const PS_GET_TERMINAL_BY_ID = "SELECT * FROM " + internal_db.TERMINAL_TABLE_NAME +
    104 	" WHERE " + internal_db.TERMINAL_FIELD_NAME_ID + "=$1"
    105 
    106 const PS_GET_TRANSFER_BY_ID = "SELECT * FROM " + internal_db.TRANSFER_TABLE_NAME +
    107 	" WHERE " + internal_db.TRANSFER_FIELD_NAME_ID + "=$1"
    108 
    109 const PS_GET_TRANSFER_BY_CREDIT_ACCOUNT = "SELECT * FROM " + internal_db.TRANSFER_TABLE_NAME +
    110 	" WHERE " + internal_db.TRANSFER_FIELD_NAME_CREDIT_ACCOUNT + "=$1"
    111 
    112 const PS_ADD_TRANSFER = "INSERT INTO " + internal_db.TRANSFER_TABLE_NAME +
    113 	" (" + internal_db.TRANSFER_FIELD_NAME_ID + ", " + internal_db.TRANSFER_FIELD_NAME_AMOUNT + ", " +
    114 	internal_db.TRANSFER_FIELD_NAME_EXCHANGE_BASE_URL + ", " + internal_db.TRANSFER_FIELD_NAME_WTID + ", " +
    115 	internal_db.TRANSFER_FIELD_NAME_CREDIT_ACCOUNT + ", " + internal_db.TRANSFER_FIELD_NAME_TS +
    116 	") VALUES ($1,$2,$3,$4,$5,$6)"
    117 
    118 const PS_UPDATE_TRANSFER = "UPDATE " + internal_db.TRANSFER_TABLE_NAME + " SET (" +
    119 	internal_db.TRANSFER_FIELD_NAME_TS + ", " + internal_db.TRANSFER_FIELD_NAME_STATUS + ", " +
    120 	internal_db.TRANSFER_FIELD_NAME_RETRIES + ", " + internal_db.TRANSFER_FIELD_NAME_TRANSFERRED_ROW_ID + ") = ($1,$2,$3," +
    121 	"(SELECT ((SELECT MAX(" + internal_db.TRANSFER_FIELD_NAME_TRANSFERRED_ROW_ID + ") FROM " + internal_db.TRANSFER_TABLE_NAME + " WHERE " + internal_db.TRANSFER_FIELD_NAME_STATUS + "=0)+1))" +
    122 	") WHERE " + internal_db.TRANSFER_FIELD_NAME_ID + "=$4"
    123 
    124 const PS_CONFIRMED_TRANSACTIONS_ASC = "SELECT * FROM c2ec.withdrawal WHERE confirmed_row_id > $1 ORDER BY confirmed_row_id ASC LIMIT $2"
    125 
    126 const PS_CONFIRMED_TRANSACTIONS_DESC = "SELECT * FROM c2ec.withdrawal WHERE confirmed_row_id < $1 ORDER BY confirmed_row_id DESC LIMIT $2"
    127 
    128 const PS_CONFIRMED_TRANSACTIONS_ASC_MAX = "SELECT * FROM c2ec.withdrawal WHERE confirmed_row_id > $1 ORDER BY confirmed_row_id ASC LIMIT $2"
    129 
    130 const PS_CONFIRMED_TRANSACTIONS_DESC_MAX = "SELECT * FROM c2ec.withdrawal WHERE confirmed_row_id < (SELECT MAX(confirmed_row_id) FROM c2ec.withdrawal) ORDER BY confirmed_row_id DESC LIMIT $1"
    131 
    132 const PS_GET_TRANSFERS_ASC = "SELECT * FROM c2ec.transfer WHERE transferred_row_id > $1 ORDER BY transferred_row_id ASC LIMIT $2"
    133 
    134 const PS_GET_TRANSFERS_DESC = "SELECT * FROM c2ec.transfer WHERE transferred_row_id < $1 ORDER BY transferred_row_id DESC LIMIT $2"
    135 
    136 const PS_GET_TRANSFERS_ASC_MAX = "SELECT * FROM c2ec.transfer WHERE transferred_row_id > $1 ORDER BY transferred_row_id ASC LIMIT $2"
    137 
    138 const PS_GET_TRANSFERS_DESC_MAX = "SELECT * FROM c2ec.transfer WHERE transferred_row_id < (SELECT MAX(transferred_row_id) FROM c2ec.transfer) ORDER BY transferred_row_id DESC LIMIT $1"
    139 
    140 const PS_GET_TRANSFERS_BY_STATUS = "SELECT * FROM " + internal_db.TRANSFER_TABLE_NAME +
    141 	" WHERE " + internal_db.TRANSFER_FIELD_NAME_STATUS + "=$1"
    142 
// C2ECPostgres is the Postgres implementation of the C2ECDatabase.
// It issues its statements through a pgx connection pool.
type C2ECPostgres struct {
	// Embedded C2ECDatabase from the public db package.
	public_db.C2ECDatabase

	// ctx is the context handed to the pool calls made by the methods
	// in this file.
	ctx  context.Context
	// pool is the pgx connection pool the statements are executed on.
	pool *pgxpool.Pool
}
    150 
    151 func PostgresConnectionString(cfg *config.C2ECDatabseConfig) string {
    152 
    153 	if cfg.ConnectionString != "" {
    154 		return cfg.ConnectionString
    155 	}
    156 
    157 	pgHost := os.Getenv("PGHOST")
    158 	if pgHost != "" {
    159 		internal_utils.LogInfo("postgres", "pghost was set")
    160 	} else {
    161 		pgHost = cfg.Host
    162 	}
    163 
    164 	pgPort := os.Getenv("PGPORT")
    165 	if pgPort != "" {
    166 		internal_utils.LogInfo("postgres", "pgport was set")
    167 	} else {
    168 		pgPort = strconv.Itoa(cfg.Port)
    169 	}
    170 
    171 	pgUsername := os.Getenv("PGUSER")
    172 	if pgUsername != "" {
    173 		internal_utils.LogInfo("postgres", "pghost was set")
    174 	} else {
    175 		pgUsername = cfg.Username
    176 	}
    177 
    178 	pgPassword := os.Getenv("PGPASSWORD")
    179 	if pgPassword != "" {
    180 		internal_utils.LogInfo("postgres", "pghost was set")
    181 	} else {
    182 		pgPassword = cfg.Password
    183 	}
    184 
    185 	pgDb := os.Getenv("PGDATABASE")
    186 	if pgDb != "" {
    187 		internal_utils.LogInfo("postgres", "pghost was set")
    188 	} else {
    189 		pgDb = cfg.Database
    190 	}
    191 
    192 	return fmt.Sprintf(
    193 		"postgres://%s:%s@%s:%s/%s",
    194 		pgUsername,
    195 		pgPassword,
    196 		pgHost,
    197 		pgPort,
    198 		pgDb,
    199 	)
    200 }
    201 
    202 func NewC2ECPostgres(cfg *config.C2ECDatabseConfig) (*C2ECPostgres, error) {
    203 
    204 	ctx := context.Background()
    205 	db := new(C2ECPostgres)
    206 
    207 	connectionString := PostgresConnectionString(cfg)
    208 
    209 	dbConnCfg, err := pgxpool.ParseConfig(connectionString)
    210 	if err != nil {
    211 		panic(err.Error())
    212 	}
    213 	dbConnCfg.AfterConnect = db.registerCustomTypesHook
    214 	db.pool, err = pgxpool.NewWithConfig(context.Background(), dbConnCfg)
    215 	if err != nil {
    216 		panic(err.Error())
    217 	}
    218 
    219 	db.ctx = ctx
    220 
    221 	return db, nil
    222 }
    223 
    224 func (db *C2ECPostgres) registerCustomTypesHook(ctx context.Context, conn *pgx.Conn) error {
    225 
    226 	t, err := conn.LoadType(ctx, "c2ec.taler_amount_currency")
    227 	if err != nil {
    228 		return err
    229 	}
    230 
    231 	conn.TypeMap().RegisterType(t)
    232 	return nil
    233 }
    234 
    235 func (db *C2ECPostgres) SetupWithdrawal(
    236 	wopid []byte,
    237 	suggestedAmount internal_utils.Amount,
    238 	amount internal_utils.Amount,
    239 	terminalId int,
    240 	providerTransactionId string,
    241 	terminalFees internal_utils.Amount,
    242 	requestUid string,
    243 ) error {
    244 
    245 	ts := time.Now()
    246 	res, err := db.pool.Exec(
    247 		db.ctx,
    248 		PS_INSERT_WITHDRAWAL,
    249 		wopid,
    250 		requestUid,
    251 		suggestedAmount.Value,
    252 		suggestedAmount.Fraction,
    253 		suggestedAmount.Currency,
    254 		amount.Value,
    255 		amount.Fraction,
    256 		amount.Currency,
    257 		providerTransactionId,
    258 		terminalFees.Value,
    259 		terminalFees.Fraction,
    260 		terminalFees.Currency,
    261 		ts.Unix(),
    262 		terminalId,
    263 	)
    264 	if err != nil {
    265 		internal_utils.LogError("postgres", err)
    266 		return err
    267 	}
    268 	internal_utils.LogInfo("postgres", "query="+PS_INSERT_WITHDRAWAL)
    269 	internal_utils.LogInfo("postgres", "setup withdrawal successfully. affected rows="+strconv.Itoa(int(res.RowsAffected())))
    270 	return nil
    271 }
    272 
    273 func (db *C2ECPostgres) RegisterWithdrawalParameters(
    274 	wopid []byte,
    275 	resPubKey internal_utils.EddsaPublicKey,
    276 ) error {
    277 
    278 	resPubKeyBytes, err := internal_utils.ParseEddsaPubKey(resPubKey)
    279 	if err != nil {
    280 		return err
    281 	}
    282 
    283 	ts := time.Now()
    284 	res, err := db.pool.Exec(
    285 		db.ctx,
    286 		PS_REGISTER_WITHDRAWAL_PARAMS,
    287 		resPubKeyBytes,
    288 		internal_utils.SELECTED,
    289 		ts.Unix(),
    290 		wopid,
    291 	)
    292 	if err != nil {
    293 		internal_utils.LogError("postgres", err)
    294 		return err
    295 	}
    296 	internal_utils.LogInfo("postgres", "query="+PS_REGISTER_WITHDRAWAL_PARAMS)
    297 	internal_utils.LogInfo("postgres", "registered withdrawal successfully. affected rows="+strconv.Itoa(int(res.RowsAffected())))
    298 	return nil
    299 }
    300 
    301 func (db *C2ECPostgres) GetWithdrawalByRequestUid(requestUid string) (*db.Withdrawal, error) {
    302 
    303 	if row, err := db.pool.Query(
    304 		db.ctx,
    305 		PS_GET_WITHDRAWAL_BY_RUID,
    306 		requestUid,
    307 	); err != nil {
    308 		internal_utils.LogError("postgres", err)
    309 		if row != nil {
    310 			row.Close()
    311 		}
    312 		return nil, err
    313 	} else {
    314 		defer row.Close()
    315 		internal_utils.LogInfo("postgres", "query="+PS_GET_WITHDRAWAL_BY_RUID)
    316 		collected, err := pgx.CollectOneRow(row, pgx.RowToAddrOfStructByName[public_db.Withdrawal])
    317 		if err != nil {
    318 			if errors.Is(err, pgx.ErrNoRows) {
    319 				return nil, nil
    320 			}
    321 			return nil, err
    322 		}
    323 		return collected, nil
    324 	}
    325 }
    326 
    327 func (db *C2ECPostgres) GetWithdrawalById(withdrawalId int) (*db.Withdrawal, error) {
    328 
    329 	if row, err := db.pool.Query(
    330 		db.ctx,
    331 		PS_GET_WITHDRAWAL_BY_ID,
    332 		withdrawalId,
    333 	); err != nil {
    334 		internal_utils.LogError("postgres", err)
    335 		if row != nil {
    336 			row.Close()
    337 		}
    338 		return nil, err
    339 	} else {
    340 
    341 		defer row.Close()
    342 		internal_utils.LogInfo("postgres", "query="+PS_GET_WITHDRAWAL_BY_ID)
    343 		return pgx.CollectExactlyOneRow(row, pgx.RowToAddrOfStructByName[public_db.Withdrawal])
    344 	}
    345 }
    346 
    347 func (db *C2ECPostgres) GetWithdrawalByWopid(wopid []byte) (*public_db.Withdrawal, error) {
    348 
    349 	if row, err := db.pool.Query(
    350 		db.ctx,
    351 		PS_GET_WITHDRAWAL_BY_WOPID,
    352 		wopid,
    353 	); err != nil {
    354 		internal_utils.LogError("postgres", err)
    355 		if row != nil {
    356 			row.Close()
    357 		}
    358 		return nil, err
    359 	} else {
    360 
    361 		defer row.Close()
    362 		internal_utils.LogInfo("postgres", "query="+PS_GET_WITHDRAWAL_BY_WOPID)
    363 		return pgx.CollectExactlyOneRow(row, pgx.RowToAddrOfStructByName[public_db.Withdrawal])
    364 	}
    365 }
    366 
    367 func (db *C2ECPostgres) GetWithdrawalByProviderTransactionId(tid string) (*public_db.Withdrawal, error) {
    368 	if row, err := db.pool.Query(
    369 		db.ctx,
    370 		PS_GET_WITHDRAWAL_BY_PTID,
    371 		tid,
    372 	); err != nil {
    373 		internal_utils.LogInfo("postgres", "failed query="+PS_GET_WITHDRAWAL_BY_PTID)
    374 		internal_utils.LogError("postgres", err)
    375 		if row != nil {
    376 			row.Close()
    377 		}
    378 		return nil, err
    379 	} else {
    380 
    381 		defer row.Close()
    382 		internal_utils.LogInfo("postgres", "query="+PS_GET_WITHDRAWAL_BY_PTID)
    383 		return pgx.CollectExactlyOneRow(row, pgx.RowToAddrOfStructByName[public_db.Withdrawal])
    384 	}
    385 }
    386 
    387 func (db *C2ECPostgres) NotifyPayment(
    388 	wopid []byte,
    389 	providerTransactionId string,
    390 	terminalId int,
    391 	fees internal_utils.Amount,
    392 ) error {
    393 
    394 	res, err := db.pool.Exec(
    395 		db.ctx,
    396 		PS_PAYMENT_NOTIFICATION,
    397 		fees.Value,
    398 		fees.Fraction,
    399 		fees.Currency,
    400 		providerTransactionId,
    401 		terminalId,
    402 		wopid,
    403 	)
    404 	if err != nil {
    405 		internal_utils.LogError("postgres", err)
    406 		return err
    407 	}
    408 	internal_utils.LogInfo("postgres", "query="+PS_PAYMENT_NOTIFICATION+", affected rows="+strconv.Itoa(int(res.RowsAffected())))
    409 	return nil
    410 }
    411 
    412 func (db *C2ECPostgres) GetWithdrawalsForConfirmation() ([]*public_db.Withdrawal, error) {
    413 
    414 	if row, err := db.pool.Query(
    415 		db.ctx,
    416 		PS_GET_UNCONFIRMED_WITHDRAWALS,
    417 	); err != nil {
    418 		internal_utils.LogError("postgres", err)
    419 		if row != nil {
    420 			row.Close()
    421 		}
    422 		return nil, err
    423 	} else {
    424 
    425 		defer row.Close()
    426 
    427 		withdrawals, err := pgx.CollectRows(row, pgx.RowToAddrOfStructByName[public_db.Withdrawal])
    428 		if err != nil {
    429 			internal_utils.LogError("postgres", err)
    430 			return nil, err
    431 		}
    432 
    433 		// potentially fills the logs
    434 		// internal_utils.LogInfo("postgres", "query="+PS_GET_UNCONFIRMED_WITHDRAWALS)
    435 		return internal_utils.RemoveNulls(withdrawals), nil
    436 	}
    437 }
    438 
    439 func (db *C2ECPostgres) FinaliseWithdrawal(
    440 	withdrawalId int,
    441 	confirmOrAbort internal_utils.WithdrawalOperationStatus,
    442 	completionProof []byte,
    443 ) error {
    444 
    445 	if confirmOrAbort != internal_utils.CONFIRMED && confirmOrAbort != internal_utils.ABORTED {
    446 		return errors.New("can only finalise payment when new status is either confirmed or aborted")
    447 	}
    448 
    449 	query := PS_FINALISE_PAYMENT
    450 	if withdrawalId <= 1 {
    451 		// tweak to intially set confirmed_row_id. Can be removed once confirmed_row_id field is obsolete
    452 		query = "UPDATE c2ec.withdrawal SET (withdrawal_status,completion_proof,confirmed_row_id) = ($1,$2,1) WHERE withdrawal_row_id=$3"
    453 	}
    454 
    455 	_, err := db.pool.Exec(
    456 		db.ctx,
    457 		query,
    458 		confirmOrAbort,
    459 		completionProof,
    460 		withdrawalId,
    461 	)
    462 	if err != nil {
    463 		internal_utils.LogError("postgres", err)
    464 		return err
    465 	}
    466 	internal_utils.LogInfo("postgres", "query="+query)
    467 	return nil
    468 }
    469 
    470 func (db *C2ECPostgres) SetLastRetry(withdrawalId int, lastRetryTsUnix int64) error {
    471 
    472 	_, err := db.pool.Exec(
    473 		db.ctx,
    474 		PS_SET_LAST_RETRY,
    475 		lastRetryTsUnix,
    476 		withdrawalId,
    477 	)
    478 	if err != nil {
    479 		internal_utils.LogError("postgres", err)
    480 		return err
    481 	}
    482 	internal_utils.LogInfo("postgres", "query="+PS_SET_LAST_RETRY)
    483 	return nil
    484 }
    485 
    486 func (db *C2ECPostgres) SetRetryCounter(withdrawalId int, retryCounter int) error {
    487 
    488 	_, err := db.pool.Exec(
    489 		db.ctx,
    490 		PS_SET_RETRY_COUNTER,
    491 		retryCounter,
    492 		withdrawalId,
    493 	)
    494 	if err != nil {
    495 		internal_utils.LogError("postgres", err)
    496 		return err
    497 	}
    498 	internal_utils.LogInfo("postgres", "query="+PS_SET_RETRY_COUNTER)
    499 	return nil
    500 }
    501 
    502 // The query at the postgres database works as specified by the
    503 // wire gateway api.
    504 func (db *C2ECPostgres) GetConfirmedWithdrawals(start int, delta int, since time.Time) ([]*public_db.Withdrawal, error) {
    505 
    506 	// +d / +s
    507 	query := PS_CONFIRMED_TRANSACTIONS_ASC
    508 	if delta < 0 {
    509 		// d negatives indicates DESC ordering and backwards reading
    510 		// -d / +s
    511 		query = PS_CONFIRMED_TRANSACTIONS_DESC
    512 		if start < 0 {
    513 			// start negative indicates not explicitly given
    514 			// since -d is the case here we try reading the latest entries
    515 			// -d / -s
    516 			query = PS_CONFIRMED_TRANSACTIONS_DESC_MAX
    517 		}
    518 	} else {
    519 		if start < 0 {
    520 			// +d / -s
    521 			query = PS_CONFIRMED_TRANSACTIONS_ASC_MAX
    522 		}
    523 	}
    524 
    525 	limit := int(math.Abs(float64(delta)))
    526 	offset := start
    527 	if offset < 0 {
    528 		offset = 0
    529 	}
    530 
    531 	if start < 0 {
    532 		start = 0
    533 	}
    534 
    535 	internal_utils.LogInfo("postgres", fmt.Sprintf("selected query=%s (\nparameters:\n delta=%d,\n start=%d, limit=%d,\n offset=%d,\n since=%d\n)", query, delta, start, limit, offset, since.Unix()))
    536 
    537 	var row pgx.Rows
    538 	var err error
    539 
    540 	if strings.Count(query, "$") == 1 {
    541 		row, err = db.pool.Query(
    542 			db.ctx,
    543 			query,
    544 			limit,
    545 		)
    546 	} else {
    547 		row, err = db.pool.Query(
    548 			db.ctx,
    549 			query,
    550 			offset,
    551 			limit,
    552 		)
    553 	}
    554 
    555 	internal_utils.LogInfo("postgres", "query="+query)
    556 	if err != nil {
    557 		internal_utils.LogError("postgres", err)
    558 		if row != nil {
    559 			row.Close()
    560 		}
    561 		return nil, err
    562 	} else {
    563 
    564 		defer row.Close()
    565 
    566 		withdrawals, err := pgx.CollectRows(row, pgx.RowToAddrOfStructByName[public_db.Withdrawal])
    567 		if err != nil {
    568 			internal_utils.LogError("postgres", err)
    569 			return nil, err
    570 		}
    571 
    572 		return internal_utils.RemoveNulls(withdrawals), nil
    573 	}
    574 }
    575 
    576 func (db *C2ECPostgres) GetProviderByTerminal(terminalId int) (*public_db.Provider, error) {
    577 
    578 	if row, err := db.pool.Query(
    579 		db.ctx,
    580 		PS_GET_PROVIDER_BY_TERMINAL,
    581 		terminalId,
    582 	); err != nil {
    583 		internal_utils.LogWarn("postgres", "failed query="+PS_GET_PROVIDER_BY_TERMINAL)
    584 		internal_utils.LogError("postgres", err)
    585 		if row != nil {
    586 			row.Close()
    587 		}
    588 		return nil, err
    589 	} else {
    590 
    591 		defer row.Close()
    592 
    593 		provider, err := pgx.CollectExactlyOneRow(row, pgx.RowToAddrOfStructByName[public_db.Provider])
    594 		if err != nil {
    595 			internal_utils.LogError("postgres", err)
    596 			return nil, err
    597 		}
    598 
    599 		internal_utils.LogInfo("postgres", "query="+PS_GET_PROVIDER_BY_TERMINAL)
    600 		return provider, nil
    601 	}
    602 }
    603 
    604 func (db *C2ECPostgres) GetTerminalProviderByName(name string) (*public_db.Provider, error) {
    605 
    606 	if row, err := db.pool.Query(
    607 		db.ctx,
    608 		PS_GET_PROVIDER_BY_NAME,
    609 		name,
    610 	); err != nil {
    611 		internal_utils.LogWarn("postgres", "failed query="+PS_GET_PROVIDER_BY_NAME)
    612 		internal_utils.LogError("postgres", err)
    613 		if row != nil {
    614 			row.Close()
    615 		}
    616 		return nil, err
    617 	} else {
    618 
    619 		defer row.Close()
    620 
    621 		provider, err := pgx.CollectExactlyOneRow(row, pgx.RowToAddrOfStructByName[public_db.Provider])
    622 		if err != nil {
    623 			internal_utils.LogWarn("postgres", "failed query="+PS_GET_PROVIDER_BY_NAME)
    624 			internal_utils.LogError("postgres", err)
    625 			return nil, err
    626 		}
    627 
    628 		internal_utils.LogInfo("postgres", "query="+PS_GET_PROVIDER_BY_NAME)
    629 		return provider, nil
    630 	}
    631 }
    632 
    633 func (db *C2ECPostgres) GetTerminalProviderByPaytoTargetType(paytoTargetType string) (*public_db.Provider, error) {
    634 
    635 	internal_utils.LogInfo("postgres", "loading provider for payto-target-type="+paytoTargetType)
    636 	if row, err := db.pool.Query(
    637 		db.ctx,
    638 		PS_GET_PROVIDER_BY_PAYTO_TARGET_TYPE,
    639 		paytoTargetType,
    640 	); err != nil {
    641 		internal_utils.LogWarn("postgres", "failed query="+PS_GET_PROVIDER_BY_PAYTO_TARGET_TYPE)
    642 		internal_utils.LogError("postgres", err)
    643 		if row != nil {
    644 			row.Close()
    645 		}
    646 		return nil, err
    647 	} else {
    648 
    649 		defer row.Close()
    650 
    651 		provider, err := pgx.CollectExactlyOneRow(row, pgx.RowToAddrOfStructByName[public_db.Provider])
    652 		if err != nil {
    653 			internal_utils.LogWarn("postgres", "failed query="+PS_GET_PROVIDER_BY_PAYTO_TARGET_TYPE)
    654 			internal_utils.LogError("postgres", err)
    655 			return nil, err
    656 		}
    657 
    658 		internal_utils.LogInfo("postgres", "query="+PS_GET_PROVIDER_BY_PAYTO_TARGET_TYPE)
    659 		return provider, nil
    660 	}
    661 }
    662 
    663 func (db *C2ECPostgres) GetTerminalById(id int) (*public_db.Terminal, error) {
    664 
    665 	if row, err := db.pool.Query(
    666 		db.ctx,
    667 		PS_GET_TERMINAL_BY_ID,
    668 		id,
    669 	); err != nil {
    670 		internal_utils.LogWarn("postgres", "failed query="+PS_GET_TERMINAL_BY_ID)
    671 		internal_utils.LogError("postgres", err)
    672 		if row != nil {
    673 			row.Close()
    674 		}
    675 		return nil, err
    676 	} else {
    677 
    678 		defer row.Close()
    679 
    680 		terminal, err := pgx.CollectExactlyOneRow(row, pgx.RowToAddrOfStructByName[public_db.Terminal])
    681 		if err != nil {
    682 			internal_utils.LogWarn("postgres", "failed query="+PS_GET_TERMINAL_BY_ID)
    683 			internal_utils.LogError("postgres", err)
    684 			return nil, err
    685 		}
    686 
    687 		internal_utils.LogInfo("postgres", "query="+PS_GET_TERMINAL_BY_ID)
    688 		return terminal, nil
    689 	}
    690 }
    691 
    692 func (db *C2ECPostgres) GetTransferById(requestUid []byte) (*public_db.Transfer, error) {
    693 
    694 	if rows, err := db.pool.Query(
    695 		db.ctx,
    696 		PS_GET_TRANSFER_BY_ID,
    697 		requestUid,
    698 	); err != nil {
    699 		internal_utils.LogWarn("postgres", "failed query="+PS_GET_TRANSFER_BY_ID)
    700 		internal_utils.LogError("postgres", err)
    701 		if rows != nil {
    702 			rows.Close()
    703 		}
    704 		return nil, err
    705 	} else {
    706 
    707 		defer rows.Close()
    708 
    709 		transfer, err := pgx.CollectOneRow(rows, pgx.RowToAddrOfStructByName[public_db.Transfer])
    710 		if err != nil {
    711 			if errors.Is(err, pgx.ErrNoRows) {
    712 				return nil, nil
    713 			}
    714 			internal_utils.LogError("postgres", err)
    715 			return nil, err
    716 		}
    717 
    718 		internal_utils.LogInfo("postgres", "query="+PS_GET_TRANSFER_BY_ID)
    719 		return transfer, nil
    720 	}
    721 }
    722 
    723 func (db *C2ECPostgres) GetTransfersByCreditAccount(creditAccount string) ([]*public_db.Transfer, error) {
    724 
    725 	if rows, err := db.pool.Query(
    726 		db.ctx,
    727 		PS_GET_TRANSFER_BY_CREDIT_ACCOUNT,
    728 		creditAccount,
    729 	); err != nil {
    730 		internal_utils.LogWarn("postgres", "failed query="+PS_GET_TRANSFER_BY_CREDIT_ACCOUNT)
    731 		internal_utils.LogError("postgres", err)
    732 		if rows != nil {
    733 			rows.Close()
    734 		}
    735 		return nil, err
    736 	} else {
    737 
    738 		defer rows.Close()
    739 
    740 		transfers, err := pgx.CollectRows(rows, pgx.RowToAddrOfStructByName[public_db.Transfer])
    741 		if err != nil {
    742 			if errors.Is(err, pgx.ErrNoRows) {
    743 				return make([]*public_db.Transfer, 0), nil
    744 			}
    745 			internal_utils.LogError("postgres", err)
    746 			return nil, err
    747 		}
    748 
    749 		internal_utils.LogInfo("postgres", "query="+PS_GET_TRANSFER_BY_CREDIT_ACCOUNT)
    750 		return internal_utils.RemoveNulls(transfers), nil
    751 	}
    752 }
    753 
    754 func (db *C2ECPostgres) AddTransfer(
    755 	requestUid []byte,
    756 	amount *internal_utils.Amount,
    757 	exchangeBaseUrl string,
    758 	wtid string,
    759 	credit_account string,
    760 	ts time.Time,
    761 ) error {
    762 
    763 	dbAmount := internal_utils.TalerAmountCurrency{
    764 		Val:  int64(amount.Value),
    765 		Frac: int32(amount.Fraction),
    766 		Curr: amount.Currency,
    767 	}
    768 
    769 	_, err := db.pool.Exec(
    770 		db.ctx,
    771 		PS_ADD_TRANSFER,
    772 		requestUid,
    773 		dbAmount,
    774 		exchangeBaseUrl,
    775 		wtid,
    776 		credit_account,
    777 		ts.Unix(),
    778 	)
    779 	if err != nil {
    780 		internal_utils.LogInfo("postgres", "failed query="+PS_ADD_TRANSFER)
    781 		internal_utils.LogError("postgres", err)
    782 		return err
    783 	}
    784 	internal_utils.LogInfo("postgres", "query="+PS_ADD_TRANSFER)
    785 	return nil
    786 }
    787 
    788 func (db *C2ECPostgres) UpdateTransfer(
    789 	rowId int,
    790 	requestUid []byte,
    791 	timestamp int64,
    792 	status int16,
    793 	retries int16,
    794 ) error {
    795 
    796 	query := PS_UPDATE_TRANSFER
    797 	if rowId <= 1 {
    798 		// tweak to intially set transferred_row_id. Can be removed once transferred_row_id field is obsolete
    799 		query = "UPDATE c2ec.transfer SET (transfer_ts, transfer_status, retries, transferred_row_id) = ($1,$2,$3,1) WHERE request_uid=$4"
    800 	}
    801 
    802 	_, err := db.pool.Exec(
    803 		db.ctx,
    804 		query,
    805 		timestamp,
    806 		status,
    807 		retries,
    808 		requestUid,
    809 	)
    810 	if err != nil {
    811 		internal_utils.LogInfo("postgres", "failed query="+query)
    812 		internal_utils.LogError("postgres", err)
    813 		return err
    814 	}
    815 	internal_utils.LogInfo("postgres", "query="+query)
    816 	return nil
    817 }
    818 
    819 func (db *C2ECPostgres) GetTransfers(start int, delta int, since time.Time) ([]*public_db.Transfer, error) {
    820 
    821 	// +d / +s
    822 	query := PS_GET_TRANSFERS_ASC
    823 	if delta < 0 {
    824 		// d negatives indicates DESC ordering and backwards reading
    825 		// -d / +s
    826 		query = PS_GET_TRANSFERS_DESC
    827 		if start < 0 {
    828 			// start negative indicates not explicitly given
    829 			// since -d is the case here we try reading the latest entries
    830 			// -d / -s
    831 			query = PS_GET_TRANSFERS_DESC_MAX
    832 		}
    833 	} else {
    834 		if start < 0 {
    835 			// +d / -s
    836 			query = PS_GET_TRANSFERS_ASC_MAX
    837 		}
    838 	}
    839 
    840 	limit := int(math.Abs(float64(delta)))
    841 	offset := start
    842 	if offset < 0 {
    843 		offset = 0
    844 	}
    845 
    846 	if start < 0 {
    847 		start = 0
    848 	}
    849 
    850 	internal_utils.LogInfo("postgres", fmt.Sprintf("selected query=%s (\nparameters:\n delta=%d,\n start=%d, limit=%d,\n offset=%d,\n since=%d\n)", query, delta, start, limit, offset, since.Unix()))
    851 
    852 	var row pgx.Rows
    853 	var err error
    854 
    855 	if strings.Count(query, "$") == 1 {
    856 		row, err = db.pool.Query(
    857 			db.ctx,
    858 			query,
    859 			limit,
    860 		)
    861 	} else {
    862 		row, err = db.pool.Query(
    863 			db.ctx,
    864 			query,
    865 			offset,
    866 			limit,
    867 		)
    868 	}
    869 
    870 	if err != nil {
    871 		internal_utils.LogWarn("postgres", "failed query="+query)
    872 		internal_utils.LogError("postgres", err)
    873 		if row != nil {
    874 			row.Close()
    875 		}
    876 		return nil, err
    877 	} else {
    878 
    879 		defer row.Close()
    880 
    881 		transfers, err := pgx.CollectRows(row, pgx.RowToAddrOfStructByName[public_db.Transfer])
    882 		if err != nil {
    883 			internal_utils.LogWarn("postgres", "failed query="+query)
    884 			internal_utils.LogError("postgres", err)
    885 			return nil, err
    886 		}
    887 
    888 		return internal_utils.RemoveNulls(transfers), nil
    889 	}
    890 }
    891 
    892 func (db *C2ECPostgres) GetTransfersByState(status int) ([]*public_db.Transfer, error) {
    893 
    894 	if rows, err := db.pool.Query(
    895 		db.ctx,
    896 		PS_GET_TRANSFERS_BY_STATUS,
    897 		status,
    898 	); err != nil {
    899 		internal_utils.LogError("postgres", err)
    900 		if rows != nil {
    901 			rows.Close()
    902 		}
    903 		return nil, err
    904 	} else {
    905 
    906 		defer rows.Close()
    907 
    908 		transfers, err := pgx.CollectRows(rows, pgx.RowToAddrOfStructByName[public_db.Transfer])
    909 		if err != nil {
    910 			internal_utils.LogWarn("postgres", "failed query="+PS_GET_TRANSFERS_BY_STATUS)
    911 			internal_utils.LogError("postgres", err)
    912 			return nil, err
    913 		}
    914 
    915 		// this will fill up the logs...
    916 		// internal_utils.LogInfo("postgres", "query="+PS_GET_TRANSFERS_BY_STATUS)
    917 		// internal_utils.LogInfo("postgres", "size of transfer list="+strconv.Itoa(len(transfers)))
    918 		return internal_utils.RemoveNulls(transfers), nil
    919 	}
    920 }
    921 
    922 // Sets up a a listener for the given channel.
    923 // Notifications will be sent through the out channel.
    924 func (db *C2ECPostgres) NewListener(
    925 	cn string,
    926 	out chan *public_db.Notification,
    927 ) (func(context.Context) error, error) {
    928 
    929 	connectionString := PostgresConnectionString(&config.CONFIG.Database)
    930 	cfg, err := pgx.ParseConfig(connectionString)
    931 	if err != nil {
    932 		return nil, err
    933 	}
    934 
    935 	listener := &pgxlisten.Listener{
    936 		Connect: func(ctx context.Context) (*pgx.Conn, error) {
    937 			internal_utils.LogInfo("postgres", "listener connecting to the database")
    938 			return pgx.ConnectConfig(ctx, cfg)
    939 		},
    940 	}
    941 
    942 	internal_utils.LogInfo("postgres", "handling notifications on channel="+cn)
    943 	listener.Handle(cn, pgxlisten.HandlerFunc(func(ctx context.Context, notification *pgconn.Notification, conn *pgx.Conn) error {
    944 		internal_utils.LogInfo("postgres", fmt.Sprintf("handling postgres notification. channel=%s", notification.Channel))
    945 		out <- &public_db.Notification{
    946 			Channel: notification.Channel,
    947 			Payload: notification.Payload,
    948 		}
    949 		return nil
    950 	}))
    951 
    952 	return listener.Listen, nil
    953 }