summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAntoine A <>2024-01-23 16:32:31 +0100
committerAntoine A <>2024-01-23 16:32:31 +0100
commit73a98866810b47e341b00c5e2fcc0fc4f9a22cbe (patch)
tree780dce068547dfaa8ab86050d62a83914e96ca5f
parentad7171c999f48236b459b0b75a8b0b77e7399a48 (diff)
downloadlibeufin-73a98866810b47e341b00c5e2fcc0fc4f9a22cbe.tar.gz
libeufin-73a98866810b47e341b00c5e2fcc0fc4f9a22cbe.tar.bz2
libeufin-73a98866810b47e341b00c5e2fcc0fc4f9a22cbe.zip
Share database logic
-rw-r--r--bank/build.gradle1
-rw-r--r--bank/src/main/kotlin/tech/libeufin/bank/Constants.kt6
-rw-r--r--bank/src/main/kotlin/tech/libeufin/bank/db/Database.kt54
-rw-r--r--bank/src/test/kotlin/AmountTest.kt7
-rw-r--r--integration/src/main/kotlin/Main.kt2
-rw-r--r--integration/src/test/kotlin/IntegrationTest.kt4
-rw-r--r--nexus/build.gradle1
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/Database.kt89
-rw-r--r--nexus/src/test/kotlin/ConfigLoading.kt1
-rw-r--r--nexus/src/test/kotlin/DatabaseTest.kt10
-rw-r--r--util/build.gradle1
-rw-r--r--util/src/main/kotlin/Constants.kt23
-rw-r--r--util/src/main/kotlin/DB.kt52
13 files changed, 117 insertions, 134 deletions
diff --git a/bank/build.gradle b/bank/build.gradle
index 0f0bb006..fa43b2ae 100644
--- a/bank/build.gradle
+++ b/bank/build.gradle
@@ -25,7 +25,6 @@ dependencies {
implementation(project(":util"))
implementation("org.postgresql:postgresql:$postgres_version")
- implementation("com.zaxxer:HikariCP:5.0.1")
implementation("com.github.ajalt.clikt:clikt:$clikt_version")
implementation("io.ktor:ktor-server-core:$ktor_version")
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/Constants.kt b/bank/src/main/kotlin/tech/libeufin/bank/Constants.kt
index a1c65035..07d23469 100644
--- a/bank/src/main/kotlin/tech/libeufin/bank/Constants.kt
+++ b/bank/src/main/kotlin/tech/libeufin/bank/Constants.kt
@@ -39,11 +39,7 @@ const val IBAN_ALLOCATION_RETRY_COUNTER: Int = 5;
// Security
const val MAX_BODY_LENGTH: Long = 4 * 1024 // 4kB
-// DB
-const val MIN_VERSION: Int = 14
-const val SERIALIZATION_RETRY: Int = 10;
-
-// API version
+// API version
const val COREBANK_API_VERSION: String = "4:0:0"
const val CONVERSION_API_VERSION: String = "0:0:0"
const val INTEGRATION_API_VERSION: String = "2:0:2"
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/db/Database.kt b/bank/src/main/kotlin/tech/libeufin/bank/db/Database.kt
index 27cbfc2f..53b11c1d 100644
--- a/bank/src/main/kotlin/tech/libeufin/bank/db/Database.kt
+++ b/bank/src/main/kotlin/tech/libeufin/bank/db/Database.kt
@@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit
import kotlin.math.abs
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.*
-import com.zaxxer.hikari.*
import tech.libeufin.util.*
import io.ktor.http.HttpStatusCode
import net.taler.common.errorcodes.TalerErrorCode
@@ -59,27 +58,8 @@ private val logger: Logger = LoggerFactory.getLogger("libeufin-bank-db")
internal fun faultyTimestampByBank() = internalServerError("Bank took overflowing timestamp")
internal fun faultyDurationByClient() = badRequest("Overflowing duration, please specify 'forever' instead.")
-class Database(dbConfig: String, internal val bankCurrency: String, internal val fiatCurrency: String?): java.io.Closeable {
- val dbPool: HikariDataSource
- internal val notifWatcher: NotificationWatcher
-
- init {
- val pgSource = pgDataSource(dbConfig)
- val config = HikariConfig();
- config.dataSource = pgSource
- config.connectionInitSql = "SET search_path TO libeufin_bank;SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE;"
- config.validate()
- dbPool = HikariDataSource(config);
- dbPool.getConnection().use { con ->
- val meta = con.getMetaData();
- val majorVersion = meta.getDatabaseMajorVersion()
- val minorVersion = meta.getDatabaseMinorVersion()
- if (majorVersion < MIN_VERSION) {
- throw Exception("postgres version must be at least $MIN_VERSION.0 got $majorVersion.$minorVersion")
- }
- }
- notifWatcher = NotificationWatcher(pgSource)
- }
+class Database(dbConfig: String, internal val bankCurrency: String, internal val fiatCurrency: String?): DbPool(dbConfig, "libeufin_bank") {
+ internal val notifWatcher: NotificationWatcher = NotificationWatcher(pgSource)
val cashout = CashoutDAO(this)
val withdrawal = WithdrawalDAO(this)
@@ -142,36 +122,6 @@ class Database(dbConfig: String, internal val bankCurrency: String, internal val
} ?: throw internalServerError("No result from DB procedure stats_get_frame")
}
- override fun close() {
- dbPool.close()
- }
-
- suspend fun <R> conn(lambda: suspend (PgConnection) -> R): R {
- // Use a coroutine dispatcher that we can block as JDBC API is blocking
- return withContext(Dispatchers.IO) {
- val conn = dbPool.getConnection()
- conn.use{ it -> lambda(it.unwrap(PgConnection::class.java)) }
- }
- }
-
-
- suspend fun <R> serializable(lambda: suspend (PgConnection) -> R): R = conn { conn ->
- repeat(SERIALIZATION_RETRY) {
- try {
- return@conn lambda(conn);
- } catch (e: SQLException) {
- if (e.sqlState != PSQLState.SERIALIZATION_FAILURE.state)
- throw e
- }
- }
- try {
- return@conn lambda(conn)
- } catch(e: SQLException) {
- logger.warn("Serialization failure after $SERIALIZATION_RETRY retry")
- throw e
- }
- }
-
/** Apply paging logic to a sql query */
internal suspend fun <T> page(
params: PageParams,
diff --git a/bank/src/test/kotlin/AmountTest.kt b/bank/src/test/kotlin/AmountTest.kt
index 43b6fbef..9ce1cfff 100644
--- a/bank/src/test/kotlin/AmountTest.kt
+++ b/bank/src/test/kotlin/AmountTest.kt
@@ -31,8 +31,7 @@ import tech.libeufin.util.*
class AmountTest {
// Test amount computation in database
@Test
- fun computationTest() = bankSetup { db ->
- val conn = db.dbPool.getConnection().unwrap(PgConnection::class.java)
+ fun computationTest() = bankSetup { db -> db.conn { conn ->
conn.execSQLUpdate("UPDATE libeufin_bank.bank_accounts SET balance.val = 100000 WHERE internal_payto_uri = '$customerPayto'")
val stmt = conn.prepareStatement("""
UPDATE libeufin_bank.bank_accounts
@@ -125,9 +124,7 @@ class AmountTest {
hasBalanceDebt = true,
maxDebt = TalerAmount(0, 1, "KUDOS")
))
-
-
- }
+ }}
@Test
fun parse() {
diff --git a/integration/src/main/kotlin/Main.kt b/integration/src/main/kotlin/Main.kt
index 854b8c45..834f6f02 100644
--- a/integration/src/main/kotlin/Main.kt
+++ b/integration/src/main/kotlin/Main.kt
@@ -125,7 +125,7 @@ class Cli : CliktCommand("Run integration tests on banks provider") {
val payto = "payto://iban/CH2989144971918294289?receiver-name=Test"
step("Test fetch transactions")
- nexusCmd.test("ebics-fetch $ebicsFlag --pinned-start 2022-01-01").assertOk()
+ nexusCmd.test("ebics-fetch $ebicsFlags --pinned-start 2022-01-01").assertOk()
while (true) {
when (ask("Run 'fetch', 'submit', 'tx', 'txs', 'logs', 'ack' or 'exit'>")) {
diff --git a/integration/src/test/kotlin/IntegrationTest.kt b/integration/src/test/kotlin/IntegrationTest.kt
index a1fb79b6..8e804196 100644
--- a/integration/src/test/kotlin/IntegrationTest.kt
+++ b/integration/src/test/kotlin/IntegrationTest.kt
@@ -125,7 +125,7 @@ class IntegrationTest {
bankCmd.run("passwd admin password -c conf/integration.conf")
suspend fun checkCount(db: NexusDb, nbIncoming: Int, nbBounce: Int, nbTalerable: Int) {
- db.runConn { conn ->
+ db.conn { conn ->
conn.prepareStatement("SELECT count(*) FROM incoming_transactions").oneOrNull {
assertEquals(nbIncoming, it.getInt(1))
}
@@ -144,7 +144,7 @@ class IntegrationTest {
// Load conversion setup manually as the server would refuse to start without an exchange account
val sqlProcedures = File("../database-versioning/libeufin-conversion-setup.sql")
- db.runConn {
+ db.conn {
it.execSQLUpdate(sqlProcedures.readText())
it.execSQLUpdate("SET search_path TO libeufin_nexus;")
}
diff --git a/nexus/build.gradle b/nexus/build.gradle
index 75491bb3..879621f3 100644
--- a/nexus/build.gradle
+++ b/nexus/build.gradle
@@ -35,7 +35,6 @@ dependencies {
// Command line parsing
implementation("com.github.ajalt.clikt:clikt:$clikt_version")
implementation("org.postgresql:postgresql:$postgres_version")
- implementation("com.zaxxer:HikariCP:5.0.1")
// Ktor client library
implementation("io.ktor:ktor-client-apache:$ktor_version")
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Database.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Database.kt
index 430d5d83..939aa630 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Database.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Database.kt
@@ -22,7 +22,6 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import org.postgresql.jdbc.PgConnection
import org.postgresql.util.PSQLState
-import com.zaxxer.hikari.*
import tech.libeufin.util.*
import java.sql.PreparedStatement
import java.sql.SQLException
@@ -198,39 +197,7 @@ private fun PreparedStatement.maybeUpdate(): Boolean {
/**
* Collects database connection steps and any operation on the Nexus tables.
*/
-class Database(dbConfig: String): java.io.Closeable {
- val dbPool: HikariDataSource
-
- init {
- val pgSource = pgDataSource(dbConfig)
- val config = HikariConfig();
- config.dataSource = pgSource
- config.connectionInitSql = "SET search_path TO libeufin_nexus;"
- config.validate()
- dbPool = HikariDataSource(config);
- }
-
- /**
- * Closes the database connection.
- */
- override fun close() {
- dbPool.close()
- }
-
- /**
- * Moves the database operations where they can block, without
- * blocking the whole process.
- *
- * @param lambda actual statement preparation and execution logic.
- * @return what lambda returns.
- */
- suspend fun <R> runConn(lambda: suspend (PgConnection) -> R): R {
- // Use a coroutine dispatcher that we can block as JDBC API is blocking
- return withContext(Dispatchers.IO) {
- val conn = dbPool.getConnection()
- conn.use { it -> lambda(it.unwrap(PgConnection::class.java)) }
- }
- }
+class Database(dbConfig: String): DbPool(dbConfig, "libeufin_nexus") {
// OUTGOING PAYMENTS METHODS
@@ -241,7 +208,7 @@ class Database(dbConfig: String): java.io.Closeable {
* @param paymentData information about the outgoing payment.
* @return operation outcome enum.
*/
- suspend fun registerOutgoing(paymentData: OutgoingPayment): OutgoingRegistrationResult = runConn {
+ suspend fun registerOutgoing(paymentData: OutgoingPayment): OutgoingRegistrationResult = conn {
val stmt = it.prepareStatement("""
SELECT out_tx_id, out_initiated, out_found
FROM register_outgoing(
@@ -289,7 +256,7 @@ class Database(dbConfig: String): java.io.Closeable {
paymentData: IncomingPayment,
bounceAmount: TalerAmount,
now: Instant
- ): IncomingBounceRegistrationResult = runConn {
+ ): IncomingBounceRegistrationResult = conn {
val stmt = it.prepareStatement("""
SELECT out_found, out_tx_id, out_bounce_id
FROM register_incoming_and_bounce(
@@ -337,7 +304,7 @@ class Database(dbConfig: String): java.io.Closeable {
suspend fun registerTalerableIncoming(
paymentData: IncomingPayment,
reservePub: ByteArray
- ): IncomingRegistrationResult = runConn { conn ->
+ ): IncomingRegistrationResult = conn { conn ->
val stmt = conn.prepareStatement("""
SELECT out_found, out_tx_id
FROM register_incoming_and_talerable(
@@ -374,15 +341,15 @@ class Database(dbConfig: String): java.io.Closeable {
*
* @return [Instant] or null if no results were found
*/
- suspend fun outgoingPaymentLastExecTime(): Instant? = runConn { conn ->
+ suspend fun outgoingPaymentLastExecTime(): Instant? = conn { conn ->
val stmt = conn.prepareStatement(
"SELECT MAX(execution_time) as latest_execution_time FROM outgoing_transactions"
)
stmt.executeQuery().use {
- if (!it.next()) return@runConn null
+ if (!it.next()) return@conn null
val timestamp = it.getLong("latest_execution_time")
- if (timestamp == 0L) return@runConn null
- return@runConn timestamp.microsToJavaInstant()
+ if (timestamp == 0L) return@conn null
+ return@conn timestamp.microsToJavaInstant()
?: throw Exception("Could not convert latest_execution_time to Instant")
}
}
@@ -392,15 +359,15 @@ class Database(dbConfig: String): java.io.Closeable {
*
* @return [Instant] or null if no results were found
*/
- suspend fun incomingPaymentLastExecTime(): Instant? = runConn { conn ->
+ suspend fun incomingPaymentLastExecTime(): Instant? = conn { conn ->
val stmt = conn.prepareStatement(
"SELECT MAX(execution_time) as latest_execution_time FROM incoming_transactions"
)
stmt.executeQuery().use {
- if (!it.next()) return@runConn null
+ if (!it.next()) return@conn null
val timestamp = it.getLong("latest_execution_time")
- if (timestamp == 0L) return@runConn null
- return@runConn timestamp.microsToJavaInstant()
+ if (timestamp == 0L) return@conn null
+ return@conn timestamp.microsToJavaInstant()
?: throw Exception("Could not convert latest_execution_time to Instant")
}
}
@@ -411,7 +378,7 @@ class Database(dbConfig: String): java.io.Closeable {
* @param maybeReservePub reserve public key to look up
* @return true if found, false otherwise
*/
- suspend fun isReservePubFound(maybeReservePub: ByteArray): Boolean = runConn { conn ->
+ suspend fun isReservePubFound(maybeReservePub: ByteArray): Boolean = conn { conn ->
val stmt = conn.prepareStatement("""
SELECT 1
FROM talerable_incoming_transactions
@@ -420,7 +387,7 @@ class Database(dbConfig: String): java.io.Closeable {
stmt.setBytes(1, maybeReservePub)
val res = stmt.executeQuery()
res.use {
- return@runConn it.next()
+ return@conn it.next()
}
}
@@ -445,7 +412,7 @@ class Database(dbConfig: String): java.io.Closeable {
suspend fun initiatedPaymentSetSubmittedState(
rowId: Long,
submissionState: DatabaseSubmissionState
- ): Boolean = runConn { conn ->
+ ): Boolean = conn { conn ->
val stmt = conn.prepareStatement("""
UPDATE initiated_outgoing_transactions
SET submitted = submission_state(?), last_submission_time = ?
@@ -458,7 +425,7 @@ class Database(dbConfig: String): java.io.Closeable {
throw Exception("Submission time could not be converted to microseconds for the database.")
})
stmt.setLong(3, rowId)
- return@runConn stmt.maybeUpdate()
+ return@conn stmt.maybeUpdate()
}
/**
@@ -468,7 +435,7 @@ class Database(dbConfig: String): java.io.Closeable {
* @param failureMessage error associated to this initiated payment.
* @return true on success, false if no payment was affected.
*/
- suspend fun initiatedPaymentSetFailureMessage(rowId: Long, failureMessage: String): Boolean = runConn { conn ->
+ suspend fun initiatedPaymentSetFailureMessage(rowId: Long, failureMessage: String): Boolean = conn { conn ->
val stmt = conn.prepareStatement("""
UPDATE initiated_outgoing_transactions
SET failure_message = ?
@@ -477,7 +444,7 @@ class Database(dbConfig: String): java.io.Closeable {
)
stmt.setString(1, failureMessage)
stmt.setLong(2, rowId)
- return@runConn stmt.maybeUpdate()
+ return@conn stmt.maybeUpdate()
}
/**
@@ -487,7 +454,7 @@ class Database(dbConfig: String): java.io.Closeable {
* @param currency in which currency should the payment be submitted to the bank.
* @return [Map] of the initiated payment row ID and [InitiatedPayment]
*/
- suspend fun initiatedPaymentsSubmittableGet(currency: String): Map<Long, InitiatedPayment> = runConn { conn ->
+ suspend fun initiatedPaymentsSubmittableGet(currency: String): Map<Long, InitiatedPayment> = conn { conn ->
val stmt = conn.prepareStatement("""
SELECT
initiated_outgoing_transaction_id
@@ -523,7 +490,7 @@ class Database(dbConfig: String): java.io.Closeable {
)
} while (it.next())
}
- return@runConn maybeMap
+ return@conn maybeMap
}
/**
* Initiate a payment in the database. The "submit"
@@ -533,7 +500,7 @@ class Database(dbConfig: String): java.io.Closeable {
* @param paymentData any data that's used to prepare the payment.
* @return true if the insertion went through, false in case of errors.
*/
- suspend fun initiatedPaymentCreate(paymentData: InitiatedPayment): PaymentInitiationOutcome = runConn { conn ->
+ suspend fun initiatedPaymentCreate(paymentData: InitiatedPayment): PaymentInitiationOutcome = conn { conn ->
val stmt = conn.prepareStatement("""
INSERT INTO initiated_outgoing_transactions (
amount
@@ -553,8 +520,8 @@ class Database(dbConfig: String): java.io.Closeable {
stmt.setInt(2, paymentData.amount.fraction)
stmt.setString(3, paymentData.wireTransferSubject)
parsePayto(paymentData.creditPaytoUri).apply {
- if (this == null) return@runConn PaymentInitiationOutcome.BAD_CREDIT_PAYTO
- if (this.receiverName == null) return@runConn PaymentInitiationOutcome.RECEIVER_NAME_MISSING
+ if (this == null) return@conn PaymentInitiationOutcome.BAD_CREDIT_PAYTO
+ if (this.receiverName == null) return@conn PaymentInitiationOutcome.RECEIVER_NAME_MISSING
}
stmt.setString(4, paymentData.creditPaytoUri)
val initiationTime = paymentData.initiationTime.toDbMicros() ?: run {
@@ -563,12 +530,12 @@ class Database(dbConfig: String): java.io.Closeable {
stmt.setLong(5, initiationTime)
stmt.setString(6, paymentData.requestUid) // can be null.
if (stmt.maybeUpdate())
- return@runConn PaymentInitiationOutcome.SUCCESS
+ return@conn PaymentInitiationOutcome.SUCCESS
/**
* _very_ likely, Nexus didn't check the request idempotency,
* as the row ID would never fall into the following problem.
*/
- return@runConn PaymentInitiationOutcome.UNIQUE_CONSTRAINT_VIOLATION
+ return@conn PaymentInitiationOutcome.UNIQUE_CONSTRAINT_VIOLATION
}
/**
@@ -583,7 +550,7 @@ class Database(dbConfig: String): java.io.Closeable {
* null gets returned even when the initiated payment exists,
* *but* it was NOT flagged as submitted.
*/
- suspend fun initiatedPaymentGetFromUid(uid: String): Long? = runConn { conn ->
+ suspend fun initiatedPaymentGetFromUid(uid: String): Long? = conn { conn ->
val stmt = conn.prepareStatement("""
SELECT initiated_outgoing_transaction_id
FROM initiated_outgoing_transactions
@@ -592,8 +559,8 @@ class Database(dbConfig: String): java.io.Closeable {
stmt.setString(1, uid)
val res = stmt.executeQuery()
res.use {
- if (!it.next()) return@runConn null
- return@runConn it.getLong("initiated_outgoing_transaction_id")
+ if (!it.next()) return@conn null
+ return@conn it.getLong("initiated_outgoing_transaction_id")
}
}
} \ No newline at end of file
diff --git a/nexus/src/test/kotlin/ConfigLoading.kt b/nexus/src/test/kotlin/ConfigLoading.kt
index bd636f6a..ad4fa7ad 100644
--- a/nexus/src/test/kotlin/ConfigLoading.kt
+++ b/nexus/src/test/kotlin/ConfigLoading.kt
@@ -43,7 +43,6 @@ class ConfigLoading {
val handle = TalerConfig(NEXUS_CONFIG_SOURCE)
handle.load()
val cfg = EbicsSetupConfig(handle)
- cfg.config.requirePath("nexus-fetch", "statement_log_directory")
}
diff --git a/nexus/src/test/kotlin/DatabaseTest.kt b/nexus/src/test/kotlin/DatabaseTest.kt
index ed46598f..33113f66 100644
--- a/nexus/src/test/kotlin/DatabaseTest.kt
+++ b/nexus/src/test/kotlin/DatabaseTest.kt
@@ -82,7 +82,7 @@ class IncomingPaymentsTest {
).run {
assertFalse(new)
}
- db.runConn {
+ db.conn {
// Checking one incoming got created
val checkIncoming = it.prepareStatement("""
SELECT (amount).val as amount_value, (amount).frac as amount_frac
@@ -150,7 +150,7 @@ class PaymentInitiationsTest {
assertFalse(db.initiatedPaymentSetFailureMessage(3, "3 not existing"))
assertTrue(db.initiatedPaymentSetFailureMessage(1, "expired"))
// Checking the value from the database.
- db.runConn { conn ->
+ db.conn { conn ->
val idOne = conn.execSQLQuery("""
SELECT failure_message
FROM initiated_outgoing_transactions
@@ -178,7 +178,7 @@ class PaymentInitiationsTest {
db.initiatedPaymentCreate(genInitPay("not submitted, has row ID == 1")),
)
// Asserting on the false default submitted state.
- db.runConn { conn ->
+ db.conn { conn ->
val isSubmitted = conn.execSQLQuery(getRowOne)
assertTrue(isSubmitted.next())
assertEquals("unsubmitted", isSubmitted.getString("submitted"))
@@ -186,7 +186,7 @@ class PaymentInitiationsTest {
// Switching the submitted state to success.
assertTrue(db.initiatedPaymentSetSubmittedState(1, DatabaseSubmissionState.success))
// Asserting on the submitted state being TRUE now.
- db.runConn { conn ->
+ db.conn { conn ->
val isSubmitted = conn.execSQLQuery(getRowOne)
assertTrue(isSubmitted.next())
assertEquals("success", isSubmitted.getString("submitted"))
@@ -275,7 +275,7 @@ class PaymentInitiationsTest {
assertEquals(db.initiatedPaymentCreate(genInitPay("#4", "unique4")), PaymentInitiationOutcome.SUCCESS)
// Marking one as submitted, hence not expecting it in the results.
- db.runConn { conn ->
+ db.conn { conn ->
conn.execSQLUpdate("""
UPDATE initiated_outgoing_transactions
SET submitted='success'
diff --git a/util/build.gradle b/util/build.gradle
index 0bd9b551..b02c8f8d 100644
--- a/util/build.gradle
+++ b/util/build.gradle
@@ -27,6 +27,7 @@ dependencies {
implementation("org.bouncycastle:bcprov-jdk15on:1.69")
// Database helper
implementation("org.postgresql:postgresql:$postgres_version")
+ implementation("com.zaxxer:HikariCP:5.0.1")
implementation("io.ktor:ktor-serialization-kotlinx-json:$ktor_version")
implementation("io.ktor:ktor-server-test-host:$ktor_version")
diff --git a/util/src/main/kotlin/Constants.kt b/util/src/main/kotlin/Constants.kt
new file mode 100644
index 00000000..3411323d
--- /dev/null
+++ b/util/src/main/kotlin/Constants.kt
@@ -0,0 +1,23 @@
+/*
+ * This file is part of LibEuFin.
+ * Copyright (C) 2024 Taler Systems S.A.
+
+ * LibEuFin is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation; either version 3, or
+ * (at your option) any later version.
+
+ * LibEuFin is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General
+ * Public License for more details.
+
+ * You should have received a copy of the GNU Affero General Public
+ * License along with LibEuFin; see the file COPYING. If not, see
+ * <http://www.gnu.org/licenses/>
+ */
+package tech.libeufin.util
+
+// DB
+const val MIN_VERSION: Int = 14
+const val SERIALIZATION_RETRY: Int = 10; \ No newline at end of file
diff --git a/util/src/main/kotlin/DB.kt b/util/src/main/kotlin/DB.kt
index facff98b..82e492f2 100644
--- a/util/src/main/kotlin/DB.kt
+++ b/util/src/main/kotlin/DB.kt
@@ -29,6 +29,8 @@ import java.net.URI
import java.sql.PreparedStatement
import java.sql.ResultSet
import java.sql.SQLException
+import kotlinx.coroutines.*
+import com.zaxxer.hikari.*
fun getCurrentUser(): String = System.getProperty("user.name")
@@ -276,4 +278,54 @@ fun resetDatabaseTables(conn: PgConnection, cfg: DatabaseConfig, sqlFilePrefix:
val sqlDrop = File("${cfg.sqlDir}/$sqlFilePrefix-drop.sql").readText()
conn.execSQLUpdate(sqlDrop)
+}
+
+abstract class DbPool(cfg: String, schema: String): java.io.Closeable {
+ val pgSource = pgDataSource(cfg)
+ private val pool: HikariDataSource
+
+ init {
+ val config = HikariConfig();
+ config.dataSource = pgSource
+ config.schema = schema
+ config.transactionIsolation = "TRANSACTION_SERIALIZABLE"
+ pool = HikariDataSource(config)
+ pool.getConnection().use { con ->
+ val meta = con.getMetaData();
+ val majorVersion = meta.getDatabaseMajorVersion()
+ val minorVersion = meta.getDatabaseMinorVersion()
+ if (majorVersion < MIN_VERSION) {
+ throw Exception("postgres version must be at least $MIN_VERSION.0 got $majorVersion.$minorVersion")
+ }
+ }
+ }
+
+ suspend fun <R> conn(lambda: suspend (PgConnection) -> R): R {
+ // Use a coroutine dispatcher that we can block as JDBC API is blocking
+ return withContext(Dispatchers.IO) {
+ val conn = pool.getConnection()
+ conn.use{ it -> lambda(it.unwrap(PgConnection::class.java)) }
+ }
+ }
+
+ suspend fun <R> serializable(lambda: suspend (PgConnection) -> R): R = conn { conn ->
+ repeat(SERIALIZATION_RETRY) {
+ try {
+ return@conn lambda(conn);
+ } catch (e: SQLException) {
+ if (e.sqlState != PSQLState.SERIALIZATION_FAILURE.state)
+ throw e
+ }
+ }
+ try {
+ return@conn lambda(conn)
+ } catch(e: SQLException) {
+ logger.warn("Serialization failure after $SERIALIZATION_RETRY retry")
+ throw e
+ }
+ }
+
+ override fun close() {
+ pool.close()
+ }
} \ No newline at end of file