depolymerization

wire gateway for Bitcoin/Ethereum
Log | Files | Refs | Submodules | README | LICENSE

commit 2309c24727cc5e32764b158494eca470bee30fa9
parent 0467435320f42f1e7245554e194f3f2cd73d92c1
Author: Antoine A <>
Date:   Wed, 15 Dec 2021 17:37:20 +0100

Improve btc_wire code and tests

Diffstat:
Mbtc-wire/src/fail_point.rs | 2++
Mbtc-wire/src/main.rs | 132++++++++++++++++++++++++++++++++++---------------------------------------------
Mresearch.md | 3+++
Mscript/setup.sh | 22++++++++++++++--------
Mscript/test_btc_fail.sh | 82+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mscript/test_btc_stress.sh | 13+++++++++----
Mscript/test_recover_db.sh | 2+-
7 files changed, 168 insertions(+), 88 deletions(-)

diff --git a/btc-wire/src/fail_point.rs b/btc-wire/src/fail_point.rs @@ -5,8 +5,10 @@ use taler_log::log::warn; pub fn fail_point(msg: &'static str, prob: f32) -> Result<(), &'static str> { #[cfg(feature = "fail")] return if fastrand::f32() < prob { + panic!("lol"); Err(msg) } else { + panic!("lol"); Ok(()) }; diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs @@ -10,7 +10,7 @@ use btc_wire::{ segwit::DecodeSegWitErr, ClientExtended, GetOpReturnErr, GetSegwitErr, }; -use postgres::{fallible_iterator::FallibleIterator, Client, NoTls, Transaction}; +use postgres::{fallible_iterator::FallibleIterator, Client, NoTls}; use rand::{rngs::OsRng, RngCore}; use std::{ collections::HashMap, @@ -168,6 +168,7 @@ impl AutoReloadDb { /// Listen for new proposed transactions and announce them on the bitcoin network fn sender(rpc: RPC, mut db: AutoReloadDb) { + // List all transactions waiting to be sent fn list(db: &mut Client) -> Result<Vec<(i32, Status)>, postgres::Error> { let mut iter = db.query_raw( "SELECT id, status FROM tx_out WHERE status=$1 OR status=$2", @@ -181,26 +182,49 @@ fn sender(rpc: RPC, mut db: AutoReloadDb) { return Ok(out); } - fn get( - tx: &mut Transaction, - status: Status, - id: i32, - ) -> Result<Option<(BtcAmount, Address, Vec<u8>)>, Box<dyn std::error::Error>> { + // Perform a transaction on the blockchain + // The transaction must be in the manual state + fn perform_send(db: &mut Client, rpc: &RPC, id: i32) -> Result<(), Box<dyn std::error::Error>> { + let mut tx = db.transaction()?; // We lock the row with FOR UPDATE to prevent sending same transaction multiple time let iter = tx.query_opt( "SELECT amount, wtid, credit_acc, exchange_url FROM tx_out WHERE id=$1 AND status=$2 FOR UPDATE", - &[&id, &(status as i16)], + &[&id, &(Status::Manual as i16)], )?; - Ok(if let Some(row) = iter { - let amount: Amount = Amount::from_str(row.get(0))?; + if let Some(row) = iter { + let amount = taler_amount_to_btc_amount(&Amount::from_str(row.get(0))?)?; let reserve_pub: &[u8] = row.get(1); - let credit_addr: Address = btc_payto_addr(&Url::parse(row.get(2))?)?; + let addr: Address = btc_payto_addr(&Url::parse(row.get(2))?)?; let exchange_base_url: Url = Url::parse(row.get(3))?; let metadata = encode_info(reserve_pub.try_into()?, &exchange_base_url); - Some((taler_amount_to_btc_amount(&amount)?, credit_addr, metadata)) + + if let Err(err) = fail_point("Skip send_op_return", 0.4) { + error!("sender: fail - {}", err); + return Ok(()); + } + match rpc.send_op_return(&addr, amount, &metadata) { + Ok(tx_id) => { + fail_point("Fail update db", 0.5)?; + tx.execute( + "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3", + &[&(Status::Pending as i16), &tx_id.as_ref(), &id], + )?; + info!("{} PENDING", tx_id); + } + Err(e) => { + info!("sender: RPC - {}", e); + fail_point("Fail update db", 0.5)?; + tx.execute( + "UPDATE tx_out SET status=$1 WHERE id=$2", + &[&(Status::Delayed as i16), &id], + )?; + } + } } else { - None - }) + warn!("sender: transaction status collision, database have been altered by another process"); + } + tx.commit()?; + Ok(()) } // TODO check if transactions are abandoned @@ -210,52 +234,14 @@ fn sender(rpc: RPC, mut db: AutoReloadDb) { let result: Result<(), Box<dyn std::error::Error>> = (|| { for (id, status) in list(&mut db)? { // Set status to MANUAL to detect database error preventing atomicity - { - let mut tx = db.transaction()?; - if get(&mut tx, status, id)?.is_some() { - tx.execute( - "UPDATE tx_out SET status=$1 WHERE id=$2", - &[&(Status::Manual as i16), &id], - )?; - } else { - warn!("sender: transaction status collision, database have been altered by another process"); - } - - tx.commit()?; - } - { - let mut tx = db.transaction()?; - if let Some((amount, addr, metadata)) = get(&mut tx, Status::Manual, id)? { - if let Err(err) = fail_point("Skip send_op_return", 0.4) { - error!("sender: fail - {}", err); - continue; - } - match rpc.send_op_return(&addr, amount, &metadata) { - Ok(tx_id) => { - fail_point("Fail update db", 0.5)?; - tx.execute( - "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3", - &[&(Status::Pending as i16), &tx_id.as_ref(), &id], - )?; - info!("{} PENDING", tx_id); - } - Err(e) => { - info!("sender: RPC - {}", e); - fail_point("Fail update db", 0.5)?; - tx.execute( - "UPDATE tx_out SET status=$1 WHERE id=$2", - &[&(Status::Delayed as i16), &id], - )?; - } - } - } else { - warn!("sender: transaction status collision, database have been altered by another process"); - } - tx.commit().map_err(|e| { - warn!("### --- COMMIT FAILED --- ###"); - return e; - })?; - } + let nb = db.execute( + "UPDATE tx_out SET status=$1 WHERE id=$2 AND status=$3", + &[&(Status::Manual as i16), &id, &(status as i16)], + )?; + if nb == 0 { + warn!("sender: transaction status collision, database have been altered by another process"); + } + perform_send(db, &rpc, id)?; } Ok(()) })(); @@ -334,12 +320,14 @@ fn watcher(rpc: RPC, mut db: AutoReloadDb, config: &Config) { // Generate a random request_uid let mut request_uid = [0; 64]; OsRng.fill_bytes(&mut request_uid); - tx.execute( - "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)", + let nb = tx.execute( + "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING", &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).to_string(), &btc_payto_url(&credit_addr).to_string(), &config.base_url.to_string(), &(Status::OnChain as i16), &id.as_ref(), &request_uid.as_ref() ], )?; - warn!("watcher: found an unregistered outgoing address {} {} in tx {}", crockford_base32_encode(&wtid), &url, id); + if nb > 0 { + warn!("watcher: found an unregistered outgoing address {} {} in tx {}", crockford_base32_encode(&wtid), &url, id); + } } tx.commit()?; } @@ -350,23 +338,17 @@ fn watcher(rpc: RPC, mut db: AutoReloadDb, config: &Config) { }, Category::Receive => match rpc.get_tx_segwit_key(&id) { Ok((full, reserve_pub)) => { - let mut tx = db.transaction()?; - let row = tx.query_opt( - "SELECT id FROM tx_in WHERE reserve_pub=$1 FOR UPDATE", - &[&reserve_pub.as_ref()], - )?; - if row.is_none() { - let debit_addr = sender_address(&rpc, &full)?; - let credit_addr = full.tx.details[0].address.as_ref().unwrap(); - let time = full.tx.info.blocktime.unwrap(); - let date = SystemTime::UNIX_EPOCH + Duration::from_secs(time); - let amount = btc_amount_to_taler_amount(&full.tx.amount); - tx.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5)", &[ + let debit_addr = sender_address(&rpc, &full)?; + let credit_addr = full.tx.details[0].address.as_ref().unwrap(); + let time = full.tx.info.blocktime.unwrap(); + let date = SystemTime::UNIX_EPOCH + Duration::from_secs(time); + let amount = btc_amount_to_taler_amount(&full.tx.amount); + let nb = db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (reserve_pub) DO NOTHING ", &[ &date, &amount.to_string(), &reserve_pub.as_ref(), &btc_payto_url(&debit_addr).to_string(), &btc_payto_url(&credit_addr).to_string() ])?; + if nb > 0 { info!("{} << {} {}", &debit_addr, &credit_addr, &amount); } - tx.commit()?; } Err(err) => match err { GetSegwitErr::Decode( diff --git a/research.md b/research.md @@ -16,5 +16,8 @@ Rust client library OpenEthereum - JSON RCP API ## TODO +- Listen/Notify to replace pooling +- Add and test refund +- Automatic MANUAL fix - detect small fork -> warning - fork -> ERR panic \ No newline at end of file diff --git a/script/setup.sh b/script/setup.sh @@ -78,22 +78,28 @@ function check_balance() { # Start btc_wire function btc_wire() { - cargo build --bin btc-wire &> /dev/null - target/debug/btc-wire $BTC_DIR &> btc_wire.log & + cargo build --bin btc-wire --release &> /dev/null + target/release/btc-wire $BTC_DIR &> btc_wire.log & +} + +# Start btc_wire with random failures +function fail_btc_wire() { + cargo build --bin btc-wire --release --features fail &> /dev/null + target/release/btc-wire $BTC_DIR &> btc_wire.log & } # Start multiple btc_wire in parallel function stressed_btc_wire() { - cargo build --bin btc-wire &> /dev/null - target/debug/btc-wire $BTC_DIR &> btc_wire.log & - target/debug/btc-wire $BTC_DIR &> btc_wire1.log & - target/debug/btc-wire $BTC_DIR &> btc_wire2.log & + cargo build --bin btc-wire --release &> /dev/null + target/release/btc-wire $BTC_DIR &> btc_wire.log & + target/release/btc-wire $BTC_DIR &> btc_wire1.log & + target/release/btc-wire $BTC_DIR &> btc_wire2.log & } # Start wire_gateway in test mode function gateway() { - cargo build --bin wire-gateway --features test &> /dev/null - target/debug/wire-gateway &> gateway.log & + cargo build --bin wire-gateway --release --features test &> /dev/null + target/release/wire-gateway &> gateway.log & for n in `seq 1 50`; do echo -n "." sleep 0.2 diff --git a/script/test_btc_fail.sh b/script/test_btc_fail.sh @@ -0,0 +1,81 @@ +#!/bin/bash + +## Test btc_wire ability to recover from errors in correctness critical paths + +set -eu + +# Cleanup to run whenever we exit +function cleanup() { + for n in `jobs -p`; do + kill $n 2> /dev/null || true + done + rm -rf $BTC_DIR 2> /dev/null + wait +} + +# Install cleanup handler (except for kill -9) +trap cleanup EXIT + +source "${BASH_SOURCE%/*}/setup.sh" + +echo "---- Setup stressed -----" +echo "Load config file" +load_config +echo "Reset database" +reset_db +echo "Start bitcoin node" +init_btc +echo "Init bitcoin regtest" +setup_btc +echo "Start failing btc-wire" +fail_btc_wire +echo "Start gateway" +gateway +echo "" + +SEQ="seq 10 99" + +function check() { + check_delta "$1?delta=-100" "$SEQ" +} + +echo "----- Handle incoming -----" + +echo -n "Making wire transfer to exchange:" +for n in `$SEQ`; do + btc-wire-cli -d $BTC_DIR transfer 0.0000$n + mine_btc # Mine transactions +done +next_btc # Trigger btc_wire +echo " OK" + +echo -n "Requesting exchange incoming transaction list:" +check incoming +echo " OK" + +echo -n "Check balance:" +check_balance 9.99438310 1.00490500 +echo " OK" + +echo "----- Handle outgoing -----" + +echo -n "Making wire transfer from exchange:" +for n in `$SEQ`; do + taler-exchange-wire-gateway-client \ + -b $BANK_ENDPOINT \ + -C payto://bitcoin/$CLIENT \ + -a BTC:0.0000$n > /dev/null +done +next_btc # Mine transactions +sleep 5 +next_btc # Trigger watcher twice, never sure +sleep 3 +echo " OK" + +echo -n "Requesting exchange outgoing transaction list:" +check outgoing +echo " OK" + +echo -n "Check balance:" +check_balance 9.99928810 0.99982002 +echo " OK" +\ No newline at end of file diff --git a/script/test_btc_stress.sh b/script/test_btc_stress.sh @@ -39,7 +39,7 @@ function check() { check_delta "$1?delta=-100" "$SEQ" } -echo "----- Generate many transactions -----" +echo "----- Handle incoming -----" echo -n "Making wire transfer to exchange:" for n in `$SEQ`; do @@ -47,6 +47,9 @@ for n in `$SEQ`; do mine_btc # Mine transactions done next_btc # Trigger btc_wire +sleep 5 +next_btc # Trigger btc_wire again (for sure) +sleep 3 echo " OK" echo -n "Requesting exchange incoming transaction list:" @@ -57,6 +60,8 @@ echo -n "Check balance:" check_balance 9.99438310 1.00490500 echo " OK" +echo "----- Handle outgoing -----" + echo -n "Making wire transfer from exchange:" for n in `$SEQ`; do taler-exchange-wire-gateway-client \ @@ -66,7 +71,7 @@ for n in `$SEQ`; do done next_btc # Mine transactions sleep 5 -next_btc # Trigger watcher twice, never sure +next_btc # Trigger watcher twice (for sure) sleep 3 echo " OK" @@ -75,7 +80,7 @@ check outgoing echo " OK" echo -n "Check balance:" -check_balance 9.99928810 0.99981936 +check_balance 9.99928810 0.99982002 echo " OK" echo "----- Recover DB -----" @@ -95,7 +100,7 @@ echo " OK" echo -n "Check balance:" # Balance should not have changed -check_balance 9.99928810 0.99981936 +check_balance 9.99928810 0.99982002 echo " OK" echo "All tests passed" \ No newline at end of file diff --git a/script/test_recover_db.sh b/script/test_recover_db.sh @@ -64,7 +64,7 @@ taler-exchange-wire-gateway-client \ -b $BANK_ENDPOINT \ -C payto://bitcoin/$CLIENT \ -a BTC:0.00002 > /dev/null -mine_btc +sleep 1 next_btc check_balance 9.99992218 1.00006001 echo " OK"