taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

db.rs (10725B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2024-2025 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use jiff::Timestamp;
     18 use sqlx::{PgPool, QueryBuilder, Row, postgres::PgRow};
     19 use taler_api::db::{BindHelper, IncomingType, TypeHelper, history, page};
     20 use taler_common::{
     21     api_common::{EddsaPublicKey, SafeU64},
     22     api_params::{History, Page},
     23     api_revenue::RevenueIncomingBankTransaction,
     24     api_wire::{
     25         IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest,
     26         TransferResponse, TransferState, TransferStatus,
     27     },
     28     types::{
     29         amount::{Amount, Currency},
     30         payto::PaytoURI,
     31     },
     32 };
     33 use tokio::sync::watch::{Receiver, Sender};
     34 
     35 pub async fn notification_listener(
     36     pool: PgPool,
     37     outgoing_channel: Sender<i64>,
     38     incoming_channel: Sender<i64>,
     39 ) -> sqlx::Result<()> {
     40     taler_api::notification::notification_listener!(&pool,
     41         "outgoing_tx" => (row_id: i64) {
     42             outgoing_channel.send_replace(row_id);
     43         },
     44         "incoming_tx" => (row_id: i64) {
     45             incoming_channel.send_replace(row_id);
     46         }
     47     )
     48 }
     49 
     50 pub enum TransferResult {
     51     Success(TransferResponse),
     52     RequestUidReuse,
     53     WtidReuse,
     54 }
     55 
     56 pub async fn transfer(db: &PgPool, transfer: TransferRequest) -> sqlx::Result<TransferResult> {
     57     sqlx::query(
     58         "
     59             SELECT out_request_uid_reuse, out_wtid_reuse, out_transfer_row_id, out_created_at
     60             FROM taler_transfer(($1, $2)::taler_amount, $3, $4, $5, $6, $7, $8)
     61         ",
     62     )
     63     .bind_amount(&transfer.amount)
     64     .bind(transfer.exchange_base_url.as_str())
     65     .bind(format!("{} {}", transfer.wtid, transfer.exchange_base_url))
     66     .bind(transfer.credit_account.raw())
     67     .bind(transfer.request_uid.as_slice())
     68     .bind(transfer.wtid.as_slice())
     69     .bind_timestamp(&Timestamp::now())
     70     .try_map(|r: PgRow| {
     71         Ok(if r.try_get("out_request_uid_reuse")? {
     72             TransferResult::RequestUidReuse
     73         } else if r.try_get("out_wtid_reuse")? {
     74             TransferResult::WtidReuse
     75         } else {
     76             TransferResult::Success(TransferResponse {
     77                 row_id: r.try_get_safeu64("out_transfer_row_id")?,
     78                 timestamp: r.try_get_timestamp("out_created_at")?.into(),
     79             })
     80         })
     81     })
     82     .fetch_one(db)
     83     .await
     84 }
     85 
     86 pub async fn transfer_page(
     87     db: &PgPool,
     88     status: &Option<TransferState>,
     89     params: &Page,
     90     currency: &Currency,
     91 ) -> sqlx::Result<Vec<TransferListStatus>> {
     92     page(
     93         db,
     94         "transfer_id",
     95         params,
     96         || {
     97             let mut builder = QueryBuilder::new(
     98                 "
     99                     SELECT
    100                         transfer_id,
    101                         status,
    102                         (amount).val as amount_val,
    103                         (amount).frac as amount_frac,
    104                         credit_payto,
    105                         created_at
    106                     FROM transfer WHERE
    107                 ",
    108             );
    109             if let Some(status) = status {
    110                 builder.push(" status = ").push_bind(status).push(" AND ");
    111             }
    112             builder
    113         },
    114         |r: PgRow| {
    115             Ok(TransferListStatus {
    116                 row_id: r.try_get_safeu64("transfer_id")?,
    117                 status: r.try_get("status")?,
    118                 amount: r.try_get_amount("amount", currency)?,
    119                 credit_account: r.try_get_payto("credit_payto")?,
    120                 timestamp: r.try_get_timestamp("created_at")?.into(),
    121             })
    122         },
    123     )
    124     .await
    125 }
    126 
    127 pub async fn transfer_by_id(
    128     db: &PgPool,
    129     id: u64,
    130     currency: &Currency,
    131 ) -> sqlx::Result<Option<TransferStatus>> {
    132     sqlx::query(
    133         "
    134             SELECT
    135                 status,
    136                 status_msg,
    137                 (amount).val as amount_val,
    138                 (amount).frac as amount_frac,
    139                 exchange_base_url,
    140                 wtid,
    141                 credit_payto,
    142                 created_at
    143             FROM transfer WHERE transfer_id = $1
    144         ",
    145     )
    146     .bind(id as i64)
    147     .try_map(|r: PgRow| {
    148         Ok(TransferStatus {
    149             status: r.try_get("status")?,
    150             status_msg: r.try_get("status_msg")?,
    151             amount: r.try_get_amount("amount", currency)?,
    152             origin_exchange_url: r.try_get("exchange_base_url")?,
    153             wtid: r.try_get_base32("wtid")?,
    154             credit_account: r.try_get_payto("credit_payto")?,
    155             timestamp: r.try_get_timestamp("created_at")?.into(),
    156         })
    157     })
    158     .fetch_optional(db)
    159     .await
    160 }
    161 
    162 pub async fn outgoing_revenue(
    163     db: &PgPool,
    164     params: &History,
    165     currency: &Currency,
    166     listen: impl FnOnce() -> Receiver<i64>,
    167 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> {
    168     history(
    169         db,
    170         "transfer_id",
    171         params,
    172         listen,
    173         || {
    174             QueryBuilder::new(
    175                 "
    176                 SELECT
    177                     transfer_id,
    178                     (amount).val as amount_val,
    179                     (amount).frac as amount_frac,
    180                     exchange_base_url,
    181                     wtid,
    182                     credit_payto,
    183                     created_at
    184                 FROM transfer WHERE status = 'success' AND
    185             ",
    186             )
    187         },
    188         |r| {
    189             Ok(OutgoingBankTransaction {
    190                 amount: r.try_get_amount("amount", currency)?,
    191                 wtid: r.try_get_base32("wtid")?,
    192                 credit_account: r.try_get_payto("credit_payto")?,
    193                 row_id: r.try_get_safeu64("transfer_id")?,
    194                 date: r.try_get_timestamp("created_at")?.into(),
    195                 exchange_base_url: r.try_get_url("exchange_base_url")?,
    196             })
    197         },
    198     )
    199     .await
    200 }
    201 
    202 pub enum AddIncomingResult {
    203     Success { id: SafeU64, created_at: Timestamp },
    204     ReservePubReuse,
    205 }
    206 
    207 pub async fn add_incoming(
    208     db: &PgPool,
    209     amount: &Amount,
    210     debit_account: &PaytoURI,
    211     subject: &str,
    212     timestamp: &Timestamp,
    213     kind: IncomingType,
    214     key: &EddsaPublicKey,
    215 ) -> sqlx::Result<AddIncomingResult> {
    216     sqlx::query(
    217         "
    218             SELECT out_reserve_pub_reuse, out_tx_row_id, out_created_at
    219             FROM add_incoming(($1, $2)::taler_amount, $3, $4, $5, $6, $7)
    220         ",
    221     )
    222     .bind_amount(amount)
    223     .bind(subject)
    224     .bind(debit_account.raw())
    225     .bind(kind)
    226     .bind(key.as_slice())
    227     .bind_timestamp(timestamp)
    228     .try_map(|r: PgRow| {
    229         Ok(if r.try_get("out_reserve_pub_reuse")? {
    230             AddIncomingResult::ReservePubReuse
    231         } else {
    232             AddIncomingResult::Success {
    233                 id: r.try_get_safeu64("out_tx_row_id")?,
    234                 created_at: r.try_get_timestamp("out_created_at")?.into(),
    235             }
    236         })
    237     })
    238     .fetch_one(db)
    239     .await
    240 }
    241 
    242 pub async fn incoming_history(
    243     db: &PgPool,
    244     params: &History,
    245     currency: &Currency,
    246     listen: impl FnOnce() -> Receiver<i64>,
    247 ) -> sqlx::Result<Vec<IncomingBankTransaction>> {
    248     history(
    249         db,
    250         "tx_in_id",
    251         params,
    252         listen,
    253         || {
    254             QueryBuilder::new(
    255                 "
    256                  SELECT
    257                     type,
    258                     tx_in_id,
    259                     (amount).val as amount_val,
    260                     (amount).frac as amount_frac,
    261                     created_at,
    262                     debit_payto,
    263                     metadata,
    264                     origin_exchange_url
    265                 FROM tx_in WHERE
    266             ",
    267             )
    268         },
    269         |r: PgRow| {
    270             let kind: IncomingType = r.try_get("type")?;
    271             Ok(match kind {
    272                 IncomingType::reserve => IncomingBankTransaction::Reserve {
    273                     row_id: r.try_get_safeu64("tx_in_id")?,
    274                     date: r.try_get_timestamp("created_at")?.into(),
    275                     amount: r.try_get_amount("amount", currency)?,
    276                     debit_account: r.try_get_payto("debit_payto")?,
    277                     reserve_pub: r.try_get_base32("metadata")?,
    278                 },
    279                 IncomingType::kyc => IncomingBankTransaction::Kyc {
    280                     row_id: r.try_get_safeu64("tx_in_id")?,
    281                     date: r.try_get_timestamp("created_at")?.into(),
    282                     amount: r.try_get_amount("amount", currency)?,
    283                     debit_account: r.try_get_payto("debit_payto")?,
    284                     account_pub: r.try_get_base32("metadata")?,
    285                 },
    286                 IncomingType::wad => IncomingBankTransaction::Wad {
    287                     row_id: r.try_get_safeu64("tx_in_id")?,
    288                     date: r.try_get_timestamp("created_at")?.into(),
    289                     amount: r.try_get_amount("amount", currency)?,
    290                     debit_account: r.try_get_payto("debit_payto")?,
    291                     origin_exchange_url: r.try_get_url("origin_exchange_url")?,
    292                     wad_id: r.try_get_base32("metadata")?,
    293                 },
    294             })
    295         },
    296     )
    297     .await
    298 }
    299 
    300 pub async fn revenue_history(
    301     db: &PgPool,
    302     params: &History,
    303     currency: &Currency,
    304     listen: impl FnOnce() -> Receiver<i64>,
    305 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> {
    306     history(
    307         db,
    308         "tx_in_id",
    309         params,
    310         listen,
    311         || {
    312             QueryBuilder::new(
    313                 "
    314                  SELECT
    315                     tx_in_id,
    316                     (amount).val as amount_val,
    317                     (amount).frac as amount_frac,
    318                     created_at,
    319                     debit_payto,
    320                     subject
    321                 FROM tx_in WHERE
    322             ",
    323             )
    324         },
    325         |r: PgRow| {
    326             Ok(RevenueIncomingBankTransaction {
    327                 row_id: r.try_get_safeu64("tx_in_id")?,
    328                 date: r.try_get_timestamp("created_at")?.into(),
    329                 amount: r.try_get_amount("amount", currency)?,
    330                 credit_fee: None,
    331                 debit_account: r.try_get_payto("debit_payto")?,
    332                 subject: r.try_get("subject")?,
    333             })
    334         },
    335     )
    336     .await
    337 }