libeufin

Integration and sandbox testing for FinTech APIs and data formats
Log | Files | Refs | Submodules | README | LICENSE

commit dfe4e5ccc18ae9065a350abaf1dd96b23f10f53d
parent 3aa2467099811fd22dd1437366cdbe65ee12e2a1
Author: Antoine A <>
Date:   Thu, 16 Nov 2023 14:50:58 +0000

Support long polling for  withdrawal confirmation and improve polling testing

Diffstat:
Mbank/src/main/kotlin/tech/libeufin/bank/BankIntegrationApi.kt | 5+++--
Mbank/src/main/kotlin/tech/libeufin/bank/Params.kt | 35+++++++++++++++++++++++------------
Mbank/src/main/kotlin/tech/libeufin/bank/db/Database.kt | 22+++++++++-------------
Mbank/src/main/kotlin/tech/libeufin/bank/db/NotificationWatcher.kt | 74++++++++++++++++++++++++++++++++++++++++++--------------------------------
Mbank/src/main/kotlin/tech/libeufin/bank/db/WithdrawalDAO.kt | 86+++++++++++++++++++++++++++++++++++++++++++++++++++++--------------------------
Mbank/src/test/kotlin/BankIntegrationApiTest.kt | 44+++++++++++++++++++++++++++++++++++++++++---
Mbank/src/test/kotlin/CoreBankApiTest.kt | 58++++++++++++++++++++--------------------------------------
Mbank/src/test/kotlin/RevenueApiTest.kt | 12++++++------
Mbank/src/test/kotlin/WireGatewayApiTest.kt | 48+++++++++++++++++++-----------------------------
Mbank/src/test/kotlin/helpers.kt | 10++++++++++
Mdatabase-versioning/libeufin-bank-procedures.sql | 3+++
11 files changed, 234 insertions(+), 163 deletions(-)

