depolymerization

wire gateway for Bitcoin/Ethereum
Log | Files | Refs | Submodules | README | LICENSE

commit c3d38b3f7af1478bd8055cfeda3ebbe220619ae3
parent b3fd5aa4ca8dc4006ebb82539be944e5caaf8e31
Author: Antoine A <>
Date:   Tue, 14 Dec 2021 15:17:19 +0100

Register outgoing transactions whose status failed to be updated

Diffstat:
Mbtc-wire/src/main.rs | 141++++++++++++++++++++++++++++++++++++++++++-------------------------------------
Mscript/setup.sh | 8++++----
2 files changed, 79 insertions(+), 70 deletions(-)

diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs @@ -32,9 +32,11 @@ enum Status { /// Transaction have been announced to the bitcoin network Pending = 1, /// Transaction have been mined - Confirmed = 2, - /// The wire cannot failed to send this transaction and will try latter + OnChain = 2, + /// The wire failed to send this transaction and will try later Delayed = 3, + /// The wire failed to update the transaction status in the database + Manual = 4, } impl TryFrom<u8> for Status { @@ -44,8 +46,9 @@ impl TryFrom<u8> for Status { match v { x if x == Status::Proposed as u8 => Ok(Status::Proposed), x if x == Status::Pending as u8 => Ok(Status::Pending), - x if x == Status::Confirmed as u8 => Ok(Status::Confirmed), + x if x == Status::OnChain as u8 => Ok(Status::OnChain), x if x == Status::Delayed as u8 => Ok(Status::Delayed), + x if x == Status::Manual as u8 => Ok(Status::Manual), _ => Err(()), } } @@ -123,10 +126,47 @@ mod test { } } +struct AutoReloadDb { + delay: Duration, + config: String, + client: Client, +} + +impl AutoReloadDb { + pub fn new(config: impl Into<String>, delay: Duration) -> Self { + let config: String = config.into(); + Self { + client: Self::connect(&config, delay), + config, + delay, + } + } + + /// Connect a new client, loop on error + fn connect(config: &str, delay: Duration) -> Client { + loop { + match Client::connect(config, NoTls) { + Ok(new) => return new, + Err(err) => { + error!("connect: DB - {}", err); + std::thread::sleep(delay); + } + } + } + } + + pub fn client(&mut self) -> &mut Client { + if self.client.is_valid(self.delay).is_err() { + self.client = Self::connect(&self.config, self.delay); + } + &mut self.client + } +} + /// Listen for new proposed transactions and announce them on the bitcoin network fn sender(rpc: RPC, mut db: AutoReloadDb) { fn get_proposed( - db: &mut Transaction, + db: &mut Client, ) -> Result<Vec<(i32, BtcAmount, Address, Vec<u8>)>, Box<dyn std::error::Error>> { let mut iter = db.query_raw( "SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 OR status=$2", @@ -153,73 +193,42 @@ fn sender(rpc: RPC, mut db: AutoReloadDb) { // TODO check if transactions are abandoned loop { - let db = db.client(); + let mut db = db.client(); let result: Result<(), Box<dyn std::error::Error>> = (|| { - // We should be the only one to interact with the database but we enforce it - let mut tx = db - .build_transaction() - .isolation_level(IsolationLevel::Serializable) - .start()?; - for (id, amount, addr, metadata) in get_proposed(&mut tx)? { - tx.execute( + for (id, amount, addr, metadata) in get_proposed(&mut db)? { + // Set status to MANUAL to detect database error preventing atomicity + db.execute( "UPDATE tx_out SET status=$1 WHERE id=$2", - &[&(Status::Delayed as i16), &id], + &[&(Status::Manual as i16), &id], )?; match rpc.send_op_return(&addr, amount, &metadata) { Ok(txid) => { - tx.execute( + let result = db.execute( "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3", &[&(Status::Pending as i16), &txid.as_ref(), &id], + ); + if let Err(e) = result { + error!("sender: DB - {}", e); + // The watcher is going to recover the transaction automatically + } else { + info!("{} PENDING", txid); + } + } + Err(e) => { + info!("sender: RPC - {}", e); + db.execute( + "UPDATE tx_out SET status=$1 WHERE id=$2", + &[&(Status::Delayed as i16), &id], )?; - info!("{} PENDING", txid); } - Err(e) => info!("sender: RPC - {}", e), } } - tx.commit()?; Ok(()) })(); if let Err(e) = result { error!("sender: DB - {}", e); } - std::thread::sleep(Duration::from_millis(rand::random::<u8>() as u64)); - } -} - -struct AutoReloadDb { - delay: Duration, - config: String, - client: Client, -} - -impl AutoReloadDb { - pub fn new(config: impl Into<String>, delay: Duration) -> Self { - let config: String = config.into(); - Self { - client: Self::connect(&config, delay), - config, - delay, - } - } - - /// Connect a new client, loop on error - fn connect(config: &str, delay: Duration) -> Client { - loop { - match Client::connect(config, NoTls) { - Ok(new) => return new, - Err(err) => { - error!("connect: DB - {}", err); - std::thread::sleep(delay); - } - } - } - } - - pub fn client(&mut self) -> &mut Client { - if self.client.is_valid(self.delay).is_err() { - self.client = Self::connect(&self.config, self.delay); - } - &mut self.client + std::thread::sleep(Duration::from_millis(300)); } } @@ -260,25 +269,25 @@ fn watcher(rpc: RPC, mut db: AutoReloadDb, config: &Config) { Ok((full, bytes)) => { let (wtid, url) = decode_info(&bytes); let row = tx.query_opt( - "SELECT status, wtid, id FROM tx_out WHERE txid=$1", - &[&id.as_ref()], + "SELECT status, id FROM tx_out WHERE wtid=$1", + &[&wtid.as_ref()], )?; if let Some(row) = row { let status: i16 = row.get(0); - let _wtid: &[u8] = row.get(1); - let _id: i32 = row.get(2); - if &wtid != _wtid { - warn!("watcher: state tx {} have uncompatible wtid in DB {} and on chain {}", id, crockford_base32_encode(&wtid), crockford_base32_encode(&_wtid)); - exit(1); - } + let _id: i32 = row.get(1); let status: Status = Status::try_from(status as u8).unwrap(); - if status != Status::Confirmed { + if status != Status::OnChain { tx.execute( "UPDATE tx_out SET status=$1 where id=$2", - &[&(Status::Confirmed as i16), &_id], + &[&(Status::OnChain as i16), &_id], )?; if status == Status::Proposed { warn!("watcher: tx {} is present on chain at {} while being in proposed status", _id, id); + } else if status == Status::Manual { + warn!( + "watcher: tx {} have been recovered automatically", + _id + ); } else { info!("{} CONFIRMED", &id); } @@ -296,7 +305,7 @@ fn watcher(rpc: RPC, mut db: AutoReloadDb, config: &Config) { OsRng.fill_bytes(&mut request_uid); tx.execute( "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)", - &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).to_string(), &btc_payto_url(&credit_addr).to_string(), &config.base_url.to_string(), &(Status::Confirmed as i16), &id.as_ref(), &request_uid.as_ref() + &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).to_string(), &btc_payto_url(&credit_addr).to_string(), &config.base_url.to_string(), &(Status::OnChain as i16), &id.as_ref(), &request_uid.as_ref() ], )?; warn!("watcher: found an unregistered outgoing address {} {} in tx {}", crockford_base32_encode(&wtid), &url, id); diff --git a/script/setup.sh b/script/setup.sh @@ -56,10 +56,10 @@ function check_balance() { function btc_wire() { cargo build --bin btc-wire &> /dev/null - target/debug/btc-wire $BTC_DIR &>> btc_wire.log & - # Can be used to test serialization - # target/debug/btc-wire $BTC_DIR &>> btc_wire.log & - # target/debug/btc-wire $BTC_DIR &>> btc_wire.log & + target/debug/btc-wire $BTC_DIR &> btc_wire.log & + # Can be used to test db transactions serialization + #target/debug/btc-wire $BTC_DIR &>> btc_wire.log & + #target/debug/btc-wire $BTC_DIR &>> btc_wire.log & } function gateway() {