depolymerization

wire gateway for Bitcoin/Ethereum
Log | Files | Refs | Submodules | README | LICENSE

commit 773b374e47a7baadb729e68203e548a54b33ce4c
parent b26af1f75a683f3a2d50aaf0de36b58a71ae7754
Author: Antoine A <>
Date:   Fri, 21 Jan 2022 13:52:42 +0100

Handle stuck transactions

Diffstat:
MCargo.lock | 64+++++++++++++++++++++++++++++++++++++---------------------------
Mbtc-wire/src/bin/test.rs | 5+++--
Mbtc-wire/src/lib.rs | 5+++--
Mbtc-wire/src/loops/worker.rs | 289++++++++++++++++++++++++++++++++++++++++++++++---------------------------------
Mbtc-wire/src/rpc.rs | 29++++++++++++++++++++++++-----
Mmakefile | 1+
Mtaler-common/src/config.rs | 8++++++--
Atest/btc/bumpfee.sh | 95+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mtest/btc/fail.sh | 4++--
Mtest/btc/stress.sh | 20++++++++++----------
Mtest/btc/wire.sh | 14+++++++-------
Mtest/common.sh | 9++-------
Atest/conf/taler_bump.conf | 14++++++++++++++
Mwire-gateway/src/main.rs | 7+++----
14 files changed, 375 insertions(+), 189 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -784,9 +784,9 @@ checksum = "1aab8fc367588b89dcee83ab0fd66b72b50b72fa1904d7095045ace2b0c81c35" [[package]] name = "js-sys" -version = "0.3.55" +version = "0.3.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7cc9ffccd38c451a86bf13657df244e9c3f37493cce8e5e21e940963777acc84" +checksum = "a38fc24e30fd564ce974c02bf1d337caddff65be6cc4735a1f7eab22a7440f04" dependencies = [ "wasm-bindgen", ] @@ -799,9 +799,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.112" +version = "0.2.113" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b03d17f364a3a042d5e5d46b053bbbf82c92c9430c592dd4c064dc6ee997125" +checksum = "eef78b64d87775463c549fbd80e19249ef436ea3bf1de2a1eb7e717ec7fab1e9" [[package]] name = "listenfd" @@ -932,6 +932,15 @@ dependencies = [ ] [[package]] +name = "num_threads" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71a1eb3a36534514077c1e079ada2fb170ef30c47d203aa6916138cf882ecd52" +dependencies = [ + "libc", +] + +[[package]] name = "oorandom" version = "11.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1315,9 +1324,9 @@ checksum = "568a8e6258aa33c13358f81fd834adb854c6f7c9468520910a9b1e8fac068012" [[package]] name = "serde" -version = "1.0.133" +version = "1.0.134" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97565067517b60e2d1ea8b268e59ce036de907ac523ad83a0475da04e818989a" +checksum = "96b3c34c1690edf8174f5b289a336ab03f568a4460d8c6df75f2f3a692b3bc6a" dependencies = [ "serde_derive", ] @@ -1334,9 +1343,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.133" +version = "1.0.134" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed201699328568d8d08208fdd080e3ff594e6c422e438b6705905da01005d537" +checksum = "784ed1fbfa13fe191077537b0d70ec8ad1e903cfe04831da608aa36457cb653d" dependencies = [ "proc-macro2", "quote", @@ -1431,9 +1440,9 @@ checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83" [[package]] name = "socket2" -version = "0.4.2" +version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5dc90fe6c7be1a323296982db1836d1ea9e47b6839496dde9a541bc496df3516" +checksum = "0f82496b90c36d70af5fcd482edaa2e0bd16fade569de1330405fecbbdac736b" dependencies = [ "libc", "winapi", @@ -1463,9 +1472,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601" [[package]] name = "syn" -version = "1.0.85" +version = "1.0.86" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a684ac3dcd8913827e18cd09a68384ee66c1de24157e3c556c9ab16d85695fb7" +checksum = "8a65b3f4ffa0092e9887669db0eae07941f023991ab58ea44da8fe8e2d511c6b" dependencies = [ "proc-macro2", "quote", @@ -1519,12 +1528,13 @@ dependencies = [ [[package]] name = "time" -version = "0.3.5" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41effe7cfa8af36f439fac33861b66b049edc6f9a32331e2312660529c1c24ad" +checksum = "c8d54b9298e05179c335de2b9645d061255bcd5155f843b3e328d2cfe0a5b413" dependencies = [ - "itoa 0.4.8", + "itoa 1.0.1", "libc", + "num_threads", "time-macros", ] @@ -1762,9 +1772,9 @@ checksum = "fd6fbd9a79829dd1ad0cc20627bf1ed606756a7f77edff7b66b7064f9cb327c6" [[package]] name = "wasm-bindgen" -version = "0.2.78" +version = "0.2.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "632f73e236b219150ea279196e54e610f5dbafa5d61786303d4da54f84e47fce" +checksum = "25f1af7423d8588a3d840681122e72e6a24ddbcb3f0ec385cac0d12d24256c06" dependencies = [ "cfg-if", "wasm-bindgen-macro", @@ -1772,9 +1782,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-backend" -version = "0.2.78" +version = "0.2.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a317bf8f9fba2476b4b2c85ef4c4af8ff39c3c7f0cdfeed4f82c34a880aa837b" +checksum = "8b21c0df030f5a177f3cba22e9bc4322695ec43e7257d865302900290bcdedca" dependencies = [ "bumpalo", "lazy_static", @@ -1787,9 +1797,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.78" +version = "0.2.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d56146e7c495528bf6587663bea13a8eb588d39b36b679d83972e1a2dbbdacf9" +checksum = "2f4203d69e40a52ee523b2529a773d5ffc1dc0071801c87b3d270b471b80ed01" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -1797,9 +1807,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.78" +version = "0.2.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7803e0eea25835f8abdc585cd3021b3deb11543c6fe226dcd30b228857c5c5ab" +checksum = "bfa8a30d46208db204854cadbb5d4baf5fcf8071ba5bf48190c3e59937962ebc" dependencies = [ "proc-macro2", "quote", @@ -1810,15 +1820,15 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.78" +version = "0.2.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0237232789cf037d5480773fe568aac745bfe2afbc11a863e97901780a6b47cc" +checksum = "3d958d035c4438e28c70e4321a2911302f10135ce78a9c7834c0cab4123d06a2" [[package]] name = "web-sys" -version = "0.3.55" +version = "0.3.56" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38eb105f1c59d9eaa6b5cdc92b859d85b926e82cb2e0945cd0c9259faa6fe9fb" +checksum = "c060b319f29dd25724f09a2ba1418f142f539b2be99fbf4d2d5a8f7330afb8eb" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/btc-wire/src/bin/test.rs b/btc-wire/src/bin/test.rs @@ -154,7 +154,7 @@ pub fn main() { // Send metadata let msg = "J'aime le chocolat".as_bytes(); let id = client_rpc - .send_op_return(&wire_addr, &test_amount, msg, false) + .send_op_return(&wire_addr, &test_amount, msg, false, false) .unwrap(); // Check in mempool assert!( @@ -295,6 +295,7 @@ pub fn main() { addresses.iter().map(|addr| (addr, &test_amount)), None, false, + false, ) .unwrap() }) @@ -303,7 +304,7 @@ pub fn main() { let before = client_rpc.get_balance().unwrap(); // Send a transaction with multiple input from multiple transaction of different outputs len let send_id = client_rpc - .send_custom(&txs, [(&wire_addr, &(test_amount * 3))], None, false) + .send_custom(&txs, [(&wire_addr, &(test_amount * 3))], None, false, false) .unwrap(); wait_for_tx(&mut client_rpc, &mut reserve_rpc, &[send_id]); let bounce_id = wire_rpc.bounce(&send_id, &bounce_fee, &[]).unwrap(); diff --git a/btc-wire/src/lib.rs b/btc-wire/src/lib.rs @@ -83,10 +83,11 @@ impl BtcRpc { amount: &Amount, metadata: &[u8], subtract_fee: bool, + replaceable: bool ) -> rpc::Result<Txid> { assert!(metadata.len() > 0, "No medatata"); assert!(metadata.len() <= 80, "Max 80 bytes"); - self.send_custom(&[], [(to, amount)], Some(metadata), subtract_fee) + self.send_custom(&[], [(to, amount)], Some(metadata), subtract_fee, replaceable) } /// Get detailed information about an in-wallet transaction and its op_return metadata @@ -132,7 +133,7 @@ impl BtcRpc { let id = if metadata.is_empty() { self.send(&sender, &bounce_amount, true)? } else { - self.send_op_return(&sender, &bounce_amount, metadata, true)? + self.send_op_return(&sender, &bounce_amount, metadata, true, false)? }; Ok(id) } diff --git a/btc-wire/src/loops/worker.rs b/btc-wire/src/loops/worker.rs @@ -36,107 +36,6 @@ pub fn worker( config: &Config, state: &WireState, ) { - /// Send a transaction on the blockchain, return true if more transactions with the same status remains - fn send( - db: &mut Client, - rpc: &mut BtcRpc, - status: TxStatus, - ) -> Result<bool, Box<dyn std::error::Error>> { - assert!(status == TxStatus::Delayed || status == TxStatus::Requested); - let mut tx = db.transaction()?; - // We lock the row with FOR UPDATE to prevent sending same transaction multiple time - let row = tx.query_opt( - "SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 LIMIT 1 FOR UPDATE", - &[&(status as i16)], - )?; - if let Some(row) = &row { - let id: i32 = row.get(0); - let amount = taler_to_btc(&Amount::from_str(row.get(1))?)?; - let wtid: &[u8] = row.get(2); - let addr: Address = btc_payto_addr(&Url::parse(row.get(3))?)?; - let exchange_base_url: Url = Url::parse(row.get(4))?; - let info = Info::Transaction { - wtid: wtid.try_into()?, - url: exchange_base_url, - }; - let metadata = encode_info(&info); - - fail_point("(injected) fail send_op_return", 0.2)?; - match rpc.send_op_return(&addr, &amount, &metadata, false) { - Ok(tx_id) => { - fail_point("(injected) fail update db", 0.2)?; - tx.execute( - "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3", - &[&(TxStatus::Sent as i16), &tx_id.as_ref(), &id], - )?; - tx.commit()?; - let amount = btc_to_taler(&amount.to_signed().unwrap()); - info!(">> {} {} in {} to {}", amount, base32(wtid), tx_id, addr); - } - Err(e) => { - tx.execute( - "UPDATE tx_out SET status=$1 WHERE id=$2", - &[&(TxStatus::Delayed as i16), &id], - )?; - tx.commit()?; - info!("sender: RPC - {}", e); - } - } - } - Ok(row.is_some()) - } - - /// Bounce a transaction on the blockchain, return true if more bounce with the same status remains - fn bounce( - db: &mut Client, - rpc: &mut BtcRpc, - status: BounceStatus, - fee: &BtcAmount, - ) -> Result<bool, Box<dyn std::error::Error>> { - assert!(status == BounceStatus::Delayed || status == BounceStatus::Requested); - let mut tx = db.transaction()?; - // We lock the row with FOR UPDATE to prevent sending same transaction multiple time - let row = tx.query_opt( - "SELECT id, bounced FROM bounce WHERE status=$1 LIMIT 1 FOR UPDATE", - &[&(status as i16)], - )?; - if let Some(row) = &row { - let id: i32 = row.get(0); - let bounced: Txid = Txid::from_slice(row.get(1))?; - let info = Info::Bounce { bounced }; - let metadata = encode_info(&info); - - fail_point("(injected) fail bounce", 0.2)?; - match rpc.bounce(&bounced, fee, &metadata) { - Ok(it) => { - tx.execute( - "UPDATE bounce SET txid = $1, status = $2 WHERE id = $3", - &[&it.as_ref(), &(BounceStatus::Sent as i16), &id], - )?; - tx.commit()?; - info!("|| {} in {}", &bounced, &it); - } - Err(err) => match err { - rpc::Error::RPC { - code: ErrorCode::RpcWalletInsufficientFunds | ErrorCode::RpcWalletError, - msg, - } => { - tx.execute( - "UPDATE bounce SET status = $1 WHERE id = $2", - &[&(BounceStatus::Ignored as i16), &id], - )?; - tx.commit()?; - info!("|| (ignore) {} because {}", &bounced, msg); - } - _ => Err(err)?, - }, - } - } - Ok(row.is_some()) - } - - // TODO check if transactions are abandoned - let mut lifetime = config.btc_lifetime; let mut status = true; @@ -199,6 +98,111 @@ pub fn worker( } } +/// Send a transaction on the blockchain, return true if more transactions with the same status remains +fn send( + db: &mut Client, + rpc: &mut BtcRpc, + status: TxStatus, +) -> Result<bool, Box<dyn std::error::Error>> { + assert!(status == TxStatus::Delayed || status == TxStatus::Requested); + let mut tx = db.transaction()?; + // We lock the row with FOR UPDATE to prevent sending same transaction multiple time + let row = tx.query_opt( + "SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 LIMIT 1 FOR UPDATE", + &[&(status as i16)], + )?; + if let Some(row) = &row { + let id: i32 = row.get(0); + let amount = taler_to_btc(&Amount::from_str(row.get(1))?)?; + let wtid: &[u8] = row.get(2); + let addr: Address = btc_payto_addr(&Url::parse(row.get(3))?)?; + let exchange_base_url: Url = Url::parse(row.get(4))?; + let info = Info::Transaction { + wtid: wtid.try_into()?, + url: exchange_base_url, + }; + let metadata = encode_info(&info); + + match rpc.send_op_return(&addr, &amount, &metadata, false, true) { + Ok(tx_id) => { + fail_point("(injected) fail send", 0.3)?; + tx.execute( + "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3", + &[&(TxStatus::Sent as i16), &tx_id.as_ref(), &id], + )?; + tx.commit()?; + let amount = btc_to_taler(&amount.to_signed().unwrap()); + info!(">> {} {} in {} to {}", amount, base32(wtid), tx_id, addr); + } + Err(e) => { + tx.execute( + "UPDATE tx_out SET status=$1 WHERE id=$2", + &[&(TxStatus::Delayed as i16), &id], + )?; + tx.commit()?; + Err(e)?; + } + } + } + Ok(row.is_some()) +} + +/// Bounce a transaction on the blockchain, return true if more bounce with the same status remains +fn bounce( + db: &mut Client, + rpc: &mut BtcRpc, + status: BounceStatus, + fee: &BtcAmount, +) -> Result<bool, Box<dyn std::error::Error>> { + assert!(status == BounceStatus::Delayed || status == BounceStatus::Requested); + let mut tx = db.transaction()?; + // We lock the row with FOR UPDATE to prevent sending same transaction multiple time + let row = tx.query_opt( + "SELECT id, bounced FROM bounce WHERE status=$1 LIMIT 1 FOR UPDATE", + &[&(status as i16)], + )?; + if let Some(row) = &row { + let id: i32 = row.get(0); + let bounced: Txid = Txid::from_slice(row.get(1))?; + let info = Info::Bounce { bounced }; + let metadata = encode_info(&info); + + match rpc.bounce(&bounced, fee, &metadata) { + Ok(it) => { + fail_point("(injected) fail bounce", 0.3)?; + tx.execute( + "UPDATE bounce SET txid=$1, status=$2 WHERE id=$3", + &[&it.as_ref(), &(BounceStatus::Sent as i16), &id], + )?; + tx.commit()?; + info!("|| {} in {}", &bounced, &it); + } + Err(err) => match err { + rpc::Error::RPC { + code: ErrorCode::RpcWalletInsufficientFunds | ErrorCode::RpcWalletError, + msg, + } => { + tx.execute( + "UPDATE bounce SET status=$1 WHERE id=$2", + &[&(BounceStatus::Ignored as i16), &id], + )?; + tx.commit()?; + info!("|| (ignore) {} because {}", &bounced, msg); + } + e => { + tx.execute( + "UPDATE bounce SET status=$1 WHERE id=$2", + &[&(BounceStatus::Delayed as i16), &id], + )?; + tx.commit()?; + Err(e)?; + } + }, + } + } + Ok(row.is_some()) +} + /// Retrieve last stored hash fn last_hash(db: &mut Client) -> Result<Option<BlockHash>, postgres::Error> { Ok(db @@ -381,34 +385,36 @@ fn sync_chain_outgoing( let amount = btc_to_taler(&full.amount); if confirmations < 0 { - // Handle conflicting tx - let nb_row = db.execute( - "UPDATE tx_out SET status=$1, txid=NULL where txid=$2", - &[&(TxStatus::Delayed as i16), &id.as_ref()], - )?; - if nb_row > 0 { - warn!( - ">> (conflict) {} in {} to {}", - base32(&wtid), - id, - credit_addr - ); + if full.replaced_by_txid.is_none() { + // Handle conflicting tx + let nb_row = db.execute( + "UPDATE tx_out SET status=$1, txid=NULL where txid=$2", + &[&(TxStatus::Delayed as i16), &id.as_ref()], + )?; + if nb_row > 0 { + warn!( + ">> (conflict) {} in {} to {}", + base32(&wtid), + id, + credit_addr + ); + } } } else { // Get previous out tx let row = db.query_opt( - "SELECT id, status FROM tx_out WHERE wtid=$1 FOR UPDATE", + "SELECT id, status, txid FROM tx_out WHERE wtid=$1 FOR UPDATE", &[&wtid.as_ref()], )?; if let Some(row) = row { // If already in database sync status - let _id: i32 = row.get(0); + let row_id: i32 = row.get(0); let status: i16 = row.get(1); match TxStatus::try_from(status as u8).unwrap() { TxStatus::Requested | TxStatus::Delayed => { let nb_row = db.execute( - "UPDATE tx_out SET status=$1 WHERE id=$2 AND status=$3", - &[&(TxStatus::Sent as i16), &_id, &status], + "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3 AND status=$4", + &[&(TxStatus::Sent as i16), &id.as_ref(), &row_id, &status], )?; if nb_row > 0 { warn!( @@ -420,7 +426,25 @@ fn sync_chain_outgoing( ); } } - TxStatus::Sent => { /* Status is correct */ } + TxStatus::Sent => { + if let Some(txid) = full.replaces_txid { + let stored_id: Txid = Txid::from_slice(row.get(2)).unwrap(); + if txid == stored_id { + let nb_row = db.execute( + "UPDATE tx_out SET txid=$1 WHERE txid=$2", + &[&id.as_ref(), &txid.as_ref()], + )?; + if nb_row > 0 { + info!( + ">> (recovered) {} replace {} with {}", + base32(&wtid), + txid, + id + ); + } + } + } + } } } else { // Else add to database @@ -440,6 +464,29 @@ fn sync_chain_outgoing( ); } } + + if let Some(delay) = config.bump_delay { + if confirmations == 0 && full.replaced_by_txid.is_none() { + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(); + if now - full.time > delay as u64 { + let bump = rpc.bump_fee(&id)?; + fail_point("(injected) fail bump", 0.3)?; + db.execute( + "UPDATE tx_out SET txid=$1 WHERE txid=$2", + &[&bump.txid.as_ref(), &id.as_ref()], + )?; + info!( + ">> (bump) {} replace {} with {}", + base32(&wtid), + id, + bump.txid + ); + } + } + } } } Info::Bounce { bounced } => { @@ -460,13 +507,13 @@ fn sync_chain_outgoing( )?; if let Some(row) = row { // If already in database sync status - let _id: i32 = row.get(0); + let row_id: i32 = row.get(0); let status: i16 = row.get(1); match BounceStatus::try_from(status as u8).unwrap() { BounceStatus::Requested | BounceStatus::Delayed => { let nb_row = db.execute( - "UPDATE bounce SET status=$1 WHERE id=$2 AND status=$3", - &[&(BounceStatus::Sent as i16), &_id, &status], + "UPDATE bounce SET status=$1, txid=$2 WHERE id=$3 AND status=$4", + &[&(BounceStatus::Sent as i16), &id.as_ref(), &row_id, &status], )?; if nb_row > 0 { warn!("|| (recovered) {} in {}", &bounced, &id); diff --git a/btc-wire/src/rpc.rs b/btc-wire/src/rpc.rs @@ -69,6 +69,7 @@ pub struct BtcRpc { id: u64, cookie: String, conn: BufReader<TcpStream>, + buf: Vec<u8>, } impl BtcRpc { @@ -104,6 +105,7 @@ impl BtcRpc { id: 0, cookie: format!("Basic {}", base64::encode(token)), conn, + buf: Vec::new(), }) } @@ -120,7 +122,8 @@ impl BtcRpc { // Serialize the body first so we can set the Content-Length header. let body = serde_json::to_vec(&request)?; - let mut buf = Vec::new(); + let buf = &mut self.buf; + buf.clear(); // Write HTTP request { let sock = self.conn.get_mut(); @@ -142,7 +145,7 @@ impl BtcRpc { // Skip response let sock = &mut self.conn; loop { - let amount = sock.read_until(b'\n', &mut buf)?; + let amount = sock.read_until(b'\n', buf)?; let sep = buf[..amount] == [b'\r', b'\n']; buf.clear(); if sep { @@ -150,7 +153,7 @@ impl BtcRpc { } } // Read body - let amount = sock.read_until(b'\n', &mut buf)?; + let amount = sock.read_until(b'\n', buf)?; let response: BtcResponse<T> = serde_json::from_slice(&buf[..amount])?; match response { BtcResponse::RpcResponse { result, error, id } => { @@ -230,6 +233,7 @@ impl BtcRpc { outputs: impl IntoIterator<Item = (&'b Address, &'c Amount)>, data: Option<&[u8]>, subtract_fee: bool, + replaceable: bool, ) -> Result<Txid> { let mut outputs: Vec<Value> = outputs .into_iter() @@ -238,7 +242,7 @@ impl BtcRpc { let len = outputs.len(); let hex: String = self.call( "createrawtransaction", - &[ + &( Value::Array( inputs .into_iter() @@ -252,7 +256,9 @@ impl BtcRpc { } outputs }), - ], + (), + true, + ), )?; let funded: HexWrapper = self.call( "fundrawtransaction", @@ -264,6 +270,7 @@ impl BtcRpc { } else { vec![] }, + replaceable, }, ), )?; @@ -271,6 +278,10 @@ impl BtcRpc { self.call("sendrawtransaction", &[&signed.hex]) } + pub fn bump_fee(&mut self, id: &Txid) -> Result<Bump> { + self.call("bumpfee", &[id]) + } + pub fn list_since_block( &mut self, hash: Option<&BlockHash>, @@ -304,6 +315,7 @@ impl BtcRpc { #[serde(rename_all = "camelCase")] pub struct FundOption { pub subtract_fee_from_outputs: Vec<usize>, + pub replaceable: bool, } #[derive(Debug, serde::Deserialize)] @@ -344,6 +356,11 @@ pub struct Vin { pub vout: Option<u32>, } +#[derive(Debug, serde::Deserialize)] +pub struct Bump { + pub txid: Txid, +} + /// Enum to represent the category of a transaction. #[derive(Copy, PartialEq, Eq, Clone, Debug, serde::Deserialize)] #[serde(rename_all = "lowercase")] @@ -397,6 +414,8 @@ pub struct TransactionFull { pub amount: SignedAmount, #[serde(default, with = "bitcoin::util::amount::serde::as_btc::opt")] pub fee: Option<SignedAmount>, + pub replaces_txid: Option<Txid>, + pub replaced_by_txid: Option<Txid>, pub details: Vec<TransactionDetail>, pub decoded: RawTransaction, } diff --git a/makefile b/makefile @@ -15,6 +15,7 @@ test_btc: test/btc/reorg.sh test/btc/hell.sh test/btc/analysis.sh + test/btc/bumpfee.sh test/btc/config.sh test: install test_gateway test_btc \ No newline at end of file diff --git a/taler-common/src/config.rs b/taler-common/src/config.rs @@ -34,8 +34,9 @@ pub struct Config { pub payto: Url, pub confirmation: u16, pub bounce_fee: u64, - pub btc_lifetime: Option<u64>, - pub http_lifetime: Option<u64>, + pub btc_lifetime: Option<u32>, + pub http_lifetime: Option<u32>, + pub bump_delay: Option<u32>, } impl Config { @@ -60,6 +61,9 @@ impl Config { http_lifetime: nb(self_conf, "HTTP_LIFETIME") .and_then(|nb| (nb != 0).then(|| Some(nb))) .unwrap_or(None), + bump_delay: nb(self_conf, "BUMP_DELAY") + .and_then(|nb| (nb != 0).then(|| Some(nb))) + .unwrap_or(None), } } } diff --git a/test/btc/bumpfee.sh b/test/btc/bumpfee.sh @@ -0,0 +1,94 @@ +#!/bin/bash + +## Test btc_wire ability to handle stuck transaction correctly + +set -eu + +source "${BASH_SOURCE%/*}/../common.sh" +SCHEMA=btc.sql +CONFIG=taler_bump.conf +RUST_BACKTRACE=full + +echo "----- Setup -----" +echo "Load config file" +load_config +echo "Start database" +setup_db +echo "Start bitcoin node" +init_btc +echo "Start second bitcoin node" +init_btc2 +echo "Start btc-wire" +btc_wire +echo "Start gateway" +gateway +echo "" + +SEQ="seq 10 30" + + +echo -n "Making wire transfer to exchange:" +for n in `$SEQ`; do + btc-wire-utils -d $BTC_DIR transfer 0.$n > /dev/null + mine_btc # Mine transactions +done +next_btc # Trigger btc_wire +check_balance 5.79983389 4.20000000 +echo " OK" + +echo "----- Bump fee -----" + +echo -n "Making wire transfer from exchange:" +taler-exchange-wire-gateway-client \ + -b $BANK_ENDPOINT \ + -C payto://bitcoin/$CLIENT \ + -a BTC:0.004 > /dev/null +sleep 1 +check_balance 5.79983389 4.19599801 +echo " OK" + +echo -n "Abandon pending transaction:" +restart_btc -minrelaytxfee=0.0001 +echo " OK" + +echo -n "Check bump:" +sleep 6 +mine_btc +check_balance 5.80383389 4.19598010 +echo " OK" + +echo "----- Bump fail -----" + +echo -n "Replace btc_wire with failing btc_wire" +kill $WIRE_PID +fail_btc_wire +echo " OK" + +echo -n "Making wire transfer from exchange:" +for n in `$SEQ`; do + taler-exchange-wire-gateway-client \ + -b $BANK_ENDPOINT \ + -C payto://bitcoin/$CLIENT \ + -a BTC:0.00$n > /dev/null +done +sleep 5 +check_balance 5.80383389 4.15356220 +echo " OK" + +echo -n "Abandon pending transaction:" +restart_btc -minrelaytxfee=0.0002 +echo " OK" + +echo -n "Check bump:" +sleep 2 +mine_btc +sleep 2 +mine_btc +sleep 2 +mine_btc +sleep 2 +mine_btc +check_balance 5.84583389 4.15314430 +echo " OK" + +echo "All tests passed!" +\ No newline at end of file diff --git a/test/btc/fail.sh b/test/btc/fail.sh @@ -49,7 +49,7 @@ for n in `$SEQ`; do -C payto://bitcoin/$CLIENT \ -a BTC:0.0000$n > /dev/null done -sleep 20 +sleep 10 mine_btc # Mine transactions echo " OK" @@ -71,7 +71,7 @@ for n in `$SEQ`; do mine_btc done next_btc -sleep 20 +sleep 10 echo " OK" echo -n "Check balance:" diff --git a/test/btc/stress.sh b/test/btc/stress.sh @@ -20,7 +20,7 @@ echo "Start gateway" gateway echo "" -SEQ="seq 10 99" +SEQ="seq 10 30" echo "----- Handle incoming -----" @@ -29,9 +29,7 @@ for n in `$SEQ`; do btc-wire-utils -d $BTC_DIR transfer 0.000$n > /dev/null mine_btc # Mine transactions done -sleep 3 # Give time for btc_wire worker to process next_btc # Confirm all transactions -sleep 3 # Give time for btc_wire worker to process echo " OK" echo -n "Requesting exchange incoming transaction list:" @@ -39,7 +37,7 @@ check_delta "incoming?delta=-100" "$SEQ" "0.000" echo " OK" echo -n "Check balance:" -check_balance 9.95023810 0.04905000 +check_balance 9.99563389 0.00420000 echo " OK" echo "----- Handle outgoing -----" @@ -51,8 +49,10 @@ for n in `$SEQ`; do -C payto://bitcoin/$CLIENT \ -a BTC:0.0000$n > /dev/null done -sleep 10 # Give time for btc_wire worker to process -next_btc # Mine transactions +sleep 20 # Give time for btc_wire worker to process +mine_btc # Mine transactions +sleep 10 +check_balance 9.99605389 0.00373821 echo " OK" echo -n "Requesting exchange outgoing transaction list:" @@ -60,7 +60,7 @@ check_delta "outgoing?delta=-100" "$SEQ" echo " OK" echo -n "Check balance:" -check_balance 9.95514310 +check_balance 9.99605389 0.00373821 echo " OK" next_btc # Mine transactions @@ -82,7 +82,7 @@ echo " OK" echo -n "Check balance:" # Balance should not have changed -check_balance 9.95514310 +check_balance 9.99605389 0.00373821 echo " OK" echo "----- Handle bounce -----" @@ -103,7 +103,7 @@ sleep 5 # Wait for reconnection echo " OK" echo -n "Check balance:" -check_balance "*" 0.00090000 +check_balance "*" 0.00021000 echo " OK" echo "----- Recover DB -----" @@ -123,7 +123,7 @@ echo " OK" echo -n "Check balance:" # Balance should not have changed -check_balance "*" 0.00090000 +check_balance "*" 0.00021000 echo " OK" echo "All tests passed!" \ No newline at end of file diff --git a/test/btc/wire.sh b/test/btc/wire.sh @@ -20,7 +20,7 @@ echo "Start gateway" gateway echo "" -SEQ="seq 10 20" +SEQ="seq 10 99" echo "----- Receive -----" @@ -30,7 +30,7 @@ for n in `$SEQ`; do mine_btc # Mine transactions done next_btc # Trigger btc_wire -check_balance 9.99826299 0.00165000 +check_balance 9.95023810 0.04905000 echo " OK" echo -n "Requesting exchange incoming transaction list:" @@ -46,9 +46,9 @@ for n in `$SEQ`; do -C payto://bitcoin/$CLIENT \ -a BTC:0.0000$n > /dev/null done -sleep 1 +sleep 6 mine_btc # Mine transactions -check_balance 9.99842799 0.00146311 +check_balance 9.95514310 "" echo " OK" echo -n "Requesting exchange's outgoing transaction list:" @@ -60,13 +60,13 @@ echo "----- Bounce -----" clear_wallet echo -n "Bounce:" -for n in `$SEQ`; do +for n in `seq 10 40`; do $BTC_CLI -rpcwallet=client sendtoaddress $WIRE 0.000$n > /dev/null mine_btc done next_btc -sleep 1 -check_balance "*" 0.00011000 +sleep 3 +check_balance "*" 0.00031000 echo " OK" echo "All tests passed!" \ No newline at end of file diff --git a/test/common.sh b/test/common.sh @@ -10,9 +10,6 @@ function cleanup() { for n in `jobs -p`; do kill $n &> /dev/null || true done - for n in `jobs -p`; do - wait $n || true - done rm -rf $DIR &> /dev/null wait } @@ -131,12 +128,10 @@ function btc2_fork() { function resume_btc() { # Restart node bitcoind -datadir=$BTC_DIR $* &>> log/btc.log & - BTC_PID="$!" - # Wait for RPC server to be online - $BTC_CLI -rpcwait getnetworkinfo > /dev/null + BTC_PID="$!" # Load wallets for wallet in wire client reserve; do - $BTC_CLI loadwallet $wallet > /dev/null + $BTC_CLI -rpcwait loadwallet $wallet > /dev/null done # Connect second node $BTC_CLI addnode 127.0.0.1:8346 onetry diff --git a/test/conf/taler_bump.conf b/test/conf/taler_bump.conf @@ -0,0 +1,13 @@ +[taler] +CURRENCY = BTC + +[exchange] +BASE_URL = http://test.com + +[depolymerizer-bitcoin] +DB_URL = postgres://localhost:5454/postgres?user=postgres&password=password +PORT = 8060 +PAYTO = payto://bitcoin/bcrt1qgkgxkjj27g3f7s87mcvjjsghay7gh34cx39prj +CONFIRMATION = 3 +BOUNCE_FEE = 1000 +BUMP_DELAY = 10 +\ No newline at end of file diff --git a/wire-gateway/src/main.rs b/wire-gateway/src/main.rs @@ -11,7 +11,7 @@ use postgres::fallible_iterator::FallibleIterator; use std::{ convert::Infallible, str::FromStr, - sync::atomic::{AtomicBool, AtomicU64, Ordering}, + sync::atomic::{AtomicBool, AtomicU32, Ordering}, time::{Duration, Instant}, }; use taler_common::{ @@ -34,7 +34,7 @@ struct ServerState { pool: Pool, config: taler_common::config::Config, notify: Notify, - lifetime: Option<AtomicU64>, + lifetime: Option<AtomicU32>, status: AtomicBool, } @@ -107,7 +107,7 @@ async fn main() { pool, config: conf.clone(), notify: Notify::new(), - lifetime: conf.http_lifetime.map(AtomicU64::new), + lifetime: conf.http_lifetime.map(AtomicU32::new), status: AtomicBool::new(true), }; let state: &'static ServerState = Box::leak(Box::new(state)); @@ -468,7 +468,6 @@ fn status_watcher(state: &'static ServerState) { // Sync state let row = db.query_one("SELECT value FROM state WHERE name = 'status'", &[])?; let status: &[u8] = row.get(0); - dbg!(status); assert!(status.len() == 1 && status[0] < 2); state.status.store(status[0] == 1, Ordering::SeqCst); // Wait for next notification