summaryrefslogtreecommitdiff
path: root/eth-wire
diff options
context:
space:
mode:
authorAntoine A <>2022-09-15 21:15:19 +0200
committerAntoine A <>2022-09-15 21:16:03 +0200
commit2304c7ac5b272d47c14e8d54f21c85afc0a0e01d (patch)
tree097270a511e9a63c4f1bde19ecb3dd4ee7f2d1db /eth-wire
parent83c8149b3562964c669dd614271b63edb4ce7019 (diff)
downloaddepolymerization-2304c7ac5b272d47c14e8d54f21c85afc0a0e01d.tar.gz
depolymerization-2304c7ac5b272d47c14e8d54f21c85afc0a0e01d.tar.bz2
depolymerization-2304c7ac5b272d47c14e8d54f21c85afc0a0e01d.zip
Fix analysis and merge analysis loop into worker loop
Diffstat (limited to 'eth-wire')
-rw-r--r--eth-wire/src/lib.rs11
-rw-r--r--eth-wire/src/loops/analysis.rs89
-rw-r--r--eth-wire/src/loops/worker.rs36
-rw-r--r--eth-wire/src/main.rs9
4 files changed, 41 insertions, 104 deletions
diff --git a/eth-wire/src/lib.rs b/eth-wire/src/lib.rs
index 041f3ba..71ab818 100644
--- a/eth-wire/src/lib.rs
+++ b/eth-wire/src/lib.rs
@@ -18,7 +18,6 @@ use std::{
fmt::Debug,
path::{Path, PathBuf},
str::FromStr,
- sync::atomic::AtomicU32,
};
use common::{
@@ -109,7 +108,7 @@ pub trait RpcExtended: RpcClient {
})
}
- /// List new and removed transaction since the last sync state, returning a new sync state
+ /// List new and removed transaction since the last sync state and the size of the reorganized fork if any, returning a new sync state
fn list_since_sync(
&mut self,
address: &Address,
@@ -127,6 +126,7 @@ pub trait RpcExtended: RpcClient {
let mut txs = Vec::new();
let mut removed = Vec::new();
+ let mut fork_len = 0;
// Add pending transaction
txs.extend(match_tx(self.pending_transactions()?, 0));
@@ -153,6 +153,7 @@ pub trait RpcExtended: RpcClient {
chain_cursor = self.block(&chain_cursor.parent_hash)?.unwrap();
fork_cursor = self.block(&fork_cursor.parent_hash)?.unwrap();
confirmation += 1;
+ fork_len += 1;
}
}
@@ -166,6 +167,7 @@ pub trait RpcExtended: RpcClient {
Ok(ListSinceSync {
txs,
removed,
+ fork_len,
state: SyncState {
tip_hash: latest.hash.unwrap(),
tip_height: latest.number.unwrap(),
@@ -190,6 +192,7 @@ pub struct ListSinceSync {
pub txs: Vec<SyncTransaction>,
pub removed: Vec<SyncTransaction>,
pub state: SyncState,
+ pub fork_len: u32,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
@@ -221,7 +224,7 @@ const DEFAULT_CONFIRMATION: u16 = 37;
const DEFAULT_BOUNCE_FEE: &str = "0.00001";
pub struct WireState {
- pub confirmation: AtomicU32,
+ pub confirmation: u32,
pub max_confirmations: u32,
pub address: H160,
pub bounce_fee: U256,
@@ -240,7 +243,7 @@ impl WireState {
let init_confirmation = taler_config.confirmation().unwrap_or(DEFAULT_CONFIRMATION) as u32;
let payto = taler_config.payto();
Self {
- confirmation: AtomicU32::new(init_confirmation),
+ confirmation: init_confirmation,
max_confirmations: init_confirmation * 2,
address: eth_payto_addr(&payto).unwrap(),
ipc_path,
diff --git a/eth-wire/src/loops/analysis.rs b/eth-wire/src/loops/analysis.rs
index 471409c..546eb1e 100644
--- a/eth-wire/src/loops/analysis.rs
+++ b/eth-wire/src/loops/analysis.rs
@@ -14,86 +14,21 @@
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
-use std::sync::atomic::Ordering;
-
-use common::{
- log::log::{error, warn},
- postgres::fallible_iterator::FallibleIterator,
- reconnect::AutoReconnectDb,
-};
-use eth_wire::rpc::{AutoRpcCommon, Rpc, RpcClient};
-use ethereum_types::{H256, U64};
-
-use crate::WireState;
+use common::log::log::warn;
use super::LoopResult;
/// Analyse blockchain behavior and adapt confirmations in real time
-pub fn analysis(mut rpc: AutoRpcCommon, mut db: AutoReconnectDb, state: &WireState) {
- // The biggest fork ever seen
- let mut max_seen = 0;
- let mut tip: Option<(U64, H256)> = None;
- loop {
- let rpc = rpc.client();
- let db = db.client();
- let result: LoopResult<()> = (|| {
- // Register as listener
- db.batch_execute("LISTEN new_block")?;
- loop {
- // Get biggest known valid fork
- let (fork, new_tip) = check_fork(rpc, &tip)?;
- tip.replace(new_tip);
- // The first time we see a fork that big
- if fork > max_seen {
- max_seen = fork;
- let current_conf = state.confirmation.load(Ordering::SeqCst);
- // If new fork is bigger than the current confirmation
- if fork > current_conf {
- // Max two time the configuration
- let new_conf = fork.min(state.max_confirmations);
- state.confirmation.store(new_conf, Ordering::SeqCst);
- warn!(
- "analysis: found dangerous fork of {} blocks, adapt confirmation to {} blocks capped at {}, you should update taler.conf",
- fork, new_conf, state.max_confirmations
- );
- }
- }
-
- // TODO smarter analysis: suspicious transaction value, limit wire bitcoin throughput
-
- // Wait for next notification
- db.notifications().blocking_iter().next()?;
- }
- })();
- if let Err(e) = result {
- error!("analysis: {}", e);
- }
- }
-}
-
-/// Return fork size and new tip
-pub fn check_fork(rpc: &mut Rpc, tip: &Option<(U64, H256)>) -> LoopResult<(u32, (U64, H256))> {
- let mut size = 0;
- let latest = rpc.latest_block()?;
- if let Some((number, hash)) = tip {
- let mut chain_cursor = latest.clone();
- // Skip until tip height
- while chain_cursor.number.unwrap() != *number {
- chain_cursor = rpc.block(&chain_cursor.parent_hash)?.unwrap();
- size += 1;
- }
-
- // Check fork
- if chain_cursor.hash.unwrap() != *hash {
- let mut fork_cursor = rpc.block(hash)?.unwrap();
- while fork_cursor.hash != chain_cursor.hash {
- chain_cursor = rpc.block(&chain_cursor.parent_hash)?.unwrap();
- fork_cursor = rpc.block(&fork_cursor.parent_hash)?.unwrap();
- size += 1;
- }
- } else {
- size = 0;
- }
+pub fn analysis(fork: u32, current: u32, max: u32) -> LoopResult<u32> {
+ // If new fork is bigger than what current confirmation delay protect against
+ if fork >= current {
+ // Limit confirmation growth
+ let new_conf = fork.saturating_add(1).min(max);
+ warn!(
+ "analysis: found dangerous fork of {} blocks, adapt confirmation to {} blocks capped at {}, you should update taler.conf",
+ fork, new_conf, max
+ );
+ return Ok(new_conf);
}
- Ok((size, (latest.number.unwrap(), latest.hash.unwrap())))
+ return Ok(current);
}
diff --git a/eth-wire/src/loops/worker.rs b/eth-wire/src/loops/worker.rs
index da68160..379ea6f 100644
--- a/eth-wire/src/loops/worker.rs
+++ b/eth-wire/src/loops/worker.rs
@@ -13,7 +13,7 @@
You should have received a copy of the GNU Affero General Public License along with
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
-use std::{fmt::Write, sync::atomic::Ordering, time::SystemTime};
+use std::{fmt::Write, time::SystemTime};
use common::{
api_common::base32,
@@ -27,7 +27,7 @@ use common::{
use eth_wire::{
rpc::{self, AutoRpcWallet, Rpc, RpcClient, Transaction, TransactionRequest},
taler_util::{eth_payto_url, eth_to_taler},
- RpcExtended, SyncState, SyncTransaction,
+ ListSinceSync, RpcExtended, SyncState, SyncTransaction,
};
use ethereum_types::{Address, H256, U256};
@@ -38,9 +38,9 @@ use crate::{
WireState,
};
-use super::LoopResult;
+use super::{analysis::analysis, LoopResult};
-pub fn worker(mut rpc: AutoRpcWallet, mut db: AutoReconnectDb, state: &WireState) {
+pub fn worker(mut rpc: AutoRpcWallet, mut db: AutoReconnectDb, mut state: WireState) {
let mut lifetime = state.lifetime;
let mut status = true;
let mut skip_notification = false;
@@ -90,15 +90,26 @@ pub fn worker(mut rpc: AutoRpcWallet, mut db: AutoReconnectDb, state: &WireState
return Err(LoopError::Concurrency);
}
+ // Get stored sync state
+ let row = db.query_one("SELECT value FROM state WHERE name='sync'", &[])?;
+ let sync_state = SyncState::from_bytes(&sql_array(&row, 0));
+
+ // Get changes
+ let list = rpc.list_since_sync(&state.address, sync_state, state.confirmation)?;
+
+ // Perform analysis
+ state.confirmation =
+ analysis(list.fork_len, state.confirmation, state.max_confirmations)?;
+
// Sync chain
- if sync_chain(rpc, db, state, &mut status)? {
+ if sync_chain(db, &state, &mut status, list)? {
// As we are now in sync with the blockchain if a transaction has Requested status it have not been sent
// Send requested debits
- while debit(db, rpc, state)? {}
+ while debit(db, rpc, &state)? {}
// Bump stuck transactions
- while bump(db, rpc, state)? {}
+ while bump(db, rpc, &state)? {}
// Send requested bounce
while bounce(db, rpc, state.bounce_fee)? {}
@@ -124,18 +135,13 @@ pub fn worker(mut rpc: AutoRpcWallet, mut db: AutoReconnectDb, state: &WireState
/// Parse new transactions, return true if the database is up to date with the latest mined block
fn sync_chain(
- rpc: &mut Rpc,
db: &mut Client,
state: &WireState,
status: &mut bool,
+ list: ListSinceSync,
) -> LoopResult<bool> {
- // Get stored sync state
- let row = db.query_one("SELECT value FROM state WHERE name='sync'", &[])?;
- let sync_state = SyncState::from_bytes(&sql_array(&row, 0));
- // Use the same confirmation delay for this sync
- let conf_delay = state.confirmation.load(Ordering::SeqCst);
-
- let list = rpc.list_since_sync(&state.address, sync_state, conf_delay)?;
+ // Get the current confirmation delay
+ let conf_delay = state.confirmation;
// Check if a confirmed incoming transaction have been removed by a blockchain reorganization
let new_status = sync_chain_removed(&list.txs, &list.removed, db, &state.address, conf_delay)?;
diff --git a/eth-wire/src/main.rs b/eth-wire/src/main.rs
index 5ef5a74..287cfd3 100644
--- a/eth-wire/src/main.rs
+++ b/eth-wire/src/main.rs
@@ -29,7 +29,7 @@ use eth_wire::{
SyncState, WireState,
};
use ethereum_types::H160;
-use loops::{analysis::analysis, watcher::watcher, worker::worker, LoopResult};
+use loops::{watcher::watcher, worker::worker, LoopResult};
mod fail_point;
mod loops;
@@ -142,21 +142,14 @@ fn init(config: Option<PathBuf>, init: Init) -> LoopResult<()> {
fn run(config: Option<PathBuf>) {
let state = WireState::load_taler_config(config.as_deref());
- let state: &'static _ = Box::leak(Box::new(state));
let rpc_worker = auto_rpc_wallet(state.ipc_path.clone(), state.address);
- let rpc_analysis = auto_rpc_common(state.ipc_path.clone());
let rpc_watcher = auto_rpc_common(state.ipc_path.clone());
let db_watcher = auto_reconnect_db(state.db_config.clone());
- let db_analysis = auto_reconnect_db(state.db_config.clone());
let db_worker = auto_reconnect_db(state.db_config.clone());
named_spawn("watcher", move || watcher(rpc_watcher, db_watcher));
- named_spawn("analysis", move || {
- analysis(rpc_analysis, db_analysis, state)
- });
-
worker(rpc_worker, db_worker, state);
info!("eth-wire stopped");
}