summaryrefslogtreecommitdiff
path: root/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
blob: c760a2b1b89688234d33860141026e3d5e96e270 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
package tech.libeufin.sandbox

import CamtBankAccountEntry
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import io.ktor.client.*
import io.ktor.client.plugins.*
import io.ktor.client.request.*
import io.ktor.client.statement.*
import io.ktor.http.*
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.jetbrains.exposed.sql.and
import org.jetbrains.exposed.sql.transactions.transaction
import tech.libeufin.util.*
import java.math.BigDecimal
import kotlin.system.exitProcess

/**
 * This file contains the logic for downloading/submitting incoming/outgoing
 * fiat transactions to Nexus.  It needs the following values for operating.
 *
 * 1.  Nexus URL.
 * 2.  Credentials to authenticate at Nexus JSON API.
 * 3.  Long-polling interval.
 * 4.  Frequency of the download loop.
 *
 * Notes:
 *
 * 1.  The account to credit on incoming transactions is ALWAYS "admin".
 * 2.  The time to submit a new payment is as soon as "admin" receives one
 *     incoming regional payment.
 * 3.  At this time, Nexus does NOT offer long polling when it serves the
 *     transactions via its JSON API. => Fixed.
 * 4.  At this time, Nexus does NOT offer any filter when it serves the
 *     transactions via its JSON API. => Can be fixed by using the TWG.
 */

// DEFINITIONS AND HELPERS

/**
 * How long, in milliseconds, the HTTP client waits for the
 * server's response once a request was sent.  The buy-in monitor
 * passes this value to Nexus as the 'long_poll_ms' URI parameter.
 */
val waitTimeout: Long = 30_000L

/**
 * Pause, in milliseconds, between one HTTP request to the server
 * and the next one.  It prevents tight cycles when the server
 * responds quickly or when the client doesn't long-poll.
 */
val newIterationTimeout: Long = 2_000L

/**
 * One element of the list that Nexus returns along its
 * GET /transactions response.
 *
 * @property index position of this transaction as assigned by Nexus.
 *           The buy-in monitor stores it as the 'last fetched' bookmark
 *           and sends it back to Nexus in the 'start' URI parameter.
 * @property camtData the transaction details; the buy-in monitor
 *           reads the subject and the amount from it.
 */
data class TransactionItem(
    val index: String,
    val camtData: CamtBankAccountEntry,
)
/**
 * Top-level object of the Nexus GET /transactions response.
 *
 * @property transactions the transactions that Nexus returned.
 */
data class NexusTransactions(
    val transactions: List<TransactionItem>,
)

/**
 * This exception signals that the buy-in service could NOT
 * GET the list of fiat transactions from Nexus due to a client
 * error.  Because this is fatal (e.g. wrong credentials, URL not found..),
 * the service should be stopped.
 *
 * The buy-in monitor throws it when Nexus responds with a 4xx status
 * or with any other status that is neither 5xx nor 200 OK.  The
 * download loop logs it and rethrows it to its caller.
 */
class BuyinClientError : Exception()

/**
 * This exception signals that POSTing a cash-out operation
 * to Nexus failed due to the client.  This is a fatal condition
 * therefore the monitor should be stopped.
 *
 * The cash-out monitor throws it when Nexus responds with a 4xx
 * status or with any other status that is neither 5xx nor 200 OK.
 */
class CashoutClientError : Exception()
/**
 * Runs 'block' in an endless loop, pausing 'newIterationTimeout'
 * milliseconds after every iteration.
 *
 * A BuyinClientError is logged and then rethrown: that ends the loop
 * and leaves the handling to the caller.  Any other Exception is only
 * logged, and the loop goes on with the next iteration.
 *
 * @param block the work to run at every iteration.
 * @throws BuyinClientError when 'block' throws it.
 */
fun downloadLoop(block: () -> Unit) {
    // "runBlocking {}" provides the coroutine context that "delay()" needs.
    runBlocking {
        while (true) {
            try {
                block()
            } catch (e: BuyinClientError) {
                logger.error("The buy-in monitor had a client error while GETting new" +
                        " transactions from Nexus.  Stopping it")
                // Rethrowing and let the caller manage it
                throw e
            } catch (e: Exception) {
                // Tolerating any other error type that's not due to the client.
                logger.error("Sandbox fiat-incoming monitor excepted: ${e.message}")
            }
            delay(newIterationTimeout)
        }
    }
}

