taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

db.rs (49068B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::fmt::Display;
     18 
     19 use compact_str::CompactString;
     20 use jiff::Timestamp;
     21 use serde::{Serialize, de::DeserializeOwned};
     22 use sqlx::{PgConnection, PgPool, QueryBuilder, Row, postgres::PgRow};
     23 use taler_api::{
     24     db::{BindHelper, TypeHelper, history, page},
     25     serialized,
     26     subject::{IncomingSubject, OutgoingSubject, fmt_out_subject},
     27 };
     28 use taler_common::{
     29     api_common::{HashCode, ShortHashCode},
     30     api_params::{History, Page},
     31     api_revenue::RevenueIncomingBankTransaction,
     32     api_transfer::{RegistrationRequest, Unregistration},
     33     api_wire::{
     34         IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferState,
     35         TransferStatus,
     36     },
     37     config::Config,
     38     db::IncomingType,
     39     types::{
     40         amount::{Currency, Decimal},
     41         payto::{PaytoImpl as _, PaytoURI},
     42     },
     43 };
     44 use tokio::sync::watch::{Receiver, Sender};
     45 use url::Url;
     46 
     47 use crate::{
     48     config::parse_db_cfg,
     49     payto::{CyclosAccount, CyclosId, FullCyclosPayto},
     50 };
     51 
     52 const SCHEMA: &str = "cyclos";
     53 
     54 pub async fn pool(cfg: &Config) -> anyhow::Result<PgPool> {
     55     let db = parse_db_cfg(cfg)?;
     56     let pool = taler_common::db::pool(db.cfg, SCHEMA).await?;
     57     Ok(pool)
     58 }
     59 
     60 pub async fn dbinit(cfg: &Config, reset: bool) -> anyhow::Result<PgPool> {
     61     let db_cfg = parse_db_cfg(cfg)?;
     62     let pool = taler_common::db::pool(db_cfg.cfg, SCHEMA).await?;
     63     let mut db = pool.acquire().await?;
     64     taler_common::db::dbinit(&mut db, db_cfg.sql_dir.as_ref(), SCHEMA, reset).await?;
     65     Ok(pool)
     66 }
     67 
     68 pub async fn notification_listener(
     69     pool: PgPool,
     70     in_channel: Sender<i64>,
     71     taler_in_channel: Sender<i64>,
     72     out_channel: Sender<i64>,
     73     taler_out_channel: Sender<i64>,
     74 ) -> sqlx::Result<()> {
     75     taler_api::notification::notification_listener!(&pool,
     76         "tx_in" => (row_id: i64) {
     77             in_channel.send_replace(row_id);
     78         },
     79         "taler_in" => (row_id: i64) {
     80             taler_in_channel.send_replace(row_id);
     81         },
     82         "tx_out" => (row_id: i64) {
     83             out_channel.send_replace(row_id);
     84         },
     85         "taler_out" => (row_id: i64) {
     86             taler_out_channel.send_replace(row_id);
     87         }
     88     )
     89 }
     90 
     91 #[derive(Debug, Clone)]
     92 pub struct TxIn {
     93     pub transfer_id: i64,
     94     pub tx_id: Option<i64>,
     95     pub amount: Decimal,
     96     pub subject: String,
     97     pub debtor_id: i64,
     98     pub debtor_name: CompactString,
     99     pub valued_at: Timestamp,
    100 }
    101 
    102 impl Display for TxIn {
    103     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    104         let Self {
    105             transfer_id,
    106             tx_id,
    107             amount,
    108             subject,
    109             valued_at,
    110             debtor_id,
    111             debtor_name,
    112         } = self;
    113         let tx_id = match tx_id {
    114             Some(id) => format_args!(":{}", *id),
    115             None => format_args!(""),
    116         };
    117         write!(
    118             f,
    119             "{valued_at} {transfer_id}{tx_id} {amount} ({debtor_id} {debtor_name}) '{subject}'"
    120         )
    121     }
    122 }
    123 
    124 #[derive(Debug, Clone)]
    125 pub struct TxOut {
    126     pub transfer_id: i64,
    127     pub tx_id: Option<i64>,
    128     pub amount: Decimal,
    129     pub subject: String,
    130     pub creditor_id: i64,
    131     pub creditor_name: CompactString,
    132     pub valued_at: Timestamp,
    133 }
    134 
    135 impl Display for TxOut {
    136     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    137         let Self {
    138             transfer_id,
    139             tx_id,
    140             amount,
    141             subject,
    142             creditor_id,
    143             creditor_name,
    144             valued_at,
    145         } = self;
    146         let tx_id = match tx_id {
    147             Some(id) => format_args!(":{}", *id),
    148             None => format_args!(""),
    149         };
    150         write!(
    151             f,
    152             "{valued_at} {transfer_id}{tx_id} {amount} ({creditor_id} {creditor_name}) '{subject}'"
    153         )
    154     }
    155 }
    156 
    157 #[derive(Debug, PartialEq, Eq)]
    158 pub struct Initiated {
    159     pub id: i64,
    160     pub amount: Decimal,
    161     pub subject: String,
    162     pub creditor_id: i64,
    163     pub creditor_name: CompactString,
    164 }
    165 
    166 impl Display for Initiated {
    167     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    168         let Self {
    169             id,
    170             amount,
    171             subject,
    172             creditor_id,
    173             creditor_name,
    174         } = self;
    175         write!(
    176             f,
    177             "{id} {amount} ({creditor_id} {creditor_name}) '{subject}'"
    178         )
    179     }
    180 }
    181 
    182 #[derive(Debug, Clone)]
    183 pub struct TxInAdmin {
    184     pub amount: Decimal,
    185     pub subject: String,
    186     pub debtor_id: i64,
    187     pub debtor_name: CompactString,
    188     pub metadata: IncomingSubject,
    189 }
    190 
    191 #[derive(Debug, PartialEq, Eq)]
    192 pub enum AddIncomingResult {
    193     Success {
    194         new: bool,
    195         pending: bool,
    196         row_id: u64,
    197         valued_at: Timestamp,
    198     },
    199     ReservePubReuse,
    200     UnknownMapping,
    201     MappingReuse,
    202 }
    203 
    204 /// Lock the database for worker execution
    205 pub async fn worker_lock(e: &mut PgConnection) -> sqlx::Result<bool> {
    206     sqlx::query("SELECT pg_try_advisory_lock(42)")
    207         .try_map(|r: PgRow| r.try_get(0))
    208         .fetch_one(e)
    209         .await
    210 }
    211 
    212 pub async fn register_tx_in_admin(
    213     db: &PgPool,
    214     tx: &TxInAdmin,
    215     now: &Timestamp,
    216 ) -> sqlx::Result<AddIncomingResult> {
    217     serialized!(
    218         sqlx::query(
    219             "
    220             SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending
    221             FROM register_tx_in(NULL, NULL, $1, $2, $3, $4, $5, $6, $7, $5)
    222         ",
    223         )
    224         .bind(tx.amount)
    225         .bind(&tx.subject)
    226         .bind(tx.debtor_id)
    227         .bind(&tx.debtor_name)
    228         .bind_timestamp(now)
    229         .bind(tx.metadata.ty())
    230         .bind(tx.metadata.key())
    231        .try_map(|r: PgRow| {
    232             Ok(if r.try_get_flag(0)? {
    233                 AddIncomingResult::ReservePubReuse
    234             } else if r.try_get_flag(1)? {
    235                 AddIncomingResult::MappingReuse
    236             } else if r.try_get_flag(2)? {
    237                 AddIncomingResult::UnknownMapping
    238             } else {
    239                 AddIncomingResult::Success {
    240                     row_id: r.try_get_u64(3)?,
    241                     valued_at: r.try_get_timestamp(4)?,
    242                     new: r.try_get(5)?,
    243                     pending: r.try_get(6)?
    244                 }
    245             })
    246         })
    247         .fetch_one(db)
    248     )
    249 }
    250 
    251 pub async fn register_tx_in(
    252     db: &mut PgConnection,
    253     tx: &TxIn,
    254     subject: &Option<IncomingSubject>,
    255     now: &Timestamp,
    256 ) -> sqlx::Result<AddIncomingResult> {
    257     serialized!(
    258         sqlx::query(
    259             "
    260             SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending
    261             FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
    262         ",
    263         )
    264         .bind(tx.transfer_id)
    265         .bind(tx.tx_id)
    266         .bind(tx.amount)
    267         .bind(&tx.subject)
    268         .bind(tx.debtor_id)
    269         .bind(&tx.debtor_name)
    270         .bind(tx.valued_at.as_microsecond())
    271         .bind(subject.as_ref().map(|it| it.ty()))
    272         .bind(subject.as_ref().map(|it| it.key()))
    273         .bind(now.as_microsecond())
    274         .try_map(|r: PgRow| {
    275             Ok(if r.try_get_flag(0)? {
    276                 AddIncomingResult::ReservePubReuse
    277             } else if r.try_get_flag(1)? {
    278                 AddIncomingResult::MappingReuse
    279             } else if r.try_get_flag(2)? {
    280                 AddIncomingResult::UnknownMapping
    281             } else {
    282                 AddIncomingResult::Success {
    283                     row_id: r.try_get_u64(3)?,
    284                     valued_at: r.try_get_timestamp(4)?,
    285                     new: r.try_get(5)?,
    286                     pending: r.try_get(6)?
    287                 }
    288             })
    289         })
    290         .fetch_one(&mut *db)
    291     )
    292 }
    293 
    294 #[derive(Debug)]
    295 pub enum TxOutKind {
    296     Simple,
    297     Bounce(i64),
    298     Talerable(OutgoingSubject),
    299 }
    300 
    301 #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)]
    302 #[allow(non_camel_case_types)]
    303 #[sqlx(type_name = "register_result")]
    304 pub enum RegisterResult {
    305     /// Already registered
    306     idempotent,
    307     /// Initiated transaction
    308     known,
    309     /// Recovered unknown outgoing transaction
    310     recovered,
    311 }
    312 
    313 #[derive(Debug, PartialEq, Eq)]
    314 pub struct AddOutgoingResult {
    315     pub result: RegisterResult,
    316     pub row_id: i64,
    317 }
    318 
    319 pub async fn register_tx_out(
    320     db: &mut PgConnection,
    321     tx: &TxOut,
    322     kind: &TxOutKind,
    323     now: &Timestamp,
    324 ) -> sqlx::Result<AddOutgoingResult> {
    325     serialized!({
    326         let query = sqlx::query(
    327             "
    328                 SELECT out_result, out_tx_row_id
    329                 FROM register_tx_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
    330             ",
    331         )
    332         .bind(tx.transfer_id)
    333         .bind(tx.tx_id)
    334         .bind(tx.amount)
    335         .bind(&tx.subject)
    336         .bind(tx.creditor_id)
    337         .bind(&tx.creditor_name)
    338         .bind_timestamp(&tx.valued_at);
    339         let query = match kind {
    340             TxOutKind::Simple => query
    341                 .bind(None::<&[u8]>)
    342                 .bind(None::<&str>)
    343                 .bind(None::<&str>)
    344                 .bind(None::<i64>),
    345             TxOutKind::Bounce(bounced) => query
    346                 .bind(None::<&[u8]>)
    347                 .bind(None::<&str>)
    348                 .bind(None::<&str>)
    349                 .bind(*bounced),
    350             TxOutKind::Talerable(subject) => query
    351                 .bind(&subject.wtid)
    352                 .bind(subject.exchange_base_url.as_str())
    353                 .bind(&subject.metadata)
    354                 .bind(None::<i64>),
    355         };
    356         query
    357             .bind_timestamp(now)
    358             .try_map(|r: PgRow| {
    359                 Ok(AddOutgoingResult {
    360                     result: r.try_get(0)?,
    361                     row_id: r.try_get(1)?,
    362                 })
    363             })
    364             .fetch_one(&mut *db)
    365     })
    366 }
    367 
    368 #[derive(Debug, PartialEq, Eq)]
    369 pub enum TransferResult {
    370     Success { id: u64, initiated_at: Timestamp },
    371     RequestUidReuse,
    372     WtidReuse,
    373 }
    374 
    375 #[derive(Debug, Clone)]
    376 pub struct Transfer {
    377     pub request_uid: HashCode,
    378     pub amount: Decimal,
    379     pub exchange_base_url: Url,
    380     pub metadata: Option<CompactString>,
    381     pub wtid: ShortHashCode,
    382     pub creditor_id: i64,
    383     pub creditor_name: CompactString,
    384 }
    385 
    386 pub async fn make_transfer(
    387     db: &PgPool,
    388     tx: &Transfer,
    389     now: &Timestamp,
    390 ) -> sqlx::Result<TransferResult> {
    391     let subject = fmt_out_subject(&tx.wtid, &tx.exchange_base_url, tx.metadata.as_deref());
    392     serialized!(
    393         sqlx::query(
    394             "
    395                 SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at
    396                 FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8, $9)
    397             ",
    398         )
    399         .bind(&tx.request_uid)
    400         .bind(&tx.wtid)
    401         .bind(&subject)
    402         .bind(tx.amount)
    403         .bind(tx.exchange_base_url.as_str())
    404         .bind(&tx.metadata)
    405         .bind(tx.creditor_id)
    406         .bind(&tx.creditor_name)
    407         .bind_timestamp(now)
    408         .try_map(|r: PgRow| {
    409             Ok(if r.try_get_flag(0)? {
    410                 TransferResult::RequestUidReuse
    411             } else if r.try_get_flag(1)? {
    412                 TransferResult::WtidReuse
    413             } else {
    414                 TransferResult::Success {
    415                     id: r.try_get_u64(2)?,
    416                     initiated_at: r.try_get_timestamp(3)?,
    417                 }
    418             })
    419         })
    420         .fetch_one(db)
    421     )
    422 }
    423 
    424 #[derive(Debug, PartialEq, Eq)]
    425 pub struct BounceResult {
    426     pub tx_id: u64,
    427     pub tx_new: bool,
    428 }
    429 
    430 pub async fn register_bounced_tx_in(
    431     db: &mut PgConnection,
    432     tx: &TxIn,
    433     chargeback_id: i64,
    434     reason: &str,
    435     now: &Timestamp,
    436 ) -> sqlx::Result<BounceResult> {
    437     serialized!(
    438         sqlx::query(
    439             "
    440                 SELECT out_tx_row_id, out_tx_new
    441                 FROM register_bounced_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
    442             ",
    443         )
    444         .bind(tx.transfer_id)
    445         .bind(tx.tx_id)
    446         .bind(tx.amount)
    447         .bind(&tx.subject)
    448         .bind(tx.debtor_id)
    449         .bind(&tx.debtor_name)
    450         .bind_timestamp(&tx.valued_at)
    451         .bind(chargeback_id)
    452         .bind(reason)
    453         .bind_timestamp(now)
    454         .try_map(|r: PgRow| {
    455             Ok(BounceResult {
    456                 tx_id: r.try_get_u64(0)?,
    457                 tx_new: r.try_get(1)?,
    458             })
    459         })
    460         .fetch_one(&mut *db)
    461     )
    462 }
    463 
    464 pub async fn transfer_page(
    465     db: &PgPool,
    466     status: &Option<TransferState>,
    467     currency: &Currency,
    468     root: &CompactString,
    469     params: &Page,
    470 ) -> sqlx::Result<Vec<TransferListStatus>> {
    471     page(
    472         db,
    473         "initiated_id",
    474         params,
    475         || {
    476             let mut builder = QueryBuilder::new(
    477                 "
    478                     SELECT
    479                         initiated_id,
    480                         status,
    481                         amount,
    482                         credit_account,
    483                         credit_name,
    484                         initiated_at
    485                     FROM transfer
    486                     JOIN initiated USING (initiated_id)
    487                     WHERE
    488                 ",
    489             );
    490             if let Some(status) = status {
    491                 builder.push(" status = ").push_bind(status).push(" AND ");
    492             }
    493             builder
    494         },
    495         |r: PgRow| {
    496             Ok(TransferListStatus {
    497                 row_id: r.try_get_safeu64(0)?,
    498                 status: r.try_get(1)?,
    499                 amount: r.try_get_amount(2, currency)?,
    500                 credit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
    501                 timestamp: r.try_get_timestamp(5)?.into(),
    502             })
    503         },
    504     )
    505     .await
    506 }
    507 
    508 pub async fn outgoing_history(
    509     db: &PgPool,
    510     params: &History,
    511     currency: &Currency,
    512     root: &CompactString,
    513     listen: impl FnOnce() -> Receiver<i64>,
    514 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> {
    515     history(
    516         db,
    517         "tx_out_id",
    518         params,
    519         listen,
    520         || {
    521             QueryBuilder::new(
    522                 "
    523                 SELECT
    524                     tx_out_id,
    525                     amount,
    526                     credit_account,
    527                     credit_name,
    528                     valued_at,
    529                     exchange_base_url,
    530                     metadata,
    531                     wtid
    532                 FROM taler_out
    533                 JOIN tx_out USING (tx_out_id)
    534                 WHERE
    535             ",
    536             )
    537         },
    538         |r: PgRow| {
    539             Ok(OutgoingBankTransaction {
    540                 row_id: r.try_get_safeu64(0)?,
    541                 amount: r.try_get_amount(1, currency)?,
    542                 debit_fee: None,
    543                 credit_account: r.try_get_cyclos_fullpaytouri(2, 3, root)?,
    544                 date: r.try_get_timestamp(4)?.into(),
    545                 exchange_base_url: r.try_get_url(5)?,
    546                 metadata: r.try_get(6)?,
    547                 wtid: r.try_get(7)?,
    548             })
    549         },
    550     )
    551     .await
    552 }
    553 
    554 pub async fn incoming_history(
    555     db: &PgPool,
    556     params: &History,
    557     currency: &Currency,
    558     root: &CompactString,
    559     listen: impl FnOnce() -> Receiver<i64>,
    560 ) -> sqlx::Result<Vec<IncomingBankTransaction>> {
    561     history(
    562         db,
    563         "tx_in_id",
    564         params,
    565         listen,
    566         || {
    567             QueryBuilder::new(
    568                 "
    569                 SELECT
    570                     type,
    571                     tx_in_id,
    572                     amount,
    573                     debit_account,
    574                     debit_name,
    575                     valued_at,
    576                     metadata,
    577                     authorization_pub,
    578                     authorization_sig
    579                 FROM taler_in
    580                 JOIN tx_in USING (tx_in_id)
    581                 WHERE
    582             ",
    583             )
    584         },
    585         |r: PgRow| {
    586             Ok(match r.try_get(0)? {
    587                 IncomingType::reserve => IncomingBankTransaction::Reserve {
    588                     row_id: r.try_get_safeu64(1)?,
    589                     amount: r.try_get_amount(2, currency)?,
    590                     credit_fee: None,
    591                     debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
    592                     date: r.try_get_timestamp(5)?.into(),
    593                     reserve_pub: r.try_get(6)?,
    594                     authorization_pub: r.try_get(7)?,
    595                     authorization_sig: r.try_get(8)?,
    596                 },
    597                 IncomingType::kyc => IncomingBankTransaction::Kyc {
    598                     row_id: r.try_get_safeu64(1)?,
    599                     amount: r.try_get_amount(2, currency)?,
    600                     credit_fee: None,
    601                     debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
    602                     date: r.try_get_timestamp(5)?.into(),
    603                     account_pub: r.try_get(6)?,
    604                     authorization_pub: r.try_get(7)?,
    605                     authorization_sig: r.try_get(8)?,
    606                 },
    607                 IncomingType::map => unimplemented!("MAP are never listed in the history"),
    608             })
    609         },
    610     )
    611     .await
    612 }
    613 
    614 pub async fn revenue_history(
    615     db: &PgPool,
    616     params: &History,
    617     currency: &Currency,
    618     root: &CompactString,
    619     listen: impl FnOnce() -> Receiver<i64>,
    620 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> {
    621     history(
    622         db,
    623         "tx_in_id",
    624         params,
    625         listen,
    626         || {
    627             QueryBuilder::new(
    628                 "
    629                 SELECT
    630                     tx_in_id,
    631                     valued_at,
    632                     amount,
    633                     debit_account,
    634                     debit_name,
    635                     subject
    636                 FROM tx_in
    637                 WHERE
    638             ",
    639             )
    640         },
    641         |r: PgRow| {
    642             Ok(RevenueIncomingBankTransaction {
    643                 row_id: r.try_get_safeu64(0)?,
    644                 date: r.try_get_timestamp(1)?.into(),
    645                 amount: r.try_get_amount(2, currency)?,
    646                 credit_fee: None,
    647                 debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
    648                 subject: r.try_get(5)?,
    649             })
    650         },
    651     )
    652     .await
    653 }
    654 
    655 pub async fn transfer_by_id(
    656     db: &PgPool,
    657     id: u64,
    658     currency: &Currency,
    659     root: &CompactString,
    660 ) -> sqlx::Result<Option<TransferStatus>> {
    661     serialized!(
    662         sqlx::query(
    663             "
    664                 SELECT
    665                     status,
    666                     status_msg,
    667                     amount,
    668                     exchange_base_url,
    669                     metadata,
    670                     wtid,
    671                     credit_account,
    672                     credit_name,
    673                     initiated_at
    674                 FROM transfer
    675                 JOIN initiated USING (initiated_id)
    676                 WHERE initiated_id = $1
    677             ",
    678         )
    679         .bind(id as i64)
    680         .try_map(|r: PgRow| {
    681             Ok(TransferStatus {
    682                 status: r.try_get(0)?,
    683                 status_msg: r.try_get(1)?,
    684                 amount: r.try_get_amount(2, currency)?,
    685                 origin_exchange_url: r.try_get(3)?,
    686                 metadata: r.try_get(4)?,
    687                 wtid: r.try_get(5)?,
    688                 credit_account: r.try_get_cyclos_fullpaytouri(6, 7, root)?,
    689                 timestamp: r.try_get_timestamp(8)?.into(),
    690             })
    691         })
    692         .fetch_optional(db)
    693     )
    694 }
    695 
    696 /** Get a batch of pending initiated transactions not attempted since [start] */
    697 pub async fn pending_batch(
    698     db: &mut PgConnection,
    699     start: &Timestamp,
    700 ) -> sqlx::Result<Vec<Initiated>> {
    701     serialized!(
    702         sqlx::query(
    703             "
    704             SELECT initiated_id, amount, subject, credit_account, credit_name
    705             FROM initiated
    706             WHERE tx_id IS NULL
    707                 AND status='pending'
    708                 AND (last_submitted IS NULL OR last_submitted < $1)
    709             LIMIT 100
    710         ",
    711         )
    712         .bind_timestamp(start)
    713         .try_map(|r: PgRow| {
    714             Ok(Initiated {
    715                 id: r.try_get(0)?,
    716                 amount: r.try_get(1)?,
    717                 subject: r.try_get(2)?,
    718                 creditor_id: r.try_get(3)?,
    719                 creditor_name: r.try_get(4)?,
    720             })
    721         })
    722         .fetch_all(&mut *db)
    723     )
    724 }
    725 
    726 /** Update status of a successful submitted initiated transaction */
    727 pub async fn initiated_submit_success(
    728     db: &mut PgConnection,
    729     initiated_id: i64,
    730     timestamp: &Timestamp,
    731     tx_id: i64,
    732 ) -> sqlx::Result<()> {
    733     serialized!(
    734         sqlx::query(
    735             "
    736                 UPDATE initiated
    737                 SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, tx_id=$2
    738                 WHERE initiated_id=$3
    739             "
    740         )
    741         .bind_timestamp(timestamp)
    742         .bind(tx_id)
    743         .bind(initiated_id)
    744         .execute(&mut *db)
    745     )?;
    746     Ok(())
    747 }
    748 
    749 /** Update status of a permanently failed initiated transaction */
    750 pub async fn initiated_submit_permanent_failure(
    751     db: &mut PgConnection,
    752     initiated_id: i64,
    753     msg: &str,
    754 ) -> sqlx::Result<()> {
    755     serialized!(
    756         sqlx::query(
    757             "
    758                 UPDATE initiated
    759                 SET status='permanent_failure', status_msg=$1
    760                 WHERE initiated_id=$2
    761             ",
    762         )
    763         .bind(msg)
    764         .bind(initiated_id)
    765         .execute(&mut *db)
    766     )?;
    767     Ok(())
    768 }
    769 
    770 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    771 pub enum ChargebackFailureResult {
    772     Unknown,
    773     Known(u64),
    774     Idempotent(u64),
    775 }
    776 
    777 /** Update status of a charged back initiated transaction */
    778 pub async fn initiated_chargeback_failure(
    779     db: &mut PgConnection,
    780     transfer_id: i64,
    781 ) -> sqlx::Result<ChargebackFailureResult> {
    782     Ok(serialized!(
    783         sqlx::query("SELECT out_initiated_id, out_new FROM register_charge_back_failure($1)")
    784             .bind(transfer_id)
    785             .try_map(|r: PgRow| {
    786                 let id = r.try_get_u64(0)?;
    787                 Ok(if id == 0 {
    788                     ChargebackFailureResult::Unknown
    789                 } else if r.try_get(1)? {
    790                     ChargebackFailureResult::Known(id)
    791                 } else {
    792                     ChargebackFailureResult::Idempotent(id)
    793                 })
    794             })
    795             .fetch_optional(&mut *db)
    796     )?
    797     .unwrap_or(ChargebackFailureResult::Unknown))
    798 }
    799 
    800 /** Get JSON value from KV table */
    801 pub async fn kv_get<T: DeserializeOwned + Unpin + Send>(
    802     db: &mut PgConnection,
    803     key: &str,
    804 ) -> sqlx::Result<Option<T>> {
    805     serialized!(
    806         sqlx::query("SELECT value FROM kv WHERE key=$1")
    807             .bind(key)
    808             .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0))
    809             .fetch_optional(&mut *db)
    810     )
    811 }
    812 
    813 /** Set JSON value in KV table */
    814 pub async fn kv_set<T: Serialize>(db: &mut PgConnection, key: &str, value: &T) -> sqlx::Result<()> {
    815     serialized!(
    816         sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value")
    817             .bind(key)
    818             .bind(sqlx::types::Json(value))
    819             .execute(&mut *db)
    820     )?;
    821     Ok(())
    822 }
    823 
    824 pub enum RegistrationResult {
    825     Success,
    826     ReservePubReuse,
    827 }
    828 
    829 pub async fn transfer_register(
    830     db: &PgPool,
    831     req: &RegistrationRequest,
    832 ) -> sqlx::Result<RegistrationResult> {
    833     let ty: IncomingType = req.r#type.into();
    834     serialized!(
    835         sqlx::query(
    836             "SELECT out_reserve_pub_reuse FROM register_prepared_transfers($1,$2,$3,$4,$5,$6)"
    837         )
    838         .bind(ty)
    839         .bind(&req.account_pub)
    840         .bind(&req.authorization_pub)
    841         .bind(&req.authorization_sig)
    842         .bind(req.recurrent)
    843         .bind_timestamp(&Timestamp::now())
    844         .try_map(|r: PgRow| {
    845             Ok(if r.try_get_flag("out_reserve_pub_reuse")? {
    846                 RegistrationResult::ReservePubReuse
    847             } else {
    848                 RegistrationResult::Success
    849             })
    850         })
    851         .fetch_one(db)
    852     )
    853 }
    854 
    855 pub async fn transfer_unregister(db: &PgPool, req: &Unregistration) -> sqlx::Result<bool> {
    856     serialized!(
    857         sqlx::query("SELECT out_found FROM delete_prepared_transfers($1)")
    858             .bind(&req.authorization_pub)
    859             .try_map(|r: PgRow| r.try_get_flag("out_found"))
    860             .fetch_one(db)
    861     )
    862 }
    863 
    864 pub trait CyclosTypeHelper {
    865     fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>(
    866         &self,
    867         idx: I,
    868         name: I,
    869         root: &CompactString,
    870     ) -> sqlx::Result<FullCyclosPayto>;
    871     fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>(
    872         &self,
    873         idx: I,
    874         name: I,
    875         root: &CompactString,
    876     ) -> sqlx::Result<PaytoURI>;
    877 }
    878 
    879 impl CyclosTypeHelper for PgRow {
    880     fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>(
    881         &self,
    882         idx: I,
    883         name: I,
    884         root: &CompactString,
    885     ) -> sqlx::Result<FullCyclosPayto> {
    886         let idx = self.try_get(idx)?;
    887         let name = self.try_get(name)?;
    888         Ok(FullCyclosPayto::new(
    889             CyclosAccount {
    890                 id: CyclosId(idx),
    891                 root: root.clone(),
    892             },
    893             name,
    894         ))
    895     }
    896     fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>(
    897         &self,
    898         idx: I,
    899         name: I,
    900         root: &CompactString,
    901     ) -> sqlx::Result<PaytoURI> {
    902         let idx = self.try_get(idx)?;
    903         let name = self.try_get(name)?;
    904         Ok(CyclosAccount {
    905             id: CyclosId(idx),
    906             root: root.clone(),
    907         }
    908         .as_full_payto(name))
    909     }
    910 }
    911 
    912 #[cfg(test)]
    913 mod test {
    914     use compact_str::CompactString;
    915     use jiff::{Span, Timestamp};
    916     use serde_json::json;
    917     use sqlx::{PgPool, Postgres, Row as _, pool::PoolConnection, postgres::PgRow};
    918     use taler_api::{
    919         db::TypeHelper,
    920         notification::dummy_listen,
    921         subject::{IncomingSubject, OutgoingSubject},
    922     };
    923     use taler_common::{
    924         api_common::{EddsaPublicKey, HashCode, ShortHashCode},
    925         api_params::{History, Page},
    926         api_wire::TransferState,
    927         types::{
    928             amount::{Currency, decimal},
    929             url,
    930             utils::now_sql_stable_ts,
    931         },
    932     };
    933 
    934     use crate::{
    935         constants::CONFIG_SOURCE,
    936         db::{
    937             self, AddIncomingResult, AddOutgoingResult, BounceResult, ChargebackFailureResult,
    938             Transfer, TransferResult, TxIn, TxInAdmin, TxOut, TxOutKind,
    939         },
    940     };
    941 
    942     pub const CURR: Currency = Currency::TEST;
    943     pub const ROOT: CompactString = CompactString::const_new("localhost");
    944 
    945     async fn setup() -> (PoolConnection<Postgres>, PgPool) {
    946         taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await
    947     }
    948 
    949     #[tokio::test]
    950     async fn kv() {
    951         let (mut db, _) = setup().await;
    952 
    953         let value = json!({
    954             "name": "Mr Smith",
    955             "no way": 32
    956         });
    957 
    958         assert_eq!(
    959             db::kv_get::<serde_json::Value>(&mut db, "value")
    960                 .await
    961                 .unwrap(),
    962             None
    963         );
    964         db::kv_set(&mut db, "value", &value).await.unwrap();
    965         db::kv_set(&mut db, "value", &value).await.unwrap();
    966         assert_eq!(
    967             db::kv_get::<serde_json::Value>(&mut db, "value")
    968                 .await
    969                 .unwrap(),
    970             Some(value)
    971         );
    972     }
    973 
    974     #[tokio::test]
    975     async fn tx_in() {
    976         let (mut db, pool) = setup().await;
    977 
    978         let mut routine = async |first: &Option<IncomingSubject>,
    979                                  second: &Option<IncomingSubject>| {
    980             let id = sqlx::query("SELECT count(*) + 1 FROM tx_in")
    981                 .try_map(|r: PgRow| r.try_get_u64(0))
    982                 .fetch_one(&mut *db)
    983                 .await
    984                 .unwrap();
    985             let now = now_sql_stable_ts();
    986             let later = now + Span::new().hours(2);
    987             let tx = TxIn {
    988                 transfer_id: now.as_microsecond() as i64,
    989                 tx_id: None,
    990                 amount: decimal("10"),
    991                 subject: "subject".to_owned(),
    992                 debtor_id: 31000163100000000,
    993                 debtor_name: "Name".into(),
    994                 valued_at: now,
    995             };
    996             // Insert
    997             assert_eq!(
    998                 db::register_tx_in(&mut db, &tx, first, &now)
    999                     .await
   1000                     .expect("register tx in"),
   1001                 AddIncomingResult::Success {
   1002                     new: true,
   1003                     pending: false,
   1004                     row_id: id,
   1005                     valued_at: now,
   1006                 }
   1007             );
   1008             // Idempotent
   1009             assert_eq!(
   1010                 db::register_tx_in(
   1011                     &mut db,
   1012                     &TxIn {
   1013                         valued_at: later,
   1014                         ..tx.clone()
   1015                     },
   1016                     first,
   1017                     &now
   1018                 )
   1019                 .await
   1020                 .expect("register tx in"),
   1021                 AddIncomingResult::Success {
   1022                     new: false,
   1023                     pending: false,
   1024                     row_id: id,
   1025                     valued_at: now
   1026                 }
   1027             );
   1028             // Many
   1029             assert_eq!(
   1030                 db::register_tx_in(
   1031                     &mut db,
   1032                     &TxIn {
   1033                         transfer_id: later.as_microsecond() as i64,
   1034                         valued_at: later,
   1035                         ..tx
   1036                     },
   1037                     second,
   1038                     &now
   1039                 )
   1040                 .await
   1041                 .expect("register tx in"),
   1042                 AddIncomingResult::Success {
   1043                     new: true,
   1044                     pending: false,
   1045                     row_id: id + 1,
   1046                     valued_at: later
   1047                 }
   1048             );
   1049         };
   1050 
   1051         // Empty db
   1052         assert_eq!(
   1053             db::revenue_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1054                 .await
   1055                 .unwrap(),
   1056             Vec::new()
   1057         );
   1058         assert_eq!(
   1059             db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1060                 .await
   1061                 .unwrap(),
   1062             Vec::new()
   1063         );
   1064 
   1065         // Regular transaction
   1066         routine(&None, &None).await;
   1067 
   1068         // Reserve transaction
   1069         routine(
   1070             &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())),
   1071             &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())),
   1072         )
   1073         .await;
   1074 
   1075         // Kyc transaction
   1076         routine(
   1077             &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())),
   1078             &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())),
   1079         )
   1080         .await;
   1081 
   1082         // History
   1083         assert_eq!(
   1084             db::revenue_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1085                 .await
   1086                 .unwrap()
   1087                 .len(),
   1088             6
   1089         );
   1090         assert_eq!(
   1091             db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1092                 .await
   1093                 .unwrap()
   1094                 .len(),
   1095             4
   1096         );
   1097     }
   1098 
   1099     #[tokio::test]
   1100     async fn tx_in_admin() {
   1101         let (_, pool) = setup().await;
   1102 
   1103         // Empty db
   1104         assert_eq!(
   1105             db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1106                 .await
   1107                 .unwrap(),
   1108             Vec::new()
   1109         );
   1110 
   1111         let now = now_sql_stable_ts();
   1112         let later = now + Span::new().hours(2);
   1113         let tx = TxInAdmin {
   1114             amount: decimal("10"),
   1115             subject: "subject".to_owned(),
   1116             debtor_id: 31000163100000000,
   1117             debtor_name: "Name".into(),
   1118             metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
   1119         };
   1120         // Insert
   1121         assert_eq!(
   1122             db::register_tx_in_admin(&pool, &tx, &now)
   1123                 .await
   1124                 .expect("register tx in"),
   1125             AddIncomingResult::Success {
   1126                 new: true,
   1127                 pending: false,
   1128                 row_id: 1,
   1129                 valued_at: now
   1130             }
   1131         );
   1132         // Many
   1133         assert_eq!(
   1134             db::register_tx_in_admin(
   1135                 &pool,
   1136                 &TxInAdmin {
   1137                     subject: "Other".to_owned(),
   1138                     metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
   1139                     ..tx.clone()
   1140                 },
   1141                 &later
   1142             )
   1143             .await
   1144             .expect("register tx in"),
   1145             AddIncomingResult::Success {
   1146                 new: true,
   1147                 pending: false,
   1148                 row_id: 2,
   1149                 valued_at: later
   1150             }
   1151         );
   1152 
   1153         // History
   1154         assert_eq!(
   1155             db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1156                 .await
   1157                 .unwrap()
   1158                 .len(),
   1159             2
   1160         );
   1161     }
   1162 
   1163     #[tokio::test]
   1164     async fn tx_out() {
   1165         let (mut db, pool) = setup().await;
   1166 
   1167         let mut routine = async |first: &TxOutKind, second: &TxOutKind| {
   1168             let transfer_id = sqlx::query("SELECT count(*) + 1 FROM tx_out")
   1169                 .try_map(|r: PgRow| r.try_get(0))
   1170                 .fetch_one(&mut *db)
   1171                 .await
   1172                 .unwrap();
   1173             let now = now_sql_stable_ts();
   1174             let later = now + Span::new().hours(2);
   1175             let tx = TxOut {
   1176                 transfer_id,
   1177                 tx_id: Some(transfer_id),
   1178                 amount: decimal("10"),
   1179                 subject: "subject".to_owned(),
   1180                 creditor_id: 31000163100000000,
   1181                 creditor_name: "Name".into(),
   1182                 valued_at: now,
   1183             };
   1184             assert!(matches!(
   1185                 db::make_transfer(
   1186                     &pool,
   1187                     &Transfer {
   1188                         request_uid: HashCode::rand(),
   1189                         amount: decimal("10"),
   1190                         exchange_base_url: url("https://exchange.test.com/"),
   1191                         metadata: None,
   1192                         wtid: ShortHashCode::rand(),
   1193                         creditor_id: 31000163100000000,
   1194                         creditor_name: "Name".into()
   1195                     },
   1196                     &now
   1197                 )
   1198                 .await
   1199                 .unwrap(),
   1200                 TransferResult::Success { .. }
   1201             ));
   1202             db::initiated_submit_success(&mut db, 1, &Timestamp::now(), transfer_id)
   1203                 .await
   1204                 .expect("status success");
   1205 
   1206             // Insert
   1207             assert_eq!(
   1208                 db::register_tx_out(&mut db, &tx, first, &now)
   1209                     .await
   1210                     .expect("register tx out"),
   1211                 AddOutgoingResult {
   1212                     result: db::RegisterResult::known,
   1213                     row_id: transfer_id,
   1214                 }
   1215             );
   1216             // Idempotent
   1217             assert_eq!(
   1218                 db::register_tx_out(
   1219                     &mut db,
   1220                     &TxOut {
   1221                         valued_at: later,
   1222                         ..tx.clone()
   1223                     },
   1224                     first,
   1225                     &now
   1226                 )
   1227                 .await
   1228                 .expect("register tx out"),
   1229                 AddOutgoingResult {
   1230                     result: db::RegisterResult::idempotent,
   1231                     row_id: transfer_id,
   1232                 }
   1233             );
   1234             // Recovered
   1235             assert_eq!(
   1236                 db::register_tx_out(
   1237                     &mut db,
   1238                     &TxOut {
   1239                         transfer_id: transfer_id + 1,
   1240                         tx_id: Some(transfer_id + 1),
   1241                         valued_at: later,
   1242                         ..tx.clone()
   1243                     },
   1244                     second,
   1245                     &now
   1246                 )
   1247                 .await
   1248                 .expect("register tx out"),
   1249                 AddOutgoingResult {
   1250                     result: db::RegisterResult::recovered,
   1251                     row_id: transfer_id + 1,
   1252                 }
   1253             );
   1254         };
   1255 
   1256         // Empty db
   1257         assert_eq!(
   1258             db::outgoing_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1259                 .await
   1260                 .unwrap(),
   1261             Vec::new()
   1262         );
   1263 
   1264         // Regular transaction
   1265         routine(&TxOutKind::Simple, &TxOutKind::Simple).await;
   1266 
   1267         // Talerable transaction
   1268         routine(
   1269             &TxOutKind::Talerable(OutgoingSubject::rand()),
   1270             &TxOutKind::Talerable(OutgoingSubject::rand()),
   1271         )
   1272         .await;
   1273 
   1274         // Bounced transaction
   1275         routine(&TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await;
   1276 
   1277         // History
   1278         assert_eq!(
   1279             db::outgoing_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
   1280                 .await
   1281                 .unwrap()
   1282                 .len(),
   1283             2
   1284         );
   1285     }
   1286 
   1287     // TODO tx out failure
   1288 
   1289     #[tokio::test]
   1290     async fn transfer() {
   1291         let (_, pool) = setup().await;
   1292 
   1293         // Empty db
   1294         assert_eq!(
   1295             db::transfer_by_id(&pool, 0, &CURR, &ROOT).await.unwrap(),
   1296             None
   1297         );
   1298         assert_eq!(
   1299             db::transfer_page(&pool, &None, &CURR, &ROOT, &Page::default())
   1300                 .await
   1301                 .unwrap(),
   1302             Vec::new()
   1303         );
   1304 
   1305         let req = Transfer {
   1306             request_uid: HashCode::rand(),
   1307             amount: decimal("10"),
   1308             exchange_base_url: url("https://exchange.test.com/"),
   1309             metadata: None,
   1310             wtid: ShortHashCode::rand(),
   1311             creditor_id: 31000163100000000,
   1312             creditor_name: "Name".into(),
   1313         };
   1314         let now = now_sql_stable_ts();
   1315         let later = now + Span::new().hours(2);
   1316         // Insert
   1317         assert_eq!(
   1318             db::make_transfer(&pool, &req, &now)
   1319                 .await
   1320                 .expect("transfer"),
   1321             TransferResult::Success {
   1322                 id: 1,
   1323                 initiated_at: now
   1324             }
   1325         );
   1326         // Idempotent
   1327         assert_eq!(
   1328             db::make_transfer(&pool, &req, &later)
   1329                 .await
   1330                 .expect("transfer"),
   1331             TransferResult::Success {
   1332                 id: 1,
   1333                 initiated_at: now
   1334             }
   1335         );
   1336         // Request UID reuse
   1337         assert_eq!(
   1338             db::make_transfer(
   1339                 &pool,
   1340                 &Transfer {
   1341                     wtid: ShortHashCode::rand(),
   1342                     ..req.clone()
   1343                 },
   1344                 &now
   1345             )
   1346             .await
   1347             .expect("transfer"),
   1348             TransferResult::RequestUidReuse
   1349         );
   1350         // wtid reuse
   1351         assert_eq!(
   1352             db::make_transfer(
   1353                 &pool,
   1354                 &Transfer {
   1355                     request_uid: HashCode::rand(),
   1356                     ..req.clone()
   1357                 },
   1358                 &now
   1359             )
   1360             .await
   1361             .expect("transfer"),
   1362             TransferResult::WtidReuse
   1363         );
   1364         // Many
   1365         assert_eq!(
   1366             db::make_transfer(
   1367                 &pool,
   1368                 &Transfer {
   1369                     request_uid: HashCode::rand(),
   1370                     wtid: ShortHashCode::rand(),
   1371                     ..req
   1372                 },
   1373                 &later
   1374             )
   1375             .await
   1376             .expect("transfer"),
   1377             TransferResult::Success {
   1378                 id: 2,
   1379                 initiated_at: later
   1380             }
   1381         );
   1382 
   1383         // Get
   1384         assert!(
   1385             db::transfer_by_id(&pool, 1, &CURR, &ROOT)
   1386                 .await
   1387                 .unwrap()
   1388                 .is_some()
   1389         );
   1390         assert!(
   1391             db::transfer_by_id(&pool, 2, &CURR, &ROOT)
   1392                 .await
   1393                 .unwrap()
   1394                 .is_some()
   1395         );
   1396         assert!(
   1397             db::transfer_by_id(&pool, 3, &CURR, &ROOT)
   1398                 .await
   1399                 .unwrap()
   1400                 .is_none()
   1401         );
   1402         assert_eq!(
   1403             db::transfer_page(&pool, &None, &CURR, &ROOT, &Page::default())
   1404                 .await
   1405                 .unwrap()
   1406                 .len(),
   1407             2
   1408         );
   1409     }
   1410 
   1411     #[tokio::test]
   1412     async fn bounce() {
   1413         let (mut db, _) = setup().await;
   1414 
   1415         let amount = decimal("10");
   1416         let now = now_sql_stable_ts();
   1417 
   1418         // Bounce
   1419         assert_eq!(
   1420             db::register_bounced_tx_in(
   1421                 &mut db,
   1422                 &TxIn {
   1423                     transfer_id: 12,
   1424                     tx_id: None,
   1425                     amount,
   1426                     subject: "subject".to_owned(),
   1427                     debtor_id: 31000163100000000,
   1428                     debtor_name: "Name".into(),
   1429                     valued_at: now
   1430                 },
   1431                 22,
   1432                 "good reason",
   1433                 &now
   1434             )
   1435             .await
   1436             .expect("bounce"),
   1437             BounceResult {
   1438                 tx_id: 1,
   1439                 tx_new: true
   1440             }
   1441         );
   1442         // Idempotent
   1443         assert_eq!(
   1444             db::register_bounced_tx_in(
   1445                 &mut db,
   1446                 &TxIn {
   1447                     transfer_id: 12,
   1448                     tx_id: None,
   1449                     amount,
   1450                     subject: "subject".to_owned(),
   1451                     debtor_id: 31000163100000000,
   1452                     debtor_name: "Name".into(),
   1453                     valued_at: now
   1454                 },
   1455                 22,
   1456                 "good reason",
   1457                 &now
   1458             )
   1459             .await
   1460             .expect("bounce"),
   1461             BounceResult {
   1462                 tx_id: 1,
   1463                 tx_new: false
   1464             }
   1465         );
   1466 
   1467         // Many
   1468         assert_eq!(
   1469             db::register_bounced_tx_in(
   1470                 &mut db,
   1471                 &TxIn {
   1472                     transfer_id: 13,
   1473                     tx_id: None,
   1474                     amount,
   1475                     subject: "subject".to_owned(),
   1476                     debtor_id: 31000163100000000,
   1477                     debtor_name: "Name".into(),
   1478                     valued_at: now
   1479                 },
   1480                 23,
   1481                 "good reason",
   1482                 &now
   1483             )
   1484             .await
   1485             .expect("bounce"),
   1486             BounceResult {
   1487                 tx_id: 2,
   1488                 tx_new: true
   1489             }
   1490         );
   1491     }
   1492 
   1493     #[tokio::test]
   1494     async fn status() {
   1495         let (mut db, pool) = setup().await;
   1496 
   1497         let check_status = async |id: u64, status: TransferState, msg: Option<&str>| {
   1498             let transfer = db::transfer_by_id(&pool, id, &CURR, &ROOT)
   1499                 .await
   1500                 .unwrap()
   1501                 .unwrap();
   1502             assert_eq!(
   1503                 (status, msg),
   1504                 (transfer.status, transfer.status_msg.as_deref())
   1505             );
   1506         };
   1507 
   1508         // Unknown transfer
   1509         db::initiated_submit_permanent_failure(&mut db, 1, "msg")
   1510             .await
   1511             .unwrap();
   1512         db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12)
   1513             .await
   1514             .unwrap();
   1515         assert_eq!(
   1516             db::initiated_chargeback_failure(&mut db, 1).await.unwrap(),
   1517             ChargebackFailureResult::Unknown
   1518         );
   1519 
   1520         // Failure
   1521         db::make_transfer(
   1522             &pool,
   1523             &Transfer {
   1524                 request_uid: HashCode::rand(),
   1525                 amount: decimal("1"),
   1526                 exchange_base_url: url("https://exchange.test.com/"),
   1527                 metadata: None,
   1528                 wtid: ShortHashCode::rand(),
   1529                 creditor_id: 31000163100000000,
   1530                 creditor_name: "Name".into(),
   1531             },
   1532             &Timestamp::now(),
   1533         )
   1534         .await
   1535         .expect("transfer");
   1536         check_status(1, TransferState::pending, None).await;
   1537         db::initiated_submit_permanent_failure(&mut db, 1, "error status")
   1538             .await
   1539             .unwrap();
   1540         check_status(1, TransferState::permanent_failure, Some("error status")).await;
   1541 
   1542         // Success
   1543         db::make_transfer(
   1544             &pool,
   1545             &Transfer {
   1546                 request_uid: HashCode::rand(),
   1547                 amount: decimal("1"),
   1548                 exchange_base_url: url("https://exchange.test.com/"),
   1549                 metadata: None,
   1550                 wtid: ShortHashCode::rand(),
   1551                 creditor_id: 31000163100000000,
   1552                 creditor_name: "Name".into(),
   1553             },
   1554             &Timestamp::now(),
   1555         )
   1556         .await
   1557         .expect("transfer");
   1558         check_status(2, TransferState::pending, None).await;
   1559         db::initiated_submit_success(&mut db, 2, &Timestamp::now(), 3)
   1560             .await
   1561             .unwrap();
   1562         check_status(2, TransferState::pending, None).await;
   1563         db::register_tx_out(
   1564             &mut db,
   1565             &TxOut {
   1566                 transfer_id: 5,
   1567                 tx_id: Some(3),
   1568                 amount: decimal("2"),
   1569                 subject: "".to_string(),
   1570                 creditor_id: 31000163100000000,
   1571                 creditor_name: "Name".into(),
   1572                 valued_at: Timestamp::now(),
   1573             },
   1574             &TxOutKind::Simple,
   1575             &Timestamp::now(),
   1576         )
   1577         .await
   1578         .unwrap();
   1579         check_status(2, TransferState::success, None).await;
   1580 
   1581         // Chargeback
   1582         assert_eq!(
   1583             db::initiated_chargeback_failure(&mut db, 5).await.unwrap(),
   1584             ChargebackFailureResult::Known(2)
   1585         );
   1586         check_status(2, TransferState::late_failure, Some("charged back")).await;
   1587         assert_eq!(
   1588             db::initiated_chargeback_failure(&mut db, 5).await.unwrap(),
   1589             ChargebackFailureResult::Idempotent(2)
   1590         );
   1591     }
   1592 
   1593     #[tokio::test]
   1594     async fn batch() {
   1595         let (mut db, pool) = setup().await;
   1596         let start = Timestamp::now();
   1597 
   1598         // Empty db
   1599         let pendings = db::pending_batch(&mut db, &start)
   1600             .await
   1601             .expect("pending_batch");
   1602         assert_eq!(pendings.len(), 0);
   1603 
   1604         // Some transfers
   1605         for i in 0..3 {
   1606             db::make_transfer(
   1607                 &pool,
   1608                 &Transfer {
   1609                     request_uid: HashCode::rand(),
   1610                     amount: decimal(format!("{}", i + 1)),
   1611                     exchange_base_url: url("https://exchange.test.com/"),
   1612                     metadata: None,
   1613                     wtid: ShortHashCode::rand(),
   1614                     creditor_id: 31000163100000000,
   1615                     creditor_name: "Name".into(),
   1616                 },
   1617                 &Timestamp::now(),
   1618             )
   1619             .await
   1620             .expect("transfer");
   1621         }
   1622         let pendings = db::pending_batch(&mut db, &start)
   1623             .await
   1624             .expect("pending_batch");
   1625         assert_eq!(pendings.len(), 3);
   1626 
   1627         // Max 100 txs in batch
   1628         for i in 0..100 {
   1629             db::make_transfer(
   1630                 &pool,
   1631                 &Transfer {
   1632                     request_uid: HashCode::rand(),
   1633                     amount: decimal(format!("{}", i + 1)),
   1634                     exchange_base_url: url("https://exchange.test.com/"),
   1635                     metadata: None,
   1636                     wtid: ShortHashCode::rand(),
   1637                     creditor_id: 31000163100000000,
   1638                     creditor_name: "Name".into(),
   1639                 },
   1640                 &Timestamp::now(),
   1641             )
   1642             .await
   1643             .expect("transfer");
   1644         }
   1645         let pendings = db::pending_batch(&mut db, &start)
   1646             .await
   1647             .expect("pending_batch");
   1648         assert_eq!(pendings.len(), 100);
   1649 
   1650         // Skip uploaded
   1651         for i in 0..=10 {
   1652             db::initiated_submit_success(&mut db, i, &Timestamp::now(), i)
   1653                 .await
   1654                 .expect("status success");
   1655         }
   1656         let pendings = db::pending_batch(&mut db, &start)
   1657             .await
   1658             .expect("pending_batch");
   1659         assert_eq!(pendings.len(), 93);
   1660 
   1661         // Skip failed
   1662         for i in 0..=10 {
   1663             db::initiated_submit_permanent_failure(&mut db, 10 + i, "failure")
   1664                 .await
   1665                 .expect("status failure");
   1666         }
   1667         let pendings = db::pending_batch(&mut db, &start)
   1668             .await
   1669             .expect("pending_batch");
   1670         assert_eq!(pendings.len(), 83);
   1671     }
   1672 }