use bitcoin::{hashes::Hash, Address, Amount as BtcAmount, BlockHash, Network, Txid}; use btc_wire::{ config::{BitcoinConfig, WIRE_WALLET_NAME}, rpc::{self, BtcRpc, Category, ErrorCode}, rpc_utils::{default_data_dir, sender_address}, GetOpReturnErr, GetSegwitErr, }; use info::decode_info; use postgres::{fallible_iterator::FallibleIterator, Client}; use reconnect::{AutoReconnectRPC, AutoReconnectSql}; use std::{ collections::{HashMap, HashSet}, fmt::Write, str::FromStr, time::{Duration, SystemTime}, }; use taler_common::{ api_common::{base32, Amount}, config::Config, log::log::{error, info, warn}, }; use taler_util::btc_payto_url; use url::Url; use crate::{ fail_point::fail_point, info::{encode_info, Info}, status::{BounceStatus, TxStatus}, taler_util::{btc_payto_addr, btc_to_taler, taler_to_btc}, }; mod fail_point; mod info; mod reconnect; mod status; mod taler_util; /// Retrieve last stored hash fn last_hash(db: &mut Client) -> Result, postgres::Error> { Ok(db .query_opt("SELECT value FROM state WHERE name='last_hash'", &[])? .map(|r| BlockHash::from_slice(r.get(0)).unwrap())) } /// Listen for new proposed transactions and announce them on the bitcoin network fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, config: &Config) { /// Send a transaction on the blockchain, return true if more transactions with the same status remains fn send( db: &mut Client, rpc: &mut BtcRpc, status: TxStatus, ) -> Result> { assert!(status == TxStatus::Delayed || status == TxStatus::Requested); let mut tx = db.transaction()?; // We lock the row with FOR UPDATE to prevent sending same transaction multiple time let row = tx.query_opt( "SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 LIMIT 1 FOR UPDATE", &[&(status as i16)], )?; if let Some(row) = &row { let id: i32 = row.get(0); let amount = taler_to_btc(&Amount::from_str(row.get(1))?)?; let wtid: &[u8] = row.get(2); let addr: Address = btc_payto_addr(&Url::parse(row.get(3))?)?; let exchange_base_url: Url = Url::parse(row.get(4))?; let info = Info::Transaction { wtid: wtid.try_into()?, url: exchange_base_url, }; let metadata = encode_info(&info); fail_point("(injected) fail send_op_return", 0.2)?; match rpc.send_op_return(&addr, &amount, &metadata, false) { Ok(tx_id) => { fail_point("(injected) fail update db", 0.2)?; tx.execute( "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3", &[&(TxStatus::Sent as i16), &tx_id.as_ref(), &id], )?; tx.commit()?; let amount = btc_to_taler(&amount.to_signed().unwrap()); info!(">> {} {} in {} to {}", amount, base32(wtid), tx_id, addr); } Err(e) => { tx.execute( "UPDATE tx_out SET status=$1 WHERE id=$2", &[&(TxStatus::Delayed as i16), &id], )?; tx.commit()?; info!("sender: RPC - {}", e); } } } Ok(row.is_some()) } /// Bounce a transaction on the blockchain, return true if more bounce with the same status remains fn bounce( db: &mut Client, rpc: &mut BtcRpc, status: BounceStatus, fee: &BtcAmount, ) -> Result> { assert!(status == BounceStatus::Delayed || status == BounceStatus::Requested); let mut tx = db.transaction()?; // We lock the row with FOR UPDATE to prevent sending same transaction multiple time let row = tx.query_opt( "SELECT id, bounced FROM bounce WHERE status=$1 LIMIT 1 FOR UPDATE", &[&(status as i16)], )?; if let Some(row) = &row { let id: i32 = row.get(0); let bounced: Txid = Txid::from_slice(row.get(1))?; let info = Info::Bounce { bounced }; let metadata = encode_info(&info); fail_point("(injected) fail bounce", 0.2)?; match rpc.bounce(&bounced, fee, &metadata) { Ok(it) => { tx.execute( "UPDATE bounce SET txid = $1, status = $2 WHERE id = $3", &[&it.as_ref(), &(BounceStatus::Sent as i16), &id], )?; tx.commit()?; info!("|| {} in {}", &bounced, &it); } Err(err) => match err { rpc::Error::RPC { code: ErrorCode::RpcWalletInsufficientFunds | ErrorCode::RpcWalletError, msg, } => { tx.execute( "UPDATE bounce SET status = $1 WHERE id = $2", &[&(BounceStatus::Ignored as i16), &id], )?; tx.commit()?; info!("|| (ignore) {} because {}", &bounced, msg); } _ => Err(err)?, }, } } Ok(row.is_some()) } // TODO check if transactions are abandoned let mut lifetime = config.btc_lifetime; let mut status = true; // Alway start with a sync work let mut skip_notification = true; loop { // Check lifetime if let Some(nb) = lifetime.as_mut() { if *nb == 0 { info!("Reach end of lifetime"); return; } else { *nb -= 1; } } // Connect let rpc = rpc.client(); let db = db.client(); let result: Result<(), Box> = (|| { // Listen to all channels db.batch_execute("LISTEN new_block; LISTEN new_tx")?; // Wait for the next notification { let mut ntf = db.notifications(); if !skip_notification && ntf.is_empty() { // Block until next notification ntf.blocking_iter().next()?; } // Conflate all notifications let mut iter = ntf.iter(); while iter.next()?.is_some() {} } // Sync chain sync_chain(rpc, db, config, &mut status)?; // As we are now in sync with the blockchain if a transaction is in requested or delayed state it have not been sent // Send delayed transactions while send(db, rpc, TxStatus::Delayed)? {} // Send requested transactions while send(db, rpc, TxStatus::Requested)? {} let bounce_fee = BtcAmount::from_sat(config.bounce_fee); // Send delayed bounce while bounce(db, rpc, BounceStatus::Delayed, &bounce_fee)? {} // Send requested bounce while bounce(db, rpc, BounceStatus::Requested, &bounce_fee)? {} Ok(()) })(); if let Err(e) = result { error!("worker: {}", e); // On failure retry without waiting for notifications skip_notification = true; } else { skip_notification = false; } } } /// Parse new transactions, return true if the database is up to date with the latest mined block fn sync_chain( rpc: &mut BtcRpc, db: &mut Client, config: &Config, status: &mut bool, ) -> Result> { // Get stored last_hash let last_hash = last_hash(db)?; let min_confirmations = config.confirmation; // Get a set of transactions ids to parse let (txs, removed, lastblock): (HashMap, HashSet, BlockHash) = { // Get all transactions made since this block let list = rpc.list_since_block(last_hash.as_ref(), min_confirmations, true)?; // Only keep ids and category let txs = list .transactions .into_iter() .map(|tx| (tx.txid, (tx.category, tx.confirmations))) .collect(); let removed = list .removed .into_iter() .filter_map(|tx| (tx.category == Category::Receive).then(|| tx.txid)) .collect(); (txs, removed, list.lastblock) }; // Check if a confirmed incoming transaction have been removed by a blockchain reorganisation let new_status = sync_chain_removed(&txs, &removed, rpc, db, min_confirmations as i32)?; if *status != new_status { let mut tx = db.transaction()?; tx.execute( "UPDATE state SET value=$1 WHERE name='status'", &[&[new_status as u8].as_ref()], )?; tx.execute("NOTIFY status", &[])?; tx.commit()?; *status = new_status; } if !new_status { return Ok(false); } for (id, (category, confirmations)) in txs { match category { Category::Send => sync_chain_outgoing(&id, confirmations, rpc, db, config)?, Category::Receive if confirmations >= min_confirmations as i32 => { sync_chain_incoming_confirmed(&id, rpc, db)? } Category::Receive if confirmations < 0 => { panic!("receive conflict {} {}", id, confirmations) } _ => { // Ignore coinbase and unconfirmed send transactions } } } // Move last_hash forward if no error have been caught { let curr_hash: Option = db .query_opt("SELECT value FROM state WHERE name='last_hash'", &[])? .map(|r| BlockHash::from_slice(r.get(0)).unwrap()); let nb_row = if let Some(hash) = &curr_hash { db.execute( "UPDATE state SET value=$1 WHERE name='last_hash' AND value=$2", &[&lastblock.as_ref(), &hash.as_ref()], )? } else { db.execute( "INSERT INTO state (name, value) VALUES ('last_hash', $1) ON CONFLICT (name) DO NOTHING", &[&lastblock.as_ref()], )? }; if last_hash != curr_hash || nb_row == 0 { error!("watcher: hash state collision, database have been altered by another process"); } } Ok(true) } /// Sync database with removed transactions, return false if bitcoin backing is compromised fn sync_chain_removed( txs: &HashMap, removed: &HashSet, rpc: &mut BtcRpc, db: &mut Client, min_confirmations: i32, ) -> Result> { // Removed transactions are correctness issue in only two cases: // - An incoming valid transaction considered confirmed in the database // - An incoming invalid transactions already bounced // Those two cases can compromise bitcoin backing // Removed outgoing transactions will be retried automatically by the node let mut blocking_receive = Vec::new(); let mut blocking_bounce = Vec::new(); for id in removed { match rpc.get_tx_segwit_key(id) { Ok((full, key)) => { // Valid tx are only problematic if not confirmed in the txs list and stored stored in the database if txs .get(id) .map(|(_, confirmations)| *confirmations < min_confirmations) .unwrap_or(true) && db .query_opt("SELECT 1 FROM tx_in WHERE reserve_pub=$1", &[&key.as_ref()])? .is_some() { let debit_addr = sender_address(rpc, &full)?; blocking_receive.push((key, id, debit_addr)); } } Err(err) => match err { GetSegwitErr::Decode(_) => { // Invalid tx are only problematic if already bounced if let Some(row) = db.query_opt( "SELECT txid FROM bounce WHERE bounced=$1 AND txid IS NOT NULL", &[&id.as_ref()], )? { let txid = Txid::from_slice(row.get(0)).unwrap(); blocking_bounce.push((txid, id)); } else { // Remove transaction from bounce table db.execute("DELETE FROM bounce WHERE bounced=$1", &[&id.as_ref()])?; } } GetSegwitErr::RPC(it) => return Err(it.into()), }, } } if !blocking_bounce.is_empty() || !blocking_receive.is_empty() { let mut buf = "The following transaction have been removed from the blockchain, bitcoin backing is compromised until the transaction reappear:".to_string(); for (key, id, addr) in blocking_receive { write!( &mut buf, "\n\treceived {} in {} from {}", base32(&key), id, addr ) .unwrap(); } for (id, bounced) in blocking_bounce { write!(&mut buf, "\n\tbounced {} in {}", id, bounced).unwrap(); } error!("{}", buf); return Ok(false); } else { return Ok(true); } } /// Sync database with an outgoing transaction fn sync_chain_outgoing( id: &Txid, confirmations: i32, rpc: &mut BtcRpc, db: &mut Client, config: &Config, ) -> Result<(), Box> { match rpc .get_tx_op_return(id) .map(|(full, bytes)| (full, decode_info(&bytes))) { Ok((full, Ok(info))) => match info { Info::Transaction { wtid, .. } => { let credit_addr = full.details[0].address.as_ref().unwrap(); let amount = btc_to_taler(&full.amount); if confirmations < 0 { // Handle conflicting tx let nb_row = db.execute( "UPDATE tx_out SET status=$1, txid=NULL where txid=$2", &[&(TxStatus::Delayed as i16), &id.as_ref()], )?; if nb_row > 0 { warn!( ">> (conflict) {} in {} to {}", base32(&wtid), id, credit_addr ); } } else { // Get previous out tx let row = db.query_opt( "SELECT id, status FROM tx_out WHERE wtid=$1 FOR UPDATE", &[&wtid.as_ref()], )?; if let Some(row) = row { // If already in database sync status let _id: i32 = row.get(0); let status: i16 = row.get(1); match TxStatus::try_from(status as u8).unwrap() { TxStatus::Requested | TxStatus::Delayed => { let nb_row = db.execute( "UPDATE tx_out SET status=$1 WHERE id=$2 AND status=$3", &[&(TxStatus::Sent as i16), &_id, &status], )?; if nb_row > 0 { warn!( ">> (recovered) {} {} in {} to {}", amount, base32(&wtid), id, credit_addr ); } } TxStatus::Sent => { /* Status is correct */ } } } else { // Else add to database let debit_addr = sender_address(rpc, &full)?; let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time); let nb = db.execute( "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING", &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref(), &config.base_url.as_ref(), &(TxStatus::Sent as i16), &id.as_ref(), &None::<&[u8]>], )?; if nb > 0 { warn!( ">> (onchain) {} {} in {} to {}", amount, base32(&wtid), id, credit_addr ); } } } } Info::Bounce { bounced } => { if confirmations < 0 { // Handle conflicting tx let nb_row = db.execute( "UPDATE bounce SET status=$1, txid=NULL where txid=$2", &[&(BounceStatus::Delayed as i16), &id.as_ref()], )?; if nb_row > 0 { warn!("|| (conflict) {} in {}", &bounced, &id); } } else { // Get previous bounce let row = db.query_opt( "SELECT id, status FROM bounce WHERE bounced=$1", &[&bounced.as_ref()], )?; if let Some(row) = row { // If already in database sync status let _id: i32 = row.get(0); let status: i16 = row.get(1); match BounceStatus::try_from(status as u8).unwrap() { BounceStatus::Requested | BounceStatus::Delayed => { let nb_row = db.execute( "UPDATE bounce SET status=$1 WHERE id=$2 AND status=$3", &[&(BounceStatus::Sent as i16), &_id, &status], )?; if nb_row > 0 { warn!("|| (recovered) {} in {}", &bounced, &id); } } BounceStatus::Ignored => error!( "watcher: ignored bounce {} found in chain at {}", bounced, id ), BounceStatus::Sent => { /* Status is correct */ } } } else { // Else add to database let nb = db.execute( "INSERT INTO bounce (bounced, txid, status) VALUES ($1, $2, $3) ON CONFLICT (txid) DO NOTHING", &[&bounced.as_ref(), &id.as_ref(), &(BounceStatus::Sent as i16)], )?; if nb > 0 { warn!("|| (onchain) {} in {}", &bounced, &id); } } } } }, Ok((_, Err(e))) => warn!("send: decode-info {} - {}", id, e), Err(e) => match e { GetOpReturnErr::MissingOpReturn => { /* Ignore */ } GetOpReturnErr::RPC(e) => return Err(e)?, }, } Ok(()) } /// Sync database with na incoming confirmed transaction fn sync_chain_incoming_confirmed( id: &Txid, rpc: &mut BtcRpc, db: &mut Client, ) -> Result<(), Box> { match rpc.get_tx_segwit_key(id) { Ok((full, reserve_pub)) => { // Store transactions in database let debit_addr = sender_address(rpc, &full)?; let credit_addr = full.details[0].address.as_ref().unwrap(); let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time); let amount = btc_to_taler(&full.amount); let nb = db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (reserve_pub) DO NOTHING ", &[ &date, &amount.to_string(), &reserve_pub.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref() ])?; if nb > 0 { info!( "<< {} {} in {} from {}", amount, base32(&reserve_pub), id, debit_addr ); } } Err(err) => match err { GetSegwitErr::Decode(_) => { // If encoding is wrong request a bounce db.execute( "INSERT INTO bounce (bounced) VALUES ($1) ON CONFLICT (bounced) DO NOTHING", &[&id.as_ref()], )?; } GetSegwitErr::RPC(e) => return Err(e.into()), }, } Ok(()) } /// Wait for new block and notify arrival with postgreSQL notifications fn block_listener(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql) { loop { let rpc = rpc.client(); let db = db.client(); let result: Result<(), Box> = (|| { rpc.wait_for_new_block(0)?; db.execute("NOTIFY new_block", &[])?; Ok(()) })(); if let Err(e) = result { error!("listener: {}", e); } } } fn main() { taler_common::log::init(); let config = taler_common::config::Config::load_from_file( std::env::args_os().nth(1).expect("Missing conf path arg"), ); let data_dir = config .btc_data_dir .as_ref() .cloned() .unwrap_or_else(default_data_dir); let config: &'static Config = Box::leak(Box::new(config)); let btc_config = BitcoinConfig::load(&data_dir).unwrap(); #[cfg(feature = "fail")] if btc_config.network == Network::Regtest { taler_common::log::log::warn!("Running with random failures"); } else { taler_common::log::log::error!("Running with random failures is unsuitable for production"); std::process::exit(1); } let chain_name = match btc_config.network { Network::Bitcoin => "main", Network::Testnet => "test", Network::Signet => "signet", Network::Regtest => "regtest", }; info!("Running on {} chain", chain_name); let mut rpc = BtcRpc::common(&btc_config).unwrap(); rpc.load_wallet(WIRE_WALLET_NAME).ok(); let rpc_listener = AutoReconnectRPC::new(btc_config.clone(), WIRE_WALLET_NAME); let rpc_worker = AutoReconnectRPC::new(btc_config, WIRE_WALLET_NAME); let db_listener = AutoReconnectSql::new(&config.db_url); let db_worker = AutoReconnectSql::new(&config.db_url); std::thread::spawn(move || block_listener(rpc_listener, db_listener)); worker(rpc_worker, db_worker, config); }