summaryrefslogtreecommitdiff
path: root/eth-wire
diff options
context:
space:
mode:
authorAntoine A <>2022-02-10 15:40:15 +0100
committerAntoine A <>2022-02-10 15:40:37 +0100
commitf8e660fa3915e3b7f4320dda808db582d20e6de6 (patch)
treecc3f24f994f4d71c7fb9c1de16050ba87399b84e /eth-wire
parent60e4b9be19afea1a91e0b29cf4ff14af6f41dbfa (diff)
downloaddepolymerization-f8e660fa3915e3b7f4320dda808db582d20e6de6.tar.gz
depolymerization-f8e660fa3915e3b7f4320dda808db582d20e6de6.tar.bz2
depolymerization-f8e660fa3915e3b7f4320dda808db582d20e6de6.zip
eth-wire: add analysis
Diffstat (limited to 'eth-wire')
-rw-r--r--eth-wire/src/loops.rs1
-rw-r--r--eth-wire/src/loops/analysis.rs97
-rw-r--r--eth-wire/src/main.rs12
3 files changed, 107 insertions, 3 deletions
diff --git a/eth-wire/src/loops.rs b/eth-wire/src/loops.rs
index 6bcc715..e47b51e 100644
--- a/eth-wire/src/loops.rs
+++ b/eth-wire/src/loops.rs
@@ -16,3 +16,4 @@
pub mod watcher;
pub mod worker;
+pub mod analysis;
diff --git a/eth-wire/src/loops/analysis.rs b/eth-wire/src/loops/analysis.rs
new file mode 100644
index 0000000..c1333af
--- /dev/null
+++ b/eth-wire/src/loops/analysis.rs
@@ -0,0 +1,97 @@
+/*
+ This file is part of TALER
+ Copyright (C) 2022 Taler Systems SA
+
+ TALER is free software; you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License along with
+ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+*/
+
+use std::sync::atomic::Ordering;
+
+use common::{
+ log::log::{error, warn},
+ postgres::fallible_iterator::FallibleIterator,
+ reconnect::AutoReconnectDb,
+};
+use eth_wire::rpc::{AutoReconnectRPC, Rpc};
+use ethereum_types::{H256, U64};
+
+use crate::{LoopResult, WireState};
+
+/// Analyse blockchain behavior and adapt confirmations in real time
+pub fn analysis(mut rpc: AutoReconnectRPC, mut db: AutoReconnectDb, state: &WireState) {
+ // The biggest fork ever seen
+ let mut max_seen = 0;
+ let mut tip: Option<(U64, H256)> = None;
+ loop {
+ let rpc = rpc.client();
+ let db = db.client();
+ let result: LoopResult<()> = (|| {
+ // Register as listener
+ db.batch_execute("LISTEN new_block")?;
+ loop {
+ // Get biggest known valid fork
+ let (fork, new_tip) = check_fork(rpc, &tip)?;
+ tip.replace(new_tip);
+ // The first time we see a fork that big
+ if fork > max_seen {
+ max_seen = fork;
+ let current_conf = state.confirmation.load(Ordering::SeqCst);
+ // If new fork is bigger than the current confirmation
+ if fork > current_conf {
+ // Max two time the configuration
+ let new_conf = fork.min(state.config.confirmation * 2);
+ state.confirmation.store(new_conf, Ordering::SeqCst);
+ warn!(
+ "analysis: found dangerous fork of {} blocks, adapt confirmation to {} blocks capped at {}, you should update taler.conf",
+ fork, new_conf, state.config.confirmation * 2
+ );
+ }
+ }
+
+ // TODO smarter analysis: suspicious transaction value, limit wire bitcoin throughput
+
+ // Wait for next notification
+ db.notifications().blocking_iter().next()?;
+ }
+ })();
+ if let Err(e) = result {
+ error!("analysis: {}", e);
+ }
+ }
+}
+
+/// Return fork size and new tip
+pub fn check_fork(rpc: &mut Rpc, tip: &Option<(U64, H256)>) -> LoopResult<(u16, (U64, H256))> {
+ let mut size = 0;
+ let latest = rpc.latest_block()?;
+ if let Some((number, hash)) = tip {
+ let mut chain_cursor = latest.clone();
+ // Skip until tip height
+ while chain_cursor.number.unwrap() != *number {
+ chain_cursor = rpc.block(&chain_cursor.parent_hash)?.unwrap();
+ size += 1;
+ }
+
+ // Check fork
+ if chain_cursor.hash.unwrap() != *hash {
+ let mut fork_cursor = rpc.block(hash)?.unwrap();
+ while fork_cursor.hash != chain_cursor.hash {
+ chain_cursor = rpc.block(&chain_cursor.parent_hash)?.unwrap();
+ fork_cursor = rpc.block(&fork_cursor.parent_hash)?.unwrap();
+ size += 1;
+ }
+ } else {
+ size = 0;
+ }
+ }
+ Ok((size, (latest.number.unwrap(), latest.hash.unwrap())))
+}
diff --git a/eth-wire/src/main.rs b/eth-wire/src/main.rs
index fdcd618..65c60d0 100644
--- a/eth-wire/src/main.rs
+++ b/eth-wire/src/main.rs
@@ -18,7 +18,7 @@ use std::sync::atomic::AtomicU16;
use common::{
config::{load_eth_config, EthConfig},
- postgres,
+ named_spawn, postgres,
reconnect::auto_reconnect_db,
};
use eth_wire::{
@@ -27,7 +27,7 @@ use eth_wire::{
};
use ethereum_types::H160;
use fail_point::Injected;
-use loops::{watcher::watcher, worker::worker};
+use loops::{analysis::analysis, watcher::watcher, worker::worker};
mod fail_point;
mod loops;
@@ -66,13 +66,19 @@ fn main() {
}));
let rpc_worker = auto_reconnect_rpc(state.config.core.data_dir.clone().unwrap(), state.address);
+ let rpc_analysis =
+ auto_reconnect_rpc(state.config.core.data_dir.clone().unwrap(), state.address);
let rpc_watcher =
auto_reconnect_rpc(state.config.core.data_dir.clone().unwrap(), state.address);
let db_watcher = auto_reconnect_db(state.config.core.db_url.clone());
+ let db_analysis = auto_reconnect_db(state.config.core.db_url.clone());
let db_worker = auto_reconnect_db(state.config.core.db_url.clone());
- std::thread::spawn(move || watcher(rpc_watcher, db_watcher));
+ named_spawn("watcher", move || watcher(rpc_watcher, db_watcher));
+ named_spawn("analysis", move || {
+ analysis(rpc_analysis, db_analysis, state)
+ });
worker(rpc_worker, db_worker, state);
}