api.rs (7935B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::sync::{ 18 Arc, 19 atomic::{AtomicBool, Ordering}, 20 }; 21 22 use axum::{ 23 extract::{Request, State}, 24 http::StatusCode, 25 middleware::Next, 26 response::{IntoResponse as _, Response}, 27 }; 28 use sqlx::{PgPool, Row, postgres::PgListener}; 29 use taler_api::{ 30 api::{TalerApi, wire::WireGateway}, 31 error::{ApiResult, failure, failure_status, not_implemented}, 32 }; 33 use taler_common::{ 34 ExpoBackoffDecorr, 35 api_params::{History, Page}, 36 api_wire::{ 37 AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddKycauthResponse, 38 IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse, 39 TransferState, TransferStatus, 40 }, 41 error_code::ErrorCode, 42 types::{amount::Currency, payto::PaytoURI, timestamp::Timestamp}, 43 }; 44 use tokio::{sync::watch::Sender, time::sleep}; 45 use tracing::error; 46 47 use crate::{ 48 db::{self}, 49 payto::FullBtcPayto, 50 }; 51 52 pub struct ServerState { 53 pool: PgPool, 54 payto: PaytoURI, 55 currency: Currency, 56 status: AtomicBool, 57 taler_in_channel: Sender<i64>, 58 taler_out_channel: Sender<i64>, 59 } 60 61 pub async fn notification_listener( 62 pool: PgPool, 63 taler_in_channel: Sender<i64>, 64 taler_out_channel: Sender<i64>, 65 ) -> sqlx::Result<()> { 66 taler_api::notification::notification_listener!(&pool, 67 "taler_in" => (row_id: i64) { 68 taler_in_channel.send_replace(row_id); 69 }, 70 "taler_out" => (row_id: i64) { 71 taler_out_channel.send_replace(row_id); 72 } 73 ) 74 } 75 76 impl ServerState { 77 pub async fn start(pool: sqlx::PgPool, payto: PaytoURI, currency: Currency) -> Arc<Self> { 78 let taler_in_channel = Sender::new(0); 79 let taler_out_channel = Sender::new(0); 80 let tmp = Self { 81 pool: pool.clone(), 82 payto, 83 currency, 84 status: AtomicBool::new(true), 85 taler_in_channel: taler_in_channel.clone(), 86 taler_out_channel: taler_out_channel.clone(), 87 }; 88 let state = Arc::new(tmp); 89 tokio::spawn(status_watcher(state.clone())); 90 tokio::spawn(notification_listener( 91 pool, 92 taler_in_channel, 93 taler_out_channel, 94 )); 95 state 96 } 97 } 98 99 impl TalerApi for ServerState { 100 fn currency(&self) -> &str { 101 self.currency.as_ref() 102 } 103 104 fn implementation(&self) -> Option<&str> { 105 None 106 } 107 } 108 109 impl WireGateway for ServerState { 110 async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> { 111 let creditor = FullBtcPayto::try_from(&req.credit_account)?; 112 113 match db::transfer(&self.pool, &creditor, &req).await? { 114 db::TransferResult::Success(transfer_response) => Ok(transfer_response), 115 db::TransferResult::RequestUidReuse => Err(failure( 116 ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED, 117 format!("Request UID {} already used", req.request_uid), 118 )), 119 db::TransferResult::WtidReuse => Err(failure( 120 ErrorCode::BANK_TRANSFER_WTID_REUSED, 121 format!("wtid {} already used", req.request_uid), 122 )), 123 } 124 } 125 126 async fn transfer_page( 127 &self, 128 params: Page, 129 status: Option<TransferState>, 130 ) -> ApiResult<TransferList> { 131 let transfers = db::transfer_page(&self.pool, &status, ¶ms, &self.currency).await?; 132 Ok(TransferList { 133 transfers, 134 debit_account: self.payto.clone(), 135 }) 136 } 137 138 async fn transfer_by_id(&self, id: u64) -> ApiResult<Option<TransferStatus>> { 139 let status = db::transfer_by_id(&self.pool, id, &self.currency).await?; 140 Ok(status) 141 } 142 143 async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> { 144 let outgoing_transactions = 145 db::outgoing_history(&self.pool, ¶ms, &self.currency, || { 146 self.taler_out_channel.subscribe() 147 }) 148 .await?; 149 Ok(OutgoingHistory { 150 debit_account: self.payto.clone(), 151 outgoing_transactions, 152 }) 153 } 154 155 async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> { 156 let incoming_transactions = 157 db::incoming_history(&self.pool, ¶ms, &self.currency, || { 158 self.taler_in_channel.subscribe() 159 }) 160 .await?; 161 Ok(IncomingHistory { 162 credit_account: self.payto.clone(), 163 incoming_transactions, 164 }) 165 } 166 167 async fn add_incoming_reserve( 168 &self, 169 req: AddIncomingRequest, 170 ) -> ApiResult<AddIncomingResponse> { 171 let debtor = FullBtcPayto::try_from(&req.debit_account)?; 172 let timestamp = Timestamp::now(); 173 match db::register_tx_in_admin( 174 &self.pool, 175 &req.amount, 176 &debtor.0, 177 ×tamp, 178 &req.reserve_pub, 179 ) 180 .await? 181 { 182 db::AddIncomingResult::Success { 183 new: _, 184 row_id, 185 valued_at, 186 } => Ok(AddIncomingResponse { 187 timestamp: valued_at, 188 row_id, 189 }), 190 db::AddIncomingResult::ReservePubReuse => Err(failure( 191 ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT, 192 "reserve_pub used already".to_owned(), 193 )), 194 } 195 } 196 197 async fn add_incoming_kyc(&self, _req: AddKycauthRequest) -> ApiResult<AddKycauthResponse> { 198 Err(not_implemented( 199 "depolymerizer-bitcoin does not supports KYC", 200 )) 201 } 202 203 fn support_account_check(&self) -> bool { 204 false 205 } 206 } 207 208 pub async fn status_middleware( 209 State(state): State<Arc<ServerState>>, 210 request: Request, 211 next: Next, 212 ) -> Response { 213 if !state.status.load(Ordering::Relaxed) { 214 failure_status( 215 ErrorCode::GENERIC_INTERNAL_INVARIANT_FAILURE, 216 "Currency backing is compromised until the transaction reappear", 217 StatusCode::BAD_GATEWAY, 218 ) 219 .into_response() 220 } else { 221 next.run(request).await 222 } 223 } 224 225 /// Listen to backend status change 226 async fn status_watcher(state: Arc<ServerState>) { 227 let mut jitter = ExpoBackoffDecorr::default(); 228 async fn inner( 229 state: &ServerState, 230 jitter: &mut ExpoBackoffDecorr, 231 ) -> Result<(), sqlx::error::Error> { 232 let mut listener = PgListener::connect_with(&state.pool).await?; 233 listener.listen("status").await?; 234 loop { 235 // Sync state 236 let row = sqlx::query("SELECT value FROM state WHERE name = 'status'") 237 .fetch_one(&state.pool) 238 .await?; 239 let status: &[u8] = row.try_get(0)?; 240 assert!(status.len() == 1 && status[0] < 2); 241 state.status.store(status[0] == 1, Ordering::SeqCst); 242 // Wait for next notification 243 listener.recv().await?; 244 jitter.reset(); 245 } 246 } 247 248 loop { 249 if let Err(err) = inner(&state, &mut jitter).await { 250 error!(target: "status-watcher", "{err}"); 251 sleep(jitter.backoff()).await; 252 } 253 } 254 }