// BUY-IN SIDE.

/**
 * Converts a fiat amount, as it came from Nexus, into the regional
 * amount to wire to the exchange Sandbox account.  The fiat amount
 * is multiplied by the buy-in ratio, the buy-in fee is subtracted
 * from the product and the outcome is rounded to two digits.
 *
 * @param amount the fiat amount to convert.
 * @param ratiosAndFees provides the buy-in ratio and the buy-in fee.
 * @return the regional amount, never negative.
 * @throws the exception made by 'internalServerError' when the
 *         conversion yields a negative amount.
 */
fun applyBuyinRatioAndFees(
    amount: BigDecimal,
    ratiosAndFees: RatioAndFees
): BigDecimal {
    val buyinRatio = ratiosAndFees.buy_at_ratio.toBigDecimal()
    val buyinFee = ratiosAndFees.buy_in_fee.toBigDecimal()
    val regionalAmount = (amount * buyinRatio - buyinFee).roundToTwoDigits()
    if (regionalAmount >= BigDecimal.ZERO) return regionalAmount
    // Bank's fault, as buying in should never lead to negative.
    logger.error("Negative buy-in scenario: input fiat amount was '${amount}'" +
            ", buy-in ratio was '${ratiosAndFees.buy_at_ratio}'," +
            " buy-in fee was '${ratiosAndFees.buy_in_fee}'")
    throw internalServerError("Applying buy-in fees yielded negative regional amount")
}

/**
 * Throws when the given HTTP client is configured to follow
 * redirects; does nothing otherwise.
 *
 * NOTE(review): in Ktor, HttpClient.config() looks like it builds a
 * new client out of the current configuration; the value it returns
 * is discarded here -- confirm that this has no resource cost.
 *
 * @param client the HTTP client to check.
 */
private fun ensureDisabledRedirects(client: HttpClient) {
    client.config {
        if (!followRedirects) return@config
        throw Exception(
            "HTTP client follows redirects, please disable."
        )
    }
}
/**
 * This function downloads the incoming fiat transactions from Nexus
 * and, for each of them whose subject carries a reserve public key,
 * triggers the related wire transfer from 'accountToDebit' to the
 * Taler exchange (to be specified in 'accountToCredit').  Once
 * started, this function is not supposed to return.  On server side
 * errors it pauses and retries; on _client side_ errors it throws,
 * and the caller is expected to handle the error.
 *
 * @param demobankName name of the demobank, used to get config values.
 * @param client HTTP client to reach Nexus; it must NOT follow redirects.
 * @param accountToCredit label of the bank account that gets the regional amount.
 * @param accountToDebit label of the bank account that pays the regional
 *        amount and that holds the 'last fetched' bookmark.
 * @throws BuyinClientError when Nexus responds with a 4xx status or with
 *         any other status that is neither 5xx nor 200 OK.
 */
