summaryrefslogtreecommitdiff
path: root/btc-wire/src/main.rs
diff options
context:
space:
mode:
Diffstat (limited to 'btc-wire/src/main.rs')
-rw-r--r--btc-wire/src/main.rs71
1 files changed, 38 insertions, 33 deletions
diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs
index c4f985d..b5bb550 100644
--- a/btc-wire/src/main.rs
+++ b/btc-wire/src/main.rs
@@ -11,7 +11,7 @@ use btc_wire::{
ClientExtended, GetOpReturnErr, GetSegwitErr,
};
use configparser::ini::Ini;
-use postgres::{fallible_iterator::FallibleIterator, Client, NoTls};
+use postgres::{fallible_iterator::FallibleIterator, Client, IsolationLevel, NoTls, Transaction};
use rand::{rngs::OsRng, RngCore};
use std::{
collections::HashMap,
@@ -126,7 +126,7 @@ mod test {
/// Listen for new proposed transactions and announce them on the bitcoin network
fn sender(rpc: RPC, mut db: AutoReloadDb) {
fn get_proposed(
- db: &mut Client,
+ db: &mut Transaction,
) -> Result<Vec<(i32, BtcAmount, Address, Vec<u8>)>, Box<dyn std::error::Error>> {
let mut iter = db.query_raw(
"SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 OR status=$2",
@@ -150,33 +150,39 @@ fn sender(rpc: RPC, mut db: AutoReloadDb) {
return Ok(out);
}
+ // TODO check if transactions are abandoned
+
loop {
let db = db.client();
let result: Result<(), Box<dyn std::error::Error>> = (|| {
- for (id, amount, addr, metadata) in get_proposed(db)? {
+ // We should be the only one to interact with the database but we enforce it
+ let mut tx = db
+ .build_transaction()
+ .isolation_level(IsolationLevel::Serializable)
+ .start()?;
+ for (id, amount, addr, metadata) in get_proposed(&mut tx)? {
+ tx.execute(
+ "UPDATE tx_out SET status=$1 WHERE id=$2",
+ &[&(Status::Delayed as i16), &id],
+ )?;
match rpc.send_op_return(&addr, amount, &metadata) {
Ok(txid) => {
- db.execute(
+ tx.execute(
"UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3",
&[&(Status::Pending as i16), &txid.as_ref(), &id],
)?;
info!("{} PENDING", txid);
}
- Err(e) => {
- info!("sender: RPC - {}", e);
- db.execute(
- "UPDATE tx_out SET status=$1 WHERE id=$3",
- &[&(Status::Delayed as i16), &id],
- )?;
- }
+ Err(e) => info!("sender: RPC - {}", e),
}
}
+ tx.commit()?;
Ok(())
})();
if let Err(e) = result {
error!("sender: DB - {}", e);
}
- std::thread::sleep(Duration::from_millis(300));
+ std::thread::sleep(Duration::from_millis(rand::random::<u8>() as u64));
}
}
@@ -219,22 +225,20 @@ impl AutoReloadDb {
/// Listen for mined block and index confirmed transactions into the database
fn watcher(rpc: RPC, mut db: AutoReloadDb, config: &Config) {
- let mut last_hash: Option<BlockHash> = {
- let db = db.client();
- let stored_hash = db
- .query("SELECT value FROM state WHERE name='last_hash'", &[])
- .unwrap();
- if stored_hash.len() == 1 {
- Some(BlockHash::from_slice(stored_hash[0].get(0)).unwrap())
- } else {
- None
- }
- };
let confirmation = 1;
loop {
let db = db.client();
+
let result: Result<(), Box<dyn std::error::Error>> = (|| {
+ // We should be the only one to interact with the database but we enforce it
+ let mut tx = db
+ .build_transaction()
+ .isolation_level(IsolationLevel::Serializable)
+ .start()?;
+ let last_hash: Option<BlockHash> = tx
+ .query_opt("SELECT value FROM state WHERE name='last_hash'", &[])?
+ .map(|r| BlockHash::from_slice(r.get(0)).unwrap());
let list =
rpc.list_since_block(last_hash.as_ref(), Some(confirmation), None, Some(true))?;
@@ -255,7 +259,7 @@ fn watcher(rpc: RPC, mut db: AutoReloadDb, config: &Config) {
Category::Send => match rpc.get_tx_op_return(&id) {
Ok((full, bytes)) => {
let (wtid, url) = decode_info(&bytes);
- let row = db.query_opt(
+ let row = tx.query_opt(
"SELECT status, wtid, id FROM tx_out WHERE txid=$1",
&[&id.as_ref()],
)?;
@@ -269,7 +273,7 @@ fn watcher(rpc: RPC, mut db: AutoReloadDb, config: &Config) {
}
let status: Status = Status::try_from(status as u8).unwrap();
if status != Status::Confirmed {
- db.execute(
+ tx.execute(
"UPDATE tx_out SET status=$1 where id=$2",
&[&(Status::Confirmed as i16), &_id],
)?;
@@ -284,12 +288,13 @@ fn watcher(rpc: RPC, mut db: AutoReloadDb, config: &Config) {
let credit_addr = full.tx.details[0].address.as_ref().unwrap();
let time = full.tx.info.blocktime.unwrap();
let date = SystemTime::UNIX_EPOCH + Duration::from_secs(time);
- let amount =
- btc_amount_to_taler_amount(&full.tx.amount.abs().to_unsigned()?);
+ let amount = btc_amount_to_taler_amount(
+ &full.tx.amount.abs().to_unsigned()?,
+ );
// Generate a random request_uid
let mut request_uid = [0; 64];
OsRng.fill_bytes(&mut request_uid);
- db.execute(
+ tx.execute(
"INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)",
&[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).to_string(), &btc_payto_url(&credit_addr).to_string(), &config.base_url.to_string(), &(Status::Confirmed as i16), &id.as_ref(), &request_uid.as_ref()
],
@@ -309,7 +314,7 @@ fn watcher(rpc: RPC, mut db: AutoReloadDb, config: &Config) {
let time = full.tx.info.blocktime.unwrap();
let date = SystemTime::UNIX_EPOCH + Duration::from_secs(time);
let amount = btc_amount_to_taler_amount(&full.tx.amount.to_unsigned()?);
- db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5)", &[
+ tx.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5)", &[
&date, &amount.to_string(), &reserve_pub.as_ref(), &btc_payto_url(&debit_addr).to_string(), &btc_payto_url(&credit_addr).to_string()
])?;
info!("{} << {} {}", &debit_addr, &credit_addr, &amount);
@@ -330,15 +335,15 @@ fn watcher(rpc: RPC, mut db: AutoReloadDb, config: &Config) {
} else {
"INSERT INTO state (name, value) VALUES ('last_hash', $1)"
};
- db.execute(query, &[&list.lastblock.as_ref()])?;
- last_hash = Some(list.lastblock);
+ tx.execute(query, &[&list.lastblock.as_ref()])?;
+ tx.commit()?;
Ok(())
})();
if let Err(e) = result {
- error!("sender: DB - {}", e);
+ error!("watcher: DB - {}", e);
}
- info!("sender: Wait for block");
+ info!("watcher: Wait for block");
rpc.wait_for_new_block(0).ok();
}
}