depolymerization

wire gateway for Bitcoin/Ethereum
Log | Files | Refs | Submodules | README | LICENSE

commit 32fc54e7fc2db2e4f68d7bf96ab0e46c66d0de6e
parent 0412d20a31c0b49199f4d418611d40f512aa368a
Author: Antoine A <>
Date:   Tue, 25 Jan 2022 14:34:45 +0100

Break long function in two and other improvements

Diffstat:
MCargo.lock | 4++--
MREADME.md | 4++--
Mbtc-wire/src/loops.rs | 2+-
Dbtc-wire/src/loops/listener.rs | 35-----------------------------------
Abtc-wire/src/loops/watcher.rs | 35+++++++++++++++++++++++++++++++++++
Mbtc-wire/src/loops/worker.rs | 325+++++++++++++++++++++++++++++++++++++++++++-------------------------------------
Mbtc-wire/src/main.rs | 16++++++++--------
Adocs/presentation.tex | 120+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
8 files changed, 343 insertions(+), 198 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -799,9 +799,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.113" +version = "0.2.114" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eef78b64d87775463c549fbd80e19249ef436ea3bf1de2a1eb7e717ec7fab1e9" +checksum = "b0005d08a8f7b65fb8073cb697aa0b12b631ed251ce73d862ce50eeb52ce3b50" [[package]] name = "listenfd" diff --git a/README.md b/README.md @@ -51,7 +51,7 @@ Modules have specific configuration: ``` ┌─────┐ ┌────────────────┐ ┌──────────┐ - │Taler├───┤Depolymerization├───┤Blockchain│ + │Taler│◄─►│Depolymerization│◄─►│Blockchain│ └─────┘ └────────────────┘ └──────────┘ ``` @@ -59,6 +59,6 @@ Modules have specific configuration: ``` ┌────────────┐ ┌──────────┐ ┌────────┐ - │wire_gateway├───┤PostgreSQL├───┤###_wire│ + │wire_gateway│◄─►│PostgreSQL│◄─►│###_wire│ └────────────┘ └──────────┘ └────────┘ ``` \ No newline at end of file diff --git a/btc-wire/src/loops.rs b/btc-wire/src/loops.rs @@ -19,7 +19,7 @@ use btc_wire::rpc; use crate::fail_point::Injected; pub mod analysis; -pub mod listener; +pub mod watcher; pub mod worker; #[derive(Debug, thiserror::Error)] diff --git a/btc-wire/src/loops/listener.rs b/btc-wire/src/loops/listener.rs @@ -1,35 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2022 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ -use taler_common::log::log::error; - -use crate::reconnect::{AutoReconnectRPC, AutoReconnectSql}; - -use super::LoopResult; - -/// Wait for new block and notify arrival with postgreSQL notifications -pub fn block_listener(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql) { - loop { - let rpc = rpc.client(); - let db = db.client(); - let result: LoopResult<()> = (|| loop { - rpc.wait_for_new_block(0)?; - db.execute("NOTIFY new_block", &[])?; - })(); - if let Err(e) = result { - error!("listener: {}", e); - } - } -} diff --git a/btc-wire/src/loops/watcher.rs b/btc-wire/src/loops/watcher.rs @@ -0,0 +1,35 @@ +/* + This file is part of TALER + Copyright (C) 2022 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ +use taler_common::log::log::error; + +use crate::reconnect::{AutoReconnectRPC, AutoReconnectSql}; + +use super::LoopResult; + +/// Wait for new block and notify arrival with postgreSQL notifications +pub fn watcher(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql) { + loop { + let rpc = rpc.client(); + let db = db.client(); + let result: LoopResult<()> = (|| loop { + rpc.wait_for_new_block(0)?; + db.execute("NOTIFY new_block", &[])?; + })(); + if let Err(e) = result { + error!("watcher: {}", e); + } + } +} diff --git a/btc-wire/src/loops/worker.rs b/btc-wire/src/loops/worker.rs @@ -22,7 +22,7 @@ use std::{ use bitcoin::{hashes::Hash, Amount as BtcAmount, BlockHash, Txid}; use btc_wire::{ - rpc::{self, BtcRpc, Category, ErrorCode}, + rpc::{self, BtcRpc, Category, ErrorCode, TransactionFull}, rpc_utils::sender_address, GetOpReturnErr, GetSegwitErr, }; @@ -142,7 +142,7 @@ fn send(db: &mut Client, rpc: &mut BtcRpc, status: TxStatus) -> LoopResult<bool> assert!(status == TxStatus::Delayed || status == TxStatus::Requested); // We rely on the advisory lock to ensure we are the only one sending transactions let row = db.query_opt( - "SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 LIMIT 1", + "SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 ORDER BY _date LIMIT 1", &[&(status as i16)], )?; if let Some(row) = &row { @@ -186,7 +186,7 @@ fn bounce( assert!(status == BounceStatus::Delayed || status == BounceStatus::Requested); // We rely on the advisory lock to ensure we are the only one sending transactions let row = db.query_opt( - "SELECT id, bounced FROM bounce WHERE status=$1 LIMIT 1", + "SELECT id, bounced FROM bounce WHERE status=$1 ORDER BY _date LIMIT 1", &[&(status as i16)], )?; if let Some(row) = &row { @@ -405,168 +405,193 @@ fn sync_chain_outgoing( { Ok((full, Ok(info))) => match info { Info::Transaction { wtid, .. } => { - let credit_addr = full.details[0].address.as_ref().unwrap(); - let amount = btc_to_taler(&full.amount); - - if confirmations < 0 { - if full.replaced_by_txid.is_none() { - // Handle conflicting tx - let nb_row = db.execute( - "UPDATE tx_out SET status=$1, txid=NULL where txid=$2", - &[&(TxStatus::Delayed as i16), &id.as_ref()], - )?; - if nb_row > 0 { - warn!( - ">> (conflict) {} in {} to {}", - base32(&wtid), - id, - credit_addr - ); - } - } - } else { - // Get previous out tx - let row = db.query_opt( - "SELECT id, status, txid FROM tx_out WHERE wtid=$1 FOR UPDATE", - &[&wtid.as_ref()], + sync_chain_outgoing_send(id, &full, &wtid, rpc, db, confirmations, config)? + } + Info::Bounce { bounced } => { + sync_chain_outgoing_bounce(id, &bounced, db, confirmations)? + } + }, + Ok((_, Err(e))) => warn!("send: decode-info {} - {}", id, e), + Err(e) => match e { + GetOpReturnErr::MissingOpReturn => { /* Ignore */ } + GetOpReturnErr::RPC(e) => return Err(e)?, + }, + } + Ok(()) +} + +/// Sync database with an outgoing send transaction +fn sync_chain_outgoing_send( + id: &Txid, + full: &TransactionFull, + wtid: &[u8; 32], + rpc: &mut BtcRpc, + db: &mut Client, + confirmations: i32, + config: &Config, +) -> LoopResult<()> { + let credit_addr = full.details[0].address.as_ref().unwrap(); + let amount = btc_to_taler(&full.amount); + + if confirmations < 0 { + if full.replaced_by_txid.is_none() { + // Handle conflicting tx + let nb_row = db.execute( + "UPDATE tx_out SET status=$1, txid=NULL where txid=$2", + &[&(TxStatus::Delayed as i16), &id.as_ref()], + )?; + if nb_row > 0 { + warn!( + ">> (conflict) {} in {} to {}", + base32(wtid), + id, + credit_addr + ); + } + } + } else { + // Get previous out tx + let row = db.query_opt( + "SELECT id, status, txid FROM tx_out WHERE wtid=$1 FOR UPDATE", + &[&wtid.as_ref()], + )?; + if let Some(row) = row { + // If already in database, sync status + let row_id: i32 = row.get(0); + let status: i16 = row.get(1); + match TxStatus::try_from(status as u8).unwrap() { + TxStatus::Requested | TxStatus::Delayed => { + let nb_row = db.execute( + "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3 AND status=$4", + &[&(TxStatus::Sent as i16), &id.as_ref(), &row_id, &status], )?; - if let Some(row) = row { - // If already in database sync status - let row_id: i32 = row.get(0); - let status: i16 = row.get(1); - match TxStatus::try_from(status as u8).unwrap() { - TxStatus::Requested | TxStatus::Delayed => { - let nb_row = db.execute( - "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3 AND status=$4", - &[&(TxStatus::Sent as i16), &id.as_ref(), &row_id, &status], - )?; - if nb_row > 0 { - warn!( - ">> (recovered) {} {} in {} to {}", - amount, - base32(&wtid), - id, - credit_addr - ); - } - } - TxStatus::Sent => { - if let Some(txid) = full.replaces_txid { - let stored_id = sql_txid(&row, 2); - if txid == stored_id { - let nb_row = db.execute( - "UPDATE tx_out SET txid=$1 WHERE txid=$2", - &[&id.as_ref(), &txid.as_ref()], - )?; - if nb_row > 0 { - info!( - ">> (recovered) {} replace {} with {}", - base32(&wtid), - txid, - id - ); - } - } - } - } - } - } else { - // Else add to database - let debit_addr = sender_address(rpc, &full)?; - let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time); - let nb = db.execute( - "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING", - &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref(), &config.base_url.as_ref(), &(TxStatus::Sent as i16), &id.as_ref(), &None::<&[u8]>], - )?; - if nb > 0 { - warn!( - ">> (onchain) {} {} in {} to {}", - amount, - base32(&wtid), - id, - credit_addr - ); - } + if nb_row > 0 { + warn!( + ">> (recovered) {} {} in {} to {}", + amount, + base32(wtid), + id, + credit_addr + ); } - - if let Some(delay) = config.bump_delay { - if confirmations == 0 && full.replaced_by_txid.is_none() { - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs(); - if now - full.time > delay as u64 { - let bump = rpc.bump_fee(id)?; - fail_point("(injected) fail bump", 0.3)?; - db.execute( - "UPDATE tx_out SET txid=$1 WHERE txid=$2", - &[&bump.txid.as_ref(), &id.as_ref()], - )?; + } + TxStatus::Sent => { + if let Some(txid) = full.replaces_txid { + let stored_id = sql_txid(&row, 2); + if txid == stored_id { + let nb_row = db.execute( + "UPDATE tx_out SET txid=$1 WHERE txid=$2", + &[&id.as_ref(), &txid.as_ref()], + )?; + if nb_row > 0 { info!( - ">> (bump) {} replace {} with {}", - base32(&wtid), - id, - bump.txid + ">> (recovered) {} replace {} with {}", + base32(wtid), + txid, + id ); } } } } } - Info::Bounce { bounced } => { - if confirmations < 0 { - // Handle conflicting tx + } else { + // Else add to database + let debit_addr = sender_address(rpc, &full)?; + let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time); + let nb = db.execute( + "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING", + &[&date, &amount.to_string(), &wtid.as_ref(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(credit_addr).as_ref(), &config.base_url.as_ref(), &(TxStatus::Sent as i16), &id.as_ref(), &None::<&[u8]>], + )?; + if nb > 0 { + warn!( + ">> (onchain) {} {} in {} to {}", + amount, + base32(wtid), + id, + credit_addr + ); + } + } + + if let Some(delay) = config.bump_delay { + if confirmations == 0 && full.replaced_by_txid.is_none() { + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(); + if now - full.time > delay as u64 { + let bump = rpc.bump_fee(id)?; + fail_point("(injected) fail bump", 0.3)?; + db.execute( + "UPDATE tx_out SET txid=$1 WHERE txid=$2", + &[&bump.txid.as_ref(), &id.as_ref()], + )?; + info!( + ">> (bump) {} replace {} with {}", + base32(wtid), + id, + bump.txid + ); + } + } + } + } + Ok(()) +} + +/// Sync database with an outgoing bounce transaction +fn sync_chain_outgoing_bounce( + id: &Txid, + bounced: &Txid, + db: &mut Client, + confirmations: i32, +) -> LoopResult<()> { + if confirmations < 0 { + // Handle conflicting tx + let nb_row = db.execute( + "UPDATE bounce SET status=$1, txid=NULL where txid=$2", + &[&(BounceStatus::Delayed as i16), &id.as_ref()], + )?; + if nb_row > 0 { + warn!("|| (conflict) {} in {}", &bounced, &id); + } + } else { + // Get previous bounce + let row = db.query_opt( + "SELECT id, status FROM bounce WHERE bounced=$1", + &[&bounced.as_ref()], + )?; + if let Some(row) = row { + // If already in database, sync status + let row_id: i32 = row.get(0); + let status: i16 = row.get(1); + match BounceStatus::try_from(status as u8).unwrap() { + BounceStatus::Requested | BounceStatus::Delayed => { let nb_row = db.execute( - "UPDATE bounce SET status=$1, txid=NULL where txid=$2", - &[&(BounceStatus::Delayed as i16), &id.as_ref()], + "UPDATE bounce SET status=$1, txid=$2 WHERE id=$3 AND status=$4", + &[&(BounceStatus::Sent as i16), &id.as_ref(), &row_id, &status], )?; if nb_row > 0 { - warn!("|| (conflict) {} in {}", &bounced, &id); - } - } else { - // Get previous bounce - let row = db.query_opt( - "SELECT id, status FROM bounce WHERE bounced=$1", - &[&bounced.as_ref()], - )?; - if let Some(row) = row { - // If already in database sync status - let row_id: i32 = row.get(0); - let status: i16 = row.get(1); - match BounceStatus::try_from(status as u8).unwrap() { - BounceStatus::Requested | BounceStatus::Delayed => { - let nb_row = db.execute( - "UPDATE bounce SET status=$1, txid=$2 WHERE id=$3 AND status=$4", - &[&(BounceStatus::Sent as i16), &id.as_ref(), &row_id, &status], - )?; - if nb_row > 0 { - warn!("|| (recovered) {} in {}", &bounced, &id); - } - } - BounceStatus::Ignored => error!( - "watcher: ignored bounce {} found in chain at {}", - bounced, id - ), - BounceStatus::Sent => { /* Status is correct */ } - } - } else { - // Else add to database - let nb = db.execute( - "INSERT INTO bounce (bounced, txid, status) VALUES ($1, $2, $3) ON CONFLICT (txid) DO NOTHING", - &[&bounced.as_ref(), &id.as_ref(), &(BounceStatus::Sent as i16)], - )?; - if nb > 0 { - warn!("|| (onchain) {} in {}", &bounced, &id); - } + warn!("|| (recovered) {} in {}", &bounced, &id); } } + BounceStatus::Ignored => error!( + "watcher: ignored bounce {} found in chain at {}", + bounced, id + ), + BounceStatus::Sent => { /* Status is correct */ } } - }, - Ok((_, Err(e))) => warn!("send: decode-info {} - {}", id, e), - Err(e) => match e { - GetOpReturnErr::MissingOpReturn => { /* Ignore */ } - GetOpReturnErr::RPC(e) => return Err(e)?, - }, + } else { + // Else add to database + let nb = db.execute( + "INSERT INTO bounce (bounced, txid, status) VALUES ($1, $2, $3) ON CONFLICT (txid) DO NOTHING", + &[&bounced.as_ref(), &id.as_ref(), &(BounceStatus::Sent as i16)], + )?; + if nb > 0 { + warn!("|| (onchain) {} in {}", &bounced, &id); + } + } } Ok(()) } diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs @@ -23,7 +23,7 @@ use reconnect::{AutoReconnectRPC, AutoReconnectSql}; use std::{sync::atomic::AtomicU16, thread::JoinHandle}; use taler_common::{config::Config, log::log::info}; -use crate::loops::{analysis::analysis, listener::block_listener, worker::worker}; +use crate::loops::{analysis::analysis, watcher::watcher, worker::worker}; mod fail_point; mod info; @@ -72,27 +72,27 @@ fn main() { let mut rpc = BtcRpc::common(&btc_config).unwrap(); rpc.load_wallet(WIRE_WALLET_NAME).ok(); - let rpc_listener = AutoReconnectRPC::new(btc_config.clone(), WIRE_WALLET_NAME); + let rpc_watcher = AutoReconnectRPC::new(btc_config.clone(), WIRE_WALLET_NAME); let rpc_analysis = AutoReconnectRPC::new(btc_config.clone(), WIRE_WALLET_NAME); let rpc_worker = AutoReconnectRPC::new(btc_config, WIRE_WALLET_NAME); - let db_listener = AutoReconnectSql::new(&config.db_url); + let db_watcher = AutoReconnectSql::new(&config.db_url); let db_analysis = AutoReconnectSql::new(&config.db_url); let db_worker = AutoReconnectSql::new(&config.db_url); - named_spawn("listener".to_string(), move || { - block_listener(rpc_listener, db_listener) + named_spawn("watcher", move || { + watcher(rpc_watcher, db_watcher) }); - named_spawn("analysis".to_string(), move || { + named_spawn("analysis", move || { analysis(rpc_analysis, db_analysis, config, state) }); worker(rpc_worker, db_worker, config, state); } -pub fn named_spawn<F, T>(name: String, f: F) -> JoinHandle<T> +pub fn named_spawn<F, T>(name: impl Into<String>, f: F) -> JoinHandle<T> where F: FnOnce() -> T, F: Send + 'static, T: Send + 'static, { - std::thread::Builder::new().name(name).spawn(f).unwrap() + std::thread::Builder::new().name(name.into()).spawn(f).unwrap() } diff --git a/docs/presentation.tex b/docs/presentation.tex @@ -0,0 +1,119 @@ +\documentclass{beamer} + +\usetheme{default} + +\usepackage{tikz} +\usetikzlibrary{positioning} + +\title{Depolymerization WIP} +\subtitle{TODO} +\author{TODO} +\institute{TODO} +\date{\today} + +\begin{document} + +\begin{frame} + \titlepage +\end{frame} + +\begin{frame}{Outline} + \tableofcontents +\end{frame} + +\begin{frame}{Introduction}{Blockchain} + \begin{block}{Common blockchain limitation} + \begin{itemize} + \item \textbf{Slow} limited amount of transaction per seconds + \item \textbf{Delayed} block and confirmation delay + \item \textbf{Expensive} transaction fees + \item \textbf{Privacy} pseudonym and noise + \item \textbf{Compatibility} multiple blockchain and transactions format + \end{itemize} + \end{block} + + \begin{block}{Biggest cryptocurrencies} + \begin{itemize} + \item \textbf{BTC} Bitcoin + \item \textbf{ETH} Ethereum + \end{itemize} + \end{block} +\end{frame} + +\begin{frame}{Introduction}{Related work} + \begin{block}{Centralization - Coinbase's off-chain sending} + \begin{itemize} + \item [$+$] Fast and cheap: off chain transaction + \item [$-$] Trust in Coinbase: privacy, security \& transparency + \end{itemize} + \end{block} + \begin{block}{Layering - Lightning Network} + \begin{itemize} + \item [$+$] Fast and cheap: off chain transaction + \item [$-$] Incompatible with Bitcoin: require setting up a Bidirectional Payment Channels + \item [$-$] Fraud attempts are mitigated via a complex penalty system + \end{itemize} + \end{block} +\end{frame} + +\begin{frame}{Introduction}{Taler} + \begin{itemize} + \item [$-$] Trust exchange operator or auditors + \item [$+$] Fast and cheap, no blockchain + \item [$+$] Privacy when it can, transparency when it must (avoid tax evasion and money laundering) + \item [$+$] Currency agnostic + \end{itemize} +\end{frame} + +\begin{frame}{Architecture}{Taler bridge to blockchain} + \begin{center} + \begin{tikzpicture}[ + rect/.style={rectangle, draw=black}, + sym/.style={<->, shorten >= 2pt, shorten <= 2pt} + ] + \node[rect](1) {Taler}; + \node[rect, right= of 1](2) {Depolymerization}; + \node[rect, right= of 2](3) {Blockchain}; + + \draw[sym] (1) -- (2); + \draw[sym] (2) -- (3); + \end{tikzpicture} + \end{center} +\end{frame} + +\begin{frame}{Architecture}{depolymerization} + \begin{center} + \begin{tikzpicture}[ + rect/.style={rectangle, draw=black}, + sym/.style={<->, shorten >= 2pt, shorten <= 2pt} + ] + \node[rect](1) {Taler}; + \node[rect, below= of 1](2) {wire\_gateway}; + \node[rect, right= of 2](3) {PostgreSQL}; + \node[rect, right= of 3](4) {btc\_wire}; + \node[rect, above= of 4](5) {bitcoind}; + + \draw[sym] (1) -- node [midway, right] {\tiny HTTP} (2); + \draw[sym] (2) -- node [midway, above] {\tiny SQL} (3); + \draw[sym] (3) -- node [midway, above] {\tiny SQL} (4); + \draw[sym] (4) -- node [midway, left] {\tiny RPC} (5); + \end{tikzpicture} + \end{center} +\end{frame} + +\begin{frame}{btc\_wire}{Architecture} + Three loops + Watcher: Notify other loops when a new block has been mined + Analyzer: Analyse blockchain to adapt defensively + Worker: Sync chain -> Send -> Bounce +\end{frame} + +\begin{frame}{btc\_wire}{Features} + \begin{itemize} + \item Handle stuck transactions + \item Fork resilient + \item Adaptive confirmation + \end{itemize} +\end{frame} + +\end{document} +\ No newline at end of file