diff --git a/bank/src/main/kotlin/tech/libeufin/bank/BankIntegrationApi.kt b/bank/src/main/kotlin/tech/libeufin/bank/BankIntegrationApi.kt @@ -27,6 +27,7 @@ import io.ktor.server.response.* import io.ktor.server.routing.* import net.taler.common.errorcodes.TalerErrorCode import java.util.* +import tech.libeufin.bank.PollingParams fun Routing.bankIntegrationApi(db: Database, ctx: BankConfig) { get("/taler-integration/config") { @@ -38,9 +39,9 @@ fun Routing.bankIntegrationApi(db: Database, ctx: BankConfig) { // Note: wopid acts as an authentication token. get("/taler-integration/withdrawal-operation/{wopid}") { - // TODO long poll val uuid = call.uuidUriComponent("wopid") - val op = db.withdrawal.getStatus(uuid) ?: throw notFound( + val params = PollingParams.extract(call.request.queryParameters) + val op = db.withdrawal.pollStatus(uuid, params) ?: throw notFound( "Withdrawal operation '$uuid' not found", TalerErrorCode.BANK_TRANSACTION_NOT_FOUND ) diff --git a/bank/src/main/kotlin/tech/libeufin/bank/Params.kt b/bank/src/main/kotlin/tech/libeufin/bank/Params.kt @@ -81,18 +81,6 @@ data class MonitorParams( } } -data class HistoryParams( - val page: PageParams, val poll_ms: Long -) { - companion object { - fun extract(params: Parameters): HistoryParams { - val poll_ms: Long = params.long("long_poll_ms") ?: 0 - // TODO check poll_ms range - return HistoryParams(PageParams.extract(params), poll_ms) - } - } -} - data class AccountParams( val page: PageParams, val loginFilter: String ) { @@ -111,12 +99,35 @@ data class PageParams( fun extract(params: Parameters): PageParams { val delta: Int = params.int("delta") ?: -20 val start: Long = params.long("start") ?: if (delta >= 0) 0L else Long.MAX_VALUE + if (start < 0) throw badRequest("Param 'start' must be a positive number", TalerErrorCode.GENERIC_PARAMETER_MALFORMED) // TODO enforce delta limit return PageParams(delta, start) } } } +data class PollingParams( + val poll_ms: Long +) { + companion object { + fun extract(params: Parameters): PollingParams { + val poll_ms: Long = params.long("long_poll_ms") ?: 0 + if (poll_ms < 0) throw badRequest("Param 'long_poll_ms' must be a positive number", TalerErrorCode.GENERIC_PARAMETER_MALFORMED) + return PollingParams(poll_ms) + } + } +} + +data class HistoryParams( + val page: PageParams, val polling: PollingParams +) { + companion object { + fun extract(params: Parameters): HistoryParams { + return HistoryParams(PageParams.extract(params), PollingParams.extract(params)) + } + } +} + data class RateParams( val debit: TalerAmount?, val credit: TalerAmount? ) { diff --git a/bank/src/main/kotlin/tech/libeufin/bank/db/Database.kt b/bank/src/main/kotlin/tech/libeufin/bank/db/Database.kt @@ -746,10 +746,7 @@ class Database(dbConfig: String, internal val bankCurrency: String, internal val } } - /** - * The following function returns the list of transactions, according - * to the page parameters - */ + /** Apply paging logic to a sql query */ internal suspend fun <T> page( params: PageParams, idName: String, @@ -779,7 +776,7 @@ class Database(dbConfig: String, internal val bankCurrency: String, internal val internal suspend fun <T> poolHistory( params: HistoryParams, bankAccountId: Long, - listen: suspend NotificationWatcher.(Long, suspend (Flow<Long>) -> Unit) -> Unit, + listen: suspend NotificationWatcher.(Long, suspend (Flow<Long>) -> List<T>) -> List<T>, query: String, accountColumn: String = "bank_account_id", map: (ResultSet) -> T @@ -799,30 +796,29 @@ class Database(dbConfig: String, internal val bankCurrency: String, internal val // TODO do we want to handle polling when going backward and there is no transactions yet ? // When going backward there is always at least one transaction or none - if (params.page.delta >= 0 && params.poll_ms > 0) { - var history = listOf<T>() + return if (params.page.delta >= 0 && params.polling.poll_ms > 0) { notifWatcher.(listen)(bankAccountId) { flow -> coroutineScope { // Start buffering notification before loading transactions to not miss any val polling = launch { - withTimeoutOrNull(params.poll_ms) { + withTimeoutOrNull(params.polling.poll_ms) { flow.first { it > params.page.start } // Always forward so > } } // Initial loading - history = load() + val init = load() // Long polling if we found no transactions - if (history.isEmpty()) { + if (init.isEmpty()) { polling.join() - history = load() + load() } else { polling.cancel() + init } } } - return history } else { - return load() + load() } } diff --git a/bank/src/main/kotlin/tech/libeufin/bank/db/NotificationWatcher.kt b/bank/src/main/kotlin/tech/libeufin/bank/db/NotificationWatcher.kt @@ -21,6 +21,7 @@ package tech.libeufin.bank import org.postgresql.ds.PGSimpleDataSource import java.util.concurrent.ConcurrentHashMap +import java.util.UUID import kotlinx.coroutines.flow.* import kotlinx.coroutines.* import tech.libeufin.util.* @@ -29,9 +30,10 @@ internal class NotificationWatcher(private val pgSource: PGSimpleDataSource) { private class CountedSharedFlow(val flow: MutableSharedFlow<Long>, var count: Int) private val bankTxFlows = ConcurrentHashMap<Long, CountedSharedFlow>() - private val revenueTxFlows = ConcurrentHashMap<Long, CountedSharedFlow>() private val outgoingTxFlows = ConcurrentHashMap<Long, CountedSharedFlow>() private val incomingTxFlows = ConcurrentHashMap<Long, CountedSharedFlow>() + private val revenueTxFlows = ConcurrentHashMap<Long, CountedSharedFlow>() + private val withdrawalFlow = MutableSharedFlow<UUID>() init { kotlin.concurrent.thread(isDaemon = true) { @@ -42,30 +44,39 @@ internal class NotificationWatcher(private val pgSource: PGSimpleDataSource) { conn.execSQLUpdate("LISTEN bank_tx") conn.execSQLUpdate("LISTEN outgoing_tx") conn.execSQLUpdate("LISTEN incoming_tx") + conn.execSQLUpdate("LISTEN withdrawal_confirm") while (true) { conn.getNotifications(0) // Block until we receive at least one notification .forEach { - if (it.name == "bank_tx") { - val (debtor, creditor, debitRow, creditRow) = it.parameter.split(' ', limit = 4).map { it.toLong() } - bankTxFlows[debtor]?.run { - flow.emit(debitRow) + when (it.name) { + "bank_tx" -> { + val (debtor, creditor, debitRow, creditRow) = it.parameter.split(' ', limit = 4).map { it.toLong() } + bankTxFlows[debtor]?.run { + flow.emit(debitRow) + } + bankTxFlows[creditor]?.run { + flow.emit(creditRow) + } } - bankTxFlows[creditor]?.run { - flow.emit(creditRow) + "outgoing_tx" -> { + val (account, merchant, debitRow, creditRow) = it.parameter.split(' ', limit = 4).map { it.toLong() } + outgoingTxFlows[account]?.run { + flow.emit(debitRow) + } + revenueTxFlows[merchant]?.run { + flow.emit(creditRow) + } } - } else if (it.name == "outgoing_tx") { - val (account, merchant, debitRow, creditRow) = it.parameter.split(' ', limit = 4).map { it.toLong() } - outgoingTxFlows[account]?.run { - flow.emit(debitRow) + "incoming_tx" -> { + val (account, row) = it.parameter.split(' ', limit = 2).map { it.toLong() } + incomingTxFlows[account]?.run { + flow.emit(row) + } } - revenueTxFlows[merchant]?.run { - flow.emit(creditRow) - } - } else { - val (account, row) = it.parameter.split(' ', limit = 2).map { it.toLong() } - incomingTxFlows[account]?.run { - flow.emit(row) + "withdrawal_confirm" -> { + val uuid = UUID.fromString(it.parameter) + withdrawalFlow.emit(uuid) } } } @@ -78,7 +89,7 @@ internal class NotificationWatcher(private val pgSource: PGSimpleDataSource) { } } - private suspend fun listen(map: ConcurrentHashMap<Long, CountedSharedFlow>, account: Long, lambda: suspend (Flow<Long>) -> Unit) { + private suspend fun <R> listen(map: ConcurrentHashMap<Long, CountedSharedFlow>, account: Long, lambda: suspend (Flow<Long>) -> R): R { // Register listener val flow = map.compute(account) { _, v -> val tmp = v ?: CountedSharedFlow(MutableSharedFlow(), 0); @@ -87,7 +98,7 @@ internal class NotificationWatcher(private val pgSource: PGSimpleDataSource) { }!!.flow; try { - lambda(flow) + return lambda(flow) } finally { // Unregister listener map.compute(account) { _, v -> @@ -98,19 +109,18 @@ internal class NotificationWatcher(private val pgSource: PGSimpleDataSource) { } } - suspend fun listenBank(account: Long, lambda: suspend (Flow<Long>) -> Unit) { - listen(bankTxFlows, account, lambda) - } + suspend fun <R> listenBank(account: Long, lambda: suspend (Flow<Long>) -> R): R + = listen(bankTxFlows, account, lambda) - suspend fun listenOutgoing(account: Long, lambda: suspend (Flow<Long>) -> Unit) { - listen(outgoingTxFlows, account, lambda) - } + suspend fun <R> listenOutgoing(account: Long, lambda: suspend (Flow<Long>) -> R): R + = listen(outgoingTxFlows, account, lambda) - suspend fun listenIncoming(account: Long, lambda: suspend (Flow<Long>) -> Unit) { - listen(incomingTxFlows, account, lambda) - } + suspend fun <R> listenIncoming(account: Long, lambda: suspend (Flow<Long>) -> R): R + = listen(incomingTxFlows, account, lambda) - suspend fun listenRevenue(account: Long, lambda: suspend (Flow<Long>) -> Unit) { - listen(revenueTxFlows, account, lambda) - } + suspend fun <R> listenRevenue(account: Long, lambda: suspend (Flow<Long>) -> R): R + = listen(revenueTxFlows, account, lambda) + + suspend fun <R> listenWithdrawals(lambda: suspend (Flow<UUID>) -> R): R + = lambda(withdrawalFlow) } \ No newline at end of file diff --git a/bank/src/main/kotlin/tech/libeufin/bank/db/WithdrawalDAO.kt b/bank/src/main/kotlin/tech/libeufin/bank/db/WithdrawalDAO.kt @@ -24,6 +24,8 @@ import java.time.Instant import java.time.Duration import java.util.concurrent.TimeUnit import tech.libeufin.util.* +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.* /** Result status of withdrawal operation creation */ enum class WithdrawalCreationResult { @@ -112,34 +114,62 @@ class WithdrawalDAO(private val db: Database) { } } - suspend fun getStatus(uuid: UUID): BankWithdrawalOperationStatus? = db.conn { conn -> - val stmt = conn.prepareStatement(""" - SELECT - (amount).val as amount_val - ,(amount).frac as amount_frac - ,selection_done - ,aborted - ,confirmation_done - ,internal_payto_uri - FROM taler_withdrawal_operations - JOIN bank_accounts ON (wallet_bank_account=bank_account_id) - WHERE withdrawal_uuid=? - """) - stmt.setObject(1, uuid) - stmt.oneOrNull { - BankWithdrawalOperationStatus( - amount = TalerAmount( - it.getLong("amount_val"), - it.getInt("amount_frac"), - db.bankCurrency - ), - selection_done = it.getBoolean("selection_done"), - transfer_done = it.getBoolean("confirmation_done"), - aborted = it.getBoolean("aborted"), - sender_wire = it.getString("internal_payto_uri"), - confirm_transfer_url = null, - suggested_exchange = null - ) + suspend fun pollStatus(uuid: UUID, params: PollingParams): BankWithdrawalOperationStatus? { + suspend fun load(): BankWithdrawalOperationStatus? = db.conn { conn -> + val stmt = conn.prepareStatement(""" + SELECT + (amount).val as amount_val + ,(amount).frac as amount_frac + ,selection_done + ,aborted + ,confirmation_done + ,internal_payto_uri + FROM taler_withdrawal_operations + JOIN bank_accounts ON (wallet_bank_account=bank_account_id) + WHERE withdrawal_uuid=? + """) + stmt.setObject(1, uuid) + stmt.oneOrNull { + BankWithdrawalOperationStatus( + amount = TalerAmount( + it.getLong("amount_val"), + it.getInt("amount_frac"), + db.bankCurrency + ), + selection_done = it.getBoolean("selection_done"), + transfer_done = it.getBoolean("confirmation_done"), + aborted = it.getBoolean("aborted"), + sender_wire = it.getString("internal_payto_uri"), + confirm_transfer_url = null, + suggested_exchange = null + ) + } + } + + + return if (params.poll_ms > 0) { + db.notifWatcher.listenWithdrawals { flow -> + coroutineScope { + // Start buffering notification before loading transactions to not miss any + val polling = launch { + withTimeoutOrNull(params.poll_ms) { + flow.first { it == uuid } + } + } + // Initial loading + val init = load() + // Long polling if there is no operation or its not confirmed + if (init?.run { transfer_done == false } ?: true) { + polling.join() + load() + } else { + polling.cancel() + init + } + } + } + } else { + load() } } diff --git a/bank/src/test/kotlin/BankIntegrationApiTest.kt b/bank/src/test/kotlin/BankIntegrationApiTest.kt @@ -26,15 +26,53 @@ class BankIntegrationApiTest { // GET /taler-integration/withdrawal-operation/UUID @Test fun get() = bankSetup { _ -> + val amount = TalerAmount("KUDOS:9.0") // Check OK - client.postA("/accounts/merchant/withdrawals") { - json { "amount" to "KUDOS:9" } + client.postA("/accounts/customer/withdrawals") { + json { "amount" to amount } }.assertOkJson<BankAccountCreateWithdrawalResponse> { resp -> val uuid = resp.taler_withdraw_uri.split("/").last() client.get("/taler-integration/withdrawal-operation/$uuid") - .assertOk() + .assertOkJson<BankWithdrawalOperationStatus> { + assert(!it.selection_done) + assert(!it.aborted) + assert(!it.transfer_done) + assertEquals(amount, it.amount) + // TODO check all status + } } + // Check polling + client.postA("/accounts/customer/withdrawals") { + json { "amount" to amount } + }.assertOkJson<BankAccountCreateWithdrawalResponse> { resp -> + val never_confirmed_uuid = resp.taler_withdraw_uri.split("/").last() + val confirmed_uuid = client.postA("/accounts/customer/withdrawals") { + json { "amount" to amount } + }.assertOkJson<BankAccountCreateWithdrawalResponse>() + .taler_withdraw_uri.split("/").last() + withdrawalSelect(confirmed_uuid) + + coroutineScope { + launch { // Check polling succeed forward + assertTime(100, 200) { + client.get("/taler-integration/withdrawal-operation/$confirmed_uuid?long_poll_ms=1000") + .assertOkJson<BankWithdrawalOperationStatus> { assert(it.selection_done) } + } + } + launch { // Check polling timeout forward + assertTime(200, 300) { + client.get("/taler-integration/withdrawal-operation/$never_confirmed_uuid?long_poll_ms=200") + .assertOkJson<BankWithdrawalOperationStatus> { assert(!it.selection_done) } + } + } + delay(100) + client.post("/withdrawals/$confirmed_uuid/confirm").assertNoContent() + } + } + + + // Check unknown client.get("/taler-integration/withdrawal-operation/${UUID.randomUUID()}") .assertNotFound(TalerErrorCode.BANK_TRANSACTION_NOT_FOUND) diff --git a/bank/src/test/kotlin/CoreBankApiTest.kt b/bank/src/test/kotlin/CoreBankApiTest.kt @@ -560,32 +560,32 @@ class CoreBankTransactionsApiTest { } // Check no useless polling - assertTime(0, 200) { + assertTime(0, 100) { client.get("/accounts/merchant/transactions?delta=-6&start=11&long_poll_ms=1000") { pwAuth("merchant") }.assertHistory(5) } // Check no polling when find transaction - assertTime(0, 200) { + assertTime(0, 100) { client.getA("/accounts/merchant/transactions?delta=6&long_poll_ms=1000") .assertHistory(5) } coroutineScope { launch { // Check polling succeed - assertTime(200, 1000) { + assertTime(100, 200) { client.getA("/accounts/merchant/transactions?delta=2&start=10&long_poll_ms=1000") .assertHistory(1) } } launch { // Check polling timeout - assertTime(200, 400) { - client.getA("/accounts/merchant/transactions?delta=1&start=11&long_poll_ms=300") + assertTime(200, 300) { + client.getA("/accounts/merchant/transactions?delta=1&start=11&long_poll_ms=200") .assertNoContent() } } - delay(200) + delay(100) tx("merchant", "KUDOS:4.2", "exchange") } @@ -762,13 +762,20 @@ class CoreBankWithdrawalApiTest { // GET /withdrawals/withdrawal_id @Test fun get() = bankSetup { _ -> + val amount = TalerAmount("KUDOS:9.0") // Check OK client.postA("/accounts/merchant/withdrawals") { - json { "amount" to "KUDOS:9.0" } + json { "amount" to amount} }.assertOkJson<BankAccountCreateWithdrawalResponse> { client.get("/withdrawals/${it.withdrawal_id}") { pwAuth("merchant") - }.assertOk() + }.assertOkJson<BankAccountGetWithdrawalResponse> { + assert(!it.selection_done) + assert(!it.aborted) + assert(!it.confirmation_done) + assertEquals(amount, it.amount) + // TODO check all status + } } // Check bad UUID @@ -799,12 +806,7 @@ class CoreBankWithdrawalApiTest { json { "amount" to "KUDOS:1" } }.assertOkJson<BankAccountCreateWithdrawalResponse> { val uuid = it.taler_withdraw_uri.split("/").last() - client.post("/taler-integration/withdrawal-operation/$uuid") { - json { - "reserve_pub" to randEddsaPublicKey() - "selected_exchange" to exchangePayto - } - }.assertOk() + withdrawalSelect(uuid) // Check OK client.post("/withdrawals/$uuid/abort").assertNoContent() @@ -817,12 +819,7 @@ class CoreBankWithdrawalApiTest { json { "amount" to "KUDOS:1" } }.assertOkJson<BankAccountCreateWithdrawalResponse> { val uuid = it.taler_withdraw_uri.split("/").last() - client.post("/taler-integration/withdrawal-operation/$uuid") { - json { - "reserve_pub" to randEddsaPublicKey() - "selected_exchange" to exchangePayto - } - }.assertOk() + withdrawalSelect(uuid) client.post("/withdrawals/$uuid/confirm").assertNoContent() // Check error @@ -857,12 +854,7 @@ class CoreBankWithdrawalApiTest { json { "amount" to "KUDOS:1" } }.assertOkJson<BankAccountCreateWithdrawalResponse> { val uuid = it.taler_withdraw_uri.split("/").last() - client.post("/taler-integration/withdrawal-operation/$uuid") { - json { - "reserve_pub" to randEddsaPublicKey() - "selected_exchange" to exchangePayto - } - }.assertOk() + withdrawalSelect(uuid) // Check OK client.post("/withdrawals/$uuid/confirm").assertNoContent() @@ -875,12 +867,7 @@ class CoreBankWithdrawalApiTest { json { "amount" to "KUDOS:1" } }.assertOkJson<BankAccountCreateWithdrawalResponse> { val uuid = it.taler_withdraw_uri.split("/").last() - client.post("/taler-integration/withdrawal-operation/$uuid") { - json { - "reserve_pub" to randEddsaPublicKey() - "selected_exchange" to exchangePayto - } - }.assertOk() + withdrawalSelect(uuid) client.post("/withdrawals/$uuid/abort").assertNoContent() // Check error @@ -893,12 +880,7 @@ class CoreBankWithdrawalApiTest { json { "amount" to "KUDOS:5" } }.assertOkJson<BankAccountCreateWithdrawalResponse> { val uuid = it.taler_withdraw_uri.split("/").last() - client.post("/taler-integration/withdrawal-operation/$uuid") { - json { - "reserve_pub" to randEddsaPublicKey() - "selected_exchange" to exchangePayto - } - }.assertOk() + withdrawalSelect(uuid) // Send too much money tx("merchant", "KUDOS:5", "exchange") diff --git a/bank/src/test/kotlin/RevenueApiTest.kt b/bank/src/test/kotlin/RevenueApiTest.kt @@ -56,31 +56,31 @@ class RevenueApiTest { .assertHistory(5) // Check no useless polling - assertTime(0, 200) { + assertTime(0, 100) { client.getA("/accounts/merchant/taler-revenue/history?delta=-6&start=14&long_poll_ms=1000") .assertHistory(5) } // Check no polling when find transaction - assertTime(0, 200) { + assertTime(0, 100) { client.getA("/accounts/merchant/taler-revenue/history?delta=6&long_poll_ms=1000") .assertHistory(5) } coroutineScope { launch { // Check polling succeed forward - assertTime(200, 300) { + assertTime(100, 200) { client.getA("/accounts/merchant/taler-revenue/history?delta=2&start=13&long_poll_ms=1000") .assertHistory(1) } } launch { // Check polling timeout forward - assertTime(200, 400) { - client.getA("/accounts/merchant/taler-revenue/history?delta=1&start=15&long_poll_ms=300") + assertTime(200, 300) { + client.getA("/accounts/merchant/taler-revenue/history?delta=1&start=16&long_poll_ms=200") .assertNoContent() } } - delay(200) + delay(100) transfer("KUDOS:10") } diff --git a/bank/src/test/kotlin/WireGatewayApiTest.kt b/bank/src/test/kotlin/WireGatewayApiTest.kt @@ -184,12 +184,7 @@ class WireGatewayApiTest { json { "amount" to "KUDOS:9" } }.assertOkJson<BankAccountCreateWithdrawalResponse> { val uuid = it.taler_withdraw_uri.split("/").last() - client.post("/taler-integration/withdrawal-operation/${uuid}") { - json { - "reserve_pub" to randEddsaPublicKey() - "selected_exchange" to exchangePayto - } - }.assertOk() + withdrawalSelect(uuid) client.post("/withdrawals/${uuid}/confirm") { pwAuth("merchant") }.assertNoContent() @@ -204,65 +199,60 @@ class WireGatewayApiTest { .assertHistory(5) // Check no useless polling - assertTime(0, 200) { + assertTime(0, 100) { client.getA("/accounts/exchange/taler-wire-gateway/history/incoming?delta=-6&start=15&long_poll_ms=1000") .assertHistory(5) } // Check no polling when find transaction - assertTime(0, 200) { - client.getA("/accounts/exchange/taler-wire-gateway/history/incoming?delta=6&long_poll_ms=60") + assertTime(0, 100) { + client.getA("/accounts/exchange/taler-wire-gateway/history/incoming?delta=6&long_poll_ms=1000") .assertHistory(5) } coroutineScope { launch { // Check polling succeed - assertTime(200, 300) { + assertTime(100, 200) { client.getA("/accounts/exchange/taler-wire-gateway/history/incoming?delta=2&start=14&long_poll_ms=1000") .assertHistory(1) } } launch { // Check polling timeout - assertTime(200, 400) { - client.getA("/accounts/exchange/taler-wire-gateway/history/incoming?delta=1&start=16&long_poll_ms=300") + assertTime(200, 300) { + client.getA("/accounts/exchange/taler-wire-gateway/history/incoming?delta=1&start=16&long_poll_ms=200") .assertNoContent() } } - delay(200) + delay(100) addIncoming("KUDOS:10") } // Test trigger by raw transaction coroutineScope { launch { - assertTime(200, 300) { + assertTime(100, 200) { client.getA("/accounts/exchange/taler-wire-gateway/history/incoming?delta=7&start=16&long_poll_ms=1000") .assertHistory(1) } } - delay(200) + delay(100) tx("merchant", "KUDOS:10", "exchange", IncomingTxMetadata(randShortHashCode()).encode()) } // Test trigger by withdraw operationr coroutineScope { launch { - assertTime(200, 300) { + assertTime(100, 200) { client.getA("/accounts/exchange/taler-wire-gateway/history/incoming?delta=7&start=18&long_poll_ms=1000") .assertHistory(1) } } - delay(200) + delay(100) client.postA("/accounts/merchant/withdrawals") { json { "amount" to "KUDOS:9" } }.assertOkJson<BankAccountCreateWithdrawalResponse> { val uuid = it.taler_withdraw_uri.split("/").last() - client.post("/taler-integration/withdrawal-operation/${uuid}") { - json { - "reserve_pub" to randEddsaPublicKey() - "selected_exchange" to exchangePayto - } - }.assertOk() + withdrawalSelect(uuid) client.postA("/withdrawals/${uuid}/confirm") .assertNoContent() } @@ -324,31 +314,31 @@ class WireGatewayApiTest { .assertHistory(5) // Check no useless polling - assertTime(0, 200) { + assertTime(0, 100) { client.getA("/accounts/exchange/taler-wire-gateway/history/outgoing?delta=-6&start=15&long_poll_ms=1000") .assertHistory(5) } // Check no polling when find transaction - assertTime(0, 200) { + assertTime(0, 100) { client.getA("/accounts/exchange/taler-wire-gateway/history/outgoing?delta=6&long_poll_ms=1000") .assertHistory(5) } coroutineScope { launch { // Check polling succeed forward - assertTime(200, 300) { + assertTime(100, 200) { client.getA("/accounts/exchange/taler-wire-gateway/history/outgoing?delta=2&start=14&long_poll_ms=1000") .assertHistory(1) } } launch { // Check polling timeout forward - assertTime(200, 400) { - client.getA("/accounts/exchange/taler-wire-gateway/history/outgoing?delta=1&start=16&long_poll_ms=300") + assertTime(200, 300) { + client.getA("/accounts/exchange/taler-wire-gateway/history/outgoing?delta=1&start=16&long_poll_ms=200") .assertNoContent() } } - delay(200) + delay(100) transfer("KUDOS:10") } diff --git a/bank/src/test/kotlin/helpers.kt b/bank/src/test/kotlin/helpers.kt @@ -12,6 +12,7 @@ import tech.libeufin.bank.* import java.io.ByteArrayOutputStream import java.io.File import java.util.zip.DeflaterOutputStream +import java.util.UUID import tech.libeufin.util.CryptoUtil import tech.libeufin.util.* @@ -172,6 +173,15 @@ suspend fun ApplicationTestBuilder.fillCashoutInfo(account: String) { }.assertNoContent() } +suspend fun ApplicationTestBuilder.withdrawalSelect(uuid: String) { + client.post("/taler-integration/withdrawal-operation/$uuid") { + json { + "reserve_pub" to randEddsaPublicKey() + "selected_exchange" to exchangePayto + } + }.assertOk() +} + suspend fun ApplicationTestBuilder.convert(amount: String): TalerAmount { return client.get("/cashout-rate?amount_debit=$amount") .assertOkJson<ConversionResponse>().amount_credit diff --git a/database-versioning/libeufin-bank-procedures.sql b/database-versioning/libeufin-bank-procedures.sql @@ -756,6 +756,9 @@ UPDATE taler_withdrawal_operations -- Register incoming transaction CALL register_incoming(reserve_pub_local, tx_row_id); + +-- Notify new transaction +PERFORM pg_notify('withdrawal_confirm', in_withdrawal_uuid::text); END $$; COMMENT ON FUNCTION confirm_taler_withdrawal IS 'Set a withdrawal operation as confirmed and wire the funds to the exchange.';