commit d80f18e661ebe7904cfbdd85517441f82b0b4961
parent ad5c77a2e34b2cc794c5e12e99f82aae3c74e460
Author: Antoine A <>
Date: Tue, 11 Jan 2022 17:47:29 +0100
Cleaner code and better error handling
Diffstat:
3 files changed, 294 insertions(+), 239 deletions(-)
diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs
@@ -71,10 +71,10 @@ fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, config: &Config)
};
let metadata = encode_info(&info);
- fail_point("Skip send_op_return", 0.2)?;
+ fail_point("(injected) fail send_op_return", 0.2)?;
match rpc.send_op_return(&addr, &amount, &metadata, false) {
Ok(tx_id) => {
- fail_point("Fail update db", 0.2)?;
+ fail_point("(injected) fail update db", 0.2)?;
tx.execute(
"UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3",
&[&(TxStatus::Sent as i16), &tx_id.as_ref(), &id],
@@ -116,7 +116,7 @@ fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, config: &Config)
let info = Info::Bounce { bounced };
let metadata = encode_info(&info);
- fail_point("Skip send_op_return", 0.2)?;
+ fail_point("(injected) fail bounce", 0.2)?;
match rpc.bounce(&bounced, fee, &metadata) {
Ok(it) => {
tx.execute(
@@ -238,223 +238,18 @@ fn sync_chain(
// Check if a confirmed incoming transaction have been removed by a blockchain reorganisation
- if !removed.is_empty() {
- let mut blocking_receive = Vec::new();
- let mut blocking_bounce = Vec::new();
- for id in removed {
- match rpc.get_tx_segwit_key(&id) {
- Ok((full, key)) => {
- // If the removed tx is not in confirmed the txs list and the tx is stored in the database hard error
- if txs
- .get(&id)
- .map(|(_, confirmations)| *confirmations < min_confirmations as i32)
- .unwrap_or(true)
- && db
- .query_opt(
- "SELECT 1 FROM tx_in WHERE reserve_pub=$1",
- &[&key.as_ref()],
- )?
- .is_some()
- {
- let debit_addr = sender_address(rpc, &full)?;
- blocking_receive.push((key, id, debit_addr));
- }
- }
- Err(err) => {
- match err {
- GetSegwitErr::Decode(_) => {
- if let Some(row) = db.query_opt(
- "SELECT txid FROM bounce WHERE bounced=$1 AND txid IS NOT NULL",
- &[&id.as_ref()],
- )? {
- let txid = Txid::from_slice(row.get(0)).unwrap();
- blocking_bounce.push((txid, id));
- } else {
- db.execute("DELETE FROM bounce WHERE bounced=$1", &[&id.as_ref()])?;
- }
- }
- _ => { /* ignore already caught error */ }
- }
- }
- }
- }
-
- if !blocking_bounce.is_empty() || !blocking_receive.is_empty() {
- let mut buf = "The following transaction have been removed from the blockchain, bitcoin backing is compromised until the transaction reappear:".to_string();
- for (key, id, addr) in blocking_receive {
- write!(
- &mut buf,
- "\n\treceived {} in {} from {}",
- base32(&key),
- id,
- addr
- )
- .unwrap();
- }
- for (id, bounced) in blocking_bounce {
- write!(&mut buf, "\n\tbounced {} in {}", id, bounced).unwrap();
- }
- error!("{}", buf);
- exit(1);
- }
+ if !sync_chain_removed(&txs, &removed, rpc, db, min_confirmations as i32)? {
+ // TODO sleep instead ?
+ exit(1);
}
-
for (id, (category, confirmations)) in txs {
match category {
- Category::Send => {
- match rpc.get_tx_op_return(&id) {
- Ok((full, bytes)) => {
- match decode_info(&bytes) {
- Ok(info) => {
- match info {
- Info::Transaction { wtid, .. } => {
- let addr = full.details[0].address.as_ref().unwrap();
- let amount = btc_to_taler(&full.amount);
-
- if confirmations < 0 {
- let nb_row = db.execute(
- "UPDATE tx_out SET status=$1, txid=NULL where txid=$2",
- &[&(TxStatus::Delayed as i16), &id.as_ref()],
- )?;
- if nb_row > 0 {
- warn!(
- ">> (conflict) {} in {} to {}",
- base32(&wtid),
- id,
- addr
- );
- }
- } else {
- let mut tx = db.transaction()?;
- let row = tx.query_opt(
- "SELECT id, status FROM tx_out WHERE wtid=$1 FOR UPDATE",
- &[&wtid.as_ref()],
- )?;
- if let Some(row) = row {
- let _id: i32 = row.get(0);
- let status: i16 = row.get(1);
- match TxStatus::try_from(status as u8).unwrap() {
- TxStatus::Requested | TxStatus::Delayed => {
- tx.execute(
- "UPDATE tx_out SET status=$1 where id=$2",
- &[&(TxStatus::Sent as i16), &_id],
- )?;
- tx.commit()?;
- warn!(
- ">> (recovered) {} {} in {} to {}",
- amount,
- base32(&wtid),
- id,
- addr
- );
- }
- TxStatus::Sent => {}
- }
- } else {
- let debit_addr = sender_address(rpc, &full)?;
- let date = SystemTime::UNIX_EPOCH
- + Duration::from_secs(full.time);
- // Generate a random request_uid
- let mut request_uid = [0; 64];
- OsRng.fill_bytes(&mut request_uid);
- let nb = tx.execute(
- "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING",
- &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(addr).as_ref(), &config.base_url.as_ref(), &(TxStatus::Sent as i16), &id.as_ref(), &request_uid.as_ref()],
- )?;
- tx.commit()?;
- if nb > 0 {
- warn!(
- ">> (onchain) {} {} in {} to {}",
- amount,
- base32(&wtid),
- id,
- addr
- );
- }
- }
- }
- }
- Info::Bounce { bounced } => {
- if confirmations < 0 {
- let nb_row = db.execute(
- "UPDATE bounce SET status=$1, txid=NULL where txid=$2",
- &[&(BounceStatus::Delayed as i16), &id.as_ref()],
- )?;
- if nb_row > 0 {
- warn!("|| (conflict) {} in {}", &bounced, &id);
- }
- } else {
- let mut tx = db.transaction()?;
- let row = tx.query_opt(
- "SELECT id, status FROM bounce WHERE bounced=$1 FOR UPDATE",
- &[&bounced.as_ref()],
- )?;
- if let Some(row) = row {
- let _id: i32 = row.get(0);
- let status: i16 = row.get(1);
- match BounceStatus::try_from(status as u8).unwrap() {
- BounceStatus::Requested | BounceStatus::Delayed => {
- tx.execute(
- "UPDATE bounce SET status=$1 where id=$2",
- &[&(BounceStatus::Sent as i16), &_id],
- )?;
- tx.commit()?;
- warn!("|| (recovered) {} in {}", &bounced, &id);
- }
- BounceStatus::Ignored => error!("watcher: ignored bounce {} found in chain at {}", bounced, id),
- BounceStatus::Sent => {}
- }
- } else {
- let nb = tx.execute(
- "INSERT INTO bounce (bounced, txid, status) VALUES ($1, $2, $3) ON CONFLICT (txid) DO NOTHING",
- &[&bounced.as_ref(), &id.as_ref(), &(BounceStatus::Sent as i16)],
- )?;
- tx.commit()?;
- if nb > 0 {
- warn!("|| (onchain) {} in {}", &bounced, &id);
- }
- }
- }
- }
- }
- }
- Err(err) => warn!("send: decode-info {} - {}", id, err),
- }
- }
- Err(err) => match err {
- GetOpReturnErr::MissingOpReturn => {} // ignore
- err => warn!("send: {} {}", id, err),
- },
- }
- }
+ Category::Send => sync_chain_outgoing(&id, confirmations, rpc, db, config)?,
Category::Receive if confirmations >= min_confirmations as i32 => {
- match rpc.get_tx_segwit_key(&id) {
- Ok((full, reserve_pub)) => {
- let debit_addr = sender_address(rpc, &full)?;
- let credit_addr = full.details[0].address.as_ref().unwrap();
- let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time);
- let amount = btc_to_taler(&full.amount);
- let nb = db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (reserve_pub) DO NOTHING ", &[
- &date, &amount.to_string(), &reserve_pub.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref()
- ])?;
- if nb > 0 {
- info!(
- "<< {} {} in {} from {}",
- amount,
- base32(&reserve_pub),
- id,
- debit_addr
- );
- }
- }
- Err(err) => match err {
- GetSegwitErr::Decode(_) => {
- // Request a bounce
- db.execute("INSERT INTO bounce (bounced) VALUES ($1) ON CONFLICT (bounced) DO NOTHING", &[&id.as_ref()])?;
- }
- err => warn!("receive: {} {}", id, err),
- },
- }
+ sync_chain_incoming_confirmed(&id, rpc, db)?
+ }
+ Category::Receive if confirmations < 0 => {
+ panic!("receive conflict {} {}", id, confirmations)
}
_ => {
// Ignore coinbase and unconfirmed send transactions
@@ -491,6 +286,254 @@ fn sync_chain(
Ok(())
}
+/// Sync database with removed transactions, return false if bitcoin backing is compromised
+fn sync_chain_removed(
+ txs: &HashMap<Txid, (Category, i32)>,
+ removed: &HashSet<Txid>,
+ rpc: &mut BtcRpc,
+ db: &mut Client,
+ min_confirmations: i32,
+) -> Result<bool, Box<dyn std::error::Error>> {
+ // Removed transactions are correctness issue in only two cases:
+ // - An incoming valid transaction considered confirmed in the database
+ // - An incoming invalid transactions already bounced
+ // Those two cases can compromise bitcoin backing
+ // Removed outgoing transactions will be retried automatically by the node
+
+ let mut blocking_receive = Vec::new();
+ let mut blocking_bounce = Vec::new();
+ for id in removed {
+ match rpc.get_tx_segwit_key(&id) {
+ Ok((full, key)) => {
+ // Valid tx are only problematic if not confirmed in the txs list and stored stored in the database
+ if txs
+ .get(id)
+ .map(|(_, confirmations)| *confirmations < min_confirmations)
+ .unwrap_or(true)
+ && db
+ .query_opt("SELECT 1 FROM tx_in WHERE reserve_pub=$1", &[&key.as_ref()])?
+ .is_some()
+ {
+ let debit_addr = sender_address(rpc, &full)?;
+ blocking_receive.push((key, id, debit_addr));
+ }
+ }
+ Err(err) => match err {
+ GetSegwitErr::Decode(_) => {
+ // Invalid tx are only problematic if already bounced
+ if let Some(row) = db.query_opt(
+ "SELECT txid FROM bounce WHERE bounced=$1 AND txid IS NOT NULL",
+ &[&id.as_ref()],
+ )? {
+ let txid = Txid::from_slice(row.get(0)).unwrap();
+ blocking_bounce.push((txid, id));
+ } else {
+ // Remove transaction from bounce table
+ db.execute("DELETE FROM bounce WHERE bounced=$1", &[&id.as_ref()])?;
+ }
+ }
+ GetSegwitErr::RPC(it) => return Err(it.into()),
+ },
+ }
+ }
+
+ if !blocking_bounce.is_empty() || !blocking_receive.is_empty() {
+ let mut buf = "The following transaction have been removed from the blockchain, bitcoin backing is compromised until the transaction reappear:".to_string();
+ for (key, id, addr) in blocking_receive {
+ write!(
+ &mut buf,
+ "\n\treceived {} in {} from {}",
+ base32(&key),
+ id,
+ addr
+ )
+ .unwrap();
+ }
+ for (id, bounced) in blocking_bounce {
+ write!(&mut buf, "\n\tbounced {} in {}", id, bounced).unwrap();
+ }
+ error!("{}", buf);
+ return Ok(false);
+ } else {
+ return Ok(true);
+ }
+}
+
+/// Sync database with an outgoing transaction
+fn sync_chain_outgoing(
+ id: &Txid,
+ confirmations: i32,
+ rpc: &mut BtcRpc,
+ db: &mut Client,
+ config: &Config,
+) -> Result<(), Box<dyn std::error::Error>> {
+ match rpc.get_tx_op_return(&id) {
+ Ok((full, bytes)) => {
+ match decode_info(&bytes) {
+ Ok(info) => {
+ match info {
+ Info::Transaction { wtid, .. } => {
+ let addr = full.details[0].address.as_ref().unwrap();
+ let amount = btc_to_taler(&full.amount);
+
+ if confirmations < 0 {
+ // Handle conflicting tx
+ let nb_row = db.execute(
+ "UPDATE tx_out SET status=$1, txid=NULL where txid=$2",
+ &[&(TxStatus::Delayed as i16), &id.as_ref()],
+ )?;
+ if nb_row > 0 {
+ warn!(">> (conflict) {} in {} to {}", base32(&wtid), id, addr);
+ }
+ } else {
+ let mut tx = db.transaction()?;
+ let row = tx.query_opt(
+ "SELECT id, status FROM tx_out WHERE wtid=$1 FOR UPDATE",
+ &[&wtid.as_ref()],
+ )?;
+ if let Some(row) = row {
+ let _id: i32 = row.get(0);
+ let status: i16 = row.get(1);
+ match TxStatus::try_from(status as u8).unwrap() {
+ TxStatus::Requested | TxStatus::Delayed => {
+ tx.execute(
+ "UPDATE tx_out SET status=$1 where id=$2",
+ &[&(TxStatus::Sent as i16), &_id],
+ )?;
+ tx.commit()?;
+ warn!(
+ ">> (recovered) {} {} in {} to {}",
+ amount,
+ base32(&wtid),
+ id,
+ addr
+ );
+ }
+ TxStatus::Sent => {}
+ }
+ } else {
+ let debit_addr = sender_address(rpc, &full)?;
+ let date =
+ SystemTime::UNIX_EPOCH + Duration::from_secs(full.time);
+ // Generate a random request_uid
+ let mut request_uid = [0; 64];
+ OsRng.fill_bytes(&mut request_uid);
+ let nb = tx.execute(
+ "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING",
+ &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(addr).as_ref(), &config.base_url.as_ref(), &(TxStatus::Sent as i16), &id.as_ref(), &request_uid.as_ref()],
+ )?;
+ tx.commit()?;
+ if nb > 0 {
+ warn!(
+ ">> (onchain) {} {} in {} to {}",
+ amount,
+ base32(&wtid),
+ id,
+ addr
+ );
+ }
+ }
+ }
+ }
+ Info::Bounce { bounced } => {
+ if confirmations < 0 {
+ // Handle conflicting tx
+ let nb_row = db.execute(
+ "UPDATE bounce SET status=$1, txid=NULL where txid=$2",
+ &[&(BounceStatus::Delayed as i16), &id.as_ref()],
+ )?;
+ if nb_row > 0 {
+ warn!("|| (conflict) {} in {}", &bounced, &id);
+ }
+ } else {
+ let mut tx = db.transaction()?;
+ let row = tx.query_opt(
+ "SELECT id, status FROM bounce WHERE bounced=$1 FOR UPDATE",
+ &[&bounced.as_ref()],
+ )?;
+ if let Some(row) = row {
+ let _id: i32 = row.get(0);
+ let status: i16 = row.get(1);
+ match BounceStatus::try_from(status as u8).unwrap() {
+ BounceStatus::Requested | BounceStatus::Delayed => {
+ tx.execute(
+ "UPDATE bounce SET status=$1 where id=$2",
+ &[&(BounceStatus::Sent as i16), &_id],
+ )?;
+ tx.commit()?;
+ warn!("|| (recovered) {} in {}", &bounced, &id);
+ }
+ BounceStatus::Ignored => error!(
+ "watcher: ignored bounce {} found in chain at {}",
+ bounced, id
+ ),
+ BounceStatus::Sent => {}
+ }
+ } else {
+ let nb = tx.execute(
+ "INSERT INTO bounce (bounced, txid, status) VALUES ($1, $2, $3) ON CONFLICT (txid) DO NOTHING",
+ &[&bounced.as_ref(), &id.as_ref(), &(BounceStatus::Sent as i16)],
+ )?;
+ tx.commit()?;
+ if nb > 0 {
+ warn!("|| (onchain) {} in {}", &bounced, &id);
+ }
+ }
+ }
+ }
+ }
+ }
+ Err(err) => warn!("send: decode-info {} - {}", id, err),
+ }
+ }
+ Err(err) => match err {
+ GetOpReturnErr::MissingOpReturn => { /* Ignore */ }
+ GetOpReturnErr::RPC(e) => return Err(e)?,
+ },
+ }
+ Ok(())
+}
+
+/// Sync database with na incoming confirmed transaction
+fn sync_chain_incoming_confirmed(
+ id: &Txid,
+ rpc: &mut BtcRpc,
+ db: &mut Client,
+) -> Result<(), Box<dyn std::error::Error>> {
+ match rpc.get_tx_segwit_key(&id) {
+ Ok((full, reserve_pub)) => {
+ // Store transactions in database
+ let debit_addr = sender_address(rpc, &full)?;
+ let credit_addr = full.details[0].address.as_ref().unwrap();
+ let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time);
+ let amount = btc_to_taler(&full.amount);
+ let nb = db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (reserve_pub) DO NOTHING ", &[
+ &date, &amount.to_string(), &reserve_pub.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref()
+ ])?;
+ if nb > 0 {
+ info!(
+ "<< {} {} in {} from {}",
+ amount,
+ base32(&reserve_pub),
+ id,
+ debit_addr
+ );
+ }
+ }
+ Err(err) => match err {
+ GetSegwitErr::Decode(_) => {
+ // Request a bounce if encoding is wrong
+ db.execute(
+ "INSERT INTO bounce (bounced) VALUES ($1) ON CONFLICT (bounced) DO NOTHING",
+ &[&id.as_ref()],
+ )?;
+ }
+ GetSegwitErr::RPC(e) => return Err(e.into()),
+ },
+ }
+ Ok(())
+}
+
/// Wait for new block and notify arrival with postgreSQL notifications
fn block_listener(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql) {
loop {
diff --git a/btc-wire/src/rpc.rs b/btc-wire/src/rpc.rs
@@ -14,22 +14,27 @@ use std::{
fmt::Debug,
io::{self, BufRead, BufReader, Write},
net::TcpStream,
+ time::Duration,
};
use crate::config::BitcoinConfig;
#[derive(Debug, serde::Serialize)]
-struct BtcRequest<'a, T: serde::Serialize> {
+struct RpcRequest<'a, T: serde::Serialize> {
method: &'a str,
id: u64,
params: &'a T,
}
#[derive(Debug, serde::Deserialize)]
-struct BtcResponse<T> {
- result: Option<T>,
- error: Option<BtcErr>,
- id: u64,
+#[serde(untagged)]
+enum BtcResponse<T> {
+ RpcResponse {
+ result: Option<T>,
+ error: Option<BtcErr>,
+ id: u64,
+ },
+ Error(String),
}
#[derive(Debug, serde::Deserialize)]
@@ -42,8 +47,10 @@ struct BtcErr {
pub enum Error {
#[error("{0:?}")]
Transport(#[from] std::io::Error),
- #[error("{code:?} - {msg}")]
+ #[error("RPC: {code:?} - {msg}")]
RPC { code: ErrorCode, msg: String },
+ #[error("BTC: {0}")]
+ Bitcoin(String),
#[error("JSON: {0}")]
Json(#[from] serde_json::Error),
#[error("No result or error")]
@@ -82,7 +89,7 @@ impl BtcRpc {
let cookie_path = config.dir.join(".cookie");
let cookie = std::fs::read(cookie_path)?;
// Open connection
- let sock = TcpStream::connect(config.addr)?;
+ let sock = TcpStream::connect_timeout(&config.addr, Duration::from_secs(5))?;
let conn = BufReader::new(sock);
Ok(Self {
@@ -98,7 +105,7 @@ impl BtcRpc {
T: serde::de::DeserializeOwned + Debug,
{
// TODO rethink timeout
- let request = BtcRequest {
+ let request = RpcRequest {
method,
id: self.id,
params,
@@ -138,20 +145,23 @@ impl BtcRpc {
// Read body
let amount = sock.read_until(b'\n', &mut buf)?;
let response: BtcResponse<T> = serde_json::from_slice(&buf[..amount])?;
-
- assert_eq!(self.id, response.id);
- self.id += 1;
-
- if let Some(ok) = response.result {
- Ok(ok)
- } else {
- Err(match response.error {
- Some(err) => Error::RPC {
- code: err.code,
- msg: err.message,
- },
- None => Error::Null,
- })
+ match response {
+ BtcResponse::RpcResponse { result, error, id } => {
+ assert_eq!(self.id, id);
+ self.id += 1;
+ if let Some(ok) = result {
+ Ok(ok)
+ } else {
+ Err(match error {
+ Some(err) => Error::RPC {
+ code: err.code,
+ msg: err.message,
+ },
+ None => Error::Null,
+ })
+ }
+ }
+ BtcResponse::Error(msg) => Err(Error::Bitcoin(msg)),
}
}
diff --git a/script/setup.sh b/script/setup.sh
@@ -28,7 +28,9 @@ for dir in $BTC_DIR $BTC_DIR2 $DB_DIR log; do
done
# Clear logs
-rm -f log/*
+for log in log/*; do
+ echo -n "" > $log
+done
# Setup command helpers
BTC_CLI="bitcoin-cli -datadir=$BTC_DIR"