commit 9065207368c50b976f2b4a0a8e923b6f7ad5fa0c
parent 4072928c0539584b279f241da174a34629f56985
Author: Antoine A <>
Date: Mon, 9 Oct 2023 15:21:22 +0000
New long polling design with a single notification channel
Diffstat:
4 files changed, 178 insertions(+), 134 deletions(-)
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/Database.kt b/bank/src/main/kotlin/tech/libeufin/bank/Database.kt
@@ -33,7 +33,10 @@ import java.io.File
import java.sql.*
import java.time.Instant
import java.util.*
+import java.util.concurrent.ConcurrentHashMap
import kotlin.math.abs
+import kotlinx.coroutines.flow.*
+import kotlinx.coroutines.*
import com.zaxxer.hikari.*
private const val DB_CTR_LIMIT = 1000000
@@ -175,6 +178,7 @@ private fun PreparedStatement.executeUpdateViolation(): Boolean {
class Database(dbConfig: String, private val bankCurrency: String): java.io.Closeable {
private val pgSource: PGSimpleDataSource
private val dbPool: HikariDataSource
+ private val notifWatcher: NotificationWatcher
init {
pgSource = pgDataSource(dbConfig)
@@ -183,6 +187,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
config.connectionInitSql = "SET search_path TO libeufin_bank;"
config.validate()
dbPool = HikariDataSource(config);
+ notifWatcher = NotificationWatcher(pgSource)
}
override fun close() {
@@ -682,6 +687,8 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
fun bankTransactionCreate(
tx: BankInternalTransaction
): BankTransactionResult = conn { conn ->
+ // TODO register incoming transaction if creditor is taler exchange and subject is well formed else bounce
+ // TODO register outgoing transaction if debitor is taler exchange else ignore
val stmt = conn.prepareStatement("""
SELECT out_nx_creditor, out_nx_debtor, out_balance_insufficient
FROM bank_wire_transfer(?,?,TEXT(?),(?,?)::taler_amount,?,TEXT(?),TEXT(?),TEXT(?))
@@ -711,10 +718,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
logger.error("Balance insufficient")
BankTransactionResult.CONFLICT
}
- else -> {
- logger.debug("New transaction ${tx.creditorAccountId} -> ${tx.debtorAccountId}")
- BankTransactionResult.SUCCESS
- }
+ else -> BankTransactionResult.SUCCESS
}
}
}
@@ -794,7 +798,6 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
var start = params.start
var delta = params.delta
var poll_ms = params.poll_ms;
- val channel = "${direction.name}_$bankAccountId";
val items = mutableListOf<T>()
// If going backward with a starting point, it is useless to poll
@@ -802,23 +805,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
poll_ms = 0;
}
- // TODO listening for notification is blocking a connection and postgres support a limited amount of connections
- // TODO we should use a single connection to listen for notification and dispatch them with kotlin code
-
- val conn: Connection; // Generic connection to close and query
- val pg: PgConnection; // Postgres connection for notifications
-
- // Only start expensive listening and connection creation if we intend to poll
- if (poll_ms > 0) {
- pg = pgSource.pgConnection()
- conn = pg
- pg.execSQLUpdate("LISTEN $channel");
- } else {
- conn = dbPool.getConnection()
- pg = conn.unwrap(PgConnection::class.java)
- }
-
- conn.use {
+ dbPool.getConnection().use { conn ->
// Prepare statement
val (cmpOp, orderBy) = if (delta < 0) Pair("<", "DESC") else Pair(">", "ASC")
val stmt = conn.prepareStatement("""
@@ -891,39 +878,43 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
}
}
- loadBankHistory()
-
- // Long polling
- while (delta != 0L && poll_ms > 0) {
- var remaining = abs(delta);
- do {
- val pollStart = System.currentTimeMillis()
- pg.getNotifications(poll_ms.toInt()).forEach {
- val id = it.parameter.toLong()
- val new = when {
- params.start == Long.MAX_VALUE -> true
- delta < 0 -> id < start
- else -> id > start
+ // Start expensive listening process only if we intend to poll
+ if (poll_ms > 0) {
+ notifWatcher.listen(NotificationTopic(bankAccountId, direction)) { flow ->
+ // Start buffering notification to not miss any
+ val buffered = flow.buffer()
+ // Initial load
+ loadBankHistory()
+ // Long polling while necessary
+ while (delta != 0L && poll_ms > 0) {
+ val pollStart = System.currentTimeMillis()
+ withTimeoutOrNull(poll_ms) {
+ buffered.filter {
+ when {
+ params.start == Long.MAX_VALUE -> true
+ delta < 0 -> it.rowId < start
+ else -> it.rowId > start
+ }
+ }.take(abs(delta).toInt()).count()
+ }
+ val pollEnd = System.currentTimeMillis()
+ poll_ms -= pollEnd - pollStart
+
+ // If going backward without a starting point, we reset loading progress
+ if (params.start == Long.MAX_VALUE) {
+ start = params.start
+ delta = params.delta
+ items.clear()
}
- if (new) remaining -= 1
+ loadBankHistory()
}
- val pollEnd = System.currentTimeMillis()
- poll_ms -= pollEnd - pollStart
- } while (poll_ms > 0 && remaining > 0L)
-
- // If going backward without a starting point, we reset loading progress
- if (params.start == Long.MAX_VALUE) {
- start = params.start
- delta = params.delta
- items.clear()
}
+ } else {
loadBankHistory()
}
-
- // No need to unlisten or clear notifications as we close the connection when polling is used
-
- return items.toList();
}
+
+ return items.toList();
}
/**
@@ -1447,4 +1438,65 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
}
}
}
+}
+
+private data class Notification(val rowId: Long)
+private data class NotificationTopic(val account: Long, val direction: TransactionDirection)
+
+private class NotificationWatcher(private val pgSource: PGSimpleDataSource) {
+ private class CountedSharedFlow(val flow: MutableSharedFlow<Notification>, var count: Int)
+
+ private val bankTxFlows = ConcurrentHashMap<NotificationTopic, CountedSharedFlow>()
+
+ init {
+ kotlin.concurrent.thread(isDaemon = true) {
+ runBlocking {
+ while (true) {
+ try {
+ val conn = pgSource.pgConnection()
+ conn.execSQLUpdate("LISTEN bank_tx")
+
+ while (true) {
+ conn.getNotifications().forEach {
+ val info = it.parameter.split(' ', limit = 4).map { it.toLong() }
+ val debtorAccount = info[0];
+ val creditorAccount = info[1];
+ val debitRow = info[2];
+ val creditRow = info[3];
+
+ bankTxFlows.get(NotificationTopic(debtorAccount, TransactionDirection.debit))?.run {
+ flow.emit(Notification(debitRow))
+ }
+ bankTxFlows.get(NotificationTopic(creditorAccount, TransactionDirection.credit))?.run {
+ flow.emit(Notification(creditRow))
+ }
+ }
+ }
+ } catch (e: Exception) {
+ logger.warn("notification_watcher failed: $e")
+ }
+ }
+ }
+ }
+ }
+
+ suspend fun listen(topic: NotificationTopic, lambda: suspend (Flow<Notification>) -> Unit) {
+ // Register listener
+ val flow = bankTxFlows.compute(topic) { _, v ->
+ val tmp = v ?: CountedSharedFlow(MutableSharedFlow(), 0);
+ tmp.count++;
+ tmp
+ }!!.flow;
+
+ try {
+ lambda(flow)
+ } finally {
+ // Unregister listener
+ bankTxFlows.compute(topic) { _, v ->
+ v!!;
+ v.count--;
+ if (v.count > 0) v else null
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/Main.kt b/bank/src/main/kotlin/tech/libeufin/bank/Main.kt
@@ -566,12 +566,7 @@ class ServeBank : CliktCommand("Run libeufin-bank HTTP server", name = "serve")
val db = Database(dbConnStr, ctx.currency)
if (!maybeCreateAdminAccount(db, ctx)) // logs provided by the helper
exitProcess(1)
- embeddedServer(Netty, port = servePort, configure = {
- // Disable threads for now, the DB isn't thread safe yet.
- connectionGroupSize = 1
- workerGroupSize = 1
- callGroupSize = 1
- }) {
+ embeddedServer(Netty, port = servePort) {
corebankWebApp(db, ctx)
}.start(wait = true)
}
diff --git a/bank/src/test/kotlin/TalerApiTest.kt b/bank/src/test/kotlin/TalerApiTest.kt
@@ -280,7 +280,7 @@ class TalerApiTest {
}.assertHistory(5)
// Check no useless polling
- assertTime(0, 1000) {
+ assertTime(0, 300) {
client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=-6&start=20&long_poll_ms=1000") {
basicAuth("bar", "secret")
}.assertHistory(5)
@@ -291,49 +291,48 @@ class TalerApiTest {
basicAuth("bar", "secret")
}.assertHistory(5)
- // Check polling succeed forward
runBlocking {
- async {
- delay(200)
- db.bankTransactionCreate(genTx(randShortHashCode().encoded)).assertSuccess()
- }
- assertTime(200, 1000) {
- client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=6&long_poll_ms=1000") {
- basicAuth("bar", "secret")
- }.assertHistory(6)
- }
- }
-
- // Check polling succeed backward
- runBlocking {
- async {
- delay(200)
- db.bankTransactionCreate(genTx(randShortHashCode().encoded)).assertSuccess()
- }
- assertTime(200, 1000) {
- client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=-7&long_poll_ms=1000") {
- basicAuth("bar", "secret")
- }.assertHistory(7)
- }
- }
-
- // Check polling timeout
- runBlocking {
- launch {
- delay(200)
- db.bankTransactionCreate(genTx(randShortHashCode().encoded)).assertSuccess()
- }
- assertTime(200, 400) {
- client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=9&long_poll_ms=300") {
- basicAuth("bar", "secret")
- }.assertHistory(8)
- }
+ joinAll(
+ launch { // Check polling succeed forward
+ assertTime(200, 1000) {
+ client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=6&long_poll_ms=1000") {
+ basicAuth("bar", "secret")
+ }.assertHistory(6)
+ }
+ },
+ launch { // Check polling succeed backward
+ assertTime(200, 1000) {
+ client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=-6&long_poll_ms=1000") {
+ basicAuth("bar", "secret")
+ }.assertHistory(6)
+ }
+ },
+ launch { // Check polling timeout forward
+ assertTime(200, 400) {
+ client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=8&long_poll_ms=300") {
+ basicAuth("bar", "secret")
+ }.assertHistory(6)
+ }
+ },
+ launch { // Check polling timeout backward
+ assertTime(200, 400) {
+ client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=-8&long_poll_ms=300") {
+ basicAuth("bar", "secret")
+ }.assertHistory(6)
+ }
+ },
+ launch {
+ delay(200)
+ db.bankTransactionCreate(genTx(randShortHashCode().encoded)).assertSuccess()
+ }
+ )
}
// Testing ranges.
val mockReservePub = randShortHashCode().encoded
- for (i in 1..400)
+ repeat(300) {
db.bankTransactionCreate(genTx(mockReservePub)).assertSuccess()
+ }
// forward range:
client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=10&start=30") {
@@ -421,7 +420,7 @@ class TalerApiTest {
}.assertHistory(5)
// Check no useless polling
- assertTime(0, 1000) {
+ assertTime(0, 300) {
client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=-6&start=20&long_poll_ms=1000") {
basicAuth("bar", "secret")
}.assertHistory(5)
@@ -432,48 +431,47 @@ class TalerApiTest {
basicAuth("bar", "secret")
}.assertHistory(5)
- // Check polling succeed forward
runBlocking {
- async {
- delay(200)
- transfer(db, 2, bankAccountFoo)
- }
- assertTime(200, 1000) {
- client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=6&long_poll_ms=1000") {
- basicAuth("bar", "secret")
- }.assertHistory(6)
- }
- }
-
- // Check polling succeed backward
- runBlocking {
- async {
- delay(200)
- transfer(db, 2, bankAccountFoo)
- }
- assertTime(200, 1000) {
- client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=-7&long_poll_ms=1000") {
- basicAuth("bar", "secret")
- }.assertHistory(7)
- }
- }
-
- // Check polling timeout
- runBlocking {
- launch {
- delay(200)
- transfer(db, 2, bankAccountFoo)
- }
- assertTime(200, 400) {
- client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=8&long_poll_ms=300") {
- basicAuth("bar", "secret")
- }.assertHistory(8)
- }
+ joinAll(
+ launch { // Check polling succeed forward
+ assertTime(200, 1000) {
+ client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=6&long_poll_ms=1000") {
+ basicAuth("bar", "secret")
+ }.assertHistory(6)
+ }
+ },
+ launch { // Check polling succeed backward
+ assertTime(200, 1000) {
+ client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=-6&long_poll_ms=1000") {
+ basicAuth("bar", "secret")
+ }.assertHistory(6)
+ }
+ },
+ launch { // Check polling timeout forward
+ assertTime(200, 400) {
+ client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=8&long_poll_ms=300") {
+ basicAuth("bar", "secret")
+ }.assertHistory(6)
+ }
+ },
+ launch { // Check polling timeout backward
+ assertTime(200, 400) {
+ client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=-8&long_poll_ms=300") {
+ basicAuth("bar", "secret")
+ }.assertHistory(6)
+ }
+ },
+ launch {
+ delay(200)
+ transfer(db, 2, bankAccountFoo)
+ }
+ )
}
// Testing ranges.
- for (i in 1..400)
+ repeat(300) {
transfer(db, 2, bankAccountFoo)
+ }
// forward range:
client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=10&start=30") {
diff --git a/database-versioning/procedures.sql b/database-versioning/procedures.sql
@@ -624,9 +624,8 @@ SET
has_debt=will_creditor_have_debt
WHERE bank_account_id=in_creditor_account_id;
--- notify transactions
-PERFORM pg_notify('debit_' || in_debtor_account_id, out_debit_row_id::text);
-PERFORM pg_notify('credit_' || in_creditor_account_id, out_credit_row_id::text);
+-- notify new transaction
+PERFORM pg_notify('bank_tx', in_debtor_account_id || ' ' || in_creditor_account_id || ' ' || out_debit_row_id || ' ' || out_credit_row_id)
RETURN;
END $$;