taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

magnet-bank-harness.rs (18923B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::{fmt::Debug, time::Duration};
     18 
     19 use clap::Parser as _;
     20 use jiff::{Timestamp, Zoned};
     21 use owo_colors::OwoColorize;
     22 use p256::ecdsa::SigningKey;
     23 use sqlx::PgPool;
     24 use taler_build::long_version;
     25 use taler_common::{
     26     CommonArgs,
     27     api_common::{EddsaPublicKey, HashCode, ShortHashCode, rand_edsa_pub_key},
     28     api_params::{History, Page},
     29     api_wire::{IncomingBankTransaction, TransferRequest, TransferState},
     30     config::Config,
     31     db::{dbinit, pool},
     32     taler_main,
     33     types::{amount::amount, url},
     34 };
     35 use taler_magnet_bank::{
     36     FullHuPayto, HuIban,
     37     config::{AccountType, HarnessCfg, parse_db_cfg},
     38     constants::CONFIG_SOURCE,
     39     db::{self, TransferResult},
     40     failure_injection::{FailureLogic, InjectedErr, set_failure_logic},
     41     magnet_api::{
     42         client::{ApiClient, AuthClient},
     43         types::{Account, Direction, Order, TxDto, TxStatus},
     44     },
     45     run_worker,
     46     setup::{self, Keys},
     47     worker::{Worker, WorkerError, WorkerResult},
     48 };
     49 
     50 // TODO macro for retry/expect logic
     51 
     52 /// Taler Magnet Bank Adapter harness test suite
     53 #[derive(clap::Parser, Debug)]
     54 #[command(long_version = long_version(), about, long_about = None)]
     55 struct Args {
     56     #[clap(flatten)]
     57     common: CommonArgs,
     58 
     59     #[command(subcommand)]
     60     cmd: Command,
     61 }
     62 
     63 #[derive(clap::Subcommand, Debug)]
     64 enum Command {
     65     /// Run logic tests
     66     Logic {
     67         #[clap(long, short)]
     68         reset: bool,
     69     },
     70     /// Run online tests
     71     Online {
     72         #[clap(long, short)]
     73         reset: bool,
     74     },
     75 }
     76 
     77 /// Custom client for harness actions
     78 struct Harness<'a> {
     79     cfg: &'a HarnessCfg,
     80     pool: &'a PgPool,
     81     api: ApiClient<'a>,
     82     exchange: Account,
     83     client: Account,
     84     signing_key: &'a SigningKey,
     85 }
     86 
     87 impl<'a> Harness<'a> {
     88     async fn new(
     89         cfg: &'a HarnessCfg,
     90         client: &'a reqwest::Client,
     91         pool: &'a PgPool,
     92         keys: &'a Keys,
     93     ) -> Self {
     94         let api = AuthClient::new(client, &cfg.worker.api_url, &cfg.worker.consumer)
     95             .upgrade(&keys.access_token);
     96         let (exchange, client) = tokio::try_join!(
     97             api.account(cfg.worker.payto.bban()),
     98             api.account(cfg.client_payto.bban())
     99         )
    100         .unwrap();
    101         Self {
    102             cfg,
    103             pool,
    104             api,
    105             exchange,
    106             client,
    107             signing_key: &keys.signing_key,
    108         }
    109     }
    110 
    111     async fn worker(&'a self) -> WorkerResult {
    112         let db = &mut self.pool.acquire().await.unwrap().detach();
    113         Worker {
    114             client: &self.api,
    115             db,
    116             account_number: &self.exchange.number,
    117             account_code: self.exchange.code,
    118             key: self.signing_key,
    119             account_type: AccountType::Exchange,
    120             ignore_tx_before: self.cfg.worker.ignore_tx_before,
    121             ignore_bounces_before: self.cfg.worker.ignore_bounces_before,
    122         }
    123         .run()
    124         .await
    125     }
    126 
    127     async fn balance(&self) -> (u32, u32) {
    128         let (exchange_balance, client_balance) = tokio::try_join!(
    129             self.api.balance_mini(self.exchange.iban.bban()),
    130             self.api.balance_mini(self.client.iban.bban())
    131         )
    132         .unwrap();
    133         (
    134             exchange_balance.balance as u32,
    135             client_balance.balance as u32,
    136         )
    137     }
    138 
    139     async fn custom_transfer(&self, forint: u32, creditor: &FullHuPayto) -> u64 {
    140         let res = db::make_transfer(
    141             self.pool,
    142             &TransferRequest {
    143                 request_uid: HashCode::rand(),
    144                 amount: amount(format!("HUF:{forint}")),
    145                 exchange_base_url: url("https://test.com"),
    146                 wtid: ShortHashCode::rand(),
    147                 credit_account: creditor.as_payto(),
    148             },
    149             creditor,
    150             &Timestamp::now(),
    151         )
    152         .await
    153         .unwrap();
    154         match res {
    155             TransferResult::Success { id, .. } => id,
    156             TransferResult::RequestUidReuse | TransferResult::WtidReuse => unreachable!(),
    157         }
    158     }
    159 
    160     async fn transfer(&self, forint: u32) -> u64 {
    161         self.custom_transfer(
    162             forint,
    163             &FullHuPayto::new(self.client.iban.clone(), "Name".to_owned()),
    164         )
    165         .await
    166     }
    167 
    168     async fn expect_transfer_status(&self, id: u64, status: TransferState, msg: Option<&str>) {
    169         let mut attempts = 0;
    170         loop {
    171             let transfer = db::transfer_by_id(self.pool, id).await.unwrap().unwrap();
    172             if (transfer.status, transfer.status_msg.as_deref()) == (status, msg) {
    173                 return;
    174             }
    175             if attempts > 40 {
    176                 assert_eq!(
    177                     (transfer.status, transfer.status_msg.as_deref()),
    178                     (status, msg)
    179                 );
    180             }
    181             attempts += 1;
    182             tokio::time::sleep(Duration::from_millis(200)).await;
    183         }
    184     }
    185 
    186     async fn expect_incoming(&self, key: EddsaPublicKey) {
    187         let transfer = db::incoming_history(
    188             self.pool,
    189             &History {
    190                 page: Page {
    191                     limit: -1,
    192                     offset: None,
    193                 },
    194                 timeout_ms: None,
    195             },
    196             || tokio::sync::watch::channel(0).1,
    197         )
    198         .await
    199         .unwrap();
    200         assert!(matches!(
    201             transfer.first().unwrap(),
    202             IncomingBankTransaction::Reserve { reserve_pub, .. } if *reserve_pub == key,
    203         ));
    204     }
    205 
    206     /// Send a transaction between two magnet accounts
    207     async fn send_tx(&self, from: &Account, to: &HuIban, subject: &str, amount: u32) -> u64 {
    208         let now = Zoned::now();
    209         let info = self
    210             .api
    211             .init_tx(
    212                 from.code,
    213                 amount as f64,
    214                 subject,
    215                 &now.date(),
    216                 "Name",
    217                 to.bban(),
    218             )
    219             .await
    220             .unwrap();
    221         self.api
    222             .submit_tx(
    223                 self.signing_key,
    224                 &from.number,
    225                 info.code,
    226                 info.amount,
    227                 &now.date(),
    228                 to.bban(),
    229             )
    230             .await
    231             .unwrap();
    232         info.code
    233     }
    234 
    235     async fn latest_tx(&self, account: &Account) -> TxDto {
    236         self.api
    237             .page_tx(
    238                 Direction::Both,
    239                 Order::Descending,
    240                 1,
    241                 account.iban.bban(),
    242                 &None,
    243                 true,
    244             )
    245             .await
    246             .unwrap()
    247             .list
    248             .pop()
    249             .unwrap()
    250             .tx
    251     }
    252 
    253     async fn expect_latest_tx(&self, account: &Account, mut check: impl FnMut(&TxDto) -> bool) {
    254         let mut attempts = 0;
    255         loop {
    256             let current = self.latest_tx(account).await;
    257             if check(&current) {
    258                 return;
    259             }
    260             if attempts > 40 {
    261                 assert!(check(&current), "{current:?}");
    262             }
    263             attempts += 1;
    264             tokio::time::sleep(Duration::from_millis(200)).await;
    265         }
    266     }
    267 
    268     /// Send transaction from client to exchange
    269     async fn client_send(&self, subject: &str, amount: u32) -> u64 {
    270         self.send_tx(&self.client, &self.exchange.iban, subject, amount)
    271             .await
    272     }
    273 
    274     /// Send transaction from exchange to client
    275     async fn exchange_send_to(&self, subject: &str, amount: u32, to: &HuIban) -> u64 {
    276         self.send_tx(&self.exchange, to, subject, amount).await
    277     }
    278 
    279     /// Send transaction from exchange to client
    280     async fn exchange_send(&self, subject: &str, amount: u32) -> u64 {
    281         self.exchange_send_to(subject, amount, &self.client.iban)
    282             .await
    283     }
    284 
    285     async fn expect_status(&self, code: u64, status: TxStatus) {
    286         let mut attempts = 0;
    287         loop {
    288             let current = self.api.get_tx(code).await.unwrap().status;
    289             if current == status {
    290                 return;
    291             }
    292             if attempts > 40 {
    293                 assert_eq!(current, status, "{code}");
    294             }
    295             attempts += 1;
    296             tokio::time::sleep(Duration::from_millis(200)).await;
    297         }
    298     }
    299 }
    300 
    301 struct Balances<'a> {
    302     client: &'a Harness<'a>,
    303     exchange_balance: u32,
    304     client_balance: u32,
    305 }
    306 
    307 impl<'a> Balances<'a> {
    308     pub async fn new(client: &'a Harness<'a>) -> Self {
    309         let (exchange_balance, client_balance) = client.balance().await;
    310         Self {
    311             client,
    312             exchange_balance,
    313             client_balance,
    314         }
    315     }
    316 
    317     async fn expect(&mut self, diff: i32) {
    318         self.exchange_balance = (self.exchange_balance as i32 + diff) as u32;
    319         self.client_balance = (self.client_balance as i32 - diff) as u32;
    320         let mut attempts = 0;
    321         loop {
    322             let current = self.client.balance().await;
    323             if current == (self.exchange_balance, self.client_balance) {
    324                 return;
    325             }
    326             if attempts > 40 {
    327                 assert_eq!(
    328                     current,
    329                     (self.exchange_balance, self.client_balance),
    330                     "{current:?} {diff}"
    331                 );
    332             }
    333             attempts += 1;
    334             tokio::time::sleep(Duration::from_millis(200)).await;
    335         }
    336     }
    337 }
    338 
    339 fn step(step: &str) {
    340     println!("{}", step.green());
    341 }
    342 
    343 /// Run logic tests against real Magnet Bank backend
    344 async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> {
    345     step("Run Magnet Bank logic harness tests");
    346 
    347     step("Prepare db");
    348     let db_cfg = parse_db_cfg(cfg)?;
    349     let pool = pool(db_cfg.cfg, "magnet_bank").await?;
    350     let mut db = pool.acquire().await?.detach();
    351     dbinit(&mut db, db_cfg.sql_dir.as_ref(), "magnet-bank", reset).await?;
    352 
    353     let cfg = HarnessCfg::parse(cfg)?;
    354     let keys = setup::load(&cfg.worker)?;
    355     let client = reqwest::Client::new();
    356 
    357     let harness = Harness::new(&cfg, &client, &pool, &keys).await;
    358 
    359     step("Warmup");
    360     harness.worker().await?;
    361     tokio::time::sleep(Duration::from_secs(5)).await;
    362     harness.worker().await?;
    363 
    364     let unknown_account = FullHuPayto::new(
    365         HuIban::from_bban("1620000310991642").unwrap(),
    366         "Unknown".to_string(),
    367     );
    368     let now = Timestamp::now();
    369     let balance = &mut Balances::new(&harness).await;
    370 
    371     step("Test incoming talerable transaction");
    372     // Send talerable transaction
    373     let reserve_pub = rand_edsa_pub_key();
    374     harness
    375         .client_send(&format!("Taler {reserve_pub}"), 33)
    376         .await;
    377     // Wait for transaction to finalize
    378     balance.expect(33).await;
    379     // Sync and register
    380     harness.worker().await?;
    381     harness.expect_incoming(reserve_pub).await;
    382 
    383     step("Test incoming malformed transaction");
    384     // Send malformed transaction
    385     harness
    386         .client_send(&format!("Malformed test {now}"), 34)
    387         .await;
    388     // Wait for transaction to finalize
    389     balance.expect(34).await;
    390     // Sync and bounce
    391     harness.worker().await?;
    392     // Wait for bounce to finalize
    393     balance.expect(-34).await;
    394     harness.worker().await?;
    395 
    396     step("Test transfer transactions");
    397     // Init a transfer to client
    398     let transfer_id = harness
    399         .custom_transfer(
    400             102,
    401             &FullHuPayto::new(harness.client.iban.clone(), "Client".to_string()),
    402         )
    403         .await;
    404     // Should send
    405     harness.worker().await?;
    406     // Check transfer is still pending
    407     harness
    408         .expect_transfer_status(transfer_id, TransferState::pending, None)
    409         .await;
    410     // Wait for transaction to finalize
    411     balance.expect(-102).await;
    412     // Should register
    413     harness.worker().await?;
    414     // Check transfer is now successful
    415     harness
    416         .expect_transfer_status(transfer_id, TransferState::success, None)
    417         .await;
    418 
    419     step("Test transfer to self");
    420     // Init a transfer to self
    421     let transfer_id = harness
    422         .custom_transfer(
    423             101,
    424             &FullHuPayto::new(harness.exchange.iban.clone(), "Self".to_string()),
    425         )
    426         .await;
    427     // Should failed
    428     harness.worker().await?;
    429     // Check transfer failed
    430     harness
    431         .expect_transfer_status(
    432             transfer_id,
    433             TransferState::permanent_failure,
    434             Some("409 FORRAS_SZAMLA_ESZAMLA_EGYEZIK 'A forrás és az ellenszámla egyezik!'"),
    435         )
    436         .await;
    437 
    438     step("Test transfer to unknown account");
    439     let transfer_id = harness.custom_transfer(103, &unknown_account).await;
    440     harness.worker().await?;
    441     harness
    442         .expect_transfer_status(transfer_id, TransferState::pending, None)
    443         .await;
    444     balance.expect(0).await;
    445     harness.worker().await?;
    446     harness
    447         .expect_transfer_status(transfer_id, TransferState::permanent_failure, None)
    448         .await;
    449 
    450     step("Test unexpected outgoing");
    451     // Manual tx from the exchange
    452     harness
    453         .exchange_send(&format!("What is this ? {now}"), 4)
    454         .await;
    455     harness.worker().await?;
    456     // Wait for transaction to finalize
    457     balance.expect(-4).await;
    458     harness.worker().await?;
    459 
    460     step("Test transfer failure init-tx");
    461     harness.transfer(10).await;
    462     set_failure_logic(FailureLogic::History(vec!["init-tx"]));
    463     assert!(matches!(
    464         harness.worker().await,
    465         Err(WorkerError::Injected(InjectedErr("init-tx")))
    466     ));
    467     harness.worker().await?;
    468     balance.expect(-10).await;
    469     harness.worker().await?;
    470 
    471     step("Test transfer failure submit-tx");
    472     harness.transfer(11).await;
    473     set_failure_logic(FailureLogic::History(vec!["submit-tx"]));
    474     assert!(matches!(
    475         harness.worker().await,
    476         Err(WorkerError::Injected(InjectedErr("submit-tx")))
    477     ));
    478     harness.worker().await?;
    479     balance.expect(-11).await;
    480     harness.worker().await?;
    481 
    482     step("Test transfer all failures");
    483     harness.transfer(13).await;
    484     set_failure_logic(FailureLogic::History(vec!["init-tx", "submit-tx"]));
    485     assert!(matches!(
    486         harness.worker().await,
    487         Err(WorkerError::Injected(InjectedErr("init-tx")))
    488     ));
    489     assert!(matches!(
    490         harness.worker().await,
    491         Err(WorkerError::Injected(InjectedErr("submit-tx")))
    492     ));
    493     harness.worker().await?;
    494     balance.expect(-13).await;
    495     harness.worker().await?;
    496 
    497     step("Test recover successful bounces");
    498     let code = harness
    499         .client_send(&format!("will be bounced {now}"), 2)
    500         .await;
    501     balance.expect(2).await;
    502     harness
    503         .exchange_send(&format!("bounced: {}", code + 1), 2)
    504         .await;
    505     balance.expect(-2).await;
    506     harness.worker().await?;
    507 
    508     step("Test recover failed bounces");
    509     // Send malformed transaction
    510     harness
    511         .client_send(&format!("will be failed bounced {now}"), 3)
    512         .await;
    513     // Wait for it to be received because rejected transaction take too much time to appear in the transactions log
    514     balance.expect(3).await;
    515     // Bounce it manually
    516     let received = harness.latest_tx(&harness.exchange).await;
    517     let bounce_code = harness
    518         .exchange_send_to(
    519             &format!("bounce manually: {}", received.code),
    520             3,
    521             &unknown_account,
    522         )
    523         .await;
    524     harness.expect_status(bounce_code, TxStatus::Rejected).await;
    525     // Should not bounce and catch the failure
    526     harness.worker().await?;
    527     // Wait for it to be bounce regardless because rejected transaction take too much time to appear in the transactions log
    528     // TODO fix this
    529     balance.expect(-3).await;
    530 
    531     step("Finish");
    532     tokio::time::sleep(Duration::from_secs(5)).await;
    533     harness.worker().await?;
    534     balance.expect(0).await;
    535     Ok(())
    536 }
    537 
    538 /// Run online tests against real Magnet Bank backend
    539 async fn online_harness(config: &Config, reset: bool) -> anyhow::Result<()> {
    540     step("Run Magnet Bank online harness tests");
    541 
    542     step("Prepare db");
    543     let db_cfg = parse_db_cfg(config)?;
    544     let pool = pool(db_cfg.cfg, "magnet_bank").await?;
    545     let mut db = pool.acquire().await?.detach();
    546     dbinit(&mut db, db_cfg.sql_dir.as_ref(), "magnet-bank", reset).await?;
    547 
    548     let cfg = HarnessCfg::parse(config)?;
    549     let keys = setup::load(&cfg.worker)?;
    550     let client = reqwest::Client::new();
    551 
    552     let harness = Harness::new(&cfg, &client, &pool, &keys).await;
    553 
    554     step("Warmup worker");
    555     let _worker_task = {
    556         let client = client.clone();
    557         let pool = pool.clone();
    558         let config = config.clone();
    559         tokio::spawn(async move { run_worker(&config, &pool, &client).await })
    560     };
    561     tokio::time::sleep(Duration::from_secs(25)).await;
    562 
    563     let now = Timestamp::now();
    564     let balance = &mut Balances::new(&harness).await;
    565 
    566     step("Test incoming transactions");
    567     let reserve_pub = rand_edsa_pub_key();
    568     harness
    569         .client_send(&format!("Taler {reserve_pub}"), 3)
    570         .await;
    571     harness
    572         .client_send(&format!("Malformed test {now}"), 4)
    573         .await;
    574     balance.expect(3).await;
    575     harness.expect_incoming(reserve_pub).await;
    576 
    577     step("Test outgoing transactions");
    578     let transfer_self = harness
    579         .custom_transfer(
    580             1,
    581             &FullHuPayto::new(harness.exchange.iban.clone(), "Self".to_string()),
    582         )
    583         .await;
    584     let transfer_id = harness
    585         .custom_transfer(
    586             2,
    587             &FullHuPayto::new(harness.client.iban.clone(), "Client".to_string()),
    588         )
    589         .await;
    590     balance.expect(-2).await;
    591     harness
    592         .expect_transfer_status(
    593             transfer_self,
    594             TransferState::permanent_failure,
    595             Some("409 FORRAS_SZAMLA_ESZAMLA_EGYEZIK 'A forrás és az ellenszámla egyezik!'"),
    596         )
    597         .await;
    598     harness
    599         .expect_transfer_status(transfer_id, TransferState::success, None)
    600         .await;
    601 
    602     step("Finish");
    603     tokio::time::sleep(Duration::from_secs(5)).await;
    604     balance.expect(0).await;
    605 
    606     Ok(())
    607 }
    608 
    609 fn main() {
    610     let args = Args::parse();
    611     taler_main(CONFIG_SOURCE, args.common, |cfg| async move {
    612         match args.cmd {
    613             Command::Logic { reset } => logic_harness(&cfg, reset).await,
    614             Command::Online { reset } => online_harness(&cfg, reset).await,
    615         }
    616     });
    617 }