diff options
author | Antoine A <> | 2024-01-23 16:32:31 +0100 |
---|---|---|
committer | Antoine A <> | 2024-01-23 16:32:31 +0100 |
commit | 73a98866810b47e341b00c5e2fcc0fc4f9a22cbe (patch) | |
tree | 780dce068547dfaa8ab86050d62a83914e96ca5f | |
parent | ad7171c999f48236b459b0b75a8b0b77e7399a48 (diff) | |
download | libeufin-73a98866810b47e341b00c5e2fcc0fc4f9a22cbe.tar.gz libeufin-73a98866810b47e341b00c5e2fcc0fc4f9a22cbe.tar.bz2 libeufin-73a98866810b47e341b00c5e2fcc0fc4f9a22cbe.zip |
Share database logic
-rw-r--r-- | bank/build.gradle | 1 | ||||
-rw-r--r-- | bank/src/main/kotlin/tech/libeufin/bank/Constants.kt | 6 | ||||
-rw-r--r-- | bank/src/main/kotlin/tech/libeufin/bank/db/Database.kt | 54 | ||||
-rw-r--r-- | bank/src/test/kotlin/AmountTest.kt | 7 | ||||
-rw-r--r-- | integration/src/main/kotlin/Main.kt | 2 | ||||
-rw-r--r-- | integration/src/test/kotlin/IntegrationTest.kt | 4 | ||||
-rw-r--r-- | nexus/build.gradle | 1 | ||||
-rw-r--r-- | nexus/src/main/kotlin/tech/libeufin/nexus/Database.kt | 89 | ||||
-rw-r--r-- | nexus/src/test/kotlin/ConfigLoading.kt | 1 | ||||
-rw-r--r-- | nexus/src/test/kotlin/DatabaseTest.kt | 10 | ||||
-rw-r--r-- | util/build.gradle | 1 | ||||
-rw-r--r-- | util/src/main/kotlin/Constants.kt | 23 | ||||
-rw-r--r-- | util/src/main/kotlin/DB.kt | 52 |
13 files changed, 117 insertions, 134 deletions
diff --git a/bank/build.gradle b/bank/build.gradle index 0f0bb006..fa43b2ae 100644 --- a/bank/build.gradle +++ b/bank/build.gradle @@ -25,7 +25,6 @@ dependencies { implementation(project(":util")) implementation("org.postgresql:postgresql:$postgres_version") - implementation("com.zaxxer:HikariCP:5.0.1") implementation("com.github.ajalt.clikt:clikt:$clikt_version") implementation("io.ktor:ktor-server-core:$ktor_version") diff --git a/bank/src/main/kotlin/tech/libeufin/bank/Constants.kt b/bank/src/main/kotlin/tech/libeufin/bank/Constants.kt index a1c65035..07d23469 100644 --- a/bank/src/main/kotlin/tech/libeufin/bank/Constants.kt +++ b/bank/src/main/kotlin/tech/libeufin/bank/Constants.kt @@ -39,11 +39,7 @@ const val IBAN_ALLOCATION_RETRY_COUNTER: Int = 5; // Security const val MAX_BODY_LENGTH: Long = 4 * 1024 // 4kB -// DB -const val MIN_VERSION: Int = 14 -const val SERIALIZATION_RETRY: Int = 10; - -// API version +// API version const val COREBANK_API_VERSION: String = "4:0:0" const val CONVERSION_API_VERSION: String = "0:0:0" const val INTEGRATION_API_VERSION: String = "2:0:2" diff --git a/bank/src/main/kotlin/tech/libeufin/bank/db/Database.kt b/bank/src/main/kotlin/tech/libeufin/bank/db/Database.kt index 27cbfc2f..53b11c1d 100644 --- a/bank/src/main/kotlin/tech/libeufin/bank/db/Database.kt +++ b/bank/src/main/kotlin/tech/libeufin/bank/db/Database.kt @@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit import kotlin.math.abs import kotlinx.coroutines.flow.* import kotlinx.coroutines.* -import com.zaxxer.hikari.* import tech.libeufin.util.* import io.ktor.http.HttpStatusCode import net.taler.common.errorcodes.TalerErrorCode @@ -59,27 +58,8 @@ private val logger: Logger = LoggerFactory.getLogger("libeufin-bank-db") internal fun faultyTimestampByBank() = internalServerError("Bank took overflowing timestamp") internal fun faultyDurationByClient() = badRequest("Overflowing duration, please specify 'forever' instead.") -class Database(dbConfig: String, internal val bankCurrency: String, internal val fiatCurrency: String?): java.io.Closeable { - val dbPool: HikariDataSource - internal val notifWatcher: NotificationWatcher - - init { - val pgSource = pgDataSource(dbConfig) - val config = HikariConfig(); - config.dataSource = pgSource - config.connectionInitSql = "SET search_path TO libeufin_bank;SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE;" - config.validate() - dbPool = HikariDataSource(config); - dbPool.getConnection().use { con -> - val meta = con.getMetaData(); - val majorVersion = meta.getDatabaseMajorVersion() - val minorVersion = meta.getDatabaseMinorVersion() - if (majorVersion < MIN_VERSION) { - throw Exception("postgres version must be at least $MIN_VERSION.0 got $majorVersion.$minorVersion") - } - } - notifWatcher = NotificationWatcher(pgSource) - } +class Database(dbConfig: String, internal val bankCurrency: String, internal val fiatCurrency: String?): DbPool(dbConfig, "libeufin_bank") { + internal val notifWatcher: NotificationWatcher = NotificationWatcher(pgSource) val cashout = CashoutDAO(this) val withdrawal = WithdrawalDAO(this) @@ -142,36 +122,6 @@ class Database(dbConfig: String, internal val bankCurrency: String, internal val } ?: throw internalServerError("No result from DB procedure stats_get_frame") } - override fun close() { - dbPool.close() - } - - suspend fun <R> conn(lambda: suspend (PgConnection) -> R): R { - // Use a coroutine dispatcher that we can block as JDBC API is blocking - return withContext(Dispatchers.IO) { - val conn = dbPool.getConnection() - conn.use{ it -> lambda(it.unwrap(PgConnection::class.java)) } - } - } - - - suspend fun <R> serializable(lambda: suspend (PgConnection) -> R): R = conn { conn -> - repeat(SERIALIZATION_RETRY) { - try { - return@conn lambda(conn); - } catch (e: SQLException) { - if (e.sqlState != PSQLState.SERIALIZATION_FAILURE.state) - throw e - } - } - try { - return@conn lambda(conn) - } catch(e: SQLException) { - logger.warn("Serialization failure after $SERIALIZATION_RETRY retry") - throw e - } - } - /** Apply paging logic to a sql query */ internal suspend fun <T> page( params: PageParams, diff --git a/bank/src/test/kotlin/AmountTest.kt b/bank/src/test/kotlin/AmountTest.kt index 43b6fbef..9ce1cfff 100644 --- a/bank/src/test/kotlin/AmountTest.kt +++ b/bank/src/test/kotlin/AmountTest.kt @@ -31,8 +31,7 @@ import tech.libeufin.util.* class AmountTest { // Test amount computation in database @Test - fun computationTest() = bankSetup { db -> - val conn = db.dbPool.getConnection().unwrap(PgConnection::class.java) + fun computationTest() = bankSetup { db -> db.conn { conn -> conn.execSQLUpdate("UPDATE libeufin_bank.bank_accounts SET balance.val = 100000 WHERE internal_payto_uri = '$customerPayto'") val stmt = conn.prepareStatement(""" UPDATE libeufin_bank.bank_accounts @@ -125,9 +124,7 @@ class AmountTest { hasBalanceDebt = true, maxDebt = TalerAmount(0, 1, "KUDOS") )) - - - } + }} @Test fun parse() { diff --git a/integration/src/main/kotlin/Main.kt b/integration/src/main/kotlin/Main.kt index 854b8c45..834f6f02 100644 --- a/integration/src/main/kotlin/Main.kt +++ b/integration/src/main/kotlin/Main.kt @@ -125,7 +125,7 @@ class Cli : CliktCommand("Run integration tests on banks provider") { val payto = "payto://iban/CH2989144971918294289?receiver-name=Test" step("Test fetch transactions") - nexusCmd.test("ebics-fetch $ebicsFlag --pinned-start 2022-01-01").assertOk() + nexusCmd.test("ebics-fetch $ebicsFlags --pinned-start 2022-01-01").assertOk() while (true) { when (ask("Run 'fetch', 'submit', 'tx', 'txs', 'logs', 'ack' or 'exit'>")) { diff --git a/integration/src/test/kotlin/IntegrationTest.kt b/integration/src/test/kotlin/IntegrationTest.kt index a1fb79b6..8e804196 100644 --- a/integration/src/test/kotlin/IntegrationTest.kt +++ b/integration/src/test/kotlin/IntegrationTest.kt @@ -125,7 +125,7 @@ class IntegrationTest { bankCmd.run("passwd admin password -c conf/integration.conf") suspend fun checkCount(db: NexusDb, nbIncoming: Int, nbBounce: Int, nbTalerable: Int) { - db.runConn { conn -> + db.conn { conn -> conn.prepareStatement("SELECT count(*) FROM incoming_transactions").oneOrNull { assertEquals(nbIncoming, it.getInt(1)) } @@ -144,7 +144,7 @@ class IntegrationTest { // Load conversion setup manually as the server would refuse to start without an exchange account val sqlProcedures = File("../database-versioning/libeufin-conversion-setup.sql") - db.runConn { + db.conn { it.execSQLUpdate(sqlProcedures.readText()) it.execSQLUpdate("SET search_path TO libeufin_nexus;") } diff --git a/nexus/build.gradle b/nexus/build.gradle index 75491bb3..879621f3 100644 --- a/nexus/build.gradle +++ b/nexus/build.gradle @@ -35,7 +35,6 @@ dependencies { // Command line parsing implementation("com.github.ajalt.clikt:clikt:$clikt_version") implementation("org.postgresql:postgresql:$postgres_version") - implementation("com.zaxxer:HikariCP:5.0.1") // Ktor client library implementation("io.ktor:ktor-client-apache:$ktor_version") diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Database.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Database.kt index 430d5d83..939aa630 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/Database.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Database.kt @@ -22,7 +22,6 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.withContext import org.postgresql.jdbc.PgConnection import org.postgresql.util.PSQLState -import com.zaxxer.hikari.* import tech.libeufin.util.* import java.sql.PreparedStatement import java.sql.SQLException @@ -198,39 +197,7 @@ private fun PreparedStatement.maybeUpdate(): Boolean { /** * Collects database connection steps and any operation on the Nexus tables. */ -class Database(dbConfig: String): java.io.Closeable { - val dbPool: HikariDataSource - - init { - val pgSource = pgDataSource(dbConfig) - val config = HikariConfig(); - config.dataSource = pgSource - config.connectionInitSql = "SET search_path TO libeufin_nexus;" - config.validate() - dbPool = HikariDataSource(config); - } - - /** - * Closes the database connection. - */ - override fun close() { - dbPool.close() - } - - /** - * Moves the database operations where they can block, without - * blocking the whole process. - * - * @param lambda actual statement preparation and execution logic. - * @return what lambda returns. - */ - suspend fun <R> runConn(lambda: suspend (PgConnection) -> R): R { - // Use a coroutine dispatcher that we can block as JDBC API is blocking - return withContext(Dispatchers.IO) { - val conn = dbPool.getConnection() - conn.use { it -> lambda(it.unwrap(PgConnection::class.java)) } - } - } +class Database(dbConfig: String): DbPool(dbConfig, "libeufin_nexus") { // OUTGOING PAYMENTS METHODS @@ -241,7 +208,7 @@ class Database(dbConfig: String): java.io.Closeable { * @param paymentData information about the outgoing payment. * @return operation outcome enum. */ - suspend fun registerOutgoing(paymentData: OutgoingPayment): OutgoingRegistrationResult = runConn { + suspend fun registerOutgoing(paymentData: OutgoingPayment): OutgoingRegistrationResult = conn { val stmt = it.prepareStatement(""" SELECT out_tx_id, out_initiated, out_found FROM register_outgoing( @@ -289,7 +256,7 @@ class Database(dbConfig: String): java.io.Closeable { paymentData: IncomingPayment, bounceAmount: TalerAmount, now: Instant - ): IncomingBounceRegistrationResult = runConn { + ): IncomingBounceRegistrationResult = conn { val stmt = it.prepareStatement(""" SELECT out_found, out_tx_id, out_bounce_id FROM register_incoming_and_bounce( @@ -337,7 +304,7 @@ class Database(dbConfig: String): java.io.Closeable { suspend fun registerTalerableIncoming( paymentData: IncomingPayment, reservePub: ByteArray - ): IncomingRegistrationResult = runConn { conn -> + ): IncomingRegistrationResult = conn { conn -> val stmt = conn.prepareStatement(""" SELECT out_found, out_tx_id FROM register_incoming_and_talerable( @@ -374,15 +341,15 @@ class Database(dbConfig: String): java.io.Closeable { * * @return [Instant] or null if no results were found */ - suspend fun outgoingPaymentLastExecTime(): Instant? = runConn { conn -> + suspend fun outgoingPaymentLastExecTime(): Instant? = conn { conn -> val stmt = conn.prepareStatement( "SELECT MAX(execution_time) as latest_execution_time FROM outgoing_transactions" ) stmt.executeQuery().use { - if (!it.next()) return@runConn null + if (!it.next()) return@conn null val timestamp = it.getLong("latest_execution_time") - if (timestamp == 0L) return@runConn null - return@runConn timestamp.microsToJavaInstant() + if (timestamp == 0L) return@conn null + return@conn timestamp.microsToJavaInstant() ?: throw Exception("Could not convert latest_execution_time to Instant") } } @@ -392,15 +359,15 @@ class Database(dbConfig: String): java.io.Closeable { * * @return [Instant] or null if no results were found */ - suspend fun incomingPaymentLastExecTime(): Instant? = runConn { conn -> + suspend fun incomingPaymentLastExecTime(): Instant? = conn { conn -> val stmt = conn.prepareStatement( "SELECT MAX(execution_time) as latest_execution_time FROM incoming_transactions" ) stmt.executeQuery().use { - if (!it.next()) return@runConn null + if (!it.next()) return@conn null val timestamp = it.getLong("latest_execution_time") - if (timestamp == 0L) return@runConn null - return@runConn timestamp.microsToJavaInstant() + if (timestamp == 0L) return@conn null + return@conn timestamp.microsToJavaInstant() ?: throw Exception("Could not convert latest_execution_time to Instant") } } @@ -411,7 +378,7 @@ class Database(dbConfig: String): java.io.Closeable { * @param maybeReservePub reserve public key to look up * @return true if found, false otherwise */ - suspend fun isReservePubFound(maybeReservePub: ByteArray): Boolean = runConn { conn -> + suspend fun isReservePubFound(maybeReservePub: ByteArray): Boolean = conn { conn -> val stmt = conn.prepareStatement(""" SELECT 1 FROM talerable_incoming_transactions @@ -420,7 +387,7 @@ class Database(dbConfig: String): java.io.Closeable { stmt.setBytes(1, maybeReservePub) val res = stmt.executeQuery() res.use { - return@runConn it.next() + return@conn it.next() } } @@ -445,7 +412,7 @@ class Database(dbConfig: String): java.io.Closeable { suspend fun initiatedPaymentSetSubmittedState( rowId: Long, submissionState: DatabaseSubmissionState - ): Boolean = runConn { conn -> + ): Boolean = conn { conn -> val stmt = conn.prepareStatement(""" UPDATE initiated_outgoing_transactions SET submitted = submission_state(?), last_submission_time = ? @@ -458,7 +425,7 @@ class Database(dbConfig: String): java.io.Closeable { throw Exception("Submission time could not be converted to microseconds for the database.") }) stmt.setLong(3, rowId) - return@runConn stmt.maybeUpdate() + return@conn stmt.maybeUpdate() } /** @@ -468,7 +435,7 @@ class Database(dbConfig: String): java.io.Closeable { * @param failureMessage error associated to this initiated payment. * @return true on success, false if no payment was affected. */ - suspend fun initiatedPaymentSetFailureMessage(rowId: Long, failureMessage: String): Boolean = runConn { conn -> + suspend fun initiatedPaymentSetFailureMessage(rowId: Long, failureMessage: String): Boolean = conn { conn -> val stmt = conn.prepareStatement(""" UPDATE initiated_outgoing_transactions SET failure_message = ? @@ -477,7 +444,7 @@ class Database(dbConfig: String): java.io.Closeable { ) stmt.setString(1, failureMessage) stmt.setLong(2, rowId) - return@runConn stmt.maybeUpdate() + return@conn stmt.maybeUpdate() } /** @@ -487,7 +454,7 @@ class Database(dbConfig: String): java.io.Closeable { * @param currency in which currency should the payment be submitted to the bank. * @return [Map] of the initiated payment row ID and [InitiatedPayment] */ - suspend fun initiatedPaymentsSubmittableGet(currency: String): Map<Long, InitiatedPayment> = runConn { conn -> + suspend fun initiatedPaymentsSubmittableGet(currency: String): Map<Long, InitiatedPayment> = conn { conn -> val stmt = conn.prepareStatement(""" SELECT initiated_outgoing_transaction_id @@ -523,7 +490,7 @@ class Database(dbConfig: String): java.io.Closeable { ) } while (it.next()) } - return@runConn maybeMap + return@conn maybeMap } /** * Initiate a payment in the database. The "submit" @@ -533,7 +500,7 @@ class Database(dbConfig: String): java.io.Closeable { * @param paymentData any data that's used to prepare the payment. * @return true if the insertion went through, false in case of errors. */ - suspend fun initiatedPaymentCreate(paymentData: InitiatedPayment): PaymentInitiationOutcome = runConn { conn -> + suspend fun initiatedPaymentCreate(paymentData: InitiatedPayment): PaymentInitiationOutcome = conn { conn -> val stmt = conn.prepareStatement(""" INSERT INTO initiated_outgoing_transactions ( amount @@ -553,8 +520,8 @@ class Database(dbConfig: String): java.io.Closeable { stmt.setInt(2, paymentData.amount.fraction) stmt.setString(3, paymentData.wireTransferSubject) parsePayto(paymentData.creditPaytoUri).apply { - if (this == null) return@runConn PaymentInitiationOutcome.BAD_CREDIT_PAYTO - if (this.receiverName == null) return@runConn PaymentInitiationOutcome.RECEIVER_NAME_MISSING + if (this == null) return@conn PaymentInitiationOutcome.BAD_CREDIT_PAYTO + if (this.receiverName == null) return@conn PaymentInitiationOutcome.RECEIVER_NAME_MISSING } stmt.setString(4, paymentData.creditPaytoUri) val initiationTime = paymentData.initiationTime.toDbMicros() ?: run { @@ -563,12 +530,12 @@ class Database(dbConfig: String): java.io.Closeable { stmt.setLong(5, initiationTime) stmt.setString(6, paymentData.requestUid) // can be null. if (stmt.maybeUpdate()) - return@runConn PaymentInitiationOutcome.SUCCESS + return@conn PaymentInitiationOutcome.SUCCESS /** * _very_ likely, Nexus didn't check the request idempotency, * as the row ID would never fall into the following problem. */ - return@runConn PaymentInitiationOutcome.UNIQUE_CONSTRAINT_VIOLATION + return@conn PaymentInitiationOutcome.UNIQUE_CONSTRAINT_VIOLATION } /** @@ -583,7 +550,7 @@ class Database(dbConfig: String): java.io.Closeable { * null gets returned even when the initiated payment exists, * *but* it was NOT flagged as submitted. */ - suspend fun initiatedPaymentGetFromUid(uid: String): Long? = runConn { conn -> + suspend fun initiatedPaymentGetFromUid(uid: String): Long? = conn { conn -> val stmt = conn.prepareStatement(""" SELECT initiated_outgoing_transaction_id FROM initiated_outgoing_transactions @@ -592,8 +559,8 @@ class Database(dbConfig: String): java.io.Closeable { stmt.setString(1, uid) val res = stmt.executeQuery() res.use { - if (!it.next()) return@runConn null - return@runConn it.getLong("initiated_outgoing_transaction_id") + if (!it.next()) return@conn null + return@conn it.getLong("initiated_outgoing_transaction_id") } } }
\ No newline at end of file diff --git a/nexus/src/test/kotlin/ConfigLoading.kt b/nexus/src/test/kotlin/ConfigLoading.kt index bd636f6a..ad4fa7ad 100644 --- a/nexus/src/test/kotlin/ConfigLoading.kt +++ b/nexus/src/test/kotlin/ConfigLoading.kt @@ -43,7 +43,6 @@ class ConfigLoading { val handle = TalerConfig(NEXUS_CONFIG_SOURCE) handle.load() val cfg = EbicsSetupConfig(handle) - cfg.config.requirePath("nexus-fetch", "statement_log_directory") } diff --git a/nexus/src/test/kotlin/DatabaseTest.kt b/nexus/src/test/kotlin/DatabaseTest.kt index ed46598f..33113f66 100644 --- a/nexus/src/test/kotlin/DatabaseTest.kt +++ b/nexus/src/test/kotlin/DatabaseTest.kt @@ -82,7 +82,7 @@ class IncomingPaymentsTest { ).run { assertFalse(new) } - db.runConn { + db.conn { // Checking one incoming got created val checkIncoming = it.prepareStatement(""" SELECT (amount).val as amount_value, (amount).frac as amount_frac @@ -150,7 +150,7 @@ class PaymentInitiationsTest { assertFalse(db.initiatedPaymentSetFailureMessage(3, "3 not existing")) assertTrue(db.initiatedPaymentSetFailureMessage(1, "expired")) // Checking the value from the database. - db.runConn { conn -> + db.conn { conn -> val idOne = conn.execSQLQuery(""" SELECT failure_message FROM initiated_outgoing_transactions @@ -178,7 +178,7 @@ class PaymentInitiationsTest { db.initiatedPaymentCreate(genInitPay("not submitted, has row ID == 1")), ) // Asserting on the false default submitted state. - db.runConn { conn -> + db.conn { conn -> val isSubmitted = conn.execSQLQuery(getRowOne) assertTrue(isSubmitted.next()) assertEquals("unsubmitted", isSubmitted.getString("submitted")) @@ -186,7 +186,7 @@ class PaymentInitiationsTest { // Switching the submitted state to success. assertTrue(db.initiatedPaymentSetSubmittedState(1, DatabaseSubmissionState.success)) // Asserting on the submitted state being TRUE now. - db.runConn { conn -> + db.conn { conn -> val isSubmitted = conn.execSQLQuery(getRowOne) assertTrue(isSubmitted.next()) assertEquals("success", isSubmitted.getString("submitted")) @@ -275,7 +275,7 @@ class PaymentInitiationsTest { assertEquals(db.initiatedPaymentCreate(genInitPay("#4", "unique4")), PaymentInitiationOutcome.SUCCESS) // Marking one as submitted, hence not expecting it in the results. - db.runConn { conn -> + db.conn { conn -> conn.execSQLUpdate(""" UPDATE initiated_outgoing_transactions SET submitted='success' diff --git a/util/build.gradle b/util/build.gradle index 0bd9b551..b02c8f8d 100644 --- a/util/build.gradle +++ b/util/build.gradle @@ -27,6 +27,7 @@ dependencies { implementation("org.bouncycastle:bcprov-jdk15on:1.69") // Database helper implementation("org.postgresql:postgresql:$postgres_version") + implementation("com.zaxxer:HikariCP:5.0.1") implementation("io.ktor:ktor-serialization-kotlinx-json:$ktor_version") implementation("io.ktor:ktor-server-test-host:$ktor_version") diff --git a/util/src/main/kotlin/Constants.kt b/util/src/main/kotlin/Constants.kt new file mode 100644 index 00000000..3411323d --- /dev/null +++ b/util/src/main/kotlin/Constants.kt @@ -0,0 +1,23 @@ +/* + * This file is part of LibEuFin. + * Copyright (C) 2024 Taler Systems S.A. + + * LibEuFin is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation; either version 3, or + * (at your option) any later version. + + * LibEuFin is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General + * Public License for more details. + + * You should have received a copy of the GNU Affero General Public + * License along with LibEuFin; see the file COPYING. If not, see + * <http://www.gnu.org/licenses/> + */ +package tech.libeufin.util + +// DB +const val MIN_VERSION: Int = 14 +const val SERIALIZATION_RETRY: Int = 10;
\ No newline at end of file diff --git a/util/src/main/kotlin/DB.kt b/util/src/main/kotlin/DB.kt index facff98b..82e492f2 100644 --- a/util/src/main/kotlin/DB.kt +++ b/util/src/main/kotlin/DB.kt @@ -29,6 +29,8 @@ import java.net.URI import java.sql.PreparedStatement import java.sql.ResultSet import java.sql.SQLException +import kotlinx.coroutines.* +import com.zaxxer.hikari.* fun getCurrentUser(): String = System.getProperty("user.name") @@ -276,4 +278,54 @@ fun resetDatabaseTables(conn: PgConnection, cfg: DatabaseConfig, sqlFilePrefix: val sqlDrop = File("${cfg.sqlDir}/$sqlFilePrefix-drop.sql").readText() conn.execSQLUpdate(sqlDrop) +} + +abstract class DbPool(cfg: String, schema: String): java.io.Closeable { + val pgSource = pgDataSource(cfg) + private val pool: HikariDataSource + + init { + val config = HikariConfig(); + config.dataSource = pgSource + config.schema = schema + config.transactionIsolation = "TRANSACTION_SERIALIZABLE" + pool = HikariDataSource(config) + pool.getConnection().use { con -> + val meta = con.getMetaData(); + val majorVersion = meta.getDatabaseMajorVersion() + val minorVersion = meta.getDatabaseMinorVersion() + if (majorVersion < MIN_VERSION) { + throw Exception("postgres version must be at least $MIN_VERSION.0 got $majorVersion.$minorVersion") + } + } + } + + suspend fun <R> conn(lambda: suspend (PgConnection) -> R): R { + // Use a coroutine dispatcher that we can block as JDBC API is blocking + return withContext(Dispatchers.IO) { + val conn = pool.getConnection() + conn.use{ it -> lambda(it.unwrap(PgConnection::class.java)) } + } + } + + suspend fun <R> serializable(lambda: suspend (PgConnection) -> R): R = conn { conn -> + repeat(SERIALIZATION_RETRY) { + try { + return@conn lambda(conn); + } catch (e: SQLException) { + if (e.sqlState != PSQLState.SERIALIZATION_FAILURE.state) + throw e + } + } + try { + return@conn lambda(conn) + } catch(e: SQLException) { + logger.warn("Serialization failure after $SERIALIZATION_RETRY retry") + throw e + } + } + + override fun close() { + pool.close() + } }
\ No newline at end of file |