taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

api.rs (15740B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use jiff::Timestamp;
     18 use taler_api::{
     19     api::{TalerApi, revenue::Revenue, transfer::PreparedTransfer, wire::WireGateway},
     20     error::{ApiResult, failure, failure_code},
     21     subject::{IncomingSubject, fmt_in_subject},
     22 };
     23 use taler_common::{
     24     api_common::safe_u64,
     25     api_params::{History, Page},
     26     api_revenue::RevenueIncomingHistory,
     27     api_transfer::{
     28         RegistrationRequest, RegistrationResponse, SubjectFormat, TransferSubject, Unregistration,
     29     },
     30     api_wire::{
     31         AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddMappedRequest,
     32         IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse,
     33         TransferState, TransferStatus,
     34     },
     35     db::IncomingType,
     36     error_code::ErrorCode,
     37     types::{payto::PaytoURI, timestamp::TalerTimestamp, utils::date_to_utc_ts},
     38 };
     39 use tokio::sync::watch::Sender;
     40 
     41 use crate::{
     42     FullHuPayto,
     43     constants::CURRENCY,
     44     db::{self, AddIncomingResult, Transfer, TxInAdmin},
     45 };
     46 
     47 pub struct MagnetApi {
     48     pub pool: sqlx::PgPool,
     49     pub payto: PaytoURI,
     50     pub in_channel: Sender<i64>,
     51     pub taler_in_channel: Sender<i64>,
     52     pub out_channel: Sender<i64>,
     53     pub taler_out_channel: Sender<i64>,
     54 }
     55 
     56 impl MagnetApi {
     57     pub async fn start(pool: sqlx::PgPool, payto: PaytoURI) -> Self {
     58         let in_channel = Sender::new(0);
     59         let taler_in_channel = Sender::new(0);
     60         let out_channel = Sender::new(0);
     61         let taler_out_channel = Sender::new(0);
     62         let tmp = Self {
     63             pool: pool.clone(),
     64             payto,
     65             in_channel: in_channel.clone(),
     66             taler_in_channel: taler_in_channel.clone(),
     67             out_channel: out_channel.clone(),
     68             taler_out_channel: taler_out_channel.clone(),
     69         };
     70         tokio::spawn(db::notification_listener(
     71             pool,
     72             in_channel,
     73             taler_in_channel,
     74             out_channel,
     75             taler_out_channel,
     76         ));
     77         tmp
     78     }
     79 }
     80 
     81 impl TalerApi for MagnetApi {
     82     fn currency(&self) -> &str {
     83         CURRENCY.as_ref()
     84     }
     85 
     86     fn implementation(&self) -> &'static str {
     87         "urn:net:taler:specs:taler-magnet-bank:taler-rust"
     88     }
     89 }
     90 
     91 impl WireGateway for MagnetApi {
     92     async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> {
     93         let creditor = FullHuPayto::try_from(&req.credit_account)?;
     94         let result = db::make_transfer(
     95             &self.pool,
     96             &Transfer {
     97                 request_uid: req.request_uid,
     98                 wtid: req.wtid,
     99                 amount: req.amount.decimal(),
    100                 metadata: req.metadata,
    101                 creditor,
    102                 exchange_base_url: req.exchange_base_url,
    103             },
    104             &Timestamp::now(),
    105         )
    106         .await?;
    107         match result {
    108             db::TransferResult::Success { id, initiated_at } => Ok(TransferResponse {
    109                 timestamp: initiated_at.into(),
    110                 row_id: safe_u64(id),
    111             }),
    112             db::TransferResult::RequestUidReuse => {
    113                 Err(failure_code(ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED))
    114             }
    115             db::TransferResult::WtidReuse => {
    116                 Err(failure_code(ErrorCode::BANK_TRANSFER_WTID_REUSED))
    117             }
    118         }
    119     }
    120 
    121     async fn transfer_page(
    122         &self,
    123         page: Page,
    124         status: Option<TransferState>,
    125     ) -> ApiResult<TransferList> {
    126         Ok(TransferList {
    127             transfers: db::transfer_page(&self.pool, &status, &page).await?,
    128             debit_account: self.payto.clone(),
    129         })
    130     }
    131 
    132     async fn transfer_by_id(&self, id: u64) -> ApiResult<Option<TransferStatus>> {
    133         Ok(db::transfer_by_id(&self.pool, id).await?)
    134     }
    135 
    136     async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> {
    137         Ok(OutgoingHistory {
    138             outgoing_transactions: db::outgoing_history(&self.pool, &params, || {
    139                 self.taler_out_channel.subscribe()
    140             })
    141             .await?,
    142             debit_account: self.payto.clone(),
    143         })
    144     }
    145 
    146     async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> {
    147         Ok(IncomingHistory {
    148             incoming_transactions: db::incoming_history(&self.pool, &params, || {
    149                 self.taler_in_channel.subscribe()
    150             })
    151             .await?,
    152             credit_account: self.payto.clone(),
    153         })
    154     }
    155 
    156     async fn add_incoming_reserve(
    157         &self,
    158         req: AddIncomingRequest,
    159     ) -> ApiResult<AddIncomingResponse> {
    160         let debtor = FullHuPayto::try_from(&req.debit_account)?;
    161         let res = db::register_tx_in_admin(
    162             &self.pool,
    163             &TxInAdmin {
    164                 amount: req.amount,
    165                 subject: format!("Admin incoming {}", req.reserve_pub),
    166                 debtor,
    167                 metadata: IncomingSubject::Reserve(req.reserve_pub),
    168             },
    169             &Timestamp::now(),
    170         )
    171         .await?;
    172         match res {
    173             AddIncomingResult::Success {
    174                 row_id, valued_at, ..
    175             } => Ok(AddIncomingResponse {
    176                 row_id: safe_u64(row_id),
    177                 timestamp: date_to_utc_ts(&valued_at).into(),
    178             }),
    179             AddIncomingResult::ReservePubReuse => {
    180                 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT))
    181             }
    182             AddIncomingResult::UnknownMapping | AddIncomingResult::MappingReuse => {
    183                 unreachable!("mapping not used")
    184             }
    185         }
    186     }
    187 
    188     async fn add_incoming_kyc(&self, req: AddKycauthRequest) -> ApiResult<AddIncomingResponse> {
    189         let debtor = FullHuPayto::try_from(&req.debit_account)?;
    190         let res = db::register_tx_in_admin(
    191             &self.pool,
    192             &TxInAdmin {
    193                 amount: req.amount,
    194                 subject: format!("Admin incoming KYC:{}", req.account_pub),
    195                 debtor,
    196                 metadata: IncomingSubject::Kyc(req.account_pub),
    197             },
    198             &Timestamp::now(),
    199         )
    200         .await?;
    201         match res {
    202             AddIncomingResult::Success {
    203                 row_id, valued_at, ..
    204             } => Ok(AddIncomingResponse {
    205                 row_id: safe_u64(row_id),
    206                 timestamp: date_to_utc_ts(&valued_at).into(),
    207             }),
    208             AddIncomingResult::ReservePubReuse => unreachable!("kyc"),
    209             AddIncomingResult::UnknownMapping | AddIncomingResult::MappingReuse => {
    210                 unreachable!("mapping not used")
    211             }
    212         }
    213     }
    214 
    215     async fn add_incoming_mapped(&self, req: AddMappedRequest) -> ApiResult<AddIncomingResponse> {
    216         let debtor = FullHuPayto::try_from(&req.debit_account)?;
    217         let res = db::register_tx_in_admin(
    218             &self.pool,
    219             &TxInAdmin {
    220                 amount: req.amount,
    221                 subject: format!("Admin incoming MAP:{}", req.authorization_pub),
    222                 debtor,
    223                 metadata: IncomingSubject::Map(req.authorization_pub),
    224             },
    225             &Timestamp::now(),
    226         )
    227         .await?;
    228         match res {
    229             AddIncomingResult::Success {
    230                 row_id, valued_at, ..
    231             } => Ok(AddIncomingResponse {
    232                 row_id: safe_u64(row_id),
    233                 timestamp: date_to_utc_ts(&valued_at).into(),
    234             }),
    235             AddIncomingResult::ReservePubReuse => {
    236                 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT))
    237             }
    238             AddIncomingResult::UnknownMapping => {
    239                 Err(failure_code(ErrorCode::BANK_TRANSFER_MAPPING_UNKNOWN))
    240             }
    241             AddIncomingResult::MappingReuse => {
    242                 Err(failure_code(ErrorCode::BANK_TRANSFER_MAPPING_REUSED))
    243             }
    244         }
    245     }
    246 
    247     fn support_account_check(&self) -> bool {
    248         false
    249     }
    250 }
    251 
    252 impl Revenue for MagnetApi {
    253     async fn history(&self, params: History) -> ApiResult<RevenueIncomingHistory> {
    254         Ok(RevenueIncomingHistory {
    255             incoming_transactions: db::revenue_history(&self.pool, &params, || {
    256                 self.in_channel.subscribe()
    257             })
    258             .await?,
    259             credit_account: self.payto.clone(),
    260         })
    261     }
    262 }
    263 
    264 impl PreparedTransfer for MagnetApi {
    265     fn supported_formats(&self) -> &[SubjectFormat] {
    266         &[SubjectFormat::SIMPLE]
    267     }
    268 
    269     async fn registration(&self, req: RegistrationRequest) -> ApiResult<RegistrationResponse> {
    270         match db::transfer_register(&self.pool, &req).await? {
    271             db::RegistrationResult::Success => {
    272                 let simple = TransferSubject::Simple {
    273                     credit_amount: req.credit_amount,
    274                     subject: if req.authorization_pub == req.account_pub && !req.recurrent {
    275                         fmt_in_subject(req.r#type.into(), &req.account_pub)
    276                     } else {
    277                         fmt_in_subject(IncomingType::map, &req.authorization_pub)
    278                     },
    279                 };
    280                 ApiResult::Ok(RegistrationResponse {
    281                     subjects: vec![simple],
    282                     expiration: TalerTimestamp::Never,
    283                 })
    284             }
    285             db::RegistrationResult::ReservePubReuse => {
    286                 ApiResult::Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT))
    287             }
    288         }
    289     }
    290 
    291     async fn unregistration(&self, req: Unregistration) -> ApiResult<()> {
    292         if !db::transfer_unregister(&self.pool, &req).await? {
    293             Err(failure(
    294                 ErrorCode::BANK_TRANSACTION_NOT_FOUND,
    295                 format!("Prepared transfer '{}' not found", req.authorization_pub),
    296             ))
    297         } else {
    298             Ok(())
    299         }
    300     }
    301 }
    302 
    #[cfg(test)]
    mod test {
        //! API level tests driving the shared Taler test routines against a
        //! router backed by `MagnetApi` and a test database.

        use std::sync::{Arc, LazyLock};

        use jiff::{Timestamp, Zoned};
        use sqlx::{PgPool, Row as _, postgres::PgRow};
        use taler_api::{
            api::TalerRouter as _, auth::AuthMethod, db::TypeHelper as _, subject::OutgoingSubject,
        };
        use taler_common::{
            api_revenue::RevenueConfig,
            api_transfer::PreparedTransferConfig,
            api_wire::{OutgoingHistory, TransferState, WireConfig},
            db::IncomingType,
            types::{
                amount::amount,
                payto::{PaytoURI, payto},
            },
        };
        use taler_test_utils::{
            Router,
            db::db_test_setup,
            routine::{
                Status, admin_add_incoming_routine, in_history_routine, registration_routine,
                revenue_routine, routine_pagination, transfer_routine,
            },
            server::TestServer,
        };

        use crate::{
            FullHuPayto,
            api::MagnetApi,
            constants::CONFIG_SOURCE,
            db::{self, TxOutKind},
            magnet_api::types::TxStatus,
            magnet_payto,
        };

        /// Account used by every test, in its Magnet specific form.
        static PAYTO: LazyLock<FullHuPayto> = LazyLock::new(|| {
            magnet_payto("payto://iban/HU02162000031000164800000000?receiver-name=name")
        });
        /// Same account as a generic payto URI.
        static ACCOUNT: LazyLock<PaytoURI> = LazyLock::new(|| PAYTO.as_payto());

        /// Prepare a test database and a router exposing the three APIs without
        /// authentication.
        async fn setup() -> (Router, PgPool) {
            let (_, pool) = db_test_setup(CONFIG_SOURCE).await;
            let api = MagnetApi::start(pool.clone(), ACCOUNT.clone()).await;
            let api = Arc::new(api);
            let router = Router::new()
                .wire_gateway(Arc::clone(&api), AuthMethod::None)
                .prepared_transfer(Arc::clone(&api))
                .revenue(api, AuthMethod::None)
                .finalize();

            (router, pool)
        }

        /// Every API serves a parsable `/config` document.
        #[tokio::test]
        async fn config() {
            let (router, _) = setup().await;

            let wire = router.get("/taler-wire-gateway/config").await;
            wire.assert_ok_json::<WireConfig>();

            let prepared = router.get("/taler-prepared-transfer/config").await;
            prepared.assert_ok_json::<PreparedTransferConfig>();

            let revenue = router.get("/taler-revenue/config").await;
            revenue.assert_ok_json::<RevenueConfig>();
        }

        /// Shared transfer routine; new transfers are expected to be pending.
        #[tokio::test]
        async fn transfer() {
            let (router, _) = setup().await;
            let credit_account =
                payto("payto://iban/HU02162000031000164800000000?receiver-name=name");
            transfer_routine(&router, TransferState::pending, &credit_account).await;
        }

        /// Pagination of the outgoing history, fed by registering completed
        /// talerable outgoing transactions directly in the database.
        #[tokio::test]
        async fn outgoing_history() {
            let (router, pool) = setup().await;
            routine_pagination::<OutgoingHistory>(
                &router,
                "/taler-wire-gateway/history/outgoing",
                async |i| {
                    let mut conn = pool.acquire().await.unwrap();
                    let tx = db::TxOut {
                        code: i as u64,
                        amount: amount("EUR:10"),
                        subject: "subject".into(),
                        creditor: PAYTO.clone(),
                        value_date: Zoned::now().date(),
                        status: TxStatus::Completed,
                    };
                    let kind = TxOutKind::Talerable(OutgoingSubject::rand());
                    db::register_tx_out(&mut conn, &tx, &kind, &Timestamp::now())
                        .await
                        .unwrap();
                },
            )
            .await;
        }

        /// Shared routine for the admin add-incoming endpoints.
        #[tokio::test]
        async fn admin_add_incoming() {
            let (router, _) = setup().await;
            admin_add_incoming_routine(&router, &ACCOUNT, true).await;
        }

        /// Shared routine for the wire gateway incoming history.
        #[tokio::test]
        async fn in_history() {
            let (router, _) = setup().await;
            in_history_routine(&router, &ACCOUNT, true).await;
        }

        /// Shared routine for the revenue history.
        #[tokio::test]
        async fn revenue() {
            let (router, _) = setup().await;
            revenue_routine(&router, &ACCOUNT, true).await;
        }

        /// Classify every incoming transaction, ordered by `tx_in_id`.
        ///
        /// A row with a pending recurrent entry is `Pending`, else a row with
        /// an `initiated_id` is `Bounced`, else the status follows the Taler
        /// type: none is `Simple`, reserve and kyc carry their metadata. Any
        /// other type panics.
        async fn check_in(pool: &PgPool) -> Vec<Status> {
            sqlx::query(
                "
            SELECT pending_recurrent_in.authorization_pub IS NOT NULL, initiated_id IS NOT NULL, type, metadata 
            FROM tx_in
                LEFT JOIN taler_in USING (tx_in_id)
                LEFT JOIN pending_recurrent_in USING (tx_in_id)
                LEFT JOIN bounced USING (tx_in_id)
            ORDER BY tx_in.tx_in_id
        ",
            )
            .try_map(|row: PgRow| {
                // Columns are checked lazily, in the same priority order.
                if row.try_get_flag(0)? {
                    return Ok(Status::Pending);
                }
                if row.try_get_flag(1)? {
                    return Ok(Status::Bounced);
                }
                let status = match row.try_get(2)? {
                    None => Status::Simple,
                    Some(IncomingType::reserve) => Status::Reserve(row.try_get(3)?),
                    Some(IncomingType::kyc) => Status::Kyc(row.try_get(3)?),
                    Some(e) => unreachable!("{e:?}"),
                };
                Ok(status)
            })
            .fetch_all(pool)
            .await
            .unwrap()
        }

        /// Shared routine for prepared transfer registration, inspecting the
        /// database through `check_in`.
        #[tokio::test]
        async fn registration() {
            let (router, pool) = setup().await;
            registration_routine(&router, &ACCOUNT, || check_in(&pool)).await;
        }
    }