taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

magnet-bank-harness.rs (18068B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::{fmt::Debug, time::Duration};
     18 
     19 use aws_lc_rs::signature::EcdsaKeyPair;
     20 use clap::Parser as _;
     21 use failure_injection::{InjectedErr, set_failure_scenario};
     22 use jiff::{Timestamp, Zoned};
     23 use owo_colors::OwoColorize;
     24 use sqlx::PgPool;
     25 use taler_api::notification::dummy_listen;
     26 use taler_build::long_version;
     27 use taler_common::{
     28     CommonArgs,
     29     api_common::{EddsaPublicKey, HashCode, ShortHashCode},
     30     api_params::{History, Page},
     31     api_wire::{IncomingBankTransaction, TransferState},
     32     config::Config,
     33     db::{dbinit, pool},
     34     taler_main,
     35     types::{amount::decimal, url},
     36 };
     37 use taler_magnet_bank::{
     38     FullHuPayto, HuIban,
     39     config::{AccountType, HarnessCfg, parse_db_cfg},
     40     constants::CONFIG_SOURCE,
     41     db::{self, TransferResult},
     42     magnet_api::{
     43         client::{ApiClient, AuthClient},
     44         types::{Account, Direction, Order, TxDto, TxStatus},
     45     },
     46     setup::{self, Keys},
     47     worker::{Worker, WorkerError, WorkerResult, run_worker},
     48 };
     49 
     50 // TODO macro for retry/expect logic
     51 
     52 /// Taler Magnet Bank Adapter harness test suite
     53 #[derive(clap::Parser, Debug)]
     54 #[command(long_version = long_version(), about, long_about = None)]
     55 struct Args {
     56     #[clap(flatten)]
     57     common: CommonArgs,
     58 
     59     #[command(subcommand)]
     60     cmd: Command,
     61 }
     62 
     63 #[derive(clap::Subcommand, Debug)]
     64 enum Command {
     65     /// Run logic tests
     66     Logic {
     67         #[clap(long, short)]
     68         reset: bool,
     69     },
     70     /// Run online tests
     71     Online {
     72         #[clap(long, short)]
     73         reset: bool,
     74     },
     75 }
     76 
     77 /// Custom client for harness actions
     78 struct Harness<'a> {
     79     cfg: &'a HarnessCfg,
     80     pool: &'a PgPool,
     81     api: ApiClient<'a>,
     82     exchange: Account,
     83     client: Account,
     84     signing_key: &'a EcdsaKeyPair,
     85 }
     86 
     87 impl<'a> Harness<'a> {
     88     async fn new(
     89         cfg: &'a HarnessCfg,
     90         client: &'a http_client::Client,
     91         pool: &'a PgPool,
     92         keys: &'a Keys,
     93     ) -> Self {
     94         let api = AuthClient::new(client, &cfg.worker.api_url, &cfg.worker.consumer)
     95             .upgrade(&keys.access_token);
     96         let (exchange, client) = tokio::try_join!(
     97             api.account(cfg.worker.payto.bban()),
     98             api.account(cfg.client_payto.bban())
     99         )
    100         .unwrap();
    101         Self {
    102             cfg,
    103             pool,
    104             api,
    105             exchange,
    106             client,
    107             signing_key: &keys.signing_key,
    108         }
    109     }
    110 
    111     async fn worker(&'a self) -> WorkerResult {
    112         let db = &mut self.pool.acquire().await.unwrap().detach();
    113         Worker {
    114             client: &self.api,
    115             db,
    116             account_number: &self.exchange.number,
    117             account_code: self.exchange.code,
    118             key: self.signing_key,
    119             account_type: AccountType::Exchange,
    120             ignore_tx_before: self.cfg.worker.ignore_tx_before,
    121             ignore_bounces_before: self.cfg.worker.ignore_bounces_before,
    122         }
    123         .run()
    124         .await
    125     }
    126 
    127     async fn balance(&self) -> (u32, u32) {
    128         let (exchange_balance, client_balance) = tokio::try_join!(
    129             self.api.balance_mini(self.exchange.iban.bban()),
    130             self.api.balance_mini(self.client.iban.bban())
    131         )
    132         .unwrap();
    133         (
    134             exchange_balance.balance as u32,
    135             client_balance.balance as u32,
    136         )
    137     }
    138 
    139     async fn custom_transfer(&self, forint: u32, creditor: FullHuPayto) -> u64 {
    140         let res = db::make_transfer(
    141             self.pool,
    142             &db::Transfer {
    143                 request_uid: HashCode::rand(),
    144                 amount: decimal(format!("{forint}")),
    145                 exchange_base_url: url("https://test.com"),
    146                 wtid: ShortHashCode::rand(),
    147                 creditor,
    148             },
    149             &Timestamp::now(),
    150         )
    151         .await
    152         .unwrap();
    153         match res {
    154             TransferResult::Success { id, .. } => id,
    155             TransferResult::RequestUidReuse | TransferResult::WtidReuse => unreachable!(),
    156         }
    157     }
    158 
    159     async fn transfer(&self, forint: u32) -> u64 {
    160         self.custom_transfer(forint, FullHuPayto::new(self.client.iban.clone(), "Name"))
    161             .await
    162     }
    163 
    164     async fn expect_transfer_status(&self, id: u64, status: TransferState, msg: Option<&str>) {
    165         let mut attempts = 0;
    166         loop {
    167             let transfer = db::transfer_by_id(self.pool, id).await.unwrap().unwrap();
    168             if (transfer.status, transfer.status_msg.as_deref()) == (status, msg) {
    169                 return;
    170             }
    171             if attempts > 40 {
    172                 assert_eq!(
    173                     (transfer.status, transfer.status_msg.as_deref()),
    174                     (status, msg)
    175                 );
    176             }
    177             attempts += 1;
    178             tokio::time::sleep(Duration::from_millis(200)).await;
    179         }
    180     }
    181 
    182     async fn expect_incoming(&self, key: EddsaPublicKey) {
    183         let transfer = db::incoming_history(
    184             self.pool,
    185             &History {
    186                 page: Page {
    187                     limit: -1,
    188                     offset: None,
    189                 },
    190                 timeout_ms: None,
    191             },
    192             dummy_listen,
    193         )
    194         .await
    195         .unwrap();
    196         assert!(matches!(
    197             transfer.first().unwrap(),
    198             IncomingBankTransaction::Reserve { reserve_pub, .. } if *reserve_pub == key,
    199         ));
    200     }
    201 
    202     /// Send a transaction between two magnet accounts
    203     async fn send_tx(&self, from: &Account, to: &HuIban, subject: &str, amount: u32) -> u64 {
    204         let now = Zoned::now();
    205         let info = self
    206             .api
    207             .init_tx(
    208                 from.code,
    209                 amount as f64,
    210                 subject,
    211                 &now.date(),
    212                 "Name",
    213                 to.bban(),
    214             )
    215             .await
    216             .unwrap();
    217         self.api
    218             .submit_tx(
    219                 self.signing_key,
    220                 &from.number,
    221                 info.code,
    222                 info.amount,
    223                 &now.date(),
    224                 to.bban(),
    225             )
    226             .await
    227             .unwrap();
    228         info.code
    229     }
    230 
    231     async fn latest_tx(&self, account: &Account) -> TxDto {
    232         self.api
    233             .page_tx(
    234                 Direction::Both,
    235                 Order::Descending,
    236                 1,
    237                 account.iban.bban(),
    238                 &None,
    239                 true,
    240             )
    241             .await
    242             .unwrap()
    243             .list
    244             .pop()
    245             .unwrap()
    246             .tx
    247     }
    248 
    249     /// Send transaction from client to exchange
    250     async fn client_send(&self, subject: &str, amount: u32) -> u64 {
    251         self.send_tx(&self.client, &self.exchange.iban, subject, amount)
    252             .await
    253     }
    254 
    255     /// Send transaction from exchange to client
    256     async fn exchange_send_to(&self, subject: &str, amount: u32, to: &HuIban) -> u64 {
    257         self.send_tx(&self.exchange, to, subject, amount).await
    258     }
    259 
    260     /// Send transaction from exchange to client
    261     async fn exchange_send(&self, subject: &str, amount: u32) -> u64 {
    262         self.exchange_send_to(subject, amount, &self.client.iban)
    263             .await
    264     }
    265 
    266     async fn expect_status(&self, code: u64, status: TxStatus) {
    267         let mut attempts = 0;
    268         loop {
    269             let current = self.api.get_tx(code).await.unwrap().status;
    270             if current == status {
    271                 return;
    272             }
    273             if attempts > 40 {
    274                 assert_eq!(current, status, "{code}");
    275             }
    276             attempts += 1;
    277             tokio::time::sleep(Duration::from_millis(200)).await;
    278         }
    279     }
    280 }
    281 
    282 struct Balances<'a> {
    283     client: &'a Harness<'a>,
    284     exchange_balance: u32,
    285     client_balance: u32,
    286 }
    287 
    288 impl<'a> Balances<'a> {
    289     pub async fn new(client: &'a Harness<'a>) -> Self {
    290         let (exchange_balance, client_balance) = client.balance().await;
    291         Self {
    292             client,
    293             exchange_balance,
    294             client_balance,
    295         }
    296     }
    297 
    298     async fn expect(&mut self, diff: i32) {
    299         self.exchange_balance = (self.exchange_balance as i32 + diff) as u32;
    300         self.client_balance = (self.client_balance as i32 - diff) as u32;
    301         let mut attempts = 0;
    302         loop {
    303             let current = self.client.balance().await;
    304             if current == (self.exchange_balance, self.client_balance) {
    305                 return;
    306             }
    307             if attempts > 40 {
    308                 assert_eq!(
    309                     current,
    310                     (self.exchange_balance, self.client_balance),
    311                     "{current:?} {diff}"
    312                 );
    313             }
    314             attempts += 1;
    315             tokio::time::sleep(Duration::from_millis(200)).await;
    316         }
    317     }
    318 }
    319 
    320 fn step(step: &str) {
    321     println!("{}", step.green());
    322 }
    323 
    324 /// Run logic tests against local Magnet Bank backend
    325 async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> {
    326     step("Run Magnet Bank logic harness tests");
    327 
    328     step("Prepare db");
    329     let db_cfg = parse_db_cfg(cfg)?;
    330     let pool = pool(db_cfg.cfg, "magnet_bank").await?;
    331     let mut db = pool.acquire().await?.detach();
    332     dbinit(&mut db, db_cfg.sql_dir.as_ref(), "magnet-bank", reset).await?;
    333 
    334     let cfg = HarnessCfg::parse(cfg)?;
    335     let keys = setup::load(&cfg.worker)?;
    336     let client = http_client::client()?;
    337 
    338     let harness = Harness::new(&cfg, &client, &pool, &keys).await;
    339 
    340     step("Warmup");
    341     harness.worker().await?;
    342     tokio::time::sleep(Duration::from_secs(5)).await;
    343     harness.worker().await?;
    344 
    345     let unknown_account =
    346         FullHuPayto::new(HuIban::from_bban("1620000310991642").unwrap(), "Unknown");
    347     let now = Timestamp::now();
    348     let balance = &mut Balances::new(&harness).await;
    349 
    350     step("Test incoming talerable transaction");
    351     // Send talerable transaction
    352     let reserve_pub = EddsaPublicKey::rand();
    353     harness
    354         .client_send(&format!("Taler {reserve_pub}"), 33)
    355         .await;
    356     // Wait for transaction to finalize
    357     balance.expect(33).await;
    358     // Sync and register
    359     harness.worker().await?;
    360     harness.expect_incoming(reserve_pub).await;
    361 
    362     step("Test incoming malformed transaction");
    363     // Send malformed transaction
    364     harness
    365         .client_send(&format!("Malformed test {now}"), 34)
    366         .await;
    367     // Wait for transaction to finalize
    368     balance.expect(34).await;
    369     // Sync and bounce
    370     harness.worker().await?;
    371     // Wait for bounce to finalize
    372     balance.expect(-34).await;
    373     harness.worker().await?;
    374 
    375     step("Test transfer transactions");
    376     // Init a transfer to client
    377     let transfer_id = harness
    378         .custom_transfer(102, FullHuPayto::new(harness.client.iban.clone(), "Client"))
    379         .await;
    380     // Should send
    381     harness.worker().await?;
    382     // Check transfer is still pending
    383     harness
    384         .expect_transfer_status(transfer_id, TransferState::pending, None)
    385         .await;
    386     // Wait for transaction to finalize
    387     balance.expect(-102).await;
    388     // Should register
    389     harness.worker().await?;
    390     // Check transfer is now successful
    391     harness
    392         .expect_transfer_status(transfer_id, TransferState::success, None)
    393         .await;
    394 
    395     step("Test transfer to self");
    396     // Init a transfer to self
    397     let transfer_id = harness
    398         .custom_transfer(101, FullHuPayto::new(harness.exchange.iban.clone(), "Self"))
    399         .await;
    400     // Should failed
    401     harness.worker().await?;
    402     // Check transfer failed
    403     harness
    404         .expect_transfer_status(
    405             transfer_id,
    406             TransferState::permanent_failure,
    407             Some("409 FORRAS_SZAMLA_ESZAMLA_EGYEZIK 'A forrás és az ellenszámla egyezik!'"),
    408         )
    409         .await;
    410 
    411     step("Test transfer to unknown account");
    412     let transfer_id = harness.custom_transfer(103, unknown_account.clone()).await;
    413     harness.worker().await?;
    414     harness
    415         .expect_transfer_status(transfer_id, TransferState::pending, None)
    416         .await;
    417     balance.expect(0).await;
    418     harness.worker().await?;
    419     harness
    420         .expect_transfer_status(transfer_id, TransferState::permanent_failure, None)
    421         .await;
    422 
    423     step("Test unexpected outgoing");
    424     // Manual tx from the exchange
    425     harness
    426         .exchange_send(&format!("What is this ? {now}"), 4)
    427         .await;
    428     harness.worker().await?;
    429     // Wait for transaction to finalize
    430     balance.expect(-4).await;
    431     harness.worker().await?;
    432 
    433     step("Test transfer failure init-tx");
    434     harness.transfer(10).await;
    435     set_failure_scenario(&["init-tx"]);
    436     assert!(matches!(
    437         harness.worker().await,
    438         Err(WorkerError::Injected(InjectedErr("init-tx")))
    439     ));
    440     harness.worker().await?;
    441     balance.expect(-10).await;
    442     harness.worker().await?;
    443 
    444     step("Test transfer failure submit-tx");
    445     harness.transfer(11).await;
    446     set_failure_scenario(&["submit-tx"]);
    447     assert!(matches!(
    448         harness.worker().await,
    449         Err(WorkerError::Injected(InjectedErr("submit-tx")))
    450     ));
    451     harness.worker().await?;
    452     balance.expect(-11).await;
    453     harness.worker().await?;
    454 
    455     step("Test transfer all failures");
    456     harness.transfer(13).await;
    457     set_failure_scenario(&["init-tx", "submit-tx"]);
    458     assert!(matches!(
    459         harness.worker().await,
    460         Err(WorkerError::Injected(InjectedErr("init-tx")))
    461     ));
    462     assert!(matches!(
    463         harness.worker().await,
    464         Err(WorkerError::Injected(InjectedErr("submit-tx")))
    465     ));
    466     harness.worker().await?;
    467     balance.expect(-13).await;
    468     harness.worker().await?;
    469 
    470     step("Test recover successful bounces");
    471     let code = harness
    472         .client_send(&format!("will be bounced {now}"), 2)
    473         .await;
    474     balance.expect(2).await;
    475     harness
    476         .exchange_send(&format!("bounced: {}", code + 1), 2)
    477         .await;
    478     balance.expect(-2).await;
    479     harness.worker().await?;
    480 
    481     step("Test recover failed bounces");
    482     // Send malformed transaction
    483     harness
    484         .client_send(&format!("will be failed bounced {now}"), 3)
    485         .await;
    486     // Wait for it to be received because rejected transaction take too much time to appear in the transactions log
    487     balance.expect(3).await;
    488     // Bounce it manually
    489     let received = harness.latest_tx(&harness.exchange).await;
    490     let bounce_code = harness
    491         .exchange_send_to(
    492             &format!("bounce manually: {}", received.code),
    493             3,
    494             &unknown_account,
    495         )
    496         .await;
    497     harness.expect_status(bounce_code, TxStatus::Rejected).await;
    498     // Should not bounce and catch the failure
    499     harness.worker().await?;
    500     // Wait for it to be bounce regardless because rejected transaction take too much time to appear in the transactions log
    501     // TODO fix this
    502     balance.expect(-3).await;
    503 
    504     step("Finish");
    505     tokio::time::sleep(Duration::from_secs(5)).await;
    506     harness.worker().await?;
    507     balance.expect(0).await;
    508     Ok(())
    509 }
    510 
    511 /// Run online tests against real Magnet Bank backend
    512 async fn online_harness(config: &Config, reset: bool) -> anyhow::Result<()> {
    513     step("Run Magnet Bank online harness tests");
    514 
    515     step("Prepare db");
    516     let db_cfg = parse_db_cfg(config)?;
    517     let pool = pool(db_cfg.cfg, "magnet_bank").await?;
    518     let mut db = pool.acquire().await?.detach();
    519     dbinit(&mut db, db_cfg.sql_dir.as_ref(), "magnet-bank", reset).await?;
    520 
    521     let cfg = HarnessCfg::parse(config)?;
    522     let keys = setup::load(&cfg.worker)?;
    523     let client = http_client::client()?;
    524 
    525     let harness = Harness::new(&cfg, &client, &pool, &keys).await;
    526 
    527     step("Warmup worker");
    528     let _worker_task = {
    529         let client = client.clone();
    530         let pool = pool.clone();
    531         let config = config.clone();
    532         tokio::spawn(async move { run_worker(&config, &pool, &client, false).await })
    533     };
    534     tokio::time::sleep(Duration::from_secs(25)).await;
    535 
    536     let now = Timestamp::now();
    537     let balance = &mut Balances::new(&harness).await;
    538 
    539     step("Test incoming transactions");
    540     let reserve_pub = EddsaPublicKey::rand();
    541     harness
    542         .client_send(&format!("Taler {reserve_pub}"), 3)
    543         .await;
    544     harness
    545         .client_send(&format!("Malformed test {now}"), 4)
    546         .await;
    547     balance.expect(3).await;
    548     harness.expect_incoming(reserve_pub).await;
    549 
    550     step("Test outgoing transactions");
    551     let transfer_self = harness
    552         .custom_transfer(1, FullHuPayto::new(harness.exchange.iban.clone(), "Self"))
    553         .await;
    554     let transfer_id = harness
    555         .custom_transfer(2, FullHuPayto::new(harness.client.iban.clone(), "Client"))
    556         .await;
    557     balance.expect(-2).await;
    558     harness
    559         .expect_transfer_status(
    560             transfer_self,
    561             TransferState::permanent_failure,
    562             Some("409 FORRAS_SZAMLA_ESZAMLA_EGYEZIK 'A forrás és az ellenszámla egyezik!'"),
    563         )
    564         .await;
    565     harness
    566         .expect_transfer_status(transfer_id, TransferState::success, None)
    567         .await;
    568 
    569     step("Finish");
    570     tokio::time::sleep(Duration::from_secs(5)).await;
    571     balance.expect(0).await;
    572 
    573     Ok(())
    574 }
    575 
    576 fn main() {
    577     let args = Args::parse();
    578     taler_main(CONFIG_SOURCE, args.common, |cfg| async move {
    579         match args.cmd {
    580             Command::Logic { reset } => logic_harness(&cfg, reset).await,
    581             Command::Online { reset } => online_harness(&cfg, reset).await,
    582         }
    583     });
    584 }