libeufin

Integration and sandbox testing for FinTech APIs and data formats
Log | Files | Refs | Submodules | README | LICENSE

commit 80c0ec6b40af59e8c136f5f6393405df28c86247
parent 78b0fb7d443ea2f513b60d10308396f874d35439
Author: Antoine A <>
Date:   Tue, 14 Oct 2025 18:58:40 +0100

nexus: add observability API

Diffstat:
Mcommon/src/main/kotlin/Constants.kt | 1+
Mcommon/src/main/kotlin/TalerMessage.kt | 10++++++++--
Mcontrib/nexus.conf | 16++++++++++++++++
Mnexus/build.gradle | 6++++++
Mnexus/conf/test.conf | 8++++++--
Mnexus/src/main/kotlin/tech/libeufin/nexus/Config.kt | 1+
Mnexus/src/main/kotlin/tech/libeufin/nexus/Constants.kt | 6++++--
Mnexus/src/main/kotlin/tech/libeufin/nexus/Main.kt | 17+++++++++++++++--
Anexus/src/main/kotlin/tech/libeufin/nexus/api/ObservabilityApi.kt | 185+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mnexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsFetch.kt | 10+++-------
Mnexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsSubmit.kt | 6+++++-
Mnexus/src/main/kotlin/tech/libeufin/nexus/db/KvDAO.kt | 27+++++++++++++++++++--------
Anexus/src/test/kotlin/ObservabilityTest.kt | 40++++++++++++++++++++++++++++++++++++++++
Mnexus/src/test/kotlin/bench.kt | 14++++++++++++++
14 files changed, 323 insertions(+), 24 deletions(-)

