db-postgres.go (27209B)
1 // This file is part of taler-cashless2ecash. 2 // Copyright (C) 2024 Joel Häberli 3 // 4 // taler-cashless2ecash is free software: you can redistribute it and/or modify it 5 // under the terms of the GNU Affero General Public License as published 6 // by the Free Software Foundation, either version 3 of the License, 7 // or (at your option) any later version. 8 // 9 // taler-cashless2ecash is distributed in the hope that it will be useful, but 10 // WITHOUT ANY WARRANTY; without even the implied warranty of 11 // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU 12 // Affero General Public License for more details. 13 // 14 // You should have received a copy of the GNU Affero General Public License 15 // along with this program. If not, see <http://www.gnu.org/licenses/>. 16 // 17 // SPDX-License-Identifier: AGPL3.0-or-later 18 19 package internal_postgres 20 21 import ( 22 internal_db "c2ec/internal/db" 23 internal_utils "c2ec/internal/utils" 24 "c2ec/pkg/config" 25 "c2ec/pkg/db" 26 public_db "c2ec/pkg/db" 27 "context" 28 "errors" 29 "fmt" 30 "math" 31 "os" 32 "strconv" 33 "strings" 34 "time" 35 36 "github.com/jackc/pgx/v5" 37 "github.com/jackc/pgx/v5/pgconn" 38 "github.com/jackc/pgx/v5/pgxpool" 39 "github.com/jackc/pgxlisten" 40 ) 41 42 const PS_INSERT_WITHDRAWAL = "INSERT INTO " + internal_db.WITHDRAWAL_TABLE_NAME + " (" + 43 internal_db.WITHDRAWAL_FIELD_NAME_WOPID + ", " + internal_db.WITHDRAWAL_FIELD_NAME_RUID + ", " + 44 internal_db.WITHDRAWAL_FIELD_NAME_SUGGESTED_AMOUNT + ", " + internal_db.WITHDRAWAL_FIELD_NAME_AMOUNT + ", " + 45 internal_db.WITHDRAWAL_FIELD_NAME_TRANSACTION_ID + ", " + internal_db.WITHDRAWAL_FIELD_NAME_FEES + ", " + 46 internal_db.WITHDRAWAL_FIELD_NAME_TS + ", " + internal_db.WITHDRAWAL_FIELD_NAME_TERMINAL_ID + 47 ") VALUES ($1,$2,($3,$4,$5),($6,$7,$8),$9,($10,$11,$12),$13,$14)" 48 49 const PS_REGISTER_WITHDRAWAL_PARAMS = "UPDATE " + internal_db.WITHDRAWAL_TABLE_NAME + " SET (" + 50 internal_db.WITHDRAWAL_FIELD_NAME_RESPUBKEY + "," + 51 internal_db.WITHDRAWAL_FIELD_NAME_STATUS + "," + 52 internal_db.WITHDRAWAL_FIELD_NAME_TS + ")" + 53 " = ($1,$2,$3)" + 54 " WHERE " + internal_db.WITHDRAWAL_FIELD_NAME_WOPID + "=$4" 55 56 const PS_GET_UNCONFIRMED_WITHDRAWALS = "SELECT * FROM " + internal_db.WITHDRAWAL_TABLE_NAME + 57 " WHERE " + internal_db.WITHDRAWAL_FIELD_NAME_STATUS + " = '" + string(internal_utils.SELECTED) + "'" 58 59 const PS_PAYMENT_NOTIFICATION = "UPDATE " + internal_db.WITHDRAWAL_TABLE_NAME + " SET (" + 60 internal_db.WITHDRAWAL_FIELD_NAME_FEES + "," + internal_db.WITHDRAWAL_FIELD_NAME_TRANSACTION_ID + "," + 61 internal_db.WITHDRAWAL_FIELD_NAME_TERMINAL_ID + ")" + 62 " = (($1,$2,$3),$4,$5)" + 63 " WHERE " + internal_db.WITHDRAWAL_FIELD_NAME_WOPID + "=$6" 64 65 const PS_FINALISE_PAYMENT = "UPDATE " + internal_db.WITHDRAWAL_TABLE_NAME + " SET (" + 66 internal_db.WITHDRAWAL_FIELD_NAME_STATUS + "," + 67 internal_db.WITHDRAWAL_FIELD_NAME_COMPLETION_PROOF + "," + 68 internal_db.WITHDRAWAL_FIELD_NAME_CONFIRMED_ROW_ID + ")" + 69 " = ($1, $2, (SELECT ((SELECT MAX(" + internal_db.WITHDRAWAL_FIELD_NAME_CONFIRMED_ROW_ID + ") FROM " + internal_db.WITHDRAWAL_TABLE_NAME + " WHERE " + internal_db.WITHDRAWAL_FIELD_NAME_STATUS + "='" + string(internal_utils.CONFIRMED) + "')+1)))" + 70 " WHERE " + internal_db.WITHDRAWAL_FIELD_NAME_ID + "=$3" 71 72 const PS_SET_LAST_RETRY = "UPDATE " + internal_db.WITHDRAWAL_TABLE_NAME + 73 " SET " + internal_db.WITHDRAWAL_FIELD_NAME_LAST_RETRY + "=$1" + 74 " WHERE " + internal_db.WITHDRAWAL_FIELD_NAME_ID + "=$2" 75 76 const PS_SET_RETRY_COUNTER = "UPDATE " + internal_db.WITHDRAWAL_TABLE_NAME + 77 " SET " + internal_db.WITHDRAWAL_FIELD_NAME_RETRY_COUNTER + "=$1" + 78 " WHERE " + internal_db.WITHDRAWAL_FIELD_NAME_ID + "=$2" 79 80 const PS_GET_WITHDRAWAL_BY_RUID = "SELECT * FROM " + internal_db.WITHDRAWAL_TABLE_NAME + 81 " WHERE " + internal_db.WITHDRAWAL_FIELD_NAME_RUID + "=$1" 82 83 const PS_GET_WITHDRAWAL_BY_ID = "SELECT * FROM " + internal_db.WITHDRAWAL_TABLE_NAME + 84 " WHERE " + internal_db.WITHDRAWAL_FIELD_NAME_ID + "=$1" 85 86 const PS_GET_WITHDRAWAL_BY_WOPID = "SELECT * FROM " + internal_db.WITHDRAWAL_TABLE_NAME + 87 " WHERE " + internal_db.WITHDRAWAL_FIELD_NAME_WOPID + "=$1" 88 89 const PS_GET_WITHDRAWAL_BY_PTID = "SELECT * FROM " + internal_db.WITHDRAWAL_TABLE_NAME + 90 " WHERE " + internal_db.WITHDRAWAL_FIELD_NAME_TRANSACTION_ID + "=$1" 91 92 const PS_GET_PROVIDER_BY_TERMINAL = "SELECT * FROM " + internal_db.PROVIDER_TABLE_NAME + 93 " WHERE " + internal_db.PROVIDER_FIELD_NAME_ID + 94 " = (SELECT " + internal_db.TERMINAL_FIELD_NAME_PROVIDER_ID + " FROM " + internal_db.TERMINAL_TABLE_NAME + 95 " WHERE " + internal_db.TERMINAL_FIELD_NAME_ID + "=$1)" 96 97 const PS_GET_PROVIDER_BY_NAME = "SELECT * FROM " + internal_db.PROVIDER_TABLE_NAME + 98 " WHERE " + internal_db.PROVIDER_FIELD_NAME_NAME + "=$1" 99 100 const PS_GET_PROVIDER_BY_PAYTO_TARGET_TYPE = "SELECT * FROM " + internal_db.PROVIDER_TABLE_NAME + 101 " WHERE " + internal_db.PROVIDER_FIELD_NAME_PAYTO_TARGET_TYPE + "=$1" 102 103 const PS_GET_TERMINAL_BY_ID = "SELECT * FROM " + internal_db.TERMINAL_TABLE_NAME + 104 " WHERE " + internal_db.TERMINAL_FIELD_NAME_ID + "=$1" 105 106 const PS_GET_TRANSFER_BY_ID = "SELECT * FROM " + internal_db.TRANSFER_TABLE_NAME + 107 " WHERE " + internal_db.TRANSFER_FIELD_NAME_ID + "=$1" 108 109 const PS_GET_TRANSFER_BY_CREDIT_ACCOUNT = "SELECT * FROM " + internal_db.TRANSFER_TABLE_NAME + 110 " WHERE " + internal_db.TRANSFER_FIELD_NAME_CREDIT_ACCOUNT + "=$1" 111 112 const PS_ADD_TRANSFER = "INSERT INTO " + internal_db.TRANSFER_TABLE_NAME + 113 " (" + internal_db.TRANSFER_FIELD_NAME_ID + ", " + internal_db.TRANSFER_FIELD_NAME_AMOUNT + ", " + 114 internal_db.TRANSFER_FIELD_NAME_EXCHANGE_BASE_URL + ", " + internal_db.TRANSFER_FIELD_NAME_WTID + ", " + 115 internal_db.TRANSFER_FIELD_NAME_CREDIT_ACCOUNT + ", " + internal_db.TRANSFER_FIELD_NAME_TS + 116 ") VALUES ($1,$2,$3,$4,$5,$6)" 117 118 const PS_UPDATE_TRANSFER = "UPDATE " + internal_db.TRANSFER_TABLE_NAME + " SET (" + 119 internal_db.TRANSFER_FIELD_NAME_TS + ", " + internal_db.TRANSFER_FIELD_NAME_STATUS + ", " + 120 internal_db.TRANSFER_FIELD_NAME_RETRIES + ", " + internal_db.TRANSFER_FIELD_NAME_TRANSFERRED_ROW_ID + ") = ($1,$2,$3," + 121 "(SELECT ((SELECT MAX(" + internal_db.TRANSFER_FIELD_NAME_TRANSFERRED_ROW_ID + ") FROM " + internal_db.TRANSFER_TABLE_NAME + " WHERE " + internal_db.TRANSFER_FIELD_NAME_STATUS + "=0)+1))" + 122 ") WHERE " + internal_db.TRANSFER_FIELD_NAME_ID + "=$4" 123 124 const PS_CONFIRMED_TRANSACTIONS_ASC = "SELECT * FROM c2ec.withdrawal WHERE confirmed_row_id > $1 ORDER BY confirmed_row_id ASC LIMIT $2" 125 126 const PS_CONFIRMED_TRANSACTIONS_DESC = "SELECT * FROM c2ec.withdrawal WHERE confirmed_row_id < $1 ORDER BY confirmed_row_id DESC LIMIT $2" 127 128 const PS_CONFIRMED_TRANSACTIONS_ASC_MAX = "SELECT * FROM c2ec.withdrawal WHERE confirmed_row_id > $1 ORDER BY confirmed_row_id ASC LIMIT $2" 129 130 const PS_CONFIRMED_TRANSACTIONS_DESC_MAX = "SELECT * FROM c2ec.withdrawal WHERE confirmed_row_id < (SELECT MAX(confirmed_row_id) FROM c2ec.withdrawal) ORDER BY confirmed_row_id DESC LIMIT $1" 131 132 const PS_GET_TRANSFERS_ASC = "SELECT * FROM c2ec.transfer WHERE transferred_row_id > $1 ORDER BY transferred_row_id ASC LIMIT $2" 133 134 const PS_GET_TRANSFERS_DESC = "SELECT * FROM c2ec.transfer WHERE transferred_row_id < $1 ORDER BY transferred_row_id DESC LIMIT $2" 135 136 const PS_GET_TRANSFERS_ASC_MAX = "SELECT * FROM c2ec.transfer WHERE transferred_row_id > $1 ORDER BY transferred_row_id ASC LIMIT $2" 137 138 const PS_GET_TRANSFERS_DESC_MAX = "SELECT * FROM c2ec.transfer WHERE transferred_row_id < (SELECT MAX(transferred_row_id) FROM c2ec.transfer) ORDER BY transferred_row_id DESC LIMIT $1" 139 140 const PS_GET_TRANSFERS_BY_STATUS = "SELECT * FROM " + internal_db.TRANSFER_TABLE_NAME + 141 " WHERE " + internal_db.TRANSFER_FIELD_NAME_STATUS + "=$1" 142 143 // Postgres implementation of the C2ECDatabase 144 type C2ECPostgres struct { 145 public_db.C2ECDatabase 146 147 ctx context.Context 148 pool *pgxpool.Pool 149 } 150 151 func PostgresConnectionString(cfg *config.C2ECDatabseConfig) string { 152 153 if cfg.ConnectionString != "" { 154 return cfg.ConnectionString 155 } 156 157 pgHost := os.Getenv("PGHOST") 158 if pgHost != "" { 159 internal_utils.LogInfo("postgres", "pghost was set") 160 } else { 161 pgHost = cfg.Host 162 } 163 164 pgPort := os.Getenv("PGPORT") 165 if pgPort != "" { 166 internal_utils.LogInfo("postgres", "pgport was set") 167 } else { 168 pgPort = strconv.Itoa(cfg.Port) 169 } 170 171 pgUsername := os.Getenv("PGUSER") 172 if pgUsername != "" { 173 internal_utils.LogInfo("postgres", "pghost was set") 174 } else { 175 pgUsername = cfg.Username 176 } 177 178 pgPassword := os.Getenv("PGPASSWORD") 179 if pgPassword != "" { 180 internal_utils.LogInfo("postgres", "pghost was set") 181 } else { 182 pgPassword = cfg.Password 183 } 184 185 pgDb := os.Getenv("PGDATABASE") 186 if pgDb != "" { 187 internal_utils.LogInfo("postgres", "pghost was set") 188 } else { 189 pgDb = cfg.Database 190 } 191 192 return fmt.Sprintf( 193 "postgres://%s:%s@%s:%s/%s", 194 pgUsername, 195 pgPassword, 196 pgHost, 197 pgPort, 198 pgDb, 199 ) 200 } 201 202 func NewC2ECPostgres(cfg *config.C2ECDatabseConfig) (*C2ECPostgres, error) { 203 204 ctx := context.Background() 205 db := new(C2ECPostgres) 206 207 connectionString := PostgresConnectionString(cfg) 208 209 dbConnCfg, err := pgxpool.ParseConfig(connectionString) 210 if err != nil { 211 panic(err.Error()) 212 } 213 dbConnCfg.AfterConnect = db.registerCustomTypesHook 214 db.pool, err = pgxpool.NewWithConfig(context.Background(), dbConnCfg) 215 if err != nil { 216 panic(err.Error()) 217 } 218 219 db.ctx = ctx 220 221 return db, nil 222 } 223 224 func (db *C2ECPostgres) registerCustomTypesHook(ctx context.Context, conn *pgx.Conn) error { 225 226 t, err := conn.LoadType(ctx, "c2ec.taler_amount_currency") 227 if err != nil { 228 return err 229 } 230 231 conn.TypeMap().RegisterType(t) 232 return nil 233 } 234 235 func (db *C2ECPostgres) SetupWithdrawal( 236 wopid []byte, 237 suggestedAmount internal_utils.Amount, 238 amount internal_utils.Amount, 239 terminalId int, 240 providerTransactionId string, 241 terminalFees internal_utils.Amount, 242 requestUid string, 243 ) error { 244 245 ts := time.Now() 246 res, err := db.pool.Exec( 247 db.ctx, 248 PS_INSERT_WITHDRAWAL, 249 wopid, 250 requestUid, 251 suggestedAmount.Value, 252 suggestedAmount.Fraction, 253 suggestedAmount.Currency, 254 amount.Value, 255 amount.Fraction, 256 amount.Currency, 257 providerTransactionId, 258 terminalFees.Value, 259 terminalFees.Fraction, 260 terminalFees.Currency, 261 ts.Unix(), 262 terminalId, 263 ) 264 if err != nil { 265 internal_utils.LogError("postgres", err) 266 return err 267 } 268 internal_utils.LogInfo("postgres", "query="+PS_INSERT_WITHDRAWAL) 269 internal_utils.LogInfo("postgres", "setup withdrawal successfully. affected rows="+strconv.Itoa(int(res.RowsAffected()))) 270 return nil 271 } 272 273 func (db *C2ECPostgres) RegisterWithdrawalParameters( 274 wopid []byte, 275 resPubKey internal_utils.EddsaPublicKey, 276 ) error { 277 278 resPubKeyBytes, err := internal_utils.ParseEddsaPubKey(resPubKey) 279 if err != nil { 280 return err 281 } 282 283 ts := time.Now() 284 res, err := db.pool.Exec( 285 db.ctx, 286 PS_REGISTER_WITHDRAWAL_PARAMS, 287 resPubKeyBytes, 288 internal_utils.SELECTED, 289 ts.Unix(), 290 wopid, 291 ) 292 if err != nil { 293 internal_utils.LogError("postgres", err) 294 return err 295 } 296 internal_utils.LogInfo("postgres", "query="+PS_REGISTER_WITHDRAWAL_PARAMS) 297 internal_utils.LogInfo("postgres", "registered withdrawal successfully. affected rows="+strconv.Itoa(int(res.RowsAffected()))) 298 return nil 299 } 300 301 func (db *C2ECPostgres) GetWithdrawalByRequestUid(requestUid string) (*db.Withdrawal, error) { 302 303 if row, err := db.pool.Query( 304 db.ctx, 305 PS_GET_WITHDRAWAL_BY_RUID, 306 requestUid, 307 ); err != nil { 308 internal_utils.LogError("postgres", err) 309 if row != nil { 310 row.Close() 311 } 312 return nil, err 313 } else { 314 defer row.Close() 315 internal_utils.LogInfo("postgres", "query="+PS_GET_WITHDRAWAL_BY_RUID) 316 collected, err := pgx.CollectOneRow(row, pgx.RowToAddrOfStructByName[public_db.Withdrawal]) 317 if err != nil { 318 if errors.Is(err, pgx.ErrNoRows) { 319 return nil, nil 320 } 321 return nil, err 322 } 323 return collected, nil 324 } 325 } 326 327 func (db *C2ECPostgres) GetWithdrawalById(withdrawalId int) (*db.Withdrawal, error) { 328 329 if row, err := db.pool.Query( 330 db.ctx, 331 PS_GET_WITHDRAWAL_BY_ID, 332 withdrawalId, 333 ); err != nil { 334 internal_utils.LogError("postgres", err) 335 if row != nil { 336 row.Close() 337 } 338 return nil, err 339 } else { 340 341 defer row.Close() 342 internal_utils.LogInfo("postgres", "query="+PS_GET_WITHDRAWAL_BY_ID) 343 return pgx.CollectExactlyOneRow(row, pgx.RowToAddrOfStructByName[public_db.Withdrawal]) 344 } 345 } 346 347 func (db *C2ECPostgres) GetWithdrawalByWopid(wopid []byte) (*public_db.Withdrawal, error) { 348 349 if row, err := db.pool.Query( 350 db.ctx, 351 PS_GET_WITHDRAWAL_BY_WOPID, 352 wopid, 353 ); err != nil { 354 internal_utils.LogError("postgres", err) 355 if row != nil { 356 row.Close() 357 } 358 return nil, err 359 } else { 360 361 defer row.Close() 362 internal_utils.LogInfo("postgres", "query="+PS_GET_WITHDRAWAL_BY_WOPID) 363 return pgx.CollectExactlyOneRow(row, pgx.RowToAddrOfStructByName[public_db.Withdrawal]) 364 } 365 } 366 367 func (db *C2ECPostgres) GetWithdrawalByProviderTransactionId(tid string) (*public_db.Withdrawal, error) { 368 if row, err := db.pool.Query( 369 db.ctx, 370 PS_GET_WITHDRAWAL_BY_PTID, 371 tid, 372 ); err != nil { 373 internal_utils.LogInfo("postgres", "failed query="+PS_GET_WITHDRAWAL_BY_PTID) 374 internal_utils.LogError("postgres", err) 375 if row != nil { 376 row.Close() 377 } 378 return nil, err 379 } else { 380 381 defer row.Close() 382 internal_utils.LogInfo("postgres", "query="+PS_GET_WITHDRAWAL_BY_PTID) 383 return pgx.CollectExactlyOneRow(row, pgx.RowToAddrOfStructByName[public_db.Withdrawal]) 384 } 385 } 386 387 func (db *C2ECPostgres) NotifyPayment( 388 wopid []byte, 389 providerTransactionId string, 390 terminalId int, 391 fees internal_utils.Amount, 392 ) error { 393 394 res, err := db.pool.Exec( 395 db.ctx, 396 PS_PAYMENT_NOTIFICATION, 397 fees.Value, 398 fees.Fraction, 399 fees.Currency, 400 providerTransactionId, 401 terminalId, 402 wopid, 403 ) 404 if err != nil { 405 internal_utils.LogError("postgres", err) 406 return err 407 } 408 internal_utils.LogInfo("postgres", "query="+PS_PAYMENT_NOTIFICATION+", affected rows="+strconv.Itoa(int(res.RowsAffected()))) 409 return nil 410 } 411 412 func (db *C2ECPostgres) GetWithdrawalsForConfirmation() ([]*public_db.Withdrawal, error) { 413 414 if row, err := db.pool.Query( 415 db.ctx, 416 PS_GET_UNCONFIRMED_WITHDRAWALS, 417 ); err != nil { 418 internal_utils.LogError("postgres", err) 419 if row != nil { 420 row.Close() 421 } 422 return nil, err 423 } else { 424 425 defer row.Close() 426 427 withdrawals, err := pgx.CollectRows(row, pgx.RowToAddrOfStructByName[public_db.Withdrawal]) 428 if err != nil { 429 internal_utils.LogError("postgres", err) 430 return nil, err 431 } 432 433 // potentially fills the logs 434 // internal_utils.LogInfo("postgres", "query="+PS_GET_UNCONFIRMED_WITHDRAWALS) 435 return internal_utils.RemoveNulls(withdrawals), nil 436 } 437 } 438 439 func (db *C2ECPostgres) FinaliseWithdrawal( 440 withdrawalId int, 441 confirmOrAbort internal_utils.WithdrawalOperationStatus, 442 completionProof []byte, 443 ) error { 444 445 if confirmOrAbort != internal_utils.CONFIRMED && confirmOrAbort != internal_utils.ABORTED { 446 return errors.New("can only finalise payment when new status is either confirmed or aborted") 447 } 448 449 query := PS_FINALISE_PAYMENT 450 if withdrawalId <= 1 { 451 // tweak to intially set confirmed_row_id. Can be removed once confirmed_row_id field is obsolete 452 query = "UPDATE c2ec.withdrawal SET (withdrawal_status,completion_proof,confirmed_row_id) = ($1,$2,1) WHERE withdrawal_row_id=$3" 453 } 454 455 _, err := db.pool.Exec( 456 db.ctx, 457 query, 458 confirmOrAbort, 459 completionProof, 460 withdrawalId, 461 ) 462 if err != nil { 463 internal_utils.LogError("postgres", err) 464 return err 465 } 466 internal_utils.LogInfo("postgres", "query="+query) 467 return nil 468 } 469 470 func (db *C2ECPostgres) SetLastRetry(withdrawalId int, lastRetryTsUnix int64) error { 471 472 _, err := db.pool.Exec( 473 db.ctx, 474 PS_SET_LAST_RETRY, 475 lastRetryTsUnix, 476 withdrawalId, 477 ) 478 if err != nil { 479 internal_utils.LogError("postgres", err) 480 return err 481 } 482 internal_utils.LogInfo("postgres", "query="+PS_SET_LAST_RETRY) 483 return nil 484 } 485 486 func (db *C2ECPostgres) SetRetryCounter(withdrawalId int, retryCounter int) error { 487 488 _, err := db.pool.Exec( 489 db.ctx, 490 PS_SET_RETRY_COUNTER, 491 retryCounter, 492 withdrawalId, 493 ) 494 if err != nil { 495 internal_utils.LogError("postgres", err) 496 return err 497 } 498 internal_utils.LogInfo("postgres", "query="+PS_SET_RETRY_COUNTER) 499 return nil 500 } 501 502 // The query at the postgres database works as specified by the 503 // wire gateway api. 504 func (db *C2ECPostgres) GetConfirmedWithdrawals(start int, delta int, since time.Time) ([]*public_db.Withdrawal, error) { 505 506 // +d / +s 507 query := PS_CONFIRMED_TRANSACTIONS_ASC 508 if delta < 0 { 509 // d negatives indicates DESC ordering and backwards reading 510 // -d / +s 511 query = PS_CONFIRMED_TRANSACTIONS_DESC 512 if start < 0 { 513 // start negative indicates not explicitly given 514 // since -d is the case here we try reading the latest entries 515 // -d / -s 516 query = PS_CONFIRMED_TRANSACTIONS_DESC_MAX 517 } 518 } else { 519 if start < 0 { 520 // +d / -s 521 query = PS_CONFIRMED_TRANSACTIONS_ASC_MAX 522 } 523 } 524 525 limit := int(math.Abs(float64(delta))) 526 offset := start 527 if offset < 0 { 528 offset = 0 529 } 530 531 if start < 0 { 532 start = 0 533 } 534 535 internal_utils.LogInfo("postgres", fmt.Sprintf("selected query=%s (\nparameters:\n delta=%d,\n start=%d, limit=%d,\n offset=%d,\n since=%d\n)", query, delta, start, limit, offset, since.Unix())) 536 537 var row pgx.Rows 538 var err error 539 540 if strings.Count(query, "$") == 1 { 541 row, err = db.pool.Query( 542 db.ctx, 543 query, 544 limit, 545 ) 546 } else { 547 row, err = db.pool.Query( 548 db.ctx, 549 query, 550 offset, 551 limit, 552 ) 553 } 554 555 internal_utils.LogInfo("postgres", "query="+query) 556 if err != nil { 557 internal_utils.LogError("postgres", err) 558 if row != nil { 559 row.Close() 560 } 561 return nil, err 562 } else { 563 564 defer row.Close() 565 566 withdrawals, err := pgx.CollectRows(row, pgx.RowToAddrOfStructByName[public_db.Withdrawal]) 567 if err != nil { 568 internal_utils.LogError("postgres", err) 569 return nil, err 570 } 571 572 return internal_utils.RemoveNulls(withdrawals), nil 573 } 574 } 575 576 func (db *C2ECPostgres) GetProviderByTerminal(terminalId int) (*public_db.Provider, error) { 577 578 if row, err := db.pool.Query( 579 db.ctx, 580 PS_GET_PROVIDER_BY_TERMINAL, 581 terminalId, 582 ); err != nil { 583 internal_utils.LogWarn("postgres", "failed query="+PS_GET_PROVIDER_BY_TERMINAL) 584 internal_utils.LogError("postgres", err) 585 if row != nil { 586 row.Close() 587 } 588 return nil, err 589 } else { 590 591 defer row.Close() 592 593 provider, err := pgx.CollectExactlyOneRow(row, pgx.RowToAddrOfStructByName[public_db.Provider]) 594 if err != nil { 595 internal_utils.LogError("postgres", err) 596 return nil, err 597 } 598 599 internal_utils.LogInfo("postgres", "query="+PS_GET_PROVIDER_BY_TERMINAL) 600 return provider, nil 601 } 602 } 603 604 func (db *C2ECPostgres) GetTerminalProviderByName(name string) (*public_db.Provider, error) { 605 606 if row, err := db.pool.Query( 607 db.ctx, 608 PS_GET_PROVIDER_BY_NAME, 609 name, 610 ); err != nil { 611 internal_utils.LogWarn("postgres", "failed query="+PS_GET_PROVIDER_BY_NAME) 612 internal_utils.LogError("postgres", err) 613 if row != nil { 614 row.Close() 615 } 616 return nil, err 617 } else { 618 619 defer row.Close() 620 621 provider, err := pgx.CollectExactlyOneRow(row, pgx.RowToAddrOfStructByName[public_db.Provider]) 622 if err != nil { 623 internal_utils.LogWarn("postgres", "failed query="+PS_GET_PROVIDER_BY_NAME) 624 internal_utils.LogError("postgres", err) 625 return nil, err 626 } 627 628 internal_utils.LogInfo("postgres", "query="+PS_GET_PROVIDER_BY_NAME) 629 return provider, nil 630 } 631 } 632 633 func (db *C2ECPostgres) GetTerminalProviderByPaytoTargetType(paytoTargetType string) (*public_db.Provider, error) { 634 635 internal_utils.LogInfo("postgres", "loading provider for payto-target-type="+paytoTargetType) 636 if row, err := db.pool.Query( 637 db.ctx, 638 PS_GET_PROVIDER_BY_PAYTO_TARGET_TYPE, 639 paytoTargetType, 640 ); err != nil { 641 internal_utils.LogWarn("postgres", "failed query="+PS_GET_PROVIDER_BY_PAYTO_TARGET_TYPE) 642 internal_utils.LogError("postgres", err) 643 if row != nil { 644 row.Close() 645 } 646 return nil, err 647 } else { 648 649 defer row.Close() 650 651 provider, err := pgx.CollectExactlyOneRow(row, pgx.RowToAddrOfStructByName[public_db.Provider]) 652 if err != nil { 653 internal_utils.LogWarn("postgres", "failed query="+PS_GET_PROVIDER_BY_PAYTO_TARGET_TYPE) 654 internal_utils.LogError("postgres", err) 655 return nil, err 656 } 657 658 internal_utils.LogInfo("postgres", "query="+PS_GET_PROVIDER_BY_PAYTO_TARGET_TYPE) 659 return provider, nil 660 } 661 } 662 663 func (db *C2ECPostgres) GetTerminalById(id int) (*public_db.Terminal, error) { 664 665 if row, err := db.pool.Query( 666 db.ctx, 667 PS_GET_TERMINAL_BY_ID, 668 id, 669 ); err != nil { 670 internal_utils.LogWarn("postgres", "failed query="+PS_GET_TERMINAL_BY_ID) 671 internal_utils.LogError("postgres", err) 672 if row != nil { 673 row.Close() 674 } 675 return nil, err 676 } else { 677 678 defer row.Close() 679 680 terminal, err := pgx.CollectExactlyOneRow(row, pgx.RowToAddrOfStructByName[public_db.Terminal]) 681 if err != nil { 682 internal_utils.LogWarn("postgres", "failed query="+PS_GET_TERMINAL_BY_ID) 683 internal_utils.LogError("postgres", err) 684 return nil, err 685 } 686 687 internal_utils.LogInfo("postgres", "query="+PS_GET_TERMINAL_BY_ID) 688 return terminal, nil 689 } 690 } 691 692 func (db *C2ECPostgres) GetTransferById(requestUid []byte) (*public_db.Transfer, error) { 693 694 if rows, err := db.pool.Query( 695 db.ctx, 696 PS_GET_TRANSFER_BY_ID, 697 requestUid, 698 ); err != nil { 699 internal_utils.LogWarn("postgres", "failed query="+PS_GET_TRANSFER_BY_ID) 700 internal_utils.LogError("postgres", err) 701 if rows != nil { 702 rows.Close() 703 } 704 return nil, err 705 } else { 706 707 defer rows.Close() 708 709 transfer, err := pgx.CollectOneRow(rows, pgx.RowToAddrOfStructByName[public_db.Transfer]) 710 if err != nil { 711 if errors.Is(err, pgx.ErrNoRows) { 712 return nil, nil 713 } 714 internal_utils.LogError("postgres", err) 715 return nil, err 716 } 717 718 internal_utils.LogInfo("postgres", "query="+PS_GET_TRANSFER_BY_ID) 719 return transfer, nil 720 } 721 } 722 723 func (db *C2ECPostgres) GetTransfersByCreditAccount(creditAccount string) ([]*public_db.Transfer, error) { 724 725 if rows, err := db.pool.Query( 726 db.ctx, 727 PS_GET_TRANSFER_BY_CREDIT_ACCOUNT, 728 creditAccount, 729 ); err != nil { 730 internal_utils.LogWarn("postgres", "failed query="+PS_GET_TRANSFER_BY_CREDIT_ACCOUNT) 731 internal_utils.LogError("postgres", err) 732 if rows != nil { 733 rows.Close() 734 } 735 return nil, err 736 } else { 737 738 defer rows.Close() 739 740 transfers, err := pgx.CollectRows(rows, pgx.RowToAddrOfStructByName[public_db.Transfer]) 741 if err != nil { 742 if errors.Is(err, pgx.ErrNoRows) { 743 return make([]*public_db.Transfer, 0), nil 744 } 745 internal_utils.LogError("postgres", err) 746 return nil, err 747 } 748 749 internal_utils.LogInfo("postgres", "query="+PS_GET_TRANSFER_BY_CREDIT_ACCOUNT) 750 return internal_utils.RemoveNulls(transfers), nil 751 } 752 } 753 754 func (db *C2ECPostgres) AddTransfer( 755 requestUid []byte, 756 amount *internal_utils.Amount, 757 exchangeBaseUrl string, 758 wtid string, 759 credit_account string, 760 ts time.Time, 761 ) error { 762 763 dbAmount := internal_utils.TalerAmountCurrency{ 764 Val: int64(amount.Value), 765 Frac: int32(amount.Fraction), 766 Curr: amount.Currency, 767 } 768 769 _, err := db.pool.Exec( 770 db.ctx, 771 PS_ADD_TRANSFER, 772 requestUid, 773 dbAmount, 774 exchangeBaseUrl, 775 wtid, 776 credit_account, 777 ts.Unix(), 778 ) 779 if err != nil { 780 internal_utils.LogInfo("postgres", "failed query="+PS_ADD_TRANSFER) 781 internal_utils.LogError("postgres", err) 782 return err 783 } 784 internal_utils.LogInfo("postgres", "query="+PS_ADD_TRANSFER) 785 return nil 786 } 787 788 func (db *C2ECPostgres) UpdateTransfer( 789 rowId int, 790 requestUid []byte, 791 timestamp int64, 792 status int16, 793 retries int16, 794 ) error { 795 796 query := PS_UPDATE_TRANSFER 797 if rowId <= 1 { 798 // tweak to intially set transferred_row_id. Can be removed once transferred_row_id field is obsolete 799 query = "UPDATE c2ec.transfer SET (transfer_ts, transfer_status, retries, transferred_row_id) = ($1,$2,$3,1) WHERE request_uid=$4" 800 } 801 802 _, err := db.pool.Exec( 803 db.ctx, 804 query, 805 timestamp, 806 status, 807 retries, 808 requestUid, 809 ) 810 if err != nil { 811 internal_utils.LogInfo("postgres", "failed query="+query) 812 internal_utils.LogError("postgres", err) 813 return err 814 } 815 internal_utils.LogInfo("postgres", "query="+query) 816 return nil 817 } 818 819 func (db *C2ECPostgres) GetTransfers(start int, delta int, since time.Time) ([]*public_db.Transfer, error) { 820 821 // +d / +s 822 query := PS_GET_TRANSFERS_ASC 823 if delta < 0 { 824 // d negatives indicates DESC ordering and backwards reading 825 // -d / +s 826 query = PS_GET_TRANSFERS_DESC 827 if start < 0 { 828 // start negative indicates not explicitly given 829 // since -d is the case here we try reading the latest entries 830 // -d / -s 831 query = PS_GET_TRANSFERS_DESC_MAX 832 } 833 } else { 834 if start < 0 { 835 // +d / -s 836 query = PS_GET_TRANSFERS_ASC_MAX 837 } 838 } 839 840 limit := int(math.Abs(float64(delta))) 841 offset := start 842 if offset < 0 { 843 offset = 0 844 } 845 846 if start < 0 { 847 start = 0 848 } 849 850 internal_utils.LogInfo("postgres", fmt.Sprintf("selected query=%s (\nparameters:\n delta=%d,\n start=%d, limit=%d,\n offset=%d,\n since=%d\n)", query, delta, start, limit, offset, since.Unix())) 851 852 var row pgx.Rows 853 var err error 854 855 if strings.Count(query, "$") == 1 { 856 row, err = db.pool.Query( 857 db.ctx, 858 query, 859 limit, 860 ) 861 } else { 862 row, err = db.pool.Query( 863 db.ctx, 864 query, 865 offset, 866 limit, 867 ) 868 } 869 870 if err != nil { 871 internal_utils.LogWarn("postgres", "failed query="+query) 872 internal_utils.LogError("postgres", err) 873 if row != nil { 874 row.Close() 875 } 876 return nil, err 877 } else { 878 879 defer row.Close() 880 881 transfers, err := pgx.CollectRows(row, pgx.RowToAddrOfStructByName[public_db.Transfer]) 882 if err != nil { 883 internal_utils.LogWarn("postgres", "failed query="+query) 884 internal_utils.LogError("postgres", err) 885 return nil, err 886 } 887 888 return internal_utils.RemoveNulls(transfers), nil 889 } 890 } 891 892 func (db *C2ECPostgres) GetTransfersByState(status int) ([]*public_db.Transfer, error) { 893 894 if rows, err := db.pool.Query( 895 db.ctx, 896 PS_GET_TRANSFERS_BY_STATUS, 897 status, 898 ); err != nil { 899 internal_utils.LogError("postgres", err) 900 if rows != nil { 901 rows.Close() 902 } 903 return nil, err 904 } else { 905 906 defer rows.Close() 907 908 transfers, err := pgx.CollectRows(rows, pgx.RowToAddrOfStructByName[public_db.Transfer]) 909 if err != nil { 910 internal_utils.LogWarn("postgres", "failed query="+PS_GET_TRANSFERS_BY_STATUS) 911 internal_utils.LogError("postgres", err) 912 return nil, err 913 } 914 915 // this will fill up the logs... 916 // internal_utils.LogInfo("postgres", "query="+PS_GET_TRANSFERS_BY_STATUS) 917 // internal_utils.LogInfo("postgres", "size of transfer list="+strconv.Itoa(len(transfers))) 918 return internal_utils.RemoveNulls(transfers), nil 919 } 920 } 921 922 // Sets up a a listener for the given channel. 923 // Notifications will be sent through the out channel. 924 func (db *C2ECPostgres) NewListener( 925 cn string, 926 out chan *public_db.Notification, 927 ) (func(context.Context) error, error) { 928 929 connectionString := PostgresConnectionString(&config.CONFIG.Database) 930 cfg, err := pgx.ParseConfig(connectionString) 931 if err != nil { 932 return nil, err 933 } 934 935 listener := &pgxlisten.Listener{ 936 Connect: func(ctx context.Context) (*pgx.Conn, error) { 937 internal_utils.LogInfo("postgres", "listener connecting to the database") 938 return pgx.ConnectConfig(ctx, cfg) 939 }, 940 } 941 942 internal_utils.LogInfo("postgres", "handling notifications on channel="+cn) 943 listener.Handle(cn, pgxlisten.HandlerFunc(func(ctx context.Context, notification *pgconn.Notification, conn *pgx.Conn) error { 944 internal_utils.LogInfo("postgres", fmt.Sprintf("handling postgres notification. channel=%s", notification.Channel)) 945 out <- &public_db.Notification{ 946 Channel: notification.Channel, 947 Payload: notification.Payload, 948 } 949 return nil 950 })) 951 952 return listener.Listen, nil 953 }