diff options
author | Antoine A <> | 2022-02-08 15:31:37 +0100 |
---|---|---|
committer | Antoine A <> | 2022-02-08 15:31:37 +0100 |
commit | 0a63ad8bcd97be33fbfa87e358de3bd99891300c (patch) | |
tree | 211b69bb6bc7d4bdfedad3af6b0c20aca3bc2618 /eth-wire | |
parent | f1aaee3fa20cfaa595eb4d3716d32d918b3c2dae (diff) | |
download | depolymerization-0a63ad8bcd97be33fbfa87e358de3bd99891300c.tar.gz depolymerization-0a63ad8bcd97be33fbfa87e358de3bd99891300c.tar.bz2 depolymerization-0a63ad8bcd97be33fbfa87e358de3bd99891300c.zip |
eth-wire: handle RPC and database reconnection
Diffstat (limited to 'eth-wire')
-rw-r--r-- | eth-wire/src/bin/eth-wire-utils.rs | 40 | ||||
-rw-r--r-- | eth-wire/src/loops/watcher.rs | 25 | ||||
-rw-r--r-- | eth-wire/src/loops/worker.rs | 130 | ||||
-rw-r--r-- | eth-wire/src/main.rs | 38 | ||||
-rw-r--r-- | eth-wire/src/rpc.rs | 22 |
5 files changed, 204 insertions, 51 deletions
diff --git a/eth-wire/src/bin/eth-wire-utils.rs b/eth-wire/src/bin/eth-wire-utils.rs index 26f5ddd..1d72dd7 100644 --- a/eth-wire/src/bin/eth-wire-utils.rs +++ b/eth-wire/src/bin/eth-wire-utils.rs @@ -15,10 +15,17 @@ */ use std::{path::PathBuf, str::FromStr}; -use common::{api_common::Amount, log::init, rand_slice}; +use common::{ + api_common::Amount, + config::{Config, CoreConfig}, + log::init, + postgres::{Client, NoTls}, + rand_slice, +}; use eth_wire::{ rpc::{hex::Hex, Rpc, TransactionRequest}, taler_util::taler_to_eth, + SyncState, }; use ethereum_types::H160; @@ -38,7 +45,7 @@ enum Cmd { Send(SendCmd), Deposit(DepositCmd), Mine(MineCmd), - ClearDB(ClearCmd), + ResetDb(ResetCmd), Balance(BalanceCmd), Connect(ConnectCmd), Disconnect(DisconnectCmd), @@ -99,12 +106,12 @@ struct MineCmd { } #[derive(argh::FromArgs)] -#[argh(subcommand, name = "cleardb")] +#[argh(subcommand, name = "resetdb")] /// Clear database -struct ClearCmd { +struct ResetCmd { #[argh(positional)] /// taler config - config: PathBuf, + config: String, } #[derive(argh::FromArgs)] @@ -197,7 +204,28 @@ fn main() { let balance = rpc.balance(&addr).unwrap(); println!("{}", (balance / 10_000_000_000u64).as_u64()); } - Cmd::ClearDB(_) => todo!(), + Cmd::ResetDb(ResetCmd { config }) => { + let config = CoreConfig::load_taler_config(Some(&config)); + let block = rpc.earliest_block().unwrap(); + let mut db = Client::connect(&config.db_url, NoTls).unwrap(); + let mut tx = db.transaction().unwrap(); + // Clear transaction tables and reset state + tx.execute("DELETE FROM tx_in", &[]).unwrap(); + tx.execute("DELETE FROM tx_out", &[]).unwrap(); + tx.execute("DELETE FROM bounce", &[]).unwrap(); + tx.execute( + "UPDATE state SET value=$1 WHERE name='sync'", + &[&SyncState { + tip_hash: block.hash.unwrap(), + tip_height: block.number.unwrap(), + conf_height: block.number.unwrap(), + } + .to_bytes() + .as_ref()], + ) + .unwrap(); + tx.commit().unwrap(); + } Cmd::Connect(ConnectCmd { datadir }) => { let mut peer = Rpc::new(datadir.join("geth.ipc")).unwrap(); let mut enode = peer.node_info().unwrap().enode; diff --git a/eth-wire/src/loops/watcher.rs b/eth-wire/src/loops/watcher.rs index 42b3ecb..05396ab 100644 --- a/eth-wire/src/loops/watcher.rs +++ b/eth-wire/src/loops/watcher.rs @@ -13,13 +13,26 @@ You should have received a copy of the GNU Affero General Public License along with TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use eth_wire::rpc::Rpc; -use common::postgres::Client; +use common::{log::log::error, reconnect::AutoReconnectDb}; +use eth_wire::rpc::AutoReconnectRPC; -pub fn watcher(mut rpc: Rpc, mut db: Client) { - let mut notifier = rpc.subscribe_new_head().unwrap(); +use crate::LoopResult; + +/// Wait for new block and notify arrival with postgreSQL notifications +pub fn watcher(mut rpc: AutoReconnectRPC, mut db: AutoReconnectDb) { loop { - db.execute("NOTIFY new_block", &[]).unwrap(); - notifier.next().unwrap(); + let rpc = rpc.client(); + let db = db.client(); + + let result: LoopResult<()> = (|| { + let mut notifier = rpc.subscribe_new_head()?; + loop { + db.execute("NOTIFY new_block", &[])?; + notifier.next()?; + } + })(); + if let Err(e) = result { + error!("watcher: {}", e); + } } } diff --git a/eth-wire/src/loops/worker.rs b/eth-wire/src/loops/worker.rs index b812c10..8f70a24 100644 --- a/eth-wire/src/loops/worker.rs +++ b/eth-wire/src/loops/worker.rs @@ -17,14 +17,15 @@ use std::{fmt::Write, sync::atomic::Ordering, time::SystemTime}; use common::{ api_common::base32, - log::log::{error, info}, + log::log::{error, info, warn}, postgres::{fallible_iterator::FallibleIterator, Client}, + reconnect::AutoReconnectDb, sql::{sql_array, sql_url}, status::{BounceStatus, WithdrawStatus}, }; use eth_wire::{ - metadata::InMetadata, - rpc::{self, Rpc, Transaction}, + metadata::{InMetadata, OutMetadata}, + rpc::{self, AutoReconnectRPC, Rpc, Transaction}, taler_util::{eth_payto_url, eth_to_taler}, SyncState, }; @@ -35,7 +36,7 @@ use crate::{ LoopResult, WireState, }; -pub fn worker(mut rpc: Rpc, mut db: Client, state: &WireState) { +pub fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectDb, state: &WireState) { let mut lifetime = state.config.wire_lifetime; let mut status = true; let mut skip_notification = false; @@ -51,6 +52,10 @@ pub fn worker(mut rpc: Rpc, mut db: Client, state: &WireState) { } } + // Connect + let rpc = rpc.client(); + let db = db.client(); + let result: LoopResult<()> = (|| { // Listen to all channels db.batch_execute("LISTEN new_block; LISTEN new_tx")?; @@ -66,18 +71,23 @@ pub fn worker(mut rpc: Rpc, mut db: Client, state: &WireState) { while iter.next()?.is_some() {} } - sync_chain(&mut rpc, &mut db, state, &mut status)?; + // Sync chain + sync_chain(rpc, db, state, &mut status)?; + + // As we are now in sync with the blockchain if a transaction has Requested status it have not been sent - while withdraw(&mut db, &mut rpc, state)? {} + // Send requested withdraws + while withdraw(db, rpc, state)? {} - while bounce(&mut db, &mut rpc, U256::from(state.config.bounce_fee))? {} + // Send requested bounce + while bounce(db, rpc, U256::from(state.config.bounce_fee))? {} Ok(()) })(); if let Err(e) = result { error!("worker: {}", e); - skip_notification = false; + skip_notification = true; } else { skip_notification = false; } @@ -150,6 +160,7 @@ fn list_since_block_state( )) } +/// Parse new transactions, return true if the database is up to date with the latest mined block fn sync_chain( rpc: &mut Rpc, db: &mut Client, @@ -215,6 +226,107 @@ fn sync_chain( )?; } } + } else if tx.from == Some(state.address) { + match OutMetadata::decode(&tx.input) { + Ok(metadata) => match metadata { + OutMetadata::Withdraw { wtid, .. } => { + let amount = eth_to_taler(&tx.value); + let credit_addr = tx.to.unwrap(); + // Get previous out tx + let row = db.query_opt( + "SELECT id, status, txid FROM tx_out WHERE wtid=$1 FOR UPDATE", + &[&wtid.as_ref()], + )?; + if let Some(row) = row { + // If already in database, sync status + let row_id: i32 = row.get(0); + let status: i16 = row.get(1); + match WithdrawStatus::try_from(status as u8).unwrap() { + WithdrawStatus::Requested => { + let nb_row = db.execute( + "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3 AND status=$4", + &[ + &(WithdrawStatus::Sent as i16), + &tx.hash.as_ref(), + &row_id, + &status, + ], + )?; + if nb_row > 0 { + warn!( + ">> (recovered) {} {} in {} to {}", + amount, + base32(&wtid), + hex::encode(tx.hash), + hex::encode(credit_addr) + ); + } + } + WithdrawStatus::Sent => { /* Status is correct */ } + } + } else { + // Else add to database + let date = SystemTime::now(); + let nb = db.execute( + "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING", + &[&date, &amount.to_string(), &wtid.as_ref(), ð_payto_url(&state.address).as_ref(), ð_payto_url(&credit_addr).as_ref(), &state.config.base_url.as_ref(), &(WithdrawStatus::Sent as i16), &tx.hash.as_ref(), &None::<&[u8]>], + )?; + if nb > 0 { + warn!( + ">> (onchain) {} {} in {} to {}", + amount, + base32(&wtid), + hex::encode(tx.hash), + hex::encode(credit_addr) + ); + } + } + } + OutMetadata::Bounce { bounced } => { + // Get previous bounce + let row = db.query_opt( + "SELECT id, status FROM bounce WHERE bounced=$1", + &[&bounced.as_ref()], + )?; + if let Some(row) = row { + // If already in database, sync status + let row_id: i32 = row.get(0); + let status: i16 = row.get(1); + match BounceStatus::try_from(status as u8).unwrap() { + BounceStatus::Requested => { + let nb_row = db.execute( + "UPDATE bounce SET status=$1, txid=$2 WHERE id=$3 AND status=$4", + &[&(BounceStatus::Sent as i16), &tx.hash.as_ref(), &row_id, &status], + )?; + if nb_row > 0 { + warn!( + "|| (recovered) {} in {}", + &bounced, + hex::encode(tx.hash) + ); + } + } + BounceStatus::Ignored => error!( + "watcher: ignored bounce {} found in chain at {}", + bounced, + hex::encode(tx.hash) + ), + BounceStatus::Sent => { /* Status is correct */ } + } + } else { + // Else add to database + let nb = db.execute( + "INSERT INTO bounce (bounced, txid, status) VALUES ($1, $2, $3) ON CONFLICT (txid) DO NOTHING", + &[&bounced.as_ref(), &tx.hash.as_ref(), &(BounceStatus::Sent as i16)], + )?; + if nb > 0 { + warn!("|| (onchain) {} in {}", &bounced, hex::encode(tx.hash)); + } + } + } + }, + Err(_) => { /* Ignore */ } + } } } @@ -238,7 +350,7 @@ fn sync_chain_removed( // - An incoming invalid transactions already bounced // Those two cases can compromise ethereum backing // Removed outgoing transactions will be retried automatically by the node - + let mut blocking_deposit = Vec::new(); let mut blocking_bounce = Vec::new(); diff --git a/eth-wire/src/main.rs b/eth-wire/src/main.rs index 8165c4a..65e1a23 100644 --- a/eth-wire/src/main.rs +++ b/eth-wire/src/main.rs @@ -18,10 +18,11 @@ use std::sync::atomic::AtomicU16; use common::{ config::{load_eth_config, EthConfig}, - postgres::{self, Client, NoTls}, + postgres, + reconnect::auto_reconnect_db, }; use eth_wire::{ - rpc::{self, Rpc}, + rpc::{self, auto_reconnect_rpc}, taler_util::eth_payto_addr, }; use ethereum_types::H160; @@ -62,33 +63,12 @@ fn main() { config, })); - let mut rpc_worker = Rpc::new( - state - .config - .core - .data_dir - .as_ref() - .unwrap() - .join("geth.ipc"), - ) - .unwrap(); - - rpc_worker - .unlock_account(&state.address, "password") - .unwrap(); - - let rpc_watcher = Rpc::new( - state - .config - .core - .data_dir - .as_ref() - .unwrap() - .join("geth.ipc"), - ) - .unwrap(); - let db_watcher = Client::connect(&state.config.core.db_url, NoTls).unwrap(); - let db_worker = Client::connect(&state.config.core.db_url, NoTls).unwrap(); + let rpc_worker = auto_reconnect_rpc(state.config.core.data_dir.clone().unwrap(), state.address); + let rpc_watcher = + auto_reconnect_rpc(state.config.core.data_dir.clone().unwrap(), state.address); + + let db_watcher = auto_reconnect_db(state.config.core.db_url.clone()); + let db_worker = auto_reconnect_db(state.config.core.db_url.clone()); std::thread::spawn(move || watcher(rpc_watcher, db_watcher)); diff --git a/eth-wire/src/rpc.rs b/eth-wire/src/rpc.rs index e626cd6..cff9b86 100644 --- a/eth-wire/src/rpc.rs +++ b/eth-wire/src/rpc.rs @@ -19,7 +19,7 @@ //! We only parse the thing we actually use, this reduce memory usage and //! make our code more compatible with future deprecation -use common::url::Url; +use common::{log::log::error, reconnect::AutoReconnect, url::Url}; use ethereum_types::{Address, H256, U256, U64}; use serde::de::DeserializeOwned; use serde_json::error::Category; @@ -33,6 +33,22 @@ use std::{ use self::hex::Hex; +pub type AutoReconnectRPC = AutoReconnect<(PathBuf, Address), Rpc>; + +pub fn auto_reconnect_rpc(data_dir: PathBuf, address: Address) -> AutoReconnectRPC { + AutoReconnect::new( + (data_dir.join("geth.ipc"), address), + |(path, address)| { + let mut rpc = Rpc::new(path) + .map_err(|err| error!("connect RPC: {}", err)) + .ok()?; + rpc.unlock_account(address, "password").ok()?; + Some(rpc) + }, + |client| client.node_info().is_err(), + ) +} + #[derive(Debug, serde::Serialize)] struct RpcRequest<'a, T: serde::Serialize> { method: &'a str, @@ -232,6 +248,10 @@ impl Rpc { self.call("eth_getBlockByNumber", &("latest", &true)) } + pub fn earliest_block(&mut self) -> Result<Block> { + self.call("eth_getBlockByNumber", &("earliest", &true)) + } + pub fn balance(&mut self, addr: &Address) -> Result<U256> { self.call("eth_getBalance", &(addr, "latest")) } |