api.rs (16705B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use compact_str::CompactString; 18 use jiff::Timestamp; 19 use taler_api::{ 20 api::{TalerApi, revenue::Revenue, transfer::PreparedTransfer, wire::WireGateway}, 21 error::{ApiResult, failure, failure_code}, 22 subject::{IncomingSubject, fmt_in_subject}, 23 }; 24 use taler_common::{ 25 api_common::safe_u64, 26 api_params::{History, Page}, 27 api_revenue::RevenueIncomingHistory, 28 api_transfer::{ 29 RegistrationRequest, RegistrationResponse, SubjectFormat, TransferSubject, Unregistration, 30 }, 31 api_wire::{ 32 AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddMappedRequest, 33 IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse, 34 TransferState, TransferStatus, 35 }, 36 db::IncomingType, 37 error_code::ErrorCode, 38 types::{amount::Currency, payto::PaytoURI, timestamp::TalerTimestamp}, 39 }; 40 use tokio::sync::watch::Sender; 41 42 use crate::{ 43 db::{self, AddIncomingResult, Transfer, TxInAdmin}, 44 payto::FullCyclosPayto, 45 }; 46 47 pub struct CyclosApi { 48 pub pool: sqlx::PgPool, 49 pub currency: Currency, 50 pub payto: PaytoURI, 51 pub in_channel: Sender<i64>, 52 pub taler_in_channel: Sender<i64>, 53 pub out_channel: Sender<i64>, 54 pub taler_out_channel: Sender<i64>, 55 pub root: CompactString, 56 } 57 58 impl CyclosApi { 59 pub async fn start( 60 pool: sqlx::PgPool, 61 root: CompactString, 62 payto: PaytoURI, 63 currency: Currency, 64 ) -> Self { 65 let in_channel = Sender::new(0); 66 let taler_in_channel = Sender::new(0); 67 let out_channel = Sender::new(0); 68 let taler_out_channel = Sender::new(0); 69 let tmp = Self { 70 pool: pool.clone(), 71 payto, 72 currency, 73 root, 74 in_channel: in_channel.clone(), 75 taler_in_channel: taler_in_channel.clone(), 76 out_channel: out_channel.clone(), 77 taler_out_channel: taler_out_channel.clone(), 78 }; 79 tokio::spawn(db::notification_listener( 80 pool, 81 in_channel, 82 taler_in_channel, 83 out_channel, 84 taler_out_channel, 85 )); 86 tmp 87 } 88 } 89 90 impl TalerApi for CyclosApi { 91 fn currency(&self) -> &str { 92 self.currency.as_ref() 93 } 94 95 fn implementation(&self) -> &'static str { 96 "urn:net:taler:specs:taler-cyclos:taler-rust" 97 } 98 } 99 100 impl WireGateway for CyclosApi { 101 async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> { 102 let creditor = FullCyclosPayto::try_from(&req.credit_account)?; 103 let result = db::make_transfer( 104 &self.pool, 105 &Transfer { 106 request_uid: req.request_uid, 107 amount: req.amount.decimal(), 108 exchange_base_url: req.exchange_base_url, 109 metadata: req.metadata, 110 wtid: req.wtid, 111 creditor_id: *creditor.id, 112 creditor_name: creditor.name, 113 }, 114 &Timestamp::now(), 115 ) 116 .await?; 117 match result { 118 db::TransferResult::Success { id, initiated_at } => Ok(TransferResponse { 119 timestamp: initiated_at.into(), 120 row_id: safe_u64(id), 121 }), 122 db::TransferResult::RequestUidReuse => { 123 Err(failure_code(ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED)) 124 } 125 db::TransferResult::WtidReuse => { 126 Err(failure_code(ErrorCode::BANK_TRANSFER_WTID_REUSED)) 127 } 128 } 129 } 130 131 async fn transfer_page( 132 &self, 133 page: Page, 134 status: Option<TransferState>, 135 ) -> ApiResult<TransferList> { 136 Ok(TransferList { 137 transfers: db::transfer_page(&self.pool, &status, &self.currency, &self.root, &page) 138 .await?, 139 debit_account: self.payto.clone(), 140 }) 141 } 142 143 async fn transfer_by_id(&self, id: u64) -> ApiResult<Option<TransferStatus>> { 144 Ok(db::transfer_by_id(&self.pool, id, &self.currency, &self.root).await?) 145 } 146 147 async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> { 148 Ok(OutgoingHistory { 149 outgoing_transactions: db::outgoing_history( 150 &self.pool, 151 ¶ms, 152 &self.currency, 153 &self.root, 154 || self.taler_out_channel.subscribe(), 155 ) 156 .await?, 157 debit_account: self.payto.clone(), 158 }) 159 } 160 161 async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> { 162 Ok(IncomingHistory { 163 incoming_transactions: db::incoming_history( 164 &self.pool, 165 ¶ms, 166 &self.currency, 167 &self.root, 168 || self.taler_in_channel.subscribe(), 169 ) 170 .await?, 171 credit_account: self.payto.clone(), 172 }) 173 } 174 175 async fn add_incoming_reserve( 176 &self, 177 req: AddIncomingRequest, 178 ) -> ApiResult<AddIncomingResponse> { 179 let debtor = FullCyclosPayto::try_from(&req.debit_account)?; 180 let res = db::register_tx_in_admin( 181 &self.pool, 182 &TxInAdmin { 183 amount: req.amount.decimal(), 184 subject: format!("Admin incoming {}", req.reserve_pub), 185 debtor_id: *debtor.id, 186 debtor_name: debtor.name, 187 metadata: IncomingSubject::Reserve(req.reserve_pub), 188 }, 189 &Timestamp::now(), 190 ) 191 .await?; 192 match res { 193 AddIncomingResult::Success { 194 row_id, valued_at, .. 195 } => Ok(AddIncomingResponse { 196 row_id: safe_u64(row_id), 197 timestamp: valued_at.into(), 198 }), 199 AddIncomingResult::ReservePubReuse => { 200 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) 201 } 202 AddIncomingResult::UnknownMapping | AddIncomingResult::MappingReuse => { 203 unreachable!("mapping unused") 204 } 205 } 206 } 207 208 async fn add_incoming_kyc(&self, req: AddKycauthRequest) -> ApiResult<AddIncomingResponse> { 209 let debtor = FullCyclosPayto::try_from(&req.debit_account)?; 210 let res = db::register_tx_in_admin( 211 &self.pool, 212 &TxInAdmin { 213 amount: req.amount.decimal(), 214 subject: format!("Admin incoming KYC:{}", req.account_pub), 215 debtor_id: *debtor.id, 216 debtor_name: debtor.name, 217 metadata: IncomingSubject::Kyc(req.account_pub), 218 }, 219 &Timestamp::now(), 220 ) 221 .await?; 222 match res { 223 AddIncomingResult::Success { 224 row_id, valued_at, .. 225 } => Ok(AddIncomingResponse { 226 row_id: safe_u64(row_id), 227 timestamp: valued_at.into(), 228 }), 229 AddIncomingResult::ReservePubReuse => { 230 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) 231 } 232 AddIncomingResult::UnknownMapping | AddIncomingResult::MappingReuse => { 233 unreachable!("mapping unused") 234 } 235 } 236 } 237 238 async fn add_incoming_mapped(&self, req: AddMappedRequest) -> ApiResult<AddIncomingResponse> { 239 let debtor = FullCyclosPayto::try_from(&req.debit_account)?; 240 let res = db::register_tx_in_admin( 241 &self.pool, 242 &TxInAdmin { 243 amount: req.amount.decimal(), 244 subject: format!("Admin incoming MAP:{}", req.authorization_pub), 245 debtor_id: *debtor.id, 246 debtor_name: debtor.name, 247 metadata: IncomingSubject::Map(req.authorization_pub), 248 }, 249 &Timestamp::now(), 250 ) 251 .await?; 252 match res { 253 AddIncomingResult::Success { 254 row_id, valued_at, .. 255 } => Ok(AddIncomingResponse { 256 row_id: safe_u64(row_id), 257 timestamp: valued_at.into(), 258 }), 259 AddIncomingResult::ReservePubReuse => { 260 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) 261 } 262 AddIncomingResult::UnknownMapping => { 263 Err(failure_code(ErrorCode::BANK_TRANSFER_MAPPING_UNKNOWN)) 264 } 265 AddIncomingResult::MappingReuse => { 266 Err(failure_code(ErrorCode::BANK_TRANSFER_MAPPING_REUSED)) 267 } 268 } 269 } 270 271 fn support_account_check(&self) -> bool { 272 false 273 } 274 } 275 276 impl Revenue for CyclosApi { 277 async fn history(&self, params: History) -> ApiResult<RevenueIncomingHistory> { 278 Ok(RevenueIncomingHistory { 279 incoming_transactions: db::revenue_history( 280 &self.pool, 281 ¶ms, 282 &self.currency, 283 &self.root, 284 || self.in_channel.subscribe(), 285 ) 286 .await?, 287 credit_account: self.payto.clone(), 288 }) 289 } 290 } 291 292 impl PreparedTransfer for CyclosApi { 293 fn supported_formats(&self) -> &[SubjectFormat] { 294 &[SubjectFormat::SIMPLE] 295 } 296 297 async fn registration(&self, req: RegistrationRequest) -> ApiResult<RegistrationResponse> { 298 match db::transfer_register(&self.pool, &req).await? { 299 db::RegistrationResult::Success => { 300 let simple = TransferSubject::Simple { 301 credit_amount: req.credit_amount, 302 subject: if req.authorization_pub == req.account_pub && !req.recurrent { 303 fmt_in_subject(req.r#type.into(), &req.account_pub) 304 } else { 305 fmt_in_subject(IncomingType::map, &req.authorization_pub) 306 }, 307 }; 308 ApiResult::Ok(RegistrationResponse { 309 subjects: vec![simple], 310 expiration: TalerTimestamp::Never, 311 }) 312 } 313 db::RegistrationResult::ReservePubReuse => { 314 ApiResult::Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) 315 } 316 } 317 } 318 319 async fn unregistration(&self, req: Unregistration) -> ApiResult<()> { 320 if !db::transfer_unregister(&self.pool, &req).await? { 321 Err(failure( 322 ErrorCode::BANK_TRANSACTION_NOT_FOUND, 323 format!("Prepared transfer '{}' not found", req.authorization_pub), 324 )) 325 } else { 326 Ok(()) 327 } 328 } 329 } 330 331 #[cfg(test)] 332 mod test { 333 use std::{ 334 str::FromStr as _, 335 sync::{Arc, LazyLock}, 336 }; 337 338 use compact_str::CompactString; 339 use jiff::Timestamp; 340 use sqlx::{PgPool, Row as _, postgres::PgRow}; 341 use taler_api::{ 342 api::TalerRouter as _, auth::AuthMethod, db::TypeHelper as _, subject::OutgoingSubject, 343 }; 344 use taler_common::{ 345 api_revenue::RevenueConfig, 346 api_transfer::PreparedTransferConfig, 347 api_wire::{OutgoingHistory, TransferState, WireConfig}, 348 db::IncomingType, 349 types::{ 350 amount::{Currency, decimal}, 351 payto::{PaytoURI, payto}, 352 }, 353 }; 354 use taler_test_utils::{ 355 Router, 356 db::db_test_setup, 357 routine::{ 358 Status, admin_add_incoming_routine, in_history_routine, registration_routine, 359 revenue_routine, routine_pagination, transfer_routine, 360 }, 361 server::TestServer as _, 362 }; 363 364 use crate::{ 365 api::CyclosApi, 366 constants::CONFIG_SOURCE, 367 db::{self, TxOutKind}, 368 }; 369 370 static ACCOUNT: LazyLock<PaytoURI> = 371 LazyLock::new(|| payto("payto://cyclos/localhost/7762070814178012479?receiver-name=name")); 372 373 async fn setup() -> (Router, PgPool) { 374 let (_, pool) = db_test_setup(CONFIG_SOURCE).await; 375 let api = Arc::new( 376 CyclosApi::start( 377 pool.clone(), 378 CompactString::const_new("localhost"), 379 ACCOUNT.clone(), 380 Currency::from_str("TEST").unwrap(), 381 ) 382 .await, 383 ); 384 let server = Router::new() 385 .wire_gateway(api.clone(), AuthMethod::None) 386 .prepared_transfer(api.clone()) 387 .revenue(api, AuthMethod::None) 388 .finalize(); 389 390 (server, pool) 391 } 392 393 #[tokio::test] 394 async fn config() { 395 let (server, _) = setup().await; 396 server 397 .get("/taler-wire-gateway/config") 398 .await 399 .assert_ok_json::<WireConfig>(); 400 server 401 .get("/taler-prepared-transfer/config") 402 .await 403 .assert_ok_json::<PreparedTransferConfig>(); 404 server 405 .get("/taler-revenue/config") 406 .await 407 .assert_ok_json::<RevenueConfig>(); 408 } 409 410 #[tokio::test] 411 async fn transfer() { 412 let (server, _) = setup().await; 413 transfer_routine(&server, TransferState::pending, &ACCOUNT).await; 414 } 415 416 #[tokio::test] 417 async fn outgoing_history() { 418 let (server, pool) = setup().await; 419 routine_pagination::<OutgoingHistory>( 420 &server, 421 "/taler-wire-gateway/history/outgoing", 422 async |i| { 423 db::register_tx_out( 424 &mut pool.acquire().await.unwrap(), 425 &db::TxOut { 426 transfer_id: i as i64, 427 tx_id: if i % 2 == 0 { 428 Some((i % 2) as i64) 429 } else { 430 None 431 }, 432 amount: decimal("10"), 433 subject: "subject".to_owned(), 434 creditor_id: 31000163100000000, 435 creditor_name: "Name".into(), 436 valued_at: Timestamp::now(), 437 }, 438 &TxOutKind::Talerable(OutgoingSubject::rand()), 439 &Timestamp::now(), 440 ) 441 .await 442 .unwrap(); 443 }, 444 ) 445 .await; 446 } 447 448 #[tokio::test] 449 async fn admin_add_incoming() { 450 let (server, _) = setup().await; 451 admin_add_incoming_routine(&server, &ACCOUNT, true).await; 452 } 453 454 #[tokio::test] 455 async fn in_history() { 456 let (server, _) = setup().await; 457 in_history_routine(&server, &ACCOUNT, true).await; 458 } 459 460 #[tokio::test] 461 async fn revenue() { 462 let (server, _) = setup().await; 463 revenue_routine(&server, &ACCOUNT, true).await; 464 } 465 466 async fn check_in(pool: &PgPool) -> Vec<Status> { 467 sqlx::query( 468 " 469 SELECT pending_recurrent_in.authorization_pub IS NOT NULL, bounced.tx_in_id IS NOT NULL, type, metadata 470 FROM tx_in 471 LEFT JOIN taler_in USING (tx_in_id) 472 LEFT JOIN pending_recurrent_in USING (tx_in_id) 473 LEFT JOIN bounced USING (tx_in_id) 474 ORDER BY tx_in.tx_in_id 475 ", 476 ) 477 .try_map(|r: PgRow| { 478 Ok( 479 if r.try_get_flag(0)? { 480 Status::Pending 481 } else if r.try_get_flag(1)? { 482 Status::Bounced 483 } else { 484 match r.try_get(2)? { 485 None => Status::Simple, 486 Some(IncomingType::reserve) => Status::Reserve(r.try_get(3)?), 487 Some(IncomingType::kyc) => Status::Kyc(r.try_get(3)?), 488 Some(e) => unreachable!("{e:?}") 489 } 490 } 491 ) 492 }) 493 .fetch_all(pool) 494 .await 495 .unwrap() 496 } 497 498 #[tokio::test] 499 async fn registration() { 500 let (server, pool) = setup().await; 501 registration_routine(&server, &ACCOUNT, || check_in(&pool)).await; 502 } 503 }