commit 5ca91639643d01aaedb33261f56dbf8d67da0bed parent 8ad947fd464260822cec3dc9b0f6492f903f81d1 Author: Antoine A <> Date: Mon, 16 Sep 2024 12:09:33 +0200 nexus: fix registration logic and rename ingest to register Diffstat:
15 files changed, 743 insertions(+), 725 deletions(-)
diff --git a/database-versioning/libeufin-conversion-setup.sql b/database-versioning/libeufin-conversion-setup.sql @@ -72,7 +72,16 @@ LANGUAGE plpgsql AS $$ -- Bounce on soft failures IF too_small THEN -- TODO bounce fees ? - PERFORM bounce_incoming(NEW.incoming_transaction_id, ((local_amount).val, (local_amount).frac)::taler_amount, now_date); + PERFORM bounce_incoming( + NEW.incoming_transaction_id + ,((local_amount).val, (local_amount).frac)::taler_amount + -- use gen_random_uuid to get some randomness + -- remove all - characters as they are not random + -- capitalise the UUID as some bank may still be case sensitive + -- end with 34 random chars which is valid for EBICS (max 35 chars) + ,upper(replace(gen_random_uuid()::text, '-', '')) + ,now_date + ); RETURN NULL; END IF; diff --git a/database-versioning/libeufin-nexus-procedures.sql b/database-versioning/libeufin-nexus-procedures.sql @@ -57,51 +57,6 @@ END $$; COMMENT ON FUNCTION amount_add IS 'Returns the normalized sum of two amounts. It raises an exception when the resulting .val is larger than 2^52'; -CREATE FUNCTION register_batch( - IN in_msg_id TEXT - ,IN in_execution_time INT8 -) returns table (end_to_end_id TEXT) -LANGUAGE plpgsql AS $$ -DECLARE -batch_id INT8; -tx record; -BEGIN --- Update batch status if exists -UPDATE initiated_outgoing_batches -SET status = 'success'::submission_state, status_msg = NULL -WHERE message_id = in_msg_id -RETURNING initiated_outgoing_batch_id INTO batch_id; -IF NOT FOUND THEN - RETURN; -END IF; - --- Register pending transaction if they are not yet in final status -FOR tx IN SELECT amount, subject, credit_payto, initiated_outgoing_transactions.end_to_end_id, wtid, exchange_base_url - FROM initiated_outgoing_transactions LEFT JOIN transfer_operations USING (initiated_outgoing_transaction_id) - WHERE initiated_outgoing_batch_id = batch_id - AND status NOT IN ('permanent_failure', 'success') -LOOP - PERFORM register_outgoing( - tx.amount - ,tx.subject - ,in_execution_time - ,tx.credit_payto - ,tx.end_to_end_id - ,tx.wtid - ,tx.exchange_base_url - ,NULL - ); -END LOOP; - --- Update transactions state if they are not yet in final status -RETURN QUERY UPDATE initiated_outgoing_transactions - SET status = 'success'::submission_state, status_msg = NULL - WHERE initiated_outgoing_batch_id = batch_id - AND status NOT IN ('permanent_failure', 'success') - RETURNING initiated_outgoing_transactions.end_to_end_id; -END$$; - - CREATE FUNCTION register_outgoing( IN in_amount taler_amount ,IN in_subject TEXT @@ -123,6 +78,7 @@ local_subject TEXT; local_credit_payto TEXT; local_wtid BYTEA; local_exchange_base_url TEXT; +local_end_to_end_id TEXT; BEGIN -- Check if already registered SELECT outgoing_transaction_id, subject, credit_payto, (amount).val, (amount).frac, @@ -200,13 +156,17 @@ IF NOT out_found THEN -- Register as talerable if contains wtid and exchange URL IF in_wtid IS NOT NULL OR in_exchange_url IS NOT NULL THEN - INSERT INTO talerable_outgoing_transactions ( - outgoing_transaction_id, - wtid, - exchange_base_url - ) VALUES (out_tx_id, in_wtid, in_exchange_url) - ON CONFLICT (wtid) DO NOTHING; + SELECT end_to_end_id INTO local_end_to_end_id + FROM talerable_outgoing_transactions + JOIN outgoing_transactions USING (outgoing_transaction_id) + WHERE wtid=in_wtid; IF FOUND THEN + IF local_end_to_end_id != in_end_to_end_id THEN + RAISE NOTICE 'wtid reuse: tx % and tx % have the same wtid %', in_end_to_end_id, local_end_to_end_id, in_wtid; + END IF; + ELSE + INSERT INTO talerable_outgoing_transactions(outgoing_transaction_id, wtid, exchange_base_url) + VALUES (out_tx_id, in_wtid, in_exchange_url); PERFORM pg_notify('outgoing_tx', out_tx_id::text); END IF; END IF; @@ -236,135 +196,6 @@ CREATE FUNCTION register_incoming( ,IN in_execution_time INT8 ,IN in_debit_payto TEXT ,IN in_bank_id TEXT - ,OUT out_found BOOLEAN - ,OUT out_tx_id INT8 -) -LANGUAGE plpgsql AS $$ -DECLARE -local_amount taler_amount; -local_subject TEXT; -local_debit_payto TEXT; -BEGIN --- Check if already registered -SELECT incoming_transaction_id, subject, debit_payto, (amount).val, (amount).frac - INTO out_tx_id, local_subject, local_debit_payto, local_amount.val, local_amount.frac - FROM incoming_transactions - WHERE bank_id = in_bank_id; -out_found=FOUND; -IF out_found THEN - IF local_subject != in_subject THEN - RAISE NOTICE 'incoming tx %: stored subjet is ''%'' got ''%''', in_bank_id, local_subject, in_subject; - END IF; - IF local_debit_payto != in_debit_payto THEN - RAISE NOTICE 'incoming tx %: stored subjet debit payto is % got %', in_bank_id, local_debit_payto, in_debit_payto; - END IF; - IF local_amount != in_amount THEN - RAISE NOTICE 'incoming tx %: stored amount is % got %', in_bank_id, local_amount, in_amount; - END IF; -ELSE - -- Store the transaction in the database - INSERT INTO incoming_transactions ( - amount - ,subject - ,execution_time - ,debit_payto - ,bank_id - ) VALUES ( - in_amount - ,in_subject - ,in_execution_time - ,in_debit_payto - ,in_bank_id - ) RETURNING incoming_transaction_id INTO out_tx_id; - PERFORM pg_notify('revenue_tx', out_tx_id::text); -END IF; -END $$; -COMMENT ON FUNCTION register_incoming - IS 'Register an incoming transaction'; - -CREATE FUNCTION bounce_incoming( - IN tx_id INT8 - ,IN in_bounce_amount taler_amount - ,IN in_now_date INT8 - ,OUT out_bounce_id TEXT -) -LANGUAGE plpgsql AS $$ -DECLARE -local_bank_id TEXT; -payto_uri TEXT; -init_id INT8; -BEGIN --- Get incoming transaction bank ID and creditor -SELECT bank_id, debit_payto - INTO local_bank_id, payto_uri - FROM incoming_transactions - WHERE incoming_transaction_id = tx_id; --- Generate a bounce ID deterministically from the bank ID --- We hash the bank ID with SHA-256 then we encode the hash using base64 --- As bank id can be at most 35 characters long we truncate the encoded hash --- We are not sure whether this field is case-insensitive in all banks as the standard --- does not clearly specify this, so we have chosen to capitalise it -SELECT upper(substr(encode(public.digest(local_bank_id, 'sha256'), 'base64'), 0, 35)) INTO out_bounce_id; - --- Initiate the bounce transaction -INSERT INTO initiated_outgoing_transactions ( - amount - ,subject - ,credit_payto - ,initiation_time - ,end_to_end_id - ) VALUES ( - in_bounce_amount - ,'bounce: ' || local_bank_id - ,payto_uri - ,in_now_date - ,out_bounce_id - ) - ON CONFLICT (end_to_end_id) DO NOTHING -- idempotent - RETURNING initiated_outgoing_transaction_id INTO init_id; -IF FOUND THEN - -- Register the bounce - INSERT INTO bounced_transactions ( - incoming_transaction_id ,initiated_outgoing_transaction_id - ) VALUES (tx_id, init_id); -END IF; -END$$; -COMMENT ON FUNCTION bounce_incoming - IS 'Bounce an incoming transaction, initiate a bounce outgoing transaction with a deterministic ID'; - -CREATE FUNCTION register_incoming_and_bounce( - IN in_amount taler_amount - ,IN in_subject TEXT - ,IN in_execution_time INT8 - ,IN in_debit_payto TEXT - ,IN in_bank_id TEXT - ,IN in_bounce_amount taler_amount - ,IN in_now_date INT8 - ,OUT out_found BOOLEAN - ,OUT out_tx_id INT8 - ,OUT out_bounce_id TEXT -) -LANGUAGE plpgsql AS $$ -DECLARE -init_id INT8; -BEGIN --- Register the incoming transaction -SELECT reg.out_found, reg.out_tx_id - FROM register_incoming(in_amount, in_subject, in_execution_time, in_debit_payto, in_bank_id) as reg - INTO out_found, out_tx_id; - --- Bounce the incoming transaction -SELECT b.out_bounce_id INTO out_bounce_id FROM bounce_incoming(out_tx_id, in_bounce_amount, in_now_date) as b; -END $$; -COMMENT ON FUNCTION register_incoming_and_bounce - IS 'Register an incoming transaction and bounce it'; - -CREATE FUNCTION register_incoming_and_talerable( - IN in_amount taler_amount - ,IN in_subject TEXT - ,IN in_execution_time INT8 - ,IN in_debit_payto TEXT - ,IN in_bank_id TEXT ,IN in_type taler_incoming_type ,IN in_reserve_pub BYTEA ,IN in_account_pub BYTEA @@ -377,8 +208,13 @@ CREATE FUNCTION register_incoming_and_talerable( LANGUAGE plpgsql AS $$ DECLARE need_reconcile BOOLEAN; +local_amount taler_amount; +local_subject TEXT; +local_debit_payto TEXT; BEGIN -IF in_type = 'reserve' THEN +IF in_type IS NULL THEN + -- No talerable logic +ELSIF in_type = 'reserve' THEN -- Search if already inserted based on unique reserve_pub key -- Reconcile missing bank_id if metadata match -- Check for reserve_pub reuse @@ -427,13 +263,43 @@ ELSE RAISE EXCEPTION 'Unsupported incoming type %', in_type; END IF; --- Register the incoming transaction -SELECT reg.out_found, reg.out_tx_id - FROM register_incoming(in_amount, in_subject, in_execution_time, in_debit_payto, in_bank_id) as reg - INTO out_found, out_tx_id; +-- Check if already registered +SELECT incoming_transaction_id, subject, debit_payto, (amount).val, (amount).frac + INTO out_tx_id, local_subject, local_debit_payto, local_amount.val, local_amount.frac + FROM incoming_transactions + WHERE bank_id = in_bank_id; +out_found=FOUND; +IF out_found THEN + -- Check metadata + IF local_subject != in_subject THEN + RAISE NOTICE 'incoming tx %: stored subjet is ''%'' got ''%''', in_bank_id, local_subject, in_subject; + END IF; + IF local_debit_payto != in_debit_payto THEN + RAISE NOTICE 'incoming tx %: stored subjet debit payto is % got %', in_bank_id, local_debit_payto, in_debit_payto; + END IF; + IF local_amount != in_amount THEN + RAISE NOTICE 'incoming tx %: stored amount is % got %', in_bank_id, local_amount, in_amount; + END IF; +ELSE + -- Store the transaction in the database + INSERT INTO incoming_transactions ( + amount + ,subject + ,execution_time + ,debit_payto + ,bank_id + ) VALUES ( + in_amount + ,in_subject + ,in_execution_time + ,in_debit_payto + ,in_bank_id + ) RETURNING incoming_transaction_id INTO out_tx_id; + PERFORM pg_notify('revenue_tx', out_tx_id::text); +END IF; -- Register as talerable -IF NOT EXISTS(SELECT FROM talerable_incoming_transactions WHERE incoming_transaction_id = out_tx_id) THEN +IF in_type IS NOT NULL AND NOT EXISTS(SELECT FROM talerable_incoming_transactions WHERE incoming_transaction_id = out_tx_id) THEN -- We cannot use ON CONFLICT here because conversion use a trigger before insertion that isn't idempotent INSERT INTO talerable_incoming_transactions ( incoming_transaction_id @@ -449,10 +315,80 @@ IF NOT EXISTS(SELECT FROM talerable_incoming_transactions WHERE incoming_transac PERFORM pg_notify('incoming_tx', out_tx_id::text); END IF; END $$; -COMMENT ON FUNCTION register_incoming_and_talerable IS ' -Creates one row in the incoming transactions table and one row -in the talerable transactions table. The talerable row links the -incoming one.'; + +CREATE FUNCTION register_and_bounce_incoming( + IN in_amount taler_amount + ,IN in_subject TEXT + ,IN in_execution_time INT8 + ,IN in_debit_payto TEXT + ,IN in_bank_id TEXT + ,IN in_bounce_amount taler_amount + ,IN in_now_date INT8 + ,IN in_bounce_id TEXT + ,OUT out_found BOOLEAN + ,OUT out_tx_id INT8 + ,OUT out_bounce_id TEXT +) +LANGUAGE plpgsql AS $$ +DECLARE +init_id INT8; +BEGIN +-- Register incoming transaction +SELECT reg.out_found, reg.out_tx_id + FROM register_incoming(in_amount, in_subject, in_execution_time, in_debit_payto, in_bank_id, NULL, NULL, NULL) as reg + INTO out_found, out_tx_id; + +-- Bounce incoming transaction +SELECT bounce.out_bounce_id INTO out_bounce_id FROM bounce_incoming(out_tx_id, in_bounce_amount, in_bounce_id, in_now_date) AS bounce; +END $$; + +CREATE FUNCTION bounce_incoming( + IN in_tx_id INT8 + ,IN in_bounce_amount taler_amount + ,IN in_bounce_id TEXT + ,IN in_now_date INT8 + ,OUT out_bounce_id TEXT +) +LANGUAGE plpgsql AS $$ +DECLARE +local_bank_id TEXT; +payto_uri TEXT; +init_id INT8; +BEGIN +-- Check if already bounce +SELECT end_to_end_id INTO out_bounce_id + FROM initiated_outgoing_transactions + JOIN bounced_transactions USING (initiated_outgoing_transaction_id) + WHERE incoming_transaction_id = in_tx_id; + +-- Else initiate the bounce transaction +IF NOT FOUND THEN + out_bounce_id = in_bounce_id; + -- Get incoming transaction bank ID and creditor + SELECT bank_id, debit_payto + INTO local_bank_id, payto_uri + FROM incoming_transactions + WHERE incoming_transaction_id = in_tx_id; + -- Initiate the bounce transaction + INSERT INTO initiated_outgoing_transactions ( + amount + ,subject + ,credit_payto + ,initiation_time + ,end_to_end_id + ) VALUES ( + in_bounce_amount + ,'bounce: ' || local_bank_id + ,payto_uri + ,in_now_date + ,in_bounce_id + ) + RETURNING initiated_outgoing_transaction_id INTO init_id; + -- Register the bounce + INSERT INTO bounced_transactions (incoming_transaction_id, initiated_outgoing_transaction_id) + VALUES (in_tx_id, init_id); +END IF; +END$$; CREATE FUNCTION taler_transfer( IN in_request_uid BYTEA, diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsFetch.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/cli/EbicsFetch.kt @@ -31,18 +31,18 @@ import kotlinx.coroutines.* import tech.libeufin.common.* import tech.libeufin.nexus.* import tech.libeufin.nexus.db.* -import tech.libeufin.nexus.db.PaymentDAO.IncomingRegistrationResult +import tech.libeufin.nexus.db.PaymentDAO.* import tech.libeufin.nexus.ebics.* import tech.libeufin.nexus.iso20022.* import java.io.IOException import java.io.InputStream import java.time.Instant -/** Ingests an outgoing [payment] into [db] */ -suspend fun ingestOutgoingPayment( +/** Register an outgoing [payment] into [db] */ +suspend fun registerOutgoingPayment( db: Database, payment: OutgoingPayment -) { +): OutgoingRegistrationResult { val metadata: Pair<ShortHashCode, ExchangeUrl>? = payment.subject?.let { runCatching { parseOutgoingTxMetadata(it) }.getOrNull() } @@ -55,13 +55,25 @@ suspend fun ingestOutgoingPayment( } else { logger.debug("{} already seen", payment) } + return result +} + +/** Register an outgoing [payment] into [db] */ +suspend fun registerOutgoingBatch( + db: Database, + batch: OutgoingBatch +) { + logger.info("BATCH ${batch.executionTime.fmtDate()} ${batch.msgId}") + for (it in db.initiated.unsettledTxInBatch(batch.msgId, batch.executionTime)) { + registerOutgoingPayment(db, it) + } } /** - * Ingest an incoming [payment] into [db] + * Register an incoming [payment] into [db] * Stores the payment into valid talerable ones or bounces it, according to [accountType] . */ -suspend fun ingestIncomingPayment( +suspend fun registerIncomingPayment( db: Database, payment: IncomingPayment, accountType: AccountType @@ -75,7 +87,8 @@ suspend fun ingestIncomingPayment( AccountType.exchange -> { val result = db.payment.registerMalformedIncoming( payment, - payment.amount, + payment.amount, + randEbicsId(), Instant.now() ) if (result.new) { @@ -111,8 +124,8 @@ suspend fun ingestIncomingPayment( ) } -/** Ingest a single EBICS [xml] [document] into [db] */ -suspend fun ingestFile( +/** Register a single EBICS [xml] [document] into [db] */ +suspend fun registerFile( db: Database, cfg: NexusEbicsConfig, xml: InputStream, @@ -126,15 +139,9 @@ suspend fun ingestFile( logger.debug("IGNORE {}", it) } else { when (it) { - is IncomingPayment -> ingestIncomingPayment(db, it, cfg.accountType) - is OutgoingBatch -> { - logger.debug("{}", it) - val ids = db.payment.registerBatch(it.msgId, it.executionTime); - if (ids.isNotEmpty()) { - logger.info("BATCH ${it.executionTime.fmtDate()} ${it.msgId}: {}", ids.joinToString(",")) - } - } - is OutgoingPayment -> ingestOutgoingPayment(db, it) + is IncomingPayment -> registerIncomingPayment(db, it, cfg.accountType) + is OutgoingBatch -> registerOutgoingBatch(db, it) + is OutgoingPayment -> registerOutgoingPayment(db, it) is OutgoingReversal -> { logger.error("{}", it) db.initiated.txStatusUpdate(it.endToEndId, it.msgId, SubmissionState.permanent_failure, "Payment bounced: ${it.reason}") @@ -143,7 +150,7 @@ suspend fun ingestFile( } } } catch (e: Exception) { - throw Exception("Ingesting notifications failed", e) + throw Exception("Notifications registration failed", e) } } OrderDoc.acknowledgement -> { @@ -218,8 +225,8 @@ suspend fun ingestFile( } } -/** Ingest an EBICS [payload] of [doc] into [db] */ -private suspend fun ingestPayload( +/** Register an EBICS [payload] of [doc] into [db] */ +private suspend fun registerPayload( db: Database, cfg: NexusEbicsConfig, payload: InputStream, @@ -234,18 +241,18 @@ private suspend fun ingestPayload( try { payload.unzipEach { fileName, xml -> logger.trace("parse $fileName") - ingestFile(db, cfg, xml, doc) + registerFile(db, cfg, xml, doc) } } catch (e: IOException) { throw Exception("Could not open any ZIP archive", e) } } - OrderDoc.acknowledgement -> ingestFile(db, cfg, payload, doc) + OrderDoc.acknowledgement -> registerFile(db, cfg, payload, doc) } } /** - * Fetch and ingest banking records from [orders] using EBICS [client] starting from [pinnedStart] + * Fetch and register banking records from [orders] using EBICS [client] starting from [pinnedStart] * * If [pinnedStart] is null fetch new records. */ @@ -273,7 +280,7 @@ private suspend fun fetchEbicsDocuments( lastExecutionTime, null ) { payload -> - ingestPayload(client.db, client.cfg, payload, doc) + registerPayload(client.db, client.cfg, payload, doc) } true } catch (e: Exception) { @@ -284,7 +291,7 @@ private suspend fun fetchEbicsDocuments( } } -class EbicsFetch: CliktCommand("Downloads and parse EBICS files from the bank and ingest them into the database") { +class EbicsFetch: CliktCommand("Downloads and parse EBICS files from the bank and register them into the database") { private val common by CommonOption() private val transient by transientOption() private val documents: Set<OrderDoc> by argument( diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/cli/Testing.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/cli/Testing.kt @@ -86,7 +86,7 @@ class FakeIncoming: CliktCommand("Genere a fake incoming payment") { "Wrong currency: expected ${cfg.currency} got ${amount.currency}" } - ingestIncomingPayment(db, + registerIncomingPayment(db, IncomingPayment( amount = amount, debtorPayto = payto, diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/InitiatedDAO.kt @@ -312,6 +312,33 @@ class InitiatedDAO(private val db: Database) { } } + /** Unsettled intiaited payment in batch [msgId] */ + suspend fun unsettledTxInBatch(msgId: String, executionTime: Instant) = db.serializable( + """ + SELECT end_to_end_id, + (amount).val as amount_val, + (amount).frac as amount_frac, + subject, + credit_payto + FROM initiated_outgoing_transactions + JOIN initiated_outgoing_batches USING (initiated_outgoing_batch_id) + WHERE message_id = ? + AND initiated_outgoing_transactions.status NOT IN ('permanent_failure', 'success') + """ + ) { + setString(1, msgId) + all { + OutgoingPayment( + endToEndId = it.getString("end_to_end_id"), + msgId = msgId, + amount = it.getAmount("amount", db.bankCurrency), + subject = it.getString("subject"), + executionTime = executionTime, + creditorPayto = it.getIbanPayto("credit_payto") + ) + } + } + /** Group unbatched transaction into a single batch */ suspend fun batch(timestamp: Instant, ebicsId: String) { db.serializable("SELECT FROM batch_outgoing_transactions(?, ?)") { diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/PaymentDAO.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/PaymentDAO.kt @@ -67,20 +67,6 @@ class PaymentDAO(private val db: Database) { } } - - /** Register an outgoing payment batch reconciling it with its initiated payments counterpart if present */ - suspend fun registerBatch(msgId: String, executionTime: Instant) = db.serializable( - """ - SELECT end_to_end_id FROM register_batch(?, ?) - """ - ) { - setString(1, msgId) - setLong(2, executionTime.micros()) - all { - it.getString("end_to_end_id") - } - } - /** Incoming payments bounce registration result */ data class IncomingBounceRegistrationResult( val id: Long, @@ -92,11 +78,12 @@ class PaymentDAO(private val db: Database) { suspend fun registerMalformedIncoming( paymentData: IncomingPayment, bounceAmount: TalerAmount, + bounceEndToEndId: String, timestamp: Instant ): IncomingBounceRegistrationResult = db.serializable( """ SELECT out_found, out_tx_id, out_bounce_id - FROM register_incoming_and_bounce((?,?)::taler_amount,?,?,?,?,(?,?)::taler_amount,?) + FROM register_and_bounce_incoming((?,?)::taler_amount,?,?,?,?,(?,?)::taler_amount,?,?) """ ) { setLong(1, paymentData.amount.value) @@ -108,6 +95,7 @@ class PaymentDAO(private val db: Database) { setLong(7, bounceAmount.value) setInt(8, bounceAmount.frac) setLong(9, timestamp.micros()) + setString(10, bounceEndToEndId) one { IncomingBounceRegistrationResult( it.getLong("out_tx_id"), @@ -130,7 +118,7 @@ class PaymentDAO(private val db: Database) { ): IncomingRegistrationResult = db.serializable( """ SELECT out_reserve_pub_reuse, out_found, out_tx_id - FROM register_incoming_and_talerable((?,?)::taler_amount,?,?,?,?,?::taler_incoming_type,?,?) + FROM register_incoming((?,?)::taler_amount,?,?,?,?,?::taler_incoming_type,?,?) """ ) { val executionTime = paymentData.executionTime.micros() @@ -169,7 +157,7 @@ class PaymentDAO(private val db: Database) { ): IncomingRegistrationResult.Success = db.serializable( """ SELECT out_found, out_tx_id - FROM register_incoming((?,?)::taler_amount,?,?,?,?) + FROM register_incoming((?,?)::taler_amount,?,?,?,?,NULL,NULL,NULL) """ ) { val executionTime = paymentData.executionTime.micros() diff --git a/nexus/src/test/kotlin/CliTest.kt b/nexus/src/test/kotlin/CliTest.kt @@ -146,8 +146,8 @@ class CliTest { // Check empty check() // Check with transactions - ingestIn(db) - ingestOut(db) + registerIn(db) + registerOut(db) check() // Check with taler transactions talerableOut(db) diff --git a/nexus/src/test/kotlin/DatabaseTest.kt b/nexus/src/test/kotlin/DatabaseTest.kt @@ -22,58 +22,16 @@ import tech.libeufin.common.ShortHashCode import tech.libeufin.common.TalerAmount import tech.libeufin.common.db.one import tech.libeufin.nexus.AccountType -import tech.libeufin.nexus.cli.ingestIncomingPayment -import tech.libeufin.nexus.cli.ingestOutgoingPayment -import tech.libeufin.nexus.db.Database -import tech.libeufin.nexus.db.InitiatedDAO.PaymentInitiationResult -import tech.libeufin.nexus.db.SubmissionState +import tech.libeufin.nexus.cli.* +import tech.libeufin.nexus.db.* +import tech.libeufin.nexus.db.InitiatedDAO.* +import tech.libeufin.nexus.db.PaymentDAO.* +import tech.libeufin.nexus.ebics.randEbicsId +import tech.libeufin.nexus.iso20022.* import java.time.Instant import kotlin.test.* -class OutgoingPaymentsTest { - @Test - fun register() = setup { db, _ -> - // With reconciling - genOutPay("paid by nexus").run { - assertIs<PaymentInitiationResult.Success>( - db.initiated.create(genInitPay(endToEndId, "waiting for reconciliation")) - ) - db.payment.registerOutgoing(this, null, null).run { - assertTrue(new) - assertTrue(initiated) - } - db.payment.registerOutgoing(this, null, null).run { - assertFalse(new) - assertTrue(initiated) - } - } - // Without reconciling - genOutPay("not paid by nexus").run { - db.payment.registerOutgoing(this, null, null).run { - assertTrue(new) - assertFalse(initiated) - } - db.payment.registerOutgoing(this, null, null).run { - assertFalse(new) - assertFalse(initiated) - } - } - } - - @Test - fun talerable() = setup { db, _ -> - val wtid = ShortHashCode.rand() - val url = "https://exchange.com" - genOutPay("$wtid $url").run { - assertIs<PaymentInitiationResult.Success>( - db.initiated.create(genInitPay(endToEndId, "waiting for reconciliation")) - ) - ingestOutgoingPayment(db, this) - } - } -} - -suspend fun Database.getCount(): Triple<Int, Int, Int> = serializable( +suspend fun Database.checkInCount(nbIncoming: Int, nbBounce: Int, nbTalerable: Int) = serializable( """ SELECT (SELECT count(*) FROM incoming_transactions) AS incoming, (SELECT count(*) FROM bounced_transactions) AS bounce, @@ -81,12 +39,25 @@ suspend fun Database.getCount(): Triple<Int, Int, Int> = serializable( """ ) { one { - Triple(it.getInt("incoming"), it.getInt("bounce"), it.getInt("talerable")) + assertEquals( + Triple(nbIncoming, nbBounce, nbTalerable), + Triple(it.getInt("incoming"), it.getInt("bounce"), it.getInt("talerable")) + ) } } -suspend fun Database.checkCount(nbIncoming: Int, nbBounce: Int, nbTalerable: Int) { - assertEquals(Triple(nbIncoming, nbBounce, nbTalerable), getCount()) +suspend fun Database.checkOutCount(nbIncoming: Int, nbTalerable: Int) = serializable( + """ + SELECT (SELECT count(*) FROM outgoing_transactions) AS incoming, + (SELECT count(*) FROM talerable_outgoing_transactions) AS talerable; + """ +) { + one { + assertEquals( + Pair(nbIncoming, nbTalerable), + Pair(it.getInt("incoming"), it.getInt("talerable")) + ) + } } suspend fun Database.inTxExists(id: String): Boolean = serializable( @@ -98,25 +69,105 @@ suspend fun Database.inTxExists(id: String): Boolean = serializable( } } +class OutgoingPaymentsTest { + @Test + fun registerTx() = setup { db, _ -> + // Register initiated transaction + for (subject in sequenceOf( + "initiated by nexus", + "${ShortHashCode.rand()} https://exchange.com" + )) { + val pay = genOutPay(subject) + assertIs<PaymentInitiationResult.Success>( + db.initiated.create(genInitPay(pay.endToEndId, subject)) + ) + val first = registerOutgoingPayment(db, pay) + assertEquals(OutgoingRegistrationResult(id = first.id, initiated = true, new = true), first) + assertEquals( + OutgoingRegistrationResult(id = first.id, initiated = true, new = false), + registerOutgoingPayment(db, pay) + ) + } + db.checkOutCount(nbIncoming = 2, nbTalerable = 1) + + // Register unknonwn + for (subject in sequenceOf( + "not initiated by nexus", + "${ShortHashCode.rand()} https://exchange.com" + )) { + val pay = genOutPay(subject) + val first = registerOutgoingPayment(db, pay) + assertEquals(OutgoingRegistrationResult(id = first.id, initiated = false, new = true), first) + assertEquals( + OutgoingRegistrationResult(id = first.id, initiated = false, new = false), + registerOutgoingPayment(db, pay) + ) + } + db.checkOutCount(nbIncoming = 4, nbTalerable = 2) + + // Register wtid reuse + val wtid = ShortHashCode.rand() + for (subject in sequenceOf( + "$wtid https://exchange.com", + "$wtid https://exchange.com" + )) { + val pay = genOutPay(subject) + val first = registerOutgoingPayment(db, pay) + assertEquals(OutgoingRegistrationResult(id = first.id, initiated = false, new = true), first) + assertEquals( + OutgoingRegistrationResult(id = first.id, initiated = false, new = false), + db.payment.registerOutgoing(pay, null, null) + ) + } + db.checkOutCount(nbIncoming = 6, nbTalerable = 3) + } + + @Test + fun registerBatch() = setup { db, _ -> + // Init batch + val wtid = ShortHashCode.rand() + for (subject in sequenceOf( + "initiated by nexus", + "${ShortHashCode.rand()} https://exchange.com", + "$wtid https://exchange.com", + "$wtid https://exchange.com" + )) { + assertIs<PaymentInitiationResult.Success>( + db.initiated.create(genInitPay(randEbicsId(), subject=subject)) + ) + } + db.initiated.batch(Instant.now(), "BATCH") + + // Register batch + registerOutgoingBatch(db, OutgoingBatch("BATCH", Instant.now())); + db.checkOutCount(nbIncoming = 4, nbTalerable = 2) + } +} + class IncomingPaymentsTest { // Tests creating and bouncing incoming payments in one DB transaction @Test fun bounce() = setup { db, _ -> // creating and bouncing one incoming transaction. val payment = genInPay("incoming and bounced") + val id = randEbicsId() db.payment.registerMalformedIncoming( payment, TalerAmount("KUDOS:2.53"), + id, Instant.now() ).run { assertTrue(new) + assertEquals(id, bounceId) } db.payment.registerMalformedIncoming( payment, TalerAmount("KUDOS:2.53"), + randEbicsId(), Instant.now() ).run { assertFalse(new) + assertEquals(id, bounceId) } db.conn { // Checking one incoming got created @@ -161,39 +212,39 @@ class IncomingPaymentsTest { // Register with missing ID val incoming = genInPay(subject) val incomingMissingId = incoming.copy(bankId = null) - ingestIncomingPayment(db, incomingMissingId, AccountType.exchange) - db.checkCount(1, 0, 1) + registerIncomingPayment(db, incomingMissingId, AccountType.exchange) + db.checkInCount(1, 0, 1) assertFalse(db.inTxExists(incoming.bankId!!)) // Idempotent - ingestIncomingPayment(db, incomingMissingId, AccountType.exchange) - db.checkCount(1, 0, 1) + registerIncomingPayment(db, incomingMissingId, AccountType.exchange) + db.checkInCount(1, 0, 1) // Different metadata is bounced - ingestIncomingPayment(db, genInPay(subject, "KUDOS:9"), AccountType.exchange) - ingestIncomingPayment(db, genInPay("another $subject"), AccountType.exchange) - db.checkCount(3, 2, 1) + registerIncomingPayment(db, genInPay(subject, "KUDOS:9"), AccountType.exchange) + registerIncomingPayment(db, genInPay("another $subject"), AccountType.exchange) + db.checkInCount(3, 2, 1) // Different medata with missing id is ignored - ingestIncomingPayment(db, incomingMissingId.copy(amount = TalerAmount("KUDOS:9")), AccountType.exchange) - ingestIncomingPayment(db, incomingMissingId.copy(subject = "another $subject"), AccountType.exchange) - db.checkCount(3, 2, 1) + registerIncomingPayment(db, incomingMissingId.copy(amount = TalerAmount("KUDOS:9")), AccountType.exchange) + registerIncomingPayment(db, incomingMissingId.copy(subject = "another $subject"), AccountType.exchange) + db.checkInCount(3, 2, 1) // Recover bank ID when metadata match - ingestIncomingPayment(db, incoming, AccountType.exchange) + registerIncomingPayment(db, incoming, AccountType.exchange) assertTrue(db.inTxExists(incoming.bankId!!)) // Idempotent - ingestIncomingPayment(db, incoming, AccountType.exchange) - db.checkCount(3, 2, 1) + registerIncomingPayment(db, incoming, AccountType.exchange) + db.checkInCount(3, 2, 1) // Missing ID is ignored - ingestIncomingPayment(db, incomingMissingId, AccountType.exchange) - db.checkCount(3, 2, 1) + registerIncomingPayment(db, incomingMissingId, AccountType.exchange) + db.checkInCount(3, 2, 1) // Other ID is bounced known that we know the id - ingestIncomingPayment(db, incomingMissingId.copy(bankId = "NEW"), AccountType.exchange) - db.checkCount(4, 3, 1) + registerIncomingPayment(db, incomingMissingId.copy(bankId = "NEW"), AccountType.exchange) + db.checkInCount(4, 3, 1) } // Test creating an incoming kyc taler transaction without and ID and reconcile it later again @@ -204,39 +255,39 @@ class IncomingPaymentsTest { // Register with missing ID val incoming = genInPay(subject) val incomingMissingId = incoming.copy(bankId = null) - ingestIncomingPayment(db, incomingMissingId, AccountType.exchange) - db.checkCount(1, 0, 1) + registerIncomingPayment(db, incomingMissingId, AccountType.exchange) + db.checkInCount(1, 0, 1) assertFalse(db.inTxExists(incoming.bankId!!)) // Idempotent - ingestIncomingPayment(db, incomingMissingId, AccountType.exchange) - db.checkCount(1, 0, 1) + registerIncomingPayment(db, incomingMissingId, AccountType.exchange) + db.checkInCount(1, 0, 1) // Different metadata is accepted - ingestIncomingPayment(db, genInPay(subject, "KUDOS:9"), AccountType.exchange) - ingestIncomingPayment(db, genInPay("another $subject"), AccountType.exchange) - db.checkCount(3, 0, 3) + registerIncomingPayment(db, genInPay(subject, "KUDOS:9"), AccountType.exchange) + registerIncomingPayment(db, genInPay("another $subject"), AccountType.exchange) + db.checkInCount(3, 0, 3) // Different medata with missing id are accepted - ingestIncomingPayment(db, incomingMissingId.copy(amount = TalerAmount("KUDOS:9.5")), AccountType.exchange) - ingestIncomingPayment(db, incomingMissingId.copy(subject = "again another $subject"), AccountType.exchange) - db.checkCount(5, 0, 5) + registerIncomingPayment(db, incomingMissingId.copy(amount = TalerAmount("KUDOS:9.5")), AccountType.exchange) + registerIncomingPayment(db, incomingMissingId.copy(subject = "again another $subject"), AccountType.exchange) + db.checkInCount(5, 0, 5) // Recover bank ID when metadata match - ingestIncomingPayment(db, incoming, AccountType.exchange) + registerIncomingPayment(db, incoming, AccountType.exchange) assertTrue(db.inTxExists(incoming.bankId!!)) // Idempotent - ingestIncomingPayment(db, incoming, AccountType.exchange) - db.checkCount(5, 0, 5) + registerIncomingPayment(db, incoming, AccountType.exchange) + db.checkInCount(5, 0, 5) // Missing ID is ignored - ingestIncomingPayment(db, incomingMissingId, AccountType.exchange) - db.checkCount(5, 0, 5) + registerIncomingPayment(db, incomingMissingId, AccountType.exchange) + db.checkInCount(5, 0, 5) // Other ID is accepted - ingestIncomingPayment(db, incomingMissingId.copy(bankId = "NEW"), AccountType.exchange) - db.checkCount(6, 0, 6) + registerIncomingPayment(db, incomingMissingId.copy(bankId = "NEW"), AccountType.exchange) + db.checkInCount(6, 0, 6) } } diff --git a/nexus/src/test/kotlin/IngestionTest.kt b/nexus/src/test/kotlin/IngestionTest.kt @@ -1,361 +0,0 @@ -/* - * This file is part of LibEuFin. - * Copyright (C) 2024 Taler Systems S.A. - - * LibEuFin is free software; you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation; either version 3, or - * (at your option) any later version. - - * LibEuFin is distributed in the hope that it will be useful, but - * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY - * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General - * Public License for more details. - - * You should have received a copy of the GNU Affero General Public - * License along with LibEuFin; see the file COPYING. If not, see - * <http://www.gnu.org/licenses/> - */ - -import org.junit.Test -import tech.libeufin.common.* -import tech.libeufin.common.db.* -import tech.libeufin.nexus.* -import tech.libeufin.nexus.cli.* -import tech.libeufin.nexus.db.* -import tech.libeufin.nexus.ebics.* -import tech.libeufin.nexus.iso20022.* -import java.nio.file.Files -import java.time.Instant -import kotlin.io.path.* -import kotlin.test.* - -/** End-to-end test for XML file ingestion */ -class IngestionTest { - - /** Register batches of initiated payments for reconcciliation */ - suspend fun Database.batches(batches: Map<String, List<InitiatedPayment>>): List<PaymentBatch> { - val tmp = mutableListOf<PaymentBatch>() - for ((name, txs) in batches) { - for (tx in txs) { - initiated.create(tx) - } - this.initiated.batch(Instant.now(), name) - val (batch) = this.initiated.submittable("EUR") - this.initiated.batchSubmissionSuccess(batch.id, Instant.now(), name.replace("BATCH", "ORDER")) - tmp.add(batch) - } - return tmp - } - - /** Ingest an XML sample into the database */ - suspend fun Database.ingest( - cfg: NexusEbicsConfig, - path: String, - doc: OrderDoc - ) { - ingestFile(this, cfg, Files.newInputStream(Path(path)), doc) - } - - /** Check database content */ - suspend fun Database.check( - status: Map<String, Pair<SubmissionState, Map<String, SubmissionState>>>, - incoming: List<IncomingPayment>, - outgoing: List<OutgoingPayment> - ) { - // Check batch status - val batch_status = this.serializable( - """ - SELECT message_id, status FROM initiated_outgoing_batches ORDER BY initiated_outgoing_batch_id - """ - ) { - all { - Pair( - it.getString("message_id"), - it.getEnum<SubmissionState>("status") - ) - } - } - assertContentEquals(status.map { Pair(it.key, it.value.first) }, batch_status) - - // Check transactions status - val batch_tx = this.serializable( - """ - SELECT message_id, end_to_end_id, initiated_outgoing_transactions.status - FROM initiated_outgoing_transactions - JOIN initiated_outgoing_batches USING (initiated_outgoing_batch_id) - ORDER BY initiated_outgoing_batch_id, initiated_outgoing_transaction_id - """ - ) { - all { - Triple( - it.getString("message_id"), - it.getString("end_to_end_id"), - it.getEnum<SubmissionState>("status") - ) - }.groupBy( - keySelector = { it.first }, - valueTransform = { Pair(it.second, it.third) } - ).mapValues { it.value.toMap() } - } - assertContentEquals(status.mapValues { it.value.second }.toList(), batch_tx.toList()) - - // Check incoming transactions - val incoming_tx = this.serializable( - """ - SELECT bank_id - ,(amount).val as amount_val - ,(amount).frac AS amount_frac - ,subject - ,execution_time - ,debit_payto - FROM incoming_transactions - ORDER BY incoming_transaction_id - """ - ) { - all { - IncomingPayment( - bankId = it.getString("bank_id"), - amount = it.getAmount("amount", this@check.bankCurrency), - subject = it.getString("subject"), - executionTime = it.getLong("execution_time").asInstant(), - debtorPayto = it.getIbanPayto("debit_payto"), - ) - } - } - assertContentEquals(incoming, incoming_tx) - - // Check outgoing transactions - val outgoing_tx = this.serializable( - """ - SELECT end_to_end_id - ,(amount).val as amount_val - ,(amount).frac AS amount_frac - ,subject - ,execution_time - ,credit_payto - FROM outgoing_transactions - ORDER BY outgoing_transaction_id - """ - ) { - all { - OutgoingPayment( - endToEndId = it.getString("end_to_end_id"), - amount = it.getAmount("amount", this@check.bankCurrency), - subject = it.getString("subject"), - executionTime = it.getLong("execution_time").asInstant(), - creditorPayto = it.getIbanPayto("credit_payto"), - ) - } - } - assertContentEquals(outgoing, outgoing_tx) - } - - @Test - fun pain001() = setup { db, cfg -> - val (batch) = db.batches(mapOf( - "MESSAGE_ID" to listOf( - genInitPay( - endToEndId = "TX_FIRST", - amount = "EUR:42", - subject = "Test 42", - ), - genInitPay( - endToEndId = "TX_SECOND", - amount = "EUR:5.11", - subject = "Test 5.11", - ), - genInitPay( - endToEndId = "TX_THIRD", - amount = "EUR:0.21", - subject = "Test 0.21", - ), - ), - )) - val msg = batchToPain001Msg(cfg.ebics.account, batch).copy(timestamp = dateToInstant("2024-09-09"),) - for (dialect in Dialect.entries) { - assertContentEquals( - Path("sample/platform/${dialect}_pain001.xml").readBytes(), - createPain001(msg, dialect) - ) - } - } - - /** HAC order id test */ - @Test - fun hac() = setup { db, cfg -> - db.batches(mapOf( - "BATCH_SUCCESS" to listOf( - genInitPay("BATCH_SUCCESS_0"), - genInitPay("BATCH_SUCCESS_1"), - ), - "BATCH_FAILURE" to listOf( - genInitPay("BATCH_FAILURE_0"), - genInitPay("BATCH_FAILURE_1"), - ) - )) - - // Ingest HAC files - db.ingest(cfg.ebics, "sample/platform/hac.xml", OrderDoc.acknowledgement) - - // Check state - db.check( - status = mapOf( - "BATCH_SUCCESS" to Pair(SubmissionState.success, mapOf( - "BATCH_SUCCESS_0" to SubmissionState.pending, - "BATCH_SUCCESS_1" to SubmissionState.pending, - )), - "BATCH_FAILURE" to Pair(SubmissionState.permanent_failure, mapOf( - "BATCH_FAILURE_0" to SubmissionState.permanent_failure, - "BATCH_FAILURE_1" to SubmissionState.permanent_failure, - )) - ), - incoming = emptyList(), - outgoing = emptyList() - ) - } - - /** CreditSuisse dialect test */ - @Test - fun cs() = setup { db, cfg -> - db.batches(mapOf( - "05BD4C5B4A2649B5B08F6EF6A31F197A" to listOf( - genInitPay("AQCXNCPWD8PHW5JTN65Y5XTF7R"), - genInitPay("EE9SX76FC5YSC657EK3GMVZ9TC"), - genInitPay("V5B3MXPEWES9VQW1JDRD6VAET4"), - genInitPay("M9NGRCAC1FBX3ENX3XEDEPJ2JW"), - ), - )) - - // Ingest pain files - db.ingest(cfg.ebics, "sample/platform/pain002.xml", OrderDoc.status) - - // Check state - db.check( - status = mapOf( - "05BD4C5B4A2649B5B08F6EF6A31F197A" to Pair(SubmissionState.pending, mapOf( - "AQCXNCPWD8PHW5JTN65Y5XTF7R" to SubmissionState.permanent_failure, - "EE9SX76FC5YSC657EK3GMVZ9TC" to SubmissionState.permanent_failure, - "V5B3MXPEWES9VQW1JDRD6VAET4" to SubmissionState.permanent_failure, - "M9NGRCAC1FBX3ENX3XEDEPJ2JW" to SubmissionState.pending, - )), - ), - incoming = emptyList(), - outgoing = emptyList() - ) - } - - /** GLS dialect test */ - @Test - fun gls() = setup("gls.conf") { db, cfg -> - db.batches(mapOf( - "COMPAT_SUCCESS" to listOf( - genInitPay("COMPAT_SUCCESS") - ), - "COMPAT_FAILURE" to listOf( - genInitPay("COMPAT_FAILURE") - ), - "BATCH_SINGLE_SUCCESS" to listOf( - genInitPay("FD622SMXKT5QWSAHDY0H8NYG3G"), - ), - // JEYMR3OYZTFM7505OWWENFPAH53LNOWJHS - "BATCH_SINGLE_FAILURE" to listOf( - genInitPay("DAFC3NEE4T48WVC560T76ABA2C"), - ), - // EF525087DD2D4ABBA65C8CD3EEB6952F - "BATCH_MANY_SUCCESS" to listOf( - genInitPay("ZGRT91MSQY3QVJ93SX5MNFAC9R"), - genInitPay("T9CYNR9EJS3HR3KFVQF5VY82EW"), - genInitPay("B93XHQR6SPAB7QCDG960E71MWM"), - genInitPay("XC1YNY5HCDDAM0M7GKV0KN01S0"), - ), - "BATCH_MANY_PART" to listOf( - genInitPay("27SK3166EG36SJ7VP7VFYP0MW8"), - genInitPay("KGTDBASWTJ6JM89WXD3Q5KFQC4"), - genInitPay("8XK8Z7RAX224FGWK832FD40GYC"), - ), - // ZQOOPJC1DYBP52X119YGO6WMXU6NWIDPJK - "BATCH_MANY_FAILURE" to listOf( - genInitPay("4XTPKWE4A9V90PRQJCT8Z3MQZ8"), - genInitPay("3VZZHVYJ6XP2SNPKWF4D4YVHNG"), - ) - )) - - // Ingest camt files - db.ingest(cfg.ebics, "sample/platform/gls_camt052.xml", OrderDoc.report) - db.ingest(cfg.ebics, "sample/platform/gls_camt053.xml", OrderDoc.statement) - // TODO camt054 with missing id before and after - - // Check state - db.check( - status = mapOf( - "COMPAT_SUCCESS" to Pair(SubmissionState.success, mapOf( - "COMPAT_SUCCESS" to SubmissionState.success - )), - "COMPAT_FAILURE" to Pair(SubmissionState.pending, mapOf( - "COMPAT_FAILURE" to SubmissionState.permanent_failure - )), - "BATCH_SINGLE_SUCCESS" to Pair(SubmissionState.success, mapOf( - "FD622SMXKT5QWSAHDY0H8NYG3G" to SubmissionState.success - )), - "BATCH_SINGLE_FAILURE" to Pair(SubmissionState.pending, mapOf( // TODO success - "DAFC3NEE4T48WVC560T76ABA2C" to SubmissionState.pending, // TODO failure - )), - "BATCH_MANY_SUCCESS" to Pair(SubmissionState.pending, mapOf( // TODO success - "ZGRT91MSQY3QVJ93SX5MNFAC9R" to SubmissionState.pending, // TODO success - "T9CYNR9EJS3HR3KFVQF5VY82EW" to SubmissionState.pending, // TODO success - "B93XHQR6SPAB7QCDG960E71MWM" to SubmissionState.pending, // TODO success - "XC1YNY5HCDDAM0M7GKV0KN01S0" to SubmissionState.pending, // TODO success - )), - "BATCH_MANY_PART" to Pair(SubmissionState.success, mapOf( - "27SK3166EG36SJ7VP7VFYP0MW8" to SubmissionState.success, - "KGTDBASWTJ6JM89WXD3Q5KFQC4" to SubmissionState.permanent_failure, - "8XK8Z7RAX224FGWK832FD40GYC" to SubmissionState.permanent_failure, - )), - "BATCH_MANY_FAILURE" to Pair(SubmissionState.pending, mapOf( // TODO success - "4XTPKWE4A9V90PRQJCT8Z3MQZ8" to SubmissionState.pending, // TODO failure - "3VZZHVYJ6XP2SNPKWF4D4YVHNG" to SubmissionState.pending, // TODO failure - )) - ), - incoming = listOf( - IncomingPayment( - bankId = "BYLADEM1WOR-G2910276709458A2", - amount = TalerAmount("EUR:3"), - subject = "Taler FJDQ7W6G7NWX4H9M1MKA12090FRC9K7DA6N0FANDZZFXTR6QHX5G Test.,-", - executionTime = dateToInstant("2024-04-12"), - debtorPayto = ibanPayto("DE84500105177118117964", "John Smith") - ), - ), - outgoing = listOf( - OutgoingPayment( - endToEndId = "COMPAT_SUCCESS", - amount = TalerAmount("EUR:2"), - subject = "TestABC123", - executionTime = dateToInstant("2024-04-18"), - creditorPayto = ibanPayto("DE20500105172419259181", "John Smith") - ), - OutgoingPayment( - endToEndId = "FD622SMXKT5QWSAHDY0H8NYG3G", - amount = TalerAmount("EUR:1.1"), - subject = "single 2024-09-02T14:29:52.875253314Z", - executionTime = dateToInstant("2024-09-02"), - creditorPayto = ibanPayto("DE89500105173198527518", "Grothoff Hans") - ), - OutgoingPayment( - endToEndId = "YF5QBARGQ0MNY0VK59S477VDG4", - amount = TalerAmount("EUR:1.1"), - subject = "Simple tx", - executionTime = dateToInstant("2024-04-18"), - creditorPayto = ibanPayto("DE20500105172419259181", "John Smith") - ), - OutgoingPayment( - endToEndId = "27SK3166EG36SJ7VP7VFYP0MW8", - amount = TalerAmount("EUR:44"), - subject = "init payment", - executionTime = dateToInstant("2024-09-04"), - creditorPayto = ibanPayto("CH4189144589712575493", "Test") - ), - ) - ) - } -} -\ No newline at end of file diff --git a/nexus/src/test/kotlin/Registration.kt b/nexus/src/test/kotlin/Registration.kt @@ -0,0 +1,361 @@ +/* + * This file is part of LibEuFin. + * Copyright (C) 2024 Taler Systems S.A. + + * LibEuFin is free software; you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation; either version 3, or + * (at your option) any later version. + + * LibEuFin is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY + * or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General + * Public License for more details. + + * You should have received a copy of the GNU Affero General Public + * License along with LibEuFin; see the file COPYING. If not, see + * <http://www.gnu.org/licenses/> + */ + +import org.junit.Test +import tech.libeufin.common.* +import tech.libeufin.common.db.* +import tech.libeufin.nexus.* +import tech.libeufin.nexus.cli.* +import tech.libeufin.nexus.db.* +import tech.libeufin.nexus.ebics.* +import tech.libeufin.nexus.iso20022.* +import java.nio.file.Files +import java.time.Instant +import kotlin.io.path.* +import kotlin.test.* + +/** End-to-end test for XML file registration */ +class RegistrationTest { + + /** Register batches of initiated payments for reconcciliation */ + suspend fun Database.batches(batches: Map<String, List<InitiatedPayment>>): List<PaymentBatch> { + val tmp = mutableListOf<PaymentBatch>() + for ((name, txs) in batches) { + for (tx in txs) { + initiated.create(tx) + } + this.initiated.batch(Instant.now(), name) + val (batch) = this.initiated.submittable("EUR") + this.initiated.batchSubmissionSuccess(batch.id, Instant.now(), name.replace("BATCH", "ORDER")) + tmp.add(batch) + } + return tmp + } + + /** Register an XML sample into the database */ + suspend fun Database.register( + cfg: NexusEbicsConfig, + path: String, + doc: OrderDoc + ) { + registerFile(this, cfg, Files.newInputStream(Path(path)), doc) + } + + /** Check database content */ + suspend fun Database.check( + status: Map<String, Pair<SubmissionState, Map<String, SubmissionState>>>, + incoming: List<IncomingPayment>, + outgoing: List<OutgoingPayment> + ) { + // Check batch status + val batch_status = this.serializable( + """ + SELECT message_id, status FROM initiated_outgoing_batches ORDER BY initiated_outgoing_batch_id + """ + ) { + all { + Pair( + it.getString("message_id"), + it.getEnum<SubmissionState>("status") + ) + } + } + assertContentEquals(status.map { Pair(it.key, it.value.first) }, batch_status) + + // Check transactions status + val batch_tx = this.serializable( + """ + SELECT message_id, end_to_end_id, initiated_outgoing_transactions.status + FROM initiated_outgoing_transactions + JOIN initiated_outgoing_batches USING (initiated_outgoing_batch_id) + ORDER BY initiated_outgoing_batch_id, initiated_outgoing_transaction_id + """ + ) { + all { + Triple( + it.getString("message_id"), + it.getString("end_to_end_id"), + it.getEnum<SubmissionState>("status") + ) + }.groupBy( + keySelector = { it.first }, + valueTransform = { Pair(it.second, it.third) } + ).mapValues { it.value.toMap() } + } + assertContentEquals(status.mapValues { it.value.second }.toList(), batch_tx.toList()) + + // Check incoming transactions + val incoming_tx = this.serializable( + """ + SELECT bank_id + ,(amount).val as amount_val + ,(amount).frac AS amount_frac + ,subject + ,execution_time + ,debit_payto + FROM incoming_transactions + ORDER BY incoming_transaction_id + """ + ) { + all { + IncomingPayment( + bankId = it.getString("bank_id"), + amount = it.getAmount("amount", this@check.bankCurrency), + subject = it.getString("subject"), + executionTime = it.getLong("execution_time").asInstant(), + debtorPayto = it.getIbanPayto("debit_payto"), + ) + } + } + assertContentEquals(incoming, incoming_tx) + + // Check outgoing transactions + val outgoing_tx = this.serializable( + """ + SELECT end_to_end_id + ,(amount).val as amount_val + ,(amount).frac AS amount_frac + ,subject + ,execution_time + ,credit_payto + FROM outgoing_transactions + ORDER BY outgoing_transaction_id + """ + ) { + all { + OutgoingPayment( + endToEndId = it.getString("end_to_end_id"), + amount = it.getAmount("amount", this@check.bankCurrency), + subject = it.getString("subject"), + executionTime = it.getLong("execution_time").asInstant(), + creditorPayto = it.getIbanPayto("credit_payto"), + ) + } + } + assertContentEquals(outgoing, outgoing_tx) + } + + @Test + fun pain001() = setup { db, cfg -> + val (batch) = db.batches(mapOf( + "MESSAGE_ID" to listOf( + genInitPay( + endToEndId = "TX_FIRST", + amount = "EUR:42", + subject = "Test 42", + ), + genInitPay( + endToEndId = "TX_SECOND", + amount = "EUR:5.11", + subject = "Test 5.11", + ), + genInitPay( + endToEndId = "TX_THIRD", + amount = "EUR:0.21", + subject = "Test 0.21", + ), + ), + )) + val msg = batchToPain001Msg(cfg.ebics.account, batch).copy(timestamp = dateToInstant("2024-09-09"),) + for (dialect in Dialect.entries) { + assertContentEquals( + Path("sample/platform/${dialect}_pain001.xml").readBytes(), + createPain001(msg, dialect) + ) + } + } + + /** HAC order id test */ + @Test + fun hac() = setup { db, cfg -> + db.batches(mapOf( + "BATCH_SUCCESS" to listOf( + genInitPay("BATCH_SUCCESS_0"), + genInitPay("BATCH_SUCCESS_1"), + ), + "BATCH_FAILURE" to listOf( + genInitPay("BATCH_FAILURE_0"), + genInitPay("BATCH_FAILURE_1"), + ) + )) + + // Register HAC files + db.register(cfg.ebics, "sample/platform/hac.xml", OrderDoc.acknowledgement) + + // Check state + db.check( + status = mapOf( + "BATCH_SUCCESS" to Pair(SubmissionState.success, mapOf( + "BATCH_SUCCESS_0" to SubmissionState.pending, + "BATCH_SUCCESS_1" to SubmissionState.pending, + )), + "BATCH_FAILURE" to Pair(SubmissionState.permanent_failure, mapOf( + "BATCH_FAILURE_0" to SubmissionState.permanent_failure, + "BATCH_FAILURE_1" to SubmissionState.permanent_failure, + )) + ), + incoming = emptyList(), + outgoing = emptyList() + ) + } + + /** CreditSuisse dialect test */ + @Test + fun cs() = setup { db, cfg -> + db.batches(mapOf( + "05BD4C5B4A2649B5B08F6EF6A31F197A" to listOf( + genInitPay("AQCXNCPWD8PHW5JTN65Y5XTF7R"), + genInitPay("EE9SX76FC5YSC657EK3GMVZ9TC"), + genInitPay("V5B3MXPEWES9VQW1JDRD6VAET4"), + genInitPay("M9NGRCAC1FBX3ENX3XEDEPJ2JW"), + ), + )) + + // Register pain files + db.register(cfg.ebics, "sample/platform/pain002.xml", OrderDoc.status) + + // Check state + db.check( + status = mapOf( + "05BD4C5B4A2649B5B08F6EF6A31F197A" to Pair(SubmissionState.pending, mapOf( + "AQCXNCPWD8PHW5JTN65Y5XTF7R" to SubmissionState.permanent_failure, + "EE9SX76FC5YSC657EK3GMVZ9TC" to SubmissionState.permanent_failure, + "V5B3MXPEWES9VQW1JDRD6VAET4" to SubmissionState.permanent_failure, + "M9NGRCAC1FBX3ENX3XEDEPJ2JW" to SubmissionState.pending, + )), + ), + incoming = emptyList(), + outgoing = emptyList() + ) + } + + /** GLS dialect test */ + @Test + fun gls() = setup("gls.conf") { db, cfg -> + db.batches(mapOf( + "COMPAT_SUCCESS" to listOf( + genInitPay("COMPAT_SUCCESS") + ), + "COMPAT_FAILURE" to listOf( + genInitPay("COMPAT_FAILURE") + ), + "BATCH_SINGLE_SUCCESS" to listOf( + genInitPay("FD622SMXKT5QWSAHDY0H8NYG3G"), + ), + // JEYMR3OYZTFM7505OWWENFPAH53LNOWJHS + "BATCH_SINGLE_FAILURE" to listOf( + genInitPay("DAFC3NEE4T48WVC560T76ABA2C"), + ), + // EF525087DD2D4ABBA65C8CD3EEB6952F + "BATCH_MANY_SUCCESS" to listOf( + genInitPay("ZGRT91MSQY3QVJ93SX5MNFAC9R"), + genInitPay("T9CYNR9EJS3HR3KFVQF5VY82EW"), + genInitPay("B93XHQR6SPAB7QCDG960E71MWM"), + genInitPay("XC1YNY5HCDDAM0M7GKV0KN01S0"), + ), + "BATCH_MANY_PART" to listOf( + genInitPay("27SK3166EG36SJ7VP7VFYP0MW8"), + genInitPay("KGTDBASWTJ6JM89WXD3Q5KFQC4"), + genInitPay("8XK8Z7RAX224FGWK832FD40GYC"), + ), + // ZQOOPJC1DYBP52X119YGO6WMXU6NWIDPJK + "BATCH_MANY_FAILURE" to listOf( + genInitPay("4XTPKWE4A9V90PRQJCT8Z3MQZ8"), + genInitPay("3VZZHVYJ6XP2SNPKWF4D4YVHNG"), + ) + )) + + // Register camt files + db.register(cfg.ebics, "sample/platform/gls_camt052.xml", OrderDoc.report) + db.register(cfg.ebics, "sample/platform/gls_camt053.xml", OrderDoc.statement) + // TODO camt054 with missing id before and after + + // Check state + db.check( + status = mapOf( + "COMPAT_SUCCESS" to Pair(SubmissionState.success, mapOf( + "COMPAT_SUCCESS" to SubmissionState.success + )), + "COMPAT_FAILURE" to Pair(SubmissionState.pending, mapOf( + "COMPAT_FAILURE" to SubmissionState.permanent_failure + )), + "BATCH_SINGLE_SUCCESS" to Pair(SubmissionState.success, mapOf( + "FD622SMXKT5QWSAHDY0H8NYG3G" to SubmissionState.success + )), + "BATCH_SINGLE_FAILURE" to Pair(SubmissionState.pending, mapOf( // TODO success + "DAFC3NEE4T48WVC560T76ABA2C" to SubmissionState.pending, // TODO failure + )), + "BATCH_MANY_SUCCESS" to Pair(SubmissionState.pending, mapOf( // TODO success + "ZGRT91MSQY3QVJ93SX5MNFAC9R" to SubmissionState.pending, // TODO success + "T9CYNR9EJS3HR3KFVQF5VY82EW" to SubmissionState.pending, // TODO success + "B93XHQR6SPAB7QCDG960E71MWM" to SubmissionState.pending, // TODO success + "XC1YNY5HCDDAM0M7GKV0KN01S0" to SubmissionState.pending, // TODO success + )), + "BATCH_MANY_PART" to Pair(SubmissionState.success, mapOf( + "27SK3166EG36SJ7VP7VFYP0MW8" to SubmissionState.success, + "KGTDBASWTJ6JM89WXD3Q5KFQC4" to SubmissionState.permanent_failure, + "8XK8Z7RAX224FGWK832FD40GYC" to SubmissionState.permanent_failure, + )), + "BATCH_MANY_FAILURE" to Pair(SubmissionState.pending, mapOf( // TODO success + "4XTPKWE4A9V90PRQJCT8Z3MQZ8" to SubmissionState.pending, // TODO failure + "3VZZHVYJ6XP2SNPKWF4D4YVHNG" to SubmissionState.pending, // TODO failure + )) + ), + incoming = listOf( + IncomingPayment( + bankId = "BYLADEM1WOR-G2910276709458A2", + amount = TalerAmount("EUR:3"), + subject = "Taler FJDQ7W6G7NWX4H9M1MKA12090FRC9K7DA6N0FANDZZFXTR6QHX5G Test.,-", + executionTime = dateToInstant("2024-04-12"), + debtorPayto = ibanPayto("DE84500105177118117964", "John Smith") + ), + ), + outgoing = listOf( + OutgoingPayment( + endToEndId = "COMPAT_SUCCESS", + amount = TalerAmount("EUR:2"), + subject = "TestABC123", + executionTime = dateToInstant("2024-04-18"), + creditorPayto = ibanPayto("DE20500105172419259181", "John Smith") + ), + OutgoingPayment( + endToEndId = "FD622SMXKT5QWSAHDY0H8NYG3G", + amount = TalerAmount("EUR:1.1"), + subject = "single 2024-09-02T14:29:52.875253314Z", + executionTime = dateToInstant("2024-09-02"), + creditorPayto = ibanPayto("DE89500105173198527518", "Grothoff Hans") + ), + OutgoingPayment( + endToEndId = "YF5QBARGQ0MNY0VK59S477VDG4", + amount = TalerAmount("EUR:1.1"), + subject = "Simple tx", + executionTime = dateToInstant("2024-04-18"), + creditorPayto = ibanPayto("DE20500105172419259181", "John Smith") + ), + OutgoingPayment( + endToEndId = "27SK3166EG36SJ7VP7VFYP0MW8", + amount = TalerAmount("EUR:44"), + subject = "init payment", + executionTime = dateToInstant("2024-09-04"), + creditorPayto = ibanPayto("CH4189144589712575493", "Test") + ), + ) + ) + } +} +\ No newline at end of file diff --git a/nexus/src/test/kotlin/RevenueApiTest.kt b/nexus/src/test/kotlin/RevenueApiTest.kt @@ -47,7 +47,7 @@ class RevenueApiTest { }, { // Common credit transactions - ingestIn(db) + registerIn(db) } ), ignored = listOf( diff --git a/nexus/src/test/kotlin/WireGatewayApiTest.kt b/nexus/src/test/kotlin/WireGatewayApiTest.kt @@ -22,7 +22,7 @@ import io.ktor.http.* import io.ktor.server.testing.* import org.junit.Test import tech.libeufin.common.* -import tech.libeufin.nexus.cli.ingestOutgoingPayment +import tech.libeufin.nexus.cli.registerOutgoingPayment import tech.libeufin.nexus.ebics.randEbicsId import java.time.Instant import kotlin.test.* @@ -205,7 +205,7 @@ class WireGatewayApiTest { ), ignored = listOf( // Ignore malformed incoming transaction - { ingestIn(db) }, + { registerIn(db) }, // Ignore outgoing transaction { talerableOut(db) }, @@ -232,10 +232,10 @@ class WireGatewayApiTest { { talerableIn(db) }, // Ignore malformed incoming transaction - { ingestIn(db) }, + { registerIn(db) }, // Ignore malformed outgoing transaction - { ingestOutgoingPayment(db, genOutPay("ignored")) }, + { registerOutgoingPayment(db, genOutPay("ignored")) }, ) ) } diff --git a/nexus/src/test/kotlin/bench.kt b/nexus/src/test/kotlin/bench.kt @@ -84,28 +84,28 @@ class Bench { // Warm HTTP client client.getA("/taler-revenue/config").assertOk() - // Ingest - measureAction("ingest_in") { - ingestIn(db) + // Register + measureAction("register_in") { + registerIn(db) } - measureAction("ingest_out") { - ingestOut(db) + measureAction("register_out") { + registerOut(db) } - measureAction("ingest_reserve") { + measureAction("register_reserve") { talerableIn(db) } - measureAction("ingest_kyc") { + measureAction("register_kyc") { talerableKycIn(db) } - measureAction("ingest_reserve_missing_id") { + measureAction("register_reserve_missing_id") { val incoming = genInPay("test with ${ShortHashCode.rand()} reserve pub") - ingestIncomingPayment(db, incoming.copy(bankId = null), AccountType.exchange) - ingestIncomingPayment(db, incoming, AccountType.exchange) + registerIncomingPayment(db, incoming.copy(bankId = null), AccountType.exchange) + registerIncomingPayment(db, incoming, AccountType.exchange) } - measureAction("ingest_kyc_missing_id") { + measureAction("register_kyc_missing_id") { val incoming = genInPay("test with KYC:${ShortHashCode.rand()} account pub") - ingestIncomingPayment(db, incoming.copy(bankId = null), AccountType.exchange) - ingestIncomingPayment(db, incoming, AccountType.exchange) + registerIncomingPayment(db, incoming.copy(bankId = null), AccountType.exchange) + registerIncomingPayment(db, incoming, AccountType.exchange) } // Revenue API diff --git a/nexus/src/test/kotlin/helpers.kt b/nexus/src/test/kotlin/helpers.kt @@ -28,8 +28,8 @@ import tech.libeufin.common.* import tech.libeufin.common.db.dbInit import tech.libeufin.common.db.pgDataSource import tech.libeufin.nexus.* -import tech.libeufin.nexus.cli.ingestIncomingPayment -import tech.libeufin.nexus.cli.ingestOutgoingPayment +import tech.libeufin.nexus.cli.registerIncomingPayment +import tech.libeufin.nexus.cli.registerOutgoingPayment import tech.libeufin.nexus.db.Database import tech.libeufin.nexus.db.InitiatedPayment import tech.libeufin.nexus.ebics.randEbicsId @@ -155,16 +155,16 @@ suspend fun ApplicationTestBuilder.addKyc(amount: String) { }.assertOk() } -/** Ingest a talerable outgoing transaction */ +/** Register a talerable outgoing transaction */ suspend fun talerableOut(db: Database) { val wtid = ShortHashCode.rand() - ingestOutgoingPayment(db, genOutPay("$wtid http://exchange.example.com/")) + registerOutgoingPayment(db, genOutPay("$wtid http://exchange.example.com/")) } -/** Ingest a talerable reserve incoming transaction */ +/** Register a talerable reserve incoming transaction */ suspend fun talerableIn(db: Database, nullId: Boolean = false, amount: String = "CHF:44") { val reserve_pub = ShortHashCode.rand() - ingestIncomingPayment(db, genInPay("test with $reserve_pub reserve pub", amount).run { + registerIncomingPayment(db, genInPay("test with $reserve_pub reserve pub", amount).run { if (nullId) { copy(bankId = null) } else { @@ -173,10 +173,10 @@ suspend fun talerableIn(db: Database, nullId: Boolean = false, amount: String = }, AccountType.exchange) } -/** Ingest a talerable KYC incoming transaction */ +/** Register a talerable KYC incoming transaction */ suspend fun talerableKycIn(db: Database, nullId: Boolean = false, amount: String = "CHF:44") { val account_pub = ShortHashCode.rand() - ingestIncomingPayment(db, genInPay("test with KYC:$account_pub account pub", amount).run { + registerIncomingPayment(db, genInPay("test with KYC:$account_pub account pub", amount).run { if (nullId) { copy(bankId = null) } else { @@ -185,14 +185,14 @@ suspend fun talerableKycIn(db: Database, nullId: Boolean = false, amount: String }, AccountType.exchange) } -/** Ingest an incoming transaction */ -suspend fun ingestIn(db: Database) { - ingestIncomingPayment(db, genInPay("ignored"), AccountType.normal) +/** Register an incoming transaction */ +suspend fun registerIn(db: Database) { + registerIncomingPayment(db, genInPay("ignored"), AccountType.normal) } -/** Ingest an outgoing transaction */ -suspend fun ingestOut(db: Database) { - ingestOutgoingPayment(db, genOutPay("ignored")) +/** Register an outgoing transaction */ +suspend fun registerOut(db: Database) { + registerOutgoingPayment(db, genOutPay("ignored")) } /* ----- Auth ----- */ diff --git a/testbench/src/test/kotlin/IntegrationTest.kt b/testbench/src/test/kotlin/IntegrationTest.kt @@ -37,7 +37,7 @@ import tech.libeufin.common.api.engine import tech.libeufin.common.db.one import tech.libeufin.nexus.AccountType import tech.libeufin.nexus.cli.LibeufinNexus -import tech.libeufin.nexus.cli.ingestIncomingPayment +import tech.libeufin.nexus.cli.registerIncomingPayment import tech.libeufin.nexus.iso20022.* import tech.libeufin.nexus.nexusConfig import tech.libeufin.nexus.withDb @@ -169,11 +169,11 @@ class IntegrationTest { ) assertException("ERROR: cashin failed: missing exchange account") { - ingestIncomingPayment(db, reservePayment, AccountType.exchange) + registerIncomingPayment(db, reservePayment, AccountType.exchange) } // But KYC works - ingestIncomingPayment( + registerIncomingPayment( db, reservePayment.copy( bankId = "kyc", @@ -186,7 +186,7 @@ class IntegrationTest { bankCmd.run("create-account $flags -u exchange -p password --name 'Mr Money' --exchange") assertException("ERROR: cashin currency conversion failed: missing conversion rates") { - ingestIncomingPayment(db, reservePayment, AccountType.exchange) + registerIncomingPayment(db, reservePayment, AccountType.exchange) } // Start server @@ -220,7 +220,7 @@ class IntegrationTest { // Too small amount db.checkCount(1, 0, 1) - ingestIncomingPayment(db, reservePayment.copy( + registerIncomingPayment(db, reservePayment.copy( amount = TalerAmount("EUR:0.01"), ), AccountType.exchange) db.checkCount(2, 1, 1) @@ -233,15 +233,15 @@ class IntegrationTest { subject = "Success $reservePub", bankId = "success" ) - ingestIncomingPayment(db, validPayment, AccountType.exchange) + registerIncomingPayment(db, validPayment, AccountType.exchange) db.checkCount(3, 1, 2) client.get("http://0.0.0.0:8080/accounts/exchange/transactions") { basicAuth("exchange", "password") }.assertOkJson<BankAccountTransactionsResponse>() // Check idempotency - ingestIncomingPayment(db, validPayment, AccountType.exchange) - ingestIncomingPayment(db, validPayment.copy( + registerIncomingPayment(db, validPayment, AccountType.exchange) + registerIncomingPayment(db, validPayment.copy( subject="Success 2 $reservePub" ), AccountType.exchange) db.checkCount(3, 1, 2)