cyclos-harness.rs (19124B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::time::Duration; 18 19 use clap::Parser as _; 20 use compact_str::CompactString; 21 use failure_injection::{InjectedErr, set_failure_scenario}; 22 use jiff::Timestamp; 23 use owo_colors::OwoColorize as _; 24 use sqlx::{PgPool, Row as _, postgres::PgRow}; 25 use taler_api::notification::dummy_listen; 26 use taler_build::long_version; 27 use taler_common::{ 28 CommonArgs, 29 api_common::{EddsaPublicKey, HashCode, ShortHashCode}, 30 api_params::{History, Page}, 31 api_wire::{IncomingBankTransaction, TransferState}, 32 config::Config, 33 taler_main, 34 types::{ 35 amount::{Currency, Decimal, decimal}, 36 url, 37 }, 38 }; 39 use taler_cyclos::{ 40 config::{AccountType, HarnessCfg}, 41 constants::CONFIG_SOURCE, 42 cyclos_api::{ 43 api::CyclosAuth, 44 client::Client, 45 types::{HistoryItem, OrderBy}, 46 }, 47 db::{self, TransferResult, dbinit}, 48 payto::FullCyclosPayto, 49 setup, 50 worker::{Worker, WorkerError, WorkerResult, run_worker}, 51 }; 52 53 /// Cyclos Adapter harness test suite 54 #[derive(clap::Parser, Debug)] 55 #[command(long_version = long_version(), about, long_about = None)] 56 struct Args { 57 #[clap(flatten)] 58 common: CommonArgs, 59 60 #[command(subcommand)] 61 cmd: Command, 62 } 63 64 #[derive(clap::Subcommand, Debug)] 65 enum Command { 66 /// Run logic tests 67 Logic { 68 #[clap(long, short)] 69 reset: bool, 70 }, 71 /// Run online tests 72 Online { 73 #[clap(long, short)] 74 reset: bool, 75 }, 76 } 77 78 fn step(step: &str) { 79 println!("{}", step.green()); 80 } 81 82 struct Harness<'a> { 83 pool: &'a PgPool, 84 client: Client<'a>, 85 wire: Client<'a>, 86 client_payto: FullCyclosPayto, 87 wire_payto: FullCyclosPayto, 88 payment_type_id: i64, 89 account_type_id: i64, 90 currency: Currency, 91 root: CompactString, 92 } 93 94 impl<'a> Harness<'a> { 95 async fn balance(&self) -> (Decimal, Decimal) { 96 let (exchange, client) = 97 tokio::try_join!(self.wire.accounts(), self.client.accounts()).unwrap(); 98 ( 99 exchange[0] 100 .status 101 .available_balance 102 .unwrap_or(exchange[0].status.balance), 103 client[0] 104 .status 105 .available_balance 106 .unwrap_or(client[0].status.balance), 107 ) 108 } 109 110 /// Send transaction from client to exchange 111 async fn client_send(&self, subject: &str, amount: Decimal) -> i64 { 112 *self 113 .client 114 .direct_payment(*self.wire_payto.id, self.payment_type_id, amount, subject) 115 .await 116 .unwrap() 117 .id 118 } 119 120 /// Send transaction from exchange to client 121 async fn exchange_send(&self, subject: &str, amount: Decimal) -> i64 { 122 *self 123 .wire 124 .direct_payment(*self.client_payto.id, self.payment_type_id, amount, subject) 125 .await 126 .unwrap() 127 .id 128 } 129 130 /// Chargeback a transfer 131 async fn chargeback(&self, id: i64) -> i64 { 132 self.client.chargeback(id).await.unwrap() 133 } 134 135 /// Fetch last transfer related to client 136 async fn client_last_transfer(&self) -> HistoryItem { 137 self.client 138 .history(*self.client_payto.id, OrderBy::DateDesc, 0, None) 139 .await 140 .unwrap() 141 .page 142 .remove(0) 143 } 144 145 /// Run the worker once 146 async fn worker(&'a self) -> WorkerResult { 147 let db = &mut self.pool.acquire().await.unwrap().detach(); 148 Worker { 149 db, 150 currency: self.currency.clone(), 151 client: &self.wire, 152 account_type: AccountType::Exchange, 153 account_type_id: self.account_type_id, 154 payment_type_id: self.payment_type_id, 155 } 156 .run() 157 .await 158 } 159 160 async fn expect_incoming(&self, key: EddsaPublicKey) { 161 let transfer = db::incoming_history( 162 self.pool, 163 &History { 164 page: Page { 165 limit: -1, 166 offset: None, 167 }, 168 timeout_ms: None, 169 }, 170 &self.currency, 171 &self.root, 172 dummy_listen, 173 ) 174 .await 175 .unwrap(); 176 assert!(matches!( 177 transfer.first().unwrap(), 178 IncomingBankTransaction::Reserve { reserve_pub, .. } if *reserve_pub == key, 179 )); 180 } 181 182 async fn custom_transfer(&self, amount: Decimal, creditor_id: i64, creditor_name: &str) -> u64 { 183 let res = db::make_transfer( 184 self.pool, 185 &db::Transfer { 186 request_uid: HashCode::rand(), 187 amount, 188 exchange_base_url: url("https://test.com"), 189 wtid: ShortHashCode::rand(), 190 creditor_id, 191 creditor_name: CompactString::new(creditor_name), 192 }, 193 &Timestamp::now(), 194 ) 195 .await 196 .unwrap(); 197 match res { 198 TransferResult::Success { id, .. } => id, 199 TransferResult::RequestUidReuse | TransferResult::WtidReuse => unreachable!(), 200 } 201 } 202 203 async fn transfer(&self, amount: Decimal) -> u64 { 204 self.custom_transfer(amount, *self.client_payto.id, &self.client_payto.name) 205 .await 206 } 207 208 async fn transfer_id(&self, transfer_id: u64) -> i64 { 209 sqlx::query( 210 "SELECT transfer_id 211 FROM transfer 212 JOIN initiated USING (initiated_id) 213 JOIN tx_out USING (tx_out_id) 214 WHERE initiated_id=$1", 215 ) 216 .bind(transfer_id as i64) 217 .try_map(|r: PgRow| r.try_get(0)) 218 .fetch_one(self.pool) 219 .await 220 .unwrap() 221 } 222 223 async fn expect_transfer_status(&self, id: u64, status: TransferState, msg: Option<&str>) { 224 let mut attempts = 0; 225 loop { 226 let transfer = db::transfer_by_id(self.pool, id, &self.currency, &self.root) 227 .await 228 .unwrap() 229 .unwrap(); 230 if (transfer.status, transfer.status_msg.as_deref()) == (status, msg) { 231 return; 232 } 233 if attempts > 40 { 234 assert_eq!( 235 (transfer.status, transfer.status_msg.as_deref()), 236 (status, msg) 237 ); 238 } 239 attempts += 1; 240 tokio::time::sleep(Duration::from_millis(200)).await; 241 } 242 } 243 } 244 245 struct Balances<'a> { 246 client: &'a Harness<'a>, 247 exchange_balance: Decimal, 248 client_balance: Decimal, 249 } 250 251 impl<'a> Balances<'a> { 252 pub async fn new(client: &'a Harness<'a>) -> Self { 253 let (exchange_balance, client_balance) = client.balance().await; 254 Self { 255 client, 256 exchange_balance, 257 client_balance, 258 } 259 } 260 261 async fn expect_add(&mut self, diff: Decimal) { 262 self.exchange_balance = self.exchange_balance.try_add(&diff).unwrap(); 263 self.client_balance = self.client_balance.try_sub(&diff).unwrap(); 264 let mut attempts = 0; 265 loop { 266 let current = self.client.balance().await; 267 if current == (self.exchange_balance, self.client_balance) { 268 return; 269 } 270 if attempts > 40 { 271 assert_eq!( 272 current, 273 (self.exchange_balance, self.client_balance), 274 "({} {}) +{diff}", 275 current.0, 276 current.1 277 ); 278 } 279 attempts += 1; 280 tokio::time::sleep(Duration::from_millis(200)).await; 281 } 282 } 283 284 async fn expect_sub(&mut self, diff: Decimal) { 285 self.exchange_balance = self.exchange_balance.try_sub(&diff).unwrap(); 286 self.client_balance = self.client_balance.try_add(&diff).unwrap(); 287 288 let mut attempts = 0; 289 loop { 290 let current = self.client.balance().await; 291 if current == (self.exchange_balance, self.client_balance) { 292 return; 293 } 294 if attempts > 40 { 295 assert_eq!( 296 current, 297 (self.exchange_balance, self.client_balance), 298 "({} {}) -{diff}", 299 current.0, 300 current.1 301 ); 302 } 303 attempts += 1; 304 tokio::time::sleep(Duration::from_millis(200)).await; 305 } 306 } 307 } 308 309 /// Run logic tests against local Cyclos backend 310 async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> { 311 step("Run Cyclos logic harness tests"); 312 313 step("Prepare db"); 314 let pool = dbinit(cfg, reset).await?; 315 316 let client = http_client::client()?; 317 setup::setup(cfg, reset, &client).await?; 318 let cfg = HarnessCfg::parse(cfg)?; 319 let wire = Client { 320 client: &client, 321 api_url: &cfg.worker.host.api_url, 322 auth: &CyclosAuth::Basic { 323 username: cfg.worker.host.username, 324 password: cfg.worker.host.password, 325 }, 326 }; 327 let client = Client { 328 client: &client, 329 api_url: &cfg.worker.host.api_url, 330 auth: &CyclosAuth::Basic { 331 username: cfg.username, 332 password: cfg.password, 333 }, 334 }; 335 let harness = Harness { 336 pool: &pool, 337 client_payto: client 338 .whoami() 339 .await 340 .unwrap() 341 .payto(cfg.worker.root.clone()), 342 wire_payto: wire.whoami().await.unwrap().payto(cfg.worker.root.clone()), 343 client, 344 wire, 345 currency: cfg.worker.currency, 346 root: cfg.worker.root, 347 payment_type_id: *cfg.worker.payment_type_id, 348 account_type_id: *cfg.worker.account_type_id, 349 }; 350 351 step("Warmup"); 352 harness.worker().await.unwrap(); 353 let now = Timestamp::now(); 354 let balance = &mut Balances::new(&harness).await; 355 356 step("Test incoming talerable transaction"); 357 // Send talerable transaction 358 let reserve_pub = EddsaPublicKey::rand(); 359 let amount = decimal("3.3"); 360 harness 361 .client_send(&format!("Taler {reserve_pub}"), amount) 362 .await; 363 // Sync and register 364 harness.worker().await?; 365 harness.expect_incoming(reserve_pub).await; 366 balance.expect_add(amount).await; 367 368 step("Test incoming malformed transaction"); 369 // Send malformed transaction 370 let amount = decimal("3.4"); 371 harness 372 .client_send(&format!("Malformed test {now}"), amount) 373 .await; 374 balance.expect_add(amount).await; 375 // Sync and bounce 376 harness.worker().await?; 377 balance.expect_sub(amount).await; 378 379 step("Test transfer transactions"); 380 let amount = decimal("3.5"); 381 // Init a transfer to client 382 let transfer_id = harness.transfer(amount).await; 383 // Check transfer pending 384 harness 385 .expect_transfer_status(transfer_id, TransferState::pending, None) 386 .await; 387 // Should send 388 harness.worker().await?; 389 // Wait for transaction to finalize 390 balance.expect_sub(amount).await; 391 // Should register 392 harness.worker().await?; 393 // Check transfer is now successful 394 harness 395 .expect_transfer_status(transfer_id, TransferState::success, None) 396 .await; 397 398 step("Test transfer to self"); 399 // Init a transfer to self 400 let transfer_id = harness 401 .custom_transfer( 402 decimal("10.1"), 403 *harness.wire_payto.id, 404 &harness.wire_payto.name, 405 ) 406 .await; 407 // Should failed 408 harness.worker().await?; 409 // Check transfer failed 410 harness 411 .expect_transfer_status( 412 transfer_id, 413 TransferState::permanent_failure, 414 Some("permissionDenied - The operation was denied because a required permission was not granted"), 415 ) 416 .await; 417 418 step("Test transfer to unknown account"); 419 // Init a transfer to unknown 420 let transfer_id = harness 421 .custom_transfer(decimal("10.1"), 42, "Unknown") 422 .await; 423 // Should failed 424 harness.worker().await?; 425 // Check transfer failed 426 harness 427 .expect_transfer_status( 428 transfer_id, 429 TransferState::permanent_failure, 430 Some("unknown BasicUser 42"), 431 ) 432 .await; 433 434 step("Test unexpected outgoing"); 435 // Manual tx from the exchange 436 let amount = decimal("4"); 437 harness 438 .exchange_send(&format!("What is this ? {now}"), amount) 439 .await; 440 harness.worker().await?; 441 // Wait for transaction to finalize 442 balance.expect_sub(amount).await; 443 harness.worker().await?; 444 445 step("Test transfer chargeback"); 446 let amount = decimal("10.1"); 447 // Init a transfer to client 448 let transfer_id = harness.transfer(amount).await; 449 harness 450 .expect_transfer_status(transfer_id, TransferState::pending, None) 451 .await; 452 // Send 453 harness.worker().await?; 454 balance.expect_sub(amount).await; 455 harness 456 .expect_transfer_status(transfer_id, TransferState::pending, None) 457 .await; 458 // Sync 459 harness.worker().await?; 460 harness 461 .expect_transfer_status(transfer_id, TransferState::success, None) 462 .await; 463 // Chargeback 464 harness 465 .chargeback(harness.transfer_id(transfer_id).await) 466 .await; 467 balance.expect_add(amount).await; 468 harness.worker().await?; 469 harness 470 .expect_transfer_status( 471 transfer_id, 472 TransferState::late_failure, 473 Some("charged back"), 474 ) 475 .await; 476 477 step("Test recover unexpected chargeback"); 478 let amount = decimal("10.2"); 479 // Manual tx from the exchange 480 harness 481 .exchange_send(&format!("What is this chargebacked ? {now}"), amount) 482 .await; 483 balance.expect_sub(amount).await; 484 // Chargeback 485 harness 486 .chargeback(*harness.client_last_transfer().await.id) 487 .await; 488 balance.expect_add(amount).await; 489 // Sync 490 harness.worker().await?; 491 492 step("Test direct-payment failure"); 493 let amount = decimal("10.3"); 494 harness.transfer(amount).await; 495 set_failure_scenario(&["direct-payment"]); 496 assert!(matches!( 497 harness.worker().await.unwrap_err(), 498 WorkerError::Injected(InjectedErr("direct-payment")) 499 )); 500 harness.worker().await?; 501 balance.expect_sub(amount).await; 502 harness.worker().await?; 503 504 step("Test chargeback failure"); 505 // Send malformed transaction 506 let amount = decimal("10.4"); 507 harness 508 .client_send(&format!("Malformed test {now} with failure"), amount) 509 .await; 510 balance.expect_add(amount).await; 511 // Sync and bounce 512 set_failure_scenario(&["chargeback"]); 513 assert!(matches!( 514 harness.worker().await.unwrap_err(), 515 WorkerError::Injected(InjectedErr("chargeback")) 516 )); 517 balance.expect_sub(amount).await; 518 // Sync recover 519 harness.worker().await?; 520 521 step("Finish"); 522 harness.worker().await?; 523 balance.expect_add(Decimal::zero()).await; 524 Ok(()) 525 } 526 527 /// Run online tests against real Cyclos backend 528 async fn online_harness(config: &Config, reset: bool) -> anyhow::Result<()> { 529 step("Run Cyclos online harness tests"); 530 531 step("Prepare db"); 532 let pool = dbinit(config, reset).await?; 533 let http_client = http_client::client()?; 534 setup::setup(config, reset, &http_client).await?; 535 let cfg = HarnessCfg::parse(config)?; 536 let wire = Client { 537 client: &http_client, 538 api_url: &cfg.worker.host.api_url, 539 auth: &CyclosAuth::Basic { 540 username: cfg.worker.host.username, 541 password: cfg.worker.host.password, 542 }, 543 }; 544 let client = Client { 545 client: &http_client, 546 api_url: &cfg.worker.host.api_url, 547 auth: &CyclosAuth::Basic { 548 username: cfg.username, 549 password: cfg.password, 550 }, 551 }; 552 553 let harness = Harness { 554 pool: &pool, 555 client_payto: client 556 .whoami() 557 .await 558 .unwrap() 559 .payto(cfg.worker.root.clone()), 560 wire_payto: wire.whoami().await.unwrap().payto(cfg.worker.root.clone()), 561 client, 562 wire, 563 currency: cfg.worker.currency, 564 root: cfg.worker.root, 565 payment_type_id: *cfg.worker.payment_type_id, 566 account_type_id: *cfg.worker.account_type_id, 567 }; 568 569 step("Warmup worker"); 570 let _worker_task = { 571 let client = http_client.clone(); 572 let pool = pool.clone(); 573 let config = config.clone(); 574 tokio::spawn(async move { run_worker(&config, &pool, &client, false).await }) 575 }; 576 tokio::time::sleep(Duration::from_secs(5)).await; 577 let now = Timestamp::now(); 578 let balance = &mut Balances::new(&harness).await; 579 580 step("Test incoming transactions"); 581 let taler_amount = decimal("3"); 582 let malformed_amount = decimal("4"); 583 let reserve_pub = EddsaPublicKey::rand(); 584 harness 585 .client_send(&format!("Taler {reserve_pub}"), taler_amount) 586 .await; 587 harness 588 .client_send(&format!("Malformed test {now}"), malformed_amount) 589 .await; 590 balance.expect_add(taler_amount).await; 591 harness.expect_incoming(reserve_pub).await; 592 593 step("Test outgoing transactions"); 594 let self_amount = decimal("1"); 595 let taler_amount = decimal("2"); 596 597 let transfer_self = harness 598 .custom_transfer( 599 self_amount, 600 *harness.wire_payto.id, 601 &harness.wire_payto.name, 602 ) 603 .await; 604 let transfer_id = harness.transfer(taler_amount).await; 605 balance.expect_sub(taler_amount).await; 606 harness 607 .expect_transfer_status( 608 transfer_self, 609 TransferState::permanent_failure, 610 Some("permissionDenied - The operation was denied because a required permission was not granted"), 611 ) 612 .await; 613 harness 614 .expect_transfer_status(transfer_id, TransferState::success, None) 615 .await; 616 617 step("Finish"); 618 tokio::time::sleep(Duration::from_secs(5)).await; 619 balance.expect_add(Decimal::zero()).await; 620 621 Ok(()) 622 } 623 624 fn main() { 625 let args = Args::parse(); 626 taler_main(CONFIG_SOURCE, args.common, |cfg| async move { 627 match args.cmd { 628 Command::Logic { reset } => logic_harness(&cfg, reset).await, 629 Command::Online { reset } => online_harness(&cfg, reset).await, 630 } 631 }); 632 }