summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAntoine A <>2023-10-06 08:20:28 +0000
committerAntoine A <>2023-10-06 08:20:28 +0000
commit0f5b178be2eafee891002dc38aee062ead94e64c (patch)
treea4471be44208eaed8643725bdd83af13f0a77c3d
parent49b6dbacec59bde7e99bfa429444941200126785 (diff)
downloadlibeufin-0f5b178be2eafee891002dc38aee062ead94e64c.tar.gz
libeufin-0f5b178be2eafee891002dc38aee062ead94e64c.tar.bz2
libeufin-0f5b178be2eafee891002dc38aee062ead94e64c.zip
Fix /taler-wire-gateway/history polling
-rw-r--r--bank/src/main/kotlin/tech/libeufin/bank/Database.kt134
-rw-r--r--bank/src/main/kotlin/tech/libeufin/bank/WireGatewayApiHandlers.kt25
-rw-r--r--bank/src/test/kotlin/TalerApiTest.kt20
3 files changed, 99 insertions, 80 deletions
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/Database.kt b/bank/src/main/kotlin/tech/libeufin/bank/Database.kt
index 32b925d8..67a4ae95 100644
--- a/bank/src/main/kotlin/tech/libeufin/bank/Database.kt
+++ b/bank/src/main/kotlin/tech/libeufin/bank/Database.kt
@@ -1374,46 +1374,56 @@ class Database(private val dbConfig: String, internal val bankCurrency: String)
}
// TODO perf, merge with Database to reuse connection and prepared statement
-class HistoryDatabaseCtx(private val db: Database, delta: Long, private val bankAccountId: Long, private val direction: TransactionDirection, private var poll_ms: Long): java.io.Closeable {
- private val conn = db.conn() ?: throw internalServerError("DB connection down");
- private val channel = "${direction.name}_$bankAccountId";
- private var stmt: PreparedStatement;
-
- init {
- val (cmpOp, orderBy) = if (delta < 0) Pair("<", "DESC") else Pair(">", "ASC")
- stmt = conn.prepareStatement("""
- SELECT
- creditor_payto_uri
- ,creditor_name
- ,debtor_payto_uri
- ,debtor_name
- ,subject
- ,(amount).val AS amount_val
- ,(amount).frac AS amount_frac
- ,transaction_date
- ,account_servicer_reference
- ,payment_information_id
- ,end_to_end_id
- ,bank_account_id
- ,bank_transaction_id
- FROM bank_account_transactions
- WHERE bank_transaction_id ${cmpOp} ?
- AND bank_account_id=?
- AND direction=?::direction_enum
- ORDER BY bank_transaction_id ${orderBy}
- LIMIT ?
- """)
+suspend fun <T> bankTransactionPoolHistory(
+ db: Database,
+ params: HistoryParams,
+ bankAccountId: Long,
+ direction: TransactionDirection,
+ map: (BankAccountTransaction) -> T?
+): List<T> {
+ val conn = db.conn() ?: throw internalServerError("DB connection down");
+ val channel = "${direction.name}_$bankAccountId";
+ var start = params.start
+ var delta = params.delta
+ var poll_ms = params.poll_ms;
+
+ val (cmpOp, orderBy) = if (delta < 0) Pair("<", "DESC") else Pair(">", "ASC")
+ val stmt = conn.prepareStatement("""
+ SELECT
+ creditor_payto_uri
+ ,creditor_name
+ ,debtor_payto_uri
+ ,debtor_name
+ ,subject
+ ,(amount).val AS amount_val
+ ,(amount).frac AS amount_frac
+ ,transaction_date
+ ,account_servicer_reference
+ ,payment_information_id
+ ,end_to_end_id
+ ,bank_account_id
+ ,bank_transaction_id
+ FROM bank_account_transactions
+ WHERE bank_transaction_id ${cmpOp} ?
+ AND bank_account_id=?
+ AND direction=?::direction_enum
+ ORDER BY bank_transaction_id ${orderBy}
+ LIMIT ?
+ """)
+
+ // If going backward with a starting point, it is useless to poll
+ if (delta < 0 && start != Long.MAX_VALUE) {
+ poll_ms = 0;
+ }
- // Older notification cannot appear TODO ?
- if (delta < 0) {
- poll_ms = 0;
- }
- if (poll_ms > 0) {
- conn.execSQLUpdate("LISTEN $channel");
- }
+ // Only start expensive listening if we intend to poll
+ if (poll_ms > 0) {
+ conn.execSQLUpdate("LISTEN $channel");
}
- fun bankTransactionGetHistory(start: Long, delta: Long): List<BankAccountTransaction> {
+ val items = mutableListOf<T>()
+
+ fun bankTransactionGetHistory(): List<BankAccountTransaction> {
stmt.setLong(1, start)
stmt.setLong(2, bankAccountId)
stmt.setString(3, direction.name)
@@ -1448,26 +1458,56 @@ class HistoryDatabaseCtx(private val db: Database, delta: Long, private val bank
}
}
- /** Wait for more transactions, return true if new transactions have appeared */
- suspend fun pool(start: Long, delta: Long): Boolean {
- if (poll_ms <= 0 || delta == 0L)
- return false;
+ fun loadBankHistory() {
+ while (delta != 0L) {
+ val history = bankTransactionGetHistory()
+ if (history.isEmpty())
+ break;
+ history.forEach {
+ val item = map(it);
+ // Advance cursor
+ start = it.expectRowId()
+
+ if (item != null) {
+ items.add(item)
+ // Reduce delta
+ if (delta < 0) delta++ else delta--;
+ }
+ }
+ }
+ }
+
+ loadBankHistory()
+
+ // Long polling
+ while (delta != 0L && poll_ms > 0) {
var remaining = abs(delta);
do {
val pollStart = System.currentTimeMillis()
conn.getNotifications(poll_ms.toInt()).forEach {
val id = it.parameter.toLong()
- val new = if (delta < 0) id < start else id > start
+ val new = when {
+ params.start == Long.MAX_VALUE -> true
+ delta < 0 -> id < start
+ else -> id > start
+ }
if (new) remaining -= 1
}
val pollEnd = System.currentTimeMillis()
poll_ms -= pollEnd - pollStart
} while (poll_ms > 0 && remaining > 0L)
- return true
- }
- override fun close() {
- conn.execSQLUpdate("UNLISTEN $channel");
- conn.close()
+ // If going backward without a starting point, we reset loading progress
+ if (params.start == Long.MAX_VALUE) {
+ start = params.start
+ delta = params.delta
+ items.clear()
+ }
+ loadBankHistory()
}
+
+ conn.execSQLUpdate("UNLISTEN $channel");
+ conn.close()
+
+ return items.toList();
} \ No newline at end of file
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/WireGatewayApiHandlers.kt b/bank/src/main/kotlin/tech/libeufin/bank/WireGatewayApiHandlers.kt
index 290341a4..e52828f6 100644
--- a/bank/src/main/kotlin/tech/libeufin/bank/WireGatewayApiHandlers.kt
+++ b/bank/src/main/kotlin/tech/libeufin/bank/WireGatewayApiHandlers.kt
@@ -125,29 +125,8 @@ fun Routing.talerWireGatewayHandlers(db: Database, ctx: BankApplicationContext)
val params = getHistoryParams(call.request)
val bankAccount = call.bankAccount()
if (!bankAccount.isTalerExchange) throw forbidden("History is not related to a Taler exchange.")
-
- var start = params.start
- var delta = params.delta
- val items = mutableListOf<T>()
- val dbx = HistoryDatabaseCtx(db, delta, bankAccount.expectRowId(), direction, params.poll_ms);
- dbx.use {
- while (delta != 0L) {
- val history = dbx.bankTransactionGetHistory(start, delta)
- if (history.isEmpty() && !dbx.pool(start, delta))
- break;
- history.forEach {
- val item = map(it);
- // Advance cursor
- start = it.expectRowId()
-
- if (item != null) {
- items.add(item)
- // Reduce delta
- if (delta < 0) delta++ else delta--;
- }
- }
- }
- }
+
+ val items = bankTransactionPoolHistory(db, params, bankAccount.expectRowId(), direction, map);
if (items.isEmpty()) {
call.respond(HttpStatusCode.NoContent)
diff --git a/bank/src/test/kotlin/TalerApiTest.kt b/bank/src/test/kotlin/TalerApiTest.kt
index 21b08f5b..894e94b8 100644
--- a/bank/src/test/kotlin/TalerApiTest.kt
+++ b/bank/src/test/kotlin/TalerApiTest.kt
@@ -264,7 +264,7 @@ class TalerApiTest {
}
// Check no useless polling
- client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=-6&long_poll_ms=6000000") {
+ client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=-6&start=20&long_poll_ms=6000000") {
basicAuth("bar", "secret")
}.assertOk().run {
val j: IncomingHistory = Json.decodeFromString(this.bodyAsText())
@@ -282,10 +282,10 @@ class TalerApiTest {
// Check polling succedd
runBlocking {
launch {
- delay(300)
+ delay(200)
db.bankTransactionCreate(genTx(randShortHashCode().encoded)).assertSuccess()
}
- client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=6&long_poll_ms=6000000") {
+ client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=-6&long_poll_ms=6000000") {
basicAuth("bar", "secret")
}.assertOk().run {
val j: IncomingHistory = Json.decodeFromString(this.bodyAsText())
@@ -296,10 +296,10 @@ class TalerApiTest {
// Check polling timeout
runBlocking {
launch {
- delay(300)
+ delay(200)
db.bankTransactionCreate(genTx(randShortHashCode().encoded)).assertSuccess()
}
- client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=8&long_poll_ms=1000") {
+ client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=8&long_poll_ms=300") {
basicAuth("bar", "secret")
}.assertOk().run {
val j: IncomingHistory = Json.decodeFromString(this.bodyAsText())
@@ -405,7 +405,7 @@ class TalerApiTest {
}
// Check no useless polling
- client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=-6&long_poll_ms=6000000") {
+ client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=-6&start=20&long_poll_ms=6000000") {
basicAuth("bar", "secret")
}.assertOk().run {
val j: OutgoingHistory = Json.decodeFromString(this.bodyAsText())
@@ -423,10 +423,10 @@ class TalerApiTest {
// Check polling succedd
runBlocking {
launch {
- delay(300)
+ delay(200)
transfer(db, 2, bankAccountFoo)
}
- client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=6&long_poll_ms=6000000") {
+ client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=-6&long_poll_ms=6000000") {
basicAuth("bar", "secret")
}.assertOk().run {
val j: OutgoingHistory = Json.decodeFromString(this.bodyAsText())
@@ -437,10 +437,10 @@ class TalerApiTest {
// Check polling timeout
runBlocking {
launch {
- delay(300)
+ delay(200)
transfer(db, 2, bankAccountFoo)
}
- client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=8&long_poll_ms=1000") {
+ client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=8&long_poll_ms=300") {
basicAuth("bar", "secret")
}.assertOk().run {
val j: OutgoingHistory = Json.decodeFromString(this.bodyAsText())