api.rs (15740B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use jiff::Timestamp; 18 use taler_api::{ 19 api::{TalerApi, revenue::Revenue, transfer::PreparedTransfer, wire::WireGateway}, 20 error::{ApiResult, failure, failure_code}, 21 subject::{IncomingSubject, fmt_in_subject}, 22 }; 23 use taler_common::{ 24 api_common::safe_u64, 25 api_params::{History, Page}, 26 api_revenue::RevenueIncomingHistory, 27 api_transfer::{ 28 RegistrationRequest, RegistrationResponse, SubjectFormat, TransferSubject, Unregistration, 29 }, 30 api_wire::{ 31 AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddMappedRequest, 32 IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse, 33 TransferState, TransferStatus, 34 }, 35 db::IncomingType, 36 error_code::ErrorCode, 37 types::{payto::PaytoURI, timestamp::TalerTimestamp, utils::date_to_utc_ts}, 38 }; 39 use tokio::sync::watch::Sender; 40 41 use crate::{ 42 FullHuPayto, 43 constants::CURRENCY, 44 db::{self, AddIncomingResult, Transfer, TxInAdmin}, 45 }; 46 47 pub struct MagnetApi { 48 pub pool: sqlx::PgPool, 49 pub payto: PaytoURI, 50 pub in_channel: Sender<i64>, 51 pub taler_in_channel: Sender<i64>, 52 pub out_channel: Sender<i64>, 53 pub taler_out_channel: Sender<i64>, 54 } 55 56 impl MagnetApi { 57 pub async fn start(pool: sqlx::PgPool, payto: PaytoURI) -> Self { 58 let in_channel = Sender::new(0); 59 let taler_in_channel = Sender::new(0); 60 let out_channel = Sender::new(0); 61 let taler_out_channel = Sender::new(0); 62 let tmp = Self { 63 pool: pool.clone(), 64 payto, 65 in_channel: in_channel.clone(), 66 taler_in_channel: taler_in_channel.clone(), 67 out_channel: out_channel.clone(), 68 taler_out_channel: taler_out_channel.clone(), 69 }; 70 tokio::spawn(db::notification_listener( 71 pool, 72 in_channel, 73 taler_in_channel, 74 out_channel, 75 taler_out_channel, 76 )); 77 tmp 78 } 79 } 80 81 impl TalerApi for MagnetApi { 82 fn currency(&self) -> &str { 83 CURRENCY.as_ref() 84 } 85 86 fn implementation(&self) -> &'static str { 87 "urn:net:taler:specs:taler-magnet-bank:taler-rust" 88 } 89 } 90 91 impl WireGateway for MagnetApi { 92 async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> { 93 let creditor = FullHuPayto::try_from(&req.credit_account)?; 94 let result = db::make_transfer( 95 &self.pool, 96 &Transfer { 97 request_uid: req.request_uid, 98 wtid: req.wtid, 99 amount: req.amount.decimal(), 100 metadata: req.metadata, 101 creditor, 102 exchange_base_url: req.exchange_base_url, 103 }, 104 &Timestamp::now(), 105 ) 106 .await?; 107 match result { 108 db::TransferResult::Success { id, initiated_at } => Ok(TransferResponse { 109 timestamp: initiated_at.into(), 110 row_id: safe_u64(id), 111 }), 112 db::TransferResult::RequestUidReuse => { 113 Err(failure_code(ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED)) 114 } 115 db::TransferResult::WtidReuse => { 116 Err(failure_code(ErrorCode::BANK_TRANSFER_WTID_REUSED)) 117 } 118 } 119 } 120 121 async fn transfer_page( 122 &self, 123 page: Page, 124 status: Option<TransferState>, 125 ) -> ApiResult<TransferList> { 126 Ok(TransferList { 127 transfers: db::transfer_page(&self.pool, &status, &page).await?, 128 debit_account: self.payto.clone(), 129 }) 130 } 131 132 async fn transfer_by_id(&self, id: u64) -> ApiResult<Option<TransferStatus>> { 133 Ok(db::transfer_by_id(&self.pool, id).await?) 134 } 135 136 async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> { 137 Ok(OutgoingHistory { 138 outgoing_transactions: db::outgoing_history(&self.pool, ¶ms, || { 139 self.taler_out_channel.subscribe() 140 }) 141 .await?, 142 debit_account: self.payto.clone(), 143 }) 144 } 145 146 async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> { 147 Ok(IncomingHistory { 148 incoming_transactions: db::incoming_history(&self.pool, ¶ms, || { 149 self.taler_in_channel.subscribe() 150 }) 151 .await?, 152 credit_account: self.payto.clone(), 153 }) 154 } 155 156 async fn add_incoming_reserve( 157 &self, 158 req: AddIncomingRequest, 159 ) -> ApiResult<AddIncomingResponse> { 160 let debtor = FullHuPayto::try_from(&req.debit_account)?; 161 let res = db::register_tx_in_admin( 162 &self.pool, 163 &TxInAdmin { 164 amount: req.amount, 165 subject: format!("Admin incoming {}", req.reserve_pub), 166 debtor, 167 metadata: IncomingSubject::Reserve(req.reserve_pub), 168 }, 169 &Timestamp::now(), 170 ) 171 .await?; 172 match res { 173 AddIncomingResult::Success { 174 row_id, valued_at, .. 175 } => Ok(AddIncomingResponse { 176 row_id: safe_u64(row_id), 177 timestamp: date_to_utc_ts(&valued_at).into(), 178 }), 179 AddIncomingResult::ReservePubReuse => { 180 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) 181 } 182 AddIncomingResult::UnknownMapping | AddIncomingResult::MappingReuse => { 183 unreachable!("mapping not used") 184 } 185 } 186 } 187 188 async fn add_incoming_kyc(&self, req: AddKycauthRequest) -> ApiResult<AddIncomingResponse> { 189 let debtor = FullHuPayto::try_from(&req.debit_account)?; 190 let res = db::register_tx_in_admin( 191 &self.pool, 192 &TxInAdmin { 193 amount: req.amount, 194 subject: format!("Admin incoming KYC:{}", req.account_pub), 195 debtor, 196 metadata: IncomingSubject::Kyc(req.account_pub), 197 }, 198 &Timestamp::now(), 199 ) 200 .await?; 201 match res { 202 AddIncomingResult::Success { 203 row_id, valued_at, .. 204 } => Ok(AddIncomingResponse { 205 row_id: safe_u64(row_id), 206 timestamp: date_to_utc_ts(&valued_at).into(), 207 }), 208 AddIncomingResult::ReservePubReuse => unreachable!("kyc"), 209 AddIncomingResult::UnknownMapping | AddIncomingResult::MappingReuse => { 210 unreachable!("mapping not used") 211 } 212 } 213 } 214 215 async fn add_incoming_mapped(&self, req: AddMappedRequest) -> ApiResult<AddIncomingResponse> { 216 let debtor = FullHuPayto::try_from(&req.debit_account)?; 217 let res = db::register_tx_in_admin( 218 &self.pool, 219 &TxInAdmin { 220 amount: req.amount, 221 subject: format!("Admin incoming MAP:{}", req.authorization_pub), 222 debtor, 223 metadata: IncomingSubject::Map(req.authorization_pub), 224 }, 225 &Timestamp::now(), 226 ) 227 .await?; 228 match res { 229 AddIncomingResult::Success { 230 row_id, valued_at, .. 231 } => Ok(AddIncomingResponse { 232 row_id: safe_u64(row_id), 233 timestamp: date_to_utc_ts(&valued_at).into(), 234 }), 235 AddIncomingResult::ReservePubReuse => { 236 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) 237 } 238 AddIncomingResult::UnknownMapping => { 239 Err(failure_code(ErrorCode::BANK_TRANSFER_MAPPING_UNKNOWN)) 240 } 241 AddIncomingResult::MappingReuse => { 242 Err(failure_code(ErrorCode::BANK_TRANSFER_MAPPING_REUSED)) 243 } 244 } 245 } 246 247 fn support_account_check(&self) -> bool { 248 false 249 } 250 } 251 252 impl Revenue for MagnetApi { 253 async fn history(&self, params: History) -> ApiResult<RevenueIncomingHistory> { 254 Ok(RevenueIncomingHistory { 255 incoming_transactions: db::revenue_history(&self.pool, ¶ms, || { 256 self.in_channel.subscribe() 257 }) 258 .await?, 259 credit_account: self.payto.clone(), 260 }) 261 } 262 } 263 264 impl PreparedTransfer for MagnetApi { 265 fn supported_formats(&self) -> &[SubjectFormat] { 266 &[SubjectFormat::SIMPLE] 267 } 268 269 async fn registration(&self, req: RegistrationRequest) -> ApiResult<RegistrationResponse> { 270 match db::transfer_register(&self.pool, &req).await? { 271 db::RegistrationResult::Success => { 272 let simple = TransferSubject::Simple { 273 credit_amount: req.credit_amount, 274 subject: if req.authorization_pub == req.account_pub && !req.recurrent { 275 fmt_in_subject(req.r#type.into(), &req.account_pub) 276 } else { 277 fmt_in_subject(IncomingType::map, &req.authorization_pub) 278 }, 279 }; 280 ApiResult::Ok(RegistrationResponse { 281 subjects: vec![simple], 282 expiration: TalerTimestamp::Never, 283 }) 284 } 285 db::RegistrationResult::ReservePubReuse => { 286 ApiResult::Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) 287 } 288 } 289 } 290 291 async fn unregistration(&self, req: Unregistration) -> ApiResult<()> { 292 if !db::transfer_unregister(&self.pool, &req).await? { 293 Err(failure( 294 ErrorCode::BANK_TRANSACTION_NOT_FOUND, 295 format!("Prepared transfer '{}' not found", req.authorization_pub), 296 )) 297 } else { 298 Ok(()) 299 } 300 } 301 } 302 303 #[cfg(test)] 304 mod test { 305 306 use std::sync::{Arc, LazyLock}; 307 308 use jiff::{Timestamp, Zoned}; 309 use sqlx::{PgPool, Row as _, postgres::PgRow}; 310 use taler_api::{ 311 api::TalerRouter as _, auth::AuthMethod, db::TypeHelper as _, subject::OutgoingSubject, 312 }; 313 use taler_common::{ 314 api_revenue::RevenueConfig, 315 api_transfer::PreparedTransferConfig, 316 api_wire::{OutgoingHistory, TransferState, WireConfig}, 317 db::IncomingType, 318 types::{ 319 amount::amount, 320 payto::{PaytoURI, payto}, 321 }, 322 }; 323 use taler_test_utils::{ 324 Router, 325 db::db_test_setup, 326 routine::{ 327 Status, admin_add_incoming_routine, in_history_routine, registration_routine, 328 revenue_routine, routine_pagination, transfer_routine, 329 }, 330 server::TestServer, 331 }; 332 333 use crate::{ 334 FullHuPayto, 335 api::MagnetApi, 336 constants::CONFIG_SOURCE, 337 db::{self, TxOutKind}, 338 magnet_api::types::TxStatus, 339 magnet_payto, 340 }; 341 342 static PAYTO: LazyLock<FullHuPayto> = LazyLock::new(|| { 343 magnet_payto("payto://iban/HU02162000031000164800000000?receiver-name=name") 344 }); 345 static ACCOUNT: LazyLock<PaytoURI> = LazyLock::new(|| PAYTO.as_payto()); 346 347 async fn setup() -> (Router, PgPool) { 348 let (_, pool) = db_test_setup(CONFIG_SOURCE).await; 349 let api = Arc::new(MagnetApi::start(pool.clone(), ACCOUNT.clone()).await); 350 let server = Router::new() 351 .wire_gateway(api.clone(), AuthMethod::None) 352 .prepared_transfer(api.clone()) 353 .revenue(api, AuthMethod::None) 354 .finalize(); 355 356 (server, pool) 357 } 358 359 #[tokio::test] 360 async fn config() { 361 let (server, _) = setup().await; 362 server 363 .get("/taler-wire-gateway/config") 364 .await 365 .assert_ok_json::<WireConfig>(); 366 server 367 .get("/taler-prepared-transfer/config") 368 .await 369 .assert_ok_json::<PreparedTransferConfig>(); 370 server 371 .get("/taler-revenue/config") 372 .await 373 .assert_ok_json::<RevenueConfig>(); 374 } 375 376 #[tokio::test] 377 async fn transfer() { 378 let (server, _) = setup().await; 379 transfer_routine( 380 &server, 381 TransferState::pending, 382 &payto("payto://iban/HU02162000031000164800000000?receiver-name=name"), 383 ) 384 .await; 385 } 386 387 #[tokio::test] 388 async fn outgoing_history() { 389 let (server, pool) = setup().await; 390 routine_pagination::<OutgoingHistory>( 391 &server, 392 "/taler-wire-gateway/history/outgoing", 393 async |i| { 394 let mut conn = pool.acquire().await.unwrap(); 395 let now = Zoned::now().date(); 396 db::register_tx_out( 397 &mut conn, 398 &db::TxOut { 399 code: i as u64, 400 amount: amount("EUR:10"), 401 subject: "subject".into(), 402 creditor: PAYTO.clone(), 403 value_date: now, 404 status: TxStatus::Completed, 405 }, 406 &TxOutKind::Talerable(OutgoingSubject::rand()), 407 &Timestamp::now(), 408 ) 409 .await 410 .unwrap(); 411 }, 412 ) 413 .await; 414 } 415 416 #[tokio::test] 417 async fn admin_add_incoming() { 418 let (server, _) = setup().await; 419 admin_add_incoming_routine(&server, &ACCOUNT, true).await; 420 } 421 422 #[tokio::test] 423 async fn in_history() { 424 let (server, _) = setup().await; 425 in_history_routine(&server, &ACCOUNT, true).await; 426 } 427 428 #[tokio::test] 429 async fn revenue() { 430 let (server, _) = setup().await; 431 revenue_routine(&server, &ACCOUNT, true).await; 432 } 433 434 async fn check_in(pool: &PgPool) -> Vec<Status> { 435 sqlx::query( 436 " 437 SELECT pending_recurrent_in.authorization_pub IS NOT NULL, initiated_id IS NOT NULL, type, metadata 438 FROM tx_in 439 LEFT JOIN taler_in USING (tx_in_id) 440 LEFT JOIN pending_recurrent_in USING (tx_in_id) 441 LEFT JOIN bounced USING (tx_in_id) 442 ORDER BY tx_in.tx_in_id 443 ", 444 ) 445 .try_map(|r: PgRow| { 446 Ok( 447 if r.try_get_flag(0)? { 448 Status::Pending 449 } else if r.try_get_flag(1)? { 450 Status::Bounced 451 } else { 452 match r.try_get(2)? { 453 None => Status::Simple, 454 Some(IncomingType::reserve) => Status::Reserve(r.try_get(3)?), 455 Some(IncomingType::kyc) => Status::Kyc(r.try_get(3)?), 456 Some(e) => unreachable!("{e:?}") 457 } 458 } 459 ) 460 }) 461 .fetch_all(pool) 462 .await 463 .unwrap() 464 } 465 466 #[tokio::test] 467 async fn registration() { 468 let (server, pool) = setup().await; 469 registration_routine(&server, &ACCOUNT, || check_in(&pool)).await; 470 } 471 }