// path: root/c2ec/retrier.go
// blob: bab148fbdda96b147f5cb74a487a09c8fb08eaa2

package main

import (
	"context"
	"errors"
	"strconv"
	"time"
)

// Settings of the retry notification channel used by the retrier.
const (
	// RETRY_CHANNEL_BUFFER_SIZE is the capacity of the buffered channel
	// through which retry notifications are handed to the retrier.
	RETRY_CHANNEL_BUFFER_SIZE = 10

	// PS_RETRY_CHANNEL is the channel name passed to NewListener when the
	// retrier subscribes to retry notifications.
	PS_RETRY_CHANNEL = "retry"
)

// RunRetrier starts the retry process in the background.
//
// Every provider listed in CONFIG.Providers is checked for an entry in
// PROVIDER_CLIENTS first; each provider without a client is logged and
// reported on errs. The check does not abort the startup: afterwards a
// buffered notification channel is created and retryCallback is launched
// in its own goroutine, so RunRetrier itself returns immediately.
//
// ctx is handed to retryCallback, errs receives every reported error.
func RunRetrier(ctx context.Context, errs chan error) {

	for _, provider := range CONFIG.Providers {
		if PROVIDER_CLIENTS[provider.Name] != nil {
			continue
		}
		missing := errors.New("no provider client initialized for provider " + provider.Name)
		LogError("retrier", missing)
		errs <- missing
	}

	retryQueue := make(chan *Notification, RETRY_CHANNEL_BUFFER_SIZE)
	go retryCallback(ctx, retryQueue, errs)
}

// retryCallback listens on PS_RETRY_CHANNEL and dispatches one retry per
// received notification. It blocks until ctx is cancelled or the listener
// has stopped and closed the notifications channel.
//
// ctx is passed to the listener and ends the dispatch loop when cancelled.
// notifications is the channel handed to NewListener; it is closed here
// once Listen has returned. errs receives every reported error. It is
// never closed by this function, because the dispatchRetry goroutines and
// the dispatch loop may still send on it after the listener has stopped,
// and a send on a closed channel panics.
func retryCallback(ctx context.Context, notifications chan *Notification, errs chan error) {

	listener, err := NewListener(PS_RETRY_CHANNEL, notifications)
	if err != nil {
		LogError("retrier", err)
		errs <- errors.New("retrier needs to be setup first")
		// Without a listener there is nothing to dispatch; going on
		// would call Listen on the result of a failed constructor.
		return
	}

	go func() {
		LogInfo("retrier", "retrier starts listening for retry notifications at the db")
		err := listener.Listen(ctx)
		if err != nil {
			LogError("retry-listener", err)
			errs <- err
		}
		// Signals the dispatch loop below that no further
		// notifications will arrive.
		close(notifications)
	}()

	// Listen is started asynchronously. We can therefore block here and
	// need not run the dispatch loop in its own goroutine.
	for {
		select {
		case notification, ok := <-notifications:
			if !ok {
				// The listener stopped and closed the channel. A
				// closed channel is always ready to receive, so the
				// loop must end here instead of spinning on nil
				// values. A cancellation is still reported, as in
				// the ctx.Done() branch.
				if ctxErr := ctx.Err(); ctxErr != nil {
					errs <- ctxErr
				}
				return
			}
			if notification == nil {
				// nothing to dispatch
				continue
			}
			// The dispatching and setup of the retry process can
			// be kicked off asynchronously, thus not blocking
			// further incoming notifications.
			go dispatchRetry(notification, errs)
		case <-ctx.Done():
			errs <- ctx.Err()
			return
		}
	}
}

// dispatchRetry runs a single retry for the withdrawal referenced by n.
//
// The payload of n is parsed as the numeric withdrawal id. The withdrawal
// and the provider of its terminal are loaded from DB, the withdrawal's
// retry counter is incremented, and after waiting CONFIG.Server.RetryDelayMs
// milliseconds the transaction is fetched from the provider's client and
// handed to finaliseOrSetRetry.
//
// Every failure is logged, sent on errs and ends the retry. This includes
// a nil notification, a provider without an entry in PROVIDER_CLIENTS and
// a withdrawal without a provider transaction id, each of which would
// otherwise cause a nil dereference in this goroutine.
func dispatchRetry(n *Notification, errs chan error) {

	if n == nil {
		err := errors.New("received nil retry notification")
		LogError("retrier", err)
		errs <- err
		return
	}

	withdrawalId, err := strconv.Atoi(n.Payload)
	if err != nil {
		LogError("retrier", err)
		errs <- err
		return
	}

	withdrawal, err := DB.GetWithdrawalById(withdrawalId)
	if err != nil {
		LogError("retrier", err)
		errs <- err
		return
	}

	provider, err := DB.GetProviderByTerminal(int(withdrawal.TerminalId))
	if err != nil {
		LogError("retrier", err)
		errs <- err
		return
	}

	// The attempt is counted before anything else can fail.
	err = DB.SetRetryCounter(withdrawalId, int(withdrawal.RetryCounter)+1)
	if err != nil {
		LogError("retrier", err)
		errs <- err
		return
	}

	// Validate what the provider call needs before waiting, so that a
	// retry which cannot succeed fails without the delay.
	client := PROVIDER_CLIENTS[provider.Name]
	if client == nil {
		err := errors.New("no provider client initialized for provider " + provider.Name)
		LogError("retrier", err)
		errs <- err
		return
	}
	if withdrawal.ProviderTransactionId == nil {
		err := errors.New("withdrawal " + strconv.Itoa(withdrawalId) + " has no provider transaction id")
		LogError("retrier", err)
		errs <- err
		return
	}

	time.Sleep(time.Duration(CONFIG.Server.RetryDelayMs) * time.Millisecond)

	transaction, err := client.GetTransaction(*withdrawal.ProviderTransactionId)
	if err != nil {
		LogError("retrier", err)
		errs <- err
		return
	}

	finaliseOrSetRetry(transaction, withdrawalId, errs)
}