diff options
author | Antoine A <> | 2022-02-10 15:40:15 +0100 |
---|---|---|
committer | Antoine A <> | 2022-02-10 15:40:37 +0100 |
commit | f8e660fa3915e3b7f4320dda808db582d20e6de6 (patch) | |
tree | cc3f24f994f4d71c7fb9c1de16050ba87399b84e /eth-wire | |
parent | 60e4b9be19afea1a91e0b29cf4ff14af6f41dbfa (diff) | |
download | depolymerization-f8e660fa3915e3b7f4320dda808db582d20e6de6.tar.gz depolymerization-f8e660fa3915e3b7f4320dda808db582d20e6de6.tar.bz2 depolymerization-f8e660fa3915e3b7f4320dda808db582d20e6de6.zip |
eth-wire: add analysis
Diffstat (limited to 'eth-wire')
-rw-r--r-- | eth-wire/src/loops.rs | 1 | ||||
-rw-r--r-- | eth-wire/src/loops/analysis.rs | 97 | ||||
-rw-r--r-- | eth-wire/src/main.rs | 12 |
3 files changed, 107 insertions, 3 deletions
diff --git a/eth-wire/src/loops.rs b/eth-wire/src/loops.rs index 6bcc715..e47b51e 100644 --- a/eth-wire/src/loops.rs +++ b/eth-wire/src/loops.rs @@ -16,3 +16,4 @@ pub mod watcher; pub mod worker; +pub mod analysis; diff --git a/eth-wire/src/loops/analysis.rs b/eth-wire/src/loops/analysis.rs new file mode 100644 index 0000000..c1333af --- /dev/null +++ b/eth-wire/src/loops/analysis.rs @@ -0,0 +1,97 @@ +/* + This file is part of TALER + Copyright (C) 2022 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use std::sync::atomic::Ordering; + +use common::{ + log::log::{error, warn}, + postgres::fallible_iterator::FallibleIterator, + reconnect::AutoReconnectDb, +}; +use eth_wire::rpc::{AutoReconnectRPC, Rpc}; +use ethereum_types::{H256, U64}; + +use crate::{LoopResult, WireState}; + +/// Analyse blockchain behavior and adapt confirmations in real time +pub fn analysis(mut rpc: AutoReconnectRPC, mut db: AutoReconnectDb, state: &WireState) { + // The biggest fork ever seen + let mut max_seen = 0; + let mut tip: Option<(U64, H256)> = None; + loop { + let rpc = rpc.client(); + let db = db.client(); + let result: LoopResult<()> = (|| { + // Register as listener + db.batch_execute("LISTEN new_block")?; + loop { + // Get biggest known valid fork + let (fork, new_tip) = check_fork(rpc, &tip)?; + tip.replace(new_tip); + // The first time we see a fork that big + if fork > max_seen { + max_seen = fork; + let current_conf = state.confirmation.load(Ordering::SeqCst); + // If new fork is bigger than the current confirmation + if fork > current_conf { + // Max two time the configuration + let new_conf = fork.min(state.config.confirmation * 2); + state.confirmation.store(new_conf, Ordering::SeqCst); + warn!( + "analysis: found dangerous fork of {} blocks, adapt confirmation to {} blocks capped at {}, you should update taler.conf", + fork, new_conf, state.config.confirmation * 2 + ); + } + } + + // TODO smarter analysis: suspicious transaction value, limit wire bitcoin throughput + + // Wait for next notification + db.notifications().blocking_iter().next()?; + } + })(); + if let Err(e) = result { + error!("analysis: {}", e); + } + } +} + +/// Return fork size and new tip +pub fn check_fork(rpc: &mut Rpc, tip: &Option<(U64, H256)>) -> LoopResult<(u16, (U64, H256))> { + let mut size = 0; + let latest = rpc.latest_block()?; + if let Some((number, hash)) = tip { + let mut chain_cursor = latest.clone(); + // Skip until tip height + while chain_cursor.number.unwrap() != *number { + chain_cursor = rpc.block(&chain_cursor.parent_hash)?.unwrap(); + size += 1; + } + + // Check fork + if chain_cursor.hash.unwrap() != *hash { + let mut fork_cursor = rpc.block(hash)?.unwrap(); + while fork_cursor.hash != chain_cursor.hash { + chain_cursor = rpc.block(&chain_cursor.parent_hash)?.unwrap(); + fork_cursor = rpc.block(&fork_cursor.parent_hash)?.unwrap(); + size += 1; + } + } else { + size = 0; + } + } + Ok((size, (latest.number.unwrap(), latest.hash.unwrap()))) +} diff --git a/eth-wire/src/main.rs b/eth-wire/src/main.rs index fdcd618..65c60d0 100644 --- a/eth-wire/src/main.rs +++ b/eth-wire/src/main.rs @@ -18,7 +18,7 @@ use std::sync::atomic::AtomicU16; use common::{ config::{load_eth_config, EthConfig}, - postgres, + named_spawn, postgres, reconnect::auto_reconnect_db, }; use eth_wire::{ @@ -27,7 +27,7 @@ use eth_wire::{ }; use ethereum_types::H160; use fail_point::Injected; -use loops::{watcher::watcher, worker::worker}; +use loops::{analysis::analysis, watcher::watcher, worker::worker}; mod fail_point; mod loops; @@ -66,13 +66,19 @@ fn main() { })); let rpc_worker = auto_reconnect_rpc(state.config.core.data_dir.clone().unwrap(), state.address); + let rpc_analysis = + auto_reconnect_rpc(state.config.core.data_dir.clone().unwrap(), state.address); let rpc_watcher = auto_reconnect_rpc(state.config.core.data_dir.clone().unwrap(), state.address); let db_watcher = auto_reconnect_db(state.config.core.db_url.clone()); + let db_analysis = auto_reconnect_db(state.config.core.db_url.clone()); let db_worker = auto_reconnect_db(state.config.core.db_url.clone()); - std::thread::spawn(move || watcher(rpc_watcher, db_watcher)); + named_spawn("watcher", move || watcher(rpc_watcher, db_watcher)); + named_spawn("analysis", move || { + analysis(rpc_analysis, db_analysis, state) + }); worker(rpc_worker, db_worker, state); } |