summaryrefslogtreecommitdiff
path: root/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
blob: a4cfccbb35731a6cc683437e1d79d6db6f8dd044 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
package tech.libeufin.sandbox

import CamtBankAccountEntry
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.jsonMapper
import io.ktor.client.*
import io.ktor.client.plugins.*
import io.ktor.client.request.*
import io.ktor.client.statement.*
import io.ktor.http.*
import io.ktor.utils.io.jvm.javaio.*
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.jetbrains.exposed.sql.and
import org.jetbrains.exposed.sql.transactions.transaction
import tech.libeufin.util.*
import java.math.BigDecimal
import kotlin.system.exitProcess

/**
 * This file contains the logic for downloading/submitting incoming/outgoing
 * fiat transactions to Nexus.  It needs the following values for operating.
 *
 * 1.  Nexus URL.
 * 2.  Credentials to authenticate at Nexus JSON API.
 * 3.  Long-polling interval.
 * 4.  Frequency of the download loop.
 *
 * Notes:
 *
 * 1.  The account to credit on incoming transactions is ALWAYS "admin".
 * 2.  The time to submit a new payment is as soon as "admin" receives one
 *     incoming regional payment.
 * 3.  At this time, Nexus does NOT offer long polling when it serves the
 *     transactions via its JSON API. => Fixed.
 * 4.  At this time, Nexus does NOT offer any filter when it serves the
 *     transactions via its JSON API. => Can be fixed by using the TWG.
 */

// DEFINITIONS AND HELPERS

/**
 * How long, in milliseconds, the HTTP client waits for the server
 * to respond after the request is made.  The buy-in monitor sends
 * this value to Nexus as the 'long_poll_ms' query parameter.
 */
val waitTimeout: Long = 30_000L

/**
 * Pause, in milliseconds, before HTTP requesting again to the server.
 * It prevents tight cycles when the server responds quickly or when
 * the client doesn't long-poll.
 */
val newIterationTimeout: Long = 2_000L

/**
 * One element of the response of Nexus GET /transactions.
 *
 * @property index identifier of the transaction in the Nexus response.
 *           The buy-in monitor stores it as the 'lastFiatFetch' pointer
 *           and sends it back to Nexus as the 'start' query parameter.
 * @property camtData details of the transaction; this file reads its
 *           amount and its subject.
 */
data class TransactionItem(
    val index: String,
    val camtData: CamtBankAccountEntry
)
/**
 * Top-level response format of Nexus GET /transactions, as parsed
 * by the buy-in monitor.
 *
 * @property transactions the transactions returned by Nexus; the buy-in
 *           monitor handles them in the order they appear in this list.
 */
data class NexusTransactions(
    val transactions: List<TransactionItem>
)

/**
 * Runs 'block' in an endless loop, pausing 'newIterationTimeout'
 * milliseconds after each run.  Exceptions thrown by 'block' are
 * only logged: they neither stop the loop nor fail the process.
 *
 * @param block the operation to repeat.
 */
fun downloadLoop(block: () -> Unit) {
    // "runBlocking {}" provides the coroutine that "delay()" needs.
    runBlocking {
        while (true) {
            try {
                block()
            } catch (e: Exception) {
                // Deliberately not exiting: network issues, and problems
                // that (optimistically) were not caused by Sandbox itself,
                // are tolerated and the next iteration tries again.
                logger.error("Sandbox fiat-incoming monitor excepted: ${e.message}")
            }
            delay(newIterationTimeout)
        }
    }
}

// BUY-IN SIDE.

/**
 * Applies the buy-in ratio and fees to the fiat amount
 * that came from Nexus.  The result is the regional amount
 * that will be wired to the exchange Sandbox account.
 *
 * @param amount fiat amount to convert.
 * @param ratioAndFees conversion parameters; 'buy_at_ratio' multiplies
 *        the amount and 'buy_in_fee' is then subtracted from it.
 * @return the converted amount, passed through roundToTwoDigits().
 */
private fun applyBuyinRatioAndFees(
    amount: BigDecimal,
    ratioAndFees: RatioAndFees
): BigDecimal =
    // Use the values given by the caller: the previous body ignored the
    // 'ratioAndFees' parameter and read an outer 'ratiosAndFees' instead.
    ((amount * ratioAndFees.buy_at_ratio.toBigDecimal())
            - ratioAndFees.buy_in_fee.toBigDecimal()).roundToTwoDigits()
