commit 50bf46ceedaae54a18709f22d9eae27aff59d85d
parent 7ffa7a9f21f20f854b15c44c81b9db9eac93623e
Author: Antoine A <>
Date: Tue, 14 Nov 2023 17:42:12 +0000
Enforce transaction serialization
Diffstat:
6 files changed, 74 insertions(+), 20 deletions(-)
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/db/CashoutDAO.kt b/bank/src/main/kotlin/tech/libeufin/bank/db/CashoutDAO.kt
@@ -67,7 +67,7 @@ class CashoutDAO(private val db: Database) {
now: Instant,
retryCounter: Int,
validityPeriod: Duration
- ): CashoutCreation = db.conn { conn ->
+ ): CashoutCreation = db.serializable { conn ->
val stmt = conn.prepareStatement("""
SELECT
out_bad_conversion,
@@ -121,7 +121,7 @@ class CashoutDAO(private val db: Database) {
id: Long,
now: Instant,
retransmissionPeriod: Duration
- ) = db.conn { conn ->
+ ) = db.serializable { conn ->
val stmt = conn.prepareStatement("""
SELECT challenge_mark_sent(challenge, ?, ?)
FROM cashout_operations
@@ -133,7 +133,7 @@ class CashoutDAO(private val db: Database) {
stmt.executeQueryCheck()
}
- suspend fun abort(id: Long): AbortResult = db.conn { conn ->
+ suspend fun abort(id: Long): AbortResult = db.serializable { conn ->
val stmt = conn.prepareStatement("""
UPDATE cashout_operations
SET aborted = local_transaction IS NULL
@@ -152,7 +152,7 @@ class CashoutDAO(private val db: Database) {
id: Long,
tanCode: String,
timestamp: Instant
- ): CashoutConfirmationResult = db.conn { conn ->
+ ): CashoutConfirmationResult = db.serializable { conn ->
val stmt = conn.prepareStatement("""
SELECT
out_no_op,
@@ -188,7 +188,7 @@ class CashoutDAO(private val db: Database) {
CONFLICT_ALREADY_CONFIRMED
}
- suspend fun delete(id: Long): CashoutDeleteResult = db.conn { conn ->
+ suspend fun delete(id: Long): CashoutDeleteResult = db.serializable { conn ->
val stmt = conn.prepareStatement("""
SELECT out_already_confirmed
FROM cashout_delete(?)
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/db/ConversionDAO.kt b/bank/src/main/kotlin/tech/libeufin/bank/db/ConversionDAO.kt
@@ -26,7 +26,7 @@ import java.util.concurrent.TimeUnit
import tech.libeufin.util.*
class ConversionDAO(private val db: Database) {
- suspend fun updateConfig(cfg: ConversionInfo) = db.conn {
+ suspend fun updateConfig(cfg: ConversionInfo) = db.serializable {
it.transaction { conn ->
var stmt = conn.prepareStatement("CALL config_set_amount(?, (?, ?)::taler_amount)")
for ((name, amount) in listOf(
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/db/Database.kt b/bank/src/main/kotlin/tech/libeufin/bank/db/Database.kt
@@ -33,8 +33,11 @@ import kotlinx.coroutines.flow.*
import kotlinx.coroutines.*
import com.zaxxer.hikari.*
import tech.libeufin.util.*
+import io.ktor.http.HttpStatusCode
+import net.taler.common.errorcodes.TalerErrorCode
private val logger: Logger = LoggerFactory.getLogger("tech.libeufin.bank.Database")
+private val SERIALIZATION_RETRY: Int = 10;
/**
* This error occurs in case the timestamp took by the bank for some
@@ -63,7 +66,7 @@ class Database(dbConfig: String, internal val bankCurrency: String, internal val
val pgSource = pgDataSource(dbConfig)
val config = HikariConfig();
config.dataSource = pgSource
- config.connectionInitSql = "SET search_path TO libeufin_bank;"
+ config.connectionInitSql = "SET search_path TO libeufin_bank;SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE;"
config.validate()
dbPool = HikariDataSource(config);
notifWatcher = NotificationWatcher(pgSource)
@@ -86,13 +89,31 @@ class Database(dbConfig: String, internal val bankCurrency: String, internal val
}
}
+
+ suspend fun <R> serializable(lambda: suspend (PgConnection) -> R): R = conn { conn ->
+ repeat(SERIALIZATION_RETRY) {
+ try {
+ return@conn lambda(conn);
+ } catch (e: SQLException) {
+ logger.error(e.message)
+ if (e.sqlState != "40001") // serialization_failure
+ throw e // rethrowing, not to hide other types of errors.
+ }
+ }
+ throw libeufinError(
+ HttpStatusCode.InternalServerError,
+ "Transaction serialization failure",
+ TalerErrorCode.BANK_SOFT_EXCEPTION
+ )
+ }
+
// CUSTOMERS
/**
* Deletes a customer (including its bank account row) from
* the database. The bank account gets deleted by the cascade.
*/
- suspend fun customerDeleteIfBalanceIsZero(login: String): CustomerDeletionResult = conn { conn ->
+ suspend fun customerDeleteIfBalanceIsZero(login: String): CustomerDeletionResult = serializable { conn ->
val stmt = conn.prepareStatement("""
SELECT
out_nx_customer,
@@ -138,7 +159,7 @@ class Database(dbConfig: String, internal val bankCurrency: String, internal val
expirationTime: Instant,
scope: TokenScope,
isRefreshable: Boolean
- ): Boolean = conn { conn ->
+ ): Boolean = serializable { conn ->
val bankCustomer = conn.prepareStatement("""
SELECT customer_id FROM customers WHERE login=?
""").run {
@@ -192,7 +213,7 @@ class Database(dbConfig: String, internal val bankCurrency: String, internal val
* if deletion succeeds or false if the token could not be
* deleted (= not found).
*/
- suspend fun bearerTokenDelete(token: ByteArray): Boolean = conn { conn ->
+ suspend fun bearerTokenDelete(token: ByteArray): Boolean = serializable { conn ->
val stmt = conn.prepareStatement("""
DELETE FROM bearer_tokens
WHERE content = ?
@@ -216,7 +237,7 @@ class Database(dbConfig: String, internal val bankCurrency: String, internal val
isTalerExchange: Boolean,
maxDebt: TalerAmount,
bonus: TalerAmount?
- ): CustomerCreationResult = conn { it ->
+ ): CustomerCreationResult = serializable { it ->
val now = Instant.now().toDbMicros() ?: throw faultyTimestampByBank();
it.transaction { conn ->
val idempotent = conn.prepareStatement("""
@@ -409,7 +430,7 @@ class Database(dbConfig: String, internal val bankCurrency: String, internal val
isTalerExchange: Boolean?,
debtLimit: TalerAmount?,
isAdmin: Boolean
- ): CustomerPatchResult = conn { conn ->
+ ): CustomerPatchResult = serializable { conn ->
val stmt = conn.prepareStatement("""
SELECT
out_not_found,
@@ -444,7 +465,7 @@ class Database(dbConfig: String, internal val bankCurrency: String, internal val
}
}
- suspend fun accountReconfigPassword(login: String, newPw: String, oldPw: String?): CustomerPatchAuthResult = conn {
+ suspend fun accountReconfigPassword(login: String, newPw: String, oldPw: String?): CustomerPatchAuthResult = serializable {
it.transaction { conn ->
val currentPwh = conn.prepareStatement("""
SELECT password_hash FROM customers WHERE login=?
@@ -650,7 +671,7 @@ class Database(dbConfig: String, internal val bankCurrency: String, internal val
subject: String,
amount: TalerAmount,
timestamp: Instant,
- ): Pair<BankTransactionResult, Long?> = conn { conn ->
+ ): Pair<BankTransactionResult, Long?> = serializable { conn ->
conn.transaction {
val stmt = conn.prepareStatement("""
SELECT
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/db/ExchangeDAO.kt b/bank/src/main/kotlin/tech/libeufin/bank/db/ExchangeDAO.kt
@@ -127,7 +127,7 @@ class ExchangeDAO(private val db: Database) {
req: TransferRequest,
username: String,
timestamp: Instant
- ): TransferResult = db.conn { conn ->
+ ): TransferResult = db.serializable { conn ->
val subject = OutgoingTxMetadata(req.wtid, req.exchange_base_url).encode()
val stmt = conn.prepareStatement("""
SELECT
@@ -195,7 +195,7 @@ class ExchangeDAO(private val db: Database) {
req: AddIncomingRequest,
username: String,
timestamp: Instant
- ): AddIncomingResult = db.conn { conn ->
+ ): AddIncomingResult = db.serializable { conn ->
val subject = IncomingTxMetadata(req.reserve_pub).encode()
val stmt = conn.prepareStatement("""
SELECT
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/db/WithdrawalDAO.kt b/bank/src/main/kotlin/tech/libeufin/bank/db/WithdrawalDAO.kt
@@ -58,7 +58,7 @@ class WithdrawalDAO(private val db: Database) {
walletAccountUsername: String,
uuid: UUID,
amount: TalerAmount
- ): WithdrawalCreationResult = db.conn { conn ->
+ ): WithdrawalCreationResult = db.serializable { conn ->
val stmt = conn.prepareStatement("""
SELECT
out_account_not_found,
@@ -147,7 +147,7 @@ class WithdrawalDAO(private val db: Database) {
* Aborts one Taler withdrawal, only if it wasn't previously
* confirmed. It returns false if the UPDATE didn't succeed.
*/
- suspend fun abort(uuid: UUID): AbortResult = db.conn { conn ->
+ suspend fun abort(uuid: UUID): AbortResult = db.serializable { conn ->
val stmt = conn.prepareStatement("""
UPDATE taler_withdrawal_operations
SET aborted = NOT confirmation_done
@@ -174,7 +174,7 @@ class WithdrawalDAO(private val db: Database) {
uuid: UUID,
exchangePayto: IbanPayTo,
reservePub: EddsaPublicKey
- ): Pair<WithdrawalSelectionResult, Boolean> = db.conn { conn ->
+ ): Pair<WithdrawalSelectionResult, Boolean> = db.serializable { conn ->
val subject = IncomingTxMetadata(reservePub).encode()
val stmt = conn.prepareStatement("""
SELECT
@@ -213,7 +213,7 @@ class WithdrawalDAO(private val db: Database) {
suspend fun confirm(
uuid: UUID,
now: Instant
- ): WithdrawalConfirmationResult = db.conn { conn ->
+ ): WithdrawalConfirmationResult = db.serializable { conn ->
val stmt = conn.prepareStatement("""
SELECT
out_no_op,
diff --git a/bank/src/test/kotlin/DatabaseTest.kt b/bank/src/test/kotlin/DatabaseTest.kt
@@ -30,6 +30,12 @@ import java.util.UUID
import java.util.concurrent.TimeUnit
import kotlin.experimental.inv
import kotlin.test.*
+import kotlinx.coroutines.*
+import io.ktor.http.*
+import io.ktor.client.plugins.*
+import io.ktor.client.request.*
+import io.ktor.client.statement.*
+import io.ktor.client.HttpClient
class DatabaseTest {
// Testing the helper that update conversion config
@@ -50,6 +56,33 @@ class DatabaseTest {
}
@Test
+ fun serialisation() = bankSetup {
+ assertBalance("customer", CreditDebitInfo.credit, "KUDOS:0")
+ assertBalance("merchant", CreditDebitInfo.credit, "KUDOS:0")
+ coroutineScope {
+ repeat(10) {
+ launch {
+ tx("customer", "KUDOS:0.$it", "merchant", "concurrent $it")
+ }
+ }
+ }
+ assertBalance("customer", CreditDebitInfo.debit, "KUDOS:4.5")
+ assertBalance("merchant", CreditDebitInfo.credit, "KUDOS:4.5")
+ coroutineScope {
+ repeat(5) {
+ launch {
+ tx("customer", "KUDOS:0.0$it", "merchant", "concurrent 0$it")
+ }
+ launch {
+ client.get("/accounts/merchant/transactions") {
+ basicAuth("merchant", "merchant-password")
+ }.assertOk()
+ }
+ }
+ }
+ }
+
+ @Test
fun challenge() = setup { db, _ -> db.conn { conn ->
val createStmt = conn.prepareStatement("SELECT challenge_create(?,?,?,?)")
val sendStmt = conn.prepareStatement("SELECT challenge_mark_sent(?,?,?)")