commit 36a2771a15840ce42a2fc5ac582d5835de9aa601
parent 2309c24727cc5e32764b158494eca470bee30fa9
Author: Antoine A <>
Date: Wed, 15 Dec 2021 19:35:51 +0100
btc_wire: handle failures in the critical path
Diffstat:
6 files changed, 75 insertions(+), 27 deletions(-)
diff --git a/btc-wire/src/fail_point.rs b/btc-wire/src/fail_point.rs
@@ -5,10 +5,8 @@ use taler_log::log::warn;
pub fn fail_point(msg: &'static str, prob: f32) -> Result<(), &'static str> {
#[cfg(feature = "fail")]
return if fastrand::f32() < prob {
- panic!("lol");
Err(msg)
} else {
- panic!("lol");
Ok(())
};
diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs
@@ -1,5 +1,5 @@
use bitcoincore_rpc::{
- bitcoin::{hashes::Hash, Address, Amount as BtcAmount, BlockHash, SignedAmount, Txid},
+ bitcoin::{hashes::Hash, Address, Amount as BtcAmount, Block, BlockHash, SignedAmount, Txid},
json::GetTransactionResultDetailCategory as Category,
Client as RPC, RpcApi,
};
@@ -166,10 +166,16 @@ impl AutoReloadDb {
}
}
+fn last_hash(db: &mut Client) -> Result<Option<BlockHash>, postgres::Error> { // Read the 'last_hash' entry of the state table, if any
+ Ok(db
+ .query_opt("SELECT value FROM state WHERE name='last_hash'", &[])? // Database errors are propagated to the caller
+ .map(|r| BlockHash::from_slice(r.get(0)).unwrap())) // No row gives None; panics if BlockHash::from_slice rejects the stored bytes
+}
+
/// Listen for new proposed transactions and announce them on the bitcoin network
-fn sender(rpc: RPC, mut db: AutoReloadDb) {
+fn sender(rpc: RPC, mut db: AutoReloadDb, _config: &Config) {
// List all transactions waiting to be sent
- fn list(db: &mut Client) -> Result<Vec<(i32, Status)>, postgres::Error> {
+ fn list_waiting(db: &mut Client) -> Result<Vec<(i32, Status)>, postgres::Error> {
let mut iter = db.query_raw(
"SELECT id, status FROM tx_out WHERE status=$1 OR status=$2",
&[&(Status::Proposed as i16), &(Status::Delayed as i16)],
@@ -182,6 +188,20 @@ fn sender(rpc: RPC, mut db: AutoReloadDb) {
return Ok(out);
}
+ // List all transactions stuck in the manual state, as (id, wtid) pairs
+ fn list_manual(db: &mut Client) -> Result<Vec<(i32, [u8; 32])>, postgres::Error> {
+ let mut iter = db.query_raw(
+ "SELECT id, wtid FROM tx_out WHERE status=$1",
+ &[&(Status::Manual as i16)],
+ )?;
+ let mut out = Vec::new();
+ while let Some(row) = iter.next()? {
+ let wtid: &[u8] = row.get(1);
+ out.push((row.get(0), wtid.try_into().unwrap())); // Panics if the stored wtid is not exactly 32 bytes long
+ }
+ return Ok(out);
+ }
+
// Perform a transaction on the blockchain
// The transaction must be in the manual state
fn perform_send(db: &mut Client, rpc: &RPC, id: i32) -> Result<(), Box<dyn std::error::Error>> {
@@ -198,13 +218,10 @@ fn sender(rpc: RPC, mut db: AutoReloadDb) {
let exchange_base_url: Url = Url::parse(row.get(3))?;
let metadata = encode_info(reserve_pub.try_into()?, &exchange_base_url);
- if let Err(err) = fail_point("Skip send_op_return", 0.4) {
- error!("sender: fail - {}", err);
- return Ok(());
- }
+ fail_point("Skip send_op_return", 0.3)?;
match rpc.send_op_return(&addr, amount, &metadata) {
Ok(tx_id) => {
- fail_point("Fail update db", 0.5)?;
+ fail_point("Fail update db", 0.4)?;
tx.execute(
"UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3",
&[&(Status::Pending as i16), &tx_id.as_ref(), &id],
@@ -213,7 +230,7 @@ fn sender(rpc: RPC, mut db: AutoReloadDb) {
}
Err(e) => {
info!("sender: RPC - {}", e);
- fail_point("Fail update db", 0.5)?;
+ fail_point("Fail update db", 0.4)?;
tx.execute(
"UPDATE tx_out SET status=$1 WHERE id=$2",
&[&(Status::Delayed as i16), &id],
@@ -232,7 +249,8 @@ fn sender(rpc: RPC, mut db: AutoReloadDb) {
loop {
let mut db = db.client();
let result: Result<(), Box<dyn std::error::Error>> = (|| {
- for (id, status) in list(&mut db)? {
+ // Send waiting transactions
+ for (id, status) in list_waiting(&mut db)? {
// Set status to MANUAL to detect database error preventing atomicity
let nb = db.execute(
"UPDATE tx_out SET status=$1 WHERE id=$2 AND status=$3",
@@ -240,9 +258,41 @@ fn sender(rpc: RPC, mut db: AutoReloadDb) {
)?;
if nb == 0 {
warn!("sender: transaction status collision, database have been altered by another process");
- }
+ }
perform_send(db, &rpc, id)?;
}
+ // Try to recover transactions stuck in manual
+ let mut manuals = list_manual(&mut db)?;
+ if !manuals.is_empty() {
+ let last_hash = last_hash(db)?;
+ let txs = rpc.list_since_block(last_hash.as_ref(), None, None, None)?;
+ // Search for matching unconfirmed transactions
+ for tx in txs.transactions {
+ if tx.detail.category == Category::Send {
+ if let Ok((_, bytes)) = rpc.get_tx_op_return(&tx.info.txid) {
+ let (wtid, _) = decode_info(&bytes);
+ if let Some(pos) = manuals.iter().position(|(_, it)| it == &wtid) {
+ let (id, wtid) = manuals.swap_remove(pos);
+ let nb = db.execute(
+ "UPDATE tx_out SET status=$1 WHERE wtid=$2 AND status=$3 RETURNING id",
+ &[
+ &(Status::Pending as i16),
+ &wtid.as_ref(),
+ &(Status::Manual as i16),
+ ],
+ )?;
+ if nb > 0 {
+ warn!("sender: tx {} have been recovered automatically", id);
+ }
+ }
+ }
+ }
+ }
+ // If nothing matches, retry sending
+ for (id, _) in manuals {
+ perform_send(db, &rpc, id)?;
+ }
+ }
Ok(())
})();
if let Err(e) = result {
@@ -259,9 +309,7 @@ fn watcher(rpc: RPC, mut db: AutoReloadDb, config: &Config) {
let result: Result<(), Box<dyn std::error::Error>> = (|| {
// Get stored last_hash
- let last_hash: Option<BlockHash> = db
- .query_opt("SELECT value FROM state WHERE name='last_hash'", &[])?
- .map(|r| BlockHash::from_slice(r.get(0)).unwrap());
+ let last_hash = last_hash(db)?;
// Get all transactions made since this block
let list = rpc.list_since_block(
last_hash.as_ref(),
@@ -411,6 +459,7 @@ fn main() {
.map(|str| PathBuf::from_str(&str).unwrap())
.unwrap_or(default_data_dir());
let config = taler_config::Config::from_path("test.conf");
+ let config: &'static Config = Box::leak(Box::new(config));
let network = dirty_guess_network(&data_dir);
let rpc = common_rpc(&data_dir, network).unwrap();
rpc.load_wallet(&WIRE).ok();
@@ -419,7 +468,7 @@ fn main() {
let db_watcher = AutoReloadDb::new(&config.db_url, Duration::from_secs(5));
let db_sender = AutoReloadDb::new(&config.db_url, Duration::from_secs(5));
- let join = std::thread::spawn(move || sender(rpc_sender, db_sender));
+ let join = std::thread::spawn(move || sender(rpc_sender, db_sender, &config));
watcher(rpc_watcher, db_watcher, &config);
join.join().unwrap();
}
diff --git a/makefile b/makefile
@@ -6,4 +6,5 @@ test:
script/test_gateway.sh
script/test_btc_wire.sh
script/test_recover_db.sh
- script/test_btc_stress.sh
-\ No newline at end of file
+ script/test_btc_stress.sh
+ script/test_btc_fail.sh
+\ No newline at end of file
diff --git a/script/setup.sh b/script/setup.sh
@@ -70,7 +70,7 @@ function next_btc() {
function check_balance() {
CLIENT_BALANCE=`$BTC_CLI -rpcwallet=client getbalance`
WIRE_BALANCE=`$BTC_CLI -rpcwallet=wire getbalance`
- if [ "$CLIENT_BALANCE" != "$1" ] || [ "$WIRE_BALANCE" != "$2" ]; then
+ if [ "$CLIENT_BALANCE" != "$1" ] || [ "$WIRE_BALANCE" != "${2:-$WIRE_BALANCE}" ]; then
echo "expected: client $1 wire $2 got: client $CLIENT_BALANCE wire $WIRE_BALANCE"
exit 1
fi
@@ -117,7 +117,7 @@ function check_delta() {
return 1
fi
done
- NB=`echo $ALL | grep -o BTC | wc -l`
+ NB=`echo $ALL | grep -o BTC:0.0000 | wc -l`
EXPECTED=`$2 | wc -w`
if [ "$EXPECTED" != "$NB" ]; then
echo -n " expected: $EXPECTED txs found $NB"
diff --git a/script/test_btc_fail.sh b/script/test_btc_fail.sh
@@ -66,10 +66,10 @@ for n in `$SEQ`; do
-C payto://bitcoin/$CLIENT \
-a BTC:0.0000$n > /dev/null
done
+sleep 20
+next_btc # Trigger watcher
+sleep 20
next_btc # Mine transactions
-sleep 5
-next_btc # Trigger watcher twice, never sure
-sleep 3
echo " OK"
echo -n "Requesting exchange outgoing transaction list:"
@@ -77,5 +77,5 @@ check outgoing
echo " OK"
echo -n "Check balance:"
-check_balance 9.99928810 0.99982002
+check_balance 9.99928810
echo " OK"
\ No newline at end of file
diff --git a/script/test_btc_stress.sh b/script/test_btc_stress.sh
@@ -80,7 +80,7 @@ check outgoing
echo " OK"
echo -n "Check balance:"
-check_balance 9.99928810 0.99982002
+check_balance 9.99928810
echo " OK"
echo "----- Recover DB -----"
@@ -100,7 +100,7 @@ echo " OK"
echo -n "Check balance:"
# Balance should not have changed
-check_balance 9.99928810 0.99982002
+check_balance 9.99928810
echo " OK"
echo "All tests passed"
\ No newline at end of file