libeufin

Integration and sandbox testing for FinTech APIs and data formats
Log | Files | Refs | Submodules | README | LICENSE

commit f052c1b50bb405fc99507ab478cc93547e9bd1f6
parent 4b91ab9d9fc69f3446eb46438d48bf49a650eaec
Author: Antoine A <>
Date:   Tue, 30 Jul 2024 16:53:10 +0200

nexus: support real-time EBICS notifications over websocket

Diffstat:
Mcontrib/bank.conf | 6+++---
Mnexus/src/main/kotlin/tech/libeufin/nexus/EbicsLogger.kt | 2+-
Mnexus/src/main/kotlin/tech/libeufin/nexus/XmlCombinators.kt | 14+++++++-------
Mnexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsFetch.kt | 155+++++++++++++++++++++++++++++++++++--------------------------------------------
Mnexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsSubmit.kt | 25+++++++++++--------------
Mnexus/src/main/kotlin/tech/libeufin/nexus/cli/Testing.kt | 25+++++++------------------
Mnexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsBTS.kt | 8++++----
Mnexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt | 26+++++++++++++++-----------
Mnexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsOrder.kt | 99++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----------------
Mnexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsWS.kt | 67++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
Mtestbench/src/main/kotlin/Main.kt | 4++--
11 files changed, 260 insertions(+), 171 deletions(-)

