libeufin

Integration and sandbox testing for FinTech APIs and data formats
Log | Files | Refs | Submodules | README | LICENSE

commit 9be42f48b9aa05f87e4e53ae3844af79b9dee33e
parent 73dff597d887071895a7a1afacaabfaad9f62025
Author: Antoine A <>
Date:   Thu,  7 Nov 2024 17:46:02 +0100

nexus: fix ebics orders matching, perform checkpoints and add peek flag

Diffstat:
Mcommon/src/main/kotlin/TalerConfig.kt | 16++++++++++++++--
Mcontrib/nexus.conf | 5+++++
Adatabase-versioning/libeufin-nexus-0008.sql | 28++++++++++++++++++++++++++++
Mnexus/src/main/kotlin/tech/libeufin/nexus/Config.kt | 1+
Anexus/src/main/kotlin/tech/libeufin/nexus/Constants.kt | 23+++++++++++++++++++++++
Mnexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsFetch.kt | 170+++++++++++++++++++++++++++++++++++++++++++++++++++----------------------------
Mnexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsSetup.kt | 2+-
Mnexus/src/main/kotlin/tech/libeufin/nexus/cli/Testing.kt | 6+++++-
Mnexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt | 1+
Anexus/src/main/kotlin/tech/libeufin/nexus/db/KvDAO.kt | 78++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mnexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt | 18++++++++++++++++--
Mnexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsOrder.kt | 17+++++++++++++++++
Mnexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsWS.kt | 2+-
Mtestbench/src/main/kotlin/Main.kt | 7+++++--
Mtestbench/src/test/kotlin/MigrationTest.kt | 3+++
15 files changed, 309 insertions(+), 68 deletions(-)

