depolymerization

wire gateway for Bitcoin/Ethereum
Log | Files | Refs | Submodules | README | LICENSE

commit 2ad5eb9d0cbc24e0c2c5d9ead73d4fbbd2515deb
parent 48b3266bbffa290d96e3f7d7fb6d2ca48679d8a9
Author: Antoine A <>
Date:   Wed,  2 Feb 2022 19:51:09 +0100

eth-wire: wire draft

Diffstat:
Mbtc-wire/src/bin/btc-wire-utils.rs | 11+----------
Mbtc-wire/src/loops/watcher.rs | 2+-
Mbtc-wire/src/loops/worker.rs | 5++---
Mbtc-wire/src/main.rs | 8++++----
Mbtc-wire/src/taler_util.rs | 9++++++---
Meth-wire/src/bin/eth-test.rs | 2+-
Meth-wire/src/bin/eth-wire-cli.rs | 32+++++++++++++++++++++++---------
Meth-wire/src/bin/eth-wire-utils.rs | 149+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
Meth-wire/src/lib.rs | 9+++++----
Meth-wire/src/main.rs | 355++++++++++++++++++++++++++++++++++++++++++++++++++++++-------------------------
Meth-wire/src/rpc.rs | 6+++++-
Aeth-wire/src/status.rs | 69+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Aeth-wire/src/taler_util.rs | 46++++++++++++++++++++++++++++++++++++++++++++++
Mmakefile | 7+++++--
Mtaler-common/src/config.rs | 12++++++------
Mtest/btc/wire.sh | 2+-
Mtest/common.sh | 76++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------
Mtest/conf/taler_eth.conf | 1-
Atest/eth/wire.sh | 74++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mwire-gateway/src/main.rs | 23+++++++++++++++--------
20 files changed, 710 insertions(+), 188 deletions(-)

