commit 89ae2f42ce0e08663f6508c316fbc54606b1810b parent 9496810ba34a119e74df7fe8b1deeaebf064a321 Author: Antoine A <> Date: Fri, 1 Aug 2025 16:13:15 +0200 nexus: add manual acknowledgement Diffstat:
15 files changed, 247 insertions(+), 110 deletions(-)
diff --git a/contrib/nexus.conf b/contrib/nexus.conf @@ -86,6 +86,9 @@ CHECKPOINT_TIME_OF_DAY = 19:00 # How often should ebics-fetch submit pending transactions FREQUENCY = 30m +# Wether to wait for manual acknowledgement before submiting transactions +# manual_ack = NO + [nexus-httpd] # How "libeufin-nexus serve" serves its API, this can either be tcp or unix SERVE = tcp diff --git a/database-versioning/libeufin-nexus-0012.sql b/database-versioning/libeufin-nexus-0012.sql @@ -0,0 +1,25 @@ +-- +-- This file is part of TALER +-- Copyright (C) 2025 Taler Systems SA +-- +-- TALER is free software; you can redistribute it and/or modify it under the +-- terms of the GNU General Public License as published by the Free Software +-- Foundation; either version 3, or (at your option) any later version. +-- +-- TALER is distributed in the hope that it will be useful, but WITHOUT ANY +-- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +-- A PARTICULAR PURPOSE. See the GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License along with +-- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> + +BEGIN; + +SELECT _v.register_patch('libeufin-nexus-0012', NULL, NULL); + +SET search_path TO libeufin_nexus; + +ALTER TABLE initiated_outgoing_transactions + ADD COLUMN awaiting_ack BOOLEAN NOT NULL DEFAULT FALSE; + +COMMIT; diff --git a/database-versioning/libeufin-nexus-procedures.sql b/database-versioning/libeufin-nexus-procedures.sql @@ -476,17 +476,24 @@ END $$; CREATE FUNCTION batch_outgoing_transactions( IN in_timestamp INT8, - IN batch_ebics_id TEXT + IN batch_ebics_id TEXT, + IN require_ack BOOLEAN ) RETURNS void LANGUAGE plpgsql AS $$ DECLARE +pending BOOLEAN; batch_id INT8; local_sum taler_amount DEFAULT (0, 0)::taler_amount; tx record; BEGIN +IF require_ack THEN + pending = EXISTS(SELECT FROM initiated_outgoing_transactions WHERE initiated_outgoing_batch_id IS NULL AND awaiting_ack); +ELSE + pending = EXISTS(SELECT FROM initiated_outgoing_transactions WHERE initiated_outgoing_batch_id IS NULL); +END IF; -- Create a new batch only if some transactions are not batched -IF (EXISTS(SELECT FROM initiated_outgoing_transactions WHERE initiated_outgoing_batch_id IS NULL)) THEN +IF (pending) THEN -- Create batch INSERT INTO initiated_outgoing_batches (creation_date, message_id) VALUES (in_timestamp, batch_ebics_id) diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Config.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Config.kt @@ -58,6 +58,7 @@ class NexusSubmitConfig(config: TalerConfig) { private val section = config.section("nexus-submit") val frequency = section.duration("frequency").require() val frequencyRaw = section.string("frequency").require() + val requireAck = section.boolean("manual_ack").default(false) } class NexusSetupConfig(config: TalerConfig) { diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsSubmit.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsSubmit.kt @@ -77,13 +77,13 @@ private suspend fun submitBatch( } /** Submit all pending initiated payments using [client] */ -private suspend fun submitAll(client: EbicsClient) { +private suspend fun submitAll(client: EbicsClient, requireAck: Boolean) { // Find a supported debit order var instantDebitOrder = client.cfg.ebics.dialect.instantDirectDebit() val debitOrder = client.cfg.ebics.dialect.directDebit() // Create batch if necessary - client.db.initiated.batch(Instant.now(), randEbicsId()) + client.db.initiated.batch(Instant.now(), randEbicsId(), requireAck) // Send submittable batches client.db.initiated.submittable().forEach { batch -> logger.debug("Submitting batch {}", batch.messageId) @@ -130,12 +130,12 @@ class EbicsSubmit : EbicsCmd() { if (transient) { logger.info("Transient mode: submitting what found and returning.") - submitAll(client) + submitAll(client, cfg.submit.requireAck) } else { logger.debug("Running with a frequency of ${cfg.submit.frequencyRaw}") while (true) { try { - submitAll(client) + submitAll(client, cfg.submit.requireAck) } catch (e: Exception) { throw Exception("Failed to submit payments", e) } diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/cli/LibeufinNexus.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/cli/LibeufinNexus.kt @@ -48,7 +48,7 @@ abstract class EbicsCmd(name: String? = null): TalerCmd(name) { class LibeufinNexus : CliktCommand() { init { versionOption(VERSION) - subcommands(DbInit(), EbicsSetup(), EbicsSubmit(), EbicsFetch(), Serve(), InitiatePayment(), ManualCmd(), CliConfigCmd(NEXUS_CONFIG_SOURCE), TestingCmd()) + subcommands(DbInit(), EbicsSetup(), EbicsSubmit(), EbicsFetch(), Serve(), InitiatePayment(), ManualCmd(), ListCmd(), CliConfigCmd(NEXUS_CONFIG_SOURCE), TestingCmd()) } override fun run() = Unit } \ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/cli/List.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/cli/List.kt @@ -0,0 +1,148 @@ +/* + * This file is part of LibEuFin. + * Copyright (C) 2025 Taler Systems S.A. + + * LibEuFin is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation; either version 3, or + * (at your option) any later version. + + * LibEuFin is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General + * Public License for more details. + + * You should have received a copy of the GNU Affero General Public + * License along with LibEuFin; see the file COPYING. If not, see + * <http://www.gnu.org/licenses/> + */ + +package tech.libeufin.nexus.cli + +import com.github.ajalt.clikt.core.CliktCommand +import com.github.ajalt.clikt.core.Context +import com.github.ajalt.clikt.core.subcommands +import com.github.ajalt.clikt.parameters.arguments.* +import com.github.ajalt.clikt.parameters.groups.provideDelegate +import com.github.ajalt.clikt.parameters.options.convert +import com.github.ajalt.clikt.parameters.options.default +import com.github.ajalt.clikt.parameters.options.flag +import com.github.ajalt.clikt.parameters.options.option +import com.github.ajalt.clikt.parameters.types.* +import com.github.ajalt.mordant.terminal.* +import tech.libeufin.common.* +import tech.libeufin.nexus.* +import tech.libeufin.nexus.ebics.* +import tech.libeufin.nexus.iso20022.* +import java.util.zip.* +import java.time.Instant +import java.io.* + +private fun fmtPayto(payto: String): String { + try { + val parsed = Payto.parse(payto).expectIban() + return buildString { + append(parsed.iban.toString()) + if (parsed.bic != null) append(" ${parsed.bic}") + if (parsed.receiverName != null) append(" ${parsed.receiverName}") + } + } catch (e: Exception) { + return payto.removePrefix("payto://") + } +} + + +class ListIncoming: TalerCmd("incoming") { + override fun help(context: Context) = "List incoming transactions" + + override fun run() = cliCmd(logger) { + nexusConfig(config).withDb { db, cfg -> + val txs = db.list.incoming() + for (tx in txs) { + println(buildString{ + if (tx.creditFee.isZero()) { + append("${tx.date} ${tx.id} ${tx.amount}\n") + } else { + append("${tx.date} ${tx.id} ${tx.amount}-${tx.creditFee}\n") + } + if (tx.debtor != null) { + append(" debtor: ${fmtPayto(tx.debtor)}\n") + } + if (tx.subject != null) { + append(" subject: ${tx.subject}\n") + } + if (tx.talerable != null) { + append(" talerable: ${tx.talerable}\n") + } + if (tx.bounced != null) { + append(" bounced: ${tx.bounced}\n") + } + }) + } + } + } +} + +class ListOutgoing: TalerCmd("outgoing") { + override fun help(context: Context) = "List outgoing transactions" + + override fun run() = cliCmd(logger) { + nexusConfig(config).withDb { db, cfg -> + val txs = db.list.outgoing() + for (tx in txs) { + println(buildString{ + append("${tx.date} ${tx.id} ${tx.amount}\n") + if (tx.creditor != null) { + append(" creditor: ${fmtPayto(tx.creditor)}\n") + } + append(" subject: ${tx.subject}\n") + if (tx.wtid != null) { + append(" talerable: ${tx.wtid} ${tx.exchangeBaseUrl}\n") + } + }) + } + } + } +} + +class ListInitiated: TalerCmd("initiated") { + override fun help(context: Context) = "List initiated transactions" + + private val awaitingAck by option().flag() + + override fun run() = cliCmd(logger) { + nexusConfig(config).withDb { db, cfg -> + val txs = db.list.initiated(awaitingAck) + for (tx in txs) { + println(buildString{ + append("${tx.date} ${tx.id} ${tx.amount}\n") + append(" creditor: ${fmtPayto(tx.creditor)}\n") + append(" subject: ${tx.subject}\n") + if (tx.batch != null) { + append(" batch: ${tx.batch}") + if (tx.batchOrder != null) + append(" ${tx.batchOrder}") + append('\n') + } + append(" submission: ${tx.submissionTime} ${tx.submissionCounter}\n") + append(" status: ${tx.status}") + if (tx.msg != null) { + append(" ${tx.msg}") + } + append('\n') + }) + } + } + } +} + + +class ListCmd: CliktCommand("list") { + override fun help(context: Context) = "List nexus transactions" + + init { + subcommands(ListIncoming(), ListOutgoing(), ListInitiated()) + } + + override fun run() = Unit +} +\ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/cli/Manual.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/cli/Manual.kt @@ -47,7 +47,7 @@ class ExportCmt: TalerCmd("export") { override fun run() = cliCmd(logger) { nexusConfig(config).withDb { db, cfg -> // Create and get pending batches - db.initiated.batch(Instant.now(), randEbicsId()) + db.initiated.batch(Instant.now(), randEbicsId(), cfg.submit.requireAck) val batches = db.initiated.submittable() var nbTx: Int = 0 @@ -132,9 +132,23 @@ class StatusCmd: TalerCmd("status") { } } +class AckCmd: TalerCmd("ack") { + override fun help(context: Context) = "Manually acknowledge the outgoing transaction for submission" + + private val ids by argument().long().multiple() + + override fun run() = cliCmd(logger) { + nexusConfig(config).withDb { db, cfg -> + for (id in ids) { + db.initiated.ack(id) + } + } + } +} + class ManualCmd : TalerCmd("manual") { init { - subcommands(ExportCmt(), ImportCmt(), StatusCmd()) + subcommands(ExportCmt(), ImportCmt(), StatusCmd(), AckCmd()) } override fun help(context: Context) = "Manual management commands" diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/cli/Testing.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/cli/Testing.kt @@ -196,96 +196,6 @@ class EbicsDownload: TalerCmd("ebics-btd") { } } -class ListCmd: TalerCmd("list") { - override fun help(context: Context) = "List nexus transactions" - - private val kind: ListKind by argument( - help = "Which list to print", - helpTags = ListKind.entries.associate { Pair(it.name, it.description()) } - ).enum<ListKind>() - - override fun run() = cliCmd(logger) { - nexusConfig(config).withDb { db, cfg -> - fun fmtPayto(payto: String): String { - if (payto == null) return "" - try { - val parsed = Payto.parse(payto).expectIban() - return buildString { - append(parsed.iban.toString()) - if (parsed.bic != null) append(" ${parsed.bic}") - if (parsed.receiverName != null) append(" ${parsed.receiverName}") - } - } catch (e: Exception) { - return payto.removePrefix("payto://") - } - } - when (kind) { - ListKind.incoming -> { - val txs = db.list.incoming() - for (tx in txs) { - println(buildString{ - if (tx.creditFee.isZero()) { - append("${tx.date} ${tx.id} ${tx.amount}\n") - } else { - append("${tx.date} ${tx.id} ${tx.amount}-${tx.creditFee}\n") - } - if (tx.debtor != null) { - append(" debtor: ${fmtPayto(tx.debtor)}\n") - } - if (tx.subject != null) { - append(" subject: ${tx.subject}\n") - } - if (tx.talerable != null) { - append(" talerable: ${tx.talerable}\n") - } - if (tx.bounced != null) { - append(" bounced: ${tx.bounced}\n") - } - }) - } - } - ListKind.outgoing -> { - val txs = db.list.outgoing() - for (tx in txs) { - println(buildString{ - append("${tx.date} ${tx.id} ${tx.amount}\n") - if (tx.creditor != null) { - append(" creditor: ${fmtPayto(tx.creditor)}\n") - } - append(" subject: ${tx.subject}\n") - if (tx.wtid != null) { - append(" talerable: ${tx.wtid} ${tx.exchangeBaseUrl}\n") - } - }) - } - } - ListKind.initiated -> { - val txs = db.list.initiated() - for (tx in txs) { - println(buildString{ - append("${tx.date} ${tx.id} ${tx.amount}\n") - append(" creditor: ${fmtPayto(tx.creditor)}\n") - append(" subject: ${tx.subject}\n") - if (tx.batch != null) { - append(" batch: ${tx.batch}") - if (tx.batchOrder != null) - append(" ${tx.batchOrder}") - append('\n') - } - append(" submission: ${tx.submissionTime} ${tx.submissionCounter}\n") - append(" status: ${tx.status}") - if (tx.msg != null) { - append(" ${tx.msg}") - } - append('\n') - }) - } - } - } - } - } -} - class TestingCmd : CliktCommand("testing") { init { subcommands(FakeIncoming(), ListCmd(), EbicsDownload(), TxCheck(), Wss()) diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt @@ -304,11 +304,18 @@ class InitiatedDAO(private val db: Database) { } } + suspend fun ack(id: Long): Boolean { + return db.serializable("UPDATE initiated_outgoing_transactions SET awaiting_ack=true") { + executeUpdateCheck() + } + } + /** Group unbatched transaction into a single batch */ - suspend fun batch(timestamp: Instant, ebicsId: String) { - db.serializable("SELECT FROM batch_outgoing_transactions(?, ?)") { + suspend fun batch(timestamp: Instant, ebicsId: String, requireAck: Boolean) { + db.serializable("SELECT FROM batch_outgoing_transactions(?, ?, ?)") { bind(timestamp) bind(ebicsId) + bind(requireAck) executeQuery() } } diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/ListDAO.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/ListDAO.kt @@ -112,7 +112,7 @@ class ListDAO(private val db: Database) { } /** List initiated transaction metadata for debugging */ - suspend fun initiated(): List<InitiatedTxMetadata> = db.serializable( + suspend fun initiated(awaitingAck: Boolean): List<InitiatedTxMetadata> = db.serializable( """ SELECT (amount).val AS amount_val @@ -129,6 +129,7 @@ class ListDAO(private val db: Database) { ,initiated_outgoing_transactions.status_msg FROM initiated_outgoing_transactions LEFT JOIN initiated_outgoing_batches USING (initiated_outgoing_batch_id) + ${if (awaitingAck) "WHERE initiated_outgoing_transactions.initiated_outgoing_batch_id IS NULL AND awaiting_ack = true" else ""} ORDER BY initiation_time """ ) { diff --git a/nexus/src/test/kotlin/CliTest.kt b/nexus/src/test/kotlin/CliTest.kt @@ -149,8 +149,8 @@ class CliTest { @Test fun listCheck() = setup { db, _ -> fun check() { - for (list in listOf("incoming", "outgoing", "initiated")) { - val result = nexusCmd.test("testing list $list -c conf/test.conf") + for (list in listOf("incoming", "outgoing", "initiated", "initiated --awaiting-ack")) { + val result = nexusCmd.test("list $list -c conf/test.conf") assertEquals(0, result.statusCode) } } diff --git a/nexus/src/test/kotlin/DatabaseTest.kt b/nexus/src/test/kotlin/DatabaseTest.kt @@ -136,11 +136,31 @@ class OutgoingPaymentsTest { db.initiated.create(genInitPay(randEbicsId(), subject=subject)) ) } - db.initiated.batch(Instant.now(), "BATCH") + db.initiated.batch(Instant.now(), "BATCH", false) // Register batch registerOutgoingBatch(db, OutgoingBatch("BATCH", Instant.now())); db.checkOutCount(nbIncoming = 4, nbTalerable = 2) + + // Test manual ack + val txs = List(3) { nb -> + assertIs<PaymentInitiationResult.Success>( + db.initiated.create(genInitPay(randEbicsId(), subject="tx $nb")) + ).id + } + + // Check not sent without ack + db.initiated.batch(Instant.now(), "BATCH_MANUAL", true) + registerOutgoingBatch(db, OutgoingBatch("BATCH_MANUAL", Instant.now())); + db.checkOutCount(nbIncoming = 4, nbTalerable = 2) + + // Check sent with ack + for (tx in txs) { + db.initiated.ack(tx) + } + db.initiated.batch(Instant.now(), "BATCH_MANUAL", true) + registerOutgoingBatch(db, OutgoingBatch("BATCH_MANUAL", Instant.now())); + db.checkOutCount(nbIncoming = 7, nbTalerable = 2) } } @@ -537,14 +557,14 @@ class PaymentInitiationsTest { db.initiated.create(genInitPay(id)) ) } - db.initiated.batch(Instant.now(), "BATCH") + db.initiated.batch(Instant.now(), "BATCH", false) // Create witness transactions and batch for (id in sequenceOf("WITNESS_1", "WITNESS_2")) { assertIs<PaymentInitiationResult.Success>( db.initiated.create(genInitPay(id)) ) } - db.initiated.batch(Instant.now(), "BATCH_WITNESS") + db.initiated.batch(Instant.now(), "BATCH_WITNESS", false) for (id in sequenceOf("WITNESS_3", "WITNESS_4")) { assertIs<PaymentInitiationResult.Success>( db.initiated.create(genInitPay(id)) @@ -658,7 +678,7 @@ class PaymentInitiationsTest { assertIs<PaymentInitiationResult.Success>( db.initiated.create(genInitPay("PAY$it")) ) - db.initiated.batch(Instant.now(), "BATCH$it") + db.initiated.batch(Instant.now(), "BATCH$it", false) } suspend fun checkIds(vararg ids: String) { assertEquals( diff --git a/nexus/src/test/kotlin/RegistrationTest.kt b/nexus/src/test/kotlin/RegistrationTest.kt @@ -41,7 +41,7 @@ class RegistrationTest { for (tx in txs) { initiated.create(tx) } - this.initiated.batch(Instant.now(), name) + this.initiated.batch(Instant.now(), name, false) val (batch) = this.initiated.submittable() this.initiated.batchSubmissionSuccess(batch.id, Instant.now(), name.replace("BATCH", "ORDER")) tmp.add(batch) diff --git a/nexus/src/test/kotlin/WireGatewayApiTest.kt b/nexus/src/test/kotlin/WireGatewayApiTest.kt @@ -181,7 +181,7 @@ class WireGatewayApiTest { "credit_account" to grothoffPayto } }.assertOkJson<TransferResponse>() - db.initiated.batch(Instant.now(), randEbicsId()) + db.initiated.batch(Instant.now(), randEbicsId(), false) } client.getA("/taler-wire-gateway/transfers") .assertOkJson<TransferList> {