depolymerization

wire gateway for Bitcoin/Ethereum
Log | Files | Refs | Submodules | README | LICENSE

commit 5579e64e79ccfc38105ce302704267e3d899fd52
parent 19b478344ed125aa200048332f0e47341ed1f957
Author: Antoine A <>
Date:   Thu, 28 Nov 2024 18:37:05 +0100

Modern schema and clean code

Diffstat:
Mbtc-wire/src/loops/worker.rs | 44+++++++++++++++++++-------------------------
Mbtc-wire/src/sql.rs | 2+-
Mcommon/src/sql.rs | 9+++++----
Mdb/btc.sql | 28++++++++++++++++------------
Mdb/common.sql | 24++++++++++++++----------
Mdb/eth.sql | 30+++++++++++++++++-------------
Meth-wire/src/loops/worker.rs | 64+++++++++++++++++++++++++++++++++++-----------------------------
Meth-wire/src/rpc.rs | 2+-
Meth-wire/src/sql.rs | 2+-
Minstrumentation/src/eth.rs | 4++--
Mtaler-api/src/db.rs | 14+++++++-------
Mtaler-api/src/db/type_helper.rs | 10+++++++++-
Mtaler-api/src/error.rs | 8++++----
Mtaler-api/src/lib.rs | 4++--
Mtaler-api/src/notification.rs | 2+-
Mtaler-common/src/api_wire.rs | 10+++++-----
Mwire-gateway/src/main.rs | 80+++++++++++++++++++++++++++++++++++++------------------------------------------
17 files changed, 176 insertions(+), 161 deletions(-)

