summaryrefslogtreecommitdiff
path: root/btc-wire/src/loops/worker.rs
diff options
context:
space:
mode:
Diffstat (limited to 'btc-wire/src/loops/worker.rs')
-rw-r--r--btc-wire/src/loops/worker.rs541
1 files changed, 541 insertions, 0 deletions
diff --git a/btc-wire/src/loops/worker.rs b/btc-wire/src/loops/worker.rs
new file mode 100644
index 0000000..d8924de
--- /dev/null
+++ b/btc-wire/src/loops/worker.rs
@@ -0,0 +1,541 @@
+use std::{
+ collections::{HashMap, HashSet},
+ fmt::Write,
+ str::FromStr,
+ sync::atomic::Ordering,
+ time::{Duration, SystemTime},
+};
+
+use bitcoin::{hashes::Hash, Address, Amount as BtcAmount, BlockHash, Txid};
+use btc_wire::{
+ rpc::{self, BtcRpc, Category, ErrorCode},
+ rpc_utils::sender_address,
+ GetOpReturnErr, GetSegwitErr,
+};
+use postgres::{fallible_iterator::FallibleIterator, Client};
+use taler_common::{
+ api_common::{base32, Amount},
+ config::Config,
+ log::log::{error, info, warn},
+};
+use url::Url;
+
+use crate::{
+ fail_point::fail_point,
+ info::{decode_info, encode_info, Info},
+ reconnect::{AutoReconnectRPC, AutoReconnectSql},
+ status::{BounceStatus, TxStatus},
+ taler_util::{btc_payto_addr, btc_payto_url, btc_to_taler, taler_to_btc},
+ WireState,
+};
+
+/// Listen for new proposed transactions and announce them on the bitcoin network
+pub fn worker(
+ mut rpc: AutoReconnectRPC,
+ mut db: AutoReconnectSql,
+ config: &Config,
+ state: &WireState,
+) {
+ /// Send a transaction on the blockchain, return true if more transactions with the same status remains
+ fn send(
+ db: &mut Client,
+ rpc: &mut BtcRpc,
+ status: TxStatus,
+ ) -> Result<bool, Box<dyn std::error::Error>> {
+ assert!(status == TxStatus::Delayed || status == TxStatus::Requested);
+ let mut tx = db.transaction()?;
+ // We lock the row with FOR UPDATE to prevent sending same transaction multiple time
+ let row = tx.query_opt(
+ "SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 LIMIT 1 FOR UPDATE",
+ &[&(status as i16)],
+ )?;
+ if let Some(row) = &row {
+ let id: i32 = row.get(0);
+ let amount = taler_to_btc(&Amount::from_str(row.get(1))?)?;
+ let wtid: &[u8] = row.get(2);
+ let addr: Address = btc_payto_addr(&Url::parse(row.get(3))?)?;
+ let exchange_base_url: Url = Url::parse(row.get(4))?;
+ let info = Info::Transaction {
+ wtid: wtid.try_into()?,
+ url: exchange_base_url,
+ };
+ let metadata = encode_info(&info);
+
+ fail_point("(injected) fail send_op_return", 0.2)?;
+ match rpc.send_op_return(&addr, &amount, &metadata, false) {
+ Ok(tx_id) => {
+ fail_point("(injected) fail update db", 0.2)?;
+ tx.execute(
+ "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3",
+ &[&(TxStatus::Sent as i16), &tx_id.as_ref(), &id],
+ )?;
+ tx.commit()?;
+ let amount = btc_to_taler(&amount.to_signed().unwrap());
+ info!(">> {} {} in {} to {}", amount, base32(wtid), tx_id, addr);
+ }
+ Err(e) => {
+ tx.execute(
+ "UPDATE tx_out SET status=$1 WHERE id=$2",
+ &[&(TxStatus::Delayed as i16), &id],
+ )?;
+ tx.commit()?;
+ info!("sender: RPC - {}", e);
+ }
+ }
+ }
+ Ok(row.is_some())
+ }
+
+ /// Bounce a transaction on the blockchain, return true if more bounce with the same status remains
+ fn bounce(
+ db: &mut Client,
+ rpc: &mut BtcRpc,
+ status: BounceStatus,
+ fee: &BtcAmount,
+ ) -> Result<bool, Box<dyn std::error::Error>> {
+ assert!(status == BounceStatus::Delayed || status == BounceStatus::Requested);
+ let mut tx = db.transaction()?;
+ // We lock the row with FOR UPDATE to prevent sending same transaction multiple time
+ let row = tx.query_opt(
+ "SELECT id, bounced FROM bounce WHERE status=$1 LIMIT 1 FOR UPDATE",
+ &[&(status as i16)],
+ )?;
+ if let Some(row) = &row {
+ let id: i32 = row.get(0);
+ let bounced: Txid = Txid::from_slice(row.get(1))?;
+ let info = Info::Bounce { bounced };
+ let metadata = encode_info(&info);
+
+ fail_point("(injected) fail bounce", 0.2)?;
+ match rpc.bounce(&bounced, fee, &metadata) {
+ Ok(it) => {
+ tx.execute(
+ "UPDATE bounce SET txid = $1, status = $2 WHERE id = $3",
+ &[&it.as_ref(), &(BounceStatus::Sent as i16), &id],
+ )?;
+ tx.commit()?;
+ info!("|| {} in {}", &bounced, &it);
+ }
+ Err(err) => match err {
+ rpc::Error::RPC {
+ code: ErrorCode::RpcWalletInsufficientFunds | ErrorCode::RpcWalletError,
+ msg,
+ } => {
+ tx.execute(
+ "UPDATE bounce SET status = $1 WHERE id = $2",
+ &[&(BounceStatus::Ignored as i16), &id],
+ )?;
+ tx.commit()?;
+ info!("|| (ignore) {} because {}", &bounced, msg);
+ }
+ _ => Err(err)?,
+ },
+ }
+ }
+ Ok(row.is_some())
+ }
+
+ // TODO check if transactions are abandoned
+
+ let mut lifetime = config.btc_lifetime;
+ let mut status = true;
+
+ // Alway start with a sync work
+ let mut skip_notification = true;
+ loop {
+ // Check lifetime
+ if let Some(nb) = lifetime.as_mut() {
+ if *nb == 0 {
+ info!("Reach end of lifetime");
+ return;
+ } else {
+ *nb -= 1;
+ }
+ }
+
+ // Connect
+ let rpc = rpc.client();
+ let db = db.client();
+
+ let result: Result<(), Box<dyn std::error::Error>> = (|| {
+ // Listen to all channels
+ db.batch_execute("LISTEN new_block; LISTEN new_tx")?;
+ // Wait for the next notification
+ {
+ let mut ntf = db.notifications();
+ if !skip_notification && ntf.is_empty() {
+ // Block until next notification
+ ntf.blocking_iter().next()?;
+ }
+ // Conflate all notifications
+ let mut iter = ntf.iter();
+ while iter.next()?.is_some() {}
+ }
+ // Sync chain
+ sync_chain(rpc, db, config, state, &mut status)?;
+
+ // As we are now in sync with the blockchain if a transaction is in requested or delayed state it have not been sent
+
+ // Send delayed transactions
+ while send(db, rpc, TxStatus::Delayed)? {}
+ // Send requested transactions
+ while send(db, rpc, TxStatus::Requested)? {}
+
+ let bounce_fee = BtcAmount::from_sat(config.bounce_fee);
+ // Send delayed bounce
+ while bounce(db, rpc, BounceStatus::Delayed, &bounce_fee)? {}
+ // Send requested bounce
+ while bounce(db, rpc, BounceStatus::Requested, &bounce_fee)? {}
+
+ Ok(())
+ })();
+ if let Err(e) = result {
+ error!("worker: {}", e);
+ // On failure retry without waiting for notifications
+ skip_notification = true;
+ } else {
+ skip_notification = false;
+ }
+ }
+}
+
+/// Retrieve last stored hash
+fn last_hash(db: &mut Client) -> Result<Option<BlockHash>, postgres::Error> {
+ Ok(db
+ .query_opt("SELECT value FROM state WHERE name='last_hash'", &[])?
+ .map(|r| BlockHash::from_slice(r.get(0)).unwrap()))
+}
+
+/// Parse new transactions, return true if the database is up to date with the latest mined block
+fn sync_chain(
+ rpc: &mut BtcRpc,
+ db: &mut Client,
+ config: &Config,
+ state: &WireState,
+ status: &mut bool,
+) -> Result<bool, Box<dyn std::error::Error>> {
+ // Get stored last_hash
+ let last_hash = last_hash(db)?;
+ let min_confirmations = state.confirmation.load(Ordering::SeqCst);
+
+ // Get a set of transactions ids to parse
+ let (txs, removed, lastblock): (HashMap<Txid, (Category, i32)>, HashSet<Txid>, BlockHash) = {
+ // Get all transactions made since this block
+ let list = rpc.list_since_block(last_hash.as_ref(), min_confirmations, true)?;
+ // Only keep ids and category
+ let txs = list
+ .transactions
+ .into_iter()
+ .map(|tx| (tx.txid, (tx.category, tx.confirmations)))
+ .collect();
+ let removed = list
+ .removed
+ .into_iter()
+ .filter_map(|tx| (tx.category == Category::Receive).then(|| tx.txid))
+ .collect();
+ (txs, removed, list.lastblock)
+ };
+
+ // Check if a confirmed incoming transaction have been removed by a blockchain reorganisation
+
+ let new_status = sync_chain_removed(&txs, &removed, rpc, db, min_confirmations as i32)?;
+
+ if *status != new_status {
+ let mut tx = db.transaction()?;
+ tx.execute(
+ "UPDATE state SET value=$1 WHERE name='status'",
+ &[&[new_status as u8].as_ref()],
+ )?;
+ tx.execute("NOTIFY status", &[])?;
+ tx.commit()?;
+ *status = new_status;
+ }
+
+ if !new_status {
+ return Ok(false);
+ }
+ for (id, (category, confirmations)) in txs {
+ match category {
+ Category::Send => sync_chain_outgoing(&id, confirmations, rpc, db, config)?,
+ Category::Receive if confirmations >= min_confirmations as i32 => {
+ sync_chain_incoming_confirmed(&id, rpc, db)?
+ }
+ Category::Receive if confirmations < 0 => {
+ panic!("receive conflict {} {}", id, confirmations)
+ }
+ _ => {
+ // Ignore coinbase and unconfirmed send transactions
+ }
+ }
+ }
+
+ // Move last_hash forward
+ {
+ let nb_row = if let Some(hash) = &last_hash {
+ db.execute(
+ "UPDATE state SET value=$1 WHERE name='last_hash' AND value=$2",
+ &[&lastblock.as_ref(), &hash.as_ref()],
+ )?
+ } else {
+ db.execute(
+ "INSERT INTO state (name, value) VALUES ('last_hash', $1) ON CONFLICT (name) DO NOTHING",
+ &[&lastblock.as_ref()],
+ )?
+ };
+
+ if nb_row == 0 {
+ error!("watcher: hash state collision, database have been altered by another process");
+ }
+ }
+ Ok(true)
+}
+
+/// Sync database with removed transactions, return false if bitcoin backing is compromised
+fn sync_chain_removed(
+ txs: &HashMap<Txid, (Category, i32)>,
+ removed: &HashSet<Txid>,
+ rpc: &mut BtcRpc,
+ db: &mut Client,
+ min_confirmations: i32,
+) -> Result<bool, Box<dyn std::error::Error>> {
+ // Removed transactions are correctness issue in only two cases:
+ // - An incoming valid transaction considered confirmed in the database
+ // - An incoming invalid transactions already bounced
+ // Those two cases can compromise bitcoin backing
+ // Removed outgoing transactions will be retried automatically by the node
+
+ let mut blocking_receive = Vec::new();
+ let mut blocking_bounce = Vec::new();
+ for id in removed {
+ match rpc.get_tx_segwit_key(id) {
+ Ok((full, key)) => {
+ // Valid tx are only problematic if not confirmed in the txs list and stored stored in the database
+ if txs
+ .get(id)
+ .map(|(_, confirmations)| *confirmations < min_confirmations)
+ .unwrap_or(true)
+ && db
+ .query_opt("SELECT 1 FROM tx_in WHERE reserve_pub=$1", &[&key.as_ref()])?
+ .is_some()
+ {
+ let debit_addr = sender_address(rpc, &full)?;
+ blocking_receive.push((key, id, debit_addr));
+ }
+ }
+ Err(err) => match err {
+ GetSegwitErr::Decode(_) => {
+ // Invalid tx are only problematic if already bounced
+ if let Some(row) = db.query_opt(
+ "SELECT txid FROM bounce WHERE bounced=$1 AND txid IS NOT NULL",
+ &[&id.as_ref()],
+ )? {
+ let txid = Txid::from_slice(row.get(0)).unwrap();
+ blocking_bounce.push((txid, id));
+ } else {
+ // Remove transaction from bounce table
+ db.execute("DELETE FROM bounce WHERE bounced=$1", &[&id.as_ref()])?;
+ }
+ }
+ GetSegwitErr::RPC(it) => return Err(it.into()),
+ },
+ }
+ }
+
+ if !blocking_bounce.is_empty() || !blocking_receive.is_empty() {
+ let mut buf = "The following transaction have been removed from the blockchain, bitcoin backing is compromised until the transaction reappear:".to_string();
+ for (key, id, addr) in blocking_receive {
+ write!(
+ &mut buf,
+ "\n\treceived {} in {} from {}",
+ base32(&key),
+ id,
+ addr
+ )
+ .unwrap();
+ }
+ for (id, bounced) in blocking_bounce {
+ write!(&mut buf, "\n\tbounced {} in {}", id, bounced).unwrap();
+ }
+ error!("{}", buf);
+ return Ok(false);
+ } else {
+ return Ok(true);
+ }
+}
+
+/// Sync database with an outgoing transaction
+fn sync_chain_outgoing(
+ id: &Txid,
+ confirmations: i32,
+ rpc: &mut BtcRpc,
+ db: &mut Client,
+ config: &Config,
+) -> Result<(), Box<dyn std::error::Error>> {
+ match rpc
+ .get_tx_op_return(id)
+ .map(|(full, bytes)| (full, decode_info(&bytes)))
+ {
+ Ok((full, Ok(info))) => match info {
+ Info::Transaction { wtid, .. } => {
+ let credit_addr = full.details[0].address.as_ref().unwrap();
+ let amount = btc_to_taler(&full.amount);
+
+ if confirmations < 0 {
+ // Handle conflicting tx
+ let nb_row = db.execute(
+ "UPDATE tx_out SET status=$1, txid=NULL where txid=$2",
+ &[&(TxStatus::Delayed as i16), &id.as_ref()],
+ )?;
+ if nb_row > 0 {
+ warn!(
+ ">> (conflict) {} in {} to {}",
+ base32(&wtid),
+ id,
+ credit_addr
+ );
+ }
+ } else {
+ // Get previous out tx
+ let row = db.query_opt(
+ "SELECT id, status FROM tx_out WHERE wtid=$1 FOR UPDATE",
+ &[&wtid.as_ref()],
+ )?;
+ if let Some(row) = row {
+ // If already in database sync status
+ let _id: i32 = row.get(0);
+ let status: i16 = row.get(1);
+ match TxStatus::try_from(status as u8).unwrap() {
+ TxStatus::Requested | TxStatus::Delayed => {
+ let nb_row = db.execute(
+ "UPDATE tx_out SET status=$1 WHERE id=$2 AND status=$3",
+ &[&(TxStatus::Sent as i16), &_id, &status],
+ )?;
+ if nb_row > 0 {
+ warn!(
+ ">> (recovered) {} {} in {} to {}",
+ amount,
+ base32(&wtid),
+ id,
+ credit_addr
+ );
+ }
+ }
+ TxStatus::Sent => { /* Status is correct */ }
+ }
+ } else {
+ // Else add to database
+ let debit_addr = sender_address(rpc, &full)?;
+ let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time);
+ let nb = db.execute(
+ "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING",
+ &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref(), &config.base_url.as_ref(), &(TxStatus::Sent as i16), &id.as_ref(), &None::<&[u8]>],
+ )?;
+ if nb > 0 {
+ warn!(
+ ">> (onchain) {} {} in {} to {}",
+ amount,
+ base32(&wtid),
+ id,
+ credit_addr
+ );
+ }
+ }
+ }
+ }
+ Info::Bounce { bounced } => {
+ if confirmations < 0 {
+ // Handle conflicting tx
+ let nb_row = db.execute(
+ "UPDATE bounce SET status=$1, txid=NULL where txid=$2",
+ &[&(BounceStatus::Delayed as i16), &id.as_ref()],
+ )?;
+ if nb_row > 0 {
+ warn!("|| (conflict) {} in {}", &bounced, &id);
+ }
+ } else {
+ // Get previous bounce
+ let row = db.query_opt(
+ "SELECT id, status FROM bounce WHERE bounced=$1",
+ &[&bounced.as_ref()],
+ )?;
+ if let Some(row) = row {
+ // If already in database sync status
+ let _id: i32 = row.get(0);
+ let status: i16 = row.get(1);
+ match BounceStatus::try_from(status as u8).unwrap() {
+ BounceStatus::Requested | BounceStatus::Delayed => {
+ let nb_row = db.execute(
+ "UPDATE bounce SET status=$1 WHERE id=$2 AND status=$3",
+ &[&(BounceStatus::Sent as i16), &_id, &status],
+ )?;
+ if nb_row > 0 {
+ warn!("|| (recovered) {} in {}", &bounced, &id);
+ }
+ }
+ BounceStatus::Ignored => error!(
+ "watcher: ignored bounce {} found in chain at {}",
+ bounced, id
+ ),
+ BounceStatus::Sent => { /* Status is correct */ }
+ }
+ } else {
+ // Else add to database
+ let nb = db.execute(
+ "INSERT INTO bounce (bounced, txid, status) VALUES ($1, $2, $3) ON CONFLICT (txid) DO NOTHING",
+ &[&bounced.as_ref(), &id.as_ref(), &(BounceStatus::Sent as i16)],
+ )?;
+ if nb > 0 {
+ warn!("|| (onchain) {} in {}", &bounced, &id);
+ }
+ }
+ }
+ }
+ },
+ Ok((_, Err(e))) => warn!("send: decode-info {} - {}", id, e),
+ Err(e) => match e {
+ GetOpReturnErr::MissingOpReturn => { /* Ignore */ }
+ GetOpReturnErr::RPC(e) => return Err(e)?,
+ },
+ }
+ Ok(())
+}
+
+/// Sync database with na incoming confirmed transaction
+fn sync_chain_incoming_confirmed(
+ id: &Txid,
+ rpc: &mut BtcRpc,
+ db: &mut Client,
+) -> Result<(), Box<dyn std::error::Error>> {
+ match rpc.get_tx_segwit_key(id) {
+ Ok((full, reserve_pub)) => {
+ // Store transactions in database
+ let debit_addr = sender_address(rpc, &full)?;
+ let credit_addr = full.details[0].address.as_ref().unwrap();
+ let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time);
+ let amount = btc_to_taler(&full.amount);
+ let nb = db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (reserve_pub) DO NOTHING ", &[
+ &date, &amount.to_string(), &reserve_pub.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref()
+ ])?;
+ if nb > 0 {
+ info!(
+ "<< {} {} in {} from {}",
+ amount,
+ base32(&reserve_pub),
+ id,
+ debit_addr
+ );
+ }
+ }
+ Err(err) => match err {
+ GetSegwitErr::Decode(_) => {
+ // If encoding is wrong request a bounce
+ db.execute(
+ "INSERT INTO bounce (bounced) VALUES ($1) ON CONFLICT (bounced) DO NOTHING",
+ &[&id.as_ref()],
+ )?;
+ }
+ GetSegwitErr::RPC(e) => return Err(e.into()),
+ },
+ }
+ Ok(())
+}