api.rs (14685B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::sync::{ 18 Arc, 19 atomic::{AtomicBool, Ordering}, 20 }; 21 22 use axum::{ 23 extract::{Request, State}, 24 http::StatusCode, 25 middleware::Next, 26 response::{IntoResponse as _, Response}, 27 }; 28 use jiff::Timestamp; 29 use sqlx::{PgPool, postgres::PgListener}; 30 use taler_api::{ 31 api::{ 32 TalerApi, 33 prepared::{PreparedTransfer, simple_subject}, 34 revenue::Revenue, 35 wire::WireGateway, 36 }, 37 error::{ApiResult, failure_code, failure_status}, 38 subject::IncomingSubject, 39 }; 40 use taler_common::{ 41 ExpoBackoffDecorr, 42 api::{ 43 params::{History, Page}, 44 prepared::{RegistrationRequest, RegistrationResponse, SubjectFormat, Unregistration}, 45 revenue::RevenueIncomingHistory, 46 wire::{ 47 AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddMappedRequest, 48 IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse, 49 TransferState, TransferStatus, 50 }, 51 }, 52 error_code::ErrorCode, 53 types::{ 54 amount::{Amount, Currency}, 55 payto::PaytoURI, 56 timestamp::TalerTimestamp, 57 }, 58 }; 59 use tokio::{sync::watch::Sender, time::sleep}; 60 use tracing::{debug, error, warn}; 61 62 use crate::{ 63 db::{ 64 self, AddIncomingResult, RegistrationResult, TransferResult, get_status, 65 register_tx_in_admin, revenue_history, transfer, transfer_register, transfer_unregister, 66 }, 67 payto::{BtcPayto, FullBtcPayto}, 68 }; 69 70 pub struct ServerState { 71 pool: PgPool, 72 payto: FullBtcPayto, 73 currency: Currency, 74 status: AtomicBool, 75 in_channel: Sender<i64>, 76 taler_in_channel: Sender<i64>, 77 taler_out_channel: Sender<i64>, 78 } 79 80 pub async fn notification_listener( 81 pool: PgPool, 82 in_channel: Sender<i64>, 83 taler_in_channel: Sender<i64>, 84 taler_out_channel: Sender<i64>, 85 ) -> sqlx::Result<()> { 86 taler_api::notification::notification_listener!(&pool, 87 "tx_in" => (row_id: i64) { 88 in_channel.send_replace(row_id); 89 }, 90 "taler_in" => (row_id: i64) { 91 taler_in_channel.send_replace(row_id); 92 }, 93 "taler_out" => (row_id: i64) { 94 taler_out_channel.send_replace(row_id); 95 } 96 ) 97 } 98 99 impl ServerState { 100 pub async fn start(pool: sqlx::PgPool, payto: FullBtcPayto, currency: Currency) -> Arc<Self> { 101 let in_channel = Sender::new(0); 102 let taler_in_channel = Sender::new(0); 103 let taler_out_channel = Sender::new(0); 104 let tmp = Self { 105 pool: pool.clone(), 106 payto, 107 currency, 108 status: AtomicBool::new(true), 109 in_channel: in_channel.clone(), 110 taler_in_channel: taler_in_channel.clone(), 111 taler_out_channel: taler_out_channel.clone(), 112 }; 113 let state = Arc::new(tmp); 114 tokio::spawn(status_watcher(state.clone())); 115 tokio::spawn(notification_listener( 116 pool, 117 in_channel, 118 taler_in_channel, 119 taler_out_channel, 120 )); 121 state 122 } 123 } 124 125 impl TalerApi for ServerState { 126 fn currency(&self) -> Currency { 127 self.currency 128 } 129 130 fn implementation(&self) -> &'static str { 131 "urn:net:taler:specs:depolymerizer-bitcoin:depolymerization" 132 } 133 } 134 135 async fn add_incoming( 136 db: &PgPool, 137 amount: Amount, 138 debit_account: PaytoURI, 139 subject: &IncomingSubject, 140 ) -> ApiResult<AddIncomingResponse> { 141 let debtor = FullBtcPayto::try_from(&debit_account)?; 142 match register_tx_in_admin(db, &amount, &debtor.0, &Timestamp::now(), subject).await? { 143 AddIncomingResult::Success { 144 row_id, valued_at, .. 145 } => Ok(AddIncomingResponse { 146 row_id, 147 timestamp: valued_at.into(), 148 }), 149 AddIncomingResult::ReservePubReuse => { 150 Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) 151 } 152 AddIncomingResult::MappingReuse => { 153 Err(failure_code(ErrorCode::BANK_TRANSFER_MAPPING_REUSED)) 154 } 155 AddIncomingResult::UnknownMapping => { 156 Err(failure_code(ErrorCode::BANK_TRANSFER_MAPPING_UNKNOWN)) 157 } 158 } 159 } 160 161 impl WireGateway for ServerState { 162 async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> { 163 let creditor = FullBtcPayto::try_from(&req.credit_account)?; 164 match transfer(&self.pool, &creditor, &req).await? { 165 TransferResult::Success(transfer_response) => Ok(transfer_response), 166 TransferResult::RequestUidReuse => { 167 Err(failure_code(ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED)) 168 } 169 TransferResult::WtidReuse => Err(failure_code(ErrorCode::BANK_TRANSFER_WTID_REUSED)), 170 } 171 } 172 173 async fn transfer_page( 174 &self, 175 params: Page, 176 status: Option<TransferState>, 177 ) -> ApiResult<TransferList> { 178 let transfers = db::transfer_page(&self.pool, &status, ¶ms, &self.currency).await?; 179 Ok(TransferList { 180 transfers, 181 debit_account: self.payto.as_uri(), 182 }) 183 } 184 185 async fn transfer_by_id(&self, id: u64) -> ApiResult<Option<TransferStatus>> { 186 let status = db::transfer_by_id(&self.pool, id, &self.currency).await?; 187 Ok(status) 188 } 189 190 async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> { 191 let outgoing_transactions = 192 db::outgoing_history(&self.pool, ¶ms, &self.currency, || { 193 self.taler_out_channel.subscribe() 194 }) 195 .await?; 196 Ok(OutgoingHistory { 197 debit_account: self.payto.as_uri(), 198 outgoing_transactions, 199 }) 200 } 201 202 async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> { 203 let incoming_transactions = 204 db::incoming_history(&self.pool, ¶ms, &self.currency, || { 205 self.taler_in_channel.subscribe() 206 }) 207 .await?; 208 Ok(IncomingHistory { 209 credit_account: self.payto.as_uri(), 210 incoming_transactions, 211 }) 212 } 213 214 async fn add_incoming_reserve( 215 &self, 216 req: AddIncomingRequest, 217 ) -> ApiResult<AddIncomingResponse> { 218 add_incoming( 219 &self.pool, 220 req.amount, 221 req.debit_account, 222 &IncomingSubject::Reserve(req.reserve_pub), 223 ) 224 .await 225 } 226 227 async fn add_incoming_kyc(&self, req: AddKycauthRequest) -> ApiResult<AddIncomingResponse> { 228 add_incoming( 229 &self.pool, 230 req.amount, 231 req.debit_account, 232 &IncomingSubject::Kyc(req.account_pub), 233 ) 234 .await 235 } 236 237 async fn add_incoming_mapped(&self, req: AddMappedRequest) -> ApiResult<AddIncomingResponse> { 238 add_incoming( 239 &self.pool, 240 req.amount, 241 req.debit_account, 242 &IncomingSubject::Map(req.authorization_pub), 243 ) 244 .await 245 } 246 247 fn support_account_check(&self) -> bool { 248 // TODO we might be able to check this ? 249 false 250 } 251 } 252 253 impl Revenue for ServerState { 254 async fn history(&self, params: History) -> ApiResult<RevenueIncomingHistory> { 255 Ok(RevenueIncomingHistory { 256 incoming_transactions: revenue_history(&self.pool, ¶ms, &self.currency, || { 257 self.in_channel.subscribe() 258 }) 259 .await?, 260 credit_account: self.payto.as_uri(), 261 }) 262 } 263 } 264 265 impl PreparedTransfer for ServerState { 266 // TODO bitcoin subject format 267 fn supported_formats(&self) -> &[SubjectFormat] { 268 &[SubjectFormat::SIMPLE] 269 } 270 271 async fn registration(&self, req: RegistrationRequest) -> ApiResult<RegistrationResponse> { 272 let creditor = BtcPayto::try_from(&req.credit_account)?; 273 if creditor.0 != self.payto.0 { 274 return Err(failure_code(ErrorCode::BANK_UNKNOWN_CREDITOR)); 275 } 276 match transfer_register( 277 &self.pool, 278 req.r#type.into(), 279 &req.account_pub, 280 &req.authorization_pub, 281 &req.authorization_sig, 282 req.recurrent, 283 &Timestamp::now(), 284 ) 285 .await? 286 { 287 RegistrationResult::Success => ApiResult::Ok(RegistrationResponse { 288 subjects: vec![simple_subject(req)], 289 expiration: TalerTimestamp::Never, 290 }), 291 RegistrationResult::ReservePubReuse => { 292 ApiResult::Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) 293 } 294 RegistrationResult::SubjectReuse => { 295 ApiResult::Err(failure_code(ErrorCode::BANK_DERIVATION_REUSE)) 296 } 297 } 298 } 299 300 async fn unregistration(&self, req: Unregistration) -> ApiResult<bool> { 301 Ok(transfer_unregister(&self.pool, &req.authorization_pub, &Timestamp::now()).await?) 302 } 303 } 304 305 pub async fn status_middleware( 306 State(state): State<Arc<ServerState>>, 307 request: Request, 308 next: Next, 309 ) -> Response { 310 if !state.status.load(Ordering::Relaxed) { 311 failure_status( 312 ErrorCode::GENERIC_INTERNAL_INVARIANT_FAILURE, 313 "Currency backing is compromised until the transaction reappear", 314 StatusCode::BAD_GATEWAY, 315 ) 316 .into_response() 317 } else { 318 next.run(request).await 319 } 320 } 321 322 /// Listen to backend status change 323 async fn status_watcher(state: Arc<ServerState>) { 324 let mut jitter = ExpoBackoffDecorr::default(); 325 async fn inner( 326 state: &ServerState, 327 jitter: &mut ExpoBackoffDecorr, 328 ) -> Result<(), sqlx::error::Error> { 329 let mut listener = PgListener::connect_with(&state.pool).await?; 330 listener.listen("status").await?; 331 loop { 332 // Sync state 333 if let Some([status]) = get_status(&state.pool).await? { 334 assert!(status < 2); 335 if status == 1 { 336 debug!(target: "status-watcher", "Worker healthy"); 337 } else { 338 debug!(target: "status-watcher", "Worker down"); 339 } 340 state.status.store(status == 1, Ordering::SeqCst); 341 } else { 342 warn!(target: "status-watcher", "Status not setup"); 343 } 344 // Wait for next notification 345 listener.recv().await?; 346 jitter.reset(); 347 } 348 } 349 350 loop { 351 if let Err(err) = inner(&state, &mut jitter).await { 352 error!(target: "status-watcher", "{err}"); 353 sleep(jitter.backoff()).await; 354 } 355 } 356 } 357 358 #[cfg(test)] 359 pub mod test { 360 361 use std::{str::FromStr, sync::LazyLock}; 362 363 use axum::Router; 364 use jiff::Timestamp; 365 use sqlx::PgPool; 366 use taler_api::{api::TalerRouter as _, auth::AuthMethod, subject::OutgoingSubject}; 367 use taler_common::{ 368 api::wire::{TransferState, WireConfig}, 369 types::amount::{Currency, amount}, 370 }; 371 use taler_test_utils::{ 372 db::db_test_setup, 373 routine::{admin_add_incoming_routine, out_history_routine, transfer_routine}, 374 server::TestServer, 375 tasks, 376 }; 377 378 use crate::{ 379 CONFIG_SOURCE, 380 api::ServerState, 381 db::{TxOutKind, sync_out, test::rand_tx_id}, 382 payto::FullBtcPayto, 383 }; 384 385 pub static EXCHANGE: LazyLock<FullBtcPayto> = LazyLock::new(|| { 386 FullBtcPayto::from_str( 387 "payto://bitcoin/1FfmbHfnpaZjKFvyi1okTjJJusN455paPH?receiver-name=Exchange", 388 ) 389 .unwrap() 390 }); 391 392 pub static CLIENT: LazyLock<FullBtcPayto> = LazyLock::new(|| { 393 FullBtcPayto::from_str( 394 "payto://bitcoin/1FfmbHfnpaZjKFvyi1okTjJJusN455paPH?receiver-name=Anonymous", 395 ) 396 .unwrap() 397 }); 398 399 async fn setup() -> (Router, PgPool) { 400 let (_, pool) = db_test_setup(CONFIG_SOURCE).await; 401 let api = ServerState::start( 402 pool.clone(), 403 EXCHANGE.clone(), 404 Currency::from_str("BTC").unwrap(), 405 ) 406 .await; 407 let server = Router::new() 408 .wire_gateway(api.clone(), AuthMethod::None) 409 .prepared_transfer(api.clone()) 410 .revenue(api.clone(), AuthMethod::None) 411 .finalize(); 412 413 (server, pool) 414 } 415 416 #[tokio::test] 417 async fn config() { 418 let (server, _) = setup().await; 419 server 420 .get("/taler-wire-gateway/config") 421 .await 422 .assert_ok_json::<WireConfig>(); 423 } 424 425 #[tokio::test] 426 async fn transfer() { 427 let (server, _) = setup().await; 428 transfer_routine( 429 &server.prefix("/taler-wire-gateway"), 430 TransferState::pending, 431 &CLIENT.as_uri(), 432 ) 433 .await; 434 } 435 436 #[tokio::test] 437 async fn outgoing_history() { 438 let (server, db) = setup().await; 439 out_history_routine( 440 &server.prefix("/taler-wire-gateway"), 441 tasks!({ 442 let sub = &OutgoingSubject::rand(); 443 sync_out( 444 &db, 445 &rand_tx_id(), 446 None, 447 &amount("BTC:10"), 448 &EXCHANGE.0, 449 &TxOutKind::Talerable { 450 wtid: &sub.wtid, 451 url: &sub.exchange_base_url, 452 metadata: sub.metadata.as_deref(), 453 }, 454 &Timestamp::now(), 455 ) 456 .await 457 .unwrap(); 458 }), 459 tasks!(), 460 ) 461 .await; 462 } 463 464 #[tokio::test] 465 async fn admin_add_incoming() { 466 let (server, _) = setup().await; 467 admin_add_incoming_routine( 468 &server.prefix("/taler-wire-gateway"), 469 &server.prefix("/taler-prepared-transfer"), 470 &CLIENT.as_uri(), 471 &EXCHANGE.as_uri(), 472 ) 473 .await; 474 } 475 }