commit 60bfc24eaedf3c8cacfea3b6d2b8fec520a1350d
parent c3d38b3f7af1478bd8055cfeda3ebbe220619ae3
Author: Antoine A <>
Date: Tue, 14 Dec 2021 17:03:07 +0100
btc_wire: improve locking anc correctness
Diffstat:
4 files changed, 141 insertions(+), 80 deletions(-)
diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs
@@ -1,5 +1,5 @@
use bitcoincore_rpc::{
- bitcoin::{hashes::Hash, Address, Amount as BtcAmount, BlockHash, Txid},
+ bitcoin::{hashes::Hash, Address, Amount as BtcAmount, BlockHash, SignedAmount, Txid},
json::GetTransactionResultDetailCategory as Category,
Client as RPC, RpcApi,
};
@@ -16,7 +16,6 @@ use rand::{rngs::OsRng, RngCore};
use std::{
collections::HashMap,
path::{Path, PathBuf},
- process::exit,
str::FromStr,
time::{Duration, SystemTime},
};
@@ -66,8 +65,9 @@ fn btc_payto_addr(url: &Url) -> Result<Address, String> {
return Ok(Address::from_str(str).map_err(|_| "".to_string())?);
}
-fn btc_amount_to_taler_amount(amount: &BtcAmount) -> Amount {
- let sat = amount.as_sat();
+fn btc_amount_to_taler_amount(amount: &SignedAmount) -> Amount {
+ let unsigned = amount.abs().to_unsigned().unwrap();
+ let sat = unsigned.as_sat();
return Amount::new("BTC", sat / 100_000_000, (sat % 100_000_000) as u32);
}
@@ -165,62 +165,89 @@ impl AutoReloadDb {
/// Listen for new proposed transactions and announce them on the bitcoin network
fn sender(rpc: RPC, mut db: AutoReloadDb) {
- fn get_proposed(
- db: &mut Client,
- ) -> Result<Vec<(i32, BtcAmount, Address, Vec<u8>)>, Box<dyn std::error::Error>> {
+ fn list(db: &mut Client) -> Result<Vec<(i32, Status)>, postgres::Error> {
let mut iter = db.query_raw(
- "SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 OR status=$2",
+ "SELECT id, status FROM tx_out WHERE status=$1 OR status=$2",
&[&(Status::Proposed as i16), &(Status::Delayed as i16)],
)?;
let mut out = Vec::new();
while let Some(row) = iter.next()? {
- let id: i32 = row.get(0);
- let amount: Amount = Amount::from_str(row.get(1))?;
- let reserve_pub: &[u8] = row.get(2);
- let credit_addr: Address = btc_payto_addr(&Url::parse(row.get(3))?)?;
- let exchange_base_url: Url = Url::parse(row.get(4))?;
- let metadata = encode_info(reserve_pub.try_into()?, &exchange_base_url);
- out.push((
- id,
- taler_amount_to_btc_amount(&amount)?,
- credit_addr,
- metadata,
- ));
+ let status: i16 = row.get(1);
+ out.push((row.get(0), Status::try_from(status as u8).unwrap()));
}
return Ok(out);
}
+ fn get(
+ tx: &mut Transaction,
+ status: Status,
+ id: i32,
+ ) -> Result<Option<(BtcAmount, Address, Vec<u8>)>, Box<dyn std::error::Error>> {
+ let iter = tx.query_opt(
+ "SELECT amount, wtid, credit_acc, exchange_url FROM tx_out WHERE id=$1 AND status=$2",
+ &[&id, &(status as i16)],
+ )?;
+ Ok(if let Some(row) = iter {
+ let amount: Amount = Amount::from_str(row.get(0))?;
+ let reserve_pub: &[u8] = row.get(1);
+ let credit_addr: Address = btc_payto_addr(&Url::parse(row.get(2))?)?;
+ let exchange_base_url: Url = Url::parse(row.get(3))?;
+ let metadata = encode_info(reserve_pub.try_into()?, &exchange_base_url);
+ Some((taler_amount_to_btc_amount(&amount)?, credit_addr, metadata))
+ } else {
+ None
+ })
+ }
+
// TODO check if transactions are abandoned
loop {
let mut db = db.client();
let result: Result<(), Box<dyn std::error::Error>> = (|| {
- for (id, amount, addr, metadata) in get_proposed(&mut db)? {
+ for (id, status) in list(&mut db)? {
// Set status to MANUAL to detect database error preventing atomicity
- db.execute(
- "UPDATE tx_out SET status=$1 WHERE id=$2",
- &[&(Status::Manual as i16), &id],
- )?;
- match rpc.send_op_return(&addr, amount, &metadata) {
- Ok(txid) => {
- let result = db.execute(
- "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3",
- &[&(Status::Pending as i16), &txid.as_ref(), &id],
- );
- if let Err(e) = result {
- error!("sender: DB - {}", e);
- // The watcher is going to recover the transaction automatically
- } else {
- info!("{} PENDING", txid);
- }
- }
- Err(e) => {
- info!("sender: RPC - {}", e);
- db.execute(
+ {
+ let mut tx = db
+ .build_transaction()
+ .isolation_level(IsolationLevel::Serializable)
+ .start()?;
+ if get(&mut tx, status, id)?.is_some() {
+ tx.execute(
"UPDATE tx_out SET status=$1 WHERE id=$2",
- &[&(Status::Delayed as i16), &id],
+ &[&(Status::Manual as i16), &id],
)?;
+ } else {
+ warn!("sender: transaction status collision, database have been altered by another process");
+ }
+
+ tx.commit()?;
+ }
+ {
+ let mut tx = db
+ .build_transaction()
+ .isolation_level(IsolationLevel::Serializable)
+ .start()?;
+ if let Some((amount, addr, metadata)) = get(&mut tx, Status::Manual, id)? {
+ match rpc.send_op_return(&addr, amount, &metadata) {
+ Ok(tx_id) => {
+ tx.execute(
+ "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3",
+ &[&(Status::Pending as i16), &tx_id.as_ref(), &id],
+ )?;
+ info!("{} PENDING", tx_id);
+ }
+ Err(e) => {
+ info!("sender: RPC - {}", e);
+ tx.execute(
+ "UPDATE tx_out SET status=$1 WHERE id=$2",
+ &[&(Status::Delayed as i16), &id],
+ )?;
+ }
+ }
+ } else {
+ warn!("sender: transaction status collision, database have been altered by another process");
}
+ tx.commit()?;
}
}
Ok(())
@@ -240,18 +267,14 @@ fn watcher(rpc: RPC, mut db: AutoReloadDb, config: &Config) {
let db = db.client();
let result: Result<(), Box<dyn std::error::Error>> = (|| {
- // We should be the only one to interact with the database but we enforce it
- let mut tx = db
- .build_transaction()
- .isolation_level(IsolationLevel::Serializable)
- .start()?;
- let last_hash: Option<BlockHash> = tx
+ // Get stored last_hash
+ let last_hash: Option<BlockHash> = db
.query_opt("SELECT value FROM state WHERE name='last_hash'", &[])?
.map(|r| BlockHash::from_slice(r.get(0)).unwrap());
+ // Get all transactions made since this block
let list =
rpc.list_since_block(last_hash.as_ref(), Some(confirmation), None, Some(true))?;
-
- // List all confirmed send and receive transactions since last check
+ // Keep only confirmed send and receive transactions
let txs: HashMap<Txid, Category> = list
.transactions
.into_iter()
@@ -268,8 +291,12 @@ fn watcher(rpc: RPC, mut db: AutoReloadDb, config: &Config) {
Category::Send => match rpc.get_tx_op_return(&id) {
Ok((full, bytes)) => {
let (wtid, url) = decode_info(&bytes);
+ let mut tx = db
+ .build_transaction()
+ .isolation_level(IsolationLevel::Serializable)
+ .start()?;
let row = tx.query_opt(
- "SELECT status, id FROM tx_out WHERE wtid=$1",
+ "SELECT status, id FROM tx_out WHERE wtid=$1 FOR UPDATE",
&[&wtid.as_ref()],
)?;
if let Some(row) = row {
@@ -297,9 +324,7 @@ fn watcher(rpc: RPC, mut db: AutoReloadDb, config: &Config) {
let credit_addr = full.tx.details[0].address.as_ref().unwrap();
let time = full.tx.info.blocktime.unwrap();
let date = SystemTime::UNIX_EPOCH + Duration::from_secs(time);
- let amount = btc_amount_to_taler_amount(
- &full.tx.amount.abs().to_unsigned()?,
- );
+ let amount = btc_amount_to_taler_amount(&full.tx.amount);
// Generate a random request_uid
let mut request_uid = [0; 64];
OsRng.fill_bytes(&mut request_uid);
@@ -310,6 +335,7 @@ fn watcher(rpc: RPC, mut db: AutoReloadDb, config: &Config) {
)?;
warn!("watcher: found an unregistered outgoing address {} {} in tx {}", crockford_base32_encode(&wtid), &url, id);
}
+ tx.commit()?;
}
Err(err) => match err {
GetOpReturnErr::MissingOpReturn => {} // ignore
@@ -318,15 +344,26 @@ fn watcher(rpc: RPC, mut db: AutoReloadDb, config: &Config) {
},
Category::Receive => match rpc.get_tx_segwit_key(&id) {
Ok((full, reserve_pub)) => {
- let debit_addr = sender_address(&rpc, &full)?;
- let credit_addr = full.tx.details[0].address.as_ref().unwrap();
- let time = full.tx.info.blocktime.unwrap();
- let date = SystemTime::UNIX_EPOCH + Duration::from_secs(time);
- let amount = btc_amount_to_taler_amount(&full.tx.amount.to_unsigned()?);
- tx.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5)", &[
- &date, &amount.to_string(), &reserve_pub.as_ref(), &btc_payto_url(&debit_addr).to_string(), &btc_payto_url(&credit_addr).to_string()
- ])?;
- info!("{} << {} {}", &debit_addr, &credit_addr, &amount);
+ let mut tx = db
+ .build_transaction()
+ .isolation_level(IsolationLevel::Serializable)
+ .start()?;
+ let row = tx.query_opt(
+ "SELECT id FROM tx_in WHERE reserve_pub=$1 FOR UPDATE",
+ &[&reserve_pub.as_ref()],
+ )?;
+ if row.is_none() {
+ let debit_addr = sender_address(&rpc, &full)?;
+ let credit_addr = full.tx.details[0].address.as_ref().unwrap();
+ let time = full.tx.info.blocktime.unwrap();
+ let date = SystemTime::UNIX_EPOCH + Duration::from_secs(time);
+ let amount = btc_amount_to_taler_amount(&full.tx.amount);
+ tx.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5)", &[
+ &date, &amount.to_string(), &reserve_pub.as_ref(), &btc_payto_url(&debit_addr).to_string(), &btc_payto_url(&credit_addr).to_string()
+ ])?;
+ info!("{} << {} {}", &debit_addr, &credit_addr, &amount);
+ }
+ tx.commit()?;
}
Err(err) => match err {
GetSegwitErr::Decode(
@@ -339,13 +376,35 @@ fn watcher(rpc: RPC, mut db: AutoReloadDb, config: &Config) {
Category::Generate | Category::Immature | Category::Orphan => {}
}
}
- let query = if let Some(_) = last_hash {
- "UPDATE state SET value=$1 WHERE name='last_hash'"
- } else {
- "INSERT INTO state (name, value) VALUES ('last_hash', $1)"
- };
- tx.execute(query, &[&list.lastblock.as_ref()])?;
- tx.commit()?;
+ // Move last_hash forward if no error have been caught
+ {
+ let mut tx = db
+ .build_transaction()
+ .isolation_level(IsolationLevel::Serializable)
+ .start()?;
+ let curr_hash: Option<BlockHash> = tx
+ .query_opt(
+ "SELECT value FROM state WHERE name='last_hash' FOR UPDATE",
+ &[],
+ )?
+ .map(|r| BlockHash::from_slice(r.get(0)).unwrap());
+
+ if last_hash != curr_hash {
+ error!("watcher: hash state collision, database have been altered by another process");
+ tx.execute("DELETE FROM state WHERE name = 'last_hash'", &[])?;
+ } else if curr_hash.is_some() {
+ tx.execute(
+ "UPDATE state SET value=$1 WHERE name='last_hash'",
+ &[&list.lastblock.as_ref()],
+ )?;
+ } else {
+ tx.execute(
+ "INSERT INTO state (name, value) VALUES ('last_hash', $1)",
+ &[&list.lastblock.as_ref()],
+ )?;
+ };
+ tx.commit()?;
+ }
Ok(())
})();
if let Err(e) = result {
diff --git a/script/setup.sh b/script/setup.sh
@@ -33,8 +33,10 @@ function setup_btc() {
done
RESERVE=`$BTC_CLI -rpcwallet=reserve getnewaddress`
CLIENT=`$BTC_CLI -rpcwallet=client getnewaddress`
+ WIRE=`$BTC_CLI -rpcwallet=wire getnewaddress`
$BTC_CLI generatetoaddress 101 $RESERVE > /dev/null
$BTC_CLI -rpcwallet=reserve sendtoaddress $CLIENT 1 > /dev/null
+ $BTC_CLI -rpcwallet=reserve sendtoaddress $WIRE 1 > /dev/null
$BTC_CLI generatetoaddress 1 $RESERVE > /dev/null
}
@@ -58,8 +60,8 @@ function btc_wire() {
cargo build --bin btc-wire &> /dev/null
target/debug/btc-wire $BTC_DIR &> btc_wire.log &
# Can be used to test db transactions serialization
- #target/debug/btc-wire $BTC_DIR &>> btc_wire.log &
- #target/debug/btc-wire $BTC_DIR &>> btc_wire.log &
+ # target/debug/btc-wire $BTC_DIR &>> btc_wire.log &
+ # target/debug/btc-wire $BTC_DIR &>> btc_wire.log &
}
function gateway() {
diff --git a/script/test_btc_wire.sh b/script/test_btc_wire.sh
@@ -27,7 +27,7 @@ init_btc
echo "Init bitcoin regtest"
setup_btc
next_btc
-check_balance 1.00000000 0.00000000
+check_balance 1.00000000 1.00000000
echo "Start btc-wire"
btc_wire
echo "Start gateway"
@@ -39,7 +39,7 @@ echo "---- Gateway API -----"
echo -n "Making wire transfer to exchange:"
btc-wire-cli -d $BTC_DIR transfer 0.00004
next_btc
-check_balance 0.99995209 0.00004000
+check_balance 0.99995209 1.00004000
echo " OK"
echo -n "Requesting exchange incoming transaction list:"
@@ -53,7 +53,7 @@ taler-exchange-wire-gateway-client \
-a BTC:0.00002 > /dev/null
next_btc # Send transaction
next_btc # Trigger watcher
-check_balance 0.99997209 0.00001801
+check_balance 0.99997209 1.00001801
echo " OK"
echo -n "Requesting exchange's outgoing transaction list:"
diff --git a/script/test_recover_db.sh b/script/test_recover_db.sh
@@ -26,7 +26,7 @@ init_btc
echo "Init bitcoin regtest"
setup_btc
next_btc
-check_balance 1.00000000 0.00000000
+check_balance 1.00000000 1.00000000
echo "Start btc-wire"
btc_wire
echo "Start gateway"
@@ -37,7 +37,7 @@ echo "---- With DB -----"
echo "Making wire transfer to exchange:"
btc-wire-cli -d $BTC_DIR transfer 0.000042
next_btc
-check_balance 0.99995009 0.00004200
+check_balance 0.99995009 1.00004200
echo -n "Requesting exchange incoming transaction list:"
taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -i | grep BTC:0.000042 > /dev/null && echo " OK" || echo " Failed"
@@ -48,7 +48,7 @@ sudo service postgresql stop > /dev/null
echo "Making wire transfer to exchange:"
btc-wire-cli -d $BTC_DIR transfer 0.00004
next_btc
-check_balance 0.99990218 0.00008200
+check_balance 0.99990218 1.00008200
echo -n "Requesting exchange incoming transaction list:"
taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -i 2>&1 | grep -q "504" && echo " OK" || echo " Failed"
@@ -66,7 +66,7 @@ taler-exchange-wire-gateway-client \
-a BTC:0.00002 > /dev/null
next_btc # Send transaction
next_btc # Trigger watcher
-check_balance 0.99992218 0.00006001
+check_balance 0.99992218 1.00006001
echo " OK"
echo -n "Requesting exchange's outgoing transaction list:"
@@ -96,6 +96,6 @@ done
echo ""
# Balance should not have changed
-check_balance 0.99992218 0.00006001
+check_balance 0.99992218 1.00006001
echo "All tests passed"
\ No newline at end of file