use bitcoin::{hashes::Hash, Address, Amount as BtcAmount, BlockHash, SignedAmount, Txid};
use btc_wire::{
config::BitcoinConfig,
rpc::{self, BtcRpc, Category, ErrorCode},
rpc_utils::{default_data_dir, sender_address},
segwit::DecodeSegWitErr,
GetOpReturnErr, GetSegwitErr,
};
use info::decode_info;
use postgres::{fallible_iterator::FallibleIterator, Client};
use rand::{rngs::OsRng, RngCore};
use reconnect::{AutoReconnectRPC, AutoReconnectSql};
use std::{
collections::{HashMap, HashSet},
path::PathBuf,
process::exit,
str::FromStr,
time::{Duration, SystemTime},
};
use taler_api::api_common::{base32, Amount};
use taler_config::Config;
use taler_log::log::{error, info, warn};
use url::Url;
use crate::{
fail_point::fail_point,
info::{encode_info, Info},
status::{BounceStatus, TxStatus},
};
mod fail_point;
mod info;
mod reconnect;
mod status;
/// Build the `payto://bitcoin/<address>` URL that designates a bitcoin address.
///
/// # Panics
/// Panics if the formatted string cannot be parsed as a URL.
fn btc_payto_url(addr: &Address) -> Url {
    let payto = format!("payto://bitcoin/{}", addr);
    Url::from_str(&payto).unwrap()
}
fn btc_payto_addr(url: &Url) -> Result
{
if url.domain() != Some("bitcoin") {
return Err("".to_string());
}
let str = url.path().trim_start_matches('/');
return Address::from_str(str).map_err(|_| "".to_string());
}
/// Convert a bitcoin amount into a Taler `BTC` amount, dropping its sign.
///
/// The satoshi count is split into a whole part (multiples of 100_000_000)
/// and the remaining fraction.
fn btc_amount_to_taler_amount(amount: &SignedAmount) -> Amount {
    let sat = amount.abs().to_unsigned().unwrap().as_sat();
    let whole = sat / 100_000_000;
    let fraction = (sat % 100_000_000) as u32;
    Amount::new("BTC", whole, fraction)
}
fn taler_amount_to_btc_amount(amount: &Amount) -> Result {
if amount.currency != "BTC" {
return Err("Wrong currency".to_string());
}
let sat = amount.value * 100_000_000 + amount.fraction as u64;
return Ok(BtcAmount::from_sat(sat));
}
fn last_hash(db: &mut Client) -> Result, postgres::Error> {
Ok(db
.query_opt("SELECT value FROM state WHERE name='last_hash'", &[])?
.map(|r| BlockHash::from_slice(r.get(0)).unwrap()))
}
/// Listen for new proposed transactions and announce them on the bitcoin network
fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, config: &Config) {
// Send a transaction on the blockchain, return true if more transactions with the same status remains
fn send(
db: &mut Client,
rpc: &mut BtcRpc,
status: TxStatus,
) -> Result> {
assert!(status == TxStatus::Delayed || status == TxStatus::Requested);
let mut tx = db.transaction()?;
// We lock the row with FOR UPDATE to prevent sending same transaction multiple time
let row = tx.query_opt(
"SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 LIMIT 1 FOR UPDATE",
&[&(status as i16)],
)?;
if let Some(row) = &row {
let id: i32 = row.get(0);
let amount = taler_amount_to_btc_amount(&Amount::from_str(row.get(1))?)?;
let wtid: &[u8] = row.get(2);
let addr: Address = btc_payto_addr(&Url::parse(row.get(3))?)?;
let exchange_base_url: Url = Url::parse(row.get(4))?;
let info = Info::Transaction {
wtid: wtid.try_into()?,
url: exchange_base_url,
};
let metadata = encode_info(&info);
fail_point("Skip send_op_return", 0.2)?;
match rpc.send_op_return(&addr, &amount, &metadata, false) {
Ok(tx_id) => {
fail_point("Fail update db", 0.2)?;
tx.execute(
"UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3",
&[&(TxStatus::Sent as i16), &tx_id.as_ref(), &id],
)?;
let amount = btc_amount_to_taler_amount(&amount.to_signed().unwrap());
info!(">> {} {} in {} to {}", amount, base32(&wtid), tx_id, addr);
}
Err(e) => {
info!("sender: RPC - {}", e);
tx.execute(
"UPDATE tx_out SET status=$1 WHERE id=$2",
&[&(TxStatus::Delayed as i16), &id],
)?;
}
}
tx.commit()?;
}
Ok(row.is_some())
}
// Bounce a transaction on the blockchain, return true if more bounce with the same status remains
fn bounce(
db: &mut Client,
rpc: &mut BtcRpc,
status: BounceStatus,
fee: &BtcAmount,
) -> Result> {
assert!(status == BounceStatus::Delayed || status == BounceStatus::Requested);
let mut tx = db.transaction()?;
// We lock the row with FOR UPDATE to prevent sending same transaction multiple time
let row = tx.query_opt(
"SELECT id, bounced FROM bounce WHERE status=$1 LIMIT 1 FOR UPDATE",
&[&(status as i16)],
)?;
if let Some(row) = &row {
let id: i32 = row.get(0);
let bounced: Txid = Txid::from_slice(row.get(1))?;
let info = Info::Bounce { bounced };
let metadata = encode_info(&info);
fail_point("Skip send_op_return", 0.2)?;
match rpc.bounce(&bounced, &fee, &metadata) {
Ok(it) => {
info!("|| {} in {}", &bounced, &it);
tx.execute(
"UPDATE bounce SET txid = $1, status = $2 WHERE id = $3",
&[&it.as_ref(), &(BounceStatus::Sent as i16), &id],
)?;
}
Err(err) => match err {
rpc::Error::RPC {
code: ErrorCode::RpcWalletInsufficientFunds | ErrorCode::RpcWalletError,
msg,
} => {
info!("|| (ignore) {} because {}", &bounced, msg);
tx.execute(
"UPDATE bounce SET status = $1 WHERE id = $2",
&[&(BounceStatus::Ignored as i16), &id],
)?;
}
_ => Err(err)?,
},
}
tx.commit()?;
}
Ok(row.is_some())
}
// TODO check if transactions are abandoned
// Alway start with a sync work
let mut skip_notification = true;
loop {
let rpc = rpc.client();
let db = db.client();
let result: Result<(), Box> = (|| {
// Listen to all channels
db.batch_execute("LISTEN new_block; LISTEN new_tx")?;
// Wait for the next notification
{
let mut ntf = db.notifications();
if !skip_notification && ntf.is_empty() {
// Block until next notification
ntf.blocking_iter().next()?;
}
// Conflate all notifications
let mut iter = ntf.iter();
while let Some(_) = iter.next()? {}
}
// Sync chain
sync_chain(rpc, db, config)?;
// As we are now in sync with the blockchain if a transaction is in requested or delayed state it have not been sent
// Send delayed transactions
while send(db, rpc, TxStatus::Delayed)? {}
// Send requested transactions
while send(db, rpc, TxStatus::Requested)? {}
let bounce_fee = BtcAmount::from_sat(config.bounce_fee);
// Send delayed bounce
while bounce(db, rpc, BounceStatus::Delayed, &bounce_fee)? {}
// Send requested bounce
while bounce(db, rpc, BounceStatus::Requested, &bounce_fee)? {}
Ok(())
})();
if let Err(e) = result {
error!("worker: DB - {}", e);
// On failure retry without waiting for notifications
skip_notification = true;
} else {
skip_notification = false;
}
}
}
/// Parse new transactions, if exit whiteout failing the database is up to date with the latest mined block
fn sync_chain(
rpc: &mut BtcRpc,
db: &mut Client,
config: &Config,
) -> Result<(), Box> {
// Get stored last_hash
let last_hash = last_hash(db)?;
let min_confirmations = config.confirmation;
// Get a set of transactions ids to parse
let (txs, removed, lastblock): (HashMap, HashSet, BlockHash) = {
// Get all transactions made since this block
let list = rpc.list_since_block(last_hash.as_ref(), min_confirmations, true)?;
// Only keep ids and category
let txs = list
.transactions
.into_iter()
.map(|tx| (tx.txid, (tx.category, tx.confirmations)))
.collect();
let removed = list
.removed
.into_iter()
.filter_map(|tx| (tx.category == Category::Receive).then(|| tx.txid))
.collect();
(txs, removed, list.lastblock)
};
// Check if a confirmed incoming transaction have been removed by a blockchain reorganisation
if !removed.is_empty() {
for id in removed {
if let Ok((full, key)) = rpc.get_tx_segwit_key(&id) {
// If the removed tx is not in confirmed the txs list and the tx is stored in the database hard error
if txs
.get(&id)
.map(|(_, confirmations)| *confirmations < min_confirmations as i32)
.unwrap_or(true)
&& db
.query_opt("SELECT 1 FROM tx_in WHERE reserve_pub=$1", &[&key.as_ref()])?
.is_some()
{
let credit_addr = full.details[0].address.as_ref().unwrap();
error!("Received transaction {} in {} from {} have been removed from the blockchain, bitcoin backing is compromised until the transaction reappear", base32(&key), id, credit_addr);
exit(1);
}
}
}
}
for (id, (category, confirmations)) in txs {
match category {
Category::Send => {
match rpc.get_tx_op_return(&id) {
Ok((full, bytes)) => {
let mut tx = db.transaction()?;
match decode_info(&bytes) {
Ok(info) => {
match info {
Info::Transaction { wtid, .. } => {
let addr = full.details[0].address.as_ref().unwrap();
let amount = btc_amount_to_taler_amount(&full.amount);
let row = tx.query_opt(
"SELECT id, status FROM tx_out WHERE wtid=$1 FOR UPDATE",
&[&wtid.as_ref()],
)?;
if let Some(row) = row {
let _id: i32 = row.get(0);
let status: i16 = row.get(1);
match TxStatus::try_from(status as u8).unwrap() {
TxStatus::Requested | TxStatus::Delayed => {
tx.execute(
"UPDATE tx_out SET status=$1 where id=$2",
&[&(TxStatus::Sent as i16), &_id],
)?;
warn!(
">> (recovered) {} {} in {} to {}",
amount,
base32(&wtid),
id,
addr
);
}
TxStatus::Sent => {}
}
} else {
let debit_addr = sender_address(rpc, &full)?;
let date = SystemTime::UNIX_EPOCH
+ Duration::from_secs(full.time);
// Generate a random request_uid
let mut request_uid = [0; 64];
OsRng.fill_bytes(&mut request_uid);
let nb = tx.execute(
"INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING",
&[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(addr).as_ref(), &config.base_url.as_ref(), &(TxStatus::Sent as i16), &id.as_ref(), &request_uid.as_ref()],
)?;
if nb > 0 {
warn!(
">> (onchain) {} {} in {} to {}",
amount,
base32(&wtid),
id,
addr
);
}
}
}
Info::Bounce { bounced } => {
let row = tx.query_opt(
"SELECT id, status FROM bounce WHERE bounced=$1 FOR UPDATE",
&[&bounced.as_ref()],
)?;
if let Some(row) = row {
let _id: i32 = row.get(0);
let status: i16 = row.get(1);
match BounceStatus::try_from(status as u8).unwrap() {
BounceStatus::Requested | BounceStatus::Delayed => {
tx.execute(
"UPDATE bounce SET status=$1 where id=$2",
&[&(BounceStatus::Sent as i16), &_id],
)?;
warn!("|| (recovered) {} in {}", &bounced, &id);
}
BounceStatus::Ignored => error!("watcher: ignored bounce {} found in chain at {}", bounced, id),
BounceStatus::Sent => {}
}
} else {
let nb = tx.execute(
"INSERT INTO bounce (bounced, txid, status) VALUES ($1, $2, $3) ON CONFLICT (txid) DO NOTHING",
&[&bounced.as_ref(), &id.as_ref(), &(BounceStatus::Sent as i16)],
)?;
if nb > 0 {
warn!("|| (onchain) {} in {}", &bounced, &id);
}
}
}
}
}
Err(err) => warn!("send: decode-info {} - {}", id, err),
}
tx.commit()?;
}
Err(err) => match err {
GetOpReturnErr::MissingOpReturn => {} // ignore
err => warn!("send: {} {}", id, err),
},
}
}
Category::Receive if confirmations >= min_confirmations as i32 => {
match rpc.get_tx_segwit_key(&id) {
Ok((full, reserve_pub)) => {
let debit_addr = sender_address(rpc, &full)?;
let credit_addr = full.details[0].address.as_ref().unwrap();
let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time);
let amount = btc_amount_to_taler_amount(&full.amount);
let nb = db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (reserve_pub) DO NOTHING ", &[
&date, &amount.to_string(), &reserve_pub.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref()
])?;
if nb > 0 {
info!(
"<< {} {} in {} from {}",
amount,
base32(&reserve_pub),
id,
debit_addr
);
}
}
Err(err) => match err {
GetSegwitErr::Decode(
DecodeSegWitErr::MissingSegWitAddress | DecodeSegWitErr::NoMagicIdMatch,
) => {
// Request a bounce
db.execute("INSERT INTO bounce (bounced) VALUES ($1) ON CONFLICT (bounced) DO NOTHING", &[&id.as_ref()])?;
}
err => warn!("receive: {} {}", id, err),
},
}
}
_ => {
// Ignore coinbase and unconfirmed send transactions
}
}
}
// Move last_hash forward if no error have been caught
{
let mut tx = db.transaction()?;
let curr_hash: Option = tx
.query_opt(
"SELECT value FROM state WHERE name='last_hash' FOR UPDATE",
&[],
)?
.map(|r| BlockHash::from_slice(r.get(0)).unwrap());
if last_hash != curr_hash {
error!("watcher: hash state collision, database have been altered by another process");
}
if curr_hash.is_some() {
tx.execute(
"UPDATE state SET value=$1 WHERE name='last_hash'",
&[&lastblock.as_ref()],
)?;
} else {
tx.execute(
"INSERT INTO state (name, value) VALUES ('last_hash', $1)",
&[&lastblock.as_ref()],
)?;
};
tx.commit()?;
}
Ok(())
}
fn block_listener(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql) {
loop {
let result: Result<(), Box> = (|| {
let rpc = rpc.client();
let db = db.client();
rpc.wait_for_new_block(0).ok();
db.execute("NOTIFY new_block", &[])?;
Ok(())
})();
if let Err(e) = result {
error!("listener: DB - {}", e);
}
}
}
fn main() {
taler_log::init();
#[cfg(feature = "fail")]
taler_log::log::warn!("Running with random failures is unsuitable for production");
// Guess network by trying to connect to a JSON RPC server
let data_dir = std::env::args()
.nth(1)
.map(|str| PathBuf::from_str(&str).unwrap())
.unwrap_or_else(default_data_dir);
let config = taler_config::Config::from_path("test.conf");
let config: &'static Config = Box::leak(Box::new(config));
let btc_config = BitcoinConfig::load(&data_dir).unwrap();
let mut rpc = BtcRpc::common(&btc_config).unwrap();
rpc.load_wallet(&config.btc_wallet).ok();
let rpc_listener = AutoReconnectRPC::new(
btc_config.clone(),
&config.btc_wallet,
Duration::from_secs(5),
);
let rpc_worker = AutoReconnectRPC::new(btc_config, &config.btc_wallet, Duration::from_secs(5));
let db_listener = AutoReconnectSql::new(&config.db_url, Duration::from_secs(5));
let db_worker = AutoReconnectSql::new(&config.db_url, Duration::from_secs(5));
std::thread::spawn(move || block_listener(rpc_listener, db_listener));
worker(rpc_worker, db_worker, config);
}