taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

api.rs (19395B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use compact_str::CompactString;
     18 use jiff::Timestamp;
     19 use taler_api::{
     20     api::{TalerApi, prepared::PreparedTransfer, revenue::Revenue, wire::WireGateway},
     21     error::{ApiResult, failure_code},
     22     subject::{IncomingSubject, fmt_in_subject},
     23 };
     24 use taler_common::{
     25     api::{
     26         params::{History, Page},
     27         prepared::{
     28             RegistrationRequest, RegistrationResponse, SubjectFormat, TransferSubject,
     29             Unregistration,
     30         },
     31         revenue::RevenueIncomingHistory,
     32         wire::{
     33             AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddMappedRequest,
     34             IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse,
     35             TransferState, TransferStatus,
     36         },
     37     },
     38     db::IncomingType,
     39     error_code::ErrorCode,
     40     types::{amount::Currency, timestamp::TalerTimestamp},
     41 };
     42 use tokio::sync::watch::Sender;
     43 
     44 use crate::{
     45     db::{self, AddIncomingResult, Transfer, TxInAdmin},
     46     payto::{CyclosPayto, FullCyclosPayto},
     47 };
     48 
     49 pub struct CyclosApi {
     50     pub pool: sqlx::PgPool,
     51     pub currency: Currency,
     52     pub payto: FullCyclosPayto,
     53     pub in_channel: Sender<i64>,
     54     pub taler_in_channel: Sender<i64>,
     55     pub out_channel: Sender<i64>,
     56     pub taler_out_channel: Sender<i64>,
     57     pub root: CompactString,
     58 }
     59 
     60 impl CyclosApi {
     61     pub fn start(
     62         pool: sqlx::PgPool,
     63         root: CompactString,
     64         payto: FullCyclosPayto,
     65         currency: Currency,
     66     ) -> Self {
     67         let in_channel = Sender::new(0);
     68         let taler_in_channel = Sender::new(0);
     69         let out_channel = Sender::new(0);
     70         let taler_out_channel = Sender::new(0);
     71         let tmp = Self {
     72             pool: pool.clone(),
     73             payto,
     74             currency,
     75             root,
     76             in_channel: in_channel.clone(),
     77             taler_in_channel: taler_in_channel.clone(),
     78             out_channel: out_channel.clone(),
     79             taler_out_channel: taler_out_channel.clone(),
     80         };
     81         tokio::spawn(db::notification_listener(
     82             pool,
     83             in_channel,
     84             taler_in_channel,
     85             out_channel,
     86             taler_out_channel,
     87         ));
     88         tmp
     89     }
     90 }
     91 
     92 impl TalerApi for CyclosApi {
     93     fn currency(&self) -> Currency {
     94         self.currency
     95     }
     96 
     97     fn implementation(&self) -> &'static str {
     98         "urn:net:taler:specs:taler-cyclos:taler-rust"
     99     }
    100 }
    101 
    102 impl WireGateway for CyclosApi {
    103     async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> {
    104         let creditor = FullCyclosPayto::try_from(&req.credit_account)?;
    105         let result = db::make_transfer(
    106             &self.pool,
    107             &Transfer {
    108                 request_uid: req.request_uid,
    109                 amount: req.amount.decimal(),
    110                 exchange_base_url: req.exchange_base_url,
    111                 metadata: req.metadata,
    112                 wtid: req.wtid,
    113                 creditor_id: *creditor.id,
    114                 creditor_name: creditor.name,
    115             },
    116             &Timestamp::now(),
    117         )
    118         .await?;
    119         match result {
    120             db::TransferResult::Success { id, initiated_at } => Ok(TransferResponse {
    121                 timestamp: initiated_at.into(),
    122                 row_id: id,
    123             }),
    124             db::TransferResult::RequestUidReuse => {
    125                 Err(failure_code(ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED))
    126             }
    127             db::TransferResult::WtidReuse => {
    128                 Err(failure_code(ErrorCode::BANK_TRANSFER_WTID_REUSED))
    129             }
    130         }
    131     }
    132 
    133     async fn transfer_page(
    134         &self,
    135         page: Page,
    136         status: Option<TransferState>,
    137     ) -> ApiResult<TransferList> {
    138         Ok(TransferList {
    139             transfers: db::transfer_page(&self.pool, &status, &self.currency, &self.root, &page)
    140                 .await?,
    141             debit_account: self.payto.as_uri(),
    142         })
    143     }
    144 
    145     async fn transfer_by_id(&self, id: u64) -> ApiResult<Option<TransferStatus>> {
    146         Ok(db::transfer_by_id(&self.pool, id, &self.currency, &self.root).await?)
    147     }
    148 
    149     async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> {
    150         Ok(OutgoingHistory {
    151             outgoing_transactions: db::outgoing_history(
    152                 &self.pool,
    153                 &params,
    154                 &self.currency,
    155                 &self.root,
    156                 || self.taler_out_channel.subscribe(),
    157             )
    158             .await?,
    159             debit_account: self.payto.as_uri(),
    160         })
    161     }
    162 
    163     async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> {
    164         Ok(IncomingHistory {
    165             incoming_transactions: db::incoming_history(
    166                 &self.pool,
    167                 &params,
    168                 &self.currency,
    169                 &self.root,
    170                 || self.taler_in_channel.subscribe(),
    171             )
    172             .await?,
    173             credit_account: self.payto.as_uri(),
    174         })
    175     }
    176 
    177     async fn add_incoming_reserve(
    178         &self,
    179         req: AddIncomingRequest,
    180     ) -> ApiResult<AddIncomingResponse> {
    181         let debtor = FullCyclosPayto::try_from(&req.debit_account)?;
    182         let res = db::register_tx_in_admin(
    183             &self.pool,
    184             &TxInAdmin {
    185                 amount: req.amount.decimal(),
    186                 subject: format!("Admin incoming {}", req.reserve_pub),
    187                 debtor_id: *debtor.id,
    188                 debtor_name: debtor.name,
    189                 metadata: IncomingSubject::Reserve(req.reserve_pub),
    190             },
    191             &Timestamp::now(),
    192         )
    193         .await?;
    194         match res {
    195             AddIncomingResult::Success {
    196                 row_id, valued_at, ..
    197             } => Ok(AddIncomingResponse {
    198                 row_id,
    199                 timestamp: valued_at.into(),
    200             }),
    201             AddIncomingResult::ReservePubReuse => {
    202                 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT))
    203             }
    204             AddIncomingResult::UnknownMapping | AddIncomingResult::MappingReuse => {
    205                 unreachable!("mapping unused")
    206             }
    207         }
    208     }
    209 
    210     async fn add_incoming_kyc(&self, req: AddKycauthRequest) -> ApiResult<AddIncomingResponse> {
    211         let debtor = FullCyclosPayto::try_from(&req.debit_account)?;
    212         let res = db::register_tx_in_admin(
    213             &self.pool,
    214             &TxInAdmin {
    215                 amount: req.amount.decimal(),
    216                 subject: format!("Admin incoming KYC:{}", req.account_pub),
    217                 debtor_id: *debtor.id,
    218                 debtor_name: debtor.name,
    219                 metadata: IncomingSubject::Kyc(req.account_pub),
    220             },
    221             &Timestamp::now(),
    222         )
    223         .await?;
    224         match res {
    225             AddIncomingResult::Success {
    226                 row_id, valued_at, ..
    227             } => Ok(AddIncomingResponse {
    228                 row_id,
    229                 timestamp: valued_at.into(),
    230             }),
    231             AddIncomingResult::ReservePubReuse => {
    232                 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT))
    233             }
    234             AddIncomingResult::UnknownMapping | AddIncomingResult::MappingReuse => {
    235                 unreachable!("mapping unused")
    236             }
    237         }
    238     }
    239 
    240     async fn add_incoming_mapped(&self, req: AddMappedRequest) -> ApiResult<AddIncomingResponse> {
    241         let debtor = FullCyclosPayto::try_from(&req.debit_account)?;
    242         let res = db::register_tx_in_admin(
    243             &self.pool,
    244             &TxInAdmin {
    245                 amount: req.amount.decimal(),
    246                 subject: format!("Admin incoming MAP:{}", req.authorization_pub),
    247                 debtor_id: *debtor.id,
    248                 debtor_name: debtor.name,
    249                 metadata: IncomingSubject::Map(req.authorization_pub),
    250             },
    251             &Timestamp::now(),
    252         )
    253         .await?;
    254         match res {
    255             AddIncomingResult::Success {
    256                 row_id, valued_at, ..
    257             } => Ok(AddIncomingResponse {
    258                 row_id,
    259                 timestamp: valued_at.into(),
    260             }),
    261             AddIncomingResult::ReservePubReuse => {
    262                 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT))
    263             }
    264             AddIncomingResult::UnknownMapping => {
    265                 Err(failure_code(ErrorCode::BANK_TRANSFER_MAPPING_UNKNOWN))
    266             }
    267             AddIncomingResult::MappingReuse => {
    268                 Err(failure_code(ErrorCode::BANK_TRANSFER_MAPPING_REUSED))
    269             }
    270         }
    271     }
    272 
    273     fn support_account_check(&self) -> bool {
    274         false
    275     }
    276 }
    277 
    278 impl Revenue for CyclosApi {
    279     async fn history(&self, params: History) -> ApiResult<RevenueIncomingHistory> {
    280         Ok(RevenueIncomingHistory {
    281             incoming_transactions: db::revenue_history(
    282                 &self.pool,
    283                 &params,
    284                 &self.currency,
    285                 &self.root,
    286                 || self.in_channel.subscribe(),
    287             )
    288             .await?,
    289             credit_account: self.payto.as_uri(),
    290         })
    291     }
    292 }
    293 
    294 impl PreparedTransfer for CyclosApi {
    295     fn supported_formats(&self) -> &[SubjectFormat] {
    296         &[SubjectFormat::SIMPLE]
    297     }
    298 
    299     async fn registration(&self, req: RegistrationRequest) -> ApiResult<RegistrationResponse> {
    300         let creditor = CyclosPayto::try_from(&req.credit_account)?;
    301         if *creditor != *self.payto {
    302             return Err(failure_code(ErrorCode::BANK_UNKNOWN_CREDITOR));
    303         }
    304         match db::transfer_register(&self.pool, &req).await? {
    305             db::RegistrationResult::Success => {
    306                 let simple = TransferSubject::Simple {
    307                     credit_amount: req.credit_amount,
    308                     subject: if req.authorization_pub == req.account_pub && !req.recurrent {
    309                         fmt_in_subject(req.r#type.into(), &req.account_pub).to_string()
    310                     } else {
    311                         fmt_in_subject(IncomingType::map, &req.authorization_pub).to_string()
    312                     },
    313                 };
    314                 ApiResult::Ok(RegistrationResponse {
    315                     subjects: vec![simple],
    316                     expiration: TalerTimestamp::Never,
    317                 })
    318             }
    319             db::RegistrationResult::ReservePubReuse => {
    320                 ApiResult::Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT))
    321             }
    322         }
    323     }
    324 
    325     async fn unregistration(&self, req: Unregistration) -> ApiResult<bool> {
    326         Ok(db::transfer_unregister(&self.pool, &req).await?)
    327     }
    328 }
    329 
    330 #[cfg(test)]
    331 mod test {
    332     use std::sync::{
    333         Arc, LazyLock,
    334         atomic::{AtomicI64, Ordering},
    335     };
    336 
    337     use compact_str::CompactString;
    338     use jiff::Timestamp;
    339     use sqlx::{PgPool, Row as _, postgres::PgRow};
    340     use taler_api::{
    341         api::TalerRouter as _,
    342         auth::AuthMethod,
    343         db::TypeHelper as _,
    344         subject::{IncomingSubject, OutgoingSubject},
    345     };
    346     use taler_common::{
    347         api::{
    348             EddsaPublicKey,
    349             prepared::PreparedTransferConfig,
    350             revenue::RevenueConfig,
    351             wire::{TransferState, WireConfig},
    352         },
    353         db::IncomingType,
    354         types::{
    355             amount::{Currency, decimal},
    356             payto::{PaytoURI, payto},
    357         },
    358     };
    359     use taler_test_utils::{
    360         Router,
    361         db::db_test_setup,
    362         routine::{
    363             Status, admin_add_incoming_routine, in_history_routine, out_history_routine,
    364             registration_routine, revenue_routine, transfer_routine,
    365         },
    366         server::TestServer as _,
    367         tasks,
    368     };
    369 
    370     use crate::{
    371         api::CyclosApi,
    372         constants::CONFIG_SOURCE,
    373         db::{self, TxIn, TxOutKind},
    374         payto::{FullCyclosPayto, cyclos_payto},
    375     };
    376 
    377     static PAYTO: LazyLock<FullCyclosPayto> = LazyLock::new(|| {
    378         cyclos_payto("payto://cyclos/localhost/7762070814178012479?receiver-name=Smith")
    379     });
    380     static EXCHANGE: LazyLock<PaytoURI> = LazyLock::new(|| PAYTO.as_uri());
    381     static UNKNOWN: LazyLock<PaytoURI> = LazyLock::new(|| {
    382         payto("payto://cyclos/localhost/7762070814178012478?receiver-name=Unknown")
    383     });
    384 
    385     async fn setup() -> (Router, PgPool) {
    386         let (_, pool) = db_test_setup(CONFIG_SOURCE).await;
    387         let api = Arc::new(CyclosApi::start(
    388             pool.clone(),
    389             CompactString::const_new("localhost"),
    390             PAYTO.clone(),
    391             Currency::TEST,
    392         ));
    393         let server = Router::new()
    394             .wire_gateway(api.clone(), AuthMethod::None)
    395             .prepared_transfer(api.clone())
    396             .revenue(api, AuthMethod::None)
    397             .finalize();
    398 
    399         (server, pool)
    400     }
    401 
    402     #[tokio::test]
    403     async fn config() {
    404         let (server, _) = setup().await;
    405         server
    406             .get("/taler-wire-gateway/config")
    407             .await
    408             .assert_ok_json::<WireConfig>();
    409         server
    410             .get("/taler-prepared-transfer/config")
    411             .await
    412             .assert_ok_json::<PreparedTransferConfig>();
    413         server
    414             .get("/taler-revenue/config")
    415             .await
    416             .assert_ok_json::<RevenueConfig>();
    417     }
    418 
    419     #[tokio::test]
    420     async fn transfer() {
    421         let (server, _) = setup().await;
    422         transfer_routine(
    423             &server.prefix("/taler-wire-gateway"),
    424             TransferState::pending,
    425             &EXCHANGE,
    426         )
    427         .await;
    428     }
    429 
    430     static CODE: AtomicI64 = AtomicI64::new(0);
    431 
    432     async fn r#in(db: &PgPool, subject: Option<IncomingSubject>) {
    433         let now = Timestamp::now();
    434         db::register_tx_in(
    435             &mut db.acquire().await.unwrap(),
    436             &TxIn {
    437                 transfer_id: CODE.fetch_add(1, Ordering::Relaxed),
    438                 tx_id: None,
    439                 amount: decimal("10"),
    440                 subject: "subject".to_owned(),
    441                 debtor_id: 31000163100000000,
    442                 debtor_name: "Name".into(),
    443                 valued_at: Timestamp::now(),
    444             },
    445             &subject,
    446             &now,
    447         )
    448         .await
    449         .unwrap();
    450     }
    451 
    452     async fn in_malformed(db: &PgPool) {
    453         r#in(db, None).await
    454     }
    455 
    456     async fn in_talerable(db: &PgPool) {
    457         r#in(db, Some(IncomingSubject::Reserve(EddsaPublicKey::rand()))).await
    458     }
    459 
    460     async fn out(db: &PgPool, kind: &TxOutKind) {
    461         let i = CODE.fetch_add(1, Ordering::Relaxed);
    462         let now = Timestamp::now();
    463         db::register_tx_out(
    464             &mut db.acquire().await.unwrap(),
    465             &db::TxOut {
    466                 transfer_id: i,
    467                 tx_id: if i % 2 == 0 { Some(i % 2) } else { None },
    468                 amount: decimal("10"),
    469                 subject: "subject".to_owned(),
    470                 creditor_id: 31000163100000000,
    471                 creditor_name: "Name".into(),
    472                 valued_at: now,
    473             },
    474             kind,
    475             &now,
    476         )
    477         .await
    478         .unwrap();
    479     }
    480 
    481     async fn out_talerable(db: &PgPool) {
    482         out(db, &TxOutKind::Talerable(OutgoingSubject::rand())).await
    483     }
    484 
    485     async fn out_bounce(db: &PgPool) {
    486         out(db, &TxOutKind::Bounce(CODE.load(Ordering::Relaxed))).await
    487     }
    488 
    489     async fn out_malformed(db: &PgPool) {
    490         out(db, &TxOutKind::Simple).await
    491     }
    492 
    493     #[tokio::test]
    494     async fn outgoing_history() {
    495         let (server, db) = &setup().await;
    496 
    497         out_history_routine(
    498             &server.prefix("/taler-wire-gateway"),
    499             tasks!({ out_talerable(db).await }),
    500             tasks!(
    501                 { out_bounce(db).await },
    502                 { out_malformed(db).await },
    503                 { in_malformed(db).await },
    504                 { in_talerable(db).await }
    505             ),
    506         )
    507         .await;
    508     }
    509 
    510     #[tokio::test]
    511     async fn admin_add_incoming() {
    512         let (server, _) = setup().await;
    513         admin_add_incoming_routine(
    514             &server.prefix("/taler-wire-gateway"),
    515             &server.prefix("/taler-prepared-transfer"),
    516             &EXCHANGE,
    517             &EXCHANGE,
    518             true,
    519         )
    520         .await;
    521     }
    522 
    523     #[tokio::test]
    524     async fn in_history() {
    525         let (server, db) = &setup().await;
    526         in_history_routine(
    527             &server.prefix("/taler-wire-gateway"),
    528             &server.prefix("/taler-prepared-transfer"),
    529             &EXCHANGE,
    530             &EXCHANGE,
    531             true,
    532             tasks!({ in_talerable(db).await }),
    533             tasks!(
    534                 { out_malformed(db).await },
    535                 { out_talerable(db).await },
    536                 { out_bounce(db).await },
    537                 { in_malformed(db).await }
    538             ),
    539         )
    540         .await;
    541     }
    542 
    543     #[tokio::test]
    544     async fn revenue() {
    545         let (server, db) = &setup().await;
    546         revenue_routine(
    547             &server.prefix("/taler-wire-gateway"),
    548             &server.prefix("/taler-revenue"),
    549             &EXCHANGE,
    550             true,
    551             tasks!({ in_malformed(db).await }, { in_talerable(db).await },),
    552             tasks!({ out_malformed(db).await }, { out_talerable(db).await }, {
    553                 out_bounce(db).await
    554             }),
    555         )
    556         .await;
    557     }
    558 
    559     async fn check_in(pool: &PgPool) -> Vec<Status> {
    560         sqlx::query(
    561             "
    562             SELECT pending_recurrent_in.authorization_pub IS NOT NULL, bounced.tx_in_id IS NOT NULL, type, metadata
    563             FROM tx_in
    564                 LEFT JOIN taler_in USING (tx_in_id)
    565                 LEFT JOIN pending_recurrent_in USING (tx_in_id)
    566                 LEFT JOIN bounced USING (tx_in_id)
    567             ORDER BY tx_in.tx_in_id
    568         ",
    569         )
    570         .try_map(|r: PgRow| {
    571             Ok(
    572                 if r.try_get_flag(0)? {
    573                     Status::Pending
    574                 } else if r.try_get_flag(1)? {
    575                     Status::Bounced
    576                 } else {
    577                     match r.try_get(2)? {
    578                         None => Status::Simple,
    579                         Some(IncomingType::reserve) => Status::Reserve(r.try_get(3)?),
    580                         Some(IncomingType::kyc) => Status::Kyc(r.try_get(3)?),
    581                         Some(e) => unreachable!("{e:?}")
    582                     }
    583                 }
    584             )
    585         })
    586         .fetch_all(pool)
    587         .await
    588         .unwrap()
    589     }
    590 
    591     #[tokio::test]
    592     async fn registration() {
    593         let (server, pool) = setup().await;
    594         registration_routine(
    595             &server.prefix("/taler-wire-gateway"),
    596             &server.prefix("/taler-prepared-transfer"),
    597             &EXCHANGE,
    598             &EXCHANGE,
    599             &UNKNOWN,
    600             || check_in(&pool),
    601         )
    602         .await;
    603     }
    604 }