/**
 * This function downloads the incoming fiat transactions from Nexus and,
 * for every payment whose subject contains a reserve public key, wires the
 * converted amount from 'accountToDebit' to 'accountToCredit' (the Taler
 * exchange).  The index of the last handled transaction is recorded on
 * 'accountToDebit' and used as the 'start' parameter of the next request.
 * In case of errors, it pauses and retries when the server fails, but
 * _fails_ the process when the client does.
 *
 * @param demobankName demobank whose configuration provides the Nexus base
 *        URL, the credentials at Nexus and the currency.
 * @param client HTTP client used to GET the transactions from Nexus.
 * @param accountToCredit label of the bank account that receives the
 *        converted amount.
 * @param accountToDebit label of the bank account that pays the converted
 *        amount and holds the 'lastFiatFetch' pointer.
 */
fun buyinMonitor(
    demobankName: String, // used to get config values.
    client: HttpClient,
    accountToCredit: String,
    accountToDebit: String = "admin"
) {
    val demobank = ensureDemobank(demobankName)
    val nexusBaseUrl = getConfigValueOrThrow(demobank.config::nexusBaseUrl)
    val usernameAtNexus = getConfigValueOrThrow(demobank.config::usernameAtNexus)
    val passwordAtNexus = getConfigValueOrThrow(demobank.config::passwordAtNexus)
    val endpoint = "bank-accounts/$usernameAtNexus/transactions"
    val uriWithoutStart = joinUrl(nexusBaseUrl, endpoint) + "?long_poll_ms=$waitTimeout"

    // downloadLoop does already try-catch (without failing the process).
    downloadLoop {
        val debitBankAccount = getBankAccountFromLabel(accountToDebit)
        val uriWithStart = "$uriWithoutStart&start=${debitBankAccount.lastFiatFetch}"
        // downloadLoop takes a non-suspending block, hence the
        // runBlocking to call the suspending HTTP client.
        runBlocking {
            // Maybe get new fiat transactions.
            logger.debug("GETting fiat transactions from: ${uriWithStart}")
            val resp = client.get(uriWithStart) { basicAuth(usernameAtNexus, passwordAtNexus) }
            // The server failed, pause and try again
            if (resp.status.value.toString().startsWith('5')) {
                logger.error("Buy-in monitor caught a failing to Nexus.  Pause and retry.")
                logger.error("Nexus responded: ${resp.bodyAsText()}")
                delay(2000L)
                return@runBlocking
            }
            // The client failed, fail the process.
            if (resp.status.value.toString().startsWith('4')) {
                logger.error("Buy-in monitor failed at GETting to Nexus.  Fail Sandbox.")
                logger.error("Nexus responded: ${resp.bodyAsText()}")
                exitProcess(1)
            }
            // Expect 200 OK.  What if 3xx?
            if (resp.status.value != HttpStatusCode.OK.value) {
                logger.error("Unhandled response status ${resp.status.value}, failing Sandbox")
                exitProcess(1)
            }
            // Nexus responded 200 OK, analyzing the result.
            val respObj = jacksonObjectMapper().readValue(
                resp.bodyAsText(),
                NexusTransactions::class.java
            ) // errors are logged by the caller (without failing).
            /**
             * Wire to the exchange if the subject is a public key,
             * or do nothing otherwise.
             */
            respObj.transactions.forEach { fiatTx ->
                val subject = fiatTx.camtData.getSingletonSubject()
                /**
                 * If the payment doesn't contain a reserve public key,
                 * continue the iteration with the new payment.
                 * NOTE(review): the 'lastFiatFetch' pointer is not advanced
                 * here, so such a payment is presumably returned again by
                 * Nexus until a later valid one moves the pointer -- confirm
                 * that this is intended.
                 */
                if (extractReservePubFromSubject(subject) == null)
                    return@forEach
                /**
                 * The payment had a reserve public key in the subject, wire it to
                 * the exchange.  NOTE: this ensures that all the payments that the
                 * exchange gets will NOT trigger any reimbursement, because they have
                 * a valid reserve public key.  Reimbursements would in fact introduce
                 * significant friction, because they need to target _fiat_ bank accounts
                 * (the customers'), whereas the entity that _now_ pays the exchange is
                 * "admin", which lives in the regional circuit.
                 */
                // Extracts the amount and checks it's at most two fractional digits.
                val maybeValidAmount = fiatTx.camtData.amount.value
                if (!validatePlainAmount(maybeValidAmount)) {
                    logger.error("Nexus gave one amount with invalid fractional digits: $maybeValidAmount." +
                            "  The transaction has index ${fiatTx.index}")
                    // Advancing the last fetched pointer, to avoid GETting
                    // this invalid payment again.
                    transaction {
                        debitBankAccount.refresh()
                        debitBankAccount.lastFiatFetch = fiatTx.index
                    }
                    // Skip this payment: without returning here, the invalid
                    // amount would still be converted and wired below.
                    return@forEach
                }
                val convertedAmount = applyBuyinRatioAndFees(
                    maybeValidAmount.toBigDecimal(),
                    ratiosAndFees
                )
                transaction {
                    wireTransfer(
                        debitAccount = accountToDebit,
                        creditAccount = accountToCredit,
                        demobank = demobankName,
                        subject = subject,
                        amount = "${demobank.config.currency}:$convertedAmount"
                    )
                    // Nexus enqueues the transactions such that the index increases.
                    // If Sandbox crashes here, it'll ask again using the last successful
                    // index as the start parameter.  Being this an exclusive bound, only
                    // transactions later than it are expected.
                    debitBankAccount.refresh()
                    debitBankAccount.lastFiatFetch = fiatTx.index
                }
            }
        }
    }
}

