summaryrefslogtreecommitdiff
path: root/util/src/main/kotlin/DB.kt
diff options
context:
space:
mode:
authorMS <ms@taler.net>2023-03-10 17:37:08 +0100
committerMS <ms@taler.net>2023-03-10 17:37:08 +0100
commit889b88faf594891b2eb9fa35be832e74d553e730 (patch)
tree17ad625c5a322dd9db2a78e44a19535be45bed6b /util/src/main/kotlin/DB.kt
parent8188af594cf7ac6adb81601ea8fc1444bbd697b0 (diff)
downloadlibeufin-889b88faf594891b2eb9fa35be832e74d553e730.tar.gz
libeufin-889b88faf594891b2eb9fa35be832e74d553e730.tar.bz2
libeufin-889b88faf594891b2eb9fa35be832e74d553e730.zip
Implementing database notifications.
Diffstat (limited to 'util/src/main/kotlin/DB.kt')
-rw-r--r--util/src/main/kotlin/DB.kt112
1 files changed, 67 insertions, 45 deletions
diff --git a/util/src/main/kotlin/DB.kt b/util/src/main/kotlin/DB.kt
index 63a213e2..beb5d12f 100644
--- a/util/src/main/kotlin/DB.kt
+++ b/util/src/main/kotlin/DB.kt
@@ -20,16 +20,44 @@
package tech.libeufin.util
import UtilError
import io.ktor.http.*
-import kotlinx.coroutines.delay
+import kotlinx.coroutines.Dispatchers
+import kotlinx.coroutines.coroutineScope
+import kotlinx.coroutines.withContext
import logger
+import net.taler.wallet.crypto.Base32Crockford
+import org.jetbrains.exposed.sql.Transaction
import org.postgresql.jdbc.PgConnection
-import java.lang.Long.max
-// This class abstracts the LISTEN/NOTIFY construct supported
-class PostgresListenNotify(
- private val conn: PgConnection,
- private val channel: String
-) {
+fun Transaction.isPostgres(): Boolean {
+ return this.db.vendor == "postgresql"
+}
+
+fun Transaction.getPgConnection(): PgConnection {
+ if (!this.isPostgres()) throw UtilError(
+ HttpStatusCode.InternalServerError,
+ "Unexpected non-postgresql connection: ${this.db.vendor}"
+ )
+ return this.db.connector().connection as PgConnection
+}
+
+// Check GANA (https://docs.gnunet.org/gana/index.html) for numbers allowance.
+enum class NotificationsChannelDomains(val value: Int) {
+ LIBEUFIN_TALER_INCOMING(3000)
+}
+
+// Helper that builds a LISTEN-NOTIFY channel name.
+fun buildChannelName(
+ domain: NotificationsChannelDomains,
+ iban: String,
+ separator: String = "_"
+): String {
+ val channelElements = "${domain.value}$separator$iban"
+ return "X${Base32Crockford.encode(CryptoUtil.hashStringSHA256(channelElements))}"
+}
+
+// This class abstracts Postgres' LISTEN/NOTIFY.
+// FIXME: find facts where Exposed provides always a live 'conn'.
+class PostgresListenNotify(val conn: PgConnection, val channel: String) {
fun postrgesListen() {
val stmt = conn.createStatement()
stmt.execute("LISTEN $channel")
@@ -41,44 +69,38 @@ class PostgresListenNotify(
stmt.close()
}
- suspend fun postgresWaitNotification(timeoutMs: Long) {
- // Splits the checks into 10ms chunks.
- val sleepTimeMs = 10L
- var notificationFound = false
- val iterations = timeoutMs / sleepTimeMs
- for (i in 0..iterations) {
- val maybeNotifications = conn.notifications
- // If a notification arrived, stop fetching for it.
- if (maybeNotifications.isNotEmpty()) {
- // Double checking that the channel is correct.
- // Notification(s) arrived, double-check channel name.
- maybeNotifications.forEach {
- if (it.name != channel) {
- throw UtilError(
- statusCode = HttpStatusCode.InternalServerError,
- reason = "Listener got wrong notification. Expected: $channel, but got: ${it.name}"
- )
- }
- }
- notificationFound = true
- break
- }
- /* Notification didn't arrive, release the thread and
- * retry in the next chunk. */
- delay(sleepTimeMs)
- }
+ fun postgresUnlisten() {
+ val stmt = conn.createStatement()
+ stmt.execute("UNLISTEN $channel")
+ stmt.close()
+ }
- if (!notificationFound) {
- throw UtilError(
- statusCode = HttpStatusCode.NotFound,
- reason = "Timeout expired for notification on channel $channel",
- ec = LibeufinErrorCode.LIBEUFIN_EC_TIMEOUT_EXPIRED
+ /**
+ * Asks Postgres for notifications with a timeout. Returns
+ * true when there have been, false otherwise.
+ */
+ fun postgresWaitNotification(timeoutMs: Long): Boolean {
+ if (timeoutMs == 0L)
+ logger.warn("Database notification checker has timeout == 0," +
+ " that waits FOREVER until a notification arrives."
)
+ val maybeNotifications = conn.getNotifications(timeoutMs.toInt())
+
+ /**
+ * This check works around the apparent API inconsistency
+ * where instead of returning null, a empty array is given
+ * back when there have been no notifications.
+ */
+ val noResultWorkaround = maybeNotifications.isEmpty()
+ /*if (noResultWorkaround) {
+ logger.warn("JDBC+Postgres: empty array from getNotifications() despite docs suggest null.")
+ }*/
+ if (maybeNotifications == null || noResultWorkaround) return false
+
+ for (n in maybeNotifications) {
+ if (n.name.lowercase() != this.channel.lowercase())
+ throw internalServerError("Channel ${this.channel} got notified from ${n.name}!")
}
- /* Notification arrived. In this current version
- * we don't pass any data to the caller; the channel
- * name itself means that the awaited information arrived.
- * */
- return
- }
- } \ No newline at end of file
+ return true
+ }
+} \ No newline at end of file