summaryrefslogtreecommitdiff
path: root/sandbox/src/main/kotlin/tech/libeufin
diff options
context:
space:
mode:
authorMS <ms@taler.net>2023-04-21 20:14:18 +0200
committerMS <ms@taler.net>2023-04-21 20:14:18 +0200
commit8611d22879eb802bc69077f77543004adc4970f7 (patch)
tree62cf67ba84376eb6cc3f624c714927869d88bc2c /sandbox/src/main/kotlin/tech/libeufin
parentbf7d97746fb4448bbff63a109c529dc22ac136df (diff)
downloadlibeufin-8611d22879eb802bc69077f77543004adc4970f7.tar.gz
libeufin-8611d22879eb802bc69077f77543004adc4970f7.tar.bz2
libeufin-8611d22879eb802bc69077f77543004adc4970f7.zip
Conversion service.
Implementing the buy-in side.
Diffstat (limited to 'sandbox/src/main/kotlin/tech/libeufin')
-rw-r--r--sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt221
-rw-r--r--sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt11
2 files changed, 184 insertions, 48 deletions
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt b/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
index c52d74dd..a4cfccbb 100644
--- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
@@ -1,16 +1,22 @@
package tech.libeufin.sandbox
+import CamtBankAccountEntry
+import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
+import com.fasterxml.jackson.module.kotlin.jsonMapper
import io.ktor.client.*
import io.ktor.client.plugins.*
import io.ktor.client.request.*
import io.ktor.client.statement.*
import io.ktor.http.*
+import io.ktor.utils.io.jvm.javaio.*
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.jetbrains.exposed.sql.and
import org.jetbrains.exposed.sql.transactions.transaction
import tech.libeufin.util.*
+import java.math.BigDecimal
+import kotlin.system.exitProcess
/**
* This file contains the logic for downloading/submitting incoming/outgoing
@@ -27,11 +33,13 @@ import tech.libeufin.util.*
* 2. The time to submit a new payment is as soon as "admin" receives one
* incoming regional payment.
* 3. At this time, Nexus does NOT offer long polling when it serves the
- * transactions via its JSON API.
+ * transactions via its JSON API. => Fixed.
* 4. At this time, Nexus does NOT offer any filter when it serves the
- * transactions via its JSON API.
+ * transactions via its JSON API. => Can be fixed by using the TWG.
*/
+// DEFINITIONS AND HELPERS
+
/**
* Timeout the HTTP client waits for the server to respond,
* after the request is made.
@@ -46,6 +54,17 @@ val waitTimeout = 30000L
val newIterationTimeout = 2000L
/**
+ * Response format of Nexus GET /transactions.
+ */
+data class TransactionItem(
+ val index: String,
+ val camtData: CamtBankAccountEntry
+)
+data class NexusTransactions(
+ val transactions: List<TransactionItem>
+)
+
+/**
* Executes the 'block' function every 'loopNewReqMs' milliseconds.
* Does not exit/fail the process upon exceptions - just logs them.
*/
@@ -67,20 +86,125 @@ fun downloadLoop(block: () -> Unit) {
}
}
+// BUY-IN SIDE.
+
/**
- * This function downloads the incoming fiat transactions from Nexus,
- * stores them into the database and signals their arrival (LIBEUFIN_FIAT_INCOMING)
- * to allow crediting the "admin" account.
+ * Applies the buy-in ratio and fees to the fiat amount
+ * that came from Nexus. The result is the regional amount
+ * that will be wired to the exchange Sandbox account.
*/
-// fetchTransactions()
-
+private fun applyBuyinRatioAndFees(
+ amount: BigDecimal,
+ ratioAndFees: RatioAndFees
+): BigDecimal =
+ ((amount * ratiosAndFees.buy_at_ratio.toBigDecimal())
+ - ratiosAndFees.buy_in_fee.toBigDecimal()).roundToTwoDigits()
/**
- * This function listens for fiat-incoming events (LIBEUFIN_FIAT_INCOMING)
- * and credits the "admin" account as a reaction. Lastly, the Nexus instance
- * wired to Sandbox will pick the new payment and serve it via its TWG, but
- * this is OUT of the Sandbox scope.
+ * This function downloads the incoming fiat transactions from Nexus,
+ * stores them into the database and triggers the related wire transfer
+ * to the Taler exchange (to be specified in 'accountToCredit'). In case
+ * of errors, it pauses and retries when the server fails, but _fails_ when
+ * the client does.
*/
-// creditAdmin()
+fun buyinMonitor(
+ demobankName: String, // used to get config values.
+ client: HttpClient,
+ accountToCredit: String,
+ accountToDebit: String = "admin"
+) {
+ val demobank = ensureDemobank(demobankName)
+ val nexusBaseUrl = getConfigValueOrThrow(demobank.config::nexusBaseUrl)
+ val usernameAtNexus = getConfigValueOrThrow(demobank.config::usernameAtNexus)
+ val passwordAtNexus = getConfigValueOrThrow(demobank.config::passwordAtNexus)
+ val endpoint = "bank-accounts/$usernameAtNexus/transactions"
+ val uriWithoutStart = joinUrl(nexusBaseUrl, endpoint) + "?long_poll_ms=$waitTimeout"
+
+ // downloadLoop does already try-catch (without failing the process).
+ downloadLoop {
+ val debitBankAccount = getBankAccountFromLabel(accountToDebit)
+ val uriWithStart = "$uriWithoutStart&start=${debitBankAccount.lastFiatFetch}"
+ runBlocking {
+ // Maybe get new fiat transactions.
+ logger.debug("GETting fiat transactions from: ${uriWithStart}")
+ val resp = client.get(uriWithStart) { basicAuth(usernameAtNexus, passwordAtNexus) }
+ // The server failed, pause and try again
+ if (resp.status.value.toString().startsWith('5')) {
+ logger.error("Buy-in monitor caught a failing to Nexus. Pause and retry.")
+ logger.error("Nexus responded: ${resp.bodyAsText()}")
+ delay(2000L)
+ return@runBlocking
+ }
+ // The client failed, fail the process.
+ if (resp.status.value.toString().startsWith('4')) {
+ logger.error("Buy-in monitor failed at GETting to Nexus. Fail Sandbox.")
+ logger.error("Nexus responded: ${resp.bodyAsText()}")
+ exitProcess(1)
+ }
+ // Expect 200 OK. What if 3xx?
+ if (resp.status.value != HttpStatusCode.OK.value) {
+ logger.error("Unhandled response status ${resp.status.value}, failing Sandbox")
+ exitProcess(1)
+ }
+ // Nexus responded 200 OK, analyzing the result.
+ /**
+ * Wire to "admin" if the subject is a public key, or do
+ * nothing otherwise.
+ */
+ val respObj = jacksonObjectMapper().readValue(
+ resp.bodyAsText(),
+ NexusTransactions::class.java
+ ) // errors are logged by the caller (without failing).
+ respObj.transactions.forEach {
+ /**
+ * If the payment doesn't contain a reserve public key,
+ * continue the iteration with the new payment.
+ */
+ if (extractReservePubFromSubject(it.camtData.getSingletonSubject()) == null)
+ return@forEach
+ /**
+ * The payment had a reserve public key in the subject, wire it to
+ * the exchange. NOTE: this ensures that all the payments that the
+ * exchange gets will NOT trigger any reimbursement, because they have
+ * a valid reserve public key. Reimbursements would in fact introduce
+ * significant friction, because they need to target _fiat_ bank accounts
+ * (the customers'), whereas the entity that _now_ pays the exchange is
+ * "admin", which lives in the regional circuit.
+ */
+ // Extracts the amount and checks it's at most two fractional digits.
+ val maybeValidAmount = it.camtData.amount.value
+ if (!validatePlainAmount(maybeValidAmount)) {
+ logger.error("Nexus gave one amount with invalid fractional digits: $maybeValidAmount." +
+ " The transaction has index ${it.index}")
+ // Advancing the last fetched pointer, to avoid GETting
+ // this invalid payment again.
+ transaction {
+ debitBankAccount.refresh()
+ debitBankAccount.lastFiatFetch = it.index
+ }
+ }
+ val convertedAmount = applyBuyinRatioAndFees(
+ maybeValidAmount.toBigDecimal(),
+ ratiosAndFees
+ )
+ transaction {
+ wireTransfer(
+ debitAccount = accountToDebit,
+ creditAccount = accountToCredit,
+ demobank = demobankName,
+ subject = it.camtData.getSingletonSubject(),
+ amount = "${demobank.config.currency}:$convertedAmount"
+ )
+ // Nexus enqueues the transactions such that the index increases.
+ // If Sandbox crashes here, it'll ask again using the last successful
+ // index as the start parameter. Being this an exclusive bound, only
+ // transactions later than it are expected.
+ debitBankAccount.refresh()
+ debitBankAccount.lastFiatFetch = it.index
+ }
+ }
+ }
+ }
+}
// DB query helper. The List return type (instead of SizedIterable) lets
// the caller NOT open a transaction block to access the values -- although
@@ -100,6 +224,8 @@ private fun getUnsubmittedTransactions(bankAccountLabel: String): List<BankAccou
}
}
+// CASH-OUT SIDE.
+
/**
* This function listens for regio-incoming events (LIBEUFIN_REGIO_TX)
* on the 'watchedBankAccount' and submits the related cash-out payment
@@ -109,7 +235,8 @@ private fun getUnsubmittedTransactions(bankAccountLabel: String): List<BankAccou
suspend fun cashoutMonitor(
httpClient: HttpClient,
watchedBankAccount: String = "admin",
- demobankName: String = "default" // used to get config values.
+ demobankName: String = "default", // used to get config values.
+ dbEventTimeout: Long = 0 // 0 waits forever.
) {
// Register for a REGIO_TX event.
val eventChannel = buildChannelName(
@@ -132,15 +259,11 @@ suspend fun cashoutMonitor(
/**
* WARNING: Nexus gives the possibility to have bank account names
* DIFFERENT from their owner's username. Sandbox however MUST have
- * its Nexus bank account named THE SAME as its username (until the
- * config will allow to change).
+ * its Nexus bank account named THE SAME as its username.
*/
ret + "bank-accounts/$usernameAtNexus/payment-initiations"
}
while (true) {
- // delaying here avoids to delay in multiple places (errors,
- // lack of action, success)
- delay(2000)
val listenHandle = PostgresListenHandle(eventChannel)
// pessimistically LISTEN
listenHandle.postgresListen()
@@ -152,11 +275,7 @@ suspend fun cashoutMonitor(
listenHandle.postgresUnlisten()
// Data not found, wait.
else {
- // OK to block, because the next event is going to
- // be _this_ one. The caller should however execute
- // this whole logic in a thread other than the main
- // HTTP server.
- val isNotificationArrived = listenHandle.postgresGetNotifications(waitTimeout)
+ val isNotificationArrived = listenHandle.waitOnIODispatchers(dbEventTimeout)
if (isNotificationArrived && listenHandle.receivedPayload == "CRDT")
newTxs = getUnsubmittedTransactions(watchedBankAccount)
}
@@ -188,35 +307,41 @@ suspend fun cashoutMonitor(
}
// Hard-error, response did not even arrive.
catch (e: Exception) {
+ logger.error("Cash-out monitor could not reach Nexus. Pause and retry")
logger.error(e.message)
- // mark as failed and proceed to the next one.
- transaction {
- CashoutSubmissionEntity.new {
- this.localTransaction = it.id
- this.hasErrors = true
- }
- bankAccount.lastFiatSubmission = it
- }
+ delay(2000)
return@forEach
}
- // Handle the non 2xx error case. Here we try
- // to store the response from Nexus.
+ // Server fault. Pause and retry.
+ if (resp.status.value.toString().startsWith('5')) {
+ logger.error("Cash-out monitor POSTed to a failing Nexus. Pause and retry")
+ logger.error(resp.bodyAsText())
+ delay(2000L)
+ }
+ // Client fault, fail Sandbox.
+ if (resp.status.value.toString().startsWith('4')) {
+ logger.error("Cash-out monitor failed at POSTing to Nexus. Fail Sandbox")
+ logger.error("Nexus responded: ${resp.bodyAsText()}")
+ exitProcess(1)
+ }
+ // Expecting 200 OK. What if 3xx?
if (resp.status.value != HttpStatusCode.OK.value) {
- val maybeResponseBody = resp.bodyAsText()
- logger.error(
- "Fiat submission response was: $maybeResponseBody," +
- " status: ${resp.status.value}"
- )
- transaction {
- CashoutSubmissionEntity.new {
- localTransaction = it.id
- this.hasErrors = true
- if (maybeResponseBody.length > 0)
- this.maybeNexusResposnse = maybeResponseBody
+ logger.error("Cash-out monitor, unhandled response status: ${resp.status.value}. Fail Sandbox")
+ exitProcess(1)
+
+ // Previous versions use to store the faulty transaction
+ // and continue the execution. The block below shows how
+ // to do that.
+
+ /*transaction {
+ CashoutSubmissionEntity.new {
+ localTransaction = it.id
+ this.hasErrors = true
+ if (maybeResponseBody.isNotEmpty())
+ this.maybeNexusResposnse = maybeResponseBody
}
- bankAccount.lastFiatSubmission = it
- }
- return@forEach
+ bankAccount.lastFiatSubmission = it
+ }*/
}
// Successful case, mark the wire transfer as submitted,
// and advance the pointer to the last submitted payment.
@@ -231,7 +356,7 @@ suspend fun cashoutMonitor(
// unique identifier _as assigned by Nexus_. Not
// currently used by Sandbox, but may help to resolve
// disputes.
- if (responseBody.length > 0)
+ if (responseBody.isNotEmpty())
maybeNexusResposnse = responseBody
}
// Advancing the 'last submitted bookmark', to avoid
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt b/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt
index c8a1df18..ca83d31a 100644
--- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt
@@ -515,6 +515,16 @@ object BankAccountsTable : IntIdTable() {
* cash-out operation.
*/
val lastFiatSubmission = reference("lastFiatSubmission", BankAccountTransactionsTable).nullable()
+
+ /**
+ * Tracks the last fiat payment that was read from Nexus. This tracker
+ * gets updated ONLY IF the exchange gets successfully paid with the related
+ * amount in the regional currency. NOTE: in case of disputes, the customer
+ * will provide the date of a problematic withdrawal, and the regional currency
+ * administrator should check into the "admin" (regional) outgoing history by
+ * using such date as filter.
+ */
+ val lastFiatFetch = text("lastFiatFetch").default("0")
}
class BankAccountEntity(id: EntityID<Int>) : IntEntity(id) {
@@ -528,6 +538,7 @@ class BankAccountEntity(id: EntityID<Int>) : IntEntity(id) {
var demoBank by DemobankConfigEntity referencedOn BankAccountsTable.demoBank
var lastTransaction by BankAccountTransactionEntity optionalReferencedOn BankAccountsTable.lastTransaction
var lastFiatSubmission by BankAccountTransactionEntity optionalReferencedOn BankAccountsTable.lastFiatSubmission
+ var lastFiatFetch by BankAccountsTable.lastFiatFetch
}
object BankAccountStatementsTable : IntIdTable() {