depolymerization

wire gateway for Bitcoin/Ethereum
Log | Files | Refs | Submodules | README | LICENSE

commit 1a6786fdcd94d8bcdd8149176b4a6bf6e19f9ff5
parent 5f97609307dd96c3b0dca2a5f61f914fb9057f96
Author: Antoine A <>
Date:   Wed,  5 Jan 2022 12:25:49 +0100

Add configurable lifetime

Diffstat:
Mbtc-wire/src/main.rs | 35+++++++++++++++++++++++++++++++----
Mmakefile | 1+
Ascript/conf/taler_lifetime.conf | 13+++++++++++++
Mscript/setup.sh | 19+++++++++++++++++++
Mscript/test_btc_fork.sh | 33++++++++++++---------------------
Ascript/test_btc_lifetime.sh | 55+++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mscript/test_btc_reconnect.sh | 3++-
Mtaler-config/src/lib.rs | 10+++++++++-
Mwire-gateway/src/main.rs | 58+++++++++++++++++++++++++++++++++++++++++++++++++++++-----
9 files changed, 195 insertions(+), 32 deletions(-)

diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs @@ -1,4 +1,4 @@ -use bitcoin::{hashes::Hash, Address, Amount as BtcAmount, BlockHash, SignedAmount, Txid}; +use bitcoin::{hashes::Hash, Address, Amount as BtcAmount, BlockHash, Network, SignedAmount, Txid}; use btc_wire::{ config::BitcoinConfig, rpc::{self, BtcRpc, Category, ErrorCode}, @@ -166,11 +166,25 @@ fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, config: &Config) // TODO check if transactions are abandoned + let mut lifetime = config.btc_lifetime.clone(); + // Alway start with a sync work let mut skip_notification = true; loop { + // Check lifetime + if let Some(nb) = lifetime.as_mut() { + if *nb == 0 { + info!("Reach end of lifetime"); + return; + } else { + *nb -= 1; + } + } + + // Connect let rpc = rpc.client(); let db = db.client(); + let result: Result<(), Box<dyn std::error::Error>> = (|| { // Listen to all channels db.batch_execute("LISTEN new_block; LISTEN new_tx")?; @@ -445,9 +459,6 @@ fn block_listener(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql) { fn main() { taler_log::init(); - #[cfg(feature = "fail")] - taler_log::log::warn!("Running with random failures is unsuitable for production"); - let config = taler_config::Config::load_from_file( std::env::args_os().nth(1).expect("Missing conf path arg"), ); @@ -458,6 +469,22 @@ fn main() { .unwrap_or_else(default_data_dir); let config: &'static Config = Box::leak(Box::new(config)); let btc_config = BitcoinConfig::load(&data_dir).unwrap(); + + #[cfg(feature = "fail")] + if btc_config.network == Network::Regtest { + taler_log::log::warn!("Running with random failures"); + } else { + taler_log::log::error!("Running with random failures is unsuitable for production"); + std::process::exit(1); + } + let chain_name = match btc_config.network { + Network::Bitcoin => "main", + Network::Testnet => "test", + Network::Signet => "signet", + Network::Regtest => "regtest", + }; + info!("Running on {} chain", chain_name); + let mut rpc = BtcRpc::common(&btc_config).unwrap(); rpc.load_wallet(&config.btc_wallet).ok(); let rpc_listener = AutoReconnectRPC::new( diff --git a/makefile b/makefile @@ -7,6 +7,7 @@ test_gateway: test_btc: script/test_btc_wire.sh + script/test_btc_lifetime.sh script/test_btc_reconnect.sh script/test_btc_fail.sh script/test_btc_fork.sh diff --git a/script/conf/taler_lifetime.conf b/script/conf/taler_lifetime.conf @@ -0,0 +1,12 @@ +[exchange] +BASE_URL = http://test.com + +[depolymerizer-bitcoin] +DB_URL = postgres://localhost:5454/postgres?user=postgres&password=password +PORT = 8060 +PAYTO = payto://bitcoin/bcrt1qgkgxkjj27g3f7s87mcvjjsghay7gh34cx39prj +CONFIRMATION = 3 +BTC_WALLET = wire +BOUNCE_FEE = 1000 +HTTP_LIFETIME = 10 +BTC_LIFETIME = 10 +\ No newline at end of file diff --git a/script/setup.sh b/script/setup.sh @@ -34,6 +34,24 @@ function load_config() { BANK_ENDPOINT=http://127.0.0.1:$PORT/ } +# Check process is running +function check_up() { + if [ `ps -p $1 | grep -c $1` == 0 ]; then + echo "${2:-process} with pid $1 should be up" + ps -p $1 + exit 1 + fi +} + +# Check process is not running +function check_down() { + if [ `ps -p $1 | grep -c $1` != 0 ]; then + echo "${2:-process} with pid $1 should be down" + ps -p $1 + exit 1 + fi +} + # ----- Database ----- # # Create new postgresql cluster and init database schema @@ -167,6 +185,7 @@ function stressed_btc_wire() { function gateway() { cargo build --bin wire-gateway --release --features test &> /dev/null target/release/wire-gateway $CONF &> log/gateway.log & + GATEWAY_PID="$!" for n in `seq 1 50`; do echo -n "." sleep 0.2 diff --git a/script/test_btc_fork.sh b/script/test_btc_fork.sh @@ -24,21 +24,12 @@ gateway echo "" # Check btc-wire is running -function check_up() { - if [ `ps -p $WIRE_PID | grep -c $WIRE_PID` == 0 ]; then - echo "btc_wire with pid $WIRE_PID should be up" - ps -p $WIRE_PID - exit 1 - fi +function up() { + check_up $WIRE_PID btc_wire } - # Check btc-wire is not running -function check_down() { - if [ `ps -p $WIRE_PID | grep -c $WIRE_PID` != 0 ]; then - echo "btc_wire with pid $WIRE_PID should be down" - ps -p $WIRE_PID - exit 1 - fi +function down() { + check_down $WIRE_PID btc_wire } SEQ="seq 10 20" @@ -59,16 +50,16 @@ check_balance 9.99826299 0.00165000 echo " OK" echo -n "Perform fork and check btc-wire hard error:" -check_up +up btc2_fork check_balance 9.99826299 0.00000000 -check_down +down echo " OK" echo -n "Check btc-wire hard error on restart:" btc_wire sleep 1 -check_down +down echo " OK" echo -n "Recover orphaned transactions:" @@ -79,7 +70,7 @@ echo " OK" echo -n "Check btc-wire heal on restart:" btc_wire sleep 1 -check_up +up echo " OK" echo "----- Handle reorg outgoing transactions -----" @@ -101,10 +92,10 @@ check_balance 9.99842799 0.00146311 echo " OK" echo -n "Perform fork and check btc-wire still up:" -check_up +up btc2_fork check_balance 9.99826299 0.00146311 -check_up +up echo " OK" echo -n "Recover orphaned transactions:" @@ -133,10 +124,10 @@ check_balance "*" 0.00011000 echo " OK" echo -n "Perform fork and check btc-wire still up:" -check_up +up btc2_fork check_balance "*" 0.00000000 -check_up +up echo " OK" echo -n "Recover orphaned transactions:" diff --git a/script/test_btc_lifetime.sh b/script/test_btc_lifetime.sh @@ -0,0 +1,55 @@ +#!/bin/bash + +## Check btc-wire and wire-gateway correctly stop when a lifetime limit is configured + +CONFIG=taler_lifetime.conf + +set -eu + +source "${BASH_SOURCE%/*}/setup.sh" + +echo "----- Setup -----" +echo "Load config file" +load_config +echo "Start database" +setup_db +echo "Start bitcoin node" +init_btc +echo "Setup bitcoin" +setup_btc +echo "Start btc-wire" +btc_wire +echo "Start gateway" +gateway +echo "" + +SEQ="seq 10 20" + +echo "---- Check lifetime -----" + +echo -n "Check up:" +check_up $WIRE_PID btc-wire +check_up $GATEWAY_PID wire-gateway +echo " OK" + +echo -n "Do some work:" +for n in `$SEQ`; do + btc-wire-cli -d $BTC_DIR transfer 0.000$n + mine_btc # Mine transactions +done +next_btc # Trigger btc_wire +check_balance 9.99826299 0.00165000 +for n in `$SEQ`; do + taler-exchange-wire-gateway-client \ + -b $BANK_ENDPOINT \ + -C payto://bitcoin/$CLIENT \ + -a BTC:0.0000$n &> /dev/null || break; +done +echo " OK" + +echo -n "Check down:" +check_down $WIRE_PID btc-wire +check_down $GATEWAY_PID wire-gateway +echo " OK" + +echo "All tests passed" diff --git a/script/test_btc_reconnect.sh b/script/test_btc_reconnect.sh @@ -35,10 +35,11 @@ echo "Stop database" pg_ctl stop -D $DB_DIR > /dev/null echo "Making incomplete wire transfer to exchange" $BTC_CLI -rpcwallet=client sendtoaddress $WIRE 0.00042 &> /dev/null -echo "Making wire transfer to exchange:" +echo -n "Making wire transfer to exchange:" btc-wire-cli -d $BTC_DIR transfer 0.00004 next_btc check_balance 9.99948077 0.00050200 +echo " OK" echo "Stop bitcoin node" stop_btc echo -n "Requesting exchange incoming transaction list:" diff --git a/taler-config/src/lib.rs b/taler-config/src/lib.rs @@ -12,11 +12,13 @@ pub struct Config { pub db_url: String, pub port: u16, pub unix_path: Option<PathBuf>, - pub btc_data_dir: Option<PathBuf>, + pub btc_data_dir: Option<PathBuf>, pub payto: Url, pub confirmation: u8, pub btc_wallet: String, pub bounce_fee: u64, + pub btc_lifetime: Option<u64>, + pub http_lifetime: Option<u64>, } impl Config { @@ -35,6 +37,12 @@ impl Config { confirmation: nb(self_conf, "CONFIRMATION").unwrap_or(6), btc_wallet: string(self_conf, "BTC_WALLET").unwrap_or_else(|| "wire".to_string()), bounce_fee: nb(self_conf, "BOUNCE_FEE").unwrap_or(1000), + btc_lifetime: nb(self_conf, "BTC_LIFETIME") + .and_then(|nb| (nb != 0).then(|| Some(nb))) + .unwrap_or(None), + http_lifetime: nb(self_conf, "HTTP_LIFETIME") + .and_then(|nb| (nb != 0).then(|| Some(nb))) + .unwrap_or(None), } } } diff --git a/wire-gateway/src/main.rs b/wire-gateway/src/main.rs @@ -7,7 +7,12 @@ use hyper::{ }; use json::{encode_body, parse_body}; use listenfd::ListenFd; -use std::{convert::Infallible, str::FromStr, time::Instant}; +use std::{ + convert::Infallible, + str::FromStr, + sync::atomic::{AtomicU64, Ordering}, + time::Instant, +}; use taler_api::{ api_common::{Amount, SafeUint64, ShortHashCode, Timestamp}, api_wire::{ @@ -18,6 +23,7 @@ use taler_api::{ url::Url, }; use taler_log::log::{error, info, log, Level}; +use tokio::sync::Notify; use tokio_postgres::{config::Host, NoTls}; mod error; @@ -26,6 +32,36 @@ mod json; struct ServerState { pool: Pool, config: taler_config::Config, + notify: Notify, + lifetime: Option<AtomicU64>, +} + +impl ServerState { + /// Decrease lifetime, triggering graceful shutdown when reach lifetime end + pub fn step(&self) { + if let Some(lifetime) = &self.lifetime { + let mut current = lifetime.load(Ordering::Relaxed); + loop { + if current == 0 { + self.notify.notify_one(); + } + match lifetime.compare_exchange_weak( + current, + current - 1, + Ordering::SeqCst, + Ordering::Relaxed, + ) { + Ok(_) => break, + Err(new) => current = new, + } + } + } + } + + pub async fn shutdown_signal(&self) { + self.notify.notified().await; + info!("Reach end of lifetime"); + } } #[tokio::main] @@ -68,9 +104,12 @@ async fn main() { let state = ServerState { pool, config: conf.clone(), + notify: Notify::new(), + lifetime: conf.http_lifetime.map(|n| AtomicU64::new(n)), }; let state: &'static ServerState = Box::leak(Box::new(state)); let service = service_fn(move |req| async move { + state.step(); let start = Instant::now(); let (parts, body) = req.into_parts(); let (response, msg) = match router(&parts, body, state).await { @@ -100,12 +139,16 @@ async fn main() { let make_service_unix = make_service_fn(move |_| async move { Ok::<_, Infallible>(service) }); let mut listenfd = ListenFd::from_env(); + if let Some(listener) = listenfd.take_tcp_listener(0).unwrap() { info!( "Server listening on activated socket {}", listener.local_addr().unwrap() ); - let server = Server::from_tcp(listener).unwrap().serve(make_service); + let server = Server::from_tcp(listener) + .unwrap() + .serve(make_service) + .with_graceful_shutdown(state.shutdown_signal()); if let Err(e) = server.await { error!("server: {}", e); } @@ -117,18 +160,23 @@ async fn main() { panic!("{}", err); } } - let server = Server::bind_unix(path).unwrap().serve(make_service_unix); + let server = Server::bind_unix(path) + .unwrap() + .serve(make_service_unix) + .with_graceful_shutdown(state.shutdown_signal()); if let Err(e) = server.await { error!("server: {}", e); } } else { let addr = ([0, 0, 0, 0], state.config.port).into(); info!("Server listening on http://{}", &addr); - let server = Server::bind(&addr).serve(make_service); + let server = Server::bind(&addr) + .serve(make_service) + .with_graceful_shutdown(state.shutdown_signal()); if let Err(e) = server.await { error!("server: {}", e); } - } + }; } /// Check if an url is a valid bitcoin payto url