taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

commit 3da77e2db1959363a8251859b31c0c4cde7a1cbd
parent 17f8dea6a098ca7bffb8813b0802f555680da502
Author: Antoine A <>
Date:   Wed, 11 Mar 2026 10:10:17 +0100

common: improve db utils

Diffstat:
Mcommon/taler-api/src/db.rs | 60++++++++++++------------------------------------------------
Mcommon/taler-api/src/notification.rs | 6+++++-
Mcommon/taler-api/tests/common/db.rs | 43++++++++++++++++++++++---------------------
Mcommon/taler-common/src/api_common.rs | 23+++++++++++++++++++++++
Mcommon/taler-common/src/api_wire.rs | 12+++++++++++-
Mcommon/taler-common/src/types/amount.rs | 55+++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mcommon/taler-common/src/types/base32.rs | 22++++++++++++++++++++++
Mtaler-cyclos/src/bin/cyclos-harness.rs | 3++-
Mtaler-cyclos/src/db.rs | 122+++++++++++++++++++++++++++++++++++++++----------------------------------------
Mtaler-magnet-bank/src/bin/magnet-bank-harness.rs | 3++-
Mtaler-magnet-bank/src/db.rs | 132+++++++++++++++++++++++++++++++++++++++----------------------------------------
11 files changed, 279 insertions(+), 202 deletions(-)

