diff options
author | MS <ms@taler.net> | 2023-04-21 20:14:18 +0200 |
---|---|---|
committer | MS <ms@taler.net> | 2023-04-21 20:14:18 +0200 |
commit | 8611d22879eb802bc69077f77543004adc4970f7 (patch) | |
tree | 62cf67ba84376eb6cc3f624c714927869d88bc2c /sandbox/src/main/kotlin/tech/libeufin | |
parent | bf7d97746fb4448bbff63a109c529dc22ac136df (diff) | |
download | libeufin-8611d22879eb802bc69077f77543004adc4970f7.tar.gz libeufin-8611d22879eb802bc69077f77543004adc4970f7.tar.bz2 libeufin-8611d22879eb802bc69077f77543004adc4970f7.zip |
Conversion service.
Implementing the buy-in side.
Diffstat (limited to 'sandbox/src/main/kotlin/tech/libeufin')
-rw-r--r-- | sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt | 221 | ||||
-rw-r--r-- | sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt | 11 |
2 files changed, 184 insertions, 48 deletions
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt b/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt index c52d74dd..a4cfccbb 100644 --- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt +++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt @@ -1,16 +1,22 @@ package tech.libeufin.sandbox +import CamtBankAccountEntry +import com.fasterxml.jackson.databind.ObjectMapper import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import com.fasterxml.jackson.module.kotlin.jsonMapper import io.ktor.client.* import io.ktor.client.plugins.* import io.ktor.client.request.* import io.ktor.client.statement.* import io.ktor.http.* +import io.ktor.utils.io.jvm.javaio.* import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking import org.jetbrains.exposed.sql.and import org.jetbrains.exposed.sql.transactions.transaction import tech.libeufin.util.* +import java.math.BigDecimal +import kotlin.system.exitProcess /** * This file contains the logic for downloading/submitting incoming/outgoing @@ -27,11 +33,13 @@ import tech.libeufin.util.* * 2. The time to submit a new payment is as soon as "admin" receives one * incoming regional payment. * 3. At this time, Nexus does NOT offer long polling when it serves the - * transactions via its JSON API. + * transactions via its JSON API. => Fixed. * 4. At this time, Nexus does NOT offer any filter when it serves the - * transactions via its JSON API. + * transactions via its JSON API. => Can be fixed by using the TWG. */ +// DEFINITIONS AND HELPERS + /** * Timeout the HTTP client waits for the server to respond, * after the request is made. @@ -46,6 +54,17 @@ val waitTimeout = 30000L val newIterationTimeout = 2000L /** + * Response format of Nexus GET /transactions. + */ +data class TransactionItem( + val index: String, + val camtData: CamtBankAccountEntry +) +data class NexusTransactions( + val transactions: List<TransactionItem> +) + +/** * Executes the 'block' function every 'loopNewReqMs' milliseconds. * Does not exit/fail the process upon exceptions - just logs them. */ @@ -67,20 +86,125 @@ fun downloadLoop(block: () -> Unit) { } } +// BUY-IN SIDE. + /** - * This function downloads the incoming fiat transactions from Nexus, - * stores them into the database and signals their arrival (LIBEUFIN_FIAT_INCOMING) - * to allow crediting the "admin" account. + * Applies the buy-in ratio and fees to the fiat amount + * that came from Nexus. The result is the regional amount + * that will be wired to the exchange Sandbox account. */ -// fetchTransactions() - +private fun applyBuyinRatioAndFees( + amount: BigDecimal, + ratioAndFees: RatioAndFees +): BigDecimal = + ((amount * ratiosAndFees.buy_at_ratio.toBigDecimal()) + - ratiosAndFees.buy_in_fee.toBigDecimal()).roundToTwoDigits() /** - * This function listens for fiat-incoming events (LIBEUFIN_FIAT_INCOMING) - * and credits the "admin" account as a reaction. Lastly, the Nexus instance - * wired to Sandbox will pick the new payment and serve it via its TWG, but - * this is OUT of the Sandbox scope. + * This function downloads the incoming fiat transactions from Nexus, + * stores them into the database and triggers the related wire transfer + * to the Taler exchange (to be specified in 'accountToCredit'). In case + * of errors, it pauses and retries when the server fails, but _fails_ when + * the client does. */ -// creditAdmin() +fun buyinMonitor( + demobankName: String, // used to get config values. + client: HttpClient, + accountToCredit: String, + accountToDebit: String = "admin" +) { + val demobank = ensureDemobank(demobankName) + val nexusBaseUrl = getConfigValueOrThrow(demobank.config::nexusBaseUrl) + val usernameAtNexus = getConfigValueOrThrow(demobank.config::usernameAtNexus) + val passwordAtNexus = getConfigValueOrThrow(demobank.config::passwordAtNexus) + val endpoint = "bank-accounts/$usernameAtNexus/transactions" + val uriWithoutStart = joinUrl(nexusBaseUrl, endpoint) + "?long_poll_ms=$waitTimeout" + + // downloadLoop does already try-catch (without failing the process). + downloadLoop { + val debitBankAccount = getBankAccountFromLabel(accountToDebit) + val uriWithStart = "$uriWithoutStart&start=${debitBankAccount.lastFiatFetch}" + runBlocking { + // Maybe get new fiat transactions. + logger.debug("GETting fiat transactions from: ${uriWithStart}") + val resp = client.get(uriWithStart) { basicAuth(usernameAtNexus, passwordAtNexus) } + // The server failed, pause and try again + if (resp.status.value.toString().startsWith('5')) { + logger.error("Buy-in monitor caught a failing to Nexus. Pause and retry.") + logger.error("Nexus responded: ${resp.bodyAsText()}") + delay(2000L) + return@runBlocking + } + // The client failed, fail the process. + if (resp.status.value.toString().startsWith('4')) { + logger.error("Buy-in monitor failed at GETting to Nexus. Fail Sandbox.") + logger.error("Nexus responded: ${resp.bodyAsText()}") + exitProcess(1) + } + // Expect 200 OK. What if 3xx? + if (resp.status.value != HttpStatusCode.OK.value) { + logger.error("Unhandled response status ${resp.status.value}, failing Sandbox") + exitProcess(1) + } + // Nexus responded 200 OK, analyzing the result. + /** + * Wire to "admin" if the subject is a public key, or do + * nothing otherwise. + */ + val respObj = jacksonObjectMapper().readValue( + resp.bodyAsText(), + NexusTransactions::class.java + ) // errors are logged by the caller (without failing). + respObj.transactions.forEach { + /** + * If the payment doesn't contain a reserve public key, + * continue the iteration with the new payment. + */ + if (extractReservePubFromSubject(it.camtData.getSingletonSubject()) == null) + return@forEach + /** + * The payment had a reserve public key in the subject, wire it to + * the exchange. NOTE: this ensures that all the payments that the + * exchange gets will NOT trigger any reimbursement, because they have + * a valid reserve public key. Reimbursements would in fact introduce + * significant friction, because they need to target _fiat_ bank accounts + * (the customers'), whereas the entity that _now_ pays the exchange is + * "admin", which lives in the regional circuit. + */ + // Extracts the amount and checks it's at most two fractional digits. + val maybeValidAmount = it.camtData.amount.value + if (!validatePlainAmount(maybeValidAmount)) { + logger.error("Nexus gave one amount with invalid fractional digits: $maybeValidAmount." + + " The transaction has index ${it.index}") + // Advancing the last fetched pointer, to avoid GETting + // this invalid payment again. + transaction { + debitBankAccount.refresh() + debitBankAccount.lastFiatFetch = it.index + } + } + val convertedAmount = applyBuyinRatioAndFees( + maybeValidAmount.toBigDecimal(), + ratiosAndFees + ) + transaction { + wireTransfer( + debitAccount = accountToDebit, + creditAccount = accountToCredit, + demobank = demobankName, + subject = it.camtData.getSingletonSubject(), + amount = "${demobank.config.currency}:$convertedAmount" + ) + // Nexus enqueues the transactions such that the index increases. + // If Sandbox crashes here, it'll ask again using the last successful + // index as the start parameter. Being this an exclusive bound, only + // transactions later than it are expected. + debitBankAccount.refresh() + debitBankAccount.lastFiatFetch = it.index + } + } + } + } +} // DB query helper. The List return type (instead of SizedIterable) lets // the caller NOT open a transaction block to access the values -- although @@ -100,6 +224,8 @@ private fun getUnsubmittedTransactions(bankAccountLabel: String): List<BankAccou } } +// CASH-OUT SIDE. + /** * This function listens for regio-incoming events (LIBEUFIN_REGIO_TX) * on the 'watchedBankAccount' and submits the related cash-out payment @@ -109,7 +235,8 @@ private fun getUnsubmittedTransactions(bankAccountLabel: String): List<BankAccou suspend fun cashoutMonitor( httpClient: HttpClient, watchedBankAccount: String = "admin", - demobankName: String = "default" // used to get config values. + demobankName: String = "default", // used to get config values. + dbEventTimeout: Long = 0 // 0 waits forever. ) { // Register for a REGIO_TX event. val eventChannel = buildChannelName( @@ -132,15 +259,11 @@ suspend fun cashoutMonitor( /** * WARNING: Nexus gives the possibility to have bank account names * DIFFERENT from their owner's username. Sandbox however MUST have - * its Nexus bank account named THE SAME as its username (until the - * config will allow to change). + * its Nexus bank account named THE SAME as its username. */ ret + "bank-accounts/$usernameAtNexus/payment-initiations" } while (true) { - // delaying here avoids to delay in multiple places (errors, - // lack of action, success) - delay(2000) val listenHandle = PostgresListenHandle(eventChannel) // pessimistically LISTEN listenHandle.postgresListen() @@ -152,11 +275,7 @@ suspend fun cashoutMonitor( listenHandle.postgresUnlisten() // Data not found, wait. else { - // OK to block, because the next event is going to - // be _this_ one. The caller should however execute - // this whole logic in a thread other than the main - // HTTP server. - val isNotificationArrived = listenHandle.postgresGetNotifications(waitTimeout) + val isNotificationArrived = listenHandle.waitOnIODispatchers(dbEventTimeout) if (isNotificationArrived && listenHandle.receivedPayload == "CRDT") newTxs = getUnsubmittedTransactions(watchedBankAccount) } @@ -188,35 +307,41 @@ suspend fun cashoutMonitor( } // Hard-error, response did not even arrive. catch (e: Exception) { + logger.error("Cash-out monitor could not reach Nexus. Pause and retry") logger.error(e.message) - // mark as failed and proceed to the next one. - transaction { - CashoutSubmissionEntity.new { - this.localTransaction = it.id - this.hasErrors = true - } - bankAccount.lastFiatSubmission = it - } + delay(2000) return@forEach } - // Handle the non 2xx error case. Here we try - // to store the response from Nexus. + // Server fault. Pause and retry. + if (resp.status.value.toString().startsWith('5')) { + logger.error("Cash-out monitor POSTed to a failing Nexus. Pause and retry") + logger.error(resp.bodyAsText()) + delay(2000L) + } + // Client fault, fail Sandbox. + if (resp.status.value.toString().startsWith('4')) { + logger.error("Cash-out monitor failed at POSTing to Nexus. Fail Sandbox") + logger.error("Nexus responded: ${resp.bodyAsText()}") + exitProcess(1) + } + // Expecting 200 OK. What if 3xx? if (resp.status.value != HttpStatusCode.OK.value) { - val maybeResponseBody = resp.bodyAsText() - logger.error( - "Fiat submission response was: $maybeResponseBody," + - " status: ${resp.status.value}" - ) - transaction { - CashoutSubmissionEntity.new { - localTransaction = it.id - this.hasErrors = true - if (maybeResponseBody.length > 0) - this.maybeNexusResposnse = maybeResponseBody + logger.error("Cash-out monitor, unhandled response status: ${resp.status.value}. Fail Sandbox") + exitProcess(1) + + // Previous versions use to store the faulty transaction + // and continue the execution. The block below shows how + // to do that. + + /*transaction { + CashoutSubmissionEntity.new { + localTransaction = it.id + this.hasErrors = true + if (maybeResponseBody.isNotEmpty()) + this.maybeNexusResposnse = maybeResponseBody } - bankAccount.lastFiatSubmission = it - } - return@forEach + bankAccount.lastFiatSubmission = it + }*/ } // Successful case, mark the wire transfer as submitted, // and advance the pointer to the last submitted payment. @@ -231,7 +356,7 @@ suspend fun cashoutMonitor( // unique identifier _as assigned by Nexus_. Not // currently used by Sandbox, but may help to resolve // disputes. - if (responseBody.length > 0) + if (responseBody.isNotEmpty()) maybeNexusResposnse = responseBody } // Advancing the 'last submitted bookmark', to avoid diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt b/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt index c8a1df18..ca83d31a 100644 --- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt +++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt @@ -515,6 +515,16 @@ object BankAccountsTable : IntIdTable() { * cash-out operation. */ val lastFiatSubmission = reference("lastFiatSubmission", BankAccountTransactionsTable).nullable() + + /** + * Tracks the last fiat payment that was read from Nexus. This tracker + * gets updated ONLY IF the exchange gets successfully paid with the related + * amount in the regional currency. NOTE: in case of disputes, the customer + * will provide the date of a problematic withdrawal, and the regional currency + * administrator should check into the "admin" (regional) outgoing history by + * using such date as filter. + */ + val lastFiatFetch = text("lastFiatFetch").default("0") } class BankAccountEntity(id: EntityID<Int>) : IntEntity(id) { @@ -528,6 +538,7 @@ class BankAccountEntity(id: EntityID<Int>) : IntEntity(id) { var demoBank by DemobankConfigEntity referencedOn BankAccountsTable.demoBank var lastTransaction by BankAccountTransactionEntity optionalReferencedOn BankAccountsTable.lastTransaction var lastFiatSubmission by BankAccountTransactionEntity optionalReferencedOn BankAccountsTable.lastFiatSubmission + var lastFiatFetch by BankAccountsTable.lastFiatFetch } object BankAccountStatementsTable : IntIdTable() { |