commit d38811ca846872386824a3f053d018727bfa4e20
parent 0a63ad8bcd97be33fbfa87e358de3bd99891300c
Author: Antoine A <>
Date: Wed, 9 Feb 2022 12:17:27 +0100
eth-wire: handle concurrency and injected failures
Diffstat:
23 files changed, 486 insertions(+), 292 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
@@ -76,9 +76,9 @@ dependencies = [
[[package]]
name = "autocfg"
-version = "1.0.1"
+version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
+checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "base32"
@@ -127,9 +127,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
[[package]]
name = "block-buffer"
-version = "0.10.1"
+version = "0.10.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "03588e54c62ae6d763e2a80090d50353b785795361b4ff5b3bf0a5097fc31c0b"
+checksum = "0bf7fe51849ea569fd452f37822f606a5cabb684dc918707a0193fd4664ff324"
dependencies = [
"generic-array",
]
@@ -492,9 +492,9 @@ dependencies = [
[[package]]
name = "ethbloom"
-version = "0.12.0"
+version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c927489503a5d331fdb25a3ac8d80e433ee6aa96951c99346d66e7be2b0400a0"
+checksum = "11da94e443c60508eb62cf256243a64da87304c2802ac2528847f79d750007ef"
dependencies = [
"crunchy",
"fixed-hash",
@@ -504,9 +504,9 @@ dependencies = [
[[package]]
name = "ethereum-types"
-version = "0.13.0"
+version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6e6307a3fa100563ce323e6b654c5935c8d919aeee56b0ccef1c65c5ecd2ecf6"
+checksum = "b2827b94c556145446fcce834ca86b7abf0c39a805883fe20e72c5bfdb5a0dc6"
dependencies = [
"ethbloom",
"fixed-hash",
@@ -1196,9 +1196,9 @@ checksum = "eb9f9e6e233e5c4a35559a617bf40a4ec447db2e84c20b55a6f83167b7e57872"
[[package]]
name = "primitive-types"
-version = "0.11.0"
+version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "db5a3482110d085cee2a284278b42a4361bc611d008eddfbf04fd84735ae521f"
+checksum = "e28720988bff275df1f51b171e1b2a18c30d194c4d2b61defdacecd625a5d94a"
dependencies = [
"fixed-hash",
"impl-serde",
@@ -1485,9 +1485,9 @@ dependencies = [
[[package]]
name = "serde_with"
-version = "1.11.0"
+version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ad6056b4cb69b6e43e3a0f055def223380baecc99da683884f205bf347f7c4b3"
+checksum = "ec1e6ec4d8950e5b1e894eac0d360742f3b1407a6078a604a731c4b3f49cefbc"
dependencies = [
"rustversion",
"serde",
diff --git a/btc-wire/README.md b/btc-wire/README.md
@@ -17,7 +17,7 @@ Bitcoind version >= 22.0 is required
The configuration is based on [taler.conf](https://docs.taler.net/manpages/taler.conf.5.html)
``` ini
-# taler.conf - btc_wire config
+# taler.conf - btc-wire config
[depolymerizer-bitcoin]
DATA_DIR =
CONFIRMATION = 6
diff --git a/btc-wire/src/loops.rs b/btc-wire/src/loops.rs
@@ -29,7 +29,7 @@ pub enum LoopError {
Rpc(#[from] rpc::Error),
#[error(transparent)]
DB(#[from] postgres::Error),
- #[error("Another btc_wire process is running concurrently")]
+ #[error("Another btc-wire process is running concurrently")]
Concurrency,
#[error(transparent)]
Injected(#[from] Injected),
diff --git a/btc-wire/src/loops/worker.rs b/btc-wire/src/loops/worker.rs
@@ -83,8 +83,8 @@ pub fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectDb, state: &WireSt
// It is not possible to atomically update the blockchain and the database.
// When we failed to sync the database and the blockchain state we rely on
- // sync_chain to recover the lost update.
- // When this function is running in parallel it not possible to known another
+ // sync_chain to recover the lost updates.
+ // When this function is running concurrently, it not possible to known another
// execution has failed, and this can lead to a transaction being sent multiple time.
// To ensure only a single version of this function is running at a given time we rely
// on postgres advisory lock
@@ -127,72 +127,6 @@ pub fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectDb, state: &WireSt
}
}
-/// Send a withdraw transaction on the blockchain, return false if no more requested transaction are found
-fn withdraw(db: &mut Client, rpc: &mut Rpc) -> LoopResult<bool> {
- // We rely on the advisory lock to ensure we are the only one sending transactions
- let row = db.query_opt(
- "SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 ORDER BY _date LIMIT 1",
- &[&(WithdrawStatus::Requested as i16)],
- )?;
- if let Some(row) = &row {
- let id: i32 = row.get(0);
- let amount = sql_btc_amount(row, 1);
- let wtid: [u8; 32] = sql_array(row, 2);
- let addr = sql_addr(row, 3);
- let url = sql_url(row, 4);
- let metadata = OutMetadata::Withdraw { wtid, url };
-
- let tx_id = rpc.send_op_return(&addr, &amount, &metadata.encode(), false, true)?;
- fail_point("(injected) fail send", 0.3)?;
- db.execute(
- "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3",
- &[&(WithdrawStatus::Sent as i16), &tx_id.as_ref(), &id],
- )?;
- let amount = btc_to_taler(&amount.to_signed().unwrap());
- info!(">> {} {} in {} to {}", amount, base32(&wtid), tx_id, addr);
- }
- Ok(row.is_some())
-}
-
-/// Bounce a transaction on the blockchain, return false if no more requested transaction are found
-fn bounce(db: &mut Client, rpc: &mut Rpc, fee: &BtcAmount) -> LoopResult<bool> {
- // We rely on the advisory lock to ensure we are the only one sending transactions
- let row = db.query_opt(
- "SELECT id, bounced FROM bounce WHERE status=$1 ORDER BY _date LIMIT 1",
- &[&(BounceStatus::Requested as i16)],
- )?;
- if let Some(row) = &row {
- let id: i32 = row.get(0);
- let bounced: Txid = sql_txid(row, 1);
- let metadata = OutMetadata::Bounce { bounced };
-
- match rpc.bounce(&bounced, fee, &metadata.encode()) {
- Ok(it) => {
- fail_point("(injected) fail bounce", 0.3)?;
- db.execute(
- "UPDATE bounce SET txid=$1, status=$2 WHERE id=$3",
- &[&it.as_ref(), &(BounceStatus::Sent as i16), &id],
- )?;
- info!("|| {} in {}", &bounced, &it);
- }
- Err(err) => match err {
- rpc::Error::RPC {
- code: ErrorCode::RpcWalletInsufficientFunds | ErrorCode::RpcWalletError,
- msg,
- } => {
- db.execute(
- "UPDATE bounce SET status=$1 WHERE id=$2",
- &[&(BounceStatus::Ignored as i16), &id],
- )?;
- info!("|| (ignore) {} because {}", &bounced, msg);
- }
- e => Err(e)?,
- },
- }
- }
- Ok(row.is_some())
-}
-
/// Retrieve last stored hash
fn last_hash(db: &mut Client) -> Result<BlockHash, postgres::Error> {
let row = db.query_one("SELECT value FROM state WHERE name='last_hash'", &[])?;
@@ -351,28 +285,41 @@ fn sync_chain_removed(
}
}
-/// Sync database with an outgoing transaction
-fn sync_chain_outgoing(
+/// Sync database with an incoming confirmed transaction
+fn sync_chain_incoming_confirmed(
id: &Txid,
- confirmations: i32,
rpc: &mut Rpc,
db: &mut Client,
- state: &WireState,
-) -> LoopResult<()> {
- match rpc
- .get_tx_op_return(id)
- .map(|(full, bytes)| (full, OutMetadata::decode(&bytes)))
- {
- Ok((full, Ok(info))) => match info {
- OutMetadata::Withdraw { wtid, .. } => {
- sync_chain_withdraw(id, &full, &wtid, rpc, db, confirmations, state)?
+) -> Result<(), LoopError> {
+ match rpc.get_tx_segwit_key(id) {
+ Ok((full, reserve_pub)) => {
+ // Store transactions in database
+ let debit_addr = sender_address(rpc, &full)?;
+ let credit_addr = full.details[0].address.as_ref().unwrap();
+ let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time);
+ let amount = btc_to_taler(&full.amount);
+ let nb = db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (reserve_pub) DO NOTHING ", &[
+ &date, &amount.to_string(), &reserve_pub.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref()
+ ])?;
+ if nb > 0 {
+ info!(
+ "<< {} {} in {} from {}",
+ amount,
+ base32(&reserve_pub),
+ id,
+ debit_addr
+ );
}
- OutMetadata::Bounce { bounced } => sync_chain_bounce(id, &bounced, db, confirmations)?,
- },
- Ok((_, Err(e))) => warn!("send: decode-info {} - {}", id, e),
- Err(e) => match e {
- GetOpReturnErr::MissingOpReturn => { /* Ignore */ }
- GetOpReturnErr::RPC(e) => return Err(e)?,
+ }
+ Err(err) => match err {
+ GetSegwitErr::Decode(_) => {
+ // If encoding is wrong request a bounce
+ db.execute(
+ "INSERT INTO bounce (bounced) VALUES ($1) ON CONFLICT (bounced) DO NOTHING",
+ &[&id.as_ref()],
+ )?;
+ }
+ GetSegwitErr::RPC(e) => return Err(e.into()),
},
}
Ok(())
@@ -560,42 +507,95 @@ fn sync_chain_bounce(
Ok(())
}
-/// Sync database with an incoming confirmed transaction
-fn sync_chain_incoming_confirmed(
+/// Sync database with an outgoing transaction
+fn sync_chain_outgoing(
id: &Txid,
+ confirmations: i32,
rpc: &mut Rpc,
db: &mut Client,
-) -> Result<(), LoopError> {
- match rpc.get_tx_segwit_key(id) {
- Ok((full, reserve_pub)) => {
- // Store transactions in database
- let debit_addr = sender_address(rpc, &full)?;
- let credit_addr = full.details[0].address.as_ref().unwrap();
- let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time);
- let amount = btc_to_taler(&full.amount);
- let nb = db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (reserve_pub) DO NOTHING ", &[
- &date, &amount.to_string(), &reserve_pub.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref()
- ])?;
- if nb > 0 {
- info!(
- "<< {} {} in {} from {}",
- amount,
- base32(&reserve_pub),
- id,
- debit_addr
- );
+ state: &WireState,
+) -> LoopResult<()> {
+ match rpc
+ .get_tx_op_return(id)
+ .map(|(full, bytes)| (full, OutMetadata::decode(&bytes)))
+ {
+ Ok((full, Ok(info))) => match info {
+ OutMetadata::Withdraw { wtid, .. } => {
+ sync_chain_withdraw(id, &full, &wtid, rpc, db, confirmations, state)?
}
- }
- Err(err) => match err {
- GetSegwitErr::Decode(_) => {
- // If encoding is wrong request a bounce
+ OutMetadata::Bounce { bounced } => sync_chain_bounce(id, &bounced, db, confirmations)?,
+ },
+ Ok((_, Err(e))) => warn!("send: decode-info {} - {}", id, e),
+ Err(e) => match e {
+ GetOpReturnErr::MissingOpReturn => { /* Ignore */ }
+ GetOpReturnErr::RPC(e) => return Err(e)?,
+ },
+ }
+ Ok(())
+}
+
+/// Send a withdraw transaction on the blockchain, return false if no more requested transaction are found
+fn withdraw(db: &mut Client, rpc: &mut Rpc) -> LoopResult<bool> {
+ // We rely on the advisory lock to ensure we are the only one sending transactions
+ let row = db.query_opt(
+ "SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 ORDER BY _date LIMIT 1",
+ &[&(WithdrawStatus::Requested as i16)],
+ )?;
+ if let Some(row) = &row {
+ let id: i32 = row.get(0);
+ let amount = sql_btc_amount(row, 1);
+ let wtid: [u8; 32] = sql_array(row, 2);
+ let addr = sql_addr(row, 3);
+ let url = sql_url(row, 4);
+ let metadata = OutMetadata::Withdraw { wtid, url };
+
+ let tx_id = rpc.send_op_return(&addr, &amount, &metadata.encode(), false, true)?;
+ fail_point("(injected) fail withdraw", 0.3)?;
+ db.execute(
+ "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3",
+ &[&(WithdrawStatus::Sent as i16), &tx_id.as_ref(), &id],
+ )?;
+ let amount = btc_to_taler(&amount.to_signed().unwrap());
+ info!(">> {} {} in {} to {}", amount, base32(&wtid), tx_id, addr);
+ }
+ Ok(row.is_some())
+}
+
+/// Bounce a transaction on the blockchain, return false if no more requested transaction are found
+fn bounce(db: &mut Client, rpc: &mut Rpc, fee: &BtcAmount) -> LoopResult<bool> {
+ // We rely on the advisory lock to ensure we are the only one sending transactions
+ let row = db.query_opt(
+ "SELECT id, bounced FROM bounce WHERE status=$1 ORDER BY _date LIMIT 1",
+ &[&(BounceStatus::Requested as i16)],
+ )?;
+ if let Some(row) = &row {
+ let id: i32 = row.get(0);
+ let bounced: Txid = sql_txid(row, 1);
+ let metadata = OutMetadata::Bounce { bounced };
+
+ match rpc.bounce(&bounced, fee, &metadata.encode()) {
+ Ok(it) => {
+ fail_point("(injected) fail bounce", 0.3)?;
db.execute(
- "INSERT INTO bounce (bounced) VALUES ($1) ON CONFLICT (bounced) DO NOTHING",
- &[&id.as_ref()],
+ "UPDATE bounce SET txid=$1, status=$2 WHERE id=$3",
+ &[&it.as_ref(), &(BounceStatus::Sent as i16), &id],
)?;
+ info!("|| {} in {}", &bounced, &it);
}
- GetSegwitErr::RPC(e) => return Err(e.into()),
- },
+ Err(err) => match err {
+ rpc::Error::RPC {
+ code: ErrorCode::RpcWalletInsufficientFunds | ErrorCode::RpcWalletError,
+ msg,
+ } => {
+ db.execute(
+ "UPDATE bounce SET status=$1 WHERE id=$2",
+ &[&(BounceStatus::Ignored as i16), &id],
+ )?;
+ info!("|| (ignore) {} because {}", &bounced, msg);
+ }
+ e => Err(e)?,
+ },
+ }
}
- Ok(())
+ Ok(row.is_some())
}
diff --git a/eth-wire/Cargo.toml b/eth-wire/Cargo.toml
@@ -5,6 +5,10 @@ edition = "2021"
license = "AGPL-3.0-or-later"
rust-version = "1.56.1"
+[features]
+# Enable random failures
+fail = []
+
[dependencies]
# Cli args
argh = "0.1.7"
diff --git a/eth-wire/src/fail_point.rs b/eth-wire/src/fail_point.rs
@@ -0,0 +1,31 @@
+/*
+ This file is part of TALER
+ Copyright (C) 2022 Taler Systems SA
+
+ TALER is free software; you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License along with
+ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+*/
+#[derive(Debug, thiserror::Error)]
+#[error("{0}")]
+pub struct Injected(&'static str);
+
+/// Inject random failure when 'fail' feature is used
+#[allow(unused_variables)]
+pub fn fail_point(msg: &'static str, prob: f32) -> Result<(), Injected> {
+ #[cfg(feature = "fail")]
+ return if common::rand::random::<f32>() < prob {
+ Err(Injected(msg))
+ } else {
+ Ok(())
+ };
+
+ return Ok(());
+}
diff --git a/eth-wire/src/loops/worker.rs b/eth-wire/src/loops/worker.rs
@@ -32,8 +32,9 @@ use eth_wire::{
use ethereum_types::{Address, H256, U256, U64};
use crate::{
+ fail_point::fail_point,
sql::{sql_addr, sql_eth_amount, sql_hash},
- LoopResult, WireState,
+ LoopError, LoopResult, WireState,
};
pub fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectDb, state: &WireState) {
@@ -71,6 +72,21 @@ pub fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectDb, state: &WireSt
while iter.next()?.is_some() {}
}
+ // It is not possible to atomically update the blockchain and the database.
+ // When we failed to sync the database and the blockchain state we rely on
+ // sync_chain to recover the lost updates.
+ // When this function is running concurrently, it not possible to known another
+ // execution has failed, and this can lead to a transaction being sent multiple time.
+ // To ensure only a single version of this function is running at a given time we rely
+ // on postgres advisory lock
+
+ // Take the lock
+ let row = db.query_one("SELECT pg_try_advisory_lock(42)", &[])?;
+ let locked: bool = row.get(0);
+ if !locked {
+ return Err(LoopError::Concurrency);
+ }
+
// Sync chain
sync_chain(rpc, db, state, &mut status)?;
@@ -87,13 +103,17 @@ pub fn worker(mut rpc: AutoReconnectRPC, mut db: AutoReconnectDb, state: &WireSt
if let Err(e) = result {
error!("worker: {}", e);
- skip_notification = true;
+ skip_notification = !matches!(
+ e,
+ LoopError::RPC(rpc::Error::RPC { .. }) | LoopError::Concurrency
+ );
} else {
skip_notification = false;
}
}
}
+/// List new and removed transaction since the last sync state, returning a new sync state
fn list_since_block_state(
rpc: &mut Rpc,
address: &Address,
@@ -198,135 +218,9 @@ fn sync_chain(
for (tx, confirmation) in txs {
if tx.to == Some(state.address) && confirmation >= min_confirmations {
- match InMetadata::decode(&tx.input) {
- Ok(metadata) => match metadata {
- InMetadata::Deposit { reserve_pub } => {
- let date = SystemTime::now();
- let amount = eth_to_taler(&tx.value);
- let credit_addr = tx.from.expect("Not coinbase");
- let nb = db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (reserve_pub) DO NOTHING ", &[
- &date, &amount.to_string(), &reserve_pub.as_ref(), ð_payto_url(&credit_addr).as_ref(), &state.config.payto.as_ref()
- ])?;
- if nb > 0 {
- info!(
- "<< {} {} in {} from {}",
- amount,
- base32(&reserve_pub),
- hex::encode(tx.hash),
- hex::encode(credit_addr),
- );
- }
- }
- },
- Err(_) => {
- // If encoding is wrong request a bounce
- db.execute(
- "INSERT INTO bounce (bounced) VALUES ($1) ON CONFLICT (bounced) DO NOTHING",
- &[&tx.hash.as_ref()],
- )?;
- }
- }
+ sync_chain_incoming_confirmed(&tx, db, state)?;
} else if tx.from == Some(state.address) {
- match OutMetadata::decode(&tx.input) {
- Ok(metadata) => match metadata {
- OutMetadata::Withdraw { wtid, .. } => {
- let amount = eth_to_taler(&tx.value);
- let credit_addr = tx.to.unwrap();
- // Get previous out tx
- let row = db.query_opt(
- "SELECT id, status, txid FROM tx_out WHERE wtid=$1 FOR UPDATE",
- &[&wtid.as_ref()],
- )?;
- if let Some(row) = row {
- // If already in database, sync status
- let row_id: i32 = row.get(0);
- let status: i16 = row.get(1);
- match WithdrawStatus::try_from(status as u8).unwrap() {
- WithdrawStatus::Requested => {
- let nb_row = db.execute(
- "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3 AND status=$4",
- &[
- &(WithdrawStatus::Sent as i16),
- &tx.hash.as_ref(),
- &row_id,
- &status,
- ],
- )?;
- if nb_row > 0 {
- warn!(
- ">> (recovered) {} {} in {} to {}",
- amount,
- base32(&wtid),
- hex::encode(tx.hash),
- hex::encode(credit_addr)
- );
- }
- }
- WithdrawStatus::Sent => { /* Status is correct */ }
- }
- } else {
- // Else add to database
- let date = SystemTime::now();
- let nb = db.execute(
- "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING",
- &[&date, &amount.to_string(), &wtid.as_ref(), ð_payto_url(&state.address).as_ref(), ð_payto_url(&credit_addr).as_ref(), &state.config.base_url.as_ref(), &(WithdrawStatus::Sent as i16), &tx.hash.as_ref(), &None::<&[u8]>],
- )?;
- if nb > 0 {
- warn!(
- ">> (onchain) {} {} in {} to {}",
- amount,
- base32(&wtid),
- hex::encode(tx.hash),
- hex::encode(credit_addr)
- );
- }
- }
- }
- OutMetadata::Bounce { bounced } => {
- // Get previous bounce
- let row = db.query_opt(
- "SELECT id, status FROM bounce WHERE bounced=$1",
- &[&bounced.as_ref()],
- )?;
- if let Some(row) = row {
- // If already in database, sync status
- let row_id: i32 = row.get(0);
- let status: i16 = row.get(1);
- match BounceStatus::try_from(status as u8).unwrap() {
- BounceStatus::Requested => {
- let nb_row = db.execute(
- "UPDATE bounce SET status=$1, txid=$2 WHERE id=$3 AND status=$4",
- &[&(BounceStatus::Sent as i16), &tx.hash.as_ref(), &row_id, &status],
- )?;
- if nb_row > 0 {
- warn!(
- "|| (recovered) {} in {}",
- &bounced,
- hex::encode(tx.hash)
- );
- }
- }
- BounceStatus::Ignored => error!(
- "watcher: ignored bounce {} found in chain at {}",
- bounced,
- hex::encode(tx.hash)
- ),
- BounceStatus::Sent => { /* Status is correct */ }
- }
- } else {
- // Else add to database
- let nb = db.execute(
- "INSERT INTO bounce (bounced, txid, status) VALUES ($1, $2, $3) ON CONFLICT (txid) DO NOTHING",
- &[&bounced.as_ref(), &tx.hash.as_ref(), &(BounceStatus::Sent as i16)],
- )?;
- if nb > 0 {
- warn!("|| (onchain) {} in {}", &bounced, hex::encode(tx.hash));
- }
- }
- }
- },
- Err(_) => { /* Ignore */ }
- }
+ sync_chain_outgoing(&tx, db, state)?;
}
}
@@ -420,6 +314,149 @@ fn sync_chain_removed(
}
}
+/// Sync database with an incoming confirmed transaction
+fn sync_chain_incoming_confirmed(
+ tx: &Transaction,
+ db: &mut Client,
+ state: &WireState,
+) -> Result<(), LoopError> {
+ match InMetadata::decode(&tx.input) {
+ Ok(metadata) => match metadata {
+ InMetadata::Deposit { reserve_pub } => {
+ let date = SystemTime::now();
+ let amount = eth_to_taler(&tx.value);
+ let credit_addr = tx.from.expect("Not coinbase");
+ let nb = db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (reserve_pub) DO NOTHING ", &[
+ &date, &amount.to_string(), &reserve_pub.as_ref(), ð_payto_url(&credit_addr).as_ref(), &state.config.payto.as_ref()
+ ])?;
+ if nb > 0 {
+ info!(
+ "<< {} {} in {} from {}",
+ amount,
+ base32(&reserve_pub),
+ hex::encode(tx.hash),
+ hex::encode(credit_addr),
+ );
+ }
+ }
+ },
+ Err(_) => {
+ // If encoding is wrong request a bounce
+ db.execute(
+ "INSERT INTO bounce (bounced) VALUES ($1) ON CONFLICT (bounced) DO NOTHING",
+ &[&tx.hash.as_ref()],
+ )?;
+ }
+ }
+ Ok(())
+}
+
+/// Sync database with an outgoing transaction
+fn sync_chain_outgoing(tx: &Transaction, db: &mut Client, state: &WireState) -> LoopResult<()> {
+ match OutMetadata::decode(&tx.input) {
+ Ok(metadata) => match metadata {
+ OutMetadata::Withdraw { wtid, .. } => {
+ let amount = eth_to_taler(&tx.value);
+ let credit_addr = tx.to.unwrap();
+ // Get previous out tx
+ let row = db.query_opt(
+ "SELECT id, status, txid FROM tx_out WHERE wtid=$1 FOR UPDATE",
+ &[&wtid.as_ref()],
+ )?;
+ if let Some(row) = row {
+ // If already in database, sync status
+ let row_id: i32 = row.get(0);
+ let status: i16 = row.get(1);
+ match WithdrawStatus::try_from(status as u8).unwrap() {
+ WithdrawStatus::Requested => {
+ let nb_row = db.execute(
+ "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3 AND status=$4",
+ &[
+ &(WithdrawStatus::Sent as i16),
+ &tx.hash.as_ref(),
+ &row_id,
+ &status,
+ ],
+ )?;
+ if nb_row > 0 {
+ warn!(
+ ">> (recovered) {} {} in {} to {}",
+ amount,
+ base32(&wtid),
+ hex::encode(tx.hash),
+ hex::encode(credit_addr)
+ );
+ }
+ }
+ WithdrawStatus::Sent => { /* Status is correct */ }
+ }
+ } else {
+ // Else add to database
+ let date = SystemTime::now();
+ let nb = db.execute(
+ "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING",
+ &[&date, &amount.to_string(), &wtid.as_ref(), ð_payto_url(&state.address).as_ref(), ð_payto_url(&credit_addr).as_ref(), &state.config.base_url.as_ref(), &(WithdrawStatus::Sent as i16), &tx.hash.as_ref(), &None::<&[u8]>],
+ )?;
+ if nb > 0 {
+ warn!(
+ ">> (onchain) {} {} in {} to {}",
+ amount,
+ base32(&wtid),
+ hex::encode(tx.hash),
+ hex::encode(credit_addr)
+ );
+ }
+ }
+ }
+ OutMetadata::Bounce { bounced } => {
+ // Get previous bounce
+ let row = db.query_opt(
+ "SELECT id, status FROM bounce WHERE bounced=$1",
+ &[&bounced.as_ref()],
+ )?;
+ if let Some(row) = row {
+ // If already in database, sync status
+ let row_id: i32 = row.get(0);
+ let status: i16 = row.get(1);
+ match BounceStatus::try_from(status as u8).unwrap() {
+ BounceStatus::Requested => {
+ let nb_row = db.execute(
+ "UPDATE bounce SET status=$1, txid=$2 WHERE id=$3 AND status=$4",
+ &[
+ &(BounceStatus::Sent as i16),
+ &tx.hash.as_ref(),
+ &row_id,
+ &status,
+ ],
+ )?;
+ if nb_row > 0 {
+ warn!("|| (recovered) {} in {}", &bounced, hex::encode(tx.hash));
+ }
+ }
+ BounceStatus::Ignored => error!(
+ "watcher: ignored bounce {} found in chain at {}",
+ bounced,
+ hex::encode(tx.hash)
+ ),
+ BounceStatus::Sent => { /* Status is correct */ }
+ }
+ } else {
+ // Else add to database
+ let nb = db.execute(
+ "INSERT INTO bounce (bounced, txid, status) VALUES ($1, $2, $3) ON CONFLICT (txid) DO NOTHING",
+ &[&bounced.as_ref(), &tx.hash.as_ref(), &(BounceStatus::Sent as i16)],
+ )?;
+ if nb > 0 {
+ warn!("|| (onchain) {} in {}", &bounced, hex::encode(tx.hash));
+ }
+ }
+ }
+ },
+ Err(_) => { /* Ignore */ }
+ }
+ Ok(())
+}
+
/// Send a withdraw transaction on the blockchain, return false if no more requested transaction are found
fn withdraw(db: &mut Client, rpc: &mut Rpc, state: &WireState) -> LoopResult<bool> {
// We rely on the advisory lock to ensure we are the only one sending transactions
@@ -434,6 +471,7 @@ fn withdraw(db: &mut Client, rpc: &mut Rpc, state: &WireState) -> LoopResult<boo
let addr = sql_addr(row, 3);
let url = sql_url(row, 4);
let tx_id = rpc.withdraw(state.address, addr, amount, wtid, url)?;
+ fail_point("(injected) fail withdraw", 0.3)?;
db.execute(
"UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3",
&[&(WithdrawStatus::Sent as i16), &tx_id.as_ref(), &id],
@@ -463,6 +501,7 @@ fn bounce(db: &mut Client, rpc: &mut Rpc, fee: U256) -> LoopResult<bool> {
match rpc.bounce(bounced, fee) {
Ok(it) => {
+ fail_point("(injected) fail bounce", 0.3)?;
db.execute(
"UPDATE bounce SET txid=$1, status=$2 WHERE id=$3",
&[&it.as_ref(), &(BounceStatus::Sent as i16), &id],
diff --git a/eth-wire/src/main.rs b/eth-wire/src/main.rs
@@ -26,8 +26,10 @@ use eth_wire::{
taler_util::eth_payto_addr,
};
use ethereum_types::H160;
+use fail_point::Injected;
use loops::{watcher::watcher, worker::worker};
+mod fail_point;
mod loops;
mod sql;
@@ -43,10 +45,10 @@ pub enum LoopError {
RPC(#[from] rpc::Error),
#[error(transparent)]
DB(#[from] postgres::Error),
- #[error("Another btc_wire process is running concurrently")]
+ #[error("Another eth-wire process is running concurrently")]
Concurrency,
- // #[error(transparent)]
- // Injected(#[from] Injected),
+ #[error(transparent)]
+ Injected(#[from] Injected),
}
pub type LoopResult<T> = Result<T, LoopError>;
diff --git a/makefile b/makefile
@@ -23,6 +23,7 @@ test_eth: install
test/eth/wire.sh
test/eth/lifetime.sh
test/eth/reconnect.sh
+ test/eth/stress.sh
test/eth/reorg.sh
test: install test_gateway test_eth test_btc
\ No newline at end of file
diff --git a/test/btc/analysis.sh b/test/btc/analysis.sh
@@ -1,6 +1,6 @@
#!/bin/bash
-## Test btc_wire ability to learn and protect itself from blockchain behavior
+## Test btc-wire ability to learn and protect itself from blockchain behavior
set -eu
@@ -30,7 +30,7 @@ btc_deco
echo -n "Making wire transfer to exchange:"
btc-wire-utils -d $WIRE_DIR transfer 0.042 > /dev/null
-next_btc # Trigger btc_wire
+next_btc # Trigger btc-wire
check_balance 9.95799209 0.04200000
echo " OK"
diff --git a/test/btc/bumpfee.sh b/test/btc/bumpfee.sh
@@ -1,6 +1,6 @@
#!/bin/bash
-## Test btc_wire ability to handle stuck transaction correctly
+## Test btc-wire ability to handle stuck transaction correctly
set -eu
@@ -30,7 +30,7 @@ for n in `$SEQ`; do
btc-wire-utils -d $WIRE_DIR transfer 0.$n > /dev/null
mine_btc # Mine transactions
done
-next_btc # Trigger btc_wire
+next_btc # Trigger btc-wire
check_balance 5.79983389 4.20000000
echo " OK"
@@ -85,7 +85,7 @@ echo " OK"
echo "----- Bump fee stress -----"
-echo -n "Replace btc_wire with stressed btc_wire"
+echo -n "Replace btc-wire with stressed btc-wire"
kill $WIRE_PID
stress_btc_wire
echo " OK"
diff --git a/test/btc/config.sh b/test/btc/config.sh
@@ -1,6 +1,6 @@
#!/bin/bash
-## Test btc_wire ability to configure itself from bitcoin configuration
+## Test btc-wire ability to configure itself from bitcoin configuration
set -eu
diff --git a/test/btc/conflict.sh b/test/btc/conflict.sh
@@ -1,6 +1,6 @@
#!/bin/bash
-## Test btc_wire ability to handle conflicting outgoing transactions
+## Test btc-wire ability to handle conflicting outgoing transactions
set -eu
diff --git a/test/btc/hell.sh b/test/btc/hell.sh
@@ -1,6 +1,6 @@
#!/bin/bash
-## Test btc_wire correctness when a blockchain reorganisation occurs leading to past incoming transaction conflict
+## Test btc-wire correctness when a blockchain reorganisation occurs leading to past incoming transaction conflict
set -eu
@@ -30,7 +30,7 @@ btc_deco
echo -n "Gen incoming transactions:"
btc-wire-utils -d $WIRE_DIR transfer 0.0042 > /dev/null
-next_btc # Trigger btc_wire
+next_btc # Trigger btc-wire
check_balance 9.99579209 0.00420000
echo " OK"
diff --git a/test/btc/lifetime.sh b/test/btc/lifetime.sh
@@ -36,7 +36,7 @@ for n in `$SEQ`; do
btc-wire-utils -d $WIRE_DIR transfer 0.000$n > /dev/null
mine_btc # Mine transactions
done
-next_btc # Trigger btc_wire
+next_btc # Trigger btc-wire
check_balance 9.99826299 0.00165000
for n in `$SEQ`; do
taler-exchange-wire-gateway-client \
diff --git a/test/btc/maxfee.sh b/test/btc/maxfee.sh
@@ -1,6 +1,6 @@
#!/bin/bash
-## Test btc_wire ability to handle stuck transaction correctly
+## Test btc-wire ability to handle stuck transaction correctly
set -eu
@@ -30,7 +30,7 @@ for n in `$SEQ`; do
btc-wire-utils -d $WIRE_DIR transfer 0.$n > /dev/null
mine_btc # Mine transactions
done
-next_btc # Trigger btc_wire
+next_btc # Trigger btc-wire
check_balance 5.79983389 4.20000000
echo " OK"
diff --git a/test/btc/reorg.sh b/test/btc/reorg.sh
@@ -1,6 +1,6 @@
#!/bin/bash
-## Test btc_wire correctness when a blockchain reorganisation occurs
+## Test btc-wire correctness when a blockchain reorganisation occurs
set -eu
@@ -35,7 +35,7 @@ for n in `$SEQ`; do
btc-wire-utils -d $WIRE_DIR transfer 0.000$n > /dev/null
mine_btc # Mine transactions
done
-next_btc # Trigger btc_wire
+next_btc # Trigger btc-wire
check_delta "incoming?delta=-100" "$SEQ" "0.000"
check_balance 9.99826299 0.00165000
echo " OK"
diff --git a/test/btc/stress.sh b/test/btc/stress.sh
@@ -1,6 +1,6 @@
#!/bin/bash
-## Test btc_wire ability to recover from errors in correctness critical paths and prevent concurrent sending
+## Test btc-wire ability to recover from errors in correctness critical paths and prevent concurrent sending
set -eu
@@ -15,7 +15,7 @@ echo "Start database"
setup_db
echo "Start bitcoin node"
init_btc
-echo "Start btc-wire stressed"
+echo "Start stressed btc-wire"
stress_btc_wire
echo "Start gateway"
gateway
@@ -50,9 +50,8 @@ for n in `$SEQ`; do
-C payto://bitcoin/$CLIENT \
-a BTC:0.0000$n > /dev/null
done
-sleep 10 # Give time for btc_wire worker to process
-mine_btc # Mine transactions
-check_balance 9.99605389 0.00373821
+sleep 10 # Give time for btc-wire worker to process
+next_btc # Mine transactions
echo " OK"
echo -n "Requesting exchange outgoing transaction list:"
@@ -63,8 +62,6 @@ echo -n "Check balance:"
check_balance 9.99605389 0.00373821
echo " OK"
-next_btc # Mine transactions
-
echo "----- Recover DB -----"
echo "Reset database"
diff --git a/test/btc/wire.sh b/test/btc/wire.sh
@@ -1,6 +1,6 @@
#!/bin/bash
-## Test btc_wire correctly receive and send transactions on the blockchain
+## Test btc-wire correctly receive and send transactions on the blockchain
set -eu
@@ -30,7 +30,7 @@ for n in `$SEQ`; do
btc-wire-utils -d $WIRE_DIR transfer 0.000$n > /dev/null
mine_btc # Mine transactions
done
-next_btc # Trigger btc_wire
+next_btc # Trigger btc-wire
check_balance 9.95023810 0.04905000
echo " OK"
diff --git a/test/common.sh b/test/common.sh
@@ -179,9 +179,9 @@ function mine_btc() {
function next_btc() {
# Mine enough block to confirm previous transactions
mine_btc ${1:-$CONFIRMATION}
- # Wait for btc_wire to catch up
+ # Wait for btc-wire to catch up
sleep 0.2
- # Mine one more block to trigger btc_wire
+ # Mine one more block to trigger btc-wire
mine_btc
}
@@ -214,7 +214,7 @@ function btc_wire() {
WIRE_PID="$!"
}
-# Start multiple btc_wire with random failures in parallel
+# Start multiple btc-wire with random failures in parallel
function stress_btc_wire() {
cargo build --bin btc-wire --release --features fail &>> log/cargo.log
target/release/btc-wire $CONF &>> log/wire.log &
@@ -329,6 +329,13 @@ function eth_wire() {
WIRE_PID="$!"
}
+# Start multiple eth-wire with random failures in parallel
+function stress_eth_wire() {
+ cargo build --bin eth-wire --release --features fail &>> log/cargo.log
+ target/release/eth-wire $CONF &>> log/wire.log &
+ target/release/eth-wire $CONF &>> log/wire1.log &
+}
+
# Mine ethereum blocks
function mine_eth() {
$WIRE_UTILS mine $RESERVE ${1:-}
@@ -346,7 +353,7 @@ function next_eth() {
# ----- Gateway ------ #
-# Start wire_gateway in test mode
+# Start wire-gateway in test mode
function gateway() {
cargo build --bin wire-gateway --release --features test &> /dev/null
target/release/wire-gateway $CONF &>> log/gateway.log &
diff --git a/test/eth/stress.sh b/test/eth/stress.sh
@@ -0,0 +1,112 @@
+#!/bin/bash
+
+## Test eth-wire ability to recover from errors in correctness critical paths and prevent concurrent sending
+
+set -eu
+
+source "${BASH_SOURCE%/*}/../common.sh"
+SCHEMA=eth.sql
+CONFIG=taler_eth.conf
+
+echo "----- Setup -----"
+echo "Load config file"
+load_config
+echo "Start database"
+setup_db
+echo "Start ethereum node"
+init_eth
+echo "Start stressed eth-wire"
+stress_eth_wire
+echo "Start gateway"
+gateway
+echo ""
+
+SEQ="seq 10 30"
+
+echo "----- Handle incoming -----"
+
+echo -n "Making wire transfer to exchange:"
+eth-wire-utils -d $WIRE_DIR deposit $CLIENT $WIRE 0.000 `$SEQ`
+next_eth # Trigger eth-wire
+echo " OK"
+
+echo -n "Requesting exchange incoming transaction list:"
+check_delta "incoming?delta=-100" "$SEQ" "0.000"
+echo " OK"
+
+echo -n "Check balance:"
+check_balance_eth 999580000 420000
+echo " OK"
+
+echo "----- Handle outgoing -----"
+
+echo -n "Making wire transfer from exchange:"
+for n in `$SEQ`; do
+ taler-exchange-wire-gateway-client \
+ -b $BANK_ENDPOINT \
+ -C payto://ethereum/$CLIENT \
+ -a ETH:0.0000$n > /dev/null
+done
+sleep 1
+next_eth # Mine transactions
+echo " OK"
+
+echo -n "Requesting exchange's outgoing transaction list:"
+check_delta "outgoing?delta=-100" "$SEQ"
+echo " OK"
+
+echo -n "Check balance:"
+check_balance_eth 999622000 378000
+echo " OK"
+
+echo "----- Recover DB -----"
+
+echo "Reset database"
+reset_db
+mine_eth 1 # Trigger worker
+sleep 10
+
+echo -n "Requesting exchange incoming transaction list:"
+check_delta "incoming?delta=-100" "$SEQ" "0.000"
+echo " OK"
+
+echo -n "Requesting exchange outgoing transaction list:"
+check_delta "outgoing?delta=-100" "$SEQ"
+echo " OK"
+
+echo -n "Balance have not changed:"
+check_balance_eth 999622000 378000
+echo " OK"
+
+echo "----- Handle bounce -----"
+
+echo -n "Bounce:"
+eth-wire-utils -d $WIRE_DIR send $CLIENT $WIRE 0.000 `$SEQ`
+sleep 1
+next_eth
+echo " OK"
+
+echo -n "Check balance:"
+check_balance_eth 999601000 399000
+echo " OK"
+
+echo "----- Recover DB -----"
+
+echo "Reset database"
+reset_db
+mine_eth 1 # Trigger worker
+sleep 10
+
+echo -n "Requesting exchange incoming transaction list:"
+check_delta "incoming?delta=-100" "$SEQ" "0.000"
+echo " OK"
+
+echo -n "Requesting exchange outgoing transaction list:"
+check_delta "outgoing?delta=-100" "$SEQ"
+echo " OK"
+
+echo -n "Balance have not changed:"
+check_balance_eth 999601000 399000
+echo " OK"
+
+echo "All tests passed!"
+\ No newline at end of file
diff --git a/test/gateway/api.sh b/test/gateway/api.sh
@@ -1,6 +1,6 @@
#!/bin/bash
-## Test wire_gateway conformance to documentation and its security
+## Test wire-gateway conformance to documentation and its security
set -eu
diff --git a/wire-gateway/README.md b/wire-gateway/README.md
@@ -9,7 +9,7 @@ Rust server for [Taler Wire Gateway HTTP API](https://docs.taler.net/core/api-wi
## Configuration
``` ini
-# taler.conf - wire_gateway config
+# taler.conf - wire-gateway config
[depolymerizer-___]
PORT = 8080
UNIXPATH =