diff --git a/contrib/bank.conf b/contrib/bank.conf @@ -57,10 +57,10 @@ WIRE_TYPE = # Path to TAN challenge transmission script via email. If not specified, this TAN channel will not be supported. # TAN_EMAIL = libeufin-tan-email.sh -# Environment variables for the sms TAN script as a single line JSON object +# Environment variables for the sms TAN script as a single-line JSON object # TAN_SMS_ENV = { "AUTH_TOKEN": "secret-token" } -# Environment variables for the email TAN script as a single line JSON object +# Environment variables for the email TAN script as a single-line JSON object # TAN_EMAIL_ENV = { "AUTH_TOKEN": "secret-token" } # How "libeufin-bank serve" serves its API, this can either be tcp or unix @@ -87,7 +87,7 @@ SPA = $DATADIR/spa/ # Password hash algorithm, this can only be bcrypt PWD_HASH_ALGORITHM = bcrypt -# Password hash algorithm configuration as a single line JSON object +# Password hash algorithm configuration as a single-line JSON object # When PWD_HASH_ALGORITHM = bcrypt you can configure cost PWD_HASH_CONFIG = { "cost": 8 } diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsLogger.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsLogger.kt @@ -68,7 +68,7 @@ class EbicsLogger(private val dir: Path?) { } is EbicsOrder.V3 -> { append(order.type) - for (part in sequenceOf(order.name, order.messageName, order.option)) { + for (part in sequenceOf(order.service, order.message, order.option)) { if (part != null) { append('-') append(part) diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/XmlCombinators.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/XmlCombinators.kt @@ -151,25 +151,25 @@ class XmlDestructor internal constructor(private val el: Element) { }.toList() } - fun one(path: String): XmlDestructor { - val children = el.childrenByTag(path).iterator() + fun one(tag: String): XmlDestructor { + val children = el.childrenByTag(tag).iterator() if (!children.hasNext()) { - throw DestructionError("expected unique '${el.tagName}.$path', got none") + throw DestructionError("expected unique '${el.tagName}.$tag', got none") } val el = children.next() if (children.hasNext()) { - throw DestructionError("expected unique '${el.tagName}.$path', got ${children.asSequence().count() + 1}") + throw DestructionError("expected unique '${el.tagName}.$tag', got ${children.asSequence().count() + 1}") } return XmlDestructor(el) } - fun opt(path: String): XmlDestructor? { - val children = el.childrenByTag(path).iterator() + fun opt(tag: String): XmlDestructor? { + val children = el.childrenByTag(tag).iterator() if (!children.hasNext()) { return null } val el = children.next() if (children.hasNext()) { - throw DestructionError("expected optional '${el.tagName}.$path', got ${children.asSequence().count() + 1}") + throw DestructionError("expected optional '${el.tagName}.$tag', got ${children.asSequence().count() + 1}") } return XmlDestructor(el) } diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsFetch.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsFetch.kt @@ -26,13 +26,12 @@ import com.github.ajalt.clikt.parameters.arguments.unique import com.github.ajalt.clikt.parameters.groups.provideDelegate import com.github.ajalt.clikt.parameters.options.option import com.github.ajalt.clikt.parameters.types.enum -import kotlinx.coroutines.delay +import kotlinx.coroutines.* import tech.libeufin.common.* import tech.libeufin.nexus.* import tech.libeufin.nexus.db.Database import tech.libeufin.nexus.db.PaymentDAO.IncomingRegistrationResult -import tech.libeufin.nexus.ebics.EbicsClient -import tech.libeufin.nexus.ebics.SupportedDocument +import tech.libeufin.nexus.ebics.* import java.io.IOException import java.io.InputStream import java.time.Duration @@ -112,17 +111,17 @@ suspend fun ingestIncomingPayment( ) } -/** Ingest an EBICS [payload] of [document] into [db] */ +/** Ingest an EBICS [payload] of [doc] into [db] */ private suspend fun ingestPayload( db: Database, cfg: NexusEbicsConfig, payload: InputStream, - document: SupportedDocument + doc: OrderDoc ) { /** Ingest a single EBICS [xml] [document] into [db] */ suspend fun ingest(xml: InputStream) { - when (document) { - SupportedDocument.CAMT_052, SupportedDocument.CAMT_053, SupportedDocument.CAMT_054 -> { + when (doc) { + OrderDoc.report, OrderDoc.statement, OrderDoc.notification -> { try { parseTx(xml, cfg.currency, cfg.dialect).forEach { if (cfg.fetch.ignoreBefore != null && it.executionTime < cfg.fetch.ignoreBefore) { @@ -142,7 +141,7 @@ private suspend fun ingestPayload( throw Exception("Ingesting notifications failed", e) } } - SupportedDocument.PAIN_002_LOGS -> { + OrderDoc.acknowledgement -> { val acks = parseCustomerAck(xml) for (ack in acks) { when (ack.actionType) { @@ -167,7 +166,7 @@ private suspend fun ingestPayload( } } } - SupportedDocument.PAIN_002 -> { + OrderDoc.status -> { val status = parseCustomerPaymentStatusReport(xml) val msg = status.msg() logger.debug("{}", status) @@ -182,11 +181,11 @@ private suspend fun ingestPayload( } // Unzip payload if necessary - when (document) { - SupportedDocument.PAIN_002, - SupportedDocument.CAMT_052, - SupportedDocument.CAMT_053, - SupportedDocument.CAMT_054 -> { + when (doc) { + OrderDoc.status, + OrderDoc.report, + OrderDoc.statement, + OrderDoc.notification -> { try { payload.unzipEach { fileName, xml -> logger.trace("parse $fileName") @@ -196,12 +195,12 @@ private suspend fun ingestPayload( throw Exception("Could not open any ZIP archive", e) } } - SupportedDocument.PAIN_002_LOGS -> ingest(payload) + OrderDoc.acknowledgement -> ingest(payload) } } /** - * Fetch and ingest banking records of type [docs] using EBICS [client] starting from [pinnedStart] + * Fetch and ingest banking records from [orders] using EBICS [client] starting from [pinnedStart] * * If [pinnedStart] is null fetch new records. * @@ -209,80 +208,46 @@ private suspend fun ingestPayload( */ private suspend fun fetchEbicsDocuments( client: EbicsClient, - docs: List<EbicsDocument>, + orders: List<EbicsOrder>, pinnedStart: Instant?, ): Boolean { val lastExecutionTime: Instant? = pinnedStart - return docs.all { doc -> - try { - if (lastExecutionTime == null) { - logger.info("Fetching new '${doc.fullDescription()}'") - } else { - logger.info("Fetching '${doc.fullDescription()}' from timestamp: $lastExecutionTime") - } - // downloading the content - val doc = doc.doc() - val order = client.cfg.dialect.downloadDoc(doc, false) - client.download( - order, - lastExecutionTime, - null - ) { payload -> - ingestPayload(client.db, client.cfg, payload, doc) - } + return orders.all { order -> + val doc = order.doc() + if (doc == null) { + logger.debug("Skip unsupported order $order") true - } catch (e: Exception) { - e.fmtLog(logger) - false + } else { + try { + if (lastExecutionTime == null) { + logger.info("Fetching new '${doc.fullDescription()}'") + } else { + logger.info("Fetching '${doc.fullDescription()}' from timestamp: $lastExecutionTime") + } + // downloading the content + client.download( + order, + lastExecutionTime, + null + ) { payload -> + ingestPayload(client.db, client.cfg, payload, doc) + } + true + } catch (e: Exception) { + e.fmtLog(logger) + false + } } } } -enum class EbicsDocument { - /// EBICS acknowledgement - CustomerAcknowledgement HAC pain.002 - acknowledgement, - /// Payment status - CustomerPaymentStatusReport pain.002 - status, - /// Account intraday reports - BankToCustomerAccountReport camt.052 - report, - /// Account statements - BankToCustomerStatement camt.053 - statement, - /// Debit & credit notifications - BankToCustomerDebitCreditNotification camt.054 - notification, - ; - - fun shortDescription(): String = when (this) { - acknowledgement -> "EBICS acknowledgement" - status -> "Payment status" - report -> "Account intraday reports" - statement -> "Account statements" - notification -> "Debit & credit notifications" - } - - fun fullDescription(): String = when (this) { - acknowledgement -> "EBICS acknowledgement - CustomerAcknowledgement HAC pain.002" - status -> "Payment status - CustomerPaymentStatusReport pain.002" - report -> "Account intraday reports - BankToCustomerAccountReport camt.052" - statement -> "Account statements - BankToCustomerStatement camt.053" - notification -> "Debit & credit notifications - BankToCustomerDebitCreditNotification camt.054" - } - - fun doc(): SupportedDocument = when (this) { - acknowledgement -> SupportedDocument.PAIN_002_LOGS - status -> SupportedDocument.PAIN_002 - report -> SupportedDocument.CAMT_052 - statement -> SupportedDocument.CAMT_053 - notification -> SupportedDocument.CAMT_054 - } -} - class EbicsFetch: CliktCommand("Downloads and parse EBICS files from the bank and ingest them into the database") { private val common by CommonOption() private val transient by transientOption() - private val documents: Set<EbicsDocument> by argument( + private val documents: Set<OrderDoc> by argument( help = "Which documents should be fetched? If none are specified, all supported documents will be fetched", - helpTags = EbicsDocument.entries.associate { Pair(it.name, it.shortDescription()) }, - ).enum<EbicsDocument>().multiple().unique() + helpTags = OrderDoc.entries.associate { Pair(it.name, it.shortDescription()) }, + ).enum<OrderDoc>().multiple().unique() private val pinnedStart by option( help = "Only supported in --transient mode, this option lets specify the earliest timestamp of the downloaded documents", metavar = "YYYY-MM-DD" @@ -301,7 +266,8 @@ class EbicsFetch: CliktCommand("Downloads and parse EBICS files from the bank an clientKeys, bankKeys ) - val docs = if (documents.isEmpty()) EbicsDocument.entries else documents.toList() + val docs = if (documents.isEmpty()) OrderDoc.entries else documents.toList() + val orders = docs.map { cfg.dialect.downloadDoc(it, false) } if (transient) { logger.info("Transient mode: fetching once and returning.") val pinnedStartVal = pinnedStart @@ -309,18 +275,33 @@ class EbicsFetch: CliktCommand("Downloads and parse EBICS files from the bank an logger.debug("Pinning start date to: $pinnedStartVal") dateToInstant(pinnedStartVal) } else null - if (!fetchEbicsDocuments(client, docs, pinnedStartArg)) { + if (!fetchEbicsDocuments(client, orders, pinnedStartArg)) { throw Exception("Failed to fetch documents") } } else { - logger.debug("Running with a frequency of ${cfg.fetch.frequencyRaw}") - if (cfg.fetch.frequency == Duration.ZERO) { - logger.warn("Long-polling not implemented, running therefore in transient mode") + val wssNotification = listenForNotification(client) + logger.info("Running with a frequency of ${cfg.fetch.frequencyRaw}") + var nextFullRun = 0L + while (true) { + val now = System.currentTimeMillis() + if (nextFullRun < now) { + fetchEbicsDocuments(client, orders, null) + nextFullRun = now + cfg.fetch.frequency.toMillis() + } + val delay = nextFullRun - now + if (wssNotification == null) { + logger.info("Runnning at frequency") + delay(delay) + } else { + val notifications = withTimeoutOrNull(delay) { + wssNotification.receive() + } + if (notifications != null) { + logger.info("Running at real-time notifications reception") + fetchEbicsDocuments(client, notifications, null) + } + } } - do { - fetchEbicsDocuments(client, docs, null) - delay(cfg.fetch.frequency.toKotlinDuration()) - } while (cfg.fetch.frequency != Duration.ZERO) } } } diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsSubmit.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsSubmit.kt @@ -105,25 +105,22 @@ class EbicsSubmit : CliktCommand("Submits pending initiated payments found in th clientKeys, bankKeys ) - val frequency: Duration = if (transient) { + + if (transient) { logger.info("Transient mode: submitting what found and returning.") - Duration.ZERO + submitBatch(client) } else { logger.debug("Running with a frequency of ${cfg.submit.frequencyRaw}") - if (cfg.submit.frequency == Duration.ZERO) { - logger.warn("Long-polling not implemented, running therefore in transient mode") + while (true) { + try { + submitBatch(client) + } catch (e: Exception) { + throw Exception("Failed to submit payments", e) + } + // TODO take submitBatch taken time in the delay + delay(cfg.submit.frequency.toKotlinDuration()) } - cfg.submit.frequency } - do { - try { - submitBatch(client) - } catch (e: Exception) { - throw Exception("Failed to submit payments", e) - } - // TODO take submitBatch taken time in the delay - delay(frequency.toKotlinDuration()) - } while (frequency != Duration.ZERO) } } } \ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/cli/Testing.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/cli/Testing.kt @@ -32,10 +32,7 @@ import com.github.ajalt.clikt.parameters.types.enum import kotlinx.coroutines.delay import tech.libeufin.common.* import tech.libeufin.nexus.* -import tech.libeufin.nexus.ebics.EbicsClient -import tech.libeufin.nexus.ebics.EbicsOrder -import tech.libeufin.nexus.ebics.connect -import tech.libeufin.nexus.ebics.wssParams +import tech.libeufin.nexus.ebics.* import java.time.Instant class Wss: CliktCommand("Listen to EBICS instant notification over websocket") { @@ -43,7 +40,6 @@ class Wss: CliktCommand("Listen to EBICS instant notification over websocket") { private val ebicsLog by ebicsLogOption() override fun run() = cliCmd(logger, common.log) { - val backoff = ExpoBackoffDecorr() nexusConfig(common.config).withDb { db, nexusCgf -> val cfg = nexusCgf.ebics val (clientKeys, bankKeys) = expectFullKeys(cfg) @@ -56,17 +52,11 @@ class Wss: CliktCommand("Listen to EBICS instant notification over websocket") { clientKeys, bankKeys ) - while (true) { - try { - logger.info("Fetch WSS params") - val params = client.wssParams() - logger.debug("{}", params) - logger.info("Start listening") - params.connect(httpClient) { - backoff.reset() - } - } catch (e: Exception) { - delay(backoff.next()) + val wssNotifications = listenForNotification(client) + if (wssNotifications != null) { + while (true) { + val notifications = wssNotifications.receive() + logger.debug("{}", wssNotifications) } } } @@ -122,8 +112,7 @@ class TxCheck: CliktCommand("Check transaction semantic") { val nexusCgf = nexusConfig(common.config) val cfg = nexusCgf.ebics val (clientKeys, bankKeys) = expectFullKeys(cfg) - val doc = EbicsDocument.acknowledgement.doc() - val order = cfg.dialect.downloadDoc(doc, false) + val order = cfg.dialect.downloadDoc(OrderDoc.acknowledgement, false) val client = httpClient() val result = tech.libeufin.nexus.test.txCheck(client, cfg, clientKeys, bankKeys, order, cfg.dialect.directDebit()) println("$result") diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsBTS.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsBTS.kt @@ -268,7 +268,7 @@ class EbicsBTS( private fun XmlBuilder.service(order: EbicsOrder.V3) { el("Service") { - el("ServiceName", order.name!!) + el("ServiceName", order.service!!) if (order.scope != null) { el("Scope", order.scope) } @@ -281,9 +281,9 @@ class EbicsBTS( } } el("MsgName") { - if (order.messageVersion != null) - attr("version", order.messageVersion) - text(order.messageName!!) + if (order.version != null) + attr("version", order.version) + text(order.message!!) } } } diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt @@ -53,10 +53,14 @@ enum class SupportedDocument { /** EBICS related errors */ sealed class EbicsError(msg: String, cause: Throwable? = null): Exception(msg, cause) { - /** Http and network errors */ - class Transport(msg: String, cause: Throwable? = null): EbicsError(msg, cause) + /** Network errors */ + class Network(msg: String, cause: Throwable): EbicsError(msg, cause) + /** Http errors */ + class HTTP(msg: String, val status: HttpStatusCode): EbicsError(msg) /** EBICS protocol & XML format error */ class Protocol(msg: String, cause: Throwable? = null): EbicsError(msg, cause) + /** EBICS protocol & XML format error */ + class Code(msg: String, val technicalCode: EbicsReturnCode, val bankCode: EbicsReturnCode): EbicsError(msg) } /** POST an EBICS request [msg] to [bankUrl] returning a parsed XML response */ @@ -72,11 +76,11 @@ suspend fun HttpClient.postToBank( setBody(msg) } } catch (e: Exception) { - throw EbicsError.Transport("$phase: failed to contact bank", e) + throw EbicsError.Network("$phase: failed to contact bank", e) } if (res.status != HttpStatusCode.OK) { - throw EbicsError.Transport("$phase: bank HTTP error: ${res.status}") + throw EbicsError.HTTP("$phase: bank HTTP error: ${res.status}", res.status) } try { val bodyStream = res.bodyAsChannel().toInputStream(); @@ -85,7 +89,7 @@ suspend fun HttpClient.postToBank( } catch (e: SAXException) { throw EbicsError.Protocol("$phase: invalid XML bank response", e) } catch (e: Exception) { - throw EbicsError.Transport("$phase: failed read bank response", e) + throw EbicsError.Network("$phase: failed read bank response", e) } } @@ -380,12 +384,12 @@ class EbicsResponse<T>( /** Checks that return codes are both EBICS_OK or throw an exception */ fun okOrFail(phase: String): T { - require(technicalCode.kind() != EbicsReturnCode.Kind.Error) { - "$phase has technical error: $technicalCode" - } - require(bankCode.kind() != EbicsReturnCode.Kind.Error) { - "$phase has bank error: $bankCode" + if (technicalCode.kind() == EbicsReturnCode.Kind.Error) { + throw EbicsError.Code("$phase has technical error: $technicalCode", technicalCode, bankCode) + } else if (bankCode.kind() == EbicsReturnCode.Kind.Error) { + throw EbicsError.Code("$phase has bank error: $bankCode", technicalCode, bankCode) + } else { + return content } - return content } } \ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsOrder.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsOrder.kt @@ -27,10 +27,10 @@ sealed class EbicsOrder(val schema: String) { ): EbicsOrder("H004") data class V3( val type: String, - val name: String? = null, + val service: String? = null, val scope: String? = null, - val messageName: String? = null, - val messageVersion: String? = null, + val message: String? = null, + val version: String? = null, val container: String? = null, val option: String? = null ): EbicsOrder("H005") @@ -38,46 +38,103 @@ sealed class EbicsOrder(val schema: String) { companion object { val WSS_PARAMS = EbicsOrder.V3( type = "BTD", - name = "OTH", + service = "OTH", scope = "DE", - messageName = "wssparam" + message = "wssparam" ) } + + fun doc(): OrderDoc? { + return when (this) { + is EbicsOrder.V2_5 -> { + when (this.type) { + "HAC" -> OrderDoc.acknowledgement + "Z01" -> OrderDoc.status + "Z52" -> OrderDoc.report + "Z53" -> OrderDoc.statement + "Z54" -> OrderDoc.notification + else -> null + } + } + is EbicsOrder.V3 -> { + when (this.type) { + "HAC" -> OrderDoc.acknowledgement + "BTD" -> when (this.message) { + "pain.002" -> OrderDoc.status + "camt.052" -> OrderDoc.report + "camt.053" -> OrderDoc.statement + "camt.054" -> OrderDoc.notification + else -> null + } + else -> null + } + } + } + } +} + +enum class OrderDoc { + /// EBICS acknowledgement - CustomerAcknowledgement HAC pain.002 + acknowledgement, + /// Payment status - CustomerPaymentStatusReport pain.002 + status, + /// Account intraday reports - BankToCustomerAccountReport camt.052 + report, + /// Account statements - BankToCustomerStatement camt.053 + statement, + /// Debit & credit notifications - BankToCustomerDebitCreditNotification camt.054 + notification; + + fun shortDescription(): String = when (this) { + acknowledgement -> "EBICS acknowledgement" + status -> "Payment status" + report -> "Account intraday reports" + statement -> "Account statements" + notification -> "Debit & credit notifications" + } + + fun fullDescription(): String = when (this) { + acknowledgement -> "EBICS acknowledgement - CustomerAcknowledgement HAC pain.002" + status -> "Payment status - CustomerPaymentStatusReport pain.002" + report -> "Account intraday reports - BankToCustomerAccountReport camt.052" + statement -> "Account statements - BankToCustomerStatement camt.053" + notification -> "Debit & credit notifications - BankToCustomerDebitCreditNotification camt.054" + } } enum class Dialect { postfinance, gls; - fun downloadDoc(doc: SupportedDocument, ebics2: Boolean): EbicsOrder { + fun downloadDoc(doc: OrderDoc, ebics2: Boolean): EbicsOrder { return when (this) { postfinance -> { // TODO test platform need EBICS2 for HAC, should we use a separate dialect ? if (ebics2) { when (doc) { - SupportedDocument.PAIN_002 -> EbicsOrder.V2_5("Z01", "DZHNN") - SupportedDocument.CAMT_052 -> EbicsOrder.V2_5("Z52", "DZHNN") - SupportedDocument.CAMT_053 -> EbicsOrder.V2_5("Z53", "DZHNN") - SupportedDocument.CAMT_054 -> EbicsOrder.V2_5("Z54", "DZHNN") - SupportedDocument.PAIN_002_LOGS -> EbicsOrder.V2_5("HAC", "DZHNN") + OrderDoc.acknowledgement -> EbicsOrder.V2_5("HAC", "DZHNN") + OrderDoc.status -> EbicsOrder.V2_5("Z01", "DZHNN") + OrderDoc.report -> EbicsOrder.V2_5("Z52", "DZHNN") + OrderDoc.statement -> EbicsOrder.V2_5("Z53", "DZHNN") + OrderDoc.notification -> EbicsOrder.V2_5("Z54", "DZHNN") } } else { when (doc) { - SupportedDocument.PAIN_002 -> EbicsOrder.V3("BTD", "PSR", "CH", "pain.002", "10", "ZIP") - SupportedDocument.CAMT_052 -> EbicsOrder.V3("BTD", "STM", "CH", "camt.052", "08", "ZIP") - SupportedDocument.CAMT_053 -> EbicsOrder.V3("BTD", "EOP", "CH", "camt.053", "08", "ZIP") - SupportedDocument.CAMT_054 -> EbicsOrder.V3("BTD", "REP", "CH", "camt.054", "08", "ZIP") - SupportedDocument.PAIN_002_LOGS -> EbicsOrder.V3("HAC") + OrderDoc.acknowledgement -> EbicsOrder.V3("HAC") + OrderDoc.status -> EbicsOrder.V3("BTD", "PSR", "CH", "pain.002", "10", "ZIP") + OrderDoc.report -> EbicsOrder.V3("BTD", "STM", "CH", "camt.052", "08", "ZIP") + OrderDoc.statement -> EbicsOrder.V3("BTD", "EOP", "CH", "camt.053", "08", "ZIP") + OrderDoc.notification -> EbicsOrder.V3("BTD", "REP", "CH", "camt.054", "08", "ZIP") } } } // TODO for GLS we might have to fetch the same kind of files from multiple orders gls -> when (doc) { - SupportedDocument.PAIN_002 -> EbicsOrder.V3("BTD", "REP", "DE", "pain.002", null, "ZIP", "SCT") - SupportedDocument.CAMT_052 -> EbicsOrder.V3("BTD", "STM", "DE", "camt.052", null, "ZIP") - SupportedDocument.CAMT_053 -> EbicsOrder.V3("BTD", "EOP", "DE", "camt.053", null, "ZIP") - SupportedDocument.CAMT_054 -> EbicsOrder.V3("BTD", "STM", "DE", "camt.054", null, "ZIP", "SCI") - SupportedDocument.PAIN_002_LOGS -> EbicsOrder.V3("HAC") + OrderDoc.acknowledgement -> EbicsOrder.V3("HAC") + OrderDoc.status -> EbicsOrder.V3("BTD", "REP", "DE", "pain.002", null, "ZIP", "SCT") + OrderDoc.report -> EbicsOrder.V3("BTD", "STM", "DE", "camt.052", null, "ZIP") + OrderDoc.statement -> EbicsOrder.V3("BTD", "EOP", "DE", "camt.053", null, "ZIP") + OrderDoc.notification -> EbicsOrder.V3("BTD", "STM", "DE", "camt.054", null, "ZIP", "SCI") } } } diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsWS.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsWS.kt @@ -27,9 +27,11 @@ import io.ktor.serialization.kotlinx.* import io.ktor.websocket.* import kotlinx.serialization.Serializable import kotlinx.serialization.json.* +import kotlinx.coroutines.channels.* +import kotlinx.coroutines.* import org.slf4j.Logger import org.slf4j.LoggerFactory -import tech.libeufin.common.encodeBase64 +import tech.libeufin.common.* private val logger: Logger = LoggerFactory.getLogger("libeufin-nexus-ws") @@ -136,11 +138,70 @@ suspend fun WssParams.connect(client: HttpClient, lambda: suspend (WssNotificati } }) { while (true) { - logger.info("waiting for msg") + logger.trace("wait for ws msg") // TODO use receiveDeserialized from ktor when it works val msg = receiveJson<WssNotification>() - logger.info("msg: {}", msg) + logger.trace("received: {}", msg) + if (msg is WssGeneralInfo) { + for (info in msg.INFO) { + logger.info("info: {}", info.FREE) + } + } lambda(msg) } } +} + +suspend fun listenForNotification(client: EbicsClient): ReceiveChannel<List<EbicsOrder>>? { + // Try to get params + val params = try { + client.wssParams() + } catch (e: EbicsError) { + if ( + // Expected EBICS error + (e is EbicsError.Code && e.technicalCode == EbicsReturnCode.EBICS_INVALID_ORDER_IDENTIFIER) || + // Netzbon HTTP error + (e is EbicsError.HTTP && e.status == HttpStatusCode.BadRequest) + ) { + // Failure is expected if this wss is not supported + logger.info("Real-time EBICS notifications is not supported") + return null + } else { + throw e + } + } + logger.info("Listening to real-time EBICS notifications") + val channel = Channel<List<EbicsOrder>>() + val backoff = ExpoBackoffDecorr() + kotlin.concurrent.thread(isDaemon = true) { + runBlocking { + while (true) { + try { + val params = client.wssParams() + logger.trace("{}", params) + params.connect(client.client) { msg -> + backoff.reset() + if (msg is WssNewData) { + val orders = msg.BTF.map { + EbicsOrder.V3( + type = "BTD", + service = it.SERVICE, + scope = it.SCOPE, + message = it.MSGNAME, + version = it.VERSION, + container = it.CONTTYPE, + option = it.OPTION + ) + } + channel.send(orders) + } + } + } catch (e: Exception) { + e.fmtLog(logger) + delay(backoff.next()) + } + } + } + } + return channel } \ No newline at end of file diff --git a/testbench/src/main/kotlin/Main.kt b/testbench/src/main/kotlin/Main.kt @@ -90,10 +90,10 @@ class Cli : CliktCommand("Run integration tests on banks provider") { LIBEUFIN_NEXUS_HOME = test/$platform [nexus-fetch] - FREQUENCY = 5s + FREQUENCY = 5d [nexus-submit] - FREQUENCY = 5s + FREQUENCY = 5d [libeufin-nexusdb-postgres] CONFIG = postgres:///libeufintestbench