commit 7243876ab11582fed2ae21eedb2a2807a2c19318 parent 5cc21948754598ab23519da826e47e6202696970 Author: Antoine A <> Date: Tue, 7 May 2024 13:19:57 +0900 Merge remote-tracking branch 'origin/v11-dev' Diffstat:
60 files changed, 1552 insertions(+), 532 deletions(-)
diff --git a/bank/src/main/kotlin/tech/libeufin/bank/Config.kt b/bank/src/main/kotlin/tech/libeufin/bank/Config.kt @@ -68,11 +68,6 @@ data class ConversionRate ( val cashout_min_amount: TalerAmount, ) -sealed interface ServerConfig { - data class Unix(val path: String, val mode: Int): ServerConfig - data class Tcp(val addr: String, val port: Int): ServerConfig -} - fun talerConfig(configPath: Path?): TalerConfig = BANK_CONFIG_SOURCE.fromFile(configPath) fun TalerConfig.loadDbConfig(): DatabaseConfig { @@ -82,14 +77,6 @@ fun TalerConfig.loadDbConfig(): DatabaseConfig { ) } -fun TalerConfig.loadServerConfig(): ServerConfig { - return when (val method = requireString("libeufin-bank", "serve")) { - "tcp" -> ServerConfig.Tcp(lookupString("libeufin-bank", "address") ?: requireString("libeufin-bank", "bind_to"), requireNumber("libeufin-bank", "port")) - "unix" -> ServerConfig.Unix(requireString("libeufin-bank", "unixpath"), requireNumber("libeufin-bank", "unixpath_mode")) - else -> throw TalerConfigError.invalid("server method", "libeufin-bank", "serve", "expected 'tcp' or 'unix' got '$method'") - } -} - fun TalerConfig.loadBankConfig(): BankConfig { val regionalCurrency = requireString("libeufin-bank", "currency") var fiatCurrency: String? = null diff --git a/bank/src/main/kotlin/tech/libeufin/bank/Constants.kt b/bank/src/main/kotlin/tech/libeufin/bank/Constants.kt @@ -40,4 +40,3 @@ const val IBAN_ALLOCATION_RETRY_COUNTER: Int = 5 const val COREBANK_API_VERSION: String = "4:8:0" const val CONVERSION_API_VERSION: String = "0:1:0" const val INTEGRATION_API_VERSION: String = "2:0:2" -const val REVENUE_API_VERSION: String = "0:0:0" -\ No newline at end of file diff --git a/bank/src/main/kotlin/tech/libeufin/bank/Main.kt b/bank/src/main/kotlin/tech/libeufin/bank/Main.kt @@ -62,8 +62,7 @@ import kotlin.io.path.exists import kotlin.io.path.readText private val logger: Logger = LoggerFactory.getLogger("libeufin-bank") -// Dirty local variable to stop the server in test TODO remove this ugly hack -var engine: ApplicationEngine? = null + /** @@ -117,7 +116,7 @@ class ServeBank : CliktCommand("Run libeufin-bank HTTP server", name = "serve") val cfg = talerConfig(common.config) val ctx = cfg.loadBankConfig() val dbCfg = cfg.loadDbConfig() - val serverCfg = cfg.loadServerConfig() + val serverCfg = cfg.loadServerConfig("libeufin-bank") Database(dbCfg, ctx.regionalCurrency, ctx.fiatCurrency).use { db -> if (ctx.allowConversion) { logger.info("Ensure exchange account exists") @@ -142,25 +141,9 @@ class ServeBank : CliktCommand("Run libeufin-bank HTTP server", name = "serve") db.conn { it.execSQLUpdate(sqlProcedures.readText()) } // Remove conversion info from the database ? } - - val env = applicationEngineEnvironment { - when (serverCfg) { - is ServerConfig.Tcp -> { - for (addr in InetAddress.getAllByName(serverCfg.addr)) { - connector { - port = serverCfg.port - host = addr.hostAddress - } - } - } - is ServerConfig.Unix -> - throw Exception("Can only serve libeufin-bank via TCP") - } - module { corebankWebApp(db, ctx) } + serve(serverCfg) { + corebankWebApp(db, ctx) } - val local = embeddedServer(Netty, env) - engine = local - local.start(wait = true) } } } diff --git a/bank/src/main/kotlin/tech/libeufin/bank/TalerMessage.kt b/bank/src/main/kotlin/tech/libeufin/bank/TalerMessage.kt @@ -329,14 +329,6 @@ data class TalerIntegrationConfigResponse( val version: String = INTEGRATION_API_VERSION } -@Serializable -data class RevenueConfig( - val currency: String -) { - val name: String = "taler-revenue" - val version: String = REVENUE_API_VERSION -} - enum class CreditDebitInfo { credit, debit } @@ -550,21 +542,6 @@ data class ConversionResponse( val amount_credit: TalerAmount, ) -@Serializable -data class RevenueIncomingHistory( - val incoming_transactions : List<RevenueIncomingBankTransaction>, - val credit_account: String -) - -@Serializable -data class RevenueIncomingBankTransaction( - val row_id: Long, - val date: TalerProtocolTimestamp, - val amount: TalerAmount, - val debit_account: String, - val subject: String -) - /** * Response to GET /public-accounts */ diff --git a/bank/src/main/kotlin/tech/libeufin/bank/api/CoreBankApi.kt b/bank/src/main/kotlin/tech/libeufin/bank/api/CoreBankApi.kt @@ -215,8 +215,7 @@ suspend fun createAccount( when (cfg.wireMethod) { WireMethod.IBAN -> { - if (req.payto_uri != null && !(req.payto_uri is IbanPayto)) - throw badRequest("Expected an IBAN payto uri") + req.payto_uri?.expectRequestIban() var retry = if (req.payto_uri == null) IBAN_ALLOCATION_RETRY_COUNTER else 0 while (true) { @@ -232,10 +231,9 @@ suspend fun createAccount( } WireMethod.X_TALER_BANK -> { if (req.payto_uri != null) { - if (!(req.payto_uri is XTalerBankPayto)) - throw badRequest("Expected an IBAN payto uri") - else if (req.payto_uri.username != req.username) - throw badRequest("Expected a payto uri for '${req.username}' got one for '${req.payto_uri.username}'") + val payto = req.payto_uri.expectRequestXTalerBank() + if (payto.username != req.username) + throw badRequest("Expected a payto uri for '${req.username}' got one for '${payto.username}'") } val internalPayto = XTalerBankPayto.forUsername(req.username) diff --git a/bank/src/main/kotlin/tech/libeufin/bank/auth/auth.kt b/bank/src/main/kotlin/tech/libeufin/bank/auth/auth.kt @@ -28,6 +28,7 @@ import io.ktor.util.pipeline.* import tech.libeufin.bank.* import tech.libeufin.bank.db.Database import tech.libeufin.common.* +import tech.libeufin.common.api.* import tech.libeufin.common.crypto.PwCrypto import java.time.Instant diff --git a/bank/src/main/kotlin/tech/libeufin/bank/db/Database.kt b/bank/src/main/kotlin/tech/libeufin/bank/db/Database.kt @@ -96,13 +96,13 @@ class Database(dbConfig: DatabaseConfig, internal val bankCurrency: String, inte /** Listen for new bank transactions for [account] */ suspend fun <R> listenBank(account: Long, lambda: suspend (Flow<Long>) -> R): R = listen(bankTxFlows, account, lambda) - /** Listen for new taler outgoing transactions from [account] */ + /** Listen for new taler outgoing transactions from [exchange] */ suspend fun <R> listenOutgoing(exchange: Long, lambda: suspend (Flow<Long>) -> R): R = listen(outgoingTxFlows, exchange, lambda) - /** Listen for new taler incoming transactions to [account] */ + /** Listen for new taler incoming transactions to [exchange] */ suspend fun <R> listenIncoming(exchange: Long, lambda: suspend (Flow<Long>) -> R): R = listen(incomingTxFlows, exchange, lambda) - /** Listen for new taler outgoing transactions to [account] */ + /** Listen for new incoming transactions to [merchant] */ suspend fun <R> listenRevenue(merchant: Long, lambda: suspend (Flow<Long>) -> R): R = listen(revenueTxFlows, merchant, lambda) /** Listen for new withdrawal confirmations */ @@ -163,8 +163,4 @@ enum class AbortResult { Success, UnknownOperation, AlreadyConfirmed -} - -fun ResultSet.getTalerTimestamp(name: String): TalerProtocolTimestamp{ - return TalerProtocolTimestamp(getLong(name).asInstant()) } \ No newline at end of file diff --git a/bank/src/main/kotlin/tech/libeufin/bank/helpers.kt b/bank/src/main/kotlin/tech/libeufin/bank/helpers.kt @@ -37,6 +37,7 @@ import tech.libeufin.bank.auth.username import tech.libeufin.bank.db.AccountDAO.AccountCreationResult import tech.libeufin.bank.db.Database import tech.libeufin.common.* +import tech.libeufin.common.api.* import java.util.* fun ApplicationCall.uuidPath(name: String): UUID { @@ -133,20 +134,6 @@ suspend fun createAdminAccount(db: Database, cfg: BankConfig, pw: String? = null ) } -fun Route.intercept(callback: Route.() -> Unit, interceptor: suspend PipelineContext<Unit, ApplicationCall>.() -> Unit): Route { - val subRoute = createChild(object : RouteSelector() { - override fun evaluate(context: RoutingResolveContext, segmentIndex: Int): RouteSelectorEvaluation = - RouteSelectorEvaluation.Constant - }) - subRoute.intercept(ApplicationCallPipeline.Plugins) { - interceptor() - proceed() - } - - callback(subRoute) - return subRoute -} - fun Route.conditional(implemented: Boolean, callback: Route.() -> Unit): Route = intercept(callback) { if (!implemented) { diff --git a/bank/src/test/kotlin/RevenueApiTest.kt b/bank/src/test/kotlin/RevenueApiTest.kt @@ -19,7 +19,6 @@ import io.ktor.http.* import org.junit.Test -import tech.libeufin.bank.RevenueIncomingHistory import tech.libeufin.common.* class RevenueApiTest { diff --git a/bank/src/test/kotlin/StatsTest.kt b/bank/src/test/kotlin/StatsTest.kt @@ -23,10 +23,8 @@ import tech.libeufin.bank.MonitorParams import tech.libeufin.bank.MonitorResponse import tech.libeufin.bank.MonitorWithConversion import tech.libeufin.bank.Timeframe -import tech.libeufin.common.ShortHashCode -import tech.libeufin.common.TalerAmount import tech.libeufin.common.db.executeQueryCheck -import tech.libeufin.common.micros +import tech.libeufin.common.* import java.time.Instant import java.time.LocalDateTime import java.time.ZoneOffset diff --git a/bank/src/test/kotlin/WireGatewayApiTest.kt b/bank/src/test/kotlin/WireGatewayApiTest.kt @@ -30,7 +30,7 @@ class WireGatewayApiTest { client.getA("/accounts/merchant/taler-wire-gateway/config").assertOk() } - // Testing the POST /transfer call from the TWG API. + // POST /accounts/{USERNAME}/taler-wire-gateway/transfer @Test fun transfer() = bankSetup { _ -> val valid_req = obj { @@ -121,9 +121,7 @@ class WireGatewayApiTest { }.assertBadRequest() } - /** - * Testing the /history/incoming call from the TWG API. - */ + // GET /accounts/{USERNAME}/taler-wire-gateway/history/incoming @Test fun historyIncoming() = bankSetup { // Give Foo reasonable debt allowance: @@ -159,10 +157,7 @@ class WireGatewayApiTest { ) } - - /** - * Testing the /history/outgoing call from the TWG API. - */ + // GET /accounts/{USERNAME}/taler-wire-gateway/history/outgoing @Test fun historyOutgoing() = bankSetup { setMaxDebt("exchange", "KUDOS:1000000") @@ -193,7 +188,7 @@ class WireGatewayApiTest { ) } - // Testing the /admin/add-incoming call from the TWG API. + // POST /accounts/{USERNAME}/taler-wire-gateway/admin/add-incoming @Test fun addIncoming() = bankSetup { _ -> val valid_req = obj { diff --git a/bank/src/test/kotlin/helpers.kt b/bank/src/test/kotlin/helpers.kt @@ -346,15 +346,6 @@ suspend fun HttpResponse.assertChallenge( } } -suspend fun assertTime(min: Int, max: Int, lambda: suspend () -> Unit) { - val start = System.currentTimeMillis() - lambda() - val end = System.currentTimeMillis() - val time = end - start - assert(time >= min) { "Expected to last at least $min ms, lasted $time" } - assert(time <= max) { "Expected to last at most $max ms, lasted $time" } -} - fun assertException(msg: String, lambda: () -> Unit) { try { lambda() @@ -364,47 +355,6 @@ fun assertException(msg: String, lambda: () -> Unit) { } } -suspend inline fun <reified B> HttpResponse.assertHistoryIds(size: Int, ids: (B) -> List<Long>): B { - assertOk() - val body = json<B>() - val history = ids(body) - val params = PageParams.extract(call.request.url.parameters) - - // testing the size is like expected. - assertEquals(size, history.size, "bad history length: $history") - if (params.delta < 0) { - // testing that the first id is at most the 'start' query param. - assert(history[0] <= params.start) { "bad history start: $params $history" } - // testing that the id decreases. - if (history.size > 1) - assert(history.windowed(2).all { (a, b) -> a > b }) { "bad history order: $history" } - } else { - // testing that the first id is at least the 'start' query param. - assert(history[0] >= params.start) { "bad history start: $params $history" } - // testing that the id increases. - if (history.size > 1) - assert(history.windowed(2).all { (a, b) -> a < b }) { "bad history order: $history" } - } - - return body -} - -/* ----- Body helper ----- */ - -suspend inline fun <reified B> HttpResponse.assertOkJson(lambda: (B) -> Unit = {}): B { - assertOk() - val body = json<B>() - lambda(body) - return body -} - -suspend inline fun <reified B> HttpResponse.assertAcceptedJson(lambda: (B) -> Unit = {}): B { - assertAccepted() - val body = json<B>() - lambda(body) - return body -} - /* ----- Auth ----- */ /** Auto auth get request */ @@ -449,7 +399,6 @@ fun HttpRequestBuilder.pwAuth(username: String? = null) { val login = url.pathSegments[2] basicAuth("$login", "$login-password") } - } /* ----- Random data generation ----- */ diff --git a/bank/src/test/kotlin/routines.kt b/bank/src/test/kotlin/routines.kt @@ -28,6 +28,7 @@ import kotlinx.serialization.json.JsonObject import tech.libeufin.bank.BankAccountCreateWithdrawalResponse import tech.libeufin.bank.WithdrawalStatus import tech.libeufin.common.* +import tech.libeufin.common.test.* import kotlin.test.assertEquals // Test endpoint is correctly authenticated @@ -40,6 +41,17 @@ suspend fun ApplicationTestBuilder.authRoutine( allowAdmin: Boolean = false ) { // No body when authentication must happen before parsing the body + + // No header + client.request(path) { + this.method = method + }.assertUnauthorized(TalerErrorCode.GENERIC_PARAMETER_MISSING) + + // Bad header + client.request(path) { + this.method = method + headers["Authorization"] = "WTF" + }.assertBadRequest(TalerErrorCode.GENERIC_HTTP_HEADERS_MALFORMED) // Unknown account client.request(path) { @@ -60,7 +72,7 @@ suspend fun ApplicationTestBuilder.authRoutine( }.assertUnauthorized() if (requireAdmin) { - // Not exchange account + // Not exchange account client.request(path) { this.method = method pwAuth("merchant") @@ -91,117 +103,11 @@ suspend inline fun <reified B> ApplicationTestBuilder.historyRoutine( polling: Boolean = true, auth: String? = null ) { - // Get history - val history: suspend (String) -> HttpResponse = { params: String -> + abstractHistoryRoutine(ids, registered, ignored, polling) { params: String -> client.get("$url?$params") { pwAuth(auth) } } - // Check history is following specs - val assertHistory: suspend HttpResponse.(Int) -> Unit = { size: Int -> - assertHistoryIds<B>(size, ids) - } - // Get latest registered id - val latestId: suspend () -> Long = { - history("delta=-1").assertOkJson<B>().run { ids(this)[0] } - } - - // Check error when no transactions - history("delta=7").assertNoContent() - - // Run interleaved registered and ignore transactions - val registered_iter = registered.iterator() - val ignored_iter = ignored.iterator() - while (registered_iter.hasNext() || ignored_iter.hasNext()) { - if (registered_iter.hasNext()) registered_iter.next()() - if (ignored_iter.hasNext()) ignored_iter.next()() - } - - - val nbRegistered = registered.size - val nbIgnored = ignored.size - val nbTotal = nbRegistered + nbIgnored - - // Check ignored - history("delta=$nbTotal").assertHistory(nbRegistered) - // Check skip ignored - history("delta=$nbRegistered").assertHistory(nbRegistered) - - if (polling) { - // Check no polling when we cannot have more transactions - assertTime(0, 100) { - history("delta=-${nbRegistered+1}&long_poll_ms=1000") - .assertHistory(nbRegistered) - } - // Check no polling when already find transactions even if less than delta - assertTime(0, 100) { - history("delta=${nbRegistered+1}&long_poll_ms=1000") - .assertHistory(nbRegistered) - } - - // Check polling - coroutineScope { - val id = latestId() - launch { // Check polling succeed - assertTime(100, 200) { - history("delta=2&start=$id&long_poll_ms=1000") - .assertHistory(1) - } - } - launch { // Check polling timeout - assertTime(200, 300) { - history("delta=1&start=${id+nbTotal*3}&long_poll_ms=200") - .assertNoContent() - } - } - delay(100) - registered[0]() - } - - // Test triggers - for (register in registered) { - coroutineScope { - val id = latestId() - launch { - assertTime(100, 200) { - history("delta=7&start=$id&long_poll_ms=1000") - .assertHistory(1) - } - } - delay(100) - register() - } - } - - // Test doesn't trigger - coroutineScope { - val id = latestId() - launch { - assertTime(200, 300) { - history("delta=7&start=$id&long_poll_ms=200") - .assertNoContent() - } - } - delay(100) - for (ignore in ignored) { - ignore() - } - } - } - - // Testing ranges. - repeat(20) { - registered[0]() - } - val id = latestId() - // Default - history("").assertHistory(20) - // forward range: - history("delta=10").assertHistory(10) - history("delta=10&start=4").assertHistory(10) - // backward range: - history("delta=-10").assertHistory(10) - history("delta=-10&start=${id-4}").assertHistory(10) } suspend inline fun <reified B> ApplicationTestBuilder.statusRoutine( diff --git a/common/src/main/kotlin/Cli.kt b/common/src/main/kotlin/Cli.kt @@ -70,6 +70,8 @@ fun cliCmd(logger: Logger, level: Level, lambda: suspend () -> Unit) { } }) } + } catch (e: ProgramResult) { + throw e } catch (e: Throwable) { e.fmtLog(logger) throw ProgramResult(1) diff --git a/common/src/main/kotlin/Client.kt b/common/src/main/kotlin/Client.kt @@ -63,7 +63,14 @@ suspend inline fun <reified B> HttpResponse.json(): B = Json.decodeFromString(kotlinx.serialization.serializer<B>(), bodyAsText()) suspend inline fun <reified B> HttpResponse.assertOkJson(lambda: (B) -> Unit = {}): B { - assertEquals(HttpStatusCode.OK, status) + assertOk() + val body = json<B>() + lambda(body) + return body +} + +suspend inline fun <reified B> HttpResponse.assertAcceptedJson(lambda: (B) -> Unit = {}): B { + assertAccepted() val body = json<B>() lambda(body) return body diff --git a/common/src/main/kotlin/Config.kt b/common/src/main/kotlin/Config.kt @@ -33,4 +33,17 @@ fun getVersion(): String { return Loader.getResource( "version.txt", ClassLoader.getSystemClassLoader() ).readText() +} + +sealed interface ServerConfig { + data class Unix(val path: String, val mode: Int): ServerConfig + data class Tcp(val addr: String, val port: Int): ServerConfig +} + +fun TalerConfig.loadServerConfig(section: String): ServerConfig { + return when (val method = requireString(section, "serve")) { + "tcp" -> ServerConfig.Tcp(lookupString(section, "address") ?: requireString(section, "bind_to"), requireNumber(section, "port")) + "unix" -> ServerConfig.Unix(requireString(section, "unixpath"), requireNumber(section, "unixpath_mode")) + else -> throw TalerConfigError.invalid("server method", section, "serve", "expected 'tcp' or 'unix' got '$method'") + } } \ No newline at end of file diff --git a/common/src/main/kotlin/Constants.kt b/common/src/main/kotlin/Constants.kt @@ -26,4 +26,5 @@ const val SERIALIZATION_RETRY: Int = 10 const val MAX_BODY_LENGTH: Long = 4 * 1024 // 4kB // API version -const val WIRE_GATEWAY_API_VERSION: String = "0:2:0" -\ No newline at end of file +const val WIRE_GATEWAY_API_VERSION: String = "0:2:0" +const val REVENUE_API_VERSION: String = "0:0:0" +\ No newline at end of file diff --git a/common/src/main/kotlin/TalerCommon.kt b/common/src/main/kotlin/TalerCommon.kt @@ -20,6 +20,7 @@ package tech.libeufin.common import io.ktor.http.* +import io.ktor.server.plugins.BadRequestException import kotlinx.serialization.KSerializer import kotlinx.serialization.Serializable import kotlinx.serialization.descriptors.PrimitiveKind @@ -67,7 +68,6 @@ data class TalerProtocolTimestamp( } else { encoder.encodeLong(value.epochSecond) } - } override fun deserialize(decoder: Decoder): Instant { @@ -292,10 +292,7 @@ sealed class Payto { /** Transform a payto URI to its bank form, using [name] as the receiver-name and the bank [ctx] */ fun bank(name: String, ctx: BankPaytoCtx): String = when (this) { - is IbanPayto -> { - val bic = if (ctx.bic != null) "${ctx.bic}/" else "" - "payto://iban/$bic$iban?receiver-name=${name.encodeURLParameter()}" - } + is IbanPayto -> IbanPayto.build(iban.toString(), ctx.bic, name) is XTalerBankPayto -> "payto://x-taler-bank/${ctx.hostname ?: "localhost"}/$username?receiver-name=${name.encodeURLParameter()}" } @@ -306,6 +303,14 @@ sealed class Payto { } } + fun expectRequestIban(): IbanPayto { + try { + return expectIban() + } catch (e: Exception) { + throw BadRequestException(e.message ?: "", e) + } + } + fun expectXTalerBank(): XTalerBankPayto { return when (this) { is XTalerBankPayto -> this @@ -313,6 +318,14 @@ sealed class Payto { } } + fun expectRequestXTalerBank(): XTalerBankPayto { + try { + return expectXTalerBank() + } catch (e: Exception) { + throw BadRequestException(e.message ?: "", e) + } + } + internal object Serializer : KSerializer<Payto> { override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("Payto", PrimitiveKind.STRING) @@ -393,10 +406,7 @@ class IbanPayto internal constructor( override fun toString(): String = parsed.toString() /** Transform an IBAN payto URI to its full form, using [defaultName] if receiver-name is missing */ - fun full(defaultName: String): String { - val bic = if (this.bic != null) "$bic/" else "" - return "payto://iban/$bic$iban?receiver-name=${(receiverName ?: defaultName).encodeURLParameter()}" - } + fun full(defaultName: String): String = build(iban.toString(), bic, receiverName ?: defaultName) internal object Serializer : KSerializer<IbanPayto> { override val descriptor: SerialDescriptor = @@ -412,6 +422,12 @@ class IbanPayto internal constructor( } companion object { + fun build(iban: String, bic: String?, name: String?): String { + val bic = if (bic != null) "$bic/" else "" + val name = if (name != null) "?receiver-name=${name.encodeURLParameter()}" else "" + return "payto://iban/$bic$iban$name" + } + fun rand(): IbanPayto { return parse("payto://iban/SANDBOXX/${IBAN.rand()}").expectIban() } diff --git a/common/src/main/kotlin/TalerConfig.kt b/common/src/main/kotlin/TalerConfig.kt @@ -444,8 +444,8 @@ class TalerConfig internal constructor( return str } - fun requireString(section: String, option: String): String = - lookupString(section, option) ?: throw TalerConfigError.missing("string", section, option) + fun requireString(section: String, option: String, type: String? = null): String = + lookupString(section, option) ?: throw TalerConfigError.missing(type ?: "string", section, option) fun requireNumber(section: String, option: String): Int { val raw = lookupString(section, option) ?: throw TalerConfigError.missing("number", section, option) diff --git a/common/src/main/kotlin/TalerMessage.kt b/common/src/main/kotlin/TalerMessage.kt @@ -75,6 +75,7 @@ data class IncomingHistory( val credit_account: String ) +/** Inner request GET /taler-wire-gateway/history/incoming */ @Serializable data class IncomingReserveTransaction( val type: String = "RESERVE", @@ -92,6 +93,7 @@ data class OutgoingHistory( val debit_account: String ) +/** Inner request GET /taler-wire-gateway/history/outgoing */ @Serializable data class OutgoingTransaction( val row_id: Long, // DB row ID of the payment. @@ -101,3 +103,29 @@ data class OutgoingTransaction( val wtid: ShortHashCode, val exchange_base_url: String, ) + +/** Response GET /taler-revenue/config */ +@Serializable +data class RevenueConfig( + val currency: String +) { + val name: String = "taler-revenue" + val version: String = REVENUE_API_VERSION +} + +/** Request GET /taler-revenue/history */ +@Serializable +data class RevenueIncomingHistory( + val incoming_transactions : List<RevenueIncomingBankTransaction>, + val credit_account: String +) + +/** Inner request GET /taler-revenue/history */ +@Serializable +data class RevenueIncomingBankTransaction( + val row_id: Long, + val date: TalerProtocolTimestamp, + val amount: TalerAmount, + val debit_account: String, + val subject: String +) +\ No newline at end of file diff --git a/common/src/main/kotlin/TxMedatada.kt b/common/src/main/kotlin/TxMedatada.kt @@ -18,10 +18,16 @@ */ package tech.libeufin.common -private val PATTERN = Regex("[a-z0-9A-Z]{52}") +private val BASE32_32B_PATTERN = Regex("[a-z0-9A-Z]{52}") /** Extract the reserve public key from an incoming Taler transaction subject */ fun parseIncomingTxMetadata(subject: String): EddsaPublicKey { - val match = PATTERN.find(subject)?.value ?: throw Exception("Missing reserve public key") + val match = BASE32_32B_PATTERN.find(subject)?.value ?: throw Exception("Missing reserve public key") return EddsaPublicKey(match) +} + +/** Extract the reserve public key from an incoming Taler transaction subject */ +fun parseOutgoingTxMetadata(subject: String): Pair<ShortHashCode, ExchangeUrl> { + val (wtid, baseUrl) = subject.splitOnce(" ") ?: throw Exception("Malformed outgoing subject") + return Pair(EddsaPublicKey(wtid), ExchangeUrl(baseUrl)) } \ No newline at end of file diff --git a/common/src/main/kotlin/api/route.kt b/common/src/main/kotlin/api/route.kt @@ -0,0 +1,41 @@ +/* + * This file is part of LibEuFin. + * Copyright (C) 2024 Taler Systems S.A. + + * LibEuFin is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation; either version 3, or + * (at your option) any later version. + + * LibEuFin is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General + * Public License for more details. + + * You should have received a copy of the GNU Affero General Public + * License along with LibEuFin; see the file COPYING. If not, see + * <http://www.gnu.org/licenses/> + */ + +package tech.libeufin.common.api + +import io.ktor.http.* +import io.ktor.server.application.* +import io.ktor.server.response.* +import io.ktor.server.routing.* +import io.ktor.util.* +import io.ktor.util.pipeline.* + +fun Route.intercept(callback: Route.() -> Unit, interceptor: suspend PipelineContext<Unit, ApplicationCall>.() -> Unit): Route { + val subRoute = createChild(object : RouteSelector() { + override fun evaluate(context: RoutingResolveContext, segmentIndex: Int): RouteSelectorEvaluation = + RouteSelectorEvaluation.Constant + }) + subRoute.intercept(ApplicationCallPipeline.Plugins) { + interceptor() + proceed() + } + + callback(subRoute) + return subRoute +} +\ No newline at end of file diff --git a/common/src/main/kotlin/api/server.kt b/common/src/main/kotlin/api/server.kt @@ -222,4 +222,28 @@ fun Application.talerApi(logger: Logger, routes: Routing.() -> Unit) { } } routing { routes() } +} + +// Dirty local variable to stop the server in test TODO remove this ugly hack +var engine: ApplicationEngine? = null + +fun serve(cfg: ServerConfig, api: Application.() -> Unit) { + val env = applicationEngineEnvironment { + when (cfg) { + is ServerConfig.Tcp -> { + for (addr in InetAddress.getAllByName(cfg.addr)) { + connector { + port = cfg.port + host = addr.hostAddress + } + } + } + is ServerConfig.Unix -> + throw Exception("Can only serve via TCP") + } + module { api() } + } + val local = embeddedServer(Netty, env) + engine = local + local.start(wait = true) } \ No newline at end of file diff --git a/common/src/main/kotlin/db/helpers.kt b/common/src/main/kotlin/db/helpers.kt @@ -110,4 +110,55 @@ suspend fun <T> DbPool.poolHistory( } else { load() } +} + +/** +* The following function returns the list of transactions, according +* to the history parameters and perform long polling when necessary +*/ +suspend fun <T> DbPool.poolHistoryGlobal( + params: HistoryParams, + listen: suspend (suspend (Flow<Long>) -> List<T>) -> List<T>, + query: String, + idColumnValue: String, + map: (ResultSet) -> T +): List<T> { + + suspend fun load(): List<T> = page( + params.page, + idColumnValue, + query, + map=map + ) + + + // TODO do we want to handle polling when going backward and there is no transactions yet ? + // When going backward there is always at least one transaction or none + return if (params.page.delta >= 0 && params.polling.poll_ms > 0) { + listen { flow -> + coroutineScope { + // Start buffering notification before loading transactions to not miss any + val polling = launch { + withTimeoutOrNull(params.polling.poll_ms) { + flow.first { it > params.page.start } // Always forward so > + } + } + // Initial loading + val init = load() + // Long polling if we found no transactions + if (init.isEmpty()) { + if (polling.join() != null) { + load() + } else { + init + } + } else { + polling.cancel() + init + } + } + } + } else { + load() + } } \ No newline at end of file diff --git a/common/src/main/kotlin/db/notifications.kt b/common/src/main/kotlin/db/notifications.kt @@ -30,7 +30,10 @@ import java.util.* import java.util.concurrent.ConcurrentHashMap // SharedFlow that are manually counted for manual garbage collection -class CountedSharedFlow<T>(val flow: MutableSharedFlow<T>, var count: Int) +class CountedSharedFlow<T> { + val flow: MutableSharedFlow<T> = MutableSharedFlow() + var count: Int = 0 +} fun watchNotifications( pgSource: PGSimpleDataSource, @@ -73,7 +76,7 @@ fun watchNotifications( suspend fun <R, K, V> listen(map: ConcurrentHashMap<K, CountedSharedFlow<V>>, key: K, lambda: suspend (Flow<V>) -> R): R { // Register listener, create a new flow if missing val flow = map.compute(key) { _, v -> - val tmp = v ?: CountedSharedFlow(MutableSharedFlow(), 0) + val tmp = v ?: CountedSharedFlow() tmp.count++ tmp }!!.flow diff --git a/common/src/main/kotlin/db/transaction.kt b/common/src/main/kotlin/db/transaction.kt @@ -52,6 +52,15 @@ fun <T> PreparedStatement.oneOrNull(lambda: (ResultSet) -> T): T? { fun <T> PreparedStatement.one(lambda: (ResultSet) -> T): T = requireNotNull(oneOrNull(lambda)) { "Missing result to database query" } +fun <T> PreparedStatement.oneUniqueViolation(err: T, lambda: (ResultSet) -> T): T { + return try { + one(lambda) + } catch (e: SQLException) { + if (e.sqlState == PSQLState.UNIQUE_VIOLATION.state) return err + throw e // rethrowing, not to hide other types of errors. + } +} + fun <T> PreparedStatement.all(lambda: (ResultSet) -> T): List<T> { executeQuery().use { val ret = mutableListOf<T>() diff --git a/common/src/main/kotlin/db/types.kt b/common/src/main/kotlin/db/types.kt @@ -23,6 +23,8 @@ import tech.libeufin.common.BankPaytoCtx import tech.libeufin.common.Payto import tech.libeufin.common.TalerAmount import tech.libeufin.common.DecimalNumber +import tech.libeufin.common.TalerProtocolTimestamp +import tech.libeufin.common.asInstant import java.sql.ResultSet fun ResultSet.getAmount(name: String, currency: String): TalerAmount { @@ -46,6 +48,10 @@ fun ResultSet.getDecimal(name: String): DecimalNumber { ) } +fun ResultSet.getTalerTimestamp(name: String): TalerProtocolTimestamp{ + return TalerProtocolTimestamp(getLong(name).asInstant()) +} + fun ResultSet.getBankPayto(payto: String, name: String, ctx: BankPaytoCtx): String { return Payto.parse(getString(payto)).bank(getString(name), ctx) } \ No newline at end of file diff --git a/common/src/main/kotlin/test/helpers.kt b/common/src/main/kotlin/test/helpers.kt @@ -0,0 +1,65 @@ +/* + * This file is part of LibEuFin. + * Copyright (C) 2024 Taler Systems S.A. + + * LibEuFin is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation; either version 3, or + * (at your option) any later version. + + * LibEuFin is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General + * Public License for more details. + + * You should have received a copy of the GNU Affero General Public + * License along with LibEuFin; see the file COPYING. If not, see + * <http://www.gnu.org/licenses/> + */ + +package tech.libeufin.common.test + +import io.ktor.client.* +import io.ktor.client.request.* +import io.ktor.client.statement.* +import io.ktor.http.* +import kotlin.test.assertEquals +import kotlin.test.assertIs +import kotlin.test.assertNotNull +import tech.libeufin.common.* + +/* ----- Assert ----- */ + +suspend fun assertTime(min: Int, max: Int, lambda: suspend () -> Unit) { + val start = System.currentTimeMillis() + lambda() + val end = System.currentTimeMillis() + val time = end - start + assert(time >= min) { "Expected to last at least $min ms, lasted $time" } + assert(time <= max) { "Expected to last at most $max ms, lasted $time" } +} + +suspend inline fun <reified B> HttpResponse.assertHistoryIds(size: Int, ids: (B) -> List<Long>): B { + assertOk() + val body = json<B>() + val history = ids(body) + val params = PageParams.extract(call.request.url.parameters) + + // testing the size is like expected. + assertEquals(size, history.size, "bad history length: $history") + if (params.delta < 0) { + // testing that the first id is at most the 'start' query param. + assert(history[0] <= params.start) { "bad history start: $params $history" } + // testing that the id decreases. + if (history.size > 1) + assert(history.windowed(2).all { (a, b) -> a > b }) { "bad history order: $history" } + } else { + // testing that the first id is at least the 'start' query param. + assert(history[0] >= params.start) { "bad history start: $params $history" } + // testing that the id increases. + if (history.size > 1) + assert(history.windowed(2).all { (a, b) -> a < b }) { "bad history order: $history" } + } + + return body +} +\ No newline at end of file diff --git a/common/src/main/kotlin/test/routines.kt b/common/src/main/kotlin/test/routines.kt @@ -0,0 +1,142 @@ +/* + * This file is part of LibEuFin. + * Copyright (C) 2024 Taler Systems S.A. + + * LibEuFin is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation; either version 3, or + * (at your option) any later version. + + * LibEuFin is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General + * Public License for more details. + + * You should have received a copy of the GNU Affero General Public + * License along with LibEuFin; see the file COPYING. If not, see + * <http://www.gnu.org/licenses/> + */ + +package tech.libeufin.common.test + +import io.ktor.client.request.* +import io.ktor.client.statement.* +import io.ktor.http.* +import io.ktor.server.testing.* +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import tech.libeufin.common.* + +suspend inline fun <reified B> ApplicationTestBuilder.abstractHistoryRoutine( + crossinline ids: (B) -> List<Long>, + registered: List<suspend () -> Unit>, + ignored: List<suspend () -> Unit> = listOf(), + polling: Boolean = true, + crossinline history: suspend (String) -> HttpResponse, +) { + // Check history is following specs + val assertHistory: suspend HttpResponse.(Int) -> Unit = { size: Int -> + assertHistoryIds<B>(size, ids) + } + // Get latest registered id + val latestId: suspend () -> Long = { + history("delta=-1").assertOkJson<B>().run { ids(this)[0] } + } + + // Check error when no transactions + history("delta=7").assertNoContent() + + // Run interleaved registered and ignore transactions + val registered_iter = registered.iterator() + val ignored_iter = ignored.iterator() + while (registered_iter.hasNext() || ignored_iter.hasNext()) { + if (registered_iter.hasNext()) registered_iter.next()() + if (ignored_iter.hasNext()) ignored_iter.next()() + } + + val nbRegistered = registered.size + val nbIgnored = ignored.size + val nbTotal = nbRegistered + nbIgnored + + // Check ignored + history("delta=$nbTotal").assertHistory(nbRegistered) + // Check skip ignored + history("delta=$nbRegistered").assertHistory(nbRegistered) + + if (polling) { + // Check no polling when we cannot have more transactions + assertTime(0, 100) { + history("delta=-${nbRegistered+1}&long_poll_ms=1000") + .assertHistory(nbRegistered) + } + // Check no polling when already find transactions even if less than delta + assertTime(0, 100) { + history("delta=${nbRegistered+1}&long_poll_ms=1000") + .assertHistory(nbRegistered) + } + + // Check polling + coroutineScope { + val id = latestId() + launch { // Check polling succeed + assertTime(100, 200) { + history("delta=2&start=$id&long_poll_ms=1000") + .assertHistory(1) + } + } + launch { // Check polling timeout + assertTime(200, 300) { + history("delta=1&start=${id+nbTotal*3}&long_poll_ms=200") + .assertNoContent() + } + } + delay(100) + registered[0]() + } + + // Test triggers + for (register in registered) { + coroutineScope { + val id = latestId() + launch { + assertTime(100, 200) { + history("delta=7&start=$id&long_poll_ms=1000") + .assertHistory(1) + } + } + delay(100) + register() + } + } + + // Test doesn't trigger + coroutineScope { + val id = latestId() + launch { + assertTime(200, 300) { + history("delta=7&start=$id&long_poll_ms=200") + .assertNoContent() + } + } + delay(100) + for (ignore in ignored) { + ignore() + } + } + } + + // Testing ranges. + repeat(20) { + registered[0]() + } + val id = latestId() + // Default + history("").assertHistory(20) + // forward range: + history("delta=10").assertHistory(10) + history("delta=10&start=4").assertHistory(10) + // backward range: + history("delta=-10").assertHistory(10) + history("delta=-10&start=${id-4}").assertHistory(10) +} +\ No newline at end of file diff --git a/contrib/nexus.conf b/contrib/nexus.conf @@ -40,6 +40,11 @@ CLIENT_PRIVATE_KEYS_FILE = ${LIBEUFIN_NEXUS_HOME}/client-ebics-keys.json # Typically, it is named after the bank itself. BANK_DIALECT = postfinance +# Specify the account type and therefore the indexing behavior. +# This can either can be normal or exchange. +# Exchange accounts bounce invalid incoming Taler transactions. +ACCOUNT_TYPE = exchange + [libeufin-nexusdb-postgres] # Where are the SQL files to setup our tables? SQL_DIR = $DATADIR/sql/ @@ -56,15 +61,27 @@ FREQUENCY = 30m FREQUENCY = 30m [nexus-httpd] -PORT = 8080 +# How "libeufin-nexus serve" serves its API, this can either be tcp or unix SERVE = tcp +# Port on which the HTTP server listens, e.g. 9967. Only used if SERVE is tcp. +PORT = 8080 + +# Which IP address should we bind to? E.g. ``127.0.0.1`` or ``::1``for loopback. Can also be given as a hostname. Only used if SERVE is tcp. +BIND_TO = 0.0.0.0 + +# Which unix domain path should we bind to? Only used if SERVE is unix. +# UNIXPATH = libeufin-nexus.sock + +# What should be the file access permissions for UNIXPATH? Only used if SERVE is unix. +# UNIXPATH_MODE = 660 + [nexus-httpd-wire-gateway-api] ENABLED = NO -AUTH_METHOD = token -AUTH_TOKEN = +AUTH_METHOD = bearer-token +AUTH_BEARER_TOKEN = [nexus-httpd-revenue-api] ENABLED = NO -AUTH_METHOD = token -AUTH_TOKEN = +AUTH_METHOD = bearer-token +AUTH_BEARER_TOKEN = diff --git a/database-versioning/libeufin-bank-procedures.sql b/database-versioning/libeufin-bank-procedures.sql @@ -1058,7 +1058,7 @@ SELECT custom_min_cashout.val, custom_min_cashout.frac, out_no_cashout_payto, out_tan_required FROM bank_accounts - JOIN customers ON bank_accounts.owning_customer_id = customers.customer_id + JOIN customers ON owning_customer_id=customer_id WHERE login=in_login; IF NOT FOUND THEN out_account_not_found=TRUE; diff --git a/database-versioning/libeufin-nexus-0001.sql b/database-versioning/libeufin-nexus-0001.sql @@ -41,7 +41,7 @@ COMMENT ON TYPE submission_state never_heard_back is a fallback state, in case one successful submission did never get confirmed via camt.5x or pain.002.'; -CREATE TABLE IF NOT EXISTS incoming_transactions +CREATE TABLE incoming_transactions (incoming_transaction_id INT8 GENERATED BY DEFAULT AS IDENTITY UNIQUE ,amount taler_amount NOT NULL ,wire_transfer_subject TEXT NOT NULL @@ -53,12 +53,12 @@ COMMENT ON COLUMN incoming_transactions.bank_id IS 'ISO20022 AccountServicerReference'; -- only active in exchange mode. Note: duplicate keys are another reason to bounce. -CREATE TABLE IF NOT EXISTS talerable_incoming_transactions +CREATE TABLE talerable_incoming_transactions (incoming_transaction_id INT8 NOT NULL UNIQUE REFERENCES incoming_transactions(incoming_transaction_id) ON DELETE CASCADE ,reserve_public_key BYTEA NOT NULL UNIQUE CHECK (LENGTH(reserve_public_key)=32) ); -CREATE TABLE IF NOT EXISTS outgoing_transactions +CREATE TABLE outgoing_transactions (outgoing_transaction_id INT8 GENERATED BY DEFAULT AS IDENTITY UNIQUE ,amount taler_amount NOT NULL ,wire_transfer_subject TEXT @@ -69,7 +69,7 @@ CREATE TABLE IF NOT EXISTS outgoing_transactions COMMENT ON COLUMN outgoing_transactions.message_id IS 'ISO20022 MessageIdentification'; -CREATE TABLE IF NOT EXISTS initiated_outgoing_transactions +CREATE TABLE initiated_outgoing_transactions (initiated_outgoing_transaction_id INT8 GENERATED BY DEFAULT AS IDENTITY UNIQUE ,amount taler_amount NOT NULL ,wire_transfer_subject TEXT NOT NULL @@ -93,15 +93,15 @@ value will be used as a unique identifier for its related pain.001 document. For this reason, it must have at most 35 characters'; -- only active in exchange mode. -CREATE TABLE IF NOT EXISTS bounced_transactions +CREATE TABLE bounced_transactions (incoming_transaction_id INT8 NOT NULL UNIQUE REFERENCES incoming_transactions(incoming_transaction_id) ON DELETE CASCADE ,initiated_outgoing_transaction_id INT8 NOT NULL UNIQUE REFERENCES initiated_outgoing_transactions(initiated_outgoing_transaction_id) ON DELETE CASCADE ); -CREATE INDEX IF NOT EXISTS incoming_transaction_timestamp +CREATE INDEX incoming_transaction_timestamp ON incoming_transactions (execution_time); -CREATE INDEX IF NOT EXISTS outgoing_transaction_timestamp +CREATE INDEX outgoing_transaction_timestamp ON outgoing_transactions (execution_time); COMMIT; diff --git a/database-versioning/libeufin-nexus-0003.sql b/database-versioning/libeufin-nexus-0003.sql @@ -0,0 +1,36 @@ +-- +-- This file is part of TALER +-- Copyright (C) 2024 Taler Systems SA +-- +-- TALER is free software; you can redistribute it and/or modify it under the +-- terms of the GNU General Public License as published by the Free Software +-- Foundation; either version 3, or (at your option) any later version. +-- +-- TALER is distributed in the hope that it will be useful, but WITHOUT ANY +-- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +-- A PARTICULAR PURPOSE. See the GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License along with +-- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> + +BEGIN; + +SELECT _v.register_patch('libeufin-nexus-0003', NULL, NULL); + +SET search_path TO libeufin_nexus; + +CREATE TABLE talerable_outgoing_transactions + ( outgoing_transaction_id INT8 UNIQUE NOT NULL REFERENCES outgoing_transactions(outgoing_transaction_id) ON DELETE CASCADE + ,wtid BYTEA NOT NULL UNIQUE CHECK (LENGTH(wtid)=32) + ,exchange_base_url TEXT NOT NULL + ); + +CREATE TABLE transfer_operations + ( initiated_outgoing_transaction_id INT8 UNIQUE NOT NULL REFERENCES initiated_outgoing_transactions(initiated_outgoing_transaction_id) ON DELETE CASCADE + ,request_uid BYTEA UNIQUE NOT NULL CHECK (LENGTH(request_uid)=64) + ,wtid BYTEA UNIQUE NOT NULL CHECK (LENGTH(wtid)=32) + ,exchange_base_url TEXT NOT NULL + ); +COMMENT ON TABLE transfer_operations + IS 'Operation table for idempotent wire gateway transfers.'; +COMMIT; diff --git a/database-versioning/libeufin-nexus-procedures.sql b/database-versioning/libeufin-nexus-procedures.sql @@ -33,6 +33,8 @@ CREATE FUNCTION register_outgoing( ,IN in_execution_time INT8 ,IN in_credit_payto_uri TEXT ,IN in_message_id TEXT + ,IN in_wtid BYTEA + ,IN in_exchange_url TEXT ,OUT out_tx_id INT8 ,OUT out_found BOOLEAN ,OUT out_initiated BOOLEAN @@ -79,6 +81,19 @@ ELSE WHERE request_uid = in_message_id RETURNING true INTO out_initiated; END IF; + +-- Register as talerable if contains wtid and exchange URL +IF in_wtid IS NOT NULL OR in_exchange_url IS NOT NULL THEN + INSERT INTO talerable_outgoing_transactions ( + outgoing_transaction_id, + wtid, + exchange_base_url + ) VALUES (out_tx_id, in_wtid, in_exchange_url) + ON CONFLICT (wtid) DO NOTHING; + IF FOUND THEN + PERFORM pg_notify('outgoing_tx', out_tx_id::text); + END IF; +END IF; END $$; COMMENT ON FUNCTION register_outgoing IS 'Register an outgoing transaction and optionally reconciles the related initiated transaction with it'; @@ -116,6 +131,7 @@ ELSE ,in_debit_payto_uri ,in_bank_id ) RETURNING incoming_transaction_id INTO out_tx_id; + PERFORM pg_notify('revenue_tx', out_tx_id::text); END IF; END $$; COMMENT ON FUNCTION register_incoming @@ -216,7 +232,7 @@ BEGIN -- Check conflict IF EXISTS ( SELECT FROM talerable_incoming_transactions - JOIN incoming_transactions ON talerable_incoming_transactions.incoming_transaction_id=incoming_transactions.incoming_transaction_id + JOIN incoming_transactions USING(incoming_transaction_id) WHERE reserve_public_key = in_reserve_public_key AND bank_id != in_bank_id ) THEN @@ -239,9 +255,71 @@ IF NOT EXISTS(SELECT 1 FROM talerable_incoming_transactions WHERE incoming_trans out_tx_id ,in_reserve_public_key ); + PERFORM pg_notify('incoming_tx', out_tx_id::text); END IF; END $$; COMMENT ON FUNCTION register_incoming_and_talerable IS ' Creates one row in the incoming transactions table and one row in the talerable transactions table. The talerable row links the -incoming one.'; -\ No newline at end of file +incoming one.'; + +CREATE FUNCTION taler_transfer( + IN in_request_uid BYTEA, + IN in_wtid BYTEA, + IN in_subject TEXT, + IN in_amount taler_amount, + IN in_exchange_base_url TEXT, + IN in_credit_account_payto TEXT, + IN in_bank_id TEXT, + IN in_timestamp INT8, + -- Error status + OUT out_request_uid_reuse BOOLEAN, + -- Success return + OUT out_tx_row_id INT8, + OUT out_timestamp INT8 +) +LANGUAGE plpgsql AS $$ +BEGIN +-- Check for idempotence and conflict +SELECT (amount != in_amount + OR credit_payto_uri != in_credit_account_payto + OR exchange_base_url != in_exchange_base_url + OR wtid != in_wtid) + ,transfer_operations.initiated_outgoing_transaction_id, initiation_time + INTO out_request_uid_reuse, out_tx_row_id, out_timestamp + FROM transfer_operations + JOIN initiated_outgoing_transactions + ON transfer_operations.initiated_outgoing_transaction_id=initiated_outgoing_transactions.initiated_outgoing_transaction_id + WHERE transfer_operations.request_uid = in_request_uid; +IF FOUND THEN + RETURN; +END IF; +-- Initiate bank transfer +INSERT INTO initiated_outgoing_transactions ( + amount + ,wire_transfer_subject + ,credit_payto_uri + ,initiation_time + ,request_uid +) VALUES ( + in_amount + ,in_subject + ,in_credit_account_payto + ,in_timestamp + ,in_bank_id +) RETURNING initiated_outgoing_transaction_id INTO out_tx_row_id; +-- Register outgoing transaction +INSERT INTO transfer_operations( + initiated_outgoing_transaction_id + ,request_uid + ,wtid + ,exchange_base_url +) VALUES ( + out_tx_row_id + ,in_request_uid + ,in_wtid + ,in_exchange_base_url +); +out_timestamp = in_timestamp; +PERFORM pg_notify('outgoing_tx', out_tx_row_id::text); +END $$; diff --git a/debian/libeufin-nexus.libeufin-nexus-httpd.service b/debian/libeufin-nexus.libeufin-nexus-httpd.service @@ -0,0 +1,14 @@ +[Unit] +Description=LibEuFin Nexus Server Service +After=postgres.service network.target +PartOf=libeufin-nexus.target + +[Service] +User=libeufin-nexus +ExecStart=/usr/bin/libeufin-nexus serve -c /etc/libeufin/libeufin-nexus.conf +ExecCondition=/usr/bin/libeufin-nexus serve -c /etc/libeufin/libeufin-nexus.conf --check +Restart=on-failure +RestartSec=1s + +[Install] +WantedBy=multi-user.target diff --git a/debian/libeufin-nexus.target b/debian/libeufin-nexus.target @@ -4,6 +4,7 @@ After=postgres.service network.target Wants=libeufin-nexus-ebics-fetch.service Wants=libeufin-nexus-ebics-submit.service +Wants=libeufin-nexus-httpd.service [Install] WantedBy=multi-user.target \ No newline at end of file diff --git a/debian/rules b/debian/rules @@ -39,12 +39,13 @@ override_dh_install: override_dh_installsystemd: # Need to specify units manually, since we have multiple # and dh_installsystemd by default only looks for "<package>.service". - dh_installsystemd -plibeufin-bank --name=libeufin-bank --no-start --no-enable --no-stop-on-upgrade + dh_installsystemd -plibeufin-bank --name=libeufin-bank.service --no-start --no-enable --no-stop-on-upgrade dh_installsystemd -plibeufin-bank --name=libeufin-bank-gc --no-start --no-enable --no-stop-on-upgrade dh_installsystemd -plibeufin-bank --name=libeufin-bank-gc.timer --no-start --no-enable --no-stop-on-upgrade dh_installsystemd -plibeufin-bank --name=libeufin-bank --no-start --no-enable --no-stop-on-upgrade dh_installsystemd -plibeufin-nexus --name=libeufin-nexus-ebics-submit --no-start --no-enable --no-stop-on-upgrade dh_installsystemd -plibeufin-nexus --name=libeufin-nexus-ebics-fetch --no-start --no-enable --no-stop-on-upgrade + dh_installsystemd -plibeufin-nexus --name=libeufin-nexus-httpd --no-start --no-enable --no-stop-on-upgrade dh_installsystemd -plibeufin-nexus --name=libeufin-nexus --no-start --no-enable --no-stop-on-upgrade # final invocation to generate daemon reload dh_installsystemd diff --git a/nexus/conf/mini.conf b/nexus/conf/mini.conf @@ -0,0 +1,15 @@ +[nexus-ebics] +CURRENCY = CHF +BANK_DIALECT = postfinance +HOST_BASE_URL = https://isotest.postfinance.ch/ebicsweb/ebicsweb +BANK_PUBLIC_KEYS_FILE = test/tmp/bank-keys.json +CLIENT_PRIVATE_KEYS_FILE = test/tmp/client-keys.json +IBAN = CH7789144474425692816 +HOST_ID = PFEBICS +USER_ID = PFC00563 +PARTNER_ID = PFC00563 +BIC = BIC +NAME = myname + +[libeufin-nexusdb-postgres] +CONFIG = postgres:///libeufincheck +\ No newline at end of file diff --git a/nexus/conf/test.conf b/nexus/conf/test.conf @@ -15,4 +15,14 @@ NAME = myname CONFIG = postgres:///libeufincheck [nexus-fetch] -IGNORE_TRANSACTIONS_BEFORE = 2024-04-04 -\ No newline at end of file +IGNORE_TRANSACTIONS_BEFORE = 2024-04-04 + +[nexus-httpd-wire-gateway-api] +ENABLED = YES +AUTH_METHOD = bearer-token +AUTH_BEARER_TOKEN = secret-token + +[nexus-httpd-revenue-api] +ENABLED = YES +AUTH_METHOD = bearer-token +AUTH_BEARER_TOKEN = secret-token +\ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Config.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Config.kt @@ -31,9 +31,13 @@ class NexusFetchConfig(config: TalerConfig) { val ignoreBefore = config.lookupDate("nexus-fetch", "ignore_transactions_before") } +class ApiConfig(config: TalerConfig, section: String) { + val authMethod = config.requireAuthMethod(section) +} + /** Configuration for libeufin-nexus */ class NexusConfig(val config: TalerConfig) { - private fun requireString(option: String): String = config.requireString("nexus-ebics", option) + private fun requireString(option: String, type: String? = null): String = config.requireString("nexus-ebics", option, type) private fun requirePath(option: String): Path = config.requirePath("nexus-ebics", option) /** The bank's currency */ @@ -52,17 +56,26 @@ class NexusConfig(val config: TalerConfig) { bic = requireString("bic"), name = requireString("name") ) + /** Bank account payto */ + val payto = IbanPayto.build(account.iban, account.bic, account.name) /** Path where we store the bank public keys */ val bankPublicKeysPath = requirePath("bank_public_keys_file") /** Path where we store our private keys */ val clientPrivateKeysPath = requirePath("client_private_keys_file") val fetch = NexusFetchConfig(config) - val dialect = when (val type = requireString("bank_dialect")) { + val dialect = when (val type = requireString("bank_dialect", "dialect")) { "postfinance" -> Dialect.postfinance "gls" -> Dialect.gls - else -> throw TalerConfigError.invalid("dialct", "libeufin-nexus", "bank_dialect", "expected 'postfinance' or 'gls' got '$type'") + else -> throw TalerConfigError.invalid("bank dialect", "libeufin-nexus", "bank_dialect", "expected 'postfinance' or 'gls' got '$type'") } + val accountType = when (val type = requireString("account_type", "account type")) { + "normal" -> AccountType.normal + "exchange" -> AccountType.exchange + else -> throw TalerConfigError.invalid("account type", "libeufin-nexus", "account_type", "expected 'normal' or 'exchange' got '$type'") + } + val wireGatewayApiCfg = config.apiConf("nexus-httpd-wire-gateway-api") + val revenueApiCfg = config.apiConf("nexus-httpd-revenue-api") } fun NexusConfig.checkCurrency(amount: TalerAmount) { @@ -70,4 +83,34 @@ fun NexusConfig.checkCurrency(amount: TalerAmount) { "Wrong currency: expected regional $currency got ${amount.currency}", TalerErrorCode.GENERIC_CURRENCY_MISMATCH ) +} + +fun TalerConfig.requireAuthMethod(section: String): AuthMethod { + return when (val method = requireString(section, "auth_method", "auth method")) { + "none" -> AuthMethod.None + "bearer-token" -> { + val token = requireString(section, "auth_bearer_token") + AuthMethod.Bearer(token) + } + else -> throw TalerConfigError.invalid("auth method target type", section, "auth_method", "expected 'bearer-token' or 'none' got '$method'") + } +} + +fun TalerConfig.apiConf(section: String): ApiConfig? { + val enabled = requireBoolean(section, "enabled") + return if (enabled) { + return ApiConfig(this, section) + } else { + null + } +} + +sealed interface AuthMethod { + data object None: AuthMethod + data class Bearer(val token: String): AuthMethod +} + +enum class AccountType { + normal, + exchange } \ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt @@ -95,7 +95,10 @@ suspend fun ingestOutgoingPayment( db: Database, payment: OutgoingPayment ) { - val result = db.payment.registerOutgoing(payment) + val metadata: Pair<ShortHashCode, ExchangeUrl>? = payment.wireTransferSubject?.let { + runCatching { parseOutgoingTxMetadata(it) }.getOrNull() + } + val result = db.payment.registerOutgoing(payment, metadata?.first, metadata?.second) if (result.new) { if (result.initiated) logger.info("$payment") @@ -106,8 +109,6 @@ suspend fun ingestOutgoingPayment( } } -private val PATTERN = Regex("[a-z0-9A-Z]{52}") - /** * Ingests an incoming payment. Stores the payment into valid talerable ones * or bounces it, according to the subject. @@ -117,18 +118,31 @@ private val PATTERN = Regex("[a-z0-9A-Z]{52}") */ suspend fun ingestIncomingPayment( db: Database, - payment: IncomingPayment + payment: IncomingPayment, + accountType: AccountType ) { suspend fun bounce(msg: String) { - val result = db.payment.registerMalformedIncoming( - payment, - payment.amount, - Instant.now() - ) - if (result.new) { - logger.info("$payment bounced in '${result.bounceId}': $msg") - } else { - logger.debug("$payment already seen and bounced in '${result.bounceId}': $msg") + when (accountType) { + AccountType.exchange -> { + val result = db.payment.registerMalformedIncoming( + payment, + payment.amount, + Instant.now() + ) + if (result.new) { + logger.info("$payment bounced in '${result.bounceId}': $msg") + } else { + logger.debug("$payment already seen and bounced in '${result.bounceId}': $msg") + } + } + AccountType.normal -> { + val res = db.payment.registerIncoming(payment) + if (res.new) { + logger.info("$payment") + } else { + logger.debug("$payment already seen") + } + } } } runCatching { parseIncomingTxMetadata(payment.wireTransferSubject) }.fold( @@ -163,7 +177,7 @@ private suspend fun ingestDocument( logger.debug("IGNORE $it") } else { when (it) { - is IncomingPayment -> ingestIncomingPayment(db, it) + is IncomingPayment -> ingestIncomingPayment(db, it, cfg.accountType) is OutgoingPayment -> ingestOutgoingPayment(db, it) is TxNotification.Reversal -> { logger.error("BOUNCE '${it.msgId}': ${it.reason}") @@ -364,10 +378,10 @@ class EbicsFetch: CliktCommand("Fetches EBICS files") { * mode when no flags are passed to the invocation. */ override fun run() = cliCmd(logger, common.log) { - val cfg = extractEbicsConfig(common.config) + val cfg = loadNexusConfig(common.config) val dbCfg = cfg.config.dbConfig() - Database(dbCfg).use { db -> + Database(dbCfg, cfg.currency).use { db -> val (clientKeys, bankKeys) = expectFullKeys(cfg) val ctx = FetchContext( cfg, diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSetup.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSetup.kt @@ -155,7 +155,7 @@ suspend fun doKeysRequestAndUpdateState( * @param configFile location of the configuration entry point. * @return internal representation of the configuration. */ -fun extractEbicsConfig(configFile: Path?): NexusConfig { +fun loadNexusConfig(configFile: Path?): NexusConfig { val config = loadConfig(configFile) return NexusConfig(config) } @@ -197,8 +197,8 @@ class EbicsSetup: CliktCommand("Set up the EBICS subscriber") { * This function collects the main steps of setting up an EBICS access. */ override fun run() = cliCmd(logger, common.log) { - val cfg = extractEbicsConfig(common.config) - val client = HttpClient { + val cfg = loadNexusConfig(common.config) + val client = HttpClient { install(HttpTimeout) { // It can take a lot of time for the bank to generate documents socketTimeoutMillis = 5 * 60 * 1000 diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt @@ -65,7 +65,7 @@ data class SubmissionContext( private suspend fun submitInitiatedPayment( ctx: SubmissionContext, payment: InitiatedPayment -): String { +): String { val creditAccount = try { val payto = Payto.parse(payment.creditPaytoUri).expectIban() IbanAccountMetadata( @@ -147,7 +147,7 @@ class EbicsSubmit : CliktCommand("Submits any initiated payment found in the dat * FIXME: reduce code duplication with the fetch subcommand. */ override fun run() = cliCmd(logger, common.log) { - val cfg = extractEbicsConfig(common.config) + val cfg = loadNexusConfig(common.config) val dbCfg = cfg.config.dbConfig() val (clientKeys, bankKeys) = expectFullKeys(cfg) val ctx = SubmissionContext( @@ -157,7 +157,7 @@ class EbicsSubmit : CliktCommand("Submits any initiated payment found in the dat httpClient = HttpClient(), fileLogger = FileLogger(ebicsLog) ) - Database(dbCfg).use { db -> + Database(dbCfg, cfg.currency).use { db -> val frequency: Duration = if (transient) { logger.info("Transient mode: submitting what found and returning.") Duration.ZERO diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Iso20022.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Iso20022.kt @@ -300,13 +300,9 @@ data class OutgoingPayment( private fun XmlDestructor.payto(prefix: String): String? { val iban = opt("${prefix}Acct")?.one("Id")?.one("IBAN")?.text() return if (iban != null) { - val payto = StringBuilder("payto://iban/$iban") val name = opt(prefix) { opt("Nm")?.text() ?: opt("Pty")?.one("Nm")?.text() } - if (name != null) { - val urlEncName = URLEncoder.encode(name, "utf-8") - payto.append("?receiver-name=$urlEncName") - } - return payto.toString() + // Parse bic ? + IbanPayto.build(iban, null, name) } else { null } diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt @@ -31,6 +31,7 @@ import com.github.ajalt.clikt.parameters.arguments.* import com.github.ajalt.clikt.parameters.groups.* import com.github.ajalt.clikt.parameters.options.* import com.github.ajalt.clikt.parameters.types.* +import com.github.ajalt.clikt.core.ProgramResult import io.ktor.client.* import io.ktor.client.plugins.* import kotlinx.serialization.json.Json @@ -75,6 +76,7 @@ fun Instant.fmtDateTime(): String = fun Application.nexusApi(db: Database, cfg: NexusConfig) = talerApi(logger) { wireGatewayApi(db, cfg) + revenueApi(db, cfg) } /** @@ -132,7 +134,7 @@ class InitiatePayment: CliktCommand("Initiate an outgoing payment") { Base32Crockford.encode(bytes) } - Database(dbCfg).use { db -> + Database(dbCfg, currency).use { db -> db.initiated.create( InitiatedPayment( id = -1, @@ -147,6 +149,44 @@ class InitiatePayment: CliktCommand("Initiate an outgoing payment") { } } +class Serve : CliktCommand("Run libeufin-nexus HTTP server", name = "serve") { + private val common by CommonOption() + private val check by option().flag() + + override fun run() = cliCmd(logger, common.log) { + val cfg = loadNexusConfig(common.config) + + if (check) { + // Check if the server is to be started + val apis = listOf( + cfg.wireGatewayApiCfg to "Wire Gateway API", + cfg.revenueApiCfg to "Revenue API" + ) + var startServer = false + for ((api, name) in apis) { + if (api != null) { + startServer = true + logger.info("$name is enabled: starting the server") + } + } + if (!startServer) { + logger.info("All APIs are disabled: not starting the server") + throw ProgramResult(1) + } else { + throw ProgramResult(0) + } + } + + val dbCfg = cfg.config.dbConfig() + val serverCfg = cfg.config.loadServerConfig("nexus-httpd") + Database(dbCfg, cfg.currency).use { db -> + serve(serverCfg) { + nexusApi(db, cfg) + } + } + } +} + class FakeIncoming: CliktCommand("Genere a fake incoming payment") { private val common by CommonOption() private val amount by option( @@ -162,15 +202,14 @@ class FakeIncoming: CliktCommand("Genere a fake incoming payment") { ).convert { Payto.parse(it).expectIban() } override fun run() = cliCmd(logger, common.log) { - val cfg = loadConfig(common.config) - val dbCfg = cfg.dbConfig() - val currency = cfg.requireString("nexus-ebics", "currency") + val cfg = loadNexusConfig(common.config) + val dbCfg = cfg.config.dbConfig() val subject = payto.message ?: subject ?: throw Exception("Missing subject") val amount = payto.amount ?: amount ?: throw Exception("Missing amount") - if (amount.currency != currency) - throw Exception("Wrong currency: expected $currency got ${amount.currency}") + if (amount.currency != cfg.currency) + throw Exception("Wrong currency: expected ${cfg.currency} got ${amount.currency}") val bankId = run { val bytes = ByteArray(16) @@ -178,7 +217,7 @@ class FakeIncoming: CliktCommand("Genere a fake incoming payment") { Base32Crockford.encode(bytes) } - Database(dbCfg).use { db -> + Database(dbCfg, amount.currency).use { db -> ingestIncomingPayment(db, IncomingPayment( amount = amount, @@ -186,7 +225,8 @@ class FakeIncoming: CliktCommand("Genere a fake incoming payment") { wireTransferSubject = subject, executionTime = Instant.now(), bankId = bankId - ) + ), + cfg.accountType ) } } @@ -227,7 +267,7 @@ class EbicsDownload: CliktCommand("Perform EBICS requests", name = "ebics-btd") class DryRun: Exception() override fun run() = cliCmd(logger, common.log) { - val cfg = extractEbicsConfig(common.config) + val cfg = loadNexusConfig(common.config) val (clientKeys, bankKeys) = expectFullKeys(cfg) val pinnedStartVal = pinnedStart val pinnedStartArg = if (pinnedStartVal != null) { @@ -282,7 +322,7 @@ class ListCmd: CliktCommand("List nexus transactions", name = "list") { val dbCfg = cfg.dbConfig() val currency = cfg.requireString("nexus-ebics", "currency") - Database(dbCfg).use { db -> + Database(dbCfg, currency).use { db -> fun fmtPayto(payto: String?): String { if (payto == null) return "" try { @@ -405,7 +445,7 @@ class TestingCmd : CliktCommand("Testing helper commands", name = "testing") { class LibeufinNexusCommand : CliktCommand() { init { versionOption(getVersion()) - subcommands(EbicsSetup(), DbInit(), EbicsSubmit(), EbicsFetch(), InitiatePayment(), CliConfigCmd(NEXUS_CONFIG_SOURCE), TestingCmd()) + subcommands(EbicsSetup(), DbInit(), Serve(), EbicsSubmit(), EbicsFetch(), InitiatePayment(), CliConfigCmd(NEXUS_CONFIG_SOURCE), TestingCmd()) } override fun run() = Unit } diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/api/RevenueApi.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/api/RevenueApi.kt @@ -0,0 +1,45 @@ +/* + * This file is part of LibEuFin. + * Copyright (C) 2024 Taler Systems S.A. + + * LibEuFin is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation; either version 3, or + * (at your option) any later version. + + * LibEuFin is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General + * Public License for more details. + + * You should have received a copy of the GNU Affero General Public + * License along with LibEuFin; see the file COPYING. If not, see + * <http://www.gnu.org/licenses/> + */ +package tech.libeufin.nexus.api + +import io.ktor.http.* +import io.ktor.server.application.* +import io.ktor.server.response.* +import io.ktor.server.routing.* +import tech.libeufin.nexus.* +import tech.libeufin.nexus.db.* +import tech.libeufin.common.* + +fun Routing.revenueApi(db: Database, cfg: NexusConfig) = authApi(cfg.revenueApiCfg) { + get("/taler-revenue/config") { + call.respond(RevenueConfig( + currency = cfg.currency + )) + } + get("/taler-revenue/history") { + val params = HistoryParams.extract(context.request.queryParameters) + val items = db.payment.revenueHistory(params) + + if (items.isEmpty()) { + call.respond(HttpStatusCode.NoContent) + } else { + call.respond(RevenueIncomingHistory(items, cfg.payto)) + } + } +} +\ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/api/WireGatewayApi.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/api/WireGatewayApi.kt @@ -29,10 +29,12 @@ import tech.libeufin.common.* import tech.libeufin.nexus.* import tech.libeufin.nexus.db.* import tech.libeufin.nexus.db.PaymentDAO.* +import tech.libeufin.nexus.db.InitiatedDAO.* +import tech.libeufin.nexus.db.ExchangeDAO.* import java.time.Instant -fun Routing.wireGatewayApi(db: Database, cfg: NexusConfig) { +fun Routing.wireGatewayApi(db: Database, cfg: NexusConfig) = authApi(cfg.wireGatewayApiCfg) { get("/taler-wire-gateway/config") { call.respond(WireGatewayConfig( currency = cfg.currency @@ -41,69 +43,52 @@ fun Routing.wireGatewayApi(db: Database, cfg: NexusConfig) { post("/taler-wire-gateway/transfer") { val req = call.receive<TransferRequest>() cfg.checkCurrency(req.amount) - // TODO - /*val res = db.exchange.transfer( - req = req, - login = username, - now = Instant.now() + req.credit_account.expectRequestIban() + val bankId = run { + val bytes = ByteArray(16) + kotlin.random.Random.nextBytes(bytes) + Base32Crockford.encode(bytes) + } + val res = db.exchange.transfer( + req, + bankId, + Instant.now() ) when (res) { - is TransferResult.UnknownExchange -> throw unknownAccount(username) - is TransferResult.NotAnExchange -> throw conflict( - "$username is not an exchange account.", - TalerErrorCode.BANK_ACCOUNT_IS_NOT_EXCHANGE - ) - is TransferResult.UnknownCreditor -> throw unknownCreditorAccount(req.credit_account.canonical) - is TransferResult.BothPartyAreExchange -> throw conflict( - "Wire transfer attempted with credit and debit party being both exchange account", - TalerErrorCode.BANK_ACCOUNT_IS_EXCHANGE - ) - is TransferResult.ReserveUidReuse -> throw conflict( + TransferResult.RequestUidReuse -> throw conflict( "request_uid used already", TalerErrorCode.BANK_TRANSFER_REQUEST_UID_REUSED ) - is TransferResult.BalanceInsufficient -> throw conflict( - "Insufficient balance for exchange", - TalerErrorCode.BANK_UNALLOWED_DEBIT - ) is TransferResult.Success -> call.respond( TransferResponse( timestamp = res.timestamp, row_id = res.id ) ) - }*/ + } } - /*suspend fun <T> PipelineContext<Unit, ApplicationCall>.historyEndpoint( + suspend fun <T> PipelineContext<Unit, ApplicationCall>.historyEndpoint( reduce: (List<T>, String) -> Any, - dbLambda: suspend ExchangeDAO.(HistoryParams, Long, BankPaytoCtx) -> List<T> + dbLambda: suspend ExchangeDAO.(HistoryParams) -> List<T> ) { val params = HistoryParams.extract(context.request.queryParameters) - val bankAccount = call.bankInfo(db, ctx.payto) - - if (!bankAccount.isTalerExchange) - throw conflict( - "$username is not an exchange account.", - TalerErrorCode.BANK_ACCOUNT_IS_NOT_EXCHANGE - ) - - val items = db.exchange.dbLambda(params, bankAccount.bankAccountId, ctx.payto) - + val items = db.exchange.dbLambda(params) if (items.isEmpty()) { call.respond(HttpStatusCode.NoContent) } else { - call.respond(reduce(items, bankAccount.payto)) + call.respond(reduce(items, cfg.payto)) } - }*/ - /*get("/taler-wire-gateway/history/incoming") { + } + get("/taler-wire-gateway/history/incoming") { historyEndpoint(::IncomingHistory, ExchangeDAO::incomingHistory) } get("/taler-wire-gateway/history/outgoing") { historyEndpoint(::OutgoingHistory, ExchangeDAO::outgoingHistory) - }*/ + } post("/taler-wire-gateway/admin/add-incoming") { val req = call.receive<AddIncomingRequest>() cfg.checkCurrency(req.amount) + req.debit_account.expectRequestIban() val timestamp = Instant.now() val bankId = run { val bytes = ByteArray(16) @@ -122,7 +107,6 @@ fun Routing.wireGatewayApi(db: Database, cfg: NexusConfig) { "reserve_pub used already", TalerErrorCode.BANK_DUPLICATE_RESERVE_PUB_SUBJECT ) - // TODO timestamp when idempotent is IncomingRegistrationResult.Success -> call.respond( AddIncomingResponse( timestamp = TalerProtocolTimestamp(timestamp), diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/api/helpers.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/api/helpers.kt @@ -0,0 +1,65 @@ +/* + * This file is part of LibEuFin. + * Copyright (C) 2024 Taler Systems S.A. + + * LibEuFin is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation; either version 3, or + * (at your option) any later version. + + * LibEuFin is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General + * Public License for more details. + + * You should have received a copy of the GNU Affero General Public + * License along with LibEuFin; see the file COPYING. If not, see + * <http://www.gnu.org/licenses/> + */ + +package tech.libeufin.nexus.api + +import tech.libeufin.nexus.* +import tech.libeufin.common.* +import tech.libeufin.common.api.* +import io.ktor.http.* +import io.ktor.server.application.* +import io.ktor.server.response.* +import io.ktor.server.routing.* +import io.ktor.util.* +import io.ktor.util.pipeline.* + +/** Apply api configuration for a route: conditional access and authentication */ +fun Route.authApi(cfg: ApiConfig?, callback: Route.() -> Unit): Route = + intercept(callback) { + if (cfg == null) { + throw apiError(HttpStatusCode.NotImplemented, "API not implemented", TalerErrorCode.END) + } + val header = context.request.headers["Authorization"] + // Basic auth challenge + when (cfg.authMethod) { + AuthMethod.None -> {} + is AuthMethod.Bearer -> { + if (header == null) { + context.response.header(HttpHeaders.WWWAuthenticate, "Bearer") + throw unauthorized( + "Authorization header not found", + TalerErrorCode.GENERIC_PARAMETER_MISSING + ) + } + val (scheme, content) = header.splitOnce(" ") ?: throw badRequest( + "Authorization is invalid", + TalerErrorCode.GENERIC_HTTP_HEADERS_MALFORMED + ) + when (scheme) { + "Bearer" -> { + // TODO choose between one of those + if (content != cfg.authMethod.token) { + throw unauthorized("Unknown token") + } + } + else -> throw unauthorized("Authorization method wrong or not supported") + } + } + } + } +\ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt @@ -18,9 +18,12 @@ */ package tech.libeufin.nexus.db +import kotlinx.coroutines.* +import kotlinx.coroutines.flow.* +import org.slf4j.Logger +import org.slf4j.LoggerFactory import tech.libeufin.common.TalerAmount -import tech.libeufin.common.db.DatabaseConfig -import tech.libeufin.common.db.DbPool +import tech.libeufin.common.db.* import java.time.Instant /** @@ -39,7 +42,39 @@ data class InitiatedPayment( /** * Collects database connection steps and any operation on the Nexus tables. */ -class Database(dbConfig: DatabaseConfig): DbPool(dbConfig, "libeufin_nexus") { +class Database(dbConfig: DatabaseConfig, val bankCurrency: String): DbPool(dbConfig, "libeufin_nexus") { val payment = PaymentDAO(this) val initiated = InitiatedDAO(this) + val exchange = ExchangeDAO(this) + + private val outgoingTxFlows: MutableSharedFlow<Long> = MutableSharedFlow() + private val incomingTxFlows: MutableSharedFlow<Long> = MutableSharedFlow() + private val revenueTxFlows: MutableSharedFlow<Long> = MutableSharedFlow() + + init { + watchNotifications(pgSource, "libeufin_nexus", LoggerFactory.getLogger("libeufin-nexus-db-watcher"), mapOf( + "revenue_tx" to { + val id = it.toLong() + revenueTxFlows.emit(id) + }, + "outgoing_tx" to { + val id = it.toLong() + outgoingTxFlows.emit(id) + }, + "incoming_tx" to { + val id = it.toLong() + incomingTxFlows.emit(id) + } + )) + } + + /** Listen for new taler outgoing transactions */ + suspend fun <R> listenOutgoing(lambda: suspend (Flow<Long>) -> R): R + = lambda(outgoingTxFlows) + /** Listen for new taler incoming transactions */ + suspend fun <R> listenIncoming(lambda: suspend (Flow<Long>) -> R): R + = lambda(incomingTxFlows) + /** Listen for new incoming transactions */ + suspend fun <R> listenRevenue(lambda: suspend (Flow<Long>) -> R): R + = lambda(revenueTxFlows) } \ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/ExchangeDAO.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/ExchangeDAO.kt @@ -0,0 +1,128 @@ +/* + * This file is part of LibEuFin. + * Copyright (C) 2024 Taler Systems S.A. + + * LibEuFin is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation; either version 3, or + * (at your option) any later version. + + * LibEuFin is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General + * Public License for more details. + + * You should have received a copy of the GNU Affero General Public + * License along with LibEuFin; see the file COPYING. If not, see + * <http://www.gnu.org/licenses/> + */ + +package tech.libeufin.nexus.db + +import tech.libeufin.common.db.* +import tech.libeufin.common.* +import java.sql.ResultSet +import java.time.Instant + +/** Data access logic for exchange specific logic */ +class ExchangeDAO(private val db: Database) { + /** Query history of taler incoming transactions */ + suspend fun incomingHistory( + params: HistoryParams + ): List<IncomingReserveTransaction> + = db.poolHistoryGlobal(params, db::listenIncoming, """ + SELECT + incoming_transaction_id + ,execution_time + ,(amount).val AS amount_val + ,(amount).frac AS amount_frac + ,debit_payto_uri + ,reserve_public_key + FROM talerable_incoming_transactions + JOIN incoming_transactions USING(incoming_transaction_id) + WHERE + """, "incoming_transaction_id") { + IncomingReserveTransaction( + row_id = it.getLong("incoming_transaction_id"), + date = it.getTalerTimestamp("execution_time"), + amount = it.getAmount("amount", db.bankCurrency), + debit_account = it.getString("debit_payto_uri"), + reserve_pub = EddsaPublicKey(it.getBytes("reserve_public_key")), + ) + } + + /** Query [exchangeId] history of taler outgoing transactions */ + suspend fun outgoingHistory( + params: HistoryParams + ): List<OutgoingTransaction> + = db.poolHistoryGlobal(params, db::listenOutgoing, """ + SELECT + outgoing_transaction_id + ,execution_time AS execution_time + ,(amount).val AS amount_val + ,(amount).frac AS amount_frac + ,credit_payto_uri AS credit_payto_uri + ,wtid + ,exchange_base_url + FROM talerable_outgoing_transactions + JOIN outgoing_transactions USING(outgoing_transaction_id) + WHERE + """, "outgoing_transaction_id") { + OutgoingTransaction( + row_id = it.getLong("outgoing_transaction_id"), + date = it.getTalerTimestamp("execution_time"), + amount = it.getAmount("amount", db.bankCurrency), + credit_account = it.getString("credit_payto_uri"), + wtid = ShortHashCode(it.getBytes("wtid")), + exchange_base_url = it.getString("exchange_base_url") + ) + } + + /** Result of taler transfer transaction creation */ + sealed interface TransferResult { + /** Transaction [id] and wire transfer [timestamp] */ + data class Success(val id: Long, val timestamp: TalerProtocolTimestamp): TransferResult + data object RequestUidReuse: TransferResult + } + + /** Perform a Taler transfer */ + suspend fun transfer( + req: TransferRequest, + bankId: String, + now: Instant + ): TransferResult = db.serializable { conn -> + val subject = "${req.wtid} ${req.exchange_base_url.url}" + val stmt = conn.prepareStatement(""" + SELECT + out_request_uid_reuse + ,out_tx_row_id + ,out_timestamp + FROM + taler_transfer ( + ?, ?, ?, + (?,?)::taler_amount, + ?, ?, ?, ? + ); + """) + + stmt.setBytes(1, req.request_uid.raw) + stmt.setBytes(2, req.wtid.raw) + stmt.setString(3, subject) + stmt.setLong(4, req.amount.value) + stmt.setInt(5, req.amount.frac) + stmt.setString(6, req.exchange_base_url.url) + stmt.setString(7, req.credit_account.canonical) + stmt.setString(8, bankId) + stmt.setLong(9, now.micros()) + + stmt.one { + when { + it.getBoolean("out_request_uid_reuse") -> TransferResult.RequestUidReuse + else -> TransferResult.Success( + id = it.getLong("out_tx_row_id"), + timestamp = it.getTalerTimestamp("out_timestamp") + ) + } + } + } +} +\ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt @@ -22,6 +22,7 @@ package tech.libeufin.nexus.db import tech.libeufin.common.asInstant import tech.libeufin.common.db.all import tech.libeufin.common.db.executeUpdateViolation +import tech.libeufin.common.db.oneUniqueViolation import tech.libeufin.common.db.getAmount import tech.libeufin.common.db.oneOrNull import tech.libeufin.common.micros @@ -32,9 +33,9 @@ import java.time.Instant class InitiatedDAO(private val db: Database) { /** Outgoing payments initiation result */ - enum class PaymentInitiationResult { - REQUEST_UID_REUSE, - SUCCESS + sealed interface PaymentInitiationResult { + data class Success(val id: Long): PaymentInitiationResult + data object RequestUidReuse: PaymentInitiationResult } /** Register a new pending payment in the database */ @@ -47,16 +48,18 @@ class InitiatedDAO(private val db: Database) { ,initiation_time ,request_uid ) VALUES ((?,?)::taler_amount,?,?,?,?) + RETURNING initiated_outgoing_transaction_id """) + // TODO check payto uri stmt.setLong(1, paymentData.amount.value) stmt.setInt(2, paymentData.amount.frac) stmt.setString(3, paymentData.wireTransferSubject) stmt.setString(4, paymentData.creditPaytoUri.toString()) stmt.setLong(5, paymentData.initiationTime.micros()) stmt.setString(6, paymentData.requestUid) - if (stmt.executeUpdateViolation()) - return@conn PaymentInitiationResult.SUCCESS - return@conn PaymentInitiationResult.REQUEST_UID_REUSE + stmt.oneUniqueViolation(PaymentInitiationResult.RequestUidReuse) { + PaymentInitiationResult.Success(it.getLong("initiated_outgoing_transaction_id")) + } } /** Register EBICS submission success */ diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/PaymentDAO.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/PaymentDAO.kt @@ -35,10 +35,14 @@ class PaymentDAO(private val db: Database) { ) /** Register an outgoing payment reconciling it with its initiated payment counterpart if present */ - suspend fun registerOutgoing(paymentData: OutgoingPayment): OutgoingRegistrationResult = db.conn { + suspend fun registerOutgoing( + paymentData: OutgoingPayment, + wtid: ShortHashCode?, + baseUrl: ExchangeUrl?, + ): OutgoingRegistrationResult = db.conn { val stmt = it.prepareStatement(""" SELECT out_tx_id, out_initiated, out_found - FROM register_outgoing((?,?)::taler_amount,?,?,?,?) + FROM register_outgoing((?,?)::taler_amount,?,?,?,?,?,?) """) val executionTime = paymentData.executionTime.micros() stmt.setLong(1, paymentData.amount.value) @@ -47,6 +51,17 @@ class PaymentDAO(private val db: Database) { stmt.setLong(4, executionTime) stmt.setString(5, paymentData.creditPaytoUri) stmt.setString(6, paymentData.messageId) + if (wtid != null) { + stmt.setBytes(7, wtid.raw) + } else { + stmt.setNull(7, java.sql.Types.NULL) + } + if (baseUrl != null) { + stmt.setString(8, baseUrl.url) + } else { + stmt.setNull(8, java.sql.Types.NULL) + } + stmt.one { OutgoingRegistrationResult( it.getLong("out_tx_id"), @@ -127,6 +142,52 @@ class PaymentDAO(private val db: Database) { } } + /** Register an incoming payment */ + suspend fun registerIncoming( + paymentData: IncomingPayment + ): IncomingRegistrationResult.Success = db.conn { conn -> + val stmt = conn.prepareStatement(""" + SELECT out_found, out_tx_id + FROM register_incoming((?,?)::taler_amount,?,?,?,?) + """) + val executionTime = paymentData.executionTime.micros() + stmt.setLong(1, paymentData.amount.value) + stmt.setInt(2, paymentData.amount.frac) + stmt.setString(3, paymentData.wireTransferSubject) + stmt.setLong(4, executionTime) + stmt.setString(5, paymentData.debitPaytoUri) + stmt.setString(6, paymentData.bankId) + stmt.one { + IncomingRegistrationResult.Success( + it.getLong("out_tx_id"), + !it.getBoolean("out_found") + ) + } + } + + /** Query history of incoming transactions */ + suspend fun revenueHistory( + params: HistoryParams + ): List<RevenueIncomingBankTransaction> + = db.poolHistoryGlobal(params, db::listenRevenue, """ + SELECT + incoming_transaction_id + ,execution_time + ,(amount).val AS amount_val + ,(amount).frac AS amount_frac + ,debit_payto_uri + ,wire_transfer_subject + FROM incoming_transactions WHERE + """, "incoming_transaction_id") { + RevenueIncomingBankTransaction( + row_id = it.getLong("incoming_transaction_id"), + date = it.getTalerTimestamp("execution_time"), + amount = it.getAmount("amount", db.bankCurrency), + debit_account = it.getString("debit_payto_uri"), + subject = it.getString("wire_transfer_subject") + ) + } + /** List incoming transaction metadata for debugging */ suspend fun metadataIncoming(): List<IncomingTxMetadata> = db.conn { conn -> val stmt = conn.prepareStatement(""" diff --git a/nexus/src/test/kotlin/CliTest.kt b/nexus/src/test/kotlin/CliTest.kt @@ -118,4 +118,17 @@ class CliTest { nexusCmd.testErr("ebics-setup -c $conf", "Could not write client private keys at '$clientKeysPath': permission denied on '${clientKeysPath.parent}'") } } + + /** Test server check */ + @Test + fun serveCheck() { + val confs = listOf( + "mini" to 1, + "test" to 0 + ) + for ((conf, statusCode) in confs) { + val result = nexusCmd.test("serve --check -c conf/$conf.conf") + assertEquals(statusCode, result.statusCode) + } + } } \ No newline at end of file diff --git a/nexus/src/test/kotlin/DatabaseTest.kt b/nexus/src/test/kotlin/DatabaseTest.kt @@ -18,10 +18,12 @@ */ import org.junit.Test -import tech.libeufin.common.TalerAmount +import tech.libeufin.common.* import tech.libeufin.nexus.db.InitiatedDAO.PaymentInitiationResult +import tech.libeufin.nexus.* import java.time.Instant import kotlin.test.assertEquals +import kotlin.test.assertIs import kotlin.test.assertFalse import kotlin.test.assertNull import kotlin.test.assertTrue @@ -30,32 +32,43 @@ class OutgoingPaymentsTest { @Test fun register() = setup { db, _ -> // With reconciling - genOutPay("paid by nexus", "first").run { - assertEquals( - PaymentInitiationResult.SUCCESS, - db.initiated.create(genInitPay("waiting for reconciliation", "first")) + genOutPay("paid by nexus").run { + assertIs<PaymentInitiationResult.Success>( + db.initiated.create(genInitPay("waiting for reconciliation", messageId)) ) - db.payment.registerOutgoing(this).run { - assertTrue(new,) + db.payment.registerOutgoing(this, null, null).run { + assertTrue(new) assertTrue(initiated) } - db.payment.registerOutgoing(this).run { + db.payment.registerOutgoing(this, null, null).run { assertFalse(new) assertTrue(initiated) } } // Without reconciling - genOutPay("not paid by nexus", "second").run { - db.payment.registerOutgoing(this).run { + genOutPay("not paid by nexus").run { + db.payment.registerOutgoing(this, null, null).run { assertTrue(new) assertFalse(initiated) } - db.payment.registerOutgoing(this).run { + db.payment.registerOutgoing(this, null, null).run { assertFalse(new) assertFalse(initiated) } } } + + @Test + fun talerable() = setup { db, _ -> + val wtid = ShortHashCode.rand() + val url = "https://exchange.com" + genOutPay("$wtid $url").run { + assertIs<PaymentInitiationResult.Success>( + db.initiated.create(genInitPay("waiting for reconciliation", messageId)) + ) + ingestOutgoingPayment(db, this) + } + } } class IncomingPaymentsTest { @@ -117,8 +130,7 @@ class PaymentInitiationsTest { @Test fun status() = setup { db, _ -> - assertEquals( - PaymentInitiationResult.SUCCESS, + assertIs<PaymentInitiationResult.Success>( db.initiated.create(genInitPay(requestUid = "PAY1")) ) db.initiated.submissionFailure(1, Instant.now(), "First failure") @@ -126,8 +138,7 @@ class PaymentInitiationsTest { db.initiated.submissionSuccess(1, Instant.now(), "ORDER1") assertEquals(Pair("PAY1", null), db.initiated.logFailure("ORDER1")) - assertEquals( - PaymentInitiationResult.SUCCESS, + assertIs<PaymentInitiationResult.Success>( db.initiated.create(genInitPay(requestUid = "PAY2")) ) db.initiated.submissionFailure(2, Instant.now(), "First failure") @@ -135,8 +146,7 @@ class PaymentInitiationsTest { db.initiated.logMessage("ORDER2", "status msg") assertEquals(Pair("PAY2", "status msg"), db.initiated.logFailure("ORDER2")) - assertEquals( - PaymentInitiationResult.SUCCESS, + assertIs<PaymentInitiationResult.Success>( db.initiated.create(genInitPay(requestUid = "PAY3")) ) db.initiated.submissionSuccess(3, Instant.now(), "ORDER3") @@ -146,15 +156,13 @@ class PaymentInitiationsTest { assertNull(db.initiated.logSuccess("ORDER_X")) assertNull(db.initiated.logFailure("ORDER_X")) - assertEquals( - PaymentInitiationResult.SUCCESS, + assertIs<PaymentInitiationResult.Success>( db.initiated.create(genInitPay(requestUid = "PAY4")) ) db.initiated.bankMessage("PAY4", "status progress") db.initiated.bankFailure("PAY4", "status failure") - assertEquals( - PaymentInitiationResult.SUCCESS, + assertIs<PaymentInitiationResult.Success>( db.initiated.create(genInitPay(requestUid = "PAY5")) ) db.initiated.bankMessage("PAY5", "status progress") @@ -164,8 +172,7 @@ class PaymentInitiationsTest { @Test fun submittable() = setup { db, _ -> for (i in 0..5) { - assertEquals( - PaymentInitiationResult.SUCCESS, + assertIs<PaymentInitiationResult.Success>( db.initiated.create(genInitPay(requestUid = "PAY$i")) ) } diff --git a/nexus/src/test/kotlin/Iso20022Test.kt b/nexus/src/test/kotlin/Iso20022Test.kt @@ -55,14 +55,14 @@ class Iso20022Test { amount = TalerAmount("CHF:10"), wireTransferSubject = "G1XTY6HGWGMVRM7E6XQ4JHJK561ETFDFTJZ7JVGV543XZCB27YBG", executionTime = instant("2023-12-19"), - debitPaytoUri = "payto://iban/CH7389144832588726658?receiver-name=Mr+Test" + debitPaytoUri = "payto://iban/CH7389144832588726658?receiver-name=Mr%20Test" ), IncomingPayment( bankId = "62e2b511-7313-4ccd-8d40-c9d8e612cd71", amount = TalerAmount("CHF:2.53"), wireTransferSubject = "G1XTY6HGWGMVRM7E6XQ4JHJK561ETFDFTJZ7JVGV543XZCB27YB", executionTime = instant("2023-12-19"), - debitPaytoUri = "payto://iban/CH7389144832588726658?receiver-name=Mr+Test" + debitPaytoUri = "payto://iban/CH7389144832588726658?receiver-name=Mr%20Test" ) ), txs @@ -101,21 +101,21 @@ class Iso20022Test { amount = TalerAmount("EUR:2"), wireTransferSubject = "TestABC123", executionTime = instant("2024-04-18"), - creditPaytoUri = "payto://iban/DE20500105172419259181?receiver-name=John+Smith" + creditPaytoUri = "payto://iban/DE20500105172419259181?receiver-name=John%20Smith" ), OutgoingPayment( messageId = "YF5QBARGQ0MNY0VK59S477VDG4", amount = TalerAmount("EUR:1.1"), wireTransferSubject = "This should fail because dummy", executionTime = instant("2024-04-18"), - creditPaytoUri = "payto://iban/DE20500105172419259181?receiver-name=John+Smith" + creditPaytoUri = "payto://iban/DE20500105172419259181?receiver-name=John%20Smith" ), IncomingPayment( bankId = "BYLADEM1WOR-G2910276709458A2", amount = TalerAmount("EUR:3"), wireTransferSubject = "Taler FJDQ7W6G7NWX4H9M1MKA12090FRC9K7DA6N0FANDZZFXTR6QHX5G Test.,-", executionTime = instant("2024-04-12"), - debitPaytoUri = "payto://iban/DE84500105177118117964?receiver-name=John+Smith" + debitPaytoUri = "payto://iban/DE84500105177118117964?receiver-name=John%20Smith" ), Reversal( msgId = "G27KNKZAR5DV7HRB085YMA9GB4", @@ -138,21 +138,21 @@ class Iso20022Test { amount = TalerAmount("EUR:2"), wireTransferSubject = "TestABC123", executionTime = instant("2024-04-18"), - creditPaytoUri = "payto://iban/DE20500105172419259181?receiver-name=John+Smith" + creditPaytoUri = "payto://iban/DE20500105172419259181?receiver-name=John%20Smith" ), OutgoingPayment( messageId = "YF5QBARGQ0MNY0VK59S477VDG4", amount = TalerAmount("EUR:1.1"), wireTransferSubject = "This should fail because dummy", executionTime = instant("2024-04-18"), - creditPaytoUri = "payto://iban/DE20500105172419259181?receiver-name=John+Smith" + creditPaytoUri = "payto://iban/DE20500105172419259181?receiver-name=John%20Smith" ), IncomingPayment( bankId = "BYLADEM1WOR-G2910276709458A2", amount = TalerAmount("EUR:3"), wireTransferSubject = "Taler FJDQ7W6G7NWX4H9M1MKA12090FRC9K7DA6N0FANDZZFXTR6QHX5G Test.,-", executionTime = instant("2024-04-12"), - debitPaytoUri = "payto://iban/DE84500105177118117964?receiver-name=John+Smith" + debitPaytoUri = "payto://iban/DE84500105177118117964?receiver-name=John%20Smith" ), Reversal( msgId = "G27KNKZAR5DV7HRB085YMA9GB4", @@ -175,7 +175,7 @@ class Iso20022Test { amount = TalerAmount("EUR:2.5"), wireTransferSubject = "Test ICT", executionTime = instant("2024-05-05"), - debitPaytoUri = "payto://iban/DE84500105177118117964?receiver-name=Mr+Test" + debitPaytoUri = "payto://iban/DE84500105177118117964?receiver-name=Mr%20Test" ) ), txs diff --git a/nexus/src/test/kotlin/RevenueApiTest.kt b/nexus/src/test/kotlin/RevenueApiTest.kt @@ -0,0 +1,65 @@ +/* + * This file is part of LibEuFin. + * Copyright (C) 2024 Taler Systems S.A. + + * LibEuFin is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation; either version 3, or + * (at your option) any later version. + + * LibEuFin is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General + * Public License for more details. + + * You should have received a copy of the GNU Affero General Public + * License along with LibEuFin; see the file COPYING. If not, see + * <http://www.gnu.org/licenses/> + */ + +import io.ktor.http.* +import org.junit.Test +import tech.libeufin.common.* +import tech.libeufin.nexus.* + +class RevenueApiTest { + // GET /taler-revenue/config + @Test + fun config() = serverSetup { + authRoutine(HttpMethod.Get, "/taler-revenue/config") + + client.getA("/taler-revenue/config").assertOk() + } + + // GET /taler-revenue/history + @Test + fun history() = serverSetup { db -> + authRoutine(HttpMethod.Get, "/taler-revenue/history") + + historyRoutine<RevenueIncomingHistory>( + url = "/taler-revenue/history", + ids = { it.incoming_transactions.map { it.row_id } }, + registered = listOf( + { + // Transactions using clean transfer logic + talerableIn(db) + }, + { + // Common credit transactions + ingestIn(db) + } + ), + ignored = listOf( + { + // Ignore debit transactions + talerableOut(db) + } + ) + ) + } + + @Test + fun noApi() = serverSetup("mini.conf") { + client.getA("/taler-revenue/config").assertNotImplemented() + } +} +\ No newline at end of file diff --git a/nexus/src/test/kotlin/WireGatewayApiTest.kt b/nexus/src/test/kotlin/WireGatewayApiTest.kt @@ -24,47 +24,42 @@ import io.ktor.http.* import io.ktor.server.testing.* import org.junit.Test import tech.libeufin.common.* +import tech.libeufin.nexus.* class WireGatewayApiTest { - // GET /accounts/{USERNAME}/taler-wire-gateway/config + // GET /taler-wire-gateway/config @Test fun config() = serverSetup { _ -> - //authRoutine(HttpMethod.Get, "/accounts/merchant/taler-wire-gateway/config") + authRoutine(HttpMethod.Get, "/taler-wire-gateway/config") - client.get("/taler-wire-gateway/config").assertOk() + client.getA("/taler-wire-gateway/config").assertOk() } - // Testing the POST /transfer call from the TWG API. - /*@Test - fun transfer() = bankSetup { _ -> + // POST /taler-wire-gateway/transfer + @Test + fun transfer() = serverSetup { _ -> val valid_req = obj { "request_uid" to HashCode.rand() - "amount" to "KUDOS:55" + "amount" to "CHF:55" "exchange_base_url" to "http://exchange.example.com/" "wtid" to ShortHashCode.rand() - "credit_account" to merchantPayto.canonical + "credit_account" to grothoffPayto } - authRoutine(HttpMethod.Post, "/accounts/merchant/taler-wire-gateway/transfer", valid_req) - - // Checking exchange debt constraint. - client.postA("/accounts/exchange/taler-wire-gateway/transfer") { - json(valid_req) - }.assertConflict(TalerErrorCode.BANK_UNALLOWED_DEBIT) + authRoutine(HttpMethod.Post, "/taler-wire-gateway/transfer") - // Giving debt allowance and checking the OK case. - setMaxDebt("exchange", "KUDOS:1000") - client.postA("/accounts/exchange/taler-wire-gateway/transfer") { + // Check OK + client.postA("/taler-wire-gateway/transfer") { json(valid_req) }.assertOk() // check idempotency - client.postA("/accounts/exchange/taler-wire-gateway/transfer") { + client.postA("/taler-wire-gateway/transfer") { json(valid_req) }.assertOk() // Trigger conflict due to reused request_uid - client.postA("/accounts/exchange/taler-wire-gateway/transfer") { + client.postA("/taler-wire-gateway/transfer") { json(valid_req) { "wtid" to ShortHashCode.rand() "exchange_base_url" to "http://different-exchange.example.com/" @@ -72,132 +67,117 @@ class WireGatewayApiTest { }.assertConflict(TalerErrorCode.BANK_TRANSFER_REQUEST_UID_REUSED) // Currency mismatch - client.postA("/accounts/exchange/taler-wire-gateway/transfer") { + client.postA("/taler-wire-gateway/transfer") { json(valid_req) { "amount" to "EUR:33" } }.assertBadRequest(TalerErrorCode.GENERIC_CURRENCY_MISMATCH) - // Unknown account - client.postA("/accounts/exchange/taler-wire-gateway/transfer") { - json(valid_req) { - "request_uid" to HashCode.rand() - "wtid" to ShortHashCode.rand() - "credit_account" to unknownPayto - } - }.assertConflict(TalerErrorCode.BANK_UNKNOWN_CREDITOR) - - // Same account - client.postA("/accounts/exchange/taler-wire-gateway/transfer") { - json(valid_req) { - "request_uid" to HashCode.rand() - "wtid" to ShortHashCode.rand() - "credit_account" to exchangePayto - } - }.assertConflict(TalerErrorCode.BANK_ACCOUNT_IS_EXCHANGE) - // Bad BASE32 wtid - client.postA("/accounts/exchange/taler-wire-gateway/transfer") { + client.postA("/taler-wire-gateway/transfer") { json(valid_req) { "wtid" to "I love chocolate" } }.assertBadRequest() // Bad BASE32 len wtid - client.postA("/accounts/exchange/taler-wire-gateway/transfer") { + client.postA("/taler-wire-gateway/transfer") { json(valid_req) { - "wtid" to randBase32Crockford(31) + "wtid" to Base32Crockford.encode(ByteArray(31).rand()) } }.assertBadRequest() // Bad BASE32 request_uid - client.postA("/accounts/exchange/taler-wire-gateway/transfer") { + client.postA("/taler-wire-gateway/transfer") { json(valid_req) { "request_uid" to "I love chocolate" } }.assertBadRequest() // Bad BASE32 len wtid - client.postA("/accounts/exchange/taler-wire-gateway/transfer") { + client.postA("/taler-wire-gateway/transfer") { + json(valid_req) { + "request_uid" to Base32Crockford.encode(ByteArray(65).rand()) + } + }.assertBadRequest() + + // Bad payto kind + client.postA("/taler-wire-gateway/transfer") { json(valid_req) { - "request_uid" to randBase32Crockford(65) + "credit_account" to "payto://x-taler-bank/bank.hostname.test/bar" } }.assertBadRequest() - }*/ - /* - /** - * Testing the /history/incoming call from the TWG API. - */ + } + + // GET /taler-wire-gateway/history/incoming @Test - fun historyIncoming() = serverSetup { - // Give Foo reasonable debt allowance: - setMaxDebt("merchant", "KUDOS:1000") - authRoutine(HttpMethod.Get, "/accounts/merchant/taler-wire-gateway/history/incoming") + fun historyIncoming() = serverSetup { db -> + authRoutine(HttpMethod.Get, "/taler-wire-gateway/history/incoming") historyRoutine<IncomingHistory>( - url = "/accounts/exchange/taler-wire-gateway/history/incoming", + url = "/taler-wire-gateway/history/incoming", ids = { it.incoming_transactions.map { it.row_id } }, registered = listOf( { - // Transactions using clean add incoming logic - addIncoming("KUDOS:10") + client.postA("/taler-wire-gateway/admin/add-incoming") { + json { + "amount" to "CHF:12" + "reserve_pub" to EddsaPublicKey.rand() + "debit_account" to grothoffPayto + } + }.assertOk() }, { // Transactions using raw bank transaction logic - tx("merchant", "KUDOS:10", "exchange", "history test with ${ShortHashCode.rand()} reserve pub") - }, - { - // Transaction using withdraw logic - withdrawal("KUDOS:9") + talerableIn(db) } ), ignored = listOf( { // Ignore malformed incoming transaction - tx("merchant", "KUDOS:10", "exchange", "ignored") + ingestIn(db) }, { - // Ignore malformed outgoing transaction - tx("exchange", "KUDOS:10", "merchant", "ignored") + // Ignore outgoing transaction + talerableOut(db) } ) ) } - - /** - * Testing the /history/outgoing call from the TWG API. - */ + // GET /taler-wire-gateway/history/outgoing @Test - fun historyOutgoing() = serverSetup { - setMaxDebt("exchange", "KUDOS:1000000") - authRoutine(HttpMethod.Get, "/accounts/merchant/taler-wire-gateway/history/outgoing") + fun historyOutgoing() = serverSetup { db -> + authRoutine(HttpMethod.Get, "/taler-wire-gateway/history/outgoing") historyRoutine<OutgoingHistory>( - url = "/accounts/exchange/taler-wire-gateway/history/outgoing", + url = "/taler-wire-gateway/history/outgoing", ids = { it.outgoing_transactions.map { it.row_id } }, registered = listOf( - { - // Transactions using clean add incoming logic - transfer("KUDOS:10") + { + talerableOut(db) } ), ignored = listOf( { - // gnore manual incoming transaction - tx("exchange", "KUDOS:10", "merchant", "${ShortHashCode.rand()} http://exchange.example.com/") + // Ignore pending transfers + transfer() + }, + { + // Ignore manual incoming transaction + talerableIn(db) }, { // Ignore malformed incoming transaction - tx("merchant", "KUDOS:10", "exchange", "ignored") + ingestIn(db) }, { // Ignore malformed outgoing transaction - tx("exchange", "KUDOS:10", "merchant", "ignored") + ingestOutgoingPayment(db, genOutPay("ignored")) } ) ) - }*/ + } - // Testing the /admin/add-incoming call from the TWG API. + // POST /taler-wire-gateway/admin/add-incoming @Test fun addIncoming() = serverSetup { _ -> val valid_req = obj { @@ -206,35 +186,47 @@ class WireGatewayApiTest { "debit_account" to grothoffPayto } - //authRoutine(HttpMethod.Post, "/accounts/merchant/taler-wire-gateway/admin/add-incoming", valid_req, requireAdmin = true) + authRoutine(HttpMethod.Post, "/taler-wire-gateway/admin/add-incoming") // Check OK - client.post("/taler-wire-gateway/admin/add-incoming") { + client.postA("/taler-wire-gateway/admin/add-incoming") { json(valid_req) }.assertOk() // Trigger conflict due to reused reserve_pub - client.post("/taler-wire-gateway/admin/add-incoming") { + client.postA("/taler-wire-gateway/admin/add-incoming") { json(valid_req) }.assertConflict(TalerErrorCode.BANK_DUPLICATE_RESERVE_PUB_SUBJECT) // Currency mismatch - client.post("/taler-wire-gateway/admin/add-incoming") { + client.postA("/taler-wire-gateway/admin/add-incoming") { json(valid_req) { "amount" to "EUR:33" } }.assertBadRequest(TalerErrorCode.GENERIC_CURRENCY_MISMATCH) // Bad BASE32 reserve_pub - client.post("/taler-wire-gateway/admin/add-incoming") { + client.postA("/taler-wire-gateway/admin/add-incoming") { json(valid_req) { "reserve_pub" to "I love chocolate" } }.assertBadRequest() // Bad BASE32 len reserve_pub - client.post("/taler-wire-gateway/admin/add-incoming") { + client.postA("/taler-wire-gateway/admin/add-incoming") { json(valid_req) { "reserve_pub" to Base32Crockford.encode(ByteArray(31).rand()) } }.assertBadRequest() + + // Bad payto kind + client.postA("/taler-wire-gateway/admin/add-incoming") { + json(valid_req) { + "debit_account" to "payto://x-taler-bank/bank.hostname.test/bar" + } + }.assertBadRequest() + } + + @Test + fun noApi() = serverSetup("mini.conf") { _ -> + client.get("/taler-wire-gateway/config").assertNotImplemented() } } \ No newline at end of file diff --git a/nexus/src/test/kotlin/helpers.kt b/nexus/src/test/kotlin/helpers.kt @@ -24,10 +24,8 @@ import io.ktor.client.statement.* import io.ktor.http.* import io.ktor.server.testing.* import kotlinx.coroutines.runBlocking -import tech.libeufin.common.TalerAmount -import tech.libeufin.common.db.dbInit -import tech.libeufin.common.db.pgDataSource -import tech.libeufin.common.fromFile +import tech.libeufin.common.* +import tech.libeufin.common.db.* import tech.libeufin.nexus.* import tech.libeufin.nexus.db.Database import tech.libeufin.nexus.db.InitiatedPayment @@ -49,17 +47,17 @@ fun setup( ) = runBlocking { val config = NEXUS_CONFIG_SOURCE.fromFile(Path("conf/$conf")) val dbCfg = config.dbConfig() - val ctx = NexusConfig(config) + val cfg = NexusConfig(config) pgDataSource(dbCfg.dbConnStr).dbInit(dbCfg, "libeufin-nexus", true) - Database(dbCfg).use { - lambda(it, ctx) + Database(dbCfg, cfg.currency).use { + lambda(it, cfg) } } fun serverSetup( conf: String = "test.conf", lambda: suspend ApplicationTestBuilder.(Database) -> Unit -) = setup { db, cfg -> +) = setup(conf) { db, cfg -> testApplication { application { nexusApi(db, cfg) @@ -79,7 +77,7 @@ fun getMockedClient( followRedirects = false engine { addHandler { - request -> handler(request) + request -> handler(request) } } } @@ -98,21 +96,102 @@ fun genInitPay( ) // Generates an incoming payment, given its subject. -fun genInPay(subject: String) = - IncomingPayment( - amount = TalerAmount(44, 0, "KUDOS"), +fun genInPay(subject: String, amount: String = "KUDOS:44"): IncomingPayment { + val bankId = run { + val bytes = ByteArray(16) + kotlin.random.Random.nextBytes(bytes) + Base32Crockford.encode(bytes) + } + return IncomingPayment( + amount = TalerAmount(amount), debitPaytoUri = "payto://iban/not-used", wireTransferSubject = subject, executionTime = Instant.now(), - bankId = "entropic" + bankId = bankId ) +} // Generates an outgoing payment, given its subject and messageId -fun genOutPay(subject: String, messageId: String) = - OutgoingPayment( +fun genOutPay(subject: String, messageId: String? = null): OutgoingPayment { + val id = messageId ?: run { + val bytes = ByteArray(16) + kotlin.random.Random.nextBytes(bytes) + Base32Crockford.encode(bytes) + } + return OutgoingPayment( amount = TalerAmount(44, 0, "KUDOS"), creditPaytoUri = "payto://iban/CH4189144589712575493?receiver-name=Test", wireTransferSubject = subject, executionTime = Instant.now(), - messageId = messageId - ) -\ No newline at end of file + messageId = id + ) +} + +/** Perform a taler outgoing transaction */ +suspend fun ApplicationTestBuilder.transfer() { + client.postA("/taler-wire-gateway/transfer") { + json { + "request_uid" to HashCode.rand() + "amount" to "CHF:55" + "exchange_base_url" to "http://exchange.example.com/" + "wtid" to ShortHashCode.rand() + "credit_account" to grothoffPayto + } + }.assertOk() +} + +/** Ingest a talerable outgoing transaction */ +suspend fun talerableOut(db: Database) { + val wtid = ShortHashCode.rand() + ingestOutgoingPayment(db, genOutPay("$wtid http://exchange.example.com/")) +} + +/** Ingest a talerable incoming transaction */ +suspend fun talerableIn(db: Database) { + val reserve_pub = ShortHashCode.rand() + ingestIncomingPayment(db, genInPay("history test with $reserve_pub reserve pub"), AccountType.exchange) +} + +/** Ingest an incoming transaction */ +suspend fun ingestIn(db: Database) { + ingestIncomingPayment(db, genInPay("ignored"), AccountType.normal) +} + + +/* ----- Auth ----- */ + +/** Auto auth get request */ +suspend inline fun HttpClient.getA(url: String, builder: HttpRequestBuilder.() -> Unit = {}): HttpResponse { + return get(url) { + auth() + builder(this) + } +} + +/** Auto auth post request */ +suspend inline fun HttpClient.postA(url: String, builder: HttpRequestBuilder.() -> Unit = {}): HttpResponse { + return post(url) { + auth() + builder(this) + } +} + +/** Auto auth patch request */ +suspend inline fun HttpClient.patchA(url: String, builder: HttpRequestBuilder.() -> Unit = {}): HttpResponse { + return patch(url) { + auth() + builder(this) + } +} + +/** Auto auth delete request */ +suspend inline fun HttpClient.deleteA(url: String, builder: HttpRequestBuilder.() -> Unit = {}): HttpResponse { + return delete(url) { + auth() + builder(this) + } +} + +fun HttpRequestBuilder.auth() { + headers["Authorization"] = "Bearer secret-token" +} +\ No newline at end of file diff --git a/nexus/src/test/kotlin/routines.kt b/nexus/src/test/kotlin/routines.kt @@ -0,0 +1,73 @@ +/* + * This file is part of LibEuFin. + * Copyright (C) 2024 Taler Systems S.A. + + * LibEuFin is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation; either version 3, or + * (at your option) any later version. + + * LibEuFin is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General + * Public License for more details. + + * You should have received a copy of the GNU Affero General Public + * License along with LibEuFin; see the file COPYING. If not, see + * <http://www.gnu.org/licenses/> + */ + +import io.ktor.client.request.* +import io.ktor.client.statement.* +import io.ktor.http.* +import io.ktor.server.testing.* +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.serialization.json.JsonObject +import tech.libeufin.common.* +import tech.libeufin.common.test.* +import kotlin.test.assertEquals + + +// Test endpoint is correctly authenticated +suspend fun ApplicationTestBuilder.authRoutine( + method: HttpMethod, + path: String +) { + // No header + client.request(path) { + this.method = method + }.assertUnauthorized(TalerErrorCode.GENERIC_PARAMETER_MISSING) + + // Bad header + client.request(path) { + this.method = method + headers["Authorization"] = "WTF" + }.assertBadRequest(TalerErrorCode.GENERIC_HTTP_HEADERS_MALFORMED) + + // Bad token + client.request(path) { + this.method = method + headers["Authorization"] = "Bearer bad-token" + }.assertUnauthorized() + + // GLS deployment + // - testing did work ? + // token - basic bearer + // libeufin-nexus + // - wire gateway try camt.052 files +} + + +suspend inline fun <reified B> ApplicationTestBuilder.historyRoutine( + url: String, + crossinline ids: (B) -> List<Long>, + registered: List<suspend () -> Unit>, + ignored: List<suspend () -> Unit> = listOf(), + polling: Boolean = true +) { + abstractHistoryRoutine(ids, registered, ignored, polling) { params: String -> + client.getA("$url?$params") + } +} +\ No newline at end of file diff --git a/testbench/src/test/kotlin/IntegrationTest.kt b/testbench/src/test/kotlin/IntegrationTest.kt @@ -29,6 +29,7 @@ import kotlinx.coroutines.runBlocking import org.junit.Test import tech.libeufin.bank.* import tech.libeufin.common.* +import tech.libeufin.common.api.engine import tech.libeufin.common.db.one import tech.libeufin.nexus.* import java.time.Instant @@ -68,8 +69,10 @@ fun server(lambda: () -> Unit) { fun setup(conf: String, lambda: suspend (NexusDb) -> Unit) { try { runBlocking { - val cfg = loadConfig(Path(conf)).dbConfig() - NexusDb(cfg).use { + val cfg = loadConfig(Path(conf)) + val dbCfg = cfg.dbConfig() + val currency = cfg.requireString("nexus-ebics", "currency") + NexusDb(dbCfg, currency).use { lambda(it) } } @@ -109,6 +112,11 @@ class IntegrationTest { } bankCmd.run("gc $flags") + + server { + nexusCmd.run("serve $flags") + } + engine?.stop(0, 0) } @Test @@ -127,7 +135,7 @@ class IntegrationTest { } } - setup("conf/integration.conf") { db -> + setup("conf/integration.conf") { db -> val userPayTo = IbanPayto.rand() val fiatPayTo = IbanPayto.rand() @@ -148,14 +156,14 @@ class IntegrationTest { ) assertException("ERROR: cashin failed: missing exchange account") { - ingestIncomingPayment(db, payment) + ingestIncomingPayment(db, payment, AccountType.exchange) } // Create exchange account bankCmd.run("create-account $flags -u exchange -p password --name 'Mr Money' --exchange") assertException("ERROR: cashin currency conversion failed: missing conversion rates") { - ingestIncomingPayment(db, payment) + ingestIncomingPayment(db, payment, AccountType.exchange) } // Start server @@ -191,7 +199,7 @@ class IntegrationTest { checkCount(db, 0, 0, 0) ingestIncomingPayment(db, payment.copy( amount = TalerAmount("EUR:0.01"), - )) + ), AccountType.exchange) checkCount(db, 1, 1, 0) client.get("http://0.0.0.0:8080/accounts/exchange/transactions") { basicAuth("exchange", "password") @@ -205,14 +213,14 @@ class IntegrationTest { executionTime = Instant.now(), bankId = "success" ) - ingestIncomingPayment(db, valid_payment) + ingestIncomingPayment(db, valid_payment, AccountType.exchange) checkCount(db, 2, 1, 1) client.get("http://0.0.0.0:8080/accounts/exchange/transactions") { basicAuth("exchange", "password") }.assertOkJson<BankAccountTransactionsResponse>() // Check idempotency - ingestIncomingPayment(db, valid_payment) + ingestIncomingPayment(db, valid_payment, AccountType.exchange) checkCount(db, 2, 1, 1) // TODO check double insert cashin with different subject }