commit e21dbc6255dad5c30de11a9622bb4c02af68a4b0
parent d0a58b074225b5c277253b3aae4bcd52fa7714c3
Author: Antoine A <>
Date: Wed, 15 Dec 2021 15:04:00 +0100
Learn how concurrency work in postgresql the hard way
Diffstat:
8 files changed, 121 insertions(+), 57 deletions(-)
diff --git a/btc-wire/Cargo.toml b/btc-wire/Cargo.toml
@@ -3,6 +3,10 @@ name = "btc-wire"
version = "0.1.0"
edition = "2021"
+[features]
+# Enable random failures
+fail = []
+
[dependencies]
# Typed bitcoin json-rpc library
bitcoincore-rpc = "0.14.0"
diff --git a/btc-wire/src/fail_point.rs b/btc-wire/src/fail_point.rs
@@ -0,0 +1,14 @@
+#[cfg(feature = "fail")]
+use taler_log::log::warn;
+
+#[allow(unused_variables)]
+pub fn fail_point(msg: &'static str, prob: f32) -> Result<(), &'static str> {
+ #[cfg(feature = "fail")]
+ return if fastrand::f32() < prob {
+ Err(msg)
+ } else {
+ Ok(())
+ };
+
+ return Ok(());
+}
+\ No newline at end of file
diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs
@@ -10,7 +10,7 @@ use btc_wire::{
segwit::DecodeSegWitErr,
ClientExtended, GetOpReturnErr, GetSegwitErr,
};
-use postgres::{fallible_iterator::FallibleIterator, Client, IsolationLevel, NoTls, Transaction};
+use postgres::{fallible_iterator::FallibleIterator, Client, NoTls, Transaction};
use rand::{rngs::OsRng, RngCore};
use std::{
collections::HashMap,
@@ -22,6 +22,10 @@ use taler_log::log::{error, info, warn};
use url::Url;
use wire_gateway::api_common::{crockford_base32_encode, Amount};
+use crate::fail_point::fail_point;
+
+mod fail_point;
+
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Status {
@@ -182,8 +186,9 @@ fn sender(rpc: RPC, mut db: AutoReloadDb) {
status: Status,
id: i32,
) -> Result<Option<(BtcAmount, Address, Vec<u8>)>, Box<dyn std::error::Error>> {
+ // We lock the row with FOR UPDATE to prevent sending same transaction multiple time
let iter = tx.query_opt(
- "SELECT amount, wtid, credit_acc, exchange_url FROM tx_out WHERE id=$1 AND status=$2",
+ "SELECT amount, wtid, credit_acc, exchange_url FROM tx_out WHERE id=$1 AND status=$2 FOR UPDATE",
&[&id, &(status as i16)],
)?;
Ok(if let Some(row) = iter {
@@ -206,10 +211,7 @@ fn sender(rpc: RPC, mut db: AutoReloadDb) {
for (id, status) in list(&mut db)? {
// Set status to MANUAL to detect database error preventing atomicity
{
- let mut tx = db
- .build_transaction()
- .isolation_level(IsolationLevel::Serializable)
- .start()?;
+ let mut tx = db.transaction()?;
if get(&mut tx, status, id)?.is_some() {
tx.execute(
"UPDATE tx_out SET status=$1 WHERE id=$2",
@@ -222,13 +224,15 @@ fn sender(rpc: RPC, mut db: AutoReloadDb) {
tx.commit()?;
}
{
- let mut tx = db
- .build_transaction()
- .isolation_level(IsolationLevel::Serializable)
- .start()?;
+ let mut tx = db.transaction()?;
if let Some((amount, addr, metadata)) = get(&mut tx, Status::Manual, id)? {
+ if let Err(err) = fail_point("Skip send_op_return", 0.4) {
+ error!("sender: fail - {}", err);
+ continue;
+ }
match rpc.send_op_return(&addr, amount, &metadata) {
Ok(tx_id) => {
+ fail_point("Fail update db", 0.5)?;
tx.execute(
"UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3",
&[&(Status::Pending as i16), &tx_id.as_ref(), &id],
@@ -237,6 +241,7 @@ fn sender(rpc: RPC, mut db: AutoReloadDb) {
}
Err(e) => {
info!("sender: RPC - {}", e);
+ fail_point("Fail update db", 0.5)?;
tx.execute(
"UPDATE tx_out SET status=$1 WHERE id=$2",
&[&(Status::Delayed as i16), &id],
@@ -246,7 +251,10 @@ fn sender(rpc: RPC, mut db: AutoReloadDb) {
} else {
warn!("sender: transaction status collision, database have been altered by another process");
}
- tx.commit()?;
+ tx.commit().map_err(|e| {
+ warn!("### --- COMMIT FAILED --- ###");
+ return e;
+ })?;
}
}
Ok(())
@@ -292,10 +300,7 @@ fn watcher(rpc: RPC, mut db: AutoReloadDb, config: &Config) {
Category::Send => match rpc.get_tx_op_return(&id) {
Ok((full, bytes)) => {
let (wtid, url) = decode_info(&bytes);
- let mut tx = db
- .build_transaction()
- .isolation_level(IsolationLevel::Serializable)
- .start()?;
+ let mut tx = db.transaction()?;
let row = tx.query_opt(
"SELECT status, id FROM tx_out WHERE wtid=$1 FOR UPDATE",
&[&wtid.as_ref()],
@@ -345,10 +350,7 @@ fn watcher(rpc: RPC, mut db: AutoReloadDb, config: &Config) {
},
Category::Receive => match rpc.get_tx_segwit_key(&id) {
Ok((full, reserve_pub)) => {
- let mut tx = db
- .build_transaction()
- .isolation_level(IsolationLevel::Serializable)
- .start()?;
+ let mut tx = db.transaction()?;
let row = tx.query_opt(
"SELECT id FROM tx_in WHERE reserve_pub=$1 FOR UPDATE",
&[&reserve_pub.as_ref()],
@@ -379,10 +381,7 @@ fn watcher(rpc: RPC, mut db: AutoReloadDb, config: &Config) {
}
// Move last_hash forward if no error have been caught
{
- let mut tx = db
- .build_transaction()
- .isolation_level(IsolationLevel::Serializable)
- .start()?;
+ let mut tx = db.transaction()?;
let curr_hash: Option<BlockHash> = tx
.query_opt(
"SELECT value FROM state WHERE name='last_hash' FOR UPDATE",
@@ -445,6 +444,9 @@ impl Config {
fn main() {
taler_log::init();
+ #[cfg(feature = "fail")]
+ taler_log::log::warn!("Running with random failures is unsuitable for production");
+
// Guess network by trying to connect to a JSON RPC server
let data_dir = std::env::args()
.skip(1)
diff --git a/script/setup.sh b/script/setup.sh
@@ -80,12 +80,12 @@ function btc_wire() {
target/debug/btc-wire $BTC_DIR &> btc_wire.log &
}
-# Start multiple btc_wire in parralel in fail mode
+# Start multiple btc_wire in parallel
function stressed_btc_wire() {
cargo build --bin btc-wire &> /dev/null
target/debug/btc-wire $BTC_DIR &> btc_wire.log &
- target/debug/btc-wire $BTC_DIR &>> btc_wire.log &
- target/debug/btc-wire $BTC_DIR &>> btc_wire.log &
+ target/debug/btc-wire $BTC_DIR &> btc_wire1.log &
+ target/debug/btc-wire $BTC_DIR &> btc_wire2.log &
}
# Start wire_gateway in test mode
@@ -97,4 +97,22 @@ function gateway() {
sleep 0.2
curl -s $BANK_ENDPOINT -o /dev/null && break
done
+}
+
+# Check history endpoint request return a specific amount of transactions of specific amounts
+# usage: check_delta endpoint nb_txs amount_sequence
+function check_delta() {
+ ALL=`curl -s ${BANK_ENDPOINT}history/$1`
+ for n in `$2`; do
+ if ! `echo $ALL | grep BTC:0.0000$n > /dev/null`; then
+ echo -n " missing tx with amount: BTC:0.0000$n"
+ return 1
+ fi
+ done
+ NB=`echo $ALL | grep -o BTC | wc -l`
+ EXPECTED=`$2 | wc -w`
+ if [ "$EXPECTED" != "$NB" ]; then
+ echo -n " expected: $EXPECTED txs found $NB"
+ return 1
+ fi
}
\ No newline at end of file
diff --git a/script/test_btc_fail.sh b/script/test_btc_fail.sh
diff --git a/script/test_btc_stress.sh b/script/test_btc_stress.sh
@@ -1,5 +1,7 @@
#!/bin/bash
+# Test btc_wire behavior
+
set -eu
# Cleanup to run whenever we exit
@@ -16,7 +18,7 @@ trap cleanup EXIT
source "${BASH_SOURCE%/*}/setup.sh"
-echo "---- Setup -----"
+echo "---- Setup stressed -----"
echo "Load config file"
load_config
echo "Reset database"
@@ -31,24 +33,52 @@ echo "Start gateway"
gateway
echo ""
+SEQ="seq 10 99"
+
+function check() {
+ check_delta "$1?delta=-100" "$SEQ"
+}
+
echo "----- Generate many transactions -----"
echo -n "Making wire transfer to exchange:"
-for n in `seq 10 99`; do
+for n in `$SEQ`; do
btc-wire-cli -d $BTC_DIR transfer 0.0000$n
mine_btc # Mine transactions
done
next_btc # Trigger btc_wire
-next_btc # Trigger watcher twice, never sure
-check_balance 9.99438310 1.00490500
echo " OK"
echo -n "Requesting exchange incoming transaction list:"
-ALL=`curl -s ${BANK_ENDPOINT}history/incoming?delta=-100`
-test `echo $ALL | grep -o BTC | wc -l` -eq 90 || exit 1;
-for n in `seq 10 99`; do
- echo $ALL | grep BTC:0.0000$n > /dev/null || exit 1;
+check incoming
+echo " OK"
+
+echo -n "Check balance:"
+check_balance 9.99438310 1.00490500
+echo " OK"
+
+echo -n "Making wire transfer from exchange:"
+for n in `$SEQ`; do
+ taler-exchange-wire-gateway-client \
+ -b $BANK_ENDPOINT \
+ -C payto://bitcoin/$CLIENT \
+ -a BTC:0.0000$n > /dev/null
done
+next_btc # Mine transactions
+sleep 10
+next_btc # Trigger watcher twice, never sure
+sleep 10
+next_btc # Trigger watcher twice, never sure
+sleep 10
+next_btc # Trigger watcher twice, never sure
+echo " OK"
+
+echo -n "Requesting exchange outgoing transaction list:"
+check outgoing
+echo " OK"
+
+echo -n "Check balance:"
+check_balance 9.99928810 0.99981936
echo " OK"
echo "----- Recover DB -----"
@@ -59,11 +89,16 @@ next_btc # Trigger watcher
sleep 2
echo -n "Requesting exchange incoming transaction list:"
-ALL=`curl -s ${BANK_ENDPOINT}history/incoming?delta=-100`
-test `echo $ALL | grep -o BTC | wc -l` -eq 90 || exit 1;
-for n in `seq 10 99`; do
- echo $ALL | grep BTC:0.0000$n > /dev/null || exit 1;
-done
+check incoming
+echo " OK"
+
+echo -n "Requesting exchange outgoing transaction list:"
+check outgoing
+echo " OK"
+
+echo -n "Check balance:"
+# Balance should not have changed
+check_balance 9.99928810 0.99981936
echo " OK"
echo "All tests passed"
\ No newline at end of file
diff --git a/script/test_gateway.sh b/script/test_gateway.sh
@@ -93,24 +93,14 @@ echo ""
echo "----- History delta -----"
-# Check history endpoint request return a specific amount of transactions of specific amounts
-# usage: check_delta endpoint nb_txs amount_sequence
-function check_delta() {
- ALL=`curl -s ${BANK_ENDPOINT}history/$1`
- test `echo $ALL | grep -o BTC | wc -l` -eq $2 || return 1;
- for n in `$3`; do
- echo $ALL | grep BTC:0.0000$n > /dev/null || return 1;
- done
-}
-
for endpoint in incoming outgoing; do
echo -n "History $endpoint:"
- check_delta ${endpoint}?delta=-9 9 "seq 1 9" && echo -n " OK" || echo -n " Failed"
- check_delta ${endpoint}?delta=9 9 "seq 1 9" && echo -n " OK" || echo -n " Failed"
- check_delta ${endpoint}?delta=-4 4 "seq 6 9" && echo -n " OK" || echo -n " Failed"
- check_delta ${endpoint}?delta=4 4 "seq 1 4" && echo -n " OK" || echo -n " Failed"
- check_delta "${endpoint}?delta=-3&start=5" 3 "seq 2 4" && echo -n " OK" || echo -n " Failed"
- check_delta "${endpoint}?delta=3&start=4" 3 "seq 5 7" && echo -n " OK" || echo -n " Failed"
+ check_delta ${endpoint}?delta=-9 "seq 1 9" && echo -n " OK" || echo -n " Failed"
+ check_delta ${endpoint}?delta=9 "seq 1 9" && echo -n " OK" || echo -n " Failed"
+ check_delta ${endpoint}?delta=-4 "seq 6 9" && echo -n " OK" || echo -n " Failed"
+ check_delta ${endpoint}?delta=4 "seq 1 4" && echo -n " OK" || echo -n " Failed"
+ check_delta "${endpoint}?delta=-3&start=5" "seq 2 4" && echo -n " OK" || echo -n " Failed"
+ check_delta "${endpoint}?delta=3&start=4" "seq 5 7" && echo -n " OK" || echo -n " Failed"
echo ""
done
diff --git a/test.conf b/test.conf
@@ -4,4 +4,4 @@ DB_URL = postgres://localhost/postgres?user=postgres&password=password
PORT = 8060
PAYTO = payto://bitcoin/bcrt1qgkgxkjj27g3f7s87mcvjjsghay7gh34cx39prj
ADDRESS = mpTJZxWPerz1Gife6mQSdHT8mMuJK6FP85
-CONFIRMATION = 6
-\ No newline at end of file
+CONFIRMATION = 1
+\ No newline at end of file