diff options
author | MS <ms@taler.net> | 2023-03-31 14:10:04 +0200 |
---|---|---|
committer | MS <ms@taler.net> | 2023-03-31 14:10:04 +0200 |
commit | 2d45653e2ceaca856e678987b1f8c5501fc29c82 (patch) | |
tree | a7d33d9842be4a26cd34399d27fc9d451f19a96d | |
parent | 84a5889dd8b49510f1b76fa68211070667d4d177 (diff) | |
download | libeufin-2d45653e2ceaca856e678987b1f8c5501fc29c82.tar.gz libeufin-2d45653e2ceaca856e678987b1f8c5501fc29c82.tar.bz2 libeufin-2d45653e2ceaca856e678987b1f8c5501fc29c82.zip |
Postgres notifications.
Closing the connection after delivery or unlisten.
-rw-r--r-- | util/src/main/kotlin/DB.kt | 97 |
1 files changed, 57 insertions, 40 deletions
diff --git a/util/src/main/kotlin/DB.kt b/util/src/main/kotlin/DB.kt index beb5d12f..90189257 100644 --- a/util/src/main/kotlin/DB.kt +++ b/util/src/main/kotlin/DB.kt @@ -20,24 +20,28 @@ package tech.libeufin.util import UtilError import io.ktor.http.* -import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.async import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.withContext import logger import net.taler.wallet.crypto.Base32Crockford +import org.jetbrains.exposed.sql.Database import org.jetbrains.exposed.sql.Transaction +import org.jetbrains.exposed.sql.transactions.TransactionManager +import org.jetbrains.exposed.sql.transactions.transaction +import org.jetbrains.exposed.sql.transactions.transactionManager +import org.postgresql.PGNotification import org.postgresql.jdbc.PgConnection fun Transaction.isPostgres(): Boolean { return this.db.vendor == "postgresql" } -fun Transaction.getPgConnection(): PgConnection { - if (!this.isPostgres()) throw UtilError( - HttpStatusCode.InternalServerError, - "Unexpected non-postgresql connection: ${this.db.vendor}" +fun isPostgres(): Boolean { + val db = TransactionManager.defaultDatabase ?: throw internalServerError( + "Could not find the default database, can't check if that's Postgres." ) - return this.db.connector().connection as PgConnection + return db.vendor == "postgresql" + } // Check GANA (https://docs.gnunet.org/gana/index.html) for numbers allowance. @@ -52,55 +56,68 @@ fun buildChannelName( separator: String = "_" ): String { val channelElements = "${domain.value}$separator$iban" - return "X${Base32Crockford.encode(CryptoUtil.hashStringSHA256(channelElements))}" + val ret = "X${Base32Crockford.encode(CryptoUtil.hashStringSHA256(channelElements))}" + logger.debug("Defining db channel name for IBAN: $iban, domain: ${domain.name}, resulting in: $ret") + return ret } -// This class abstracts Postgres' LISTEN/NOTIFY. -// FIXME: find facts where Exposed provides always a live 'conn'. -class PostgresListenNotify(val conn: PgConnection, val channel: String) { - fun postrgesListen() { - val stmt = conn.createStatement() - stmt.execute("LISTEN $channel") - stmt.close() - } - fun postgresNotify() { +fun Transaction.postgresNotify(channel: String) { + this.exec("NOTIFY $channel") +} + +/** + * postgresListen() and postgresGetNotifications() appear to have + * to use the same connection, in order for the notifications to + * arrive. Therefore, calling LISTEN inside one "transaction {}" + * and postgresGetNotifications() outside of it did NOT work because + * Exposed _closes_ the connection as soon as the transaction block + * completes. OTOH, calling postgresGetNotifications() _inside_ the + * same transaction block as LISTEN's would lead to keep the database + * locked for the timeout duration. + * + * For this reason, opening and keeping one connection open for the + * lifetime of this object and only executing postgresListen() and + * postgresGetNotifications() _on that connection_ makes the event + * delivery more reliable. + */ +class PostgresListenHandle(val channelName: String) { + private val db = TransactionManager.defaultDatabase ?: throw internalServerError( + "Could not find the default database, won't get Postgres notifications." + ) + private val conn = db.connector().connection as PgConnection + + fun postgresListen() { val stmt = conn.createStatement() - stmt.execute("NOTIFY $channel") + stmt.execute("LISTEN $channelName") stmt.close() + logger.debug("LISTENing on channel: $channelName") } - fun postgresUnlisten() { val stmt = conn.createStatement() - stmt.execute("UNLISTEN $channel") + stmt.execute("UNLISTEN $channelName") stmt.close() + logger.debug("UNLISTENing on channel: $channelName") + conn.close() } - /** - * Asks Postgres for notifications with a timeout. Returns - * true when there have been, false otherwise. - */ - fun postgresWaitNotification(timeoutMs: Long): Boolean { + fun postgresGetNotifications(timeoutMs: Long): Boolean { if (timeoutMs == 0L) logger.warn("Database notification checker has timeout == 0," + " that waits FOREVER until a notification arrives." ) - val maybeNotifications = conn.getNotifications(timeoutMs.toInt()) - - /** - * This check works around the apparent API inconsistency - * where instead of returning null, a empty array is given - * back when there have been no notifications. - */ - val noResultWorkaround = maybeNotifications.isEmpty() - /*if (noResultWorkaround) { - logger.warn("JDBC+Postgres: empty array from getNotifications() despite docs suggest null.") - }*/ - if (maybeNotifications == null || noResultWorkaround) return false - + logger.debug("Waiting Postgres notifications on channel " + + "'$channelName' for $timeoutMs millis.") + val maybeNotifications = this.conn.getNotifications(timeoutMs.toInt()) + if (maybeNotifications == null || maybeNotifications.isEmpty()) { + logger.debug("DB notification channel $channelName was found empty.") + return false + } for (n in maybeNotifications) { - if (n.name.lowercase() != this.channel.lowercase()) - throw internalServerError("Channel ${this.channel} got notified from ${n.name}!") + if (n.name.lowercase() != channelName.lowercase()) { + throw internalServerError("Channel $channelName got notified from ${n.name}!") + } } + logger.debug("Found DB notifications on channel $channelName") return true } }
\ No newline at end of file |