commit fa6b17a6f311fef35c92b03643356cf1522e2bc4
parent 46fed50db1edd4e3590fc3d7bcda565907c16b7a
Author: Antoine A <>
Date: Fri, 17 Dec 2021 17:14:31 +0100
Use postgres LISTEN/NOTIFY
Diffstat:
9 files changed, 99 insertions(+), 83 deletions(-)
diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs
@@ -1,4 +1,4 @@
-use bitcoin::{hashes::Hash, Address, Amount as BtcAmount, BlockHash, SignedAmount, Txid, Network};
+use bitcoin::{hashes::Hash, Address, Amount as BtcAmount, BlockHash, Network, SignedAmount, Txid};
use btc_wire::{
rpc::{BtcRpc, Category},
rpc_utils::{default_data_dir, sender_address},
@@ -27,14 +27,14 @@ mod fail_point;
enum Status {
/// Client have ask for a transaction
Proposed = 0,
+ /// The wire is working has started handling this transaction
+ Active = 1,
/// Transaction have been announced to the bitcoin network
- Pending = 1,
- /// Transaction have been mined
- OnChain = 2,
+ Pending = 2,
/// The wire failed to send this transaction and will try later
Delayed = 3,
- /// The wire failed to update the transaction status in the database
- Manual = 4,
+ /// Transaction have been mined and confirmed
+ Confirmed = 4,
}
impl TryFrom<u8> for Status {
@@ -44,9 +44,9 @@ impl TryFrom<u8> for Status {
match v {
x if x == Status::Proposed as u8 => Ok(Status::Proposed),
x if x == Status::Pending as u8 => Ok(Status::Pending),
- x if x == Status::OnChain as u8 => Ok(Status::OnChain),
+ x if x == Status::Confirmed as u8 => Ok(Status::Confirmed),
x if x == Status::Delayed as u8 => Ok(Status::Delayed),
- x if x == Status::Manual as u8 => Ok(Status::Manual),
+ x if x == Status::Active as u8 => Ok(Status::Active),
_ => Err(()),
}
}
@@ -169,25 +169,22 @@ fn last_hash(db: &mut Client) -> Result<Option<BlockHash>, postgres::Error> {
/// Listen for new proposed transactions and announce them on the bitcoin network
fn sender(rpc: BtcRpc, mut db: AutoReloadDb, _config: &Config) {
- // List all transactions waiting to be sent
- fn list_waiting(db: &mut Client) -> Result<Vec<(i32, Status)>, postgres::Error> {
- let mut iter = db.query_raw(
- "SELECT id, status FROM tx_out WHERE status=$1 OR status=$2",
- &[&(Status::Proposed as i16), &(Status::Delayed as i16)],
- )?;
+ // List all transactions with the given status
+ fn list_status(db: &mut Client, status: Status) -> Result<Vec<i32>, postgres::Error> {
+ let mut iter =
+ db.query_raw("SELECT id FROM tx_out WHERE status=$1", &[&(status as i16)])?;
let mut out = Vec::new();
while let Some(row) = iter.next()? {
- let status: i16 = row.get(1);
- out.push((row.get(0), Status::try_from(status as u8).unwrap()));
+ out.push(row.get(0));
}
return Ok(out);
}
// List all transactions waiting to be sent
- fn list_manual(db: &mut Client) -> Result<Vec<(i32, [u8; 32])>, postgres::Error> {
+ fn list_active(db: &mut Client) -> Result<Vec<(i32, [u8; 32])>, postgres::Error> {
let mut iter = db.query_raw(
"SELECT id, wtid FROM tx_out WHERE status=$1",
- &[&(Status::Manual as i16)],
+ &[&(Status::Active as i16)],
)?;
let mut out = Vec::new();
while let Some(row) = iter.next()? {
@@ -208,7 +205,7 @@ fn sender(rpc: BtcRpc, mut db: AutoReloadDb, _config: &Config) {
// We lock the row with FOR UPDATE to prevent sending same transaction multiple time
let iter = tx.query_opt(
"SELECT amount, wtid, credit_acc, exchange_url FROM tx_out WHERE id=$1 AND status=$2 FOR UPDATE",
- &[&id, &(Status::Manual as i16)],
+ &[&id, &(Status::Active as i16)],
)?;
if let Some(row) = iter {
let amount = taler_amount_to_btc_amount(&Amount::from_str(row.get(0))?)?;
@@ -217,10 +214,10 @@ fn sender(rpc: BtcRpc, mut db: AutoReloadDb, _config: &Config) {
let exchange_base_url: Url = Url::parse(row.get(3))?;
let metadata = encode_info(reserve_pub.try_into()?, &exchange_base_url);
- fail_point("Skip send_op_return", 0.3)?;
+ fail_point("Skip send_op_return", 0.2)?;
match rpc.send_op_return(&addr, &amount, &metadata) {
Ok(tx_id) => {
- fail_point("Fail update db", 0.3)?;
+ fail_point("Fail update db", 0.2)?;
tx.execute(
"UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3",
&[&(Status::Pending as i16), &tx_id.as_ref(), &id],
@@ -229,7 +226,6 @@ fn sender(rpc: BtcRpc, mut db: AutoReloadDb, _config: &Config) {
}
Err(e) => {
info!("sender: RPC - {}", e);
- fail_point("Fail update db", 0.3)?;
tx.execute(
"UPDATE tx_out SET status=$1 WHERE id=$2",
&[&(Status::Delayed as i16), &id],
@@ -245,24 +241,27 @@ fn sender(rpc: BtcRpc, mut db: AutoReloadDb, _config: &Config) {
// TODO check if transactions are abandoned
+ let mut failed = false;
loop {
let db = db.client();
let result: Result<(), Box<dyn std::error::Error>> = (|| {
- // Send waiting transactions
- for (id, status) in list_waiting(db)? {
- // Set status to MANUAL to detect database error preventing atomicity
- let nb = db.execute(
- "UPDATE tx_out SET status=$1 WHERE id=$2 AND status=$3",
- &[&(Status::Manual as i16), &id, &(status as i16)],
- )?;
- if nb == 0 {
- warn!("sender: transaction status collision, database have been altered by another process");
+ // Listen to all channels
+ db.batch_execute("LISTEN new_block; LISTEN new_tx")?;
+ // Wait for the next notification
+ {
+ let mut ntf = db.notifications();
+ // On failure retry without waiting for notifications
+ if !failed && ntf.is_empty() {
+ // Block until next notification
+ ntf.blocking_iter().next()?;
}
- perform_send(db, &rpc, id)?;
+ // Conflate all notifications
+ let mut iter = ntf.iter();
+ while let Some(_) = iter.next()? {}
}
- // Try to recover transactions stuck in manual
- let mut manuals = list_manual(db)?;
- if !manuals.is_empty() {
+ // Try to recover transactions stuck in active
+ let mut actives = list_active(db)?;
+ if !actives.is_empty() {
let last_hash = last_hash(db)?;
let txs = rpc.list_since_block(last_hash.as_ref(), 1, false)?;
// Search for a matching unconfirmed transactions
@@ -270,14 +269,14 @@ fn sender(rpc: BtcRpc, mut db: AutoReloadDb, _config: &Config) {
if tx.detail.category == Category::Send {
if let Ok((_, bytes)) = rpc.get_tx_op_return(&tx.txid) {
let (wtid, _) = decode_info(&bytes);
- if let Some(pos) = manuals.iter().position(|(_, it)| it == &wtid) {
- let (id, wtid) = manuals.swap_remove(pos);
+ if let Some(pos) = actives.iter().position(|(_, it)| it == &wtid) {
+ let (id, wtid) = actives.swap_remove(pos);
let nb = db.execute(
"UPDATE tx_out SET status=$1 WHERE wtid=$2 AND status=$3 RETURNING id",
&[
&(Status::Pending as i16),
&wtid.as_ref(),
- &(Status::Manual as i16),
+ &(Status::Active as i16),
],
)?;
if nb > 0 {
@@ -288,16 +287,42 @@ fn sender(rpc: BtcRpc, mut db: AutoReloadDb, _config: &Config) {
}
}
// If nothing match, retry to send
- for (id, _) in manuals {
+ for (id, _) in actives {
perform_send(db, &rpc, id)?;
}
}
+ // Send delayed transactions
+ for id in list_status(db, Status::Delayed)? {
+ // Set status to Active to detect database error preventing atomicity
+ let nb = db.execute(
+ "UPDATE tx_out SET status=$1 WHERE id=$2 AND status=$3",
+ &[&(Status::Active as i16), &id, &(Status::Delayed as i16)],
+ )?;
+ if nb == 0 {
+ warn!("sender: transaction status collision, database have been altered by another process");
+ }
+ perform_send(db, &rpc, id)?;
+ }
+ // Send proposed transactions
+ for id in list_status(db, Status::Proposed)? {
+ // Set status to Active to detect database error preventing atomicity
+ let nb = db.execute(
+ "UPDATE tx_out SET status=$1 WHERE id=$2 AND status=$3",
+ &[&(Status::Active as i16), &id, &(Status::Proposed as i16)],
+ )?;
+ if nb == 0 {
+ warn!("sender: transaction status collision, database have been altered by another process");
+ }
+ perform_send(db, &rpc, id)?;
+ }
Ok(())
})();
if let Err(e) = result {
error!("sender: DB - {}", e);
+ failed = true;
+ } else {
+ failed = false;
}
- std::thread::sleep(Duration::from_millis(300));
}
}
@@ -337,14 +362,14 @@ fn watcher(rpc: BtcRpc, mut db: AutoReloadDb, config: &Config) {
let status: i16 = row.get(0);
let _id: i32 = row.get(1);
let status: Status = Status::try_from(status as u8).unwrap();
- if status != Status::OnChain {
+ if status != Status::Confirmed {
tx.execute(
"UPDATE tx_out SET status=$1 where id=$2",
- &[&(Status::OnChain as i16), &_id],
+ &[&(Status::Confirmed as i16), &_id],
)?;
if status == Status::Proposed {
warn!("watcher: tx {} is present on chain at {} while being in proposed status", _id, id);
- } else if status == Status::Manual {
+ } else if status == Status::Active {
warn!(
"watcher: tx {} have been recovered automatically",
_id
@@ -363,7 +388,7 @@ fn watcher(rpc: BtcRpc, mut db: AutoReloadDb, config: &Config) {
OsRng.fill_bytes(&mut request_uid);
let nb = tx.execute(
"INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING",
- &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref(), &config.base_url.as_ref(), &(Status::OnChain as i16), &id.as_ref(), &request_uid.as_ref()
+ &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref(), &config.base_url.as_ref(), &(Status::Confirmed as i16), &id.as_ref(), &request_uid.as_ref()
],
)?;
if nb > 0 {
@@ -427,6 +452,8 @@ fn watcher(rpc: BtcRpc, mut db: AutoReloadDb, config: &Config) {
};
tx.commit()?;
}
+ // Notify a new block have been scanned
+ db.execute("NOTIFY new_block", &[])?;
Ok(())
})();
if let Err(e) = result {
diff --git a/research.md b/research.md
@@ -11,8 +11,7 @@ Rust client library OpenEthereum - JSON RCP API
## TODO
-- Listen/Notify to replace pooling
-- Add and test bound
+- Add and test bounce
- check if transactions are abandoned
- suicide after n operations
- detect small fork -> warning
diff --git a/script/setup.sh b/script/setup.sh
@@ -41,12 +41,10 @@ function setup_btc() {
RESERVE=`$BTC_CLI -rpcwallet=reserve getnewaddress`
CLIENT=`$BTC_CLI -rpcwallet=client getnewaddress`
WIRE=`$BTC_CLI -rpcwallet=wire getnewaddress`
- $BTC_CLI generatetoaddress 101 $RESERVE > /dev/null
+ mine_btc 101
$BTC_CLI -rpcwallet=reserve sendtoaddress $CLIENT 10 > /dev/null
$BTC_CLI -rpcwallet=reserve sendtoaddress $WIRE 1 > /dev/null
- $BTC_CLI generatetoaddress 1 $RESERVE > /dev/null
mine_btc
- check_balance 10.00000000 1.00000000
}
# Mine blocks
@@ -59,11 +57,9 @@ function next_btc() {
# Mine enough block to confirm previous transactions
mine_btc $CONFIRMATION
# Wait for btc_wire to catch up
- sleep 0.3
+ sleep 0.2
# Mine one more block to trigger btc_wire
mine_btc
- # Wait for btc_wire to catch up
- sleep 0.5
}
# Check client and wire balance
diff --git a/script/test_btc_fail.sh b/script/test_btc_fail.sh
@@ -18,7 +18,7 @@ trap cleanup EXIT
source "${BASH_SOURCE%/*}/setup.sh"
-echo "---- Setup fail -----"
+echo "----- Setup fail -----"
echo "Load config file"
load_config
echo "Reset database"
@@ -33,7 +33,7 @@ echo "Start gateway"
gateway
echo ""
-SEQ="seq 10 99"
+SEQ="seq 10 40"
function check() {
check_delta "$1?delta=-100" "$SEQ"
@@ -54,7 +54,7 @@ check incoming
echo " OK"
echo -n "Check balance:"
-check_balance 9.99438310 1.00490500
+check_balance 9.99897979 1.00077500
echo " OK"
echo "----- Handle outgoing -----"
@@ -66,12 +66,8 @@ for n in `$SEQ`; do
-C payto://bitcoin/$CLIENT \
-a BTC:0.0000$n > /dev/null
done
-sleep 20
-next_btc # Trigger watcher
-sleep 20
-next_btc # Mine transactions
-sleep 20
-next_btc # Mine transactions
+sleep 15
+mine_btc # Mine transactions
echo " OK"
echo -n "Requesting exchange outgoing transaction list:"
@@ -79,5 +75,5 @@ check outgoing
echo " OK"
echo -n "Check balance:"
-check_balance 9.99928810
+check_balance 9.99975479
echo " OK"
\ No newline at end of file
diff --git a/script/test_btc_stress.sh b/script/test_btc_stress.sh
@@ -18,7 +18,7 @@ trap cleanup EXIT
source "${BASH_SOURCE%/*}/setup.sh"
-echo "---- Setup stressed -----"
+echo "----- Setup stressed -----"
echo "Load config file"
load_config
echo "Reset database"
@@ -46,10 +46,9 @@ for n in `$SEQ`; do
btc-wire-cli -d $BTC_DIR transfer 0.0000$n
mine_btc # Mine transactions
done
-next_btc # Trigger btc_wire
-sleep 5
-next_btc # Trigger btc_wire again (for sure)
-sleep 3
+sleep 3 # Give time for btc_wire watcher to process
+next_btc # Confirm all transactions
+sleep 3 # Give time for btc_wire watcher to process
echo " OK"
echo -n "Requesting exchange incoming transaction list:"
@@ -69,12 +68,8 @@ for n in `$SEQ`; do
-C payto://bitcoin/$CLIENT \
-a BTC:0.0000$n > /dev/null
done
+sleep 10 # Give time for btc_wire sender to process
next_btc # Mine transactions
-sleep 5
-next_btc # Trigger watcher twice (for sure)
-sleep 5
-next_btc # Trigger watcher twice (for sure)
-sleep 5
echo " OK"
echo -n "Requesting exchange outgoing transaction list:"
diff --git a/script/test_btc_wire.sh b/script/test_btc_wire.sh
@@ -18,7 +18,7 @@ trap cleanup EXIT
source "${BASH_SOURCE%/*}/setup.sh"
-echo "---- Setup -----"
+echo "----- Setup -----"
echo "Load config file"
load_config
echo "Reset database"
diff --git a/script/test_gateway.sh b/script/test_gateway.sh
@@ -22,7 +22,7 @@ trap cleanup EXIT
source "${BASH_SOURCE%/*}/setup.sh"
ADDRESS=mpTJZxWPerz1Gife6mQSdHT8mMuJK6FP85
-echo "---- Setup -----"
+echo "----- Setup -----"
echo "Load config file"
load_config
echo "Reset database"
@@ -31,7 +31,7 @@ echo "Start gateway"
gateway
echo ""
-echo "---- Gateway API -----"
+echo "----- Gateway API -----"
echo -n "Making wire transfer to exchange:"
for n in `seq 1 9`; do
@@ -66,7 +66,7 @@ for n in `seq 1 9`; do
done
echo " OK"
-echo "---- Endpoint & Method -----"
+echo "----- Endpoint & Method -----"
echo -n "Unknown endpoint:"
test `curl -w %{http_code} -s -o /dev/null ${BANK_ENDPOINT}test` -eq 404 && echo " OK" || echo " Failed"
diff --git a/script/test_recover_db.sh b/script/test_recover_db.sh
@@ -18,7 +18,7 @@ trap cleanup EXIT
source "${BASH_SOURCE%/*}/setup.sh"
-echo "---- Setup -----"
+echo "----- Setup -----"
echo "Load config file"
load_config
echo "Reset database"
@@ -33,7 +33,7 @@ echo "Start gateway"
gateway
echo ""
-echo "---- With DB -----"
+echo "----- With DB -----"
echo "Making wire transfer to exchange:"
btc-wire-cli -d $BTC_DIR transfer 0.000042
next_btc
@@ -41,7 +41,7 @@ check_balance 9.99995009 1.00004200
echo -n "Requesting exchange incoming transaction list:"
taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -i | grep BTC:0.000042 > /dev/null && echo " OK" || echo " Failed"
-echo "---- Without DB -----"
+echo "----- Without DB -----"
echo "Stop database"
sudo service postgresql stop > /dev/null
@@ -52,7 +52,7 @@ check_balance 9.99990218 1.00008200
echo -n "Requesting exchange incoming transaction list:"
taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -i 2>&1 | grep -q "504" && echo " OK" || echo " Failed"
-echo "---- Reconnect DB -----"
+echo "----- Reconnect DB -----"
echo "Start database"
sudo service postgresql start > /dev/null
@@ -73,7 +73,7 @@ echo -n "Requesting exchange's outgoing transaction list:"
taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -o | grep BTC:0.00002 > /dev/null
echo " OK"
-echo "---- Recover DB -----"
+echo "----- Recover DB -----"
echo "Reset database"
reset_db # Clear database tables
diff --git a/wire-gateway/src/main.rs b/wire-gateway/src/main.rs
@@ -188,7 +188,7 @@ async fn router(
ErrorCode::GENERIC_PARAMETER_MALFORMED,
));
}
- let db = state.pool.get().await.catch_code(
+ let mut db = state.pool.get().await.catch_code(
StatusCode::GATEWAY_TIMEOUT,
ErrorCode::GENERIC_DB_FETCH_FAILED,
)?;
@@ -228,9 +228,12 @@ async fn router(
}
let timestamp = Timestamp::now();
- let row = db.query_one("INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, request_uid) VALUES (now(), $1, $2, $3, $4, $5, $6, $7) RETURNING id", &[
+ let tx = db.transaction().await?;
+ let row = tx.query_one("INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, request_uid) VALUES (now(), $1, $2, $3, $4, $5, $6, $7) RETURNING id", &[
&request.amount.to_string(), &request.wtid.as_ref(), &state.config.payto.as_ref(), &request.credit_account.as_ref(), &request.exchange_base_url.as_ref(), &0i16, &request.request_uid.as_ref()
]).await?;
+ tx.execute("NOTIFY new_tx", &[]).await?;
+ tx.commit().await?;
encode_body(
parts,
StatusCode::OK,