db.rs (10563B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2024, 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use jiff::Timestamp; 18 use sqlx::{PgPool, QueryBuilder, Row, postgres::PgRow}; 19 use taler_api::db::{BindHelper, IncomingType, TypeHelper, history, page}; 20 use taler_common::{ 21 api_common::{EddsaPublicKey, SafeU64}, 22 api_params::{History, Page}, 23 api_revenue::RevenueIncomingBankTransaction, 24 api_wire::{ 25 IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest, 26 TransferResponse, TransferState, TransferStatus, 27 }, 28 types::{ 29 amount::{Amount, Currency}, 30 payto::PaytoURI, 31 }, 32 }; 33 use tokio::sync::watch::{Receiver, Sender}; 34 35 pub async fn notification_listener( 36 pool: PgPool, 37 outgoing_channel: Sender<i64>, 38 incoming_channel: Sender<i64>, 39 ) -> sqlx::Result<()> { 40 taler_api::notification::notification_listener!(&pool, 41 "outgoing_tx" => (row_id: i64) { 42 outgoing_channel.send_replace(row_id); 43 }, 44 "incoming_tx" => (row_id: i64) { 45 incoming_channel.send_replace(row_id); 46 } 47 ) 48 } 49 50 pub enum TransferResult { 51 Success(TransferResponse), 52 RequestUidReuse, 53 WtidReuse, 54 } 55 56 pub async fn transfer(db: &PgPool, transfer: TransferRequest) -> sqlx::Result<TransferResult> { 57 sqlx::query( 58 " 59 SELECT out_request_uid_reuse, out_wtid_reuse, out_transfer_row_id, out_created_at 60 FROM taler_transfer($1, $2, $3, $4, $5, $6, $7) 61 ", 62 ) 63 .bind(&transfer.amount) 64 .bind(transfer.exchange_base_url.as_str()) 65 .bind(format!("{} {}", transfer.wtid, transfer.exchange_base_url)) 66 .bind(transfer.credit_account.raw()) 67 .bind(transfer.request_uid.as_slice()) 68 .bind(transfer.wtid.as_slice()) 69 .bind_timestamp(&Timestamp::now()) 70 .try_map(|r: PgRow| { 71 Ok(if r.try_get_flag("out_request_uid_reuse")? { 72 TransferResult::RequestUidReuse 73 } else if r.try_get_flag("out_wtid_reuse")? { 74 TransferResult::WtidReuse 75 } else { 76 TransferResult::Success(TransferResponse { 77 row_id: r.try_get_safeu64("out_transfer_row_id")?, 78 timestamp: r.try_get_timestamp("out_created_at")?.into(), 79 }) 80 }) 81 }) 82 .fetch_one(db) 83 .await 84 } 85 86 pub async fn transfer_page( 87 db: &PgPool, 88 status: &Option<TransferState>, 89 params: &Page, 90 currency: &Currency, 91 ) -> sqlx::Result<Vec<TransferListStatus>> { 92 page( 93 db, 94 "transfer_id", 95 params, 96 || { 97 let mut builder = QueryBuilder::new( 98 " 99 SELECT 100 transfer_id, 101 status, 102 amount, 103 credit_payto, 104 created_at 105 FROM transfer WHERE 106 ", 107 ); 108 if let Some(status) = status { 109 builder.push(" status = ").push_bind(status).push(" AND "); 110 } 111 builder 112 }, 113 |r: PgRow| { 114 Ok(TransferListStatus { 115 row_id: r.try_get_safeu64("transfer_id")?, 116 status: r.try_get("status")?, 117 amount: r.try_get_amount("amount", currency)?, 118 credit_account: r.try_get_payto("credit_payto")?, 119 timestamp: r.try_get_timestamp("created_at")?.into(), 120 }) 121 }, 122 ) 123 .await 124 } 125 126 pub async fn transfer_by_id( 127 db: &PgPool, 128 id: u64, 129 currency: &Currency, 130 ) -> sqlx::Result<Option<TransferStatus>> { 131 sqlx::query( 132 " 133 SELECT 134 status, 135 status_msg, 136 amount, 137 exchange_base_url, 138 wtid, 139 credit_payto, 140 created_at 141 FROM transfer WHERE transfer_id = $1 142 ", 143 ) 144 .bind(id as i64) 145 .try_map(|r: PgRow| { 146 Ok(TransferStatus { 147 status: r.try_get("status")?, 148 status_msg: r.try_get("status_msg")?, 149 amount: r.try_get_amount("amount", currency)?, 150 origin_exchange_url: r.try_get("exchange_base_url")?, 151 wtid: r.try_get("wtid")?, 152 credit_account: r.try_get_payto("credit_payto")?, 153 timestamp: r.try_get_timestamp("created_at")?.into(), 154 }) 155 }) 156 .fetch_optional(db) 157 .await 158 } 159 160 pub async fn outgoing_revenue( 161 db: &PgPool, 162 params: &History, 163 currency: &Currency, 164 listen: impl FnOnce() -> Receiver<i64>, 165 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> { 166 history( 167 db, 168 "transfer_id", 169 params, 170 listen, 171 || { 172 QueryBuilder::new( 173 " 174 SELECT 175 transfer_id, 176 amount, 177 exchange_base_url, 178 wtid, 179 credit_payto, 180 created_at 181 FROM transfer WHERE status = 'success' AND 182 ", 183 ) 184 }, 185 |r| { 186 Ok(OutgoingBankTransaction { 187 amount: r.try_get_amount("amount", currency)?, 188 debit_fee: None, 189 wtid: r.try_get("wtid")?, 190 credit_account: r.try_get_payto("credit_payto")?, 191 row_id: r.try_get_safeu64("transfer_id")?, 192 date: r.try_get_timestamp("created_at")?.into(), 193 exchange_base_url: r.try_get_url("exchange_base_url")?, 194 }) 195 }, 196 ) 197 .await 198 } 199 200 pub enum AddIncomingResult { 201 Success { id: SafeU64, created_at: Timestamp }, 202 ReservePubReuse, 203 } 204 205 pub async fn add_incoming( 206 db: &PgPool, 207 amount: &Amount, 208 debit_account: &PaytoURI, 209 subject: &str, 210 timestamp: &Timestamp, 211 kind: IncomingType, 212 key: &EddsaPublicKey, 213 ) -> sqlx::Result<AddIncomingResult> { 214 sqlx::query( 215 " 216 SELECT out_reserve_pub_reuse, out_tx_row_id, out_created_at 217 FROM add_incoming($1, $2, $3, $4, $5, $6) 218 ", 219 ) 220 .bind(amount) 221 .bind(subject) 222 .bind(debit_account.raw()) 223 .bind(kind) 224 .bind(key.as_ref().as_slice()) 225 .bind_timestamp(timestamp) 226 .try_map(|r: PgRow| { 227 Ok(if r.try_get_flag("out_reserve_pub_reuse")? { 228 AddIncomingResult::ReservePubReuse 229 } else { 230 AddIncomingResult::Success { 231 id: r.try_get_safeu64("out_tx_row_id")?, 232 created_at: r.try_get_timestamp("out_created_at")?.into(), 233 } 234 }) 235 }) 236 .fetch_one(db) 237 .await 238 } 239 240 pub async fn incoming_history( 241 db: &PgPool, 242 params: &History, 243 currency: &Currency, 244 listen: impl FnOnce() -> Receiver<i64>, 245 ) -> sqlx::Result<Vec<IncomingBankTransaction>> { 246 history( 247 db, 248 "tx_in_id", 249 params, 250 listen, 251 || { 252 QueryBuilder::new( 253 " 254 SELECT 255 type, 256 tx_in_id, 257 amount, 258 created_at, 259 debit_payto, 260 metadata, 261 origin_exchange_url 262 FROM tx_in WHERE 263 ", 264 ) 265 }, 266 |r: PgRow| { 267 Ok(match r.try_get("type")? { 268 IncomingType::reserve => IncomingBankTransaction::Reserve { 269 row_id: r.try_get_safeu64("tx_in_id")?, 270 date: r.try_get_timestamp("created_at")?.into(), 271 amount: r.try_get_amount("amount", currency)?, 272 credit_fee: None, 273 debit_account: r.try_get_payto("debit_payto")?, 274 reserve_pub: r.try_get("metadata")?, 275 authorization_pub: None, 276 authorization_sig: None, 277 }, 278 IncomingType::kyc => IncomingBankTransaction::Kyc { 279 row_id: r.try_get_safeu64("tx_in_id")?, 280 date: r.try_get_timestamp("created_at")?.into(), 281 amount: r.try_get_amount("amount", currency)?, 282 credit_fee: None, 283 debit_account: r.try_get_payto("debit_payto")?, 284 account_pub: r.try_get("metadata")?, 285 authorization_pub: None, 286 authorization_sig: None, 287 }, 288 IncomingType::wad => IncomingBankTransaction::Wad { 289 row_id: r.try_get_safeu64("tx_in_id")?, 290 date: r.try_get_timestamp("created_at")?.into(), 291 amount: r.try_get_amount("amount", currency)?, 292 debit_account: r.try_get_payto("debit_payto")?, 293 origin_exchange_url: r.try_get_url("origin_exchange_url")?, 294 wad_id: r.try_get("metadata")?, 295 }, 296 }) 297 }, 298 ) 299 .await 300 } 301 302 pub async fn revenue_history( 303 db: &PgPool, 304 params: &History, 305 currency: &Currency, 306 listen: impl FnOnce() -> Receiver<i64>, 307 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> { 308 history( 309 db, 310 "tx_in_id", 311 params, 312 listen, 313 || { 314 QueryBuilder::new( 315 " 316 SELECT 317 tx_in_id, 318 amount, 319 created_at, 320 debit_payto, 321 subject 322 FROM tx_in WHERE 323 ", 324 ) 325 }, 326 |r: PgRow| { 327 Ok(RevenueIncomingBankTransaction { 328 row_id: r.try_get_safeu64("tx_in_id")?, 329 date: r.try_get_timestamp("created_at")?.into(), 330 amount: r.try_get_amount("amount", currency)?, 331 credit_fee: None, 332 debit_account: r.try_get_payto("debit_payto")?, 333 subject: r.try_get("subject")?, 334 }) 335 }, 336 ) 337 .await 338 }