use bitcoin::{hashes::Hash, Address, Amount as BtcAmount, BlockHash, SignedAmount, Txid}; use btc_wire::{ config::BitcoinConfig, rpc::{self, BtcRpc, Category, ErrorCode}, rpc_utils::{default_data_dir, sender_address}, segwit::DecodeSegWitErr, GetOpReturnErr, GetSegwitErr, }; use info::decode_info; use postgres::{fallible_iterator::FallibleIterator, Client}; use rand::{rngs::OsRng, RngCore}; use reconnect::{AutoReconnectRPC, AutoReconnectSql}; use std::{ collections::{HashMap, HashSet}, path::PathBuf, process::exit, str::FromStr, time::{Duration, SystemTime}, }; use taler_api::api_common::{base32, Amount}; use taler_config::Config; use taler_log::log::{error, info, warn}; use url::Url; use crate::{ fail_point::fail_point, info::{encode_info, Info}, status::{BounceStatus, TxStatus}, }; mod fail_point; mod info; mod reconnect; mod status; fn btc_payto_url(addr: &Address) -> Url { Url::from_str(&format!("payto://bitcoin/{}", addr)).unwrap() } fn btc_payto_addr(url: &Url) -> Result { if url.domain() != Some("bitcoin") { return Err("".to_string()); } let str = url.path().trim_start_matches('/'); return Address::from_str(str).map_err(|_| "".to_string()); } fn btc_amount_to_taler_amount(amount: &SignedAmount) -> Amount { let unsigned = amount.abs().to_unsigned().unwrap(); let sat = unsigned.as_sat(); return Amount::new("BTC", sat / 100_000_000, (sat % 100_000_000) as u32); } fn taler_amount_to_btc_amount(amount: &Amount) -> Result { if amount.currency != "BTC" { return Err("Wrong currency".to_string()); } let sat = amount.value * 100_000_000 + amount.fraction as u64; return Ok(BtcAmount::from_sat(sat)); } fn last_hash(db: &mut Client) -> Result, postgres::Error> { Ok(db .query_opt("SELECT value FROM state WHERE name='last_hash'", &[])? .map(|r| BlockHash::from_slice(r.get(0)).unwrap())) } /// Listen for new proposed transactions and announce them on the bitcoin network fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, config: &Config) { // Send a transaction on the blockchain, return true if more transactions with the same status remains fn send( db: &mut Client, rpc: &mut BtcRpc, status: TxStatus, ) -> Result> { assert!(status == TxStatus::Delayed || status == TxStatus::Requested); let mut tx = db.transaction()?; // We lock the row with FOR UPDATE to prevent sending same transaction multiple time let row = tx.query_opt( "SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 LIMIT 1 FOR UPDATE", &[&(status as i16)], )?; if let Some(row) = &row { let id: i32 = row.get(0); let amount = taler_amount_to_btc_amount(&Amount::from_str(row.get(1))?)?; let wtid: &[u8] = row.get(2); let addr: Address = btc_payto_addr(&Url::parse(row.get(3))?)?; let exchange_base_url: Url = Url::parse(row.get(4))?; let info = Info::Transaction { wtid: wtid.try_into()?, url: exchange_base_url, }; let metadata = encode_info(&info); fail_point("Skip send_op_return", 0.2)?; match rpc.send_op_return(&addr, &amount, &metadata, false) { Ok(tx_id) => { fail_point("Fail update db", 0.2)?; tx.execute( "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3", &[&(TxStatus::Sent as i16), &tx_id.as_ref(), &id], )?; let amount = btc_amount_to_taler_amount(&amount.to_signed().unwrap()); info!(">> {} {} in {} to {}", amount, base32(&wtid), tx_id, addr); } Err(e) => { info!("sender: RPC - {}", e); tx.execute( "UPDATE tx_out SET status=$1 WHERE id=$2", &[&(TxStatus::Delayed as i16), &id], )?; } } tx.commit()?; } Ok(row.is_some()) } // Bounce a transaction on the blockchain, return true if more bounce with the same status remains fn bounce( db: &mut Client, rpc: &mut BtcRpc, status: BounceStatus, fee: &BtcAmount, ) -> Result> { assert!(status == BounceStatus::Delayed || status == BounceStatus::Requested); let mut tx = db.transaction()?; // We lock the row with FOR UPDATE to prevent sending same transaction multiple time let row = tx.query_opt( "SELECT id, bounced FROM bounce WHERE status=$1 LIMIT 1 FOR UPDATE", &[&(status as i16)], )?; if let Some(row) = &row { let id: i32 = row.get(0); let bounced: Txid = Txid::from_slice(row.get(1))?; let info = Info::Bounce { bounced }; let metadata = encode_info(&info); fail_point("Skip send_op_return", 0.2)?; match rpc.bounce(&bounced, &fee, &metadata) { Ok(it) => { info!("|| {} in {}", &bounced, &it); tx.execute( "UPDATE bounce SET txid = $1, status = $2 WHERE id = $3", &[&it.as_ref(), &(BounceStatus::Sent as i16), &id], )?; } Err(err) => match err { rpc::Error::RPC { code: ErrorCode::RpcWalletInsufficientFunds | ErrorCode::RpcWalletError, msg, } => { info!("|| (ignore) {} because {}", &bounced, msg); tx.execute( "UPDATE bounce SET status = $1 WHERE id = $2", &[&(BounceStatus::Ignored as i16), &id], )?; } _ => Err(err)?, }, } tx.commit()?; } Ok(row.is_some()) } // TODO check if transactions are abandoned // Alway start with a sync work let mut skip_notification = true; loop { let rpc = rpc.client(); let db = db.client(); let result: Result<(), Box> = (|| { // Listen to all channels db.batch_execute("LISTEN new_block; LISTEN new_tx")?; // Wait for the next notification { let mut ntf = db.notifications(); if !skip_notification && ntf.is_empty() { // Block until next notification ntf.blocking_iter().next()?; } // Conflate all notifications let mut iter = ntf.iter(); while let Some(_) = iter.next()? {} } // Sync chain sync_chain(rpc, db, config)?; // As we are now in sync with the blockchain if a transaction is in requested or delayed state it have not been sent // Send delayed transactions while send(db, rpc, TxStatus::Delayed)? {} // Send requested transactions while send(db, rpc, TxStatus::Requested)? {} let bounce_fee = BtcAmount::from_sat(config.bounce_fee); // Send delayed bounce while bounce(db, rpc, BounceStatus::Delayed, &bounce_fee)? {} // Send requested bounce while bounce(db, rpc, BounceStatus::Requested, &bounce_fee)? {} Ok(()) })(); if let Err(e) = result { error!("worker: DB - {}", e); // On failure retry without waiting for notifications skip_notification = true; } else { skip_notification = false; } } } /// Parse new transactions, if exit whiteout failing the database is up to date with the latest mined block fn sync_chain( rpc: &mut BtcRpc, db: &mut Client, config: &Config, ) -> Result<(), Box> { // Get stored last_hash let last_hash = last_hash(db)?; let min_confirmations = config.confirmation; // Get a set of transactions ids to parse let (txs, removed, lastblock): (HashMap, HashSet, BlockHash) = { // Get all transactions made since this block let list = rpc.list_since_block(last_hash.as_ref(), min_confirmations, true)?; // Only keep ids and category let txs = list .transactions .into_iter() .map(|tx| (tx.txid, (tx.category, tx.confirmations))) .collect(); let removed = list .removed .into_iter() .filter_map(|tx| (tx.category == Category::Receive).then(|| tx.txid)) .collect(); (txs, removed, list.lastblock) }; // Check if a confirmed incoming transaction have been removed by a blockchain reorganisation if !removed.is_empty() { for id in removed { if let Ok((full, key)) = rpc.get_tx_segwit_key(&id) { // If the removed tx is not in confirmed the txs list and the tx is stored in the database hard error if txs .get(&id) .map(|(_, confirmations)| *confirmations < min_confirmations as i32) .unwrap_or(true) && db .query_opt("SELECT 1 FROM tx_in WHERE reserve_pub=$1", &[&key.as_ref()])? .is_some() { let credit_addr = full.details[0].address.as_ref().unwrap(); error!("Received transaction {} in {} from {} have been removed from the blockchain, bitcoin backing is compromised until the transaction reappear", base32(&key), id, credit_addr); exit(1); } } } } for (id, (category, confirmations)) in txs { match category { Category::Send => { match rpc.get_tx_op_return(&id) { Ok((full, bytes)) => { let mut tx = db.transaction()?; match decode_info(&bytes) { Ok(info) => { match info { Info::Transaction { wtid, .. } => { let addr = full.details[0].address.as_ref().unwrap(); let amount = btc_amount_to_taler_amount(&full.amount); let row = tx.query_opt( "SELECT id, status FROM tx_out WHERE wtid=$1 FOR UPDATE", &[&wtid.as_ref()], )?; if let Some(row) = row { let _id: i32 = row.get(0); let status: i16 = row.get(1); match TxStatus::try_from(status as u8).unwrap() { TxStatus::Requested | TxStatus::Delayed => { tx.execute( "UPDATE tx_out SET status=$1 where id=$2", &[&(TxStatus::Sent as i16), &_id], )?; warn!( ">> (recovered) {} {} in {} to {}", amount, base32(&wtid), id, addr ); } TxStatus::Sent => {} } } else { let debit_addr = sender_address(rpc, &full)?; let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time); // Generate a random request_uid let mut request_uid = [0; 64]; OsRng.fill_bytes(&mut request_uid); let nb = tx.execute( "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING", &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(addr).as_ref(), &config.base_url.as_ref(), &(TxStatus::Sent as i16), &id.as_ref(), &request_uid.as_ref()], )?; if nb > 0 { warn!( ">> (onchain) {} {} in {} to {}", amount, base32(&wtid), id, addr ); } } } Info::Bounce { bounced } => { let row = tx.query_opt( "SELECT id, status FROM bounce WHERE bounced=$1 FOR UPDATE", &[&bounced.as_ref()], )?; if let Some(row) = row { let _id: i32 = row.get(0); let status: i16 = row.get(1); match BounceStatus::try_from(status as u8).unwrap() { BounceStatus::Requested | BounceStatus::Delayed => { tx.execute( "UPDATE bounce SET status=$1 where id=$2", &[&(BounceStatus::Sent as i16), &_id], )?; warn!("|| (recovered) {} in {}", &bounced, &id); } BounceStatus::Ignored => error!("watcher: ignored bounce {} found in chain at {}", bounced, id), BounceStatus::Sent => {} } } else { let nb = tx.execute( "INSERT INTO bounce (bounced, txid, status) VALUES ($1, $2, $3) ON CONFLICT (txid) DO NOTHING", &[&bounced.as_ref(), &id.as_ref(), &(BounceStatus::Sent as i16)], )?; if nb > 0 { warn!("|| (onchain) {} in {}", &bounced, &id); } } } } } Err(err) => warn!("send: decode-info {} - {}", id, err), } tx.commit()?; } Err(err) => match err { GetOpReturnErr::MissingOpReturn => {} // ignore err => warn!("send: {} {}", id, err), }, } } Category::Receive if confirmations >= min_confirmations as i32 => { match rpc.get_tx_segwit_key(&id) { Ok((full, reserve_pub)) => { let debit_addr = sender_address(rpc, &full)?; let credit_addr = full.details[0].address.as_ref().unwrap(); let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time); let amount = btc_amount_to_taler_amount(&full.amount); let nb = db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (reserve_pub) DO NOTHING ", &[ &date, &amount.to_string(), &reserve_pub.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref() ])?; if nb > 0 { info!( "<< {} {} in {} from {}", amount, base32(&reserve_pub), id, debit_addr ); } } Err(err) => match err { GetSegwitErr::Decode( DecodeSegWitErr::MissingSegWitAddress | DecodeSegWitErr::NoMagicIdMatch, ) => { // Request a bounce db.execute("INSERT INTO bounce (bounced) VALUES ($1) ON CONFLICT (bounced) DO NOTHING", &[&id.as_ref()])?; } err => warn!("receive: {} {}", id, err), }, } } _ => { // Ignore coinbase and unconfirmed send transactions } } } // Move last_hash forward if no error have been caught { let mut tx = db.transaction()?; let curr_hash: Option = tx .query_opt( "SELECT value FROM state WHERE name='last_hash' FOR UPDATE", &[], )? .map(|r| BlockHash::from_slice(r.get(0)).unwrap()); if last_hash != curr_hash { error!("watcher: hash state collision, database have been altered by another process"); } if curr_hash.is_some() { tx.execute( "UPDATE state SET value=$1 WHERE name='last_hash'", &[&lastblock.as_ref()], )?; } else { tx.execute( "INSERT INTO state (name, value) VALUES ('last_hash', $1)", &[&lastblock.as_ref()], )?; }; tx.commit()?; } Ok(()) } fn block_listener(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql) { loop { let result: Result<(), Box> = (|| { let rpc = rpc.client(); let db = db.client(); rpc.wait_for_new_block(0).ok(); db.execute("NOTIFY new_block", &[])?; Ok(()) })(); if let Err(e) = result { error!("listener: DB - {}", e); } } } fn main() { taler_log::init(); #[cfg(feature = "fail")] taler_log::log::warn!("Running with random failures is unsuitable for production"); // Guess network by trying to connect to a JSON RPC server let data_dir = std::env::args() .nth(1) .map(|str| PathBuf::from_str(&str).unwrap()) .unwrap_or_else(default_data_dir); let config = taler_config::Config::from_path("test.conf"); let config: &'static Config = Box::leak(Box::new(config)); let btc_config = BitcoinConfig::load(&data_dir).unwrap(); let mut rpc = BtcRpc::common(&btc_config).unwrap(); rpc.load_wallet(&config.btc_wallet).ok(); let rpc_listener = AutoReconnectRPC::new( btc_config.clone(), &config.btc_wallet, Duration::from_secs(5), ); let rpc_worker = AutoReconnectRPC::new(btc_config, &config.btc_wallet, Duration::from_secs(5)); let db_listener = AutoReconnectSql::new(&config.db_url, Duration::from_secs(5)); let db_worker = AutoReconnectSql::new(&config.db_url, Duration::from_secs(5)); std::thread::spawn(move || block_listener(rpc_listener, db_listener)); worker(rpc_worker, db_worker, config); }