taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

db.rs (10563B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2024, 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use jiff::Timestamp;
     18 use sqlx::{PgPool, QueryBuilder, Row, postgres::PgRow};
     19 use taler_api::db::{BindHelper, IncomingType, TypeHelper, history, page};
     20 use taler_common::{
     21     api_common::{EddsaPublicKey, SafeU64},
     22     api_params::{History, Page},
     23     api_revenue::RevenueIncomingBankTransaction,
     24     api_wire::{
     25         IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest,
     26         TransferResponse, TransferState, TransferStatus,
     27     },
     28     types::{
     29         amount::{Amount, Currency},
     30         payto::PaytoURI,
     31     },
     32 };
     33 use tokio::sync::watch::{Receiver, Sender};
     34 
/// Forwards database notifications to the matching watch channels.
///
/// Two notification channels are handled:
/// - `outgoing_tx`: the notified row id replaces the value held by
///   `outgoing_channel`;
/// - `incoming_tx`: the notified row id replaces the value held by
///   `incoming_channel`.
///
/// The listening logic itself is generated by
/// `taler_api::notification::notification_listener!`; when and why this
/// function returns is defined by that macro and is not visible here.
pub async fn notification_listener(
    pool: PgPool,
    outgoing_channel: Sender<i64>,
    incoming_channel: Sender<i64>,
) -> sqlx::Result<()> {
    taler_api::notification::notification_listener!(&pool,
        "outgoing_tx" => (row_id: i64) {
            // `send_replace` stores the value even when no receiver is
            // currently subscribed, so the send itself cannot fail.
            outgoing_channel.send_replace(row_id);
        },
        "incoming_tx" => (row_id: i64) {
            incoming_channel.send_replace(row_id);
        }
    )
}
     49 
/// Outcome of [`transfer`], decoded from the row returned by the
/// `taler_transfer` SQL function.
pub enum TransferResult {
    /// Neither reuse flag was set; carries the row id and creation timestamp
    /// returned by the database.
    Success(TransferResponse),
    /// The database set `out_request_uid_reuse`.
    // NOTE(review): presumably the request UID was already used for a
    // different transfer — confirm against the SQL function.
    RequestUidReuse,
    /// The database set `out_wtid_reuse`.
    // NOTE(review): presumably the wire transfer id was already used by
    // another transfer — confirm against the SQL function.
    WtidReuse,
}
     55 
     56 pub async fn transfer(db: &PgPool, transfer: TransferRequest) -> sqlx::Result<TransferResult> {
     57     sqlx::query(
     58         "
     59             SELECT out_request_uid_reuse, out_wtid_reuse, out_transfer_row_id, out_created_at
     60             FROM taler_transfer($1, $2, $3, $4, $5, $6, $7)
     61         ",
     62     )
     63     .bind(&transfer.amount)
     64     .bind(transfer.exchange_base_url.as_str())
     65     .bind(format!("{} {}", transfer.wtid, transfer.exchange_base_url))
     66     .bind(transfer.credit_account.raw())
     67     .bind(transfer.request_uid.as_slice())
     68     .bind(transfer.wtid.as_slice())
     69     .bind_timestamp(&Timestamp::now())
     70     .try_map(|r: PgRow| {
     71         Ok(if r.try_get_flag("out_request_uid_reuse")? {
     72             TransferResult::RequestUidReuse
     73         } else if r.try_get_flag("out_wtid_reuse")? {
     74             TransferResult::WtidReuse
     75         } else {
     76             TransferResult::Success(TransferResponse {
     77                 row_id: r.try_get_safeu64("out_transfer_row_id")?,
     78                 timestamp: r.try_get_timestamp("out_created_at")?.into(),
     79             })
     80         })
     81     })
     82     .fetch_one(db)
     83     .await
     84 }
     85 
     86 pub async fn transfer_page(
     87     db: &PgPool,
     88     status: &Option<TransferState>,
     89     params: &Page,
     90     currency: &Currency,
     91 ) -> sqlx::Result<Vec<TransferListStatus>> {
     92     page(
     93         db,
     94         "transfer_id",
     95         params,
     96         || {
     97             let mut builder = QueryBuilder::new(
     98                 "
     99                     SELECT
    100                         transfer_id,
    101                         status,
    102                         amount,
    103                         credit_payto,
    104                         created_at
    105                     FROM transfer WHERE
    106                 ",
    107             );
    108             if let Some(status) = status {
    109                 builder.push(" status = ").push_bind(status).push(" AND ");
    110             }
    111             builder
    112         },
    113         |r: PgRow| {
    114             Ok(TransferListStatus {
    115                 row_id: r.try_get_safeu64("transfer_id")?,
    116                 status: r.try_get("status")?,
    117                 amount: r.try_get_amount("amount", currency)?,
    118                 credit_account: r.try_get_payto("credit_payto")?,
    119                 timestamp: r.try_get_timestamp("created_at")?.into(),
    120             })
    121         },
    122     )
    123     .await
    124 }
    125 
    126 pub async fn transfer_by_id(
    127     db: &PgPool,
    128     id: u64,
    129     currency: &Currency,
    130 ) -> sqlx::Result<Option<TransferStatus>> {
    131     sqlx::query(
    132         "
    133             SELECT
    134                 status,
    135                 status_msg,
    136                 amount,
    137                 exchange_base_url,
    138                 wtid,
    139                 credit_payto,
    140                 created_at
    141             FROM transfer WHERE transfer_id = $1
    142         ",
    143     )
    144     .bind(id as i64)
    145     .try_map(|r: PgRow| {
    146         Ok(TransferStatus {
    147             status: r.try_get("status")?,
    148             status_msg: r.try_get("status_msg")?,
    149             amount: r.try_get_amount("amount", currency)?,
    150             origin_exchange_url: r.try_get("exchange_base_url")?,
    151             wtid: r.try_get("wtid")?,
    152             credit_account: r.try_get_payto("credit_payto")?,
    153             timestamp: r.try_get_timestamp("created_at")?.into(),
    154         })
    155     })
    156     .fetch_optional(db)
    157     .await
    158 }
    159 
    160 pub async fn outgoing_revenue(
    161     db: &PgPool,
    162     params: &History,
    163     currency: &Currency,
    164     listen: impl FnOnce() -> Receiver<i64>,
    165 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> {
    166     history(
    167         db,
    168         "transfer_id",
    169         params,
    170         listen,
    171         || {
    172             QueryBuilder::new(
    173                 "
    174                 SELECT
    175                     transfer_id,
    176                     amount,
    177                     exchange_base_url,
    178                     wtid,
    179                     credit_payto,
    180                     created_at
    181                 FROM transfer WHERE status = 'success' AND
    182             ",
    183             )
    184         },
    185         |r| {
    186             Ok(OutgoingBankTransaction {
    187                 amount: r.try_get_amount("amount", currency)?,
    188                 debit_fee: None,
    189                 wtid: r.try_get("wtid")?,
    190                 credit_account: r.try_get_payto("credit_payto")?,
    191                 row_id: r.try_get_safeu64("transfer_id")?,
    192                 date: r.try_get_timestamp("created_at")?.into(),
    193                 exchange_base_url: r.try_get_url("exchange_base_url")?,
    194             })
    195         },
    196     )
    197     .await
    198 }
    199 
/// Outcome of [`add_incoming`], decoded from the row returned by the
/// `add_incoming` SQL function.
pub enum AddIncomingResult {
    /// `out_reserve_pub_reuse` was not set; carries the row id and creation
    /// timestamp returned by the database.
    Success { id: SafeU64, created_at: Timestamp },
    /// The database set `out_reserve_pub_reuse`.
    // NOTE(review): presumably the public key was already used by another
    // incoming transaction — confirm against the SQL function.
    ReservePubReuse,
}
    204 
    205 pub async fn add_incoming(
    206     db: &PgPool,
    207     amount: &Amount,
    208     debit_account: &PaytoURI,
    209     subject: &str,
    210     timestamp: &Timestamp,
    211     kind: IncomingType,
    212     key: &EddsaPublicKey,
    213 ) -> sqlx::Result<AddIncomingResult> {
    214     sqlx::query(
    215         "
    216             SELECT out_reserve_pub_reuse, out_tx_row_id, out_created_at
    217             FROM add_incoming($1, $2, $3, $4, $5, $6)
    218         ",
    219     )
    220     .bind(amount)
    221     .bind(subject)
    222     .bind(debit_account.raw())
    223     .bind(kind)
    224     .bind(key.as_ref().as_slice())
    225     .bind_timestamp(timestamp)
    226     .try_map(|r: PgRow| {
    227         Ok(if r.try_get_flag("out_reserve_pub_reuse")? {
    228             AddIncomingResult::ReservePubReuse
    229         } else {
    230             AddIncomingResult::Success {
    231                 id: r.try_get_safeu64("out_tx_row_id")?,
    232                 created_at: r.try_get_timestamp("out_created_at")?.into(),
    233             }
    234         })
    235     })
    236     .fetch_one(db)
    237     .await
    238 }
    239 
    240 pub async fn incoming_history(
    241     db: &PgPool,
    242     params: &History,
    243     currency: &Currency,
    244     listen: impl FnOnce() -> Receiver<i64>,
    245 ) -> sqlx::Result<Vec<IncomingBankTransaction>> {
    246     history(
    247         db,
    248         "tx_in_id",
    249         params,
    250         listen,
    251         || {
    252             QueryBuilder::new(
    253                 "
    254                  SELECT
    255                     type,
    256                     tx_in_id,
    257                     amount,
    258                     created_at,
    259                     debit_payto,
    260                     metadata,
    261                     origin_exchange_url
    262                 FROM tx_in WHERE
    263             ",
    264             )
    265         },
    266         |r: PgRow| {
    267             Ok(match r.try_get("type")? {
    268                 IncomingType::reserve => IncomingBankTransaction::Reserve {
    269                     row_id: r.try_get_safeu64("tx_in_id")?,
    270                     date: r.try_get_timestamp("created_at")?.into(),
    271                     amount: r.try_get_amount("amount", currency)?,
    272                     credit_fee: None,
    273                     debit_account: r.try_get_payto("debit_payto")?,
    274                     reserve_pub: r.try_get("metadata")?,
    275                     authorization_pub: None,
    276                     authorization_sig: None,
    277                 },
    278                 IncomingType::kyc => IncomingBankTransaction::Kyc {
    279                     row_id: r.try_get_safeu64("tx_in_id")?,
    280                     date: r.try_get_timestamp("created_at")?.into(),
    281                     amount: r.try_get_amount("amount", currency)?,
    282                     credit_fee: None,
    283                     debit_account: r.try_get_payto("debit_payto")?,
    284                     account_pub: r.try_get("metadata")?,
    285                     authorization_pub: None,
    286                     authorization_sig: None,
    287                 },
    288                 IncomingType::wad => IncomingBankTransaction::Wad {
    289                     row_id: r.try_get_safeu64("tx_in_id")?,
    290                     date: r.try_get_timestamp("created_at")?.into(),
    291                     amount: r.try_get_amount("amount", currency)?,
    292                     debit_account: r.try_get_payto("debit_payto")?,
    293                     origin_exchange_url: r.try_get_url("origin_exchange_url")?,
    294                     wad_id: r.try_get("metadata")?,
    295                 },
    296             })
    297         },
    298     )
    299     .await
    300 }
    301 
    302 pub async fn revenue_history(
    303     db: &PgPool,
    304     params: &History,
    305     currency: &Currency,
    306     listen: impl FnOnce() -> Receiver<i64>,
    307 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> {
    308     history(
    309         db,
    310         "tx_in_id",
    311         params,
    312         listen,
    313         || {
    314             QueryBuilder::new(
    315                 "
    316                  SELECT
    317                     tx_in_id,
    318                     amount,
    319                     created_at,
    320                     debit_payto,
    321                     subject
    322                 FROM tx_in WHERE
    323             ",
    324             )
    325         },
    326         |r: PgRow| {
    327             Ok(RevenueIncomingBankTransaction {
    328                 row_id: r.try_get_safeu64("tx_in_id")?,
    329                 date: r.try_get_timestamp("created_at")?.into(),
    330                 amount: r.try_get_amount("amount", currency)?,
    331                 credit_fee: None,
    332                 debit_account: r.try_get_payto("debit_payto")?,
    333                 subject: r.try_get("subject")?,
    334             })
    335         },
    336     )
    337     .await
    338 }