depolymerization

wire gateway for Bitcoin/Ethereum
Log | Files | Refs | Submodules | README | LICENSE

commit b26af1f75a683f3a2d50aaf0de36b58a71ae7754
parent 43ac6ffc4186d64b039c27649af1b14c28e0b3d1
Author: Antoine A <>
Date:   Wed, 19 Jan 2022 14:58:54 +0100

Adapt defensively to blockchain forks

Diffstat:
Abtc-wire/src/loops.rs | 3+++
Abtc-wire/src/loops/analysis.rs | 66++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Abtc-wire/src/loops/listener.rs | 19+++++++++++++++++++
Abtc-wire/src/loops/worker.rs | 541+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mbtc-wire/src/main.rs | 580+++++--------------------------------------------------------------------------
Mbtc-wire/src/rpc.rs | 30+++++++++++++++++++++++++++++-
Mmakefile | 1+
Mtaler-common/src/config.rs | 2+-
Atest/btc/analysis.sh | 66++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mtest/btc/conflict.sh | 4++--
Mtest/btc/hell.sh | 35++++++-----------------------------
Mtest/btc/reorg.sh | 26++++++++------------------
Mtest/btc/stress.sh | 2+-
Mtest/common.sh | 4++--
14 files changed, 776 insertions(+), 603 deletions(-)

