commit a35ce9012f2cd6a11f850da49c5ca6a92ee4a5fc
parent 382c5fc8c0008160f7ed4e85c49053164f2c6915
Author: MS <ms@taler.net>
Date: Wed, 12 Apr 2023 11:21:52 +0200
DB events at Access API.
Offering long polling for transactions download.
Diffstat:
3 files changed, 129 insertions(+), 9 deletions(-)
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt b/sandbox/src/main/kotlin/tech/libeufin/sandbox/ConversionService.kt
@@ -0,0 +1,72 @@
+package tech.libeufin.sandbox
+
+import kotlinx.coroutines.delay
+import kotlinx.coroutines.runBlocking
+
+/**
+ * This file contains the logic for downloading/submitting incoming/outgoing
+ * fiat transactions to Nexus. It needs the following values for operating.
+ *
+ * 1. Nexus URL.
+ * 2. Credentials to authenticate at Nexus JSON API.
+ * 3. Long-polling interval.
+ * 4. Frequency of the download loop.
+ *
+ * Notes:
+ *
+ * 1. The account to credit on incoming transactions is ALWAYS "admin".
+ * 2. The time to submit a new payment is as soon as "admin" receives one
+ * incoming regional payment.
+ * 3. At this time, Nexus does NOT offer long polling when it serves the
+ * transactions via its JSON API.
+ * 4. At this time, Nexus does NOT offer any filter when it serves the
+ * transactions via its JSON API.
+ */
+
+// Temporarily hard-coded. According to fiat times, these values could be WAY higher.
+val longPollMs = 30000L // 30s long-polling.
+val loopNewReqMs = 2000L // 2s for the next request.
+
+/**
+ * Executes the 'block' function every 'loopNewReqMs' milliseconds.
+ * Does not exit/fail the process upon exceptions - just logs them.
+ */
+fun downloadLoop(block: () -> Unit) {
+ // Needs "runBlocking {}" to call "delay()" and in case 'block'
+ // contains suspend functions.
+ runBlocking {
+ while(true) {
+ try { block() }
+ catch (e: Exception) {
+ /**
+ * Not exiting to tolerate network issues, or optimistically
+ * tolerate problems not caused by Sandbox itself.
+ */
+ logger.error("Sandbox fiat-incoming monitor excepted: ${e.message}")
+ }
+ delay(loopNewReqMs)
+ }
+ }
+}
+
+/**
+ * This function downloads the incoming fiat transactions from Nexus,
+ * stores them into the database and signals their arrival (LIBEUFIN_FIAT_INCOMING)
+ * to allow crediting the "admin" account.
+ */
+// fetchTransactions()
+
+/**
+ * This function listens for fiat-incoming events (LIBEUFIN_FIAT_INCOMING)
+ * and credits the "admin" account as a reaction. Lastly, the Nexus instance
+ * wired to Sandbox will pick the new payment and serve it via its TWG, but
+ * this is OUT of the Sandbox scope.
+ */
+// creditAdmin()
+
+/**
+ * This function listens for regio-incoming events (LIBEUFIN_REGIO_INCOMING)
+ * and submits the related cash-out payment to Nexus. The fiat payment will
+ * then take place ENTIRELY on Nexus' responsibility.
+ */
+// issueCashout()
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/Main.kt b/sandbox/src/main/kotlin/tech/libeufin/sandbox/Main.kt
@@ -47,6 +47,7 @@ import io.ktor.server.util.*
import io.ktor.server.plugins.callloging.*
import io.ktor.server.plugins.cors.routing.*
import io.ktor.util.date.*
+import org.jetbrains.exposed.dao.flushCache
import org.jetbrains.exposed.sql.*
import org.jetbrains.exposed.sql.statements.api.ExposedBlob
import org.jetbrains.exposed.sql.transactions.transaction
@@ -1530,16 +1531,50 @@ val sandboxApp: Application.() -> Unit = {
if (fromMs < 0) throw badRequest("'from_ms' param is less than 0")
val untilMs = expectLong(call.request.queryParameters["until_ms"] ?: Long.MAX_VALUE.toString())
if (untilMs < 0) throw badRequest("'until_ms' param is less than 0")
- val ret: List<XLibeufinBankTransaction> = transaction {
- extractTxHistory(
- HistoryParams(
- pageNumber = page,
- pageSize = size,
- bankAccount = bankAccount,
- fromMs = fromMs,
- untilMs = untilMs
- )
+ val longPollMs: Long? = call.maybeLong("long_poll_ms")
+ // LISTEN, if Postgres.
+ val listenHandle = if (isPostgres() && longPollMs != null) {
+ val channelName = buildChannelName(
+ NotificationsChannelDomains.LIBEUFIN_REGIO_TX,
+ call.expectUriComponent("account_name")
)
+ val listenHandle = PostgresListenHandle(channelName)
+ // Can't LISTEN on the same DB TX that checks for data, as Exposed
+ // closes that connection and the notification getter would fail.
+ // Can't invoke the notification getter in the same DB TX either,
+ // as it would block the DB.
+ listenHandle.postgresListen()
+ listenHandle
+ } else null
+ val historyParams = HistoryParams(
+ pageNumber = page,
+ pageSize = size,
+ bankAccount = bankAccount,
+ fromMs = fromMs,
+ untilMs = untilMs
+ )
+ var ret: List<XLibeufinBankTransaction> = transaction {
+ extractTxHistory(historyParams)
+ }
+ // Data was found already, UNLISTEN and respond.
+ if (listenHandle != null && ret.isNotEmpty()) {
+ listenHandle.postgresUnlisten()
+ call.respond(object {val transactions = ret})
+ return@get
+ }
+ // No data was found, sleep until the timeout or getting woken up.
+ // Third condition only silences the compiler.
+ if (listenHandle != null && ret.isEmpty() && longPollMs != null) {
+ val notificationArrived = listenHandle.waitOnIODispatchers(longPollMs)
+ // Only if the awaited event fired, query again the DB.
+ if (notificationArrived)
+ {
+ ret = transaction {
+ // Refreshing to update the index to the very last transaction.
+ historyParams.bankAccount.refresh()
+ extractTxHistory(historyParams)
+ }
+ }
}
call.respond(object {val transactions = ret})
return@get
diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/bankAccount.kt b/sandbox/src/main/kotlin/tech/libeufin/sandbox/bankAccount.kt
@@ -224,6 +224,19 @@ fun wireTransfer(
this.demobank = demobank
this.pmtInfId = pmtInfId
}
+ // Signaling this wire transfer's event.
+ if (this.isPostgres()) {
+ val creditChannel = buildChannelName(
+ NotificationsChannelDomains.LIBEUFIN_REGIO_TX,
+ creditAccount.label
+ )
+ this.postgresNotify(creditChannel, "CRDT")
+ val debitChannel = buildChannelName(
+ NotificationsChannelDomains.LIBEUFIN_REGIO_TX,
+ debitAccount.label
+ )
+ this.postgresNotify(debitChannel, "DBIT")
+ }
}
return transactionRef
}