commit 976967b98ec4120382e981f0d029cca30d68c908
parent 68ccf4c10b0bd54d5c368187c16fd96e2053e714
Author: Antoine A <>
Date: Tue, 28 Dec 2021 15:43:44 +0100
btc-wire: simpler design with less diferent status and preparation for bouncing
Diffstat:
7 files changed, 382 insertions(+), 276 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
@@ -1054,9 +1054,9 @@ checksum = "ed0cfbc8191465bed66e1718596ee0b0b35d5ee1f41c5df2189d0fe8bde535ba"
[[package]]
name = "proc-macro2"
-version = "1.0.35"
+version = "1.0.36"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "392a54546fda6b7cc663379d0e6ce8b324cf88aecc5a499838e1be9781bdce2e"
+checksum = "c7342d5883fbccae1cc37a2353b09c87c9b0f3afd73f5fb9bba687a1f733b029"
dependencies = [
"unicode-xid",
]
@@ -1085,9 +1085,9 @@ dependencies = [
[[package]]
name = "quote"
-version = "1.0.10"
+version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "38bc8cc6a5f2e3655e0899c1b848643b2562f853f114bfec7be120678e3ace05"
+checksum = "47aa80447ce4daf1717500037052af176af5d38cc3e571d9ec1c7353fc10c87d"
dependencies = [
"proc-macro2",
]
diff --git a/btc-wire/src/bin/btc-wire-cli.rs b/btc-wire/src/bin/btc-wire-cli.rs
@@ -69,7 +69,7 @@ impl App {
Self { config, client }
}
- pub fn auto_wallet(&self, name: &str) -> (BtcRpc, Address) {
+ pub fn auto_wallet(&mut self, name: &str) -> (BtcRpc, Address) {
// Auto load
if let Err(err) = self.client.load_wallet(name) {
match err {
@@ -77,14 +77,14 @@ impl App {
e => Err(e).unwrap(),
}
}
- let wallet = BtcRpc::wallet(&self.config, name).unwrap();
+ let mut wallet = BtcRpc::wallet(&self.config, name).unwrap();
let addr = wallet
.get_new_address()
.expect(&format!("Failed to get wallet address {}", name));
(wallet, addr)
}
- pub fn next_block(&self, wallet: &str) {
+ pub fn next_block(&mut self, wallet: &str) {
match self.config.network {
Network::Regtest => {
// Manually mine a block
@@ -108,15 +108,15 @@ fn main() {
to,
amount,
}) => {
- let app = App::start(args.datadir);
- let (client, _) = app.auto_wallet(&from);
+ let mut app = App::start(args.datadir);
+ let (mut client, _) = app.auto_wallet(&from);
let (_, to) = app.auto_wallet(&to);
client
.send_segwit_key(&to, &Amount::from_btc(amount).unwrap(), &rand_key())
.unwrap();
}
Cmd::NextBlock(NextBlockCmd { to }) => {
- let app = App::start(args.datadir);
+ let mut app = App::start(args.datadir);
app.next_block(&to);
}
}
diff --git a/btc-wire/src/info.rs b/btc-wire/src/info.rs
@@ -0,0 +1,110 @@
+use bitcoin::{hashes::Hash, Txid};
+use url::Url;
+
+#[derive(Debug, Clone, Copy, thiserror::Error)]
+pub enum DecodeErr {
+ #[error("Unknown first byte: {0}")]
+ UnknownFirstByte(u8),
+ #[error(transparent)]
+ UriPack(#[from] uri_pack::DecodeErr),
+ #[error(transparent)]
+ Hash(#[from] bitcoin::hashes::Error),
+ #[error("Unexpected end of file")]
+ UnexpectedEOF,
+}
+
+/// Encoded metadata for outgoing transaction
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub enum Info {
+ Transaction { wtid: [u8; 32], url: Url },
+ Bounce { id: Txid },
+}
+
+// We leave a potential special meaning for u8::MAX
+const BOUNCE_BYTE: u8 = u8::MAX - 1;
+
+pub fn encode_info(info: &Info) -> Vec<u8> {
+ let mut buffer = Vec::new();
+ match info {
+ Info::Transaction { wtid, url } => {
+ buffer.push(if url.scheme() == "http" { 1 } else { 0 });
+ buffer.extend_from_slice(wtid);
+ let parts = format!("{}{}", url.domain().unwrap_or(""), url.path());
+ let packed = uri_pack::pack_uri(&parts).unwrap();
+ buffer.extend_from_slice(&packed);
+ return buffer;
+ }
+ Info::Bounce { id } => {
+ buffer.push(BOUNCE_BYTE);
+ buffer.extend_from_slice(id.as_ref());
+ }
+ }
+ return buffer;
+}
+
+pub fn decode_info(bytes: &[u8]) -> Result<Info, DecodeErr> {
+ if bytes.is_empty() {
+ return Err(DecodeErr::UnexpectedEOF);
+ }
+ match bytes[0] {
+ 0..=1 => {
+ if bytes.len() < 33 {
+ return Err(DecodeErr::UnexpectedEOF);
+ }
+ let packed = format!(
+ "http{}://{}",
+ if bytes[0] == 0 { "s" } else { "" },
+ uri_pack::unpack_uri(&bytes[33..])?,
+ );
+ let url = Url::parse(&packed).unwrap();
+ Ok(Info::Transaction {
+ wtid: bytes[1..33].try_into().unwrap(),
+ url,
+ })
+ }
+ BOUNCE_BYTE => Ok(Info::Bounce {
+ id: Txid::from_slice(&bytes[1..])?,
+ }),
+ unknown => Err(DecodeErr::UnknownFirstByte(unknown)),
+ }
+}
+
+#[cfg(test)]
+mod test {
+ use bitcoin::{hashes::Hash, Txid};
+ use btc_wire::test::rand_key;
+ use url::Url;
+
+ use crate::info::{decode_info, encode_info, Info};
+
+ #[test]
+ fn decode_encode_tx() {
+ let urls = [
+ "https://git.taler.net/",
+ "https://git.taler.net/depolymerization.git/",
+ "http://git.taler.net/",
+ "http://git.taler.net/depolymerization.git/",
+ ];
+ for url in urls {
+ let wtid = rand_key();
+ let url = Url::parse(url).unwrap();
+ let info = Info::Transaction { wtid, url };
+ let encode = encode_info(&info);
+ let decoded = decode_info(&encode).unwrap();
+ assert_eq!(decoded, info);
+ }
+ }
+
+ #[test]
+ fn decode_encode_bounce() {
+ for _ in 0..4 {
+ let id = rand_key();
+ let info = Info::Bounce {
+ id: Txid::from_slice(&id).unwrap(),
+ };
+ let encode = encode_info(&info);
+ let decoded = decode_info(&encode).unwrap();
+ assert_eq!(decoded, info);
+ }
+ }
+}
diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs
@@ -6,8 +6,10 @@ use btc_wire::{
segwit::DecodeSegWitErr,
GetOpReturnErr, GetSegwitErr,
};
-use postgres::{fallible_iterator::FallibleIterator, Client, NoTls};
+use info::decode_info;
+use postgres::{fallible_iterator::FallibleIterator, Client};
use rand::{rngs::OsRng, RngCore};
+use reconnect::{AutoReconnectRPC, AutoReconnectSql};
use std::{
collections::HashMap,
path::PathBuf,
@@ -19,36 +21,16 @@ use taler_config::Config;
use taler_log::log::{error, info, warn};
use url::Url;
-use crate::fail_point::fail_point;
+use crate::{
+ fail_point::fail_point,
+ info::{encode_info, Info},
+ status::TxStatus,
+};
mod fail_point;
-
-#[repr(u8)]
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-enum Status {
- /// Client have ask for a transaction
- Proposed = 0,
- /// Transaction have been announced to the bitcoin network
- Pending = 1,
- /// The wire failed to send this transaction and will try later
- Delayed = 2,
- /// Transaction have been mined and confirmed
- Confirmed = 3,
-}
-
-impl TryFrom<u8> for Status {
- type Error = ();
-
- fn try_from(v: u8) -> Result<Self, Self::Error> {
- match v {
- x if x == Status::Proposed as u8 => Ok(Status::Proposed),
- x if x == Status::Pending as u8 => Ok(Status::Pending),
- x if x == Status::Confirmed as u8 => Ok(Status::Confirmed),
- x if x == Status::Delayed as u8 => Ok(Status::Delayed),
- _ => Err(()),
- }
- }
-}
+mod info;
+mod reconnect;
+mod status;
fn btc_payto_url(addr: &Address) -> Url {
Url::from_str(&format!("payto://bitcoin/{}", addr)).unwrap()
@@ -77,133 +59,6 @@ fn taler_amount_to_btc_amount(amount: &Amount) -> Result<BtcAmount, String> {
return Ok(BtcAmount::from_sat(sat));
}
-fn encode_info(wtid: &[u8; 32], url: &Url) -> Vec<u8> {
- let mut buffer = wtid.to_vec();
- buffer.push(if url.scheme() == "http" { 1 } else { 0 });
- let parts = format!("{}{}", url.domain().unwrap_or(""), url.path());
- let packed = uri_pack::pack_uri(&parts).unwrap();
- buffer.extend_from_slice(&packed);
- return buffer;
-}
-
-fn decode_info(bytes: &[u8]) -> ([u8; 32], Url) {
- let packed = format!(
- "http{}://{}",
- if bytes[32] == 0 { "s" } else { "" },
- uri_pack::unpack_uri(&bytes[33..]).unwrap(),
- );
- let url = Url::parse(&packed).unwrap();
- return (bytes[..32].try_into().unwrap(), url);
-}
-
-#[cfg(test)]
-mod test {
- use btc_wire::test::rand_key;
- use url::Url;
-
- use crate::{decode_info, encode_info};
-
- #[test]
- fn decode_encode_info() {
- let urls = [
- "https://git.taler.net/",
- "https://git.taler.net/depolymerization.git/",
- "http://git.taler.net/",
- "http://git.taler.net/depolymerization.git/",
- ];
- for url in urls {
- let key = rand_key();
- let url = Url::parse(url).unwrap();
- let encode = encode_info(&key, &url);
- let decode = decode_info(&encode);
- assert_eq!(key, decode.0);
- assert_eq!(url, decode.1);
- }
- }
-}
-
-struct AutoReconnectRPC {
- delay: Duration,
- config: BitcoinConfig,
- wallet: String,
- client: BtcRpc,
-}
-
-impl AutoReconnectRPC {
- pub fn new(config: BitcoinConfig, wallet: impl Into<String>, delay: Duration) -> Self {
- let wallet: String = wallet.into();
- Self {
- client: Self::connect(&config, &wallet, delay),
- wallet,
- delay,
- config,
- }
- }
-
- /// Connect a new client, loop on error
- fn connect(config: &BitcoinConfig, wallet: &str, delay: Duration) -> BtcRpc {
- loop {
- match BtcRpc::wallet(config, wallet) {
- Ok(mut new) => match new.net_info() {
- Ok(_) => return new,
- Err(err) => {
- error!("connect: RPC - {}", err);
- std::thread::sleep(delay);
- }
- },
- Err(err) => {
- error!("connect:RPC - {}", err);
- std::thread::sleep(delay);
- }
- }
- }
- }
-
- pub fn client(&mut self) -> &mut BtcRpc {
- if self.client.net_info().is_err() {
- self.client = Self::connect(&self.config, &self.wallet, self.delay);
- }
- &mut self.client
- }
-}
-
-struct AutoReconnectSql {
- delay: Duration,
- config: String,
- client: Client,
-}
-
-impl AutoReconnectSql {
- pub fn new(config: impl Into<String>, delay: Duration) -> Self {
- let config: String = config.into();
- Self {
- client: Self::connect(&config, delay),
- config,
- delay,
- }
- }
-
- /// Connect a new client, loop on error
- fn connect(config: &str, delay: Duration) -> Client {
- loop {
- match Client::connect(config, NoTls) {
- Ok(new) => return new,
- Err(err) => {
- error!("connect: RPC - {}", err);
- std::thread::sleep(delay);
- }
- }
- }
- }
-
- pub fn client(&mut self) -> &mut Client {
- if self.client.is_valid(self.delay).is_err() {
- self.client = Self::connect(&self.config, self.delay);
- }
- &mut self.client
- }
-}
-
fn last_hash(db: &mut Client) -> Result<Option<BlockHash>, postgres::Error> {
Ok(db
.query_opt("SELECT value FROM state WHERE name='last_hash'", &[])?
@@ -212,37 +67,30 @@ fn last_hash(db: &mut Client) -> Result<Option<BlockHash>, postgres::Error> {
/// Listen for new proposed transactions and announce them on the bitcoin network
fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, config: &Config) {
- // List all transactions with the given status
- fn list_status(db: &mut Client, status: Status) -> Result<Vec<i32>, postgres::Error> {
- let mut iter =
- db.query_raw("SELECT id FROM tx_out WHERE status=$1", &[&(status as i16)])?;
- let mut out = Vec::new();
- while let Some(row) = iter.next()? {
- out.push(row.get(0));
- }
- return Ok(out);
- }
-
- // Perform a transaction on the blockchain
- fn perform_send(
+ // Send a transaction on the blockchain, return true if more transactions with the same status remains
+ fn send_tx(
db: &mut Client,
rpc: &mut BtcRpc,
- id: i32,
- status: Status,
- ) -> Result<(), Box<dyn std::error::Error>> {
- assert!(status == Status::Delayed || status == Status::Proposed);
+ status: TxStatus,
+ ) -> Result<bool, Box<dyn std::error::Error>> {
+ assert!(status == TxStatus::Delayed || status == TxStatus::Requested);
let mut tx = db.transaction()?;
// We lock the row with FOR UPDATE to prevent sending same transaction multiple time
- let iter = tx.query_opt(
- "SELECT amount, wtid, credit_acc, exchange_url FROM tx_out WHERE id=$1 AND status=$2 FOR UPDATE",
- &[&id, &(status as i16)],
+ let row = tx.query_opt(
+ "SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 LIMIT 1 FOR UPDATE",
+ &[&(status as i16)],
)?;
- if let Some(row) = iter {
- let amount = taler_amount_to_btc_amount(&Amount::from_str(row.get(0))?)?;
- let reserve_pub: &[u8] = row.get(1);
- let addr: Address = btc_payto_addr(&Url::parse(row.get(2))?)?;
- let exchange_base_url: Url = Url::parse(row.get(3))?;
- let metadata = encode_info(reserve_pub.try_into()?, &exchange_base_url);
+ if let Some(row) = &row {
+ let id: i32 = row.get(0);
+ let amount = taler_amount_to_btc_amount(&Amount::from_str(row.get(1))?)?;
+ let reserve_pub: &[u8] = row.get(2);
+ let addr: Address = btc_payto_addr(&Url::parse(row.get(3))?)?;
+ let exchange_base_url: Url = Url::parse(row.get(4))?;
+ let info = Info::Transaction {
+ wtid: reserve_pub.try_into()?,
+ url: exchange_base_url,
+ };
+ let metadata = encode_info(&info);
fail_point("Skip send_op_return", 0.2)?;
match rpc.send_op_return(&addr, &amount, &metadata) {
@@ -250,23 +98,22 @@ fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, config: &Config)
fail_point("Fail update db", 0.2)?;
tx.execute(
"UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3",
- &[&(Status::Pending as i16), &tx_id.as_ref(), &id],
+ &[&(TxStatus::Sent as i16), &tx_id.as_ref(), &id],
)?;
- info!("{} PENDING", tx_id);
+ let amount = btc_amount_to_taler_amount(&amount.to_signed().unwrap());
+ info!("SEND >> {} {} in {}", addr, amount, tx_id);
}
Err(e) => {
info!("sender: RPC - {}", e);
tx.execute(
"UPDATE tx_out SET status=$1 WHERE id=$2",
- &[&(Status::Delayed as i16), &id],
+ &[&(TxStatus::Delayed as i16), &id],
)?;
}
}
- } else {
- warn!("sender: transaction status collision, database have been altered by another process");
+ tx.commit()?;
}
- tx.commit()?;
- Ok(())
+ Ok(row.is_some())
}
// TODO check if transactions are abandoned
@@ -293,16 +140,33 @@ fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql, config: &Config)
// Sync chain
sync_chain(rpc, db, config)?;
- // As we are now in sync with the blockchain if a transaction is in proposed or delayed state it have not been sent
+ // As we are now in sync with the blockchain if a transaction is in requested or delayed state it have not been sent
// Send delayed transactions
- for id in list_status(db, Status::Delayed)? {
- perform_send(db, rpc, id, Status::Delayed)?;
- }
- // Send proposed transactions
- for id in list_status(db, Status::Proposed)? {
- perform_send(db, rpc, id, Status::Proposed)?;
- }
+ while send_tx(db, rpc, TxStatus::Delayed)? {}
+ // Send requested transactions
+ while send_tx(db, rpc, TxStatus::Requested)? {}
+
+ // Check if already bounced
+ /*if nb > 0 && false {
+ // We do not handle failures, bouncing is done in a best effort manner
+ match rpc.bounce(&id, &BtcAmount::from_sat(config.bounce_fee)) {
+ Ok(it) => {
+ info!("bounce {} in {}", &id, &it);
+ db.execute(
+ "UPDATE bounce SET txid = $1 WHERE bounced = $2",
+ &[&it.as_ref(), &id.as_ref()],
+ )?;
+ }
+ Err(err) => match err {
+ BounceErr::AmountLessThanFee => { /* Ignore */ }
+ BounceErr::NotAReceiveTransaction | BounceErr::RPC(_) => {
+ Err(err)?
+ }
+ },
+ }
+ }*/
+
Ok(())
})();
if let Err(e) = result {
@@ -342,53 +206,62 @@ fn sync_chain(
Category::Send => {
match rpc.get_tx_op_return(&id) {
Ok((full, bytes)) => {
- let (wtid, url) = decode_info(&bytes);
- let expected_status = if full.confirmations >= confirmation as i32 {
- Status::Confirmed
- } else {
- Status::Pending
- };
let mut tx = db.transaction()?;
- let row = tx.query_opt(
- "SELECT status, id FROM tx_out WHERE wtid=$1 FOR UPDATE",
- &[&wtid.as_ref()],
- )?;
- if let Some(row) = row {
- let status: i16 = row.get(0);
- let _id: i32 = row.get(1);
- let status: Status = Status::try_from(status as u8).unwrap();
- if status != Status::Confirmed {
- if status != expected_status {
- tx.execute(
- "UPDATE tx_out SET status=$1 where id=$2",
- &[&(expected_status as i16), &_id],
+ match decode_info(&bytes) {
+ Ok(info) => match info {
+ Info::Transaction { wtid, url } => {
+ let row = tx.query_opt(
+ "SELECT status, id FROM tx_out WHERE wtid=$1 FOR UPDATE",
+ &[&wtid.as_ref()],
)?;
- if status == Status::Delayed || status == Status::Proposed {
- warn!(
- "watcher: tx {} have been recovered automatically",
- _id
- );
- } else if expected_status == Status::Confirmed {
- info!("{} CONFIRMED", &id);
- }
- }
- }
- } else {
- let debit_addr = sender_address(rpc, &full)?;
- let credit_addr = full.details[0].address.as_ref().unwrap();
- let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time);
- let amount = btc_amount_to_taler_amount(&full.amount);
- // Generate a random request_uid
- let mut request_uid = [0; 64];
- OsRng.fill_bytes(&mut request_uid);
- let nb = tx.execute(
+ if let Some(row) = row {
+ let status: i16 = row.get(0);
+ let _id: i32 = row.get(1);
+ let status: TxStatus =
+ TxStatus::try_from(status as u8).unwrap();
+ if status != TxStatus::Sent {
+ tx.execute(
+ "UPDATE tx_out SET status=$1 where id=$2",
+ &[&(TxStatus::Sent as i16), &_id],
+ )?;
+ if status == TxStatus::Delayed
+ || status == TxStatus::Requested
+ {
+ warn!(
+ "watcher: tx {} have been recovered automatically",
+ _id
+ );
+ let addr =
+ full.details[0].address.as_ref().unwrap();
+ let amount =
+ btc_amount_to_taler_amount(&full.amount);
+ info!("SEND >> {} {} in {}", addr, amount, &id);
+ }
+ }
+ } else {
+ let debit_addr = sender_address(rpc, &full)?;
+ let credit_addr = full.details[0].address.as_ref().unwrap();
+ let date =
+ SystemTime::UNIX_EPOCH + Duration::from_secs(full.time);
+ let amount = btc_amount_to_taler_amount(&full.amount);
+ // Generate a random request_uid
+ let mut request_uid = [0; 64];
+ OsRng.fill_bytes(&mut request_uid);
+ let nb = tx.execute(
"INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING",
- &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref(), &config.base_url.as_ref(), &(expected_status as i16), &id.as_ref(), &request_uid.as_ref()
+ &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref(), &config.base_url.as_ref(), &(TxStatus::Sent as i16), &id.as_ref(), &request_uid.as_ref()
],
)?;
- if nb > 0 {
- warn!("watcher: found an unregistered outgoing address {} {} in tx {}", crockford_base32_encode(&wtid), &url, id);
- }
+ if nb > 0 {
+ warn!("watcher: found an unregistered outgoing address {} {} in tx {}", crockford_base32_encode(&wtid), &url, id);
+ }
+ }
+ }
+ Info::Bounce { .. } => {
+ // TODO
+ }
+ },
+ Err(err) => warn!("send: decode-info {} - {}", id, err),
}
tx.commit()?;
}
@@ -400,41 +273,25 @@ fn sync_chain(
}
Category::Receive => match rpc.get_tx_segwit_key(&id) {
Ok((full, reserve_pub)) => {
- let debit_addr = sender_address(rpc, &full)?;
- let credit_addr = full.details[0].address.as_ref().unwrap();
- let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time);
- let amount = btc_amount_to_taler_amount(&full.amount);
- let nb = db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (reserve_pub) DO NOTHING ", &[
+ if full.confirmations >= confirmation as i32 {
+ let debit_addr = sender_address(rpc, &full)?;
+ let credit_addr = full.details[0].address.as_ref().unwrap();
+ let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time);
+ let amount = btc_amount_to_taler_amount(&full.amount);
+ let nb = db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (reserve_pub) DO NOTHING ", &[
&date, &amount.to_string(), &reserve_pub.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref()
])?;
- if nb > 0 {
- info!("{} << {} {}", &debit_addr, &credit_addr, &amount);
+ if nb > 0 {
+ info!("{} << {} {} in {}", &debit_addr, &credit_addr, &amount, &id);
+ }
}
}
Err(err) => match err {
GetSegwitErr::Decode(
DecodeSegWitErr::MissingSegWitAddress | DecodeSegWitErr::NoMagicIdMatch,
) => {
+ // Request a bounce
db.execute("INSERT INTO bounce (bounced) VALUES ($1) ON CONFLICT (bounced) DO NOTHING", &[&id.as_ref()])?;
- // Check if already bounced
- /*if nb > 0 && false {
- // We do not handle failures, bouncing is done in a best effort manner
- match rpc.bounce(&id, &BtcAmount::from_sat(config.bounce_fee)) {
- Ok(it) => {
- info!("bounce {} in {}", &id, &it);
- db.execute(
- "UPDATE bounce SET txid = $1 WHERE bounced = $2",
- &[&it.as_ref(), &id.as_ref()],
- )?;
- }
- Err(err) => match err {
- BounceErr::AmountLessThanFee => { /* Ignore */ }
- BounceErr::NotAReceiveTransaction | BounceErr::RPC(_) => {
- Err(err)?
- }
- },
- }
- }*/
}
err => warn!("receive: {} {}", id, err),
},
diff --git a/btc-wire/src/reconnect.rs b/btc-wire/src/reconnect.rs
@@ -0,0 +1,87 @@
+use std::time::Duration;
+
+use btc_wire::{config::BitcoinConfig, rpc::BtcRpc};
+use postgres::{Client, NoTls};
+use taler_log::log::error;
+
+pub struct AutoReconnectRPC {
+ delay: Duration,
+ config: BitcoinConfig,
+ wallet: String,
+ client: BtcRpc,
+}
+
+impl AutoReconnectRPC {
+ pub fn new(config: BitcoinConfig, wallet: impl Into<String>, delay: Duration) -> Self {
+ let wallet: String = wallet.into();
+ Self {
+ client: Self::connect(&config, &wallet, delay),
+ wallet,
+ delay,
+ config,
+ }
+ }
+
+ /// Connect a new client, loop on error
+ fn connect(config: &BitcoinConfig, wallet: &str, delay: Duration) -> BtcRpc {
+ loop {
+ match BtcRpc::wallet(config, wallet) {
+ Ok(mut new) => match new.net_info() {
+ Ok(_) => return new,
+ Err(err) => {
+ error!("connect: RPC - {}", err);
+ std::thread::sleep(delay);
+ }
+ },
+ Err(err) => {
+ error!("connect:RPC - {}", err);
+ std::thread::sleep(delay);
+ }
+ }
+ }
+ }
+
+ pub fn client(&mut self) -> &mut BtcRpc {
+ if self.client.net_info().is_err() {
+ self.client = Self::connect(&self.config, &self.wallet, self.delay);
+ }
+ &mut self.client
+ }
+}
+
+pub struct AutoReconnectSql {
+ delay: Duration,
+ config: String,
+ client: Client,
+}
+
+impl AutoReconnectSql {
+ pub fn new(config: impl Into<String>, delay: Duration) -> Self {
+ let config: String = config.into();
+ Self {
+ client: Self::connect(&config, delay),
+ config,
+ delay,
+ }
+ }
+
+ /// Connect a new client, loop on error
+ fn connect(config: &str, delay: Duration) -> Client {
+ loop {
+ match Client::connect(config, NoTls) {
+ Ok(new) => return new,
+ Err(err) => {
+ error!("connect: RPC - {}", err);
+ std::thread::sleep(delay);
+ }
+ }
+ }
+ }
+
+ pub fn client(&mut self) -> &mut Client {
+ if self.client.is_valid(self.delay).is_err() {
+ self.client = Self::connect(&self.config, self.delay);
+ }
+ &mut self.client
+ }
+}
diff --git a/btc-wire/src/status.rs b/btc-wire/src/status.rs
@@ -0,0 +1,50 @@
+#[repr(u8)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum TxStatus {
+ /// Client have ask for a transaction (default status)
+ Requested = 0,
+ /// The wire failed to send this transaction and will try later
+ Delayed = 1,
+ /// Transaction have been announced to the bitcoin network
+ Sent = 2,
+}
+
+impl TryFrom<u8> for TxStatus {
+ type Error = ();
+
+ fn try_from(v: u8) -> Result<Self, Self::Error> {
+ match v {
+ x if x == TxStatus::Requested as u8 => Ok(TxStatus::Requested),
+ x if x == TxStatus::Sent as u8 => Ok(TxStatus::Sent),
+ x if x == TxStatus::Delayed as u8 => Ok(TxStatus::Delayed),
+ _ => Err(()),
+ }
+ }
+}
+
+#[repr(u8)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum BounceStatus {
+ /// Bounce have been requested (default status)
+ Requested = 0,
+ /// The wire failed to send this bounce and will try later
+ Delayed = 1,
+ /// Bounce will not be sent (e.g: bounce amount smaller than bounce fee)
+ Ignored = 2,
+ /// Bounce have been announced to the bitcoin network
+ Sent = 3,
+}
+
+impl TryFrom<u8> for BounceStatus {
+ type Error = ();
+
+ fn try_from(v: u8) -> Result<Self, Self::Error> {
+ match v {
+ x if x == BounceStatus::Requested as u8 => Ok(BounceStatus::Requested),
+ x if x == BounceStatus::Sent as u8 => Ok(BounceStatus::Sent),
+ x if x == BounceStatus::Delayed as u8 => Ok(BounceStatus::Delayed),
+ x if x == BounceStatus::Ignored as u8 => Ok(BounceStatus::Ignored),
+ _ => Err(()),
+ }
+ }
+}
diff --git a/wire-gateway/db/schema.sql b/wire-gateway/db/schema.sql
@@ -32,7 +32,9 @@ CREATE TABLE tx_out (
-- Bounced transaction
CREATE TABLE bounce (
+ id SERIAL PRIMARY KEY,
bounced BYTEA UNIQUE NOT NULL,
txid BYTEA UNIQUE,
- _date TIMESTAMP NOT NULL DEFAULT now()
+ _date TIMESTAMP NOT NULL DEFAULT now(),
+ status SMALLINT NOT NULL DEFAULT 0
)
\ No newline at end of file