commit a8ea7743f089e148fb217aa797c227be6d22708a
parent f27b5856875ffd6215022bbebe6067cb9f7cd9b1
Author: Antoine A <>
Date: Tue, 9 Sep 2025 15:24:55 +0200
nexus: non transient fetch never fail with network error and recover
Diffstat:
4 files changed, 63 insertions(+), 60 deletions(-)
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsFetch.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsFetch.kt
@@ -385,23 +385,15 @@ private suspend fun fetchEbicsDocuments(
) { payload ->
registerPayload(client.db, client.cfg, payload, doc)
}
- } catch (e: Exception) {
- if (e is EbicsError.Code) {
- when (e.bankCode) {
- EbicsReturnCode.EBICS_NO_DOWNLOAD_DATA_AVAILABLE -> continue
- EbicsReturnCode.EBICS_AUTHORISATION_ORDER_IDENTIFIER_FAILED -> {
- e.fmtLog(logger)
- success = false
- continue
- }
- else -> {
- e.fmtLog(logger)
- return false
- }
+ } catch (e: EbicsError.Code) {
+ when (e.bankCode) {
+ EbicsReturnCode.EBICS_NO_DOWNLOAD_DATA_AVAILABLE -> continue
+ EbicsReturnCode.EBICS_AUTHORISATION_ORDER_IDENTIFIER_FAILED -> {
+ e.fmtLog(logger)
+ success = false
+ continue
}
- } else {
- e.fmtLog(logger)
- return false
+ else -> throw e
}
}
}
@@ -493,22 +485,28 @@ class EbicsFetch: EbicsCmd() {
(!transient && now > nextCheckpoint)
) {
logger.info("Running checkpoint")
- /// We fetch HKD to only fetch supported EBICS orders and get the document versions
- val orders = client.download(EbicsOrder.V3.HKD, null, null, false) { stream ->
- val hkd = EbicsAdministrative.parseHKD(stream)
- val supportedOrder = hkd.partner.orders.map { it.order }
- logger.debug {
- val fmt = supportedOrder.map(EbicsOrder::description).joinToString(" ")
- "HKD: ${fmt}"
- }
- selectedOrder select supportedOrder
- }
+
val since = if (transient && pinnedStart != null && (checkpoint.last_successfull == null || pinnedStart!!.isBefore(checkpoint.last_successfull))) {
pinnedStart
} else {
checkpoint.last_successfull
}
- success = fetchEbicsDocuments(client, orders, since, transient && peek)
+ success = try {
+ /// We fetch HKD to only fetch supported EBICS orders and get the document versions
+ val orders = client.download(EbicsOrder.V3.HKD) { stream ->
+ val hkd = EbicsAdministrative.parseHKD(stream)
+ val supportedOrder = hkd.partner.orders.map { it.order }
+ logger.debug {
+ val fmt = supportedOrder.map(EbicsOrder::description).joinToString(" ")
+ "HKD: ${fmt}"
+ }
+ selectedOrder select supportedOrder
+ }
+ fetchEbicsDocuments(client, orders, since, transient && peek)
+ } catch (e: Exception) {
+ e.fmtLog(logger)
+ false
+ }
checkpoint = if (success) {
checkpoint.copy(last_successfull = now, last_trial = now)
} else {
@@ -518,16 +516,21 @@ class EbicsFetch: EbicsCmd() {
lastFetch = now
} else if (transient || now > nextFetch) {
logger.info("Running at frequency")
- /// We fetch HAA to only fetch pending & supported EBICS orders and get the document versions
- val orders = client.download(EbicsOrder.V3.HAA, null, null, false) { stream ->
- val haa = EbicsAdministrative.parseHAA(stream)
- logger.debug {
- val orders = haa.orders.map(EbicsOrder::description).joinToString(" ")
- "HAA: ${orders}"
+ success = try {
+ /// We fetch HAA to only fetch pending & supported EBICS orders and get the document versions
+ val orders = client.download(EbicsOrder.V3.HAA) { stream ->
+ val haa = EbicsAdministrative.parseHAA(stream)
+ logger.debug {
+ val orders = haa.orders.map(EbicsOrder::description).joinToString(" ")
+ "HAA: ${orders}"
+ }
+ selectedOrder select haa.orders
}
- selectedOrder select haa.orders
+ fetchEbicsDocuments(client, orders, if (transient) pinnedStart else null, transient && peek)
+ } catch (e: Exception) {
+ e.fmtLog(logger)
+ false
}
- success = fetchEbicsDocuments(client, orders, if (transient) pinnedStart else null, transient && peek)
lastFetch = now
}
if (transient) {
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsSetup.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsSetup.kt
@@ -253,7 +253,7 @@ class EbicsSetup: TalerCmd() {
ebicsLogger,
clientKeys,
bankKeys
- ).download(EbicsOrder.V3.HKD, null, null, false) { stream ->
+ ).download(EbicsOrder.V3.HKD) { stream ->
val (partner, users) = EbicsAdministrative.parseHKD(stream)
val user = users.find { it -> it.id == cfg.ebics.host.ebicsUserId }
// Debug logging
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt
@@ -147,9 +147,9 @@ class EbicsClient(
*/
suspend fun <T> download(
order: EbicsOrder,
- startDate: Instant?,
- endDate: Instant?,
- peek: Boolean,
+ startDate: Instant? = null,
+ endDate: Instant? = null,
+ peek: Boolean = false,
processing: suspend (InputStream) -> T,
): T {
val description = order.description()
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsWS.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsWS.kt
@@ -97,7 +97,7 @@ sealed interface WssNotification {
/** Download EBICS real-time notifications websocket params */
suspend fun EbicsClient.wssParams(): WssParams =
- download(EbicsOrder.V3.WSS_PARAMS, null, null, false) { stream ->
+ download(EbicsOrder.V3.WSS_PARAMS) { stream ->
Json.decodeFromStream(stream)
}
@@ -145,30 +145,31 @@ suspend fun WssParams.connect(client: HttpClient, lambda: suspend (WssNotificati
}
suspend fun listenForNotification(client: EbicsClient): ReceiveChannel<List<EbicsOrder>>? {
- // Try to get params
- var params = try {
- client.wssParams()
- } catch (e: EbicsError) {
- if (
- // Expected EBICS error
- (e is EbicsError.Code && e.technicalCode == EbicsReturnCode.EBICS_INVALID_ORDER_IDENTIFIER) ||
- // Netzbon HTTP error
- (e is EbicsError.HTTP && e.status == HttpStatusCode.BadRequest)
- ) {
- // Failure is expected if this wss is not supported
- logger.info("Real-time EBICS notifications is not supported")
- return null
- } else {
- throw e
- }
- }
- logger.info("Listening to real-time EBICS notifications")
val channel = Channel<List<EbicsOrder>>()
- val backoff = ExpoBackoffDecorr()
+ val backoff = ExpoBackoffDecorr(
+ 30 * 1000, // 30 seconds
+ 30 * 60 * 1000 // 30 min
+ )
kotlin.concurrent.thread(isDaemon = true) {
runBlocking {
while (true) {
try {
+ // Try to get params
+ val params = try {
+ client.wssParams()
+ } catch (e: EbicsError) {
+ if (
+ // Expected EBICS error
+ (e is EbicsError.Code && e.technicalCode == EbicsReturnCode.EBICS_INVALID_ORDER_IDENTIFIER) ||
+ // Netzbon HTTP error
+ (e is EbicsError.HTTP && e.status == HttpStatusCode.BadRequest)
+ ) {
+ // Failure is expected if this wss is not supported
+ logger.info("Real-time EBICS notifications is not supported")
+ return@runBlocking
+ } else throw e
+ }
+ logger.info("Listening to real-time EBICS notifications")
logger.trace("{}", params)
params.connect(client.client) { msg ->
backoff.reset()
@@ -198,7 +199,6 @@ suspend fun listenForNotification(client: EbicsClient): ReceiveChannel<List<Ebic
e.fmtLog(logger)
delay(backoff.next())
}
- params = client.wssParams()
}
}
}