depolymerization

wire gateway for Bitcoin/Ethereum
Log | Files | Refs | Submodules | README | LICENSE

utils.rs (15900B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2022-2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::{
     18     fmt::Display,
     19     io::Write as _,
     20     net::{Ipv4Addr, SocketAddrV4, TcpListener},
     21     ops::{Deref, DerefMut},
     22     path::{Path, PathBuf},
     23     process::{Child, Command, Stdio},
     24     str::FromStr,
     25     sync::Arc,
     26     time::Duration,
     27 };
     28 
     29 use indicatif::ProgressBar;
     30 use ini::Ini;
     31 use taler_common::{
     32     api::{
     33         EddsaPublicKey, ShortHashCode,
     34         wire::{IncomingBankTransaction, IncomingHistory, OutgoingHistory, TransferRequest},
     35     },
     36     types::{amount::Amount, base32::Base32, payto::PaytoURI},
     37 };
     38 use url::Url;
     39 
/// Value passed after the `-L` flag to the gateway and worker processes
/// spawned by `TalerCtx::run` (presumably the log level — confirm against the
/// wire binary's CLI).
const LOG: &str = "DEBUG";
     41 
     42 #[must_use]
     43 pub async fn check_incoming(base_url: &str, txs: &[(EddsaPublicKey, Amount)]) -> bool {
     44     let mut res = ureq::get(&format!("{base_url}history/incoming"))
     45         .query("delta", format!("-{}", 100))
     46         .call()
     47         .unwrap();
     48     if txs.is_empty() {
     49         res.status() == 204
     50     } else {
     51         if res.status() != 200 {
     52             return false;
     53         }
     54         let history: IncomingHistory = res.body_mut().read_json().unwrap();
     55 
     56         history.incoming_transactions.len() == txs.len()
     57             && txs.iter().all(|(reserve_pub_key, taler_amount)| {
     58                 history.incoming_transactions.iter().any(|h| {
     59                     matches!(
     60                         h,
     61                         IncomingBankTransaction::Reserve {
     62                             reserve_pub,
     63                             amount,
     64                             ..
     65                         } if reserve_pub == reserve_pub_key && amount == taler_amount
     66                     )
     67                 })
     68             })
     69     }
     70 }
     71 
     72 #[must_use]
     73 pub async fn check_gateway_down(base_url: &str) -> bool {
     74     matches!(
     75         ureq::get(&format!("{base_url}history/incoming"))
     76             .query("delta", "-5")
     77             .call(),
     78         Err(ureq::Error::StatusCode(504 | 502))
     79     )
     80 }
     81 
     82 #[must_use]
     83 pub async fn check_gateway_up(base_url: &str) -> bool {
     84     ureq::get(&format!("{base_url}config")).call().is_ok()
     85 }
     86 
     87 pub async fn transfer(base_url: &str, wtid: &[u8; 32], credit_account: PaytoURI, amount: &Amount) {
     88     loop {
     89         let res = ureq::post(&format!("{base_url}transfer")).send_json(TransferRequest {
     90             request_uid: Base32::rand(),
     91             amount: *amount,
     92             exchange_base_url: Url::parse("https://exchange.test/").unwrap(),
     93             wtid: Base32::from(*wtid),
     94             credit_account: credit_account.clone(),
     95             metadata: None,
     96         });
     97         if !matches!(res, Err(ureq::Error::StatusCode(502))) {
     98             res.unwrap();
     99             break;
    100         }
    101     }
    102 }
    103 
    104 #[must_use]
    105 pub async fn check_outgoing(base_url: &str, txs: &[(ShortHashCode, Amount)]) -> bool {
    106     let mut res = ureq::get(format!("{base_url}history/outgoing"))
    107         .query("delta", format!("-{}", txs.len()))
    108         .call()
    109         .unwrap();
    110     if txs.is_empty() {
    111         res.status() == 204
    112     } else {
    113         if res.status() != 200 {
    114             return false;
    115         }
    116         let history: OutgoingHistory = res.body_mut().read_json().unwrap();
    117 
    118         history.outgoing_transactions.len() == txs.len()
    119             && txs.iter().all(|(wtid, amount)| {
    120                 history
    121                     .outgoing_transactions
    122                     .iter()
    123                     .any(|h| h.wtid == *wtid && &h.amount == amount)
    124             })
    125     }
    126 }
    127 pub struct ChildGuard(pub Child);
    128 
    129 impl Drop for ChildGuard {
    130     fn drop(&mut self) {
    131         self.0.kill().ok();
    132     }
    133 }
    134 
    135 #[track_caller]
    136 pub fn try_cmd_redirect(
    137     cmd: &str,
    138     args: &[&str],
    139     path: impl AsRef<Path>,
    140 ) -> std::io::Result<ChildGuard> {
    141     let log_file = std::fs::OpenOptions::new()
    142         .create(true)
    143         .append(true)
    144         .open(path)?;
    145 
    146     let child = Command::new(cmd)
    147         .args(args)
    148         .stderr(log_file.try_clone()?)
    149         .stdout(log_file)
    150         .stdin(Stdio::null())
    151         .spawn()?;
    152     Ok(ChildGuard(child))
    153 }
    154 
    155 #[track_caller]
    156 pub fn cmd_redirect(cmd: &str, args: &[&str], path: impl AsRef<Path>) -> ChildGuard {
    157     try_cmd_redirect(cmd, args, path).unwrap()
    158 }
    159 
    160 #[track_caller]
    161 pub fn cmd_ok(mut child: ChildGuard, name: &str) {
    162     let result = child.0.wait().unwrap();
    163     if !result.success() {
    164         panic!("cmd {name} failed");
    165     }
    166 }
    167 
    168 #[track_caller]
    169 pub fn cmd_redirect_ok(cmd: &str, args: &[&str], path: impl AsRef<Path>, name: &str) {
    170     cmd_ok(cmd_redirect(cmd, args, path), name)
    171 }
    172 
    173 #[macro_export]
    174 macro_rules! retry_opt {
    175     ($expr:expr) => {
    176         async {
    177             let start = std::time::Instant::now();
    178             loop {
    179                 let result = $expr.await;
    180                 if result.is_err() && start.elapsed() < std::time::Duration::from_secs(30) {
    181                     tokio::time::sleep(std::time::Duration::from_millis(300)).await;
    182                 } else {
    183                     return result.unwrap();
    184                 }
    185             }
    186         }
    187         .await
    188     };
    189 }
    190 
    191 #[macro_export]
    192 macro_rules! retry {
    193     ($expr:expr) => {
    194         $crate::retry_opt! {
    195             async { $expr.await.then_some(()).ok_or("failure") }
    196         }
    197     };
    198 }
    199 
    200 #[derive(Clone)]
    201 pub struct TestCtx {
    202     pub name: String,
    203     pub root: PathBuf,
    204     pub dir: PathBuf,
    205     pub pb: ProgressBar,
    206     pub db: Arc<LocalDb>,
    207 }
    208 
    209 impl TestCtx {
    210     pub fn new(
    211         root: impl Into<PathBuf>,
    212         name: impl Into<String>,
    213         pb: ProgressBar,
    214         db: Arc<LocalDb>,
    215     ) -> Self {
    216         let root = root.into();
    217         let name = name.into();
    218         // Create log dir
    219         let dir = root.join(&name);
    220         std::fs::create_dir_all(&dir).unwrap();
    221 
    222         Self {
    223             name,
    224             dir,
    225             pb,
    226             db,
    227             root,
    228         }
    229     }
    230 
    231     pub fn log(&self, name: &str) -> PathBuf {
    232         self.dir.join(format!("{name}.log"))
    233     }
    234 
    235     pub fn step(&self, disp: impl Display) {
    236         self.pb.set_message(format!("{disp}"))
    237     }
    238 
    239     /* ----- Database ----- */
    240 
    241     pub fn stop_db(&mut self) {
    242         self.db.stop_db(&self.name);
    243     }
    244 
    245     pub fn resume_db(&mut self) {
    246         self.db.resume_db(&self.name);
    247     }
    248 }
    249 
    250 pub struct TalerCtx {
    251     pub wire_dir: PathBuf,
    252     pub wire2_dir: PathBuf,
    253     pub conf: PathBuf,
    254     ctx: TestCtx,
    255     pub wire_bin_path: String,
    256     stressed: bool,
    257     gateway: Option<ChildGuard>,
    258     pub gateway_url: String,
    259     wire: Option<ChildGuard>,
    260     wire2: Option<ChildGuard>,
    261     pub gateway_port: u16,
    262 }
    263 
    264 impl TalerCtx {
    265     pub fn new(ctx: &TestCtx, wire_name: impl Into<String>, config: &str, stressed: bool) -> Self {
    266         // Create temporary dir
    267         let dir = ctx.dir.clone();
    268         let conf = dir.join("taler.conf");
    269 
    270         // Create common dirs
    271         let wire_dir = dir.join("wire");
    272         let wire2_dir = dir.join("wire2");
    273         for dir in [&wire_dir, &wire2_dir] {
    274             std::fs::create_dir_all(dir).unwrap();
    275         }
    276 
    277         // Find unused port
    278         let gateway_port = unused_port();
    279         let gateway_url = format!("http://localhost:{gateway_port}/taler-wire-gateway/");
    280 
    281         // Generate taler config from base
    282         let wire_name = wire_name.into();
    283         let config = PathBuf::from_str("testbench/conf").unwrap().join(config);
    284         let mut cfg = ini::Ini::load_from_file(config).unwrap();
    285         cfg.with_section(Some("exchange-accountcredentials-admin"))
    286             .set("WIRE_GATEWAY_URL", &gateway_url);
    287         cfg.with_section(Some(format!("{wire_name}db-postgres")))
    288             .set("CONFIG", ctx.db.postgres_uri(&ctx.name));
    289         cfg.with_section(Some(format!("{wire_name}-httpd")))
    290             .set("PORT", gateway_port.to_string());
    291 
    292         cfg.write_to_file(&conf).unwrap();
    293 
    294         Self {
    295             ctx: ctx.clone(),
    296             gateway_url,
    297             wire_dir,
    298             wire2_dir,
    299             conf,
    300             wire_bin_path: if stressed {
    301                 ctx.root.join(format!("bin/{wire_name}-fail"))
    302             } else {
    303                 ctx.root.join(format!("bin/{wire_name}"))
    304             }
    305             .to_string_lossy()
    306             .to_string(),
    307             stressed,
    308             gateway: None,
    309             wire: None,
    310             wire2: None,
    311             gateway_port,
    312         }
    313     }
    314 
    315     pub fn dbinit(&self) {
    316         self.db.wait_running();
    317         self.db.create_db(&self.ctx.name);
    318         // Init db
    319         cmd_redirect_ok(
    320             &self.wire_bin_path,
    321             &["-c", self.conf.to_string_lossy().as_ref(), "dbinit"],
    322             self.log("cmd"),
    323             "wire dbinit",
    324         );
    325     }
    326 
    327     pub fn reset_db(&self) {
    328         // Reset db
    329         self.db.execute_sql(
    330             "
    331             FOR r IN (
    332                     SELECT schemaname, tablename
    333                     FROM pg_tables
    334                     WHERE schemaname NOT IN ('pg_catalog', 'information_schema')
    335                 ) LOOP
    336                     EXECUTE format(
    337                         'TRUNCATE TABLE %I.%I RESTART IDENTITY CASCADE',
    338                         r.schemaname,
    339                         r.tablename
    340                     );
    341                 END LOOP;
    342             ",
    343         );
    344     }
    345 
    346     pub fn setup(&self) {
    347         // Init db
    348         cmd_redirect_ok(
    349             &self.wire_bin_path,
    350             &["-c", self.conf.to_string_lossy().as_ref(), "setup", "-r"],
    351             self.log("cmd"),
    352             "wire setup",
    353         );
    354     }
    355 
    356     pub async fn run(&mut self) {
    357         // Run gateway
    358         self.gateway = Some(cmd_redirect(
    359             &self.wire_bin_path,
    360             &[
    361                 "-c",
    362                 self.conf.to_string_lossy().as_ref(),
    363                 "-L",
    364                 LOG,
    365                 "serve",
    366             ],
    367             self.log("gateway"),
    368         ));
    369 
    370         // Start wires
    371         self.wire = Some(cmd_redirect(
    372             &self.wire_bin_path,
    373             &[
    374                 "-c",
    375                 self.conf.to_string_lossy().as_ref(),
    376                 "-L",
    377                 LOG,
    378                 "worker",
    379             ],
    380             self.log("worker"),
    381         ));
    382         self.wire2 = self.stressed.then(|| {
    383             cmd_redirect(
    384                 &self.wire_bin_path,
    385                 &[
    386                     "-c",
    387                     self.conf.to_string_lossy().as_ref(),
    388                     "-L",
    389                     LOG,
    390                     "worker",
    391                 ],
    392                 self.log("worker+"),
    393             )
    394         });
    395 
    396         // Wait for gateway to be up
    397         retry_opt! {
    398             tokio::net::TcpStream::connect(SocketAddrV4::new(Ipv4Addr::LOCALHOST, self.gateway_port))
    399         };
    400     }
    401 
    402     /* ----- Process ----- */
    403 
    404     #[must_use]
    405     pub fn wire_running(&mut self) -> bool {
    406         self.wire.as_mut().unwrap().0.try_wait().unwrap().is_none()
    407     }
    408 
    409     #[must_use]
    410     pub fn gateway_running(&mut self) -> bool {
    411         self.gateway
    412             .as_mut()
    413             .unwrap()
    414             .0
    415             .try_wait()
    416             .unwrap()
    417             .is_none()
    418     }
    419 
    420     /* ----- Wire Gateway -----*/
    421 
    422     pub async fn expect_credits(&self, txs: &[(EddsaPublicKey, Amount)]) -> bool {
    423         check_incoming(&self.gateway_url, txs).await
    424     }
    425 
    426     pub async fn expect_debits(&self, txs: &[(ShortHashCode, Amount)]) -> bool {
    427         check_outgoing(&self.gateway_url, txs).await
    428     }
    429 
    430     pub async fn expect_gateway_up(&self) {
    431         retry!(check_gateway_up(&self.gateway_url))
    432     }
    433 
    434     pub async fn expect_gateway_down(&self) {
    435         retry!(check_gateway_down(&self.gateway_url))
    436     }
    437 }
    438 
    439 impl Deref for TalerCtx {
    440     type Target = TestCtx;
    441 
    442     fn deref(&self) -> &Self::Target {
    443         &self.ctx
    444     }
    445 }
    446 
    447 impl DerefMut for TalerCtx {
    448     fn deref_mut(&mut self) -> &mut Self::Target {
    449         &mut self.ctx
    450     }
    451 }
    452 
    453 pub fn unused_port() -> u16 {
    454     TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0))
    455         .unwrap()
    456         .local_addr()
    457         .unwrap()
    458         .port()
    459 }
    460 
    461 pub struct LocalDb {
    462     cluster_dir: String,
    463     cmd_log: String,
    464     cluster: ChildGuard,
    465 }
    466 
    467 impl LocalDb {
    468     pub fn new(root: &Path) -> Self {
    469         let cluster_dir = root.join("db");
    470         let cluster_log = root.join("postgres.log").to_string_lossy().to_string();
    471         let cmd_log = root.join("postgres-cmd.log").to_string_lossy().to_string();
    472         if !std::fs::exists(&cluster_dir).unwrap() {
    473             // Init databases files
    474             cmd_redirect_ok(
    475                 "initdb",
    476                 &[cluster_dir.to_string_lossy().as_ref()],
    477                 &cluster_log,
    478                 "init_db",
    479             );
    480         }
    481         // Generate database config
    482         std::fs::write(
    483             cluster_dir.join("postgresql.conf"),
    484             format!(
    485                 "
    486                 listen_addresses=''
    487                 unix_socket_directories='{}'
    488                 fsync=off
    489                 synchronous_commit=off
    490                 full_page_writes=off
    491                 ",
    492                 cluster_dir.to_string_lossy().as_ref()
    493             ),
    494         )
    495         .unwrap();
    496         let cluster = cmd_redirect(
    497             "postgres",
    498             &["-D", cluster_dir.to_string_lossy().as_ref()],
    499             &cmd_log,
    500         );
    501         Self {
    502             cluster_dir: cluster_dir.to_string_lossy().to_string(),
    503             cmd_log,
    504             cluster,
    505         }
    506     }
    507 
    508     pub fn postgres_uri(&self, database: &str) -> String {
    509         format!("postgres:///{database}?host={}", self.cluster_dir)
    510     }
    511 
    512     pub fn execute_sql(&self, sql: &str) -> bool {
    513         let mut psql = ChildGuard(
    514             Command::new("psql")
    515                 .arg(self.postgres_uri("postgres"))
    516                 .stderr(Stdio::null())
    517                 .stdout(
    518                     std::fs::File::options()
    519                         .append(true)
    520                         .create(true)
    521                         .open(&self.cmd_log)
    522                         .unwrap(),
    523                 )
    524                 .stdin(Stdio::piped())
    525                 .spawn()
    526                 .unwrap(),
    527         );
    528         psql.0
    529             .stdin
    530             .as_mut()
    531             .unwrap()
    532             .write_all(sql.as_bytes())
    533             .unwrap();
    534         psql.0.wait().unwrap().success()
    535     }
    536 
    537     pub fn create_db(&self, name: &str) {
    538         self.execute_sql(&format!("CREATE DATABASE {name};"));
    539     }
    540 
    541     pub fn stop_db(&self, name: &str) {
    542         self.execute_sql(&format!(
    543             "
    544             UPDATE pg_database SET datallowconn=false WHERE datname='{name}';
    545             SELECT pg_terminate_backend(pid)
    546             FROM pg_stat_activity
    547             WHERE datname='{name}' AND pid <> pg_backend_pid();
    548             "
    549         ));
    550     }
    551     pub fn resume_db(&self, name: &str) {
    552         self.execute_sql(&format!(
    553             "UPDATE pg_database SET datallowconn=true WHERE datname='{name}';"
    554         ));
    555     }
    556 
    557     pub fn wait_running(&self) {
    558         for _ in 0..10 {
    559             if self.execute_sql("SELECT true") {
    560                 break;
    561             }
    562             std::thread::sleep(Duration::from_millis(500))
    563         }
    564     }
    565 }
    566 
    567 pub fn patch_config(from: impl AsRef<Path>, to: impl AsRef<Path>, patch: impl FnOnce(&mut Ini)) {
    568     let mut cfg = ini::Ini::load_from_file(from).unwrap();
    569     patch(&mut cfg);
    570     cfg.write_to_file(to).unwrap();
    571 }