commit d76ac36c53783d5d6e358f71845944cd286a77aa
parent c0c672b5816046a9ce28e4f69af8c1ebb3f02409
Author: Antoine A <>
Date: Wed, 2 Feb 2022 00:20:52 +0100
eth_wire: rpc pub/sub
Diffstat:
2 files changed, 119 insertions(+), 109 deletions(-)
diff --git a/eth_wire/src/bin/eth_test.rs b/eth_wire/src/bin/eth_test.rs
@@ -13,7 +13,7 @@
You should have received a copy of the GNU Affero General Public License along with
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
-use std::{panic::AssertUnwindSafe, str::FromStr, time::Duration};
+use std::{panic::AssertUnwindSafe, str::FromStr};
use eth_wire::{
metadata::{InMetadata, OutMetadata},
@@ -23,9 +23,8 @@ use taler_common::{rand_slice, url::Url};
use web3::types::{H256, U256};
pub fn main() {
- //let path = std::env::args().nth(1).unwrap();
-
- let mut rpc = Rpc::new("/tmp/tmp.0fnWV9egdD/eth/geth.ipc").unwrap();
+ let path = std::env::args().nth(1).unwrap();
+ let mut rpc = Rpc::new(path).unwrap();
let accounts = rpc.list_accounts().unwrap();
for account in &accounts {
@@ -92,9 +91,10 @@ fn tx_pending(rpc: &mut Rpc, id: &H256) -> rpc::Result<bool> {
/// Mine pending transactions
fn mine_pending(rpc: &mut Rpc) -> rpc::Result<()> {
+ let mut notifier = rpc.subscribe_new_head()?;
rpc.miner_start()?;
while !rpc.pending_transactions()?.is_empty() {
- std::thread::sleep(Duration::from_millis(200));
+ notifier.next()?;
}
rpc.miner_stop()?;
Ok(())
diff --git a/eth_wire/src/rpc.rs b/eth_wire/src/rpc.rs
@@ -19,12 +19,14 @@
//! We only parse the thing we actually use, this reduce memory usage and
//! make our code more compatible with future deprecation
+use serde::de::DeserializeOwned;
+use serde_json::error::Category;
use std::{
fmt::Debug,
- io::{self, ErrorKind, Read},
- net::Shutdown,
+ io::{self, BufWriter, ErrorKind, Read, Write},
+ marker::PhantomData,
os::unix::net::UnixStream,
- path::Path,
+ path::{Path, PathBuf},
};
use web3::types::{Address, Bytes, SyncState, H256, U256, U64};
@@ -64,30 +66,31 @@ pub type Result<T> = std::result::Result<T, Error>;
const EMPTY: [(); 0] = [];
+pub trait RpcTrait {}
+
/// Bitcoin RPC connection
pub struct Rpc {
+ path: PathBuf,
id: u64,
- conn: UnixStream,
- buf: Vec<u8>,
+ conn: BufWriter<UnixStream>,
+ read_buf: Vec<u8>,
cursor: usize,
}
impl Rpc {
pub fn new(path: impl AsRef<Path>) -> io::Result<Self> {
- let conn = UnixStream::connect(path)?;
+ let conn = UnixStream::connect(&path)?;
Ok(Self {
+ path: path.as_ref().to_path_buf(),
id: 0,
- conn,
- buf: vec![0u8; 8 * 1024],
+ conn: BufWriter::new(conn),
+ read_buf: vec![0u8; 8 * 1024],
cursor: 0,
})
}
- fn call<T>(&mut self, method: &str, params: &impl serde::Serialize) -> Result<T>
- where
- T: serde::de::DeserializeOwned + Debug,
- {
+ fn send(&mut self, method: &str, params: &impl serde::Serialize) -> Result<()> {
// TODO rethink timeout
let request = RpcRequest {
method,
@@ -97,100 +100,43 @@ impl Rpc {
// Send request
serde_json::to_writer(&mut self.conn, &request)?;
- loop {
- if self.cursor == self.buf.len() {
- self.buf.resize(self.cursor * 2, 0);
- }
- match self.conn.read(&mut self.buf[self.cursor..]) {
- Ok(nb) => {
- self.cursor += nb;
- let mut de: serde_json::StreamDeserializer<_, RpcResponse<T>> =
- serde_json::Deserializer::from_slice(&self.buf[..self.cursor]).into_iter();
-
- if let Some(Ok(response)) = de.next() {
- let read = de.byte_offset();
- self.buf.copy_within(read..self.cursor, 0);
- self.cursor -= read;
- assert_eq!(self.id, response.id);
- self.id += 1;
- return if let Some(ok) = response.result {
- Ok(ok)
- } else {
- Err(match response.error {
- Some(err) => Error::RPC {
- code: err.code,
- msg: err.message,
- },
- None => Error::Null,
- })
- };
- } else if nb == 0 {
- self.conn.shutdown(Shutdown::Both).ok();
- Err(std::io::Error::new(ErrorKind::UnexpectedEof, "Stream EOF"))?;
- }
- }
- Err(e) if e.kind() == ErrorKind::Interrupted => {}
- Err(e) => Err(e)?,
- }
- }
+ self.conn.flush()?;
+ Ok(())
}
- fn batch<T>(&mut self, requests: &[(&str, impl serde::Serialize)]) -> Result<Vec<T>>
+ fn receive<T>(&mut self) -> Result<T>
where
T: serde::de::DeserializeOwned + Debug,
{
- let requests: Vec<_> = requests
- .into_iter()
- .enumerate()
- .map(|(i, (method, params))| {
- let request = RpcRequest {
- method,
- id: self.id,
- params,
- };
- self.id + i as u64;
- request
- })
- .collect();
-
- // Send request
- serde_json::to_writer(&mut self.conn, &requests)?;
loop {
- if self.cursor == self.buf.len() {
- self.buf.resize(self.cursor * 2, 0);
+ if self.cursor == self.read_buf.len() {
+ self.read_buf.resize(self.cursor * 2, 0);
}
- match self.conn.read(&mut self.buf[self.cursor..]) {
+ match self.conn.get_mut().read(&mut self.read_buf[self.cursor..]) {
Ok(nb) => {
self.cursor += nb;
-
- let mut de: serde_json::StreamDeserializer<_, Vec<RpcResponse<T>>> =
- serde_json::Deserializer::from_slice(&self.buf[..self.cursor]).into_iter();
-
- if let Some(Ok(mut responses)) = de.next() {
- //let mut responses = result?;
- responses.sort_unstable_by_key(|r| r.id);
- //dbg!(&responses);
- let read = de.byte_offset();
- self.buf.copy_within(read..self.cursor, 0);
- self.cursor -= read;
- self.id += requests.len() as u64;
- return responses
- .into_iter()
- .map(|response| {
- if let Some(ok) = response.result {
- Ok(ok)
- } else {
- let err = response.error.unwrap();
- Err(Error::RPC {
- code: err.code,
- msg: err.message,
- })
+ let mut de: serde_json::StreamDeserializer<_, T> =
+ serde_json::Deserializer::from_slice(&self.read_buf[..self.cursor])
+ .into_iter();
+
+ if let Some(result) = de.next() {
+ match result {
+ Ok(response) => {
+ let read = de.byte_offset();
+ self.read_buf.copy_within(read..self.cursor, 0);
+ self.cursor -= read;
+ return Ok(response);
+ }
+ Err(err) if err.classify() == Category::Eof => {
+ if nb == 0 {
+ return Err(std::io::Error::new(
+ ErrorKind::UnexpectedEof,
+ "Stream EOF",
+ ))?;
}
- })
- .collect();
- } else if nb == 0 {
- self.conn.shutdown(Shutdown::Both).ok();
- Err(std::io::Error::new(ErrorKind::UnexpectedEof, "Stream EOF"))?;
+ }
+ Err(e) => Err(e)?,
+ }
}
}
Err(e) if e.kind() == ErrorKind::Interrupted => {}
@@ -199,6 +145,28 @@ impl Rpc {
}
}
+ fn call<T>(&mut self, method: &str, params: &impl serde::Serialize) -> Result<T>
+ where
+ T: serde::de::DeserializeOwned + Debug,
+ {
+ self.send(method, params)?;
+ let response: RpcResponse<T> = self.receive()?;
+
+ assert_eq!(self.id, response.id);
+ self.id += 1;
+ return if let Some(ok) = response.result {
+ Ok(ok)
+ } else {
+ Err(match response.error {
+ Some(err) => Error::RPC {
+ code: err.code,
+ msg: err.message,
+ },
+ None => Error::Null,
+ })
+ };
+ }
+
pub fn syncing(&mut self) -> Result<SyncState> {
self.call("eth_syncing", &EMPTY)
}
@@ -231,14 +199,6 @@ impl Rpc {
self.call("eth_getBlockByNumber", &(U64::from(nb), &true))
}
- pub fn block_range(&mut self, start: u64, nb: u64) -> Result<Vec<Option<Block>>> {
- self.batch(
- &(start..start + nb)
- .map(|nb| ("eth_getBlockByNumber", (U64::from(nb), &true)))
- .collect::<Vec<_>>(),
- )
- }
-
pub fn pending_transactions(&mut self) -> Result<Vec<Transaction>> {
self.call("eth_pendingTransactions", &EMPTY)
}
@@ -256,6 +216,53 @@ impl Rpc {
i => i,
}
}
+
+ pub fn subscribe_new_head(&mut self) -> Result<RpcStream<BlockHead>> {
+ let mut rpc = Self::new(&self.path)?;
+ let id: String = rpc.call("eth_subscribe", &["newHeads"])?;
+ Ok(RpcStream::new(rpc, id))
+ }
+}
+
+pub struct RpcStream<T: Debug + DeserializeOwned> {
+ rpc: Rpc,
+ id: String,
+ phantom: PhantomData<T>,
+}
+
+impl<T: Debug + DeserializeOwned> RpcStream<T> {
+ fn new(rpc: Rpc, id: String) -> Self {
+ Self {
+ rpc,
+ id,
+ phantom: PhantomData,
+ }
+ }
+
+ pub fn next(&mut self) -> Result<T> {
+ let notification: Wrapper<T> = self.rpc.receive()?;
+ let notification = notification.params;
+ assert_eq!(self.id, notification.subscription);
+ Ok(notification.result)
+ }
+}
+
+#[derive(Debug, serde::Deserialize)]
+pub struct Notification<T> {
+ subscription: String,
+ result: T,
+}
+
+#[derive(Debug, serde::Deserialize)]
+struct Wrapper<T> {
+ params: Notification<T>,
+}
+
+#[derive(Debug, serde::Deserialize)]
+#[serde(untagged)]
+pub enum NotifEnd<T> {
+ Notification(Notification<T>),
+ End(bool),
}
#[derive(Debug, serde::Deserialize)]
@@ -269,6 +276,9 @@ pub struct Block {
pub transactions: Vec<Transaction>,
}
+#[derive(Debug, serde::Deserialize)]
+pub struct BlockHead {}
+
/// Description of a Transaction, pending or in the chain.
#[derive(Debug, serde::Deserialize)]
pub struct Transaction {