From d446df589a1ce8f2364c933a839c437783700e09 Mon Sep 17 00:00:00 2001 From: ms Date: Wed, 22 Sep 2021 14:32:51 +0200 Subject: Improve the 409 Conflict detection, address DB concurrency. The conflict now happens only if under the same withdraw operation ID the wallet tries to select two different exchanges or reserve public keys. As of DB concurrency, there is now one thread (named "DB") that should run all the database operations, in order to avoid conflicts on the disk. At this moment, and mostly to see where the current implementation fails with regard to concurrent DB access, not all the database operations were migrated into such thread. --- .../src/main/kotlin/tech/libeufin/sandbox/DB.kt | 4 +++ .../src/main/kotlin/tech/libeufin/sandbox/Main.kt | 35 +++++++++++++++++----- .../kotlin/tech/libeufin/sandbox/bankAccount.kt | 10 +++++-- 3 files changed, 39 insertions(+), 10 deletions(-) diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt b/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt index f1fbe561..ef6d3704 100644 --- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt +++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/DB.kt @@ -422,6 +422,8 @@ object TalerWithdrawalsTable : LongIdTable() { * the payment arrived at the exchange's bank yet. */ val transferDone = bool("transferDone").default(false) + val reservePub = text("reservePub").nullable() + val selectedExchangePayto = text("selectedExchangePayto").nullable() } class TalerWithdrawalEntity(id: EntityID) : LongEntity(id) { @@ -429,6 +431,8 @@ class TalerWithdrawalEntity(id: EntityID) : LongEntity(id) { var wopid by TalerWithdrawalsTable.wopid var selectionDone by TalerWithdrawalsTable.selectionDone var transferDone by TalerWithdrawalsTable.transferDone + var reservePub by TalerWithdrawalsTable.reservePub + var selectedExchangePayto by TalerWithdrawalsTable.selectedExchangePayto } object BankAccountReportsTable : IntIdTable() { diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/Main.kt b/sandbox/src/main/kotlin/tech/libeufin/sandbox/Main.kt index bc826211..ad90770e 100644 --- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/Main.kt +++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/Main.kt @@ -70,6 +70,8 @@ import io.ktor.http.* import io.ktor.http.content.* import io.ktor.request.* import io.ktor.util.date.* +import kotlinx.coroutines.newSingleThreadContext +import org.jetbrains.exposed.sql.transactions.experimental.newSuspendedTransaction import tech.libeufin.util.* import tech.libeufin.util.ebics_h004.EbicsResponse import tech.libeufin.util.ebics_h004.EbicsTypes @@ -364,6 +366,8 @@ suspend inline fun ApplicationCall.receiveJson(): T { } } +val singleThreadContext = newSingleThreadContext("DB") + fun serverMain(dbName: String, port: Int) { execThrowableOrTerminate { dbCreateTables(dbName) } val myLogger = logger @@ -983,6 +987,7 @@ fun serverMain(dbName: String, port: Int) { // At this point, the three actors exist and a new withdraw operation can be created. TalerWithdrawalEntity.new { // wopid is autogenerated, and momentarily the only column + } } /** @@ -1042,30 +1047,46 @@ fun serverMain(dbName: String, port: Int) { val wopid: String = ensureNonNull(call.parameters["wopid"]) val body = call.receiveJson() - transaction { + newSuspendedTransaction(context = singleThreadContext) { var wo = TalerWithdrawalEntity.find { TalerWithdrawalsTable.wopid eq UUID.fromString(wopid) }.firstOrNull() ?: throw SandboxError( HttpStatusCode.NotFound, "Withdrawal operation $wopid not found." ) - if (wo.transferDone) { - throw SandboxError( + if (wo.selectionDone) { + if (wo.transferDone) { + logger.info("Wallet performs again this operation that was paid out earlier: idempotent") + return@newSuspendedTransaction + } + // reservePub+exchange selected but not payed: check consistency + if (body.reserve_pub != wo.reservePub) throw SandboxError( HttpStatusCode.Conflict, - "This withdraw operation was already funded. Aborting" + "Selecting a different reserve from the one already selected" + ) + if (body.selected_exchange != wo.selectedExchangePayto) throw SandboxError( + HttpStatusCode.Conflict, + "Selecting a different exchange from the one already selected" ) } - if (wo.selectionDone) { - logger.warn("This withdraw operation was already confirmed, but not funded. Trying again") - } + // here only if (1) no selection done or (2) _only_ selection done: + // both ways no transfer must have happened. + SandboxAssert(!wo.transferDone, "Sandbox allowed paid but unselected reserve") + wireTransfer( "sandbox-account-customer", "sandbox-account-exchange", "$currencyEnv:5", body.reserve_pub ) + wo.reservePub = body.reserve_pub + wo.selectedExchangePayto = body.selected_exchange wo.selectionDone = true wo.transferDone = true } + /** + * NOTE: is this always guaranteed to run AFTER the suspended + * transaction block above? + */ call.respond(object { val transfer_done = true }) diff --git a/sandbox/src/main/kotlin/tech/libeufin/sandbox/bankAccount.kt b/sandbox/src/main/kotlin/tech/libeufin/sandbox/bankAccount.kt index 0a432451..3d5236a7 100644 --- a/sandbox/src/main/kotlin/tech/libeufin/sandbox/bankAccount.kt +++ b/sandbox/src/main/kotlin/tech/libeufin/sandbox/bankAccount.kt @@ -1,14 +1,14 @@ package tech.libeufin.sandbox import io.ktor.http.* -import org.apache.http.HttpStatus import org.jetbrains.exposed.sql.and +import org.jetbrains.exposed.sql.transactions.experimental.newSuspendedTransaction +import org.jetbrains.exposed.sql.transactions.experimental.suspendedTransactionAsync import org.jetbrains.exposed.sql.transactions.transaction import org.slf4j.Logger import org.slf4j.LoggerFactory import tech.libeufin.util.* import java.math.BigDecimal -import kotlin.system.exitProcess private val logger: Logger = LoggerFactory.getLogger("tech.libeufin.sandbox") @@ -117,12 +117,16 @@ fun historyForAccount(bankAccount: BankAccountEntity): MutableList { return history } +/** + * https://github.com/JetBrains/Exposed/wiki/Transactions#working-with-coroutines + * https://medium.com/androiddevelopers/threading-models-in-coroutines-and-android-sqlite-api-6cab11f7eb90 + */ fun wireTransfer( debitAccount: String, creditAccount: String, amount: String, subjectArg: String ) { - // check accounts exist transaction { + // check accounts exist val credit = BankAccountEntity.find { BankAccountsTable.label eq creditAccount }.firstOrNull() ?: run { -- cgit v1.2.3