/* This file is part of TALER Copyright (C) 2022 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation; either version 3, or (at your option) any later version. TALER is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with TALER; see the file COPYING. If not, see */ use std::{ fmt::Display, fmt::Write as _, io::Write as _, net::{Ipv4Addr, SocketAddrV4, TcpListener, TcpStream}, ops::{Deref, DerefMut}, path::{Path, PathBuf}, process::{Child, Command, Stdio}, str::FromStr, sync::{Arc, Mutex}, thread::sleep, time::{Duration, Instant}, }; use common::{ api_common::{Amount, Base32}, api_wire::{IncomingBankTransaction, IncomingHistory, OutgoingHistory, TransferRequest}, config::TalerConfig, rand_slice, url::Url, }; use signal_child::{signal::Signal, Signalable}; use tempfile::TempDir; pub fn print_now(disp: impl Display) { print!("{}", disp); std::io::stdout().flush().unwrap(); } #[must_use] pub fn check_incoming(base_url: &str, txs: &[([u8; 32], Amount)]) -> bool { let history: IncomingHistory = ureq::get(&format!("{}history/incoming", base_url)) .query("delta", &format!("-{}", txs.len())) .call() .unwrap() .into_json() .unwrap(); history.incoming_transactions.len() == txs.len() && txs.iter().all(|(reserve_pub_key, taler_amount)| { history.incoming_transactions.iter().any(|h| { matches!( h, IncomingBankTransaction::IncomingReserveTransaction { reserve_pub, amount, .. } if reserve_pub == &Base32::from(*reserve_pub_key) && amount == taler_amount ) }) }) } pub fn gateway_error(path: &str, error: u16) { let err = ureq::get(path).call().unwrap_err(); match err { ureq::Error::Status(nb, _) => assert_eq!(nb, error), ureq::Error::Transport(_) => unreachable!(), } } #[must_use] pub fn check_gateway_error(base_url: &str) -> bool { matches!( ureq::get(&format!("{}history/incoming", base_url)) .query("delta", "-5") .call(), Err(ureq::Error::Status(504, _)) ) } #[must_use] pub fn check_gateway_down(base_url: &str) -> bool { matches!( ureq::get(&format!("{}history/incoming", base_url)) .query("delta", "-5") .call(), Err(ureq::Error::Status(502, _)) ) } #[must_use] pub fn check_gateway_up(base_url: &str) -> bool { ureq::get(&format!("{}history/incoming", base_url)) .query("delta", "-5") .call() .is_ok() } pub fn transfer(base_url: &str, wtid: &[u8; 32], url: &Url, credit_account: Url, amount: &Amount) { ureq::post(&format!("{}transfer", base_url)) .send_json(TransferRequest { request_uid: Base32::from(rand_slice()), amount: amount.clone(), exchange_base_url: url.clone(), wtid: Base32::from(*wtid), credit_account, }) .unwrap(); } #[must_use] pub fn check_outgoing(base_url: &str, url: &Url, txs: &[([u8; 32], Amount)]) -> bool { let history: OutgoingHistory = ureq::get(&format!("{}history/outgoing", base_url)) .query("delta", &format!("-{}", txs.len())) .call() .unwrap() .into_json() .unwrap(); history.outgoing_transactions.len() == txs.len() && txs.iter().all(|(wtid, amount)| { history.outgoing_transactions.iter().any(|h| { h.wtid == Base32::from(*wtid) && &h.exchange_base_url == url && &h.amount == amount }) }) } pub struct ChildGuard(pub Child); impl Drop for ChildGuard { fn drop(&mut self) { self.0.kill().ok(); } } pub fn cmd_out(cmd: &str, args: &[&str]) -> String { let output = Command::new(cmd) .args(args) .stdin(Stdio::null()) .output() .unwrap(); if output.stdout.is_empty() { String::from_utf8(output.stderr).unwrap() } else { String::from_utf8(output.stdout).unwrap() } } pub fn try_cmd_redirect(cmd: &str, args: &[&str], path: &str) -> std::io::Result { let log_file = std::fs::OpenOptions::new() .create(true) .append(true) .open(path)?; let child = Command::new(cmd) .args(args) .stderr(log_file.try_clone()?) .stdout(log_file) .stdin(Stdio::null()) .spawn()?; Ok(ChildGuard(child)) } pub fn cmd_redirect(cmd: &str, args: &[&str], path: &str) -> ChildGuard { try_cmd_redirect(cmd, args, path).unwrap() } pub fn cmd_ok(mut child: ChildGuard, name: &str) { let result = child.0.wait().unwrap(); if !result.success() { panic!("cmd {} failed", name); } } pub fn cmd_redirect_ok(cmd: &str, args: &[&str], path: &str, name: &str) { cmd_ok(cmd_redirect(cmd, args, path), name) } pub fn retry_opt(mut lambda: impl FnMut() -> Option) -> T { let start = Instant::now(); loop { let result = lambda(); if result.is_none() && start.elapsed() < Duration::from_secs(60) { sleep(Duration::from_millis(500)); } else { return result.unwrap(); } } } pub fn retry(mut lambda: impl FnMut() -> bool) { retry_opt(|| lambda().then_some(())) } #[derive(Clone)] pub struct TestCtx { pub log_dir: String, pub out: Arc>, } impl TestCtx { pub fn new(name: &str) -> Self { // Create log dir let log_dir = format!("log/{name}"); std::fs::remove_dir_all(&log_dir).ok(); std::fs::create_dir_all(&log_dir).unwrap(); // Generate password let pwd: String = (0..30).map(|_| fastrand::alphanumeric()).collect(); std::env::set_var("PASSWORD", pwd); Self { log_dir, out: Arc::new(Mutex::new(String::new())), } } pub fn log(&self, name: &str) -> String { format!("{}/{name}.log", self.log_dir) } pub fn step(&self, disp: impl Display) { let it: &mut String = &mut self.out.lock().unwrap(); writeln!(it, "{disp}").unwrap(); } } pub struct TalerCtx { pub dir: TempDir, pub wire_dir: PathBuf, pub wire2_dir: PathBuf, pub db_dir: PathBuf, pub conf: PathBuf, pub taler_conf: TalerConfig, ctx: TestCtx, db: ChildGuard, wire_bin_path: String, stressed: bool, gateway: Option, pub gateway_url: String, wire: Option, wire2: Option, pub gateway_port: u16, } impl TalerCtx { pub fn new(ctx: &TestCtx, wire_name: impl Into, config: &str, stressed: bool) -> Self { // Create temporary dir let dir = TempDir::new().unwrap(); let conf = dir.path().join("taler.conf"); // Create common dirs let wire_dir = dir.path().join("wire"); let wire2_dir = dir.path().join("wire2"); let db_dir = dir.path().join("db"); for dir in [&wire_dir, &wire2_dir, &db_dir] { std::fs::create_dir_all(dir).unwrap(); } // Find unused port let gateway_port = unused_port(); let db_port = unused_port(); // Generate taler config from base let config = PathBuf::from_str("instrumentation/conf") .unwrap() .join(config); let mut config = ini::Ini::load_from_file(config).unwrap(); let section = config .sections() .find(|it| { it.map(|it| it.starts_with("depolymerizer-")) .unwrap_or(false) }) .unwrap_or_default() .map(|it| it.to_string()); config .with_section(Some("exchange-accountcredentials-admin")) .set( "WIRE_GATEWAY_URL", format!("http://localhost:{gateway_port}/"), ); config .with_section(section) .set("CONF_PATH", wire_dir.to_string_lossy()) .set("IPC_PATH", wire_dir.to_string_lossy()) .set( "DB_URL", format!("postgres://localhost:{db_port}/postgres?user=postgres&password=password"), ) .set("PORT", gateway_port.to_string()); config.write_to_file(&conf).unwrap(); let taler_conf = TalerConfig::load(Some(&conf)); // Setup database let db = { // Init databases files cmd_redirect_ok( "initdb", &[db_dir.to_string_lossy().as_ref()], &ctx.log("postgres"), "init_db", ); // Generate database config std::fs::write( db_dir.join("postgresql.conf"), format!( "port={db_port}\nunix_socket_directories='{}'\n", db_dir.to_string_lossy().as_ref() ), ) .unwrap(); let db = TalerCtx::start_db(&db_dir, ctx); retry(|| { let mut psql = ChildGuard( Command::new("psql") .args(["-h", "localhost", "-p", &db_port.to_string(), "postgres"]) .stderr(Stdio::null()) .stdout( std::fs::File::options() .append(true) .create(true) .open(ctx.log("postgres")) .unwrap(), ) .stdin(Stdio::piped()) .spawn() .unwrap(), ); psql.0 .stdin .as_mut() .unwrap() .write_all( "CREATE ROLE postgres LOGIN SUPERUSER PASSWORD 'password'".as_bytes(), ) .unwrap(); psql.0.wait().unwrap().success() }); db }; let wire_name = wire_name.into(); Self { ctx: ctx.clone(), gateway_url: format!("http://localhost:{}/", taler_conf.port()), dir, wire_dir, wire2_dir, db_dir, conf, taler_conf, db, wire_bin_path: if stressed { format!("log/bin/{wire_name}-fail") } else { format!("log/bin/{wire_name}") }, stressed, gateway: None, wire: None, wire2: None, gateway_port, } } pub fn init_db(&self) { // Init db retry(|| { cmd_redirect( &self.wire_bin_path, &["-c", self.conf.to_string_lossy().as_ref(), "initdb"], &self.log("cmd"), ) .0 .wait() .unwrap() .success() }) } pub fn run(&mut self) { // Start wires self.wire = Some(cmd_redirect( &self.wire_bin_path, &["-c", self.conf.to_string_lossy().as_ref()], &self.log("wire"), )); self.wire2 = self.stressed.then(|| { cmd_redirect( &self.wire_bin_path, &["-c", self.conf.to_string_lossy().as_ref()], &self.log("wire1"), ) }); // Run gateway self.gateway = Some(cmd_redirect( "log/bin/wire-gateway", &["-c", self.conf.to_string_lossy().as_ref()], &self.log("gateway"), )); retry(|| { TcpStream::connect(SocketAddrV4::new(Ipv4Addr::LOCALHOST, self.gateway_port)).is_ok() }); } /* ----- Process ----- */ #[must_use] pub fn wire_running(&mut self) -> bool { self.wire.as_mut().unwrap().0.try_wait().unwrap().is_none() } #[must_use] pub fn gateway_running(&mut self) -> bool { self.gateway .as_mut() .unwrap() .0 .try_wait() .unwrap() .is_none() } /* ----- Database ----- */ fn start_db(db_dir: &Path, ctx: &TestCtx) -> ChildGuard { cmd_redirect( "postgres", &["-D", db_dir.to_string_lossy().as_ref()], &ctx.log("postgres"), ) } pub fn resume_db(&mut self) { self.db = Self::start_db(&self.db_dir, &self.ctx); } pub fn stop_db(&mut self) { self.db.0.signal(Signal::SIGQUIT).unwrap(); } /* ----- Wire Gateway -----*/ pub fn expect_credits(&self, txs: &[([u8; 32], Amount)]) { retry(|| check_incoming(&self.gateway_url, txs)) } pub fn expect_debits(&self, base_url: &Url, txs: &[([u8; 32], Amount)]) { retry(|| check_outgoing(&self.gateway_url, base_url, txs)) } pub fn expect_error(&self) { retry(|| check_gateway_error(&self.gateway_url)); } pub fn expect_gateway_up(&self) { retry(|| check_gateway_up(&self.gateway_url)); } pub fn expect_gateway_down(&self) { retry(|| check_gateway_down(&self.gateway_url)); } } impl Deref for TalerCtx { type Target = TestCtx; fn deref(&self) -> &Self::Target { &self.ctx } } impl DerefMut for TalerCtx { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.ctx } } pub fn unused_port() -> u16 { TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)) .unwrap() .local_addr() .unwrap() .port() }