summaryrefslogtreecommitdiff
path: root/eth-wire
diff options
context:
space:
mode:
authorAntoine A <>2022-02-08 15:31:37 +0100
committerAntoine A <>2022-02-08 15:31:37 +0100
commit0a63ad8bcd97be33fbfa87e358de3bd99891300c (patch)
tree211b69bb6bc7d4bdfedad3af6b0c20aca3bc2618 /eth-wire
parentf1aaee3fa20cfaa595eb4d3716d32d918b3c2dae (diff)
downloaddepolymerization-0a63ad8bcd97be33fbfa87e358de3bd99891300c.tar.gz
depolymerization-0a63ad8bcd97be33fbfa87e358de3bd99891300c.tar.bz2
depolymerization-0a63ad8bcd97be33fbfa87e358de3bd99891300c.zip
eth-wire: handle RPC and database reconnection
Diffstat (limited to 'eth-wire')
-rw-r--r--eth-wire/src/bin/eth-wire-utils.rs40
-rw-r--r--eth-wire/src/loops/watcher.rs25
-rw-r--r--eth-wire/src/loops/worker.rs130
-rw-r--r--eth-wire/src/main.rs38
-rw-r--r--eth-wire/src/rpc.rs22
5 files changed, 204 insertions, 51 deletions
diff --git a/eth-wire/src/bin/eth-wire-utils.rs b/eth-wire/src/bin/eth-wire-utils.rs
index 26f5ddd..1d72dd7 100644
--- a/eth-wire/src/bin/eth-wire-utils.rs
+++ b/eth-wire/src/bin/eth-wire-utils.rs
@@ -15,10 +15,17 @@
*/
use std::{path::PathBuf, str::FromStr};
-use common::{api_common::Amount, log::init, rand_slice};
+use common::{
+ api_common::Amount,
+ config::{Config, CoreConfig},
+ log::init,
+ postgres::{Client, NoTls},
+ rand_slice,
+};
use eth_wire::{
rpc::{hex::Hex, Rpc, TransactionRequest},
taler_util::taler_to_eth,
+ SyncState,
};
use ethereum_types::H160;
@@ -38,7 +45,7 @@ enum Cmd {
Send(SendCmd),
Deposit(DepositCmd),
Mine(MineCmd),
- ClearDB(ClearCmd),
+ ResetDb(ResetCmd),
Balance(BalanceCmd),
Connect(ConnectCmd),
Disconnect(DisconnectCmd),
@@ -99,12 +106,12 @@ struct MineCmd {
}
#[derive(argh::FromArgs)]
-#[argh(subcommand, name = "cleardb")]
+#[argh(subcommand, name = "resetdb")]
/// Clear database
-struct ClearCmd {
+struct ResetCmd {
#[argh(positional)]
/// taler config
- config: PathBuf,
+ config: String,
}
#[derive(argh::FromArgs)]
@@ -197,7 +204,28 @@ fn main() {
let balance = rpc.balance(&addr).unwrap();
println!("{}", (balance / 10_000_000_000u64).as_u64());
}
- Cmd::ClearDB(_) => todo!(),
+ Cmd::ResetDb(ResetCmd { config }) => {
+ let config = CoreConfig::load_taler_config(Some(&config));
+ let block = rpc.earliest_block().unwrap();
+ let mut db = Client::connect(&config.db_url, NoTls).unwrap();
+ let mut tx = db.transaction().unwrap();
+ // Clear transaction tables and reset state
+ tx.execute("DELETE FROM tx_in", &[]).unwrap();
+ tx.execute("DELETE FROM tx_out", &[]).unwrap();
+ tx.execute("DELETE FROM bounce", &[]).unwrap();
+ tx.execute(
+ "UPDATE state SET value=$1 WHERE name='sync'",
+ &[&SyncState {
+ tip_hash: block.hash.unwrap(),
+ tip_height: block.number.unwrap(),
+ conf_height: block.number.unwrap(),
+ }
+ .to_bytes()
+ .as_ref()],
+ )
+ .unwrap();
+ tx.commit().unwrap();
+ }
Cmd::Connect(ConnectCmd { datadir }) => {
let mut peer = Rpc::new(datadir.join("geth.ipc")).unwrap();
let mut enode = peer.node_info().unwrap().enode;
diff --git a/eth-wire/src/loops/watcher.rs b/eth-wire/src/loops/watcher.rs
index 42b3ecb..05396ab 100644
--- a/eth-wire/src/loops/watcher.rs
+++ b/eth-wire/src/loops/watcher.rs
@@ -13,13 +13,26 @@
You should have received a copy of the GNU Affero General Public License along with
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
-use eth_wire::rpc::Rpc;
-use common::postgres::Client;
+use common::{log::log::error, reconnect::AutoReconnectDb};
+use eth_wire::rpc::AutoReconnectRPC;
-pub fn watcher(mut rpc: Rpc, mut db: Client) {
- let mut notifier = rpc.subscribe_new_head().unwrap();
+use crate::LoopResult;
+
+/// Wait for new block and notify arrival with postgreSQL notifications
+pub fn watcher(mut rpc: AutoReconnectRPC, mut db: AutoReconnectDb) {
loop {
- db.execute("NOTIFY new_block", &[]).unwrap();
- notifier.next().unwrap();
+ let rpc = rpc.client();
+ let db = db.client();
+
+ let result: LoopResult<()> = (|| {
+ let mut notifier = rpc.subscribe_new_head()?;
+ loop {
+ db.execute("NOTIFY new_block", &[])?;
+ notifier.next()?;
+ }
+ })();
+ if let Err(e) = result {
+ error!("watcher: {}", e);
+ }
}
}
diff --git a/eth-wire/src/loops/worker.rs b/eth-wire/src/loops/worker.rs
index b812c10..8f70a24 100644
--- a/eth-wire/src/loops/worker.rs
+++ b/eth-wire/src/loops/worker.rs
@@ -17,14 +17,15 @@ use std::{fmt::Write, sync::atomic::Ordering, time::SystemTime};
use common::{
api_common::base32,
- log::log::{error, info},
+ log::log::{error, info, warn},
postgres::{fallible_iterator::FallibleIterator, Client},
+ reconnect::AutoReconnectDb,
sql::{sql_array, sql_url},
status::{BounceStatus, WithdrawStatus},
};
use eth_wire::{
- metadata::InMetadata,
- rpc::{self, Rpc, Transaction},
+ metadata::{InMetadata, OutMetadata},
+ rpc::{self, AutoReconnectRPC, Rpc, Transaction},
taler_util::{eth_payto_url, eth_to_taler},
SyncState,
};
@@ -35,7 +36,7 @@ use crate::{
LoopResult, WireState,
};
-pub fn worker(mut rpc: Rpc, mut db: Client, state: &WireState) {
+pub fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectDb, state: &WireState) {
let mut lifetime = state.config.wire_lifetime;
let mut status = true;
let mut skip_notification = false;
@@ -51,6 +52,10 @@ pub fn worker(mut rpc: Rpc, mut db: Client, state: &WireState) {
}
}
+ // Connect
+ let rpc = rpc.client();
+ let db = db.client();
+
let result: LoopResult<()> = (|| {
// Listen to all channels
db.batch_execute("LISTEN new_block; LISTEN new_tx")?;
@@ -66,18 +71,23 @@ pub fn worker(mut rpc: Rpc, mut db: Client, state: &WireState) {
while iter.next()?.is_some() {}
}
- sync_chain(&mut rpc, &mut db, state, &mut status)?;
+ // Sync chain
+ sync_chain(rpc, db, state, &mut status)?;
+
+ // As we are now in sync with the blockchain if a transaction has Requested status it have not been sent
- while withdraw(&mut db, &mut rpc, state)? {}
+ // Send requested withdraws
+ while withdraw(db, rpc, state)? {}
- while bounce(&mut db, &mut rpc, U256::from(state.config.bounce_fee))? {}
+ // Send requested bounce
+ while bounce(db, rpc, U256::from(state.config.bounce_fee))? {}
Ok(())
})();
if let Err(e) = result {
error!("worker: {}", e);
- skip_notification = false;
+ skip_notification = true;
} else {
skip_notification = false;
}
@@ -150,6 +160,7 @@ fn list_since_block_state(
))
}
+/// Parse new transactions, return true if the database is up to date with the latest mined block
fn sync_chain(
rpc: &mut Rpc,
db: &mut Client,
@@ -215,6 +226,107 @@ fn sync_chain(
)?;
}
}
+ } else if tx.from == Some(state.address) {
+ match OutMetadata::decode(&tx.input) {
+ Ok(metadata) => match metadata {
+ OutMetadata::Withdraw { wtid, .. } => {
+ let amount = eth_to_taler(&tx.value);
+ let credit_addr = tx.to.unwrap();
+ // Get previous out tx
+ let row = db.query_opt(
+ "SELECT id, status, txid FROM tx_out WHERE wtid=$1 FOR UPDATE",
+ &[&wtid.as_ref()],
+ )?;
+ if let Some(row) = row {
+ // If already in database, sync status
+ let row_id: i32 = row.get(0);
+ let status: i16 = row.get(1);
+ match WithdrawStatus::try_from(status as u8).unwrap() {
+ WithdrawStatus::Requested => {
+ let nb_row = db.execute(
+ "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3 AND status=$4",
+ &[
+ &(WithdrawStatus::Sent as i16),
+ &tx.hash.as_ref(),
+ &row_id,
+ &status,
+ ],
+ )?;
+ if nb_row > 0 {
+ warn!(
+ ">> (recovered) {} {} in {} to {}",
+ amount,
+ base32(&wtid),
+ hex::encode(tx.hash),
+ hex::encode(credit_addr)
+ );
+ }
+ }
+ WithdrawStatus::Sent => { /* Status is correct */ }
+ }
+ } else {
+ // Else add to database
+ let date = SystemTime::now();
+ let nb = db.execute(
+ "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING",
+ &[&date, &amount.to_string(), &wtid.as_ref(), &eth_payto_url(&state.address).as_ref(), &eth_payto_url(&credit_addr).as_ref(), &state.config.base_url.as_ref(), &(WithdrawStatus::Sent as i16), &tx.hash.as_ref(), &None::<&[u8]>],
+ )?;
+ if nb > 0 {
+ warn!(
+ ">> (onchain) {} {} in {} to {}",
+ amount,
+ base32(&wtid),
+ hex::encode(tx.hash),
+ hex::encode(credit_addr)
+ );
+ }
+ }
+ }
+ OutMetadata::Bounce { bounced } => {
+ // Get previous bounce
+ let row = db.query_opt(
+ "SELECT id, status FROM bounce WHERE bounced=$1",
+ &[&bounced.as_ref()],
+ )?;
+ if let Some(row) = row {
+ // If already in database, sync status
+ let row_id: i32 = row.get(0);
+ let status: i16 = row.get(1);
+ match BounceStatus::try_from(status as u8).unwrap() {
+ BounceStatus::Requested => {
+ let nb_row = db.execute(
+ "UPDATE bounce SET status=$1, txid=$2 WHERE id=$3 AND status=$4",
+ &[&(BounceStatus::Sent as i16), &tx.hash.as_ref(), &row_id, &status],
+ )?;
+ if nb_row > 0 {
+ warn!(
+ "|| (recovered) {} in {}",
+ &bounced,
+ hex::encode(tx.hash)
+ );
+ }
+ }
+ BounceStatus::Ignored => error!(
+ "watcher: ignored bounce {} found in chain at {}",
+ bounced,
+ hex::encode(tx.hash)
+ ),
+ BounceStatus::Sent => { /* Status is correct */ }
+ }
+ } else {
+ // Else add to database
+ let nb = db.execute(
+ "INSERT INTO bounce (bounced, txid, status) VALUES ($1, $2, $3) ON CONFLICT (txid) DO NOTHING",
+ &[&bounced.as_ref(), &tx.hash.as_ref(), &(BounceStatus::Sent as i16)],
+ )?;
+ if nb > 0 {
+ warn!("|| (onchain) {} in {}", &bounced, hex::encode(tx.hash));
+ }
+ }
+ }
+ },
+ Err(_) => { /* Ignore */ }
+ }
}
}
@@ -238,7 +350,7 @@ fn sync_chain_removed(
// - An incoming invalid transactions already bounced
// Those two cases can compromise ethereum backing
// Removed outgoing transactions will be retried automatically by the node
-
+
let mut blocking_deposit = Vec::new();
let mut blocking_bounce = Vec::new();
diff --git a/eth-wire/src/main.rs b/eth-wire/src/main.rs
index 8165c4a..65e1a23 100644
--- a/eth-wire/src/main.rs
+++ b/eth-wire/src/main.rs
@@ -18,10 +18,11 @@ use std::sync::atomic::AtomicU16;
use common::{
config::{load_eth_config, EthConfig},
- postgres::{self, Client, NoTls},
+ postgres,
+ reconnect::auto_reconnect_db,
};
use eth_wire::{
- rpc::{self, Rpc},
+ rpc::{self, auto_reconnect_rpc},
taler_util::eth_payto_addr,
};
use ethereum_types::H160;
@@ -62,33 +63,12 @@ fn main() {
config,
}));
- let mut rpc_worker = Rpc::new(
- state
- .config
- .core
- .data_dir
- .as_ref()
- .unwrap()
- .join("geth.ipc"),
- )
- .unwrap();
-
- rpc_worker
- .unlock_account(&state.address, "password")
- .unwrap();
-
- let rpc_watcher = Rpc::new(
- state
- .config
- .core
- .data_dir
- .as_ref()
- .unwrap()
- .join("geth.ipc"),
- )
- .unwrap();
- let db_watcher = Client::connect(&state.config.core.db_url, NoTls).unwrap();
- let db_worker = Client::connect(&state.config.core.db_url, NoTls).unwrap();
+ let rpc_worker = auto_reconnect_rpc(state.config.core.data_dir.clone().unwrap(), state.address);
+ let rpc_watcher =
+ auto_reconnect_rpc(state.config.core.data_dir.clone().unwrap(), state.address);
+
+ let db_watcher = auto_reconnect_db(state.config.core.db_url.clone());
+ let db_worker = auto_reconnect_db(state.config.core.db_url.clone());
std::thread::spawn(move || watcher(rpc_watcher, db_watcher));
diff --git a/eth-wire/src/rpc.rs b/eth-wire/src/rpc.rs
index e626cd6..cff9b86 100644
--- a/eth-wire/src/rpc.rs
+++ b/eth-wire/src/rpc.rs
@@ -19,7 +19,7 @@
//! We only parse the thing we actually use, this reduce memory usage and
//! make our code more compatible with future deprecation
-use common::url::Url;
+use common::{log::log::error, reconnect::AutoReconnect, url::Url};
use ethereum_types::{Address, H256, U256, U64};
use serde::de::DeserializeOwned;
use serde_json::error::Category;
@@ -33,6 +33,22 @@ use std::{
use self::hex::Hex;
+pub type AutoReconnectRPC = AutoReconnect<(PathBuf, Address), Rpc>;
+
+pub fn auto_reconnect_rpc(data_dir: PathBuf, address: Address) -> AutoReconnectRPC {
+ AutoReconnect::new(
+ (data_dir.join("geth.ipc"), address),
+ |(path, address)| {
+ let mut rpc = Rpc::new(path)
+ .map_err(|err| error!("connect RPC: {}", err))
+ .ok()?;
+ rpc.unlock_account(address, "password").ok()?;
+ Some(rpc)
+ },
+ |client| client.node_info().is_err(),
+ )
+}
+
#[derive(Debug, serde::Serialize)]
struct RpcRequest<'a, T: serde::Serialize> {
method: &'a str,
@@ -232,6 +248,10 @@ impl Rpc {
self.call("eth_getBlockByNumber", &("latest", &true))
}
+ pub fn earliest_block(&mut self) -> Result<Block> {
+ self.call("eth_getBlockByNumber", &("earliest", &true))
+ }
+
pub fn balance(&mut self, addr: &Address) -> Result<U256> {
self.call("eth_getBalance", &(addr, "latest"))
}