commit 5a6bfd153360b24f11dffd99836c5a7d0e3662e2
parent d5f9529de0778fdecd361cbd35d15b62abe595f0
Author: Antoine A <>
Date: Fri, 19 Jul 2024 11:21:42 +0200
nexus: WIP instant EBICS notifications over websocket
Diffstat:
7 files changed, 183 insertions(+), 2 deletions(-)
diff --git a/build.gradle b/build.gradle
@@ -20,7 +20,7 @@ if (!JavaVersion.current().isCompatibleWith(JavaVersion.VERSION_17)){
allprojects {
ext {
set("kotlin_version", "2.0.0")
- set("ktor_version", "2.3.11")
+ set("ktor_version", "2.3.12")
set("clikt_version", "4.4.0")
set("coroutines_version", "1.8.1")
set("postgres_version", "42.7.3")
diff --git a/common/src/main/kotlin/helpers.kt b/common/src/main/kotlin/helpers.kt
@@ -38,6 +38,8 @@ import java.time.format.DateTimeFormatter
/** Decode a base64 encoded string */
fun String.decodeBase64(): ByteArray = Base64.getDecoder().decode(this)
+/** Encode a string as base64 */
+fun String.encodeBase64(): String = toByteArray().encodeBase64()
/** Decode a hexadecimal uppercase encoded string */
fun String.decodeUpHex(): ByteArray = HexFormat.of().withUpperCase().parseHex(this)
diff --git a/nexus/build.gradle b/nexus/build.gradle
@@ -29,6 +29,7 @@ dependencies {
// Ktor client library
implementation("io.ktor:ktor-server-core:$ktor_version")
implementation("io.ktor:ktor-client-cio:$ktor_version")
+ implementation("io.ktor:ktor-client-websockets:$ktor_version")
// PDF generation
implementation("com.itextpdf:itext-core:8.0.4")
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
@@ -48,8 +48,10 @@ import tech.libeufin.nexus.db.Database
import tech.libeufin.nexus.db.InitiatedPayment
import tech.libeufin.nexus.ebics.EbicsOrder
import tech.libeufin.nexus.ebics.EbicsClient
+import tech.libeufin.nexus.ebics.*
import java.nio.file.Path
import java.time.Instant
+import kotlinx.coroutines.delay
internal val logger: Logger = LoggerFactory.getLogger("libeufin-nexus")
@@ -148,6 +150,42 @@ class Serve : CliktCommand("Run libeufin-nexus HTTP server", name = "serve") {
}
}
+class Wss: CliktCommand("Listen to EBICS instant notification over websocket") {
+ private val common by CommonOption()
+ private val ebicsLog by option(
+ "--debug-ebics",
+ help = "Log EBICS content at SAVEDIR",
+ )
+
+ override fun run() = cliCmd(logger, common.log) {
+ val backoff = ExpoBackoffDecorr()
+ nexusConfig(common.config).withDb { db, cfg ->
+ val (clientKeys, bankKeys) = expectFullKeys(cfg)
+ val client = EbicsClient(
+ cfg,
+ httpClient(),
+ db,
+ EbicsLogger(ebicsLog),
+ clientKeys,
+ bankKeys
+ )
+ while (true) {
+ try {
+ logger.info("Fetch WSS params")
+ val params = client.wssParams()
+ logger.debug("{}", params)
+ logger.info("Start listening")
+ params.client {
+ backoff.reset()
+ }
+ } catch (e: Exception) {
+ delay(backoff.next())
+ }
+ }
+ }
+ }
+}
+
class FakeIncoming: CliktCommand("Genere a fake incoming payment") {
private val common by CommonOption()
private val amount by option(
@@ -361,7 +399,7 @@ class ListCmd: CliktCommand("List nexus transactions", name = "list") {
class TestingCmd : CliktCommand("Testing helper commands", name = "testing") {
init {
- subcommands(FakeIncoming(), ListCmd(), EbicsDownload(), TxCheck())
+ subcommands(FakeIncoming(), ListCmd(), EbicsDownload(), TxCheck(), Wss())
}
override fun run() = Unit
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsOrder.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsOrder.kt
@@ -34,6 +34,15 @@ sealed class EbicsOrder(val schema: String) {
val container: String? = null,
val option: String? = null
): EbicsOrder("H005")
+
+ companion object {
+ val WSS_PARAMS = EbicsOrder.V3(
+ type = "BTD",
+ name = "OTH",
+ scope = "DE",
+ messageName = "wssparam"
+ )
+ }
}
enum class Dialect {
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsWS.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsWS.kt
@@ -0,0 +1,128 @@
+/*
+ * This file is part of LibEuFin.
+ * Copyright (C) 2024 Taler Systems S.A.
+
+ * LibEuFin is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation; either version 3, or
+ * (at your option) any later version.
+
+ * LibEuFin is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General
+ * Public License for more details.
+
+ * You should have received a copy of the GNU Affero General Public
+ * License along with LibEuFin; see the file COPYING. If not, see
+ * <http://www.gnu.org/licenses/>
+ */
+
+package tech.libeufin.nexus.ebics
+
+import tech.libeufin.common.*
+import kotlinx.serialization.Serializable
+import io.ktor.serialization.kotlinx.*
+import io.ktor.websocket.*
+import kotlinx.serialization.json.*
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+import io.ktor.http.*
+import io.ktor.client.*
+import io.ktor.client.request.*
+import io.ktor.client.plugins.websocket.*
+
+private val logger: Logger = LoggerFactory.getLogger("libeufin-nexus-ws")
+
+@Serializable
+data class WssParams(
+ val URL: String,
+ val TOKEN: String,
+ val OTT: String,
+ val VALIDITY: String,
+ val PARTNERID: String,
+ val USERID: String? = null,
+)
+
+@Serializable
+data class WssNotificationClass(
+ val NAME: String,
+ val VERS: String,
+ val TIMESTAMP: String,
+)
+
+@Serializable
+data class WssNotificationBTF(
+ val SERVICE: String,
+ val SCOPE: String? = null,
+ val OPTION: String? = null,
+ val CONTTYPE: String? = null,
+ val MSGNAME: String,
+ val VARIANT: String? = null,
+ val VERSION: String? = null,
+ val FORMAT: String? = null,
+)
+
+@Serializable
+data class WssNewData(
+ val MCLASS: WssNotificationClass,
+ val PARTNERID: String,
+ val USERID: String? = null,
+ val BTF: WssNotificationBTF,
+ val ORDERTYPE: List<String>? = null
+): WssNotification
+
+@Serializable
+data class WssInfo(
+ val LANG: String,
+ val FREE: String
+)
+
+@Serializable
+data class WssGeneralInfo(
+ val MCLASS: WssNotificationClass,
+ val INFO: WssInfo
+): WssNotification
+
+@Serializable
+sealed interface WssNotification;
+
+
+suspend fun EbicsClient.wssParams(): WssParams {
+ lateinit var params: WssParams
+ download(EbicsOrder.WSS_PARAMS, null, null) { stream ->
+ params = Json.decodeFromStream(stream)
+ }
+ return params
+}
+
+suspend fun WssParams.client(lambda: suspend (WssNotification) -> Unit) {
+ val client = HttpClient {
+ install(WebSockets) {
+ contentConverter = KotlinxWebsocketSerializationConverter(Json)
+ }
+ }
+ val credentials = buildString {
+ // Username
+ append(PARTNERID)
+ if (USERID != null) {
+ append('_')
+ append(USERID)
+ }
+ // Password
+ append(':')
+ append(TOKEN)
+ }.encodeBase64()
+
+ client.wss(URL.replace("https://", "wss://"), request = {
+ headers {
+ append(HttpHeaders.Authorization, "Basic $credentials")
+ }
+ }) {
+ while (true) {
+ logger.info("waiting for msg")
+ val msg = receiveDeserialized<WssNotification>()
+ logger.info("msg: {}", msg)
+ lambda(msg)
+ }
+ }
+}
+\ No newline at end of file
diff --git a/testbench/src/main/kotlin/Main.kt b/testbench/src/main/kotlin/Main.kt
@@ -156,6 +156,7 @@ class Cli : CliktCommand("Run integration tests on banks provider") {
put("reset-db", "dbinit -r $flags")
put("recover", "Recover old transactions", "ebics-fetch $ebicsFlags --pinned-start 2024-01-01 $recoverDoc")
put("fetch", "Fetch all documents", "ebics-fetch $ebicsFlags")
+ put("fetch-wait", "Fetch all documents", "ebics-fetch $debugFlags")
put("ack", "Fetch CustomerAcknowledgement", "ebics-fetch $ebicsFlags acknowledgement")
put("status", "Fetch CustomerPaymentStatusReport", "ebics-fetch $ebicsFlags status")
put("report", "Fetch BankToCustomerAccountReport", "ebics-fetch $ebicsFlags report")
@@ -164,6 +165,7 @@ class Cli : CliktCommand("Run integration tests on banks provider") {
put("list-incoming", "List incoming transaction", "testing list $flags incoming")
put("list-outgoing", "List outgoing transaction", "testing list $flags outgoing")
put("list-initiated", "List initiated payments", "testing list $flags initiated")
+ put("wss", "Listen to notification over websocket", "testing wss $debugFlags")
put("submit", "Submit pending transactions", "ebics-submit $ebicsFlags")
put("setup", "Setup", "ebics-setup $debugFlags")
put("reset-keys", suspend {