commit ca6277fe082e3e025cc83d6e2366238c639874c8
parent e3ac7f84d0c8e134c67d93a401c271dd89f1ac6f
Author: Antoine A <>
Date: Mon, 27 Dec 2021 15:54:08 +0100
Improve btc_wire architecture to remove Active state
Diffstat:
8 files changed, 263 insertions(+), 270 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
@@ -237,6 +237,15 @@ dependencies = [
]
[[package]]
+name = "crc32fast"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "738c290dfaea84fc1ca15ad9c168d083b05a714e1efddd8edaab678dc28d2836"
+dependencies = [
+ "cfg-if",
+]
+
+[[package]]
name = "criterion"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -468,6 +477,18 @@ dependencies = [
]
[[package]]
+name = "flate2"
+version = "1.0.22"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1e6988e897c1c9c485f43b47a529cef42fde0547f9d8d41a7062518f1d8fc53f"
+dependencies = [
+ "cfg-if",
+ "crc32fast",
+ "libc",
+ "miniz_oxide 0.4.4",
+]
+
+[[package]]
name = "flexi_logger"
version = "0.20.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ -841,6 +862,16 @@ dependencies = [
[[package]]
name = "miniz_oxide"
+version = "0.4.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a92518e98c078586bc6c934028adcca4c92a53d6a958196de835170a01d84e4b"
+dependencies = [
+ "adler",
+ "autocfg",
+]
+
+[[package]]
+name = "miniz_oxide"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2b29bd4bc3f33391105ebee3589c19197c4271e3e5a9ec9bfe8127eeff8f082"
@@ -922,9 +953,9 @@ dependencies = [
[[package]]
name = "owo-colors"
-version = "3.1.1"
+version = "3.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8d3b1ca05e7e4171727a5dab03790a344f248eaad925dce8ba0014fd78392b88"
+checksum = "20448fd678ec04e6ea15bbe0476874af65e98a01515d667aa49f1434dc44ebf4"
[[package]]
name = "parking_lot"
@@ -1066,9 +1097,9 @@ checksum = "ed0cfbc8191465bed66e1718596ee0b0b35d5ee1f41c5df2189d0fe8bde535ba"
[[package]]
name = "proc-macro2"
-version = "1.0.34"
+version = "1.0.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2f84e92c0f7c9d58328b85a78557813e4bd845130db68d7184635344399423b1"
+checksum = "392a54546fda6b7cc663379d0e6ce8b324cf88aecc5a499838e1be9781bdce2e"
dependencies = [
"unicode-xid",
]
@@ -1465,9 +1496,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
[[package]]
name = "syn"
-version = "1.0.82"
+version = "1.0.84"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8daf5dd0bb60cbd4137b1b587d2fc0ae729bc07cf01cd70b36a1ed5ade3b9d59"
+checksum = "ecb2e6da8ee5eb9a61068762a32fa9619cc591ceb055b3687f4cd4051ec2e06b"
dependencies = [
"proc-macro2",
"quote",
@@ -1672,9 +1703,9 @@ checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]]
name = "typenum"
-version = "1.14.0"
+version = "1.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b63708a265f51345575b27fe43f9500ad611579e764c79edbc2037b1121959ec"
+checksum = "dcf81ac59edc17cc8697ff311e8f5ef2d99fcbd9817b34cec66f90b6c3dfd987"
[[package]]
name = "unicode-bidi"
@@ -1717,12 +1748,13 @@ checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
name = "ureq"
-version = "2.3.1"
+version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c5c448dcb78ec38c7d59ec61f87f70a98ea19171e06c139357e012ee226fec90"
+checksum = "9399fa2f927a3d327187cbd201480cee55bee6ac5d3c77dd27f0c6814cff16d5"
dependencies = [
"base64",
"chunked_transfer",
+ "flate2",
"log",
"once_cell",
"rustls",
@@ -1921,7 +1953,7 @@ dependencies = [
"deadpool-postgres",
"hyper",
"listenfd",
- "miniz_oxide",
+ "miniz_oxide 0.5.1",
"rand",
"serde",
"serde_json",
diff --git a/btc-wire/src/bin/test.rs b/btc-wire/src/bin/test.rs
@@ -297,7 +297,7 @@ pub fn main() {
fn tx_exist(rpc: &BtcRpc, id: &Txid, min_confirmation: i32, detail: Category) -> rpc::Result<bool> {
let result = rpc.list_since_block(None, 1, false).unwrap();
let found = result.transactions.into_iter().any(|tx| {
- tx.detail.category == detail && tx.confirmations >= min_confirmation && tx.txid == *id
+ tx.category == detail && tx.confirmations >= min_confirmation && tx.txid == *id
});
Ok(found)
}
diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs
@@ -3,7 +3,7 @@ use btc_wire::{
rpc::{BtcRpc, Category},
rpc_utils::{default_data_dir, sender_address},
segwit::DecodeSegWitErr,
- BounceErr, GetOpReturnErr, GetSegwitErr,
+ GetOpReturnErr, GetSegwitErr,
};
use postgres::{fallible_iterator::FallibleIterator, Client, NoTls};
use rand::{rngs::OsRng, RngCore};
@@ -27,14 +27,12 @@ mod fail_point;
enum Status {
/// Client have ask for a transaction
Proposed = 0,
- /// The wire is working has started handling this transaction
- Active = 1,
/// Transaction have been announced to the bitcoin network
- Pending = 2,
+ Pending = 1,
/// The wire failed to send this transaction and will try later
- Delayed = 3,
+ Delayed = 2,
/// Transaction have been mined and confirmed
- Confirmed = 4,
+ Confirmed = 3,
}
impl TryFrom<u8> for Status {
@@ -46,7 +44,6 @@ impl TryFrom<u8> for Status {
x if x == Status::Pending as u8 => Ok(Status::Pending),
x if x == Status::Confirmed as u8 => Ok(Status::Confirmed),
x if x == Status::Delayed as u8 => Ok(Status::Delayed),
- x if x == Status::Active as u8 => Ok(Status::Active),
_ => Err(()),
}
}
@@ -157,12 +154,12 @@ impl AutoReconnectRPC {
Ok(new) => match new.net_info() {
Ok(_) => return new,
Err(err) => {
- error!("connect: DB - {}", err);
+ error!("connect: RPC - {}", err);
std::thread::sleep(delay);
- },
+ }
},
Err(err) => {
- error!("connect: DB - {}", err);
+ error!("connect:RPC - {}", err);
std::thread::sleep(delay);
}
}
@@ -221,7 +218,7 @@ fn last_hash(db: &mut Client) -> Result<Option<BlockHash>, postgres::Error> {
}
/// Listen for new proposed transactions and announce them on the bitcoin network
-fn sender(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, _config: &Config) {
+fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, config: &Config) {
// List all transactions with the given status
fn list_status(db: &mut Client, status: Status) -> Result<Vec<i32>, postgres::Error> {
let mut iter =
@@ -233,32 +230,19 @@ fn sender(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, _config: &Config)
return Ok(out);
}
- // List all transactions waiting to be sent
- fn list_active(db: &mut Client) -> Result<Vec<(i32, [u8; 32])>, postgres::Error> {
- let mut iter = db.query_raw(
- "SELECT id, wtid FROM tx_out WHERE status=$1",
- &[&(Status::Active as i16)],
- )?;
- let mut out = Vec::new();
- while let Some(row) = iter.next()? {
- let wtid: &[u8] = row.get(1);
- out.push((row.get(0), wtid.try_into().unwrap()));
- }
- return Ok(out);
- }
-
// Perform a transaction on the blockchain
- // The transaction must be in the manual state
fn perform_send(
db: &mut Client,
rpc: &BtcRpc,
id: i32,
+ status: Status,
) -> Result<(), Box<dyn std::error::Error>> {
+ assert!(status == Status::Delayed || status == Status::Proposed);
let mut tx = db.transaction()?;
// We lock the row with FOR UPDATE to prevent sending same transaction multiple time
let iter = tx.query_opt(
"SELECT amount, wtid, credit_acc, exchange_url FROM tx_out WHERE id=$1 AND status=$2 FOR UPDATE",
- &[&id, &(Status::Active as i16)],
+ &[&id, &(status as i16)],
)?;
if let Some(row) = iter {
let amount = taler_amount_to_btc_amount(&Amount::from_str(row.get(0))?)?;
@@ -313,66 +297,23 @@ fn sender(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, _config: &Config)
let mut iter = ntf.iter();
while let Some(_) = iter.next()? {}
}
- // Try to recover transactions stuck in active
- let mut actives = list_active(db)?;
- if !actives.is_empty() {
- let last_hash = last_hash(db)?;
- let txs = rpc.list_since_block(last_hash.as_ref(), 1, false)?;
- // Search for a matching unconfirmed transactions
- for tx in txs.transactions {
- if tx.detail.category == Category::Send {
- if let Ok((_, bytes)) = rpc.get_tx_op_return(&tx.txid) {
- let (wtid, _) = decode_info(&bytes);
- if let Some(pos) = actives.iter().position(|(_, it)| it == &wtid) {
- let (id, wtid) = actives.swap_remove(pos);
- let nb = db.execute(
- "UPDATE tx_out SET status=$1 WHERE wtid=$2 AND status=$3 RETURNING id",
- &[
- &(Status::Pending as i16),
- &wtid.as_ref(),
- &(Status::Active as i16),
- ],
- )?;
- if nb > 0 {
- warn!("sender: tx {} have been recovered automatically", id);
- }
- }
- }
- }
- }
- // If nothing match, retry to send
- for (id, _) in actives {
- perform_send(db, &rpc, id)?;
- }
- }
+ // Sync chain
+ sync_chain(rpc, db, config)?;
+
+ // As we are now in sync with the blockchain if a transaction is in proposed or delayed state it have not been sent
+
// Send delayed transactions
for id in list_status(db, Status::Delayed)? {
- // Set status to Active to detect database error preventing atomicity
- let nb = db.execute(
- "UPDATE tx_out SET status=$1 WHERE id=$2 AND status=$3",
- &[&(Status::Active as i16), &id, &(Status::Delayed as i16)],
- )?;
- if nb == 0 {
- warn!("sender: transaction status collision, database have been altered by another process");
- }
- perform_send(db, &rpc, id)?;
+ perform_send(db, &rpc, id, Status::Delayed)?;
}
// Send proposed transactions
for id in list_status(db, Status::Proposed)? {
- // Set status to Active to detect database error preventing atomicity
- let nb = db.execute(
- "UPDATE tx_out SET status=$1 WHERE id=$2 AND status=$3",
- &[&(Status::Active as i16), &id, &(Status::Proposed as i16)],
- )?;
- if nb == 0 {
- warn!("sender: transaction status collision, database have been altered by another process");
- }
- perform_send(db, &rpc, id)?;
+ perform_send(db, &rpc, id, Status::Proposed)?;
}
Ok(())
})();
if let Err(e) = result {
- error!("sender: DB - {}", e);
+ error!("worker: DB - {}", e);
failed = true;
} else {
failed = false;
@@ -380,171 +321,178 @@ fn sender(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, _config: &Config)
}
}
-/// Listen for mined block and index confirmed transactions into the database
-fn watcher(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, config: &Config) {
- let mut init = false;
- loop {
- let rpc = rpc.client();
- let db = db.client();
+/// Parse new transactions, if exit whiteout failing the database is up to date with the latest mined block
+fn sync_chain(
+ rpc: &mut BtcRpc,
+ db: &mut Client,
+ config: &Config,
+) -> Result<(), Box<dyn std::error::Error>> {
+ // Get stored last_hash
+ let last_hash = last_hash(db)?;
+ let confirmation = config.confirmation;
+
+ // Get a set of transactions ids to parse
+ let (txs, lastblock): (HashMap<Txid, Category>, BlockHash) = {
+ // Get all transactions made since this block
+ let list = rpc.list_since_block(last_hash.as_ref(), confirmation, true)?;
+ // Only keep ids and category
+ let txs: HashMap<Txid, Category> = list
+ .transactions
+ .into_iter()
+ .map(|tx| (tx.txid, tx.category))
+ .collect();
+ (txs, list.lastblock)
+ };
- let result: Result<(), Box<dyn std::error::Error>> = (|| {
- // Get stored last_hash
- let last_hash = last_hash(db)?;
- // Get all transactions made since this block
- let list = rpc.list_since_block(last_hash.as_ref(), config.confirmation, true)?;
- // Keep only confirmed send and receive transactions
- let txs: HashMap<Txid, Category> = list
- .transactions
- .into_iter()
- .filter_map(|tx| {
- let cat = tx.detail.category;
- (tx.confirmations >= config.confirmation as i32
- && (cat == Category::Send || cat == Category::Receive))
- .then(|| (tx.txid, cat))
- })
- .collect();
-
- for (id, category) in txs {
- match category {
- Category::Send => match rpc.get_tx_op_return(&id) {
- Ok((full, bytes)) => {
- let (wtid, url) = decode_info(&bytes);
- let mut tx = db.transaction()?;
- let row = tx.query_opt(
- "SELECT status, id FROM tx_out WHERE wtid=$1 FOR UPDATE",
- &[&wtid.as_ref()],
- )?;
- if let Some(row) = row {
- let status: i16 = row.get(0);
- let _id: i32 = row.get(1);
- let status: Status = Status::try_from(status as u8).unwrap();
- if status != Status::Confirmed {
+ for (id, category) in txs {
+ match category {
+ Category::Send => {
+ match rpc.get_tx_op_return(&id) {
+ Ok((full, bytes)) => {
+ let (wtid, url) = decode_info(&bytes);
+ let expected_status = if full.confirmations >= confirmation as i32 {
+ Status::Confirmed
+ } else {
+ Status::Pending
+ };
+ let mut tx = db.transaction()?;
+ let row = tx.query_opt(
+ "SELECT status, id FROM tx_out WHERE wtid=$1 FOR UPDATE",
+ &[&wtid.as_ref()],
+ )?;
+ if let Some(row) = row {
+ let status: i16 = row.get(0);
+ let _id: i32 = row.get(1);
+ let status: Status = Status::try_from(status as u8).unwrap();
+ if status != Status::Confirmed {
+ if status != expected_status {
tx.execute(
"UPDATE tx_out SET status=$1 where id=$2",
- &[&(Status::Confirmed as i16), &_id],
+ &[&(expected_status as i16), &_id],
)?;
- if status == Status::Proposed {
- warn!("watcher: tx {} is present on chain at {} while being in proposed status", _id, id);
- } else if status == Status::Active {
+ if status == Status::Delayed || status == Status::Proposed {
warn!(
"watcher: tx {} have been recovered automatically",
_id
);
- } else {
+ } else if expected_status == Status::Confirmed {
info!("{} CONFIRMED", &id);
}
}
- } else {
- let debit_addr = sender_address(&rpc, &full)?;
- let credit_addr = full.details[0].address.as_ref().unwrap();
- let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time);
- let amount = btc_amount_to_taler_amount(&full.amount);
- // Generate a random request_uid
- let mut request_uid = [0; 64];
- OsRng.fill_bytes(&mut request_uid);
- let nb = tx.execute(
- "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING",
- &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref(), &config.base_url.as_ref(), &(Status::Confirmed as i16), &id.as_ref(), &request_uid.as_ref()
- ],
- )?;
- if nb > 0 {
- warn!("watcher: found an unregistered outgoing address {} {} in tx {}", crockford_base32_encode(&wtid), &url, id);
- }
}
- tx.commit()?;
- }
- Err(err) => match err {
- GetOpReturnErr::MissingOpReturn => {} // ignore
- err => warn!("send: {} {}", id, err),
- },
- },
- Category::Receive => match rpc.get_tx_segwit_key(&id) {
- Ok((full, reserve_pub)) => {
+ } else {
let debit_addr = sender_address(&rpc, &full)?;
let credit_addr = full.details[0].address.as_ref().unwrap();
let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time);
let amount = btc_amount_to_taler_amount(&full.amount);
- let nb = db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (reserve_pub) DO NOTHING ", &[
- &date, &amount.to_string(), &reserve_pub.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref()
- ])?;
+ // Generate a random request_uid
+ let mut request_uid = [0; 64];
+ OsRng.fill_bytes(&mut request_uid);
+ let nb = tx.execute(
+ "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING",
+ &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref(), &config.base_url.as_ref(), &(expected_status as i16), &id.as_ref(), &request_uid.as_ref()
+ ],
+ )?;
if nb > 0 {
- info!("{} << {} {}", &debit_addr, &credit_addr, &amount);
+ warn!("watcher: found an unregistered outgoing address {} {} in tx {}", crockford_base32_encode(&wtid), &url, id);
}
}
- Err(err) => match err {
- GetSegwitErr::Decode(
- DecodeSegWitErr::MissingSegWitAddress
- | DecodeSegWitErr::NoMagicIdMatch,
- ) => {
- // Skip debounce while getting the database in sync to prevent double spent
- if !init && false {
- let nb = db.execute("INSERT INTO bounce (bounced) VALUES ($1) ON CONFLICT (bounced) DO NOTHING", &[&id.as_ref()])?;
- // Check if already bounced
- if nb > 0 && false {
- // We do not handle failures, bouncing is done in a best effort manner
- match rpc
- .bounce(&id, &BtcAmount::from_sat(config.bounce_fee))
- {
- Ok(it) => {
- info!("bounce {} in {}", &id, &it);
- db.execute(
- "UPDATE bounce SET txid = $1 WHERE bounced = $2",
- &[&it.as_ref(), &id.as_ref()],
- )?;
- }
- Err(err) => match err {
- BounceErr::AmountLessThanFee => { /* Ignore */ }
- BounceErr::NotAReceiveTransaction
- | BounceErr::RPC(_) => Err(err)?,
- },
- }
- }
- }
- }
- err => warn!("receive: {} {}", id, err),
- },
+ tx.commit()?;
+ }
+ Err(err) => match err {
+ GetOpReturnErr::MissingOpReturn => {} // ignore
+ err => warn!("send: {} {}", id, err),
},
- Category::Generate | Category::Immature | Category::Orphan => {}
}
}
- // Move last_hash forward if no error have been caught
- {
- let mut tx = db.transaction()?;
- let curr_hash: Option<BlockHash> = tx
- .query_opt(
- "SELECT value FROM state WHERE name='last_hash' FOR UPDATE",
- &[],
- )?
- .map(|r| BlockHash::from_slice(r.get(0)).unwrap());
-
- if last_hash != curr_hash {
- error!("watcher: hash state collision, database have been altered by another process");
- tx.execute("DELETE FROM state WHERE name = 'last_hash'", &[])?;
- } else if curr_hash.is_some() {
- tx.execute(
- "UPDATE state SET value=$1 WHERE name='last_hash'",
- &[&list.lastblock.as_ref()],
- )?;
- } else {
- tx.execute(
- "INSERT INTO state (name, value) VALUES ('last_hash', $1)",
- &[&list.lastblock.as_ref()],
- )?;
- };
- tx.commit()?;
+ Category::Receive => match rpc.get_tx_segwit_key(&id) {
+ Ok((full, reserve_pub)) => {
+ let debit_addr = sender_address(&rpc, &full)?;
+ let credit_addr = full.details[0].address.as_ref().unwrap();
+ let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time);
+ let amount = btc_amount_to_taler_amount(&full.amount);
+ let nb = db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (reserve_pub) DO NOTHING ", &[
+ &date, &amount.to_string(), &reserve_pub.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref()
+ ])?;
+ if nb > 0 {
+ info!("{} << {} {}", &debit_addr, &credit_addr, &amount);
+ }
+ }
+ Err(err) => match err {
+ GetSegwitErr::Decode(
+ DecodeSegWitErr::MissingSegWitAddress | DecodeSegWitErr::NoMagicIdMatch,
+ ) => {
+ db.execute("INSERT INTO bounce (bounced) VALUES ($1) ON CONFLICT (bounced) DO NOTHING", &[&id.as_ref()])?;
+ // Check if already bounced
+ /*if nb > 0 && false {
+ // We do not handle failures, bouncing is done in a best effort manner
+ match rpc.bounce(&id, &BtcAmount::from_sat(config.bounce_fee)) {
+ Ok(it) => {
+ info!("bounce {} in {}", &id, &it);
+ db.execute(
+ "UPDATE bounce SET txid = $1 WHERE bounced = $2",
+ &[&it.as_ref(), &id.as_ref()],
+ )?;
+ }
+ Err(err) => match err {
+ BounceErr::AmountLessThanFee => { /* Ignore */ }
+ BounceErr::NotAReceiveTransaction | BounceErr::RPC(_) => {
+ Err(err)?
+ }
+ },
+ }
+ }*/
+ }
+ err => warn!("receive: {} {}", id, err),
+ },
+ },
+ Category::Generate | Category::Immature | Category::Orphan => {
+ // Ignore coinbase transactions
}
- // Notify a new block have been scanned
+ }
+ }
+ // Move last_hash forward if no error have been caught
+ {
+ let mut tx = db.transaction()?;
+ let curr_hash: Option<BlockHash> = tx
+ .query_opt(
+ "SELECT value FROM state WHERE name='last_hash' FOR UPDATE",
+ &[],
+ )?
+ .map(|r| BlockHash::from_slice(r.get(0)).unwrap());
+
+ if last_hash != curr_hash {
+ error!("watcher: hash state collision, database have been altered by another process");
+ }
+
+ if curr_hash.is_some() {
+ tx.execute(
+ "UPDATE state SET value=$1 WHERE name='last_hash'",
+ &[&lastblock.as_ref()],
+ )?;
+ } else {
+ tx.execute(
+ "INSERT INTO state (name, value) VALUES ('last_hash', $1)",
+ &[&lastblock.as_ref()],
+ )?;
+ };
+ tx.commit()?;
+ }
+ Ok(())
+}
+
+fn block_listener(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql) {
+ loop {
+ let result: Result<(), Box<dyn std::error::Error>> = (|| {
+ let rpc = rpc.client();
+ let db = db.client();
+ rpc.wait_for_new_block(0).ok();
db.execute("NOTIFY new_block", &[])?;
Ok(())
})();
if let Err(e) = result {
- error!("watcher: DB - {}", e);
- } else {
- init = true;
+ error!("listener: DB - {}", e);
}
-
- info!("watcher: Wait for block");
- rpc.wait_for_new_block(0).ok();
}
}
@@ -572,22 +520,22 @@ fn main() {
};
let rpc = BtcRpc::common(&data_dir, network).unwrap();
rpc.load_wallet(&config.btc_wallet).ok();
- let rpc_watcher = AutoReconnectRPC::new(
+ let rpc_listener = AutoReconnectRPC::new(
&data_dir,
network,
&config.btc_wallet,
Duration::from_secs(5),
);
- let rpc_sender = AutoReconnectRPC::new(
+ let rpc_worker = AutoReconnectRPC::new(
&data_dir,
network,
&config.btc_wallet,
Duration::from_secs(5),
);
- let db_watcher = AutoReconnectSql::new(&config.db_url, Duration::from_secs(5));
- let db_sender = AutoReconnectSql::new(&config.db_url, Duration::from_secs(5));
- let join = std::thread::spawn(move || sender(rpc_sender, db_sender, config));
- watcher(rpc_watcher, db_watcher, config);
+ let db_listener = AutoReconnectSql::new(&config.db_url, Duration::from_secs(5));
+ let db_worker = AutoReconnectSql::new(&config.db_url, Duration::from_secs(5));
+ let join = std::thread::spawn(move || block_listener(rpc_listener, db_listener));
+ worker(rpc_worker, db_worker, config);
join.join().unwrap();
}
diff --git a/btc-wire/src/rpc.rs b/btc-wire/src/rpc.rs
@@ -67,12 +67,7 @@ impl BtcRpc {
fn new(data_dir: &Path, path: String, network: Network) -> io::Result<Self> {
let cookie_path = data_dir.join(rpc_dir(network)).join(".cookie");
let cookie = std::fs::read(cookie_path)?;
- let agent = ureq::builder()
- .redirects(0)
- .timeout_connect(Duration::from_secs(5))
- .timeout_write(Duration::from_secs(5))
- .timeout_read(Duration::from_secs(5))
- .build();
+ let agent = ureq::builder().redirects(0).build();
Ok(Self {
path,
agent,
@@ -81,7 +76,12 @@ impl BtcRpc {
})
}
- fn call<T>(&self, method: &str, params: &impl serde::Serialize) -> Result<T>
+ fn call<T>(
+ &self,
+ method: &str,
+ params: &impl serde::Serialize,
+ timeout: Option<Duration>,
+ ) -> Result<T>
where
T: serde::de::DeserializeOwned + Debug,
{
@@ -95,7 +95,7 @@ impl BtcRpc {
.set("Authorization", &self.cookie)
.set("Content-Type", "application/json-rpc")
.set("Accept", "application/json-rpc")
- .timeout(Duration::from_secs(10))
+ .timeout(timeout.unwrap_or(Duration::from_secs(10)))
.send_bytes(&body);
let response = match result {
Ok(it) => it,
@@ -119,37 +119,41 @@ impl BtcRpc {
pub fn net_info(&self) -> Result<Empty> {
let params: [(); 0] = [];
- self.call("getnetworkinfo", ¶ms)
+ self.call("getnetworkinfo", ¶ms, None)
}
pub fn load_wallet(&self, name: &str) -> Result<Wallet> {
- self.call("loadwallet", &[name])
+ self.call("loadwallet", &[name], None)
}
pub fn create_wallet(&self, name: &str) -> Result<Wallet> {
- self.call("createwallet", &[name])
+ self.call("createwallet", &[name], None)
}
pub fn get_new_address(&self) -> Result<Address> {
- self.call("getnewaddress", &[()])
+ self.call("getnewaddress", &[()], None)
}
pub fn generate(&self, nb: u16, address: &Address) -> Result<Vec<BlockHash>> {
- self.call("generatetoaddress", &(nb, address))
+ self.call(
+ "generatetoaddress",
+ &(nb, address),
+ Some(Duration::from_secs(10 * 60 + 10)),
+ )
}
pub fn wait_for_new_block(&self, timeout: u64) -> Result<Empty> {
- self.call("waitfornewblock", &[timeout])
+ self.call("waitfornewblock", &[timeout], None)
}
pub fn get_balance(&self) -> Result<Amount> {
- let btc: f64 = self.call("getbalance", &[()])?;
+ let btc: f64 = self.call("getbalance", &[()], None)?;
Ok(Amount::from_btc(btc).unwrap())
}
pub fn send(&self, address: &Address, amount: &Amount, subtract_fee: bool) -> Result<Txid> {
let btc = amount.as_btc();
- self.call("sendtoaddress", &(address, btc, (), (), subtract_fee))
+ self.call("sendtoaddress", &(address, btc, (), (), subtract_fee), None)
}
/// Send transaction to multiple recipients
@@ -163,7 +167,7 @@ impl BtcRpc {
.map(|(addr, amount)| (addr.to_string(), amount.as_btc().into()))
.collect(),
);
- self.call("sendmany", &("", amounts))
+ self.call("sendmany", &("", amounts), None)
}
pub fn send_custom<'a, 'b, 'c>(
@@ -193,10 +197,11 @@ impl BtcRpc {
vec
}),
],
+ None,
)?;
- let funded: HexWrapper = self.call("fundrawtransaction", &[hex])?;
- let signed: HexWrapper = self.call("signrawtransactionwithwallet", &[&funded.hex])?;
- self.call("sendrawtransaction", &[&signed.hex])
+ let funded: HexWrapper = self.call("fundrawtransaction", &[hex], None)?;
+ let signed: HexWrapper = self.call("signrawtransactionwithwallet", &[&funded.hex], None)?;
+ self.call("sendrawtransaction", &[&signed.hex], None)
}
pub fn list_since_block(
@@ -205,15 +210,19 @@ impl BtcRpc {
confirmation: u8,
include_remove: bool,
) -> Result<ListSinceBlock> {
- self.call("listsinceblock", &(hash, confirmation, (), include_remove))
+ self.call(
+ "listsinceblock",
+ &(hash, confirmation, (), include_remove),
+ None,
+ )
}
pub fn get_tx(&self, id: &Txid) -> Result<TransactionFull> {
- self.call("gettransaction", &(id, (), true))
+ self.call("gettransaction", &(id, (), true), None)
}
pub fn get_raw(&self, id: &Txid) -> Result<RawTransaction> {
- self.call("getrawtransaction", &(id, true))
+ self.call("getrawtransaction", &(id, true), None)
}
}
@@ -276,9 +285,7 @@ pub struct TransactionDetail {
pub struct ListTransaction {
pub confirmations: i32,
pub txid: Txid,
- pub time: u64,
- #[serde(flatten)]
- pub detail: TransactionDetail,
+ pub category: Category,
}
#[derive(Debug, serde::Deserialize)]
diff --git a/makefile b/makefile
@@ -6,5 +6,5 @@ test:
script/test_gateway.sh
script/test_btc_wire.sh
script/test_recover_db.sh
- script/test_btc_stress.sh
- script/test_btc_fail.sh
-\ No newline at end of file
+ script/test_btc_fail.sh
+ script/test_btc_stress.sh
+\ No newline at end of file
diff --git a/script/setup.sh b/script/setup.sh
@@ -19,14 +19,14 @@ function reset_db() {
function init_btc() {
BTC_DIR=$(mktemp -d)
BTC_CLI="bitcoin-cli -regtest -datadir=$BTC_DIR"
- bitcoind -datadir=$BTC_DIR -txindex -regtest -fallbackfee=0.00000001 &> btc.log &
+ bitcoind -datadir=$BTC_DIR -txindex -regtest -fallbackfee=0.00000001 -rpcworkqueue=64 &> btc.log &
$BTC_CLI -rpcwait getnetworkinfo > /dev/null
}
-# Start a bitcoind regest server in a previously created temprorary directory and load wallets
+# Start a bitcoind regest server in a previously created temporary directory and load wallets
function restart_btc() {
BTC_CLI="bitcoin-cli -regtest -datadir=$BTC_DIR"
- bitcoind -datadir=$BTC_DIR -txindex -regtest -fallbackfee=0.00000001 &>> btc.log &
+ bitcoind -datadir=$BTC_DIR -txindex -regtest -fallbackfee=0.00000001 -rpcworkqueue=64 &>> btc.log &
$BTC_CLI -rpcwait getnetworkinfo > /dev/null
for wallet in wire client reserve; do
$BTC_CLI loadwallet $wallet > /dev/null
diff --git a/script/test_btc_fail.sh b/script/test_btc_fail.sh
@@ -65,9 +65,12 @@ for n in `$SEQ`; do
-b $BANK_ENDPOINT \
-C payto://bitcoin/$CLIENT \
-a BTC:0.0000$n > /dev/null
+ mine_btc
done
sleep 15
mine_btc # Mine transactions
+sleep 15
+mine_btc # Mine transactions
echo " OK"
echo -n "Requesting exchange outgoing transaction list:"
diff --git a/script/test_btc_stress.sh b/script/test_btc_stress.sh
@@ -33,7 +33,7 @@ echo "Start gateway"
gateway
echo ""
-SEQ="seq 10 99"
+SEQ="seq 10 50"
function check() {
check_delta "$1?delta=-100" "$SEQ"
@@ -48,7 +48,7 @@ for n in `$SEQ`; do
done
sleep 3 # Give time for btc_wire watcher to process
next_btc # Confirm all transactions
-sleep 5 # Give time for btc_wire watcher to process
+sleep 3 # Give time for btc_wire watcher to process
echo " OK"
echo -n "Requesting exchange incoming transaction list:"
@@ -56,7 +56,7 @@ check incoming
echo " OK"
echo -n "Check balance:"
-check_balance 9.99438310 1.00490500
+check_balance 9.99844569 1.00123000
echo " OK"
echo "----- Handle outgoing -----"
@@ -71,6 +71,8 @@ for n in `$SEQ`; do
done
sleep 10 # Give time for btc_wire sender to process
next_btc # Mine transactions
+sleep 10 # Give time for btc_wire sender to process
+next_btc # Mine transactions
echo " OK"
echo -n "Requesting exchange outgoing transaction list:"
@@ -78,9 +80,10 @@ check outgoing
echo " OK"
echo -n "Check balance:"
-check_balance 9.99928810
+check_balance 9.99967569
echo " OK"
+next_btc # Mine transactions
echo "----- Recover DB -----"
echo "Reset database"
@@ -98,7 +101,7 @@ echo " OK"
echo -n "Check balance:"
# Balance should not have changed
-check_balance 9.99928810
+check_balance 9.99967569
echo " OK"
echo "All tests passed"
\ No newline at end of file