commit 70379b90fb1e606fe5fc2f58c85b41c80a686e57
parent ac77e1c832e43ec95665ffe6bbb9450c70495a3c
Author: Antoine A <>
Date: Wed, 18 Oct 2023 12:43:22 +0000
Fix and simplify long polling
Diffstat:
3 files changed, 161 insertions(+), 184 deletions(-)
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/Database.kt b/bank/src/main/kotlin/tech/libeufin/bank/Database.kt
@@ -914,66 +914,43 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
): List<T> {
val backward = params.delta < 0
val nbTx = abs(params.delta) // Number of transaction to query
- // Range of transaction ids to check
- var (min, max) = if (backward) Pair(0L, params.start) else Pair(params.start, Long.MAX_VALUE)
val query = """
$query
WHERE bank_account_id=? AND
- bank_transaction_id > ? AND bank_transaction_id < ?
+ bank_transaction_id ${if (backward) '<' else '>'} ?
ORDER BY bank_transaction_id ${if (backward) "DESC" else "ASC"}
LIMIT ?
"""
-
+
suspend fun load(amount: Int): List<T> = conn { conn ->
conn.prepareStatement(query).use { stmt ->
stmt.setLong(1, bankAccountId)
- stmt.setLong(2, min)
- stmt.setLong(3, max)
- stmt.setInt(4, amount)
- stmt.all {
- // Remember not to check this transaction again
- min = kotlin.math.max(it.getLong("bank_transaction_id"), min)
- map(it)
- }
- }
- }
-
- val shoudPoll = when {
- params.poll_ms <= 0 -> false
- backward && max == Long.MAX_VALUE -> true
- backward -> {
- val maxId = conn {
- it.prepareStatement("SELECT MAX(bank_transaction_id) FROM bank_account_transactions")
- .oneOrNull { it.getLong(1) } ?: 0
- };
- // Check if a new transaction could appear within the chosen interval
- max > maxId + 1
+ stmt.setLong(2, params.start)
+ stmt.setInt(3, amount)
+ stmt.all { map(it) }
}
- else -> true
}
- if (shoudPoll) {
+ // TODO do we want to handle polling when going backward and there is no transactions yet ?
+ // When going backward there is always at least one transaction or none
+ if (!backward && params.poll_ms > 0) {
var history = listOf<T>()
notifWatcher.(listen)(bankAccountId) { flow ->
- // Start buffering notification before loading transactions to not miss any
- val buffered = flow.buffer()
- // Initial load
- history += load(nbTx)
- // Long polling if transactions are missing
- val missing = nbTx - history.size
- if (missing > 0) {
- withTimeoutOrNull(params.poll_ms) {
- buffered
- .filter { it > min } // Skip transactions already checked
- .take(missing).count() // Wait for missing transactions
+ coroutineScope {
+ // Start buffering notification before loading transactions to not miss any
+ val polling = launch {
+ withTimeoutOrNull(params.poll_ms) {
+ flow.first { it > params.start } // Always forward so >
+ }
}
-
- if (backward) {
- // When going backward, we could find more transactions than we need
- history = (load(nbTx) + history).take(nbTx)
+ // Initial loading
+ history = load(nbTx)
+ // Long polling if we found no transactions
+ if (history.isEmpty()) {
+ polling.join()
+ history = load(nbTx)
} else {
- // Only load missing ones
- history += load(missing)
+ polling.cancel()
}
}
}
diff --git a/bank/src/test/kotlin/CoreBankApiTest.kt b/bank/src/test/kotlin/CoreBankApiTest.kt
@@ -330,12 +330,14 @@ class CoreBankTransactionsApiTest {
// testing that the first row_id is at most the 'start' query param.
assert(history.transactions[0].row_id <= params.start)
// testing that the row_id decreases.
- assert(history.transactions.windowed(2).all { (a, b) -> a.row_id > b.row_id })
+ if (history.transactions.size > 1)
+ assert(history.transactions.windowed(2).all { (a, b) -> a.row_id > b.row_id })
} else {
// testing that the first row_id is at least the 'start' query param.
assert(history.transactions[0].row_id >= params.start)
// testing that the row_id increases.
- assert(history.transactions.windowed(2).all { (a, b) -> a.row_id < b.row_id })
+ if (history.transactions.size > 1)
+ assert(history.transactions.windowed(2).all { (a, b) -> a.row_id < b.row_id })
}
}
}
@@ -367,57 +369,41 @@ class CoreBankTransactionsApiTest {
}
// Check no useless polling
- assertTime(0, 300) {
+ assertTime(0, 200) {
client.get("/accounts/merchant/transactions?delta=-6&start=11&long_poll_ms=1000") {
basicAuth("merchant", "merchant-password")
}.assertHistory(5)
}
- // Check polling end
- client.get("/accounts/merchant/transactions?delta=6&long_poll_ms=60") {
- basicAuth("merchant", "merchant-password")
- }.assertHistory(5)
-
- runBlocking {
- joinAll(
- launch { // Check polling succeed forward
- assertTime(200, 1000) {
- client.get("/accounts/merchant/transactions?delta=6&long_poll_ms=1000") {
- basicAuth("merchant", "merchant-password")
- }.assertHistory(6)
- }
- },
- launch { // Check polling succeed backward
- assertTime(200, 1000) {
- client.get("/accounts/merchant/transactions?delta=-6&long_poll_ms=1000") {
- basicAuth("merchant", "merchant-password")
- }.assertHistory(6)
- }
- },
- launch { // Check polling timeout forward
- assertTime(200, 400) {
- client.get("/accounts/merchant/transactions?delta=8&long_poll_ms=300") {
- basicAuth("merchant", "merchant-password")
- }.assertHistory(6)
- }
- },
- launch { // Check polling timeout backward
- assertTime(200, 400) {
- client.get("/accounts/merchant/transactions?delta=-8&long_poll_ms=300") {
- basicAuth("merchant", "merchant-password")
- }.assertHistory(6)
- }
- },
- launch {
- delay(200)
- client.post("/accounts/merchant/transactions") {
+ // Check no polling when find transaction
+ assertTime(0, 200) {
+ client.get("/accounts/merchant/transactions?delta=6&long_poll_ms=1000") {
+ basicAuth("merchant", "merchant-password")
+ }.assertHistory(5)
+ }
+
+ coroutineScope {
+ launch { // Check polling succeed
+ assertTime(200, 1000) {
+ client.get("/accounts/merchant/transactions?delta=2&start=10&long_poll_ms=1000") {
basicAuth("merchant", "merchant-password")
- jsonBody(json {
- "payto_uri" to "payto://iban/EXCHANGE-IBAN-XYZ?message=payout_poll&amount=KUDOS:4.2"
- })
- }.assertOk()
+ }.assertHistory(1)
}
- )
+ }
+ launch { // Check polling timeout
+ assertTime(200, 400) {
+ client.get("/accounts/merchant/transactions?delta=1&start=11&long_poll_ms=300") {
+ basicAuth("merchant", "merchant-password")
+ }.assertHistory(0)
+ }
+ }
+ delay(200)
+ client.post("/accounts/merchant/transactions") {
+ basicAuth("merchant", "merchant-password")
+ jsonBody(json {
+ "payto_uri" to "payto://iban/EXCHANGE-IBAN-XYZ?message=payout_poll&amount=KUDOS:4.2"
+ })
+ }.assertOk()
}
// Testing ranges.
diff --git a/bank/src/test/kotlin/WireGatewayApiTest.kt b/bank/src/test/kotlin/WireGatewayApiTest.kt
@@ -213,16 +213,17 @@ class WireGatewayApiTest {
// testing that the first row_id is at most the 'start' query param.
assert(history.incoming_transactions[0].row_id <= params.start)
// testing that the row_id decreases.
- assert(history.incoming_transactions.windowed(2).all { (a, b) -> a.row_id > b.row_id })
+ if (history.incoming_transactions.size > 1)
+ assert(history.incoming_transactions.windowed(2).all { (a, b) -> a.row_id > b.row_id })
} else {
// testing that the first row_id is at least the 'start' query param.
assert(history.incoming_transactions[0].row_id >= params.start)
// testing that the row_id increases.
- assert(history.incoming_transactions.windowed(2).all { (a, b) -> a.row_id < b.row_id })
+ if (history.incoming_transactions.size > 1)
+ assert(history.incoming_transactions.windowed(2).all { (a, b) -> a.row_id < b.row_id })
}
}
-
authRoutine("/accounts/merchant/taler-wire-gateway/history/incoming?delta=7", method = HttpMethod.Get)
// Check error when no transactions
@@ -243,22 +244,22 @@ class WireGatewayApiTest {
genTx(IncomingTxMetadata(randShortHashCode()).encode(), 2, 1)
).assertSuccess()
// Gen one transaction using withdraw logic
- val uuid = client.post("/accounts/merchant/withdrawals") {
+ client.post("/accounts/merchant/withdrawals") {
basicAuth("merchant", "merchant-password")
jsonBody(json { "amount" to "KUDOS:9" })
}.assertOk().run {
val resp = Json.decodeFromString<BankAccountCreateWithdrawalResponse>(bodyAsText())
- resp.taler_withdraw_uri.split("/").last()
+ val uuid = resp.taler_withdraw_uri.split("/").last()
+ client.post("/taler-integration/withdrawal-operation/${uuid}") {
+ jsonBody(json {
+ "reserve_pub" to randEddsaPublicKey()
+ "selected_exchange" to IbanPayTo("payto://iban/EXCHANGE-IBAN-XYZ")
+ })
+ }.assertOk()
+ client.post("/withdrawals/${uuid}/confirm") {
+ basicAuth("merchant", "merchant-password")
+ }.assertOk()
}
- client.post("/taler-integration/withdrawal-operation/${uuid}") {
- jsonBody(json {
- "reserve_pub" to randEddsaPublicKey()
- "selected_exchange" to IbanPayTo("payto://iban/EXCHANGE-IBAN-XYZ")
- })
- }.assertOk()
- client.post("/withdrawals/${uuid}/confirm") {
- basicAuth("merchant", "merchant-password")
- }.assertOk()
// Check ignore bogus subject
client.get("/accounts/exchange/taler-wire-gateway/history/incoming?delta=7") {
@@ -271,52 +272,79 @@ class WireGatewayApiTest {
}.assertHistory(5)
// Check no useless polling
- assertTime(0, 300) {
+ assertTime(0, 200) {
client.get("/accounts/exchange/taler-wire-gateway/history/incoming?delta=-6&start=15&long_poll_ms=1000") {
basicAuth("exchange", "exchange-password")
}.assertHistory(5)
}
- // Check polling end
- client.get("/accounts/exchange/taler-wire-gateway/history/incoming?delta=6&long_poll_ms=60") {
- basicAuth("exchange", "exchange-password")
- }.assertHistory(5)
+ // Check no polling when find transaction
+ assertTime(0, 200) {
+ client.get("/accounts/exchange/taler-wire-gateway/history/incoming?delta=6&long_poll_ms=60") {
+ basicAuth("exchange", "exchange-password")
+ }.assertHistory(5)
+ }
- runBlocking {
- joinAll(
- launch { // Check polling succeed forward
- assertTime(200, 1000) {
- client.get("/accounts/exchange/taler-wire-gateway/history/incoming?delta=6&long_poll_ms=1000") {
- basicAuth("exchange", "exchange-password")
- }.assertHistory(6)
- }
- },
- launch { // Check polling succeed backward
- assertTime(200, 1000) {
- client.get("/accounts/exchange/taler-wire-gateway/history/incoming?delta=-6&long_poll_ms=1000") {
- basicAuth("exchange", "exchange-password")
- }.assertHistory(6)
- }
- },
- launch { // Check polling timeout forward
- assertTime(200, 400) {
- client.get("/accounts/exchange/taler-wire-gateway/history/incoming?delta=8&long_poll_ms=300") {
- basicAuth("exchange", "exchange-password")
- }.assertHistory(6)
- }
- },
- launch { // Check polling timeout backward
- assertTime(200, 400) {
- client.get("/accounts/exchange/taler-wire-gateway/history/incoming?delta=-8&long_poll_ms=300") {
- basicAuth("exchange", "exchange-password")
- }.assertHistory(6)
- }
- },
- launch {
- delay(200)
- db.genIncoming("exchange", bankAccountMerchant)
+ coroutineScope {
+ launch { // Check polling succeed
+ assertTime(200, 300) {
+ client.get("/accounts/exchange/taler-wire-gateway/history/incoming?delta=2&start=14&long_poll_ms=1000") {
+ basicAuth("exchange", "exchange-password")
+ }.assertHistory(1)
}
- )
+ }
+ launch { // Check polling timeout
+ assertTime(200, 400) {
+ client.get("/accounts/exchange/taler-wire-gateway/history/incoming?delta=1&start=16&long_poll_ms=300") {
+ basicAuth("exchange", "exchange-password")
+ }.assertNoContent()
+ }
+ }
+ delay(200)
+ db.genIncoming("exchange", bankAccountMerchant)
+ }
+
+ // Test trigger by raw transaction
+ coroutineScope {
+ launch {
+ assertTime(200, 300) {
+ client.get("/accounts/exchange/taler-wire-gateway/history/incoming?delta=7&start=16&long_poll_ms=1000") {
+ basicAuth("exchange", "exchange-password")
+ }.assertHistory(1)
+ }
+ }
+ delay(200)
+ db.bankTransactionCreate(
+ genTx(IncomingTxMetadata(randShortHashCode()).encode(), 2, 1)
+ ).assertSuccess()
+ }
+
+ // Test trigger by withdraw operationr
+ coroutineScope {
+ launch {
+ assertTime(200, 300) {
+ client.get("/accounts/exchange/taler-wire-gateway/history/incoming?delta=7&start=18&long_poll_ms=1000") {
+ basicAuth("exchange", "exchange-password")
+ }.assertHistory(1)
+ }
+ }
+ delay(200)
+ client.post("/accounts/merchant/withdrawals") {
+ basicAuth("merchant", "merchant-password")
+ jsonBody(json { "amount" to "KUDOS:9" })
+ }.assertOk().run {
+ val resp = Json.decodeFromString<BankAccountCreateWithdrawalResponse>(bodyAsText())
+ val uuid = resp.taler_withdraw_uri.split("/").last()
+ client.post("/taler-integration/withdrawal-operation/${uuid}") {
+ jsonBody(json {
+ "reserve_pub" to randEddsaPublicKey()
+ "selected_exchange" to IbanPayTo("payto://iban/EXCHANGE-IBAN-XYZ")
+ })
+ }.assertOk()
+ client.post("/withdrawals/${uuid}/confirm") {
+ basicAuth("merchant", "merchant-password")
+ }.assertOk()
+ }
}
// Testing ranges.
@@ -364,12 +392,14 @@ class WireGatewayApiTest {
// testing that the first row_id is at most the 'start' query param.
assert(history.outgoing_transactions[0].row_id <= params.start)
// testing that the row_id decreases.
- assert(history.outgoing_transactions.windowed(2).all { (a, b) -> a.row_id > b.row_id })
+ if (history.outgoing_transactions.size > 1)
+ assert(history.outgoing_transactions.windowed(2).all { (a, b) -> a.row_id > b.row_id })
} else {
// testing that the first row_id is at least the 'start' query param.
assert(history.outgoing_transactions[0].row_id >= params.start)
// testing that the row_id increases.
- assert(history.outgoing_transactions.windowed(2).all { (a, b) -> a.row_id < b.row_id })
+ if (history.outgoing_transactions.size > 1)
+ assert(history.outgoing_transactions.windowed(2).all { (a, b) -> a.row_id < b.row_id })
}
}
@@ -406,52 +436,36 @@ class WireGatewayApiTest {
}.assertHistory(5)
// Check no useless polling
- assertTime(0, 300) {
+ assertTime(0, 200) {
client.get("/accounts/exchange/taler-wire-gateway/history/outgoing?delta=-6&start=15&long_poll_ms=1000") {
basicAuth("exchange", "exchange-password")
}.assertHistory(5)
}
- // Check polling end
- client.get("/accounts/exchange/taler-wire-gateway/history/outgoing?delta=6&long_poll_ms=60") {
- basicAuth("exchange", "exchange-password")
- }.assertHistory(5)
+ // Check no polling when find transaction
+ assertTime(0, 200) {
+ client.get("/accounts/exchange/taler-wire-gateway/history/outgoing?delta=6&long_poll_ms=1000") {
+ basicAuth("exchange", "exchange-password")
+ }.assertHistory(5)
+ }
- runBlocking {
- joinAll(
- launch { // Check polling succeed forward
- assertTime(200, 1000) {
- client.get("/accounts/exchange/taler-wire-gateway/history/outgoing?delta=6&long_poll_ms=1000") {
- basicAuth("exchange", "exchange-password")
- }.assertHistory(6)
- }
- },
- launch { // Check polling succeed backward
- assertTime(200, 1000) {
- client.get("/accounts/exchange/taler-wire-gateway/history/outgoing?delta=-6&long_poll_ms=1000") {
- basicAuth("exchange", "exchange-password")
- }.assertHistory(6)
- }
- },
- launch { // Check polling timeout forward
- assertTime(200, 400) {
- client.get("/accounts/exchange/taler-wire-gateway/history/outgoing?delta=8&long_poll_ms=300") {
- basicAuth("exchange", "exchange-password")
- }.assertHistory(6)
- }
- },
- launch { // Check polling timeout backward
- assertTime(200, 400) {
- client.get("/accounts/exchange/taler-wire-gateway/history/outgoing?delta=-8&long_poll_ms=300") {
- basicAuth("exchange", "exchange-password")
- }.assertHistory(6)
- }
- },
- launch {
- delay(200)
- db.genTransfer("exchange", bankAccountMerchant)
+ coroutineScope {
+ launch { // Check polling succeed forward
+ assertTime(200, 300) {
+ client.get("/accounts/exchange/taler-wire-gateway/history/outgoing?delta=2&start=14&long_poll_ms=1000") {
+ basicAuth("exchange", "exchange-password")
+ }.assertHistory(1)
}
- )
+ }
+ launch { // Check polling timeout forward
+ assertTime(200, 400) {
+ client.get("/accounts/exchange/taler-wire-gateway/history/outgoing?delta=1&start=16&long_poll_ms=300") {
+ basicAuth("exchange", "exchange-password")
+ }.assertNoContent()
+ }
+ }
+ delay(200)
+ db.genTransfer("exchange", bankAccountMerchant)
}
// Testing ranges.