commit 8e8b3c720d7a807a3076ce400fa1bd70d5306c2b
parent 1fc2654e3086cad482864f9cf6a11670086ae192
Author: Antoine A <>
Date: Sun, 27 Jul 2025 14:06:49 +0200
bitcoin: optimize rpc and make bounce more error resilient
Diffstat:
9 files changed, 148 insertions(+), 112 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
@@ -533,16 +533,16 @@ dependencies = [
[[package]]
name = "criterion"
-version = "0.6.0"
+version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3bf7af66b0989381bd0be551bd7cc91912a655a58c6918420c9527b1fd8b4679"
+checksum = "e1c047a62b0cc3e145fa84415a3191f628e980b194c2755aa12300a4e6cbd928"
dependencies = [
"anes",
"cast",
"ciborium",
"clap",
"criterion-plot",
- "itertools 0.13.0",
+ "itertools",
"num-traits",
"oorandom",
"plotters",
@@ -556,12 +556,12 @@ dependencies = [
[[package]]
name = "criterion-plot"
-version = "0.5.0"
+version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1"
+checksum = "9b1bcc0dc7dfae599d84ad0b1a55f80cde8af3725da8313b528da95ef783e338"
dependencies = [
"cast",
- "itertools 0.10.5",
+ "itertools",
]
[[package]]
@@ -1402,15 +1402,6 @@ checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf"
[[package]]
name = "itertools"
-version = "0.10.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473"
-dependencies = [
- "either",
-]
-
-[[package]]
-name = "itertools"
version = "0.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "413ee7dfc52ee1a4949ceeb7dbc8a33f2d6c088194d9f922fb8318faf1f01186"
@@ -1533,9 +1524,9 @@ checksum = "241eaef5fd12c88705a01fc1066c48c4b36e0dd4377dcdc7ec3942cea7a69956"
[[package]]
name = "litrs"
-version = "0.4.1"
+version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b4ce301924b7887e9d637144fdade93f9dfff9b60981d4ac161db09720d39aa5"
+checksum = "f5e54036fe321fd421e10d732f155734c4e4afd610dd556d9a82833ab3ee0bed"
[[package]]
name = "lock_api"
@@ -2015,9 +2006,9 @@ dependencies = [
[[package]]
name = "redox_syscall"
-version = "0.5.15"
+version = "0.5.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7e8af0dde094006011e6a740d4879319439489813bd0bcdc7d821beaeeff48ec"
+checksum = "7251471db004e509f4e75a62cca9435365b5ec7bcdff530d612ac7c87c44a792"
dependencies = [
"bitflags",
]
@@ -2381,12 +2372,12 @@ dependencies = [
[[package]]
name = "socket2"
-version = "0.5.10"
+version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e22376abed350d73dd1cd119b57ffccad95b4e585a7cda43e286245ce23c0678"
+checksum = "233504af464074f9d066d7b5416c5f9b894a5862a6506e306f7b816cdd6f1807"
dependencies = [
"libc",
- "windows-sys 0.52.0",
+ "windows-sys 0.59.0",
]
[[package]]
@@ -2847,9 +2838,9 @@ checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
-version = "1.46.1"
+version = "1.47.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0cc3a2344dafbe23a245241fe8b09735b521110d30fcefbbd5feb1797ca35d17"
+checksum = "43864ed400b6043a4757a25c7a64a8efde741aed79a056a2fb348a406701bb35"
dependencies = [
"backtrace",
"bytes",
@@ -2861,7 +2852,7 @@ dependencies = [
"slab",
"socket2",
"tokio-macros",
- "windows-sys 0.52.0",
+ "windows-sys 0.59.0",
]
[[package]]
diff --git a/Cargo.toml b/Cargo.toml
@@ -36,6 +36,6 @@ hex = { package = "const-hex", version = "1.9.1" }
clap = { version = "4.5", features = ["derive"] }
anyhow = "1"
tracing = "0.1"
-criterion = "0.6"
+criterion = "0.7"
base64 = "0.22.1"
rand = { version = "0.9.0" }
diff --git a/database-versioning/depolymerizer-bitcoin-procedures.sql b/database-versioning/depolymerizer-bitcoin-procedures.sql
@@ -97,7 +97,6 @@ IF out_wtid_reuse THEN
RETURN;
END IF;
-- Notify new transaction
-NOTIFY new_tx;
PERFORM pg_notify('taler_out', out_transfer_row_id || '');
END $$;
COMMENT ON FUNCTION taler_transfer IS 'Create an outgoing taler transaction and register it';
diff --git a/depolymerizer-bitcoin/src/loops/worker.rs b/depolymerizer-bitcoin/src/loops/worker.rs
@@ -31,7 +31,7 @@ use depolymerizer_bitcoin::{
use sqlx::{PgPool, postgres::PgListener};
use taler_common::{ExpoBackoffDecorr, types::url};
use tokio::time::sleep;
-use tracing::{error, info, warn};
+use tracing::{debug, error, info, trace, warn};
use crate::{WorkerCfg, fail_point::fail_point};
@@ -64,18 +64,26 @@ pub async fn worker(mut state: WorkerCfg, pool: PgPool) {
}
// Listen to all channels
- db.listen_all(["new_block", "new_tx"]).await?;
+ db.listen_all(["new_block", "taler_out"]).await?;
loop {
// Wait for the next notification
{
let ntf = db.next_buffered();
+ if let Some(ntf) = &ntf {
+ trace!("notification from {}", ntf.channel())
+ }
if !skip_notification && ntf.is_none() {
+ debug!("waiting for notifications");
// Block until next notification
- db.try_recv().await?;
+ if let Some(ntf) = db.try_recv().await? {
+ trace!("notification from {}", ntf.channel())
+ }
}
// Conflate all notifications
- while db.next_buffered().is_some() {}
+ while let Some(ntf) = db.next_buffered() {
+ trace!("notification from {}", ntf.channel())
+ }
}
// Check lifetime
@@ -88,6 +96,8 @@ pub async fn worker(mut state: WorkerCfg, pool: PgPool) {
}
}
+ debug!("syncing blockchain");
+
// Perform analysis
state.confirmation =
analysis(rpc, state.confirmation, state.max_confirmation).await?;
@@ -457,10 +467,10 @@ async fn bounce(db: &mut PgListener, rpc: &mut Rpc, fee: &BtcAmount) -> LoopResu
Err(err) => match err {
rpc::Error::RPC {
code: ErrorCode::RpcWalletInsufficientFunds | ErrorCode::RpcWalletError,
- msg,
+ msg: _,
} => {
- db::bounce_set_status(db, id, None, &BounceStatus::ignored).await?;
- info!("|| (ignore) {bounced} because {msg}");
+ warn!("{err}");
+ return Ok(false);
}
e => Err(e)?,
},
diff --git a/depolymerizer-bitcoin/src/rpc.rs b/depolymerizer-bitcoin/src/rpc.rs
@@ -23,7 +23,7 @@
//! We only parse the thing we actually use, this reduce memory usage and
//! make our code more compatible with future deprecation
//!
-//! bitcoincore RPC documentation: <https://bitcoincore.org/en/doc/23.0.0/>
+//! bitcoincore RPC documentation: <https://bitcoincore.org/en/doc/29.0.0/>
use base64::Engine;
use base64::prelude::BASE64_STANDARD;
@@ -31,14 +31,15 @@ use bitcoin::{Address, Amount, BlockHash, SignedAmount, Txid, address::NetworkUn
use serde_json::{Value, json};
use std::{
fmt::Debug,
- io::Write as _,
- time::{Duration, Instant},
+ io::{IoSlice, Write as _},
+ time::Duration,
};
use tokio::{
- io::{self, AsyncBufReadExt as _, AsyncWriteExt as _, BufReader},
+ io::{self, AsyncReadExt, AsyncWriteExt as _},
net::TcpStream,
time::timeout,
};
+use tracing::trace;
use crate::config::{RpcAuth, RpcCfg, WalletCfg};
@@ -106,16 +107,74 @@ fn expect_null(result: Result<()>) -> Result<()> {
}
}
-/// Bitcoin RPC connection
-pub struct Rpc {
- last_call: Instant,
+pub struct JsonSocket {
path: String,
- id: u64,
cookie: String,
- conn: BufReader<TcpStream>,
+ sock: TcpStream,
buf: Vec<u8>,
}
+impl JsonSocket {
+ async fn call<T>(&mut self, body: &impl serde::Serialize) -> Result<T>
+ where
+ T: serde::de::DeserializeOwned,
+ {
+ let buf = &mut self.buf;
+ let sock = &mut self.sock;
+ buf.clear();
+ serde_json::to_writer(&mut *buf, body)?;
+ let body_len = buf.len();
+
+ // Write HTTP request
+ writeln!(buf, "POST {} HTTP/1.1\r", self.path)?;
+ // Write headers
+ writeln!(buf, "Accept: application/json-rpc\r")?;
+ writeln!(buf, "Authorization: {}\r", self.cookie)?;
+ writeln!(buf, "Content-Type: application/json-rpc\r")?;
+ writeln!(buf, "Content-Length: {}\r", body_len)?;
+ // Write separator
+ writeln!(buf, "\r")?;
+ let (body, head) = buf.split_at(body_len);
+ let mut vectors = [IoSlice::new(head), IoSlice::new(body)];
+ let mut vectors = vectors.as_mut_slice();
+ while !vectors.is_empty() {
+ let written = sock.write_vectored(&vectors).await?;
+ IoSlice::advance_slices(&mut vectors, written);
+ }
+ sock.flush().await?;
+
+ // Skip response
+
+ buf.clear();
+ let header_pos = loop {
+ let amount = sock.read_buf(buf).await?;
+ if amount == 0 {
+ return Err(Error::Transport(io::Error::new(
+ io::ErrorKind::UnexpectedEof,
+ "End of file reached unexpectedly",
+ )));
+ }
+ if let Some(header_pos) = buf
+ .windows(4)
+ .position(|w| *w == [b'\r', b'\n', b'\r', b'\n'])
+ {
+ if buf.ends_with(&[b'\n']) {
+ break header_pos;
+ }
+ }
+ };
+ // Read body
+ let response = serde_json::from_slice(&buf[header_pos + 4..])?;
+ Ok(response)
+ }
+}
+
+/// Bitcoin RPC connection
+pub struct Rpc {
+ socket: JsonSocket,
+ id: u64,
+}
+
impl Rpc {
/// Start a RPC connection
pub async fn common(cfg: &RpcCfg) -> io::Result<Self> {
@@ -141,64 +200,31 @@ impl Rpc {
};
// Open connection
let sock = timeout(Duration::from_secs(5), TcpStream::connect(&cfg.addr)).await??;
- let conn = BufReader::new(sock);
+ sock.set_nodelay(true).ok();
Ok(Self {
- last_call: Instant::now(),
- path,
id: 0,
- cookie: format!("Basic {}", BASE64_STANDARD.encode(&token)),
- conn,
- buf: Vec::new(),
+ socket: JsonSocket {
+ path,
+ cookie: format!("Basic {}", BASE64_STANDARD.encode(&token)),
+ sock,
+ buf: Vec::with_capacity(16 * 1024),
+ },
})
}
- async fn call<T>(&mut self, method: &str, params: &impl serde::Serialize) -> Result<T>
+ async fn call<T>(&mut self, method: &str, params: &(impl serde::Serialize + Debug)) -> Result<T>
where
T: serde::de::DeserializeOwned + Debug,
{
+ trace!("RPC > {method} {params:?}");
let request = RpcRequest {
method,
id: self.id,
params,
};
-
- // Serialize the body first so we can set the Content-Length header.
- let body = serde_json::to_vec(&request)?;
- let buf = &mut self.buf;
- buf.clear();
- // Write HTTP request
- {
- let sock = self.conn.get_mut();
- // Send HTTP request
- writeln!(buf, "POST {} HTTP/1.1\r", self.path)?;
- // Write headers
- writeln!(buf, "Accept: application/json-rpc\r")?;
- writeln!(buf, "Authorization: {}\r", self.cookie)?;
- writeln!(buf, "Content-Type: application/json-rpc\r")?;
- writeln!(buf, "Content-Length: {}\r", body.len())?;
- // Write separator
- writeln!(buf, "\r")?;
- sock.write_all(buf).await?;
- buf.clear();
- // Write body
- sock.write_all(&body).await?;
- sock.flush().await?;
- }
- // Skip response
- let sock = &mut self.conn;
- loop {
- let amount = sock.read_until(b'\n', buf).await?;
- let sep = buf[..amount] == [b'\r', b'\n'];
- buf.clear();
- if sep {
- break;
- }
- self.last_call = Instant::now();
- }
- // Read body
- let amount = sock.read_until(b'\n', buf).await?;
- let response: RpcResponse<T> = serde_json::from_slice(&buf[..amount])?;
+ let response: RpcResponse<T> = self.socket.call(&request).await?;
+ trace!("RPC < {response:?}");
match response {
RpcResponse::RpcResponse { result, error, id } => {
assert_eq!(self.id, id);
diff --git a/instrumentation/conf/bitcoin.conf b/instrumentation/conf/bitcoin.conf
@@ -2,7 +2,7 @@ regtest=1
txindex=1
maxtxfee=0.01
fallbackfee=0.00000001
-rpcservertimeout=15
+rpcservertimeout=0
dbcache=4
maxmempool=5
par=2
diff --git a/instrumentation/conf/bitcoin2.conf b/instrumentation/conf/bitcoin2.conf
@@ -1,13 +0,0 @@
-regtest=1
-txindex=1
-maxtxfee=0.01
-fallbackfee=0.00000001
-rpcservertimeout=0
-dbcache=4
-maxmempool=5
-par=2
-rpcthreads=5
-
-[regtest]
-port=8346
-rpcport=18346
-\ No newline at end of file
diff --git a/instrumentation/src/btc.rs b/instrumentation/src/btc.rs
@@ -150,7 +150,7 @@ impl BtcCtx {
btc_rpc_port,
);
Self::patch_btc_config(
- "instrumentation/conf/bitcoin2.conf",
+ "instrumentation/conf/bitcoin.conf",
ctx.wire2_dir.join("bitcoin.conf"),
btc2_port,
btc2_rpc_port,
diff --git a/instrumentation/src/utils.rs b/instrumentation/src/utils.rs
@@ -38,6 +38,8 @@ use indicatif::ProgressBar;
use ini::Ini;
use tempfile::TempDir;
+const LOG: &str = "INFO";
+
#[must_use]
pub async fn check_incoming(base_url: &str, txs: &[(EddsaPublicKey, Amount)]) -> bool {
let mut res = ureq::get(&format!("{base_url}history/incoming"))
@@ -84,15 +86,19 @@ pub async fn check_gateway_up(base_url: &str) -> bool {
}
pub async fn transfer(base_url: &str, wtid: &[u8; 32], credit_account: PaytoURI, amount: &Amount) {
- ureq::post(&format!("{base_url}transfer"))
- .send_json(TransferRequest {
+ loop {
+ let res = ureq::post(&format!("{base_url}transfer")).send_json(TransferRequest {
request_uid: Base32::rand(),
amount: amount.clone(),
exchange_base_url: Url::parse("https://exchange.test/").unwrap(),
wtid: Base32::from(*wtid),
- credit_account,
- })
- .unwrap();
+ credit_account: credit_account.clone(),
+ });
+ if !matches!(res, Err(ureq::Error::StatusCode(502))) {
+ res.unwrap();
+ break;
+ }
+ }
}
#[must_use]
@@ -171,7 +177,7 @@ macro_rules! retry_opt {
let start = std::time::Instant::now();
loop {
let result = $expr;
- if result.is_err() && start.elapsed() < std::time::Duration::from_secs(20) {
+ if result.is_err() && start.elapsed() < std::time::Duration::from_secs(30) {
tokio::time::sleep(std::time::Duration::from_millis(500)).await;
} else {
return result.unwrap();
@@ -335,20 +341,38 @@ impl TalerCtx {
// Run gateway
self.gateway = Some(cmd_redirect(
&self.wire_bin_path,
- &["-c", self.conf.to_string_lossy().as_ref(), "serve"],
+ &[
+ "-c",
+ self.conf.to_string_lossy().as_ref(),
+ "-L",
+ LOG,
+ "serve",
+ ],
self.log("gateway"),
));
// Start wires
self.wire = Some(cmd_redirect(
&self.wire_bin_path,
- &["-c", self.conf.to_string_lossy().as_ref(), "worker"],
+ &[
+ "-c",
+ self.conf.to_string_lossy().as_ref(),
+ "-L",
+ LOG,
+ "worker",
+ ],
self.log("worker"),
));
self.wire2 = self.stressed.then(|| {
cmd_redirect(
&self.wire_bin_path,
- &["-c", self.conf.to_string_lossy().as_ref(), "worker"],
+ &[
+ "-c",
+ self.conf.to_string_lossy().as_ref(),
+ "-L",
+ LOG,
+ "worker",
+ ],
self.log("worker+"),
)
});