diff options
author | MS <ms@taler.net> | 2023-04-16 09:19:46 +0200 |
---|---|---|
committer | MS <ms@taler.net> | 2023-04-16 10:13:54 +0200 |
commit | d7cecd35a5f7ab3ab2491e4c4ad4f07e041e9944 (patch) | |
tree | 56ab7ef93a92a0f3f743c53c10ec56db9425f37b /sandbox/src/main/kotlin/tech/libeufin | |
parent | 1602c0b6cd3cad8d8b8f14d68f509842e72146b6 (diff) | |
download | libeufin-d7cecd35a5f7ab3ab2491e4c4ad4f07e041e9944.tar.gz libeufin-d7cecd35a5f7ab3ab2491e4c4ad4f07e041e9944.tar.bz2 libeufin-d7cecd35a5f7ab3ab2491e4c4ad4f07e041e9944.zip |
Conversion service.
Implementing the cash-out monitor. The monitor watches one
particular bank account (the admin's by default) and submits
a fiat payment initiation to Nexus upon every new incoming
transaction.
Also implementing idempotence for payment initiations at Nexus.
This helps in case the cash-out monitor fails at keeping track
of the submitted payments and accidentally submits multiple times
the same payment.
Diffstat (limited to 'sandbox/src/main/kotlin/tech/libeufin')
3 files changed, 222 insertions, 7 deletions
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt b/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt index 1d256beb..4045d10e 100644 --- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt +++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt @@ -1,7 +1,17 @@ package tech.libeufin.sandbox +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper +import io.ktor.client.* +import io.ktor.client.plugins.* +import io.ktor.client.request.* +import io.ktor.client.statement.* +import io.ktor.http.* +import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay import kotlinx.coroutines.runBlocking +import org.jetbrains.exposed.sql.and +import org.jetbrains.exposed.sql.transactions.transaction +import tech.libeufin.util.* /** * This file contains the logic for downloading/submitting incoming/outgoing @@ -23,9 +33,18 @@ import kotlinx.coroutines.runBlocking * transactions via its JSON API. */ -// Temporarily hard-coded. According to fiat times, these values could be WAY higher. -val longPollMs = 30000L // 30s long-polling. -val loopNewReqMs = 2000L // 2s for the next request. +/** + * Timeout the HTTP client waits for the server to respond, + * after the request is made. + */ +val waitTimeout = 30000L + +/** + * Time to wait before HTTP requesting again to the server. + * This helps to avoid tight cycles in case the server responds + * quickly or the client doesn't long-poll. + */ +val newIterationTimeout = 2000L /** * Executes the 'block' function every 'loopNewReqMs' milliseconds. @@ -44,7 +63,7 @@ fun downloadLoop(block: () -> Unit) { */ logger.error("Sandbox fiat-incoming monitor excepted: ${e.message}") } - delay(loopNewReqMs) + delay(newIterationTimeout) } } } @@ -64,9 +83,161 @@ fun downloadLoop(block: () -> Unit) { */ // creditAdmin() +// DB query helper. The List return type (instead of SizedIterable) lets +// the caller NOT open a transaction block to access the values -- although +// some operations _on the values_ may be forbidden. +private fun getUnsubmittedTransactions(bankAccountLabel: String): List<BankAccountTransactionEntity> { + return transaction { + val bankAccount = getBankAccountFromLabel(bankAccountLabel) + val lowerExclusiveLimit = bankAccount.lastFiatSubmission?.id?.value ?: 0 + BankAccountTransactionEntity.find { + BankAccountTransactionsTable.id greater lowerExclusiveLimit and ( + BankAccountTransactionsTable.direction eq "CRDT" + ) + }.sortedBy { it.id }.map { it } + // The latest payment must occupy the highest index, + // to reliably update the bank account row with the last + // submitted cash-out. + } +} + /** - * This function listens for regio-incoming events (LIBEUFIN_REGIO_INCOMING) + * This function listens for regio-incoming events (LIBEUFIN_REGIO_TX) * and submits the related cash-out payment to Nexus. The fiat payment will * then take place ENTIRELY on Nexus' responsibility. */ -// issueCashout() +suspend fun cashoutMonitor( + httpClient: HttpClient, + bankAccountLabel: String = "admin", + demobankName: String = "default" // used to get config values. +) { + // Register for a REGIO_TX event. + val eventChannel = buildChannelName( + NotificationsChannelDomains.LIBEUFIN_REGIO_TX, + bankAccountLabel + ) + val objectMapper = jacksonObjectMapper() + val demobank = getDemobank(demobankName) + val bankAccount = getBankAccountFromLabel(bankAccountLabel) + val config = demobank?.config ?: throw internalServerError( + "Demobank '$demobankName' has no configuration." + ) + val nexusBaseUrl = getConfigValueOrThrow(config::nexusBaseUrl) + val usernameAtNexus = getConfigValueOrThrow(config::usernameAtNexus) + val passwordAtNexus = getConfigValueOrThrow(config::passwordAtNexus) + val paymentInitEndpoint = nexusBaseUrl.run { + var ret = this + if (!ret.endsWith('/')) + ret += '/' + /** + * WARNING: Nexus gives the possibility to have bank account names + * DIFFERENT from their owner's username. Sandbox however MUST have + * its Nexus bank account named THE SAME as its username (until the + * config will allow to change). + */ + ret + "bank-accounts/$usernameAtNexus/payment-initiations" + } + while (true) { + // delaying here avoids to delay in multiple places (errors, + // lack of action, success) + delay(2000) + val listenHandle = PostgresListenHandle(eventChannel) + // pessimistically LISTEN + listenHandle.postgresListen() + // but optimistically check for data, case some + // arrived _before_ the LISTEN. + var newTxs = getUnsubmittedTransactions(bankAccountLabel) + // Data found, UNLISTEN. + if (newTxs.isNotEmpty()) + listenHandle.postgresUnlisten() + // Data not found, wait. + else { + // OK to block, because the next event is going to + // be _this_ one. The caller should however execute + // this whole logic in a thread other than the main + // HTTP server. + val isNotificationArrived = listenHandle.postgresGetNotifications(waitTimeout) + if (isNotificationArrived && listenHandle.receivedPayload == "CRDT") + newTxs = getUnsubmittedTransactions(bankAccountLabel) + } + if (newTxs.isEmpty()) + continue + newTxs.forEach { + val body = object { + /** + * This field is UID of the request _as assigned by the + * client_. That helps to reconcile transactions or lets + * Nexus implement idempotency. It will NOT identify the created + * resource at the server side. The ID of the created resource is + * assigned _by Nexus_ and communicated in the (successful) response. + */ + val uid = it.accountServicerReference + val iban = it.creditorIban + val bic = it.debtorBic + val amount = "${it.currency}:${it.amount}" + val subject = it.subject + val name = it.creditorName + } + val resp = try { + httpClient.post(paymentInitEndpoint) { + expectSuccess = false // Avoid excepting on !2xx + basicAuth(usernameAtNexus, passwordAtNexus) + contentType(ContentType.Application.Json) + setBody(objectMapper.writeValueAsString(body)) + } + } + // Hard-error, response did not even arrive. + catch (e: Exception) { + logger.error(e.message) + // mark as failed and proceed to the next one. + transaction { + CashoutSubmissionEntity.new { + this.localTransaction = it.id + this.hasErrors = true + } + bankAccount.lastFiatSubmission = it + } + return@forEach + } + // Handle the non 2xx error case. Here we try + // to store the response from Nexus. + if (resp.status.value != HttpStatusCode.OK.value) { + val maybeResponseBody = resp.bodyAsText() + logger.error( + "Fiat submission response was: $maybeResponseBody," + + " status: ${resp.status.value}" + ) + transaction { + CashoutSubmissionEntity.new { + localTransaction = it.id + this.hasErrors = true + if (maybeResponseBody.length > 0) + this.maybeNexusResposnse = maybeResponseBody + } + bankAccount.lastFiatSubmission = it + } + return@forEach + } + // Successful case, mark the wire transfer as submitted, + // and advance the pointer to the last submitted payment. + val responseBody = resp.bodyAsText() + transaction { + CashoutSubmissionEntity.new { + localTransaction = it.id + hasErrors = false + submissionTime = resp.responseTime.timestamp + isSubmitted = true + // Expectedly is > 0 and contains the submission + // unique identifier _as assigned by Nexus_. Not + // currently used by Sandbox, but may help to resolve + // disputes. + if (responseBody.length > 0) + maybeNexusResposnse = responseBody + } + // Advancing the 'last submitted bookmark', to avoid + // handling the same transaction multiple times. + bankAccount.lastFiatSubmission = it + } + } + } +} diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt b/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt index b0654950..c8a1df18 100644 --- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt +++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt @@ -508,6 +508,13 @@ object BankAccountsTable : IntIdTable() { * history results that start from / depend on the last transaction. */ val lastTransaction = reference("lastTransaction", BankAccountTransactionsTable).nullable() + + /** + * Points to the transaction that was last submitted by the conversion + * service to Nexus, in order to initiate a fiat payment related to a + * cash-out operation. + */ + val lastFiatSubmission = reference("lastFiatSubmission", BankAccountTransactionsTable).nullable() } class BankAccountEntity(id: EntityID<Int>) : IntEntity(id) { @@ -520,6 +527,7 @@ class BankAccountEntity(id: EntityID<Int>) : IntEntity(id) { var isPublic by BankAccountsTable.isPublic var demoBank by DemobankConfigEntity referencedOn BankAccountsTable.demoBank var lastTransaction by BankAccountTransactionEntity optionalReferencedOn BankAccountsTable.lastTransaction + var lastFiatSubmission by BankAccountTransactionEntity optionalReferencedOn BankAccountsTable.lastFiatSubmission } object BankAccountStatementsTable : IntIdTable() { @@ -620,10 +628,36 @@ object BankAccountReportsTable : IntIdTable() { val bankAccount = reference("bankAccount", BankAccountsTable) } +/** + * This table tracks the submissions of fiat payment instructions + * that Sandbox sends to Nexus. Every fiat payment instruction is + * related to a confirmed cash-out operation. The cash-out confirmation + * is effective once the customer sends a local wire transfer to the + * "admin" bank account. Such wire transfer is tracked by the 'localTransaction' + * column. + */ +object CashoutSubmissionsTable: LongIdTable() { + val localTransaction = reference("localTransaction", BankAccountTransactionsTable).uniqueIndex() + val isSubmitted = bool("isSubmitted").default(false) + val hasErrors = bool("hasErrors") + val maybeNexusResponse = text("maybeNexusResponse").nullable() + val submissionTime = long("submissionTime").nullable() // failed don't have it. +} + +class CashoutSubmissionEntity(id: EntityID<Long>) : LongEntity(id) { + companion object : LongEntityClass<CashoutSubmissionEntity>(CashoutSubmissionsTable) + var localTransaction by CashoutSubmissionsTable.localTransaction + var isSubmitted by CashoutSubmissionsTable.isSubmitted + var hasErrors by CashoutSubmissionsTable.hasErrors + var maybeNexusResposnse by CashoutSubmissionsTable.maybeNexusResponse + var submissionTime by CashoutSubmissionsTable.submissionTime +} + fun dbDropTables(dbConnectionString: String) { Database.connect(dbConnectionString) transaction { SchemaUtils.drop( + CashoutSubmissionsTable, EbicsSubscribersTable, EbicsHostsTable, EbicsDownloadTransactionsTable, @@ -649,6 +683,7 @@ fun dbCreateTables(dbConnectionString: String) { TransactionManager.manager.defaultIsolationLevel = Connection.TRANSACTION_SERIALIZABLE transaction { SchemaUtils.create( + CashoutSubmissionsTable, DemobankConfigsTable, DemobankConfigPairsTable, EbicsSubscribersTable, diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/Helpers.kt b/sandbox/src/main/kotlin/tech/libeufin/sandbox/Helpers.kt index a2454ff4..fdea79c2 100644 --- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/Helpers.kt +++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/Helpers.kt @@ -31,6 +31,7 @@ import tech.libeufin.util.* import java.security.interfaces.RSAPublicKey import java.util.* import java.util.zip.DeflaterInputStream +import kotlin.reflect.KProperty data class DemobankConfig( val allowRegistrations: Boolean, @@ -43,9 +44,17 @@ data class DemobankConfig( val smsTan: String? = null, // fixme: move the config subcommand val emailTan: String? = null, // fixme: same as above. val suggestedExchangeBaseUrl: String? = null, - val suggestedExchangePayto: String? = null + val suggestedExchangePayto: String? = null, + val nexusBaseUrl: String? = null, + val usernameAtNexus: String? = null, + val passwordAtNexus: String? = null, + val enableConversionService: Boolean = false ) +fun <T>getConfigValueOrThrow(configKey: KProperty<T?>): T { + return configKey.getter.call() ?: throw nullConfigValueError(configKey.name) +} + /** * Helps to communicate Camt values without having * to parse the XML each time one is needed. |