taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

commit 392dde21040646a83433ddd99c22fae3c2e13d11
parent 21a3305026bcd4a57e10266aa4b2310f1e50236a
Author: Antoine A <>
Date:   Wed, 29 Oct 2025 17:32:35 +0100

magnet-bank: more worker testing and fixing

Diffstat:
MCargo.lock | 10++++++++++
MCargo.toml | 5+++++
Mcommon/taler-api/Cargo.toml | 2+-
Mcommon/taler-api/benches/subject.rs | 4+++-
Mcommon/taler-api/src/subject.rs | 11+++++++----
Mcommon/taler-common/Cargo.toml | 2++
Mcommon/taler-common/src/api_common.rs | 6++++++
Mcommon/taler-common/src/log.rs | 4+++-
Mtaler-magnet-bank/Cargo.toml | 3++-
Mtaler-magnet-bank/db/magnet-bank-procedures.sql | 18+++++++++++++++---
Mtaler-magnet-bank/src/bin/magnet-bank-harness.rs | 272++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------
Mtaler-magnet-bank/src/db.rs | 26++++++++++++++++++++++++++
Mtaler-magnet-bank/src/lib.rs | 47+++++++++++++++++++++++++++++++++++++++++++++--
Mtaler-magnet-bank/src/worker.rs | 27++++++++++++++++-----------
14 files changed, 343 insertions(+), 94 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -563,6 +563,7 @@ checksum = "70e796c081cee67dc755e1a36a0a172b897fab85fc3f6bc48307991f64e4eca9" dependencies = [ "curve25519-dalek", "ed25519", + "rand_core 0.6.4", "sha2", "subtle", ] @@ -1346,6 +1347,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e" [[package]] +name = "owo-colors" +version = "4.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c6901729fa79e91a0913333229e9ca5dc725089d1c363b2f4b4760709dc4a52" + +[[package]] name = "p256" version = "0.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2214,10 +2221,12 @@ dependencies = [ "anyhow", "clap", "criterion", + "ed25519-dalek", "fastrand", "glob", "indexmap", "jiff", + "rand_core 0.6.4", "serde", "serde_json", "serde_path_to_error", @@ -2242,6 +2251,7 @@ dependencies = [ "form_urlencoded", "hmac", "jiff", + "owo-colors", "p256", "passterm", "percent-encoding", diff --git a/Cargo.toml b/Cargo.toml @@ -48,3 +48,8 @@ anyhow = "1" http-body-util = "0.1.2" libdeflater = "1.22.0" base64 = "0.22" +owo-colors = "4.2.3" +ed25519-dalek = { version = "2.1.1", default-features = false, features = [ + "rand_core", +] } +rand_core = { version = "0.6.4" } diff --git a/common/taler-api/Cargo.toml b/common/taler-api/Cargo.toml @@ -13,7 +13,7 @@ dashmap = "6.1" base64.workspace = true http-body-util.workspace = true libdeflater.workspace = true -ed25519-dalek = { version = "2.1.1", default-features = false } +ed25519-dalek.workspace = true tokio = { workspace = true, features = ["signal"] } serde.workspace = true tracing.workspace = true diff --git a/common/taler-api/benches/subject.rs b/common/taler-api/benches/subject.rs @@ -14,7 +14,9 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use criterion::{Criterion, black_box, criterion_group, criterion_main}; +use std::hint::black_box; + +use criterion::{Criterion, criterion_group, criterion_main}; use taler_api::subject::parse_incoming_unstructured; fn parser(c: &mut Criterion) { diff --git a/common/taler-api/src/subject.rs b/common/taler-api/src/subject.rs @@ -149,7 +149,7 @@ pub fn parse_incoming_unstructured( return None; } } - _ => return None, + _ => unreachable!(), }; // Check key validity @@ -187,11 +187,14 @@ pub fn parse_incoming_unstructured( // For each part as a starting point for (i, &start) in parts.iter().enumerate() { // Use progressively longer concatenation - for &end in &parts[i..] { + for &end in parts[i..].iter().skip(1) { + let len = (end - start) as usize; // Until they are to long to be a key - if (end - start) as usize > KYC_SIZE { + if len > KYC_SIZE { break; - }; + } else if len != KEY_SIZE && len != KYC_SIZE { + continue; + } // Parse the concatenated parts // SAFETY: we now end.end <= concatenated.len diff --git a/common/taler-common/Cargo.toml b/common/taler-common/Cargo.toml @@ -24,6 +24,8 @@ tracing.workspace = true clap.workspace = true anyhow.workspace = true tracing-subscriber.workspace = true +ed25519-dalek.workspace = true +rand_core.workspace = true tokio = { workspace = true, features = ["rt-multi-thread"] } sqlx = { workspace = true, features = ["macros"] } diff --git a/common/taler-common/src/api_common.rs b/common/taler-common/src/api_common.rs @@ -16,6 +16,7 @@ use std::{fmt::Display, ops::Deref}; +use rand_core::OsRng; use serde::{Deserialize, Deserializer, Serialize, de::Error}; use serde_json::value::RawValue; @@ -118,3 +119,8 @@ pub type HashCode = Base32<64>; /// 32-bytes hash code pub type ShortHashCode = Base32<32>; pub type WadId = Base32<24>; + +pub fn rand_edsa_pub_key() -> EddsaPublicKey { + let signing_key = ed25519_dalek::SigningKey::generate(&mut OsRng); + Base32::from(signing_key.verifying_key().to_bytes()) +} diff --git a/common/taler-common/src/log.rs b/common/taler-common/src/log.rs @@ -68,7 +68,9 @@ pub fn taler_logger(max_level: Option<Level>) -> impl SubscriberInitExt { .with_ansi(std::io::stderr().is_terminal()), ) .with(tracing_subscriber::filter::filter_fn(move |metadata| { + let target = metadata.target(); max_level == Level::TRACE - || (*metadata.level() <= max_level && !metadata.target().starts_with("sqlx")) + || (*metadata.level() <= max_level + && !(target.starts_with("sqlx") || target.starts_with("hyper_util"))) })) } diff --git a/taler-magnet-bank/Cargo.toml b/taler-magnet-bank/Cargo.toml @@ -9,7 +9,6 @@ repository.workspace = true license-file.workspace = true [dependencies] -rand_core = { version = "0.6.4" } reqwest = { version = "0.12", default-features = false, features = [ "json", "rustls-tls", @@ -36,6 +35,8 @@ tracing.workspace = true tokio.workspace = true anyhow.workspace = true base64.workspace = true +rand_core.workspace = true +owo-colors.workspace = true [dev-dependencies] taler-test-utils.workspace = true diff --git a/taler-magnet-bank/db/magnet-bank-procedures.sql b/taler-magnet-bank/db/magnet-bank-procedures.sql @@ -160,6 +160,14 @@ IF out_new THEN -- Notify new outgoing transaction registration PERFORM pg_notify('tx_out', out_tx_row_id || ''); + -- Update initiated status + UPDATE initiated + SET + tx_out_id = out_tx_row_id, + status = 'success', + status_msg = NULL + WHERE magnet_code = in_code; + IF in_wtid IS NOT NULL THEN -- Insert new outgoing talerable transaction INSERT INTO taler_out ( @@ -170,9 +178,11 @@ IF out_new THEN out_tx_row_id, in_wtid, in_origin_exchange_url - ); - -- Notify new outgoing talerable transaction registration - PERFORM pg_notify('taler_out', out_tx_row_id || ''); + ) ON CONFLICT (wtid) DO NOTHING; + IF FOUND THEN + -- Notify new outgoing talerable transaction registration + PERFORM pg_notify('taler_out', out_tx_row_id || ''); + END IF; ELSIF in_bounced IS NOT NULL THEN UPDATE initiated SET @@ -182,6 +192,8 @@ IF out_new THEN FROM bounced JOIN tx_in USING (tx_in_id) WHERE initiated.initiated_id = bounced.initiated_id AND tx_in.magnet_code = in_bounced; END IF; + + END IF; END $$; COMMENT ON FUNCTION register_tx_out IS 'Register an outgoing transaction idempotently'; diff --git a/taler-magnet-bank/src/bin/magnet-bank-harness.rs b/taler-magnet-bank/src/bin/magnet-bank-harness.rs @@ -18,19 +18,27 @@ use std::time::Duration; use clap::Parser as _; use jiff::{Timestamp, Zoned}; +use owo_colors::OwoColorize; use p256::ecdsa::SigningKey; +use sqlx::PgPool; use taler_common::{ CommonArgs, + api_common::{EddsaPublicKey, HashCode, ShortHashCode, rand_edsa_pub_key}, + api_params::{History, Page}, + api_wire::{IncomingBankTransaction, TransferRequest, TransferState}, cli::long_version, db::{dbinit, pool}, taler_main, + types::{self, amount::amount, url}, }; use taler_magnet_bank::{ - CONFIG_SOURCE, + CONFIG_SOURCE, FullHuPayto, config::{HarnessCfg, parse_db_cfg}, + db::{self, TransferResult}, + failure_injection::{FailureLogic, InjectedErr, set_failure_logic}, magnet::{Account, ApiClient, AuthClient}, setup, - worker::Worker, + worker::{Worker, WorkerError}, }; /// Taler Magnet Bank Adapter harness test suite @@ -46,6 +54,7 @@ struct Args { } struct HarnessClient<'a> { + pool: &'a PgPool, api: &'a ApiClient<'a>, exchange: &'a Account, client: &'a Account, @@ -53,30 +62,80 @@ struct HarnessClient<'a> { } impl HarnessClient<'_> { - async fn expect_balance(&self, exchange: u32, client: u32) -> anyhow::Result<()> { - let mut attemps = 0; - loop { - let current = self.balance().await?; - if current == (exchange, client) { - return Ok(()); - } - if attemps > 20 { - assert_eq!(current, (exchange, client)); - } - attemps += 1; - tokio::time::sleep(Duration::from_secs(1)).await; - } - } - async fn balance(&self) -> anyhow::Result<(u32, u32)> { - let exchange_balance = self.api.balance_mini(self.exchange.iban.bban()).await?; - let client_balance = self.api.balance_mini(self.client.iban.bban()).await?; - return Ok(( + let (exchange_balance, client_balance) = tokio::try_join!( + self.api.balance_mini(self.exchange.iban.bban()), + self.api.balance_mini(self.client.iban.bban()) + )?; + Ok(( exchange_balance.balance as u32, client_balance.balance as u32, + )) + } + + async fn custom_transfer(&self, forint: u32, creditor: FullHuPayto) -> anyhow::Result<u64> { + let res = db::make_transfer( + self.pool, + &TransferRequest { + request_uid: HashCode::rand(), + amount: amount(format!("HUF:{forint}")), + exchange_base_url: url("https://test.com"), + wtid: ShortHashCode::rand(), + credit_account: creditor.as_payto(), + }, + &creditor, + &types::timestamp::Timestamp::now(), + ) + .await?; + match res { + TransferResult::Success { id, .. } => Ok(id), + TransferResult::RequestUidReuse | TransferResult::WtidReuse => unreachable!(), + } + } + + async fn transfer(&self, forint: u32) -> anyhow::Result<u64> { + self.custom_transfer( + forint, + FullHuPayto::new(self.client.iban.clone(), "Name".to_owned()), + ) + .await + } + + async fn assert_transfer_status( + &self, + id: u64, + status: TransferState, + msg: Option<&str>, + ) -> anyhow::Result<()> { + let transfer = db::transfer_by_id(self.pool, id).await?.unwrap(); + assert_eq!( + (transfer.status, transfer.status_msg.as_deref()), + (status, msg) + ); + Ok(()) + } + + async fn assert_incoming(&self, key: EddsaPublicKey) -> anyhow::Result<()> { + let transfer = db::incoming_history( + self.pool, + &History { + page: Page { + limit: -1, + offset: None, + }, + timeout_ms: None, + }, + || tokio::sync::watch::channel(0).1, + ) + .await?; + assert!(matches!( + transfer.first().unwrap(), + IncomingBankTransaction::Reserve { reserve_pub, .. } if *reserve_pub == key, )); + Ok(()) } + /// Send a transaction between two magnet accounts async fn send_tx( &self, from: &Account, @@ -109,15 +168,62 @@ impl HarnessClient<'_> { Ok(()) } + /// Send transaction from client to exchange async fn client_send(&self, subject: &str, amount: u32) -> anyhow::Result<()> { self.send_tx(self.client, self.exchange, subject, amount) .await } + + /// Send transaction from exchange to client + async fn exchange_send(&self, subject: &str, amount: u32) -> anyhow::Result<()> { + self.send_tx(self.exchange, self.client, subject, amount) + .await + } +} + +struct Balances<'a> { + client: &'a HarnessClient<'a>, + exchange_balance: u32, + client_balance: u32, +} + +impl<'a> Balances<'a> { + pub async fn new(client: &'a HarnessClient<'a>) -> anyhow::Result<Self> { + let (exchange_balance, client_balance) = client.balance().await?; + Ok(Self { + client, + exchange_balance, + client_balance, + }) + } + + async fn expect(&mut self, diff: i32) -> anyhow::Result<()> { + self.exchange_balance = (self.exchange_balance as i32 + diff) as u32; + self.client_balance = (self.client_balance as i32 - diff) as u32; + let mut attemps = 0; + loop { + let current = self.client.balance().await?; + if current == (self.exchange_balance, self.client_balance) { + return Ok(()); + } + if attemps > 20 { + assert_eq!(current, (self.exchange_balance, self.client_balance)); + } + attemps += 1; + tokio::time::sleep(Duration::from_secs(1)).await; + } + } +} + +fn step(step: &str) { + println!("{}", step.green()); } fn main() { let args = Args::parse(); taler_main(CONFIG_SOURCE, args.common, |cfg| async move { + step("Init"); + // Prepare db let db_cfg = parse_db_cfg(&cfg)?; let pool = pool(db_cfg.cfg, "magnet_bank").await?; @@ -143,83 +249,109 @@ fn main() { ignore_bounces_before: cfg.worker.ignore_bounces_before, }; let harness = HarnessClient { + pool: &pool, api: &client, exchange: &exchange_account, client: &client_account, signing_key: &keys.signing_key, }; - // Fill existing info + // Initial sync worker.run().await?; + tokio::time::sleep(Duration::from_secs(10)).await; let now = Timestamp::now(); + let balance = &mut Balances::new(&harness).await?; - // Load current balance - let (mut exchange_balance, mut client_balance) = harness.balance().await?; - // Send malformed transaction - let amount = 34; + step("Test incoming talerable transaction"); + // Send talerable transaction + let reserve_pub = rand_edsa_pub_key(); harness - .client_send(&format!("Malformed test {now}"), amount) + .client_send(&format!("Taler {reserve_pub}"), 33) .await?; // Wait for transaction to finalize + balance.expect(33).await?; + // Sync and register + worker.run().await?; + harness.assert_incoming(reserve_pub).await?; + + step("Test incoming malformed transaction"); + // Send malformed transaction harness - .expect_balance(exchange_balance + amount, client_balance - amount) + .client_send(&format!("Malformed test {now}"), 34) .await?; + // Wait for transaction to finalize + balance.expect(34).await?; // Sync and bounce worker.run().await?; // Wait for bounce to finalize + balance.expect(-34).await?; + worker.run().await?; + + step("Test outgoing transactions to self"); + let transfer_id = harness + .custom_transfer( + 101, + FullHuPayto::new(exchange_account.iban.clone(), "Self".to_string()), + ) + .await?; + worker.run().await?; harness - .expect_balance(exchange_balance, client_balance) + .assert_transfer_status( + transfer_id, + TransferState::permanent_failure, + Some("409 FORRAS_SZAMLA_ESZAMLA_EGYEZIK 'A forrás és az ellenszámla egyezik!'"), + ) .await?; + balance.expect(0).await?; - /* - harness.client_send("subject", 4).await?; - exchange_balance += 4; - client_balance -= 4; + step("Test unexpected outgoing"); harness - .expect_balance(exchange_balance, client_balance) - .await?;*/ - - /*send_tx( - &client, - &keys.signing_key, - &client_account, - &exchange_account, - "Test tx", - 33, - ) - .await?; - - let exchange_balance = client.balance_mini(exchange_account.iban.bban()).await?; - let client_balance = client.balance_mini(client_account.iban.bban()).await?; - dbg!(exchange_balance, client_balance);*/ + .exchange_send(&format!("What is this ? {now}"), 4) + .await?; + worker.run().await?; + balance.expect(-4).await?; - // Println WTF - /* - let wtid = EddsaPublicKey::rand(); - let now = Zoned::now(); - let info = client - .init_tx( - client_account.code, - 123.0, - &format!("Taler test {wtid}"), - &now.date(), - "Name", - exchange_account.iban.bban(), + step("Test transfer transactions to self"); + let transfer_id = harness + .custom_transfer( + 102, + FullHuPayto::new(client_account.iban.clone(), "Client".to_string()), ) .await?; worker.run().await?; - client - .sign_tx( - &keys.signing_key, - &client_account.number, - info.code, - info.amount, - &now.date(), - exchange_account.iban.bban(), - ) + harness + .assert_transfer_status(transfer_id, TransferState::pending, None) .await?; - worker.run().await?;*/ + balance.expect(-102).await?; + worker.run().await?; + + harness + .assert_transfer_status(transfer_id, TransferState::success, None) + .await?; + + step("Test transfer failure create-tx"); + harness.transfer(10).await?; + set_failure_logic(FailureLogic::History(vec!["create-tx"])); + assert!(matches!( + worker.run().await, + Err(WorkerError::Injected(InjectedErr("create-tx"))) + )); + balance.expect(0).await?; + worker.run().await?; + balance.expect(-10).await?; + + step("Test transfer failure sign-tx"); + harness.transfer(11).await?; + set_failure_logic(FailureLogic::History(vec!["sign-tx"])); + assert!(matches!( + worker.run().await, + Err(WorkerError::Injected(InjectedErr("sign-tx"))) + )); + balance.expect(0).await?; + worker.run().await?; + // TODO both transactions came through which is VERY WRONG waiting on Magnet Bank on the matter + balance.expect(-22).await?; Ok(()) }); } diff --git a/taler-magnet-bank/src/db.rs b/taler-magnet-bank/src/db.rs @@ -581,6 +581,7 @@ pub async fn transfer_by_id<'a>( .await } +/** Get a batch of pending initiated transactions not attempted since [start] */ pub async fn pending_batch<'a>( db: impl PgExecutor<'a>, start: &Timestamp, @@ -608,6 +609,31 @@ pub async fn pending_batch<'a>( .await } +/** Get an initiated transaction matching the given magnet [code] */ +pub async fn initiated_by_code<'a>( + db: impl PgExecutor<'a>, + code: u64, +) -> sqlx::Result<Option<Initiated>> { + sqlx::query( + " + SELECT initiated_id, (amount).val, (amount).frac, subject, credit_account, credit_name + FROM initiated + WHERE magnet_code IS $1 + ", + ) + .bind(code as i64) + .try_map(|r: PgRow| { + Ok(Initiated { + id: r.try_get_u64(0)?, + amount: r.try_get_amount_i(1, &CURRENCY)?, + subject: r.try_get(3)?, + creditor: FullHuPayto::new(r.try_get_parse(4)?, r.try_get(5)?), + }) + }) + .fetch_optional(db) + .await +} + /** Update status of a successful submitted initiated transaction */ pub async fn initiated_submit_success<'a>( db: impl PgExecutor<'a>, diff --git a/taler-magnet-bank/src/lib.rs b/taler-magnet-bank/src/lib.rs @@ -33,8 +33,51 @@ pub mod magnet; pub mod setup; pub mod worker; pub mod failure_injection { - pub fn fail_point(_name: &'static str) { - // TODO inject failures for error handling tests + use std::sync::Mutex; + + #[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)] + #[error("injected failure at {0}")] + pub struct InjectedErr(pub &'static str); + + pub enum FailureLogic { + None, + Always(&'static str), + History(Vec<&'static str>), + } + + impl FailureLogic { + /// Check whether this step should fail + pub fn check(&mut self, name: &'static str) -> bool { + match self { + FailureLogic::None => false, + FailureLogic::Always(step) => *step == name, + FailureLogic::History(items) => { + if let Some(step) = items.first() + && *step == name + { + items.remove(0); + true + } else { + false + } + } + } + } + } + + static FAILURE_STATE: Mutex<FailureLogic> = Mutex::new(FailureLogic::None); + + pub fn set_failure_logic(logic: FailureLogic) { + let mut lock = FAILURE_STATE.lock().unwrap(); + *lock = logic; + } + + pub fn fail_point(step: &'static str) -> Result<(), InjectedErr> { + if FAILURE_STATE.lock().unwrap().check(step) { + Err(InjectedErr(step)) + } else { + Ok(()) + } } } diff --git a/taler-magnet-bank/src/worker.rs b/taler-magnet-bank/src/worker.rs @@ -31,7 +31,7 @@ use crate::{ FullHuPayto, HuIban, config::AccountType, db::{self, AddIncomingResult, Initiated, TxIn, TxOut, TxOutKind}, - failure_injection::fail_point, + failure_injection::{InjectedErr, fail_point}, magnet::{ ApiClient, Direction, Transaction, error::{ApiError, MagnetError}, @@ -44,6 +44,8 @@ pub enum WorkerError { Db(#[from] sqlx::Error), #[error(transparent)] Api(#[from] ApiError), + #[error(transparent)] + Injected(#[from] InjectedErr), } type WorkerResult = Result<(), WorkerError>; @@ -93,7 +95,7 @@ impl Worker<'_> { if new { info!(target: "worker", "in {tx_in} skip bounce: {reason}"); } else { - debug!(target: "worker", "in {tx_in} already skil bounce "); + trace!(target: "worker", "in {tx_in} already skil bounce "); } } AddIncomingResult::ReservePubReuse => unreachable!(), @@ -114,7 +116,7 @@ impl Worker<'_> { res.bounce_id ); } else { - debug!(target: "worker", + trace!(target: "worker", "in {tx_in} already seen and bounced in {}: {reason}", res.bounce_id ); @@ -138,7 +140,7 @@ impl Worker<'_> { if new { info!(target: "worker", "in {tx_in}"); } else { - debug!(target: "worker", "in {tx_in} already seen"); + trace!(target: "worker", "in {tx_in} already seen"); } } AddIncomingResult::ReservePubReuse => { @@ -156,7 +158,7 @@ impl Worker<'_> { if new { info!(target: "worker", "in {tx_in}"); } else { - debug!(target: "worker", "in {tx_in} already seen"); + trace!(target: "worker", "in {tx_in} already seen"); } } AddIncomingResult::ReservePubReuse => unreachable!(), @@ -179,7 +181,7 @@ impl Worker<'_> { if res.new { info!(target: "worker", "out {tx_out}"); } else { - debug!(target: "worker", "out {tx_out} already seen"); + trace!(target: "worker", "out {tx_out} already seen"); } } else if let Ok(bounced) = parse_bounce_outgoing(&tx_out.subject) { let res = db::register_tx_out( @@ -192,7 +194,7 @@ impl Worker<'_> { if res.new { info!(target: "worker", "out (bounce) {tx_out}"); } else { - debug!(target: "worker", "out (bounce) {tx_out} already seen"); + trace!(target: "worker", "out (bounce) {tx_out} already seen"); } } else { let res = db::register_tx_out( @@ -205,7 +207,7 @@ impl Worker<'_> { if res.new { warn!(target: "worker", "out (malformed) {tx_out}"); } else { - debug!(target: "worker", "out (malformed) {tx_out} already seen"); + trace!(target: "worker", "out (malformed) {tx_out} already seen"); } } } @@ -220,7 +222,7 @@ impl Worker<'_> { if res.new { info!(target: "worker", "out {tx_out}"); } else { - debug!(target: "worker", "out {tx_out} already seen"); + trace!(target: "worker", "out {tx_out} already seen"); } } } @@ -235,6 +237,8 @@ impl Worker<'_> { } } + // Recover pending transaction + // Send transactions let start = Timestamp::now(); let now = Zoned::now(); @@ -283,7 +287,8 @@ impl Worker<'_> { tx.creditor.bban(), ) .await; - fail_point("submit-create-tx"); + debug!("{res:?}"); + fail_point("create-tx")?; let info = match res { // Check if succeeded Ok(info) => { @@ -337,7 +342,7 @@ impl Worker<'_> { creditor: &str, ) -> WorkerResult { debug!(target: "worker", "sign tx {tx_code}"); - fail_point("sign-tx"); + fail_point("sign-tx")?; // Sign initiated transaction, on failure we will retry self.client .sign_tx(