taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

cyclos-harness.rs (19124B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::time::Duration;
     18 
     19 use clap::Parser as _;
     20 use compact_str::CompactString;
     21 use failure_injection::{InjectedErr, set_failure_scenario};
     22 use jiff::Timestamp;
     23 use owo_colors::OwoColorize as _;
     24 use sqlx::{PgPool, Row as _, postgres::PgRow};
     25 use taler_api::notification::dummy_listen;
     26 use taler_build::long_version;
     27 use taler_common::{
     28     CommonArgs,
     29     api_common::{EddsaPublicKey, HashCode, ShortHashCode},
     30     api_params::{History, Page},
     31     api_wire::{IncomingBankTransaction, TransferState},
     32     config::Config,
     33     taler_main,
     34     types::{
     35         amount::{Currency, Decimal, decimal},
     36         url,
     37     },
     38 };
     39 use taler_cyclos::{
     40     config::{AccountType, HarnessCfg},
     41     constants::CONFIG_SOURCE,
     42     cyclos_api::{
     43         api::CyclosAuth,
     44         client::Client,
     45         types::{HistoryItem, OrderBy},
     46     },
     47     db::{self, TransferResult, dbinit},
     48     payto::FullCyclosPayto,
     49     setup,
     50     worker::{Worker, WorkerError, WorkerResult, run_worker},
     51 };
     52 
/// Cyclos Adapter harness test suite
// NOTE(review): plain `//` comments are used inside this struct on purpose;
// with clap's derive, `///` comments presumably end up in the generated help
// output — confirm before converting them.
#[derive(clap::Parser, Debug)]
#[command(long_version = long_version(), about, long_about = None)]
struct Args {
    // Shared arguments, flattened into this command line and handed to
    // `taler_main` by `main`
    #[clap(flatten)]
    common: CommonArgs,

    // Which harness test suite to run
    #[command(subcommand)]
    cmd: Command,
}
     63 
// Harness sub-commands, dispatched by `main`.
// NOTE(review): the `///` comments on the variants are kept verbatim since
// clap's derive presumably uses them as help text — confirm before editing.
#[derive(clap::Subcommand, Debug)]
enum Command {
    /// Run logic tests
    Logic {
        // Forwarded as the `reset` argument of `logic_harness`
        #[clap(long, short)]
        reset: bool,
    },
    /// Run online tests
    Online {
        // Forwarded as the `reset` argument of `online_harness`
        #[clap(long, short)]
        reset: bool,
    },
}
     77 
     78 fn step(step: &str) {
     79     println!("{}", step.green());
     80 }
     81 
/// Shared state of the harness tests: database pool, the two Cyclos API
/// clients and the settings handed to the worker
struct Harness<'a> {
    /// Postgres pool used for database queries and to obtain the worker's connection
    pool: &'a PgPool,
    /// Cyclos API client used for client-side payments, chargebacks and history lookups
    client: Client<'a>,
    /// Cyclos API client the worker runs with; also used for payments sent by the exchange
    wire: Client<'a>,
    /// Payto of the account behind `client`
    client_payto: FullCyclosPayto,
    /// Payto of the account behind `wire`
    wire_payto: FullCyclosPayto,
    /// Payment type id used for direct payments and handed to the worker
    payment_type_id: i64,
    /// Account type id handed to the worker
    account_type_id: i64,
    /// Currency handed to the worker and to database lookups
    currency: Currency,
    /// Root value handed to the database history and transfer lookups
    root: CompactString,
}
     93 
     94 impl<'a> Harness<'a> {
     95     async fn balance(&self) -> (Decimal, Decimal) {
     96         let (exchange, client) =
     97             tokio::try_join!(self.wire.accounts(), self.client.accounts()).unwrap();
     98         (
     99             exchange[0]
    100                 .status
    101                 .available_balance
    102                 .unwrap_or(exchange[0].status.balance),
    103             client[0]
    104                 .status
    105                 .available_balance
    106                 .unwrap_or(client[0].status.balance),
    107         )
    108     }
    109 
    110     /// Send transaction from client to exchange
    111     async fn client_send(&self, subject: &str, amount: Decimal) -> i64 {
    112         *self
    113             .client
    114             .direct_payment(*self.wire_payto.id, self.payment_type_id, amount, subject)
    115             .await
    116             .unwrap()
    117             .id
    118     }
    119 
    120     /// Send transaction from exchange to client
    121     async fn exchange_send(&self, subject: &str, amount: Decimal) -> i64 {
    122         *self
    123             .wire
    124             .direct_payment(*self.client_payto.id, self.payment_type_id, amount, subject)
    125             .await
    126             .unwrap()
    127             .id
    128     }
    129 
    130     /// Chargeback a transfer
    131     async fn chargeback(&self, id: i64) -> i64 {
    132         self.client.chargeback(id).await.unwrap()
    133     }
    134 
    135     /// Fetch last transfer related to client
    136     async fn client_last_transfer(&self) -> HistoryItem {
    137         self.client
    138             .history(*self.client_payto.id, OrderBy::DateDesc, 0, None)
    139             .await
    140             .unwrap()
    141             .page
    142             .remove(0)
    143     }
    144 
    145     /// Run the worker once
    146     async fn worker(&'a self) -> WorkerResult {
    147         let db = &mut self.pool.acquire().await.unwrap().detach();
    148         Worker {
    149             db,
    150             currency: self.currency.clone(),
    151             client: &self.wire,
    152             account_type: AccountType::Exchange,
    153             account_type_id: self.account_type_id,
    154             payment_type_id: self.payment_type_id,
    155         }
    156         .run()
    157         .await
    158     }
    159 
    160     async fn expect_incoming(&self, key: EddsaPublicKey) {
    161         let transfer = db::incoming_history(
    162             self.pool,
    163             &History {
    164                 page: Page {
    165                     limit: -1,
    166                     offset: None,
    167                 },
    168                 timeout_ms: None,
    169             },
    170             &self.currency,
    171             &self.root,
    172             dummy_listen,
    173         )
    174         .await
    175         .unwrap();
    176         assert!(matches!(
    177             transfer.first().unwrap(),
    178             IncomingBankTransaction::Reserve { reserve_pub, .. } if *reserve_pub == key,
    179         ));
    180     }
    181 
    182     async fn custom_transfer(&self, amount: Decimal, creditor_id: i64, creditor_name: &str) -> u64 {
    183         let res = db::make_transfer(
    184             self.pool,
    185             &db::Transfer {
    186                 request_uid: HashCode::rand(),
    187                 amount,
    188                 exchange_base_url: url("https://test.com"),
    189                 wtid: ShortHashCode::rand(),
    190                 creditor_id,
    191                 creditor_name: CompactString::new(creditor_name),
    192             },
    193             &Timestamp::now(),
    194         )
    195         .await
    196         .unwrap();
    197         match res {
    198             TransferResult::Success { id, .. } => id,
    199             TransferResult::RequestUidReuse | TransferResult::WtidReuse => unreachable!(),
    200         }
    201     }
    202 
    203     async fn transfer(&self, amount: Decimal) -> u64 {
    204         self.custom_transfer(amount, *self.client_payto.id, &self.client_payto.name)
    205             .await
    206     }
    207 
    208     async fn transfer_id(&self, transfer_id: u64) -> i64 {
    209         sqlx::query(
    210             "SELECT transfer_id 
    211                 FROM transfer 
    212                     JOIN initiated USING (initiated_id)
    213                     JOIN tx_out USING (tx_out_id)
    214                 WHERE initiated_id=$1",
    215         )
    216         .bind(transfer_id as i64)
    217         .try_map(|r: PgRow| r.try_get(0))
    218         .fetch_one(self.pool)
    219         .await
    220         .unwrap()
    221     }
    222 
    223     async fn expect_transfer_status(&self, id: u64, status: TransferState, msg: Option<&str>) {
    224         let mut attempts = 0;
    225         loop {
    226             let transfer = db::transfer_by_id(self.pool, id, &self.currency, &self.root)
    227                 .await
    228                 .unwrap()
    229                 .unwrap();
    230             if (transfer.status, transfer.status_msg.as_deref()) == (status, msg) {
    231                 return;
    232             }
    233             if attempts > 40 {
    234                 assert_eq!(
    235                     (transfer.status, transfer.status_msg.as_deref()),
    236                     (status, msg)
    237                 );
    238             }
    239             attempts += 1;
    240             tokio::time::sleep(Duration::from_millis(200)).await;
    241         }
    242     }
    243 }
    244 
