summaryrefslogtreecommitdiff
path: root/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt
diff options
context:
space:
mode:
Diffstat (limited to 'nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt')
-rw-r--r--nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt77
1 files changed, 29 insertions, 48 deletions
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt
index ca7b9116..765037e4 100644
--- a/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt
+++ b/nexus/src/main/kotlin/tech/libeufin/nexus/EbicsFetch.kt
@@ -28,6 +28,7 @@ import io.ktor.client.plugins.*
import kotlinx.coroutines.*
import tech.libeufin.common.*
import tech.libeufin.nexus.ebics.*
+import tech.libeufin.nexus.db.*
import java.io.IOException
import java.io.InputStream
import java.time.Instant
@@ -91,7 +92,7 @@ suspend fun ingestOutgoingPayment(
db: Database,
payment: OutgoingPayment
) {
- val result = db.registerOutgoing(payment)
+ val result = db.payment.registerOutgoing(payment)
if (result.new) {
if (result.initiated)
logger.info("$payment")
@@ -117,7 +118,7 @@ suspend fun ingestIncomingPayment(
) {
runCatching { parseIncomingTxMetadata(payment.wireTransferSubject) }.fold(
onSuccess = { reservePub ->
- val result = db.registerTalerableIncoming(payment, reservePub)
+ val result = db.payment.registerTalerableIncoming(payment, reservePub)
if (result.new) {
logger.info("$payment")
} else {
@@ -125,7 +126,7 @@ suspend fun ingestIncomingPayment(
}
},
onFailure = { e ->
- val result = db.registerMalformedIncoming(
+ val result = db.payment.registerMalformedIncoming(
payment,
payment.amount,
Instant.now()
@@ -139,21 +140,7 @@ suspend fun ingestIncomingPayment(
)
}
-/**
- * Ingests an outgoing payment bounce.
- *
- * @param db database handle.
- * @param reversal reversal ingest.
- */
-suspend fun ingestReversal(
- db: Database,
- reversal: OutgoingReversal
-) {
- logger.warn("BOUNCE '${reversal.bankId}': ${reversal.reason}")
- // TODO store in db=
-}
-
-private fun ingestDocument(
+private suspend fun ingestDocument(
db: Database,
currency: String,
xml: InputStream,
@@ -162,15 +149,13 @@ private fun ingestDocument(
when (whichDocument) {
SupportedDocument.CAMT_054 -> {
try {
- val notifications = mutableListOf<TxNotification>()
- parseTxNotif(xml, currency, notifications)
-
- runBlocking {
- notifications.forEach {
- when (it) {
- is TxNotification.Incoming -> ingestIncomingPayment(db, it.payment)
- is TxNotification.Outgoing -> ingestOutgoingPayment(db, it.payment)
- is TxNotification.Reversal -> ingestReversal(db, it.reversal)
+ parseTxNotif(xml, currency).forEach {
+ when (it) {
+ is TxNotification.Incoming -> ingestIncomingPayment(db, it.payment)
+ is TxNotification.Outgoing -> ingestOutgoingPayment(db, it.payment)
+ is TxNotification.Reversal -> {
+ logger.warn("BOUNCE '${it.msgId}': ${it.reason}")
+ db.initiated.reversal(it.msgId, "Payment bounced: ${it.reason}")
}
}
}
@@ -181,42 +166,38 @@ private fun ingestDocument(
SupportedDocument.PAIN_002_LOGS -> {
val acks = parseCustomerAck(xml)
for (ack in acks) {
- val msg = if (ack.orderId != null) {
- if (ack.code != null) {
- val msg = ack.msg()
- db.mem[ack.orderId] = msg
- msg
- } else {
- db.mem[ack.orderId]
- }
- } else {
- null
- }
when (ack.actionType) {
- HacAction.FILE_DOWNLOAD -> logger.debug("$ack")
HacAction.ORDER_HAC_FINAL_POS -> {
- // TODO update pending transaction status
logger.debug("$ack")
- logger.info("Order '${ack.orderId}' was accepted at ${ack.timestamp.fmtDateTime()}")
+ db.initiated.logSuccess(ack.orderId!!)?.let { requestUID ->
+ logger.info("Payment '$requestUID' accepted at ${ack.timestamp.fmtDateTime()}")
+ }
}
HacAction.ORDER_HAC_FINAL_NEG -> {
- // TODO update pending transaction status
logger.debug("$ack")
- logger.warn("Order '${ack.orderId}' was refused at ${ack.timestamp.fmtDateTime()}: $msg")
+ db.initiated.logFailure(ack.orderId!!)?.let { (requestUID, msg) ->
+ logger.warn("Payment '$requestUID' refused at ${ack.timestamp.fmtDateTime()}${if (msg != null) ": $msg" else ""}")
+ }
}
else -> {
- // TODO update pending transaction status
logger.debug("$ack")
+ if (ack.orderId != null) {
+ db.initiated.logMessage(ack.orderId, ack.msg())
+ }
}
}
}
}
SupportedDocument.PAIN_002 -> {
val status = parseCustomerPaymentStatusReport(xml)
- if (status.paymentCode == ExternalPaymentGroupStatusCode.RJCT)
- logger.warn("Transaction '${status.id()}' was rejected")
- // TODO update pending transaction status
+ val msg = status.msg()
logger.debug("$status")
+ if (status.paymentCode == ExternalPaymentGroupStatusCode.RJCT) {
+ db.initiated.bankFailure(status.msgId, msg)
+ logger.warn("Transaction '${status.msgId}' was rejected : $msg")
+ } else {
+ db.initiated.bankMessage(status.msgId, msg)
+ }
}
SupportedDocument.CAMT_053,
SupportedDocument.CAMT_052 -> {
@@ -226,7 +207,7 @@ private fun ingestDocument(
}
}
-private fun ingestDocuments(
+private suspend fun ingestDocuments(
db: Database,
currency: String,
content: InputStream,