commit f1aaee3fa20cfaa595eb4d3716d32d918b3c2dae
parent 835931410dcce1d49dab2caf9b5b1492ba3d30ff
Author: Antoine A <>
Date: Mon, 7 Feb 2022 20:13:50 +0100
Add eth-wire reorg handling and fix btc-wire bounce reorg handling
Diffstat:
13 files changed, 443 insertions(+), 104 deletions(-)
diff --git a/btc-wire/src/loops/worker.rs b/btc-wire/src/loops/worker.rs
@@ -14,7 +14,7 @@
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
use std::{
- collections::{HashMap, HashSet},
+ collections::HashMap,
fmt::Write,
sync::atomic::Ordering,
time::{Duration, SystemTime},
@@ -50,8 +50,8 @@ use super::{LoopError, LoopResult};
pub fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, state: &WireState) {
let mut lifetime = state.config.wire_lifetime;
let mut status = true;
-
let mut skip_notification = false;
+
loop {
// Check lifetime
if let Some(nb) = lifetime.as_mut() {
@@ -212,7 +212,11 @@ fn sync_chain(
let min_confirmations = state.confirmation.load(Ordering::SeqCst);
// Get a set of transactions ids to parse
- let (txs, removed, lastblock): (HashMap<Txid, (Category, i32)>, HashSet<Txid>, BlockHash) = {
+ let (txs, removed, lastblock): (
+ HashMap<Txid, (Category, i32)>,
+ HashMap<Txid, (Category, i32)>,
+ BlockHash,
+ ) = {
// Get all transactions made since this block
let list = rpc.list_since_block(Some(&last_hash), min_confirmations, true)?;
// Only keep ids and category
@@ -224,15 +228,15 @@ fn sync_chain(
let removed = list
.removed
.into_iter()
- .filter_map(|tx| (tx.category == Category::Receive).then(|| tx.txid))
+ .map(|tx| (tx.txid, (tx.category, tx.confirmations)))
.collect();
(txs, removed, list.lastblock)
};
// Check if a confirmed incoming transaction have been removed by a blockchain reorganisation
-
let new_status = sync_chain_removed(&txs, &removed, rpc, db, min_confirmations as i32)?;
+ // Sync status with database
if *status != new_status {
let mut tx = db.transaction()?;
tx.execute(
@@ -242,11 +246,14 @@ fn sync_chain(
tx.execute("NOTIFY status", &[])?;
tx.commit()?;
*status = new_status;
+ if new_status {
+ info!("Recovered lost transactions");
+ }
}
-
if !new_status {
return Ok(false);
}
+
for (id, (category, confirmations)) in txs {
match category {
Category::Send => sync_chain_outgoing(&id, confirmations, rpc, db, state)?,
@@ -271,33 +278,38 @@ fn sync_chain(
/// Sync database with removed transactions, return false if bitcoin backing is compromised
fn sync_chain_removed(
txs: &HashMap<Txid, (Category, i32)>,
- removed: &HashSet<Txid>,
+ removed: &HashMap<Txid, (Category, i32)>,
rpc: &mut Rpc,
db: &mut Client,
min_confirmations: i32,
) -> LoopResult<bool> {
- // Removed transactions are correctness issue in only two cases:
+ // Removed transactions are correctness issues in only two cases:
// - An incoming valid transaction considered confirmed in the database
// - An incoming invalid transactions already bounced
// Those two cases can compromise bitcoin backing
// Removed outgoing transactions will be retried automatically by the node
- let mut blocking_receive = Vec::new();
+ let mut blocking_deposit = Vec::new();
let mut blocking_bounce = Vec::new();
- for id in removed {
+
+ // Only keep incoming transaction that are not reconfirmed
+ // TODO study risk of accepting only mined transactions for faster recovery
+ for (id, _) in removed.iter().filter(|(id, (cat, _))| {
+ *cat == Category::Receive
+ && txs
+ .get(*id)
+ .map(|(_, confirmations)| *confirmations < min_confirmations)
+ .unwrap_or(true)
+ }) {
match rpc.get_tx_segwit_key(id) {
Ok((full, key)) => {
- // Valid tx are only problematic if not confirmed in the txs list and stored stored in the database
- if txs
- .get(id)
- .map(|(_, confirmations)| *confirmations < min_confirmations)
- .unwrap_or(true)
- && db
- .query_opt("SELECT 1 FROM tx_in WHERE reserve_pub=$1", &[&key.as_ref()])?
- .is_some()
+ // Deposit are only problematic if not reconfirmed and stored in the database
+ if db
+ .query_opt("SELECT 1 FROM tx_in WHERE reserve_pub=$1", &[&key.as_ref()])?
+ .is_some()
{
let debit_addr = sender_address(rpc, &full)?;
- blocking_receive.push((key, id, debit_addr));
+ blocking_deposit.push((key, id, debit_addr));
}
}
Err(err) => match err {
@@ -318,12 +330,12 @@ fn sync_chain_removed(
}
}
- if !blocking_bounce.is_empty() || !blocking_receive.is_empty() {
+ if !blocking_bounce.is_empty() || !blocking_deposit.is_empty() {
let mut buf = "The following transaction have been removed from the blockchain, bitcoin backing is compromised until the transaction reappear:".to_string();
- for (key, id, addr) in blocking_receive {
+ for (key, id, addr) in blocking_deposit {
write!(
&mut buf,
- "\n\treceived {} in {} from {}",
+ "\n\tdeposit {} in {} from {}",
base32(&key),
id,
addr
diff --git a/btc-wire/src/rpc.rs b/btc-wire/src/rpc.rs
@@ -70,7 +70,7 @@ pub enum Error {
Bitcoin(String),
#[error("JSON: {0}")]
Json(#[from] serde_json::Error),
- #[error("No result or error")]
+ #[error("Null rpc, no result or error")]
Null,
}
diff --git a/common/src/config.rs b/common/src/config.rs
@@ -131,7 +131,7 @@ pub fn load_btc_config(path: Option<&str>) -> BtcConfig {
return config;
}
-pub type EthConfig = WireConfig<1000000, 24>;
+pub type EthConfig = WireConfig<10_000_000_000_000, 24>;
pub fn load_eth_config(path: Option<&str>) -> EthConfig {
let config = WireConfig::load_taler_config(path);
diff --git a/eth-wire/src/bin/eth-wire-cli.rs b/eth-wire/src/bin/eth-wire-cli.rs
@@ -14,12 +14,13 @@
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
-use eth_wire::{rpc::Rpc, BlockState};
-use ethereum_types::H160;
use common::{
config::{Config, CoreConfig},
- postgres::{Client, NoTls}, log::init,
+ log::init,
+ postgres::{Client, NoTls},
};
+use eth_wire::{rpc::Rpc, SyncState};
+use ethereum_types::H160;
fn main() {
init();
@@ -51,18 +52,19 @@ fn main() {
// Skip previous blocks
let block = rpc.latest_block().expect("Failed to get current block");
- let state = BlockState {
- hash: block.hash.unwrap(),
- number: block.number.unwrap(),
+ let state = SyncState {
+ tip_hash: block.hash.unwrap(),
+ tip_height: block.number.unwrap(),
+ conf_height: block.number.unwrap(),
};
let nb_row = db
.execute(
- "INSERT INTO state (name, value) VALUES ('last_block', $1) ON CONFLICT (name) DO NOTHING",
+ "INSERT INTO state (name, value) VALUES ('sync', $1) ON CONFLICT (name) DO NOTHING",
&[&state.to_bytes().as_ref()],
)
.expect("Failed to update database state");
if nb_row > 0 {
- println!("Skipped {} previous block", state.number);
+ println!("Skipped {} previous block", state.conf_height);
}
let prev_addr = db
diff --git a/eth-wire/src/bin/eth-wire-utils.rs b/eth-wire/src/bin/eth-wire-utils.rs
@@ -15,7 +15,7 @@
*/
use std::{path::PathBuf, str::FromStr};
-use common::{api_common::Amount, rand_slice, log::init};
+use common::{api_common::Amount, log::init, rand_slice};
use eth_wire::{
rpc::{hex::Hex, Rpc, TransactionRequest},
taler_util::taler_to_eth,
@@ -40,6 +40,8 @@ enum Cmd {
Mine(MineCmd),
ClearDB(ClearCmd),
Balance(BalanceCmd),
+ Connect(ConnectCmd),
+ Disconnect(DisconnectCmd),
}
#[derive(argh::FromArgs)]
@@ -102,7 +104,7 @@ struct MineCmd {
struct ClearCmd {
#[argh(positional)]
/// taler config
- config: String,
+ config: PathBuf,
}
#[derive(argh::FromArgs)]
@@ -114,9 +116,28 @@ struct BalanceCmd {
addr: String,
}
+#[derive(argh::FromArgs)]
+#[argh(subcommand, name = "connect")]
+/// Add a peer
+struct ConnectCmd {
+ #[argh(positional)]
+ /// peer datadir
+ datadir: PathBuf,
+}
+
+#[derive(argh::FromArgs)]
+#[argh(subcommand, name = "disconnect")]
+/// Remove a peer
+struct DisconnectCmd {
+ #[argh(positional)]
+ /// peer datadir
+ datadir: PathBuf,
+}
+
fn main() {
init();
let args: Args = argh::from_env();
+ let mut rpc = Rpc::new(args.datadir.unwrap().join("geth.ipc")).unwrap();
match args.cmd {
Cmd::Deposit(DepositCmd {
from,
@@ -124,7 +145,6 @@ fn main() {
amounts,
fmt,
}) => {
- let mut rpc = Rpc::new(args.datadir.unwrap().join("geth.ipc")).unwrap();
let from = H160::from_str(&from).unwrap();
let to = H160::from_str(&to).unwrap();
rpc.unlock_account(&from, "password").ok();
@@ -140,7 +160,6 @@ fn main() {
fmt,
amounts,
}) => {
- let mut rpc = Rpc::new(args.datadir.unwrap().join("geth.ipc")).unwrap();
let from = H160::from_str(&from).unwrap();
let to = H160::from_str(&to).unwrap();
rpc.unlock_account(&from, "password").ok();
@@ -159,7 +178,6 @@ fn main() {
}
}
Cmd::Mine(MineCmd { to, mut amount }) => {
- let mut rpc = Rpc::new(args.datadir.unwrap().join("geth.ipc")).unwrap();
let to = H160::from_str(&to).unwrap();
rpc.unlock_account(&to, "password").ok();
let mut notifier = rpc.subscribe_new_head().unwrap();
@@ -175,11 +193,24 @@ fn main() {
rpc.miner_stop().unwrap();
}
Cmd::Balance(BalanceCmd { addr }) => {
- let mut rpc = Rpc::new(args.datadir.unwrap().join("geth.ipc")).unwrap();
let addr = H160::from_str(&addr).unwrap();
let balance = rpc.balance(&addr).unwrap();
println!("{}", (balance / 10_000_000_000u64).as_u64());
}
Cmd::ClearDB(_) => todo!(),
+ Cmd::Connect(ConnectCmd { datadir }) => {
+ let mut peer = Rpc::new(datadir.join("geth.ipc")).unwrap();
+ let mut enode = peer.node_info().unwrap().enode;
+ // Replace ip with localhost because it is broken
+ enode.set_host(Some("127.0.0.1")).unwrap();
+ assert!(rpc.add_peer(&enode).unwrap());
+ }
+ Cmd::Disconnect(DisconnectCmd { datadir }) => {
+ let mut peer = Rpc::new(datadir.join("geth.ipc")).unwrap();
+ let mut enode = peer.node_info().unwrap().enode;
+ // Replace ip with localhost because it is broken
+ enode.set_host(Some("127.0.0.1")).unwrap();
+ assert!(rpc.remove_peer(&enode).unwrap());
+ }
}
}
diff --git a/eth-wire/src/lib.rs b/eth-wire/src/lib.rs
@@ -14,10 +14,10 @@
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
+use common::url::Url;
use ethereum_types::{Address, H256, U256, U64};
use metadata::{InMetadata, OutMetadata};
use rpc::hex::Hex;
-use common::url::Url;
pub mod metadata;
pub mod rpc;
@@ -80,43 +80,47 @@ impl rpc::Rpc {
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub struct BlockState {
- pub hash: H256,
- pub number: U64,
+pub struct SyncState {
+ pub tip_hash: H256,
+ pub tip_height: U64,
+ pub conf_height: U64,
}
-impl BlockState {
- pub fn to_bytes(&self) -> [u8; 40] {
- let mut bytes = [0; 40];
- bytes[..32].copy_from_slice(self.hash.as_bytes());
- self.number.to_little_endian(&mut bytes[32..]);
+impl SyncState {
+ pub fn to_bytes(&self) -> [u8; 48] {
+ let mut bytes = [0; 48];
+ bytes[..32].copy_from_slice(self.tip_hash.as_bytes());
+ self.tip_height.to_little_endian(&mut bytes[32..40]);
+ self.conf_height.to_little_endian(&mut bytes[40..]);
bytes
}
- pub fn from_bytes(bytes: &[u8; 40]) -> Self {
+ pub fn from_bytes(bytes: &[u8; 48]) -> Self {
Self {
- hash: H256::from_slice(&bytes[..32]),
- number: U64::from_little_endian(&bytes[32..]),
+ tip_hash: H256::from_slice(&bytes[..32]),
+ tip_height: U64::from_little_endian(&bytes[32..40]),
+ conf_height: U64::from_little_endian(&bytes[40..]),
}
}
}
#[cfg(test)]
mod test {
- use ethereum_types::{H256, U64};
use common::{rand::random, rand_slice};
+ use ethereum_types::{H256, U64};
- use crate::BlockState;
+ use crate::SyncState;
#[test]
fn to_from_bytes_block_state() {
for _ in 0..4 {
- let state = BlockState {
- hash: H256::from_slice(&rand_slice::<32>()),
- number: U64::from(random::<u64>()),
+ let state = SyncState {
+ tip_hash: H256::from_slice(&rand_slice::<32>()),
+ tip_height: U64::from(random::<u64>()),
+ conf_height: U64::from(random::<u64>()),
};
let encoded = state.to_bytes();
- let decoded = BlockState::from_bytes(&encoded);
+ let decoded = SyncState::from_bytes(&encoded);
assert_eq!(state, decoded);
}
}
diff --git a/eth-wire/src/loops/worker.rs b/eth-wire/src/loops/worker.rs
@@ -13,7 +13,7 @@
You should have received a copy of the GNU Affero General Public License along with
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
-use std::{sync::atomic::Ordering, time::SystemTime};
+use std::{fmt::Write, sync::atomic::Ordering, time::SystemTime};
use common::{
api_common::base32,
@@ -26,9 +26,9 @@ use eth_wire::{
metadata::InMetadata,
rpc::{self, Rpc, Transaction},
taler_util::{eth_payto_url, eth_to_taler},
- BlockState,
+ SyncState,
};
-use ethereum_types::{Address, H256, U256};
+use ethereum_types::{Address, H256, U256, U64};
use crate::{
sql::{sql_addr, sql_eth_amount, sql_hash},
@@ -37,7 +37,9 @@ use crate::{
pub fn worker(mut rpc: Rpc, mut db: Client, state: &WireState) {
let mut lifetime = state.config.wire_lifetime;
+ let mut status = true;
let mut skip_notification = false;
+
loop {
// Check lifetime
if let Some(nb) = lifetime.as_mut() {
@@ -64,7 +66,7 @@ pub fn worker(mut rpc: Rpc, mut db: Client, state: &WireState) {
while iter.next()?.is_some() {}
}
- sync_chain(&mut rpc, &mut db, state)?;
+ sync_chain(&mut rpc, &mut db, state, &mut status)?;
while withdraw(&mut db, &mut rpc, state)? {}
@@ -85,9 +87,9 @@ pub fn worker(mut rpc: Rpc, mut db: Client, state: &WireState) {
fn list_since_block_state(
rpc: &mut Rpc,
address: &Address,
- state: BlockState,
+ state: SyncState,
min_confirmation: u16,
-) -> LoopResult<Option<(Vec<(Transaction, u16)>, BlockState)>> {
+) -> LoopResult<(Vec<(Transaction, u16)>, Vec<(Transaction, u16)>, SyncState)> {
let match_tx = |txs: Vec<Transaction>, conf: u16| -> Vec<(Transaction, u16)> {
txs.into_iter()
.filter_map(|tx| {
@@ -97,45 +99,91 @@ fn list_since_block_state(
};
let mut txs = Vec::new();
+ let mut removed = Vec::new();
+
// Add pending transaction
txs.extend(match_tx(rpc.pending_transactions()?, 0));
- let mut next_state = state;
+ let latest = rpc.latest_block()?;
+
let mut confirmation = 1;
- let mut current = rpc.latest_block()?;
-
- // Move backward until we reach the starting block
- while current.number.expect("Mined block") != state.number {
- if confirmation == min_confirmation {
- next_state = BlockState {
- hash: current.hash.unwrap(),
- number: current.number.unwrap(),
- };
- }
- txs.extend(match_tx(current.transactions, confirmation));
- if let Some(block) = rpc.block(¤t.parent_hash)? {
- current = block;
- } else {
- return Ok(None);
- }
+ let mut chain_cursor = latest.clone();
+
+ // Move until tip height
+ while chain_cursor.number.unwrap() != state.tip_height {
+ txs.extend(match_tx(chain_cursor.transactions, confirmation));
+ chain_cursor = rpc.block(&chain_cursor.parent_hash)?.unwrap();
confirmation += 1;
}
- if current.hash.unwrap() != state.hash {
- return Ok(None);
+ // Check if fork
+ if chain_cursor.hash.unwrap() != state.tip_hash {
+ let mut fork_cursor = rpc.block(&state.tip_hash)?.unwrap();
+ // Move until found common parent
+ while fork_cursor.hash != chain_cursor.hash {
+ txs.extend(match_tx(chain_cursor.transactions, confirmation));
+ removed.extend(match_tx(fork_cursor.transactions, confirmation));
+ chain_cursor = rpc.block(&chain_cursor.parent_hash)?.unwrap();
+ fork_cursor = rpc.block(&fork_cursor.parent_hash)?.unwrap();
+ confirmation += 1;
+ }
}
- Ok(Some((txs, next_state)))
+ // Move until last conf
+ while chain_cursor.number.unwrap() > state.conf_height {
+ txs.extend(match_tx(chain_cursor.transactions, confirmation));
+ chain_cursor = rpc.block(&chain_cursor.parent_hash)?.unwrap();
+ confirmation += 1;
+ }
+
+ Ok((
+ txs,
+ removed,
+ SyncState {
+ tip_hash: latest.hash.unwrap(),
+ tip_height: latest.number.unwrap(),
+ conf_height: latest
+ .number
+ .unwrap()
+ .saturating_sub(U64::from(min_confirmation)),
+ },
+ ))
}
-fn sync_chain(rpc: &mut Rpc, db: &mut Client, state: &WireState) -> LoopResult<bool> {
- let row = db.query_one("SELECT value FROM state WHERE name='last_block'", &[])?;
+fn sync_chain(
+ rpc: &mut Rpc,
+ db: &mut Client,
+ state: &WireState,
+ status: &mut bool,
+) -> LoopResult<bool> {
+ let row = db.query_one("SELECT value FROM state WHERE name='sync'", &[])?;
let slice: &[u8] = row.get(0);
- let block = BlockState::from_bytes(slice.try_into().unwrap());
+ let block = SyncState::from_bytes(slice.try_into().unwrap());
let min_confirmations = state.confirmation.load(Ordering::SeqCst);
- let (txs, next_state) =
- list_since_block_state(rpc, &state.address, block, min_confirmations)?.unwrap();
+ let (txs, removed, next_state) =
+ list_since_block_state(rpc, &state.address, block, min_confirmations)?;
+
+ // Check if a confirmed incoming transaction have been removed by a blockchain reorganisation
+ let new_status = sync_chain_removed(&txs, &removed, db, &state.address, min_confirmations)?;
+
+ // Sync status with database
+ if *status != new_status {
+ let mut tx = db.transaction()?;
+ tx.execute(
+ "UPDATE state SET value=$1 WHERE name='status'",
+ &[&[new_status as u8].as_ref()],
+ )?;
+ tx.execute("NOTIFY status", &[])?;
+ tx.commit()?;
+ *status = new_status;
+ if new_status {
+ info!("Recovered lost transactions");
+ }
+ }
+ if !new_status {
+ return Ok(false);
+ }
for (tx, confirmation) in txs {
if tx.to == Some(state.address) && confirmation >= min_confirmations {
@@ -171,12 +219,95 @@ fn sync_chain(rpc: &mut Rpc, db: &mut Client, state: &WireState) -> LoopResult<b
}
db.execute(
- "UPDATE state SET value=$1 WHERE name='last_block'",
+ "UPDATE state SET value=$1 WHERE name='sync'",
&[&next_state.to_bytes().as_ref()],
)?;
Ok(true)
}
+/// Sync database with removed transactions, return false if bitcoin backing is compromised
+fn sync_chain_removed(
+ txs: &[(Transaction, u16)],
+ removed: &[(Transaction, u16)],
+ db: &mut Client,
+ addr: &Address,
+ min_confirmation: u16,
+) -> LoopResult<bool> {
+ // Removed transactions are correctness issues in only two cases:
+ // - An incoming valid transaction considered confirmed in the database
+ // - An incoming invalid transactions already bounced
+ // Those two cases can compromise ethereum backing
+ // Removed outgoing transactions will be retried automatically by the node
+
+ let mut blocking_deposit = Vec::new();
+ let mut blocking_bounce = Vec::new();
+
+ // Only keep incoming transaction that are not reconfirmed
+ // TODO study risk of accepting only mined transactions for faster recovery
+ for (tx, _) in removed.iter().filter(|(tx, _)| {
+ tx.to == Some(*addr)
+ && txs
+ .iter()
+ .all(|(t, conf)| t.hash != tx.hash || *conf < min_confirmation)
+ }) {
+ match InMetadata::decode(&tx.input) {
+ Ok(metadata) => match metadata {
+ InMetadata::Deposit { reserve_pub } => {
+ // Deposit are only problematic if not reconfirmed and stored in the database
+ if db
+ .query_opt(
+ "SELECT 1 FROM tx_in WHERE reserve_pub=$1",
+ &[&reserve_pub.as_ref()],
+ )?
+ .is_some()
+ {
+ blocking_deposit.push((reserve_pub, tx.hash, tx.from.unwrap()));
+ }
+ }
+ },
+ Err(_) => {
+ // Invalid tx are only problematic if if not reconfirmed and already bounced
+ if let Some(row) = db.query_opt(
+ "SELECT txid FROM bounce WHERE bounced=$1 AND txid IS NOT NULL",
+ &[&tx.hash.as_ref()],
+ )? {
+ blocking_bounce.push((sql_hash(&row, 0), tx.hash));
+ } else {
+ // Remove transaction from bounce table
+ db.execute("DELETE FROM bounce WHERE bounced=$1", &[&tx.hash.as_ref()])?;
+ }
+ }
+ }
+ }
+
+ if !blocking_bounce.is_empty() || !blocking_deposit.is_empty() {
+ let mut buf = "The following transaction have been removed from the blockchain, ethereum backing is compromised until the transaction reappear:".to_string();
+ for (key, id, addr) in blocking_deposit {
+ write!(
+ &mut buf,
+ "\n\tdeposit {} in {} from {}",
+ base32(&key),
+ hex::encode(id),
+ hex::encode(addr)
+ )
+ .unwrap();
+ }
+ for (id, bounced) in blocking_bounce {
+ write!(
+ &mut buf,
+ "\n\tbounce {} in {}",
+ hex::encode(id),
+ hex::encode(bounced)
+ )
+ .unwrap();
+ }
+ error!("{}", buf);
+ return Ok(false);
+ } else {
+ return Ok(true);
+ }
+}
+
/// Send a withdraw transaction on the blockchain, return false if no more requested transaction are found
fn withdraw(db: &mut Client, rpc: &mut Rpc, state: &WireState) -> LoopResult<bool> {
// We rely on the advisory lock to ensure we are the only one sending transactions
diff --git a/eth-wire/src/rpc.rs b/eth-wire/src/rpc.rs
@@ -19,6 +19,7 @@
//! We only parse the thing we actually use, this reduce memory usage and
//! make our code more compatible with future deprecation
+use common::url::Url;
use ethereum_types::{Address, H256, U256, U64};
use serde::de::DeserializeOwned;
use serde_json::error::Category;
@@ -60,7 +61,7 @@ pub enum Error {
RPC { code: i64, msg: String },
#[error("JSON: {0}")]
Json(#[from] serde_json::Error),
- #[error("No result or error")]
+ #[error("Null rpc, no result or error")]
Null,
}
@@ -186,19 +187,21 @@ impl Rpc {
}
pub fn get_transaction(&mut self, hash: &H256) -> Result<Option<Transaction>> {
- self.call("eth_getTransactionByHash", &[hash])
+ match self.call("eth_getTransactionByHash", &[hash]) {
+ Err(Error::Null) => Ok(None),
+ r => r,
+ }
}
pub fn send_transaction(&mut self, params: &TransactionRequest) -> Result<H256> {
self.call("eth_sendTransaction", &[params])
}
- pub fn block_at(&mut self, nb: &U64) -> Result<Option<Block>> {
- self.call("eth_getBlockByNumber", &(nb, &true))
- }
-
pub fn block(&mut self, hash: &H256) -> Result<Option<Block>> {
- self.call("eth_getBlockByHash", &(hash, &true))
+ match self.call("eth_getBlockByHash", &(hash, &true)) {
+ Err(Error::Null) => Ok(None),
+ r => r,
+ }
}
pub fn pending_transactions(&mut self) -> Result<Vec<Transaction>> {
@@ -232,6 +235,20 @@ impl Rpc {
pub fn balance(&mut self, addr: &Address) -> Result<U256> {
self.call("eth_getBalance", &(addr, "latest"))
}
+
+ /* ----- Peer management ----- */
+
+ pub fn node_info(&mut self) -> Result<NodeInfo> {
+ self.call("admin_nodeInfo", &EMPTY)
+ }
+
+ pub fn add_peer(&mut self, url: &Url) -> Result<bool> {
+ self.call("admin_addPeer", &[url])
+ }
+
+ pub fn remove_peer(&mut self, url: &Url) -> Result<bool> {
+ self.call("admin_removePeer", &[url])
+ }
}
pub struct RpcStream<T: Debug + DeserializeOwned> {
@@ -275,7 +292,7 @@ pub enum NotifEnd<T> {
End(bool),
}
-#[derive(Debug, serde::Deserialize)]
+#[derive(Debug, Clone, serde::Deserialize)]
pub struct Block {
/// Hash of the block
pub hash: Option<H256>,
@@ -292,7 +309,7 @@ pub struct Block {
pub struct BlockHead {}
/// Description of a Transaction, pending or in the chain.
-#[derive(Debug, serde::Deserialize)]
+#[derive(Debug, Clone, serde::Deserialize)]
pub struct Transaction {
/// Hash
pub hash: H256,
@@ -326,6 +343,11 @@ pub struct TransactionRequest {
pub data: Hex,
}
+#[derive(Debug, serde::Deserialize)]
+pub struct NodeInfo {
+ pub enode: Url,
+}
+
pub mod hex {
use std::{
fmt,
diff --git a/makefile b/makefile
@@ -22,5 +22,6 @@ test_btc: install
test_eth: install
test/eth/wire.sh
test/eth/lifetime.sh
+ test/eth/reorg.sh
test: install test_gateway test_eth test_btc
\ No newline at end of file
diff --git a/test/btc/reorg.sh b/test/btc/reorg.sh
@@ -111,6 +111,7 @@ echo " OK"
echo -n "Recover orphaned transactions:"
next_btc 6 # More block needed to confirm
check_balance "*" 0.00011000
+gateway_up
echo " OK"
echo "All tests passed!"
\ No newline at end of file
diff --git a/test/common.sh b/test/common.sh
@@ -47,9 +47,11 @@ function load_config() {
if [ "$CURRENCY" == "BTC" ]; then
WIRE_CLI="btc-wire-cli"
WIRE_UTILS="btc-wire-utils -d $WIRE_DIR"
+ WIRE_UTILS2="btc-wire-utils -d $WIRE_DIR2"
else
WIRE_CLI="eth-wire-cli"
WIRE_UTILS="eth-wire-utils -d $WIRE_DIR"
+ WIRE_UTILS2="eth-wire-utils -d $WIRE_DIR2"
fi
}
@@ -92,7 +94,7 @@ function reset_db() {
# Start a bitcoind regtest node, generate money, wallet and addresses
function init_btc() {
cp ${BASH_SOURCE%/*}/conf/${BTC_CONFIG:-bitcoin.conf} $WIRE_DIR/bitcoin.conf
- bitcoind -datadir=$WIRE_DIR $* &>> log/btc.log &
+ bitcoind -datadir=$WIRE_DIR $* &>> log/node.log &
BTC_PID="$!"
# Wait for RPC server to be online
$BTC_CLI -rpcwait getnetworkinfo > /dev/null
@@ -115,7 +117,7 @@ function init_btc() {
# Start a second bitcoind regtest node connected to the first one
function init_btc2() {
cp ${BASH_SOURCE%/*}/conf/bitcoin2.conf $WIRE_DIR2/bitcoin.conf
- bitcoind -datadir=$WIRE_DIR2 $* &>> log/btc2.log &
+ bitcoind -datadir=$WIRE_DIR2 $* &>> log/node2.log &
$BTC_CLI2 -rpcwait getnetworkinfo > /dev/null
$BTC_CLI addnode 127.0.0.1:8346 onetry
}
@@ -135,7 +137,7 @@ function btc2_fork() {
# Restart a bitcoind regest server in a previously created temporary directory and load wallets
function resume_btc() {
# Restart node
- bitcoind -datadir=$WIRE_DIR $* &>> log/btc.log &
+ bitcoind -datadir=$WIRE_DIR $* &>> log/node.log &
BTC_PID="$!"
# Load wallets
for wallet in wire client reserve; do
@@ -240,15 +242,40 @@ function init_eth() {
}
}" > $DIR/genesis.json
# Initialize blockchain
- $ETH_CLI init $DIR/genesis.json &> log/eth.log
+ $ETH_CLI init $DIR/genesis.json &> log/node.log
# Start node
- $ETH_CLI --miner.gasprice 0 $* &> log/eth.log &
+ $ETH_CLI --miner.recommit 0s --miner.gasprice 0 $* &> log/node.log &
sleep 1
# Create wire address
WIRE=`eth-wire-cli initwallet $CONF | grep -oP '(?<=is ).*'`
echo -e "PAYTO = payto://ethereum/$WIRE" >> $CONF
}
+# Start a seconf geth dev node connected to the first one
+function init_eth2() {
+ # Initialize blockchain
+ $ETH_CLI2 init $DIR/genesis.json &> log/node2.log
+ # Start node
+ $ETH_CLI2 --port 30305 --miner.recommit 0s --miner.gasprice 0 $* &> log/node2.log &
+ sleep 1
+ # Create etherbase account for mining
+ $ETH_CLI2 account new --password <(echo "password") &> /dev/null
+ # Connect nodes
+ $WIRE_UTILS connect $WIRE_DIR2
+}
+
+# Disconnect the two nodes
+function eth2_deco() {
+ $WIRE_UTILS disconnect $WIRE_DIR2
+}
+
+# Create a fork on the second node and reconnect the two node
+function eth2_fork() {
+ $WIRE_UTILS2 mine $RESERVE ${1:-}
+ $WIRE_UTILS connect $WIRE_DIR2
+ sleep 5
+}
+
# Check client and wire balance
function check_balance_eth() {
local CLIENT_BALANCE=`$WIRE_UTILS balance $CLIENT`
diff --git a/test/eth/reorg.sh b/test/eth/reorg.sh
@@ -0,0 +1,108 @@
+#!/bin/bash
+
+## Test eth-wire correctness when a blockchain reorganisation occurs
+
+set -eu
+
+source "${BASH_SOURCE%/*}/../common.sh"
+SCHEMA=eth.sql
+CONFIG=taler_eth.conf
+
+echo "----- Setup -----"
+echo "Load config file"
+load_config
+echo "Start database"
+setup_db
+echo "Start ethereum node"
+init_eth
+echo "Start second ethereum node"
+init_eth2
+echo "Start eth-wire"
+eth_wire
+echo "Start gateway"
+gateway
+echo ""
+
+SEQ="seq 10 20"
+
+echo "----- Handle reorg incoming transactions -----"
+
+echo "Loose second ethereum node"
+eth2_deco
+
+echo -n "Making wire transfer to exchange:"
+eth-wire-utils -d $WIRE_DIR deposit $CLIENT $WIRE 0.000 `$SEQ`
+next_eth # Trigger eth-wire
+check_delta "incoming?delta=-100" "$SEQ" "0.000"
+check_balance_eth 999835000 165000
+echo " OK"
+
+echo -n "Perform fork and check eth-wire hard error:"
+gateway_up
+eth2_fork 10
+check_balance_eth 1000000000 0
+gateway_down
+echo " OK"
+
+echo -n "Recover orphaned transactions:"
+next_eth 6 # More block needed to confirm
+check_balance_eth 999835000 165000
+gateway_up
+echo " OK"
+
+echo "----- Handle reorg outgoing transactions -----"
+
+echo "Loose second ethereum node"
+eth2_deco
+
+echo -n "Making wire transfer from exchange:"
+for n in `$SEQ`; do
+ taler-exchange-wire-gateway-client \
+ -b $BANK_ENDPOINT \
+ -C payto://ethereum/$CLIENT \
+ -a ETH:0.0000$n > /dev/null
+done
+sleep 1
+mine_eth # Mine transactions
+check_delta "outgoing?delta=-100" "$SEQ"
+check_balance_eth 999851500 148500
+echo " OK"
+
+echo -n "Perform fork and check eth-wire still up:"
+gateway_up
+eth2_fork 10
+check_balance_eth 999835000 165000
+gateway_up
+echo " OK"
+
+echo -n "Recover orphaned transactions:"
+next_eth 6 # More block needed to confirm
+check_balance_eth 999851500 148500
+echo " OK"
+
+echo "----- Handle reorg bounce -----"
+
+echo "Loose second ethereum node"
+eth2_deco
+
+echo -n "Bounce:"
+eth-wire-utils -d $WIRE_DIR send $CLIENT $WIRE 0.000 `$SEQ`
+sleep 1
+next_eth 6
+check_balance_eth 999840500 159500
+echo " OK"
+
+echo -n "Perform fork and check eth-wire hard error:"
+gateway_up
+eth2_fork 10
+check_balance_eth 999851500 148500
+gateway_down
+echo " OK"
+
+echo -n "Recover orphaned transactions:"
+next_eth 6 # More block needed to confirm
+check_balance_eth 999840500 159500
+gateway_up
+echo " OK"
+
+echo "All tests passed!"
+\ No newline at end of file
diff --git a/test/eth/wire.sh b/test/eth/wire.sh
@@ -22,7 +22,6 @@ gateway
echo ""
SEQ="seq 10 99"
-RUST_BACKTRACE=1
echo "----- Receive -----"
@@ -60,7 +59,7 @@ echo -n "Bounce:"
eth-wire-utils -d $WIRE_DIR send $CLIENT $WIRE 0.000 `seq 10 40`
sleep 1
next_eth
-check_balance_eth 995585499 4414500
+check_balance_eth 995554500 4445500
echo " OK"
echo "All tests passed!"
\ No newline at end of file