diff options
author | Antoine A <> | 2022-09-15 21:15:19 +0200 |
---|---|---|
committer | Antoine A <> | 2022-09-15 21:16:03 +0200 |
commit | 2304c7ac5b272d47c14e8d54f21c85afc0a0e01d (patch) | |
tree | 097270a511e9a63c4f1bde19ecb3dd4ee7f2d1db /eth-wire | |
parent | 83c8149b3562964c669dd614271b63edb4ce7019 (diff) | |
download | depolymerization-2304c7ac5b272d47c14e8d54f21c85afc0a0e01d.tar.gz depolymerization-2304c7ac5b272d47c14e8d54f21c85afc0a0e01d.tar.bz2 depolymerization-2304c7ac5b272d47c14e8d54f21c85afc0a0e01d.zip |
Fix analysis and merge analysis loop into worker loop
Diffstat (limited to 'eth-wire')
-rw-r--r-- | eth-wire/src/lib.rs | 11 | ||||
-rw-r--r-- | eth-wire/src/loops/analysis.rs | 89 | ||||
-rw-r--r-- | eth-wire/src/loops/worker.rs | 36 | ||||
-rw-r--r-- | eth-wire/src/main.rs | 9 |
4 files changed, 41 insertions, 104 deletions
diff --git a/eth-wire/src/lib.rs b/eth-wire/src/lib.rs index 041f3ba..71ab818 100644 --- a/eth-wire/src/lib.rs +++ b/eth-wire/src/lib.rs @@ -18,7 +18,6 @@ use std::{ fmt::Debug, path::{Path, PathBuf}, str::FromStr, - sync::atomic::AtomicU32, }; use common::{ @@ -109,7 +108,7 @@ pub trait RpcExtended: RpcClient { }) } - /// List new and removed transaction since the last sync state, returning a new sync state + /// List new and removed transaction since the last sync state and the size of the reorganized fork if any, returning a new sync state fn list_since_sync( &mut self, address: &Address, @@ -127,6 +126,7 @@ pub trait RpcExtended: RpcClient { let mut txs = Vec::new(); let mut removed = Vec::new(); + let mut fork_len = 0; // Add pending transaction txs.extend(match_tx(self.pending_transactions()?, 0)); @@ -153,6 +153,7 @@ pub trait RpcExtended: RpcClient { chain_cursor = self.block(&chain_cursor.parent_hash)?.unwrap(); fork_cursor = self.block(&fork_cursor.parent_hash)?.unwrap(); confirmation += 1; + fork_len += 1; } } @@ -166,6 +167,7 @@ pub trait RpcExtended: RpcClient { Ok(ListSinceSync { txs, removed, + fork_len, state: SyncState { tip_hash: latest.hash.unwrap(), tip_height: latest.number.unwrap(), @@ -190,6 +192,7 @@ pub struct ListSinceSync { pub txs: Vec<SyncTransaction>, pub removed: Vec<SyncTransaction>, pub state: SyncState, + pub fork_len: u32, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -221,7 +224,7 @@ const DEFAULT_CONFIRMATION: u16 = 37; const DEFAULT_BOUNCE_FEE: &str = "0.00001"; pub struct WireState { - pub confirmation: AtomicU32, + pub confirmation: u32, pub max_confirmations: u32, pub address: H160, pub bounce_fee: U256, @@ -240,7 +243,7 @@ impl WireState { let init_confirmation = taler_config.confirmation().unwrap_or(DEFAULT_CONFIRMATION) as u32; let payto = taler_config.payto(); Self { - confirmation: AtomicU32::new(init_confirmation), + confirmation: init_confirmation, max_confirmations: init_confirmation * 2, address: eth_payto_addr(&payto).unwrap(), ipc_path, diff --git a/eth-wire/src/loops/analysis.rs b/eth-wire/src/loops/analysis.rs index 471409c..546eb1e 100644 --- a/eth-wire/src/loops/analysis.rs +++ b/eth-wire/src/loops/analysis.rs @@ -14,86 +14,21 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use std::sync::atomic::Ordering; - -use common::{ - log::log::{error, warn}, - postgres::fallible_iterator::FallibleIterator, - reconnect::AutoReconnectDb, -}; -use eth_wire::rpc::{AutoRpcCommon, Rpc, RpcClient}; -use ethereum_types::{H256, U64}; - -use crate::WireState; +use common::log::log::warn; use super::LoopResult; /// Analyse blockchain behavior and adapt confirmations in real time -pub fn analysis(mut rpc: AutoRpcCommon, mut db: AutoReconnectDb, state: &WireState) { - // The biggest fork ever seen - let mut max_seen = 0; - let mut tip: Option<(U64, H256)> = None; - loop { - let rpc = rpc.client(); - let db = db.client(); - let result: LoopResult<()> = (|| { - // Register as listener - db.batch_execute("LISTEN new_block")?; - loop { - // Get biggest known valid fork - let (fork, new_tip) = check_fork(rpc, &tip)?; - tip.replace(new_tip); - // The first time we see a fork that big - if fork > max_seen { - max_seen = fork; - let current_conf = state.confirmation.load(Ordering::SeqCst); - // If new fork is bigger than the current confirmation - if fork > current_conf { - // Max two time the configuration - let new_conf = fork.min(state.max_confirmations); - state.confirmation.store(new_conf, Ordering::SeqCst); - warn!( - "analysis: found dangerous fork of {} blocks, adapt confirmation to {} blocks capped at {}, you should update taler.conf", - fork, new_conf, state.max_confirmations - ); - } - } - - // TODO smarter analysis: suspicious transaction value, limit wire bitcoin throughput - - // Wait for next notification - db.notifications().blocking_iter().next()?; - } - })(); - if let Err(e) = result { - error!("analysis: {}", e); - } - } -} - -/// Return fork size and new tip -pub fn check_fork(rpc: &mut Rpc, tip: &Option<(U64, H256)>) -> LoopResult<(u32, (U64, H256))> { - let mut size = 0; - let latest = rpc.latest_block()?; - if let Some((number, hash)) = tip { - let mut chain_cursor = latest.clone(); - // Skip until tip height - while chain_cursor.number.unwrap() != *number { - chain_cursor = rpc.block(&chain_cursor.parent_hash)?.unwrap(); - size += 1; - } - - // Check fork - if chain_cursor.hash.unwrap() != *hash { - let mut fork_cursor = rpc.block(hash)?.unwrap(); - while fork_cursor.hash != chain_cursor.hash { - chain_cursor = rpc.block(&chain_cursor.parent_hash)?.unwrap(); - fork_cursor = rpc.block(&fork_cursor.parent_hash)?.unwrap(); - size += 1; - } - } else { - size = 0; - } +pub fn analysis(fork: u32, current: u32, max: u32) -> LoopResult<u32> { + // If new fork is bigger than what current confirmation delay protect against + if fork >= current { + // Limit confirmation growth + let new_conf = fork.saturating_add(1).min(max); + warn!( + "analysis: found dangerous fork of {} blocks, adapt confirmation to {} blocks capped at {}, you should update taler.conf", + fork, new_conf, max + ); + return Ok(new_conf); } - Ok((size, (latest.number.unwrap(), latest.hash.unwrap()))) + return Ok(current); } diff --git a/eth-wire/src/loops/worker.rs b/eth-wire/src/loops/worker.rs index da68160..379ea6f 100644 --- a/eth-wire/src/loops/worker.rs +++ b/eth-wire/src/loops/worker.rs @@ -13,7 +13,7 @@ You should have received a copy of the GNU Affero General Public License along with TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use std::{fmt::Write, sync::atomic::Ordering, time::SystemTime}; +use std::{fmt::Write, time::SystemTime}; use common::{ api_common::base32, @@ -27,7 +27,7 @@ use common::{ use eth_wire::{ rpc::{self, AutoRpcWallet, Rpc, RpcClient, Transaction, TransactionRequest}, taler_util::{eth_payto_url, eth_to_taler}, - RpcExtended, SyncState, SyncTransaction, + ListSinceSync, RpcExtended, SyncState, SyncTransaction, }; use ethereum_types::{Address, H256, U256}; @@ -38,9 +38,9 @@ use crate::{ WireState, }; -use super::LoopResult; +use super::{analysis::analysis, LoopResult}; -pub fn worker(mut rpc: AutoRpcWallet, mut db: AutoReconnectDb, state: &WireState) { +pub fn worker(mut rpc: AutoRpcWallet, mut db: AutoReconnectDb, mut state: WireState) { let mut lifetime = state.lifetime; let mut status = true; let mut skip_notification = false; @@ -90,15 +90,26 @@ pub fn worker(mut rpc: AutoRpcWallet, mut db: AutoReconnectDb, state: &WireState return Err(LoopError::Concurrency); } + // Get stored sync state + let row = db.query_one("SELECT value FROM state WHERE name='sync'", &[])?; + let sync_state = SyncState::from_bytes(&sql_array(&row, 0)); + + // Get changes + let list = rpc.list_since_sync(&state.address, sync_state, state.confirmation)?; + + // Perform analysis + state.confirmation = + analysis(list.fork_len, state.confirmation, state.max_confirmations)?; + // Sync chain - if sync_chain(rpc, db, state, &mut status)? { + if sync_chain(db, &state, &mut status, list)? { // As we are now in sync with the blockchain if a transaction has Requested status it have not been sent // Send requested debits - while debit(db, rpc, state)? {} + while debit(db, rpc, &state)? {} // Bump stuck transactions - while bump(db, rpc, state)? {} + while bump(db, rpc, &state)? {} // Send requested bounce while bounce(db, rpc, state.bounce_fee)? {} @@ -124,18 +135,13 @@ pub fn worker(mut rpc: AutoRpcWallet, mut db: AutoReconnectDb, state: &WireState /// Parse new transactions, return true if the database is up to date with the latest mined block fn sync_chain( - rpc: &mut Rpc, db: &mut Client, state: &WireState, status: &mut bool, + list: ListSinceSync, ) -> LoopResult<bool> { - // Get stored sync state - let row = db.query_one("SELECT value FROM state WHERE name='sync'", &[])?; - let sync_state = SyncState::from_bytes(&sql_array(&row, 0)); - // Use the same confirmation delay for this sync - let conf_delay = state.confirmation.load(Ordering::SeqCst); - - let list = rpc.list_since_sync(&state.address, sync_state, conf_delay)?; + // Get the current confirmation delay + let conf_delay = state.confirmation; // Check if a confirmed incoming transaction have been removed by a blockchain reorganization let new_status = sync_chain_removed(&list.txs, &list.removed, db, &state.address, conf_delay)?; diff --git a/eth-wire/src/main.rs b/eth-wire/src/main.rs index 5ef5a74..287cfd3 100644 --- a/eth-wire/src/main.rs +++ b/eth-wire/src/main.rs @@ -29,7 +29,7 @@ use eth_wire::{ SyncState, WireState, }; use ethereum_types::H160; -use loops::{analysis::analysis, watcher::watcher, worker::worker, LoopResult}; +use loops::{watcher::watcher, worker::worker, LoopResult}; mod fail_point; mod loops; @@ -142,21 +142,14 @@ fn init(config: Option<PathBuf>, init: Init) -> LoopResult<()> { fn run(config: Option<PathBuf>) { let state = WireState::load_taler_config(config.as_deref()); - let state: &'static _ = Box::leak(Box::new(state)); let rpc_worker = auto_rpc_wallet(state.ipc_path.clone(), state.address); - let rpc_analysis = auto_rpc_common(state.ipc_path.clone()); let rpc_watcher = auto_rpc_common(state.ipc_path.clone()); let db_watcher = auto_reconnect_db(state.db_config.clone()); - let db_analysis = auto_reconnect_db(state.db_config.clone()); let db_worker = auto_reconnect_db(state.db_config.clone()); named_spawn("watcher", move || watcher(rpc_watcher, db_watcher)); - named_spawn("analysis", move || { - analysis(rpc_analysis, db_analysis, state) - }); - worker(rpc_worker, db_worker, state); info!("eth-wire stopped"); } |