summaryrefslogtreecommitdiff
path: root/eth-wire/src/rpc.rs
diff options
context:
space:
mode:
Diffstat (limited to 'eth-wire/src/rpc.rs')
-rw-r--r--eth-wire/src/rpc.rs73
1 files changed, 43 insertions, 30 deletions
diff --git a/eth-wire/src/rpc.rs b/eth-wire/src/rpc.rs
index f5f3cc6..95d3428 100644
--- a/eth-wire/src/rpc.rs
+++ b/eth-wire/src/rpc.rs
@@ -20,9 +20,8 @@
//! make our code more compatible with future deprecation
use common::{log::log::error, password, reconnect::AutoReconnect, url::Url};
-use ethereum_types::{Address, H256, U256, U64};
+use ethereum_types::{Address, H160, H256, U256, U64};
use serde::de::DeserializeOwned;
-use serde_json::error::Category;
use std::{
fmt::Debug,
io::{self, BufWriter, ErrorKind, Read, Write},
@@ -67,6 +66,7 @@ pub fn auto_rpc_common(ipc_path: PathBuf) -> AutoRpcCommon {
}
#[derive(Debug, serde::Serialize)]
struct RpcRequest<'a, T: serde::Serialize> {
+ jsonrpc: &'static str,
method: &'a str,
id: u64,
params: &'a T,
@@ -133,6 +133,7 @@ impl Rpc {
method,
id: self.id,
params,
+ jsonrpc: "2.0",
};
// Send request
@@ -146,6 +147,22 @@ impl Rpc {
T: serde::de::DeserializeOwned + Debug,
{
loop {
+ // Read one
+ let pos = self.read_buf[..self.cursor]
+ .iter()
+ .position(|c| *c == b'\n')
+ .map(|pos| pos + 1); // Move after newline
+ if let Some(pos) = pos {
+ match serde_json::from_slice(&self.read_buf[..pos]) {
+ Ok(response) => {
+ self.read_buf.copy_within(pos..self.cursor, 0);
+ self.cursor -= pos;
+ return Ok(response);
+ }
+ Err(err) => return Err(err)?,
+ }
+ } // Or read more
+
// Double buffer size if full
if self.cursor == self.read_buf.len() {
self.read_buf.resize(self.cursor * 2, 0);
@@ -155,32 +172,7 @@ impl Rpc {
ErrorKind::UnexpectedEof,
"RPC EOF".to_string(),
))?,
- Ok(nb) => {
- self.cursor += nb;
- let mut de: serde_json::StreamDeserializer<_, T> =
- serde_json::Deserializer::from_slice(&self.read_buf[..self.cursor])
- .into_iter();
-
- if let Some(result) = de.next() {
- match result {
- Ok(response) => {
- let read = de.byte_offset();
- self.read_buf.copy_within(read..self.cursor, 0);
- self.cursor -= read;
- return Ok(response);
- }
- Err(err) if err.classify() == Category::Eof => {
- if nb == 0 {
- Err(std::io::Error::new(
- ErrorKind::UnexpectedEof,
- "Stream EOF",
- ))?;
- }
- }
- Err(e) => Err(e)?,
- }
- }
- }
+ Ok(nb) => self.cursor += nb,
Err(e) if e.kind() == ErrorKind::Interrupted => {}
Err(e) => Err(e)?,
}
@@ -188,7 +180,14 @@ impl Rpc {
}
pub fn subscribe_new_head(&mut self) -> Result<RpcStream<Nothing>> {
- let id: String = self.call("eth_subscribe", &["newHeads"])?;
+ self.send("eth_subscribe", &["newHeads"])?;
+ let id = loop {
+ match self.receive::<SubscribeDirtyFix>()? {
+ SubscribeDirtyFix::Fix(_) => { /* TODO debug */ }
+ SubscribeDirtyFix::Id(id) => break id,
+ }
+ };
+ let id = self.handle_response(id)?;
Ok(RpcStream::new(self, id))
}
@@ -238,6 +237,12 @@ enum NotificationOrResponse<T, N> {
Notification(Notification<N>),
Response(RpcResponse<T>),
}
+#[derive(Debug, serde::Deserialize)]
+#[serde(untagged)]
+enum SubscribeDirtyFix {
+ Fix(RpcResponse<bool>),
+ Id(RpcResponse<String>),
+}
/// A notification stream wrapping an rpc client
pub struct RpcStream<'a, N: Debug + DeserializeOwned> {
@@ -283,6 +288,7 @@ impl<N: Debug + DeserializeOwned> Drop for RpcStream<'_, N> {
NotificationOrResponse::Response(_) => return,
}
}
+ println!("has unusubsriced")
}
}
@@ -401,9 +407,16 @@ pub trait RpcClient {
/* ----- Miner ----- */
+ fn miner_set_etherbase(&mut self, addr: &H160) -> Result<()> {
+ match self.call("miner_setEtherbase", &[addr]) {
+ Err(Error::Null) => Ok(()),
+ i => i,
+ }
+ }
+
/// Start mining
fn miner_start(&mut self) -> Result<()> {
- match self.call("miner_start", &[8]) {
+ match self.call("miner_start", &EMPTY) {
Err(Error::Null) => Ok(()),
i => i,
}