From e18367bae0a1cd3223896a6e55e7fac4b14f5266 Mon Sep 17 00:00:00 2001 From: Antoine A <> Date: Wed, 23 Feb 2022 18:43:29 +0100 Subject: Better initialization and improved instrumentation test --- btc-wire/src/bin/btc-wire-utils.rs | 2 +- btc-wire/src/loops/analysis.rs | 3 ++- btc-wire/src/main.rs | 21 ++++++++++++++------- btc-wire/src/rpc.rs | 32 ++++++++++++++++++++++---------- 4 files changed, 39 insertions(+), 19 deletions(-) (limited to 'btc-wire') diff --git a/btc-wire/src/bin/btc-wire-utils.rs b/btc-wire/src/bin/btc-wire-utils.rs index 8075d67..9176597 100644 --- a/btc-wire/src/bin/btc-wire-utils.rs +++ b/btc-wire/src/bin/btc-wire-utils.rs @@ -127,7 +127,7 @@ fn main() { } } Cmd::Resetdb => { - let hash: BlockHash = rpc.get_block_hash(0).unwrap(); + let hash: BlockHash = rpc.get_genesis().unwrap(); let mut db = Client::connect(&config.unwrap().db_url, NoTls).unwrap(); let mut tx = db.transaction().unwrap(); // Clear transaction tables and reset state diff --git a/btc-wire/src/loops/analysis.rs b/btc-wire/src/loops/analysis.rs index 57b7bd1..f036a73 100644 --- a/btc-wire/src/loops/analysis.rs +++ b/btc-wire/src/loops/analysis.rs @@ -13,7 +13,7 @@ You should have received a copy of the GNU Affero General Public License along with TALER; see the file COPYING. If not, see */ -use std::sync::atomic::Ordering; +use std::{sync::atomic::Ordering, time::Duration}; use btc_wire::rpc::{AutoRpcCommon, ChainTipsStatus}; use common::{ @@ -70,6 +70,7 @@ pub fn analysis(mut rpc: AutoRpcCommon, mut db: AutoReconnectDb, state: &WireSta })(); if let Err(e) = result { error!("analysis: {}", e); + std::thread::sleep(Duration::from_secs(5)); } } } diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs index 4719318..2d78294 100644 --- a/btc-wire/src/main.rs +++ b/btc-wire/src/main.rs @@ -79,6 +79,11 @@ fn init(config: Option, init: Init) { let config = CoreConfig::load_taler_config(config.as_deref(), Some("BTC")); // Connect to database let mut db = Client::connect(&config.db_url, NoTls).expect("Failed to connect to database"); + // Parse bitcoin config + let btc_conf = BitcoinConfig::load(config.data_dir.unwrap_or_else(default_data_dir)) + .expect("Failed to load bitcoin configuration"); + // Connect to bitcoin node + let mut rpc = Rpc::common(&btc_conf).expect("Failed to connect to bitcoin RPC server"); match init { Init::Initdb => { // Load schema @@ -91,16 +96,18 @@ fn init(config: Option, init: Init) { &[&[1u8].as_ref()], ) .expect("Failed to initialise database state"); + // Init last_hash if not already set + let genesis_hash = rpc.get_genesis().expect("Failed to get genesis hash"); + db + .execute( + "INSERT INTO state (name, value) VALUES ('last_hash', $1) ON CONFLICT (name) DO NOTHING", + &[&genesis_hash.as_ref()], + ) + .expect("Failed to update database state"); println!("Database initialised"); } Init::Initwallet => { - // Parse bitcoin config - let btc_conf = BitcoinConfig::load(config.data_dir.unwrap_or_else(default_data_dir)) - .expect("Failed to load bitcoin configuration"); - // Connect to bitcoin node - let mut rpc = Rpc::common(&btc_conf).expect("Failed to connect to bitcoin RPC server"); - - // Skip previous blocks + // Skip past blocks let info = rpc .get_blockchain_info() .expect("Failed to get blockchain info"); diff --git a/btc-wire/src/rpc.rs b/btc-wire/src/rpc.rs index 814127a..f022737 100644 --- a/btc-wire/src/rpc.rs +++ b/btc-wire/src/rpc.rs @@ -31,8 +31,8 @@ use serde_json::{json, Value}; use std::{ fmt::Debug, io::{self, BufRead, BufReader, Write}, - net::TcpStream, - time::Duration, + net::{TcpStream, SocketAddr}, + time::{Duration, Instant}, }; use crate::config::{BitcoinConfig, BtcAuth}; @@ -113,6 +113,8 @@ const EMPTY: [(); 0] = []; /// Bitcoin RPC connection pub struct Rpc { + last_call: Instant, + addr: SocketAddr, path: String, id: u64, cookie: String, @@ -149,6 +151,8 @@ impl Rpc { let conn = BufReader::new(sock); Ok(Self { + last_call: Instant::now(), + addr: config.addr, path, id: 0, cookie: format!("Basic {}", base64::encode(token)), @@ -161,7 +165,13 @@ impl Rpc { where T: serde::de::DeserializeOwned + Debug, { - // TODO rethink timeout + // Handle bitcoind RPC client timeout + if self.last_call.elapsed() > Duration::from_secs(60) { + // Create new connection + let sock = TcpStream::connect_timeout(&self.addr, Duration::from_secs(5))?; + self.conn = BufReader::new(sock); + } + let request = RpcRequest { method, id: self.id, @@ -203,7 +213,7 @@ impl Rpc { // Read body let amount = sock.read_until(b'\n', buf)?; let response: RpcResponse = serde_json::from_slice(&buf[..amount])?; - match response { + let result = match response { RpcResponse::RpcResponse { result, error, id } => { assert_eq!(self.id, id); self.id += 1; @@ -220,7 +230,9 @@ impl Rpc { } } RpcResponse::Error(msg) => Err(Error::Bitcoin(msg)), - } + }; + self.last_call = Instant::now(); + return result; } /* ----- Wallet management ----- */ @@ -266,11 +278,6 @@ impl Rpc { /* ----- Getter ----- */ - /// Get block hash at a given height - pub fn get_block_hash(&mut self, height: u32) -> Result { - self.call("getblockhash", &[height]) - } - /// Get blockchain info pub fn get_blockchain_info(&mut self) -> Result { self.call("getblockchaininfo", &EMPTY) @@ -291,6 +298,11 @@ impl Rpc { self.call("getrawtransaction", &(id, true)) } + /// Get genesis block hash + pub fn get_genesis(&mut self) -> Result { + self.call("getblockhash", &[0]) + } + /* ----- Transactions ----- */ /// Send bitcoin transaction -- cgit v1.2.3