summaryrefslogtreecommitdiff
path: root/sandbox/src/main/kotlin/tech/libeufin
diff options
context:
space:
mode:
authorMS <ms@taler.net>2023-04-16 09:19:46 +0200
committerMS <ms@taler.net>2023-04-16 10:13:54 +0200
commitd7cecd35a5f7ab3ab2491e4c4ad4f07e041e9944 (patch)
tree56ab7ef93a92a0f3f743c53c10ec56db9425f37b /sandbox/src/main/kotlin/tech/libeufin
parent1602c0b6cd3cad8d8b8f14d68f509842e72146b6 (diff)
downloadlibeufin-d7cecd35a5f7ab3ab2491e4c4ad4f07e041e9944.tar.gz
libeufin-d7cecd35a5f7ab3ab2491e4c4ad4f07e041e9944.tar.bz2
libeufin-d7cecd35a5f7ab3ab2491e4c4ad4f07e041e9944.zip
Conversion service.
Implementing the cash-out monitor. The monitor watches one particular bank account (the admin's by default) and submits a fiat payment initiation to Nexus upon every new incoming transaction. Also implementing idempotence for payment initiations at Nexus. This helps in case the cash-out monitor fails at keeping track of the submitted payments and accidentally submits multiple times the same payment.
Diffstat (limited to 'sandbox/src/main/kotlin/tech/libeufin')
-rw-r--r--sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt183
-rw-r--r--sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt35
-rw-r--r--sandbox/src/main/kotlin/tech/libeufin/sandbox/Helpers.kt11
3 files changed, 222 insertions, 7 deletions
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt b/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
index 1d256beb..4045d10e 100644
--- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
@@ -1,7 +1,17 @@
package tech.libeufin.sandbox
+import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
+import io.ktor.client.*
+import io.ktor.client.plugins.*
+import io.ktor.client.request.*
+import io.ktor.client.statement.*
+import io.ktor.http.*
+import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
+import org.jetbrains.exposed.sql.and
+import org.jetbrains.exposed.sql.transactions.transaction
+import tech.libeufin.util.*
/**
* This file contains the logic for downloading/submitting incoming/outgoing
@@ -23,9 +33,18 @@ import kotlinx.coroutines.runBlocking
* transactions via its JSON API.
*/
-// Temporarily hard-coded. According to fiat times, these values could be WAY higher.
-val longPollMs = 30000L // 30s long-polling.
-val loopNewReqMs = 2000L // 2s for the next request.
+/**
+ * Timeout the HTTP client waits for the server to respond,
+ * after the request is made.
+ */
+val waitTimeout = 30000L
+
+/**
+ * Time to wait before HTTP requesting again to the server.
+ * This helps to avoid tight cycles in case the server responds
+ * quickly or the client doesn't long-poll.
+ */
+val newIterationTimeout = 2000L
/**
* Executes the 'block' function every 'loopNewReqMs' milliseconds.
@@ -44,7 +63,7 @@ fun downloadLoop(block: () -> Unit) {
*/
logger.error("Sandbox fiat-incoming monitor excepted: ${e.message}")
}
- delay(loopNewReqMs)
+ delay(newIterationTimeout)
}
}
}
@@ -64,9 +83,161 @@ fun downloadLoop(block: () -> Unit) {
*/
// creditAdmin()
+// DB query helper. The List return type (instead of SizedIterable) lets
+// the caller NOT open a transaction block to access the values -- although
+// some operations _on the values_ may be forbidden.
+private fun getUnsubmittedTransactions(bankAccountLabel: String): List<BankAccountTransactionEntity> {
+ return transaction {
+ val bankAccount = getBankAccountFromLabel(bankAccountLabel)
+ val lowerExclusiveLimit = bankAccount.lastFiatSubmission?.id?.value ?: 0
+ BankAccountTransactionEntity.find {
+ BankAccountTransactionsTable.id greater lowerExclusiveLimit and (
+ BankAccountTransactionsTable.direction eq "CRDT"
+ )
+ }.sortedBy { it.id }.map { it }
+ // The latest payment must occupy the highest index,
+ // to reliably update the bank account row with the last
+ // submitted cash-out.
+ }
+}
+
/**
- * This function listens for regio-incoming events (LIBEUFIN_REGIO_INCOMING)
+ * This function listens for regio-incoming events (LIBEUFIN_REGIO_TX)
* and submits the related cash-out payment to Nexus. The fiat payment will
* then take place ENTIRELY on Nexus' responsibility.
*/
-// issueCashout()
+suspend fun cashoutMonitor(
+ httpClient: HttpClient,
+ bankAccountLabel: String = "admin",
+ demobankName: String = "default" // used to get config values.
+) {
+ // Register for a REGIO_TX event.
+ val eventChannel = buildChannelName(
+ NotificationsChannelDomains.LIBEUFIN_REGIO_TX,
+ bankAccountLabel
+ )
+ val objectMapper = jacksonObjectMapper()
+ val demobank = getDemobank(demobankName)
+ val bankAccount = getBankAccountFromLabel(bankAccountLabel)
+ val config = demobank?.config ?: throw internalServerError(
+ "Demobank '$demobankName' has no configuration."
+ )
+ val nexusBaseUrl = getConfigValueOrThrow(config::nexusBaseUrl)
+ val usernameAtNexus = getConfigValueOrThrow(config::usernameAtNexus)
+ val passwordAtNexus = getConfigValueOrThrow(config::passwordAtNexus)
+ val paymentInitEndpoint = nexusBaseUrl.run {
+ var ret = this
+ if (!ret.endsWith('/'))
+ ret += '/'
+ /**
+ * WARNING: Nexus gives the possibility to have bank account names
+ * DIFFERENT from their owner's username. Sandbox however MUST have
+ * its Nexus bank account named THE SAME as its username (until the
+ * config will allow to change).
+ */
+ ret + "bank-accounts/$usernameAtNexus/payment-initiations"
+ }
+ while (true) {
+ // delaying here avoids to delay in multiple places (errors,
+ // lack of action, success)
+ delay(2000)
+ val listenHandle = PostgresListenHandle(eventChannel)
+ // pessimistically LISTEN
+ listenHandle.postgresListen()
+ // but optimistically check for data, case some
+ // arrived _before_ the LISTEN.
+ var newTxs = getUnsubmittedTransactions(bankAccountLabel)
+ // Data found, UNLISTEN.
+ if (newTxs.isNotEmpty())
+ listenHandle.postgresUnlisten()
+ // Data not found, wait.
+ else {
+ // OK to block, because the next event is going to
+ // be _this_ one. The caller should however execute
+ // this whole logic in a thread other than the main
+ // HTTP server.
+ val isNotificationArrived = listenHandle.postgresGetNotifications(waitTimeout)
+ if (isNotificationArrived && listenHandle.receivedPayload == "CRDT")
+ newTxs = getUnsubmittedTransactions(bankAccountLabel)
+ }
+ if (newTxs.isEmpty())
+ continue
+ newTxs.forEach {
+ val body = object {
+ /**
+ * This field is UID of the request _as assigned by the
+ * client_. That helps to reconcile transactions or lets
+ * Nexus implement idempotency. It will NOT identify the created
+ * resource at the server side. The ID of the created resource is
+ * assigned _by Nexus_ and communicated in the (successful) response.
+ */
+ val uid = it.accountServicerReference
+ val iban = it.creditorIban
+ val bic = it.debtorBic
+ val amount = "${it.currency}:${it.amount}"
+ val subject = it.subject
+ val name = it.creditorName
+ }
+ val resp = try {
+ httpClient.post(paymentInitEndpoint) {
+ expectSuccess = false // Avoid excepting on !2xx
+ basicAuth(usernameAtNexus, passwordAtNexus)
+ contentType(ContentType.Application.Json)
+ setBody(objectMapper.writeValueAsString(body))
+ }
+ }
+ // Hard-error, response did not even arrive.
+ catch (e: Exception) {
+ logger.error(e.message)
+ // mark as failed and proceed to the next one.
+ transaction {
+ CashoutSubmissionEntity.new {
+ this.localTransaction = it.id
+ this.hasErrors = true
+ }
+ bankAccount.lastFiatSubmission = it
+ }
+ return@forEach
+ }
+ // Handle the non 2xx error case. Here we try
+ // to store the response from Nexus.
+ if (resp.status.value != HttpStatusCode.OK.value) {
+ val maybeResponseBody = resp.bodyAsText()
+ logger.error(
+ "Fiat submission response was: $maybeResponseBody," +
+ " status: ${resp.status.value}"
+ )
+ transaction {
+ CashoutSubmissionEntity.new {
+ localTransaction = it.id
+ this.hasErrors = true
+ if (maybeResponseBody.length > 0)
+ this.maybeNexusResposnse = maybeResponseBody
+ }
+ bankAccount.lastFiatSubmission = it
+ }
+ return@forEach
+ }
+ // Successful case, mark the wire transfer as submitted,
+ // and advance the pointer to the last submitted payment.
+ val responseBody = resp.bodyAsText()
+ transaction {
+ CashoutSubmissionEntity.new {
+ localTransaction = it.id
+ hasErrors = false
+ submissionTime = resp.responseTime.timestamp
+ isSubmitted = true
+ // Expectedly is > 0 and contains the submission
+ // unique identifier _as assigned by Nexus_. Not
+ // currently used by Sandbox, but may help to resolve
+ // disputes.
+ if (responseBody.length > 0)
+ maybeNexusResposnse = responseBody
+ }
+ // Advancing the 'last submitted bookmark', to avoid
+ // handling the same transaction multiple times.
+ bankAccount.lastFiatSubmission = it
+ }
+ }
+ }
+}
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt b/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt
index b0654950..c8a1df18 100644
--- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt
@@ -508,6 +508,13 @@ object BankAccountsTable : IntIdTable() {
* history results that start from / depend on the last transaction.
*/
val lastTransaction = reference("lastTransaction", BankAccountTransactionsTable).nullable()
+
+ /**
+ * Points to the transaction that was last submitted by the conversion
+ * service to Nexus, in order to initiate a fiat payment related to a
+ * cash-out operation.
+ */
+ val lastFiatSubmission = reference("lastFiatSubmission", BankAccountTransactionsTable).nullable()
}
class BankAccountEntity(id: EntityID<Int>) : IntEntity(id) {
@@ -520,6 +527,7 @@ class BankAccountEntity(id: EntityID<Int>) : IntEntity(id) {
var isPublic by BankAccountsTable.isPublic
var demoBank by DemobankConfigEntity referencedOn BankAccountsTable.demoBank
var lastTransaction by BankAccountTransactionEntity optionalReferencedOn BankAccountsTable.lastTransaction
+ var lastFiatSubmission by BankAccountTransactionEntity optionalReferencedOn BankAccountsTable.lastFiatSubmission
}
object BankAccountStatementsTable : IntIdTable() {
@@ -620,10 +628,36 @@ object BankAccountReportsTable : IntIdTable() {
val bankAccount = reference("bankAccount", BankAccountsTable)
}
+/**
+ * This table tracks the submissions of fiat payment instructions
+ * that Sandbox sends to Nexus. Every fiat payment instruction is
+ * related to a confirmed cash-out operation. The cash-out confirmation
+ * is effective once the customer sends a local wire transfer to the
+ * "admin" bank account. Such wire transfer is tracked by the 'localTransaction'
+ * column.
+ */
+object CashoutSubmissionsTable: LongIdTable() {
+ val localTransaction = reference("localTransaction", BankAccountTransactionsTable).uniqueIndex()
+ val isSubmitted = bool("isSubmitted").default(false)
+ val hasErrors = bool("hasErrors")
+ val maybeNexusResponse = text("maybeNexusResponse").nullable()
+ val submissionTime = long("submissionTime").nullable() // failed don't have it.
+}
+
+class CashoutSubmissionEntity(id: EntityID<Long>) : LongEntity(id) {
+ companion object : LongEntityClass<CashoutSubmissionEntity>(CashoutSubmissionsTable)
+ var localTransaction by CashoutSubmissionsTable.localTransaction
+ var isSubmitted by CashoutSubmissionsTable.isSubmitted
+ var hasErrors by CashoutSubmissionsTable.hasErrors
+ var maybeNexusResposnse by CashoutSubmissionsTable.maybeNexusResponse
+ var submissionTime by CashoutSubmissionsTable.submissionTime
+}
+
fun dbDropTables(dbConnectionString: String) {
Database.connect(dbConnectionString)
transaction {
SchemaUtils.drop(
+ CashoutSubmissionsTable,
EbicsSubscribersTable,
EbicsHostsTable,
EbicsDownloadTransactionsTable,
@@ -649,6 +683,7 @@ fun dbCreateTables(dbConnectionString: String) {
TransactionManager.manager.defaultIsolationLevel = Connection.TRANSACTION_SERIALIZABLE
transaction {
SchemaUtils.create(
+ CashoutSubmissionsTable,
DemobankConfigsTable,
DemobankConfigPairsTable,
EbicsSubscribersTable,
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/Helpers.kt b/sandbox/src/main/kotlin/tech/libeufin/sandbox/Helpers.kt
index a2454ff4..fdea79c2 100644
--- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/Helpers.kt
+++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/Helpers.kt
@@ -31,6 +31,7 @@ import tech.libeufin.util.*
import java.security.interfaces.RSAPublicKey
import java.util.*
import java.util.zip.DeflaterInputStream
+import kotlin.reflect.KProperty
data class DemobankConfig(
val allowRegistrations: Boolean,
@@ -43,9 +44,17 @@ data class DemobankConfig(
val smsTan: String? = null, // fixme: move the config subcommand
val emailTan: String? = null, // fixme: same as above.
val suggestedExchangeBaseUrl: String? = null,
- val suggestedExchangePayto: String? = null
+ val suggestedExchangePayto: String? = null,
+ val nexusBaseUrl: String? = null,
+ val usernameAtNexus: String? = null,
+ val passwordAtNexus: String? = null,
+ val enableConversionService: Boolean = false
)
+fun <T>getConfigValueOrThrow(configKey: KProperty<T?>): T {
+ return configKey.getter.call() ?: throw nullConfigValueError(configKey.name)
+}
+
/**
* Helps to communicate Camt values without having
* to parse the XML each time one is needed.