fun buyinMonitor(
    demobankName: String, // used to get config values.
    client: HttpClient,
    accountToCredit: String,
    accountToDebit: String = "admin"
) {
    ensureDisabledRedirects(client)
    val demobank = ensureDemobank(demobankName)
    /**
     * Getting the config values to send authenticated requests
     * to Nexus.  Sandbox needs one account at Nexus before being
     * able to use these values.
     */
    val nexusBaseUrl = getConfigValueOrThrow(demobank.config::nexusBaseUrl)
    val usernameAtNexus = getConfigValueOrThrow(demobank.config::usernameAtNexus)
    val passwordAtNexus = getConfigValueOrThrow(demobank.config::passwordAtNexus)
    /**
     * This is the endpoint where Nexus serves all the transactions that
     * have ingested from the fiat bank.
     */
    val endpoint = "bank-accounts/$usernameAtNexus/transactions"
    val uriWithoutStart = joinUrl(nexusBaseUrl, endpoint) + "?long_poll_ms=$waitTimeout"
    // One mapper serves all the iterations, no need to build it each time.
    val objectMapper = jacksonObjectMapper()

    // downloadLoop does already try-catch (without failing the process).
    downloadLoop {
        /**
         * This bank account will act as the debtor, once a new fiat
         * payment is detected.  It's the debtor that pays the related
         * regional amount to the exchange, in order to start a withdrawal
         * operation (in regional coins).
         */
        val debitBankAccount = getBankAccountFromLabel(accountToDebit)
        /**
         * Setting the 'start' URI param in the following command
         * lets Sandbox receive only unseen payments from Nexus.
         */
        val uriWithStart = "$uriWithoutStart&start=${debitBankAccount.lastFiatFetch}"
        // 'downloadLoop' takes a non-suspend block, hence this
        // "runBlocking {}" to call the suspend HTTP client functions.
        runBlocking {
            // Maybe get new fiat transactions.
            logger.debug("GETting fiat transactions from: $uriWithStart")
            val resp = client.get(uriWithStart) {
                expectSuccess = false // Avoids excepting on !2xx
                basicAuth(usernameAtNexus, passwordAtNexus)
            }
            // The server failed, pause and try again
            if (resp.status.value.toString().startsWith('5')) {
                logger.error("Buy-in monitor requested to a failing Nexus.  Retry.")
                logger.error("Nexus responded: ${resp.bodyAsText()}")
                return@runBlocking
            }
            // The client failed, fail the process.
            if (resp.status.value.toString().startsWith('4')) {
                logger.error("Buy-in monitor failed at GETting to Nexus.  Stopping the buy-in monitor.")
                logger.error("Nexus responded: ${resp.bodyAsText()}")
                throw BuyinClientError()
            }
            // Expect 200 OK.  What if 3xx?
            if (resp.status.value != HttpStatusCode.OK.value) {
                logger.error("Unhandled response status ${resp.status.value}, failing Sandbox")
                throw BuyinClientError()
            }
            // Nexus responded 200 OK, analyzing the result.
            /**
             * Wire to "admin" if the subject is a public key, or do
             * nothing otherwise.
             */
            val respObj = objectMapper.readValue(
                resp.bodyAsText(),
                NexusTransactions::class.java
            ) // errors are logged by the caller (without failing).
            respObj.transactions.forEach { fiatTx ->
                // Ignoring payments with an invalid reserved public key.
                // NOTE(review): the 'last fetched' bookmark does not advance
                // here, so Nexus may serve these payments again until a later
                // valid one moves the bookmark -- confirm this is intended.
                if (extractReservePubFromSubject(fiatTx.camtData.getSingletonSubject()) == null)
                    return@forEach
                // Extracts the amount and checks it's at most two fractional digits.
                val maybeValidAmount = fiatTx.camtData.amount.value
                if (!validatePlainAmount(maybeValidAmount)) {
                    logger.error("Nexus gave one amount with invalid fractional digits: $maybeValidAmount." +
                            "  The transaction has index ${fiatTx.index}")
                    // Advancing the last fetched pointer, to avoid GETting
                    // this invalid payment again.
                    transaction {
                        debitBankAccount.refresh()
                        debitBankAccount.lastFiatFetch = fiatTx.index
                    }
                    // The amount is invalid: do NOT convert it and do NOT
                    // wire it, just move on to the next transaction.
                    return@forEach
                }
                val convertedAmount = applyBuyinRatioAndFees(
                    maybeValidAmount.toBigDecimal(),
                    ratiosAndFees
                )
                transaction {
                    wireTransfer(
                        debitAccount = accountToDebit,
                        creditAccount = accountToCredit,
                        demobank = demobankName,
                        subject = fiatTx.camtData.getSingletonSubject(),
                        amount = "${demobank.config.currency}:$convertedAmount"
                    )
                    // Nexus enqueues the transactions such that the index increases.
                    // If Sandbox crashes here, it'll ask again using the last successful
                    // index as the start parameter.  Being this an exclusive bound, only
                    // transactions later than it are expected.
                    debitBankAccount.refresh()
                    debitBankAccount.lastFiatFetch = fiatTx.index
                }
            }
        }
    }
}

