depolymerization

wire gateway for Bitcoin/Ethereum
Log | Files | Refs | Submodules | README | LICENSE

commit 656f4092bd42002e33362cb2aba75cadd66c9716
parent e07437ecd30f4e16aad37caa2efbd084ce79a6a7
Author: Antoine A <>
Date:   Wed, 12 Jan 2022 16:14:08 +0100

btc-wire: do not create fake request_uid when recovering onchain transactions and clean code

Diffstat:
Mbtc-wire/src/main.rs | 239++++++++++++++++++++++++++++++++++++++++---------------------------------------
Mdb/btc.sql | 2+-
Mdb/common.sql | 2+-
3 files changed, 123 insertions(+), 120 deletions(-)

diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs @@ -7,7 +7,6 @@ use btc_wire::{ }; use info::decode_info; use postgres::{fallible_iterator::FallibleIterator, Client}; -use rand::{rngs::OsRng, RngCore}; use reconnect::{AutoReconnectRPC, AutoReconnectSql}; use std::{ collections::{HashMap, HashSet}, @@ -367,126 +366,130 @@ fn sync_chain_outgoing( db: &mut Client, config: &Config, ) -> Result<(), Box<dyn std::error::Error>> { - match rpc.get_tx_op_return(&id) { - Ok((full, bytes)) => { - match decode_info(&bytes) { - Ok(info) => { - match info { - Info::Transaction { wtid, .. } => { - let addr = full.details[0].address.as_ref().unwrap(); - let amount = btc_to_taler(&full.amount); - - if confirmations < 0 { - // Handle conflicting tx - let nb_row = db.execute( - "UPDATE tx_out SET status=$1, txid=NULL where txid=$2", - &[&(TxStatus::Delayed as i16), &id.as_ref()], - )?; - if nb_row > 0 { - warn!(">> (conflict) {} in {} to {}", base32(&wtid), id, addr); - } - } else { - let mut tx = db.transaction()?; - let row = tx.query_opt( - "SELECT id, status FROM tx_out WHERE wtid=$1 FOR UPDATE", - &[&wtid.as_ref()], + match rpc + .get_tx_op_return(&id) + .map(|(full, bytes)| (full, decode_info(&bytes))) + { + Ok((full, Ok(info))) => match info { + Info::Transaction { wtid, .. } => { + let credit_addr = full.details[0].address.as_ref().unwrap(); + let amount = btc_to_taler(&full.amount); + + if confirmations < 0 { + // Handle conflicting tx + let nb_row = db.execute( + "UPDATE tx_out SET status=$1, txid=NULL where txid=$2", + &[&(TxStatus::Delayed as i16), &id.as_ref()], + )?; + if nb_row > 0 { + warn!( + ">> (conflict) {} in {} to {}", + base32(&wtid), + id, + credit_addr + ); + } + } else { + // Get previous out tx + let mut tx = db.transaction()?; + let row = tx.query_opt( + "SELECT id, status FROM tx_out WHERE wtid=$1 FOR UPDATE", + &[&wtid.as_ref()], + )?; + if let Some(row) = row { + // If already in database sync status + let _id: i32 = row.get(0); + let status: i16 = row.get(1); + match TxStatus::try_from(status as u8).unwrap() { + TxStatus::Requested | TxStatus::Delayed => { + tx.execute( + "UPDATE tx_out SET status=$1 where id=$2", + &[&(TxStatus::Sent as i16), &_id], )?; - if let Some(row) = row { - let _id: i32 = row.get(0); - let status: i16 = row.get(1); - match TxStatus::try_from(status as u8).unwrap() { - TxStatus::Requested | TxStatus::Delayed => { - tx.execute( - "UPDATE tx_out SET status=$1 where id=$2", - &[&(TxStatus::Sent as i16), &_id], - )?; - tx.commit()?; - warn!( - ">> (recovered) {} {} in {} to {}", - amount, - base32(&wtid), - id, - addr - ); - } - TxStatus::Sent => {} - } - } else { - let debit_addr = sender_address(rpc, &full)?; - let date = - SystemTime::UNIX_EPOCH + Duration::from_secs(full.time); - // Generate a random request_uid - let mut request_uid = [0; 64]; - OsRng.fill_bytes(&mut request_uid); - let nb = tx.execute( - "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING", - &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(addr).as_ref(), &config.base_url.as_ref(), &(TxStatus::Sent as i16), &id.as_ref(), &request_uid.as_ref()], - )?; - tx.commit()?; - if nb > 0 { - warn!( - ">> (onchain) {} {} in {} to {}", - amount, - base32(&wtid), - id, - addr - ); - } - } + tx.commit()?; + warn!( + ">> (recovered) {} {} in {} to {}", + amount, + base32(&wtid), + id, + credit_addr + ); } + TxStatus::Sent => { /* Status is correct */ } } - Info::Bounce { bounced } => { - if confirmations < 0 { - // Handle conflicting tx - let nb_row = db.execute( - "UPDATE bounce SET status=$1, txid=NULL where txid=$2", - &[&(BounceStatus::Delayed as i16), &id.as_ref()], - )?; - if nb_row > 0 { - warn!("|| (conflict) {} in {}", &bounced, &id); - } - } else { - let mut tx = db.transaction()?; - let row = tx.query_opt( - "SELECT id, status FROM bounce WHERE bounced=$1 FOR UPDATE", - &[&bounced.as_ref()], + } else { + // Else add to database + let debit_addr = sender_address(rpc, &full)?; + let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time); + let nb = tx.execute( + "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING", + &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref(), &config.base_url.as_ref(), &(TxStatus::Sent as i16), &id.as_ref(), &None::<&[u8]>], + )?; + tx.commit()?; + if nb > 0 { + warn!( + ">> (onchain) {} {} in {} to {}", + amount, + base32(&wtid), + id, + credit_addr + ); + } + } + } + } + Info::Bounce { bounced } => { + if confirmations < 0 { + // Handle conflicting tx + let nb_row = db.execute( + "UPDATE bounce SET status=$1, txid=NULL where txid=$2", + &[&(BounceStatus::Delayed as i16), &id.as_ref()], + )?; + if nb_row > 0 { + warn!("|| (conflict) {} in {}", &bounced, &id); + } + } else { + // Get previous bounce + let mut tx = db.transaction()?; + let row = tx.query_opt( + "SELECT id, status FROM bounce WHERE bounced=$1 FOR UPDATE", + &[&bounced.as_ref()], + )?; + if let Some(row) = row { + // If already in database sync status + let _id: i32 = row.get(0); + let status: i16 = row.get(1); + match BounceStatus::try_from(status as u8).unwrap() { + BounceStatus::Requested | BounceStatus::Delayed => { + tx.execute( + "UPDATE bounce SET status=$1 where id=$2", + &[&(BounceStatus::Sent as i16), &_id], )?; - if let Some(row) = row { - let _id: i32 = row.get(0); - let status: i16 = row.get(1); - match BounceStatus::try_from(status as u8).unwrap() { - BounceStatus::Requested | BounceStatus::Delayed => { - tx.execute( - "UPDATE bounce SET status=$1 where id=$2", - &[&(BounceStatus::Sent as i16), &_id], - )?; - tx.commit()?; - warn!("|| (recovered) {} in {}", &bounced, &id); - } - BounceStatus::Ignored => error!( - "watcher: ignored bounce {} found in chain at {}", - bounced, id - ), - BounceStatus::Sent => {} - } - } else { - let nb = tx.execute( - "INSERT INTO bounce (bounced, txid, status) VALUES ($1, $2, $3) ON CONFLICT (txid) DO NOTHING", - &[&bounced.as_ref(), &id.as_ref(), &(BounceStatus::Sent as i16)], - )?; - tx.commit()?; - if nb > 0 { - warn!("|| (onchain) {} in {}", &bounced, &id); - } - } + tx.commit()?; + warn!("|| (recovered) {} in {}", &bounced, &id); } + BounceStatus::Ignored => error!( + "watcher: ignored bounce {} found in chain at {}", + bounced, id + ), + BounceStatus::Sent => { /* Status is correct */ } + } + } else { + // Else add to database + let nb = tx.execute( + "INSERT INTO bounce (bounced, txid, status) VALUES ($1, $2, $3) ON CONFLICT (txid) DO NOTHING", + &[&bounced.as_ref(), &id.as_ref(), &(BounceStatus::Sent as i16)], + )?; + tx.commit()?; + if nb > 0 { + warn!("|| (onchain) {} in {}", &bounced, &id); } } } - Err(err) => warn!("send: decode-info {} - {}", id, err), } - } - Err(err) => match err { + }, + Ok((_, Err(e))) => warn!("send: decode-info {} - {}", id, e), + Err(e) => match e { GetOpReturnErr::MissingOpReturn => { /* Ignore */ } GetOpReturnErr::RPC(e) => return Err(e)?, }, @@ -508,8 +511,8 @@ fn sync_chain_incoming_confirmed( let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time); let amount = btc_to_taler(&full.amount); let nb = db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (reserve_pub) DO NOTHING ", &[ - &date, &amount.to_string(), &reserve_pub.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref() - ])?; + &date, &amount.to_string(), &reserve_pub.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref() + ])?; if nb > 0 { info!( "<< {} {} in {} from {}", @@ -522,7 +525,7 @@ fn sync_chain_incoming_confirmed( } Err(err) => match err { GetSegwitErr::Decode(_) => { - // Request a bounce if encoding is wrong + // If encoding is wrong request a bounce db.execute( "INSERT INTO bounce (bounced) VALUES ($1) ON CONFLICT (bounced) DO NOTHING", &[&id.as_ref()], @@ -537,10 +540,10 @@ fn sync_chain_incoming_confirmed( /// Wait for new block and notify arrival with postgreSQL notifications fn block_listener(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql) { loop { + let rpc = rpc.client(); + let db = db.client(); let result: Result<(), Box<dyn std::error::Error>> = (|| { - let rpc = rpc.client(); - let db = db.client(); - rpc.wait_for_new_block(0).ok(); + rpc.wait_for_new_block(0)?; db.execute("NOTIFY new_block", &[])?; Ok(()) })(); diff --git a/db/btc.sql b/db/btc.sql @@ -27,7 +27,7 @@ CREATE TABLE tx_out ( exchange_url TEXT NOT NULL, status SMALLINT NOT NULL DEFAULT 0, txid BYTEA UNIQUE, - request_uid BYTEA NOT NULL UNIQUE + request_uid BYTEA UNIQUE ); -- Bounced transaction diff --git a/db/common.sql b/db/common.sql @@ -17,5 +17,5 @@ CREATE TABLE tx_out ( debit_acc TEXT NOT NULL, credit_acc TEXT NOT NULL, exchange_url TEXT NOT NULL, - request_uid BYTEA NOT NULL UNIQUE + request_uid BYTEA UNIQUE ); \ No newline at end of file