diff options
Diffstat (limited to 'c2ec/postgres.go')
-rw-r--r-- | c2ec/postgres.go | 231 |
1 files changed, 130 insertions, 101 deletions
diff --git a/c2ec/postgres.go b/c2ec/postgres.go index ce9bc9a..e9a085a 100644 --- a/c2ec/postgres.go +++ b/c2ec/postgres.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "math" + "os" "strconv" "time" @@ -14,9 +15,6 @@ import ( "github.com/jackc/pgxlisten" ) -const PS_ASC_SELECTOR = "ASC" -const PS_DESC_SELECTOR = "DESC" - const PS_INSERT_WITHDRAWAL = "INSERT INTO " + WITHDRAWAL_TABLE_NAME + " (" + WITHDRAWAL_FIELD_NAME_WOPID + "," + WITHDRAWAL_FIELD_NAME_RESPUBKEY + "," + @@ -45,13 +43,20 @@ const PS_SET_LAST_RETRY = "UPDATE " + WITHDRAWAL_TABLE_NAME + " WHERE " + WITHDRAWAL_FIELD_NAME_ID + "=$2" const PS_SET_RETRY_COUNTER = "UPDATE " + WITHDRAWAL_TABLE_NAME + - " SET " + WITHDRAWAL_FIELD_NAME_RETRY_COUNTER + "=($1)" + + " SET " + WITHDRAWAL_FIELD_NAME_RETRY_COUNTER + "=$1" + " WHERE " + WITHDRAWAL_FIELD_NAME_ID + "=$2" -const PS_CONFIRMED_TRANSACTIONS = "SELECT * FROM " + WITHDRAWAL_TABLE_NAME + +const PS_CONFIRMED_TRANSACTIONS_ASC = "SELECT * FROM " + WITHDRAWAL_TABLE_NAME + + " WHERE " + WITHDRAWAL_FIELD_NAME_STATUS + "='" + string(CONFIRMED) + "'" + + " ORDER BY " + WITHDRAWAL_FIELD_NAME_ID + " ASC" + + " LIMIT $1" + + " OFFSET $2" + +const PS_CONFIRMED_TRANSACTIONS_DESC = "SELECT * FROM " + WITHDRAWAL_TABLE_NAME + + " WHERE " + WITHDRAWAL_FIELD_NAME_STATUS + "='" + string(CONFIRMED) + "'" + + " ORDER BY " + WITHDRAWAL_FIELD_NAME_ID + " DESC" + " LIMIT $1" + - " OFFSET $2" + - " ORDER BY " + WITHDRAWAL_FIELD_NAME_ID + " $3" + " OFFSET $2" const PS_GET_WITHDRAWAL_BY_ID = "SELECT * FROM " + WITHDRAWAL_TABLE_NAME + " WHERE " + WITHDRAWAL_FIELD_NAME_ID + "=$1" @@ -82,7 +87,22 @@ const PS_GET_TRANSFER_BY_ID = "SELECT * FROM " + TRANSFER_TABLE_NAME + const PS_ADD_TRANSFER = "INSERT INTO " + TRANSFER_TABLE_NAME + " (" + TRANSFER_FIELD_NAME_ID + ", " + TRANSFER_FIELD_NAME_AMOUNT + ", " + TRANSFER_FIELD_NAME_EXCHANGE_BASE_URL + ", " + TRANSFER_FIELD_NAME_WTID + ", " + - TRANSFER_FIELD_NAME_CREDIT_ACCOUNT + ")" + " VALUES ($1, $2, $3, $4, $5, $6)" + TRANSFER_FIELD_NAME_CREDIT_ACCOUNT + ") VALUES ($1, $2, $3, $4, $5)" + +const PS_UPDATE_TRANSFER = "UPDATE " + TRANSFER_TABLE_NAME + " SET (" + + TRANSFER_FIELD_NAME_TS + ", " + TRANSFER_FIELD_NAME_STATUS + ", " + + TRANSFER_FIELD_NAME_RETRIES + ") VALUES ($1,$2,$3) WHERE " + + TRANSFER_FIELD_NAME_ID + "=$4" + +const PS_GET_TRANSFERS_ASC = "SELECT * FROM " + TRANSFER_TABLE_NAME + + " ORDER BY " + TRANSFER_FIELD_NAME_ROW_ID + " ASC" + + " LIMIT $1" + + " OFFSET $2" + +const PS_GET_TRANSFERS_DESC = "SELECT * FROM " + TRANSFER_TABLE_NAME + + " ORDER BY " + TRANSFER_FIELD_NAME_ROW_ID + " DESC" + + " LIMIT $1" + + " OFFSET $2" // Postgres implementation of the C2ECDatabase type C2ECPostgres struct { @@ -109,6 +129,11 @@ func NewC2ECPostgres(cfg *C2ECDatabseConfig) (*C2ECPostgres, error) { db := new(C2ECPostgres) connectionString := PostgresConnectionString(cfg) + pgHost := os.Getenv("PGHOST") + if pgHost != "" { + LogInfo("postgres", "pghost was set") + connectionString = pgHost + } dbConnCfg, err := pgxpool.ParseConfig(connectionString) if err != nil { @@ -179,15 +204,8 @@ func (db *C2ECPostgres) GetWithdrawalById(withdrawalId int) (*Withdrawal, error) } else { defer row.Close() - - withdrawal, err := pgx.CollectExactlyOneRow(row, pgx.RowToAddrOfStructByName[Withdrawal]) - if err != nil { - LogError("postgres", err) - return nil, err - } - LogInfo("postgres", "query="+PS_GET_WITHDRAWAL_BY_ID) - return withdrawal, nil + return pgx.CollectExactlyOneRow(row, pgx.RowToAddrOfStructByName[Withdrawal]) } } @@ -206,15 +224,8 @@ func (db *C2ECPostgres) GetWithdrawalByWopid(wopid []byte) (*Withdrawal, error) } else { defer row.Close() - - withdrawal, err := pgx.CollectExactlyOneRow(row, pgx.RowToAddrOfStructByName[Withdrawal]) - if err != nil { - LogError("postgres", err) - return nil, err - } - LogInfo("postgres", "query="+PS_GET_WITHDRAWAL_BY_WOPID) - return withdrawal, nil + return pgx.CollectExactlyOneRow(row, pgx.RowToAddrOfStructByName[Withdrawal]) } } @@ -232,18 +243,8 @@ func (db *C2ECPostgres) GetWithdrawalByProviderTransactionId(tid string) (*Withd } else { defer row.Close() - - withdrawals, err := pgx.CollectRows(row, pgx.RowToAddrOfStructByName[Withdrawal]) - if err != nil { - LogError("postgres", err) - return nil, err - } - - if len(withdrawals) < 1 { - return nil, nil - } LogInfo("postgres", "query="+PS_GET_WITHDRAWAL_BY_PTID) - return withdrawals[0], nil + return pgx.CollectExactlyOneRow(row, pgx.RowToAddrOfStructByName[Withdrawal]) } } @@ -298,7 +299,7 @@ func (db *C2ECPostgres) GetAttestableWithdrawals() ([]*Withdrawal, error) { } LogInfo("postgres", "query="+PS_GET_UNCONFIRMED_WITHDRAWALS) - return withdrawals, nil + return removeNulls(withdrawals), nil } } @@ -363,9 +364,9 @@ func (db *C2ECPostgres) SetRetryCounter(withdrawalId int, retryCounter int) erro // wire gateway api. func (db *C2ECPostgres) GetConfirmedWithdrawals(start int, delta int) ([]*Withdrawal, error) { - sort := PS_ASC_SELECTOR + query := PS_CONFIRMED_TRANSACTIONS_ASC if delta < 0 { - sort = PS_DESC_SELECTOR + query = PS_CONFIRMED_TRANSACTIONS_DESC } limit := math.Abs(float64(delta)) @@ -385,21 +386,20 @@ func (db *C2ECPostgres) GetConfirmedWithdrawals(start int, delta int) ([]*Withdr // recent ids. row, err = db.pool.Query( db.ctx, - PS_CONFIRMED_TRANSACTIONS, + query, limit, "MAX("+WITHDRAWAL_FIELD_NAME_ID+")", - sort, ) } else { row, err = db.pool.Query( db.ctx, - PS_CONFIRMED_TRANSACTIONS, + query, limit, offset, - sort, ) } + LogInfo("postgres", "query="+query) if err != nil { LogError("postgres", err) if row != nil { @@ -416,8 +416,7 @@ func (db *C2ECPostgres) GetConfirmedWithdrawals(start int, delta int) ([]*Withdr return nil, err } - LogInfo("postgres", "query="+PS_CONFIRMED_TRANSACTIONS) - return withdrawals, nil + return removeNulls(withdrawals), nil } } @@ -529,7 +528,7 @@ func (db *C2ECPostgres) GetTerminalById(id int) (*Terminal, error) { } } -func (db *C2ECPostgres) GetTransferById(requestUid HashCode) (*Transfer, error) { +func (db *C2ECPostgres) GetTransferById(requestUid []byte) (*Transfer, error) { if row, err := db.pool.Query( db.ctx, @@ -558,7 +557,7 @@ func (db *C2ECPostgres) GetTransferById(requestUid HashCode) (*Transfer, error) } func (db *C2ECPostgres) AddTransfer( - requestUid HashCode, + requestUid []byte, amount *Amount, exchangeBaseUrl string, wtid string, @@ -589,81 +588,100 @@ func (db *C2ECPostgres) AddTransfer( return nil } -func (db *C2ECPostgres) ListenForWithdrawalStatusChange( - ctx context.Context, - wopid WithdrawalIdentifier, - out chan WithdrawalOperationStatus, - errs chan error, -) { - - notifications := make(chan *Notification) - errsInternal := make(chan error) +func (db *C2ECPostgres) UpdateTransfer( + requestUid []byte, + timestamp int64, + status int16, + retries int16, +) error { - go func() { + _, err := db.pool.Exec( + db.ctx, + PS_UPDATE_TRANSFER, + timestamp, + status, + retries, + requestUid, + ) + if err != nil { + LogError("postgres", err) + return err + } + LogInfo("postgres", "query="+PS_UPDATE_TRANSFER) + return nil +} - channel := "w_" + string(wopid) - listener, err := NewListener(channel, notifications) - if err != nil { - errsInternal <- err - return - } - LogInfo("postgres", fmt.Sprintf("listening for status change of wopid=%s", wopid)) +func (db *C2ECPostgres) GetTransfers(start int, delta int) ([]*Transfer, error) { - if err := listener.Listen(ctx); err != nil { - LogError("postgres", err) - errs <- err - } - }() - - for { - select { - case e := <-errsInternal: - LogError("postgres", e) - errs <- e - case <-ctx.Done(): - err := ctx.Err() - msg := "context sent done signal while listening for status change" - if err != nil { - LogError("postgres", err) - } - LogWarn("postgres", msg) - errs <- errors.New(msg) - case n := <-notifications: - LogInfo("postgres", fmt.Sprintf("received notification for channel %s: %s", n.Channel, n.Payload)) - out <- WithdrawalOperationStatus(n.Payload) - } + query := PS_GET_TRANSFERS_ASC + if delta < 0 { + query = PS_GET_TRANSFERS_DESC } -} -func Listen(ctx context.Context, channel string) (chan *Notification, chan error, error) { + limit := math.Abs(float64(delta)) + offset := start + if delta < 0 { + offset = start - int(limit) + } + if offset < 0 { + offset = 0 + } - out := make(chan *Notification) - errs := make(chan error) + var row pgx.Rows + var err error + if start < 0 { + // use MAX(id) instead of a concrete id, because start + // identifier was negative. Inidicates to read the most + // recent ids. + row, err = db.pool.Query( + db.ctx, + query, + limit, + "MAX("+TRANSFER_FIELD_NAME_ROW_ID+")", + ) + } else { + row, err = db.pool.Query( + db.ctx, + query, + limit, + offset, + ) + } - listener, err := NewListener(channel, out) + LogInfo("postgres", "query="+query) if err != nil { - return nil, nil, err - } + LogError("postgres", err) + if row != nil { + row.Close() + } + return nil, err + } else { - go func() { + defer row.Close() - err := listener.Listen(ctx) + transfers, err := pgx.CollectRows(row, pgx.RowToAddrOfStructByName[Transfer]) if err != nil { - errs <- err + LogError("postgres", err) + return nil, err } - }() - return out, errs, nil + return removeNulls(transfers), nil + } } // Sets up a a listener for the given channel. // Notifications will be sent through the out channel. -func NewListener( +func (db *C2ECPostgres) NewListener( cn string, out chan *Notification, -) (*pgxlisten.Listener, error) { +) (func(context.Context) error, error) { connectionString := PostgresConnectionString(&CONFIG.Database) + pgHost := os.Getenv("PGHOST") + if pgHost != "" { + LogInfo("postgres", "pghost was set") + connectionString = pgHost + } cfg, err := pgx.ParseConfig(connectionString) if err != nil { @@ -687,5 +705,16 @@ func NewListener( return nil })) - return listener, nil + return listener.Listen, nil +} + +func removeNulls[T any](l []*T) []*T { + + withoutNulls := make([]*T, 0) + for _, e := range l { + if e != nil { + withoutNulls = append(withoutNulls, e) + } + } + return withoutNulls } |