depolymerization

wire gateway for Bitcoin/Ethereum
Log | Files | Refs | Submodules | README | LICENSE

commit 2304c7ac5b272d47c14e8d54f21c85afc0a0e01d
parent 83c8149b3562964c669dd614271b63edb4ce7019
Author: Antoine A <>
Date:   Thu, 15 Sep 2022 21:15:19 +0200

Fix analysis and merge analysis loop into worker loop

Diffstat:
Mbtc-wire/src/lib.rs | 6+++---
Mbtc-wire/src/loops/analysis.rs | 76+++++++++++++++++++++++-----------------------------------------------------
Mbtc-wire/src/loops/worker.rs | 18++++++++++--------
Mbtc-wire/src/main.rs | 8+-------
Meth-wire/src/lib.rs | 11+++++++----
Meth-wire/src/loops/analysis.rs | 89+++++++++++--------------------------------------------------------------------
Meth-wire/src/loops/worker.rs | 36+++++++++++++++++++++---------------
Meth-wire/src/main.rs | 9+--------
Minstrumentation/src/btc.rs | 6+++---
Minstrumentation/src/eth.rs | 7+++----
10 files changed, 84 insertions(+), 182 deletions(-)

diff --git a/btc-wire/src/lib.rs b/btc-wire/src/lib.rs @@ -14,7 +14,7 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ use std::path::{Path, PathBuf}; -use std::{str::FromStr, sync::atomic::AtomicU32}; +use std::str::FromStr; use bitcoin::{hashes::hex::FromHex, Address, Amount, Network, Txid}; use btc_config::BitcoinConfig; @@ -148,7 +148,7 @@ const DEFAULT_CONFIRMATION: u16 = 6; const DEFAULT_BOUNCE_FEE: &str = "0.00001"; pub struct WireState { - pub confirmation: AtomicU32, + pub confirmation: u32, pub max_confirmation: u32, pub btc_config: BitcoinConfig, pub bounce_fee: Amount, @@ -166,7 +166,7 @@ impl WireState { BitcoinConfig::load(path, currency).or_fail(|e| format!("bitcoin config: {}", e)); let init_confirmation = taler_config.confirmation().unwrap_or(DEFAULT_CONFIRMATION) as u32; Self { - confirmation: AtomicU32::new(init_confirmation), + confirmation: init_confirmation, max_confirmation: init_confirmation * 2, bounce_fee: config_bounce_fee(&taler_config.bounce_fee(), currency), lifetime: taler_config.wire_lifetime(), diff --git a/btc-wire/src/loops/analysis.rs b/btc-wire/src/loops/analysis.rs @@ -13,61 +13,31 @@ You should have received a copy of the GNU Affero General Public License along with TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use std::{sync::atomic::Ordering, time::Duration}; - -use btc_wire::rpc::{AutoRpcCommon, ChainTipsStatus}; -use common::{ - log::log::{error, warn}, - postgres::fallible_iterator::FallibleIterator, - reconnect::AutoReconnectDb, -}; +use btc_wire::rpc::{ChainTipsStatus, Rpc}; +use common::log::log::warn; use super::LoopResult; -use crate::WireState; - -/// Analyse blockchain behavior and adapt confirmations in real time -pub fn analysis(mut rpc: AutoRpcCommon, mut db: AutoReconnectDb, state: &WireState) { - // The biggest fork ever seen - let mut max_seen = 0; - loop { - let rpc = rpc.client(); - let db = db.client(); - let result: LoopResult<()> = (|| { - // Register as listener - db.batch_execute("LISTEN new_block")?; - loop { - // Get biggest known valid fork - let fork = rpc - .get_chain_tips()? - .into_iter() - .filter_map(|t| (t.status == ChainTipsStatus::ValidFork).then(|| t.length)) - .max() - .unwrap_or(0) as u32; - // The first time we see a fork that big - if fork > max_seen { - max_seen = fork; - let current_conf = state.confirmation.load(Ordering::SeqCst); - // If new fork is bigger than the current confirmation - if fork > current_conf { - // Max two time the configuration - let new_conf = fork.min(state.max_confirmation); - state.confirmation.store(new_conf, Ordering::SeqCst); - warn!( - "analysis: found dangerous fork of {} blocks, adapt confirmation to {} blocks capped at {}, you should update taler.conf", - fork, new_conf, state.max_confirmation - ); - } - } - // TODO smarter analysis: suspicious transaction value, limit wire bitcoin throughput - - // Wait for next notification - db.notifications().blocking_iter().next()?; - } - })(); - if let Err(e) = result { - error!("analysis: {}", e); - std::thread::sleep(Duration::from_secs(5)); - } +/// Analyse blockchain behavior and return the new confirmation delay +pub fn analysis(rpc: &mut Rpc, current: u32, max: u32) -> LoopResult<u32> { + // Get biggest known valid fork + let fork = rpc + .get_chain_tips()? + .into_iter() + .filter_map(|t| (t.status == ChainTipsStatus::ValidFork).then(|| t.length)) + .max() + .unwrap_or(0) as u32; + // If new fork is bigger than what current confirmation delay protect against + if fork >= current { + // Limit confirmation growth + let new_conf = fork.saturating_add(1).min(max); + warn!( + "analysis: found dangerous fork of {} blocks, adapt confirmation to {} blocks capped at {}, you should update taler.conf", + fork, new_conf, max + ); + return Ok(new_conf); } + + // TODO smarter analysis: suspicious transaction value, limit wire bitcoin throughput + return Ok(current); } diff --git a/btc-wire/src/loops/worker.rs b/btc-wire/src/loops/worker.rs @@ -16,7 +16,6 @@ use std::{ collections::HashMap, fmt::Write, - sync::atomic::Ordering, time::{Duration, SystemTime}, }; @@ -47,10 +46,10 @@ use crate::{ WireState, }; -use super::{LoopError, LoopResult}; +use super::{analysis::analysis, LoopError, LoopResult}; /// Synchronize local db with blockchain and perform transactions -pub fn worker(mut rpc: AutoRpcWallet, mut db: AutoReconnectDb, state: &WireState) { +pub fn worker(mut rpc: AutoRpcWallet, mut db: AutoReconnectDb, mut state: WireState) { let mut lifetime = state.lifetime; let mut status = true; let mut skip_notification = false; @@ -100,12 +99,15 @@ pub fn worker(mut rpc: AutoRpcWallet, mut db: AutoReconnectDb, state: &WireState return Err(LoopError::Concurrency); } + // Perform analysis + state.confirmation = analysis(rpc, state.confirmation, state.max_confirmation)?; + // Sync chain - if let Some(stuck) = sync_chain(rpc, db, state, &mut status)? { + if let Some(stuck) = sync_chain(rpc, db, &state, &mut status)? { // As we are now in sync with the blockchain if a transaction has Requested status it have not been sent // Send requested debits - while debit(db, rpc, state)? {} + while debit(db, rpc, &state)? {} // Bump stuck transactions for id in stuck { @@ -161,8 +163,8 @@ fn sync_chain( ) -> LoopResult<Option<Vec<Txid>>> { // Get stored last_hash let last_hash = last_hash(db)?; - // Use the same confirmation delay for this sync - let conf_delay = state.confirmation.load(Ordering::SeqCst); + // Get the current confirmation delay + let conf_delay = state.confirmation; // Get a set of transactions ids to parse let (txs, removed, lastblock): ( @@ -171,7 +173,7 @@ fn sync_chain( BlockHash, ) = { // Get all transactions made since this block - let list = rpc.list_since_block(Some(&last_hash), conf_delay)?; + let list = rpc.list_since_block(Some(&last_hash), conf_delay as u32)?; // Only keep ids and category let txs = list .transactions diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs @@ -30,7 +30,7 @@ use common::{ use loops::LoopResult; use std::path::PathBuf; -use crate::loops::{analysis::analysis, watcher::watcher, worker::worker}; +use crate::loops::{watcher::watcher, worker::worker}; mod fail_point; mod loops; @@ -140,7 +140,6 @@ fn init(config: Option<PathBuf>, init: Init) -> LoopResult<()> { fn run(config: Option<PathBuf>) { let state = WireState::load_taler_config(config.as_deref()); - let state: &'static _ = Box::leak(Box::new(state)); #[cfg(feature = "fail")] if state.btc_config.network == Network::Regtest { @@ -159,16 +158,11 @@ fn run(config: Option<PathBuf>) { // TODO Check wire wallet own config PAYTO address let rpc_watcher = auto_rpc_common(state.btc_config.clone()); - let rpc_analysis = auto_rpc_common(state.btc_config.clone()); let rpc_worker = auto_rpc_wallet(state.btc_config.clone(), WIRE_WALLET_NAME); let db_watcher = auto_reconnect_db(state.db_config.clone()); - let db_analysis = auto_reconnect_db(state.db_config.clone()); let db_worker = auto_reconnect_db(state.db_config.clone()); named_spawn("watcher", move || watcher(rpc_watcher, db_watcher)); - named_spawn("analysis", move || { - analysis(rpc_analysis, db_analysis, state) - }); worker(rpc_worker, db_worker, state); info!("btc-wire stopped"); } diff --git a/eth-wire/src/lib.rs b/eth-wire/src/lib.rs @@ -18,7 +18,6 @@ use std::{ fmt::Debug, path::{Path, PathBuf}, str::FromStr, - sync::atomic::AtomicU32, }; use common::{ @@ -109,7 +108,7 @@ pub trait RpcExtended: RpcClient { }) } - /// List new and removed transaction since the last sync state, returning a new sync state + /// List new and removed transaction since the last sync state and the size of the reorganized fork if any, returning a new sync state fn list_since_sync( &mut self, address: &Address, @@ -127,6 +126,7 @@ pub trait RpcExtended: RpcClient { let mut txs = Vec::new(); let mut removed = Vec::new(); + let mut fork_len = 0; // Add pending transaction txs.extend(match_tx(self.pending_transactions()?, 0)); @@ -153,6 +153,7 @@ pub trait RpcExtended: RpcClient { chain_cursor = self.block(&chain_cursor.parent_hash)?.unwrap(); fork_cursor = self.block(&fork_cursor.parent_hash)?.unwrap(); confirmation += 1; + fork_len += 1; } } @@ -166,6 +167,7 @@ pub trait RpcExtended: RpcClient { Ok(ListSinceSync { txs, removed, + fork_len, state: SyncState { tip_hash: latest.hash.unwrap(), tip_height: latest.number.unwrap(), @@ -190,6 +192,7 @@ pub struct ListSinceSync { pub txs: Vec<SyncTransaction>, pub removed: Vec<SyncTransaction>, pub state: SyncState, + pub fork_len: u32, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -221,7 +224,7 @@ const DEFAULT_CONFIRMATION: u16 = 37; const DEFAULT_BOUNCE_FEE: &str = "0.00001"; pub struct WireState { - pub confirmation: AtomicU32, + pub confirmation: u32, pub max_confirmations: u32, pub address: H160, pub bounce_fee: U256, @@ -240,7 +243,7 @@ impl WireState { let init_confirmation = taler_config.confirmation().unwrap_or(DEFAULT_CONFIRMATION) as u32; let payto = taler_config.payto(); Self { - confirmation: AtomicU32::new(init_confirmation), + confirmation: init_confirmation, max_confirmations: init_confirmation * 2, address: eth_payto_addr(&payto).unwrap(), ipc_path, diff --git a/eth-wire/src/loops/analysis.rs b/eth-wire/src/loops/analysis.rs @@ -14,86 +14,21 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use std::sync::atomic::Ordering; - -use common::{ - log::log::{error, warn}, - postgres::fallible_iterator::FallibleIterator, - reconnect::AutoReconnectDb, -}; -use eth_wire::rpc::{AutoRpcCommon, Rpc, RpcClient}; -use ethereum_types::{H256, U64}; - -use crate::WireState; +use common::log::log::warn; use super::LoopResult; /// Analyse blockchain behavior and adapt confirmations in real time -pub fn analysis(mut rpc: AutoRpcCommon, mut db: AutoReconnectDb, state: &WireState) { - // The biggest fork ever seen - let mut max_seen = 0; - let mut tip: Option<(U64, H256)> = None; - loop { - let rpc = rpc.client(); - let db = db.client(); - let result: LoopResult<()> = (|| { - // Register as listener - db.batch_execute("LISTEN new_block")?; - loop { - // Get biggest known valid fork - let (fork, new_tip) = check_fork(rpc, &tip)?; - tip.replace(new_tip); - // The first time we see a fork that big - if fork > max_seen { - max_seen = fork; - let current_conf = state.confirmation.load(Ordering::SeqCst); - // If new fork is bigger than the current confirmation - if fork > current_conf { - // Max two time the configuration - let new_conf = fork.min(state.max_confirmations); - state.confirmation.store(new_conf, Ordering::SeqCst); - warn!( - "analysis: found dangerous fork of {} blocks, adapt confirmation to {} blocks capped at {}, you should update taler.conf", - fork, new_conf, state.max_confirmations - ); - } - } - - // TODO smarter analysis: suspicious transaction value, limit wire bitcoin throughput - - // Wait for next notification - db.notifications().blocking_iter().next()?; - } - })(); - if let Err(e) = result { - error!("analysis: {}", e); - } - } -} - -/// Return fork size and new tip -pub fn check_fork(rpc: &mut Rpc, tip: &Option<(U64, H256)>) -> LoopResult<(u32, (U64, H256))> { - let mut size = 0; - let latest = rpc.latest_block()?; - if let Some((number, hash)) = tip { - let mut chain_cursor = latest.clone(); - // Skip until tip height - while chain_cursor.number.unwrap() != *number { - chain_cursor = rpc.block(&chain_cursor.parent_hash)?.unwrap(); - size += 1; - } - - // Check fork - if chain_cursor.hash.unwrap() != *hash { - let mut fork_cursor = rpc.block(hash)?.unwrap(); - while fork_cursor.hash != chain_cursor.hash { - chain_cursor = rpc.block(&chain_cursor.parent_hash)?.unwrap(); - fork_cursor = rpc.block(&fork_cursor.parent_hash)?.unwrap(); - size += 1; - } - } else { - size = 0; - } +pub fn analysis(fork: u32, current: u32, max: u32) -> LoopResult<u32> { + // If new fork is bigger than what current confirmation delay protect against + if fork >= current { + // Limit confirmation growth + let new_conf = fork.saturating_add(1).min(max); + warn!( + "analysis: found dangerous fork of {} blocks, adapt confirmation to {} blocks capped at {}, you should update taler.conf", + fork, new_conf, max + ); + return Ok(new_conf); } - Ok((size, (latest.number.unwrap(), latest.hash.unwrap()))) + return Ok(current); } diff --git a/eth-wire/src/loops/worker.rs b/eth-wire/src/loops/worker.rs @@ -13,7 +13,7 @@ You should have received a copy of the GNU Affero General Public License along with TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use std::{fmt::Write, sync::atomic::Ordering, time::SystemTime}; +use std::{fmt::Write, time::SystemTime}; use common::{ api_common::base32, @@ -27,7 +27,7 @@ use common::{ use eth_wire::{ rpc::{self, AutoRpcWallet, Rpc, RpcClient, Transaction, TransactionRequest}, taler_util::{eth_payto_url, eth_to_taler}, - RpcExtended, SyncState, SyncTransaction, + ListSinceSync, RpcExtended, SyncState, SyncTransaction, }; use ethereum_types::{Address, H256, U256}; @@ -38,9 +38,9 @@ use crate::{ WireState, }; -use super::LoopResult; +use super::{analysis::analysis, LoopResult}; -pub fn worker(mut rpc: AutoRpcWallet, mut db: AutoReconnectDb, state: &WireState) { +pub fn worker(mut rpc: AutoRpcWallet, mut db: AutoReconnectDb, mut state: WireState) { let mut lifetime = state.lifetime; let mut status = true; let mut skip_notification = false; @@ -90,15 +90,26 @@ pub fn worker(mut rpc: AutoRpcWallet, mut db: AutoReconnectDb, state: &WireState return Err(LoopError::Concurrency); } + // Get stored sync state + let row = db.query_one("SELECT value FROM state WHERE name='sync'", &[])?; + let sync_state = SyncState::from_bytes(&sql_array(&row, 0)); + + // Get changes + let list = rpc.list_since_sync(&state.address, sync_state, state.confirmation)?; + + // Perform analysis + state.confirmation = + analysis(list.fork_len, state.confirmation, state.max_confirmations)?; + // Sync chain - if sync_chain(rpc, db, state, &mut status)? { + if sync_chain(db, &state, &mut status, list)? { // As we are now in sync with the blockchain if a transaction has Requested status it have not been sent // Send requested debits - while debit(db, rpc, state)? {} + while debit(db, rpc, &state)? {} // Bump stuck transactions - while bump(db, rpc, state)? {} + while bump(db, rpc, &state)? {} // Send requested bounce while bounce(db, rpc, state.bounce_fee)? {} @@ -124,18 +135,13 @@ pub fn worker(mut rpc: AutoRpcWallet, mut db: AutoReconnectDb, state: &WireState /// Parse new transactions, return true if the database is up to date with the latest mined block fn sync_chain( - rpc: &mut Rpc, db: &mut Client, state: &WireState, status: &mut bool, + list: ListSinceSync, ) -> LoopResult<bool> { - // Get stored sync state - let row = db.query_one("SELECT value FROM state WHERE name='sync'", &[])?; - let sync_state = SyncState::from_bytes(&sql_array(&row, 0)); - // Use the same confirmation delay for this sync - let conf_delay = state.confirmation.load(Ordering::SeqCst); - - let list = rpc.list_since_sync(&state.address, sync_state, conf_delay)?; + // Get the current confirmation delay + let conf_delay = state.confirmation; // Check if a confirmed incoming transaction have been removed by a blockchain reorganization let new_status = sync_chain_removed(&list.txs, &list.removed, db, &state.address, conf_delay)?; diff --git a/eth-wire/src/main.rs b/eth-wire/src/main.rs @@ -29,7 +29,7 @@ use eth_wire::{ SyncState, WireState, }; use ethereum_types::H160; -use loops::{analysis::analysis, watcher::watcher, worker::worker, LoopResult}; +use loops::{watcher::watcher, worker::worker, LoopResult}; mod fail_point; mod loops; @@ -142,21 +142,14 @@ fn init(config: Option<PathBuf>, init: Init) -> LoopResult<()> { fn run(config: Option<PathBuf>) { let state = WireState::load_taler_config(config.as_deref()); - let state: &'static _ = Box::leak(Box::new(state)); let rpc_worker = auto_rpc_wallet(state.ipc_path.clone(), state.address); - let rpc_analysis = auto_rpc_common(state.ipc_path.clone()); let rpc_watcher = auto_rpc_common(state.ipc_path.clone()); let db_watcher = auto_reconnect_db(state.db_config.clone()); - let db_analysis = auto_reconnect_db(state.db_config.clone()); let db_worker = auto_reconnect_db(state.db_config.clone()); named_spawn("watcher", move || watcher(rpc_watcher, db_watcher)); - named_spawn("analysis", move || { - analysis(rpc_analysis, db_analysis, state) - }); - worker(rpc_worker, db_worker, state); info!("eth-wire stopped"); } diff --git a/instrumentation/src/btc.rs b/instrumentation/src/btc.rs @@ -18,7 +18,6 @@ use std::{ ops::{Deref, DerefMut}, path::{Path, PathBuf}, str::FromStr, - sync::atomic::Ordering, thread::sleep, time::Duration, }; @@ -346,7 +345,6 @@ impl BtcCtx { .unwrap(); common_rpc.mine(1, &reserve_addr).unwrap(); - let conf = state.confirmation.load(Ordering::SeqCst) as u16; Self { common, btc_node, @@ -357,8 +355,8 @@ impl BtcCtx { wire_addr, client_addr, reserve_addr, + conf: state.confirmation as u16, state, - conf, _btc_node2: btc_node2, common_rpc2, } @@ -981,6 +979,7 @@ fn analysis() { // Recover orphaned transaction ctx.next_conf(); + ctx.next_block(); // Conf have changed ctx.expect_wire_balance(after, false); ctx.expect_gateway_up(); @@ -996,6 +995,7 @@ fn analysis() { ctx.expect_gateway_up(); ctx.cluster_fork(5); ctx.expect_wire_balance(before, false); + std::thread::sleep(Duration::from_secs(3)); // Give some time for the gateway to be down ctx.expect_gateway_up(); } diff --git a/instrumentation/src/eth.rs b/instrumentation/src/eth.rs @@ -18,7 +18,6 @@ use std::{ io::Write, ops::{Deref, DerefMut}, path::Path, - sync::atomic::Ordering, thread::sleep, time::Duration, }; @@ -356,7 +355,6 @@ impl EthCtx { for addr in [&client_addr, &reserve_addr] { rpc.unlock_account(addr, &passwd).unwrap(); } - let conf = state.confirmation.load(Ordering::SeqCst) as u16; Self { node, @@ -364,9 +362,9 @@ impl EthCtx { reserve_addr, client_addr, wire_addr, + conf: state.confirmation as u16, state, common, - conf, passwd, } } @@ -981,7 +979,7 @@ fn analysis() { // Perform fork and check eth-wire hard error ctx.expect_gateway_up(); - ctx.cluster_fork(6); + ctx.cluster_fork(5); ctx.expect_wire_balance(before, false); ctx.expect_gateway_down(); @@ -1002,6 +1000,7 @@ fn analysis() { ctx.expect_gateway_up(); ctx.cluster_fork(5); ctx.expect_wire_balance(before, false); + std::thread::sleep(Duration::from_secs(3)); ctx.expect_gateway_up(); }