utils.rs (15559B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2022-2025 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::{ 18 fmt::Display, 19 io::Write as _, 20 net::{Ipv4Addr, SocketAddrV4, TcpListener}, 21 ops::{Deref, DerefMut}, 22 path::{Path, PathBuf}, 23 process::{Child, Command, Stdio}, 24 str::FromStr, 25 sync::Arc, 26 time::Duration, 27 }; 28 29 use indicatif::ProgressBar; 30 use ini::Ini; 31 use taler_common::{ 32 api_common::{EddsaPublicKey, ShortHashCode}, 33 api_wire::{IncomingBankTransaction, IncomingHistory, OutgoingHistory, TransferRequest}, 34 types::{amount::Amount, base32::Base32, payto::PaytoURI}, 35 }; 36 use url::Url; 37 38 const LOG: &str = "INFO"; 39 40 #[must_use] 41 pub async fn check_incoming(base_url: &str, txs: &[(EddsaPublicKey, Amount)]) -> bool { 42 let mut res = ureq::get(&format!("{base_url}history/incoming")) 43 .query("delta", format!("-{}", 100)) 44 .call() 45 .unwrap(); 46 if txs.is_empty() { 47 res.status() == 204 48 } else { 49 if res.status() != 200 { 50 return false; 51 } 52 let history: IncomingHistory = res.body_mut().read_json().unwrap(); 53 54 history.incoming_transactions.len() == txs.len() 55 && txs.iter().all(|(reserve_pub_key, taler_amount)| { 56 history.incoming_transactions.iter().any(|h| { 57 matches!( 58 h, 59 IncomingBankTransaction::Reserve { 60 reserve_pub, 61 amount, 62 .. 63 } if reserve_pub == reserve_pub_key && amount == taler_amount 64 ) 65 }) 66 }) 67 } 68 } 69 70 #[must_use] 71 pub async fn check_gateway_down(base_url: &str) -> bool { 72 matches!( 73 ureq::get(&format!("{base_url}history/incoming")) 74 .query("delta", "-5") 75 .call(), 76 Err(ureq::Error::StatusCode(504 | 502)) 77 ) 78 } 79 80 #[must_use] 81 pub async fn check_gateway_up(base_url: &str) -> bool { 82 ureq::get(&format!("{base_url}config")).call().is_ok() 83 } 84 85 pub async fn transfer(base_url: &str, wtid: &[u8; 32], credit_account: PaytoURI, amount: &Amount) { 86 loop { 87 let res = ureq::post(&format!("{base_url}transfer")).send_json(TransferRequest { 88 request_uid: Base32::rand(), 89 amount: amount.clone(), 90 exchange_base_url: Url::parse("https://exchange.test/").unwrap(), 91 wtid: Base32::from(*wtid), 92 credit_account: credit_account.clone(), 93 }); 94 if !matches!(res, Err(ureq::Error::StatusCode(502))) { 95 res.unwrap(); 96 break; 97 } 98 } 99 } 100 101 #[must_use] 102 pub async fn check_outgoing(base_url: &str, txs: &[(ShortHashCode, Amount)]) -> bool { 103 let mut res = ureq::get(format!("{base_url}history/outgoing")) 104 .query("delta", format!("-{}", txs.len())) 105 .call() 106 .unwrap(); 107 if txs.is_empty() { 108 res.status() == 204 109 } else { 110 if res.status() != 200 { 111 return false; 112 } 113 let history: OutgoingHistory = res.body_mut().read_json().unwrap(); 114 115 history.outgoing_transactions.len() == txs.len() 116 && txs.iter().all(|(wtid, amount)| { 117 history 118 .outgoing_transactions 119 .iter() 120 .any(|h| h.wtid == *wtid && &h.amount == amount) 121 }) 122 } 123 } 124 pub struct ChildGuard(pub Child); 125 126 impl Drop for ChildGuard { 127 fn drop(&mut self) { 128 self.0.kill().ok(); 129 } 130 } 131 132 #[track_caller] 133 pub fn try_cmd_redirect( 134 cmd: &str, 135 args: &[&str], 136 path: impl AsRef<Path>, 137 ) -> std::io::Result<ChildGuard> { 138 let log_file = std::fs::OpenOptions::new() 139 .create(true) 140 .append(true) 141 .open(path)?; 142 143 let child = Command::new(cmd) 144 .args(args) 145 .stderr(log_file.try_clone()?) 146 .stdout(log_file) 147 .stdin(Stdio::null()) 148 .spawn()?; 149 Ok(ChildGuard(child)) 150 } 151 152 #[track_caller] 153 pub fn cmd_redirect(cmd: &str, args: &[&str], path: impl AsRef<Path>) -> ChildGuard { 154 try_cmd_redirect(cmd, args, path).unwrap() 155 } 156 157 #[track_caller] 158 pub fn cmd_ok(mut child: ChildGuard, name: &str) { 159 let result = child.0.wait().unwrap(); 160 if !result.success() { 161 panic!("cmd {name} failed"); 162 } 163 } 164 165 #[track_caller] 166 pub fn cmd_redirect_ok(cmd: &str, args: &[&str], path: impl AsRef<Path>, name: &str) { 167 cmd_ok(cmd_redirect(cmd, args, path), name) 168 } 169 170 #[macro_export] 171 macro_rules! retry_opt { 172 ($expr:expr) => { 173 async { 174 let start = std::time::Instant::now(); 175 loop { 176 let result = $expr; 177 if result.is_err() && start.elapsed() < std::time::Duration::from_secs(30) { 178 tokio::time::sleep(std::time::Duration::from_millis(500)).await; 179 } else { 180 return result.unwrap(); 181 } 182 } 183 } 184 .await 185 }; 186 } 187 188 #[macro_export] 189 macro_rules! retry { 190 ($expr:expr) => { 191 $crate::retry_opt! { 192 $expr.then_some(()).ok_or("failure") 193 } 194 }; 195 } 196 #[derive(Clone)] 197 pub struct TestCtx { 198 pub name: String, 199 pub root: PathBuf, 200 pub dir: PathBuf, 201 pub pb: ProgressBar, 202 pub db: Arc<LocalDb>, 203 } 204 205 impl TestCtx { 206 pub fn new( 207 root: impl Into<PathBuf>, 208 name: impl Into<String>, 209 pb: ProgressBar, 210 db: Arc<LocalDb>, 211 ) -> Self { 212 let root = root.into(); 213 let name = name.into(); 214 // Create log dir 215 let dir = root.join(&name); 216 std::fs::create_dir_all(&dir).unwrap(); 217 218 Self { 219 name, 220 dir, 221 pb, 222 db, 223 root, 224 } 225 } 226 227 pub fn log(&self, name: &str) -> PathBuf { 228 self.dir.join(format!("{name}.log")) 229 } 230 231 pub fn step(&self, disp: impl Display) { 232 self.pb.set_message(format!("{disp}")) 233 } 234 235 /* ----- Database ----- */ 236 237 pub fn stop_db(&mut self) { 238 self.db.stop_db(&self.name); 239 } 240 241 pub fn resume_db(&mut self) { 242 self.db.resume_db(&self.name); 243 } 244 } 245 246 pub struct TalerCtx { 247 pub wire_dir: PathBuf, 248 pub wire2_dir: PathBuf, 249 pub conf: PathBuf, 250 ctx: TestCtx, 251 pub wire_bin_path: String, 252 stressed: bool, 253 gateway: Option<ChildGuard>, 254 pub gateway_url: String, 255 wire: Option<ChildGuard>, 256 wire2: Option<ChildGuard>, 257 pub gateway_port: u16, 258 } 259 260 impl TalerCtx { 261 pub fn new(ctx: &TestCtx, wire_name: impl Into<String>, config: &str, stressed: bool) -> Self { 262 // Create temporary dir 263 let dir = ctx.dir.clone(); 264 let conf = dir.join("taler.conf"); 265 266 // Create common dirs 267 let wire_dir = dir.join("wire"); 268 let wire2_dir = dir.join("wire2"); 269 for dir in [&wire_dir, &wire2_dir] { 270 std::fs::create_dir_all(dir).unwrap(); 271 } 272 273 // Find unused port 274 let gateway_port = unused_port(); 275 let gateway_url = format!("http://localhost:{gateway_port}/taler-wire-gateway/"); 276 277 // Generate taler config from base 278 let wire_name = wire_name.into(); 279 let config = PathBuf::from_str("testbench/conf").unwrap().join(config); 280 let mut cfg = ini::Ini::load_from_file(config).unwrap(); 281 cfg.with_section(Some("exchange-accountcredentials-admin")) 282 .set("WIRE_GATEWAY_URL", &gateway_url); 283 cfg.with_section(Some(format!("{wire_name}db-postgres"))) 284 .set("CONFIG", ctx.db.postgres_uri(&ctx.name)); 285 cfg.with_section(Some(format!("{wire_name}-httpd"))) 286 .set("PORT", gateway_port.to_string()); 287 288 cfg.write_to_file(&conf).unwrap(); 289 290 Self { 291 ctx: ctx.clone(), 292 gateway_url, 293 wire_dir, 294 wire2_dir, 295 conf, 296 wire_bin_path: if stressed { 297 ctx.root.join(format!("bin/{wire_name}-fail")) 298 } else { 299 ctx.root.join(format!("bin/{wire_name}")) 300 } 301 .to_string_lossy() 302 .to_string(), 303 stressed, 304 gateway: None, 305 wire: None, 306 wire2: None, 307 gateway_port, 308 } 309 } 310 311 pub fn dbinit(&self) { 312 self.db.wait_running(); 313 self.db.create_db(&self.ctx.name); 314 // Init db 315 cmd_redirect_ok( 316 &self.wire_bin_path, 317 &["-c", self.conf.to_string_lossy().as_ref(), "dbinit"], 318 self.log("cmd"), 319 "wire dbinit", 320 ); 321 } 322 323 pub fn reset_db(&self) { 324 // Reset db 325 cmd_redirect_ok( 326 &self.wire_bin_path, 327 &["-c", self.conf.to_string_lossy().as_ref(), "dbinit", "-r"], 328 self.log("cmd"), 329 "wire dbinit reset", 330 ); 331 } 332 333 pub fn setup(&self) { 334 // Init db 335 cmd_redirect_ok( 336 &self.wire_bin_path, 337 &["-c", self.conf.to_string_lossy().as_ref(), "setup"], 338 self.log("cmd"), 339 "wire setup", 340 ); 341 } 342 343 pub async fn run(&mut self) { 344 // Run gateway 345 self.gateway = Some(cmd_redirect( 346 &self.wire_bin_path, 347 &[ 348 "-c", 349 self.conf.to_string_lossy().as_ref(), 350 "-L", 351 LOG, 352 "serve", 353 ], 354 self.log("gateway"), 355 )); 356 357 // Start wires 358 self.wire = Some(cmd_redirect( 359 &self.wire_bin_path, 360 &[ 361 "-c", 362 self.conf.to_string_lossy().as_ref(), 363 "-L", 364 LOG, 365 "worker", 366 ], 367 self.log("worker"), 368 )); 369 self.wire2 = self.stressed.then(|| { 370 cmd_redirect( 371 &self.wire_bin_path, 372 &[ 373 "-c", 374 self.conf.to_string_lossy().as_ref(), 375 "-L", 376 LOG, 377 "worker", 378 ], 379 self.log("worker+"), 380 ) 381 }); 382 383 // Wait for gateway to be up 384 retry_opt! { 385 tokio::net::TcpStream::connect(SocketAddrV4::new(Ipv4Addr::LOCALHOST, self.gateway_port)).await 386 }; 387 } 388 389 /* ----- Process ----- */ 390 391 #[must_use] 392 pub fn wire_running(&mut self) -> bool { 393 self.wire.as_mut().unwrap().0.try_wait().unwrap().is_none() 394 } 395 396 #[must_use] 397 pub fn gateway_running(&mut self) -> bool { 398 self.gateway 399 .as_mut() 400 .unwrap() 401 .0 402 .try_wait() 403 .unwrap() 404 .is_none() 405 } 406 407 /* ----- Wire Gateway -----*/ 408 409 pub async fn expect_credits(&self, txs: &[(EddsaPublicKey, Amount)]) { 410 retry! { check_incoming(&self.gateway_url, txs).await } 411 } 412 413 pub async fn expect_debits(&self, txs: &[(ShortHashCode, Amount)]) { 414 retry! { check_outgoing(&self.gateway_url, txs).await } 415 } 416 417 pub async fn expect_gateway_up(&self) { 418 retry! { check_gateway_up(&self.gateway_url).await } 419 } 420 421 pub async fn expect_gateway_down(&self) { 422 retry! { check_gateway_down(&self.gateway_url).await } 423 } 424 } 425 426 impl Deref for TalerCtx { 427 type Target = TestCtx; 428 429 fn deref(&self) -> &Self::Target { 430 &self.ctx 431 } 432 } 433 434 impl DerefMut for TalerCtx { 435 fn deref_mut(&mut self) -> &mut Self::Target { 436 &mut self.ctx 437 } 438 } 439 440 pub fn unused_port() -> u16 { 441 TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)) 442 .unwrap() 443 .local_addr() 444 .unwrap() 445 .port() 446 } 447 448 pub struct LocalDb { 449 cluster_dir: String, 450 cmd_log: String, 451 cluster: ChildGuard, 452 } 453 454 impl LocalDb { 455 pub fn new(root: &Path) -> Self { 456 let cluster_dir = root.join("db"); 457 let cluster_log = root.join("postgres.log").to_string_lossy().to_string(); 458 let cmd_log = root.join("postgres-cmd.log").to_string_lossy().to_string(); 459 if !std::fs::exists(&cluster_dir).unwrap() { 460 // Init databases files 461 cmd_redirect_ok( 462 "initdb", 463 &[cluster_dir.to_string_lossy().as_ref()], 464 &cluster_log, 465 "init_db", 466 ); 467 } 468 // Generate database config 469 std::fs::write( 470 cluster_dir.join("postgresql.conf"), 471 format!( 472 " 473 listen_addresses='' 474 unix_socket_directories='{}' 475 fsync=off 476 synchronous_commit=off 477 full_page_writes=off 478 ", 479 cluster_dir.to_string_lossy().as_ref() 480 ), 481 ) 482 .unwrap(); 483 let cluster = cmd_redirect( 484 "postgres", 485 &["-D", cluster_dir.to_string_lossy().as_ref()], 486 &cmd_log, 487 ); 488 Self { 489 cluster_dir: cluster_dir.to_string_lossy().to_string(), 490 cmd_log, 491 cluster, 492 } 493 } 494 495 pub fn postgres_uri(&self, database: &str) -> String { 496 format!("postgres:///{database}?host={}", self.cluster_dir) 497 } 498 499 pub fn execute_sql(&self, sql: &str) -> bool { 500 let mut psql = ChildGuard( 501 Command::new("psql") 502 .arg(self.postgres_uri("postgres")) 503 .stderr(Stdio::null()) 504 .stdout( 505 std::fs::File::options() 506 .append(true) 507 .create(true) 508 .open(&self.cmd_log) 509 .unwrap(), 510 ) 511 .stdin(Stdio::piped()) 512 .spawn() 513 .unwrap(), 514 ); 515 psql.0 516 .stdin 517 .as_mut() 518 .unwrap() 519 .write_all(sql.as_bytes()) 520 .unwrap(); 521 psql.0.wait().unwrap().success() 522 } 523 524 pub fn create_db(&self, name: &str) { 525 self.execute_sql(&format!("CREATE DATABASE {name};")); 526 } 527 528 pub fn stop_db(&self, name: &str) { 529 self.execute_sql(&format!( 530 " 531 UPDATE pg_database SET datallowconn=false WHERE datname='{name}'; 532 SELECT pg_terminate_backend(pid) 533 FROM pg_stat_activity 534 WHERE datname='{name}' AND pid <> pg_backend_pid(); 535 " 536 )); 537 } 538 pub fn resume_db(&self, name: &str) { 539 self.execute_sql(&format!( 540 "UPDATE pg_database SET datallowconn=true WHERE datname='{name}';" 541 )); 542 } 543 544 pub fn wait_running(&self) { 545 for _ in 0..10 { 546 if self.execute_sql("SELECT true") { 547 break; 548 } 549 std::thread::sleep(Duration::from_millis(500)) 550 } 551 } 552 } 553 554 pub fn patch_config(from: impl AsRef<Path>, to: impl AsRef<Path>, patch: impl FnOnce(&mut Ini)) { 555 let mut cfg = ini::Ini::load_from_file(from).unwrap(); 556 patch(&mut cfg); 557 cfg.write_to_file(to).unwrap(); 558 }