/// Expected balances of the exchange and client accounts, tracked across test steps
struct Balances<'a> {
    /// Harness used to fetch the current balances
    client: &'a Harness<'a>,
    /// Balance the exchange account is expected to have
    exchange_balance: Decimal,
    /// Balance the client account is expected to have
    client_balance: Decimal,
}
    250 
    251 impl<'a> Balances<'a> {
    252     pub async fn new(client: &'a Harness<'a>) -> Self {
    253         let (exchange_balance, client_balance) = client.balance().await;
    254         Self {
    255             client,
    256             exchange_balance,
    257             client_balance,
    258         }
    259     }
    260 
    261     async fn expect_add(&mut self, diff: Decimal) {
    262         self.exchange_balance = self.exchange_balance.try_add(&diff).unwrap();
    263         self.client_balance = self.client_balance.try_sub(&diff).unwrap();
    264         let mut attempts = 0;
    265         loop {
    266             let current = self.client.balance().await;
    267             if current == (self.exchange_balance, self.client_balance) {
    268                 return;
    269             }
    270             if attempts > 40 {
    271                 assert_eq!(
    272                     current,
    273                     (self.exchange_balance, self.client_balance),
    274                     "({} {}) +{diff}",
    275                     current.0,
    276                     current.1
    277                 );
    278             }
    279             attempts += 1;
    280             tokio::time::sleep(Duration::from_millis(200)).await;
    281         }
    282     }
    283 
    284     async fn expect_sub(&mut self, diff: Decimal) {
    285         self.exchange_balance = self.exchange_balance.try_sub(&diff).unwrap();
    286         self.client_balance = self.client_balance.try_add(&diff).unwrap();
    287 
    288         let mut attempts = 0;
    289         loop {
    290             let current = self.client.balance().await;
    291             if current == (self.exchange_balance, self.client_balance) {
    292                 return;
    293             }
    294             if attempts > 40 {
    295                 assert_eq!(
    296                     current,
    297                     (self.exchange_balance, self.client_balance),
    298                     "({} {}) -{diff}",
    299                     current.0,
    300                     current.1
    301                 );
    302             }
    303             attempts += 1;
    304             tokio::time::sleep(Duration::from_millis(200)).await;
    305         }
    306     }
    307 }
    308 
    309 /// Run logic tests against local Cyclos backend
    310 async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> {
    311     step("Run Cyclos logic harness tests");
    312 
    313     step("Prepare db");
    314     let pool = dbinit(cfg, reset).await?;
    315 
    316     let client = http_client::client()?;
    317     setup::setup(cfg, reset, &client).await?;
    318     let cfg = HarnessCfg::parse(cfg)?;
    319     let wire = Client {
    320         client: &client,
    321         api_url: &cfg.worker.host.api_url,
    322         auth: &CyclosAuth::Basic {
    323             username: cfg.worker.host.username,
    324             password: cfg.worker.host.password,
    325         },
    326     };
    327     let client = Client {
    328         client: &client,
    329         api_url: &cfg.worker.host.api_url,
    330         auth: &CyclosAuth::Basic {
    331             username: cfg.username,
    332             password: cfg.password,
    333         },
    334     };
    335     let harness = Harness {
    336         pool: &pool,
    337         client_payto: client
    338             .whoami()
    339             .await
    340             .unwrap()
    341             .payto(cfg.worker.root.clone()),
    342         wire_payto: wire.whoami().await.unwrap().payto(cfg.worker.root.clone()),
    343         client,
    344         wire,
    345         currency: cfg.worker.currency,
    346         root: cfg.worker.root,
    347         payment_type_id: *cfg.worker.payment_type_id,
    348         account_type_id: *cfg.worker.account_type_id,
    349     };
    350 
    351     step("Warmup");
    352     harness.worker().await.unwrap();
    353     let now = Timestamp::now();
    354     let balance = &mut Balances::new(&harness).await;
    355 
    356     step("Test incoming talerable transaction");
    357     // Send talerable transaction
    358     let reserve_pub = EddsaPublicKey::rand();
    359     let amount = decimal("3.3");
    360     harness
    361         .client_send(&format!("Taler {reserve_pub}"), amount)
    362         .await;
    363     // Sync and register
    364     harness.worker().await?;
    365     harness.expect_incoming(reserve_pub).await;
    366     balance.expect_add(amount).await;
    367 
    368     step("Test incoming malformed transaction");
    369     // Send malformed transaction
    370     let amount = decimal("3.4");
    371     harness
    372         .client_send(&format!("Malformed test {now}"), amount)
    373         .await;
    374     balance.expect_add(amount).await;
    375     // Sync and bounce
    376     harness.worker().await?;
    377     balance.expect_sub(amount).await;
    378 
    379     step("Test transfer transactions");
    380     let amount = decimal("3.5");
    381     // Init a transfer to client
    382     let transfer_id = harness.transfer(amount).await;
    383     // Check transfer pending
    384     harness
    385         .expect_transfer_status(transfer_id, TransferState::pending, None)
    386         .await;
    387     // Should send
    388     harness.worker().await?;
    389     // Wait for transaction to finalize
    390     balance.expect_sub(amount).await;
    391     // Should register
    392     harness.worker().await?;
    393     // Check transfer is now successful
    394     harness
    395         .expect_transfer_status(transfer_id, TransferState::success, None)
    396         .await;
    397 
    398     step("Test transfer to self");
    399     // Init a transfer to self
    400     let transfer_id = harness
    401         .custom_transfer(
    402             decimal("10.1"),
    403             *harness.wire_payto.id,
    404             &harness.wire_payto.name,
    405         )
    406         .await;
    407     // Should failed
    408     harness.worker().await?;
    409     // Check transfer failed
    410     harness
    411         .expect_transfer_status(
    412             transfer_id,
    413             TransferState::permanent_failure,
    414             Some("permissionDenied - The operation was denied because a required permission was not granted"),
    415         )
    416         .await;
    417 
    418     step("Test transfer to unknown account");
    419     // Init a transfer to unknown
    420     let transfer_id = harness
    421         .custom_transfer(decimal("10.1"), 42, "Unknown")
    422         .await;
    423     // Should failed
    424     harness.worker().await?;
    425     // Check transfer failed
    426     harness
    427         .expect_transfer_status(
    428             transfer_id,
    429             TransferState::permanent_failure,
    430             Some("unknown BasicUser 42"),
    431         )
    432         .await;
    433 
    434     step("Test unexpected outgoing");
    435     // Manual tx from the exchange
    436     let amount = decimal("4");
    437     harness
    438         .exchange_send(&format!("What is this ? {now}"), amount)
    439         .await;
    440     harness.worker().await?;
    441     // Wait for transaction to finalize
    442     balance.expect_sub(amount).await;
    443     harness.worker().await?;
    444 
    445     step("Test transfer chargeback");
    446     let amount = decimal("10.1");
    447     // Init a transfer to client
    448     let transfer_id = harness.transfer(amount).await;
    449     harness
    450         .expect_transfer_status(transfer_id, TransferState::pending, None)
    451         .await;
    452     // Send
    453     harness.worker().await?;
    454     balance.expect_sub(amount).await;
    455     harness
    456         .expect_transfer_status(transfer_id, TransferState::pending, None)
    457         .await;
    458     // Sync
    459     harness.worker().await?;
    460     harness
    461         .expect_transfer_status(transfer_id, TransferState::success, None)
    462         .await;
    463     // Chargeback
    464     harness
    465         .chargeback(harness.transfer_id(transfer_id).await)
    466         .await;
    467     balance.expect_add(amount).await;
    468     harness.worker().await?;
    469     harness
    470         .expect_transfer_status(
    471             transfer_id,
    472             TransferState::late_failure,
    473             Some("charged back"),
    474         )
    475         .await;
    476 
    477     step("Test recover unexpected chargeback");
    478     let amount = decimal("10.2");
    479     // Manual tx from the exchange
    480     harness
    481         .exchange_send(&format!("What is this chargebacked ? {now}"), amount)
    482         .await;
    483     balance.expect_sub(amount).await;
    484     // Chargeback
    485     harness
    486         .chargeback(*harness.client_last_transfer().await.id)
    487         .await;
    488     balance.expect_add(amount).await;
    489     // Sync
    490     harness.worker().await?;
    491 
    492     step("Test direct-payment failure");
    493     let amount = decimal("10.3");
    494     harness.transfer(amount).await;
    495     set_failure_scenario(&["direct-payment"]);
    496     assert!(matches!(
    497         harness.worker().await.unwrap_err(),
    498         WorkerError::Injected(InjectedErr("direct-payment"))
    499     ));
    500     harness.worker().await?;
    501     balance.expect_sub(amount).await;
    502     harness.worker().await?;
    503 
    504     step("Test chargeback failure");
    505     // Send malformed transaction
    506     let amount = decimal("10.4");
    507     harness
    508         .client_send(&format!("Malformed test {now} with failure"), amount)
    509         .await;
    510     balance.expect_add(amount).await;
    511     // Sync and bounce
    512     set_failure_scenario(&["chargeback"]);
    513     assert!(matches!(
    514         harness.worker().await.unwrap_err(),
    515         WorkerError::Injected(InjectedErr("chargeback"))
    516     ));
    517     balance.expect_sub(amount).await;
    518     // Sync recover
    519     harness.worker().await?;
    520 
    521     step("Finish");
    522     harness.worker().await?;
    523     balance.expect_add(Decimal::zero()).await;
    524     Ok(())
    525 }
    526 
