depolymerization

wire gateway for Bitcoin/Ethereum
Log | Files | Refs | Submodules | README | LICENSE

utils.rs (15559B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2022-2025 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::{
     18     fmt::Display,
     19     io::Write as _,
     20     net::{Ipv4Addr, SocketAddrV4, TcpListener},
     21     ops::{Deref, DerefMut},
     22     path::{Path, PathBuf},
     23     process::{Child, Command, Stdio},
     24     str::FromStr,
     25     sync::Arc,
     26     time::Duration,
     27 };
     28 
     29 use indicatif::ProgressBar;
     30 use ini::Ini;
     31 use taler_common::{
     32     api_common::{EddsaPublicKey, ShortHashCode},
     33     api_wire::{IncomingBankTransaction, IncomingHistory, OutgoingHistory, TransferRequest},
     34     types::{amount::Amount, base32::Base32, payto::PaytoURI},
     35 };
     36 use url::Url;
     37 
/// Log level passed with `-L` to the wire binary subcommands started by `TalerCtx::run`.
const LOG: &str = "INFO";
     39 
     40 #[must_use]
     41 pub async fn check_incoming(base_url: &str, txs: &[(EddsaPublicKey, Amount)]) -> bool {
     42     let mut res = ureq::get(&format!("{base_url}history/incoming"))
     43         .query("delta", format!("-{}", 100))
     44         .call()
     45         .unwrap();
     46     if txs.is_empty() {
     47         res.status() == 204
     48     } else {
     49         if res.status() != 200 {
     50             return false;
     51         }
     52         let history: IncomingHistory = res.body_mut().read_json().unwrap();
     53 
     54         history.incoming_transactions.len() == txs.len()
     55             && txs.iter().all(|(reserve_pub_key, taler_amount)| {
     56                 history.incoming_transactions.iter().any(|h| {
     57                     matches!(
     58                         h,
     59                         IncomingBankTransaction::Reserve {
     60                             reserve_pub,
     61                             amount,
     62                             ..
     63                         } if reserve_pub == reserve_pub_key && amount == taler_amount
     64                     )
     65                 })
     66             })
     67     }
     68 }
     69 
     70 #[must_use]
     71 pub async fn check_gateway_down(base_url: &str) -> bool {
     72     matches!(
     73         ureq::get(&format!("{base_url}history/incoming"))
     74             .query("delta", "-5")
     75             .call(),
     76         Err(ureq::Error::StatusCode(504 | 502))
     77     )
     78 }
     79 
     80 #[must_use]
     81 pub async fn check_gateway_up(base_url: &str) -> bool {
     82     ureq::get(&format!("{base_url}config")).call().is_ok()
     83 }
     84 
     85 pub async fn transfer(base_url: &str, wtid: &[u8; 32], credit_account: PaytoURI, amount: &Amount) {
     86     loop {
     87         let res = ureq::post(&format!("{base_url}transfer")).send_json(TransferRequest {
     88             request_uid: Base32::rand(),
     89             amount: amount.clone(),
     90             exchange_base_url: Url::parse("https://exchange.test/").unwrap(),
     91             wtid: Base32::from(*wtid),
     92             credit_account: credit_account.clone(),
     93         });
     94         if !matches!(res, Err(ureq::Error::StatusCode(502))) {
     95             res.unwrap();
     96             break;
     97         }
     98     }
     99 }
    100 
    101 #[must_use]
    102 pub async fn check_outgoing(base_url: &str, txs: &[(ShortHashCode, Amount)]) -> bool {
    103     let mut res = ureq::get(format!("{base_url}history/outgoing"))
    104         .query("delta", format!("-{}", txs.len()))
    105         .call()
    106         .unwrap();
    107     if txs.is_empty() {
    108         res.status() == 204
    109     } else {
    110         if res.status() != 200 {
    111             return false;
    112         }
    113         let history: OutgoingHistory = res.body_mut().read_json().unwrap();
    114 
    115         history.outgoing_transactions.len() == txs.len()
    116             && txs.iter().all(|(wtid, amount)| {
    117                 history
    118                     .outgoing_transactions
    119                     .iter()
    120                     .any(|h| h.wtid == *wtid && &h.amount == amount)
    121             })
    122     }
    123 }
    124 pub struct ChildGuard(pub Child);
    125 
    126 impl Drop for ChildGuard {
    127     fn drop(&mut self) {
    128         self.0.kill().ok();
    129     }
    130 }
    131 
    132 #[track_caller]
    133 pub fn try_cmd_redirect(
    134     cmd: &str,
    135     args: &[&str],
    136     path: impl AsRef<Path>,
    137 ) -> std::io::Result<ChildGuard> {
    138     let log_file = std::fs::OpenOptions::new()
    139         .create(true)
    140         .append(true)
    141         .open(path)?;
    142 
    143     let child = Command::new(cmd)
    144         .args(args)
    145         .stderr(log_file.try_clone()?)
    146         .stdout(log_file)
    147         .stdin(Stdio::null())
    148         .spawn()?;
    149     Ok(ChildGuard(child))
    150 }
    151 
    152 #[track_caller]
    153 pub fn cmd_redirect(cmd: &str, args: &[&str], path: impl AsRef<Path>) -> ChildGuard {
    154     try_cmd_redirect(cmd, args, path).unwrap()
    155 }
    156 
    157 #[track_caller]
    158 pub fn cmd_ok(mut child: ChildGuard, name: &str) {
    159     let result = child.0.wait().unwrap();
    160     if !result.success() {
    161         panic!("cmd {name} failed");
    162     }
    163 }
    164 
    165 #[track_caller]
    166 pub fn cmd_redirect_ok(cmd: &str, args: &[&str], path: impl AsRef<Path>, name: &str) {
    167     cmd_ok(cmd_redirect(cmd, args, path), name)
    168 }
    169 
    170 #[macro_export]
    171 macro_rules! retry_opt {
    172     ($expr:expr) => {
    173         async {
    174             let start = std::time::Instant::now();
    175             loop {
    176                 let result = $expr;
    177                 if result.is_err() && start.elapsed() < std::time::Duration::from_secs(30) {
    178                     tokio::time::sleep(std::time::Duration::from_millis(500)).await;
    179                 } else {
    180                     return result.unwrap();
    181                 }
    182             }
    183         }
    184         .await
    185     };
    186 }
    187 
    188 #[macro_export]
    189 macro_rules! retry {
    190     ($expr:expr) => {
    191         $crate::retry_opt! {
    192             $expr.then_some(()).ok_or("failure")
    193         }
    194     };
    195 }
/// Shared context of a single test.
#[derive(Clone)]
pub struct TestCtx {
    /// Test name, also used as the database name passed to the local database
    pub name: String,
    /// Root directory under which the test directory is created
    pub root: PathBuf,
    /// Directory of this test (`root/name`), where log files are written
    pub dir: PathBuf,
    /// Progress bar whose message shows the current step
    pub pb: ProgressBar,
    /// Handle on the local database shared between contexts
    pub db: Arc<LocalDb>,
}
    204 
    205 impl TestCtx {
    206     pub fn new(
    207         root: impl Into<PathBuf>,
    208         name: impl Into<String>,
    209         pb: ProgressBar,
    210         db: Arc<LocalDb>,
    211     ) -> Self {
    212         let root = root.into();
    213         let name = name.into();
    214         // Create log dir
    215         let dir = root.join(&name);
    216         std::fs::create_dir_all(&dir).unwrap();
    217 
    218         Self {
    219             name,
    220             dir,
    221             pb,
    222             db,
    223             root,
    224         }
    225     }
    226 
    227     pub fn log(&self, name: &str) -> PathBuf {
    228         self.dir.join(format!("{name}.log"))
    229     }
    230 
    231     pub fn step(&self, disp: impl Display) {
    232         self.pb.set_message(format!("{disp}"))
    233     }
    234 
    235     /* ----- Database ----- */
    236 
    237     pub fn stop_db(&mut self) {
    238         self.db.stop_db(&self.name);
    239     }
    240 
    241     pub fn resume_db(&mut self) {
    242         self.db.resume_db(&self.name);
    243     }
    244 }
    245 
/// Test context extended with a wire binary, its generated configuration and
/// the processes started from it.
pub struct TalerCtx {
    /// Directory `wire` inside the test directory
    pub wire_dir: PathBuf,
    /// Directory `wire2` inside the test directory
    pub wire2_dir: PathBuf,
    /// Path of the generated `taler.conf`
    pub conf: PathBuf,
    /// Underlying test context, also reachable through `Deref`
    ctx: TestCtx,
    /// Path of the wire binary used for every command
    pub wire_bin_path: String,
    /// Whether the context was created in stressed mode
    stressed: bool,
    /// Process running the `serve` subcommand, once started
    gateway: Option<ChildGuard>,
    /// Base URL of the wire gateway API
    pub gateway_url: String,
    /// Process running the `worker` subcommand, once started
    wire: Option<ChildGuard>,
    /// Second `worker` process, only started in stressed mode
    wire2: Option<ChildGuard>,
    /// Local port the gateway is configured to listen on
    pub gateway_port: u16,
}
    259 
    260 impl TalerCtx {
    261     pub fn new(ctx: &TestCtx, wire_name: impl Into<String>, config: &str, stressed: bool) -> Self {
    262         // Create temporary dir
    263         let dir = ctx.dir.clone();
    264         let conf = dir.join("taler.conf");
    265 
    266         // Create common dirs
    267         let wire_dir = dir.join("wire");
    268         let wire2_dir = dir.join("wire2");
    269         for dir in [&wire_dir, &wire2_dir] {
    270             std::fs::create_dir_all(dir).unwrap();
    271         }
    272 
    273         // Find unused port
    274         let gateway_port = unused_port();
    275         let gateway_url = format!("http://localhost:{gateway_port}/taler-wire-gateway/");
    276 
    277         // Generate taler config from base
    278         let wire_name = wire_name.into();
    279         let config = PathBuf::from_str("testbench/conf").unwrap().join(config);
    280         let mut cfg = ini::Ini::load_from_file(config).unwrap();
    281         cfg.with_section(Some("exchange-accountcredentials-admin"))
    282             .set("WIRE_GATEWAY_URL", &gateway_url);
    283         cfg.with_section(Some(format!("{wire_name}db-postgres")))
    284             .set("CONFIG", ctx.db.postgres_uri(&ctx.name));
    285         cfg.with_section(Some(format!("{wire_name}-httpd")))
    286             .set("PORT", gateway_port.to_string());
    287 
    288         cfg.write_to_file(&conf).unwrap();
    289 
    290         Self {
    291             ctx: ctx.clone(),
    292             gateway_url,
    293             wire_dir,
    294             wire2_dir,
    295             conf,
    296             wire_bin_path: if stressed {
    297                 ctx.root.join(format!("bin/{wire_name}-fail"))
    298             } else {
    299                 ctx.root.join(format!("bin/{wire_name}"))
    300             }
    301             .to_string_lossy()
    302             .to_string(),
    303             stressed,
    304             gateway: None,
    305             wire: None,
    306             wire2: None,
    307             gateway_port,
    308         }
    309     }
    310 
    311     pub fn dbinit(&self) {
    312         self.db.wait_running();
    313         self.db.create_db(&self.ctx.name);
    314         // Init db
    315         cmd_redirect_ok(
    316             &self.wire_bin_path,
    317             &["-c", self.conf.to_string_lossy().as_ref(), "dbinit"],
    318             self.log("cmd"),
    319             "wire dbinit",
    320         );
    321     }
    322 
    323     pub fn reset_db(&self) {
    324         // Reset db
    325         cmd_redirect_ok(
    326             &self.wire_bin_path,
    327             &["-c", self.conf.to_string_lossy().as_ref(), "dbinit", "-r"],
    328             self.log("cmd"),
    329             "wire dbinit reset",
    330         );
    331     }
    332 
    333     pub fn setup(&self) {
    334         // Init db
    335         cmd_redirect_ok(
    336             &self.wire_bin_path,
    337             &["-c", self.conf.to_string_lossy().as_ref(), "setup"],
    338             self.log("cmd"),
    339             "wire setup",
    340         );
    341     }
    342 
    343     pub async fn run(&mut self) {
    344         // Run gateway
    345         self.gateway = Some(cmd_redirect(
    346             &self.wire_bin_path,
    347             &[
    348                 "-c",
    349                 self.conf.to_string_lossy().as_ref(),
    350                 "-L",
    351                 LOG,
    352                 "serve",
    353             ],
    354             self.log("gateway"),
    355         ));
    356 
    357         // Start wires
    358         self.wire = Some(cmd_redirect(
    359             &self.wire_bin_path,
    360             &[
    361                 "-c",
    362                 self.conf.to_string_lossy().as_ref(),
    363                 "-L",
    364                 LOG,
    365                 "worker",
    366             ],
    367             self.log("worker"),
    368         ));
    369         self.wire2 = self.stressed.then(|| {
    370             cmd_redirect(
    371                 &self.wire_bin_path,
    372                 &[
    373                     "-c",
    374                     self.conf.to_string_lossy().as_ref(),
    375                     "-L",
    376                     LOG,
    377                     "worker",
    378                 ],
    379                 self.log("worker+"),
    380             )
    381         });
    382 
    383         // Wait for gateway to be up
    384         retry_opt! {
    385             tokio::net::TcpStream::connect(SocketAddrV4::new(Ipv4Addr::LOCALHOST, self.gateway_port)).await
    386         };
    387     }
    388 
    389     /* ----- Process ----- */
    390 
    391     #[must_use]
    392     pub fn wire_running(&mut self) -> bool {
    393         self.wire.as_mut().unwrap().0.try_wait().unwrap().is_none()
    394     }
    395 
    396     #[must_use]
    397     pub fn gateway_running(&mut self) -> bool {
    398         self.gateway
    399             .as_mut()
    400             .unwrap()
    401             .0
    402             .try_wait()
    403             .unwrap()
    404             .is_none()
    405     }
    406 
    407     /* ----- Wire Gateway -----*/
    408 
    409     pub async fn expect_credits(&self, txs: &[(EddsaPublicKey, Amount)]) {
    410         retry! { check_incoming(&self.gateway_url, txs).await }
    411     }
    412 
    413     pub async fn expect_debits(&self, txs: &[(ShortHashCode, Amount)]) {
    414         retry! { check_outgoing(&self.gateway_url, txs).await }
    415     }
    416 
    417     pub async fn expect_gateway_up(&self) {
    418         retry! { check_gateway_up(&self.gateway_url).await }
    419     }
    420 
    421     pub async fn expect_gateway_down(&self) {
    422         retry! { check_gateway_down(&self.gateway_url).await }
    423     }
    424 }
    425 
/// Gives read access to the fields and methods of the wrapped [`TestCtx`]
/// directly on a `TalerCtx`.
impl Deref for TalerCtx {
    type Target = TestCtx;

    fn deref(&self) -> &Self::Target {
        &self.ctx
    }
}
    433 
/// Gives mutable access to the wrapped [`TestCtx`] directly on a `TalerCtx`.
impl DerefMut for TalerCtx {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.ctx
    }
}
    439 
    440 pub fn unused_port() -> u16 {
    441     TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0))
    442         .unwrap()
    443         .local_addr()
    444         .unwrap()
    445         .port()
    446 }
    447 
