libeufin

Integration and sandbox testing for FinTech APIs and data formats
Log | Files | Refs | Submodules | README | LICENSE

commit 889b88faf594891b2eb9fa35be832e74d553e730
parent 8188af594cf7ac6adb81601ea8fc1444bbd697b0
Author: MS <ms@taler.net>
Date:   Fri, 10 Mar 2023 17:37:08 +0100

Implementing database notifications.

Diffstat:
Mutil/src/main/kotlin/DB.kt | 114+++++++++++++++++++++++++++++++++++++++++++++++--------------------------------
Mutil/src/main/kotlin/HTTP.kt | 2+-
2 files changed, 69 insertions(+), 47 deletions(-)

diff --git a/util/src/main/kotlin/DB.kt b/util/src/main/kotlin/DB.kt @@ -20,16 +20,44 @@ package tech.libeufin.util import UtilError import io.ktor.http.* -import kotlinx.coroutines.delay +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.withContext import logger +import net.taler.wallet.crypto.Base32Crockford +import org.jetbrains.exposed.sql.Transaction import org.postgresql.jdbc.PgConnection -import java.lang.Long.max -// This class abstracts the LISTEN/NOTIFY construct supported -class PostgresListenNotify( - private val conn: PgConnection, - private val channel: String -) { +fun Transaction.isPostgres(): Boolean { + return this.db.vendor == "postgresql" +} + +fun Transaction.getPgConnection(): PgConnection { + if (!this.isPostgres()) throw UtilError( + HttpStatusCode.InternalServerError, + "Unexpected non-postgresql connection: ${this.db.vendor}" + ) + return this.db.connector().connection as PgConnection +} + +// Check GANA (https://docs.gnunet.org/gana/index.html) for numbers allowance. +enum class NotificationsChannelDomains(val value: Int) { + LIBEUFIN_TALER_INCOMING(3000) +} + +// Helper that builds a LISTEN-NOTIFY channel name. +fun buildChannelName( + domain: NotificationsChannelDomains, + iban: String, + separator: String = "_" +): String { + val channelElements = "${domain.value}$separator$iban" + return "X${Base32Crockford.encode(CryptoUtil.hashStringSHA256(channelElements))}" +} + +// This class abstracts Postgres' LISTEN/NOTIFY. +// FIXME: find facts where Exposed provides always a live 'conn'. +class PostgresListenNotify(val conn: PgConnection, val channel: String) { fun postrgesListen() { val stmt = conn.createStatement() stmt.execute("LISTEN $channel") @@ -41,44 +69,38 @@ class PostgresListenNotify( stmt.close() } - suspend fun postgresWaitNotification(timeoutMs: Long) { - // Splits the checks into 10ms chunks. - val sleepTimeMs = 10L - var notificationFound = false - val iterations = timeoutMs / sleepTimeMs - for (i in 0..iterations) { - val maybeNotifications = conn.notifications - // If a notification arrived, stop fetching for it. - if (maybeNotifications.isNotEmpty()) { - // Double checking that the channel is correct. - // Notification(s) arrived, double-check channel name. - maybeNotifications.forEach { - if (it.name != channel) { - throw UtilError( - statusCode = HttpStatusCode.InternalServerError, - reason = "Listener got wrong notification. Expected: $channel, but got: ${it.name}" - ) - } - } - notificationFound = true - break - } - /* Notification didn't arrive, release the thread and - * retry in the next chunk. */ - delay(sleepTimeMs) - } + fun postgresUnlisten() { + val stmt = conn.createStatement() + stmt.execute("UNLISTEN $channel") + stmt.close() + } - if (!notificationFound) { - throw UtilError( - statusCode = HttpStatusCode.NotFound, - reason = "Timeout expired for notification on channel $channel", - ec = LibeufinErrorCode.LIBEUFIN_EC_TIMEOUT_EXPIRED + /** + * Asks Postgres for notifications with a timeout. Returns + * true when there have been, false otherwise. + */ + fun postgresWaitNotification(timeoutMs: Long): Boolean { + if (timeoutMs == 0L) + logger.warn("Database notification checker has timeout == 0," + + " that waits FOREVER until a notification arrives." ) + val maybeNotifications = conn.getNotifications(timeoutMs.toInt()) + + /** + * This check works around the apparent API inconsistency + * where instead of returning null, a empty array is given + * back when there have been no notifications. + */ + val noResultWorkaround = maybeNotifications.isEmpty() + /*if (noResultWorkaround) { + logger.warn("JDBC+Postgres: empty array from getNotifications() despite docs suggest null.") + }*/ + if (maybeNotifications == null || noResultWorkaround) return false + + for (n in maybeNotifications) { + if (n.name.lowercase() != this.channel.lowercase()) + throw internalServerError("Channel ${this.channel} got notified from ${n.name}!") } - /* Notification arrived. In this current version - * we don't pass any data to the caller; the channel - * name itself means that the awaited information arrived. - * */ - return - } - } -\ No newline at end of file + return true + } +} +\ No newline at end of file diff --git a/util/src/main/kotlin/HTTP.kt b/util/src/main/kotlin/HTTP.kt @@ -88,7 +88,7 @@ fun ApplicationRequest.getBaseUrl(): String { logger.info("Building X-Forwarded- base URL") // FIXME: should tolerate a missing X-Forwarded-Prefix. - var prefix: String = this.headers.get("X-Forwarded-Prefix") + var prefix: String = this.headers["X-Forwarded-Prefix"] ?: throw internalServerError("Reverse proxy did not define X-Forwarded-Prefix") if (!prefix.endsWith("/")) prefix += "/"