summaryrefslogtreecommitdiff
path: root/util/src/main/kotlin/DB.kt
blob: a4ccbda3103f02082da5869fee9664bc055490c2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
/*
 * This file is part of LibEuFin.
 * Copyright (C) 2020 Taler Systems S.A.
 *
 * LibEuFin is free software; you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as
 * published by the Free Software Foundation; either version 3, or
 * (at your option) any later version.
 *
 * LibEuFin is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
 * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero General
 * Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public
 * License along with LibEuFin; see the file COPYING.  If not, see
 * <http://www.gnu.org/licenses/>
 */

package tech.libeufin.util
import UtilError
import io.ktor.http.*
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import logger
import net.taler.wallet.crypto.Base32Crockford
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.Transaction
import org.jetbrains.exposed.sql.transactions.TransactionManager
import org.jetbrains.exposed.sql.transactions.transaction
import org.jetbrains.exposed.sql.transactions.transactionManager
import org.postgresql.PGNotification
import org.postgresql.jdbc.PgConnection

fun Transaction.isPostgres(): Boolean {
    return this.db.vendor == "postgresql"
}

fun isPostgres(): Boolean {
    val db = TransactionManager.defaultDatabase ?: throw internalServerError(
        "Could not find the default database, can't check if that's Postgres."
    )
    return db.vendor == "postgresql"

}

// Check GANA (https://docs.gnunet.org/gana/index.html) for numbers allowance.
enum class NotificationsChannelDomains(val value: Int) {
    LIBEUFIN_TALER_INCOMING(3000)
}

// Helper that builds a LISTEN-NOTIFY channel name.
fun buildChannelName(
    domain: NotificationsChannelDomains,
    iban: String,
    separator: String = "_"
): String {
    val channelElements = "${domain.value}$separator$iban"
    val ret = "X${Base32Crockford.encode(CryptoUtil.hashStringSHA256(channelElements))}"
    logger.debug("Defining db channel name for IBAN: $iban, domain: ${domain.name}, resulting in: $ret")
    return ret
}

fun Transaction.postgresNotify(channel: String) {
    this.exec("NOTIFY $channel")
}

/**
 * postgresListen() and postgresGetNotifications() appear to have
 * to use the same connection, in order for the notifications to
 * arrive.  Therefore, calling LISTEN inside one "transaction {}"
 * and postgresGetNotifications() outside of it did NOT work because
 * Exposed _closes_ the connection as soon as the transaction block
 * completes. OTOH, calling postgresGetNotifications() _inside_ the
 * same transaction block as LISTEN's would lead to keep the database
 * locked for the timeout duration.
 *
 * For this reason, opening and keeping one connection open for the
 * lifetime of this object and only executing postgresListen() and
 * postgresGetNotifications() _on that connection_ makes the event
 * delivery more reliable.
 */
class PostgresListenHandle(val channelName: String) {
    private val db = TransactionManager.defaultDatabase ?: throw internalServerError(
        "Could not find the default database, won't get Postgres notifications."
    )
    private val conn = db.connector().connection as PgConnection

    fun postgresListen() {
        val stmt = conn.createStatement()
        stmt.execute("LISTEN $channelName")
        stmt.close()
        logger.debug("LISTENing on channel: $channelName")
    }
    fun postgresUnlisten() {
        val stmt = conn.createStatement()
        stmt.execute("UNLISTEN $channelName")
        stmt.close()
        logger.debug("UNLISTENing on channel: $channelName")
        conn.close()
    }

    fun postgresGetNotifications(timeoutMs: Long): Boolean {
        if (timeoutMs == 0L)
            logger.warn("Database notification checker has timeout == 0," +
                    " that waits FOREVER until a notification arrives."
            )
        logger.debug("Waiting Postgres notifications on channel " +
                "'$channelName' for $timeoutMs millis.")
        val maybeNotifications = this.conn.getNotifications(timeoutMs.toInt())
        if (maybeNotifications == null || maybeNotifications.isEmpty()) {
            logger.debug("DB notification channel $channelName was found empty.")
            conn.close()
            return false
        }
        for (n in maybeNotifications) {
            if (n.name.lowercase() != channelName.lowercase()) {
                conn.close()
                throw internalServerError("Channel $channelName got notified from ${n.name}!")
            }
        }
        logger.debug("Found DB notifications on channel $channelName")
        conn.close()
        return true
    }
}