taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

commit 70339a8b15cc3b103fb93048a8e9e99891280836
parent 19db9e56b96f9a6b9ce85c5ad246080e9b775adf
Author: Antoine A <>
Date:   Tue, 16 Dec 2025 13:53:04 +0100

cyclos: handle payment failure

Diffstat:
Ataler-cyclos/README.md | 26++++++++++++++++++++++++++
Mtaler-cyclos/src/bin/cyclos-harness.rs | 44++++++++++++++++++++++++++++++++------------
Mtaler-cyclos/src/cyclos_api/types.rs | 4++--
Mtaler-cyclos/src/worker.rs | 69++++++++++++++++++++++++++++++++++++++++++++++++---------------------
4 files changed, 108 insertions(+), 35 deletions(-)

diff --git a/taler-cyclos/README.md b/taler-cyclos/README.md @@ -0,0 +1,25 @@ + + +podman-compose up + +# Create test deploment + +ANything will do + +# Create and configure network + + + +- allow debt +- give money at creation + +- enable channel for all users + +# Create two users + +wire f20n4X3qV44dNoZUmpeU +client 1EkY5JJMrkwyvv9yK7x4 + +Enable chargeback Product details +Type +Member +\ No newline at end of file diff --git a/taler-cyclos/src/bin/cyclos-harness.rs b/taler-cyclos/src/bin/cyclos-harness.rs @@ -14,7 +14,7 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use std::str::FromStr as _; +use std::{str::FromStr as _, time::Duration}; use clap::Parser as _; use jiff::Timestamp; @@ -25,8 +25,7 @@ use taler_common::{ CommonArgs, api_common::{EddsaPublicKey, HashCode, ShortHashCode, rand_edsa_pub_key}, api_params::{History, Page}, - api_wire::{IncomingBankTransaction, TransferRequest}, - cli, + api_wire::{IncomingBankTransaction, TransferRequest, TransferState}, config::Config, db::{dbinit, pool}, taler_main, @@ -164,6 +163,27 @@ impl<'a> Harness<'a> { TransferResult::RequestUidReuse | TransferResult::WtidReuse => unreachable!(), } } + + async fn expect_transfer_status(&self, id: u64, status: TransferState, msg: Option<&str>) { + let mut attempts = 0; + loop { + let transfer = db::transfer_by_id(self.pool, id, &self.currency) + .await + .unwrap() + .unwrap(); + if (transfer.status, transfer.status_msg.as_deref()) == (status, msg) { + return; + } + if attempts > 40 { + assert_eq!( + (transfer.status, transfer.status_msg.as_deref()), + (status, msg) + ); + } + attempts += 1; + tokio::time::sleep(Duration::from_millis(200)).await; + } + } } struct Balances<'a> { @@ -209,7 +229,7 @@ impl<'a> Balances<'a> { /// Run logic tests against real Cyclos backend async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> { - step("Run Magnet Bank logic harness tests"); + step("Run Cyclos logic harness tests"); step("Prepare db"); let db_cfg = parse_db_cfg(cfg)?; @@ -252,7 +272,7 @@ async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> { let now = Timestamp::now(); let balance = &mut Balances::new(&harness).await; - /*step("Test incoming talerable transaction"); + step("Test incoming talerable transaction"); // Send talerable transaction let reserve_pub = rand_edsa_pub_key(); let amount = decimal("3.3"); @@ -273,28 +293,28 @@ async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> { balance.expect_add(amount).await; // Sync and bounce harness.worker().await?; - balance.expect_sub(amount).await;*/ + balance.expect_sub(amount).await; step("Test transfer to self"); // Init a transfer to self let transfer_id = harness .custom_transfer( decimal("10.1"), - &FullCyclosPayto::new(CyclosId(42), "Self".to_string()), + &FullCyclosPayto::new(CyclosId(harness.wire_id), "Self".to_string()), ) .await; // Should failed harness.worker().await?; // Check transfer failed - /*harness + harness .expect_transfer_status( transfer_id, TransferState::permanent_failure, - Some("409 FORRAS_SZAMLA_ESZAMLA_EGYEZIK 'A forrás és az ellenszámla egyezik!'"), + Some("permissionDenied - The operation was denied because a required permission was not granted"), ) - .await;*/ + .await; - /*step("Test unexpected outgoing"); + step("Test unexpected outgoing"); // Manual tx from the exchange let amount = decimal("4"); harness @@ -303,7 +323,7 @@ async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> { harness.worker().await?; // Wait for transaction to finalize balance.expect_sub(amount).await; - harness.worker().await?;*/ + harness.worker().await?; step("Finish"); Ok(()) diff --git a/taler-cyclos/src/cyclos_api/types.rs b/taler-cyclos/src/cyclos_api/types.rs @@ -353,8 +353,8 @@ pub enum UnavailableError { #[error("{entity_type} {key}")] /// Error codes for 404 Not Found pub struct NotFoundError { - entity_type: String, - key: String, + pub entity_type: String, + pub key: String, } #[derive(Debug, serde::Deserialize, thiserror::Error)] diff --git a/taler-cyclos/src/worker.rs b/taler-cyclos/src/worker.rs @@ -18,15 +18,15 @@ use jiff::Timestamp; use sqlx::PgConnection; use taler_api::subject::{self, parse_incoming_unstructured}; use taler_common::types::amount::{self, Currency}; -use tracing::{debug, info, trace, warn}; +use tracing::{debug, error, info, trace, warn}; use crate::{ FullCyclosPayto, config::AccountType, cyclos_api::{ - api::ApiErr, + api::{ApiErr, ErrKind}, client::Client, - types::{AccountKind, HistoryItem}, + types::{AccountKind, HistoryItem, NotFoundError}, }, db::{self, AddIncomingResult, RegisterResult, TxIn, TxOut, TxOutKind}, }; @@ -77,17 +77,44 @@ impl Worker<'_> { if batch.is_empty() { break; } - for tx in batch { - debug!(target: "worker", "send tx {tx}"); + for initiated in batch { + debug!(target: "worker", "send tx {initiated}"); let res = self .client - .direct_payment(tx.creditor.0, tx.amount, &tx.subject) + .direct_payment(initiated.creditor.0, initiated.amount, &initiated.subject) .await; + // TODO fail_point("init-tx")?; match res { - Ok(_) => todo!(), - Err(_) => todo!(), + Ok(tx) => { + dbg!(tx.date); + // Update transaction status, on failure the initiated transaction will be orphan + db::initiated_submit_success( + &mut *self.db, + initiated.id, + &tx.date, + tx.id.0, + ) + .await?; + } + Err(e) => { + let msg = match e.kind { + ErrKind::Unknown(NotFoundError { entity_type, key }) => { + format!("Unknown {entity_type} {key}") + } + ErrKind::Forbidden(err) => err.to_string(), + _ => return Err(e.into()), + }; + // TODO is permission should be considered are hard or soft failure ? + db::initiated_submit_permanent_failure( + &mut *self.db, + initiated.id, + &Timestamp::now(), + &msg, + ) + .await?; + error!(target: "worker", "initiated failure {initiated}: {msg}"); + } } - // TODO store success } } Ok(()) @@ -113,20 +140,20 @@ impl Worker<'_> { .await?; if res.tx_new { info!(target: "worker", - "in {tx} bounced (recovered) in {}: {reason}", chargeback.id + "in {tx} bounced (recovered) in {}: {reason}", chargeback.id ); } else { trace!(target: "worker", - "in {tx} already seen and bounced in {}: {reason}",chargeback.id + "in {tx} already seen and bounced in {}: {reason}",chargeback.id ); } } else if !transfer.can_chargeback { match db::register_tx_in(db, &tx, &None, &Timestamp::now()).await? { AddIncomingResult::Success { new, .. } => { if new { - info!(target: "worker", "in {tx} cannot bounce: {reason}"); + warn!(target: "worker", "in {tx} cannot bounce: {reason}"); } else { - trace!(target: "worker", "in {tx} already seen and cannot bounce "); + trace!(target: "worker", "in {tx} already seen and cannot bounce "); } } AddIncomingResult::ReservePubReuse => unreachable!(), @@ -142,9 +169,9 @@ impl Worker<'_> { ) .await?; if res.tx_new { - info!(target: "worker", "in {tx} bounced in {chargeback_id}: {reason}"); + info!(target: "worker", "in {tx} bounced in {chargeback_id}: {reason}"); } else { - trace!(target: "worker", "in {tx} already seen and bounced in {chargeback_id}: {reason}"); + trace!(target: "worker", "in {tx} already seen and bounced in {chargeback_id}: {reason}"); } } Ok(()) @@ -161,16 +188,15 @@ impl Worker<'_> { { AddIncomingResult::Success { new, .. } => { if new { - info!(target: "worker", "in {tx}"); + info!(target: "worker", "in {tx}"); } else { - trace!(target: "worker", "in {tx} already seen"); + trace!(target: "worker", "in {tx} already seen"); } } AddIncomingResult::ReservePubReuse => { bounce(self.db, "reserve pub reuse").await? } } - info!(target: "worker", "in {tx}"); } Err(e) => bounce(self.db, &e.to_string()).await?, } @@ -179,9 +205,9 @@ impl Worker<'_> { match db::register_tx_in(self.db, &tx, &None, &Timestamp::now()).await? { AddIncomingResult::Success { new, .. } => { if new { - info!(target: "worker", "in {tx}"); + info!(target: "worker", "in {tx}"); } else { - trace!(target: "worker", "in {tx} already seen"); + trace!(target: "worker", "in {tx} already seen"); } } AddIncomingResult::ReservePubReuse => unreachable!(), @@ -238,7 +264,8 @@ impl Worker<'_> { warn!(target: "worker", "out malformed (recovered) {tx}") } TxOutKind::Bounce(_) => { - warn!(target: "worker", "out bounce (recovered) {tx}") + // Chargeback are not stored as initiated and are therefor always recovered + info!(target: "worker", "out bounce {tx}") } TxOutKind::Talerable(_) => { warn!(target: "worker", "out (recovered) {tx}")