summaryrefslogtreecommitdiff
path: root/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
blob: 4045d10e4797aaf59ffa3562b7d526c120bbb5d4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
package tech.libeufin.sandbox

import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import io.ktor.client.*
import io.ktor.client.plugins.*
import io.ktor.client.request.*
import io.ktor.client.statement.*
import io.ktor.http.*
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.jetbrains.exposed.sql.and
import org.jetbrains.exposed.sql.transactions.transaction
import tech.libeufin.util.*

/**
 * This file contains the logic for downloading/submitting incoming/outgoing
 * fiat transactions to Nexus.  It needs the following values for operating.
 *
 * 1.  Nexus URL.
 * 2.  Credentials to authenticate at Nexus JSON API.
 * 3.  Long-polling interval.
 * 4.  Frequency of the download loop.
 *
 * Notes:
 *
 * 1.  The account to credit on incoming transactions is ALWAYS "admin".
 * 2.  The time to submit a new payment is as soon as "admin" receives one
 *     incoming regional payment.
 * 3.  At this time, Nexus does NOT offer long polling when it serves the
 *     transactions via its JSON API.
 * 4.  At this time, Nexus does NOT offer any filter when it serves the
 *     transactions via its JSON API.
 */

/**
 * Timeout the HTTP client waits for the server to respond,
 * after the request is made.
 */
val waitTimeout = 30000L

/**
 * Time to wait before HTTP requesting again to the server.
 * This helps to avoid tight cycles in case the server responds
 * quickly or the client doesn't long-poll.
 */
val newIterationTimeout = 2000L

/**
 * Executes the 'block' function every 'loopNewReqMs' milliseconds.
 * Does not exit/fail the process upon exceptions - just logs them.
 */
fun downloadLoop(block: () -> Unit) {
    // Needs "runBlocking {}" to call "delay()" and in case 'block'
    // contains suspend functions.
    runBlocking {
        while(true) {
            try { block() }
            catch (e: Exception) {
                /**
                 * Not exiting to tolerate network issues, or optimistically
                 * tolerate problems not caused by Sandbox itself.
                 */
                logger.error("Sandbox fiat-incoming monitor excepted: ${e.message}")
            }
            delay(newIterationTimeout)
        }
    }
}

/**
 * This function downloads the incoming fiat transactions from Nexus,
 * stores them into the database and signals their arrival (LIBEUFIN_FIAT_INCOMING)
 * to allow crediting the "admin" account.
 */
// fetchTransactions()

/**
 * This function listens for fiat-incoming events (LIBEUFIN_FIAT_INCOMING)
 * and credits the "admin" account as a reaction.  Lastly, the Nexus instance
 * wired to Sandbox will pick the new payment and serve it via its TWG, but
 * this is OUT of the Sandbox scope.
 */
// creditAdmin()

// DB query helper.  The List return type (instead of SizedIterable) lets
// the caller NOT open a transaction block to access the values -- although
// some operations _on the values_ may be forbidden.
private fun getUnsubmittedTransactions(bankAccountLabel: String): List<BankAccountTransactionEntity> {
    return transaction {
        val bankAccount = getBankAccountFromLabel(bankAccountLabel)
        val lowerExclusiveLimit = bankAccount.lastFiatSubmission?.id?.value ?: 0
        BankAccountTransactionEntity.find {
            BankAccountTransactionsTable.id greater lowerExclusiveLimit and (
                BankAccountTransactionsTable.direction eq "CRDT"
            )
        }.sortedBy { it.id }.map { it }
        // The latest payment must occupy the highest index,
        // to reliably update the bank account row with the last
        // submitted cash-out.
    }
}

/**
 * This function listens for regio-incoming events (LIBEUFIN_REGIO_TX)
 * and submits the related cash-out payment to Nexus.  The fiat payment will
 * then take place ENTIRELY on Nexus' responsibility.
 */
