libeufin

Integration and sandbox testing for FinTech APIs and data formats
Log | Files | Refs | Submodules | README | LICENSE

commit 6ffd6c5f2b2dbb659b08ec6503a6d4a71ff47386
parent bcf5411774eb115e339720afec7ea9c46ae699e7
Author: MS <ms@taler.net>
Date:   Wed, 12 Apr 2023 22:46:49 +0200

Performance.

Implementing long polling to get transactions via
the Nexus native API.

Diffstat:
Mnexus/src/main/kotlin/tech/libeufin/nexus/DB.kt | 17++++++++++++++---
Mnexus/src/main/kotlin/tech/libeufin/nexus/iso20022/Iso20022.kt | 1-
Mnexus/src/main/kotlin/tech/libeufin/nexus/server/Helpers.kt | 39++++++++++++++++++++++++++++++++++++---
Mnexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt | 49+++++++++++++++++++++++++++++++++++--------------
Mnexus/src/test/kotlin/DbEventTest.kt | 35+++++++++++++++++++++++++++++++++++
Mnexus/src/test/kotlin/NexusApiTest.kt | 53++++++++++++++++++++++++++++++++++++++++++++++++++++-
Mutil/src/main/kotlin/DB.kt | 17+++++++++++++----
7 files changed, 185 insertions(+), 26 deletions(-)

diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/DB.kt @@ -27,7 +27,7 @@ import org.jetbrains.exposed.dao.id.LongIdTable import org.jetbrains.exposed.sql.* import org.jetbrains.exposed.sql.transactions.TransactionManager import org.jetbrains.exposed.sql.transactions.transaction -import tech.libeufin.util.EbicsInitState +import tech.libeufin.util.* import java.sql.Connection import kotlin.reflect.typeOf @@ -210,8 +210,19 @@ object NexusBankTransactionsTable : LongIdTable() { } class NexusBankTransactionEntity(id: EntityID<Long>) : LongEntity(id) { - companion object : LongEntityClass<NexusBankTransactionEntity>(NexusBankTransactionsTable) - + companion object : LongEntityClass<NexusBankTransactionEntity>(NexusBankTransactionsTable) { + override fun new(init: NexusBankTransactionEntity.() -> Unit): NexusBankTransactionEntity { + val ret = super.new(init) + if (isPostgres()) { + val channelName = buildChannelName( + NotificationsChannelDomains.LIBEUFIN_NEXUS_TX, + ret.bankAccount.bankAccountName + ) + TransactionManager.current().postgresNotify(channelName, ret.creditDebitIndicator) + } + return ret + } + } var currency by NexusBankTransactionsTable.currency var amount by NexusBankTransactionsTable.amount var status by NexusBankTransactionsTable.status diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/iso20022/Iso20022.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/iso20022/Iso20022.kt @@ -273,7 +273,6 @@ data class Batch( @JsonInclude(JsonInclude.Include.NON_NULL) data class CamtBankAccountEntry( val amount: CurrencyAmount, - /** * Is this entry debiting or crediting the account * it is reported for? diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/server/Helpers.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/server/Helpers.kt @@ -1,13 +1,46 @@ package tech.libeufin.nexus.server +import com.fasterxml.jackson.databind.JsonNode +import com.fasterxml.jackson.databind.node.ObjectNode +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import io.ktor.http.* +import org.jetbrains.exposed.sql.SqlExpressionBuilder.eq +import org.jetbrains.exposed.sql.SqlExpressionBuilder.greaterEq +import org.jetbrains.exposed.sql.and import org.jetbrains.exposed.sql.transactions.transaction -import tech.libeufin.nexus.NexusBankConnectionEntity -import tech.libeufin.nexus.NexusBankConnectionsTable -import tech.libeufin.nexus.NexusError +import tech.libeufin.nexus.* +import tech.libeufin.nexus.bankaccount.getBankAccount +import tech.libeufin.nexus.iso20022.CamtBankAccountEntry import tech.libeufin.util.internalServerError import tech.libeufin.util.notFound +// Type holding parameters of GET /transactions. +data class GetTransactionsParams( + val bankAccountId: String, + val startIndex: Long, + val resultSize: Long +) + +/** + * Queries the database according to the GET /transactions + * parameters. + */ +fun getIngestedTransactions(params: GetTransactionsParams): List<JsonNode> = + transaction { + val bankAccount = getBankAccount(params.bankAccountId) + val maybeResult = NexusBankTransactionEntity.find { + NexusBankTransactionsTable.bankAccount eq bankAccount.id.value and ( + NexusBankTransactionsTable.id greaterEq params.startIndex + ) + }.sortedBy { it.id.value }.take(params.resultSize.toInt()) // Smallest index (= earliest transaction) first + // Converting the result to the HTTP response type. + maybeResult.map { + val element: ObjectNode = jacksonObjectMapper().readTree(it.transactionJson) as ObjectNode + element.put("index", it.id.value.toString()) + return@map element + } + } + fun unknownBankAccount(bankAccountLabel: String): NexusError { return NexusError( HttpStatusCode.NotFound, diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/server/NexusServer.kt @@ -762,21 +762,42 @@ val nexusApp: Application.() -> Unit = { // Asks list of transactions ALREADY downloaded from the bank. get("/bank-accounts/{accountid}/transactions") { requireSuperuser(call.request) - val bankAccountId = expectNonNull(call.parameters["accountid"]) - val ret = Transactions() - transaction { - val bankAccount = NexusBankAccountEntity.findByName(bankAccountId) - if (bankAccount == null) { - throw unknownBankAccount(bankAccountId) - } - NexusBankTransactionEntity.find { NexusBankTransactionsTable.bankAccount eq bankAccount.id }.map { - val tx = jacksonObjectMapper().readValue( - it.transactionJson, CamtBankAccountEntry::class.java - ) - ret.transactions.add(tx) - } + val accountLabel = expectNonNull(call.parameters["accountid"]) + // Getting the URI parameters. + val maybeStart = call.maybeLong("start") // Earliest TX in the result. + val maybeSize = call.maybeLong("size") // How many TXs at most. + val maybeLongPoll = call.maybeLong("long_poll_ms") + + // Ask for a DB event (before the actual query), + // in case the DB is Postgres and the client wants. + val listenHandle = if (isPostgres() && maybeLongPoll != null) { + val channelName = buildChannelName( + NotificationsChannelDomains.LIBEUFIN_NEXUS_TX, + accountLabel + ) + val listenHandle = PostgresListenHandle(channelName) + listenHandle.postgresListen() + listenHandle + } else null + + // Try getting results, and UNLISTEN in case they exist. + val queryParam = GetTransactionsParams( + bankAccountId = accountLabel, + resultSize = maybeSize ?: 5, + startIndex = maybeStart ?: 1 + ) + var ret = getIngestedTransactions(queryParam) + if (ret.isNotEmpty() && listenHandle != null) + listenHandle.postgresUnlisten() // closes the PG connection too. + + // No results and a DB event is pending: wait. + if (ret.isEmpty() && listenHandle != null && maybeLongPoll != null) { + val isNotificationArrived = listenHandle.waitOnIODispatchers(maybeLongPoll) + // The event happened, query again. + if (isNotificationArrived) + ret = getIngestedTransactions(queryParam) } - call.respond(ret) + call.respond(object {val transactions = ret}) return@get } diff --git a/nexus/src/test/kotlin/DbEventTest.kt b/nexus/src/test/kotlin/DbEventTest.kt @@ -4,7 +4,9 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.jetbrains.exposed.sql.transactions.transaction import org.junit.Test +import tech.libeufin.util.NotificationsChannelDomains import tech.libeufin.util.PostgresListenHandle +import tech.libeufin.util.buildChannelName import tech.libeufin.util.postgresNotify @@ -34,4 +36,37 @@ class DbEventTest { } } } + + /** + * This function tests the NOTIFY sent by a Exposed's + * "new {}" overridden static method. + */ + @Test + fun automaticNotifyTest() { + withTestDatabase { + prepNexusDb() + val nexusTxChannel = buildChannelName( + NotificationsChannelDomains.LIBEUFIN_NEXUS_TX, + "foo" // bank account label. + ) + val listenHandle = PostgresListenHandle(nexusTxChannel) + transaction { listenHandle.postgresListen() } + runBlocking { + launch { + val isArrived = listenHandle.waitOnIODispatchers(timeoutMs = 1000L) + assert(isArrived) + } + launch { + delay(500L); // Ensures the wait helper runs first. + transaction { + newNexusBankTransaction( + "TESTKUDOS", + "2", + "unblocking event" + ) + } + } + } + } + } } \ No newline at end of file diff --git a/nexus/src/test/kotlin/NexusApiTest.kt b/nexus/src/test/kotlin/NexusApiTest.kt @@ -1,16 +1,67 @@ +import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import io.ktor.client.plugins.* import io.ktor.client.request.* +import io.ktor.client.statement.* import io.ktor.http.* import io.ktor.server.testing.* +import kotlinx.coroutines.async +import kotlinx.coroutines.delay +import kotlinx.coroutines.ensureActive +import kotlinx.coroutines.runBlocking import org.junit.Test import tech.libeufin.nexus.server.nexusApp -import tech.libeufin.sandbox.sandboxApp /** * This class tests the API offered by Nexus, * documented here: https://docs.taler.net/libeufin/api-nexus.html */ class NexusApiTest { + // Testing long-polling on GET /transactions + @Test + fun getTransactions() { + withTestDatabase { + prepNexusDb() + testApplication { + application(nexusApp) + /** + * Requesting /transactions with long polling, and assert that + * the response arrives _after_ the unblocking INSERT into the + * database. + */ + val longPollMs = 5000 + runBlocking { + val requestJob = async { + client.get("/bank-accounts/foo/transactions?long_poll_ms=$longPollMs") { + basicAuth("foo", "foo") + contentType(ContentType.Application.Json) + } + } + /** + * The following delay ensures that the payment below + * gets inserted after the client has issued the long + * polled request above (and it is therefore waiting) + */ + delay(2000) + // Ensures that the request is active _before_ the + // upcoming payment. This ensures that the request + // didn't find already another payment in the database. + requestJob.ensureActive() + newNexusBankTransaction( + currency = "TESTKUDOS", + value = "2", + subject = "first" + ) + val R = requestJob.await() + // Ensures that the request did NOT wait all the timeout + assert((R.responseTime.timestamp - R.requestTime.timestamp) < longPollMs) + val body = jacksonObjectMapper().readTree(R.bodyAsText()) + // Ensures that the unblocking payment exists in the response. + val tx = body.get("transactions") + assert(tx.isArray && tx.size() == 1) + } + } + } + } // Testing basic operations on facades. @Test fun facades() { diff --git a/util/src/main/kotlin/DB.kt b/util/src/main/kotlin/DB.kt @@ -40,6 +40,12 @@ fun isPostgres(): Boolean { } // Check GANA (https://docs.gnunet.org/gana/index.html) for numbers allowance. +/** + * Note: every domain is ALWAYS meant to be salted with + * a unique identifier that points to the user waiting for + * a notification. The reference function for salting is: + * "buildChannelName()", in this file. + */ enum class NotificationsChannelDomains(val value: Int) { // When payments with well-formed Taler subject arrive. LIBEUFIN_TALER_INCOMING(3000), @@ -50,10 +56,12 @@ enum class NotificationsChannelDomains(val value: Int) { // Happens when a customer wants to withdraw Taler coins in the // regional currency. LIBEUFIN_SANDBOX_FIAT_INCOMING(3002), - // When Nexus discovers a new transactions from the bank it - // is connected to. This even may wake up a client who is waiting - // on Nexus' GET /transactions. - LIBEUFIN_NEXUS_FIAT_INCOMING(3003) + // When Nexus has ingested a new transactions from the bank it + // is connected to. This event carries incoming and outgoing + // payments, and it specifies that in its payload. The direction + // codename is the same as CaMt (DBIT, CRDT), as that is also + // used in the database. + LIBEUFIN_NEXUS_TX(3003) } /** @@ -77,6 +85,7 @@ fun Transaction.postgresNotify( channel: String, payload: String? = null ) { + logger.debug("Sending NOTIFY on channel '$channel' with payload '$payload'") if (payload != null) { val argEnc = Base32Crockford.encode(payload.toByteArray()) if (payload.toByteArray().size > 8000)