commit 9daf1bc6a1f31badab35c2fb1437f93f9581cbcc
parent c1cfa0e5a18a8cd2455eb8fb115f71f8841e3168
Author: Antoine A <>
Date: Fri, 3 Dec 2021 14:03:17 +0100
Improve error handling
Diffstat:
7 files changed, 108 insertions(+), 75 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
@@ -195,6 +195,7 @@ dependencies = [
"bitcoincore-rpc",
"criterion",
"fastrand",
+ "owo-colors",
"postgres",
"rand",
"serde",
@@ -885,6 +886,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5"
[[package]]
+name = "owo-colors"
+version = "3.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f9ad6d222cdc2351ccabb7af4f68bfaecd601b33c5f10d410ec89d2a273f6fff"
+
+[[package]]
name = "parking_lot"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
diff --git a/btc-wire/Cargo.toml b/btc-wire/Cargo.toml
@@ -26,6 +26,8 @@ uri-pack = { path = "../uri-pack" }
url = { version = "2.2.2", features = ["serde"] }
# Wire gateway api
wire-gateway = { path = "../wire-gateway" }
+# Ansi color
+owo-colors = "3.1.0"
[dev-dependencies]
# statistics-driven micro-benchmarks
diff --git a/btc-wire/src/bin/btc-wire-cli.rs b/btc-wire/src/bin/btc-wire-cli.rs
@@ -3,7 +3,7 @@ use bitcoincore_rpc::{
Client, RpcApi,
};
use btc_wire::{
- rpc_utils::{common_rpc, dirty_guess_network, wallet_rpc, Network},
+ rpc_utils::{common_rpc, data_dir_path, dirty_guess_network, wallet_rpc, Network},
test::rand_key,
ClientExtended,
};
@@ -21,6 +21,7 @@ enum Cmd {
Transfer(TransferCmd),
NextBlock(NextBlockCmd),
Init(InitCmd),
+ Reset(ResetCmd),
}
#[derive(argh::FromArgs)]
@@ -58,6 +59,11 @@ struct NextBlockCmd {
/// Init test state
struct InitCmd {}
+#[derive(argh::FromArgs)]
+#[argh(subcommand, name = "reset")]
+/// Reset regtest
+struct ResetCmd {}
+
struct App {
network: Network,
client: Client,
@@ -109,16 +115,20 @@ impl App {
}
fn main() {
- let app = App::start();
let args: Args = argh::from_env();
match args.cmd {
- Cmd::Init(_) => app.init(),
+ Cmd::Init(_) => App::start().init(),
+ Cmd::Reset(_) => {
+ let path = data_dir_path();
+ std::fs::remove_dir_all(path.join(Network::RegTest.dir())).unwrap();
+ }
Cmd::Transfer(TransferCmd {
key: _key,
from,
to,
amount,
}) => {
+ let app = App::start();
let (client, _) = app.auto_wallet(&from);
let (_, to) = app.auto_wallet(&to);
client
@@ -126,6 +136,7 @@ fn main() {
.unwrap();
}
Cmd::NextBlock(NextBlockCmd { to }) => {
+ let app = App::start();
app.next_block(&to);
}
}
diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs
@@ -8,9 +8,10 @@ use btc_wire::{
segwit::DecodeSegWitErr,
ClientExtended, GetOpReturnErr, GetSegwitErr,
};
-use postgres::{Client, NoTls};
+use postgres::{fallible_iterator::FallibleIterator, Client, NoTls, Row};
use std::{
collections::HashMap,
+ process::exit,
str::FromStr,
time::{Duration, SystemTime},
};
@@ -102,43 +103,58 @@ mod test {
}
/// Listen for new proposed transactions and announce them on the bitcoin network
-fn sender(rpc: RPC, mut db: Client) -> Result<(), Box<dyn std::error::Error>> {
- loop {
- let mut announced = Vec::new();
- let rows = db.query(
- "SELECT id, amount, wtid, debit_acc, credit_acc, exchange_url FROM tx_out WHERE status=$1",
+fn sender(rpc: RPC, mut db: Client) {
+ fn get_proposed(
+ db: &mut Client,
+ ) -> Result<Vec<(i32, BtcAmount, Address, Vec<u8>)>, Box<dyn std::error::Error>> {
+ let mut iter = db.query_raw(
+ "SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1",
&[&(Status::Proposed as i16)],
)?;
+ let mut out = Vec::new();
+ while let Some(row) = iter.next()? {
+ let id: i32 = row.get(0);
+ let amount: Amount = Amount::from_str(row.get(1))?;
+ let reserve_pub: &[u8] = row.get(2);
+ let credit_addr: Address = btc_payto_addr(&Url::parse(row.get(3))?)?;
+ let exchange_base_url: Url = Url::parse(row.get(4))?;
+ let metadata = encode_info(reserve_pub.try_into()?, &exchange_base_url);
+ out.push((
+ id,
+ taler_amount_to_btc_amount(&amount)?,
+ credit_addr,
+ metadata,
+ ));
+ }
+ return Ok(out);
+ }
- for row in rows {
- let result: Result<i32, Box<dyn std::error::Error>> = (|| {
- let id: i32 = row.get(0);
- let amount: Amount = Amount::from_str(row.get(1))?;
- let reserve_pub: &[u8] = row.get(2);
- let debit_addr: Address = btc_payto_addr(&Url::parse(row.get(3))?)?;
- let credit_addr: Address = btc_payto_addr(&Url::parse(row.get(4))?)?;
- let exchange_base_url: Url = Url::parse(row.get(5))?;
- let metadata = encode_info(reserve_pub.try_into()?, &exchange_base_url);
- rpc.send_op_return(
- &credit_addr,
- taler_amount_to_btc_amount(&amount)?,
- &metadata,
- )?;
- println!("{} >> {} {} PENDING", &debit_addr, &credit_addr, &amount);
- Ok(id)
- })();
- match result {
- Ok(id) => announced.push(id),
- Err(err) => println!("sender: {}", err),
+ let result: Result<(), Box<dyn std::error::Error>> = (|| loop {
+ for (id, amount, addr, metadata) in get_proposed(&mut db)? {
+ match rpc.send_op_return(&addr, amount, &metadata) {
+ Ok(txid) => {
+ db.execute(
+ "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3",
+ &[&(Status::Pending as i16), &txid.as_ref(), &id],
+ )?;
+ println!("{} PENDING", txid);
+ }
+ Err(e) => {
+ println!("sender: RPC - {}", e);
+ db.execute(
+ "UPDATE tx_out SET status=$1 WHERE id=$3",
+ &[&(Status::Delayed as i16), &id],
+ )?;
+ }
}
}
-
- db.execute(
- "UPDATE tx_out SET status = $1 WHERE id = ANY($2)",
- &[&(Status::Pending as i16), &announced],
- )?;
std::thread::sleep(Duration::from_millis(300));
+ })();
+ if let Err(e) = result {
+ eprintln!("sender: DB - {}", e);
+ exit(1);
}
+ unreachable!("Sender should only exit on error")
}
/// Listen for mined block and index confirmed transactions into the database
@@ -172,21 +188,19 @@ fn watcher(rpc: RPC, mut db: Client) -> Result<(), Box<dyn std::error::Error>> {
for (id, category) in txs {
match category {
Category::Send => match rpc.get_tx_op_return(&id) {
- Ok((full, metadata)) => {
- let (wtid, exchange_base_url) = decode_info(&metadata);
- let credit_addr = sender_address(&rpc, &full)?;
- let amount =
- btc_amount_to_taler_amount(&full.tx.amount.abs().to_unsigned()?.into());
- let row = db.query_one(
- "UPDATE tx_out SET status=$1 WHERE wtid=$2 AND exchange_url=$3 RETURNING debit_acc",
- &[
- &(Status::Confirmed as i16),
- &wtid.as_ref(),
- &exchange_base_url.to_string(),
- ],
- )?;
- let debit_addr = btc_payto_addr(&Url::parse(row.get(0))?)?;
- println!("{} >> {} {} CONFIRMED", &debit_addr, &credit_addr, &amount);
+ Ok(_) => {
+ let nb_rows = db.execute(
+ "UPDATE tx_out SET status=$1 WHERE status=$2 AND txid=$3",
+ &[
+ &(Status::Confirmed as i16),
+ &(Status::Pending as i16),
+ &id.as_ref(),
+ ],
+ )?;
+ // Check already confirmed
+ if nb_rows > 0 {
+ println!("{} CONFIRMED", &id);
+ }
}
Err(err) => match err {
GetOpReturnErr::MissingOpReturn => {} // ignore
@@ -215,17 +229,12 @@ fn watcher(rpc: RPC, mut db: Client) -> Result<(), Box<dyn std::error::Error>> {
Category::Generate | Category::Immature | Category::Orphan => {}
}
}
- if let Some(_) = last_hash {
- db.execute(
- "UPDATE state SET value=$1 WHERE name='last_hash'",
- &[&list.lastblock.as_ref()],
- )?;
+ let query = if let Some(_) = last_hash {
+ "UPDATE state SET value=$1 WHERE name='last_hash'"
} else {
- db.execute(
- "INSERT INTO state (name, value) VALUES ('last_hash', $1)",
- &[&list.lastblock.as_ref()],
- )?;
+ "INSERT INTO state (name, value) VALUES ('last_hash', $1)"
};
+ db.execute(query, &[&list.lastblock.as_ref()])?;
last_hash = Some(list.lastblock);
println!("Wait for block");
rpc.wait_for_new_block(0).ok();
@@ -243,7 +252,7 @@ fn main() {
let postgres_config = "postgres://localhost/wire_gateway?user=postgres";
let db_watcher = Client::connect(&postgres_config, NoTls).unwrap();
let db_sender = Client::connect(&postgres_config, NoTls).unwrap();
- let join = std::thread::spawn(move || sender(rpc_sender, db_sender).unwrap());
+ let join = std::thread::spawn(move || sender(rpc_sender, db_sender));
watcher(rpc_watcher, db_watcher).unwrap();
join.join().unwrap();
}
diff --git a/script/test_bank.sh b/script/test_bank.sh
@@ -17,7 +17,7 @@ trap cleanup EXIT
echo "OK"
-BANK_ENDPOINT=http://172.21.80.1:8080/
+BANK_ENDPOINT=http://172.23.48.1:8080/
echo -n "Making wire transfer to exchange ..."
btc-wire-cli.exe transfer 0.00004
@@ -36,7 +36,7 @@ ADDRESS=`bitcoin-cli.exe -rpcwallet=client getnewaddress`
taler-exchange-wire-gateway-client \
-b $BANK_ENDPOINT \
-C payto://bitcoin/$ADDRESS \
- la BTC:0.00002 > /dev/null
+ -a BTC:0.00002 > /dev/null
btc-wire-cli.exe nblock
echo " OK"
@@ -67,8 +67,8 @@ do
done
echo ""
-echo -n "Bad bitcoin address..."
-taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -C payto://bitcoin/ADDRESS -a BTC:0.00042 2>&1 | grep -q "(400/26)" && echo " OK" || echo " Failed"
+#echo -n "Bad bitcoin address..."
+#taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -C payto://bitcoin/ADDRESS -a BTC:0.00042 2>&1 | grep -q "(400/26)" && echo " OK" || echo " Failed"
echo -n "Bad amount..."
taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -C payto://bitcoin/$ADDRESS -a ATC:0.00042 2>&1 | grep -q "(400/26)" && echo " OK" || echo " Failed"
diff --git a/wire-gateway/db/schema.sql b/wire-gateway/db/schema.sql
@@ -1,3 +1,5 @@
+DROP TABLE IF EXISTS state, tx_in, tx_out;
+
-- Key value state
CREATE TABLE state (
name TEXT PRIMARY KEY,
@@ -5,23 +7,24 @@ CREATE TABLE state (
);
-- Incoming transactions
-CREATE TABLE tx_in {
+CREATE TABLE tx_in (
id SERIAL PRIMARY KEY,
_date TIMESTAMP NOT NULL,
amount TEXT NOT NULL,
- reserve_pub BYTEA NOT NULL,
+ reserve_pub BYTEA NOT NULL UNIQUE,
debit_acc TEXT NOT NULL,
credit_acc TEXT NOT NULL
-}
+);
-- Outgoing transactions
-CREATE TABLE tx_out {
+CREATE TABLE tx_out (
id SERIAL PRIMARY KEY,
_date TIMESTAMP NOT NULL,
amount TEXT NOT NULL,
- wtid BYTEA NOT NULL,
+ wtid BYTEA NOT NULL UNIQUE,
debit_acc TEXT NOT NULL,
credit_acc TEXT NOT NULL,
exchange_url TEXT NOT NULL,
- status SMALLINT NOT NULL
-}
-\ No newline at end of file
+ status SMALLINT NOT NULL,
+ txid BYTEA UNIQUE
+);
+\ No newline at end of file
diff --git a/wire-gateway/src/main.rs b/wire-gateway/src/main.rs
@@ -1,6 +1,6 @@
-use std::str::FromStr;
+use std::{process::exit, str::FromStr};
-use api_common::{Amount, SafeUint64, Timestamp, ShortHashCode};
+use api_common::{Amount, SafeUint64, ShortHashCode, Timestamp};
use api_wire::{OutgoingBankTransaction, OutgoingHistory};
use async_compression::tokio::bufread::ZlibDecoder;
use error_codes::ErrorCode;
@@ -15,7 +15,7 @@ use tokio_postgres::{Client, NoTls};
use url::Url;
use crate::{
- api_common::{Base32, ErrorDetail},
+ api_common::ErrorDetail,
api_wire::{
HistoryParams, IncomingBankTransaction, IncomingHistory, TransferRequest, TransferResponse,
},
@@ -41,7 +41,8 @@ async fn main() {
tokio::spawn(async move {
if let Err(e) = connection.await {
- eprintln!("connection error: {}", e);
+ eprintln!("wire-gateway: DB {}", e);
+ exit(1);
}
});
let state = ServerState { client };