commit 0a63ad8bcd97be33fbfa87e358de3bd99891300c
parent f1aaee3fa20cfaa595eb4d3716d32d918b3c2dae
Author: Antoine A <>
Date: Tue, 8 Feb 2022 15:31:37 +0100
eth-wire: handle RPC and database reconnection
Diffstat:
23 files changed, 498 insertions(+), 250 deletions(-)
diff --git a/btc-wire/src/loops/analysis.rs b/btc-wire/src/loops/analysis.rs
@@ -15,21 +15,19 @@
*/
use std::sync::atomic::Ordering;
-use btc_wire::rpc::ChainTipsStatus;
+use btc_wire::rpc::{AutoReconnectRPC, ChainTipsStatus};
use common::{
log::log::{error, warn},
postgres::fallible_iterator::FallibleIterator,
+ reconnect::AutoReconnectDb,
};
-use crate::{
- reconnect::{AutoReconnectRPC, AutoReconnectSql},
- WireState,
-};
+use crate::WireState;
use super::LoopResult;
/// Analyse blockchain behavior and adapt confirmations in real time
-pub fn analysis(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, state: &WireState) {
+pub fn analysis(mut rpc: AutoReconnectRPC, mut db: AutoReconnectDb, state: &WireState) {
// The biggest fork ever seen
let mut max_seen = 0;
loop {
diff --git a/btc-wire/src/loops/watcher.rs b/btc-wire/src/loops/watcher.rs
@@ -1,3 +1,4 @@
+use btc_wire::rpc::AutoReconnectRPC;
/*
This file is part of TALER
Copyright (C) 2022 Taler Systems SA
@@ -13,14 +14,12 @@
You should have received a copy of the GNU Affero General Public License along with
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
-use common::log::log::error;
-
-use crate::reconnect::{AutoReconnectRPC, AutoReconnectSql};
+use common::{log::log::error, reconnect::AutoReconnectDb};
use super::LoopResult;
/// Wait for new block and notify arrival with postgreSQL notifications
-pub fn watcher(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql) {
+pub fn watcher(mut rpc: AutoReconnectRPC, mut db: AutoReconnectDb) {
loop {
let rpc = rpc.client();
let db = db.client();
diff --git a/btc-wire/src/loops/worker.rs b/btc-wire/src/loops/worker.rs
@@ -22,7 +22,7 @@ use std::{
use bitcoin::{hashes::Hash, Amount as BtcAmount, BlockHash, Txid};
use btc_wire::{
- rpc::{self, Category, ErrorCode, Rpc, TransactionFull},
+ rpc::{self, AutoReconnectRPC, Category, ErrorCode, Rpc, TransactionFull},
rpc_utils::sender_address,
GetOpReturnErr, GetSegwitErr,
};
@@ -30,6 +30,7 @@ use common::{
api_common::base32,
log::log::{error, info, warn},
postgres,
+ reconnect::AutoReconnectDb,
sql::{sql_array, sql_url},
status::{BounceStatus, WithdrawStatus},
};
@@ -38,7 +39,6 @@ use postgres::{fallible_iterator::FallibleIterator, Client};
use crate::{
fail_point::fail_point,
metadata::OutMetadata,
- reconnect::{AutoReconnectRPC, AutoReconnectSql},
sql::{sql_addr, sql_btc_amount, sql_txid},
taler_util::{btc_payto_url, btc_to_taler},
WireState,
@@ -46,8 +46,7 @@ use crate::{
use super::{LoopError, LoopResult};
-/// Listen for new proposed transactions and announce them on the bitcoin network
-pub fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, state: &WireState) {
+pub fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectDb, state: &WireState) {
let mut lifetime = state.config.wire_lifetime;
let mut status = true;
let mut skip_notification = false;
@@ -100,7 +99,7 @@ pub fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, state: &WireS
// Sync chain
sync_chain(rpc, db, state, &mut status)?;
- // As we are now in sync with the blockchain if a transaction is in requested or delayed state it have not been sent
+ // As we are now in sync with the blockchain if a transaction has Requested status it have not been sent
// Send requested withdraws
while withdraw(db, rpc)? {}
diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs
@@ -16,14 +16,14 @@
use bitcoin::Network;
use btc_wire::{
config::{BitcoinConfig, WIRE_WALLET_NAME},
- rpc::Rpc,
+ rpc::{auto_reconnect_rpc, Rpc},
rpc_utils::default_data_dir,
};
use common::{
config::{load_btc_config, BtcConfig},
log::log::info,
+ reconnect::auto_reconnect_db,
};
-use reconnect::{AutoReconnectRPC, AutoReconnectSql};
use std::{sync::atomic::AtomicU16, thread::JoinHandle};
use crate::loops::{analysis::analysis, watcher::watcher, worker::worker};
@@ -31,7 +31,6 @@ use crate::loops::{analysis::analysis, watcher::watcher, worker::worker};
mod fail_point;
mod loops;
mod metadata;
-mod reconnect;
mod sql;
mod taler_util;
@@ -43,7 +42,9 @@ pub struct WireState {
fn main() {
common::log::init();
- let config = load_btc_config(Some(&std::env::args().nth(1).expect("Missing conf path arg")));
+ let config = load_btc_config(Some(
+ &std::env::args().nth(1).expect("Missing conf path arg"),
+ ));
let data_dir = config
.core
.data_dir
@@ -74,13 +75,13 @@ fn main() {
let mut rpc = Rpc::common(&btc_config).unwrap();
rpc.load_wallet(WIRE_WALLET_NAME).ok();
- let rpc_watcher = AutoReconnectRPC::new(btc_config.clone(), WIRE_WALLET_NAME);
- let rpc_analysis = AutoReconnectRPC::new(btc_config.clone(), WIRE_WALLET_NAME);
- let rpc_worker = AutoReconnectRPC::new(btc_config, WIRE_WALLET_NAME);
+ let rpc_watcher = auto_reconnect_rpc(btc_config.clone(), WIRE_WALLET_NAME);
+ let rpc_analysis = auto_reconnect_rpc(btc_config.clone(), WIRE_WALLET_NAME);
+ let rpc_worker = auto_reconnect_rpc(btc_config, WIRE_WALLET_NAME);
- let db_watcher = AutoReconnectSql::new(&state.config.core.db_url);
- let db_analysis = AutoReconnectSql::new(&state.config.core.db_url);
- let db_worker = AutoReconnectSql::new(&state.config.core.db_url);
+ let db_watcher = auto_reconnect_db(state.config.core.db_url.clone());
+ let db_analysis = auto_reconnect_db(state.config.core.db_url.clone());
+ let db_worker = auto_reconnect_db(state.config.core.db_url.clone());
named_spawn("watcher", move || watcher(rpc_watcher, db_watcher));
named_spawn("analysis", move || {
analysis(rpc_analysis, db_analysis, state)
diff --git a/btc-wire/src/reconnect.rs b/btc-wire/src/reconnect.rs
@@ -1,106 +0,0 @@
-/*
- This file is part of TALER
- Copyright (C) 2022 Taler Systems SA
-
- TALER is free software; you can redistribute it and/or modify it under the
- terms of the GNU Affero General Public License as published by the Free Software
- Foundation; either version 3, or (at your option) any later version.
-
- TALER is distributed in the hope that it will be useful, but WITHOUT ANY
- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
- A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License along with
- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
-*/
-//! Wrapper for RPC or PostgreSQL connection which automatically reconnect on error
-
-use std::time::Duration;
-
-use btc_wire::{config::BitcoinConfig, rpc::Rpc};
-use common::log::log::error;
-use common::postgres::{Client, NoTls};
-
-const RECONNECT_DELAY: Duration = Duration::from_secs(5);
-
-/// auto-reconnect RPC
-pub struct AutoReconnectRPC {
- config: BitcoinConfig,
- wallet: String,
- client: Rpc,
-}
-
-impl AutoReconnectRPC {
- pub fn new(config: BitcoinConfig, wallet: impl Into<String>) -> Self {
- let wallet: String = wallet.into();
- Self {
- client: Self::connect(&config, &wallet),
- wallet,
- config,
- }
- }
-
- /// Connect a new client, loop on error
- fn connect(config: &BitcoinConfig, wallet: &str) -> Rpc {
- loop {
- match Rpc::wallet(config, wallet) {
- Ok(mut new) => match new.net_info() {
- Ok(_) => return new,
- Err(err) => {
- error!("connect RPC: {}", err);
- std::thread::sleep(RECONNECT_DELAY);
- }
- },
- Err(err) => {
- error!("connect RPC: {}", err);
- std::thread::sleep(RECONNECT_DELAY);
- }
- }
- }
- }
-
- /// Get a mutable connection, block until a connection can be established
- pub fn client(&mut self) -> &mut Rpc {
- if self.client.net_info().is_err() {
- self.client = Self::connect(&self.config, &self.wallet);
- }
- &mut self.client
- }
-}
-
-/// auto-reconnect SQL
-pub struct AutoReconnectSql {
- config: String,
- client: Client,
-}
-
-impl AutoReconnectSql {
- pub fn new(config: impl Into<String>) -> Self {
- let config: String = config.into();
- Self {
- client: Self::connect(&config),
- config,
- }
- }
-
- /// Connect a new client, loop on error
- fn connect(config: &str) -> Client {
- loop {
- match Client::connect(config, NoTls) {
- Ok(new) => return new,
- Err(err) => {
- error!("connect DB: {}", err);
- std::thread::sleep(RECONNECT_DELAY);
- }
- }
- }
- }
-
- /// Get a mutable connection, block until a connection can be established
- pub fn client(&mut self) -> &mut Client {
- if self.client.is_valid(RECONNECT_DELAY).is_err() {
- self.client = Self::connect(&self.config);
- }
- &mut self.client
- }
-}
diff --git a/btc-wire/src/rpc.rs b/btc-wire/src/rpc.rs
@@ -26,6 +26,7 @@
//! bitcoincore RPC documentation: <https://bitcoincore.org/en/doc/22.0.0/>
use bitcoin::{hashes::hex::ToHex, Address, Amount, BlockHash, SignedAmount, Txid};
+use common::{log::log::error, reconnect::AutoReconnect};
use serde_json::{json, Value};
use std::{
fmt::Debug,
@@ -36,6 +37,20 @@ use std::{
use crate::config::{BitcoinConfig, BtcAuth};
+pub type AutoReconnectRPC = AutoReconnect<(BitcoinConfig, &'static str), Rpc>;
+
+pub fn auto_reconnect_rpc(config: BitcoinConfig, wallet: &'static str) -> AutoReconnectRPC {
+ AutoReconnect::new(
+ (config, wallet),
+ |(config, wallet)| {
+ Rpc::wallet(config, wallet)
+ .map_err(|err| error!("connect RPC: {}", err))
+ .ok()
+ },
+ |client| client.net_info().is_err(),
+ )
+}
+
#[derive(Debug, serde::Serialize)]
struct RpcRequest<'a, T: serde::Serialize> {
method: &'a str,
diff --git a/common/src/lib.rs b/common/src/lib.rs
@@ -1,4 +1,3 @@
-use rand::{rngs::OsRng, RngCore};
/*
This file is part of TALER
Copyright (C) 2022 Taler Systems SA
@@ -14,6 +13,8 @@ use rand::{rngs::OsRng, RngCore};
You should have received a copy of the GNU Affero General Public License along with
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
+use rand::{rngs::OsRng, RngCore};
+
pub use postgres;
pub use rand;
pub use url;
@@ -23,6 +24,7 @@ pub mod api_wire;
pub mod config;
pub mod error_codes;
pub mod log;
+pub mod reconnect;
pub mod sql;
pub mod status;
diff --git a/common/src/reconnect.rs b/common/src/reconnect.rs
@@ -0,0 +1,71 @@
+/*
+ This file is part of TALER
+ Copyright (C) 2022 Taler Systems SA
+
+ TALER is free software; you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License along with
+ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+*/
+use std::time::Duration;
+
+use log::error;
+use postgres::{Client, NoTls};
+
+const RECONNECT_DELAY: Duration = Duration::from_secs(5);
+
+pub struct AutoReconnect<S, C> {
+ config: S,
+ client: C,
+ connect: fn(&S) -> Option<C>,
+ check: fn(&mut C) -> bool,
+}
+
+impl<S, C> AutoReconnect<S, C> {
+ pub fn new(config: S, connect: fn(&S) -> Option<C>, check: fn(&mut C) -> bool) -> Self {
+ Self {
+ client: Self::connect(&config, connect),
+ connect,
+ check,
+ config,
+ }
+ }
+
+ /// Create a new client, loop on error
+ fn connect(config: &S, connect: fn(&S) -> Option<C>) -> C {
+ loop {
+ match connect(config) {
+ Some(new) => return new,
+ None => std::thread::sleep(RECONNECT_DELAY),
+ }
+ }
+ }
+
+ /// Get a mutable connection, block until a connection can be established
+ pub fn client(&mut self) -> &mut C {
+ if (self.check)(&mut self.client) {
+ self.client = Self::connect(&self.config, self.connect);
+ }
+ &mut self.client
+ }
+}
+
+pub type AutoReconnectDb = AutoReconnect<String, Client>;
+
+pub fn auto_reconnect_db(config: String) -> AutoReconnectDb {
+ AutoReconnect::new(
+ config,
+ |config| {
+ Client::connect(config, NoTls)
+ .map_err(|err| error!("connect DB: {}", err))
+ .ok()
+ },
+ |client| client.is_valid(RECONNECT_DELAY).is_err(),
+ )
+}
diff --git a/eth-wire/src/bin/eth-wire-utils.rs b/eth-wire/src/bin/eth-wire-utils.rs
@@ -15,10 +15,17 @@
*/
use std::{path::PathBuf, str::FromStr};
-use common::{api_common::Amount, log::init, rand_slice};
+use common::{
+ api_common::Amount,
+ config::{Config, CoreConfig},
+ log::init,
+ postgres::{Client, NoTls},
+ rand_slice,
+};
use eth_wire::{
rpc::{hex::Hex, Rpc, TransactionRequest},
taler_util::taler_to_eth,
+ SyncState,
};
use ethereum_types::H160;
@@ -38,7 +45,7 @@ enum Cmd {
Send(SendCmd),
Deposit(DepositCmd),
Mine(MineCmd),
- ClearDB(ClearCmd),
+ ResetDb(ResetCmd),
Balance(BalanceCmd),
Connect(ConnectCmd),
Disconnect(DisconnectCmd),
@@ -99,12 +106,12 @@ struct MineCmd {
}
#[derive(argh::FromArgs)]
-#[argh(subcommand, name = "cleardb")]
+#[argh(subcommand, name = "resetdb")]
/// Clear database
-struct ClearCmd {
+struct ResetCmd {
#[argh(positional)]
/// taler config
- config: PathBuf,
+ config: String,
}
#[derive(argh::FromArgs)]
@@ -197,7 +204,28 @@ fn main() {
let balance = rpc.balance(&addr).unwrap();
println!("{}", (balance / 10_000_000_000u64).as_u64());
}
- Cmd::ClearDB(_) => todo!(),
+ Cmd::ResetDb(ResetCmd { config }) => {
+ let config = CoreConfig::load_taler_config(Some(&config));
+ let block = rpc.earliest_block().unwrap();
+ let mut db = Client::connect(&config.db_url, NoTls).unwrap();
+ let mut tx = db.transaction().unwrap();
+ // Clear transaction tables and reset state
+ tx.execute("DELETE FROM tx_in", &[]).unwrap();
+ tx.execute("DELETE FROM tx_out", &[]).unwrap();
+ tx.execute("DELETE FROM bounce", &[]).unwrap();
+ tx.execute(
+ "UPDATE state SET value=$1 WHERE name='sync'",
+ &[&SyncState {
+ tip_hash: block.hash.unwrap(),
+ tip_height: block.number.unwrap(),
+ conf_height: block.number.unwrap(),
+ }
+ .to_bytes()
+ .as_ref()],
+ )
+ .unwrap();
+ tx.commit().unwrap();
+ }
Cmd::Connect(ConnectCmd { datadir }) => {
let mut peer = Rpc::new(datadir.join("geth.ipc")).unwrap();
let mut enode = peer.node_info().unwrap().enode;
diff --git a/eth-wire/src/loops/watcher.rs b/eth-wire/src/loops/watcher.rs
@@ -13,13 +13,26 @@
You should have received a copy of the GNU Affero General Public License along with
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
-use eth_wire::rpc::Rpc;
-use common::postgres::Client;
+use common::{log::log::error, reconnect::AutoReconnectDb};
+use eth_wire::rpc::AutoReconnectRPC;
-pub fn watcher(mut rpc: Rpc, mut db: Client) {
- let mut notifier = rpc.subscribe_new_head().unwrap();
+use crate::LoopResult;
+
+/// Wait for new block and notify arrival with postgreSQL notifications
+pub fn watcher(mut rpc: AutoReconnectRPC, mut db: AutoReconnectDb) {
loop {
- db.execute("NOTIFY new_block", &[]).unwrap();
- notifier.next().unwrap();
+ let rpc = rpc.client();
+ let db = db.client();
+
+ let result: LoopResult<()> = (|| {
+ let mut notifier = rpc.subscribe_new_head()?;
+ loop {
+ db.execute("NOTIFY new_block", &[])?;
+ notifier.next()?;
+ }
+ })();
+ if let Err(e) = result {
+ error!("watcher: {}", e);
+ }
}
}
diff --git a/eth-wire/src/loops/worker.rs b/eth-wire/src/loops/worker.rs
@@ -17,14 +17,15 @@ use std::{fmt::Write, sync::atomic::Ordering, time::SystemTime};
use common::{
api_common::base32,
- log::log::{error, info},
+ log::log::{error, info, warn},
postgres::{fallible_iterator::FallibleIterator, Client},
+ reconnect::AutoReconnectDb,
sql::{sql_array, sql_url},
status::{BounceStatus, WithdrawStatus},
};
use eth_wire::{
- metadata::InMetadata,
- rpc::{self, Rpc, Transaction},
+ metadata::{InMetadata, OutMetadata},
+ rpc::{self, AutoReconnectRPC, Rpc, Transaction},
taler_util::{eth_payto_url, eth_to_taler},
SyncState,
};
@@ -35,7 +36,7 @@ use crate::{
LoopResult, WireState,
};
-pub fn worker(mut rpc: Rpc, mut db: Client, state: &WireState) {
+pub fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectDb, state: &WireState) {
let mut lifetime = state.config.wire_lifetime;
let mut status = true;
let mut skip_notification = false;
@@ -51,6 +52,10 @@ pub fn worker(mut rpc: Rpc, mut db: Client, state: &WireState) {
}
}
+ // Connect
+ let rpc = rpc.client();
+ let db = db.client();
+
let result: LoopResult<()> = (|| {
// Listen to all channels
db.batch_execute("LISTEN new_block; LISTEN new_tx")?;
@@ -66,18 +71,23 @@ pub fn worker(mut rpc: Rpc, mut db: Client, state: &WireState) {
while iter.next()?.is_some() {}
}
- sync_chain(&mut rpc, &mut db, state, &mut status)?;
+ // Sync chain
+ sync_chain(rpc, db, state, &mut status)?;
+
+ // As we are now in sync with the blockchain if a transaction has Requested status it have not been sent
- while withdraw(&mut db, &mut rpc, state)? {}
+ // Send requested withdraws
+ while withdraw(db, rpc, state)? {}
- while bounce(&mut db, &mut rpc, U256::from(state.config.bounce_fee))? {}
+ // Send requested bounce
+ while bounce(db, rpc, U256::from(state.config.bounce_fee))? {}
Ok(())
})();
if let Err(e) = result {
error!("worker: {}", e);
- skip_notification = false;
+ skip_notification = true;
} else {
skip_notification = false;
}
@@ -150,6 +160,7 @@ fn list_since_block_state(
))
}
+/// Parse new transactions, return true if the database is up to date with the latest mined block
fn sync_chain(
rpc: &mut Rpc,
db: &mut Client,
@@ -215,6 +226,107 @@ fn sync_chain(
)?;
}
}
+ } else if tx.from == Some(state.address) {
+ match OutMetadata::decode(&tx.input) {
+ Ok(metadata) => match metadata {
+ OutMetadata::Withdraw { wtid, .. } => {
+ let amount = eth_to_taler(&tx.value);
+ let credit_addr = tx.to.unwrap();
+ // Get previous out tx
+ let row = db.query_opt(
+ "SELECT id, status, txid FROM tx_out WHERE wtid=$1 FOR UPDATE",
+ &[&wtid.as_ref()],
+ )?;
+ if let Some(row) = row {
+ // If already in database, sync status
+ let row_id: i32 = row.get(0);
+ let status: i16 = row.get(1);
+ match WithdrawStatus::try_from(status as u8).unwrap() {
+ WithdrawStatus::Requested => {
+ let nb_row = db.execute(
+ "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3 AND status=$4",
+ &[
+ &(WithdrawStatus::Sent as i16),
+ &tx.hash.as_ref(),
+ &row_id,
+ &status,
+ ],
+ )?;
+ if nb_row > 0 {
+ warn!(
+ ">> (recovered) {} {} in {} to {}",
+ amount,
+ base32(&wtid),
+ hex::encode(tx.hash),
+ hex::encode(credit_addr)
+ );
+ }
+ }
+ WithdrawStatus::Sent => { /* Status is correct */ }
+ }
+ } else {
+ // Else add to database
+ let date = SystemTime::now();
+ let nb = db.execute(
+ "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING",
+ &[&date, &amount.to_string(), &wtid.as_ref(), ð_payto_url(&state.address).as_ref(), ð_payto_url(&credit_addr).as_ref(), &state.config.base_url.as_ref(), &(WithdrawStatus::Sent as i16), &tx.hash.as_ref(), &None::<&[u8]>],
+ )?;
+ if nb > 0 {
+ warn!(
+ ">> (onchain) {} {} in {} to {}",
+ amount,
+ base32(&wtid),
+ hex::encode(tx.hash),
+ hex::encode(credit_addr)
+ );
+ }
+ }
+ }
+ OutMetadata::Bounce { bounced } => {
+ // Get previous bounce
+ let row = db.query_opt(
+ "SELECT id, status FROM bounce WHERE bounced=$1",
+ &[&bounced.as_ref()],
+ )?;
+ if let Some(row) = row {
+ // If already in database, sync status
+ let row_id: i32 = row.get(0);
+ let status: i16 = row.get(1);
+ match BounceStatus::try_from(status as u8).unwrap() {
+ BounceStatus::Requested => {
+ let nb_row = db.execute(
+ "UPDATE bounce SET status=$1, txid=$2 WHERE id=$3 AND status=$4",
+ &[&(BounceStatus::Sent as i16), &tx.hash.as_ref(), &row_id, &status],
+ )?;
+ if nb_row > 0 {
+ warn!(
+ "|| (recovered) {} in {}",
+ &bounced,
+ hex::encode(tx.hash)
+ );
+ }
+ }
+ BounceStatus::Ignored => error!(
+ "watcher: ignored bounce {} found in chain at {}",
+ bounced,
+ hex::encode(tx.hash)
+ ),
+ BounceStatus::Sent => { /* Status is correct */ }
+ }
+ } else {
+ // Else add to database
+ let nb = db.execute(
+ "INSERT INTO bounce (bounced, txid, status) VALUES ($1, $2, $3) ON CONFLICT (txid) DO NOTHING",
+ &[&bounced.as_ref(), &tx.hash.as_ref(), &(BounceStatus::Sent as i16)],
+ )?;
+ if nb > 0 {
+ warn!("|| (onchain) {} in {}", &bounced, hex::encode(tx.hash));
+ }
+ }
+ }
+ },
+ Err(_) => { /* Ignore */ }
+ }
}
}
@@ -238,7 +350,7 @@ fn sync_chain_removed(
// - An incoming invalid transactions already bounced
// Those two cases can compromise ethereum backing
// Removed outgoing transactions will be retried automatically by the node
-
+
let mut blocking_deposit = Vec::new();
let mut blocking_bounce = Vec::new();
diff --git a/eth-wire/src/main.rs b/eth-wire/src/main.rs
@@ -18,10 +18,11 @@ use std::sync::atomic::AtomicU16;
use common::{
config::{load_eth_config, EthConfig},
- postgres::{self, Client, NoTls},
+ postgres,
+ reconnect::auto_reconnect_db,
};
use eth_wire::{
- rpc::{self, Rpc},
+ rpc::{self, auto_reconnect_rpc},
taler_util::eth_payto_addr,
};
use ethereum_types::H160;
@@ -62,33 +63,12 @@ fn main() {
config,
}));
- let mut rpc_worker = Rpc::new(
- state
- .config
- .core
- .data_dir
- .as_ref()
- .unwrap()
- .join("geth.ipc"),
- )
- .unwrap();
-
- rpc_worker
- .unlock_account(&state.address, "password")
- .unwrap();
-
- let rpc_watcher = Rpc::new(
- state
- .config
- .core
- .data_dir
- .as_ref()
- .unwrap()
- .join("geth.ipc"),
- )
- .unwrap();
- let db_watcher = Client::connect(&state.config.core.db_url, NoTls).unwrap();
- let db_worker = Client::connect(&state.config.core.db_url, NoTls).unwrap();
+ let rpc_worker = auto_reconnect_rpc(state.config.core.data_dir.clone().unwrap(), state.address);
+ let rpc_watcher =
+ auto_reconnect_rpc(state.config.core.data_dir.clone().unwrap(), state.address);
+
+ let db_watcher = auto_reconnect_db(state.config.core.db_url.clone());
+ let db_worker = auto_reconnect_db(state.config.core.db_url.clone());
std::thread::spawn(move || watcher(rpc_watcher, db_watcher));
diff --git a/eth-wire/src/rpc.rs b/eth-wire/src/rpc.rs
@@ -19,7 +19,7 @@
//! We only parse the thing we actually use, this reduce memory usage and
//! make our code more compatible with future deprecation
-use common::url::Url;
+use common::{log::log::error, reconnect::AutoReconnect, url::Url};
use ethereum_types::{Address, H256, U256, U64};
use serde::de::DeserializeOwned;
use serde_json::error::Category;
@@ -33,6 +33,22 @@ use std::{
use self::hex::Hex;
+pub type AutoReconnectRPC = AutoReconnect<(PathBuf, Address), Rpc>;
+
+pub fn auto_reconnect_rpc(data_dir: PathBuf, address: Address) -> AutoReconnectRPC {
+ AutoReconnect::new(
+ (data_dir.join("geth.ipc"), address),
+ |(path, address)| {
+ let mut rpc = Rpc::new(path)
+ .map_err(|err| error!("connect RPC: {}", err))
+ .ok()?;
+ rpc.unlock_account(address, "password").ok()?;
+ Some(rpc)
+ },
+ |client| client.node_info().is_err(),
+ )
+}
+
#[derive(Debug, serde::Serialize)]
struct RpcRequest<'a, T: serde::Serialize> {
method: &'a str,
@@ -232,6 +248,10 @@ impl Rpc {
self.call("eth_getBlockByNumber", &("latest", &true))
}
+ pub fn earliest_block(&mut self) -> Result<Block> {
+ self.call("eth_getBlockByNumber", &("earliest", &true))
+ }
+
pub fn balance(&mut self, addr: &Address) -> Result<U256> {
self.call("eth_getBalance", &(addr, "latest"))
}
diff --git a/makefile b/makefile
@@ -22,6 +22,7 @@ test_btc: install
test_eth: install
test/eth/wire.sh
test/eth/lifetime.sh
+ test/eth/reconnect.sh
test/eth/reorg.sh
test: install test_gateway test_eth test_btc
\ No newline at end of file
diff --git a/test/btc/analysis.sh b/test/btc/analysis.sh
@@ -26,7 +26,7 @@ echo ""
echo "----- Learn from reorg -----"
echo "Loose second bitcoin node"
-btc2_deco
+btc_deco
echo -n "Making wire transfer to exchange:"
btc-wire-utils -d $WIRE_DIR transfer 0.042 > /dev/null
@@ -36,7 +36,7 @@ echo " OK"
echo -n "Perform fork and check btc-wire hard error:"
gateway_up
-btc2_fork 5
+btc_fork 5
check_balance 9.95799209 0.00000000
gateway_down
echo " OK"
@@ -48,7 +48,7 @@ gateway_up
echo " OK"
echo "Loose second bitcoin node"
-btc2_deco
+btc_deco
echo -n "Making wire transfer to exchange:"
btc-wire-utils -d $WIRE_DIR transfer 0.064 > /dev/null
@@ -58,7 +58,7 @@ echo " OK"
echo -n "Perform fork and check btc-wire learned from previous attack:"
gateway_up
-btc2_fork 5
+btc_fork 5
check_balance 9.89398418 0.10600000
gateway_up
echo " OK"
diff --git a/test/btc/bumpfee.sh b/test/btc/bumpfee.sh
@@ -59,7 +59,7 @@ echo " OK"
echo "----- Bump fee reorg -----"
echo "Loose second bitcoin node"
-btc2_deco
+btc_deco
echo -n "Making wire transfer from exchange:"
taler-exchange-wire-gateway-client \
@@ -71,7 +71,7 @@ check_balance 5.80383389 4.19196020
echo " OK"
echo -n "Perform fork and bump relay fee:"
-btc2_fork 6
+btc_fork 6
restart_btc -minrelaytxfee=0.0002
mine_btc
echo " OK"
diff --git a/test/btc/hell.sh b/test/btc/hell.sh
@@ -26,7 +26,7 @@ echo ""
echo "----- Handle reorg conflicting incoming receive -----"
echo "Loose second bitcoin node"
-btc2_deco
+btc_deco
echo -n "Gen incoming transactions:"
btc-wire-utils -d $WIRE_DIR transfer 0.0042 > /dev/null
@@ -36,7 +36,7 @@ echo " OK"
echo -n "Perform fork and check btc-wire hard error:"
gateway_up
-btc2_fork 5
+btc_fork 5
check_balance 9.99579209 0.00000000
gateway_down
echo " OK"
@@ -79,7 +79,7 @@ echo ""
echo "----- Handle reorg conflicting incoming bounce -----"
echo "Loose second bitcoin node"
-btc2_deco
+btc_deco
echo -n "Generate bounce:"
$BTC_CLI -rpcwallet=client sendtoaddress $WIRE 0.042 > /dev/null
@@ -89,7 +89,7 @@ echo " OK"
echo -n "Perform fork and check btc-wire hard error:"
gateway_up
-btc2_fork 5
+btc_fork 5
check_balance 9.95799859 0.00000000
gateway_down
echo " OK"
diff --git a/test/btc/reconnect.sh b/test/btc/reconnect.sh
@@ -1,6 +1,6 @@
#!/bin/bash
-## Check the capacity of wire_gateway and btc_wire to recover from database loss
+## Check the capacity of wire-gateway and btc-wire to recover from database and node loss
set -eu
@@ -27,12 +27,13 @@ btc-wire-utils -d $WIRE_DIR transfer 0.000042 > /dev/null
next_btc
check_balance 9.99995009 0.00004200
echo -n "Requesting exchange incoming transaction list:"
-taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -i | grep BTC:0.000042 > /dev/null && echo " OK" || echo " Failed"
+check_delta "incoming?delta=-100" "echo 42"
+echo "OK"
echo "----- Without DB -----"
echo "Stop database"
-pg_ctl stop -D $DB_DIR > /dev/null
+stop_db
echo "Making incomplete wire transfer to exchange"
$BTC_CLI -rpcwallet=client sendtoaddress $WIRE 0.00042 &> /dev/null
echo -n "Making wire transfer to exchange:"
@@ -41,19 +42,17 @@ next_btc
check_balance 9.99948077 0.00050200
echo " OK"
echo "Stop bitcoin node"
-stop_btc
+stop_node
echo -n "Requesting exchange incoming transaction list:"
taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -i 2>&1 | grep -q "504" && echo " OK" || echo " Failed"
echo "----- Reconnect DB -----"
echo "Start database"
-pg_ctl start -D $DB_DIR > /dev/null
-echo "Start bitcoin node"
+start_db
+echo "Resume bitcoin node"
resume_btc
sleep 6 # Wait for connection to be available
-echo -n "Requesting exchange incoming transaction list:"
-taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -i | grep BTC:0.00004 > /dev/null && echo " OK" || echo " Failed"
echo -n "Making wire transfer from exchange:"
taler-exchange-wire-gateway-client \
-b $BANK_ENDPOINT \
@@ -65,7 +64,7 @@ check_balance 9.99990892 0.00007001
echo " OK"
echo -n "Requesting exchange's outgoing transaction list:"
-taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -o | grep BTC:0.00002 > /dev/null
+check_delta "outgoing?delta=-100" "echo 2"
echo " OK"
echo "----- Recover DB -----"
@@ -76,20 +75,15 @@ mine_btc # Trigger worker
sleep 2
echo -n "Checking recover incoming transactions:"
-ALL=`taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -i`;
-for amount in 0.000042 0.00004; do
- echo $ALL | grep BTC:$amount > /dev/null && echo -n " OK" || echo -n " Failed"
-done
-echo ""
+check_delta "incoming?delta=-100" "echo 42 4"
+echo " OK"
echo -n "Requesting exchange's outgoing transaction list:"
-ALL=`taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -o`;
-for amount in 0.00002; do
- echo $ALL | grep BTC:$amount > /dev/null && echo -n " OK" || echo -n " Failed"
-done
-echo ""
+check_delta "outgoing?delta=-100" "echo 2"
+echo " OK"
-# Balance should not have changed
+echo -n "Balance should not have changed:"
check_balance 9.99990892 0.00007001
+echo " OK"
echo "All tests passed!"
\ No newline at end of file
diff --git a/test/btc/reorg.sh b/test/btc/reorg.sh
@@ -28,7 +28,7 @@ SEQ="seq 10 20"
echo "----- Handle reorg incoming transactions -----"
echo "Loose second bitcoin node"
-btc2_deco
+btc_deco
echo -n "Gen incoming transactions:"
for n in `$SEQ`; do
@@ -42,7 +42,7 @@ echo " OK"
echo -n "Perform fork and check btc-wire hard error:"
gateway_up
-btc2_fork 22
+btc_fork 22
check_balance 9.99826299 0.00000000
gateway_down
echo " OK"
@@ -56,7 +56,7 @@ echo " OK"
echo "----- Handle reorg outgoing transactions -----"
echo "Loose second bitcoin node"
-btc2_deco
+btc_deco
echo -n "Gen outgoing transactions:"
for n in `$SEQ`; do
@@ -73,7 +73,7 @@ echo " OK"
echo -n "Perform fork and check btc-wire still up:"
gateway_up
-btc2_fork 22
+btc_fork 22
check_balance 9.99826299 0.00146311
gateway_up
echo " OK"
@@ -89,7 +89,7 @@ clear_wallet
check_balance "*" 0.00000000
echo "Loose second bitcoin node"
-btc2_deco
+btc_deco
echo -n "Generate bounce:"
for n in `$SEQ`; do
@@ -103,7 +103,7 @@ echo " OK"
echo -n "Perform fork and check btc-wire hard error:"
gateway_up
-btc2_fork 22
+btc_fork 22
check_balance "*" 0.00000000
gateway_down
echo " OK"
diff --git a/test/btc/stress.sh b/test/btc/stress.sh
@@ -68,7 +68,6 @@ next_btc # Mine transactions
echo "----- Recover DB -----"
echo "Reset database"
-sleep 5
reset_db
mine_btc # Trigger worker
sleep 10
@@ -110,7 +109,6 @@ echo " OK"
echo "----- Recover DB -----"
echo "Reset database"
-sleep 5
reset_db
mine_btc # Trigger worker
sleep 10
diff --git a/test/common.sh b/test/common.sh
@@ -38,6 +38,8 @@ BTC_CLI2="bitcoin-cli -datadir=$WIRE_DIR2"
ETH_CLI="geth -datadir=$WIRE_DIR"
ETH_CLI2="geth -datadir=$WIRE_DIR2"
+# ----- Common ----- #
+
# Load test.conf as bash variables
function load_config() {
cp ${BASH_SOURCE%/*}/conf/$CONFIG $CONF
@@ -73,6 +75,11 @@ function check_down() {
fi
}
+function stop_node() {
+ kill $NODE_PID
+ wait $NODE_PID
+}
+
# ----- Database ----- #
# Create new postgresql cluster and init database schema
@@ -86,16 +93,27 @@ function setup_db() {
# Erase database
function reset_db() {
+ sleep 5 # Wait for loop to stop
$WIRE_UTILS resetdb $CONF
}
+# Stop database
+function stop_db() {
+ pg_ctl stop -D $DB_DIR >> log/postgres.log
+}
+
+# Start database
+function start_db() {
+ pg_ctl start -D $DB_DIR >> log/postgres.log
+}
+
# ----- Bitcoin node ----- #
# Start a bitcoind regtest node, generate money, wallet and addresses
function init_btc() {
cp ${BASH_SOURCE%/*}/conf/${BTC_CONFIG:-bitcoin.conf} $WIRE_DIR/bitcoin.conf
bitcoind -datadir=$WIRE_DIR $* &>> log/node.log &
- BTC_PID="$!"
+ NODE_PID="$!"
# Wait for RPC server to be online
$BTC_CLI -rpcwait getnetworkinfo > /dev/null
# Create wire wallet
@@ -123,12 +141,12 @@ function init_btc2() {
}
# Disconnect the two nodes
-function btc2_deco() {
+function btc_deco() {
$BTC_CLI disconnectnode 127.0.0.1:8346
}
# Create a fork on the second node and reconnect the two node
-function btc2_fork() {
+function btc_fork() {
$BTC_CLI2 generatetoaddress $1 $RESERVE > /dev/null
$BTC_CLI addnode 127.0.0.1:8346 onetry
sleep 1
@@ -138,7 +156,7 @@ function btc2_fork() {
function resume_btc() {
# Restart node
bitcoind -datadir=$WIRE_DIR $* &>> log/node.log &
- BTC_PID="$!"
+ NODE_PID="$!"
# Load wallets
for wallet in wire client reserve; do
$BTC_CLI -rpcwait loadwallet $wallet > /dev/null
@@ -147,13 +165,8 @@ function resume_btc() {
$BTC_CLI addnode 127.0.0.1:8346 onetry
}
-function stop_btc() {
- kill $BTC_PID
- wait $BTC_PID
-}
-
function restart_btc() {
- stop_btc
+ stop_node
resume_btc $*
}
@@ -196,16 +209,16 @@ function check_balance() {
# Start btc-wire
function btc_wire() {
- cargo build --bin btc-wire --release &> log/cargo.log
- target/release/btc-wire $CONF &> log/wire.log &
+ cargo build --bin btc-wire --release &>> log/cargo.log
+ target/release/btc-wire $CONF &>> log/wire.log &
WIRE_PID="$!"
}
# Start multiple btc_wire with random failures in parallel
function stress_btc_wire() {
- cargo build --bin btc-wire --release --features fail &> log/cargo.log
- target/release/btc-wire $CONF &> log/wire.log &
- target/release/btc-wire $CONF &> log/wire1.log &
+ cargo build --bin btc-wire --release --features fail &>> log/cargo.log
+ target/release/btc-wire $CONF &>> log/wire.log &
+ target/release/btc-wire $CONF &>> log/wire1.log &
}
# ----- Ethereum node ----- #
@@ -242,9 +255,10 @@ function init_eth() {
}
}" > $DIR/genesis.json
# Initialize blockchain
- $ETH_CLI init $DIR/genesis.json &> log/node.log
+ $ETH_CLI init $DIR/genesis.json &>> log/node.log
# Start node
- $ETH_CLI --miner.recommit 0s --miner.gasprice 0 $* &> log/node.log &
+ $ETH_CLI --miner.recommit 0s --miner.gasprice 0 $* &>> log/node.log &
+ NODE_PID="$!"
sleep 1
# Create wire address
WIRE=`eth-wire-cli initwallet $CONF | grep -oP '(?<=is ).*'`
@@ -254,9 +268,9 @@ function init_eth() {
# Start a seconf geth dev node connected to the first one
function init_eth2() {
# Initialize blockchain
- $ETH_CLI2 init $DIR/genesis.json &> log/node2.log
+ $ETH_CLI2 init $DIR/genesis.json &>> log/node2.log
# Start node
- $ETH_CLI2 --port 30305 --miner.recommit 0s --miner.gasprice 0 $* &> log/node2.log &
+ $ETH_CLI2 --port 30305 --miner.recommit 0s --miner.gasprice 0 $* &>> log/node2.log &
sleep 1
# Create etherbase account for mining
$ETH_CLI2 account new --password <(echo "password") &> /dev/null
@@ -265,17 +279,32 @@ function init_eth2() {
}
# Disconnect the two nodes
-function eth2_deco() {
+function eth_deco() {
$WIRE_UTILS disconnect $WIRE_DIR2
}
# Create a fork on the second node and reconnect the two node
-function eth2_fork() {
+function eth_fork() {
$WIRE_UTILS2 mine $RESERVE ${1:-}
$WIRE_UTILS connect $WIRE_DIR2
sleep 5
}
+# Restart an initialized geth dev node
+function resume_eth() {
+ # Start node
+ $ETH_CLI --port 30305 --miner.recommit 0s --miner.gasprice 0 $* &>> log/node2.log &
+ NODE_PID="$!"
+ sleep 1
+ # Try to connect nodes
+ $WIRE_UTILS connect $WIRE_DIR2 &> /dev/null || true
+}
+
+function restart_eth() {
+ stop_node
+ resume_eth $*
+}
+
# Check client and wire balance
function check_balance_eth() {
local CLIENT_BALANCE=`$WIRE_UTILS balance $CLIENT`
@@ -295,8 +324,8 @@ function check_balance_eth() {
# Start eth-wire
function eth_wire() {
- cargo build --bin eth-wire --release &> log/cargo.log
- target/release/eth-wire $CONF &> log/wire.log &
+ cargo build --bin eth-wire --release &>> log/cargo.log
+ target/release/eth-wire $CONF &>> log/wire.log &
WIRE_PID="$!"
}
diff --git a/test/eth/reconnect.sh b/test/eth/reconnect.sh
@@ -0,0 +1,93 @@
+#!/bin/bash
+
+## Check the capacity of wire-gateway and eth-wire to recover from database and node loss
+
+set -eu
+
+source "${BASH_SOURCE%/*}/../common.sh"
+SCHEMA=eth.sql
+CONFIG=taler_eth.conf
+
+echo "----- Setup -----"
+echo "Load config file"
+load_config
+echo "Start database"
+setup_db
+echo "Start ethereum node"
+init_eth
+echo "Start eth-wire"
+eth_wire
+echo "Start gateway"
+gateway
+echo ""
+
+echo "----- With DB -----"
+
+echo -n "Making wire transfer to exchange:"
+$WIRE_UTILS deposit $CLIENT $WIRE 0.0000 42
+next_eth # Trigger eth-wire
+check_balance_eth 999995800 4200
+echo " OK"
+
+echo -n "Requesting exchange incoming transaction list:"
+check_delta "incoming?delta=-100" "echo 42"
+echo " OK"
+
+echo "----- Without DB -----"
+
+echo "Stop database"
+stop_db
+echo "Making incomplete wire transfer to exchange"
+$WIRE_UTILS send $CLIENT $WIRE 0.0000 42
+echo -n "Making wire transfer to exchange:"
+$WIRE_UTILS deposit $CLIENT $WIRE 0.0000 4
+next_eth
+check_balance_eth 999987600 12400
+echo "OK"
+echo "Stop ethereum node"
+stop_node
+echo -n "Requesting exchange incoming transaction list:"
+taler-exchange-wire-gateway-client -b $BANK_ENDPOINT -i 2>&1 | grep -q "504" && echo " OK" || echo " Failed"
+
+echo "----- Reconnect DB -----"
+
+echo "Start database"
+start_db
+echo "Resume ethereum node"
+resume_eth
+sleep 6 # Wait for connection to be available
+echo -n "Making wire transfer from exchange:"
+ taler-exchange-wire-gateway-client \
+ -b $BANK_ENDPOINT \
+ -C payto://ethereum/$CLIENT \
+ -a ETH:0.00002 > /dev/null
+sleep 1
+mine_eth # Mine transactions
+check_balance_eth 999992800 7200
+echo " OK"
+
+echo -n "Requesting exchange's outgoing transaction list:"
+check_delta "outgoing?delta=-100" "echo 2"
+echo " OK"
+
+echo "----- Recover DB -----"
+
+echo "Reset database"
+reset_db
+mine_eth 1 # Trigger worker
+sleep 2
+
+echo -n "Checking recover incoming transactions:"
+check_delta "incoming?delta=-100" "echo 42 4"
+echo " OK"
+
+echo -n "Requesting exchange's outgoing transaction list:"
+check_delta "outgoing?delta=-100" "echo 2"
+echo " OK"
+
+echo -n "Balance should not have changed:"
+check_balance_eth 999992800 7200
+echo " OK"
+
+
+echo "All tests passed!"
+\ No newline at end of file
diff --git a/test/eth/reorg.sh b/test/eth/reorg.sh
@@ -28,7 +28,7 @@ SEQ="seq 10 20"
echo "----- Handle reorg incoming transactions -----"
echo "Loose second ethereum node"
-eth2_deco
+eth_deco
echo -n "Making wire transfer to exchange:"
eth-wire-utils -d $WIRE_DIR deposit $CLIENT $WIRE 0.000 `$SEQ`
@@ -39,7 +39,7 @@ echo " OK"
echo -n "Perform fork and check eth-wire hard error:"
gateway_up
-eth2_fork 10
+eth_fork 10
check_balance_eth 1000000000 0
gateway_down
echo " OK"
@@ -53,7 +53,7 @@ echo " OK"
echo "----- Handle reorg outgoing transactions -----"
echo "Loose second ethereum node"
-eth2_deco
+eth_deco
echo -n "Making wire transfer from exchange:"
for n in `$SEQ`; do
@@ -70,7 +70,7 @@ echo " OK"
echo -n "Perform fork and check eth-wire still up:"
gateway_up
-eth2_fork 10
+eth_fork 10
check_balance_eth 999835000 165000
gateway_up
echo " OK"
@@ -83,7 +83,7 @@ echo " OK"
echo "----- Handle reorg bounce -----"
echo "Loose second ethereum node"
-eth2_deco
+eth_deco
echo -n "Bounce:"
eth-wire-utils -d $WIRE_DIR send $CLIENT $WIRE 0.000 `$SEQ`
@@ -94,7 +94,7 @@ echo " OK"
echo -n "Perform fork and check eth-wire hard error:"
gateway_up
-eth2_fork 10
+eth_fork 10
check_balance_eth 999851500 148500
gateway_down
echo " OK"