depolymerization

wire gateway for Bitcoin/Ethereum
Log | Files | Refs | Submodules | README | LICENSE

commit 4b9a168bc7e97e48f1d11175a51da426f1cfe178
parent c016404cd3dd43b085c7f84fd137258bf27ee0f1
Author: Antoine A <>
Date:   Mon, 13 Dec 2021 17:48:37 +0100

Handle and test losing database connection in btc-wire

Diffstat:
MCargo.lock | 1+
Mbtc-wire/Cargo.toml | 2++
Mbtc-wire/src/main.rs | 238++++++++++++++++++++++++++++++++++++++++++++++++-------------------------------
Ascript/setup.sh | 62++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mscript/test_btc_wire.sh | 61+++++++++++++------------------------------------------------
Mscript/test_gateway.sh | 20++++++--------------
Mscript/test_recover_db.sh | 64+++++++++++++++++++++++++++++++++++++++-------------------------
7 files changed, 269 insertions(+), 179 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -187,6 +187,7 @@ dependencies = [ "postgres", "rand", "serde", + "taler-log", "thiserror", "uri-pack", "url", diff --git a/btc-wire/Cargo.toml b/btc-wire/Cargo.toml @@ -30,6 +30,8 @@ wire-gateway = { path = "../wire-gateway" } owo-colors = "3.1.1" # Ini files configparser = "3.0.0" +# Taler logging +taler-log = { path = "../taler-log" } [dev-dependencies] # statistics-driven micro-benchmarks diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs @@ -15,10 +15,10 @@ use postgres::{fallible_iterator::FallibleIterator, Client, NoTls}; use std::{ collections::HashMap, path::PathBuf, - process::exit, str::FromStr, time::{Duration, SystemTime}, }; +use taler_log::log::{error, info, warn}; use url::Url; use wire_gateway::api_common::Amount; @@ -107,7 +107,7 @@ mod test { } /// Listen for new proposed transactions and announce them on the bitcoin network -fn sender(rpc: RPC, mut db: Client) { +fn sender(rpc: RPC, mut db: AutoReloadDb) { fn get_proposed( db: &mut Client, ) -> Result<Vec<(i32, BtcAmount, Address, Vec<u8>)>, Box<dyn std::error::Error>> { @@ -133,119 +133,173 @@ fn sender(rpc: RPC, mut db: Client) { return Ok(out); } - let result: Result<(), Box<dyn std::error::Error>> = (|| loop { - for (id, amount, addr, metadata) in get_proposed(&mut db)? { - match rpc.send_op_return(&addr, amount, &metadata) { - Ok(txid) => { - db.execute( - "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3", - &[&(Status::Pending as i16), &txid.as_ref(), &id], - )?; - println!("{} PENDING", txid); - } - Err(e) => { - println!("sender: RPC - {}", e); - db.execute( - "UPDATE tx_out SET status=$1 WHERE id=$3", - &[&(Status::Delayed as i16), &id], - )?; + loop { + let db = db.client(); + let result: Result<(), Box<dyn std::error::Error>> = (|| { + for (id, amount, addr, metadata) in get_proposed(db)? { + match rpc.send_op_return(&addr, amount, &metadata) { + Ok(txid) => { + db.execute( + "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3", + &[&(Status::Pending as i16), &txid.as_ref(), &id], + )?; + info!("{} PENDING", txid); + } + Err(e) => { + info!("sender: RPC - {}", e); + db.execute( + "UPDATE tx_out SET status=$1 WHERE id=$3", + &[&(Status::Delayed as i16), &id], + )?; + } } } + Ok(()) + })(); + if let Err(e) = result { + error!("sender: DB - {}", e); } std::thread::sleep(Duration::from_millis(300)); - })(); - if let Err(e) = result { - eprintln!("sender: DB - {}", e); - exit(1); } - unreachable!("Sender should only exit on error") +} + +struct AutoReloadDb { + delay: Duration, + config: String, + client: Client, +} + +impl AutoReloadDb { + pub fn new(config: impl Into<String>, delay: Duration) -> Self { + let config: String = config.into(); + Self { + client: Self::connect(&config, delay), + config, + delay, + } + } + + /// Connect a new client, loop on error + fn connect(config: &str, delay: Duration) -> Client { + loop { + match Client::connect(config, NoTls) { + Ok(new) => + return new, + + Err(err) => { + error!("connect: DB - {}", err); + std::thread::sleep(delay); + } + } + } + } + + pub fn client(&mut self) -> &mut Client { + if self.client.is_valid(self.delay).is_err() { + self.client = Self::connect(&self.config, self.delay); + } + &mut self.client + } } /// Listen for mined block and index confirmed transactions into the database -fn watcher(rpc: RPC, mut db: Client) -> Result<(), Box<dyn std::error::Error>> { - let stored_hash = db - .query("SELECT value FROM state WHERE name='last_hash'", &[]) - .unwrap(); - let mut last_hash: Option<BlockHash> = if stored_hash.len() == 1 { - Some(BlockHash::from_slice(stored_hash[0].get(0)).unwrap()) - } else { - None +fn watcher(rpc: RPC, mut db: AutoReloadDb) { + let mut last_hash: Option<BlockHash> = { + let db = db.client(); + let stored_hash = db + .query("SELECT value FROM state WHERE name='last_hash'", &[]) + .unwrap(); + if stored_hash.len() == 1 { + Some(BlockHash::from_slice(stored_hash[0].get(0)).unwrap()) + } else { + None + } }; let confirmation = 1; loop { - let list = - rpc.list_since_block(last_hash.as_ref(), Some(confirmation), None, Some(true))?; + let db = db.client(); + let result: Result<(), Box<dyn std::error::Error>> = (|| { + let list = + rpc.list_since_block(last_hash.as_ref(), Some(confirmation), None, Some(true))?; - // List all confirmed send and receive transactions since last check - let txs: HashMap<Txid, Category> = list - .transactions - .into_iter() - .filter_map(|tx| { - let cat = tx.detail.category; - (tx.info.confirmations >= confirmation as i32 - && (cat == Category::Send || cat == Category::Receive)) - .then(|| (tx.info.txid, cat)) - }) - .collect(); + // List all confirmed send and receive transactions since last check + let txs: HashMap<Txid, Category> = list + .transactions + .into_iter() + .filter_map(|tx| { + let cat = tx.detail.category; + (tx.info.confirmations >= confirmation as i32 + && (cat == Category::Send || cat == Category::Receive)) + .then(|| (tx.info.txid, cat)) + }) + .collect(); - for (id, category) in txs { - match category { - Category::Send => match rpc.get_tx_op_return(&id) { - Ok(_) => { - let nb_rows = db.execute( - "UPDATE tx_out SET status=$1 WHERE status=$2 AND txid=$3", - &[ - &(Status::Confirmed as i16), - &(Status::Pending as i16), - &id.as_ref(), - ], - )?; - // Check already confirmed - if nb_rows > 0 { - println!("{} CONFIRMED", &id); + for (id, category) in txs { + match category { + Category::Send => match rpc.get_tx_op_return(&id) { + Ok(_) => { + let nb_rows = db.execute( + "UPDATE tx_out SET status=$1 WHERE status=$2 AND txid=$3", + &[ + &(Status::Confirmed as i16), + &(Status::Pending as i16), + &id.as_ref(), + ], + )?; + // Check already confirmed + if nb_rows > 0 { + info!("{} CONFIRMED", &id); + } } - } - Err(err) => match err { - GetOpReturnErr::MissingOpReturn => {} // ignore - err => println!("send: {} {}", id, err), + Err(err) => match err { + GetOpReturnErr::MissingOpReturn => {} // ignore + err => warn!("send: {} {}", id, err), + }, }, - }, - Category::Receive => match rpc.get_tx_segwit_key(&id) { - Ok((full, reserve_pub)) => { - let debit_addr = sender_address(&rpc, &full)?; - let credit_addr = full.tx.details[0].address.as_ref().unwrap(); - let time = full.tx.info.blocktime.unwrap(); - let date = SystemTime::UNIX_EPOCH + Duration::from_secs(time); - let amount = btc_amount_to_taler_amount(&full.tx.amount.to_unsigned()?); - db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5)", &[ + Category::Receive => match rpc.get_tx_segwit_key(&id) { + Ok((full, reserve_pub)) => { + let debit_addr = sender_address(&rpc, &full)?; + let credit_addr = full.tx.details[0].address.as_ref().unwrap(); + let time = full.tx.info.blocktime.unwrap(); + let date = SystemTime::UNIX_EPOCH + Duration::from_secs(time); + let amount = btc_amount_to_taler_amount(&full.tx.amount.to_unsigned()?); + db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5)", &[ &date, &amount.to_string(), &reserve_pub.as_ref(), &btc_payto_url(&debit_addr).to_string(), &btc_payto_url(&credit_addr).to_string() ])?; - println!("{} << {} {}", &debit_addr, &credit_addr, &amount); - } - Err(err) => match err { - GetSegwitErr::Decode( - DecodeSegWitErr::MissingSegWitAddress | DecodeSegWitErr::NoMagicIdMatch, - ) => {} - err => println!("receive: {} {}", id, err), + info!("{} << {} {}", &debit_addr, &credit_addr, &amount); + } + Err(err) => match err { + GetSegwitErr::Decode( + DecodeSegWitErr::MissingSegWitAddress + | DecodeSegWitErr::NoMagicIdMatch, + ) => {} + err => warn!("receive: {} {}", id, err), + }, }, - }, - Category::Generate | Category::Immature | Category::Orphan => {} + Category::Generate | Category::Immature | Category::Orphan => {} + } } + let query = if let Some(_) = last_hash { + "UPDATE state SET value=$1 WHERE name='last_hash'" + } else { + "INSERT INTO state (name, value) VALUES ('last_hash', $1)" + }; + db.execute(query, &[&list.lastblock.as_ref()])?; + last_hash = Some(list.lastblock); + Ok(()) + })(); + if let Err(e) = result { + error!("sender: DB - {}", e); } - let query = if let Some(_) = last_hash { - "UPDATE state SET value=$1 WHERE name='last_hash'" - } else { - "INSERT INTO state (name, value) VALUES ('last_hash', $1)" - }; - db.execute(query, &[&list.lastblock.as_ref()])?; - last_hash = Some(list.lastblock); - println!("Wait for block"); + + info!("sender: Wait for block"); rpc.wait_for_new_block(0).ok(); } } fn main() { + taler_log::init(); let mut conf = Ini::new(); conf.read(std::fs::read_to_string("test.conf").unwrap()) .unwrap(); @@ -263,9 +317,9 @@ fn main() { let rpc_watcher = wallet_rpc(&data_dir, network, "wire"); let rpc_sender = wallet_rpc(&data_dir, network, "wire"); - let db_watcher = Client::connect(&db_url, NoTls).unwrap(); - let db_sender = Client::connect(&db_url, NoTls).unwrap(); + let db_watcher = AutoReloadDb::new(&db_url, Duration::from_secs(5)); + let db_sender = AutoReloadDb::new(&db_url, Duration::from_secs(5)); let join = std::thread::spawn(move || sender(rpc_sender, db_sender)); - watcher(rpc_watcher, db_watcher).unwrap(); + watcher(rpc_watcher, db_watcher); join.join().unwrap(); } diff --git a/script/setup.sh b/script/setup.sh @@ -0,0 +1,61 @@ +#!/bin/bash + +function load_config() { + source <(grep = test.conf | sed 's/ *= */=/' | sed 's/=\(.*\)/="\1"/g1') + BANK_ENDPOINT=http://127.0.0.1:$PORT/ +} + +function reset_db() { + # TODO sudo alternative ? + sudo service postgresql start > /dev/null + sudo -u postgres psql $DB_URL < wire-gateway/db/schema.sql > /dev/null +} + +function init_btc() { + BTC_DIR=$(mktemp -d) + BTC_CLI="bitcoin-cli -regtest -datadir=$BTC_DIR" + bitcoind -datadir=$BTC_DIR -txindex -regtest -fallbackfee=0.00000001 &> /dev/null & + $BTC_CLI -rpcwait getnetworkinfo > /dev/null +} + +function setup_btc() { + for wallet in wire client reserve; do + $BTC_CLI createwallet $wallet > /dev/null + done + RESERVE=`$BTC_CLI -rpcwallet=reserve getnewaddress` + CLIENT=`$BTC_CLI -rpcwallet=client getnewaddress` + $BTC_CLI generatetoaddress 101 $RESERVE > /dev/null + $BTC_CLI -rpcwallet=reserve sendtoaddress $CLIENT 1 > /dev/null + $BTC_CLI generatetoaddress 1 $RESERVE > /dev/null +} + +function next_btc() { + # Mine one block + $BTC_CLI generatetoaddress 1 $RESERVE > /dev/null + # Wait for btc_wire to catch up + sleep 0.5 +} + +function check_balance() { + CLIENT_BALANCE=`$BTC_CLI -rpcwallet=client getbalance` + WIRE_BALANCE=`$BTC_CLI -rpcwallet=wire getbalance` + if [ "$CLIENT_BALANCE" != "$1" ] || [ "$WIRE_BALANCE" != "$2" ]; then + echo "expected: client $1 wire $2 got: client $CLIENT_BALANCE wire $WIRE_BALANCE" + exit 1 + fi +} + +function btc_wire() { + cargo build --bin btc-wire &> /dev/null + target/debug/btc-wire $BTC_DIR &> /dev/null & +} + +function gateway() { + cargo build --bin wire-gateway --features test &> /dev/null + target/debug/wire-gateway &> /dev/null & + for n in `seq 1 50`; do + echo -n "." + sleep 0.2 + curl -s $BANK_ENDPOINT -o /dev/null && break + done +} +\ No newline at end of file diff --git a/script/test_btc_wire.sh b/script/test_btc_wire.sh @@ -2,77 +2,43 @@ set -eu -# Create temp file -TEMP_DIR=$(mktemp -d) # Cleanup to run whenever we exit function cleanup() { for n in `jobs -p`; do kill $n 2> /dev/null || true done - rm -rf $TEMP_DIR 2> /dev/null + rm -rf $BTC_DIR 2> /dev/null wait } # Install cleanup handler (except for kill -9) trap cleanup EXIT -# Load conf file -source <(grep = test.conf | sed 's/ *= */=/' | sed 's/=\(.*\)/="\1"/g1') -BANK_ENDPOINT=http://127.0.0.1:$PORT/ -BTC_CLI="bitcoin-cli -regtest -datadir=$TEMP_DIR" +source "${BASH_SOURCE%/*}/setup.sh" echo "---- Setup -----" - +echo "Load config file" +load_config echo "Reset database" -sudo service postgresql start > /dev/null -sudo -u postgres psql $DB_URL < wire-gateway/db/schema.sql > /dev/null +reset_db echo "Start bitcoin node" -bitcoind -datadir=$TEMP_DIR -txindex -regtest -fallbackfee=0.00000001 &> /dev/null & -$BTC_CLI -rpcwait getnetworkinfo > /dev/null +init_btc echo "Init bitcoin regtest" -for wallet in wire client reserve; do - $BTC_CLI createwallet $wallet > /dev/null -done -RESERVE=`$BTC_CLI -rpcwallet=reserve getnewaddress` -CLIENT=`$BTC_CLI -rpcwallet=client getnewaddress` -$BTC_CLI generatetoaddress 101 $RESERVE > /dev/null -$BTC_CLI -rpcwallet=reserve sendtoaddress $CLIENT 1 > /dev/null -$BTC_CLI generatetoaddress 1 $RESERVE > /dev/null -function check_balance() { - CLIENT_BALANCE=`$BTC_CLI -rpcwallet=client getbalance` - WIRE_BALANCE=`$BTC_CLI -rpcwallet=wire getbalance` - if [ "$CLIENT_BALANCE" != "$1" ] || [ "$WIRE_BALANCE" != "$2" ]; then - echo "expected: client $1 wire $2" - echo "got: client $CLIENT_BALANCE wire $WIRE_BALANCE" - exit 1 - fi -} -function next() { - # Mine one block - $BTC_CLI generatetoaddress 1 $RESERVE > /dev/null - # Wait for btc_wire to catch up - sleep 0.5 -} +setup_btc +next_btc check_balance 1.00000000 0.00000000 echo "Start btc-wire" -cargo build --bin btc-wire &> /dev/null -target/debug/btc-wire $TEMP_DIR &> /dev/null & +btc_wire echo "Start gateway" -cargo build --bin wire-gateway --features test &> /dev/null -target/debug/wire-gateway &> /dev/null & -for n in `seq 1 50`; do - echo -n "." - sleep 0.2 - curl -s $BANK_ENDPOINT -o /dev/null && break -done +gateway echo "" echo "---- Gateway API -----" echo -n "Making wire transfer to exchange:" -btc-wire-cli -d $TEMP_DIR transfer 0.00004 -next +btc-wire-cli -d $BTC_DIR transfer 0.00004 +next_btc check_balance 0.99995209 0.00004000 echo " OK" @@ -85,11 +51,10 @@ taler-exchange-wire-gateway-client \ -b $BANK_ENDPOINT \ -C payto://bitcoin/$CLIENT \ -a BTC:0.00002 > /dev/null -next +next_btc check_balance 0.99995209 0.00001801 echo " OK" - echo -n "Requesting exchange's outgoing transaction list:" taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -o | grep BTC:0.00002 > /dev/null echo " OK" diff --git a/script/test_gateway.sh b/script/test_gateway.sh @@ -16,24 +16,16 @@ function cleanup() { # Install cleanup handler (except for kill -9) trap cleanup EXIT -# Load conf file -source <(grep = test.conf | sed 's/ *= */=/' | sed 's/=\(.*\)/="\1"/g1') -BANK_ENDPOINT=http://127.0.0.1:$PORT/ +source "${BASH_SOURCE%/*}/setup.sh" echo "---- Setup -----" - +echo "Load config file" +load_config echo "Reset database" -sudo service postgresql start > /dev/null -sudo -u postgres psql $DB_URL < wire-gateway/db/schema.sql > /dev/null +reset_db echo "Start gateway" -cargo build --bin wire-gateway --features test &> /dev/null -target/debug/wire-gateway &> /dev/null & -for n in `seq 1 50`; do - echo -n "." - sleep 0.2 - curl -s $BANK_ENDPOINT -o /dev/null && break -done +gateway echo "" echo "---- Gateway API -----" @@ -88,7 +80,7 @@ done echo "" echo -n "Bad bitcoin address:" -taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -C payto://bitcoin/42$ADDRESS -a BTC:0.00042 2>&1 | grep -q "(400/26)" && echo " OK" || echo " Failed" +taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -C payto://bitcoin/42$ADDRESS -a BTC:0.00042 2>&1 | grep -q "(400/24)" && echo " OK" || echo " Failed" echo -n "Bad transaction amount:" taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -C payto://bitcoin/$ADDRESS -a ATC:0.00042 2>&1 | grep -q "(400/26)" && echo " OK" || echo " Failed" diff --git a/script/test_recover_db.sh b/script/test_recover_db.sh @@ -2,44 +2,42 @@ set -eu -# Create temp file -TEMP_FILE=$(mktemp) - # Cleanup to run whenever we exit function cleanup() { for n in `jobs -p`; do kill $n 2> /dev/null || true done - rm -f $TEMP_FILE + rm -rf $BTC_DIR 2> /dev/null wait } # Install cleanup handler (except for kill -9) trap cleanup EXIT -# Load conf file -source <(grep = test.conf | sed 's/ *= */=/' | sed 's/=\(.*\)/="\1"/g1') -BANK_ENDPOINT=http://127.0.0.1:$PORT/ +source "${BASH_SOURCE%/*}/setup.sh" echo "---- Setup -----" - +echo "Load config file" +load_config echo "Reset database" -sudo service postgresql start > /dev/null -sudo -u postgres psql $DB_URL < wire-gateway/db/schema.sql > /dev/null +reset_db +echo "Start bitcoin node" +init_btc +echo "Init bitcoin regtest" +setup_btc +next_btc +check_balance 1.00000000 0.00000000 +echo "Start btc-wire" +btc_wire echo "Start gateway" -cargo build --bin wire-gateway --features test &> /dev/null -target/debug/wire-gateway &> /dev/null & -for n in `seq 1 50`; do - echo -n "." - sleep 0.2 - curl -s $BANK_ENDPOINT -o /dev/null && break -done +gateway echo "" echo "---- With DB -----" - -echo -n "Making wire transfer to exchange:" -taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -D payto://bitcoin/$ADDRESS -a BTC:0.000042 > /dev/null && echo " OK" || echo " Failed" +echo "Making wire transfer to exchange:" +btc-wire-cli -d $BTC_DIR transfer 0.000042 +next_btc +check_balance 0.99995009 0.00004200 echo -n "Requesting exchange incoming transaction list:" taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -i | grep BTC:0.000042 > /dev/null && echo " OK" || echo " Failed" @@ -47,8 +45,10 @@ echo "---- Without DB -----" echo "Stop database" sudo service postgresql stop > /dev/null -echo -n "Making wire transfer to exchange:" -taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -D payto://bitcoin/$ADDRESS -a BTC:0.000042 2>&1 | grep -q "504" && echo " OK" || echo " Failed" +echo "Making wire transfer to exchange:" +btc-wire-cli -d $BTC_DIR transfer 0.00004 +next_btc +check_balance 0.99990218 0.00008200 echo -n "Requesting exchange incoming transaction list:" taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -i 2>&1 | grep -q "504" && echo " OK" || echo " Failed" @@ -56,7 +56,20 @@ echo "---- Recover DB -----" echo "Recovering database" sudo service postgresql start > /dev/null -echo -n "Making wire transfer to exchange:" -taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -D payto://bitcoin/$ADDRESS -a BTC:0.000042 > /dev/null && echo " OK" || echo " Failed" +sleep 5 # Wait for connection to be avaible echo -n "Requesting exchange incoming transaction list:" -taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -i | grep BTC:0.000042 > /dev/null && echo " OK" || echo " Failed" +taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -i | grep BTC:0.00004 > /dev/null && echo " OK" || echo " Failed" +echo -n "Making wire transfer from exchange:" +taler-exchange-wire-gateway-client \ + -b $BANK_ENDPOINT \ + -C payto://bitcoin/$CLIENT \ + -a BTC:0.00002 > /dev/null +next_btc +check_balance 0.99990218 0.00006001 +echo " OK" + +echo -n "Requesting exchange's outgoing transaction list:" +taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -o | grep BTC:0.00002 > /dev/null +echo " OK" + +echo "All tests passed" +\ No newline at end of file