depolymerization

wire gateway for Bitcoin/Ethereum
Log | Files | Refs | Submodules | README | LICENSE

main.rs (18265B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2022-2025 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::{
     18     borrow::Cow,
     19     panic::UnwindSafe,
     20     path::{Path, PathBuf},
     21     str::FromStr,
     22     string::String,
     23     sync::{Arc, Mutex},
     24     time::{Duration, Instant},
     25 };
     26 
     27 use bitcoin::{Address, Txid, address::NetworkUnchecked};
     28 use clap::{Parser, ValueEnum};
     29 use depolymerizer_bitcoin::{
     30     CONFIG_SOURCE, DB_SCHEMA,
     31     cli::{Command, run},
     32     config::{WalletCfg, WorkerCfg, parse_db_cfg},
     33     db,
     34     payto::BtcWallet,
     35     rpc::{Error, ErrorCode, Rpc, rpc_common, rpc_wallet},
     36     taler_utils::taler_to_btc,
     37 };
     38 use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
     39 use owo_colors::OwoColorize;
     40 use reedline::{Prompt, Reedline, Signal};
     41 use sqlx::PgPool;
     42 use taler_common::{
     43     api_common::{EddsaPublicKey, HashCode, ShortHashCode},
     44     api_wire::TransferRequest,
     45     config::Config,
     46     db::pool,
     47     log::taler_logger,
     48     types::{amount::amount, payto::FullPayto},
     49 };
     50 use tokio::task::{JoinError, JoinHandle};
     51 use tracing::{error, info};
     52 use tracing_subscriber::util::SubscriberInitExt as _;
     53 use url::Url;
     54 
     55 use crate::utils::{ChildGuard, LocalDb, TestCtx, cmd_redirect, patch_config, try_cmd_redirect};
     56 
     57 mod btc;
     58 mod utils;
     59 
     60 /// Depolymerizer instrumentation test
     61 #[derive(clap::Parser, Debug)]
     62 enum Testbench {
     63     /// Start instrumentation tests for offline testing
     64     Instrumentation {
     65         /// With tests to run
     66         #[clap(global = true, default_value = "")]
     67         filters: Vec<String>,
     68     },
     69     /// Start bitcoin testbench for online testing
     70     Bitcoin { network: Network },
     71 }
     72 
     73 struct Tmp<'a> {
     74     root: &'a Path,
     75     filters: &'a [String],
     76     m: MultiProgress,
     77     start_style: ProgressStyle,
     78     ok_style: ProgressStyle,
     79     err_style: ProgressStyle,
     80     tasks: Vec<(
     81         &'static str,
     82         JoinHandle<(Result<(), JoinError>, Duration, String)>,
     83     )>,
     84     db: Arc<LocalDb>,
     85     start: Instant,
     86 }
     87 
     88 impl<'a> Tmp<'a> {
     89     async fn check<T, F>(&mut self, name: &'static str, task: T)
     90     where
     91         T: FnOnce(TestCtx) -> F + Send + UnwindSafe + 'static,
     92         F: Future<Output = ()> + Send + 'static,
     93     {
     94         if self.filters.is_empty() || self.filters.iter().any(|f| name.starts_with(f)) {
     95             let pb = self.m.add(ProgressBar::new_spinner());
     96             pb.set_style(self.start_style.clone());
     97             pb.set_prefix(name);
     98             pb.set_message("Init");
     99             pb.enable_steady_tick(Duration::from_millis(1000));
    100             let ok_style = self.ok_style.clone();
    101             let err_style = self.err_style.clone();
    102             let start = self.start;
    103             let db = self.db.clone();
    104             let ctx: TestCtx = TestCtx::new(self.root, name, pb.clone(), db);
    105             self.tasks.push((
    106                 name,
    107                 tokio::spawn(async move {
    108                     let result = tokio::spawn(task(ctx)).await;
    109                     if result.is_ok() {
    110                         pb.set_style(ok_style.clone());
    111                         pb.finish_with_message("OK");
    112                     } else {
    113                         pb.set_style(err_style.clone());
    114                         pb.finish();
    115                     }
    116                     (result, start.elapsed(), pb.message())
    117                 }),
    118             ));
    119         }
    120     }
    121 }
    122 
    123 #[tokio::main]
    124 pub async fn main() {
    125     taler_logger(None).init();
    126     match Testbench::parse() {
    127         Testbench::Instrumentation { filters } => instrumentation(filters).await,
    128         Testbench::Bitcoin { network } => bitcoin(network).await,
    129     }
    130 }
    131 
    132 #[derive(Debug, Clone, ValueEnum, PartialEq, Eq)]
    133 enum Network {
    134     Local,
    135     Test,
    136 }
    137 struct BtcEnv {
    138     network: Network,
    139     db: LocalDb,
    140     bitcoind: ChildGuard,
    141     wire_rpc: Rpc,
    142     client_rpc: Rpc,
    143     cfg: Config,
    144     wire_addr: Address,
    145     client_addr: Address,
    146     tracked: Vec<Txid>,
    147     pool: PgPool,
    148 }
    149 
    150 impl BtcEnv {
    151     pub async fn init(network: Network) -> Self {
    152         let network_dir = match network {
    153             Network::Local => "btc-local",
    154             Network::Test => "btc-test",
    155         };
    156         println!("Setup bitcoin {network:?} env in {network_dir}");
    157         let root_dir = PathBuf::from_str("testbench/env")
    158             .unwrap()
    159             .join(network_dir);
    160         std::fs::create_dir_all(root_dir.join("bitcoin")).unwrap();
    161         let root_dir = root_dir.canonicalize().unwrap();
    162 
    163         // Generate bitcoind config
    164         let cfg = match network {
    165             Network::Local => {
    166                 "
    167                 chain=regtest
    168             txindex=1
    169             rpcservertimeout=0
    170             fallbackfee=0.00000001
    171 
    172             [regtest]
    173             rpcport=18345
    174             "
    175             }
    176             Network::Test => {
    177                 "
    178             chain=signet
    179             txindex=1
    180             rpcservertimeout=0
    181             fallbackfee=0.000001
    182             [signet]
    183             rpcport=18345
    184             "
    185             }
    186         };
    187         std::fs::write(root_dir.join("bitcoin/bitcoin.conf"), cfg).unwrap();
    188         let datadir = match network {
    189             Network::Local => root_dir.join("bitcoin").join("regtest"),
    190             Network::Test => root_dir.join("bitcoin").join("signet"),
    191         };
    192 
    193         // Start database
    194         let db = LocalDb::new(&root_dir);
    195 
    196         // Generate Taler config
    197         let taler_cfg_path = root_dir.join("taler.conf");
    198         patch_config(
    199             "testbench/conf/taler_btc_dev.conf",
    200             &taler_cfg_path,
    201             |ini| {
    202                 ini.with_section(Some("depolymerizer-bitcoin-worker"))
    203                     .set("RPC_COOKIE_FILE", datadir.to_string_lossy());
    204                 ini.with_section(Some("depolymerizer-bitcoindb-postgres"))
    205                     .set("CONFIG", db.postgres_uri("depolymerizer_bitcoin"));
    206             },
    207         );
    208 
    209         // Start node
    210         let bitcoind = cmd_redirect(
    211             "bitcoind",
    212             &[&format!(
    213                 "-datadir={}",
    214                 root_dir.join("bitcoin").to_string_lossy()
    215             )],
    216             root_dir.join("bitcoind.log"),
    217         );
    218         let cfg = Config::from_file(CONFIG_SOURCE, Some(&taler_cfg_path)).unwrap();
    219         let worker_cfg = WorkerCfg::parse(&cfg).unwrap();
    220         let mut rpc = retry_opt! { rpc_common(&worker_cfg.rpc_cfg).await };
    221 
    222         for wallet in ["wire", "client"] {
    223             loop {
    224                 let res = rpc.load_wallet(wallet).await;
    225                 if let Err(Error::RPC { code, msg: _ }) = res {
    226                     match code {
    227                         ErrorCode::RpcInWarmup => continue,
    228                         ErrorCode::RpcWalletNotFound => {
    229                             rpc.create_wallet(wallet, "").await.unwrap();
    230                             break;
    231                         }
    232                         _ => {
    233                             res.unwrap();
    234                         }
    235                     }
    236                 } else {
    237                     break;
    238                 }
    239             }
    240         }
    241         drop(rpc);
    242         let mut wire_rpc = rpc_wallet(
    243             &worker_cfg.rpc_cfg,
    244             &WalletCfg {
    245                 name: "wire".to_string(),
    246                 password: None,
    247             },
    248         )
    249         .await
    250         .unwrap();
    251         let mut client_rpc = rpc_wallet(
    252             &worker_cfg.rpc_cfg,
    253             &WalletCfg {
    254                 name: "client".to_string(),
    255                 password: None,
    256             },
    257         )
    258         .await
    259         .unwrap();
    260         let wire_addr = wire_rpc.gen_addr().await.unwrap();
    261         let client_addr = client_rpc.gen_addr().await.unwrap();
    262         patch_config(&taler_cfg_path, &taler_cfg_path, |ini| {
    263             ini.with_section(Some("depolymerizer-bitcoin"))
    264                 .set("WALLET", wire_addr.to_string());
    265         });
    266         let cfg = Config::from_file(CONFIG_SOURCE, Some(taler_cfg_path)).unwrap();
    267 
    268         // Wait for db to start
    269         db.wait_running();
    270         db.create_db("depolymerizer_bitcoin");
    271 
    272         // Dbinit & setup
    273         run(Command::Dbinit { reset: false }, &cfg).await.unwrap();
    274         run(Command::Setup { reset: false }, &cfg).await.unwrap();
    275 
    276         let db_cg = parse_db_cfg(&cfg).unwrap();
    277         Self {
    278             pool: pool(db_cg.cfg, DB_SCHEMA).await.unwrap(),
    279             wire_addr,
    280             client_addr,
    281             network,
    282             db,
    283             bitcoind,
    284             cfg,
    285             wire_rpc,
    286             client_rpc,
    287             tracked: Vec::new(),
    288         }
    289     }
    290 }
    291 
    292 #[derive(clap::Parser, Debug)]
    293 enum Shell {
    294     Setup,
    295     Reset,
    296     ResetDb,
    297     Sync,
    298     Credit,
    299     Debit,
    300     Mine {
    301         amount: Option<u16>,
    302         addr: Option<Address<NetworkUnchecked>>,
    303     },
    304     Exit,
    305     Tx {
    306         txid: Txid,
    307     },
    308     Track {
    309         txid: Txid,
    310     },
    311     Untrack {
    312         txid: Txid,
    313     },
    314 }
    315 
    316 async fn run_cmd(env: &mut BtcEnv, buffer: &str) -> anyhow::Result<bool> {
    317     if buffer.is_empty() {
    318         return Ok(false);
    319     }
    320     let cmd = Shell::try_parse_from(std::iter::once("shell").chain(buffer.split(' ')))?;
    321     match cmd {
    322         Shell::Setup => run(Command::Setup { reset: false }, &env.cfg).await?,
    323         Shell::Reset => run(Command::Setup { reset: true }, &env.cfg).await?,
    324         Shell::ResetDb => {
    325             run(Command::Dbinit { reset: true }, &env.cfg).await?;
    326             run(Command::Setup { reset: false }, &env.cfg).await?;
    327         }
    328         Shell::Sync => run(Command::Worker { transient: true }, &env.cfg).await?,
    329         Shell::Credit => {
    330             let reserve_pub = EddsaPublicKey::rand();
    331             let amount = amount("DEVBTC:0.00011");
    332             let txid = env
    333                 .client_rpc
    334                 .send_segwit_key(&env.wire_addr, &taler_to_btc(&amount), &reserve_pub)
    335                 .await?;
    336             env.tracked.push(txid);
    337             info!(target: "testbench", "Credit {reserve_pub} {amount} {txid} to {}", env.wire_addr);
    338         }
    339         Shell::Debit => {
    340             let creditor = FullPayto::new(BtcWallet(env.client_addr.clone()), "client".to_string());
    341             let wtid = ShortHashCode::rand();
    342             let amount = amount("DEVBTC:0.0001");
    343             let transfer = TransferRequest {
    344                 request_uid: HashCode::rand(),
    345                 amount: amount.clone(),
    346                 exchange_base_url: Url::parse("https://test.com/").unwrap(),
    347                 wtid: wtid.clone(),
    348                 credit_account: creditor.as_payto(),
    349             };
    350             db::transfer(&env.pool, &creditor, &transfer).await?;
    351             info!(target: "testbench", "Debit {wtid} {amount} to {}", env.wire_addr);
    352         }
    353         Shell::Mine { amount, addr } => {
    354             let amount = amount.unwrap_or(1);
    355             let addr = addr.map(|a| a.assume_checked());
    356             let addr = addr.as_ref().unwrap_or(&env.client_addr);
    357             env.client_rpc.mine(amount, addr).await?;
    358         }
    359         Shell::Exit => return Ok(true),
    360         Shell::Tx { txid } => {
    361             let info = env.client_rpc.get_tx(&txid).await?;
    362             info!(target: "testbench", "{txid} {} {}", info.amount, info.confirmations);
    363         }
    364         Shell::Track { txid } => {
    365             env.tracked.push(txid);
    366         }
    367         Shell::Untrack { txid } => {
    368             env.tracked.retain(|id| *id != txid);
    369         }
    370     }
    371     Ok(false)
    372 }
    373 
    374 struct TestBenchPrompt {
    375     name: String,
    376     progress: String,
    377 }
    378 
    379 impl Prompt for TestBenchPrompt {
    380     fn render_prompt_left(&self) -> Cow<'_, str> {
    381         Cow::Borrowed(&self.name)
    382     }
    383 
    384     fn render_prompt_right(&self) -> Cow<'_, str> {
    385         Cow::Borrowed(&self.progress)
    386     }
    387 
    388     fn render_prompt_indicator(&self, _: reedline::PromptEditMode) -> Cow<'_, str> {
    389         Cow::Borrowed(">")
    390     }
    391 
    392     fn render_prompt_multiline_indicator(&self) -> Cow<'_, str> {
    393         Cow::Borrowed(":")
    394     }
    395 
    396     fn render_prompt_history_search_indicator(
    397         &self,
    398         _: reedline::PromptHistorySearch,
    399     ) -> Cow<'_, str> {
    400         Cow::Borrowed(">")
    401     }
    402 }
    403 
    404 async fn bitcoin(network: Network) {
    405     let mut env = BtcEnv::init(network).await;
    406 
    407     let mut line_editor = Reedline::create();
    408     loop {
    409         let info = env.client_rpc.get_blockchain_info().await.unwrap();
    410         let wire_balance = env.wire_rpc.get_balance().await.unwrap();
    411         println!("wire {} {wire_balance}", env.wire_addr);
    412         let client_balance = env.client_rpc.get_balance().await.unwrap();
    413         println!("client {} {client_balance}", env.client_addr);
    414         for txid in &env.tracked {
    415             match env.client_rpc.get_tx(txid).await {
    416                 Ok(info) => println!(
    417                     "{} {txid} {} {}",
    418                     "tx".cyan(),
    419                     info.amount,
    420                     info.confirmations
    421                 ),
    422                 Err(e) => println!("{} {txid} {}", "tx".cyan(), e.red()),
    423             }
    424         }
    425         let prompt = TestBenchPrompt {
    426             name: info.chain,
    427             progress: format!("{:.6}", info.verification_progress),
    428         };
    429         let sig = line_editor.read_line(&prompt).unwrap();
    430         match sig {
    431             Signal::Success(buffer) => match run_cmd(&mut env, &buffer).await {
    432                 Ok(exit) => {
    433                     if exit {
    434                         break;
    435                     }
    436                 }
    437                 Err(e) => error!(target: "testbench", "{e}"),
    438             },
    439             Signal::CtrlC | Signal::CtrlD => break,
    440         }
    441     }
    442 }
    443 
    444 pub async fn instrumentation(filters: Vec<String>) {
    445     let root = PathBuf::from_str("testbench/instrumentation").unwrap();
    446     std::fs::remove_dir_all(&root).ok();
    447     std::fs::create_dir_all(root.join("bin")).unwrap();
    448     let root = root.canonicalize().unwrap();
    449 
    450     // Set panic hook
    451     let failures = Arc::new(Mutex::new(Vec::new()));
    452     {
    453         let failures = failures.clone();
    454         std::panic::set_hook(Box::new(move |e| {
    455             let backtrace = std::backtrace::Backtrace::force_capture();
    456             let info = format!("{e}\n{backtrace}");
    457             if let Some(id) = tokio::task::try_id() {
    458                 failures.lock().unwrap().push((id, info));
    459             } else {
    460                 eprintln!("Failed outside of a task:\n{info}")
    461             }
    462         }));
    463     }
    464 
    465     // Build binaries
    466     let p = ProgressBar::new_spinner();
    467     p.set_style(ProgressStyle::with_template("building {msg} {elapsed:.dim}").unwrap());
    468     p.enable_steady_tick(Duration::from_millis(1000));
    469     for name in ["depolymerizer-bitcoin"] {
    470         build_bin(&root, &p, name, None, name);
    471         build_bin(&root, &p, name, Some("fail"), &format!("{name}-fail"));
    472     }
    473     p.finish_and_clear();
    474 
    475     // Run tests
    476     let m = MultiProgress::new();
    477     let start_style =
    478         ProgressStyle::with_template("{prefix:.magenta} {msg} {elapsed:.dim}").unwrap();
    479     let ok_style =
    480         ProgressStyle::with_template("{prefix:.magenta} {msg:.green} {elapsed:.dim}").unwrap();
    481     let err_style =
    482         ProgressStyle::with_template("{prefix:.magenta} {msg:.red} {elapsed:.dim}").unwrap();
    483 
    484     let start = Instant::now();
    485     let db = Arc::new(LocalDb::new(&root));
    486     let mut tmp = Tmp {
    487         root: &root,
    488         filters: filters.as_slice(),
    489         m,
    490         start_style,
    491         ok_style,
    492         err_style,
    493         tasks: Vec::new(),
    494         db,
    495         start,
    496     };
    497     tmp.check("btc_wire", btc::wire).await;
    498     tmp.check("btc_lifetime", btc::lifetime).await;
    499     tmp.check("btc_reconnect", btc::reconnect).await;
    500     tmp.check("btc_stress", btc::stress).await;
    501     tmp.check("btc_conflict", btc::conflict).await;
    502     tmp.check("btc_reorg", btc::reorg).await;
    503     tmp.check("btc_hell", btc::hell).await;
    504     tmp.check("btc_analysis", btc::analysis).await;
    505     tmp.check("btc_bumpfee", btc::bumpfee).await;
    506     tmp.check("btc_maxfee", btc::maxfee).await;
    507     tmp.check("btc_config", btc::config).await;
    508     let mut results = Vec::new();
    509     for (name, task) in tmp.tasks {
    510         results.push((name, task.await.unwrap()));
    511     }
    512 
    513     let len = results.len();
    514 
    515     tmp.m.clear().unwrap();
    516     let failures = failures.lock().unwrap();
    517     for (name, (result, _, msg)) in &results {
    518         if let Err(e) = result
    519             && let Some((_, err)) = failures.iter().find(|(id, _)| *id == e.id())
    520         {
    521             println!("{} {}\n{}", name.magenta(), msg.red(), err.bright_black());
    522         }
    523     }
    524     for (name, (result, time, msg)) in results {
    525         match result {
    526             Ok(_) => {
    527                 println!(
    528                     "{} {} {}",
    529                     name.magenta(),
    530                     "OK".green(),
    531                     format_args!("{}s", time.as_secs()).bright_black()
    532                 );
    533             }
    534             Err(_) => {
    535                 println!(
    536                     "{} {} {}",
    537                     name.magenta(),
    538                     msg.red(),
    539                     format_args!("{}s", time.as_secs()).bright_black()
    540                 );
    541             }
    542         }
    543     }
    544     println!("{} tests in {}s", len, start.elapsed().as_secs());
    545 }
    546 
    547 pub fn build_bin(root: &Path, p: &ProgressBar, name: &str, features: Option<&str>, bin_name: &str) {
    548     p.set_message(bin_name.to_string());
    549     let mut args = vec!["build", "--bin", name, "--release"];
    550     if let Some(features) = features {
    551         args.extend_from_slice(&["--features", features]);
    552     }
    553     let result = try_cmd_redirect("cargo", &args, root.join("bin/build"))
    554         .unwrap()
    555         .0
    556         .wait()
    557         .unwrap();
    558     assert!(result.success());
    559     std::fs::rename(
    560         format!("target/release/{name}"),
    561         root.join("bin").join(bin_name),
    562     )
    563     .unwrap();
    564 }