diff options
Diffstat (limited to 'nexus/src/main')
3 files changed, 39 insertions, 25 deletions
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt index 765037e4..47c021cd 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt @@ -132,9 +132,9 @@ suspend fun ingestIncomingPayment( Instant.now() ) if (result.new) { - logger.info("$payment bounced in '${result.bounceId}': ${e.message}") + logger.info("$payment bounced in '${result.bounceId}': ${e.fmt()}") } else { - logger.debug("$payment already seen and bounced in '${result.bounceId}': ${e.message}") + logger.debug("$payment already seen and bounced in '${result.bounceId}': ${e.fmt()}") } } ) @@ -154,7 +154,7 @@ private suspend fun ingestDocument( is TxNotification.Incoming -> ingestIncomingPayment(db, it.payment) is TxNotification.Outgoing -> ingestOutgoingPayment(db, it.payment) is TxNotification.Reversal -> { - logger.warn("BOUNCE '${it.msgId}': ${it.reason}") + logger.error("BOUNCE '${it.msgId}': ${it.reason}") db.initiated.reversal(it.msgId, "Payment bounced: ${it.reason}") } } @@ -176,7 +176,7 @@ private suspend fun ingestDocument( HacAction.ORDER_HAC_FINAL_NEG -> { logger.debug("$ack") db.initiated.logFailure(ack.orderId!!)?.let { (requestUID, msg) -> - logger.warn("Payment '$requestUID' refused at ${ack.timestamp.fmtDateTime()}${if (msg != null) ": $msg" else ""}") + logger.error("Payment '$requestUID' refused at ${ack.timestamp.fmtDateTime()}${if (msg != null) ": $msg" else ""}") } } else -> { @@ -194,7 +194,7 @@ private suspend fun ingestDocument( logger.debug("$status") if (status.paymentCode == ExternalPaymentGroupStatusCode.RJCT) { db.initiated.bankFailure(status.msgId, msg) - logger.warn("Transaction '${status.msgId}' was rejected : $msg") + logger.error("Transaction '${status.msgId}' was rejected : $msg") } else { db.initiated.bankMessage(status.msgId, msg) } diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt index 656ce694..8366f064 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt @@ -110,7 +110,7 @@ private suspend fun submitBatch( ctx: SubmissionContext, db: Database, ) { - db.initiated.submittableGet(ctx.cfg.currency).forEach { + db.initiated.submittable(ctx.cfg.currency).forEach { logger.debug("Submitting payment '${it.requestUid}'") runCatching { submitInitiatedPayment(ctx, it) }.fold( onSuccess = { orderId -> @@ -119,7 +119,7 @@ private suspend fun submitBatch( }, onFailure = { e -> db.initiated.submissionFailure(it.id, Instant.now(), e.message) - logger.warn("Payment '${it.requestUid}' submission failure: ${e.message}") + logger.error("Payment '${it.requestUid}' submission failure: ${e.fmt()}") throw e } ) @@ -175,7 +175,7 @@ class EbicsSubmit : CliktCommand("Submits any initiated payment found in the dat try { submitBatch(ctx, db) } catch (e: Exception) { - throw Exception("Failed to submit payments", e) + throw Exception("Failed to submit payments") } // TODO take submitBatch taken time in the delay delay(((frequency?.inSeconds ?: 0) * 1000).toLong()) diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt index efe98f82..4df7f0c9 100644 --- a/nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt +++ b/nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt @@ -22,6 +22,7 @@ package tech.libeufin.nexus.db import tech.libeufin.nexus.* import tech.libeufin.common.* import java.time.Instant +import java.sql.ResultSet /** Data access logic for initiated outgoing payments */ class InitiatedDAO(private val db: Database) { @@ -169,28 +170,15 @@ class InitiatedDAO(private val db: Database) { stmt.execute() } - // TODO WIP - suspend fun submittableGet(currency: String): List<InitiatedPayment> = db.conn { conn -> - val stmt = conn.prepareStatement(""" - SELECT - initiated_outgoing_transaction_id - ,(amount).val as amount_val - ,(amount).frac as amount_frac - ,wire_transfer_subject - ,credit_payto_uri - ,initiation_time - ,request_uid - FROM initiated_outgoing_transactions - WHERE (submitted='unsubmitted' OR submitted='transient_failure') - AND ((amount).val != 0 OR (amount).frac != 0); - """) - stmt.all { + /** List every initiated payment pending submission in ther order they should be submitted */ + suspend fun submittable(currency: String): List<InitiatedPayment> = db.conn { conn -> + fun extract(it: ResultSet): InitiatedPayment { val rowId = it.getLong("initiated_outgoing_transaction_id") val initiationTime = it.getLong("initiation_time").microsToJavaInstant() if (initiationTime == null) { // nexus fault throw Exception("Found invalid timestamp at initiated payment with ID: $rowId") } - InitiatedPayment( + return InitiatedPayment( id = it.getLong("initiated_outgoing_transaction_id"), amount = it.getAmount("amount", currency), creditPaytoUri = it.getString("credit_payto_uri"), @@ -199,5 +187,31 @@ class InitiatedDAO(private val db: Database) { requestUid = it.getString("request_uid") ) } + val selectPart = """ + SELECT + initiated_outgoing_transaction_id + ,(amount).val as amount_val + ,(amount).frac as amount_frac + ,wire_transfer_subject + ,credit_payto_uri + ,initiation_time + ,request_uid + FROM initiated_outgoing_transactions + """ + // We want to maximize the number of successfully submitted transactions in the event + // of a malformed transaction or a persistent error classified as transient. We send + // the unsubmitted transactions first, starting with the oldest by creation time. + // This is the happy path, giving every transaction a chance while being fair on the + // basis of creation date. + // Then we retry the failed transaction, starting with the oldest by submission time. + // This the bad path retrying each failed transaction applying a rotation based on + // resubmission time. + val unsubmitted = conn.prepareStatement( + "$selectPart WHERE submitted='unsubmitted' ORDER BY initiation_time" + ).all(::extract) + val failed = conn.prepareStatement( + "$selectPart WHERE submitted='transient_failure' ORDER BY last_submission_time" + ).all(::extract) + unsubmitted + failed } }
\ No newline at end of file |