/*
This file is part of TALER
Copyright (C) 2022 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free Software
Foundation; either version 3, or (at your option) any later version.
TALER is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License along with
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
//! This is a very simple RPC client designed only for a specific geth version
//! and for use over a secure unix domain socket to a trusted node
//!
//! We only parse the things we actually use, this reduces memory usage and
//! makes our code more compatible with future deprecations
use common::{log::log::error, password, reconnect::AutoReconnect, url::Url};
use ethereum_types::{Address, H256, U256, U64};
use serde::de::DeserializeOwned;
use serde_json::error::Category;
use std::{
fmt::Debug,
io::{self, BufWriter, ErrorKind, Read, Write},
os::unix::net::UnixStream,
path::{Path, PathBuf},
};
use self::hex::Hex;
/// Reconnecting RPC handle built by [`auto_rpc_wallet`]: an `AutoReconnect`
/// configured with an `(ipc path, wallet address)` pair and yielding an [`Rpc`]
pub type AutoRpcWallet = AutoReconnect<(PathBuf, Address), Rpc>;
/// Create a reconnecting rpc connection with an unlocked wallet
pub fn auto_rpc_wallet(ipc_path: PathBuf, address: Address) -> AutoRpcWallet {
AutoReconnect::new(
(ipc_path, address),
|(path, address)| {
let mut rpc = Rpc::new(path)
.map_err(|err| error!("connect RPC: {}", err))
.ok()?;
rpc.unlock_account(address, &password())
.map_err(|err| error!("connect RPC: {}", err))
.ok()?;
Some(rpc)
},
|client| client.node_info().is_err(),
)
}
/// Reconnecting RPC handle built by [`auto_rpc_common`] (no wallet unlock step)
pub type AutoRpcCommon = AutoReconnect;
/// Create a reconnecting rpc connection
pub fn auto_rpc_common(ipc_path: PathBuf) -> AutoRpcCommon {
AutoReconnect::new(
ipc_path,
|path| {
Rpc::new(path)
.map_err(|err| error!("connect RPC: {}", err))
.ok()
},
|client| client.node_info().is_err(),
)
}
/// Outgoing RPC request, serialized to JSON by `Rpc::send`
#[derive(Debug, serde::Serialize)]
struct RpcRequest<'a, T: serde::Serialize> {
/// Name of the remote method to invoke
method: &'a str,
/// Request identifier, `Rpc::handle_response` asserts that the response carries the same value
id: u64,
/// Method arguments, borrowed from the caller
params: &'a T,
}
/// Incoming RPC response, converted into a `Result` by `Rpc::handle_response`
#[derive(Debug, serde::Deserialize)]
struct RpcResponse {
/// Successful payload, takes precedence over `error` when present
result: Option,
/// Failure description, only consulted when `result` is absent
error: Option,
/// Identifier echoed back, checked against the id of the pending request
id: u64,
}
/// Error object of an RPC response, mapped to `Error::RPC` by `Rpc::handle_response`
#[derive(Debug, serde::Deserialize)]
struct RpcErr {
/// Numeric error code reported by the node
code: i64,
/// Human readable error description reported by the node
message: String,
}
/// Errors returned by the RPC client
#[derive(Debug, thiserror::Error)]
pub enum Error {
/// I/O failure on the underlying socket, including an unexpected end of stream
#[error("{0:?}")]
Transport(#[from] std::io::Error),
/// The node answered with an error object instead of a result
#[error("{code:?} - {msg}")]
RPC { code: i64, msg: String },
/// A request could not be serialized or a message could not be parsed
#[error("JSON: {0}")]
Json(#[from] serde_json::Error),
/// The response carried neither a result nor an error
#[error("Null rpc, no result or error")]
Null,
}
/// Result type used by every RPC operation of this module
pub type Result = std::result::Result;
/// Zero-length parameter list, used as `params` for calls that take no argument
/// (e.g. `list_accounts`)
const EMPTY: [(); 0] = [];
/// Ethereum RPC connection
pub struct Rpc {
/// Identifier sent with the next request, incremented after each handled response
id: u64,
/// Buffered writer over the socket; reads go through `get_mut()` on the same stream
conn: BufWriter,
/// Receive buffer, starts at 8 KiB and is doubled whenever it is full
read_buf: Vec,
/// Number of bytes at the start of `read_buf` holding received, not yet consumed data
cursor: usize,
}
impl Rpc {
    /// Start a RPC connection, path can be datadir or ipc path
    ///
    /// When `path` is a directory the socket is looked up at `<path>/geth.ipc`,
    /// otherwise `path` itself is used as the socket path.
    ///
    /// # Errors
    /// Returns the I/O error raised while connecting to the unix socket.
    pub fn new(path: impl AsRef) -> io::Result {
        let path = path.as_ref();
        let conn = if path.is_dir() {
            UnixStream::connect(path.join("geth.ipc"))
        } else {
            UnixStream::connect(path)
        }?;
        Ok(Self {
            id: 0,
            conn: BufWriter::new(conn),
            read_buf: vec![0u8; 8 * 1024],
            cursor: 0,
        })
    }

    /// Serialize a request for `method` carrying the current id and flush it
    /// to the socket
    ///
    /// # Errors
    /// Returns `Error::Json` if serialization fails and `Error::Transport` if
    /// the flush fails.
    fn send(&mut self, method: &str, params: &impl serde::Serialize) -> Result<()> {
        let request = RpcRequest {
            method,
            id: self.id,
            params,
        };
        // Send request
        serde_json::to_writer(&mut self.conn, &request)?;
        self.conn.flush()?;
        Ok(())
    }

    /// Block until one complete JSON message of type `T` is available and
    /// return it
    ///
    /// Bytes following the returned message are kept in the buffer for the
    /// next call.
    ///
    /// # Errors
    /// * `Error::Transport` with `UnexpectedEof` when the peer closes the
    ///   stream, or with any other I/O error raised by the read
    /// * `Error::Json` when the buffered bytes are not a valid `T`
    fn receive(&mut self) -> Result
    where
        T: serde::de::DeserializeOwned + Debug,
    {
        loop {
            // Parse what is already buffered before touching the socket: a
            // single read can deliver several messages, and blocking on `read`
            // first would hold the queued ones back until the node sends more
            let mut stream =
                serde_json::Deserializer::from_slice(&self.read_buf[..self.cursor])
                    .into_iter::<T>();
            match stream.next() {
                Some(Ok(response)) => {
                    // Move the unread tail to the front of the buffer
                    let consumed = stream.byte_offset();
                    self.read_buf.copy_within(consumed..self.cursor, 0);
                    self.cursor -= consumed;
                    return Ok(response);
                }
                // Truncated message, more bytes are needed
                Some(Err(err)) if err.classify() == Category::Eof => {}
                Some(Err(err)) => return Err(err.into()),
                // Nothing buffered except optional whitespace
                None => {}
            }
            // Double buffer size if full
            if self.cursor == self.read_buf.len() {
                self.read_buf.resize(self.cursor * 2, 0);
            }
            // Read directly from the stream, the `BufWriter` only buffers writes
            match self.conn.get_mut().read(&mut self.read_buf[self.cursor..]) {
                Ok(0) => {
                    return Err(io::Error::new(ErrorKind::UnexpectedEof, "RPC EOF").into());
                }
                Ok(nb) => self.cursor += nb,
                // Interrupted reads are simply retried
                Err(e) if e.kind() == ErrorKind::Interrupted => {}
                Err(e) => return Err(e.into()),
            }
        }
    }

    /// Subscribe to `newHeads` notifications and wrap this connection in a
    /// stream bound to the subscription id returned by the node
    pub fn subscribe_new_head(&mut self) -> Result> {
        let id: String = self.call("eth_subscribe", &["newHeads"])?;
        Ok(RpcStream::new(self, id))
    }

    /// Convert a raw response into a result and advance the request id
    ///
    /// A present `result` wins; otherwise the error object becomes
    /// `Error::RPC`, and a response with neither becomes `Error::Null`.
    ///
    /// # Panics
    /// Panics if the response id differs from the id of the pending request.
    fn handle_response(&mut self, response: RpcResponse) -> Result {
        assert_eq!(self.id, response.id);
        self.id += 1;
        if let Some(ok) = response.result {
            Ok(ok)
        } else {
            Err(match response.error {
                Some(err) => Error::RPC {
                    code: err.code,
                    msg: err.message,
                },
                None => Error::Null,
            })
        }
    }
}
impl RpcClient for Rpc {
/// Send one request and block until its response has been received
///
/// Errors from sending, receiving, or an error reply from the node are
/// returned to the caller. Panics (in `handle_response`) if the response id
/// differs from the request id.
fn call(&mut self, method: &str, params: &impl serde::Serialize) -> Result
where
T: serde::de::DeserializeOwned + Debug,
{
self.send(method, params)?;
let response = self.receive()?;
self.handle_response(response)
}
}
/// Payload of a subscription notification (the `params` member of the message)
#[derive(Debug, serde::Deserialize)]
pub struct NotificationContent {
/// Identifier of the subscription this notification belongs to
subscription: String,
/// Notification value handed to the stream consumer
result: T,
}
/// Subscription notification message, only its `params` member is parsed
#[derive(Debug, serde::Deserialize)]
struct Notification {
/// Subscription id and notification value
params: NotificationContent,
}
/// Any message that can arrive while a subscription is active
///
/// Untagged: the variant is chosen from the shape of the message, not from a
/// discriminant field.
#[derive(Debug, serde::Deserialize)]
#[serde(untagged)]
enum NotificationOrResponse {
/// A subscription notification
Notification(Notification),
/// The response to a pending request
Response(RpcResponse),
}
/// A notification stream wrapping an rpc client
pub struct RpcStream<'a, N: Debug + DeserializeOwned> {
/// Underlying connection, exclusively borrowed for the lifetime of the stream
rpc: &'a mut Rpc,
/// Subscription identifier, checked against every received notification
id: String,
/// Notifications received while a `call` was waiting for its response, not yet returned by `next`
buff: Vec,
}
impl<'a, N: Debug + DeserializeOwned> RpcStream<'a, N> {
    /// Wrap `rpc` as a stream for the subscription identified by `id`
    fn new(rpc: &'a mut Rpc, id: String) -> Self {
        Self {
            rpc,
            id,
            buff: vec![],
        }
    }

    /// Block until next notification
    ///
    /// Notifications buffered while a `call` was waiting for its response are
    /// returned first, in the order they were received; only then is the
    /// connection read again.
    ///
    /// # Errors
    /// Returns any error raised while receiving from the connection.
    ///
    /// # Panics
    /// Panics if a received notification belongs to another subscription.
    pub fn next(&mut self) -> Result {
        // `call` appends to `buff`, so the oldest notification sits at the
        // front; taking it from there keeps delivery in arrival order. The
        // buffer only holds what arrived during a call, so the shift is cheap.
        if !self.buff.is_empty() {
            return Ok(self.buff.remove(0));
        }
        // Nothing buffered, read the next notification from the connection
        let notification: Notification = self.rpc.receive()?;
        let notification = notification.params;
        assert_eq!(self.id, notification.subscription);
        Ok(notification.result)
    }
}
impl Drop for RpcStream<'_, N> {
/// Unsubscribe and drain the connection so it can be used again for plain
/// request/response calls
fn drop(&mut self) {
let Self { rpc, id, .. } = self;
// Request unsubscription, ignoring error
rpc.send("eth_unsubscribe", &[id]).ok();
// Ignore all pending notifications until the unsubscription response.
// The response is discarded without going through `handle_response`, so
// `rpc.id` is not advanced here. The loop also stops on any receive error.
while let Ok(response) = rpc.receive::>() {
match response {
NotificationOrResponse::Notification(_) => { /* Ignore */ }
NotificationOrResponse::Response(_) => return,
}
}
}
}
impl RpcClient for RpcStream<'_, N> {
/// Send one request on the subscribed connection and block until its
/// response arrives
///
/// Notifications received in the meantime are stored in `buff` instead of
/// being dropped. Panics if such a notification belongs to another
/// subscription, or (in `handle_response`) if the response id differs from
/// the request id.
fn call(&mut self, method: &str, params: &impl serde::Serialize) -> Result
where
T: serde::de::DeserializeOwned + Debug,
{
self.rpc.send(method, params)?;
loop {
// Buffer notifications until response
let response: NotificationOrResponse = self.rpc.receive()?;
match response {
NotificationOrResponse::Notification(n) => {
let n = n.params;
assert_eq!(self.id, n.subscription);
self.buff.push(n.result);
}
NotificationOrResponse::Response(response) => {
return self.rpc.handle_response(response)
}
}
}
}
}
pub trait RpcClient {
fn call(&mut self, method: &str, params: &impl serde::Serialize) -> Result
where
T: serde::de::DeserializeOwned + Debug;
/* ----- Account management ----- */
/// List registered account
fn list_accounts(&mut self) -> Result> {
self.call("personal_listAccounts", &EMPTY)
}
/// Create a new encrypted account
fn new_account(&mut self, passwd: &str) -> Result {
self.call("personal_newAccount", &[passwd])
}
/// Unlock an existing account
fn unlock_account(&mut self, account: &Address, passwd: &str) -> Result {
self.call("personal_unlockAccount", &(account, passwd, 0))
}
/* ----- Getter ----- */
/// Get a transaction by hash
fn get_transaction(&mut self, hash: &H256) -> Result