// DB query helper.  Returning a List (rather than a SizedIterable) lets
// the caller access the values WITHOUT opening a transaction block --
// although some operations _on the values_ may be forbidden.
//
// The rows come sorted by ascending id: the latest payment must occupy
// the highest index, so that the caller can reliably update the bank
// account row with the last submitted cash-out.
//
// NOTE(review): the query filters on id and direction only, it does not
// restrict the rows to the bank account named by 'bankAccountLabel' --
// confirm that this is intended.
private fun getUnsubmittedTransactions(bankAccountLabel: String): List<BankAccountTransactionEntity> =
    transaction {
        val lastSubmitted = getBankAccountFromLabel(bankAccountLabel).lastFiatSubmission
        // Exclusive lower bound; 0 when nothing was ever submitted.
        val lowerExclusiveLimit = lastSubmitted?.id?.value ?: 0
        val unsubmitted = BankAccountTransactionEntity.find {
            BankAccountTransactionsTable.id greater lowerExclusiveLimit and (
                BankAccountTransactionsTable.direction eq "CRDT"
            )
        }
        unsubmitted.sortedBy { it.id }
    }

// CASH-OUT SIDE.

/**
 * This function listens for regio-incoming events (LIBEUFIN_REGIO_TX)
 * on the 'watchedBankAccount' and submits the related cash-out payment
 * to Nexus.  The fiat payment will then take place ENTIRELY on Nexus'
 * responsibility.
 *
 * Error handling: when Nexus cannot be reached or responds 5xx, the
 * function pauses and then retries starting from the transaction that
 * failed, so that no cash-out gets skipped.  Any other non-200 response
 * fails the process.
 *
 * @param httpClient HTTP client used to POST the payments to Nexus.
 * @param watchedBankAccount label of the bank account whose incoming
 *        transactions trigger the cash-outs.
 * @param demobankName demobank whose configuration provides the Nexus
 *        base URL and the credentials at Nexus.
 * @param dbEventTimeout how long to wait for a database notification;
 *        0 waits forever.
 */
