depolymerization

wire gateway for Bitcoin/Ethereum
Log | Files | Refs | Submodules | README | LICENSE

commit e83e3ac4f0994c4e052b2578ffa6d04b11d4c722
parent 050c5fac288d15a6bc8a123cf085fcc833d2e1e4
Author: Antoine A <>
Date:   Wed,  9 Jul 2025 14:02:35 +0200

common: async everywhere

Diffstat:
MCargo.lock | 154++-----------------------------------------------------------------------------
MCargo.toml | 13++++---------
Mcommon/Cargo.toml | 12++----------
Mcommon/src/lib.rs | 25+++++--------------------
Dcommon/src/log.rs | 40----------------------------------------
Dcommon/src/reconnect.rs | 32--------------------------------
Dcommon/src/sql.rs | 78------------------------------------------------------------------------------
Mdepolymerizer-bitcoin/Cargo.toml | 1+
Mdepolymerizer-bitcoin/benches/metadata.rs | 8+++++---
Mdepolymerizer-bitcoin/src/api.rs | 4++--
Mdepolymerizer-bitcoin/src/config.rs | 4++--
Mdepolymerizer-bitcoin/src/fail_point.rs | 2+-
Mdepolymerizer-bitcoin/src/lib.rs | 20++++++++++----------
Mdepolymerizer-bitcoin/src/loops.rs | 5++---
Mdepolymerizer-bitcoin/src/loops/analysis.rs | 5+++--
Mdepolymerizer-bitcoin/src/loops/watcher.rs | 28++++++++++++++--------------
Mdepolymerizer-bitcoin/src/loops/worker.rs | 403++++++++++++++++++++++++++++++++++++++++---------------------------------------
Mdepolymerizer-bitcoin/src/main.rs | 26+++++++++++---------------
Mdepolymerizer-bitcoin/src/rpc.rs | 127+++++++++++++++++++++++++++++++++++++++++++------------------------------------
Mdepolymerizer-bitcoin/src/rpc_utils.rs | 4++--
Mdepolymerizer-bitcoin/src/segwit.rs | 11+++++------
Mdepolymerizer-bitcoin/src/sql.rs | 33++++++++++++---------------------
Minstrumentation/Cargo.toml | 5+----
Minstrumentation/src/btc.rs | 726++++++++++++++++++++++++++++++++++++++++---------------------------------------
Minstrumentation/src/main.rs | 173++++++++++++++++++++++++++++++++++++++++++++++++-------------------------------
Minstrumentation/src/utils.rs | 102+++++++++++++++++++++++++++++++++++++++----------------------------------------
Muri-pack/Cargo.toml | 2--
27 files changed, 883 insertions(+), 1160 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -101,17 +101,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c02d123df017efcdfbd739ef81735b36c5ba83ec3c59c80a9d7ecc718f92e50" [[package]] -name = "async-trait" -version = "0.1.88" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e539d3fca749fcee5236ab05e93a52867dd549cc157c8cb7f99595f3cedffdb5" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] name = "atoi" version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -411,15 +400,9 @@ checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" name = "common" version = "0.1.0" dependencies = [ - "bitcoin", - "const-hex", - "postgres", "rand 0.9.1", - "sqlx", - "taler-api", "taler-common", "thiserror", - "tracing", "uri-pack", "url", ] @@ -739,6 +722,7 @@ dependencies = [ "common", "const-hex", "criterion", + "rand 0.9.1", "serde", "serde_json", "serde_repr", @@ -903,12 +887,6 @@ dependencies = [ ] [[package]] -name = "fallible-iterator" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" - -[[package]] name = "fastrand" version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1007,17 +985,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] -name = "futures-macro" -version = "0.3.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - -[[package]] name = "futures-sink" version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1037,7 +1004,6 @@ checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-core", "futures-io", - "futures-macro", "futures-sink", "futures-task", "memchr", @@ -1407,14 +1373,12 @@ dependencies = [ "clap", "common", "depolymerizer-bitcoin", - "fastrand", "indicatif", "owo-colors", "rust-ini", - "taler-api", "taler-common", "tempfile", - "thread-local-panic-hook", + "tokio", "ureq", ] @@ -1793,24 +1757,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3148f5046208a5d56bcfc03053e3ca6334e51da8dfb19b6cdc8b306fae3283e" [[package]] -name = "phf" -version = "0.11.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1fd6780a80ae0c52cc120a26a1a42c1ae51b247a253e4e06113d23d2c2edd078" -dependencies = [ - "phf_shared", -] - -[[package]] -name = "phf_shared" -version = "0.11.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "67eabc2ef2a60eb7faa00097bd1ffdb5bd28e62bf39990626a582201b7a754e5" -dependencies = [ - "siphasher", -] - -[[package]] name = "pin-project-lite" version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1893,49 +1839,6 @@ dependencies = [ ] [[package]] -name = "postgres" -version = "0.19.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "363e6dfbdd780d3aa3597b6eb430db76bb315fa9bad7fae595bb8def808b8470" -dependencies = [ - "bytes", - "fallible-iterator", - "futures-util", - "log", - "tokio", - "tokio-postgres", -] - -[[package]] -name = "postgres-protocol" -version = "0.6.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76ff0abab4a9b844b93ef7b81f1efc0a366062aaef2cd702c76256b5dc075c54" -dependencies = [ - "base64", - "byteorder", - "bytes", - "fallible-iterator", - "hmac", - "md-5", - "memchr", - "rand 0.9.1", - "sha2", - "stringprep", -] - -[[package]] -name = "postgres-types" -version = "0.2.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "613283563cd90e1dfc3518d548caee47e0e725455ed619881f5cf21f36de4b48" -dependencies = [ - "bytes", - "fallible-iterator", - "postgres-protocol", -] - -[[package]] name = "potential_utf" version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2461,12 +2364,6 @@ dependencies = [ ] [[package]] -name = "siphasher" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" - -[[package]] name = "slab" version = "0.4.10" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2861,12 +2758,6 @@ dependencies = [ ] [[package]] -name = "thread-local-panic-hook" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e70399498abd3ec85f99a2f2d765c8638588e20361678af93a9f47de96719743" - -[[package]] name = "thread_local" version = "1.1.9" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2981,32 +2872,6 @@ dependencies = [ ] [[package]] -name = "tokio-postgres" -version = "0.7.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c95d533c83082bb6490e0189acaa0bbeef9084e60471b696ca6988cd0541fb0" -dependencies = [ - "async-trait", - "byteorder", - "bytes", - "fallible-iterator", - "futures-channel", - "futures-util", - "log", - "parking_lot", - "percent-encoding", - "phf", - "pin-project-lite", - "postgres-protocol", - "postgres-types", - "rand 0.9.1", - "socket2", - "tokio", - "tokio-util", - "whoami", -] - -[[package]] name = "tokio-stream" version = "0.1.17" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3018,19 +2883,6 @@ dependencies = [ ] [[package]] -name = "tokio-util" -version = "0.7.15" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "66a539a9ad6d5d281510d5bd368c973d636c02dbf8a67300bfb6b950696ad7df" -dependencies = [ - "bytes", - "futures-core", - "futures-sink", - "pin-project-lite", - "tokio", -] - -[[package]] name = "tower" version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3217,7 +3069,6 @@ dependencies = [ "quickcheck_macros", "serde_json", "thiserror", - "url", ] [[package]] @@ -3413,7 +3264,6 @@ checksum = "6994d13118ab492c3c80c1f81928718159254c53c472bf9ce36f8dae4add02a7" dependencies = [ "redox_syscall", "wasite", - "web-sys", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml @@ -1,11 +1,6 @@ [workspace] resolver = "3" -members = [ - "depolymerizer-bitcoin", - "uri-pack", - "common", - "instrumentation", -] +members = ["depolymerizer-bitcoin", "uri-pack", "common", "instrumentation"] [workspace.package] edition = "2024" @@ -41,6 +36,6 @@ hex = { package = "const-hex", version = "1.9.1" } clap = { version = "4.5", features = ["derive"] } anyhow = "1" tracing = "0.1" -tracing-subscriber = "0.3" criterion = "0.6" -base64 = "0.22.1" -\ No newline at end of file +base64 = "0.22.1" +rand = { version = "0.9.0" } +\ No newline at end of file diff --git a/common/Cargo.toml b/common/Cargo.toml @@ -12,15 +12,7 @@ license-file.workspace = true url.workspace = true # Error macros thiserror.workspace = true -# Postgres client -postgres = "0.19.7" -# Secure random -rand = { version = "0.9.0" } # Optimized uri binary format uri-pack = { path = "../uri-pack" } taler-common.workspace = true -taler-api.workspace = true -sqlx.workspace = true -bitcoin.workspace = true -hex.workspace = true -tracing.workspace = true -\ No newline at end of file +rand.workspace = true +\ No newline at end of file diff --git a/common/src/lib.rs b/common/src/lib.rs @@ -13,19 +13,12 @@ You should have received a copy of the GNU Affero General Public License along with TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use std::thread::JoinHandle; - -use rand::{RngCore, rngs::ThreadRng}; - -pub use postgres; -pub use rand; +use rand::{RngCore as _, rngs::ThreadRng}; pub use taler_common; +use taler_common::ExpoBackoffDecorr; pub use url; -pub mod log; pub mod metadata; -pub mod reconnect; -pub mod sql; pub mod status; /// Secure random slice generator using getrandom @@ -35,15 +28,7 @@ pub fn rand_slice<const N: usize>() -> [u8; N] { slice } -/// Spawned a named thread -pub fn named_spawn<F, T>(name: impl Into<String>, f: F) -> JoinHandle<T> -where - F: FnOnce() -> T, - F: Send + 'static, - T: Send + 'static, -{ - std::thread::Builder::new() - .name(name.into()) - .spawn(f) - .unwrap() +/// Default jitter config +pub fn jitter() -> ExpoBackoffDecorr { + ExpoBackoffDecorr::new(200, 15 * 1000, 2.0) } diff --git a/common/src/log.rs b/common/src/log.rs @@ -1,40 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2022-2025 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ -use std::{fmt::Display, process::exit}; - -use tracing::error; - -pub trait OrFail<T, E> { - fn or_fail<F: FnOnce(E) -> String>(self, lambda: F) -> T; -} - -impl<T, E> OrFail<T, E> for Result<T, E> { - fn or_fail<F: FnOnce(E) -> String>(self, lambda: F) -> T { - self.unwrap_or_else(|e| fail(lambda(e))) - } -} - -impl<T> OrFail<T, ()> for Option<T> { - fn or_fail<F: FnOnce(()) -> String>(self, lambda: F) -> T { - self.unwrap_or_else(|| fail(lambda(()))) - } -} - -/// Log error message then exit -pub fn fail(msg: impl Display) -> ! { - error!("{msg}"); - exit(1); -} diff --git a/common/src/reconnect.rs b/common/src/reconnect.rs @@ -1,32 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2022-2025 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ -use postgres::NoTls; -use taler_common::ExpoBackoffDecorr; - -pub fn client_jitter() -> ExpoBackoffDecorr { - ExpoBackoffDecorr::new(200, 15 * 1000, 2.0) -} - -pub fn connect_db( - cfg: &postgres::Config, - schema: &str, -) -> Result<postgres::Client, postgres::Error> { - let mut client = cfg.connect(NoTls)?; - client.batch_execute(&format!( - "SET search_path TO {schema};SET default_transaction_isolation = 'serializable';" - ))?; - Ok(client) -} diff --git a/common/src/sql.rs b/common/src/sql.rs @@ -1,78 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2022-2025 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ - -use std::str::FromStr; - -use postgres::Row; -use taler_common::{ - api_common::SafeU64, - types::{ - amount::{Amount, Currency}, - base32::Base32, - payto::PaytoURI, - }, -}; -use url::Url; - -use crate::log::OrFail; - -/// URL from sql -pub fn sql_url(row: &Row, idx: usize) -> Url { - let str: &str = row.get(idx); - Url::from_str(str).or_fail(|_| format!("Database invariant: expected an url got {str}")) -} - -/// Payto from sql -pub fn sql_payto(row: &Row, idx: usize) -> PaytoURI { - let str: &str = row.get(idx); - PaytoURI::from_str(str).or_fail(|_| format!("Database invariant: expected a payto got {str}")) -} - -/// Ethereum amount from sql -pub fn sql_amount(row: &Row, idx: usize, currency: &Currency) -> Amount { - // TODO use decimal instead - let val: i64 = row.get(idx); - let frac: i32 = row.get(idx + 1); - Amount::new(currency, val as u64, frac as u32) -} - -/// Byte array from sql -pub fn sql_array<const N: usize>(row: &Row, idx: usize) -> [u8; N] { - let slice: &[u8] = row.get(idx); - slice.try_into().or_fail(|_| { - format!( - "Database invariant: expected an byte array of {N}B for {}B", - slice.len() - ) - }) -} - -/// Base32 from sql -pub fn sql_base_32<const N: usize>(row: &Row, idx: usize) -> Base32<N> { - let slice: &[u8] = row.get(idx); - slice.try_into().or_fail(|_| { - format!( - "Database invariant: expected a base32 byte array of {N}B for {}B", - slice.len() - ) - }) -} - -/// Safe safe u64 from sql -pub fn sql_safe_u64(row: &Row, idx: usize) -> SafeU64 { - let id: i64 = row.get(idx); - SafeU64::try_from(id as u64).unwrap() -} diff --git a/depolymerizer-bitcoin/Cargo.toml b/depolymerizer-bitcoin/Cargo.toml @@ -36,6 +36,7 @@ tokio.workspace = true tracing.workspace = true axum.workspace = true base64.workspace = true +rand.workspace = true [dev-dependencies] criterion.workspace = true diff --git a/depolymerizer-bitcoin/benches/metadata.rs b/depolymerizer-bitcoin/benches/metadata.rs @@ -1,3 +1,4 @@ +use bech32::Hrp; /* This file is part of TALER Copyright (C) 2022-2025 Taler Systems SA @@ -13,22 +14,23 @@ You should have received a copy of the GNU Affero General Public License along with TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use btc_wire::segwit::{decode_segwit_msg, encode_segwit_key, rand_addresses}; use common::rand_slice; use criterion::{Criterion, criterion_group, criterion_main}; +use depolymerizer_bitcoin::segwit::{decode_segwit_msg, encode_segwit_key, rand_addresses}; fn criterion_benchmark(c: &mut Criterion) { let mut group = c.benchmark_group("SegWit addresses"); + let hrp = Hrp::parse("bench").unwrap(); group.bench_function("encode", |b| { b.iter_batched( || rand_slice(), - |key| encode_segwit_key("bench", &key), + |key| encode_segwit_key(hrp, &key), criterion::BatchSize::SmallInput, ); }); group.bench_function("decode", |b| { b.iter_batched( - || rand_addresses("bench", &rand_slice()), + || rand_addresses(hrp, &rand_slice()), |addrs| decode_segwit_msg(&addrs), criterion::BatchSize::SmallInput, ); diff --git a/depolymerizer-bitcoin/src/api.rs b/depolymerizer-bitcoin/src/api.rs @@ -29,7 +29,7 @@ use axum::{ response::{IntoResponse as _, Response}, }; use bitcoin::address::NetworkUnchecked; -use common::reconnect::client_jitter; +use common::jitter; use sqlx::{ PgPool, QueryBuilder, Row, postgres::{PgListener, PgRow}, @@ -384,7 +384,7 @@ pub async fn status_middleware( /// Listen to backend status change async fn status_watcher(state: Arc<ServerState>) { - let mut jitter = client_jitter(); + let mut jitter = jitter(); async fn inner( state: &ServerState, jitter: &mut ExpoBackoffDecorr, diff --git a/depolymerizer-bitcoin/src/config.rs b/depolymerizer-bitcoin/src/config.rs @@ -17,7 +17,7 @@ use std::net::SocketAddr; use bitcoin::Amount; -use common::postgres; +use sqlx::postgres::PgConnectOptions; use taler_api::{ Serve, config::{ApiCfg, DbCfg}, @@ -94,7 +94,7 @@ pub struct WorkerCfg { pub bounce_fee: Amount, pub lifetime: Option<u32>, pub bump_delay: Option<u32>, - pub db_config: postgres::Config, + pub db_config: PgConnectOptions, pub currency: Currency, pub rpc_cfg: RpcCfg, pub wallet_cfg: WalletCfg, diff --git a/depolymerizer-bitcoin/src/fail_point.rs b/depolymerizer-bitcoin/src/fail_point.rs @@ -21,7 +21,7 @@ pub struct Injected(&'static str); #[allow(unused_variables)] pub fn fail_point(msg: &'static str, prob: f32) -> Result<(), Injected> { #[cfg(feature = "fail")] - return if common::rand::random::<f32>() < prob { + return if rand::random::<f32>() < prob { Err(Injected(msg)) } else { Ok(()) diff --git a/depolymerizer-bitcoin/src/lib.rs b/depolymerizer-bitcoin/src/lib.rs @@ -51,7 +51,7 @@ pub enum GetOpReturnErr { /// An extended bitcoincore JSON-RPC api client who can send and retrieve metadata with their transaction impl Rpc { /// Send a transaction with a 32B key as metadata encoded using fake segwit addresses - pub fn send_segwit_key( + pub async fn send_segwit_key( &mut self, to: &Address, amount: &Amount, @@ -72,15 +72,15 @@ impl Rpc { let mut recipients = vec![(to, amount)]; let min = segwit_min_amount(); recipients.extend(addresses.iter().map(|addr| (addr, &min))); - self.send_many(recipients) + self.send_many(recipients).await } /// Get detailed information about an in-wallet transaction and it's 32B metadata key encoded using fake segwit addresses - pub fn get_tx_segwit_key( + pub async fn get_tx_segwit_key( &mut self, id: &Txid, ) -> Result<(Transaction, EddsaPublicKey), GetSegwitErr> { - let full = self.get_tx(id)?; + let full = self.get_tx(id).await?; let addresses: Vec<String> = full .decoded @@ -100,11 +100,11 @@ impl Rpc { } /// Get detailed information about an in-wallet transaction and its op_return metadata - pub fn get_tx_op_return( + pub async fn get_tx_op_return( &mut self, id: &Txid, ) -> Result<(Transaction, Vec<u8>), GetOpReturnErr> { - let full = self.get_tx(id)?; + let full = self.get_tx(id).await?; let op_return_out = full .decoded @@ -125,21 +125,21 @@ impl Rpc { /// There is no reliable way to bounce a transaction as you cannot know if the addresses /// used are shared or come from a third-party service. We only send back to the first input /// address as a best-effort gesture. - pub fn bounce( + pub async fn bounce( &mut self, id: &Txid, bounce_fee: &Amount, metadata: Option<&[u8]>, ) -> Result<Txid, rpc::Error> { - let full = self.get_tx(id)?; + let full = self.get_tx(id).await?; let detail = &full.details[0]; assert!(detail.category == Category::Receive); let amount = detail.amount.to_unsigned().unwrap(); - let sender = sender_address(self, &full)?; + let sender = sender_address(self, &full).await?; let bounce_amount = Amount::from_sat(amount.to_sat().saturating_sub(bounce_fee.to_sat())); // Send refund making recipient pay the transaction fees - self.send(&sender, &bounce_amount, metadata, true) + self.send(&sender, &bounce_amount, metadata, true).await } } diff --git a/depolymerizer-bitcoin/src/loops.rs b/depolymerizer-bitcoin/src/loops.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2022 Taler Systems SA + Copyright (C) 2022-2025 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -14,7 +14,6 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use common::postgres; use depolymerizer_bitcoin::rpc; use crate::fail_point::Injected; @@ -28,7 +27,7 @@ pub enum LoopError { #[error(transparent)] Rpc(#[from] rpc::Error), #[error("DB {0}")] - DB(#[from] postgres::Error), + DB(#[from] sqlx::Error), #[error("Another btc-wire process is running concurrently")] Concurrency, #[error(transparent)] diff --git a/depolymerizer-bitcoin/src/loops/analysis.rs b/depolymerizer-bitcoin/src/loops/analysis.rs @@ -19,10 +19,11 @@ use tracing::warn; use super::LoopResult; /// Analyse blockchain behavior and return the new confirmation delay -pub fn analysis(rpc: &mut Rpc, current: u32, max: u32) -> LoopResult<u32> { +pub async fn analysis(rpc: &mut Rpc, current: u32, max: u32) -> LoopResult<u32> { // Get biggest known valid fork let fork = rpc - .get_chain_tips()? + .get_chain_tips() + .await? .into_iter() .filter_map(|t| (t.status == ChainTipsStatus::ValidFork).then_some(t.length)) .max() diff --git a/depolymerizer-bitcoin/src/loops/watcher.rs b/depolymerizer-bitcoin/src/loops/watcher.rs @@ -13,32 +13,32 @@ You should have received a copy of the GNU Affero General Public License along with TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use common::{ - postgres, - reconnect::{client_jitter, connect_db}, -}; -use depolymerizer_bitcoin::{DB_SCHEMA, config::RpcCfg, rpc::rpc_common}; + +use common::jitter; +use depolymerizer_bitcoin::{config::RpcCfg, rpc::rpc_common}; +use sqlx::PgPool; use std::time::Duration; +use tokio::time::sleep; use tracing::error; use super::LoopResult; /// Wait for new block and notify arrival with postgreSQL notifications -pub fn watcher(rpc_cfg: &RpcCfg, db_cfg: &postgres::Config) { - let mut jitter = client_jitter(); +pub async fn watcher(rpc_cfg: RpcCfg, pool: PgPool) { + let mut jitter = jitter(); loop { - let result: LoopResult<()> = (|| { - let mut rpc = rpc_common(rpc_cfg)?; - let mut db = connect_db(db_cfg, DB_SCHEMA)?; + let result: LoopResult<()> = async { + let mut rpc = rpc_common(&rpc_cfg).await?; loop { - db.execute("NOTIFY new_block", &[])?; - rpc.wait_for_new_block()?; + sqlx::query("NOTIFY new_block").execute(&pool).await?; + rpc.wait_for_new_block().await?; jitter.reset(); } - })(); + } + .await; if let Err(e) = result { error!("watcher: {e}"); - std::thread::sleep(Duration::from_millis(jitter.next() as u64)); + sleep(Duration::from_millis(jitter.next() as u64)).await; } } } diff --git a/depolymerizer-bitcoin/src/loops/worker.rs b/depolymerizer-bitcoin/src/loops/worker.rs @@ -21,21 +21,20 @@ use std::{ use bitcoin::{Amount as BtcAmount, BlockHash, Txid, hashes::Hash}; use common::{ - log::OrFail, + jitter, metadata::OutMetadata, - postgres, - reconnect::{client_jitter, connect_db}, - sql::{sql_base_32, sql_url}, status::{BounceStatus, DebitStatus}, taler_common::{api_common::ShortHashCode, types::timestamp::Timestamp}, }; use depolymerizer_bitcoin::{ - DB_SCHEMA, GetOpReturnErr, GetSegwitErr, + GetOpReturnErr, GetSegwitErr, rpc::{self, Category, ErrorCode, Rpc, Transaction, rpc_wallet}, rpc_utils::sender_address, taler_utils::btc_to_taler, }; -use postgres::{Client, fallible_iterator::FallibleIterator}; +use sqlx::{PgPool, Row, postgres::PgListener}; +use taler_api::db::{BindHelper as _, TypeHelper}; +use tokio::time::sleep; use tracing::{error, info, warn}; use crate::{ @@ -47,17 +46,17 @@ use crate::{ use super::{LoopError, LoopResult, analysis::analysis}; /// Synchronize local db with blockchain and perform transactions -pub fn worker(mut state: WorkerCfg) { - let mut jitter = client_jitter(); +pub async fn worker(mut state: WorkerCfg, pool: PgPool) { + let mut jitter = jitter(); let mut lifetime = state.lifetime; let mut status = true; let mut skip_notification = true; loop { - let result: LoopResult<()> = (|| { + let result: LoopResult<()> = async { // Connect - let rpc = &mut rpc_wallet(&state.rpc_cfg, &state.wallet_cfg)?; - let db = &mut connect_db(&state.db_config, DB_SCHEMA)?; + let rpc = &mut rpc_wallet(&state.rpc_cfg, &state.wallet_cfg).await?; + let db = &mut PgListener::connect_with(&pool).await?; // It is not possible to atomically update the blockchain and the database. // When we failed to sync the database and the blockchain state we rely on @@ -68,25 +67,27 @@ pub fn worker(mut state: WorkerCfg) { // on postgres advisory lock // Take the lock - let row = db.query_one("SELECT pg_try_advisory_lock(42)", &[])?; - let locked: bool = row.get(0); + let row = sqlx::query("SELECT pg_try_advisory_lock(42)") + .fetch_one(&mut *db) + .await?; + let locked: bool = row.try_get(0)?; if !locked { return Err(LoopError::Concurrency); } + // Listen to all channels + db.listen_all(["new_block", "new_tx"]).await?; + loop { - // Listen to all channels - db.batch_execute("LISTEN new_block; LISTEN new_tx")?; // Wait for the next notification { - let mut ntf = db.notifications(); - if !skip_notification && ntf.is_empty() { + let ntf = db.next_buffered(); + if !skip_notification && ntf.is_none() { // Block until next notification - ntf.blocking_iter().next()?; + db.try_recv().await?; } // Conflate all notifications - let mut iter = ntf.iter(); - while iter.next()?.is_some() {} + while db.next_buffered().is_some() {} } // Check lifetime @@ -100,38 +101,39 @@ pub fn worker(mut state: WorkerCfg) { } // Perform analysis - state.confirmation = analysis(rpc, state.confirmation, state.max_confirmation)?; + state.confirmation = + analysis(rpc, state.confirmation, state.max_confirmation).await?; // Sync chain - if let Some(stuck) = sync_chain(rpc, db, &state, &mut status)? { + if let Some(stuck) = sync_chain(rpc, db, &state, &mut status).await? { // As we are now in sync with the blockchain if a transaction has Requested status it have not been sent // Send requested debits - while debit(db, rpc, &state)? {} + while debit(db, rpc, &state).await? {} // Bump stuck transactions for id in stuck { - let bump = rpc.bump_fee(&id)?; + let bump = rpc.bump_fee(&id).await?; fail_point("(injected) fail bump", 0.3)?; - let row = db.query_one( - "UPDATE tx_out SET txid=$1 WHERE txid=$2 RETURNING wtid", - &[ - &bump.txid.as_byte_array().as_slice(), - &id.as_byte_array().as_slice(), - ], - )?; - let wtid: ShortHashCode = sql_base_32(&row, 0); + let row = + sqlx::query("UPDATE tx_out SET txid=$1 WHERE txid=$2 RETURNING wtid") + .bind(bump.txid.as_byte_array()) + .bind(id.as_byte_array()) + .fetch_one(&mut *db) + .await?; + let wtid: ShortHashCode = row.try_get_base32(0)?; info!(">> (bump) {wtid} replace {id} with {}", bump.txid); } // Send requested bounce - while bounce(db, rpc, &state.bounce_fee)? {} + while bounce(db, rpc, &state.bounce_fee).await? {} } skip_notification = false; jitter.reset(); } - })(); + } + .await; if let Err(e) = result { error!("worker: {e}"); // When we catch an error, we sometimes want to retry immediately (eg. reconnect to RPC or DB). @@ -144,7 +146,7 @@ pub fn worker(mut state: WorkerCfg) { | LoopError::DB(_) | LoopError::Injected(_) ); - std::thread::sleep(Duration::from_millis(jitter.next() as u64)); + sleep(Duration::from_millis(jitter.next() as u64)).await; } else { return; } @@ -152,20 +154,22 @@ pub fn worker(mut state: WorkerCfg) { } /// Retrieve last stored hash -fn last_hash(db: &mut Client) -> Result<BlockHash, postgres::Error> { - let row = db.query_one("SELECT value FROM state WHERE name='last_hash'", &[])?; +async fn last_hash(db: &mut PgListener) -> sqlx::Result<BlockHash> { + let row = sqlx::query("SELECT value FROM state WHERE name='last_hash'") + .fetch_one(db) + .await?; Ok(BlockHash::from_slice(row.get(0)).unwrap()) } /// Parse new transactions, return stuck transactions if the database is up to date with the latest mined block -fn sync_chain( +async fn sync_chain( rpc: &mut Rpc, - db: &mut Client, + db: &mut PgListener, state: &WorkerCfg, status: &mut bool, ) -> LoopResult<Option<Vec<Txid>>> { // Get stored last_hash - let last_hash = last_hash(db)?; + let last_hash = last_hash(db).await?; // Get the current confirmation delay let conf_delay = state.confirmation; @@ -176,7 +180,7 @@ fn sync_chain( BlockHash, ) = { // Get all transactions made since this block - let list = rpc.list_since_block(Some(&last_hash), conf_delay)?; + let list = rpc.list_since_block(Some(&last_hash), conf_delay).await?; // Only keep ids and category let txs = list .transactions @@ -192,17 +196,15 @@ fn sync_chain( }; // Check if a confirmed incoming transaction have been removed by a blockchain reorganization - let new_status = sync_chain_removed(&txs, &removed, rpc, db, conf_delay as i32)?; + let new_status = sync_chain_removed(&txs, &removed, rpc, db, conf_delay as i32).await?; // Sync status with database if *status != new_status { - let mut tx = db.transaction()?; - tx.execute( - "UPDATE state SET value=$1 WHERE name='status'", - &[&[new_status as u8].as_slice()], - )?; - tx.execute("NOTIFY status", &[])?; - tx.commit()?; + sqlx::query("UPDATE state SET value=$1 WHERE name='status'") + .bind([new_status as u8].as_slice()) + .execute(&mut *db) + .await?; + sqlx::query("NOTIFY status").execute(&mut *db).await?; *status = new_status; if new_status { info!("Recovered lost transactions"); @@ -217,12 +219,12 @@ fn sync_chain( for (id, (category, confirmations)) in txs { match category { Category::Send => { - if sync_chain_outgoing(&id, confirmations, rpc, db, state)? { + if sync_chain_outgoing(&id, confirmations, rpc, db, state).await? { stuck.push(id); } } Category::Receive if confirmations >= conf_delay as i32 => { - sync_chain_incoming_confirmed(&id, rpc, db, state)? + sync_chain_incoming_confirmed(&id, rpc, db, state).await? } _ => { // Ignore coinbase and unconfirmed send transactions @@ -231,23 +233,21 @@ fn sync_chain( } // Move last_hash forward - db.execute( - "UPDATE state SET value=$1 WHERE name='last_hash' AND value=$2", - &[ - &lastblock.as_byte_array().as_slice(), - &last_hash.as_byte_array().as_slice(), - ], - )?; + sqlx::query("UPDATE state SET value=$1 WHERE name='last_hash' AND value=$2") + .bind(lastblock.as_byte_array()) + .bind(last_hash.as_byte_array().as_slice()) + .execute(db) + .await?; Ok(Some(stuck)) } /// Sync database with removed transactions, return false if bitcoin backing is compromised -fn sync_chain_removed( +async fn sync_chain_removed( txs: &HashMap<Txid, (Category, i32)>, removed: &HashMap<Txid, (Category, i32)>, rpc: &mut Rpc, - db: &mut Client, + db: &mut PgListener, min_confirmations: i32, ) -> LoopResult<bool> { // A removed incoming transaction is a correctness issues in only two cases: @@ -268,34 +268,35 @@ fn sync_chain_removed( .map(|(_, confirmations)| *confirmations < min_confirmations) .unwrap_or(true) }) { - match rpc.get_tx_segwit_key(id) { + match rpc.get_tx_segwit_key(id).await { Ok((full, key)) => { // Credits are only problematic if not reconfirmed and stored in the database - if db - .query_opt( - "SELECT 1 FROM tx_in WHERE reserve_pub=$1", - &[&key.as_slice()], - )? + if sqlx::query("SELECT 1 FROM tx_in WHERE reserve_pub=$1") + .bind(key.as_slice()) + .fetch_optional(&mut *db) + .await? .is_some() { - let debit_addr = sender_address(rpc, &full)?; + let debit_addr = sender_address(rpc, &full).await?; blocking_debit.push((key, id, debit_addr)); } } Err(err) => match err { GetSegwitErr::Decode(_) => { // Invalid tx are only problematic if already bounced - if let Some(row) = db.query_opt( - "SELECT txid FROM bounce WHERE bounced=$1 AND txid IS NOT NULL", - &[&id.as_byte_array().as_slice()], - )? { - blocking_bounce.push((sql_txid(&row, 0), id)); + if let Some(row) = + sqlx::query("SELECT txid FROM bounce WHERE bounced=$1 AND txid IS NOT NULL") + .bind(id.as_byte_array()) + .fetch_optional(&mut *db) + .await? + { + blocking_bounce.push((sql_txid(&row, 0)?, id)); } else { // Remove transaction from bounce table - db.execute( - "DELETE FROM bounce WHERE bounced=$1", - &[&id.as_byte_array().as_slice()], - )?; + sqlx::query("DELETE FROM bounce WHERE bounced=$1") + .bind(id.as_byte_array()) + .execute(&mut *db) + .await?; } } GetSegwitErr::RPC(it) => return Err(it.into()), @@ -319,34 +320,32 @@ fn sync_chain_removed( } /// Sync database with an incoming confirmed transaction -fn sync_chain_incoming_confirmed( +async fn sync_chain_incoming_confirmed( id: &Txid, rpc: &mut Rpc, - db: &mut Client, + db: &mut PgListener, state: &WorkerCfg, ) -> Result<(), LoopError> { - match rpc.get_tx_segwit_key(id) { + match rpc.get_tx_segwit_key(id).await { Ok((full, reserve_pub)) => { // Store transactions in database - let debit_addr = sender_address(rpc, &full)?; + let debit_addr = sender_address(rpc, &full).await?; let credit_addr = full.details[0].address.clone().unwrap().assume_checked(); let amount = btc_to_taler(&full.amount, &state.currency); - let update = db.query_opt("INSERT INTO tx_in (received, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, ($2, $3)::taler_amount, $4, $5, $6) ON CONFLICT (reserve_pub) DO NOTHING RETURNING id", &[ - &((full.time * 1000000) as i64), &(amount.val as i64), &(amount.frac as i32), &reserve_pub.as_slice(), &debit_addr.to_string(), &credit_addr.to_string() - ])?; - if let Some(row) = update { + if let Some(row) = sqlx::query("INSERT INTO tx_in (received, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, ($2, $3)::taler_amount, $4, $5, $6) ON CONFLICT (reserve_pub) DO NOTHING RETURNING id") + .bind((full.time * 1000000) as i64).bind_amount(&amount).bind(reserve_pub.as_slice()).bind(debit_addr.to_string()).bind(credit_addr.to_string()).fetch_optional(&mut *db).await? { info!("<< {amount} {reserve_pub} in {id} from {debit_addr}"); let id: i64 = row.try_get(0)?; - db.execute("SELECT pg_notify('taler_in', $1)", &[&id.to_string()])?; + sqlx::query("SELECT pg_notify('taler_in', $1)").bind(id.to_string()).execute(db).await?; } } Err(err) => match err { GetSegwitErr::Decode(_) => { // If encoding is wrong request a bounce - db.execute( - "INSERT INTO bounce (created, bounced) VALUES ($1, $2) ON CONFLICT (bounced) DO NOTHING", - &[&Timestamp::now().as_sql_micros(), &id.as_byte_array().as_slice()], - )?; + sqlx::query( + "INSERT INTO bounce (created, bounced) VALUES ($1, $2) ON CONFLICT (bounced) DO NOTHING") + .bind_timestamp(&Timestamp::now()).bind(id.as_byte_array()) + .execute(db).await?; } GetSegwitErr::RPC(e) => return Err(e.into()), }, @@ -355,12 +354,12 @@ fn sync_chain_incoming_confirmed( } /// Sync database with a debit transaction, return true if stuck -fn sync_chain_debit( +async fn sync_chain_debit( id: &Txid, full: &Transaction, wtid: &ShortHashCode, rpc: &mut Rpc, - db: &mut Client, + db: &mut PgListener, confirmations: i32, state: &WorkerCfg, ) -> LoopResult<bool> { @@ -370,53 +369,52 @@ fn sync_chain_debit( if confirmations < 0 { if full.replaced_by_txid.is_none() { // Handle conflicting tx - let nb_row = db.execute( - "UPDATE tx_out SET status=$1, txid=NULL where txid=$2", - &[ - &(DebitStatus::Requested as i16), - &id.as_byte_array().as_slice(), - ], - )?; + let nb_row = sqlx::query("UPDATE tx_out SET status=$1, txid=NULL where txid=$2") + .bind(DebitStatus::Requested as i16) + .bind(id.as_byte_array()) + .execute(db) + .await? + .rows_affected(); if nb_row > 0 { warn!(">> (conflict) {wtid} in {id} to {credit_addr}"); } } } else { // Get previous out tx - let row = db.query_opt( - "SELECT id,status,txid FROM tx_out WHERE wtid=$1 FOR UPDATE", - &[&wtid.as_slice()], - )?; - if let Some(row) = row { + if let Some(row) = sqlx::query("SELECT id,status,txid FROM tx_out WHERE wtid=$1 FOR UPDATE") + .bind(wtid.as_slice()) + .fetch_optional(&mut *db) + .await? + { // If already in database, sync status let row_id: i64 = row.get(0); let status: i16 = row.get(1); match DebitStatus::try_from(status as u8).unwrap() { DebitStatus::Requested => { - let nb_row = db.execute( + let nb_row = sqlx::query( "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3 AND status=$4", - &[ - &(DebitStatus::Sent as i16), - &id.as_byte_array().as_slice(), - &row_id, - &status, - ], - )?; + ) + .bind(DebitStatus::Sent as i16) + .bind(id.as_byte_array()) + .bind(row_id) + .bind(status) + .execute(db) + .await? + .rows_affected(); if nb_row > 0 { warn!(">> (recovered) {amount} {wtid} in {id} to {credit_addr}"); } } DebitStatus::Sent => { if let Some(txid) = full.replaces_txid { - let stored_id = sql_txid(&row, 2); + let stored_id = sql_txid(&row, 2)?; if txid == stored_id { - let nb_row = db.execute( - "UPDATE tx_out SET txid=$1 WHERE txid=$2", - &[ - &id.as_byte_array().as_slice(), - &txid.as_byte_array().as_slice(), - ], - )?; + let nb_row = sqlx::query("UPDATE tx_out SET txid=$1 WHERE txid=$2") + .bind(id.as_byte_array()) + .bind(txid.as_byte_array()) + .execute(db) + .await? + .rows_affected(); if nb_row > 0 { info!(">> (recovered) {wtid} replace {txid} with {id}",); } @@ -426,15 +424,26 @@ fn sync_chain_debit( } } else { // Else add to database - let debit_addr = sender_address(rpc, full)?; - let update = db.query_opt( - "INSERT INTO tx_out (created, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, ($2, $3)::taler_amount, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT (wtid) DO NOTHING RETURNING id", - &[&((full.time*1000000) as i64), &(amount.val as i64), &(amount.frac as i32), &wtid.as_slice(), &debit_addr.to_string(), &credit_addr.to_string(), &"https://exchange.url.TODO/", &(DebitStatus::Sent as i16), &id.as_byte_array().as_slice(), &None::<&[u8]>], - )?; + let debit_addr = sender_address(rpc, full).await?; + let update = sqlx::query( + "INSERT INTO tx_out (created, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, ($2, $3)::taler_amount, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT (wtid) DO NOTHING RETURNING id") + .bind((full.time*1000000) as i64) + .bind_amount(&amount) + .bind(wtid.as_slice()) + .bind(debit_addr.to_string()) + .bind(credit_addr.to_string()) + .bind("https://exchange.url.TODO/") + .bind(DebitStatus::Sent as i16) + .bind(id.as_byte_array()) + .bind(None::<&[u8]>) + .fetch_optional(&mut *db).await?; if let Some(row) = update { - warn!(">> (onchain) {amount} {wtid} in {id} to {credit_addr}",); + warn!(">> (onchain) {amount} {wtid} in {id} to {credit_addr}"); let id: i64 = row.try_get(0)?; - db.execute("SELECT pg_notify('taler_out', $1)", &[&id.to_string()])?; + sqlx::query("SELECT pg_notify('taler_out', $1)") + .bind(id) + .execute(db) + .await?; } } @@ -455,45 +464,45 @@ fn sync_chain_debit( } /// Sync database with an outgoing bounce transaction -fn sync_chain_bounce( +async fn sync_chain_bounce( id: &Txid, bounced: &Txid, - db: &mut Client, + db: &mut PgListener, confirmations: i32, ) -> LoopResult<()> { if confirmations < 0 { // Handle conflicting tx - let nb_row = db.execute( - "UPDATE bounce SET status=$1, txid=NULL where txid=$2", - &[ - &(BounceStatus::Requested as i16), - &id.as_byte_array().as_slice(), - ], - )?; + let nb_row = sqlx::query("UPDATE bounce SET status=$1, txid=NULL where txid=$2") + .bind(BounceStatus::Requested as i16) + .bind(id.as_byte_array()) + .execute(&mut *db) + .await? + .rows_affected(); if nb_row > 0 { warn!("|| (conflict) {bounced} in {id}"); } } else { // Get previous bounce - let row = db.query_opt( - "SELECT id, status FROM bounce WHERE bounced=$1", - &[&bounced.as_byte_array().as_slice()], - )?; + let row = sqlx::query("SELECT id, status FROM bounce WHERE bounced=$1") + .bind(bounced.as_byte_array()) + .fetch_optional(&mut *db) + .await?; if let Some(row) = row { // If already in database, sync status let row_id: i64 = row.get(0); let status: i16 = row.get(1); match BounceStatus::try_from(status as u8).unwrap() { BounceStatus::Requested => { - let nb_row = db.execute( + let nb_row = sqlx::query( "UPDATE bounce SET status=$1, txid=$2 WHERE id=$3 AND status=$4", - &[ - &(BounceStatus::Sent as i16), - &id.as_byte_array().as_slice(), - &row_id, - &status, - ], - )?; + ) + .bind(BounceStatus::Sent as i16) + .bind(id.as_byte_array()) + .bind(row_id) + .bind(status) + .execute(db) + .await? + .rows_affected(); if nb_row > 0 { warn!("|| (recovered) {bounced} in {id}"); } @@ -505,10 +514,9 @@ fn sync_chain_bounce( } } else { // Else add to database - let nb = db.execute( - "INSERT INTO bounce (created, bounced, txid, status) VALUES ($1, $2, $3, $4) ON CONFLICT (txid) DO NOTHING", - &[&Timestamp::now().as_sql_micros(), &bounced.as_byte_array().as_slice(), &id.as_byte_array().as_slice(), &(BounceStatus::Sent as i16)], - )?; + let nb = sqlx::query( + "INSERT INTO bounce (created, bounced, txid, status) VALUES ($1, $2, $3, $4) ON CONFLICT (txid) DO NOTHING") + .bind_timestamp(&Timestamp::now()).bind(bounced.as_byte_array()).bind(id.as_byte_array()).bind(BounceStatus::Sent as i16).execute(db).await?.rows_affected(); if nb > 0 { warn!("|| (onchain) {bounced} in {id}"); } @@ -519,23 +527,24 @@ fn sync_chain_bounce( } /// Sync database with an outgoing transaction, return true if stuck -fn sync_chain_outgoing( +async fn sync_chain_outgoing( id: &Txid, confirmations: i32, rpc: &mut Rpc, - db: &mut Client, + db: &mut PgListener, state: &WorkerCfg, ) -> LoopResult<bool> { match rpc .get_tx_op_return(id) + .await .map(|(full, bytes)| (full, OutMetadata::decode(&bytes))) { Ok((full, Ok(info))) => match info { OutMetadata::Debit { wtid, .. } => { - return sync_chain_debit(id, &full, &wtid, rpc, db, confirmations, state); + return sync_chain_debit(id, &full, &wtid, rpc, db, confirmations, state).await; } OutMetadata::Bounce { bounced } => { - sync_chain_bounce(id, &Txid::from_byte_array(bounced), db, confirmations)? + sync_chain_bounce(id, &Txid::from_byte_array(bounced), db, confirmations).await? } }, Ok((_, Err(e))) => warn!("send: decode-info {id} - {e}"), @@ -548,38 +557,31 @@ fn sync_chain_outgoing( } /// Send a debit transaction on the blockchain, return false if no more requested transactions are found -fn debit(db: &mut Client, rpc: &mut Rpc, state: &WorkerCfg) -> LoopResult<bool> { +async fn debit(db: &mut PgListener, rpc: &mut Rpc, state: &WorkerCfg) -> LoopResult<bool> { // We rely on the advisory lock to ensure we are the only one sending transactions - let row = db.query_opt( - "SELECT id, (amount).val, (amount).frac, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 ORDER BY created LIMIT 1", - &[&(DebitStatus::Requested as i16)], - )?; + let row = sqlx::query( + "SELECT id, (amount).val, (amount).frac, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 ORDER BY created LIMIT 1").bind(DebitStatus::Requested as i16).fetch_optional(&mut *db).await?; if let Some(row) = &row { let id: i64 = row.get(0); - let amount = sql_btc_amount(row, 1, &state.currency); - let wtid: ShortHashCode = sql_base_32(row, 3); - let addr = sql_addr(row, 4); - let url = sql_url(row, 5); + let amount = sql_btc_amount(row, 1, &state.currency)?; + let wtid: ShortHashCode = row.try_get_base32(3)?; + let addr = sql_addr(row, 4)?; + let url = row.try_get_parse(5)?; let metadata = OutMetadata::Debit { wtid: wtid.clone(), url, }; - let tx_id = rpc.send( - &addr, - &amount, - Some(&metadata.encode().or_fail(|e| format!("{e}"))), - false, - )?; + let tx_id = rpc + .send(&addr, &amount, Some(&metadata.encode().unwrap()), false) + .await?; fail_point("(injected) fail debit", 0.3)?; - db.execute( - "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3", - &[ - &(DebitStatus::Sent as i16), - &tx_id.as_byte_array().as_slice(), - &id, - ], - )?; + sqlx::query("UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3") + .bind(DebitStatus::Sent as i16) + .bind(tx_id.as_byte_array()) + .bind(id) + .execute(db) + .await?; let amount = btc_to_taler(&amount.to_signed().unwrap(), &state.currency); info!(">> {amount} {wtid} in {tx_id} to {addr}"); } @@ -587,34 +589,32 @@ fn debit(db: &mut Client, rpc: &mut Rpc, state: &WorkerCfg) -> LoopResult<bool> } /// Bounce a transaction on the blockchain, return false if no more requested transactions are found -fn bounce(db: &mut Client, rpc: &mut Rpc, fee: &BtcAmount) -> LoopResult<bool> { +async fn bounce(db: &mut PgListener, rpc: &mut Rpc, fee: &BtcAmount) -> LoopResult<bool> { // We rely on the advisory lock to ensure we are the only one sending transactions - let row = db.query_opt( - "SELECT id, bounced FROM bounce WHERE status=$1 ORDER BY created LIMIT 1", - &[&(BounceStatus::Requested as i16)], - )?; + let row = + sqlx::query("SELECT id, bounced FROM bounce WHERE status=$1 ORDER BY created LIMIT 1") + .bind(BounceStatus::Requested as i16) + .fetch_optional(&mut *db) + .await?; if let Some(row) = &row { let id: i64 = row.get(0); - let bounced: Txid = sql_txid(row, 1); + let bounced: Txid = sql_txid(row, 1)?; let metadata = OutMetadata::Bounce { bounced: *bounced.as_byte_array(), }; - match rpc.bounce( - &bounced, - fee, - Some(&metadata.encode().or_fail(|e| format!("{e}"))), - ) { + match rpc + .bounce(&bounced, fee, Some(&metadata.encode().unwrap())) + .await + { Ok(it) => { fail_point("(injected) fail bounce", 0.3)?; - db.execute( - "UPDATE bounce SET txid=$1, status=$2 WHERE id=$3", - &[ - &it.as_byte_array().as_slice(), - &(BounceStatus::Sent as i16), - &id, - ], - )?; + sqlx::query("UPDATE bounce SET txid=$1, status=$2 WHERE id=$3") + .bind(it.as_byte_array()) + .bind(BounceStatus::Sent as i16) + .bind(id) + .execute(db) + .await?; info!("|| {bounced} in {it}"); } Err(err) => match err { @@ -622,10 +622,11 @@ fn bounce(db: &mut Client, rpc: &mut Rpc, fee: &BtcAmount) -> LoopResult<bool> { code: ErrorCode::RpcWalletInsufficientFunds | ErrorCode::RpcWalletError, msg, } => { - db.execute( - "UPDATE bounce SET status=$1 WHERE id=$2", - &[&(BounceStatus::Ignored as i16), &id], - )?; + sqlx::query("UPDATE bounce SET status=$1 WHERE id=$2") + .bind(BounceStatus::Ignored as i16) + .bind(id) + .execute(db) + .await?; info!("|| (ignore) {bounced} because {msg}"); } e => Err(e)?, diff --git a/depolymerizer-bitcoin/src/main.rs b/depolymerizer-bitcoin/src/main.rs @@ -16,7 +16,6 @@ use axum::{Router, middleware}; use bitcoin::hashes::Hash; use clap::Parser; -use common::named_spawn; use depolymerizer_bitcoin::{ CONFIG_SOURCE, DB_SCHEMA, api::{ServerState, status_middleware}, @@ -98,22 +97,22 @@ async fn app(args: Args, cfg: Config) -> anyhow::Result<()> { Command::Setup { reset } => { info!("Connect to bitcoind"); let state = WorkerCfg::parse(&cfg)?; - let mut rpc = Rpc::wallet(&state.rpc_cfg, &state.wallet_cfg.name)?; - let info = rpc.get_blockchain_info()?; + let mut rpc = Rpc::wallet(&state.rpc_cfg, &state.wallet_cfg.name).await?; + let info = rpc.get_blockchain_info().await?; info!("Running on {} chain", info.chain); #[cfg(feature = "fail")] if info.chain != "regtest" { anyhow::bail!("Running with random failures is unsuitable for production"); } - let genesis_hash = rpc.get_genesis().unwrap(); + let genesis_hash = rpc.get_genesis().await.unwrap(); // TODO wait for the blockchain to sync // TODO Check wire wallet own config PAYTO address info!("Check wallet"); - rpc.load_wallet(&state.wallet_cfg.name)?; + rpc.load_wallet(&state.wallet_cfg.name).await?; if let Some(password) = &state.wallet_cfg.password { - rpc.unlock_wallet(password)?; + rpc.unlock_wallet(password).await?; } info!("Setup database state"); @@ -123,28 +122,25 @@ async fn app(args: Args, cfg: Config) -> anyhow::Result<()> { // Init status to true sqlx::query("INSERT INTO state (name, value) VALUES ('status', $1) ON CONFLICT (name) DO NOTHING") .bind([1u8]) - .execute( &pool).await?; + .execute(&pool).await?; sqlx::query("INSERT INTO state (name, value) VALUES ('last_hash', $1) ON CONFLICT (name) DO NOTHING") .bind(genesis_hash.as_byte_array().as_slice()) - .execute( &pool).await?; + .execute(&pool).await?; // TODO reset ? println!("Database initialised"); } Command::Worker { transient } => { let state = WorkerCfg::parse(&cfg)?; + let db_cfg = parse_db_cfg(&cfg)?; + let pool = pool(db_cfg.cfg, DB_SCHEMA).await?; #[cfg(feature = "fail")] tracing::warn!("Running with random failures"); // TODO Check wire wallet own config PAYTO address - named_spawn("worker", move || { - let tmp = state.clone(); - named_spawn("watcher", move || watcher(&tmp.rpc_cfg, &tmp.db_config)); - worker(state) - }) - .join() - .unwrap(); + tokio::spawn(watcher(state.rpc_cfg.clone(), pool.clone())); + tokio::spawn(worker(state, pool)).await?; info!("btc-wire stopped"); } diff --git a/depolymerizer-bitcoin/src/rpc.rs b/depolymerizer-bitcoin/src/rpc.rs @@ -31,26 +31,30 @@ use bitcoin::{Address, Amount, BlockHash, SignedAmount, Txid, address::NetworkUn use serde_json::{Value, json}; use std::{ fmt::Debug, - io::{self, BufRead, BufReader, Write}, - net::TcpStream, + io::Write as _, time::{Duration, Instant}, }; +use tokio::{ + io::{self, AsyncBufReadExt as _, AsyncWriteExt as _, BufReader}, + net::TcpStream, + time::timeout, +}; use crate::config::{RpcAuth, RpcCfg, WalletCfg}; /// Create a rpc connection with an unlocked wallet -pub fn rpc_wallet(config: &RpcCfg, wallet: &WalletCfg) -> Result<Rpc> { - let mut rpc = Rpc::wallet(config, &wallet.name)?; - rpc.load_wallet(&wallet.name)?; +pub async fn rpc_wallet(config: &RpcCfg, wallet: &WalletCfg) -> Result<Rpc> { + let mut rpc = Rpc::wallet(config, &wallet.name).await?; + rpc.load_wallet(&wallet.name).await?; if let Some(password) = &wallet.password { - rpc.unlock_wallet(password)?; + rpc.unlock_wallet(password).await?; } Ok(rpc) } /// Create a rpc connection -pub fn rpc_common(config: &RpcCfg) -> Result<Rpc> { - Ok(Rpc::common(config)?) +pub async fn rpc_common(config: &RpcCfg) -> Result<Rpc> { + Ok(Rpc::common(config).await?) } #[derive(Debug, serde::Serialize)] @@ -114,16 +118,16 @@ pub struct Rpc { impl Rpc { /// Start a RPC connection - pub fn common(cfg: &RpcCfg) -> io::Result<Self> { - Self::new(cfg, None) + pub async fn common(cfg: &RpcCfg) -> io::Result<Self> { + Self::new(cfg, None).await } /// Start a wallet RPC connection - pub fn wallet(cfg: &RpcCfg, wallet: &str) -> io::Result<Self> { - Self::new(cfg, Some(wallet)) + pub async fn wallet(cfg: &RpcCfg, wallet: &str) -> io::Result<Self> { + Self::new(cfg, Some(wallet)).await } - fn new(cfg: &RpcCfg, wallet: Option<&str>) -> io::Result<Self> { + async fn new(cfg: &RpcCfg, wallet: Option<&str>) -> io::Result<Self> { let path = if let Some(wallet) = wallet { format!("/wallet/{wallet}") } else { @@ -136,7 +140,7 @@ impl Rpc { RpcAuth::Cookie(path) => std::fs::read(path)?, }; // Open connection - let sock = TcpStream::connect_timeout(&cfg.addr, Duration::from_secs(5))?; + let sock = timeout(Duration::from_secs(5), TcpStream::connect(&cfg.addr)).await??; let conn = BufReader::new(sock); Ok(Self { @@ -149,7 +153,7 @@ impl Rpc { }) } - fn call<T>(&mut self, method: &str, params: &impl serde::Serialize) -> Result<T> + async fn call<T>(&mut self, method: &str, params: &impl serde::Serialize) -> Result<T> where T: serde::de::DeserializeOwned + Debug, { @@ -175,16 +179,16 @@ impl Rpc { writeln!(buf, "Content-Length: {}\r", body.len())?; // Write separator writeln!(buf, "\r")?; - sock.write_all(buf)?; + sock.write_all(buf).await?; buf.clear(); // Write body - sock.write_all(&body)?; - sock.flush()?; + sock.write_all(&body).await?; + sock.flush().await?; } // Skip response let sock = &mut self.conn; loop { - let amount = sock.read_until(b'\n', buf)?; + let amount = sock.read_until(b'\n', buf).await?; let sep = buf[..amount] == [b'\r', b'\n']; buf.clear(); if sep { @@ -193,7 +197,7 @@ impl Rpc { self.last_call = Instant::now(); } // Read body - let amount = sock.read_until(b'\n', buf)?; + let amount = sock.read_until(b'\n', buf).await?; let response: RpcResponse<T> = serde_json::from_slice(&buf[..amount])?; match response { RpcResponse::RpcResponse { result, error, id } => { @@ -218,13 +222,14 @@ impl Rpc { /* ----- Wallet management ----- */ /// Create encrypted native bitcoin wallet - pub fn create_wallet(&mut self, name: &str, passwd: &str) -> Result<Wallet> { + pub async fn create_wallet(&mut self, name: &str, passwd: &str) -> Result<Wallet> { self.call("createwallet", &(name, (), (), passwd, (), true)) + .await } /// Load existing wallet - pub fn load_wallet(&mut self, name: &str) -> Result<Wallet> { - match self.call("loadwallet", &[name]) { + pub async fn load_wallet(&mut self, name: &str) -> Result<Wallet> { + match self.call("loadwallet", &[name]).await { Err(Error::RPC { code: ErrorCode::RpcWalletAlreadyLoaded, .. @@ -236,64 +241,65 @@ impl Rpc { } /// Unlock loaded wallet - pub fn unlock_wallet(&mut self, passwd: &str) -> Result<()> { + pub async fn unlock_wallet(&mut self, passwd: &str) -> Result<()> { // TODO Capped at 3yrs, is it enough ? - expect_null(self.call("walletpassphrase", &(passwd, 100000000))) + expect_null(self.call("walletpassphrase", &(passwd, 100000000)).await) } /* ----- Wallet utils ----- */ /// Generate a new address fot the current wallet - pub fn gen_addr(&mut self) -> Result<Address> { + pub async fn gen_addr(&mut self) -> Result<Address> { Ok(self - .call::<Address<NetworkUnchecked>>("getnewaddress", &EMPTY)? + .call::<Address<NetworkUnchecked>>("getnewaddress", &EMPTY) + .await? .assume_checked()) } /// Get current balance amount - pub fn get_balance(&mut self) -> Result<Amount> { - let btc: f64 = self.call("getbalance", &EMPTY)?; + pub async fn get_balance(&mut self) -> Result<Amount> { + let btc: f64 = self.call("getbalance", &EMPTY).await?; Ok(Amount::from_btc(btc).unwrap()) } /* ----- Mining ----- */ /// Mine a certain amount of block to profit a given address - pub fn mine(&mut self, nb: u16, address: &Address) -> Result<Vec<BlockHash>> { - self.call("generatetoaddress", &(nb, address)) + pub async fn mine(&mut self, nb: u16, address: &Address) -> Result<Vec<BlockHash>> { + self.call("generatetoaddress", &(nb, address)).await } /* ----- Getter ----- */ /// Get blockchain info - pub fn get_blockchain_info(&mut self) -> Result<BlockchainInfo> { - self.call("getblockchaininfo", &EMPTY) + pub async fn get_blockchain_info(&mut self) -> Result<BlockchainInfo> { + self.call("getblockchaininfo", &EMPTY).await } /// Get chain tips - pub fn get_chain_tips(&mut self) -> Result<Vec<ChainTips>> { - self.call("getchaintips", &EMPTY) + pub async fn get_chain_tips(&mut self) -> Result<Vec<ChainTips>> { + self.call("getchaintips", &EMPTY).await } /// Get wallet transaction info from id - pub fn get_tx(&mut self, id: &Txid) -> Result<Transaction> { - self.call("gettransaction", &(id, (), true)) + pub async fn get_tx(&mut self, id: &Txid) -> Result<Transaction> { + self.call("gettransaction", &(id, (), true)).await } /// Get transaction inputs and outputs - pub fn get_input_output(&mut self, id: &Txid) -> Result<InputOutput> { - self.call("getrawtransaction", &(id, true)) + pub async fn get_input_output(&mut self, id: &Txid) -> Result<InputOutput> { + self.call("getrawtransaction", &(id, true)).await } /// Get genesis block hash - pub fn get_genesis(&mut self) -> Result<BlockHash> { - self.call("getblockhash", &[0]) + pub async fn get_genesis(&mut self) -> Result<BlockHash> { + self.call("getblockhash", &[0]).await } /* ----- Transactions ----- */ /// Send bitcoin transaction - pub fn send( + pub async fn send( &mut self, to: &Address, amount: &Amount, @@ -301,18 +307,21 @@ impl Rpc { subtract_fee: bool, ) -> Result<Txid> { self.send_custom([], [(to, amount)], data, subtract_fee) + .await .map(|it| it.txid) } /// Send bitcoin transaction with multiple recipients - pub fn send_many<'a>( + pub async fn send_many<'a>( &mut self, to: impl IntoIterator<Item = (&'a Address, &'a Amount)>, ) -> Result<Txid> { - self.send_custom([], to, None, false).map(|it| it.txid) + self.send_custom([], to, None, false) + .await + .map(|it| it.txid) } - fn send_custom<'a>( + async fn send_custom<'a>( &mut self, from: impl IntoIterator<Item = &'a Txid>, to: impl IntoIterator<Item = (&'a Address, &'a Amount)>, @@ -354,51 +363,53 @@ impl Rpc { }, ), ) + .await } /// Bump transaction fees of a wallet debit - pub fn bump_fee(&mut self, id: &Txid) -> Result<BumpResult> { - self.call("bumpfee", &[id]) + pub async fn bump_fee(&mut self, id: &Txid) -> Result<BumpResult> { + self.call("bumpfee", &[id]).await } /// Abandon a pending transaction - pub fn abandon_tx(&mut self, id: &Txid) -> Result<()> { - expect_null(self.call("abandontransaction", &[&id])) + pub async fn abandon_tx(&mut self, id: &Txid) -> Result<()> { + expect_null(self.call("abandontransaction", &[&id]).await) } /* ----- Watcher ----- */ /// Block until a new block is mined - pub fn wait_for_new_block(&mut self) -> Result<Nothing> { - self.call("waitfornewblock", &[0]) + pub async fn wait_for_new_block(&mut self) -> Result<Nothing> { + self.call("waitfornewblock", &[0]).await } /// List new and removed transaction since a block - pub fn list_since_block( + pub async fn list_since_block( &mut self, hash: Option<&BlockHash>, confirmation: u32, ) -> Result<ListSinceBlock> { self.call("listsinceblock", &(hash, confirmation.max(1), (), true)) + .await } /* ----- Cluster ----- */ /// Try a connection to a node once - pub fn add_node(&mut self, addr: &str) -> Result<()> { - expect_null(self.call("addnode", &(addr, "onetry"))) + pub async fn add_node(&mut self, addr: &str) -> Result<()> { + expect_null(self.call("addnode", &(addr, "onetry")).await) } /// Immediately disconnects from the specified peer node. - pub fn disconnect_node(&mut self, addr: &str) -> Result<()> { - expect_null(self.call("disconnectnode", &(addr, ()))) + pub async fn disconnect_node(&mut self, addr: &str) -> Result<()> { + expect_null(self.call("disconnectnode", &(addr, ())).await) } /* ----- Control ------ */ /// Request a graceful shutdown - pub fn stop(&mut self) -> Result<String> { - self.call("stop", &()) + pub async fn stop(&mut self) -> Result<String> { + self.call("stop", &()).await } } diff --git a/depolymerizer-bitcoin/src/rpc_utils.rs b/depolymerizer-bitcoin/src/rpc_utils.rs @@ -67,9 +67,9 @@ pub fn segwit_min_amount() -> Amount { } /// Get the first sender address from a raw transaction -pub fn sender_address(rpc: &mut Rpc, full: &Transaction) -> rpc::Result<Address> { +pub async fn sender_address(rpc: &mut Rpc, full: &Transaction) -> rpc::Result<Address> { let first = &full.decoded.vin[0]; - let tx = rpc.get_input_output(&first.txid.unwrap())?; + let tx = rpc.get_input_output(&first.txid.unwrap()).await?; Ok(tx .vout .into_iter() diff --git a/depolymerizer-bitcoin/src/segwit.rs b/depolymerizer-bitcoin/src/segwit.rs @@ -14,7 +14,8 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ use bech32::Hrp; -use common::{rand::rngs::ThreadRng, rand_slice, taler_common::api_common::EddsaPublicKey}; +use common::{rand_slice, taler_common::api_common::EddsaPublicKey}; +use rand::rngs::ThreadRng; use std::cmp::Ordering; /// TODO use segwit v1 to only use a single address @@ -124,7 +125,7 @@ pub fn decode_segwit_msg( // TODO find a way to hide that function while using it in test and benchmark pub fn rand_addresses(hrp: Hrp, key: &[u8; 32]) -> Vec<String> { - use common::rand::prelude::SliceRandom; + use rand::prelude::SliceRandom; let mut rng_address: Vec<String> = std::iter::repeat_with(|| encode_segwit_addr(hrp, &rand_slice())) @@ -139,10 +140,8 @@ pub fn rand_addresses(hrp: Hrp, key: &[u8; 32]) -> Vec<String> { #[cfg(test)] mod test { - use common::{ - rand::{prelude::SliceRandom, rngs::ThreadRng}, - taler_common::types::base32::Base32, - }; + use common::taler_common::types::base32::Base32; + use rand::{prelude::SliceRandom, rngs::ThreadRng}; use crate::segwit::{decode_segwit_msg, encode_segwit_key, rand_addresses}; diff --git a/depolymerizer-bitcoin/src/sql.rs b/depolymerizer-bitcoin/src/sql.rs @@ -14,35 +14,26 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use std::str::FromStr as _; - -use bitcoin::{Address, Amount as BtcAmount, Txid, hashes::Hash}; -use common::postgres::Row; -use common::{log::OrFail, sql::sql_amount}; +use bitcoin::{Address, Amount as BtcAmount, Txid, address::NetworkUnchecked, hashes::Hash}; use depolymerizer_bitcoin::taler_utils::taler_to_btc; +use sqlx::postgres::PgRow; +use taler_api::db::TypeHelper; use taler_common::types::amount::Currency; /// Bitcoin amount from sql -pub fn sql_btc_amount(row: &Row, idx: usize, currency: &Currency) -> BtcAmount { - let amount = sql_amount(row, idx, currency); - taler_to_btc(&amount) +pub fn sql_btc_amount(row: &PgRow, idx: usize, currency: &Currency) -> sqlx::Result<BtcAmount> { + let amount = row.try_get_amount_i(idx, currency)?; + Ok(taler_to_btc(&amount)) } /// Bitcoin address from sql -pub fn sql_addr(row: &Row, idx: usize) -> Address { - let str = row.get(idx); - Address::from_str(str) - .or_fail(|_| format!("Database invariant: expected an bitcoin address got {str}")) - .assume_checked() +pub fn sql_addr(row: &PgRow, idx: usize) -> sqlx::Result<Address> { + Ok(row + .try_get_parse::<_, _, Address<NetworkUnchecked>>(idx)? + .assume_checked()) } /// Bitcoin transaction id from sql -pub fn sql_txid(row: &Row, idx: usize) -> Txid { - let slice: &[u8] = row.get(idx); - Txid::from_slice(slice).or_fail(|_| { - format!( - "Database invariant: expected a transaction if got an array of {}B", - slice.len() - ) - }) +pub fn sql_txid(row: &PgRow, idx: usize) -> sqlx::Result<Txid> { + row.try_get_map(idx, Txid::from_slice) } diff --git a/instrumentation/Cargo.toml b/instrumentation/Cargo.toml @@ -18,15 +18,12 @@ bitcoin.workspace = true ureq = { version = "3.0.0", features = ["json"] } # Generate temporary files tempfile = "3.3.0" -# RNG -fastrand = "2.0.1" # terminal color owo-colors = "4.0.0" # Edit toml files rust-ini = "0.21.0" # Progress reporting indicatif = "0.18.0" -thread-local-panic-hook = "0.1.0" taler-common.workspace = true -taler-api.workspace = true +tokio.workspace = true anyhow.workspace = true diff --git a/instrumentation/src/btc.rs b/instrumentation/src/btc.rs @@ -38,9 +38,9 @@ use ini::Ini; use taler_common::{config::Config, types::payto::Payto}; use tempfile::TempDir; -use crate::utils::{ - ChildGuard, TalerCtx, TestCtx, cmd_redirect, patch_config, retry, retry_opt, transfer, - unused_port, +use crate::{ + retry, retry_opt, + utils::{ChildGuard, TalerCtx, TestCtx, cmd_redirect, patch_config, transfer, unused_port}, }; pub struct BtcCtx { @@ -76,7 +76,7 @@ impl DerefMut for BtcCtx { } impl BtcCtx { - pub fn config( + pub async fn config( ctx: &TestCtx, btc_patch: impl FnOnce(&mut Ini, &Path), cfg_patch: impl FnOnce(&mut Ini, &Path), @@ -116,11 +116,13 @@ impl BtcCtx { ctx.log("bitcoind"), ); // Connect - retry_opt(|| { - let mut client = Rpc::common(&rpc_cfg)?; - client.get_blockchain_info()?; - Ok::<_, anyhow::Error>(()) - }); + retry_opt! { + async { + let mut client = Rpc::common(&rpc_cfg).await?; + client.get_blockchain_info().await?; + Ok::<_, anyhow::Error>(()) + }.await + }; } fn patch_btc_config(from: impl AsRef<Path>, to: impl AsRef<Path>, port: u16, rpc_port: u16) { @@ -131,11 +133,9 @@ impl BtcCtx { }) } - pub fn setup(ctx: &TestCtx, config: &str, stressed: bool) -> Self { + pub async fn setup(ctx: &TestCtx, config: &str, stressed: bool) -> Self { let mut ctx = TalerCtx::new(ctx, "depolymerizer-bitcoin", config, stressed); - ctx.dbinit(); - // Choose unused port let btc_port = unused_port(); let btc_rpc_port = unused_port(); @@ -181,37 +181,37 @@ impl BtcCtx { ); // Setup wallets - let mut common_rpc = retry_opt(|| Rpc::common(&cfg.rpc_cfg)); + let mut common_rpc = retry_opt! { Rpc::common(&cfg.rpc_cfg).await }; let node2_addr = format!("127.0.0.1:{btc2_port}"); - common_rpc.add_node(&node2_addr).unwrap(); + common_rpc.add_node(&node2_addr).await.unwrap(); for name in ["wire", "client", "reserve"] { - common_rpc.create_wallet(name, "").unwrap(); + common_rpc.create_wallet(name, "").await.unwrap(); } - let common_rpc2 = retry_opt(|| { + let common_rpc2 = retry_opt! { Rpc::common(&RpcCfg { addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), btc2_rpc_port), - auth: RpcAuth::Cookie( - ctx.wire2_dir + auth: RpcAuth::Cookie(ctx + .wire2_dir .join("regtest/.cookie") .to_string_lossy() - .to_string(), - ), - }) - }); + .to_string()), + }).await + }; // Generate money - let mut reserve_rpc = Rpc::wallet(&cfg.rpc_cfg, "reserve").unwrap(); - let mut client_rpc = Rpc::wallet(&cfg.rpc_cfg, "client").unwrap(); - let mut wire_rpc = Rpc::wallet(&cfg.rpc_cfg, "wire").unwrap(); - let reserve_addr = reserve_rpc.gen_addr().unwrap(); - let client_addr = client_rpc.gen_addr().unwrap(); - let wire_addr = wire_rpc.gen_addr().unwrap(); - common_rpc.mine(101, &reserve_addr).unwrap(); + let mut reserve_rpc = Rpc::wallet(&cfg.rpc_cfg, "reserve").await.unwrap(); + let mut client_rpc = Rpc::wallet(&cfg.rpc_cfg, "client").await.unwrap(); + let mut wire_rpc = Rpc::wallet(&cfg.rpc_cfg, "wire").await.unwrap(); + let reserve_addr = reserve_rpc.gen_addr().await.unwrap(); + let client_addr = client_rpc.gen_addr().await.unwrap(); + let wire_addr = wire_rpc.gen_addr().await.unwrap(); + common_rpc.mine(101, &reserve_addr).await.unwrap(); reserve_rpc .send(&client_addr, &(Amount::ONE_BTC * 10), None, false) + .await .unwrap(); - common_rpc.mine(1, &reserve_addr).unwrap(); + common_rpc.mine(1, &reserve_addr).await.unwrap(); patch_config(&ctx.conf, &ctx.conf, |cfg| { cfg.with_section(Some("depolymerizer-bitcoin")) @@ -223,8 +223,9 @@ impl BtcCtx { let serve_cfg = ServeCfg::parse(&config).unwrap(); // Setup & run + ctx.dbinit(); ctx.setup(); - ctx.run(); + ctx.run().await; Self { ctx, @@ -250,56 +251,65 @@ impl BtcCtx { self.ctx.setup(); } - pub fn stop_node(&mut self) { + pub async fn stop_node(&mut self) { // We need to kill bitcoin gracefully to avoid corruption - self.common_rpc.stop().unwrap(); + self.common_rpc.stop().await.unwrap(); self.btc_node.0.wait().unwrap(); } - pub fn cluster_deco(&mut self) { - self.common_rpc.disconnect_node(&self.node2_addr).unwrap(); + pub async fn cluster_deco(&mut self) { + self.common_rpc + .disconnect_node(&self.node2_addr) + .await + .unwrap(); } - pub fn cluster_fork(&mut self) { - let node1_height = self.common_rpc.get_blockchain_info().unwrap().blocks; - let node2_height = self.common_rpc2.get_blockchain_info().unwrap().blocks; + pub async fn cluster_fork(&mut self) { + let node1_height = self.common_rpc.get_blockchain_info().await.unwrap().blocks; + let node2_height = self.common_rpc2.get_blockchain_info().await.unwrap().blocks; let diff = node1_height - node2_height; self.common_rpc2 .mine((diff + 1) as u16, &self.reserve_addr) + .await .unwrap(); - self.common_rpc.add_node(&self.node2_addr).unwrap(); + self.common_rpc.add_node(&self.node2_addr).await.unwrap(); } - pub fn restart_node(&mut self, additional_args: &[&str]) { - self.stop_node(); - self.resume_node(additional_args); + pub async fn restart_node(&mut self, additional_args: &[&str]) { + self.stop_node().await; + self.resume_node(additional_args).await; } - pub fn resume_node(&mut self, additional_args: &[&str]) { + pub async fn resume_node(&mut self, additional_args: &[&str]) { let datadir = format!("-datadir={}", self.ctx.wire_dir.to_string_lossy()); let mut args = vec![datadir.as_str()]; args.extend_from_slice(additional_args); self.btc_node = cmd_redirect("bitcoind", &args, self.ctx.log("bitcoind")); - self.common_rpc = retry_opt(|| Rpc::common(&self.worker_cfg.rpc_cfg)); - self.common_rpc.add_node(&self.node2_addr).unwrap(); + self.common_rpc = retry_opt! { Rpc::common(&self.worker_cfg.rpc_cfg).await }; + self.common_rpc.add_node(&self.node2_addr).await.unwrap(); for name in ["client", "reserve", "wire"] { - self.common_rpc.load_wallet(name).ok(); + self.common_rpc.load_wallet(name).await.ok(); } - self.reserve_rpc = Rpc::wallet(&self.worker_cfg.rpc_cfg, "reserve").unwrap(); - self.client_rpc = Rpc::wallet(&self.worker_cfg.rpc_cfg, "client").unwrap(); - self.wire_rpc = Rpc::wallet(&self.worker_cfg.rpc_cfg, "wire").unwrap(); + self.reserve_rpc = Rpc::wallet(&self.worker_cfg.rpc_cfg, "reserve") + .await + .unwrap(); + self.client_rpc = Rpc::wallet(&self.worker_cfg.rpc_cfg, "client") + .await + .unwrap(); + self.wire_rpc = Rpc::wallet(&self.worker_cfg.rpc_cfg, "wire").await.unwrap(); } /* ----- Transaction ------ */ - pub fn credit(&mut self, amount: Amount, metadata: &EddsaPublicKey) { + pub async fn credit(&mut self, amount: Amount, metadata: &EddsaPublicKey) { self.client_rpc .send_segwit_key(&self.wire_addr, &amount, metadata) + .await .unwrap(); } - pub fn debit(&mut self, amount: Amount, metadata: &ShortHashCode) { + pub async fn debit(&mut self, amount: Amount, metadata: &ShortHashCode) { transfer( &self.ctx.gateway_url, metadata, @@ -308,87 +318,89 @@ impl BtcCtx { .as_full_payto("name"), &btc_to_taler(&amount.to_signed().unwrap(), &self.worker_cfg.currency), ) + .await } - pub fn malformed_credit(&mut self, amount: &Amount) { + pub async fn malformed_credit(&mut self, amount: &Amount) { self.client_rpc .send(&self.wire_addr, amount, None, false) + .await .unwrap(); } - pub fn reset_wallet(&mut self) { - let amount = self.wire_balance(); + pub async fn reset_wallet(&mut self) { + let amount = self.wire_balance().await; self.wire_rpc .send(&self.client_addr, &amount, None, true) + .await .unwrap(); - self.next_block(); + self.next_block().await; } - fn abandon(rpc: &mut Rpc) { - let list = rpc.list_since_block(None, 1).unwrap(); + async fn abandon(rpc: &mut Rpc) { + let list = rpc.list_since_block(None, 1).await.unwrap(); for tx in list.transactions { if tx.category == Category::Send && tx.confirmations == 0 { - rpc.abandon_tx(&tx.txid).unwrap(); + rpc.abandon_tx(&tx.txid).await.unwrap(); } } } - pub fn abandon_wire(&mut self) { - Self::abandon(&mut self.wire_rpc); + pub async fn abandon_wire(&mut self) { + Self::abandon(&mut self.wire_rpc).await; } - pub fn abandon_client(&mut self) { - Self::abandon(&mut self.client_rpc); + pub async fn abandon_client(&mut self) { + Self::abandon(&mut self.client_rpc).await; } /* ----- Mining ----- */ - fn mine(&mut self, nb: u16) { - self.common_rpc.mine(nb, &self.reserve_addr).unwrap(); + async fn mine(&mut self, nb: u16) { + self.common_rpc.mine(nb, &self.reserve_addr).await.unwrap(); } - pub fn next_conf(&mut self) { - self.mine(self.conf) + pub async fn next_conf(&mut self) { + self.mine(self.conf).await } - pub fn next_block(&mut self) { - self.mine(1) + pub async fn next_block(&mut self) { + self.mine(1).await } /* ----- Balances ----- */ - pub fn client_balance(&mut self) -> Amount { - self.client_rpc.get_balance().unwrap() - } - - pub fn wire_balance(&mut self) -> Amount { - self.wire_rpc.get_balance().unwrap() + pub async fn client_balance(&mut self) -> Amount { + self.client_rpc.get_balance().await.unwrap() } - fn expect_balance(&mut self, balance: Amount, mine: bool, lambda: fn(&mut Self) -> Amount) { - retry( - || { - let check = balance == lambda(self); - if !check && mine { - self.next_block(); - } - check - }, - "balance", - ); + pub async fn wire_balance(&mut self) -> Amount { + self.wire_rpc.get_balance().await.unwrap() } - pub fn expect_client_balance(&mut self, balance: Amount, mine: bool) { - self.expect_balance(balance, mine, Self::client_balance) + pub async fn expect_client_balance(&mut self, balance: Amount, mine: bool) { + retry! {{ + let check = balance == self.client_balance().await; + if !check && mine { + self.next_block().await; + } + check + }} } - pub fn expect_wire_balance(&mut self, balance: Amount, mine: bool) { - self.expect_balance(balance, mine, Self::wire_balance) + pub async fn expect_wire_balance(&mut self, balance: Amount, mine: bool) { + retry! {{ + let check = balance == self.wire_balance().await; + if !check && mine { + self.next_block().await; + } + check + }} } /* ----- Wire Gateway ----- */ - pub fn expect_credits(&self, txs: &[(EddsaPublicKey, Amount)]) { + pub async fn expect_credits(&self, txs: &[(EddsaPublicKey, Amount)]) { let txs: Vec<_> = txs .iter() .map(|(metadata, amount)| { @@ -398,10 +410,10 @@ impl BtcCtx { ) }) .collect(); - self.ctx.expect_credits(&txs) + self.ctx.expect_credits(&txs).await } - pub fn expect_debits(&self, txs: &[(ShortHashCode, Amount)]) { + pub async fn expect_debits(&self, txs: &[(ShortHashCode, Amount)]) { let txs: Vec<_> = txs .iter() .map(|(metadata, amount)| { @@ -411,96 +423,90 @@ impl BtcCtx { ) }) .collect(); - self.ctx.expect_debits(&txs) + self.ctx.expect_debits(&txs).await } } /// Test btc-wire correctly receive and send transactions on the blockchain -pub fn wire(ctx: TestCtx) { +pub async fn wire(ctx: TestCtx) { ctx.step("Setup"); - let mut ctx = BtcCtx::setup(&ctx, "taler_btc.conf", false); + let mut ctx = BtcCtx::setup(&ctx, "taler_btc.conf", false).await; ctx.step("Credit"); { // Send transactions - let mut balance = ctx.wire_balance(); + let mut balance = ctx.wire_balance().await; let mut txs = Vec::new(); for n in 10..100 { let metadata = Base32::rand(); let amount = Amount::from_sat(n * 1000); - ctx.credit(amount, &metadata); + ctx.credit(amount, &metadata).await; txs.push((metadata, amount)); balance += amount; - ctx.next_block(); + ctx.next_block().await; } - ctx.next_conf(); - ctx.expect_credits(&txs); - ctx.expect_wire_balance(balance, false); + ctx.next_conf().await; + ctx.expect_credits(&txs).await; + ctx.expect_wire_balance(balance, false).await; }; ctx.step("Debit"); { - let mut balance = ctx.client_balance(); + let mut balance = ctx.client_balance().await; let mut txs = Vec::new(); for n in 10..100 { let metadata = Base32::rand(); let amount = Amount::from_sat(n * 100); balance += amount; - ctx.debit(amount, &metadata); + ctx.debit(amount, &metadata).await; txs.push((metadata, amount)); } - ctx.next_block(); - ctx.expect_debits(&txs); - ctx.expect_client_balance(balance, true); + ctx.next_block().await; + ctx.expect_debits(&txs).await; + ctx.expect_client_balance(balance, true).await; } ctx.step("Bounce"); { - ctx.reset_wallet(); + ctx.reset_wallet().await; // Send bad transactions - let mut balance = ctx.wire_balance(); + let mut balance = ctx.wire_balance().await; for n in 10..40 { - ctx.malformed_credit(&Amount::from_sat(n * 1000)); + ctx.malformed_credit(&Amount::from_sat(n * 1000)).await; balance += ctx.worker_cfg.bounce_fee; } - ctx.next_conf(); - ctx.expect_wire_balance(balance, true); + ctx.next_conf().await; + ctx.expect_wire_balance(balance, true).await; } } /// Check btc-wire and wire-gateway correctly stop when a lifetime limit is configured -pub fn lifetime(ctx: TestCtx) { +pub async fn lifetime(ctx: TestCtx) { ctx.step("Setup"); - let mut ctx = BtcCtx::setup(&ctx, "taler_btc_lifetime.conf", false); + let mut ctx = BtcCtx::setup(&ctx, "taler_btc_lifetime.conf", false).await; ctx.step("Check lifetime"); // Start up - retry( - || ctx.wire_running() && ctx.gateway_running(), - "both running", - ); + retry! { ctx.wire_running() && ctx.gateway_running() }; // Consume wire lifetime for _ in 0..=ctx.worker_cfg.lifetime.unwrap() + 2 { - ctx.credit(segwit_min_amount(), &Base32::rand()); - ctx.next_block(); - std::thread::sleep(Duration::from_millis(200)); + ctx.credit(segwit_min_amount(), &Base32::rand()).await; + ctx.next_block().await; + tokio::time::sleep(Duration::from_millis(100)).await; } - retry(|| !ctx.wire_running(), "wire not running"); + retry! { !ctx.wire_running() }; // Consume gateway lifetime for _ in 0..=ctx.serve_cfg.lifetime.unwrap() { - ctx.debit(segwit_min_amount(), &Base32::rand()); - ctx.next_block(); + ctx.debit(segwit_min_amount(), &Base32::rand()).await; + ctx.next_block().await; } // End down - retry( - || !ctx.wire_running() && !ctx.gateway_running(), - "both down", - ); + retry! { !ctx.wire_running() && !ctx.gateway_running() }; } /// Check the capacity of wire-gateway and btc-wire to recover from database and node loss -pub fn reconnect(ctx: TestCtx) { +pub async fn reconnect(ctx: TestCtx) { ctx.step("Setup"); - let mut ctx = BtcCtx::setup(&ctx, "taler_btc.conf", false); + let mut ctx = BtcCtx::setup(&ctx, "taler_btc.conf", false).await; let mut credits = Vec::new(); let mut debits = Vec::new(); @@ -509,490 +515,500 @@ pub fn reconnect(ctx: TestCtx) { { let metadata = Base32::rand(); let amount = Amount::from_sat(42000); - ctx.credit(amount, &metadata); + ctx.credit(amount, &metadata).await; credits.push((metadata, amount)); - ctx.next_block(); - ctx.next_conf(); - ctx.expect_credits(&credits); + ctx.next_block().await; + ctx.next_conf().await; + ctx.expect_credits(&credits).await; }; ctx.step("Without DB"); { ctx.stop_db(); - ctx.malformed_credit(&Amount::from_sat(24000)); + ctx.malformed_credit(&Amount::from_sat(24000)).await; let metadata = Base32::rand(); let amount = Amount::from_sat(40000); - ctx.credit(amount, &metadata); + ctx.credit(amount, &metadata).await; credits.push((metadata, amount)); - ctx.stop_node(); - ctx.expect_gateway_down(); + ctx.stop_node().await; + ctx.expect_gateway_down().await; } ctx.step("Reconnect DB"); { ctx.resume_db(); - ctx.resume_node(&[]); + ctx.resume_node(&[]).await; let metadata = Base32::rand(); let amount = Amount::from_sat(2000); - ctx.debit(amount, &metadata); + ctx.debit(amount, &metadata).await; debits.push((metadata, amount)); - ctx.next_conf(); - ctx.expect_debits(&debits); - ctx.expect_credits(&credits); + ctx.next_conf().await; + ctx.expect_debits(&debits).await; + ctx.expect_credits(&credits).await; } ctx.step("Recover DB"); { - let balance = ctx.wire_balance(); + let balance = ctx.wire_balance().await; ctx.reset_db(); - ctx.next_block(); - ctx.expect_debits(&debits); - ctx.expect_credits(&credits); - ctx.expect_wire_balance(balance, true); + ctx.next_block().await; + ctx.expect_debits(&debits).await; + ctx.expect_credits(&credits).await; + ctx.expect_wire_balance(balance, true).await; } } /// Test btc-wire ability to recover from errors in correctness critical paths and prevent concurrent sending -pub fn stress(ctx: TestCtx) { +pub async fn stress(ctx: TestCtx) { ctx.step("Setup"); - let mut ctx = BtcCtx::setup(&ctx, "taler_btc.conf", true); + let mut ctx = BtcCtx::setup(&ctx, "taler_btc.conf", true).await; let mut credits = Vec::new(); let mut debits = Vec::new(); ctx.step("Credit"); { - let mut balance = ctx.wire_balance(); + let mut balance = ctx.wire_balance().await; for n in 10..30 { let metadata = Base32::rand(); let amount = Amount::from_sat(n * 1000); - ctx.credit(amount, &metadata); + ctx.credit(amount, &metadata).await; credits.push((metadata, amount)); balance += amount; - ctx.next_block(); + ctx.next_block().await; } - ctx.next_conf(); - ctx.expect_credits(&credits); - ctx.expect_wire_balance(balance, true); + ctx.next_conf().await; + ctx.expect_credits(&credits).await; + ctx.expect_wire_balance(balance, true).await; }; ctx.step("Debit"); { - let mut balance = ctx.client_balance(); + let mut balance = ctx.client_balance().await; for n in 10..30 { let metadata = Base32::rand(); let amount = Amount::from_sat(n * 100); balance += amount; - ctx.debit(amount, &metadata); + ctx.debit(amount, &metadata).await; debits.push((metadata, amount)); } - ctx.next_block(); - ctx.expect_debits(&debits); - ctx.expect_client_balance(balance, true); + ctx.next_block().await; + ctx.expect_debits(&debits).await; + ctx.expect_client_balance(balance, true).await; } ctx.step("Bounce"); { - ctx.reset_wallet(); - let mut balance = ctx.wire_balance(); + ctx.reset_wallet().await; + let mut balance = ctx.wire_balance().await; for n in 10..30 { - ctx.malformed_credit(&Amount::from_sat(n * 1000)); + ctx.malformed_credit(&Amount::from_sat(n * 1000)).await; balance += ctx.worker_cfg.bounce_fee; } - ctx.next_conf(); - ctx.expect_wire_balance(balance, true); + ctx.next_conf().await; + ctx.expect_wire_balance(balance, true).await; } ctx.step("Recover DB"); { - let balance = ctx.wire_balance(); + let balance = ctx.wire_balance().await; ctx.reset_db(); - ctx.next_block(); - ctx.expect_debits(&debits); - ctx.expect_credits(&credits); - ctx.expect_wire_balance(balance, true); + ctx.next_block().await; + ctx.expect_debits(&debits).await; + ctx.expect_credits(&credits).await; + ctx.expect_wire_balance(balance, true).await; } } /// Test btc-wire ability to handle conflicting outgoing transactions -pub fn conflict(tctx: TestCtx) { +pub async fn conflict(tctx: TestCtx) { tctx.step("Setup"); - let mut ctx = BtcCtx::setup(&tctx, "taler_btc.conf", false); + let mut ctx = BtcCtx::setup(&tctx, "taler_btc.conf", false).await; ctx.step("Conflict send"); { // Perform credit let amount = Amount::from_sat(4200000); - ctx.credit(amount, &Base32::rand()); - ctx.next_conf(); - ctx.expect_wire_balance(amount, true); - let client = ctx.client_balance(); - let wire = ctx.wire_balance(); + ctx.credit(amount, &Base32::rand()).await; + ctx.next_conf().await; + ctx.expect_wire_balance(amount, true).await; + let client = ctx.client_balance().await; + let wire = ctx.wire_balance().await; // Perform debit - ctx.debit(Amount::from_sat(400000), &Base32::rand()); - retry(|| ctx.wire_balance() < wire, "balance"); + ctx.debit(Amount::from_sat(400000), &Base32::rand()).await; + retry! { ctx.wire_balance().await < wire }; // Abandon pending transaction - ctx.restart_node(&["-minrelaytxfee=0.0001"]); - ctx.abandon_wire(); - ctx.expect_client_balance(client, false); - ctx.expect_wire_balance(wire, false); + ctx.restart_node(&["-minrelaytxfee=0.0001"]).await; + ctx.abandon_wire().await; + ctx.expect_client_balance(client, false).await; + ctx.expect_wire_balance(wire, false).await; // Generate conflict - ctx.debit(Amount::from_sat(500000), &Base32::rand()); - retry(|| ctx.wire_balance() < wire, "balance"); + ctx.debit(Amount::from_sat(500000), &Base32::rand()).await; + retry! { ctx.wire_balance().await < wire }; // Resend conflicting transaction - ctx.restart_node(&[]); - ctx.next_block(); - let wire = ctx.wire_balance(); - retry(|| ctx.wire_balance() < wire, "balance"); + ctx.restart_node(&[]).await; + ctx.next_block().await; + let wire = ctx.wire_balance().await; + retry! { ctx.wire_balance().await < wire }; } ctx.step("Setup"); drop(ctx); - let mut ctx = BtcCtx::setup(&tctx, "taler_btc.conf", false); - ctx.credit(Amount::from_sat(3000000), &Base32::rand()); - ctx.next_block(); + let mut ctx = BtcCtx::setup(&tctx, "taler_btc.conf", false).await; + ctx.credit(Amount::from_sat(3000000), &Base32::rand()).await; + ctx.next_block().await; ctx.step("Conflict bounce"); { // Perform bounce - let wire = ctx.wire_balance(); + let wire = ctx.wire_balance().await; let bounce_amount = Amount::from_sat(4000000); - ctx.malformed_credit(&bounce_amount); - ctx.next_conf(); + ctx.malformed_credit(&bounce_amount).await; + ctx.next_conf().await; let fee = ctx.worker_cfg.bounce_fee; - ctx.expect_wire_balance(wire + fee, true); + ctx.expect_wire_balance(wire + fee, true).await; // Abandon pending transaction - ctx.restart_node(&["-minrelaytxfee=0.0001"]); - ctx.abandon_wire(); - ctx.expect_wire_balance(wire + bounce_amount, false); + ctx.restart_node(&["-minrelaytxfee=0.0001"]).await; + ctx.abandon_wire().await; + ctx.expect_wire_balance(wire + bounce_amount, false).await; // Generate conflict let amount = Amount::from_sat(50000); - ctx.debit(amount, &Base32::rand()); - retry(|| ctx.wire_balance() < (wire + bounce_amount), "balance"); + ctx.debit(amount, &Base32::rand()).await; + retry! { ctx.wire_balance().await < (wire + bounce_amount) }; // Resend conflicting transaction - ctx.restart_node(&[]); - let wire = ctx.wire_balance(); - ctx.next_block(); - retry(|| ctx.wire_balance() < wire, "balance"); + ctx.restart_node(&[]).await; + let wire = ctx.wire_balance().await; + ctx.next_block().await; + retry! { ctx.wire_balance().await < wire }; } } /// Test btc-wire correctness when a blockchain reorganization occurs -pub fn reorg(ctx: TestCtx) { +pub async fn reorg(ctx: TestCtx) { ctx.step("Setup"); - let mut ctx = BtcCtx::setup(&ctx, "taler_btc.conf", false); + let mut ctx = BtcCtx::setup(&ctx, "taler_btc.conf", false).await; ctx.step("Handle reorg incoming transactions"); { // Loose second bitcoin node - ctx.cluster_deco(); + ctx.cluster_deco().await; // Perform credits - let before = ctx.wire_balance(); + let before = ctx.wire_balance().await; for n in 10..21 { - ctx.credit(Amount::from_sat(n * 10000), &Base32::rand()); - ctx.next_block(); + ctx.credit(Amount::from_sat(n * 10000), &Base32::rand()) + .await; + ctx.next_block().await; } - let after = ctx.wire_balance(); + let after = ctx.wire_balance().await; // Perform fork and check btc-wire hard error - ctx.expect_gateway_up(); - ctx.cluster_fork(); - ctx.expect_wire_balance(before, false); - ctx.expect_gateway_down(); + ctx.expect_gateway_up().await; + ctx.cluster_fork().await; + ctx.expect_wire_balance(before, false).await; + ctx.expect_gateway_down().await; // Recover orphaned transaction - ctx.mine(12); - ctx.expect_wire_balance(after, false); - ctx.expect_gateway_up(); + ctx.mine(12).await; + ctx.expect_wire_balance(after, false).await; + ctx.expect_gateway_up().await; } ctx.step("Handle reorg outgoing transactions"); { // Loose second bitcoin node - ctx.cluster_deco(); + ctx.cluster_deco().await; // Perform debits - let before = ctx.client_balance(); - let mut after = ctx.client_balance(); + let before = ctx.client_balance().await; + let mut after = ctx.client_balance().await; for n in 10..21 { let amount = Amount::from_sat(n * 100); - ctx.debit(amount, &Base32::rand()); + ctx.debit(amount, &Base32::rand()).await; after += amount; } - ctx.next_block(); - ctx.expect_client_balance(after, true); + ctx.next_block().await; + ctx.expect_client_balance(after, true).await; // Perform fork and check btc-wire still up - ctx.expect_gateway_up(); - ctx.cluster_fork(); - ctx.expect_client_balance(before, false); - ctx.expect_gateway_up(); + ctx.expect_gateway_up().await; + ctx.cluster_fork().await; + ctx.expect_client_balance(before, false).await; + ctx.expect_gateway_up().await; // Recover orphaned transaction - ctx.next_conf(); - ctx.expect_client_balance(after, false); + ctx.next_conf().await; + ctx.expect_client_balance(after, false).await; } ctx.step("Handle reorg bounce"); { - ctx.reset_wallet(); + ctx.reset_wallet().await; // Loose second bitcoin node - ctx.cluster_deco(); + ctx.cluster_deco().await; // Perform bounce - let before = ctx.wire_balance(); - let mut after = ctx.wire_balance(); + let before = ctx.wire_balance().await; + let mut after = ctx.wire_balance().await; for n in 10..21 { - ctx.malformed_credit(&Amount::from_sat(n * 1000)); + ctx.malformed_credit(&Amount::from_sat(n * 1000)).await; after += ctx.worker_cfg.bounce_fee; } - ctx.next_conf(); - ctx.expect_wire_balance(after, true); + ctx.next_conf().await; + ctx.expect_wire_balance(after, true).await; // Perform fork and check btc-wire hard error - ctx.expect_gateway_up(); - ctx.cluster_fork(); - ctx.expect_wire_balance(before, false); - ctx.expect_gateway_down(); + ctx.expect_gateway_up().await; + ctx.cluster_fork().await; + ctx.expect_wire_balance(before, false).await; + ctx.expect_gateway_down().await; // Recover orphaned transaction - ctx.mine(12); - ctx.expect_wire_balance(after, false); - ctx.expect_gateway_up(); + ctx.mine(12).await; + ctx.expect_wire_balance(after, false).await; + ctx.expect_gateway_up().await; } } /// Test btc-wire correctness when a blockchain reorganization occurs leading to past incoming transaction conflict -pub fn hell(ctx: TestCtx) { - fn step(ctx: &TestCtx, name: &str, action: impl FnOnce(&mut BtcCtx)) { - ctx.step("Setup"); - let mut ctx = BtcCtx::setup(ctx, "taler_btc.conf", false); - ctx.step(name); - - // Loose second bitcoin node - ctx.cluster_deco(); - - // Perform action - action(&mut ctx); - - // Perform fork and check btc-wire hard error - ctx.expect_gateway_up(); - ctx.cluster_fork(); - ctx.expect_gateway_down(); - - // Generate conflict - ctx.restart_node(&["-minrelaytxfee=0.001"]); - ctx.abandon_client(); - let amount = Amount::from_sat(54000); - ctx.credit(amount, &Base32::rand()); - ctx.expect_wire_balance(amount, true); - - // Check btc-wire suspend operation - let bounce_amount = Amount::from_sat(34000); - ctx.malformed_credit(&bounce_amount); - ctx.next_conf(); - ctx.expect_wire_balance(amount + bounce_amount, true); - ctx.expect_gateway_down(); +pub async fn hell(tctx: TestCtx) { + macro_rules! step { + ($ctx:ident, $action:expr) => { + // Loose second bitcoin node + $ctx.cluster_deco().await; + + // Perform action + $action; + + // Perform fork and check btc-wire hard error + $ctx.expect_gateway_up().await; + $ctx.cluster_fork().await; + $ctx.expect_gateway_down().await; + + // Generate conflict + $ctx.restart_node(&["-minrelaytxfee=0.001"]).await; + $ctx.abandon_client().await; + let amount = Amount::from_sat(54000); + $ctx.credit(amount, &Base32::rand()).await; + $ctx.expect_wire_balance(amount, true).await; + + // Check btc-wire suspend operation + let bounce_amount = Amount::from_sat(34000); + $ctx.malformed_credit(&bounce_amount).await; + $ctx.next_conf().await; + $ctx.expect_wire_balance(amount + bounce_amount, true).await; + $ctx.expect_gateway_down().await; + }; } - step(&ctx, "Handle reorg conflicting incoming credit", |ctx| { + tctx.step("Setup"); + let mut ctx = BtcCtx::setup(&tctx, "taler_btc.conf", false).await; + ctx.step("Handle reorg conflicting incoming credit"); + step!(ctx, { let amount = Amount::from_sat(420000); - ctx.credit(amount, &Base32::rand()); - ctx.next_conf(); - ctx.expect_wire_balance(amount, true); + ctx.credit(amount, &Base32::rand()).await; + ctx.next_conf().await; + ctx.expect_wire_balance(amount, true).await; }); - step(&ctx, "Handle reorg conflicting incoming bounce", |ctx| { + drop(ctx); + tctx.step("Setup"); + let mut ctx = BtcCtx::setup(&tctx, "taler_btc.conf", false).await; + ctx.step("Handle reorg conflicting incoming credit"); + step!(ctx, { let amount = Amount::from_sat(420000); - ctx.malformed_credit(&amount); - ctx.next_conf(); + ctx.malformed_credit(&amount).await; + ctx.next_conf().await; let fee = ctx.worker_cfg.bounce_fee; - ctx.expect_wire_balance(fee, true); + ctx.expect_wire_balance(fee, true).await; }); } /// Test btc-wire ability to learn and protect itself from blockchain behavior -pub fn analysis(ctx: TestCtx) { +pub async fn analysis(ctx: TestCtx) { ctx.step("Setup"); - let mut ctx = BtcCtx::setup(&ctx, "taler_btc.conf", false); + let mut ctx = BtcCtx::setup(&ctx, "taler_btc.conf", false).await; ctx.step("Learn from reorg"); // Loose second bitcoin node - ctx.cluster_deco(); + ctx.cluster_deco().await; // Perform credit - let before = ctx.wire_balance(); - ctx.credit(Amount::from_sat(42000), &Base32::rand()); - ctx.next_conf(); - let after = ctx.wire_balance(); + let before = ctx.wire_balance().await; + ctx.credit(Amount::from_sat(42000), &Base32::rand()).await; + ctx.next_conf().await; + let after = ctx.wire_balance().await; // Perform fork and check btc-wire hard error - ctx.expect_gateway_up(); - ctx.cluster_fork(); - ctx.expect_wire_balance(before, false); - ctx.expect_gateway_down(); + ctx.expect_gateway_up().await; + ctx.cluster_fork().await; + ctx.expect_wire_balance(before, false).await; + ctx.expect_gateway_down().await; // Recover orphaned transaction - ctx.next_conf(); - ctx.next_block(); // Conf have changed - ctx.expect_wire_balance(after, false); - ctx.expect_gateway_up(); + ctx.next_conf().await; + ctx.next_block().await; // Conf have changed + ctx.expect_wire_balance(after, false).await; + ctx.expect_gateway_up().await; // Loose second bitcoin node - ctx.cluster_deco(); + ctx.cluster_deco().await; // Perform credit - let before = ctx.wire_balance(); - ctx.credit(Amount::from_sat(42000), &Base32::rand()); - ctx.next_conf(); + let before = ctx.wire_balance().await; + ctx.credit(Amount::from_sat(42000), &Base32::rand()).await; + ctx.next_conf().await; // Perform fork and check btc-wire learned from previous attack - ctx.expect_gateway_up(); - ctx.cluster_fork(); - ctx.expect_wire_balance(before, false); - ctx.expect_gateway_up(); + ctx.expect_gateway_up().await; + ctx.cluster_fork().await; + ctx.expect_wire_balance(before, false).await; + ctx.expect_gateway_up().await; } /// Test btc-wire ability to handle stuck transaction correctly -pub fn bumpfee(tctx: TestCtx) { +pub async fn bumpfee(tctx: TestCtx) { tctx.step("Setup"); - let mut ctx = BtcCtx::setup(&tctx, "taler_btc_bump.conf", false); + let mut ctx = BtcCtx::setup(&tctx, "taler_btc_bump.conf", false).await; // Perform credits to allow wire to perform debits latter for n in 10..13 { - ctx.credit(Amount::from_sat(n * 100000), &Base32::rand()); - ctx.next_block(); + ctx.credit(Amount::from_sat(n * 100000), &Base32::rand()) + .await; + ctx.next_block().await; } - ctx.next_conf(); + ctx.next_conf().await; ctx.step("Bump fee"); { // Perform debit - let mut client = ctx.client_balance(); - let wire = ctx.wire_balance(); + let mut client = ctx.client_balance().await; + let wire = ctx.wire_balance().await; let amount = Amount::from_sat(40000); - ctx.debit(amount, &Base32::rand()); - retry(|| ctx.wire_balance() < wire, "balance"); + ctx.debit(amount, &Base32::rand()).await; + retry! { ctx.wire_balance().await < wire }; // Bump min relay fee making the previous debit stuck - ctx.restart_node(&["-minrelaytxfee=0.0001"]); + ctx.restart_node(&["-minrelaytxfee=0.0001"]).await; // Check bump happen client += amount; - ctx.expect_client_balance(client, true); + ctx.expect_client_balance(client, true).await; } ctx.step("Bump fee reorg"); { // Loose second bitcoin node - ctx.cluster_deco(); + ctx.cluster_deco().await; // Perform debit - let mut client = ctx.client_balance(); - let wire = ctx.wire_balance(); + let mut client = ctx.client_balance().await; + let wire = ctx.wire_balance().await; let amount = Amount::from_sat(40000); - ctx.debit(amount, &Base32::rand()); - retry(|| ctx.wire_balance() < wire, "balance"); + ctx.debit(amount, &Base32::rand()).await; + retry! { ctx.wire_balance().await < wire }; // Bump min relay fee and fork making the previous debit stuck and problematic - ctx.cluster_fork(); - ctx.restart_node(&["-minrelaytxfee=0.0001"]); + ctx.cluster_fork().await; + ctx.restart_node(&["-minrelaytxfee=0.0001"]).await; // Check bump happen client += amount; - ctx.expect_client_balance(client, true); + ctx.expect_client_balance(client, true).await; } ctx.step("Setup"); drop(ctx); - let mut ctx = BtcCtx::setup(&tctx, "taler_btc_bump.conf", true); + let mut ctx = BtcCtx::setup(&tctx, "taler_btc_bump.conf", true).await; // Perform credits to allow wire to perform debits latter for n in 10..61 { - ctx.credit(Amount::from_sat(n * 100000), &Base32::rand()); - ctx.next_block(); + ctx.credit(Amount::from_sat(n * 100000), &Base32::rand()) + .await; + ctx.next_block().await; } - ctx.next_conf(); + ctx.next_conf().await; ctx.step("Bump fee stress"); { // Loose second bitcoin node - ctx.cluster_deco(); + ctx.cluster_deco().await; // Perform debits - let client = ctx.client_balance(); - let wire = ctx.wire_balance(); + let client = ctx.client_balance().await; + let wire = ctx.wire_balance().await; let mut total_amount = Amount::ZERO; for n in 10..31 { let amount = Amount::from_sat(n * 10000); total_amount += amount; - ctx.debit(amount, &Base32::rand()); + ctx.debit(amount, &Base32::rand()).await; } - retry(|| ctx.wire_balance() < wire - total_amount, "balance"); + retry! { ctx.wire_balance().await < wire - total_amount }; // Bump min relay fee making the previous debits stuck - ctx.restart_node(&["-minrelaytxfee=0.0001"]); + ctx.restart_node(&["-minrelaytxfee=0.0001"]).await; // Check bump happen - ctx.expect_client_balance(client + total_amount, true); + ctx.expect_client_balance(client + total_amount, true).await; } } /// Test btc-wire handle transaction fees exceeding limits -pub fn maxfee(ctx: TestCtx) { +pub async fn maxfee(ctx: TestCtx) { ctx.step("Setup"); - let mut ctx = BtcCtx::setup(&ctx, "taler_btc.conf", false); + let mut ctx = BtcCtx::setup(&ctx, "taler_btc.conf", false).await; // Perform credits to allow wire to perform debits latter for n in 10..31 { - ctx.credit(Amount::from_sat(n * 100000), &Base32::rand()); - ctx.next_block(); + ctx.credit(Amount::from_sat(n * 100000), &Base32::rand()) + .await; + ctx.next_block().await; } - ctx.next_conf(); + ctx.next_conf().await; - let client = ctx.client_balance(); - let wire = ctx.wire_balance(); + let client = ctx.client_balance().await; + let wire = ctx.wire_balance().await; let mut total_amount = Amount::ZERO; ctx.step("Too high fee"); { // Change fee config - ctx.restart_node(&["-maxtxfee=0.0000001", "-minrelaytxfee=0.0000001"]); + ctx.restart_node(&["-maxtxfee=0.0000001", "-minrelaytxfee=0.0000001"]) + .await; // Perform debits for n in 10..31 { let amount = Amount::from_sat(n * 10000); total_amount += amount; - ctx.debit(amount, &Base32::rand()); + ctx.debit(amount, &Base32::rand()).await; } - ctx.mine(2); + ctx.mine(2).await; // Check no transaction happen - ctx.expect_wire_balance(wire, false); - ctx.expect_client_balance(client, false); + ctx.expect_wire_balance(wire, false).await; + ctx.expect_client_balance(client, false).await; } ctx.step("Good feed"); { // Restore default config - ctx.restart_node(&[]); + ctx.restart_node(&[]).await; // Check transaction now have been made - ctx.expect_client_balance(client + total_amount, true); + ctx.expect_client_balance(client + total_amount, true).await; } } /// Test btc-wire ability to configure itself from bitcoin configuration -pub fn config(ctx: TestCtx) { +pub async fn config(ctx: TestCtx) { // Connect with cookie files ctx.step("Cookie"); BtcCtx::config( @@ -1009,7 +1025,8 @@ pub fn config(ctx: TestCtx) { dir.join("catch_me_if_you_can").to_string_lossy(), ); }, - ); + ) + .await; // Connect with password ctx.step("Password"); @@ -1026,7 +1043,8 @@ pub fn config(ctx: TestCtx) { .set("RPC_USERNAME", "bob") .set("RPC_PASSWORD", "password"); }, - ); + ) + .await; // Connect with token ctx.step("Token"); @@ -1042,5 +1060,5 @@ pub fn config(ctx: TestCtx) { .set("RPC_USERNAME", "bob") .set("RPC_PASSWORD", "password"); }, - ); + ).await; } diff --git a/instrumentation/src/main.rs b/instrumentation/src/main.rs @@ -15,7 +15,8 @@ */ use std::{ - panic::catch_unwind, + panic::UnwindSafe, + string::String, sync::{Arc, Mutex}, time::{Duration, Instant}, }; @@ -23,7 +24,7 @@ use std::{ use clap::Parser; use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; use owo_colors::OwoColorize; -use thread_local_panic_hook::set_hook; +use tokio::task::{JoinError, JoinHandle}; use utils::TestDb; use crate::utils::{TestCtx, try_cmd_redirect}; @@ -39,11 +40,76 @@ struct Args { filters: Vec<String>, } -pub fn main() { +struct Tmp<'a> { + filters: &'a [String], + m: MultiProgress, + start_style: ProgressStyle, + ok_style: ProgressStyle, + err_style: ProgressStyle, + tasks: Vec<( + &'static str, + JoinHandle<(Result<(), JoinError>, Duration, String)>, + )>, + db: Arc<TestDb>, + start: Instant, +} + +impl<'a> Tmp<'a> { + async fn check<T, F>(&mut self, name: &'static str, task: T) + where + T: FnOnce(TestCtx) -> F + Send + UnwindSafe + 'static, + F: Future<Output = ()> + Send + 'static, + { + if self.filters.is_empty() || self.filters.iter().any(|f| name.starts_with(f)) { + let pb = self.m.add(ProgressBar::new_spinner()); + pb.set_style(self.start_style.clone()); + pb.set_prefix(name); + pb.set_message("Init"); + pb.enable_steady_tick(Duration::from_millis(1000)); + let ok_style = self.ok_style.clone(); + let err_style = self.err_style.clone(); + let start = self.start; + let db = self.db.clone(); + self.tasks.push(( + name, + tokio::spawn(async move { + let ctx: TestCtx = TestCtx::new(name, pb.clone(), db); + let result = tokio::spawn(task(ctx)).await; + if result.is_ok() { + pb.set_style(ok_style.clone()); + pb.finish_with_message("OK"); + } else { + pb.set_style(err_style.clone()); + pb.finish(); + } + (result, start.elapsed(), pb.message()) + }), + )); + } + } +} + +#[tokio::main] +pub async fn main() { let Args { filters } = Args::parse(); std::fs::remove_dir_all("log").ok(); std::fs::create_dir_all("log/bin").unwrap(); + // Set panic hook + let failures = Arc::new(Mutex::new(Vec::new())); + { + let failures = failures.clone(); + std::panic::set_hook(Box::new(move |e| { + let backtrace = std::backtrace::Backtrace::force_capture(); + let info = format!("{e}\n{backtrace}"); + if let Some(id) = tokio::task::try_id() { + failures.lock().unwrap().push((id, info)); + } else { + eprintln!("Failed outside of a task:\n{info}") + } + })); + } + // Build binaries let p = ProgressBar::new_spinner(); p.set_style(ProgressStyle::with_template("building {msg} {elapsed:.dim}").unwrap()); @@ -57,65 +123,52 @@ pub fn main() { // Run tests let m = MultiProgress::new(); let start_style = - &ProgressStyle::with_template("{prefix:.magenta} {msg} {elapsed:.dim}").unwrap(); + ProgressStyle::with_template("{prefix:.magenta} {msg} {elapsed:.dim}").unwrap(); let ok_style = - &ProgressStyle::with_template("{prefix:.magenta} {msg:.green} {elapsed:.dim}").unwrap(); + ProgressStyle::with_template("{prefix:.magenta} {msg:.green} {elapsed:.dim}").unwrap(); let err_style = - &ProgressStyle::with_template("{prefix:.magenta} {msg:.red} {elapsed:.dim}").unwrap(); + ProgressStyle::with_template("{prefix:.magenta} {msg:.red} {elapsed:.dim}").unwrap(); let start = Instant::now(); let db = Arc::new(TestDb::new()); - let results: Vec<_> = std::thread::scope(|s| { - let tests: Vec<_> = TESTS - .iter() - .filter(|(_, name)| filters.is_empty() || filters.iter().any(|f| name.starts_with(f))) - .map(|(action, name)| { - let pb = m.add(ProgressBar::new_spinner()); - pb.set_style(start_style.clone()); - pb.set_prefix(*name); - pb.set_message("Init"); - pb.enable_steady_tick(Duration::from_millis(1000)); - let db = db.clone(); - let join = s.spawn(move || { - let start = Instant::now(); - let ctx: TestCtx = TestCtx::new(name, pb.clone(), db); - let out = Arc::new(Mutex::new(String::new())); - let tmp = out.clone(); - set_hook(Box::new(move |_| { - let backtrace = std::backtrace::Backtrace::force_capture().to_string(); - tmp.lock().unwrap().push_str(&backtrace); - })); - let tmp = ctx.clone(); - let result = catch_unwind(|| { - action(tmp); - }); - if result.is_ok() { - pb.set_style(ok_style.clone()); - pb.finish_with_message("OK"); - } else { - pb.set_style(err_style.clone()); - pb.finish(); - } - let out: String = out.lock().unwrap().clone(); - (result, start.elapsed(), out, pb.message()) - }); - (join, name) - }) - .collect(); - tests - .into_iter() - .map(|(j, n)| (j.join().unwrap(), n)) - .collect() - }); + let mut tmp = Tmp { + filters: filters.as_slice(), + m, + start_style, + ok_style, + err_style, + tasks: Vec::new(), + db, + start, + }; + tmp.check("btc_wire", btc::wire).await; + tmp.check("btc_lifetime", btc::lifetime).await; + tmp.check("btc_reconnect", btc::reconnect).await; + tmp.check("btc_stress", btc::stress).await; + tmp.check("btc_conflict", btc::conflict).await; + tmp.check("btc_reorg", btc::reorg).await; + tmp.check("btc_hell", btc::hell).await; + tmp.check("btc_analysis", btc::analysis).await; + tmp.check("btc_bumpfee", btc::bumpfee).await; + tmp.check("btc_maxfee", btc::maxfee).await; + tmp.check("btc_config", btc::config).await; + let mut results = Vec::new(); + for (name, task) in tmp.tasks { + results.push((name, task.await.unwrap())); + } let len = results.len(); - m.clear().unwrap(); - for ((result, _, out, msg), name) in &results { - if result.is_err() { - println!("{} {}\n{}", name.magenta(), msg.red(), out.bright_black()); + + tmp.m.clear().unwrap(); + let failures = failures.lock().unwrap(); + for (name, (result, _, msg)) in &results { + if let Err(e) = result { + if let Some((_, err)) = failures.iter().find(|(id, _)| *id == e.id()) { + println!("{} {}\n{}", name.magenta(), msg.red(), err.bright_black()); + } } } - for ((result, time, _, msg), name) in results { + for (name, (result, time, msg)) in results { match result { Ok(_) => { println!( @@ -156,17 +209,3 @@ pub fn build_bin(p: &ProgressBar, name: &str, features: Option<&str>, bin_name: ) .unwrap(); } - -pub const TESTS: &[(fn(TestCtx), &str)] = &[ - (btc::wire, "btc_wire"), - (btc::lifetime, "btc_lifetime"), - (btc::reconnect, "btc_reconnect"), - (btc::stress, "btc_stress"), - (btc::conflict, "btc_conflict"), - (btc::reorg, "btc_reorg"), - (btc::hell, "btc_hell"), - (btc::analysis, "btc_analysis"), - (btc::bumpfee, "btc_bumpfee"), - (btc::maxfee, "btc_maxfee"), - (btc::config, "btc_config") -]; diff --git a/instrumentation/src/utils.rs b/instrumentation/src/utils.rs @@ -15,16 +15,15 @@ */ use std::{ - fmt::{Debug, Display}, + fmt::Display, io::Write as _, - net::{Ipv4Addr, SocketAddrV4, TcpListener, TcpStream}, + net::{Ipv4Addr, SocketAddrV4, TcpListener}, ops::{Deref, DerefMut}, path::{Path, PathBuf}, process::{Child, Command, Stdio}, str::FromStr, sync::Arc, - thread::sleep, - time::{Duration, Instant}, + time::Duration, }; use common::{ @@ -40,7 +39,7 @@ use ini::Ini; use tempfile::TempDir; #[must_use] -pub fn check_incoming(base_url: &str, txs: &[(EddsaPublicKey, Amount)]) -> bool { +pub async fn check_incoming(base_url: &str, txs: &[(EddsaPublicKey, Amount)]) -> bool { let mut res = ureq::get(&format!("{base_url}history/incoming")) .query("delta", format!("-{}", 100)) .call() @@ -70,7 +69,7 @@ pub fn check_incoming(base_url: &str, txs: &[(EddsaPublicKey, Amount)]) -> bool } #[must_use] -pub fn check_gateway_down(base_url: &str) -> bool { +pub async fn check_gateway_down(base_url: &str) -> bool { matches!( ureq::get(&format!("{base_url}history/incoming")) .query("delta", "-5") @@ -80,11 +79,11 @@ pub fn check_gateway_down(base_url: &str) -> bool { } #[must_use] -pub fn check_gateway_up(base_url: &str) -> bool { +pub async fn check_gateway_up(base_url: &str) -> bool { ureq::get(&format!("{base_url}config")).call().is_ok() } -pub fn transfer(base_url: &str, wtid: &[u8; 32], credit_account: PaytoURI, amount: &Amount) { +pub async fn transfer(base_url: &str, wtid: &[u8; 32], credit_account: PaytoURI, amount: &Amount) { ureq::post(&format!("{base_url}transfer")) .send_json(TransferRequest { request_uid: Base32::rand(), @@ -97,7 +96,7 @@ pub fn transfer(base_url: &str, wtid: &[u8; 32], credit_account: PaytoURI, amoun } #[must_use] -pub fn check_outgoing(base_url: &str, txs: &[(ShortHashCode, Amount)]) -> bool { +pub async fn check_outgoing(base_url: &str, txs: &[(ShortHashCode, Amount)]) -> bool { let mut res = ureq::get(format!("{base_url}history/outgoing")) .query("delta", format!("-{}", txs.len())) .call() @@ -128,20 +127,6 @@ impl Drop for ChildGuard { } #[track_caller] -pub fn cmd_out(cmd: &str, args: &[&str]) -> String { - let output = Command::new(cmd) - .args(args) - .stdin(Stdio::null()) - .output() - .unwrap(); - if output.stdout.is_empty() { - String::from_utf8(output.stderr).unwrap() - } else { - String::from_utf8(output.stdout).unwrap() - } -} - -#[track_caller] pub fn try_cmd_redirect( cmd: &str, args: &[&str], @@ -179,22 +164,31 @@ pub fn cmd_redirect_ok(cmd: &str, args: &[&str], path: impl AsRef<Path>, name: & cmd_ok(cmd_redirect(cmd, args, path), name) } -#[track_caller] -pub fn retry_opt<T, E: Debug>(mut lambda: impl FnMut() -> Result<T, E>) -> T { - let start = Instant::now(); - loop { - let result = lambda(); - if result.is_err() && start.elapsed() < Duration::from_secs(20) { - sleep(Duration::from_millis(500)); - } else { - return result.unwrap(); +#[macro_export] +macro_rules! retry_opt { + ($expr:expr) => { + async { + let start = std::time::Instant::now(); + loop { + let result = $expr; + if result.is_err() && start.elapsed() < std::time::Duration::from_secs(20) { + tokio::time::sleep(std::time::Duration::from_millis(500)).await; + } else { + return result.unwrap(); + } + } } - } + .await + }; } -#[track_caller] -pub fn retry(mut lambda: impl FnMut() -> bool, msg: &str) { - retry_opt(|| lambda().then_some(()).ok_or(msg)) +#[macro_export] +macro_rules! retry { + ($expr:expr) => { + $crate::retry_opt! { + $expr.then_some(()).ok_or("failure") + } + }; } #[derive(Clone)] @@ -240,7 +234,7 @@ impl TestCtx { } pub struct TalerCtx { - pub dir: TempDir, + _dir: TempDir, pub wire_dir: PathBuf, pub wire2_dir: PathBuf, pub conf: PathBuf, @@ -287,7 +281,7 @@ impl TalerCtx { cfg.write_to_file(&conf).unwrap(); Self { - dir, + _dir: dir, ctx: ctx.clone(), gateway_url, wire_dir, @@ -337,7 +331,7 @@ impl TalerCtx { ); } - pub fn run(&mut self) { + pub async fn run(&mut self) { // Run gateway self.gateway = Some(cmd_redirect( &self.wire_bin_path, @@ -360,7 +354,9 @@ impl TalerCtx { }); // Wait for gateway to be up - retry_opt(|| TcpStream::connect(SocketAddrV4::new(Ipv4Addr::LOCALHOST, self.gateway_port))); + retry_opt! { + tokio::net::TcpStream::connect(SocketAddrV4::new(Ipv4Addr::LOCALHOST, self.gateway_port)).await + }; } /* ----- Process ----- */ @@ -383,23 +379,20 @@ impl TalerCtx { /* ----- Wire Gateway -----*/ - pub fn expect_credits(&self, txs: &[(EddsaPublicKey, Amount)]) { - retry(|| check_incoming(&self.gateway_url, txs), "check_incoming") + pub async fn expect_credits(&self, txs: &[(EddsaPublicKey, Amount)]) { + retry! { check_incoming(&self.gateway_url, txs).await } } - pub fn expect_debits(&self, txs: &[(ShortHashCode, Amount)]) { - retry(|| check_outgoing(&self.gateway_url, txs), "check_outgoing") + pub async fn expect_debits(&self, txs: &[(ShortHashCode, Amount)]) { + retry! { check_outgoing(&self.gateway_url, txs).await } } - pub fn expect_gateway_up(&self) { - retry(|| check_gateway_up(&self.gateway_url), "check_gateway_up"); + pub async fn expect_gateway_up(&self) { + retry! { check_gateway_up(&self.gateway_url).await } } - pub fn expect_gateway_down(&self) { - retry( - || check_gateway_down(&self.gateway_url), - "check_gateway_down", - ); + pub async fn expect_gateway_down(&self) { + retry! { check_gateway_down(&self.gateway_url).await } } } @@ -464,7 +457,12 @@ impl TestDb { ); let tmp = Self { dir, _db: db }; // Wait for postgres to start - retry(|| tmp.execute_sql("SELECT true"), "test db"); + for _ in 0..10 { + if tmp.execute_sql("SELECT true") { + break; + } + std::thread::sleep(Duration::from_millis(500)) + } tmp } diff --git a/uri-pack/Cargo.toml b/uri-pack/Cargo.toml @@ -15,8 +15,6 @@ csv = "1.3.0" [dev-dependencies] # Json parser serde_json.workspace = true -# Url parser -url.workspace = true # statistics-driven micro-benchmarks criterion.workspace = true # Fast insecure random