diff options
author | MS <ms@taler.net> | 2023-03-10 17:37:08 +0100 |
---|---|---|
committer | MS <ms@taler.net> | 2023-03-10 17:37:08 +0100 |
commit | 889b88faf594891b2eb9fa35be832e74d553e730 (patch) | |
tree | 17ad625c5a322dd9db2a78e44a19535be45bed6b /util/src/main/kotlin/DB.kt | |
parent | 8188af594cf7ac6adb81601ea8fc1444bbd697b0 (diff) | |
download | libeufin-889b88faf594891b2eb9fa35be832e74d553e730.tar.gz libeufin-889b88faf594891b2eb9fa35be832e74d553e730.tar.bz2 libeufin-889b88faf594891b2eb9fa35be832e74d553e730.zip |
Implementing database notifications.
Diffstat (limited to 'util/src/main/kotlin/DB.kt')
-rw-r--r-- | util/src/main/kotlin/DB.kt | 112 |
1 files changed, 67 insertions, 45 deletions
diff --git a/util/src/main/kotlin/DB.kt b/util/src/main/kotlin/DB.kt index 63a213e2..beb5d12f 100644 --- a/util/src/main/kotlin/DB.kt +++ b/util/src/main/kotlin/DB.kt @@ -20,16 +20,44 @@ package tech.libeufin.util import UtilError import io.ktor.http.* -import kotlinx.coroutines.delay +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.withContext import logger +import net.taler.wallet.crypto.Base32Crockford +import org.jetbrains.exposed.sql.Transaction import org.postgresql.jdbc.PgConnection -import java.lang.Long.max -// This class abstracts the LISTEN/NOTIFY construct supported -class PostgresListenNotify( - private val conn: PgConnection, - private val channel: String -) { +fun Transaction.isPostgres(): Boolean { + return this.db.vendor == "postgresql" +} + +fun Transaction.getPgConnection(): PgConnection { + if (!this.isPostgres()) throw UtilError( + HttpStatusCode.InternalServerError, + "Unexpected non-postgresql connection: ${this.db.vendor}" + ) + return this.db.connector().connection as PgConnection +} + +// Check GANA (https://docs.gnunet.org/gana/index.html) for numbers allowance. +enum class NotificationsChannelDomains(val value: Int) { + LIBEUFIN_TALER_INCOMING(3000) +} + +// Helper that builds a LISTEN-NOTIFY channel name. +fun buildChannelName( + domain: NotificationsChannelDomains, + iban: String, + separator: String = "_" +): String { + val channelElements = "${domain.value}$separator$iban" + return "X${Base32Crockford.encode(CryptoUtil.hashStringSHA256(channelElements))}" +} + +// This class abstracts Postgres' LISTEN/NOTIFY. +// FIXME: find facts where Exposed provides always a live 'conn'. +class PostgresListenNotify(val conn: PgConnection, val channel: String) { fun postrgesListen() { val stmt = conn.createStatement() stmt.execute("LISTEN $channel") @@ -41,44 +69,38 @@ class PostgresListenNotify( stmt.close() } - suspend fun postgresWaitNotification(timeoutMs: Long) { - // Splits the checks into 10ms chunks. - val sleepTimeMs = 10L - var notificationFound = false - val iterations = timeoutMs / sleepTimeMs - for (i in 0..iterations) { - val maybeNotifications = conn.notifications - // If a notification arrived, stop fetching for it. - if (maybeNotifications.isNotEmpty()) { - // Double checking that the channel is correct. - // Notification(s) arrived, double-check channel name. - maybeNotifications.forEach { - if (it.name != channel) { - throw UtilError( - statusCode = HttpStatusCode.InternalServerError, - reason = "Listener got wrong notification. Expected: $channel, but got: ${it.name}" - ) - } - } - notificationFound = true - break - } - /* Notification didn't arrive, release the thread and - * retry in the next chunk. */ - delay(sleepTimeMs) - } + fun postgresUnlisten() { + val stmt = conn.createStatement() + stmt.execute("UNLISTEN $channel") + stmt.close() + } - if (!notificationFound) { - throw UtilError( - statusCode = HttpStatusCode.NotFound, - reason = "Timeout expired for notification on channel $channel", - ec = LibeufinErrorCode.LIBEUFIN_EC_TIMEOUT_EXPIRED + /** + * Asks Postgres for notifications with a timeout. Returns + * true when there have been, false otherwise. + */ + fun postgresWaitNotification(timeoutMs: Long): Boolean { + if (timeoutMs == 0L) + logger.warn("Database notification checker has timeout == 0," + + " that waits FOREVER until a notification arrives." ) + val maybeNotifications = conn.getNotifications(timeoutMs.toInt()) + + /** + * This check works around the apparent API inconsistency + * where instead of returning null, a empty array is given + * back when there have been no notifications. + */ + val noResultWorkaround = maybeNotifications.isEmpty() + /*if (noResultWorkaround) { + logger.warn("JDBC+Postgres: empty array from getNotifications() despite docs suggest null.") + }*/ + if (maybeNotifications == null || noResultWorkaround) return false + + for (n in maybeNotifications) { + if (n.name.lowercase() != this.channel.lowercase()) + throw internalServerError("Channel ${this.channel} got notified from ${n.name}!") } - /* Notification arrived. In this current version - * we don't pass any data to the caller; the channel - * name itself means that the awaited information arrived. - * */ - return - } - }
\ No newline at end of file + return true + } +}
\ No newline at end of file |