libeufin

Integration and sandbox testing for FinTech APIs and data formats
Log | Files | Refs | Submodules | README | LICENSE

transaction.kt (2628B)


      1 /*
      2  * This file is part of LibEuFin.
      3  * Copyright (C) 2025 Taler Systems S.A.
      4  *
      5  * LibEuFin is free software; you can redistribute it and/or modify
      6  * it under the terms of the GNU Affero General Public License as
      7  * published by the Free Software Foundation; either version 3, or
      8  * (at your option) any later version.
      9  *
     10  * LibEuFin is distributed in the hope that it will be useful, but
     11  * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
     12  * or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Affero General
     13  * Public License for more details.
     14  *
     15  * You should have received a copy of the GNU Affero General Public
     16  * License along with LibEuFin; see the file COPYING.  If not, see
     17  * <http://www.gnu.org/licenses/>
     18  */
     19 
     20 package tech.libeufin.common.db
     21 
     22 import org.postgresql.jdbc.PgConnection
     23 import org.postgresql.util.PSQLState
     24 import tech.libeufin.common.SERIALIZATION_RETRY
     25 import java.sql.PreparedStatement
     26 import java.sql.ResultSet
     27 import java.sql.SQLException
     28 
     29 val SERIALIZATION_ERROR = setOf(
     30     "40001", // serialization_failure
     31     "40P01", // deadlock_detected
     32     "55P03", // lock_not_available
     33 )
     34 
     35 /** Executes db logic with automatic retry on serialization errors */
     36 suspend fun <R> retrySerializationError(lambda: suspend () -> R): R {
     37     repeat(SERIALIZATION_RETRY) {
     38         try {
     39             return lambda()
     40         } catch (e: SQLException) {
     41             if (!SERIALIZATION_ERROR.contains(e.sqlState)) throw e
     42         }
     43     }
     44     return lambda()
     45 }
     46 
     47 fun PgConnection.talerStatement(query: String): TalerStatement = TalerStatement(prepareStatement(query))
     48 
     49 /** Run a postgres query using a prepared statement */
     50 inline fun <R> PgConnection.withStatement(query: String, lambda: TalerStatement.() -> R): R =
     51     talerStatement(query).use { it.lambda() }
     52 
     53 /** Run a postgres [transaction] */
     54 fun <R> PgConnection.transaction(transaction: (PgConnection) -> R): R {
     55     try {
     56         autoCommit = false
     57         val result = transaction(this)
     58         commit()
     59         autoCommit = true
     60         return result
     61     } catch (e: Exception) {
     62         rollback()
     63         autoCommit = true
     64         throw e
     65     }
     66 }
     67 
     68 /** 
     69  * Execute an update of [table] with a dynamic query generated at runtime.
     70  * Every [fields] in each row matching [filter] are updated using values from [bind].
     71  **/
     72 fun PgConnection.dynamicUpdate(
     73     table: String,
     74     fields: Sequence<String>,
     75     filter: String,
     76     bind: TalerStatement.() -> Unit,
     77 ) {
     78     val sql = fields.joinToString()
     79     if (sql.isEmpty()) return
     80     withStatement("UPDATE $table SET $sql $filter") {
     81         bind()
     82         executeUpdate()
     83     }
     84 }