summaryrefslogtreecommitdiff
path: root/util/src/main/kotlin/DB.kt
diff options
context:
space:
mode:
authorFlorian Dold <florian@dold.me>2023-09-22 18:09:59 +0200
committerFlorian Dold <florian@dold.me>2023-09-22 18:09:59 +0200
commitdee223922daacb357e79b711db568f97d85bb9e0 (patch)
tree075f2efb049075d37a03e473ad980c781b65c587 /util/src/main/kotlin/DB.kt
parent2befa711f29e7c4b3f2299dabdc51ec23419b2a1 (diff)
downloadlibeufin-dee223922daacb357e79b711db568f97d85bb9e0.tar.gz
libeufin-dee223922daacb357e79b711db568f97d85bb9e0.tar.bz2
libeufin-dee223922daacb357e79b711db568f97d85bb9e0.zip
re-add support for unix socket DB connection
Diffstat (limited to 'util/src/main/kotlin/DB.kt')
-rw-r--r--util/src/main/kotlin/DB.kt47
1 files changed, 30 insertions, 17 deletions
diff --git a/util/src/main/kotlin/DB.kt b/util/src/main/kotlin/DB.kt
index 7ae47228..d79038b0 100644
--- a/util/src/main/kotlin/DB.kt
+++ b/util/src/main/kotlin/DB.kt
@@ -18,10 +18,10 @@
*/
package tech.libeufin.util
+
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
-import logger
import net.taler.wallet.crypto.Base32Crockford
import org.jetbrains.exposed.sql.Database
import org.jetbrains.exposed.sql.Transaction
@@ -29,8 +29,12 @@ import org.jetbrains.exposed.sql.name
import org.jetbrains.exposed.sql.transactions.TransactionManager
import org.jetbrains.exposed.sql.transactions.transaction
import org.postgresql.jdbc.PgConnection
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
import java.net.URI
+private val logger: Logger = LoggerFactory.getLogger("tech.libeufin.util.DB")
+
fun getCurrentUser(): String = System.getProperty("user.name")
fun isPostgres(): Boolean {
@@ -51,13 +55,16 @@ fun isPostgres(): Boolean {
enum class NotificationsChannelDomains(val value: Int) {
// When payments with well-formed Taler subject arrive.
LIBEUFIN_TALER_INCOMING(3000),
+
// A transaction happened for a particular user. The payload
// informs about the direction.
LIBEUFIN_REGIO_TX(3001),
+
// When an incoming fiat payment is downloaded from Nexus.
// Happens when a customer wants to withdraw Taler coins in the
// regional currency.
LIBEUFIN_SANDBOX_FIAT_INCOMING(3002),
+
// When Nexus has ingested a new transactions from the bank it
// is connected to. This event carries incoming and outgoing
// payments, and it specifies that in its payload. The direction
@@ -86,7 +93,7 @@ fun buildChannelName(
fun Transaction.postgresNotify(
channel: String,
payload: String? = null
- ) {
+) {
logger.debug("Sending NOTIFY on channel '$channel' with payload '$payload'")
if (payload != null) {
val argEnc = Base32Crockford.encode(payload.toByteArray())
@@ -120,8 +127,10 @@ class PostgresListenHandle(val channelName: String) {
"Could not find the default database, won't get Postgres notifications."
)
private val conn = db.connector().connection as PgConnection
+
// Gets set to the NOTIFY's payload, in case one exists.
var receivedPayload: String? = null
+
// Signals whether the connection should be kept open,
// after one (and possibly not expected) event arrives.
// This gives more flexibility to the caller.
@@ -133,6 +142,7 @@ class PostgresListenHandle(val channelName: String) {
stmt.close()
logger.debug("LISTENing on channel: $channelName")
}
+
fun postgresUnlisten() {
val stmt = conn.createStatement()
stmt.execute("UNLISTEN $channelName")
@@ -149,11 +159,14 @@ class PostgresListenHandle(val channelName: String) {
fun postgresGetNotifications(timeoutMs: Long): Boolean {
if (timeoutMs == 0L)
- logger.info("Database notification checker has timeout == 0," +
- " that waits FOREVER until a notification arrives."
+ logger.info(
+ "Database notification checker has timeout == 0," +
+ " that waits FOREVER until a notification arrives."
)
- logger.debug("Waiting Postgres notifications on channel " +
- "'$channelName' for $timeoutMs millis.")
+ logger.debug(
+ "Waiting Postgres notifications on channel " +
+ "'$channelName' for $timeoutMs millis."
+ )
val maybeNotifications = this.conn.getNotifications(timeoutMs.toInt())
if (maybeNotifications == null || maybeNotifications.isEmpty()) {
logger.debug("DB notifications not found on channel $channelName.")
@@ -169,7 +182,7 @@ class PostgresListenHandle(val channelName: String) {
logger.debug("Found DB notifications on channel $channelName")
// Only ever used for singleton notifications.
assert(maybeNotifications.size == 1)
- if(maybeNotifications[0].parameter.isNotEmpty())
+ if (maybeNotifications[0].parameter.isNotEmpty())
this.receivedPayload = maybeNotifications[0].parameter
this.likelyCloseConnection()
return true
@@ -245,32 +258,31 @@ fun connectWithSchema(jdbcConn: String, schemaName: String? = null) {
conn.schema = schemaName
}
)
- try { transaction { this.db.name } }
- catch (e: Throwable) {
+ try {
+ transaction { this.db.name }
+ } catch (e: Throwable) {
logger.error("Test query failed: ${e.message}")
throw Exception("Failed connection to: $jdbcConn")
}
}
-// Prepends "jdbc:" to the Postgres database connection string.
-fun getJdbcConnectionFromPg(pgConn: String): String {
- return "jdbc:$pgConn"
-}
/**
* This function converts a postgresql://-URI to a JDBC one.
* It is only needed because JDBC strings based on Unix domain
* sockets need individual intervention.
*/
-fun _getJdbcConnectionFromPg(pgConn: String): String {
+fun getJdbcConnectionFromPg(pgConn: String): String {
if (!pgConn.startsWith("postgresql://") && !pgConn.startsWith("postgres://")) {
logger.info("Not a Postgres connection string: $pgConn")
throw Exception("Not a Postgres connection string: $pgConn")
}
var maybeUnixSocket = false
val parsed = URI(pgConn)
- val hostAsParam: String? = if (parsed.query != null)
+ val hostAsParam: String? = if (parsed.query != null) {
getQueryParam(parsed.query, "host")
- else null
+ } else {
+ null
+ }
/**
* In some cases, it is possible to leave the hostname empty
* and specify it via a query param, therefore a "postgresql:///"-starting
@@ -278,7 +290,8 @@ fun _getJdbcConnectionFromPg(pgConn: String): String {
* https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING
*/
if (parsed.host == null &&
- (hostAsParam == null || hostAsParam.startsWith('/'))) {
+ (hostAsParam == null || hostAsParam.startsWith('/'))
+ ) {
maybeUnixSocket = true
}
if (maybeUnixSocket) {