diff --git a/btc-wire/src/loops.rs b/btc-wire/src/loops.rs @@ -0,0 +1,3 @@ +pub mod worker; +pub mod listener; +pub mod analysis; diff --git a/btc-wire/src/loops/analysis.rs b/btc-wire/src/loops/analysis.rs @@ -0,0 +1,66 @@ +use std::sync::atomic::Ordering; + +use btc_wire::rpc::ChainTipsStatus; +use postgres::fallible_iterator::FallibleIterator; +use taler_common::{ + config::Config, + log::log::{error, warn}, +}; + +use crate::{ + reconnect::{AutoReconnectRPC, AutoReconnectSql}, + WireState, +}; + +/// Analyse blockchain behavior and adapt confirmations in real time +pub fn analysis( + mut rpc: AutoReconnectRPC, + mut db: AutoReconnectSql, + config: &Config, + state: &WireState, +) { + // The biggest fork ever seen + let mut max_conf = 0; + loop { + let rpc = rpc.client(); + let db = db.client(); + let result: Result<(), Box<dyn std::error::Error>> = (|| { + // Register as listener + db.batch_execute("LISTEN new_block")?; + loop { + // Get biggest known valid fork + let max_fork = rpc + .get_chain_tips()? + .into_iter() + .filter_map(|t| { + (t.status == ChainTipsStatus::ValidFork).then(|| t.branch_length) + }) + .max() + .unwrap_or(0) as u16; + // The first time we see a fork that big + if max_fork > max_conf { + max_conf = max_fork; + let current_conf = state.confirmation.load(Ordering::SeqCst); + // If new fork is bigger than the current confirmation + if max_fork > current_conf { + // Max two time the configuration + let new_conf = max_fork.min(config.confirmation * 2); + state.confirmation.store(new_conf, Ordering::SeqCst); + warn!( + "analysis: found dangerous fork of {} blocks, adapt confirmation to {} blocks capped at {}, you should update taler.conf", + max_fork, new_conf, config.confirmation * 2 + ); + } + } + + // TODO smarter analysis: suspicious transaction value, limit wire bitcoin throughput + + // Wait for next notification + db.notifications().blocking_iter().next()?; + } + })(); + if let Err(e) = result { + error!("analysis: {}", e); + } + } +} diff --git a/btc-wire/src/loops/listener.rs b/btc-wire/src/loops/listener.rs @@ -0,0 +1,18 @@ +use taler_common::log::log::error; + +use crate::reconnect::{AutoReconnectRPC, AutoReconnectSql}; + +/// Wait for new block and notify arrival with postgreSQL notifications +pub fn block_listener(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql) { + loop { + let rpc = rpc.client(); + let db = db.client(); + let result: Result<(), Box<dyn std::error::Error>> = (|| loop { + rpc.wait_for_new_block(0)?; + db.execute("NOTIFY new_block", &[])?; + })(); + if let Err(e) = result { + error!("listener: {}", e); + } + } +} +\ No newline at end of file diff --git a/btc-wire/src/loops/worker.rs b/btc-wire/src/loops/worker.rs @@ -0,0 +1,541 @@ +use std::{ + collections::{HashMap, HashSet}, + fmt::Write, + str::FromStr, + sync::atomic::Ordering, + time::{Duration, SystemTime}, +}; + +use bitcoin::{hashes::Hash, Address, Amount as BtcAmount, BlockHash, Txid}; +use btc_wire::{ + rpc::{self, BtcRpc, Category, ErrorCode}, + rpc_utils::sender_address, + GetOpReturnErr, GetSegwitErr, +}; +use postgres::{fallible_iterator::FallibleIterator, Client}; +use taler_common::{ + api_common::{base32, Amount}, + config::Config, + log::log::{error, info, warn}, +}; +use url::Url; + +use crate::{ + fail_point::fail_point, + info::{decode_info, encode_info, Info}, + reconnect::{AutoReconnectRPC, AutoReconnectSql}, + status::{BounceStatus, TxStatus}, + taler_util::{btc_payto_addr, btc_payto_url, btc_to_taler, taler_to_btc}, + WireState, +}; + +/// Listen for new proposed transactions and announce them on the bitcoin network +pub fn worker( + mut rpc: AutoReconnectRPC, + mut db: AutoReconnectSql, + config: &Config, + state: &WireState, +) { + /// Send a transaction on the blockchain, return true if more transactions with the same status remains + fn send( + db: &mut Client, + rpc: &mut BtcRpc, + status: TxStatus, + ) -> Result<bool, Box<dyn std::error::Error>> { + assert!(status == TxStatus::Delayed || status == TxStatus::Requested); + let mut tx = db.transaction()?; + // We lock the row with FOR UPDATE to prevent sending same transaction multiple time + let row = tx.query_opt( + "SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 LIMIT 1 FOR UPDATE", + &[&(status as i16)], + )?; + if let Some(row) = &row { + let id: i32 = row.get(0); + let amount = taler_to_btc(&Amount::from_str(row.get(1))?)?; + let wtid: &[u8] = row.get(2); + let addr: Address = btc_payto_addr(&Url::parse(row.get(3))?)?; + let exchange_base_url: Url = Url::parse(row.get(4))?; + let info = Info::Transaction { + wtid: wtid.try_into()?, + url: exchange_base_url, + }; + let metadata = encode_info(&info); + + fail_point("(injected) fail send_op_return", 0.2)?; + match rpc.send_op_return(&addr, &amount, &metadata, false) { + Ok(tx_id) => { + fail_point("(injected) fail update db", 0.2)?; + tx.execute( + "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3", + &[&(TxStatus::Sent as i16), &tx_id.as_ref(), &id], + )?; + tx.commit()?; + let amount = btc_to_taler(&amount.to_signed().unwrap()); + info!(">> {} {} in {} to {}", amount, base32(wtid), tx_id, addr); + } + Err(e) => { + tx.execute( + "UPDATE tx_out SET status=$1 WHERE id=$2", + &[&(TxStatus::Delayed as i16), &id], + )?; + tx.commit()?; + info!("sender: RPC - {}", e); + } + } + } + Ok(row.is_some()) + } + + /// Bounce a transaction on the blockchain, return true if more bounce with the same status remains + fn bounce( + db: &mut Client, + rpc: &mut BtcRpc, + status: BounceStatus, + fee: &BtcAmount, + ) -> Result<bool, Box<dyn std::error::Error>> { + assert!(status == BounceStatus::Delayed || status == BounceStatus::Requested); + let mut tx = db.transaction()?; + // We lock the row with FOR UPDATE to prevent sending same transaction multiple time + let row = tx.query_opt( + "SELECT id, bounced FROM bounce WHERE status=$1 LIMIT 1 FOR UPDATE", + &[&(status as i16)], + )?; + if let Some(row) = &row { + let id: i32 = row.get(0); + let bounced: Txid = Txid::from_slice(row.get(1))?; + let info = Info::Bounce { bounced }; + let metadata = encode_info(&info); + + fail_point("(injected) fail bounce", 0.2)?; + match rpc.bounce(&bounced, fee, &metadata) { + Ok(it) => { + tx.execute( + "UPDATE bounce SET txid = $1, status = $2 WHERE id = $3", + &[&it.as_ref(), &(BounceStatus::Sent as i16), &id], + )?; + tx.commit()?; + info!("|| {} in {}", &bounced, &it); + } + Err(err) => match err { + rpc::Error::RPC { + code: ErrorCode::RpcWalletInsufficientFunds | ErrorCode::RpcWalletError, + msg, + } => { + tx.execute( + "UPDATE bounce SET status = $1 WHERE id = $2", + &[&(BounceStatus::Ignored as i16), &id], + )?; + tx.commit()?; + info!("|| (ignore) {} because {}", &bounced, msg); + } + _ => Err(err)?, + }, + } + } + Ok(row.is_some()) + } + + // TODO check if transactions are abandoned + + let mut lifetime = config.btc_lifetime; + let mut status = true; + + // Alway start with a sync work + let mut skip_notification = true; + loop { + // Check lifetime + if let Some(nb) = lifetime.as_mut() { + if *nb == 0 { + info!("Reach end of lifetime"); + return; + } else { + *nb -= 1; + } + } + + // Connect + let rpc = rpc.client(); + let db = db.client(); + + let result: Result<(), Box<dyn std::error::Error>> = (|| { + // Listen to all channels + db.batch_execute("LISTEN new_block; LISTEN new_tx")?; + // Wait for the next notification + { + let mut ntf = db.notifications(); + if !skip_notification && ntf.is_empty() { + // Block until next notification + ntf.blocking_iter().next()?; + } + // Conflate all notifications + let mut iter = ntf.iter(); + while iter.next()?.is_some() {} + } + // Sync chain + sync_chain(rpc, db, config, state, &mut status)?; + + // As we are now in sync with the blockchain if a transaction is in requested or delayed state it have not been sent + + // Send delayed transactions + while send(db, rpc, TxStatus::Delayed)? {} + // Send requested transactions + while send(db, rpc, TxStatus::Requested)? {} + + let bounce_fee = BtcAmount::from_sat(config.bounce_fee); + // Send delayed bounce + while bounce(db, rpc, BounceStatus::Delayed, &bounce_fee)? {} + // Send requested bounce + while bounce(db, rpc, BounceStatus::Requested, &bounce_fee)? {} + + Ok(()) + })(); + if let Err(e) = result { + error!("worker: {}", e); + // On failure retry without waiting for notifications + skip_notification = true; + } else { + skip_notification = false; + } + } +} + +/// Retrieve last stored hash +fn last_hash(db: &mut Client) -> Result<Option<BlockHash>, postgres::Error> { + Ok(db + .query_opt("SELECT value FROM state WHERE name='last_hash'", &[])? + .map(|r| BlockHash::from_slice(r.get(0)).unwrap())) +} + +/// Parse new transactions, return true if the database is up to date with the latest mined block +fn sync_chain( + rpc: &mut BtcRpc, + db: &mut Client, + config: &Config, + state: &WireState, + status: &mut bool, +) -> Result<bool, Box<dyn std::error::Error>> { + // Get stored last_hash + let last_hash = last_hash(db)?; + let min_confirmations = state.confirmation.load(Ordering::SeqCst); + + // Get a set of transactions ids to parse + let (txs, removed, lastblock): (HashMap<Txid, (Category, i32)>, HashSet<Txid>, BlockHash) = { + // Get all transactions made since this block + let list = rpc.list_since_block(last_hash.as_ref(), min_confirmations, true)?; + // Only keep ids and category + let txs = list + .transactions + .into_iter() + .map(|tx| (tx.txid, (tx.category, tx.confirmations))) + .collect(); + let removed = list + .removed + .into_iter() + .filter_map(|tx| (tx.category == Category::Receive).then(|| tx.txid)) + .collect(); + (txs, removed, list.lastblock) + }; + + // Check if a confirmed incoming transaction have been removed by a blockchain reorganisation + + let new_status = sync_chain_removed(&txs, &removed, rpc, db, min_confirmations as i32)?; + + if *status != new_status { + let mut tx = db.transaction()?; + tx.execute( + "UPDATE state SET value=$1 WHERE name='status'", + &[&[new_status as u8].as_ref()], + )?; + tx.execute("NOTIFY status", &[])?; + tx.commit()?; + *status = new_status; + } + + if !new_status { + return Ok(false); + } + for (id, (category, confirmations)) in txs { + match category { + Category::Send => sync_chain_outgoing(&id, confirmations, rpc, db, config)?, + Category::Receive if confirmations >= min_confirmations as i32 => { + sync_chain_incoming_confirmed(&id, rpc, db)? + } + Category::Receive if confirmations < 0 => { + panic!("receive conflict {} {}", id, confirmations) + } + _ => { + // Ignore coinbase and unconfirmed send transactions + } + } + } + + // Move last_hash forward + { + let nb_row = if let Some(hash) = &last_hash { + db.execute( + "UPDATE state SET value=$1 WHERE name='last_hash' AND value=$2", + &[&lastblock.as_ref(), &hash.as_ref()], + )? + } else { + db.execute( + "INSERT INTO state (name, value) VALUES ('last_hash', $1) ON CONFLICT (name) DO NOTHING", + &[&lastblock.as_ref()], + )? + }; + + if nb_row == 0 { + error!("watcher: hash state collision, database have been altered by another process"); + } + } + Ok(true) +} + +/// Sync database with removed transactions, return false if bitcoin backing is compromised +fn sync_chain_removed( + txs: &HashMap<Txid, (Category, i32)>, + removed: &HashSet<Txid>, + rpc: &mut BtcRpc, + db: &mut Client, + min_confirmations: i32, +) -> Result<bool, Box<dyn std::error::Error>> { + // Removed transactions are correctness issue in only two cases: + // - An incoming valid transaction considered confirmed in the database + // - An incoming invalid transactions already bounced + // Those two cases can compromise bitcoin backing + // Removed outgoing transactions will be retried automatically by the node + + let mut blocking_receive = Vec::new(); + let mut blocking_bounce = Vec::new(); + for id in removed { + match rpc.get_tx_segwit_key(id) { + Ok((full, key)) => { + // Valid tx are only problematic if not confirmed in the txs list and stored stored in the database + if txs + .get(id) + .map(|(_, confirmations)| *confirmations < min_confirmations) + .unwrap_or(true) + && db + .query_opt("SELECT 1 FROM tx_in WHERE reserve_pub=$1", &[&key.as_ref()])? + .is_some() + { + let debit_addr = sender_address(rpc, &full)?; + blocking_receive.push((key, id, debit_addr)); + } + } + Err(err) => match err { + GetSegwitErr::Decode(_) => { + // Invalid tx are only problematic if already bounced + if let Some(row) = db.query_opt( + "SELECT txid FROM bounce WHERE bounced=$1 AND txid IS NOT NULL", + &[&id.as_ref()], + )? { + let txid = Txid::from_slice(row.get(0)).unwrap(); + blocking_bounce.push((txid, id)); + } else { + // Remove transaction from bounce table + db.execute("DELETE FROM bounce WHERE bounced=$1", &[&id.as_ref()])?; + } + } + GetSegwitErr::RPC(it) => return Err(it.into()), + }, + } + } + + if !blocking_bounce.is_empty() || !blocking_receive.is_empty() { + let mut buf = "The following transaction have been removed from the blockchain, bitcoin backing is compromised until the transaction reappear:".to_string(); + for (key, id, addr) in blocking_receive { + write!( + &mut buf, + "\n\treceived {} in {} from {}", + base32(&key), + id, + addr + ) + .unwrap(); + } + for (id, bounced) in blocking_bounce { + write!(&mut buf, "\n\tbounced {} in {}", id, bounced).unwrap(); + } + error!("{}", buf); + return Ok(false); + } else { + return Ok(true); + } +} + +/// Sync database with an outgoing transaction +fn sync_chain_outgoing( + id: &Txid, + confirmations: i32, + rpc: &mut BtcRpc, + db: &mut Client, + config: &Config, +) -> Result<(), Box<dyn std::error::Error>> { + match rpc + .get_tx_op_return(id) + .map(|(full, bytes)| (full, decode_info(&bytes))) + { + Ok((full, Ok(info))) => match info { + Info::Transaction { wtid, .. } => { + let credit_addr = full.details[0].address.as_ref().unwrap(); + let amount = btc_to_taler(&full.amount); + + if confirmations < 0 { + // Handle conflicting tx + let nb_row = db.execute( + "UPDATE tx_out SET status=$1, txid=NULL where txid=$2", + &[&(TxStatus::Delayed as i16), &id.as_ref()], + )?; + if nb_row > 0 { + warn!( + ">> (conflict) {} in {} to {}", + base32(&wtid), + id, + credit_addr + ); + } + } else { + // Get previous out tx + let row = db.query_opt( + "SELECT id, status FROM tx_out WHERE wtid=$1 FOR UPDATE", + &[&wtid.as_ref()], + )?; + if let Some(row) = row { + // If already in database sync status + let _id: i32 = row.get(0); + let status: i16 = row.get(1); + match TxStatus::try_from(status as u8).unwrap() { + TxStatus::Requested | TxStatus::Delayed => { + let nb_row = db.execute( + "UPDATE tx_out SET status=$1 WHERE id=$2 AND status=$3", + &[&(TxStatus::Sent as i16), &_id, &status], + )?; + if nb_row > 0 { + warn!( + ">> (recovered) {} {} in {} to {}", + amount, + base32(&wtid), + id, + credit_addr + ); + } + } + TxStatus::Sent => { /* Status is correct */ } + } + } else { + // Else add to database + let debit_addr = sender_address(rpc, &full)?; + let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time); + let nb = db.execute( + "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING", + &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref(), &config.base_url.as_ref(), &(TxStatus::Sent as i16), &id.as_ref(), &None::<&[u8]>], + )?; + if nb > 0 { + warn!( + ">> (onchain) {} {} in {} to {}", + amount, + base32(&wtid), + id, + credit_addr + ); + } + } + } + } + Info::Bounce { bounced } => { + if confirmations < 0 { + // Handle conflicting tx + let nb_row = db.execute( + "UPDATE bounce SET status=$1, txid=NULL where txid=$2", + &[&(BounceStatus::Delayed as i16), &id.as_ref()], + )?; + if nb_row > 0 { + warn!("|| (conflict) {} in {}", &bounced, &id); + } + } else { + // Get previous bounce + let row = db.query_opt( + "SELECT id, status FROM bounce WHERE bounced=$1", + &[&bounced.as_ref()], + )?; + if let Some(row) = row { + // If already in database sync status + let _id: i32 = row.get(0); + let status: i16 = row.get(1); + match BounceStatus::try_from(status as u8).unwrap() { + BounceStatus::Requested | BounceStatus::Delayed => { + let nb_row = db.execute( + "UPDATE bounce SET status=$1 WHERE id=$2 AND status=$3", + &[&(BounceStatus::Sent as i16), &_id, &status], + )?; + if nb_row > 0 { + warn!("|| (recovered) {} in {}", &bounced, &id); + } + } + BounceStatus::Ignored => error!( + "watcher: ignored bounce {} found in chain at {}", + bounced, id + ), + BounceStatus::Sent => { /* Status is correct */ } + } + } else { + // Else add to database + let nb = db.execute( + "INSERT INTO bounce (bounced, txid, status) VALUES ($1, $2, $3) ON CONFLICT (txid) DO NOTHING", + &[&bounced.as_ref(), &id.as_ref(), &(BounceStatus::Sent as i16)], + )?; + if nb > 0 { + warn!("|| (onchain) {} in {}", &bounced, &id); + } + } + } + } + }, + Ok((_, Err(e))) => warn!("send: decode-info {} - {}", id, e), + Err(e) => match e { + GetOpReturnErr::MissingOpReturn => { /* Ignore */ } + GetOpReturnErr::RPC(e) => return Err(e)?, + }, + } + Ok(()) +} + +/// Sync database with na incoming confirmed transaction +fn sync_chain_incoming_confirmed( + id: &Txid, + rpc: &mut BtcRpc, + db: &mut Client, +) -> Result<(), Box<dyn std::error::Error>> { + match rpc.get_tx_segwit_key(id) { + Ok((full, reserve_pub)) => { + // Store transactions in database + let debit_addr = sender_address(rpc, &full)?; + let credit_addr = full.details[0].address.as_ref().unwrap(); + let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time); + let amount = btc_to_taler(&full.amount); + let nb = db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (reserve_pub) DO NOTHING ", &[ + &date, &amount.to_string(), &reserve_pub.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref() + ])?; + if nb > 0 { + info!( + "<< {} {} in {} from {}", + amount, + base32(&reserve_pub), + id, + debit_addr + ); + } + } + Err(err) => match err { + GetSegwitErr::Decode(_) => { + // If encoding is wrong request a bounce + db.execute( + "INSERT INTO bounce (bounced) VALUES ($1) ON CONFLICT (bounced) DO NOTHING", + &[&id.as_ref()], + )?; + } + GetSegwitErr::RPC(e) => return Err(e.into()), + }, + } + Ok(()) +} diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs @@ -1,562 +1,24 @@ -use bitcoin::{hashes::Hash, Address, Amount as BtcAmount, BlockHash, Network, Txid}; +use bitcoin::Network; use btc_wire::{ config::{BitcoinConfig, WIRE_WALLET_NAME}, - rpc::{self, BtcRpc, Category, ErrorCode}, - rpc_utils::{default_data_dir, sender_address}, - GetOpReturnErr, GetSegwitErr, + rpc::BtcRpc, + rpc_utils::default_data_dir, }; -use info::decode_info; -use postgres::{fallible_iterator::FallibleIterator, Client}; use reconnect::{AutoReconnectRPC, AutoReconnectSql}; -use std::{ - collections::{HashMap, HashSet}, - fmt::Write, - str::FromStr, - time::{Duration, SystemTime}, -}; -use taler_common::{ - api_common::{base32, Amount}, - config::Config, - log::log::{error, info, warn}, -}; -use taler_util::btc_payto_url; -use url::Url; +use std::{sync::atomic::AtomicU16, thread::JoinHandle}; +use taler_common::{config::Config, log::log::info}; -use crate::{ - fail_point::fail_point, - info::{encode_info, Info}, - status::{BounceStatus, TxStatus}, - taler_util::{btc_payto_addr, btc_to_taler, taler_to_btc}, -}; +use crate::loops::{analysis::analysis, listener::block_listener, worker::worker}; mod fail_point; mod info; +mod loops; mod reconnect; mod status; mod taler_util; -/// Retrieve last stored hash -fn last_hash(db: &mut Client) -> Result<Option<BlockHash>, postgres::Error> { - Ok(db - .query_opt("SELECT value FROM state WHERE name='last_hash'", &[])? - .map(|r| BlockHash::from_slice(r.get(0)).unwrap())) -} - -/// Listen for new proposed transactions and announce them on the bitcoin network -fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, config: &Config) { - /// Send a transaction on the blockchain, return true if more transactions with the same status remains - fn send( - db: &mut Client, - rpc: &mut BtcRpc, - status: TxStatus, - ) -> Result<bool, Box<dyn std::error::Error>> { - assert!(status == TxStatus::Delayed || status == TxStatus::Requested); - let mut tx = db.transaction()?; - // We lock the row with FOR UPDATE to prevent sending same transaction multiple time - let row = tx.query_opt( - "SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 LIMIT 1 FOR UPDATE", - &[&(status as i16)], - )?; - if let Some(row) = &row { - let id: i32 = row.get(0); - let amount = taler_to_btc(&Amount::from_str(row.get(1))?)?; - let wtid: &[u8] = row.get(2); - let addr: Address = btc_payto_addr(&Url::parse(row.get(3))?)?; - let exchange_base_url: Url = Url::parse(row.get(4))?; - let info = Info::Transaction { - wtid: wtid.try_into()?, - url: exchange_base_url, - }; - let metadata = encode_info(&info); - - fail_point("(injected) fail send_op_return", 0.2)?; - match rpc.send_op_return(&addr, &amount, &metadata, false) { - Ok(tx_id) => { - fail_point("(injected) fail update db", 0.2)?; - tx.execute( - "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3", - &[&(TxStatus::Sent as i16), &tx_id.as_ref(), &id], - )?; - tx.commit()?; - let amount = btc_to_taler(&amount.to_signed().unwrap()); - info!(">> {} {} in {} to {}", amount, base32(wtid), tx_id, addr); - } - Err(e) => { - tx.execute( - "UPDATE tx_out SET status=$1 WHERE id=$2", - &[&(TxStatus::Delayed as i16), &id], - )?; - tx.commit()?; - info!("sender: RPC - {}", e); - } - } - } - Ok(row.is_some()) - } - - /// Bounce a transaction on the blockchain, return true if more bounce with the same status remains - fn bounce( - db: &mut Client, - rpc: &mut BtcRpc, - status: BounceStatus, - fee: &BtcAmount, - ) -> Result<bool, Box<dyn std::error::Error>> { - assert!(status == BounceStatus::Delayed || status == BounceStatus::Requested); - let mut tx = db.transaction()?; - // We lock the row with FOR UPDATE to prevent sending same transaction multiple time - let row = tx.query_opt( - "SELECT id, bounced FROM bounce WHERE status=$1 LIMIT 1 FOR UPDATE", - &[&(status as i16)], - )?; - if let Some(row) = &row { - let id: i32 = row.get(0); - let bounced: Txid = Txid::from_slice(row.get(1))?; - let info = Info::Bounce { bounced }; - let metadata = encode_info(&info); - - fail_point("(injected) fail bounce", 0.2)?; - match rpc.bounce(&bounced, fee, &metadata) { - Ok(it) => { - tx.execute( - "UPDATE bounce SET txid = $1, status = $2 WHERE id = $3", - &[&it.as_ref(), &(BounceStatus::Sent as i16), &id], - )?; - tx.commit()?; - info!("|| {} in {}", &bounced, &it); - } - Err(err) => match err { - rpc::Error::RPC { - code: ErrorCode::RpcWalletInsufficientFunds | ErrorCode::RpcWalletError, - msg, - } => { - tx.execute( - "UPDATE bounce SET status = $1 WHERE id = $2", - &[&(BounceStatus::Ignored as i16), &id], - )?; - tx.commit()?; - info!("|| (ignore) {} because {}", &bounced, msg); - } - _ => Err(err)?, - }, - } - } - Ok(row.is_some()) - } - - // TODO check if transactions are abandoned - - let mut lifetime = config.btc_lifetime; - let mut status = true; - - // Alway start with a sync work - let mut skip_notification = true; - loop { - // Check lifetime - if let Some(nb) = lifetime.as_mut() { - if *nb == 0 { - info!("Reach end of lifetime"); - return; - } else { - *nb -= 1; - } - } - - // Connect - let rpc = rpc.client(); - let db = db.client(); - - let result: Result<(), Box<dyn std::error::Error>> = (|| { - // Listen to all channels - db.batch_execute("LISTEN new_block; LISTEN new_tx")?; - // Wait for the next notification - { - let mut ntf = db.notifications(); - if !skip_notification && ntf.is_empty() { - // Block until next notification - ntf.blocking_iter().next()?; - } - // Conflate all notifications - let mut iter = ntf.iter(); - while iter.next()?.is_some() {} - } - // Sync chain - sync_chain(rpc, db, config, &mut status)?; - - // As we are now in sync with the blockchain if a transaction is in requested or delayed state it have not been sent - - // Send delayed transactions - while send(db, rpc, TxStatus::Delayed)? {} - // Send requested transactions - while send(db, rpc, TxStatus::Requested)? {} - - let bounce_fee = BtcAmount::from_sat(config.bounce_fee); - // Send delayed bounce - while bounce(db, rpc, BounceStatus::Delayed, &bounce_fee)? {} - // Send requested bounce - while bounce(db, rpc, BounceStatus::Requested, &bounce_fee)? {} - - Ok(()) - })(); - if let Err(e) = result { - error!("worker: {}", e); - // On failure retry without waiting for notifications - skip_notification = true; - } else { - skip_notification = false; - } - } -} - -/// Parse new transactions, return true if the database is up to date with the latest mined block -fn sync_chain( - rpc: &mut BtcRpc, - db: &mut Client, - config: &Config, - status: &mut bool, -) -> Result<bool, Box<dyn std::error::Error>> { - // Get stored last_hash - let last_hash = last_hash(db)?; - let min_confirmations = config.confirmation; - - // Get a set of transactions ids to parse - let (txs, removed, lastblock): (HashMap<Txid, (Category, i32)>, HashSet<Txid>, BlockHash) = { - // Get all transactions made since this block - let list = rpc.list_since_block(last_hash.as_ref(), min_confirmations, true)?; - // Only keep ids and category - let txs = list - .transactions - .into_iter() - .map(|tx| (tx.txid, (tx.category, tx.confirmations))) - .collect(); - let removed = list - .removed - .into_iter() - .filter_map(|tx| (tx.category == Category::Receive).then(|| tx.txid)) - .collect(); - (txs, removed, list.lastblock) - }; - - // Check if a confirmed incoming transaction have been removed by a blockchain reorganisation - - let new_status = sync_chain_removed(&txs, &removed, rpc, db, min_confirmations as i32)?; - - if *status != new_status { - let mut tx = db.transaction()?; - tx.execute( - "UPDATE state SET value=$1 WHERE name='status'", - &[&[new_status as u8].as_ref()], - )?; - tx.execute("NOTIFY status", &[])?; - tx.commit()?; - *status = new_status; - } - - if !new_status { - return Ok(false); - } - for (id, (category, confirmations)) in txs { - match category { - Category::Send => sync_chain_outgoing(&id, confirmations, rpc, db, config)?, - Category::Receive if confirmations >= min_confirmations as i32 => { - sync_chain_incoming_confirmed(&id, rpc, db)? - } - Category::Receive if confirmations < 0 => { - panic!("receive conflict {} {}", id, confirmations) - } - _ => { - // Ignore coinbase and unconfirmed send transactions - } - } - } - // Move last_hash forward if no error have been caught - { - let curr_hash: Option<BlockHash> = db - .query_opt("SELECT value FROM state WHERE name='last_hash'", &[])? - .map(|r| BlockHash::from_slice(r.get(0)).unwrap()); - - let nb_row = if let Some(hash) = &curr_hash { - db.execute( - "UPDATE state SET value=$1 WHERE name='last_hash' AND value=$2", - &[&lastblock.as_ref(), &hash.as_ref()], - )? - } else { - db.execute( - "INSERT INTO state (name, value) VALUES ('last_hash', $1) ON CONFLICT (name) DO NOTHING", - &[&lastblock.as_ref()], - )? - }; - - if last_hash != curr_hash || nb_row == 0 { - error!("watcher: hash state collision, database have been altered by another process"); - } - } - Ok(true) -} - -/// Sync database with removed transactions, return false if bitcoin backing is compromised -fn sync_chain_removed( - txs: &HashMap<Txid, (Category, i32)>, - removed: &HashSet<Txid>, - rpc: &mut BtcRpc, - db: &mut Client, - min_confirmations: i32, -) -> Result<bool, Box<dyn std::error::Error>> { - // Removed transactions are correctness issue in only two cases: - // - An incoming valid transaction considered confirmed in the database - // - An incoming invalid transactions already bounced - // Those two cases can compromise bitcoin backing - // Removed outgoing transactions will be retried automatically by the node - - let mut blocking_receive = Vec::new(); - let mut blocking_bounce = Vec::new(); - for id in removed { - match rpc.get_tx_segwit_key(id) { - Ok((full, key)) => { - // Valid tx are only problematic if not confirmed in the txs list and stored stored in the database - if txs - .get(id) - .map(|(_, confirmations)| *confirmations < min_confirmations) - .unwrap_or(true) - && db - .query_opt("SELECT 1 FROM tx_in WHERE reserve_pub=$1", &[&key.as_ref()])? - .is_some() - { - let debit_addr = sender_address(rpc, &full)?; - blocking_receive.push((key, id, debit_addr)); - } - } - Err(err) => match err { - GetSegwitErr::Decode(_) => { - // Invalid tx are only problematic if already bounced - if let Some(row) = db.query_opt( - "SELECT txid FROM bounce WHERE bounced=$1 AND txid IS NOT NULL", - &[&id.as_ref()], - )? { - let txid = Txid::from_slice(row.get(0)).unwrap(); - blocking_bounce.push((txid, id)); - } else { - // Remove transaction from bounce table - db.execute("DELETE FROM bounce WHERE bounced=$1", &[&id.as_ref()])?; - } - } - GetSegwitErr::RPC(it) => return Err(it.into()), - }, - } - } - - if !blocking_bounce.is_empty() || !blocking_receive.is_empty() { - let mut buf = "The following transaction have been removed from the blockchain, bitcoin backing is compromised until the transaction reappear:".to_string(); - for (key, id, addr) in blocking_receive { - write!( - &mut buf, - "\n\treceived {} in {} from {}", - base32(&key), - id, - addr - ) - .unwrap(); - } - for (id, bounced) in blocking_bounce { - write!(&mut buf, "\n\tbounced {} in {}", id, bounced).unwrap(); - } - error!("{}", buf); - return Ok(false); - } else { - return Ok(true); - } -} - -/// Sync database with an outgoing transaction -fn sync_chain_outgoing( - id: &Txid, - confirmations: i32, - rpc: &mut BtcRpc, - db: &mut Client, - config: &Config, -) -> Result<(), Box<dyn std::error::Error>> { - match rpc - .get_tx_op_return(id) - .map(|(full, bytes)| (full, decode_info(&bytes))) - { - Ok((full, Ok(info))) => match info { - Info::Transaction { wtid, .. } => { - let credit_addr = full.details[0].address.as_ref().unwrap(); - let amount = btc_to_taler(&full.amount); - - if confirmations < 0 { - // Handle conflicting tx - let nb_row = db.execute( - "UPDATE tx_out SET status=$1, txid=NULL where txid=$2", - &[&(TxStatus::Delayed as i16), &id.as_ref()], - )?; - if nb_row > 0 { - warn!( - ">> (conflict) {} in {} to {}", - base32(&wtid), - id, - credit_addr - ); - } - } else { - // Get previous out tx - let row = db.query_opt( - "SELECT id, status FROM tx_out WHERE wtid=$1 FOR UPDATE", - &[&wtid.as_ref()], - )?; - if let Some(row) = row { - // If already in database sync status - let _id: i32 = row.get(0); - let status: i16 = row.get(1); - match TxStatus::try_from(status as u8).unwrap() { - TxStatus::Requested | TxStatus::Delayed => { - let nb_row = db.execute( - "UPDATE tx_out SET status=$1 WHERE id=$2 AND status=$3", - &[&(TxStatus::Sent as i16), &_id, &status], - )?; - if nb_row > 0 { - warn!( - ">> (recovered) {} {} in {} to {}", - amount, - base32(&wtid), - id, - credit_addr - ); - } - } - TxStatus::Sent => { /* Status is correct */ } - } - } else { - // Else add to database - let debit_addr = sender_address(rpc, &full)?; - let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time); - let nb = db.execute( - "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING", - &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref(), &config.base_url.as_ref(), &(TxStatus::Sent as i16), &id.as_ref(), &None::<&[u8]>], - )?; - if nb > 0 { - warn!( - ">> (onchain) {} {} in {} to {}", - amount, - base32(&wtid), - id, - credit_addr - ); - } - } - } - } - Info::Bounce { bounced } => { - if confirmations < 0 { - // Handle conflicting tx - let nb_row = db.execute( - "UPDATE bounce SET status=$1, txid=NULL where txid=$2", - &[&(BounceStatus::Delayed as i16), &id.as_ref()], - )?; - if nb_row > 0 { - warn!("|| (conflict) {} in {}", &bounced, &id); - } - } else { - // Get previous bounce - let row = db.query_opt( - "SELECT id, status FROM bounce WHERE bounced=$1", - &[&bounced.as_ref()], - )?; - if let Some(row) = row { - // If already in database sync status - let _id: i32 = row.get(0); - let status: i16 = row.get(1); - match BounceStatus::try_from(status as u8).unwrap() { - BounceStatus::Requested | BounceStatus::Delayed => { - let nb_row = db.execute( - "UPDATE bounce SET status=$1 WHERE id=$2 AND status=$3", - &[&(BounceStatus::Sent as i16), &_id, &status], - )?; - if nb_row > 0 { - warn!("|| (recovered) {} in {}", &bounced, &id); - } - } - BounceStatus::Ignored => error!( - "watcher: ignored bounce {} found in chain at {}", - bounced, id - ), - BounceStatus::Sent => { /* Status is correct */ } - } - } else { - // Else add to database - let nb = db.execute( - "INSERT INTO bounce (bounced, txid, status) VALUES ($1, $2, $3) ON CONFLICT (txid) DO NOTHING", - &[&bounced.as_ref(), &id.as_ref(), &(BounceStatus::Sent as i16)], - )?; - if nb > 0 { - warn!("|| (onchain) {} in {}", &bounced, &id); - } - } - } - } - }, - Ok((_, Err(e))) => warn!("send: decode-info {} - {}", id, e), - Err(e) => match e { - GetOpReturnErr::MissingOpReturn => { /* Ignore */ } - GetOpReturnErr::RPC(e) => return Err(e)?, - }, - } - Ok(()) -} - -/// Sync database with na incoming confirmed transaction -fn sync_chain_incoming_confirmed( - id: &Txid, - rpc: &mut BtcRpc, - db: &mut Client, -) -> Result<(), Box<dyn std::error::Error>> { - match rpc.get_tx_segwit_key(id) { - Ok((full, reserve_pub)) => { - // Store transactions in database - let debit_addr = sender_address(rpc, &full)?; - let credit_addr = full.details[0].address.as_ref().unwrap(); - let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time); - let amount = btc_to_taler(&full.amount); - let nb = db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (reserve_pub) DO NOTHING ", &[ - &date, &amount.to_string(), &reserve_pub.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref() - ])?; - if nb > 0 { - info!( - "<< {} {} in {} from {}", - amount, - base32(&reserve_pub), - id, - debit_addr - ); - } - } - Err(err) => match err { - GetSegwitErr::Decode(_) => { - // If encoding is wrong request a bounce - db.execute( - "INSERT INTO bounce (bounced) VALUES ($1) ON CONFLICT (bounced) DO NOTHING", - &[&id.as_ref()], - )?; - } - GetSegwitErr::RPC(e) => return Err(e.into()), - }, - } - Ok(()) -} - -/// Wait for new block and notify arrival with postgreSQL notifications -fn block_listener(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql) { - loop { - let rpc = rpc.client(); - let db = db.client(); - let result: Result<(), Box<dyn std::error::Error>> = (|| { - rpc.wait_for_new_block(0)?; - db.execute("NOTIFY new_block", &[])?; - Ok(()) - })(); - if let Err(e) = result { - error!("listener: {}", e); - } - } +pub struct WireState { + confirmation: AtomicU16, } fn main() { @@ -588,13 +50,33 @@ fn main() { }; info!("Running on {} chain", chain_name); + let state: &'static WireState = Box::leak(Box::new(WireState { + confirmation: AtomicU16::new(config.confirmation), + })); + let mut rpc = BtcRpc::common(&btc_config).unwrap(); rpc.load_wallet(WIRE_WALLET_NAME).ok(); let rpc_listener = AutoReconnectRPC::new(btc_config.clone(), WIRE_WALLET_NAME); + let rpc_analysis = AutoReconnectRPC::new(btc_config.clone(), WIRE_WALLET_NAME); let rpc_worker = AutoReconnectRPC::new(btc_config, WIRE_WALLET_NAME); let db_listener = AutoReconnectSql::new(&config.db_url); + let db_analysis = AutoReconnectSql::new(&config.db_url); let db_worker = AutoReconnectSql::new(&config.db_url); - std::thread::spawn(move || block_listener(rpc_listener, db_listener)); - worker(rpc_worker, db_worker, config); + named_spawn("listener".to_string(), move || { + block_listener(rpc_listener, db_listener) + }); + named_spawn("analysis".to_string(), move || { + analysis(rpc_analysis, db_analysis, config, state) + }); + worker(rpc_worker, db_worker, config, state); +} + +pub fn named_spawn<F, T>(name: String, f: F) -> JoinHandle<T> +where + F: FnOnce() -> T, + F: Send + 'static, + T: Send + 'static, +{ + std::thread::Builder::new().name(name).spawn(f).unwrap() } diff --git a/btc-wire/src/rpc.rs b/btc-wire/src/rpc.rs @@ -7,6 +7,8 @@ //! //! We only parse the thing we actually use, this reduce memory usage and //! make our code more compatible with future deprecation +//! +//! bitcoincore RPC documentation: <https://bitcoincore.org/en/doc/22.0.0/> use bitcoin::{hashes::hex::ToHex, Address, Amount, BlockHash, SignedAmount, Txid}; use serde_json::{json, Value}; @@ -272,7 +274,7 @@ impl BtcRpc { pub fn list_since_block( &mut self, hash: Option<&BlockHash>, - confirmation: u8, + confirmation: u16, include_remove: bool, ) -> Result<ListSinceBlock> { self.call("listsinceblock", &(hash, confirmation, (), include_remove)) @@ -285,6 +287,10 @@ impl BtcRpc { } } + pub fn get_chain_tips(&mut self) -> Result<Vec<ChainTips>> { + self.call("getchaintips", &EMPTY) + } + pub fn get_tx(&mut self, id: &Txid) -> Result<TransactionFull> { self.call("gettransaction", &(id, (), true)) } @@ -400,6 +406,28 @@ pub struct HexWrapper { pub hex: String, } +#[derive(Clone, PartialEq, Eq, serde::Deserialize, Debug)] +pub struct ChainTips { + pub height: u64, + pub hash: bitcoin::BlockHash, + #[serde(rename = "branchlen")] + pub branch_length: usize, + pub status: ChainTipsStatus, +} + +#[derive(Copy, serde::Deserialize, Clone, PartialEq, Eq, Debug)] +#[serde(rename_all = "lowercase")] +pub enum ChainTipsStatus { + Invalid, + #[serde(rename = "headers-only")] + HeadersOnly, + #[serde(rename = "valid-headers")] + ValidHeaders, + #[serde(rename = "valid-fork")] + ValidFork, + Active, +} + #[derive(Debug, serde::Deserialize)] pub struct Nothing {} diff --git a/makefile b/makefile @@ -14,6 +14,7 @@ test_btc: test/btc/conflict.sh test/btc/reorg.sh test/btc/hell.sh + test/btc/analysis.sh test/btc/config.sh test: install test_gateway test_btc \ No newline at end of file diff --git a/taler-common/src/config.rs b/taler-common/src/config.rs @@ -32,7 +32,7 @@ pub struct Config { pub unix_path: Option<PathBuf>, pub btc_data_dir: Option<PathBuf>, pub payto: Url, - pub confirmation: u8, + pub confirmation: u16, pub bounce_fee: u64, pub btc_lifetime: Option<u64>, pub http_lifetime: Option<u64>, diff --git a/test/btc/analysis.sh b/test/btc/analysis.sh @@ -0,0 +1,65 @@ +#!/bin/bash + +## Test btc_wire ability to learn and protect itself from blockchain behavior + +set -eu + +source "${BASH_SOURCE%/*}/../common.sh" +SCHEMA=btc.sql + +echo "----- Setup -----" +echo "Load config file" +load_config +echo "Start database" +setup_db +echo "Start bitcoin node" +init_btc +echo "Start second bitcoin node" +init_btc2 +echo "Start btc-wire" +btc_wire +echo "Start gateway" +gateway +echo "" + +echo "----- Learn from reorg -----" + +echo "Loose second bitcoin node" +btc2_deco + +echo -n "Making wire transfer to exchange:" +btc-wire-utils -d $BTC_DIR transfer 0.042 > /dev/null +next_btc # Trigger btc_wire +check_balance 9.95799209 0.04200000 +echo " OK" + +echo -n "Perform fork and check btc-wire hard error:" +gateway_up +btc2_fork 5 +check_balance 9.95799209 0.00000000 +gateway_down +echo " OK" + +echo -n "Recover orphaned transactions:" +next_btc 5 # More block needed to confirm +check_balance 9.95799209 0.04200000 +gateway_up +echo " OK" + +echo "Loose second bitcoin node" +btc2_deco + +echo -n "Making wire transfer to exchange:" +btc-wire-utils -d $BTC_DIR transfer 0.064 > /dev/null +next_btc 5 # More block needed to confirm +check_balance 9.89398418 0.10600000 +echo " OK" + +echo -n "Perform fork and check btc-wire learned from previous attack:" +gateway_up +btc2_fork 5 +check_balance 9.89398418 0.10600000 +gateway_up +echo " OK" + +echo "All tests passed!" +\ No newline at end of file diff --git a/test/btc/conflict.sh b/test/btc/conflict.sh @@ -57,7 +57,7 @@ check_balance 9.96299209 0.03698010 echo " OK" echo -n "Resend conflicting transaction:" -sleep 5 +sleep 5 # Wait for reconnection mine_btc check_delta "outgoing?delta=-100" "seq 4 5" "0.00" check_balance 9.96699209 0.03297811 @@ -106,7 +106,7 @@ check_balance 9.95994929 0.04001000 echo " OK" echo -n "Resend conflicting transaction:" -sleep 5 +sleep 5 # Wait for reconnection mine_btc check_balance 9.99993744 0.00002000 echo " OK" diff --git a/test/btc/hell.sh b/test/btc/hell.sh @@ -35,17 +35,11 @@ echo " OK" echo -n "Perform fork and check btc-wire hard error:" gateway_up -btc2_fork +btc2_fork 5 check_balance 9.99579209 0.00000000 gateway_down echo " OK" -echo -n "Check btc-wire hard error on restart:" -btc_wire -sleep 1 -gateway_down -echo " OK" - echo -n "Generate conflict:" restart_btc -minrelaytxfee=0.0001 btc-wire-utils -d $BTC_DIR abandon client @@ -54,14 +48,10 @@ next_btc check_balance 9.99457382 0.00540000 echo " OK" -echo -n "Check btc-wire never heal on restart:" -btc_wire -sleep 1 +echo -n "Check btc-wire have not read the conflicting transaction:" +sleep 5 # Wait for reconnection # Wait for reconnection gateway_down check_balance 9.99457382 0.00540000 -echo " OK" - -echo -n "Check btc-wire have not read the conflicting transaction:" check_delta "incoming" "" echo " OK" @@ -93,23 +83,16 @@ btc2_deco echo -n "Generate bounce:" $BTC_CLI -rpcwallet=client sendtoaddress $WIRE 0.042 > /dev/null next_btc -sleep 1 check_balance 9.99998674 0.00001000 echo " OK" echo -n "Perform fork and check btc-wire hard error:" gateway_up -btc2_fork +btc2_fork 5 check_balance 9.95799859 0.00000000 gateway_down echo " OK" -echo -n "Check btc-wire hard error on restart:" -btc_wire -sleep 1 -gateway_down -echo " OK" - echo -n "Generate conflict:" restart_btc -minrelaytxfee=0.0001 btc-wire-utils -d $BTC_DIR abandon client @@ -118,16 +101,10 @@ next_btc check_balance 9.94597382 0.05400000 echo " OK" -sleep 5 - -echo -n "Check btc-wire never heal on restart:" -btc_wire -sleep 1 +echo -n "Check btc-wire have not read the conflicting transaction:" +sleep 5 # Wait for reconnection gateway_down check_balance 9.94597382 0.05400000 -echo " OK" - -echo -n "Check btc-wire have not read the conflicting transaction:" check_delta "incoming" "" echo " OK" diff --git a/test/btc/reorg.sh b/test/btc/reorg.sh @@ -41,25 +41,14 @@ echo " OK" echo -n "Perform fork and check btc-wire hard error:" gateway_up -btc2_fork +btc2_fork 22 check_balance 9.99826299 0.00000000 gateway_down echo " OK" -echo -n "Check btc-wire hard error on restart:" -btc_wire -sleep 1 -gateway_down -echo " OK" - echo -n "Recover orphaned transactions:" -next_btc +next_btc 6 # More block needed to confirm check_balance 9.99826299 0.00165000 -echo " OK" - -echo -n "Check btc-wire heal on restart:" -btc_wire -sleep 1 gateway_up echo " OK" @@ -83,19 +72,20 @@ echo " OK" echo -n "Perform fork and check btc-wire still up:" gateway_up -btc2_fork +btc2_fork 22 check_balance 9.99826299 0.00146311 gateway_up echo " OK" echo -n "Recover orphaned transactions:" -next_btc +next_btc 6 # More block needed to confirm check_balance 9.99842799 0.00146311 echo " OK" echo "----- Handle reorg bounce -----" clear_wallet +check_balance "*" 0.00000000 echo "Loose second bitcoin node" btc2_deco @@ -105,20 +95,20 @@ for n in `$SEQ`; do $BTC_CLI -rpcwallet=client sendtoaddress $WIRE 0.000$n > /dev/null mine_btc done -next_btc +next_btc 6 # More block needed to confirm sleep 1 check_balance "*" 0.00011000 echo " OK" echo -n "Perform fork and check btc-wire hard error:" gateway_up -btc2_fork +btc2_fork 22 check_balance "*" 0.00000000 gateway_down echo " OK" echo -n "Recover orphaned transactions:" -next_btc +next_btc 6 # More block needed to confirm check_balance "*" 0.00011000 echo " OK" diff --git a/test/btc/stress.sh b/test/btc/stress.sh @@ -99,7 +99,7 @@ done next_btc sleep 7 mine_btc -sleep 5 +sleep 5 # Wait for reconnection echo " OK" echo -n "Check balance:" diff --git a/test/common.sh b/test/common.sh @@ -122,7 +122,7 @@ function btc2_deco() { # Create a fork on the second node and reconnect the two node function btc2_fork() { - $BTC_CLI2 generatetoaddress ${1:-50} $RESERVE > /dev/null + $BTC_CLI2 generatetoaddress $1 $RESERVE > /dev/null $BTC_CLI addnode 127.0.0.1:8346 onetry sleep 1 } @@ -160,7 +160,7 @@ function mine_btc() { # Mine previous transactions function next_btc() { # Mine enough block to confirm previous transactions - mine_btc $CONFIRMATION + mine_btc ${1:-$CONFIRMATION} # Wait for btc_wire to catch up sleep 0.2 # Mine one more block to trigger btc_wire