api.rs (8169B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use compact_str::CompactString; 18 use jiff::Timestamp; 19 use taler_api::{ 20 api::{TalerApi, revenue::Revenue, wire::WireGateway}, 21 error::{ApiResult, failure}, 22 subject::IncomingSubject, 23 }; 24 use taler_common::{ 25 api_common::{SafeU64, safe_u64}, 26 api_params::{History, Page}, 27 api_revenue::RevenueIncomingHistory, 28 api_wire::{ 29 AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddKycauthResponse, 30 IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse, 31 TransferState, TransferStatus, 32 }, 33 error_code::ErrorCode, 34 types::{amount::Currency, payto::PaytoURI}, 35 }; 36 use tokio::sync::watch::Sender; 37 38 use crate::{ 39 db::{self, AddIncomingResult, Transfer, TxInAdmin}, 40 payto::FullCyclosPayto, 41 }; 42 43 pub struct CyclosApi { 44 pub pool: sqlx::PgPool, 45 pub currency: Currency, 46 pub payto: PaytoURI, 47 pub in_channel: Sender<i64>, 48 pub taler_in_channel: Sender<i64>, 49 pub out_channel: Sender<i64>, 50 pub taler_out_channel: Sender<i64>, 51 pub root: CompactString, 52 } 53 54 impl CyclosApi { 55 pub async fn start( 56 pool: sqlx::PgPool, 57 root: CompactString, 58 payto: PaytoURI, 59 currency: Currency, 60 ) -> Self { 61 let in_channel = Sender::new(0); 62 let taler_in_channel = Sender::new(0); 63 let out_channel = Sender::new(0); 64 let taler_out_channel = Sender::new(0); 65 let tmp = Self { 66 pool: pool.clone(), 67 payto, 68 currency, 69 root, 70 in_channel: in_channel.clone(), 71 taler_in_channel: taler_in_channel.clone(), 72 out_channel: out_channel.clone(), 73 taler_out_channel: taler_out_channel.clone(), 74 }; 75 tokio::spawn(db::notification_listener( 76 pool, 77 in_channel, 78 taler_in_channel, 79 out_channel, 80 taler_out_channel, 81 )); 82 tmp 83 } 84 } 85 86 impl TalerApi for CyclosApi { 87 fn currency(&self) -> &str { 88 self.currency.as_ref() 89 } 90 91 fn implementation(&self) -> &'static str { 92 "urn:net:taler:specs:taler-cyclos:taler-rust" 93 } 94 } 95 96 impl WireGateway for CyclosApi { 97 async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> { 98 let creditor = FullCyclosPayto::try_from(&req.credit_account)?; 99 let result = db::make_transfer( 100 &self.pool, 101 &Transfer { 102 request_uid: req.request_uid, 103 amount: req.amount.decimal(), 104 exchange_base_url: req.exchange_base_url, 105 wtid: req.wtid, 106 creditor_id: *creditor.id, 107 creditor_name: creditor.name, 108 }, 109 &Timestamp::now(), 110 ) 111 .await?; 112 match result { 113 db::TransferResult::Success { id, initiated_at } => Ok(TransferResponse { 114 timestamp: initiated_at.into(), 115 row_id: SafeU64::try_from(id).unwrap(), 116 }), 117 db::TransferResult::RequestUidReuse => Err(failure( 118 ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED, 119 "request_uid used already", 120 )), 121 db::TransferResult::WtidReuse => Err(failure( 122 ErrorCode::BANK_TRANSFER_WTID_REUSED, 123 "wtid used already", 124 )), 125 } 126 } 127 128 async fn transfer_page( 129 &self, 130 page: Page, 131 status: Option<TransferState>, 132 ) -> ApiResult<TransferList> { 133 Ok(TransferList { 134 transfers: db::transfer_page(&self.pool, &status, &self.currency, &self.root, &page) 135 .await?, 136 debit_account: self.payto.clone(), 137 }) 138 } 139 140 async fn transfer_by_id(&self, id: u64) -> ApiResult<Option<TransferStatus>> { 141 Ok(db::transfer_by_id(&self.pool, id, &self.currency, &self.root).await?) 142 } 143 144 async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> { 145 Ok(OutgoingHistory { 146 outgoing_transactions: db::outgoing_history( 147 &self.pool, 148 ¶ms, 149 &self.currency, 150 &self.root, 151 || self.taler_out_channel.subscribe(), 152 ) 153 .await?, 154 debit_account: self.payto.clone(), 155 }) 156 } 157 158 async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> { 159 Ok(IncomingHistory { 160 incoming_transactions: db::incoming_history( 161 &self.pool, 162 ¶ms, 163 &self.currency, 164 &self.root, 165 || self.taler_in_channel.subscribe(), 166 ) 167 .await?, 168 credit_account: self.payto.clone(), 169 }) 170 } 171 172 async fn add_incoming_reserve( 173 &self, 174 req: AddIncomingRequest, 175 ) -> ApiResult<AddIncomingResponse> { 176 let debtor = FullCyclosPayto::try_from(&req.debit_account)?; 177 let res = db::register_tx_in_admin( 178 &self.pool, 179 &TxInAdmin { 180 amount: req.amount.decimal(), 181 subject: format!("Admin incoming {}", req.reserve_pub), 182 debtor_id: *debtor.id, 183 debtor_name: debtor.name, 184 metadata: IncomingSubject::Reserve(req.reserve_pub), 185 }, 186 &Timestamp::now(), 187 ) 188 .await?; 189 match res { 190 AddIncomingResult::Success { 191 row_id, valued_at, .. 192 } => Ok(AddIncomingResponse { 193 row_id: safe_u64(row_id), 194 timestamp: valued_at.into(), 195 }), 196 AddIncomingResult::ReservePubReuse => Err(failure( 197 ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT, 198 "reserve_pub used already".to_owned(), 199 )), 200 } 201 } 202 203 async fn add_incoming_kyc(&self, req: AddKycauthRequest) -> ApiResult<AddKycauthResponse> { 204 let debtor = FullCyclosPayto::try_from(&req.debit_account)?; 205 let res = db::register_tx_in_admin( 206 &self.pool, 207 &TxInAdmin { 208 amount: req.amount.decimal(), 209 subject: format!("Admin incoming KYC:{}", req.account_pub), 210 debtor_id: *debtor.id, 211 debtor_name: debtor.name, 212 metadata: IncomingSubject::Kyc(req.account_pub), 213 }, 214 &Timestamp::now(), 215 ) 216 .await?; 217 match res { 218 AddIncomingResult::Success { 219 row_id, valued_at, .. 220 } => Ok(AddKycauthResponse { 221 row_id: safe_u64(row_id), 222 timestamp: valued_at.into(), 223 }), 224 AddIncomingResult::ReservePubReuse => Err(failure( 225 ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT, 226 "reserve_pub used already".to_owned(), 227 )), 228 } 229 } 230 231 fn support_account_check(&self) -> bool { 232 false 233 } 234 } 235 236 impl Revenue for CyclosApi { 237 async fn history(&self, params: History) -> ApiResult<RevenueIncomingHistory> { 238 Ok(RevenueIncomingHistory { 239 incoming_transactions: db::revenue_history( 240 &self.pool, 241 ¶ms, 242 &self.currency, 243 &self.root, 244 || self.in_channel.subscribe(), 245 ) 246 .await?, 247 credit_account: self.payto.clone(), 248 }) 249 } 250 }