/* This file is part of TALER Copyright (C) 2022 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation; either version 3, or (at your option) any later version. TALER is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with TALER; see the file COPYING. If not, see */ use std::{ collections::HashMap, fmt::Write, time::{Duration, SystemTime}, }; use bitcoin::{hashes::Hash, Amount as BtcAmount, BlockHash, Txid}; use btc_wire::{ rpc::{self, AutoRpcWallet, Category, ErrorCode, Rpc, Transaction}, rpc_utils::sender_address, taler_utils::{btc_payto_url, btc_to_taler}, GetOpReturnErr, GetSegwitErr, }; use common::{ api_common::base32, log::{ log::{error, info, warn}, OrFail, }, metadata::OutMetadata, postgres, reconnect::AutoReconnectDb, sql::{sql_array, sql_url}, status::{BounceStatus, DebitStatus}, }; use postgres::{fallible_iterator::FallibleIterator, Client}; use crate::{ fail_point::fail_point, sql::{sql_addr, sql_btc_amount, sql_txid}, WireState, }; use super::{analysis::analysis, LoopError, LoopResult}; /// Synchronize local db with blockchain and perform transactions pub fn worker(mut rpc: AutoRpcWallet, mut db: AutoReconnectDb, mut state: WireState) { let mut lifetime = state.lifetime; let mut status = true; let mut skip_notification = false; loop { // Check lifetime if let Some(nb) = lifetime.as_mut() { if *nb == 0 { info!("Reach end of lifetime"); return; } else { *nb -= 1; } } // Connect let rpc = rpc.client(); let db = db.client(); let result: LoopResult<()> = (|| { // Listen to all channels db.batch_execute("LISTEN new_block; LISTEN new_tx")?; // Wait for the next notification { let mut ntf = db.notifications(); if !skip_notification && ntf.is_empty() { // Block until next notification ntf.blocking_iter().next()?; } // Conflate all notifications let mut iter = ntf.iter(); while iter.next()?.is_some() {} } // It is not possible to atomically update the blockchain and the database. // When we failed to sync the database and the blockchain state we rely on // sync_chain to recover the lost updates. // When this function is running concurrently, it not possible to known another // execution has failed, and this can lead to a transaction being sent multiple time. // To ensure only a single version of this function is running at a given time we rely // on postgres advisory lock // Take the lock let row = db.query_one("SELECT pg_try_advisory_lock(42)", &[])?; let locked: bool = row.get(0); if !locked { return Err(LoopError::Concurrency); } // Perform analysis state.confirmation = analysis(rpc, state.confirmation, state.max_confirmation)?; // Sync chain if let Some(stuck) = sync_chain(rpc, db, &state, &mut status)? { // As we are now in sync with the blockchain if a transaction has Requested status it have not been sent // Send requested debits while debit(db, rpc, &state)? {} // Bump stuck transactions for id in stuck { let bump = rpc.bump_fee(&id)?; fail_point("(injected) fail bump", 0.3)?; let row = db.query_one( "UPDATE tx_out SET txid=$1 WHERE txid=$2 RETURNING wtid", &[ &bump.txid.as_byte_array().as_slice(), &id.as_byte_array().as_slice(), ], )?; info!( ">> (bump) {} replace {} with {}", base32(row.get(0)), id, bump.txid ); } // Send requested bounce while bounce(db, rpc, &state.bounce_fee)? {} } Ok(()) })(); if let Err(e) = result { error!("worker: {}", e); // When we catch an error, we sometimes want to retry immediately (eg. reconnect to RPC or DB). // Bitcoin error codes are generic. We need to match the msg to get precise ones. Some errors // can resolve themselves when a new block is mined (new fees, new transactions). Our simple // approach is to wait for the next loop when an RPC error is caught to prevent endless logged errors. skip_notification = !matches!( e, LoopError::Rpc(rpc::Error::RPC { .. } | rpc::Error::Bitcoin(_)) | LoopError::Concurrency ); } else { skip_notification = false; } } } /// Retrieve last stored hash fn last_hash(db: &mut Client) -> Result { let row = db.query_one("SELECT value FROM state WHERE name='last_hash'", &[])?; Ok(BlockHash::from_slice(row.get(0)).unwrap()) } /// Parse new transactions, return stuck transactions if the database is up to date with the latest mined block fn sync_chain( rpc: &mut Rpc, db: &mut Client, state: &WireState, status: &mut bool, ) -> LoopResult>> { // Get stored last_hash let last_hash = last_hash(db)?; // Get the current confirmation delay let conf_delay = state.confirmation; // Get a set of transactions ids to parse let (txs, removed, lastblock): ( HashMap, HashMap, BlockHash, ) = { // Get all transactions made since this block let list = rpc.list_since_block(Some(&last_hash), conf_delay)?; // Only keep ids and category let txs = list .transactions .into_iter() .map(|tx| (tx.txid, (tx.category, tx.confirmations))) .collect(); let removed = list .removed .into_iter() .map(|tx| (tx.txid, (tx.category, tx.confirmations))) .collect(); (txs, removed, list.lastblock) }; // Check if a confirmed incoming transaction have been removed by a blockchain reorganization let new_status = sync_chain_removed(&txs, &removed, rpc, db, conf_delay as i32)?; // Sync status with database if *status != new_status { let mut tx = db.transaction()?; tx.execute( "UPDATE state SET value=$1 WHERE name='status'", &[&[new_status as u8].as_slice()], )?; tx.execute("NOTIFY status", &[])?; tx.commit()?; *status = new_status; if new_status { info!("Recovered lost transactions"); } } if !new_status { return Ok(None); } let mut stuck = vec![]; for (id, (category, confirmations)) in txs { match category { Category::Send => { if sync_chain_outgoing(&id, confirmations, rpc, db, state)? { stuck.push(id); } } Category::Receive if confirmations >= conf_delay as i32 => { sync_chain_incoming_confirmed(&id, rpc, db, state)? } _ => { // Ignore coinbase and unconfirmed send transactions } } } // Move last_hash forward db.execute( "UPDATE state SET value=$1 WHERE name='last_hash' AND value=$2", &[ &lastblock.as_byte_array().as_slice(), &last_hash.as_byte_array().as_slice(), ], )?; Ok(Some(stuck)) } /// Sync database with removed transactions, return false if bitcoin backing is compromised fn sync_chain_removed( txs: &HashMap, removed: &HashMap, rpc: &mut Rpc, db: &mut Client, min_confirmations: i32, ) -> LoopResult { // A removed incoming transaction is a correctness issues in only two cases: // - it is a confirmed credit registered in the database // - it is an invalid transactions already bounced // Those two cases can compromise bitcoin backing // Removed outgoing transactions will be retried automatically by the node let mut blocking_debit = Vec::new(); let mut blocking_bounce = Vec::new(); // Only keep incoming transaction that are not reconfirmed // TODO study risk of accepting only mined transactions for faster recovery for (id, _) in removed.iter().filter(|(id, (cat, _))| { *cat == Category::Receive && txs .get(*id) .map(|(_, confirmations)| *confirmations < min_confirmations) .unwrap_or(true) }) { match rpc.get_tx_segwit_key(id) { Ok((full, key)) => { // Credits are only problematic if not reconfirmed and stored in the database if db .query_opt( "SELECT 1 FROM tx_in WHERE reserve_pub=$1", &[&key.as_slice()], )? .is_some() { let debit_addr = sender_address(rpc, &full)?; blocking_debit.push((key, id, debit_addr)); } } Err(err) => match err { GetSegwitErr::Decode(_) => { // Invalid tx are only problematic if already bounced if let Some(row) = db.query_opt( "SELECT txid FROM bounce WHERE bounced=$1 AND txid IS NOT NULL", &[&id.as_byte_array().as_slice()], )? { blocking_bounce.push((sql_txid(&row, 0), id)); } else { // Remove transaction from bounce table db.execute( "DELETE FROM bounce WHERE bounced=$1", &[&id.as_byte_array().as_slice()], )?; } } GetSegwitErr::RPC(it) => return Err(it.into()), }, } } if !blocking_bounce.is_empty() || !blocking_debit.is_empty() { let mut buf = "The following transaction have been removed from the blockchain, bitcoin backing is compromised until the transaction reappear:".to_string(); for (key, id, addr) in blocking_debit { write!( &mut buf, "\n\tcredit {} in {} from {}", base32(&key), id, addr ) .unwrap(); } for (id, bounced) in blocking_bounce { write!(&mut buf, "\n\tbounced {} in {}", id, bounced).unwrap(); } error!("{}", buf); Ok(false) } else { Ok(true) } } /// Sync database with an incoming confirmed transaction fn sync_chain_incoming_confirmed( id: &Txid, rpc: &mut Rpc, db: &mut Client, state: &WireState, ) -> Result<(), LoopError> { match rpc.get_tx_segwit_key(id) { Ok((full, reserve_pub)) => { // Store transactions in database let debit_addr = sender_address(rpc, &full)?; let credit_addr = full.details[0].address.clone().unwrap().assume_checked(); let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time); let amount = btc_to_taler(&full.amount, state.currency); let nb = db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (reserve_pub) DO NOTHING ", &[ &date, &amount.to_string(), &reserve_pub.as_slice(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(&credit_addr).as_ref() ])?; if nb > 0 { info!( "<< {} {} in {} from {}", amount, base32(&reserve_pub), id, debit_addr ); } } Err(err) => match err { GetSegwitErr::Decode(_) => { // If encoding is wrong request a bounce db.execute( "INSERT INTO bounce (bounced) VALUES ($1) ON CONFLICT (bounced) DO NOTHING", &[&id.as_byte_array().as_slice()], )?; } GetSegwitErr::RPC(e) => return Err(e.into()), }, } Ok(()) } /// Sync database with a debit transaction, return true if stuck fn sync_chain_debit( id: &Txid, full: &Transaction, wtid: &[u8; 32], rpc: &mut Rpc, db: &mut Client, confirmations: i32, state: &WireState, ) -> LoopResult { let credit_addr = full.details[0].address.clone().unwrap().assume_checked(); let amount = btc_to_taler(&full.amount, state.currency); if confirmations < 0 { if full.replaced_by_txid.is_none() { // Handle conflicting tx let nb_row = db.execute( "UPDATE tx_out SET status=$1, txid=NULL where txid=$2", &[ &(DebitStatus::Requested as i16), &id.as_byte_array().as_slice(), ], )?; if nb_row > 0 { warn!(">> (conflict) {} in {id} to {credit_addr}", base32(wtid)); } } } else { // Get previous out tx let row = db.query_opt( "SELECT id,status,txid FROM tx_out WHERE wtid=$1 FOR UPDATE", &[&wtid.as_slice()], )?; if let Some(row) = row { // If already in database, sync status let row_id: i32 = row.get(0); let status: i16 = row.get(1); match DebitStatus::try_from(status as u8).unwrap() { DebitStatus::Requested => { let nb_row = db.execute( "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3 AND status=$4", &[ &(DebitStatus::Sent as i16), &id.as_byte_array().as_slice(), &row_id, &status, ], )?; if nb_row > 0 { warn!( ">> (recovered) {amount} {} in {id} to {credit_addr}", base32(wtid) ); } } DebitStatus::Sent => { if let Some(txid) = full.replaces_txid { let stored_id = sql_txid(&row, 2); if txid == stored_id { let nb_row = db.execute( "UPDATE tx_out SET txid=$1 WHERE txid=$2", &[ &id.as_byte_array().as_slice(), &txid.as_byte_array().as_slice(), ], )?; if nb_row > 0 { info!(">> (recovered) {} replace {txid} with {id}", base32(wtid),); } } } } } } else { // Else add to database let debit_addr = sender_address(rpc, full)?; let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time); let nb = db.execute( "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING", &[&date, &amount.to_string(), &wtid.as_slice(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(&credit_addr).as_ref(), &state.base_url.as_ref(), &(DebitStatus::Sent as i16), &id.as_byte_array().as_slice(), &None::<&[u8]>], )?; if nb > 0 { warn!( ">> (onchain) {amount} {} in {id} to {credit_addr}", base32(wtid) ); } } // Check if stuck if let Some(delay) = state.bump_delay { if confirmations == 0 && full.replaced_by_txid.is_none() { let now = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap() .as_secs(); if now - full.time > delay as u64 { return Ok(true); } } } } Ok(false) } /// Sync database with an outgoing bounce transaction fn sync_chain_bounce( id: &Txid, bounced: &Txid, db: &mut Client, confirmations: i32, ) -> LoopResult<()> { if confirmations < 0 { // Handle conflicting tx let nb_row = db.execute( "UPDATE bounce SET status=$1, txid=NULL where txid=$2", &[ &(BounceStatus::Requested as i16), &id.as_byte_array().as_slice(), ], )?; if nb_row > 0 { warn!("|| (conflict) {} in {}", &bounced, &id); } } else { // Get previous bounce let row = db.query_opt( "SELECT id, status FROM bounce WHERE bounced=$1", &[&bounced.as_byte_array().as_slice()], )?; if let Some(row) = row { // If already in database, sync status let row_id: i32 = row.get(0); let status: i16 = row.get(1); match BounceStatus::try_from(status as u8).unwrap() { BounceStatus::Requested => { let nb_row = db.execute( "UPDATE bounce SET status=$1, txid=$2 WHERE id=$3 AND status=$4", &[ &(BounceStatus::Sent as i16), &id.as_byte_array().as_slice(), &row_id, &status, ], )?; if nb_row > 0 { warn!("|| (recovered) {} in {}", &bounced, &id); } } BounceStatus::Ignored => error!( "watcher: ignored bounce {} found in chain at {}", bounced, id ), BounceStatus::Sent => { /* Status is correct */ } } } else { // Else add to database let nb = db.execute( "INSERT INTO bounce (bounced, txid, status) VALUES ($1, $2, $3) ON CONFLICT (txid) DO NOTHING", &[&bounced.as_byte_array().as_slice(), &id.as_byte_array().as_slice(), &(BounceStatus::Sent as i16)], )?; if nb > 0 { warn!("|| (onchain) {} in {}", &bounced, &id); } } } Ok(()) } /// Sync database with an outgoing transaction, return true if stuck fn sync_chain_outgoing( id: &Txid, confirmations: i32, rpc: &mut Rpc, db: &mut Client, state: &WireState, ) -> LoopResult { match rpc .get_tx_op_return(id) .map(|(full, bytes)| (full, OutMetadata::decode(&bytes))) { Ok((full, Ok(info))) => match info { OutMetadata::Debit { wtid, .. } => { return sync_chain_debit(id, &full, &wtid, rpc, db, confirmations, state); } OutMetadata::Bounce { bounced } => { sync_chain_bounce(id, &Txid::from_byte_array(bounced), db, confirmations)? } }, Ok((_, Err(e))) => warn!("send: decode-info {} - {}", id, e), Err(e) => match e { GetOpReturnErr::MissingOpReturn => { /* Ignore */ } GetOpReturnErr::RPC(e) => return Err(e)?, }, } Ok(false) } /// Send a debit transaction on the blockchain, return false if no more requested transactions are found fn debit(db: &mut Client, rpc: &mut Rpc, state: &WireState) -> LoopResult { // We rely on the advisory lock to ensure we are the only one sending transactions let row = db.query_opt( "SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 ORDER BY _date LIMIT 1", &[&(DebitStatus::Requested as i16)], )?; if let Some(row) = &row { let id: i32 = row.get(0); let amount = sql_btc_amount(row, 1, state.currency); let wtid: [u8; 32] = sql_array(row, 2); let addr = sql_addr(row, 3); let url = sql_url(row, 4); let metadata = OutMetadata::Debit { wtid, url }; let tx_id = rpc.send( &addr, &amount, Some(&metadata.encode().or_fail(|e| format!("{}", e))), false, )?; fail_point("(injected) fail debit", 0.3)?; db.execute( "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3", &[ &(DebitStatus::Sent as i16), &tx_id.as_byte_array().as_slice(), &id, ], )?; let amount = btc_to_taler(&amount.to_signed().unwrap(), state.currency); info!(">> {} {} in {} to {}", amount, base32(&wtid), tx_id, addr); } Ok(row.is_some()) } /// Bounce a transaction on the blockchain, return false if no more requested transactions are found fn bounce(db: &mut Client, rpc: &mut Rpc, fee: &BtcAmount) -> LoopResult { // We rely on the advisory lock to ensure we are the only one sending transactions let row = db.query_opt( "SELECT id, bounced FROM bounce WHERE status=$1 ORDER BY _date LIMIT 1", &[&(BounceStatus::Requested as i16)], )?; if let Some(row) = &row { let id: i32 = row.get(0); let bounced: Txid = sql_txid(row, 1); let metadata = OutMetadata::Bounce { bounced: *bounced.as_byte_array(), }; match rpc.bounce( &bounced, fee, Some(&metadata.encode().or_fail(|e| format!("{}", e))), ) { Ok(it) => { fail_point("(injected) fail bounce", 0.3)?; db.execute( "UPDATE bounce SET txid=$1, status=$2 WHERE id=$3", &[ &it.as_byte_array().as_slice(), &(BounceStatus::Sent as i16), &id, ], )?; info!("|| {} in {}", &bounced, &it); } Err(err) => match err { rpc::Error::RPC { code: ErrorCode::RpcWalletInsufficientFunds | ErrorCode::RpcWalletError, msg, } => { db.execute( "UPDATE bounce SET status=$1 WHERE id=$2", &[&(BounceStatus::Ignored as i16), &id], )?; info!("|| (ignore) {} because {}", &bounced, msg); } e => Err(e)?, }, } } Ok(row.is_some()) }