commit 11d4d41bf20465259d245c64d50c676308216976
parent e58faac730f96106cd04e5a28f6462c3a376421b
Author: Antoine A <>
Date: Fri, 13 Oct 2023 09:57:14 +0000
Don't block the server during JDBC calls
Diffstat:
6 files changed, 111 insertions(+), 105 deletions(-)
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/Authentication.kt b/bank/src/main/kotlin/tech/libeufin/bank/Authentication.kt
@@ -15,7 +15,7 @@ import tech.libeufin.util.getAuthorizationRawHeader
*
* Returns the authenticated customer, or null if they failed.
*/
-fun ApplicationCall.authenticateBankRequest(db: Database, requiredScope: TokenScope): Customer? {
+suspend fun ApplicationCall.authenticateBankRequest(db: Database, requiredScope: TokenScope): Customer? {
// Extracting the Authorization header.
val header = getAuthorizationRawHeader(this.request) ?: throw badRequest(
"Authorization header not found.",
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/Database.kt b/bank/src/main/kotlin/tech/libeufin/bank/Database.kt
@@ -211,9 +211,12 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
dbPool.close()
}
- private fun <R> conn(lambda: (PgConnection) -> R): R {
- val conn = dbPool.getConnection()
- return conn.use{ it -> lambda(it.unwrap(PgConnection::class.java)) }
+ private suspend fun <R> conn(lambda: suspend (PgConnection) -> R): R {
+ // Use a coroutine dispatcher that we can block as JDBC API is blocking
+ return withContext(Dispatchers.IO) {
+ val conn = dbPool.getConnection()
+ conn.use{ it -> lambda(it.unwrap(PgConnection::class.java)) }
+ }
}
// CUSTOMERS
@@ -225,7 +228,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
*
* In case of conflict, this method returns null.
*/
- fun customerCreate(customer: Customer): Long? = conn { conn ->
+ suspend fun customerCreate(customer: Customer): Long? = conn { conn ->
val stmt = conn.prepareStatement("""
INSERT INTO customers (
login
@@ -267,7 +270,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
* Deletes a customer (including its bank account row) from
* the database. The bank account gets deleted by the cascade.
*/
- fun customerDeleteIfBalanceIsZero(login: String): CustomerDeletionResult = conn { conn ->
+ suspend fun customerDeleteIfBalanceIsZero(login: String): CustomerDeletionResult = conn { conn ->
val stmt = conn.prepareStatement("""
SELECT
out_nx_customer,
@@ -286,7 +289,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
}
// Mostly used to get customers out of bearer tokens.
- fun customerGetFromRowId(customer_id: Long): Customer? = conn { conn ->
+ suspend fun customerGetFromRowId(customer_id: Long): Customer? = conn { conn ->
val stmt = conn.prepareStatement("""
SELECT
login,
@@ -314,7 +317,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
}
}
- fun customerChangePassword(customerName: String, passwordHash: String): Boolean = conn { conn ->
+ suspend fun customerChangePassword(customerName: String, passwordHash: String): Boolean = conn { conn ->
val stmt = conn.prepareStatement("""
UPDATE customers SET password_hash=? where login=?
""")
@@ -323,7 +326,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
stmt.executeUpdateCheck()
}
- fun customerGetFromLogin(login: String): Customer? = conn { conn ->
+ suspend fun customerGetFromLogin(login: String): Customer? = conn { conn ->
val stmt = conn.prepareStatement("""
SELECT
customer_id,
@@ -354,7 +357,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
// Possibly more "customerGetFrom*()" to come.
// BEARER TOKEN
- fun bearerTokenCreate(token: BearerToken): Boolean = conn { conn ->
+ suspend fun bearerTokenCreate(token: BearerToken): Boolean = conn { conn ->
val stmt = conn.prepareStatement("""
INSERT INTO bearer_tokens
(content,
@@ -374,7 +377,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
stmt.setBoolean(6, token.isRefreshable)
stmt.executeUpdateViolation()
}
- fun bearerTokenGet(token: ByteArray): BearerToken? = conn { conn ->
+ suspend fun bearerTokenGet(token: ByteArray): BearerToken? = conn { conn ->
val stmt = conn.prepareStatement("""
SELECT
expiration_time,
@@ -406,7 +409,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
* if deletion succeeds or false if the token could not be
* deleted (= not found).
*/
- fun bearerTokenDelete(token: ByteArray): Boolean = conn { conn ->
+ suspend fun bearerTokenDelete(token: ByteArray): Boolean = conn { conn ->
val stmt = conn.prepareStatement("""
DELETE FROM bearer_tokens
WHERE content = ?
@@ -432,7 +435,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
* The return type expresses either success, or that the target rows
* could not be found.
*/
- fun accountReconfig(
+ suspend fun accountReconfig(
login: String,
name: String?,
cashoutPayto: String?,
@@ -473,7 +476,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
*
* Returns an empty list, if no public account was found.
*/
- fun accountsGetPublic(internalCurrency: String, loginFilter: String = "%"): List<PublicAccount> = conn { conn ->
+ suspend fun accountsGetPublic(internalCurrency: String, loginFilter: String = "%"): List<PublicAccount> = conn { conn ->
val stmt = conn.prepareStatement("""
SELECT
(balance).val AS balance_val,
@@ -512,7 +515,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
* LIKE operator. If it's null, it defaults to the "%" wildcard, meaning
* that it returns ALL the existing accounts.
*/
- fun accountsGetForAdmin(nameFilter: String = "%"): List<AccountMinimalData> = conn { conn ->
+ suspend fun accountsGetForAdmin(nameFilter: String = "%"): List<AccountMinimalData> = conn { conn ->
val stmt = conn.prepareStatement("""
SELECT
login,
@@ -559,7 +562,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
* row ID in the successful case. If of unique constrain violation,
* it returns null and any other error will be thrown as 500.
*/
- fun bankAccountCreate(bankAccount: BankAccount): Long? = conn { conn ->
+ suspend fun bankAccountCreate(bankAccount: BankAccount): Long? = conn { conn ->
if (bankAccount.balance != null)
throw internalServerError(
"Do not pass a balance upon bank account creation, do a wire transfer instead."
@@ -598,7 +601,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
}
}
- fun bankAccountSetMaxDebt(
+ suspend fun bankAccountSetMaxDebt(
owningCustomerId: Long,
maxDebt: TalerAmount
): Boolean = conn { conn ->
@@ -617,7 +620,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
return bankCurrency
}
- fun bankAccountGetFromOwnerId(ownerId: Long): BankAccount? = conn { conn ->
+ suspend fun bankAccountGetFromOwnerId(ownerId: Long): BankAccount? = conn { conn ->
val stmt = conn.prepareStatement("""
SELECT
internal_payto_uri
@@ -665,7 +668,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
val internalPaytoUri: String
)
- fun bankAccountInfoFromCustomerLogin(login: String): BankInfo? = conn { conn ->
+ suspend fun bankAccountInfoFromCustomerLogin(login: String): BankInfo? = conn { conn ->
val stmt = conn.prepareStatement("""
SELECT
bank_account_id
@@ -686,7 +689,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
}
}
- fun bankAccountGetFromInternalPayto(internalPayto: String): BankAccount? = conn { conn ->
+ suspend fun bankAccountGetFromInternalPayto(internalPayto: String): BankAccount? = conn { conn ->
val stmt = conn.prepareStatement("""
SELECT
bank_account_id
@@ -728,7 +731,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
// BANK ACCOUNT TRANSACTIONS
- fun bankTransactionCreate(
+ suspend fun bankTransactionCreate(
tx: BankInternalTransaction
): BankTransactionResult = conn { conn ->
conn.transaction {
@@ -828,7 +831,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
*
* Returns the row ID if found, null otherwise.
*/
- fun bankTransactionCheckExists(subject: String): Long? = conn { conn ->
+ suspend fun bankTransactionCheckExists(subject: String): Long? = conn { conn ->
val stmt = conn.prepareStatement("""
SELECT bank_transaction_id
FROM bank_account_transactions
@@ -839,7 +842,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
}
// Get the bank transaction whose row ID is rowId
- fun bankTransactionGetFromInternalId(rowId: Long): BankAccountTransaction? = conn { conn ->
+ suspend fun bankTransactionGetFromInternalId(rowId: Long): BankAccountTransaction? = conn { conn ->
val stmt = conn.prepareStatement("""
SELECT
creditor_payto_uri
@@ -902,69 +905,70 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
val nbTx = abs(params.delta) // Number of transaction to query
// Range of transaction ids to check
var (min, max) = if (backward) Pair(0L, params.start) else Pair(params.start, Long.MAX_VALUE)
-
- return dbPool.getConnection().use { conn ->
- // Prepare statement
- val stmt = conn.prepareStatement("""
- $query
- WHERE bank_account_id=? AND
- bank_transaction_id > ? AND bank_transaction_id < ?
- ORDER BY bank_transaction_id ${if (backward) "DESC" else "ASC"}
- LIMIT ?
- """)
- stmt.setLong(1, bankAccountId)
-
- fun load(amount: Int): List<T> {
+ val query = """
+ $query
+ WHERE bank_account_id=? AND
+ bank_transaction_id > ? AND bank_transaction_id < ?
+ ORDER BY bank_transaction_id ${if (backward) "DESC" else "ASC"}
+ LIMIT ?
+ """
+
+ suspend fun load(amount: Int): List<T> = conn { conn ->
+ conn.prepareStatement(query).use { stmt ->
+ stmt.setLong(1, bankAccountId)
stmt.setLong(2, min)
stmt.setLong(3, max)
stmt.setInt(4, amount)
- return stmt.all {
+ stmt.all {
// Remember not to check this transaction again
min = kotlin.math.max(it.getLong("bank_transaction_id"), min)
map(it)
}
}
+ }
- val shoudPoll = when {
- params.poll_ms <= 0 -> false
- backward -> {
- val maxId = conn.prepareStatement("SELECT MAX(bank_transaction_id) FROM bank_account_transactions")
- .oneOrNull { it.getLong(1) } ?: 0;
- // Check if a new transaction could appear within the chosen interval
- max > maxId + 1
- }
- else -> true
+ val shoudPoll = when {
+ params.poll_ms <= 0 -> false
+ backward && max == Long.MAX_VALUE -> true
+ backward -> {
+ val maxId = conn {
+ it.prepareStatement("SELECT MAX(bank_transaction_id) FROM bank_account_transactions")
+ .oneOrNull { it.getLong(1) } ?: 0
+ };
+ // Check if a new transaction could appear within the chosen interval
+ max > maxId + 1
}
+ else -> true
+ }
- if (shoudPoll) {
- var history = listOf<T>()
- notifWatcher.(listen)(bankAccountId) { flow ->
- // Start buffering notification before loading transactions to not miss any
- val buffered = flow.buffer()
- // Initial load
- history += load(nbTx)
- // Long polling if transactions are missing
- val missing = nbTx - history.size
- if (missing > 0) {
- withTimeoutOrNull(params.poll_ms) {
- buffered
- .filter { it.rowId > min } // Skip transactions already checked
- .take(missing).count() // Wait for missing transactions
- }
+ if (shoudPoll) {
+ var history = listOf<T>()
+ notifWatcher.(listen)(bankAccountId) { flow ->
+ // Start buffering notification before loading transactions to not miss any
+ val buffered = flow.buffer()
+ // Initial load
+ history += load(nbTx)
+ // Long polling if transactions are missing
+ val missing = nbTx - history.size
+ if (missing > 0) {
+ withTimeoutOrNull(params.poll_ms) {
+ buffered
+ .filter { it.rowId > min } // Skip transactions already checked
+ .take(missing).count() // Wait for missing transactions
+ }
- if (backward) {
- // When going backward, we could find more transactions than we need
- history = (load(nbTx) + history).take(nbTx)
- } else {
- // Only load missing ones
- history += load(missing)
- }
+ if (backward) {
+ // When going backward, we could find more transactions than we need
+ history = (load(nbTx) + history).take(nbTx)
+ } else {
+ // Only load missing ones
+ history += load(missing)
}
}
- history
- } else {
- load(nbTx)
}
+ return history
+ } else {
+ return load(nbTx)
}
}
@@ -1081,7 +1085,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
* moment, only the TWG uses the direction, to provide the /incoming
* and /outgoing endpoints.
*/
- fun bankTransactionGetHistory(
+ suspend fun bankTransactionGetHistory(
start: Long,
delta: Int,
bankAccountId: Long
@@ -1140,7 +1144,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
}
// WITHDRAWALS
- fun talerWithdrawalCreate(
+ suspend fun talerWithdrawalCreate(
opUUID: UUID,
walletBankAccount: Long,
amount: TalerAmount
@@ -1157,7 +1161,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
stmt.setInt(4, amount.frac)
stmt.executeUpdateViolation()
}
- fun talerWithdrawalGet(opUUID: UUID): TalerWithdrawalOperation? = conn { conn ->
+ suspend fun talerWithdrawalGet(opUUID: UUID): TalerWithdrawalOperation? = conn { conn ->
val stmt = conn.prepareStatement("""
SELECT
(amount).val as amount_val
@@ -1195,7 +1199,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
* Aborts one Taler withdrawal, only if it wasn't previously
* confirmed. It returns false if the UPDATE didn't succeed.
*/
- fun talerWithdrawalAbort(opUUID: UUID): Boolean = conn { conn ->
+ suspend fun talerWithdrawalAbort(opUUID: UUID): Boolean = conn { conn ->
val stmt = conn.prepareStatement("""
UPDATE taler_withdrawal_operations
SET aborted = true
@@ -1214,7 +1218,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
*
* Checking for idempotency is entirely on the Kotlin side.
*/
- fun talerWithdrawalSetDetails(
+ suspend fun talerWithdrawalSetDetails(
opUuid: UUID,
exchangePayto: String,
reservePub: String
@@ -1235,7 +1239,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
* Confirms a Taler withdrawal: flags the operation as
* confirmed and performs the related wire transfer.
*/
- fun talerWithdrawalConfirm(
+ suspend fun talerWithdrawalConfirm(
opUuid: UUID,
timestamp: Instant,
accountServicerReference: String = "NOT-USED",
@@ -1272,7 +1276,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
/**
* Creates a cashout operation in the database.
*/
- fun cashoutCreate(op: Cashout): Boolean = conn { conn ->
+ suspend fun cashoutCreate(op: Cashout): Boolean = conn { conn ->
val stmt = conn.prepareStatement("""
INSERT INTO cashout_operations (
cashout_uuid
@@ -1333,7 +1337,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
* payment should already have taken place, before calling
* this function.
*/
- fun cashoutConfirm(
+ suspend fun cashoutConfirm(
opUuid: UUID,
tanConfirmationTimestamp: Long,
bankTransaction: Long // regional payment backing the operation
@@ -1360,7 +1364,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
/**
* Deletes a cashout operation from the database.
*/
- fun cashoutDelete(opUuid: UUID): CashoutDeleteResult = conn { conn ->
+ suspend fun cashoutDelete(opUuid: UUID): CashoutDeleteResult = conn { conn ->
val stmt = conn.prepareStatement("""
SELECT out_already_confirmed
FROM cashout_delete(?)
@@ -1379,7 +1383,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
* Gets a cashout operation from the database, according
* to its uuid.
*/
- fun cashoutGetFromUuid(opUuid: UUID): Cashout? = conn { conn ->
+ suspend fun cashoutGetFromUuid(opUuid: UUID): Cashout? = conn { conn ->
val stmt = conn.prepareStatement("""
SELECT
(amount_debit).val as amount_debit_val
@@ -1490,7 +1494,7 @@ class Database(dbConfig: String, private val bankCurrency: String): java.io.Clos
* is the same returned by "bank_wire_transfer()" where however
* the NO_DEBTOR error will hardly take place.
*/
- fun talerTransferCreate(
+ suspend fun talerTransferCreate(
req: TransferRequest,
username: String,
timestamp: Instant,
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/Main.kt b/bank/src/main/kotlin/tech/libeufin/bank/Main.kt
@@ -47,8 +47,7 @@ import io.ktor.server.response.*
import io.ktor.server.routing.*
import io.ktor.utils.io.*
import io.ktor.utils.io.jvm.javaio.*
-import kotlinx.coroutines.Dispatchers
-import kotlinx.coroutines.withContext
+import kotlinx.coroutines.*
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.KSerializer
import kotlinx.serialization.descriptors.*
@@ -437,8 +436,10 @@ class ServeBank : CliktCommand("Run libeufin-bank HTTP server", name = "serve")
val servePortLong = config.requireNumber("libeufin-bank", "port")
val servePort = servePortLong.toInt()
val db = Database(dbConnStr, ctx.currency)
- if (!maybeCreateAdminAccount(db, ctx)) // logs provided by the helper
- exitProcess(1)
+ runBlocking {
+ if (!maybeCreateAdminAccount(db, ctx)) // logs provided by the helper
+ exitProcess(1)
+ }
embeddedServer(Netty, port = servePort) {
corebankWebApp(db, ctx)
}.start(wait = true)
@@ -466,14 +467,16 @@ class ChangePw : CliktCommand("Change account password", name = "passwd") {
val dbConnStr = config.requireString("libeufin-bankdb-postgres", "config")
config.requireNumber("libeufin-bank", "port")
val db = Database(dbConnStr, ctx.currency)
- if (!maybeCreateAdminAccount(db, ctx)) // logs provided by the helper
+ runBlocking {
+ if (!maybeCreateAdminAccount(db, ctx)) // logs provided by the helper
exitProcess(1)
- if (!db.customerChangePassword(account, CryptoUtil.hashpw(password))) {
- println("password change failed")
- exitProcess(1)
- } else {
- println("password change succeeded")
+ if (!db.customerChangePassword(account, CryptoUtil.hashpw(password))) {
+ println("password change failed")
+ exitProcess(1)
+ } else {
+ println("password change succeeded")
+ }
}
}
}
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/helpers.kt b/bank/src/main/kotlin/tech/libeufin/bank/helpers.kt
@@ -58,7 +58,7 @@ fun ApplicationCall.getAuthToken(): String? {
* Performs the HTTP basic authentication. Returns the
* authenticated customer on success, or null otherwise.
*/
-fun doBasicAuth(db: Database, encodedCredentials: String): Customer? {
+suspend fun doBasicAuth(db: Database, encodedCredentials: String): Customer? {
val plainUserAndPass = String(base64ToBytes(encodedCredentials), Charsets.UTF_8) // :-separated
val userAndPassSplit = plainUserAndPass.split(
":",
@@ -96,7 +96,7 @@ private fun splitBearerToken(tok: String): String? {
/* Performs the secret-token authentication. Returns the
* authenticated customer on success, null otherwise. */
-fun doTokenAuth(
+suspend fun doTokenAuth(
db: Database,
token: String,
requiredScope: TokenScope,
@@ -268,7 +268,7 @@ fun getWithdrawalConfirmUrl(
* if the query param doesn't parse into a UUID. Currently
* used by the Taler Web/SPA and Integration API handlers.
*/
-fun getWithdrawal(db: Database, opIdParam: String): TalerWithdrawalOperation {
+suspend fun getWithdrawal(db: Database, opIdParam: String): TalerWithdrawalOperation {
val opId = try {
UUID.fromString(opIdParam)
} catch (e: Exception) {
@@ -327,7 +327,7 @@ fun getHistoryParams(params: Parameters): HistoryParams {
*
* It returns false in case of problems, true otherwise.
*/
-fun maybeCreateAdminAccount(db: Database, ctx: BankApplicationContext): Boolean {
+suspend fun maybeCreateAdminAccount(db: Database, ctx: BankApplicationContext): Boolean {
val maybeAdminCustomer = db.customerGetFromLogin("admin")
val adminCustomerId: Long = if (maybeAdminCustomer == null) {
logger.debug("Creating admin's customer row")
diff --git a/bank/src/test/kotlin/TalerApiTest.kt b/bank/src/test/kotlin/TalerApiTest.kt
@@ -84,7 +84,7 @@ class TalerApiTest {
).assertSuccess()
}
- fun commonSetup(lambda: (Database, BankApplicationContext) -> Unit) {
+ fun commonSetup(lambda: suspend (Database, BankApplicationContext) -> Unit) {
setup { db, ctx ->
// Creating the exchange and merchant accounts first.
assertNotNull(db.customerCreate(customerFoo))
diff --git a/bank/src/test/kotlin/helpers.kt b/bank/src/test/kotlin/helpers.kt
@@ -1,11 +1,8 @@
import io.ktor.http.*
import io.ktor.client.statement.*
import io.ktor.client.request.*
-import kotlinx.serialization.json.Json
-import kotlinx.serialization.json.JsonObjectBuilder
-import kotlinx.serialization.json.JsonObject
-import kotlinx.serialization.json.JsonElement
-import kotlinx.serialization.json.JsonPrimitive
+import kotlinx.coroutines.*
+import kotlinx.serialization.json.*
import net.taler.wallet.crypto.Base32Crockford
import kotlin.test.assertEquals
import tech.libeufin.bank.*
@@ -16,7 +13,7 @@ import java.util.zip.DeflaterOutputStream
fun setup(
conf: String = "test.conf",
- lambda: (Database, BankApplicationContext) -> Unit
+ lambda: suspend (Database, BankApplicationContext) -> Unit
){
val config = TalerConfig(BANK_CONFIG_SOURCE)
config.load("conf/$conf")
@@ -26,11 +23,13 @@ fun setup(
initializeDatabaseTables(dbConnStr, sqlPath)
val ctx = BankApplicationContext.readFromConfig(config)
Database(dbConnStr, ctx.currency).use {
- lambda(it, ctx)
+ runBlocking {
+ lambda(it, ctx)
+ }
}
}
-fun setupDb(lambda: (Database) -> Unit) {
+fun setupDb(lambda: suspend (Database) -> Unit) {
setup() { db, _ -> lambda(db) }
}