commit b3fd5aa4ca8dc4006ebb82539be944e5caaf8e31
parent b4b8c17004c45eb0227cff415d6e2868a70644f6
Author: Antoine A <>
Date: Tue, 14 Dec 2021 13:46:46 +0100
btc-wire: Handle potential db concurrence
Diffstat:
4 files changed, 51 insertions(+), 51 deletions(-)
diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs
@@ -11,7 +11,7 @@ use btc_wire::{
ClientExtended, GetOpReturnErr, GetSegwitErr,
};
use configparser::ini::Ini;
-use postgres::{fallible_iterator::FallibleIterator, Client, NoTls};
+use postgres::{fallible_iterator::FallibleIterator, Client, IsolationLevel, NoTls, Transaction};
use rand::{rngs::OsRng, RngCore};
use std::{
collections::HashMap,
@@ -126,7 +126,7 @@ mod test {
/// Listen for new proposed transactions and announce them on the bitcoin network
fn sender(rpc: RPC, mut db: AutoReloadDb) {
fn get_proposed(
- db: &mut Client,
+ db: &mut Transaction,
) -> Result<Vec<(i32, BtcAmount, Address, Vec<u8>)>, Box<dyn std::error::Error>> {
let mut iter = db.query_raw(
"SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 OR status=$2",
@@ -150,33 +150,39 @@ fn sender(rpc: RPC, mut db: AutoReloadDb) {
return Ok(out);
}
+ // TODO check if transactions are abandoned
+
loop {
let db = db.client();
let result: Result<(), Box<dyn std::error::Error>> = (|| {
- for (id, amount, addr, metadata) in get_proposed(db)? {
+ // We should be the only one to interact with the database but we enforce it
+ let mut tx = db
+ .build_transaction()
+ .isolation_level(IsolationLevel::Serializable)
+ .start()?;
+ for (id, amount, addr, metadata) in get_proposed(&mut tx)? {
+ tx.execute(
+ "UPDATE tx_out SET status=$1 WHERE id=$2",
+ &[&(Status::Delayed as i16), &id],
+ )?;
match rpc.send_op_return(&addr, amount, &metadata) {
Ok(txid) => {
- db.execute(
+ tx.execute(
"UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3",
&[&(Status::Pending as i16), &txid.as_ref(), &id],
)?;
info!("{} PENDING", txid);
}
- Err(e) => {
- info!("sender: RPC - {}", e);
- db.execute(
- "UPDATE tx_out SET status=$1 WHERE id=$3",
- &[&(Status::Delayed as i16), &id],
- )?;
- }
+ Err(e) => info!("sender: RPC - {}", e),
}
}
+ tx.commit()?;
Ok(())
})();
if let Err(e) = result {
error!("sender: DB - {}", e);
}
- std::thread::sleep(Duration::from_millis(300));
+ std::thread::sleep(Duration::from_millis(rand::random::<u8>() as u64));
}
}
@@ -219,22 +225,20 @@ impl AutoReloadDb {
/// Listen for mined block and index confirmed transactions into the database
fn watcher(rpc: RPC, mut db: AutoReloadDb, config: &Config) {
- let mut last_hash: Option<BlockHash> = {
- let db = db.client();
- let stored_hash = db
- .query("SELECT value FROM state WHERE name='last_hash'", &[])
- .unwrap();
- if stored_hash.len() == 1 {
- Some(BlockHash::from_slice(stored_hash[0].get(0)).unwrap())
- } else {
- None
- }
- };
let confirmation = 1;
loop {
let db = db.client();
+
let result: Result<(), Box<dyn std::error::Error>> = (|| {
+ // We should be the only one to interact with the database but we enforce it
+ let mut tx = db
+ .build_transaction()
+ .isolation_level(IsolationLevel::Serializable)
+ .start()?;
+ let last_hash: Option<BlockHash> = tx
+ .query_opt("SELECT value FROM state WHERE name='last_hash'", &[])?
+ .map(|r| BlockHash::from_slice(r.get(0)).unwrap());
let list =
rpc.list_since_block(last_hash.as_ref(), Some(confirmation), None, Some(true))?;
@@ -255,7 +259,7 @@ fn watcher(rpc: RPC, mut db: AutoReloadDb, config: &Config) {
Category::Send => match rpc.get_tx_op_return(&id) {
Ok((full, bytes)) => {
let (wtid, url) = decode_info(&bytes);
- let row = db.query_opt(
+ let row = tx.query_opt(
"SELECT status, wtid, id FROM tx_out WHERE txid=$1",
&[&id.as_ref()],
)?;
@@ -269,7 +273,7 @@ fn watcher(rpc: RPC, mut db: AutoReloadDb, config: &Config) {
}
let status: Status = Status::try_from(status as u8).unwrap();
if status != Status::Confirmed {
- db.execute(
+ tx.execute(
"UPDATE tx_out SET status=$1 where id=$2",
&[&(Status::Confirmed as i16), &_id],
)?;
@@ -284,12 +288,13 @@ fn watcher(rpc: RPC, mut db: AutoReloadDb, config: &Config) {
let credit_addr = full.tx.details[0].address.as_ref().unwrap();
let time = full.tx.info.blocktime.unwrap();
let date = SystemTime::UNIX_EPOCH + Duration::from_secs(time);
- let amount =
- btc_amount_to_taler_amount(&full.tx.amount.abs().to_unsigned()?);
+ let amount = btc_amount_to_taler_amount(
+ &full.tx.amount.abs().to_unsigned()?,
+ );
// Generate a random request_uid
let mut request_uid = [0; 64];
OsRng.fill_bytes(&mut request_uid);
- db.execute(
+ tx.execute(
"INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
&[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).to_string(), &btc_payto_url(&credit_addr).to_string(), &config.base_url.to_string(), &(Status::Confirmed as i16), &id.as_ref(), &request_uid.as_ref()
],
@@ -309,7 +314,7 @@ fn watcher(rpc: RPC, mut db: AutoReloadDb, config: &Config) {
let time = full.tx.info.blocktime.unwrap();
let date = SystemTime::UNIX_EPOCH + Duration::from_secs(time);
let amount = btc_amount_to_taler_amount(&full.tx.amount.to_unsigned()?);
- db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5)", &[
+ tx.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5)", &[
&date, &amount.to_string(), &reserve_pub.as_ref(), &btc_payto_url(&debit_addr).to_string(), &btc_payto_url(&credit_addr).to_string()
])?;
info!("{} << {} {}", &debit_addr, &credit_addr, &amount);
@@ -330,15 +335,15 @@ fn watcher(rpc: RPC, mut db: AutoReloadDb, config: &Config) {
} else {
"INSERT INTO state (name, value) VALUES ('last_hash', $1)"
};
- db.execute(query, &[&list.lastblock.as_ref()])?;
- last_hash = Some(list.lastblock);
+ tx.execute(query, &[&list.lastblock.as_ref()])?;
+ tx.commit()?;
Ok(())
})();
if let Err(e) = result {
- error!("sender: DB - {}", e);
+ error!("watcher: DB - {}", e);
}
- info!("sender: Wait for block");
+ info!("watcher: Wait for block");
rpc.wait_for_new_block(0).ok();
}
}
diff --git a/script/setup.sh b/script/setup.sh
@@ -56,7 +56,10 @@ function check_balance() {
function btc_wire() {
cargo build --bin btc-wire &> /dev/null
- target/debug/btc-wire $BTC_DIR &> btc_wire.log &
+ target/debug/btc-wire $BTC_DIR &>> btc_wire.log &
+ # Can be used to test serialization
+ # target/debug/btc-wire $BTC_DIR &>> btc_wire.log &
+ # target/debug/btc-wire $BTC_DIR &>> btc_wire.log &
}
function gateway() {
diff --git a/script/test_btc_wire.sh b/script/test_btc_wire.sh
@@ -51,8 +51,9 @@ taler-exchange-wire-gateway-client \
-b $BANK_ENDPOINT \
-C payto://bitcoin/$CLIENT \
-a BTC:0.00002 > /dev/null
-next_btc
-check_balance 0.99995209 0.00001801
+next_btc # Send transaction
+next_btc # Trigger watcher
+check_balance 0.99997209 0.00001801
echo " OK"
echo -n "Requesting exchange's outgoing transaction list:"
diff --git a/script/test_recover_db.sh b/script/test_recover_db.sh
@@ -64,8 +64,8 @@ taler-exchange-wire-gateway-client \
-b $BANK_ENDPOINT \
-C payto://bitcoin/$CLIENT \
-a BTC:0.00002 > /dev/null
-next_btc
-next_btc
+next_btc # Send transaction
+next_btc # Trigger watcher
check_balance 0.99992218 0.00006001
echo " OK"
@@ -73,22 +73,13 @@ echo -n "Requesting exchange's outgoing transaction list:"
taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -o | grep BTC:0.00002 > /dev/null
echo " OK"
-echo "----- Destroy DB ------"
+echo "---- Recover DB -----"
-echo "Stop all jobs"
-for n in `jobs -p`; do
- kill $n || true
-done
echo "Reset database"
-reset_db
-echo "Start bitcoin node"
-restart_btc
-echo "Start btc-wire"
-btc_wire
-echo "Start gateway"
-gateway
+reset_db # Clear database tables
+next_btc # Trigger watcher
-echo "---- Recover DB -----"
+sleep 2
echo -n "Checking recover incoming transactions:"
ALL=`taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -i`;