diff options
author | Antoine A <> | 2022-01-05 12:25:49 +0100 |
---|---|---|
committer | Antoine A <> | 2022-01-05 12:25:49 +0100 |
commit | 1a6786fdcd94d8bcdd8149176b4a6bf6e19f9ff5 (patch) | |
tree | f7d18ce1733b7b09dbb8c5d277f247a555244a29 | |
parent | 5f97609307dd96c3b0dca2a5f61f914fb9057f96 (diff) | |
download | depolymerization-1a6786fdcd94d8bcdd8149176b4a6bf6e19f9ff5.tar.gz depolymerization-1a6786fdcd94d8bcdd8149176b4a6bf6e19f9ff5.tar.bz2 depolymerization-1a6786fdcd94d8bcdd8149176b4a6bf6e19f9ff5.zip |
Add configurable lifetime
-rw-r--r-- | btc-wire/src/main.rs | 35 | ||||
-rw-r--r-- | makefile | 1 | ||||
-rw-r--r-- | script/conf/taler_lifetime.conf | 12 | ||||
-rw-r--r-- | script/setup.sh | 19 | ||||
-rw-r--r-- | script/test_btc_fork.sh | 33 | ||||
-rw-r--r-- | script/test_btc_lifetime.sh | 55 | ||||
-rw-r--r-- | script/test_btc_reconnect.sh | 3 | ||||
-rw-r--r-- | taler-config/src/lib.rs | 10 | ||||
-rw-r--r-- | wire-gateway/src/main.rs | 58 |
9 files changed, 194 insertions, 32 deletions
diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs index ce1616c..c4d941f 100644 --- a/btc-wire/src/main.rs +++ b/btc-wire/src/main.rs @@ -1,4 +1,4 @@ -use bitcoin::{hashes::Hash, Address, Amount as BtcAmount, BlockHash, SignedAmount, Txid};
+use bitcoin::{hashes::Hash, Address, Amount as BtcAmount, BlockHash, Network, SignedAmount, Txid};
use btc_wire::{
config::BitcoinConfig,
rpc::{self, BtcRpc, Category, ErrorCode},
@@ -166,11 +166,25 @@ fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, config: &Config) // TODO check if transactions are abandoned
+ let mut lifetime = config.btc_lifetime.clone();
+
// Alway start with a sync work
let mut skip_notification = true;
loop {
+ // Check lifetime
+ if let Some(nb) = lifetime.as_mut() {
+ if *nb == 0 {
+ info!("Reach end of lifetime");
+ return;
+ } else {
+ *nb -= 1;
+ }
+ }
+
+ // Connect
let rpc = rpc.client();
let db = db.client();
+
let result: Result<(), Box<dyn std::error::Error>> = (|| {
// Listen to all channels
db.batch_execute("LISTEN new_block; LISTEN new_tx")?;
@@ -445,9 +459,6 @@ fn block_listener(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql) { fn main() {
taler_log::init();
- #[cfg(feature = "fail")]
- taler_log::log::warn!("Running with random failures is unsuitable for production");
-
let config = taler_config::Config::load_from_file(
std::env::args_os().nth(1).expect("Missing conf path arg"),
);
@@ -458,6 +469,22 @@ fn main() { .unwrap_or_else(default_data_dir);
let config: &'static Config = Box::leak(Box::new(config));
let btc_config = BitcoinConfig::load(&data_dir).unwrap();
+
+ #[cfg(feature = "fail")]
+ if btc_config.network == Network::Regtest {
+ taler_log::log::warn!("Running with random failures");
+ } else {
+ taler_log::log::error!("Running with random failures is unsuitable for production");
+ std::process::exit(1);
+ }
+ let chain_name = match btc_config.network {
+ Network::Bitcoin => "main",
+ Network::Testnet => "test",
+ Network::Signet => "signet",
+ Network::Regtest => "regtest",
+ };
+ info!("Running on {} chain", chain_name);
+
let mut rpc = BtcRpc::common(&btc_config).unwrap();
rpc.load_wallet(&config.btc_wallet).ok();
let rpc_listener = AutoReconnectRPC::new(
@@ -7,6 +7,7 @@ test_gateway: test_btc:
script/test_btc_wire.sh
+ script/test_btc_lifetime.sh
script/test_btc_reconnect.sh
script/test_btc_fail.sh
script/test_btc_fork.sh
diff --git a/script/conf/taler_lifetime.conf b/script/conf/taler_lifetime.conf new file mode 100644 index 0000000..4829d12 --- /dev/null +++ b/script/conf/taler_lifetime.conf @@ -0,0 +1,12 @@ +[exchange] +BASE_URL = http://test.com + +[depolymerizer-bitcoin] +DB_URL = postgres://localhost:5454/postgres?user=postgres&password=password +PORT = 8060 +PAYTO = payto://bitcoin/bcrt1qgkgxkjj27g3f7s87mcvjjsghay7gh34cx39prj +CONFIRMATION = 3 +BTC_WALLET = wire +BOUNCE_FEE = 1000 +HTTP_LIFETIME = 10 +BTC_LIFETIME = 10
\ No newline at end of file diff --git a/script/setup.sh b/script/setup.sh index b18a3d8..7601013 100644 --- a/script/setup.sh +++ b/script/setup.sh @@ -34,6 +34,24 @@ function load_config() { BANK_ENDPOINT=http://127.0.0.1:$PORT/ } +# Check process is running +function check_up() { + if [ `ps -p $1 | grep -c $1` == 0 ]; then + echo "${2:-process} with pid $1 should be up" + ps -p $1 + exit 1 + fi +} + +# Check process is not running +function check_down() { + if [ `ps -p $1 | grep -c $1` != 0 ]; then + echo "${2:-process} with pid $1 should be down" + ps -p $1 + exit 1 + fi +} + # ----- Database ----- # # Create new postgresql cluster and init database schema @@ -167,6 +185,7 @@ function stressed_btc_wire() { function gateway() { cargo build --bin wire-gateway --release --features test &> /dev/null target/release/wire-gateway $CONF &> log/gateway.log & + GATEWAY_PID="$!" for n in `seq 1 50`; do echo -n "." sleep 0.2 diff --git a/script/test_btc_fork.sh b/script/test_btc_fork.sh index 1bfc8e8..c715421 100644 --- a/script/test_btc_fork.sh +++ b/script/test_btc_fork.sh @@ -24,21 +24,12 @@ gateway echo "" # Check btc-wire is running -function check_up() { - if [ `ps -p $WIRE_PID | grep -c $WIRE_PID` == 0 ]; then - echo "btc_wire with pid $WIRE_PID should be up" - ps -p $WIRE_PID - exit 1 - fi +function up() { + check_up $WIRE_PID btc_wire } - # Check btc-wire is not running -function check_down() { - if [ `ps -p $WIRE_PID | grep -c $WIRE_PID` != 0 ]; then - echo "btc_wire with pid $WIRE_PID should be down" - ps -p $WIRE_PID - exit 1 - fi +function down() { + check_down $WIRE_PID btc_wire } SEQ="seq 10 20" @@ -59,16 +50,16 @@ check_balance 9.99826299 0.00165000 echo " OK" echo -n "Perform fork and check btc-wire hard error:" -check_up +up btc2_fork check_balance 9.99826299 0.00000000 -check_down +down echo " OK" echo -n "Check btc-wire hard error on restart:" btc_wire sleep 1 -check_down +down echo " OK" echo -n "Recover orphaned transactions:" @@ -79,7 +70,7 @@ echo " OK" echo -n "Check btc-wire heal on restart:" btc_wire sleep 1 -check_up +up echo " OK" echo "----- Handle reorg outgoing transactions -----" @@ -101,10 +92,10 @@ check_balance 9.99842799 0.00146311 echo " OK" echo -n "Perform fork and check btc-wire still up:" -check_up +up btc2_fork check_balance 9.99826299 0.00146311 -check_up +up echo " OK" echo -n "Recover orphaned transactions:" @@ -133,10 +124,10 @@ check_balance "*" 0.00011000 echo " OK" echo -n "Perform fork and check btc-wire still up:" -check_up +up btc2_fork check_balance "*" 0.00000000 -check_up +up echo " OK" echo -n "Recover orphaned transactions:" diff --git a/script/test_btc_lifetime.sh b/script/test_btc_lifetime.sh new file mode 100644 index 0000000..7929249 --- /dev/null +++ b/script/test_btc_lifetime.sh @@ -0,0 +1,55 @@ +#!/bin/bash + +## Check btc-wire and wire-gateway correctly stop when a lifetime limit is configured + +CONFIG=taler_lifetime.conf + +set -eu + +source "${BASH_SOURCE%/*}/setup.sh" + +echo "----- Setup -----" +echo "Load config file" +load_config +echo "Start database" +setup_db +echo "Start bitcoin node" +init_btc +echo "Setup bitcoin" +setup_btc +echo "Start btc-wire" +btc_wire +echo "Start gateway" +gateway +echo "" + +SEQ="seq 10 20" + +echo "---- Check lifetime -----" + +echo -n "Check up:" +check_up $WIRE_PID btc-wire +check_up $GATEWAY_PID wire-gateway +echo " OK" + +echo -n "Do some work:" +for n in `$SEQ`; do + btc-wire-cli -d $BTC_DIR transfer 0.000$n + mine_btc # Mine transactions +done +next_btc # Trigger btc_wire +check_balance 9.99826299 0.00165000 +for n in `$SEQ`; do + taler-exchange-wire-gateway-client \ + -b $BANK_ENDPOINT \ + -C payto://bitcoin/$CLIENT \ + -a BTC:0.0000$n &> /dev/null || break; +done +echo " OK" + +echo -n "Check down:" +check_down $WIRE_PID btc-wire +check_down $GATEWAY_PID wire-gateway +echo " OK" + +echo "All tests passed" diff --git a/script/test_btc_reconnect.sh b/script/test_btc_reconnect.sh index 32088b8..78c4225 100644 --- a/script/test_btc_reconnect.sh +++ b/script/test_btc_reconnect.sh @@ -35,10 +35,11 @@ echo "Stop database" pg_ctl stop -D $DB_DIR > /dev/null echo "Making incomplete wire transfer to exchange" $BTC_CLI -rpcwallet=client sendtoaddress $WIRE 0.00042 &> /dev/null -echo "Making wire transfer to exchange:" +echo -n "Making wire transfer to exchange:" btc-wire-cli -d $BTC_DIR transfer 0.00004 next_btc check_balance 9.99948077 0.00050200 +echo " OK" echo "Stop bitcoin node" stop_btc echo -n "Requesting exchange incoming transaction list:" diff --git a/taler-config/src/lib.rs b/taler-config/src/lib.rs index a1fee9d..7771708 100644 --- a/taler-config/src/lib.rs +++ b/taler-config/src/lib.rs @@ -12,11 +12,13 @@ pub struct Config { pub db_url: String, pub port: u16, pub unix_path: Option<PathBuf>, - pub btc_data_dir: Option<PathBuf>, + pub btc_data_dir: Option<PathBuf>, pub payto: Url, pub confirmation: u8, pub btc_wallet: String, pub bounce_fee: u64, + pub btc_lifetime: Option<u64>, + pub http_lifetime: Option<u64>, } impl Config { @@ -35,6 +37,12 @@ impl Config { confirmation: nb(self_conf, "CONFIRMATION").unwrap_or(6), btc_wallet: string(self_conf, "BTC_WALLET").unwrap_or_else(|| "wire".to_string()), bounce_fee: nb(self_conf, "BOUNCE_FEE").unwrap_or(1000), + btc_lifetime: nb(self_conf, "BTC_LIFETIME") + .and_then(|nb| (nb != 0).then(|| Some(nb))) + .unwrap_or(None), + http_lifetime: nb(self_conf, "HTTP_LIFETIME") + .and_then(|nb| (nb != 0).then(|| Some(nb))) + .unwrap_or(None), } } } diff --git a/wire-gateway/src/main.rs b/wire-gateway/src/main.rs index a3245b6..acca739 100644 --- a/wire-gateway/src/main.rs +++ b/wire-gateway/src/main.rs @@ -7,7 +7,12 @@ use hyper::{ };
use json::{encode_body, parse_body};
use listenfd::ListenFd;
-use std::{convert::Infallible, str::FromStr, time::Instant};
+use std::{
+ convert::Infallible,
+ str::FromStr,
+ sync::atomic::{AtomicU64, Ordering},
+ time::Instant,
+};
use taler_api::{
api_common::{Amount, SafeUint64, ShortHashCode, Timestamp},
api_wire::{
@@ -18,6 +23,7 @@ use taler_api::{ url::Url,
};
use taler_log::log::{error, info, log, Level};
+use tokio::sync::Notify;
use tokio_postgres::{config::Host, NoTls};
mod error;
@@ -26,6 +32,36 @@ mod json; struct ServerState {
pool: Pool,
config: taler_config::Config,
+ notify: Notify,
+ lifetime: Option<AtomicU64>,
+}
+
+impl ServerState {
+ /// Decrease lifetime, triggering graceful shutdown when reach lifetime end
+ pub fn step(&self) {
+ if let Some(lifetime) = &self.lifetime {
+ let mut current = lifetime.load(Ordering::Relaxed);
+ loop {
+ if current == 0 {
+ self.notify.notify_one();
+ }
+ match lifetime.compare_exchange_weak(
+ current,
+ current - 1,
+ Ordering::SeqCst,
+ Ordering::Relaxed,
+ ) {
+ Ok(_) => break,
+ Err(new) => current = new,
+ }
+ }
+ }
+ }
+
+ pub async fn shutdown_signal(&self) {
+ self.notify.notified().await;
+ info!("Reach end of lifetime");
+ }
}
#[tokio::main]
@@ -68,9 +104,12 @@ async fn main() { let state = ServerState {
pool,
config: conf.clone(),
+ notify: Notify::new(),
+ lifetime: conf.http_lifetime.map(|n| AtomicU64::new(n)),
};
let state: &'static ServerState = Box::leak(Box::new(state));
let service = service_fn(move |req| async move {
+ state.step();
let start = Instant::now();
let (parts, body) = req.into_parts();
let (response, msg) = match router(&parts, body, state).await {
@@ -100,12 +139,16 @@ async fn main() { let make_service_unix = make_service_fn(move |_| async move { Ok::<_, Infallible>(service) });
let mut listenfd = ListenFd::from_env();
+
if let Some(listener) = listenfd.take_tcp_listener(0).unwrap() {
info!(
"Server listening on activated socket {}",
listener.local_addr().unwrap()
);
- let server = Server::from_tcp(listener).unwrap().serve(make_service);
+ let server = Server::from_tcp(listener)
+ .unwrap()
+ .serve(make_service)
+ .with_graceful_shutdown(state.shutdown_signal());
if let Err(e) = server.await {
error!("server: {}", e);
}
@@ -117,18 +160,23 @@ async fn main() { panic!("{}", err);
}
}
- let server = Server::bind_unix(path).unwrap().serve(make_service_unix);
+ let server = Server::bind_unix(path)
+ .unwrap()
+ .serve(make_service_unix)
+ .with_graceful_shutdown(state.shutdown_signal());
if let Err(e) = server.await {
error!("server: {}", e);
}
} else {
let addr = ([0, 0, 0, 0], state.config.port).into();
info!("Server listening on http://{}", &addr);
- let server = Server::bind(&addr).serve(make_service);
+ let server = Server::bind(&addr)
+ .serve(make_service)
+ .with_graceful_shutdown(state.shutdown_signal());
if let Err(e) = server.await {
error!("server: {}", e);
}
- }
+ };
}
/// Check if an url is a valid bitcoin payto url
|