depolymerization

wire gateway for Bitcoin/Ethereum
Log | Files | Refs | Submodules | README | LICENSE

commit 7e023efbd0dc93dcdee840092e53b000def844cc
parent 1bca69267cbf96aba82ea031a8c15ebacd1a28bd
Author: Antoine A <>
Date:   Mon, 28 Jul 2025 13:12:55 +0200

bitcoin: add testbench for online testing and improve setup

Diffstat:
M.gitignore | 3++-
MCargo.lock | 333+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--------
MCargo.toml | 3++-
Mdepolymerizer-bitcoin/src/api.rs | 2+-
Adepolymerizer-bitcoin/src/cli.rs | 149+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mdepolymerizer-bitcoin/src/db.rs | 14++++++++++----
Mdepolymerizer-bitcoin/src/lib.rs | 4++++
Mdepolymerizer-bitcoin/src/loops.rs | 4+---
Mdepolymerizer-bitcoin/src/loops/analysis.rs | 6++++--
Mdepolymerizer-bitcoin/src/loops/watcher.rs | 5+++--
Mdepolymerizer-bitcoin/src/loops/worker.rs | 135+++++++++++++++++++++++++++++++++++++++++++++++++++----------------------------
Mdepolymerizer-bitcoin/src/main.rs | 172++++---------------------------------------------------------------------------
Mdepolymerizer-bitcoin/src/rpc.rs | 38+++++++++++++++++++++++++++++++++++---
Adepolymerizer-bitcoin/src/setup.rs | 82+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Dinstrumentation/Cargo.toml | 29-----------------------------
Dinstrumentation/src/btc.rs | 1064-------------------------------------------------------------------------------
Dinstrumentation/src/main.rs | 211-------------------------------------------------------------------------------
Dinstrumentation/src/utils.rs | 555-------------------------------------------------------------------------------
Mmakefile | 2+-
Atestbench/Cargo.toml | 31+++++++++++++++++++++++++++++++
Rinstrumentation/conf/bitcoin.conf -> testbench/conf/bitcoin.conf | 0
Atestbench/conf/bitcoindev.conf | 5+++++
Rinstrumentation/conf/taler_btc.conf -> testbench/conf/taler_btc.conf | 0
Rinstrumentation/conf/taler_btc_bump.conf -> testbench/conf/taler_btc_bump.conf | 0
Atestbench/conf/taler_btc_dev.conf | 17+++++++++++++++++
Rinstrumentation/conf/taler_btc_lifetime.conf -> testbench/conf/taler_btc_lifetime.conf | 0
Rinstrumentation/conf/taler_eth.conf -> testbench/conf/taler_eth.conf | 0
Rinstrumentation/conf/taler_eth_bump.conf -> testbench/conf/taler_eth_bump.conf | 0
Rinstrumentation/conf/taler_eth_lifetime.conf -> testbench/conf/taler_eth_lifetime.conf | 0
Atestbench/src/btc.rs | 1062+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Atestbench/src/main.rs | 561+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Atestbench/src/utils.rs | 560+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
32 files changed, 2926 insertions(+), 2121 deletions(-)

