/*
This file is part of TALER
Copyright (C) 2022 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free Software
Foundation; either version 3, or (at your option) any later version.
TALER is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License along with
TALER; see the file COPYING. If not, see
*/
use std::{
fmt::Display,
fmt::Write as _,
io::Write as _,
net::{Ipv4Addr, SocketAddrV4, TcpListener, TcpStream},
ops::{Deref, DerefMut},
path::{Path, PathBuf},
process::{Child, Command, Stdio},
str::FromStr,
sync::{Arc, Mutex},
thread::sleep,
time::{Duration, Instant},
};
use common::{
api_common::{Amount, Base32},
api_wire::{IncomingBankTransaction, IncomingHistory, OutgoingHistory, TransferRequest},
config::TalerConfig,
rand_slice,
url::Url,
};
use signal_child::{signal::Signal, Signalable};
use tempfile::TempDir;
pub fn print_now(disp: impl Display) {
print!("{}", disp);
std::io::stdout().flush().unwrap();
}
#[must_use]
pub fn check_incoming(base_url: &str, txs: &[([u8; 32], Amount)]) -> bool {
let history: IncomingHistory = ureq::get(&format!("{}history/incoming", base_url))
.query("delta", &format!("-{}", txs.len()))
.call()
.unwrap()
.into_json()
.unwrap();
history.incoming_transactions.len() == txs.len()
&& txs.iter().all(|(reserve_pub_key, taler_amount)| {
history.incoming_transactions.iter().any(|h| {
matches!(
h,
IncomingBankTransaction::IncomingReserveTransaction {
reserve_pub,
amount,
..
} if reserve_pub == &Base32::from(*reserve_pub_key) && amount == taler_amount
)
})
})
}
pub fn gateway_error(path: &str, error: u16) {
let err = ureq::get(path).call().unwrap_err();
match err {
ureq::Error::Status(nb, _) => assert_eq!(nb, error),
ureq::Error::Transport(_) => unreachable!(),
}
}
#[must_use]
pub fn check_gateway_error(base_url: &str) -> bool {
matches!(
ureq::get(&format!("{}history/incoming", base_url))
.query("delta", "-5")
.call(),
Err(ureq::Error::Status(504, _))
)
}
#[must_use]
pub fn check_gateway_down(base_url: &str) -> bool {
matches!(
ureq::get(&format!("{}history/incoming", base_url))
.query("delta", "-5")
.call(),
Err(ureq::Error::Status(502, _))
)
}
#[must_use]
pub fn check_gateway_up(base_url: &str) -> bool {
ureq::get(&format!("{}history/incoming", base_url))
.query("delta", "-5")
.call()
.is_ok()
}
pub fn transfer(base_url: &str, wtid: &[u8; 32], url: &Url, credit_account: Url, amount: &Amount) {
ureq::post(&format!("{}transfer", base_url))
.send_json(TransferRequest {
request_uid: Base32::from(rand_slice()),
amount: amount.clone(),
exchange_base_url: url.clone(),
wtid: Base32::from(*wtid),
credit_account,
})
.unwrap();
}
#[must_use]
pub fn check_outgoing(base_url: &str, url: &Url, txs: &[([u8; 32], Amount)]) -> bool {
let history: OutgoingHistory = ureq::get(&format!("{}history/outgoing", base_url))
.query("delta", &format!("-{}", txs.len()))
.call()
.unwrap()
.into_json()
.unwrap();
history.outgoing_transactions.len() == txs.len()
&& txs.iter().all(|(wtid, amount)| {
history.outgoing_transactions.iter().any(|h| {
h.wtid == Base32::from(*wtid) && &h.exchange_base_url == url && &h.amount == amount
})
})
}
pub struct ChildGuard(pub Child);
impl Drop for ChildGuard {
fn drop(&mut self) {
self.0.kill().ok();
}
}
pub fn cmd_out(cmd: &str, args: &[&str]) -> String {
let output = Command::new(cmd)
.args(args)
.stdin(Stdio::null())
.output()
.unwrap();
if output.stdout.is_empty() {
String::from_utf8(output.stderr).unwrap()
} else {
String::from_utf8(output.stdout).unwrap()
}
}
pub fn try_cmd_redirect(cmd: &str, args: &[&str], path: &str) -> std::io::Result {
let log_file = std::fs::OpenOptions::new()
.create(true)
.append(true)
.open(path)?;
let child = Command::new(cmd)
.args(args)
.stderr(log_file.try_clone()?)
.stdout(log_file)
.stdin(Stdio::null())
.spawn()?;
Ok(ChildGuard(child))
}
pub fn cmd_redirect(cmd: &str, args: &[&str], path: &str) -> ChildGuard {
try_cmd_redirect(cmd, args, path).unwrap()
}
pub fn cmd_ok(mut child: ChildGuard, name: &str) {
let result = child.0.wait().unwrap();
if !result.success() {
panic!("cmd {} failed", name);
}
}
pub fn cmd_redirect_ok(cmd: &str, args: &[&str], path: &str, name: &str) {
cmd_ok(cmd_redirect(cmd, args, path), name)
}
pub fn retry_opt(mut lambda: impl FnMut() -> Option) -> T {
let start = Instant::now();
loop {
let result = lambda();
if result.is_none() && start.elapsed() < Duration::from_secs(60) {
sleep(Duration::from_millis(500));
} else {
return result.unwrap();
}
}
}
pub fn retry(mut lambda: impl FnMut() -> bool) {
retry_opt(|| lambda().then_some(()))
}
#[derive(Clone)]
pub struct TestCtx {
pub log_dir: String,
pub out: Arc>,
}
impl TestCtx {
pub fn new(name: &str) -> Self {
// Create log dir
let log_dir = format!("log/{name}");
std::fs::remove_dir_all(&log_dir).ok();
std::fs::create_dir_all(&log_dir).unwrap();
// Generate password
let pwd: String = (0..30).map(|_| fastrand::alphanumeric()).collect();
std::env::set_var("PASSWORD", pwd);
Self {
log_dir,
out: Arc::new(Mutex::new(String::new())),
}
}
pub fn log(&self, name: &str) -> String {
format!("{}/{name}.log", self.log_dir)
}
pub fn step(&self, disp: impl Display) {
let it: &mut String = &mut self.out.lock().unwrap();
writeln!(it, "{disp}").unwrap();
}
}
pub struct TalerCtx {
pub dir: TempDir,
pub wire_dir: PathBuf,
pub wire2_dir: PathBuf,
pub db_dir: PathBuf,
pub conf: PathBuf,
pub taler_conf: TalerConfig,
ctx: TestCtx,
db: ChildGuard,
wire_bin_path: String,
stressed: bool,
gateway: Option,
pub gateway_url: String,
wire: Option,
wire2: Option,
pub gateway_port: u16,
}
impl TalerCtx {
pub fn new(ctx: &TestCtx, wire_name: impl Into, config: &str, stressed: bool) -> Self {
// Create temporary dir
let dir = TempDir::new().unwrap();
let conf = dir.path().join("taler.conf");
// Create common dirs
let wire_dir = dir.path().join("wire");
let wire2_dir = dir.path().join("wire2");
let db_dir = dir.path().join("db");
for dir in [&wire_dir, &wire2_dir, &db_dir] {
std::fs::create_dir_all(dir).unwrap();
}
// Find unused port
let gateway_port = unused_port();
let db_port = unused_port();
// Generate taler config from base
let config = PathBuf::from_str("instrumentation/conf")
.unwrap()
.join(config);
let mut config = ini::Ini::load_from_file(config).unwrap();
let section = config
.sections()
.find(|it| {
it.map(|it| it.starts_with("depolymerizer-"))
.unwrap_or(false)
})
.unwrap_or_default()
.map(|it| it.to_string());
config
.with_section(Some("exchange-accountcredentials-admin"))
.set(
"WIRE_GATEWAY_URL",
format!("http://localhost:{gateway_port}/"),
);
config
.with_section(section)
.set("CONF_PATH", wire_dir.to_string_lossy())
.set("IPC_PATH", wire_dir.to_string_lossy())
.set(
"DB_URL",
format!("postgres://localhost:{db_port}/postgres?user=postgres&password=password"),
)
.set("PORT", gateway_port.to_string());
config.write_to_file(&conf).unwrap();
let taler_conf = TalerConfig::load(Some(&conf));
// Setup database
let db = {
// Init databases files
cmd_redirect_ok(
"initdb",
&[db_dir.to_string_lossy().as_ref()],
&ctx.log("postgres"),
"init_db",
);
// Generate database config
std::fs::write(
db_dir.join("postgresql.conf"),
format!(
"port={db_port}\nunix_socket_directories='{}'\n",
db_dir.to_string_lossy().as_ref()
),
)
.unwrap();
let db = TalerCtx::start_db(&db_dir, ctx);
retry(|| {
let mut psql = ChildGuard(
Command::new("psql")
.args(["-h", "localhost", "-p", &db_port.to_string(), "postgres"])
.stderr(Stdio::null())
.stdout(
std::fs::File::options()
.append(true)
.create(true)
.open(ctx.log("postgres"))
.unwrap(),
)
.stdin(Stdio::piped())
.spawn()
.unwrap(),
);
psql.0
.stdin
.as_mut()
.unwrap()
.write_all(
"CREATE ROLE postgres LOGIN SUPERUSER PASSWORD 'password'".as_bytes(),
)
.unwrap();
psql.0.wait().unwrap().success()
});
db
};
let wire_name = wire_name.into();
Self {
ctx: ctx.clone(),
gateway_url: format!("http://localhost:{}/", taler_conf.port()),
dir,
wire_dir,
wire2_dir,
db_dir,
conf,
taler_conf,
db,
wire_bin_path: if stressed {
format!("log/bin/{wire_name}-fail")
} else {
format!("log/bin/{wire_name}")
},
stressed,
gateway: None,
wire: None,
wire2: None,
gateway_port,
}
}
pub fn init_db(&self) {
// Init db
retry(|| {
cmd_redirect(
&self.wire_bin_path,
&["-c", self.conf.to_string_lossy().as_ref(), "initdb"],
&self.log("cmd"),
)
.0
.wait()
.unwrap()
.success()
})
}
pub fn run(&mut self) {
// Start wires
self.wire = Some(cmd_redirect(
&self.wire_bin_path,
&["-c", self.conf.to_string_lossy().as_ref()],
&self.log("wire"),
));
self.wire2 = self.stressed.then(|| {
cmd_redirect(
&self.wire_bin_path,
&["-c", self.conf.to_string_lossy().as_ref()],
&self.log("wire1"),
)
});
// Run gateway
self.gateway = Some(cmd_redirect(
"log/bin/wire-gateway",
&["-c", self.conf.to_string_lossy().as_ref()],
&self.log("gateway"),
));
retry(|| {
TcpStream::connect(SocketAddrV4::new(Ipv4Addr::LOCALHOST, self.gateway_port)).is_ok()
});
}
/* ----- Process ----- */
#[must_use]
pub fn wire_running(&mut self) -> bool {
self.wire.as_mut().unwrap().0.try_wait().unwrap().is_none()
}
#[must_use]
pub fn gateway_running(&mut self) -> bool {
self.gateway
.as_mut()
.unwrap()
.0
.try_wait()
.unwrap()
.is_none()
}
/* ----- Database ----- */
fn start_db(db_dir: &Path, ctx: &TestCtx) -> ChildGuard {
cmd_redirect(
"postgres",
&["-D", db_dir.to_string_lossy().as_ref()],
&ctx.log("postgres"),
)
}
pub fn resume_db(&mut self) {
self.db = Self::start_db(&self.db_dir, &self.ctx);
}
pub fn stop_db(&mut self) {
self.db.0.signal(Signal::SIGQUIT).unwrap();
}
/* ----- Wire Gateway -----*/
pub fn expect_credits(&self, txs: &[([u8; 32], Amount)]) {
retry(|| check_incoming(&self.gateway_url, txs))
}
pub fn expect_debits(&self, base_url: &Url, txs: &[([u8; 32], Amount)]) {
retry(|| check_outgoing(&self.gateway_url, base_url, txs))
}
pub fn expect_error(&self) {
retry(|| check_gateway_error(&self.gateway_url));
}
pub fn expect_gateway_up(&self) {
retry(|| check_gateway_up(&self.gateway_url));
}
pub fn expect_gateway_down(&self) {
retry(|| check_gateway_down(&self.gateway_url));
}
}
impl Deref for TalerCtx {
type Target = TestCtx;
fn deref(&self) -> &Self::Target {
&self.ctx
}
}
impl DerefMut for TalerCtx {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.ctx
}
}
pub fn unused_port() -> u16 {
TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0))
.unwrap()
.local_addr()
.unwrap()
.port()
}