libeufin

Integration and sandbox testing for FinTech APIs and data formats
Log | Files | Refs | Submodules | README | LICENSE

commit 1cf29ab00155e2e16a593a6f4b43016fbccb1883
parent 5ca91639643d01aaedb33261f56dbf8d67da0bed
Author: Antoine A <>
Date:   Wed, 18 Sep 2024 11:13:12 +0200

nexus: improve status logic

Diffstat:
Mdatabase-versioning/libeufin-nexus-procedures.sql | 32++++++++++++++++----------------
Mnexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt | 16+++++++++++++---
Mnexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt | 150++++++++++++++++++++++++++++++++++++++-----------------------------------------
Mnexus/src/main/kotlin/tech/libeufin/nexus/db/PaymentDAO.kt | 72++++++++++++++++++++++++++++++++++++------------------------------------
Mnexus/src/test/kotlin/DatabaseTest.kt | 225++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------
Rnexus/src/test/kotlin/Registration.kt -> nexus/src/test/kotlin/RegistrationTest.kt | 0
Mnexus/src/test/kotlin/helpers.kt | 22++++++++++++----------
7 files changed, 315 insertions(+), 202 deletions(-)

diff --git a/database-versioning/libeufin-nexus-procedures.sql b/database-versioning/libeufin-nexus-procedures.sql @@ -139,13 +139,13 @@ END IF; IF NOT out_found THEN -- Store the transaction in the database INSERT INTO outgoing_transactions ( - amount + amount ,subject ,execution_time ,credit_payto ,end_to_end_id ) VALUES ( - in_amount + in_amount ,in_subject ,in_execution_time ,in_credit_payto @@ -170,21 +170,21 @@ IF NOT out_found THEN PERFORM pg_notify('outgoing_tx', out_tx_id::text); END IF; END IF; -END IF; -IF NOT out_found AND out_initiated THEN - -- Reconciles the related initiated transaction - UPDATE initiated_outgoing_transactions - SET - outgoing_transaction_id = out_tx_id - ,status = 'success' - ,status_msg = null - WHERE initiated_outgoing_transaction_id = init_id; - - -- Reconciles the related initiated batch - UPDATE initiated_outgoing_batches - SET status = 'success', status_msg = null - WHERE message_id = in_msg_id; + IF out_initiated THEN + -- Reconciles the related initiated transaction + UPDATE initiated_outgoing_transactions + SET + outgoing_transaction_id = out_tx_id + ,status = 'success' + ,status_msg = null + WHERE initiated_outgoing_transaction_id = init_id; + + -- Reconciles the related initiated batch + UPDATE initiated_outgoing_batches + SET status = 'success', status_msg = null + WHERE message_id = in_msg_id AND status NOT IN ('success', 'permanent_failure'); + END IF; END IF; END $$; COMMENT ON FUNCTION register_outgoing diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt @@ -47,13 +47,23 @@ data class InitiatedPayment( val endToEndId: String ) -/** Outgoing transactions submission states for batches or individual transactions */ +/** Outgoing transactions and batches submission status */ enum class SubmissionState { + // Initiated but not yet submitted unsubmitted, + // Submission failed, retry possible transient_failure, + // Submission succed, pending settltment + pending, + // Definitive failure, will never succeed permanent_failure, - success, - pending + // Definitive success, booked and settled + success ; + + companion object { + val SETTLED = listOf(SubmissionState.success, SubmissionState.permanent_failure) + val PENDING = listOf(SubmissionState.unsubmitted, SubmissionState.pending) + } } /** Collects database connection steps and any operation on the Nexus tables */ diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt @@ -27,6 +27,10 @@ import java.time.Instant /** Data access logic for initiated outgoing payments */ class InitiatedDAO(private val db: Database) { + private val UNSETTLED_FILTER = + "status NOT IN (${SubmissionState.SETTLED.map { "'$it'" }.joinToString(",")})" + private val PENDING_FILTER = + "status IN (${SubmissionState.PENDING.map { "'$it'" }.joinToString(",")})" /** Outgoing payments initiation result */ sealed interface PaymentInitiationResult { @@ -34,11 +38,11 @@ class InitiatedDAO(private val db: Database) { data object RequestUidReuse: PaymentInitiationResult } - /** Register a new pending payment in the database */ - suspend fun create(paymentData: InitiatedPayment): PaymentInitiationResult = db.serializable( + /** Initiate a new payment */ + suspend fun create(payment: InitiatedPayment): PaymentInitiationResult = db.serializable( """ INSERT INTO initiated_outgoing_transactions ( - amount + amount ,subject ,credit_payto ,initiation_time @@ -48,66 +52,66 @@ class InitiatedDAO(private val db: Database) { """ ) { // TODO check payto uri - setLong(1, paymentData.amount.value) - setInt(2, paymentData.amount.frac) - setString(3, paymentData.subject) - setString(4, paymentData.creditor.toString()) - setLong(5, paymentData.initiationTime.micros()) - setString(6, paymentData.endToEndId) + setLong(1, payment.amount.value) + setInt(2, payment.amount.frac) + setString(3, payment.subject) + setString(4, payment.creditor.toString()) + setLong(5, payment.initiationTime.micros()) + setString(6, payment.endToEndId) oneUniqueViolation(PaymentInitiationResult.RequestUidReuse) { PaymentInitiationResult.Success(it.getLong("initiated_outgoing_transaction_id")) } } - /** Register EBICS submission success of order [orderId] for batch [id] at [timestamp] */ + /** Register submission success of order [orderId] for batch [id] at [timestamp] */ suspend fun batchSubmissionSuccess( id: Long, timestamp: Instant, orderId: String ) = db.serializableTransaction { tx -> - // Update batch state - tx.withStatement( + // Update batch status + val updated = tx.withStatement( """ UPDATE initiated_outgoing_batches - SET status = 'pending'::submission_state + SET status = 'pending' ,submission_date = ? ,status_msg = NULL ,order_id = ? ,submission_counter = submission_counter + 1 - WHERE initiated_outgoing_batch_id = ? + WHERE initiated_outgoing_batch_id = ? AND order_id IS NULL """ ) { setLong(1, timestamp.micros()) setString(2, orderId) setLong(3, id) - execute() + executeUpdate() } - // Update transactions state if they are still not in their final state - tx.withStatement( - """ - UPDATE initiated_outgoing_transactions - SET status = 'pending'::submission_state - ,status_msg = NULL - WHERE initiated_outgoing_batch_id = ? - AND status NOT IN ('permanent_failure', 'success') - """ - ) { - setLong(1, id) - execute() + if (updated > 0) { + // Update unsettled batch's transaction status + tx.withStatement( + """ + UPDATE initiated_outgoing_transactions + SET status = 'pending', status_msg = NULL + WHERE initiated_outgoing_batch_id = ? AND $UNSETTLED_FILTER + """ + ) { + setLong(1, id) + execute() + } } } - /** Register EBICS submission failure with [msg] for batch [id] at [timestamp]*/ + /** Register submission failure with [msg] for batch [id] at [timestamp]*/ suspend fun batchSubmissionFailure( id: Long, timestamp: Instant, msg: String? ) = db.serializableTransaction { tx -> - // Update batch state + // Update batch status tx.withStatement( """ - UPDATE initiated_outgoing_batches SET - status = 'transient_failure'::submission_state + UPDATE initiated_outgoing_batches + SET status = 'transient_failure' ,submission_date = ? ,status_msg = ? ,submission_counter = submission_counter + 1 @@ -119,14 +123,12 @@ class InitiatedDAO(private val db: Database) { setLong(3, id) execute() } - // Update transactions state if they are not yet in final status + // Update unsettled batch's transaction status tx.withStatement( """ - UPDATE initiated_outgoing_transactions SET - status = 'transient_failure'::submission_state - ,status_msg = ? - WHERE initiated_outgoing_batch_id = ? - AND status NOT IN ('permanent_failure', 'success') + UPDATE initiated_outgoing_transactions + SET status = 'transient_failure', status_msg = ? + WHERE initiated_outgoing_batch_id = ? AND $UNSETTLED_FILTER """ ) { setString(1, msg) @@ -135,14 +137,14 @@ class InitiatedDAO(private val db: Database) { } } - /** Register EBICS order step [msg] for [orderId] */ + /** Register order step [msg] for [orderId] */ suspend fun orderStep(orderId: String, msg: String) = db.serializableTransaction { tx -> - // Update batch state + // Update pending batch status val batchId = tx.withStatement( """ UPDATE initiated_outgoing_batches - SET status_msg = ? - WHERE order_id = ? + SET status = 'pending', status_msg = ? + WHERE order_id = ? AND $PENDING_FILTER RETURNING initiated_outgoing_batch_id """ ) { @@ -151,13 +153,12 @@ class InitiatedDAO(private val db: Database) { oneOrNull { it.getLong(1) } } if (batchId != null) { - // Update transactions state if they are not yet in final status + // Update pending batch's transaction status tx.withStatement( """ UPDATE initiated_outgoing_transactions - SET status_msg = ? - WHERE initiated_outgoing_batch_id = ? - AND status NOT IN ('permanent_failure', 'success') + SET status = 'pending', status_msg = ? + WHERE initiated_outgoing_batch_id = ? AND $PENDING_FILTER """ ) { setString(1, msg) @@ -167,9 +168,9 @@ class InitiatedDAO(private val db: Database) { } } - /** Register EBICS order success for [orderId] and return message_id if found */ + /** Register order success for [orderId] and return message_id if found */ suspend fun orderSuccess(orderId: String): String? = db.serializableTransaction { tx -> - // Update batch state + // Update batch status val result = tx.withStatement( """ UPDATE initiated_outgoing_batches @@ -188,13 +189,12 @@ class InitiatedDAO(private val db: Database) { } if (result == null) return@serializableTransaction null val (batchId, messageId) = result - // Update transactions state if they are not yet in final status + // Update unsettled batch's transaction status tx.withStatement( """ UPDATE initiated_outgoing_transactions SET status = 'pending' - WHERE initiated_outgoing_batch_id = ? - AND status NOT IN ('permanent_failure', 'success') + WHERE initiated_outgoing_batch_id = ? AND $UNSETTLED_FILTER """ ) { setLong(1, batchId) @@ -203,9 +203,9 @@ class InitiatedDAO(private val db: Database) { messageId } - /** Register EBICS order failure for [orderId] and return message_id and previous status_msg if found */ + /** Register order failure for [orderId] and return message_id and previous status_msg if found */ suspend fun orderFailure(orderId: String): Pair<String, String?>? = db.serializableTransaction { tx -> - // Update batch state + // Update batch status val result = tx.withStatement( """ UPDATE initiated_outgoing_batches @@ -225,13 +225,12 @@ class InitiatedDAO(private val db: Database) { } if (result == null) return@serializableTransaction null val (batchId, messageId, msg) = result - // Update transactions state if they are not yet in final status + // Update batch's transaction status tx.withStatement( """ UPDATE initiated_outgoing_transactions SET status = 'permanent_failure' WHERE initiated_outgoing_batch_id = ? - AND status NOT IN ('permanent_failure', 'success') """ ) { setLong(1, batchId) @@ -240,15 +239,14 @@ class InitiatedDAO(private val db: Database) { Pair(messageId, msg) } - /** Register EBICS payment status [state] with [msg] for batch [msgId] */ + /** Register payment status [state] with [msg] for batch [msgId] */ suspend fun batchStatusUpdate(msgId: String, state: SubmissionState, msg: String?) = db.serializableTransaction { tx -> - // Update batch state + // Update unsettled batch status val batchId = tx.withStatement( """ UPDATE initiated_outgoing_batches - SET status = ?::submission_state, - status_msg = ? - WHERE message_id = ? + SET status = ?::submission_state, status_msg = ? + WHERE message_id = ? AND $UNSETTLED_FILTER RETURNING initiated_outgoing_batch_id """ ) { @@ -257,7 +255,7 @@ class InitiatedDAO(private val db: Database) { setString(3, msgId) oneOrNull { it.getLong("initiated_outgoing_batch_id") } } - // Update transactions state if they are not yet in final status + // Update unsettled batch's transaction status if (batchId != null) { // When a batch succeed it doesn't mean that individual transaction also succeed val txState = if (state == SubmissionState.success) { @@ -268,10 +266,8 @@ class InitiatedDAO(private val db: Database) { tx.withStatement( """ UPDATE initiated_outgoing_transactions - SET status = ?::submission_state, - status_msg = ? - WHERE initiated_outgoing_batch_id = ? - AND status NOT IN ('permanent_failure', 'success') + SET status = ?::submission_state, status_msg = ? + WHERE initiated_outgoing_batch_id = ? AND $UNSETTLED_FILTER """ ) { setString(1, txState.name) @@ -282,15 +278,14 @@ class InitiatedDAO(private val db: Database) { } } - /** Register EBICS payment status [state] with [msg] for transaction [endToEndId] in batch [msgId] */ + /** Register payment status [state] with [msg] for transaction [endToEndId] in batch [msgId] */ suspend fun txStatusUpdate(endToEndId: String, msgId: String?, state: SubmissionState, msg: String) = db.serializableTransaction { tx -> + // Update unsettled transaction status tx.withStatement( """ UPDATE initiated_outgoing_transactions - SET status = ?::submission_state, - status_msg = ? - WHERE end_to_end_id = ? - AND status NOT IN ('permanent_failure', 'success') + SET status = ?::submission_state, status_msg = ? + WHERE end_to_end_id = ? AND $UNSETTLED_FILTER """ ) { setString(1, state.name) @@ -298,12 +293,13 @@ class InitiatedDAO(private val db: Database) { setString(3, endToEndId) execute() } + // Update unsettled batch status if (msgId != null) { tx.withStatement( """ UPDATE initiated_outgoing_batches - SET status = 'success' - WHERE message_id = ? + SET status = 'success', status_msg = NULL + WHERE message_id = ? AND $UNSETTLED_FILTER """ ) { setString(1, msgId) @@ -315,15 +311,15 @@ class InitiatedDAO(private val db: Database) { /** Unsettled intiaited payment in batch [msgId] */ suspend fun unsettledTxInBatch(msgId: String, executionTime: Instant) = db.serializable( """ - SELECT end_to_end_id, - (amount).val as amount_val, - (amount).frac as amount_frac, - subject, - credit_payto + SELECT end_to_end_id + ,(amount).val as amount_val + ,(amount).frac as amount_frac + ,subject + ,credit_payto FROM initiated_outgoing_transactions JOIN initiated_outgoing_batches USING (initiated_outgoing_batch_id) WHERE message_id = ? - AND initiated_outgoing_transactions.status NOT IN ('permanent_failure', 'success') + AND initiated_outgoing_transactions.$UNSETTLED_FILTER """ ) { setString(1, msgId) diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/PaymentDAO.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/PaymentDAO.kt @@ -37,7 +37,7 @@ class PaymentDAO(private val db: Database) { /** Register an outgoing payment reconciling it with its initiated payment counterpart if present */ suspend fun registerOutgoing( - paymentData: OutgoingPayment, + payment: OutgoingPayment, wtid: ShortHashCode?, baseUrl: ExchangeUrl?, ): OutgoingRegistrationResult = db.serializable( @@ -46,18 +46,18 @@ class PaymentDAO(private val db: Database) { FROM register_outgoing((?,?)::taler_amount,?,?,?,?,?,?,?) """ ) { - val executionTime = paymentData.executionTime.micros() - - setLong(1, paymentData.amount.value) - setInt(2, paymentData.amount.frac) - setString(3, paymentData.subject) + val executionTime = payment.executionTime.micros() + + setLong(1, payment.amount.value) + setInt(2, payment.amount.frac) + setString(3, payment.subject) setLong(4, executionTime) - setString(5, paymentData.creditorPayto.toString()) - setString(6, paymentData.endToEndId) + setString(5, payment.creditorPayto.toString()) + setString(6, payment.endToEndId) setBytes(7, wtid?.raw) setString(8, baseUrl?.url) - setString(9, paymentData.msgId) - + setString(9, payment.msgId) + one { OutgoingRegistrationResult( it.getLong("out_tx_id"), @@ -76,7 +76,7 @@ class PaymentDAO(private val db: Database) { /** Register an incoming payment and bounce it */ suspend fun registerMalformedIncoming( - paymentData: IncomingPayment, + payment: IncomingPayment, bounceAmount: TalerAmount, bounceEndToEndId: String, timestamp: Instant @@ -86,12 +86,12 @@ class PaymentDAO(private val db: Database) { FROM register_and_bounce_incoming((?,?)::taler_amount,?,?,?,?,(?,?)::taler_amount,?,?) """ ) { - setLong(1, paymentData.amount.value) - setInt(2, paymentData.amount.frac) - setString(3, paymentData.subject) - setLong(4, paymentData.executionTime.micros()) - setString(5, paymentData.debtorPayto.toString()) - setString(6, paymentData.bankId) + setLong(1, payment.amount.value) + setInt(2, payment.amount.frac) + setString(3, payment.subject) + setLong(4, payment.executionTime.micros()) + setString(5, payment.debtorPayto.toString()) + setString(6, payment.bankId) setLong(7, bounceAmount.value) setInt(8, bounceAmount.frac) setLong(9, timestamp.micros()) @@ -113,7 +113,7 @@ class PaymentDAO(private val db: Database) { /** Register an talerable incoming payment */ suspend fun registerTalerableIncoming( - paymentData: IncomingPayment, + payment: IncomingPayment, metadata: TalerIncomingMetadata ): IncomingRegistrationResult = db.serializable( """ @@ -121,13 +121,13 @@ class PaymentDAO(private val db: Database) { FROM register_incoming((?,?)::taler_amount,?,?,?,?,?::taler_incoming_type,?,?) """ ) { - val executionTime = paymentData.executionTime.micros() - setLong(1, paymentData.amount.value) - setInt(2, paymentData.amount.frac) - setString(3, paymentData.subject) + val executionTime = payment.executionTime.micros() + setLong(1, payment.amount.value) + setInt(2, payment.amount.frac) + setString(3, payment.subject) setLong(4, executionTime) - setString(5, paymentData.debtorPayto.toString()) - setString(6, paymentData.bankId) + setString(5, payment.debtorPayto.toString()) + setString(6, payment.bankId) setString(7, metadata.type.name) when (metadata.type) { TalerIncomingType.reserve -> { @@ -153,20 +153,20 @@ class PaymentDAO(private val db: Database) { /** Register an incoming payment */ suspend fun registerIncoming( - paymentData: IncomingPayment + payment: IncomingPayment ): IncomingRegistrationResult.Success = db.serializable( """ SELECT out_found, out_tx_id FROM register_incoming((?,?)::taler_amount,?,?,?,?,NULL,NULL,NULL) """ - ) { - val executionTime = paymentData.executionTime.micros() - setLong(1, paymentData.amount.value) - setInt(2, paymentData.amount.frac) - setString(3, paymentData.subject) + ) { + val executionTime = payment.executionTime.micros() + setLong(1, payment.amount.value) + setInt(2, payment.amount.frac) + setString(3, payment.subject) setLong(4, executionTime) - setString(5, paymentData.debtorPayto.toString()) - setString(6, paymentData.bankId) + setString(5, payment.debtorPayto.toString()) + setString(6, payment.bankId) one { IncomingRegistrationResult.Success( it.getLong("out_tx_id"), @@ -181,7 +181,7 @@ class PaymentDAO(private val db: Database) { ): List<RevenueIncomingBankTransaction> = db.poolHistoryGlobal(params, db::listenRevenue, """ SELECT - incoming_transaction_id + incoming_transaction_id ,execution_time ,(amount).val AS amount_val ,(amount).frac AS amount_frac @@ -202,7 +202,7 @@ class PaymentDAO(private val db: Database) { suspend fun metadataIncoming(): List<IncomingTxMetadata> = db.serializable( """ SELECT - (amount).val as amount_val + (amount).val as amount_val ,(amount).frac AS amount_frac ,subject ,execution_time @@ -238,7 +238,7 @@ class PaymentDAO(private val db: Database) { suspend fun metadataOutgoing(): List<OutgoingTxMetadata> = db.serializable( """ SELECT - (amount).val as amount_val + (amount).val as amount_val ,(amount).frac AS amount_frac ,subject ,execution_time @@ -268,7 +268,7 @@ class PaymentDAO(private val db: Database) { suspend fun metadataInitiated(): List<InitiatedTxMetadata> = db.serializable( """ SELECT - (amount).val as amount_val + (amount).val as amount_val ,(amount).frac AS amount_frac ,subject ,initiation_time diff --git a/nexus/src/test/kotlin/DatabaseTest.kt b/nexus/src/test/kotlin/DatabaseTest.kt @@ -20,7 +20,7 @@ import org.junit.Test import tech.libeufin.common.ShortHashCode import tech.libeufin.common.TalerAmount -import tech.libeufin.common.db.one +import tech.libeufin.common.db.* import tech.libeufin.nexus.AccountType import tech.libeufin.nexus.cli.* import tech.libeufin.nexus.db.* @@ -295,80 +295,183 @@ class PaymentInitiationsTest { @Test fun status() = setup { db, _ -> - suspend fun checkBatchStatus(id: Int, status: SubmissionState, txStatus: SubmissionState? = null) { + suspend fun checkPart( + batchId: Long, + batchStatus: SubmissionState, + batchMsg: String?, + txStatus: SubmissionState, + txMsg: String?, + settledStatus: SubmissionState, + settledMsg: String?, + ) { + // Check batch status + val msgId = db.serializable( + """ + SELECT message_id, status, status_msg FROM initiated_outgoing_batches WHERE initiated_outgoing_batch_id=? + """ + ) { + setLong(1, batchId) + one { + val msgId = it.getString("message_id") + assertEquals( + batchStatus to batchMsg, + it.getEnum<SubmissionState>("status") to it.getString("status_msg"), + msgId + ) + msgId + } + } + // Check tx status db.serializable( """ - SELECT bool_and(initiated_outgoing_batches.status=?::submission_state AND initiated_outgoing_transactions.status=?::submission_state) - FROM initiated_outgoing_batches - JOIN initiated_outgoing_transactions USING (initiated_outgoing_batch_id) - WHERE initiated_outgoing_batch_id=? + SELECT end_to_end_id, status, status_msg FROM initiated_outgoing_transactions WHERE initiated_outgoing_batch_id=? """ ) { - setString(1, status.name) - setString(2, (txStatus ?: status).name) - setLong(3, id.toLong()) - one { assert(it.getBoolean(1)) } + setLong(1, batchId) + all { + val endToEndId = it.getString("end_to_end_id") + val expected = when (endToEndId) { + "TX" -> Pair(txStatus, txMsg) + "TX_SETTLED" -> Pair(settledStatus, settledMsg) + else -> throw Exception("Unexpected tx $endToEndId") + } + assertEquals( + expected, + it.getEnum<SubmissionState>("status") to it.getString("status_msg"), + "$msgId.$endToEndId" + ) + } } } + suspend fun checkBatch(batchId: Long, status: SubmissionState, msg: String?, txStatus: SubmissionState? = null) { + val txStatus = txStatus ?: status + checkPart(batchId, status, msg, txStatus, msg, txStatus, msg) + } + suspend fun checkOrder(orderId: String, status: SubmissionState, msg: String?, txStatus: SubmissionState? = null) { + val batchId = db.serializable( + "SELECT initiated_outgoing_batch_id FROM initiated_outgoing_batches WHERE order_id=?" + ) { + setString(1, orderId) + one { + it.getLong("initiated_outgoing_batch_id") + } + } + checkBatch(batchId, status, msg, txStatus) + } - val NB_BATCH = 3 - val NB_TX_PER_BATCH = 2 - - repeat(NB_BATCH) { batch -> - repeat(NB_TX_PER_BATCH) { tx -> + suspend fun test(lambda: suspend (Long) -> Unit) { + // Reset DB + db.conn { conn -> + conn.execSQLUpdate("DELETE FROM initiated_outgoing_transactions"); + conn.execSQLUpdate("DELETE FROM initiated_outgoing_batches"); + } + // Create a test batch with three transactions + for (id in sequenceOf("TX", "TX_SETTLED")) { assertIs<PaymentInitiationResult.Success>( - db.initiated.create(genInitPay("PAY${batch*NB_TX_PER_BATCH+tx}")) + db.initiated.create(genInitPay(id)) ) } - db.initiated.batch(Instant.now(), "BATCH${batch}") - checkBatchStatus(batch+1, SubmissionState.unsubmitted) + db.initiated.batch(Instant.now(), "BATCH") + // Create witness transactions and batch + for (id in sequenceOf("WITNESS_1", "WITNESS_2")) { + assertIs<PaymentInitiationResult.Success>( + db.initiated.create(genInitPay(id)) + ) + } + db.initiated.batch(Instant.now(), "BATCH_WITNESS") + for (id in sequenceOf("WITNESS_3", "WITNESS_4")) { + assertIs<PaymentInitiationResult.Success>( + db.initiated.create(genInitPay(id)) + ) + } + // Check everything is unsubmitted + db.serializable( + """ + SELECT (SELECT bool_and(status = 'unsubmitted') FROM initiated_outgoing_batches) + AND (SELECT bool_and(status = 'unsubmitted') FROM initiated_outgoing_transactions) + """ + ) { + one { assertTrue(it.getBoolean(1)) } + } + // Run test + lambda(db.initiated.submittable("").find { it.messageId == "BATCH" }!!.id) + // Check witness status is unaltered + db.serializable( + """ + SELECT (SELECT bool_and(status = 'unsubmitted') FROM initiated_outgoing_batches WHERE message_id != 'BATCH') + AND (SELECT bool_and(initiated_outgoing_transactions.status = 'unsubmitted') + FROM initiated_outgoing_transactions JOIN initiated_outgoing_batches USING (initiated_outgoing_batch_id) + WHERE message_id != 'BATCH') + """ + ) { + one { assertTrue(it.getBoolean(1)) } + } } - // Submission succeed after retry but order failed - db.initiated.batchSubmissionFailure(1, Instant.now(), "First failure") - checkBatchStatus(1, SubmissionState.transient_failure) - db.initiated.batchSubmissionFailure(1, Instant.now(), "Second failure") - checkBatchStatus(1, SubmissionState.transient_failure) - db.initiated.batchSubmissionSuccess(1, Instant.now(), "ORDER1") - checkBatchStatus(1, SubmissionState.pending) - assertNull(db.initiated.orderFailure("ORDER1")!!.second) - checkBatchStatus(1, SubmissionState.permanent_failure) - - // Submission succeed after retry but order failed with messages - db.initiated.batchSubmissionFailure(2, Instant.now(), "First failure") - db.initiated.batchSubmissionSuccess(2, Instant.now(), "ORDER2") - db.initiated.orderStep("ORDER2", "status msg") - checkBatchStatus(2, SubmissionState.pending) - assertEquals("status msg", db.initiated.orderFailure("ORDER2")!!.second) - checkBatchStatus(2, SubmissionState.permanent_failure) - db.initiated.orderStep("ORDER2", "late msg") - checkBatchStatus(2, SubmissionState.permanent_failure) - - // Submission and order succeed - db.initiated.batchSubmissionSuccess(3, Instant.now(), "ORDER3") - checkBatchStatus(3, SubmissionState.pending) - assertNotNull(db.initiated.orderSuccess("ORDER3")) - checkBatchStatus(3, SubmissionState.success, SubmissionState.pending) + // Submission retry status + test { batchId -> + db.initiated.batchSubmissionFailure(batchId, Instant.now(), "First failure") + checkBatch(batchId, SubmissionState.transient_failure, "First failure") + db.initiated.batchSubmissionFailure(batchId, Instant.now(), "Second failure") + checkBatch(batchId, SubmissionState.transient_failure, "Second failure") + db.initiated.batchSubmissionSuccess(batchId, Instant.now(), "ORDER") + checkOrder("ORDER", SubmissionState.pending, null) + db.initiated.batchSubmissionSuccess(batchId, Instant.now(), "ORDER") + checkOrder("ORDER", SubmissionState.pending, null) + db.initiated.orderStep("ORDER", "step msg") + checkOrder("ORDER", SubmissionState.pending, "step msg") + db.initiated.orderStep("ORDER", "success msg") + checkOrder("ORDER", SubmissionState.pending, "success msg") + db.initiated.orderSuccess("ORDER") + checkOrder("ORDER", SubmissionState.success, "success msg", SubmissionState.pending) + db.initiated.orderStep("ORDER", "late msg") + checkOrder("ORDER", SubmissionState.success, "success msg", SubmissionState.pending) + } - // Unknown order + // Order step message on failure + test { batchId -> + db.initiated.batchSubmissionSuccess(batchId, Instant.now(), "ORDER") + checkOrder("ORDER", SubmissionState.pending, null) + db.initiated.orderStep("ORDER", "step msg") + checkOrder("ORDER", SubmissionState.pending, "step msg") + db.initiated.orderStep("ORDER", "failure msg") + checkOrder("ORDER", SubmissionState.pending, "failure msg") + assertEquals("failure msg", db.initiated.orderFailure("ORDER")!!.second) + checkOrder("ORDER", SubmissionState.permanent_failure, "failure msg") + db.initiated.orderStep("ORDER", "late msg") + checkOrder("ORDER", SubmissionState.permanent_failure, "failure msg") + } + + // Payment & batch status + test { batchId -> + checkBatch(batchId, SubmissionState.unsubmitted, null) + db.initiated.batchStatusUpdate("BATCH", SubmissionState.pending, "progress") + checkBatch(batchId, SubmissionState.pending, "progress") + db.initiated.txStatusUpdate("TX_SETTLED", null, SubmissionState.success, "success") + checkPart(batchId, SubmissionState.pending, "progress", SubmissionState.pending, "progress", SubmissionState.success, "success") + db.initiated.batchStatusUpdate("BATCH", SubmissionState.transient_failure, "waiting") + checkPart(batchId, SubmissionState.transient_failure, "waiting", SubmissionState.transient_failure, "waiting", SubmissionState.success, "success") + db.initiated.txStatusUpdate("TX", "BATCH", SubmissionState.permanent_failure, "failure") + checkPart(batchId, SubmissionState.success, null, SubmissionState.permanent_failure, "failure", SubmissionState.success, "success") + } + + // Registration + test { batchId -> + checkBatch(batchId, SubmissionState.unsubmitted, null) + registerOutgoingPayment(db, genOutPay("", endToEndId = "TX_SETTLED")) + checkPart(batchId, SubmissionState.unsubmitted, null, SubmissionState.unsubmitted, null, SubmissionState.success, null) + registerOutgoingPayment(db, genOutPay("", endToEndId = "TX", msgId = "BATCH")) + checkPart(batchId, SubmissionState.success, null, SubmissionState.success, null, SubmissionState.success, null) + } + + // Unknown order and batch + db.initiated.batchSubmissionSuccess(42, Instant.now(), "ORDER_X") + db.initiated.batchSubmissionFailure(42, Instant.now(), null) + db.initiated.orderStep("ORDER_X", "msg") + db.initiated.batchStatusUpdate("BATCH_X", SubmissionState.success, null) + db.initiated.txStatusUpdate("TX_X", "BATCH_X", SubmissionState.success, "msg") assertNull(db.initiated.orderSuccess("ORDER_X")) assertNull(db.initiated.orderFailure("ORDER_X")) - - // Transaction failure received before order success - assertIs<PaymentInitiationResult.Success>( - db.initiated.create(genInitPay("TEST1")) - ) - db.initiated.batch(Instant.now(), "BATCH4") - db.initiated.batchSubmissionSuccess(4, Instant.now(), "ORDER4") - checkBatchStatus(4, SubmissionState.pending) - db.initiated.txStatusUpdate("TEST1", null, SubmissionState.pending, "status progress") - checkBatchStatus(4, SubmissionState.pending) - db.initiated.txStatusUpdate("TEST1", null, SubmissionState.permanent_failure, "status failure") - checkBatchStatus(4, SubmissionState.pending, SubmissionState.permanent_failure) - assertNotNull(db.initiated.orderSuccess("ORDER4")) - checkBatchStatus(4, SubmissionState.success, SubmissionState.permanent_failure) - - // TODO test batch state for one success or one failure } @Test @@ -411,6 +514,8 @@ class PaymentInitiationsTest { db.initiated.batchSubmissionFailure(2, Instant.now(), "Failure") checkIds("PAY2", "PAY5", "PAY1") } + + // TODO test for unsettledTxInBatch } class EbicsTxTest { diff --git a/nexus/src/test/kotlin/Registration.kt b/nexus/src/test/kotlin/RegistrationTest.kt diff --git a/nexus/src/test/kotlin/helpers.kt b/nexus/src/test/kotlin/helpers.kt @@ -97,28 +97,30 @@ fun genInitPay( ) /** Generates an incoming payment, given its subject */ -fun genInPay(subject: String, amount: String = "KUDOS:44"): IncomingPayment { - val bankId = randEbicsId() - return IncomingPayment( +fun genInPay( + subject: String, + amount: String = "KUDOS:44" +) = IncomingPayment( amount = TalerAmount(amount), debtorPayto = ibanPayto("DE84500105177118117964"), subject = subject, executionTime = Instant.now(), - bankId = bankId + bankId = randEbicsId(), ) -} /** Generates an outgoing payment, given its subject and end-to-end ID */ -fun genOutPay(subject: String, endToEndId: String? = null): OutgoingPayment { - val id = endToEndId ?: randEbicsId() - return OutgoingPayment( +fun genOutPay( + subject: String, + endToEndId: String? = null, + msgId: String? = null +) = OutgoingPayment( amount = TalerAmount(44, 0, "KUDOS"), creditorPayto = ibanPayto("CH4189144589712575493", "Test"), subject = subject, executionTime = Instant.now(), - endToEndId = id + endToEndId = endToEndId ?: randEbicsId(), + msgId = msgId ) -} /** Perform a taler outgoing transaction */ suspend fun ApplicationTestBuilder.transfer() {