diff options
author | Antoine A <> | 2023-10-06 08:20:28 +0000 |
---|---|---|
committer | Antoine A <> | 2023-10-06 08:20:28 +0000 |
commit | 0f5b178be2eafee891002dc38aee062ead94e64c (patch) | |
tree | a4471be44208eaed8643725bdd83af13f0a77c3d | |
parent | 49b6dbacec59bde7e99bfa429444941200126785 (diff) | |
download | libeufin-0f5b178be2eafee891002dc38aee062ead94e64c.tar.gz libeufin-0f5b178be2eafee891002dc38aee062ead94e64c.tar.bz2 libeufin-0f5b178be2eafee891002dc38aee062ead94e64c.zip |
Fix /taler-wire-gateway/history polling
-rw-r--r-- | bank/src/main/kotlin/tech/libeufin/bank/Database.kt | 134 | ||||
-rw-r--r-- | bank/src/main/kotlin/tech/libeufin/bank/WireGatewayApiHandlers.kt | 25 | ||||
-rw-r--r-- | bank/src/test/kotlin/TalerApiTest.kt | 20 |
3 files changed, 99 insertions, 80 deletions
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/Database.kt b/bank/src/main/kotlin/tech/libeufin/bank/Database.kt index 32b925d8..67a4ae95 100644 --- a/bank/src/main/kotlin/tech/libeufin/bank/Database.kt +++ b/bank/src/main/kotlin/tech/libeufin/bank/Database.kt @@ -1374,46 +1374,56 @@ class Database(private val dbConfig: String, internal val bankCurrency: String) } // TODO perf, merge with Database to reuse connection and prepared statement -class HistoryDatabaseCtx(private val db: Database, delta: Long, private val bankAccountId: Long, private val direction: TransactionDirection, private var poll_ms: Long): java.io.Closeable { - private val conn = db.conn() ?: throw internalServerError("DB connection down"); - private val channel = "${direction.name}_$bankAccountId"; - private var stmt: PreparedStatement; - - init { - val (cmpOp, orderBy) = if (delta < 0) Pair("<", "DESC") else Pair(">", "ASC") - stmt = conn.prepareStatement(""" - SELECT - creditor_payto_uri - ,creditor_name - ,debtor_payto_uri - ,debtor_name - ,subject - ,(amount).val AS amount_val - ,(amount).frac AS amount_frac - ,transaction_date - ,account_servicer_reference - ,payment_information_id - ,end_to_end_id - ,bank_account_id - ,bank_transaction_id - FROM bank_account_transactions - WHERE bank_transaction_id ${cmpOp} ? - AND bank_account_id=? - AND direction=?::direction_enum - ORDER BY bank_transaction_id ${orderBy} - LIMIT ? - """) +suspend fun <T> bankTransactionPoolHistory( + db: Database, + params: HistoryParams, + bankAccountId: Long, + direction: TransactionDirection, + map: (BankAccountTransaction) -> T? +): List<T> { + val conn = db.conn() ?: throw internalServerError("DB connection down"); + val channel = "${direction.name}_$bankAccountId"; + var start = params.start + var delta = params.delta + var poll_ms = params.poll_ms; + + val (cmpOp, orderBy) = if (delta < 0) Pair("<", "DESC") else Pair(">", "ASC") + val stmt = conn.prepareStatement(""" + SELECT + creditor_payto_uri + ,creditor_name + ,debtor_payto_uri + ,debtor_name + ,subject + ,(amount).val AS amount_val + ,(amount).frac AS amount_frac + ,transaction_date + ,account_servicer_reference + ,payment_information_id + ,end_to_end_id + ,bank_account_id + ,bank_transaction_id + FROM bank_account_transactions + WHERE bank_transaction_id ${cmpOp} ? + AND bank_account_id=? + AND direction=?::direction_enum + ORDER BY bank_transaction_id ${orderBy} + LIMIT ? + """) + + // If going backward with a starting point, it is useless to poll + if (delta < 0 && start != Long.MAX_VALUE) { + poll_ms = 0; + } - // Older notification cannot appear TODO ? - if (delta < 0) { - poll_ms = 0; - } - if (poll_ms > 0) { - conn.execSQLUpdate("LISTEN $channel"); - } + // Only start expensive listening if we intend to poll + if (poll_ms > 0) { + conn.execSQLUpdate("LISTEN $channel"); } - fun bankTransactionGetHistory(start: Long, delta: Long): List<BankAccountTransaction> { + val items = mutableListOf<T>() + + fun bankTransactionGetHistory(): List<BankAccountTransaction> { stmt.setLong(1, start) stmt.setLong(2, bankAccountId) stmt.setString(3, direction.name) @@ -1448,26 +1458,56 @@ class HistoryDatabaseCtx(private val db: Database, delta: Long, private val bank } } - /** Wait for more transactions, return true if new transactions have appeared */ - suspend fun pool(start: Long, delta: Long): Boolean { - if (poll_ms <= 0 || delta == 0L) - return false; + fun loadBankHistory() { + while (delta != 0L) { + val history = bankTransactionGetHistory() + if (history.isEmpty()) + break; + history.forEach { + val item = map(it); + // Advance cursor + start = it.expectRowId() + + if (item != null) { + items.add(item) + // Reduce delta + if (delta < 0) delta++ else delta--; + } + } + } + } + + loadBankHistory() + + // Long polling + while (delta != 0L && poll_ms > 0) { var remaining = abs(delta); do { val pollStart = System.currentTimeMillis() conn.getNotifications(poll_ms.toInt()).forEach { val id = it.parameter.toLong() - val new = if (delta < 0) id < start else id > start + val new = when { + params.start == Long.MAX_VALUE -> true + delta < 0 -> id < start + else -> id > start + } if (new) remaining -= 1 } val pollEnd = System.currentTimeMillis() poll_ms -= pollEnd - pollStart } while (poll_ms > 0 && remaining > 0L) - return true - } - override fun close() { - conn.execSQLUpdate("UNLISTEN $channel"); - conn.close() + // If going backward without a starting point, we reset loading progress + if (params.start == Long.MAX_VALUE) { + start = params.start + delta = params.delta + items.clear() + } + loadBankHistory() } + + conn.execSQLUpdate("UNLISTEN $channel"); + conn.close() + + return items.toList(); }
\ No newline at end of file diff --git a/bank/src/main/kotlin/tech/libeufin/bank/WireGatewayApiHandlers.kt b/bank/src/main/kotlin/tech/libeufin/bank/WireGatewayApiHandlers.kt index 290341a4..e52828f6 100644 --- a/bank/src/main/kotlin/tech/libeufin/bank/WireGatewayApiHandlers.kt +++ b/bank/src/main/kotlin/tech/libeufin/bank/WireGatewayApiHandlers.kt @@ -125,29 +125,8 @@ fun Routing.talerWireGatewayHandlers(db: Database, ctx: BankApplicationContext) val params = getHistoryParams(call.request) val bankAccount = call.bankAccount() if (!bankAccount.isTalerExchange) throw forbidden("History is not related to a Taler exchange.") - - var start = params.start - var delta = params.delta - val items = mutableListOf<T>() - val dbx = HistoryDatabaseCtx(db, delta, bankAccount.expectRowId(), direction, params.poll_ms); - dbx.use { - while (delta != 0L) { - val history = dbx.bankTransactionGetHistory(start, delta) - if (history.isEmpty() && !dbx.pool(start, delta)) - break; - history.forEach { - val item = map(it); - // Advance cursor - start = it.expectRowId() - - if (item != null) { - items.add(item) - // Reduce delta - if (delta < 0) delta++ else delta--; - } - } - } - } + + val items = bankTransactionPoolHistory(db, params, bankAccount.expectRowId(), direction, map); if (items.isEmpty()) { call.respond(HttpStatusCode.NoContent) diff --git a/bank/src/test/kotlin/TalerApiTest.kt b/bank/src/test/kotlin/TalerApiTest.kt index 21b08f5b..894e94b8 100644 --- a/bank/src/test/kotlin/TalerApiTest.kt +++ b/bank/src/test/kotlin/TalerApiTest.kt @@ -264,7 +264,7 @@ class TalerApiTest { } // Check no useless polling - client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=-6&long_poll_ms=6000000") { + client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=-6&start=20&long_poll_ms=6000000") { basicAuth("bar", "secret") }.assertOk().run { val j: IncomingHistory = Json.decodeFromString(this.bodyAsText()) @@ -282,10 +282,10 @@ class TalerApiTest { // Check polling succedd runBlocking { launch { - delay(300) + delay(200) db.bankTransactionCreate(genTx(randShortHashCode().encoded)).assertSuccess() } - client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=6&long_poll_ms=6000000") { + client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=-6&long_poll_ms=6000000") { basicAuth("bar", "secret") }.assertOk().run { val j: IncomingHistory = Json.decodeFromString(this.bodyAsText()) @@ -296,10 +296,10 @@ class TalerApiTest { // Check polling timeout runBlocking { launch { - delay(300) + delay(200) db.bankTransactionCreate(genTx(randShortHashCode().encoded)).assertSuccess() } - client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=8&long_poll_ms=1000") { + client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=8&long_poll_ms=300") { basicAuth("bar", "secret") }.assertOk().run { val j: IncomingHistory = Json.decodeFromString(this.bodyAsText()) @@ -405,7 +405,7 @@ class TalerApiTest { } // Check no useless polling - client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=-6&long_poll_ms=6000000") { + client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=-6&start=20&long_poll_ms=6000000") { basicAuth("bar", "secret") }.assertOk().run { val j: OutgoingHistory = Json.decodeFromString(this.bodyAsText()) @@ -423,10 +423,10 @@ class TalerApiTest { // Check polling succedd runBlocking { launch { - delay(300) + delay(200) transfer(db, 2, bankAccountFoo) } - client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=6&long_poll_ms=6000000") { + client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=-6&long_poll_ms=6000000") { basicAuth("bar", "secret") }.assertOk().run { val j: OutgoingHistory = Json.decodeFromString(this.bodyAsText()) @@ -437,10 +437,10 @@ class TalerApiTest { // Check polling timeout runBlocking { launch { - delay(300) + delay(200) transfer(db, 2, bankAccountFoo) } - client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=8&long_poll_ms=1000") { + client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=8&long_poll_ms=300") { basicAuth("bar", "secret") }.assertOk().run { val j: OutgoingHistory = Json.decodeFromString(this.bodyAsText()) |