summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAntoine A <>2023-10-11 19:02:09 +0000
committerAntoine A <>2023-10-11 19:02:09 +0000
commit9d43dc08ac94ff1f30d76ee3978ea2f4571c4369 (patch)
treefa7b00d8e18af1f2caf7d157ac6178ac6b574b7e
parent11d9a79728761b838c6fc6d8e6821207113857e4 (diff)
downloadlibeufin-9d43dc08ac94ff1f30d76ee3978ea2f4571c4369.tar.gz
libeufin-9d43dc08ac94ff1f30d76ee3978ea2f4571c4369.tar.bz2
libeufin-9d43dc08ac94ff1f30d76ee3978ea2f4571c4369.zip
Clean up and document history polling
-rw-r--r--bank/src/main/kotlin/tech/libeufin/bank/Database.kt110
1 files changed, 52 insertions, 58 deletions
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/Database.kt b/bank/src/main/kotlin/tech/libeufin/bank/Database.kt
index 867bef1d..59da5b23 100644
--- a/bank/src/main/kotlin/tech/libeufin/bank/Database.kt
+++ b/bank/src/main/kotlin/tech/libeufin/bank/Database.kt
@@ -77,7 +77,7 @@ private fun pgDataSource(dbConfig: String): PGSimpleDataSource {
}
private fun PGSimpleDataSource.pgConnection(): PgConnection {
- val conn = getConnection().unwrap(PgConnection::class.java)
+ val conn = connection.unwrap(PgConnection::class.java)
conn.execSQLUpdate("SET search_path TO libeufin_bank;")
return conn
}
@@ -194,12 +194,11 @@ private fun PreparedStatement.executeUpdateViolation(): Boolean {
}
class Database(dbConfig: String, private val bankCurrency: String): java.io.Closeable {
- private val pgSource: PGSimpleDataSource
private val dbPool: HikariDataSource
private val notifWatcher: NotificationWatcher
init {
- pgSource = pgDataSource(dbConfig)
+ val pgSource = pgDataSource(dbConfig)
val config = HikariConfig();
config.dataSource = pgSource
config.connectionInitSql = "SET search_path TO libeufin_bank;"
@@ -861,6 +860,10 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
}
}
+ /**
+ * The following function returns the list of transactions, according
+ * to the history parameters and perform long polling when necessary.
+ */
private suspend fun <T> poolHistory(
params: HistoryParams,
bankAccountId: Long,
@@ -868,68 +871,72 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
query: String,
map: (ResultSet) -> T
): List<T> {
- var lastSeenId = 0L;
- var poll_ms = params.poll_ms;
- var delta = params.delta
- val backward = delta < 0
+ val backward = params.delta < 0
+ val nbTx = abs(params.delta) // Number of transaction to query
+ // Range of transaction ids to check
var (min, max) = if (backward) Pair(0L, params.start) else Pair(params.start, Long.MAX_VALUE)
return dbPool.getConnection().use { conn ->
// Prepare statement
- val orderBy = if (backward) "DESC" else "ASC";
val stmt = conn.prepareStatement("""
$query
WHERE bank_account_id=? AND
bank_transaction_id > ? AND bank_transaction_id < ?
- ORDER BY bank_transaction_id $orderBy
+ ORDER BY bank_transaction_id ${if (backward) "DESC" else "ASC"}
LIMIT ?
""")
stmt.setLong(1, bankAccountId)
- fun load(): List<T> {
+ fun load(amount: Int): List<T> {
stmt.setLong(2, min)
stmt.setLong(3, max)
- stmt.setInt(4, abs(delta))
+ stmt.setInt(4, amount)
return stmt.all {
- lastSeenId = kotlin.math.max(it.getLong("bank_transaction_id"), lastSeenId)
+ // Remember not to check this transaction again
+ min = kotlin.math.max(it.getLong("bank_transaction_id"), min)
map(it)
}
}
- // Start expensive listening process only if we intend to poll
- if (poll_ms > 0) {
- var items = listOf<T>()
- notifWatcher.(listen)(bankAccountId) { flow ->
+ val shoudPoll = when {
+ params.poll_ms <= 0 -> false
+ backward -> {
val maxId = conn.prepareStatement("SELECT MAX(bank_transaction_id) FROM bank_account_transactions")
.oneOrNull { it.getLong(1) } ?: 0;
- // Start buffering notification to not miss any
+ // Check if a new transaction could appear within the chosen interval
+ max > maxId + 1
+ }
+ else -> true
+ }
+
+ if (shoudPoll) {
+ var history = listOf<T>()
+ notifWatcher.(listen)(bankAccountId) { flow ->
+ // Start buffering notification before loading transactions to not miss any
val buffered = flow.buffer()
// Initial load
- items += load()
- if (backward) delta += items.size else delta -= items.size
-
- // Long polling if we want more item and we could have more
- if (delta != 0 && !(backward && params.start <= maxId + 1)) {
- withTimeoutOrNull(poll_ms) {
+ history += load(nbTx)
+ // Long polling if transactions are missing
+ val missing = nbTx - history.size
+ if (missing > 0) {
+ withTimeoutOrNull(params.poll_ms) {
buffered
- .filter { it.rowId > lastSeenId } // Skip transactions we already have processed
- .take(abs(delta).toInt()).count()
+ .filter { it.rowId > min } // Skip transactions already checked
+ .take(missing).count() // Wait for missing transactions
}
if (backward) {
- min = lastSeenId
- // When going backward we might found more items that they are missing
- delta = params.delta
- items = (load() + items).take(abs(delta))
+ // When going backward, we could find more transactions than we need
+ history = (load(nbTx) + history).take(nbTx)
} else {
- min = lastSeenId
- items += load()
+ // Only load missing ones
+ history += load(missing)
}
}
}
- items
+ history
} else {
- load()
+ load(nbTx)
}
}
}
@@ -1050,8 +1057,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
fun bankTransactionGetHistory(
start: Long,
delta: Int,
- bankAccountId: Long,
- withDirection: TransactionDirection? = null
+ bankAccountId: Long
): List<BankAccountTransaction> = conn { conn ->
val (cmpOp, orderBy) = if (delta < 0) Pair("<", "DESC") else Pair(">", "ASC")
val stmt = conn.prepareStatement("""
@@ -1067,35 +1073,19 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
,account_servicer_reference
,payment_information_id
,end_to_end_id
- ${if (withDirection != null) "" else ",direction"}
+ ,direction
,bank_account_id
,bank_transaction_id
FROM bank_account_transactions
WHERE bank_transaction_id ${cmpOp} ?
AND bank_account_id=?
- ${if (withDirection != null) "AND direction=?::direction_enum" else ""}
ORDER BY bank_transaction_id ${orderBy}
LIMIT ?
""")
stmt.setLong(1, start)
stmt.setLong(2, bankAccountId)
- /**
- * The LIMIT parameter index might change, according to
- * the presence of the direction filter.
- */
- val limitParamIndex = if (withDirection != null) {
- stmt.setString(3, withDirection.name)
- 4
- }
- else
- 3
- stmt.setInt(limitParamIndex, abs(delta))
+ stmt.setInt(3, abs(delta))
stmt.all {
- val direction = withDirection ?: when (it.getString("direction")) {
- "credit" -> TransactionDirection.credit
- "debit" -> TransactionDirection.debit
- else -> throw internalServerError("Wrong direction in transaction: $this")
- }
BankAccountTransaction(
creditorPaytoUri = it.getString("creditor_payto_uri"),
creditorName = it.getString("creditor_name"),
@@ -1108,7 +1098,11 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
),
accountServicerReference = it.getString("account_servicer_reference"),
endToEndId = it.getString("end_to_end_id"),
- direction = direction,
+ direction = when (it.getString("direction")) {
+ "credit" -> TransactionDirection.credit
+ "debit" -> TransactionDirection.debit
+ else -> throw internalServerError("Wrong direction in transaction: $this")
+ },
bankAccountId = it.getLong("bank_account_id"),
paymentInformationId = it.getString("payment_information_id"),
subject = it.getString("subject"),
@@ -1595,11 +1589,11 @@ private class NotificationWatcher(private val pgSource: PGSimpleDataSource) {
val debitRow = info[2];
val creditRow = info[3];
- bankTxFlows.get(debtorAccount)?.run {
+ bankTxFlows[debtorAccount]?.run {
flow.emit(Notification(debitRow))
flow.emit(Notification(creditRow))
}
- bankTxFlows.get(creditorAccount)?.run {
+ bankTxFlows[creditorAccount]?.run {
flow.emit(Notification(debitRow))
flow.emit(Notification(creditRow))
}
@@ -1608,11 +1602,11 @@ private class NotificationWatcher(private val pgSource: PGSimpleDataSource) {
val account = info[0];
val row = info[1];
if (it.name == "outgoing_tx") {
- outgoingTxFlows.get(account)?.run {
+ outgoingTxFlows[account]?.run {
flow.emit(Notification(row))
}
} else {
- incomingTxFlows.get(account)?.run {
+ incomingTxFlows[account]?.run {
flow.emit(Notification(row))
}
}