diff --git a/common/src/main/kotlin/Constants.kt b/common/src/main/kotlin/Constants.kt @@ -28,6 +28,7 @@ const val MAX_BODY_LENGTH: Int = 4 * 1024 // 4kB // API version const val WIRE_GATEWAY_API_VERSION: String = "4:0:3" const val REVENUE_API_VERSION: String = "1:1:1" +const val OBSERVABILITY_API_VERSION: String = "0:0:0" // HTTP headers const val TALER_CHALLENGE_IDS: String = "Taler-Challenge-Ids" diff --git a/common/src/main/kotlin/TalerMessage.kt b/common/src/main/kotlin/TalerMessage.kt @@ -207,4 +207,11 @@ data class RevenueIncomingBankTransaction( val credit_fee: TalerAmount? = null, val debit_account: String, val subject: String -) -\ No newline at end of file +) + +/** Response GET /taler-observability/config */ +@Serializable +class TalerObservabilityConfig() { + val name: String = "taler-observability" + val version: String = OBSERVABILITY_API_VERSION +} diff --git a/contrib/nexus.conf b/contrib/nexus.conf @@ -136,3 +136,19 @@ AUTH_METHOD = bearer # Token for bearer authentication scheme TOKEN = + +[nexus-httpd-observability-api] +# Whether to serve the Observability API +ENABLED = NO + +# Authentication scheme, this can either can be basic, bearer or none. +AUTH_METHOD = bearer + +# User name for basic authentication scheme +# USERNAME = + +# Password for basic authentication scheme +# PASSWORD = + +# Token for bearer authentication scheme +TOKEN = diff --git a/nexus/build.gradle b/nexus/build.gradle @@ -23,6 +23,12 @@ dependencies { implementation(project(":common")) + // Metrics + implementation('io.prometheus:prometheus-metrics-core:1.4.1') + implementation('io.prometheus:prometheus-metrics-instrumentation-jvm:1.4.1') + implementation("io.prometheus:prometheus-metrics-exporter-httpserver:1.4.1") + implementation("io.prometheus:prometheus-metrics-exposition-formats:1.4.1") + // Command line parsing implementation("com.github.ajalt.clikt:clikt:$clikt_version") implementation("org.postgresql:postgresql:$postgres_version") diff --git a/nexus/conf/test.conf b/nexus/conf/test.conf @@ -22,4 +22,8 @@ TOKEN = secret-token [nexus-httpd-revenue-api] ENABLED = YES AUTH_METHOD = bearer -TOKEN = secret-token -\ No newline at end of file +TOKEN = secret-token + +[nexus-httpd-observability-api] +ENABLED = YES +AUTH_METHOD = none +\ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Config.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Config.kt @@ -140,6 +140,7 @@ class NexusConfig internal constructor (val cfg: TalerConfig) { val wireGatewayApiCfg = cfg.section("nexus-httpd-wire-gateway-api").apiConf() val revenueApiCfg = cfg.section("nexus-httpd-revenue-api").apiConf() + val observabilityApiCfg = cfg.section("nexus-httpd-observability-api").apiConf() } fun NexusConfig.checkCurrency(amount: TalerAmount) { diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Constants.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Constants.kt @@ -19,4 +19,6 @@ package tech.libeufin.nexus // KV -val CHECKPOINT_KEY = "checkpoint" -\ No newline at end of file +val CHECKPOINT_KEY = "checkpoint" +val SUBMIT_TASK_KEY = "submit_task" +val FETCH_TASK_KEY = "fetch_task" +\ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt @@ -27,13 +27,17 @@ package tech.libeufin.nexus import io.ktor.server.application.* import org.slf4j.Logger import org.slf4j.LoggerFactory +import kotlinx.serialization.Serializable +import kotlinx.serialization.Contextual import tech.libeufin.common.api.talerApi import tech.libeufin.common.setupSecurityProperties import tech.libeufin.nexus.api.revenueApi import tech.libeufin.nexus.api.wireGatewayApi +import tech.libeufin.nexus.api.observabilityApi import tech.libeufin.nexus.cli.LibeufinNexus import tech.libeufin.nexus.db.Database import com.github.ajalt.clikt.core.main +import java.time.Instant internal val logger: Logger = LoggerFactory.getLogger("libeufin-nexus") @@ -47,9 +51,18 @@ data class IbanAccountMetadata( fun Application.nexusApi(db: Database, cfg: NexusConfig) = talerApi(LoggerFactory.getLogger("libeufin-nexus-api")) { wireGatewayApi(db, cfg) revenueApi(db, cfg) + observabilityApi(db, cfg) } fun main(args: Array<String>) { setupSecurityProperties() LibeufinNexus().main(args) -} -\ No newline at end of file +} + +@Serializable +data class TaskStatus( + @Contextual + val last_successfull: Instant? = null, + @Contextual + val last_trial: Instant? = null +) +\ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/api/ObservabilityApi.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/api/ObservabilityApi.kt @@ -0,0 +1,184 @@ +/* + * This file is part of LibEuFin. + * Copyright (C) 2025 Taler Systems S.A. + + * LibEuFin is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation; either version 3, or + * (at your option) any later version. + + * LibEuFin is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General + * Public License for more details. + + * You should have received a copy of the GNU Affero General Public + * License along with LibEuFin; see the file COPYING. If not, see + * <http://www.gnu.org/licenses/> + */ + +package tech.libeufin.nexus.api + +import io.ktor.http.* +import io.ktor.server.application.* +import io.ktor.server.request.* +import io.ktor.server.response.* +import io.ktor.server.routing.* +import io.ktor.util.pipeline.* +import io.prometheus.metrics.core.metrics.* +import io.prometheus.metrics.model.registry.PrometheusRegistry +import io.prometheus.metrics.model.snapshots.Unit +import io.prometheus.metrics.instrumentation.jvm.JvmMetrics; +import io.prometheus.metrics.expositionformats.ExpositionFormats +import tech.libeufin.common.* +import tech.libeufin.common.db.* +import tech.libeufin.nexus.* +import tech.libeufin.nexus.db.* +import tech.libeufin.nexus.db.KvDAO.* +import tech.libeufin.nexus.db.ExchangeDAO.TransferResult +import tech.libeufin.nexus.db.PaymentDAO.IncomingRegistrationResult +import tech.libeufin.nexus.ebics.randEbicsId +import tech.libeufin.nexus.iso20022.* +import java.time.Instant +import java.io.ByteArrayOutputStream + +object Metrics { + @Volatile + private var incomingTxTotal: Long = 0 + @Volatile + private var outgoingTxTotal: Long = 0 + @Volatile + private var incomingTalerTxTotal: Long = 0 + @Volatile + private var outgoingTalerTxTotal: Long = 0 + @Volatile + private var bouncedTotal: Long = 0 + @Volatile + private var initiatedStatus: Map<String, Long> = emptyMap() + @Volatile + private var submitStatus: TaskStatus = TaskStatus() + @Volatile + private var fetchStatus: TaskStatus = TaskStatus() + + init { + // Register JVM metrics + JvmMetrics.builder().register() + + // Register custom metrics + CounterWithCallback.builder() + .name("libeufin_nexus_tx_incoming_total") + .help("Number of registered incoming transactions") + .callback { it.call(incomingTxTotal.toDouble()) } + .register() + CounterWithCallback.builder() + .name("libeufin_nexus_tx_outgoing_total") + .help("Number of initiated outgoing transactions") + .callback { it.call(outgoingTxTotal.toDouble()) } + .register() + CounterWithCallback.builder() + .name("libeufin_nexus_tx_incoming_talerable_total") + .help("Number of registered incoming talerable transactions") + .callback { it.call(incomingTalerTxTotal.toDouble()) } + .register() + CounterWithCallback.builder() + .name("libeufin_nexus_tx_outgoing_talerable_total") + .help("Number of initiated outgoing talerable transactions") + .callback { it.call(outgoingTalerTxTotal.toDouble()) } + .register() + CounterWithCallback.builder() + .name("libeufin_nexus_tx_bounced_total") + .help("Number of bounced transactions") + .callback { it.call(bouncedTotal.toDouble()) } + .register() + GaugeWithCallback.builder() + .name("libeufin_nexus_tx_initiated") + .help("Status of initiated transaction") + .labelNames("status") + .callback { + for ((label, count) in initiatedStatus) { + it.call(count.toDouble(), label) + } + } + .register() + GaugeWithCallback.builder() + .name("libeufin_nexus_task_execution_timestamp_seconds") + .help("Status of initiated transaction") + .unit(Unit.SECONDS) + .labelNames("name") + .callback { + submitStatus.last_trial?.let { timestamp -> + it.call(timestamp.getEpochSecond().toDouble(), "submit") + } + fetchStatus.last_trial?.let { timestamp -> + it.call(timestamp.getEpochSecond().toDouble(), "fetch") + } + } + .register() + GaugeWithCallback.builder() + .name("libeufin_nexus_task_success_timestamp_seconds") + .help("Status of initiated transaction") + .unit(Unit.SECONDS) + .labelNames("name") + .callback { + submitStatus.last_successfull?.let { timestamp -> + it.call(timestamp.getEpochSecond().toDouble(), "submit") + } + fetchStatus.last_successfull?.let { timestamp -> + it.call(timestamp.getEpochSecond().toDouble(), "fetch") + } + } + .register() + } + + // Load metrics from the database + suspend fun sync(db: Database) { + db.serializable( + """ + SELECT + (SELECT count(*) FROM incoming_transactions) AS incoming_tx_count, + (SELECT count(*) FROM outgoing_transactions) AS outgoing_tx_count, + (SELECT count(*) FROM talerable_incoming_transactions) AS incoming_talerable_tx_count, + (SELECT count(*) FROM talerable_outgoing_transactions) AS outgoing_talerable_tx_count, + (SELECT count(*) FROM bounced_transactions) AS bounced_tx_count, + (SELECT value FROM kv WHERE key='$SUBMIT_TASK_KEY') AS submit_status, + (SELECT value FROM kv WHERE key='$FETCH_TASK_KEY') AS fetch_status + """ + ) { + one { + incomingTxTotal = it.getLong("incoming_tx_count") + outgoingTxTotal = it.getLong("outgoing_tx_count") + incomingTalerTxTotal = it.getLong("incoming_talerable_tx_count") + outgoingTalerTxTotal = it.getLong("outgoing_talerable_tx_count") + bouncedTotal = it.getLong("bounced_tx_count") + submitStatus = it.getJson<TaskStatus>("submit_status") ?: TaskStatus() + fetchStatus = it.getJson<TaskStatus>("fetch_status") ?: TaskStatus() + Unit + } + } + + db.serializable( + """ + SELECT count(*) as count, status FROM initiated_outgoing_transactions GROUP BY status + """ + ) { + initiatedStatus = all { + it.getString("status") to it.getLong("count") + }.toMap() + } + } +} + +fun Routing.observabilityApi(db: Database, cfg: NexusConfig) = conditional(cfg.observabilityApiCfg) { + get("/taler-observability/config") { + call.respond(TalerObservabilityConfig()) + } + auth(cfg.observabilityApiCfg) { + get("/taler-observability/metrics") { + Metrics.sync(db) + val snapshot = PrometheusRegistry.defaultRegistry.scrape() + val outputStream = ByteArrayOutputStream() + ExpositionFormats.init().getPrometheusTextFormatWriter().write(outputStream, snapshot) + call.respondText(outputStream.toString(Charsets.UTF_8), ContentType.parse("text/plain; version=0.0.4; charset=utf-8")) + } + } +} +\ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsFetch.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsFetch.kt @@ -455,9 +455,9 @@ class EbicsFetch: EbicsCmd() { } var lastFetch = Instant.EPOCH - var checkpoint = db.kv.get<Checkpoint>(CHECKPOINT_KEY) ?: Checkpoint() while (true) { + val checkpoint = db.kv.get<TaskStatus>(CHECKPOINT_KEY) ?: TaskStatus() var nextFetch = lastFetch + cfg.fetch.frequency var nextCheckpoint = run { // We never ran, we must checkpoint now @@ -507,12 +507,7 @@ class EbicsFetch: EbicsCmd() { e.fmtLog(logger) false } - checkpoint = if (success) { - checkpoint.copy(last_successfull = now, last_trial = now) - } else { - checkpoint.copy(last_trial = now) - } - db.kv.set(CHECKPOINT_KEY, checkpoint) + db.kv.updateTaskStatus(CHECKPOINT_KEY, now, success) lastFetch = now } else if (transient || now > nextFetch) { logger.info("Running at frequency") @@ -533,6 +528,7 @@ class EbicsFetch: EbicsCmd() { } lastFetch = now } + db.kv.updateTaskStatus(SUBMIT_TASK_KEY, now, success) if (transient) { throw ProgramResult(if (!success) 1 else 0) } diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsSubmit.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsSubmit.kt @@ -141,11 +141,15 @@ class EbicsSubmit : EbicsCmd() { } else { logger.debug("Running with a frequency of ${cfg.submit.frequencyRaw}") while (true) { - try { + val now = Instant.now(); + val success = try { submitAll(client, cfg.submit.requireAck) + true } catch (e: Exception) { e.fmtLog(logger) + false } + db.kv.updateTaskStatus(SUBMIT_TASK_KEY, now, success) // TODO take submitBatch taken time in the delay delay(cfg.submit.frequency.toKotlinDuration()) } diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/KvDAO.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/KvDAO.kt @@ -23,7 +23,7 @@ import tech.libeufin.common.* import tech.libeufin.common.db.* import tech.libeufin.nexus.iso20022.IncomingPayment import tech.libeufin.nexus.iso20022.OutgoingPayment -import java.sql.Types +import java.sql.* import java.time.Instant import kotlinx.serialization.Contextual import kotlinx.serialization.KSerializer @@ -52,6 +52,14 @@ val JSON = Json { } } +inline fun <reified T> ResultSet.getJson(name: String): T? { + val value = this.getString(name) + if (value == null) { + return value + } + return JSON.decodeFromString(value) +} + /** Data access logic for key value */ class KvDAO( val db: Database) { /** Get current value for [key] */ @@ -60,18 +68,21 @@ class KvDAO( val db: Database) { ) { bind(key) oneOrNull { - val encoded = it.getString(1) - JSON.decodeFromString<T>(encoded) + it.getJson("value") } } - /** Set [value] for [key] */ - suspend inline fun <reified T> set(key: String, value: T) = db.serializable( - "INSERT INTO kv (key, value) VALUES (?, ?::jsonb) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value" + /** Update a TaskStatus timestamp */ + suspend fun updateTaskStatus(key: String, timestamp: Instant, success: Boolean) = db.serializable( + if (success) { + "INSERT INTO kv (key, value) VALUES (?, jsonb_build_object('last_successfull', ?, 'last_trial', ?)) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value" + } else { + "INSERT INTO kv (key, value) VALUES (?, jsonb_build_object('last_trial', ?)) ON CONFLICT (key) DO UPDATE SET value=jsonb_set(EXCLUDED.value, '{last_trial}'::text[], to_jsonb(?))" + } ) { - val encoded = JSON.encodeToString<T>(value) bind(key) - bind(encoded) + bind(timestamp) + bind(timestamp) executeUpdate() } } \ No newline at end of file diff --git a/nexus/src/test/kotlin/ObservabilityTest.kt b/nexus/src/test/kotlin/ObservabilityTest.kt @@ -0,0 +1,39 @@ +/* + * This file is part of LibEuFin. + * Copyright (C) 2024-2025 Taler Systems S.A. + + * LibEuFin is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation; either version 3, or + * (at your option) any later version. + + * LibEuFin is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General + * Public License for more details. + + * You should have received a copy of the GNU Affero General Public + * License along with LibEuFin; see the file COPYING. If not, see + * <http://www.gnu.org/licenses/> + */ + +import io.ktor.http.* +import io.ktor.client.request.* +import org.junit.Test +import tech.libeufin.common.TalerObservabilityConfig +import tech.libeufin.common.assertOkJson +import tech.libeufin.common.assertOk + +class ObservabilityApiTest { + // GET /taler-observability/config + @Test + fun config() = serverSetup { + client.get("/taler-observability/config").assertOkJson<TalerObservabilityConfig>() + } + + // GET /taler-observability/metrics + @Test + fun metrics() = serverSetup { db -> + client.getA("/taler-observability/metrics").assertOk() + } +} +\ No newline at end of file diff --git a/nexus/src/test/kotlin/bench.kt b/nexus/src/test/kotlin/bench.kt @@ -19,6 +19,7 @@ import org.junit.Test import org.postgresql.jdbc.PgConnection +import io.ktor.client.request.* import tech.libeufin.common.* import tech.libeufin.common.test.* import tech.libeufin.nexus.* @@ -65,6 +66,13 @@ class Bench { "initiated_outgoing_transactions(amount, subject, initiation_time, credit_payto, outgoing_transaction_id, end_to_end_id)" to { "(42,0)\tsubject\t0\tcredit_payto\t${it*2}\tE2E_ID$it\n" }, + "bounced_transactions(incoming_transaction_id, initiated_outgoing_transaction_id)" to { + if (it % 10 == 0) { + "${it/2}\t${it/3}\n" + } else { + "" + } + }, "talerable_incoming_transactions(type, metadata, incoming_transaction_id)" to { val hex = token32.rand().encodeHex() if (it % 2 == 0) { @@ -162,6 +170,12 @@ class Bench { client.getA("/taler-wire-gateway/history/outgoing") .assertOk() } + + // Observability + measureAction("metrics") { + client.get("/taler-observability/metrics") + .assertOk() + } }} } } \ No newline at end of file