depolymerization

wire gateway for Bitcoin/Ethereum
Log | Files | Refs | Submodules | README | LICENSE

commit 3429547dca06e9dcc8140581056158163e0ef633
parent 2ad5eb9d0cbc24e0c2c5d9ead73d4fbbd2515deb
Author: Antoine A <>
Date:   Wed,  2 Feb 2022 20:03:30 +0100

Cleanup

Diffstat:
Dbtc-wire/src/bin/btc-test.rs | 398-------------------------------------------------------------------------------
Mbtc-wire/src/bin/btc-wire-cli.rs | 6+++---
Mbtc-wire/src/bin/btc-wire-utils.rs | 10+++++-----
Dbtc-wire/src/info.rs | 128-------------------------------------------------------------------------------
Mbtc-wire/src/lib.rs | 4++--
Mbtc-wire/src/loops.rs | 2+-
Mbtc-wire/src/loops/worker.rs | 35+++++++++++++++++------------------
Mbtc-wire/src/main.rs | 6+++---
Abtc-wire/src/metadata.rs | 128+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mbtc-wire/src/reconnect.rs | 10+++++-----
Mbtc-wire/src/rpc.rs | 4++--
Mbtc-wire/src/rpc_utils.rs | 4++--
Deth-wire/src/bin/eth-test.rs | 140-------------------------------------------------------------------------------
Aeth-wire/src/loops.rs | 18++++++++++++++++++
Aeth-wire/src/loops/watcher.rs | 25+++++++++++++++++++++++++
Aeth-wire/src/loops/worker.rs | 150+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Meth-wire/src/main.rs | 184+------------------------------------------------------------------------------
Meth-wire/src/rpc.rs | 2+-
Aeth-wire/src/sql.rs | 41+++++++++++++++++++++++++++++++++++++++++
Meth-wire/src/taler_util.rs | 15+++++++++++++++
Mtaler-common/src/config.rs | 2+-
21 files changed, 421 insertions(+), 891 deletions(-)

