taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

db.rs (49062B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::fmt::Display;
     18 
     19 use compact_str::CompactString;
     20 use jiff::Timestamp;
     21 use serde::{Serialize, de::DeserializeOwned};
     22 use sqlx::{PgConnection, PgPool, QueryBuilder, Row, postgres::PgRow};
     23 use taler_api::{
     24     db::{BindHelper, TypeHelper, history, page},
     25     serialized,
     26     subject::{IncomingSubject, OutgoingSubject, fmt_out_subject},
     27 };
     28 use taler_common::{
     29     api::{
     30         HashCode, ShortHashCode,
     31         params::{History, Page},
     32         prepared::{RegistrationRequest, Unregistration},
     33         revenue::RevenueIncomingBankTransaction,
     34         wire::{
     35             IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferState,
     36             TransferStatus,
     37         },
     38     },
     39     config::Config,
     40     db::IncomingType,
     41     types::{
     42         amount::{Currency, Decimal},
     43         payto::{PaytoImpl as _, PaytoURI},
     44     },
     45 };
     46 use tokio::sync::watch::{Receiver, Sender};
     47 use url::Url;
     48 
     49 use crate::{
     50     config::parse_db_cfg,
     51     payto::{CyclosAccount, CyclosId, FullCyclosPayto},
     52 };
     53 
     54 const SCHEMA: &str = "cyclos";
     55 
     56 pub async fn pool(cfg: &Config) -> anyhow::Result<PgPool> {
     57     let db = parse_db_cfg(cfg)?;
     58     let pool = taler_common::db::pool(db.cfg, SCHEMA).await?;
     59     Ok(pool)
     60 }
     61 
     62 pub async fn dbinit(cfg: &Config, reset: bool) -> anyhow::Result<PgPool> {
     63     let db_cfg = parse_db_cfg(cfg)?;
     64     let pool = taler_common::db::pool(db_cfg.cfg, SCHEMA).await?;
     65     let mut db = pool.acquire().await?;
     66     taler_common::db::dbinit(&mut db, db_cfg.sql_dir.as_ref(), SCHEMA, reset).await?;
     67     Ok(pool)
     68 }
     69 
/// Forward database notifications to the matching `watch` channels.
///
/// Delegates to `taler_api::notification::notification_listener!`, registering
/// one handler per channel name (`tx_in`, `taler_in`, `tx_out`, `taler_out`).
/// Each handler receives an `i64` row id and stores it in the corresponding
/// sender with `send_replace`.
pub async fn notification_listener(
    pool: PgPool,
    in_channel: Sender<i64>,
    taler_in_channel: Sender<i64>,
    out_channel: Sender<i64>,
    taler_out_channel: Sender<i64>,
) {
    taler_api::notification::notification_listener!(&pool,
        "tx_in" => (row_id: i64) {
            in_channel.send_replace(row_id);
        },
        "taler_in" => (row_id: i64) {
            taler_in_channel.send_replace(row_id);
        },
        "tx_out" => (row_id: i64) {
            out_channel.send_replace(row_id);
        },
        "taler_out" => (row_id: i64) {
            taler_out_channel.send_replace(row_id);
        }
    )
}
     92 
/// Incoming transaction, as handed to `register_tx_in` and
/// `register_bounced_tx_in`.
#[derive(Debug, Clone)]
pub struct TxIn {
    /// Transfer identifier (always present).
    pub transfer_id: i64,
    /// Optional transaction identifier; printed as `:<id>` right after the
    /// transfer id by the `Display` implementation.
    pub tx_id: Option<i64>,
    /// Transferred amount.
    pub amount: Decimal,
    /// Transfer subject.
    pub subject: String,
    /// Account identifier of the debtor.
    pub debtor_id: i64,
    /// Name of the debtor.
    pub debtor_name: CompactString,
    /// Value timestamp of the transaction.
    pub valued_at: Timestamp,
}
    103 
    104 impl Display for TxIn {
    105     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    106         let Self {
    107             transfer_id,
    108             tx_id,
    109             amount,
    110             subject,
    111             valued_at,
    112             debtor_id,
    113             debtor_name,
    114         } = self;
    115         let tx_id = match tx_id {
    116             Some(id) => format_args!(":{}", *id),
    117             None => format_args!(""),
    118         };
    119         write!(
    120             f,
    121             "{valued_at} {transfer_id}{tx_id} {amount} ({debtor_id} {debtor_name}) '{subject}'"
    122         )
    123     }
    124 }
    125 
/// Outgoing transaction, as handed to `register_tx_out`.
#[derive(Debug, Clone)]
pub struct TxOut {
    /// Transfer identifier (always present).
    pub transfer_id: i64,
    /// Optional transaction identifier; printed as `:<id>` right after the
    /// transfer id by the `Display` implementation.
    pub tx_id: Option<i64>,
    /// Transferred amount.
    pub amount: Decimal,
    /// Transfer subject.
    pub subject: String,
    /// Account identifier of the creditor.
    pub creditor_id: i64,
    /// Name of the creditor.
    pub creditor_name: CompactString,
    /// Value timestamp of the transaction.
    pub valued_at: Timestamp,
}
    136 
    137 impl Display for TxOut {
    138     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    139         let Self {
    140             transfer_id,
    141             tx_id,
    142             amount,
    143             subject,
    144             creditor_id,
    145             creditor_name,
    146             valued_at,
    147         } = self;
    148         let tx_id = match tx_id {
    149             Some(id) => format_args!(":{}", *id),
    150             None => format_args!(""),
    151         };
    152         write!(
    153             f,
    154             "{valued_at} {transfer_id}{tx_id} {amount} ({creditor_id} {creditor_name}) '{subject}'"
    155         )
    156     }
    157 }
    158 
/// Initiated outgoing transaction, as returned by `pending_batch`.
#[derive(Debug, PartialEq, Eq)]
pub struct Initiated {
    /// Value of the `initiated_id` column.
    pub id: i64,
    /// Amount to transfer.
    pub amount: Decimal,
    /// Transfer subject.
    pub subject: String,
    /// Value of the `credit_account` column.
    pub creditor_id: i64,
    /// Value of the `credit_name` column.
    pub creditor_name: CompactString,
}
    167 
    168 impl Display for Initiated {
    169     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    170         let Self {
    171             id,
    172             amount,
    173             subject,
    174             creditor_id,
    175             creditor_name,
    176         } = self;
    177         write!(
    178             f,
    179             "{id} {amount} ({creditor_id} {creditor_name}) '{subject}'"
    180         )
    181     }
    182 }
    183 
/// Incoming transaction registered through `register_tx_in_admin`.
///
/// Unlike `TxIn` it carries no transfer or transaction id and no value
/// timestamp; it always comes with parsed subject metadata.
#[derive(Debug, Clone)]
pub struct TxInAdmin {
    /// Transferred amount.
    pub amount: Decimal,
    /// Transfer subject.
    pub subject: String,
    /// Account identifier of the debtor.
    pub debtor_id: i64,
    /// Name of the debtor.
    pub debtor_name: CompactString,
    /// Parsed subject; its `ty()` and `key()` are bound as query arguments.
    pub metadata: IncomingSubject,
}
    192 
