commit 40f293ffd868f5c004e91ea8a358427076a52f9c parent 1a97d3faa33f9afa737897bbca0ecb41ca42126f Author: Antoine A <> Date: Tue, 28 Jan 2025 09:14:29 +0100 common: rename crates Diffstat:
28 files changed, 1296 insertions(+), 1295 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock @@ -1506,7 +1506,7 @@ dependencies = [ "sqlx", "taler-api", "taler-common", - "test-utils", + "taler-test-utils", "thiserror 2.0.11", "tokio", "tracing", @@ -2556,7 +2556,7 @@ dependencies = [ "serde_path_to_error", "sqlx", "taler-common", - "test-utils", + "taler-test-utils", "thiserror 2.0.11", "tokio", "tracing", @@ -2592,21 +2592,7 @@ dependencies = [ ] [[package]] -name = "tempfile" -version = "3.15.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a8a559c81686f576e8cd0290cd2a24a2a9ad80c98b3478856500fcbd7acd704" -dependencies = [ - "cfg-if", - "fastrand", - "getrandom", - "once_cell", - "rustix", - "windows-sys 0.59.0", -] - -[[package]] -name = "test-utils" +name = "taler-test-utils" version = "0.1.0" dependencies = [ "axum", @@ -2623,6 +2609,20 @@ dependencies = [ ] [[package]] +name = "tempfile" +version = "3.15.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a8a559c81686f576e8cd0290cd2a24a2a9ad80c98b3478856500fcbd7acd704" +dependencies = [ + "cfg-if", + "fastrand", + "getrandom", + "once_cell", + "rustix", + "windows-sys 0.59.0", +] + +[[package]] name = "thiserror" version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" diff --git a/Cargo.toml b/Cargo.toml @@ -3,8 +3,8 @@ resolver = "2" members = [ "common/taler-api", "common/taler-common", - "common/test-utils", - "wire-gateway/magnet-bank", + "common/taler-test-utils", + "adapter/taler-magnet-bank-adapter", ] [profile.dev] @@ -28,5 +28,5 @@ jiff = { version = "0.1", default-features = false, features = ["tz-system"] } tempfile = "3.15" taler-common = { path = "common/taler-common" } taler-api = { path = "common/taler-api" } -test-utils = { path = "common/test-utils" } -anyhow = "1" +taler-test-utils = { path = "common/taler-test-utils" } +anyhow = "1" +\ No newline at end of file diff --git a/adapter/taler-magnet-bank-adapter/Cargo.toml b/adapter/taler-magnet-bank-adapter/Cargo.toml @@ -0,0 +1,40 @@ +[package] +name = "magnet-bank" +version = "0.1.0" +edition = "2021" + +[dependencies] +rand_core = { version = "*" } +reqwest = { version = "0.12", default-features = false, features = [ + "json", + "native-tls", +] } +hmac = "0.12" +sha1 = "0.10" +p256 = { version = "0.13.2", features = ["alloc", "ecdsa"] } +spki = "0.7.3" +base64 = "0.22" +form_urlencoded = "1.2" +percent-encoding = "2.3" +serde_urlencoded = "0.7.1" +passterm = "2.0" +sqlx = { workspace = true, features = [ + "postgres", + "runtime-tokio-native-tls", + "tls-native-tls", +] } +serde_json = { workspace = true, features = ["raw_value"] } +jiff = { workspace = true, features = ["serde"] } +taler-common.workspace = true +taler-api.workspace = true +clap.workspace = true +serde.workspace = true +serde_path_to_error.workspace = true +thiserror.workspace = true +tracing.workspace = true +tracing-subscriber.workspace = true +tokio.workspace = true +anyhow.workspace = true + +[dev-dependencies] +taler-test-utils.workspace = true diff --git a/wire-gateway/magnet-bank/db/schema.sql b/adapter/taler-magnet-bank-adapter/db/schema.sql diff --git a/wire-gateway/magnet-bank/src/config.rs b/adapter/taler-magnet-bank-adapter/src/config.rs diff --git a/wire-gateway/magnet-bank/src/constant.rs b/adapter/taler-magnet-bank-adapter/src/constant.rs diff --git a/adapter/taler-magnet-bank-adapter/src/db.rs b/adapter/taler-magnet-bank-adapter/src/db.rs @@ -0,0 +1,1103 @@ +/* + This file is part of TALER + Copyright (C) 2025 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use std::fmt::Display; + +use sqlx::{postgres::PgRow, PgConnection, PgExecutor, PgPool, QueryBuilder, Row}; +use taler_api::{ + db::{history, page, BindHelper, IncomingType, TypeHelper}, + subject::{IncomingSubject, OutgoingSubject}, +}; +use taler_common::{ + api_params::{History, Page}, + api_wire::{ + IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest, + TransferState, TransferStatus, + }, + types::{amount::Amount, timestamp::Timestamp}, +}; +use tokio::sync::watch::{Receiver, Sender}; + +use crate::{constant::CURRENCY, MagnetPayto}; + +pub async fn notification_listener( + pool: PgPool, + in_channel: Sender<i64>, + taler_in_channel: Sender<i64>, + out_channel: Sender<i64>, + taler_out_channel: Sender<i64>, +) -> sqlx::Result<()> { + taler_api::notification::notification_listener!(&pool, + "tx_in" => (row_id: i64) { + in_channel.send_replace(row_id); + }, + "taler_in" => (row_id: i64) { + taler_in_channel.send_replace(row_id); + }, + "tx_out" => (row_id: i64) { + out_channel.send_replace(row_id); + }, + "taler_out" => (row_id: i64) { + taler_out_channel.send_replace(row_id); + } + ) +} + +#[derive(Debug, Clone)] +pub struct TxIn { + pub code: u64, + pub amount: Amount, + pub subject: String, + pub debtor: MagnetPayto, + pub timestamp: Timestamp, +} + +impl Display for TxIn { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let TxIn { + code, + amount, + subject, + debtor, + timestamp, + } = self; + write!(f, "{timestamp} {amount} {code} {debtor} '{subject}'") + } +} + +#[derive(Debug, Clone)] +pub struct TxOut { + pub code: u64, + pub amount: Amount, + pub subject: String, + pub creditor: MagnetPayto, + pub timestamp: Timestamp, +} + +impl Display for TxOut { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let TxOut { + code, + amount, + subject, + creditor, + timestamp, + } = self; + write!(f, "{timestamp} {amount} {code} {creditor} '{subject}'") + } +} + +#[derive(Debug, Clone)] +pub struct TxInAdmin { + pub amount: Amount, + pub subject: String, + pub debtor: MagnetPayto, + pub timestamp: Timestamp, + pub metadata: IncomingSubject, +} + +#[derive(Debug, PartialEq, Eq)] +pub struct RegisteredTx { + pub new: bool, + pub row_id: u64, + pub timestamp: Timestamp, +} + +#[derive(Debug, PartialEq, Eq)] +pub enum AddIncomingResult { + Success(RegisteredTx), + ReservePubReuse, +} + +#[derive(Debug, PartialEq, Eq)] +pub struct Initiated { + pub id: u64, + pub amount: Amount, + pub subject: String, + pub creditor: MagnetPayto, +} + +impl Display for Initiated { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{} {} {} '{}'", + self.id, self.amount, self.creditor, self.subject + ) + } +} + +pub async fn db_init(db: &PgPool, reset: bool) -> sqlx::Result<()> { + let mut tx = db.begin().await?; + if reset { + sqlx::raw_sql("DROP SCHEMA public CASCADE;CREATE SCHEMA public;") + .execute(&mut *tx) + .await?; + } + // TODO migrations + sqlx::raw_sql(include_str!("../db/schema.sql")) + .execute(&mut *tx) + .await?; + tx.commit().await?; + Ok(()) +} + +pub async fn register_tx_in_admin(db: &PgPool, tx: &TxInAdmin) -> sqlx::Result<AddIncomingResult> { + sqlx::query( + " + SELECT out_reserve_pub_reuse, out_new, out_tx_row_id, out_timestamp + FROM register_tx_in(NULL, ($1, $2)::taler_amount, $3, $4, $5, $6, $7, $8) + ", + ) + .bind_amount(&tx.amount) + .bind(&tx.subject) + .bind(&tx.debtor.number) + .bind(&tx.debtor.name) + .bind_timestamp(&tx.timestamp) + .bind(tx.metadata.ty()) + .bind(tx.metadata.key()) + .try_map(|r: PgRow| { + Ok(if r.try_get(0)? { + AddIncomingResult::ReservePubReuse + } else { + AddIncomingResult::Success(RegisteredTx { + new: r.try_get(1)?, + row_id: r.try_get_u64(2)?, + timestamp: r.try_get_timestamp(3)?, + }) + }) + }) + .fetch_one(db) + .await +} + +pub async fn register_tx_in( + db: &mut PgConnection, + tx: &TxIn, + subject: &Option<IncomingSubject>, +) -> sqlx::Result<AddIncomingResult> { + sqlx::query( + " + SELECT out_reserve_pub_reuse, out_new, out_tx_row_id, out_timestamp + FROM register_tx_in($1, ($2, $3)::taler_amount, $4, $5, $6, $7, $8, $9) + ", + ) + .bind(tx.code as i64) + .bind_amount(&tx.amount) + .bind(&tx.subject) + .bind(&tx.debtor.number) + .bind(&tx.debtor.name) + .bind_timestamp(&tx.timestamp) + .bind(subject.as_ref().map(|it| it.ty())) + .bind(subject.as_ref().map(|it| it.key())) + .try_map(|r: PgRow| { + Ok(if r.try_get(0)? { + AddIncomingResult::ReservePubReuse + } else { + AddIncomingResult::Success(RegisteredTx { + new: r.try_get(1)?, + row_id: r.try_get_u64(2)?, + timestamp: r.try_get_timestamp(3)?, + }) + }) + }) + .fetch_one(db) + .await +} + +pub async fn register_tx_out( + db: &mut PgConnection, + tx: &TxOut, + subject: &Option<OutgoingSubject>, +) -> sqlx::Result<RegisteredTx> { + sqlx::query( + " + SELECT out_new, out_tx_row_id, out_timestamp + FROM register_tx_out($1, ($2, $3)::taler_amount, $4, $5, $6, $7, $8, $9) + ", + ) + .bind(tx.code as i64) + .bind_amount(&tx.amount) + .bind(&tx.subject) + .bind(&tx.creditor.number) + .bind(&tx.creditor.name) + .bind_timestamp(&tx.timestamp) + .bind(subject.as_ref().map(|it| it.0.as_ref())) + .bind(subject.as_ref().map(|it| it.1.as_str())) + .try_map(|r: PgRow| { + Ok(RegisteredTx { + new: r.try_get(0)?, + row_id: r.try_get_u64(1)?, + timestamp: r.try_get_timestamp(2)?, + }) + }) + .fetch_one(db) + .await +} + +#[derive(Debug, PartialEq, Eq)] +pub enum TransferResult { + Success { id: u64, timestamp: Timestamp }, + RequestUidReuse, + WtidReuse, +} + +pub async fn make_transfer<'a>( + db: impl PgExecutor<'a>, + req: &TransferRequest, + creditor: &MagnetPayto, + timestamp: &Timestamp, +) -> sqlx::Result<TransferResult> { + let subject = format!("{} {}", req.wtid, req.exchange_base_url); + sqlx::query( + " + SELECT out_request_uid_reuse, out_wtid_reuse, out_tx_row_id, out_timestamp + FROM taler_transfer($1, $2, $3, ($4, $5)::taler_amount, $6, $7, $8, $9) + ", + ) + .bind(req.request_uid.as_ref()) + .bind(req.wtid.as_ref()) + .bind(&subject) + .bind_amount(&req.amount) + .bind(req.exchange_base_url.as_str()) + .bind(&creditor.number) + .bind(&creditor.name) + .bind_timestamp(timestamp) + .try_map(|r: PgRow| { + Ok(if r.try_get(0)? { + TransferResult::RequestUidReuse + } else if r.try_get(1)? { + TransferResult::WtidReuse + } else { + TransferResult::Success { + id: r.try_get_u64(2)?, + timestamp: r.try_get_timestamp(3)?, + } + }) + }) + .fetch_one(db) + .await +} + +pub async fn transfer_page<'a>( + db: impl PgExecutor<'a>, + status: &Option<TransferState>, + params: &Page, +) -> sqlx::Result<Vec<TransferListStatus>> { + page( + db, + "initiated_id", + params, + || { + let mut builder = QueryBuilder::new( + " + SELECT + initiated_id, + status, + (amount).val as amount_val, + (amount).frac as amount_frac, + credit_account, + credit_name, + created + FROM transfer + JOIN initiated USING (initiated_id) + WHERE + ", + ); + if let Some(status) = status { + builder.push(" status = ").push_bind(status).push(" AND "); + } + builder + }, + |r: PgRow| { + Ok(TransferListStatus { + row_id: r.try_get_safeu64(0)?, + status: r.try_get(1)?, + amount: r.try_get_amount_i(2, CURRENCY)?, + credit_account: MagnetPayto { + number: r.try_get(4)?, + name: r.try_get(4)?, + } + .as_payto(), + timestamp: r.try_get_timestamp(6)?, + }) + }, + ) + .await +} + +pub async fn outgoing_history( + db: &PgPool, + params: &History, + listen: impl FnOnce() -> Receiver<i64>, +) -> sqlx::Result<Vec<OutgoingBankTransaction>> { + history( + db, + "tx_out_id", + params, + listen, + || { + QueryBuilder::new( + " + SELECT + tx_out_id, + (amount).val as amount_val, + (amount).frac as amount_frac, + credit_account, + credit_name, + created, + exchange_base_url, + wtid + FROM taler_out + JOIN tx_out USING (tx_out_id) + WHERE + ", + ) + }, + |r: PgRow| { + Ok(OutgoingBankTransaction { + row_id: r.try_get_safeu64(0)?, + amount: r.try_get_amount_i(1, CURRENCY)?, + credit_account: MagnetPayto { + number: r.try_get(3)?, + name: r.try_get(4)?, + } + .as_payto(), + date: r.try_get_timestamp(5)?, + exchange_base_url: r.try_get_url(6)?, + wtid: r.try_get_base32(7)?, + }) + }, + ) + .await +} + +pub async fn incoming_history( + db: &PgPool, + params: &History, + listen: impl FnOnce() -> Receiver<i64>, +) -> sqlx::Result<Vec<IncomingBankTransaction>> { + history( + db, + "tx_in_id", + params, + listen, + || { + QueryBuilder::new( + " + SELECT + type, + tx_in_id, + (amount).val as amount_val, + (amount).frac as amount_frac, + debit_account, + debit_name, + created, + metadata + FROM taler_in + JOIN tx_in USING (tx_in_id) + WHERE + ", + ) + }, + |r: PgRow| { + Ok(match r.try_get(0)? { + IncomingType::reserve => IncomingBankTransaction::Reserve { + row_id: r.try_get_safeu64(1)?, + amount: r.try_get_amount_i(2, CURRENCY)?, + debit_account: MagnetPayto { + number: r.try_get(4)?, + name: r.try_get(5)?, + } + .as_payto(), + date: r.try_get_timestamp(6)?, + reserve_pub: r.try_get_base32(7)?, + }, + IncomingType::kyc => IncomingBankTransaction::Kyc { + row_id: r.try_get_safeu64(1)?, + amount: r.try_get_amount_i(2, CURRENCY)?, + debit_account: MagnetPayto { + number: r.try_get(4)?, + name: r.try_get(5)?, + } + .as_payto(), + date: r.try_get_timestamp(6)?, + account_pub: r.try_get_base32(7)?, + }, + IncomingType::wad => { + unimplemented!("WAD is not yet supported") + } + }) + }, + ) + .await +} + +pub async fn transfer_by_id<'a>( + db: impl PgExecutor<'a>, + id: u64, +) -> sqlx::Result<Option<TransferStatus>> { + sqlx::query( + " + SELECT + status, + status_msg, + (amount).val as amount_val, + (amount).frac as amount_frac, + exchange_base_url, + wtid, + credit_account, + credit_name, + created + FROM transfer + JOIN initiated USING (initiated_id) + WHERE initiated_id = $1 + ", + ) + .bind(id as i64) + .try_map(|r: PgRow| { + Ok(TransferStatus { + status: r.try_get(0)?, + status_msg: r.try_get(1)?, + amount: r.try_get_amount_i(2, CURRENCY)?, + origin_exchange_url: r.try_get(4)?, + wtid: r.try_get_base32(5)?, + credit_account: MagnetPayto { + number: r.try_get(6)?, + name: r.try_get(7)?, + } + .as_payto(), + timestamp: r.try_get_timestamp(8)?, + }) + }) + .fetch_optional(db) + .await +} + +pub async fn pending_batch<'a>( + db: impl PgExecutor<'a>, + start: &Timestamp, +) -> sqlx::Result<Vec<Initiated>> { + sqlx::query( + " + SELECT initiated_id, (amount).val, (amount).frac, subject, credit_account, credit_name + FROM initiated + WHERE magnet_code IS NULL AND (last_submitted IS NULL OR last_submitted < $1) + LIMIT 100 + ", + ) + .bind_timestamp(start) + .try_map(|r: PgRow| { + Ok(Initiated { + id: r.try_get_u64(0)?, + amount: r.try_get_amount_i(1, CURRENCY)?, + subject: r.try_get(3)?, + creditor: MagnetPayto { + number: r.try_get(4)?, + name: r.try_get(5)?, + }, + }) + }) + .fetch_all(db) + .await +} + +/** Update status of a sucessfull submitted initiated transaction */ +pub async fn initiated_submit_success<'a>( + db: impl PgExecutor<'a>, + id: u64, + timestamp: &Timestamp, + magnet_code: u64, +) -> sqlx::Result<()> { + sqlx::query( + " + UPDATE initiated + SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, magnet_code=$2 + WHERE initiated_id=$3 + " + ).bind_timestamp(timestamp) + .bind(magnet_code as i64) + .bind(id as i64) + .execute(db).await?; + Ok(()) +} + +/** Update status of a sucessfull submitted initiated transaction */ +pub async fn initiated_submit_failure<'a>( + db: impl PgExecutor<'a>, + id: u64, + timestamp: &Timestamp, + msg: &str, +) -> sqlx::Result<()> { + sqlx::query( + " + UPDATE initiated + SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, status_msg=$2 + WHERE initiated_id=$3 + ", + ) + .bind_timestamp(timestamp) + .bind(msg) + .bind(id as i64) + .execute(db) + .await?; + Ok(()) +} + +#[cfg(test)] +mod test { + + use sqlx::{postgres::PgRow, PgConnection, PgPool}; + use taler_api::{ + db::TypeHelper, + subject::{IncomingSubject, OutgoingSubject}, + }; + use taler_common::{ + api_common::{EddsaPublicKey, HashCode, ShortHashCode}, + api_params::{History, Page}, + api_wire::TransferRequest, + types::{amount::amount, payto::payto, timestamp::Timestamp, url}, + }; + use tokio::sync::watch::Receiver; + + use crate::{ + constant::CURRENCY, + db::{ + self, make_transfer, register_tx_in, register_tx_in_admin, register_tx_out, + AddIncomingResult, RegisteredTx, TransferResult, TxIn, TxOut, + }, + MagnetPayto, + }; + + use super::TxInAdmin; + + fn fake_listen<T: Default>() -> Receiver<T> { + tokio::sync::watch::channel(T::default()).1 + } + + async fn setup() -> (PgConnection, PgPool) { + let pool = taler_test_utils::db_test_setup().await; + db::db_init(&pool, false).await.expect("dbinit"); + let conn = pool.acquire().await.expect("aquire conn").leak(); + (conn, pool) + } + + #[tokio::test] + async fn tx_in() { + let (mut db, pool) = setup().await; + + async fn routine( + db: &mut PgConnection, + first: &Option<IncomingSubject>, + second: &Option<IncomingSubject>, + ) { + let (id, code) = + sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_in") + .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?))) + .fetch_one(&mut *db) + .await + .unwrap(); + let tx = TxIn { + code: code, + amount: amount("EUR:10"), + subject: "subject".to_owned(), + debtor: MagnetPayto { + number: "number".to_owned(), + name: "name".to_owned(), + }, + timestamp: Timestamp::now_stable(), + }; + // Insert + assert_eq!( + register_tx_in(db, &tx, &first) + .await + .expect("register tx in"), + AddIncomingResult::Success(RegisteredTx { + new: true, + row_id: id, + timestamp: tx.timestamp + }) + ); + // Idempotent + assert_eq!( + register_tx_in( + db, + &TxIn { + timestamp: Timestamp::now(), + ..tx.clone() + }, + &first + ) + .await + .expect("register tx in"), + AddIncomingResult::Success(RegisteredTx { + new: false, + row_id: id, + timestamp: tx.timestamp + }) + ); + // Many + assert_eq!( + register_tx_in( + db, + &TxIn { + code: code + 1, + ..tx + }, + &second + ) + .await + .expect("register tx in"), + AddIncomingResult::Success(RegisteredTx { + new: true, + row_id: id + 1, + timestamp: tx.timestamp + }) + ); + } + + // Empty db + assert_eq!( + db::incoming_history(&pool, &History::default(), fake_listen) + .await + .unwrap(), + Vec::new() + ); + + // Regular transaction + routine(&mut db, &None, &None).await; + + // Reserve transaction + routine( + &mut db, + &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), + &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), + ) + .await; + + // Kyc transaction + routine( + &mut db, + &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), + &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), + ) + .await; + + // History + assert_eq!( + db::incoming_history(&pool, &History::default(), fake_listen) + .await + .unwrap() + .len(), + 4 + ); + } + + #[tokio::test] + async fn tx_in_admin() { + let (_, pool) = setup().await; + + // Empty db + assert_eq!( + db::incoming_history(&pool, &History::default(), fake_listen) + .await + .unwrap(), + Vec::new() + ); + + let tx = TxInAdmin { + amount: amount("EUR:10"), + subject: "subject".to_owned(), + debtor: MagnetPayto { + number: "number".to_owned(), + name: "name".to_owned(), + }, + timestamp: Timestamp::now_stable(), + metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), + }; + // Insert + assert_eq!( + register_tx_in_admin(&pool, &tx) + .await + .expect("register tx in"), + AddIncomingResult::Success(RegisteredTx { + new: true, + row_id: 1, + timestamp: tx.timestamp + }) + ); + // Idempotent + assert_eq!( + register_tx_in_admin( + &pool, + &TxInAdmin { + timestamp: Timestamp::now(), + ..tx.clone() + } + ) + .await + .expect("register tx in"), + AddIncomingResult::Success(RegisteredTx { + new: false, + row_id: 1, + timestamp: tx.timestamp + }) + ); + // Many + assert_eq!( + register_tx_in_admin( + &pool, + &TxInAdmin { + subject: "Other".to_owned(), + metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), + ..tx.clone() + } + ) + .await + .expect("register tx in"), + AddIncomingResult::Success(RegisteredTx { + new: true, + row_id: 2, + timestamp: tx.timestamp + }) + ); + + // History + assert_eq!( + db::incoming_history(&pool, &History::default(), fake_listen) + .await + .unwrap() + .len(), + 2 + ); + } + + #[tokio::test] + async fn tx_out() { + let (mut db, pool) = setup().await; + + async fn routine( + db: &mut PgConnection, + first: &Option<OutgoingSubject>, + second: &Option<OutgoingSubject>, + ) { + let (id, code) = + sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_out") + .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?))) + .fetch_one(&mut *db) + .await + .unwrap(); + let tx = TxOut { + code, + amount: amount("EUR:10"), + subject: "subject".to_owned(), + creditor: MagnetPayto { + number: "number".to_owned(), + name: "name".to_owned(), + }, + timestamp: Timestamp::now_stable(), + }; + // Insert + assert_eq!( + register_tx_out(db, &tx, &first) + .await + .expect("register tx out"), + RegisteredTx { + new: true, + row_id: id, + timestamp: tx.timestamp + } + ); + // Idempotent + assert_eq!( + register_tx_out( + db, + &TxOut { + timestamp: Timestamp::now(), + ..tx.clone() + }, + &first + ) + .await + .expect("register tx out"), + RegisteredTx { + new: false, + row_id: id, + timestamp: tx.timestamp + } + ); + // Many + assert_eq!( + register_tx_out( + db, + &TxOut { + code: code + 1, + ..tx.clone() + }, + &second + ) + .await + .expect("register tx out"), + RegisteredTx { + new: true, + row_id: id + 1, + timestamp: tx.timestamp + } + ); + } + + // Empty db + assert_eq!( + db::outgoing_history(&pool, &History::default(), fake_listen) + .await + .unwrap(), + Vec::new() + ); + + // Regular transaction + routine(&mut db, &None, &None).await; + + // Talerable transaction + routine( + &mut db, + &Some(OutgoingSubject( + ShortHashCode::rand(), + url("https://exchange.com"), + )), + &Some(OutgoingSubject( + ShortHashCode::rand(), + url("https://exchange.com"), + )), + ) + .await; + + // History + assert_eq!( + db::outgoing_history(&pool, &History::default(), fake_listen) + .await + .unwrap() + .len(), + 2 + ); + } + + #[tokio::test] + async fn transfer() { + let (mut db, _) = setup().await; + + // Empty db + assert_eq!(db::transfer_by_id(&mut db, 0).await.unwrap(), None); + assert_eq!( + db::transfer_page(&mut db, &None, &Page::default()) + .await + .unwrap(), + Vec::new() + ); + + let req = TransferRequest { + request_uid: HashCode::rand(), + amount: amount("EUR:10"), + exchange_base_url: url("https://exchange.test.com/"), + wtid: ShortHashCode::rand(), + credit_account: payto("payto://magnet-bank/todo"), + }; + let payto = MagnetPayto { + number: "number".to_owned(), + name: "name".to_owned(), + }; + let timestamp = Timestamp::now_stable(); + // Insert + assert_eq!( + make_transfer(&mut db, &req, &payto, ×tamp) + .await + .expect("transfer"), + TransferResult::Success { + id: 1, + timestamp: timestamp + } + ); + // Idempotent + assert_eq!( + make_transfer(&mut db, &req, &payto, &Timestamp::now()) + .await + .expect("transfer"), + TransferResult::Success { + id: 1, + timestamp: timestamp + } + ); + // Request UID reuse + assert_eq!( + make_transfer( + &mut db, + &TransferRequest { + wtid: ShortHashCode::rand(), + ..req.clone() + }, + &payto, + &Timestamp::now() + ) + .await + .expect("transfer"), + TransferResult::RequestUidReuse + ); + // wtid reuse + assert_eq!( + make_transfer( + &mut db, + &TransferRequest { + request_uid: HashCode::rand(), + ..req.clone() + }, + &payto, + &Timestamp::now() + ) + .await + .expect("transfer"), + TransferResult::WtidReuse + ); + // Many + assert_eq!( + make_transfer( + &mut db, + &TransferRequest { + request_uid: HashCode::rand(), + wtid: ShortHashCode::rand(), + ..req + }, + &payto, + ×tamp + ) + .await + .expect("transfer"), + TransferResult::Success { + id: 2, + timestamp: timestamp + } + ); + + // Get + assert!(db::transfer_by_id(&mut db, 1).await.unwrap().is_some()); + assert!(db::transfer_by_id(&mut db, 2).await.unwrap().is_some()); + assert!(db::transfer_by_id(&mut db, 3).await.unwrap().is_none()); + assert_eq!( + db::transfer_page(&mut db, &None, &Page::default()) + .await + .unwrap() + .len(), + 2 + ); + } + + #[tokio::test] + async fn status() { + let (mut db, _) = setup().await; + + // Unknown transfer + db::initiated_submit_failure(&mut db, 1, &Timestamp::now(), "msg") + .await + .unwrap(); + db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12) + .await + .unwrap(); + } + + #[tokio::test] + async fn batch() { + let (mut db, _) = setup().await; + let start = Timestamp::now(); + let magnet_payto = MagnetPayto { + number: "number".to_owned(), + name: "name".to_owned(), + }; + + // Empty db + let pendings = db::pending_batch(&mut db, &start) + .await + .expect("pending_batch"); + assert_eq!(pendings.len(), 0); + + // Some transfers + for i in 0..3 { + make_transfer( + &mut db, + &TransferRequest { + request_uid: HashCode::rand(), + amount: amount(format!("{CURRENCY}:{}", i + 1)), + exchange_base_url: url("https://exchange.test.com/"), + wtid: ShortHashCode::rand(), + credit_account: payto("payto://magnet-bank/todo"), + }, + &magnet_payto, + &&Timestamp::now(), + ) + .await + .expect("transfer"); + } + let pendings = db::pending_batch(&mut db, &start) + .await + .expect("pending_batch"); + assert_eq!(pendings.len(), 3); + + // Max 100 txs in batch + for i in 0..100 { + make_transfer( + &mut db, + &TransferRequest { + request_uid: HashCode::rand(), + amount: amount(format!("{CURRENCY}:{}", i + 1)), + exchange_base_url: url("https://exchange.test.com/"), + wtid: ShortHashCode::rand(), + credit_account: payto("payto://magnet-bank/todo"), + }, + &magnet_payto, + &Timestamp::now(), + ) + .await + .expect("transfer"); + } + let pendings = db::pending_batch(&mut db, &start) + .await + .expect("pending_batch"); + assert_eq!(pendings.len(), 100); + + // Skip uploaded + for i in 0..=10 { + db::initiated_submit_success(&mut db, i, &Timestamp::now(), i) + .await + .expect("status success"); + } + let pendings = db::pending_batch(&mut db, &start) + .await + .expect("pending_batch"); + assert_eq!(pendings.len(), 93); + + // Skip tried since start + for i in 0..=10 { + db::initiated_submit_failure(&mut db, 10 + i, &Timestamp::now(), "failure") + .await + .expect("status failure"); + } + let pendings = db::pending_batch(&mut db, &start) + .await + .expect("pending_batch"); + assert_eq!(pendings.len(), 83); + let pendings = db::pending_batch(&mut db, &Timestamp::now()) + .await + .expect("pending_batch"); + assert_eq!(pendings.len(), 93); + } +} diff --git a/wire-gateway/magnet-bank/src/dev.rs b/adapter/taler-magnet-bank-adapter/src/dev.rs diff --git a/wire-gateway/magnet-bank/src/keys.rs b/adapter/taler-magnet-bank-adapter/src/keys.rs diff --git a/wire-gateway/magnet-bank/src/lib.rs b/adapter/taler-magnet-bank-adapter/src/lib.rs diff --git a/wire-gateway/magnet-bank/src/magnet.rs b/adapter/taler-magnet-bank-adapter/src/magnet.rs diff --git a/wire-gateway/magnet-bank/src/magnet/error.rs b/adapter/taler-magnet-bank-adapter/src/magnet/error.rs diff --git a/wire-gateway/magnet-bank/src/magnet/oauth.rs b/adapter/taler-magnet-bank-adapter/src/magnet/oauth.rs diff --git a/wire-gateway/magnet-bank/src/main.rs b/adapter/taler-magnet-bank-adapter/src/main.rs diff --git a/wire-gateway/magnet-bank/src/wire_gateway.rs b/adapter/taler-magnet-bank-adapter/src/wire_gateway.rs diff --git a/wire-gateway/magnet-bank/src/worker.rs b/adapter/taler-magnet-bank-adapter/src/worker.rs diff --git a/adapter/taler-magnet-bank-adapter/tests/api.rs b/adapter/taler-magnet-bank-adapter/tests/api.rs @@ -0,0 +1,108 @@ +/* + This file is part of TALER + Copyright (C) 2025 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use std::sync::Arc; + +use magnet_bank::{db, wire_gateway::MagnetWireGateway, MagnetPayto}; +use sqlx::PgPool; +use taler_api::{auth::AuthMethod, standard_layer, subject::OutgoingSubject}; +use taler_common::{ + api_common::ShortHashCode, + api_wire::{OutgoingHistory, TransferState}, + types::{amount::amount, payto::payto, timestamp::Timestamp, url}, +}; +use taler_test_utils::{ + axum_test::TestServer, + db_test_setup, + helpers::TestResponseHelper, + routine::{admin_add_incoming_routine, routine_pagination, transfer_routine}, +}; + +async fn setup() -> (TestServer, PgPool) { + let pool = db_test_setup().await; + db::db_init(&pool, false).await.unwrap(); + let gateway = MagnetWireGateway::start(pool.clone(), payto("payto://magnet-bank/todo")).await; + let server = TestServer::new(standard_layer( + taler_api::wire_gateway_api(Arc::new(gateway)), + AuthMethod::None, + )) + .unwrap(); + + (server, pool) +} + +#[tokio::test] +async fn transfer() { + let (server, _) = setup().await; + transfer_routine( + &server, + TransferState::pending, + &payto("payto://magnet-bank/account?receiver-name=John+Smith"), + ) + .await; +} + +#[tokio::test] +async fn outgoing_history() { + let (server, pool) = setup().await; + server.get("/history/outgoing").await.assert_no_content(); + routine_pagination::<OutgoingHistory, _>( + &server, + "/history/outgoing", + |it| { + it.outgoing_transactions + .into_iter() + .map(|it| *it.row_id as i64) + .collect() + }, + |_, i| { + let acquire = pool.acquire(); + async move { + let mut conn = acquire.await.unwrap(); + db::register_tx_out( + &mut *conn, + &db::TxOut { + code: i as u64, + amount: amount("EUR:10"), + subject: "subject".to_owned(), + creditor: MagnetPayto { + number: "number".to_owned(), + name: "name".to_owned(), + }, + timestamp: Timestamp::now_stable(), + }, + &Some(OutgoingSubject( + ShortHashCode::rand(), + url("https://exchange.test"), + )), + ) + .await + .unwrap(); + } + }, + ) + .await; +} + +#[tokio::test] +async fn admin_add_incoming() { + let (server, _) = setup().await; + admin_add_incoming_routine( + &server, + &payto("payto://magnet-bank/account?receiver-name=John+Smith"), + ) + .await; +} diff --git a/common/taler-api/Cargo.toml b/common/taler-api/Cargo.toml @@ -27,7 +27,7 @@ thiserror.workspace = true taler-common.workspace = true [dev-dependencies] -test-utils.workspace = true +taler-test-utils.workspace = true criterion.workspace = true fastrand.workspace = true diff --git a/common/taler-api/tests/api.rs b/common/taler-api/tests/api.rs @@ -23,7 +23,7 @@ use taler_common::{ error_code::ErrorCode, types::{amount::amount, payto::payto, url}, }; -use test_utils::{ +use taler_test_utils::{ axum_test::TestServer, db_test_setup, helpers::TestResponseHelper, diff --git a/common/taler-test-utils/Cargo.toml b/common/taler-test-utils/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "taler-test-utils" +version = "0.1.0" +edition = "2021" + +[dependencies] +axum-test = "17.0" +axum.workspace = true +tokio.workspace = true +serde_json.workspace = true +serde.workspace = true +taler-common.workspace = true +taler-api.workspace = true +tracing.workspace = true +tracing-subscriber.workspace = true +tempfile.workspace = true +sqlx = { workspace = true, features = [ + "postgres", + "runtime-tokio-native-tls", + "tls-native-tls", +] } diff --git a/common/test-utils/src/helpers.rs b/common/taler-test-utils/src/helpers.rs diff --git a/common/test-utils/src/json.rs b/common/taler-test-utils/src/json.rs diff --git a/common/test-utils/src/lib.rs b/common/taler-test-utils/src/lib.rs diff --git a/common/test-utils/src/routine.rs b/common/taler-test-utils/src/routine.rs diff --git a/common/test-utils/Cargo.toml b/common/test-utils/Cargo.toml @@ -1,21 +0,0 @@ -[package] -name = "test-utils" -version = "0.1.0" -edition = "2021" - -[dependencies] -axum-test = "17.0" -axum.workspace = true -tokio.workspace = true -serde_json.workspace = true -serde.workspace = true -taler-common.workspace = true -taler-api.workspace = true -tracing.workspace = true -tracing-subscriber.workspace = true -tempfile.workspace = true -sqlx = { workspace = true, features = [ - "postgres", - "runtime-tokio-native-tls", - "tls-native-tls", -] } diff --git a/wire-gateway/magnet-bank/Cargo.toml b/wire-gateway/magnet-bank/Cargo.toml @@ -1,40 +0,0 @@ -[package] -name = "magnet-bank" -version = "0.1.0" -edition = "2021" - -[dependencies] -rand_core = { version = "*" } -reqwest = { version = "0.12", default-features = false, features = [ - "json", - "native-tls", -] } -hmac = "0.12" -sha1 = "0.10" -p256 = { version = "0.13.2", features = ["alloc", "ecdsa"] } -spki = "0.7.3" -base64 = "0.22" -form_urlencoded = "1.2" -percent-encoding = "2.3" -serde_urlencoded = "0.7.1" -passterm = "2.0" -sqlx = { workspace = true, features = [ - "postgres", - "runtime-tokio-native-tls", - "tls-native-tls", -] } -serde_json = { workspace = true, features = ["raw_value"] } -jiff = { workspace = true, features = ["serde"] } -taler-common.workspace = true -taler-api.workspace = true -clap.workspace = true -serde.workspace = true -serde_path_to_error.workspace = true -thiserror.workspace = true -tracing.workspace = true -tracing-subscriber.workspace = true -tokio.workspace = true -anyhow.workspace = true - -[dev-dependencies] -test-utils.workspace = true diff --git a/wire-gateway/magnet-bank/src/db.rs b/wire-gateway/magnet-bank/src/db.rs @@ -1,1103 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2025 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ - -use std::fmt::Display; - -use sqlx::{postgres::PgRow, PgConnection, PgExecutor, PgPool, QueryBuilder, Row}; -use taler_api::{ - db::{history, page, BindHelper, IncomingType, TypeHelper}, - subject::{IncomingSubject, OutgoingSubject}, -}; -use taler_common::{ - api_params::{History, Page}, - api_wire::{ - IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest, - TransferState, TransferStatus, - }, - types::{amount::Amount, timestamp::Timestamp}, -}; -use tokio::sync::watch::{Receiver, Sender}; - -use crate::{constant::CURRENCY, MagnetPayto}; - -pub async fn notification_listener( - pool: PgPool, - in_channel: Sender<i64>, - taler_in_channel: Sender<i64>, - out_channel: Sender<i64>, - taler_out_channel: Sender<i64>, -) -> sqlx::Result<()> { - taler_api::notification::notification_listener!(&pool, - "tx_in" => (row_id: i64) { - in_channel.send_replace(row_id); - }, - "taler_in" => (row_id: i64) { - taler_in_channel.send_replace(row_id); - }, - "tx_out" => (row_id: i64) { - out_channel.send_replace(row_id); - }, - "taler_out" => (row_id: i64) { - taler_out_channel.send_replace(row_id); - } - ) -} - -#[derive(Debug, Clone)] -pub struct TxIn { - pub code: u64, - pub amount: Amount, - pub subject: String, - pub debtor: MagnetPayto, - pub timestamp: Timestamp, -} - -impl Display for TxIn { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let TxIn { - code, - amount, - subject, - debtor, - timestamp, - } = self; - write!(f, "{timestamp} {amount} {code} {debtor} '{subject}'") - } -} - -#[derive(Debug, Clone)] -pub struct TxOut { - pub code: u64, - pub amount: Amount, - pub subject: String, - pub creditor: MagnetPayto, - pub timestamp: Timestamp, -} - -impl Display for TxOut { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let TxOut { - code, - amount, - subject, - creditor, - timestamp, - } = self; - write!(f, "{timestamp} {amount} {code} {creditor} '{subject}'") - } -} - -#[derive(Debug, Clone)] -pub struct TxInAdmin { - pub amount: Amount, - pub subject: String, - pub debtor: MagnetPayto, - pub timestamp: Timestamp, - pub metadata: IncomingSubject, -} - -#[derive(Debug, PartialEq, Eq)] -pub struct RegisteredTx { - pub new: bool, - pub row_id: u64, - pub timestamp: Timestamp, -} - -#[derive(Debug, PartialEq, Eq)] -pub enum AddIncomingResult { - Success(RegisteredTx), - ReservePubReuse, -} - -#[derive(Debug, PartialEq, Eq)] -pub struct Initiated { - pub id: u64, - pub amount: Amount, - pub subject: String, - pub creditor: MagnetPayto, -} - -impl Display for Initiated { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!( - f, - "{} {} {} '{}'", - self.id, self.amount, self.creditor, self.subject - ) - } -} - -pub async fn db_init(db: &PgPool, reset: bool) -> sqlx::Result<()> { - let mut tx = db.begin().await?; - if reset { - sqlx::raw_sql("DROP SCHEMA public CASCADE;CREATE SCHEMA public;") - .execute(&mut *tx) - .await?; - } - // TODO migrations - sqlx::raw_sql(include_str!("../db/schema.sql")) - .execute(&mut *tx) - .await?; - tx.commit().await?; - Ok(()) -} - -pub async fn register_tx_in_admin(db: &PgPool, tx: &TxInAdmin) -> sqlx::Result<AddIncomingResult> { - sqlx::query( - " - SELECT out_reserve_pub_reuse, out_new, out_tx_row_id, out_timestamp - FROM register_tx_in(NULL, ($1, $2)::taler_amount, $3, $4, $5, $6, $7, $8) - ", - ) - .bind_amount(&tx.amount) - .bind(&tx.subject) - .bind(&tx.debtor.number) - .bind(&tx.debtor.name) - .bind_timestamp(&tx.timestamp) - .bind(tx.metadata.ty()) - .bind(tx.metadata.key()) - .try_map(|r: PgRow| { - Ok(if r.try_get(0)? { - AddIncomingResult::ReservePubReuse - } else { - AddIncomingResult::Success(RegisteredTx { - new: r.try_get(1)?, - row_id: r.try_get_u64(2)?, - timestamp: r.try_get_timestamp(3)?, - }) - }) - }) - .fetch_one(db) - .await -} - -pub async fn register_tx_in( - db: &mut PgConnection, - tx: &TxIn, - subject: &Option<IncomingSubject>, -) -> sqlx::Result<AddIncomingResult> { - sqlx::query( - " - SELECT out_reserve_pub_reuse, out_new, out_tx_row_id, out_timestamp - FROM register_tx_in($1, ($2, $3)::taler_amount, $4, $5, $6, $7, $8, $9) - ", - ) - .bind(tx.code as i64) - .bind_amount(&tx.amount) - .bind(&tx.subject) - .bind(&tx.debtor.number) - .bind(&tx.debtor.name) - .bind_timestamp(&tx.timestamp) - .bind(subject.as_ref().map(|it| it.ty())) - .bind(subject.as_ref().map(|it| it.key())) - .try_map(|r: PgRow| { - Ok(if r.try_get(0)? { - AddIncomingResult::ReservePubReuse - } else { - AddIncomingResult::Success(RegisteredTx { - new: r.try_get(1)?, - row_id: r.try_get_u64(2)?, - timestamp: r.try_get_timestamp(3)?, - }) - }) - }) - .fetch_one(db) - .await -} - -pub async fn register_tx_out( - db: &mut PgConnection, - tx: &TxOut, - subject: &Option<OutgoingSubject>, -) -> sqlx::Result<RegisteredTx> { - sqlx::query( - " - SELECT out_new, out_tx_row_id, out_timestamp - FROM register_tx_out($1, ($2, $3)::taler_amount, $4, $5, $6, $7, $8, $9) - ", - ) - .bind(tx.code as i64) - .bind_amount(&tx.amount) - .bind(&tx.subject) - .bind(&tx.creditor.number) - .bind(&tx.creditor.name) - .bind_timestamp(&tx.timestamp) - .bind(subject.as_ref().map(|it| it.0.as_ref())) - .bind(subject.as_ref().map(|it| it.1.as_str())) - .try_map(|r: PgRow| { - Ok(RegisteredTx { - new: r.try_get(0)?, - row_id: r.try_get_u64(1)?, - timestamp: r.try_get_timestamp(2)?, - }) - }) - .fetch_one(db) - .await -} - -#[derive(Debug, PartialEq, Eq)] -pub enum TransferResult { - Success { id: u64, timestamp: Timestamp }, - RequestUidReuse, - WtidReuse, -} - -pub async fn make_transfer<'a>( - db: impl PgExecutor<'a>, - req: &TransferRequest, - creditor: &MagnetPayto, - timestamp: &Timestamp, -) -> sqlx::Result<TransferResult> { - let subject = format!("{} {}", req.wtid, req.exchange_base_url); - sqlx::query( - " - SELECT out_request_uid_reuse, out_wtid_reuse, out_tx_row_id, out_timestamp - FROM taler_transfer($1, $2, $3, ($4, $5)::taler_amount, $6, $7, $8, $9) - ", - ) - .bind(req.request_uid.as_ref()) - .bind(req.wtid.as_ref()) - .bind(&subject) - .bind_amount(&req.amount) - .bind(req.exchange_base_url.as_str()) - .bind(&creditor.number) - .bind(&creditor.name) - .bind_timestamp(timestamp) - .try_map(|r: PgRow| { - Ok(if r.try_get(0)? { - TransferResult::RequestUidReuse - } else if r.try_get(1)? { - TransferResult::WtidReuse - } else { - TransferResult::Success { - id: r.try_get_u64(2)?, - timestamp: r.try_get_timestamp(3)?, - } - }) - }) - .fetch_one(db) - .await -} - -pub async fn transfer_page<'a>( - db: impl PgExecutor<'a>, - status: &Option<TransferState>, - params: &Page, -) -> sqlx::Result<Vec<TransferListStatus>> { - page( - db, - "initiated_id", - params, - || { - let mut builder = QueryBuilder::new( - " - SELECT - initiated_id, - status, - (amount).val as amount_val, - (amount).frac as amount_frac, - credit_account, - credit_name, - created - FROM transfer - JOIN initiated USING (initiated_id) - WHERE - ", - ); - if let Some(status) = status { - builder.push(" status = ").push_bind(status).push(" AND "); - } - builder - }, - |r: PgRow| { - Ok(TransferListStatus { - row_id: r.try_get_safeu64(0)?, - status: r.try_get(1)?, - amount: r.try_get_amount_i(2, CURRENCY)?, - credit_account: MagnetPayto { - number: r.try_get(4)?, - name: r.try_get(4)?, - } - .as_payto(), - timestamp: r.try_get_timestamp(6)?, - }) - }, - ) - .await -} - -pub async fn outgoing_history( - db: &PgPool, - params: &History, - listen: impl FnOnce() -> Receiver<i64>, -) -> sqlx::Result<Vec<OutgoingBankTransaction>> { - history( - db, - "tx_out_id", - params, - listen, - || { - QueryBuilder::new( - " - SELECT - tx_out_id, - (amount).val as amount_val, - (amount).frac as amount_frac, - credit_account, - credit_name, - created, - exchange_base_url, - wtid - FROM taler_out - JOIN tx_out USING (tx_out_id) - WHERE - ", - ) - }, - |r: PgRow| { - Ok(OutgoingBankTransaction { - row_id: r.try_get_safeu64(0)?, - amount: r.try_get_amount_i(1, CURRENCY)?, - credit_account: MagnetPayto { - number: r.try_get(3)?, - name: r.try_get(4)?, - } - .as_payto(), - date: r.try_get_timestamp(5)?, - exchange_base_url: r.try_get_url(6)?, - wtid: r.try_get_base32(7)?, - }) - }, - ) - .await -} - -pub async fn incoming_history( - db: &PgPool, - params: &History, - listen: impl FnOnce() -> Receiver<i64>, -) -> sqlx::Result<Vec<IncomingBankTransaction>> { - history( - db, - "tx_in_id", - params, - listen, - || { - QueryBuilder::new( - " - SELECT - type, - tx_in_id, - (amount).val as amount_val, - (amount).frac as amount_frac, - debit_account, - debit_name, - created, - metadata - FROM taler_in - JOIN tx_in USING (tx_in_id) - WHERE - ", - ) - }, - |r: PgRow| { - Ok(match r.try_get(0)? { - IncomingType::reserve => IncomingBankTransaction::Reserve { - row_id: r.try_get_safeu64(1)?, - amount: r.try_get_amount_i(2, CURRENCY)?, - debit_account: MagnetPayto { - number: r.try_get(4)?, - name: r.try_get(5)?, - } - .as_payto(), - date: r.try_get_timestamp(6)?, - reserve_pub: r.try_get_base32(7)?, - }, - IncomingType::kyc => IncomingBankTransaction::Kyc { - row_id: r.try_get_safeu64(1)?, - amount: r.try_get_amount_i(2, CURRENCY)?, - debit_account: MagnetPayto { - number: r.try_get(4)?, - name: r.try_get(5)?, - } - .as_payto(), - date: r.try_get_timestamp(6)?, - account_pub: r.try_get_base32(7)?, - }, - IncomingType::wad => { - unimplemented!("WAD is not yet supported") - } - }) - }, - ) - .await -} - -pub async fn transfer_by_id<'a>( - db: impl PgExecutor<'a>, - id: u64, -) -> sqlx::Result<Option<TransferStatus>> { - sqlx::query( - " - SELECT - status, - status_msg, - (amount).val as amount_val, - (amount).frac as amount_frac, - exchange_base_url, - wtid, - credit_account, - credit_name, - created - FROM transfer - JOIN initiated USING (initiated_id) - WHERE initiated_id = $1 - ", - ) - .bind(id as i64) - .try_map(|r: PgRow| { - Ok(TransferStatus { - status: r.try_get(0)?, - status_msg: r.try_get(1)?, - amount: r.try_get_amount_i(2, CURRENCY)?, - origin_exchange_url: r.try_get(4)?, - wtid: r.try_get_base32(5)?, - credit_account: MagnetPayto { - number: r.try_get(6)?, - name: r.try_get(7)?, - } - .as_payto(), - timestamp: r.try_get_timestamp(8)?, - }) - }) - .fetch_optional(db) - .await -} - -pub async fn pending_batch<'a>( - db: impl PgExecutor<'a>, - start: &Timestamp, -) -> sqlx::Result<Vec<Initiated>> { - sqlx::query( - " - SELECT initiated_id, (amount).val, (amount).frac, subject, credit_account, credit_name - FROM initiated - WHERE magnet_code IS NULL AND (last_submitted IS NULL OR last_submitted < $1) - LIMIT 100 - ", - ) - .bind_timestamp(start) - .try_map(|r: PgRow| { - Ok(Initiated { - id: r.try_get_u64(0)?, - amount: r.try_get_amount_i(1, CURRENCY)?, - subject: r.try_get(3)?, - creditor: MagnetPayto { - number: r.try_get(4)?, - name: r.try_get(5)?, - }, - }) - }) - .fetch_all(db) - .await -} - -/** Update status of a sucessfull submitted initiated transaction */ -pub async fn initiated_submit_success<'a>( - db: impl PgExecutor<'a>, - id: u64, - timestamp: &Timestamp, - magnet_code: u64, -) -> sqlx::Result<()> { - sqlx::query( - " - UPDATE initiated - SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, magnet_code=$2 - WHERE initiated_id=$3 - " - ).bind_timestamp(timestamp) - .bind(magnet_code as i64) - .bind(id as i64) - .execute(db).await?; - Ok(()) -} - -/** Update status of a sucessfull submitted initiated transaction */ -pub async fn initiated_submit_failure<'a>( - db: impl PgExecutor<'a>, - id: u64, - timestamp: &Timestamp, - msg: &str, -) -> sqlx::Result<()> { - sqlx::query( - " - UPDATE initiated - SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, status_msg=$2 - WHERE initiated_id=$3 - ", - ) - .bind_timestamp(timestamp) - .bind(msg) - .bind(id as i64) - .execute(db) - .await?; - Ok(()) -} - -#[cfg(test)] -mod test { - - use sqlx::{postgres::PgRow, PgConnection, PgPool}; - use taler_api::{ - db::TypeHelper, - subject::{IncomingSubject, OutgoingSubject}, - }; - use taler_common::{ - api_common::{EddsaPublicKey, HashCode, ShortHashCode}, - api_params::{History, Page}, - api_wire::TransferRequest, - types::{amount::amount, payto::payto, timestamp::Timestamp, url}, - }; - use tokio::sync::watch::Receiver; - - use crate::{ - constant::CURRENCY, - db::{ - self, make_transfer, register_tx_in, register_tx_in_admin, register_tx_out, - AddIncomingResult, RegisteredTx, TransferResult, TxIn, TxOut, - }, - MagnetPayto, - }; - - use super::TxInAdmin; - - fn fake_listen<T: Default>() -> Receiver<T> { - tokio::sync::watch::channel(T::default()).1 - } - - async fn setup() -> (PgConnection, PgPool) { - let pool = test_utils::db_test_setup().await; - db::db_init(&pool, false).await.expect("dbinit"); - let conn = pool.acquire().await.expect("aquire conn").leak(); - (conn, pool) - } - - #[tokio::test] - async fn tx_in() { - let (mut db, pool) = setup().await; - - async fn routine( - db: &mut PgConnection, - first: &Option<IncomingSubject>, - second: &Option<IncomingSubject>, - ) { - let (id, code) = - sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_in") - .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?))) - .fetch_one(&mut *db) - .await - .unwrap(); - let tx = TxIn { - code: code, - amount: amount("EUR:10"), - subject: "subject".to_owned(), - debtor: MagnetPayto { - number: "number".to_owned(), - name: "name".to_owned(), - }, - timestamp: Timestamp::now_stable(), - }; - // Insert - assert_eq!( - register_tx_in(db, &tx, &first) - .await - .expect("register tx in"), - AddIncomingResult::Success(RegisteredTx { - new: true, - row_id: id, - timestamp: tx.timestamp - }) - ); - // Idempotent - assert_eq!( - register_tx_in( - db, - &TxIn { - timestamp: Timestamp::now(), - ..tx.clone() - }, - &first - ) - .await - .expect("register tx in"), - AddIncomingResult::Success(RegisteredTx { - new: false, - row_id: id, - timestamp: tx.timestamp - }) - ); - // Many - assert_eq!( - register_tx_in( - db, - &TxIn { - code: code + 1, - ..tx - }, - &second - ) - .await - .expect("register tx in"), - AddIncomingResult::Success(RegisteredTx { - new: true, - row_id: id + 1, - timestamp: tx.timestamp - }) - ); - } - - // Empty db - assert_eq!( - db::incoming_history(&pool, &History::default(), fake_listen) - .await - .unwrap(), - Vec::new() - ); - - // Regular transaction - routine(&mut db, &None, &None).await; - - // Reserve transaction - routine( - &mut db, - &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), - &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), - ) - .await; - - // Kyc transaction - routine( - &mut db, - &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), - &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), - ) - .await; - - // History - assert_eq!( - db::incoming_history(&pool, &History::default(), fake_listen) - .await - .unwrap() - .len(), - 4 - ); - } - - #[tokio::test] - async fn tx_in_admin() { - let (_, pool) = setup().await; - - // Empty db - assert_eq!( - db::incoming_history(&pool, &History::default(), fake_listen) - .await - .unwrap(), - Vec::new() - ); - - let tx = TxInAdmin { - amount: amount("EUR:10"), - subject: "subject".to_owned(), - debtor: MagnetPayto { - number: "number".to_owned(), - name: "name".to_owned(), - }, - timestamp: Timestamp::now_stable(), - metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), - }; - // Insert - assert_eq!( - register_tx_in_admin(&pool, &tx) - .await - .expect("register tx in"), - AddIncomingResult::Success(RegisteredTx { - new: true, - row_id: 1, - timestamp: tx.timestamp - }) - ); - // Idempotent - assert_eq!( - register_tx_in_admin( - &pool, - &TxInAdmin { - timestamp: Timestamp::now(), - ..tx.clone() - } - ) - .await - .expect("register tx in"), - AddIncomingResult::Success(RegisteredTx { - new: false, - row_id: 1, - timestamp: tx.timestamp - }) - ); - // Many - assert_eq!( - register_tx_in_admin( - &pool, - &TxInAdmin { - subject: "Other".to_owned(), - metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), - ..tx.clone() - } - ) - .await - .expect("register tx in"), - AddIncomingResult::Success(RegisteredTx { - new: true, - row_id: 2, - timestamp: tx.timestamp - }) - ); - - // History - assert_eq!( - db::incoming_history(&pool, &History::default(), fake_listen) - .await - .unwrap() - .len(), - 2 - ); - } - - #[tokio::test] - async fn tx_out() { - let (mut db, pool) = setup().await; - - async fn routine( - db: &mut PgConnection, - first: &Option<OutgoingSubject>, - second: &Option<OutgoingSubject>, - ) { - let (id, code) = - sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_out") - .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?))) - .fetch_one(&mut *db) - .await - .unwrap(); - let tx = TxOut { - code, - amount: amount("EUR:10"), - subject: "subject".to_owned(), - creditor: MagnetPayto { - number: "number".to_owned(), - name: "name".to_owned(), - }, - timestamp: Timestamp::now_stable(), - }; - // Insert - assert_eq!( - register_tx_out(db, &tx, &first) - .await - .expect("register tx out"), - RegisteredTx { - new: true, - row_id: id, - timestamp: tx.timestamp - } - ); - // Idempotent - assert_eq!( - register_tx_out( - db, - &TxOut { - timestamp: Timestamp::now(), - ..tx.clone() - }, - &first - ) - .await - .expect("register tx out"), - RegisteredTx { - new: false, - row_id: id, - timestamp: tx.timestamp - } - ); - // Many - assert_eq!( - register_tx_out( - db, - &TxOut { - code: code + 1, - ..tx.clone() - }, - &second - ) - .await - .expect("register tx out"), - RegisteredTx { - new: true, - row_id: id + 1, - timestamp: tx.timestamp - } - ); - } - - // Empty db - assert_eq!( - db::outgoing_history(&pool, &History::default(), fake_listen) - .await - .unwrap(), - Vec::new() - ); - - // Regular transaction - routine(&mut db, &None, &None).await; - - // Talerable transaction - routine( - &mut db, - &Some(OutgoingSubject( - ShortHashCode::rand(), - url("https://exchange.com"), - )), - &Some(OutgoingSubject( - ShortHashCode::rand(), - url("https://exchange.com"), - )), - ) - .await; - - // History - assert_eq!( - db::outgoing_history(&pool, &History::default(), fake_listen) - .await - .unwrap() - .len(), - 2 - ); - } - - #[tokio::test] - async fn transfer() { - let (mut db, _) = setup().await; - - // Empty db - assert_eq!(db::transfer_by_id(&mut db, 0).await.unwrap(), None); - assert_eq!( - db::transfer_page(&mut db, &None, &Page::default()) - .await - .unwrap(), - Vec::new() - ); - - let req = TransferRequest { - request_uid: HashCode::rand(), - amount: amount("EUR:10"), - exchange_base_url: url("https://exchange.test.com/"), - wtid: ShortHashCode::rand(), - credit_account: payto("payto://magnet-bank/todo"), - }; - let payto = MagnetPayto { - number: "number".to_owned(), - name: "name".to_owned(), - }; - let timestamp = Timestamp::now_stable(); - // Insert - assert_eq!( - make_transfer(&mut db, &req, &payto, ×tamp) - .await - .expect("transfer"), - TransferResult::Success { - id: 1, - timestamp: timestamp - } - ); - // Idempotent - assert_eq!( - make_transfer(&mut db, &req, &payto, &Timestamp::now()) - .await - .expect("transfer"), - TransferResult::Success { - id: 1, - timestamp: timestamp - } - ); - // Request UID reuse - assert_eq!( - make_transfer( - &mut db, - &TransferRequest { - wtid: ShortHashCode::rand(), - ..req.clone() - }, - &payto, - &Timestamp::now() - ) - .await - .expect("transfer"), - TransferResult::RequestUidReuse - ); - // wtid reuse - assert_eq!( - make_transfer( - &mut db, - &TransferRequest { - request_uid: HashCode::rand(), - ..req.clone() - }, - &payto, - &Timestamp::now() - ) - .await - .expect("transfer"), - TransferResult::WtidReuse - ); - // Many - assert_eq!( - make_transfer( - &mut db, - &TransferRequest { - request_uid: HashCode::rand(), - wtid: ShortHashCode::rand(), - ..req - }, - &payto, - ×tamp - ) - .await - .expect("transfer"), - TransferResult::Success { - id: 2, - timestamp: timestamp - } - ); - - // Get - assert!(db::transfer_by_id(&mut db, 1).await.unwrap().is_some()); - assert!(db::transfer_by_id(&mut db, 2).await.unwrap().is_some()); - assert!(db::transfer_by_id(&mut db, 3).await.unwrap().is_none()); - assert_eq!( - db::transfer_page(&mut db, &None, &Page::default()) - .await - .unwrap() - .len(), - 2 - ); - } - - #[tokio::test] - async fn status() { - let (mut db, _) = setup().await; - - // Unknown transfer - db::initiated_submit_failure(&mut db, 1, &Timestamp::now(), "msg") - .await - .unwrap(); - db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12) - .await - .unwrap(); - } - - #[tokio::test] - async fn batch() { - let (mut db, _) = setup().await; - let start = Timestamp::now(); - let magnet_payto = MagnetPayto { - number: "number".to_owned(), - name: "name".to_owned(), - }; - - // Empty db - let pendings = db::pending_batch(&mut db, &start) - .await - .expect("pending_batch"); - assert_eq!(pendings.len(), 0); - - // Some transfers - for i in 0..3 { - make_transfer( - &mut db, - &TransferRequest { - request_uid: HashCode::rand(), - amount: amount(format!("{CURRENCY}:{}", i + 1)), - exchange_base_url: url("https://exchange.test.com/"), - wtid: ShortHashCode::rand(), - credit_account: payto("payto://magnet-bank/todo"), - }, - &magnet_payto, - &&Timestamp::now(), - ) - .await - .expect("transfer"); - } - let pendings = db::pending_batch(&mut db, &start) - .await - .expect("pending_batch"); - assert_eq!(pendings.len(), 3); - - // Max 100 txs in batch - for i in 0..100 { - make_transfer( - &mut db, - &TransferRequest { - request_uid: HashCode::rand(), - amount: amount(format!("{CURRENCY}:{}", i + 1)), - exchange_base_url: url("https://exchange.test.com/"), - wtid: ShortHashCode::rand(), - credit_account: payto("payto://magnet-bank/todo"), - }, - &magnet_payto, - &Timestamp::now(), - ) - .await - .expect("transfer"); - } - let pendings = db::pending_batch(&mut db, &start) - .await - .expect("pending_batch"); - assert_eq!(pendings.len(), 100); - - // Skip uploaded - for i in 0..=10 { - db::initiated_submit_success(&mut db, i, &Timestamp::now(), i) - .await - .expect("status success"); - } - let pendings = db::pending_batch(&mut db, &start) - .await - .expect("pending_batch"); - assert_eq!(pendings.len(), 93); - - // Skip tried since start - for i in 0..=10 { - db::initiated_submit_failure(&mut db, 10 + i, &Timestamp::now(), "failure") - .await - .expect("status failure"); - } - let pendings = db::pending_batch(&mut db, &start) - .await - .expect("pending_batch"); - assert_eq!(pendings.len(), 83); - let pendings = db::pending_batch(&mut db, &Timestamp::now()) - .await - .expect("pending_batch"); - assert_eq!(pendings.len(), 93); - } -} diff --git a/wire-gateway/magnet-bank/tests/api.rs b/wire-gateway/magnet-bank/tests/api.rs @@ -1,108 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2025 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ - -use std::sync::Arc; - -use magnet_bank::{db, wire_gateway::MagnetWireGateway, MagnetPayto}; -use sqlx::PgPool; -use taler_api::{auth::AuthMethod, standard_layer, subject::OutgoingSubject}; -use taler_common::{ - api_common::ShortHashCode, - api_wire::{OutgoingHistory, TransferState}, - types::{amount::amount, payto::payto, timestamp::Timestamp, url}, -}; -use test_utils::{ - axum_test::TestServer, - db_test_setup, - helpers::TestResponseHelper, - routine::{admin_add_incoming_routine, routine_pagination, transfer_routine}, -}; - -async fn setup() -> (TestServer, PgPool) { - let pool = db_test_setup().await; - db::db_init(&pool, false).await.unwrap(); - let gateway = MagnetWireGateway::start(pool.clone(), payto("payto://magnet-bank/todo")).await; - let server = TestServer::new(standard_layer( - taler_api::wire_gateway_api(Arc::new(gateway)), - AuthMethod::None, - )) - .unwrap(); - - (server, pool) -} - -#[tokio::test] -async fn transfer() { - let (server, _) = setup().await; - transfer_routine( - &server, - TransferState::pending, - &payto("payto://magnet-bank/account?receiver-name=John+Smith"), - ) - .await; -} - -#[tokio::test] -async fn outgoing_history() { - let (server, pool) = setup().await; - server.get("/history/outgoing").await.assert_no_content(); - routine_pagination::<OutgoingHistory, _>( - &server, - "/history/outgoing", - |it| { - it.outgoing_transactions - .into_iter() - .map(|it| *it.row_id as i64) - .collect() - }, - |_, i| { - let acquire = pool.acquire(); - async move { - let mut conn = acquire.await.unwrap(); - db::register_tx_out( - &mut *conn, - &db::TxOut { - code: i as u64, - amount: amount("EUR:10"), - subject: "subject".to_owned(), - creditor: MagnetPayto { - number: "number".to_owned(), - name: "name".to_owned(), - }, - timestamp: Timestamp::now_stable(), - }, - &Some(OutgoingSubject( - ShortHashCode::rand(), - url("https://exchange.test"), - )), - ) - .await - .unwrap(); - } - }, - ) - .await; -} - -#[tokio::test] -async fn admin_add_incoming() { - let (server, _) = setup().await; - admin_add_incoming_routine( - &server, - &payto("payto://magnet-bank/account?receiver-name=John+Smith"), - ) - .await; -}