summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAntoine A <>2023-10-06 01:25:51 +0000
committerAntoine A <>2023-10-06 01:25:51 +0000
commit49b6dbacec59bde7e99bfa429444941200126785 (patch)
treef5b8e49e0f52b3afc8e09fc66e7afdf00cd9f59b
parent426ad5b159956ede4f2699ed9d1f1ed41ad67492 (diff)
downloadlibeufin-49b6dbacec59bde7e99bfa429444941200126785.tar.gz
libeufin-49b6dbacec59bde7e99bfa429444941200126785.tar.bz2
libeufin-49b6dbacec59bde7e99bfa429444941200126785.zip
Add /taler-wire-gateway/history polling
-rw-r--r--bank/src/main/kotlin/tech/libeufin/bank/Database.kt116
-rw-r--r--bank/src/main/kotlin/tech/libeufin/bank/WireGatewayApiHandlers.kt33
-rw-r--r--bank/src/main/kotlin/tech/libeufin/bank/helpers.kt13
-rw-r--r--bank/src/test/kotlin/TalerApiTest.kt89
-rw-r--r--database-versioning/procedures.sql5
5 files changed, 230 insertions, 26 deletions
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/Database.kt b/bank/src/main/kotlin/tech/libeufin/bank/Database.kt
index 066d0cc2..32b925d8 100644
--- a/bank/src/main/kotlin/tech/libeufin/bank/Database.kt
+++ b/bank/src/main/kotlin/tech/libeufin/bank/Database.kt
@@ -130,7 +130,7 @@ fun resetDatabaseTables(dbConfig: String, sqlDir: String) {
dbConn.execSQLUpdate(sqlDrop)
}
-class Database(private val dbConfig: String, private val bankCurrency: String) {
+class Database(private val dbConfig: String, internal val bankCurrency: String) {
private var dbConn: PgConnection? = null
private var dbCtr: Int = 0
private val preparedStatements: MutableMap<String, PreparedStatement> = mutableMapOf()
@@ -139,6 +139,15 @@ class Database(private val dbConfig: String, private val bankCurrency: String) {
Class.forName("org.postgresql.Driver")
}
+ internal fun conn(): PgConnection? {
+ // Translate "normal" postgresql:// connection URI to something that JDBC likes.
+ val jdbcConnStr = getJdbcConnectionFromPg(dbConfig)
+ logger.info("connecting to database via JDBC string '$jdbcConnStr'")
+ val conn = DriverManager.getConnection(jdbcConnStr).unwrap(PgConnection::class.java)
+ conn?.execSQLUpdate("SET search_path TO libeufin_bank;")
+ return conn
+ }
+
private fun reconnect() {
dbCtr++
val myDbConn = dbConn
@@ -146,12 +155,8 @@ class Database(private val dbConfig: String, private val bankCurrency: String) {
return
dbConn?.close()
preparedStatements.clear()
- // Translate "normal" postgresql:// connection URI to something that JDBC likes.
- val jdbcConnStr = getJdbcConnectionFromPg(dbConfig)
- logger.info("connecting to database via JDBC string '$jdbcConnStr'")
- dbConn = DriverManager.getConnection(jdbcConnStr).unwrap(PgConnection::class.java)
+ dbConn = conn()
dbCtr = 0
- dbConn?.execSQLUpdate("SET search_path TO libeufin_bank;")
}
private fun prepare(sql: String): PreparedStatement {
@@ -1367,3 +1372,102 @@ class Database(private val dbConfig: String, private val bankCurrency: String) {
}
}
}
+
+// TODO perf, merge with Database to reuse connection and prepared statement
+class HistoryDatabaseCtx(private val db: Database, delta: Long, private val bankAccountId: Long, private val direction: TransactionDirection, private var poll_ms: Long): java.io.Closeable {
+ private val conn = db.conn() ?: throw internalServerError("DB connection down");
+ private val channel = "${direction.name}_$bankAccountId";
+ private var stmt: PreparedStatement;
+
+ init {
+ val (cmpOp, orderBy) = if (delta < 0) Pair("<", "DESC") else Pair(">", "ASC")
+ stmt = conn.prepareStatement("""
+ SELECT
+ creditor_payto_uri
+ ,creditor_name
+ ,debtor_payto_uri
+ ,debtor_name
+ ,subject
+ ,(amount).val AS amount_val
+ ,(amount).frac AS amount_frac
+ ,transaction_date
+ ,account_servicer_reference
+ ,payment_information_id
+ ,end_to_end_id
+ ,bank_account_id
+ ,bank_transaction_id
+ FROM bank_account_transactions
+ WHERE bank_transaction_id ${cmpOp} ?
+ AND bank_account_id=?
+ AND direction=?::direction_enum
+ ORDER BY bank_transaction_id ${orderBy}
+ LIMIT ?
+ """)
+
+ // Older notification cannot appear TODO ?
+ if (delta < 0) {
+ poll_ms = 0;
+ }
+ if (poll_ms > 0) {
+ conn.execSQLUpdate("LISTEN $channel");
+ }
+ }
+
+ fun bankTransactionGetHistory(start: Long, delta: Long): List<BankAccountTransaction> {
+ stmt.setLong(1, start)
+ stmt.setLong(2, bankAccountId)
+ stmt.setString(3, direction.name)
+ stmt.setLong(4, abs(delta))
+ val rs = stmt.executeQuery()
+ rs.use {
+ val ret = mutableListOf<BankAccountTransaction>()
+ if (!it.next()) return ret
+ do {
+ ret.add(
+ BankAccountTransaction(
+ creditorPaytoUri = it.getString("creditor_payto_uri"),
+ creditorName = it.getString("creditor_name"),
+ debtorPaytoUri = it.getString("debtor_payto_uri"),
+ debtorName = it.getString("debtor_name"),
+ amount = TalerAmount(
+ it.getLong("amount_val"),
+ it.getInt("amount_frac"),
+ db.bankCurrency
+ ),
+ accountServicerReference = it.getString("account_servicer_reference"),
+ endToEndId = it.getString("end_to_end_id"),
+ direction = direction,
+ bankAccountId = it.getLong("bank_account_id"),
+ paymentInformationId = it.getString("payment_information_id"),
+ subject = it.getString("subject"),
+ transactionDate = it.getLong("transaction_date").microsToJavaInstant() ?: throw faultyTimestampByBank(),
+ dbRowId = it.getLong("bank_transaction_id")
+ ))
+ } while (it.next())
+ return ret
+ }
+ }
+
+ /** Wait for more transactions, return true if new transactions have appeared */
+ suspend fun pool(start: Long, delta: Long): Boolean {
+ if (poll_ms <= 0 || delta == 0L)
+ return false;
+ var remaining = abs(delta);
+ do {
+ val pollStart = System.currentTimeMillis()
+ conn.getNotifications(poll_ms.toInt()).forEach {
+ val id = it.parameter.toLong()
+ val new = if (delta < 0) id < start else id > start
+ if (new) remaining -= 1
+ }
+ val pollEnd = System.currentTimeMillis()
+ poll_ms -= pollEnd - pollStart
+ } while (poll_ms > 0 && remaining > 0L)
+ return true
+ }
+
+ override fun close() {
+ conn.execSQLUpdate("UNLISTEN $channel");
+ conn.close()
+ }
+} \ No newline at end of file
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/WireGatewayApiHandlers.kt b/bank/src/main/kotlin/tech/libeufin/bank/WireGatewayApiHandlers.kt
index 4664a55b..290341a4 100644
--- a/bank/src/main/kotlin/tech/libeufin/bank/WireGatewayApiHandlers.kt
+++ b/bank/src/main/kotlin/tech/libeufin/bank/WireGatewayApiHandlers.kt
@@ -129,25 +129,22 @@ fun Routing.talerWireGatewayHandlers(db: Database, ctx: BankApplicationContext)
var start = params.start
var delta = params.delta
val items = mutableListOf<T>()
+ val dbx = HistoryDatabaseCtx(db, delta, bankAccount.expectRowId(), direction, params.poll_ms);
+ dbx.use {
+ while (delta != 0L) {
+ val history = dbx.bankTransactionGetHistory(start, delta)
+ if (history.isEmpty() && !dbx.pool(start, delta))
+ break;
+ history.forEach {
+ val item = map(it);
+ // Advance cursor
+ start = it.expectRowId()
- while (delta != 0L) {
- val history = db.bankTransactionGetHistory(
- start = start,
- delta = delta,
- bankAccountId = bankAccount.expectRowId(),
- withDirection = direction
- )
- if (history.isEmpty())
- break; // TODO long polling here
- history.forEach {
- val item = map(it);
- // Advance cursor
- start = it.expectRowId()
-
- if (item != null) {
- items.add(item)
- // Reduce delta
- if (delta < 0) delta++ else delta--;
+ if (item != null) {
+ items.add(item)
+ // Reduce delta
+ if (delta < 0) delta++ else delta--;
+ }
}
}
}
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/helpers.kt b/bank/src/main/kotlin/tech/libeufin/bank/helpers.kt
index 0efc116b..9b8e02e6 100644
--- a/bank/src/main/kotlin/tech/libeufin/bank/helpers.kt
+++ b/bank/src/main/kotlin/tech/libeufin/bank/helpers.kt
@@ -365,7 +365,7 @@ fun getWithdrawal(db: Database, opIdParam: String): TalerWithdrawalOperation {
}
data class HistoryParams(
- val delta: Long, val start: Long
+ val delta: Long, val start: Long, val poll_ms: Long
)
/**
@@ -391,7 +391,16 @@ fun getHistoryParams(req: ApplicationRequest): HistoryParams {
throw badRequest("Param 'start' not a number")
}
}
- return HistoryParams(delta = delta, start = start)
+ val poll_ms: Long = when (val param = req.queryParameters["long_poll_ms"]) {
+ null -> 0
+ else -> try {
+ param.toLong()
+ } catch (e: Exception) {
+ logger.error(e.message)
+ throw badRequest("Param 'long_poll_ms' not a number")
+ }
+ }
+ return HistoryParams(delta = delta, start = start, poll_ms = poll_ms)
}
/**
diff --git a/bank/src/test/kotlin/TalerApiTest.kt b/bank/src/test/kotlin/TalerApiTest.kt
index c76fd30e..21b08f5b 100644
--- a/bank/src/test/kotlin/TalerApiTest.kt
+++ b/bank/src/test/kotlin/TalerApiTest.kt
@@ -6,6 +6,7 @@ import io.ktor.http.*
import io.ktor.server.testing.*
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonPrimitive
+import kotlinx.coroutines.*
import net.taler.wallet.crypto.Base32Crockford
import org.junit.Test
import tech.libeufin.bank.*
@@ -261,6 +262,50 @@ class TalerApiTest {
val j: IncomingHistory = Json.decodeFromString(this.bodyAsText())
assertEquals(5, j.incoming_transactions.size)
}
+
+ // Check no useless polling
+ client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=-6&long_poll_ms=6000000") {
+ basicAuth("bar", "secret")
+ }.assertOk().run {
+ val j: IncomingHistory = Json.decodeFromString(this.bodyAsText())
+ assertEquals(5, j.incoming_transactions.size)
+ }
+
+ // Check polling end
+ client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=6&long_poll_ms=60") {
+ basicAuth("bar", "secret")
+ }.assertOk().run {
+ val j: IncomingHistory = Json.decodeFromString(this.bodyAsText())
+ assertEquals(5, j.incoming_transactions.size)
+ }
+
+ // Check polling succedd
+ runBlocking {
+ launch {
+ delay(300)
+ db.bankTransactionCreate(genTx(randShortHashCode().encoded)).assertSuccess()
+ }
+ client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=6&long_poll_ms=6000000") {
+ basicAuth("bar", "secret")
+ }.assertOk().run {
+ val j: IncomingHistory = Json.decodeFromString(this.bodyAsText())
+ assertEquals(6, j.incoming_transactions.size)
+ }
+ }
+
+ // Check polling timeout
+ runBlocking {
+ launch {
+ delay(300)
+ db.bankTransactionCreate(genTx(randShortHashCode().encoded)).assertSuccess()
+ }
+ client.get("/accounts/bar/taler-wire-gateway/history/incoming?delta=8&long_poll_ms=1000") {
+ basicAuth("bar", "secret")
+ }.assertOk().run {
+ val j: IncomingHistory = Json.decodeFromString(this.bodyAsText())
+ assertEquals(7, j.incoming_transactions.size)
+ }
+ }
// Testing ranges.
val mockReservePub = randShortHashCode().encoded
@@ -359,6 +404,50 @@ class TalerApiTest {
assertEquals(5, j.outgoing_transactions.size)
}
+ // Check no useless polling
+ client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=-6&long_poll_ms=6000000") {
+ basicAuth("bar", "secret")
+ }.assertOk().run {
+ val j: OutgoingHistory = Json.decodeFromString(this.bodyAsText())
+ assertEquals(5, j.outgoing_transactions.size)
+ }
+
+ // Check polling end
+ client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=6&long_poll_ms=60") {
+ basicAuth("bar", "secret")
+ }.assertOk().run {
+ val j: OutgoingHistory = Json.decodeFromString(this.bodyAsText())
+ assertEquals(5, j.outgoing_transactions.size)
+ }
+
+ // Check polling succedd
+ runBlocking {
+ launch {
+ delay(300)
+ transfer(db, 2, bankAccountFoo)
+ }
+ client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=6&long_poll_ms=6000000") {
+ basicAuth("bar", "secret")
+ }.assertOk().run {
+ val j: OutgoingHistory = Json.decodeFromString(this.bodyAsText())
+ assertEquals(6, j.outgoing_transactions.size)
+ }
+ }
+
+ // Check polling timeout
+ runBlocking {
+ launch {
+ delay(300)
+ transfer(db, 2, bankAccountFoo)
+ }
+ client.get("/accounts/bar/taler-wire-gateway/history/outgoing?delta=8&long_poll_ms=1000") {
+ basicAuth("bar", "secret")
+ }.assertOk().run {
+ val j: OutgoingHistory = Json.decodeFromString(this.bodyAsText())
+ assertEquals(7, j.outgoing_transactions.size)
+ }
+ }
+
// Testing ranges.
for (i in 1..400)
transfer(db, 2, bankAccountFoo)
diff --git a/database-versioning/procedures.sql b/database-versioning/procedures.sql
index cdf875aa..1af0d803 100644
--- a/database-versioning/procedures.sql
+++ b/database-versioning/procedures.sql
@@ -623,6 +623,11 @@ SET
balance=new_creditor_balance,
has_debt=will_creditor_have_debt
WHERE bank_account_id=in_creditor_account_id;
+
+-- notify transactions
+PERFORM pg_notify('debit_' || in_debtor_account_id, out_debit_row_id::text);
+PERFORM pg_notify('credit_' || in_creditor_account_id, out_credit_row_id::text);
+
RETURN;
END $$;