commit 63553386690e14b6cba34f5ff0a3f8c404f310a1
parent eab6ac884c9fea3c76e0d55ab7605f23e178713d
Author: Antoine A <>
Date: Fri, 6 Oct 2023 16:56:14 +0000
More polling tests
Diffstat:
3 files changed, 166 insertions(+), 130 deletions(-)
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/Database.kt b/bank/src/main/kotlin/tech/libeufin/bank/Database.kt
@@ -744,7 +744,10 @@ class Database(private val dbConfig: String, private val bankCurrency: String) {
logger.error("Balance insufficient")
BankTransactionResult.CONFLICT
}
- else -> BankTransactionResult.SUCCESS
+ else -> {
+ logger.debug("New transaction ${tx.creditorAccountId} -> ${tx.debtorAccountId}")
+ BankTransactionResult.SUCCESS
+ }
}
}
}
@@ -824,129 +827,131 @@ class Database(private val dbConfig: String, private val bankCurrency: String) {
map: (BankAccountTransaction) -> T?
): List<T> {
val conn = conn() ?: throw internalServerError("DB connection down");
- val channel = "${direction.name}_$bankAccountId";
- var start = params.start
- var delta = params.delta
- var poll_ms = params.poll_ms;
-
- val (cmpOp, orderBy) = if (delta < 0) Pair("<", "DESC") else Pair(">", "ASC")
- val stmt = conn.prepareStatement("""
- SELECT
- creditor_payto_uri
- ,creditor_name
- ,debtor_payto_uri
- ,debtor_name
- ,subject
- ,(amount).val AS amount_val
- ,(amount).frac AS amount_frac
- ,transaction_date
- ,account_servicer_reference
- ,payment_information_id
- ,end_to_end_id
- ,bank_account_id
- ,bank_transaction_id
- FROM bank_account_transactions
- WHERE bank_transaction_id ${cmpOp} ?
- AND bank_account_id=?
- AND direction=?::direction_enum
- ORDER BY bank_transaction_id ${orderBy}
- LIMIT ?
- """)
-
- // If going backward with a starting point, it is useless to poll
- if (delta < 0 && start != Long.MAX_VALUE) {
- poll_ms = 0;
- }
-
- // Only start expensive listening if we intend to poll
- if (poll_ms > 0) {
- conn.execSQLUpdate("LISTEN $channel");
- }
+ conn.use {
+ val channel = "${direction.name}_$bankAccountId";
+ var start = params.start
+ var delta = params.delta
+ var poll_ms = params.poll_ms;
+
+ val (cmpOp, orderBy) = if (delta < 0) Pair("<", "DESC") else Pair(">", "ASC")
+ val stmt = conn.prepareStatement("""
+ SELECT
+ creditor_payto_uri
+ ,creditor_name
+ ,debtor_payto_uri
+ ,debtor_name
+ ,subject
+ ,(amount).val AS amount_val
+ ,(amount).frac AS amount_frac
+ ,transaction_date
+ ,account_servicer_reference
+ ,payment_information_id
+ ,end_to_end_id
+ ,bank_account_id
+ ,bank_transaction_id
+ FROM bank_account_transactions
+ WHERE bank_transaction_id ${cmpOp} ?
+ AND bank_account_id=?
+ AND direction=?::direction_enum
+ ORDER BY bank_transaction_id ${orderBy}
+ LIMIT ?
+ """)
+
+ // If going backward with a starting point, it is useless to poll
+ if (delta < 0 && start != Long.MAX_VALUE) {
+ poll_ms = 0;
+ }
- val items = mutableListOf<T>()
-
- fun bankTransactionGetHistory(): List<BankAccountTransaction> {
- stmt.setLong(1, start)
- stmt.setLong(2, bankAccountId)
- stmt.setString(3, direction.name)
- stmt.setLong(4, abs(delta))
- return stmt.all {
- BankAccountTransaction(
- creditorPaytoUri = it.getString("creditor_payto_uri"),
- creditorName = it.getString("creditor_name"),
- debtorPaytoUri = it.getString("debtor_payto_uri"),
- debtorName = it.getString("debtor_name"),
- amount = TalerAmount(
- it.getLong("amount_val"),
- it.getInt("amount_frac"),
- getCurrency()
- ),
- accountServicerReference = it.getString("account_servicer_reference"),
- endToEndId = it.getString("end_to_end_id"),
- direction = direction,
- bankAccountId = it.getLong("bank_account_id"),
- paymentInformationId = it.getString("payment_information_id"),
- subject = it.getString("subject"),
- transactionDate = it.getLong("transaction_date").microsToJavaInstant() ?: throw faultyTimestampByBank(),
- dbRowId = it.getLong("bank_transaction_id")
- )
+ // Only start expensive listening if we intend to poll
+ if (poll_ms > 0) {
+ conn.execSQLUpdate("LISTEN $channel");
}
- }
- fun loadBankHistory() {
- while (delta != 0L) {
- val history = bankTransactionGetHistory()
- if (history.isEmpty())
- break;
- history.forEach {
- val item = map(it);
- // Advance cursor
- start = it.expectRowId()
-
- if (item != null) {
- items.add(item)
- // Reduce delta
- if (delta < 0) delta++ else delta--;
- }
+ val items = mutableListOf<T>()
+
+ fun bankTransactionGetHistory(): List<BankAccountTransaction> {
+ stmt.setLong(1, start)
+ stmt.setLong(2, bankAccountId)
+ stmt.setString(3, direction.name)
+ stmt.setLong(4, abs(delta))
+ return stmt.all {
+ BankAccountTransaction(
+ creditorPaytoUri = it.getString("creditor_payto_uri"),
+ creditorName = it.getString("creditor_name"),
+ debtorPaytoUri = it.getString("debtor_payto_uri"),
+ debtorName = it.getString("debtor_name"),
+ amount = TalerAmount(
+ it.getLong("amount_val"),
+ it.getInt("amount_frac"),
+ getCurrency()
+ ),
+ accountServicerReference = it.getString("account_servicer_reference"),
+ endToEndId = it.getString("end_to_end_id"),
+ direction = direction,
+ bankAccountId = it.getLong("bank_account_id"),
+ paymentInformationId = it.getString("payment_information_id"),
+ subject = it.getString("subject"),
+ transactionDate = it.getLong("transaction_date").microsToJavaInstant() ?: throw faultyTimestampByBank(),
+ dbRowId = it.getLong("bank_transaction_id")
+ )
}
}
- }
- loadBankHistory()
-
- // Long polling
- while (delta != 0L && poll_ms > 0) {
- var remaining = abs(delta);
- do {
- val pollStart = System.currentTimeMillis()
- conn.getNotifications(poll_ms.toInt()).forEach {
- val id = it.parameter.toLong()
- val new = when {
- params.start == Long.MAX_VALUE -> true
- delta < 0 -> id < start
- else -> id > start
+ fun loadBankHistory() {
+ while (delta != 0L) {
+ val history = bankTransactionGetHistory()
+ if (history.isEmpty())
+ break;
+ history.forEach {
+ val item = map(it);
+ // Advance cursor
+ start = it.expectRowId()
+
+ if (item != null) {
+ items.add(item)
+ // Reduce delta
+ if (delta < 0) delta++ else delta--;
+ }
}
- if (new) remaining -= 1
}
- val pollEnd = System.currentTimeMillis()
- poll_ms -= pollEnd - pollStart
- } while (poll_ms > 0 && remaining > 0L)
-
- // If going backward without a starting point, we reset loading progress
- if (params.start == Long.MAX_VALUE) {
- start = params.start
- delta = params.delta
- items.clear()
}
+
loadBankHistory()
- }
- conn.execSQLUpdate("UNLISTEN $channel");
- conn.getNotifications(); // Clear pending notifications
+ // Long polling
+ while (delta != 0L && poll_ms > 0) {
+ var remaining = abs(delta);
+ do {
+ val pollStart = System.currentTimeMillis()
+ logger.debug("POOL")
+ conn.getNotifications(poll_ms.toInt()).forEach {
+ val id = it.parameter.toLong()
+ val new = when {
+ params.start == Long.MAX_VALUE -> true
+ delta < 0 -> id < start
+ else -> id > start
+ }
+ logger.debug("NOTIF $id $new")
+ if (new) remaining -= 1
+ }
+ val pollEnd = System.currentTimeMillis()
+ poll_ms -= pollEnd - pollStart
+ } while (poll_ms > 0 && remaining > 0L)
+
+ // If going backward without a starting point, we reset loading progress
+ if (params.start == Long.MAX_VALUE) {
+ start = params.start
+ delta = params.delta
+ items.clear()
+ }
+ loadBankHistory()
+ }
- conn.close()
+ conn.execSQLUpdate("UNLISTEN $channel");
+ conn.getNotifications(); // Clear pending notifications
- return items.toList();
+ return items.toList();
+ }
}
/**
diff --git a/bank/src/test/kotlin/TalerApiTest.kt b/bank/src/test/kotlin/TalerApiTest.kt
@@ -282,7 +282,7 @@ class TalerApiTest {
}.assertHistory(5)
// Check no useless polling
- assertTime(1000) {
+ assertTime(0, 1000) {
client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=-6&start=20&long_poll_ms=1000") {
basicAuth("bar", "secret")
}.assertHistory(5)
@@ -293,28 +293,43 @@ class TalerApiTest {
basicAuth("bar", "secret")
}.assertHistory(5)
- // Check polling succeed
+ // Check polling succeed forward
runBlocking {
- launch {
+ async {
delay(200)
db.bankTransactionCreate(genTx(randShortHashCode().encoded)).assertSuccess()
}
- assertTime(1000) {
- client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=-6&long_poll_ms=1000") {
+ assertTime(200, 1000) {
+ client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=6&long_poll_ms=1000") {
basicAuth("bar", "secret")
}.assertHistory(6)
}
}
+ // Check polling succeed backward
+ runBlocking {
+ async {
+ delay(200)
+ db.bankTransactionCreate(genTx(randShortHashCode().encoded)).assertSuccess()
+ }
+ assertTime(200, 1000) {
+ client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=-7&long_poll_ms=1000") {
+ basicAuth("bar", "secret")
+ }.assertHistory(7)
+ }
+ }
+
// Check polling timeout
runBlocking {
launch {
delay(200)
db.bankTransactionCreate(genTx(randShortHashCode().encoded)).assertSuccess()
}
- client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=8&long_poll_ms=300") {
- basicAuth("bar", "secret")
- }.assertHistory(7)
+ assertTime(200, 400) {
+ client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=9&long_poll_ms=300") {
+ basicAuth("bar", "secret")
+ }.assertHistory(8)
+ }
}
// Testing ranges.
@@ -409,8 +424,8 @@ class TalerApiTest {
}.assertHistory(5)
// Check no useless polling
- assertTime(3000) {
- client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=-6&start=20&long_poll_ms=3000") {
+ assertTime(0, 1000) {
+ client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=-6&start=20&long_poll_ms=1000") {
basicAuth("bar", "secret")
}.assertHistory(5)
}
@@ -420,28 +435,43 @@ class TalerApiTest {
basicAuth("bar", "secret")
}.assertHistory(5)
- // Check polling succeed
+ // Check polling succeed forward
runBlocking {
- launch {
+ async {
delay(200)
transfer(db, 2, bankAccountFoo)
}
- assertTime(3000) {
- client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=-6&long_poll_ms=3000") {
+ assertTime(200, 1000) {
+ client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=6&long_poll_ms=1000") {
basicAuth("bar", "secret")
}.assertHistory(6)
}
}
+ // Check polling succeed backward
+ runBlocking {
+ async {
+ delay(200)
+ transfer(db, 2, bankAccountFoo)
+ }
+ assertTime(200, 1000) {
+ client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=-7&long_poll_ms=1000") {
+ basicAuth("bar", "secret")
+ }.assertHistory(7)
+ }
+ }
+
// Check polling timeout
runBlocking {
launch {
delay(200)
transfer(db, 2, bankAccountFoo)
}
- client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=8&long_poll_ms=300") {
- basicAuth("bar", "secret")
- }.assertHistory(7)
+ assertTime(200, 400) {
+ client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=8&long_poll_ms=300") {
+ basicAuth("bar", "secret")
+ }.assertHistory(8)
+ }
}
// Testing ranges.
diff --git a/bank/src/test/kotlin/helpers.kt b/bank/src/test/kotlin/helpers.kt
@@ -24,12 +24,13 @@ fun BankTransactionResult.assertSuccess() {
assertEquals(BankTransactionResult.SUCCESS, this)
}
-suspend fun assertTime(ms: Int, lambda: suspend () -> Unit) {
+suspend fun assertTime(min: Int, max: Int, lambda: suspend () -> Unit) {
val start = System.currentTimeMillis()
lambda()
val end = System.currentTimeMillis()
val time = end - start
- assert(time < ms) { "Expected to last at most $ms ms, lasted $time" }
+ assert(time >= min) { "Expected to last at least $min ms, lasted $time" }
+ assert(time <= max) { "Expected to last at most $max ms, lasted $time" }
}
/* ----- Body helper ----- */