diff --git a/common/taler-api/src/db.rs b/common/taler-api/src/db.rs @@ -27,11 +27,10 @@ use sqlx::{ }; use sqlx::{Postgres, Row}; use taler_common::{ - api_common::{EddsaPublicKey, SafeU64}, + api_common::SafeU64, api_params::{History, Page}, types::{ amount::{Amount, Currency, Decimal}, - base32::Base32, iban::IBAN, payto::PaytoURI, utils::date_to_utc_timestamp, @@ -124,21 +123,11 @@ pub async fn history<'a, 'b, R: Send + Unpin>( /* ----- Bind ----- */ pub trait BindHelper { - fn bind_amount(self, amount: &Amount) -> Self; - fn bind_decimal(self, decimal: &Decimal) -> Self; fn bind_timestamp(self, timestamp: &Timestamp) -> Self; fn bind_date(self, date: &Date) -> Self; } impl<'q> BindHelper for Query<'q, Postgres, <Postgres as sqlx::Database>::Arguments<'q>> { - fn bind_amount(self, amount: &Amount) -> Self { - self.bind_decimal(&amount.decimal()) - } - - fn bind_decimal(self, decimal: &Decimal) -> Self { - self.bind(decimal.val as i64).bind(decimal.frac as i32) - } - fn bind_timestamp(self, timestamp: &Timestamp) -> Self { self.bind(timestamp.as_microsecond()) } @@ -188,15 +177,6 @@ pub trait TypeHelper { fn try_get_safeu64<I: sqlx::ColumnIndex<Self>>(&self, index: I) -> sqlx::Result<SafeU64> { self.try_get_map(index, |signed: i64| SafeU64::try_from(signed)) } - fn try_get_base32<I: sqlx::ColumnIndex<Self>, const L: usize>( - &self, - index: I, - ) -> sqlx::Result<Base32<L>> { - self.try_get_map(index, |slice: &[u8]| Base32::try_from(slice)) - } - fn try_get_eddsa<I: sqlx::ColumnIndex<Self>>(&self, index: I) -> sqlx::Result<EddsaPublicKey> { - self.try_get_map(index, |slice: &[u8]| EddsaPublicKey::try_from(slice)) - } fn try_get_url<I: sqlx::ColumnIndex<Self>>(&self, index: I) -> sqlx::Result<Url> { self.try_get_parse(index) } @@ -206,17 +186,11 @@ pub trait TypeHelper { fn try_get_iban<I: sqlx::ColumnIndex<Self>>(&self, index: I) -> sqlx::Result<IBAN> { self.try_get_parse(index) } - fn try_get_decimal<I: sqlx::ColumnIndex<Self>>( + fn try_get_amount<I: sqlx::ColumnIndex<Self>>( &self, - val: I, - frac: I, - ) -> sqlx::Result<Decimal> { - let val = self.try_get_u64(val)?; - let frac = self.try_get_u32(frac)?; - Ok(Decimal::new(val, frac)) - } - fn try_get_amount(&self, index: &str, currency: &Currency) -> sqlx::Result<Amount>; - fn try_get_amount_i(&self, index: usize, currency: &Currency) -> sqlx::Result<Amount>; + index: I, + currency: &Currency, + ) -> sqlx::Result<Amount>; /** Flag consider NULL and false to be the same */ fn try_get_flag<I: sqlx::ColumnIndex<Self>>(&self, index: I) -> sqlx::Result<bool>; @@ -249,23 +223,13 @@ impl TypeHelper for PgRow { self.try_get_map(index, |s: &str| s.parse()) } - fn try_get_amount(&self, index: &str, currency: &Currency) -> sqlx::Result<Amount> { - let val_idx = format!("{index}_val"); - let frac_idx = format!("{index}_frac"); - let val_idx = val_idx.as_str(); - let frac_idx = frac_idx.as_str(); - - Ok(Amount::new_decimal( - currency, - self.try_get_decimal(val_idx, frac_idx)?, - )) - } - - fn try_get_amount_i(&self, index: usize, currency: &Currency) -> sqlx::Result<Amount> { - Ok(Amount::new_decimal( - currency, - self.try_get_decimal(index, index + 1)?, - )) + fn try_get_amount<I: sqlx::ColumnIndex<Self>>( + &self, + index: I, + currency: &Currency, + ) -> sqlx::Result<Amount> { + let decimal: Decimal = self.try_get(index)?; + Ok(Amount::new_decimal(currency, decimal)) } fn try_get_flag<I: sqlx::ColumnIndex<Self>>(&self, index: I) -> sqlx::Result<bool> { diff --git a/common/taler-api/src/notification.rs b/common/taler-api/src/notification.rs @@ -18,7 +18,7 @@ use std::hash::Hash; use std::sync::Arc; use dashmap::DashMap; -use tokio::sync::watch; +use tokio::sync::watch::{self, Receiver}; pub mod de; @@ -97,6 +97,10 @@ impl<K: Eq + Hash + Clone, V> NotificationChannel<K, V> { } } +pub fn dummy_listen<T: Default>() -> Receiver<T> { + tokio::sync::watch::channel(T::default()).1 +} + #[tokio::test] async fn channel_gc() { use std::time::Duration; diff --git a/common/taler-api/tests/common/db.rs b/common/taler-api/tests/common/db.rs @@ -57,10 +57,10 @@ pub async fn transfer(db: &PgPool, transfer: TransferRequest) -> sqlx::Result<Tr sqlx::query( " SELECT out_request_uid_reuse, out_wtid_reuse, out_transfer_row_id, out_created_at - FROM taler_transfer(($1, $2)::taler_amount, $3, $4, $5, $6, $7, $8) + FROM taler_transfer($1, $2, $3, $4, $5, $6, $7) ", ) - .bind_amount(&transfer.amount) + .bind(&transfer.amount) .bind(transfer.exchange_base_url.as_str()) .bind(format!("{} {}", transfer.wtid, transfer.exchange_base_url)) .bind(transfer.credit_account.raw()) @@ -99,8 +99,7 @@ pub async fn transfer_page( SELECT transfer_id, status, - (amount).val as amount_val, - (amount).frac as amount_frac, + amount, credit_payto, created_at FROM transfer WHERE @@ -134,8 +133,7 @@ pub async fn transfer_by_id( SELECT status, status_msg, - (amount).val as amount_val, - (amount).frac as amount_frac, + amount, exchange_base_url, wtid, credit_payto, @@ -150,7 +148,7 @@ pub async fn transfer_by_id( status_msg: r.try_get("status_msg")?, amount: r.try_get_amount("amount", currency)?, origin_exchange_url: r.try_get("exchange_base_url")?, - wtid: r.try_get_base32("wtid")?, + wtid: r.try_get("wtid")?, credit_account: r.try_get_payto("credit_payto")?, timestamp: r.try_get_timestamp("created_at")?.into(), }) @@ -175,8 +173,7 @@ pub async fn outgoing_revenue( " SELECT transfer_id, - (amount).val as amount_val, - (amount).frac as amount_frac, + amount, exchange_base_url, wtid, credit_payto, @@ -188,7 +185,8 @@ pub async fn outgoing_revenue( |r| { Ok(OutgoingBankTransaction { amount: r.try_get_amount("amount", currency)?, - wtid: r.try_get_base32("wtid")?, + debit_fee: None, + wtid: r.try_get("wtid")?, credit_account: r.try_get_payto("credit_payto")?, row_id: r.try_get_safeu64("transfer_id")?, date: r.try_get_timestamp("created_at")?.into(), @@ -216,10 +214,10 @@ pub async fn add_incoming( sqlx::query( " SELECT out_reserve_pub_reuse, out_tx_row_id, out_created_at - FROM add_incoming(($1, $2)::taler_amount, $3, $4, $5, $6, $7) + FROM add_incoming($1, $2, $3, $4, $5, $6) ", ) - .bind_amount(amount) + .bind(amount) .bind(subject) .bind(debit_account.raw()) .bind(kind) @@ -256,8 +254,7 @@ pub async fn incoming_history( SELECT type, tx_in_id, - (amount).val as amount_val, - (amount).frac as amount_frac, + amount, created_at, debit_payto, metadata, @@ -267,21 +264,26 @@ pub async fn incoming_history( ) }, |r: PgRow| { - let kind: IncomingType = r.try_get("type")?; - Ok(match kind { + Ok(match r.try_get("type")? { IncomingType::reserve => IncomingBankTransaction::Reserve { row_id: r.try_get_safeu64("tx_in_id")?, date: r.try_get_timestamp("created_at")?.into(), amount: r.try_get_amount("amount", currency)?, + credit_fee: None, debit_account: r.try_get_payto("debit_payto")?, - reserve_pub: r.try_get_eddsa("metadata")?, + reserve_pub: r.try_get("metadata")?, + authorization_pub: None, + authorization_sig: None, }, IncomingType::kyc => IncomingBankTransaction::Kyc { row_id: r.try_get_safeu64("tx_in_id")?, date: r.try_get_timestamp("created_at")?.into(), amount: r.try_get_amount("amount", currency)?, + credit_fee: None, debit_account: r.try_get_payto("debit_payto")?, - account_pub: r.try_get_eddsa("metadata")?, + account_pub: r.try_get("metadata")?, + authorization_pub: None, + authorization_sig: None, }, IncomingType::wad => IncomingBankTransaction::Wad { row_id: r.try_get_safeu64("tx_in_id")?, @@ -289,7 +291,7 @@ pub async fn incoming_history( amount: r.try_get_amount("amount", currency)?, debit_account: r.try_get_payto("debit_payto")?, origin_exchange_url: r.try_get_url("origin_exchange_url")?, - wad_id: r.try_get_base32("metadata")?, + wad_id: r.try_get("metadata")?, }, }) }, @@ -313,8 +315,7 @@ pub async fn revenue_history( " SELECT tx_in_id, - (amount).val as amount_val, - (amount).frac as amount_frac, + amount, created_at, debit_payto, subject diff --git a/common/taler-common/src/api_common.rs b/common/taler-common/src/api_common.rs @@ -118,6 +118,7 @@ pub type HashCode = Base32<64>; /// 32-bytes hash code pub type ShortHashCode = Base32<32>; pub type WadId = Base32<24>; +pub type EddsaSignature = Base32<64>; /// EdDSA and ECDHE public keys always point on Curve25519 /// and represented using the standard 256 bits Ed25519 compact format, @@ -202,3 +203,25 @@ impl EddsaPublicKey { Self(Base32::from(bytes)) } } + +impl sqlx::Type<sqlx::Postgres> for EddsaPublicKey { + fn type_info() -> sqlx::postgres::PgTypeInfo { + <Base32<32>>::type_info() + } +} + +impl<'q> sqlx::Encode<'q, sqlx::Postgres> for EddsaPublicKey { + fn encode_by_ref( + &self, + buf: &mut sqlx::postgres::PgArgumentBuffer, + ) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> { + self.0.encode_by_ref(buf) + } +} + +impl<'r> sqlx::Decode<'r, sqlx::Postgres> for EddsaPublicKey { + fn decode(value: sqlx::postgres::PgValueRef<'r>) -> Result<Self, sqlx::error::BoxDynError> { + let raw = <Base32<32>>::decode(value)?; + Ok(Self(raw)) + } +} diff --git a/common/taler-common/src/api_wire.rs b/common/taler-common/src/api_wire.rs @@ -18,7 +18,10 @@ use url::Url; -use crate::types::{amount::Amount, payto::PaytoURI, timestamp::TalerTimestamp}; +use crate::{ + api_common::EddsaSignature, + types::{amount::Amount, payto::PaytoURI, timestamp::TalerTimestamp}, +}; use super::api_common::{EddsaPublicKey, HashCode, SafeU64, ShortHashCode, WadId}; use serde::{Deserialize, Serialize}; @@ -92,6 +95,7 @@ pub struct OutgoingBankTransaction { pub row_id: SafeU64, pub date: TalerTimestamp, pub amount: Amount, + pub debit_fee: Option<Amount>, pub credit_account: PaytoURI, pub wtid: ShortHashCode, pub exchange_base_url: Url, @@ -113,8 +117,11 @@ pub enum IncomingBankTransaction { row_id: SafeU64, date: TalerTimestamp, amount: Amount, + credit_fee: Option<Amount>, debit_account: PaytoURI, reserve_pub: EddsaPublicKey, + authorization_pub: Option<EddsaPublicKey>, + authorization_sig: Option<EddsaSignature>, }, #[serde(rename = "WAD")] Wad { @@ -130,8 +137,11 @@ pub enum IncomingBankTransaction { row_id: SafeU64, date: TalerTimestamp, amount: Amount, + credit_fee: Option<Amount>, debit_account: PaytoURI, account_pub: EddsaPublicKey, + authorization_pub: Option<EddsaPublicKey>, + authorization_sig: Option<EddsaSignature>, }, } diff --git a/common/taler-common/src/types/amount.rs b/common/taler-common/src/types/amount.rs @@ -98,6 +98,13 @@ impl Display for Currency { } } +#[derive(sqlx::Type)] +#[sqlx(type_name = "taler_amount")] +struct PgTalerAmount { + pub val: i64, + pub frac: i32, +} + #[derive( Debug, Clone, Copy, PartialEq, Eq, serde_with::DeserializeFromStr, serde_with::SerializeDisplay, )] @@ -151,6 +158,10 @@ impl Decimal { self.frac = self.frac.checked_sub(rhs.frac)?; self.normalize() } + + pub fn to_amount(self, currency: &Currency) -> Amount { + Amount::new_decimal(currency, self) + } } #[derive(Debug, thiserror::Error)] @@ -219,6 +230,35 @@ impl Display for Decimal { } } +impl sqlx::Type<sqlx::Postgres> for Decimal { + fn type_info() -> sqlx::postgres::PgTypeInfo { + PgTalerAmount::type_info() + } +} + +impl<'q> sqlx::Encode<'q, sqlx::Postgres> for Decimal { + fn encode_by_ref( + &self, + buf: &mut sqlx::postgres::PgArgumentBuffer, + ) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> { + PgTalerAmount { + val: self.val as i64, + frac: self.frac as i32, + } + .encode_by_ref(buf) + } +} + +impl<'r> sqlx::Decode<'r, sqlx::Postgres> for Decimal { + fn decode(value: sqlx::postgres::PgValueRef<'r>) -> Result<Self, sqlx::error::BoxDynError> { + let pg = PgTalerAmount::decode(value)?; + Ok(Self { + val: pg.val as u64, + frac: pg.frac as u32, + }) + } +} + #[track_caller] pub fn decimal(decimal: impl AsRef<str>) -> Decimal { decimal.as_ref().parse().expect("Invalid decimal constant") @@ -326,6 +366,21 @@ impl Display for Amount { } } +impl sqlx::Type<sqlx::Postgres> for Amount { + fn type_info() -> sqlx::postgres::PgTypeInfo { + PgTalerAmount::type_info() + } +} + +impl<'q> sqlx::Encode<'q, sqlx::Postgres> for Amount { + fn encode_by_ref( + &self, + buf: &mut sqlx::postgres::PgArgumentBuffer, + ) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> { + self.decimal().encode_by_ref(buf) + } +} + #[test] fn test_amount_parse() { const TALER_AMOUNT_FRAC_BASE: u32 = 100000000; diff --git a/common/taler-common/src/types/base32.rs b/common/taler-common/src/types/base32.rs @@ -253,3 +253,25 @@ impl<'a, const L: usize> TryFrom<&'a [u8]> for Base32<L> { )) } } + +impl<const L: usize> sqlx::Type<sqlx::Postgres> for Base32<L> { + fn type_info() -> sqlx::postgres::PgTypeInfo { + <&[u8]>::type_info() + } +} + +impl<'q, const L: usize> sqlx::Encode<'q, sqlx::Postgres> for Base32<L> { + fn encode_by_ref( + &self, + buf: &mut sqlx::postgres::PgArgumentBuffer, + ) -> Result<sqlx::encode::IsNull, sqlx::error::BoxDynError> { + self.0.encode_by_ref(buf) + } +} + +impl<'r, const L: usize> sqlx::Decode<'r, sqlx::Postgres> for Base32<L> { + fn decode(value: sqlx::postgres::PgValueRef<'r>) -> Result<Self, sqlx::error::BoxDynError> { + let array = <[u8; L]>::decode(value)?; + Ok(Self(array)) + } +} diff --git a/taler-cyclos/src/bin/cyclos-harness.rs b/taler-cyclos/src/bin/cyclos-harness.rs @@ -22,6 +22,7 @@ use failure_injection::{InjectedErr, set_failure_scenario}; use jiff::Timestamp; use owo_colors::OwoColorize as _; use sqlx::{PgPool, Row as _, postgres::PgRow}; +use taler_api::notification::dummy_listen; use taler_build::long_version; use taler_common::{ CommonArgs, @@ -168,7 +169,7 @@ impl<'a> Harness<'a> { }, &self.currency, &self.root, - || tokio::sync::watch::channel(0).1, + dummy_listen, ) .await .unwrap(); diff --git a/taler-cyclos/src/db.rs b/taler-cyclos/src/db.rs @@ -211,10 +211,10 @@ pub async fn register_tx_in_admin( sqlx::query( " SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new - FROM register_tx_in(NULL, NULL, ($1, $2)::taler_amount, $3, $4, $5, $6, $7, $8, $6) + FROM register_tx_in(NULL, NULL, $1, $2, $3, $4, $5, $6, $7, $5) ", ) - .bind_decimal(&tx.amount) + .bind(tx.amount) .bind(&tx.subject) .bind(tx.debtor_id) .bind(&tx.debtor_name) @@ -245,12 +245,12 @@ pub async fn register_tx_in( sqlx::query( " SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new - FROM register_tx_in($1, $2, ($3, $4)::taler_amount, $5, $6, $7, $8, $9, $10, $11) + FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ", ) .bind(tx.transfer_id) .bind(tx.tx_id) - .bind_decimal(&tx.amount) + .bind(tx.amount) .bind(&tx.subject) .bind(tx.debtor_id) .bind(&tx.debtor_name) @@ -307,12 +307,12 @@ pub async fn register_tx_out( let query = sqlx::query( " SELECT out_result, out_tx_row_id - FROM register_tx_out($1, $2, ($3, $4)::taler_amount, $5, $6, $7, $8, $9, $10, $11, $12) + FROM register_tx_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) ", ) .bind(tx.transfer_id) .bind(tx.tx_id) - .bind_decimal(&tx.amount) + .bind(tx.amount) .bind(&tx.subject) .bind(tx.creditor_id) .bind(&tx.creditor_name) @@ -366,13 +366,13 @@ pub async fn make_transfer<'a>( sqlx::query( " SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at - FROM taler_transfer($1, $2, $3, ($4, $5)::taler_amount, $6, $7, $8, $9) + FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8) ", ) .bind(tx.request_uid.as_ref()) .bind(tx.wtid.as_ref()) .bind(&subject) - .bind_decimal(&tx.amount) + .bind(tx.amount) .bind(tx.exchange_base_url.as_str()) .bind(tx.creditor_id) .bind(&tx.creditor_name) @@ -409,12 +409,12 @@ pub async fn register_bounced_tx_in( sqlx::query( " SELECT out_tx_row_id, out_tx_new - FROM register_bounced_tx_in($1, $2, ($3, $4)::taler_amount, $5, $6, $7, $8, $9, $10, $11) + FROM register_bounced_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ", ) .bind(tx.transfer_id) .bind(tx.tx_id) - .bind_decimal(&tx.amount) + .bind(tx.amount) .bind(&tx.subject) .bind(tx.debtor_id) .bind(&tx.debtor_name) @@ -449,8 +449,7 @@ pub async fn transfer_page<'a>( SELECT initiated_id, status, - (amount).val as amount_val, - (amount).frac as amount_frac, + amount, credit_account, credit_name, initiated_at @@ -468,9 +467,9 @@ pub async fn transfer_page<'a>( Ok(TransferListStatus { row_id: r.try_get_safeu64(0)?, status: r.try_get(1)?, - amount: r.try_get_amount_i(2, currency)?, - credit_account: r.try_get_cyclos_fullpaytouri(4, 5, root)?, - timestamp: r.try_get_timestamp(6)?.into(), + amount: r.try_get_amount(2, currency)?, + credit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?, + timestamp: r.try_get_timestamp(5)?.into(), }) }, ) @@ -494,8 +493,7 @@ pub async fn outgoing_history( " SELECT tx_out_id, - (amount).val as amount_val, - (amount).frac as amount_frac, + amount, credit_account, credit_name, valued_at, @@ -510,11 +508,12 @@ pub async fn outgoing_history( |r: PgRow| { Ok(OutgoingBankTransaction { row_id: r.try_get_safeu64(0)?, - amount: r.try_get_amount_i(1, currency)?, - credit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?, - date: r.try_get_timestamp(5)?.into(), - exchange_base_url: r.try_get_url(6)?, - wtid: r.try_get_base32(7)?, + amount: r.try_get_amount(1, currency)?, + debit_fee: None, + credit_account: r.try_get_cyclos_fullpaytouri(2, 3, root)?, + date: r.try_get_timestamp(4)?.into(), + exchange_base_url: r.try_get_url(5)?, + wtid: r.try_get(6)?, }) }, ) @@ -539,8 +538,7 @@ pub async fn incoming_history( SELECT type, tx_in_id, - (amount).val as amount_val, - (amount).frac as amount_frac, + amount, debit_account, debit_name, valued_at, @@ -555,17 +553,23 @@ pub async fn incoming_history( Ok(match r.try_get(0)? { IncomingType::reserve => IncomingBankTransaction::Reserve { row_id: r.try_get_safeu64(1)?, - amount: r.try_get_amount_i(2, currency)?, - debit_account: r.try_get_cyclos_fullpaytouri(4, 5, root)?, - date: r.try_get_timestamp(6)?.into(), - reserve_pub: r.try_get_eddsa(7)?, + amount: r.try_get_amount(2, currency)?, + credit_fee: None, + debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?, + date: r.try_get_timestamp(5)?.into(), + reserve_pub: r.try_get(6)?, + authorization_pub: None, + authorization_sig: None, }, IncomingType::kyc => IncomingBankTransaction::Kyc { row_id: r.try_get_safeu64(1)?, - amount: r.try_get_amount_i(2, currency)?, - debit_account: r.try_get_cyclos_fullpaytouri(4, 5, root)?, - date: r.try_get_timestamp(6)?.into(), - account_pub: r.try_get_eddsa(7)?, + amount: r.try_get_amount(2, currency)?, + credit_fee: None, + debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?, + date: r.try_get_timestamp(5)?.into(), + account_pub: r.try_get(6)?, + authorization_pub: None, + authorization_sig: None, }, IncomingType::wad => { unimplemented!("WAD is not yet supported") @@ -594,8 +598,7 @@ pub async fn revenue_history( SELECT tx_in_id, valued_at, - (amount).val as amount_val, - (amount).frac as amount_frac, + amount, debit_account, debit_name, subject @@ -608,10 +611,10 @@ pub async fn revenue_history( Ok(RevenueIncomingBankTransaction { row_id: r.try_get_safeu64(0)?, date: r.try_get_timestamp(1)?.into(), - amount: r.try_get_amount_i(2, currency)?, + amount: r.try_get_amount(2, currency)?, credit_fee: None, - debit_account: r.try_get_cyclos_fullpaytouri(4, 5, root)?, - subject: r.try_get(6)?, + debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?, + subject: r.try_get(5)?, }) }, ) @@ -629,8 +632,7 @@ pub async fn transfer_by_id<'a>( SELECT status, status_msg, - (amount).val as amount_val, - (amount).frac as amount_frac, + amount, exchange_base_url, wtid, credit_account, @@ -646,11 +648,11 @@ pub async fn transfer_by_id<'a>( Ok(TransferStatus { status: r.try_get(0)?, status_msg: r.try_get(1)?, - amount: r.try_get_amount_i(2, currency)?, - origin_exchange_url: r.try_get(4)?, - wtid: r.try_get_base32(5)?, - credit_account: r.try_get_cyclos_fullpaytouri(6, 7, root)?, - timestamp: r.try_get_timestamp(8)?.into(), + amount: r.try_get_amount(2, currency)?, + origin_exchange_url: r.try_get(3)?, + wtid: r.try_get(4)?, + credit_account: r.try_get_cyclos_fullpaytouri(5, 6, root)?, + timestamp: r.try_get_timestamp(7)?.into(), }) }) .fetch_optional(db) @@ -664,7 +666,7 @@ pub async fn pending_batch<'a>( ) -> sqlx::Result<Vec<Initiated>> { sqlx::query( " - SELECT initiated_id, (amount).val, (amount).frac, subject, credit_account, credit_name + SELECT initiated_id, amount, subject, credit_account, credit_name FROM initiated WHERE tx_id IS NULL AND status='pending' @@ -676,10 +678,10 @@ pub async fn pending_batch<'a>( .try_map(|r: PgRow| { Ok(Initiated { id: r.try_get(0)?, - amount: r.try_get_decimal(1, 2)?, - subject: r.try_get(3)?, - creditor_id: r.try_get(4)?, - creditor_name: r.try_get(5)?, + amount: r.try_get(1)?, + subject: r.try_get(2)?, + creditor_id: r.try_get(3)?, + creditor_name: r.try_get(4)?, }) }) .fetch_all(db) @@ -841,6 +843,7 @@ mod test { use sqlx::{PgConnection, PgPool, Row as _, postgres::PgRow}; use taler_api::{ db::TypeHelper, + notification::dummy_listen, subject::{IncomingSubject, OutgoingSubject}, }; use taler_common::{ @@ -853,7 +856,6 @@ mod test { utils::now_sql_stable_timestamp, }, }; - use tokio::sync::watch::Receiver; use crate::{ constants::CONFIG_SOURCE, @@ -866,10 +868,6 @@ mod test { pub static CURRENCY: LazyLock<Currency> = LazyLock::new(|| "TEST".parse().unwrap()); pub const ROOT: CompactString = CompactString::const_new("localhost"); - fn fake_listen<T: Default>() -> Receiver<T> { - tokio::sync::watch::channel(T::default()).1 - } - async fn setup() -> (PgConnection, PgPool) { let pool = taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await; let conn = pool.acquire().await.unwrap().leak(); @@ -980,13 +978,13 @@ mod test { // Empty db assert_eq!( - db::revenue_history(&pool, &History::default(), &CURRENCY, &ROOT, fake_listen) + db::revenue_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) .await .unwrap(), Vec::new() ); assert_eq!( - db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, fake_listen) + db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) .await .unwrap(), Vec::new() @@ -1013,14 +1011,14 @@ mod test { // History assert_eq!( - db::revenue_history(&pool, &History::default(), &CURRENCY, &ROOT, fake_listen) + db::revenue_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) .await .unwrap() .len(), 6 ); assert_eq!( - db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, fake_listen) + db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) .await .unwrap() .len(), @@ -1034,7 +1032,7 @@ mod test { // Empty db assert_eq!( - db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, fake_listen) + db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) .await .unwrap(), Vec::new() @@ -1093,7 +1091,7 @@ mod test { // History assert_eq!( - db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, fake_listen) + db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) .await .unwrap() .len(), @@ -1195,7 +1193,7 @@ mod test { // Empty db assert_eq!( - db::outgoing_history(&pool, &History::default(), &CURRENCY, &ROOT, fake_listen) + db::outgoing_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) .await .unwrap(), Vec::new() @@ -1223,7 +1221,7 @@ mod test { // History assert_eq!( - db::outgoing_history(&pool, &History::default(), &CURRENCY, &ROOT, fake_listen) + db::outgoing_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) .await .unwrap() .len(), diff --git a/taler-magnet-bank/src/bin/magnet-bank-harness.rs b/taler-magnet-bank/src/bin/magnet-bank-harness.rs @@ -22,6 +22,7 @@ use failure_injection::{InjectedErr, set_failure_scenario}; use jiff::{Timestamp, Zoned}; use owo_colors::OwoColorize; use sqlx::PgPool; +use taler_api::notification::dummy_listen; use taler_build::long_version; use taler_common::{ CommonArgs, @@ -188,7 +189,7 @@ impl<'a> Harness<'a> { }, timeout_ms: None, }, - || tokio::sync::watch::channel(0).1, + dummy_listen, ) .await .unwrap(); diff --git a/taler-magnet-bank/src/db.rs b/taler-magnet-bank/src/db.rs @@ -198,10 +198,10 @@ pub async fn register_tx_in_admin( sqlx::query( " SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new - FROM register_tx_in(NULL, ($1, $2)::taler_amount, $3, $4, $5, $6, $7, $8, $6) + FROM register_tx_in(NULL, $1, $2, $3, $4, $5, $6, $7, $5) ", ) - .bind_amount(&tx.amount) + .bind(&tx.amount) .bind(&tx.subject) .bind(tx.debtor.iban()) .bind(&tx.debtor.name) @@ -232,11 +232,11 @@ pub async fn register_tx_in( sqlx::query( " SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new - FROM register_tx_in($1, ($2, $3)::taler_amount, $4, $5, $6, $7, $8, $9, $10) + FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9) ", ) .bind(tx.code as i64) - .bind_amount(&tx.amount) + .bind(&tx.amount) .bind(&tx.subject) .bind(tx.debtor.iban()) .bind(&tx.debtor.name) @@ -293,11 +293,11 @@ pub async fn register_tx_out( let query = sqlx::query( " SELECT out_result, out_tx_row_id - FROM register_tx_out($1, ($2, $3)::taler_amount, $4, $5, $6, $7, $8, $9, $10, $11) + FROM register_tx_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ", ) .bind(tx.code as i64) - .bind_amount(&tx.amount) + .bind(&tx.amount) .bind(&tx.subject) .bind(tx.creditor.iban()) .bind(&tx.creditor.name) @@ -384,13 +384,13 @@ pub async fn make_transfer<'a>( sqlx::query( " SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at - FROM taler_transfer($1, $2, $3, ($4, $5)::taler_amount, $6, $7, $8, $9) + FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8) ", ) .bind(tx.request_uid.as_ref()) .bind(tx.wtid.as_ref()) .bind(&subject) - .bind_decimal(&tx.amount) + .bind(tx.amount) .bind(tx.exchange_base_url.as_str()) .bind(tx.creditor.iban()) .bind(&tx.creditor.name) @@ -429,16 +429,16 @@ pub async fn register_bounce_tx_in( sqlx::query( " SELECT out_tx_row_id, out_tx_new, out_bounce_row_id, out_bounce_new - FROM register_bounce_tx_in($1, ($2, $3)::taler_amount, $4, $5, $6, $7, ($8, $9)::taler_amount, $10, $11) + FROM register_bounce_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9) ", ) .bind(tx.code as i64) - .bind_amount(&tx.amount) + .bind(&tx.amount) .bind(&tx.subject) .bind(tx.debtor.iban()) .bind(&tx.debtor.name) - .bind_date(&tx.value_date) - .bind_amount(amount) + .bind_date(&tx.value_date) + .bind(amount) .bind(reason) .bind_timestamp(now) .try_map(|r: PgRow| { @@ -468,8 +468,7 @@ pub async fn transfer_page<'a>( SELECT initiated_id, status, - (amount).val as amount_val, - (amount).frac as amount_frac, + amount, credit_account, credit_name, initiated_at @@ -487,9 +486,9 @@ pub async fn transfer_page<'a>( Ok(TransferListStatus { row_id: r.try_get_safeu64(0)?, status: r.try_get(1)?, - amount: r.try_get_amount_i(2, &CURRENCY)?, - credit_account: r.try_get_iban(4)?.as_full_payto(r.try_get(5)?), - timestamp: r.try_get_timestamp(6)?.into(), + amount: r.try_get_amount(2, &CURRENCY)?, + credit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?), + timestamp: r.try_get_timestamp(5)?.into(), }) }, ) @@ -511,8 +510,7 @@ pub async fn outgoing_history( " SELECT tx_out_id, - (amount).val as amount_val, - (amount).frac as amount_frac, + amount, credit_account, credit_name, valued_at, @@ -527,11 +525,12 @@ pub async fn outgoing_history( |r: PgRow| { Ok(OutgoingBankTransaction { row_id: r.try_get_safeu64(0)?, - amount: r.try_get_amount_i(1, &CURRENCY)?, - credit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?), - date: r.try_get_timestamp(5)?.into(), - exchange_base_url: r.try_get_url(6)?, - wtid: r.try_get_base32(7)?, + amount: r.try_get_amount(1, &CURRENCY)?, + debit_fee: None, + credit_account: r.try_get_iban(2)?.as_full_payto(r.try_get(3)?), + date: r.try_get_timestamp(4)?.into(), + exchange_base_url: r.try_get_url(5)?, + wtid: r.try_get(6)?, }) }, ) @@ -554,8 +553,7 @@ pub async fn incoming_history( SELECT type, tx_in_id, - (amount).val as amount_val, - (amount).frac as amount_frac, + amount, debit_account, debit_name, valued_at, @@ -570,17 +568,23 @@ pub async fn incoming_history( Ok(match r.try_get(0)? { IncomingType::reserve => IncomingBankTransaction::Reserve { row_id: r.try_get_safeu64(1)?, - amount: r.try_get_amount_i(2, &CURRENCY)?, - debit_account: r.try_get_iban(4)?.as_full_payto(r.try_get(5)?), - date: r.try_get_timestamp(6)?.into(), - reserve_pub: r.try_get_eddsa(7)?, + amount: r.try_get_amount(2, &CURRENCY)?, + credit_fee: None, + debit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?), + date: r.try_get_timestamp(5)?.into(), + reserve_pub: r.try_get(6)?, + authorization_pub: None, + authorization_sig: None, }, IncomingType::kyc => IncomingBankTransaction::Kyc { row_id: r.try_get_safeu64(1)?, - amount: r.try_get_amount_i(2, &CURRENCY)?, - debit_account: r.try_get_iban(4)?.as_full_payto(r.try_get(5)?), - date: r.try_get_timestamp(6)?.into(), - account_pub: r.try_get_eddsa(7)?, + amount: r.try_get_amount(2, &CURRENCY)?, + credit_fee: None, + debit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?), + date: r.try_get_timestamp(5)?.into(), + account_pub: r.try_get(6)?, + authorization_pub: None, + authorization_sig: None, }, IncomingType::wad => { unimplemented!("WAD is not yet supported") @@ -607,8 +611,7 @@ pub async fn revenue_history( SELECT tx_in_id, valued_at, - (amount).val as amount_val, - (amount).frac as amount_frac, + amount, debit_account, debit_name, subject @@ -621,10 +624,10 @@ pub async fn revenue_history( Ok(RevenueIncomingBankTransaction { row_id: r.try_get_safeu64(0)?, date: r.try_get_timestamp(1)?.into(), - amount: r.try_get_amount_i(2, &CURRENCY)?, + amount: r.try_get_amount(2, &CURRENCY)?, credit_fee: None, - debit_account: r.try_get_iban(4)?.as_full_payto(r.try_get(5)?), - subject: r.try_get(6)?, + debit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?), + subject: r.try_get(5)?, }) }, ) @@ -640,8 +643,7 @@ pub async fn transfer_by_id<'a>( SELECT status, status_msg, - (amount).val as amount_val, - (amount).frac as amount_frac, + amount, exchange_base_url, wtid, credit_account, @@ -657,11 +659,11 @@ pub async fn transfer_by_id<'a>( Ok(TransferStatus { status: r.try_get(0)?, status_msg: r.try_get(1)?, - amount: r.try_get_amount_i(2, &CURRENCY)?, - origin_exchange_url: r.try_get(4)?, - wtid: r.try_get_base32(5)?, - credit_account: r.try_get_iban(6)?.as_full_payto(r.try_get(7)?), - timestamp: r.try_get_timestamp(8)?.into(), + amount: r.try_get_amount(2, &CURRENCY)?, + origin_exchange_url: r.try_get(3)?, + wtid: r.try_get(4)?, + credit_account: r.try_get_iban(5)?.as_full_payto(r.try_get(6)?), + timestamp: r.try_get_timestamp(7)?.into(), }) }) .fetch_optional(db) @@ -675,7 +677,7 @@ pub async fn pending_batch<'a>( ) -> sqlx::Result<Vec<Initiated>> { sqlx::query( " - SELECT initiated_id, (amount).val, (amount).frac, subject, credit_account, credit_name + SELECT initiated_id, amount, subject, credit_account, credit_name FROM initiated WHERE magnet_code IS NULL AND status='pending' @@ -687,9 +689,9 @@ pub async fn pending_batch<'a>( .try_map(|r: PgRow| { Ok(Initiated { id: r.try_get_u64(0)?, - amount: r.try_get_amount_i(1, &CURRENCY)?, - subject: r.try_get(3)?, - creditor: FullHuPayto::new(r.try_get_parse(4)?, r.try_get(5)?), + amount: r.try_get_amount(1, &CURRENCY)?, + subject: r.try_get(2)?, + creditor: FullHuPayto::new(r.try_get_parse(3)?, r.try_get(4)?), }) }) .fetch_all(db) @@ -703,7 +705,7 @@ pub async fn initiated_by_code<'a>( ) -> sqlx::Result<Option<Initiated>> { sqlx::query( " - SELECT initiated_id, (amount).val, (amount).frac, subject, credit_account, credit_name + SELECT initiated_id, amount, subject, credit_account, credit_name FROM initiated WHERE magnet_code IS $1 ", @@ -712,9 +714,9 @@ pub async fn initiated_by_code<'a>( .try_map(|r: PgRow| { Ok(Initiated { id: r.try_get_u64(0)?, - amount: r.try_get_amount_i(1, &CURRENCY)?, - subject: r.try_get(3)?, - creditor: FullHuPayto::new(r.try_get_parse(4)?, r.try_get(5)?), + amount: r.try_get_amount(1, &CURRENCY)?, + subject: r.try_get(2)?, + creditor: FullHuPayto::new(r.try_get_parse(3)?, r.try_get(4)?), }) }) .fetch_optional(db) @@ -808,6 +810,7 @@ mod test { use sqlx::{PgConnection, PgPool, postgres::PgRow}; use taler_api::{ db::TypeHelper, + notification::dummy_listen, subject::{IncomingSubject, OutgoingSubject}, }; use taler_common::{ @@ -819,7 +822,6 @@ mod test { utils::now_sql_stable_timestamp, }, }; - use tokio::sync::watch::Receiver; use crate::{ constants::CONFIG_SOURCE, @@ -834,10 +836,6 @@ mod test { use super::TxInAdmin; - fn fake_listen<T: Default>() -> Receiver<T> { - tokio::sync::watch::channel(T::default()).1 - } - async fn setup() -> (PgConnection, PgPool) { let pool = taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await; let conn = pool.acquire().await.unwrap().leak(); @@ -947,13 +945,13 @@ mod test { // Empty db assert_eq!( - db::revenue_history(&pool, &History::default(), fake_listen) + db::revenue_history(&pool, &History::default(), dummy_listen) .await .unwrap(), Vec::new() ); assert_eq!( - db::incoming_history(&pool, &History::default(), fake_listen) + db::incoming_history(&pool, &History::default(), dummy_listen) .await .unwrap(), Vec::new() @@ -980,14 +978,14 @@ mod test { // History assert_eq!( - db::revenue_history(&pool, &History::default(), fake_listen) + db::revenue_history(&pool, &History::default(), dummy_listen) .await .unwrap() .len(), 6 ); assert_eq!( - db::incoming_history(&pool, &History::default(), fake_listen) + db::incoming_history(&pool, &History::default(), dummy_listen) .await .unwrap() .len(), @@ -1001,7 +999,7 @@ mod test { // Empty db assert_eq!( - db::incoming_history(&pool, &History::default(), fake_listen) + db::incoming_history(&pool, &History::default(), dummy_listen) .await .unwrap(), Vec::new() @@ -1060,7 +1058,7 @@ mod test { // History assert_eq!( - db::incoming_history(&pool, &History::default(), fake_listen) + db::incoming_history(&pool, &History::default(), dummy_listen) .await .unwrap() .len(), @@ -1163,7 +1161,7 @@ mod test { // Empty db assert_eq!( - db::outgoing_history(&pool, &History::default(), fake_listen) + db::outgoing_history(&pool, &History::default(), dummy_listen) .await .unwrap(), Vec::new() @@ -1191,7 +1189,7 @@ mod test { // History assert_eq!( - db::outgoing_history(&pool, &History::default(), fake_listen) + db::outgoing_history(&pool, &History::default(), dummy_listen) .await .unwrap() .len(),