summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAntoine A <>2024-03-12 20:48:56 +0100
committerAntoine A <>2024-03-13 21:19:30 +0100
commitcd6421f1c45e1a51415c19c1e28eed8a91c56008 (patch)
tree6ddfbfad5cd18a874e29ba9a95d0873387caf375
parentc481ebac54b747ebb56b09c1430755bc4a70f3e5 (diff)
downloadlibeufin-cd6421f1c45e1a51415c19c1e28eed8a91c56008.tar.gz
libeufin-cd6421f1c45e1a51415c19c1e28eed8a91c56008.tar.bz2
libeufin-cd6421f1c45e1a51415c19c1e28eed8a91c56008.zip
Track outgoing transactions status
-rw-r--r--common/src/main/kotlin/DB.kt6
-rw-r--r--common/src/main/kotlin/helpers.kt2
-rw-r--r--database-versioning/libeufin-nexus-0002.sql28
-rw-r--r--database-versioning/libeufin-nexus-drop.sql1
-rw-r--r--database-versioning/libeufin-nexus-procedures.sql1
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/Database.kt527
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt77
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt34
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/Iso20022.kt145
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt12
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt49
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt203
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/db/PaymentDAO.kt128
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsBTS.kt6
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt2
-rw-r--r--nexus/src/test/kotlin/Common.kt1
-rw-r--r--nexus/src/test/kotlin/DatabaseTest.kt166
-rw-r--r--testbench/src/test/kotlin/IntegrationTest.kt4
-rw-r--r--testbench/src/test/kotlin/Iso20022Test.kt4
19 files changed, 629 insertions, 767 deletions
diff --git a/common/src/main/kotlin/DB.kt b/common/src/main/kotlin/DB.kt
index 2fa25ebc..b44e1fcd 100644
--- a/common/src/main/kotlin/DB.kt
+++ b/common/src/main/kotlin/DB.kt
@@ -139,11 +139,13 @@ fun <R> PgConnection.transaction(lambda: (PgConnection) -> R): R {
fun <T> PreparedStatement.oneOrNull(lambda: (ResultSet) -> T): T? {
executeQuery().use {
- if (!it.next()) return null
- return lambda(it)
+ return if (it.next()) lambda(it) else null
}
}
+fun <T> PreparedStatement.one(lambda: (ResultSet) -> T): T =
+ requireNotNull(oneOrNull(lambda)) { "Missing result to database query" }
+
fun <T> PreparedStatement.all(lambda: (ResultSet) -> T): List<T> {
executeQuery().use {
val ret = mutableListOf<T>()
diff --git a/common/src/main/kotlin/helpers.kt b/common/src/main/kotlin/helpers.kt
index cd803f64..2eba2d16 100644
--- a/common/src/main/kotlin/helpers.kt
+++ b/common/src/main/kotlin/helpers.kt
@@ -63,7 +63,7 @@ fun ByteArray.encodeBase64(): String = Base64.getEncoder().encodeToString(this)
/* ----- InputStream ----- */
/** Unzip an input stream and run [lambda] over each entry */
-fun InputStream.unzipEach(lambda: (String, InputStream) -> Unit) {
+inline fun InputStream.unzipEach(lambda: (String, InputStream) -> Unit) {
ZipInputStream(this).use { zip ->
while (true) {
val entry = zip.getNextEntry()
diff --git a/database-versioning/libeufin-nexus-0002.sql b/database-versioning/libeufin-nexus-0002.sql
new file mode 100644
index 00000000..03958b7d
--- /dev/null
+++ b/database-versioning/libeufin-nexus-0002.sql
@@ -0,0 +1,28 @@
+--
+-- This file is part of TALER
+-- Copyright (C) 2024 Taler Systems SA
+--
+-- TALER is free software; you can redistribute it and/or modify it under the
+-- terms of the GNU General Public License as published by the Free Software
+-- Foundation; either version 3, or (at your option) any later version.
+--
+-- TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+-- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+-- A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+--
+-- You should have received a copy of the GNU General Public License along with
+-- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+
+BEGIN;
+
+SELECT _v.register_patch('libeufin-nexus-0002', NULL, NULL);
+
+SET search_path TO libeufin_nexus;
+
+-- Add order ID
+ALTER TABLE initiated_outgoing_transactions
+ ADD order_id TEXT NULL UNIQUE;
+COMMENT ON COLUMN initiated_outgoing_transactions.order_id
+ IS 'Order ID of the EBICS upload transaction, used to track EBICS order status.';
+
+COMMIT;
diff --git a/database-versioning/libeufin-nexus-drop.sql b/database-versioning/libeufin-nexus-drop.sql
index 51f13a06..77ac722a 100644
--- a/database-versioning/libeufin-nexus-drop.sql
+++ b/database-versioning/libeufin-nexus-drop.sql
@@ -1,6 +1,7 @@
BEGIN;
SELECT _v.unregister_patch('libeufin-nexus-0001');
+SELECT _v.unregister_patch('libeufin-nexus-0002');
DROP SCHEMA libeufin_nexus CASCADE;
COMMIT;
diff --git a/database-versioning/libeufin-nexus-procedures.sql b/database-versioning/libeufin-nexus-procedures.sql
index 1cd333a6..ed68ecc9 100644
--- a/database-versioning/libeufin-nexus-procedures.sql
+++ b/database-versioning/libeufin-nexus-procedures.sql
@@ -75,6 +75,7 @@ ELSE
SET
outgoing_transaction_id = out_tx_id
,submitted = 'success'
+ ,failure_message = null
WHERE request_uid = in_message_id
RETURNING true INTO out_initiated;
END IF;
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Database.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Database.kt
deleted file mode 100644
index 8a2b9185..00000000
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Database.kt
+++ /dev/null
@@ -1,527 +0,0 @@
-/*
- * This file is part of LibEuFin.
- * Copyright (C) 2024 Taler Systems S.A.
-
- * LibEuFin is free software; you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as
- * published by the Free Software Foundation; either version 3, or
- * (at your option) any later version.
-
- * LibEuFin is distributed in the hope that it will be useful, but
- * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
- * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General
- * Public License for more details.
-
- * You should have received a copy of the GNU Affero General Public
- * License along with LibEuFin; see the file COPYING. If not, see
- * <http://www.gnu.org/licenses/>
- */
-package tech.libeufin.nexus
-
-import org.postgresql.util.PSQLState
-import tech.libeufin.common.*
-import java.sql.PreparedStatement
-import java.sql.SQLException
-import java.text.SimpleDateFormat
-import java.time.Instant
-import java.util.*
-
-fun Instant.fmtDate(): String {
- val formatter = SimpleDateFormat("yyyy-MM-dd")
- return formatter.format(Date.from(this))
-}
-
-fun Instant.fmtDateTime(): String {
- val formatter = SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS")
- return formatter.format(Date.from(this))
-}
-
-// INCOMING PAYMENTS STRUCTS
-
-/**
- * Represents an incoming payment in the database.
- */
-data class IncomingPayment(
- val amount: TalerAmount,
- val wireTransferSubject: String,
- val debitPaytoUri: String,
- val executionTime: Instant,
- /** ISO20022 AccountServicerReference */
- val bankId: String
-) {
- override fun toString(): String {
- return "IN ${executionTime.fmtDate()} $amount '$bankId' debitor=$debitPaytoUri subject=$wireTransferSubject"
- }
-}
-
-
-// INITIATED PAYMENTS STRUCTS
-
-enum class DatabaseSubmissionState {
- /**
- * Submission got both EBICS_OK.
- */
- success,
- /**
- * Submission can be retried (network issue, for example)
- */
- transient_failure,
- /**
- * Submission got at least one error code which was not
- * EBICS_OK.
- */
- permanent_failure,
- /**
- * The submitted payment was never witnessed by a camt.5x
- * or pain.002 report.
- */
- never_heard_back
-}
-
-/**
- * Minimal set of information to initiate a new payment in
- * the database.
- */
-data class InitiatedPayment(
- val id: Long,
- val amount: TalerAmount,
- val wireTransferSubject: String,
- val creditPaytoUri: String,
- val initiationTime: Instant,
- val requestUid: String
-)
-
-/**
- * Possible outcomes for inserting a initiated payment
- * into the database.
- */
-enum class PaymentInitiationOutcome {
-
- /**
- * The row contains a client_request_uid that exists
- * already in the database.
- */
- UNIQUE_CONSTRAINT_VIOLATION,
- /**
- * Record successfully created.
- */
- SUCCESS
-}
-
-// OUTGOING PAYMENTS STRUCTS
-
-/**
- * Collects data of a booked outgoing payment.
- */
-data class OutgoingPayment(
- val amount: TalerAmount,
- val executionTime: Instant,
- /** ISO20022 MessageIdentification */
- val messageId: String,
- val creditPaytoUri: String? = null, // not showing in camt.054
- val wireTransferSubject: String? = null // not showing in camt.054
-) {
- override fun toString(): String {
- return "OUT ${executionTime.fmtDate()} $amount '$messageId' creditor=$creditPaytoUri subject=$wireTransferSubject"
- }
-}
-
-/** Outgoing payments registration result */
-data class OutgoingRegistrationResult(
- val id: Long,
- val initiated: Boolean,
- val new: Boolean
-)
-
-/** Incoming payments registration result */
-data class IncomingRegistrationResult(
- val id: Long,
- val new: Boolean
-)
-
-/** Incoming payments bounce registration result */
-data class IncomingBounceRegistrationResult(
- val id: Long,
- val bounceId: String,
- val new: Boolean
-)
-
-/**
- * Performs a INSERT, UPDATE, or DELETE operation.
- *
- * @return true if at least one row was affected by this operation,
- * false on unique constraint violation or no rows were affected.
- *
- */
-private fun PreparedStatement.maybeUpdate(): Boolean {
- try {
- this.executeUpdate()
- } catch (e: SQLException) {
- logger.error(e.message)
- if (e.sqlState == PSQLState.UNIQUE_VIOLATION.state) return false
- throw e // rethrowing, not to hide other types of errors.
- }
- return updateCount > 0
-}
-
-/**
- * Collects database connection steps and any operation on the Nexus tables.
- */
-class Database(dbConfig: String): DbPool(dbConfig, "libeufin_nexus") {
-
- // Temporary in memory database to store EBICS order status until we modify the schema to actually store it in the database
- var mem: MutableMap<String, String> = mutableMapOf()
-
- // OUTGOING PAYMENTS METHODS
-
- /**
- * Register an outgoing payment OPTIONALLY reconciling it with its
- * initiated payment counterpart.
- *
- * @param paymentData information about the outgoing payment.
- * @return operation outcome enum.
- */
- suspend fun registerOutgoing(paymentData: OutgoingPayment): OutgoingRegistrationResult = conn {
- val stmt = it.prepareStatement("""
- SELECT out_tx_id, out_initiated, out_found
- FROM register_outgoing(
- (?,?)::taler_amount
- ,?
- ,?
- ,?
- ,?
- )"""
- )
- val executionTime = paymentData.executionTime.toDbMicros()
- ?: throw Exception("Could not convert outgoing payment execution_time to microseconds")
- stmt.setLong(1, paymentData.amount.value)
- stmt.setInt(2, paymentData.amount.frac)
- stmt.setString(3, paymentData.wireTransferSubject)
- stmt.setLong(4, executionTime)
- stmt.setString(5, paymentData.creditPaytoUri)
- stmt.setString(6, paymentData.messageId)
-
- stmt.executeQuery().use {
- when {
- !it.next() -> throw Exception("Inserting outgoing payment gave no outcome.")
- else -> OutgoingRegistrationResult(
- it.getLong("out_tx_id"),
- it.getBoolean("out_initiated"),
- !it.getBoolean("out_found")
- )
- }
- }
- }
-
- // INCOMING PAYMENTS METHODS
-
- /**
- * Register an incoming payment and bounce it
- *
- * @param paymentData information about the incoming payment
- * @param requestUid unique identifier of the bounce outgoing payment to
- * initiate
- * @param bounceAmount amount to send back to the original debtor
- * @param bounceSubject subject of the bounce outhoing payment
- * @return true if new
- */
- suspend fun registerMalformedIncoming(
- paymentData: IncomingPayment,
- bounceAmount: TalerAmount,
- now: Instant
- ): IncomingBounceRegistrationResult = conn {
- val stmt = it.prepareStatement("""
- SELECT out_found, out_tx_id, out_bounce_id
- FROM register_incoming_and_bounce(
- (?,?)::taler_amount
- ,?
- ,?
- ,?
- ,?
- ,(?,?)::taler_amount
- ,?
- )"""
- )
- val refundTimestamp = now.toDbMicros()
- ?: throw Exception("Could not convert refund execution time from Instant.now() to microsends.")
- val executionTime = paymentData.executionTime.toDbMicros()
- ?: throw Exception("Could not convert payment execution time from Instant to microseconds.")
- stmt.setLong(1, paymentData.amount.value)
- stmt.setInt(2, paymentData.amount.frac)
- stmt.setString(3, paymentData.wireTransferSubject)
- stmt.setLong(4, executionTime)
- stmt.setString(5, paymentData.debitPaytoUri)
- stmt.setString(6, paymentData.bankId)
- stmt.setLong(7, bounceAmount.value)
- stmt.setInt(8, bounceAmount.frac)
- stmt.setLong(9, refundTimestamp)
- stmt.executeQuery().use {
- when {
- !it.next() -> throw Exception("Inserting malformed incoming payment gave no outcome")
- else -> IncomingBounceRegistrationResult(
- it.getLong("out_tx_id"),
- it.getString("out_bounce_id"),
- !it.getBoolean("out_found")
- )
- }
- }
- }
-
- /**
- * Register an talerable incoming payment
- *
- * @param paymentData incoming talerable payment.
- * @param reservePub reserve public key. The caller is
- * responsible to check it.
- */
- suspend fun registerTalerableIncoming(
- paymentData: IncomingPayment,
- reservePub: EddsaPublicKey
- ): IncomingRegistrationResult = conn { conn ->
- val stmt = conn.prepareStatement("""
- SELECT out_found, out_tx_id
- FROM register_incoming_and_talerable(
- (?,?)::taler_amount
- ,?
- ,?
- ,?
- ,?
- ,?
- )"""
- )
- val executionTime = paymentData.executionTime.toDbMicros()
- ?: throw Exception("Could not convert payment execution time from Instant to microseconds.")
- stmt.setLong(1, paymentData.amount.value)
- stmt.setInt(2, paymentData.amount.frac)
- stmt.setString(3, paymentData.wireTransferSubject)
- stmt.setLong(4, executionTime)
- stmt.setString(5, paymentData.debitPaytoUri)
- stmt.setString(6, paymentData.bankId)
- stmt.setBytes(7, reservePub.raw)
- stmt.executeQuery().use {
- when {
- !it.next() -> throw Exception("Inserting talerable incoming payment gave no outcome")
- else -> IncomingRegistrationResult(
- it.getLong("out_tx_id"),
- !it.getBoolean("out_found")
- )
- }
- }
- }
-
- /**
- * Get the last execution time of outgoing transactions.
- *
- * @return [Instant] or null if no results were found
- */
- suspend fun outgoingPaymentLastExecTime(): Instant? = conn { conn ->
- val stmt = conn.prepareStatement(
- "SELECT MAX(execution_time) as latest_execution_time FROM outgoing_transactions"
- )
- stmt.executeQuery().use {
- if (!it.next()) return@conn null
- val timestamp = it.getLong("latest_execution_time")
- if (timestamp == 0L) return@conn null
- return@conn timestamp.microsToJavaInstant()
- ?: throw Exception("Could not convert latest_execution_time to Instant")
- }
- }
-
- /**
- * Get the last execution time of an incoming transaction.
- *
- * @return [Instant] or null if no results were found
- */
- suspend fun incomingPaymentLastExecTime(): Instant? = conn { conn ->
- val stmt = conn.prepareStatement(
- "SELECT MAX(execution_time) as latest_execution_time FROM incoming_transactions"
- )
- stmt.executeQuery().use {
- if (!it.next()) return@conn null
- val timestamp = it.getLong("latest_execution_time")
- if (timestamp == 0L) return@conn null
- return@conn timestamp.microsToJavaInstant()
- ?: throw Exception("Could not convert latest_execution_time to Instant")
- }
- }
-
- /**
- * Checks if the reserve public key already exists.
- *
- * @param maybeReservePub reserve public key to look up
- * @return true if found, false otherwise
- */
- suspend fun isReservePubFound(maybeReservePub: EddsaPublicKey): Boolean = conn { conn ->
- val stmt = conn.prepareStatement("""
- SELECT 1
- FROM talerable_incoming_transactions
- WHERE reserve_public_key = ?;
- """)
- stmt.setBytes(1, maybeReservePub.raw)
- val res = stmt.executeQuery()
- res.use {
- return@conn it.next()
- }
- }
-
- // INITIATED PAYMENTS METHODS
-
- /**
- * Represents all the states but "unsubmitted" related to an
- * initiated payment. Unsubmitted gets set by default by the
- * database and there's no case where it has to be reset to an
- * initiated payment.
- */
-
- /**
- * Sets the submission state of an initiated payment. Transparently
- * sets the last_submission_time column too, as this corresponds to the
- * time when we set the state.
- *
- * @param rowId row ID of the record to set.
- * @param submissionState which state to set.
- * @return true on success, false if no payment was affected.
- */
- suspend fun initiatedPaymentSetSubmittedState(
- rowId: Long,
- submissionState: DatabaseSubmissionState
- ): Boolean = conn { conn ->
- val stmt = conn.prepareStatement("""
- UPDATE initiated_outgoing_transactions
- SET submitted = submission_state(?), last_submission_time = ?
- WHERE initiated_outgoing_transaction_id = ?
- """
- )
- val now = Instant.now()
- stmt.setString(1, submissionState.name)
- stmt.setLong(2, now.toDbMicros() ?: run {
- throw Exception("Submission time could not be converted to microseconds for the database.")
- })
- stmt.setLong(3, rowId)
- return@conn stmt.maybeUpdate()
- }
-
- /**
- * Sets the failure reason to an initiated payment.
- *
- * @param rowId row ID of the record to set.
- * @param failureMessage error associated to this initiated payment.
- * @return true on success, false if no payment was affected.
- */
- suspend fun initiatedPaymentSetFailureMessage(rowId: Long, failureMessage: String): Boolean = conn { conn ->
- val stmt = conn.prepareStatement("""
- UPDATE initiated_outgoing_transactions
- SET failure_message = ?
- WHERE initiated_outgoing_transaction_id=?
- """
- )
- stmt.setString(1, failureMessage)
- stmt.setLong(2, rowId)
- return@conn stmt.maybeUpdate()
- }
-
- /**
- * Gets any initiated payment that was not submitted to the
- * bank yet.
- *
- * @param currency in which currency should the payment be submitted to the bank.
- * @return [Map] of the initiated payment row ID and [InitiatedPayment]
- */
- suspend fun initiatedPaymentsSubmittableGet(currency: String): List<InitiatedPayment> = conn { conn ->
- val stmt = conn.prepareStatement("""
- SELECT
- initiated_outgoing_transaction_id
- ,(amount).val as amount_val
- ,(amount).frac as amount_frac
- ,wire_transfer_subject
- ,credit_payto_uri
- ,initiation_time
- ,request_uid
- FROM initiated_outgoing_transactions
- WHERE (submitted='unsubmitted' OR submitted='transient_failure')
- AND ((amount).val != 0 OR (amount).frac != 0);
- """)
- stmt.all {
- val rowId = it.getLong("initiated_outgoing_transaction_id")
- val initiationTime = it.getLong("initiation_time").microsToJavaInstant()
- if (initiationTime == null) { // nexus fault
- throw Exception("Found invalid timestamp at initiated payment with ID: $rowId")
- }
- InitiatedPayment(
- id = it.getLong("initiated_outgoing_transaction_id"),
- amount = it.getAmount("amount", currency),
- creditPaytoUri = it.getString("credit_payto_uri"),
- wireTransferSubject = it.getString("wire_transfer_subject"),
- initiationTime = initiationTime,
- requestUid = it.getString("request_uid")
- )
- }
- }
- /**
- * Initiate a payment in the database. The "submit"
- * command is then responsible to pick it up and submit
- * it to the bank.
- *
- * @param paymentData any data that's used to prepare the payment.
- * @return true if the insertion went through, false in case of errors.
- */
- suspend fun initiatedPaymentCreate(paymentData: InitiatedPayment): PaymentInitiationOutcome = conn { conn ->
- val stmt = conn.prepareStatement("""
- INSERT INTO initiated_outgoing_transactions (
- amount
- ,wire_transfer_subject
- ,credit_payto_uri
- ,initiation_time
- ,request_uid
- ) VALUES (
- (?,?)::taler_amount
- ,?
- ,?
- ,?
- ,?
- )
- """)
- stmt.setLong(1, paymentData.amount.value)
- stmt.setInt(2, paymentData.amount.frac)
- stmt.setString(3, paymentData.wireTransferSubject)
- stmt.setString(4, paymentData.creditPaytoUri.toString())
- val initiationTime = paymentData.initiationTime.toDbMicros() ?: run {
- throw Exception("Initiation time could not be converted to microseconds for the database.")
- }
- stmt.setLong(5, initiationTime)
- stmt.setString(6, paymentData.requestUid) // can be null.
- if (stmt.maybeUpdate())
- return@conn PaymentInitiationOutcome.SUCCESS
- /**
- * _very_ likely, Nexus didn't check the request idempotency,
- * as the row ID would never fall into the following problem.
- */
- return@conn PaymentInitiationOutcome.UNIQUE_CONSTRAINT_VIOLATION
- }
-
- /**
- * Gets the ID of an initiated payment. Useful to link it to its
- * outgoing payment witnessed in a bank record.
- *
- * @param uid UID as given by Nexus when it initiated the payment.
- * This value then gets specified as the MsgId of pain.001,
- * and it gets associated by the bank to the booked entries
- * in camt.05x reports.
- * @return the initiated payment row ID, or null if not found. NOTE:
- * null gets returned even when the initiated payment exists,
- * *but* it was NOT flagged as submitted.
- */
- suspend fun initiatedPaymentGetFromUid(uid: String): Long? = conn { conn ->
- val stmt = conn.prepareStatement("""
- SELECT initiated_outgoing_transaction_id
- FROM initiated_outgoing_transactions
- WHERE request_uid = ? AND submitted = 'success';
- """)
- stmt.setString(1, uid)
- val res = stmt.executeQuery()
- res.use {
- if (!it.next()) return@conn null
- return@conn it.getLong("initiated_outgoing_transaction_id")
- }
- }
-} \ No newline at end of file
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt
index ca7b9116..765037e4 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt
@@ -28,6 +28,7 @@ import io.ktor.client.plugins.*
import kotlinx.coroutines.*
import tech.libeufin.common.*
import tech.libeufin.nexus.ebics.*
+import tech.libeufin.nexus.db.*
import java.io.IOException
import java.io.InputStream
import java.time.Instant
@@ -91,7 +92,7 @@ suspend fun ingestOutgoingPayment(
db: Database,
payment: OutgoingPayment
) {
- val result = db.registerOutgoing(payment)
+ val result = db.payment.registerOutgoing(payment)
if (result.new) {
if (result.initiated)
logger.info("$payment")
@@ -117,7 +118,7 @@ suspend fun ingestIncomingPayment(
) {
runCatching { parseIncomingTxMetadata(payment.wireTransferSubject) }.fold(
onSuccess = { reservePub ->
- val result = db.registerTalerableIncoming(payment, reservePub)
+ val result = db.payment.registerTalerableIncoming(payment, reservePub)
if (result.new) {
logger.info("$payment")
} else {
@@ -125,7 +126,7 @@ suspend fun ingestIncomingPayment(
}
},
onFailure = { e ->
- val result = db.registerMalformedIncoming(
+ val result = db.payment.registerMalformedIncoming(
payment,
payment.amount,
Instant.now()
@@ -139,21 +140,7 @@ suspend fun ingestIncomingPayment(
)
}
-/**
- * Ingests an outgoing payment bounce.
- *
- * @param db database handle.
- * @param reversal reversal ingest.
- */
-suspend fun ingestReversal(
- db: Database,
- reversal: OutgoingReversal
-) {
- logger.warn("BOUNCE '${reversal.bankId}': ${reversal.reason}")
- // TODO store in db=
-}
-
-private fun ingestDocument(
+private suspend fun ingestDocument(
db: Database,
currency: String,
xml: InputStream,
@@ -162,15 +149,13 @@ private fun ingestDocument(
when (whichDocument) {
SupportedDocument.CAMT_054 -> {
try {
- val notifications = mutableListOf<TxNotification>()
- parseTxNotif(xml, currency, notifications)
-
- runBlocking {
- notifications.forEach {
- when (it) {
- is TxNotification.Incoming -> ingestIncomingPayment(db, it.payment)
- is TxNotification.Outgoing -> ingestOutgoingPayment(db, it.payment)
- is TxNotification.Reversal -> ingestReversal(db, it.reversal)
+ parseTxNotif(xml, currency).forEach {
+ when (it) {
+ is TxNotification.Incoming -> ingestIncomingPayment(db, it.payment)
+ is TxNotification.Outgoing -> ingestOutgoingPayment(db, it.payment)
+ is TxNotification.Reversal -> {
+ logger.warn("BOUNCE '${it.msgId}': ${it.reason}")
+ db.initiated.reversal(it.msgId, "Payment bounced: ${it.reason}")
}
}
}
@@ -181,42 +166,38 @@ private fun ingestDocument(
SupportedDocument.PAIN_002_LOGS -> {
val acks = parseCustomerAck(xml)
for (ack in acks) {
- val msg = if (ack.orderId != null) {
- if (ack.code != null) {
- val msg = ack.msg()
- db.mem[ack.orderId] = msg
- msg
- } else {
- db.mem[ack.orderId]
- }
- } else {
- null
- }
when (ack.actionType) {
- HacAction.FILE_DOWNLOAD -> logger.debug("$ack")
HacAction.ORDER_HAC_FINAL_POS -> {
- // TODO update pending transaction status
logger.debug("$ack")
- logger.info("Order '${ack.orderId}' was accepted at ${ack.timestamp.fmtDateTime()}")
+ db.initiated.logSuccess(ack.orderId!!)?.let { requestUID ->
+ logger.info("Payment '$requestUID' accepted at ${ack.timestamp.fmtDateTime()}")
+ }
}
HacAction.ORDER_HAC_FINAL_NEG -> {
- // TODO update pending transaction status
logger.debug("$ack")
- logger.warn("Order '${ack.orderId}' was refused at ${ack.timestamp.fmtDateTime()}: $msg")
+ db.initiated.logFailure(ack.orderId!!)?.let { (requestUID, msg) ->
+ logger.warn("Payment '$requestUID' refused at ${ack.timestamp.fmtDateTime()}${if (msg != null) ": $msg" else ""}")
+ }
}
else -> {
- // TODO update pending transaction status
logger.debug("$ack")
+ if (ack.orderId != null) {
+ db.initiated.logMessage(ack.orderId, ack.msg())
+ }
}
}
}
}
SupportedDocument.PAIN_002 -> {
val status = parseCustomerPaymentStatusReport(xml)
- if (status.paymentCode == ExternalPaymentGroupStatusCode.RJCT)
- logger.warn("Transaction '${status.id()}' was rejected")
- // TODO update pending transaction status
+ val msg = status.msg()
logger.debug("$status")
+ if (status.paymentCode == ExternalPaymentGroupStatusCode.RJCT) {
+ db.initiated.bankFailure(status.msgId, msg)
+ logger.warn("Transaction '${status.msgId}' was rejected : $msg")
+ } else {
+ db.initiated.bankMessage(status.msgId, msg)
+ }
}
SupportedDocument.CAMT_053,
SupportedDocument.CAMT_052 -> {
@@ -226,7 +207,7 @@ private fun ingestDocument(
}
}
-private fun ingestDocuments(
+private suspend fun ingestDocuments(
db: Database,
currency: String,
content: InputStream,
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt
index 82c1a459..656ce694 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt
@@ -26,6 +26,7 @@ import io.ktor.client.*
import kotlinx.coroutines.*
import tech.libeufin.common.*
import tech.libeufin.nexus.ebics.*
+import tech.libeufin.nexus.db.*
import java.time.*
import java.util.*
@@ -109,19 +110,19 @@ private suspend fun submitBatch(
ctx: SubmissionContext,
db: Database,
) {
- logger.debug("Running submit at: ${Instant.now()}")
- db.initiatedPaymentsSubmittableGet(ctx.cfg.currency).forEach {
- logger.debug("Submitting payment initiation with row ID: ${it.id}")
- val submissionState = try {
- val orderId = submitInitiatedPayment(ctx, it)
- db.mem[orderId] = "Init"
- DatabaseSubmissionState.success
- } catch (e: Exception) {
- e.fmtLog(logger)
- DatabaseSubmissionState.transient_failure
- // TODO
- }
- db.initiatedPaymentSetSubmittedState(it.id, submissionState)
+ db.initiated.submittableGet(ctx.cfg.currency).forEach {
+ logger.debug("Submitting payment '${it.requestUid}'")
+ runCatching { submitInitiatedPayment(ctx, it) }.fold(
+ onSuccess = { orderId ->
+ db.initiated.submissionSuccess(it.id, Instant.now(), orderId)
+ logger.info("Payment '${it.requestUid}' submitted")
+ },
+ onFailure = { e ->
+ db.initiated.submissionFailure(it.id, Instant.now(), e.message)
+ logger.warn("Payment '${it.requestUid}' submission failure: ${e.message}")
+ throw e
+ }
+ )
}
}
@@ -171,8 +172,11 @@ class EbicsSubmit : CliktCommand("Submits any initiated payment found in the dat
}
}
do {
- // TODO error handling
- submitBatch(ctx, db)
+ try {
+ submitBatch(ctx, db)
+ } catch (e: Exception) {
+ throw Exception("Failed to submit payments", e)
+ }
// TODO take submitBatch taken time in the delay
delay(((frequency?.inSeconds ?: 0) * 1000).toLong())
} while (frequency != null)
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Iso20022.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Iso20022.kt
index a9f44077..6269377a 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Iso20022.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Iso20022.kt
@@ -143,11 +143,7 @@ data class CustomerAck(
}
}
-/**
- * Extract logs from a pain.002 HAC document.
- *
- * @param xml pain.002 input document
- */
+/** Parse HAC pain.002 XML file */
fun parseCustomerAck(xml: InputStream): List<CustomerAck> {
return destructXml(xml, "Document") {
one("CstmrPmtStsRpt").map("OrgnlPmtInfAndSts") {
@@ -192,19 +188,21 @@ data class PaymentStatus(
fun description(): String = txCode?.description ?: paymentCode.description
- override fun toString(): String {
+ fun msg(): String {
return if (reasons.isEmpty()) {
- "'${id()}' ${code()} '${description()}'"
+ "${code()} '${description()}'"
} else if (reasons.size == 1) {
- "'${id()}' ${code()} ${reasons[0].code.isoCode} - '${description()}' '${reasons[0].code.description}'"
+ "${code()} ${reasons[0].code.isoCode} - '${description()}' '${reasons[0].code.description}'"
} else {
- var str = "'${id()}' ${code()} '${description()}' - "
+ var str = "${code()} '${description()}' - "
for (reason in reasons) {
- str += "${reason.code.isoCode} '${reason.code.description}'"
+ str += "${reason.code.isoCode} '${reason.code.description}' "
}
str
}
}
+
+ override fun toString(): String = "${id()} ${msg()}"
}
data class Reason (
@@ -212,11 +210,7 @@ data class Reason (
val information: String
)
-/**
- * Extract payment status from a pain.002 document.
- *
- * @param xml pain.002 input document
- */
+/** Parse pain.002 XML file */
fun parseCustomerPaymentStatusReport(xml: InputStream): PaymentStatus {
fun XmlDestructor.reasons(): List<Reason> {
return map("StsRsnInf") {
@@ -252,28 +246,74 @@ fun parseCustomerPaymentStatusReport(xml: InputStream): PaymentStatus {
sealed interface TxNotification {
data class Incoming(val payment: IncomingPayment): TxNotification
data class Outgoing(val payment: OutgoingPayment): TxNotification
- data class Reversal(val reversal: OutgoingReversal): TxNotification
+ data class Reversal(
+ val msgId: String,
+ val reason: String?
+ ): TxNotification
}
-data class OutgoingReversal(
- val bankId: String,
- val reason: String?
-)
+/** ISO20022 incoming payment */
+data class IncomingPayment(
+ val amount: TalerAmount,
+ val wireTransferSubject: String,
+ val debitPaytoUri: String,
+ val executionTime: Instant,
+ /** ISO20022 AccountServicerReference */
+ val bankId: String
+) {
+ override fun toString(): String {
+ return "IN ${executionTime.fmtDate()} $amount '$bankId' debitor=$debitPaytoUri subject=$wireTransferSubject"
+ }
+}
-/**
- * Searches payments in a camt.054 (Detailavisierung) document.
- *
- * @param notifXml camt.054 input document
- * @param acceptedCurrency currency accepted by Nexus
- * @param incoming list of incoming payments
- * @param outgoing list of outgoing payments
- */
+/** ISO20022 outgoing payment */
+data class OutgoingPayment(
+ val amount: TalerAmount,
+ val executionTime: Instant,
+ /** ISO20022 MessageIdentification */
+ val messageId: String,
+ val creditPaytoUri: String? = null, // not showing in camt.054
+ val wireTransferSubject: String? = null // not showing in camt.054
+) {
+ override fun toString(): String {
+ return "OUT ${executionTime.fmtDate()} $amount '$messageId' creditor=$creditPaytoUri subject=$wireTransferSubject"
+ }
+}
+
+/** Parse camt.054 XML file */
fun parseTxNotif(
notifXml: InputStream,
- acceptedCurrency: String,
- notifications: MutableList<TxNotification>,
-) {
- notificationForEachTx(notifXml) { bookDate, reversal, info ->
+ acceptedCurrency: String
+): List<TxNotification> {
+ fun notificationForEachTx(
+ directionLambda: XmlDestructor.(Instant, Boolean, String?) -> Unit
+ ) {
+ destructXml(notifXml, "Document") {
+ opt("BkToCstmrDbtCdtNtfctn")?.each("Ntfctn") {
+ each("Ntry") {
+ val reversal = opt("RvslInd")?.bool() ?: false
+ val info = opt("AddtlNtryInf")?.text()
+ one("Sts") {
+ if (text() != "BOOK") {
+ one("Cd") {
+ if (text() != "BOOK")
+ throw Exception("Found non booked transaction, " +
+ "stop parsing. Status was: ${text()}"
+ )
+ }
+ }
+ }
+ val bookDate: Instant = one("BookgDt").one("Dt").date().atStartOfDay().toInstant(ZoneOffset.UTC)
+ one("NtryDtls").each("TxDtls") {
+ directionLambda(this, bookDate, reversal, info)
+ }
+ }
+ }
+ }
+ }
+
+ val notifications = mutableListOf<TxNotification>()
+ notificationForEachTx { bookDate, reversal, info ->
val kind = one("CdtDbtInd").text()
val amount: TalerAmount = one("Amt") {
val currency = attr("Ccy")
@@ -289,10 +329,10 @@ fun parseTxNotif(
if (msgId == null) {
logger.debug("Unsupported reversal without message id")
} else {
- notifications.add(TxNotification.Reversal(OutgoingReversal(
- bankId = msgId,
+ notifications.add(TxNotification.Reversal(
+ msgId = msgId,
reason = info
- )))
+ ))
}
return@notificationForEachTx
}
@@ -341,38 +381,5 @@ fun parseTxNotif(
else -> throw Exception("Unknown transaction notification kind '$kind'")
}
}
-}
-
-/**
- * Navigates the camt.054 (Detailavisierung) until its leaves, where
- * then it invokes the related parser, according to the payment direction.
- *
- * @param xml the input document.
- */
-private fun notificationForEachTx(
- xml: InputStream,
- directionLambda: XmlDestructor.(Instant, Boolean, String?) -> Unit
-) {
- destructXml(xml, "Document") {
- opt("BkToCstmrDbtCdtNtfctn")?.each("Ntfctn") {
- each("Ntry") {
- val reversal = opt("RvslInd")?.bool() ?: false
- val info = opt("AddtlNtryInf")?.text()
- one("Sts") {
- if (text() != "BOOK") {
- one("Cd") {
- if (text() != "BOOK")
- throw Exception("Found non booked transaction, " +
- "stop parsing. Status was: ${text()}"
- )
- }
- }
- }
- val bookDate: Instant = one("BookgDt").one("Dt").date().atStartOfDay().toInstant(ZoneOffset.UTC)
- one("NtryDtls").each("TxDtls") {
- directionLambda(this, bookDate, reversal, info)
- }
- }
- }
- }
+ return notifications
} \ No newline at end of file
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
index 14c0f27c..8a51c766 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt
@@ -37,8 +37,11 @@ import org.slf4j.Logger
import org.slf4j.LoggerFactory
import tech.libeufin.common.*
import tech.libeufin.nexus.ebics.*
+import tech.libeufin.nexus.db.*
import java.nio.file.Path
-import java.time.Instant
+import java.util.*
+import java.time.*
+import java.time.format.*
val NEXUS_CONFIG_SOURCE = ConfigSource("libeufin", "libeufin-nexus", "libeufin-nexus")
internal val logger: Logger = LoggerFactory.getLogger("libeufin-nexus")
@@ -126,6 +129,11 @@ fun checkFrequency(foundInConfig: String): Int {
return frequencySeconds
}
+fun Instant.fmtDate(): String =
+ DateTimeFormatter.ISO_LOCAL_DATE.withZone(ZoneId.of("UTC")).format(this)
+
+fun Instant.fmtDateTime(): String =
+ DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.of("UTC")).format(this)
/**
* Keeps all the options of the ebics-setup subcommand. The
@@ -251,7 +259,7 @@ class InitiatePayment: CliktCommand("Initiate an outgoing payment") {
}
Database(dbCfg.dbConnStr).use { db ->
- db.initiatedPaymentCreate(
+ db.initiated.create(
InitiatedPayment(
id = -1,
amount = amount,
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt
new file mode 100644
index 00000000..2827b5f3
--- /dev/null
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt
@@ -0,0 +1,49 @@
+/*
+ * This file is part of LibEuFin.
+ * Copyright (C) 2024 Taler Systems S.A.
+
+ * LibEuFin is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation; either version 3, or
+ * (at your option) any later version.
+
+ * LibEuFin is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General
+ * Public License for more details.
+
+ * You should have received a copy of the GNU Affero General Public
+ * License along with LibEuFin; see the file COPYING. If not, see
+ * <http://www.gnu.org/licenses/>
+ */
+package tech.libeufin.nexus.db
+
+import org.postgresql.util.PSQLState
+import tech.libeufin.common.*
+import tech.libeufin.nexus.*
+import java.sql.PreparedStatement
+import java.sql.SQLException
+import java.text.SimpleDateFormat
+import java.time.Instant
+import java.util.*
+
+/**
+ * Minimal set of information to initiate a new payment in
+ * the database.
+ */
+data class InitiatedPayment(
+ val id: Long,
+ val amount: TalerAmount,
+ val wireTransferSubject: String,
+ val creditPaytoUri: String,
+ val initiationTime: Instant,
+ val requestUid: String
+)
+
+/**
+ * Collects database connection steps and any operation on the Nexus tables.
+ */
+class Database(dbConfig: String): DbPool(dbConfig, "libeufin_nexus") {
+ val payment = PaymentDAO(this)
+ val initiated = InitiatedDAO(this)
+} \ No newline at end of file
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt
new file mode 100644
index 00000000..efe98f82
--- /dev/null
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt
@@ -0,0 +1,203 @@
+/*
+ * This file is part of LibEuFin.
+ * Copyright (C) 2024 Taler Systems S.A.
+
+ * LibEuFin is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation; either version 3, or
+ * (at your option) any later version.
+
+ * LibEuFin is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General
+ * Public License for more details.
+
+ * You should have received a copy of the GNU Affero General Public
+ * License along with LibEuFin; see the file COPYING. If not, see
+ * <http://www.gnu.org/licenses/>
+ */
+
+package tech.libeufin.nexus.db
+
+import tech.libeufin.nexus.*
+import tech.libeufin.common.*
+import java.time.Instant
+
+/** Data access logic for initiated outgoing payments */
+class InitiatedDAO(private val db: Database) {
+
+ /** Outgoing payments initiation result */
+ enum class PaymentInitiationResult {
+ REQUEST_UID_REUSE,
+ SUCCESS
+ }
+
+ /** Register a new pending payment in the database */
+ suspend fun create(paymentData: InitiatedPayment): PaymentInitiationResult = db.conn { conn ->
+ val stmt = conn.prepareStatement("""
+ INSERT INTO initiated_outgoing_transactions (
+ amount
+ ,wire_transfer_subject
+ ,credit_payto_uri
+ ,initiation_time
+ ,request_uid
+ ) VALUES ((?,?)::taler_amount,?,?,?,?)
+ """)
+ stmt.setLong(1, paymentData.amount.value)
+ stmt.setInt(2, paymentData.amount.frac)
+ stmt.setString(3, paymentData.wireTransferSubject)
+ stmt.setString(4, paymentData.creditPaytoUri.toString())
+ val initiationTime = paymentData.initiationTime.toDbMicros() ?: run {
+ throw Exception("Initiation time could not be converted to microseconds for the database.")
+ }
+ stmt.setLong(5, initiationTime)
+ stmt.setString(6, paymentData.requestUid)
+ if (stmt.executeUpdateViolation())
+ return@conn PaymentInitiationResult.SUCCESS
+ return@conn PaymentInitiationResult.REQUEST_UID_REUSE
+ }
+
+ /** Register EBICS submission success */
+ suspend fun submissionSuccess(
+ id: Long,
+ now: Instant,
+ orderId: String
+ ) = db.conn { conn ->
+ val stmt = conn.prepareStatement("""
+ UPDATE initiated_outgoing_transactions SET
+ submitted = 'success'::submission_state
+ ,last_submission_time = ?
+ ,failure_message = NULL
+ ,order_id = ?
+ ,submission_counter = submission_counter + 1
+ WHERE initiated_outgoing_transaction_id = ?
+ """)
+ stmt.setLong(1, now.toDbMicros()!!)
+ stmt.setString(2, orderId)
+ stmt.setLong(3, id)
+ stmt.execute()
+ }
+
+ /** Register EBICS submission failure */
+ suspend fun submissionFailure(
+ id: Long,
+ now: Instant,
+ msg: String?
+ ) = db.conn { conn ->
+ val stmt = conn.prepareStatement("""
+ UPDATE initiated_outgoing_transactions SET
+ submitted = 'transient_failure'::submission_state
+ ,last_submission_time = ?
+ ,failure_message = ?
+ ,submission_counter = submission_counter + 1
+ WHERE initiated_outgoing_transaction_id = ?
+ """)
+ stmt.setLong(1, now.toDbMicros()!!)
+ stmt.setString(2, msg)
+ stmt.setLong(3, id)
+ stmt.execute()
+ }
+
+ /** Register EBICS log status message */
+ suspend fun logMessage(orderId: String, msg: String) = db.conn { conn ->
+ val stmt = conn.prepareStatement("""
+ UPDATE initiated_outgoing_transactions SET failure_message = ?
+ WHERE order_id = ?
+ """)
+ stmt.setString(1, msg)
+ stmt.setString(2, orderId)
+ stmt.execute()
+ }
+
+ /** Register EBICS log success and return request_uid if found */
+ suspend fun logSuccess(orderId: String): String? = db.conn { conn ->
+ val stmt = conn.prepareStatement("""
+ SELECT request_uid FROM initiated_outgoing_transactions
+ WHERE order_id = ?
+ """)
+ stmt.setString(1, orderId)
+ stmt.oneOrNull { it.getString(1) }
+ }
+
+ /** Register EBICS log failure and return request_uid and previous message if found */
+ suspend fun logFailure(orderId: String): Pair<String, String?>? = db.conn { conn ->
+ val stmt = conn.prepareStatement("""
+ UPDATE initiated_outgoing_transactions
+ SET submitted = 'permanent_failure'::submission_state
+ WHERE order_id = ?
+ RETURNING request_uid, failure_message
+ """)
+ stmt.setString(1, orderId)
+ stmt.oneOrNull { Pair(it.getString(1), it.getString(2)) }
+ }
+
+ /** Register bank status message */
+ suspend fun bankMessage(requestUID: String, msg: String) = db.conn { conn ->
+ val stmt = conn.prepareStatement("""
+ UPDATE initiated_outgoing_transactions
+ SET failure_message = ?
+ WHERE request_uid = ?
+ """)
+ stmt.setString(1, msg)
+ stmt.setString(2, requestUID)
+ stmt.execute()
+ }
+
+ /** Register bank failure */
+ suspend fun bankFailure(requestUID: String, msg: String) = db.conn { conn ->
+ val stmt = conn.prepareStatement("""
+ UPDATE initiated_outgoing_transactions SET
+ submitted = 'permanent_failure'::submission_state
+ ,failure_message = ?
+ WHERE request_uid = ?
+ """)
+ stmt.setString(1, msg)
+ stmt.setString(2, requestUID)
+ stmt.execute()
+ }
+
+ /** Register reversal */
+ suspend fun reversal(requestUID: String, msg: String) = db.conn { conn ->
+ val stmt = conn.prepareStatement("""
+ UPDATE initiated_outgoing_transactions SET
+ submitted = 'permanent_failure'::submission_state
+ ,failure_message = ?
+ WHERE request_uid = ?
+ """)
+ stmt.setString(1, msg)
+ stmt.setString(2, requestUID)
+ stmt.execute()
+ }
+
+ // TODO WIP
+ suspend fun submittableGet(currency: String): List<InitiatedPayment> = db.conn { conn ->
+ val stmt = conn.prepareStatement("""
+ SELECT
+ initiated_outgoing_transaction_id
+ ,(amount).val as amount_val
+ ,(amount).frac as amount_frac
+ ,wire_transfer_subject
+ ,credit_payto_uri
+ ,initiation_time
+ ,request_uid
+ FROM initiated_outgoing_transactions
+ WHERE (submitted='unsubmitted' OR submitted='transient_failure')
+ AND ((amount).val != 0 OR (amount).frac != 0);
+ """)
+ stmt.all {
+ val rowId = it.getLong("initiated_outgoing_transaction_id")
+ val initiationTime = it.getLong("initiation_time").microsToJavaInstant()
+ if (initiationTime == null) { // nexus fault
+ throw Exception("Found invalid timestamp at initiated payment with ID: $rowId")
+ }
+ InitiatedPayment(
+ id = it.getLong("initiated_outgoing_transaction_id"),
+ amount = it.getAmount("amount", currency),
+ creditPaytoUri = it.getString("credit_payto_uri"),
+ wireTransferSubject = it.getString("wire_transfer_subject"),
+ initiationTime = initiationTime,
+ requestUid = it.getString("request_uid")
+ )
+ }
+ }
+} \ No newline at end of file
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/PaymentDAO.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/PaymentDAO.kt
new file mode 100644
index 00000000..2e315f38
--- /dev/null
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/db/PaymentDAO.kt
@@ -0,0 +1,128 @@
+/*
+ * This file is part of LibEuFin.
+ * Copyright (C) 2024 Taler Systems S.A.
+
+ * LibEuFin is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as
+ * published by the Free Software Foundation; either version 3, or
+ * (at your option) any later version.
+
+ * LibEuFin is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
+ * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General
+ * Public License for more details.
+
+ * You should have received a copy of the GNU Affero General Public
+ * License along with LibEuFin; see the file COPYING. If not, see
+ * <http://www.gnu.org/licenses/>
+ */
+
+package tech.libeufin.nexus.db
+
+import tech.libeufin.nexus.*
+import tech.libeufin.common.*
+import java.time.Instant
+
+/** Data access logic for incoming & outgoing payments */
+class PaymentDAO(private val db: Database) {
+ /** Outgoing payments registration result */
+ data class OutgoingRegistrationResult(
+ val id: Long,
+ val initiated: Boolean,
+ val new: Boolean
+ )
+
+ /** Register an outgoing payment reconciling it with its initiated payment counterpart if present */
+ suspend fun registerOutgoing(paymentData: OutgoingPayment): OutgoingRegistrationResult = db.conn {
+ val stmt = it.prepareStatement("""
+ SELECT out_tx_id, out_initiated, out_found
+ FROM register_outgoing((?,?)::taler_amount,?,?,?,?)
+ """)
+ val executionTime = paymentData.executionTime.toDbMicros()
+ ?: throw Exception("Could not convert outgoing payment execution_time to microseconds")
+ stmt.setLong(1, paymentData.amount.value)
+ stmt.setInt(2, paymentData.amount.frac)
+ stmt.setString(3, paymentData.wireTransferSubject)
+ stmt.setLong(4, executionTime)
+ stmt.setString(5, paymentData.creditPaytoUri)
+ stmt.setString(6, paymentData.messageId)
+ stmt.one {
+ OutgoingRegistrationResult(
+ it.getLong("out_tx_id"),
+ it.getBoolean("out_initiated"),
+ !it.getBoolean("out_found")
+ )
+ }
+ }
+
+ /** Incoming payments bounce registration result */
+ data class IncomingBounceRegistrationResult(
+ val id: Long,
+ val bounceId: String,
+ val new: Boolean
+ )
+
+ /** Register an incoming payment and bounce it */
+ suspend fun registerMalformedIncoming(
+ paymentData: IncomingPayment,
+ bounceAmount: TalerAmount,
+ now: Instant
+ ): IncomingBounceRegistrationResult = db.conn {
+ val stmt = it.prepareStatement("""
+ SELECT out_found, out_tx_id, out_bounce_id
+ FROM register_incoming_and_bounce((?,?)::taler_amount,?,?,?,?,(?,?)::taler_amount,?)
+ """)
+ val refundTimestamp = now.toDbMicros()
+ ?: throw Exception("Could not convert refund execution time from Instant.now() to microsends.")
+ val executionTime = paymentData.executionTime.toDbMicros()
+ ?: throw Exception("Could not convert payment execution time from Instant to microseconds.")
+ stmt.setLong(1, paymentData.amount.value)
+ stmt.setInt(2, paymentData.amount.frac)
+ stmt.setString(3, paymentData.wireTransferSubject)
+ stmt.setLong(4, executionTime)
+ stmt.setString(5, paymentData.debitPaytoUri)
+ stmt.setString(6, paymentData.bankId)
+ stmt.setLong(7, bounceAmount.value)
+ stmt.setInt(8, bounceAmount.frac)
+ stmt.setLong(9, refundTimestamp)
+ stmt.one {
+ IncomingBounceRegistrationResult(
+ it.getLong("out_tx_id"),
+ it.getString("out_bounce_id"),
+ !it.getBoolean("out_found")
+ )
+ }
+ }
+
+ /** Incoming payments registration result */
+ data class IncomingRegistrationResult(
+ val id: Long,
+ val new: Boolean
+ )
+
+ /** Register an talerable incoming payment */
+ suspend fun registerTalerableIncoming(
+ paymentData: IncomingPayment,
+ reservePub: EddsaPublicKey
+ ): IncomingRegistrationResult = db.conn { conn ->
+ val stmt = conn.prepareStatement("""
+ SELECT out_found, out_tx_id
+ FROM register_incoming_and_talerable((?,?)::taler_amount,?,?,?,?,?)
+ """)
+ val executionTime = paymentData.executionTime.toDbMicros()
+ ?: throw Exception("Could not convert payment execution time from Instant to microseconds.")
+ stmt.setLong(1, paymentData.amount.value)
+ stmt.setInt(2, paymentData.amount.frac)
+ stmt.setString(3, paymentData.wireTransferSubject)
+ stmt.setLong(4, executionTime)
+ stmt.setString(5, paymentData.debitPaytoUri)
+ stmt.setString(6, paymentData.bankId)
+ stmt.setBytes(7, reservePub.raw)
+ stmt.one {
+ IncomingRegistrationResult(
+ it.getLong("out_tx_id"),
+ !it.getBoolean("out_found")
+ )
+ }
+ }
+} \ No newline at end of file
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsBTS.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsBTS.kt
index 356d4b96..a6466366 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsBTS.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsBTS.kt
@@ -33,8 +33,10 @@ import javax.xml.datatype.DatatypeFactory
import java.security.interfaces.*
-fun Instant.xmlDate(): String = DateTimeFormatter.ISO_DATE.withZone(ZoneId.of("UTC")).format(this)
-fun Instant.xmlDateTime(): String = DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(ZoneId.of("UTC")).format(this)
+fun Instant.xmlDate(): String =
+ DateTimeFormatter.ISO_DATE.withZone(ZoneId.of("UTC")).format(this)
+fun Instant.xmlDateTime(): String =
+ DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(ZoneId.of("UTC")).format(this)
/** EBICS protocol for business transactions */
class EbicsBTS(
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt
index ae4233c4..77cefeb3 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt
@@ -173,7 +173,7 @@ suspend fun ebicsDownload(
order: EbicsOrder,
startDate: Instant?,
endDate: Instant?,
- processing: (InputStream) -> Unit,
+ processing: suspend (InputStream) -> Unit,
) = coroutineScope {
val impl = EbicsBTS(cfg, bankKeys, clientKeys, order)
val parentScope = this
diff --git a/nexus/src/test/kotlin/Common.kt b/nexus/src/test/kotlin/Common.kt
index 892f7af9..8925a6a8 100644
--- a/nexus/src/test/kotlin/Common.kt
+++ b/nexus/src/test/kotlin/Common.kt
@@ -26,6 +26,7 @@ import tech.libeufin.common.fromFile
import tech.libeufin.common.initializeDatabaseTables
import tech.libeufin.common.resetDatabaseTables
import tech.libeufin.nexus.*
+import tech.libeufin.nexus.db.*
import java.time.Instant
import kotlin.io.path.Path
diff --git a/nexus/src/test/kotlin/DatabaseTest.kt b/nexus/src/test/kotlin/DatabaseTest.kt
index 1f27a782..c9d70ae0 100644
--- a/nexus/src/test/kotlin/DatabaseTest.kt
+++ b/nexus/src/test/kotlin/DatabaseTest.kt
@@ -19,9 +19,10 @@
import org.junit.Test
import tech.libeufin.common.*
-import tech.libeufin.nexus.DatabaseSubmissionState
-import tech.libeufin.nexus.InitiatedPayment
-import tech.libeufin.nexus.PaymentInitiationOutcome
+import tech.libeufin.nexus.*
+import tech.libeufin.nexus.db.*
+import tech.libeufin.nexus.db.InitiatedDAO.*
+import tech.libeufin.nexus.db.PaymentDAO.*
import java.time.Instant
import kotlin.random.Random
import kotlin.test.*
@@ -32,25 +33,25 @@ class OutgoingPaymentsTest {
// With reconciling
genOutPay("paid by nexus", "first").run {
assertEquals(
- PaymentInitiationOutcome.SUCCESS,
- db.initiatedPaymentCreate(genInitPay("waiting for reconciliation", "first"))
+ PaymentInitiationResult.SUCCESS,
+ db.initiated.create(genInitPay("waiting for reconciliation", "first"))
)
- db.registerOutgoing(this).run {
+ db.payment.registerOutgoing(this).run {
assertTrue(new,)
assertTrue(initiated)
}
- db.registerOutgoing(this).run {
+ db.payment.registerOutgoing(this).run {
assertFalse(new)
assertTrue(initiated)
}
}
// Without reconciling
genOutPay("not paid by nexus", "second").run {
- db.registerOutgoing(this).run {
+ db.payment.registerOutgoing(this).run {
assertTrue(new)
assertFalse(initiated)
}
- db.registerOutgoing(this).run {
+ db.payment.registerOutgoing(this).run {
assertFalse(new)
assertFalse(initiated)
}
@@ -64,14 +65,14 @@ class IncomingPaymentsTest {
fun bounce() = setup { db, _ ->
// creating and bouncing one incoming transaction.
val payment = genInPay("incoming and bounced")
- db.registerMalformedIncoming(
+ db.payment.registerMalformedIncoming(
payment,
TalerAmount("KUDOS:2.53"),
Instant.now()
).run {
assertTrue(new)
}
- db.registerMalformedIncoming(
+ db.payment.registerMalformedIncoming(
payment,
TalerAmount("KUDOS:2.53"),
Instant.now()
@@ -112,79 +113,60 @@ class IncomingPaymentsTest {
)
}
}
-
- // Tests the creation of a talerable incoming payment.
- @Test
- fun talerable() = setup { db, _ ->
- val reservePub = EddsaPublicKey.rand()
-
- val inc = genInPay("reserve-pub")
- // Checking the reserve is not found.
- assertFalse(db.isReservePubFound(reservePub))
- assertTrue(db.registerTalerableIncoming(inc, reservePub).new)
- // Checking the reserve is not found.
- assertTrue(db.isReservePubFound(reservePub))
- assertFalse(db.registerTalerableIncoming(inc, reservePub).new)
- }
}
class PaymentInitiationsTest {
- // Testing the insertion of the failure message.
@Test
- fun setFailureMessage() = setup { db, _ ->
+ fun status() = setup { db, _ ->
assertEquals(
- db.initiatedPaymentCreate(genInitPay("not submitted, has row ID == 1")),
- PaymentInitiationOutcome.SUCCESS
+ PaymentInitiationResult.SUCCESS,
+ db.initiated.create(genInitPay(requestUid = "PAY1"))
)
- assertFalse(db.initiatedPaymentSetFailureMessage(3, "3 not existing"))
- assertTrue(db.initiatedPaymentSetFailureMessage(1, "expired"))
- // Checking the value from the database.
- db.conn { conn ->
- val idOne = conn.execSQLQuery("""
- SELECT failure_message
- FROM initiated_outgoing_transactions
- WHERE initiated_outgoing_transaction_id = 1;
- """.trimIndent())
- assertTrue(idOne.next())
- val maybeMessage = idOne.getString("failure_message")
- assertEquals("expired", maybeMessage)
- }
- }
- // Tests the flagging of payments as submitted.
- @Test
- fun paymentInitiationSetAsSubmitted() = setup { db, _ ->
- val getRowOne = """
- SELECT submitted
- FROM initiated_outgoing_transactions
- WHERE initiated_outgoing_transaction_id=1
- """
+ db.initiated.submissionFailure(1, Instant.now(), "First failure")
+ db.initiated.submissionFailure(1, Instant.now(), "Second failure")
+ db.initiated.submissionSuccess(1, Instant.now(), "ORDER1")
+ assertEquals(Pair("PAY1", null), db.initiated.logFailure("ORDER1"))
- // Creating the record first. Defaults to submitted == false.
assertEquals(
- PaymentInitiationOutcome.SUCCESS,
- db.initiatedPaymentCreate(genInitPay("not submitted, has row ID == 1")),
+ PaymentInitiationResult.SUCCESS,
+ db.initiated.create(genInitPay(requestUid = "PAY2"))
)
- // Asserting on the false default submitted state.
- db.conn { conn ->
- val isSubmitted = conn.execSQLQuery(getRowOne)
- assertTrue(isSubmitted.next())
- assertEquals("unsubmitted", isSubmitted.getString("submitted"))
- }
- // Switching the submitted state to success.
- assertTrue(db.initiatedPaymentSetSubmittedState(1, DatabaseSubmissionState.success))
- // Asserting on the submitted state being TRUE now.
- db.conn { conn ->
- val isSubmitted = conn.execSQLQuery(getRowOne)
- assertTrue(isSubmitted.next())
- assertEquals("success", isSubmitted.getString("submitted"))
- }
+ db.initiated.submissionFailure(2, Instant.now(), "First failure")
+ db.initiated.submissionSuccess(2, Instant.now(), "ORDER2")
+ db.initiated.logMessage("ORDER2", "status msg")
+ assertEquals(Pair("PAY2", "status msg"), db.initiated.logFailure("ORDER2"))
+
+ assertEquals(
+ PaymentInitiationResult.SUCCESS,
+ db.initiated.create(genInitPay(requestUid = "PAY3"))
+ )
+ db.initiated.submissionSuccess(3, Instant.now(), "ORDER3")
+ assertEquals("PAY3", db.initiated.logSuccess("ORDER3"))
+
+ // Unknown order
+ assertNull(db.initiated.logSuccess("ORDER_X"))
+ assertNull(db.initiated.logFailure("ORDER_X"))
+
+ assertEquals(
+ PaymentInitiationResult.SUCCESS,
+ db.initiated.create(genInitPay(requestUid = "PAY4"))
+ )
+ db.initiated.bankMessage("PAY4", "status progress")
+ db.initiated.bankFailure("PAY4", "status failure")
+
+ assertEquals(
+ PaymentInitiationResult.SUCCESS,
+ db.initiated.create(genInitPay(requestUid = "PAY5"))
+ )
+ db.initiated.bankMessage("PAY5", "status progress")
+ db.initiated.reversal("PAY5", "status reversal")
}
// Tests creation, unique constraint violation handling, and
// retrieving only one non-submitted payment.
@Test
fun paymentInitiation() = setup { db, _ ->
- val beEmpty = db.initiatedPaymentsSubmittableGet("KUDOS") // expect no records.
+ val beEmpty = db.initiated.submittableGet("KUDOS") // expect no records.
assertEquals(beEmpty.size, 0)
val initPay = InitiatedPayment(
id = -1,
@@ -194,17 +176,14 @@ class PaymentInitiationsTest {
requestUid = "unique",
initiationTime = Instant.now()
)
- assertNull(db.initiatedPaymentGetFromUid("unique"))
- assertEquals(db.initiatedPaymentCreate(initPay), PaymentInitiationOutcome.SUCCESS)
- assertEquals(db.initiatedPaymentCreate(initPay), PaymentInitiationOutcome.UNIQUE_CONSTRAINT_VIOLATION)
- val haveOne = db.initiatedPaymentsSubmittableGet("KUDOS")
+ assertEquals(db.initiated.create(initPay), PaymentInitiationResult.SUCCESS)
+ assertEquals(db.initiated.create(initPay), PaymentInitiationResult.REQUEST_UID_REUSE)
+ val haveOne = db.initiated.submittableGet("KUDOS")
assertTrue("Size ${haveOne.size} instead of 1") {
haveOne.size == 1
&& haveOne.first().id == 1L
&& haveOne.first().requestUid == "unique"
}
- assertTrue(db.initiatedPaymentSetSubmittedState(1, DatabaseSubmissionState.success))
- assertNotNull(db.initiatedPaymentGetFromUid("unique"))
}
/**
@@ -213,43 +192,39 @@ class PaymentInitiationsTest {
*/
@Test
fun submittablePayments() = setup { db, _ ->
- val beEmpty = db.initiatedPaymentsSubmittableGet("KUDOS")
+ val beEmpty = db.initiated.submittableGet("KUDOS")
assertEquals(0, beEmpty.size)
assertEquals(
- db.initiatedPaymentCreate(genInitPay(requestUid = "first")),
- PaymentInitiationOutcome.SUCCESS
+ db.initiated.create(genInitPay(requestUid = "first")),
+ PaymentInitiationResult.SUCCESS
)
assertEquals(
- db.initiatedPaymentCreate(genInitPay(requestUid = "second")),
- PaymentInitiationOutcome.SUCCESS
+ db.initiated.create(genInitPay(requestUid = "second")),
+ PaymentInitiationResult.SUCCESS
)
assertEquals(
- db.initiatedPaymentCreate(genInitPay(requestUid = "third")),
- PaymentInitiationOutcome.SUCCESS
+ db.initiated.create(genInitPay(requestUid = "third")),
+ PaymentInitiationResult.SUCCESS
)
// Setting the first as "transient_failure", must be found.
- assertTrue(db.initiatedPaymentSetSubmittedState(
- 1, DatabaseSubmissionState.transient_failure
- ))
+ db.initiated.submissionSuccess(1, Instant.now(), "Failure")
// Setting the second as "success", must not be found.
- assertTrue(db.initiatedPaymentSetSubmittedState(
- 2, DatabaseSubmissionState.success
- ))
- val expectTwo = db.initiatedPaymentsSubmittableGet("KUDOS")
+ db.initiated.submissionSuccess(2, Instant.now(), "ORDER1234")
+ val expectTwo = db.initiated.submittableGet("KUDOS")
// the third initiation keeps the default "unsubmitted"
// state, must be found. Total 2.
- assertEquals(2, expectTwo.size)
+ assertEquals(1, expectTwo.size)
}
// Tests how the fetch method gets the list of
// multiple unsubmitted payment initiations.
@Test
fun paymentInitiationsMultiple() = setup { db, _ ->
- assertEquals(db.initiatedPaymentCreate(genInitPay("#1", "unique1")), PaymentInitiationOutcome.SUCCESS)
- assertEquals(db.initiatedPaymentCreate(genInitPay("#2", "unique2")), PaymentInitiationOutcome.SUCCESS)
- assertEquals(db.initiatedPaymentCreate(genInitPay("#3", "unique3")), PaymentInitiationOutcome.SUCCESS)
- assertEquals(db.initiatedPaymentCreate(genInitPay("#4", "unique4")), PaymentInitiationOutcome.SUCCESS)
+ assertEquals(db.initiated.create(genInitPay("#1", "unique1")), PaymentInitiationResult.SUCCESS)
+ assertEquals(db.initiated.create(genInitPay("#2", "unique2")), PaymentInitiationResult.SUCCESS)
+ assertEquals(db.initiated.create(genInitPay("#3", "unique3")), PaymentInitiationResult.SUCCESS)
+ assertEquals(db.initiated.create(genInitPay("#4", "unique4")), PaymentInitiationResult.SUCCESS)
// Marking one as submitted, hence not expecting it in the results.
db.conn { conn ->
@@ -261,8 +236,7 @@ class PaymentInitiationsTest {
}
// Expecting all the payments BUT the #3 in the result.
- db.initiatedPaymentsSubmittableGet("KUDOS").apply {
-
+ db.initiated.submittableGet("KUDOS").apply {
assertEquals(3, this.size)
assertEquals("#1", this[0].wireTransferSubject)
assertEquals("#2", this[1].wireTransferSubject)
diff --git a/testbench/src/test/kotlin/IntegrationTest.kt b/testbench/src/test/kotlin/IntegrationTest.kt
index b398ad03..2f529a90 100644
--- a/testbench/src/test/kotlin/IntegrationTest.kt
+++ b/testbench/src/test/kotlin/IntegrationTest.kt
@@ -20,7 +20,7 @@
import org.junit.Test
import tech.libeufin.bank.*
import tech.libeufin.nexus.*
-import tech.libeufin.nexus.Database as NexusDb
+import tech.libeufin.nexus.db.Database as NexusDb
import tech.libeufin.bank.db.AccountDAO.*
import tech.libeufin.common.*
import java.time.Instant
@@ -185,7 +185,7 @@ class IntegrationTest {
}.assertNoContent()
assertException("ERROR: cashin failed: admin balance insufficient") {
- db.registerTalerableIncoming(payment, reservePub)
+ db.payment.registerTalerableIncoming(payment, reservePub)
}
// Allow admin debt
diff --git a/testbench/src/test/kotlin/Iso20022Test.kt b/testbench/src/test/kotlin/Iso20022Test.kt
index 7e0fc7b9..26447b9d 100644
--- a/testbench/src/test/kotlin/Iso20022Test.kt
+++ b/testbench/src/test/kotlin/Iso20022Test.kt
@@ -34,7 +34,7 @@ class Iso20022Test {
} else if (name.contains("pain.002")) {
parseCustomerPaymentStatusReport(content)
} else {
- parseTxNotif(content, "CHF", mutableListOf())
+ parseTxNotif(content, "CHF")
}
}
}
@@ -56,7 +56,7 @@ class Iso20022Test {
} else if (name.contains("pain.002")) {
parseCustomerPaymentStatusReport(content)
} else {
- parseTxNotif(content, "CHF", mutableListOf())
+ parseTxNotif(content, "CHF")
}
}
}