libeufin

Integration and sandbox testing for FinTech APIs and data formats
Log | Files | Refs | Submodules | README | LICENSE

commit 382c5fc8c0008160f7ed4e85c49053164f2c6915
parent 8ca8c368bca8ef9b2963a75eb83f9d32a0cd5eda
Author: MS <ms@taler.net>
Date:   Wed, 12 Apr 2023 11:11:09 +0200

DB events.

Implementing a poller that only succeeds if the channel
name and its payload are expected.

Diffstat:
Mutil/src/main/kotlin/DB.kt | 120++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------
1 file changed, 104 insertions(+), 16 deletions(-)

diff --git a/util/src/main/kotlin/DB.kt b/util/src/main/kotlin/DB.kt @@ -18,18 +18,13 @@ */ package tech.libeufin.util -import UtilError -import io.ktor.http.* +import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.async import kotlinx.coroutines.coroutineScope import logger import net.taler.wallet.crypto.Base32Crockford -import org.jetbrains.exposed.sql.Database import org.jetbrains.exposed.sql.Transaction import org.jetbrains.exposed.sql.transactions.TransactionManager -import org.jetbrains.exposed.sql.transactions.transaction -import org.jetbrains.exposed.sql.transactions.transactionManager -import org.postgresql.PGNotification import org.postgresql.jdbc.PgConnection fun Transaction.isPostgres(): Boolean { @@ -46,22 +41,51 @@ fun isPostgres(): Boolean { // Check GANA (https://docs.gnunet.org/gana/index.html) for numbers allowance. enum class NotificationsChannelDomains(val value: Int) { - LIBEUFIN_TALER_INCOMING(3000) + // When payments with well-formed Taler subject arrive. + LIBEUFIN_TALER_INCOMING(3000), + // A transaction happened for a particular user. The payload + // informs about the direction. + LIBEUFIN_REGIO_TX(3001), + // When an incoming fiat payment is downloaded from Nexus. + // Happens when a customer wants to withdraw Taler coins in the + // regional currency. + LIBEUFIN_SANDBOX_FIAT_INCOMING(3002), + // When Nexus discovers a new transactions from the bank it + // is connected to. This even may wake up a client who is waiting + // on Nexus' GET /transactions. + LIBEUFIN_NEXUS_FIAT_INCOMING(3003) } -// Helper that builds a LISTEN-NOTIFY channel name. +/** + * Helper that builds a LISTEN-NOTIFY channel name. + * 'salt' should be any value that would uniquely deliver the + * message to its receiver. IBANs are ideal, but they cost DB queries. + */ + fun buildChannelName( domain: NotificationsChannelDomains, - iban: String, + salt: String, separator: String = "_" ): String { - val channelElements = "${domain.value}$separator$iban" + val channelElements = "${domain.value}$separator$salt" val ret = "X${Base32Crockford.encode(CryptoUtil.hashStringSHA256(channelElements))}" - logger.debug("Defining db channel name for IBAN: $iban, domain: ${domain.name}, resulting in: $ret") + logger.debug("Defining db channel name for salt: $salt, domain: ${domain.name}, resulting in: $ret") return ret } -fun Transaction.postgresNotify(channel: String) { +fun Transaction.postgresNotify( + channel: String, + payload: String? = null + ) { + if (payload != null) { + val argEnc = Base32Crockford.encode(payload.toByteArray()) + if (payload.toByteArray().size > 8000) + throw internalServerError( + "DB notification on channel $channel used >8000 bytes payload '$payload'" + ) + this.exec("NOTIFY $channel, '$argEnc'") + return + } this.exec("NOTIFY $channel") } @@ -85,6 +109,12 @@ class PostgresListenHandle(val channelName: String) { "Could not find the default database, won't get Postgres notifications." ) private val conn = db.connector().connection as PgConnection + // Gets set to the NOTIFY's payload, in case one exists. + var receivedPayload: String? = null + // Signals whether the connection should be kept open, + // after one (and possibly not expected) event arrives. + // This gives more flexibility to the caller. + var keepConnection: Boolean = false fun postgresListen() { val stmt = conn.createStatement() @@ -100,7 +130,16 @@ class PostgresListenHandle(val channelName: String) { conn.close() } - fun postgresGetNotifications(timeoutMs: Long): Boolean { + private fun likelyCloseConnection() { + if (this.keepConnection) + return + this.conn.close() + } + + fun postgresGetNotifications( + timeoutMs: Long, + keepConnectionOpen: Boolean = false + ): Boolean { if (timeoutMs == 0L) logger.warn("Database notification checker has timeout == 0," + " that waits FOREVER until a notification arrives." @@ -110,17 +149,66 @@ class PostgresListenHandle(val channelName: String) { val maybeNotifications = this.conn.getNotifications(timeoutMs.toInt()) if (maybeNotifications == null || maybeNotifications.isEmpty()) { logger.debug("DB notification channel $channelName was found empty.") - conn.close() + this.likelyCloseConnection() return false } for (n in maybeNotifications) { if (n.name.lowercase() != channelName.lowercase()) { - conn.close() + conn.close() // always close on error, without the optional check. throw internalServerError("Channel $channelName got notified from ${n.name}!") } } logger.debug("Found DB notifications on channel $channelName") - conn.close() + // Only ever used for singleton notifications. + assert(maybeNotifications.size == 1) + if(maybeNotifications[0].parameter.isNotEmpty()) + this.receivedPayload = maybeNotifications[0].parameter + this.likelyCloseConnection() return true } + + // Wrapper around the core method "postgresGetNotifications()" that + // sets up the coroutine environment to wait and release the execution. + suspend fun waitOnIODispatchers(timeoutMs: Long): Boolean = + coroutineScope { + async(Dispatchers.IO) { + postgresGetNotifications(timeoutMs) + }.await() + } + + /** + * Waits at most 'timeoutMs' on 'this.channelName' for + * the one particular payload that's passed in the 'payload' + * argument. FIXME: will be used along the fiat side of cash-outs. + */ + suspend fun waitOnIoDispatchersForPayload( + timeoutMs: Long, + expectedPayload: String + ): Boolean { + var leftTime = timeoutMs + val expectedPayloadEnc = Base32Crockford.encode(expectedPayload.toByteArray()) + /** + * This setting allows the loop to reuse the open connection, + * otherwise the internal loop would close it if one unexpected + * payload wakes it up. + */ + this.keepConnection = true + while (leftTime > 0) { + val loopStart = System.currentTimeMillis() + // Ask for notifications. + val maybeNotification = waitOnIODispatchers(leftTime) + // One arrived, check the payload. + if (maybeNotification) { + if (this.receivedPayload != null && this.receivedPayload == expectedPayloadEnc) { + conn.close() + return true + } + } + val loopEnd = System.currentTimeMillis() + // Account the spent time. + leftTime -= loopEnd - loopStart + } + conn.close() + return false + } } \ No newline at end of file