summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAntoine A <>2021-12-14 15:17:19 +0100
committerAntoine A <>2021-12-14 15:17:19 +0100
commitc3d38b3f7af1478bd8055cfeda3ebbe220619ae3 (patch)
tree84dacf63be518e1583c4598c1dd816baedf86564
parentb3fd5aa4ca8dc4006ebb82539be944e5caaf8e31 (diff)
downloaddepolymerization-c3d38b3f7af1478bd8055cfeda3ebbe220619ae3.tar.gz
depolymerization-c3d38b3f7af1478bd8055cfeda3ebbe220619ae3.tar.bz2
depolymerization-c3d38b3f7af1478bd8055cfeda3ebbe220619ae3.zip
Register outgoing transactions whose status failed to be updated
-rw-r--r--btc-wire/src/main.rs141
-rw-r--r--script/setup.sh8
2 files changed, 79 insertions, 70 deletions
diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs
index b5bb550..f95e35f 100644
--- a/btc-wire/src/main.rs
+++ b/btc-wire/src/main.rs
@@ -32,9 +32,11 @@ enum Status {
/// Transaction have been announced to the bitcoin network
Pending = 1,
/// Transaction have been mined
- Confirmed = 2,
- /// The wire cannot failed to send this transaction and will try latter
+ OnChain = 2,
+ /// The wire failed to send this transaction and will try later
Delayed = 3,
+ /// The wire failed to update the transaction status in the database
+ Manual = 4,
}
impl TryFrom<u8> for Status {
@@ -44,8 +46,9 @@ impl TryFrom<u8> for Status {
match v {
x if x == Status::Proposed as u8 => Ok(Status::Proposed),
x if x == Status::Pending as u8 => Ok(Status::Pending),
- x if x == Status::Confirmed as u8 => Ok(Status::Confirmed),
+ x if x == Status::OnChain as u8 => Ok(Status::OnChain),
x if x == Status::Delayed as u8 => Ok(Status::Delayed),
+ x if x == Status::Manual as u8 => Ok(Status::Manual),
_ => Err(()),
}
}
@@ -123,10 +126,47 @@ mod test {
}
}
+struct AutoReloadDb {
+ delay: Duration,
+ config: String,
+ client: Client,
+}
+
+impl AutoReloadDb {
+ pub fn new(config: impl Into<String>, delay: Duration) -> Self {
+ let config: String = config.into();
+ Self {
+ client: Self::connect(&config, delay),
+ config,
+ delay,
+ }
+ }
+
+ /// Connect a new client, loop on error
+ fn connect(config: &str, delay: Duration) -> Client {
+ loop {
+ match Client::connect(config, NoTls) {
+ Ok(new) => return new,
+ Err(err) => {
+ error!("connect: DB - {}", err);
+ std::thread::sleep(delay);
+ }
+ }
+ }
+ }
+
+ pub fn client(&mut self) -> &mut Client {
+ if self.client.is_valid(self.delay).is_err() {
+ self.client = Self::connect(&self.config, self.delay);
+ }
+ &mut self.client
+ }
+}
+
/// Listen for new proposed transactions and announce them on the bitcoin network
fn sender(rpc: RPC, mut db: AutoReloadDb) {
fn get_proposed(
- db: &mut Transaction,
+ db: &mut Client,
) -> Result<Vec<(i32, BtcAmount, Address, Vec<u8>)>, Box<dyn std::error::Error>> {
let mut iter = db.query_raw(
"SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 OR status=$2",
@@ -153,73 +193,42 @@ fn sender(rpc: RPC, mut db: AutoReloadDb) {
// TODO check if transactions are abandoned
loop {
- let db = db.client();
+ let mut db = db.client();
let result: Result<(), Box<dyn std::error::Error>> = (|| {
- // We should be the only one to interact with the database but we enforce it
- let mut tx = db
- .build_transaction()
- .isolation_level(IsolationLevel::Serializable)
- .start()?;
- for (id, amount, addr, metadata) in get_proposed(&mut tx)? {
- tx.execute(
+ for (id, amount, addr, metadata) in get_proposed(&mut db)? {
+ // Set status to MANUAL to detect database error preventing atomicity
+ db.execute(
"UPDATE tx_out SET status=$1 WHERE id=$2",
- &[&(Status::Delayed as i16), &id],
+ &[&(Status::Manual as i16), &id],
)?;
match rpc.send_op_return(&addr, amount, &metadata) {
Ok(txid) => {
- tx.execute(
+ let result = db.execute(
"UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3",
&[&(Status::Pending as i16), &txid.as_ref(), &id],
+ );
+ if let Err(e) = result {
+ error!("sender: DB - {}", e);
+ // The watcher is going to recover the transaction automatically
+ } else {
+ info!("{} PENDING", txid);
+ }
+ }
+ Err(e) => {
+ info!("sender: RPC - {}", e);
+ db.execute(
+ "UPDATE tx_out SET status=$1 WHERE id=$2",
+ &[&(Status::Delayed as i16), &id],
)?;
- info!("{} PENDING", txid);
}
- Err(e) => info!("sender: RPC - {}", e),
}
}
- tx.commit()?;
Ok(())
})();
if let Err(e) = result {
error!("sender: DB - {}", e);
}
- std::thread::sleep(Duration::from_millis(rand::random::<u8>() as u64));
- }
-}
-
-struct AutoReloadDb {
- delay: Duration,
- config: String,
- client: Client,
-}
-
-impl AutoReloadDb {
- pub fn new(config: impl Into<String>, delay: Duration) -> Self {
- let config: String = config.into();
- Self {
- client: Self::connect(&config, delay),
- config,
- delay,
- }
- }
-
- /// Connect a new client, loop on error
- fn connect(config: &str, delay: Duration) -> Client {
- loop {
- match Client::connect(config, NoTls) {
- Ok(new) => return new,
- Err(err) => {
- error!("connect: DB - {}", err);
- std::thread::sleep(delay);
- }
- }
- }
- }
-
- pub fn client(&mut self) -> &mut Client {
- if self.client.is_valid(self.delay).is_err() {
- self.client = Self::connect(&self.config, self.delay);
- }
- &mut self.client
+ std::thread::sleep(Duration::from_millis(300));
}
}
@@ -260,25 +269,25 @@ fn watcher(rpc: RPC, mut db: AutoReloadDb, config: &Config) {
Ok((full, bytes)) => {
let (wtid, url) = decode_info(&bytes);
let row = tx.query_opt(
- "SELECT status, wtid, id FROM tx_out WHERE txid=$1",
- &[&id.as_ref()],
+ "SELECT status, id FROM tx_out WHERE wtid=$1",
+ &[&wtid.as_ref()],
)?;
if let Some(row) = row {
let status: i16 = row.get(0);
- let _wtid: &[u8] = row.get(1);
- let _id: i32 = row.get(2);
- if &wtid != _wtid {
- warn!("watcher: state tx {} have uncompatible wtid in DB {} and on chain {}", id, crockford_base32_encode(&wtid), crockford_base32_encode(&_wtid));
- exit(1);
- }
+ let _id: i32 = row.get(1);
let status: Status = Status::try_from(status as u8).unwrap();
- if status != Status::Confirmed {
+ if status != Status::OnChain {
tx.execute(
"UPDATE tx_out SET status=$1 where id=$2",
- &[&(Status::Confirmed as i16), &_id],
+ &[&(Status::OnChain as i16), &_id],
)?;
if status == Status::Proposed {
warn!("watcher: tx {} is present on chain at {} while being in proposed status", _id, id);
+ } else if status == Status::Manual {
+ warn!(
+ "watcher: tx {} have been recovered automatically",
+ _id
+ );
} else {
info!("{} CONFIRMED", &id);
}
@@ -296,7 +305,7 @@ fn watcher(rpc: RPC, mut db: AutoReloadDb, config: &Config) {
OsRng.fill_bytes(&mut request_uid);
tx.execute(
"INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
- &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).to_string(), &btc_payto_url(&credit_addr).to_string(), &config.base_url.to_string(), &(Status::Confirmed as i16), &id.as_ref(), &request_uid.as_ref()
+ &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).to_string(), &btc_payto_url(&credit_addr).to_string(), &config.base_url.to_string(), &(Status::OnChain as i16), &id.as_ref(), &request_uid.as_ref()
],
)?;
warn!("watcher: found an unregistered outgoing address {} {} in tx {}", crockford_base32_encode(&wtid), &url, id);
diff --git a/script/setup.sh b/script/setup.sh
index 392918e..7c7ed44 100644
--- a/script/setup.sh
+++ b/script/setup.sh
@@ -56,10 +56,10 @@ function check_balance() {
function btc_wire() {
cargo build --bin btc-wire &> /dev/null
- target/debug/btc-wire $BTC_DIR &>> btc_wire.log &
- # Can be used to test serialization
- # target/debug/btc-wire $BTC_DIR &>> btc_wire.log &
- # target/debug/btc-wire $BTC_DIR &>> btc_wire.log &
+ target/debug/btc-wire $BTC_DIR &> btc_wire.log &
+ # Can be used to test db transactions serialization
+ #target/debug/btc-wire $BTC_DIR &>> btc_wire.log &
+ #target/debug/btc-wire $BTC_DIR &>> btc_wire.log &
}
function gateway() {