commit ee9c60847a8e4aa7daecb588c2f723b6bf272fe6
parent 3e4647fb219a9d98ba31c1331ca18afe837b74ea
Author: Antoine A <>
Date: Thu, 3 Mar 2022 00:58:36 +0100
eth-wire: reuse rpc connection for notifications
Diffstat:
7 files changed, 161 insertions(+), 106 deletions(-)
diff --git a/eth-wire/src/bin/eth-wire-utils.rs b/eth-wire/src/bin/eth-wire-utils.rs
@@ -30,9 +30,9 @@ use common::{
rand_slice,
};
use eth_wire::{
- rpc::{hex::Hex, Rpc, TransactionRequest},
+ rpc::{hex::Hex, Rpc, TransactionRequest, RpcClient},
taler_util::{taler_to_eth, TRUNC},
- SyncState,
+ SyncState, RpcExtended,
};
use ethereum_types::{H160, U256};
@@ -161,15 +161,15 @@ fn main() {
Cmd::Mine { to, mut amount } => {
let to = H160::from_str(&to).unwrap();
rpc.unlock_account(&to, &passwd).ok();
- let mut notifier = rpc.subscribe_new_head().unwrap();
+ let mut rpc = rpc.subscribe_new_head().unwrap();
rpc.miner_start().unwrap();
while !rpc.pending_transactions().unwrap().is_empty() {
- notifier.next().unwrap();
+ rpc.next().unwrap();
amount = amount.saturating_sub(1);
}
for _ in 0..amount {
- notifier.next().unwrap();
+ rpc.next().unwrap();
}
rpc.miner_stop().unwrap();
}
diff --git a/eth-wire/src/lib.rs b/eth-wire/src/lib.rs
@@ -14,20 +14,21 @@
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
-use std::{str::FromStr, sync::atomic::AtomicU32};
+use std::{fmt::Debug, str::FromStr, sync::atomic::AtomicU32};
use common::{api_common::Amount, config::EthConfig, url::Url};
use ethereum_types::{Address, H160, H256, U256, U64};
use metadata::{InMetadata, OutMetadata};
-use rpc::{hex::Hex, Transaction};
+use rpc::{hex::Hex, Rpc, RpcClient, RpcStream, Transaction};
+use serde::de::DeserializeOwned;
use taler_util::{eth_payto_addr, taler_to_eth};
pub mod metadata;
pub mod rpc;
pub mod taler_util;
-impl rpc::Rpc {
- pub fn deposit(
+pub trait RpcExtended: RpcClient {
+ fn deposit(
&mut self,
from: Address,
to: Address,
@@ -45,7 +46,7 @@ impl rpc::Rpc {
})
}
- pub fn withdraw(
+ fn withdraw(
&mut self,
from: Address,
to: Address,
@@ -64,7 +65,7 @@ impl rpc::Rpc {
})
}
- pub fn bounce(&mut self, hash: H256, bounce_fee: U256) -> rpc::Result<Option<H256>> {
+ fn bounce(&mut self, hash: H256, bounce_fee: U256) -> rpc::Result<Option<H256>> {
let tx = self
.get_transaction(&hash)?
.expect("Cannot bounce a non existent transaction");
@@ -92,7 +93,7 @@ impl rpc::Rpc {
}
/// List new and removed transaction since the last sync state, returning a new sync state
- pub fn list_since_sync(
+ fn list_since_sync(
&mut self,
address: &Address,
state: SyncState,
@@ -160,6 +161,9 @@ impl rpc::Rpc {
}
}
+impl RpcExtended for Rpc {}
+impl<N: Debug + DeserializeOwned> RpcExtended for RpcStream<'_, N> {}
+
pub struct SyncTransaction {
pub tx: Transaction,
pub confirmations: u32,
diff --git a/eth-wire/src/loops/analysis.rs b/eth-wire/src/loops/analysis.rs
@@ -21,7 +21,7 @@ use common::{
postgres::fallible_iterator::FallibleIterator,
reconnect::AutoReconnectDb,
};
-use eth_wire::rpc::{ Rpc, AutoRpcCommon};
+use eth_wire::rpc::{ Rpc, AutoRpcCommon, RpcClient};
use ethereum_types::{H256, U64};
use crate::WireState;
diff --git a/eth-wire/src/loops/worker.rs b/eth-wire/src/loops/worker.rs
@@ -25,9 +25,9 @@ use common::{
};
use eth_wire::{
metadata::{InMetadata, OutMetadata},
- rpc::{self, AutoRpcWallet, Rpc, Transaction, TransactionRequest},
+ rpc::{self, AutoRpcWallet, Rpc, Transaction, TransactionRequest, RpcClient},
taler_util::{eth_payto_url, eth_to_taler},
- SyncState, SyncTransaction,
+ SyncState, SyncTransaction, RpcExtended,
};
use ethereum_types::{Address, H256, U256};
diff --git a/eth-wire/src/main.rs b/eth-wire/src/main.rs
@@ -24,7 +24,7 @@ use common::{
reconnect::auto_reconnect_db,
};
use eth_wire::{
- rpc::{auto_rpc_common, auto_rpc_wallet, Rpc},
+ rpc::{auto_rpc_common, auto_rpc_wallet, Rpc, RpcClient},
SyncState, WireState,
};
use ethereum_types::H160;
diff --git a/eth-wire/src/rpc.rs b/eth-wire/src/rpc.rs
@@ -26,7 +26,6 @@ use serde_json::error::Category;
use std::{
fmt::Debug,
io::{self, BufWriter, ErrorKind, Read, Write},
- marker::PhantomData,
os::unix::net::UnixStream,
path::{Path, PathBuf},
};
@@ -104,7 +103,6 @@ pub trait RpcTrait {}
/// Bitcoin RPC connection
pub struct Rpc {
- path: PathBuf,
id: u64,
conn: BufWriter<UnixStream>,
read_buf: Vec<u8>,
@@ -116,7 +114,6 @@ impl Rpc {
let conn = UnixStream::connect(&path)?;
Ok(Self {
- path: path.as_ref().to_path_buf(),
id: 0,
conn: BufWriter::new(conn),
read_buf: vec![0u8; 8 * 1024],
@@ -183,13 +180,12 @@ impl Rpc {
}
}
- fn call<T>(&mut self, method: &str, params: &impl serde::Serialize) -> Result<T>
- where
- T: serde::de::DeserializeOwned + Debug,
- {
- self.send(method, params)?;
- let response: RpcResponse<T> = self.receive()?;
+ pub fn subscribe_new_head(&mut self) -> Result<RpcStream<Nothing>> {
+ let id: String = self.call("eth_subscribe", &["newHeads"])?;
+ Ok(RpcStream::new(self, id))
+ }
+ fn handle_response<T>(&mut self, response: RpcResponse<T>) -> Result<T> {
assert_eq!(self.id, response.id);
self.id += 1;
return if let Some(ok) = response.result {
@@ -204,149 +200,207 @@ impl Rpc {
})
};
}
+}
+
+impl RpcClient for Rpc {
+ fn call<T>(&mut self, method: &str, params: &impl serde::Serialize) -> Result<T>
+ where
+ T: serde::de::DeserializeOwned + Debug,
+ {
+ self.send(method, params)?;
+ let response = self.receive()?;
+ return self.handle_response(response);
+ }
+}
+
+#[derive(Debug, serde::Deserialize)]
+pub struct NotificationContent<T> {
+ subscription: String,
+ result: T,
+}
+
+#[derive(Debug, serde::Deserialize)]
+
+struct Notification<T> {
+ params: NotificationContent<T>,
+}
+
+#[derive(Debug, serde::Deserialize)]
+#[serde(untagged)]
+enum NotificationOrResponse<T, N> {
+ Notification(Notification<N>),
+ Response(RpcResponse<T>),
+}
+
+/// A notification stream wrapping an rpc client
+pub struct RpcStream<'a, N: Debug + DeserializeOwned> {
+ rpc: &'a mut Rpc,
+ id: String,
+ buff: Vec<N>,
+}
+
+impl<'a, N: Debug + DeserializeOwned> RpcStream<'a, N> {
+ fn new(rpc: &'a mut Rpc, id: String) -> Self {
+ Self {
+ rpc,
+ id,
+ buff: vec![],
+ }
+ }
+
+ /// Block until next notification
+ pub fn next(&mut self) -> Result<N> {
+ match self.buff.pop() {
+ // Consume buffered notifications
+ Some(prev) => Ok(prev),
+ // Else read next one
+ None => {
+ let notification: Notification<N> = self.rpc.receive()?;
+ let notification = notification.params;
+ assert_eq!(self.id, notification.subscription);
+ Ok(notification.result)
+ }
+ }
+ }
+}
+
+impl<N: Debug + DeserializeOwned> Drop for RpcStream<'_, N> {
+ fn drop(&mut self) {
+ let Self { rpc, id, .. } = self;
+ // Request unsubscription, ignoring error
+ rpc.send("eth_unsubscribe", &[id]).ok();
+ // Ignore all buffered notification until subscription response
+ while let Ok(response) = rpc.receive::<NotificationOrResponse<bool, N>>() {
+ match response {
+ NotificationOrResponse::Notification(_) => { /* Ignore */ }
+ NotificationOrResponse::Response(_) => return,
+ }
+ }
+ }
+}
- pub fn list_accounts(&mut self) -> Result<Vec<Address>> {
+impl<N: Debug + DeserializeOwned> RpcClient for RpcStream<'_, N> {
+ fn call<T>(&mut self, method: &str, params: &impl serde::Serialize) -> Result<T>
+ where
+ T: serde::de::DeserializeOwned + Debug,
+ {
+ self.rpc.send(method, params)?;
+ loop {
+ let response: NotificationOrResponse<T, N> = self.rpc.receive()?;
+ match response {
+ NotificationOrResponse::Notification(n) => {
+ let n = n.params;
+ assert_eq!(self.id, n.subscription);
+ self.buff.push(n.result);
+ }
+ NotificationOrResponse::Response(response) => {
+ return self.rpc.handle_response(response)
+ }
+ }
+ }
+ }
+}
+
+pub trait RpcClient {
+ fn call<T>(&mut self, method: &str, params: &impl serde::Serialize) -> Result<T>
+ where
+ T: serde::de::DeserializeOwned + Debug;
+
+ fn list_accounts(&mut self) -> Result<Vec<Address>> {
self.call("personal_listAccounts", &EMPTY)
}
- pub fn new_account(&mut self, passwd: &str) -> Result<Address> {
+ fn new_account(&mut self, passwd: &str) -> Result<Address> {
self.call("personal_newAccount", &[passwd])
}
- pub fn import_account(&mut self, hex: &str, passwd: &str) -> Result<bool> {
+ fn import_account(&mut self, hex: &str, passwd: &str) -> Result<bool> {
self.call("personal_importRawKey", &(hex, passwd))
}
- pub fn unlock_account(&mut self, account: &Address, passwd: &str) -> Result<bool> {
+ fn unlock_account(&mut self, account: &Address, passwd: &str) -> Result<bool> {
self.call("personal_unlockAccount", &(account, passwd, 0))
}
- pub fn get_transaction(&mut self, hash: &H256) -> Result<Option<Transaction>> {
+ fn get_transaction(&mut self, hash: &H256) -> Result<Option<Transaction>> {
match self.call("eth_getTransactionByHash", &[hash]) {
Err(Error::Null) => Ok(None),
r => r,
}
}
- pub fn get_transaction_receipt(&mut self, hash: &H256) -> Result<Option<TransactionReceipt>> {
+ fn get_transaction_receipt(&mut self, hash: &H256) -> Result<Option<TransactionReceipt>> {
match self.call("eth_getTransactionReceipt", &[hash]) {
Err(Error::Null) => Ok(None),
r => r,
}
}
- pub fn fill_transaction(&mut self, req: &TransactionRequest) -> Result<Filled> {
+ fn fill_transaction(&mut self, req: &TransactionRequest) -> Result<Filled> {
self.call("eth_fillTransaction", &[req])
}
- pub fn send_transaction(&mut self, req: &TransactionRequest) -> Result<H256> {
+ fn send_transaction(&mut self, req: &TransactionRequest) -> Result<H256> {
self.call("eth_sendTransaction", &[req])
}
- pub fn block(&mut self, hash: &H256) -> Result<Option<Block>> {
+ fn block(&mut self, hash: &H256) -> Result<Option<Block>> {
match self.call("eth_getBlockByHash", &(hash, &true)) {
Err(Error::Null) => Ok(None),
r => r,
}
}
- pub fn pending_transactions(&mut self) -> Result<Vec<Transaction>> {
+ fn pending_transactions(&mut self) -> Result<Vec<Transaction>> {
self.call("eth_pendingTransactions", &EMPTY)
}
- pub fn miner_start(&mut self) -> Result<()> {
+ fn miner_start(&mut self) -> Result<()> {
match self.call("miner_start", &[8]) {
Err(Error::Null) => Ok(()),
i => i,
}
}
- pub fn miner_stop(&mut self) -> Result<()> {
+ fn miner_stop(&mut self) -> Result<()> {
match self.call("miner_stop", &EMPTY) {
Err(Error::Null) => Ok(()),
i => i,
}
}
- pub fn subscribe_new_head(&mut self) -> Result<RpcStream<Nothing>> {
- let mut rpc = Self::new(&self.path)?;
- let id: String = rpc.call("eth_subscribe", &["newHeads"])?;
- Ok(RpcStream::new(rpc, id))
- }
-
- pub fn latest_block(&mut self) -> Result<Block> {
+ fn latest_block(&mut self) -> Result<Block> {
self.call("eth_getBlockByNumber", &("latest", &true))
}
- pub fn earliest_block(&mut self) -> Result<Block> {
+ fn earliest_block(&mut self) -> Result<Block> {
self.call("eth_getBlockByNumber", &("earliest", &true))
}
- pub fn get_balance(&mut self, addr: &Address) -> Result<U256> {
+ fn get_balance(&mut self, addr: &Address) -> Result<U256> {
self.call("eth_getBalance", &(addr, "latest"))
}
/* ----- Peer management ----- */
- pub fn node_info(&mut self) -> Result<NodeInfo> {
+ fn node_info(&mut self) -> Result<NodeInfo> {
self.call("admin_nodeInfo", &EMPTY)
}
- pub fn add_peer(&mut self, url: &Url) -> Result<bool> {
+ fn add_peer(&mut self, url: &Url) -> Result<bool> {
self.call("admin_addPeer", &[url])
}
- pub fn remove_peer(&mut self, url: &Url) -> Result<bool> {
+ fn remove_peer(&mut self, url: &Url) -> Result<bool> {
self.call("admin_removePeer", &[url])
}
- pub fn count_peer(&mut self) -> Result<usize> {
+ fn count_peer(&mut self) -> Result<usize> {
let peers: Vec<Nothing> = self.call("admin_peers", &EMPTY)?;
Ok(peers.len())
}
}
-pub struct RpcStream<T: Debug + DeserializeOwned> {
- rpc: Rpc,
- id: String,
- phantom: PhantomData<T>,
-}
-
-impl<T: Debug + DeserializeOwned> RpcStream<T> {
- fn new(rpc: Rpc, id: String) -> Self {
- Self {
- rpc,
- id,
- phantom: PhantomData,
- }
- }
-
- pub fn next(&mut self) -> Result<T> {
- let notification: Wrapper<T> = self.rpc.receive()?;
- let notification = notification.params;
- assert_eq!(self.id, notification.subscription);
- Ok(notification.result)
- }
-}
-
-#[derive(Debug, serde::Deserialize)]
-pub struct Notification<T> {
- subscription: String,
- result: T,
-}
-
-#[derive(Debug, serde::Deserialize)]
-struct Wrapper<T> {
- params: Notification<T>,
-}
-
-#[derive(Debug, serde::Deserialize)]
-#[serde(untagged)]
-pub enum NotifEnd<T> {
- Notification(Notification<T>),
- End(bool),
-}
-
#[derive(Debug, Clone, serde::Deserialize)]
pub struct Block {
pub hash: Option<H256>,
diff --git a/instrumentation/src/eth.rs b/instrumentation/src/eth.rs
@@ -6,9 +6,9 @@ use std::{
use common::{config::load_eth_config, rand_slice};
use eth_wire::{
metadata::OutMetadata,
- rpc::{hex::Hex, Rpc, TransactionRequest},
+ rpc::{hex::Hex, Rpc, RpcClient, TransactionRequest},
taler_util::{eth_payto_url, eth_to_taler, TRUNC},
- SyncState, WireState,
+ RpcExtended, SyncState, WireState,
};
use ethereum_types::U256;
@@ -16,10 +16,10 @@ use crate::{check_incoming, check_outgoing, print_now, transfer};
fn wait_for_pending(rpc: &mut Rpc) {
print_now("Wait for pending transactions mining:");
- let mut notifier = rpc.subscribe_new_head().unwrap();
+ let mut rpc = rpc.subscribe_new_head().unwrap();
std::thread::sleep(Duration::from_secs(1)); // Wait for eth-wire to act
while !rpc.pending_transactions().unwrap().is_empty() {
- notifier.next().unwrap();
+ rpc.next().unwrap();
print_now(".");
std::thread::sleep(Duration::from_secs(1)); // Wait for eth-wire to act
}
@@ -45,17 +45,14 @@ pub fn eth_test(config: Option<&Path>, base_url: &str) {
.unwrap();
// Load client
- let client = rpc
+ let client_addr = rpc
.list_accounts()
.unwrap()
.into_iter()
.skip(1) // Skip etherbase if dev network
- .find(|addr| addr != &state.address); // Skip wire
-
- let client_addr = match client {
- Some(addr) => addr,
- None => rpc.new_account("password").unwrap(),
- };
+ .find(|addr| addr != &state.address) // Skip wire
+ .unwrap_or_else(|| rpc.new_account("password").unwrap()); // Else create account
+
rpc.unlock_account(&client_addr, "password").unwrap();
if rpc.get_balance(&client_addr).unwrap() < min_fund {
println!(
@@ -64,9 +61,9 @@ pub fn eth_test(config: Option<&Path>, base_url: &str) {
hex::encode(client_addr)
);
print_now("Waiting for fund:");
- let mut notifier = rpc.subscribe_new_head().unwrap();
+ let mut rpc = rpc.subscribe_new_head().unwrap();
while rpc.get_balance(&client_addr).unwrap() < min_fund {
- notifier.next().unwrap();
+ rpc.next().unwrap();
print_now(".");
}
println!("");
@@ -111,7 +108,7 @@ pub fn eth_test(config: Option<&Path>, base_url: &str) {
.unwrap();
print_now("Wait for bounce:");
let bounce = {
- let mut notifier = rpc.subscribe_new_head().unwrap();
+ let mut rpc = rpc.subscribe_new_head().unwrap();
'l: loop {
let list = rpc.list_since_sync(&state.address, sync_state, 0).unwrap();
sync_state = list.state;
@@ -131,7 +128,7 @@ pub fn eth_test(config: Option<&Path>, base_url: &str) {
}
}
}
- notifier.next().unwrap();
+ rpc.next().unwrap();
print_now(".");
}
};