commit 583787d4b40bcdc6c5ce3017dd495267b711136f
parent f6d0a7e9378137643513d088370ec6377ba78c1e
Author: Antoine A <>
Date: Mon, 22 Jul 2024 15:24:48 +0200
nexus: WIP WSS more fix and test
Diffstat:
3 files changed, 93 insertions(+), 25 deletions(-)
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
@@ -161,9 +161,10 @@ class Wss: CliktCommand("Listen to EBICS instant notification over websocket") {
val backoff = ExpoBackoffDecorr()
nexusConfig(common.config).withDb { db, cfg ->
val (clientKeys, bankKeys) = expectFullKeys(cfg)
+ val httpClient = httpClient()
val client = EbicsClient(
cfg,
- httpClient(),
+ httpClient,
db,
EbicsLogger(ebicsLog),
clientKeys,
@@ -175,7 +176,7 @@ class Wss: CliktCommand("Listen to EBICS instant notification over websocket") {
val params = client.wssParams()
logger.debug("{}", params)
logger.info("Start listening")
- params.client {
+ params.connect(httpClient) {
backoff.reset()
}
} catch (e: Exception) {
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsWS.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsWS.kt
@@ -93,7 +93,7 @@ sealed interface WssNotification {
}
}
-
+/** Download EBICS real-time notifications websocket params */
suspend fun EbicsClient.wssParams(): WssParams {
lateinit var params: WssParams
download(EbicsOrder.WSS_PARAMS, null, null) { stream ->
@@ -102,12 +102,22 @@ suspend fun EbicsClient.wssParams(): WssParams {
return params
}
-suspend fun WssParams.client(lambda: suspend (WssNotification) -> Unit) {
- val client = HttpClient {
+/** Receive a JSON message from a websocket session */
+inline suspend fun <reified T> DefaultClientWebSocketSession.receiveJson(): T {
+ val frame = incoming.receive()
+ val content = frame.readBytes()
+ val msg = Json.decodeFromStream(kotlinx.serialization.serializer<T>(), content.inputStream())
+ return msg
+}
+
+/** Connect to the EBICS real-time notifications websocket */
+suspend fun WssParams.connect(client: HttpClient, lambda: suspend (WssNotification) -> Unit) {
+ val client = client.config {
install(WebSockets) {
contentConverter = KotlinxWebsocketSerializationConverter(Json)
}
}
+ // TODO check PARTNERID and USERID match conf ?
val credentials = buildString {
// Username
append(PARTNERID)
@@ -127,7 +137,8 @@ suspend fun WssParams.client(lambda: suspend (WssNotification) -> Unit) {
}) {
while (true) {
logger.info("waiting for msg")
- val msg = receiveDeserialized<WssNotification>()
+ // TODO use receiveDeserialized from ktor when it works
+ val msg = receiveJson<WssNotification>()
logger.info("msg: {}", msg)
lambda(msg)
}
diff --git a/nexus/src/test/kotlin/WsTest.kt b/nexus/src/test/kotlin/WsTest.kt
@@ -19,30 +19,29 @@
import com.github.ajalt.clikt.core.CliktCommand
import com.github.ajalt.clikt.testing.test
+import io.ktor.client.request.*
+import io.ktor.client.statement.*
+import io.ktor.http.*
+import io.ktor.server.application.*
+import io.ktor.server.testing.*
+import io.ktor.server.routing.*
+import io.ktor.serialization.kotlinx.*
+import io.ktor.server.websocket.*
+import io.ktor.websocket.*
import tech.libeufin.common.crypto.CryptoUtil
import tech.libeufin.nexus.*
import tech.libeufin.nexus.ebics.*
import kotlinx.serialization.json.*
import kotlinx.serialization.*
+import kotlinx.coroutines.channels.ClosedReceiveChannelException
import java.io.ByteArrayOutputStream
import java.io.PrintStream
import kotlin.io.path.*
-import kotlin.test.Test
-import kotlin.test.assertEquals
+import kotlin.test.*
class WsTest {
- inline fun <reified B> roundtrip(raw: String): B {
- val json: JsonObject = Json.decodeFromString(raw)
- val decoded: B = Json.decodeFromJsonElement(json)
- val encoded = Json.encodeToJsonElement(decoded)
- assertEquals(json, encoded)
- return decoded
- }
-
- /** Test our serialization implementation works with spec examples */
- @Test
- fun serialization() {
- roundtrip<WssParams>("""
+ // WSS params example from the spec
+ val PARAMS_EXAMPLE = """
{
"URL": "https://bankmitwebsocket.de",
"TOKEN": "550e8400-e29b-11d4-a716-446655440000",
@@ -51,9 +50,9 @@ class WsTest {
"PARTNERID": "K1234567",
"USERID": "USER4711"
}
- """)
-
- val examples = sequenceOf(
+ """
+ // Notifications examples from the spec
+ val NOTIFICATION_EXAMPLES = sequenceOf(
"""
{
"MCLASS": [
@@ -124,9 +123,66 @@ class WsTest {
}
]
}
- """)
- for (raw in examples) {
+ """
+ )
+
+ /** Test JSON serialization roudtrip */
+ inline fun <reified B> roundtrip(raw: String): B {
+ val json: JsonObject = Json.decodeFromString(raw)
+ val decoded: B = Json.decodeFromJsonElement(json)
+ val encoded = Json.encodeToJsonElement(decoded)
+ assertEquals(json, encoded)
+ return decoded
+ }
+
+ /** Test our serialization implementation works with spec examples */
+ @Test
+ fun serialization() {
+ roundtrip<WssParams>(PARAMS_EXAMPLE)
+ for (raw in NOTIFICATION_EXAMPLES) {
roundtrip<WssNotification>(raw)
}
}
+
+ /** Test our implemetation works with spec examples */
+ @Test
+ fun wss() {
+ println(kotlinx.serialization.serializer<WssNotification>())
+ val params: WssParams = Json.decodeFromString(PARAMS_EXAMPLE)
+
+ testApplication {
+ externalServices {
+ hosts(params.URL.replace("https://", "wss://")) {
+ install(WebSockets) {
+ contentConverter = KotlinxWebsocketSerializationConverter(Json)
+ }
+ routing {
+ webSocket("/") {
+ // Send all examples and read them back
+ for (example in NOTIFICATION_EXAMPLES) {
+ send(example)
+ }
+ close(CloseReason(CloseReason.Codes.NORMAL, "Test done"))
+ }
+ }
+ }
+ }
+ var count = 0
+ try {
+ params.connect(client) { msg ->
+ count++
+ assert(count <= 3)
+ if (count == 3) {
+ assertIs<WssGeneralInfo>(msg)
+ } else {
+ assertIs<WssNewData>(msg)
+ }
+ }
+ } catch (e: ClosedReceiveChannelException) {
+ // Expected
+ }
+ // Check receive all messages
+ assertEquals(3, count)
+ }
+ }
}
\ No newline at end of file