/// Outcome of the `register_tx_in` SQL function, as decoded by
/// `register_tx_in` and `register_tx_in_admin`.
#[derive(Debug, PartialEq, Eq)]
pub enum AddIncomingResult {
    /// None of the failure flags was set.
    Success {
        /// Value of `out_new`.
        new: bool,
        /// Value of `out_pending`.
        pending: bool,
        /// Value of `out_tx_row_id`.
        row_id: u64,
        /// Value of `out_valued_at`.
        valued_at: Timestamp,
    },
    /// `out_reserve_pub_reuse` was set.
    ReservePubReuse,
    /// `out_unknown_mapping` was set.
    UnknownMapping,
    /// `out_mapping_reuse` was set.
    MappingReuse,
}
    205 
    206 /// Lock the database for worker execution
    207 pub async fn worker_lock(e: &mut PgConnection) -> sqlx::Result<bool> {
    208     sqlx::query("SELECT pg_try_advisory_lock(42)")
    209         .try_map(|r: PgRow| r.try_get(0))
    210         .fetch_one(e)
    211         .await
    212 }
    213 
    214 pub async fn register_tx_in_admin(
    215     db: &PgPool,
    216     tx: &TxInAdmin,
    217     now: &Timestamp,
    218 ) -> sqlx::Result<AddIncomingResult> {
    219     serialized!(
    220         sqlx::query(
    221             "
    222             SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending
    223             FROM register_tx_in(NULL, NULL, $1, $2, $3, $4, $5, $6, $7, $5)
    224         ",
    225         )
    226         .bind(tx.amount)
    227         .bind(&tx.subject)
    228         .bind(tx.debtor_id)
    229         .bind(&tx.debtor_name)
    230         .bind_timestamp(now)
    231         .bind(tx.metadata.ty())
    232         .bind(tx.metadata.key())
    233        .try_map(|r: PgRow| {
    234             Ok(if r.try_get_flag(0)? {
    235                 AddIncomingResult::ReservePubReuse
    236             } else if r.try_get_flag(1)? {
    237                 AddIncomingResult::MappingReuse
    238             } else if r.try_get_flag(2)? {
    239                 AddIncomingResult::UnknownMapping
    240             } else {
    241                 AddIncomingResult::Success {
    242                     row_id: r.try_get_u64(3)?,
    243                     valued_at: r.try_get_timestamp(4)?,
    244                     new: r.try_get(5)?,
    245                     pending: r.try_get(6)?
    246                 }
    247             })
    248         })
    249         .fetch_one(db)
    250     )
    251 }
    252 
    253 pub async fn register_tx_in(
    254     db: &mut PgConnection,
    255     tx: &TxIn,
    256     subject: &Option<IncomingSubject>,
    257     now: &Timestamp,
    258 ) -> sqlx::Result<AddIncomingResult> {
    259     serialized!(
    260         sqlx::query(
    261             "
    262             SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending
    263             FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
    264         ",
    265         )
    266         .bind(tx.transfer_id)
    267         .bind(tx.tx_id)
    268         .bind(tx.amount)
    269         .bind(&tx.subject)
    270         .bind(tx.debtor_id)
    271         .bind(&tx.debtor_name)
    272         .bind(tx.valued_at.as_microsecond())
    273         .bind(subject.as_ref().map(|it| it.ty()))
    274         .bind(subject.as_ref().map(|it| it.key()))
    275         .bind(now.as_microsecond())
    276         .try_map(|r: PgRow| {
    277             Ok(if r.try_get_flag(0)? {
    278                 AddIncomingResult::ReservePubReuse
    279             } else if r.try_get_flag(1)? {
    280                 AddIncomingResult::MappingReuse
    281             } else if r.try_get_flag(2)? {
    282                 AddIncomingResult::UnknownMapping
    283             } else {
    284                 AddIncomingResult::Success {
    285                     row_id: r.try_get_u64(3)?,
    286                     valued_at: r.try_get_timestamp(4)?,
    287                     new: r.try_get(5)?,
    288                     pending: r.try_get(6)?
    289                 }
    290             })
    291         })
    292         .fetch_one(&mut *db)
    293     )
    294 }
    295 
/// Classification of an outgoing transaction, passed to `register_tx_out`.
///
/// It decides what is bound for the wtid, exchange base URL, metadata and
/// bounced-id arguments of the `register_tx_out` SQL function.
#[derive(Debug)]
pub enum TxOutKind {
    /// All four kind-specific arguments are bound as `NULL`.
    Simple,
    /// Only the bounced id is bound (the wrapped value); the rest is `NULL`.
    Bounce(i64),
    /// The wtid, exchange base URL and metadata of the subject are bound; the
    /// bounced id is `NULL`.
    Talerable(OutgoingSubject),
}
    302 
/// Result kind reported by the `register_tx_out` SQL function, decoded from
/// the database type named `register_result`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)]
// Variants are deliberately lowercase; presumably they mirror the labels of
// the database enum — confirm against the SQL schema.
#[allow(non_camel_case_types)]
#[sqlx(type_name = "register_result")]
pub enum RegisterResult {
    /// Already registered
    idempotent,
    /// Initiated transaction
    known,
    /// Recovered unknown outgoing transaction
    recovered,
}
    314 
/// Row returned by the `register_tx_out` SQL function.
#[derive(Debug, PartialEq, Eq)]
pub struct AddOutgoingResult {
    /// Value of `out_result`.
    pub result: RegisterResult,
    /// Value of `out_tx_row_id`.
    pub row_id: i64,
}
    320 
    321 pub async fn register_tx_out(
    322     db: &mut PgConnection,
    323     tx: &TxOut,
    324     kind: &TxOutKind,
    325     now: &Timestamp,
    326 ) -> sqlx::Result<AddOutgoingResult> {
    327     serialized!({
    328         let query = sqlx::query(
    329             "
    330                 SELECT out_result, out_tx_row_id
    331                 FROM register_tx_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
    332             ",
    333         )
    334         .bind(tx.transfer_id)
    335         .bind(tx.tx_id)
    336         .bind(tx.amount)
    337         .bind(&tx.subject)
    338         .bind(tx.creditor_id)
    339         .bind(&tx.creditor_name)
    340         .bind_timestamp(&tx.valued_at);
    341         let query = match kind {
    342             TxOutKind::Simple => query
    343                 .bind(None::<&[u8]>)
    344                 .bind(None::<&str>)
    345                 .bind(None::<&str>)
    346                 .bind(None::<i64>),
    347             TxOutKind::Bounce(bounced) => query
    348                 .bind(None::<&[u8]>)
    349                 .bind(None::<&str>)
    350                 .bind(None::<&str>)
    351                 .bind(*bounced),
    352             TxOutKind::Talerable(subject) => query
    353                 .bind(&subject.wtid)
    354                 .bind(subject.exchange_base_url.as_str())
    355                 .bind(&subject.metadata)
    356                 .bind(None::<i64>),
    357         };
    358         query
    359             .bind_timestamp(now)
    360             .try_map(|r: PgRow| {
    361                 Ok(AddOutgoingResult {
    362                     result: r.try_get(0)?,
    363                     row_id: r.try_get(1)?,
    364                 })
    365             })
    366             .fetch_one(&mut *db)
    367     })
    368 }
    369 
