summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAntoine A <>2022-01-19 14:58:54 +0100
committerAntoine A <>2022-01-19 15:20:54 +0100
commitb26af1f75a683f3a2d50aaf0de36b58a71ae7754 (patch)
treedf8374a918744891a5e989cb27a53f0d59f7e29d
parent43ac6ffc4186d64b039c27649af1b14c28e0b3d1 (diff)
downloaddepolymerization-research.tar.gz
depolymerization-research.tar.bz2
depolymerization-research.zip
Adapt defensively to blockchain forksresearch
-rw-r--r--btc-wire/src/loops.rs3
-rw-r--r--btc-wire/src/loops/analysis.rs66
-rw-r--r--btc-wire/src/loops/listener.rs18
-rw-r--r--btc-wire/src/loops/worker.rs541
-rw-r--r--btc-wire/src/main.rs580
-rw-r--r--btc-wire/src/rpc.rs30
-rw-r--r--makefile1
-rw-r--r--taler-common/src/config.rs2
-rw-r--r--test/btc/analysis.sh65
-rw-r--r--test/btc/conflict.sh4
-rw-r--r--test/btc/hell.sh35
-rw-r--r--test/btc/reorg.sh26
-rw-r--r--test/btc/stress.sh2
-rw-r--r--test/common.sh4
14 files changed, 774 insertions, 603 deletions
diff --git a/btc-wire/src/loops.rs b/btc-wire/src/loops.rs
new file mode 100644
index 0000000..f28e2b1
--- /dev/null
+++ b/btc-wire/src/loops.rs
@@ -0,0 +1,3 @@
+pub mod worker;
+pub mod listener;
+pub mod analysis;
diff --git a/btc-wire/src/loops/analysis.rs b/btc-wire/src/loops/analysis.rs
new file mode 100644
index 0000000..e1ff5bf
--- /dev/null
+++ b/btc-wire/src/loops/analysis.rs
@@ -0,0 +1,66 @@
+use std::sync::atomic::Ordering;
+
+use btc_wire::rpc::ChainTipsStatus;
+use postgres::fallible_iterator::FallibleIterator;
+use taler_common::{
+ config::Config,
+ log::log::{error, warn},
+};
+
+use crate::{
+ reconnect::{AutoReconnectRPC, AutoReconnectSql},
+ WireState,
+};
+
+/// Analyse blockchain behavior and adapt confirmations in real time
+pub fn analysis(
+ mut rpc: AutoReconnectRPC,
+ mut db: AutoReconnectSql,
+ config: &Config,
+ state: &WireState,
+) {
+ // The biggest fork ever seen
+ let mut max_conf = 0;
+ loop {
+ let rpc = rpc.client();
+ let db = db.client();
+ let result: Result<(), Box<dyn std::error::Error>> = (|| {
+ // Register as listener
+ db.batch_execute("LISTEN new_block")?;
+ loop {
+ // Get biggest known valid fork
+ let max_fork = rpc
+ .get_chain_tips()?
+ .into_iter()
+ .filter_map(|t| {
+ (t.status == ChainTipsStatus::ValidFork).then(|| t.branch_length)
+ })
+ .max()
+ .unwrap_or(0) as u16;
+ // The first time we see a fork that big
+ if max_fork > max_conf {
+ max_conf = max_fork;
+ let current_conf = state.confirmation.load(Ordering::SeqCst);
+ // If new fork is bigger than the current confirmation
+ if max_fork > current_conf {
+ // Max two time the configuration
+ let new_conf = max_fork.min(config.confirmation * 2);
+ state.confirmation.store(new_conf, Ordering::SeqCst);
+ warn!(
+ "analysis: found dangerous fork of {} blocks, adapt confirmation to {} blocks capped at {}, you should update taler.conf",
+ max_fork, new_conf, config.confirmation * 2
+ );
+ }
+ }
+
+ // TODO smarter analysis: suspicious transaction value, limit wire bitcoin throughput
+
+ // Wait for next notification
+ db.notifications().blocking_iter().next()?;
+ }
+ })();
+ if let Err(e) = result {
+ error!("analysis: {}", e);
+ }
+ }
+}
diff --git a/btc-wire/src/loops/listener.rs b/btc-wire/src/loops/listener.rs
new file mode 100644
index 0000000..41cfd7e
--- /dev/null
+++ b/btc-wire/src/loops/listener.rs
@@ -0,0 +1,18 @@
+use taler_common::log::log::error;
+
+use crate::reconnect::{AutoReconnectRPC, AutoReconnectSql};
+
+/// Wait for new block and notify arrival with postgreSQL notifications
+pub fn block_listener(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql) {
+ loop {
+ let rpc = rpc.client();
+ let db = db.client();
+ let result: Result<(), Box<dyn std::error::Error>> = (|| loop {
+ rpc.wait_for_new_block(0)?;
+ db.execute("NOTIFY new_block", &[])?;
+ })();
+ if let Err(e) = result {
+ error!("listener: {}", e);
+ }
+ }
+} \ No newline at end of file
diff --git a/btc-wire/src/loops/worker.rs b/btc-wire/src/loops/worker.rs
new file mode 100644
index 0000000..d8924de
--- /dev/null
+++ b/btc-wire/src/loops/worker.rs
@@ -0,0 +1,541 @@
+use std::{
+ collections::{HashMap, HashSet},
+ fmt::Write,
+ str::FromStr,
+ sync::atomic::Ordering,
+ time::{Duration, SystemTime},
+};
+
+use bitcoin::{hashes::Hash, Address, Amount as BtcAmount, BlockHash, Txid};
+use btc_wire::{
+ rpc::{self, BtcRpc, Category, ErrorCode},
+ rpc_utils::sender_address,
+ GetOpReturnErr, GetSegwitErr,
+};
+use postgres::{fallible_iterator::FallibleIterator, Client};
+use taler_common::{
+ api_common::{base32, Amount},
+ config::Config,
+ log::log::{error, info, warn},
+};
+use url::Url;
+
+use crate::{
+ fail_point::fail_point,
+ info::{decode_info, encode_info, Info},
+ reconnect::{AutoReconnectRPC, AutoReconnectSql},
+ status::{BounceStatus, TxStatus},
+ taler_util::{btc_payto_addr, btc_payto_url, btc_to_taler, taler_to_btc},
+ WireState,
+};
+
+/// Listen for new proposed transactions and announce them on the bitcoin network
+pub fn worker(
+ mut rpc: AutoReconnectRPC,
+ mut db: AutoReconnectSql,
+ config: &Config,
+ state: &WireState,
+) {
+ /// Send a transaction on the blockchain, return true if more transactions with the same status remains
+ fn send(
+ db: &mut Client,
+ rpc: &mut BtcRpc,
+ status: TxStatus,
+ ) -> Result<bool, Box<dyn std::error::Error>> {
+ assert!(status == TxStatus::Delayed || status == TxStatus::Requested);
+ let mut tx = db.transaction()?;
+ // We lock the row with FOR UPDATE to prevent sending same transaction multiple time
+ let row = tx.query_opt(
+ "SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 LIMIT 1 FOR UPDATE",
+ &[&(status as i16)],
+ )?;
+ if let Some(row) = &row {
+ let id: i32 = row.get(0);
+ let amount = taler_to_btc(&Amount::from_str(row.get(1))?)?;
+ let wtid: &[u8] = row.get(2);
+ let addr: Address = btc_payto_addr(&Url::parse(row.get(3))?)?;
+ let exchange_base_url: Url = Url::parse(row.get(4))?;
+ let info = Info::Transaction {
+ wtid: wtid.try_into()?,
+ url: exchange_base_url,
+ };
+ let metadata = encode_info(&info);
+
+ fail_point("(injected) fail send_op_return", 0.2)?;
+ match rpc.send_op_return(&addr, &amount, &metadata, false) {
+ Ok(tx_id) => {
+ fail_point("(injected) fail update db", 0.2)?;
+ tx.execute(
+ "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3",
+ &[&(TxStatus::Sent as i16), &tx_id.as_ref(), &id],
+ )?;
+ tx.commit()?;
+ let amount = btc_to_taler(&amount.to_signed().unwrap());
+ info!(">> {} {} in {} to {}", amount, base32(wtid), tx_id, addr);
+ }
+ Err(e) => {
+ tx.execute(
+ "UPDATE tx_out SET status=$1 WHERE id=$2",
+ &[&(TxStatus::Delayed as i16), &id],
+ )?;
+ tx.commit()?;
+ info!("sender: RPC - {}", e);
+ }
+ }
+ }
+ Ok(row.is_some())
+ }
+
+ /// Bounce a transaction on the blockchain, return true if more bounce with the same status remains
+ fn bounce(
+ db: &mut Client,
+ rpc: &mut BtcRpc,
+ status: BounceStatus,
+ fee: &BtcAmount,
+ ) -> Result<bool, Box<dyn std::error::Error>> {
+ assert!(status == BounceStatus::Delayed || status == BounceStatus::Requested);
+ let mut tx = db.transaction()?;
+ // We lock the row with FOR UPDATE to prevent sending same transaction multiple time
+ let row = tx.query_opt(
+ "SELECT id, bounced FROM bounce WHERE status=$1 LIMIT 1 FOR UPDATE",
+ &[&(status as i16)],
+ )?;
+ if let Some(row) = &row {
+ let id: i32 = row.get(0);
+ let bounced: Txid = Txid::from_slice(row.get(1))?;
+ let info = Info::Bounce { bounced };
+ let metadata = encode_info(&info);
+
+ fail_point("(injected) fail bounce", 0.2)?;
+ match rpc.bounce(&bounced, fee, &metadata) {
+ Ok(it) => {
+ tx.execute(
+ "UPDATE bounce SET txid = $1, status = $2 WHERE id = $3",
+ &[&it.as_ref(), &(BounceStatus::Sent as i16), &id],
+ )?;
+ tx.commit()?;
+ info!("|| {} in {}", &bounced, &it);
+ }
+ Err(err) => match err {
+ rpc::Error::RPC {
+ code: ErrorCode::RpcWalletInsufficientFunds | ErrorCode::RpcWalletError,
+ msg,
+ } => {
+ tx.execute(
+ "UPDATE bounce SET status = $1 WHERE id = $2",
+ &[&(BounceStatus::Ignored as i16), &id],
+ )?;
+ tx.commit()?;
+ info!("|| (ignore) {} because {}", &bounced, msg);
+ }
+ _ => Err(err)?,
+ },
+ }
+ }
+ Ok(row.is_some())
+ }
+
+ // TODO check if transactions are abandoned
+
+ let mut lifetime = config.btc_lifetime;
+ let mut status = true;
+
+ // Alway start with a sync work
+ let mut skip_notification = true;
+ loop {
+ // Check lifetime
+ if let Some(nb) = lifetime.as_mut() {
+ if *nb == 0 {
+ info!("Reach end of lifetime");
+ return;
+ } else {
+ *nb -= 1;
+ }
+ }
+
+ // Connect
+ let rpc = rpc.client();
+ let db = db.client();
+
+ let result: Result<(), Box<dyn std::error::Error>> = (|| {
+ // Listen to all channels
+ db.batch_execute("LISTEN new_block; LISTEN new_tx")?;
+ // Wait for the next notification
+ {
+ let mut ntf = db.notifications();
+ if !skip_notification && ntf.is_empty() {
+ // Block until next notification
+ ntf.blocking_iter().next()?;
+ }
+ // Conflate all notifications
+ let mut iter = ntf.iter();
+ while iter.next()?.is_some() {}
+ }
+ // Sync chain
+ sync_chain(rpc, db, config, state, &mut status)?;
+
+ // As we are now in sync with the blockchain if a transaction is in requested or delayed state it have not been sent
+
+ // Send delayed transactions
+ while send(db, rpc, TxStatus::Delayed)? {}
+ // Send requested transactions
+ while send(db, rpc, TxStatus::Requested)? {}
+
+ let bounce_fee = BtcAmount::from_sat(config.bounce_fee);
+ // Send delayed bounce
+ while bounce(db, rpc, BounceStatus::Delayed, &bounce_fee)? {}
+ // Send requested bounce
+ while bounce(db, rpc, BounceStatus::Requested, &bounce_fee)? {}
+
+ Ok(())
+ })();
+ if let Err(e) = result {
+ error!("worker: {}", e);
+ // On failure retry without waiting for notifications
+ skip_notification = true;
+ } else {
+ skip_notification = false;
+ }
+ }
+}
+
+/// Retrieve last stored hash
+fn last_hash(db: &mut Client) -> Result<Option<BlockHash>, postgres::Error> {
+ Ok(db
+ .query_opt("SELECT value FROM state WHERE name='last_hash'", &[])?
+ .map(|r| BlockHash::from_slice(r.get(0)).unwrap()))
+}
+
+/// Parse new transactions, return true if the database is up to date with the latest mined block
+fn sync_chain(
+ rpc: &mut BtcRpc,
+ db: &mut Client,
+ config: &Config,
+ state: &WireState,
+ status: &mut bool,
+) -> Result<bool, Box<dyn std::error::Error>> {
+ // Get stored last_hash
+ let last_hash = last_hash(db)?;
+ let min_confirmations = state.confirmation.load(Ordering::SeqCst);
+
+ // Get a set of transactions ids to parse
+ let (txs, removed, lastblock): (HashMap<Txid, (Category, i32)>, HashSet<Txid>, BlockHash) = {
+ // Get all transactions made since this block
+ let list = rpc.list_since_block(last_hash.as_ref(), min_confirmations, true)?;
+ // Only keep ids and category
+ let txs = list
+ .transactions
+ .into_iter()
+ .map(|tx| (tx.txid, (tx.category, tx.confirmations)))
+ .collect();
+ let removed = list
+ .removed
+ .into_iter()
+ .filter_map(|tx| (tx.category == Category::Receive).then(|| tx.txid))
+ .collect();
+ (txs, removed, list.lastblock)
+ };
+
+ // Check if a confirmed incoming transaction have been removed by a blockchain reorganisation
+
+ let new_status = sync_chain_removed(&txs, &removed, rpc, db, min_confirmations as i32)?;
+
+ if *status != new_status {
+ let mut tx = db.transaction()?;
+ tx.execute(
+ "UPDATE state SET value=$1 WHERE name='status'",
+ &[&[new_status as u8].as_ref()],
+ )?;
+ tx.execute("NOTIFY status", &[])?;
+ tx.commit()?;
+ *status = new_status;
+ }
+
+ if !new_status {
+ return Ok(false);
+ }
+ for (id, (category, confirmations)) in txs {
+ match category {
+ Category::Send => sync_chain_outgoing(&id, confirmations, rpc, db, config)?,
+ Category::Receive if confirmations >= min_confirmations as i32 => {
+ sync_chain_incoming_confirmed(&id, rpc, db)?
+ }
+ Category::Receive if confirmations < 0 => {
+ panic!("receive conflict {} {}", id, confirmations)
+ }
+ _ => {
+ // Ignore coinbase and unconfirmed send transactions
+ }
+ }
+ }
+
+ // Move last_hash forward
+ {
+ let nb_row = if let Some(hash) = &last_hash {
+ db.execute(
+ "UPDATE state SET value=$1 WHERE name='last_hash' AND value=$2",
+ &[&lastblock.as_ref(), &hash.as_ref()],
+ )?
+ } else {
+ db.execute(
+ "INSERT INTO state (name, value) VALUES ('last_hash', $1) ON CONFLICT (name) DO NOTHING",
+ &[&lastblock.as_ref()],
+ )?
+ };
+
+ if nb_row == 0 {
+ error!("watcher: hash state collision, database have been altered by another process");
+ }
+ }
+ Ok(true)
+}
+
+/// Sync database with removed transactions, return false if bitcoin backing is compromised
+fn sync_chain_removed(
+ txs: &HashMap<Txid, (Category, i32)>,
+ removed: &HashSet<Txid>,
+ rpc: &mut BtcRpc,
+ db: &mut Client,
+ min_confirmations: i32,
+) -> Result<bool, Box<dyn std::error::Error>> {
+ // Removed transactions are correctness issue in only two cases:
+ // - An incoming valid transaction considered confirmed in the database
+ // - An incoming invalid transactions already bounced
+ // Those two cases can compromise bitcoin backing
+ // Removed outgoing transactions will be retried automatically by the node
+
+ let mut blocking_receive = Vec::new();
+ let mut blocking_bounce = Vec::new();
+ for id in removed {
+ match rpc.get_tx_segwit_key(id) {
+ Ok((full, key)) => {
+ // Valid tx are only problematic if not confirmed in the txs list and stored stored in the database
+ if txs
+ .get(id)
+ .map(|(_, confirmations)| *confirmations < min_confirmations)
+ .unwrap_or(true)
+ && db
+ .query_opt("SELECT 1 FROM tx_in WHERE reserve_pub=$1", &[&key.as_ref()])?
+ .is_some()
+ {
+ let debit_addr = sender_address(rpc, &full)?;
+ blocking_receive.push((key, id, debit_addr));
+ }
+ }
+ Err(err) => match err {
+ GetSegwitErr::Decode(_) => {
+ // Invalid tx are only problematic if already bounced
+ if let Some(row) = db.query_opt(
+ "SELECT txid FROM bounce WHERE bounced=$1 AND txid IS NOT NULL",
+ &[&id.as_ref()],
+ )? {
+ let txid = Txid::from_slice(row.get(0)).unwrap();
+ blocking_bounce.push((txid, id));
+ } else {
+ // Remove transaction from bounce table
+ db.execute("DELETE FROM bounce WHERE bounced=$1", &[&id.as_ref()])?;
+ }
+ }
+ GetSegwitErr::RPC(it) => return Err(it.into()),
+ },
+ }
+ }
+
+ if !blocking_bounce.is_empty() || !blocking_receive.is_empty() {
+ let mut buf = "The following transaction have been removed from the blockchain, bitcoin backing is compromised until the transaction reappear:".to_string();
+ for (key, id, addr) in blocking_receive {
+ write!(
+ &mut buf,
+ "\n\treceived {} in {} from {}",
+ base32(&key),
+ id,
+ addr
+ )
+ .unwrap();
+ }
+ for (id, bounced) in blocking_bounce {
+ write!(&mut buf, "\n\tbounced {} in {}", id, bounced).unwrap();
+ }
+ error!("{}", buf);
+ return Ok(false);
+ } else {
+ return Ok(true);
+ }
+}
+
+/// Sync database with an outgoing transaction
+fn sync_chain_outgoing(
+ id: &Txid,
+ confirmations: i32,
+ rpc: &mut BtcRpc,
+ db: &mut Client,
+ config: &Config,
+) -> Result<(), Box<dyn std::error::Error>> {
+ match rpc
+ .get_tx_op_return(id)
+ .map(|(full, bytes)| (full, decode_info(&bytes)))
+ {
+ Ok((full, Ok(info))) => match info {
+ Info::Transaction { wtid, .. } => {
+ let credit_addr = full.details[0].address.as_ref().unwrap();
+ let amount = btc_to_taler(&full.amount);
+
+ if confirmations < 0 {
+ // Handle conflicting tx
+ let nb_row = db.execute(
+ "UPDATE tx_out SET status=$1, txid=NULL where txid=$2",
+ &[&(TxStatus::Delayed as i16), &id.as_ref()],
+ )?;
+ if nb_row > 0 {
+ warn!(
+ ">> (conflict) {} in {} to {}",
+ base32(&wtid),
+ id,
+ credit_addr
+ );
+ }
+ } else {
+ // Get previous out tx
+ let row = db.query_opt(
+ "SELECT id, status FROM tx_out WHERE wtid=$1 FOR UPDATE",
+ &[&wtid.as_ref()],
+ )?;
+ if let Some(row) = row {
+ // If already in database sync status
+ let _id: i32 = row.get(0);
+ let status: i16 = row.get(1);
+ match TxStatus::try_from(status as u8).unwrap() {
+ TxStatus::Requested | TxStatus::Delayed => {
+ let nb_row = db.execute(
+ "UPDATE tx_out SET status=$1 WHERE id=$2 AND status=$3",
+ &[&(TxStatus::Sent as i16), &_id, &status],
+ )?;
+ if nb_row > 0 {
+ warn!(
+ ">> (recovered) {} {} in {} to {}",
+ amount,
+ base32(&wtid),
+ id,
+ credit_addr
+ );
+ }
+ }
+ TxStatus::Sent => { /* Status is correct */ }
+ }
+ } else {
+ // Else add to database
+ let debit_addr = sender_address(rpc, &full)?;
+ let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time);
+ let nb = db.execute(
+ "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING",
+ &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref(), &config.base_url.as_ref(), &(TxStatus::Sent as i16), &id.as_ref(), &None::<&[u8]>],
+ )?;
+ if nb > 0 {
+ warn!(
+ ">> (onchain) {} {} in {} to {}",
+ amount,
+ base32(&wtid),
+ id,
+ credit_addr
+ );
+ }
+ }
+ }
+ }
+ Info::Bounce { bounced } => {
+ if confirmations < 0 {
+ // Handle conflicting tx
+ let nb_row = db.execute(
+ "UPDATE bounce SET status=$1, txid=NULL where txid=$2",
+ &[&(BounceStatus::Delayed as i16), &id.as_ref()],
+ )?;
+ if nb_row > 0 {
+ warn!("|| (conflict) {} in {}", &bounced, &id);
+ }
+ } else {
+ // Get previous bounce
+ let row = db.query_opt(
+ "SELECT id, status FROM bounce WHERE bounced=$1",
+ &[&bounced.as_ref()],
+ )?;
+ if let Some(row) = row {
+ // If already in database sync status
+ let _id: i32 = row.get(0);
+ let status: i16 = row.get(1);
+ match BounceStatus::try_from(status as u8).unwrap() {
+ BounceStatus::Requested | BounceStatus::Delayed => {
+ let nb_row = db.execute(
+ "UPDATE bounce SET status=$1 WHERE id=$2 AND status=$3",
+ &[&(BounceStatus::Sent as i16), &_id, &status],
+ )?;
+ if nb_row > 0 {
+ warn!("|| (recovered) {} in {}", &bounced, &id);
+ }
+ }
+ BounceStatus::Ignored => error!(
+ "watcher: ignored bounce {} found in chain at {}",
+ bounced, id
+ ),
+ BounceStatus::Sent => { /* Status is correct */ }
+ }
+ } else {
+ // Else add to database
+ let nb = db.execute(
+ "INSERT INTO bounce (bounced, txid, status) VALUES ($1, $2, $3) ON CONFLICT (txid) DO NOTHING",
+ &[&bounced.as_ref(), &id.as_ref(), &(BounceStatus::Sent as i16)],
+ )?;
+ if nb > 0 {
+ warn!("|| (onchain) {} in {}", &bounced, &id);
+ }
+ }
+ }
+ }
+ },
+ Ok((_, Err(e))) => warn!("send: decode-info {} - {}", id, e),
+ Err(e) => match e {
+ GetOpReturnErr::MissingOpReturn => { /* Ignore */ }
+ GetOpReturnErr::RPC(e) => return Err(e)?,
+ },
+ }
+ Ok(())
+}
+
+/// Sync database with na incoming confirmed transaction
+fn sync_chain_incoming_confirmed(
+ id: &Txid,
+ rpc: &mut BtcRpc,
+ db: &mut Client,
+) -> Result<(), Box<dyn std::error::Error>> {
+ match rpc.get_tx_segwit_key(id) {
+ Ok((full, reserve_pub)) => {
+ // Store transactions in database
+ let debit_addr = sender_address(rpc, &full)?;
+ let credit_addr = full.details[0].address.as_ref().unwrap();
+ let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time);
+ let amount = btc_to_taler(&full.amount);
+ let nb = db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (reserve_pub) DO NOTHING ", &[
+ &date, &amount.to_string(), &reserve_pub.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref()
+ ])?;
+ if nb > 0 {
+ info!(
+ "<< {} {} in {} from {}",
+ amount,
+ base32(&reserve_pub),
+ id,
+ debit_addr
+ );
+ }
+ }
+ Err(err) => match err {
+ GetSegwitErr::Decode(_) => {
+ // If encoding is wrong request a bounce
+ db.execute(
+ "INSERT INTO bounce (bounced) VALUES ($1) ON CONFLICT (bounced) DO NOTHING",
+ &[&id.as_ref()],
+ )?;
+ }
+ GetSegwitErr::RPC(e) => return Err(e.into()),
+ },
+ }
+ Ok(())
+}
diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs
index 50fba54..875281f 100644
--- a/btc-wire/src/main.rs
+++ b/btc-wire/src/main.rs
@@ -1,562 +1,24 @@
-use bitcoin::{hashes::Hash, Address, Amount as BtcAmount, BlockHash, Network, Txid};
+use bitcoin::Network;
use btc_wire::{
config::{BitcoinConfig, WIRE_WALLET_NAME},
- rpc::{self, BtcRpc, Category, ErrorCode},
- rpc_utils::{default_data_dir, sender_address},
- GetOpReturnErr, GetSegwitErr,
+ rpc::BtcRpc,
+ rpc_utils::default_data_dir,
};
-use info::decode_info;
-use postgres::{fallible_iterator::FallibleIterator, Client};
use reconnect::{AutoReconnectRPC, AutoReconnectSql};
-use std::{
- collections::{HashMap, HashSet},
- fmt::Write,
- str::FromStr,
- time::{Duration, SystemTime},
-};
-use taler_common::{
- api_common::{base32, Amount},
- config::Config,
- log::log::{error, info, warn},
-};
-use taler_util::btc_payto_url;
-use url::Url;
+use std::{sync::atomic::AtomicU16, thread::JoinHandle};
+use taler_common::{config::Config, log::log::info};
-use crate::{
- fail_point::fail_point,
- info::{encode_info, Info},
- status::{BounceStatus, TxStatus},
- taler_util::{btc_payto_addr, btc_to_taler, taler_to_btc},
-};
+use crate::loops::{analysis::analysis, listener::block_listener, worker::worker};
mod fail_point;
mod info;
+mod loops;
mod reconnect;
mod status;
mod taler_util;
-/// Retrieve last stored hash
-fn last_hash(db: &mut Client) -> Result<Option<BlockHash>, postgres::Error> {
- Ok(db
- .query_opt("SELECT value FROM state WHERE name='last_hash'", &[])?
- .map(|r| BlockHash::from_slice(r.get(0)).unwrap()))
-}
-
-/// Listen for new proposed transactions and announce them on the bitcoin network
-fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, config: &Config) {
- /// Send a transaction on the blockchain, return true if more transactions with the same status remains
- fn send(
- db: &mut Client,
- rpc: &mut BtcRpc,
- status: TxStatus,
- ) -> Result<bool, Box<dyn std::error::Error>> {
- assert!(status == TxStatus::Delayed || status == TxStatus::Requested);
- let mut tx = db.transaction()?;
- // We lock the row with FOR UPDATE to prevent sending same transaction multiple time
- let row = tx.query_opt(
- "SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 LIMIT 1 FOR UPDATE",
- &[&(status as i16)],
- )?;
- if let Some(row) = &row {
- let id: i32 = row.get(0);
- let amount = taler_to_btc(&Amount::from_str(row.get(1))?)?;
- let wtid: &[u8] = row.get(2);
- let addr: Address = btc_payto_addr(&Url::parse(row.get(3))?)?;
- let exchange_base_url: Url = Url::parse(row.get(4))?;
- let info = Info::Transaction {
- wtid: wtid.try_into()?,
- url: exchange_base_url,
- };
- let metadata = encode_info(&info);
-
- fail_point("(injected) fail send_op_return", 0.2)?;
- match rpc.send_op_return(&addr, &amount, &metadata, false) {
- Ok(tx_id) => {
- fail_point("(injected) fail update db", 0.2)?;
- tx.execute(
- "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3",
- &[&(TxStatus::Sent as i16), &tx_id.as_ref(), &id],
- )?;
- tx.commit()?;
- let amount = btc_to_taler(&amount.to_signed().unwrap());
- info!(">> {} {} in {} to {}", amount, base32(wtid), tx_id, addr);
- }
- Err(e) => {
- tx.execute(
- "UPDATE tx_out SET status=$1 WHERE id=$2",
- &[&(TxStatus::Delayed as i16), &id],
- )?;
- tx.commit()?;
- info!("sender: RPC - {}", e);
- }
- }
- }
- Ok(row.is_some())
- }
-
- /// Bounce a transaction on the blockchain, return true if more bounce with the same status remains
- fn bounce(
- db: &mut Client,
- rpc: &mut BtcRpc,
- status: BounceStatus,
- fee: &BtcAmount,
- ) -> Result<bool, Box<dyn std::error::Error>> {
- assert!(status == BounceStatus::Delayed || status == BounceStatus::Requested);
- let mut tx = db.transaction()?;
- // We lock the row with FOR UPDATE to prevent sending same transaction multiple time
- let row = tx.query_opt(
- "SELECT id, bounced FROM bounce WHERE status=$1 LIMIT 1 FOR UPDATE",
- &[&(status as i16)],
- )?;
- if let Some(row) = &row {
- let id: i32 = row.get(0);
- let bounced: Txid = Txid::from_slice(row.get(1))?;
- let info = Info::Bounce { bounced };
- let metadata = encode_info(&info);
-
- fail_point("(injected) fail bounce", 0.2)?;
- match rpc.bounce(&bounced, fee, &metadata) {
- Ok(it) => {
- tx.execute(
- "UPDATE bounce SET txid = $1, status = $2 WHERE id = $3",
- &[&it.as_ref(), &(BounceStatus::Sent as i16), &id],
- )?;
- tx.commit()?;
- info!("|| {} in {}", &bounced, &it);
- }
- Err(err) => match err {
- rpc::Error::RPC {
- code: ErrorCode::RpcWalletInsufficientFunds | ErrorCode::RpcWalletError,
- msg,
- } => {
- tx.execute(
- "UPDATE bounce SET status = $1 WHERE id = $2",
- &[&(BounceStatus::Ignored as i16), &id],
- )?;
- tx.commit()?;
- info!("|| (ignore) {} because {}", &bounced, msg);
- }
- _ => Err(err)?,
- },
- }
- }
- Ok(row.is_some())
- }
-
- // TODO check if transactions are abandoned
-
- let mut lifetime = config.btc_lifetime;
- let mut status = true;
-
- // Alway start with a sync work
- let mut skip_notification = true;
- loop {
- // Check lifetime
- if let Some(nb) = lifetime.as_mut() {
- if *nb == 0 {
- info!("Reach end of lifetime");
- return;
- } else {
- *nb -= 1;
- }
- }
-
- // Connect
- let rpc = rpc.client();
- let db = db.client();
-
- let result: Result<(), Box<dyn std::error::Error>> = (|| {
- // Listen to all channels
- db.batch_execute("LISTEN new_block; LISTEN new_tx")?;
- // Wait for the next notification
- {
- let mut ntf = db.notifications();
- if !skip_notification && ntf.is_empty() {
- // Block until next notification
- ntf.blocking_iter().next()?;
- }
- // Conflate all notifications
- let mut iter = ntf.iter();
- while iter.next()?.is_some() {}
- }
- // Sync chain
- sync_chain(rpc, db, config, &mut status)?;
-
- // As we are now in sync with the blockchain if a transaction is in requested or delayed state it have not been sent
-
- // Send delayed transactions
- while send(db, rpc, TxStatus::Delayed)? {}
- // Send requested transactions
- while send(db, rpc, TxStatus::Requested)? {}
-
- let bounce_fee = BtcAmount::from_sat(config.bounce_fee);
- // Send delayed bounce
- while bounce(db, rpc, BounceStatus::Delayed, &bounce_fee)? {}
- // Send requested bounce
- while bounce(db, rpc, BounceStatus::Requested, &bounce_fee)? {}
-
- Ok(())
- })();
- if let Err(e) = result {
- error!("worker: {}", e);
- // On failure retry without waiting for notifications
- skip_notification = true;
- } else {
- skip_notification = false;
- }
- }
-}
-
-/// Parse new transactions, return true if the database is up to date with the latest mined block
-fn sync_chain(
- rpc: &mut BtcRpc,
- db: &mut Client,
- config: &Config,
- status: &mut bool,
-) -> Result<bool, Box<dyn std::error::Error>> {
- // Get stored last_hash
- let last_hash = last_hash(db)?;
- let min_confirmations = config.confirmation;
-
- // Get a set of transactions ids to parse
- let (txs, removed, lastblock): (HashMap<Txid, (Category, i32)>, HashSet<Txid>, BlockHash) = {
- // Get all transactions made since this block
- let list = rpc.list_since_block(last_hash.as_ref(), min_confirmations, true)?;
- // Only keep ids and category
- let txs = list
- .transactions
- .into_iter()
- .map(|tx| (tx.txid, (tx.category, tx.confirmations)))
- .collect();
- let removed = list
- .removed
- .into_iter()
- .filter_map(|tx| (tx.category == Category::Receive).then(|| tx.txid))
- .collect();
- (txs, removed, list.lastblock)
- };
-
- // Check if a confirmed incoming transaction have been removed by a blockchain reorganisation
-
- let new_status = sync_chain_removed(&txs, &removed, rpc, db, min_confirmations as i32)?;
-
- if *status != new_status {
- let mut tx = db.transaction()?;
- tx.execute(
- "UPDATE state SET value=$1 WHERE name='status'",
- &[&[new_status as u8].as_ref()],
- )?;
- tx.execute("NOTIFY status", &[])?;
- tx.commit()?;
- *status = new_status;
- }
-
- if !new_status {
- return Ok(false);
- }
- for (id, (category, confirmations)) in txs {
- match category {
- Category::Send => sync_chain_outgoing(&id, confirmations, rpc, db, config)?,
- Category::Receive if confirmations >= min_confirmations as i32 => {
- sync_chain_incoming_confirmed(&id, rpc, db)?
- }
- Category::Receive if confirmations < 0 => {
- panic!("receive conflict {} {}", id, confirmations)
- }
- _ => {
- // Ignore coinbase and unconfirmed send transactions
- }
- }
- }
- // Move last_hash forward if no error have been caught
- {
- let curr_hash: Option<BlockHash> = db
- .query_opt("SELECT value FROM state WHERE name='last_hash'", &[])?
- .map(|r| BlockHash::from_slice(r.get(0)).unwrap());
-
- let nb_row = if let Some(hash) = &curr_hash {
- db.execute(
- "UPDATE state SET value=$1 WHERE name='last_hash' AND value=$2",
- &[&lastblock.as_ref(), &hash.as_ref()],
- )?
- } else {
- db.execute(
- "INSERT INTO state (name, value) VALUES ('last_hash', $1) ON CONFLICT (name) DO NOTHING",
- &[&lastblock.as_ref()],
- )?
- };
-
- if last_hash != curr_hash || nb_row == 0 {
- error!("watcher: hash state collision, database have been altered by another process");
- }
- }
- Ok(true)
-}
-
-/// Sync database with removed transactions, return false if bitcoin backing is compromised
-fn sync_chain_removed(
- txs: &HashMap<Txid, (Category, i32)>,
- removed: &HashSet<Txid>,
- rpc: &mut BtcRpc,
- db: &mut Client,
- min_confirmations: i32,
-) -> Result<bool, Box<dyn std::error::Error>> {
- // Removed transactions are correctness issue in only two cases:
- // - An incoming valid transaction considered confirmed in the database
- // - An incoming invalid transactions already bounced
- // Those two cases can compromise bitcoin backing
- // Removed outgoing transactions will be retried automatically by the node
-
- let mut blocking_receive = Vec::new();
- let mut blocking_bounce = Vec::new();
- for id in removed {
- match rpc.get_tx_segwit_key(id) {
- Ok((full, key)) => {
- // Valid tx are only problematic if not confirmed in the txs list and stored stored in the database
- if txs
- .get(id)
- .map(|(_, confirmations)| *confirmations < min_confirmations)
- .unwrap_or(true)
- && db
- .query_opt("SELECT 1 FROM tx_in WHERE reserve_pub=$1", &[&key.as_ref()])?
- .is_some()
- {
- let debit_addr = sender_address(rpc, &full)?;
- blocking_receive.push((key, id, debit_addr));
- }
- }
- Err(err) => match err {
- GetSegwitErr::Decode(_) => {
- // Invalid tx are only problematic if already bounced
- if let Some(row) = db.query_opt(
- "SELECT txid FROM bounce WHERE bounced=$1 AND txid IS NOT NULL",
- &[&id.as_ref()],
- )? {
- let txid = Txid::from_slice(row.get(0)).unwrap();
- blocking_bounce.push((txid, id));
- } else {
- // Remove transaction from bounce table
- db.execute("DELETE FROM bounce WHERE bounced=$1", &[&id.as_ref()])?;
- }
- }
- GetSegwitErr::RPC(it) => return Err(it.into()),
- },
- }
- }
-
- if !blocking_bounce.is_empty() || !blocking_receive.is_empty() {
- let mut buf = "The following transaction have been removed from the blockchain, bitcoin backing is compromised until the transaction reappear:".to_string();
- for (key, id, addr) in blocking_receive {
- write!(
- &mut buf,
- "\n\treceived {} in {} from {}",
- base32(&key),
- id,
- addr
- )
- .unwrap();
- }
- for (id, bounced) in blocking_bounce {
- write!(&mut buf, "\n\tbounced {} in {}", id, bounced).unwrap();
- }
- error!("{}", buf);
- return Ok(false);
- } else {
- return Ok(true);
- }
-}
-
-/// Sync database with an outgoing transaction
-fn sync_chain_outgoing(
- id: &Txid,
- confirmations: i32,
- rpc: &mut BtcRpc,
- db: &mut Client,
- config: &Config,
-) -> Result<(), Box<dyn std::error::Error>> {
- match rpc
- .get_tx_op_return(id)
- .map(|(full, bytes)| (full, decode_info(&bytes)))
- {
- Ok((full, Ok(info))) => match info {
- Info::Transaction { wtid, .. } => {
- let credit_addr = full.details[0].address.as_ref().unwrap();
- let amount = btc_to_taler(&full.amount);
-
- if confirmations < 0 {
- // Handle conflicting tx
- let nb_row = db.execute(
- "UPDATE tx_out SET status=$1, txid=NULL where txid=$2",
- &[&(TxStatus::Delayed as i16), &id.as_ref()],
- )?;
- if nb_row > 0 {
- warn!(
- ">> (conflict) {} in {} to {}",
- base32(&wtid),
- id,
- credit_addr
- );
- }
- } else {
- // Get previous out tx
- let row = db.query_opt(
- "SELECT id, status FROM tx_out WHERE wtid=$1 FOR UPDATE",
- &[&wtid.as_ref()],
- )?;
- if let Some(row) = row {
- // If already in database sync status
- let _id: i32 = row.get(0);
- let status: i16 = row.get(1);
- match TxStatus::try_from(status as u8).unwrap() {
- TxStatus::Requested | TxStatus::Delayed => {
- let nb_row = db.execute(
- "UPDATE tx_out SET status=$1 WHERE id=$2 AND status=$3",
- &[&(TxStatus::Sent as i16), &_id, &status],
- )?;
- if nb_row > 0 {
- warn!(
- ">> (recovered) {} {} in {} to {}",
- amount,
- base32(&wtid),
- id,
- credit_addr
- );
- }
- }
- TxStatus::Sent => { /* Status is correct */ }
- }
- } else {
- // Else add to database
- let debit_addr = sender_address(rpc, &full)?;
- let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time);
- let nb = db.execute(
- "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING",
- &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref(), &config.base_url.as_ref(), &(TxStatus::Sent as i16), &id.as_ref(), &None::<&[u8]>],
- )?;
- if nb > 0 {
- warn!(
- ">> (onchain) {} {} in {} to {}",
- amount,
- base32(&wtid),
- id,
- credit_addr
- );
- }
- }
- }
- }
- Info::Bounce { bounced } => {
- if confirmations < 0 {
- // Handle conflicting tx
- let nb_row = db.execute(
- "UPDATE bounce SET status=$1, txid=NULL where txid=$2",
- &[&(BounceStatus::Delayed as i16), &id.as_ref()],
- )?;
- if nb_row > 0 {
- warn!("|| (conflict) {} in {}", &bounced, &id);
- }
- } else {
- // Get previous bounce
- let row = db.query_opt(
- "SELECT id, status FROM bounce WHERE bounced=$1",
- &[&bounced.as_ref()],
- )?;
- if let Some(row) = row {
- // If already in database sync status
- let _id: i32 = row.get(0);
- let status: i16 = row.get(1);
- match BounceStatus::try_from(status as u8).unwrap() {
- BounceStatus::Requested | BounceStatus::Delayed => {
- let nb_row = db.execute(
- "UPDATE bounce SET status=$1 WHERE id=$2 AND status=$3",
- &[&(BounceStatus::Sent as i16), &_id, &status],
- )?;
- if nb_row > 0 {
- warn!("|| (recovered) {} in {}", &bounced, &id);
- }
- }
- BounceStatus::Ignored => error!(
- "watcher: ignored bounce {} found in chain at {}",
- bounced, id
- ),
- BounceStatus::Sent => { /* Status is correct */ }
- }
- } else {
- // Else add to database
- let nb = db.execute(
- "INSERT INTO bounce (bounced, txid, status) VALUES ($1, $2, $3) ON CONFLICT (txid) DO NOTHING",
- &[&bounced.as_ref(), &id.as_ref(), &(BounceStatus::Sent as i16)],
- )?;
- if nb > 0 {
- warn!("|| (onchain) {} in {}", &bounced, &id);
- }
- }
- }
- }
- },
- Ok((_, Err(e))) => warn!("send: decode-info {} - {}", id, e),
- Err(e) => match e {
- GetOpReturnErr::MissingOpReturn => { /* Ignore */ }
- GetOpReturnErr::RPC(e) => return Err(e)?,
- },
- }
- Ok(())
-}
-
-/// Sync database with na incoming confirmed transaction
-fn sync_chain_incoming_confirmed(
- id: &Txid,
- rpc: &mut BtcRpc,
- db: &mut Client,
-) -> Result<(), Box<dyn std::error::Error>> {
- match rpc.get_tx_segwit_key(id) {
- Ok((full, reserve_pub)) => {
- // Store transactions in database
- let debit_addr = sender_address(rpc, &full)?;
- let credit_addr = full.details[0].address.as_ref().unwrap();
- let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time);
- let amount = btc_to_taler(&full.amount);
- let nb = db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (reserve_pub) DO NOTHING ", &[
- &date, &amount.to_string(), &reserve_pub.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref()
- ])?;
- if nb > 0 {
- info!(
- "<< {} {} in {} from {}",
- amount,
- base32(&reserve_pub),
- id,
- debit_addr
- );
- }
- }
- Err(err) => match err {
- GetSegwitErr::Decode(_) => {
- // If encoding is wrong request a bounce
- db.execute(
- "INSERT INTO bounce (bounced) VALUES ($1) ON CONFLICT (bounced) DO NOTHING",
- &[&id.as_ref()],
- )?;
- }
- GetSegwitErr::RPC(e) => return Err(e.into()),
- },
- }
- Ok(())
-}
-
-/// Wait for new block and notify arrival with postgreSQL notifications
-fn block_listener(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql) {
- loop {
- let rpc = rpc.client();
- let db = db.client();
- let result: Result<(), Box<dyn std::error::Error>> = (|| {
- rpc.wait_for_new_block(0)?;
- db.execute("NOTIFY new_block", &[])?;
- Ok(())
- })();
- if let Err(e) = result {
- error!("listener: {}", e);
- }
- }
+pub struct WireState {
+ confirmation: AtomicU16,
}
fn main() {
@@ -588,13 +50,33 @@ fn main() {
};
info!("Running on {} chain", chain_name);
+ let state: &'static WireState = Box::leak(Box::new(WireState {
+ confirmation: AtomicU16::new(config.confirmation),
+ }));
+
let mut rpc = BtcRpc::common(&btc_config).unwrap();
rpc.load_wallet(WIRE_WALLET_NAME).ok();
let rpc_listener = AutoReconnectRPC::new(btc_config.clone(), WIRE_WALLET_NAME);
+ let rpc_analysis = AutoReconnectRPC::new(btc_config.clone(), WIRE_WALLET_NAME);
let rpc_worker = AutoReconnectRPC::new(btc_config, WIRE_WALLET_NAME);
let db_listener = AutoReconnectSql::new(&config.db_url);
+ let db_analysis = AutoReconnectSql::new(&config.db_url);
let db_worker = AutoReconnectSql::new(&config.db_url);
- std::thread::spawn(move || block_listener(rpc_listener, db_listener));
- worker(rpc_worker, db_worker, config);
+ named_spawn("listener".to_string(), move || {
+ block_listener(rpc_listener, db_listener)
+ });
+ named_spawn("analysis".to_string(), move || {
+ analysis(rpc_analysis, db_analysis, config, state)
+ });
+ worker(rpc_worker, db_worker, config, state);
+}
+
+pub fn named_spawn<F, T>(name: String, f: F) -> JoinHandle<T>
+where
+ F: FnOnce() -> T,
+ F: Send + 'static,
+ T: Send + 'static,
+{
+ std::thread::Builder::new().name(name).spawn(f).unwrap()
}
diff --git a/btc-wire/src/rpc.rs b/btc-wire/src/rpc.rs
index 1b02aeb..4b7e424 100644
--- a/btc-wire/src/rpc.rs
+++ b/btc-wire/src/rpc.rs
@@ -7,6 +7,8 @@
//!
//! We only parse the thing we actually use, this reduce memory usage and
//! make our code more compatible with future deprecation
+//!
+//! bitcoincore RPC documentation: <https://bitcoincore.org/en/doc/22.0.0/>
use bitcoin::{hashes::hex::ToHex, Address, Amount, BlockHash, SignedAmount, Txid};
use serde_json::{json, Value};
@@ -272,7 +274,7 @@ impl BtcRpc {
pub fn list_since_block(
&mut self,
hash: Option<&BlockHash>,
- confirmation: u8,
+ confirmation: u16,
include_remove: bool,
) -> Result<ListSinceBlock> {
self.call("listsinceblock", &(hash, confirmation, (), include_remove))
@@ -285,6 +287,10 @@ impl BtcRpc {
}
}
+ pub fn get_chain_tips(&mut self) -> Result<Vec<ChainTips>> {
+ self.call("getchaintips", &EMPTY)
+ }
+
pub fn get_tx(&mut self, id: &Txid) -> Result<TransactionFull> {
self.call("gettransaction", &(id, (), true))
}
@@ -400,6 +406,28 @@ pub struct HexWrapper {
pub hex: String,
}
+#[derive(Clone, PartialEq, Eq, serde::Deserialize, Debug)]
+pub struct ChainTips {
+ pub height: u64,
+ pub hash: bitcoin::BlockHash,
+ #[serde(rename = "branchlen")]
+ pub branch_length: usize,
+ pub status: ChainTipsStatus,
+}
+
+#[derive(Copy, serde::Deserialize, Clone, PartialEq, Eq, Debug)]
+#[serde(rename_all = "lowercase")]
+pub enum ChainTipsStatus {
+ Invalid,
+ #[serde(rename = "headers-only")]
+ HeadersOnly,
+ #[serde(rename = "valid-headers")]
+ ValidHeaders,
+ #[serde(rename = "valid-fork")]
+ ValidFork,
+ Active,
+}
+
#[derive(Debug, serde::Deserialize)]
pub struct Nothing {}
diff --git a/makefile b/makefile
index d947f1a..c27ab89 100644
--- a/makefile
+++ b/makefile
@@ -14,6 +14,7 @@ test_btc:
test/btc/conflict.sh
test/btc/reorg.sh
test/btc/hell.sh
+ test/btc/analysis.sh
test/btc/config.sh
test: install test_gateway test_btc \ No newline at end of file
diff --git a/taler-common/src/config.rs b/taler-common/src/config.rs
index 539c9e3..e43e1ed 100644
--- a/taler-common/src/config.rs
+++ b/taler-common/src/config.rs
@@ -32,7 +32,7 @@ pub struct Config {
pub unix_path: Option<PathBuf>,
pub btc_data_dir: Option<PathBuf>,
pub payto: Url,
- pub confirmation: u8,
+ pub confirmation: u16,
pub bounce_fee: u64,
pub btc_lifetime: Option<u64>,
pub http_lifetime: Option<u64>,
diff --git a/test/btc/analysis.sh b/test/btc/analysis.sh
new file mode 100644
index 0000000..5677c2d
--- /dev/null
+++ b/test/btc/analysis.sh
@@ -0,0 +1,65 @@
+#!/bin/bash
+
+## Test btc_wire ability to learn and protect itself from blockchain behavior
+
+set -eu
+
+source "${BASH_SOURCE%/*}/../common.sh"
+SCHEMA=btc.sql
+
+echo "----- Setup -----"
+echo "Load config file"
+load_config
+echo "Start database"
+setup_db
+echo "Start bitcoin node"
+init_btc
+echo "Start second bitcoin node"
+init_btc2
+echo "Start btc-wire"
+btc_wire
+echo "Start gateway"
+gateway
+echo ""
+
+echo "----- Learn from reorg -----"
+
+echo "Loose second bitcoin node"
+btc2_deco
+
+echo -n "Making wire transfer to exchange:"
+btc-wire-utils -d $BTC_DIR transfer 0.042 > /dev/null
+next_btc # Trigger btc_wire
+check_balance 9.95799209 0.04200000
+echo " OK"
+
+echo -n "Perform fork and check btc-wire hard error:"
+gateway_up
+btc2_fork 5
+check_balance 9.95799209 0.00000000
+gateway_down
+echo " OK"
+
+echo -n "Recover orphaned transactions:"
+next_btc 5 # More block needed to confirm
+check_balance 9.95799209 0.04200000
+gateway_up
+echo " OK"
+
+echo "Loose second bitcoin node"
+btc2_deco
+
+echo -n "Making wire transfer to exchange:"
+btc-wire-utils -d $BTC_DIR transfer 0.064 > /dev/null
+next_btc 5 # More block needed to confirm
+check_balance 9.89398418 0.10600000
+echo " OK"
+
+echo -n "Perform fork and check btc-wire learned from previous attack:"
+gateway_up
+btc2_fork 5
+check_balance 9.89398418 0.10600000
+gateway_up
+echo " OK"
+
+echo "All tests passed!" \ No newline at end of file
diff --git a/test/btc/conflict.sh b/test/btc/conflict.sh
index 2fa6376..f4710cd 100644
--- a/test/btc/conflict.sh
+++ b/test/btc/conflict.sh
@@ -57,7 +57,7 @@ check_balance 9.96299209 0.03698010
echo " OK"
echo -n "Resend conflicting transaction:"
-sleep 5
+sleep 5 # Wait for reconnection
mine_btc
check_delta "outgoing?delta=-100" "seq 4 5" "0.00"
check_balance 9.96699209 0.03297811
@@ -106,7 +106,7 @@ check_balance 9.95994929 0.04001000
echo " OK"
echo -n "Resend conflicting transaction:"
-sleep 5
+sleep 5 # Wait for reconnection
mine_btc
check_balance 9.99993744 0.00002000
echo " OK"
diff --git a/test/btc/hell.sh b/test/btc/hell.sh
index d7c3d51..f8862d2 100644
--- a/test/btc/hell.sh
+++ b/test/btc/hell.sh
@@ -35,17 +35,11 @@ echo " OK"
echo -n "Perform fork and check btc-wire hard error:"
gateway_up
-btc2_fork
+btc2_fork 5
check_balance 9.99579209 0.00000000
gateway_down
echo " OK"
-echo -n "Check btc-wire hard error on restart:"
-btc_wire
-sleep 1
-gateway_down
-echo " OK"
-
echo -n "Generate conflict:"
restart_btc -minrelaytxfee=0.0001
btc-wire-utils -d $BTC_DIR abandon client
@@ -54,14 +48,10 @@ next_btc
check_balance 9.99457382 0.00540000
echo " OK"
-echo -n "Check btc-wire never heal on restart:"
-btc_wire
-sleep 1
+echo -n "Check btc-wire have not read the conflicting transaction:"
+sleep 5 # Wait for reconnection # Wait for reconnection
gateway_down
check_balance 9.99457382 0.00540000
-echo " OK"
-
-echo -n "Check btc-wire have not read the conflicting transaction:"
check_delta "incoming" ""
echo " OK"
@@ -93,23 +83,16 @@ btc2_deco
echo -n "Generate bounce:"
$BTC_CLI -rpcwallet=client sendtoaddress $WIRE 0.042 > /dev/null
next_btc
-sleep 1
check_balance 9.99998674 0.00001000
echo " OK"
echo -n "Perform fork and check btc-wire hard error:"
gateway_up
-btc2_fork
+btc2_fork 5
check_balance 9.95799859 0.00000000
gateway_down
echo " OK"
-echo -n "Check btc-wire hard error on restart:"
-btc_wire
-sleep 1
-gateway_down
-echo " OK"
-
echo -n "Generate conflict:"
restart_btc -minrelaytxfee=0.0001
btc-wire-utils -d $BTC_DIR abandon client
@@ -118,16 +101,10 @@ next_btc
check_balance 9.94597382 0.05400000
echo " OK"
-sleep 5
-
-echo -n "Check btc-wire never heal on restart:"
-btc_wire
-sleep 1
+echo -n "Check btc-wire have not read the conflicting transaction:"
+sleep 5 # Wait for reconnection
gateway_down
check_balance 9.94597382 0.05400000
-echo " OK"
-
-echo -n "Check btc-wire have not read the conflicting transaction:"
check_delta "incoming" ""
echo " OK"
diff --git a/test/btc/reorg.sh b/test/btc/reorg.sh
index 3e4da8f..a0de12d 100644
--- a/test/btc/reorg.sh
+++ b/test/btc/reorg.sh
@@ -41,25 +41,14 @@ echo " OK"
echo -n "Perform fork and check btc-wire hard error:"
gateway_up
-btc2_fork
+btc2_fork 22
check_balance 9.99826299 0.00000000
gateway_down
echo " OK"
-echo -n "Check btc-wire hard error on restart:"
-btc_wire
-sleep 1
-gateway_down
-echo " OK"
-
echo -n "Recover orphaned transactions:"
-next_btc
+next_btc 6 # More block needed to confirm
check_balance 9.99826299 0.00165000
-echo " OK"
-
-echo -n "Check btc-wire heal on restart:"
-btc_wire
-sleep 1
gateway_up
echo " OK"
@@ -83,19 +72,20 @@ echo " OK"
echo -n "Perform fork and check btc-wire still up:"
gateway_up
-btc2_fork
+btc2_fork 22
check_balance 9.99826299 0.00146311
gateway_up
echo " OK"
echo -n "Recover orphaned transactions:"
-next_btc
+next_btc 6 # More block needed to confirm
check_balance 9.99842799 0.00146311
echo " OK"
echo "----- Handle reorg bounce -----"
clear_wallet
+check_balance "*" 0.00000000
echo "Loose second bitcoin node"
btc2_deco
@@ -105,20 +95,20 @@ for n in `$SEQ`; do
$BTC_CLI -rpcwallet=client sendtoaddress $WIRE 0.000$n > /dev/null
mine_btc
done
-next_btc
+next_btc 6 # More block needed to confirm
sleep 1
check_balance "*" 0.00011000
echo " OK"
echo -n "Perform fork and check btc-wire hard error:"
gateway_up
-btc2_fork
+btc2_fork 22
check_balance "*" 0.00000000
gateway_down
echo " OK"
echo -n "Recover orphaned transactions:"
-next_btc
+next_btc 6 # More block needed to confirm
check_balance "*" 0.00011000
echo " OK"
diff --git a/test/btc/stress.sh b/test/btc/stress.sh
index 48eade0..ff31563 100644
--- a/test/btc/stress.sh
+++ b/test/btc/stress.sh
@@ -99,7 +99,7 @@ done
next_btc
sleep 7
mine_btc
-sleep 5
+sleep 5 # Wait for reconnection
echo " OK"
echo -n "Check balance:"
diff --git a/test/common.sh b/test/common.sh
index 800d61e..a28165e 100644
--- a/test/common.sh
+++ b/test/common.sh
@@ -122,7 +122,7 @@ function btc2_deco() {
# Create a fork on the second node and reconnect the two node
function btc2_fork() {
- $BTC_CLI2 generatetoaddress ${1:-50} $RESERVE > /dev/null
+ $BTC_CLI2 generatetoaddress $1 $RESERVE > /dev/null
$BTC_CLI addnode 127.0.0.1:8346 onetry
sleep 1
}
@@ -160,7 +160,7 @@ function mine_btc() {
# Mine previous transactions
function next_btc() {
# Mine enough block to confirm previous transactions
- mine_btc $CONFIRMATION
+ mine_btc ${1:-$CONFIRMATION}
# Wait for btc_wire to catch up
sleep 0.2
# Mine one more block to trigger btc_wire