libeufin

Integration and sandbox testing for FinTech APIs and data formats
Log | Files | Refs | Submodules | README | LICENSE

commit 11d9a79728761b838c6fc6d8e6821207113857e4
parent 54d7f76ec7c31cbddf7e8ff12d499f3896ab0044
Author: Antoine A <>
Date:   Wed, 11 Oct 2023 18:09:57 +0000

Fix and optimize polling history

Diffstat:
Mbank/src/main/kotlin/tech/libeufin/bank/BankMessages.kt | 2+-
Mbank/src/main/kotlin/tech/libeufin/bank/CorebankApiHandlers.kt | 25++-----------------------
Mbank/src/main/kotlin/tech/libeufin/bank/Database.kt | 121++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------------
Mbank/src/main/kotlin/tech/libeufin/bank/helpers.kt | 6+++---
Mbank/src/test/kotlin/DatabaseTest.kt | 4++--
Mbank/src/test/kotlin/LibeuFinApiTest.kt | 1+
Mbank/src/test/kotlin/TalerApiTest.kt | 4++--
7 files changed, 91 insertions(+), 72 deletions(-)

diff --git a/bank/src/main/kotlin/tech/libeufin/bank/BankMessages.kt b/bank/src/main/kotlin/tech/libeufin/bank/BankMessages.kt @@ -542,7 +542,7 @@ data class BankAccountTransactionInfo( // Response type for histories, namely GET /transactions @Serializable data class BankAccountTransactionsResponse( - val transactions: MutableList<BankAccountTransactionInfo> + val transactions: List<BankAccountTransactionInfo> ) // Taler withdrawal request. diff --git a/bank/src/main/kotlin/tech/libeufin/bank/CorebankApiHandlers.kt b/bank/src/main/kotlin/tech/libeufin/bank/CorebankApiHandlers.kt @@ -507,29 +507,8 @@ fun Routing.accountsMgmtHandlers(db: Database, ctx: BankApplicationContext) { ) val bankAccount = db.bankAccountGetFromOwnerId(resourceCustomer.expectRowId()) ?: throw internalServerError("Customer '${resourceCustomer.login}' lacks bank account.") - val history: List<BankAccountTransaction> = db.bankTransactionGetHistory( - start = historyParams.start, - delta = historyParams.delta, - bankAccountId = bankAccount.expectRowId() - ) - val res = BankAccountTransactionsResponse(transactions = mutableListOf()) - history.forEach { - res.transactions.add( - BankAccountTransactionInfo( - debtor_payto_uri = it.debtorPaytoUri, - creditor_payto_uri = it.creditorPaytoUri, - subject = it.subject, - amount = it.amount, - direction = it.direction, - date = TalerProtocolTimestamp(it.transactionDate), - row_id = it.dbRowId ?: throw internalServerError( - "Transaction timestamped with '${it.transactionDate}' did not have row ID" - ) - ) - ) - } - call.respond(res) - return@get + val history: List<BankAccountTransactionInfo> = db.bankPoolHistory(historyParams, bankAccount.expectRowId()) + call.respond(BankAccountTransactionsResponse(history)) } // Creates a bank transaction. post("/accounts/{USERNAME}/transactions") { diff --git a/bank/src/main/kotlin/tech/libeufin/bank/Database.kt b/bank/src/main/kotlin/tech/libeufin/bank/Database.kt @@ -868,72 +868,110 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos query: String, map: (ResultSet) -> T ): List<T> { - var start = params.start - var delta = params.delta + var lastSeenId = 0L; var poll_ms = params.poll_ms; - val items = mutableListOf<T>() - - // If going backward with a starting point, it is useless to poll - if (delta < 0 && start != Long.MAX_VALUE) { - poll_ms = 0; - } + var delta = params.delta + val backward = delta < 0 + var (min, max) = if (backward) Pair(0L, params.start) else Pair(params.start, Long.MAX_VALUE) - dbPool.getConnection().use { conn -> + return dbPool.getConnection().use { conn -> // Prepare statement - val (cmpOp, orderBy) = if (delta < 0) Pair("<", "DESC") else Pair(">", "ASC") + val orderBy = if (backward) "DESC" else "ASC"; val stmt = conn.prepareStatement(""" $query - WHERE bank_transaction_id ${cmpOp} ? - AND bank_account_id=? - ORDER BY bank_transaction_id ${orderBy} + WHERE bank_account_id=? AND + bank_transaction_id > ? AND bank_transaction_id < ? + ORDER BY bank_transaction_id $orderBy LIMIT ? """) - stmt.setLong(2, bankAccountId) - - fun load() { - stmt.setLong(1, start) - stmt.setLong(3, abs(delta)) - items.addAll(stmt.all { - start = it.getLong("bank_transaction_id") - if (delta < 0) delta ++ else delta -- + stmt.setLong(1, bankAccountId) + + fun load(): List<T> { + stmt.setLong(2, min) + stmt.setLong(3, max) + stmt.setInt(4, abs(delta)) + return stmt.all { + lastSeenId = kotlin.math.max(it.getLong("bank_transaction_id"), lastSeenId) map(it) - }) + } } // Start expensive listening process only if we intend to poll if (poll_ms > 0) { + var items = listOf<T>() notifWatcher.(listen)(bankAccountId) { flow -> + val maxId = conn.prepareStatement("SELECT MAX(bank_transaction_id) FROM bank_account_transactions") + .oneOrNull { it.getLong(1) } ?: 0; // Start buffering notification to not miss any val buffered = flow.buffer() // Initial load - load() - // Long polling while necessary - if (delta != 0L) { + items += load() + if (backward) delta += items.size else delta -= items.size + + // Long polling if we want more item and we could have more + if (delta != 0 && !(backward && params.start <= maxId + 1)) { withTimeoutOrNull(poll_ms) { - buffered.filter { - when { - params.start == Long.MAX_VALUE -> true - delta < 0 -> it.rowId < start - else -> it.rowId > start - } - }.take(abs(delta).toInt()).count() + buffered + .filter { it.rowId > lastSeenId } // Skip transactions we already have processed + .take(abs(delta).toInt()).count() } - // If going backward without a starting point, we reset loading progress - if (params.start == Long.MAX_VALUE) { - start = params.start + if (backward) { + min = lastSeenId + // When going backward we might found more items that they are missing delta = params.delta - items.clear() + items = (load() + items).take(abs(delta)) + } else { + min = lastSeenId + items += load() } - load() } } + items } else { load() } } + } - return items.toList(); + suspend fun bankPoolHistory( + params: HistoryParams, + bankAccountId: Long + ): List<BankAccountTransactionInfo> { + return poolHistory(params, bankAccountId, NotificationWatcher::listenBank, """ + SELECT + bank_transaction_id + ,transaction_date + ,(amount).val AS amount_val + ,(amount).frac AS amount_frac + ,debtor_payto_uri + ,creditor_payto_uri + ,subject + ,direction + FROM bank_account_transactions + """) { + BankAccountTransactionInfo( + row_id = it.getLong("bank_transaction_id"), + date = TalerProtocolTimestamp( + it.getLong("transaction_date").microsToJavaInstant() ?: throw faultyTimestampByBank() + ), + debtor_payto_uri = it.getString("debtor_payto_uri"), + creditor_payto_uri = it.getString("creditor_payto_uri"), + amount = TalerAmount( + it.getLong("amount_val"), + it.getInt("amount_frac"), + getCurrency() + ), + subject = it.getString("subject"), + direction = it.getString("direction").run { + when(this) { + "credit" -> TransactionDirection.credit + "debit" -> TransactionDirection.debit + else -> throw internalServerError("Wrong direction in transaction: $this") + } + } + ) + } } suspend fun exchangeIncomingPoolHistory( @@ -1011,7 +1049,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos */ fun bankTransactionGetHistory( start: Long, - delta: Long, + delta: Int, bankAccountId: Long, withDirection: TransactionDirection? = null ): List<BankAccountTransaction> = conn { conn -> @@ -1051,7 +1089,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos } else 3 - stmt.setLong(limitParamIndex, abs(delta)) + stmt.setInt(limitParamIndex, abs(delta)) stmt.all { val direction = withDirection ?: when (it.getString("direction")) { "credit" -> TransactionDirection.credit @@ -1548,7 +1586,8 @@ private class NotificationWatcher(private val pgSource: PGSimpleDataSource) { conn.execSQLUpdate("LISTEN incoming_tx") while (true) { - conn.getNotifications().forEach { + conn.getNotifications(0) // Block until we receive at least one notification + .forEach { if (it.name == "bank_tx") { val info = it.parameter.split(' ', limit = 4).map { it.toLong() } val debtorAccount = info[0]; diff --git a/bank/src/main/kotlin/tech/libeufin/bank/helpers.kt b/bank/src/main/kotlin/tech/libeufin/bank/helpers.kt @@ -365,7 +365,7 @@ fun getWithdrawal(db: Database, opIdParam: String): TalerWithdrawalOperation { } data class HistoryParams( - val delta: Long, val start: Long, val poll_ms: Long + val delta: Int, val start: Long, val poll_ms: Long ) /** @@ -375,8 +375,8 @@ data class HistoryParams( fun getHistoryParams(params: Parameters): HistoryParams { val deltaParam: String = params["delta"] ?: throw MissingRequestParameterException(parameterName = "delta") - val delta: Long = try { - deltaParam.toLong() + val delta: Int = try { + deltaParam.toInt() } catch (e: Exception) { logger.error(e.message) throw badRequest("Param 'delta' not a number") diff --git a/bank/src/test/kotlin/DatabaseTest.kt b/bank/src/test/kotlin/DatabaseTest.kt @@ -339,13 +339,13 @@ class DatabaseTest { // Testing positive delta: val forward = db.bankTransactionGetHistory( start = 50L, - delta = 2L, + delta = 2, bankAccountId = 1L // asking as Foo ) assert(forward[0].expectRowId() >= 50 && forward.size == 2 && forward[0].dbRowId!! < forward[1].dbRowId!!) val backward = db.bankTransactionGetHistory( start = 50L, - delta = -2L, + delta = -2, bankAccountId = 1L // asking as Foo ) assert(backward[0].expectRowId() <= 50 && backward.size == 2 && backward[0].dbRowId!! > backward[1].dbRowId!!) diff --git a/bank/src/test/kotlin/LibeuFinApiTest.kt b/bank/src/test/kotlin/LibeuFinApiTest.kt @@ -68,6 +68,7 @@ class LibeuFinApiTest { */ @Test fun testHistory() = setup { db, ctx -> + // TODO add better tests with lon polling like Wire Gateway API val fooId = db.customerCreate(customerFoo); assert(fooId != null) assert(db.bankAccountCreate(genBankAccount(fooId!!)) != null) val barId = db.customerCreate(customerBar); assert(barId != null) diff --git a/bank/src/test/kotlin/TalerApiTest.kt b/bank/src/test/kotlin/TalerApiTest.kt @@ -296,7 +296,7 @@ class TalerApiTest { // Check no useless polling assertTime(0, 300) { - client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=-6&start=20&long_poll_ms=1000") { + client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=-6&start=15&long_poll_ms=1000") { basicAuth("bar", "secret") }.assertHistory(5) } @@ -435,7 +435,7 @@ class TalerApiTest { // Check no useless polling assertTime(0, 300) { - client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=-6&start=20&long_poll_ms=1000") { + client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=-6&start=15&long_poll_ms=1000") { basicAuth("bar", "secret") }.assertHistory(5) }