/* DB query helper that fetches the cash-out operations that were
  confirmed in the regional currency and not yet submitted.  A cash-out
  operation is 'confirmed' when the bank account pointed by the parameter
  'bankAccountLabel' gets one incoming ("CRDT") payment.  Only the payments
  whose row ID is greater than the one of the account's 'lastFiatSubmission'
  are returned (all of them, when that bookmark is not set).

  The result is sorted by ascending ID: the latest payment must occupy
  the highest index, to reliably update the 'lastFiatSubmission' column
  of the bank account.

  The List return type (instead of SizedIterable) lets the caller NOT open
  a transaction block to access the values -- although some operations _on
  the values_ may be forbidden.
*/
fun getUnsubmittedTransactions(bankAccountLabel: String): List<BankAccountTransactionEntity> =
    transaction {
        val bankAccount = getBankAccountFromLabel(bankAccountLabel)
        val lowerExclusiveLimit = bankAccount.lastFiatSubmission?.id?.value ?: 0
        val unsubmitted = BankAccountTransactionEntity.find {
            val newerThanBookmark = BankAccountTransactionsTable.id greater lowerExclusiveLimit
            val isIncoming = BankAccountTransactionsTable.direction eq "CRDT"
            val belongsToAccount = BankAccountTransactionsTable.account eq bankAccount.id
            newerThanBookmark and isIncoming and belongsToAccount
        }
        // Sorting already yields a List, ready to leave the transaction block.
        unsubmitted.sortedBy { it.id }
    }

// CASH-OUT SIDE.

/**
 * This function listens for regio-incoming events (LIBEUFIN_REGIO_TX)
 * on the 'watchedBankAccount' and submits the related cash-out payment
 * to Nexus.  The fiat payment will then take place ENTIRELY on Nexus'
 * responsibility.
 *
 * Cash-out's are POSTed in ascending order.  When one submission fails
 * because Nexus is unreachable or responds 5xx, the monitor pauses and
 * starts over from the first unsubmitted cash-out: it never goes past
 * a failed one, otherwise advancing the 'last submitted' bookmark would
 * lose it.
 *
 * @param httpClient HTTP client to reach Nexus; it must NOT follow redirects.
 * @param watchedBankAccount label of the bank account whose incoming
 *        payments are cash-out's to submit.
 * @param demobankName name of the demobank, used to get config values.
 * @param dbEventTimeout how long to wait for one DB event, 0 waits forever.
 * @throws CashoutClientError when Nexus responds with a 4xx status or with
 *         any other status that is neither 5xx nor 200 OK.
 */
