utils.rs (15900B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2022-2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::{ 18 fmt::Display, 19 io::Write as _, 20 net::{Ipv4Addr, SocketAddrV4, TcpListener}, 21 ops::{Deref, DerefMut}, 22 path::{Path, PathBuf}, 23 process::{Child, Command, Stdio}, 24 str::FromStr, 25 sync::Arc, 26 time::Duration, 27 }; 28 29 use indicatif::ProgressBar; 30 use ini::Ini; 31 use taler_common::{ 32 api::{ 33 EddsaPublicKey, ShortHashCode, 34 wire::{IncomingBankTransaction, IncomingHistory, OutgoingHistory, TransferRequest}, 35 }, 36 types::{amount::Amount, base32::Base32, payto::PaytoURI}, 37 }; 38 use url::Url; 39 40 const LOG: &str = "DEBUG"; 41 42 #[must_use] 43 pub async fn check_incoming(base_url: &str, txs: &[(EddsaPublicKey, Amount)]) -> bool { 44 let mut res = ureq::get(&format!("{base_url}history/incoming")) 45 .query("delta", format!("-{}", 100)) 46 .call() 47 .unwrap(); 48 if txs.is_empty() { 49 res.status() == 204 50 } else { 51 if res.status() != 200 { 52 return false; 53 } 54 let history: IncomingHistory = res.body_mut().read_json().unwrap(); 55 56 history.incoming_transactions.len() == txs.len() 57 && txs.iter().all(|(reserve_pub_key, taler_amount)| { 58 history.incoming_transactions.iter().any(|h| { 59 matches!( 60 h, 61 IncomingBankTransaction::Reserve { 62 reserve_pub, 63 amount, 64 .. 65 } if reserve_pub == reserve_pub_key && amount == taler_amount 66 ) 67 }) 68 }) 69 } 70 } 71 72 #[must_use] 73 pub async fn check_gateway_down(base_url: &str) -> bool { 74 matches!( 75 ureq::get(&format!("{base_url}history/incoming")) 76 .query("delta", "-5") 77 .call(), 78 Err(ureq::Error::StatusCode(504 | 502)) 79 ) 80 } 81 82 #[must_use] 83 pub async fn check_gateway_up(base_url: &str) -> bool { 84 ureq::get(&format!("{base_url}config")).call().is_ok() 85 } 86 87 pub async fn transfer(base_url: &str, wtid: &[u8; 32], credit_account: PaytoURI, amount: &Amount) { 88 loop { 89 let res = ureq::post(&format!("{base_url}transfer")).send_json(TransferRequest { 90 request_uid: Base32::rand(), 91 amount: *amount, 92 exchange_base_url: Url::parse("https://exchange.test/").unwrap(), 93 wtid: Base32::from(*wtid), 94 credit_account: credit_account.clone(), 95 metadata: None, 96 }); 97 if !matches!(res, Err(ureq::Error::StatusCode(502))) { 98 res.unwrap(); 99 break; 100 } 101 } 102 } 103 104 #[must_use] 105 pub async fn check_outgoing(base_url: &str, txs: &[(ShortHashCode, Amount)]) -> bool { 106 let mut res = ureq::get(format!("{base_url}history/outgoing")) 107 .query("delta", format!("-{}", txs.len())) 108 .call() 109 .unwrap(); 110 if txs.is_empty() { 111 res.status() == 204 112 } else { 113 if res.status() != 200 { 114 return false; 115 } 116 let history: OutgoingHistory = res.body_mut().read_json().unwrap(); 117 118 history.outgoing_transactions.len() == txs.len() 119 && txs.iter().all(|(wtid, amount)| { 120 history 121 .outgoing_transactions 122 .iter() 123 .any(|h| h.wtid == *wtid && &h.amount == amount) 124 }) 125 } 126 } 127 pub struct ChildGuard(pub Child); 128 129 impl Drop for ChildGuard { 130 fn drop(&mut self) { 131 self.0.kill().ok(); 132 } 133 } 134 135 #[track_caller] 136 pub fn try_cmd_redirect( 137 cmd: &str, 138 args: &[&str], 139 path: impl AsRef<Path>, 140 ) -> std::io::Result<ChildGuard> { 141 let log_file = std::fs::OpenOptions::new() 142 .create(true) 143 .append(true) 144 .open(path)?; 145 146 let child = Command::new(cmd) 147 .args(args) 148 .stderr(log_file.try_clone()?) 149 .stdout(log_file) 150 .stdin(Stdio::null()) 151 .spawn()?; 152 Ok(ChildGuard(child)) 153 } 154 155 #[track_caller] 156 pub fn cmd_redirect(cmd: &str, args: &[&str], path: impl AsRef<Path>) -> ChildGuard { 157 try_cmd_redirect(cmd, args, path).unwrap() 158 } 159 160 #[track_caller] 161 pub fn cmd_ok(mut child: ChildGuard, name: &str) { 162 let result = child.0.wait().unwrap(); 163 if !result.success() { 164 panic!("cmd {name} failed"); 165 } 166 } 167 168 #[track_caller] 169 pub fn cmd_redirect_ok(cmd: &str, args: &[&str], path: impl AsRef<Path>, name: &str) { 170 cmd_ok(cmd_redirect(cmd, args, path), name) 171 } 172 173 #[macro_export] 174 macro_rules! retry_opt { 175 ($expr:expr) => { 176 async { 177 let start = std::time::Instant::now(); 178 loop { 179 let result = $expr.await; 180 if result.is_err() && start.elapsed() < std::time::Duration::from_secs(30) { 181 tokio::time::sleep(std::time::Duration::from_millis(300)).await; 182 } else { 183 return result.unwrap(); 184 } 185 } 186 } 187 .await 188 }; 189 } 190 191 #[macro_export] 192 macro_rules! retry { 193 ($expr:expr) => { 194 $crate::retry_opt! { 195 async { $expr.await.then_some(()).ok_or("failure") } 196 } 197 }; 198 } 199 200 #[derive(Clone)] 201 pub struct TestCtx { 202 pub name: String, 203 pub root: PathBuf, 204 pub dir: PathBuf, 205 pub pb: ProgressBar, 206 pub db: Arc<LocalDb>, 207 } 208 209 impl TestCtx { 210 pub fn new( 211 root: impl Into<PathBuf>, 212 name: impl Into<String>, 213 pb: ProgressBar, 214 db: Arc<LocalDb>, 215 ) -> Self { 216 let root = root.into(); 217 let name = name.into(); 218 // Create log dir 219 let dir = root.join(&name); 220 std::fs::create_dir_all(&dir).unwrap(); 221 222 Self { 223 name, 224 dir, 225 pb, 226 db, 227 root, 228 } 229 } 230 231 pub fn log(&self, name: &str) -> PathBuf { 232 self.dir.join(format!("{name}.log")) 233 } 234 235 pub fn step(&self, disp: impl Display) { 236 self.pb.set_message(format!("{disp}")) 237 } 238 239 /* ----- Database ----- */ 240 241 pub fn stop_db(&mut self) { 242 self.db.stop_db(&self.name); 243 } 244 245 pub fn resume_db(&mut self) { 246 self.db.resume_db(&self.name); 247 } 248 } 249 250 pub struct TalerCtx { 251 pub wire_dir: PathBuf, 252 pub wire2_dir: PathBuf, 253 pub conf: PathBuf, 254 ctx: TestCtx, 255 pub wire_bin_path: String, 256 stressed: bool, 257 gateway: Option<ChildGuard>, 258 pub gateway_url: String, 259 wire: Option<ChildGuard>, 260 wire2: Option<ChildGuard>, 261 pub gateway_port: u16, 262 } 263 264 impl TalerCtx { 265 pub fn new(ctx: &TestCtx, wire_name: impl Into<String>, config: &str, stressed: bool) -> Self { 266 // Create temporary dir 267 let dir = ctx.dir.clone(); 268 let conf = dir.join("taler.conf"); 269 270 // Create common dirs 271 let wire_dir = dir.join("wire"); 272 let wire2_dir = dir.join("wire2"); 273 for dir in [&wire_dir, &wire2_dir] { 274 std::fs::create_dir_all(dir).unwrap(); 275 } 276 277 // Find unused port 278 let gateway_port = unused_port(); 279 let gateway_url = format!("http://localhost:{gateway_port}/taler-wire-gateway/"); 280 281 // Generate taler config from base 282 let wire_name = wire_name.into(); 283 let config = PathBuf::from_str("testbench/conf").unwrap().join(config); 284 let mut cfg = ini::Ini::load_from_file(config).unwrap(); 285 cfg.with_section(Some("exchange-accountcredentials-admin")) 286 .set("WIRE_GATEWAY_URL", &gateway_url); 287 cfg.with_section(Some(format!("{wire_name}db-postgres"))) 288 .set("CONFIG", ctx.db.postgres_uri(&ctx.name)); 289 cfg.with_section(Some(format!("{wire_name}-httpd"))) 290 .set("PORT", gateway_port.to_string()); 291 292 cfg.write_to_file(&conf).unwrap(); 293 294 Self { 295 ctx: ctx.clone(), 296 gateway_url, 297 wire_dir, 298 wire2_dir, 299 conf, 300 wire_bin_path: if stressed { 301 ctx.root.join(format!("bin/{wire_name}-fail")) 302 } else { 303 ctx.root.join(format!("bin/{wire_name}")) 304 } 305 .to_string_lossy() 306 .to_string(), 307 stressed, 308 gateway: None, 309 wire: None, 310 wire2: None, 311 gateway_port, 312 } 313 } 314 315 pub fn dbinit(&self) { 316 self.db.wait_running(); 317 self.db.create_db(&self.ctx.name); 318 // Init db 319 cmd_redirect_ok( 320 &self.wire_bin_path, 321 &["-c", self.conf.to_string_lossy().as_ref(), "dbinit"], 322 self.log("cmd"), 323 "wire dbinit", 324 ); 325 } 326 327 pub fn reset_db(&self) { 328 // Reset db 329 self.db.execute_sql( 330 " 331 FOR r IN ( 332 SELECT schemaname, tablename 333 FROM pg_tables 334 WHERE schemaname NOT IN ('pg_catalog', 'information_schema') 335 ) LOOP 336 EXECUTE format( 337 'TRUNCATE TABLE %I.%I RESTART IDENTITY CASCADE', 338 r.schemaname, 339 r.tablename 340 ); 341 END LOOP; 342 ", 343 ); 344 } 345 346 pub fn setup(&self) { 347 // Init db 348 cmd_redirect_ok( 349 &self.wire_bin_path, 350 &["-c", self.conf.to_string_lossy().as_ref(), "setup", "-r"], 351 self.log("cmd"), 352 "wire setup", 353 ); 354 } 355 356 pub async fn run(&mut self) { 357 // Run gateway 358 self.gateway = Some(cmd_redirect( 359 &self.wire_bin_path, 360 &[ 361 "-c", 362 self.conf.to_string_lossy().as_ref(), 363 "-L", 364 LOG, 365 "serve", 366 ], 367 self.log("gateway"), 368 )); 369 370 // Start wires 371 self.wire = Some(cmd_redirect( 372 &self.wire_bin_path, 373 &[ 374 "-c", 375 self.conf.to_string_lossy().as_ref(), 376 "-L", 377 LOG, 378 "worker", 379 ], 380 self.log("worker"), 381 )); 382 self.wire2 = self.stressed.then(|| { 383 cmd_redirect( 384 &self.wire_bin_path, 385 &[ 386 "-c", 387 self.conf.to_string_lossy().as_ref(), 388 "-L", 389 LOG, 390 "worker", 391 ], 392 self.log("worker+"), 393 ) 394 }); 395 396 // Wait for gateway to be up 397 retry_opt! { 398 tokio::net::TcpStream::connect(SocketAddrV4::new(Ipv4Addr::LOCALHOST, self.gateway_port)) 399 }; 400 } 401 402 /* ----- Process ----- */ 403 404 #[must_use] 405 pub fn wire_running(&mut self) -> bool { 406 self.wire.as_mut().unwrap().0.try_wait().unwrap().is_none() 407 } 408 409 #[must_use] 410 pub fn gateway_running(&mut self) -> bool { 411 self.gateway 412 .as_mut() 413 .unwrap() 414 .0 415 .try_wait() 416 .unwrap() 417 .is_none() 418 } 419 420 /* ----- Wire Gateway -----*/ 421 422 pub async fn expect_credits(&self, txs: &[(EddsaPublicKey, Amount)]) -> bool { 423 check_incoming(&self.gateway_url, txs).await 424 } 425 426 pub async fn expect_debits(&self, txs: &[(ShortHashCode, Amount)]) -> bool { 427 check_outgoing(&self.gateway_url, txs).await 428 } 429 430 pub async fn expect_gateway_up(&self) { 431 retry!(check_gateway_up(&self.gateway_url)) 432 } 433 434 pub async fn expect_gateway_down(&self) { 435 retry!(check_gateway_down(&self.gateway_url)) 436 } 437 } 438 439 impl Deref for TalerCtx { 440 type Target = TestCtx; 441 442 fn deref(&self) -> &Self::Target { 443 &self.ctx 444 } 445 } 446 447 impl DerefMut for TalerCtx { 448 fn deref_mut(&mut self) -> &mut Self::Target { 449 &mut self.ctx 450 } 451 } 452 453 pub fn unused_port() -> u16 { 454 TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)) 455 .unwrap() 456 .local_addr() 457 .unwrap() 458 .port() 459 } 460 461 pub struct LocalDb { 462 cluster_dir: String, 463 cmd_log: String, 464 cluster: ChildGuard, 465 } 466 467 impl LocalDb { 468 pub fn new(root: &Path) -> Self { 469 let cluster_dir = root.join("db"); 470 let cluster_log = root.join("postgres.log").to_string_lossy().to_string(); 471 let cmd_log = root.join("postgres-cmd.log").to_string_lossy().to_string(); 472 if !std::fs::exists(&cluster_dir).unwrap() { 473 // Init databases files 474 cmd_redirect_ok( 475 "initdb", 476 &[cluster_dir.to_string_lossy().as_ref()], 477 &cluster_log, 478 "init_db", 479 ); 480 } 481 // Generate database config 482 std::fs::write( 483 cluster_dir.join("postgresql.conf"), 484 format!( 485 " 486 listen_addresses='' 487 unix_socket_directories='{}' 488 fsync=off 489 synchronous_commit=off 490 full_page_writes=off 491 ", 492 cluster_dir.to_string_lossy().as_ref() 493 ), 494 ) 495 .unwrap(); 496 let cluster = cmd_redirect( 497 "postgres", 498 &["-D", cluster_dir.to_string_lossy().as_ref()], 499 &cmd_log, 500 ); 501 Self { 502 cluster_dir: cluster_dir.to_string_lossy().to_string(), 503 cmd_log, 504 cluster, 505 } 506 } 507 508 pub fn postgres_uri(&self, database: &str) -> String { 509 format!("postgres:///{database}?host={}", self.cluster_dir) 510 } 511 512 pub fn execute_sql(&self, sql: &str) -> bool { 513 let mut psql = ChildGuard( 514 Command::new("psql") 515 .arg(self.postgres_uri("postgres")) 516 .stderr(Stdio::null()) 517 .stdout( 518 std::fs::File::options() 519 .append(true) 520 .create(true) 521 .open(&self.cmd_log) 522 .unwrap(), 523 ) 524 .stdin(Stdio::piped()) 525 .spawn() 526 .unwrap(), 527 ); 528 psql.0 529 .stdin 530 .as_mut() 531 .unwrap() 532 .write_all(sql.as_bytes()) 533 .unwrap(); 534 psql.0.wait().unwrap().success() 535 } 536 537 pub fn create_db(&self, name: &str) { 538 self.execute_sql(&format!("CREATE DATABASE {name};")); 539 } 540 541 pub fn stop_db(&self, name: &str) { 542 self.execute_sql(&format!( 543 " 544 UPDATE pg_database SET datallowconn=false WHERE datname='{name}'; 545 SELECT pg_terminate_backend(pid) 546 FROM pg_stat_activity 547 WHERE datname='{name}' AND pid <> pg_backend_pid(); 548 " 549 )); 550 } 551 pub fn resume_db(&self, name: &str) { 552 self.execute_sql(&format!( 553 "UPDATE pg_database SET datallowconn=true WHERE datname='{name}';" 554 )); 555 } 556 557 pub fn wait_running(&self) { 558 for _ in 0..10 { 559 if self.execute_sql("SELECT true") { 560 break; 561 } 562 std::thread::sleep(Duration::from_millis(500)) 563 } 564 } 565 } 566 567 pub fn patch_config(from: impl AsRef<Path>, to: impl AsRef<Path>, patch: impl FnOnce(&mut Ini)) { 568 let mut cfg = ini::Ini::load_from_file(from).unwrap(); 569 patch(&mut cfg); 570 cfg.write_to_file(to).unwrap(); 571 }