summaryrefslogtreecommitdiff
path: root/btc-wire/src/main.rs
diff options
context:
space:
mode:
authorAntoine A <>2021-12-28 19:43:39 +0100
committerAntoine A <>2021-12-28 19:43:39 +0100
commit80005a6b383eeaf4593a25173727ba5c03fa2e74 (patch)
tree3f629166073fd747fc294f597b77199375f7c506 /btc-wire/src/main.rs
parent976967b98ec4120382e981f0d029cca30d68c908 (diff)
downloaddepolymerization-80005a6b383eeaf4593a25173727ba5c03fa2e74.tar.gz
depolymerization-80005a6b383eeaf4593a25173727ba5c03fa2e74.tar.bz2
depolymerization-80005a6b383eeaf4593a25173727ba5c03fa2e74.zip
btc-wire: add and test bouncing
Diffstat (limited to 'btc-wire/src/main.rs')
-rw-r--r--btc-wire/src/main.rs191
1 files changed, 133 insertions, 58 deletions
diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs
index e728b7f..29ad7ca 100644
--- a/btc-wire/src/main.rs
+++ b/btc-wire/src/main.rs
@@ -1,7 +1,7 @@
use bitcoin::{hashes::Hash, Address, Amount as BtcAmount, BlockHash, SignedAmount, Txid};
use btc_wire::{
config::BitcoinConfig,
- rpc::{BtcRpc, Category},
+ rpc::{self, BtcRpc, Category, ErrorCode},
rpc_utils::{default_data_dir, sender_address},
segwit::DecodeSegWitErr,
GetOpReturnErr, GetSegwitErr,
@@ -24,7 +24,7 @@ use url::Url;
use crate::{
fail_point::fail_point,
info::{encode_info, Info},
- status::TxStatus,
+ status::{BounceStatus, TxStatus},
};
mod fail_point;
@@ -68,7 +68,7 @@ fn last_hash(db: &mut Client) -> Result<Option<BlockHash>, postgres::Error> {
/// Listen for new proposed transactions and announce them on the bitcoin network
fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, config: &Config) {
// Send a transaction on the blockchain, return true if more transactions with the same status remains
- fn send_tx(
+ fn send(
db: &mut Client,
rpc: &mut BtcRpc,
status: TxStatus,
@@ -93,7 +93,7 @@ fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, config: &Config)
let metadata = encode_info(&info);
fail_point("Skip send_op_return", 0.2)?;
- match rpc.send_op_return(&addr, &amount, &metadata) {
+ match rpc.send_op_return(&addr, &amount, &metadata, false) {
Ok(tx_id) => {
fail_point("Fail update db", 0.2)?;
tx.execute(
@@ -101,7 +101,7 @@ fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, config: &Config)
&[&(TxStatus::Sent as i16), &tx_id.as_ref(), &id],
)?;
let amount = btc_amount_to_taler_amount(&amount.to_signed().unwrap());
- info!("SEND >> {} {} in {}", addr, amount, tx_id);
+ info!("send {} {} in {}", addr, amount, tx_id);
}
Err(e) => {
info!("sender: RPC - {}", e);
@@ -116,6 +116,54 @@ fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, config: &Config)
Ok(row.is_some())
}
+ // Bounce a transaction on the blockchain, return true if more bounce with the same status remains
+ fn bounce(
+ db: &mut Client,
+ rpc: &mut BtcRpc,
+ status: BounceStatus,
+ fee: &BtcAmount,
+ ) -> Result<bool, Box<dyn std::error::Error>> {
+ assert!(status == BounceStatus::Delayed || status == BounceStatus::Requested);
+ let mut tx = db.transaction()?;
+ // We lock the row with FOR UPDATE to prevent sending same transaction multiple time
+ let row = tx.query_opt(
+ "SELECT id, bounced FROM bounce WHERE status=$1 LIMIT 1 FOR UPDATE",
+ &[&(status as i16)],
+ )?;
+ if let Some(row) = &row {
+ let id: i32 = row.get(0);
+ let bounced: Txid = Txid::from_slice(row.get(1))?;
+ let info = Info::Bounce { bounced };
+ let metadata = encode_info(&info);
+
+ fail_point("Skip send_op_return", 0.2)?;
+ match rpc.bounce(&bounced, &fee, &metadata) {
+ Ok(it) => {
+ info!("bounce {} in {}", &bounced, &it);
+ tx.execute(
+ "UPDATE bounce SET txid = $1, status = $2 WHERE id = $3",
+ &[&it.as_ref(), &(BounceStatus::Sent as i16), &id],
+ )?;
+ }
+ Err(err) => match err {
+ rpc::Error::RPC {
+ code: ErrorCode::RpcWalletInsufficientFunds | ErrorCode::RpcWalletError,
+ msg,
+ } => {
+ info!("ignore bounce {} because {}", &bounced, msg);
+ tx.execute(
+ "UPDATE bounce SET status = $1 WHERE id = $2",
+ &[&(BounceStatus::Ignored as i16), &id],
+ )?;
+ }
+ _ => Err(err)?,
+ },
+ }
+ tx.commit()?;
+ }
+ Ok(row.is_some())
+ }
+
// TODO check if transactions are abandoned
let mut failed = false;
@@ -143,29 +191,15 @@ fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, config: &Config)
// As we are now in sync with the blockchain if a transaction is in requested or delayed state it have not been sent
// Send delayed transactions
- while send_tx(db, rpc, TxStatus::Delayed)? {}
+ while send(db, rpc, TxStatus::Delayed)? {}
// Send requested transactions
- while send_tx(db, rpc, TxStatus::Requested)? {}
+ while send(db, rpc, TxStatus::Requested)? {}
- // Check if already bounced
- /*if nb > 0 && false {
- // We do not handle failures, bouncing is done in a best effort manner
- match rpc.bounce(&id, &BtcAmount::from_sat(config.bounce_fee)) {
- Ok(it) => {
- info!("bounce {} in {}", &id, &it);
- db.execute(
- "UPDATE bounce SET txid = $1 WHERE bounced = $2",
- &[&it.as_ref(), &id.as_ref()],
- )?;
- }
- Err(err) => match err {
- BounceErr::AmountLessThanFee => { /* Ignore */ }
- BounceErr::NotAReceiveTransaction | BounceErr::RPC(_) => {
- Err(err)?
- }
- },
- }
- }*/
+ let bounce_fee = BtcAmount::from_sat(config.bounce_fee);
+ // Send delayed bounce
+ while bounce(db, rpc, BounceStatus::Delayed, &bounce_fee)? {}
+ // Send requested bounce
+ while bounce(db, rpc, BounceStatus::Requested, &bounce_fee)? {}
Ok(())
})();
@@ -186,22 +220,22 @@ fn sync_chain(
) -> Result<(), Box<dyn std::error::Error>> {
// Get stored last_hash
let last_hash = last_hash(db)?;
- let confirmation = config.confirmation;
+ let min_confirmations = config.confirmation;
// Get a set of transactions ids to parse
- let (txs, lastblock): (HashMap<Txid, Category>, BlockHash) = {
+ let (txs, lastblock): (HashMap<Txid, (Category, i32)>, BlockHash) = {
// Get all transactions made since this block
- let list = rpc.list_since_block(last_hash.as_ref(), confirmation, true)?;
+ let list = rpc.list_since_block(last_hash.as_ref(), min_confirmations, true)?;
// Only keep ids and category
- let txs: HashMap<Txid, Category> = list
+ let txs = list
.transactions
.into_iter()
- .map(|tx| (tx.txid, tx.category))
+ .map(|tx| (tx.txid, (tx.category, tx.confirmations)))
.collect();
(txs, list.lastblock)
};
- for (id, category) in txs {
+ for (id, (category, confirmations)) in txs {
match category {
Category::Send => {
match rpc.get_tx_op_return(&id) {
@@ -211,14 +245,15 @@ fn sync_chain(
Ok(info) => match info {
Info::Transaction { wtid, url } => {
let row = tx.query_opt(
- "SELECT status, id FROM tx_out WHERE wtid=$1 FOR UPDATE",
+ "SELECT id, status FROM tx_out WHERE wtid=$1 FOR UPDATE",
&[&wtid.as_ref()],
)?;
if let Some(row) = row {
- let status: i16 = row.get(0);
- let _id: i32 = row.get(1);
+ let _id: i32 = row.get(0);
+ let status: i16 = row.get(1);
let status: TxStatus =
TxStatus::try_from(status as u8).unwrap();
+ // TODO match
if status != TxStatus::Sent {
tx.execute(
"UPDATE tx_out SET status=$1 where id=$2",
@@ -235,7 +270,7 @@ fn sync_chain(
full.details[0].address.as_ref().unwrap();
let amount =
btc_amount_to_taler_amount(&full.amount);
- info!("SEND >> {} {} in {}", addr, amount, &id);
+ info!("send {} {} in {}", addr, amount, &id);
}
}
} else {
@@ -249,16 +284,53 @@ fn sync_chain(
OsRng.fill_bytes(&mut request_uid);
let nb = tx.execute(
"INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING",
- &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref(), &config.base_url.as_ref(), &(TxStatus::Sent as i16), &id.as_ref(), &request_uid.as_ref()
- ],
- )?;
+ &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref(), &config.base_url.as_ref(), &(TxStatus::Sent as i16), &id.as_ref(), &request_uid.as_ref()],
+ )?;
if nb > 0 {
- warn!("watcher: found an unregistered outgoing address {} {} in tx {}", crockford_base32_encode(&wtid), &url, id);
+ warn!(
+ "recovered {} {} in tx {}",
+ crockford_base32_encode(&wtid),
+ &url,
+ id
+ );
}
}
}
- Info::Bounce { .. } => {
- // TODO
+ Info::Bounce { bounced } => {
+ let row = tx.query_opt(
+ "SELECT id, status FROM bounce WHERE bounced=$1 FOR UPDATE",
+ &[&bounced.as_ref()],
+ )?;
+ if let Some(row) = row {
+ let _id: i32 = row.get(0);
+ let status: i16 = row.get(1);
+ let status: BounceStatus =
+ BounceStatus::try_from(status as u8).unwrap();
+ assert!(status != BounceStatus::Ignored); // TODO
+ if status != BounceStatus::Sent {
+ tx.execute(
+ "UPDATE bounce SET status=$1 where id=$2",
+ &[&(BounceStatus::Sent as i16), &_id],
+ )?;
+ if status == BounceStatus::Delayed
+ || status == BounceStatus::Requested
+ {
+ warn!(
+ "watcher: bounce {} have been recovered automatically",
+ _id
+ );
+ info!("bounce {} in {}", &bounced, &id);
+ }
+ }
+ } else {
+ let nb = tx.execute(
+ "INSERT INTO bounce (bounced, txid, status) VALUES ($1, $2, $3) ON CONFLICT (txid) DO NOTHING",
+ &[&bounced.as_ref(), &id.as_ref(), &(BounceStatus::Sent as i16)],
+ )?;
+ if nb > 0 {
+ info!("recovered bounce {} in {}", &bounced, &id);
+ }
+ }
}
},
Err(err) => warn!("send: decode-info {} - {}", id, err),
@@ -271,9 +343,9 @@ fn sync_chain(
},
}
}
- Category::Receive => match rpc.get_tx_segwit_key(&id) {
- Ok((full, reserve_pub)) => {
- if full.confirmations >= confirmation as i32 {
+ Category::Receive if confirmations >= min_confirmations as i32 => {
+ match rpc.get_tx_segwit_key(&id) {
+ Ok((full, reserve_pub)) => {
let debit_addr = sender_address(rpc, &full)?;
let credit_addr = full.details[0].address.as_ref().unwrap();
let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time);
@@ -282,22 +354,25 @@ fn sync_chain(
&date, &amount.to_string(), &reserve_pub.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref()
])?;
if nb > 0 {
- info!("{} << {} {} in {}", &debit_addr, &credit_addr, &amount, &id);
+ info!(
+ "receive {} << {} {} in {}",
+ &debit_addr, &credit_addr, &amount, &id
+ );
}
}
+ Err(err) => match err {
+ GetSegwitErr::Decode(
+ DecodeSegWitErr::MissingSegWitAddress | DecodeSegWitErr::NoMagicIdMatch,
+ ) => {
+ // Request a bounce
+ db.execute("INSERT INTO bounce (bounced) VALUES ($1) ON CONFLICT (bounced) DO NOTHING", &[&id.as_ref()])?;
+ }
+ err => warn!("receive: {} {}", id, err),
+ },
}
- Err(err) => match err {
- GetSegwitErr::Decode(
- DecodeSegWitErr::MissingSegWitAddress | DecodeSegWitErr::NoMagicIdMatch,
- ) => {
- // Request a bounce
- db.execute("INSERT INTO bounce (bounced) VALUES ($1) ON CONFLICT (bounced) DO NOTHING", &[&id.as_ref()])?;
- }
- err => warn!("receive: {} {}", id, err),
- },
- },
- Category::Generate | Category::Immature | Category::Orphan => {
- // Ignore coinbase transactions
+ }
+ _ => {
+ // Ignore coinbase and unconfirmed send transactions
}
}
}