summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMS <ms@taler.net>2023-03-31 14:10:04 +0200
committerMS <ms@taler.net>2023-03-31 14:10:04 +0200
commit2d45653e2ceaca856e678987b1f8c5501fc29c82 (patch)
treea7d33d9842be4a26cd34399d27fc9d451f19a96d
parent84a5889dd8b49510f1b76fa68211070667d4d177 (diff)
downloadlibeufin-2d45653e2ceaca856e678987b1f8c5501fc29c82.tar.gz
libeufin-2d45653e2ceaca856e678987b1f8c5501fc29c82.tar.bz2
libeufin-2d45653e2ceaca856e678987b1f8c5501fc29c82.zip
Postgres notifications.
Closing the connection after delivery or unlisten.
-rw-r--r--util/src/main/kotlin/DB.kt97
1 files changed, 57 insertions, 40 deletions
diff --git a/util/src/main/kotlin/DB.kt b/util/src/main/kotlin/DB.kt
index beb5d12f..90189257 100644
--- a/util/src/main/kotlin/DB.kt
+++ b/util/src/main/kotlin/DB.kt
@@ -20,24 +20,28 @@
package tech.libeufin.util
import UtilError
import io.ktor.http.*
-import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.withContext
import logger
import net.taler.wallet.crypto.Base32Crockford
+import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.Transaction
+import org.jetbrains.exposed.sql.transactions.TransactionManager
+import org.jetbrains.exposed.sql.transactions.transaction
+import org.jetbrains.exposed.sql.transactions.transactionManager
+import org.postgresql.PGNotification
import org.postgresql.jdbc.PgConnection
fun Transaction.isPostgres(): Boolean {
return this.db.vendor == "postgresql"
}
-fun Transaction.getPgConnection(): PgConnection {
- if (!this.isPostgres()) throw UtilError(
- HttpStatusCode.InternalServerError,
- "Unexpected non-postgresql connection: ${this.db.vendor}"
+fun isPostgres(): Boolean {
+ val db = TransactionManager.defaultDatabase ?: throw internalServerError(
+ "Could not find the default database, can't check if that's Postgres."
)
- return this.db.connector().connection as PgConnection
+ return db.vendor == "postgresql"
+
}
// Check GANA (https://docs.gnunet.org/gana/index.html) for numbers allowance.
@@ -52,55 +56,68 @@ fun buildChannelName(
separator: String = "_"
): String {
val channelElements = "${domain.value}$separator$iban"
- return "X${Base32Crockford.encode(CryptoUtil.hashStringSHA256(channelElements))}"
+ val ret = "X${Base32Crockford.encode(CryptoUtil.hashStringSHA256(channelElements))}"
+ logger.debug("Defining db channel name for IBAN: $iban, domain: ${domain.name}, resulting in: $ret")
+ return ret
}
-// This class abstracts Postgres' LISTEN/NOTIFY.
-// FIXME: find facts where Exposed provides always a live 'conn'.
-class PostgresListenNotify(val conn: PgConnection, val channel: String) {
- fun postrgesListen() {
- val stmt = conn.createStatement()
- stmt.execute("LISTEN $channel")
- stmt.close()
- }
- fun postgresNotify() {
+fun Transaction.postgresNotify(channel: String) {
+ this.exec("NOTIFY $channel")
+}
+
+/**
+ * postgresListen() and postgresGetNotifications() appear to have
+ * to use the same connection, in order for the notifications to
+ * arrive. Therefore, calling LISTEN inside one "transaction {}"
+ * and postgresGetNotifications() outside of it did NOT work because
+ * Exposed _closes_ the connection as soon as the transaction block
+ * completes. OTOH, calling postgresGetNotifications() _inside_ the
+ * same transaction block as LISTEN's would lead to keep the database
+ * locked for the timeout duration.
+ *
+ * For this reason, opening and keeping one connection open for the
+ * lifetime of this object and only executing postgresListen() and
+ * postgresGetNotifications() _on that connection_ makes the event
+ * delivery more reliable.
+ */
+class PostgresListenHandle(val channelName: String) {
+ private val db = TransactionManager.defaultDatabase ?: throw internalServerError(
+ "Could not find the default database, won't get Postgres notifications."
+ )
+ private val conn = db.connector().connection as PgConnection
+
+ fun postgresListen() {
val stmt = conn.createStatement()
- stmt.execute("NOTIFY $channel")
+ stmt.execute("LISTEN $channelName")
stmt.close()
+ logger.debug("LISTENing on channel: $channelName")
}
-
fun postgresUnlisten() {
val stmt = conn.createStatement()
- stmt.execute("UNLISTEN $channel")
+ stmt.execute("UNLISTEN $channelName")
stmt.close()
+ logger.debug("UNLISTENing on channel: $channelName")
+ conn.close()
}
- /**
- * Asks Postgres for notifications with a timeout. Returns
- * true when there have been, false otherwise.
- */
- fun postgresWaitNotification(timeoutMs: Long): Boolean {
+ fun postgresGetNotifications(timeoutMs: Long): Boolean {
if (timeoutMs == 0L)
logger.warn("Database notification checker has timeout == 0," +
" that waits FOREVER until a notification arrives."
)
- val maybeNotifications = conn.getNotifications(timeoutMs.toInt())
-
- /**
- * This check works around the apparent API inconsistency
- * where instead of returning null, a empty array is given
- * back when there have been no notifications.
- */
- val noResultWorkaround = maybeNotifications.isEmpty()
- /*if (noResultWorkaround) {
- logger.warn("JDBC+Postgres: empty array from getNotifications() despite docs suggest null.")
- }*/
- if (maybeNotifications == null || noResultWorkaround) return false
-
+ logger.debug("Waiting Postgres notifications on channel " +
+ "'$channelName' for $timeoutMs millis.")
+ val maybeNotifications = this.conn.getNotifications(timeoutMs.toInt())
+ if (maybeNotifications == null || maybeNotifications.isEmpty()) {
+ logger.debug("DB notification channel $channelName was found empty.")
+ return false
+ }
for (n in maybeNotifications) {
- if (n.name.lowercase() != this.channel.lowercase())
- throw internalServerError("Channel ${this.channel} got notified from ${n.name}!")
+ if (n.name.lowercase() != channelName.lowercase()) {
+ throw internalServerError("Channel $channelName got notified from ${n.name}!")
+ }
}
+ logger.debug("Found DB notifications on channel $channelName")
return true
}
} \ No newline at end of file