taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

commit da524311dcb265950ce4c36e35027fe8a223c3ad
parent 0da9e9bda792358586a8367ac2a5166dc635e3eb
Author: Antoine A <>
Date:   Wed, 29 Jan 2025 13:16:50 +0100

common: Revenue API and other improvements

Diffstat:
MCargo.lock | 22+++++++++++-----------
Aadapter/taler-magnet-bank-adapter/src/adapter.rs | 189+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Madapter/taler-magnet-bank-adapter/src/lib.rs | 2+-
Madapter/taler-magnet-bank-adapter/src/magnet/error.rs | 2+-
Madapter/taler-magnet-bank-adapter/src/main.rs | 16+++++++---------
Dadapter/taler-magnet-bank-adapter/src/wire_gateway.rs | 191-------------------------------------------------------------------------------
Madapter/taler-magnet-bank-adapter/tests/api.rs | 22++++++++++++----------
Acommon/taler-api/src/api.rs | 209+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acommon/taler-api/src/api/revenue.rs | 74++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acommon/taler-api/src/api/wire.rs | 174+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mcommon/taler-api/src/constants.rs | 5+++--
Mcommon/taler-api/src/lib.rs | 313+------------------------------------------------------------------------------
Mcommon/taler-api/tests/api.rs | 33++++++++++++++++++++-------------
Mcommon/taler-api/tests/common/db.rs | 44++++++++++++++++++++++++++++++++++++++++++--
Mcommon/taler-api/tests/common/mod.rs | 48+++++++++++++++++++++++++++++-------------------
Acommon/taler-common/src/api_revenue.rs | 49+++++++++++++++++++++++++++++++++++++++++++++++++
Mcommon/taler-common/src/lib.rs | 1+
Mcommon/taler-test-utils/src/helpers.rs | 12++++++------
Mcommon/taler-test-utils/src/routine.rs | 64++++++++++++++++++++++++++++++++++++++--------------------------
19 files changed, 869 insertions(+), 601 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -274,9 +274,9 @@ dependencies = [ [[package]] name = "bumpalo" -version = "3.16.0" +version = "3.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" +checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf" [[package]] name = "byteorder" @@ -1098,9 +1098,9 @@ dependencies = [ [[package]] name = "httparse" -version = "1.9.5" +version = "1.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7d71d3574edd2771538b901e6549113b4006ece66150fb69c0fb6d9a2adae946" +checksum = "f2d708df4e7140240a16cd6ab0ab65c972d7433ab77819ea693fde9c43811e2a" [[package]] name = "httpdate" @@ -1110,9 +1110,9 @@ checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" [[package]] name = "hyper" -version = "1.5.2" +version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "256fb8d4bd6413123cc9d91832d78325c48ff41677595be797d90f42969beae0" +checksum = "cc2b571658e38e0c01b1fdca3bbbe93c00d3d71693ff2770043f8c29bc7d6f80" dependencies = [ "bytes", "futures-channel", @@ -2236,9 +2236,9 @@ dependencies = [ [[package]] name = "serde_json" -version = "1.0.137" +version = "1.0.138" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "930cfb6e6abf99298aaad7d29abbef7a9999a9a8806a40088f55f0dcec03146b" +checksum = "d434192e7da787e94a6ea7e9670b26a036d0ca41e0b7efb2676dd32bae872949" dependencies = [ "itoa", "memchr", @@ -2642,13 +2642,13 @@ dependencies = [ [[package]] name = "tempfile" -version = "3.15.0" +version = "3.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a8a559c81686f576e8cd0290cd2a24a2a9ad80c98b3478856500fcbd7acd704" +checksum = "38c246215d7d24f48ae091a2902398798e05d978b24315d6efbc00ede9a8bb91" dependencies = [ "cfg-if", "fastrand", - "getrandom 0.2.15", + "getrandom 0.3.1", "once_cell", "rustix", "windows-sys 0.59.0", diff --git a/adapter/taler-magnet-bank-adapter/src/adapter.rs b/adapter/taler-magnet-bank-adapter/src/adapter.rs @@ -0,0 +1,189 @@ +/* + This file is part of TALER + Copyright (C) 2025 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use taler_api::{ + api::{wire::WireGateway, TalerComponent}, + error::{failure, ApiResult}, + subject::IncomingSubject, +}; +use taler_common::{ + api_common::{safe_u64, SafeU64}, + api_params::{History, Page}, + api_wire::{ + AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddKycauthResponse, + IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse, + TransferState, TransferStatus, + }, + error_code::ErrorCode, + types::{payto::Payto, timestamp::Timestamp}, +}; +use tokio::sync::watch::Sender; + +use crate::{ + constant::CURRENCY, + db::{self, AddIncomingResult, TxInAdmin}, + MagnetPayto, +}; + +pub struct MagnetAdapter { + pub pool: sqlx::PgPool, + pub payto: Payto, + pub in_channel: Sender<i64>, + pub taler_in_channel: Sender<i64>, + pub out_channel: Sender<i64>, + pub taler_out_channel: Sender<i64>, +} + +impl MagnetAdapter { + pub async fn start(pool: sqlx::PgPool, payto: Payto) -> Self { + let in_channel = Sender::new(0); + let taler_in_channel = Sender::new(0); + let out_channel = Sender::new(0); + let taler_out_channel = Sender::new(0); + let tmp = Self { + pool: pool.clone(), + payto, + in_channel: in_channel.clone(), + taler_in_channel: taler_in_channel.clone(), + out_channel: out_channel.clone(), + taler_out_channel: taler_out_channel.clone(), + }; + tokio::spawn(db::notification_listener( + pool, + in_channel, + taler_in_channel, + out_channel, + taler_out_channel, + )); + tmp + } +} + +impl TalerComponent for MagnetAdapter { + fn currency(&self) -> &str { + CURRENCY + } + + fn implementation(&self) -> Option<&str> { + Some("taler-magnet-bank-adapter") + } +} + +impl WireGateway for MagnetAdapter { + async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> { + let creditor = MagnetPayto::try_from(&req.credit_account)?; + let result = db::make_transfer(&self.pool, &req, &creditor, &Timestamp::now()).await?; + match result { + db::TransferResult::Success { id, timestamp } => Ok(TransferResponse { + timestamp, + row_id: SafeU64::try_from(id).unwrap(), + }), + db::TransferResult::RequestUidReuse => Err(failure( + ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED, + "request_uid used already", + )), + db::TransferResult::WtidReuse => unimplemented!(), + } + } + + async fn transfer_page( + &self, + page: Page, + status: Option<TransferState>, + ) -> ApiResult<TransferList> { + Ok(TransferList { + transfers: db::transfer_page(&self.pool, &status, &page).await?, + debit_account: self.payto.clone(), + }) + } + + async fn transfer_by_id(&self, id: u64) -> ApiResult<Option<TransferStatus>> { + Ok(db::transfer_by_id(&self.pool, id).await?) + } + + async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> { + Ok(OutgoingHistory { + outgoing_transactions: db::outgoing_history(&self.pool, &params, || { + self.taler_out_channel.subscribe() + }) + .await?, + debit_account: self.payto.clone(), + }) + } + + async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> { + Ok(IncomingHistory { + incoming_transactions: db::incoming_history(&self.pool, &params, || { + self.taler_in_channel.subscribe() + }) + .await?, + credit_account: self.payto.clone(), + }) + } + + async fn add_incoming_reserve( + &self, + req: AddIncomingRequest, + ) -> ApiResult<AddIncomingResponse> { + let debtor = MagnetPayto::try_from(&req.debit_account)?; + let res = db::register_tx_in_admin( + &self.pool, + &TxInAdmin { + amount: req.amount, + subject: format!("Admin incoming {}", req.reserve_pub), + debtor, + timestamp: Timestamp::now(), + metadata: IncomingSubject::Reserve(req.reserve_pub), + }, + ) + .await?; + match res { + AddIncomingResult::Success(res) => Ok(AddIncomingResponse { + row_id: safe_u64(res.row_id), + timestamp: res.timestamp, + }), + AddIncomingResult::ReservePubReuse => Err(failure( + ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT, + "reserve_pub used already".to_owned(), + )), + } + } + + async fn add_incoming_kyc(&self, req: AddKycauthRequest) -> ApiResult<AddKycauthResponse> { + let debtor = MagnetPayto::try_from(&req.debit_account)?; + let res = db::register_tx_in_admin( + &self.pool, + &TxInAdmin { + amount: req.amount, + subject: format!("Admin incoming KYC:{}", req.account_pub), + debtor, + timestamp: Timestamp::now(), + metadata: IncomingSubject::Kyc(req.account_pub), + }, + ) + .await?; + match res { + AddIncomingResult::Success(res) => Ok(AddKycauthResponse { + row_id: safe_u64(res.row_id), + timestamp: res.timestamp, + }), + AddIncomingResult::ReservePubReuse => Err(failure( + ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT, + "reserve_pub used already".to_owned(), + )), + } + } +} diff --git a/adapter/taler-magnet-bank-adapter/src/lib.rs b/adapter/taler-magnet-bank-adapter/src/lib.rs @@ -22,7 +22,7 @@ pub mod db; pub mod dev; pub mod keys; pub mod magnet; -pub mod wire_gateway; +pub mod adapter; pub mod worker; pub mod failure_injection { pub fn fail_point(_name: &'static str) { diff --git a/adapter/taler-magnet-bank-adapter/src/magnet/error.rs b/adapter/taler-magnet-bank-adapter/src/magnet/error.rs @@ -120,7 +120,7 @@ async fn error_handling(res: reqwest::Result<Response>) -> ApiResult<String> { } /** Parse JSON and track error path */ -fn parse<T: DeserializeOwned>(str: &str) -> ApiResult<T> { +fn parse<'de, T: Deserialize<'de>>(str: &'de str) -> ApiResult<T> { let deserializer = &mut serde_json::Deserializer::from_str(str); serde_path_to_error::deserialize(deserializer).map_err(ApiError::Json) } diff --git a/adapter/taler-magnet-bank-adapter/src/main.rs b/adapter/taler-magnet-bank-adapter/src/main.rs @@ -18,16 +18,17 @@ use std::sync::Arc; use clap::Parser; use magnet_bank::{ + adapter::MagnetAdapter, config::{DbConfig, MagnetConfig, WireGatewayConfig}, db, dev::{self, DevCmd}, keys, magnet::AuthClient, - wire_gateway::MagnetWireGateway, worker::Worker, MagnetPayto, }; use sqlx::PgPool; +use taler_api::api::TalerApi; use taler_common::{ cli::ConfigCmd, config::{parser::ConfigSource, Config}, @@ -87,14 +88,11 @@ async fn app(args: Args, cfg: Config) -> anyhow::Result<()> { let db = DbConfig::parse(&cfg)?; let pool = PgPool::connect_with(db.cfg).await?; let cfg = WireGatewayConfig::parse(&cfg)?; - let gateway = MagnetWireGateway::start(pool, payto("payto://magnet-bank/todo")).await; - taler_api::server( - taler_api::wire_gateway_api(Arc::new(gateway)), - cfg.serve, - cfg.auth, - None, - ) - .await?; + let gateway = MagnetAdapter::start(pool, payto("payto://magnet-bank/todo")).await; + TalerApi::new() + .wire_gateway(Arc::new(gateway), cfg.auth) + .serve(cfg.serve, None) + .await?; } Command::Worker { account } => { let db = DbConfig::parse(&cfg)?; diff --git a/adapter/taler-magnet-bank-adapter/src/wire_gateway.rs b/adapter/taler-magnet-bank-adapter/src/wire_gateway.rs @@ -1,191 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2025 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ - -use taler_api::{ - error::{failure, ApiResult}, - subject::IncomingSubject, - WireGatewayImpl, -}; -use taler_common::{ - api_common::{safe_u64, SafeU64}, - api_params::{History, Page}, - api_wire::{ - AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddKycauthResponse, - IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse, - TransferState, TransferStatus, - }, - error_code::ErrorCode, - types::{payto::Payto, timestamp::Timestamp}, -}; -use tokio::sync::watch::Sender; - -use crate::{ - constant::CURRENCY, - db::{self, AddIncomingResult, TxInAdmin}, - MagnetPayto, -}; - -pub struct MagnetWireGateway { - pub pool: sqlx::PgPool, - pub payto: Payto, - pub in_channel: Sender<i64>, - pub taler_in_channel: Sender<i64>, - pub out_channel: Sender<i64>, - pub taler_out_channel: Sender<i64>, -} - -impl MagnetWireGateway { - pub async fn start(pool: sqlx::PgPool, payto: Payto) -> Self { - let in_channel = Sender::new(0); - let taler_in_channel = Sender::new(0); - let out_channel = Sender::new(0); - let taler_out_channel = Sender::new(0); - let tmp = Self { - pool: pool.clone(), - payto, - in_channel: in_channel.clone(), - taler_in_channel: taler_in_channel.clone(), - out_channel: out_channel.clone(), - taler_out_channel: taler_out_channel.clone(), - }; - tokio::spawn(db::notification_listener( - pool, - in_channel, - taler_in_channel, - out_channel, - taler_out_channel, - )); - tmp - } -} - -impl WireGatewayImpl for MagnetWireGateway { - fn name(&self) -> &str { - "magnet-bank" - } - - fn currency(&self) -> &str { - CURRENCY - } - - fn implementation(&self) -> Option<&str> { - None - } - - async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> { - let creditor = MagnetPayto::try_from(&req.credit_account)?; - let result = db::make_transfer(&self.pool, &req, &creditor, &Timestamp::now()).await?; - match result { - db::TransferResult::Success { id, timestamp } => Ok(TransferResponse { - timestamp, - row_id: SafeU64::try_from(id).unwrap(), - }), - db::TransferResult::RequestUidReuse => Err(failure( - ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED, - "request_uid used already", - )), - db::TransferResult::WtidReuse => unimplemented!(), - } - } - - async fn transfer_page( - &self, - page: Page, - status: Option<TransferState>, - ) -> ApiResult<TransferList> { - Ok(TransferList { - transfers: db::transfer_page(&self.pool, &status, &page).await?, - debit_account: self.payto.clone(), - }) - } - - async fn transfer_by_id(&self, id: u64) -> ApiResult<Option<TransferStatus>> { - Ok(db::transfer_by_id(&self.pool, id).await?) - } - - async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> { - Ok(OutgoingHistory { - outgoing_transactions: db::outgoing_history(&self.pool, &params, || { - self.taler_out_channel.subscribe() - }) - .await?, - debit_account: self.payto.clone(), - }) - } - - async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> { - Ok(IncomingHistory { - incoming_transactions: db::incoming_history(&self.pool, &params, || { - self.taler_in_channel.subscribe() - }) - .await?, - credit_account: self.payto.clone(), - }) - } - - async fn add_incoming_reserve( - &self, - req: AddIncomingRequest, - ) -> ApiResult<AddIncomingResponse> { - let debtor = MagnetPayto::try_from(&req.debit_account)?; - let res = db::register_tx_in_admin( - &self.pool, - &TxInAdmin { - amount: req.amount, - subject: format!("Admin incoming {}", req.reserve_pub), - debtor, - timestamp: Timestamp::now(), - metadata: IncomingSubject::Reserve(req.reserve_pub), - }, - ) - .await?; - match res { - AddIncomingResult::Success(res) => Ok(AddIncomingResponse { - row_id: safe_u64(res.row_id), - timestamp: res.timestamp, - }), - AddIncomingResult::ReservePubReuse => Err(failure( - ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT, - "reserve_pub used already".to_owned(), - )), - } - } - - async fn add_incoming_kyc(&self, req: AddKycauthRequest) -> ApiResult<AddKycauthResponse> { - let debtor = MagnetPayto::try_from(&req.debit_account)?; - let res = db::register_tx_in_admin( - &self.pool, - &TxInAdmin { - amount: req.amount, - subject: format!("Admin incoming KYC:{}", req.account_pub), - debtor, - timestamp: Timestamp::now(), - metadata: IncomingSubject::Kyc(req.account_pub), - }, - ) - .await?; - match res { - AddIncomingResult::Success(res) => Ok(AddKycauthResponse { - row_id: safe_u64(res.row_id), - timestamp: res.timestamp, - }), - AddIncomingResult::ReservePubReuse => Err(failure( - ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT, - "reserve_pub used already".to_owned(), - )), - } - } -} diff --git a/adapter/taler-magnet-bank-adapter/tests/api.rs b/adapter/taler-magnet-bank-adapter/tests/api.rs @@ -16,9 +16,9 @@ use std::sync::Arc; -use magnet_bank::{db, wire_gateway::MagnetWireGateway, MagnetPayto}; +use magnet_bank::{adapter::MagnetAdapter, db, MagnetPayto}; use sqlx::PgPool; -use taler_api::{auth::AuthMethod, standard_layer, subject::OutgoingSubject}; +use taler_api::{api::TalerApi, auth::AuthMethod, subject::OutgoingSubject}; use taler_common::{ api_common::ShortHashCode, api_wire::{OutgoingHistory, TransferState}, @@ -34,12 +34,11 @@ use taler_test_utils::{ async fn setup() -> (TestServer, PgPool) { let pool = db_test_setup().await; db::db_init(&pool, false).await.unwrap(); - let gateway = MagnetWireGateway::start(pool.clone(), payto("payto://magnet-bank/todo")).await; - let server = TestServer::new(standard_layer( - taler_api::wire_gateway_api(Arc::new(gateway)), - AuthMethod::None, - )) - .unwrap(); + let gateway = MagnetAdapter::start(pool.clone(), payto("payto://magnet-bank/todo")).await; + let api = TalerApi::new() + .wire_gateway(Arc::new(gateway), AuthMethod::None) + .finalize(); + let server = TestServer::new(api).unwrap(); (server, pool) } @@ -58,10 +57,13 @@ async fn transfer() { #[tokio::test] async fn outgoing_history() { let (server, pool) = setup().await; - server.get("/history/outgoing").await.assert_no_content(); + server + .get("/taler-wire-gateway/history/outgoing") + .await + .assert_no_content(); routine_pagination::<OutgoingHistory, _>( &server, - "/history/outgoing", + "/taler-wire-gateway/history/outgoing", |it| { it.outgoing_transactions .into_iter() diff --git a/common/taler-api/src/api.rs b/common/taler-api/src/api.rs @@ -0,0 +1,209 @@ +/* + This file is part of TALER + Copyright (C) 2025 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use std::{ + sync::{ + atomic::{AtomicU32, Ordering}, + Arc, + }, + time::Instant, +}; + +use axum::{ + extract::{Request, State}, + middleware::{self, Next}, + response::Response, + Router, +}; +use revenue::Revenue; +use taler_common::{error_code::ErrorCode, types::amount::Amount}; +use tokio::signal; +use tracing::info; +use wire::WireGateway; + +use crate::{ + auth::AuthMethod, + error::{failure, failure_code, ApiResult}, + Listener, Serve, +}; + +pub mod revenue; +pub mod wire; + +pub trait TalerComponent: Send + Sync + 'static { + fn currency(&self) -> &str; + fn implementation(&self) -> Option<&str>; + fn check_currency(&self, amount: &Amount) -> ApiResult<()> { + let currency = self.currency(); + if amount.currency.as_ref() != currency { + Err(failure( + ErrorCode::GENERIC_CURRENCY_MISMATCH, + format!( + "Wrong currency: expected {} got {}", + currency, amount.currency + ), + )) + } else { + Ok(()) + } + } +} + +pub struct TalerApi { + router: Router, +} + +impl TalerApi { + pub fn new() -> Self { + Self { + router: Router::new(), + } + } + + pub fn wire_gateway<T: WireGateway>(mut self, api: Arc<T>, auth: AuthMethod) -> Self { + self.router = self.router.nest( + "/taler-wire-gateway", + wire::router(api).layer(middleware::from_fn_with_state( + Arc::new(auth), + crate::auth::auth_middleware, + )), + ); + self + } + + pub fn revenue<T: Revenue>(mut self, api: Arc<T>, auth: AuthMethod) -> Self { + self.router = self.router.nest( + "/taler-revenue", + revenue::router(api).layer(middleware::from_fn_with_state( + Arc::new(auth), + crate::auth::auth_middleware, + )), + ); + self + } + + pub fn finalize(self) -> Router { + self.router + .method_not_allowed_fallback(|| async { + failure_code(ErrorCode::GENERIC_METHOD_INVALID) + }) + .fallback(|| async { failure_code(ErrorCode::GENERIC_ENDPOINT_UNKNOWN) }) + .layer(middleware::from_fn(logger_middleware)) + } + + pub async fn serve(mut self, serve: Serve, lifetime: Option<u32>) -> std::io::Result<()> { + let listener = serve.resolve()?; + + let notify = Arc::new(tokio::sync::Notify::new()); + if let Some(lifetime) = lifetime { + self.router = self.router.layer(middleware::from_fn_with_state( + Arc::new(LifetimeMiddlewareState { + notify: notify.clone(), + lifetime: AtomicU32::new(lifetime), + }), + lifetime_middleware, + )) + } + let router = self.finalize(); + let signal = shutdown_signal(notify); + match listener { + Listener::Tcp(tcp_listener) => { + axum::serve(tcp_listener, router) + .with_graceful_shutdown(signal) + .await?; + } + Listener::Unix(unix_listener) => { + axum::serve(unix_listener, router) + .with_graceful_shutdown(signal) + .await?; + } + } + + info!(target: "api", "Server stopped"); + Ok(()) + } +} + +struct LifetimeMiddlewareState { + lifetime: AtomicU32, + notify: Arc<tokio::sync::Notify>, +} + +async fn lifetime_middleware( + State(state): State<Arc<LifetimeMiddlewareState>>, + request: Request, + next: Next, +) -> Response { + let mut current = state.lifetime.load(Ordering::Relaxed); + while current != 0 { + match state.lifetime.compare_exchange_weak( + current, + current - 1, + Ordering::Relaxed, + Ordering::Relaxed, + ) { + Ok(_) => break, + Err(new) => current = new, + } + } + if current == 0 { + state.notify.notify_one(); + } + next.run(request).await +} + +/** Wait for manual shutdown or system signal shutdown */ +async fn shutdown_signal(manual_shutdown: Arc<tokio::sync::Notify>) { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("failed to install signal handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + let manual = async { manual_shutdown.notified().await }; + + tokio::select! { + _ = ctrl_c => {}, + _ = terminate => {}, + _ = manual => {} + } +} + +/** Taler API logger */ +async fn logger_middleware(request: Request, next: Next) -> Response { + let request_info = format!("{} {}", request.method(), request.uri().path()); + let now = Instant::now(); + let response = next.run(request).await; + let elapsed = now.elapsed(); + // TODO log error message + info!(target: "api", + "{} {request_info} {}ms", + response.status(), + elapsed.as_millis() + ); + response +} diff --git a/common/taler-api/src/api/revenue.rs b/common/taler-api/src/api/revenue.rs @@ -0,0 +1,74 @@ +/* + This file is part of TALER + Copyright (C) 2025 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use std::sync::Arc; + +use axum::{ + extract::{Query, State}, + http::StatusCode, + response::IntoResponse, + routing::get, + Json, Router, +}; +use taler_common::{ + api_params::{History, HistoryParams}, + api_revenue::{RevenueConfig, RevenueIncomingHistory}, +}; + +use crate::{ + constants::{MAX_PAGE_SIZE, MAX_TIMEOUT_MS, REVENUE_API_VERSION}, + error::ApiResult, +}; + +use super::TalerComponent; + +pub trait Revenue: TalerComponent { + fn history( + &self, + history: History, + ) -> impl std::future::Future<Output = ApiResult<RevenueIncomingHistory>> + Send; +} + +pub fn router<I: Revenue>(state: Arc<I>) -> Router { + Router::new() + .route( + "/config", + get(|State(state): State<Arc<I>>| async move { + Json(RevenueConfig { + name: "taler-revenue", + version: REVENUE_API_VERSION, + currency: state.currency(), + implementation: state.implementation(), + }) + .into_response() + }), + ) + .route( + "/history", + get( + |State(state): State<Arc<I>>, Query(params): Query<HistoryParams>| async move { + let params = params.check(MAX_PAGE_SIZE, MAX_TIMEOUT_MS)?; + let history = state.history(params).await?; + ApiResult::Ok(if history.incoming_transactions.is_empty() { + StatusCode::NO_CONTENT.into_response() + } else { + Json(history).into_response() + }) + }, + ), + ) + .with_state(state) +} diff --git a/common/taler-api/src/api/wire.rs b/common/taler-api/src/api/wire.rs @@ -0,0 +1,174 @@ +/* + This file is part of TALER + Copyright (C) 2025 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use std::sync::Arc; + +use axum::{ + extract::{Path, Query, State}, + http::StatusCode, + response::IntoResponse as _, + routing::{get, post}, + Json, Router, +}; +use taler_common::{ + api_params::{History, HistoryParams, Page, TransferParams}, + api_wire::{ + AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddKycauthResponse, + IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse, + TransferState, TransferStatus, WireConfig, + }, + error_code::ErrorCode, +}; + +use crate::{ + constants::{MAX_PAGE_SIZE, MAX_TIMEOUT_MS, WIRE_GATEWAY_API_VERSION}, + error::{failure, ApiResult}, + json::Req, +}; + +use super::TalerComponent; + +pub trait WireGateway: TalerComponent { + fn transfer( + &self, + req: TransferRequest, + ) -> impl std::future::Future<Output = ApiResult<TransferResponse>> + Send; + fn transfer_page( + &self, + page: Page, + status: Option<TransferState>, + ) -> impl std::future::Future<Output = ApiResult<TransferList>> + Send; + fn transfer_by_id( + &self, + id: u64, + ) -> impl std::future::Future<Output = ApiResult<Option<TransferStatus>>> + Send; + fn outgoing_history( + &self, + params: History, + ) -> impl std::future::Future<Output = ApiResult<OutgoingHistory>> + Send; + fn incoming_history( + &self, + params: History, + ) -> impl std::future::Future<Output = ApiResult<IncomingHistory>> + Send; + fn add_incoming_reserve( + &self, + req: AddIncomingRequest, + ) -> impl std::future::Future<Output = ApiResult<AddIncomingResponse>> + Send; + fn add_incoming_kyc( + &self, + req: AddKycauthRequest, + ) -> impl std::future::Future<Output = ApiResult<AddKycauthResponse>> + Send; +} + +pub fn router<I: WireGateway>(state: Arc<I>) -> Router { + Router::new() + .route( + "/config", + get(|State(state): State<Arc<I>>| async move { + Json(WireConfig { + name: "taler-wire-gateway", + version: WIRE_GATEWAY_API_VERSION, + currency: state.currency(), + implementation: state.implementation(), + }) + .into_response() + }), + ) + .route( + "/transfer", + post( + |State(state): State<Arc<I>>, Req(req): Req<TransferRequest>| async move { + state.check_currency(&req.amount)?; + ApiResult::Ok(Json(state.transfer(req).await?)) + }, + ), + ) + .route( + "/transfers", + get( + |State(state): State<Arc<I>>, Query(params): Query<TransferParams>| async move { + let page = params.pagination.check(MAX_PAGE_SIZE)?; + let list = state.transfer_page(page, params.status).await?; + ApiResult::Ok(if list.transfers.is_empty() { + StatusCode::NO_CONTENT.into_response() + } else { + Json(list).into_response() + }) + }, + ), + ) + .route( + "/transfers/{id}", + get( + |State(state): State<Arc<I>>, Path(id): Path<u64>| async move { + match state.transfer_by_id(id).await? { + Some(it) => Ok(Json(it)), + None => Err(failure( + ErrorCode::BANK_TRANSACTION_NOT_FOUND, + format!("Transfer '{id}' not found"), + )), + } + }, + ), + ) + .route( + "/history/incoming", + get( + |State(state): State<Arc<I>>, Query(params): Query<HistoryParams>| async move { + let params = params.check(MAX_PAGE_SIZE, MAX_TIMEOUT_MS)?; + let history = state.incoming_history(params).await?; + ApiResult::Ok(if history.incoming_transactions.is_empty() { + StatusCode::NO_CONTENT.into_response() + } else { + Json(history).into_response() + }) + }, + ), + ) + .route( + "/history/outgoing", + get( + |State(state): State<Arc<I>>, Query(params): Query<HistoryParams>| async move { + let params = params.check(MAX_PAGE_SIZE, MAX_TIMEOUT_MS)?; + let history = state.outgoing_history(params).await?; + ApiResult::Ok(if history.outgoing_transactions.is_empty() { + StatusCode::NO_CONTENT.into_response() + } else { + Json(history).into_response() + }) + }, + ), + ) + .route( + "/admin/add-incoming", + post( + |State(state): State<Arc<I>>, Req(req): Req<AddIncomingRequest>| async move { + state.check_currency(&req.amount)?; + ApiResult::Ok(Json(state.add_incoming_reserve(req).await?)) + }, + ), + ) + .route( + "/admin/add-kycauth", + post( + |State(state): State<Arc<I>>, Req(req): Req<AddKycauthRequest>| async move { + state.check_currency(&req.amount)?; + ApiResult::Ok(Json(state.add_incoming_kyc(req).await?)) + }, + ), + ) + .with_state(state) +} diff --git a/common/taler-api/src/constants.rs b/common/taler-api/src/constants.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2024 Taler Systems SA + Copyright (C) 2024-2025 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -14,7 +14,8 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -pub const WIRE_GATEWAY_API_VERSION: &str = "0:0:0"; +pub const WIRE_GATEWAY_API_VERSION: &str = "0:0:0"; // TODO update +pub const REVENUE_API_VERSION: &str = "0:0:0"; // TODO update pub const MAX_PAGE_SIZE: i64 = 1024; pub const MAX_TIMEOUT_MS: u64 = 60 * 60 * 10; // 1H pub const MAX_BODY_LENGTH: usize = 4 * 1024; // 4kB diff --git a/common/taler-api/src/lib.rs b/common/taler-api/src/lib.rs @@ -14,47 +14,13 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use std::{ - fs::Permissions, - io::ErrorKind, - net::SocketAddr, - os::unix::fs::PermissionsExt, - sync::{ - atomic::{AtomicU32, Ordering}, - Arc, - }, - time::Instant, -}; +use std::{fs::Permissions, io::ErrorKind, net::SocketAddr, os::unix::fs::PermissionsExt as _}; -use auth::{auth_middleware, AuthMethod}; -use axum::{ - extract::{Path, Query, Request, State}, - http::StatusCode, - middleware::{self, Next}, - response::{IntoResponse, Response}, - routing::{get, post}, - Json, Router, -}; -use constants::{MAX_PAGE_SIZE, MAX_TIMEOUT_MS, WIRE_GATEWAY_API_VERSION}; -use error::{failure, failure_code, ApiResult}; -use json::Req; use listenfd::ListenFd; -use taler_common::{ - api_params::{History, HistoryParams, Page, TransferParams}, - api_wire::{ - AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddKycauthResponse, - IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse, - TransferState, TransferStatus, WireConfig, - }, - error_code::ErrorCode, - types::amount::Amount, -}; -use tokio::{ - net::{TcpListener, UnixListener}, - signal, -}; +use tokio::net::{TcpListener, UnixListener}; use tracing::info; +pub mod api; pub mod auth; mod constants; pub mod db; @@ -63,66 +29,6 @@ pub mod json; pub mod notification; pub mod subject; -pub trait WireGatewayImpl: Send + Sync { - fn name(&self) -> &str; - fn currency(&self) -> &str; - fn implementation(&self) -> Option<&str>; - fn transfer( - &self, - req: TransferRequest, - ) -> impl std::future::Future<Output = ApiResult<TransferResponse>> + Send; - fn transfer_page( - &self, - page: Page, - status: Option<TransferState>, - ) -> impl std::future::Future<Output = ApiResult<TransferList>> + Send; - fn transfer_by_id( - &self, - id: u64, - ) -> impl std::future::Future<Output = ApiResult<Option<TransferStatus>>> + Send; - fn outgoing_history( - &self, - params: History, - ) -> impl std::future::Future<Output = ApiResult<OutgoingHistory>> + Send; - fn incoming_history( - &self, - params: History, - ) -> impl std::future::Future<Output = ApiResult<IncomingHistory>> + Send; - fn add_incoming_reserve( - &self, - req: AddIncomingRequest, - ) -> impl std::future::Future<Output = ApiResult<AddIncomingResponse>> + Send; - fn add_incoming_kyc( - &self, - req: AddKycauthRequest, - ) -> impl std::future::Future<Output = ApiResult<AddKycauthResponse>> + Send; - - fn check_currency(&self, amount: &Amount) -> ApiResult<()> { - let currency = self.currency(); - if amount.currency.as_ref() != currency { - Err(failure( - ErrorCode::GENERIC_CURRENCY_MISMATCH, - format!( - "Wrong currency: expected {} got {}", - currency, amount.currency - ), - )) - } else { - Ok(()) - } - } -} - -pub fn standard_layer(router: Router, auth: AuthMethod) -> Router { - router - .method_not_allowed_fallback(|| async { failure_code(ErrorCode::GENERIC_METHOD_INVALID) }) - .fallback(|| async { failure_code(ErrorCode::GENERIC_ENDPOINT_UNKNOWN) }) - .layer(middleware::from_fn_with_state( - Arc::new(auth), - auth_middleware, - )) -} - pub enum Serve { Tcp(SocketAddr), Unix { @@ -179,216 +85,3 @@ enum Listener { Tcp(TcpListener), Unix(UnixListener), } - -pub async fn server( - mut router: Router, - serve: Serve, - auth: AuthMethod, - lifetime_counter: Option<AtomicU32>, -) -> Result<(), std::io::Error> { - let notify = Arc::new(tokio::sync::Notify::new()); - if let Some(lifetime) = lifetime_counter { - router = router.layer(middleware::from_fn_with_state( - Arc::new(LifetimeMiddlewareState { - notify: notify.clone(), - lifetime, - }), - lifetime_middleware, - )) - } - let router = standard_layer(router, auth); - let listener = serve.resolve()?; - - match listener { - Listener::Tcp(tcp_listener) => { - axum::serve( - tcp_listener, - router.layer(middleware::from_fn(logger_middleware)), - ) - .with_graceful_shutdown(shutdown_signal(notify)) - .await?; - } - Listener::Unix(unix_listener) => { - axum::serve( - unix_listener, - router.layer(middleware::from_fn(logger_middleware)), - ) - .with_graceful_shutdown(shutdown_signal(notify)) - .await?; - } - } - - info!(target: "api", "Server stopped"); - Ok(()) -} - -struct LifetimeMiddlewareState { - lifetime: AtomicU32, - notify: Arc<tokio::sync::Notify>, -} - -async fn lifetime_middleware( - State(state): State<Arc<LifetimeMiddlewareState>>, - request: Request, - next: Next, -) -> Response { - let mut current = state.lifetime.load(Ordering::Relaxed); - while current != 0 { - match state.lifetime.compare_exchange_weak( - current, - current - 1, - Ordering::Relaxed, - Ordering::Relaxed, - ) { - Ok(_) => break, - Err(new) => current = new, - } - } - if current == 0 { - state.notify.notify_one(); - } - next.run(request).await -} - -/** Wait for manual shutdown or system signal shutdown */ -async fn shutdown_signal(manual_shutdown: Arc<tokio::sync::Notify>) { - let ctrl_c = async { - signal::ctrl_c() - .await - .expect("failed to install Ctrl+C handler"); - }; - - #[cfg(unix)] - let terminate = async { - signal::unix::signal(signal::unix::SignalKind::terminate()) - .expect("failed to install signal handler") - .recv() - .await; - }; - - #[cfg(not(unix))] - let terminate = std::future::pending::<()>(); - - let manual = async { manual_shutdown.notified().await }; - - tokio::select! { - _ = ctrl_c => {}, - _ = terminate => {}, - _ = manual => {} - } -} - -/** Taler API logger */ -async fn logger_middleware(request: Request, next: Next) -> Response { - let request_info = format!("{} {}", request.method(), request.uri().path()); - let now = Instant::now(); - let response = next.run(request).await; - let elapsed = now.elapsed(); - // TODO log error message - info!(target: "api", - "{} {request_info} {}ms", - response.status(), - elapsed.as_millis() - ); - response -} - -pub fn wire_gateway_api<I: WireGatewayImpl + 'static>(wg: Arc<I>) -> Router { - Router::new() - .route( - "/config", - get(|State(state): State<Arc<I>>| async move { - Json(WireConfig { - name: state.name(), - version: WIRE_GATEWAY_API_VERSION, - currency: state.currency(), - implementation: state.implementation(), - }) - .into_response() - }), - ) - .route( - "/transfer", - post( - |State(state): State<Arc<I>>, Req(req): Req<TransferRequest>| async move { - state.check_currency(&req.amount)?; - ApiResult::Ok(Json(state.transfer(req).await?)) - }, - ), - ) - .route( - "/transfers", - get( - |State(state): State<Arc<I>>, Query(params): Query<TransferParams>| async move { - let page = params.pagination.check(MAX_PAGE_SIZE)?; - let list = state.transfer_page(page, params.status).await?; - ApiResult::Ok(if list.transfers.is_empty() { - StatusCode::NO_CONTENT.into_response() - } else { - Json(list).into_response() - }) - }, - ), - ) - .route( - "/transfers/{id}", - get( - |State(state): State<Arc<I>>, Path(id): Path<u64>| async move { - match state.transfer_by_id(id).await? { - Some(it) => Ok(Json(it)), - None => Err(failure( - ErrorCode::BANK_TRANSACTION_NOT_FOUND, - format!("Transfer '{id}' not found"), - )), - } - }, - ), - ) - .route( - "/history/incoming", - get( - |State(state): State<Arc<I>>, Query(params): Query<HistoryParams>| async move { - let params = params.check(MAX_PAGE_SIZE, MAX_TIMEOUT_MS)?; - let history = state.incoming_history(params).await?; - ApiResult::Ok(if history.incoming_transactions.is_empty() { - StatusCode::NO_CONTENT.into_response() - } else { - Json(history).into_response() - }) - }, - ), - ) - .route( - "/history/outgoing", - get( - |State(state): State<Arc<I>>, Query(params): Query<HistoryParams>| async move { - let params = params.check(MAX_PAGE_SIZE, MAX_TIMEOUT_MS)?; - let history = state.outgoing_history(params).await?; - ApiResult::Ok(if history.outgoing_transactions.is_empty() { - StatusCode::NO_CONTENT.into_response() - } else { - Json(history).into_response() - }) - }, - ), - ) - .route( - "/admin/add-incoming", - post( - |State(state): State<Arc<I>>, Req(req): Req<AddIncomingRequest>| async move { - state.check_currency(&req.amount)?; - ApiResult::Ok(Json(state.add_incoming_reserve(req).await?)) - }, - ), - ) - .route( - "/admin/add-kycauth", - post( - |State(state): State<Arc<I>>, Req(req): Req<AddKycauthRequest>| async move { - state.check_currency(&req.amount)?; - ApiResult::Ok(Json(state.add_incoming_kyc(req).await?)) - }, - ), - ) - .with_state(wg) -} diff --git a/common/taler-api/tests/api.rs b/common/taler-api/tests/api.rs @@ -14,12 +14,12 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use common::sample_wire_gateway_api; +use common::test_api; use sqlx::PgPool; -use taler_api::{auth::AuthMethod, standard_layer}; use taler_common::{ api_common::{HashCode, ShortHashCode}, - api_wire::{OutgoingHistory, TransferResponse, TransferState}, + api_revenue::RevenueConfig, + api_wire::{OutgoingHistory, TransferResponse, TransferState, WireConfig}, error_code::ErrorCode, types::{amount::amount, payto::payto, url}, }; @@ -35,12 +35,9 @@ mod common; async fn setup() -> (TestServer, PgPool) { let pool = db_test_setup().await; + let api = test_api(pool.clone(), "EUR".to_string()).await; - let server = TestServer::new(standard_layer( - sample_wire_gateway_api(Some(pool.clone()), "EUR".to_string()).await, - AuthMethod::None, - )) - .unwrap(); + let server = TestServer::new(api.finalize()).unwrap(); (server, pool) } @@ -53,7 +50,7 @@ async fn errors() { .await .assert_error(ErrorCode::GENERIC_ENDPOINT_UNKNOWN); server - .post("/config") + .post("/taler-revenue/config") .await .assert_error(ErrorCode::GENERIC_METHOD_INVALID); } @@ -61,7 +58,14 @@ async fn errors() { #[tokio::test] async fn config() { let (server, _) = setup().await; - server.get("/config").await.assert_status_ok(); + server + .get("/taler-wire-gateway/config") + .await + .assert_ok_json::<WireConfig>(); + server + .get("/taler-revenue/config") + .await + .assert_ok_json::<RevenueConfig>(); } #[tokio::test] @@ -73,11 +77,14 @@ async fn transfer() { #[tokio::test] async fn outgoing_history() { let (server, _) = setup().await; - server.get("/history/outgoing").await.assert_no_content(); + server + .get("/taler-wire-gateway/history/outgoing") + .await + .assert_no_content(); routine_pagination::<OutgoingHistory, _>( &server, - "/history/outgoing", + "/taler-wire-gateway/history/outgoing", |it| { it.outgoing_transactions .into_iter() @@ -86,7 +93,7 @@ async fn outgoing_history() { }, |server, i| async move { server - .post("/transfer") + .post("/taler-wire-gateway/transfer") .json(&json!({ "request_uid": HashCode::rand(), "amount": amount(&format!("EUR:0.0{i}")), diff --git a/common/taler-api/tests/common/db.rs b/common/taler-api/tests/common/db.rs @@ -19,6 +19,7 @@ use taler_api::db::{history, page, BindHelper, IncomingType, TypeHelper}; use taler_common::{ api_common::{EddsaPublicKey, SafeU64}, api_params::{History, Page}, + api_revenue::RevenueIncomingBankTransaction, api_wire::{ IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest, TransferResponse, TransferState, TransferStatus, @@ -151,7 +152,7 @@ pub async fn transfer_by_id( .await } -pub async fn outgoing_page( +pub async fn outgoing_revenue( db: &PgPool, params: &History, currency: &str, @@ -228,7 +229,7 @@ pub async fn add_incoming( .await } -pub async fn incoming_page( +pub async fn incoming_history( db: &PgPool, params: &History, currency: &str, @@ -285,3 +286,42 @@ pub async fn incoming_page( ) .await } + +pub async fn revenue_history( + db: &PgPool, + params: &History, + currency: &str, + listen: impl FnOnce() -> Receiver<i64>, +) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> { + history( + db, + "incoming_transaction_id", + params, + listen, + || { + QueryBuilder::new( + " + SELECT + incoming_transaction_id, + (amount).val as amount_val, + (amount).frac as amount_frac, + creation_time, + debit_payto, + subject + FROM incoming_transactions WHERE + ", + ) + }, + |r: PgRow| { + Ok(RevenueIncomingBankTransaction { + row_id: r.try_get_safeu64("incoming_transaction_id")?, + date: r.try_get_timestamp("creation_time")?, + amount: r.try_get_amount("amount", currency)?, + credit_fee: None, + debit_account: r.try_get_payto("debit_payto")?, + subject: r.try_get("subject")?, + }) + }, + ) + .await +} diff --git a/common/taler-api/tests/common/mod.rs b/common/taler-api/tests/common/mod.rs @@ -16,16 +16,17 @@ use std::sync::Arc; -use axum::Router; use db::notification_listener; use sqlx::PgPool; use taler_api::{ + api::{revenue::Revenue, wire::WireGateway, TalerApi, TalerComponent}, + auth::AuthMethod, db::IncomingType, error::{failure, ApiResult}, - wire_gateway_api, WireGatewayImpl, }; use taler_common::{ api_params::{History, Page}, + api_revenue::RevenueIncomingHistory, api_wire::{ AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddKycauthResponse, IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse, @@ -38,19 +39,15 @@ use tokio::sync::watch::Sender; pub mod db; -/// Sample Wire Gateway implementation for tests -pub struct SampleState { +/// Taler API implementation for tests +pub struct TestAdapter { currency: String, pool: PgPool, outgoing_channel: Sender<i64>, incoming_channel: Sender<i64>, } -impl WireGatewayImpl for SampleState { - fn name(&self) -> &str { - "taler-wire-gateway" - } - +impl TalerComponent for TestAdapter { fn currency(&self) -> &str { &self.currency } @@ -58,7 +55,9 @@ impl WireGatewayImpl for SampleState { fn implementation(&self) -> Option<&str> { None } +} +impl WireGateway for TestAdapter { async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> { let result = db::transfer(&self.pool, req).await?; match result { @@ -86,7 +85,7 @@ impl WireGatewayImpl for SampleState { } async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> { - let txs = db::outgoing_page(&self.pool, &params, &self.currency, || { + let txs = db::outgoing_revenue(&self.pool, &params, &self.currency, || { self.outgoing_channel.subscribe() }) .await?; @@ -97,7 +96,7 @@ impl WireGatewayImpl for SampleState { } async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> { - let txs = db::incoming_page(&self.pool, &params, &self.currency, || { + let txs = db::incoming_history(&self.pool, &params, &self.currency, || { self.incoming_channel.subscribe() }) .await?; @@ -153,12 +152,20 @@ impl WireGatewayImpl for SampleState { } } -pub async fn sample_wire_gateway_api(pool: Option<PgPool>, currency: String) -> Router { - // Create pool - let pool = match pool { - Some(it) => it, - None => PgPool::connect("postgre:///magnetcheck").await.unwrap(), - }; +impl Revenue for TestAdapter { + async fn history(&self, history: History) -> ApiResult<RevenueIncomingHistory> { + let txs = db::revenue_history(&self.pool, &history, &self.currency, || { + self.incoming_channel.subscribe() + }) + .await?; + Ok(RevenueIncomingHistory { + incoming_transactions: txs, + credit_account: payto("payto://test"), + }) + } +} + +pub async fn test_api(pool: PgPool, currency: String) -> TalerApi { // Reset db sqlx::raw_sql(include_str!("../schema.sql")) .execute(&pool) @@ -166,7 +173,7 @@ pub async fn sample_wire_gateway_api(pool: Option<PgPool>, currency: String) -> .unwrap(); let outgoing_channel = Sender::new(0); let incoming_channel = Sender::new(0); - let wg = SampleState { + let wg = TestAdapter { currency, pool: pool.clone(), outgoing_channel: outgoing_channel.clone(), @@ -177,5 +184,8 @@ pub async fn sample_wire_gateway_api(pool: Option<PgPool>, currency: String) -> outgoing_channel, incoming_channel, )); - wire_gateway_api(Arc::new(wg)) + let state = Arc::new(wg); + TalerApi::new() + .wire_gateway(state.clone(), AuthMethod::None) + .revenue(state, AuthMethod::None) } diff --git a/common/taler-common/src/api_revenue.rs b/common/taler-common/src/api_revenue.rs @@ -0,0 +1,49 @@ +/* + This file is part of TALER + Copyright (C) 2025 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +//! Type for the Taler Wire Gateway HTTP API <https://docs.taler.net/core/api-bank-wire.html#taler-wire-gateway-http-api> + +use crate::types::{amount::Amount, payto::Payto, timestamp::Timestamp}; + +use super::api_common::SafeU64; +use serde::{Deserialize, Serialize}; + +/// <https://docs.taler.net/core/api-bank-revenue.html#tsref-type-RevenueConfig> +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct RevenueConfig<'a> { + pub name: &'a str, + pub version: &'a str, + pub currency: &'a str, + pub implementation: Option<&'a str>, +} + +/// <https://docs.taler.net/core/api-bank-revenue.html#tsref-type-RevenueIncomingHistory> +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct RevenueIncomingHistory { + pub incoming_transactions: Vec<RevenueIncomingBankTransaction>, + pub credit_account: Payto, +} + +/// <https://docs.taler.net/core/api-bank-revenue.html#tsref-type-RevenueIncomingBankTransaction> +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct RevenueIncomingBankTransaction { + pub row_id: SafeU64, + pub date: Timestamp, + pub amount: Amount, + pub credit_fee: Option<Amount>, + pub debit_account: Payto, + pub subject: String, +} diff --git a/common/taler-common/src/lib.rs b/common/taler-common/src/lib.rs @@ -23,6 +23,7 @@ use tracing_subscriber::{util::SubscriberInitExt as _, FmtSubscriber}; pub mod api_common; pub mod api_params; +pub mod api_revenue; pub mod api_wire; pub mod cli; pub mod config; diff --git a/common/taler-test-utils/src/helpers.rs b/common/taler-test-utils/src/helpers.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2024 Taler Systems SA + Copyright (C) 2024-2025 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -16,13 +16,13 @@ use axum::http::StatusCode; use axum_test::TestResponse; -use serde::de::DeserializeOwned; +use serde::Deserialize; use serde_json::Value; use taler_common::{api_common::ErrorDetail, error_code::ErrorCode}; pub trait TestResponseHelper { - fn json_parse<T: DeserializeOwned>(&self) -> T; - fn assert_ok_json<T: DeserializeOwned>(&self) -> T; + fn json_parse<'de, T: Deserialize<'de>>(&'de self) -> T; + fn assert_ok_json<'de, T: Deserialize<'de>>(&'de self) -> T; fn assert_ok(&self); fn assert_no_content(&self); fn assert_error(&self, error_code: ErrorCode); @@ -55,7 +55,7 @@ pub fn assert_status(resp: &TestResponse, status: StatusCode) { impl TestResponseHelper for TestResponse { #[track_caller] - fn assert_ok_json<T: DeserializeOwned>(&self) -> T { + fn assert_ok_json<'de, T: Deserialize<'de>>(&'de self) -> T { self.assert_ok(); self.json_parse() } @@ -76,7 +76,7 @@ impl TestResponseHelper for TestResponse { assert_eq!(error_code as u32, err.code); } #[track_caller] - fn json_parse<T: DeserializeOwned>(&self) -> T { + fn json_parse<'de, T: Deserialize<'de>>(&'de self) -> T { match serde_json::from_slice(self.as_bytes()) { Ok(body) => body, Err(err) => match serde_json::from_slice::<Value>(self.as_bytes()) { diff --git a/common/taler-test-utils/src/routine.rs b/common/taler-test-utils/src/routine.rs @@ -24,7 +24,7 @@ use std::{ use axum::{extract::Query, http::Uri}; use axum_test::{TestResponse, TestServer}; -use serde::de::DeserializeOwned; +use serde::{de::DeserializeOwned, Deserialize}; use taler_api::db::IncomingType; use taler_common::{ api_common::{EddsaPublicKey, HashCode, ShortHashCode}, @@ -49,7 +49,7 @@ pub async fn routine_pagination<'a, T: DeserializeOwned, F: Future<Output = ()>> // Check history is following specs let assert_history = |args: Cow<'static, str>, size: usize| async move { let resp = server.get(&format!("{url}?{args}")).await; - assert_history_ids(resp, ids, size) + assert_history_ids(&resp, ids, size) }; // Get latest registered id let latest_id = || async move { assert_history("limit=-1".into(), 1).await[0] }; @@ -98,7 +98,7 @@ pub async fn routine_history< // Check history is following specs let assert_history = |args: String, size: usize| async move { let resp = server.get(&format!("{url}?{args}")).await; - assert_history_ids(resp, ids, size) + assert_history_ids(&resp, ids, size) }; // Get latest registered id let latest_id = || async { assert_history(format!("limit=-1"), 1).await[0] }; @@ -202,8 +202,8 @@ pub async fn routine_history< } #[track_caller] -fn assert_history_ids<T: DeserializeOwned>( - resp: TestResponse, +fn assert_history_ids<'de, T: Deserialize<'de>>( + resp: &'de TestResponse, ids: impl Fn(T) -> Vec<i64>, size: usize, ) -> Vec<i64> { @@ -255,7 +255,7 @@ fn assert_history_ids<T: DeserializeOwned>( // Get currency from config async fn get_currency(server: &TestServer) -> String { let config = server - .get("/config") + .get("/taler-wire-gateway/config") .await .assert_ok_json::<serde_json::Value>(); let currency = config["currency"].as_str().unwrap(); @@ -279,9 +279,15 @@ pub async fn transfer_routine( }); // Check empty db - server.get("/transfers").await.assert_no_content(); server - .get(&format!("/transfers?status={}", default_status.as_ref())) + .get("/taler-wire-gateway/transfers") + .await + .assert_no_content(); + server + .get(&format!( + "/taler-wire-gateway/transfers?status={}", + default_status.as_ref() + )) .await .assert_no_content(); @@ -289,13 +295,13 @@ pub async fn transfer_routine( { // Check OK let first = server - .post("/transfer") + .post("/taler-wire-gateway/transfer") .json(&transfer_request) .await .assert_ok_json::<TransferResponse>(); // Check idempotent let second = server - .post("/transfer") + .post("/taler-wire-gateway/transfer") .json(&transfer_request) .await .assert_ok_json::<TransferResponse>(); @@ -304,7 +310,7 @@ pub async fn transfer_routine( // Check request uid reuse server - .post("/transfer") + .post("/taler-wire-gateway/transfer") .json(&json!(transfer_request + { "wtid": ShortHashCode::rand() })) @@ -313,7 +319,7 @@ pub async fn transfer_routine( // Check currency mismatch server - .post("/transfer") + .post("/taler-wire-gateway/transfer") .json(&json!(transfer_request + { "amount": "BAD:42" })) @@ -325,7 +331,7 @@ pub async fn transfer_routine( { let wtid = ShortHashCode::rand(); let resp = server - .post("/transfer") + .post("/taler-wire-gateway/transfer") .json(&json!(transfer_request + { "request_uid": HashCode::rand(), "wtid": wtid, @@ -335,7 +341,7 @@ pub async fn transfer_routine( // Check OK let tx = server - .get(&format!("/transfers/{}", resp.row_id)) + .get(&format!("/taler-wire-gateway/transfers/{}", resp.row_id)) .await .assert_ok_json::<TransferStatus>(); assert_eq!(default_status, tx.status); @@ -346,7 +352,7 @@ pub async fn transfer_routine( // Check unknown transaction server - .get("/transfers/42") + .get("/taler-wire-gateway/transfers/42") .await .assert_error(ErrorCode::BANK_TRANSACTION_NOT_FOUND); } @@ -355,7 +361,7 @@ pub async fn transfer_routine( { for _ in 0..4 { server - .post("/transfer") + .post("/taler-wire-gateway/transfer") .json(&json!(transfer_request + { "request_uid": HashCode::rand(), "wtid": ShortHashCode::rand(), @@ -365,14 +371,17 @@ pub async fn transfer_routine( } { let list = server - .get("/transfers") + .get("/taler-wire-gateway/transfers") .await .assert_ok_json::<TransferList>(); assert_eq!(list.transfers.len(), 6); assert_eq!( list, server - .get(&format!("/transfers?status={}", default_status.as_ref())) + .get(&format!( + "/taler-wire-gateway/transfers?status={}", + default_status.as_ref() + )) .await .assert_ok_json::<TransferList>() ); @@ -381,7 +390,7 @@ pub async fn transfer_routine( // Pagination test routine_pagination::<TransferList, _>( server, - "/transfers", + "/taler-wire-gateway/transfers", |it| { it.transfers .into_iter() @@ -390,7 +399,7 @@ pub async fn transfer_routine( }, |server, i| async move { server - .post("/transfer") + .post("/taler-wire-gateway/transfer") .json(&json!({ "request_uid": HashCode::rand(), "amount": amount(format!("{currency}:0.0{i}")), @@ -413,8 +422,8 @@ async fn add_incoming_routine( debit_acount: &Payto, ) { let (path, key) = match kind { - IncomingType::reserve => ("/admin/add-incoming", "reserve_pub"), - IncomingType::kyc => ("/admin/add-kycauth", "account_pub"), + IncomingType::reserve => ("/taler-wire-gateway/admin/add-incoming", "reserve_pub"), + IncomingType::kyc => ("/taler-wire-gateway/admin/add-kycauth", "account_pub"), IncomingType::wad => unreachable!(), }; let valid_req = json!({ @@ -485,10 +494,13 @@ pub async fn admin_add_incoming_routine(server: &TestServer, debit_acount: &Payt let currency = &get_currency(server).await; // History - server.get("/history/incoming").await.assert_no_content(); + server + .get("/taler-wire-gateway/history/incoming") + .await + .assert_no_content(); routine_history( server, - "/history/incoming", + "/taler-wire-gateway/history/incoming", |it: IncomingHistory| { it.incoming_transactions .into_iter() @@ -503,7 +515,7 @@ pub async fn admin_add_incoming_routine(server: &TestServer, debit_acount: &Payt |server, i| async move { if i % 2 == 0 { server - .post("/admin/add-incoming") + .post("/taler-wire-gateway/admin/add-incoming") .json(&json!({ "amount": format!("{currency}:0.0{i}"), "reserve_pub": EddsaPublicKey::rand(), @@ -513,7 +525,7 @@ pub async fn admin_add_incoming_routine(server: &TestServer, debit_acount: &Payt .assert_ok_json::<TransferResponse>(); } else { server - .post("/admin/add-kycauth") + .post("/taler-wire-gateway/admin/add-kycauth") .json(&json!({ "amount": format!("{currency}:0.0{i}"), "account_pub": EddsaPublicKey::rand(),