From 49b6dbacec59bde7e99bfa429444941200126785 Mon Sep 17 00:00:00 2001 From: Antoine A <> Date: Fri, 6 Oct 2023 01:25:51 +0000 Subject: Add /taler-wire-gateway/history polling --- .../src/main/kotlin/tech/libeufin/bank/Database.kt | 116 +++++++++++++++++++-- .../tech/libeufin/bank/WireGatewayApiHandlers.kt | 33 +++--- bank/src/main/kotlin/tech/libeufin/bank/helpers.kt | 13 ++- bank/src/test/kotlin/TalerApiTest.kt | 89 ++++++++++++++++ database-versioning/procedures.sql | 5 + 5 files changed, 230 insertions(+), 26 deletions(-) diff --git a/bank/src/main/kotlin/tech/libeufin/bank/Database.kt b/bank/src/main/kotlin/tech/libeufin/bank/Database.kt index 066d0cc2..32b925d8 100644 --- a/bank/src/main/kotlin/tech/libeufin/bank/Database.kt +++ b/bank/src/main/kotlin/tech/libeufin/bank/Database.kt @@ -130,7 +130,7 @@ fun resetDatabaseTables(dbConfig: String, sqlDir: String) { dbConn.execSQLUpdate(sqlDrop) } -class Database(private val dbConfig: String, private val bankCurrency: String) { +class Database(private val dbConfig: String, internal val bankCurrency: String) { private var dbConn: PgConnection? = null private var dbCtr: Int = 0 private val preparedStatements: MutableMap = mutableMapOf() @@ -139,6 +139,15 @@ class Database(private val dbConfig: String, private val bankCurrency: String) { Class.forName("org.postgresql.Driver") } + internal fun conn(): PgConnection? { + // Translate "normal" postgresql:// connection URI to something that JDBC likes. + val jdbcConnStr = getJdbcConnectionFromPg(dbConfig) + logger.info("connecting to database via JDBC string '$jdbcConnStr'") + val conn = DriverManager.getConnection(jdbcConnStr).unwrap(PgConnection::class.java) + conn?.execSQLUpdate("SET search_path TO libeufin_bank;") + return conn + } + private fun reconnect() { dbCtr++ val myDbConn = dbConn @@ -146,12 +155,8 @@ class Database(private val dbConfig: String, private val bankCurrency: String) { return dbConn?.close() preparedStatements.clear() - // Translate "normal" postgresql:// connection URI to something that JDBC likes. - val jdbcConnStr = getJdbcConnectionFromPg(dbConfig) - logger.info("connecting to database via JDBC string '$jdbcConnStr'") - dbConn = DriverManager.getConnection(jdbcConnStr).unwrap(PgConnection::class.java) + dbConn = conn() dbCtr = 0 - dbConn?.execSQLUpdate("SET search_path TO libeufin_bank;") } private fun prepare(sql: String): PreparedStatement { @@ -1367,3 +1372,102 @@ class Database(private val dbConfig: String, private val bankCurrency: String) { } } } + +// TODO perf, merge with Database to reuse connection and prepared statement +class HistoryDatabaseCtx(private val db: Database, delta: Long, private val bankAccountId: Long, private val direction: TransactionDirection, private var poll_ms: Long): java.io.Closeable { + private val conn = db.conn() ?: throw internalServerError("DB connection down"); + private val channel = "${direction.name}_$bankAccountId"; + private var stmt: PreparedStatement; + + init { + val (cmpOp, orderBy) = if (delta < 0) Pair("<", "DESC") else Pair(">", "ASC") + stmt = conn.prepareStatement(""" + SELECT + creditor_payto_uri + ,creditor_name + ,debtor_payto_uri + ,debtor_name + ,subject + ,(amount).val AS amount_val + ,(amount).frac AS amount_frac + ,transaction_date + ,account_servicer_reference + ,payment_information_id + ,end_to_end_id + ,bank_account_id + ,bank_transaction_id + FROM bank_account_transactions + WHERE bank_transaction_id ${cmpOp} ? + AND bank_account_id=? + AND direction=?::direction_enum + ORDER BY bank_transaction_id ${orderBy} + LIMIT ? + """) + + // Older notification cannot appear TODO ? + if (delta < 0) { + poll_ms = 0; + } + if (poll_ms > 0) { + conn.execSQLUpdate("LISTEN $channel"); + } + } + + fun bankTransactionGetHistory(start: Long, delta: Long): List { + stmt.setLong(1, start) + stmt.setLong(2, bankAccountId) + stmt.setString(3, direction.name) + stmt.setLong(4, abs(delta)) + val rs = stmt.executeQuery() + rs.use { + val ret = mutableListOf() + if (!it.next()) return ret + do { + ret.add( + BankAccountTransaction( + creditorPaytoUri = it.getString("creditor_payto_uri"), + creditorName = it.getString("creditor_name"), + debtorPaytoUri = it.getString("debtor_payto_uri"), + debtorName = it.getString("debtor_name"), + amount = TalerAmount( + it.getLong("amount_val"), + it.getInt("amount_frac"), + db.bankCurrency + ), + accountServicerReference = it.getString("account_servicer_reference"), + endToEndId = it.getString("end_to_end_id"), + direction = direction, + bankAccountId = it.getLong("bank_account_id"), + paymentInformationId = it.getString("payment_information_id"), + subject = it.getString("subject"), + transactionDate = it.getLong("transaction_date").microsToJavaInstant() ?: throw faultyTimestampByBank(), + dbRowId = it.getLong("bank_transaction_id") + )) + } while (it.next()) + return ret + } + } + + /** Wait for more transactions, return true if new transactions have appeared */ + suspend fun pool(start: Long, delta: Long): Boolean { + if (poll_ms <= 0 || delta == 0L) + return false; + var remaining = abs(delta); + do { + val pollStart = System.currentTimeMillis() + conn.getNotifications(poll_ms.toInt()).forEach { + val id = it.parameter.toLong() + val new = if (delta < 0) id < start else id > start + if (new) remaining -= 1 + } + val pollEnd = System.currentTimeMillis() + poll_ms -= pollEnd - pollStart + } while (poll_ms > 0 && remaining > 0L) + return true + } + + override fun close() { + conn.execSQLUpdate("UNLISTEN $channel"); + conn.close() + } +} \ No newline at end of file diff --git a/bank/src/main/kotlin/tech/libeufin/bank/WireGatewayApiHandlers.kt b/bank/src/main/kotlin/tech/libeufin/bank/WireGatewayApiHandlers.kt index 4664a55b..290341a4 100644 --- a/bank/src/main/kotlin/tech/libeufin/bank/WireGatewayApiHandlers.kt +++ b/bank/src/main/kotlin/tech/libeufin/bank/WireGatewayApiHandlers.kt @@ -129,25 +129,22 @@ fun Routing.talerWireGatewayHandlers(db: Database, ctx: BankApplicationContext) var start = params.start var delta = params.delta val items = mutableListOf() + val dbx = HistoryDatabaseCtx(db, delta, bankAccount.expectRowId(), direction, params.poll_ms); + dbx.use { + while (delta != 0L) { + val history = dbx.bankTransactionGetHistory(start, delta) + if (history.isEmpty() && !dbx.pool(start, delta)) + break; + history.forEach { + val item = map(it); + // Advance cursor + start = it.expectRowId() - while (delta != 0L) { - val history = db.bankTransactionGetHistory( - start = start, - delta = delta, - bankAccountId = bankAccount.expectRowId(), - withDirection = direction - ) - if (history.isEmpty()) - break; // TODO long polling here - history.forEach { - val item = map(it); - // Advance cursor - start = it.expectRowId() - - if (item != null) { - items.add(item) - // Reduce delta - if (delta < 0) delta++ else delta--; + if (item != null) { + items.add(item) + // Reduce delta + if (delta < 0) delta++ else delta--; + } } } } diff --git a/bank/src/main/kotlin/tech/libeufin/bank/helpers.kt b/bank/src/main/kotlin/tech/libeufin/bank/helpers.kt index 0efc116b..9b8e02e6 100644 --- a/bank/src/main/kotlin/tech/libeufin/bank/helpers.kt +++ b/bank/src/main/kotlin/tech/libeufin/bank/helpers.kt @@ -365,7 +365,7 @@ fun getWithdrawal(db: Database, opIdParam: String): TalerWithdrawalOperation { } data class HistoryParams( - val delta: Long, val start: Long + val delta: Long, val start: Long, val poll_ms: Long ) /** @@ -391,7 +391,16 @@ fun getHistoryParams(req: ApplicationRequest): HistoryParams { throw badRequest("Param 'start' not a number") } } - return HistoryParams(delta = delta, start = start) + val poll_ms: Long = when (val param = req.queryParameters["long_poll_ms"]) { + null -> 0 + else -> try { + param.toLong() + } catch (e: Exception) { + logger.error(e.message) + throw badRequest("Param 'long_poll_ms' not a number") + } + } + return HistoryParams(delta = delta, start = start, poll_ms = poll_ms) } /** diff --git a/bank/src/test/kotlin/TalerApiTest.kt b/bank/src/test/kotlin/TalerApiTest.kt index c76fd30e..21b08f5b 100644 --- a/bank/src/test/kotlin/TalerApiTest.kt +++ b/bank/src/test/kotlin/TalerApiTest.kt @@ -6,6 +6,7 @@ import io.ktor.http.* import io.ktor.server.testing.* import kotlinx.serialization.json.Json import kotlinx.serialization.json.JsonPrimitive +import kotlinx.coroutines.* import net.taler.wallet.crypto.Base32Crockford import org.junit.Test import tech.libeufin.bank.* @@ -261,6 +262,50 @@ class TalerApiTest { val j: IncomingHistory = Json.decodeFromString(this.bodyAsText()) assertEquals(5, j.incoming_transactions.size) } + + // Check no useless polling + client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=-6&long_poll_ms=6000000") { + basicAuth("bar", "secret") + }.assertOk().run { + val j: IncomingHistory = Json.decodeFromString(this.bodyAsText()) + assertEquals(5, j.incoming_transactions.size) + } + + // Check polling end + client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=6&long_poll_ms=60") { + basicAuth("bar", "secret") + }.assertOk().run { + val j: IncomingHistory = Json.decodeFromString(this.bodyAsText()) + assertEquals(5, j.incoming_transactions.size) + } + + // Check polling succedd + runBlocking { + launch { + delay(300) + db.bankTransactionCreate(genTx(randShortHashCode().encoded)).assertSuccess() + } + client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=6&long_poll_ms=6000000") { + basicAuth("bar", "secret") + }.assertOk().run { + val j: IncomingHistory = Json.decodeFromString(this.bodyAsText()) + assertEquals(6, j.incoming_transactions.size) + } + } + + // Check polling timeout + runBlocking { + launch { + delay(300) + db.bankTransactionCreate(genTx(randShortHashCode().encoded)).assertSuccess() + } + client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=8&long_poll_ms=1000") { + basicAuth("bar", "secret") + }.assertOk().run { + val j: IncomingHistory = Json.decodeFromString(this.bodyAsText()) + assertEquals(7, j.incoming_transactions.size) + } + } // Testing ranges. val mockReservePub = randShortHashCode().encoded @@ -359,6 +404,50 @@ class TalerApiTest { assertEquals(5, j.outgoing_transactions.size) } + // Check no useless polling + client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=-6&long_poll_ms=6000000") { + basicAuth("bar", "secret") + }.assertOk().run { + val j: OutgoingHistory = Json.decodeFromString(this.bodyAsText()) + assertEquals(5, j.outgoing_transactions.size) + } + + // Check polling end + client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=6&long_poll_ms=60") { + basicAuth("bar", "secret") + }.assertOk().run { + val j: OutgoingHistory = Json.decodeFromString(this.bodyAsText()) + assertEquals(5, j.outgoing_transactions.size) + } + + // Check polling succedd + runBlocking { + launch { + delay(300) + transfer(db, 2, bankAccountFoo) + } + client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=6&long_poll_ms=6000000") { + basicAuth("bar", "secret") + }.assertOk().run { + val j: OutgoingHistory = Json.decodeFromString(this.bodyAsText()) + assertEquals(6, j.outgoing_transactions.size) + } + } + + // Check polling timeout + runBlocking { + launch { + delay(300) + transfer(db, 2, bankAccountFoo) + } + client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=8&long_poll_ms=1000") { + basicAuth("bar", "secret") + }.assertOk().run { + val j: OutgoingHistory = Json.decodeFromString(this.bodyAsText()) + assertEquals(7, j.outgoing_transactions.size) + } + } + // Testing ranges. for (i in 1..400) transfer(db, 2, bankAccountFoo) diff --git a/database-versioning/procedures.sql b/database-versioning/procedures.sql index cdf875aa..1af0d803 100644 --- a/database-versioning/procedures.sql +++ b/database-versioning/procedures.sql @@ -623,6 +623,11 @@ SET balance=new_creditor_balance, has_debt=will_creditor_have_debt WHERE bank_account_id=in_creditor_account_id; + +-- notify transactions +PERFORM pg_notify('debit_' || in_debtor_account_id, out_debit_row_id::text); +PERFORM pg_notify('credit_' || in_creditor_account_id, out_credit_row_id::text); + RETURN; END $$; -- cgit v1.2.3