commit 13d35fb71aad9b9b02c555629ab84902852b09b8
parent 2ee83d5e05d211489505bdc46154842d2914a7c2
Author: Antoine A <>
Date: Sat, 20 Dec 2025 17:01:58 +0100
cyclos: failure injection
Diffstat:
12 files changed, 135 insertions(+), 69 deletions(-)
diff --git a/Cargo.toml b/Cargo.toml
@@ -5,6 +5,7 @@ members = [
"common/taler-common",
"common/taler-build",
"common/taler-test-utils",
+ "common/failure-injection",
"taler-magnet-bank",
"taler-cyclos",
]
@@ -48,6 +49,7 @@ taler-common = { path = "common/taler-common" }
taler-api = { path = "common/taler-api" }
taler-test-utils = { path = "common/taler-test-utils" }
taler-build = { path = "common/taler-build" }
+failure-injection = { path = "common/failure-injection" }
anyhow = "1"
http-body-util = "0.1.2"
libdeflater = "1.22.0"
diff --git a/Makefile b/Makefile
@@ -47,4 +47,11 @@ deb:
.PHONY: ci
ci:
- contrib/ci/run-all-jobs.sh
-\ No newline at end of file
+ contrib/ci/run-all-jobs.sh
+
+.PHONY: coverage-cyclos
+coverage-cyclos:
+ cargo llvm-cov clean --workspace
+ cargo llvm-cov test --no-clean
+ cargo llvm-cov run --bin cyclos-harness --no-clean -- -c dev.conf logic
+ cargo llvm-cov report --lcov --output-path ./target/lcov.info
+\ No newline at end of file
diff --git a/common/failure-injection/Cargo.toml b/common/failure-injection/Cargo.toml
@@ -0,0 +1,11 @@
+[package]
+name = "failure-injection"
+version.workspace = true
+edition.workspace = true
+authors.workspace = true
+homepage.workspace = true
+repository.workspace = true
+license-file.workspace = true
+
+[dependencies]
+thiserror.workspace = true
+\ No newline at end of file
diff --git a/common/failure-injection/src/lib.rs b/common/failure-injection/src/lib.rs
@@ -0,0 +1,44 @@
+/*
+ This file is part of TALER
+ Copyright (C) 2025 Taler Systems SA
+
+ TALER is free software; you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License along with
+ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+*/
+
+use std::sync::{
+ Mutex,
+ atomic::{AtomicBool, Ordering},
+};
+
+static ENABLED: AtomicBool = AtomicBool::new(false);
+static SCENARIO: Mutex<Vec<&'static str>> = Mutex::new(Vec::new());
+
+#[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
+#[error("injected failure at {0}")]
+pub struct InjectedErr(pub &'static str);
+
+pub fn set_failure_scenario(failures: &[&'static str]) {
+ ENABLED.store(true, Ordering::SeqCst);
+ let mut scenario = SCENARIO.lock().unwrap();
+ scenario.clear();
+ scenario.extend_from_slice(failures);
+ scenario.reverse();
+}
+
+pub fn fail_point(step: &'static str) -> Result<(), InjectedErr> {
+ if ENABLED.load(Ordering::SeqCst) && SCENARIO.lock().unwrap().pop_if(|it| *it == step).is_some()
+ {
+ Err(InjectedErr(step))
+ } else {
+ Ok(())
+ }
+}
diff --git a/taler-cyclos/Cargo.toml b/taler-cyclos/Cargo.toml
@@ -30,6 +30,7 @@ tokio.workspace = true
anyhow.workspace = true
base64.workspace = true
owo-colors.workspace = true
+failure-injection.workspace = true
[dev-dependencies]
taler-test-utils.workspace = true
diff --git a/taler-cyclos/db/cyclos-procedures.sql b/taler-cyclos/db/cyclos-procedures.sql
@@ -179,6 +179,14 @@ WHERE tx_id = in_tx_id;
IF FOUND OR EXISTS(SELECT FROM bounced WHERE chargeback_id = in_transfer_id) THEN
out_result = 'known';
ELSE
+ -- Make it idempotent using wtid matching TODO find a idempotent way with cyclos API
+ UPDATE initiated
+ SET
+ tx_out_id = out_tx_row_id,
+ status = 'success',
+ status_msg = NULL
+ FROM transfer
+ WHERE transfer.initiated_id = initiated.initiated_id AND wtid = in_wtid;
out_result = 'recovered';
END IF;
diff --git a/taler-cyclos/src/bin/cyclos-harness.rs b/taler-cyclos/src/bin/cyclos-harness.rs
@@ -17,9 +17,10 @@
use std::{str::FromStr as _, time::Duration};
use clap::Parser as _;
+use failure_injection::{InjectedErr, set_failure_scenario};
use jiff::Timestamp;
use owo_colors::OwoColorize as _;
-use sqlx::{PgPool, Row, postgres::PgRow};
+use sqlx::{PgPool, postgres::PgRow};
use taler_api::db::TypeHelper as _;
use taler_build::long_version;
use taler_common::{
@@ -41,7 +42,7 @@ use taler_cyclos::{
constants::CONFIG_SOURCE,
cyclos_api::{api::CyclosAuth, client::Client, types::HistoryItem},
db::{self, TransferResult},
- worker::{Worker, WorkerResult},
+ worker::{Worker, WorkerError, WorkerResult},
};
/// Cyclos Adapter harness test suite
@@ -183,6 +184,11 @@ impl<'a> Harness<'a> {
}
}
+ async fn transfer(&self, amount: Decimal) -> u64 {
+ let creditor = FullCyclosPayto::new(CyclosId(self.client_id), "Client".to_string());
+ self.custom_transfer(amount, &creditor).await
+ }
+
async fn transfer_id(&self, transfer_id: u64) -> u64 {
sqlx::query(
"SELECT transfer_id
@@ -331,12 +337,7 @@ async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> {
step("Test transfer transactions");
let amount = decimal("3.5");
// Init a transfer to client
- let transfer_id = harness
- .custom_transfer(
- amount,
- &FullCyclosPayto::new(CyclosId(harness.client_id), "Client".to_string()),
- )
- .await;
+ let transfer_id = harness.transfer(amount).await;
// Check transfer pending
harness
.expect_transfer_status(transfer_id, TransferState::pending, None)
@@ -431,11 +432,15 @@ async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> {
balance.expect_add(amount).await;
harness.worker().await?;
harness
- .expect_transfer_status(transfer_id, TransferState::late_failure, Some("charged back"))
+ .expect_transfer_status(
+ transfer_id,
+ TransferState::late_failure,
+ Some("charged back"),
+ )
.await;
step("Test recover unexpected chargeback");
- let amount = decimal("10.22");
+ let amount = decimal("10.2");
// Manual tx from the exchange
harness
.exchange_send(&format!("What is this chargebacked ? {now}"), amount)
@@ -449,6 +454,38 @@ async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> {
// Sync
harness.worker().await?;
+ step("Test direct-payment failure");
+ let amount = decimal("10.3");
+ harness.transfer(amount).await;
+ set_failure_scenario(&["direct-payment"]);
+ assert!(matches!(
+ harness.worker().await.unwrap_err(),
+ WorkerError::Injected(InjectedErr("direct-payment"))
+ ));
+ harness.worker().await?;
+ balance.expect_sub(amount).await;
+ harness.worker().await?;
+
+ step("Test chargeback failure");
+ // Send malformed transaction
+ let amount = decimal("10.4");
+ harness
+ .client_send(&format!("Malformed test {now} with failure"), amount)
+ .await;
+ balance.expect_add(amount).await;
+ // Sync and bounce
+ set_failure_scenario(&["chargeback"]);
+ assert!(matches!(
+ harness.worker().await.unwrap_err(),
+ WorkerError::Injected(InjectedErr("chargeback"))
+ ));
+ balance.expect_sub(amount).await;
+ // Sync recover
+ harness.worker().await?;
+
+ step("Finish");
+ harness.worker().await?;
+ balance.expect_add(Decimal::zero()).await;
Ok(())
}
diff --git a/taler-cyclos/src/worker.rs b/taler-cyclos/src/worker.rs
@@ -14,6 +14,7 @@
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
+use failure_injection::{InjectedErr, fail_point};
use jiff::Timestamp;
use sqlx::PgConnection;
use taler_api::subject::{self, parse_incoming_unstructured};
@@ -39,8 +40,8 @@ pub enum WorkerError {
Db(#[from] sqlx::Error),
#[error(transparent)]
Api(#[from] ApiErr),
- //#[error(transparent)]
- //Injected(#[from] InjectedErr),
+ #[error(transparent)]
+ Injected(#[from] InjectedErr),
}
pub type WorkerResult = Result<(), WorkerError>;
@@ -85,7 +86,7 @@ impl Worker<'_> {
.client
.direct_payment(initiated.creditor.0, initiated.amount, &initiated.subject)
.await;
- // TODO fail_point("init-tx")?;
+ fail_point("direct-payment")?;
match res {
Ok(tx) => {
// Update transaction status, on failure the initiated transaction will be orphan
@@ -157,6 +158,7 @@ impl Worker<'_> {
}
} else {
let chargeback_id = self.client.chargeback(*transfer.id).await?;
+ fail_point("chargeback")?;
let res = db::register_bounced_tx_in(
db,
&tx,
diff --git a/taler-magnet-bank/Cargo.toml b/taler-magnet-bank/Cargo.toml
@@ -37,6 +37,7 @@ anyhow.workspace = true
base64.workspace = true
rand_core.workspace = true
owo-colors.workspace = true
+failure-injection.workspace = true
[dev-dependencies]
taler-test-utils.workspace = true
diff --git a/taler-magnet-bank/src/bin/magnet-bank-harness.rs b/taler-magnet-bank/src/bin/magnet-bank-harness.rs
@@ -17,6 +17,7 @@
use std::{fmt::Debug, time::Duration};
use clap::Parser as _;
+use failure_injection::{InjectedErr, set_failure_scenario};
use jiff::{Timestamp, Zoned};
use owo_colors::OwoColorize;
use p256::ecdsa::SigningKey;
@@ -37,7 +38,6 @@ use taler_magnet_bank::{
config::{AccountType, HarnessCfg, parse_db_cfg},
constants::CONFIG_SOURCE,
db::{self, TransferResult},
- failure_injection::{FailureLogic, InjectedErr, set_failure_logic},
magnet_api::{
client::{ApiClient, AuthClient},
types::{Account, Direction, Order, TxDto, TxStatus},
@@ -459,7 +459,7 @@ async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> {
step("Test transfer failure init-tx");
harness.transfer(10).await;
- set_failure_logic(FailureLogic::History(vec!["init-tx"]));
+ set_failure_scenario(&["init-tx"]);
assert!(matches!(
harness.worker().await,
Err(WorkerError::Injected(InjectedErr("init-tx")))
@@ -470,7 +470,7 @@ async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> {
step("Test transfer failure submit-tx");
harness.transfer(11).await;
- set_failure_logic(FailureLogic::History(vec!["submit-tx"]));
+ set_failure_scenario(&["submit-tx"]);
assert!(matches!(
harness.worker().await,
Err(WorkerError::Injected(InjectedErr("submit-tx")))
@@ -481,7 +481,7 @@ async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> {
step("Test transfer all failures");
harness.transfer(13).await;
- set_failure_logic(FailureLogic::History(vec!["init-tx", "submit-tx"]));
+ set_failure_scenario(&["init-tx", "submit-tx"]);
assert!(matches!(
harness.worker().await,
Err(WorkerError::Injected(InjectedErr("init-tx")))
diff --git a/taler-magnet-bank/src/lib.rs b/taler-magnet-bank/src/lib.rs
@@ -44,54 +44,6 @@ pub mod dev;
pub mod magnet_api;
pub mod setup;
pub mod worker;
-pub mod failure_injection {
- use std::sync::Mutex;
-
- #[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)]
- #[error("injected failure at {0}")]
- pub struct InjectedErr(pub &'static str);
-
- pub enum FailureLogic {
- None,
- Always(&'static str),
- History(Vec<&'static str>),
- }
-
- impl FailureLogic {
- /// Check whether this step should fail
- pub fn check(&mut self, name: &'static str) -> bool {
- match self {
- FailureLogic::None => false,
- FailureLogic::Always(step) => *step == name,
- FailureLogic::History(items) => {
- if let Some(step) = items.first()
- && *step == name
- {
- items.remove(0);
- true
- } else {
- false
- }
- }
- }
- }
- }
-
- static FAILURE_STATE: Mutex<FailureLogic> = Mutex::new(FailureLogic::None);
-
- pub fn set_failure_logic(logic: FailureLogic) {
- let mut lock = FAILURE_STATE.lock().unwrap();
- *lock = logic;
- }
-
- pub fn fail_point(step: &'static str) -> Result<(), InjectedErr> {
- if FAILURE_STATE.lock().unwrap().check(step) {
- Err(InjectedErr(step))
- } else {
- Ok(())
- }
- }
-}
pub async fn run_serve(cfg: &Config, pool: PgPool) -> anyhow::Result<()> {
let cfg = ServeCfg::parse(cfg)?;
diff --git a/taler-magnet-bank/src/worker.rs b/taler-magnet-bank/src/worker.rs
@@ -16,6 +16,7 @@
use std::num::ParseIntError;
+use failure_injection::{InjectedErr, fail_point};
use jiff::{Timestamp, Zoned, civil::Date};
use p256::ecdsa::SigningKey;
use sqlx::PgConnection;
@@ -30,7 +31,6 @@ use crate::{
FullHuPayto, HuIban,
config::AccountType,
db::{self, AddIncomingResult, Initiated, RegisterResult, TxIn, TxOut, TxOutKind},
- failure_injection::{InjectedErr, fail_point},
magnet_api::{
api::{ApiErr, ErrKind},
client::ApiClient,
@@ -308,7 +308,7 @@ impl Worker<'_> {
)
.await?;
if let Some(id) = res.initiated_id {
- if res.new {
+ if res.new {
error!(target: "worker", "out failure {id} {tx_out}");
} else {
trace!(target: "worker", "out failure {id} {tx_out} already seen");