/// A PostgreSQL server process started for the tests.
pub struct LocalDb {
    /// Cluster data directory, also used as the `host` of connection URIs
    cluster_dir: String,
    /// Log file receiving the output of the server and of `psql` commands
    cmd_log: String,
    /// The running `postgres` process
    cluster: ChildGuard,
}
    453 
    454 impl LocalDb {
    455     pub fn new(root: &Path) -> Self {
    456         let cluster_dir = root.join("db");
    457         let cluster_log = root.join("postgres.log").to_string_lossy().to_string();
    458         let cmd_log = root.join("postgres-cmd.log").to_string_lossy().to_string();
    459         if !std::fs::exists(&cluster_dir).unwrap() {
    460             // Init databases files
    461             cmd_redirect_ok(
    462                 "initdb",
    463                 &[cluster_dir.to_string_lossy().as_ref()],
    464                 &cluster_log,
    465                 "init_db",
    466             );
    467         }
    468         // Generate database config
    469         std::fs::write(
    470             cluster_dir.join("postgresql.conf"),
    471             format!(
    472                 "
    473                 listen_addresses=''
    474                 unix_socket_directories='{}'
    475                 fsync=off
    476                 synchronous_commit=off
    477                 full_page_writes=off
    478                 ",
    479                 cluster_dir.to_string_lossy().as_ref()
    480             ),
    481         )
    482         .unwrap();
    483         let cluster = cmd_redirect(
    484             "postgres",
    485             &["-D", cluster_dir.to_string_lossy().as_ref()],
    486             &cmd_log,
    487         );
    488         Self {
    489             cluster_dir: cluster_dir.to_string_lossy().to_string(),
    490             cmd_log,
    491             cluster,
    492         }
    493     }
    494 
    495     pub fn postgres_uri(&self, database: &str) -> String {
    496         format!("postgres:///{database}?host={}", self.cluster_dir)
    497     }
    498 
    499     pub fn execute_sql(&self, sql: &str) -> bool {
    500         let mut psql = ChildGuard(
    501             Command::new("psql")
    502                 .arg(self.postgres_uri("postgres"))
    503                 .stderr(Stdio::null())
    504                 .stdout(
    505                     std::fs::File::options()
    506                         .append(true)
    507                         .create(true)
    508                         .open(&self.cmd_log)
    509                         .unwrap(),
    510                 )
    511                 .stdin(Stdio::piped())
    512                 .spawn()
    513                 .unwrap(),
    514         );
    515         psql.0
    516             .stdin
    517             .as_mut()
    518             .unwrap()
    519             .write_all(sql.as_bytes())
    520             .unwrap();
    521         psql.0.wait().unwrap().success()
    522     }
    523 
    524     pub fn create_db(&self, name: &str) {
    525         self.execute_sql(&format!("CREATE DATABASE {name};"));
    526     }
    527 
    528     pub fn stop_db(&self, name: &str) {
    529         self.execute_sql(&format!(
    530             "
    531             UPDATE pg_database SET datallowconn=false WHERE datname='{name}';
    532             SELECT pg_terminate_backend(pid)
    533             FROM pg_stat_activity
    534             WHERE datname='{name}' AND pid <> pg_backend_pid();
    535             "
    536         ));
    537     }
    538     pub fn resume_db(&self, name: &str) {
    539         self.execute_sql(&format!(
    540             "UPDATE pg_database SET datallowconn=true WHERE datname='{name}';"
    541         ));
    542     }
    543 
    544     pub fn wait_running(&self) {
    545         for _ in 0..10 {
    546             if self.execute_sql("SELECT true") {
    547                 break;
    548             }
    549             std::thread::sleep(Duration::from_millis(500))
    550         }
    551     }
    552 }
    553 
    554 pub fn patch_config(from: impl AsRef<Path>, to: impl AsRef<Path>, patch: impl FnOnce(&mut Ini)) {
    555     let mut cfg = ini::Ini::load_from_file(from).unwrap();
    556     patch(&mut cfg);
    557     cfg.write_to_file(to).unwrap();
    558 }