commit 4ff2df763817b66c998da3bd274efd70585d0a85
parent 268cdff79e8acbd4ba403bfcd2126493444e0275
Author: Antoine A <>
Date: Tue, 18 Mar 2025 12:46:30 +0100
nexus: fetch multiple files versions and improve download logic
Diffstat:
4 files changed, 54 insertions(+), 60 deletions(-)
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsFetch.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsFetch.kt
@@ -336,34 +336,36 @@ private suspend fun fetchEbicsDocuments(
peek: Boolean
): Boolean {
val lastExecutionTime: Instant? = pinnedStart
- return orders.all { order ->
- val doc = order.doc()
+ for ((doc, orders) in orders.groupBy { it.doc() }) {
if (doc == null) {
- logger.debug("Skip unsupported order {}", order)
- true
+ logger.debug("Skip unsupported orders {}", orders)
} else {
- try {
- if (lastExecutionTime == null) {
- logger.info("Fetching new '${doc.fullDescription()}'")
- } else {
- logger.info("Fetching '${doc.fullDescription()}' from timestamp: $lastExecutionTime")
- }
- // downloading the content
- client.download(
- order,
- lastExecutionTime,
- null,
- peek
- ) { payload ->
- registerPayload(client.db, client.cfg, payload, doc)
+ if (lastExecutionTime == null) {
+ logger.info("Fetching new '${doc.fullDescription()}'")
+ } else {
+ logger.info("Fetching '${doc.fullDescription()}' from timestamp: $lastExecutionTime")
+ }
+ for (order in orders) {
+ try {
+ client.download(
+ order,
+ lastExecutionTime,
+ null,
+ peek
+ ) { payload ->
+ registerPayload(client.db, client.cfg, payload, doc)
+ }
+ } catch (e: Exception) {
+ // Ignore no data errors
+ if (e !is EbicsError.Code || e.bankCode != EbicsReturnCode.EBICS_NO_DOWNLOAD_DATA_AVAILABLE) {
+ e.fmtLog(logger)
+ return false
+ }
}
- true
- } catch (e: Exception) {
- e.fmtLog(logger)
- false
}
}
}
+ return true
}
@Serializable
@@ -441,7 +443,6 @@ class EbicsFetch: CliktCommand() {
checkpointToday
}
}
-
}
val now = Instant.now()
@@ -453,7 +454,17 @@ class EbicsFetch: CliktCommand() {
(!transient && now > nextCheckpoint)
) {
logger.info("Running checkpoint")
- success = fetchEbicsDocuments(client, selectedOrder, checkpoint.last_successfull, transient && peek)
+ /// We fetch HKD to only fetch supported EBICS orders and get the document versions
+ val orders = client.download(EbicsOrder.V3.HKD, null, null, false) { stream ->
+ val hkd = EbicsAdministrative.parseHKD(stream)
+ val supportedOrder = hkd.partner.orders.map { it.order }
+ logger.debug {
+ val fmt = supportedOrder.map(EbicsOrder::description).joinToString(", ")
+ "HKD: ${fmt}"
+ }
+ selectedOrder select supportedOrder
+ }
+ success = fetchEbicsDocuments(client, orders, checkpoint.last_successfull, transient && peek)
checkpoint = if (success) {
checkpoint.copy(last_successfull = now, last_trial = now)
} else {
@@ -461,24 +472,16 @@ class EbicsFetch: CliktCommand() {
}
db.kv.set(CHECKPOINT_KEY, checkpoint)
lastFetch = now
- continue
} else if (transient || now > nextFetch) {
logger.info("Running at frequency")
- val orders = if (selectedOrder.size > 1) {
- var pendingOrders = listOf<EbicsOrder>()
- client.download(EbicsOrder.V3.HAA, null, null, false) { stream ->
- val haa = EbicsAdministrative.parseHAA(stream)
- logger.debug {
- val orders = haa.orders.map(EbicsOrder::description).joinToString(", ")
- "HAA: ${orders}"
- }
- pendingOrders = haa.orders
+ /// We fetch HAA to only fetch pending & supported EBICS orders and get the document versions
+ val orders = client.download(EbicsOrder.V3.HAA, null, null, false) { stream ->
+ val haa = EbicsAdministrative.parseHAA(stream)
+ logger.debug {
+ val orders = haa.orders.map(EbicsOrder::description).joinToString(", ")
+ "HAA: ${orders}"
}
- // Only fetch requested and supported orders
- selectedOrder matches pendingOrders
- } else {
- // If there is only one document to fetch, fetching HAA is actually always more expensive
- selectedOrder
+ selectedOrder select haa.orders
}
success = fetchEbicsDocuments(client, orders, if (transient) pinnedStart else null, transient && peek)
lastFetch = now
@@ -496,7 +499,7 @@ class EbicsFetch: CliktCommand() {
}
if (notifications != null) {
// Only fetch requested and supported orders
- val orders = selectedOrder matches notifications
+ val orders = selectedOrder select notifications
if (orders.isNotEmpty()) {
logger.info("Running at real-time notifications reception")
fetchEbicsDocuments(client, notifications, null, false)
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt
@@ -143,13 +143,13 @@ class EbicsClient(
*
* Cancellations and failures are handled.
*/
- suspend fun download(
+ suspend fun <T> download(
order: EbicsOrder,
startDate: Instant?,
endDate: Instant?,
peek: Boolean,
- processing: suspend (InputStream) -> Unit,
- ) {
+ processing: suspend (InputStream) -> T,
+ ): T {
val description = order.description()
logger.debug {
buildString {
@@ -180,13 +180,10 @@ class EbicsClient(
// We need to run the logic in a non-cancelable context because we need to send
// a receipt for each open download transaction, otherwise we'll be stuck in an
// error loop until the pending transaction timeout.
- val init = withContext(NonCancellable) {
+ val (tId, initContent) = withContext(NonCancellable) {
// Init phase
val initReq = impl.downloadInitialization(startDate, endDate)
val initResp = impl.postBTS(client, initReq, "Download init", txLog.step("init"))
- if (initResp.bankCode == EbicsReturnCode.EBICS_NO_DOWNLOAD_DATA_AVAILABLE) {
- return@withContext null
- }
val initContent = initResp.okOrFail("Download init $description")
val tId = requireNotNull(initContent.transactionID) {
"Download init $description: missing transaction ID"
@@ -194,7 +191,6 @@ class EbicsClient(
db.ebics.register(tId)
Pair(tId, initContent)
}
- val (tId, initContent) = if (init == null) return else init
val howManySegments = requireNotNull(initContent.numSegments) {
"Download init $description: missing num segments"
}
@@ -246,7 +242,7 @@ class EbicsClient(
.okOrFail("Download receipt $description")
runCatching { db.ebics.remove(tId) }
// Then throw business logic exception if any
- res.getOrThrow()
+ return res.getOrThrow()
}
/**
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsOrder.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsOrder.kt
@@ -1,6 +1,6 @@
/*
* This file is part of LibEuFin.
- * Copyright (C) 2024 Taler Systems S.A.
+ * Copyright (C) 2024-2025 Taler Systems S.A.
* LibEuFin is free software; you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
@@ -18,8 +18,6 @@
*/
package tech.libeufin.nexus.ebics
-// We will support more dialect in the future
-
sealed class EbicsOrder(val schema: String) {
data class V2_5(
val type: String,
@@ -117,8 +115,8 @@ sealed class EbicsOrder(val schema: String) {
}
}
-infix fun Collection<EbicsOrder>.matches(other: Collection<EbicsOrder>): List<EbicsOrder>
- = this.filter { a -> other.any { b -> a.match(b) } }
+infix fun Collection<EbicsOrder>.select(other: Collection<EbicsOrder>): List<EbicsOrder>
+ = other.filter { order -> this.any { filter -> filter.match(order) } }
enum class OrderDoc {
/// EBICS acknowledgement - CustomerAcknowledgement HAC pain.002
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsWS.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsWS.kt
@@ -1,6 +1,6 @@
/*
* This file is part of LibEuFin.
- * Copyright (C) 2024 Taler Systems S.A.
+ * Copyright (C) 2024-2025 Taler Systems S.A.
* LibEuFin is free software; you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
@@ -96,13 +96,10 @@ sealed interface WssNotification {
}
/** Download EBICS real-time notifications websocket params */
-suspend fun EbicsClient.wssParams(): WssParams {
- lateinit var params: WssParams
+suspend fun EbicsClient.wssParams(): WssParams =
download(EbicsOrder.V3.WSS_PARAMS, null, null, false) { stream ->
- params = Json.decodeFromStream(stream)
+ Json.decodeFromStream(stream)
}
- return params
-}
/** Receive a JSON message from a websocket session */
private suspend inline fun <reified T> DefaultClientWebSocketSession.receiveJson(): T {