depolymerization

wire gateway for Bitcoin/Ethereum
Log | Files | Refs | Submodules | README | LICENSE

commit c9693b859e564174db47c60859742a0ee314ab1b
parent 656f4092bd42002e33362cb2aba75cadd66c9716
Author: Antoine A <>
Date:   Wed, 12 Jan 2022 16:47:47 +0100

btc-wire: reduce the excessive amount of row locking

Diffstat:
Mbtc-wire/src/main.rs | 95+++++++++++++++++++++++++++++++++++++------------------------------------------
Mbtc-wire/src/reconnect.rs | 6+++---
Mtest/btc/stress.sh | 6+++---
Mtest/common.sh | 1+
4 files changed, 51 insertions(+), 57 deletions(-)

diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs @@ -257,30 +257,25 @@ fn sync_chain( } // Move last_hash forward if no error have been caught { - let mut tx = db.transaction()?; - let curr_hash: Option<BlockHash> = tx - .query_opt( - "SELECT value FROM state WHERE name='last_hash' FOR UPDATE", - &[], - )? + let curr_hash: Option<BlockHash> = db + .query_opt("SELECT value FROM state WHERE name='last_hash'", &[])? .map(|r| BlockHash::from_slice(r.get(0)).unwrap()); - if last_hash != curr_hash { - error!("watcher: hash state collision, database have been altered by another process"); - } - - if curr_hash.is_some() { - tx.execute( - "UPDATE state SET value=$1 WHERE name='last_hash'", - &[&lastblock.as_ref()], - )?; + let nb_row = if let Some(hash) = &curr_hash { + db.execute( + "UPDATE state SET value=$1 WHERE name='last_hash' AND value=$2", + &[&lastblock.as_ref(), &hash.as_ref()], + )? } else { - tx.execute( - "INSERT INTO state (name, value) VALUES ('last_hash', $1)", + db.execute( + "INSERT INTO state (name, value) VALUES ('last_hash', $1) ON CONFLICT (name) DO NOTHING", &[&lastblock.as_ref()], - )?; + )? }; - tx.commit()?; + + if last_hash != curr_hash || nb_row == 0 { + error!("watcher: hash state collision, database have been altered by another process"); + } } Ok(()) } @@ -391,8 +386,7 @@ fn sync_chain_outgoing( } } else { // Get previous out tx - let mut tx = db.transaction()?; - let row = tx.query_opt( + let row = db.query_opt( "SELECT id, status FROM tx_out WHERE wtid=$1 FOR UPDATE", &[&wtid.as_ref()], )?; @@ -402,18 +396,19 @@ fn sync_chain_outgoing( let status: i16 = row.get(1); match TxStatus::try_from(status as u8).unwrap() { TxStatus::Requested | TxStatus::Delayed => { - tx.execute( - "UPDATE tx_out SET status=$1 where id=$2", - &[&(TxStatus::Sent as i16), &_id], + let nb_row = db.execute( + "UPDATE tx_out SET status=$1 WHERE id=$2 AND status=$3", + &[&(TxStatus::Sent as i16), &_id, &status], )?; - tx.commit()?; - warn!( - ">> (recovered) {} {} in {} to {}", - amount, - base32(&wtid), - id, - credit_addr - ); + if nb_row > 0 { + warn!( + ">> (recovered) {} {} in {} to {}", + amount, + base32(&wtid), + id, + credit_addr + ); + } } TxStatus::Sent => { /* Status is correct */ } } @@ -421,11 +416,10 @@ fn sync_chain_outgoing( // Else add to database let debit_addr = sender_address(rpc, &full)?; let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time); - let nb = tx.execute( - "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING", - &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref(), &config.base_url.as_ref(), &(TxStatus::Sent as i16), &id.as_ref(), &None::<&[u8]>], - )?; - tx.commit()?; + let nb = db.execute( + "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING", + &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref(), &config.base_url.as_ref(), &(TxStatus::Sent as i16), &id.as_ref(), &None::<&[u8]>], + )?; if nb > 0 { warn!( ">> (onchain) {} {} in {} to {}", @@ -450,9 +444,8 @@ fn sync_chain_outgoing( } } else { // Get previous bounce - let mut tx = db.transaction()?; - let row = tx.query_opt( - "SELECT id, status FROM bounce WHERE bounced=$1 FOR UPDATE", + let row = db.query_opt( + "SELECT id, status FROM bounce WHERE bounced=$1", &[&bounced.as_ref()], )?; if let Some(row) = row { @@ -461,12 +454,13 @@ fn sync_chain_outgoing( let status: i16 = row.get(1); match BounceStatus::try_from(status as u8).unwrap() { BounceStatus::Requested | BounceStatus::Delayed => { - tx.execute( - "UPDATE bounce SET status=$1 where id=$2", - &[&(BounceStatus::Sent as i16), &_id], + let nb_row = db.execute( + "UPDATE bounce SET status=$1 WHERE id=$2 AND status=$3", + &[&(BounceStatus::Sent as i16), &_id, &status], )?; - tx.commit()?; - warn!("|| (recovered) {} in {}", &bounced, &id); + if nb_row > 0 { + warn!("|| (recovered) {} in {}", &bounced, &id); + } } BounceStatus::Ignored => error!( "watcher: ignored bounce {} found in chain at {}", @@ -476,11 +470,10 @@ fn sync_chain_outgoing( } } else { // Else add to database - let nb = tx.execute( - "INSERT INTO bounce (bounced, txid, status) VALUES ($1, $2, $3) ON CONFLICT (txid) DO NOTHING", - &[&bounced.as_ref(), &id.as_ref(), &(BounceStatus::Sent as i16)], - )?; - tx.commit()?; + let nb = db.execute( + "INSERT INTO bounce (bounced, txid, status) VALUES ($1, $2, $3) ON CONFLICT (txid) DO NOTHING", + &[&bounced.as_ref(), &id.as_ref(), &(BounceStatus::Sent as i16)], + )?; if nb > 0 { warn!("|| (onchain) {} in {}", &bounced, &id); } @@ -548,7 +541,7 @@ fn block_listener(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql) { Ok(()) })(); if let Err(e) = result { - error!("listener - {}", e); + error!("listener: {}", e); } } } diff --git a/btc-wire/src/reconnect.rs b/btc-wire/src/reconnect.rs @@ -32,12 +32,12 @@ impl AutoReconnectRPC { Ok(mut new) => match new.net_info() { Ok(_) => return new, Err(err) => { - error!("connect: RPC - {}", err); + error!("connect RPC: {}", err); std::thread::sleep(RECONNECT_DELAY); } }, Err(err) => { - error!("connect: RPC - {}", err); + error!("connect RPC: {}", err); std::thread::sleep(RECONNECT_DELAY); } } @@ -74,7 +74,7 @@ impl AutoReconnectSql { match Client::connect(config, NoTls) { Ok(new) => return new, Err(err) => { - error!("connect: DB - {}", err); + error!("connect DB: {}", err); std::thread::sleep(RECONNECT_DELAY); } } diff --git a/test/btc/stress.sh b/test/btc/stress.sh @@ -97,9 +97,9 @@ for n in `$SEQ`; do mine_btc done next_btc -sleep 10 +sleep 7 mine_btc -sleep 10 +sleep 5 echo " OK" echo -n "Check balance:" @@ -111,7 +111,7 @@ echo "----- Recover DB -----" echo "Reset database" reset_db # Clear database tables mine_btc # Trigger worker -sleep 20 +sleep 10 echo -n "Requesting exchange incoming transaction list:" check_delta "incoming?delta=-100" "$SEQ" "0.000" diff --git a/test/common.sh b/test/common.sh @@ -202,6 +202,7 @@ function stressed_btc_wire() { cargo build --bin btc-wire --release &> /dev/null target/release/btc-wire $CONF &>> log/btc_wire.log & target/release/btc-wire $CONF &>> log/btc_wire1.log & + target/release/btc-wire $CONF &>> log/btc_wire2.log & } # ----- Gateway ------ #