summaryrefslogtreecommitdiff
path: root/nexus/src/main
diff options
context:
space:
mode:
Diffstat (limited to 'nexus/src/main')
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt10
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt6
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt48
3 files changed, 39 insertions, 25 deletions
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt
index 765037e4..47c021cd 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt
@@ -132,9 +132,9 @@ suspend fun ingestIncomingPayment(
Instant.now()
)
if (result.new) {
- logger.info("$payment bounced in '${result.bounceId}': ${e.message}")
+ logger.info("$payment bounced in '${result.bounceId}': ${e.fmt()}")
} else {
- logger.debug("$payment already seen and bounced in '${result.bounceId}': ${e.message}")
+ logger.debug("$payment already seen and bounced in '${result.bounceId}': ${e.fmt()}")
}
}
)
@@ -154,7 +154,7 @@ private suspend fun ingestDocument(
is TxNotification.Incoming -> ingestIncomingPayment(db, it.payment)
is TxNotification.Outgoing -> ingestOutgoingPayment(db, it.payment)
is TxNotification.Reversal -> {
- logger.warn("BOUNCE '${it.msgId}': ${it.reason}")
+ logger.error("BOUNCE '${it.msgId}': ${it.reason}")
db.initiated.reversal(it.msgId, "Payment bounced: ${it.reason}")
}
}
@@ -176,7 +176,7 @@ private suspend fun ingestDocument(
HacAction.ORDER_HAC_FINAL_NEG -> {
logger.debug("$ack")
db.initiated.logFailure(ack.orderId!!)?.let { (requestUID, msg) ->
- logger.warn("Payment '$requestUID' refused at ${ack.timestamp.fmtDateTime()}${if (msg != null) ": $msg" else ""}")
+ logger.error("Payment '$requestUID' refused at ${ack.timestamp.fmtDateTime()}${if (msg != null) ": $msg" else ""}")
}
}
else -> {
@@ -194,7 +194,7 @@ private suspend fun ingestDocument(
logger.debug("$status")
if (status.paymentCode == ExternalPaymentGroupStatusCode.RJCT) {
db.initiated.bankFailure(status.msgId, msg)
- logger.warn("Transaction '${status.msgId}' was rejected : $msg")
+ logger.error("Transaction '${status.msgId}' was rejected : $msg")
} else {
db.initiated.bankMessage(status.msgId, msg)
}
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt
index 656ce694..8366f064 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsSubmit.kt
@@ -110,7 +110,7 @@ private suspend fun submitBatch(
ctx: SubmissionContext,
db: Database,
) {
- db.initiated.submittableGet(ctx.cfg.currency).forEach {
+ db.initiated.submittable(ctx.cfg.currency).forEach {
logger.debug("Submitting payment '${it.requestUid}'")
runCatching { submitInitiatedPayment(ctx, it) }.fold(
onSuccess = { orderId ->
@@ -119,7 +119,7 @@ private suspend fun submitBatch(
},
onFailure = { e ->
db.initiated.submissionFailure(it.id, Instant.now(), e.message)
- logger.warn("Payment '${it.requestUid}' submission failure: ${e.message}")
+ logger.error("Payment '${it.requestUid}' submission failure: ${e.fmt()}")
throw e
}
)
@@ -175,7 +175,7 @@ class EbicsSubmit : CliktCommand("Submits any initiated payment found in the dat
try {
submitBatch(ctx, db)
} catch (e: Exception) {
- throw Exception("Failed to submit payments", e)
+ throw Exception("Failed to submit payments")
}
// TODO take submitBatch taken time in the delay
delay(((frequency?.inSeconds ?: 0) * 1000).toLong())
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt
index efe98f82..4df7f0c9 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt
@@ -22,6 +22,7 @@ package tech.libeufin.nexus.db
import tech.libeufin.nexus.*
import tech.libeufin.common.*
import java.time.Instant
+import java.sql.ResultSet
/** Data access logic for initiated outgoing payments */
class InitiatedDAO(private val db: Database) {
@@ -169,28 +170,15 @@ class InitiatedDAO(private val db: Database) {
stmt.execute()
}
- // TODO WIP
- suspend fun submittableGet(currency: String): List<InitiatedPayment> = db.conn { conn ->
- val stmt = conn.prepareStatement("""
- SELECT
- initiated_outgoing_transaction_id
- ,(amount).val as amount_val
- ,(amount).frac as amount_frac
- ,wire_transfer_subject
- ,credit_payto_uri
- ,initiation_time
- ,request_uid
- FROM initiated_outgoing_transactions
- WHERE (submitted='unsubmitted' OR submitted='transient_failure')
- AND ((amount).val != 0 OR (amount).frac != 0);
- """)
- stmt.all {
+ /** List every initiated payment pending submission in ther order they should be submitted */
+ suspend fun submittable(currency: String): List<InitiatedPayment> = db.conn { conn ->
+ fun extract(it: ResultSet): InitiatedPayment {
val rowId = it.getLong("initiated_outgoing_transaction_id")
val initiationTime = it.getLong("initiation_time").microsToJavaInstant()
if (initiationTime == null) { // nexus fault
throw Exception("Found invalid timestamp at initiated payment with ID: $rowId")
}
- InitiatedPayment(
+ return InitiatedPayment(
id = it.getLong("initiated_outgoing_transaction_id"),
amount = it.getAmount("amount", currency),
creditPaytoUri = it.getString("credit_payto_uri"),
@@ -199,5 +187,31 @@ class InitiatedDAO(private val db: Database) {
requestUid = it.getString("request_uid")
)
}
+ val selectPart = """
+ SELECT
+ initiated_outgoing_transaction_id
+ ,(amount).val as amount_val
+ ,(amount).frac as amount_frac
+ ,wire_transfer_subject
+ ,credit_payto_uri
+ ,initiation_time
+ ,request_uid
+ FROM initiated_outgoing_transactions
+ """
+ // We want to maximize the number of successfully submitted transactions in the event
+ // of a malformed transaction or a persistent error classified as transient. We send
+ // the unsubmitted transactions first, starting with the oldest by creation time.
+ // This is the happy path, giving every transaction a chance while being fair on the
+ // basis of creation date.
+ // Then we retry the failed transaction, starting with the oldest by submission time.
+ // This the bad path retrying each failed transaction applying a rotation based on
+ // resubmission time.
+ val unsubmitted = conn.prepareStatement(
+ "$selectPart WHERE submitted='unsubmitted' ORDER BY initiation_time"
+ ).all(::extract)
+ val failed = conn.prepareStatement(
+ "$selectPart WHERE submitted='transient_failure' ORDER BY last_submission_time"
+ ).all(::extract)
+ unsubmitted + failed
}
} \ No newline at end of file