libeufin

Integration and sandbox testing for FinTech APIs and data formats
Log | Files | Refs | Submodules | README | LICENSE

commit 71050ab44fbccb970ec530383cdeef42ca0cf928
parent c73f750472139b7ec872c1bf16a284de143ef998
Author: Antoine A <>
Date:   Fri, 26 Apr 2024 15:17:14 +0900

nexus: wire gateway /history/incoming

Diffstat:
Mbank/src/main/kotlin/tech/libeufin/bank/db/Database.kt | 6+++---
Mbank/src/test/kotlin/helpers.kt | 34----------------------------------
Mbank/src/test/kotlin/routines.kt | 1+
Mcommon/src/main/kotlin/db/helpers.kt | 51+++++++++++++++++++++++++++++++++++++++++++++++++++
Mcommon/src/main/kotlin/db/notifications.kt | 7+++++--
Acommon/src/main/kotlin/test/helpers.kt | 66++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mdatabase-versioning/libeufin-nexus-0003.sql | 4++--
Mdatabase-versioning/libeufin-nexus-procedures.sql | 1+
Mnexus/src/main/kotlin/tech/libeufin/nexus/Config.kt | 2++
Mnexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt | 2+-
Mnexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt | 4++--
Mnexus/src/main/kotlin/tech/libeufin/nexus/Main.kt | 4++--
Mnexus/src/main/kotlin/tech/libeufin/nexus/api/WireGatewayApi.kt | 12++++--------
Mnexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt | 32+++++++++++++++++++++++++++++---
Mnexus/src/main/kotlin/tech/libeufin/nexus/db/ExchangeDAO.kt | 32+++++++++++++++++++++++++++-----
Mnexus/src/test/kotlin/WireGatewayApiTest.kt | 34+++++++++++++++++-----------------
Mnexus/src/test/kotlin/helpers.kt | 42++++++++++++++++++++++++++----------------
Anexus/src/test/kotlin/routines.kt | 152+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mtestbench/src/test/kotlin/IntegrationTest.kt | 6++++--
19 files changed, 395 insertions(+), 97 deletions(-)

