depolymerization

wire gateway for Bitcoin/Ethereum
Log | Files | Refs | Submodules | README | LICENSE

commit 34f0fa413f698cb1b8436a3414ccad969dd7a79e
parent 773b374e47a7baadb729e68203e548a54b33ce4c
Author: Antoine A <>
Date:   Fri, 21 Jan 2022 14:17:24 +0100

Simplify locking

Diffstat:
Mbtc-wire/src/loops/worker.rs | 49+++++++++++++++++++++++++++++--------------------
Mmakefile | 1-
Mtest/btc/bumpfee.sh | 6+++---
Dtest/btc/fail.sh | 82-------------------------------------------------------------------------------
Mtest/btc/stress.sh | 15+++++++--------
Mtest/common.sh | 13+++----------
6 files changed, 42 insertions(+), 124 deletions(-)

diff --git a/btc-wire/src/loops/worker.rs b/btc-wire/src/loops/worker.rs @@ -70,6 +70,22 @@ pub fn worker( let mut iter = ntf.iter(); while iter.next()?.is_some() {} } + + // It is not possible to atomically update the blockchain and the database. + // When we failed to sync the database and the blockchain state we rely on + // sync_chain to recover the lost update. + // When this function is running in parallel it not possible to known another + // execution has failed, and this can lead to a transaction being sent multiple time. + // To ensure only a single version of this function is running at a given time we rely + // on postgres advisory lock + + // Take the lock + let row = db.query_one("SELECT pg_try_advisory_lock(42)", &[])?; + let locked: bool = row.get(0); + if !locked { + return Err("Another btc_wire process is running concurrently".into()); + } + // Sync chain sync_chain(rpc, db, config, state, &mut status)?; @@ -105,12 +121,11 @@ fn send( status: TxStatus, ) -> Result<bool, Box<dyn std::error::Error>> { assert!(status == TxStatus::Delayed || status == TxStatus::Requested); - let mut tx = db.transaction()?; - // We lock the row with FOR UPDATE to prevent sending same transaction multiple time - let row = tx.query_opt( - "SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 LIMIT 1 FOR UPDATE", - &[&(status as i16)], - )?; + // We rely on the advisory lock to ensure we are the only one sending transactions + let row = db.query_opt( + "SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 LIMIT 1", + &[&(status as i16)], + )?; if let Some(row) = &row { let id: i32 = row.get(0); let amount = taler_to_btc(&Amount::from_str(row.get(1))?)?; @@ -126,20 +141,18 @@ fn send( match rpc.send_op_return(&addr, &amount, &metadata, false, true) { Ok(tx_id) => { fail_point("(injected) fail send", 0.3)?; - tx.execute( + db.execute( "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3", &[&(TxStatus::Sent as i16), &tx_id.as_ref(), &id], )?; - tx.commit()?; let amount = btc_to_taler(&amount.to_signed().unwrap()); info!(">> {} {} in {} to {}", amount, base32(wtid), tx_id, addr); } Err(e) => { - tx.execute( + db.execute( "UPDATE tx_out SET status=$1 WHERE id=$2", &[&(TxStatus::Delayed as i16), &id], )?; - tx.commit()?; Err(e)?; } } @@ -155,10 +168,9 @@ fn bounce( fee: &BtcAmount, ) -> Result<bool, Box<dyn std::error::Error>> { assert!(status == BounceStatus::Delayed || status == BounceStatus::Requested); - let mut tx = db.transaction()?; - // We lock the row with FOR UPDATE to prevent sending same transaction multiple time - let row = tx.query_opt( - "SELECT id, bounced FROM bounce WHERE status=$1 LIMIT 1 FOR UPDATE", + // We rely on the advisory lock to ensure we are the only one sending transactions + let row = db.query_opt( + "SELECT id, bounced FROM bounce WHERE status=$1 LIMIT 1", &[&(status as i16)], )?; if let Some(row) = &row { @@ -170,11 +182,10 @@ fn bounce( match rpc.bounce(&bounced, fee, &metadata) { Ok(it) => { fail_point("(injected) fail bounce", 0.3)?; - tx.execute( + db.execute( "UPDATE bounce SET txid=$1, status=$2 WHERE id=$3", &[&it.as_ref(), &(BounceStatus::Sent as i16), &id], )?; - tx.commit()?; info!("|| {} in {}", &bounced, &it); } Err(err) => match err { @@ -182,19 +193,17 @@ fn bounce( code: ErrorCode::RpcWalletInsufficientFunds | ErrorCode::RpcWalletError, msg, } => { - tx.execute( + db.execute( "UPDATE bounce SET status=$1 WHERE id=$2", &[&(BounceStatus::Ignored as i16), &id], )?; - tx.commit()?; info!("|| (ignore) {} because {}", &bounced, msg); } e => { - tx.execute( + db.execute( "UPDATE bounce SET status=$1 WHERE id=$2", &[&(BounceStatus::Delayed as i16), &id], )?; - tx.commit()?; Err(e)?; } }, diff --git a/makefile b/makefile @@ -9,7 +9,6 @@ test_btc: test/btc/wire.sh test/btc/lifetime.sh test/btc/reconnect.sh - test/btc/fail.sh test/btc/stress.sh test/btc/conflict.sh test/btc/reorg.sh diff --git a/test/btc/bumpfee.sh b/test/btc/bumpfee.sh @@ -57,11 +57,11 @@ mine_btc check_balance 5.80383389 4.19598010 echo " OK" -echo "----- Bump fail -----" +echo "----- Bump stress -----" -echo -n "Replace btc_wire with failing btc_wire" +echo -n "Replace btc_wire with stressed btc_wire" kill $WIRE_PID -fail_btc_wire +stress_btc_wire echo " OK" echo -n "Making wire transfer from exchange:" diff --git a/test/btc/fail.sh b/test/btc/fail.sh @@ -1,81 +0,0 @@ -#!/bin/bash - -## Test btc_wire ability to recover from errors in correctness critical paths - -set -eu - -source "${BASH_SOURCE%/*}/../common.sh" -SCHEMA=btc.sql - -echo "----- Setup fail -----" -echo "Load config file" -load_config -echo "Start database" -setup_db -echo "Start bitcoin node" -init_btc -echo "Start failing btc-wire" -fail_btc_wire -echo "Start gateway" -gateway -echo "" - -SEQ="seq 10 40" - -echo "----- Handle incoming -----" - -echo -n "Making wire transfer to exchange:" -for n in `$SEQ`; do - btc-wire-utils -d $BTC_DIR transfer 0.000$n > /dev/null - mine_btc # Mine transactions -done -next_btc # Trigger btc_wire -echo " OK" - -echo -n "Requesting exchange incoming transaction list:" -check_delta "incoming?delta=-100" "$SEQ" "0.000" -echo " OK" - -echo -n "Check balance:" -check_balance 9.99200479 0.00775000 -echo " OK" - -echo "----- Handle outgoing -----" - -echo -n "Making wire transfer from exchange:" -for n in `$SEQ`; do - taler-exchange-wire-gateway-client \ - -b $BANK_ENDPOINT \ - -C payto://bitcoin/$CLIENT \ - -a BTC:0.0000$n > /dev/null -done -sleep 10 -mine_btc # Mine transactions -echo " OK" - -echo -n "Requesting exchange outgoing transaction list:" -check_delta "outgoing?delta=-100" "$SEQ" -echo " OK" - -echo -n "Check balance:" -check_balance 9.99277979 0.00691331 -echo " OK" - -echo "----- Handle bounce -----" - -clear_wallet - -echo -n "Making incomplete wire transfer to exchange:" -for n in `$SEQ`; do - $BTC_CLI -rpcwallet=client sendtoaddress $WIRE 0.000$n > /dev/null - mine_btc -done -next_btc -sleep 10 -echo " OK" - -echo -n "Check balance:" -check_balance "*" 0.00031000 -echo " OK" - -echo "All tests passed!" -\ No newline at end of file diff --git a/test/btc/stress.sh b/test/btc/stress.sh @@ -1,6 +1,6 @@ #!/bin/bash -## Test btc_wire behavior when ran and stressed concurrently +## Test btc_wire ability to recover from errors in correctness critical paths and prevent concurrent sending set -eu @@ -15,7 +15,7 @@ setup_db echo "Start bitcoin node" init_btc echo "Start btc-wire stressed" -stressed_btc_wire +stress_btc_wire echo "Start gateway" gateway echo "" @@ -49,9 +49,8 @@ for n in `$SEQ`; do -C payto://bitcoin/$CLIENT \ -a BTC:0.0000$n > /dev/null done -sleep 20 # Give time for btc_wire worker to process +sleep 10 # Give time for btc_wire worker to process mine_btc # Mine transactions -sleep 10 check_balance 9.99605389 0.00373821 echo " OK" @@ -70,7 +69,7 @@ echo "----- Recover DB -----" echo "Reset database" reset_db # Clear database tables mine_btc # Trigger worker -sleep 10 +sleep 5 echo -n "Requesting exchange incoming transaction list:" check_delta "incoming?delta=-100" "$SEQ" "0.000" @@ -97,9 +96,9 @@ for n in `$SEQ`; do mine_btc done next_btc -sleep 7 +sleep 5 mine_btc -sleep 5 # Wait for reconnection +sleep 5 echo " OK" echo -n "Check balance:" @@ -111,7 +110,7 @@ echo "----- Recover DB -----" echo "Reset database" reset_db # Clear database tables mine_btc # Trigger worker -sleep 10 +sleep 5 echo -n "Requesting exchange incoming transaction list:" check_delta "incoming?delta=-100" "$SEQ" "0.000" diff --git a/test/common.sh b/test/common.sh @@ -191,16 +191,9 @@ function btc_wire() { WIRE_PID="$!" } -# Start btc_wire with random failures -function fail_btc_wire() { - cargo build --bin btc-wire --release --features fail &> /dev/null - target/release/btc-wire $CONF &>> log/btc_wire.log & - WIRE_PID="$!" -} - -# Start multiple btc_wire in parallel -function stressed_btc_wire() { - cargo build --bin btc-wire --release &> /dev/null +# Start multiple btc_wire with random failures in parallel +function stress_btc_wire() { + cargo build --bin btc-wire --release --features fail &> /dev/null target/release/btc-wire $CONF &>> log/btc_wire.log & target/release/btc-wire $CONF &>> log/btc_wire1.log & target/release/btc-wire $CONF &>> log/btc_wire2.log &