taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

commit 9605a01f4ab9c2222011bd7b946f94795cebc0d5
parent 6d73ae9d438c1c2d92c32aadfb4aea32d942d737
Author: Antoine A <>
Date:   Thu,  6 Nov 2025 18:03:02 +0100

magnet-bank: register outgoing transactions failures

Diffstat:
MCargo.lock | 4++--
Mtaler-magnet-bank/db/magnet-bank-procedures.sql | 32+++++++++++++++++++++++++++++++-
Mtaler-magnet-bank/src/db.rs | 155+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------
Mtaler-magnet-bank/src/dev.rs | 6+++---
Mtaler-magnet-bank/src/magnet_api/types.rs | 4++--
Mtaler-magnet-bank/src/worker.rs | 128++++++++++++++++++++++++++++++++++++++++++++++++++-----------------------------
6 files changed, 265 insertions(+), 64 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -2154,9 +2154,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "syn" -version = "2.0.108" +version = "2.0.109" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "da58917d35242480a05c2897064da0a80589a2a0476c9a3f2fdc83b53502e917" +checksum = "2f17c7e013e88258aa9543dcbe81aca68a667a9ac37cd69c9fbc07858bfe0e2f" dependencies = [ "proc-macro2", "quote", diff --git a/taler-magnet-bank/db/magnet-bank-procedures.sql b/taler-magnet-bank/db/magnet-bank-procedures.sql @@ -192,11 +192,41 @@ IF out_new THEN FROM bounced JOIN tx_in USING (tx_in_id) WHERE initiated.initiated_id = bounced.initiated_id AND tx_in.magnet_code = in_bounced; END IF; +END IF; +END $$; +COMMENT ON FUNCTION register_tx_out IS 'Register an outgoing transaction idempotently'; +CREATE FUNCTION register_tx_out_failure( + IN in_code INT8, + IN in_bounced INT8, + IN in_now INT8, + -- Success return + OUT out_initiated_id INT8, + OUT out_new BOOLEAN +) +LANGUAGE plpgsql AS $$ +DECLARE +current_status transfer_status; +BEGIN +-- Found existing initiated transaction or bounced transaction +SELECT status, initiated_id +INTO current_status, out_initiated_id +FROM initiated +LEFT JOIN bounced USING (initiated_id) +LEFT JOIN tx_in USING (tx_in_id) +WHERE initiated.magnet_code = in_code OR tx_in.magnet_code = in_bounced; +-- Update status if new +out_new = FOUND AND current_status != 'permanent_failure'; +IF out_new THEN + UPDATE initiated + SET + status = 'permanent_failure', + status_msg = NULL + WHERE initiated_id = out_initiated_id; END IF; END $$; -COMMENT ON FUNCTION register_tx_out IS 'Register an outgoing transaction idempotently'; +COMMENT ON FUNCTION register_tx_out_failure IS 'Register an outgoing transaction failure idempotently'; CREATE FUNCTION taler_transfer( IN in_request_uid BYTEA, diff --git a/taler-magnet-bank/src/db.rs b/taler-magnet-bank/src/db.rs @@ -151,12 +151,6 @@ pub struct TxInAdmin { } #[derive(Debug, PartialEq, Eq)] -pub struct AddOutgoingResult { - pub new: bool, - pub row_id: u64, -} - -#[derive(Debug, PartialEq, Eq)] pub enum AddIncomingResult { Success { new: bool, @@ -242,6 +236,12 @@ pub enum TxOutKind { Talerable(OutgoingSubject), } +#[derive(Debug, PartialEq, Eq)] +pub struct AddOutgoingResult { + pub new: bool, + pub row_id: u64, +} + pub async fn register_tx_out( db: &mut PgConnection, tx: &TxOut, @@ -287,6 +287,37 @@ pub async fn register_tx_out( } #[derive(Debug, PartialEq, Eq)] +pub struct OutFailureResult { + pub initiated_id: Option<u64>, + pub new: bool, +} + +pub async fn register_tx_out_failure( + db: &mut PgConnection, + code: u64, + bounced: Option<u32>, + now: &Timestamp, +) -> sqlx::Result<OutFailureResult> { + sqlx::query( + " + SELECT out_new, out_initiated_id + FROM register_tx_out_failure($1, $2, $3) + ", + ) + .bind(code as i64) + .bind(bounced.map(|i| i as i32)) + .bind_timestamp(now) + .try_map(|r: PgRow| { + Ok(OutFailureResult { + new: r.try_get(0)?, + initiated_id: r.try_get::<Option<i64>, _>(1)?.map(|i| i as u64), + }) + }) + .fetch_one(db) + .await +} + +#[derive(Debug, PartialEq, Eq)] pub enum TransferResult { Success { id: u64, initiated_at: Timestamp }, RequestUidReuse, @@ -741,9 +772,9 @@ mod test { CONFIG_SOURCE, constant::CURRENCY, db::{ - self, AddIncomingResult, AddOutgoingResult, BounceResult, Initiated, TransferResult, - TxIn, TxOut, TxOutKind, kv_get, kv_set, make_transfer, register_bounce_tx_in, - register_tx_in, register_tx_in_admin, register_tx_out, + self, AddIncomingResult, AddOutgoingResult, BounceResult, Initiated, OutFailureResult, + TransferResult, TxIn, TxOut, TxOutKind, kv_get, kv_set, make_transfer, + register_bounce_tx_in, register_tx_in, register_tx_in_admin, register_tx_out, }, magnet_api::types::TxStatus, magnet_payto, @@ -1097,6 +1128,110 @@ mod test { } #[tokio::test] + async fn tx_out_failure() { + let (mut db, _) = setup().await; + + let now = Timestamp::now_stable(); + + // Unknown + assert_eq!( + db::register_tx_out_failure(&mut db, 42, None, &now) + .await + .unwrap(), + OutFailureResult { + initiated_id: None, + new: false + } + ); + assert_eq!( + db::register_tx_out_failure(&mut db, 42, Some(12), &now) + .await + .unwrap(), + OutFailureResult { + initiated_id: None, + new: false + } + ); + + // Initiated + let req = TransferRequest { + request_uid: HashCode::rand(), + amount: amount("HUF:10"), + exchange_base_url: url("https://exchange.test.com/"), + wtid: ShortHashCode::rand(), + credit_account: payto("payto://iban/HU02162000031000164800000000?receiver-name=name"), + }; + let payto = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"); + assert_eq!( + make_transfer(&mut db, &req, &payto, &now).await.unwrap(), + TransferResult::Success { + id: 1, + initiated_at: now + } + ); + db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 34) + .await + .expect("status success"); + assert_eq!( + db::register_tx_out_failure(&mut db, 34, None, &now) + .await + .unwrap(), + OutFailureResult { + initiated_id: Some(1), + new: true + } + ); + assert_eq!( + db::register_tx_out_failure(&mut db, 34, None, &now) + .await + .unwrap(), + OutFailureResult { + initiated_id: Some(1), + new: false + } + ); + + // Recovered bounce + let tx = TxIn { + code: 12, + amount: amount("HUF:11"), + subject: "malformed transaction".to_owned(), + debtor: payto, + value_date: Zoned::now().date(), + status: TxStatus::Completed, + }; + assert_eq!( + db::register_bounce_tx_in(&mut db, &tx, &tx.amount, "no reason", &now) + .await + .unwrap(), + BounceResult { + tx_id: 1, + tx_new: true, + bounce_id: 2, + bounce_new: true + } + ); + assert_eq!( + db::register_tx_out_failure(&mut db, 10, Some(12), &now) + .await + .unwrap(), + OutFailureResult { + initiated_id: Some(2), + new: true + } + ); + assert_eq!( + db::register_tx_out_failure(&mut db, 10, Some(12), &now) + .await + .unwrap(), + OutFailureResult { + initiated_id: Some(2), + new: false + } + ); + } + + #[tokio::test] async fn transfer() { let (mut db, _) = setup().await; @@ -1111,7 +1246,7 @@ mod test { let req = TransferRequest { request_uid: HashCode::rand(), - amount: amount("EUR:10"), + amount: amount("HUF:10"), exchange_base_url: url("https://exchange.test.com/"), wtid: ShortHashCode::rand(), credit_account: payto("payto://iban/HU02162000031000164800000000?receiver-name=name"), diff --git a/taler-magnet-bank/src/dev.rs b/taler-magnet-bank/src/dev.rs @@ -24,7 +24,7 @@ use taler_common::{ use tracing::info; use crate::{ - HuPayto, TransferHuPayto, + HuIban, HuPayto, TransferHuPayto, config::WorkerCfg, magnet_api::{client::AuthClient, types::Direction}, setup, @@ -45,7 +45,7 @@ pub enum DevCmd { /// Print account info Accounts, Tx { - account: HuPayto, + account: HuIban, #[clap(long, short, value_enum, default_value_t = DirArg::Both)] direction: DirArg, }, @@ -92,7 +92,7 @@ pub async fn dev(cfg: &Config, cmd: DevCmd) -> anyhow::Result<()> { for item in page.list { let tx = extract_tx_info(item.tx); match tx { - Tx::In(tx_in) => info!(target: "dev", "in {tx_in}"), + Tx::In(tx_in) => info!(target: "dev", "in {tx_in}"), Tx::Out(tx_out) => info!(target: "dev", "out {tx_out}"), } } diff --git a/taler-magnet-bank/src/magnet_api/types.rs b/taler-magnet-bank/src/magnet_api/types.rs @@ -218,7 +218,7 @@ pub struct Tx { } #[derive(Debug, Deserialize)] -pub struct Transaction { +pub struct TxDto { #[serde(rename = "kod")] pub code: u64, #[serde(rename = "bankszamla")] @@ -267,5 +267,5 @@ pub struct TransactionPage { #[derive(Debug, Deserialize)] pub struct TransactionWrapper { #[serde(rename = "tranzakcioDto")] - pub tx: Transaction, + pub tx: TxDto, } diff --git a/taler-magnet-bank/src/worker.rs b/taler-magnet-bank/src/worker.rs @@ -35,7 +35,7 @@ use crate::{ magnet_api::{ api::{ApiErr, ErrKind}, client::ApiClient, - types::{Direction, Next, Transaction, TxStatus}, + types::{Direction, Next, TxDto, TxStatus}, }, }; @@ -68,7 +68,7 @@ impl Worker<'_> { /// Run a single worker pass pub async fn run(&mut self) -> WorkerResult { // Sync transactions - let mut next: Option<Next> = None; // kv_get(&mut *self.db, TXS_CURSOR_KEY).await?; TODO cursor logic is broken and cannot be stored & reused + let mut next: Option<Next> = None; //kv_get(&mut *self.db, TXS_CURSOR_KEY).await?; TODO cursor logic is broken and cannot be stored & reused let mut all_final = true; let mut first = true; loop { @@ -106,9 +106,9 @@ impl Worker<'_> { { AddIncomingResult::Success { new, .. } => { if new { - info!(target: "worker", "in {tx_in} skip bounce: {reason}"); + info!(target: "worker", "in {tx_in} skip bounce: {reason}"); } else { - trace!(target: "worker", "in {tx_in} already skil bounce "); + trace!(target: "worker", "in {tx_in} already skil bounce "); } } AddIncomingResult::ReservePubReuse => unreachable!(), @@ -125,12 +125,12 @@ impl Worker<'_> { if res.tx_new { info!(target: "worker", - "in {tx_in} bounced in {}: {reason}", + "in {tx_in} bounced in {}: {reason}", res.bounce_id ); } else { trace!(target: "worker", - "in {tx_in} already seen and bounced in {}: {reason}", + "in {tx_in} already seen and bounced in {}: {reason}", res.bounce_id ); } @@ -151,9 +151,9 @@ impl Worker<'_> { { AddIncomingResult::Success { new, .. } => { if new { - info!(target: "worker", "in {tx_in}"); + info!(target: "worker", "in {tx_in}"); } else { - trace!(target: "worker", "in {tx_in} already seen"); + trace!(target: "worker", "in {tx_in} already seen"); } } AddIncomingResult::ReservePubReuse => { @@ -169,9 +169,9 @@ impl Worker<'_> { { AddIncomingResult::Success { new, .. } => { if new { - info!(target: "worker", "in {tx_in}"); + info!(target: "worker", "in {tx_in}"); } else { - trace!(target: "worker", "in {tx_in} already seen"); + trace!(target: "worker", "in {tx_in} already seen"); } } AddIncomingResult::ReservePubReuse => unreachable!(), @@ -185,11 +185,6 @@ impl Worker<'_> { self.recover_tx(&tx_out).await?; continue; } - TxStatus::Rejected | TxStatus::Canceled => { - warn!(target: "worker", "out failed {tx_out}"); - continue; - } - TxStatus::Completed => {} TxStatus::PendingFirstSignature | TxStatus::PendingSecondSignature | TxStatus::PendingProcessing @@ -200,37 +195,77 @@ impl Worker<'_> { debug!(target: "worker", "pending out {tx_out}"); continue; } + TxStatus::Rejected | TxStatus::Canceled | TxStatus::Completed => {} } match self.account_type { AccountType::Exchange => { - // TODO log status (known | recovered | founded) - if let Ok(subject) = subject::parse_outgoing(&tx_out.subject) { + let kind = if let Ok(subject) = + subject::parse_outgoing(&tx_out.subject) + { + TxOutKind::Talerable(subject) + } else if let Ok(bounced) = parse_bounce_outgoing(&tx_out.subject) { + TxOutKind::Bounce(bounced) + } else { + TxOutKind::Simple + }; + if tx_out.status == TxStatus::Completed { let res = db::register_tx_out( self.db, &tx_out, - &TxOutKind::Talerable(subject), + &kind, &Timestamp::now(), ) .await?; + // TODO log status (known | recovered | founded) if res.new { - info!(target: "worker", "out {tx_out}"); + match kind { + TxOutKind::Simple => { + warn!(target: "worker", "out (malformed) {tx_out}") + } + TxOutKind::Bounce(_) => { + info!(target: "worker", "out (bounce) {tx_out}") + } + TxOutKind::Talerable(_) => { + info!(target: "worker", "out {tx_out}") + } + } } else { - trace!(target: "worker", "out {tx_out} already seen"); + match kind { + TxOutKind::Simple => { + trace!(target: "worker", "out (malformed) {tx_out} already see") + } + TxOutKind::Bounce(_) => { + trace!(target: "worker", "out (bounce) {tx_out} already see") + } + TxOutKind::Talerable(_) => { + trace!(target: "worker", "out {tx_out} already see") + } + } } - } else if let Ok(bounced) = parse_bounce_outgoing(&tx_out.subject) { - let res = db::register_tx_out( + } else { + let bounced = match kind { + TxOutKind::Simple => None, + TxOutKind::Bounce(bounced) => Some(bounced), + TxOutKind::Talerable(_) => None, + }; + let res = db::register_tx_out_failure( self.db, - &tx_out, - &TxOutKind::Bounce(bounced), + tx_out.code, + bounced, &Timestamp::now(), ) .await?; - if res.new { - info!(target: "worker", "out (bounce) {tx_out}"); - } else { - trace!(target: "worker", "out (bounce) {tx_out} already seen"); + if let Some(id) = res.initiated_id { + if res.new { + error!(target: "worker", "initiated tx {id}: {:?}", tx_out.status); + } else { + trace!(target: "worker", "initiated tx {id} already seen {:?}", tx_out.status); + } } - } else { + } + } + AccountType::Normal => { + if tx_out.status == TxStatus::Completed { let res = db::register_tx_out( self.db, &tx_out, @@ -239,24 +274,25 @@ impl Worker<'_> { ) .await?; if res.new { - warn!(target: "worker", "out (malformed) {tx_out}"); + info!(target: "worker", "out {tx_out}"); } else { - trace!(target: "worker", "out (malformed) {tx_out} already seen"); + trace!(target: "worker", "out {tx_out} already seen"); } - } - } - AccountType::Normal => { - let res = db::register_tx_out( - self.db, - &tx_out, - &TxOutKind::Simple, - &Timestamp::now(), - ) - .await?; - if res.new { - info!(target: "worker", "out {tx_out}"); } else { - trace!(target: "worker", "out {tx_out} already seen"); + let res = db::register_tx_out_failure( + self.db, + tx_out.code, + None, + &Timestamp::now(), + ) + .await?; + if let Some(id) = res.initiated_id { + if res.new { + error!(target: "worker", "initiated tx {id}: {:?}", tx_out.status); + } else { + trace!(target: "worker", "initiated tx {id} already seen {:?}", tx_out.status); + } + } } } } @@ -268,7 +304,7 @@ impl Worker<'_> { // Update in db cursor only if all previous transactions where final if all_final { // debug!(target: "worker", "advance cursor {next:?}"); - // kv_set(&mut *self.db, TXS_CURSOR_KEY, next).await?; TODO cursor is broken + // kv_set(&mut *self.db, TXS_CURSOR_KEY, &next).await?; TODO cursor is broken } } else { break; @@ -423,7 +459,7 @@ pub enum Tx { Out(TxOut), } -pub fn extract_tx_info(tx: Transaction) -> Tx { +pub fn extract_tx_info(tx: TxDto) -> Tx { // TODO amount from f64 without allocations let amount = amount::amount(format!("{}:{}", tx.currency, tx.amount.abs())); // TODO we should support non hungarian account and error handling