diff options
author | Antoine A <> | 2024-03-12 20:48:56 +0100 |
---|---|---|
committer | Antoine A <> | 2024-03-13 21:19:30 +0100 |
commit | cd6421f1c45e1a51415c19c1e28eed8a91c56008 (patch) | |
tree | 6ddfbfad5cd18a874e29ba9a95d0873387caf375 /nexus | |
parent | c481ebac54b747ebb56b09c1430755bc4a70f3e5 (diff) | |
download | libeufin-cd6421f1c45e1a51415c19c1e28eed8a91c56008.tar.gz libeufin-cd6421f1c45e1a51415c19c1e28eed8a91c56008.tar.bz2 libeufin-cd6421f1c45e1a51415c19c1e28eed8a91c56008.zip |
Track outgoing transactions status
Diffstat (limited to 'nexus')
12 files changed, 590 insertions, 760 deletions
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Database.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Database.kt deleted file mode 100644 index 8a2b9185..00000000 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/Database.kt +++ /dev/null @@ -1,527 +0,0 @@ -/* - * This file is part of LibEuFin. - * Copyright (C) 2024 Taler Systems S.A. - - * LibEuFin is free software; you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation; either version 3, or - * (at your option) any later version. - - * LibEuFin is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY - * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General - * Public License for more details. - - * You should have received a copy of the GNU Affero General Public - * License along with LibEuFin; see the file COPYING. If not, see - * <http://www.gnu.org/licenses/> - */ -package tech.libeufin.nexus - -import org.postgresql.util.PSQLState -import tech.libeufin.common.* -import java.sql.PreparedStatement -import java.sql.SQLException -import java.text.SimpleDateFormat -import java.time.Instant -import java.util.* - -fun Instant.fmtDate(): String { - val formatter = SimpleDateFormat("yyyy-MM-dd") - return formatter.format(Date.from(this)) -} - -fun Instant.fmtDateTime(): String { - val formatter = SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS") - return formatter.format(Date.from(this)) -} - -// INCOMING PAYMENTS STRUCTS - -/** - * Represents an incoming payment in the database. - */ -data class IncomingPayment( - val amount: TalerAmount, - val wireTransferSubject: String, - val debitPaytoUri: String, - val executionTime: Instant, - /** ISO20022 AccountServicerReference */ - val bankId: String -) { - override fun toString(): String { - return "IN ${executionTime.fmtDate()} $amount '$bankId' debitor=$debitPaytoUri subject=$wireTransferSubject" - } -} - - -// INITIATED PAYMENTS STRUCTS - -enum class DatabaseSubmissionState { - /** - * Submission got both EBICS_OK. - */ - success, - /** - * Submission can be retried (network issue, for example) - */ - transient_failure, - /** - * Submission got at least one error code which was not - * EBICS_OK. - */ - permanent_failure, - /** - * The submitted payment was never witnessed by a camt.5x - * or pain.002 report. - */ - never_heard_back -} - -/** - * Minimal set of information to initiate a new payment in - * the database. - */ -data class InitiatedPayment( - val id: Long, - val amount: TalerAmount, - val wireTransferSubject: String, - val creditPaytoUri: String, - val initiationTime: Instant, - val requestUid: String -) - -/** - * Possible outcomes for inserting a initiated payment - * into the database. - */ -enum class PaymentInitiationOutcome { - - /** - * The row contains a client_request_uid that exists - * already in the database. - */ - UNIQUE_CONSTRAINT_VIOLATION, - /** - * Record successfully created. - */ - SUCCESS -} - -// OUTGOING PAYMENTS STRUCTS - -/** - * Collects data of a booked outgoing payment. - */ -data class OutgoingPayment( - val amount: TalerAmount, - val executionTime: Instant, - /** ISO20022 MessageIdentification */ - val messageId: String, - val creditPaytoUri: String? = null, // not showing in camt.054 - val wireTransferSubject: String? = null // not showing in camt.054 -) { - override fun toString(): String { - return "OUT ${executionTime.fmtDate()} $amount '$messageId' creditor=$creditPaytoUri subject=$wireTransferSubject" - } -} - -/** Outgoing payments registration result */ -data class OutgoingRegistrationResult( - val id: Long, - val initiated: Boolean, - val new: Boolean -) - -/** Incoming payments registration result */ -data class IncomingRegistrationResult( - val id: Long, - val new: Boolean -) - -/** Incoming payments bounce registration result */ -data class IncomingBounceRegistrationResult( - val id: Long, - val bounceId: String, - val new: Boolean -) - -/** - * Performs a INSERT, UPDATE, or DELETE operation. - * - * @return true if at least one row was affected by this operation, - * false on unique constraint violation or no rows were affected. - * - */ -private fun PreparedStatement.maybeUpdate(): Boolean { - try { - this.executeUpdate() - } catch (e: SQLException) { - logger.error(e.message) - if (e.sqlState == PSQLState.UNIQUE_VIOLATION.state) return false - throw e // rethrowing, not to hide other types of errors. - } - return updateCount > 0 -} - -/** - * Collects database connection steps and any operation on the Nexus tables. - */ -class Database(dbConfig: String): DbPool(dbConfig, "libeufin_nexus") { - - // Temporary in memory database to store EBICS order status until we modify the schema to actually store it in the database - var mem: MutableMap<String, String> = mutableMapOf() - - // OUTGOING PAYMENTS METHODS - - /** - * Register an outgoing payment OPTIONALLY reconciling it with its - * initiated payment counterpart. - * - * @param paymentData information about the outgoing payment. - * @return operation outcome enum. - */ - suspend fun registerOutgoing(paymentData: OutgoingPayment): OutgoingRegistrationResult = conn { - val stmt = it.prepareStatement(""" - SELECT out_tx_id, out_initiated, out_found - FROM register_outgoing( - (?,?)::taler_amount - ,? - ,? - ,? - ,? - )""" - ) - val executionTime = paymentData.executionTime.toDbMicros() - ?: throw Exception("Could not convert outgoing payment execution_time to microseconds") - stmt.setLong(1, paymentData.amount.value) - stmt.setInt(2, paymentData.amount.frac) - stmt.setString(3, paymentData.wireTransferSubject) - stmt.setLong(4, executionTime) - stmt.setString(5, paymentData.creditPaytoUri) - stmt.setString(6, paymentData.messageId) - - stmt.executeQuery().use { - when { - !it.next() -> throw Exception("Inserting outgoing payment gave no outcome.") - else -> OutgoingRegistrationResult( - it.getLong("out_tx_id"), - it.getBoolean("out_initiated"), - !it.getBoolean("out_found") - ) - } - } - } - - // INCOMING PAYMENTS METHODS - - /** - * Register an incoming payment and bounce it - * - * @param paymentData information about the incoming payment - * @param requestUid unique identifier of the bounce outgoing payment to - * initiate - * @param bounceAmount amount to send back to the original debtor - * @param bounceSubject subject of the bounce outhoing payment - * @return true if new - */ - suspend fun registerMalformedIncoming( - paymentData: IncomingPayment, - bounceAmount: TalerAmount, - now: Instant - ): IncomingBounceRegistrationResult = conn { - val stmt = it.prepareStatement(""" - SELECT out_found, out_tx_id, out_bounce_id - FROM register_incoming_and_bounce( - (?,?)::taler_amount - ,? - ,? - ,? - ,? - ,(?,?)::taler_amount - ,? - )""" - ) - val refundTimestamp = now.toDbMicros() - ?: throw Exception("Could not convert refund execution time from Instant.now() to microsends.") - val executionTime = paymentData.executionTime.toDbMicros() - ?: throw Exception("Could not convert payment execution time from Instant to microseconds.") - stmt.setLong(1, paymentData.amount.value) - stmt.setInt(2, paymentData.amount.frac) - stmt.setString(3, paymentData.wireTransferSubject) - stmt.setLong(4, executionTime) - stmt.setString(5, paymentData.debitPaytoUri) - stmt.setString(6, paymentData.bankId) - stmt.setLong(7, bounceAmount.value) - stmt.setInt(8, bounceAmount.frac) - stmt.setLong(9, refundTimestamp) - stmt.executeQuery().use { - when { - !it.next() -> throw Exception("Inserting malformed incoming payment gave no outcome") - else -> IncomingBounceRegistrationResult( - it.getLong("out_tx_id"), - it.getString("out_bounce_id"), - !it.getBoolean("out_found") - ) - } - } - } - - /** - * Register an talerable incoming payment - * - * @param paymentData incoming talerable payment. - * @param reservePub reserve public key. The caller is - * responsible to check it. - */ - suspend fun registerTalerableIncoming( - paymentData: IncomingPayment, - reservePub: EddsaPublicKey - ): IncomingRegistrationResult = conn { conn -> - val stmt = conn.prepareStatement(""" - SELECT out_found, out_tx_id - FROM register_incoming_and_talerable( - (?,?)::taler_amount - ,? - ,? - ,? - ,? - ,? - )""" - ) - val executionTime = paymentData.executionTime.toDbMicros() - ?: throw Exception("Could not convert payment execution time from Instant to microseconds.") - stmt.setLong(1, paymentData.amount.value) - stmt.setInt(2, paymentData.amount.frac) - stmt.setString(3, paymentData.wireTransferSubject) - stmt.setLong(4, executionTime) - stmt.setString(5, paymentData.debitPaytoUri) - stmt.setString(6, paymentData.bankId) - stmt.setBytes(7, reservePub.raw) - stmt.executeQuery().use { - when { - !it.next() -> throw Exception("Inserting talerable incoming payment gave no outcome") - else -> IncomingRegistrationResult( - it.getLong("out_tx_id"), - !it.getBoolean("out_found") - ) - } - } - } - - /** - * Get the last execution time of outgoing transactions. - * - * @return [Instant] or null if no results were found - */ - suspend fun outgoingPaymentLastExecTime(): Instant? = conn { conn -> - val stmt = conn.prepareStatement( - "SELECT MAX(execution_time) as latest_execution_time FROM outgoing_transactions" - ) - stmt.executeQuery().use { - if (!it.next()) return@conn null - val timestamp = it.getLong("latest_execution_time") - if (timestamp == 0L) return@conn null - return@conn timestamp.microsToJavaInstant() - ?: throw Exception("Could not convert latest_execution_time to Instant") - } - } - - /** - * Get the last execution time of an incoming transaction. - * - * @return [Instant] or null if no results were found - */ - suspend fun incomingPaymentLastExecTime(): Instant? = conn { conn -> - val stmt = conn.prepareStatement( - "SELECT MAX(execution_time) as latest_execution_time FROM incoming_transactions" - ) - stmt.executeQuery().use { - if (!it.next()) return@conn null - val timestamp = it.getLong("latest_execution_time") - if (timestamp == 0L) return@conn null - return@conn timestamp.microsToJavaInstant() - ?: throw Exception("Could not convert latest_execution_time to Instant") - } - } - - /** - * Checks if the reserve public key already exists. - * - * @param maybeReservePub reserve public key to look up - * @return true if found, false otherwise - */ - suspend fun isReservePubFound(maybeReservePub: EddsaPublicKey): Boolean = conn { conn -> - val stmt = conn.prepareStatement(""" - SELECT 1 - FROM talerable_incoming_transactions - WHERE reserve_public_key = ?; - """) - stmt.setBytes(1, maybeReservePub.raw) - val res = stmt.executeQuery() - res.use { - return@conn it.next() - } - } - - // INITIATED PAYMENTS METHODS - - /** - * Represents all the states but "unsubmitted" related to an - * initiated payment. Unsubmitted gets set by default by the - * database and there's no case where it has to be reset to an - * initiated payment. - */ - - /** - * Sets the submission state of an initiated payment. Transparently - * sets the last_submission_time column too, as this corresponds to the - * time when we set the state. - * - * @param rowId row ID of the record to set. - * @param submissionState which state to set. - * @return true on success, false if no payment was affected. - */ - suspend fun initiatedPaymentSetSubmittedState( - rowId: Long, - submissionState: DatabaseSubmissionState - ): Boolean = conn { conn -> - val stmt = conn.prepareStatement(""" - UPDATE initiated_outgoing_transactions - SET submitted = submission_state(?), last_submission_time = ? - WHERE initiated_outgoing_transaction_id = ? - """ - ) - val now = Instant.now() - stmt.setString(1, submissionState.name) - stmt.setLong(2, now.toDbMicros() ?: run { - throw Exception("Submission time could not be converted to microseconds for the database.") - }) - stmt.setLong(3, rowId) - return@conn stmt.maybeUpdate() - } - - /** - * Sets the failure reason to an initiated payment. - * - * @param rowId row ID of the record to set. - * @param failureMessage error associated to this initiated payment. - * @return true on success, false if no payment was affected. - */ - suspend fun initiatedPaymentSetFailureMessage(rowId: Long, failureMessage: String): Boolean = conn { conn -> - val stmt = conn.prepareStatement(""" - UPDATE initiated_outgoing_transactions - SET failure_message = ? - WHERE initiated_outgoing_transaction_id=? - """ - ) - stmt.setString(1, failureMessage) - stmt.setLong(2, rowId) - return@conn stmt.maybeUpdate() - } - - /** - * Gets any initiated payment that was not submitted to the - * bank yet. - * - * @param currency in which currency should the payment be submitted to the bank. - * @return [Map] of the initiated payment row ID and [InitiatedPayment] - */ - suspend fun initiatedPaymentsSubmittableGet(currency: String): List<InitiatedPayment> = conn { conn -> - val stmt = conn.prepareStatement(""" - SELECT - initiated_outgoing_transaction_id - ,(amount).val as amount_val - ,(amount).frac as amount_frac - ,wire_transfer_subject - ,credit_payto_uri - ,initiation_time - ,request_uid - FROM initiated_outgoing_transactions - WHERE (submitted='unsubmitted' OR submitted='transient_failure') - AND ((amount).val != 0 OR (amount).frac != 0); - """) - stmt.all { - val rowId = it.getLong("initiated_outgoing_transaction_id") - val initiationTime = it.getLong("initiation_time").microsToJavaInstant() - if (initiationTime == null) { // nexus fault - throw Exception("Found invalid timestamp at initiated payment with ID: $rowId") - } - InitiatedPayment( - id = it.getLong("initiated_outgoing_transaction_id"), - amount = it.getAmount("amount", currency), - creditPaytoUri = it.getString("credit_payto_uri"), - wireTransferSubject = it.getString("wire_transfer_subject"), - initiationTime = initiationTime, - requestUid = it.getString("request_uid") - ) - } - } - /** - * Initiate a payment in the database. The "submit" - * command is then responsible to pick it up and submit - * it to the bank. - * - * @param paymentData any data that's used to prepare the payment. - * @return true if the insertion went through, false in case of errors. - */ - suspend fun initiatedPaymentCreate(paymentData: InitiatedPayment): PaymentInitiationOutcome = conn { conn -> - val stmt = conn.prepareStatement(""" - INSERT INTO initiated_outgoing_transactions ( - amount - ,wire_transfer_subject - ,credit_payto_uri - ,initiation_time - ,request_uid - ) VALUES ( - (?,?)::taler_amount - ,? - ,? - ,? - ,? - ) - """) - stmt.setLong(1, paymentData.amount.value) - stmt.setInt(2, paymentData.amount.frac) - stmt.setString(3, paymentData.wireTransferSubject) - stmt.setString(4, paymentData.creditPaytoUri.toString()) - val initiationTime = paymentData.initiationTime.toDbMicros() ?: run { - throw Exception("Initiation time could not be converted to microseconds for the database.") - } - stmt.setLong(5, initiationTime) - stmt.setString(6, paymentData.requestUid) // can be null. - if (stmt.maybeUpdate()) - return@conn PaymentInitiationOutcome.SUCCESS - /** - * _very_ likely, Nexus didn't check the request idempotency, - * as the row ID would never fall into the following problem. - */ - return@conn PaymentInitiationOutcome.UNIQUE_CONSTRAINT_VIOLATION - } - - /** - * Gets the ID of an initiated payment. Useful to link it to its - * outgoing payment witnessed in a bank record. - * - * @param uid UID as given by Nexus when it initiated the payment. - * This value then gets specified as the MsgId of pain.001, - * and it gets associated by the bank to the booked entries - * in camt.05x reports. - * @return the initiated payment row ID, or null if not found. NOTE: - * null gets returned even when the initiated payment exists, - * *but* it was NOT flagged as submitted. - */ - suspend fun initiatedPaymentGetFromUid(uid: String): Long? = conn { conn -> - val stmt = conn.prepareStatement(""" - SELECT initiated_outgoing_transaction_id - FROM initiated_outgoing_transactions - WHERE request_uid = ? AND submitted = 'success'; - """) - stmt.setString(1, uid) - val res = stmt.executeQuery() - res.use { - if (!it.next()) return@conn null - return@conn it.getLong("initiated_outgoing_transaction_id") - } - } -}
\ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt index ca7b9116..765037e4 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt @@ -28,6 +28,7 @@ import io.ktor.client.plugins.* import kotlinx.coroutines.* import tech.libeufin.common.* import tech.libeufin.nexus.ebics.* +import tech.libeufin.nexus.db.* import java.io.IOException import java.io.InputStream import java.time.Instant @@ -91,7 +92,7 @@ suspend fun ingestOutgoingPayment( db: Database, payment: OutgoingPayment ) { - val result = db.registerOutgoing(payment) + val result = db.payment.registerOutgoing(payment) if (result.new) { if (result.initiated) logger.info("$payment") @@ -117,7 +118,7 @@ suspend fun ingestIncomingPayment( ) { runCatching { parseIncomingTxMetadata(payment.wireTransferSubject) }.fold( onSuccess = { reservePub -> - val result = db.registerTalerableIncoming(payment, reservePub) + val result = db.payment.registerTalerableIncoming(payment, reservePub) if (result.new) { logger.info("$payment") } else { @@ -125,7 +126,7 @@ suspend fun ingestIncomingPayment( } }, onFailure = { e -> - val result = db.registerMalformedIncoming( + val result = db.payment.registerMalformedIncoming( payment, payment.amount, Instant.now() @@ -139,21 +140,7 @@ suspend fun ingestIncomingPayment( ) } -/** - * Ingests an outgoing payment bounce. - * - * @param db database handle. - * @param reversal reversal ingest. - */ -suspend fun ingestReversal( - db: Database, - reversal: OutgoingReversal -) { - logger.warn("BOUNCE '${reversal.bankId}': ${reversal.reason}") - // TODO store in db= -} - -private fun ingestDocument( +private suspend fun ingestDocument( db: Database, currency: String, xml: InputStream, @@ -162,15 +149,13 @@ private fun ingestDocument( when (whichDocument) { SupportedDocument.CAMT_054 -> { try { - val notifications = mutableListOf<TxNotification>() - parseTxNotif(xml, currency, notifications) - - runBlocking { - notifications.forEach { - when (it) { - is TxNotification.Incoming -> ingestIncomingPayment(db, it.payment) - is TxNotification.Outgoing -> ingestOutgoingPayment(db, it.payment) - is TxNotification.Reversal -> ingestReversal(db, it.reversal) + parseTxNotif(xml, currency).forEach { + when (it) { + is TxNotification.Incoming -> ingestIncomingPayment(db, it.payment) + is TxNotification.Outgoing -> ingestOutgoingPayment(db, it.payment) + is TxNotification.Reversal -> { + logger.warn("BOUNCE '${it.msgId}': ${it.reason}") + db.initiated.reversal(it.msgId, "Payment bounced: ${it.reason}") } } } @@ -181,42 +166,38 @@ private fun ingestDocument( SupportedDocument.PAIN_002_LOGS -> { val acks = parseCustomerAck(xml) for (ack in acks) { - val msg = if (ack.orderId != null) { - if (ack.code != null) { - val msg = ack.msg() - db.mem[ack.orderId] = msg - msg - } else { - db.mem[ack.orderId] - } - } else { - null - } when (ack.actionType) { - HacAction.FILE_DOWNLOAD -> logger.debug("$ack") HacAction.ORDER_HAC_FINAL_POS -> { - // TODO update pending transaction status logger.debug("$ack") - logger.info("Order '${ack.orderId}' was accepted at ${ack.timestamp.fmtDateTime()}") + db.initiated.logSuccess(ack.orderId!!)?.let { requestUID -> + logger.info("Payment '$requestUID' accepted at ${ack.timestamp.fmtDateTime()}") + } } HacAction.ORDER_HAC_FINAL_NEG -> { - // TODO update pending transaction status logger.debug("$ack") - logger.warn("Order '${ack.orderId}' was refused at ${ack.timestamp.fmtDateTime()}: $msg") + db.initiated.logFailure(ack.orderId!!)?.let { (requestUID, msg) -> + logger.warn("Payment '$requestUID' refused at ${ack.timestamp.fmtDateTime()}${if (msg != null) ": $msg" else ""}") + } } else -> { - // TODO update pending transaction status logger.debug("$ack") + if (ack.orderId != null) { + db.initiated.logMessage(ack.orderId, ack.msg()) + } } } } } SupportedDocument.PAIN_002 -> { val status = parseCustomerPaymentStatusReport(xml) - if (status.paymentCode == ExternalPaymentGroupStatusCode.RJCT) - logger.warn("Transaction '${status.id()}' was rejected") - // TODO update pending transaction status + val msg = status.msg() logger.debug("$status") + if (status.paymentCode == ExternalPaymentGroupStatusCode.RJCT) { + db.initiated.bankFailure(status.msgId, msg) + logger.warn("Transaction '${status.msgId}' was rejected : $msg") + } else { + db.initiated.bankMessage(status.msgId, msg) + } } SupportedDocument.CAMT_053, SupportedDocument.CAMT_052 -> { @@ -226,7 +207,7 @@ private fun ingestDocument( } } -private fun ingestDocuments( +private suspend fun ingestDocuments( db: Database, currency: String, content: InputStream, diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt index 82c1a459..656ce694 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt @@ -26,6 +26,7 @@ import io.ktor.client.* import kotlinx.coroutines.* import tech.libeufin.common.* import tech.libeufin.nexus.ebics.* +import tech.libeufin.nexus.db.* import java.time.* import java.util.* @@ -109,19 +110,19 @@ private suspend fun submitBatch( ctx: SubmissionContext, db: Database, ) { - logger.debug("Running submit at: ${Instant.now()}") - db.initiatedPaymentsSubmittableGet(ctx.cfg.currency).forEach { - logger.debug("Submitting payment initiation with row ID: ${it.id}") - val submissionState = try { - val orderId = submitInitiatedPayment(ctx, it) - db.mem[orderId] = "Init" - DatabaseSubmissionState.success - } catch (e: Exception) { - e.fmtLog(logger) - DatabaseSubmissionState.transient_failure - // TODO - } - db.initiatedPaymentSetSubmittedState(it.id, submissionState) + db.initiated.submittableGet(ctx.cfg.currency).forEach { + logger.debug("Submitting payment '${it.requestUid}'") + runCatching { submitInitiatedPayment(ctx, it) }.fold( + onSuccess = { orderId -> + db.initiated.submissionSuccess(it.id, Instant.now(), orderId) + logger.info("Payment '${it.requestUid}' submitted") + }, + onFailure = { e -> + db.initiated.submissionFailure(it.id, Instant.now(), e.message) + logger.warn("Payment '${it.requestUid}' submission failure: ${e.message}") + throw e + } + ) } } @@ -171,8 +172,11 @@ class EbicsSubmit : CliktCommand("Submits any initiated payment found in the dat } } do { - // TODO error handling - submitBatch(ctx, db) + try { + submitBatch(ctx, db) + } catch (e: Exception) { + throw Exception("Failed to submit payments", e) + } // TODO take submitBatch taken time in the delay delay(((frequency?.inSeconds ?: 0) * 1000).toLong()) } while (frequency != null) diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Iso20022.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Iso20022.kt index a9f44077..6269377a 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/Iso20022.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Iso20022.kt @@ -143,11 +143,7 @@ data class CustomerAck( } } -/** - * Extract logs from a pain.002 HAC document. - * - * @param xml pain.002 input document - */ +/** Parse HAC pain.002 XML file */ fun parseCustomerAck(xml: InputStream): List<CustomerAck> { return destructXml(xml, "Document") { one("CstmrPmtStsRpt").map("OrgnlPmtInfAndSts") { @@ -192,19 +188,21 @@ data class PaymentStatus( fun description(): String = txCode?.description ?: paymentCode.description - override fun toString(): String { + fun msg(): String { return if (reasons.isEmpty()) { - "'${id()}' ${code()} '${description()}'" + "${code()} '${description()}'" } else if (reasons.size == 1) { - "'${id()}' ${code()} ${reasons[0].code.isoCode} - '${description()}' '${reasons[0].code.description}'" + "${code()} ${reasons[0].code.isoCode} - '${description()}' '${reasons[0].code.description}'" } else { - var str = "'${id()}' ${code()} '${description()}' - " + var str = "${code()} '${description()}' - " for (reason in reasons) { - str += "${reason.code.isoCode} '${reason.code.description}'" + str += "${reason.code.isoCode} '${reason.code.description}' " } str } } + + override fun toString(): String = "${id()} ${msg()}" } data class Reason ( @@ -212,11 +210,7 @@ data class Reason ( val information: String ) -/** - * Extract payment status from a pain.002 document. - * - * @param xml pain.002 input document - */ +/** Parse pain.002 XML file */ fun parseCustomerPaymentStatusReport(xml: InputStream): PaymentStatus { fun XmlDestructor.reasons(): List<Reason> { return map("StsRsnInf") { @@ -252,28 +246,74 @@ fun parseCustomerPaymentStatusReport(xml: InputStream): PaymentStatus { sealed interface TxNotification { data class Incoming(val payment: IncomingPayment): TxNotification data class Outgoing(val payment: OutgoingPayment): TxNotification - data class Reversal(val reversal: OutgoingReversal): TxNotification + data class Reversal( + val msgId: String, + val reason: String? + ): TxNotification } -data class OutgoingReversal( - val bankId: String, - val reason: String? -) +/** ISO20022 incoming payment */ +data class IncomingPayment( + val amount: TalerAmount, + val wireTransferSubject: String, + val debitPaytoUri: String, + val executionTime: Instant, + /** ISO20022 AccountServicerReference */ + val bankId: String +) { + override fun toString(): String { + return "IN ${executionTime.fmtDate()} $amount '$bankId' debitor=$debitPaytoUri subject=$wireTransferSubject" + } +} -/** - * Searches payments in a camt.054 (Detailavisierung) document. - * - * @param notifXml camt.054 input document - * @param acceptedCurrency currency accepted by Nexus - * @param incoming list of incoming payments - * @param outgoing list of outgoing payments - */ +/** ISO20022 outgoing payment */ +data class OutgoingPayment( + val amount: TalerAmount, + val executionTime: Instant, + /** ISO20022 MessageIdentification */ + val messageId: String, + val creditPaytoUri: String? = null, // not showing in camt.054 + val wireTransferSubject: String? = null // not showing in camt.054 +) { + override fun toString(): String { + return "OUT ${executionTime.fmtDate()} $amount '$messageId' creditor=$creditPaytoUri subject=$wireTransferSubject" + } +} + +/** Parse camt.054 XML file */ fun parseTxNotif( notifXml: InputStream, - acceptedCurrency: String, - notifications: MutableList<TxNotification>, -) { - notificationForEachTx(notifXml) { bookDate, reversal, info -> + acceptedCurrency: String +): List<TxNotification> { + fun notificationForEachTx( + directionLambda: XmlDestructor.(Instant, Boolean, String?) -> Unit + ) { + destructXml(notifXml, "Document") { + opt("BkToCstmrDbtCdtNtfctn")?.each("Ntfctn") { + each("Ntry") { + val reversal = opt("RvslInd")?.bool() ?: false + val info = opt("AddtlNtryInf")?.text() + one("Sts") { + if (text() != "BOOK") { + one("Cd") { + if (text() != "BOOK") + throw Exception("Found non booked transaction, " + + "stop parsing. Status was: ${text()}" + ) + } + } + } + val bookDate: Instant = one("BookgDt").one("Dt").date().atStartOfDay().toInstant(ZoneOffset.UTC) + one("NtryDtls").each("TxDtls") { + directionLambda(this, bookDate, reversal, info) + } + } + } + } + } + + val notifications = mutableListOf<TxNotification>() + notificationForEachTx { bookDate, reversal, info -> val kind = one("CdtDbtInd").text() val amount: TalerAmount = one("Amt") { val currency = attr("Ccy") @@ -289,10 +329,10 @@ fun parseTxNotif( if (msgId == null) { logger.debug("Unsupported reversal without message id") } else { - notifications.add(TxNotification.Reversal(OutgoingReversal( - bankId = msgId, + notifications.add(TxNotification.Reversal( + msgId = msgId, reason = info - ))) + )) } return@notificationForEachTx } @@ -341,38 +381,5 @@ fun parseTxNotif( else -> throw Exception("Unknown transaction notification kind '$kind'") } } -} - -/** - * Navigates the camt.054 (Detailavisierung) until its leaves, where - * then it invokes the related parser, according to the payment direction. - * - * @param xml the input document. - */ -private fun notificationForEachTx( - xml: InputStream, - directionLambda: XmlDestructor.(Instant, Boolean, String?) -> Unit -) { - destructXml(xml, "Document") { - opt("BkToCstmrDbtCdtNtfctn")?.each("Ntfctn") { - each("Ntry") { - val reversal = opt("RvslInd")?.bool() ?: false - val info = opt("AddtlNtryInf")?.text() - one("Sts") { - if (text() != "BOOK") { - one("Cd") { - if (text() != "BOOK") - throw Exception("Found non booked transaction, " + - "stop parsing. Status was: ${text()}" - ) - } - } - } - val bookDate: Instant = one("BookgDt").one("Dt").date().atStartOfDay().toInstant(ZoneOffset.UTC) - one("NtryDtls").each("TxDtls") { - directionLambda(this, bookDate, reversal, info) - } - } - } - } + return notifications }
\ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt index 14c0f27c..8a51c766 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/Main.kt @@ -37,8 +37,11 @@ import org.slf4j.Logger import org.slf4j.LoggerFactory import tech.libeufin.common.* import tech.libeufin.nexus.ebics.* +import tech.libeufin.nexus.db.* import java.nio.file.Path -import java.time.Instant +import java.util.* +import java.time.* +import java.time.format.* val NEXUS_CONFIG_SOURCE = ConfigSource("libeufin", "libeufin-nexus", "libeufin-nexus") internal val logger: Logger = LoggerFactory.getLogger("libeufin-nexus") @@ -126,6 +129,11 @@ fun checkFrequency(foundInConfig: String): Int { return frequencySeconds } +fun Instant.fmtDate(): String = + DateTimeFormatter.ISO_LOCAL_DATE.withZone(ZoneId.of("UTC")).format(this) + +fun Instant.fmtDateTime(): String = + DateTimeFormatter.ISO_LOCAL_DATE_TIME.withZone(ZoneId.of("UTC")).format(this) /** * Keeps all the options of the ebics-setup subcommand. The @@ -251,7 +259,7 @@ class InitiatePayment: CliktCommand("Initiate an outgoing payment") { } Database(dbCfg.dbConnStr).use { db -> - db.initiatedPaymentCreate( + db.initiated.create( InitiatedPayment( id = -1, amount = amount, diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt new file mode 100644 index 00000000..2827b5f3 --- /dev/null +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt @@ -0,0 +1,49 @@ +/* + * This file is part of LibEuFin. + * Copyright (C) 2024 Taler Systems S.A. + + * LibEuFin is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation; either version 3, or + * (at your option) any later version. + + * LibEuFin is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General + * Public License for more details. + + * You should have received a copy of the GNU Affero General Public + * License along with LibEuFin; see the file COPYING. If not, see + * <http://www.gnu.org/licenses/> + */ +package tech.libeufin.nexus.db + +import org.postgresql.util.PSQLState +import tech.libeufin.common.* +import tech.libeufin.nexus.* +import java.sql.PreparedStatement +import java.sql.SQLException +import java.text.SimpleDateFormat +import java.time.Instant +import java.util.* + +/** + * Minimal set of information to initiate a new payment in + * the database. + */ +data class InitiatedPayment( + val id: Long, + val amount: TalerAmount, + val wireTransferSubject: String, + val creditPaytoUri: String, + val initiationTime: Instant, + val requestUid: String +) + +/** + * Collects database connection steps and any operation on the Nexus tables. + */ +class Database(dbConfig: String): DbPool(dbConfig, "libeufin_nexus") { + val payment = PaymentDAO(this) + val initiated = InitiatedDAO(this) +}
\ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt new file mode 100644 index 00000000..efe98f82 --- /dev/null +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt @@ -0,0 +1,203 @@ +/* + * This file is part of LibEuFin. + * Copyright (C) 2024 Taler Systems S.A. + + * LibEuFin is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation; either version 3, or + * (at your option) any later version. + + * LibEuFin is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General + * Public License for more details. + + * You should have received a copy of the GNU Affero General Public + * License along with LibEuFin; see the file COPYING. If not, see + * <http://www.gnu.org/licenses/> + */ + +package tech.libeufin.nexus.db + +import tech.libeufin.nexus.* +import tech.libeufin.common.* +import java.time.Instant + +/** Data access logic for initiated outgoing payments */ +class InitiatedDAO(private val db: Database) { + + /** Outgoing payments initiation result */ + enum class PaymentInitiationResult { + REQUEST_UID_REUSE, + SUCCESS + } + + /** Register a new pending payment in the database */ + suspend fun create(paymentData: InitiatedPayment): PaymentInitiationResult = db.conn { conn -> + val stmt = conn.prepareStatement(""" + INSERT INTO initiated_outgoing_transactions ( + amount + ,wire_transfer_subject + ,credit_payto_uri + ,initiation_time + ,request_uid + ) VALUES ((?,?)::taler_amount,?,?,?,?) + """) + stmt.setLong(1, paymentData.amount.value) + stmt.setInt(2, paymentData.amount.frac) + stmt.setString(3, paymentData.wireTransferSubject) + stmt.setString(4, paymentData.creditPaytoUri.toString()) + val initiationTime = paymentData.initiationTime.toDbMicros() ?: run { + throw Exception("Initiation time could not be converted to microseconds for the database.") + } + stmt.setLong(5, initiationTime) + stmt.setString(6, paymentData.requestUid) + if (stmt.executeUpdateViolation()) + return@conn PaymentInitiationResult.SUCCESS + return@conn PaymentInitiationResult.REQUEST_UID_REUSE + } + + /** Register EBICS submission success */ + suspend fun submissionSuccess( + id: Long, + now: Instant, + orderId: String + ) = db.conn { conn -> + val stmt = conn.prepareStatement(""" + UPDATE initiated_outgoing_transactions SET + submitted = 'success'::submission_state + ,last_submission_time = ? + ,failure_message = NULL + ,order_id = ? + ,submission_counter = submission_counter + 1 + WHERE initiated_outgoing_transaction_id = ? + """) + stmt.setLong(1, now.toDbMicros()!!) + stmt.setString(2, orderId) + stmt.setLong(3, id) + stmt.execute() + } + + /** Register EBICS submission failure */ + suspend fun submissionFailure( + id: Long, + now: Instant, + msg: String? + ) = db.conn { conn -> + val stmt = conn.prepareStatement(""" + UPDATE initiated_outgoing_transactions SET + submitted = 'transient_failure'::submission_state + ,last_submission_time = ? + ,failure_message = ? + ,submission_counter = submission_counter + 1 + WHERE initiated_outgoing_transaction_id = ? + """) + stmt.setLong(1, now.toDbMicros()!!) + stmt.setString(2, msg) + stmt.setLong(3, id) + stmt.execute() + } + + /** Register EBICS log status message */ + suspend fun logMessage(orderId: String, msg: String) = db.conn { conn -> + val stmt = conn.prepareStatement(""" + UPDATE initiated_outgoing_transactions SET failure_message = ? + WHERE order_id = ? + """) + stmt.setString(1, msg) + stmt.setString(2, orderId) + stmt.execute() + } + + /** Register EBICS log success and return request_uid if found */ + suspend fun logSuccess(orderId: String): String? = db.conn { conn -> + val stmt = conn.prepareStatement(""" + SELECT request_uid FROM initiated_outgoing_transactions + WHERE order_id = ? + """) + stmt.setString(1, orderId) + stmt.oneOrNull { it.getString(1) } + } + + /** Register EBICS log failure and return request_uid and previous message if found */ + suspend fun logFailure(orderId: String): Pair<String, String?>? = db.conn { conn -> + val stmt = conn.prepareStatement(""" + UPDATE initiated_outgoing_transactions + SET submitted = 'permanent_failure'::submission_state + WHERE order_id = ? + RETURNING request_uid, failure_message + """) + stmt.setString(1, orderId) + stmt.oneOrNull { Pair(it.getString(1), it.getString(2)) } + } + + /** Register bank status message */ + suspend fun bankMessage(requestUID: String, msg: String) = db.conn { conn -> + val stmt = conn.prepareStatement(""" + UPDATE initiated_outgoing_transactions + SET failure_message = ? + WHERE request_uid = ? + """) + stmt.setString(1, msg) + stmt.setString(2, requestUID) + stmt.execute() + } + + /** Register bank failure */ + suspend fun bankFailure(requestUID: String, msg: String) = db.conn { conn -> + val stmt = conn.prepareStatement(""" + UPDATE initiated_outgoing_transactions SET + submitted = 'permanent_failure'::submission_state + ,failure_message = ? + WHERE request_uid = ? + """) + stmt.setString(1, msg) + stmt.setString(2, requestUID) + stmt.execute() + } + + /** Register reversal */ + suspend fun reversal(requestUID: String, msg: String) = db.conn { conn -> + val stmt = conn.prepareStatement(""" + UPDATE initiated_outgoing_transactions SET + submitted = 'permanent_failure'::submission_state + ,failure_message = ? + WHERE request_uid = ? + """) + stmt.setString(1, msg) + stmt.setString(2, requestUID) + stmt.execute() + } + + // TODO WIP + suspend fun submittableGet(currency: String): List<InitiatedPayment> = db.conn { conn -> + val stmt = conn.prepareStatement(""" + SELECT + initiated_outgoing_transaction_id + ,(amount).val as amount_val + ,(amount).frac as amount_frac + ,wire_transfer_subject + ,credit_payto_uri + ,initiation_time + ,request_uid + FROM initiated_outgoing_transactions + WHERE (submitted='unsubmitted' OR submitted='transient_failure') + AND ((amount).val != 0 OR (amount).frac != 0); + """) + stmt.all { + val rowId = it.getLong("initiated_outgoing_transaction_id") + val initiationTime = it.getLong("initiation_time").microsToJavaInstant() + if (initiationTime == null) { // nexus fault + throw Exception("Found invalid timestamp at initiated payment with ID: $rowId") + } + InitiatedPayment( + id = it.getLong("initiated_outgoing_transaction_id"), + amount = it.getAmount("amount", currency), + creditPaytoUri = it.getString("credit_payto_uri"), + wireTransferSubject = it.getString("wire_transfer_subject"), + initiationTime = initiationTime, + requestUid = it.getString("request_uid") + ) + } + } +}
\ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/PaymentDAO.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/PaymentDAO.kt new file mode 100644 index 00000000..2e315f38 --- /dev/null +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/db/PaymentDAO.kt @@ -0,0 +1,128 @@ +/* + * This file is part of LibEuFin. + * Copyright (C) 2024 Taler Systems S.A. + + * LibEuFin is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation; either version 3, or + * (at your option) any later version. + + * LibEuFin is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General + * Public License for more details. + + * You should have received a copy of the GNU Affero General Public + * License along with LibEuFin; see the file COPYING. If not, see + * <http://www.gnu.org/licenses/> + */ + +package tech.libeufin.nexus.db + +import tech.libeufin.nexus.* +import tech.libeufin.common.* +import java.time.Instant + +/** Data access logic for incoming & outgoing payments */ +class PaymentDAO(private val db: Database) { + /** Outgoing payments registration result */ + data class OutgoingRegistrationResult( + val id: Long, + val initiated: Boolean, + val new: Boolean + ) + + /** Register an outgoing payment reconciling it with its initiated payment counterpart if present */ + suspend fun registerOutgoing(paymentData: OutgoingPayment): OutgoingRegistrationResult = db.conn { + val stmt = it.prepareStatement(""" + SELECT out_tx_id, out_initiated, out_found + FROM register_outgoing((?,?)::taler_amount,?,?,?,?) + """) + val executionTime = paymentData.executionTime.toDbMicros() + ?: throw Exception("Could not convert outgoing payment execution_time to microseconds") + stmt.setLong(1, paymentData.amount.value) + stmt.setInt(2, paymentData.amount.frac) + stmt.setString(3, paymentData.wireTransferSubject) + stmt.setLong(4, executionTime) + stmt.setString(5, paymentData.creditPaytoUri) + stmt.setString(6, paymentData.messageId) + stmt.one { + OutgoingRegistrationResult( + it.getLong("out_tx_id"), + it.getBoolean("out_initiated"), + !it.getBoolean("out_found") + ) + } + } + + /** Incoming payments bounce registration result */ + data class IncomingBounceRegistrationResult( + val id: Long, + val bounceId: String, + val new: Boolean + ) + + /** Register an incoming payment and bounce it */ + suspend fun registerMalformedIncoming( + paymentData: IncomingPayment, + bounceAmount: TalerAmount, + now: Instant + ): IncomingBounceRegistrationResult = db.conn { + val stmt = it.prepareStatement(""" + SELECT out_found, out_tx_id, out_bounce_id + FROM register_incoming_and_bounce((?,?)::taler_amount,?,?,?,?,(?,?)::taler_amount,?) + """) + val refundTimestamp = now.toDbMicros() + ?: throw Exception("Could not convert refund execution time from Instant.now() to microsends.") + val executionTime = paymentData.executionTime.toDbMicros() + ?: throw Exception("Could not convert payment execution time from Instant to microseconds.") + stmt.setLong(1, paymentData.amount.value) + stmt.setInt(2, paymentData.amount.frac) + stmt.setString(3, paymentData.wireTransferSubject) + stmt.setLong(4, executionTime) + stmt.setString(5, paymentData.debitPaytoUri) + stmt.setString(6, paymentData.bankId) + stmt.setLong(7, bounceAmount.value) + stmt.setInt(8, bounceAmount.frac) + stmt.setLong(9, refundTimestamp) + stmt.one { + IncomingBounceRegistrationResult( + it.getLong("out_tx_id"), + it.getString("out_bounce_id"), + !it.getBoolean("out_found") + ) + } + } + + /** Incoming payments registration result */ + data class IncomingRegistrationResult( + val id: Long, + val new: Boolean + ) + + /** Register an talerable incoming payment */ + suspend fun registerTalerableIncoming( + paymentData: IncomingPayment, + reservePub: EddsaPublicKey + ): IncomingRegistrationResult = db.conn { conn -> + val stmt = conn.prepareStatement(""" + SELECT out_found, out_tx_id + FROM register_incoming_and_talerable((?,?)::taler_amount,?,?,?,?,?) + """) + val executionTime = paymentData.executionTime.toDbMicros() + ?: throw Exception("Could not convert payment execution time from Instant to microseconds.") + stmt.setLong(1, paymentData.amount.value) + stmt.setInt(2, paymentData.amount.frac) + stmt.setString(3, paymentData.wireTransferSubject) + stmt.setLong(4, executionTime) + stmt.setString(5, paymentData.debitPaytoUri) + stmt.setString(6, paymentData.bankId) + stmt.setBytes(7, reservePub.raw) + stmt.one { + IncomingRegistrationResult( + it.getLong("out_tx_id"), + !it.getBoolean("out_found") + ) + } + } +}
\ No newline at end of file diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsBTS.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsBTS.kt index 356d4b96..a6466366 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsBTS.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsBTS.kt @@ -33,8 +33,10 @@ import javax.xml.datatype.DatatypeFactory import java.security.interfaces.* -fun Instant.xmlDate(): String = DateTimeFormatter.ISO_DATE.withZone(ZoneId.of("UTC")).format(this) -fun Instant.xmlDateTime(): String = DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(ZoneId.of("UTC")).format(this) +fun Instant.xmlDate(): String = + DateTimeFormatter.ISO_DATE.withZone(ZoneId.of("UTC")).format(this) +fun Instant.xmlDateTime(): String = + DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(ZoneId.of("UTC")).format(this) /** EBICS protocol for business transactions */ class EbicsBTS( diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt index ae4233c4..77cefeb3 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/ebics/EbicsCommon.kt @@ -173,7 +173,7 @@ suspend fun ebicsDownload( order: EbicsOrder, startDate: Instant?, endDate: Instant?, - processing: (InputStream) -> Unit, + processing: suspend (InputStream) -> Unit, ) = coroutineScope { val impl = EbicsBTS(cfg, bankKeys, clientKeys, order) val parentScope = this diff --git a/nexus/src/test/kotlin/Common.kt b/nexus/src/test/kotlin/Common.kt index 892f7af9..8925a6a8 100644 --- a/nexus/src/test/kotlin/Common.kt +++ b/nexus/src/test/kotlin/Common.kt @@ -26,6 +26,7 @@ import tech.libeufin.common.fromFile import tech.libeufin.common.initializeDatabaseTables import tech.libeufin.common.resetDatabaseTables import tech.libeufin.nexus.* +import tech.libeufin.nexus.db.* import java.time.Instant import kotlin.io.path.Path diff --git a/nexus/src/test/kotlin/DatabaseTest.kt b/nexus/src/test/kotlin/DatabaseTest.kt index 1f27a782..c9d70ae0 100644 --- a/nexus/src/test/kotlin/DatabaseTest.kt +++ b/nexus/src/test/kotlin/DatabaseTest.kt @@ -19,9 +19,10 @@ import org.junit.Test import tech.libeufin.common.* -import tech.libeufin.nexus.DatabaseSubmissionState -import tech.libeufin.nexus.InitiatedPayment -import tech.libeufin.nexus.PaymentInitiationOutcome +import tech.libeufin.nexus.* +import tech.libeufin.nexus.db.* +import tech.libeufin.nexus.db.InitiatedDAO.* +import tech.libeufin.nexus.db.PaymentDAO.* import java.time.Instant import kotlin.random.Random import kotlin.test.* @@ -32,25 +33,25 @@ class OutgoingPaymentsTest { // With reconciling genOutPay("paid by nexus", "first").run { assertEquals( - PaymentInitiationOutcome.SUCCESS, - db.initiatedPaymentCreate(genInitPay("waiting for reconciliation", "first")) + PaymentInitiationResult.SUCCESS, + db.initiated.create(genInitPay("waiting for reconciliation", "first")) ) - db.registerOutgoing(this).run { + db.payment.registerOutgoing(this).run { assertTrue(new,) assertTrue(initiated) } - db.registerOutgoing(this).run { + db.payment.registerOutgoing(this).run { assertFalse(new) assertTrue(initiated) } } // Without reconciling genOutPay("not paid by nexus", "second").run { - db.registerOutgoing(this).run { + db.payment.registerOutgoing(this).run { assertTrue(new) assertFalse(initiated) } - db.registerOutgoing(this).run { + db.payment.registerOutgoing(this).run { assertFalse(new) assertFalse(initiated) } @@ -64,14 +65,14 @@ class IncomingPaymentsTest { fun bounce() = setup { db, _ -> // creating and bouncing one incoming transaction. val payment = genInPay("incoming and bounced") - db.registerMalformedIncoming( + db.payment.registerMalformedIncoming( payment, TalerAmount("KUDOS:2.53"), Instant.now() ).run { assertTrue(new) } - db.registerMalformedIncoming( + db.payment.registerMalformedIncoming( payment, TalerAmount("KUDOS:2.53"), Instant.now() @@ -112,79 +113,60 @@ class IncomingPaymentsTest { ) } } - - // Tests the creation of a talerable incoming payment. - @Test - fun talerable() = setup { db, _ -> - val reservePub = EddsaPublicKey.rand() - - val inc = genInPay("reserve-pub") - // Checking the reserve is not found. - assertFalse(db.isReservePubFound(reservePub)) - assertTrue(db.registerTalerableIncoming(inc, reservePub).new) - // Checking the reserve is not found. - assertTrue(db.isReservePubFound(reservePub)) - assertFalse(db.registerTalerableIncoming(inc, reservePub).new) - } } class PaymentInitiationsTest { - // Testing the insertion of the failure message. @Test - fun setFailureMessage() = setup { db, _ -> + fun status() = setup { db, _ -> assertEquals( - db.initiatedPaymentCreate(genInitPay("not submitted, has row ID == 1")), - PaymentInitiationOutcome.SUCCESS + PaymentInitiationResult.SUCCESS, + db.initiated.create(genInitPay(requestUid = "PAY1")) ) - assertFalse(db.initiatedPaymentSetFailureMessage(3, "3 not existing")) - assertTrue(db.initiatedPaymentSetFailureMessage(1, "expired")) - // Checking the value from the database. - db.conn { conn -> - val idOne = conn.execSQLQuery(""" - SELECT failure_message - FROM initiated_outgoing_transactions - WHERE initiated_outgoing_transaction_id = 1; - """.trimIndent()) - assertTrue(idOne.next()) - val maybeMessage = idOne.getString("failure_message") - assertEquals("expired", maybeMessage) - } - } - // Tests the flagging of payments as submitted. - @Test - fun paymentInitiationSetAsSubmitted() = setup { db, _ -> - val getRowOne = """ - SELECT submitted - FROM initiated_outgoing_transactions - WHERE initiated_outgoing_transaction_id=1 - """ + db.initiated.submissionFailure(1, Instant.now(), "First failure") + db.initiated.submissionFailure(1, Instant.now(), "Second failure") + db.initiated.submissionSuccess(1, Instant.now(), "ORDER1") + assertEquals(Pair("PAY1", null), db.initiated.logFailure("ORDER1")) - // Creating the record first. Defaults to submitted == false. assertEquals( - PaymentInitiationOutcome.SUCCESS, - db.initiatedPaymentCreate(genInitPay("not submitted, has row ID == 1")), + PaymentInitiationResult.SUCCESS, + db.initiated.create(genInitPay(requestUid = "PAY2")) ) - // Asserting on the false default submitted state. - db.conn { conn -> - val isSubmitted = conn.execSQLQuery(getRowOne) - assertTrue(isSubmitted.next()) - assertEquals("unsubmitted", isSubmitted.getString("submitted")) - } - // Switching the submitted state to success. - assertTrue(db.initiatedPaymentSetSubmittedState(1, DatabaseSubmissionState.success)) - // Asserting on the submitted state being TRUE now. - db.conn { conn -> - val isSubmitted = conn.execSQLQuery(getRowOne) - assertTrue(isSubmitted.next()) - assertEquals("success", isSubmitted.getString("submitted")) - } + db.initiated.submissionFailure(2, Instant.now(), "First failure") + db.initiated.submissionSuccess(2, Instant.now(), "ORDER2") + db.initiated.logMessage("ORDER2", "status msg") + assertEquals(Pair("PAY2", "status msg"), db.initiated.logFailure("ORDER2")) + + assertEquals( + PaymentInitiationResult.SUCCESS, + db.initiated.create(genInitPay(requestUid = "PAY3")) + ) + db.initiated.submissionSuccess(3, Instant.now(), "ORDER3") + assertEquals("PAY3", db.initiated.logSuccess("ORDER3")) + + // Unknown order + assertNull(db.initiated.logSuccess("ORDER_X")) + assertNull(db.initiated.logFailure("ORDER_X")) + + assertEquals( + PaymentInitiationResult.SUCCESS, + db.initiated.create(genInitPay(requestUid = "PAY4")) + ) + db.initiated.bankMessage("PAY4", "status progress") + db.initiated.bankFailure("PAY4", "status failure") + + assertEquals( + PaymentInitiationResult.SUCCESS, + db.initiated.create(genInitPay(requestUid = "PAY5")) + ) + db.initiated.bankMessage("PAY5", "status progress") + db.initiated.reversal("PAY5", "status reversal") } // Tests creation, unique constraint violation handling, and // retrieving only one non-submitted payment. @Test fun paymentInitiation() = setup { db, _ -> - val beEmpty = db.initiatedPaymentsSubmittableGet("KUDOS") // expect no records. + val beEmpty = db.initiated.submittableGet("KUDOS") // expect no records. assertEquals(beEmpty.size, 0) val initPay = InitiatedPayment( id = -1, @@ -194,17 +176,14 @@ class PaymentInitiationsTest { requestUid = "unique", initiationTime = Instant.now() ) - assertNull(db.initiatedPaymentGetFromUid("unique")) - assertEquals(db.initiatedPaymentCreate(initPay), PaymentInitiationOutcome.SUCCESS) - assertEquals(db.initiatedPaymentCreate(initPay), PaymentInitiationOutcome.UNIQUE_CONSTRAINT_VIOLATION) - val haveOne = db.initiatedPaymentsSubmittableGet("KUDOS") + assertEquals(db.initiated.create(initPay), PaymentInitiationResult.SUCCESS) + assertEquals(db.initiated.create(initPay), PaymentInitiationResult.REQUEST_UID_REUSE) + val haveOne = db.initiated.submittableGet("KUDOS") assertTrue("Size ${haveOne.size} instead of 1") { haveOne.size == 1 && haveOne.first().id == 1L && haveOne.first().requestUid == "unique" } - assertTrue(db.initiatedPaymentSetSubmittedState(1, DatabaseSubmissionState.success)) - assertNotNull(db.initiatedPaymentGetFromUid("unique")) } /** @@ -213,43 +192,39 @@ class PaymentInitiationsTest { */ @Test fun submittablePayments() = setup { db, _ -> - val beEmpty = db.initiatedPaymentsSubmittableGet("KUDOS") + val beEmpty = db.initiated.submittableGet("KUDOS") assertEquals(0, beEmpty.size) assertEquals( - db.initiatedPaymentCreate(genInitPay(requestUid = "first")), - PaymentInitiationOutcome.SUCCESS + db.initiated.create(genInitPay(requestUid = "first")), + PaymentInitiationResult.SUCCESS ) assertEquals( - db.initiatedPaymentCreate(genInitPay(requestUid = "second")), - PaymentInitiationOutcome.SUCCESS + db.initiated.create(genInitPay(requestUid = "second")), + PaymentInitiationResult.SUCCESS ) assertEquals( - db.initiatedPaymentCreate(genInitPay(requestUid = "third")), - PaymentInitiationOutcome.SUCCESS + db.initiated.create(genInitPay(requestUid = "third")), + PaymentInitiationResult.SUCCESS ) // Setting the first as "transient_failure", must be found. - assertTrue(db.initiatedPaymentSetSubmittedState( - 1, DatabaseSubmissionState.transient_failure - )) + db.initiated.submissionSuccess(1, Instant.now(), "Failure") // Setting the second as "success", must not be found. - assertTrue(db.initiatedPaymentSetSubmittedState( - 2, DatabaseSubmissionState.success - )) - val expectTwo = db.initiatedPaymentsSubmittableGet("KUDOS") + db.initiated.submissionSuccess(2, Instant.now(), "ORDER1234") + val expectTwo = db.initiated.submittableGet("KUDOS") // the third initiation keeps the default "unsubmitted" // state, must be found. Total 2. - assertEquals(2, expectTwo.size) + assertEquals(1, expectTwo.size) } // Tests how the fetch method gets the list of // multiple unsubmitted payment initiations. @Test fun paymentInitiationsMultiple() = setup { db, _ -> - assertEquals(db.initiatedPaymentCreate(genInitPay("#1", "unique1")), PaymentInitiationOutcome.SUCCESS) - assertEquals(db.initiatedPaymentCreate(genInitPay("#2", "unique2")), PaymentInitiationOutcome.SUCCESS) - assertEquals(db.initiatedPaymentCreate(genInitPay("#3", "unique3")), PaymentInitiationOutcome.SUCCESS) - assertEquals(db.initiatedPaymentCreate(genInitPay("#4", "unique4")), PaymentInitiationOutcome.SUCCESS) + assertEquals(db.initiated.create(genInitPay("#1", "unique1")), PaymentInitiationResult.SUCCESS) + assertEquals(db.initiated.create(genInitPay("#2", "unique2")), PaymentInitiationResult.SUCCESS) + assertEquals(db.initiated.create(genInitPay("#3", "unique3")), PaymentInitiationResult.SUCCESS) + assertEquals(db.initiated.create(genInitPay("#4", "unique4")), PaymentInitiationResult.SUCCESS) // Marking one as submitted, hence not expecting it in the results. db.conn { conn -> @@ -261,8 +236,7 @@ class PaymentInitiationsTest { } // Expecting all the payments BUT the #3 in the result. - db.initiatedPaymentsSubmittableGet("KUDOS").apply { - + db.initiated.submittableGet("KUDOS").apply { assertEquals(3, this.size) assertEquals("#1", this[0].wireTransferSubject) assertEquals("#2", this[1].wireTransferSubject) |