commit 52742e7145217d0562ffef2bd794b6d9bf4c70ff
parent e618031731a0d16a36bf14c1b25c46acd20a2eba
Author: Antoine A <>
Date: Fri, 5 Dec 2025 16:46:30 +0100
ebisync: add fetch
Diffstat:
8 files changed, 280 insertions(+), 31 deletions(-)
diff --git a/libeufin-ebics/src/main/kotlin/tech/libeufin/ebics/order.kt b/libeufin-ebics/src/main/kotlin/tech/libeufin/ebics/order.kt
@@ -101,6 +101,14 @@ sealed class EbicsOrder(val schema: String) {
}
}
+ /** Check if EBICS order is a downloadable file */
+ fun isDownload(): Boolean = when (this) {
+ is V2_5 -> this.type in setOf("HAC", "Z01", "Z52", "Z53", "Z54")
+ is V3 -> this.type == "HAC" || (
+ this.type=="BTD" && this.message in setOf("pain.002", "camt.052", "camt.053", "camt.054")
+ )
+ }
+
/** Check if two EBICS order match ignoring the message version */
fun match(other: EbicsOrder): Boolean = when (this) {
is V2_5 -> other is V2_5
diff --git a/libeufin-ebisync/src/main/kotlin/tech/libeufin/constants.kt b/libeufin-ebisync/src/main/kotlin/tech/libeufin/constants.kt
@@ -0,0 +1,25 @@
+/*
+ * This file is part of LibEuFin.
+ * Copyright (C) 2025 Taler Systems S.A.
+
+ * LibEuFin is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation; either version 3, or
+ * (at your option) any later version.
+
+ * LibEuFin is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General
+ * Public License for more details.
+
+ * You should have received a copy of the GNU Affero General Public
+ * License along with LibEuFin; see the file COPYING. If not, see
+ * <http://www.gnu.org/licenses/>
+ */
+
+package tech.libeufin.ebisync
+
+// KV
+val CHECKPOINT_KEY = "checkpoint"
+val SUBMIT_TASK_KEY = "submit_task"
+val FETCH_TASK_KEY = "fetch_task"
+\ No newline at end of file
diff --git a/libeufin-ebisync/src/main/kotlin/tech/libeufin/ebisync/Main.kt b/libeufin-ebisync/src/main/kotlin/tech/libeufin/ebisync/Main.kt
@@ -38,16 +38,20 @@ import javax.crypto.spec.SecretKeySpec
import kotlinx.coroutines.runBlocking
import java.util.Base64
import com.github.ajalt.clikt.core.main
+import kotlinx.serialization.Serializable
+import kotlinx.serialization.Contextual
fun main(args: Array<String>) {
setupSecurityProperties()
setupSecurityProperties()
LibeufinEbiSync().main(args)
- /*
- runBlocking {
- //client.createContainer("main")
- //client.listBlobs("main1")
- client.putBlob("main", "file2", "Hello World".toByteArray(Charsets.UTF_8), ContentType.Application.Docx)
- }*/
-}
-\ No newline at end of file
+}
+
+@Serializable
+data class TaskStatus(
+ @Contextual
+ val last_successfull: Instant? = null,
+ @Contextual
+ val last_trial: Instant? = null
+)
+\ No newline at end of file
diff --git a/libeufin-ebisync/src/main/kotlin/tech/libeufin/ebisync/azure.kt b/libeufin-ebisync/src/main/kotlin/tech/libeufin/ebisync/azure.kt
@@ -66,13 +66,8 @@ val AzureSharedKeyAuth = createClientPlugin("AzureSharedKeyAuth", ::AzureStorage
set("x-ms-version", API_VERSION)
}
- for (entry in req.headers.entries()) {
- println("${entry.key} ${entry.value}")
- }
-
// 2. Build the StringToSign
val stringToSign = createStringToSign(req, config.accountName)
- println(stringToSign.replace("\n", "\\n"))
// 3. Calculate the HMAC-SHA256 signature
val signature = run {
diff --git a/libeufin-ebisync/src/main/kotlin/tech/libeufin/ebisync/cli/Fetch.kt b/libeufin-ebisync/src/main/kotlin/tech/libeufin/ebisync/cli/Fetch.kt
@@ -20,24 +20,65 @@
package tech.libeufin.ebisync.cli
import io.ktor.client.HttpClient
+import io.ktor.http.ContentType
import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.core.Context
+import com.github.ajalt.clikt.core.ProgramResult
+import com.github.ajalt.clikt.parameters.arguments.*
import com.github.ajalt.clikt.parameters.groups.provideDelegate
-import kotlinx.coroutines.delay
+import com.github.ajalt.clikt.parameters.options.*
+import com.github.ajalt.clikt.parameters.types.enum
+import kotlin.math.min
+import kotlinx.coroutines.*
import tech.libeufin.common.*
import tech.libeufin.ebics.*
import tech.libeufin.ebisync.*
import tech.libeufin.ebisync.db.Database
-import java.time.Instant
+import java.time.*
+import java.time.temporal.*
+import java.io.IOException
import kotlin.time.toKotlinDuration
-suspend fun submit(cfg: EbisyncFetchConfig, client: EbicsClient, db: Database) {
- val client = AzureBlogStorage(cfg.accountName, cfg.accountKey, cfg.apiUrl, client.client)
+suspend fun submit(cfg: EbisyncFetchConfig, client: EbicsClient, db: Database, orders: List<EbicsOrder>) {
+ val azure = AzureBlogStorage(cfg.accountName, cfg.accountKey, cfg.apiUrl, client.client)
+ for (order in orders) {
+ try {
+ client.download(order) { payload ->
+ val doc = order.doc();
+ if (doc == OrderDoc.acknowledgement) {
+ // TODO HAC
+ } else {
+ payload.unzipEach { fileName, xml ->
+ val bytes = xml.use { it.readBytes() }
+
+ logger.info("upload $fileName")
+ azure.putBlob(cfg.container, fileName, xml.use { it.readBytes() }, ContentType.Application.Xml)
+ }
+ }
+ }
+ } catch (e: EbicsError.Code) {
+ when (e.bankCode) {
+ EbicsReturnCode.EBICS_NO_DOWNLOAD_DATA_AVAILABLE -> continue
+ else -> throw e
+ }
+ }
+ }
}
class Fetch : EbicsCmd() {
override fun help(context: Context) = "Downloads EBICS files from the bank and store them in the configured destination"
+ private val pinnedStart by option(
+ help = "Only supported in --transient mode, this option lets specify the earliest timestamp of the downloaded documents",
+ metavar = "YYYY-MM-DD"
+ ).convert { dateToInstant(it) }
+ private val peek by option("--peek",
+ help = "Only supported in --transient mode, do not consume fetched documents"
+ ).flag()
+ private val transientCheckpoint by option("--checkpoint",
+ help = "Only supported in --transient mode, run a checkpoint"
+ ).flag()
+
override fun run() = cliCmd(logger) {
ebisyncConfig(config).withDb { db, cfg ->
val (clientKeys, bankKeys) = expectFullKeys(cfg)
@@ -49,22 +90,109 @@ class Fetch : EbicsCmd() {
clientKeys,
bankKeys
)
-
- if (transient) {
- logger.info("Transient mode: submitting what found and returning.")
- submit(cfg.fetch, client, db)
+ // Try to obtain real-time notification channel if not transient
+ val wssNotification = if (transient) {
+ logger.info("Transient mode: fetching once and returning")
+ null
} else {
- logger.debug("Running with a frequency of ${cfg.fetch.frequencyRaw}")
- while (true) {
- val now = Instant.now();
- try {
- submit(cfg.fetch, client, db)
+ val tmp = listenForNotification(client)
+ logger.info("Running with a frequency of ${cfg.fetch.frequencyRaw}")
+ tmp
+ }
+
+ var lastFetch = Instant.EPOCH
+
+ while (true) {
+ val checkpoint = db.kv.get<TaskStatus>(CHECKPOINT_KEY) ?: TaskStatus()
+ var nextFetch = lastFetch + cfg.fetch.frequency
+ var nextCheckpoint = run {
+ // We never ran, we must checkpoint now
+ if (checkpoint.last_trial == null) {
+ Instant.EPOCH
+ } else {
+ // We run today at checkpointTime
+ val checkpointDate = OffsetDateTime.now().with(cfg.fetch.checkpointTime)
+ val checkpointToday = checkpointDate.toInstant()
+ // If we already ran today we ruAn tomorrow
+ if (checkpoint.last_trial > checkpointToday) {
+ checkpointDate.plusDays(1).toInstant()
+ } else {
+ checkpointToday
+ }
+ }
+ }
+
+ val now = Instant.now()
+ var success: Boolean = true
+ if (
+ // Run transient checkpoint at request
+ (transient && transientCheckpoint) ||
+ // Or run recurrent checkpoint
+ (!transient && now > nextCheckpoint)
+ ) {
+ logger.info("Running checkpoint")
+
+ val since = if (transient && pinnedStart != null && (checkpoint.last_successfull == null || pinnedStart!!.isBefore(checkpoint.last_successfull))) {
+ pinnedStart
+ } else {
+ checkpoint.last_successfull
+ }
+ success = try {
+ /// We fetch HKD to only fetch supported EBICS orders and get the document versions
+ val orders = client.download(EbicsOrder.V3.HKD) { stream ->
+ val hkd = EbicsAdministrative.parseHKD(stream)
+ val supportedOrder = hkd.partner.orders.map { it.order }
+ logger.debug {
+ val fmt = supportedOrder.map(EbicsOrder::description).joinToString(" ")
+ "HKD: ${fmt}"
+ }
+ supportedOrder.filter { it.isDownload() }
+ }
+ submit(cfg.fetch, client, db, orders)
+ true
+ } catch (e: Exception) {
+ e.fmtLog(logger)
+ false
+ }
+ db.kv.updateTaskStatus(CHECKPOINT_KEY, now, success)
+ lastFetch = now
+ } else if (transient || now > nextFetch) {
+ if (!transient) logger.info("Running at frequency")
+ success = try {
+ /// We fetch HAA to only fetch pending & supported EBICS orders and get the document versions
+ val orders = client.download(EbicsOrder.V3.HAA) { stream ->
+ val haa = EbicsAdministrative.parseHAA(stream)
+ logger.debug {
+ val orders = haa.orders.map(EbicsOrder::description).joinToString(" ")
+ "HAA: ${orders}"
+ }
+ haa.orders
+ }
+ // TODO pinned starts
+ submit(cfg.fetch, client, db, orders)
+ true
} catch (e: Exception) {
e.fmtLog(logger)
+ false
+ }
+ lastFetch = now
+ }
+ db.kv.updateTaskStatus(SUBMIT_TASK_KEY, now, success)
+
+ if (transient) throw ProgramResult(if (!success) 1 else 0)
+
+ val delay = min(ChronoUnit.MILLIS.between(now, nextFetch), ChronoUnit.MILLIS.between(now, nextCheckpoint))
+ if (wssNotification == null) {
+ delay(delay)
+ } else {
+ val notifications = withTimeoutOrNull(delay) {
+ wssNotification.receive()
+ }
+ if (notifications != null) {
+ logger.info("Running at real-time notifications reception")
+ submit(cfg.fetch, client, db, notifications)
}
- // TODO take submit taken time in the delay
- delay(cfg.fetch.frequency.toKotlinDuration())
}
}
}
diff --git a/libeufin-ebisync/src/main/kotlin/tech/libeufin/ebisync/config.kt b/libeufin-ebisync/src/main/kotlin/tech/libeufin/ebisync/config.kt
@@ -70,8 +70,8 @@ class EbisyncFetchConfig(cfg: TalerConfig) {
val checkpointTime = sect.time("checkpoint_time_of_day").require()
val apiUrl = sect.baseURL("azure_api_url").require()
- val accountName = sect.string("account_name").require()
- val accountKey = sect.string("account_key").require()
+ val accountName = sect.string("azure_account_name").require()
+ val accountKey = sect.string("azure_account_key").require()
val container = sect.string("azure_container").require()
}
diff --git a/libeufin-ebisync/src/main/kotlin/tech/libeufin/ebisync/db/Database.kt b/libeufin-ebisync/src/main/kotlin/tech/libeufin/ebisync/db/Database.kt
@@ -16,7 +16,7 @@
* License along with LibEuFin; see the file COPYING. If not, see
* <http://www.gnu.org/licenses/>
*/
-package tech.libeufin.ebics.db
+package tech.libeufin.ebisync.db
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
@@ -33,4 +33,5 @@ import java.time.Instant
/** Collects database connection steps and any operation on the EbiSync tables */
class Database(dbConfig: DatabaseConfig): DbPool(dbConfig, "libeufin_ebisync") {
val ebics = EbicsDAO(this)
+ val kv = KvDAO(this)
}
\ No newline at end of file
diff --git a/libeufin-ebisync/src/main/kotlin/tech/libeufin/ebisync/db/KvDAO.kt b/libeufin-ebisync/src/main/kotlin/tech/libeufin/ebisync/db/KvDAO.kt
@@ -0,0 +1,86 @@
+/*
+ * This file is part of LibEuFin.
+ * Copyright (C) 2024-2025 Taler Systems S.A.
+
+ * LibEuFin is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation; either version 3, or
+ * (at your option) any later version.
+
+ * LibEuFin is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General
+ * Public License for more details.
+
+ * You should have received a copy of the GNU Affero General Public
+ * License along with LibEuFin; see the file COPYING. If not, see
+ * <http://www.gnu.org/licenses/>
+ */
+
+package tech.libeufin.ebisync.db
+
+import tech.libeufin.common.*
+import tech.libeufin.common.db.*
+import java.sql.*
+import java.time.Instant
+import kotlinx.serialization.Contextual
+import kotlinx.serialization.KSerializer
+import kotlinx.serialization.Serializable
+import kotlinx.serialization.descriptors.PrimitiveKind
+import kotlinx.serialization.descriptors.PrimitiveSerialDescriptor
+import kotlinx.serialization.descriptors.SerialDescriptor
+import kotlinx.serialization.encodeToString
+import kotlinx.serialization.encoding.Decoder
+import kotlinx.serialization.encoding.Encoder
+import kotlinx.serialization.json.Json
+import kotlinx.serialization.modules.SerializersModule
+
+object InstantSerialize : KSerializer<Instant> {
+ override val descriptor: SerialDescriptor =
+ PrimitiveSerialDescriptor("Instant", PrimitiveKind.LONG)
+ override fun serialize(encoder: Encoder, value: Instant) =
+ encoder.encodeLong(value.micros())
+ override fun deserialize(decoder: Decoder): Instant =
+ decoder.decodeLong().asInstant()
+}
+
+val JSON = Json {
+ this.serializersModule = SerializersModule {
+ contextual(Instant::class) { InstantSerialize }
+ }
+}
+
+inline fun <reified T> ResultSet.getJson(name: String): T? {
+ val value = this.getString(name)
+ if (value == null) {
+ return value
+ }
+ return JSON.decodeFromString(value)
+}
+
+/** Data access logic for key value */
+class KvDAO(val db: Database) {
+ /** Get current value for [key] */
+ suspend inline fun <reified T> get(key: String): T? = db.serializable(
+ "SELECT value FROM kv WHERE key=?"
+ ) {
+ bind(key)
+ oneOrNull {
+ it.getJson("value")
+ }
+ }
+
+ /** Update a TaskStatus timestamp */
+ suspend fun updateTaskStatus(key: String, timestamp: Instant, success: Boolean) = db.serializable(
+ if (success) {
+ "INSERT INTO kv (key, value) VALUES (?, jsonb_build_object('last_successfull', ?, 'last_trial', ?)) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value"
+ } else {
+ "INSERT INTO kv (key, value) VALUES (?, jsonb_build_object('last_trial', ?)) ON CONFLICT (key) DO UPDATE SET value=jsonb_set(EXCLUDED.value, '{last_trial}'::text[], to_jsonb(?))"
+ }
+ ) {
+ bind(key)
+ bind(timestamp)
+ bind(timestamp)
+ executeUpdate()
+ }
+}
+\ No newline at end of file