commit 68ccf4c10b0bd54d5c368187c16fd96e2053e714
parent 5353e1fd79610ebae4b81b1450ff759784d2b1bc
Author: Antoine A <>
Date: Tue, 28 Dec 2021 14:20:06 +0100
Reuse tcp stream for RPC
Diffstat:
6 files changed, 92 insertions(+), 80 deletions(-)
diff --git a/btc-wire/src/bin/test.rs b/btc-wire/src/bin/test.rs
@@ -39,7 +39,7 @@ pub fn main() {
let wallets = [CLIENT, WIRE, RESERVE];
- let rpc = BtcRpc::common(&config).unwrap();
+ let mut rpc = BtcRpc::common(&config).unwrap();
if !existing_wallets.contains(CLIENT)
|| !existing_wallets.contains(WIRE)
|| !existing_wallets.contains(RESERVE)
@@ -64,14 +64,14 @@ pub fn main() {
}
// Client initialization
- let client_rpc = BtcRpc::wallet(&config, CLIENT).unwrap();
- let wire_rpc = BtcRpc::wallet(&config, WIRE).unwrap();
- let reserve_rpc = BtcRpc::wallet(&config, RESERVE).unwrap();
+ let mut client_rpc = BtcRpc::wallet(&config, CLIENT).unwrap();
+ let mut wire_rpc = BtcRpc::wallet(&config, WIRE).unwrap();
+ let mut reserve_rpc = BtcRpc::wallet(&config, RESERVE).unwrap();
let client_addr = client_rpc.get_new_address().unwrap();
let wire_addr = wire_rpc.get_new_address().unwrap();
let reserve_addr = reserve_rpc.get_new_address().unwrap();
- let next_block = || {
+ let next_block = |reserve_rpc: &mut BtcRpc| {
match config.network {
Network::Regtest => {
// Manually mine a block
@@ -84,13 +84,13 @@ pub fn main() {
}
};
- let wait_for_tx = |rpc: &BtcRpc, txs: &[Txid]| {
+ let wait_for_tx = |rpc: &mut BtcRpc, reserve_rpc: &mut BtcRpc, txs: &[Txid]| {
let mut count = 0;
while txs
.iter()
.any(|id| rpc.get_tx(id).unwrap().confirmations <= 0)
{
- next_block();
+ next_block(reserve_rpc);
if count > 3 {
panic!("Transaction no sended after 4 blocks");
}
@@ -107,7 +107,7 @@ pub fn main() {
// Transfer all wire money to client
let reserve_balance = reserve_rpc.get_balance().unwrap();
reserve_rpc.send(&client_addr, &reserve_balance, true).ok();
- next_block();
+ next_block(&mut reserve_rpc);
let balance = client_rpc.get_balance().unwrap();
let min_balance = test_amount * 3;
@@ -140,7 +140,7 @@ pub fn main() {
reserve_rpc
.send(&client_addr, &Amount::from_sat(5_000_000_000), true)
.unwrap();
- next_block();
+ next_block(&mut reserve_rpc);
}
}
}
@@ -164,13 +164,13 @@ pub fn main() {
.unwrap();
// Check in mempool
assert!(
- tx_exist(&client_rpc, &id, 0, Category::Send).unwrap(),
+ tx_exist(&mut client_rpc, &id, 0, Category::Send).unwrap(),
"Not in mempool"
);
// Check mined
- wait_for_tx(&client_rpc, &[id]);
+ wait_for_tx(&mut client_rpc, &mut reserve_rpc, &[id]);
assert!(
- tx_exist(&wire_rpc, &id, 1, Category::Receive).unwrap(),
+ tx_exist(&mut wire_rpc, &id, 1, Category::Receive).unwrap(),
"Not mined"
);
// Check extract
@@ -185,13 +185,13 @@ pub fn main() {
.unwrap();
// Check in mempool
assert!(
- tx_exist(&client_rpc, &id, 0, Category::Send).unwrap(),
+ tx_exist(&mut client_rpc, &id, 0, Category::Send).unwrap(),
"Not in mempool"
);
// Check mined
- wait_for_tx(&client_rpc, &[id]);
+ wait_for_tx(&mut client_rpc, &mut reserve_rpc, &[id]);
assert!(
- tx_exist(&wire_rpc, &id, 1, Category::Receive).unwrap(),
+ tx_exist(&mut wire_rpc, &id, 1, Category::Receive).unwrap(),
"Not mined"
);
// Check extract
@@ -203,9 +203,9 @@ pub fn main() {
runner.test("Bounce simple", || {
let before = client_rpc.get_balance().unwrap();
let send_id = client_rpc.send(&wire_addr, &test_amount, false).unwrap();
- wait_for_tx(&client_rpc, &[send_id]);
+ wait_for_tx(&mut client_rpc, &mut reserve_rpc, &[send_id]);
let bounce_id = wire_rpc.bounce(&send_id, &bounce_fee).unwrap();
- wait_for_tx(&wire_rpc, &[bounce_id]);
+ wait_for_tx(&mut wire_rpc, &mut reserve_rpc, &[bounce_id]);
let bounce_tx_fee = wire_rpc.get_tx(&bounce_id).unwrap().details[0]
.fee
.unwrap()
@@ -226,7 +226,7 @@ pub fn main() {
let send_id = client_rpc
.send(&wire_addr, &Amount::from_sat(294), false)
.unwrap();
- wait_for_tx(&client_rpc, &[send_id]);
+ wait_for_tx(&mut client_rpc, &mut reserve_rpc, &[send_id]);
assert!(match wire_rpc.bounce(&send_id, &bounce_fee) {
Ok(_) => false,
Err(err) => match err {
@@ -239,7 +239,7 @@ pub fn main() {
let send_id = client_rpc
.send(&wire_addr, &(Amount::from_sat(294) + bounce_fee), false)
.unwrap();
- wait_for_tx(&client_rpc, &[send_id]);
+ wait_for_tx(&mut client_rpc, &mut reserve_rpc, &[send_id]);
assert!(match wire_rpc.bounce(&send_id, &bounce_fee) {
Ok(_) => false,
@@ -264,15 +264,15 @@ pub fn main() {
.unwrap()
})
.collect();
- wait_for_tx(&client_rpc, txs.as_slice());
+ wait_for_tx(&mut client_rpc, &mut reserve_rpc, txs.as_slice());
let before = client_rpc.get_balance().unwrap();
// Send a transaction with multiple input from multiple transaction of different outputs len
let send_id = client_rpc
.send_custom(&txs, [(&wire_addr, &(test_amount * 3))], None)
.unwrap();
- wait_for_tx(&client_rpc, &[send_id]);
+ wait_for_tx(&mut client_rpc, &mut reserve_rpc, &[send_id]);
let bounce_id = wire_rpc.bounce(&send_id, &bounce_fee).unwrap();
- wait_for_tx(&wire_rpc, &[bounce_id]);
+ wait_for_tx(&mut wire_rpc, &mut reserve_rpc, &[bounce_id]);
let after = client_rpc.get_balance().unwrap();
let bounce_tx_fee = wire_rpc.get_tx(&bounce_id).unwrap().details[0]
.fee
@@ -294,7 +294,12 @@ pub fn main() {
}
/// Check a specific transaction exist in a wallet historic
-fn tx_exist(rpc: &BtcRpc, id: &Txid, min_confirmation: i32, detail: Category) -> rpc::Result<bool> {
+fn tx_exist(
+ rpc: &mut BtcRpc,
+ id: &Txid,
+ min_confirmation: i32,
+ detail: Category,
+) -> rpc::Result<bool> {
let result = rpc.list_since_block(None, 1, false).unwrap();
let found = result
.transactions
diff --git a/btc-wire/src/lib.rs b/btc-wire/src/lib.rs
@@ -41,7 +41,7 @@ pub enum GetOpReturnErr {
impl BtcRpc {
/// Send a transaction with a 32B key as metadata encoded using fake segwit addresses
pub fn send_segwit_key(
- &self,
+ &mut self,
to: &Address,
amount: &Amount,
metadata: &[u8; 32],
@@ -64,7 +64,7 @@ impl BtcRpc {
/// Get detailed information about an in-wallet transaction and it's 32B metadata key encoded using fake segwit addresses
pub fn get_tx_segwit_key(
- &self,
+ &mut self,
id: &Txid,
) -> Result<(TransactionFull, [u8; 32]), GetSegwitErr> {
let full = self.get_tx(id)?;
@@ -88,7 +88,7 @@ impl BtcRpc {
/// Send a transaction with metadata encoded using OP_RETURN
pub fn send_op_return(
- &self,
+ &mut self,
to: &Address,
amount: &Amount,
metadata: &[u8],
@@ -100,7 +100,7 @@ impl BtcRpc {
/// Get detailed information about an in-wallet transaction and its op_return metadata
pub fn get_tx_op_return(
- &self,
+ &mut self,
id: &Txid,
) -> Result<(TransactionFull, Vec<u8>), GetOpReturnErr> {
let full = self.get_tx(id)?;
@@ -124,7 +124,7 @@ impl BtcRpc {
/// There is no reliable way to bounce a transaction as you cannot know if the addresses
/// used are shared or come from a third-party service. We only send back to the first input
/// address as a best-effort gesture.
- pub fn bounce(&self, id: &Txid, bounce_fee: &Amount) -> Result<Txid, BounceErr> {
+ pub fn bounce(&mut self, id: &Txid, bounce_fee: &Amount) -> Result<Txid, BounceErr> {
let full = self.get_tx(id)?;
let detail = &full.details[0];
if detail.category != Category::Receive {
diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs
@@ -144,7 +144,7 @@ impl AutoReconnectRPC {
fn connect(config: &BitcoinConfig, wallet: &str, delay: Duration) -> BtcRpc {
loop {
match BtcRpc::wallet(config, wallet) {
- Ok(new) => match new.net_info() {
+ Ok(mut new) => match new.net_info() {
Ok(_) => return new,
Err(err) => {
error!("connect: RPC - {}", err);
@@ -226,7 +226,7 @@ fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, config: &Config)
// Perform a transaction on the blockchain
fn perform_send(
db: &mut Client,
- rpc: &BtcRpc,
+ rpc: &mut BtcRpc,
id: i32,
status: Status,
) -> Result<(), Box<dyn std::error::Error>> {
@@ -297,11 +297,11 @@ fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, config: &Config)
// Send delayed transactions
for id in list_status(db, Status::Delayed)? {
- perform_send(db, &rpc, id, Status::Delayed)?;
+ perform_send(db, rpc, id, Status::Delayed)?;
}
// Send proposed transactions
for id in list_status(db, Status::Proposed)? {
- perform_send(db, &rpc, id, Status::Proposed)?;
+ perform_send(db, rpc, id, Status::Proposed)?;
}
Ok(())
})();
@@ -374,7 +374,7 @@ fn sync_chain(
}
}
} else {
- let debit_addr = sender_address(&rpc, &full)?;
+ let debit_addr = sender_address(rpc, &full)?;
let credit_addr = full.details[0].address.as_ref().unwrap();
let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time);
let amount = btc_amount_to_taler_amount(&full.amount);
@@ -400,7 +400,7 @@ fn sync_chain(
}
Category::Receive => match rpc.get_tx_segwit_key(&id) {
Ok((full, reserve_pub)) => {
- let debit_addr = sender_address(&rpc, &full)?;
+ let debit_addr = sender_address(rpc, &full)?;
let credit_addr = full.details[0].address.as_ref().unwrap();
let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time);
let amount = btc_amount_to_taler_amount(&full.amount);
@@ -503,7 +503,7 @@ fn main() {
let config = taler_config::Config::from_path("test.conf");
let config: &'static Config = Box::leak(Box::new(config));
let btc_config = BitcoinConfig::load(&data_dir).unwrap();
- let rpc = BtcRpc::common(&btc_config).unwrap();
+ let mut rpc = BtcRpc::common(&btc_config).unwrap();
rpc.load_wallet(&config.btc_wallet).ok();
let rpc_listener = AutoReconnectRPC::new(
btc_config.clone(),
diff --git a/btc-wire/src/rpc.rs b/btc-wire/src/rpc.rs
@@ -2,16 +2,19 @@ use bitcoin::{hashes::hex::ToHex, Address, Amount, BlockHash, SignedAmount, Txid
use serde_json::{json, Value};
use std::{
fmt::Debug,
- io::{self, BufRead, BufReader, ErrorKind, Write},
+ io::{self, BufRead, BufReader, Write},
net::{SocketAddr, TcpStream},
- sync::atomic::{AtomicU64, Ordering},
- time::{Duration, Instant},
+ time::Duration,
};
use crate::config::BitcoinConfig;
-// This is a very simple RPC client designed only for a specific bitcoincore version
+// This is a very simple RPC client designed only for a specific bitcoind version
// and to use on an secure localhost connection to a trusted node
+//
+// No http format of body length check as we trust the node output
+// No asynchronous request as bitcoind put requests in a queue and process
+// them synchronously and we do not want to fill this queue
#[derive(Debug, serde::Serialize)]
struct BtcRequest<'a, T: serde::Serialize> {
@@ -50,8 +53,9 @@ const EMPTY: [(); 0] = [];
pub struct BtcRpc {
addr: SocketAddr,
path: String,
- id: AtomicU64,
+ id: u64,
cookie: String,
+ conn: Option<BufReader<TcpStream>>,
}
impl BtcRpc {
@@ -74,37 +78,44 @@ impl BtcRpc {
Ok(Self {
addr: config.addr,
path,
- id: AtomicU64::new(0),
+ id: 0,
cookie: format!("Basic {}", base64::encode(&cookie)),
+ conn: None,
})
}
- fn call<T>(&self, method: &str, params: &impl serde::Serialize) -> Result<T>
+ fn call<T>(&mut self, method: &str, params: &impl serde::Serialize) -> Result<T>
where
T: serde::de::DeserializeOwned + Debug,
{
- let id = self.id.fetch_add(1, Ordering::SeqCst);
- let request = BtcRequest { method, id, params };
-
- // Some call might hang waiting for a new block to be mined
- let timeout = Duration::from_secs(666);
- let request_deadline = Instant::now() + timeout;
+ if self.conn.is_none() {
+ // Some call might hang waiting for a new block to be mined
+ let timeout = Duration::from_secs(666);
+
+ // Open connection
+ let sock = TcpStream::connect_timeout(&self.addr, timeout)?;
+ sock.set_read_timeout(Some(timeout))?;
+ sock.set_write_timeout(Some(timeout))?;
+ self.conn.replace(BufReader::new(sock));
+ };
+ let sock = self.conn.as_mut().unwrap();
- // Open connection
- let mut sock = TcpStream::connect_timeout(&self.addr, timeout)?;
- sock.set_read_timeout(Some(timeout))?;
- sock.set_write_timeout(Some(timeout))?;
+ let request = BtcRequest {
+ method,
+ id: self.id,
+ params,
+ };
// Serialize the body first so we can set the Content-Length header.
let body = serde_json::to_vec(&request)?;
let mut buf = Vec::new();
// Write HTTP request
{
+ let sock = sock.get_mut();
// Send HTTP request
writeln!(buf, "POST {} HTTP/1.1\r", self.path)?;
// Write headers
writeln!(buf, "Accept: application/json-rpc\r")?;
- writeln!(buf, "Connection: close\r")?;
writeln!(buf, "Authorization: {}\r", self.cookie)?;
writeln!(buf, "Content-Type: application/json-rpc\r")?;
writeln!(buf, "Content-Length: {}\r", body.len())?;
@@ -116,23 +127,21 @@ impl BtcRpc {
sock.write_all(&body).unwrap();
sock.flush().unwrap();
}
- // Receive response
- let mut reader = BufReader::new(sock);
// Skip response
loop {
- let amount = reader.read_until(b'\n', &mut buf).unwrap();
+ let amount = sock.read_until(b'\n', &mut buf).unwrap();
let sep = buf[..amount] == [b'\r', b'\n'];
buf.clear();
if sep {
break;
- } else if Instant::now() > request_deadline {
- Err(std::io::Error::new(ErrorKind::TimedOut, "Timeout"))?
}
}
// Read body
- let amount = reader.read_until(b'\n', &mut buf).unwrap();
+ let amount = sock.read_until(b'\n', &mut buf).unwrap();
let response: BtcResponse<T> = serde_json::from_slice(&buf[..amount])?;
- assert_eq!(id, response.id);
+
+ assert_eq!(self.id, response.id);
+ self.id += 1;
if let Some(ok) = response.result {
Ok(ok)
} else {
@@ -144,43 +153,43 @@ impl BtcRpc {
}
}
- pub fn net_info(&self) -> Result<Empty> {
+ pub fn net_info(&mut self) -> Result<Empty> {
self.call("getnetworkinfo", &EMPTY)
}
- pub fn load_wallet(&self, name: &str) -> Result<Wallet> {
+ pub fn load_wallet(&mut self, name: &str) -> Result<Wallet> {
self.call("loadwallet", &[name])
}
- pub fn create_wallet(&self, name: &str) -> Result<Wallet> {
+ pub fn create_wallet(&mut self, name: &str) -> Result<Wallet> {
self.call("createwallet", &[name])
}
- pub fn get_new_address(&self) -> Result<Address> {
+ pub fn get_new_address(&mut self) -> Result<Address> {
self.call("getnewaddress", &EMPTY)
}
- pub fn generate(&self, nb: u16, address: &Address) -> Result<Vec<BlockHash>> {
+ pub fn generate(&mut self, nb: u16, address: &Address) -> Result<Vec<BlockHash>> {
self.call("generatetoaddress", &(nb, address))
}
- pub fn wait_for_new_block(&self, timeout: u64) -> Result<Empty> {
+ pub fn wait_for_new_block(&mut self, timeout: u64) -> Result<Empty> {
self.call("waitfornewblock", &[timeout])
}
- pub fn get_balance(&self) -> Result<Amount> {
+ pub fn get_balance(&mut self) -> Result<Amount> {
let btc: f64 = self.call("getbalance", &EMPTY)?;
Ok(Amount::from_btc(btc).unwrap())
}
- pub fn send(&self, address: &Address, amount: &Amount, subtract_fee: bool) -> Result<Txid> {
+ pub fn send(&mut self, address: &Address, amount: &Amount, subtract_fee: bool) -> Result<Txid> {
let btc = amount.as_btc();
self.call("sendtoaddress", &(address, btc, (), (), subtract_fee))
}
/// Send transaction to multiple recipients
pub fn send_many<'a, 'b>(
- &self,
+ &mut self,
recipients: impl IntoIterator<Item = (&'a Address, &'b Amount)>,
) -> Result<Txid> {
let amounts = Value::Object(
@@ -193,7 +202,7 @@ impl BtcRpc {
}
pub fn send_custom<'a, 'b, 'c>(
- &self,
+ &mut self,
inputs: impl IntoIterator<Item = &'a Txid>,
outputs: impl IntoIterator<Item = (&'b Address, &'c Amount)>,
data: Option<&[u8]>,
@@ -226,7 +235,7 @@ impl BtcRpc {
}
pub fn list_since_block(
- &self,
+ &mut self,
hash: Option<&BlockHash>,
confirmation: u8,
include_remove: bool,
@@ -234,11 +243,11 @@ impl BtcRpc {
self.call("listsinceblock", &(hash, confirmation, (), include_remove))
}
- pub fn get_tx(&self, id: &Txid) -> Result<TransactionFull> {
+ pub fn get_tx(&mut self, id: &Txid) -> Result<TransactionFull> {
self.call("gettransaction", &(id, (), true))
}
- pub fn get_raw(&self, id: &Txid) -> Result<RawTransaction> {
+ pub fn get_raw(&mut self, id: &Txid) -> Result<RawTransaction> {
self.call("getrawtransaction", &(id, true))
}
}
diff --git a/btc-wire/src/rpc_utils.rs b/btc-wire/src/rpc_utils.rs
@@ -36,7 +36,7 @@ pub fn check_address(addr: &str) -> bool {
}
/// Get the first sender address from a raw transaction
-pub fn sender_address(rpc: &BtcRpc, full: &TransactionFull) -> rpc::Result<Address> {
+pub fn sender_address(rpc: &mut BtcRpc, full: &TransactionFull) -> rpc::Result<Address> {
let first = &full.decoded.vin[0];
let tx = rpc.get_raw(&first.txid.unwrap())?;
Ok(tx
diff --git a/script/test_btc_fail.sh b/script/test_btc_fail.sh
@@ -33,7 +33,7 @@ echo "Start gateway"
gateway
echo ""
-SEQ="seq 10 99"
+SEQ="seq 10 40"
function check() {
check_delta "$1?delta=-100" "$SEQ"
@@ -54,7 +54,7 @@ check incoming
echo " OK"
echo -n "Check balance:"
-check_balance 9.99438310 1.00490500
+check_balance 9.99897979 1.00077500
echo " OK"
echo "----- Handle outgoing -----"
@@ -66,9 +66,7 @@ for n in `$SEQ`; do
-C payto://bitcoin/$CLIENT \
-a BTC:0.0000$n > /dev/null
done
-sleep 5
-mine_btc # Mine transactions
-sleep 5
+sleep 15
mine_btc # Mine transactions
echo " OK"
@@ -77,5 +75,5 @@ check outgoing
echo " OK"
echo -n "Check balance:"
-check_balance 9.99928810
+check_balance 9.99975479
echo " OK"
\ No newline at end of file