diff --git a/btc-wire/src/loops/worker.rs b/btc-wire/src/loops/worker.rs @@ -13,11 +13,7 @@ You should have received a copy of the GNU Affero General Public License along with TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use std::{ - collections::HashMap, - fmt::Write, - time::{Duration, SystemTime}, -}; +use std::{collections::HashMap, fmt::Write, time::SystemTime}; use bitcoin::{hashes::Hash, Amount as BtcAmount, BlockHash, Txid}; use btc_wire::{ @@ -36,7 +32,7 @@ use common::{ reconnect::AutoReconnectDb, sql::{sql_array, sql_url}, status::{BounceStatus, DebitStatus}, - taler_common::api_common::base32, + taler_common::api_common::{base32, Timestamp}, }; use postgres::{fallible_iterator::FallibleIterator, Client}; @@ -337,10 +333,9 @@ fn sync_chain_incoming_confirmed( // Store transactions in database let debit_addr = sender_address(rpc, &full)?; let credit_addr = full.details[0].address.clone().unwrap().assume_checked(); - let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time); let amount = btc_to_taler(&full.amount, state.currency); - let nb = db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (reserve_pub) DO NOTHING ", &[ - &date, &amount.to_string(), &reserve_pub.as_slice(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(&credit_addr).as_ref() + let nb = db.execute("INSERT INTO tx_in (received, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, ($2, $3)::taler_amount, $4, $5, $6) ON CONFLICT (reserve_pub) DO NOTHING ", &[ + &((full.time * 1000000) as i64), &(amount.value as i64), &(amount.fraction as i32), &reserve_pub.as_slice(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(&credit_addr).as_ref() ])?; if nb > 0 { info!( @@ -356,8 +351,8 @@ fn sync_chain_incoming_confirmed( GetSegwitErr::Decode(_) => { // If encoding is wrong request a bounce db.execute( - "INSERT INTO bounce (bounced) VALUES ($1) ON CONFLICT (bounced) DO NOTHING", - &[&id.as_byte_array().as_slice()], + "INSERT INTO bounce (created, bounced) VALUES ($1, $2) ON CONFLICT (bounced) DO NOTHING", + &[&Timestamp::now().as_sql_micros(), &id.as_byte_array().as_slice()], )?; } GetSegwitErr::RPC(e) => return Err(e.into()), @@ -401,7 +396,7 @@ fn sync_chain_debit( )?; if let Some(row) = row { // If already in database, sync status - let row_id: i32 = row.get(0); + let row_id: i64 = row.get(0); let status: i16 = row.get(1); match DebitStatus::try_from(status as u8).unwrap() { DebitStatus::Requested => { @@ -442,10 +437,9 @@ fn sync_chain_debit( } else { // Else add to database let debit_addr = sender_address(rpc, full)?; - let date = SystemTime::UNIX_EPOCH + Duration::from_secs(full.time); let nb = db.execute( - "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING", - &[&date, &amount.to_string(), &wtid.as_slice(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(&credit_addr).as_ref(), &state.base_url.as_ref(), &(DebitStatus::Sent as i16), &id.as_byte_array().as_slice(), &None::<&[u8]>], + "INSERT INTO tx_out (created, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, ($2, $3)::taler_amount, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT (wtid) DO NOTHING", + &[&((full.time*1000000) as i64), &(amount.value as i64), &(amount.fraction as i32), &wtid.as_slice(), &btc_payto_url(&debit_addr).as_ref(), &btc_payto_url(&credit_addr).as_ref(), &state.base_url.as_ref(), &(DebitStatus::Sent as i16), &id.as_byte_array().as_slice(), &None::<&[u8]>], )?; if nb > 0 { warn!( @@ -498,7 +492,7 @@ fn sync_chain_bounce( )?; if let Some(row) = row { // If already in database, sync status - let row_id: i32 = row.get(0); + let row_id: i64 = row.get(0); let status: i16 = row.get(1); match BounceStatus::try_from(status as u8).unwrap() { BounceStatus::Requested => { @@ -524,8 +518,8 @@ fn sync_chain_bounce( } else { // Else add to database let nb = db.execute( - "INSERT INTO bounce (bounced, txid, status) VALUES ($1, $2, $3) ON CONFLICT (txid) DO NOTHING", - &[&bounced.as_byte_array().as_slice(), &id.as_byte_array().as_slice(), &(BounceStatus::Sent as i16)], + "INSERT INTO bounce (created, bounced, txid, status) VALUES ($1, $2, $3, $4) ON CONFLICT (txid) DO NOTHING", + &[&Timestamp::now().as_sql_micros(), &bounced.as_byte_array().as_slice(), &id.as_byte_array().as_slice(), &(BounceStatus::Sent as i16)], )?; if nb > 0 { warn!("|| (onchain) {} in {}", &bounced, &id); @@ -569,15 +563,15 @@ fn sync_chain_outgoing( fn debit(db: &mut Client, rpc: &mut Rpc, state: &WireState) -> LoopResult<bool> { // We rely on the advisory lock to ensure we are the only one sending transactions let row = db.query_opt( - "SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 ORDER BY _date LIMIT 1", + "SELECT id, (amount).val, (amount).frac, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 ORDER BY created LIMIT 1", &[&(DebitStatus::Requested as i16)], )?; if let Some(row) = &row { - let id: i32 = row.get(0); + let id: i64 = row.get(0); let amount = sql_btc_amount(row, 1, state.currency); - let wtid: [u8; 32] = sql_array(row, 2); - let addr = sql_addr(row, 3); - let url = sql_url(row, 4); + let wtid: [u8; 32] = sql_array(row, 3); + let addr = sql_addr(row, 4); + let url = sql_url(row, 5); let metadata = OutMetadata::Debit { wtid, url }; let tx_id = rpc.send( @@ -605,11 +599,11 @@ fn debit(db: &mut Client, rpc: &mut Rpc, state: &WireState) -> LoopResult<bool> fn bounce(db: &mut Client, rpc: &mut Rpc, fee: &BtcAmount) -> LoopResult<bool> { // We rely on the advisory lock to ensure we are the only one sending transactions let row = db.query_opt( - "SELECT id, bounced FROM bounce WHERE status=$1 ORDER BY _date LIMIT 1", + "SELECT id, bounced FROM bounce WHERE status=$1 ORDER BY created LIMIT 1", &[&(BounceStatus::Requested as i16)], )?; if let Some(row) = &row { - let id: i32 = row.get(0); + let id: i64 = row.get(0); let bounced: Txid = sql_txid(row, 1); let metadata = OutMetadata::Bounce { bounced: *bounced.as_byte_array(), diff --git a/btc-wire/src/sql.rs b/btc-wire/src/sql.rs @@ -24,7 +24,7 @@ use btc_wire::taler_utils::{btc_payto_addr, taler_to_btc}; /// Bitcoin amount from sql pub fn sql_btc_amount(row: &Row, idx: usize, currency: CurrencyBtc) -> BtcAmount { - let amount = sql_amount(row, idx); + let amount = sql_amount(row, idx, currency.to_str()); taler_to_btc(&amount, currency).or_fail(|_| { format!( "Database invariant: expected an bitcoin amount got {}", diff --git a/common/src/sql.rs b/common/src/sql.rs @@ -29,9 +29,10 @@ pub fn sql_url(row: &Row, idx: usize) -> Url { } /// Ethereum amount from sql -pub fn sql_amount(row: &Row, idx: usize) -> Amount { - let str: &str = row.get(idx); - Amount::from_str(str).or_fail(|_| format!("Database invariant: expected an amount got {}", str)) +pub fn sql_amount(row: &Row, idx: usize, currency: &str) -> Amount { + let val: i64 = row.get(idx); + let frac: i32 = row.get(idx + 1); + Amount::new(currency.to_string(), val as u64, frac as u32) } /// Byte array from sql @@ -48,6 +49,6 @@ pub fn sql_array<const N: usize>(row: &Row, idx: usize) -> [u8; N] { /// Safe safe u64 from sql pub fn sql_safe_u64(row: &Row, idx: usize) -> SafeU64 { - let id: i32 = row.get(idx); + let id: i64 = row.get(idx); SafeU64::try_from(id as u64).unwrap() } diff --git a/db/btc.sql b/db/btc.sql @@ -1,38 +1,42 @@ +CREATE TYPE taler_amount AS (val INT8, frac INT4); +COMMENT ON TYPE taler_amount + IS 'Stores an amount, fraction is in units of 1/100000000 of the base value'; + -- Key value state CREATE TABLE state ( - name TEXT PRIMARY KEY, + name TEXT NOT NULL PRIMARY KEY, value BYTEA NOT NULL ); -- Incoming transactions CREATE TABLE tx_in ( - id SERIAL PRIMARY KEY, - _date TIMESTAMP NOT NULL DEFAULT now(), - amount TEXT NOT NULL, - reserve_pub BYTEA NOT NULL UNIQUE, + id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + received INT8 NOT NULL, + amount taler_amount NOT NULL, + reserve_pub BYTEA NOT NULL UNIQUE CHECK (LENGTH(reserve_pub)=32), debit_acc TEXT NOT NULL, credit_acc TEXT NOT NULL ); -- Outgoing transactions CREATE TABLE tx_out ( - id SERIAL PRIMARY KEY, - _date TIMESTAMP NOT NULL DEFAULT now(), - amount TEXT NOT NULL, - wtid BYTEA NOT NULL UNIQUE, + id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + created INT8 NOT NULL, + amount taler_amount NOT NULL, + wtid BYTEA NOT NULL UNIQUE CHECK (LENGTH(wtid)=32), debit_acc TEXT NOT NULL, credit_acc TEXT NOT NULL, exchange_url TEXT NOT NULL, - request_uid BYTEA UNIQUE, + request_uid BYTEA UNIQUE CHECK (LENGTH(request_uid)=64), status SMALLINT NOT NULL DEFAULT 0, txid BYTEA UNIQUE ); -- Bounced transaction CREATE TABLE bounce ( - id SERIAL PRIMARY KEY, + id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, bounced BYTEA UNIQUE NOT NULL, txid BYTEA UNIQUE, - _date TIMESTAMP NOT NULL DEFAULT now(), + created INT8 NOT NULL, status SMALLINT NOT NULL DEFAULT 0 ) \ No newline at end of file diff --git a/db/common.sql b/db/common.sql @@ -1,27 +1,31 @@ +CREATE TYPE taler_amount AS (val INT8, frac INT4); +COMMENT ON TYPE taler_amount + IS 'Stores an amount, fraction is in units of 1/100000000 of the base value'; + -- Key value state CREATE TABLE state ( - name TEXT PRIMARY KEY, + name TEXT NOT NULL PRIMARY KEY, value BYTEA NOT NULL ); -- Incoming transactions CREATE TABLE tx_in ( - id SERIAL PRIMARY KEY, - _date TIMESTAMP NOT NULL DEFAULT now(), - amount TEXT NOT NULL, - reserve_pub BYTEA NOT NULL UNIQUE, + id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + created INT8 NOT NULL DEFAULT now(), + amount taler_amount NOT NULL, + reserve_pub BYTEA NOT NULL UNIQUE CHECK (LENGTH(reserve_pub)=32), debit_acc TEXT NOT NULL, credit_acc TEXT NOT NULL ); -- Outgoing transactions CREATE TABLE tx_out ( - id SERIAL PRIMARY KEY, - _date TIMESTAMP NOT NULL DEFAULT now(), - amount TEXT NOT NULL, - wtid BYTEA NOT NULL UNIQUE, + id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + created INT8 NOT NULL DEFAULT now(), + amount taler_amount NOT NULL, + wtid BYTEA NOT NULL UNIQUE CHECK (LENGTH(wtid)=32), debit_acc TEXT NOT NULL, credit_acc TEXT NOT NULL, exchange_url TEXT NOT NULL, - request_uid BYTEA UNIQUE + request_uid BYTEA UNIQUE CHECK (LENGTH(request_uid)=64) ); \ No newline at end of file diff --git a/db/eth.sql b/db/eth.sql @@ -1,39 +1,43 @@ +CREATE TYPE taler_amount AS (val INT8, frac INT4); +COMMENT ON TYPE taler_amount + IS 'Stores an amount, fraction is in units of 1/100000000 of the base value'; + -- Key value state CREATE TABLE state ( - name TEXT PRIMARY KEY, + name TEXT NOT NULL PRIMARY KEY, value BYTEA NOT NULL ); -- Incoming transactions CREATE TABLE tx_in ( - id SERIAL PRIMARY KEY, - _date TIMESTAMP NOT NULL DEFAULT now(), - amount TEXT NOT NULL, - reserve_pub BYTEA NOT NULL UNIQUE, + id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + received INT8 NOT NULL, + amount taler_amount NOT NULL, + reserve_pub BYTEA NOT NULL UNIQUE CHECK (LENGTH(reserve_pub)=32), debit_acc TEXT NOT NULL, credit_acc TEXT NOT NULL ); -- Outgoing transactions CREATE TABLE tx_out ( - id SERIAL PRIMARY KEY, - _date TIMESTAMP NOT NULL DEFAULT now(), - amount TEXT NOT NULL, - wtid BYTEA NOT NULL UNIQUE, + id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + created INT8 NOT NULL, + amount taler_amount NOT NULL, + wtid BYTEA NOT NULL UNIQUE CHECK (LENGTH(wtid)=32), debit_acc TEXT NOT NULL, credit_acc TEXT NOT NULL, exchange_url TEXT NOT NULL, - request_uid BYTEA UNIQUE, + request_uid BYTEA UNIQUE CHECK (LENGTH(request_uid)=64), status SMALLINT NOT NULL DEFAULT 0, txid BYTEA UNIQUE, - sent TIMESTAMP DEFAULT NULL + sent INT8 DEFAULT NULL ); -- Bounced transaction CREATE TABLE bounce ( - id SERIAL PRIMARY KEY, + id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, bounced BYTEA UNIQUE NOT NULL, txid BYTEA UNIQUE, - _date TIMESTAMP NOT NULL DEFAULT now(), + created INT8 NOT NULL, status SMALLINT NOT NULL DEFAULT 0 ) \ No newline at end of file diff --git a/eth-wire/src/loops/worker.rs b/eth-wire/src/loops/worker.rs @@ -13,7 +13,7 @@ You should have received a copy of the GNU Affero General Public License along with TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use std::{fmt::Write, time::SystemTime}; +use std::fmt::Write; use common::{ log::log::{error, info, warn}, @@ -22,7 +22,7 @@ use common::{ reconnect::AutoReconnectDb, sql::{sql_array, sql_url}, status::{BounceStatus, DebitStatus}, - taler_common::api_common::base32, + taler_common::api_common::{base32, Timestamp}, }; use eth_wire::{ rpc::{self, AutoRpcWallet, Rpc, RpcClient, Transaction, TransactionRequest}, @@ -276,11 +276,10 @@ fn sync_chain_incoming_confirmed( match InMetadata::decode(&tx.input) { Ok(metadata) => match metadata { InMetadata::Credit { reserve_pub } => { - let date = SystemTime::now(); let amount = eth_to_taler(&tx.value, state.currency); let credit_addr = tx.from.expect("Not coinbase"); - let nb = db.execute("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, $2, $3, $4, $5) ON CONFLICT (reserve_pub) DO NOTHING ", &[ - &date, &amount.to_string(), &reserve_pub.as_ref(), &eth_payto_url(&credit_addr).as_ref(), &state.payto.as_ref() + let nb = db.execute("INSERT INTO tx_in (received, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, ($2, $3)::taler_amount, $4, $5, $6) ON CONFLICT (reserve_pub) DO NOTHING ", &[ + &Timestamp::now().as_sql_micros(), &(amount.value as i64), &(amount.fraction as i32), &reserve_pub.as_ref(), &eth_payto_url(&credit_addr).as_ref(), &state.payto.as_ref() ])?; if nb > 0 { info!( @@ -296,8 +295,8 @@ fn sync_chain_incoming_confirmed( Err(_) => { // If encoding is wrong request a bounce db.execute( - "INSERT INTO bounce (bounced) VALUES ($1) ON CONFLICT (bounced) DO NOTHING", - &[&tx.hash.as_ref()], + "INSERT INTO bounce (created, bounced) VALUES ($1, $2) ON CONFLICT (bounced) DO NOTHING", + &[&Timestamp::now().as_sql_micros(), &tx.hash.as_ref()], )?; } } @@ -319,9 +318,9 @@ fn sync_chain_outgoing(tx: &SyncTransaction, db: &mut Client, state: &WireState) )?; if let Some(row) = row { // If already in database, sync status - let row_id: i32 = row.get(0); + let row_id: i64 = row.get(0); let status: i16 = row.get(1); - let sent: Option<SystemTime> = row.get(2); + let sent: Option<i64> = row.get(2); let expected_status = DebitStatus::Sent as i16; let expected_send = sent.filter(|_| *confirmations == 0); @@ -352,10 +351,9 @@ fn sync_chain_outgoing(tx: &SyncTransaction, db: &mut Client, state: &WireState) } } else { // Else add to database - let date = SystemTime::now(); let nb = db.execute( - "INSERT INTO tx_out (_date, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING", - &[&date, &amount.to_string(), &wtid.as_ref(), &eth_payto_url(&state.address).as_ref(), &eth_payto_url(&credit_addr).as_ref(), &state.base_url.as_ref(), &(DebitStatus::Sent as i16), &tx.hash.as_ref(), &None::<&[u8]>], + "INSERT INTO tx_out (created, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, ($2, $3)::taler_amount, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT (wtid) DO NOTHING", + &[&Timestamp::now().as_sql_micros(), &(amount.value as i64), &(amount.fraction as i32), &wtid.as_ref(), &eth_payto_url(&state.address).as_ref(), &eth_payto_url(&credit_addr).as_ref(), &state.base_url.as_ref(), &(DebitStatus::Sent as i16), &tx.hash.as_ref(), &None::<&[u8]>], )?; if nb > 0 { warn!( @@ -377,7 +375,7 @@ fn sync_chain_outgoing(tx: &SyncTransaction, db: &mut Client, state: &WireState) )?; if let Some(row) = row { // If already in database, sync status - let row_id: i32 = row.get(0); + let row_id: i64 = row.get(0); let status: i16 = row.get(1); match BounceStatus::try_from(status as u8).unwrap() { BounceStatus::Requested => { @@ -408,8 +406,8 @@ fn sync_chain_outgoing(tx: &SyncTransaction, db: &mut Client, state: &WireState) } else { // Else add to database let nb = db.execute( - "INSERT INTO bounce (bounced, txid, status) VALUES ($1, $2, $3) ON CONFLICT (txid) DO NOTHING", - &[&bounced.as_ref(), &tx.hash.as_ref(), &(BounceStatus::Sent as i16)], + "INSERT INTO bounce (created, bounced, txid, status) VALUES ($1, $2, $3, $4) ON CONFLICT (txid) DO NOTHING", + &[&Timestamp::now().as_sql_micros(), &bounced.as_ref(), &tx.hash.as_ref(), &(BounceStatus::Sent as i16)], )?; if nb > 0 { warn!( @@ -430,20 +428,26 @@ fn sync_chain_outgoing(tx: &SyncTransaction, db: &mut Client, state: &WireState) fn debit(db: &mut Client, rpc: &mut Rpc, state: &WireState) -> LoopResult<bool> { // We rely on the advisory lock to ensure we are the only one sending transactions let row = db.query_opt( -"SELECT id, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 ORDER BY _date LIMIT 1", +"SELECT id, (amount).val, (amount).frac, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 ORDER BY created LIMIT 1", &[&(DebitStatus::Requested as i16)], )?; if let Some(row) = &row { - let id: i32 = row.get(0); + let id: i64 = row.get(0); let amount = sql_eth_amount(row, 1, state.currency); - let wtid: [u8; 32] = sql_array(row, 2); - let addr = sql_addr(row, 3); - let url = sql_url(row, 4); + let wtid: [u8; 32] = sql_array(row, 3); + let addr = sql_addr(row, 4); + let url = sql_url(row, 5); + let now = Timestamp::now(); let tx_id = rpc.debit(state.address, addr, amount, wtid, url)?; fail_point("(injected) fail debit", 0.3)?; db.execute( - "UPDATE tx_out SET status=$1, txid=$2, sent=now() WHERE id=$3", - &[&(DebitStatus::Sent as i16), &tx_id.as_ref(), &id], + "UPDATE tx_out SET status=$1, txid=$2, sent=$3 WHERE id=$4", + &[ + &(DebitStatus::Sent as i16), + &tx_id.as_ref(), + &now.as_sql_micros(), + &id, + ], )?; let amount = eth_to_taler(&amount, state.currency); info!( @@ -460,13 +464,15 @@ fn debit(db: &mut Client, rpc: &mut Rpc, state: &WireState) -> LoopResult<bool> /// Bump a stuck transaction, return false if no more stuck transactions are found fn bump(db: &mut Client, rpc: &mut Rpc, state: &WireState) -> LoopResult<bool> { if let Some(delay) = state.bump_delay { + let now = Timestamp::now().as_sql_micros(); // We rely on the advisory lock to ensure we are the only one sending transactions let row = db.query_opt( - "SELECT id, txid FROM tx_out WHERE status=$1 AND EXTRACT(EPOCH FROM (now() - sent)) > $2 ORDER BY _date LIMIT 1", - &[&(DebitStatus::Sent as i16), &(delay as f64)], + "SELECT id, txid FROM tx_out WHERE status=$1 AND $2 - sent > $3 ORDER BY created LIMIT 1", + &[&(DebitStatus::Sent as i16), &now, &((delay * 1000000) as i64)], )?; if let Some(row) = &row { - let id: i32 = row.get(0); + let now = Timestamp::now(); + let id: i64 = row.get(0); let txid = sql_hash(row, 1); let tx = rpc.get_transaction(&txid)?.expect("Bump existing tx"); rpc.send_transaction(&TransactionRequest { @@ -478,8 +484,8 @@ fn bump(db: &mut Client, rpc: &mut Rpc, state: &WireState) -> LoopResult<bool> { nonce: Some(tx.nonce), })?; let row = db.query_one( - "UPDATE tx_out SET sent=now() WHERE id=$1 RETURNING wtid", - &[&id], + "UPDATE tx_out SET sent=$1 WHERE id=$2 RETURNING wtid", + &[&now.as_sql_micros(), &id], )?; info!(">> (bump) {} in {}", base32(row.get(0)), hex::encode(txid)); } @@ -493,11 +499,11 @@ fn bump(db: &mut Client, rpc: &mut Rpc, state: &WireState) -> LoopResult<bool> { fn bounce(db: &mut Client, rpc: &mut Rpc, fee: U256) -> LoopResult<bool> { // We rely on the advisory lock to ensure we are the only one sending transactions let row = db.query_opt( - "SELECT id, bounced FROM bounce WHERE status=$1 ORDER BY _date LIMIT 1", + "SELECT id, bounced FROM bounce WHERE status=$1 ORDER BY created LIMIT 1", &[&(BounceStatus::Requested as i16)], )?; if let Some(row) = &row { - let id: i32 = row.get(0); + let id: i64 = row.get(0); let bounced: H256 = sql_hash(row, 1); let bounce = rpc.bounce(bounced, fee)?; diff --git a/eth-wire/src/rpc.rs b/eth-wire/src/rpc.rs @@ -559,7 +559,7 @@ pub mod hex { struct BytesVisitor; - impl<'a> Visitor<'a> for BytesVisitor { + impl Visitor<'_> for BytesVisitor { type Value = Hex; fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { diff --git a/eth-wire/src/sql.rs b/eth-wire/src/sql.rs @@ -24,7 +24,7 @@ use ethereum_types::{H160, H256, U256}; /// Ethereum amount from sql pub fn sql_eth_amount(row: &Row, idx: usize, currency: CurrencyEth) -> U256 { - let amount = sql_amount(row, idx); + let amount = sql_amount(row, idx, currency.to_str()); taler_to_eth(&amount, currency).or_fail(|_| { format!( "Database invariant: expected an ethereum amount got {}", diff --git a/instrumentation/src/eth.rs b/instrumentation/src/eth.rs @@ -469,7 +469,7 @@ impl EthCtx { "import", path, ], - &self.ctx.log("geth"), + self.ctx.log("geth"), "import chain", ); self.resume_node(&[]); @@ -493,7 +493,7 @@ impl EthCtx { "--rpc.enabledeprecatedpersonal", ]; args.extend_from_slice(additional_args); - self.node = cmd_redirect("geth", &args, &self.ctx.log("geth")); + self.node = cmd_redirect("geth", &args, self.ctx.log("geth")); self.rpc = retry_opt(|| Rpc::new(&self.ctx.wire_dir).ok()); for addr in [&self.wire_addr, &self.client_addr, &self.reserve_addr] { self.rpc.unlock_account(addr, &self.passwd).unwrap(); diff --git a/taler-api/src/db.rs b/taler-api/src/db.rs @@ -88,7 +88,7 @@ pub async fn transfer(db: &PgPool, transfer: TransferRequest) -> ApiResult<Trans } else { TransferResult::Success(TransferResponse { timestamp: r.try_get_timestamp("out_timestamp")?, - row_id: r.try_get_safeu64("out_tx_row_id")?, + row_id: r.try_get_safe_u64("out_tx_row_id")?, }) }) }) @@ -122,7 +122,7 @@ pub async fn transfer_page( .build() .try_map(|r: PgRow| { Ok(TransferListStatus { - row_id: r.try_get_safeu64("transfer_id")?, + row_id: r.try_get_safe_u64("transfer_id")?, status: r.try_get("status")?, amount: r.try_get_amount("amount", currency)?, credit_account: r.try_get_url("credit_payto")?, @@ -199,7 +199,7 @@ pub async fn outgoing_page( amount: r.try_get_amount("amount", currency)?, wtid: r.try_get_base32("wtid")?, credit_account: r.try_get_url("credit_payto")?, - row_id: r.try_get_safeu64("transfer_id")?, + row_id: r.try_get_safe_u64("transfer_id")?, date: r.try_get_timestamp("transfer_time")?, exchange_base_url: r.try_get_url("exchange_base_url")?, }) @@ -239,7 +239,7 @@ pub async fn add_incoming( Ok(if r.try_get("out_reserve_pub_reuse")? { AddIncomingResult::ReservePubReuse } else { - AddIncomingResult::Success(r.try_get_safeu64("out_tx_row_id")?) + AddIncomingResult::Success(r.try_get_safe_u64("out_tx_row_id")?) }) }) .fetch_one(db) @@ -279,21 +279,21 @@ pub async fn incoming_page( let kind: IncomingType = r.try_get("type")?; Ok(match kind { IncomingType::reserve => IncomingBankTransaction::IncomingReserveTransaction { - row_id: r.try_get_safeu64("incoming_transaction_id")?, + row_id: r.try_get_safe_u64("incoming_transaction_id")?, date: r.try_get_timestamp("creation_time")?, amount: r.try_get_amount("amount", currency)?, debit_account: r.try_get_url("debit_payto")?, reserve_pub: r.try_get_base32("reserve_pub")?, }, IncomingType::kyc => IncomingBankTransaction::IncomingKycAuthTransaction { - row_id: r.try_get_safeu64("incoming_transaction_id")?, + row_id: r.try_get_safe_u64("incoming_transaction_id")?, date: r.try_get_timestamp("creation_time")?, amount: r.try_get_amount("amount", currency)?, debit_account: r.try_get_url("debit_payto")?, account_pub: r.try_get_base32("account_pub")?, }, IncomingType::wad => IncomingBankTransaction::IncomingWadTransaction { - row_id: r.try_get_safeu64("incoming_transaction_id")?, + row_id: r.try_get_safe_u64("incoming_transaction_id")?, date: r.try_get_timestamp("creation_time")?, amount: r.try_get_amount("amount", currency)?, debit_account: r.try_get_url("debit_payto")?, diff --git a/taler-api/src/db/type_helper.rs b/taler-api/src/db/type_helper.rs @@ -18,7 +18,7 @@ pub trait SqlTypeHelper { fn try_get_timestamp<I: sqlx::ColumnIndex<Self>>(&self, index: I) -> sqlx::Result<Timestamp> { self.try_get_map(index, Timestamp::from_sql_micros) } - fn try_get_safeu64<I: sqlx::ColumnIndex<Self>>(&self, index: I) -> sqlx::Result<SafeU64> { + fn try_get_safe_u64<I: sqlx::ColumnIndex<Self>>(&self, index: I) -> sqlx::Result<SafeU64> { self.try_get_map(index, |signed: i64| SafeU64::try_from(signed)) } fn try_get_base32<I: sqlx::ColumnIndex<Self>, const L: usize>( @@ -31,6 +31,7 @@ pub trait SqlTypeHelper { self.try_get_map(index, Url::parse) } fn try_get_amount(&self, index: &str, currency: &str) -> sqlx::Result<Amount>; + fn try_get_amount_i(&self, index: usize, currency: &str) -> sqlx::Result<Amount>; } impl SqlTypeHelper for PgRow { @@ -63,4 +64,11 @@ impl SqlTypeHelper for PgRow { Ok(Amount::new(currency.to_string(), val as u64, frac as u32)) } + + fn try_get_amount_i(&self, index: usize, currency: &str) -> sqlx::Result<Amount> { + let val: i64 = self.try_get(index)?; + let frac: i32 = self.try_get(index + 1)?; + + Ok(Amount::new(currency.to_string(), val as u64, frac as u32)) + } } diff --git a/taler-api/src/error.rs b/taler-api/src/error.rs @@ -30,7 +30,7 @@ impl From<sqlx::Error> for ApiError { ), }; Self { - code: code, + code, hint: None, status: Some(status), log: Some(format!("db: {value}")), @@ -82,7 +82,7 @@ impl IntoResponse for ApiError { pub fn failure_code(code: ErrorCode) -> ApiError { ApiError { - code: code, + code, hint: None, log: None, status: None, @@ -91,7 +91,7 @@ pub fn failure_code(code: ErrorCode) -> ApiError { pub fn failure(code: ErrorCode, hint: impl Into<String>) -> ApiError { ApiError { - code: code, + code, hint: Some(hint.into()), log: None, status: None, @@ -100,7 +100,7 @@ pub fn failure(code: ErrorCode, hint: impl Into<String>) -> ApiError { pub fn failure_status(code: ErrorCode, hint: impl Into<String>, status: StatusCode) -> ApiError { ApiError { - code: code, + code, hint: Some(hint.into()), log: None, status: Some(status), diff --git a/taler-api/src/lib.rs b/taler-api/src/lib.rs @@ -326,7 +326,7 @@ pub fn wire_gateway_api<I: WireGatewayImpl + 'static>(wg: Arc<I>) -> Router { Json(WireConfig { name: "taler-wire-gateway", version: WIRE_GATEWAY_API_VERSION, - currency: &state.currency(), + currency: state.currency(), implementation: state.implementation(), }) .into_response() @@ -457,7 +457,7 @@ impl WireGatewayImpl for SampleImpl { } async fn transfer_by_id(&self, id: u64) -> ApiResult<Option<TransferStatus>> { - Ok(db::transfer_by_id(&self.pool, id, &self.currency).await?) + db::transfer_by_id(&self.pool, id, &self.currency).await } async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> { diff --git a/taler-api/src/notification.rs b/taler-api/src/notification.rs @@ -20,7 +20,7 @@ struct Listener<K: Eq + Hash + Clone, V> { impl<K: Eq + Hash + Clone, V> Listener<K, V> { pub async fn wait_for(mut self, filter: impl Fn(&V) -> bool) { self.channel - .wait_for(|it| it.as_ref().map(|value| filter(value)).unwrap_or(false)) + .wait_for(|it| it.as_ref().map(&filter).unwrap_or(false)) .await .expect("WTF"); } diff --git a/taler-common/src/api_wire.rs b/taler-common/src/api_wire.rs @@ -182,10 +182,10 @@ impl PageParams { } } - return Ok(Page { + Ok(Page { limit: limit, offset: self.offset, - }); + }) } } @@ -197,7 +197,7 @@ pub struct Page { impl Page { pub fn backward(&self) -> bool { - return self.limit < 0; + self.limit < 0 } } @@ -212,10 +212,10 @@ pub struct HistoryParams { impl HistoryParams { pub fn check(self, max_page_size: i64, max_timeout_ms: u64) -> Result<History, ParamsError> { let timeout_ms = self.timeout_ms.map(|it| it.min(max_timeout_ms)); - return Ok(History { + Ok(History { page: self.pagination.check(max_page_size)?, timeout_ms, - }); + }) } } diff --git a/wire-gateway/src/main.rs b/wire-gateway/src/main.rs @@ -28,10 +28,7 @@ use common::{ }; use listenfd::ListenFd; use sqlx::Row; -use sqlx::{ - postgres::{PgListener, PgRow}, - PgPool, QueryBuilder, -}; +use sqlx::{postgres::PgListener, PgPool, QueryBuilder}; use std::{ net::SocketAddr, path::PathBuf, @@ -48,7 +45,7 @@ use taler_api::{ wire_gateway_api, WireGatewayImpl, }; use taler_common::{ - api_common::{Amount, SafeU64, Timestamp}, + api_common::Timestamp, api_wire::{ AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddKycauthResponse, History, IncomingBankTransaction, IncomingHistory, OutgoingBankTransaction, OutgoingHistory, Page, @@ -58,13 +55,6 @@ use taler_common::{ }; use tokio::time::sleep; -pub fn sql_amount(row: &PgRow, idx: usize) -> sqlx::Result<Amount> { - row.try_get_map(idx, |raw: &str| Amount::from_str(raw)) -} -pub fn sql_safe_i32(row: &PgRow, idx: usize) -> sqlx::Result<SafeU64> { - row.try_get_map(idx, |raw: i32| SafeU64::try_from(raw)) -} - struct ServerState { pool: PgPool, payto: Url, @@ -87,22 +77,22 @@ impl WireGatewayImpl for ServerState { } // Handle idempotence, check previous transaction with the same request_uid - let row = sqlx::query("SELECT amount, exchange_url, wtid, credit_acc, id, (extract(epoch from _date) * 1000000)::int8 FROM tx_out WHERE request_uid = $1").bind(req.request_uid.as_slice()) + let row = sqlx::query("SELECT (amount).val, (amount).frac, exchange_url, wtid, credit_acc, id, created FROM tx_out WHERE request_uid = $1").bind(req.request_uid.as_slice()) .fetch_optional(&self.pool) .await?; if let Some(r) = row { let prev = TransferRequest { request_uid: req.request_uid.clone(), - amount: sql_amount(&r, 0)?, - exchange_base_url: r.try_get_url(1)?, - wtid: r.try_get_base32(2)?, - credit_account: r.try_get_url(3)?, + amount: r.try_get_amount_i(0, self.currency())?, + exchange_base_url: r.try_get_url(2)?, + wtid: r.try_get_base32(3)?, + credit_account: r.try_get_url(4)?, }; if prev == req { // Idempotence return Ok(TransferResponse { - timestamp: r.try_get_timestamp(5)?, - row_id: sql_safe_i32(&r, 4)?, + row_id: r.try_get_safe_u64(5)?, + timestamp: r.try_get_timestamp(6)?, }); } else { return Err(failure( @@ -114,10 +104,12 @@ impl WireGatewayImpl for ServerState { let timestamp = Timestamp::now(); let mut tx = self.pool.begin().await?; - let row = sqlx::query( - "INSERT INTO tx_out (amount, wtid, debit_acc, credit_acc, exchange_url, request_uid) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id" + let r = sqlx::query( + "INSERT INTO tx_out (created, amount, wtid, debit_acc, credit_acc, exchange_url, request_uid) VALUES ($1, ($2, $3)::taler_amount, $4, $5, $6, $7, $8) RETURNING id" ) - .bind(req.amount.to_string()) + .bind(Timestamp::now().as_sql_micros()) + .bind(req.amount.value as i64) + .bind(req.amount.fraction as i32) .bind(req.wtid.as_slice()) .bind(self.payto.as_str()) .bind(req.credit_account.as_str()) @@ -129,7 +121,7 @@ impl WireGatewayImpl for ServerState { tx.commit().await?; Ok(TransferResponse { timestamp, - row_id: sql_safe_i32(&row, 0)?, + row_id: r.try_get_safe_u64(0)?, }) } @@ -138,26 +130,26 @@ impl WireGatewayImpl for ServerState { _page: Page, _status: Option<TransferState>, ) -> ApiResult<TransferList> { - unimplemented!("depolymerization does not supports trnsfer details API") + unimplemented!("depolymerization does not supports transfer details API") } async fn transfer_by_id(&self, _id: u64) -> ApiResult<Option<TransferStatus>> { - unimplemented!("depolymerization does not supports trnsfer details API") + unimplemented!("depolymerization does not supports transfer details API") } async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> { let outgoing_transactions = QueryBuilder::new( - "SELECT id, extract(epoch from _date)::int8 * 1000000, amount, wtid, credit_acc, exchange_url FROM tx_out WHERE", + "SELECT id, created, (amount).val, (amount).frac, wtid, credit_acc, exchange_url FROM tx_out WHERE", ).page("id", &params.page) .build() .try_map(|r| { Ok(OutgoingBankTransaction { - row_id: sql_safe_i32(&r, 0)?, + row_id: r.try_get_safe_u64(0)?, date: r.try_get_timestamp(1)?, - amount: sql_amount(&r, 2)?, - wtid: r.try_get_base32(3)?, - credit_account: r.try_get_url(4)?, - exchange_base_url: r.try_get_url(5)?, + amount: r.try_get_amount_i(2, self.currency())?, + wtid: r.try_get_base32(4)?, + credit_account: r.try_get_url(5)?, + exchange_base_url: r.try_get_url(6)?, }) }) .fetch_all(&self.pool) @@ -169,16 +161,16 @@ impl WireGatewayImpl for ServerState { } async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> { - let incoming_transactions = QueryBuilder::new("SELECT id, extract(epoch from _date)::int8 * 1000000, amount, reserve_pub, debit_acc FROM tx_in WHERE") + let incoming_transactions = QueryBuilder::new("SELECT id, received, (amount).val, (amount).frac, reserve_pub, debit_acc FROM tx_in WHERE") .page("id", &params.page) .build() .try_map(|r| { Ok(IncomingBankTransaction::IncomingReserveTransaction { - row_id: sql_safe_i32(&r, 0)?, + row_id: r.try_get_safe_u64(0)?, date: r.try_get_timestamp(1)?, - amount: sql_amount(&r, 2)?, - reserve_pub: r.try_get_base32(3)?, - debit_account: r.try_get_url(4)?, + amount: r.try_get_amount_i(2, self.currency())?, + reserve_pub: r.try_get_base32(4)?, + debit_account: r.try_get_url(5)?, }) }) .fetch_all(&self.pool) @@ -194,15 +186,17 @@ impl WireGatewayImpl for ServerState { req: AddIncomingRequest, ) -> ApiResult<AddIncomingResponse> { let timestamp = Timestamp::now(); - let row = sqlx::query("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES (now(), $1, $2, $3, $4) RETURNING id") - .bind(req.amount.to_string()) + let r = sqlx::query("INSERT INTO tx_in (received, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, ($2, $3)::taler_amount, $4, $5, $6) RETURNING id") + .bind(Timestamp::now().as_sql_micros()) + .bind(req.amount.value as i64) + .bind(req.amount.fraction as i32) .bind(req.reserve_pub.as_slice()) .bind(req.debit_account.as_str()) .bind("payto://bitcoin/bcrt1qgkgxkjj27g3f7s87mcvjjsghay7gh34cx39prj") .fetch_one(&self.pool).await?; Ok(AddIncomingResponse { timestamp, - row_id: sql_safe_i32(&row, 0)?, + row_id: r.try_get_safe_u64(0)?, }) } @@ -327,24 +321,24 @@ fn check_payto(url: &Url, currency: Currency) -> bool { /// Check if an url is a valid bitcoin payto url fn check_pay_to_btc(url: &Url) -> bool { - return url.domain() == Some("bitcoin") + url.domain() == Some("bitcoin") && url.scheme() == "payto" && url.username() == "" && url.password().is_none() && url.query().is_none() && url.fragment().is_none() - && bitcoin::Address::from_str(url.path().trim_start_matches('/')).is_ok(); + && bitcoin::Address::from_str(url.path().trim_start_matches('/')).is_ok() } /// Check if an url is a valid ethereum payto url fn check_pay_to_eth(url: &Url) -> bool { - return url.domain() == Some("ethereum") + url.domain() == Some("ethereum") && url.scheme() == "payto" && url.username() == "" && url.password().is_none() && url.query().is_none() && url.fragment().is_none() - && ethereum_types::H160::from_str(url.path().trim_start_matches('/')).is_ok(); + && ethereum_types::H160::from_str(url.path().trim_start_matches('/')).is_ok() } /// Listen to backend status change