diff --git a/.gitignore b/.gitignore @@ -1,5 +1,4 @@ /target -log configure test-suite.log /.vscode @@ -15,4 +14,6 @@ test-suite.log /tmp taler.conf *.mk +testbench/instrumentation +testbench/env tools \ No newline at end of file diff --git a/Cargo.lock b/Cargo.lock @@ -33,6 +33,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" [[package]] +name = "android-tzdata" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0" + +[[package]] +name = "android_system_properties" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311" +dependencies = [ + "libc", +] + +[[package]] name = "anes" version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -324,6 +339,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9555578bc9e57714c812a1f84e4fc5b4d21fcb063490c624de019f7464c91268" [[package]] +name = "chrono" +version = "0.4.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c469d952047f47f91b68d1cba3f10d63c11d73e4636f24f08daf0278abf01c4d" +dependencies = [ + "android-tzdata", + "iana-time-zone", + "num-traits", + "serde", + "windows-link", +] + +[[package]] name = "ciborium" version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -499,6 +527,12 @@ dependencies = [ ] [[package]] +name = "core-foundation-sys" +version = "0.8.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" + +[[package]] name = "cpufeatures" version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -599,6 +633,32 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28" [[package]] +name = "crossterm" +version = "0.28.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "829d955a0bb380ef178a640b91779e3987da38c9aea133b20614cfed8cdea9c6" +dependencies = [ + "bitflags", + "crossterm_winapi", + "mio", + "parking_lot", + "rustix 0.38.44", + "serde", + "signal-hook", + "signal-hook-mio", + "winapi", +] + +[[package]] +name = "crossterm_winapi" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acdd7c62a3665c7f6830a51635d9ac9b23ed385797f70a83bb8bafe9c572ab2b" +dependencies = [ + "winapi", +] + +[[package]] name = "crunchy" version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -894,6 +954,17 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" [[package]] +name = "fd-lock" +version = "4.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce92ff622d6dadf7349484f42c93271a0d49b7cc4d466a936405bacbe10aa78" +dependencies = [ + "cfg-if", + "rustix 1.0.8", + "windows-sys 0.59.0", +] + +[[package]] name = "fiat-crypto" version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1230,6 +1301,30 @@ dependencies = [ ] [[package]] +name = "iana-time-zone" +version = "0.1.63" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0c919e5debc312ad217002b8048a17b7d83f80703865bbfcfebb0458b0b27d8" +dependencies = [ + "android_system_properties", + "core-foundation-sys", + "iana-time-zone-haiku", + "js-sys", + "log", + "wasm-bindgen", + "windows-core", +] + +[[package]] +name = "iana-time-zone-haiku" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f" +dependencies = [ + "cc", +] + +[[package]] name = "icu_collections" version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1366,24 +1461,6 @@ dependencies = [ ] [[package]] -name = "instrumentation" -version = "0.1.0" -dependencies = [ - "anyhow", - "bitcoin", - "clap", - "common", - "depolymerizer-bitcoin", - "indicatif", - "owo-colors", - "rust-ini", - "taler-common", - "tempfile", - "tokio", - "ureq", -] - -[[package]] name = "io-uring" version = "0.7.9" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1501,6 +1578,12 @@ dependencies = [ [[package]] name = "linux-raw-sys" +version = "0.4.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d26c52dbd32dccf2d10cac7725f8eae5296885fb5703b261f7d0a0739ec807ab" + +[[package]] +name = "linux-raw-sys" version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cd945864f07fe9f5371a27ad7b52a172b4b499999f1d97574c9fa68373937e12" @@ -1588,6 +1671,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "78bed444cc8a2160f01cbcf811ef18cac863ad68ae8ca62092e8db51d51c761c" dependencies = [ "libc", + "log", "wasi 0.11.1+wasi-snapshot-preview1", "windows-sys 0.59.0", ] @@ -1603,6 +1687,15 @@ dependencies = [ ] [[package]] +name = "nu-ansi-term" +version = "0.50.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4a28e057d01f97e61255210fcff094d74ed0466038633e95017f5beb68e4399" +dependencies = [ + "windows-sys 0.52.0", +] + +[[package]] name = "num-bigint-dig" version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2006,14 +2099,34 @@ dependencies = [ [[package]] name = "redox_syscall" -version = "0.5.16" +version = "0.5.17" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7251471db004e509f4e75a62cca9435365b5ec7bcdff530d612ac7c87c44a792" +checksum = "5407465600fb0548f1442edf71dd20683c6ed326200ace4b1ef0763521bb3b77" dependencies = [ "bitflags", ] [[package]] +name = "reedline" +version = "0.41.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b627c435d0189363b15f885f1b07193d310ec9e4e39c5627951c6e0f4d02c93a" +dependencies = [ + "chrono", + "crossterm", + "fd-lock", + "itertools", + "nu-ansi-term 0.50.1", + "serde", + "strip-ansi-escapes", + "strum", + "strum_macros", + "thiserror", + "unicode-segmentation", + "unicode-width", +] + +[[package]] name = "regex" version = "1.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2103,6 +2216,19 @@ dependencies = [ [[package]] name = "rustix" +version = "0.38.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdb5bc1ae2baa591800df16c9ca78619bf65c0488b41b96ccec5d11220d8c154" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys 0.4.15", + "windows-sys 0.59.0", +] + +[[package]] +name = "rustix" version = "1.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "11181fbabf243db407ef8df94a6ce0b2f9a733bd8be4ad02b4eda9602296cac8" @@ -2110,15 +2236,15 @@ dependencies = [ "bitflags", "errno", "libc", - "linux-raw-sys", + "linux-raw-sys 0.9.4", "windows-sys 0.60.2", ] [[package]] name = "rustls" -version = "0.23.30" +version = "0.23.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "069a8df149a16b1a12dcc31497c3396a173844be3cac4bd40c9e7671fef96671" +checksum = "2491382039b29b9b11ff08b76ff6c97cf287671dbb74f0be44bda389fffe9bd1" dependencies = [ "log", "once_cell", @@ -2337,6 +2463,27 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" [[package]] +name = "signal-hook" +version = "0.3.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d881a16cf4426aa584979d30bd82cb33429027e42122b169753d6ef1085ed6e2" +dependencies = [ + "libc", + "signal-hook-registry", +] + +[[package]] +name = "signal-hook-mio" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34db1a06d485c9142248b7a054f034b349b212551f3dfd19c94d45a754a217cd" +dependencies = [ + "libc", + "mio", + "signal-hook", +] + +[[package]] name = "signal-hook-registry" version = "1.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2611,12 +2758,40 @@ dependencies = [ ] [[package]] +name = "strip-ansi-escapes" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2a8f8038e7e7969abb3f1b7c2a811225e9296da208539e0f79c5251d6cac0025" +dependencies = [ + "vte", +] + +[[package]] name = "strsim" version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" [[package]] +name = "strum" +version = "0.26.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fec0f0aef304996cf250b31b5a10dee7980c85da9d759361292b8bca5a18f06" + +[[package]] +name = "strum_macros" +version = "0.26.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4c6bee85a5a24955dc440386795aa378cd9cf82acd5f764469152d2270e581be" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "rustversion", + "syn", +] + +[[package]] name = "subtle" version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2653,7 +2828,7 @@ dependencies = [ [[package]] name = "taler-api" version = "0.0.0" -source = "git+https://git.taler.net/taler-rust.git/#259bc29a703d0e0c9b8a3b3d5e6c1b89bfa0bd8b" +source = "git+https://git.taler.net/taler-rust.git/#e5ed895f45aa41a199f5b13dabbafda5744be7b5" dependencies = [ "axum", "base64", @@ -2676,7 +2851,7 @@ dependencies = [ [[package]] name = "taler-common" version = "0.0.0" -source = "git+https://git.taler.net/taler-rust.git/#259bc29a703d0e0c9b8a3b3d5e6c1b89bfa0bd8b" +source = "git+https://git.taler.net/taler-rust.git/#e5ed895f45aa41a199f5b13dabbafda5744be7b5" dependencies = [ "anyhow", "clap", @@ -2701,7 +2876,7 @@ dependencies = [ [[package]] name = "taler-test-utils" version = "0.0.0" -source = "git+https://git.taler.net/taler-rust.git/#259bc29a703d0e0c9b8a3b3d5e6c1b89bfa0bd8b" +source = "git+https://git.taler.net/taler-rust.git/#e5ed895f45aa41a199f5b13dabbafda5744be7b5" dependencies = [ "axum", "http-body-util", @@ -2728,11 +2903,32 @@ dependencies = [ "fastrand", "getrandom 0.3.3", "once_cell", - "rustix", + "rustix 1.0.8", "windows-sys 0.59.0", ] [[package]] +name = "testbench" +version = "0.1.0" +dependencies = [ + "anyhow", + "bitcoin", + "clap", + "common", + "depolymerizer-bitcoin", + "indicatif", + "owo-colors", + "reedline", + "rust-ini", + "sqlx", + "taler-common", + "tokio", + "tracing", + "tracing-subscriber", + "ureq", +] + +[[package]] name = "thiserror" version = "2.0.12" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2955,7 +3151,7 @@ version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" dependencies = [ - "nu-ansi-term", + "nu-ansi-term 0.46.0", "sharded-slab", "smallvec", "thread_local", @@ -3003,6 +3199,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e70f2a8b45122e719eb623c01822704c4e0907e7e426a05927e1a1cfff5b75d0" [[package]] +name = "unicode-segmentation" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" + +[[package]] name = "unicode-width" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3125,6 +3327,15 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" [[package]] +name = "vte" +version = "0.14.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "231fdcd7ef3037e8330d8e17e61011a2c244126acc0a982f4040ac3f9f0bc077" +dependencies = [ + "memchr", +] + +[[package]] name = "walkdir" version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3293,6 +3504,65 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] +name = "windows-core" +version = "0.61.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c0fdd3ddb90610c7638aa2b3a3ab2904fb9e5cdbecc643ddb3647212781c4ae3" +dependencies = [ + "windows-implement", + "windows-interface", + "windows-link", + "windows-result", + "windows-strings", +] + +[[package]] +name = "windows-implement" +version = "0.60.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a47fddd13af08290e67f4acabf4b459f647552718f683a7b415d290ac744a836" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-interface" +version = "0.59.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd9211b69f8dcdfa817bfd14bf1c97c9188afa36f4750130fcdf3f400eca9fa8" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "windows-link" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e6ad25900d524eaabdbbb96d20b4311e1e7ae1699af4fb28c17ae66c80d798a" + +[[package]] +name = "windows-result" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56f42bd332cc6c8eac5af113fc0c1fd6a8fd2aa08a0119358686e5160d0586c6" +dependencies = [ + "windows-link", +] + +[[package]] +name = "windows-strings" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56e6c93f3a0c3b36176cb1327a4958a0353d5d166c2a35cb268ace15e91d3b57" +dependencies = [ + "windows-link", +] + +[[package]] name = "windows-sys" version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3325,7 +3595,7 @@ version = "0.60.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f2f500e4d28234f72040990ec9d39e3a6b950f9f22d3dba18416c35882612bcb" dependencies = [ - "windows-targets 0.53.2", + "windows-targets 0.53.3", ] [[package]] @@ -3361,10 +3631,11 @@ dependencies = [ [[package]] name = "windows-targets" -version = "0.53.2" +version = "0.53.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c66f69fcc9ce11da9966ddb31a40968cad001c5bedeb5c2b82ede4253ab48aef" +checksum = "d5fe6031c4041849d7c496a8ded650796e7b6ecc19df1a431c1a363342e5dc91" dependencies = [ + "windows-link", "windows_aarch64_gnullvm 0.53.0", "windows_aarch64_msvc 0.53.0", "windows_i686_gnu 0.53.0", diff --git a/Cargo.toml b/Cargo.toml @@ -1,6 +1,6 @@ [workspace] resolver = "3" -members = ["depolymerizer-bitcoin", "uri-pack", "common", "instrumentation"] +members = ["depolymerizer-bitcoin", "uri-pack", "common", "testbench"] [workspace.package] edition = "2024" @@ -39,3 +39,4 @@ tracing = "0.1" criterion = "0.7" base64 = "0.22.1" rand = { version = "0.9.0" } +tracing-subscriber = "0.3" diff --git a/depolymerizer-bitcoin/src/api.rs b/depolymerizer-bitcoin/src/api.rs @@ -247,7 +247,7 @@ async fn status_watcher(state: Arc<ServerState>) { loop { if let Err(err) = inner(&state, &mut jitter).await { - error!("status-watcher: {}", err); + error!(target: "status-watcher", "{err}"); sleep(jitter.next()).await; } } diff --git a/depolymerizer-bitcoin/src/cli.rs b/depolymerizer-bitcoin/src/cli.rs @@ -0,0 +1,149 @@ +/* + This file is part of TALER + Copyright (C) 2025 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use axum::{Router, middleware}; +use taler_api::api::TalerRouter as _; +use taler_common::{ + CommonArgs, + cli::{ConfigCmd, long_version}, + config::Config, + db::{dbinit, pool}, +}; +use tracing::info; + +use crate::{ + CONFIG_SOURCE, DB_SCHEMA, + api::{ServerState, status_middleware}, + config::{ServeCfg, WorkerCfg, parse_db_cfg}, + loops::{ + watcher::watcher, + worker::{worker_loop, worker_transient}, + }, + setup::setup, +}; + +/// Taler adapter for bitcoincore +#[derive(clap::Parser, Debug)] +#[command(long_version = long_version(), about, long_about = None)] +pub struct Args { + #[clap(flatten)] + pub common: CommonArgs, + #[clap(subcommand)] + pub cmd: Command, +} + +#[derive(clap::Subcommand, Debug)] +pub enum Command { + /// Initialize btc-wire database + Dbinit { + /// Reset database (DANGEROUS: All existing data is lost) + #[clap(long, short)] + reset: bool, + }, + /// TODO + Setup { + #[clap(long, short)] + reset: bool, + }, + /// Run btc-wire worker + Worker { + /// Execute once and return + #[clap(long, short)] + transient: bool, + }, + /// Run btc-wire HTTP server + Serve { + /// Check whether an API is in use (if it's useful to start the HTTP + /// server). Exit with 0 if at least one API is enabled, otherwise 1 + #[clap(long)] + check: bool, + }, + #[command(subcommand)] + Config(ConfigCmd), +} + +/// TODO support external signer https://github.com/bitcoin/bitcoin/blob/master/doc/external-signer.md + +pub async fn run(cmd: Command, cfg: &Config) -> anyhow::Result<()> { + match cmd { + Command::Dbinit { reset } => { + let cfg = parse_db_cfg(cfg)?; + let pool = pool(cfg.cfg, DB_SCHEMA).await?; + let mut conn = pool.acquire().await?; + dbinit( + &mut conn, + cfg.sql_dir.as_ref(), + CONFIG_SOURCE.component_name, + reset, + ) + .await?; + } + Command::Setup { reset } => { + setup(cfg, reset).await?; + } + Command::Worker { transient } => { + let state = WorkerCfg::parse(cfg)?; + let db_cfg = parse_db_cfg(cfg)?; + let pool = pool(db_cfg.cfg, DB_SCHEMA).await?; + + #[cfg(feature = "fail")] + tracing::warn!("Running with random failures"); + + if transient { + worker_transient(state, pool).await?; + } else { + tokio::spawn(watcher(state.rpc_cfg.clone(), pool.clone())); + tokio::spawn(worker_loop(state, pool)).await?; + + info!("btc-wire stopped"); + } + } + Command::Serve { check } => { + if check { + let cfg = ServeCfg::parse(cfg)?; + if cfg.revenue.is_none() && cfg.wire_gateway.is_none() { + std::process::exit(1); + } + } else { + let db = parse_db_cfg(cfg)?; + let pool = pool(db.cfg, DB_SCHEMA).await?; + let cfg = ServeCfg::parse(cfg)?; + let api = ServerState::start(pool, cfg.payto, cfg.currency).await; + let mut router = Router::new(); + + if let Some(cfg) = cfg.wire_gateway { + router = router.wire_gateway(api.clone(), cfg.auth); + } else { + panic!("lol") + } + + /*if let Some(cfg) = cfg.revenue { + router = router.revenue(api, cfg.auth); + }*/ + // TODO http lifetime + router + .layer(middleware::from_fn_with_state( + api.clone(), + status_middleware, + )) + .serve(cfg.serve, cfg.lifetime) + .await?; + } + } + Command::Config(cfg_cmd) => cfg_cmd.run(cfg)?, + } + Ok(()) +} diff --git a/depolymerizer-bitcoin/src/db.rs b/depolymerizer-bitcoin/src/db.rs @@ -74,10 +74,16 @@ pub async fn update_status(e: &mut PgListener, new_status: bool) -> sqlx::Result } /// Initialize the worker sync state -pub async fn init_sync_state<'a>(e: impl PgExecutor<'a>, hash: &BlockHash) -> sqlx::Result<()> { - sqlx::query( - "INSERT INTO state (name, value) VALUES ('last_hash', $1) ON CONFLICT (name) DO NOTHING", - ) +pub async fn init_sync_state<'a>( + e: impl PgExecutor<'a>, + hash: &BlockHash, + reset: bool, +) -> sqlx::Result<()> { + sqlx::query(if reset { + "INSERT INTO state (name, value) VALUES ('last_hash', $1) ON CONFLICT (name) DO UPDATE SET value=$1" + } else { + "INSERT INTO state (name, value) VALUES ('last_hash', $1) ON CONFLICT (name) DO NOTHING" + }) .bind(hash.as_byte_array()) .execute(e) .await?; diff --git a/depolymerizer-bitcoin/src/lib.rs b/depolymerizer-bitcoin/src/lib.rs @@ -22,12 +22,16 @@ use segwit::{decode_segwit_msg, encode_segwit_key}; use taler_common::{api_common::EddsaPublicKey, config::parser::ConfigSource}; pub mod api; +pub mod cli; pub mod config; pub mod db; +mod fail_point; +mod loops; pub mod payto; pub mod rpc; pub mod rpc_utils; pub mod segwit; +pub mod setup; pub mod sql; pub mod taler_utils; diff --git a/depolymerizer-bitcoin/src/loops.rs b/depolymerizer-bitcoin/src/loops.rs @@ -14,9 +14,7 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use depolymerizer_bitcoin::rpc; - -use crate::fail_point::Injected; +use crate::{fail_point::Injected, rpc}; pub mod analysis; pub mod watcher; diff --git a/depolymerizer-bitcoin/src/loops/analysis.rs b/depolymerizer-bitcoin/src/loops/analysis.rs @@ -13,10 +13,11 @@ You should have received a copy of the GNU Affero General Public License along with TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use depolymerizer_bitcoin::rpc::{ChainTipsStatus, Rpc}; + use tracing::warn; use super::LoopResult; +use crate::rpc::{ChainTipsStatus, Rpc}; /// Analyse blockchain behavior and return the new confirmation delay pub async fn analysis(rpc: &mut Rpc, current: u32, max: u32) -> LoopResult<u32> { @@ -33,7 +34,8 @@ pub async fn analysis(rpc: &mut Rpc, current: u32, max: u32) -> LoopResult<u32> // Limit confirmation growth let new_conf = fork.saturating_add(1).min(max); warn!( - "analysis: found dangerous fork of {fork} blocks, adapt confirmation to {new_conf} blocks capped at {max}, you should update taler.conf" + target: "analysis", + "found dangerous fork of {fork} blocks, adapt confirmation to {new_conf} blocks capped at {max}, you should update taler.conf" ); return Ok(new_conf); } diff --git a/depolymerizer-bitcoin/src/loops/watcher.rs b/depolymerizer-bitcoin/src/loops/watcher.rs @@ -14,12 +14,13 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use depolymerizer_bitcoin::{config::RpcCfg, rpc::rpc_common}; use sqlx::PgPool; use taler_common::ExpoBackoffDecorr; use tokio::time::sleep; use tracing::error; +use crate::{config::RpcCfg, rpc::rpc_common}; + use super::LoopResult; /// Wait for new block and notify arrival with postgreSQL notifications @@ -36,7 +37,7 @@ pub async fn watcher(rpc_cfg: RpcCfg, pool: PgPool) { } .await; if let Err(e) = result { - error!("watcher: {e}"); + error!(target: "watcher", "{e}"); sleep(jitter.next()).await; } } diff --git a/depolymerizer-bitcoin/src/loops/worker.rs b/depolymerizer-bitcoin/src/loops/worker.rs @@ -21,24 +21,24 @@ use common::{ status::BounceStatus, taler_common::{api_common::ShortHashCode, types::timestamp::Timestamp}, }; -use depolymerizer_bitcoin::{ +use sqlx::{PgPool, postgres::PgListener}; +use taler_common::{ExpoBackoffDecorr, types::url}; +use tokio::time::sleep; +use tracing::{debug, error, info, trace, warn}; + +use crate::{ GetOpReturnErr, GetSegwitErr, + config::WorkerCfg, db::{self, AddIncomingResult, SyncBounceResult, SyncOutResult, worker_lock}, + fail_point::fail_point, rpc::{self, Category, ErrorCode, Rpc, Transaction, rpc_wallet}, rpc_utils::sender_address, taler_utils::btc_to_taler, }; -use sqlx::{PgPool, postgres::PgListener}; -use taler_common::{ExpoBackoffDecorr, types::url}; -use tokio::time::sleep; -use tracing::{debug, error, info, trace, warn}; - -use crate::{WorkerCfg, fail_point::fail_point}; use super::{LoopError, LoopResult, analysis::analysis}; -/// Synchronize local db with blockchain and perform transactions -pub async fn worker(mut state: WorkerCfg, pool: PgPool) { +pub async fn worker_loop(mut state: WorkerCfg, pool: PgPool) { let mut jitter = ExpoBackoffDecorr::default(); let mut lifetime = state.lifetime; let mut status = true; @@ -71,55 +71,38 @@ pub async fn worker(mut state: WorkerCfg, pool: PgPool) { { let ntf = db.next_buffered(); if let Some(ntf) = &ntf { - trace!("notification from {}", ntf.channel()) + trace!(target: "worker", "notification from {}", ntf.channel()) } if !skip_notification && ntf.is_none() { - debug!("waiting for notifications"); + debug!(target: "worker", "waiting for notifications"); // Block until next notification if let Some(ntf) = db.try_recv().await? { - trace!("notification from {}", ntf.channel()) + trace!(target: "worker", "notification from {}", ntf.channel()) } } // Conflate all notifications while let Some(ntf) = db.next_buffered() { - trace!("notification from {}", ntf.channel()) + trace!(target: "worker", "notification from {}", ntf.channel()) } } // Check lifetime if let Some(nb) = lifetime.as_mut() { if *nb == 0 { - info!("Reach end of lifetime"); + info!(target: "worker", "Reach end of lifetime"); return Ok(()); } else { *nb -= 1; } } - debug!("syncing blockchain"); + debug!(target: "worker", "syncing blockchain"); // Perform analysis state.confirmation = analysis(rpc, state.confirmation, state.max_confirmation).await?; - // Sync chain - if let Some(stuck) = sync_chain(rpc, db, &state, &mut status).await? { - // As we are now in sync with the blockchain if a transaction has Requested status it have not been sent - - // Send requested debits - while debit(db, rpc, &state).await? {} - - // Bump stuck transactions - for id in stuck { - let bump = rpc.bump_fee(&id).await?; - fail_point("(injected) fail bump", 0.3)?; - let wtid = db::bump_tx_id(&mut *db, &id, &bump.txid).await?; - info!(">> (bump) {wtid} replace {id} with {}", bump.txid); - } - - // Send requested bounce - while bounce(db, rpc, &state.bounce_fee).await? {} - } + worker_step(rpc, db, &mut state, &mut status).await?; skip_notification = false; jitter.reset(); @@ -127,7 +110,7 @@ pub async fn worker(mut state: WorkerCfg, pool: PgPool) { } .await; if let Err(e) = result { - error!("worker: {e}"); + error!(target: "worker", "{e}"); // When we catch an error, we sometimes want to retry immediately (eg. reconnect to RPC or DB). // Bitcoin error codes are generic. We need to match the msg to get precise ones. Some errors // can resolve themselves when a new block is mined (new fees, new transactions). Our simple @@ -137,6 +120,7 @@ pub async fn worker(mut state: WorkerCfg, pool: PgPool) { LoopError::Rpc(rpc::Error::Transport(_)) | LoopError::DB(_) | LoopError::Injected(_) + | LoopError::Concurrency ); sleep(jitter.next()).await; } else { @@ -145,6 +129,58 @@ pub async fn worker(mut state: WorkerCfg, pool: PgPool) { } } +pub async fn worker_transient(mut state: WorkerCfg, pool: PgPool) -> LoopResult<()> { + let mut status = true; + + // Connect + let rpc = &mut rpc_wallet(&state.rpc_cfg, &state.wallet_cfg).await?; + let db = &mut PgListener::connect_with(&pool).await?; + + // It is not possible to atomically update the blockchain and the database. + // When we failed to sync the database and the blockchain state we rely on + // sync_chain to recover the lost updates. + // When this function is running concurrently, it not possible to known another + // execution has failed, and this can lead to a transaction being sent multiple time. + // To ensure only a single version of this function is running at a given time we rely + // on postgres advisory lock + + // Take the lock + if !worker_lock(&mut *db).await? { + return Err(LoopError::Concurrency); + } + + worker_step(rpc, db, &mut state, &mut status).await?; + Ok(()) +} + +/// Synchronize local db with blockchain and perform transactions +async fn worker_step( + rpc: &mut Rpc, + db: &mut PgListener, + state: &mut WorkerCfg, + status: &mut bool, +) -> LoopResult<()> { + // Sync chain + if let Some(stuck) = sync_chain(rpc, db, state, status).await? { + // As we are now in sync with the blockchain if a transaction has Requested status it have not been sent + + // Send requested debits + while debit(db, rpc, state).await? {} + + // Bump stuck transactions + for id in stuck { + let bump = rpc.bump_fee(&id).await?; + fail_point("(injected) fail bump", 0.3)?; + let wtid = db::bump_tx_id(&mut *db, &id, &bump.txid).await?; + info!(target: "worker", ">> (bump) {wtid} replace {id} with {}", bump.txid); + } + + // Send requested bounce + while bounce(db, rpc, &state.bounce_fee).await? {} + } + Ok(()) +} + /// Parse new transactions, return stuck transactions if the database is up to date with the latest mined block async fn sync_chain( rpc: &mut Rpc, @@ -187,7 +223,7 @@ async fn sync_chain( db::update_status(db, new_status).await?; *status = new_status; if new_status { - info!("Recovered lost transactions"); + info!(target: "worker", "Recovered lost transactions"); } } if !new_status { @@ -257,7 +293,7 @@ async fn sync_chain_removed( } } } - error!("{buf}"); + error!(target: "worker", "{buf}"); Ok(false) } @@ -289,7 +325,7 @@ async fn sync_chain_incoming_confirmed( valued_at: _, } => { if new { - info!("<< {amount} {reserve_pub} in {id} from {debit_addr}"); + info!(target: "worker", "<< {amount} {reserve_pub} in {id} from {debit_addr}"); } } AddIncomingResult::ReservePubReuse => { @@ -320,7 +356,7 @@ async fn sync_chain_debit( if confirmations < 0 { // Handle conflicting tx if full.replaced_by_txid.is_none() && db::conflict_tx_out(db, txid).await? { - warn!(">> (conflict) {wtid} in {txid} to {credit_addr}"); + warn!(target: "worker", ">> (conflict) {wtid} in {txid} to {credit_addr}"); } } else { match db::sync_out( @@ -336,16 +372,17 @@ async fn sync_chain_debit( .await? { SyncOutResult::New => { - warn!(">> (onchain) {amount} {wtid} in {txid} to {credit_addr}"); + warn!(target: "worker", ">> (onchain) {amount} {wtid} in {txid} to {credit_addr}"); } SyncOutResult::Replaced => { info!( + target: "worker", ">> (recovered) {wtid} replace {txid} with {}", full.replaced_by_txid.unwrap() ) } SyncOutResult::Recovered => { - warn!(">> (recovered) {amount} {wtid} in {txid} to {credit_addr}") + warn!(target: "worker", ">> (recovered) {amount} {wtid} in {txid} to {credit_addr}") } SyncOutResult::None => {} } @@ -376,12 +413,14 @@ async fn sync_chain_bounce( if confirmations < 0 { // Handle conflicting tx if db::conflict_bounce(db, txid).await? { - warn!("|| (conflict) {bounced} in {txid}"); + warn!(target: "worker", "|| (conflict) {bounced} in {txid}"); } } else { match db::sync_bounce(db, txid, bounced, &Timestamp::now()).await? { - SyncBounceResult::New => warn!("|| (onchain) {bounced} in {txid}"), - SyncBounceResult::Recovered => warn!("|| (recovered) {bounced} in {txid}"), + SyncBounceResult::New => warn!(target: "worker", "|| (onchain) {bounced} in {txid}"), + SyncBounceResult::Recovered => { + warn!(target: "worker", "|| (recovered) {bounced} in {txid}") + } SyncBounceResult::None => {} } } @@ -410,7 +449,7 @@ async fn sync_chain_outgoing( sync_chain_bounce(id, &Txid::from_byte_array(bounced), db, confirmations).await? } }, - Ok((_, Err(e))) => warn!("send: decode-info {id} - {e}"), + Ok((_, Err(e))) => warn!(target: "worker", "send: decode-info {id} - {e}"), Err(e) => match e { GetOpReturnErr::MissingOpReturn => { /* Ignore */ } GetOpReturnErr::RPC(e) => return Err(e)?, @@ -436,7 +475,7 @@ async fn debit(db: &mut PgListener, rpc: &mut Rpc, state: &WorkerCfg) -> LoopRes fail_point("(injected) fail debit", 0.3)?; db::debit_sent(db, id, &txid).await?; let amount = btc_to_taler(&amount.to_signed().unwrap(), &state.currency); - info!(">> {amount} {wtid} in {txid} to {addr}"); + info!(target: "worker", ">> {amount} {wtid} in {txid} to {addr}"); Ok(true) } else { Ok(false) @@ -459,9 +498,9 @@ async fn bounce(db: &mut PgListener, rpc: &mut Rpc, fee: &BtcAmount) -> LoopResu fail_point("(injected) fail bounce", 0.3)?; db::bounce_set_status(db, id, Some(&txid), &BounceStatus::sent).await?; if let Some(reason) = reason { - info!("|| {bounced} in {txid}: {reason}"); + info!(target: "worker", "|| {bounced} in {txid}: {reason}"); } else { - info!("|| {bounced} in {txid}"); + info!(target: "worker", "|| {bounced} in {txid}"); } } Err(err) => match err { @@ -469,7 +508,7 @@ async fn bounce(db: &mut PgListener, rpc: &mut Rpc, fee: &BtcAmount) -> LoopResu code: ErrorCode::RpcWalletInsufficientFunds | ErrorCode::RpcWalletError, msg: _, } => { - warn!("{err}"); + warn!(target: "worker", "{err}"); return Ok(false); } e => Err(e)?, diff --git a/depolymerizer-bitcoin/src/main.rs b/depolymerizer-bitcoin/src/main.rs @@ -13,175 +13,17 @@ You should have received a copy of the GNU Affero General Public License along with TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use axum::{Router, middleware}; -use clap::Parser; + +use clap::Parser as _; use depolymerizer_bitcoin::{ - CONFIG_SOURCE, DB_SCHEMA, - api::{ServerState, status_middleware}, - config::{ServeCfg, WorkerCfg, parse_db_cfg}, - db, - rpc::Rpc, -}; -use taler_api::api::TalerRouter as _; -use taler_common::{ - CommonArgs, - cli::{ConfigCmd, long_version}, - config::Config, - db::{dbinit, pool}, - taler_main, + CONFIG_SOURCE, + cli::{Args, run}, }; -use tokio::try_join; -use tracing::info; - -use crate::loops::{watcher::watcher, worker::worker}; - -mod fail_point; -mod loops; - -/// Taler adapter for bitcoincore -#[derive(clap::Parser, Debug)] -#[command(long_version = long_version(), about, long_about = None)] -struct Args { - #[clap(flatten)] - common: CommonArgs, - #[clap(subcommand)] - cmd: Command, -} - -#[derive(clap::Subcommand, Debug)] -enum Command { - /// Initialize btc-wire database - Dbinit { - /// Reset database (DANGEROUS: All existing data is lost) - #[clap(long, short)] - reset: bool, - }, - /// TODO - Setup { - #[clap(long, short)] - reset: bool, - }, - /// Run btc-wire worker - Worker { - /// Execute once and return - #[clap(long, short)] - transient: bool, - }, - /// Run btc-wire HTTP server - Serve { - /// Check whether an API is in use (if it's useful to start the HTTP - /// server). Exit with 0 if at least one API is enabled, otherwise 1 - #[clap(long)] - check: bool, - }, - #[command(subcommand)] - Config(ConfigCmd), -} - -/// TODO support external signer https://github.com/bitcoin/bitcoin/blob/master/doc/external-signer.md - -async fn app(args: Args, cfg: Config) -> anyhow::Result<()> { - match args.cmd { - Command::Dbinit { reset } => { - let cfg = parse_db_cfg(&cfg)?; - let pool = pool(cfg.cfg, DB_SCHEMA).await?; - let mut conn = pool.acquire().await?; - dbinit( - &mut conn, - cfg.sql_dir.as_ref(), - CONFIG_SOURCE.component_name, - reset, - ) - .await?; - } - Command::Setup { reset } => { - info!("Connect to bitcoind"); - let state = WorkerCfg::parse(&cfg)?; - let mut rpc = Rpc::wallet(&state.rpc_cfg, &state.wallet_cfg.name).await?; - let info = rpc.get_blockchain_info().await?; - info!("Running on {} chain", info.chain); - - #[cfg(feature = "fail")] - if info.chain != "regtest" { - anyhow::bail!("Running with random failures is unsuitable for production"); - } - let genesis_hash = rpc.get_genesis().await.unwrap(); - // TODO wait for the blockchain to sync - // TODO Check wire wallet own config PAYTO address - - info!("Check wallet"); - rpc.load_wallet(&state.wallet_cfg.name).await?; - if let Some(password) = &state.wallet_cfg.password { - rpc.unlock_wallet(password).await?; - } - - info!("Setup database state"); - let db_cfg = parse_db_cfg(&cfg)?; - let pool = pool(db_cfg.cfg, DB_SCHEMA).await?; - - // Init status to true - try_join!( - db::init_status(&pool), - db::init_sync_state(&pool, &genesis_hash) - )?; - // TODO reset ? - - println!("Database initialised"); - } - Command::Worker { transient } => { - let state = WorkerCfg::parse(&cfg)?; - let db_cfg = parse_db_cfg(&cfg)?; - let pool = pool(db_cfg.cfg, DB_SCHEMA).await?; - - #[cfg(feature = "fail")] - tracing::warn!("Running with random failures"); - // TODO Check wire wallet own config PAYTO address - - tokio::spawn(watcher(state.rpc_cfg.clone(), pool.clone())); - tokio::spawn(worker(state, pool)).await?; - - info!("btc-wire stopped"); - } - Command::Serve { check } => { - if check { - let cfg = ServeCfg::parse(&cfg)?; - if cfg.revenue.is_none() && cfg.wire_gateway.is_none() { - std::process::exit(1); - } - } else { - let db = parse_db_cfg(&cfg)?; - let pool = pool(db.cfg, DB_SCHEMA).await?; - let cfg = ServeCfg::parse(&cfg)?; - let api = ServerState::start(pool, cfg.payto, cfg.currency).await; - let mut router = Router::new(); - - if let Some(cfg) = cfg.wire_gateway { - router = router.wire_gateway(api.clone(), cfg.auth); - } else { - panic!("lol") - } - - /*if let Some(cfg) = cfg.revenue { - router = router.revenue(api, cfg.auth); - }*/ - // TODO http lifetime - router - .layer(middleware::from_fn_with_state( - api.clone(), - status_middleware, - )) - .serve(cfg.serve, cfg.lifetime) - .await?; - } - } - Command::Config(cfg_cmd) => cfg_cmd.run(cfg)?, - } - Ok(()) -} +use taler_common::taler_main; fn main() { let args = Args::parse(); - taler_main(CONFIG_SOURCE, args.common.clone(), |cfg| async move { - app(args, cfg).await + taler_main(CONFIG_SOURCE, args.common, |cfg| async move { + run(args.cmd, &cfg).await }) } diff --git a/depolymerizer-bitcoin/src/rpc.rs b/depolymerizer-bitcoin/src/rpc.rs @@ -31,7 +31,9 @@ use bitcoin::{Address, Amount, BlockHash, SignedAmount, Txid, address::NetworkUn use serde_json::{Value, json}; use std::{ fmt::Debug, - io::{IoSlice, Write as _}, + io::{ErrorKind, IoSlice, Write as _}, + path::PathBuf, + str::FromStr as _, time::Duration, }; use tokio::{ @@ -193,7 +195,13 @@ impl Rpc { // TODO error let token = match &cfg.auth { RpcAuth::Basic(s) => s.as_bytes().to_vec(), - RpcAuth::Cookie(path) => std::fs::read(path)?, + RpcAuth::Cookie(path) => match std::fs::read(path) { + Ok(content) => content, + Err(e) if e.kind() == ErrorKind::IsADirectory => { + std::fs::read(PathBuf::from_str(path).unwrap().join(".cookie"))? + } + Err(e) => return Err(e), + }, }; // Open connection let sock = timeout(Duration::from_secs(5), TcpStream::connect(&cfg.addr)).await??; @@ -285,6 +293,11 @@ impl Rpc { Ok(Amount::from_btc(btc).unwrap()) } + /// Get current balance amount + pub async fn addr_info(&mut self, addr: &Address) -> Result<AddressInfo> { + self.call("getaddressinfo", &[addr]).await + } + /* ----- Mining ----- */ /// Mine a certain amount of block to profit a given address @@ -299,6 +312,11 @@ impl Rpc { self.call("getblockchaininfo", &EMPTY).await } + /// Get blockchain info + pub async fn get_wallet_info(&mut self) -> Result<WalletInfo> { + self.call("getwalletinfo", &EMPTY).await + } + /// Get chain tips pub async fn get_chain_tips(&mut self) -> Result<Vec<ChainTips>> { self.call("getchaintips", &EMPTY).await @@ -355,7 +373,7 @@ impl Rpc { let inputs: Vec<_> = from .into_iter() .enumerate() - .map(|(i, id)| json!({"txid": id.to_string(), "vout": i})) + .map(|(i, id)| json!({"txid": id, "vout": i})) .collect(); let mut outputs: Vec<Value> = to .into_iter() @@ -453,6 +471,12 @@ pub struct BlockchainInfo { pub best_block_hash: BlockHash, } +#[derive(Clone, Debug, serde::Deserialize)] +pub struct WalletInfo { + pub walletname: String, + pub scanning: serde_json::Value, +} + #[derive(Debug, serde::Deserialize)] pub struct BumpResult { pub txid: Txid, @@ -499,6 +523,7 @@ pub struct ListTransaction { pub confirmations: i32, pub txid: Txid, pub category: Category, + pub vout: i32 } #[derive(Debug, serde::Deserialize)] @@ -574,6 +599,13 @@ pub enum ChainTipsStatus { } #[derive(Debug, serde::Deserialize)] +pub struct AddressInfo { + pub ismine: bool, + pub iswatchonly: bool, + pub solvable: bool, +} + +#[derive(Debug, serde::Deserialize)] pub struct Nothing {} /// Bitcoin RPC error codes <https://github.com/bitcoin/bitcoin/blob/master/src/rpc/protocol.h> diff --git a/depolymerizer-bitcoin/src/setup.rs b/depolymerizer-bitcoin/src/setup.rs @@ -0,0 +1,82 @@ +/* + This file is part of TALER + Copyright (C) 2025 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use anyhow::bail; +use taler_common::{config::Config, db::pool}; +use tokio::try_join; +use tracing::info; + +use crate::{ + DB_SCHEMA, + config::{WorkerCfg, parse_db_cfg}, + db, + payto::BtcWallet, + rpc::Rpc, +}; + +pub async fn setup(cfg: &Config, reset: bool) -> anyhow::Result<()> { + info!(target: "setup", "Check bitcoind RPC connection"); + let worker_cfg = WorkerCfg::parse(cfg)?; + let mut rpc = Rpc::wallet(&worker_cfg.rpc_cfg, &worker_cfg.wallet_cfg.name).await?; + let info = rpc.get_blockchain_info().await?; + info!(target: "setup", "Running on {} chain", info.chain); + + #[cfg(feature = "fail")] + if info.chain != "regtest" { + anyhow::bail!("Running with random failures is unsuitable for production"); + } + let genesis_hash = rpc.get_genesis().await.unwrap(); + + // TODO wait for the blockchain to sync && wallet scan ? + // Wait for verification_progress = 1.0 and use initial_block_download as a hint + + info!(target: "setup", "Check wallet"); + rpc.load_wallet(&worker_cfg.wallet_cfg.name).await?; + if let Some(password) = &worker_cfg.wallet_cfg.password { + rpc.unlock_wallet(password).await?; + } + + info!(target: "setup", "Check address"); + let wallet: BtcWallet = cfg + .section("depolymerizer-bitcoin") + .parse("bitcoin wallet address", "WALLET") + .require()?; + let addr_info = rpc.addr_info(&wallet.0).await?; + if !addr_info.ismine { + bail!( + "Address {} does not belong to wallet '{}'", + wallet.0, + worker_cfg.wallet_cfg.name + ); + } else if addr_info.iswatchonly { + bail!("Address {} is watchonly", wallet.0); + } else if !addr_info.solvable { + bail!("Address {} is not solvable", wallet.0); + } + + info!(target: "setup", "Setup database state"); + let db_cfg = parse_db_cfg(cfg)?; + let pool = pool(db_cfg.cfg, DB_SCHEMA).await?; + + // Init status to true + try_join!( + db::init_status(&pool), + db::init_sync_state(&pool, &genesis_hash, reset) + )?; + + info!(target: "setup", "Worker setup"); + Ok(()) +} diff --git a/instrumentation/Cargo.toml b/instrumentation/Cargo.toml @@ -1,29 +0,0 @@ -[package] -name = "instrumentation" -version = "0.1.0" -edition.workspace = true -authors.workspace = true -homepage.workspace = true -repository.workspace = true -license-file.workspace = true - -[dependencies] -# Cli args parser -clap.workspace = true -common = { path = "../common" } -# Bitcoin -depolymerizer-bitcoin = { path = "../depolymerizer-bitcoin" } -bitcoin.workspace = true -# Wire Gateway -ureq = { version = "3.0.0", features = ["json"] } -# Generate temporary files -tempfile = "3.3.0" -# terminal color -owo-colors = "4.0.0" -# Edit toml files -rust-ini = "0.21.0" -# Progress reporting -indicatif = "0.18.0" -taler-common.workspace = true -tokio.workspace = true -anyhow.workspace = true diff --git a/instrumentation/src/btc.rs b/instrumentation/src/btc.rs @@ -1,1064 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2022-2025 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ - -use std::{ - net::{IpAddr, Ipv4Addr, SocketAddr}, - ops::{Deref, DerefMut}, - path::Path, - time::Duration, -}; - -use bitcoin::{Address, Amount}; -use common::taler_common::{ - api_common::{EddsaPublicKey, ShortHashCode}, - types::base32::Base32, -}; -use depolymerizer_bitcoin::{ - CONFIG_SOURCE, - config::{RpcAuth, RpcCfg, ServeCfg, WorkerCfg}, - payto::BtcWallet, - rpc::{Category, Rpc}, - rpc_utils::segwit_min_amount, - taler_utils::btc_to_taler, -}; -use ini::Ini; -use taler_common::{config::Config, types::payto::Payto}; -use tempfile::TempDir; - -use crate::{ - retry, retry_opt, - utils::{ChildGuard, TalerCtx, TestCtx, cmd_redirect, patch_config, transfer, unused_port}, -}; - -pub struct BtcCtx { - btc_node: ChildGuard, - _btc_node2: ChildGuard, - common_rpc: Rpc, - common_rpc2: Rpc, - wire_rpc: Rpc, - client_rpc: Rpc, - reserve_rpc: Rpc, - wire_addr: Address, - pub client_addr: Address, - reserve_addr: Address, - worker_cfg: WorkerCfg, - serve_cfg: ServeCfg, - conf: u16, - ctx: TalerCtx, - node2_addr: String, -} - -impl Deref for BtcCtx { - type Target = TalerCtx; - - fn deref(&self) -> &Self::Target { - &self.ctx - } -} - -impl DerefMut for BtcCtx { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.ctx - } -} - -impl BtcCtx { - pub async fn config( - ctx: &TestCtx, - btc_patch: impl FnOnce(&mut Ini, &Path), - cfg_patch: impl FnOnce(&mut Ini, &Path), - ) { - // Patch configs - let wire_dir = TempDir::new().unwrap(); - let wire_dir = wire_dir.path(); - let port = unused_port(); - let rpc_port = unused_port(); - - patch_config( - "instrumentation/conf/bitcoin.conf", - wire_dir.join("bitcoin.conf"), - |cfg| { - cfg.with_section(Some("regtest")) - .set("bind", format!("127.0.0.1:{port}")) - .set("rpcport", format!("{rpc_port}")); - btc_patch(cfg, wire_dir) - }, - ); - patch_config( - "instrumentation/conf/taler_btc.conf", - wire_dir.join("config.conf"), - |cfg| { - cfg.with_section(Some("depolymerizer-bitcoin-worker")) - .set("RPC_BIND", format!("127.0.0.1:{rpc_port}")); - cfg_patch(cfg, wire_dir) - }, - ); - // Load config - let cfg = Config::from_file(CONFIG_SOURCE, Some(wire_dir.join("config.conf"))).unwrap(); - let rpc_cfg = RpcCfg::parse(&cfg).unwrap(); - // Start bitcoin nodes - let _node = cmd_redirect( - "bitcoind", - &[&format!("-datadir={}", wire_dir.to_string_lossy())], - ctx.log("bitcoind"), - ); - // Connect - retry_opt! { - async { - let mut client = Rpc::common(&rpc_cfg).await?; - client.get_blockchain_info().await?; - Ok::<_, anyhow::Error>(()) - }.await - }; - } - - fn patch_btc_config(from: impl AsRef<Path>, to: impl AsRef<Path>, port: u16, rpc_port: u16) { - patch_config(from, to, |cfg| { - cfg.with_section(Some("regtest")) - .set("bind", format!("127.0.0.1:{port}")) - .set("rpcport", format!("{rpc_port}")); - }) - } - - pub async fn setup(ctx: &TestCtx, config: &str, stressed: bool) -> Self { - let mut ctx = TalerCtx::new(ctx, "depolymerizer-bitcoin", config, stressed); - - // Choose unused port - let btc_port = unused_port(); - let btc_rpc_port = unused_port(); - let btc2_port = unused_port(); - let btc2_rpc_port = unused_port(); - - // Bitcoin config - Self::patch_btc_config( - "instrumentation/conf/bitcoin.conf", - ctx.wire_dir.join("bitcoin.conf"), - btc_port, - btc_rpc_port, - ); - Self::patch_btc_config( - "instrumentation/conf/bitcoin.conf", - ctx.wire2_dir.join("bitcoin.conf"), - btc2_port, - btc2_rpc_port, - ); - patch_config(&ctx.conf, &ctx.conf, |cfg| { - cfg.with_section(Some("depolymerizer-bitcoin-worker")) - .set("RPC_BIND", format!("127.0.0.1:{btc_rpc_port}")) - .set("WALLET_NAME", "wire") - .set( - "RPC_COOKIE_FILE", - ctx.wire_dir.join("regtest/.cookie").to_string_lossy(), - ); - }); - - // Load config - let config = Config::from_file(CONFIG_SOURCE, Some(&ctx.conf)).unwrap(); - let cfg = WorkerCfg::parse(&config).unwrap(); - // Start bitcoin nodes - let btc_node = cmd_redirect( - "bitcoind", - &[&format!("-datadir={}", ctx.wire_dir.to_string_lossy())], - ctx.log("bitcoind"), - ); - let _btc_node2 = cmd_redirect( - "bitcoind", - &[&format!("-datadir={}", ctx.wire2_dir.to_string_lossy())], - ctx.log("bitcoind2"), - ); - - // Setup wallets - let mut common_rpc = retry_opt! { Rpc::common(&cfg.rpc_cfg).await }; - let node2_addr = format!("127.0.0.1:{btc2_port}"); - common_rpc.add_node(&node2_addr).await.unwrap(); - - for name in ["wire", "client", "reserve"] { - common_rpc.create_wallet(name, "").await.unwrap(); - } - let common_rpc2 = retry_opt! { - Rpc::common(&RpcCfg { - addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), btc2_rpc_port), - auth: RpcAuth::Cookie(ctx - .wire2_dir - .join("regtest/.cookie") - .to_string_lossy() - .to_string()), - }).await - }; - - // Generate money - let mut reserve_rpc = Rpc::wallet(&cfg.rpc_cfg, "reserve").await.unwrap(); - let mut client_rpc = Rpc::wallet(&cfg.rpc_cfg, "client").await.unwrap(); - let mut wire_rpc = Rpc::wallet(&cfg.rpc_cfg, "wire").await.unwrap(); - let reserve_addr = reserve_rpc.gen_addr().await.unwrap(); - let client_addr = client_rpc.gen_addr().await.unwrap(); - let wire_addr = wire_rpc.gen_addr().await.unwrap(); - common_rpc.mine(101, &reserve_addr).await.unwrap(); - reserve_rpc - .send(&client_addr, &(Amount::ONE_BTC * 10), None, false) - .await - .unwrap(); - common_rpc.mine(1, &reserve_addr).await.unwrap(); - - patch_config(&ctx.conf, &ctx.conf, |cfg| { - cfg.with_section(Some("depolymerizer-bitcoin")) - .set("NAME", "Exchange Owner") - .set("WALLET", reserve_addr.to_string()); - }); - - let config = Config::from_file(CONFIG_SOURCE, Some(&ctx.conf)).unwrap(); - let serve_cfg = ServeCfg::parse(&config).unwrap(); - - // Setup & run - ctx.dbinit(); - ctx.setup(); - ctx.run().await; - - Self { - ctx, - btc_node, - common_rpc, - wire_rpc, - client_rpc, - reserve_rpc, - wire_addr, - client_addr, - reserve_addr, - conf: cfg.confirmation as u16, - worker_cfg: cfg, - serve_cfg, - _btc_node2, - common_rpc2, - node2_addr, - } - } - - pub fn reset_db(&mut self) { - self.ctx.reset_db(); - self.ctx.setup(); - } - - pub async fn stop_node(&mut self) { - // We need to kill bitcoin gracefully to avoid corruption - self.common_rpc.stop().await.unwrap(); - self.btc_node.0.wait().unwrap(); - } - - pub async fn cluster_deco(&mut self) { - self.common_rpc - .disconnect_node(&self.node2_addr) - .await - .unwrap(); - } - - pub async fn cluster_fork(&mut self) { - let node1_height = self.common_rpc.get_blockchain_info().await.unwrap().blocks; - let node2_height = self.common_rpc2.get_blockchain_info().await.unwrap().blocks; - let diff = node1_height - node2_height; - self.common_rpc2 - .mine((diff + 1) as u16, &self.reserve_addr) - .await - .unwrap(); - self.common_rpc.add_node(&self.node2_addr).await.unwrap(); - } - - pub async fn restart_node(&mut self, additional_args: &[&str]) { - self.stop_node().await; - self.resume_node(additional_args).await; - } - - pub async fn resume_node(&mut self, additional_args: &[&str]) { - let datadir = format!("-datadir={}", self.ctx.wire_dir.to_string_lossy()); - let mut args = vec![datadir.as_str()]; - args.extend_from_slice(additional_args); - self.btc_node = cmd_redirect("bitcoind", &args, self.ctx.log("bitcoind")); - self.common_rpc = retry_opt! { Rpc::common(&self.worker_cfg.rpc_cfg).await }; - self.common_rpc.add_node(&self.node2_addr).await.unwrap(); - for name in ["client", "reserve", "wire"] { - self.common_rpc.load_wallet(name).await.ok(); - } - - self.reserve_rpc = Rpc::wallet(&self.worker_cfg.rpc_cfg, "reserve") - .await - .unwrap(); - self.client_rpc = Rpc::wallet(&self.worker_cfg.rpc_cfg, "client") - .await - .unwrap(); - self.wire_rpc = Rpc::wallet(&self.worker_cfg.rpc_cfg, "wire").await.unwrap(); - } - - /* ----- Transaction ------ */ - - pub async fn credit(&mut self, amount: Amount, metadata: &EddsaPublicKey) { - self.client_rpc - .send_segwit_key(&self.wire_addr, &amount, metadata) - .await - .unwrap(); - } - - pub async fn debit(&mut self, amount: Amount, metadata: &ShortHashCode) { - transfer( - &self.ctx.gateway_url, - metadata, - Payto::new(BtcWallet(self.client_addr.clone())) - .as_payto() - .as_full_payto("name"), - &btc_to_taler(&amount.to_signed().unwrap(), &self.worker_cfg.currency), - ) - .await - } - - pub async fn malformed_credit(&mut self, amount: &Amount) { - self.client_rpc - .send(&self.wire_addr, amount, None, false) - .await - .unwrap(); - } - - pub async fn reset_wallet(&mut self) { - let amount = self.wire_balance().await; - self.wire_rpc - .send(&self.client_addr, &amount, None, true) - .await - .unwrap(); - self.next_block().await; - } - - async fn abandon(rpc: &mut Rpc) { - let list = rpc.list_since_block(None, 1).await.unwrap(); - for tx in list.transactions { - if tx.category == Category::Send && tx.confirmations == 0 { - rpc.abandon_tx(&tx.txid).await.unwrap(); - } - } - } - - pub async fn abandon_wire(&mut self) { - Self::abandon(&mut self.wire_rpc).await; - } - - pub async fn abandon_client(&mut self) { - Self::abandon(&mut self.client_rpc).await; - } - - /* ----- Mining ----- */ - - async fn mine(&mut self, nb: u16) { - self.common_rpc.mine(nb, &self.reserve_addr).await.unwrap(); - } - - pub async fn next_conf(&mut self) { - self.mine(self.conf).await - } - - pub async fn next_block(&mut self) { - self.mine(1).await - } - - /* ----- Balances ----- */ - - pub async fn client_balance(&mut self) -> Amount { - self.client_rpc.get_balance().await.unwrap() - } - - pub async fn wire_balance(&mut self) -> Amount { - self.wire_rpc.get_balance().await.unwrap() - } - - pub async fn expect_client_balance(&mut self, balance: Amount, mine: bool) { - retry! {{ - let check = balance == self.client_balance().await; - if !check && mine { - self.next_block().await; - } - check - }} - } - - pub async fn expect_wire_balance(&mut self, balance: Amount, mine: bool) { - retry! {{ - let check = balance == self.wire_balance().await; - if !check && mine { - self.next_block().await; - } - check - }} - } - - /* ----- Wire Gateway ----- */ - - pub async fn expect_credits(&self, txs: &[(EddsaPublicKey, Amount)]) { - let txs: Vec<_> = txs - .iter() - .map(|(metadata, amount)| { - ( - metadata.clone(), - btc_to_taler(&amount.to_signed().unwrap(), &self.worker_cfg.currency), - ) - }) - .collect(); - self.ctx.expect_credits(&txs).await - } - - pub async fn expect_debits(&self, txs: &[(ShortHashCode, Amount)]) { - let txs: Vec<_> = txs - .iter() - .map(|(metadata, amount)| { - ( - metadata.clone(), - btc_to_taler(&amount.to_signed().unwrap(), &self.worker_cfg.currency), - ) - }) - .collect(); - self.ctx.expect_debits(&txs).await - } -} - -/// Test btc-wire correctly receive and send transactions on the blockchain -pub async fn wire(ctx: TestCtx) { - ctx.step("Setup"); - let mut ctx = BtcCtx::setup(&ctx, "taler_btc.conf", false).await; - - ctx.step("Credit"); - { - // Send transactions - let mut balance = ctx.wire_balance().await; - let mut txs = Vec::new(); - for n in 10..100 { - let metadata = Base32::rand(); - let amount = Amount::from_sat(n * 1000); - ctx.credit(amount, &metadata).await; - txs.push((metadata, amount)); - balance += amount; - ctx.next_block().await; - } - ctx.next_conf().await; - ctx.expect_credits(&txs).await; - ctx.expect_wire_balance(balance, false).await; - }; - - ctx.step("Debit"); - { - let mut balance = ctx.client_balance().await; - let mut txs = Vec::new(); - for n in 10..100 { - let metadata = Base32::rand(); - let amount = Amount::from_sat(n * 100); - balance += amount; - ctx.debit(amount, &metadata).await; - txs.push((metadata, amount)); - } - ctx.next_block().await; - ctx.expect_debits(&txs).await; - ctx.expect_client_balance(balance, true).await; - } - - ctx.step("Bounce"); - { - ctx.reset_wallet().await; - // Send bad transactions - let mut balance = ctx.wire_balance().await; - for n in 10..40 { - ctx.malformed_credit(&Amount::from_sat(n * 1000)).await; - balance += ctx.worker_cfg.bounce_fee; - } - ctx.next_conf().await; - ctx.expect_wire_balance(balance, true).await; - } -} - -/// Check btc-wire and wire-gateway correctly stop when a lifetime limit is configured -pub async fn lifetime(ctx: TestCtx) { - ctx.step("Setup"); - let mut ctx = BtcCtx::setup(&ctx, "taler_btc_lifetime.conf", false).await; - ctx.step("Check lifetime"); - // Start up - retry! { ctx.wire_running() && ctx.gateway_running() }; - // Consume wire lifetime - for _ in 0..=ctx.worker_cfg.lifetime.unwrap() + 2 { - ctx.credit(segwit_min_amount(), &Base32::rand()).await; - ctx.next_block().await; - tokio::time::sleep(Duration::from_millis(100)).await; - } - retry! { !ctx.wire_running() }; - // Consume gateway lifetime - for _ in 0..=ctx.serve_cfg.lifetime.unwrap() { - ctx.debit(segwit_min_amount(), &Base32::rand()).await; - ctx.next_block().await; - } - // End down - retry! { !ctx.wire_running() && !ctx.gateway_running() }; -} - -/// Check the capacity of wire-gateway and btc-wire to recover from database and node loss -pub async fn reconnect(ctx: TestCtx) { - ctx.step("Setup"); - let mut ctx = BtcCtx::setup(&ctx, "taler_btc.conf", false).await; - - let mut credits = Vec::new(); - let mut debits = Vec::new(); - - ctx.step("With DB"); - { - let metadata = Base32::rand(); - let amount = Amount::from_sat(42000); - ctx.credit(amount, &metadata).await; - credits.push((metadata, amount)); - ctx.next_block().await; - ctx.next_conf().await; - ctx.expect_credits(&credits).await; - }; - - ctx.step("Without DB"); - { - ctx.stop_db(); - ctx.malformed_credit(&Amount::from_sat(24000)).await; - let metadata = Base32::rand(); - let amount = Amount::from_sat(40000); - ctx.credit(amount, &metadata).await; - credits.push((metadata, amount)); - ctx.stop_node().await; - ctx.expect_gateway_down().await; - } - - ctx.step("Reconnect DB"); - { - ctx.resume_db(); - ctx.resume_node(&[]).await; - let metadata = Base32::rand(); - let amount = Amount::from_sat(2000); - ctx.debit(amount, &metadata).await; - debits.push((metadata, amount)); - ctx.next_conf().await; - ctx.expect_debits(&debits).await; - ctx.expect_credits(&credits).await; - } - - ctx.step("Recover DB"); - { - let balance = ctx.wire_balance().await; - ctx.reset_db(); - ctx.next_block().await; - ctx.expect_debits(&debits).await; - ctx.expect_credits(&credits).await; - ctx.expect_wire_balance(balance, true).await; - } -} - -/// Test btc-wire ability to recover from errors in correctness critical paths and prevent concurrent sending -pub async fn stress(ctx: TestCtx) { - ctx.step("Setup"); - let mut ctx = BtcCtx::setup(&ctx, "taler_btc.conf", true).await; - - let mut credits = Vec::new(); - let mut debits = Vec::new(); - - ctx.step("Credit"); - { - let mut balance = ctx.wire_balance().await; - for n in 10..30 { - let metadata = Base32::rand(); - let amount = Amount::from_sat(n * 1000); - ctx.credit(amount, &metadata).await; - credits.push((metadata, amount)); - balance += amount; - ctx.next_block().await; - } - ctx.next_conf().await; - ctx.expect_credits(&credits).await; - ctx.expect_wire_balance(balance, true).await; - }; - - ctx.step("Debit"); - { - let mut balance = ctx.client_balance().await; - for n in 10..30 { - let metadata = Base32::rand(); - let amount = Amount::from_sat(n * 100); - balance += amount; - ctx.debit(amount, &metadata).await; - debits.push((metadata, amount)); - } - ctx.next_block().await; - ctx.expect_debits(&debits).await; - ctx.expect_client_balance(balance, true).await; - } - - ctx.step("Bounce"); - { - ctx.reset_wallet().await; - let mut balance = ctx.wire_balance().await; - for n in 10..30 { - ctx.malformed_credit(&Amount::from_sat(n * 1000)).await; - balance += ctx.worker_cfg.bounce_fee; - } - ctx.next_conf().await; - ctx.expect_wire_balance(balance, true).await; - } - - ctx.step("Recover DB"); - { - let balance = ctx.wire_balance().await; - ctx.reset_db(); - ctx.next_block().await; - ctx.expect_debits(&debits).await; - ctx.expect_credits(&credits).await; - ctx.expect_wire_balance(balance, true).await; - } -} - -/// Test btc-wire ability to handle conflicting outgoing transactions -pub async fn conflict(tctx: TestCtx) { - tctx.step("Setup"); - let mut ctx = BtcCtx::setup(&tctx, "taler_btc.conf", false).await; - - ctx.step("Conflict send"); - { - // Perform credit - let amount = Amount::from_sat(4200000); - ctx.credit(amount, &Base32::rand()).await; - ctx.next_conf().await; - ctx.expect_wire_balance(amount, true).await; - let client = ctx.client_balance().await; - let wire = ctx.wire_balance().await; - - // Perform debit - ctx.debit(Amount::from_sat(400000), &Base32::rand()).await; - retry! { ctx.wire_balance().await < wire }; - - // Abandon pending transaction - ctx.restart_node(&["-minrelaytxfee=0.0001"]).await; - ctx.abandon_wire().await; - ctx.expect_client_balance(client, false).await; - ctx.expect_wire_balance(wire, false).await; - - // Generate conflict - ctx.debit(Amount::from_sat(500000), &Base32::rand()).await; - retry! { ctx.wire_balance().await < wire }; - - // Resend conflicting transaction - ctx.restart_node(&[]).await; - ctx.next_block().await; - let wire = ctx.wire_balance().await; - retry! { ctx.wire_balance().await < wire }; - } - - ctx.step("Setup"); - drop(ctx); - let mut ctx = BtcCtx::setup(&tctx, "taler_btc.conf", false).await; - ctx.credit(Amount::from_sat(3000000), &Base32::rand()).await; - ctx.next_block().await; - - ctx.step("Conflict bounce"); - { - // Perform bounce - let wire = ctx.wire_balance().await; - let bounce_amount = Amount::from_sat(4000000); - ctx.malformed_credit(&bounce_amount).await; - ctx.next_conf().await; - let fee = ctx.worker_cfg.bounce_fee; - ctx.expect_wire_balance(wire + fee, true).await; - - // Abandon pending transaction - ctx.restart_node(&["-minrelaytxfee=0.0001"]).await; - ctx.abandon_wire().await; - ctx.expect_wire_balance(wire + bounce_amount, false).await; - - // Generate conflict - let amount = Amount::from_sat(50000); - ctx.debit(amount, &Base32::rand()).await; - retry! { ctx.wire_balance().await < (wire + bounce_amount) }; - - // Resend conflicting transaction - ctx.restart_node(&[]).await; - let wire = ctx.wire_balance().await; - ctx.next_block().await; - retry! { ctx.wire_balance().await < wire }; - } -} - -/// Test btc-wire correctness when a blockchain reorganization occurs -pub async fn reorg(ctx: TestCtx) { - ctx.step("Setup"); - let mut ctx = BtcCtx::setup(&ctx, "taler_btc.conf", false).await; - - ctx.step("Handle reorg incoming transactions"); - { - // Loose second bitcoin node - ctx.cluster_deco().await; - - // Perform credits - let before = ctx.wire_balance().await; - for n in 10..21 { - ctx.credit(Amount::from_sat(n * 10000), &Base32::rand()) - .await; - ctx.next_block().await; - } - let after = ctx.wire_balance().await; - - // Perform fork and check btc-wire hard error - ctx.expect_gateway_up().await; - ctx.cluster_fork().await; - ctx.expect_wire_balance(before, false).await; - ctx.expect_gateway_down().await; - - // Recover orphaned transaction - ctx.mine(12).await; - ctx.expect_wire_balance(after, false).await; - ctx.expect_gateway_up().await; - } - - ctx.step("Handle reorg outgoing transactions"); - { - // Loose second bitcoin node - ctx.cluster_deco().await; - - // Perform debits - let before = ctx.client_balance().await; - let mut after = ctx.client_balance().await; - for n in 10..21 { - let amount = Amount::from_sat(n * 100); - ctx.debit(amount, &Base32::rand()).await; - after += amount; - } - ctx.next_block().await; - ctx.expect_client_balance(after, true).await; - - // Perform fork and check btc-wire still up - ctx.expect_gateway_up().await; - ctx.cluster_fork().await; - ctx.expect_client_balance(before, false).await; - ctx.expect_gateway_up().await; - - // Recover orphaned transaction - ctx.next_conf().await; - ctx.expect_client_balance(after, false).await; - } - - ctx.step("Handle reorg bounce"); - { - ctx.reset_wallet().await; - - // Loose second bitcoin node - ctx.cluster_deco().await; - - // Perform bounce - let before = ctx.wire_balance().await; - let mut after = ctx.wire_balance().await; - for n in 10..21 { - ctx.malformed_credit(&Amount::from_sat(n * 1000)).await; - after += ctx.worker_cfg.bounce_fee; - } - ctx.next_conf().await; - ctx.expect_wire_balance(after, true).await; - - // Perform fork and check btc-wire hard error - ctx.expect_gateway_up().await; - ctx.cluster_fork().await; - ctx.expect_wire_balance(before, false).await; - ctx.expect_gateway_down().await; - - // Recover orphaned transaction - ctx.mine(12).await; - ctx.expect_wire_balance(after, false).await; - ctx.expect_gateway_up().await; - } -} - -/// Test btc-wire correctness when a blockchain reorganization occurs leading to past incoming transaction conflict -pub async fn hell(tctx: TestCtx) { - macro_rules! step { - ($ctx:ident, $action:expr) => { - // Loose second bitcoin node - $ctx.cluster_deco().await; - - // Perform action - $action; - - // Perform fork and check btc-wire hard error - $ctx.expect_gateway_up().await; - $ctx.cluster_fork().await; - $ctx.expect_gateway_down().await; - - // Generate conflict - $ctx.restart_node(&["-minrelaytxfee=0.001"]).await; - $ctx.abandon_client().await; - let amount = Amount::from_sat(54000); - $ctx.credit(amount, &Base32::rand()).await; - $ctx.expect_wire_balance(amount, true).await; - - // Check btc-wire suspend operation - let bounce_amount = Amount::from_sat(34000); - $ctx.malformed_credit(&bounce_amount).await; - $ctx.next_conf().await; - $ctx.expect_wire_balance(amount + bounce_amount, true).await; - $ctx.expect_gateway_down().await; - }; - } - - tctx.step("Setup"); - let mut ctx = BtcCtx::setup(&tctx, "taler_btc.conf", false).await; - ctx.step("Handle reorg conflicting incoming credit"); - step!(ctx, { - let amount = Amount::from_sat(420000); - ctx.credit(amount, &Base32::rand()).await; - ctx.next_conf().await; - ctx.expect_wire_balance(amount, true).await; - }); - - drop(ctx); - tctx.step("Setup"); - let mut ctx = BtcCtx::setup(&tctx, "taler_btc.conf", false).await; - ctx.step("Handle reorg conflicting incoming credit"); - step!(ctx, { - let amount = Amount::from_sat(420000); - ctx.malformed_credit(&amount).await; - ctx.next_conf().await; - let fee = ctx.worker_cfg.bounce_fee; - ctx.expect_wire_balance(fee, true).await; - }); -} - -/// Test btc-wire ability to learn and protect itself from blockchain behavior -pub async fn analysis(ctx: TestCtx) { - ctx.step("Setup"); - let mut ctx = BtcCtx::setup(&ctx, "taler_btc.conf", false).await; - - ctx.step("Learn from reorg"); - - // Loose second bitcoin node - ctx.cluster_deco().await; - - // Perform credit - let before = ctx.wire_balance().await; - ctx.credit(Amount::from_sat(42000), &Base32::rand()).await; - ctx.next_conf().await; - let after = ctx.wire_balance().await; - - // Perform fork and check btc-wire hard error - ctx.expect_gateway_up().await; - ctx.cluster_fork().await; - ctx.expect_wire_balance(before, false).await; - ctx.expect_gateway_down().await; - - // Recover orphaned transaction - ctx.next_conf().await; - ctx.next_block().await; // Conf have changed - ctx.expect_wire_balance(after, false).await; - ctx.expect_gateway_up().await; - - // Loose second bitcoin node - ctx.cluster_deco().await; - - // Perform credit - let before = ctx.wire_balance().await; - ctx.credit(Amount::from_sat(42000), &Base32::rand()).await; - ctx.next_conf().await; - - // Perform fork and check btc-wire learned from previous attack - ctx.expect_gateway_up().await; - ctx.cluster_fork().await; - ctx.expect_wire_balance(before, false).await; - ctx.expect_gateway_up().await; -} - -/// Test btc-wire ability to handle stuck transaction correctly -pub async fn bumpfee(tctx: TestCtx) { - tctx.step("Setup"); - let mut ctx = BtcCtx::setup(&tctx, "taler_btc_bump.conf", false).await; - - // Perform credits to allow wire to perform debits latter - for n in 10..13 { - ctx.credit(Amount::from_sat(n * 100000), &Base32::rand()) - .await; - ctx.next_block().await; - } - ctx.next_conf().await; - - ctx.step("Bump fee"); - { - // Perform debit - let mut client = ctx.client_balance().await; - let wire = ctx.wire_balance().await; - let amount = Amount::from_sat(40000); - ctx.debit(amount, &Base32::rand()).await; - retry! { ctx.wire_balance().await < wire }; - - // Bump min relay fee making the previous debit stuck - ctx.restart_node(&["-minrelaytxfee=0.0001"]).await; - - // Check bump happen - client += amount; - ctx.expect_client_balance(client, true).await; - } - - ctx.step("Bump fee reorg"); - { - // Loose second bitcoin node - ctx.cluster_deco().await; - - // Perform debit - let mut client = ctx.client_balance().await; - let wire = ctx.wire_balance().await; - let amount = Amount::from_sat(40000); - ctx.debit(amount, &Base32::rand()).await; - retry! { ctx.wire_balance().await < wire }; - - // Bump min relay fee and fork making the previous debit stuck and problematic - ctx.cluster_fork().await; - ctx.restart_node(&["-minrelaytxfee=0.0001"]).await; - - // Check bump happen - client += amount; - ctx.expect_client_balance(client, true).await; - } - ctx.step("Setup"); - drop(ctx); - let mut ctx = BtcCtx::setup(&tctx, "taler_btc_bump.conf", true).await; - - // Perform credits to allow wire to perform debits latter - for n in 10..61 { - ctx.credit(Amount::from_sat(n * 100000), &Base32::rand()) - .await; - ctx.next_block().await; - } - ctx.next_conf().await; - - ctx.step("Bump fee stress"); - { - // Loose second bitcoin node - ctx.cluster_deco().await; - - // Perform debits - let client = ctx.client_balance().await; - let wire = ctx.wire_balance().await; - let mut total_amount = Amount::ZERO; - for n in 10..31 { - let amount = Amount::from_sat(n * 10000); - total_amount += amount; - ctx.debit(amount, &Base32::rand()).await; - } - retry! { ctx.wire_balance().await < wire - total_amount }; - - // Bump min relay fee making the previous debits stuck - ctx.restart_node(&["-minrelaytxfee=0.0001"]).await; - - // Check bump happen - ctx.expect_client_balance(client + total_amount, true).await; - } -} - -/// Test btc-wire handle transaction fees exceeding limits -pub async fn maxfee(ctx: TestCtx) { - ctx.step("Setup"); - let mut ctx = BtcCtx::setup(&ctx, "taler_btc.conf", false).await; - - // Perform credits to allow wire to perform debits latter - for n in 10..31 { - ctx.credit(Amount::from_sat(n * 100000), &Base32::rand()) - .await; - ctx.next_block().await; - } - ctx.next_conf().await; - - let client = ctx.client_balance().await; - let wire = ctx.wire_balance().await; - let mut total_amount = Amount::ZERO; - - ctx.step("Too high fee"); - { - // Change fee config - ctx.restart_node(&["-maxtxfee=0.0000001", "-minrelaytxfee=0.0000001"]) - .await; - - // Perform debits - for n in 10..31 { - let amount = Amount::from_sat(n * 10000); - total_amount += amount; - ctx.debit(amount, &Base32::rand()).await; - } - ctx.mine(2).await; - - // Check no transaction happen - ctx.expect_wire_balance(wire, false).await; - ctx.expect_client_balance(client, false).await; - } - - ctx.step("Good feed"); - { - // Restore default config - ctx.restart_node(&[]).await; - - // Check transaction now have been made - ctx.expect_client_balance(client + total_amount, true).await; - } -} - -/// Test btc-wire ability to configure itself from bitcoin configuration -pub async fn config(ctx: TestCtx) { - // Connect with cookie files - ctx.step("Cookie"); - BtcCtx::config( - &ctx, - |btc, dir| { - btc.with_section(None::<&str>).set( - "rpccookiefile", - dir.join("catch_me_if_you_can").to_string_lossy(), - ); - }, - |cfg, dir| { - cfg.with_section(Some("depolymerizer-bitcoin-worker")).set( - "RPC_COOKIE_FILE", - dir.join("catch_me_if_you_can").to_string_lossy(), - ); - }, - ) - .await; - - // Connect with password - ctx.step("Password"); - BtcCtx::config( - &ctx, - |btc, _| { - btc.with_section(None::<&str>) - .set("rpcuser", "bob") - .set("rpcpassword", "password"); - }, - |cfg, _| { - cfg.with_section(Some("depolymerizer-bitcoin-worker")) - .set("RPC_AUTH_METHOD", "basic") - .set("RPC_USERNAME", "bob") - .set("RPC_PASSWORD", "password"); - }, - ) - .await; - - // Connect with token - ctx.step("Token"); - BtcCtx::config( - &ctx, - |btc, _| { - btc.with_section(None::<&str>) - .set("rpcauth", "bob:9641cec731e1fad1ded02e1d31536e44$36b8b8af0a38104997a57f017805ff56bf8963ae4a2ed40252ca0e0e070fc19e"); - }, - |cfg, _| { - cfg.with_section(Some("depolymerizer-bitcoin-worker")) - .set("RPC_AUTH_METHOD", "basic") - .set("RPC_USERNAME", "bob") - .set("RPC_PASSWORD", "password"); - }, - ).await; -} diff --git a/instrumentation/src/main.rs b/instrumentation/src/main.rs @@ -1,211 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2022-2025 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ - -use std::{ - panic::UnwindSafe, - string::String, - sync::{Arc, Mutex}, - time::{Duration, Instant}, -}; - -use clap::Parser; -use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; -use owo_colors::OwoColorize; -use tokio::task::{JoinError, JoinHandle}; -use utils::TestDb; - -use crate::utils::{TestCtx, try_cmd_redirect}; - -mod btc; -mod utils; - -/// Depolymerizer instrumentation test -#[derive(clap::Parser, Debug)] -struct Args { - /// With tests to run - #[clap(global = true, default_value = "")] - filters: Vec<String>, -} - -struct Tmp<'a> { - filters: &'a [String], - m: MultiProgress, - start_style: ProgressStyle, - ok_style: ProgressStyle, - err_style: ProgressStyle, - tasks: Vec<( - &'static str, - JoinHandle<(Result<(), JoinError>, Duration, String)>, - )>, - db: Arc<TestDb>, - start: Instant, -} - -impl<'a> Tmp<'a> { - async fn check<T, F>(&mut self, name: &'static str, task: T) - where - T: FnOnce(TestCtx) -> F + Send + UnwindSafe + 'static, - F: Future<Output = ()> + Send + 'static, - { - if self.filters.is_empty() || self.filters.iter().any(|f| name.starts_with(f)) { - let pb = self.m.add(ProgressBar::new_spinner()); - pb.set_style(self.start_style.clone()); - pb.set_prefix(name); - pb.set_message("Init"); - pb.enable_steady_tick(Duration::from_millis(1000)); - let ok_style = self.ok_style.clone(); - let err_style = self.err_style.clone(); - let start = self.start; - let db = self.db.clone(); - self.tasks.push(( - name, - tokio::spawn(async move { - let ctx: TestCtx = TestCtx::new(name, pb.clone(), db); - let result = tokio::spawn(task(ctx)).await; - if result.is_ok() { - pb.set_style(ok_style.clone()); - pb.finish_with_message("OK"); - } else { - pb.set_style(err_style.clone()); - pb.finish(); - } - (result, start.elapsed(), pb.message()) - }), - )); - } - } -} - -#[tokio::main] -pub async fn main() { - let Args { filters } = Args::parse(); - std::fs::remove_dir_all("log").ok(); - std::fs::create_dir_all("log/bin").unwrap(); - - // Set panic hook - let failures = Arc::new(Mutex::new(Vec::new())); - { - let failures = failures.clone(); - std::panic::set_hook(Box::new(move |e| { - let backtrace = std::backtrace::Backtrace::force_capture(); - let info = format!("{e}\n{backtrace}"); - if let Some(id) = tokio::task::try_id() { - failures.lock().unwrap().push((id, info)); - } else { - eprintln!("Failed outside of a task:\n{info}") - } - })); - } - - // Build binaries - let p = ProgressBar::new_spinner(); - p.set_style(ProgressStyle::with_template("building {msg} {elapsed:.dim}").unwrap()); - p.enable_steady_tick(Duration::from_millis(1000)); - for name in ["depolymerizer-bitcoin"] { - build_bin(&p, name, None, name); - build_bin(&p, name, Some("fail"), &format!("{name}-fail")); - } - p.finish_and_clear(); - - // Run tests - let m = MultiProgress::new(); - let start_style = - ProgressStyle::with_template("{prefix:.magenta} {msg} {elapsed:.dim}").unwrap(); - let ok_style = - ProgressStyle::with_template("{prefix:.magenta} {msg:.green} {elapsed:.dim}").unwrap(); - let err_style = - ProgressStyle::with_template("{prefix:.magenta} {msg:.red} {elapsed:.dim}").unwrap(); - - let start = Instant::now(); - let db = Arc::new(TestDb::new()); - let mut tmp = Tmp { - filters: filters.as_slice(), - m, - start_style, - ok_style, - err_style, - tasks: Vec::new(), - db, - start, - }; - tmp.check("btc_wire", btc::wire).await; - tmp.check("btc_lifetime", btc::lifetime).await; - tmp.check("btc_reconnect", btc::reconnect).await; - tmp.check("btc_stress", btc::stress).await; - tmp.check("btc_conflict", btc::conflict).await; - tmp.check("btc_reorg", btc::reorg).await; - tmp.check("btc_hell", btc::hell).await; - tmp.check("btc_analysis", btc::analysis).await; - tmp.check("btc_bumpfee", btc::bumpfee).await; - tmp.check("btc_maxfee", btc::maxfee).await; - tmp.check("btc_config", btc::config).await; - let mut results = Vec::new(); - for (name, task) in tmp.tasks { - results.push((name, task.await.unwrap())); - } - - let len = results.len(); - - tmp.m.clear().unwrap(); - let failures = failures.lock().unwrap(); - for (name, (result, _, msg)) in &results { - if let Err(e) = result { - if let Some((_, err)) = failures.iter().find(|(id, _)| *id == e.id()) { - println!("{} {}\n{}", name.magenta(), msg.red(), err.bright_black()); - } - } - } - for (name, (result, time, msg)) in results { - match result { - Ok(_) => { - println!( - "{} {} {}", - name.magenta(), - "OK".green(), - format_args!("{}s", time.as_secs()).bright_black() - ); - } - Err(_) => { - println!( - "{} {} {}", - name.magenta(), - msg.red(), - format_args!("{}s", time.as_secs()).bright_black() - ); - } - } - } - println!("{} tests in {}s", len, start.elapsed().as_secs()); -} - -pub fn build_bin(p: &ProgressBar, name: &str, features: Option<&str>, bin_name: &str) { - p.set_message(bin_name.to_string()); - let mut args = vec!["build", "--bin", name, "--release"]; - if let Some(features) = features { - args.extend_from_slice(&["--features", features]); - } - let result = try_cmd_redirect("cargo", &args, "log/bin/build") - .unwrap() - .0 - .wait() - .unwrap(); - assert!(result.success()); - std::fs::rename( - format!("target/release/{name}"), - format!("log/bin/{bin_name}"), - ) - .unwrap(); -} diff --git a/instrumentation/src/utils.rs b/instrumentation/src/utils.rs @@ -1,555 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2022-2025 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ - -use std::{ - fmt::Display, - io::Write as _, - net::{Ipv4Addr, SocketAddrV4, TcpListener}, - ops::{Deref, DerefMut}, - path::{Path, PathBuf}, - process::{Child, Command, Stdio}, - str::FromStr, - sync::Arc, - time::Duration, -}; - -use common::{ - taler_common::{ - api_common::{EddsaPublicKey, ShortHashCode}, - api_wire::{IncomingBankTransaction, IncomingHistory, OutgoingHistory, TransferRequest}, - types::{amount::Amount, base32::Base32, payto::PaytoURI}, - }, - url::Url, -}; -use indicatif::ProgressBar; -use ini::Ini; -use tempfile::TempDir; - -const LOG: &str = "INFO"; - -#[must_use] -pub async fn check_incoming(base_url: &str, txs: &[(EddsaPublicKey, Amount)]) -> bool { - let mut res = ureq::get(&format!("{base_url}history/incoming")) - .query("delta", format!("-{}", 100)) - .call() - .unwrap(); - if txs.is_empty() { - res.status() == 204 - } else { - if res.status() != 200 { - return false; - } - let history: IncomingHistory = res.body_mut().read_json().unwrap(); - - history.incoming_transactions.len() == txs.len() - && txs.iter().all(|(reserve_pub_key, taler_amount)| { - history.incoming_transactions.iter().any(|h| { - matches!( - h, - IncomingBankTransaction::Reserve { - reserve_pub, - amount, - .. - } if reserve_pub == reserve_pub_key && amount == taler_amount - ) - }) - }) - } -} - -#[must_use] -pub async fn check_gateway_down(base_url: &str) -> bool { - matches!( - ureq::get(&format!("{base_url}history/incoming")) - .query("delta", "-5") - .call(), - Err(ureq::Error::StatusCode(504 | 502)) - ) -} - -#[must_use] -pub async fn check_gateway_up(base_url: &str) -> bool { - ureq::get(&format!("{base_url}config")).call().is_ok() -} - -pub async fn transfer(base_url: &str, wtid: &[u8; 32], credit_account: PaytoURI, amount: &Amount) { - loop { - let res = ureq::post(&format!("{base_url}transfer")).send_json(TransferRequest { - request_uid: Base32::rand(), - amount: amount.clone(), - exchange_base_url: Url::parse("https://exchange.test/").unwrap(), - wtid: Base32::from(*wtid), - credit_account: credit_account.clone(), - }); - if !matches!(res, Err(ureq::Error::StatusCode(502))) { - res.unwrap(); - break; - } - } -} - -#[must_use] -pub async fn check_outgoing(base_url: &str, txs: &[(ShortHashCode, Amount)]) -> bool { - let mut res = ureq::get(format!("{base_url}history/outgoing")) - .query("delta", format!("-{}", txs.len())) - .call() - .unwrap(); - if txs.is_empty() { - res.status() == 204 - } else { - if res.status() != 200 { - return false; - } - let history: OutgoingHistory = res.body_mut().read_json().unwrap(); - - history.outgoing_transactions.len() == txs.len() - && txs.iter().all(|(wtid, amount)| { - history - .outgoing_transactions - .iter() - .any(|h| h.wtid == *wtid && &h.amount == amount) - }) - } -} -pub struct ChildGuard(pub Child); - -impl Drop for ChildGuard { - fn drop(&mut self) { - self.0.kill().ok(); - } -} - -#[track_caller] -pub fn try_cmd_redirect( - cmd: &str, - args: &[&str], - path: impl AsRef<Path>, -) -> std::io::Result<ChildGuard> { - let log_file = std::fs::OpenOptions::new() - .create(true) - .append(true) - .open(path)?; - - let child = Command::new(cmd) - .args(args) - .stderr(log_file.try_clone()?) - .stdout(log_file) - .stdin(Stdio::null()) - .spawn()?; - Ok(ChildGuard(child)) -} - -#[track_caller] -pub fn cmd_redirect(cmd: &str, args: &[&str], path: impl AsRef<Path>) -> ChildGuard { - try_cmd_redirect(cmd, args, path).unwrap() -} - -#[track_caller] -pub fn cmd_ok(mut child: ChildGuard, name: &str) { - let result = child.0.wait().unwrap(); - if !result.success() { - panic!("cmd {name} failed"); - } -} - -#[track_caller] -pub fn cmd_redirect_ok(cmd: &str, args: &[&str], path: impl AsRef<Path>, name: &str) { - cmd_ok(cmd_redirect(cmd, args, path), name) -} - -#[macro_export] -macro_rules! retry_opt { - ($expr:expr) => { - async { - let start = std::time::Instant::now(); - loop { - let result = $expr; - if result.is_err() && start.elapsed() < std::time::Duration::from_secs(30) { - tokio::time::sleep(std::time::Duration::from_millis(500)).await; - } else { - return result.unwrap(); - } - } - } - .await - }; -} - -#[macro_export] -macro_rules! retry { - ($expr:expr) => { - $crate::retry_opt! { - $expr.then_some(()).ok_or("failure") - } - }; -} - -#[derive(Clone)] -pub struct TestCtx { - pub name: String, - pub log_dir: PathBuf, - pub pb: ProgressBar, - pub db: Arc<TestDb>, -} - -impl TestCtx { - pub fn new(name: &str, pb: ProgressBar, db: Arc<TestDb>) -> Self { - // Create log dir - let log_dir = PathBuf::from_str("log").unwrap().join(name); - std::fs::remove_dir_all(&log_dir).ok(); - std::fs::create_dir_all(&log_dir).unwrap(); - - Self { - name: name.to_owned(), - log_dir, - pb, - db, - } - } - - pub fn log(&self, name: &str) -> PathBuf { - self.log_dir.join(format!("{name}.log")) - } - - pub fn step(&self, disp: impl Display) { - self.pb.set_message(format!("{disp}")) - } - - /* ----- Database ----- */ - - pub fn stop_db(&mut self) { - self.db.stop_db(&self.name); - } - - pub fn resume_db(&mut self) { - self.db.resume_db(&self.name); - } -} - -pub struct TalerCtx { - _dir: TempDir, - pub wire_dir: PathBuf, - pub wire2_dir: PathBuf, - pub conf: PathBuf, - ctx: TestCtx, - pub wire_bin_path: String, - stressed: bool, - gateway: Option<ChildGuard>, - pub gateway_url: String, - wire: Option<ChildGuard>, - wire2: Option<ChildGuard>, - pub gateway_port: u16, -} - -impl TalerCtx { - pub fn new(ctx: &TestCtx, wire_name: impl Into<String>, config: &str, stressed: bool) -> Self { - // Create temporary dir - let dir = TempDir::new().unwrap(); - let conf = dir.path().join("taler.conf"); - - // Create common dirs - let wire_dir = dir.path().join("wire"); - let wire2_dir = dir.path().join("wire2"); - for dir in [&wire_dir, &wire2_dir] { - std::fs::create_dir_all(dir).unwrap(); - } - - // Find unused port - let gateway_port = unused_port(); - let gateway_url = format!("http://localhost:{gateway_port}/taler-wire-gateway/"); - - // Generate taler config from base - let wire_name = wire_name.into(); - let config = PathBuf::from_str("instrumentation/conf") - .unwrap() - .join(config); - let mut cfg = ini::Ini::load_from_file(config).unwrap(); - cfg.with_section(Some("exchange-accountcredentials-admin")) - .set("WIRE_GATEWAY_URL", &gateway_url); - cfg.with_section(Some(format!("{wire_name}db-postgres"))) - .set("CONFIG", ctx.db.postgres_uri(&ctx.name)); - cfg.with_section(Some(format!("{wire_name}-httpd"))) - .set("PORT", gateway_port.to_string()); - - cfg.write_to_file(&conf).unwrap(); - - Self { - _dir: dir, - ctx: ctx.clone(), - gateway_url, - wire_dir, - wire2_dir, - conf, - wire_bin_path: if stressed { - format!("log/bin/{wire_name}-fail") - } else { - format!("log/bin/{wire_name}") - }, - stressed, - gateway: None, - wire: None, - wire2: None, - gateway_port, - } - } - - pub fn dbinit(&self) { - self.db.create_db(&self.ctx.name); - // Init db - cmd_redirect_ok( - &self.wire_bin_path, - &["-c", self.conf.to_string_lossy().as_ref(), "dbinit"], - self.log("cmd"), - "wire dbinit", - ); - } - - pub fn reset_db(&self) { - // Reset db - cmd_redirect_ok( - &self.wire_bin_path, - &["-c", self.conf.to_string_lossy().as_ref(), "dbinit", "-r"], - self.log("cmd"), - "wire dbinit reset", - ); - } - - pub fn setup(&self) { - // Init db - cmd_redirect_ok( - &self.wire_bin_path, - &["-c", self.conf.to_string_lossy().as_ref(), "setup"], - self.log("cmd"), - "wire setup", - ); - } - - pub async fn run(&mut self) { - // Run gateway - self.gateway = Some(cmd_redirect( - &self.wire_bin_path, - &[ - "-c", - self.conf.to_string_lossy().as_ref(), - "-L", - LOG, - "serve", - ], - self.log("gateway"), - )); - - // Start wires - self.wire = Some(cmd_redirect( - &self.wire_bin_path, - &[ - "-c", - self.conf.to_string_lossy().as_ref(), - "-L", - LOG, - "worker", - ], - self.log("worker"), - )); - self.wire2 = self.stressed.then(|| { - cmd_redirect( - &self.wire_bin_path, - &[ - "-c", - self.conf.to_string_lossy().as_ref(), - "-L", - LOG, - "worker", - ], - self.log("worker+"), - ) - }); - - // Wait for gateway to be up - retry_opt! { - tokio::net::TcpStream::connect(SocketAddrV4::new(Ipv4Addr::LOCALHOST, self.gateway_port)).await - }; - } - - /* ----- Process ----- */ - - #[must_use] - pub fn wire_running(&mut self) -> bool { - self.wire.as_mut().unwrap().0.try_wait().unwrap().is_none() - } - - #[must_use] - pub fn gateway_running(&mut self) -> bool { - self.gateway - .as_mut() - .unwrap() - .0 - .try_wait() - .unwrap() - .is_none() - } - - /* ----- Wire Gateway -----*/ - - pub async fn expect_credits(&self, txs: &[(EddsaPublicKey, Amount)]) { - retry! { check_incoming(&self.gateway_url, txs).await } - } - - pub async fn expect_debits(&self, txs: &[(ShortHashCode, Amount)]) { - retry! { check_outgoing(&self.gateway_url, txs).await } - } - - pub async fn expect_gateway_up(&self) { - retry! { check_gateway_up(&self.gateway_url).await } - } - - pub async fn expect_gateway_down(&self) { - retry! { check_gateway_down(&self.gateway_url).await } - } -} - -impl Deref for TalerCtx { - type Target = TestCtx; - - fn deref(&self) -> &Self::Target { - &self.ctx - } -} - -impl DerefMut for TalerCtx { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.ctx - } -} - -pub fn unused_port() -> u16 { - TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)) - .unwrap() - .local_addr() - .unwrap() - .port() -} - -pub struct TestDb { - pub dir: TempDir, - _db: ChildGuard, -} - -impl TestDb { - pub fn new() -> Self { - let dir = TempDir::new().unwrap(); - let log = "log/postgres.log"; - - // Init databases files - cmd_redirect_ok( - "initdb", - &[dir.path().to_string_lossy().as_ref()], - log, - "init_db", - ); - // Generate database config - std::fs::write( - dir.path().join("postgresql.conf"), - format!( - " - listen_addresses='' - unix_socket_directories='{}' - fsync=off - synchronous_commit=off - full_page_writes=off - ", - dir.path().to_string_lossy().as_ref() - ), - ) - .unwrap(); - let db = cmd_redirect( - "postgres", - &["-D", dir.path().to_string_lossy().as_ref()], - log, - ); - let tmp = Self { dir, _db: db }; - // Wait for postgres to start - for _ in 0..10 { - if tmp.execute_sql("SELECT true") { - break; - } - std::thread::sleep(Duration::from_millis(500)) - } - tmp - } - - pub fn postgres_uri(&self, database: &str) -> String { - format!( - "postgres:///{database}?host={}", - self.dir.path().to_string_lossy().as_ref() - ) - } - - fn execute_sql(&self, sql: &str) -> bool { - let mut psql = ChildGuard( - Command::new("psql") - .arg(self.postgres_uri("postgres")) - .stderr(Stdio::null()) - .stdout( - std::fs::File::options() - .append(true) - .create(true) - .open("log/postgres.log") - .unwrap(), - ) - .stdin(Stdio::piped()) - .spawn() - .unwrap(), - ); - psql.0 - .stdin - .as_mut() - .unwrap() - .write_all(sql.as_bytes()) - .unwrap(); - psql.0.wait().unwrap().success() - } - - pub fn create_db(&self, name: &str) { - self.execute_sql(&format!( - " - DROP DATABASE IF EXISTS {name}; - CREATE DATABASE {name}; - " - )); - } - - pub fn stop_db(&self, name: &str) { - self.execute_sql(&format!( - " - UPDATE pg_database SET datallowconn=false WHERE datname='{name}'; - SELECT pg_terminate_backend(pid) - FROM pg_stat_activity - WHERE datname='{name}' AND pid <> pg_backend_pid(); - " - )); - } - pub fn resume_db(&self, name: &str) { - self.execute_sql(&format!( - "UPDATE pg_database SET datallowconn=true WHERE datname='{name}';" - )); - } -} - -pub fn patch_config(from: impl AsRef<Path>, to: impl AsRef<Path>, patch: impl FnOnce(&mut Ini)) { - let mut cfg = ini::Ini::load_from_file(from).unwrap(); - patch(&mut cfg); - cfg.write_to_file(to).unwrap(); -} diff --git a/makefile b/makefile @@ -42,7 +42,7 @@ check: install-nobuild-files .PHONY: test test: install-nobuild-files - RUST_BACKTRACE=true cargo run --profile dev --bin instrumentation + RUST_BACKTRACE=true cargo run --profile dev --bin testbench -- instrumentation .PHONY: doc doc: diff --git a/testbench/Cargo.toml b/testbench/Cargo.toml @@ -0,0 +1,31 @@ +[package] +name = "testbench" +version = "0.1.0" +edition.workspace = true +authors.workspace = true +homepage.workspace = true +repository.workspace = true +license-file.workspace = true + +[dependencies] +# Cli args parser +clap.workspace = true +common = { path = "../common" } +# Bitcoin +depolymerizer-bitcoin = { path = "../depolymerizer-bitcoin" } +bitcoin.workspace = true +# Wire Gateway +ureq = { version = "3.0.0", features = ["json"] } +# terminal color +owo-colors = "4.0.0" +# Edit toml files +rust-ini = "0.21.0" +# Progress reporting +indicatif = "0.18.0" +reedline = "0.41.0" +taler-common.workspace = true +tokio.workspace = true +anyhow.workspace = true +tracing-subscriber.workspace = true +tracing.workspace = true +sqlx.workspace = true diff --git a/instrumentation/conf/bitcoin.conf b/testbench/conf/bitcoin.conf diff --git a/testbench/conf/bitcoindev.conf b/testbench/conf/bitcoindev.conf @@ -0,0 +1,4 @@ +regtest=1 +txindex=1 +rpcservertimeout=0 +rpcport=18345 +\ No newline at end of file diff --git a/instrumentation/conf/taler_btc.conf b/testbench/conf/taler_btc.conf diff --git a/instrumentation/conf/taler_btc_bump.conf b/testbench/conf/taler_btc_bump.conf diff --git a/testbench/conf/taler_btc_dev.conf b/testbench/conf/taler_btc_dev.conf @@ -0,0 +1,17 @@ +[depolymerizer-bitcoin] +CURRENCY = DEVBTC +NAME = Test + +[depolymerizer-bitcoin-worker] +WALLET_NAME = wire +RPC_COOKIE_FILE = testbenches/btc-regtest/bitcoin/testnet4/.cookie +RPC_BIND = 127.0.0.1:18345 +CONFIRMATION = 3 + +[depolymerizer-bitcoin-httpd-wire-gateway-api] +ENABLED = YES +AUTH_METHOD = none + +[depolymerizer-bitcoin-httpd-revenue-api] +ENABLED = YES +AUTH_METHOD = none diff --git a/instrumentation/conf/taler_btc_lifetime.conf b/testbench/conf/taler_btc_lifetime.conf diff --git a/instrumentation/conf/taler_eth.conf b/testbench/conf/taler_eth.conf diff --git a/instrumentation/conf/taler_eth_bump.conf b/testbench/conf/taler_eth_bump.conf diff --git a/instrumentation/conf/taler_eth_lifetime.conf b/testbench/conf/taler_eth_lifetime.conf diff --git a/testbench/src/btc.rs b/testbench/src/btc.rs @@ -0,0 +1,1062 @@ +/* + This file is part of TALER + Copyright (C) 2022-2025 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use std::{ + net::{IpAddr, Ipv4Addr, SocketAddr}, + ops::{Deref, DerefMut}, + path::Path, + time::Duration, +}; + +use bitcoin::{Address, Amount}; +use common::taler_common::{ + api_common::{EddsaPublicKey, ShortHashCode}, + types::base32::Base32, +}; +use depolymerizer_bitcoin::{ + CONFIG_SOURCE, + config::{RpcAuth, RpcCfg, ServeCfg, WorkerCfg}, + payto::BtcWallet, + rpc::{Category, Rpc}, + rpc_utils::segwit_min_amount, + taler_utils::btc_to_taler, +}; +use ini::Ini; +use taler_common::{config::Config, types::payto::Payto}; + +use crate::{ + retry, retry_opt, + utils::{ChildGuard, TalerCtx, TestCtx, cmd_redirect, patch_config, transfer, unused_port}, +}; + +pub struct BtcCtx { + btc_node: ChildGuard, + _btc_node2: ChildGuard, + common_rpc: Rpc, + common_rpc2: Rpc, + wire_rpc: Rpc, + client_rpc: Rpc, + reserve_rpc: Rpc, + wire_addr: Address, + pub client_addr: Address, + reserve_addr: Address, + worker_cfg: WorkerCfg, + serve_cfg: ServeCfg, + conf: u16, + ctx: TalerCtx, + node2_addr: String, +} + +impl Deref for BtcCtx { + type Target = TalerCtx; + + fn deref(&self) -> &Self::Target { + &self.ctx + } +} + +impl DerefMut for BtcCtx { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.ctx + } +} + +impl BtcCtx { + pub async fn config( + ctx: &TestCtx, + btc_patch: impl FnOnce(&mut Ini, &Path), + cfg_patch: impl FnOnce(&mut Ini, &Path), + ) { + // Patch configs + let port = unused_port(); + let rpc_port = unused_port(); + + patch_config( + "testbench/conf/bitcoin.conf", + ctx.dir.join("bitcoin.conf"), + |cfg| { + cfg.with_section(Some("regtest")) + .set("bind", format!("127.0.0.1:{port}")) + .set("rpcport", format!("{rpc_port}")); + btc_patch(cfg, &ctx.dir) + }, + ); + patch_config( + "testbench/conf/taler_btc.conf", + ctx.dir.join("config.conf"), + |cfg| { + cfg.with_section(Some("depolymerizer-bitcoin-worker")) + .set("RPC_BIND", format!("127.0.0.1:{rpc_port}")); + cfg_patch(cfg, &ctx.dir) + }, + ); + // Load config + let cfg = Config::from_file(CONFIG_SOURCE, Some(ctx.dir.join("config.conf"))).unwrap(); + let rpc_cfg = RpcCfg::parse(&cfg).unwrap(); + // Start bitcoin nodes + let _node = cmd_redirect( + "bitcoind", + &[&format!("-datadir={}", ctx.dir.to_string_lossy())], + ctx.log("bitcoind"), + ); + // Connect + retry_opt! { + async { + let mut client = Rpc::common(&rpc_cfg).await?; + client.get_blockchain_info().await?; + Ok::<_, anyhow::Error>(()) + }.await + }; + } + + fn patch_btc_config(from: impl AsRef<Path>, to: impl AsRef<Path>, port: u16, rpc_port: u16) { + patch_config(from, to, |cfg| { + cfg.with_section(Some("regtest")) + .set("bind", format!("127.0.0.1:{port}")) + .set("rpcport", format!("{rpc_port}")); + }) + } + + pub async fn setup(ctx: &TestCtx, config: &str, stressed: bool) -> Self { + let mut ctx = TalerCtx::new(ctx, "depolymerizer-bitcoin", config, stressed); + + // Choose unused port + let btc_port = unused_port(); + let btc_rpc_port = unused_port(); + let btc2_port = unused_port(); + let btc2_rpc_port = unused_port(); + + // Bitcoin config + Self::patch_btc_config( + "testbench/conf/bitcoin.conf", + ctx.wire_dir.join("bitcoin.conf"), + btc_port, + btc_rpc_port, + ); + Self::patch_btc_config( + "testbench/conf/bitcoin.conf", + ctx.wire2_dir.join("bitcoin.conf"), + btc2_port, + btc2_rpc_port, + ); + patch_config(&ctx.conf, &ctx.conf, |cfg| { + cfg.with_section(Some("depolymerizer-bitcoin-worker")) + .set("RPC_BIND", format!("127.0.0.1:{btc_rpc_port}")) + .set("WALLET_NAME", "wire") + .set( + "RPC_COOKIE_FILE", + ctx.wire_dir.join("regtest/.cookie").to_string_lossy(), + ); + }); + + // Load config + let config = Config::from_file(CONFIG_SOURCE, Some(&ctx.conf)).unwrap(); + let cfg = WorkerCfg::parse(&config).unwrap(); + // Start bitcoin nodes + let btc_node = cmd_redirect( + "bitcoind", + &[&format!("-datadir={}", ctx.wire_dir.to_string_lossy())], + ctx.log("bitcoind"), + ); + let _btc_node2 = cmd_redirect( + "bitcoind", + &[&format!("-datadir={}", ctx.wire2_dir.to_string_lossy())], + ctx.log("bitcoind2"), + ); + + // Setup wallets + let mut common_rpc = retry_opt! { Rpc::common(&cfg.rpc_cfg).await }; + retry_opt! { common_rpc.get_blockchain_info().await }; + let node2_addr = format!("127.0.0.1:{btc2_port}"); + common_rpc.add_node(&node2_addr).await.unwrap(); + + for name in ["wire", "client", "reserve"] { + common_rpc.create_wallet(name, "").await.unwrap(); + } + let common_rpc2 = retry_opt! { + Rpc::common(&RpcCfg { + addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), btc2_rpc_port), + auth: RpcAuth::Cookie(ctx + .wire2_dir + .join("regtest/.cookie") + .to_string_lossy() + .to_string()), + }).await + }; + + // Generate money + let mut reserve_rpc = Rpc::wallet(&cfg.rpc_cfg, "reserve").await.unwrap(); + let mut client_rpc = Rpc::wallet(&cfg.rpc_cfg, "client").await.unwrap(); + let mut wire_rpc = Rpc::wallet(&cfg.rpc_cfg, "wire").await.unwrap(); + let reserve_addr = reserve_rpc.gen_addr().await.unwrap(); + let client_addr = client_rpc.gen_addr().await.unwrap(); + let wire_addr = wire_rpc.gen_addr().await.unwrap(); + common_rpc.mine(101, &reserve_addr).await.unwrap(); + reserve_rpc + .send(&client_addr, &(Amount::ONE_BTC * 10), None, false) + .await + .unwrap(); + common_rpc.mine(1, &reserve_addr).await.unwrap(); + + patch_config(&ctx.conf, &ctx.conf, |cfg| { + cfg.with_section(Some("depolymerizer-bitcoin")) + .set("NAME", "Exchange Owner") + .set("WALLET", wire_addr.to_string()); + }); + + let config = Config::from_file(CONFIG_SOURCE, Some(&ctx.conf)).unwrap(); + let serve_cfg = ServeCfg::parse(&config).unwrap(); + + // Setup & run + ctx.dbinit(); + ctx.setup(); + ctx.run().await; + + Self { + ctx, + btc_node, + common_rpc, + wire_rpc, + client_rpc, + reserve_rpc, + wire_addr, + client_addr, + reserve_addr, + conf: cfg.confirmation as u16, + worker_cfg: cfg, + serve_cfg, + _btc_node2, + common_rpc2, + node2_addr, + } + } + + pub fn reset_db(&mut self) { + self.ctx.reset_db(); + self.ctx.setup(); + } + + pub async fn stop_node(&mut self) { + // We need to kill bitcoin gracefully to avoid corruption + self.common_rpc.stop().await.unwrap(); + self.btc_node.0.wait().unwrap(); + } + + pub async fn cluster_deco(&mut self) { + self.common_rpc + .disconnect_node(&self.node2_addr) + .await + .unwrap(); + } + + pub async fn cluster_fork(&mut self) { + let node1_height = self.common_rpc.get_blockchain_info().await.unwrap().blocks; + let node2_height = self.common_rpc2.get_blockchain_info().await.unwrap().blocks; + let diff = node1_height - node2_height; + self.common_rpc2 + .mine((diff + 1) as u16, &self.reserve_addr) + .await + .unwrap(); + self.common_rpc.add_node(&self.node2_addr).await.unwrap(); + } + + pub async fn restart_node(&mut self, additional_args: &[&str]) { + self.stop_node().await; + self.resume_node(additional_args).await; + } + + pub async fn resume_node(&mut self, additional_args: &[&str]) { + let datadir = format!("-datadir={}", self.ctx.wire_dir.to_string_lossy()); + let mut args = vec![datadir.as_str()]; + args.extend_from_slice(additional_args); + self.btc_node = cmd_redirect("bitcoind", &args, self.ctx.log("bitcoind")); + self.common_rpc = retry_opt! { Rpc::common(&self.worker_cfg.rpc_cfg).await }; + self.common_rpc.add_node(&self.node2_addr).await.unwrap(); + for name in ["client", "reserve", "wire"] { + self.common_rpc.load_wallet(name).await.ok(); + } + + self.reserve_rpc = Rpc::wallet(&self.worker_cfg.rpc_cfg, "reserve") + .await + .unwrap(); + self.client_rpc = Rpc::wallet(&self.worker_cfg.rpc_cfg, "client") + .await + .unwrap(); + self.wire_rpc = Rpc::wallet(&self.worker_cfg.rpc_cfg, "wire").await.unwrap(); + } + + /* ----- Transaction ------ */ + + pub async fn credit(&mut self, amount: Amount, metadata: &EddsaPublicKey) { + self.client_rpc + .send_segwit_key(&self.wire_addr, &amount, metadata) + .await + .unwrap(); + } + + pub async fn debit(&mut self, amount: Amount, metadata: &ShortHashCode) { + transfer( + &self.ctx.gateway_url, + metadata, + Payto::new(BtcWallet(self.client_addr.clone())) + .as_payto() + .as_full_payto("name"), + &btc_to_taler(&amount.to_signed().unwrap(), &self.worker_cfg.currency), + ) + .await + } + + pub async fn malformed_credit(&mut self, amount: &Amount) { + self.client_rpc + .send(&self.wire_addr, amount, None, false) + .await + .unwrap(); + } + + pub async fn reset_wallet(&mut self) { + let amount = self.wire_balance().await; + self.wire_rpc + .send(&self.client_addr, &amount, None, true) + .await + .unwrap(); + self.next_block().await; + } + + async fn abandon(rpc: &mut Rpc) { + let list = rpc.list_since_block(None, 1).await.unwrap(); + for tx in list.transactions { + if tx.category == Category::Send && tx.confirmations == 0 { + rpc.abandon_tx(&tx.txid).await.unwrap(); + } + } + } + + pub async fn abandon_wire(&mut self) { + Self::abandon(&mut self.wire_rpc).await; + } + + pub async fn abandon_client(&mut self) { + Self::abandon(&mut self.client_rpc).await; + } + + /* ----- Mining ----- */ + + async fn mine(&mut self, nb: u16) { + self.common_rpc.mine(nb, &self.reserve_addr).await.unwrap(); + } + + pub async fn next_conf(&mut self) { + self.mine(self.conf).await + } + + pub async fn next_block(&mut self) { + self.mine(1).await + } + + /* ----- Balances ----- */ + + pub async fn client_balance(&mut self) -> Amount { + self.client_rpc.get_balance().await.unwrap() + } + + pub async fn wire_balance(&mut self) -> Amount { + self.wire_rpc.get_balance().await.unwrap() + } + + pub async fn expect_client_balance(&mut self, balance: Amount, mine: bool) { + retry! {{ + let check = balance == self.client_balance().await; + if !check && mine { + self.next_block().await; + } + check + }} + } + + pub async fn expect_wire_balance(&mut self, balance: Amount, mine: bool) { + retry! {{ + let check = balance == self.wire_balance().await; + if !check && mine { + self.next_block().await; + } + check + }} + } + + /* ----- Wire Gateway ----- */ + + pub async fn expect_credits(&self, txs: &[(EddsaPublicKey, Amount)]) { + let txs: Vec<_> = txs + .iter() + .map(|(metadata, amount)| { + ( + metadata.clone(), + btc_to_taler(&amount.to_signed().unwrap(), &self.worker_cfg.currency), + ) + }) + .collect(); + self.ctx.expect_credits(&txs).await + } + + pub async fn expect_debits(&self, txs: &[(ShortHashCode, Amount)]) { + let txs: Vec<_> = txs + .iter() + .map(|(metadata, amount)| { + ( + metadata.clone(), + btc_to_taler(&amount.to_signed().unwrap(), &self.worker_cfg.currency), + ) + }) + .collect(); + self.ctx.expect_debits(&txs).await + } +} + +/// Test btc-wire correctly receive and send transactions on the blockchain +pub async fn wire(ctx: TestCtx) { + ctx.step("Setup"); + let mut ctx = BtcCtx::setup(&ctx, "taler_btc.conf", false).await; + + ctx.step("Credit"); + { + // Send transactions + let mut balance = ctx.wire_balance().await; + let mut txs = Vec::new(); + for n in 10..100 { + let metadata = Base32::rand(); + let amount = Amount::from_sat(n * 1000); + ctx.credit(amount, &metadata).await; + txs.push((metadata, amount)); + balance += amount; + ctx.next_block().await; + } + ctx.next_conf().await; + ctx.expect_credits(&txs).await; + ctx.expect_wire_balance(balance, false).await; + }; + + ctx.step("Debit"); + { + let mut balance = ctx.client_balance().await; + let mut txs = Vec::new(); + for n in 10..100 { + let metadata = Base32::rand(); + let amount = Amount::from_sat(n * 100); + balance += amount; + ctx.debit(amount, &metadata).await; + txs.push((metadata, amount)); + } + ctx.next_block().await; + ctx.expect_debits(&txs).await; + ctx.expect_client_balance(balance, true).await; + } + + ctx.step("Bounce"); + { + ctx.reset_wallet().await; + // Send bad transactions + let mut balance = ctx.wire_balance().await; + for n in 10..40 { + ctx.malformed_credit(&Amount::from_sat(n * 1000)).await; + balance += ctx.worker_cfg.bounce_fee; + } + ctx.next_conf().await; + ctx.expect_wire_balance(balance, true).await; + } +} + +/// Check btc-wire and wire-gateway correctly stop when a lifetime limit is configured +pub async fn lifetime(ctx: TestCtx) { + ctx.step("Setup"); + let mut ctx = BtcCtx::setup(&ctx, "taler_btc_lifetime.conf", false).await; + ctx.step("Check lifetime"); + // Start up + retry! { ctx.wire_running() && ctx.gateway_running() }; + // Consume wire lifetime + for _ in 0..=ctx.worker_cfg.lifetime.unwrap() + 2 { + ctx.credit(segwit_min_amount(), &Base32::rand()).await; + ctx.next_block().await; + tokio::time::sleep(Duration::from_millis(100)).await; + } + retry! { !ctx.wire_running() }; + // Consume gateway lifetime + for _ in 0..=ctx.serve_cfg.lifetime.unwrap() { + ctx.debit(segwit_min_amount(), &Base32::rand()).await; + ctx.next_block().await; + } + // End down + retry! { !ctx.wire_running() && !ctx.gateway_running() }; +} + +/// Check the capacity of wire-gateway and btc-wire to recover from database and node loss +pub async fn reconnect(ctx: TestCtx) { + ctx.step("Setup"); + let mut ctx = BtcCtx::setup(&ctx, "taler_btc.conf", false).await; + + let mut credits = Vec::new(); + let mut debits = Vec::new(); + + ctx.step("With DB"); + { + let metadata = Base32::rand(); + let amount = Amount::from_sat(42000); + ctx.credit(amount, &metadata).await; + credits.push((metadata, amount)); + ctx.next_block().await; + ctx.next_conf().await; + ctx.expect_credits(&credits).await; + }; + + ctx.step("Without DB"); + { + ctx.stop_db(); + ctx.malformed_credit(&Amount::from_sat(24000)).await; + let metadata = Base32::rand(); + let amount = Amount::from_sat(40000); + ctx.credit(amount, &metadata).await; + credits.push((metadata, amount)); + ctx.stop_node().await; + ctx.expect_gateway_down().await; + } + + ctx.step("Reconnect DB"); + { + ctx.resume_db(); + ctx.resume_node(&[]).await; + let metadata = Base32::rand(); + let amount = Amount::from_sat(2000); + ctx.debit(amount, &metadata).await; + debits.push((metadata, amount)); + ctx.next_conf().await; + ctx.expect_debits(&debits).await; + ctx.expect_credits(&credits).await; + } + + ctx.step("Recover DB"); + { + let balance = ctx.wire_balance().await; + ctx.reset_db(); + ctx.next_block().await; + ctx.expect_debits(&debits).await; + ctx.expect_credits(&credits).await; + ctx.expect_wire_balance(balance, true).await; + } +} + +/// Test btc-wire ability to recover from errors in correctness critical paths and prevent concurrent sending +pub async fn stress(ctx: TestCtx) { + ctx.step("Setup"); + let mut ctx = BtcCtx::setup(&ctx, "taler_btc.conf", true).await; + + let mut credits = Vec::new(); + let mut debits = Vec::new(); + + ctx.step("Credit"); + { + let mut balance = ctx.wire_balance().await; + for n in 10..30 { + let metadata = Base32::rand(); + let amount = Amount::from_sat(n * 1000); + ctx.credit(amount, &metadata).await; + credits.push((metadata, amount)); + balance += amount; + ctx.next_block().await; + } + ctx.next_conf().await; + ctx.expect_credits(&credits).await; + ctx.expect_wire_balance(balance, true).await; + }; + + ctx.step("Debit"); + { + let mut balance = ctx.client_balance().await; + for n in 10..30 { + let metadata = Base32::rand(); + let amount = Amount::from_sat(n * 100); + balance += amount; + ctx.debit(amount, &metadata).await; + debits.push((metadata, amount)); + } + ctx.next_block().await; + ctx.expect_debits(&debits).await; + ctx.expect_client_balance(balance, true).await; + } + + ctx.step("Bounce"); + { + ctx.reset_wallet().await; + let mut balance = ctx.wire_balance().await; + for n in 10..30 { + ctx.malformed_credit(&Amount::from_sat(n * 1000)).await; + balance += ctx.worker_cfg.bounce_fee; + } + ctx.next_conf().await; + ctx.expect_wire_balance(balance, true).await; + } + + ctx.step("Recover DB"); + { + let balance = ctx.wire_balance().await; + ctx.reset_db(); + ctx.next_block().await; + ctx.expect_debits(&debits).await; + ctx.expect_credits(&credits).await; + ctx.expect_wire_balance(balance, true).await; + } +} + +/// Test btc-wire ability to handle conflicting outgoing transactions +pub async fn conflict(tctx: TestCtx) { + tctx.step("Setup"); + let mut ctx = BtcCtx::setup(&tctx, "taler_btc.conf", false).await; + + ctx.step("Conflict send"); + { + // Perform credit + let amount = Amount::from_sat(4200000); + ctx.credit(amount, &Base32::rand()).await; + ctx.next_conf().await; + ctx.expect_wire_balance(amount, true).await; + let client = ctx.client_balance().await; + let wire = ctx.wire_balance().await; + + // Perform debit + ctx.debit(Amount::from_sat(400000), &Base32::rand()).await; + retry! { ctx.wire_balance().await < wire }; + + // Abandon pending transaction + ctx.restart_node(&["-minrelaytxfee=0.0001"]).await; + ctx.abandon_wire().await; + ctx.expect_client_balance(client, false).await; + ctx.expect_wire_balance(wire, false).await; + + // Generate conflict + ctx.debit(Amount::from_sat(500000), &Base32::rand()).await; + retry! { ctx.wire_balance().await < wire }; + + // Resend conflicting transaction + ctx.restart_node(&[]).await; + ctx.next_block().await; + let wire = ctx.wire_balance().await; + retry! { ctx.wire_balance().await < wire }; + } + + ctx.step("Setup"); + drop(ctx); + let mut ctx = BtcCtx::setup(&tctx, "taler_btc.conf", false).await; + ctx.credit(Amount::from_sat(3000000), &Base32::rand()).await; + ctx.next_block().await; + + ctx.step("Conflict bounce"); + { + // Perform bounce + let wire = ctx.wire_balance().await; + let bounce_amount = Amount::from_sat(4000000); + ctx.malformed_credit(&bounce_amount).await; + ctx.next_conf().await; + let fee = ctx.worker_cfg.bounce_fee; + ctx.expect_wire_balance(wire + fee, true).await; + + // Abandon pending transaction + ctx.restart_node(&["-minrelaytxfee=0.0001"]).await; + ctx.abandon_wire().await; + ctx.expect_wire_balance(wire + bounce_amount, false).await; + + // Generate conflict + let amount = Amount::from_sat(50000); + ctx.debit(amount, &Base32::rand()).await; + retry! { ctx.wire_balance().await < (wire + bounce_amount) }; + + // Resend conflicting transaction + ctx.restart_node(&[]).await; + let wire = ctx.wire_balance().await; + ctx.next_block().await; + retry! { ctx.wire_balance().await < wire }; + } +} + +/// Test btc-wire correctness when a blockchain reorganization occurs +pub async fn reorg(ctx: TestCtx) { + ctx.step("Setup"); + let mut ctx = BtcCtx::setup(&ctx, "taler_btc.conf", false).await; + + ctx.step("Handle reorg incoming transactions"); + { + // Loose second bitcoin node + ctx.cluster_deco().await; + + // Perform credits + let before = ctx.wire_balance().await; + for n in 10..21 { + ctx.credit(Amount::from_sat(n * 10000), &Base32::rand()) + .await; + ctx.next_block().await; + } + let after = ctx.wire_balance().await; + + // Perform fork and check btc-wire hard error + ctx.expect_gateway_up().await; + ctx.cluster_fork().await; + ctx.expect_wire_balance(before, false).await; + ctx.expect_gateway_down().await; + + // Recover orphaned transaction + ctx.mine(12).await; + ctx.expect_wire_balance(after, false).await; + ctx.expect_gateway_up().await; + } + + ctx.step("Handle reorg outgoing transactions"); + { + // Loose second bitcoin node + ctx.cluster_deco().await; + + // Perform debits + let before = ctx.client_balance().await; + let mut after = ctx.client_balance().await; + for n in 10..21 { + let amount = Amount::from_sat(n * 100); + ctx.debit(amount, &Base32::rand()).await; + after += amount; + } + ctx.next_block().await; + ctx.expect_client_balance(after, true).await; + + // Perform fork and check btc-wire still up + ctx.expect_gateway_up().await; + ctx.cluster_fork().await; + ctx.expect_client_balance(before, false).await; + ctx.expect_gateway_up().await; + + // Recover orphaned transaction + ctx.next_conf().await; + ctx.expect_client_balance(after, false).await; + } + + ctx.step("Handle reorg bounce"); + { + ctx.reset_wallet().await; + + // Loose second bitcoin node + ctx.cluster_deco().await; + + // Perform bounce + let before = ctx.wire_balance().await; + let mut after = ctx.wire_balance().await; + for n in 10..21 { + ctx.malformed_credit(&Amount::from_sat(n * 1000)).await; + after += ctx.worker_cfg.bounce_fee; + } + ctx.next_conf().await; + ctx.expect_wire_balance(after, true).await; + + // Perform fork and check btc-wire hard error + ctx.expect_gateway_up().await; + ctx.cluster_fork().await; + ctx.expect_wire_balance(before, false).await; + ctx.expect_gateway_down().await; + + // Recover orphaned transaction + ctx.mine(12).await; + ctx.expect_wire_balance(after, false).await; + ctx.expect_gateway_up().await; + } +} + +/// Test btc-wire correctness when a blockchain reorganization occurs leading to past incoming transaction conflict +pub async fn hell(tctx: TestCtx) { + macro_rules! step { + ($ctx:ident, $action:expr) => { + // Loose second bitcoin node + $ctx.cluster_deco().await; + + // Perform action + $action; + + // Perform fork and check btc-wire hard error + $ctx.expect_gateway_up().await; + $ctx.cluster_fork().await; + $ctx.expect_gateway_down().await; + + // Generate conflict + $ctx.restart_node(&["-minrelaytxfee=0.001"]).await; + $ctx.abandon_client().await; + let amount = Amount::from_sat(54000); + $ctx.credit(amount, &Base32::rand()).await; + $ctx.expect_wire_balance(amount, true).await; + + // Check btc-wire suspend operation + let bounce_amount = Amount::from_sat(34000); + $ctx.malformed_credit(&bounce_amount).await; + $ctx.next_conf().await; + $ctx.expect_wire_balance(amount + bounce_amount, true).await; + $ctx.expect_gateway_down().await; + }; + } + + tctx.step("Setup"); + let mut ctx = BtcCtx::setup(&tctx, "taler_btc.conf", false).await; + ctx.step("Handle reorg conflicting incoming credit"); + step!(ctx, { + let amount = Amount::from_sat(420000); + ctx.credit(amount, &Base32::rand()).await; + ctx.next_conf().await; + ctx.expect_wire_balance(amount, true).await; + }); + + drop(ctx); + tctx.step("Setup"); + let mut ctx = BtcCtx::setup(&tctx, "taler_btc.conf", false).await; + ctx.step("Handle reorg conflicting incoming credit"); + step!(ctx, { + let amount = Amount::from_sat(420000); + ctx.malformed_credit(&amount).await; + ctx.next_conf().await; + let fee = ctx.worker_cfg.bounce_fee; + ctx.expect_wire_balance(fee, true).await; + }); +} + +/// Test btc-wire ability to learn and protect itself from blockchain behavior +pub async fn analysis(ctx: TestCtx) { + ctx.step("Setup"); + let mut ctx = BtcCtx::setup(&ctx, "taler_btc.conf", false).await; + + ctx.step("Learn from reorg"); + + // Loose second bitcoin node + ctx.cluster_deco().await; + + // Perform credit + let before = ctx.wire_balance().await; + ctx.credit(Amount::from_sat(42000), &Base32::rand()).await; + ctx.next_conf().await; + let after = ctx.wire_balance().await; + + // Perform fork and check btc-wire hard error + ctx.expect_gateway_up().await; + ctx.cluster_fork().await; + ctx.expect_wire_balance(before, false).await; + ctx.expect_gateway_down().await; + + // Recover orphaned transaction + ctx.next_conf().await; + ctx.next_block().await; // Conf have changed + ctx.expect_wire_balance(after, false).await; + ctx.expect_gateway_up().await; + + // Loose second bitcoin node + ctx.cluster_deco().await; + + // Perform credit + let before = ctx.wire_balance().await; + ctx.credit(Amount::from_sat(42000), &Base32::rand()).await; + ctx.next_conf().await; + + // Perform fork and check btc-wire learned from previous attack + ctx.expect_gateway_up().await; + ctx.cluster_fork().await; + ctx.expect_wire_balance(before, false).await; + ctx.expect_gateway_up().await; +} + +/// Test btc-wire ability to handle stuck transaction correctly +pub async fn bumpfee(tctx: TestCtx) { + tctx.step("Setup"); + let mut ctx = BtcCtx::setup(&tctx, "taler_btc_bump.conf", false).await; + + // Perform credits to allow wire to perform debits latter + for n in 10..13 { + ctx.credit(Amount::from_sat(n * 100000), &Base32::rand()) + .await; + ctx.next_block().await; + } + ctx.next_conf().await; + + ctx.step("Bump fee"); + { + // Perform debit + let mut client = ctx.client_balance().await; + let wire = ctx.wire_balance().await; + let amount = Amount::from_sat(40000); + ctx.debit(amount, &Base32::rand()).await; + retry! { ctx.wire_balance().await < wire }; + + // Bump min relay fee making the previous debit stuck + ctx.restart_node(&["-minrelaytxfee=0.0001"]).await; + + // Check bump happen + client += amount; + ctx.expect_client_balance(client, true).await; + } + + ctx.step("Bump fee reorg"); + { + // Loose second bitcoin node + ctx.cluster_deco().await; + + // Perform debit + let mut client = ctx.client_balance().await; + let wire = ctx.wire_balance().await; + let amount = Amount::from_sat(40000); + ctx.debit(amount, &Base32::rand()).await; + retry! { ctx.wire_balance().await < wire }; + + // Bump min relay fee and fork making the previous debit stuck and problematic + ctx.cluster_fork().await; + ctx.restart_node(&["-minrelaytxfee=0.0001"]).await; + + // Check bump happen + client += amount; + ctx.expect_client_balance(client, true).await; + } + ctx.step("Setup"); + drop(ctx); + let mut ctx = BtcCtx::setup(&tctx, "taler_btc_bump.conf", true).await; + + // Perform credits to allow wire to perform debits latter + for n in 10..61 { + ctx.credit(Amount::from_sat(n * 100000), &Base32::rand()) + .await; + ctx.next_block().await; + } + ctx.next_conf().await; + + ctx.step("Bump fee stress"); + { + // Loose second bitcoin node + ctx.cluster_deco().await; + + // Perform debits + let client = ctx.client_balance().await; + let wire = ctx.wire_balance().await; + let mut total_amount = Amount::ZERO; + for n in 10..31 { + let amount = Amount::from_sat(n * 10000); + total_amount += amount; + ctx.debit(amount, &Base32::rand()).await; + } + retry! { ctx.wire_balance().await < wire - total_amount }; + + // Bump min relay fee making the previous debits stuck + ctx.restart_node(&["-minrelaytxfee=0.0001"]).await; + + // Check bump happen + ctx.expect_client_balance(client + total_amount, true).await; + } +} + +/// Test btc-wire handle transaction fees exceeding limits +pub async fn maxfee(ctx: TestCtx) { + ctx.step("Setup"); + let mut ctx = BtcCtx::setup(&ctx, "taler_btc.conf", false).await; + + // Perform credits to allow wire to perform debits latter + for n in 10..31 { + ctx.credit(Amount::from_sat(n * 100000), &Base32::rand()) + .await; + ctx.next_block().await; + } + ctx.next_conf().await; + + let client = ctx.client_balance().await; + let wire = ctx.wire_balance().await; + let mut total_amount = Amount::ZERO; + + ctx.step("Too high fee"); + { + // Change fee config + ctx.restart_node(&["-maxtxfee=0.0000001", "-minrelaytxfee=0.0000001"]) + .await; + + // Perform debits + for n in 10..31 { + let amount = Amount::from_sat(n * 10000); + total_amount += amount; + ctx.debit(amount, &Base32::rand()).await; + } + ctx.mine(2).await; + + // Check no transaction happen + ctx.expect_wire_balance(wire, false).await; + ctx.expect_client_balance(client, false).await; + } + + ctx.step("Good feed"); + { + // Restore default config + ctx.restart_node(&[]).await; + + // Check transaction now have been made + ctx.expect_client_balance(client + total_amount, true).await; + } +} + +/// Test btc-wire ability to configure itself from bitcoin configuration +pub async fn config(ctx: TestCtx) { + // Connect with cookie files + ctx.step("Cookie"); + BtcCtx::config( + &ctx, + |btc, dir| { + btc.with_section(None::<&str>).set( + "rpccookiefile", + dir.join("catch_me_if_you_can").to_string_lossy(), + ); + }, + |cfg, dir| { + cfg.with_section(Some("depolymerizer-bitcoin-worker")).set( + "RPC_COOKIE_FILE", + dir.join("catch_me_if_you_can").to_string_lossy(), + ); + }, + ) + .await; + + // Connect with password + ctx.step("Password"); + BtcCtx::config( + &ctx, + |btc, _| { + btc.with_section(None::<&str>) + .set("rpcuser", "bob") + .set("rpcpassword", "password"); + }, + |cfg, _| { + cfg.with_section(Some("depolymerizer-bitcoin-worker")) + .set("RPC_AUTH_METHOD", "basic") + .set("RPC_USERNAME", "bob") + .set("RPC_PASSWORD", "password"); + }, + ) + .await; + + // Connect with token + ctx.step("Token"); + BtcCtx::config( + &ctx, + |btc, _| { + btc.with_section(None::<&str>) + .set("rpcauth", "bob:9641cec731e1fad1ded02e1d31536e44$36b8b8af0a38104997a57f017805ff56bf8963ae4a2ed40252ca0e0e070fc19e"); + }, + |cfg, _| { + cfg.with_section(Some("depolymerizer-bitcoin-worker")) + .set("RPC_AUTH_METHOD", "basic") + .set("RPC_USERNAME", "bob") + .set("RPC_PASSWORD", "password"); + }, + ).await; +} diff --git a/testbench/src/main.rs b/testbench/src/main.rs @@ -0,0 +1,561 @@ +/* + This file is part of TALER + Copyright (C) 2022-2025 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use std::{ + borrow::Cow, + panic::UnwindSafe, + path::{Path, PathBuf}, + str::FromStr, + string::String, + sync::{Arc, Mutex}, + time::{Duration, Instant}, +}; + +use bitcoin::{Address, Txid, address::NetworkUnchecked}; +use clap::{Parser, ValueEnum}; +use common::url::Url; +use depolymerizer_bitcoin::{ + CONFIG_SOURCE, DB_SCHEMA, + cli::{Command, run}, + config::{WalletCfg, WorkerCfg, parse_db_cfg}, + db, + payto::BtcWallet, + rpc::{Error, ErrorCode, Rpc, rpc_common, rpc_wallet}, + taler_utils::taler_to_btc, +}; +use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; +use owo_colors::OwoColorize; +use reedline::{Prompt, Reedline, Signal}; +use sqlx::PgPool; +use taler_common::{ + api_common::{EddsaPublicKey, HashCode, ShortHashCode}, + api_wire::TransferRequest, + config::Config, + db::pool, + log::taler_logger, + types::{amount::amount, payto::FullPayto}, +}; +use tokio::task::{JoinError, JoinHandle}; +use tracing::{error, info}; +use tracing_subscriber::util::SubscriberInitExt as _; + +use crate::utils::{ChildGuard, LocalDb, TestCtx, cmd_redirect, patch_config, try_cmd_redirect}; + +mod btc; +mod utils; + +/// Depolymerizer instrumentation test +#[derive(clap::Parser, Debug)] +enum Testbench { + /// Start instrumentation tests for offline testing + Instrumentation { + /// With tests to run + #[clap(global = true, default_value = "")] + filters: Vec<String>, + }, + /// Start bitcoin testbench for online testing + Bitcoin { network: Network }, +} + +struct Tmp<'a> { + root: &'a Path, + filters: &'a [String], + m: MultiProgress, + start_style: ProgressStyle, + ok_style: ProgressStyle, + err_style: ProgressStyle, + tasks: Vec<( + &'static str, + JoinHandle<(Result<(), JoinError>, Duration, String)>, + )>, + db: Arc<LocalDb>, + start: Instant, +} + +impl<'a> Tmp<'a> { + async fn check<T, F>(&mut self, name: &'static str, task: T) + where + T: FnOnce(TestCtx) -> F + Send + UnwindSafe + 'static, + F: Future<Output = ()> + Send + 'static, + { + if self.filters.is_empty() || self.filters.iter().any(|f| name.starts_with(f)) { + let pb = self.m.add(ProgressBar::new_spinner()); + pb.set_style(self.start_style.clone()); + pb.set_prefix(name); + pb.set_message("Init"); + pb.enable_steady_tick(Duration::from_millis(1000)); + let ok_style = self.ok_style.clone(); + let err_style = self.err_style.clone(); + let start = self.start; + let db = self.db.clone(); + let ctx: TestCtx = TestCtx::new(self.root, name, pb.clone(), db); + self.tasks.push(( + name, + tokio::spawn(async move { + let result = tokio::spawn(task(ctx)).await; + if result.is_ok() { + pb.set_style(ok_style.clone()); + pb.finish_with_message("OK"); + } else { + pb.set_style(err_style.clone()); + pb.finish(); + } + (result, start.elapsed(), pb.message()) + }), + )); + } + } +} + +#[tokio::main] +pub async fn main() { + taler_logger(None).init(); + match Testbench::parse() { + Testbench::Instrumentation { filters } => instrumentation(filters).await, + Testbench::Bitcoin { network } => bitcoin(network).await, + } +} + +#[derive(Debug, Clone, ValueEnum, PartialEq, Eq)] +enum Network { + Local, + Test, +} +struct BtcEnv { + network: Network, + db: LocalDb, + bitcoind: ChildGuard, + wire_rpc: Rpc, + client_rpc: Rpc, + cfg: Config, + wire_addr: Address, + client_addr: Address, + tracked: Vec<Txid>, + pool: PgPool, +} + +impl BtcEnv { + pub async fn init(network: Network) -> Self { + let network_dir = match network { + Network::Local => "btc-local", + Network::Test => "btc-test", + }; + println!("Setup bitcoin {network:?} env in {network_dir}"); + let root_dir = PathBuf::from_str("testbench/env") + .unwrap() + .join(network_dir); + std::fs::create_dir_all(root_dir.join("bitcoin")).unwrap(); + let root_dir = root_dir.canonicalize().unwrap(); + + // Generate bitcoind config + let cfg = match network { + Network::Local => { + " + chain=regtest + txindex=1 + rpcservertimeout=0 + fallbackfee=0.00000001 + + [regtest] + rpcport=18345 + " + } + Network::Test => { + " + chain=signet + txindex=1 + rpcservertimeout=0 + fallbackfee=0.000001 + [signet] + rpcport=18345 + " + } + }; + std::fs::write(root_dir.join("bitcoin/bitcoin.conf"), cfg).unwrap(); + let datadir = match network { + Network::Local => root_dir.join("bitcoin").join("regtest"), + Network::Test => root_dir.join("bitcoin").join("signet"), + }; + + // Start database + let db = LocalDb::new(&root_dir); + + // Generate Taler config + let taler_cfg_path = root_dir.join("taler.conf"); + patch_config( + "testbench/conf/taler_btc_dev.conf", + &taler_cfg_path, + |ini| { + ini.with_section(Some("depolymerizer-bitcoin-worker")) + .set("RPC_COOKIE_FILE", datadir.to_string_lossy()); + ini.with_section(Some("depolymerizer-bitcoindb-postgres")) + .set("CONFIG", db.postgres_uri("depolymerizer_bitcoin")); + }, + ); + + // Start node + let bitcoind = cmd_redirect( + "bitcoind", + &[&format!( + "-datadir={}", + root_dir.join("bitcoin").to_string_lossy() + )], + root_dir.join("bitcoind.log"), + ); + let cfg = Config::from_file(CONFIG_SOURCE, Some(&taler_cfg_path)).unwrap(); + let worker_cfg = WorkerCfg::parse(&cfg).unwrap(); + let mut rpc = retry_opt! { rpc_common(&worker_cfg.rpc_cfg).await }; + + for wallet in ["wire", "client"] { + loop { + let res = rpc.load_wallet(wallet).await; + if let Err(Error::RPC { code, msg: _ }) = res { + match code { + ErrorCode::RpcInWarmup => continue, + ErrorCode::RpcWalletNotFound => { + rpc.create_wallet(wallet, "").await.unwrap(); + break; + } + _ => { + res.unwrap(); + } + } + } else { + break; + } + } + } + drop(rpc); + let mut wire_rpc = rpc_wallet( + &worker_cfg.rpc_cfg, + &WalletCfg { + name: "wire".to_string(), + password: None, + }, + ) + .await + .unwrap(); + let mut client_rpc = rpc_wallet( + &worker_cfg.rpc_cfg, + &WalletCfg { + name: "client".to_string(), + password: None, + }, + ) + .await + .unwrap(); + let wire_addr = wire_rpc.gen_addr().await.unwrap(); + let client_addr = client_rpc.gen_addr().await.unwrap(); + patch_config(&taler_cfg_path, &taler_cfg_path, |ini| { + ini.with_section(Some("depolymerizer-bitcoin")) + .set("WALLET", wire_addr.to_string()); + }); + let cfg = Config::from_file(CONFIG_SOURCE, Some(taler_cfg_path)).unwrap(); + + // Wait for db to start + db.wait_running(); + db.create_db("depolymerizer_bitcoin"); + + // Dbinit & setup + run(Command::Dbinit { reset: false }, &cfg).await.unwrap(); + run(Command::Setup { reset: false }, &cfg).await.unwrap(); + + let db_cg = parse_db_cfg(&cfg).unwrap(); + Self { + pool: pool(db_cg.cfg, DB_SCHEMA).await.unwrap(), + wire_addr, + client_addr, + network, + db, + bitcoind, + cfg, + wire_rpc, + client_rpc, + tracked: Vec::new(), + } + } +} + +#[derive(clap::Parser, Debug)] +enum Shell { + Setup, + Reset, + ResetDb, + Sync, + Credit, + Debit, + Mine { + amount: Option<u16>, + addr: Option<Address<NetworkUnchecked>>, + }, + Exit, + Tx { + txid: Txid, + }, + Track { + txid: Txid, + }, + Untrack { + txid: Txid, + }, +} + +async fn run_cmd(env: &mut BtcEnv, buffer: &str) -> anyhow::Result<bool> { + if buffer.is_empty() { + return Ok(false); + } + let cmd = Shell::try_parse_from(std::iter::once("shell").chain(buffer.split(' ')))?; + match cmd { + Shell::Setup => run(Command::Setup { reset: false }, &env.cfg).await?, + Shell::Reset => run(Command::Setup { reset: true }, &env.cfg).await?, + Shell::ResetDb => { + run(Command::Dbinit { reset: true }, &env.cfg).await?; + run(Command::Setup { reset: false }, &env.cfg).await?; + } + Shell::Sync => run(Command::Worker { transient: true }, &env.cfg).await?, + Shell::Credit => { + let reserve_pub = EddsaPublicKey::rand(); + let amount = amount("DEVBTC:0.00011"); + let txid = env + .client_rpc + .send_segwit_key(&env.wire_addr, &taler_to_btc(&amount), &reserve_pub) + .await?; + env.tracked.push(txid); + info!(target: "testbench", "Credit {reserve_pub} {amount} {txid} to {}", env.wire_addr); + } + Shell::Debit => { + let creditor = FullPayto::new(BtcWallet(env.client_addr.clone()), "client".to_string()); + let wtid = ShortHashCode::rand(); + let amount = amount("DEVBTC:0.0001"); + let transfer = TransferRequest { + request_uid: HashCode::rand(), + amount: amount.clone(), + exchange_base_url: Url::parse("https://test.com/").unwrap(), + wtid: wtid.clone(), + credit_account: creditor.as_payto(), + }; + db::transfer(&env.pool, &creditor, &transfer).await?; + info!(target: "testbench", "Debit {wtid} {amount} to {}", env.wire_addr); + } + Shell::Mine { amount, addr } => { + let amount = amount.unwrap_or(1); + let addr = addr.map(|a| a.assume_checked()); + let addr = addr.as_ref().unwrap_or(&env.client_addr); + env.client_rpc.mine(amount, addr).await?; + } + Shell::Exit => return Ok(true), + Shell::Tx { txid } => { + let info = env.client_rpc.get_tx(&txid).await?; + info!(target: "testbench", "{txid} {} {}", info.amount, info.confirmations); + } + Shell::Track { txid } => { + env.tracked.push(txid); + } + Shell::Untrack { txid } => { + env.tracked.retain(|id| *id != txid); + } + } + Ok(false) +} + +struct TestBenchPrompt { + name: String, + progress: String, +} + +impl Prompt for TestBenchPrompt { + fn render_prompt_left(&self) -> Cow<str> { + Cow::Borrowed(&self.name) + } + + fn render_prompt_right(&self) -> Cow<str> { + Cow::Borrowed(&self.progress) + } + + fn render_prompt_indicator(&self, _: reedline::PromptEditMode) -> Cow<str> { + Cow::Borrowed(">") + } + + fn render_prompt_multiline_indicator(&self) -> Cow<str> { + Cow::Borrowed(":") + } + + fn render_prompt_history_search_indicator(&self, _: reedline::PromptHistorySearch) -> Cow<str> { + Cow::Borrowed(">") + } +} + +async fn bitcoin(network: Network) { + let mut env = BtcEnv::init(network).await; + + let mut line_editor = Reedline::create(); + loop { + let info = env.client_rpc.get_blockchain_info().await.unwrap(); + let wire_balance = env.wire_rpc.get_balance().await.unwrap(); + println!("wire {} {wire_balance}", env.wire_addr); + let client_balance = env.client_rpc.get_balance().await.unwrap(); + println!("client {} {client_balance}", env.client_addr); + for txid in &env.tracked { + match env.client_rpc.get_tx(txid).await { + Ok(info) => println!( + "{} {txid} {} {}", + "tx".cyan(), + info.amount, + info.confirmations + ), + Err(e) => println!("{} {txid} {}", "tx".cyan(), e.red()), + } + } + let prompt = TestBenchPrompt { + name: info.chain, + progress: format!("{:.6}", info.verification_progress), + }; + let sig = line_editor.read_line(&prompt).unwrap(); + match sig { + Signal::Success(buffer) => match run_cmd(&mut env, &buffer).await { + Ok(exit) => { + if exit { + break; + } + } + Err(e) => error!(target: "testbench", "{e}"), + }, + Signal::CtrlC | Signal::CtrlD => break, + } + } +} + +pub async fn instrumentation(filters: Vec<String>) { + let root = PathBuf::from_str("testbench/instrumentation").unwrap(); + std::fs::remove_dir_all(&root).ok(); + std::fs::create_dir_all(root.join("bin")).unwrap(); + let root = root.canonicalize().unwrap(); + + // Set panic hook + let failures = Arc::new(Mutex::new(Vec::new())); + { + let failures = failures.clone(); + std::panic::set_hook(Box::new(move |e| { + let backtrace = std::backtrace::Backtrace::force_capture(); + let info = format!("{e}\n{backtrace}"); + if let Some(id) = tokio::task::try_id() { + failures.lock().unwrap().push((id, info)); + } else { + eprintln!("Failed outside of a task:\n{info}") + } + })); + } + + // Build binaries + let p = ProgressBar::new_spinner(); + p.set_style(ProgressStyle::with_template("building {msg} {elapsed:.dim}").unwrap()); + p.enable_steady_tick(Duration::from_millis(1000)); + for name in ["depolymerizer-bitcoin"] { + build_bin(&root, &p, name, None, name); + build_bin(&root, &p, name, Some("fail"), &format!("{name}-fail")); + } + p.finish_and_clear(); + + // Run tests + let m = MultiProgress::new(); + let start_style = + ProgressStyle::with_template("{prefix:.magenta} {msg} {elapsed:.dim}").unwrap(); + let ok_style = + ProgressStyle::with_template("{prefix:.magenta} {msg:.green} {elapsed:.dim}").unwrap(); + let err_style = + ProgressStyle::with_template("{prefix:.magenta} {msg:.red} {elapsed:.dim}").unwrap(); + + let start = Instant::now(); + let db = Arc::new(LocalDb::new(&root)); + let mut tmp = Tmp { + root: &root, + filters: filters.as_slice(), + m, + start_style, + ok_style, + err_style, + tasks: Vec::new(), + db, + start, + }; + tmp.check("btc_wire", btc::wire).await; + tmp.check("btc_lifetime", btc::lifetime).await; + tmp.check("btc_reconnect", btc::reconnect).await; + tmp.check("btc_stress", btc::stress).await; + tmp.check("btc_conflict", btc::conflict).await; + tmp.check("btc_reorg", btc::reorg).await; + tmp.check("btc_hell", btc::hell).await; + tmp.check("btc_analysis", btc::analysis).await; + tmp.check("btc_bumpfee", btc::bumpfee).await; + tmp.check("btc_maxfee", btc::maxfee).await; + tmp.check("btc_config", btc::config).await; + let mut results = Vec::new(); + for (name, task) in tmp.tasks { + results.push((name, task.await.unwrap())); + } + + let len = results.len(); + + tmp.m.clear().unwrap(); + let failures = failures.lock().unwrap(); + for (name, (result, _, msg)) in &results { + if let Err(e) = result { + if let Some((_, err)) = failures.iter().find(|(id, _)| *id == e.id()) { + println!("{} {}\n{}", name.magenta(), msg.red(), err.bright_black()); + } + } + } + for (name, (result, time, msg)) in results { + match result { + Ok(_) => { + println!( + "{} {} {}", + name.magenta(), + "OK".green(), + format_args!("{}s", time.as_secs()).bright_black() + ); + } + Err(_) => { + println!( + "{} {} {}", + name.magenta(), + msg.red(), + format_args!("{}s", time.as_secs()).bright_black() + ); + } + } + } + println!("{} tests in {}s", len, start.elapsed().as_secs()); +} + +pub fn build_bin(root: &Path, p: &ProgressBar, name: &str, features: Option<&str>, bin_name: &str) { + p.set_message(bin_name.to_string()); + let mut args = vec!["build", "--bin", name, "--release"]; + if let Some(features) = features { + args.extend_from_slice(&["--features", features]); + } + let result = try_cmd_redirect("cargo", &args, root.join("bin/build")) + .unwrap() + .0 + .wait() + .unwrap(); + assert!(result.success()); + std::fs::rename( + format!("target/release/{name}"), + root.join("bin").join(bin_name), + ) + .unwrap(); +} diff --git a/testbench/src/utils.rs b/testbench/src/utils.rs @@ -0,0 +1,560 @@ +/* + This file is part of TALER + Copyright (C) 2022-2025 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use std::{ + fmt::Display, + io::Write as _, + net::{Ipv4Addr, SocketAddrV4, TcpListener}, + ops::{Deref, DerefMut}, + path::{Path, PathBuf}, + process::{Child, Command, Stdio}, + str::FromStr, + sync::Arc, + time::Duration, +}; + +use common::{ + taler_common::{ + api_common::{EddsaPublicKey, ShortHashCode}, + api_wire::{IncomingBankTransaction, IncomingHistory, OutgoingHistory, TransferRequest}, + types::{amount::Amount, base32::Base32, payto::PaytoURI}, + }, + url::Url, +}; +use indicatif::ProgressBar; +use ini::Ini; + +const LOG: &str = "INFO"; + +#[must_use] +pub async fn check_incoming(base_url: &str, txs: &[(EddsaPublicKey, Amount)]) -> bool { + let mut res = ureq::get(&format!("{base_url}history/incoming")) + .query("delta", format!("-{}", 100)) + .call() + .unwrap(); + if txs.is_empty() { + res.status() == 204 + } else { + if res.status() != 200 { + return false; + } + let history: IncomingHistory = res.body_mut().read_json().unwrap(); + + history.incoming_transactions.len() == txs.len() + && txs.iter().all(|(reserve_pub_key, taler_amount)| { + history.incoming_transactions.iter().any(|h| { + matches!( + h, + IncomingBankTransaction::Reserve { + reserve_pub, + amount, + .. + } if reserve_pub == reserve_pub_key && amount == taler_amount + ) + }) + }) + } +} + +#[must_use] +pub async fn check_gateway_down(base_url: &str) -> bool { + matches!( + ureq::get(&format!("{base_url}history/incoming")) + .query("delta", "-5") + .call(), + Err(ureq::Error::StatusCode(504 | 502)) + ) +} + +#[must_use] +pub async fn check_gateway_up(base_url: &str) -> bool { + ureq::get(&format!("{base_url}config")).call().is_ok() +} + +pub async fn transfer(base_url: &str, wtid: &[u8; 32], credit_account: PaytoURI, amount: &Amount) { + loop { + let res = ureq::post(&format!("{base_url}transfer")).send_json(TransferRequest { + request_uid: Base32::rand(), + amount: amount.clone(), + exchange_base_url: Url::parse("https://exchange.test/").unwrap(), + wtid: Base32::from(*wtid), + credit_account: credit_account.clone(), + }); + if !matches!(res, Err(ureq::Error::StatusCode(502))) { + res.unwrap(); + break; + } + } +} + +#[must_use] +pub async fn check_outgoing(base_url: &str, txs: &[(ShortHashCode, Amount)]) -> bool { + let mut res = ureq::get(format!("{base_url}history/outgoing")) + .query("delta", format!("-{}", txs.len())) + .call() + .unwrap(); + if txs.is_empty() { + res.status() == 204 + } else { + if res.status() != 200 { + return false; + } + let history: OutgoingHistory = res.body_mut().read_json().unwrap(); + + history.outgoing_transactions.len() == txs.len() + && txs.iter().all(|(wtid, amount)| { + history + .outgoing_transactions + .iter() + .any(|h| h.wtid == *wtid && &h.amount == amount) + }) + } +} +pub struct ChildGuard(pub Child); + +impl Drop for ChildGuard { + fn drop(&mut self) { + self.0.kill().ok(); + } +} + +#[track_caller] +pub fn try_cmd_redirect( + cmd: &str, + args: &[&str], + path: impl AsRef<Path>, +) -> std::io::Result<ChildGuard> { + let log_file = std::fs::OpenOptions::new() + .create(true) + .append(true) + .open(path)?; + + let child = Command::new(cmd) + .args(args) + .stderr(log_file.try_clone()?) + .stdout(log_file) + .stdin(Stdio::null()) + .spawn()?; + Ok(ChildGuard(child)) +} + +#[track_caller] +pub fn cmd_redirect(cmd: &str, args: &[&str], path: impl AsRef<Path>) -> ChildGuard { + try_cmd_redirect(cmd, args, path).unwrap() +} + +#[track_caller] +pub fn cmd_ok(mut child: ChildGuard, name: &str) { + let result = child.0.wait().unwrap(); + if !result.success() { + panic!("cmd {name} failed"); + } +} + +#[track_caller] +pub fn cmd_redirect_ok(cmd: &str, args: &[&str], path: impl AsRef<Path>, name: &str) { + cmd_ok(cmd_redirect(cmd, args, path), name) +} + +#[macro_export] +macro_rules! retry_opt { + ($expr:expr) => { + async { + let start = std::time::Instant::now(); + loop { + let result = $expr; + if result.is_err() && start.elapsed() < std::time::Duration::from_secs(30) { + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + } else { + return result.unwrap(); + } + } + } + .await + }; +} + +#[macro_export] +macro_rules! retry { + ($expr:expr) => { + $crate::retry_opt! { + $expr.then_some(()).ok_or("failure") + } + }; +} +#[derive(Clone)] +pub struct TestCtx { + pub name: String, + pub root: PathBuf, + pub dir: PathBuf, + pub pb: ProgressBar, + pub db: Arc<LocalDb>, +} + +impl TestCtx { + pub fn new( + root: impl Into<PathBuf>, + name: impl Into<String>, + pb: ProgressBar, + db: Arc<LocalDb>, + ) -> Self { + let root = root.into(); + let name = name.into(); + // Create log dir + let dir = root.join(&name); + std::fs::create_dir_all(&dir).unwrap(); + + Self { + name, + dir, + pb, + db, + root, + } + } + + pub fn log(&self, name: &str) -> PathBuf { + self.dir.join(format!("{name}.log")) + } + + pub fn step(&self, disp: impl Display) { + self.pb.set_message(format!("{disp}")) + } + + /* ----- Database ----- */ + + pub fn stop_db(&mut self) { + self.db.stop_db(&self.name); + } + + pub fn resume_db(&mut self) { + self.db.resume_db(&self.name); + } +} + +pub struct TalerCtx { + pub wire_dir: PathBuf, + pub wire2_dir: PathBuf, + pub conf: PathBuf, + ctx: TestCtx, + pub wire_bin_path: String, + stressed: bool, + gateway: Option<ChildGuard>, + pub gateway_url: String, + wire: Option<ChildGuard>, + wire2: Option<ChildGuard>, + pub gateway_port: u16, +} + +impl TalerCtx { + pub fn new(ctx: &TestCtx, wire_name: impl Into<String>, config: &str, stressed: bool) -> Self { + // Create temporary dir + let dir = ctx.dir.clone(); + let conf = dir.join("taler.conf"); + + // Create common dirs + let wire_dir = dir.join("wire"); + let wire2_dir = dir.join("wire2"); + for dir in [&wire_dir, &wire2_dir] { + std::fs::create_dir_all(dir).unwrap(); + } + + // Find unused port + let gateway_port = unused_port(); + let gateway_url = format!("http://localhost:{gateway_port}/taler-wire-gateway/"); + + // Generate taler config from base + let wire_name = wire_name.into(); + let config = PathBuf::from_str("testbench/conf").unwrap().join(config); + let mut cfg = ini::Ini::load_from_file(config).unwrap(); + cfg.with_section(Some("exchange-accountcredentials-admin")) + .set("WIRE_GATEWAY_URL", &gateway_url); + cfg.with_section(Some(format!("{wire_name}db-postgres"))) + .set("CONFIG", ctx.db.postgres_uri(&ctx.name)); + cfg.with_section(Some(format!("{wire_name}-httpd"))) + .set("PORT", gateway_port.to_string()); + + cfg.write_to_file(&conf).unwrap(); + + Self { + ctx: ctx.clone(), + gateway_url, + wire_dir, + wire2_dir, + conf, + wire_bin_path: if stressed { + ctx.root.join(format!("bin/{wire_name}-fail")) + } else { + ctx.root.join(format!("bin/{wire_name}")) + } + .to_string_lossy() + .to_string(), + stressed, + gateway: None, + wire: None, + wire2: None, + gateway_port, + } + } + + pub fn dbinit(&self) { + self.db.wait_running(); + self.db.create_db(&self.ctx.name); + // Init db + cmd_redirect_ok( + &self.wire_bin_path, + &["-c", self.conf.to_string_lossy().as_ref(), "dbinit"], + self.log("cmd"), + "wire dbinit", + ); + } + + pub fn reset_db(&self) { + // Reset db + cmd_redirect_ok( + &self.wire_bin_path, + &["-c", self.conf.to_string_lossy().as_ref(), "dbinit", "-r"], + self.log("cmd"), + "wire dbinit reset", + ); + } + + pub fn setup(&self) { + // Init db + cmd_redirect_ok( + &self.wire_bin_path, + &["-c", self.conf.to_string_lossy().as_ref(), "setup"], + self.log("cmd"), + "wire setup", + ); + } + + pub async fn run(&mut self) { + // Run gateway + self.gateway = Some(cmd_redirect( + &self.wire_bin_path, + &[ + "-c", + self.conf.to_string_lossy().as_ref(), + "-L", + LOG, + "serve", + ], + self.log("gateway"), + )); + + // Start wires + self.wire = Some(cmd_redirect( + &self.wire_bin_path, + &[ + "-c", + self.conf.to_string_lossy().as_ref(), + "-L", + LOG, + "worker", + ], + self.log("worker"), + )); + self.wire2 = self.stressed.then(|| { + cmd_redirect( + &self.wire_bin_path, + &[ + "-c", + self.conf.to_string_lossy().as_ref(), + "-L", + LOG, + "worker", + ], + self.log("worker+"), + ) + }); + + // Wait for gateway to be up + retry_opt! { + tokio::net::TcpStream::connect(SocketAddrV4::new(Ipv4Addr::LOCALHOST, self.gateway_port)).await + }; + } + + /* ----- Process ----- */ + + #[must_use] + pub fn wire_running(&mut self) -> bool { + self.wire.as_mut().unwrap().0.try_wait().unwrap().is_none() + } + + #[must_use] + pub fn gateway_running(&mut self) -> bool { + self.gateway + .as_mut() + .unwrap() + .0 + .try_wait() + .unwrap() + .is_none() + } + + /* ----- Wire Gateway -----*/ + + pub async fn expect_credits(&self, txs: &[(EddsaPublicKey, Amount)]) { + retry! { check_incoming(&self.gateway_url, txs).await } + } + + pub async fn expect_debits(&self, txs: &[(ShortHashCode, Amount)]) { + retry! { check_outgoing(&self.gateway_url, txs).await } + } + + pub async fn expect_gateway_up(&self) { + retry! { check_gateway_up(&self.gateway_url).await } + } + + pub async fn expect_gateway_down(&self) { + retry! { check_gateway_down(&self.gateway_url).await } + } +} + +impl Deref for TalerCtx { + type Target = TestCtx; + + fn deref(&self) -> &Self::Target { + &self.ctx + } +} + +impl DerefMut for TalerCtx { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.ctx + } +} + +pub fn unused_port() -> u16 { + TcpListener::bind(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0)) + .unwrap() + .local_addr() + .unwrap() + .port() +} + +pub struct LocalDb { + cluster_dir: String, + cmd_log: String, + cluster: ChildGuard, +} + +impl LocalDb { + pub fn new(root: &Path) -> Self { + let cluster_dir = root.join("db"); + let cluster_log = root.join("postgres.log").to_string_lossy().to_string(); + let cmd_log = root.join("postgres-cmd.log").to_string_lossy().to_string(); + if !std::fs::exists(&cluster_dir).unwrap() { + // Init databases files + cmd_redirect_ok( + "initdb", + &[cluster_dir.to_string_lossy().as_ref()], + &cluster_log, + "init_db", + ); + } + // Generate database config + std::fs::write( + cluster_dir.join("postgresql.conf"), + format!( + " + listen_addresses='' + unix_socket_directories='{}' + fsync=off + synchronous_commit=off + full_page_writes=off + ", + cluster_dir.to_string_lossy().as_ref() + ), + ) + .unwrap(); + let cluster = cmd_redirect( + "postgres", + &["-D", cluster_dir.to_string_lossy().as_ref()], + &cmd_log, + ); + Self { + cluster_dir: cluster_dir.to_string_lossy().to_string(), + cmd_log, + cluster, + } + } + + pub fn postgres_uri(&self, database: &str) -> String { + format!("postgres:///{database}?host={}", self.cluster_dir) + } + + pub fn execute_sql(&self, sql: &str) -> bool { + let mut psql = ChildGuard( + Command::new("psql") + .arg(self.postgres_uri("postgres")) + .stderr(Stdio::null()) + .stdout( + std::fs::File::options() + .append(true) + .create(true) + .open(&self.cmd_log) + .unwrap(), + ) + .stdin(Stdio::piped()) + .spawn() + .unwrap(), + ); + psql.0 + .stdin + .as_mut() + .unwrap() + .write_all(sql.as_bytes()) + .unwrap(); + psql.0.wait().unwrap().success() + } + + pub fn create_db(&self, name: &str) { + self.execute_sql(&format!("CREATE DATABASE {name};")); + } + + pub fn stop_db(&self, name: &str) { + self.execute_sql(&format!( + " + UPDATE pg_database SET datallowconn=false WHERE datname='{name}'; + SELECT pg_terminate_backend(pid) + FROM pg_stat_activity + WHERE datname='{name}' AND pid <> pg_backend_pid(); + " + )); + } + pub fn resume_db(&self, name: &str) { + self.execute_sql(&format!( + "UPDATE pg_database SET datallowconn=true WHERE datname='{name}';" + )); + } + + pub fn wait_running(&self) { + for _ in 0..10 { + if self.execute_sql("SELECT true") { + break; + } + std::thread::sleep(Duration::from_millis(500)) + } + } +} + +pub fn patch_config(from: impl AsRef<Path>, to: impl AsRef<Path>, patch: impl FnOnce(&mut Ini)) { + let mut cfg = ini::Ini::load_from_file(from).unwrap(); + patch(&mut cfg); + cfg.write_to_file(to).unwrap(); +}