commit 5bef9183bf0fe7fd329e934b9e73b9c758ac8098
parent 829510aa743f6f24474bfe98ef4c2b395834616c
Author: Joel-Haeberli <haebu@rubigen.ch>
Date: Sat, 18 May 2024 09:25:15 +0200
fix: transfer
Diffstat:
12 files changed, 76 insertions(+), 73 deletions(-)
diff --git a/c2ec/api-wire-gateway.go b/c2ec/api-wire-gateway.go
@@ -3,7 +3,7 @@ package main
import (
"errors"
"log"
- http "net/http"
+ "net/http"
"strconv"
"time"
)
diff --git a/c2ec/c2ec-config.yaml b/c2ec/c2ec-config.yaml
@@ -8,7 +8,7 @@ c2ec:
fail-on-missing-attestors: false # forced if prod=true
credit-account: "payto://IBAN/CH50030202099498" # this account must be specified at the providers backends as well
currency: "CHF"
- max-retries: 3
+ max-retries: 100
retry-delay-ms: 1000
wire-gateway:
username: "wire"
diff --git a/c2ec/db-postgres.go b/c2ec/db-postgres.go
@@ -140,6 +140,9 @@ const PS_GET_TRANSFERS_DESC_MAX = "SELECT * FROM " + TRANSFER_TABLE_NAME +
" OFFSET ((SELECT COUNT(*) FROM " + TRANSFER_TABLE_NAME +
" WHERE " + TRANSFER_FIELD_NAME_STATUS + "=0)-1)" // TODO Timestamp based offset (-time since request)
+const PS_GET_TRANSFERS_BY_STATUS = "SELECT * FROM " + TRANSFER_TABLE_NAME +
+ " WHERE " + TRANSFER_FIELD_NAME_STATUS + "=$1"
+
// Postgres implementation of the C2ECDatabase
type C2ECPostgres struct {
C2ECDatabase
@@ -686,7 +689,6 @@ func (db *C2ECPostgres) GetTransferById(requestUid []byte) (*Transfer, error) {
LogInfo("postgres", "query="+PS_GET_TRANSFER_BY_ID)
return transfer, nil
}
-
}
func (db *C2ECPostgres) AddTransfer(
@@ -808,6 +810,35 @@ func (db *C2ECPostgres) GetTransfers(start int, delta int) ([]*Transfer, error)
}
}
+func (db *C2ECPostgres) GetTransfersByState(status int) ([]*Transfer, error) {
+
+ if rows, err := db.pool.Query(
+ db.ctx,
+ PS_GET_TRANSFERS_BY_STATUS,
+ status,
+ ); err != nil {
+ LogError("postgres", err)
+ if rows != nil {
+ rows.Close()
+ }
+ return nil, err
+ } else {
+
+ defer rows.Close()
+
+ transfers, err := pgx.CollectRows(rows, pgx.RowToAddrOfStructByName[Transfer])
+ if err != nil {
+ LogError("postgres", err)
+ return nil, err
+ }
+
+ // this will fill up the logs...
+ // LogInfo("postgres", "query="+PS_GET_TRANSFERS_BY_STATUS)
+ // LogInfo("postgres", "size of transfer list="+strconv.Itoa(len(transfers)))
+ return removeNulls(transfers), nil
+ }
+}
+
// Sets up a a listener for the given channel.
// Notifications will be sent through the out channel.
func (db *C2ECPostgres) NewListener(
diff --git a/c2ec/db/test_c2ec_simulation.sql b/c2ec/db/test_c2ec_simulation.sql
@@ -1,19 +0,0 @@
-BEGIN;
-
-SET search_path TO c2ec;
-
-DROP TABLE IF EXISTS p_id;
-
-INSERT INTO provider (name, payto_target_type, backend_base_url, backend_credentials)
- VALUES ('Simulation', 'void', 'will be simulated', 'no creds');
-
-SELECT provider_id INTO p_id FROM provider WHERE name = 'Simulation';
-
-INSERT INTO terminal (access_token, description, provider_id)
- VALUES ('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa', 'this is a simulated terminal', (SELECT * FROM p_id));
-
-DROP TABLE IF EXISTS p_id;
-
-COMMIT;
-
-SELECT * FROM provider;
diff --git a/c2ec/db/test_c2ec_simulation_rollback.sql b/c2ec/db/test_c2ec_simulation_rollback.sql
@@ -1,20 +0,0 @@
-BEGIN;
-
-SET search_path TO c2ec;
-
-DROP TABLE IF EXISTS p_r_id;
-DROP TABLE IF EXISTS t_r_id;
-
-SELECT provider_id INTO p_r_id FROM provider WHERE name = 'Simulation';
-SELECT terminal_id INTO t_r_id FROM terminal WHERE provider_id = (SELECT * FROM p_r_id);
-
-DELETE FROM withdrawal WHERE terminal_id = (SELECT * FROM t_r_id);
-DELETE FROM terminal WHERE provider_id = (SELECT * FROM p_r_id);
-DELETE FROM provider WHERE provider_id = (SELECT * FROM p_r_id);
-
-DROP TABLE IF EXISTS p_r_id;
-DROP TABLE IF EXISTS t_r_id;
-
-COMMIT;
-
-SELECT * FROM provider;
-\ No newline at end of file
diff --git a/c2ec/exponential-backoff.go b/c2ec/exponential-backoff.go
@@ -61,7 +61,7 @@ func randomizeBackoff(backoff int64) int64 {
if subtracted < 0 {
return 0
}
- return backoff - randomizedThreshold
+ return subtracted
}
return backoff + randomizedThreshold
}
diff --git a/c2ec/logger.go b/c2ec/logger.go
@@ -37,6 +37,10 @@ func LogInfo(src string, msg string) {
func logAppendError(src string, level LogLevel, err error) {
+ if err == nil {
+ fmt.Println("wanted to log from " + src + " but err was nil")
+ return
+ }
logAppend(src, level, err.Error())
}
diff --git a/c2ec/main.go b/c2ec/main.go
@@ -87,7 +87,7 @@ func main() {
transferCtx, transferCancel := context.WithCancel(context.Background())
defer transferCancel()
transferErrs := make(chan error)
- RunRefunder(transferCtx, transferErrs)
+ RunTransferrer(transferCtx, transferErrs)
LogInfo("main", "refunder is running")
router := http.NewServeMux()
@@ -127,7 +127,7 @@ func main() {
case <-transferCtx.Done():
transferCancel() // first run old cancellation function
transferCtx, transferCancel = context.WithCancel(context.Background())
- RunRefunder(transferCtx, transferErrs)
+ RunTransferrer(transferCtx, transferErrs)
case attestationError := <-attestorErrs:
LogError("main-from-proc-attestor", attestationError)
case retryError := <-retryErrs:
diff --git a/c2ec/proc-attestor.go b/c2ec/proc-attestor.go
@@ -30,7 +30,7 @@ func RunAttestor(
func attestationCallback(notification *Notification, errs chan error) {
- LogInfo("attestor", fmt.Sprintf("retrieved information on channel=%s with payload=%s", notification.Channel, notification.Payload))
+ LogInfo("proc-attestor", fmt.Sprintf("retrieved information on channel=%s with payload=%s", notification.Channel, notification.Payload))
// The payload is formatted like: "{PROVIDER_NAME}|{WITHDRAWAL_ID}|{PROVIDER_TRANSACTION_ID}"
// the validation is strict. This means, that the dispatcher emits an error
@@ -60,7 +60,7 @@ func attestationCallback(notification *Notification, errs chan error) {
transaction, err := client.GetTransaction(providerTransactionId)
if err != nil {
- LogError("attestor", err)
+ LogError("proc-attestor", err)
prepareRetryOrAbort(withdrawalRowId, errs)
return
}
@@ -80,7 +80,7 @@ func finaliseOrSetRetry(
if transaction == nil {
err := errors.New("transaction was nil. will set retry or abort")
- LogError("attestor", err)
+ LogError("proc-attestor", err)
errs <- err
prepareRetryOrAbort(withdrawalRowId, errs)
return
@@ -94,7 +94,7 @@ func finaliseOrSetRetry(
err := DB.FinaliseWithdrawal(withdrawalRowId, CONFIRMED, completionProof)
if err != nil {
- LogError("attestor", err)
+ LogError("proc-attestor", err)
prepareRetryOrAbort(withdrawalRowId, errs)
}
} else {
@@ -104,7 +104,7 @@ func finaliseOrSetRetry(
if transaction.AbortWithdrawal() {
err := DB.FinaliseWithdrawal(withdrawalRowId, ABORTED, completionProof)
if err != nil {
- LogError("attestor", err)
+ LogError("proc-attestor", err)
prepareRetryOrAbort(withdrawalRowId, errs)
return
}
@@ -130,26 +130,24 @@ func prepareRetryOrAbort(
withdrawal, err := DB.GetWithdrawalById(withdrawalRowId)
if err != nil {
- LogError("attestor", err)
+ LogError("proc-attestor", err)
errs <- err
return
}
- // TODO retry will not work like this at the moment
- execRetry := ShouldStartRetry(time.Unix(*withdrawal.LastRetryTs, 0), int(withdrawal.RetryCounter), MAX_BACKOFF_MS)
- if !execRetry {
- LogInfo("attestor", fmt.Sprintf("max retries for withdrawal with id=%d was reached. withdrawal is aborted.", withdrawal.WithdrawalRowId))
+ if withdrawal.RetryCounter >= CONFIG.Server.MaxRetries {
+
+ LogInfo("proc-attestor", fmt.Sprintf("max retries for withdrawal with id=%d was reached. withdrawal is aborted.", withdrawal.WithdrawalRowId))
err := DB.FinaliseWithdrawal(withdrawalRowId, ABORTED, make([]byte, 0))
if err != nil {
- LogError("attestor", err)
+ LogError("proc-attestor", err)
}
} else {
lastRetryTs := time.Now().Unix()
err := DB.SetLastRetry(withdrawalRowId, lastRetryTs)
if err != nil {
- LogError("attestor", err)
+ LogError("proc-attestor", err)
}
}
-
}
diff --git a/c2ec/proc-retrier.go b/c2ec/proc-retrier.go
@@ -18,44 +18,49 @@ func RunRetrier(ctx context.Context, errs chan error) {
make(chan *Notification, RETRY_CHANNEL_BUFFER_SIZE),
errs,
)
+
+ go func() {
+ for {
+ time.Sleep(time.Duration(CONFIG.Server.RetryDelayMs) * time.Millisecond)
+
+ }
+ }()
}
func retryCallback(n *Notification, errs chan error) {
withdrawalId, err := strconv.Atoi(n.Payload)
if err != nil {
- LogError("retrier", err)
+ LogError("proc-retrier", err)
errs <- err
return
}
withdrawal, err := DB.GetWithdrawalById(withdrawalId)
if err != nil {
- LogError("retrier", err)
+ LogError("proc-retrier", err)
errs <- err
return
}
provider, err := DB.GetProviderByTerminal(withdrawal.TerminalId)
if err != nil {
- LogError("retrier", err)
+ LogError("proc-retrier", err)
errs <- err
return
}
err = DB.SetRetryCounter(withdrawalId, int(withdrawal.RetryCounter)+1)
if err != nil {
- LogError("retrier", err)
+ LogError("proc-retrier", err)
errs <- err
return
}
- time.Sleep(time.Duration(CONFIG.Server.RetryDelayMs) * time.Millisecond)
-
client := PROVIDER_CLIENTS[provider.Name]
transaction, err := client.GetTransaction(*withdrawal.ProviderTransactionId)
if err != nil {
- LogError("retrier", err)
+ LogError("proc-retrier", err)
errs <- err
return
}
diff --git a/c2ec/proc-transfer.go b/c2ec/proc-transfer.go
@@ -20,7 +20,7 @@ const TRANSFER_STATUS_FAILED = -1
const MAX_TRANSFER_BACKOFF_MS = 24 * 60 * 60 * 1000 // 1 day
// Sets up and runs an attestor in the background. This must be called at startup.
-func RunRefunder(
+func RunTransferrer(
ctx context.Context,
errs chan error,
) {
@@ -34,7 +34,6 @@ func RunRefunder(
)
go func() {
-
for {
time.Sleep(REFUND_RETRY_INTERVAL_SECONDS * time.Second)
executePendingTransfers(errs)
@@ -44,7 +43,7 @@ func RunRefunder(
func transferCallback(notification *Notification, errs chan error) {
- LogInfo("refunder", fmt.Sprintf("retrieved information on channel=%s with payload=%s", notification.Channel, notification.Payload))
+ LogInfo("proc-transfer", fmt.Sprintf("retrieved information on channel=%s with payload=%s", notification.Channel, notification.Payload))
transferRequestUidBase64 := notification.Payload
if transferRequestUidBase64 == "" {
@@ -60,7 +59,7 @@ func transferCallback(notification *Notification, errs chan error) {
transfer, err := DB.GetTransferById(transferRequestUid)
if err != nil {
- LogError("refunder", err)
+ LogError("proc-transfer", err)
transferFailed(transfer, errs)
errs <- err
}
@@ -74,7 +73,7 @@ func transferCallback(notification *Notification, errs chan error) {
provider, err := DB.GetTerminalProviderByPaytoTargetType(paytoTargetType)
if err != nil {
- LogError("refunder", err)
+ LogError("proc-transfer", err)
transferFailed(transfer, errs)
errs <- err
}
@@ -86,7 +85,7 @@ func transferCallback(notification *Notification, errs chan error) {
err = client.Refund(tid)
if err != nil {
- LogError("refunder", err)
+ LogError("proc-transfer", err)
transferFailed(transfer, errs)
return
}
@@ -107,6 +106,7 @@ func executePendingTransfers(errs chan error) {
transfers, err := DB.GetTransfersByState(TRANSFER_STATUS_RETRY)
if err != nil {
LogError("proc-transfer", err)
+ errs <- err
return
}
@@ -115,6 +115,7 @@ func executePendingTransfers(errs chan error) {
shouldRetry := ShouldStartRetry(time.Unix(t.TransferTs, 0), int(t.Retries), MAX_TRANSFER_BACKOFF_MS)
if !shouldRetry {
LogInfo("proc-transfer", fmt.Sprintf("not retrying transfer %d, because backoff not yet exceeded", t.RowId))
+ continue
}
paytoTargetType, tid, err := ParsePaytoWalleeTransaction(t.CreditAccount)
@@ -140,11 +141,11 @@ func executePendingTransfers(errs chan error) {
err = client.Refund(tid)
if err != nil {
LogError("proc-transfer", err)
+ transferFailed(t, errs)
errs <- err
continue
}
}
- close(errs)
}
func transferFailed(
diff --git a/c2ec/wallee-client.go b/c2ec/wallee-client.go
@@ -147,6 +147,10 @@ func (w *WalleeClient) GetTransaction(transactionId string) (ProviderTransaction
func (sc *WalleeClient) FormatPayto(w *Withdrawal) string {
+ if w == nil || w.ProviderTransactionId == nil {
+ LogError("wallee-client", errors.New("withdrawal or provider transaction identifier was nil"))
+ return ""
+ }
return fmt.Sprintf("payto://wallee-transaction/%s", *w.ProviderTransactionId)
}