diff --git a/btc-wire/src/bin/btc-test.rs b/btc-wire/src/bin/btc-test.rs @@ -1,398 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2022 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ -use core::panic; -use std::{collections::HashSet, iter::repeat_with, panic::AssertUnwindSafe}; - -use bitcoin::{Address, Amount, Network, Txid}; -use btc_wire::{ - config::BitcoinConfig, - rpc::{self, BtcRpc, Category}, - rpc_utils::{default_data_dir, CLIENT, WIRE}, -}; -use taler_common::rand_slice; - -const RESERVE: &str = "reserve"; - -/// Instrumentation test -pub fn main() { - let test_amount = Amount::from_sat(1500); - let data_dir = default_data_dir(); - let config = BitcoinConfig::load(&data_dir).unwrap(); - // Network check - match config.network { - Network::Bitcoin => { - panic!("Do not run tests on the mainnet, you are going to loose money") - } - Network::Testnet | Network::Signet => { - println!("Running on testnet, slow network mining") - } - Network::Regtest => println!("Running on regtest, fast manual mining"), - } - - // Wallet check - { - let existing_wallets: HashSet<String> = std::fs::read_dir(config.dir.join("wallets")) - .unwrap() - .filter_map(|it| it.ok()) - .map(|it| it.file_name().to_string_lossy().to_string()) - .collect(); - - let wallets = [CLIENT, WIRE, RESERVE]; - - let mut rpc = BtcRpc::common(&config).unwrap(); - if !existing_wallets.contains(CLIENT) - || !existing_wallets.contains(WIRE) - || !existing_wallets.contains(RESERVE) - { - println!("Generate tests wallets"); - // Create wallets - for wallet in &wallets { - rpc.create_wallet(wallet).unwrap(); - } - } - - // Load wallets - for wallet in &wallets { - if let Err(e) = rpc.load_wallet(wallet) { - match e { - rpc::Error::RPC { code, .. } - if code == rpc::ErrorCode::RpcWalletAlreadyLoaded => {} - e => Err(e).unwrap(), - } - } - } - } - - // Client initialization - let mut client_rpc = BtcRpc::wallet(&config, CLIENT).unwrap(); - let mut wire_rpc = BtcRpc::wallet(&config, WIRE).unwrap(); - let mut reserve_rpc = BtcRpc::wallet(&config, RESERVE).unwrap(); - let client_addr = client_rpc.get_new_address().unwrap(); - let wire_addr = wire_rpc.get_new_address().unwrap(); - let reserve_addr = reserve_rpc.get_new_address().unwrap(); - - let next_block = |reserve_rpc: &mut BtcRpc| { - match config.network { - Network::Regtest => { - // Manually mine a block - reserve_rpc.generate(1, &reserve_addr).unwrap(); - } - _ => { - // Wait for next network block - reserve_rpc.wait_for_new_block(0).ok(); - } - } - }; - - let wait_for_tx = |rpc: &mut BtcRpc, reserve_rpc: &mut BtcRpc, txs: &[Txid]| { - let mut count = 0; - while txs - .iter() - .any(|id| rpc.get_tx(id).unwrap().confirmations <= 0) - { - next_block(reserve_rpc); - if count > 3 { - panic!("Transaction no sended after 4 blocks"); - } - count += 1; - } - }; - - // Balance check - - { - // Transfer all wire money to client - let wire_balance = wire_rpc.get_balance().unwrap(); - wire_rpc.send(&client_addr, &wire_balance, true).ok(); - // Transfer all wire money to client - let reserve_balance = reserve_rpc.get_balance().unwrap(); - reserve_rpc.send(&client_addr, &reserve_balance, true).ok(); - next_block(&mut reserve_rpc); - - let balance = client_rpc.get_balance().unwrap(); - let min_balance = test_amount * 3; - if balance < min_balance { - println!( - "Client wallet have only {}, {} are required to perform the test", - balance, min_balance - ); - match config.network { - Network::Bitcoin | Network::Testnet | Network::Signet => { - if config.network == Network::Bitcoin { - println!("Send coins to this address: {}", client_addr); - } else { - println!("Request coins from a faucet such as https://bitcoinfaucet.uo1.net/send.php to this address: {}", client_addr); - } - println!("Waiting for the transaction..."); - while client_rpc.get_balance().unwrap() < min_balance { - client_rpc.wait_for_new_block(0).ok(); - } - } - Network::Regtest => { - println!("Add 50B to client wallet"); - client_rpc - .generate(101 /* Need 100 blocks to validate */, &reserve_addr) - .unwrap(); - reserve_rpc - .send(&client_addr, &Amount::from_sat(5_000_000_000), true) - .unwrap(); - next_block(&mut reserve_rpc); - } - } - } - - println!( - "Initial state:\n{} {}\n{} {}", - WIRE, - wire_rpc.get_balance().unwrap(), - CLIENT, - client_rpc.get_balance().unwrap() - ); - } - - let mut runner = TestRunner::new(); - - runner.test("OpReturn metadata", || { - // Send metadata - let msg = "J'aime le chocolat".as_bytes(); - let id = client_rpc - .send_op_return(&wire_addr, &test_amount, msg, false, false) - .unwrap(); - // Check in mempool - assert!( - tx_exist(&mut client_rpc, &id, 0, Category::Send).unwrap(), - "Not in mempool" - ); - // Check mined - wait_for_tx(&mut client_rpc, &mut reserve_rpc, &[id]); - assert!( - tx_exist(&mut wire_rpc, &id, 1, Category::Receive).unwrap(), - "Not mined" - ); - // Check extract - let (_, extracted) = wire_rpc.get_tx_op_return(&id).unwrap(); - assert_eq!(msg, extracted, "Corrupted metadata"); - }); - runner.test("SegWit metadata", || { - // Send metadata - let key = rand_slice(); - let id = client_rpc - .send_segwit_key(&wire_addr, &test_amount, &key) - .unwrap(); - // Check in mempool - assert!( - tx_exist(&mut client_rpc, &id, 0, Category::Send).unwrap(), - "Not in mempool" - ); - // Check mined - wait_for_tx(&mut client_rpc, &mut reserve_rpc, &[id]); - assert!( - tx_exist(&mut wire_rpc, &id, 1, Category::Receive).unwrap(), - "Not mined" - ); - // Check extract - let (_, extracted) = wire_rpc.get_tx_segwit_key(&id).unwrap(); - assert_eq!(key, extracted, "Corrupted metadata"); - }); - - let bounce_fee = Amount::from_sat(300); - runner.test("Bounce simple", || { - let before = client_rpc.get_balance().unwrap(); - let send_id = client_rpc.send(&wire_addr, &test_amount, false).unwrap(); - wait_for_tx(&mut client_rpc, &mut reserve_rpc, &[send_id]); - let bounce_id = wire_rpc.bounce(&send_id, &bounce_fee, &[]).unwrap(); - wait_for_tx(&mut wire_rpc, &mut reserve_rpc, &[bounce_id]); - let bounce_tx_fee = wire_rpc.get_tx(&bounce_id).unwrap().details[0] - .fee - .unwrap() - .abs() - .to_unsigned() - .unwrap(); - let send_tx_fee = client_rpc.get_tx(&send_id).unwrap().details[0] - .fee - .unwrap() - .abs() - .to_unsigned() - .unwrap(); - let after = client_rpc.get_balance().unwrap(); - assert!(before >= after); - assert_eq!(before - after, bounce_tx_fee + bounce_fee + send_tx_fee); - }); - runner.test("Bounce minimal amount", || { - let send_id = client_rpc - .send(&wire_addr, &Amount::from_sat(294), false) - .unwrap(); - wait_for_tx(&mut client_rpc, &mut reserve_rpc, &[send_id]); - assert!(matches!( - wire_rpc.bounce(&send_id, &bounce_fee, &[]), - Err(rpc::Error::RPC { - code: rpc::ErrorCode::RpcWalletInsufficientFunds, - .. - }) - )); - }); - runner.test("Bounce simple with metadata", || { - let before = client_rpc.get_balance().unwrap(); - let send_id = client_rpc.send(&wire_addr, &test_amount, false).unwrap(); - wait_for_tx(&mut client_rpc, &mut reserve_rpc, &[send_id]); - let bounce_id = wire_rpc - .bounce(&send_id, &bounce_fee, &[12, 34, 56, 78]) - .unwrap(); - wait_for_tx(&mut wire_rpc, &mut reserve_rpc, &[bounce_id]); - let bounce_tx_fee = wire_rpc.get_tx(&bounce_id).unwrap().details[0] - .fee - .unwrap() - .abs() - .to_unsigned() - .unwrap(); - let send_tx_fee = client_rpc.get_tx(&send_id).unwrap().details[0] - .fee - .unwrap() - .abs() - .to_unsigned() - .unwrap(); - wire_rpc.get_tx_op_return(&bounce_id).unwrap(); - let after = client_rpc.get_balance().unwrap(); - assert!(before >= after); - assert_eq!(before - after, bounce_tx_fee + bounce_fee + send_tx_fee); - }); - runner.test("Bounce minimal amount with metadata", || { - let send_id = client_rpc - .send(&wire_addr, &Amount::from_sat(294), false) - .unwrap(); - wait_for_tx(&mut client_rpc, &mut reserve_rpc, &[send_id]); - assert!(matches!( - wire_rpc.bounce(&send_id, &bounce_fee, &[12, 34, 56]), - Err(rpc::Error::RPC { - code: rpc::ErrorCode::RpcWalletError, - .. - }) - )); - }); - runner.test("Bounce too small amount", || { - let send_id = client_rpc - .send(&wire_addr, &(Amount::from_sat(294) + bounce_fee), false) - .unwrap(); - wait_for_tx(&mut client_rpc, &mut reserve_rpc, &[send_id]); - assert!(matches!( - wire_rpc.bounce(&send_id, &bounce_fee, &[]), - Err(rpc::Error::RPC { - code: rpc::ErrorCode::RpcWalletInsufficientFunds, - .. - }) - )); - }); - runner.test("Bounce complex", || { - // Generate 6 new addresses - let addresses: Vec<Address> = repeat_with(|| client_rpc.get_new_address().unwrap()) - .take(6) - .collect(); - // Send transaction to self with 1, 2 and 3 outputs - let txs: Vec<Txid> = [&addresses[0..1], &addresses[1..3], &addresses[3..]] - .into_iter() - .map(|addresses| { - client_rpc - .send_custom( - &[], - addresses.iter().map(|addr| (addr, &test_amount)), - None, - false, - false, - ) - .unwrap() - }) - .collect(); - wait_for_tx(&mut client_rpc, &mut reserve_rpc, txs.as_slice()); - let before = client_rpc.get_balance().unwrap(); - // Send a transaction with multiple input from multiple transaction of different outputs len - let send_id = client_rpc - .send_custom(&txs, [(&wire_addr, &(test_amount * 3))], None, false, false) - .unwrap(); - wait_for_tx(&mut client_rpc, &mut reserve_rpc, &[send_id]); - let bounce_id = wire_rpc.bounce(&send_id, &bounce_fee, &[]).unwrap(); - wait_for_tx(&mut wire_rpc, &mut reserve_rpc, &[bounce_id]); - let after = client_rpc.get_balance().unwrap(); - let bounce_tx_fee = wire_rpc.get_tx(&bounce_id).unwrap().details[0] - .fee - .unwrap() - .abs() - .to_unsigned() - .unwrap(); - let send_tx_fee = client_rpc.get_tx(&send_id).unwrap().details[0] - .fee - .unwrap() - .abs() - .to_unsigned() - .unwrap(); - assert!(before >= after); - assert_eq!(before - after, bounce_tx_fee + bounce_fee + send_tx_fee); - }); - - runner.conclude(); -} - -/// Check a specific transaction exist in a wallet historic -fn tx_exist( - rpc: &mut BtcRpc, - id: &Txid, - min_confirmation: i32, - detail: Category, -) -> rpc::Result<bool> { - let result = rpc.list_since_block(None, 1, false).unwrap(); - let found = result - .transactions - .into_iter() - .any(|tx| tx.category == detail && tx.confirmations >= min_confirmation && tx.txid == *id); - Ok(found) -} - -/// Run test track success and errors -struct TestRunner { - nb_ok: usize, - nb_err: usize, -} - -impl TestRunner { - fn new() -> Self { - Self { - nb_err: 0, - nb_ok: 0, - } - } - - fn test(&mut self, name: &str, test: impl FnOnce()) { - println!("{}", name); - - let result = std::panic::catch_unwind(AssertUnwindSafe(test)); - if result.is_ok() { - println!("OK"); - self.nb_ok += 1; - } else { - println!("ERR"); - self.nb_err += 1; - } - } - - /// Wait for tests completion and print results - fn conclude(self) { - println!( - "Result for {} tests: {} ok and {} err", - self.nb_ok + self.nb_err, - self.nb_ok, - self.nb_err - ); - } -} diff --git a/btc-wire/src/bin/btc-wire-cli.rs b/btc-wire/src/bin/btc-wire-cli.rs @@ -15,7 +15,7 @@ */ use btc_wire::{ config::{BitcoinConfig, WIRE_WALLET_NAME}, - rpc::{BtcRpc, Error, ErrorCode}, + rpc::{Rpc, Error, ErrorCode}, rpc_utils::default_data_dir, }; use taler_common::{ @@ -52,7 +52,7 @@ fn main() { .expect("Failed to load bitcoin configuration"); // Connect to bitcoin node let mut rpc = - BtcRpc::common(&btc_conf).expect("Failed to connect to bitcoin RPC server"); + Rpc::common(&btc_conf).expect("Failed to connect to bitcoin RPC server"); // Skip previous blocks let info = rpc @@ -85,7 +85,7 @@ fn main() { String::from_utf8(row.get(0)).expect("Stored address is not a valid string") } else { // Or generate a new one - let new = BtcRpc::wallet(&btc_conf, WIRE_WALLET_NAME) + let new = Rpc::wallet(&btc_conf, WIRE_WALLET_NAME) .expect("Failed to connect to wallet bitcoin RPC server") .get_new_address() .expect("Failed to generate new address") diff --git a/btc-wire/src/bin/btc-wire-utils.rs b/btc-wire/src/bin/btc-wire-utils.rs @@ -18,7 +18,7 @@ use std::path::PathBuf; use bitcoin::{Address, Amount, Network}; use btc_wire::{ config::BitcoinConfig, - rpc::{BtcRpc, Category, Error, ErrorCode}, + rpc::{Rpc, Category, Error, ErrorCode}, rpc_utils::default_data_dir, }; use taler_common::{ @@ -92,19 +92,19 @@ struct ClearCmd { struct App { config: BitcoinConfig, - client: BtcRpc, + client: Rpc, } impl App { pub fn start(data_dir: Option<PathBuf>) -> Self { let data_dir = data_dir.unwrap_or_else(default_data_dir); let config = BitcoinConfig::load(data_dir).unwrap(); - let client = BtcRpc::common(&config).unwrap(); + let client = Rpc::common(&config).unwrap(); Self { config, client } } - pub fn auto_wallet(&mut self, name: &str) -> (BtcRpc, Address) { + pub fn auto_wallet(&mut self, name: &str) -> (Rpc, Address) { // Auto load if let Err(err) = self.client.load_wallet(name) { match err { @@ -112,7 +112,7 @@ impl App { e => Err(e).unwrap(), } } - let mut wallet = BtcRpc::wallet(&self.config, name).unwrap(); + let mut wallet = Rpc::wallet(&self.config, name).unwrap(); let addr = wallet .get_new_address() .unwrap_or_else(|_| panic!("Failed to get wallet address {}", name)); diff --git a/btc-wire/src/info.rs b/btc-wire/src/info.rs @@ -1,128 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2022 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ -use bitcoin::{hashes::Hash, Txid}; -use taler_common::url::Url; - -#[derive(Debug, Clone, Copy, thiserror::Error)] -pub enum DecodeErr { - #[error("Unknown first byte: {0}")] - UnknownFirstByte(u8), - #[error(transparent)] - UriPack(#[from] uri_pack::DecodeErr), - #[error(transparent)] - Hash(#[from] bitcoin::hashes::Error), - #[error("Unexpected end of file")] - UnexpectedEOF, -} - -// TODO rename Info to OutMetadata to match eth_wire -// TODO use a common url format for both -// TODO generic metadata struct ? - -/// Encoded metadata for outgoing transaction -#[derive(Debug, Clone, PartialEq, Eq)] -pub enum Info { - Transaction { wtid: [u8; 32], url: Url }, - Bounce { bounced: Txid }, -} - -// We leave a potential special meaning for u8::MAX -const BOUNCE_BYTE: u8 = u8::MAX - 1; - -pub fn encode_info(info: &Info) -> Vec<u8> { - let mut buffer = Vec::new(); - match info { - Info::Transaction { wtid, url } => { - buffer.push(if url.scheme() == "http" { 1 } else { 0 }); - buffer.extend_from_slice(wtid); - let parts = format!("{}{}", url.domain().unwrap_or(""), url.path()); - let packed = uri_pack::pack_uri(&parts).unwrap(); - buffer.extend_from_slice(&packed); - return buffer; - } - Info::Bounce { bounced: id } => { - buffer.push(BOUNCE_BYTE); - buffer.extend_from_slice(id.as_ref()); - } - } - return buffer; -} - -pub fn decode_info(bytes: &[u8]) -> Result<Info, DecodeErr> { - if bytes.is_empty() { - return Err(DecodeErr::UnexpectedEOF); - } - match bytes[0] { - 0..=1 => { - if bytes.len() < 33 { - return Err(DecodeErr::UnexpectedEOF); - } - let packed = format!( - "http{}://{}", - if bytes[0] == 0 { "s" } else { "" }, - uri_pack::unpack_uri(&bytes[33..])?, - ); - let url = Url::parse(&packed).unwrap(); - Ok(Info::Transaction { - wtid: bytes[1..33].try_into().unwrap(), - url, - }) - } - BOUNCE_BYTE => Ok(Info::Bounce { - bounced: Txid::from_slice(&bytes[1..])?, - }), - unknown => Err(DecodeErr::UnknownFirstByte(unknown)), - } -} - -#[cfg(test)] -mod test { - use bitcoin::{hashes::Hash, Txid}; - use taler_common::{rand_slice, url::Url}; - - use crate::info::{decode_info, encode_info, Info}; - - #[test] - fn decode_encode_tx() { - let urls = [ - "https://git.taler.net/", - "https://git.taler.net/depolymerization.git/", - "http://git.taler.net/", - "http://git.taler.net/depolymerization.git/", - ]; - for url in urls { - let wtid = rand_slice(); - let url = Url::parse(url).unwrap(); - let info = Info::Transaction { wtid, url }; - let encode = encode_info(&info); - let decoded = decode_info(&encode).unwrap(); - assert_eq!(decoded, info); - } - } - - #[test] - fn decode_encode_bounce() { - for _ in 0..4 { - let id: [u8; 32] = rand_slice(); - let info = Info::Bounce { - bounced: Txid::from_slice(&id).unwrap(), - }; - let encode = encode_info(&info); - let decoded = decode_info(&encode).unwrap(); - assert_eq!(decoded, info); - } - } -} diff --git a/btc-wire/src/lib.rs b/btc-wire/src/lib.rs @@ -16,7 +16,7 @@ use std::str::FromStr; use bitcoin::{hashes::hex::FromHex, Address, Amount, Network, Txid}; -use rpc::{BtcRpc, Category, TransactionFull}; +use rpc::{Rpc, Category, TransactionFull}; use rpc_utils::{segwit_min_amount, sender_address}; use segwit::{decode_segwit_msg, encode_segwit_key}; @@ -42,7 +42,7 @@ pub enum GetOpReturnErr { } /// An extended bitcoincore JSON-RPC api client who can send and retrieve metadata with their transaction -impl BtcRpc { +impl Rpc { /// Send a transaction with a 32B key as metadata encoded using fake segwit addresses pub fn send_segwit_key( &mut self, diff --git a/btc-wire/src/loops.rs b/btc-wire/src/loops.rs @@ -26,7 +26,7 @@ pub mod worker; #[derive(Debug, thiserror::Error)] pub enum LoopError { #[error(transparent)] - RPC(#[from] rpc::Error), + Rpc(#[from] rpc::Error), #[error(transparent)] DB(#[from] postgres::Error), #[error("Another btc_wire process is running concurrently")] diff --git a/btc-wire/src/loops/worker.rs b/btc-wire/src/loops/worker.rs @@ -22,7 +22,7 @@ use std::{ use bitcoin::{hashes::Hash, Amount as BtcAmount, BlockHash, Txid}; use btc_wire::{ - rpc::{self, BtcRpc, Category, ErrorCode, TransactionFull}, + rpc::{self, Rpc, Category, ErrorCode, TransactionFull}, rpc_utils::sender_address, GetOpReturnErr, GetSegwitErr, }; @@ -37,7 +37,7 @@ use taler_common::{ use crate::{ fail_point::fail_point, - info::{decode_info, encode_info, Info}, + metadata::OutMetadata, reconnect::{AutoReconnectRPC, AutoReconnectSql}, sql::{sql_addr, sql_btc_amount, sql_txid}, status::{BounceStatus, TxStatus}, @@ -129,7 +129,7 @@ pub fn worker( // approach is to wait for the next loop when an RPC error is caught to prevent endless logged errors. skip_notification = !matches!( e, - LoopError::RPC(rpc::Error::RPC { .. } | rpc::Error::Bitcoin(_)) + LoopError::Rpc(rpc::Error::RPC { .. } | rpc::Error::Bitcoin(_)) | LoopError::Concurrency ); } else { @@ -139,7 +139,7 @@ pub fn worker( } /// Send atransaction on the blockchain, return true if more transactions with the same status remains -fn send(db: &mut Client, rpc: &mut BtcRpc, status: TxStatus) -> LoopResult<bool> { +fn send(db: &mut Client, rpc: &mut Rpc, status: TxStatus) -> LoopResult<bool> { assert!(status == TxStatus::Delayed || status == TxStatus::Requested); // We rely on the advisory lock to ensure we are the only one sending transactions let row = db.query_opt( @@ -152,8 +152,8 @@ fn send(db: &mut Client, rpc: &mut BtcRpc, status: TxStatus) -> LoopResult<bool> let wtid: [u8; 32] = sql_array(row, 2); let addr = sql_addr(row, 3); let url = sql_url(row, 4); - let info = Info::Transaction { wtid, url }; - let metadata = encode_info(&info); + let info = OutMetadata::Transaction { wtid, url }; + let metadata = info.encode(); match rpc.send_op_return(&addr, &amount, &metadata, false, true) { Ok(tx_id) => { @@ -180,7 +180,7 @@ fn send(db: &mut Client, rpc: &mut BtcRpc, status: TxStatus) -> LoopResult<bool> /// Bounce a transaction on the blockchain, return true if more bounce with the same status remains fn bounce( db: &mut Client, - rpc: &mut BtcRpc, + rpc: &mut Rpc, status: BounceStatus, fee: &BtcAmount, ) -> LoopResult<bool> { @@ -193,10 +193,9 @@ fn bounce( if let Some(row) = &row { let id: i32 = row.get(0); let bounced: Txid = sql_txid(row, 1); - let info = Info::Bounce { bounced }; - let metadata = encode_info(&info); + let info = OutMetadata::Bounce { bounced }; - match rpc.bounce(&bounced, fee, &metadata) { + match rpc.bounce(&bounced, fee, &info.encode()) { Ok(it) => { fail_point("(injected) fail bounce", 0.3)?; db.execute( @@ -238,7 +237,7 @@ fn last_hash(db: &mut Client) -> Result<Option<BlockHash>, postgres::Error> { /// Parse new transactions, return true if the database is up to date with the latest mined block fn sync_chain( - rpc: &mut BtcRpc, + rpc: &mut Rpc, db: &mut Client, config: &BtcConfig, state: &WireState, @@ -324,7 +323,7 @@ fn sync_chain( fn sync_chain_removed( txs: &HashMap<Txid, (Category, i32)>, removed: &HashSet<Txid>, - rpc: &mut BtcRpc, + rpc: &mut Rpc, db: &mut Client, min_confirmations: i32, ) -> LoopResult<bool> { @@ -396,19 +395,19 @@ fn sync_chain_removed( fn sync_chain_outgoing( id: &Txid, confirmations: i32, - rpc: &mut BtcRpc, + rpc: &mut Rpc, db: &mut Client, config: &BtcConfig, ) -> LoopResult<()> { match rpc .get_tx_op_return(id) - .map(|(full, bytes)| (full, decode_info(&bytes))) + .map(|(full, bytes)| (full, OutMetadata::decode(&bytes))) { Ok((full, Ok(info))) => match info { - Info::Transaction { wtid, .. } => { + OutMetadata::Transaction { wtid, .. } => { sync_chain_outgoing_send(id, &full, &wtid, rpc, db, confirmations, config)? } - Info::Bounce { bounced } => { + OutMetadata::Bounce { bounced } => { sync_chain_outgoing_bounce(id, &bounced, db, confirmations)? } }, @@ -426,7 +425,7 @@ fn sync_chain_outgoing_send( id: &Txid, full: &TransactionFull, wtid: &[u8; 32], - rpc: &mut BtcRpc, + rpc: &mut Rpc, db: &mut Client, confirmations: i32, config: &BtcConfig, @@ -600,7 +599,7 @@ fn sync_chain_outgoing_bounce( /// Sync database with na incoming confirmed transaction fn sync_chain_incoming_confirmed( id: &Txid, - rpc: &mut BtcRpc, + rpc: &mut Rpc, db: &mut Client, ) -> Result<(), LoopError> { match rpc.get_tx_segwit_key(id) { diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs @@ -16,7 +16,7 @@ use bitcoin::Network; use btc_wire::{ config::{BitcoinConfig, WIRE_WALLET_NAME}, - rpc::BtcRpc, + rpc::Rpc, rpc_utils::default_data_dir, }; use reconnect::{AutoReconnectRPC, AutoReconnectSql}; @@ -29,7 +29,7 @@ use taler_common::{ use crate::loops::{analysis::analysis, watcher::watcher, worker::worker}; mod fail_point; -mod info; +mod metadata; mod loops; mod reconnect; mod sql; @@ -72,7 +72,7 @@ fn main() { confirmation: AtomicU16::new(config.confirmation), })); - let mut rpc = BtcRpc::common(&btc_config).unwrap(); + let mut rpc = Rpc::common(&btc_config).unwrap(); rpc.load_wallet(WIRE_WALLET_NAME).ok(); let rpc_watcher = AutoReconnectRPC::new(btc_config.clone(), WIRE_WALLET_NAME); let rpc_analysis = AutoReconnectRPC::new(btc_config.clone(), WIRE_WALLET_NAME); diff --git a/btc-wire/src/metadata.rs b/btc-wire/src/metadata.rs @@ -0,0 +1,128 @@ +/* + This file is part of TALER + Copyright (C) 2022 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ +use bitcoin::{hashes::Hash, Txid}; +use taler_common::url::Url; + +#[derive(Debug, Clone, Copy, thiserror::Error)] +pub enum DecodeErr { + #[error("Unknown first byte: {0}")] + UnknownFirstByte(u8), + #[error(transparent)] + UriPack(#[from] uri_pack::DecodeErr), + #[error(transparent)] + Hash(#[from] bitcoin::hashes::Error), + #[error("Unexpected end of file")] + UnexpectedEOF, +} + +// TODO use a common url format with eth-wire + +/// Encoded metadata for outgoing transaction +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum OutMetadata { + Transaction { wtid: [u8; 32], url: Url }, + Bounce { bounced: Txid }, +} + +// We leave a potential special meaning for u8::MAX +const BOUNCE_BYTE: u8 = u8::MAX - 1; + +impl OutMetadata { + pub fn encode(&self) -> Vec<u8> { + let mut buffer = Vec::new(); + match self { + OutMetadata::Transaction { wtid, url } => { + buffer.push(if url.scheme() == "http" { 1 } else { 0 }); + buffer.extend_from_slice(wtid); + let parts = format!("{}{}", url.domain().unwrap_or(""), url.path()); + let packed = uri_pack::pack_uri(&parts).unwrap(); + buffer.extend_from_slice(&packed); + return buffer; + } + OutMetadata::Bounce { bounced: id } => { + buffer.push(BOUNCE_BYTE); + buffer.extend_from_slice(id.as_ref()); + } + } + return buffer; + } + + pub fn decode(bytes: &[u8]) -> Result<Self, DecodeErr> { + if bytes.is_empty() { + return Err(DecodeErr::UnexpectedEOF); + } + match bytes[0] { + 0..=1 => { + if bytes.len() < 33 { + return Err(DecodeErr::UnexpectedEOF); + } + let packed = format!( + "http{}://{}", + if bytes[0] == 0 { "s" } else { "" }, + uri_pack::unpack_uri(&bytes[33..])?, + ); + let url = Url::parse(&packed).unwrap(); + Ok(OutMetadata::Transaction { + wtid: bytes[1..33].try_into().unwrap(), + url, + }) + } + BOUNCE_BYTE => Ok(OutMetadata::Bounce { + bounced: Txid::from_slice(&bytes[1..])?, + }), + unknown => Err(DecodeErr::UnknownFirstByte(unknown)), + } + } +} + +#[cfg(test)] +mod test { + use bitcoin::{hashes::Hash, Txid}; + use taler_common::{rand_slice, url::Url}; + + use crate::metadata::OutMetadata; + + #[test] + fn decode_encode_tx() { + let urls = [ + "https://git.taler.net/", + "https://git.taler.net/depolymerization.git/", + "http://git.taler.net/", + "http://git.taler.net/depolymerization.git/", + ]; + for url in urls { + let wtid = rand_slice(); + let url = Url::parse(url).unwrap(); + let info = OutMetadata::Transaction { wtid, url }; + let encoded = info.encode(); + let decoded = OutMetadata::decode(&encoded).unwrap(); + assert_eq!(decoded, info); + } + } + + #[test] + fn decode_encode_bounce() { + for _ in 0..4 { + let id: [u8; 32] = rand_slice(); + let info = OutMetadata::Bounce { + bounced: Txid::from_slice(&id).unwrap(), + }; + let encoded = info.encode(); + let decoded = OutMetadata::decode(&encoded).unwrap(); + assert_eq!(decoded, info); + } + } +} diff --git a/btc-wire/src/reconnect.rs b/btc-wire/src/reconnect.rs @@ -17,7 +17,7 @@ use std::time::Duration; -use btc_wire::{config::BitcoinConfig, rpc::BtcRpc}; +use btc_wire::{config::BitcoinConfig, rpc::Rpc}; use taler_common::log::log::error; use taler_common::postgres::{Client, NoTls}; @@ -27,7 +27,7 @@ const RECONNECT_DELAY: Duration = Duration::from_secs(5); pub struct AutoReconnectRPC { config: BitcoinConfig, wallet: String, - client: BtcRpc, + client: Rpc, } impl AutoReconnectRPC { @@ -41,9 +41,9 @@ impl AutoReconnectRPC { } /// Connect a new client, loop on error - fn connect(config: &BitcoinConfig, wallet: &str) -> BtcRpc { + fn connect(config: &BitcoinConfig, wallet: &str) -> Rpc { loop { - match BtcRpc::wallet(config, wallet) { + match Rpc::wallet(config, wallet) { Ok(mut new) => match new.net_info() { Ok(_) => return new, Err(err) => { @@ -60,7 +60,7 @@ impl AutoReconnectRPC { } /// Get a mutable connection, block until a connection can be established - pub fn client(&mut self) -> &mut BtcRpc { + pub fn client(&mut self) -> &mut Rpc { if self.client.net_info().is_err() { self.client = Self::connect(&self.config, &self.wallet); } diff --git a/btc-wire/src/rpc.rs b/btc-wire/src/rpc.rs @@ -79,7 +79,7 @@ pub type Result<T> = std::result::Result<T, Error>; const EMPTY: [(); 0] = []; /// Bitcoin RPC connection -pub struct BtcRpc { +pub struct Rpc { path: String, id: u64, cookie: String, @@ -87,7 +87,7 @@ pub struct BtcRpc { buf: Vec<u8>, } -impl BtcRpc { +impl Rpc { /// Start a RPC connection pub fn common(config: &BitcoinConfig) -> io::Result<Self> { Self::new(config, None) diff --git a/btc-wire/src/rpc_utils.rs b/btc-wire/src/rpc_utils.rs @@ -17,7 +17,7 @@ use std::{path::PathBuf, str::FromStr}; use bitcoin::{Address, Amount}; -use crate::rpc::{self, BtcRpc, TransactionFull}; +use crate::rpc::{self, Rpc, TransactionFull}; pub const CLIENT: &str = "client"; pub const WIRE: &str = "wire"; @@ -48,7 +48,7 @@ pub fn segwit_min_amount() -> Amount { } /// Get the first sender address from a raw transaction -pub fn sender_address(rpc: &mut BtcRpc, full: &TransactionFull) -> rpc::Result<Address> { +pub fn sender_address(rpc: &mut Rpc, full: &TransactionFull) -> rpc::Result<Address> { let first = &full.decoded.vin[0]; let tx = rpc.get_raw(&first.txid.unwrap())?; Ok(tx diff --git a/eth-wire/src/bin/eth-test.rs b/eth-wire/src/bin/eth-test.rs @@ -1,140 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2022 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ -use std::{panic::AssertUnwindSafe, str::FromStr}; - -use eth_wire::{ - metadata::{InMetadata, OutMetadata}, - rpc::{self, Rpc}, -}; -use ethereum_types::{H256, U256}; -use taler_common::{config::load_eth_config, rand_slice, url::Url}; - -pub fn main() { - let path = std::env::args().nth(1).unwrap(); - let config = load_eth_config(path); - let mut rpc = Rpc::new(config.core.data_dir.unwrap().join("geth.ipc")).unwrap(); - - let accounts = rpc.list_accounts().unwrap(); - for account in &accounts { - rpc.unlock_account(&account, "password").unwrap(); - } - let wire = accounts[0]; - let client = accounts[1]; - let reserve = accounts[2]; - - let test_value = U256::from(15000u32); - let bounce_value = U256::from(10000u32); - let test_url = Url::from_str("http://test.com").unwrap(); - - let mut runner = TestRunner::new(); - - runner.test("Deposit", || { - let rng = rand_slice(); - let hash = rpc.deposit(client, wire, test_value, rng).unwrap(); - assert!(tx_pending(&mut rpc, &hash).unwrap()); - mine_pending(&mut rpc).unwrap(); - assert!(!tx_pending(&mut rpc, &hash).unwrap()); - let tx = rpc.get_transaction(&hash).unwrap().unwrap(); - let metadata = InMetadata::decode(&tx.input).unwrap(); - assert!(matches!(metadata, InMetadata::Deposit { reserve_pub } if reserve_pub == rng)); - assert_eq!(tx.value, test_value); - }); - - runner.test("Withdraw", || { - let rng = rand_slice(); - let hash = rpc - .withdraw(wire, client, test_value, rng, test_url.clone()) - .unwrap(); - assert!(tx_pending(&mut rpc, &hash).unwrap()); - mine_pending(&mut rpc).unwrap(); - assert!(!tx_pending(&mut rpc, &hash).unwrap()); - let tx = rpc.get_transaction(&hash).unwrap().unwrap(); - let metadata = OutMetadata::decode(&tx.input).unwrap(); - assert!( - matches!(metadata, OutMetadata::Withdraw { wtid, url } if wtid == rng && url == url) - ); - assert_eq!(tx.value, test_value); - }); - - runner.test("Bounce", || { - let rng = rand_slice(); - let deposit = rpc.deposit(client, wire, test_value, rng).unwrap(); - let hash = rpc.bounce(deposit, bounce_value).unwrap(); - assert!(tx_pending(&mut rpc, &hash).unwrap()); - mine_pending(&mut rpc).unwrap(); - assert!(!tx_pending(&mut rpc, &hash).unwrap()); - let tx = rpc.get_transaction(&hash).unwrap().unwrap(); - let metadata = OutMetadata::decode(&tx.input).unwrap(); - assert!(matches!(metadata, OutMetadata::Bounce { bounced } if bounced == deposit)); - assert_eq!(tx.value, test_value - bounce_value); - }); - - runner.conclude(); -} - -/// Check a specific transaction is pending -fn tx_pending(rpc: &mut Rpc, id: &H256) -> rpc::Result<bool> { - Ok(rpc.pending_transactions()?.iter().any(|t| t.hash == *id)) -} - -/// Mine pending transactions -fn mine_pending(rpc: &mut Rpc) -> rpc::Result<()> { - let mut notifier = rpc.subscribe_new_head()?; - rpc.miner_start()?; - while !rpc.pending_transactions()?.is_empty() { - notifier.next()?; - } - rpc.miner_stop()?; - Ok(()) -} - -/// Run test track success and errors -struct TestRunner { - nb_ok: usize, - nb_err: usize, -} - -impl TestRunner { - fn new() -> Self { - Self { - nb_err: 0, - nb_ok: 0, - } - } - - fn test(&mut self, name: &str, test: impl FnOnce()) { - println!("{}", name); - - let result = std::panic::catch_unwind(AssertUnwindSafe(test)); - if result.is_ok() { - println!("OK"); - self.nb_ok += 1; - } else { - println!("ERR"); - self.nb_err += 1; - } - } - - /// Wait for tests completion and print results - fn conclude(self) { - println!( - "Result for {} tests: {} ok and {} err", - self.nb_ok + self.nb_err, - self.nb_ok, - self.nb_err - ); - } -} diff --git a/eth-wire/src/loops.rs b/eth-wire/src/loops.rs @@ -0,0 +1,18 @@ +/* + This file is part of TALER + Copyright (C) 2022 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +pub mod watcher; +pub mod worker; diff --git a/eth-wire/src/loops/watcher.rs b/eth-wire/src/loops/watcher.rs @@ -0,0 +1,25 @@ +/* + This file is part of TALER + Copyright (C) 2022 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ +use eth_wire::rpc::Rpc; +use taler_common::postgres::Client; + +pub fn watcher(mut rpc: Rpc, mut db: Client) { + let mut notifier = rpc.subscribe_new_head().unwrap(); + loop { + db.execute("NOTIFY new_block", &[]).unwrap(); + notifier.next().unwrap(); + } +} diff --git a/eth-wire/src/loops/worker.rs b/eth-wire/src/loops/worker.rs @@ -0,0 +1,150 @@ +/* + This file is part of TALER + Copyright (C) 2022 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ +use std::time::SystemTime; + +use eth_wire::{ + metadata::InMetadata, + rpc::Rpc, + taler_util::{eth_payto_url, eth_to_taler}, + BlockState, +}; +use taler_common::{ + api_common::base32, + log::log::{error, info}, + postgres::{fallible_iterator::FallibleIterator, Client}, + sql::{sql_array, sql_url}, +}; + +use crate::{ + sql::{sql_addr, sql_eth_amount}, + status::TxStatus, + LoopResult, WireState, +}; + +pub fn worker(mut rpc: Rpc, mut db: Client, state: &WireState) { + let mut skip_notification = false; + loop { + let result: LoopResult<()> = (|| { + // Listen to all channels + db.batch_execute("LISTEN new_block; LISTEN new_tx")?; + // Wait for the next notification + { + let mut ntf = db.notifications(); + if !skip_notification && ntf.is_empty() { + // Block until next notification + ntf.blocking_iter().next()?; + } + // Conflate all notifications + let mut iter = ntf.iter(); + while iter.next()?.is_some() {} + } + + sync_chain(&mut rpc, &mut db, state)?; + + while send(&mut db, &mut rpc, state)? {} + Ok(()) + })(); + + if let Err(e) = result { + error!("worker: {}", e); + skip_notification = false; + } else { + skip_notification = false; + } + } +} + +fn sync_chain(rpc: &mut Rpc, db: &mut Client, state: &WireState) -> LoopResult<bool> { + let row = db.query_one("SELECT value FROM state WHERE name='last_block'", &[])?; + let slice: &[u8] = row.get(0); + let block = BlockState::from_bytes(slice.try_into().unwrap()); + + let mut txs = Vec::new(); + + txs.extend(rpc.pending_transactions()?.into_iter().map(|t| (t, 0))); + + let mut cursor = rpc.current_block()?; + let mut confirmation = 1; + + // TODO check hash to detect reorg + + while cursor.number.expect("Mined block") != block.number { + txs.extend(cursor.transactions.drain(..).map(|t| (t, confirmation))); + cursor = rpc.block(cursor.number.unwrap() - 1u64)?.unwrap(); + confirmation += 1; + } + + for (tx, _confirmation) in txs { + if tx.to == Some(state.address) { + let metadata = InMetadata::decode(&tx.input).unwrap(); + match metadata { + InMetadata::Deposit { reserve_pub } => { + let date = SystemTime::now(); + let amount = eth_to_taler(&tx.value); + let credit_addr = tx.from.expect("Not coinbase"); + let nb = db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (reserve_pub) DO NOTHING ", &[ + &date, &amount.to_string(), &reserve_pub.as_ref(), &eth_payto_url(&credit_addr).as_ref(), &state.config.payto.as_ref() + ])?; + if nb > 0 { + info!( + "<< {} {} in {} from {}", + amount, + base32(&reserve_pub), + hex::encode(tx.hash), + hex::encode(credit_addr), + ); + } + } + } + } + } + + Ok(true) +} + +/// Send a transaction on the blockchain, return true if more transactions with the same status remains +fn send(db: &mut Client, rpc: &mut Rpc, state: &WireState) -> LoopResult<bool> { + // We rely on the advisory lock to ensure we are the only one sending transactions + let row = db.query_opt( +"SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 ORDER BY _date LIMIT 1", +&[&(TxStatus::Requested as i16)], +)?; + if let Some(row) = &row { + let id: i32 = row.get(0); + let amount = sql_eth_amount(row, 1); + let wtid: [u8; 32] = sql_array(row, 2); + let addr = sql_addr(row, 3); + let url = sql_url(row, 4); + match rpc.withdraw(state.address, addr, amount, wtid, url) { + Ok(tx_id) => { + db.execute( + "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3", + &[&(TxStatus::Sent as i16), &tx_id.as_ref(), &id], + )?; + let amount = eth_to_taler(&amount); + info!(">> {} {} in {} to {}", amount, base32(&wtid), tx_id, addr); + } + Err(e) => { + db.execute( + "UPDATE tx_out SET status=$1 WHERE id=$2", + &[&(TxStatus::Delayed as i16), &id], + )?; + Err(e)?; + } + } + } + Ok(row.is_some()) +} diff --git a/eth-wire/src/main.rs b/eth-wire/src/main.rs @@ -28,6 +28,8 @@ use taler_common::{ }; mod status; +mod sql; +mod loops; pub struct WireState { confirmation: AtomicU16, @@ -49,188 +51,6 @@ pub enum LoopError { pub type LoopResult<T> = Result<T, LoopError>; -mod sql { - use eth_wire::taler_util::{eth_payto_addr, taler_to_eth}; - use ethereum_types::{H160, U256}; - use taler_common::{ - postgres::Row, - sql::{sql_amount, sql_url}, - }; - - pub fn sql_eth_amount(row: &Row, idx: usize) -> U256 { - let amount = sql_amount(row, idx); - taler_to_eth(&amount).unwrap_or_else(|_| { - panic!( - "Database invariant: expected an ethereum amount got {}", - amount - ) - }) - } - - pub fn sql_addr(row: &Row, idx: usize) -> H160 { - let url = sql_url(row, idx); - eth_payto_addr(&url).unwrap_or_else(|_| { - panic!( - "Database invariant: expected an ethereum payto url got {}", - url - ) - }) - } -} - -mod loops { - pub mod watcher { - use eth_wire::rpc::Rpc; - use taler_common::postgres::Client; - - pub fn watcher(mut rpc: Rpc, mut db: Client) { - let mut notifier = rpc.subscribe_new_head().unwrap(); - loop { - db.execute("NOTIFY new_block", &[]).unwrap(); - notifier.next().unwrap(); - } - } - } - - pub mod worker { - use std::time::SystemTime; - - use eth_wire::{ - metadata::InMetadata, - rpc::Rpc, - taler_util::{eth_payto_url, eth_to_taler}, - BlockState, - }; - use taler_common::{ - api_common::base32, - log::log::{error, info}, - postgres::{fallible_iterator::FallibleIterator, Client}, - sql::{sql_array, sql_url}, - }; - - use crate::{ - sql::{sql_addr, sql_eth_amount}, - status::TxStatus, - LoopResult, WireState, - }; - - pub fn worker(mut rpc: Rpc, mut db: Client, state: &WireState) { - let mut skip_notification = false; - loop { - let result: LoopResult<()> = (|| { - // Listen to all channels - db.batch_execute("LISTEN new_block; LISTEN new_tx")?; - // Wait for the next notification - { - let mut ntf = db.notifications(); - if !skip_notification && ntf.is_empty() { - // Block until next notification - ntf.blocking_iter().next()?; - } - // Conflate all notifications - let mut iter = ntf.iter(); - while iter.next()?.is_some() {} - } - - sync_chain(&mut rpc, &mut db, state)?; - - while send(&mut db, &mut rpc, state)? {} - Ok(()) - })(); - - if let Err(e) = result { - error!("worker: {}", e); - skip_notification = false; - } else { - skip_notification = false; - } - } - } - - fn sync_chain(rpc: &mut Rpc, db: &mut Client, state: &WireState) -> LoopResult<bool> { - let row = db.query_one("SELECT value FROM state WHERE name='last_block'", &[])?; - let slice: &[u8] = row.get(0); - let block = BlockState::from_bytes(slice.try_into().unwrap()); - - let mut txs = Vec::new(); - - txs.extend(rpc.pending_transactions()?.into_iter().map(|t| (t, 0))); - - let mut cursor = rpc.current_block()?; - let mut confirmation = 1; - - // TODO check hash to detect reorg - - while cursor.number.expect("Mined block") != block.number { - txs.extend(cursor.transactions.drain(..).map(|t| (t, confirmation))); - cursor = rpc.block(cursor.number.unwrap() - 1u64)?.unwrap(); - confirmation += 1; - } - - for (tx, _confirmation) in txs { - if tx.to == Some(state.address) { - let metadata = InMetadata::decode(&tx.input).unwrap(); - match metadata { - InMetadata::Deposit { reserve_pub } => { - let date = SystemTime::now(); - let amount = eth_to_taler(&tx.value); - let credit_addr = tx.from.expect("Not coinbase"); - let nb = db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (reserve_pub) DO NOTHING ", &[ - &date, &amount.to_string(), &reserve_pub.as_ref(), &eth_payto_url(&credit_addr).as_ref(), &state.config.payto.as_ref() - ])?; - if nb > 0 { - info!( - "<< {} {} in {} from {}", - amount, - base32(&reserve_pub), - hex::encode(tx.hash), - hex::encode(credit_addr), - ); - } - } - } - } - } - - Ok(true) - } - - /// Send a transaction on the blockchain, return true if more transactions with the same status remains - fn send(db: &mut Client, rpc: &mut Rpc, state: &WireState) -> LoopResult<bool> { - // We rely on the advisory lock to ensure we are the only one sending transactions - let row = db.query_opt( - "SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 ORDER BY _date LIMIT 1", - &[&(TxStatus::Requested as i16)], - )?; - if let Some(row) = &row { - let id: i32 = row.get(0); - let amount = sql_eth_amount(row, 1); - let wtid: [u8; 32] = sql_array(row, 2); - let addr = sql_addr(row, 3); - let url = sql_url(row, 4); - match rpc.withdraw(state.address, addr, amount, wtid, url) { - Ok(tx_id) => { - db.execute( - "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3", - &[&(TxStatus::Sent as i16), &tx_id.as_ref(), &id], - )?; - let amount = eth_to_taler(&amount); - info!(">> {} {} in {} to {}", amount, base32(&wtid), tx_id, addr); - } - Err(e) => { - db.execute( - "UPDATE tx_out SET status=$1 WHERE id=$2", - &[&(TxStatus::Delayed as i16), &id], - )?; - Err(e)?; - } - } - } - Ok(row.is_some()) - } - } -} - fn main() { taler_common::log::init(); diff --git a/eth-wire/src/rpc.rs b/eth-wire/src/rpc.rs @@ -131,7 +131,7 @@ impl Rpc { } Err(err) if err.classify() == Category::Eof => { if nb == 0 { - return Err(std::io::Error::new( + Err(std::io::Error::new( ErrorKind::UnexpectedEof, "Stream EOF", ))?; diff --git a/eth-wire/src/sql.rs b/eth-wire/src/sql.rs @@ -0,0 +1,41 @@ +/* + This file is part of TALER + Copyright (C) 2022 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ +use eth_wire::taler_util::{eth_payto_addr, taler_to_eth}; +use ethereum_types::{H160, U256}; +use taler_common::{ + postgres::Row, + sql::{sql_amount, sql_url}, +}; + +pub fn sql_eth_amount(row: &Row, idx: usize) -> U256 { + let amount = sql_amount(row, idx); + taler_to_eth(&amount).unwrap_or_else(|_| { + panic!( + "Database invariant: expected an ethereum amount got {}", + amount + ) + }) +} + +pub fn sql_addr(row: &Row, idx: usize) -> H160 { + let url = sql_url(row, idx); + eth_payto_addr(&url).unwrap_or_else(|_| { + panic!( + "Database invariant: expected an ethereum payto url got {}", + url + ) + }) +} diff --git a/eth-wire/src/taler_util.rs b/eth-wire/src/taler_util.rs @@ -1,3 +1,18 @@ +/* + This file is part of TALER + Copyright (C) 2022 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ use std::str::FromStr; use ethereum_types::{Address, U256}; diff --git a/taler-common/src/config.rs b/taler-common/src/config.rs @@ -25,7 +25,7 @@ pub trait Config: Sized { fn load_from_file(config_file: impl AsRef<Path>) -> Self { let conf = ini::Ini::load_from_file(config_file).expect("Failed to open the config file"); let taler = section(&conf, "taler"); - let currency = require(&taler, "CURRENCY", string); + let currency = require(taler, "CURRENCY", string); let section_name = match currency.as_str() { "BTC" => "depolymerizer-bitcoin", "ETH" => "depolymerizer-ethereum",