api.rs (18407B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use jiff::Timestamp; 18 use taler_api::{ 19 api::{TalerApi, prepared::PreparedTransfer, revenue::Revenue, wire::WireGateway}, 20 error::{ApiResult, failure_code}, 21 subject::{IncomingSubject, fmt_in_subject}, 22 }; 23 use taler_common::{ 24 api::{ 25 params::{History, Page}, 26 prepared::{ 27 RegistrationRequest, RegistrationResponse, SubjectFormat, TransferSubject, 28 Unregistration, 29 }, 30 revenue::RevenueIncomingHistory, 31 wire::{ 32 AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddMappedRequest, 33 IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse, 34 TransferState, TransferStatus, 35 }, 36 }, 37 db::IncomingType, 38 error_code::ErrorCode, 39 types::{amount::Currency, timestamp::TalerTimestamp, utils::date_to_utc_ts}, 40 }; 41 use tokio::sync::watch::Sender; 42 43 use crate::{ 44 FullHuPayto, 45 constants::CURR, 46 db::{self, AddIncomingResult, Transfer, TxInAdmin}, 47 }; 48 49 pub struct MagnetApi { 50 pub pool: sqlx::PgPool, 51 pub payto: FullHuPayto, 52 pub in_channel: Sender<i64>, 53 pub taler_in_channel: Sender<i64>, 54 pub out_channel: Sender<i64>, 55 pub taler_out_channel: Sender<i64>, 56 } 57 58 impl MagnetApi { 59 pub async fn start(pool: sqlx::PgPool, payto: FullHuPayto) -> Self { 60 let in_channel = Sender::new(0); 61 let taler_in_channel = Sender::new(0); 62 let out_channel = Sender::new(0); 63 let taler_out_channel = Sender::new(0); 64 let tmp = Self { 65 pool: pool.clone(), 66 payto, 67 in_channel: in_channel.clone(), 68 taler_in_channel: taler_in_channel.clone(), 69 out_channel: out_channel.clone(), 70 taler_out_channel: taler_out_channel.clone(), 71 }; 72 tokio::spawn(db::notification_listener( 73 pool, 74 in_channel, 75 taler_in_channel, 76 out_channel, 77 taler_out_channel, 78 )); 79 tmp 80 } 81 } 82 83 impl TalerApi for MagnetApi { 84 fn currency(&self) -> Currency { 85 CURR 86 } 87 88 fn implementation(&self) -> &'static str { 89 "urn:net:taler:specs:taler-magnet-bank:taler-rust" 90 } 91 } 92 93 impl WireGateway for MagnetApi { 94 async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> { 95 let creditor = FullHuPayto::try_from(&req.credit_account)?; 96 let result = db::make_transfer( 97 &self.pool, 98 &Transfer { 99 request_uid: req.request_uid, 100 wtid: req.wtid, 101 amount: req.amount.decimal(), 102 metadata: req.metadata, 103 creditor, 104 exchange_base_url: req.exchange_base_url, 105 }, 106 &Timestamp::now(), 107 ) 108 .await?; 109 match result { 110 db::TransferResult::Success { id, initiated_at } => Ok(TransferResponse { 111 timestamp: initiated_at.into(), 112 row_id: id, 113 }), 114 db::TransferResult::RequestUidReuse => { 115 Err(failure_code(ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED)) 116 } 117 db::TransferResult::WtidReuse => { 118 Err(failure_code(ErrorCode::BANK_TRANSFER_WTID_REUSED)) 119 } 120 } 121 } 122 123 async fn transfer_page( 124 &self, 125 page: Page, 126 status: Option<TransferState>, 127 ) -> ApiResult<TransferList> { 128 Ok(TransferList { 129 transfers: db::transfer_page(&self.pool, &status, &page).await?, 130 debit_account: self.payto.as_uri(), 131 }) 132 } 133 134 async fn transfer_by_id(&self, id: u64) -> ApiResult<Option<TransferStatus>> { 135 Ok(db::transfer_by_id(&self.pool, id).await?) 136 } 137 138 async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> { 139 Ok(OutgoingHistory { 140 outgoing_transactions: db::outgoing_history(&self.pool, ¶ms, || { 141 self.taler_out_channel.subscribe() 142 }) 143 .await?, 144 debit_account: self.payto.as_uri(), 145 }) 146 } 147 148 async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> { 149 Ok(IncomingHistory { 150 incoming_transactions: db::incoming_history(&self.pool, ¶ms, || { 151 self.taler_in_channel.subscribe() 152 }) 153 .await?, 154 credit_account: self.payto.as_uri(), 155 }) 156 } 157 158 async fn add_incoming_reserve( 159 &self, 160 req: AddIncomingRequest, 161 ) -> ApiResult<AddIncomingResponse> { 162 let debtor = FullHuPayto::try_from(&req.debit_account)?; 163 let res = db::register_tx_in_admin( 164 &self.pool, 165 &TxInAdmin { 166 amount: req.amount, 167 subject: format!("Admin incoming {}", req.reserve_pub), 168 debtor, 169 metadata: IncomingSubject::Reserve(req.reserve_pub), 170 }, 171 &Timestamp::now(), 172 ) 173 .await?; 174 match res { 175 AddIncomingResult::Success { 176 row_id, valued_at, .. 177 } => Ok(AddIncomingResponse { 178 row_id, 179 timestamp: date_to_utc_ts(&valued_at).into(), 180 }), 181 AddIncomingResult::ReservePubReuse => { 182 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) 183 } 184 AddIncomingResult::UnknownMapping | AddIncomingResult::MappingReuse => { 185 unreachable!("mapping not used") 186 } 187 } 188 } 189 190 async fn add_incoming_kyc(&self, req: AddKycauthRequest) -> ApiResult<AddIncomingResponse> { 191 let debtor = FullHuPayto::try_from(&req.debit_account)?; 192 let res = db::register_tx_in_admin( 193 &self.pool, 194 &TxInAdmin { 195 amount: req.amount, 196 subject: format!("Admin incoming KYC:{}", req.account_pub), 197 debtor, 198 metadata: IncomingSubject::Kyc(req.account_pub), 199 }, 200 &Timestamp::now(), 201 ) 202 .await?; 203 match res { 204 AddIncomingResult::Success { 205 row_id, valued_at, .. 206 } => Ok(AddIncomingResponse { 207 row_id, 208 timestamp: date_to_utc_ts(&valued_at).into(), 209 }), 210 AddIncomingResult::ReservePubReuse => unreachable!("kyc"), 211 AddIncomingResult::UnknownMapping | AddIncomingResult::MappingReuse => { 212 unreachable!("mapping not used") 213 } 214 } 215 } 216 217 async fn add_incoming_mapped(&self, req: AddMappedRequest) -> ApiResult<AddIncomingResponse> { 218 let debtor = FullHuPayto::try_from(&req.debit_account)?; 219 let res = db::register_tx_in_admin( 220 &self.pool, 221 &TxInAdmin { 222 amount: req.amount, 223 subject: format!("Admin incoming MAP:{}", req.authorization_pub), 224 debtor, 225 metadata: IncomingSubject::Map(req.authorization_pub), 226 }, 227 &Timestamp::now(), 228 ) 229 .await?; 230 match res { 231 AddIncomingResult::Success { 232 row_id, valued_at, .. 233 } => Ok(AddIncomingResponse { 234 row_id, 235 timestamp: date_to_utc_ts(&valued_at).into(), 236 }), 237 AddIncomingResult::ReservePubReuse => { 238 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) 239 } 240 AddIncomingResult::UnknownMapping => { 241 Err(failure_code(ErrorCode::BANK_TRANSFER_MAPPING_UNKNOWN)) 242 } 243 AddIncomingResult::MappingReuse => { 244 Err(failure_code(ErrorCode::BANK_TRANSFER_MAPPING_REUSED)) 245 } 246 } 247 } 248 249 fn support_account_check(&self) -> bool { 250 false 251 } 252 } 253 254 impl Revenue for MagnetApi { 255 async fn history(&self, params: History) -> ApiResult<RevenueIncomingHistory> { 256 Ok(RevenueIncomingHistory { 257 incoming_transactions: db::revenue_history(&self.pool, ¶ms, || { 258 self.in_channel.subscribe() 259 }) 260 .await?, 261 credit_account: self.payto.as_uri(), 262 }) 263 } 264 } 265 266 impl PreparedTransfer for MagnetApi { 267 fn supported_formats(&self) -> &[SubjectFormat] { 268 &[SubjectFormat::SIMPLE] 269 } 270 271 async fn registration(&self, req: RegistrationRequest) -> ApiResult<RegistrationResponse> { 272 let creditor = FullHuPayto::try_from(&req.credit_account)?; 273 if *creditor != *self.payto { 274 return Err(failure_code(ErrorCode::BANK_UNKNOWN_CREDITOR)); 275 } 276 match db::transfer_register(&self.pool, &req).await? { 277 db::RegistrationResult::Success => { 278 let simple = TransferSubject::Simple { 279 credit_amount: req.credit_amount, 280 subject: if req.authorization_pub == req.account_pub && !req.recurrent { 281 fmt_in_subject(req.r#type.into(), &req.account_pub).to_string() 282 } else { 283 fmt_in_subject(IncomingType::map, &req.authorization_pub).to_string() 284 }, 285 }; 286 ApiResult::Ok(RegistrationResponse { 287 subjects: vec![simple], 288 expiration: TalerTimestamp::Never, 289 }) 290 } 291 db::RegistrationResult::ReservePubReuse => { 292 ApiResult::Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) 293 } 294 } 295 } 296 297 async fn unregistration(&self, req: Unregistration) -> ApiResult<bool> { 298 Ok(db::transfer_unregister(&self.pool, &req).await?) 299 } 300 } 301 302 #[cfg(test)] 303 mod test { 304 305 use std::sync::{ 306 Arc, LazyLock, 307 atomic::{AtomicU64, Ordering}, 308 }; 309 310 use jiff::{Timestamp, Zoned}; 311 use sqlx::{PgPool, Row as _, postgres::PgRow}; 312 use taler_api::{ 313 api::TalerRouter as _, 314 auth::AuthMethod, 315 db::TypeHelper as _, 316 subject::{IncomingSubject, OutgoingSubject}, 317 }; 318 use taler_common::{ 319 api::{ 320 EddsaPublicKey, 321 prepared::PreparedTransferConfig, 322 revenue::RevenueConfig, 323 wire::{TransferState, WireConfig}, 324 }, 325 db::IncomingType, 326 types::{ 327 amount::amount, 328 payto::{PaytoURI, payto}, 329 }, 330 }; 331 use taler_test_utils::{ 332 Router, 333 db::db_test_setup, 334 routine::{ 335 Status, admin_add_incoming_routine, in_history_routine, out_history_routine, 336 registration_routine, revenue_routine, transfer_routine, 337 }, 338 server::TestServer, 339 tasks, 340 }; 341 342 use crate::{ 343 FullHuPayto, 344 api::MagnetApi, 345 constants::CONFIG_SOURCE, 346 db::{self, TxIn, TxOutKind}, 347 magnet_api::types::TxStatus, 348 magnet_payto, 349 }; 350 351 static PAYTO: LazyLock<FullHuPayto> = LazyLock::new(|| { 352 magnet_payto("payto://iban/HU02162000031000164800000000?receiver-name=Smith") 353 }); 354 static EXCHANGE: LazyLock<PaytoURI> = LazyLock::new(|| PAYTO.as_uri()); 355 static UNKNOWN: LazyLock<PaytoURI> = 356 LazyLock::new(|| payto("payto://iban/HU60162006491000639900000000?receiver-name=Unknown")); 357 358 async fn setup() -> (Router, PgPool) { 359 let (_, pool) = db_test_setup(CONFIG_SOURCE).await; 360 let api = Arc::new(MagnetApi::start(pool.clone(), PAYTO.clone()).await); 361 let server = Router::new() 362 .wire_gateway(api.clone(), AuthMethod::None) 363 .prepared_transfer(api.clone()) 364 .revenue(api, AuthMethod::None) 365 .finalize(); 366 367 (server, pool) 368 } 369 370 #[tokio::test] 371 async fn config() { 372 let (server, _) = setup().await; 373 server 374 .get("/taler-wire-gateway/config") 375 .await 376 .assert_ok_json::<WireConfig>(); 377 server 378 .get("/taler-prepared-transfer/config") 379 .await 380 .assert_ok_json::<PreparedTransferConfig>(); 381 server 382 .get("/taler-revenue/config") 383 .await 384 .assert_ok_json::<RevenueConfig>(); 385 } 386 387 #[tokio::test] 388 async fn transfer() { 389 let (server, _) = setup().await; 390 transfer_routine( 391 &server.prefix("/taler-wire-gateway"), 392 TransferState::pending, 393 &payto("payto://iban/HU02162000031000164800000000?receiver-name=name"), 394 ) 395 .await; 396 } 397 398 static CODE: AtomicU64 = AtomicU64::new(0); 399 400 async fn r#in(db: &PgPool, subject: Option<IncomingSubject>) { 401 db::register_tx_in( 402 &mut db.acquire().await.unwrap(), 403 &TxIn { 404 code: CODE.fetch_add(1, Ordering::Relaxed), 405 amount: amount("EUR:10"), 406 subject: "subject".into(), 407 debtor: magnet_payto( 408 "payto://iban/HU30162000031000163100000000?receiver-name=name", 409 ), 410 value_date: Zoned::now().date(), 411 status: TxStatus::Completed, 412 }, 413 &subject, 414 &Timestamp::now(), 415 ) 416 .await 417 .unwrap(); 418 } 419 420 async fn in_malformed(db: &PgPool) { 421 r#in(db, None).await 422 } 423 424 async fn in_talerable(db: &PgPool) { 425 r#in(db, Some(IncomingSubject::Reserve(EddsaPublicKey::rand()))).await 426 } 427 428 async fn out(db: &PgPool, kind: &TxOutKind) { 429 db::register_tx_out( 430 &mut db.acquire().await.unwrap(), 431 &db::TxOut { 432 code: CODE.fetch_add(1, Ordering::Relaxed), 433 amount: amount("EUR:10"), 434 subject: "subject".into(), 435 creditor: PAYTO.clone(), 436 value_date: Zoned::now().date(), 437 status: TxStatus::Completed, 438 }, 439 kind, 440 &Timestamp::now(), 441 ) 442 .await 443 .unwrap(); 444 } 445 446 async fn out_talerable(db: &PgPool) { 447 out(db, &TxOutKind::Talerable(OutgoingSubject::rand())).await 448 } 449 450 async fn out_bounce(db: &PgPool) { 451 out(db, &TxOutKind::Bounce(CODE.load(Ordering::Relaxed) as u32)).await 452 } 453 454 async fn out_malformed(db: &PgPool) { 455 out(db, &TxOutKind::Simple).await 456 } 457 458 #[tokio::test] 459 async fn outgoing_history() { 460 let (server, db) = &setup().await; 461 out_history_routine( 462 &server.prefix("/taler-wire-gateway"), 463 tasks!({ out_talerable(db).await }), 464 tasks!( 465 { out_bounce(db).await }, 466 { out_malformed(db).await }, 467 { in_malformed(db).await }, 468 { in_talerable(db).await } 469 ), 470 ) 471 .await; 472 } 473 474 #[tokio::test] 475 async fn admin_add_incoming() { 476 let (server, _) = setup().await; 477 admin_add_incoming_routine( 478 &server.prefix("/taler-wire-gateway"), 479 &server.prefix("/taler-prepared-transfer"), 480 &EXCHANGE, 481 &EXCHANGE, 482 true, 483 ) 484 .await; 485 } 486 487 #[tokio::test] 488 async fn in_history() { 489 let (server, db) = &setup().await; 490 in_history_routine( 491 &server.prefix("/taler-wire-gateway"), 492 &server.prefix("/taler-prepared-transfer"), 493 &EXCHANGE, 494 &EXCHANGE, 495 true, 496 tasks!({ in_talerable(db).await }), 497 tasks!( 498 { out_malformed(db).await }, 499 { out_talerable(db).await }, 500 { out_bounce(db).await }, 501 { in_malformed(db).await } 502 ), 503 ) 504 .await; 505 } 506 507 #[tokio::test] 508 async fn revenue() { 509 let (server, db) = &setup().await; 510 revenue_routine( 511 &server.prefix("/taler-wire-gateway"), 512 &server.prefix("/taler-revenue"), 513 &EXCHANGE, 514 true, 515 tasks!({ in_malformed(db).await }, { in_talerable(db).await },), 516 tasks!({ out_malformed(db).await }, { out_talerable(db).await }, { 517 out_bounce(db).await 518 }), 519 ) 520 .await; 521 } 522 523 async fn check_in(pool: &PgPool) -> Vec<Status> { 524 sqlx::query( 525 " 526 SELECT pending_recurrent_in.authorization_pub IS NOT NULL, initiated_id IS NOT NULL, type, metadata 527 FROM tx_in 528 LEFT JOIN taler_in USING (tx_in_id) 529 LEFT JOIN pending_recurrent_in USING (tx_in_id) 530 LEFT JOIN bounced USING (tx_in_id) 531 ORDER BY tx_in.tx_in_id 532 ", 533 ) 534 .try_map(|r: PgRow| { 535 Ok( 536 if r.try_get_flag(0)? { 537 Status::Pending 538 } else if r.try_get_flag(1)? { 539 Status::Bounced 540 } else { 541 match r.try_get(2)? { 542 None => Status::Simple, 543 Some(IncomingType::reserve) => Status::Reserve(r.try_get(3)?), 544 Some(IncomingType::kyc) => Status::Kyc(r.try_get(3)?), 545 Some(e) => unreachable!("{e:?}") 546 } 547 } 548 ) 549 }) 550 .fetch_all(pool) 551 .await 552 .unwrap() 553 } 554 555 #[tokio::test] 556 async fn registration() { 557 let (server, pool) = setup().await; 558 registration_routine( 559 &server.prefix("/taler-wire-gateway"), 560 &server.prefix("/taler-prepared-transfer"), 561 &EXCHANGE, 562 &EXCHANGE, 563 &UNKNOWN, 564 || check_in(&pool), 565 ) 566 .await; 567 } 568 }