diff --git a/common/src/main/kotlin/TalerConfig.kt b/common/src/main/kotlin/TalerConfig.kt @@ -26,8 +26,8 @@ import java.nio.file.AccessDeniedException import java.nio.file.NoSuchFileException import java.nio.file.NotDirectoryException import java.nio.file.Path -import java.time.Duration -import java.time.format.DateTimeParseException +import java.time.* +import java.time.format.* import java.time.temporal.ChronoUnit import kotlin.io.path.* @@ -512,6 +512,18 @@ class TalerConfigSection internal constructor( } } + /** Access [option] as time */ + fun time(option: String) = option(option, "time") { + try { + LocalTime.parse(it, DateTimeFormatter.ISO_LOCAL_TIME) + } catch (e: DateTimeParseException) { + val indexFmt = if (e.errorIndex != 0) " at index ${e.errorIndex}" else "" + val causeMsg = e.cause?.message + val causeFmt = if (causeMsg != null) ": ${causeMsg}" else "" + throw ValueError("'$it' not a valid time$indexFmt$causeFmt") + } + } + /** Access [option] as JSON object [T] */ inline fun <reified T> json(option: String, type: String) = option(option, type) { try { diff --git a/contrib/nexus.conf b/contrib/nexus.conf @@ -54,8 +54,12 @@ SQL_DIR = $DATADIR/sql/ CONFIG = postgres:///libeufin [nexus-fetch] +# How often should ebics-fetch run when the bank does not support real time notification FREQUENCY = 30m +# At what time of day should ebics-fetch perform a checkpoint +CHECKPOINT_TIME_OF_DAY = 19:00 + # Ignore all transactions prior to a certain date, useful when you want to use an existing account with old transactions that should not be bounced. # IGNORE_TRANSACTIONS_BEFORE = YYYY-MM-DD @@ -63,6 +67,7 @@ FREQUENCY = 30m # IGNORE_BOUNCES_BEFORE = YYYY-MM-DD [nexus-submit] +# How often should ebics-fetch submit pending transactions FREQUENCY = 30m [nexus-httpd] diff --git a/database-versioning/libeufin-nexus-0008.sql b/database-versioning/libeufin-nexus-0008.sql @@ -0,0 +1,28 @@ +-- +-- This file is part of TALER +-- Copyright (C) 2024 Taler Systems SA +-- +-- TALER is free software; you can redistribute it and/or modify it under the +-- terms of the GNU General Public License as published by the Free Software +-- Foundation; either version 3, or (at your option) any later version. +-- +-- TALER is distributed in the hope that it will be useful, but WITHOUT ANY +-- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +-- A PARTICULAR PURPOSE. See the GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License along with +-- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> + +BEGIN; + +SELECT _v.register_patch('libeufin-nexus-0008', NULL, NULL); + +SET search_path TO libeufin_nexus; + +CREATE TABLE kv ( + key TEXT NOT NULL PRIMARY KEY, + value JSONB NOT NULL +); +COMMENT ON TYPE kv + IS 'Store key/value informations that do not fit well in a traditional relational table.'; +COMMIT; diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Config.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Config.kt @@ -43,6 +43,7 @@ class NexusFetchConfig(config: TalerConfig) { private val section = config.section("nexus-fetch") val frequency = section.duration("frequency").require() val frequencyRaw = section.string("frequency").require() + val checkpointTime = section.time("checkpoint_time_of_day").require() val ignoreTransactionsBefore = section.date("ignore_transactions_before").default(Instant.MIN) val ignoreBouncesBefore = section.date("ignore_bounces_before").default(Instant.MIN) } diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Constants.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Constants.kt @@ -0,0 +1,22 @@ +/* + * This file is part of LibEuFin. + * Copyright (C) 2024 Taler Systems S.A. + + * LibEuFin is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation; either version 3, or + * (at your option) any later version. + + * LibEuFin is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General + * Public License for more details. + + * You should have received a copy of the GNU Affero General Public + * License along with LibEuFin; see the file COPYING. If not, see + * <http://www.gnu.org/licenses/> + */ +package tech.libeufin.nexus + +// KV +val CHECKPOINT_KEY = "checkpoint" +\ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsFetch.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsFetch.kt @@ -22,13 +22,14 @@ package tech.libeufin.nexus.cli import com.github.ajalt.clikt.core.CliktCommand import com.github.ajalt.clikt.core.Context import com.github.ajalt.clikt.core.ProgramResult -import com.github.ajalt.clikt.parameters.arguments.argument -import com.github.ajalt.clikt.parameters.arguments.multiple -import com.github.ajalt.clikt.parameters.arguments.unique +import com.github.ajalt.clikt.parameters.arguments.* import com.github.ajalt.clikt.parameters.groups.provideDelegate -import com.github.ajalt.clikt.parameters.options.option +import com.github.ajalt.clikt.parameters.options.* import com.github.ajalt.clikt.parameters.types.enum +import kotlin.math.min import kotlinx.coroutines.* +import kotlinx.serialization.Serializable +import kotlinx.serialization.Contextual import tech.libeufin.common.* import tech.libeufin.nexus.* import tech.libeufin.nexus.db.* @@ -37,7 +38,8 @@ import tech.libeufin.nexus.ebics.* import tech.libeufin.nexus.iso20022.* import java.io.IOException import java.io.InputStream -import java.time.Instant +import java.time.* +import java.time.temporal.* /** Register an outgoing [payment] into [db] */ suspend fun registerOutgoingPayment( @@ -279,6 +281,7 @@ private suspend fun fetchEbicsDocuments( client: EbicsClient, orders: Collection<EbicsOrder>, pinnedStart: Instant?, + peek: Boolean ): Boolean { val lastExecutionTime: Instant? = pinnedStart return orders.all { order -> @@ -297,7 +300,8 @@ private suspend fun fetchEbicsDocuments( client.download( order, lastExecutionTime, - null + null, + peek ) { payload -> registerPayload(client.db, client.cfg, payload, doc) } @@ -310,6 +314,14 @@ private suspend fun fetchEbicsDocuments( } } +@Serializable +data class Checkpoint( + @Contextual + val last_successfull: Instant? = null, + @Contextual + val last_trial: Instant? = null +) + class EbicsFetch: CliktCommand() { override fun help(context: Context) = "Downloads and parse EBICS files from the bank and register them into the database" @@ -322,7 +334,13 @@ class EbicsFetch: CliktCommand() { private val pinnedStart by option( help = "Only supported in --transient mode, this option lets specify the earliest timestamp of the downloaded documents", metavar = "YYYY-MM-DD" - ) + ).convert { dateToInstant(it) } + private val peek by option("--peek", + help = "Only supported in --transient mode, do not consume fetched documents" + ).flag() + private val transientCheckpoint by option("--checkpoint", + help = "Only supported in --transient mode, run a checkpoint" + ).flag() private val ebicsLog by ebicsLogOption() override fun run() = cliCmd(logger, common.log) { @@ -337,65 +355,99 @@ class EbicsFetch: CliktCommand() { bankKeys ) val docs = if (documents.isEmpty()) OrderDoc.entries else documents.toList() - val requestedOrders = docs.map { cfg.ebics.dialect.downloadDoc(it, false) }.toSet() - /** Fetch requested documents only if they have new content */ - suspend fun fetchAvailableDocuments(pinnedStartArg: Instant?): Boolean { - val orders = if (requestedOrders.size > 1) { - // Always add HAC as it is never announced - val pendingOrders = mutableSetOf<EbicsOrder>(EbicsOrder.V3.HAC) - client.download(EbicsOrder.V3.HAA, null, null) { stream -> - val haa = EbicsAdministrative.parseHAA(stream) - logger.debug { - val orders = haa.orders.map(EbicsOrder::description).joinToString(", ") - "HAA: ${orders}" - } - pendingOrders.addAll(haa.orders) - } - // Only fetch requested and supported orders - requestedOrders intersect pendingOrders - } else { - // If there is only one document to fetch, fetching HAA is actually always more expensive - requestedOrders - } - return fetchEbicsDocuments(client, orders, pinnedStartArg) - } + // EBICS order than should be fetched + val selectedOrder = docs.map { cfg.ebics.dialect.downloadDoc(it, false) } - if (transient) { - logger.info("Transient mode: fetching once and returning.") - val pinnedStartVal = pinnedStart - val pinnedStartArg = if (pinnedStartVal != null) { - logger.debug("Pinning start date to: {}", pinnedStartVal) - dateToInstant(pinnedStartVal) - } else null - if (!fetchAvailableDocuments(pinnedStartArg)) { - throw ProgramResult(1) - } + // Try to obtain real-time notification channel if not transient + val wssNotification = if (transient) { + logger.info("Transient mode: fetching once and returning") + null } else { - val wssNotification = listenForNotification(client) + val tmp = listenForNotification(client) logger.info("Running with a frequency of ${cfg.fetch.frequencyRaw}") - var nextFullRun = 0L - while (true) { - val now = System.currentTimeMillis() - if (nextFullRun < now) { - logger.info("Running at frequency") - fetchAvailableDocuments(null) - nextFullRun = now + cfg.fetch.frequency.toMillis() - } - val delay = nextFullRun - now - if (wssNotification == null) { - delay(delay) + tmp + } + + var lastFetch = Instant.MIN + var checkpoint = db.kv.get<Checkpoint>(CHECKPOINT_KEY) ?: Checkpoint() + + while (true) { + var nextFetch = lastFetch + cfg.fetch.frequency + var nextCheckpoint = run { + // We never ran, we must checkpoint now + if (checkpoint.last_trial == null) { + Instant.MIN } else { - val notifications = withTimeoutOrNull(delay) { - wssNotification.receive() + // We run today at checkpointTime + val checkpointDate = OffsetDateTime.now().with(cfg.fetch.checkpointTime) + val checkpointToday = checkpointDate.toInstant() + // If we already ran today we ruAn tomorrow + if (checkpoint.last_trial > checkpointToday) { + checkpointDate.plusDays(1).toInstant() + } else { + checkpointToday } - if (notifications != null) { - // Only fetch requested and supported orders - val orders = requestedOrders intersect notifications - if (orders.isNotEmpty()) { - logger.info("Running at real-time notifications reception") - fetchEbicsDocuments(client, notifications, null) + } + + } + + val now = Instant.now() + var success: Boolean = true + if ( + // Run transient checkpoint at request + (transient && transientCheckpoint) || + // Or run recurent checkpoint + (!transient && now > nextCheckpoint) + ) { + logger.info("Running checkpoint") + success = fetchEbicsDocuments(client, selectedOrder, checkpoint.last_successfull, transient && peek) + checkpoint = if (success) { + checkpoint.copy(last_successfull = now, last_trial = now) + } else { + checkpoint.copy(last_trial = now) + } + db.kv.set(CHECKPOINT_KEY, checkpoint) + lastFetch = now + continue + } else if (transient || now > nextFetch) { + logger.info("Running at frequency") + val orders = if (selectedOrder.size > 1) { + var pendingOrders = listOf<EbicsOrder>() + client.download(EbicsOrder.V3.HAA, null, null, false) { stream -> + val haa = EbicsAdministrative.parseHAA(stream) + logger.debug { + val orders = haa.orders.map(EbicsOrder::description).joinToString(", ") + "HAA: ${orders}" } + pendingOrders = haa.orders + } + // Only fetch requested and supported orders + selectedOrder matches pendingOrders + } else { + // If there is only one document to fetch, fetching HAA is actually always more expensive + selectedOrder + } + success = fetchEbicsDocuments(client, orders, if (transient) pinnedStart else null, transient && peek) + lastFetch = now + } + if (transient) { + throw ProgramResult(if (!success) 1 else 0) + } + + val delay = min(ChronoUnit.MILLIS.between(now, nextFetch), ChronoUnit.MILLIS.between(now, nextCheckpoint)) + if (wssNotification == null) { + delay(delay) + } else { + val notifications = withTimeoutOrNull(delay) { + wssNotification.receive() + } + if (notifications != null) { + // Only fetch requested and supported orders + val orders = selectedOrder matches notifications + if (orders.isNotEmpty()) { + logger.info("Running at real-time notifications reception") + fetchEbicsDocuments(client, notifications, null, false) } } } diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsSetup.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsSetup.kt @@ -220,7 +220,7 @@ class EbicsSetup: CliktCommand() { ebicsLogger, clientKeys, bankKeys - ).download(EbicsOrder.V3.HKD, null, null) { stream -> + ).download(EbicsOrder.V3.HKD, null, null, false) { stream -> val (partner, users) = EbicsAdministrative.parseHKD(stream) val user = users.find { it -> it.id == cfg.ebics.host.ebicsUserId } // Debug logging diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/cli/Testing.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/cli/Testing.kt @@ -148,6 +148,9 @@ class EbicsDownload: CliktCommand("ebics-btd") { " to download (only consumed in --transient mode). The" + " latest document is always until the current time." ) + private val peek by option("--peek", + help = "Do not consume fetched documents" + ).flag() private val dryRun by option().flag() class DryRun: Exception() @@ -172,7 +175,8 @@ class EbicsDownload: CliktCommand("ebics-btd") { client.download( EbicsOrder.V3(type, name, scope, messageName, messageVersion, container, option), pinnedStartArg, - null + null, + peek ) { stream -> if (container == "ZIP") { stream.unzipEach { fileName, xmlContent -> diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt @@ -92,6 +92,7 @@ class Database(dbConfig: DatabaseConfig, val bankCurrency: String): DbPool(dbCon val exchange = ExchangeDAO(this) val ebics = EbicsDAO(this) val list = ListDAO(this) + val kv = KvDAO(this) private val outgoingTxFlows: MutableSharedFlow<Long> = MutableSharedFlow() private val incomingTxFlows: MutableSharedFlow<Long> = MutableSharedFlow() diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/KvDAO.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/KvDAO.kt @@ -0,0 +1,77 @@ +/* + * This file is part of LibEuFin. + * Copyright (C) 2024 Taler Systems S.A. + + * LibEuFin is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation; either version 3, or + * (at your option) any later version. + + * LibEuFin is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General + * Public License for more details. + + * You should have received a copy of the GNU Affero General Public + * License along with LibEuFin; see the file COPYING. If not, see + * <http://www.gnu.org/licenses/> + */ + +package tech.libeufin.nexus.db + +import tech.libeufin.common.* +import tech.libeufin.common.db.* +import tech.libeufin.nexus.iso20022.IncomingPayment +import tech.libeufin.nexus.iso20022.OutgoingPayment +import java.sql.Types +import java.time.Instant +import kotlinx.serialization.Contextual +import kotlinx.serialization.KSerializer +import kotlinx.serialization.Serializable +import kotlinx.serialization.descriptors.PrimitiveKind +import kotlinx.serialization.descriptors.PrimitiveSerialDescriptor +import kotlinx.serialization.descriptors.SerialDescriptor +import kotlinx.serialization.encodeToString +import kotlinx.serialization.encoding.Decoder +import kotlinx.serialization.encoding.Encoder +import kotlinx.serialization.json.Json +import kotlinx.serialization.modules.SerializersModule + +object InstantSerialize : KSerializer<Instant> { + override val descriptor: SerialDescriptor = + PrimitiveSerialDescriptor("Instant", PrimitiveKind.LONG) + override fun serialize(encoder: Encoder, value: Instant) = + encoder.encodeLong(value.micros()) + override fun deserialize(decoder: Decoder): Instant = + decoder.decodeLong().asInstant() +} + +val JSON = Json { + this.serializersModule = SerializersModule { + contextual(Instant::class) { InstantSerialize } + } +} + +/** Data access logic for key value */ +class KvDAO( val db: Database) { + /** Get current value for [key] */ + suspend inline fun <reified T> get(key: String): T? = db.serializable( + "SELECT value FROM kv WHERE key=?" + ) { + setString(1, key) + oneOrNull { + val encoded = it.getString(1) + JSON.decodeFromString<T>(encoded) + } + } + + /** Set [value] for [key] */ + suspend inline fun <reified T> set(key: String, value: T) = db.serializable( + "INSERT INTO kv (key, value) VALUES (?, ?::jsonb) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value" + ) { + val encoded = JSON.encodeToString<T>(value) + setString(1, key) + setString(2, encoded) + execute() + } +} +\ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt @@ -145,10 +145,24 @@ class EbicsClient( order: EbicsOrder, startDate: Instant?, endDate: Instant?, + peek: Boolean, processing: suspend (InputStream) -> Unit, ) { val description = order.description() - logger.debug { "Download order $description" } + logger.debug { + buildString { + append("Download order ") + append(description) + if (startDate != null) { + append(" from ") + append(startDate) + if (endDate != null) { + append(" to ") + append(endDate) + } + } + } + } val txLog = ebicsLogger.tx(order) val impl = EbicsBTS(cfg.ebics, bankKeys, clientKeys, order) @@ -225,7 +239,7 @@ class EbicsClient( } // First send a proper EBICS transaction receipt - val xml = impl.downloadReceipt(tId, res.isSuccess) + val xml = impl.downloadReceipt(tId, res.isSuccess && !peek) impl.postBTS(client, xml, "Download receipt", txLog.step("receipt")) .okOrFail("Download receipt $description") runCatching { db.ebics.remove(tId) } diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsOrder.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsOrder.kt @@ -101,8 +101,25 @@ sealed class EbicsOrder(val schema: String) { } } } + + /** Check if two EBICS order match ignoring the message version */ + fun match(other: EbicsOrder): Boolean = when (this) { + is V2_5 -> other is V2_5 + && type == other.type + && attribute == other.attribute + is V3 -> other is V3 + && type == other.type + && service == other.service + && scope == other.scope + && message == other.message + && container == other.container + && option == other.option + } } +infix fun Collection<EbicsOrder>.matches(other: Collection<EbicsOrder>): List<EbicsOrder> + = this.filter { a -> other.any { b -> a.match(b) } } + enum class OrderDoc { /// EBICS acknowledgement - CustomerAcknowledgement HAC pain.002 acknowledgement, diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsWS.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsWS.kt @@ -98,7 +98,7 @@ sealed interface WssNotification { /** Download EBICS real-time notifications websocket params */ suspend fun EbicsClient.wssParams(): WssParams { lateinit var params: WssParams - download(EbicsOrder.V3.WSS_PARAMS, null, null) { stream -> + download(EbicsOrder.V3.WSS_PARAMS, null, null, false) { stream -> params = Json.decodeFromStream(stream) } return params diff --git a/testbench/src/main/kotlin/Main.kt b/testbench/src/main/kotlin/Main.kt @@ -95,10 +95,11 @@ class Cli : CliktCommand() { LIBEUFIN_NEXUS_HOME = test/$platform [nexus-fetch] - FREQUENCY = 4h + FREQUENCY = 5m + CHECKPOINT_TIME_OF_DAY = 16:52 [nexus-submit] - FREQUENCY = 4h + FREQUENCY = 5m [libeufin-nexusdb-postgres] CONFIG = postgres:///libeufintestbench @@ -163,6 +164,8 @@ class Cli : CliktCommand() { put("recover", "Recover old transactions", "ebics-fetch $ebicsFlags --pinned-start 2024-01-01 $recoverDoc") put("fetch", "Fetch all documents", "ebics-fetch $ebicsFlags") put("fetch-wait", "Fetch all documents", "ebics-fetch $debugFlags") + put("checkpoint", "Run a transient checkpoint", "ebics-fetch $ebicsFlags --checkpoint") + put("peek", "Run a transient peek", "ebics-fetch $ebicsFlags --peek") put("ack", "Fetch CustomerAcknowledgement", "ebics-fetch $ebicsFlags acknowledgement") put("status", "Fetch CustomerPaymentStatusReport", "ebics-fetch $ebicsFlags status") put("report", "Fetch BankToCustomerAccountReport", "ebics-fetch $ebicsFlags report") diff --git a/testbench/src/test/kotlin/MigrationTest.kt b/testbench/src/test/kotlin/MigrationTest.kt @@ -112,5 +112,8 @@ class MigrationTest { // libeufin-nexus-0007 conn.execSQLUpdate(Path("../database-versioning/libeufin-nexus-0007.sql").readText()) + + // libeufin-nexus-0008 + conn.execSQLUpdate(Path("../database-versioning/libeufin-nexus-0008.sql").readText()) } } \ No newline at end of file