summaryrefslogtreecommitdiff
path: root/c2ec/postgres.go
diff options
context:
space:
mode:
Diffstat (limited to 'c2ec/postgres.go')
-rw-r--r--c2ec/postgres.go231
1 files changed, 130 insertions, 101 deletions
diff --git a/c2ec/postgres.go b/c2ec/postgres.go
index ce9bc9a..e9a085a 100644
--- a/c2ec/postgres.go
+++ b/c2ec/postgres.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math"
+ "os"
"strconv"
"time"
@@ -14,9 +15,6 @@ import (
"github.com/jackc/pgxlisten"
)
-const PS_ASC_SELECTOR = "ASC"
-const PS_DESC_SELECTOR = "DESC"
-
const PS_INSERT_WITHDRAWAL = "INSERT INTO " + WITHDRAWAL_TABLE_NAME + " (" +
WITHDRAWAL_FIELD_NAME_WOPID + "," +
WITHDRAWAL_FIELD_NAME_RESPUBKEY + "," +
@@ -45,13 +43,20 @@ const PS_SET_LAST_RETRY = "UPDATE " + WITHDRAWAL_TABLE_NAME +
" WHERE " + WITHDRAWAL_FIELD_NAME_ID + "=$2"
const PS_SET_RETRY_COUNTER = "UPDATE " + WITHDRAWAL_TABLE_NAME +
- " SET " + WITHDRAWAL_FIELD_NAME_RETRY_COUNTER + "=($1)" +
+ " SET " + WITHDRAWAL_FIELD_NAME_RETRY_COUNTER + "=$1" +
" WHERE " + WITHDRAWAL_FIELD_NAME_ID + "=$2"
-const PS_CONFIRMED_TRANSACTIONS = "SELECT * FROM " + WITHDRAWAL_TABLE_NAME +
+const PS_CONFIRMED_TRANSACTIONS_ASC = "SELECT * FROM " + WITHDRAWAL_TABLE_NAME +
+ " WHERE " + WITHDRAWAL_FIELD_NAME_STATUS + "='" + string(CONFIRMED) + "'" +
+ " ORDER BY " + WITHDRAWAL_FIELD_NAME_ID + " ASC" +
+ " LIMIT $1" +
+ " OFFSET $2"
+
+const PS_CONFIRMED_TRANSACTIONS_DESC = "SELECT * FROM " + WITHDRAWAL_TABLE_NAME +
+ " WHERE " + WITHDRAWAL_FIELD_NAME_STATUS + "='" + string(CONFIRMED) + "'" +
+ " ORDER BY " + WITHDRAWAL_FIELD_NAME_ID + " DESC" +
" LIMIT $1" +
- " OFFSET $2" +
- " ORDER BY " + WITHDRAWAL_FIELD_NAME_ID + " $3"
+ " OFFSET $2"
const PS_GET_WITHDRAWAL_BY_ID = "SELECT * FROM " + WITHDRAWAL_TABLE_NAME +
" WHERE " + WITHDRAWAL_FIELD_NAME_ID + "=$1"
@@ -82,7 +87,22 @@ const PS_GET_TRANSFER_BY_ID = "SELECT * FROM " + TRANSFER_TABLE_NAME +
const PS_ADD_TRANSFER = "INSERT INTO " + TRANSFER_TABLE_NAME +
" (" + TRANSFER_FIELD_NAME_ID + ", " + TRANSFER_FIELD_NAME_AMOUNT + ", " +
TRANSFER_FIELD_NAME_EXCHANGE_BASE_URL + ", " + TRANSFER_FIELD_NAME_WTID + ", " +
- TRANSFER_FIELD_NAME_CREDIT_ACCOUNT + ")" + " VALUES ($1, $2, $3, $4, $5, $6)"
+ TRANSFER_FIELD_NAME_CREDIT_ACCOUNT + ") VALUES ($1, $2, $3, $4, $5)"
+
+const PS_UPDATE_TRANSFER = "UPDATE " + TRANSFER_TABLE_NAME + " SET (" +
+ TRANSFER_FIELD_NAME_TS + ", " + TRANSFER_FIELD_NAME_STATUS + ", " +
+ TRANSFER_FIELD_NAME_RETRIES + ") VALUES ($1,$2,$3) WHERE " +
+ TRANSFER_FIELD_NAME_ID + "=$4"
+
+const PS_GET_TRANSFERS_ASC = "SELECT * FROM " + TRANSFER_TABLE_NAME +
+ " ORDER BY " + TRANSFER_FIELD_NAME_ROW_ID + " ASC" +
+ " LIMIT $1" +
+ " OFFSET $2"
+
+const PS_GET_TRANSFERS_DESC = "SELECT * FROM " + TRANSFER_TABLE_NAME +
+ " ORDER BY " + TRANSFER_FIELD_NAME_ROW_ID + " DESC" +
+ " LIMIT $1" +
+ " OFFSET $2"
// Postgres implementation of the C2ECDatabase
type C2ECPostgres struct {
@@ -109,6 +129,11 @@ func NewC2ECPostgres(cfg *C2ECDatabseConfig) (*C2ECPostgres, error) {
db := new(C2ECPostgres)
connectionString := PostgresConnectionString(cfg)
+ pgHost := os.Getenv("PGHOST")
+ if pgHost != "" {
+ LogInfo("postgres", "pghost was set")
+ connectionString = pgHost
+ }
dbConnCfg, err := pgxpool.ParseConfig(connectionString)
if err != nil {
@@ -179,15 +204,8 @@ func (db *C2ECPostgres) GetWithdrawalById(withdrawalId int) (*Withdrawal, error)
} else {
defer row.Close()
-
- withdrawal, err := pgx.CollectExactlyOneRow(row, pgx.RowToAddrOfStructByName[Withdrawal])
- if err != nil {
- LogError("postgres", err)
- return nil, err
- }
-
LogInfo("postgres", "query="+PS_GET_WITHDRAWAL_BY_ID)
- return withdrawal, nil
+ return pgx.CollectExactlyOneRow(row, pgx.RowToAddrOfStructByName[Withdrawal])
}
}
@@ -206,15 +224,8 @@ func (db *C2ECPostgres) GetWithdrawalByWopid(wopid []byte) (*Withdrawal, error)
} else {
defer row.Close()
-
- withdrawal, err := pgx.CollectExactlyOneRow(row, pgx.RowToAddrOfStructByName[Withdrawal])
- if err != nil {
- LogError("postgres", err)
- return nil, err
- }
-
LogInfo("postgres", "query="+PS_GET_WITHDRAWAL_BY_WOPID)
- return withdrawal, nil
+ return pgx.CollectExactlyOneRow(row, pgx.RowToAddrOfStructByName[Withdrawal])
}
}
@@ -232,18 +243,8 @@ func (db *C2ECPostgres) GetWithdrawalByProviderTransactionId(tid string) (*Withd
} else {
defer row.Close()
-
- withdrawals, err := pgx.CollectRows(row, pgx.RowToAddrOfStructByName[Withdrawal])
- if err != nil {
- LogError("postgres", err)
- return nil, err
- }
-
- if len(withdrawals) < 1 {
- return nil, nil
- }
LogInfo("postgres", "query="+PS_GET_WITHDRAWAL_BY_PTID)
- return withdrawals[0], nil
+ return pgx.CollectExactlyOneRow(row, pgx.RowToAddrOfStructByName[Withdrawal])
}
}
@@ -298,7 +299,7 @@ func (db *C2ECPostgres) GetAttestableWithdrawals() ([]*Withdrawal, error) {
}
LogInfo("postgres", "query="+PS_GET_UNCONFIRMED_WITHDRAWALS)
- return withdrawals, nil
+ return removeNulls(withdrawals), nil
}
}
@@ -363,9 +364,9 @@ func (db *C2ECPostgres) SetRetryCounter(withdrawalId int, retryCounter int) erro
// wire gateway api.
func (db *C2ECPostgres) GetConfirmedWithdrawals(start int, delta int) ([]*Withdrawal, error) {
- sort := PS_ASC_SELECTOR
+ query := PS_CONFIRMED_TRANSACTIONS_ASC
if delta < 0 {
- sort = PS_DESC_SELECTOR
+ query = PS_CONFIRMED_TRANSACTIONS_DESC
}
limit := math.Abs(float64(delta))
@@ -385,21 +386,20 @@ func (db *C2ECPostgres) GetConfirmedWithdrawals(start int, delta int) ([]*Withdr
// recent ids.
row, err = db.pool.Query(
db.ctx,
- PS_CONFIRMED_TRANSACTIONS,
+ query,
limit,
"MAX("+WITHDRAWAL_FIELD_NAME_ID+")",
- sort,
)
} else {
row, err = db.pool.Query(
db.ctx,
- PS_CONFIRMED_TRANSACTIONS,
+ query,
limit,
offset,
- sort,
)
}
+ LogInfo("postgres", "query="+query)
if err != nil {
LogError("postgres", err)
if row != nil {
@@ -416,8 +416,7 @@ func (db *C2ECPostgres) GetConfirmedWithdrawals(start int, delta int) ([]*Withdr
return nil, err
}
- LogInfo("postgres", "query="+PS_CONFIRMED_TRANSACTIONS)
- return withdrawals, nil
+ return removeNulls(withdrawals), nil
}
}
@@ -529,7 +528,7 @@ func (db *C2ECPostgres) GetTerminalById(id int) (*Terminal, error) {
}
}
-func (db *C2ECPostgres) GetTransferById(requestUid HashCode) (*Transfer, error) {
+func (db *C2ECPostgres) GetTransferById(requestUid []byte) (*Transfer, error) {
if row, err := db.pool.Query(
db.ctx,
@@ -558,7 +557,7 @@ func (db *C2ECPostgres) GetTransferById(requestUid HashCode) (*Transfer, error)
}
func (db *C2ECPostgres) AddTransfer(
- requestUid HashCode,
+ requestUid []byte,
amount *Amount,
exchangeBaseUrl string,
wtid string,
@@ -589,81 +588,100 @@ func (db *C2ECPostgres) AddTransfer(
return nil
}
-func (db *C2ECPostgres) ListenForWithdrawalStatusChange(
- ctx context.Context,
- wopid WithdrawalIdentifier,
- out chan WithdrawalOperationStatus,
- errs chan error,
-) {
-
- notifications := make(chan *Notification)
- errsInternal := make(chan error)
+func (db *C2ECPostgres) UpdateTransfer(
+ requestUid []byte,
+ timestamp int64,
+ status int16,
+ retries int16,
+) error {
- go func() {
+ _, err := db.pool.Exec(
+ db.ctx,
+ PS_UPDATE_TRANSFER,
+ timestamp,
+ status,
+ retries,
+ requestUid,
+ )
+ if err != nil {
+ LogError("postgres", err)
+ return err
+ }
+ LogInfo("postgres", "query="+PS_UPDATE_TRANSFER)
+ return nil
+}
- channel := "w_" + string(wopid)
- listener, err := NewListener(channel, notifications)
- if err != nil {
- errsInternal <- err
- return
- }
- LogInfo("postgres", fmt.Sprintf("listening for status change of wopid=%s", wopid))
+func (db *C2ECPostgres) GetTransfers(start int, delta int) ([]*Transfer, error) {
- if err := listener.Listen(ctx); err != nil {
- LogError("postgres", err)
- errs <- err
- }
- }()
-
- for {
- select {
- case e := <-errsInternal:
- LogError("postgres", e)
- errs <- e
- case <-ctx.Done():
- err := ctx.Err()
- msg := "context sent done signal while listening for status change"
- if err != nil {
- LogError("postgres", err)
- }
- LogWarn("postgres", msg)
- errs <- errors.New(msg)
- case n := <-notifications:
- LogInfo("postgres", fmt.Sprintf("received notification for channel %s: %s", n.Channel, n.Payload))
- out <- WithdrawalOperationStatus(n.Payload)
- }
+ query := PS_GET_TRANSFERS_ASC
+ if delta < 0 {
+ query = PS_GET_TRANSFERS_DESC
}
-}
-func Listen(ctx context.Context, channel string) (chan *Notification, chan error, error) {
+ limit := math.Abs(float64(delta))
+ offset := start
+ if delta < 0 {
+ offset = start - int(limit)
+ }
+ if offset < 0 {
+ offset = 0
+ }
- out := make(chan *Notification)
- errs := make(chan error)
+ var row pgx.Rows
+ var err error
+ if start < 0 {
+ // use MAX(id) instead of a concrete id, because start
+ // identifier was negative. Inidicates to read the most
+ // recent ids.
+ row, err = db.pool.Query(
+ db.ctx,
+ query,
+ limit,
+ "MAX("+TRANSFER_FIELD_NAME_ROW_ID+")",
+ )
+ } else {
+ row, err = db.pool.Query(
+ db.ctx,
+ query,
+ limit,
+ offset,
+ )
+ }
- listener, err := NewListener(channel, out)
+ LogInfo("postgres", "query="+query)
if err != nil {
- return nil, nil, err
- }
+ LogError("postgres", err)
+ if row != nil {
+ row.Close()
+ }
+ return nil, err
+ } else {
- go func() {
+ defer row.Close()
- err := listener.Listen(ctx)
+ transfers, err := pgx.CollectRows(row, pgx.RowToAddrOfStructByName[Transfer])
if err != nil {
- errs <- err
+ LogError("postgres", err)
+ return nil, err
}
- }()
- return out, errs, nil
+ return removeNulls(transfers), nil
+ }
}
// Sets up a a listener for the given channel.
// Notifications will be sent through the out channel.
-func NewListener(
+func (db *C2ECPostgres) NewListener(
cn string,
out chan *Notification,
-) (*pgxlisten.Listener, error) {
+) (func(context.Context) error, error) {
connectionString := PostgresConnectionString(&CONFIG.Database)
+ pgHost := os.Getenv("PGHOST")
+ if pgHost != "" {
+ LogInfo("postgres", "pghost was set")
+ connectionString = pgHost
+ }
cfg, err := pgx.ParseConfig(connectionString)
if err != nil {
@@ -687,5 +705,16 @@ func NewListener(
return nil
}))
- return listener, nil
+ return listener.Listen, nil
+}
+
+func removeNulls[T any](l []*T) []*T {
+
+ withoutNulls := make([]*T, 0)
+ for _, e := range l {
+ if e != nil {
+ withoutNulls = append(withoutNulls, e)
+ }
+ }
+ return withoutNulls
}