suspend fun cashoutMonitor(
    httpClient: HttpClient,
    watchedBankAccount: String = "admin",
    demobankName: String = "default", // used to get config values.
    dbEventTimeout: Long = 0 // 0 waits forever.
) {
    // Register for a REGIO_TX event.
    val eventChannel = buildChannelName(
        NotificationsChannelDomains.LIBEUFIN_REGIO_TX,
        watchedBankAccount
    )
    val objectMapper = jacksonObjectMapper()
    val demobank = getDemobank(demobankName)
    val bankAccount = getBankAccountFromLabel(watchedBankAccount)
    val config = demobank?.config ?: throw internalServerError(
        "Demobank '$demobankName' has no configuration."
    )
    val nexusBaseUrl = getConfigValueOrThrow(config::nexusBaseUrl)
    val usernameAtNexus = getConfigValueOrThrow(config::usernameAtNexus)
    val passwordAtNexus = getConfigValueOrThrow(config::passwordAtNexus)
    /**
     * WARNING: Nexus gives the possibility to have bank account names
     * DIFFERENT from their owner's username.  Sandbox however MUST have
     * its Nexus bank account named THE SAME as its username.
     */
    val paymentInitEndpoint = nexusBaseUrl.removeSuffix("/") +
            "/bank-accounts/$usernameAtNexus/payment-initiations"
    while (true) {
        val listenHandle = PostgresListenHandle(eventChannel)
        // pessimistically LISTEN
        listenHandle.postgresListen()
        // but optimistically check for data, case some
        // arrived _before_ the LISTEN.
        var newTxs = getUnsubmittedTransactions(watchedBankAccount)
        // Data found, UNLISTEN.
        if (newTxs.isNotEmpty())
            listenHandle.postgresUnlisten()
        // Data not found, wait.
        else {
            val isNotificationArrived = listenHandle.waitOnIODispatchers(dbEventTimeout)
            if (isNotificationArrived && listenHandle.receivedPayload == "CRDT")
                newTxs = getUnsubmittedTransactions(watchedBankAccount)
        }
        if (newTxs.isEmpty())
            continue
        // A plain 'for' (instead of forEach) lets transient failures 'break'
        // out of the batch: the outer loop then reads the unsubmitted
        // transactions again and retries from the one that failed.  Merely
        // skipping to the next transaction would let a later success advance
        // 'lastFiatSubmission' past the failed one, which would then never
        // be submitted.
        submitBatch@ for (tx in newTxs) {
            val body = object {
                /**
                 * This field is UID of the request _as assigned by the
                 * client_.  That helps to reconcile transactions or lets
                 * Nexus implement idempotency.  It will NOT identify the created
                 * resource at the server side.  The ID of the created resource is
                 * assigned _by Nexus_ and communicated in the (successful) response.
                 */
                val uid = tx.accountServicerReference
                val iban = tx.creditorIban
                // NOTE(review): IBAN and name come from the creditor but the
                // BIC from the debtor -- confirm whether this is intended.
                val bic = tx.debtorBic
                val amount = "${tx.currency}:${tx.amount}"
                val subject = tx.subject
                val name = tx.creditorName
            }
            val resp = try {
                httpClient.post(paymentInitEndpoint) {
                    expectSuccess = false // Avoid excepting on !2xx
                    basicAuth(usernameAtNexus, passwordAtNexus)
                    contentType(ContentType.Application.Json)
                    setBody(objectMapper.writeValueAsString(body))
                }
            }
            // Hard-error, response did not even arrive.
            catch (e: Exception) {
                logger.error("Cash-out monitor could not reach Nexus.  Pause and retry")
                logger.error(e.message)
                delay(2000)
                break@submitBatch
            }
            // Server fault.  Pause and retry.  Without leaving the batch here,
            // the execution would reach the "not 200 OK" check below and fail
            // Sandbox, contradicting the retry policy.
            if (resp.status.value.toString().startsWith('5')) {
                logger.error("Cash-out monitor POSTed to a failing Nexus.  Pause and retry")
                logger.error(resp.bodyAsText())
                delay(2000L)
                break@submitBatch
            }
            // Client fault, fail Sandbox.
            if (resp.status.value.toString().startsWith('4')) {
                logger.error("Cash-out monitor failed at POSTing to Nexus.  Fail Sandbox")
                logger.error("Nexus responded: ${resp.bodyAsText()}")
                exitProcess(1)
            }
            // Expecting 200 OK.  What if 3xx?
            if (resp.status.value != HttpStatusCode.OK.value) {
                logger.error("Cash-out monitor, unhandled response status: ${resp.status.value}.  Fail Sandbox")
                exitProcess(1)

                // Previous versions use to store the faulty transaction
                // and continue the execution.  The block below shows how
                // to do that.

                /*transaction {
                  CashoutSubmissionEntity.new {
                    localTransaction = it.id
                    this.hasErrors = true
                    if (maybeResponseBody.isNotEmpty())
                      this.maybeNexusResposnse = maybeResponseBody
                    }
                  bankAccount.lastFiatSubmission = it
                }*/
            }
            // Successful case, mark the wire transfer as submitted,
            // and advance the pointer to the last submitted payment.
            val responseBody = resp.bodyAsText()
            transaction {
                CashoutSubmissionEntity.new {
                    localTransaction = tx.id
                    hasErrors = false
                    submissionTime = resp.responseTime.timestamp
                    isSubmitted = true
                    // Expectedly is > 0 and contains the submission
                    // unique identifier _as assigned by Nexus_.  Not
                    // currently used by Sandbox, but may help to resolve
                    // disputes.
                    if (responseBody.isNotEmpty())
                        maybeNexusResposnse = responseBody
                }
                // Advancing the 'last submitted bookmark', to avoid
                // handling the same transaction multiple times.
                bankAccount.lastFiatSubmission = tx
            }
        }
    }
}