diff --git a/btc-wire/src/bin/btc-wire-utils.rs b/btc-wire/src/bin/btc-wire-utils.rs @@ -50,10 +50,6 @@ enum Cmd { #[argh(subcommand, name = "transfer")] /// Wait or mine the next block struct TransferCmd { - #[argh(option, short = 'k')] - /// reserve public key - key: Option<String>, - #[argh(option, short = 'f', default = "String::from(\"client\")")] /// sender wallet from: String, @@ -141,12 +137,7 @@ impl App { fn main() { let args: Args = argh::from_env(); match args.cmd { - Cmd::Transfer(TransferCmd { - key: _key, - from, - to, - amount, - }) => { + Cmd::Transfer(TransferCmd { from, to, amount }) => { let mut app = App::start(args.datadir); let (mut client, _) = app.auto_wallet(&from); let (_, to) = app.auto_wallet(&to); diff --git a/btc-wire/src/loops/watcher.rs b/btc-wire/src/loops/watcher.rs @@ -25,8 +25,8 @@ pub fn watcher(mut rpc: AutoReconnectRPC, mut db: AutoReconnectSql) { let rpc = rpc.client(); let db = db.client(); let result: LoopResult<()> = (|| loop { - rpc.wait_for_new_block(0)?; db.execute("NOTIFY new_block", &[])?; + rpc.wait_for_new_block(0)?; })(); if let Err(e) = result { error!("watcher: {}", e); diff --git a/btc-wire/src/loops/worker.rs b/btc-wire/src/loops/worker.rs @@ -57,8 +57,7 @@ pub fn worker( let mut lifetime = config.wire_lifetime; let mut status = true; - // Alway start with a sync work - let mut skip_notification = true; + let mut skip_notification = false; loop { // Check lifetime if let Some(nb) = lifetime.as_mut() { @@ -139,7 +138,7 @@ pub fn worker( } } -/// Send a transaction on the blockchain, return true if more transactions with the same status remains +/// Send atransaction on the blockchain, return true if more transactions with the same status remains fn send(db: &mut Client, rpc: &mut BtcRpc, status: TxStatus) -> LoopResult<bool> { assert!(status == TxStatus::Delayed || status == TxStatus::Requested); // We rely on the advisory lock to ensure we are the only one sending transactions diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs @@ -45,7 +45,7 @@ fn main() { let config = load_btc_config(std::env::args_os().nth(1).expect("Missing conf path arg")); let data_dir = config - .init + .core .data_dir .as_ref() .cloned() @@ -78,9 +78,9 @@ fn main() { let rpc_analysis = AutoReconnectRPC::new(btc_config.clone(), WIRE_WALLET_NAME); let rpc_worker = AutoReconnectRPC::new(btc_config, WIRE_WALLET_NAME); - let db_watcher = AutoReconnectSql::new(&config.init.db_url); - let db_analysis = AutoReconnectSql::new(&config.init.db_url); - let db_worker = AutoReconnectSql::new(&config.init.db_url); + let db_watcher = AutoReconnectSql::new(&config.core.db_url); + let db_analysis = AutoReconnectSql::new(&config.core.db_url); + let db_worker = AutoReconnectSql::new(&config.core.db_url); named_spawn("watcher", move || watcher(rpc_watcher, db_watcher)); named_spawn("analysis", move || { analysis(rpc_analysis, db_analysis, config, state) diff --git a/btc-wire/src/taler_util.rs b/btc-wire/src/taler_util.rs @@ -28,10 +28,13 @@ pub fn btc_payto_url(addr: &Address) -> Url { /// Extract a btc address from a payto uri pub fn btc_payto_addr(url: &Url) -> Result<Address, String> { if url.domain() != Some("bitcoin") { - return Err("".to_string()); + return Err(format!( + "Expected domain 'bitcoin' got '{}'", + url.domain().unwrap_or_default() + )); } let str = url.path().trim_start_matches('/'); - return Address::from_str(str).map_err(|_| "".to_string()); + return Address::from_str(str).map_err(|e| e.to_string()); } /// Transform a btc amount into a taler amount @@ -44,7 +47,7 @@ pub fn btc_to_taler(amount: &SignedAmount) -> Amount { /// Transform a taler amount into a btc amount pub fn taler_to_btc(amount: &Amount) -> Result<BtcAmount, String> { if amount.currency != "BTC" { - return Err("Wrong currency".to_string()); + return Err(format!("expected ETH for {}", amount.currency)); } let sat = amount.value * 100_000_000 + amount.fraction as u64; diff --git a/eth-wire/src/bin/eth-test.rs b/eth-wire/src/bin/eth-test.rs @@ -25,7 +25,7 @@ use taler_common::{config::load_eth_config, rand_slice, url::Url}; pub fn main() { let path = std::env::args().nth(1).unwrap(); let config = load_eth_config(path); - let mut rpc = Rpc::new(config.init.data_dir.unwrap().join("geth.ipc")).unwrap(); + let mut rpc = Rpc::new(config.core.data_dir.unwrap().join("geth.ipc")).unwrap(); let accounts = rpc.list_accounts().unwrap(); for account in &accounts { diff --git a/eth-wire/src/bin/eth-wire-cli.rs b/eth-wire/src/bin/eth-wire-cli.rs @@ -15,6 +15,7 @@ */ use eth_wire::{rpc::Rpc, BlockState}; +use ethereum_types::H160; use taler_common::{ config::{Config, CoreConfig}, postgres::{Client, NoTls}, @@ -51,7 +52,7 @@ fn main() { let block = rpc.current_block().expect("Failed to get current block"); let state = BlockState { hash: block.hash.unwrap(), - nb: block.number.unwrap(), + number: block.number.unwrap(), }; let nb_row = db .execute( @@ -60,21 +61,34 @@ fn main() { ) .expect("Failed to update database state"); if nb_row > 0 { - println!("Skipped {} previous block", state.nb); + println!("Skipped {} previous block", state.number); } - // Load previous address - let mut addresses = rpc.list_accounts().expect("Failed to get accounts"); - if addresses.is_empty() { - addresses = rpc + let prev_addr = db + .query_opt("SELECT value FROM state WHERE name = 'addr'", &[]) + .expect("Failed to query database state"); + let (addr, created) = if let Some(row) = prev_addr { + (H160::from_slice(row.get(0)), false) + } else { + // Or generate a new one + let new = rpc .new_account("password") .expect("Failed creating account"); + db.execute( + "INSERT INTO state (name, value) VALUES ('addr', $1)", + &[&new.as_bytes()], + ) + .expect("Failed to update database state"); + (new, true) + }; + + if created { println!("Created new wallet"); } else { - println!("Created new wallet"); - } + println!("Found already existing wallet") + }; - let addr = addresses[0]; + let addr = hex::encode(addr.as_bytes()); println!("Address is {}", &addr); println!("Add the following line into taler.conf:"); println!("[depolymerizer-ethereum]"); diff --git a/eth-wire/src/bin/eth-wire-utils.rs b/eth-wire/src/bin/eth-wire-utils.rs @@ -1,3 +1,12 @@ +use std::{path::PathBuf, str::FromStr}; + +use eth_wire::{ + rpc::{hex::Hex, Rpc, TransactionRequest}, + taler_util::taler_to_eth, +}; +use ethereum_types::H160; +use taler_common::{api_common::Amount, rand_slice}; + /* This file is part of TALER Copyright (C) 2022 Taler Systems SA @@ -14,6 +23,141 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ +#[derive(argh::FromArgs)] +/// Euthereum wire test client +struct Args { + #[argh(option, short = 'd')] + /// specify data directory + datadir: Option<PathBuf>, + #[argh(subcommand)] + cmd: Cmd, +} + +#[derive(argh::FromArgs)] +#[argh(subcommand)] +enum Cmd { + Send(SendCmd), + Deposit(DepositCmd), + Mine(MineCmd), + ClearDB(ClearCmd), + Balance(BalanceCmd), +} + +#[derive(argh::FromArgs)] +#[argh(subcommand, name = "send")] +/// Send a transaction +struct SendCmd { + #[argh(positional)] + /// sender wallet + from: String, + + #[argh(positional)] + /// receiver wallet + to: String, + + #[argh(positional)] + /// amount to send in eth + amount: f64, +} + +#[derive(argh::FromArgs)] +#[argh(subcommand, name = "deposit")] +/// Perform a deposit transaction +struct DepositCmd { + #[argh(positional)] + /// sender wallet + from: String, + + #[argh(positional)] + /// receiver wallet + to: String, + + #[argh(positional)] + /// amount to send in eth + amount: f64, +} + +#[derive(argh::FromArgs)] +#[argh(subcommand, name = "mine")] +/// Wait or mine the next block +struct MineCmd { + #[argh(positional)] + /// receiver wallet + to: String, + #[argh(positional, default = "0")] + /// amount to mine in eth + amount: u64, +} + +#[derive(argh::FromArgs)] +#[argh(subcommand, name = "cleardb")] +/// Clear database +struct ClearCmd { + #[argh(positional)] + /// taler config + config: String, +} + +#[derive(argh::FromArgs)] +#[argh(subcommand, name = "balance")] +/// Get eth balance +struct BalanceCmd { + #[argh(positional)] + /// account address + addr: String, +} + fn main() { - -} -\ No newline at end of file + let args: Args = argh::from_env(); + match args.cmd { + Cmd::Deposit(DepositCmd { from, to, amount }) => { + let mut rpc = Rpc::new(args.datadir.unwrap().join("geth.ipc")).unwrap(); + let from = H160::from_str(&from).unwrap(); + let to = H160::from_str(&to).unwrap(); + let amount = Amount::from_str(&format!("ETH:{}", amount)).unwrap(); + let value = taler_to_eth(&amount).unwrap(); + rpc.unlock_account(&from, "password").ok(); + rpc.deposit(from, to, value, rand_slice()).unwrap(); + } + Cmd::Send(SendCmd { from, to, amount }) => { + let mut rpc = Rpc::new(args.datadir.unwrap().join("geth.ipc")).unwrap(); + let from = H160::from_str(&from).unwrap(); + let to = H160::from_str(&to).unwrap(); + let amount = Amount::from_str(&format!("ETH:{}", amount)).unwrap(); + let value = taler_to_eth(&amount).unwrap(); + rpc.unlock_account(&from, "password").ok(); + rpc.send_transaction(&TransactionRequest { + from, + to, + value, + gas: None, + gas_price: None, + data: Hex(vec![]), + }) + .unwrap(); + } + Cmd::Mine(MineCmd { to, mut amount }) => { + let mut rpc = Rpc::new(args.datadir.unwrap().join("geth.ipc")).unwrap(); + let to = H160::from_str(&to).unwrap(); + rpc.unlock_account(&to, "password").ok(); + let mut notifier = rpc.subscribe_new_head().unwrap(); + + rpc.miner_start().unwrap(); + while !rpc.pending_transactions().unwrap().is_empty() { + notifier.next().unwrap(); + amount = amount.saturating_sub(1); + } + for _ in 0..amount { + notifier.next().unwrap(); + } + rpc.miner_stop().unwrap(); + } + Cmd::Balance(BalanceCmd { addr }) => { + let mut rpc = Rpc::new(args.datadir.unwrap().join("geth.ipc")).unwrap(); + let addr = H160::from_str(&addr).unwrap(); + let balance = rpc.balance(&addr).unwrap(); + println!("{}", (balance / 10_000_000_000u64).as_u64()); + } + Cmd::ClearDB(_) => todo!(), + } +} diff --git a/eth-wire/src/lib.rs b/eth-wire/src/lib.rs @@ -21,6 +21,7 @@ use taler_common::url::Url; pub mod metadata; pub mod rpc; +pub mod taler_util; impl rpc::Rpc { pub fn deposit( @@ -81,21 +82,21 @@ impl rpc::Rpc { #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct BlockState { pub hash: H256, - pub nb: U64, + pub number: U64, } impl BlockState { pub fn to_bytes(&self) -> [u8; 40] { let mut bytes = [0; 40]; bytes[..32].copy_from_slice(self.hash.as_bytes()); - self.nb.to_little_endian(&mut bytes[32..]); + self.number.to_little_endian(&mut bytes[32..]); bytes } pub fn from_bytes(bytes: &[u8; 40]) -> Self { Self { hash: H256::from_slice(&bytes[..32]), - nb: U64::from_little_endian(&bytes[32..]), + number: U64::from_little_endian(&bytes[32..]), } } } @@ -112,7 +113,7 @@ mod test { for _ in 0..4 { let state = BlockState { hash: H256::from_slice(&rand_slice::<32>()), - nb: U64::from(random::<u64>()), + number: U64::from(random::<u64>()), }; let encoded = state.to_bytes(); let decoded = BlockState::from_bytes(&encoded); diff --git a/eth-wire/src/main.rs b/eth-wire/src/main.rs @@ -13,134 +13,263 @@ You should have received a copy of the GNU Affero General Public License along with TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use std::time::Duration; -use crate::rpc::Rpc; +use std::sync::atomic::AtomicU16; -mod rpc; +use eth_wire::{ + rpc::{self, Rpc}, + taler_util::eth_payto_addr, +}; +use ethereum_types::H160; +use loops::{watcher::watcher, worker::worker}; +use taler_common::{ + config::{load_eth_config, EthConfig}, + postgres::{self, Client, NoTls}, +}; -fn main() { - taler_common::log::init(); - let home = std::env::var("HOME").unwrap(); - let mut rpc = Rpc::new(format!("{}/.ethereum/geth.ipc", home)).unwrap(); - - /*let config = taler_common::config::Config::load_from_file( - std::env::args_os().nth(1).expect("Missing conf path arg"), - );*/ - //println!("Calling accounts."); - //let mut accounts = web3.eth().accounts().await?; - //println!("Accounts: {:?}", accounts); - /*accounts.push("00a329c0648769a73afac7f9381e08fb43dbea72".parse().unwrap()); - - println!("Calling balance."); - for account in &accounts { - let balance = web3.eth().balance(*account, None).await?; - println!("Balance of {:?}: {}", account, balance); - } +mod status; - web3.eth() - .send_transaction(TransactionRequest { - from: accounts[0], - to: Some(accounts[1]), - value: Some(U256::exp10(8)), - data: Some(Bytes::from(vec![0, 1, 2, 3, 4, 5])), - ..Default::default() - }) - .await?; +pub struct WireState { + confirmation: AtomicU16, + address: H160, + config: EthConfig, +} + +#[derive(Debug, thiserror::Error)] +pub enum LoopError { + #[error(transparent)] + RPC(#[from] rpc::Error), + #[error(transparent)] + DB(#[from] postgres::Error), + #[error("Another btc_wire process is running concurrently")] + Concurrency, + // #[error(transparent)] + // Injected(#[from] Injected), +} - println!("Calling balance."); - for account in &accounts { - let balance = web3.eth().balance(*account, None).await?; - println!("Balance of {:?}: {}", account, balance); +pub type LoopResult<T> = Result<T, LoopError>; + +mod sql { + use eth_wire::taler_util::{eth_payto_addr, taler_to_eth}; + use ethereum_types::{H160, U256}; + use taler_common::{ + postgres::Row, + sql::{sql_amount, sql_url}, + }; + + pub fn sql_eth_amount(row: &Row, idx: usize) -> U256 { + let amount = sql_amount(row, idx); + taler_to_eth(&amount).unwrap_or_else(|_| { + panic!( + "Database invariant: expected an ethereum amount got {}", + amount + ) + }) } - let filter = web3 - .eth_filter() - .create_logs_filter( - FilterBuilder::default() - .from_block(BlockNumber::Earliest) - .to_block(BlockNumber::Latest) - .address(vec![accounts[1]]) - .build(), - ) - .await?; - - let result = filter.poll().await?; - dbg!(result); - web3.eth() - .send_transaction(TransactionRequest { - from: accounts[0], - to: Some(accounts[1]), - value: Some(U256::exp10(8)), - data: Some(Bytes::from(vec![0, 1, 2, 3, 4, 5])), - ..Default::default() + pub fn sql_addr(row: &Row, idx: usize) -> H160 { + let url = sql_url(row, idx); + eth_payto_addr(&url).unwrap_or_else(|_| { + panic!( + "Database invariant: expected an ethereum payto url got {}", + url + ) }) - .await?; + } +} + +mod loops { + pub mod watcher { + use eth_wire::rpc::Rpc; + use taler_common::postgres::Client; - println!("Calling balance."); - for account in &accounts { - let balance = web3.eth().balance(*account, None).await?; - println!("Balance of {:?}: {}", account, balance); + pub fn watcher(mut rpc: Rpc, mut db: Client) { + let mut notifier = rpc.subscribe_new_head().unwrap(); + loop { + db.execute("NOTIFY new_block", &[]).unwrap(); + notifier.next().unwrap(); + } + } } - let result = filter.poll().await?; - dbg!(result); - let filter = web3.eth_filter().create_blocks_filter().await?; - let result = filter.poll().await?; - dbg!(result);*/ + pub mod worker { + use std::time::SystemTime; - //let nb = web3.eth().block_number().await.unwrap().as_u64(); - //println!("{} blocks", nb); + use eth_wire::{ + metadata::InMetadata, + rpc::Rpc, + taler_util::{eth_payto_url, eth_to_taler}, + BlockState, + }; + use taler_common::{ + api_common::base32, + log::log::{error, info}, + postgres::{fallible_iterator::FallibleIterator, Client}, + sql::{sql_array, sql_url}, + }; - /*let mut db = rusty_leveldb::DB::open( - format!("{}/.ethereum/geth/chaindata", home), - Options::default(), - ) - .unwrap(); + use crate::{ + sql::{sql_addr, sql_eth_amount}, + status::TxStatus, + LoopResult, WireState, + }; + + pub fn worker(mut rpc: Rpc, mut db: Client, state: &WireState) { + let mut skip_notification = false; + loop { + let result: LoopResult<()> = (|| { + // Listen to all channels + db.batch_execute("LISTEN new_block; LISTEN new_tx")?; + // Wait for the next notification + { + let mut ntf = db.notifications(); + if !skip_notification && ntf.is_empty() { + // Block until next notification + ntf.blocking_iter().next()?; + } + // Conflate all notifications + let mut iter = ntf.iter(); + while iter.next()?.is_some() {} + } + + sync_chain(&mut rpc, &mut db, state)?; - // Getting hash using hashKey - let mut key = [b'h', 0, 0, 0, 0, 0, 0, 0, 0, b'n']; - U64::from(40).to_big_endian(&mut key[1..9]); - dbg!(&key); - let block_hash = db.get(&key).unwrap(); - dbg!(&block_hash); - - let mut key = Vec::new(); - key.push(b'h'); - key.extend_from_slice(&40u64.to_be_bytes()); - key.extend_from_slice(&block_hash); - dbg!(&key); - let header = db.get(&key).unwrap(); - //let header = BlockHeader:: ::from(header); - dbg!(header); - - return;*/ - /*let start = Instant::now(); - let mut nb_block = 0; - let mut nb_tx = 0; - let mut prev = 0; - { - let stdout = std::io::stdout(); - let mut stdout = stdout.lock(); - while let Some(block) = rpc.block(nb_block).unwrap() { - nb_block += 1; - nb_tx += block.transactions.len(); - if nb_block % 1000 == 0 { - let elapsed = start.elapsed().as_secs(); - writeln!( - &mut stdout, - "{:>5}kb {:>10}t {:>7}+ {:}h{:0>2}m{:0>2}s", - nb_block / 1000, - nb_tx, - nb_tx - prev, - elapsed / (60 * 60), - (elapsed % (60 * 60)) / 60, - elapsed % 60 - ) - .ok(); - prev = nb_tx; + while send(&mut db, &mut rpc, state)? {} + Ok(()) + })(); + + if let Err(e) = result { + error!("worker: {}", e); + skip_notification = false; + } else { + skip_notification = false; + } } } + + fn sync_chain(rpc: &mut Rpc, db: &mut Client, state: &WireState) -> LoopResult<bool> { + let row = db.query_one("SELECT value FROM state WHERE name='last_block'", &[])?; + let slice: &[u8] = row.get(0); + let block = BlockState::from_bytes(slice.try_into().unwrap()); + + let mut txs = Vec::new(); + + txs.extend(rpc.pending_transactions()?.into_iter().map(|t| (t, 0))); + + let mut cursor = rpc.current_block()?; + let mut confirmation = 1; + + // TODO check hash to detect reorg + + while cursor.number.expect("Mined block") != block.number { + txs.extend(cursor.transactions.drain(..).map(|t| (t, confirmation))); + cursor = rpc.block(cursor.number.unwrap() - 1u64)?.unwrap(); + confirmation += 1; + } + + for (tx, _confirmation) in txs { + if tx.to == Some(state.address) { + let metadata = InMetadata::decode(&tx.input).unwrap(); + match metadata { + InMetadata::Deposit { reserve_pub } => { + let date = SystemTime::now(); + let amount = eth_to_taler(&tx.value); + let credit_addr = tx.from.expect("Not coinbase"); + let nb = db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (reserve_pub) DO NOTHING ", &[ + &date, &amount.to_string(), &reserve_pub.as_ref(), &eth_payto_url(&credit_addr).as_ref(), &state.config.payto.as_ref() + ])?; + if nb > 0 { + info!( + "<< {} {} in {} from {}", + amount, + base32(&reserve_pub), + hex::encode(tx.hash), + hex::encode(credit_addr), + ); + } + } + } + } + } + + Ok(true) + } + + /// Send a transaction on the blockchain, return true if more transactions with the same status remains + fn send(db: &mut Client, rpc: &mut Rpc, state: &WireState) -> LoopResult<bool> { + // We rely on the advisory lock to ensure we are the only one sending transactions + let row = db.query_opt( + "SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 ORDER BY _date LIMIT 1", + &[&(TxStatus::Requested as i16)], + )?; + if let Some(row) = &row { + let id: i32 = row.get(0); + let amount = sql_eth_amount(row, 1); + let wtid: [u8; 32] = sql_array(row, 2); + let addr = sql_addr(row, 3); + let url = sql_url(row, 4); + match rpc.withdraw(state.address, addr, amount, wtid, url) { + Ok(tx_id) => { + db.execute( + "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3", + &[&(TxStatus::Sent as i16), &tx_id.as_ref(), &id], + )?; + let amount = eth_to_taler(&amount); + info!(">> {} {} in {} to {}", amount, base32(&wtid), tx_id, addr); + } + Err(e) => { + db.execute( + "UPDATE tx_out SET status=$1 WHERE id=$2", + &[&(TxStatus::Delayed as i16), &id], + )?; + Err(e)?; + } + } + } + Ok(row.is_some()) + } } - println!("Done scanned {} blocks in {:?}", nb_block, start.elapsed());*/ +} + +fn main() { + taler_common::log::init(); + + let path = std::env::args().nth(1).unwrap(); + let config = load_eth_config(path); + + let state: &'static WireState = Box::leak(Box::new(WireState { + confirmation: AtomicU16::new(config.confirmation), + address: eth_payto_addr(&config.payto).unwrap(), + config, + })); + + let mut rpc_worker = Rpc::new( + state + .config + .core + .data_dir + .as_ref() + .unwrap() + .join("geth.ipc"), + ) + .unwrap(); + + rpc_worker.unlock_account(&state.address, "password").unwrap(); + + let rpc_watcher = Rpc::new( + state + .config + .core + .data_dir + .as_ref() + .unwrap() + .join("geth.ipc"), + ) + .unwrap(); + let db_watcher = Client::connect(&state.config.core.db_url, NoTls).unwrap(); + let db_worker = Client::connect(&state.config.core.db_url, NoTls).unwrap(); + + std::thread::spawn(move || watcher(rpc_watcher, db_watcher)); + + worker(rpc_worker, db_worker, state); } diff --git a/eth-wire/src/rpc.rs b/eth-wire/src/rpc.rs @@ -173,7 +173,7 @@ impl Rpc { self.call("personal_listAccounts", &EMPTY) } - pub fn new_account(&mut self, passwd: &str) -> Result<Vec<Address>> { + pub fn new_account(&mut self, passwd: &str) -> Result<Address> { self.call("personal_newAccount", &[passwd]) } @@ -225,6 +225,10 @@ impl Rpc { let number: U64 = self.call("eth_blockNumber", &EMPTY)?; Ok(self.block(number)?.expect("Current block must exist")) } + + pub fn balance(&mut self, addr: &Address) -> Result<U256> { + self.call("eth_getBalance", &(addr, "latest")) + } } pub struct RpcStream<T: Debug + DeserializeOwned> { diff --git a/eth-wire/src/status.rs b/eth-wire/src/status.rs @@ -0,0 +1,69 @@ +/* + This file is part of TALER + Copyright (C) 2022 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ +//! Transactions status in database + +/// Outgoing transaction status +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum TxStatus { + /// Client have ask for a transaction (default) + Requested = 0, + /// The wire failed to send this transaction and will try later + Delayed = 1, + /// Transaction have been announced to the bitcoin network + Sent = 2, +} + +impl TryFrom<u8> for TxStatus { + type Error = (); + + fn try_from(v: u8) -> Result<Self, Self::Error> { + match v { + x if x == TxStatus::Requested as u8 => Ok(TxStatus::Requested), + x if x == TxStatus::Sent as u8 => Ok(TxStatus::Sent), + x if x == TxStatus::Delayed as u8 => Ok(TxStatus::Delayed), + _ => Err(()), + } + } +} + +/// Bounce transaction status +#[repr(u8)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum BounceStatus { + /// Bounce have been requested (default) + Requested = 0, + /// The wire failed to send this bounce and will try later + Delayed = 1, + /// Bounce will not be sent (e.g: bounce amount smaller than bounce fee) + Ignored = 2, + /// Bounce have been announced to the bitcoin network + Sent = 3, +} + +impl TryFrom<u8> for BounceStatus { + type Error = (); + + fn try_from(v: u8) -> Result<Self, Self::Error> { + match v { + x if x == BounceStatus::Requested as u8 => Ok(BounceStatus::Requested), + x if x == BounceStatus::Sent as u8 => Ok(BounceStatus::Sent), + x if x == BounceStatus::Delayed as u8 => Ok(BounceStatus::Delayed), + x if x == BounceStatus::Ignored as u8 => Ok(BounceStatus::Ignored), + _ => Err(()), + } + } +} diff --git a/eth-wire/src/taler_util.rs b/eth-wire/src/taler_util.rs @@ -0,0 +1,46 @@ +use std::str::FromStr; + +use ethereum_types::{Address, U256}; +use taler_common::{api_common::Amount, url::Url}; + +const WEI: u64 = 1_000_000_000_000_000_000; +const TRUNC: u64 = 10_000_000_000; + +/// Generate a payto uri from an eth address +pub fn eth_payto_url(addr: &Address) -> Url { + Url::from_str(&format!( + "payto://ethereum/{}", + hex::encode(addr.as_bytes()) + )) + .unwrap() +} + +/// Extract an eth address from a payto uri +pub fn eth_payto_addr(url: &Url) -> Result<Address, String> { + if url.domain() != Some("ethereum") { + return Err(format!( + "Expected domain 'ethereum' got '{}'", + url.domain().unwrap_or_default() + )); + } + let str = url.path().trim_start_matches('/'); + return Address::from_str(str).map_err(|e| e.to_string()); +} + +/// Transform a eth amount into a taler amount +pub fn eth_to_taler(amount: &U256) -> Amount { + return Amount::new( + "ETH", + (amount / WEI).as_u64(), + ((amount % WEI) / TRUNC).as_u32(), + ); +} + +/// Transform a eth amount into a btc amount +pub fn taler_to_eth(amount: &Amount) -> Result<U256, String> { + if amount.currency != "ETH" { + return Err(format!("expected ETH for {}", amount.currency)); + } + + Ok(U256::from(amount.value) * WEI + U256::from(amount.fraction) * TRUNC) +} diff --git a/makefile b/makefile @@ -19,4 +19,7 @@ test_btc: test/btc/maxfee.sh test/btc/config.sh -test: install test_gateway test_btc -\ No newline at end of file +test_eth: + test/eth/wire.sh + +test: install test_gateway test_btc test_eth +\ No newline at end of file diff --git a/taler-common/src/config.rs b/taler-common/src/config.rs @@ -62,7 +62,7 @@ pub struct GatewayConfig { pub port: u16, pub unix_path: Option<PathBuf>, pub payto: Url, - pub init: CoreConfig, + pub core: CoreConfig, } impl Config for GatewayConfig { @@ -74,7 +74,7 @@ impl Config for GatewayConfig { http_lifetime: nb(dep, "HTTP_LIFETIME") .and_then(|nb| (nb != 0).then(|| Some(nb))) .unwrap_or(None), - init: CoreConfig::load_from_ini(ini, currency, dep), + core: CoreConfig::load_from_ini(ini, currency, dep), } } } @@ -89,7 +89,7 @@ pub struct WireConfig<const DEFAULT_FEE: u64, const DEFAULT_CONFIRMATION: u16> { pub wire_lifetime: Option<u32>, pub bump_delay: Option<u32>, pub payto: Url, - pub init: CoreConfig, + pub core: CoreConfig, } impl<const DEFAULT_FEE: u64, const DEFAULT_CONFIRMATION: u16> Config @@ -108,7 +108,7 @@ impl<const DEFAULT_FEE: u64, const DEFAULT_CONFIRMATION: u16> Config bump_delay: nb(dep, "BUMP_DELAY") .and_then(|nb| (nb != 0).then(|| Some(nb))) .unwrap_or(None), - init: CoreConfig::load_from_ini(ini, currency, dep), + core: CoreConfig::load_from_ini(ini, currency, dep), } } } @@ -117,7 +117,7 @@ pub type BtcConfig = WireConfig<1000, 6>; pub fn load_btc_config(path: impl AsRef<Path>) -> BtcConfig { let config = WireConfig::load_from_file(path); - assert_eq!(config.init.currency, "BTC"); + assert_eq!(config.core.currency, "BTC"); return config; } @@ -125,7 +125,7 @@ pub type EthConfig = WireConfig<1000000, 24>; pub fn load_eth_config(path: impl AsRef<Path>) -> EthConfig { let config = WireConfig::load_from_file(path); - assert_eq!(config.init.currency, "ETH"); + assert_eq!(config.core.currency, "ETH"); return config; } diff --git a/test/btc/wire.sh b/test/btc/wire.sh @@ -1,6 +1,6 @@ #!/bin/bash -## Test btc_wire correctly receive and sens transactions on the blockchain +## Test btc_wire correctly receive and send transactions on the blockchain set -eu diff --git a/test/common.sh b/test/common.sh @@ -156,7 +156,7 @@ function restart_btc() { resume_btc $* } -# Mine blocks +# Mine bitcoin blocks function mine_btc() { $BTC_CLI generatetoaddress "${1:-1}" $RESERVE > /dev/null } @@ -193,34 +193,32 @@ function check_balance() { # ----- btc-wire ----- # -# Start btc_wire +# Start btc-wire function btc_wire() { - cargo build --bin btc-wire --release &> /dev/null - target/release/btc-wire $CONF &>> log/btc_wire.log & + cargo build --bin btc-wire --release &> log/cargo.log + target/release/btc-wire $CONF &> log/btc_wire.log & WIRE_PID="$!" } # Start multiple btc_wire with random failures in parallel function stress_btc_wire() { - cargo build --bin btc-wire --release --features fail &> /dev/null - target/release/btc-wire $CONF &>> log/btc_wire.log & - target/release/btc-wire $CONF &>> log/btc_wire1.log & + cargo build --bin btc-wire --release --features fail &> log/cargo.log + target/release/btc-wire $CONF &> log/btc_wire.log & + target/release/btc-wire $CONF &> log/btc_wire1.log & } # ----- Ethereum node ----- # # Start a geth dev node, generate money, wallet and addresses function init_eth() { - # TODO find a way to init with eth-wire-cli # Create wallets - for pswd in "wire" "client" "reserve"; do + for pswd in "reserve" "client"; do $ETH_CLI account new --password <(echo "password") &> /dev/null done # Retrieve addresses local ADDR=`$ETH_CLI account list 2> /dev/null | grep -oP '(?<={).*?(?=})'` - WIRE=`sed -n '1p' <(echo "$ADDR")` + RESERVE=`sed -n '1p' <(echo "$ADDR")` CLIENT=`sed -n '2p' <(echo "$ADDR")` - RESERVE=`sed -n '3p' <(echo "$ADDR")` # Generate genesis echo "{ \"config\": { @@ -239,14 +237,56 @@ function init_eth() { \"gasLimit\": \"0\", \"baseFeePerGas\": null, \"alloc\": { - \"$CLIENT\": { \"balance\": \"1000000000000000000\" }, - \"$WIRE\": { \"balance\": \"500000000000000000\" } + \"$CLIENT\": { \"balance\": \"10000000000000000000\" } } }" > $DIR/genesis.json # Initialize blockchain - $ETH_CLI init $DIR/genesis.json + $ETH_CLI init $DIR/genesis.json &> log/eth.log # Start node - $ETH_CLI $* &>> log/eth.log & + $ETH_CLI --miner.gasprice 0 $* &> log/eth.log & + sleep 1 + # Create wire address + WIRE=`eth-wire-cli initwallet $CONF | grep -oP '(?<=is ).*'` + echo -e "PAYTO = payto://ethereum/$WIRE" >> $CONF +} + +# Check client and wire balance +function check_balance_eth() { + local CLIENT_BALANCE=`eth-wire-utils -d $WIRE_DIR balance $CLIENT` + local WIRE_BALANCE=`eth-wire-utils -d $WIRE_DIR balance $WIRE` + local CLIENT="${1:-*}" + if [ "$1" == "*" ]; then + local CLIENT="$CLIENT_BALANCE" + fi + if [ "$CLIENT_BALANCE" != "$CLIENT" ] || [ "$WIRE_BALANCE" != "${2:-$WIRE_BALANCE}" ]; then + echo "expected: client $CLIENT wire ${2:-$WIRE_BALANCE} got: client $CLIENT_BALANCE wire $WIRE_BALANCE" + exit 1 + fi +} + + +# ----- eth-wire ----- # + +# Start eth-wire +function eth_wire() { + cargo build --bin eth-wire --release &> log/cargo.log + target/release/eth-wire $CONF &> log/eth_wire.log & + WIRE_PID="$!" +} + +# Mine ethereum blocks +function mine_eth() { + eth-wire-utils -d $WIRE_DIR mine $RESERVE ${1:-} +} + +# Mine previous transactions +function next_eth() { + # Mine enough block to confirm previous transactions + mine_eth ${1:-$CONFIRMATION} + # Wait for eth-wire to catch up + sleep 0.2 + # Mine one more block to trigger eth-wire + mine_eth } # ----- Gateway ------ # @@ -285,12 +325,12 @@ function check_delta() { ALL=`curl -s ${BANK_ENDPOINT}history/$1` PRE=${3:-0.0000} for n in `$2`; do - if ! `echo $ALL | grep BTC:$PRE$n > /dev/null`; then - echo -n " missing tx with amount: BTC:$PRE$n" + if ! `echo $ALL | grep $CURRENCY:$PRE$n > /dev/null`; then + echo -n " missing tx with amount: $CURRENCY:$PRE$n" return 1 fi done - NB=`echo $ALL | grep -o BTC:$PRE | wc -l` + NB=`echo $ALL | grep -o $CURRENCY:$PRE | wc -l` EXPECTED=`$2 | wc -w` if [ "$EXPECTED" != "$NB" ]; then echo -n " expected: $EXPECTED txs found $NB" diff --git a/test/conf/taler_eth.conf b/test/conf/taler_eth.conf @@ -7,5 +7,4 @@ BASE_URL = http://test.com [depolymerizer-ethereum] DB_URL = postgres://localhost:5454/postgres?user=postgres&password=password PORT = 8060 -PAYTO = payto://euthereum/bcrt1qgkgxkjj27g3f7s87mcvjjsghay7gh34cx39prj CONFIRMATION = 3 \ No newline at end of file diff --git a/test/eth/wire.sh b/test/eth/wire.sh @@ -0,0 +1,73 @@ +#!/bin/bash + +## Test eth-wire correctly receive and send transactions on the blockchain + +set -eu + +source "${BASH_SOURCE%/*}/../common.sh" +SCHEMA=eth.sql +CONFIG=taler_eth.conf + +echo "----- Setup -----" +echo "Load config file" +load_config +echo "Start database" +setup_db +echo "Start ethereum node" +init_eth +echo "Start eth-wire" +eth_wire +echo "Start gateway" +gateway +echo "" + +SEQ="seq 10 29" +RUST_BACKTRACE=1 + +echo "----- Receive -----" + +echo -n "Making wire transfer to exchange:" +for n in `$SEQ`; do + eth-wire-utils -d $WIRE_DIR deposit $CLIENT $WIRE 0.000$n +done +next_eth # Trigger eth-wire +check_balance_eth 999610000 390000 +echo " OK" + +echo -n "Requesting exchange incoming transaction list:" +check_delta "incoming?delta=-100" "$SEQ" "0.000" +echo " OK" + +echo "----- Send -----" + +echo -n "Making wire transfer from exchange:" +for n in `$SEQ`; do + taler-exchange-wire-gateway-client \ + -b $BANK_ENDPOINT \ + -C payto://ethereum/$CLIENT \ + -a ETH:0.0000$n > /dev/null +done +sleep 1 +mine_eth # Mine transactions +check_balance_eth 999649000 351000 +echo " OK" + +echo -n "Requesting exchange's outgoing transaction list:" +check_delta "outgoing?delta=-100" "$SEQ" +echo " OK" + +#echo "----- Bounce -----" + +#clear_wallet + +#echo -n "Bounce:" +#for n in `seq 10 40`; do +# eth-wire-utils -d $WIRE_DIR send $CLIENT $WIRE 0.000$n +# mine_btc +#done +#next_btc +#sleep 3 +#check_balance_eth "*" 0.00031000 +#echo " OK" + +echo "All tests passed!" +\ No newline at end of file diff --git a/wire-gateway/src/main.rs b/wire-gateway/src/main.rs @@ -50,6 +50,7 @@ mod json; struct ServerState { pool: Pool, config: GatewayConfig, + domain: &'static str, notify: Notify, lifetime: Option<AtomicU32>, status: AtomicBool, @@ -94,7 +95,7 @@ async fn main() { taler_common::log::log::warn!("Running with test admin endpoint unsuitable for production"); // Parse postgres url - let config = tokio_postgres::Config::from_str(&conf.init.db_url).unwrap(); + let config = tokio_postgres::Config::from_str(&conf.core.db_url).unwrap(); // TODO find a way to clean this ugly mess let mut cfg = deadpool_postgres::Config::new(); cfg.user = config.get_user().map(|it| it.to_string()); @@ -121,6 +122,11 @@ async fn main() { let pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls).unwrap(); let state = ServerState { pool, + domain: match conf.core.currency.as_str() { + "BTC" => "bitcoin", + "ETH" => "ethereum", + currency => unimplemented!("Unsupported currency {}", currency), + }, config: conf.clone(), notify: Notify::new(), lifetime: conf.http_lifetime.map(AtomicU32::new), @@ -200,14 +206,15 @@ async fn main() { } /// Check if an url is a valid bitcoin payto url -fn check_pay_to(url: &Url) -> bool { - return url.domain() == Some("bitcoin") +fn check_pay_to(url: &Url, domain: &str) -> bool { + // TODO currency agnostic + return url.domain() == Some(domain) && url.scheme() == "payto" && url.username() == "" && url.password().is_none() && url.query().is_none() - && url.fragment().is_none() - && bitcoin::Address::from_str(url.path().trim_start_matches('/')).is_ok(); + && url.fragment().is_none(); + //&& bitcoin::Address::from_str(url.path().trim_start_matches('/')).is_ok(); } /// Assert request method match expected @@ -271,13 +278,13 @@ async fn router( StatusCode::BAD_REQUEST, ErrorCode::GENERIC_PARAMETER_MALFORMED, )?; - if !check_pay_to(&request.credit_account) { + if !check_pay_to(&request.credit_account, state.domain) { return Err(ServerError::code( StatusCode::BAD_REQUEST, ErrorCode::GENERIC_PAYTO_URI_MALFORMED, )); } - if request.amount.currency != state.config.init.currency { + if request.amount.currency != state.config.core.currency { return Err(ServerError::code( StatusCode::BAD_REQUEST, ErrorCode::GENERIC_PARAMETER_MALFORMED, @@ -450,7 +457,7 @@ async fn router( /// Listen to backend status change fn status_watcher(state: &'static ServerState) { fn inner(state: &'static ServerState) -> Result<(), Box<dyn std::error::Error>> { - let mut db = Client::connect(&state.config.init.db_url, NoTls)?; + let mut db = Client::connect(&state.config.core.db_url, NoTls)?; // Register as listener db.batch_execute("LISTEN status")?; loop {