/// Run online tests against real Cyclos backend
///
/// Unlike `logic_harness`, the worker is not stepped manually: `run_worker`
/// is spawned as a background task and the harness only waits for balances
/// and transfer statuses to reach the expected values.
///
/// `reset` is forwarded to `dbinit` and `setup::setup`.
async fn online_harness(config: &Config, reset: bool) -> anyhow::Result<()> {
    step("Run Cyclos online harness tests");

    step("Prepare db");
    let pool = dbinit(config, reset).await?;
    let http_client = http_client::client()?;
    setup::setup(config, reset, &http_client).await?;
    let cfg = HarnessCfg::parse(config)?;
    // API handle using the worker host credentials (exchange side)
    let wire = Client {
        client: &http_client,
        api_url: &cfg.worker.host.api_url,
        auth: &CyclosAuth::Basic {
            username: cfg.worker.host.username,
            password: cfg.worker.host.password,
        },
    };
    // API handle using the harness credentials (client side)
    let client = Client {
        client: &http_client,
        api_url: &cfg.worker.host.api_url,
        auth: &CyclosAuth::Basic {
            username: cfg.username,
            password: cfg.password,
        },
    };

    let harness = Harness {
        pool: &pool,
        client_payto: client
            .whoami()
            .await
            .unwrap()
            .payto(cfg.worker.root.clone()),
        wire_payto: wire.whoami().await.unwrap().payto(cfg.worker.root.clone()),
        client,
        wire,
        currency: cfg.worker.currency,
        root: cfg.worker.root,
        payment_type_id: *cfg.worker.payment_type_id,
        account_type_id: *cfg.worker.account_type_id,
    };

    step("Warmup worker");
    // Background worker task; it gets its own clones since the spawned future
    // cannot borrow from this function. The handle is never awaited.
    let _worker_task = {
        let client = http_client.clone();
        let pool = pool.clone();
        let config = config.clone();
        tokio::spawn(async move { run_worker(&config, &pool, &client, false).await })
    };
    // NOTE(review): presumably gives the worker time to start and sync before
    // the baseline balances are read — confirm the delay is sufficient
    tokio::time::sleep(Duration::from_secs(5)).await;
    // Timestamp embedded in the malformed subject sent below
    let now = Timestamp::now();
    let balance = &mut Balances::new(&harness).await;

    step("Test incoming transactions");
    let taler_amount = decimal("3");
    let malformed_amount = decimal("4");
    let reserve_pub = EddsaPublicKey::rand();
    harness
        .client_send(&format!("Taler {reserve_pub}"), taler_amount)
        .await;
    harness
        .client_send(&format!("Malformed test {now}"), malformed_amount)
        .await;
    // Only the talerable amount is expected to remain on the exchange account
    balance.expect_add(taler_amount).await;
    harness.expect_incoming(reserve_pub).await;

    step("Test outgoing transactions");
    let self_amount = decimal("1");
    let taler_amount = decimal("2");

    // Transfer to the exchange's own account, expected to fail permanently
    let transfer_self = harness
        .custom_transfer(
            self_amount,
            *harness.wire_payto.id,
            &harness.wire_payto.name,
        )
        .await;
    // Transfer to the client account, expected to succeed
    let transfer_id = harness.transfer(taler_amount).await;
    balance.expect_sub(taler_amount).await;
    harness
        .expect_transfer_status(
            transfer_self,
            TransferState::permanent_failure,
            Some("permissionDenied - The operation was denied because a required permission was not granted"),
        )
        .await;
    harness
        .expect_transfer_status(transfer_id, TransferState::success, None)
        .await;

    step("Finish");
    // Wait a little longer, then check with a zero diff that the balances no longer move
    tokio::time::sleep(Duration::from_secs(5)).await;
    balance.expect_add(Decimal::zero()).await;

    Ok(())
}
    623 
    624 fn main() {
    625     let args = Args::parse();
    626     taler_main(CONFIG_SOURCE, args.common, |cfg| async move {
    627         match args.cmd {
    628             Command::Logic { reset } => logic_harness(&cfg, reset).await,
    629             Command::Online { reset } => online_harness(&cfg, reset).await,
    630         }
    631     });
    632 }