commit 65e2e51fa87b6222fb1b3818deea8ed200d3b72e
parent 61cf08e25e4f4505d1784bb7fe2fb942a2d24a91
Author: Antoine A <>
Date: Thu, 12 Sep 2024 14:28:22 +0200
nexus: batch registration
Diffstat:
4 files changed, 62 insertions(+), 47 deletions(-)
diff --git a/database-versioning/libeufin-nexus-procedures.sql b/database-versioning/libeufin-nexus-procedures.sql
@@ -57,6 +57,51 @@ END $$;
COMMENT ON FUNCTION amount_add
IS 'Returns the normalized sum of two amounts. It raises an exception when the resulting .val is larger than 2^52';
+CREATE FUNCTION register_batch(
+ IN in_msg_id TEXT
+ ,IN in_execution_time INT8
+) returns table (end_to_end_id TEXT)
+LANGUAGE plpgsql AS $$
+DECLARE
+batch_id INT8;
+tx record;
+BEGIN
+-- Update batch status if exists
+UPDATE initiated_outgoing_batches
+SET status = 'success'::submission_state, status_msg = NULL
+WHERE message_id = in_msg_id
+RETURNING initiated_outgoing_batch_id INTO batch_id;
+IF NOT FOUND THEN
+ RETURN;
+END IF;
+
+-- Register pending transaction if they are not yet in final status
+FOR tx IN SELECT amount, subject, credit_payto, initiated_outgoing_transactions.end_to_end_id, wtid, exchange_base_url
+ FROM initiated_outgoing_transactions LEFT JOIN transfer_operations USING (initiated_outgoing_transaction_id)
+ WHERE initiated_outgoing_batch_id = batch_id
+ AND status NOT IN ('permanent_failure', 'success')
+LOOP
+ PERFORM register_outgoing(
+ tx.amount
+ ,tx.subject
+ ,in_execution_time
+ ,tx.credit_payto
+ ,tx.end_to_end_id
+ ,tx.wtid
+ ,tx.exchange_base_url
+ ,NULL
+ );
+END LOOP;
+
+-- Update transactions state if they are not yet in final status
+RETURN QUERY UPDATE initiated_outgoing_transactions
+ SET status = 'success'::submission_state, status_msg = NULL
+ WHERE initiated_outgoing_batch_id = batch_id
+ AND status NOT IN ('permanent_failure', 'success')
+ RETURNING initiated_outgoing_transactions.end_to_end_id;
+END$$;
+
+
CREATE FUNCTION register_outgoing(
IN in_amount taler_amount
,IN in_subject TEXT
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsFetch.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsFetch.kt
@@ -129,8 +129,9 @@ suspend fun ingestFile(
is IncomingPayment -> ingestIncomingPayment(db, it, cfg.accountType)
is OutgoingBatch -> {
logger.debug("{}", it)
- for (pay in db.initiated.ingestBatch(it.msgId, it.executionTime)) {
- ingestOutgoingPayment(db, pay)
+ val ids = db.payment.registerBatch(it.msgId, it.executionTime);
+ if (ids.isNotEmpty()) {
+ logger.info("BATCH ${it.executionTime.fmtDate()} ${it.msgId}: {}", ids.joinToString(","))
}
}
is OutgoingPayment -> ingestOutgoingPayment(db, it)
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt
@@ -312,51 +312,6 @@ class InitiatedDAO(private val db: Database) {
}
}
- /** Register EBICS payment batch as succeded and return pending payments */
- suspend fun ingestBatch(msgId: String, executionTime: Instant) = db.serializableTransaction { tx ->
- // Update batch state
- val batchId = tx.withStatement(
- """
- UPDATE initiated_outgoing_batches
- SET status = 'success'::submission_state, status_msg = NULL
- WHERE message_id = ?
- RETURNING initiated_outgoing_batch_id
- """
- ) {
- setString(1, msgId)
- oneOrNull { it.getLong("initiated_outgoing_batch_id") }
- }
- if (batchId == null) {
- return@serializableTransaction emptyList()
- }
- // Update transactions state if they are not yet in final status
- tx.withStatement(
- """
- UPDATE initiated_outgoing_transactions
- SET status = 'success'::submission_state, status_msg = NULL
- WHERE initiated_outgoing_batch_id = ?
- AND status NOT IN ('permanent_failure', 'success')
- RETURNING end_to_end_id,
- (amount).val as amount_val,
- (amount).frac as amount_frac,
- subject,
- credit_payto
- """
- ) {
- setLong(1, batchId)
- all {
- OutgoingPayment(
- endToEndId = it.getString("end_to_end_id"),
- msgId = msgId,
- amount = it.getAmount("amount", db.bankCurrency),
- subject = it.getString("subject"),
- executionTime = executionTime,
- creditPaytoUri = it.getString("credit_payto")
- )
- }
- }
- }
-
/** Group unbatched transaction into a single batch */
suspend fun batch(timestamp: Instant, ebicsId: String) {
db.serializable("SELECT FROM batch_outgoing_transactions(?, ?)") {
diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/PaymentDAO.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/PaymentDAO.kt
@@ -67,6 +67,20 @@ class PaymentDAO(private val db: Database) {
}
}
+
+ /** Register an outgoing payment batch reconciling it with its initiated payments counterpart if present */
+ suspend fun registerBatch(msgId: String, executionTime: Instant) = db.serializable(
+ """
+ SELECT end_to_end_id FROM register_batch(?, ?)
+ """
+ ) {
+ setString(1, msgId)
+ setLong(2, executionTime.micros())
+ all {
+ it.getString("end_to_end_id")
+ }
+ }
+
/** Incoming payments bounce registration result */
data class IncomingBounceRegistrationResult(
val id: Long,