libeufin

Integration and sandbox testing for FinTech APIs and data formats
Log | Files | Refs | Submodules | README | LICENSE

commit d729f18bb4c082cf355977857368d31af3f5579b
parent a488eecd3cbb88a2bb2167c5e67e1090bf405f59
Author: Antoine A <>
Date:   Thu, 19 Sep 2024 15:51:38 +0200

common: fix notification channel name clash and improve error fmt

Diffstat:
Mbank/src/main/kotlin/tech/libeufin/bank/db/Database.kt | 6+++---
Mcommon/src/main/kotlin/db/notifications.kt | 6+++++-
Mdatabase-versioning/libeufin-bank-procedures.sql | 10+++++-----
Mdatabase-versioning/libeufin-nexus-procedures.sql | 8++++----
Mnexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt | 6+++---
5 files changed, 20 insertions(+), 16 deletions(-)

diff --git a/bank/src/main/kotlin/tech/libeufin/bank/db/Database.kt b/bank/src/main/kotlin/tech/libeufin/bank/db/Database.kt @@ -64,19 +64,19 @@ class Database(dbConfig: DatabaseConfig, internal val bankCurrency: String, inte flow.emit(creditRow) } }, - "outgoing_tx" to { + "bank_outgoing_tx" to { val (account, merchant, debitRow, creditRow) = it.split(' ', limit = 4).map { it.toLong() } outgoingTxFlows[account]?.run { flow.emit(debitRow) } }, - "incoming_tx" to { + "bank_incoming_tx" to { val (account, row) = it.split(' ', limit = 2).map { it.toLong() } incomingTxFlows[account]?.run { flow.emit(row) } }, - "withdrawal_status" to { + "bank_withdrawal_status" to { val raw = it.split(' ', limit = 2) val uuid = UUID.fromString(raw[0]) val status = WithdrawalStatus.valueOf(raw[1]) diff --git a/common/src/main/kotlin/db/notifications.kt b/common/src/main/kotlin/db/notifications.kt @@ -60,7 +60,11 @@ fun watchNotifications( conn.getNotifications(0) // Block until we receive at least one notification .forEach { // Dispatch - listeners[it.name]!!(it.parameter) + try { + listeners[it.name]!!(it.parameter) + } catch (e: Exception) { + throw Exception("channel ${it.name} with input '${it.parameter}'", e) + } } } } catch (e: Exception) { diff --git a/database-versioning/libeufin-bank-procedures.sql b/database-versioning/libeufin-bank-procedures.sql @@ -567,7 +567,7 @@ INTO local_amount.val, local_amount.frac, local_bank_account_id FROM bank_account_transactions WHERE bank_transaction_id=in_debit_row_id; CALL stats_register_payment('taler_out', NULL, local_amount, null); -- notify new transaction -PERFORM pg_notify('outgoing_tx', in_debtor_account_id || ' ' || in_creditor_account_id || ' ' || in_debit_row_id || ' ' || in_credit_row_id); +PERFORM pg_notify('bank_outgoing_tx', in_debtor_account_id || ' ' || in_creditor_account_id || ' ' || in_debit_row_id || ' ' || in_credit_row_id); END $$; COMMENT ON PROCEDURE register_outgoing IS 'Register a bank transaction as a taler outgoing transaction and announce it'; @@ -605,7 +605,7 @@ IF in_type = 'reserve' THEN CALL stats_register_payment('taler_in', NULL, local_amount, null); END IF; -- Notify new incoming transaction -PERFORM pg_notify('incoming_tx', local_bank_account_id || ' ' || in_tx_row_id); +PERFORM pg_notify('bank_incoming_tx', local_bank_account_id || ' ' || in_tx_row_id); END $$; COMMENT ON PROCEDURE register_incoming IS 'Register a bank transaction as a taler incoming transaction and announce it'; @@ -1026,7 +1026,7 @@ IF not_selected THEN WHERE withdrawal_uuid=in_withdrawal_uuid; -- Notify status change - PERFORM pg_notify('withdrawal_status', in_withdrawal_uuid::text || ' selected'); + PERFORM pg_notify('bank_withdrawal_status', in_withdrawal_uuid::text || ' selected'); END IF; END $$; COMMENT ON FUNCTION select_taler_withdrawal IS 'Set details of a withdrawal operation'; @@ -1049,7 +1049,7 @@ IF NOT FOUND OR out_already_confirmed THEN END IF; -- Notify status change -PERFORM pg_notify('withdrawal_status', in_withdrawal_uuid::text || ' aborted'); +PERFORM pg_notify('bank_withdrawal_status', in_withdrawal_uuid::text || ' aborted'); END $$; COMMENT ON FUNCTION abort_taler_withdrawal IS 'Abort a withdrawal operation.'; @@ -1146,7 +1146,7 @@ UPDATE taler_withdrawal_operations CALL register_incoming(tx_row_id, 'reserve'::taler_incoming_type, reserve_pub_local, NULL); -- Notify status change -PERFORM pg_notify('withdrawal_status', in_withdrawal_uuid::text || ' confirmed'); +PERFORM pg_notify('bank_withdrawal_status', in_withdrawal_uuid::text || ' confirmed'); END $$; COMMENT ON FUNCTION confirm_taler_withdrawal IS 'Set a withdrawal operation as confirmed and wire the funds to the exchange.'; diff --git a/database-versioning/libeufin-nexus-procedures.sql b/database-versioning/libeufin-nexus-procedures.sql @@ -91,7 +91,7 @@ IF in_wtid IS NOT NULL OR in_exchange_url IS NOT NULL THEN ) VALUES (out_tx_id, in_wtid, in_exchange_url) ON CONFLICT (wtid) DO NOTHING; IF FOUND THEN - PERFORM pg_notify('outgoing_tx', out_tx_id::text); + PERFORM pg_notify('nexus_outgoing_tx', out_tx_id::text); END IF; END IF; END $$; @@ -131,7 +131,7 @@ ELSE ,in_debit_payto_uri ,in_bank_id ) RETURNING incoming_transaction_id INTO out_tx_id; - PERFORM pg_notify('revenue_tx', out_tx_id::text); + PERFORM pg_notify('nexus_revenue_tx', out_tx_id::text); END IF; END $$; COMMENT ON FUNCTION register_incoming @@ -301,7 +301,7 @@ IF NOT EXISTS(SELECT FROM talerable_incoming_transactions WHERE incoming_transac ,in_reserve_pub ,in_account_pub ); - PERFORM pg_notify('incoming_tx', out_tx_id::text); + PERFORM pg_notify('nexus_incoming_tx', out_tx_id::text); END IF; END $$; COMMENT ON FUNCTION register_incoming_and_talerable IS ' @@ -367,5 +367,5 @@ INSERT INTO transfer_operations( ,in_exchange_base_url ); out_timestamp = in_timestamp; -PERFORM pg_notify('outgoing_tx', out_tx_row_id::text); +PERFORM pg_notify('nexus_outgoing_tx', out_tx_row_id::text); END $$; diff --git a/nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt b/nexus/src/main/kotlin/tech/libeufin/nexus/db/Database.kt @@ -62,15 +62,15 @@ class Database(dbConfig: DatabaseConfig, val bankCurrency: String): DbPool(dbCon init { watchNotifications(pgSource, "libeufin_nexus", LoggerFactory.getLogger("libeufin-nexus-db-watcher"), mapOf( - "revenue_tx" to { + "nexus_revenue_tx" to { val id = it.toLong() revenueTxFlows.emit(id) }, - "outgoing_tx" to { + "nexus_outgoing_tx" to { val id = it.toLong() outgoingTxFlows.emit(id) }, - "incoming_tx" to { + "nexus_incoming_tx" to { val id = it.toLong() incomingTxFlows.emit(id) }