use bitcoincore_rpc::{ bitcoin::{hashes::Hash, Address, Amount as BtcAmount, BlockHash, Txid}, json::GetTransactionResultDetailCategory as Category, Client as RPC, RpcApi, }; use btc_wire::{ rpc_utils::{ common_rpc, default_data_dir, dirty_guess_network, sender_address, wallet_rpc, WIRE, }, segwit::DecodeSegWitErr, ClientExtended, GetOpReturnErr, GetSegwitErr, }; use configparser::ini::Ini; use postgres::{fallible_iterator::FallibleIterator, Client, IsolationLevel, NoTls, Transaction}; use rand::{rngs::OsRng, RngCore}; use std::{ collections::HashMap, path::{Path, PathBuf}, process::exit, str::FromStr, time::{Duration, SystemTime}, }; use taler_log::log::{error, info, warn}; use url::Url; use wire_gateway::api_common::{crockford_base32_encode, Amount}; #[repr(u8)] #[derive(Debug, Clone, Copy, PartialEq, Eq)] enum Status { /// Client have ask for a transaction Proposed = 0, /// Transaction have been announced to the bitcoin network Pending = 1, /// Transaction have been mined Confirmed = 2, /// The wire cannot failed to send this transaction and will try latter Delayed = 3, } impl TryFrom for Status { type Error = (); fn try_from(v: u8) -> Result { match v { x if x == Status::Proposed as u8 => Ok(Status::Proposed), x if x == Status::Pending as u8 => Ok(Status::Pending), x if x == Status::Confirmed as u8 => Ok(Status::Confirmed), x if x == Status::Delayed as u8 => Ok(Status::Delayed), _ => Err(()), } } } fn btc_payto_url(addr: &Address) -> Url { Url::from_str(&format!("payto://bitcoin/{}", addr.to_string())).unwrap() } fn btc_payto_addr(url: &Url) -> Result { if url.domain() != Some("bitcoin") { return Err("".to_string()); } let str = url.path().trim_start_matches('/'); return Ok(Address::from_str(str).map_err(|_| "".to_string())?); } fn btc_amount_to_taler_amount(amount: &BtcAmount) -> Amount { let sat = amount.as_sat(); return Amount::new("BTC", sat / 100_000_000, (sat % 100_000_000) as u32); } fn taler_amount_to_btc_amount(amount: &Amount) -> Result { if amount.currency != "BTC" { return Err("Wrong currency".to_string()); } let sat = amount.value * 100_000_000 + amount.fraction as u64; return Ok(BtcAmount::from_sat(sat)); } fn encode_info(wtid: &[u8; 32], url: &Url) -> Vec { let mut buffer = Vec::new(); buffer.extend_from_slice(wtid); let parts = format!("{}{}", url.domain().unwrap_or(""), url.path()); let packed = uri_pack::pack_uri(&parts).unwrap(); buffer.push((url.scheme() == "http:") as u8); buffer.extend_from_slice(&packed); return buffer; } fn decode_info(bytes: &[u8]) -> ([u8; 32], Url) { let mut packed = uri_pack::unpack_uri(&bytes[33..]).unwrap(); packed.insert_str(0, "://"); if bytes[32] != 0 { packed.insert(0, 's'); } packed.insert_str(0, "http"); let url = Url::parse(&packed).unwrap(); return (bytes[..32].try_into().unwrap(), url); } #[cfg(test)] mod test { use btc_wire::test::rand_key; use url::Url; use crate::{decode_info, encode_info}; #[test] fn decode_encode_info() { let key = rand_key(); let urls = [ "https://git.taler.net/", "https://git.taler.net/depolymerization.git/", ]; for url in urls { let url = Url::parse(url).unwrap(); let encode = encode_info(&key, &url); let decode = decode_info(&encode); assert_eq!(key, decode.0); assert_eq!(url, decode.1); } } } /// Listen for new proposed transactions and announce them on the bitcoin network fn sender(rpc: RPC, mut db: AutoReloadDb) { fn get_proposed( db: &mut Transaction, ) -> Result)>, Box> { let mut iter = db.query_raw( "SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 OR status=$2", &[&(Status::Proposed as i16), &(Status::Delayed as i16)], )?; let mut out = Vec::new(); while let Some(row) = iter.next()? { let id: i32 = row.get(0); let amount: Amount = Amount::from_str(row.get(1))?; let reserve_pub: &[u8] = row.get(2); let credit_addr: Address = btc_payto_addr(&Url::parse(row.get(3))?)?; let exchange_base_url: Url = Url::parse(row.get(4))?; let metadata = encode_info(reserve_pub.try_into()?, &exchange_base_url); out.push(( id, taler_amount_to_btc_amount(&amount)?, credit_addr, metadata, )); } return Ok(out); } // TODO check if transactions are abandoned loop { let db = db.client(); let result: Result<(), Box> = (|| { // We should be the only one to interact with the database but we enforce it let mut tx = db .build_transaction() .isolation_level(IsolationLevel::Serializable) .start()?; for (id, amount, addr, metadata) in get_proposed(&mut tx)? { tx.execute( "UPDATE tx_out SET status=$1 WHERE id=$2", &[&(Status::Delayed as i16), &id], )?; match rpc.send_op_return(&addr, amount, &metadata) { Ok(txid) => { tx.execute( "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3", &[&(Status::Pending as i16), &txid.as_ref(), &id], )?; info!("{} PENDING", txid); } Err(e) => info!("sender: RPC - {}", e), } } tx.commit()?; Ok(()) })(); if let Err(e) = result { error!("sender: DB - {}", e); } std::thread::sleep(Duration::from_millis(rand::random::() as u64)); } } struct AutoReloadDb { delay: Duration, config: String, client: Client, } impl AutoReloadDb { pub fn new(config: impl Into, delay: Duration) -> Self { let config: String = config.into(); Self { client: Self::connect(&config, delay), config, delay, } } /// Connect a new client, loop on error fn connect(config: &str, delay: Duration) -> Client { loop { match Client::connect(config, NoTls) { Ok(new) => return new, Err(err) => { error!("connect: DB - {}", err); std::thread::sleep(delay); } } } } pub fn client(&mut self) -> &mut Client { if self.client.is_valid(self.delay).is_err() { self.client = Self::connect(&self.config, self.delay); } &mut self.client } } /// Listen for mined block and index confirmed transactions into the database fn watcher(rpc: RPC, mut db: AutoReloadDb, config: &Config) { let confirmation = 1; loop { let db = db.client(); let result: Result<(), Box> = (|| { // We should be the only one to interact with the database but we enforce it let mut tx = db .build_transaction() .isolation_level(IsolationLevel::Serializable) .start()?; let last_hash: Option = tx .query_opt("SELECT value FROM state WHERE name='last_hash'", &[])? .map(|r| BlockHash::from_slice(r.get(0)).unwrap()); let list = rpc.list_since_block(last_hash.as_ref(), Some(confirmation), None, Some(true))?; // List all confirmed send and receive transactions since last check let txs: HashMap = list .transactions .into_iter() .filter_map(|tx| { let cat = tx.detail.category; (tx.info.confirmations >= confirmation as i32 && (cat == Category::Send || cat == Category::Receive)) .then(|| (tx.info.txid, cat)) }) .collect(); for (id, category) in txs { match category { Category::Send => match rpc.get_tx_op_return(&id) { Ok((full, bytes)) => { let (wtid, url) = decode_info(&bytes); let row = tx.query_opt( "SELECT status, wtid, id FROM tx_out WHERE txid=$1", &[&id.as_ref()], )?; if let Some(row) = row { let status: i16 = row.get(0); let _wtid: &[u8] = row.get(1); let _id: i32 = row.get(2); if &wtid != _wtid { warn!("watcher: state tx {} have uncompatible wtid in DB {} and on chain {}", id, crockford_base32_encode(&wtid), crockford_base32_encode(&_wtid)); exit(1); } let status: Status = Status::try_from(status as u8).unwrap(); if status != Status::Confirmed { tx.execute( "UPDATE tx_out SET status=$1 where id=$2", &[&(Status::Confirmed as i16), &_id], )?; if status == Status::Proposed { warn!("watcher: tx {} is present on chain at {} while being in proposed status", _id, id); } else { info!("{} CONFIRMED", &id); } } } else { let debit_addr = sender_address(&rpc, &full)?; let credit_addr = full.tx.details[0].address.as_ref().unwrap(); let time = full.tx.info.blocktime.unwrap(); let date = SystemTime::UNIX_EPOCH + Duration::from_secs(time); let amount = btc_amount_to_taler_amount( &full.tx.amount.abs().to_unsigned()?, ); // Generate a random request_uid let mut request_uid = [0; 64]; OsRng.fill_bytes(&mut request_uid); tx.execute( "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)", &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).to_string(), &btc_payto_url(&credit_addr).to_string(), &config.base_url.to_string(), &(Status::Confirmed as i16), &id.as_ref(), &request_uid.as_ref() ], )?; warn!("watcher: found an unregistered outgoing address {} {} in tx {}", crockford_base32_encode(&wtid), &url, id); } } Err(err) => match err { GetOpReturnErr::MissingOpReturn => {} // ignore err => warn!("send: {} {}", id, err), }, }, Category::Receive => match rpc.get_tx_segwit_key(&id) { Ok((full, reserve_pub)) => { let debit_addr = sender_address(&rpc, &full)?; let credit_addr = full.tx.details[0].address.as_ref().unwrap(); let time = full.tx.info.blocktime.unwrap(); let date = SystemTime::UNIX_EPOCH + Duration::from_secs(time); let amount = btc_amount_to_taler_amount(&full.tx.amount.to_unsigned()?); tx.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5)", &[ &date, &amount.to_string(), &reserve_pub.as_ref(), &btc_payto_url(&debit_addr).to_string(), &btc_payto_url(&credit_addr).to_string() ])?; info!("{} << {} {}", &debit_addr, &credit_addr, &amount); } Err(err) => match err { GetSegwitErr::Decode( DecodeSegWitErr::MissingSegWitAddress | DecodeSegWitErr::NoMagicIdMatch, ) => {} err => warn!("receive: {} {}", id, err), }, }, Category::Generate | Category::Immature | Category::Orphan => {} } } let query = if let Some(_) = last_hash { "UPDATE state SET value=$1 WHERE name='last_hash'" } else { "INSERT INTO state (name, value) VALUES ('last_hash', $1)" }; tx.execute(query, &[&list.lastblock.as_ref()])?; tx.commit()?; Ok(()) })(); if let Err(e) = result { error!("watcher: DB - {}", e); } info!("watcher: Wait for block"); rpc.wait_for_new_block(0).ok(); } } #[derive(Debug, Clone)] struct Config { base_url: Url, db_url: String, port: u16, payto: Url, address: String, } impl Config { pub fn from_path(path: impl AsRef) -> Self { let string = std::fs::read_to_string(path).unwrap(); let mut conf = Ini::new(); conf.read(string).unwrap(); Self { base_url: Url::parse(&conf.get("main", "BASE_URL").unwrap()).unwrap(), db_url: conf.get("main", "DB_URL").unwrap(), port: conf.get("main", "PORT").unwrap().parse().unwrap(), payto: Url::parse(&conf.get("main", "PAYTO").unwrap()).unwrap(), address: conf.get("main", "ADDRESS").unwrap(), } } } fn main() { taler_log::init(); // Guess network by trying to connect to a JSON RPC server let data_dir = std::env::args() .skip(1) .next() .map(|str| PathBuf::from_str(&str).unwrap()) .unwrap_or(default_data_dir()); let config = Config::from_path("test.conf"); let network = dirty_guess_network(&data_dir); let rpc = common_rpc(&data_dir, network).unwrap(); rpc.load_wallet(&WIRE).ok(); let rpc_watcher = wallet_rpc(&data_dir, network, "wire"); let rpc_sender = wallet_rpc(&data_dir, network, "wire"); let db_watcher = AutoReloadDb::new(&config.db_url, Duration::from_secs(5)); let db_sender = AutoReloadDb::new(&config.db_url, Duration::from_secs(5)); let join = std::thread::spawn(move || sender(rpc_sender, db_sender)); watcher(rpc_watcher, db_watcher, &config); join.join().unwrap(); }