db.rs (10725B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2024-2025 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use jiff::Timestamp; 18 use sqlx::{PgPool, QueryBuilder, Row, postgres::PgRow}; 19 use taler_api::db::{BindHelper, IncomingType, TypeHelper, history, page}; 20 use taler_common::{ 21 api_common::{EddsaPublicKey, SafeU64}, 22 api_params::{History, Page}, 23 api_revenue::RevenueIncomingBankTransaction, 24 api_wire::{ 25 IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest, 26 TransferResponse, TransferState, TransferStatus, 27 }, 28 types::{ 29 amount::{Amount, Currency}, 30 payto::PaytoURI, 31 }, 32 }; 33 use tokio::sync::watch::{Receiver, Sender}; 34 35 pub async fn notification_listener( 36 pool: PgPool, 37 outgoing_channel: Sender<i64>, 38 incoming_channel: Sender<i64>, 39 ) -> sqlx::Result<()> { 40 taler_api::notification::notification_listener!(&pool, 41 "outgoing_tx" => (row_id: i64) { 42 outgoing_channel.send_replace(row_id); 43 }, 44 "incoming_tx" => (row_id: i64) { 45 incoming_channel.send_replace(row_id); 46 } 47 ) 48 } 49 50 pub enum TransferResult { 51 Success(TransferResponse), 52 RequestUidReuse, 53 WtidReuse, 54 } 55 56 pub async fn transfer(db: &PgPool, transfer: TransferRequest) -> sqlx::Result<TransferResult> { 57 sqlx::query( 58 " 59 SELECT out_request_uid_reuse, out_wtid_reuse, out_transfer_row_id, out_created_at 60 FROM taler_transfer(($1, $2)::taler_amount, $3, $4, $5, $6, $7, $8) 61 ", 62 ) 63 .bind_amount(&transfer.amount) 64 .bind(transfer.exchange_base_url.as_str()) 65 .bind(format!("{} {}", transfer.wtid, transfer.exchange_base_url)) 66 .bind(transfer.credit_account.raw()) 67 .bind(transfer.request_uid.as_slice()) 68 .bind(transfer.wtid.as_slice()) 69 .bind_timestamp(&Timestamp::now()) 70 .try_map(|r: PgRow| { 71 Ok(if r.try_get("out_request_uid_reuse")? { 72 TransferResult::RequestUidReuse 73 } else if r.try_get("out_wtid_reuse")? { 74 TransferResult::WtidReuse 75 } else { 76 TransferResult::Success(TransferResponse { 77 row_id: r.try_get_safeu64("out_transfer_row_id")?, 78 timestamp: r.try_get_timestamp("out_created_at")?.into(), 79 }) 80 }) 81 }) 82 .fetch_one(db) 83 .await 84 } 85 86 pub async fn transfer_page( 87 db: &PgPool, 88 status: &Option<TransferState>, 89 params: &Page, 90 currency: &Currency, 91 ) -> sqlx::Result<Vec<TransferListStatus>> { 92 page( 93 db, 94 "transfer_id", 95 params, 96 || { 97 let mut builder = QueryBuilder::new( 98 " 99 SELECT 100 transfer_id, 101 status, 102 (amount).val as amount_val, 103 (amount).frac as amount_frac, 104 credit_payto, 105 created_at 106 FROM transfer WHERE 107 ", 108 ); 109 if let Some(status) = status { 110 builder.push(" status = ").push_bind(status).push(" AND "); 111 } 112 builder 113 }, 114 |r: PgRow| { 115 Ok(TransferListStatus { 116 row_id: r.try_get_safeu64("transfer_id")?, 117 status: r.try_get("status")?, 118 amount: r.try_get_amount("amount", currency)?, 119 credit_account: r.try_get_payto("credit_payto")?, 120 timestamp: r.try_get_timestamp("created_at")?.into(), 121 }) 122 }, 123 ) 124 .await 125 } 126 127 pub async fn transfer_by_id( 128 db: &PgPool, 129 id: u64, 130 currency: &Currency, 131 ) -> sqlx::Result<Option<TransferStatus>> { 132 sqlx::query( 133 " 134 SELECT 135 status, 136 status_msg, 137 (amount).val as amount_val, 138 (amount).frac as amount_frac, 139 exchange_base_url, 140 wtid, 141 credit_payto, 142 created_at 143 FROM transfer WHERE transfer_id = $1 144 ", 145 ) 146 .bind(id as i64) 147 .try_map(|r: PgRow| { 148 Ok(TransferStatus { 149 status: r.try_get("status")?, 150 status_msg: r.try_get("status_msg")?, 151 amount: r.try_get_amount("amount", currency)?, 152 origin_exchange_url: r.try_get("exchange_base_url")?, 153 wtid: r.try_get_base32("wtid")?, 154 credit_account: r.try_get_payto("credit_payto")?, 155 timestamp: r.try_get_timestamp("created_at")?.into(), 156 }) 157 }) 158 .fetch_optional(db) 159 .await 160 } 161 162 pub async fn outgoing_revenue( 163 db: &PgPool, 164 params: &History, 165 currency: &Currency, 166 listen: impl FnOnce() -> Receiver<i64>, 167 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> { 168 history( 169 db, 170 "transfer_id", 171 params, 172 listen, 173 || { 174 QueryBuilder::new( 175 " 176 SELECT 177 transfer_id, 178 (amount).val as amount_val, 179 (amount).frac as amount_frac, 180 exchange_base_url, 181 wtid, 182 credit_payto, 183 created_at 184 FROM transfer WHERE status = 'success' AND 185 ", 186 ) 187 }, 188 |r| { 189 Ok(OutgoingBankTransaction { 190 amount: r.try_get_amount("amount", currency)?, 191 wtid: r.try_get_base32("wtid")?, 192 credit_account: r.try_get_payto("credit_payto")?, 193 row_id: r.try_get_safeu64("transfer_id")?, 194 date: r.try_get_timestamp("created_at")?.into(), 195 exchange_base_url: r.try_get_url("exchange_base_url")?, 196 }) 197 }, 198 ) 199 .await 200 } 201 202 pub enum AddIncomingResult { 203 Success { id: SafeU64, created_at: Timestamp }, 204 ReservePubReuse, 205 } 206 207 pub async fn add_incoming( 208 db: &PgPool, 209 amount: &Amount, 210 debit_account: &PaytoURI, 211 subject: &str, 212 timestamp: &Timestamp, 213 kind: IncomingType, 214 key: &EddsaPublicKey, 215 ) -> sqlx::Result<AddIncomingResult> { 216 sqlx::query( 217 " 218 SELECT out_reserve_pub_reuse, out_tx_row_id, out_created_at 219 FROM add_incoming(($1, $2)::taler_amount, $3, $4, $5, $6, $7) 220 ", 221 ) 222 .bind_amount(amount) 223 .bind(subject) 224 .bind(debit_account.raw()) 225 .bind(kind) 226 .bind(key.as_slice()) 227 .bind_timestamp(timestamp) 228 .try_map(|r: PgRow| { 229 Ok(if r.try_get("out_reserve_pub_reuse")? { 230 AddIncomingResult::ReservePubReuse 231 } else { 232 AddIncomingResult::Success { 233 id: r.try_get_safeu64("out_tx_row_id")?, 234 created_at: r.try_get_timestamp("out_created_at")?.into(), 235 } 236 }) 237 }) 238 .fetch_one(db) 239 .await 240 } 241 242 pub async fn incoming_history( 243 db: &PgPool, 244 params: &History, 245 currency: &Currency, 246 listen: impl FnOnce() -> Receiver<i64>, 247 ) -> sqlx::Result<Vec<IncomingBankTransaction>> { 248 history( 249 db, 250 "tx_in_id", 251 params, 252 listen, 253 || { 254 QueryBuilder::new( 255 " 256 SELECT 257 type, 258 tx_in_id, 259 (amount).val as amount_val, 260 (amount).frac as amount_frac, 261 created_at, 262 debit_payto, 263 metadata, 264 origin_exchange_url 265 FROM tx_in WHERE 266 ", 267 ) 268 }, 269 |r: PgRow| { 270 let kind: IncomingType = r.try_get("type")?; 271 Ok(match kind { 272 IncomingType::reserve => IncomingBankTransaction::Reserve { 273 row_id: r.try_get_safeu64("tx_in_id")?, 274 date: r.try_get_timestamp("created_at")?.into(), 275 amount: r.try_get_amount("amount", currency)?, 276 debit_account: r.try_get_payto("debit_payto")?, 277 reserve_pub: r.try_get_base32("metadata")?, 278 }, 279 IncomingType::kyc => IncomingBankTransaction::Kyc { 280 row_id: r.try_get_safeu64("tx_in_id")?, 281 date: r.try_get_timestamp("created_at")?.into(), 282 amount: r.try_get_amount("amount", currency)?, 283 debit_account: r.try_get_payto("debit_payto")?, 284 account_pub: r.try_get_base32("metadata")?, 285 }, 286 IncomingType::wad => IncomingBankTransaction::Wad { 287 row_id: r.try_get_safeu64("tx_in_id")?, 288 date: r.try_get_timestamp("created_at")?.into(), 289 amount: r.try_get_amount("amount", currency)?, 290 debit_account: r.try_get_payto("debit_payto")?, 291 origin_exchange_url: r.try_get_url("origin_exchange_url")?, 292 wad_id: r.try_get_base32("metadata")?, 293 }, 294 }) 295 }, 296 ) 297 .await 298 } 299 300 pub async fn revenue_history( 301 db: &PgPool, 302 params: &History, 303 currency: &Currency, 304 listen: impl FnOnce() -> Receiver<i64>, 305 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> { 306 history( 307 db, 308 "tx_in_id", 309 params, 310 listen, 311 || { 312 QueryBuilder::new( 313 " 314 SELECT 315 tx_in_id, 316 (amount).val as amount_val, 317 (amount).frac as amount_frac, 318 created_at, 319 debit_payto, 320 subject 321 FROM tx_in WHERE 322 ", 323 ) 324 }, 325 |r: PgRow| { 326 Ok(RevenueIncomingBankTransaction { 327 row_id: r.try_get_safeu64("tx_in_id")?, 328 date: r.try_get_timestamp("created_at")?.into(), 329 amount: r.try_get_amount("amount", currency)?, 330 credit_fee: None, 331 debit_account: r.try_get_payto("debit_payto")?, 332 subject: r.try_get("subject")?, 333 }) 334 }, 335 ) 336 .await 337 }