commit 0edd2ffc4c06ad18ea9dcfa2325a8ffb863474cb
parent 59faf13eaf2b552616c7aa1c69149c92506bc73d
Author: Antoine A <>
Date: Mon, 24 Jan 2022 14:04:17 +0100
Ergonomic sql extraction
Diffstat:
11 files changed, 146 insertions(+), 70 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
@@ -1488,6 +1488,7 @@ dependencies = [
"base32",
"flexi_logger",
"log",
+ "postgres",
"rust-ini",
"serde",
"serde_json",
diff --git a/README.md b/README.md
@@ -36,7 +36,7 @@ done.
[depolymerizer-___]
# Number of requests to serve before gateway shutdown (0 mean never)
HTTP_LIFETIME = 0
-# Number of done worker's loops before wire implementation shutdown (0 mean never)
+# Number of worker's loops before wire implementation shutdown (0 mean never)
WIRE_LIFETIME = 0
```
diff --git a/btc-wire/src/bin/btc-wire-utils.rs b/btc-wire/src/bin/btc-wire-utils.rs
@@ -115,9 +115,9 @@ impl App {
}
}
let mut wallet = BtcRpc::wallet(&self.config, name).unwrap();
- let addr = wallet
- .get_new_address()
- .expect(&format!("Failed to get wallet address {}", name));
+ let addr = wallet.get_new_address().unwrap_or_else(|_| {
+ panic!("Failed to get wallet address {}", name)
+ });
(wallet, addr)
}
diff --git a/btc-wire/src/loops/worker.rs b/btc-wire/src/loops/worker.rs
@@ -16,12 +16,11 @@
use std::{
collections::{HashMap, HashSet},
fmt::Write,
- str::FromStr,
sync::atomic::Ordering,
time::{Duration, SystemTime},
};
-use bitcoin::{hashes::Hash, Address, Amount as BtcAmount, BlockHash, Txid};
+use bitcoin::{hashes::Hash, Amount as BtcAmount, BlockHash, Txid};
use btc_wire::{
rpc::{self, BtcRpc, Category, ErrorCode},
rpc_utils::sender_address,
@@ -29,18 +28,19 @@ use btc_wire::{
};
use postgres::{fallible_iterator::FallibleIterator, Client};
use taler_common::{
- api_common::{base32, Amount},
+ api_common::base32,
config::Config,
log::log::{error, info, warn},
+ sql::{sql_array, sql_url},
};
-use url::Url;
use crate::{
fail_point::fail_point,
info::{decode_info, encode_info, Info},
reconnect::{AutoReconnectRPC, AutoReconnectSql},
+ sql::{sql_addr, sql_btc_amount, sql_txid},
status::{BounceStatus, TxStatus},
- taler_util::{btc_payto_addr, btc_payto_url, btc_to_taler, taler_to_btc},
+ taler_util::{btc_payto_url, btc_to_taler},
WireState,
};
@@ -143,14 +143,11 @@ fn send(
)?;
if let Some(row) = &row {
let id: i32 = row.get(0);
- let amount = taler_to_btc(&Amount::from_str(row.get(1))?)?;
- let wtid: &[u8] = row.get(2);
- let addr: Address = btc_payto_addr(&Url::parse(row.get(3))?)?;
- let exchange_base_url: Url = Url::parse(row.get(4))?;
- let info = Info::Transaction {
- wtid: wtid.try_into()?,
- url: exchange_base_url,
- };
+ let amount = sql_btc_amount(row, 1);
+ let wtid: [u8; 32] = sql_array(row, 2);
+ let addr = sql_addr(row, 3);
+ let url = sql_url(row, 4);
+ let info = Info::Transaction { wtid, url };
let metadata = encode_info(&info);
match rpc.send_op_return(&addr, &amount, &metadata, false, true) {
@@ -161,7 +158,7 @@ fn send(
&[&(TxStatus::Sent as i16), &tx_id.as_ref(), &id],
)?;
let amount = btc_to_taler(&amount.to_signed().unwrap());
- info!(">> {} {} in {} to {}", amount, base32(wtid), tx_id, addr);
+ info!(">> {} {} in {} to {}", amount, base32(&wtid), tx_id, addr);
}
Err(e) => {
db.execute(
@@ -190,7 +187,7 @@ fn bounce(
)?;
if let Some(row) = &row {
let id: i32 = row.get(0);
- let bounced: Txid = Txid::from_slice(row.get(1))?;
+ let bounced: Txid = sql_txid(row, 1);
let info = Info::Bounce { bounced };
let metadata = encode_info(&info);
@@ -357,8 +354,7 @@ fn sync_chain_removed(
"SELECT txid FROM bounce WHERE bounced=$1 AND txid IS NOT NULL",
&[&id.as_ref()],
)? {
- let txid = Txid::from_slice(row.get(0)).unwrap();
- blocking_bounce.push((txid, id));
+ blocking_bounce.push((sql_txid(&row, 0), id));
} else {
// Remove transaction from bounce table
db.execute("DELETE FROM bounce WHERE bounced=$1", &[&id.as_ref()])?;
@@ -452,7 +448,7 @@ fn sync_chain_outgoing(
}
TxStatus::Sent => {
if let Some(txid) = full.replaces_txid {
- let stored_id: Txid = Txid::from_slice(row.get(2)).unwrap();
+ let stored_id = sql_txid(&row, 2);
if txid == stored_id {
let nb_row = db.execute(
"UPDATE tx_out SET txid=$1 WHERE txid=$2",
@@ -496,7 +492,7 @@ fn sync_chain_outgoing(
.unwrap()
.as_secs();
if now - full.time > delay as u64 {
- let bump = rpc.bump_fee(&id)?;
+ let bump = rpc.bump_fee(id)?;
fail_point("(injected) fail bump", 0.3)?;
db.execute(
"UPDATE tx_out SET txid=$1 WHERE txid=$2",
diff --git a/btc-wire/src/main.rs b/btc-wire/src/main.rs
@@ -31,6 +31,7 @@ mod loops;
mod reconnect;
mod status;
mod taler_util;
+mod sql;
pub struct WireState {
confirmation: AtomicU16,
diff --git a/btc-wire/src/sql.rs b/btc-wire/src/sql.rs
@@ -0,0 +1,51 @@
+/*
+ This file is part of TALER
+ Copyright (C) 2022 Taler Systems SA
+
+ TALER is free software; you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License along with
+ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+*/
+
+use bitcoin::{hashes::Hash, Address, Amount as BtcAmount, Txid};
+use postgres::Row;
+use taler_common::sql::{sql_amount, sql_url};
+
+use crate::taler_util::{btc_payto_addr, taler_to_btc};
+
+pub fn sql_btc_amount(row: &Row, idx: usize) -> BtcAmount {
+ let amount = sql_amount(row, idx);
+ taler_to_btc(&amount).unwrap_or_else(|_| {
+ panic!(
+ "Database invariant: expected an bitcoin amount got {}",
+ amount
+ )
+ })
+}
+
+pub fn sql_addr(row: &Row, idx: usize) -> Address {
+ let url = sql_url(row, idx);
+ btc_payto_addr(&url).unwrap_or_else(|_| {
+ panic!(
+ "Database invariant: expected an bitcoin payto url got {}",
+ url
+ )
+ })
+}
+
+pub fn sql_txid(row: &Row, idx: usize) -> Txid {
+ let slice: &[u8] = row.get(idx);
+ Txid::from_slice(slice).unwrap_or_else(|_| {
+ panic!(
+ "Database invariant: expected a transaction if got an array of {}B",
+ slice.len()
+ )
+ })
+}
diff --git a/taler-common/Cargo.toml b/taler-common/Cargo.toml
@@ -28,3 +28,5 @@ flexi_logger = { version = "0.22.2", default-features = false, features = [
] }
# Local timz
time = { version = "0.3.5", features = ["formatting", "macros"] }
+# Postgres client
+postgres = "0.19.2"
diff --git a/taler-common/src/lib.rs b/taler-common/src/lib.rs
@@ -20,3 +20,4 @@ pub mod api_wire;
pub mod config;
pub mod error_codes;
pub mod log;
+pub mod sql;
+\ No newline at end of file
diff --git a/taler-common/src/sql.rs b/taler-common/src/sql.rs
@@ -0,0 +1,49 @@
+/*
+ This file is part of TALER
+ Copyright (C) 2022 Taler Systems SA
+
+ TALER is free software; you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License along with
+ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+*/
+
+use std::str::FromStr;
+
+use postgres::Row;
+use url::Url;
+
+use crate::api_common::{Amount, SafeUint64};
+
+pub fn sql_url(row: &Row, idx: usize) -> Url {
+ let str: &str = row.get(idx);
+ Url::from_str(str).unwrap_or_else(|_| panic!("Database invariant: expected an url got {}", str))
+}
+
+pub fn sql_amount(row: &Row, idx: usize) -> Amount {
+ let str: &str = row.get(idx);
+ Amount::from_str(str)
+ .unwrap_or_else(|_| panic!("Database invariant: expected an amount got {}", str))
+}
+
+pub fn sql_array<const N: usize>(row: &Row, idx: usize) -> [u8; N] {
+ let slice: &[u8] = row.get(idx);
+ slice.try_into().unwrap_or_else(|_| {
+ panic!(
+ "Database invariant: expected an byte array of {}B for {}B",
+ N,
+ slice.len()
+ )
+ })
+}
+
+pub fn sql_safe_u64(row: &Row, idx: usize) -> SafeUint64 {
+ let id: i32 = row.get(idx);
+ SafeUint64::try_from(id as u64).unwrap()
+}
diff --git a/test/conf/bitcoin.conf b/test/conf/bitcoin.conf
@@ -1,6 +1,6 @@
regtest=1
txindex=1
-maxtxfee=0.00002000
+maxtxfee=0.01
fallbackfee=0.00000001
[regtest]
diff --git a/wire-gateway/src/main.rs b/wire-gateway/src/main.rs
@@ -30,13 +30,14 @@ use std::{
time::{Duration, Instant},
};
use taler_common::{
- api_common::{Amount, SafeUint64, ShortHashCode, Timestamp},
+ api_common::{ShortHashCode, Timestamp},
api_wire::{
HistoryParams, IncomingBankTransaction, IncomingHistory, OutgoingBankTransaction,
OutgoingHistory, TransferRequest, TransferResponse,
},
error_codes::ErrorCode,
log::log::{error, info, log, Level},
+ sql::{sql_amount, sql_array, sql_safe_u64, sql_url},
url::Url,
};
use tokio::sync::Notify;
@@ -292,14 +293,10 @@ async fn router(
if let Some(row) = row {
let prev = TransferRequest {
request_uid: request.request_uid.clone(),
- amount: Amount::from_str(row.get(0)).unwrap(),
- exchange_base_url: Url::parse(row.get(1)).unwrap(),
- wtid: {
- let slice: &[u8] = row.get(2);
- let array: [u8; 32] = slice.try_into().unwrap();
- ShortHashCode::from(array)
- },
- credit_account: Url::parse(row.get(3)).unwrap(),
+ amount: sql_amount(&row, 0),
+ exchange_base_url: sql_url(&row, 1),
+ wtid: ShortHashCode::from(sql_array(&row, 2)),
+ credit_account: sql_url(&row, 3),
};
if prev == request {
// Idempotence
@@ -308,10 +305,7 @@ async fn router(
StatusCode::OK,
&TransferResponse {
timestamp: Timestamp::Time(row.get(5)),
- row_id: {
- let id: i32 = row.get(4);
- SafeUint64::try_from(id as u64).unwrap()
- },
+ row_id: sql_safe_u64(&row, 4),
},
)
.await
@@ -333,10 +327,7 @@ async fn router(
StatusCode::OK,
&TransferResponse {
timestamp,
- row_id: {
- let id: i32 = row.get(0);
- SafeUint64::try_from(id as u64).unwrap()
- },
+ row_id: sql_safe_u64(&row, 0),
},
)
.await
@@ -361,19 +352,12 @@ async fn router(
)?
.into_iter()
.map(|row| IncomingBankTransaction::IncomingReserveTransaction {
- row_id: {
- let id: i32 = row.get(0);
- SafeUint64::try_from(id as u64).unwrap()
- },
+ row_id: sql_safe_u64(&row, 0),
date: Timestamp::Time(row.get(1)),
- amount: Amount::from_str(row.get(2)).unwrap(),
- reserve_pub: {
- let slice: &[u8] = row.get(3);
- let array: [u8; 32] = slice.try_into().unwrap();
- ShortHashCode::from(array)
- },
- debit_account: Url::parse(row.get(4)).unwrap(),
- credit_account: Url::parse(row.get(5)).unwrap(),
+ amount: sql_amount(&row, 2),
+ reserve_pub: ShortHashCode::from(sql_array(&row, 3)),
+ debit_account: sql_url(&row, 4),
+ credit_account: sql_url(&row, 5),
})
.collect();
encode_body(
@@ -406,20 +390,13 @@ async fn router(
)?
.into_iter()
.map(|row| OutgoingBankTransaction {
- row_id: {
- let id: i32 = row.get(0);
- SafeUint64::try_from(id as u64).unwrap()
- },
+ row_id: sql_safe_u64(&row, 0),
date: Timestamp::Time(row.get(1)),
- amount: Amount::from_str(row.get(2)).unwrap(),
- wtid: {
- let slice : &[u8] = row.get(3);
- let array: [u8; 32] = slice.try_into().unwrap();
- ShortHashCode::from(array)
- },
- debit_account: Url::parse(row.get(4)).unwrap(),
- credit_account: Url::parse(row.get(5)).unwrap(),
- exchange_base_url: Url::parse(row.get(6)).unwrap(),
+ amount: sql_amount(&row, 2),
+ wtid: ShortHashCode::from(sql_array(&row, 3)),
+ debit_account: sql_url(&row, 4),
+ credit_account: sql_url(&row, 5),
+ exchange_base_url:sql_url(&row, 6),
})
.collect();
encode_body(
@@ -454,10 +431,7 @@ async fn router(
StatusCode::OK,
&TransferResponse {
timestamp,
- row_id: {
- let id: i32 = row.get(0);
- SafeUint64::try_from(id as u64).unwrap()
- },
+ row_id: sql_safe_u64(&row, 0),
},
)
.await