diff options
Diffstat (limited to 'eth-wire/src/rpc.rs')
-rw-r--r-- | eth-wire/src/rpc.rs | 73 |
1 files changed, 43 insertions, 30 deletions
diff --git a/eth-wire/src/rpc.rs b/eth-wire/src/rpc.rs index f5f3cc6..95d3428 100644 --- a/eth-wire/src/rpc.rs +++ b/eth-wire/src/rpc.rs @@ -20,9 +20,8 @@ //! make our code more compatible with future deprecation use common::{log::log::error, password, reconnect::AutoReconnect, url::Url}; -use ethereum_types::{Address, H256, U256, U64}; +use ethereum_types::{Address, H160, H256, U256, U64}; use serde::de::DeserializeOwned; -use serde_json::error::Category; use std::{ fmt::Debug, io::{self, BufWriter, ErrorKind, Read, Write}, @@ -67,6 +66,7 @@ pub fn auto_rpc_common(ipc_path: PathBuf) -> AutoRpcCommon { } #[derive(Debug, serde::Serialize)] struct RpcRequest<'a, T: serde::Serialize> { + jsonrpc: &'static str, method: &'a str, id: u64, params: &'a T, @@ -133,6 +133,7 @@ impl Rpc { method, id: self.id, params, + jsonrpc: "2.0", }; // Send request @@ -146,6 +147,22 @@ impl Rpc { T: serde::de::DeserializeOwned + Debug, { loop { + // Read one + let pos = self.read_buf[..self.cursor] + .iter() + .position(|c| *c == b'\n') + .map(|pos| pos + 1); // Move after newline + if let Some(pos) = pos { + match serde_json::from_slice(&self.read_buf[..pos]) { + Ok(response) => { + self.read_buf.copy_within(pos..self.cursor, 0); + self.cursor -= pos; + return Ok(response); + } + Err(err) => return Err(err)?, + } + } // Or read more + // Double buffer size if full if self.cursor == self.read_buf.len() { self.read_buf.resize(self.cursor * 2, 0); @@ -155,32 +172,7 @@ impl Rpc { ErrorKind::UnexpectedEof, "RPC EOF".to_string(), ))?, - Ok(nb) => { - self.cursor += nb; - let mut de: serde_json::StreamDeserializer<_, T> = - serde_json::Deserializer::from_slice(&self.read_buf[..self.cursor]) - .into_iter(); - - if let Some(result) = de.next() { - match result { - Ok(response) => { - let read = de.byte_offset(); - self.read_buf.copy_within(read..self.cursor, 0); - self.cursor -= read; - return Ok(response); - } - Err(err) if err.classify() == Category::Eof => { - if nb == 0 { - Err(std::io::Error::new( - ErrorKind::UnexpectedEof, - "Stream EOF", - ))?; - } - } - Err(e) => Err(e)?, - } - } - } + Ok(nb) => self.cursor += nb, Err(e) if e.kind() == ErrorKind::Interrupted => {} Err(e) => Err(e)?, } @@ -188,7 +180,14 @@ impl Rpc { } pub fn subscribe_new_head(&mut self) -> Result<RpcStream<Nothing>> { - let id: String = self.call("eth_subscribe", &["newHeads"])?; + self.send("eth_subscribe", &["newHeads"])?; + let id = loop { + match self.receive::<SubscribeDirtyFix>()? { + SubscribeDirtyFix::Fix(_) => { /* TODO debug */ } + SubscribeDirtyFix::Id(id) => break id, + } + }; + let id = self.handle_response(id)?; Ok(RpcStream::new(self, id)) } @@ -238,6 +237,12 @@ enum NotificationOrResponse<T, N> { Notification(Notification<N>), Response(RpcResponse<T>), } +#[derive(Debug, serde::Deserialize)] +#[serde(untagged)] +enum SubscribeDirtyFix { + Fix(RpcResponse<bool>), + Id(RpcResponse<String>), +} /// A notification stream wrapping an rpc client pub struct RpcStream<'a, N: Debug + DeserializeOwned> { @@ -283,6 +288,7 @@ impl<N: Debug + DeserializeOwned> Drop for RpcStream<'_, N> { NotificationOrResponse::Response(_) => return, } } + println!("has unusubsriced") } } @@ -401,9 +407,16 @@ pub trait RpcClient { /* ----- Miner ----- */ + fn miner_set_etherbase(&mut self, addr: &H160) -> Result<()> { + match self.call("miner_setEtherbase", &[addr]) { + Err(Error::Null) => Ok(()), + i => i, + } + } + /// Start mining fn miner_start(&mut self) -> Result<()> { - match self.call("miner_start", &[8]) { + match self.call("miner_start", &EMPTY) { Err(Error::Null) => Ok(()), i => i, } |