suspend fun cashoutMonitor(
    httpClient: HttpClient,
    watchedBankAccount: String = "admin",
    demobankName: String = "default", // used to get config values.
    dbEventTimeout: Long = 0 // 0 waits forever.
) {
    ensureDisabledRedirects(httpClient)
    // Register for a REGIO_TX event.
    val eventChannel = buildChannelName(
        NotificationsChannelDomains.LIBEUFIN_REGIO_TX,
        watchedBankAccount
    )
    val objectMapper = jacksonObjectMapper()
    val demobank = getDemobank(demobankName)
    val bankAccount = getBankAccountFromLabel(watchedBankAccount)
    val config = demobank?.config ?: throw internalServerError(
        "Demobank '$demobankName' has no configuration."
    )
    /**
     * The monitor needs the cash-out currency to correctly POST
     * payment initiations at Nexus.  Recall: Nexus bank accounts
     * do not mandate any particular currency, as they serve as mere
     * bridges to the backing bank.  And: a backing bank may have
     * multiple currencies, or the backing bank may not explicitly
     * specify any currencies to be _the_ currency of the backed
     * bank account.
     */
    if (config.cashoutCurrency == null) {
        logger.error("Config lacks cash-out currency.")
        exitProcess(1)
    }
    val nexusBaseUrl = getConfigValueOrThrow(config::nexusBaseUrl)
    val usernameAtNexus = getConfigValueOrThrow(config::usernameAtNexus)
    val passwordAtNexus = getConfigValueOrThrow(config::passwordAtNexus)
    // Making sure that exactly one slash separates the base URL from the path.
    val nexusBaseUrlWithSlash = if (nexusBaseUrl.endsWith('/')) nexusBaseUrl else "$nexusBaseUrl/"
    /**
     * WARNING: Nexus gives the possibility to have bank account names
     * DIFFERENT from their owner's username.  Sandbox however MUST have
     * its Nexus bank account named THE SAME as its username.
     */
    val paymentInitEndpoint = nexusBaseUrlWithSlash + "bank-accounts/$usernameAtNexus/payment-initiations"
    while (true) {
        val listenHandle = PostgresListenHandle(eventChannel)
        // pessimistically LISTEN
        listenHandle.postgresListen()
        // but optimistically check for data, case some
        // arrived _before_ the LISTEN.
        var newTxs = getUnsubmittedTransactions(watchedBankAccount)
        // Data found, UNLISTEN.
        if (newTxs.isNotEmpty()) {
            logger.debug("Found cash-out's without waiting any DB event.")
            listenHandle.postgresUnlisten()
        }
        // Data not found, wait.
        else {
            logger.debug("Need to wait a DB event for new cash-out's")
            val isNotificationArrived = listenHandle.waitOnIODispatchers(dbEventTimeout)
            if (isNotificationArrived && listenHandle.receivedPayload == "CRDT")
                newTxs = getUnsubmittedTransactions(watchedBankAccount)
        }
        if (newTxs.isEmpty()) {
            logger.debug("DB event timeout expired")
            continue
        }
        logger.debug("POSTing new cash-out's")
        // A plain 'for' lets the failure paths 'break': the outer loop
        // then reads the unsubmitted cash-out's again, starting from the
        // one that failed.
        for (cashoutTx in newTxs) {
            logger.debug("POSTing cash-out '${cashoutTx.subject}' to $paymentInitEndpoint")
            val body = object {
                /**
                 * This field is UID of the request _as assigned by the
                 * client_.  That helps to reconcile transactions or lets
                 * Nexus implement idempotency.  It will NOT identify the created
                 * resource at the server side.  The ID of the created resource is
                 * assigned _by Nexus_ and communicated in the (successful) response.
                 */
                val uid = cashoutTx.accountServicerReference
                val iban = cashoutTx.creditorIban
                val bic = cashoutTx.creditorBic
                val amount = "${config.cashoutCurrency}:${cashoutTx.amount}"
                val subject = cashoutTx.subject
                val name = cashoutTx.creditorName
            }
            val resp = try {
                httpClient.post(paymentInitEndpoint) {
                    expectSuccess = false // Avoids excepting on !2xx
                    basicAuth(usernameAtNexus, passwordAtNexus)
                    contentType(ContentType.Application.Json)
                    setBody(objectMapper.writeValueAsString(body))
                }
            }
            // Hard-error, response did not even arrive.
            catch (e: Exception) {
                logger.error("Cash-out monitor could not reach Nexus.  Pause and retry")
                logger.error(e.message)
                /**
                 * Explicit delaying because the monitor normally
                 * waits on DB events, and this retry likely won't
                 * wait on a DB event.
                 */
                delay(2000)
                // Not moving to the next cash-out: a later success would
                // advance the bookmark past this one, which would be lost.
                break
            }
            // Server fault.  Pause and retry.
            if (resp.status.value.toString().startsWith('5')) {
                logger.error("Cash-out monitor POSTed to a failing Nexus.  Pause and retry")
                logger.error("Server responded: ${resp.bodyAsText()}")
                /**
                 * Explicit delaying because the monitor normally
                 * waits on DB events, and this retry likely won't
                 * wait on a DB event.
                 */
                delay(2000L)
                // Same as above: retry from this cash-out, do not skip it.
                break
            }
            // Client fault, fail Sandbox.
            if (resp.status.value.toString().startsWith('4')) {
                logger.error("Cash-out monitor failed at POSTing to Nexus.")
                logger.error("Nexus responded: ${resp.bodyAsText()}")
                throw CashoutClientError()
            }
            // Expecting 200 OK.  What if 3xx?
            if (resp.status.value != HttpStatusCode.OK.value) {
                logger.error("Cash-out monitor, unhandled response status: ${resp.status.value}.")
                throw CashoutClientError()
            }
            // Successful case, mark the wire transfer as submitted,
            // and advance the pointer to the last submitted payment.
            val responseBody = resp.bodyAsText()
            transaction {
                CashoutSubmissionEntity.new {
                    localTransaction = cashoutTx.id
                    submissionTime = resp.responseTime.timestamp
                    /**
                     * The following block associates the submitted payment
                     * to the UID that Nexus assigned to it.  It is currently not
                     * used in Sandbox, but might help for reconciliation.
                     */
                    if (responseBody.isNotEmpty())
                        maybeNexusResposnse = responseBody
                }
                // Advancing the 'last submitted bookmark', to avoid
                // handling the same transaction multiple times.
                bankAccount.lastFiatSubmission = cashoutTx
            }
        }
    }
}