diff options
author | Antoine A <> | 2023-10-11 19:02:09 +0000 |
---|---|---|
committer | Antoine A <> | 2023-10-11 19:02:09 +0000 |
commit | 9d43dc08ac94ff1f30d76ee3978ea2f4571c4369 (patch) | |
tree | fa7b00d8e18af1f2caf7d157ac6178ac6b574b7e | |
parent | 11d9a79728761b838c6fc6d8e6821207113857e4 (diff) | |
download | libeufin-9d43dc08ac94ff1f30d76ee3978ea2f4571c4369.tar.gz libeufin-9d43dc08ac94ff1f30d76ee3978ea2f4571c4369.tar.bz2 libeufin-9d43dc08ac94ff1f30d76ee3978ea2f4571c4369.zip |
Clean up and document history polling
-rw-r--r-- | bank/src/main/kotlin/tech/libeufin/bank/Database.kt | 110 |
1 files changed, 52 insertions, 58 deletions
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/Database.kt b/bank/src/main/kotlin/tech/libeufin/bank/Database.kt index 867bef1d..59da5b23 100644 --- a/bank/src/main/kotlin/tech/libeufin/bank/Database.kt +++ b/bank/src/main/kotlin/tech/libeufin/bank/Database.kt @@ -77,7 +77,7 @@ private fun pgDataSource(dbConfig: String): PGSimpleDataSource { } private fun PGSimpleDataSource.pgConnection(): PgConnection { - val conn = getConnection().unwrap(PgConnection::class.java) + val conn = connection.unwrap(PgConnection::class.java) conn.execSQLUpdate("SET search_path TO libeufin_bank;") return conn } @@ -194,12 +194,11 @@ private fun PreparedStatement.executeUpdateViolation(): Boolean { } class Database(dbConfig: String, private val bankCurrency: String): java.io.Closeable { - private val pgSource: PGSimpleDataSource private val dbPool: HikariDataSource private val notifWatcher: NotificationWatcher init { - pgSource = pgDataSource(dbConfig) + val pgSource = pgDataSource(dbConfig) val config = HikariConfig(); config.dataSource = pgSource config.connectionInitSql = "SET search_path TO libeufin_bank;" @@ -861,6 +860,10 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos } } + /** + * The following function returns the list of transactions, according + * to the history parameters and perform long polling when necessary. + */ private suspend fun <T> poolHistory( params: HistoryParams, bankAccountId: Long, @@ -868,68 +871,72 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos query: String, map: (ResultSet) -> T ): List<T> { - var lastSeenId = 0L; - var poll_ms = params.poll_ms; - var delta = params.delta - val backward = delta < 0 + val backward = params.delta < 0 + val nbTx = abs(params.delta) // Number of transaction to query + // Range of transaction ids to check var (min, max) = if (backward) Pair(0L, params.start) else Pair(params.start, Long.MAX_VALUE) return dbPool.getConnection().use { conn -> // Prepare statement - val orderBy = if (backward) "DESC" else "ASC"; val stmt = conn.prepareStatement(""" $query WHERE bank_account_id=? AND bank_transaction_id > ? AND bank_transaction_id < ? - ORDER BY bank_transaction_id $orderBy + ORDER BY bank_transaction_id ${if (backward) "DESC" else "ASC"} LIMIT ? """) stmt.setLong(1, bankAccountId) - fun load(): List<T> { + fun load(amount: Int): List<T> { stmt.setLong(2, min) stmt.setLong(3, max) - stmt.setInt(4, abs(delta)) + stmt.setInt(4, amount) return stmt.all { - lastSeenId = kotlin.math.max(it.getLong("bank_transaction_id"), lastSeenId) + // Remember not to check this transaction again + min = kotlin.math.max(it.getLong("bank_transaction_id"), min) map(it) } } - // Start expensive listening process only if we intend to poll - if (poll_ms > 0) { - var items = listOf<T>() - notifWatcher.(listen)(bankAccountId) { flow -> + val shoudPoll = when { + params.poll_ms <= 0 -> false + backward -> { val maxId = conn.prepareStatement("SELECT MAX(bank_transaction_id) FROM bank_account_transactions") .oneOrNull { it.getLong(1) } ?: 0; - // Start buffering notification to not miss any + // Check if a new transaction could appear within the chosen interval + max > maxId + 1 + } + else -> true + } + + if (shoudPoll) { + var history = listOf<T>() + notifWatcher.(listen)(bankAccountId) { flow -> + // Start buffering notification before loading transactions to not miss any val buffered = flow.buffer() // Initial load - items += load() - if (backward) delta += items.size else delta -= items.size - - // Long polling if we want more item and we could have more - if (delta != 0 && !(backward && params.start <= maxId + 1)) { - withTimeoutOrNull(poll_ms) { + history += load(nbTx) + // Long polling if transactions are missing + val missing = nbTx - history.size + if (missing > 0) { + withTimeoutOrNull(params.poll_ms) { buffered - .filter { it.rowId > lastSeenId } // Skip transactions we already have processed - .take(abs(delta).toInt()).count() + .filter { it.rowId > min } // Skip transactions already checked + .take(missing).count() // Wait for missing transactions } if (backward) { - min = lastSeenId - // When going backward we might found more items that they are missing - delta = params.delta - items = (load() + items).take(abs(delta)) + // When going backward, we could find more transactions than we need + history = (load(nbTx) + history).take(nbTx) } else { - min = lastSeenId - items += load() + // Only load missing ones + history += load(missing) } } } - items + history } else { - load() + load(nbTx) } } } @@ -1050,8 +1057,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos fun bankTransactionGetHistory( start: Long, delta: Int, - bankAccountId: Long, - withDirection: TransactionDirection? = null + bankAccountId: Long ): List<BankAccountTransaction> = conn { conn -> val (cmpOp, orderBy) = if (delta < 0) Pair("<", "DESC") else Pair(">", "ASC") val stmt = conn.prepareStatement(""" @@ -1067,35 +1073,19 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos ,account_servicer_reference ,payment_information_id ,end_to_end_id - ${if (withDirection != null) "" else ",direction"} + ,direction ,bank_account_id ,bank_transaction_id FROM bank_account_transactions WHERE bank_transaction_id ${cmpOp} ? AND bank_account_id=? - ${if (withDirection != null) "AND direction=?::direction_enum" else ""} ORDER BY bank_transaction_id ${orderBy} LIMIT ? """) stmt.setLong(1, start) stmt.setLong(2, bankAccountId) - /** - * The LIMIT parameter index might change, according to - * the presence of the direction filter. - */ - val limitParamIndex = if (withDirection != null) { - stmt.setString(3, withDirection.name) - 4 - } - else - 3 - stmt.setInt(limitParamIndex, abs(delta)) + stmt.setInt(3, abs(delta)) stmt.all { - val direction = withDirection ?: when (it.getString("direction")) { - "credit" -> TransactionDirection.credit - "debit" -> TransactionDirection.debit - else -> throw internalServerError("Wrong direction in transaction: $this") - } BankAccountTransaction( creditorPaytoUri = it.getString("creditor_payto_uri"), creditorName = it.getString("creditor_name"), @@ -1108,7 +1098,11 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos ), accountServicerReference = it.getString("account_servicer_reference"), endToEndId = it.getString("end_to_end_id"), - direction = direction, + direction = when (it.getString("direction")) { + "credit" -> TransactionDirection.credit + "debit" -> TransactionDirection.debit + else -> throw internalServerError("Wrong direction in transaction: $this") + }, bankAccountId = it.getLong("bank_account_id"), paymentInformationId = it.getString("payment_information_id"), subject = it.getString("subject"), @@ -1595,11 +1589,11 @@ private class NotificationWatcher(private val pgSource: PGSimpleDataSource) { val debitRow = info[2]; val creditRow = info[3]; - bankTxFlows.get(debtorAccount)?.run { + bankTxFlows[debtorAccount]?.run { flow.emit(Notification(debitRow)) flow.emit(Notification(creditRow)) } - bankTxFlows.get(creditorAccount)?.run { + bankTxFlows[creditorAccount]?.run { flow.emit(Notification(debitRow)) flow.emit(Notification(creditRow)) } @@ -1608,11 +1602,11 @@ private class NotificationWatcher(private val pgSource: PGSimpleDataSource) { val account = info[0]; val row = info[1]; if (it.name == "outgoing_tx") { - outgoingTxFlows.get(account)?.run { + outgoingTxFlows[account]?.run { flow.emit(Notification(row)) } } else { - incomingTxFlows.get(account)?.run { + incomingTxFlows[account]?.run { flow.emit(Notification(row)) } } |