commit b4b8c17004c45eb0227cff415d6e2868a70644f6
parent 5be1c86dc740f8bd6eea1c25301929d698c5eacb
Author: Antoine A <>
Date: Tue, 14 Dec 2021 13:11:34 +0100
btc-wire recreate transactions history when database content is lost
Diffstat:
5 files changed, 148 insertions(+), 32 deletions(-)
diff --git a/.gitignore b/.gitignore
@@ -1 +1,2 @@
/target
+*.log
+\ No newline at end of file
diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs
@@ -12,17 +12,20 @@ use btc_wire::{
};
use configparser::ini::Ini;
use postgres::{fallible_iterator::FallibleIterator, Client, NoTls};
+use rand::{rngs::OsRng, RngCore};
use std::{
collections::HashMap,
- path::PathBuf,
+ path::{Path, PathBuf},
+ process::exit,
str::FromStr,
time::{Duration, SystemTime},
};
use taler_log::log::{error, info, warn};
use url::Url;
-use wire_gateway::api_common::Amount;
+use wire_gateway::api_common::{crockford_base32_encode, Amount};
#[repr(u8)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum Status {
/// Client have ask for a transaction
Proposed = 0,
@@ -34,6 +37,20 @@ enum Status {
Delayed = 3,
}
+impl TryFrom<u8> for Status {
+ type Error = ();
+
+ fn try_from(v: u8) -> Result<Self, Self::Error> {
+ match v {
+ x if x == Status::Proposed as u8 => Ok(Status::Proposed),
+ x if x == Status::Pending as u8 => Ok(Status::Pending),
+ x if x == Status::Confirmed as u8 => Ok(Status::Confirmed),
+ x if x == Status::Delayed as u8 => Ok(Status::Delayed),
+ _ => Err(()),
+ }
+ }
+}
+
fn btc_payto_url(addr: &Address) -> Url {
Url::from_str(&format!("payto://bitcoin/{}", addr.to_string())).unwrap()
}
@@ -112,8 +129,8 @@ fn sender(rpc: RPC, mut db: AutoReloadDb) {
db: &mut Client,
) -> Result<Vec<(i32, BtcAmount, Address, Vec<u8>)>, Box<dyn std::error::Error>> {
let mut iter = db.query_raw(
- "SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1",
- &[&(Status::Proposed as i16)],
+ "SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 OR status=$2",
+ &[&(Status::Proposed as i16), &(Status::Delayed as i16)],
)?;
let mut out = Vec::new();
while let Some(row) = iter.next()? {
@@ -183,9 +200,7 @@ impl AutoReloadDb {
fn connect(config: &str, delay: Duration) -> Client {
loop {
match Client::connect(config, NoTls) {
- Ok(new) =>
- return new,
-
+ Ok(new) => return new,
Err(err) => {
error!("connect: DB - {}", err);
std::thread::sleep(delay);
@@ -203,7 +218,7 @@ impl AutoReloadDb {
}
/// Listen for mined block and index confirmed transactions into the database
-fn watcher(rpc: RPC, mut db: AutoReloadDb) {
+fn watcher(rpc: RPC, mut db: AutoReloadDb, config: &Config) {
let mut last_hash: Option<BlockHash> = {
let db = db.client();
let stored_hash = db
@@ -238,18 +253,48 @@ fn watcher(rpc: RPC, mut db: AutoReloadDb) {
for (id, category) in txs {
match category {
Category::Send => match rpc.get_tx_op_return(&id) {
- Ok(_) => {
- let nb_rows = db.execute(
- "UPDATE tx_out SET status=$1 WHERE status=$2 AND txid=$3",
- &[
- &(Status::Confirmed as i16),
- &(Status::Pending as i16),
- &id.as_ref(),
- ],
+ Ok((full, bytes)) => {
+ let (wtid, url) = decode_info(&bytes);
+ let row = db.query_opt(
+ "SELECT status, wtid, id FROM tx_out WHERE txid=$1",
+ &[&id.as_ref()],
)?;
- // Check already confirmed
- if nb_rows > 0 {
- info!("{} CONFIRMED", &id);
+ if let Some(row) = row {
+ let status: i16 = row.get(0);
+ let _wtid: &[u8] = row.get(1);
+ let _id: i32 = row.get(2);
+ if &wtid != _wtid {
+ warn!("watcher: state tx {} have uncompatible wtid in DB {} and on chain {}", id, crockford_base32_encode(&wtid), crockford_base32_encode(&_wtid));
+ exit(1);
+ }
+ let status: Status = Status::try_from(status as u8).unwrap();
+ if status != Status::Confirmed {
+ db.execute(
+ "UPDATE tx_out SET status=$1 where id=$2",
+ &[&(Status::Confirmed as i16), &_id],
+ )?;
+ if status == Status::Proposed {
+ warn!("watcher: tx {} is present on chain at {} while being in proposed status", _id, id);
+ } else {
+ info!("{} CONFIRMED", &id);
+ }
+ }
+ } else {
+ let debit_addr = sender_address(&rpc, &full)?;
+ let credit_addr = full.tx.details[0].address.as_ref().unwrap();
+ let time = full.tx.info.blocktime.unwrap();
+ let date = SystemTime::UNIX_EPOCH + Duration::from_secs(time);
+ let amount =
+ btc_amount_to_taler_amount(&full.tx.amount.abs().to_unsigned()?);
+ // Generate a random request_uid
+ let mut request_uid = [0; 64];
+ OsRng.fill_bytes(&mut request_uid);
+ db.execute(
+ "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
+ &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).to_string(), &btc_payto_url(&credit_addr).to_string(), &config.base_url.to_string(), &(Status::Confirmed as i16), &id.as_ref(), &request_uid.as_ref()
+ ],
+ )?;
+ warn!("watcher: found an unregistered outgoing address {} {} in tx {}", crockford_base32_encode(&wtid), &url, id);
}
}
Err(err) => match err {
@@ -298,12 +343,32 @@ fn watcher(rpc: RPC, mut db: AutoReloadDb) {
}
}
+#[derive(Debug, Clone)]
+struct Config {
+ base_url: Url,
+ db_url: String,
+ port: u16,
+ payto: Url,
+ address: String,
+}
+
+impl Config {
+ pub fn from_path(path: impl AsRef<Path>) -> Self {
+ let string = std::fs::read_to_string(path).unwrap();
+ let mut conf = Ini::new();
+ conf.read(string).unwrap();
+ Self {
+ base_url: Url::parse(&conf.get("main", "BASE_URL").unwrap()).unwrap(),
+ db_url: conf.get("main", "DB_URL").unwrap(),
+ port: conf.get("main", "PORT").unwrap().parse().unwrap(),
+ payto: Url::parse(&conf.get("main", "PAYTO").unwrap()).unwrap(),
+ address: conf.get("main", "ADDRESS").unwrap(),
+ }
+ }
+}
+
fn main() {
taler_log::init();
- let mut conf = Ini::new();
- conf.read(std::fs::read_to_string("test.conf").unwrap())
- .unwrap();
- let db_url = conf.get("main", "DB_URL").expect("Missing BD_URL");
// Guess network by trying to connect to a JSON RPC server
let data_dir = std::env::args()
@@ -311,15 +376,16 @@ fn main() {
.next()
.map(|str| PathBuf::from_str(&str).unwrap())
.unwrap_or(default_data_dir());
+ let config = Config::from_path("test.conf");
let network = dirty_guess_network(&data_dir);
let rpc = common_rpc(&data_dir, network).unwrap();
rpc.load_wallet(&WIRE).ok();
let rpc_watcher = wallet_rpc(&data_dir, network, "wire");
let rpc_sender = wallet_rpc(&data_dir, network, "wire");
- let db_watcher = AutoReloadDb::new(&db_url, Duration::from_secs(5));
- let db_sender = AutoReloadDb::new(&db_url, Duration::from_secs(5));
+ let db_watcher = AutoReloadDb::new(&config.db_url, Duration::from_secs(5));
+ let db_sender = AutoReloadDb::new(&config.db_url, Duration::from_secs(5));
let join = std::thread::spawn(move || sender(rpc_sender, db_sender));
- watcher(rpc_watcher, db_watcher);
+ watcher(rpc_watcher, db_watcher, &config);
join.join().unwrap();
}
diff --git a/script/setup.sh b/script/setup.sh
@@ -18,6 +18,15 @@ function init_btc() {
$BTC_CLI -rpcwait getnetworkinfo > /dev/null
}
+function restart_btc() {
+ BTC_CLI="bitcoin-cli -regtest -datadir=$BTC_DIR"
+ bitcoind -datadir=$BTC_DIR -txindex -regtest -fallbackfee=0.00000001 &> /dev/null &
+ $BTC_CLI -rpcwait getnetworkinfo > /dev/null
+ for wallet in wire client reserve; do
+ $BTC_CLI loadwallet $wallet > /dev/null
+ done
+}
+
function setup_btc() {
for wallet in wire client reserve; do
$BTC_CLI createwallet $wallet > /dev/null
@@ -47,12 +56,12 @@ function check_balance() {
function btc_wire() {
cargo build --bin btc-wire &> /dev/null
- target/debug/btc-wire $BTC_DIR &> /dev/null &
+ target/debug/btc-wire $BTC_DIR &> btc_wire.log &
}
function gateway() {
cargo build --bin wire-gateway --features test &> /dev/null
- target/debug/wire-gateway &> /dev/null &
+ target/debug/wire-gateway &> gateway.log &
for n in `seq 1 50`; do
echo -n "."
sleep 0.2
diff --git a/script/test_recover_db.sh b/script/test_recover_db.sh
@@ -52,9 +52,9 @@ check_balance 0.99990218 0.00008200
echo -n "Requesting exchange incoming transaction list:"
taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -i 2>&1 | grep -q "504" && echo " OK" || echo " Failed"
-echo "---- Recover DB -----"
+echo "---- Reconnect DB -----"
-echo "Recovering database"
+echo "Start database"
sudo service postgresql start > /dev/null
sleep 5 # Wait for connection to be avaible
echo -n "Requesting exchange incoming transaction list:"
@@ -65,11 +65,46 @@ taler-exchange-wire-gateway-client \
-C payto://bitcoin/$CLIENT \
-a BTC:0.00002 > /dev/null
next_btc
-check_balance 0.99990218 0.00006001
+next_btc
+check_balance 0.99992218 0.00006001
echo " OK"
echo -n "Requesting exchange's outgoing transaction list:"
taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -o | grep BTC:0.00002 > /dev/null
echo " OK"
+echo "----- Destroy DB ------"
+
+echo "Stop all jobs"
+for n in `jobs -p`; do
+ kill $n || true
+done
+echo "Reset database"
+reset_db
+echo "Start bitcoin node"
+restart_btc
+echo "Start btc-wire"
+btc_wire
+echo "Start gateway"
+gateway
+
+echo "---- Recover DB -----"
+
+echo -n "Checking recover incoming transactions:"
+ALL=`taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -i`;
+for amount in 0.000042 0.00004; do
+ echo $ALL | grep BTC:$amount > /dev/null && echo -n " OK" || echo -n " Failed"
+done
+echo ""
+
+echo -n "Requesting exchange's outgoing transaction list:"
+ALL=`taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -o`;
+for amount in 0.00002; do
+ echo $ALL | grep BTC:$amount > /dev/null && echo -n " OK" || echo -n " Failed"
+done
+echo ""
+
+# Balance should not have changed
+check_balance 0.99992218 0.00006001
+
echo "All tests passed"
\ No newline at end of file
diff --git a/wire-gateway/src/api_common.rs b/wire-gateway/src/api_common.rs
@@ -296,9 +296,13 @@ impl<const L: usize> FromStr for Base32<L> {
}
}
+pub fn crockford_base32_encode(bytes: &[u8]) -> String {
+ base32::encode(base32::Alphabet::Crockford, &bytes)
+}
+
impl<const L: usize> Display for Base32<L> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- f.write_str(&base32::encode(base32::Alphabet::Crockford, &self.0))
+ f.write_str(&crockford_base32_encode(&self.0))
}
}