/// Outcome of the `taler_transfer` SQL function, as decoded by `make_transfer`.
#[derive(Debug, PartialEq, Eq)]
pub enum TransferResult {
    /// Neither reuse flag was set; `id` is `out_initiated_row_id` and
    /// `initiated_at` is `out_initiated_at`.
    Success { id: u64, initiated_at: Timestamp },
    /// `out_request_uid_reuse` was set.
    RequestUidReuse,
    /// `out_wtid_reuse` was set.
    WtidReuse,
}
    376 
/// Transfer request, as handed to `make_transfer`.
#[derive(Debug, Clone)]
pub struct Transfer {
    /// Request identifier; the SQL function reports its reuse through
    /// `out_request_uid_reuse`.
    pub request_uid: HashCode,
    /// Amount to transfer.
    pub amount: Decimal,
    /// Exchange base URL; part of the formatted subject.
    pub exchange_base_url: Url,
    /// Optional metadata; part of the formatted subject when present.
    pub metadata: Option<CompactString>,
    /// Wire transfer identifier; part of the formatted subject. The SQL
    /// function reports its reuse through `out_wtid_reuse`.
    pub wtid: ShortHashCode,
    /// Account identifier of the creditor.
    pub creditor_id: i64,
    /// Name of the creditor.
    pub creditor_name: CompactString,
}
    387 
    388 pub async fn make_transfer(
    389     db: &PgPool,
    390     tx: &Transfer,
    391     now: &Timestamp,
    392 ) -> sqlx::Result<TransferResult> {
    393     let subject = fmt_out_subject(&tx.wtid, &tx.exchange_base_url, tx.metadata.as_deref());
    394     serialized!(
    395         sqlx::query(
    396             "
    397                 SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at
    398                 FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8, $9)
    399             ",
    400         )
    401         .bind(&tx.request_uid)
    402         .bind(&tx.wtid)
    403         .bind(&subject)
    404         .bind(tx.amount)
    405         .bind(tx.exchange_base_url.as_str())
    406         .bind(&tx.metadata)
    407         .bind(tx.creditor_id)
    408         .bind(&tx.creditor_name)
    409         .bind_timestamp(now)
    410         .try_map(|r: PgRow| {
    411             Ok(if r.try_get_flag(0)? {
    412                 TransferResult::RequestUidReuse
    413             } else if r.try_get_flag(1)? {
    414                 TransferResult::WtidReuse
    415             } else {
    416                 TransferResult::Success {
    417                     id: r.try_get_u64(2)?,
    418                     initiated_at: r.try_get_timestamp(3)?,
    419                 }
    420             })
    421         })
    422         .fetch_one(db)
    423     )
    424 }
    425 
