commit 8841ef62cec8943c3008db9a9b915cde313d04bd
parent 4166428e6ddbc57d3e564bc4ffd920c7891fc25b
Author: Antoine A <>
Date: Tue, 6 Aug 2024 14:39:18 +0200
common: support new API for long polling and pagination
Diffstat:
11 files changed, 70 insertions(+), 59 deletions(-)
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/Constants.kt b/bank/src/main/kotlin/tech/libeufin/bank/Constants.kt
@@ -37,6 +37,6 @@ val RESERVED_ACCOUNTS = setOf("admin", "bank")
const val IBAN_ALLOCATION_RETRY_COUNTER: Int = 5
// API version
-const val COREBANK_API_VERSION: String = "4:13:1"
+const val COREBANK_API_VERSION: String = "5:0:2"
const val CONVERSION_API_VERSION: String = "0:1:0"
-const val INTEGRATION_API_VERSION: String = "2:1:3"
+const val INTEGRATION_API_VERSION: String = "3:0:4"
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/db/WithdrawalDAO.kt b/bank/src/main/kotlin/tech/libeufin/bank/db/WithdrawalDAO.kt
@@ -271,12 +271,12 @@ class WithdrawalDAO(private val db: Database) {
status: (T) -> WithdrawalStatus,
load: suspend () -> T?
): T? {
- return if (params.polling.poll_ms > 0) {
+ return if (params.polling.timeout_ms > 0) {
db.listenWithdrawals(uuid) { flow ->
coroutineScope {
// Start buffering notification before loading transactions to not miss any
val polling = launch {
- withTimeoutOrNull(params.polling.poll_ms) {
+ withTimeoutOrNull(params.polling.timeout_ms) {
flow.first { it != params.old_state }
}
}
diff --git a/bank/src/test/kotlin/CoreBankApiTest.kt b/bank/src/test/kotlin/CoreBankApiTest.kt
@@ -961,7 +961,7 @@ class CoreBankAccountsApiTest {
}
}
// All accounts
- client.get("/accounts?delta=10"){
+ client.get("/accounts?limit=10"){
pwAuth("admin")
}.assertOkJson<ListBankAccountsResponse> {
assertEquals(6, it.accounts.size)
diff --git a/bank/src/test/kotlin/WireGatewayApiTest.kt b/bank/src/test/kotlin/WireGatewayApiTest.kt
@@ -274,7 +274,7 @@ class WireGatewayApiTest {
addKyc("KUDOS:2")
tx("merchant", "KUDOS:3", "exchange", "test with ${ShortHashCode.rand()} reserve pub")
tx("merchant", "KUDOS:4", "exchange", "test with KYC:${ShortHashCode.rand()} account pub")
- client.getA("/accounts/exchange/taler-wire-gateway/history/incoming?delta=25").assertOkJson<IncomingHistory> {
+ client.getA("/accounts/exchange/taler-wire-gateway/history/incoming?limit=25").assertOkJson<IncomingHistory> {
assertEquals(4, it.incoming_transactions.size)
it.incoming_transactions.forEachIndexed { i, tx ->
assertEquals(TalerAmount("KUDOS:${i+1}"), tx.amount)
diff --git a/bank/src/test/kotlin/routines.kt b/bank/src/test/kotlin/routines.kt
@@ -126,7 +126,7 @@ suspend inline fun <reified B> ApplicationTestBuilder.statusRoutine(
// Check no useless polling
assertTime(0, 100) {
- client.get("$url/$confirmed_uuid?long_poll_ms=1000&old_state=selected")
+ client.get("$url/$confirmed_uuid?timeout_ms=1000&old_state=selected")
.assertOkJson<B> { assertEquals(WithdrawalStatus.pending, status(it)) }
}
@@ -134,13 +134,13 @@ suspend inline fun <reified B> ApplicationTestBuilder.statusRoutine(
coroutineScope {
launch { // Check polling succeed
assertTime(100, 200) {
- client.get("$url/$confirmed_uuid?long_poll_ms=1000")
+ client.get("$url/$confirmed_uuid?timeout_ms=1000")
.assertOkJson<B> { assertEquals(WithdrawalStatus.selected, status(it)) }
}
}
launch { // Check polling succeed
assertTime(100, 200) {
- client.get("$url/$aborted_uuid?long_poll_ms=1000")
+ client.get("$url/$aborted_uuid?timeout_ms=1000")
.assertOkJson<B> { assertEquals(WithdrawalStatus.selected, status(it)) }
}
}
@@ -153,13 +153,13 @@ suspend inline fun <reified B> ApplicationTestBuilder.statusRoutine(
coroutineScope {
launch { // Check polling succeed
assertTime(100, 200) {
- client.get("$url/$confirmed_uuid?long_poll_ms=1000&old_state=selected")
+ client.get("$url/$confirmed_uuid?timeout_ms=1000&old_state=selected")
.assertOkJson<B> { assertEquals(WithdrawalStatus.confirmed, status(it))}
}
}
launch { // Check polling timeout
assertTime(200, 300) {
- client.get("$url/$aborted_uuid?long_poll_ms=200&old_state=selected")
+ client.get("$url/$aborted_uuid?timeout_ms=200&old_state=selected")
.assertOkJson<B> { assertEquals(WithdrawalStatus.selected, status(it)) }
}
}
@@ -171,13 +171,13 @@ suspend inline fun <reified B> ApplicationTestBuilder.statusRoutine(
coroutineScope {
launch {
assertTime(200, 300) {
- client.get("$url/$confirmed_uuid?long_poll_ms=200&old_state=confirmed")
+ client.get("$url/$confirmed_uuid?timeout_ms=200&old_state=confirmed")
.assertOkJson<B> { assertEquals(WithdrawalStatus.confirmed, status(it))}
}
}
launch {
assertTime(100, 200) {
- client.get("$url/$aborted_uuid?long_poll_ms=1000&old_state=selected")
+ client.get("$url/$aborted_uuid?timeout_ms=1000&old_state=selected")
.assertOkJson<B> { assertEquals(WithdrawalStatus.aborted, status(it)) }
}
}
diff --git a/common/src/main/kotlin/Constants.kt b/common/src/main/kotlin/Constants.kt
@@ -26,8 +26,8 @@ const val SERIALIZATION_RETRY: Int = 10
const val MAX_BODY_LENGTH: Int = 4 * 1024 // 4kB
// API version
-const val WIRE_GATEWAY_API_VERSION: String = "1:0:0"
-const val REVENUE_API_VERSION: String = "0:0:0"
+const val WIRE_GATEWAY_API_VERSION: String = "2:0:1"
+const val REVENUE_API_VERSION: String = "1:0:1"
// HTTP headers
const val X_CHALLENGE_ID: String = "X-Challenge-Id"
diff --git a/common/src/main/kotlin/db/helpers.kt b/common/src/main/kotlin/db/helpers.kt
@@ -38,7 +38,7 @@ suspend fun <T> DbPool.page(
bind: PreparedStatement.() -> Int = { 0 },
map: (ResultSet) -> T
): List<T> {
- val backward = params.delta < 0
+ val backward = params.limit < 0
val pageQuery = """
$query
$idName ${if (backward) '<' else '>'} ?
@@ -47,8 +47,8 @@ suspend fun <T> DbPool.page(
"""
return serializable(pageQuery) {
val pad = bind()
- setLong(pad + 1, params.start)
- setInt(pad + 2, abs(params.delta))
+ setLong(pad + 1, params.offset)
+ setInt(pad + 2, abs(params.limit))
all { map(it) }
}
}
@@ -76,17 +76,15 @@ suspend fun <T> DbPool.poolHistory(
},
map
)
-
-
- // TODO do we want to handle polling when going backward and there is no transactions yet ?
+
// When going backward there is always at least one transaction or none
- return if (params.page.delta >= 0 && params.polling.poll_ms > 0) {
+ return if (params.page.limit >= 0 && params.polling.timeout_ms > 0) {
listen(bankAccountId) { flow ->
coroutineScope {
// Start buffering notification before loading transactions to not miss any
val polling = launch {
- withTimeoutOrNull(params.polling.poll_ms) {
- flow.first { it > params.page.start } // Always forward so >
+ withTimeoutOrNull(params.polling.timeout_ms) {
+ flow.first { it > params.page.offset } // Always forward so >
}
}
// Initial loading
@@ -127,17 +125,15 @@ suspend fun <T> DbPool.poolHistoryGlobal(
query,
map=map
)
-
- // TODO do we want to handle polling when going backward and there is no transactions yet ?
// When going backward there is always at least one transaction or none
- return if (params.page.delta >= 0 && params.polling.poll_ms > 0) {
+ return if (params.page.limit >= 0 && params.polling.timeout_ms > 0) {
listen { flow ->
coroutineScope {
// Start buffering notification before loading transactions to not miss any
val polling = launch {
- withTimeoutOrNull(params.polling.poll_ms) {
- flow.first { it > params.page.start } // Always forward so >
+ withTimeoutOrNull(params.polling.timeout_ms) {
+ flow.first { it > params.page.offset } // Always forward so >
}
}
// Initial loading
diff --git a/common/src/main/kotlin/params.kt b/common/src/main/kotlin/params.kt
@@ -53,27 +53,42 @@ fun Parameters.amount(name: String): TalerAmount?
}
data class PageParams(
- val delta: Int, val start: Long
+ val limit: Int, val offset: Long
) {
companion object {
fun extract(params: Parameters): PageParams {
- val delta: Int = params.int("delta") ?: -20
- val start: Long = params.long("start") ?: if (delta >= 0) 0L else Long.MAX_VALUE
- if (start < 0) throw badRequest("Param 'start' must be a positive number", TalerErrorCode.GENERIC_PARAMETER_MALFORMED)
- // TODO enforce delta limit
- return PageParams(delta, start)
+ val legacy_limit_value = params.int("delta")
+ val new_limit_value = params.int("limit")
+ if (legacy_limit_value != null && new_limit_value != null && legacy_limit_value != new_limit_value)
+ throw badRequest("Param 'limit' cannot be used with param 'delta'", TalerErrorCode.GENERIC_PARAMETER_MALFORMED)
+
+ val legacy_offset_value = params.long("start")
+ val new_offset_value = params.long("offset")
+ if (legacy_offset_value != null && new_offset_value != null && legacy_offset_value != new_offset_value)
+ throw badRequest("Param 'offset' cannot be used with param 'start'", TalerErrorCode.GENERIC_PARAMETER_MALFORMED)
+
+ val limit: Int = new_limit_value ?: legacy_limit_value ?: -20
+ if (limit == 0) throw badRequest("Param 'limit' must be non-zero", TalerErrorCode.GENERIC_PARAMETER_MALFORMED)
+ val offset: Long = new_offset_value ?: legacy_offset_value ?: if (limit >= 0) 0L else Long.MAX_VALUE
+ if (offset < 0) throw badRequest("Param 'offset' must be a positive number", TalerErrorCode.GENERIC_PARAMETER_MALFORMED)
+ // TODO enforce max limit
+ return PageParams(limit, offset)
}
}
}
data class PollingParams(
- val poll_ms: Long
+ val timeout_ms: Long
) {
companion object {
fun extract(params: Parameters): PollingParams {
- val poll_ms: Long = params.long("long_poll_ms") ?: 0
- if (poll_ms < 0) throw badRequest("Param 'long_poll_ms' must be a positive number", TalerErrorCode.GENERIC_PARAMETER_MALFORMED)
- return PollingParams(poll_ms)
+ val legacy_value = params.long("long_poll_ms")
+ val new_value = params.long("timeout_ms")
+ if (legacy_value != null && new_value != null && legacy_value != new_value)
+ throw badRequest("Param 'timeout_ms' cannot be used with param 'long_poll_ms'", TalerErrorCode.GENERIC_PARAMETER_MALFORMED)
+ val timeout_ms: Long = new_value ?: legacy_value ?: 0
+ if (timeout_ms < 0) throw badRequest("Param 'timeout_ms' must be a positive number", TalerErrorCode.GENERIC_PARAMETER_MALFORMED)
+ return PollingParams(timeout_ms)
}
}
}
diff --git a/common/src/main/kotlin/test/helpers.kt b/common/src/main/kotlin/test/helpers.kt
@@ -44,15 +44,15 @@ suspend inline fun <reified B> HttpResponse.assertHistoryIds(size: Int, ids: (B)
// testing the size is like expected.
assertEquals(size, history.size, "bad history length: $history")
- if (params.delta < 0) {
- // testing that the first id is at most the 'start' query param.
- assert(history[0] <= params.start) { "bad history start: $params $history" }
+ if (params.limit < 0) {
+ // testing that the first id is at most the 'offset' query param.
+ assert(history[0] <= params.offset) { "bad history offset: $params $history" }
// testing that the id decreases.
if (history.size > 1)
assert(history.windowed(2).all { (a, b) -> a > b }) { "bad history order: $history" }
} else {
- // testing that the first id is at least the 'start' query param.
- assert(history[0] >= params.start) { "bad history start: $params $history" }
+ // testing that the first id is at least the 'offset' query param.
+ assert(history[0] >= params.offset) { "bad history offset: $params $history" }
// testing that the id increases.
if (history.size > 1)
assert(history.windowed(2).all { (a, b) -> a < b }) { "bad history order: $history" }
diff --git a/common/src/main/kotlin/test/routines.kt b/common/src/main/kotlin/test/routines.kt
@@ -39,11 +39,11 @@ suspend inline fun <reified B> abstractHistoryRoutine(
}
// Get latest registered id
val latestId: suspend () -> Long = {
- history("delta=-1").assertOkJson<B>().run { ids(this)[0] }
+ history("limit=-1").assertOkJson<B>().run { ids(this)[0] }
}
// Check error when no transactions
- history("delta=7").assertNoContent()
+ history("limit=7").assertNoContent()
// Run interleaved registered and ignore transactions
val registeredIter = registered.iterator()
@@ -58,19 +58,19 @@ suspend inline fun <reified B> abstractHistoryRoutine(
val nbTotal = nbRegistered + nbIgnored
// Check ignored
- history("delta=$nbTotal").assertHistory(nbRegistered)
+ history("limit=$nbTotal").assertHistory(nbRegistered)
// Check skip ignored
- history("delta=$nbRegistered").assertHistory(nbRegistered)
+ history("limit=$nbRegistered").assertHistory(nbRegistered)
if (polling) {
// Check no polling when we cannot have more transactions
assertTime(0, 100) {
- history("delta=-${nbRegistered+1}&long_poll_ms=1000")
+ history("limit=-${nbRegistered+1}&timeout_ms=1000")
.assertHistory(nbRegistered)
}
// Check no polling when already find transactions even if less than delta
assertTime(0, 100) {
- history("delta=${nbRegistered+1}&long_poll_ms=1000")
+ history("limit=${nbRegistered+1}&timeout_ms=1000")
.assertHistory(nbRegistered)
}
@@ -79,13 +79,13 @@ suspend inline fun <reified B> abstractHistoryRoutine(
val id = latestId()
launch { // Check polling succeed
assertTime(100, 200) {
- history("delta=2&start=$id&long_poll_ms=1000")
+ history("limit=2&offset=$id&timeout_ms=1000")
.assertHistory(1)
}
}
launch { // Check polling timeout
assertTime(200, 300) {
- history("delta=1&start=${id+nbTotal*3}&long_poll_ms=200")
+ history("limit=1&offset=${id+nbTotal*3}&timeout_ms=200")
.assertNoContent()
}
}
@@ -99,7 +99,7 @@ suspend inline fun <reified B> abstractHistoryRoutine(
val id = latestId()
launch {
assertTime(100, 200) {
- history("delta=7&start=$id&long_poll_ms=1000")
+ history("limit=7&offset=$id&timeout_ms=1000")
.assertHistory(1)
}
}
@@ -113,7 +113,7 @@ suspend inline fun <reified B> abstractHistoryRoutine(
val id = latestId()
launch {
assertTime(200, 300) {
- history("delta=7&start=$id&long_poll_ms=200")
+ history("limit=7&offset=$id&timeout_ms=200")
.assertNoContent()
}
}
@@ -132,9 +132,9 @@ suspend inline fun <reified B> abstractHistoryRoutine(
// Default
history("").assertHistory(20)
// forward range:
- history("delta=10").assertHistory(10)
- history("delta=10&start=4").assertHistory(10)
+ history("limit=10").assertHistory(10)
+ history("limit=10&offset=4").assertHistory(10)
// backward range:
- history("delta=-10").assertHistory(10)
- history("delta=-10&start=${id-4}").assertHistory(10)
+ history("limit=-10").assertHistory(10)
+ history("limit=-10&offset=${id-4}").assertHistory(10)
}
\ No newline at end of file
diff --git a/nexus/src/test/kotlin/WireGatewayApiTest.kt b/nexus/src/test/kotlin/WireGatewayApiTest.kt
@@ -241,7 +241,7 @@ class WireGatewayApiTest {
addKyc("CHF:2")
talerableIn(db, amount = "CHF:3")
talerableKycIn(db, amount = "CHF:4")
- client.getA("/taler-wire-gateway/history/incoming?delta=25").assertOkJson<IncomingHistory> {
+ client.getA("/taler-wire-gateway/history/incoming?limit=25").assertOkJson<IncomingHistory> {
assertEquals(4, it.incoming_transactions.size)
println(it)
it.incoming_transactions.forEachIndexed { i, tx ->