diff options
Diffstat (limited to 'btc-wire/src/loops/worker.rs')
-rw-r--r-- | btc-wire/src/loops/worker.rs | 541 |
1 files changed, 541 insertions, 0 deletions
diff --git a/btc-wire/src/loops/worker.rs b/btc-wire/src/loops/worker.rs new file mode 100644 index 0000000..d8924de --- /dev/null +++ b/btc-wire/src/loops/worker.rs @@ -0,0 +1,541 @@ +use std::{ + collections::{HashMap, HashSet}, + fmt::Write, + str::FromStr, + sync::atomic::Ordering, + time::{Duration, SystemTime}, +}; + +use bitcoin::{hashes::Hash, Address, Amount as BtcAmount, BlockHash, Txid}; +use btc_wire::{ + rpc::{self, BtcRpc, Category, ErrorCode}, + rpc_utils::sender_address, + GetOpReturnErr, GetSegwitErr, +}; +use postgres::{fallible_iterator::FallibleIterator, Client}; +use taler_common::{ + api_common::{base32, Amount}, + config::Config, + log::log::{error, info, warn}, +}; +use url::Url; + +use crate::{ + fail_point::fail_point, + info::{decode_info, encode_info, Info}, + reconnect::{AutoReconnectRPC, AutoReconnectSql}, + status::{BounceStatus, TxStatus}, + taler_util::{btc_payto_addr, btc_payto_url, btc_to_taler, taler_to_btc}, + WireState, +}; + +/// Listen for new proposed transactions and announce them on the bitcoin network +pub fn worker( + mut rpc: AutoReconnectRPC, + mut db: AutoReconnectSql, + config: &Config, + state: &WireState, +) { + /// Send a transaction on the blockchain, return true if more transactions with the same status remains + fn send( + db: &mut Client, + rpc: &mut BtcRpc, + status: TxStatus, + ) -> Result<bool, Box<dyn std::error::Error>> { + assert!(status == TxStatus::Delayed || status == TxStatus::Requested); + let mut tx = db.transaction()?; + // We lock the row with FOR UPDATE to prevent sending same transaction multiple time + let row = tx.query_opt( + "SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 LIMIT 1 FOR UPDATE", + &[&(status as i16)], + )?; + if let Some(row) = &row { + let id: i32 = row.get(0); + let amount = taler_to_btc(&Amount::from_str(row.get(1))?)?; + let wtid: &[u8] = row.get(2); + let addr: Address = btc_payto_addr(&Url::parse(row.get(3))?)?; + let exchange_base_url: Url = Url::parse(row.get(4))?; + let info = Info::Transaction { + wtid: wtid.try_into()?, + url: exchange_base_url, + }; + let metadata = encode_info(&info); + + fail_point("(injected) fail send_op_return", 0.2)?; + match rpc.send_op_return(&addr, &amount, &metadata, false) { + Ok(tx_id) => { + fail_point("(injected) fail update db", 0.2)?; + tx.execute( + "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3", + &[&(TxStatus::Sent as i16), &tx_id.as_ref(), &id], + )?; + tx.commit()?; + let amount = btc_to_taler(&amount.to_signed().unwrap()); + info!(">> {} {} in {} to {}", amount, base32(wtid), tx_id, addr); + } + Err(e) => { + tx.execute( + "UPDATE tx_out SET status=$1 WHERE id=$2", + &[&(TxStatus::Delayed as i16), &id], + )?; + tx.commit()?; + info!("sender: RPC - {}", e); + } + } + } + Ok(row.is_some()) + } + + /// Bounce a transaction on the blockchain, return true if more bounce with the same status remains + fn bounce( + db: &mut Client, + rpc: &mut BtcRpc, + status: BounceStatus, + fee: &BtcAmount, + ) -> Result<bool, Box<dyn std::error::Error>> { + assert!(status == BounceStatus::Delayed || status == BounceStatus::Requested); + let mut tx = db.transaction()?; + // We lock the row with FOR UPDATE to prevent sending same transaction multiple time + let row = tx.query_opt( + "SELECT id, bounced FROM bounce WHERE status=$1 LIMIT 1 FOR UPDATE", + &[&(status as i16)], + )?; + if let Some(row) = &row { + let id: i32 = row.get(0); + let bounced: Txid = Txid::from_slice(row.get(1))?; + let info = Info::Bounce { bounced }; + let metadata = encode_info(&info); + + fail_point("(injected) fail bounce", 0.2)?; + match rpc.bounce(&bounced, fee, &metadata) { + Ok(it) => { + tx.execute( + "UPDATE bounce SET txid = $1, status = $2 WHERE id = $3", + &[&it.as_ref(), &(BounceStatus::Sent as i16), &id], + )?; + tx.commit()?; + info!("|| {} in {}", &bounced, &it); + } + Err(err) => match err { + rpc::Error::RPC { + code: ErrorCode::RpcWalletInsufficientFunds | ErrorCode::RpcWalletError, + msg, + } => { + tx.execute( + "UPDATE bounce SET status = $1 WHERE id = $2", + &[&(BounceStatus::Ignored as i16), &id], + )?; + tx.commit()?; + info!("|| (ignore) {} because {}", &bounced, msg); + } + _ => Err(err)?, + }, + } + } + Ok(row.is_some()) + } + + // TODO check if transactions are abandoned + + let mut lifetime = config.btc_lifetime; + let mut status = true; + + // Alway start with a sync work + let mut skip_notification = true; + loop { + // Check lifetime + if let Some(nb) = lifetime.as_mut() { + if *nb == 0 { + info!("Reach end of lifetime"); + return; + } else { + *nb -= 1; + } + } + + // Connect + let rpc = rpc.client(); + let db = db.client(); + + let result: Result<(), Box<dyn std::error::Error>> = (|| { + // Listen to all channels + db.batch_execute("LISTEN new_block; LISTEN new_tx")?; + // Wait for the next notification + { + let mut ntf = db.notifications(); + if !skip_notification && ntf.is_empty() { + // Block until next notification + ntf.blocking_iter().next()?; + } + // Conflate all notifications + let mut iter = ntf.iter(); + while iter.next()?.is_some() {} + } + // Sync chain + sync_chain(rpc, db, config, state, &mut status)?; + + // As we are now in sync with the blockchain if a transaction is in requested or delayed state it have not been sent + + // Send delayed transactions + while send(db, rpc, TxStatus::Delayed)? {} + // Send requested transactions + while send(db, rpc, TxStatus::Requested)? {} + + let bounce_fee = BtcAmount::from_sat(config.bounce_fee); + // Send delayed bounce + while bounce(db, rpc, BounceStatus::Delayed, &bounce_fee)? {} + // Send requested bounce + while bounce(db, rpc, BounceStatus::Requested, &bounce_fee)? {} + + Ok(()) + })(); + if let Err(e) = result { + error!("worker: {}", e); + // On failure retry without waiting for notifications + skip_notification = true; + } else { + skip_notification = false; + } + } +} + +/// Retrieve last stored hash +fn last_hash(db: &mut Client) -> Result<Option<BlockHash>, postgres::Error> { + Ok(db + .query_opt("SELECT value FROM state WHERE name='last_hash'", &[])? + .map(|r| BlockHash::from_slice(r.get(0)).unwrap())) +} + +/// Parse new transactions, return true if the database is up to date with the latest mined block +fn sync_chain( + rpc: &mut BtcRpc, + db: &mut Client, + config: &Config, + state: &WireState, + status: &mut bool, +) -> Result<bool, Box<dyn std::error::Error>> { + // Get stored last_hash + let last_hash = last_hash(db)?; + let min_confirmations = state.confirmation.load(Ordering::SeqCst); + + // Get a set of transactions ids to parse + let (txs, removed, lastblock): (HashMap<Txid, (Category, i32)>, HashSet<Txid>, BlockHash) = { + // Get all transactions made since this block + let list = rpc.list_since_block(last_hash.as_ref(), min_confirmations, true)?; + // Only keep ids and category + let txs = list + .transactions + .into_iter() + .map(|tx| (tx.txid, (tx.category, tx.confirmations))) + .collect(); + let removed = list + .removed + .into_iter() + .filter_map(|tx| (tx.category == Category::Receive).then(|| tx.txid)) + .collect(); + (txs, removed, list.lastblock) + }; + + // Check if a confirmed incoming transaction have been removed by a blockchain reorganisation + + let new_status = sync_chain_removed(&txs, &removed, rpc, db, min_confirmations as i32)?; + + if *status != new_status { + let mut tx = db.transaction()?; + tx.execute( + "UPDATE state SET value=$1 WHERE name='status'", + &[&[new_status as u8].as_ref()], + )?; + tx.execute("NOTIFY status", &[])?; + tx.commit()?; + *status = new_status; + } + + if !new_status { + return Ok(false); + } + for (id, (category, confirmations)) in txs { + match category { + Category::Send => sync_chain_outgoing(&id, confirmations, rpc, db, config)?, + Category::Receive if confirmations >= min_confirmations as i32 => { + sync_chain_incoming_confirmed(&id, rpc, db)? + } + Category::Receive if confirmations < 0 => { + panic!("receive conflict {} {}", id, confirmations) + } + _ => { + // Ignore coinbase and unconfirmed send transactions + } + } + } + + // Move last_hash forward + { + let nb_row = if let Some(hash) = &last_hash { + db.execute( + "UPDATE state SET value=$1 WHERE name='last_hash' AND value=$2", + &[&lastblock.as_ref(), &hash.as_ref()], + )? + } else { + db.execute( + "INSERT INTO state (name, value) VALUES ('last_hash', $1) ON CONFLICT (name) DO NOTHING", + &[&lastblock.as_ref()], + )? + }; + + if nb_row == 0 { + error!("watcher: hash state collision, database have been altered by another process"); + } + } + Ok(true) +} + +/// Sync database with removed transactions, return false if bitcoin backing is compromised +fn sync_chain_removed( + txs: &HashMap<Txid, (Category, i32)>, + removed: &HashSet<Txid>, + rpc: &mut BtcRpc, + db: &mut Client, + min_confirmations: i32, +) -> Result<bool, Box<dyn std::error::Error>> { + // Removed transactions are correctness issue in only two cases: + // - An incoming valid transaction considered confirmed in the database + // - An incoming invalid transactions already bounced + // Those two cases can compromise bitcoin backing + // Removed outgoing transactions will be retried automatically by the node + + let mut blocking_receive = Vec::new(); + let mut blocking_bounce = Vec::new(); + for id in removed { + match rpc.get_tx_segwit_key(id) { + Ok((full, key)) => { + // Valid tx are only problematic if not confirmed in the txs list and stored stored in the database + if txs + .get(id) + .map(|(_, confirmations)| *confirmations < min_confirmations) + .unwrap_or(true) + && db + .query_opt("SELECT 1 FROM tx_in WHERE reserve_pub=$1", &[&key.as_ref()])? + .is_some() + { + let debit_addr = sender_address(rpc, &full)?; + blocking_receive.push((key, id, debit_addr)); + } + } + Err(err) => match err { + GetSegwitErr::Decode(_) => { + // Invalid tx are only problematic if already bounced + if let Some(row) = db.query_opt( + "SELECT txid FROM bounce WHERE bounced=$1 AND txid IS NOT NULL", + &[&id.as_ref()], + )? { + let txid = Txid::from_slice(row.get(0)).unwrap(); + blocking_bounce.push((txid, id)); + } else { + // Remove transaction from bounce table + db.execute("DELETE FROM bounce WHERE bounced=$1", &[&id.as_ref()])?; + } + } + GetSegwitErr::RPC(it) => return Err(it.into()), + }, + } + } + + if !blocking_bounce.is_empty() || !blocking_receive.is_empty() { + let mut buf = "The following transaction have been removed from the blockchain, bitcoin backing is compromised until the transaction reappear:".to_string(); + for (key, id, addr) in blocking_receive { + write!( + &mut buf, + "\n\treceived {} in {} from {}", + base32(&key), + id, + addr + ) + .unwrap(); + } + for (id, bounced) in blocking_bounce { + write!(&mut buf, "\n\tbounced {} in {}", id, bounced).unwrap(); + } + error!("{}", buf); + return Ok(false); + } else { + return Ok(true); + } +} + +/// Sync database with an outgoing transaction +fn sync_chain_outgoing( + id: &Txid, + confirmations: i32, + rpc: &mut BtcRpc, + db: &mut Client, + config: &Config, +) -> Result<(), Box<dyn std::error::Error>> { + match rpc + .get_tx_op_return(id) + .map(|(full, bytes)| (full, decode_info(&bytes))) + { + Ok((full, Ok(info))) => match info { + Info::Transaction { wtid, .. } => { + let credit_addr = full.details[0].address.as_ref().unwrap(); + let amount = btc_to_taler(&full.amount); + + if confirmations < 0 { + // Handle conflicting tx + let nb_row = db.execute( + "UPDATE tx_out SET status=$1, txid=NULL where txid=$2", + &[&(TxStatus::Delayed as i16), &id.as_ref()], + )?; + if nb_row > 0 { + warn!( + ">> (conflict) {} in {} to {}", + base32(&wtid), + id, + credit_addr + ); + } + } else { + // Get previous out tx + let row = db.query_opt( + "SELECT id, status FROM tx_out WHERE wtid=$1 FOR UPDATE", + &[&wtid.as_ref()], + )?; + if let Some(row) = row { + // If already in database sync status + let _id: i32 = row.get(0); + let status: i16 = row.get(1); + match TxStatus::try_from(status as u8).unwrap() { + TxStatus::Requested | TxStatus::Delayed => { + let nb_row = db.execute( + "UPDATE tx_out SET status=$1 WHERE id=$2 AND status=$3", + &[&(TxStatus::Sent as i16), &_id, &status], + )?; + if nb_row > 0 { + warn!( + ">> (recovered) {} {} in {} to {}", + amount, + base32(&wtid), + id, + credit_addr + ); + } + } + TxStatus::Sent => { /* Status is correct */ } + } + } else { + // Else add to database + let debit_addr = sender_address(rpc, &full)?; + let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time); + let nb = db.execute( + "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING", + &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref(), &config.base_url.as_ref(), &(TxStatus::Sent as i16), &id.as_ref(), &None::<&[u8]>], + )?; + if nb > 0 { + warn!( + ">> (onchain) {} {} in {} to {}", + amount, + base32(&wtid), + id, + credit_addr + ); + } + } + } + } + Info::Bounce { bounced } => { + if confirmations < 0 { + // Handle conflicting tx + let nb_row = db.execute( + "UPDATE bounce SET status=$1, txid=NULL where txid=$2", + &[&(BounceStatus::Delayed as i16), &id.as_ref()], + )?; + if nb_row > 0 { + warn!("|| (conflict) {} in {}", &bounced, &id); + } + } else { + // Get previous bounce + let row = db.query_opt( + "SELECT id, status FROM bounce WHERE bounced=$1", + &[&bounced.as_ref()], + )?; + if let Some(row) = row { + // If already in database sync status + let _id: i32 = row.get(0); + let status: i16 = row.get(1); + match BounceStatus::try_from(status as u8).unwrap() { + BounceStatus::Requested | BounceStatus::Delayed => { + let nb_row = db.execute( + "UPDATE bounce SET status=$1 WHERE id=$2 AND status=$3", + &[&(BounceStatus::Sent as i16), &_id, &status], + )?; + if nb_row > 0 { + warn!("|| (recovered) {} in {}", &bounced, &id); + } + } + BounceStatus::Ignored => error!( + "watcher: ignored bounce {} found in chain at {}", + bounced, id + ), + BounceStatus::Sent => { /* Status is correct */ } + } + } else { + // Else add to database + let nb = db.execute( + "INSERT INTO bounce (bounced, txid, status) VALUES ($1, $2, $3) ON CONFLICT (txid) DO NOTHING", + &[&bounced.as_ref(), &id.as_ref(), &(BounceStatus::Sent as i16)], + )?; + if nb > 0 { + warn!("|| (onchain) {} in {}", &bounced, &id); + } + } + } + } + }, + Ok((_, Err(e))) => warn!("send: decode-info {} - {}", id, e), + Err(e) => match e { + GetOpReturnErr::MissingOpReturn => { /* Ignore */ } + GetOpReturnErr::RPC(e) => return Err(e)?, + }, + } + Ok(()) +} + +/// Sync database with na incoming confirmed transaction +fn sync_chain_incoming_confirmed( + id: &Txid, + rpc: &mut BtcRpc, + db: &mut Client, +) -> Result<(), Box<dyn std::error::Error>> { + match rpc.get_tx_segwit_key(id) { + Ok((full, reserve_pub)) => { + // Store transactions in database + let debit_addr = sender_address(rpc, &full)?; + let credit_addr = full.details[0].address.as_ref().unwrap(); + let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time); + let amount = btc_to_taler(&full.amount); + let nb = db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (reserve_pub) DO NOTHING ", &[ + &date, &amount.to_string(), &reserve_pub.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref() + ])?; + if nb > 0 { + info!( + "<< {} {} in {} from {}", + amount, + base32(&reserve_pub), + id, + debit_addr + ); + } + } + Err(err) => match err { + GetSegwitErr::Decode(_) => { + // If encoding is wrong request a bounce + db.execute( + "INSERT INTO bounce (bounced) VALUES ($1) ON CONFLICT (bounced) DO NOTHING", + &[&id.as_ref()], + )?; + } + GetSegwitErr::RPC(e) => return Err(e.into()), + }, + } + Ok(()) +} |