1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
|
package main
import (
"context"
"errors"
"strconv"
"time"
)
const RETRY_CHANNEL_BUFFER_SIZE = 10
const PS_RETRY_CHANNEL = "retry"
func RunRetrier(ctx context.Context, errs chan error) {
for _, p := range CONFIG.Providers {
if PROVIDER_CLIENTS[p.Name] == nil {
err := errors.New("no provider client initialized for provider " + p.Name)
LogError("retrier", err)
errs <- err
}
}
notifications := make(chan *Notification, RETRY_CHANNEL_BUFFER_SIZE)
go retryCallback(ctx, notifications, errs)
}
func retryCallback(ctx context.Context, notifications chan *Notification, errs chan error) {
listener, err := NewListener(PS_RETRY_CHANNEL, notifications)
if err != nil {
LogError("retrier", err)
errs <- errors.New("retrier needs to be setup first")
}
go func() {
LogInfo("retrier", "retrier starts listening for retry notifications at the db")
err := listener.Listen(ctx)
if err != nil {
LogError("retry-listener", err)
errs <- err
}
close(notifications)
close(errs)
}()
// Listen is started async. We can therefore block here and must
// not run the retrieval logic in own goroutine
for {
select {
case notification := <-notifications:
// the dispatching and setup of the retry process can
// be kicked off asynchronically, thus not blocking
// further incoming notifications.
go dispatchRetry(notification, errs)
case <-ctx.Done():
errs <- ctx.Err()
return
}
}
}
func dispatchRetry(n *Notification, errs chan error) {
withdrawalId, err := strconv.Atoi(n.Payload)
if err != nil {
LogError("retrier", err)
errs <- err
return
}
withdrawal, err := DB.GetWithdrawalById(withdrawalId)
if err != nil {
LogError("retrier", err)
errs <- err
return
}
provider, err := DB.GetProviderByTerminal(int(withdrawal.TerminalId))
if err != nil {
LogError("retrier", err)
errs <- err
return
}
err = DB.SetRetryCounter(withdrawalId, int(withdrawal.RetryCounter)+1)
if err != nil {
LogError("retrier", err)
errs <- err
return
}
time.Sleep(time.Duration(CONFIG.Server.RetryDelayMs) * time.Millisecond)
client := PROVIDER_CLIENTS[provider.Name]
transaction, err := client.GetTransaction(*withdrawal.ProviderTransactionId)
if err != nil {
LogError("retrier", err)
errs <- err
return
}
finaliseOrSetRetry(transaction, withdrawalId, errs)
}
|