/// Row returned by the `register_bounced_tx_in` SQL function.
#[derive(Debug, PartialEq, Eq)]
pub struct BounceResult {
    /// Value of `out_tx_row_id`.
    pub tx_id: u64,
    /// Value of `out_tx_new`.
    pub tx_new: bool,
}
    431 
    432 pub async fn register_bounced_tx_in(
    433     db: &mut PgConnection,
    434     tx: &TxIn,
    435     chargeback_id: i64,
    436     reason: &str,
    437     now: &Timestamp,
    438 ) -> sqlx::Result<BounceResult> {
    439     serialized!(
    440         sqlx::query(
    441             "
    442                 SELECT out_tx_row_id, out_tx_new
    443                 FROM register_bounced_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
    444             ",
    445         )
    446         .bind(tx.transfer_id)
    447         .bind(tx.tx_id)
    448         .bind(tx.amount)
    449         .bind(&tx.subject)
    450         .bind(tx.debtor_id)
    451         .bind(&tx.debtor_name)
    452         .bind_timestamp(&tx.valued_at)
    453         .bind(chargeback_id)
    454         .bind(reason)
    455         .bind_timestamp(now)
    456         .try_map(|r: PgRow| {
    457             Ok(BounceResult {
    458                 tx_id: r.try_get_u64(0)?,
    459                 tx_new: r.try_get(1)?,
    460             })
    461         })
    462         .fetch_one(&mut *db)
    463     )
    464 }
    465 
    466 pub async fn transfer_page(
    467     db: &PgPool,
    468     status: &Option<TransferState>,
    469     currency: &Currency,
    470     root: &CompactString,
    471     params: &Page,
    472 ) -> sqlx::Result<Vec<TransferListStatus>> {
    473     page(
    474         db,
    475         params,
    476         "initiated_id",
    477         || {
    478             let mut builder = QueryBuilder::new(
    479                 "
    480                     SELECT
    481                         initiated_id,
    482                         status,
    483                         amount,
    484                         credit_account,
    485                         credit_name,
    486                         initiated_at
    487                     FROM transfer
    488                     JOIN initiated USING (initiated_id)
    489                     WHERE
    490                 ",
    491             );
    492             if let Some(status) = status {
    493                 builder.push(" status = ").push_bind(status).push(" AND ");
    494             }
    495             builder
    496         },
    497         |r: PgRow| {
    498             Ok(TransferListStatus {
    499                 row_id: r.try_get_u64(0)?,
    500                 status: r.try_get(1)?,
    501                 amount: r.try_get_amount(2, currency)?,
    502                 credit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
    503                 timestamp: r.try_get_timestamp(5)?.into(),
    504             })
    505         },
    506     )
    507     .await
    508 }
    509 
    510 pub async fn outgoing_history(
    511     db: &PgPool,
    512     params: &History,
    513     currency: &Currency,
    514     root: &CompactString,
    515     listen: impl FnOnce() -> Receiver<i64>,
    516 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> {
    517     history(
    518         db,
    519         "tx_out_id",
    520         params,
    521         listen,
    522         || {
    523             QueryBuilder::new(
    524                 "
    525                 SELECT
    526                     tx_out_id,
    527                     amount,
    528                     credit_account,
    529                     credit_name,
    530                     valued_at,
    531                     exchange_base_url,
    532                     metadata,
    533                     wtid
    534                 FROM taler_out
    535                 JOIN tx_out USING (tx_out_id)
    536                 WHERE
    537             ",
    538             )
    539         },
    540         |r: PgRow| {
    541             Ok(OutgoingBankTransaction {
    542                 row_id: r.try_get_u64(0)?,
    543                 amount: r.try_get_amount(1, currency)?,
    544                 debit_fee: None,
    545                 credit_account: r.try_get_cyclos_fullpaytouri(2, 3, root)?,
    546                 date: r.try_get_timestamp(4)?.into(),
    547                 exchange_base_url: r.try_get_url(5)?,
    548                 metadata: r.try_get(6)?,
    549                 wtid: r.try_get(7)?,
    550             })
    551         },
    552     )
    553     .await
    554 }
    555 
    556 pub async fn incoming_history(
    557     db: &PgPool,
    558     params: &History,
    559     currency: &Currency,
    560     root: &CompactString,
    561     listen: impl FnOnce() -> Receiver<i64>,
    562 ) -> sqlx::Result<Vec<IncomingBankTransaction>> {
    563     history(
    564         db,
    565         "tx_in_id",
    566         params,
    567         listen,
    568         || {
    569             QueryBuilder::new(
    570                 "
    571                 SELECT
    572                     type,
    573                     tx_in_id,
    574                     amount,
    575                     debit_account,
    576                     debit_name,
    577                     valued_at,
    578                     metadata,
    579                     authorization_pub,
    580                     authorization_sig
    581                 FROM taler_in
    582                 JOIN tx_in USING (tx_in_id)
    583                 WHERE
    584             ",
    585             )
    586         },
    587         |r: PgRow| {
    588             Ok(match r.try_get(0)? {
    589                 IncomingType::reserve => IncomingBankTransaction::Reserve {
    590                     row_id: r.try_get_u64(1)?,
    591                     amount: r.try_get_amount(2, currency)?,
    592                     credit_fee: None,
    593                     debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
    594                     date: r.try_get_timestamp(5)?.into(),
    595                     reserve_pub: r.try_get(6)?,
    596                     authorization_pub: r.try_get(7)?,
    597                     authorization_sig: r.try_get(8)?,
    598                 },
    599                 IncomingType::kyc => IncomingBankTransaction::Kyc {
    600                     row_id: r.try_get_u64(1)?,
    601                     amount: r.try_get_amount(2, currency)?,
    602                     credit_fee: None,
    603                     debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
    604                     date: r.try_get_timestamp(5)?.into(),
    605                     account_pub: r.try_get(6)?,
    606                     authorization_pub: r.try_get(7)?,
    607                     authorization_sig: r.try_get(8)?,
    608                 },
    609                 IncomingType::map => unimplemented!("MAP are never listed in the history"),
    610             })
    611         },
    612     )
    613     .await
    614 }
    615 
    616 pub async fn revenue_history(
    617     db: &PgPool,
    618     params: &History,
    619     currency: &Currency,
    620     root: &CompactString,
    621     listen: impl FnOnce() -> Receiver<i64>,
    622 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> {
    623     history(
    624         db,
    625         "tx_in_id",
    626         params,
    627         listen,
    628         || {
    629             QueryBuilder::new(
    630                 "
    631                 SELECT
    632                     tx_in_id,
    633                     valued_at,
    634                     amount,
    635                     debit_account,
    636                     debit_name,
    637                     subject
    638                 FROM tx_in
    639                 WHERE
    640             ",
    641             )
    642         },
    643         |r: PgRow| {
    644             Ok(RevenueIncomingBankTransaction {
    645                 row_id: r.try_get_u64(0)?,
    646                 date: r.try_get_timestamp(1)?.into(),
    647                 amount: r.try_get_amount(2, currency)?,
    648                 credit_fee: None,
    649                 debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
    650                 subject: r.try_get(5)?,
    651             })
    652         },
    653     )
    654     .await
    655 }
    656 
    657 pub async fn transfer_by_id(
    658     db: &PgPool,
    659     id: u64,
    660     currency: &Currency,
    661     root: &CompactString,
    662 ) -> sqlx::Result<Option<TransferStatus>> {
    663     serialized!(
    664         sqlx::query(
    665             "
    666                 SELECT
    667                     status,
    668                     status_msg,
    669                     amount,
    670                     exchange_base_url,
    671                     metadata,
    672                     wtid,
    673                     credit_account,
    674                     credit_name,
    675                     initiated_at
    676                 FROM transfer
    677                 JOIN initiated USING (initiated_id)
    678                 WHERE initiated_id = $1
    679             ",
    680         )
    681         .bind(id as i64)
    682         .try_map(|r: PgRow| {
    683             Ok(TransferStatus {
    684                 status: r.try_get(0)?,
    685                 status_msg: r.try_get(1)?,
    686                 amount: r.try_get_amount(2, currency)?,
    687                 origin_exchange_url: r.try_get(3)?,
    688                 metadata: r.try_get(4)?,
    689                 wtid: r.try_get(5)?,
    690                 credit_account: r.try_get_cyclos_fullpaytouri(6, 7, root)?,
    691                 timestamp: r.try_get_timestamp(8)?.into(),
    692             })
    693         })
    694         .fetch_optional(db)
    695     )
    696 }
    697 
    698 /** Get a batch of pending initiated transactions not attempted since [start] */
    699 pub async fn pending_batch(
    700     db: &mut PgConnection,
    701     start: &Timestamp,
    702 ) -> sqlx::Result<Vec<Initiated>> {
    703     serialized!(
    704         sqlx::query(
    705             "
    706             SELECT initiated_id, amount, subject, credit_account, credit_name
    707             FROM initiated
    708             WHERE tx_id IS NULL
    709                 AND status='pending'
    710                 AND (last_submitted IS NULL OR last_submitted < $1)
    711             LIMIT 100
    712         ",
    713         )
    714         .bind_timestamp(start)
    715         .try_map(|r: PgRow| {
    716             Ok(Initiated {
    717                 id: r.try_get(0)?,
    718                 amount: r.try_get(1)?,
    719                 subject: r.try_get(2)?,
    720                 creditor_id: r.try_get(3)?,
    721                 creditor_name: r.try_get(4)?,
    722             })
    723         })
    724         .fetch_all(&mut *db)
    725     )
    726 }
    727 
    728 /** Update status of a successful submitted initiated transaction */
    729 pub async fn initiated_submit_success(
    730     db: &mut PgConnection,
    731     initiated_id: i64,
    732     timestamp: &Timestamp,
    733     tx_id: i64,
    734 ) -> sqlx::Result<()> {
    735     serialized!(
    736         sqlx::query(
    737             "
    738                 UPDATE initiated
    739                 SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, tx_id=$2
    740                 WHERE initiated_id=$3
    741             "
    742         )
    743         .bind_timestamp(timestamp)
    744         .bind(tx_id)
    745         .bind(initiated_id)
    746         .execute(&mut *db)
    747     )?;
    748     Ok(())
    749 }
    750 
    751 /** Update status of a permanently failed initiated transaction */
    752 pub async fn initiated_submit_permanent_failure(
    753     db: &mut PgConnection,
    754     initiated_id: i64,
    755     msg: &str,
    756 ) -> sqlx::Result<()> {
    757     serialized!(
    758         sqlx::query(
    759             "
    760                 UPDATE initiated
    761                 SET status='permanent_failure', status_msg=$1
    762                 WHERE initiated_id=$2
    763             ",
    764         )
    765         .bind(msg)
    766         .bind(initiated_id)
    767         .execute(&mut *db)
    768     )?;
    769     Ok(())
    770 }
    771 
/// Outcome of `initiated_chargeback_failure`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChargebackFailureResult {
    /// No row was returned, or the returned initiated id was 0.
    Unknown,
    /// `out_new` was true; carries the initiated id.
    Known(u64),
    /// `out_new` was false; carries the initiated id.
    Idempotent(u64),
}
    778 
    779 /** Update status of a charged back initiated transaction */
    780 pub async fn initiated_chargeback_failure(
    781     db: &mut PgConnection,
    782     transfer_id: i64,
    783 ) -> sqlx::Result<ChargebackFailureResult> {
    784     Ok(serialized!(
    785         sqlx::query("SELECT out_initiated_id, out_new FROM register_charge_back_failure($1)")
    786             .bind(transfer_id)
    787             .try_map(|r: PgRow| {
    788                 let id = r.try_get_u64(0)?;
    789                 Ok(if id == 0 {
    790                     ChargebackFailureResult::Unknown
    791                 } else if r.try_get(1)? {
    792                     ChargebackFailureResult::Known(id)
    793                 } else {
    794                     ChargebackFailureResult::Idempotent(id)
    795                 })
    796             })
    797             .fetch_optional(&mut *db)
    798     )?
    799     .unwrap_or(ChargebackFailureResult::Unknown))
    800 }
    801 
    802 /** Get JSON value from KV table */
    803 pub async fn kv_get<T: DeserializeOwned + Unpin + Send>(
    804     db: &mut PgConnection,
    805     key: &str,
    806 ) -> sqlx::Result<Option<T>> {
    807     serialized!(
    808         sqlx::query("SELECT value FROM kv WHERE key=$1")
    809             .bind(key)
    810             .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0))
    811             .fetch_optional(&mut *db)
    812     )
    813 }
    814 
    815 /** Set JSON value in KV table */
    816 pub async fn kv_set<T: Serialize>(db: &mut PgConnection, key: &str, value: &T) -> sqlx::Result<()> {
    817     serialized!(
    818         sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value")
    819             .bind(key)
    820             .bind(sqlx::types::Json(value))
    821             .execute(&mut *db)
    822     )?;
    823     Ok(())
    824 }
    825 
    826 pub enum RegistrationResult {
    827     Success,
    828     ReservePubReuse,
    829 }
    830 
    831 pub async fn transfer_register(
    832     db: &PgPool,
    833     req: &RegistrationRequest,
    834 ) -> sqlx::Result<RegistrationResult> {
    835     let ty: IncomingType = req.r#type.into();
    836     serialized!(
    837         sqlx::query(
    838             "SELECT out_reserve_pub_reuse FROM register_prepared_transfers($1,$2,$3,$4,$5,$6)"
    839         )
    840         .bind(ty)
    841         .bind(&req.account_pub)
    842         .bind(&req.authorization_pub)
    843         .bind(&req.authorization_sig)
    844         .bind(req.recurrent)
    845         .bind_timestamp(&Timestamp::now())
    846         .try_map(|r: PgRow| {
    847             Ok(if r.try_get_flag("out_reserve_pub_reuse")? {
    848                 RegistrationResult::ReservePubReuse
    849             } else {
    850                 RegistrationResult::Success
    851             })
    852         })
    853         .fetch_one(db)
    854     )
    855 }
    856 
    857 pub async fn transfer_unregister(db: &PgPool, req: &Unregistration) -> sqlx::Result<bool> {
    858     serialized!(
    859         sqlx::query("SELECT out_found FROM delete_prepared_transfers($1)")
    860             .bind(&req.authorization_pub)
    861             .try_map(|r: PgRow| r.try_get_flag("out_found"))
    862             .fetch_one(db)
    863     )
    864 }
    865 
