taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

magnet-bank-harness.rs (18101B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::{fmt::Debug, time::Duration};
     18 
     19 use aws_lc_rs::signature::EcdsaKeyPair;
     20 use clap::Parser as _;
     21 use failure_injection::{InjectedErr, set_failure_scenario};
     22 use jiff::{Timestamp, Zoned};
     23 use owo_colors::OwoColorize;
     24 use sqlx::PgPool;
     25 use taler_api::notification::dummy_listen;
     26 use taler_build::long_version;
     27 use taler_common::{
     28     CommonArgs,
     29     api::{
     30         EddsaPublicKey, HashCode, ShortHashCode,
     31         params::{History, Page, Pooling},
     32         wire::{IncomingBankTransaction, TransferState},
     33     },
     34     config::Config,
     35     db::{dbinit, pool},
     36     taler_main,
     37     types::{amount::decimal, url},
     38 };
     39 use taler_magnet_bank::{
     40     FullHuPayto, HuIban,
     41     config::{AccountType, HarnessCfg, parse_db_cfg},
     42     constants::CONFIG_SOURCE,
     43     db::{self, TransferResult},
     44     magnet_api::{
     45         client::{ApiClient, AuthClient},
     46         types::{Account, Direction, Order, TxDto, TxStatus},
     47     },
     48     setup::{self, Keys},
     49     worker::{Worker, WorkerError, WorkerResult, run_worker},
     50 };
     51 
     52 // TODO macro for retry/expect logic
     53 
     54 /// Taler Magnet Bank Adapter harness test suite
     55 #[derive(clap::Parser, Debug)]
     56 #[command(long_version = long_version(), about, long_about = None)]
     57 struct Args {
     58     #[clap(flatten)]
     59     common: CommonArgs,
     60 
     61     #[command(subcommand)]
     62     cmd: Command,
     63 }
     64 
     65 #[derive(clap::Subcommand, Debug)]
     66 enum Command {
     67     /// Run logic tests
     68     Logic {
     69         #[arg(short, long)]
     70         reset: bool,
     71     },
     72     /// Run online tests
     73     Online {
     74         #[arg(short, long)]
     75         reset: bool,
     76     },
     77 }
     78 
     79 /// Custom client for harness actions
     80 struct Harness<'a> {
     81     cfg: &'a HarnessCfg,
     82     pool: &'a PgPool,
     83     api: ApiClient<'a>,
     84     exchange: Account,
     85     client: Account,
     86     signing_key: &'a EcdsaKeyPair,
     87 }
     88 
     89 impl<'a> Harness<'a> {
     90     async fn new(
     91         cfg: &'a HarnessCfg,
     92         client: &'a http_client::Client,
     93         pool: &'a PgPool,
     94         keys: &'a Keys,
     95     ) -> Self {
     96         let api = AuthClient::new(client, &cfg.worker.api_url, &cfg.worker.consumer)
     97             .upgrade(&keys.access_token);
     98         let (exchange, client) = tokio::try_join!(
     99             api.account(cfg.worker.payto.bban()),
    100             api.account(cfg.client_payto.bban())
    101         )
    102         .unwrap();
    103         Self {
    104             cfg,
    105             pool,
    106             api,
    107             exchange,
    108             client,
    109             signing_key: &keys.signing_key,
    110         }
    111     }
    112 
    113     async fn worker(&'a self) -> WorkerResult {
    114         let db = &mut self.pool.acquire().await.unwrap().detach();
    115         Worker {
    116             client: &self.api,
    117             db,
    118             account_number: &self.exchange.number,
    119             account_code: self.exchange.code,
    120             key: self.signing_key,
    121             account_type: AccountType::Exchange,
    122             ignore_tx_before: self.cfg.worker.ignore_tx_before,
    123             ignore_bounces_before: self.cfg.worker.ignore_bounces_before,
    124         }
    125         .run()
    126         .await
    127     }
    128 
    129     async fn balance(&self) -> (u32, u32) {
    130         let (exchange_balance, client_balance) = tokio::try_join!(
    131             self.api.balance_mini(self.exchange.iban.bban()),
    132             self.api.balance_mini(self.client.iban.bban())
    133         )
    134         .unwrap();
    135         (
    136             exchange_balance.balance as u32,
    137             client_balance.balance as u32,
    138         )
    139     }
    140 
    141     async fn custom_transfer(&self, forint: u32, creditor: FullHuPayto) -> u64 {
    142         let res = db::make_transfer(
    143             self.pool,
    144             &db::Transfer {
    145                 request_uid: HashCode::rand(),
    146                 amount: decimal(format!("{forint}")),
    147                 exchange_base_url: url("https://test.com"),
    148                 metadata: None,
    149                 wtid: ShortHashCode::rand(),
    150                 creditor,
    151             },
    152             &Timestamp::now(),
    153         )
    154         .await
    155         .unwrap();
    156         match res {
    157             TransferResult::Success { id, .. } => id,
    158             TransferResult::RequestUidReuse | TransferResult::WtidReuse => unreachable!(),
    159         }
    160     }
    161 
    162     async fn transfer(&self, forint: u32) -> u64 {
    163         self.custom_transfer(forint, FullHuPayto::new(self.client.iban.clone(), "Name"))
    164             .await
    165     }
    166 
    167     async fn expect_transfer_status(&self, id: u64, status: TransferState, msg: Option<&str>) {
    168         let mut attempts = 0;
    169         loop {
    170             let transfer = db::transfer_by_id(self.pool, id).await.unwrap().unwrap();
    171             if (transfer.status, transfer.status_msg.as_deref()) == (status, msg) {
    172                 return;
    173             }
    174             if attempts > 40 {
    175                 assert_eq!(
    176                     (transfer.status, transfer.status_msg.as_deref()),
    177                     (status, msg)
    178                 );
    179             }
    180             attempts += 1;
    181             tokio::time::sleep(Duration::from_millis(200)).await;
    182         }
    183     }
    184 
    185     async fn expect_incoming(&self, key: EddsaPublicKey) {
    186         let transfer = db::incoming_history(
    187             self.pool,
    188             &History {
    189                 page: Page {
    190                     limit: -1,
    191                     offset: None,
    192                 },
    193                 pooling: Pooling { timeout_ms: None },
    194             },
    195             dummy_listen,
    196         )
    197         .await
    198         .unwrap();
    199         assert!(matches!(
    200             transfer.first().unwrap(),
    201             IncomingBankTransaction::Reserve { reserve_pub, .. } if *reserve_pub == key,
    202         ));
    203     }
    204 
    205     /// Send a transaction between two magnet accounts
    206     async fn send_tx(&self, from: &Account, to: &HuIban, subject: &str, amount: u32) -> u64 {
    207         let now = Zoned::now();
    208         let info = self
    209             .api
    210             .init_tx(
    211                 from.code,
    212                 amount as f64,
    213                 subject,
    214                 &now.date(),
    215                 "Name",
    216                 to.bban(),
    217             )
    218             .await
    219             .unwrap();
    220         self.api
    221             .submit_tx(
    222                 self.signing_key,
    223                 &from.number,
    224                 info.code,
    225                 info.amount,
    226                 &now.date(),
    227                 to.bban(),
    228             )
    229             .await
    230             .unwrap();
    231         info.code
    232     }
    233 
    234     async fn latest_tx(&self, account: &Account) -> TxDto {
    235         self.api
    236             .page_tx(
    237                 Direction::Both,
    238                 Order::Descending,
    239                 1,
    240                 account.iban.bban(),
    241                 &None,
    242                 true,
    243             )
    244             .await
    245             .unwrap()
    246             .list
    247             .pop()
    248             .unwrap()
    249             .tx
    250     }
    251 
    252     /// Send transaction from client to exchange
    253     async fn client_send(&self, subject: &str, amount: u32) -> u64 {
    254         self.send_tx(&self.client, &self.exchange.iban, subject, amount)
    255             .await
    256     }
    257 
    258     /// Send transaction from exchange to client
    259     async fn exchange_send_to(&self, subject: &str, amount: u32, to: &HuIban) -> u64 {
    260         self.send_tx(&self.exchange, to, subject, amount).await
    261     }
    262 
    263     /// Send transaction from exchange to client
    264     async fn exchange_send(&self, subject: &str, amount: u32) -> u64 {
    265         self.exchange_send_to(subject, amount, &self.client.iban)
    266             .await
    267     }
    268 
    269     async fn expect_status(&self, code: u64, status: TxStatus) {
    270         let mut attempts = 0;
    271         loop {
    272             let current = self.api.get_tx(code).await.unwrap().status;
    273             if current == status {
    274                 return;
    275             }
    276             if attempts > 40 {
    277                 assert_eq!(current, status, "{code}");
    278             }
    279             attempts += 1;
    280             tokio::time::sleep(Duration::from_millis(200)).await;
    281         }
    282     }
    283 }
    284 
    285 struct Balances<'a> {
    286     client: &'a Harness<'a>,
    287     exchange_balance: u32,
    288     client_balance: u32,
    289 }
    290 
    291 impl<'a> Balances<'a> {
    292     pub async fn new(client: &'a Harness<'a>) -> Self {
    293         let (exchange_balance, client_balance) = client.balance().await;
    294         Self {
    295             client,
    296             exchange_balance,
    297             client_balance,
    298         }
    299     }
    300 
    301     async fn expect(&mut self, diff: i32) {
    302         self.exchange_balance = (self.exchange_balance as i32 + diff) as u32;
    303         self.client_balance = (self.client_balance as i32 - diff) as u32;
    304         let mut attempts = 0;
    305         loop {
    306             let current = self.client.balance().await;
    307             if current == (self.exchange_balance, self.client_balance) {
    308                 return;
    309             }
    310             if attempts > 40 {
    311                 assert_eq!(
    312                     current,
    313                     (self.exchange_balance, self.client_balance),
    314                     "{current:?} {diff}"
    315                 );
    316             }
    317             attempts += 1;
    318             tokio::time::sleep(Duration::from_millis(200)).await;
    319         }
    320     }
    321 }
    322 
    323 fn step(step: &str) {
    324     println!("{}", step.green());
    325 }
    326 
    327 /// Run logic tests against local Magnet Bank backend
    328 async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> {
    329     step("Run Magnet Bank logic harness tests");
    330 
    331     step("Prepare db");
    332     let db_cfg = parse_db_cfg(cfg)?;
    333     let pool = pool(db_cfg.cfg, "magnet_bank").await?;
    334     let mut db = pool.acquire().await?.detach();
    335     dbinit(&mut db, db_cfg.sql_dir.as_ref(), "magnet-bank", reset).await?;
    336 
    337     let cfg = HarnessCfg::parse(cfg)?;
    338     let keys = setup::load(&cfg.worker)?;
    339     let client = http_client::client();
    340 
    341     let harness = Harness::new(&cfg, &client, &pool, &keys).await;
    342 
    343     step("Warmup");
    344     harness.worker().await?;
    345     tokio::time::sleep(Duration::from_secs(5)).await;
    346     harness.worker().await?;
    347 
    348     let unknown_account =
    349         FullHuPayto::new(HuIban::from_bban("1620000310991642").unwrap(), "Unknown");
    350     let now = Timestamp::now();
    351     let balance = &mut Balances::new(&harness).await;
    352 
    353     step("Test incoming talerable transaction");
    354     // Send talerable transaction
    355     let reserve_pub = EddsaPublicKey::rand();
    356     harness
    357         .client_send(&format!("Taler {reserve_pub}"), 33)
    358         .await;
    359     // Wait for transaction to finalize
    360     balance.expect(33).await;
    361     // Sync and register
    362     harness.worker().await?;
    363     harness.expect_incoming(reserve_pub).await;
    364 
    365     step("Test incoming malformed transaction");
    366     // Send malformed transaction
    367     harness
    368         .client_send(&format!("Malformed test {now}"), 34)
    369         .await;
    370     // Wait for transaction to finalize
    371     balance.expect(34).await;
    372     // Sync and bounce
    373     harness.worker().await?;
    374     // Wait for bounce to finalize
    375     balance.expect(-34).await;
    376     harness.worker().await?;
    377 
    378     step("Test transfer transactions");
    379     // Init a transfer to client
    380     let transfer_id = harness
    381         .custom_transfer(102, FullHuPayto::new(harness.client.iban.clone(), "Client"))
    382         .await;
    383     // Should send
    384     harness.worker().await?;
    385     // Check transfer is still pending
    386     harness
    387         .expect_transfer_status(transfer_id, TransferState::pending, None)
    388         .await;
    389     // Wait for transaction to finalize
    390     balance.expect(-102).await;
    391     // Should register
    392     harness.worker().await?;
    393     // Check transfer is now successful
    394     harness
    395         .expect_transfer_status(transfer_id, TransferState::success, None)
    396         .await;
    397 
    398     step("Test transfer to self");
    399     // Init a transfer to self
    400     let transfer_id = harness
    401         .custom_transfer(101, FullHuPayto::new(harness.exchange.iban.clone(), "Self"))
    402         .await;
    403     // Should failed
    404     harness.worker().await?;
    405     // Check transfer failed
    406     harness
    407         .expect_transfer_status(
    408             transfer_id,
    409             TransferState::permanent_failure,
    410             Some("409 FORRAS_SZAMLA_ESZAMLA_EGYEZIK 'A forrás és az ellenszámla egyezik!'"),
    411         )
    412         .await;
    413 
    414     step("Test transfer to unknown account");
    415     let transfer_id = harness.custom_transfer(103, unknown_account.clone()).await;
    416     harness.worker().await?;
    417     harness
    418         .expect_transfer_status(transfer_id, TransferState::pending, None)
    419         .await;
    420     balance.expect(0).await;
    421     harness.worker().await?;
    422     harness
    423         .expect_transfer_status(transfer_id, TransferState::permanent_failure, None)
    424         .await;
    425 
    426     step("Test unexpected outgoing");
    427     // Manual tx from the exchange
    428     harness
    429         .exchange_send(&format!("What is this ? {now}"), 4)
    430         .await;
    431     harness.worker().await?;
    432     // Wait for transaction to finalize
    433     balance.expect(-4).await;
    434     harness.worker().await?;
    435 
    436     step("Test transfer failure init-tx");
    437     harness.transfer(10).await;
    438     set_failure_scenario(&["init-tx"]);
    439     assert!(matches!(
    440         harness.worker().await,
    441         Err(WorkerError::Injected(InjectedErr("init-tx")))
    442     ));
    443     harness.worker().await?;
    444     balance.expect(-10).await;
    445     harness.worker().await?;
    446 
    447     step("Test transfer failure submit-tx");
    448     harness.transfer(11).await;
    449     set_failure_scenario(&["submit-tx"]);
    450     assert!(matches!(
    451         harness.worker().await,
    452         Err(WorkerError::Injected(InjectedErr("submit-tx")))
    453     ));
    454     harness.worker().await?;
    455     balance.expect(-11).await;
    456     harness.worker().await?;
    457 
    458     step("Test transfer all failures");
    459     harness.transfer(13).await;
    460     set_failure_scenario(&["init-tx", "submit-tx"]);
    461     assert!(matches!(
    462         harness.worker().await,
    463         Err(WorkerError::Injected(InjectedErr("init-tx")))
    464     ));
    465     assert!(matches!(
    466         harness.worker().await,
    467         Err(WorkerError::Injected(InjectedErr("submit-tx")))
    468     ));
    469     harness.worker().await?;
    470     balance.expect(-13).await;
    471     harness.worker().await?;
    472 
    473     step("Test recover successful bounces");
    474     let code = harness
    475         .client_send(&format!("will be bounced {now}"), 2)
    476         .await;
    477     balance.expect(2).await;
    478     harness
    479         .exchange_send(&format!("bounced: {}", code + 1), 2)
    480         .await;
    481     balance.expect(-2).await;
    482     harness.worker().await?;
    483 
    484     step("Test recover failed bounces");
    485     // Send malformed transaction
    486     harness
    487         .client_send(&format!("will be failed bounced {now}"), 3)
    488         .await;
    489     // Wait for it to be received because rejected transaction take too much time to appear in the transactions log
    490     balance.expect(3).await;
    491     // Bounce it manually
    492     let received = harness.latest_tx(&harness.exchange).await;
    493     let bounce_code = harness
    494         .exchange_send_to(
    495             &format!("bounce manually: {}", received.code),
    496             3,
    497             &unknown_account,
    498         )
    499         .await;
    500     harness.expect_status(bounce_code, TxStatus::Rejected).await;
    501     // Should not bounce and catch the failure
    502     harness.worker().await?;
    503     // Wait for it to be bounce regardless because rejected transaction take too much time to appear in the transactions log
    504     // TODO fix this
    505     balance.expect(-3).await;
    506 
    507     step("Finish");
    508     tokio::time::sleep(Duration::from_secs(5)).await;
    509     harness.worker().await?;
    510     balance.expect(0).await;
    511     Ok(())
    512 }
    513 
    514 /// Run online tests against real Magnet Bank backend
    515 async fn online_harness(config: &Config, reset: bool) -> anyhow::Result<()> {
    516     step("Run Magnet Bank online harness tests");
    517 
    518     step("Prepare db");
    519     let db_cfg = parse_db_cfg(config)?;
    520     let pool = pool(db_cfg.cfg, "magnet_bank").await?;
    521     let mut db = pool.acquire().await?.detach();
    522     dbinit(&mut db, db_cfg.sql_dir.as_ref(), "magnet-bank", reset).await?;
    523 
    524     let cfg = HarnessCfg::parse(config)?;
    525     let keys = setup::load(&cfg.worker)?;
    526     let client = http_client::client();
    527 
    528     let harness = Harness::new(&cfg, &client, &pool, &keys).await;
    529 
    530     step("Warmup worker");
    531     let _worker_task = {
    532         let client = client.clone();
    533         let pool = pool.clone();
    534         let config = config.clone();
    535         tokio::spawn(async move { run_worker(&config, &pool, &client, false).await })
    536     };
    537     tokio::time::sleep(Duration::from_secs(25)).await;
    538 
    539     let now = Timestamp::now();
    540     let balance = &mut Balances::new(&harness).await;
    541 
    542     step("Test incoming transactions");
    543     let reserve_pub = EddsaPublicKey::rand();
    544     harness
    545         .client_send(&format!("Taler {reserve_pub}"), 3)
    546         .await;
    547     harness
    548         .client_send(&format!("Malformed test {now}"), 4)
    549         .await;
    550     balance.expect(3).await;
    551     harness.expect_incoming(reserve_pub).await;
    552 
    553     step("Test outgoing transactions");
    554     let transfer_self = harness
    555         .custom_transfer(1, FullHuPayto::new(harness.exchange.iban.clone(), "Self"))
    556         .await;
    557     let transfer_id = harness
    558         .custom_transfer(2, FullHuPayto::new(harness.client.iban.clone(), "Client"))
    559         .await;
    560     balance.expect(-2).await;
    561     harness
    562         .expect_transfer_status(
    563             transfer_self,
    564             TransferState::permanent_failure,
    565             Some("409 FORRAS_SZAMLA_ESZAMLA_EGYEZIK 'A forrás és az ellenszámla egyezik!'"),
    566         )
    567         .await;
    568     harness
    569         .expect_transfer_status(transfer_id, TransferState::success, None)
    570         .await;
    571 
    572     step("Finish");
    573     tokio::time::sleep(Duration::from_secs(5)).await;
    574     balance.expect(0).await;
    575 
    576     Ok(())
    577 }
    578 
    579 fn main() {
    580     let args = Args::parse();
    581     taler_main(CONFIG_SOURCE, args.common, async |cfg| match args.cmd {
    582         Command::Logic { reset } => logic_harness(&cfg, reset).await,
    583         Command::Online { reset } => online_harness(&cfg, reset).await,
    584     });
    585 }