diff --git a/bank/src/main/kotlin/tech/libeufin/bank/db/Database.kt b/bank/src/main/kotlin/tech/libeufin/bank/db/Database.kt @@ -96,13 +96,13 @@ class Database(dbConfig: DatabaseConfig, internal val bankCurrency: String, inte /** Listen for new bank transactions for [account] */ suspend fun <R> listenBank(account: Long, lambda: suspend (Flow<Long>) -> R): R = listen(bankTxFlows, account, lambda) - /** Listen for new taler outgoing transactions from [account] */ + /** Listen for new taler outgoing transactions from [exchange] */ suspend fun <R> listenOutgoing(exchange: Long, lambda: suspend (Flow<Long>) -> R): R = listen(outgoingTxFlows, exchange, lambda) - /** Listen for new taler incoming transactions to [account] */ + /** Listen for new taler incoming transactions to [exchange] */ suspend fun <R> listenIncoming(exchange: Long, lambda: suspend (Flow<Long>) -> R): R = listen(incomingTxFlows, exchange, lambda) - /** Listen for new taler outgoing transactions to [account] */ + /** Listen for new taler outgoing transactions to [merchant] */ suspend fun <R> listenRevenue(merchant: Long, lambda: suspend (Flow<Long>) -> R): R = listen(revenueTxFlows, merchant, lambda) /** Listen for new withdrawal confirmations */ diff --git a/bank/src/test/kotlin/helpers.kt b/bank/src/test/kotlin/helpers.kt @@ -343,15 +343,6 @@ suspend fun HttpResponse.assertChallenge( } } -suspend fun assertTime(min: Int, max: Int, lambda: suspend () -> Unit) { - val start = System.currentTimeMillis() - lambda() - val end = System.currentTimeMillis() - val time = end - start - assert(time >= min) { "Expected to last at least $min ms, lasted $time" } - assert(time <= max) { "Expected to last at most $max ms, lasted $time" } -} - fun assertException(msg: String, lambda: () -> Unit) { try { lambda() @@ -361,31 +352,6 @@ fun assertException(msg: String, lambda: () -> Unit) { } } -suspend inline fun <reified B> HttpResponse.assertHistoryIds(size: Int, ids: (B) -> List<Long>): B { - assertOk() - val body = json<B>() - val history = ids(body) - val params = PageParams.extract(call.request.url.parameters) - - // testing the size is like expected. - assertEquals(size, history.size, "bad history length: $history") - if (params.delta < 0) { - // testing that the first id is at most the 'start' query param. - assert(history[0] <= params.start) { "bad history start: $params $history" } - // testing that the id decreases. - if (history.size > 1) - assert(history.windowed(2).all { (a, b) -> a > b }) { "bad history order: $history" } - } else { - // testing that the first id is at least the 'start' query param. - assert(history[0] >= params.start) { "bad history start: $params $history" } - // testing that the id increases. - if (history.size > 1) - assert(history.windowed(2).all { (a, b) -> a < b }) { "bad history order: $history" } - } - - return body -} - /* ----- Body helper ----- */ suspend inline fun <reified B> HttpResponse.assertOkJson(lambda: (B) -> Unit = {}): B { diff --git a/bank/src/test/kotlin/routines.kt b/bank/src/test/kotlin/routines.kt @@ -28,6 +28,7 @@ import kotlinx.serialization.json.JsonObject import tech.libeufin.bank.BankAccountCreateWithdrawalResponse import tech.libeufin.bank.WithdrawalStatus import tech.libeufin.common.* +import tech.libeufin.common.test.* import kotlin.test.assertEquals // Test endpoint is correctly authenticated diff --git a/common/src/main/kotlin/db/helpers.kt b/common/src/main/kotlin/db/helpers.kt @@ -110,4 +110,55 @@ suspend fun <T> DbPool.poolHistory( } else { load() } +} + +/** +* The following function returns the list of transactions, according +* to the history parameters and perform long polling when necessary +*/ +suspend fun <T> DbPool.poolHistoryGlobal( + params: HistoryParams, + listen: suspend (suspend (Flow<Long>) -> List<T>) -> List<T>, + query: String, + idColumnValue: String, + map: (ResultSet) -> T +): List<T> { + + suspend fun load(): List<T> = page( + params.page, + idColumnValue, + query, + map=map + ) + + + // TODO do we want to handle polling when going backward and there is no transactions yet ? + // When going backward there is always at least one transaction or none + return if (params.page.delta >= 0 && params.polling.poll_ms > 0) { + listen { flow -> + coroutineScope { + // Start buffering notification before loading transactions to not miss any + val polling = launch { + withTimeoutOrNull(params.polling.poll_ms) { + flow.first { it > params.page.start } // Always forward so > + } + } + // Initial loading + val init = load() + // Long polling if we found no transactions + if (init.isEmpty()) { + if (polling.join() != null) { + load() + } else { + init + } + } else { + polling.cancel() + init + } + } + } + } else { + load() + } } \ No newline at end of file diff --git a/common/src/main/kotlin/db/notifications.kt b/common/src/main/kotlin/db/notifications.kt @@ -30,7 +30,10 @@ import java.util.* import java.util.concurrent.ConcurrentHashMap // SharedFlow that are manually counted for manual garbage collection -class CountedSharedFlow<T>(val flow: MutableSharedFlow<T>, var count: Int) +class CountedSharedFlow<T> { + val flow: MutableSharedFlow<T> = MutableSharedFlow() + var count: Int = 0 +} fun watchNotifications( pgSource: PGSimpleDataSource, @@ -73,7 +76,7 @@ fun watchNotifications( suspend fun <R, K, V> listen(map: ConcurrentHashMap<K, CountedSharedFlow<V>>, key: K, lambda: suspend (Flow<V>) -> R): R { // Register listener, create a new flow if missing val flow = map.compute(key) { _, v -> - val tmp = v ?: CountedSharedFlow(MutableSharedFlow(), 0) + val tmp = v ?: CountedSharedFlow() tmp.count++ tmp }!!.flow diff --git a/common/src/main/kotlin/test/helpers.kt b/common/src/main/kotlin/test/helpers.kt @@ -0,0 +1,65 @@ +/* + * This file is part of LibEuFin. + * Copyright (C) 2024 Taler Systems S.A. + + * LibEuFin is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation; either version 3, or + * (at your option) any later version. + + * LibEuFin is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General + * Public License for more details. + + * You should have received a copy of the GNU Affero General Public + * License along with LibEuFin; see the file COPYING. If not, see + * <http://www.gnu.org/licenses/> + */ + +package tech.libeufin.common.test + +import io.ktor.client.* +import io.ktor.client.request.* +import io.ktor.client.statement.* +import io.ktor.http.* +import kotlin.test.assertEquals +import kotlin.test.assertIs +import kotlin.test.assertNotNull +import tech.libeufin.common.* + +/* ----- Assert ----- */ + +suspend fun assertTime(min: Int, max: Int, lambda: suspend () -> Unit) { + val start = System.currentTimeMillis() + lambda() + val end = System.currentTimeMillis() + val time = end - start + assert(time >= min) { "Expected to last at least $min ms, lasted $time" } + assert(time <= max) { "Expected to last at most $max ms, lasted $time" } +} + +suspend inline fun <reified B> HttpResponse.assertHistoryIds(size: Int, ids: (B) -> List<Long>): B { + assertOk() + val body = json<B>() + val history = ids(body) + val params = PageParams.extract(call.request.url.parameters) + + // testing the size is like expected. + assertEquals(size, history.size, "bad history length: $history") + if (params.delta < 0) { + // testing that the first id is at most the 'start' query param. + assert(history[0] <= params.start) { "bad history start: $params $history" } + // testing that the id decreases. + if (history.size > 1) + assert(history.windowed(2).all { (a, b) -> a > b }) { "bad history order: $history" } + } else { + // testing that the first id is at least the 'start' query param. + assert(history[0] >= params.start) { "bad history start: $params $history" } + // testing that the id increases. + if (history.size > 1) + assert(history.windowed(2).all { (a, b) -> a < b }) { "bad history order: $history" } + } + + return body +} +\ No newline at end of file diff --git a/database-versioning/libeufin-nexus-0003.sql b/database-versioning/libeufin-nexus-0003.sql @@ -23,7 +23,7 @@ CREATE TABLE IF NOT EXISTS talerable_outgoing_transactions (talerable_outgoing_transaction_id INT8 GENERATED BY DEFAULT AS IDENTITY UNIQUE ,initiated_outgoing_transaction_id INT8 UNIQUE REFERENCES initiated_outgoing_transactions(initiated_outgoing_transaction_id) ON DELETE CASCADE ,outgoing_transaction_id INT8 UNIQUE REFERENCES outgoing_transactions(outgoing_transaction_id) ON DELETE CASCADE - ,CONSTRAINT tx_link CHECK ((initiated_outgoing_transaction_id IS NULL) != (outgoing_transaction_id IS NULL)) + ,CONSTRAINT tx_link CHECK ((initiated_outgoing_transaction_id IS NOT NULL) OR (outgoing_transaction_id IS NOT NULL)) ,request_uid BYTEA UNIQUE CHECK (LENGTH(request_uid)=64) ,wtid BYTEA NOT NULL UNIQUE CHECK (LENGTH(wtid)=32) ,exchange_base_url TEXT NOT NULL @@ -31,7 +31,7 @@ CREATE TABLE IF NOT EXISTS talerable_outgoing_transactions COMMENT ON COLUMN talerable_outgoing_transactions.initiated_outgoing_transaction_id IS 'If the transaction have been initiated'; COMMENT ON COLUMN talerable_outgoing_transactions.outgoing_transaction_id - IS 'If the transaction have been recovered'; + IS 'If the transaction have been recovered or done'; COMMENT ON CONSTRAINT tx_link ON talerable_outgoing_transactions IS 'A transaction is either initiated or recovered'; diff --git a/database-versioning/libeufin-nexus-procedures.sql b/database-versioning/libeufin-nexus-procedures.sql @@ -239,6 +239,7 @@ IF NOT EXISTS(SELECT 1 FROM talerable_incoming_transactions WHERE incoming_trans out_tx_id ,in_reserve_public_key ); + PERFORM pg_notify('incoming_tx', out_tx_id::text); END IF; END $$; COMMENT ON FUNCTION register_incoming_and_talerable IS ' diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Config.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Config.kt @@ -52,6 +52,8 @@ class NexusConfig(val config: TalerConfig) { bic = requireString("bic"), name = requireString("name") ) + /** Bank account payto */ + val payto = IbanPayto.build(account.iban, account.bic, account.name) /** Path where we store the bank public keys */ val bankPublicKeysPath = requirePath("bank_public_keys_file") /** Path where we store our private keys */ diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt @@ -366,7 +366,7 @@ class EbicsFetch: CliktCommand("Fetches EBICS files") { val cfg = extractEbicsConfig(common.config) val dbCfg = cfg.config.dbConfig() - Database(dbCfg).use { db -> + Database(dbCfg, cfg.currency).use { db -> val (clientKeys, bankKeys) = expectFullKeys(cfg) val ctx = FetchContext( cfg, diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt @@ -65,7 +65,7 @@ data class SubmissionContext( private suspend fun submitInitiatedPayment( ctx: SubmissionContext, payment: InitiatedPayment -): String { +): String { val creditAccount = try { val payto = Payto.parse(payment.creditPaytoUri).expectIban() IbanAccountMetadata( @@ -157,7 +157,7 @@ class EbicsSubmit : CliktCommand("Submits any initiated payment found in the dat httpClient = HttpClient(), fileLogger = FileLogger(ebicsLog) ) - Database(dbCfg).use { db -> + Database(dbCfg, cfg.currency).use { db -> val frequency: Duration = if (transient) { logger.info("Transient mode: submitting what found and returning.") Duration.ZERO diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt @@ -130,7 +130,7 @@ class InitiatePayment: CliktCommand("Initiate an outgoing payment") { Base32Crockford.encode(bytes) } - Database(dbCfg).use { db -> + Database(dbCfg, currency).use { db -> db.initiated.create( InitiatedPayment( id = -1, @@ -273,7 +273,7 @@ class FakeIncoming: CliktCommand("Genere a fake incoming payment") { Base32Crockford.encode(bytes) } - Database(dbCfg).use { db -> + Database(dbCfg, currency).use { db -> ingestIncomingPayment(db, IncomingPayment( amount = amount, diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/api/WireGatewayApi.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/api/WireGatewayApi.kt @@ -66,26 +66,22 @@ fun Routing.wireGatewayApi(db: Database, cfg: NexusConfig) { ) } } - /* suspend fun <T> PipelineContext<Unit, ApplicationCall>.historyEndpoint( reduce: (List<T>, String) -> Any, - dbLambda: suspend ExchangeDAO.(HistoryParams, Long, BankPaytoCtx) -> List<T> + dbLambda: suspend ExchangeDAO.(HistoryParams) -> List<T> ) { val params = HistoryParams.extract(context.request.queryParameters) - val bankAccount = call.bankInfo(db, ctx.payto) - - val items = db.exchange.dbLambda(params, bankAccount.bankAccountId, ctx.payto) - val + val items = db.exchange.dbLambda(params) if (items.isEmpty()) { call.respond(HttpStatusCode.NoContent) } else { - call.respond(reduce(items, bankAccount.payto)) + call.respond(reduce(items, cfg.payto)) } } get("/taler-wire-gateway/history/incoming") { historyEndpoint(::IncomingHistory, ExchangeDAO::incomingHistory) } - get("/taler-wire-gateway/history/outgoing") { + /*get("/taler-wire-gateway/history/outgoing") { historyEndpoint(::OutgoingHistory, ExchangeDAO::outgoingHistory) }*/ post("/taler-wire-gateway/admin/add-incoming") { diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt @@ -18,9 +18,12 @@ */ package tech.libeufin.nexus.db +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.slf4j.Logger +import org.slf4j.LoggerFactory import tech.libeufin.common.TalerAmount -import tech.libeufin.common.db.DatabaseConfig -import tech.libeufin.common.db.DbPool +import tech.libeufin.common.db.* import java.time.Instant /** @@ -39,8 +42,31 @@ data class InitiatedPayment( /** * Collects database connection steps and any operation on the Nexus tables. */ -class Database(dbConfig: DatabaseConfig): DbPool(dbConfig, "libeufin_nexus") { +class Database(dbConfig: DatabaseConfig, val bankCurrency: String): DbPool(dbConfig, "libeufin_nexus") { val payment = PaymentDAO(this) val initiated = InitiatedDAO(this) val exchange = ExchangeDAO(this) + + private val outgoingTxFlows: MutableSharedFlow<Long> = MutableSharedFlow() + private val incomingTxFlows: MutableSharedFlow<Long> = MutableSharedFlow() + + init { + watchNotifications(pgSource, "libeufin_nexus", LoggerFactory.getLogger("libeufin-nexus-db-watcher"), mapOf( + "outgoing_tx" to { + val id = it.toLong() + outgoingTxFlows.emit(id) + }, + "incoming_tx" to { + val id = it.toLong() + incomingTxFlows.emit(id) + } + )) + } + + /** Listen for new taler outgoing transactions */ + suspend fun <R> listenOutgoing(lambda: suspend (Flow<Long>) -> R): R + = lambda(outgoingTxFlows as Flow<Long>) + /** Listen for new taler incoming transactions */ + suspend fun <R> listenIncoming(lambda: suspend (Flow<Long>) -> R): R + = lambda(incomingTxFlows as Flow<Long>) } \ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/ExchangeDAO.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/ExchangeDAO.kt @@ -19,16 +19,38 @@ package tech.libeufin.nexus.db -import tech.libeufin.common.db.one -import tech.libeufin.common.db.getTalerTimestamp -import tech.libeufin.common.micros -import tech.libeufin.common.TalerProtocolTimestamp -import tech.libeufin.common.TransferRequest +import tech.libeufin.common.db.* +import tech.libeufin.common.* import java.sql.ResultSet import java.time.Instant /** Data access logic for exchange specific logic */ class ExchangeDAO(private val db: Database) { + /** Query history of taler incoming transactions */ + suspend fun incomingHistory( + params: HistoryParams + ): List<IncomingReserveTransaction> + = db.poolHistoryGlobal(params, db::listenIncoming, """ + SELECT + tit.incoming_transaction_id + ,execution_time + ,(amount).val AS amount_val + ,(amount).frac AS amount_frac + ,debit_payto_uri + ,reserve_public_key + FROM talerable_incoming_transactions AS tit + JOIN incoming_transactions AS it + ON tit.incoming_transaction_id=it.incoming_transaction_id + WHERE + """, "tit.incoming_transaction_id") { + IncomingReserveTransaction( + row_id = it.getLong("incoming_transaction_id"), + date = it.getTalerTimestamp("execution_time"), + amount = it.getAmount("amount", db.bankCurrency), + debit_account = it.getString("debit_payto_uri"), + reserve_pub = EddsaPublicKey(it.getBytes("reserve_public_key")), + ) + } /** Result of taler transfer transaction creation */ sealed interface TransferResult { diff --git a/nexus/src/test/kotlin/WireGatewayApiTest.kt b/nexus/src/test/kotlin/WireGatewayApiTest.kt @@ -24,6 +24,7 @@ import io.ktor.http.* import io.ktor.server.testing.* import org.junit.Test import tech.libeufin.common.* +import tech.libeufin.nexus.* class WireGatewayApiTest { // GET /accounts/{USERNAME}/taler-wire-gateway/config @@ -100,46 +101,45 @@ class WireGatewayApiTest { } }.assertBadRequest() } - /* + /** * Testing the /history/incoming call from the TWG API. */ @Test - fun historyIncoming() = serverSetup { - // Give Foo reasonable debt allowance: - setMaxDebt("merchant", "KUDOS:1000") - authRoutine(HttpMethod.Get, "/accounts/merchant/taler-wire-gateway/history/incoming") + fun historyIncoming() = serverSetup { db -> + //authRoutine(HttpMethod.Get, "/taler-wire-gateway/history/incoming") historyRoutine<IncomingHistory>( - url = "/accounts/exchange/taler-wire-gateway/history/incoming", + url = "/taler-wire-gateway/history/incoming", ids = { it.incoming_transactions.map { it.row_id } }, registered = listOf( { - // Transactions using clean add incoming logic - addIncoming("KUDOS:10") + client.post("/taler-wire-gateway/admin/add-incoming") { + json { + "amount" to "CHF:12" + "reserve_pub" to EddsaPublicKey.rand() + "debit_account" to grothoffPayto + } + }.assertOk() }, { // Transactions using raw bank transaction logic - tx("merchant", "KUDOS:10", "exchange", "history test with ${ShortHashCode.rand()} reserve pub") - }, - { - // Transaction using withdraw logic - withdrawal("KUDOS:9") + ingestIncomingPayment(db, genInPay("history test with ${ShortHashCode.rand()} reserve pub")) } ), ignored = listOf( { // Ignore malformed incoming transaction - tx("merchant", "KUDOS:10", "exchange", "ignored") + ingestIncomingPayment(db, genInPay("ignored")) }, { - // Ignore malformed outgoing transaction - tx("exchange", "KUDOS:10", "merchant", "ignored") + // Ignore outgoing transaction + ingestOutgoingPayment(db, genOutPay("ignored")) } ) ) } - + /* /** * Testing the /history/outgoing call from the TWG API. */ diff --git a/nexus/src/test/kotlin/helpers.kt b/nexus/src/test/kotlin/helpers.kt @@ -24,10 +24,8 @@ import io.ktor.client.statement.* import io.ktor.http.* import io.ktor.server.testing.* import kotlinx.coroutines.runBlocking -import tech.libeufin.common.TalerAmount -import tech.libeufin.common.db.dbInit -import tech.libeufin.common.db.pgDataSource -import tech.libeufin.common.fromFile +import tech.libeufin.common.* +import tech.libeufin.common.db.* import tech.libeufin.nexus.* import tech.libeufin.nexus.db.Database import tech.libeufin.nexus.db.InitiatedPayment @@ -49,10 +47,10 @@ fun setup( ) = runBlocking { val config = NEXUS_CONFIG_SOURCE.fromFile(Path("conf/$conf")) val dbCfg = config.dbConfig() - val ctx = NexusConfig(config) + val cfg = NexusConfig(config) pgDataSource(dbCfg.dbConnStr).dbInit(dbCfg, "libeufin-nexus", true) - Database(dbCfg).use { - lambda(it, ctx) + Database(dbCfg, cfg.currency).use { + lambda(it, cfg) } } @@ -98,21 +96,33 @@ fun genInitPay( ) // Generates an incoming payment, given its subject. -fun genInPay(subject: String) = - IncomingPayment( - amount = TalerAmount(44, 0, "KUDOS"), +fun genInPay(subject: String, amount: String = "KUDOS:44"): IncomingPayment { + val bankId = run { + val bytes = ByteArray(16) + kotlin.random.Random.nextBytes(bytes) + Base32Crockford.encode(bytes) + } + return IncomingPayment( + amount = TalerAmount(amount), debitPaytoUri = "payto://iban/not-used", wireTransferSubject = subject, executionTime = Instant.now(), - bankId = "entropic" + bankId = bankId ) +} // Generates an outgoing payment, given its subject and messageId -fun genOutPay(subject: String, messageId: String) = - OutgoingPayment( +fun genOutPay(subject: String, messageId: String? = null): OutgoingPayment { + val id = messageId ?: run { + val bytes = ByteArray(16) + kotlin.random.Random.nextBytes(bytes) + Base32Crockford.encode(bytes) + } + return OutgoingPayment( amount = TalerAmount(44, 0, "KUDOS"), creditPaytoUri = "payto://iban/CH4189144589712575493?receiver-name=Test", wireTransferSubject = subject, executionTime = Instant.now(), - messageId = messageId - ) -\ No newline at end of file + messageId = id + ) +} +\ No newline at end of file diff --git a/nexus/src/test/kotlin/routines.kt b/nexus/src/test/kotlin/routines.kt @@ -0,0 +1,151 @@ +/* + * This file is part of LibEuFin. + * Copyright (C) 2024 Taler Systems S.A. + + * LibEuFin is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation; either version 3, or + * (at your option) any later version. + + * LibEuFin is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General + * Public License for more details. + + * You should have received a copy of the GNU Affero General Public + * License along with LibEuFin; see the file COPYING. If not, see + * <http://www.gnu.org/licenses/> + */ + +import io.ktor.client.request.* +import io.ktor.client.statement.* +import io.ktor.http.* +import io.ktor.server.testing.* +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.serialization.json.JsonObject +import tech.libeufin.common.* +import tech.libeufin.common.test.* +import kotlin.test.assertEquals + +suspend inline fun <reified B> ApplicationTestBuilder.historyRoutine( + url: String, + crossinline ids: (B) -> List<Long>, + registered: List<suspend () -> Unit>, + ignored: List<suspend () -> Unit> = listOf(), + polling: Boolean = true, + auth: String? = null +) { + // Get history + val history: suspend (String) -> HttpResponse = { params: String -> + client.get("$url?$params") { + //pwAuth(auth) + } + } + // Check history is following specs + val assertHistory: suspend HttpResponse.(Int) -> Unit = { size: Int -> + assertHistoryIds<B>(size, ids) + } + // Get latest registered id + val latestId: suspend () -> Long = { + history("delta=-1").assertOkJson<B>().run { ids(this)[0] } + } + + // Check error when no transactions + history("delta=7").assertNoContent() + + // Run interleaved registered and ignore transactions + val registered_iter = registered.iterator() + val ignored_iter = ignored.iterator() + while (registered_iter.hasNext() || ignored_iter.hasNext()) { + if (registered_iter.hasNext()) registered_iter.next()() + if (ignored_iter.hasNext()) ignored_iter.next()() + } + + + val nbRegistered = registered.size + val nbIgnored = ignored.size + val nbTotal = nbRegistered + nbIgnored + + // Check ignored + history("delta=$nbTotal").assertHistory(nbRegistered) + // Check skip ignored + history("delta=$nbRegistered").assertHistory(nbRegistered) + + if (polling) { + // Check no polling when we cannot have more transactions + assertTime(0, 100) { + history("delta=-${nbRegistered+1}&long_poll_ms=1000") + .assertHistory(nbRegistered) + } + // Check no polling when already find transactions even if less than delta + assertTime(0, 100) { + history("delta=${nbRegistered+1}&long_poll_ms=1000") + .assertHistory(nbRegistered) + } + + // Check polling + coroutineScope { + val id = latestId() + launch { // Check polling succeed + assertTime(100, 200) { + history("delta=2&start=$id&long_poll_ms=1000") + .assertHistory(1) + } + } + launch { // Check polling timeout + assertTime(200, 300) { + history("delta=1&start=${id+nbTotal*3}&long_poll_ms=200") + .assertNoContent() + } + } + delay(100) + registered[0]() + } + + // Test triggers + for (register in registered) { + coroutineScope { + val id = latestId() + launch { + assertTime(100, 200) { + history("delta=7&start=$id&long_poll_ms=1000") + .assertHistory(1) + } + } + delay(100) + register() + } + } + + // Test doesn't trigger + coroutineScope { + val id = latestId() + launch { + assertTime(200, 300) { + history("delta=7&start=$id&long_poll_ms=200") + .assertNoContent() + } + } + delay(100) + for (ignore in ignored) { + ignore() + } + } + } + + // Testing ranges. + repeat(20) { + registered[0]() + } + val id = latestId() + // Default + history("").assertHistory(20) + // forward range: + history("delta=10").assertHistory(10) + history("delta=10&start=4").assertHistory(10) + // backward range: + history("delta=-10").assertHistory(10) + history("delta=-10&start=${id-4}").assertHistory(10) +} +\ No newline at end of file diff --git a/testbench/src/test/kotlin/IntegrationTest.kt b/testbench/src/test/kotlin/IntegrationTest.kt @@ -68,8 +68,10 @@ fun server(lambda: () -> Unit) { fun setup(conf: String, lambda: suspend (NexusDb) -> Unit) { try { runBlocking { - val cfg = loadConfig(Path(conf)).dbConfig() - NexusDb(cfg).use { + val cfg = loadConfig(Path(conf)) + val dbCfg = cfg.dbConfig() + val currency = cfg.requireString("nexus-ebics", "currency") + NexusDb(dbCfg, currency).use { lambda(it) } }