/// Row helpers that rebuild a Cyclos payto from two columns and a root.
pub trait CyclosTypeHelper {
    /// Build a [`FullCyclosPayto`] from the account id stored in column `idx`,
    /// the account name stored in column `name`, and `root`.
    fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>(
        &self,
        idx: I,
        name: I,
        root: &CompactString,
    ) -> sqlx::Result<FullCyclosPayto>;
    /// Same inputs as `try_get_cyclos_fullpayto`, but returns the result as a
    /// full [`PaytoURI`].
    fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>(
        &self,
        idx: I,
        name: I,
        root: &CompactString,
    ) -> sqlx::Result<PaytoURI>;
}
    880 
    881 impl CyclosTypeHelper for PgRow {
    882     fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>(
    883         &self,
    884         idx: I,
    885         name: I,
    886         root: &CompactString,
    887     ) -> sqlx::Result<FullCyclosPayto> {
    888         let idx = self.try_get(idx)?;
    889         let name = self.try_get(name)?;
    890         Ok(FullCyclosPayto::new(
    891             CyclosAccount {
    892                 id: CyclosId(idx),
    893                 root: root.clone(),
    894             },
    895             name,
    896         ))
    897     }
    898     fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>(
    899         &self,
    900         idx: I,
    901         name: I,
    902         root: &CompactString,
    903     ) -> sqlx::Result<PaytoURI> {
    904         let idx = self.try_get(idx)?;
    905         let name = self.try_get(name)?;
    906         Ok(CyclosAccount {
    907             id: CyclosId(idx),
    908             root: root.clone(),
    909         }
    910         .as_full_uri(name))
    911     }
    912 }
    913 
    914 #[cfg(test)]
    915 mod test {
    916     use compact_str::CompactString;
    917     use jiff::{Span, Timestamp};
    918     use serde_json::json;
    919     use sqlx::{PgPool, Postgres, Row as _, pool::PoolConnection, postgres::PgRow};
    920     use taler_api::{
    921         db::TypeHelper,
    922         notification::dummy_listen,
    923         subject::{IncomingSubject, OutgoingSubject},
    924     };
    925     use taler_common::{
    926         api::{
    927             EddsaPublicKey, HashCode, ShortHashCode,
    928             params::{History, Page},
    929             wire::TransferState,
    930         },
    931         types::{
    932             amount::{Currency, decimal},
    933             url,
    934             utils::now_sql_stable_ts,
    935         },
    936     };
    937 
    938     use crate::{
    939         constants::CONFIG_SOURCE,
    940         db::{
    941             self, AddIncomingResult, AddOutgoingResult, BounceResult, ChargebackFailureResult,
    942             Transfer, TransferResult, TxIn, TxInAdmin, TxOut, TxOutKind,
    943         },
    944     };
    945 
    946     pub const CURR: Currency = Currency::TEST;
    947     pub const ROOT: CompactString = CompactString::const_new("localhost");
    948 
    949     async fn setup() -> (PoolConnection<Postgres>, PgPool) {
    950         taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await
    951     }
    952 
    953     #[tokio::test]
    954     async fn kv() {
    955         let (mut db, _) = setup().await;
    956 
    957         let value = json!({
    958             "name": "Mr Smith",
    959             "no way": 32
    960         });
    961 
    962         assert_eq!(
    963             db::kv_get::<serde_json::Value>(&mut db, "value")
    964                 .await
    965                 .unwrap(),
    966             None
    967         );
    968         db::kv_set(&mut db, "value", &value).await.unwrap();
    969         db::kv_set(&mut db, "value", &value).await.unwrap();
    970         assert_eq!(
    971             db::kv_get::<serde_json::Value>(&mut db, "value")
    972                 .await
    973                 .unwrap(),
    974             Some(value)
    975         );
    976     }
    977 
    978     #[tokio::test]
    979     async fn tx_in() {
    980         let (mut db, pool) = setup().await;
    981 
    982         let mut routine = async |first: &Option<IncomingSubject>,
    983                                  second: &Option<IncomingSubject>| {
    984             let id = sqlx::query("SELECT count(*) + 1 FROM tx_in")
    985                 .try_map(|r: PgRow| r.try_get_u64(0))
    986                 .fetch_one(&mut *db)
    987                 .await
    988                 .unwrap();
    989             let now = now_sql_stable_ts();
    990             let later = now + Span::new().hours(2);
    991             let tx = TxIn {
    992                 transfer_id: now.as_microsecond() as i64,
    993                 tx_id: None,
    994                 amount: decimal("10"),
    995                 subject: "subject".to_owned(),
    996                 debtor_id: 31000163100000000,
    997                 debtor_name: "Name".into(),
    998                 valued_at: now,
    999             };
   1000             // Insert
   1001             assert_eq!(
   1002                 db::register_tx_in(&mut db, &tx, first, &now)
   1003                     .await
   1004                     .expect("register tx in"),
   1005                 AddIncomingResult::Success {
   1006                     new: true,
   1007                     pending: false,
   1008                     row_id: id,
   1009                     valued_at: now,
   1010                 }
   1011             );
   1012             // Idempotent
   1013             assert_eq!(
   1014                 db::register_tx_in(
   1015                     &mut db,
   1016                     &TxIn {
   1017                         valued_at: later,
   1018                         ..tx.clone()
   1019                     },
   1020                     first,
   1021                     &now
   1022                 )
   1023                 .await
   1024                 .expect("register tx in"),
   1025                 AddIncomingResult::Success {
   1026                     new: false,
   1027                     pending: false,
   1028                     row_id: id,
   1029                     valued_at: now
   1030                 }
   1031             );
   1032             // Many
   1033             assert_eq!(
   1034                 db::register_tx_in(
   1035                     &mut db,
   1036                     &TxIn {
   1037                         transfer_id: later.as_microsecond() as i64,
   1038                         valued_at: later,
   1039                         ..tx
   1040                     },
   1041                     second,
   1042                     &now
   1043                 )
   1044                 .await
   1045                 .expect("register tx in"),
   1046                 AddIncomingResult::Success {
   1047                     new: true,
   1048                     pending: false,
   1049                     row_id: id + 1,
   1050                     valued_at: later
   1051                 }
   1052             );
   1053         };
   1054 
   1055         // Empty db
   1056         assert_eq!(
   1057             db::revenue_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1058                 .await
   1059                 .unwrap(),
   1060             Vec::new()
   1061         );
   1062         assert_eq!(
   1063             db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1064                 .await
   1065                 .unwrap(),
   1066             Vec::new()
   1067         );
   1068 
   1069         // Regular transaction
   1070         routine(&None, &None).await;
   1071 
   1072         // Reserve transaction
   1073         routine(
   1074             &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())),
   1075             &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())),
   1076         )
   1077         .await;
   1078 
   1079         // Kyc transaction
   1080         routine(
   1081             &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())),
   1082             &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())),
   1083         )
   1084         .await;
   1085 
   1086         // History
   1087         assert_eq!(
   1088             db::revenue_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1089                 .await
   1090                 .unwrap()
   1091                 .len(),
   1092             6
   1093         );
   1094         assert_eq!(
   1095             db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1096                 .await
   1097                 .unwrap()
   1098                 .len(),
   1099             4
   1100         );
   1101     }
   1102 
   1103     #[tokio::test]
   1104     async fn tx_in_admin() {
   1105         let (_, pool) = setup().await;
   1106 
   1107         // Empty db
   1108         assert_eq!(
   1109             db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1110                 .await
   1111                 .unwrap(),
   1112             Vec::new()
   1113         );
   1114 
   1115         let now = now_sql_stable_ts();
   1116         let later = now + Span::new().hours(2);
   1117         let tx = TxInAdmin {
   1118             amount: decimal("10"),
   1119             subject: "subject".to_owned(),
   1120             debtor_id: 31000163100000000,
   1121             debtor_name: "Name".into(),
   1122             metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
   1123         };
   1124         // Insert
   1125         assert_eq!(
   1126             db::register_tx_in_admin(&pool, &tx, &now)
   1127                 .await
   1128                 .expect("register tx in"),
   1129             AddIncomingResult::Success {
   1130                 new: true,
   1131                 pending: false,
   1132                 row_id: 1,
   1133                 valued_at: now
   1134             }
   1135         );
   1136         // Many
   1137         assert_eq!(
   1138             db::register_tx_in_admin(
   1139                 &pool,
   1140                 &TxInAdmin {
   1141                     subject: "Other".to_owned(),
   1142                     metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
   1143                     ..tx.clone()
   1144                 },
   1145                 &later
   1146             )
   1147             .await
   1148             .expect("register tx in"),
   1149             AddIncomingResult::Success {
   1150                 new: true,
   1151                 pending: false,
   1152                 row_id: 2,
   1153                 valued_at: later
   1154             }
   1155         );
   1156 
   1157         // History
   1158         assert_eq!(
   1159             db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1160                 .await
   1161                 .unwrap()
   1162                 .len(),
   1163             2
   1164         );
   1165     }
   1166 
   1167     #[tokio::test]
   1168     async fn tx_out() {
   1169         let (mut db, pool) = setup().await;
   1170 
   1171         let mut routine = async |first: &TxOutKind, second: &TxOutKind| {
   1172             let transfer_id = sqlx::query("SELECT count(*) + 1 FROM tx_out")
   1173                 .try_map(|r: PgRow| r.try_get(0))
   1174                 .fetch_one(&mut *db)
   1175                 .await
   1176                 .unwrap();
   1177             let now = now_sql_stable_ts();
   1178             let later = now + Span::new().hours(2);
   1179             let tx = TxOut {
   1180                 transfer_id,
   1181                 tx_id: Some(transfer_id),
   1182                 amount: decimal("10"),
   1183                 subject: "subject".to_owned(),
   1184                 creditor_id: 31000163100000000,
   1185                 creditor_name: "Name".into(),
   1186                 valued_at: now,
   1187             };
   1188             assert!(matches!(
   1189                 db::make_transfer(
   1190                     &pool,
   1191                     &Transfer {
   1192                         request_uid: HashCode::rand(),
   1193                         amount: decimal("10"),
   1194                         exchange_base_url: url("https://exchange.test.com/"),
   1195                         metadata: None,
   1196                         wtid: ShortHashCode::rand(),
   1197                         creditor_id: 31000163100000000,
   1198                         creditor_name: "Name".into()
   1199                     },
   1200                     &now
   1201                 )
   1202                 .await
   1203                 .unwrap(),
   1204                 TransferResult::Success { .. }
   1205             ));
   1206             db::initiated_submit_success(&mut db, 1, &Timestamp::now(), transfer_id)
   1207                 .await
   1208                 .expect("status success");
   1209 
   1210             // Insert
   1211             assert_eq!(
   1212                 db::register_tx_out(&mut db, &tx, first, &now)
   1213                     .await
   1214                     .expect("register tx out"),
   1215                 AddOutgoingResult {
   1216                     result: db::RegisterResult::known,
   1217                     row_id: transfer_id,
   1218                 }
   1219             );
   1220             // Idempotent
   1221             assert_eq!(
   1222                 db::register_tx_out(
   1223                     &mut db,
   1224                     &TxOut {
   1225                         valued_at: later,
   1226                         ..tx.clone()
   1227                     },
   1228                     first,
   1229                     &now
   1230                 )
   1231                 .await
   1232                 .expect("register tx out"),
   1233                 AddOutgoingResult {
   1234                     result: db::RegisterResult::idempotent,
   1235                     row_id: transfer_id,
   1236                 }
   1237             );
   1238             // Recovered
   1239             assert_eq!(
   1240                 db::register_tx_out(
   1241                     &mut db,
   1242                     &TxOut {
   1243                         transfer_id: transfer_id + 1,
   1244                         tx_id: Some(transfer_id + 1),
   1245                         valued_at: later,
   1246                         ..tx.clone()
   1247                     },
   1248                     second,
   1249                     &now
   1250                 )
   1251                 .await
   1252                 .expect("register tx out"),
   1253                 AddOutgoingResult {
   1254                     result: db::RegisterResult::recovered,
   1255                     row_id: transfer_id + 1,
   1256                 }
   1257             );
   1258         };
   1259 
   1260         // Empty db
   1261         assert_eq!(
   1262             db::outgoing_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1263                 .await
   1264                 .unwrap(),
   1265             Vec::new()
   1266         );
   1267 
   1268         // Regular transaction
   1269         routine(&TxOutKind::Simple, &TxOutKind::Simple).await;
   1270 
   1271         // Talerable transaction
   1272         routine(
   1273             &TxOutKind::Talerable(OutgoingSubject::rand()),
   1274             &TxOutKind::Talerable(OutgoingSubject::rand()),
   1275         )
   1276         .await;
   1277 
   1278         // Bounced transaction
   1279         routine(&TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await;
   1280 
   1281         // History
   1282         assert_eq!(
   1283             db::outgoing_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1284                 .await
   1285                 .unwrap()
   1286                 .len(),
   1287             2
   1288         );
   1289     }
   1290 
   1291     // TODO tx out failure
   1292 
   1293     #[tokio::test]
   1294     async fn transfer() {
   1295         let (_, pool) = setup().await;
   1296 
   1297         // Empty db
   1298         assert_eq!(
   1299             db::transfer_by_id(&pool, 0, &CURR, &ROOT).await.unwrap(),
   1300             None
   1301         );
   1302         assert_eq!(
   1303             db::transfer_page(&pool, &None, &CURR, &ROOT, &Page::default())
   1304                 .await
   1305                 .unwrap(),
   1306             Vec::new()
   1307         );
   1308 
   1309         let req = Transfer {
   1310             request_uid: HashCode::rand(),
   1311             amount: decimal("10"),
   1312             exchange_base_url: url("https://exchange.test.com/"),
   1313             metadata: None,
   1314             wtid: ShortHashCode::rand(),
   1315             creditor_id: 31000163100000000,
   1316             creditor_name: "Name".into(),
   1317         };
   1318         let now = now_sql_stable_ts();
   1319         let later = now + Span::new().hours(2);
   1320         // Insert
   1321         assert_eq!(
   1322             db::make_transfer(&pool, &req, &now)
   1323                 .await
   1324                 .expect("transfer"),
   1325             TransferResult::Success {
   1326                 id: 1,
   1327                 initiated_at: now
   1328             }
   1329         );
   1330         // Idempotent
   1331         assert_eq!(
   1332             db::make_transfer(&pool, &req, &later)
   1333                 .await
   1334                 .expect("transfer"),
   1335             TransferResult::Success {
   1336                 id: 1,
   1337                 initiated_at: now
   1338             }
   1339         );
   1340         // Request UID reuse
   1341         assert_eq!(
   1342             db::make_transfer(
   1343                 &pool,
   1344                 &Transfer {
   1345                     wtid: ShortHashCode::rand(),
   1346                     ..req.clone()
   1347                 },
   1348                 &now
   1349             )
   1350             .await
   1351             .expect("transfer"),
   1352             TransferResult::RequestUidReuse
   1353         );
   1354         // wtid reuse
   1355         assert_eq!(
   1356             db::make_transfer(
   1357                 &pool,
   1358                 &Transfer {
   1359                     request_uid: HashCode::rand(),
   1360                     ..req.clone()
   1361                 },
   1362                 &now
   1363             )
   1364             .await
   1365             .expect("transfer"),
   1366             TransferResult::WtidReuse
   1367         );
   1368         // Many
   1369         assert_eq!(
   1370             db::make_transfer(
   1371                 &pool,
   1372                 &Transfer {
   1373                     request_uid: HashCode::rand(),
   1374                     wtid: ShortHashCode::rand(),
   1375                     ..req
   1376                 },
   1377                 &later
   1378             )
   1379             .await
   1380             .expect("transfer"),
   1381             TransferResult::Success {
   1382                 id: 2,
   1383                 initiated_at: later
   1384             }
   1385         );
   1386 
   1387         // Get
   1388         assert!(
   1389             db::transfer_by_id(&pool, 1, &CURR, &ROOT)
   1390                 .await
   1391                 .unwrap()
   1392                 .is_some()
   1393         );
   1394         assert!(
   1395             db::transfer_by_id(&pool, 2, &CURR, &ROOT)
   1396                 .await
   1397                 .unwrap()
   1398                 .is_some()
   1399         );
   1400         assert!(
   1401             db::transfer_by_id(&pool, 3, &CURR, &ROOT)
   1402                 .await
   1403                 .unwrap()
   1404                 .is_none()
   1405         );
   1406         assert_eq!(
   1407             db::transfer_page(&pool, &None, &CURR, &ROOT, &Page::default())
   1408                 .await
   1409                 .unwrap()
   1410                 .len(),
   1411             2
   1412         );
   1413     }
   1414 
   1415     #[tokio::test]
   1416     async fn bounce() {
   1417         let (mut db, _) = setup().await;
   1418 
   1419         let amount = decimal("10");
   1420         let now = now_sql_stable_ts();
   1421 
   1422         // Bounce
   1423         assert_eq!(
   1424             db::register_bounced_tx_in(
   1425                 &mut db,
   1426                 &TxIn {
   1427                     transfer_id: 12,
   1428                     tx_id: None,
   1429                     amount,
   1430                     subject: "subject".to_owned(),
   1431                     debtor_id: 31000163100000000,
   1432                     debtor_name: "Name".into(),
   1433                     valued_at: now
   1434                 },
   1435                 22,
   1436                 "good reason",
   1437                 &now
   1438             )
   1439             .await
   1440             .expect("bounce"),
   1441             BounceResult {
   1442                 tx_id: 1,
   1443                 tx_new: true
   1444             }
   1445         );
   1446         // Idempotent
   1447         assert_eq!(
   1448             db::register_bounced_tx_in(
   1449                 &mut db,
   1450                 &TxIn {
   1451                     transfer_id: 12,
   1452                     tx_id: None,
   1453                     amount,
   1454                     subject: "subject".to_owned(),
   1455                     debtor_id: 31000163100000000,
   1456                     debtor_name: "Name".into(),
   1457                     valued_at: now
   1458                 },
   1459                 22,
   1460                 "good reason",
   1461                 &now
   1462             )
   1463             .await
   1464             .expect("bounce"),
   1465             BounceResult {
   1466                 tx_id: 1,
   1467                 tx_new: false
   1468             }
   1469         );
   1470 
   1471         // Many
   1472         assert_eq!(
   1473             db::register_bounced_tx_in(
   1474                 &mut db,
   1475                 &TxIn {
   1476                     transfer_id: 13,
   1477                     tx_id: None,
   1478                     amount,
   1479                     subject: "subject".to_owned(),
   1480                     debtor_id: 31000163100000000,
   1481                     debtor_name: "Name".into(),
   1482                     valued_at: now
   1483                 },
   1484                 23,
   1485                 "good reason",
   1486                 &now
   1487             )
   1488             .await
   1489             .expect("bounce"),
   1490             BounceResult {
   1491                 tx_id: 2,
   1492                 tx_new: true
   1493             }
   1494         );
   1495     }
   1496 
   1497     #[tokio::test]
   1498     async fn status() {
   1499         let (mut db, pool) = setup().await;
   1500 
   1501         let check_status = async |id: u64, status: TransferState, msg: Option<&str>| {
   1502             let transfer = db::transfer_by_id(&pool, id, &CURR, &ROOT)
   1503                 .await
   1504                 .unwrap()
   1505                 .unwrap();
   1506             assert_eq!(
   1507                 (status, msg),
   1508                 (transfer.status, transfer.status_msg.as_deref())
   1509             );
   1510         };
   1511 
   1512         // Unknown transfer
   1513         db::initiated_submit_permanent_failure(&mut db, 1, "msg")
   1514             .await
   1515             .unwrap();
   1516         db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12)
   1517             .await
   1518             .unwrap();
   1519         assert_eq!(
   1520             db::initiated_chargeback_failure(&mut db, 1).await.unwrap(),
   1521             ChargebackFailureResult::Unknown
   1522         );
   1523 
   1524         // Failure
   1525         db::make_transfer(
   1526             &pool,
   1527             &Transfer {
   1528                 request_uid: HashCode::rand(),
   1529                 amount: decimal("1"),
   1530                 exchange_base_url: url("https://exchange.test.com/"),
   1531                 metadata: None,
   1532                 wtid: ShortHashCode::rand(),
   1533                 creditor_id: 31000163100000000,
   1534                 creditor_name: "Name".into(),
   1535             },
   1536             &Timestamp::now(),
   1537         )
   1538         .await
   1539         .expect("transfer");
   1540         check_status(1, TransferState::pending, None).await;
   1541         db::initiated_submit_permanent_failure(&mut db, 1, "error status")
   1542             .await
   1543             .unwrap();
   1544         check_status(1, TransferState::permanent_failure, Some("error status")).await;
   1545 
   1546         // Success
   1547         db::make_transfer(
   1548             &pool,
   1549             &Transfer {
   1550                 request_uid: HashCode::rand(),
   1551                 amount: decimal("1"),
   1552                 exchange_base_url: url("https://exchange.test.com/"),
   1553                 metadata: None,
   1554                 wtid: ShortHashCode::rand(),
   1555                 creditor_id: 31000163100000000,
   1556                 creditor_name: "Name".into(),
   1557             },
   1558             &Timestamp::now(),
   1559         )
   1560         .await
   1561         .expect("transfer");
   1562         check_status(2, TransferState::pending, None).await;
   1563         db::initiated_submit_success(&mut db, 2, &Timestamp::now(), 3)
   1564             .await
   1565             .unwrap();
   1566         check_status(2, TransferState::pending, None).await;
   1567         db::register_tx_out(
   1568             &mut db,
   1569             &TxOut {
   1570                 transfer_id: 5,
   1571                 tx_id: Some(3),
   1572                 amount: decimal("2"),
   1573                 subject: "".to_string(),
   1574                 creditor_id: 31000163100000000,
   1575                 creditor_name: "Name".into(),
   1576                 valued_at: Timestamp::now(),
   1577             },
   1578             &TxOutKind::Simple,
   1579             &Timestamp::now(),
   1580         )
   1581         .await
   1582         .unwrap();
   1583         check_status(2, TransferState::success, None).await;
   1584 
   1585         // Chargeback
   1586         assert_eq!(
   1587             db::initiated_chargeback_failure(&mut db, 5).await.unwrap(),
   1588             ChargebackFailureResult::Known(2)
   1589         );
   1590         check_status(2, TransferState::late_failure, Some("charged back")).await;
   1591         assert_eq!(
   1592             db::initiated_chargeback_failure(&mut db, 5).await.unwrap(),
   1593             ChargebackFailureResult::Idempotent(2)
   1594         );
   1595     }
   1596 
   1597     #[tokio::test]
   1598     async fn batch() {
   1599         let (mut db, pool) = setup().await;
   1600         let start = Timestamp::now();
   1601 
   1602         // Empty db
   1603         let pendings = db::pending_batch(&mut db, &start)
   1604             .await
   1605             .expect("pending_batch");
   1606         assert_eq!(pendings.len(), 0);
   1607 
   1608         // Some transfers
   1609         for i in 0..3 {
   1610             db::make_transfer(
   1611                 &pool,
   1612                 &Transfer {
   1613                     request_uid: HashCode::rand(),
   1614                     amount: decimal(format!("{}", i + 1)),
   1615                     exchange_base_url: url("https://exchange.test.com/"),
   1616                     metadata: None,
   1617                     wtid: ShortHashCode::rand(),
   1618                     creditor_id: 31000163100000000,
   1619                     creditor_name: "Name".into(),
   1620                 },
   1621                 &Timestamp::now(),
   1622             )
   1623             .await
   1624             .expect("transfer");
   1625         }
   1626         let pendings = db::pending_batch(&mut db, &start)
   1627             .await
   1628             .expect("pending_batch");
   1629         assert_eq!(pendings.len(), 3);
   1630 
   1631         // Max 100 txs in batch
   1632         for i in 0..100 {
   1633             db::make_transfer(
   1634                 &pool,
   1635                 &Transfer {
   1636                     request_uid: HashCode::rand(),
   1637                     amount: decimal(format!("{}", i + 1)),
   1638                     exchange_base_url: url("https://exchange.test.com/"),
   1639                     metadata: None,
   1640                     wtid: ShortHashCode::rand(),
   1641                     creditor_id: 31000163100000000,
   1642                     creditor_name: "Name".into(),
   1643                 },
   1644                 &Timestamp::now(),
   1645             )
   1646             .await
   1647             .expect("transfer");
   1648         }
   1649         let pendings = db::pending_batch(&mut db, &start)
   1650             .await
   1651             .expect("pending_batch");
   1652         assert_eq!(pendings.len(), 100);
   1653 
   1654         // Skip uploaded
   1655         for i in 0..=10 {
   1656             db::initiated_submit_success(&mut db, i, &Timestamp::now(), i)
   1657                 .await
   1658                 .expect("status success");
   1659         }
   1660         let pendings = db::pending_batch(&mut db, &start)
   1661             .await
   1662             .expect("pending_batch");
   1663         assert_eq!(pendings.len(), 93);
   1664 
   1665         // Skip failed
   1666         for i in 0..=10 {
   1667             db::initiated_submit_permanent_failure(&mut db, 10 + i, "failure")
   1668                 .await
   1669                 .expect("status failure");
   1670         }
   1671         let pendings = db::pending_batch(&mut db, &start)
   1672             .await
   1673             .expect("pending_batch");
   1674         assert_eq!(pendings.len(), 83);
   1675     }
   1676 }