cashless2ecash

cashless2ecash: pay with cards for digital cash (experimental)
Log | Files | Refs | README

commit 829510aa743f6f24474bfe98ef4c2b395834616c
parent cd1a9c2cbe1ac7ed2e467ce8611f934298523a6e
Author: Joel-Haeberli <haebu@rubigen.ch>
Date:   Fri, 17 May 2024 17:46:45 +0200

fix: refund and transfer retries

Diffstat:
M.gitignore | 2++
Mc2ec/api-wire-gateway.go | 5++---
Mc2ec/config.go | 1+
Mc2ec/db.go | 4++++
Ac2ec/exponential-backoff.go | 67+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Ac2ec/exponential-backoff_test.go | 62++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mc2ec/main.go | 98+++++++++++++++++++++++++++++++++++++++++--------------------------------------
Mc2ec/payto.go | 11+++++------
Mc2ec/proc-attestor.go | 6++++--
Mc2ec/proc-transfer.go | 108+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------
Mc2ec/wallee-client.go | 35++++++++++++++++++++++++-----------
Mc2ec/wallee-client_test.go | 5+++++
Mc2ec/wallee-models.go | 43+++++++++++++++++++++++++++++++++++--------
13 files changed, 346 insertions(+), 101 deletions(-)

diff --git a/.gitignore b/.gitignore @@ -7,6 +7,8 @@ c2ec/.vscode c2ec/c2ec-log.txt cli/c2ec-cli +setup-local + LocalMakefile .~lock.*.odg# diff --git a/c2ec/api-wire-gateway.go b/c2ec/api-wire-gateway.go @@ -210,8 +210,7 @@ func transfer(res http.ResponseWriter, req *http.Request) { return } - ptid := strconv.Itoa(tid) - w, err := DB.GetWithdrawalByProviderTransactionId(ptid) + w, err := DB.GetWithdrawalByProviderTransactionId(tid) if err != nil || w == nil { LogError("wire-gateway-api", err) res.WriteHeader(HTTP_INTERNAL_SERVER_ERROR) @@ -224,7 +223,7 @@ func transfer(res http.ResponseWriter, req *http.Request) { res.WriteHeader(HTTP_INTERNAL_SERVER_ERROR) return } - refundClient.Refund(ptid) + refundClient.Refund(tid) } } diff --git a/c2ec/config.go b/c2ec/config.go @@ -224,6 +224,7 @@ func ParseIni(content []byte) (*C2ECConfig, error) { connstr := value.String() + // TODO do proper err = os.Setenv("PGHOST", connstr) if err != nil { return nil, err diff --git a/c2ec/db.go b/c2ec/db.go @@ -220,6 +220,10 @@ type C2ECDatabase interface { // id shall be used as starting point. GetTransfers(start int, delta int) ([]*Transfer, error) + // Returns the transfer entries in the given state. + // This can be used for retry operations. + GetTransfersByState(status int) ([]*Transfer, error) + // A listener can listen for notifications ont the specified // channel. Returns a listen function, which must be called // by the caller to start listening on the channel. The returned diff --git a/c2ec/exponential-backoff.go b/c2ec/exponential-backoff.go @@ -0,0 +1,67 @@ +package main + +import ( + "fmt" + "math" + "math/rand" + "time" +) + +const EXPONENTIAL_BACKOFF_BASE = 2 + +const RANDOMIZATION_THRESHOLD_FACTOR = 0.2 // +/- 20% + +/* +Generic implementation of a limited exponential backoff +algorithm. It includes a randomization to prevent +self-synchronization issues. + +Parameters: + + - lastExecution: time of the last execution + - retryCount : number of the retries + - limitMs : field shall be the maximal milliseconds to backoff before retry happens +*/ +func ShouldStartRetry( + lastExecution time.Time, + retryCount int, + limitMs int, +) bool { + + backoffMs := exponentialBackoffMs(retryCount) + randomizedBackoff := randomizeBackoff(backoffMs) + if backoffMs > int64(limitMs) { + LogInfo("exponential-backoff", fmt.Sprintf("backoff limit exceeded. setting manual limit: %d", limitMs)) + randomizedBackoff = int64(limitMs) + } + + now := time.Now().Unix() + backoffTime := lastExecution.Unix() + randomizedBackoff + if now > backoffTime { + LogInfo("exponential-backoff", "backoff time not exceeded. do not retry yet.") + return false + } + return true +} + +func exponentialBackoffMs(retries int) int64 { + + return int64(math.Pow(EXPONENTIAL_BACKOFF_BASE, float64(retries))) +} + +func randomizeBackoff(backoff int64) int64 { + + // it's about randomizing on millisecond base... we mustn't care about rounding + threshold := int64(float64(backoff)*RANDOMIZATION_THRESHOLD_FACTOR) + 1 // +1 to guarantee positive threshold + randomizedThreshold := rand.Int63n(threshold) + subtract := rand.Int31n(100) // upper boundary is exclusive (value is between 0 and 99) + + if subtract < 50 { + subtracted := backoff - randomizedThreshold + if subtracted < 0 { + return 0 + } + return backoff - randomizedThreshold + } + return backoff + randomizedThreshold +} diff --git a/c2ec/exponential-backoff_test.go b/c2ec/exponential-backoff_test.go @@ -0,0 +1,62 @@ +package main + +import ( + "fmt" + "testing" + "time" +) + +func TestShouldRetryYes(t *testing.T) { + + lastExecution := time.Now().Add(-(time.Duration(10 * time.Second))) // 10 seconds ago + retries := 4 // no retries + limitMs := 1000 // second + + retry := ShouldStartRetry(lastExecution, retries, limitMs) + if !retry { + fmt.Println("expected retry = true but was false") + t.FailNow() + } +} + +func TestShouldRetryNo(t *testing.T) { + + lastExecution := time.Now().Add(-(time.Duration(10 * time.Second))) // 10 seconds ago + retries := 3 // three retries + limitMs := 1000 // second + + retry := ShouldStartRetry(lastExecution, retries, limitMs) + if retry { + fmt.Println("expected retry = false but was true") + t.FailNow() + } +} + +func TestBackoff(t *testing.T) { + + expectations := []int{1, 2, 4, 8, 16, 32, 64, 128, 256} + for i := range []int{0, 1, 2, 3, 4, 5, 6, 7, 8} { + backoff := exponentialBackoffMs(i) + if backoff != int64(expectations[i]) { + fmt.Printf("expected %d, but got %d", expectations[i], backoff) + t.FailNow() + } + } +} + +func TestRandomization(t *testing.T) { + + input := 100 + lowerBoundary := 80 // -20% + upperBoundary := 120 // +20% + rounds := 1000 + currentRound := 0 + for currentRound < rounds { + randomized := randomizeBackoff(int64(input)) + if randomized < int64(lowerBoundary) || randomized > int64(upperBoundary) { + fmt.Printf("round %d failed. Expected value between %d and %d but got %d", currentRound, lowerBoundary, upperBoundary, randomized) + t.FailNow() + } + currentRound++ + } +} diff --git a/c2ec/main.go b/c2ec/main.go @@ -87,54 +87,17 @@ func main() { transferCtx, transferCancel := context.WithCancel(context.Background()) defer transferCancel() transferErrs := make(chan error) - RunRefunder(attestorCtx, attestorErrs) + RunRefunder(transferCtx, transferErrs) LogInfo("main", "refunder is running") router := http.NewServeMux() + routerErrs := make(chan error) setupBankIntegrationRoutes(router) - setupWireGatewayRoutes(router) - setupTerminalRoutes(router) - server := http.Server{ - Handler: router, - } - - routerErrs := make(chan error) - if CONFIG.Server.UseUnixDomainSocket { - - socket, err := net.Listen("unix", CONFIG.Server.UnixSocketPath) - if err != nil { - panic("failed listening on socket: " + err.Error()) - } - - // cleans up socket when process fails and is shutdown. - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, syscall.SIGTERM) - go func() { - <-c - os.Remove(CONFIG.Server.UnixSocketPath) - os.Exit(1) - }() - - go func() { - LogInfo("main", "serving at unix-domain-socket "+server.Addr) - if err = server.Serve(socket); err != nil { - routerErrs <- err - } - }() - } else { - - go func() { - server.Addr = fmt.Sprintf("%s:%d", CONFIG.Server.Host, CONFIG.Server.Port) - LogInfo("main", "serving at "+server.Addr) - if err = server.ListenAndServe(); err != nil { - routerErrs <- err - } - }() - } + startListening(router, routerErrs) // since listening for incoming request, attesting payments and // retrying payments are separated processes who can fail @@ -151,25 +114,26 @@ func main() { LogError("main", routerError) attestorCancel() retryCancel() + transferCancel() panic(routerError) - case attestationError := <-attestorErrs: - LogError("main from attestor", attestationError) case <-attestorCtx.Done(): attestorCancel() // first run old cancellation function attestorCtx, attestorCancel = context.WithCancel(context.Background()) RunAttestor(attestorCtx, attestorErrs) - case retryError := <-retryErrs: - LogError("main from retrier", retryError) case <-retryCtx.Done(): retryCancel() // first run old cancellation function retryCtx, retryCancel = context.WithCancel(context.Background()) RunRetrier(retryCtx, retryErrs) - case transferError := <-transferErrs: - LogError("main from refunder", transferError) case <-transferCtx.Done(): transferCancel() // first run old cancellation function transferCtx, transferCancel = context.WithCancel(context.Background()) - RunRefunder(retryCtx, retryErrs) + RunRefunder(transferCtx, transferErrs) + case attestationError := <-attestorErrs: + LogError("main-from-proc-attestor", attestationError) + case retryError := <-retryErrs: + LogError("main-from-proc-retrier", retryError) + case transferError := <-transferErrs: + LogError("main-from-proc-transfer", transferError) } } } @@ -316,3 +280,43 @@ func setupTerminalRoutes(router *http.ServeMux) { handleWithdrawalAbortTerminal, ) } + +func startListening(router *http.ServeMux, errs chan error) { + + server := http.Server{ + Handler: router, + } + + if CONFIG.Server.UseUnixDomainSocket { + + socket, err := net.Listen("unix", CONFIG.Server.UnixSocketPath) + if err != nil { + panic("failed listening on socket: " + err.Error()) + } + + // cleans up socket when process fails and is shutdown. + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + go func() { + <-c + os.Remove(CONFIG.Server.UnixSocketPath) + os.Exit(1) + }() + + go func() { + LogInfo("main", "serving at unix-domain-socket "+server.Addr) + if err = server.Serve(socket); err != nil { + errs <- err + } + }() + } else { + + go func() { + server.Addr = fmt.Sprintf("%s:%d", CONFIG.Server.Host, CONFIG.Server.Port) + LogInfo("main", "serving at "+server.Addr) + if err := server.ListenAndServe(); err != nil { + errs <- err + } + }() + } +} diff --git a/c2ec/payto.go b/c2ec/payto.go @@ -3,7 +3,6 @@ package main import ( "errors" "fmt" - "strconv" "strings" ) @@ -30,18 +29,18 @@ var REGISTERED_TARGET_TYPES = []string{ // This method parses a payto-uri (RFC 8905: https://www.rfc-editor.org/rfc/rfc8905.html) // The method only parses the target type "wallee-transaction" as specified // in the payto GANA registry (https://gana.gnunet.org/payto-payment-target-types/payto_payment_target_types.html) -func ParsePaytoWalleeTransaction(uri string) (string, int, error) { +func ParsePaytoWalleeTransaction(uri string) (string, string, error) { if t, i, err := ParsePaytoUri(uri); err != nil { - tid, err := strconv.Atoi(i) + _, err := decodeCrock(i) if err != nil { - return "", -1, errors.New("invalid transaction-id for wallee-transaction") + return "", "", errors.New("invalid transaction-id for wallee-transaction") } - return t, tid, nil + return t, i, nil } else { - return t, -1, err + return t, "", err } } diff --git a/c2ec/proc-attestor.go b/c2ec/proc-attestor.go @@ -11,6 +11,7 @@ import ( const PAYMENT_NOTIFICATION_CHANNEL_BUFFER_SIZE = 10 const PS_PAYMENT_NOTIFICATION_CHANNEL = "payment_notification" +const MAX_BACKOFF_MS = 30 * 60000 // thirty minutes // Sets up and runs an attestor in the background. This must be called at startup. func RunAttestor( @@ -134,8 +135,9 @@ func prepareRetryOrAbort( return } - if withdrawal.RetryCounter >= CONFIG.Server.MaxRetries { - + // TODO retry will not work like this at the moment + execRetry := ShouldStartRetry(time.Unix(*withdrawal.LastRetryTs, 0), int(withdrawal.RetryCounter), MAX_BACKOFF_MS) + if !execRetry { LogInfo("attestor", fmt.Sprintf("max retries for withdrawal with id=%d was reached. withdrawal is aborted.", withdrawal.WithdrawalRowId)) err := DB.FinaliseWithdrawal(withdrawalRowId, ABORTED, make([]byte, 0)) if err != nil { diff --git a/c2ec/proc-transfer.go b/c2ec/proc-transfer.go @@ -5,10 +5,11 @@ import ( "encoding/base64" "errors" "fmt" - "strconv" "time" ) +const REFUND_RETRY_INTERVAL_SECONDS = 1 + const REFUND_CHANNEL_BUFFER_SIZE = 10 const PS_REFUND_CHANNEL = "transfer" @@ -16,6 +17,8 @@ const TRANSFER_STATUS_SUCCESS = 0 const TRANSFER_STATUS_RETRY = 1 const TRANSFER_STATUS_FAILED = -1 +const MAX_TRANSFER_BACKOFF_MS = 24 * 60 * 60 * 1000 // 1 day + // Sets up and runs an attestor in the background. This must be called at startup. func RunRefunder( ctx context.Context, @@ -29,6 +32,14 @@ func RunRefunder( make(chan *Notification, REFUND_CHANNEL_BUFFER_SIZE), errs, ) + + go func() { + + for { + time.Sleep(REFUND_RETRY_INTERVAL_SECONDS * time.Second) + executePendingTransfers(errs) + } + }() } func transferCallback(notification *Notification, errs chan error) { @@ -73,7 +84,7 @@ func transferCallback(notification *Notification, errs chan error) { errs <- errors.New("no provider client registered for provider " + provider.Name) } - err = client.Refund(strconv.Itoa(tid)) + err = client.Refund(tid) if err != nil { LogError("refunder", err) transferFailed(transfer, errs) @@ -91,36 +102,85 @@ func transferCallback(notification *Notification, errs chan error) { } } -func transferScheduler() { - // TODO somehow schedule transfer jobs periodically -} +func executePendingTransfers(errs chan error) { -func transferFailed( - transfer *Transfer, - errs chan error, -) { + transfers, err := DB.GetTransfersByState(TRANSFER_STATUS_RETRY) + if err != nil { + LogError("proc-transfer", err) + return + } + + for _, t := range transfers { - // TODO: Delay retry somehow... randomize due to self-synch problems + shouldRetry := ShouldStartRetry(time.Unix(t.TransferTs, 0), int(t.Retries), MAX_TRANSFER_BACKOFF_MS) + if !shouldRetry { + LogInfo("proc-transfer", fmt.Sprintf("not retrying transfer %d, because backoff not yet exceeded", t.RowId)) + } + + paytoTargetType, tid, err := ParsePaytoWalleeTransaction(t.CreditAccount) + if err != nil { + LogError("proc-transfer", err) + errs <- err + continue + } - if transfer.Retries > 2 { - err := DB.UpdateTransfer( - transfer.RequestUid, - time.Now().Unix(), - TRANSFER_STATUS_FAILED, // transfer ultimatively failed. - transfer.Retries, - ) + provider, err := DB.GetTerminalProviderByPaytoTargetType(paytoTargetType) if err != nil { + LogError("proc-transfer", err) errs <- err + continue + } + + client := PROVIDER_CLIENTS[provider.Name] + if client == nil { + errs <- errors.New("no provider client registered for provider " + provider.Name) + continue } - } else { - err := DB.UpdateTransfer( - transfer.RequestUid, - time.Now().Unix(), - TRANSFER_STATUS_RETRY, // retry transfer. - transfer.Retries+1, - ) + + err = client.Refund(tid) if err != nil { + LogError("proc-transfer", err) errs <- err + continue } } + close(errs) +} + +func transferFailed( + transfer *Transfer, + errs chan error, +) { + + err := DB.UpdateTransfer( + transfer.RequestUid, + time.Now().Unix(), + TRANSFER_STATUS_RETRY, // retry transfer. + transfer.Retries+1, + ) + if err != nil { + errs <- err + } + + // if transfer.Retries > 2 { + // err := DB.UpdateTransfer( + // transfer.RequestUid, + // time.Now().Unix(), + // TRANSFER_STATUS_FAILED, // transfer ultimatively failed. + // transfer.Retries, + // ) + // if err != nil { + // errs <- err + // } + // } else { + // err := DB.UpdateTransfer( + // transfer.RequestUid, + // time.Now().Unix(), + // TRANSFER_STATUS_RETRY, // retry transfer. + // transfer.Retries+1, + // ) + // if err != nil { + // errs <- err + // } + //} } diff --git a/c2ec/wallee-client.go b/c2ec/wallee-client.go @@ -54,10 +54,10 @@ func (wt *WalleeTransaction) AbortWithdrawal() bool { strings.EqualFold(string(wt.State), string(StateDecline)) } -func (wt *WalleeTransaction) FormatPayto() string { +// func (wt *WalleeTransaction) FormatPayto() string { - return fmt.Sprintf("payto://wallee-transaction/%d", wt.Id) -} +// return fmt.Sprintf("payto://wallee-transaction/%d", wt.Id) +// } func (wt *WalleeTransaction) Bytes() []byte { @@ -163,15 +163,28 @@ func (w *WalleeClient) Refund(transactionId string) error { return err } - // TODO generate refund object. needs Completion and Transaction IDs + withdrawal, err := DB.GetWithdrawalByProviderTransactionId(transactionId) + if err != nil || withdrawal == nil { + if err != nil { + return err + } else { + return errors.New("unable to find withdrawal belonging to transactionId=" + transactionId) + } + } + + decodedWalleeTransaction, err := NewJsonCodec[WalleeTransaction]().Decode(bytes.NewBuffer(withdrawal.CompletionProof)) + if err != nil { + return err + } + refund := &WalleeRefund{ - Amount: 10, - Completion: 10, - ExternalID: "", - MerchantReference: "", - Reductions: []WalleeLineItemReduction{}, - Transaction: 10, - Type: "", + Amount: decodedWalleeTransaction.CompletedAmount, + ExternalID: encodeCrock(withdrawal.Wopid), + MerchantReference: decodedWalleeTransaction.MerchantReference, + Transaction: WalleeRefundTransaction{ + Id: int64(decodedWalleeTransaction.Id), + }, + Type: "", } _, status, err := HttpPost[WalleeRefund, any](url, hdrs, refund, NewJsonCodec[WalleeRefund](), nil) diff --git a/c2ec/wallee-client_test.go b/c2ec/wallee-client_test.go @@ -8,6 +8,7 @@ import ( "testing" ) +const ENABLE_WALLEE_INTEGRATION_TEST = false const TEST_SPACE_ID = 0 const TEST_USER_ID = 0 const TEST_ACCESS_TOKEN = "" @@ -59,6 +60,10 @@ func TestWalleeMac(t *testing.T) { func TestTransactionSearchIntegration(t *testing.T) { + if !ENABLE_WALLEE_INTEGRATION_TEST { + return + } + filter := WalleeSearchFilter{ FieldName: "merchantReference", Operator: EQUALS, diff --git a/c2ec/wallee-models.go b/c2ec/wallee-models.go @@ -64,16 +64,43 @@ type WalleeTransactionCompletion struct { Version int `json:"version"` } +/* + { + "amount": "14.00", + "externalId": "1", + "merchantReference": "1BQMAGTYTQVM0B1EM40PDS4H4REVMNCEN9867SJQ26Q43C38RDDG", + "transaction": { + "id": 213103343 + }, + "type": "MERCHANT_INITIATED_ONLINE" + } +*/ type WalleeRefund struct { - Amount float64 `json:"amount"` - Completion int64 `json:"completion"` // ID of WalleeTransactionCompletion - ExternalID string `json:"externalId"` // Unique per transaction - MerchantReference string `json:"merchantReference"` - Reductions []WalleeLineItemReduction `json:"reductions"` - Transaction int64 `json:"transaction"` // ID of WalleeTransaction - Type string `json:"type"` // Refund Type + Amount float64 `json:"amount"` + ExternalID string `json:"externalId"` // idempotence support + MerchantReference string `json:"merchantReference"` + Transaction WalleeRefundTransaction `json:"transaction"` + /* + Refund Type (for testing (not triggered at processor): MERCHANT_INITIATED_OFFLINE + For real world (triggering at the processor): MERCHANT_INITIATED_ONLINE + */ + Type string `json:"type"` } +type WalleeRefundTransaction struct { + Id int64 `json:"id"` +} + +// type WalleeRefund struct { +// Amount float64 `json:"amount"` +// Completion int64 `json:"completion"` // ID of WalleeTransactionCompletion +// ExternalID string `json:"externalId"` // Unique per transaction +// MerchantReference string `json:"merchantReference"` +// Reductions []WalleeLineItemReduction `json:"reductions"` +// Transaction int64 `json:"transaction"` // ID of WalleeTransaction +// Type string `json:"type"` // Refund Type +// } + type WalleeLabel struct { Content []byte `json:"content"` ContentAsString string `json:"contentAsString"` @@ -184,7 +211,7 @@ type WalleeTransaction struct { FailedUrl interface{} `json:"failedUrl"` FailureReason interface{} `json:"failureReason"` Group WalleeGroup `json:"group"` - Id int `json:"id"` + Id int64 `json:"id"` InternetProtocolAddress interface{} `json:"internetProtocolAddress"` InternetProtocolAddressCountry interface{} `json:"internetProtocolAddressCountry"` InvoiceMerchantReference string `json:"invoiceMerchantReference"`