commit 13ed1892fead3ea284f2033b2df9917be698d3d7
parent cf71b2da383a9fb86d074eeb15c0c9d5fefd1f05
Author: Antoine A <>
Date: Mon, 9 Sep 2024 16:06:07 +0200
nexus: register outgoing transactions in batched report
Diffstat:
5 files changed, 174 insertions(+), 114 deletions(-)
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsFetch.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsFetch.kt
@@ -128,8 +128,10 @@ suspend fun ingestFile(
when (it) {
is IncomingPayment -> ingestIncomingPayment(db, it, cfg.accountType)
is OutgoingBatch -> {
- logger.error("{}", it)
- db.initiated.batchStatusUpdate(it.msgId, SubmissionState.success, null, true)
+ logger.debug("{}", it)
+ for (pay in db.initiated.ingestBatch(it.msgId, it.executionTime)) {
+ ingestOutgoingPayment(db, pay)
+ }
}
is OutgoingPayment -> ingestOutgoingPayment(db, it)
is OutgoingReversal -> {
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsSubmit.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsSubmit.kt
@@ -81,14 +81,12 @@ private suspend fun submitBatch(client: EbicsClient) {
client.db.initiated.batch(Instant.now(), randEbicsId())
// Send submitable batches
client.db.initiated.submittable(client.cfg.currency).forEach { batch ->
- logger.debug {
- val transactions = batch.payments.map { it.endToEndId }.joinToString(",")
- "Submitting batch ${batch.messageId} of transactions: $transactions"
- }
+ logger.debug("Submitting batch {}", batch.messageId)
runCatching { submitBatch(client, batch) }.fold(
onSuccess = { orderId ->
client.db.initiated.batchSubmissionSuccess(batch.id, Instant.now(), orderId)
- logger.info("Batch ${batch.messageId} submitted as order $orderId")
+ val transactions = batch.payments.map { it.endToEndId }.joinToString(",")
+ logger.info("Batch ${batch.messageId} submitted as order $orderId: $transactions")
},
onFailure = { e ->
client.db.initiated.batchSubmissionFailure(batch.id, Instant.now(), e.message)
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt
@@ -22,6 +22,7 @@ package tech.libeufin.nexus.db
import tech.libeufin.common.asInstant
import tech.libeufin.common.db.*
import tech.libeufin.common.micros
+import tech.libeufin.nexus.iso20022.OutgoingPayment
import java.time.Instant
/** Data access logic for initiated outgoing payments */
@@ -240,7 +241,7 @@ class InitiatedDAO(private val db: Database) {
}
/** Register EBICS payment status [state] with [msg] for batch [msgId] */
- suspend fun batchStatusUpdate(msgId: String, state: SubmissionState, msg: String?, definitive: Boolean = false) = db.serializableTransaction { tx ->
+ suspend fun batchStatusUpdate(msgId: String, state: SubmissionState, msg: String?) = db.serializableTransaction { tx ->
// Update batch state
val batchId = tx.withStatement(
"""
@@ -259,7 +260,7 @@ class InitiatedDAO(private val db: Database) {
// Update transactions state if they are not yet in final status
if (batchId != null) {
// When a batch succeed it doesn't mean that individual transaction also succeed
- val txState = if (state == SubmissionState.success && !definitive) {
+ val txState = if (state == SubmissionState.success) {
SubmissionState.pending
} else {
state
@@ -311,6 +312,51 @@ class InitiatedDAO(private val db: Database) {
}
}
+ /** Register EBICS payment batch as succeded and return pending payments */
+ suspend fun ingestBatch(msgId: String, executionTime: Instant) = db.serializableTransaction { tx ->
+ // Update batch state
+ val batchId = tx.withStatement(
+ """
+ UPDATE initiated_outgoing_batches
+ SET status = 'success'::submission_state, status_msg = NULL
+ WHERE message_id = ?
+ RETURNING initiated_outgoing_batch_id
+ """
+ ) {
+ setString(1, msgId)
+ oneOrNull { it.getLong("initiated_outgoing_batch_id") }
+ }
+ if (batchId == null) {
+ return@serializableTransaction emptyList()
+ }
+ // Update transactions state if they are not yet in final status
+ tx.withStatement(
+ """
+ UPDATE initiated_outgoing_transactions
+ SET status = 'success'::submission_state, status_msg = NULL
+ WHERE initiated_outgoing_batch_id = ?
+ AND status NOT IN ('permanent_failure', 'success')
+ RETURNING end_to_end_id,
+ (amount).val as amount_val,
+ (amount).frac as amount_frac,
+ subject,
+ credit_payto
+ """
+ ) {
+ setLong(1, batchId)
+ all {
+ OutgoingPayment(
+ endToEndId = it.getString("end_to_end_id"),
+ msgId = msgId,
+ amount = it.getAmount("amount", db.bankCurrency),
+ subject = it.getString("subject"),
+ executionTime = executionTime,
+ creditPaytoUri = it.getString("credit_payto")
+ )
+ }
+ }
+ }
+
/** Group unbatched transaction into a single batch */
suspend fun batch(timestamp: Instant, ebicsId: String) {
db.serializable("SELECT FROM batch_outgoing_transactions(?, ?)") {
@@ -318,7 +364,7 @@ class InitiatedDAO(private val db: Database) {
setString(2, ebicsId)
execute()
}
- }
+ }
/** List every initiated payment pending submission in the order they should be submitted */
suspend fun submittable(currency: String): List<PaymentBatch> {
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/iso20022/camt.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/iso20022/camt.kt
@@ -86,17 +86,6 @@ data class OutgoingReversal(
}
}
-private fun XmlDestructor.payto(prefix: String): String? {
- val iban = opt("${prefix}Acct")?.one("Id")?.one("IBAN")?.text()
- return if (iban != null) {
- val name = opt(prefix) { opt("Nm")?.text() ?: opt("Pty")?.one("Nm")?.text() }
- // Parse bic ?
- IbanPayto.build(iban, null, name)
- } else {
- null
- }
-}
-
private class TxErr(val msg: String): Exception(msg)
private enum class Kind {
@@ -114,6 +103,109 @@ private data class OutgoingId(
fun ref(): String? = endToEndId ?: msgId
}
+/** Parse a payto */
+private fun XmlDestructor.payto(prefix: String): String? {
+ return opt("RltdPties") {
+ val iban = opt("${prefix}Acct")?.one("Id")?.one("IBAN")?.text()
+ if (iban != null) {
+ val name = opt(prefix) { opt("Nm")?.text() ?: opt("Pty")?.one("Nm")?.text() }
+ // Parse bic ?
+ IbanPayto.build(iban, null, name)
+ } else {
+ null
+ }
+ }
+}
+
+/** Check if an entry status is BOOK */
+private fun XmlDestructor.isBooked(): Boolean {
+ // We check at the Sts or Sts/Cd level for retrocompatibility
+ return one("Sts") {
+ val status = opt("Cd")?.text() ?: text()
+ status == "BOOK"
+ }
+}
+
+/** Parse the instruction execution date */
+private fun XmlDestructor.executionDate(): Instant {
+ // Value date if present else booking date
+ val date = opt("ValDt") ?: one("BookgDt")
+ val parsed = date.opt("Dt") {
+ date().atStartOfDay()
+ } ?: date.one("DtTm") {
+ dateTime()
+ }
+ return parsed.toInstant(ZoneOffset.UTC)
+}
+
+/** Parse batch message ID and transaction end-to-end ID as generated by libeufin-nexus */
+private fun XmlDestructor.outgoingId(): OutgoingId = one("Refs") {
+ val endToEndId = opt("EndToEndId")?.text()
+ val msgId = opt("MsgId")?.text()
+ if (endToEndId == null) {
+ // This is a batch representation
+ OutgoingId(msgId, null)
+ } else if (endToEndId == "NOTPROVIDED") {
+ // If not set use MsgId as end-to-end ID for retrocompatibility
+ OutgoingId(msgId, msgId)
+ } else {
+ OutgoingId(msgId, endToEndId)
+ }
+}
+
+/** Parse and format transaction return reasons */
+private fun XmlDestructor.returnReason(): String = opt("RtrInf") {
+ val code = one("Rsn").one("Cd").enum<ExternalReturnReasonCode>()
+ val info = map("AddtlInf") { text() }.joinToString("")
+ buildString {
+ append("${code.isoCode} '${code.description}'")
+ if (info.isNotEmpty()) {
+ append(" - '$info'")
+ }
+ }
+} ?: opt("RmtInf") {
+ map("Ustrd") { text() }.joinToString("")
+} ?: ""
+
+/** Parse amount */
+private fun XmlDestructor.amount(acceptedCurrency: String) = one("Amt") {
+ val currency = attr("Ccy")
+ /** FIXME: test by sending non-CHF to PoFi and see which currency gets here. */
+ if (currency != acceptedCurrency) throw Exception("Currency $currency not supported")
+ TalerAmount("$currency:${text()}")
+}
+
+/** Parse bank transaction code */
+private fun XmlDestructor.bankTransactionCode(): BankTransactionCode {
+ return one("BkTxCd").one("Domn") {
+ val domain = one("Cd").enum<ExternalBankTransactionDomainCode>()
+ one("Fmly") {
+ val family = one("Cd").enum<ExternalBankTransactionFamilyCode>()
+ val subFamily = one("SubFmlyCd").enum<ExternalBankTransactionSubFamilyCode>()
+
+ BankTransactionCode(domain, family, subFamily)
+ }
+ }
+}
+
+/** Parse optional bank transaction code */
+private fun XmlDestructor.optBankTransactionCode(): BankTransactionCode? {
+ return opt("BkTxCd")?.one("Domn") {
+ val domain = one("Cd").enum<ExternalBankTransactionDomainCode>()
+ one("Fmly") {
+ val family = one("Cd").enum<ExternalBankTransactionFamilyCode>()
+ val subFamily = one("SubFmlyCd").enum<ExternalBankTransactionSubFamilyCode>()
+
+ BankTransactionCode(domain, family, subFamily)
+ }
+ }
+}
+
+/** Parse transaction wire transfer subject */
+private fun XmlDestructor.wireTransferSubject(): String? {
+ return opt("RmtInf")?.map("Ustrd") { text() }?.joinToString("")?.trim()
+}
+
/** Parse camt.054 or camt.053 file */
fun parseTx(
notifXml: InputStream,
@@ -143,92 +235,7 @@ fun parseTx(
therefore only be used as a last resort.
*/
- /** Check if an entry status is BOOK */
- fun XmlDestructor.isBooked(): Boolean {
- // We check at the Sts or Sts/Cd level for retrocompatibility
- return one("Sts") {
- val status = opt("Cd")?.text() ?: text()
- status == "BOOK"
- }
- }
-
- /** Parse the instruction execution date */
- fun XmlDestructor.executionDate(): Instant {
- // Value date if present else booking date
- val date = opt("ValDt") ?: one("BookgDt")
- val parsed = date.opt("Dt") {
- date().atStartOfDay()
- } ?: date.one("DtTm") {
- dateTime()
- }
- return parsed.toInstant(ZoneOffset.UTC)
- }
-
- /** Parse batch message ID and transaction end-to-end ID as generated by libeufin-nexus */
- fun XmlDestructor.outgoingId(): OutgoingId = one("Refs") {
- val endToEndId = opt("EndToEndId")?.text()
- val msgId = opt("MsgId")?.text()
- if (endToEndId == null) {
- // This is a batch representation
- OutgoingId(msgId, null)
- } else if (endToEndId == "NOTPROVIDED") {
- // If not set use MsgId as end-to-end ID for retrocompatibility
- OutgoingId(msgId, msgId)
- } else {
- OutgoingId(msgId, endToEndId)
- }
- }
-
- /** Parse and format transaction return reasons */
- fun XmlDestructor.returnReason(): String = opt("RtrInf") {
- val code = one("Rsn").one("Cd").enum<ExternalReturnReasonCode>()
- val info = map("AddtlInf") { text() }.joinToString("")
- buildString {
- append("${code.isoCode} '${code.description}'")
- if (info.isNotEmpty()) {
- append(" - '$info'")
- }
- }
- } ?: opt("RmtInf") {
- map("Ustrd") { text() }.joinToString("")
- } ?: ""
-
- /** Parse amount */
- fun XmlDestructor.amount(acceptedCurrency: String) = one("Amt") {
- val currency = attr("Ccy")
- /** FIXME: test by sending non-CHF to PoFi and see which currency gets here. */
- if (currency != acceptedCurrency) throw Exception("Currency $currency not supported")
- TalerAmount("$currency:${text()}")
- }
-
- /** Parse bank transaction code */
- fun XmlDestructor.bankTransactionCode(): BankTransactionCode {
- return one("BkTxCd").one("Domn") {
- val domain = one("Cd").enum<ExternalBankTransactionDomainCode>()
- one("Fmly") {
- val family = one("Cd").enum<ExternalBankTransactionFamilyCode>()
- val subFamily = one("SubFmlyCd").enum<ExternalBankTransactionSubFamilyCode>()
-
- BankTransactionCode(domain, family, subFamily)
- }
- }
- }
-
- /** Parse optional bank transaction code */
- fun XmlDestructor.optBankTransactionCode(): BankTransactionCode? {
- return opt("BkTxCd")?.one("Domn") {
- val domain = one("Cd").enum<ExternalBankTransactionDomainCode>()
- one("Fmly") {
- val family = one("Cd").enum<ExternalBankTransactionFamilyCode>()
- val subFamily = one("SubFmlyCd").enum<ExternalBankTransactionSubFamilyCode>()
-
- BankTransactionCode(domain, family, subFamily)
- }
- }
- }
-
val txsInfo = mutableListOf<TxInfo>()
-
XmlDestructor.fromStream(notifXml, "Document") { when (dialect) {
Dialect.gls -> {
/** Common parsing logic for camt.052 and camt.053 */
@@ -260,11 +267,11 @@ fun parseTx(
))
}
} else {
- val subject = opt("RmtInf")?.map("Ustrd") { text() }?.joinToString("")
+ val subject = wireTransferSubject()
when (kind) {
Kind.CRDT -> {
val bankId = one("Refs").opt("TxId")?.text()
- val debtorPayto = opt("RltdPties") { payto("Dbtr") }
+ val debtorPayto = payto("Dbtr")
txsInfo.add(TxInfo.Credit(
ref = bankId ?: txRef ?: entryRef,
bookDate = bookDate,
@@ -277,7 +284,7 @@ fun parseTx(
}
Kind.DBIT -> {
val outgoingId = outgoingId()
- val creditorPayto = opt("RltdPties") { payto("Cdtr") }
+ val creditorPayto = payto("Cdtr")
txsInfo.add(TxInfo.Debit(
ref = outgoingId.ref() ?: txRef ?: entryRef,
bookDate = bookDate,
@@ -317,10 +324,10 @@ fun parseTx(
one("NtryDtls").one("TxDtls") {
val code = optBankTransactionCode() ?: code
val txRef = opt("Refs")?.opt("AcctSvcrRef")?.text()
- val subject = opt("RmtInf")?.map("Ustrd") { text() }?.joinToString("")
+ val subject = wireTransferSubject()
if (kind == Kind.CRDT) {
val bankId = one("Refs").opt("TxId")?.text()
- val debtorPayto = opt("RltdPties") { payto("Dbtr") }
+ val debtorPayto = payto("Dbtr")
txsInfo.add(TxInfo.Credit(
ref = txRef ?: entryRef,
bookDate = bookDate,
@@ -391,11 +398,11 @@ fun parseTx(
val code = optBankTransactionCode() ?: code
val amount = amount(acceptedCurrency)
val txRef = opt("Refs")?.opt("AcctSvcrRef")?.text()
- val subject = opt("RmtInf")?.map("Ustrd") { text() }?.joinToString("")
+ val subject = wireTransferSubject()
when (kind) {
Kind.CRDT -> {
val bankId = opt("Refs")?.opt("UETR")?.text()
- val debtorPayto = opt("RltdPties") { payto("Dbtr") }
+ val debtorPayto = payto("Dbtr")
txsInfo.add(TxInfo.Credit(
ref = bankId ?: txRef ?: entryRef,
bookDate = bookDate,
@@ -408,7 +415,7 @@ fun parseTx(
}
Kind.DBIT -> {
val outgoingId = outgoingId()
- val creditorPayto = opt("RltdPties") { payto("Cdtr") }
+ val creditorPayto = payto("Cdtr")
txsInfo.add(TxInfo.Debit(
ref = outgoingId.ref() ?: txRef ?: entryRef,
bookDate = bookDate,
diff --git a/nexus/src/test/kotlin/IngestionTest.kt b/nexus/src/test/kotlin/IngestionTest.kt
@@ -348,6 +348,13 @@ class IngestionTest {
executionTime = dateToInstant("2024-04-18"),
creditPaytoUri = "payto://iban/DE20500105172419259181?receiver-name=John%20Smith"
),
+ OutgoingPayment(
+ endToEndId = "27SK3166EG36SJ7VP7VFYP0MW8",
+ amount = TalerAmount("EUR:44"),
+ subject = "init payment",
+ executionTime = dateToInstant("2024-09-04"),
+ creditPaytoUri = "payto://iban/CH4189144589712575493?receiver-name=Test"
+ ),
)
)
}