suspend fun cashoutMonitor(
    httpClient: HttpClient,
    bankAccountLabel: String = "admin",
    demobankName: String = "default" // used to get config values.
) {
    // Register for a REGIO_TX event.
    val eventChannel = buildChannelName(
        NotificationsChannelDomains.LIBEUFIN_REGIO_TX,
        bankAccountLabel
    )
    val objectMapper = jacksonObjectMapper()
    val demobank = getDemobank(demobankName)
    val bankAccount = getBankAccountFromLabel(bankAccountLabel)
    val config = demobank?.config ?: throw internalServerError(
        "Demobank '$demobankName' has no configuration."
    )
    val nexusBaseUrl = getConfigValueOrThrow(config::nexusBaseUrl)
    val usernameAtNexus = getConfigValueOrThrow(config::usernameAtNexus)
    val passwordAtNexus = getConfigValueOrThrow(config::passwordAtNexus)
    val paymentInitEndpoint = nexusBaseUrl.run {
        var ret = this
        if (!ret.endsWith('/'))
            ret += '/'
        /**
         * WARNING: Nexus gives the possibility to have bank account names
         * DIFFERENT from their owner's username.  Sandbox however MUST have
         * its Nexus bank account named THE SAME as its username (until the
         * config will allow to change).
         */
        ret + "bank-accounts/$usernameAtNexus/payment-initiations"
    }
    while (true) {
        // delaying here avoids to delay in multiple places (errors,
        // lack of action, success)
        delay(2000)
        val listenHandle = PostgresListenHandle(eventChannel)
        // pessimistically LISTEN
        listenHandle.postgresListen()
        // but optimistically check for data, case some
        // arrived _before_ the LISTEN.
        var newTxs = getUnsubmittedTransactions(bankAccountLabel)
        // Data found, UNLISTEN.
        if (newTxs.isNotEmpty())
            listenHandle.postgresUnlisten()
        // Data not found, wait.
        else {
            // OK to block, because the next event is going to
            // be _this_ one.  The caller should however execute
            // this whole logic in a thread other than the main
            // HTTP server.
            val isNotificationArrived = listenHandle.postgresGetNotifications(waitTimeout)
            if (isNotificationArrived && listenHandle.receivedPayload == "CRDT")
                newTxs = getUnsubmittedTransactions(bankAccountLabel)
        }
        if (newTxs.isEmpty())
            continue
        newTxs.forEach {
            val body = object {
                /**
                 * This field is UID of the request _as assigned by the
                 * client_.  That helps to reconcile transactions or lets
                 * Nexus implement idempotency.  It will NOT identify the created
                 * resource at the server side.  The ID of the created resource is
                 * assigned _by Nexus_ and communicated in the (successful) response.
                 */
                val uid = it.accountServicerReference
                val iban = it.creditorIban
                val bic = it.debtorBic
                val amount = "${it.currency}:${it.amount}"
                val subject = it.subject
                val name = it.creditorName
            }
            val resp = try {
                httpClient.post(paymentInitEndpoint) {
                    expectSuccess = false // Avoid excepting on !2xx
                    basicAuth(usernameAtNexus, passwordAtNexus)
                    contentType(ContentType.Application.Json)
                    setBody(objectMapper.writeValueAsString(body))
                }
            }
            // Hard-error, response did not even arrive.
            catch (e: Exception) {
                logger.error(e.message)
                // mark as failed and proceed to the next one.
                transaction {
                    CashoutSubmissionEntity.new {
                        this.localTransaction = it.id
                        this.hasErrors = true
                    }
                    bankAccount.lastFiatSubmission = it
                }
                return@forEach
            }
            // Handle the non 2xx error case.  Here we try
            // to store the response from Nexus.
            if (resp.status.value != HttpStatusCode.OK.value) {
                val maybeResponseBody = resp.bodyAsText()
                logger.error(
                    "Fiat submission response was: $maybeResponseBody," +
                            " status: ${resp.status.value}"
                )
                transaction {
                    CashoutSubmissionEntity.new {
                        localTransaction = it.id
                        this.hasErrors = true
                        if (maybeResponseBody.length > 0)
                            this.maybeNexusResposnse = maybeResponseBody
                    }
                    bankAccount.lastFiatSubmission = it
                }
                return@forEach
            }
            // Successful case, mark the wire transfer as submitted,
            // and advance the pointer to the last submitted payment.
            val responseBody = resp.bodyAsText()
            transaction {
                CashoutSubmissionEntity.new {
                    localTransaction = it.id
                    hasErrors = false
                    submissionTime = resp.responseTime.timestamp
                    isSubmitted = true
                    // Expectedly is > 0 and contains the submission
                    // unique identifier _as assigned by Nexus_.  Not
                    // currently used by Sandbox, but may help to resolve
                    // disputes.
                    if (responseBody.length > 0)
                        maybeNexusResposnse = responseBody
                }
                // Advancing the 'last submitted bookmark', to avoid
                // handling the same transaction multiple times.
                bankAccount.lastFiatSubmission = it
            }
        }
    }
}