/* This file is part of TALER Copyright (C) 2022 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software Foundation; either version 3, or (at your option) any later version. TALER is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. You should have received a copy of the GNU Affero General Public License along with TALER; see the file COPYING. If not, see */ //! This is a very simple RPC client designed only for a specific geth version //! and to use on an secure unix domain socket to a trusted node //! //! We only parse the thing we actually use, this reduce memory usage and //! make our code more compatible with future deprecation use common::{log::log::error, password, reconnect::AutoReconnect, url::Url}; use ethereum_types::{Address, H256, U256, U64}; use serde::de::DeserializeOwned; use serde_json::error::Category; use std::{ fmt::Debug, io::{self, BufWriter, ErrorKind, Read, Write}, os::unix::net::UnixStream, path::{Path, PathBuf}, }; use self::hex::Hex; pub type AutoRpcWallet = AutoReconnect<(PathBuf, Address), Rpc>; /// Create a reconnecting rpc connection with an unlocked wallet pub fn auto_rpc_wallet(ipc_path: PathBuf, address: Address) -> AutoRpcWallet { AutoReconnect::new( (ipc_path, address), |(path, address)| { let mut rpc = Rpc::new(path) .map_err(|err| error!("connect RPC: {}", err)) .ok()?; rpc.unlock_account(address, &password()) .map_err(|err| error!("connect RPC: {}", err)) .ok()?; Some(rpc) }, |client| client.node_info().is_err(), ) } pub type AutoRpcCommon = AutoReconnect; /// Create a reconnecting rpc connection pub fn auto_rpc_common(ipc_path: PathBuf) -> AutoRpcCommon { AutoReconnect::new( ipc_path, |path| { Rpc::new(path) .map_err(|err| error!("connect RPC: {}", err)) .ok() }, |client| client.node_info().is_err(), ) } #[derive(Debug, serde::Serialize)] struct RpcRequest<'a, T: serde::Serialize> { method: &'a str, id: u64, params: &'a T, } #[derive(Debug, serde::Deserialize)] struct RpcResponse { result: Option, error: Option, id: u64, } #[derive(Debug, serde::Deserialize)] struct RpcErr { code: i64, message: String, } #[derive(Debug, thiserror::Error)] pub enum Error { #[error("{0:?}")] Transport(#[from] std::io::Error), #[error("{code:?} - {msg}")] RPC { code: i64, msg: String }, #[error("JSON: {0}")] Json(#[from] serde_json::Error), #[error("Null rpc, no result or error")] Null, } pub type Result = std::result::Result; const EMPTY: [(); 0] = []; /// Ethereum RPC connection pub struct Rpc { id: u64, conn: BufWriter, read_buf: Vec, cursor: usize, } impl Rpc { /// Start a RPC connection, path can be datadir or ipc path pub fn new(path: impl AsRef) -> io::Result { let path = path.as_ref(); let conn = if path.is_dir() { UnixStream::connect(path.join("geth.ipc")) } else { UnixStream::connect(path) }?; Ok(Self { id: 0, conn: BufWriter::new(conn), read_buf: vec![0u8; 8 * 1024], cursor: 0, }) } fn send(&mut self, method: &str, params: &impl serde::Serialize) -> Result<()> { let request = RpcRequest { method, id: self.id, params, }; // Send request serde_json::to_writer(&mut self.conn, &request)?; self.conn.flush()?; Ok(()) } fn receive(&mut self) -> Result where T: serde::de::DeserializeOwned + Debug, { loop { // Double buffer size if full if self.cursor == self.read_buf.len() { self.read_buf.resize(self.cursor * 2, 0); } match self.conn.get_mut().read(&mut self.read_buf[self.cursor..]) { Ok(0) => Err(std::io::Error::new( ErrorKind::UnexpectedEof, "RPC EOF".to_string(), ))?, Ok(nb) => { self.cursor += nb; let mut de: serde_json::StreamDeserializer<_, T> = serde_json::Deserializer::from_slice(&self.read_buf[..self.cursor]) .into_iter(); if let Some(result) = de.next() { match result { Ok(response) => { let read = de.byte_offset(); self.read_buf.copy_within(read..self.cursor, 0); self.cursor -= read; return Ok(response); } Err(err) if err.classify() == Category::Eof => { if nb == 0 { Err(std::io::Error::new( ErrorKind::UnexpectedEof, "Stream EOF", ))?; } } Err(e) => Err(e)?, } } } Err(e) if e.kind() == ErrorKind::Interrupted => {} Err(e) => Err(e)?, } } } pub fn subscribe_new_head(&mut self) -> Result> { let id: String = self.call("eth_subscribe", &["newHeads"])?; Ok(RpcStream::new(self, id)) } fn handle_response(&mut self, response: RpcResponse) -> Result { assert_eq!(self.id, response.id); self.id += 1; if let Some(ok) = response.result { Ok(ok) } else { Err(match response.error { Some(err) => Error::RPC { code: err.code, msg: err.message, }, None => Error::Null, }) } } } impl RpcClient for Rpc { fn call(&mut self, method: &str, params: &impl serde::Serialize) -> Result where T: serde::de::DeserializeOwned + Debug, { self.send(method, params)?; let response = self.receive()?; self.handle_response(response) } } #[derive(Debug, serde::Deserialize)] pub struct NotificationContent { subscription: String, result: T, } #[derive(Debug, serde::Deserialize)] struct Notification { params: NotificationContent, } #[derive(Debug, serde::Deserialize)] #[serde(untagged)] enum NotificationOrResponse { Notification(Notification), Response(RpcResponse), } /// A notification stream wrapping an rpc client pub struct RpcStream<'a, N: Debug + DeserializeOwned> { rpc: &'a mut Rpc, id: String, buff: Vec, } impl<'a, N: Debug + DeserializeOwned> RpcStream<'a, N> { fn new(rpc: &'a mut Rpc, id: String) -> Self { Self { rpc, id, buff: vec![], } } /// Block until next notification pub fn next(&mut self) -> Result { match self.buff.pop() { // Consume buffered notifications Some(prev) => Ok(prev), // Else read next one None => { let notification: Notification = self.rpc.receive()?; let notification = notification.params; assert_eq!(self.id, notification.subscription); Ok(notification.result) } } } } impl Drop for RpcStream<'_, N> { fn drop(&mut self) { let Self { rpc, id, .. } = self; // Request unsubscription, ignoring error rpc.send("eth_unsubscribe", &[id]).ok(); // Ignore all buffered notification until subscription response while let Ok(response) = rpc.receive::>() { match response { NotificationOrResponse::Notification(_) => { /* Ignore */ } NotificationOrResponse::Response(_) => return, } } } } impl RpcClient for RpcStream<'_, N> { fn call(&mut self, method: &str, params: &impl serde::Serialize) -> Result where T: serde::de::DeserializeOwned + Debug, { self.rpc.send(method, params)?; loop { // Buffer notifications until response let response: NotificationOrResponse = self.rpc.receive()?; match response { NotificationOrResponse::Notification(n) => { let n = n.params; assert_eq!(self.id, n.subscription); self.buff.push(n.result); } NotificationOrResponse::Response(response) => { return self.rpc.handle_response(response) } } } } } pub trait RpcClient { fn call(&mut self, method: &str, params: &impl serde::Serialize) -> Result where T: serde::de::DeserializeOwned + Debug; /* ----- Account management ----- */ /// List registered account fn list_accounts(&mut self) -> Result> { self.call("personal_listAccounts", &EMPTY) } /// Create a new encrypted account fn new_account(&mut self, passwd: &str) -> Result
{ self.call("personal_newAccount", &[passwd]) } /// Unlock an existing account fn unlock_account(&mut self, account: &Address, passwd: &str) -> Result { self.call("personal_unlockAccount", &(account, passwd, 0)) } /* ----- Getter ----- */ /// Get a transaction by hash fn get_transaction(&mut self, hash: &H256) -> Result> { match self.call("eth_getTransactionByHash", &[hash]) { Err(Error::Null) => Ok(None), r => r, } } /// Get a transaction receipt by hash fn get_transaction_receipt(&mut self, hash: &H256) -> Result> { match self.call("eth_getTransactionReceipt", &[hash]) { Err(Error::Null) => Ok(None), r => r, } } /// Get block by hash fn block(&mut self, hash: &H256) -> Result> { match self.call("eth_getBlockByHash", &(hash, &true)) { Err(Error::Null) => Ok(None), r => r, } } /// Get pending transactions fn pending_transactions(&mut self) -> Result> { self.call("eth_pendingTransactions", &EMPTY) } /// Get latest block fn latest_block(&mut self) -> Result { self.call("eth_getBlockByNumber", &("latest", &true)) } /// Get earliest block (genesis if not pruned) fn earliest_block(&mut self) -> Result { self.call("eth_getBlockByNumber", &("earliest", &true)) } /// Get latest account balance fn get_balance_latest(&mut self, addr: &Address) -> Result { self.call("eth_getBalance", &(addr, "latest")) } /// Get pending account balance fn get_balance_pending(&mut self, addr: &Address) -> Result { self.call("eth_getBalance", &(addr, "pending")) } /// Get node info fn node_info(&mut self) -> Result { self.call("admin_nodeInfo", &EMPTY) } /* ----- Transactions ----- */ /// Fill missing options from transaction request with default values fn fill_transaction(&mut self, req: &TransactionRequest) -> Result { self.call("eth_fillTransaction", &[req]) } /// Send ethereum transaction fn send_transaction(&mut self, req: &TransactionRequest) -> Result { self.call("eth_sendTransaction", &[req]) } /* ----- Miner ----- */ /// Start mining fn miner_start(&mut self) -> Result<()> { match self.call("miner_start", &[8]) { Err(Error::Null) => Ok(()), i => i, } } /// Stop mining fn miner_stop(&mut self) -> Result<()> { match self.call("miner_stop", &EMPTY) { Err(Error::Null) => Ok(()), i => i, } } /* ----- Peer management ----- */ fn export_chain(&mut self, path: &str) -> Result { self.call("admin_exportChain", &[path]) } fn import_chain(&mut self, path: &str) -> Result { self.call("admin_importChain", &[path]) } } #[derive(Debug, Clone, serde::Deserialize)] pub struct Block { pub hash: Option, /// Block number (None if pending) pub number: Option, #[serde(rename = "parentHash")] pub parent_hash: H256, pub transactions: Vec, } #[derive(Debug, serde::Deserialize)] pub struct Nothing {} /// Description of a Transaction, pending or in the chain. #[derive(Debug, Clone, serde::Deserialize)] pub struct Transaction { pub hash: H256, pub nonce: U256, /// Sender address (None when coinbase) pub from: Option
, /// Recipient address (None when contract creation) pub to: Option
, /// Transferred value pub value: U256, /// Input data pub input: Hex, } /// Description of a Transaction, pending or in the chain. #[derive(Debug, Clone, serde::Deserialize)] pub struct TransactionReceipt { /// Gas used by this transaction alone. #[serde(rename = "gasUsed")] pub gas_used: U256, /// Effective gas price #[serde(rename = "effectiveGasPrice")] pub effective_gas_price: Option, } /// Fill result #[derive(Debug, serde::Deserialize)] pub struct Filled { pub tx: FilledGas, } /// Filles gas #[derive(Debug, serde::Deserialize)] pub struct FilledGas { /// Supplied gas pub gas: U256, #[serde(rename = "gasPrice")] pub gas_price: Option, #[serde(rename = "maxFeePerGas")] pub max_fee_per_gas: Option, } /// Send Transaction Parameters #[derive(Debug, serde::Serialize)] pub struct TransactionRequest { /// Sender address pub from: Address, /// Recipient address pub to: Address, /// Transferred value pub value: U256, /// Gas price (None for sensible default) #[serde(rename = "gasPrice")] pub gas_price: Option, /// Transaction data pub data: Hex, /// Transaction nonce (None for next available nonce) #[serde(skip_serializing_if = "Option::is_none")] pub nonce: Option, } #[derive(Debug, serde::Deserialize)] pub struct NodeInfo { pub enode: Url, } pub mod hex { use std::{ fmt, ops::{Deref, DerefMut}, }; use serde::{ de::{Error, Unexpected, Visitor}, Deserialize, Deserializer, Serialize, Serializer, }; /// Raw bytes wrapper #[derive(Clone, Debug, Default, PartialEq, Eq, Hash)] pub struct Hex(pub Vec); impl Deref for Hex { type Target = Vec; fn deref(&self) -> &Self::Target { &self.0 } } impl DerefMut for Hex { fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 } } impl Serialize for Hex { fn serialize(&self, serializer: S) -> Result where S: Serializer, { serializer.serialize_str(&hex::encode_prefixed(&self.0)) } } impl<'a> Deserialize<'a> for Hex { fn deserialize(deserializer: D) -> Result where D: Deserializer<'a>, { deserializer.deserialize_identifier(BytesVisitor) } } struct BytesVisitor; impl<'a> Visitor<'a> for BytesVisitor { type Value = Hex; fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { write!(formatter, "a 0x-prefixed hex-encoded vector of bytes") } fn visit_str(self, value: &str) -> Result where E: Error, { if value.len() >= 2 && &value[0..2] == "0x" { let bytes = hex::decode(&value[2..]) .map_err(|e| Error::custom(format!("Invalid hex: {}", e)))?; Ok(Hex(bytes)) } else { Err(Error::invalid_value(Unexpected::Str(value), &"0x prefix")) } } fn visit_string(self, value: String) -> Result where E: Error, { self.visit_str(value.as_ref()) } } }