commit fa00163abd4a3a2a5e53f3ff298b8d7417857a84
parent 15ccaad99c4629b6226f9c2d9ef4107b4f4c91ce
Author: Antoine A <>
Date: Fri, 20 Sep 2024 18:44:59 +0200
nexus: improve fetch with HAA
fetch HAA to only fetch new documents
only fetch order that we support
improve logging fmt
Diffstat:
8 files changed, 125 insertions(+), 99 deletions(-)
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsFetch.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsFetch.kt
@@ -277,7 +277,7 @@ private suspend fun registerPayload(
*/
private suspend fun fetchEbicsDocuments(
client: EbicsClient,
- orders: List<EbicsOrder>,
+ orders: Collection<EbicsOrder>,
pinnedStart: Instant?,
): Boolean {
val lastExecutionTime: Instant? = pinnedStart
@@ -338,7 +338,7 @@ class EbicsFetch: CliktCommand() {
bankKeys
)
val docs = if (documents.isEmpty()) OrderDoc.entries else documents.toList()
- val orders = docs.map { cfg.dialect.downloadDoc(it, false) }
+ val requestedOrders = docs.map { cfg.dialect.downloadDoc(it, false) }.toSet()
if (transient) {
logger.info("Transient mode: fetching once and returning.")
val pinnedStartVal = pinnedStart
@@ -346,7 +346,7 @@ class EbicsFetch: CliktCommand() {
logger.debug("Pinning start date to: {}", pinnedStartVal)
dateToInstant(pinnedStartVal)
} else null
- if (!fetchEbicsDocuments(client, orders, pinnedStartArg)) {
+ if (!fetchEbicsDocuments(client, requestedOrders, pinnedStartArg)) {
throw ProgramResult(1)
}
} else {
@@ -356,20 +356,36 @@ class EbicsFetch: CliktCommand() {
while (true) {
val now = System.currentTimeMillis()
if (nextFullRun < now) {
+ logger.info("Running at frequency")
+ // Always add HAC as it is never announced
+ val pendingOrders = mutableSetOf<EbicsOrder>(EbicsOrder.V3.HAC)
+ client.download(EbicsOrder.V3.HAA, null, null) { stream ->
+ val haa = EbicsAdministrative.parseHAA(stream)
+ logger.debug {
+ val orders = haa.orders.map(EbicsOrder::description).joinToString(", ")
+ "HAA: ${orders}"
+ }
+ pendingOrders.addAll(haa.orders)
+ }
+ // Only fetch requested and supported orders
+ val orders = requestedOrders intersect pendingOrders
fetchEbicsDocuments(client, orders, null)
nextFullRun = now + cfg.fetch.frequency.toMillis()
}
val delay = nextFullRun - now
if (wssNotification == null) {
- logger.info("Running at frequency")
delay(delay)
} else {
val notifications = withTimeoutOrNull(delay) {
wssNotification.receive()
}
if (notifications != null) {
- logger.info("Running at real-time notifications reception")
- fetchEbicsDocuments(client, notifications, null)
+ // Only fetch requested and supported orders
+ val orders = requestedOrders intersect notifications
+ if (orders.isNotEmpty()) {
+ logger.info("Running at real-time notifications reception")
+ fetchEbicsDocuments(client, notifications, null)
+ }
}
}
}
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsSetup.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsSetup.kt
@@ -221,7 +221,7 @@ class EbicsSetup: CliktCommand() {
ebicsLogger,
clientKeys,
bankKeys
- ).download(EbicsOrder.V3("HKD"), null, null) { stream ->
+ ).download(EbicsOrder.V3.HKD, null, null) { stream ->
val (partner, users) = EbicsAdministrative.parseHKD(stream)
val user = users.find { it -> it.id == cfg.ebicsUserId }
// Debug logging
@@ -244,7 +244,7 @@ class EbicsSetup: CliktCommand() {
append("Supported orders:\n")
for ((order, description) in partner.orders) {
append("- ")
- append(order.fullDescription())
+ append(order.description())
append(": ")
append(description)
append('\n')
@@ -253,7 +253,7 @@ class EbicsSetup: CliktCommand() {
append("Authorized orders:\n")
for ((order) in partner.orders) {
append("- ")
- append(order.fullDescription())
+ append(order.description())
append('\n')
}
}
@@ -277,14 +277,14 @@ class EbicsSetup: CliktCommand() {
// Check partner support required orders
val unsupportedOrder = requireOrders subtract partner.orders.map { it.order }
if (unsupportedOrder.isNotEmpty()) {
- logger.warn("Unsupported orders: {}", unsupportedOrder.map(EbicsOrder::fullDescription).joinToString(", "))
+ logger.warn("Unsupported orders: {}", unsupportedOrder.map(EbicsOrder::description).joinToString(", "))
}
// Check user is authorized for required orders
if (user != null) {
val unauthorizedOrders = requireOrders subtract user.permissions subtract unsupportedOrder
if (unauthorizedOrders.isNotEmpty()) {
- logger.warn("Unauthorized orders: {}", unauthorizedOrders.map(EbicsOrder::fullDescription).joinToString(", "))
+ logger.warn("Unauthorized orders: {}", unauthorizedOrders.map(EbicsOrder::description).joinToString(", "))
}
logger.info("Subscriber status: {}", user.status.description)
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsAdministrative.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsAdministrative.kt
@@ -29,7 +29,7 @@ data class VersionNumber(val number: Float, val schema: String) {
override fun toString(): String = "$number:$schema"
}
-data class HKD (
+data class HKD(
val partner: PartnerInfo,
val users: List<UserInfo>
)
@@ -38,11 +38,11 @@ data class PartnerInfo(
val accounts: List<AccountInfo>,
val orders: List<OrderInfo>
)
-data class OrderInfo (
+data class OrderInfo(
val order: EbicsOrder,
val description: String,
)
-data class AccountInfo (
+data class AccountInfo(
val currency: String,
val iban: String,
)
@@ -52,6 +52,10 @@ data class UserInfo(
val permissions: List<EbicsOrder>,
)
+data class HAA(
+ val orders: List<EbicsOrder>
+)
+
enum class UserStatus(val description: String) {
Ready("Subscriber is permitted access"),
New("Subscriber is established, pending access permission"),
@@ -87,20 +91,23 @@ object EbicsAdministrative {
}
}
+ private fun XmlDestructor.ebicsOrder(type: String): EbicsOrder =
+ EbicsOrder.V3(
+ type = type,
+ service = opt("ServiceName")?.text(),
+ scope = opt("Scope")?.text(),
+ option = opt("ServiceOption")?.text(),
+ container = opt("Container")?.attr("containerType"),
+ message = opt("MsgName")?.text(),
+ version = opt("MsgName")?.optAttr("version"),
+ )
+
fun parseHKD(stream: InputStream): HKD {
fun XmlDestructor.order(): EbicsOrder {
- var order = EbicsOrder.V3(one("AdminOrderType").text())
- opt("Service") {
- order = order.copy(
- service = opt("ServiceName")?.text(),
- scope = opt("Scope")?.text(),
- option = opt("ServiceOption")?.text(),
- container = opt("Container")?.attr("containerType"),
- message = opt("MsgName")?.text(),
- version = opt("MsgName")?.optAttr("version"),
- )
- }
- return order
+ val type = one("AdminOrderType").text()
+ return opt("Service") {
+ ebicsOrder(type)
+ } ?: EbicsOrder.V3(type)
}
return XmlDestructor.fromStream(stream, "HKDResponseOrderData") {
val partnerInfo = one("PartnerInfo") {
@@ -146,4 +153,13 @@ object EbicsAdministrative {
HKD(partnerInfo, usersInfo)
}
}
+
+ fun parseHAA(stream: InputStream): HAA {
+ return XmlDestructor.fromStream(stream, "HAAResponseOrderData") {
+ val orders = map("Service") {
+ ebicsOrder("BTD")
+ }
+ HAA(orders)
+ }
+ }
}
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt
@@ -102,12 +102,12 @@ suspend fun EbicsBTS.postBTS(
doc,
bankKeys.bank_authentication_public_key
)) {
- throw EbicsError.Protocol("$phase: bank signature did not verify")
+ throw EbicsError.Protocol("$phase ${order.description()}: bank signature did not verify")
}
val response = try {
EbicsBTS.parseResponse(doc)
} catch (e: Exception) {
- throw EbicsError.Protocol("$phase: invalid ebics response", e)
+ throw EbicsError.Protocol("$phase ${order.description()}: invalid ebics response", e)
}
logger.debug {
buildString {
@@ -148,7 +148,8 @@ class EbicsClient(
endDate: Instant?,
processing: suspend (InputStream) -> Unit,
) {
- logger.debug { "Download order ${order.description()}" }
+ val description = order.description()
+ logger.debug { "Download order $description" }
val txLog = ebicsLogger.tx(order)
val impl = EbicsBTS(cfg, bankKeys, clientKeys, order)
@@ -171,22 +172,22 @@ class EbicsClient(
if (initResp.bankCode == EbicsReturnCode.EBICS_NO_DOWNLOAD_DATA_AVAILABLE) {
return@withContext null
}
- val initContent = initResp.okOrFail("Download init")
+ val initContent = initResp.okOrFail("Download init $description")
val tId = requireNotNull(initContent.transactionID) {
- "Download init: missing transaction ID"
+ "Download init $description: missing transaction ID"
}
db.ebics.register(tId)
Pair(tId, initContent)
}
val (tId, initContent) = if (init == null) return else init
val howManySegments = requireNotNull(initContent.numSegments) {
- "Download init: missing num segments"
+ "Download init $description: missing num segments"
}
val firstSegment = requireNotNull(initContent.segment) {
- "Download init: missing OrderData"
+ "Download init $description: missing OrderData"
}
val dataEncryptionInfo = requireNotNull(initContent.dataEncryptionInfo) {
- "Download init: missing EncryptionInfo"
+ "Download init $description: missing EncryptionInfo"
}
// Transfer phase
@@ -194,13 +195,14 @@ class EbicsClient(
for (x in 2 .. howManySegments) {
val transReq = impl.downloadTransfer(x, howManySegments, tId)
val transResp = impl.postBTS(client, transReq, "Download transfer", txLog.step("transfer$x"))
- .okOrFail("Download transfer")
+ .okOrFail("Download transfer $description")
val segment = requireNotNull(transResp.segment) {
"Download transfer: missing encrypted segment"
}
segments.add(segment)
}
+
// Decompress encrypted chunks
val payloadStream = try {
decryptAndDecompressPayload(
@@ -226,7 +228,7 @@ class EbicsClient(
// First send a proper EBICS transaction receipt
val xml = impl.downloadReceipt(tId, res.isSuccess)
impl.postBTS(client, xml, "Download receipt", txLog.step("receipt"))
- .okOrFail("Download receipt")
+ .okOrFail("Download receipt $description")
runCatching { db.ebics.remove(tId) }
// Then throw business logic exception if any
res.getOrThrow()
@@ -243,7 +245,8 @@ class EbicsClient(
order: EbicsOrder,
payload: ByteArray,
): String {
- logger.debug { "Upload order ${order.description()}" }
+ val description = order.description();
+ logger.debug { "Upload order $description" }
val txLog = ebicsLogger.tx(order)
val impl = EbicsBTS(cfg, bankKeys, clientKeys, order)
val preparedPayload = prepareUploadPayload(cfg, clientKeys, bankKeys, payload)
@@ -251,12 +254,12 @@ class EbicsClient(
// Init phase
val initXml = impl.uploadInitialization(preparedPayload)
val initResp = impl.postBTS(client, initXml, "Upload init", txLog.step("init"))
- .okOrFail("Upload init")
+ .okOrFail("Upload init $description")
val tId = requireNotNull(initResp.transactionID) {
- "Upload init: missing transaction ID"
+ "Upload init $description: missing transaction ID"
}
val orderId = requireNotNull(initResp.orderID) {
- "Upload init: missing order ID"
+ "Upload init $description: missing order ID"
}
txLog.payload(payload, "xml")
@@ -265,7 +268,7 @@ class EbicsClient(
for (i in 1..preparedPayload.segments.size) {
val transferXml = impl.uploadTransfer(tId, preparedPayload, i)
impl.postBTS(client, transferXml, "Upload transfer", txLog.step("transfer$i"))
- .okOrFail("Upload transfer")
+ .okOrFail("Upload transfer $description")
}
return orderId
}
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsOrder.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsOrder.kt
@@ -33,18 +33,20 @@ sealed class EbicsOrder(val schema: String) {
val version: String? = null,
val container: String? = null,
val option: String? = null
- ): EbicsOrder("H005")
-
- companion object {
- val WSS_PARAMS = V3(
- type = "BTD",
- service = "OTH",
- scope = "DE",
- message = "wssparam"
- )
+ ): EbicsOrder("H005") {
+ companion object {
+ val WSS_PARAMS = V3(
+ type = "BTD",
+ service = "OTH",
+ scope = "DE",
+ message = "wssparam"
+ )
+ val HAC = V3(type = "HAC")
+ val HKD = V3(type = "HKD")
+ val HAA = V3(type = "HAA")
+ }
}
- /** Minimal text description for file logging */
fun description(): String = buildString {
when (this@EbicsOrder) {
is V2_5 -> {
@@ -54,37 +56,17 @@ sealed class EbicsOrder(val schema: String) {
}
is V3 -> {
append(type)
- for (part in sequenceOf(service, option, message)) {
- if (part != null) {
- append('-')
- append(part)
- }
- }
- }
- }
- }
-
- /** Full text description */
- fun fullDescription(): String = buildString {
- when (this@EbicsOrder) {
- is V2_5 -> {
- append(type)
- append(' ')
- append(attribute)
- }
- is V3 -> {
- append(type)
for (part in sequenceOf(service, scope, option, container)) {
if (part != null) {
- append(' ')
+ append('-')
append(part)
}
}
if (message != null) {
- append(' ')
+ append('-')
append(message)
if (version != null) {
- append('v')
+ append('.')
append(version)
}
}
@@ -167,7 +149,7 @@ enum class Dialect {
}
} else {
when (doc) {
- OrderDoc.acknowledgement -> EbicsOrder.V3("HAC")
+ OrderDoc.acknowledgement -> EbicsOrder.V3.HAC
OrderDoc.status -> EbicsOrder.V3("BTD", "PSR", "CH", "pain.002", "10", "ZIP")
OrderDoc.report -> EbicsOrder.V3("BTD", "STM", "CH", "camt.052", "08", "ZIP")
OrderDoc.statement -> EbicsOrder.V3("BTD", "EOP", "CH", "camt.053", "08", "ZIP")
@@ -177,7 +159,7 @@ enum class Dialect {
}
// TODO for GLS we might have to fetch the same kind of files from multiple orders
gls -> when (doc) {
- OrderDoc.acknowledgement -> EbicsOrder.V3("HAC")
+ OrderDoc.acknowledgement -> EbicsOrder.V3.HAC
OrderDoc.status -> EbicsOrder.V3("BTD", "REP", "DE", "pain.002", null, "ZIP", "SCT")
OrderDoc.report -> EbicsOrder.V3("BTD", "STM", "DE", "camt.052", null, "ZIP")
OrderDoc.statement -> EbicsOrder.V3("BTD", "EOP", "DE", "camt.053", null, "ZIP")
@@ -193,6 +175,11 @@ enum class Dialect {
}
}
- fun orders(): Set<EbicsOrder> =
- (sequenceOf(EbicsOrder.V3("HAA")) + OrderDoc.entries.map { downloadDoc(it, false) }).toSet()
+ /** All orders required for a dialect implementation to work */
+ fun orders(): Set<EbicsOrder> = (
+ // Administrative orders
+ sequenceOf(EbicsOrder.V3.HAA, EbicsOrder.V3.HKD)
+ // and documents orders
+ + OrderDoc.entries.map { downloadDoc(it, false) }
+ ).toSet()
}
\ No newline at end of file
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsWS.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsWS.kt
@@ -98,14 +98,14 @@ sealed interface WssNotification {
/** Download EBICS real-time notifications websocket params */
suspend fun EbicsClient.wssParams(): WssParams {
lateinit var params: WssParams
- download(EbicsOrder.WSS_PARAMS, null, null) { stream ->
+ download(EbicsOrder.V3.WSS_PARAMS, null, null) { stream ->
params = Json.decodeFromStream(stream)
}
return params
}
/** Receive a JSON message from a websocket session */
-suspend inline fun <reified T> DefaultClientWebSocketSession.receiveJson(): T {
+private suspend inline fun <reified T> DefaultClientWebSocketSession.receiveJson(): T {
val frame = incoming.receive()
val content = frame.readBytes()
val msg = Json.decodeFromStream(kotlinx.serialization.serializer<T>(), content.inputStream())
@@ -142,11 +142,6 @@ suspend fun WssParams.connect(client: HttpClient, lambda: suspend (WssNotificati
// TODO use receiveDeserialized from ktor when it works
val msg = receiveJson<WssNotification>()
logger.trace("received: {}", msg)
- if (msg is WssGeneralInfo) {
- for (info in msg.INFO) {
- logger.info("info: {}", info.FREE)
- }
- }
lambda(msg)
}
}
@@ -181,19 +176,26 @@ suspend fun listenForNotification(client: EbicsClient): ReceiveChannel<List<Ebic
logger.trace("{}", params)
params.connect(client.client) { msg ->
backoff.reset()
- if (msg is WssNewData) {
- val orders = msg.BTF.map {
- EbicsOrder.V3(
- type = "BTD",
- service = it.SERVICE,
- scope = it.SCOPE,
- message = it.MSGNAME,
- version = it.VERSION,
- container = it.CONTTYPE,
- option = it.OPTION
- )
+ when (msg) {
+ is WssGeneralInfo -> {
+ for (info in msg.INFO) {
+ logger.info("info: {}", info.FREE)
+ }
+ }
+ is WssNewData -> {
+ val orders = msg.BTF.map {
+ EbicsOrder.V3(
+ type = "BTD",
+ service = it.SERVICE,
+ scope = it.SCOPE,
+ message = it.MSGNAME,
+ version = it.VERSION,
+ container = it.CONTTYPE,
+ option = it.OPTION
+ )
+ }
+ channel.send(orders)
}
- channel.send(orders)
}
}
} catch (e: Exception) {
diff --git a/testbench/src/main/kotlin/Main.kt b/testbench/src/main/kotlin/Main.kt
@@ -95,10 +95,10 @@ class Cli : CliktCommand() {
LIBEUFIN_NEXUS_HOME = test/$platform
[nexus-fetch]
- FREQUENCY = 5d
+ FREQUENCY = 1min
[nexus-submit]
- FREQUENCY = 5d
+ FREQUENCY = 1min
[libeufin-nexusdb-postgres]
CONFIG = postgres:///libeufintestbench
diff --git a/testbench/src/test/kotlin/Iso20022Test.kt b/testbench/src/test/kotlin/Iso20022Test.kt
@@ -113,6 +113,8 @@ class Iso20022Test {
// Skip
} else if (name.contains("HAC")) {
parseCustomerAck(content)
+ } else if (name.contains("HAA")) {
+ EbicsAdministrative.parseHAA(content)
} else if (name.contains("HKD")) {
EbicsAdministrative.parseHKD(content)
} else if (name.contains("pain.002")) {