api.rs (19395B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use compact_str::CompactString; 18 use jiff::Timestamp; 19 use taler_api::{ 20 api::{TalerApi, prepared::PreparedTransfer, revenue::Revenue, wire::WireGateway}, 21 error::{ApiResult, failure_code}, 22 subject::{IncomingSubject, fmt_in_subject}, 23 }; 24 use taler_common::{ 25 api::{ 26 params::{History, Page}, 27 prepared::{ 28 RegistrationRequest, RegistrationResponse, SubjectFormat, TransferSubject, 29 Unregistration, 30 }, 31 revenue::RevenueIncomingHistory, 32 wire::{ 33 AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddMappedRequest, 34 IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse, 35 TransferState, TransferStatus, 36 }, 37 }, 38 db::IncomingType, 39 error_code::ErrorCode, 40 types::{amount::Currency, timestamp::TalerTimestamp}, 41 }; 42 use tokio::sync::watch::Sender; 43 44 use crate::{ 45 db::{self, AddIncomingResult, Transfer, TxInAdmin}, 46 payto::{CyclosPayto, FullCyclosPayto}, 47 }; 48 49 pub struct CyclosApi { 50 pub pool: sqlx::PgPool, 51 pub currency: Currency, 52 pub payto: FullCyclosPayto, 53 pub in_channel: Sender<i64>, 54 pub taler_in_channel: Sender<i64>, 55 pub out_channel: Sender<i64>, 56 pub taler_out_channel: Sender<i64>, 57 pub root: CompactString, 58 } 59 60 impl CyclosApi { 61 pub fn start( 62 pool: sqlx::PgPool, 63 root: CompactString, 64 payto: FullCyclosPayto, 65 currency: Currency, 66 ) -> Self { 67 let in_channel = Sender::new(0); 68 let taler_in_channel = Sender::new(0); 69 let out_channel = Sender::new(0); 70 let taler_out_channel = Sender::new(0); 71 let tmp = Self { 72 pool: pool.clone(), 73 payto, 74 currency, 75 root, 76 in_channel: in_channel.clone(), 77 taler_in_channel: taler_in_channel.clone(), 78 out_channel: out_channel.clone(), 79 taler_out_channel: taler_out_channel.clone(), 80 }; 81 tokio::spawn(db::notification_listener( 82 pool, 83 in_channel, 84 taler_in_channel, 85 out_channel, 86 taler_out_channel, 87 )); 88 tmp 89 } 90 } 91 92 impl TalerApi for CyclosApi { 93 fn currency(&self) -> Currency { 94 self.currency 95 } 96 97 fn implementation(&self) -> &'static str { 98 "urn:net:taler:specs:taler-cyclos:taler-rust" 99 } 100 } 101 102 impl WireGateway for CyclosApi { 103 async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> { 104 let creditor = FullCyclosPayto::try_from(&req.credit_account)?; 105 let result = db::make_transfer( 106 &self.pool, 107 &Transfer { 108 request_uid: req.request_uid, 109 amount: req.amount.decimal(), 110 exchange_base_url: req.exchange_base_url, 111 metadata: req.metadata, 112 wtid: req.wtid, 113 creditor_id: *creditor.id, 114 creditor_name: creditor.name, 115 }, 116 &Timestamp::now(), 117 ) 118 .await?; 119 match result { 120 db::TransferResult::Success { id, initiated_at } => Ok(TransferResponse { 121 timestamp: initiated_at.into(), 122 row_id: id, 123 }), 124 db::TransferResult::RequestUidReuse => { 125 Err(failure_code(ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED)) 126 } 127 db::TransferResult::WtidReuse => { 128 Err(failure_code(ErrorCode::BANK_TRANSFER_WTID_REUSED)) 129 } 130 } 131 } 132 133 async fn transfer_page( 134 &self, 135 page: Page, 136 status: Option<TransferState>, 137 ) -> ApiResult<TransferList> { 138 Ok(TransferList { 139 transfers: db::transfer_page(&self.pool, &status, &self.currency, &self.root, &page) 140 .await?, 141 debit_account: self.payto.as_uri(), 142 }) 143 } 144 145 async fn transfer_by_id(&self, id: u64) -> ApiResult<Option<TransferStatus>> { 146 Ok(db::transfer_by_id(&self.pool, id, &self.currency, &self.root).await?) 147 } 148 149 async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> { 150 Ok(OutgoingHistory { 151 outgoing_transactions: db::outgoing_history( 152 &self.pool, 153 ¶ms, 154 &self.currency, 155 &self.root, 156 || self.taler_out_channel.subscribe(), 157 ) 158 .await?, 159 debit_account: self.payto.as_uri(), 160 }) 161 } 162 163 async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> { 164 Ok(IncomingHistory { 165 incoming_transactions: db::incoming_history( 166 &self.pool, 167 ¶ms, 168 &self.currency, 169 &self.root, 170 || self.taler_in_channel.subscribe(), 171 ) 172 .await?, 173 credit_account: self.payto.as_uri(), 174 }) 175 } 176 177 async fn add_incoming_reserve( 178 &self, 179 req: AddIncomingRequest, 180 ) -> ApiResult<AddIncomingResponse> { 181 let debtor = FullCyclosPayto::try_from(&req.debit_account)?; 182 let res = db::register_tx_in_admin( 183 &self.pool, 184 &TxInAdmin { 185 amount: req.amount.decimal(), 186 subject: format!("Admin incoming {}", req.reserve_pub), 187 debtor_id: *debtor.id, 188 debtor_name: debtor.name, 189 metadata: IncomingSubject::Reserve(req.reserve_pub), 190 }, 191 &Timestamp::now(), 192 ) 193 .await?; 194 match res { 195 AddIncomingResult::Success { 196 row_id, valued_at, .. 197 } => Ok(AddIncomingResponse { 198 row_id, 199 timestamp: valued_at.into(), 200 }), 201 AddIncomingResult::ReservePubReuse => { 202 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) 203 } 204 AddIncomingResult::UnknownMapping | AddIncomingResult::MappingReuse => { 205 unreachable!("mapping unused") 206 } 207 } 208 } 209 210 async fn add_incoming_kyc(&self, req: AddKycauthRequest) -> ApiResult<AddIncomingResponse> { 211 let debtor = FullCyclosPayto::try_from(&req.debit_account)?; 212 let res = db::register_tx_in_admin( 213 &self.pool, 214 &TxInAdmin { 215 amount: req.amount.decimal(), 216 subject: format!("Admin incoming KYC:{}", req.account_pub), 217 debtor_id: *debtor.id, 218 debtor_name: debtor.name, 219 metadata: IncomingSubject::Kyc(req.account_pub), 220 }, 221 &Timestamp::now(), 222 ) 223 .await?; 224 match res { 225 AddIncomingResult::Success { 226 row_id, valued_at, .. 227 } => Ok(AddIncomingResponse { 228 row_id, 229 timestamp: valued_at.into(), 230 }), 231 AddIncomingResult::ReservePubReuse => { 232 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) 233 } 234 AddIncomingResult::UnknownMapping | AddIncomingResult::MappingReuse => { 235 unreachable!("mapping unused") 236 } 237 } 238 } 239 240 async fn add_incoming_mapped(&self, req: AddMappedRequest) -> ApiResult<AddIncomingResponse> { 241 let debtor = FullCyclosPayto::try_from(&req.debit_account)?; 242 let res = db::register_tx_in_admin( 243 &self.pool, 244 &TxInAdmin { 245 amount: req.amount.decimal(), 246 subject: format!("Admin incoming MAP:{}", req.authorization_pub), 247 debtor_id: *debtor.id, 248 debtor_name: debtor.name, 249 metadata: IncomingSubject::Map(req.authorization_pub), 250 }, 251 &Timestamp::now(), 252 ) 253 .await?; 254 match res { 255 AddIncomingResult::Success { 256 row_id, valued_at, .. 257 } => Ok(AddIncomingResponse { 258 row_id, 259 timestamp: valued_at.into(), 260 }), 261 AddIncomingResult::ReservePubReuse => { 262 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) 263 } 264 AddIncomingResult::UnknownMapping => { 265 Err(failure_code(ErrorCode::BANK_TRANSFER_MAPPING_UNKNOWN)) 266 } 267 AddIncomingResult::MappingReuse => { 268 Err(failure_code(ErrorCode::BANK_TRANSFER_MAPPING_REUSED)) 269 } 270 } 271 } 272 273 fn support_account_check(&self) -> bool { 274 false 275 } 276 } 277 278 impl Revenue for CyclosApi { 279 async fn history(&self, params: History) -> ApiResult<RevenueIncomingHistory> { 280 Ok(RevenueIncomingHistory { 281 incoming_transactions: db::revenue_history( 282 &self.pool, 283 ¶ms, 284 &self.currency, 285 &self.root, 286 || self.in_channel.subscribe(), 287 ) 288 .await?, 289 credit_account: self.payto.as_uri(), 290 }) 291 } 292 } 293 294 impl PreparedTransfer for CyclosApi { 295 fn supported_formats(&self) -> &[SubjectFormat] { 296 &[SubjectFormat::SIMPLE] 297 } 298 299 async fn registration(&self, req: RegistrationRequest) -> ApiResult<RegistrationResponse> { 300 let creditor = CyclosPayto::try_from(&req.credit_account)?; 301 if *creditor != *self.payto { 302 return Err(failure_code(ErrorCode::BANK_UNKNOWN_CREDITOR)); 303 } 304 match db::transfer_register(&self.pool, &req).await? { 305 db::RegistrationResult::Success => { 306 let simple = TransferSubject::Simple { 307 credit_amount: req.credit_amount, 308 subject: if req.authorization_pub == req.account_pub && !req.recurrent { 309 fmt_in_subject(req.r#type.into(), &req.account_pub).to_string() 310 } else { 311 fmt_in_subject(IncomingType::map, &req.authorization_pub).to_string() 312 }, 313 }; 314 ApiResult::Ok(RegistrationResponse { 315 subjects: vec![simple], 316 expiration: TalerTimestamp::Never, 317 }) 318 } 319 db::RegistrationResult::ReservePubReuse => { 320 ApiResult::Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) 321 } 322 } 323 } 324 325 async fn unregistration(&self, req: Unregistration) -> ApiResult<bool> { 326 Ok(db::transfer_unregister(&self.pool, &req).await?) 327 } 328 } 329 330 #[cfg(test)] 331 mod test { 332 use std::sync::{ 333 Arc, LazyLock, 334 atomic::{AtomicI64, Ordering}, 335 }; 336 337 use compact_str::CompactString; 338 use jiff::Timestamp; 339 use sqlx::{PgPool, Row as _, postgres::PgRow}; 340 use taler_api::{ 341 api::TalerRouter as _, 342 auth::AuthMethod, 343 db::TypeHelper as _, 344 subject::{IncomingSubject, OutgoingSubject}, 345 }; 346 use taler_common::{ 347 api::{ 348 EddsaPublicKey, 349 prepared::PreparedTransferConfig, 350 revenue::RevenueConfig, 351 wire::{TransferState, WireConfig}, 352 }, 353 db::IncomingType, 354 types::{ 355 amount::{Currency, decimal}, 356 payto::{PaytoURI, payto}, 357 }, 358 }; 359 use taler_test_utils::{ 360 Router, 361 db::db_test_setup, 362 routine::{ 363 Status, admin_add_incoming_routine, in_history_routine, out_history_routine, 364 registration_routine, revenue_routine, transfer_routine, 365 }, 366 server::TestServer as _, 367 tasks, 368 }; 369 370 use crate::{ 371 api::CyclosApi, 372 constants::CONFIG_SOURCE, 373 db::{self, TxIn, TxOutKind}, 374 payto::{FullCyclosPayto, cyclos_payto}, 375 }; 376 377 static PAYTO: LazyLock<FullCyclosPayto> = LazyLock::new(|| { 378 cyclos_payto("payto://cyclos/localhost/7762070814178012479?receiver-name=Smith") 379 }); 380 static EXCHANGE: LazyLock<PaytoURI> = LazyLock::new(|| PAYTO.as_uri()); 381 static UNKNOWN: LazyLock<PaytoURI> = LazyLock::new(|| { 382 payto("payto://cyclos/localhost/7762070814178012478?receiver-name=Unknown") 383 }); 384 385 async fn setup() -> (Router, PgPool) { 386 let (_, pool) = db_test_setup(CONFIG_SOURCE).await; 387 let api = Arc::new(CyclosApi::start( 388 pool.clone(), 389 CompactString::const_new("localhost"), 390 PAYTO.clone(), 391 Currency::TEST, 392 )); 393 let server = Router::new() 394 .wire_gateway(api.clone(), AuthMethod::None) 395 .prepared_transfer(api.clone()) 396 .revenue(api, AuthMethod::None) 397 .finalize(); 398 399 (server, pool) 400 } 401 402 #[tokio::test] 403 async fn config() { 404 let (server, _) = setup().await; 405 server 406 .get("/taler-wire-gateway/config") 407 .await 408 .assert_ok_json::<WireConfig>(); 409 server 410 .get("/taler-prepared-transfer/config") 411 .await 412 .assert_ok_json::<PreparedTransferConfig>(); 413 server 414 .get("/taler-revenue/config") 415 .await 416 .assert_ok_json::<RevenueConfig>(); 417 } 418 419 #[tokio::test] 420 async fn transfer() { 421 let (server, _) = setup().await; 422 transfer_routine( 423 &server.prefix("/taler-wire-gateway"), 424 TransferState::pending, 425 &EXCHANGE, 426 ) 427 .await; 428 } 429 430 static CODE: AtomicI64 = AtomicI64::new(0); 431 432 async fn r#in(db: &PgPool, subject: Option<IncomingSubject>) { 433 let now = Timestamp::now(); 434 db::register_tx_in( 435 &mut db.acquire().await.unwrap(), 436 &TxIn { 437 transfer_id: CODE.fetch_add(1, Ordering::Relaxed), 438 tx_id: None, 439 amount: decimal("10"), 440 subject: "subject".to_owned(), 441 debtor_id: 31000163100000000, 442 debtor_name: "Name".into(), 443 valued_at: Timestamp::now(), 444 }, 445 &subject, 446 &now, 447 ) 448 .await 449 .unwrap(); 450 } 451 452 async fn in_malformed(db: &PgPool) { 453 r#in(db, None).await 454 } 455 456 async fn in_talerable(db: &PgPool) { 457 r#in(db, Some(IncomingSubject::Reserve(EddsaPublicKey::rand()))).await 458 } 459 460 async fn out(db: &PgPool, kind: &TxOutKind) { 461 let i = CODE.fetch_add(1, Ordering::Relaxed); 462 let now = Timestamp::now(); 463 db::register_tx_out( 464 &mut db.acquire().await.unwrap(), 465 &db::TxOut { 466 transfer_id: i, 467 tx_id: if i % 2 == 0 { Some(i % 2) } else { None }, 468 amount: decimal("10"), 469 subject: "subject".to_owned(), 470 creditor_id: 31000163100000000, 471 creditor_name: "Name".into(), 472 valued_at: now, 473 }, 474 kind, 475 &now, 476 ) 477 .await 478 .unwrap(); 479 } 480 481 async fn out_talerable(db: &PgPool) { 482 out(db, &TxOutKind::Talerable(OutgoingSubject::rand())).await 483 } 484 485 async fn out_bounce(db: &PgPool) { 486 out(db, &TxOutKind::Bounce(CODE.load(Ordering::Relaxed))).await 487 } 488 489 async fn out_malformed(db: &PgPool) { 490 out(db, &TxOutKind::Simple).await 491 } 492 493 #[tokio::test] 494 async fn outgoing_history() { 495 let (server, db) = &setup().await; 496 497 out_history_routine( 498 &server.prefix("/taler-wire-gateway"), 499 tasks!({ out_talerable(db).await }), 500 tasks!( 501 { out_bounce(db).await }, 502 { out_malformed(db).await }, 503 { in_malformed(db).await }, 504 { in_talerable(db).await } 505 ), 506 ) 507 .await; 508 } 509 510 #[tokio::test] 511 async fn admin_add_incoming() { 512 let (server, _) = setup().await; 513 admin_add_incoming_routine( 514 &server.prefix("/taler-wire-gateway"), 515 &server.prefix("/taler-prepared-transfer"), 516 &EXCHANGE, 517 &EXCHANGE, 518 true, 519 ) 520 .await; 521 } 522 523 #[tokio::test] 524 async fn in_history() { 525 let (server, db) = &setup().await; 526 in_history_routine( 527 &server.prefix("/taler-wire-gateway"), 528 &server.prefix("/taler-prepared-transfer"), 529 &EXCHANGE, 530 &EXCHANGE, 531 true, 532 tasks!({ in_talerable(db).await }), 533 tasks!( 534 { out_malformed(db).await }, 535 { out_talerable(db).await }, 536 { out_bounce(db).await }, 537 { in_malformed(db).await } 538 ), 539 ) 540 .await; 541 } 542 543 #[tokio::test] 544 async fn revenue() { 545 let (server, db) = &setup().await; 546 revenue_routine( 547 &server.prefix("/taler-wire-gateway"), 548 &server.prefix("/taler-revenue"), 549 &EXCHANGE, 550 true, 551 tasks!({ in_malformed(db).await }, { in_talerable(db).await },), 552 tasks!({ out_malformed(db).await }, { out_talerable(db).await }, { 553 out_bounce(db).await 554 }), 555 ) 556 .await; 557 } 558 559 async fn check_in(pool: &PgPool) -> Vec<Status> { 560 sqlx::query( 561 " 562 SELECT pending_recurrent_in.authorization_pub IS NOT NULL, bounced.tx_in_id IS NOT NULL, type, metadata 563 FROM tx_in 564 LEFT JOIN taler_in USING (tx_in_id) 565 LEFT JOIN pending_recurrent_in USING (tx_in_id) 566 LEFT JOIN bounced USING (tx_in_id) 567 ORDER BY tx_in.tx_in_id 568 ", 569 ) 570 .try_map(|r: PgRow| { 571 Ok( 572 if r.try_get_flag(0)? { 573 Status::Pending 574 } else if r.try_get_flag(1)? { 575 Status::Bounced 576 } else { 577 match r.try_get(2)? { 578 None => Status::Simple, 579 Some(IncomingType::reserve) => Status::Reserve(r.try_get(3)?), 580 Some(IncomingType::kyc) => Status::Kyc(r.try_get(3)?), 581 Some(e) => unreachable!("{e:?}") 582 } 583 } 584 ) 585 }) 586 .fetch_all(pool) 587 .await 588 .unwrap() 589 } 590 591 #[tokio::test] 592 async fn registration() { 593 let (server, pool) = setup().await; 594 registration_routine( 595 &server.prefix("/taler-wire-gateway"), 596 &server.prefix("/taler-prepared-transfer"), 597 &EXCHANGE, 598 &EXCHANGE, 599 &UNKNOWN, 600 || check_in(&pool), 601 ) 602 .await; 603 } 604 }