commit 6ae01a8559757282663af4584e79fe81ecdefcb0
parent 9be42f48b9aa05f87e4e53ae3844af79b9dee33e
Author: Antoine A <>
Date: Thu, 7 Nov 2024 19:11:24 +0100
bank: fix serialization test and improve db procedure perf with explicit locking
Diffstat:
5 files changed, 94 insertions(+), 40 deletions(-)
diff --git a/bank/src/test/kotlin/DatabaseTest.kt b/bank/src/test/kotlin/DatabaseTest.kt
@@ -18,8 +18,7 @@
*/
import io.ktor.http.*
-import kotlinx.coroutines.coroutineScope
-import kotlinx.coroutines.launch
+import kotlinx.coroutines.*
import org.junit.Test
import tech.libeufin.bank.createAdminAccount
import tech.libeufin.bank.db.AccountDAO.AccountCreationResult
@@ -51,6 +50,10 @@ class DatabaseTest {
@Test
fun serialisation() = bankSetup { db ->
+ client.cachedToken("customer")
+ client.cachedToken("merchant")
+ client.cachedToken("admin")
+
assertBalance("customer", "+KUDOS:0")
assertBalance("merchant", "+KUDOS:0")
@@ -66,19 +69,42 @@ class DatabaseTest {
assertBalance("merchant", "+KUDOS:4.5")
// Generate concurrent write and read transactions and check that they only write transaction sometimes fails
+ var failures = 0
coroutineScope {
repeat(100) {
- // Write transaction
+ // Write transaction from merchant to customer
launch {
while (true) {
val result = client.postA("/accounts/customer/transactions") {
json {
- "payto_uri" to "$merchantPayto?message=${"concurrent 0$it".encodeURLParameter()}&amount=KUDOS:0.0$it"
+ "payto_uri" to "$merchantPayto?message=${"concurrent customer 0$it".encodeURLParameter()}&amount=KUDOS:0.0$it"
+ }
+ }
+ if (result.status == HttpStatusCode.InternalServerError) {
+ val body = result.json<TalerError>()
+ assertEquals(TalerErrorCode.BANK_SOFT_EXCEPTION.code, body.code)
+ failures += 1
+ delay(10)
+ continue // retry
+ } else {
+ result.assertOk()
+ break
+ }
+ }
+ }
+ // Write transactions from customer to merchant
+ launch {
+ while (true) {
+ val result = client.postA("/accounts/merchant/transactions") {
+ json {
+ "payto_uri" to "$customerPayto?message=${"concurrent merchant 0$it".encodeURLParameter()}&amount=KUDOS:0.0$it"
}
}
if (result.status == HttpStatusCode.InternalServerError) {
val body = result.json<TalerError>()
assertEquals(TalerErrorCode.BANK_SOFT_EXCEPTION.code, body.code)
+ failures += 1
+ delay(10)
continue // retry
} else {
result.assertOk()
@@ -104,8 +130,9 @@ class DatabaseTest {
}
}
}
- assertBalance("customer", "-KUDOS:9.855")
- assertBalance("merchant", "+KUDOS:9.855")
+ assert(failures > 0) { "Write operation must sometimes fail under contention" }
+ assertBalance("customer", "-KUDOS:4.5")
+ assertBalance("merchant", "+KUDOS:4.5")
}
@Test
diff --git a/common/src/main/kotlin/api/server.kt b/common/src/main/kotlin/api/server.kt
@@ -39,6 +39,7 @@ import org.postgresql.util.PSQLState
import org.slf4j.Logger
import org.slf4j.event.Level
import tech.libeufin.common.*
+import tech.libeufin.common.db.SERIALIZATION_ERROR
import java.net.InetAddress
import java.sql.SQLException
import java.util.zip.DataFormatException
@@ -159,13 +160,14 @@ fun Application.talerApi(logger: Logger, routes: Routing.() -> Unit) {
when (cause) {
is ApiException -> call.err(cause)
is SQLException -> {
- when (cause.sqlState) {
- PSQLState.SERIALIZATION_FAILURE.state -> call.err(
+ if (SERIALIZATION_ERROR.contains(cause.sqlState)) {
+ call.err(
HttpStatusCode.InternalServerError,
"Transaction serialization failure",
TalerErrorCode.BANK_SOFT_EXCEPTION
)
- else -> call.err(
+ } else {
+ call.err(
HttpStatusCode.InternalServerError,
"Unexpected sql error with state ${cause.sqlState}",
TalerErrorCode.BANK_UNMANAGED_EXCEPTION
diff --git a/common/src/main/kotlin/db/transaction.kt b/common/src/main/kotlin/db/transaction.kt
@@ -30,14 +30,19 @@ import java.sql.SQLException
internal val logger: Logger = LoggerFactory.getLogger("libeufin-db")
+val SERIALIZATION_ERROR = setOf(
+ "40001", //serialization_failure
+ "40P01", // deadlock_detected
+ "55P03", // lock_not_available
+)
+
/** Executes db logic with automatic retry on serialization errors */
suspend fun <R> retrySerializationError(lambda: suspend () -> R): R {
repeat(SERIALIZATION_RETRY) {
try {
return lambda()
} catch (e: SQLException) {
- if (e.sqlState != PSQLState.SERIALIZATION_FAILURE.state)
- throw e
+ if (!SERIALIZATION_ERROR.contains(e.sqlState)) throw e
}
}
return lambda()
diff --git a/common/src/main/kotlin/test/helpers.kt b/common/src/main/kotlin/test/helpers.kt
@@ -137,6 +137,7 @@ private fun HttpRequestBuilder.extractUsername(username: String? = null): String
else -> null
}
+/** Authenticate a request for [username] with basic auth */
fun HttpRequestBuilder.pwAuth(username: String? = null) {
val username = extractUsername(username) ?: return
basicAuth("$username", "$username-password")
@@ -144,13 +145,12 @@ fun HttpRequestBuilder.pwAuth(username: String? = null) {
val globalTestTokens = mutableMapOf<String, String>()
-suspend fun HttpRequestBuilder.tokenAuth(client: HttpClient, username: String? = null) {
- // Get username from arg or path
- val username = extractUsername(username) ?: return
+/** Get cached token or create it */
+suspend fun HttpClient.cachedToken(username: String): String {
// Get cached token or create it
var token = globalTestTokens.get(username)
if (token == null) {
- val response = client.post("/accounts/$username/token") {
+ val response = this.post("/accounts/$username/token") {
pwAuth()
json {
"scope" to "readwrite"
@@ -162,6 +162,15 @@ suspend fun HttpRequestBuilder.tokenAuth(client: HttpClient, username: String? =
token = Json.decodeFromJsonElement<String>(response.get("access_token")!!)
globalTestTokens.set(username, token)
}
+ return token
+}
+
+/** Authenticate a request for [username] with a bearer token */
+suspend fun HttpRequestBuilder.tokenAuth(client: HttpClient, username: String? = null) {
+ // Get username from arg or path
+ val username = extractUsername(username) ?: return
+ // Get cached token or create it
+ var token = client.cachedToken(username)
// Set authorization header
headers[HttpHeaders.Authorization] = "Bearer $token"
}
\ No newline at end of file
diff --git a/database-versioning/libeufin-bank-procedures.sql b/database-versioning/libeufin-bank-procedures.sql
@@ -205,6 +205,7 @@ CREATE FUNCTION bank_wire_transfer(
)
LANGUAGE plpgsql AS $$
DECLARE
+has_fee BOOLEAN;
amount_with_fee taler_amount;
admin_account_id INT8;
admin_has_debt BOOLEAN;
@@ -222,7 +223,6 @@ creditor_payto TEXT;
creditor_name TEXT;
tmp_balance taler_amount;
BEGIN
-
-- Check min and max
SELECT (SELECT in_min_amount IS NOT NULL AND NOT ok FROM amount_left_minus_right(in_amount, in_min_amount)) OR
(SELECT in_max_amount IS NOT NULL AND NOT ok FROM amount_left_minus_right(in_max_amount, in_amount))
@@ -231,6 +231,25 @@ IF out_bad_amount THEN
RETURN;
END IF;
+has_fee = in_wire_transfer_fees IS NOT NULL AND in_wire_transfer_fees != (0, 0)::taler_amount;
+IF has_fee THEN
+ -- Retrieve admin info
+ SELECT
+ bank_account_id, has_debt,
+ (balance).val, (balance).frac,
+ internal_payto, customers.name
+ INTO
+ admin_account_id, admin_has_debt,
+ admin_balance.val, admin_balance.frac,
+ admin_payto, admin_name
+ FROM bank_accounts
+ JOIN customers ON customer_id=owning_customer_id
+ WHERE username = 'admin'
+ FOR UPDATE;
+ IF NOT FOUND THEN
+ RAISE EXCEPTION 'No admin';
+ END IF;
+END IF;
-- Retrieve debtor info
SELECT
has_debt,
@@ -244,7 +263,8 @@ SELECT
debtor_payto, debtor_name
FROM bank_accounts
JOIN customers ON customer_id=owning_customer_id
- WHERE bank_account_id=in_debtor_account_id;
+ WHERE bank_account_id=in_debtor_account_id
+ FOR UPDATE;
IF NOT FOUND THEN
RAISE EXCEPTION 'Unknown debtor %', in_debtor_account_id;
END IF;
@@ -259,32 +279,19 @@ SELECT
creditor_payto, creditor_name
FROM bank_accounts
JOIN customers ON customer_id=owning_customer_id
- WHERE bank_account_id=in_creditor_account_id;
+ WHERE bank_account_id=in_creditor_account_id
+ FOR UPDATE NOWAIT;
IF NOT FOUND THEN
RAISE EXCEPTION 'Unknown creditor %', in_creditor_account_id;
END IF;
--- Retrieve admin info
-SELECT
- bank_account_id, has_debt,
- (balance).val, (balance).frac,
- internal_payto, customers.name
- INTO
- admin_account_id, admin_has_debt,
- admin_balance.val, admin_balance.frac,
- admin_payto, admin_name
- FROM bank_accounts
- JOIN customers ON customer_id=owning_customer_id
- WHERE username = 'admin';
-IF NOT FOUND THEN
- RAISE EXCEPTION 'No admin';
-END IF;
-- Add fees to the amount
-IF in_wire_transfer_fees IS NOT NULL AND in_wire_transfer_fees != (0, 0)::taler_amount AND admin_account_id != in_debtor_account_id THEN
+IF has_fee AND admin_account_id != in_debtor_account_id THEN
SELECT sum.val, sum.frac
INTO amount_with_fee.val, amount_with_fee.frac
FROM amount_add(in_amount, in_wire_transfer_fees) as sum;
ELSE
+ has_fee=false;
amount_with_fee = in_amount;
END IF;
@@ -366,7 +373,7 @@ END IF;
-- Here we figure out whether the administrator would switch
-- from debit to a credit situation, and adjust the balance
-- accordingly.
-IF amount_with_fee != in_amount THEN
+IF has_fee THEN
IF NOT admin_has_debt THEN -- easy case.
SELECT sum.val, sum.frac
INTO admin_balance.val, admin_balance.frac
@@ -452,7 +459,7 @@ SET
WHERE bank_account_id=in_creditor_account_id;
-- Fee part
-IF amount_with_fee != in_amount THEN
+IF has_fee THEN
INSERT INTO bank_account_transactions (
creditor_payto
,creditor_name
@@ -520,7 +527,8 @@ SELECT
,out_balance_not_zero
FROM customers
JOIN bank_accounts ON owning_customer_id = customer_id
- WHERE username = in_username AND deleted_at IS NULL;
+ WHERE username = in_username AND deleted_at IS NULL
+ FOR UPDATE;
IF NOT FOUND OR out_balance_not_zero OR out_tan_required THEN
out_not_found=NOT FOUND;
RETURN;
@@ -993,7 +1001,8 @@ SELECT
wallet_bank_account
INTO not_selected, out_status, out_already_selected, out_amount_differs, account_id
FROM taler_withdrawal_operations
- WHERE withdrawal_uuid=in_withdrawal_uuid;
+ WHERE withdrawal_uuid=in_withdrawal_uuid
+ FOR UPDATE;
IF NOT FOUND OR out_already_selected OR out_amount_differs THEN
out_no_op=NOT FOUND;
RETURN;
@@ -1128,7 +1137,8 @@ SELECT
FROM taler_withdrawal_operations
JOIN bank_accounts ON wallet_bank_account=bank_account_id
JOIN customers ON owning_customer_id=customer_id
- WHERE withdrawal_uuid=in_withdrawal_uuid AND username=in_username AND deleted_at IS NULL;
+ WHERE withdrawal_uuid=in_withdrawal_uuid AND username=in_username AND deleted_at IS NULL
+ FOR UPDATE;
out_no_op=NOT FOUND;
IF out_no_op OR already_confirmed OR out_aborted OR out_not_selected OR out_missing_amount OR out_amount_differs THEN
RETURN;
@@ -1454,7 +1464,8 @@ SELECT
,in_timestamp >= retransmission_date AND confirmation_date IS NULL
,code, COALESCE(tan_channel, out_tan_channel), COALESCE(tan_info, out_tan_info)
INTO expired, retransmit, out_tan_code, out_tan_channel, out_tan_info
-FROM tan_challenges WHERE challenge_id = in_challenge_id AND customer = account_id;
+FROM tan_challenges WHERE challenge_id = in_challenge_id AND customer = account_id
+FOR UPDATE;
IF NOT FOUND THEN
out_no_op = true;
RETURN;