commit 641d31b40b5e6bd9552bd34e3202303f70fdf7eb
parent 005ae3486e8a8d35d4b74d850e449086d267cebd
Author: Antoine A <>
Date: Fri, 14 Jan 2022 14:41:20 +0100
Improve handling of compromised wire state
Diffstat:
12 files changed, 126 insertions(+), 52 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
@@ -602,9 +602,9 @@ dependencies = [
[[package]]
name = "getrandom"
-version = "0.2.3"
+version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7fcd999463524c52659517fe2cea98493cfe485d10565e7b0fb07dbba7ad2753"
+checksum = "418d37c8b1d42553c93648be529cb70f920d3baf8ef469b74b9638df426e0b4c"
dependencies = [
"cfg-if",
"libc",
@@ -1425,9 +1425,9 @@ checksum = "9def91fd1e018fe007022791f865d0ccc9b3a0d5001e01aabb8b40e46000afb5"
[[package]]
name = "smallvec"
-version = "1.7.0"
+version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1ecab6c735a6bb4139c0caafd0cc3635748bbb3acf4550e8138122099251f309"
+checksum = "f2dd574626839106c320a323308629dcb1acfc96e32a8cba364ddc61ac23ee83"
[[package]]
name = "socket2"
@@ -1865,6 +1865,7 @@ dependencies = [
"hyperlocal",
"listenfd",
"miniz_oxide",
+ "postgres",
"rand",
"serde",
"serde_json",
diff --git a/btc-wire/src/bin/btc-wire-cli.rs b/btc-wire/src/bin/btc-wire-cli.rs
@@ -7,19 +7,31 @@ use postgres::{Client, NoTls};
fn main() {
let args: Vec<_> = std::env::args().collect();
+ // Parse taler config
let config = taler_common::config::InitConfig::load_from_file(&args[2]);
+ // Connect to database
+ let mut client = Client::connect(&config.db_url, NoTls).unwrap();
match args[1].as_str() {
"initdb" => {
- let mut client = Client::connect(&config.db_url, NoTls).unwrap();
+ // Load schema
client
.batch_execute(include_str!("../../../db/btc.sql"))
.unwrap();
+ // Init status to true
+ client
+ .execute(
+ "INSERT INTO state (name, value) VALUES ('status', $1)",
+ &[&[1u8].as_ref()],
+ )
+ .unwrap();
println!("Database initialised");
}
"initwallet" => {
+ // Parse bitcoin config
let btc_conf =
BitcoinConfig::load(config.btc_data_dir.unwrap_or_else(default_data_dir)).unwrap();
+ // Connect to bitcoin node
let mut rpc = BtcRpc::common(&btc_conf).unwrap();
let created = match rpc.create_wallet(WIRE_WALLET_NAME) {
Err(Error::RPC { code, .. }) if code == ErrorCode::RpcWalletError => false,
diff --git a/btc-wire/src/bin/test.rs b/btc-wire/src/bin/test.rs
@@ -365,7 +365,6 @@ impl TestRunner {
println!("OK");
self.nb_ok += 1;
} else {
- dbg!(&result);
println!("ERR");
self.nb_err += 1;
}
diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs
@@ -11,7 +11,6 @@ use reconnect::{AutoReconnectRPC, AutoReconnectSql};
use std::{
collections::{HashMap, HashSet},
fmt::Write,
- process::exit,
str::FromStr,
time::{Duration, SystemTime},
};
@@ -147,6 +146,7 @@ fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, config: &Config)
// TODO check if transactions are abandoned
let mut lifetime = config.btc_lifetime;
+ let mut status = true;
// Alway start with a sync work
let mut skip_notification = true;
@@ -180,7 +180,7 @@ fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, config: &Config)
while iter.next()?.is_some() {}
}
// Sync chain
- sync_chain(rpc, db, config)?;
+ sync_chain(rpc, db, config, &mut status)?;
// As we are now in sync with the blockchain if a transaction is in requested or delayed state it have not been sent
@@ -207,12 +207,13 @@ fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, config: &Config)
}
}
-/// Parse new transactions, if exit whiteout failing the database is up to date with the latest mined block
+/// Parse new transactions, return true if the database is up to date with the latest mined block
fn sync_chain(
rpc: &mut BtcRpc,
db: &mut Client,
config: &Config,
-) -> Result<(), Box<dyn std::error::Error>> {
+ status: &mut bool,
+) -> Result<bool, Box<dyn std::error::Error>> {
// Get stored last_hash
let last_hash = last_hash(db)?;
let min_confirmations = config.confirmation;
@@ -237,9 +238,21 @@ fn sync_chain(
// Check if a confirmed incoming transaction have been removed by a blockchain reorganisation
- if !sync_chain_removed(&txs, &removed, rpc, db, min_confirmations as i32)? {
- // TODO sleep instead ?
- exit(1);
+ let new_status = sync_chain_removed(&txs, &removed, rpc, db, min_confirmations as i32)?;
+
+ if *status != new_status {
+ let mut tx = db.transaction()?;
+ tx.execute(
+ "UPDATE state SET value=$1 WHERE name='status'",
+ &[&[new_status as u8].as_ref()],
+ )?;
+ tx.execute("NOTIFY status", &[])?;
+ tx.commit()?;
+ *status = new_status;
+ }
+
+ if !new_status {
+ return Ok(false);
}
for (id, (category, confirmations)) in txs {
match category {
@@ -277,7 +290,7 @@ fn sync_chain(
error!("watcher: hash state collision, database have been altered by another process");
}
}
- Ok(())
+ Ok(true)
}
/// Sync database with removed transactions, return false if bitcoin backing is compromised
diff --git a/db/common.sql b/db/common.sql
@@ -1,3 +1,9 @@
+-- Key value state
+CREATE TABLE state (
+ name TEXT PRIMARY KEY,
+ value BYTEA NOT NULL
+);
+
-- Incoming transactions
CREATE TABLE tx_in (
id SERIAL PRIMARY KEY,
diff --git a/makefile b/makefile
@@ -15,4 +15,4 @@ test_btc:
test/btc/reorg.sh
test/btc/hell.sh
-test: test_gateway test_btc
-\ No newline at end of file
+test: install test_gateway test_btc
+\ No newline at end of file
diff --git a/test/btc/hell.sh b/test/btc/hell.sh
@@ -22,15 +22,6 @@ echo "Start gateway"
gateway
echo ""
-# Check btc-wire is running
-function up() {
- check_up $WIRE_PID btc_wire
-}
-# Check btc-wire is not running
-function down() {
- check_down $WIRE_PID btc_wire
-}
-
echo "----- Handle reorg conflicting incoming receive -----"
echo "Loose second bitcoin node"
@@ -43,16 +34,16 @@ check_balance 9.99579209 0.00420000
echo " OK"
echo -n "Perform fork and check btc-wire hard error:"
-up
+gateway_up
btc2_fork
check_balance 9.99579209 0.00000000
-down
+gateway_down
echo " OK"
echo -n "Check btc-wire hard error on restart:"
btc_wire
sleep 1
-down
+gateway_down
echo " OK"
echo -n "Generate conflict:"
@@ -66,7 +57,7 @@ echo " OK"
echo -n "Check btc-wire never heal on restart:"
btc_wire
sleep 1
-down
+gateway_down
check_balance 9.99457382 0.00540000
echo " OK"
@@ -107,16 +98,16 @@ check_balance 9.99998674 0.00001000
echo " OK"
echo -n "Perform fork and check btc-wire hard error:"
-up
+gateway_up
btc2_fork
check_balance 9.95799859 0.00000000
-down
+gateway_down
echo " OK"
echo -n "Check btc-wire hard error on restart:"
btc_wire
sleep 1
-down
+gateway_down
echo " OK"
echo -n "Generate conflict:"
@@ -132,7 +123,7 @@ sleep 5
echo -n "Check btc-wire never heal on restart:"
btc_wire
sleep 1
-down
+gateway_down
check_balance 9.94597382 0.05400000
echo " OK"
diff --git a/test/btc/reorg.sh b/test/btc/reorg.sh
@@ -22,15 +22,6 @@ echo "Start gateway"
gateway
echo ""
-# Check btc-wire is running
-function up() {
- check_up $WIRE_PID btc_wire
-}
-# Check btc-wire is not running
-function down() {
- check_down $WIRE_PID btc_wire
-}
-
SEQ="seq 10 20"
echo "----- Handle reorg incoming transactions -----"
@@ -49,16 +40,16 @@ check_balance 9.99826299 0.00165000
echo " OK"
echo -n "Perform fork and check btc-wire hard error:"
-up
+gateway_up
btc2_fork
check_balance 9.99826299 0.00000000
-down
+gateway_down
echo " OK"
echo -n "Check btc-wire hard error on restart:"
btc_wire
sleep 1
-down
+gateway_down
echo " OK"
echo -n "Recover orphaned transactions:"
@@ -69,7 +60,7 @@ echo " OK"
echo -n "Check btc-wire heal on restart:"
btc_wire
sleep 1
-up
+gateway_up
echo " OK"
echo "----- Handle reorg outgoing transactions -----"
@@ -91,10 +82,10 @@ check_balance 9.99842799 0.00146311
echo " OK"
echo -n "Perform fork and check btc-wire still up:"
-up
+gateway_up
btc2_fork
check_balance 9.99826299 0.00146311
-up
+gateway_up
echo " OK"
echo -n "Recover orphaned transactions:"
@@ -120,10 +111,10 @@ check_balance "*" 0.00011000
echo " OK"
echo -n "Perform fork and check btc-wire hard error:"
-up
+gateway_up
btc2_fork
check_balance "*" 0.00000000
-down
+gateway_down
echo " OK"
echo -n "Recover orphaned transactions:"
diff --git a/test/common.sh b/test/common.sh
@@ -222,6 +222,22 @@ function gateway() {
done
}
+# Check wire-gateway is healthy
+function gateway_up() {
+ if [ `curl -w %{http_code} -s ${BANK_ENDPOINT}history/outgoing -o /dev/null` -ne 400 ]; then
+ echo "gateway should be up"
+ exit 1
+ fi
+}
+
+# Check wire-gateway is down (backend is blocked)
+function gateway_down() {
+ if [ `curl -w %{http_code} -s ${BANK_ENDPOINT}history/outgoing -o /dev/null` -ne 502 ]; then
+ echo "gateway should be down"
+ exit 1
+ fi
+}
+
# Check history endpoint request return a specific amount of transactions of specific amounts
# usage: check_delta endpoint nb_txs amount_sequence
function check_delta() {
diff --git a/wire-gateway/Cargo.toml b/wire-gateway/Cargo.toml
@@ -30,6 +30,7 @@ miniz_oxide = "0.5.1"
rand = { version = "0.8.4", features = ["getrandom"] }
# Async postgres client
tokio-postgres = { version = "0.7.5" }
+postgres = { version = "0.19.2" }
deadpool-postgres = "0.10.1"
# Socket activation
listenfd = "0.3.5"
diff --git a/wire-gateway/README.md b/wire-gateway/README.md
@@ -16,6 +16,12 @@ Depend on the used wire implementation :
The server is wire implementation agnostic, it only require an postgres database with following schema:
```sql
+-- Key value state
+CREATE TABLE state (
+ name TEXT PRIMARY KEY,
+ value BYTEA NOT NULL
+);
+
-- Incoming transactions
CREATE TABLE tx_in (
id SERIAL PRIMARY KEY,
diff --git a/wire-gateway/src/main.rs b/wire-gateway/src/main.rs
@@ -7,11 +7,12 @@ use hyper::{
};
use json::{encode_body, parse_body};
use listenfd::ListenFd;
+use postgres::fallible_iterator::FallibleIterator;
use std::{
convert::Infallible,
str::FromStr,
- sync::atomic::{AtomicU64, Ordering},
- time::Instant,
+ sync::atomic::{AtomicBool, AtomicU64, Ordering},
+ time::{Duration, Instant},
};
use taler_common::{
api_common::{Amount, SafeUint64, ShortHashCode, Timestamp},
@@ -34,6 +35,7 @@ struct ServerState {
config: taler_common::config::Config,
notify: Notify,
lifetime: Option<AtomicU64>,
+ status: AtomicBool,
}
impl ServerState {
@@ -106,8 +108,10 @@ async fn main() {
config: conf.clone(),
notify: Notify::new(),
lifetime: conf.http_lifetime.map(AtomicU64::new),
+ status: AtomicBool::new(true),
};
let state: &'static ServerState = Box::leak(Box::new(state));
+ std::thread::spawn(move || status_watcher(state));
let service = service_fn(move |req| async move {
state.step();
let start = Instant::now();
@@ -236,6 +240,14 @@ async fn router(
body: Body,
state: &'static ServerState,
) -> Result<Response<Body>, ServerError> {
+ // Check status error
+ if !state.status.load(Ordering::SeqCst) {
+ return Ok(Response::builder()
+ .status(StatusCode::BAD_GATEWAY)
+ .body(Body::empty())
+ .unwrap());
+ }
+
let response = match parts.uri.path() {
"/transfer" => {
assert_method(parts, Method::POST)?;
@@ -445,3 +457,29 @@ async fn router(
};
return Ok(response);
}
+
+/// Listen to backend status change
+fn status_watcher(state: &'static ServerState) {
+ fn inner(state: &'static ServerState) -> Result<(), Box<dyn std::error::Error>> {
+ let mut db = postgres::Client::connect(&state.config.db_url, NoTls)?;
+ // Register as listener
+ db.batch_execute("LISTEN status")?;
+ loop {
+ // Sync state
+ let row = db.query_one("SELECT value FROM state WHERE name = 'status'", &[])?;
+ let status: &[u8] = row.get(0);
+ dbg!(status);
+ assert!(status.len() == 1 && status[0] < 2);
+ state.status.store(status[0] == 1, Ordering::SeqCst);
+ // Wait for next notification
+ db.notifications().blocking_iter().next()?;
+ }
+ }
+
+ loop {
+ if let Err(err) = inner(state) {
+ error!("status-watcher: {err}");
+ }
+ std::thread::sleep(Duration::from_secs(5));
+ }
+}