taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

db.rs (49234B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::fmt::Display;
     18 
     19 use compact_str::CompactString;
     20 use jiff::Timestamp;
     21 use serde::{Serialize, de::DeserializeOwned};
     22 use sqlx::{PgConnection, PgPool, QueryBuilder, Row, postgres::PgRow};
     23 use taler_api::{
     24     db::{BindHelper, TypeHelper, history, page},
     25     serialized,
     26     subject::{IncomingSubject, OutgoingSubject, fmt_out_subject},
     27 };
     28 use taler_common::{
     29     api_common::{HashCode, ShortHashCode},
     30     api_params::{History, Page},
     31     api_revenue::RevenueIncomingBankTransaction,
     32     api_transfer::{RegistrationRequest, Unregistration},
     33     api_wire::{
     34         IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferState,
     35         TransferStatus,
     36     },
     37     config::Config,
     38     db::IncomingType,
     39     types::{
     40         amount::{Currency, Decimal},
     41         payto::{PaytoImpl as _, PaytoURI},
     42     },
     43 };
     44 use tokio::sync::watch::{Receiver, Sender};
     45 use url::Url;
     46 
     47 use crate::{
     48     config::parse_db_cfg,
     49     payto::{CyclosAccount, CyclosId, FullCyclosPayto},
     50 };
     51 
     52 const SCHEMA: &str = "cyclos";
     53 
     54 pub async fn pool(cfg: &Config) -> anyhow::Result<PgPool> {
     55     let db = parse_db_cfg(cfg)?;
     56     let pool = taler_common::db::pool(db.cfg, SCHEMA).await?;
     57     Ok(pool)
     58 }
     59 
     60 pub async fn dbinit(cfg: &Config, reset: bool) -> anyhow::Result<PgPool> {
     61     let db_cfg = parse_db_cfg(cfg)?;
     62     let pool = taler_common::db::pool(db_cfg.cfg, SCHEMA).await?;
     63     let mut db = pool.acquire().await?;
     64     taler_common::db::dbinit(&mut db, db_cfg.sql_dir.as_ref(), SCHEMA, reset).await?;
     65     Ok(pool)
     66 }
     67 
     68 pub async fn notification_listener(
     69     pool: PgPool,
     70     in_channel: Sender<i64>,
     71     taler_in_channel: Sender<i64>,
     72     out_channel: Sender<i64>,
     73     taler_out_channel: Sender<i64>,
     74 ) -> sqlx::Result<()> {
     75     taler_api::notification::notification_listener!(&pool,
     76         "tx_in" => (row_id: i64) {
     77             in_channel.send_replace(row_id);
     78         },
     79         "taler_in" => (row_id: i64) {
     80             taler_in_channel.send_replace(row_id);
     81         },
     82         "tx_out" => (row_id: i64) {
     83             out_channel.send_replace(row_id);
     84         },
     85         "taler_out" => (row_id: i64) {
     86             taler_out_channel.send_replace(row_id);
     87         }
     88     )
     89 }
     90 
     91 #[derive(Debug, Clone)]
     92 pub struct TxIn {
     93     pub transfer_id: i64,
     94     pub tx_id: Option<i64>,
     95     pub amount: Decimal,
     96     pub subject: String,
     97     pub debtor_id: i64,
     98     pub debtor_name: CompactString,
     99     pub valued_at: Timestamp,
    100 }
    101 
    102 impl Display for TxIn {
    103     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    104         let Self {
    105             transfer_id,
    106             tx_id,
    107             amount,
    108             subject,
    109             valued_at,
    110             debtor_id,
    111             debtor_name,
    112         } = self;
    113         let tx_id = match tx_id {
    114             Some(id) => format_args!(":{}", *id),
    115             None => format_args!(""),
    116         };
    117         write!(
    118             f,
    119             "{valued_at} {transfer_id}{tx_id} {amount} ({debtor_id} {debtor_name}) '{subject}'"
    120         )
    121     }
    122 }
    123 
    124 #[derive(Debug, Clone)]
    125 pub struct TxOut {
    126     pub transfer_id: i64,
    127     pub tx_id: Option<i64>,
    128     pub amount: Decimal,
    129     pub subject: String,
    130     pub creditor_id: i64,
    131     pub creditor_name: CompactString,
    132     pub valued_at: Timestamp,
    133 }
    134 
    135 impl Display for TxOut {
    136     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    137         let Self {
    138             transfer_id,
    139             tx_id,
    140             amount,
    141             subject,
    142             creditor_id,
    143             creditor_name,
    144             valued_at,
    145         } = self;
    146         let tx_id = match tx_id {
    147             Some(id) => format_args!(":{}", *id),
    148             None => format_args!(""),
    149         };
    150         write!(
    151             f,
    152             "{valued_at} {transfer_id}{tx_id} {amount} ({creditor_id} {creditor_name}) '{subject}'"
    153         )
    154     }
    155 }
    156 
    157 #[derive(Debug, PartialEq, Eq)]
    158 pub struct Initiated {
    159     pub id: i64,
    160     pub amount: Decimal,
    161     pub subject: String,
    162     pub creditor_id: i64,
    163     pub creditor_name: CompactString,
    164 }
    165 
    166 impl Display for Initiated {
    167     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    168         let Self {
    169             id,
    170             amount,
    171             subject,
    172             creditor_id,
    173             creditor_name,
    174         } = self;
    175         write!(
    176             f,
    177             "{id} {amount} ({creditor_id} {creditor_name}) '{subject}'"
    178         )
    179     }
    180 }
    181 
    182 #[derive(Debug, Clone)]
    183 pub struct TxInAdmin {
    184     pub amount: Decimal,
    185     pub subject: String,
    186     pub debtor_id: i64,
    187     pub debtor_name: CompactString,
    188     pub metadata: IncomingSubject,
    189 }
    190 
    191 #[derive(Debug, PartialEq, Eq)]
    192 pub enum AddIncomingResult {
    193     Success {
    194         new: bool,
    195         pending: bool,
    196         row_id: u64,
    197         valued_at: Timestamp,
    198     },
    199     ReservePubReuse,
    200     UnknownMapping,
    201     MappingReuse,
    202 }
    203 
    204 /// Lock the database for worker execution
    205 pub async fn worker_lock(e: &mut PgConnection) -> sqlx::Result<bool> {
    206     sqlx::query("SELECT pg_try_advisory_lock(42)")
    207         .try_map(|r: PgRow| r.try_get(0))
    208         .fetch_one(e)
    209         .await
    210 }
    211 
    212 pub async fn register_tx_in_admin(
    213     db: &PgPool,
    214     tx: &TxInAdmin,
    215     now: &Timestamp,
    216 ) -> sqlx::Result<AddIncomingResult> {
    217     serialized!(
    218         sqlx::query(
    219             "
    220             SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending
    221             FROM register_tx_in(NULL, NULL, $1, $2, $3, $4, $5, $6, $7, $5)
    222         ",
    223         )
    224         .bind(tx.amount)
    225         .bind(&tx.subject)
    226         .bind(tx.debtor_id)
    227         .bind(&tx.debtor_name)
    228         .bind_timestamp(now)
    229         .bind(tx.metadata.ty())
    230         .bind(tx.metadata.key())
    231        .try_map(|r: PgRow| {
    232             Ok(if r.try_get_flag(0)? {
    233                 AddIncomingResult::ReservePubReuse
    234             } else if r.try_get_flag(1)? {
    235                 AddIncomingResult::MappingReuse
    236             } else if r.try_get_flag(2)? {
    237                 AddIncomingResult::UnknownMapping
    238             } else {
    239                 AddIncomingResult::Success {
    240                     row_id: r.try_get_u64(3)?,
    241                     valued_at: r.try_get_timestamp(4)?,
    242                     new: r.try_get(5)?,
    243                     pending: r.try_get(6)?
    244                 }
    245             })
    246         })
    247         .fetch_one(db)
    248     )
    249 }
    250 
    251 pub async fn register_tx_in(
    252     db: &mut PgConnection,
    253     tx: &TxIn,
    254     subject: &Option<IncomingSubject>,
    255     now: &Timestamp,
    256 ) -> sqlx::Result<AddIncomingResult> {
    257     serialized!(
    258         sqlx::query(
    259             "
    260             SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending
    261             FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
    262         ",
    263         )
    264         .bind(tx.transfer_id)
    265         .bind(tx.tx_id)
    266         .bind(tx.amount)
    267         .bind(&tx.subject)
    268         .bind(tx.debtor_id)
    269         .bind(&tx.debtor_name)
    270         .bind(tx.valued_at.as_microsecond())
    271         .bind(subject.as_ref().map(|it| it.ty()))
    272         .bind(subject.as_ref().map(|it| it.key()))
    273         .bind(now.as_microsecond())
    274         .try_map(|r: PgRow| {
    275             Ok(if r.try_get_flag(0)? {
    276                 AddIncomingResult::ReservePubReuse
    277             } else if r.try_get_flag(1)? {
    278                 AddIncomingResult::MappingReuse
    279             } else if r.try_get_flag(2)? {
    280                 AddIncomingResult::UnknownMapping
    281             } else {
    282                 AddIncomingResult::Success {
    283                     row_id: r.try_get_u64(3)?,
    284                     valued_at: r.try_get_timestamp(4)?,
    285                     new: r.try_get(5)?,
    286                     pending: r.try_get(6)?
    287                 }
    288             })
    289         })
    290         .fetch_one(&mut *db)
    291     )
    292 }
    293 
    294 #[derive(Debug)]
    295 pub enum TxOutKind {
    296     Simple,
    297     Bounce(i64),
    298     Talerable(OutgoingSubject),
    299 }
    300 
    301 #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)]
    302 #[allow(non_camel_case_types)]
    303 #[sqlx(type_name = "register_result")]
    304 pub enum RegisterResult {
    305     /// Already registered
    306     idempotent,
    307     /// Initiated transaction
    308     known,
    309     /// Recovered unknown outgoing transaction
    310     recovered,
    311 }
    312 
    313 #[derive(Debug, PartialEq, Eq)]
    314 pub struct AddOutgoingResult {
    315     pub result: RegisterResult,
    316     pub row_id: i64,
    317 }
    318 
    319 pub async fn register_tx_out(
    320     db: &mut PgConnection,
    321     tx: &TxOut,
    322     kind: &TxOutKind,
    323     now: &Timestamp,
    324 ) -> sqlx::Result<AddOutgoingResult> {
    325     serialized!({
    326         let query = sqlx::query(
    327             "
    328                 SELECT out_result, out_tx_row_id
    329                 FROM register_tx_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
    330             ",
    331         )
    332         .bind(tx.transfer_id)
    333         .bind(tx.tx_id)
    334         .bind(tx.amount)
    335         .bind(&tx.subject)
    336         .bind(tx.creditor_id)
    337         .bind(&tx.creditor_name)
    338         .bind_timestamp(&tx.valued_at);
    339         let query = match kind {
    340             TxOutKind::Simple => query
    341                 .bind(None::<&[u8]>)
    342                 .bind(None::<&str>)
    343                 .bind(None::<&str>)
    344                 .bind(None::<i64>),
    345             TxOutKind::Bounce(bounced) => query
    346                 .bind(None::<&[u8]>)
    347                 .bind(None::<&str>)
    348                 .bind(None::<&str>)
    349                 .bind(*bounced),
    350             TxOutKind::Talerable(subject) => query
    351                 .bind(&subject.wtid)
    352                 .bind(subject.exchange_base_url.as_str())
    353                 .bind(&subject.metadata)
    354                 .bind(None::<i64>),
    355         };
    356         query
    357             .bind_timestamp(now)
    358             .try_map(|r: PgRow| {
    359                 Ok(AddOutgoingResult {
    360                     result: r.try_get(0)?,
    361                     row_id: r.try_get(1)?,
    362                 })
    363             })
    364             .fetch_one(&mut *db)
    365     })
    366 }
    367 
    368 #[derive(Debug, PartialEq, Eq)]
    369 pub enum TransferResult {
    370     Success { id: u64, initiated_at: Timestamp },
    371     RequestUidReuse,
    372     WtidReuse,
    373 }
    374 
    375 #[derive(Debug, Clone)]
    376 pub struct Transfer {
    377     pub request_uid: HashCode,
    378     pub amount: Decimal,
    379     pub exchange_base_url: Url,
    380     pub metadata: Option<CompactString>,
    381     pub wtid: ShortHashCode,
    382     pub creditor_id: i64,
    383     pub creditor_name: CompactString,
    384 }
    385 
    386 pub async fn make_transfer(
    387     db: &PgPool,
    388     tx: &Transfer,
    389     now: &Timestamp,
    390 ) -> sqlx::Result<TransferResult> {
    391     let subject = fmt_out_subject(&tx.wtid, &tx.exchange_base_url, tx.metadata.as_deref());
    392     serialized!(
    393         sqlx::query(
    394             "
    395                 SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at
    396                 FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8, $9)
    397             ",
    398         )
    399         .bind(&tx.request_uid)
    400         .bind(&tx.wtid)
    401         .bind(&subject)
    402         .bind(tx.amount)
    403         .bind(tx.exchange_base_url.as_str())
    404         .bind(&tx.metadata)
    405         .bind(tx.creditor_id)
    406         .bind(&tx.creditor_name)
    407         .bind_timestamp(now)
    408         .try_map(|r: PgRow| {
    409             Ok(if r.try_get_flag(0)? {
    410                 TransferResult::RequestUidReuse
    411             } else if r.try_get_flag(1)? {
    412                 TransferResult::WtidReuse
    413             } else {
    414                 TransferResult::Success {
    415                     id: r.try_get_u64(2)?,
    416                     initiated_at: r.try_get_timestamp(3)?,
    417                 }
    418             })
    419         })
    420         .fetch_one(db)
    421     )
    422 }
    423 
    424 #[derive(Debug, PartialEq, Eq)]
    425 pub struct BounceResult {
    426     pub tx_id: u64,
    427     pub tx_new: bool,
    428 }
    429 
    430 pub async fn register_bounced_tx_in(
    431     db: &mut PgConnection,
    432     tx: &TxIn,
    433     chargeback_id: i64,
    434     reason: &str,
    435     now: &Timestamp,
    436 ) -> sqlx::Result<BounceResult> {
    437     serialized!(
    438         sqlx::query(
    439             "
    440                 SELECT out_tx_row_id, out_tx_new
    441                 FROM register_bounced_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
    442             ",
    443         )
    444         .bind(tx.transfer_id)
    445         .bind(tx.tx_id)
    446         .bind(tx.amount)
    447         .bind(&tx.subject)
    448         .bind(tx.debtor_id)
    449         .bind(&tx.debtor_name)
    450         .bind_timestamp(&tx.valued_at)
    451         .bind(chargeback_id)
    452         .bind(reason)
    453         .bind_timestamp(now)
    454         .try_map(|r: PgRow| {
    455             Ok(BounceResult {
    456                 tx_id: r.try_get_u64(0)?,
    457                 tx_new: r.try_get(1)?,
    458             })
    459         })
    460         .fetch_one(&mut *db)
    461     )
    462 }
    463 
    464 pub async fn transfer_page(
    465     db: &PgPool,
    466     status: &Option<TransferState>,
    467     currency: &Currency,
    468     root: &CompactString,
    469     params: &Page,
    470 ) -> sqlx::Result<Vec<TransferListStatus>> {
    471     page(
    472         db,
    473         "initiated_id",
    474         params,
    475         || {
    476             let mut builder = QueryBuilder::new(
    477                 "
    478                     SELECT
    479                         initiated_id,
    480                         status,
    481                         amount,
    482                         credit_account,
    483                         credit_name,
    484                         initiated_at
    485                     FROM transfer
    486                     JOIN initiated USING (initiated_id)
    487                     WHERE
    488                 ",
    489             );
    490             if let Some(status) = status {
    491                 builder.push(" status = ").push_bind(status).push(" AND ");
    492             }
    493             builder
    494         },
    495         |r: PgRow| {
    496             Ok(TransferListStatus {
    497                 row_id: r.try_get_safeu64(0)?,
    498                 status: r.try_get(1)?,
    499                 amount: r.try_get_amount(2, currency)?,
    500                 credit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
    501                 timestamp: r.try_get_timestamp(5)?.into(),
    502             })
    503         },
    504     )
    505     .await
    506 }
    507 
    508 pub async fn outgoing_history(
    509     db: &PgPool,
    510     params: &History,
    511     currency: &Currency,
    512     root: &CompactString,
    513     listen: impl FnOnce() -> Receiver<i64>,
    514 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> {
    515     history(
    516         db,
    517         "tx_out_id",
    518         params,
    519         listen,
    520         || {
    521             QueryBuilder::new(
    522                 "
    523                 SELECT
    524                     tx_out_id,
    525                     amount,
    526                     credit_account,
    527                     credit_name,
    528                     valued_at,
    529                     exchange_base_url,
    530                     metadata,
    531                     wtid
    532                 FROM taler_out
    533                 JOIN tx_out USING (tx_out_id)
    534                 WHERE
    535             ",
    536             )
    537         },
    538         |r: PgRow| {
    539             Ok(OutgoingBankTransaction {
    540                 row_id: r.try_get_safeu64(0)?,
    541                 amount: r.try_get_amount(1, currency)?,
    542                 debit_fee: None,
    543                 credit_account: r.try_get_cyclos_fullpaytouri(2, 3, root)?,
    544                 date: r.try_get_timestamp(4)?.into(),
    545                 exchange_base_url: r.try_get_url(5)?,
    546                 metadata: r.try_get(6)?,
    547                 wtid: r.try_get(7)?,
    548             })
    549         },
    550     )
    551     .await
    552 }
    553 
    554 pub async fn incoming_history(
    555     db: &PgPool,
    556     params: &History,
    557     currency: &Currency,
    558     root: &CompactString,
    559     listen: impl FnOnce() -> Receiver<i64>,
    560 ) -> sqlx::Result<Vec<IncomingBankTransaction>> {
    561     history(
    562         db,
    563         "tx_in_id",
    564         params,
    565         listen,
    566         || {
    567             QueryBuilder::new(
    568                 "
    569                 SELECT
    570                     type,
    571                     tx_in_id,
    572                     amount,
    573                     debit_account,
    574                     debit_name,
    575                     valued_at,
    576                     metadata,
    577                     authorization_pub,
    578                     authorization_sig
    579                 FROM taler_in
    580                 JOIN tx_in USING (tx_in_id)
    581                 WHERE
    582             ",
    583             )
    584         },
    585         |r: PgRow| {
    586             Ok(match r.try_get(0)? {
    587                 IncomingType::reserve => IncomingBankTransaction::Reserve {
    588                     row_id: r.try_get_safeu64(1)?,
    589                     amount: r.try_get_amount(2, currency)?,
    590                     credit_fee: None,
    591                     debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
    592                     date: r.try_get_timestamp(5)?.into(),
    593                     reserve_pub: r.try_get(6)?,
    594                     authorization_pub: r.try_get(7)?,
    595                     authorization_sig: r.try_get(8)?,
    596                 },
    597                 IncomingType::kyc => IncomingBankTransaction::Kyc {
    598                     row_id: r.try_get_safeu64(1)?,
    599                     amount: r.try_get_amount(2, currency)?,
    600                     credit_fee: None,
    601                     debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
    602                     date: r.try_get_timestamp(5)?.into(),
    603                     account_pub: r.try_get(6)?,
    604                     authorization_pub: r.try_get(7)?,
    605                     authorization_sig: r.try_get(8)?,
    606                 },
    607                 IncomingType::map => unimplemented!("MAP are never listed in the history"),
    608             })
    609         },
    610     )
    611     .await
    612 }
    613 
    614 pub async fn revenue_history(
    615     db: &PgPool,
    616     params: &History,
    617     currency: &Currency,
    618     root: &CompactString,
    619     listen: impl FnOnce() -> Receiver<i64>,
    620 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> {
    621     history(
    622         db,
    623         "tx_in_id",
    624         params,
    625         listen,
    626         || {
    627             QueryBuilder::new(
    628                 "
    629                 SELECT
    630                     tx_in_id,
    631                     valued_at,
    632                     amount,
    633                     debit_account,
    634                     debit_name,
    635                     subject
    636                 FROM tx_in
    637                 WHERE
    638             ",
    639             )
    640         },
    641         |r: PgRow| {
    642             Ok(RevenueIncomingBankTransaction {
    643                 row_id: r.try_get_safeu64(0)?,
    644                 date: r.try_get_timestamp(1)?.into(),
    645                 amount: r.try_get_amount(2, currency)?,
    646                 credit_fee: None,
    647                 debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
    648                 subject: r.try_get(5)?,
    649             })
    650         },
    651     )
    652     .await
    653 }
    654 
    655 pub async fn transfer_by_id(
    656     db: &PgPool,
    657     id: u64,
    658     currency: &Currency,
    659     root: &CompactString,
    660 ) -> sqlx::Result<Option<TransferStatus>> {
    661     serialized!(
    662         sqlx::query(
    663             "
    664                 SELECT
    665                     status,
    666                     status_msg,
    667                     amount,
    668                     exchange_base_url,
    669                     metadata,
    670                     wtid,
    671                     credit_account,
    672                     credit_name,
    673                     initiated_at
    674                 FROM transfer
    675                 JOIN initiated USING (initiated_id)
    676                 WHERE initiated_id = $1
    677             ",
    678         )
    679         .bind(id as i64)
    680         .try_map(|r: PgRow| {
    681             Ok(TransferStatus {
    682                 status: r.try_get(0)?,
    683                 status_msg: r.try_get(1)?,
    684                 amount: r.try_get_amount(2, currency)?,
    685                 origin_exchange_url: r.try_get(3)?,
    686                 metadata: r.try_get(4)?,
    687                 wtid: r.try_get(5)?,
    688                 credit_account: r.try_get_cyclos_fullpaytouri(6, 7, root)?,
    689                 timestamp: r.try_get_timestamp(8)?.into(),
    690             })
    691         })
    692         .fetch_optional(db)
    693     )
    694 }
    695 
    696 /** Get a batch of pending initiated transactions not attempted since [start] */
    697 pub async fn pending_batch(
    698     db: &mut PgConnection,
    699     start: &Timestamp,
    700 ) -> sqlx::Result<Vec<Initiated>> {
    701     serialized!(
    702         sqlx::query(
    703             "
    704             SELECT initiated_id, amount, subject, credit_account, credit_name
    705             FROM initiated
    706             WHERE tx_id IS NULL
    707                 AND status='pending'
    708                 AND (last_submitted IS NULL OR last_submitted < $1)
    709             LIMIT 100
    710         ",
    711         )
    712         .bind_timestamp(start)
    713         .try_map(|r: PgRow| {
    714             Ok(Initiated {
    715                 id: r.try_get(0)?,
    716                 amount: r.try_get(1)?,
    717                 subject: r.try_get(2)?,
    718                 creditor_id: r.try_get(3)?,
    719                 creditor_name: r.try_get(4)?,
    720             })
    721         })
    722         .fetch_all(&mut *db)
    723     )
    724 }
    725 
    726 /** Update status of a successful submitted initiated transaction */
    727 pub async fn initiated_submit_success(
    728     db: &mut PgConnection,
    729     initiated_id: i64,
    730     timestamp: &Timestamp,
    731     tx_id: i64,
    732 ) -> sqlx::Result<()> {
    733     serialized!(
    734         sqlx::query(
    735             "
    736                 UPDATE initiated
    737                 SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, tx_id=$2
    738                 WHERE initiated_id=$3
    739             "
    740         )
    741         .bind_timestamp(timestamp)
    742         .bind(tx_id)
    743         .bind(initiated_id)
    744         .execute(&mut *db)
    745     )?;
    746     Ok(())
    747 }
    748 
    749 /** Update status of a permanently failed initiated transaction */
    750 pub async fn initiated_submit_permanent_failure(
    751     db: &mut PgConnection,
    752     initiated_id: i64,
    753     msg: &str,
    754 ) -> sqlx::Result<()> {
    755     serialized!(
    756         sqlx::query(
    757             "
    758                 UPDATE initiated
    759                 SET status='permanent_failure', status_msg=$1
    760                 WHERE initiated_id=$2
    761             ",
    762         )
    763         .bind(msg)
    764         .bind(initiated_id)
    765         .execute(&mut *db)
    766     )?;
    767     Ok(())
    768 }
    769 
    770 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    771 pub enum ChargebackFailureResult {
    772     Unknown,
    773     Known(u64),
    774     Idempotent(u64),
    775 }
    776 
    777 /** Update status of a charged back initiated transaction */
    778 pub async fn initiated_chargeback_failure(
    779     db: &mut PgConnection,
    780     transfer_id: i64,
    781 ) -> sqlx::Result<ChargebackFailureResult> {
    782     Ok(serialized!(
    783         sqlx::query("SELECT out_initiated_id, out_new FROM register_charge_back_failure($1)")
    784             .bind(transfer_id)
    785             .try_map(|r: PgRow| {
    786                 let id = r.try_get_u64(0)?;
    787                 Ok(if id == 0 {
    788                     ChargebackFailureResult::Unknown
    789                 } else if r.try_get(1)? {
    790                     ChargebackFailureResult::Known(id)
    791                 } else {
    792                     ChargebackFailureResult::Idempotent(id)
    793                 })
    794             })
    795             .fetch_optional(&mut *db)
    796     )?
    797     .unwrap_or(ChargebackFailureResult::Unknown))
    798 }
    799 
    800 /** Get JSON value from KV table */
    801 pub async fn kv_get<T: DeserializeOwned + Unpin + Send>(
    802     db: &mut PgConnection,
    803     key: &str,
    804 ) -> sqlx::Result<Option<T>> {
    805     serialized!(
    806         sqlx::query("SELECT value FROM kv WHERE key=$1")
    807             .bind(key)
    808             .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0))
    809             .fetch_optional(&mut *db)
    810     )
    811 }
    812 
    813 /** Set JSON value in KV table */
    814 pub async fn kv_set<T: Serialize>(db: &mut PgConnection, key: &str, value: &T) -> sqlx::Result<()> {
    815     serialized!(
    816         sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value")
    817             .bind(key)
    818             .bind(sqlx::types::Json(value))
    819             .execute(&mut *db)
    820     )?;
    821     Ok(())
    822 }
    823 
    824 pub enum RegistrationResult {
    825     Success,
    826     ReservePubReuse,
    827 }
    828 
    829 pub async fn transfer_register(
    830     db: &PgPool,
    831     req: &RegistrationRequest,
    832 ) -> sqlx::Result<RegistrationResult> {
    833     let ty: IncomingType = req.r#type.into();
    834     serialized!(
    835         sqlx::query(
    836             "SELECT out_reserve_pub_reuse FROM register_prepared_transfers($1,$2,$3,$4,$5,$6)"
    837         )
    838         .bind(ty)
    839         .bind(&req.account_pub)
    840         .bind(&req.authorization_pub)
    841         .bind(&req.authorization_sig)
    842         .bind(req.recurrent)
    843         .bind_timestamp(&Timestamp::now())
    844         .try_map(|r: PgRow| {
    845             Ok(if r.try_get_flag("out_reserve_pub_reuse")? {
    846                 RegistrationResult::ReservePubReuse
    847             } else {
    848                 RegistrationResult::Success
    849             })
    850         })
    851         .fetch_one(db)
    852     )
    853 }
    854 
    855 pub async fn transfer_unregister(db: &PgPool, req: &Unregistration) -> sqlx::Result<bool> {
    856     serialized!(
    857         sqlx::query("SELECT out_found FROM delete_prepared_transfers($1)")
    858             .bind(&req.authorization_pub)
    859             .try_map(|r: PgRow| r.try_get_flag("out_found"))
    860             .fetch_one(db)
    861     )
    862 }
    863 
    864 pub trait CyclosTypeHelper {
    865     fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>(
    866         &self,
    867         idx: I,
    868         name: I,
    869         root: &CompactString,
    870     ) -> sqlx::Result<FullCyclosPayto>;
    871     fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>(
    872         &self,
    873         idx: I,
    874         name: I,
    875         root: &CompactString,
    876     ) -> sqlx::Result<PaytoURI>;
    877 }
    878 
    879 impl CyclosTypeHelper for PgRow {
    880     fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>(
    881         &self,
    882         idx: I,
    883         name: I,
    884         root: &CompactString,
    885     ) -> sqlx::Result<FullCyclosPayto> {
    886         let idx = self.try_get(idx)?;
    887         let name = self.try_get(name)?;
    888         Ok(FullCyclosPayto::new(
    889             CyclosAccount {
    890                 id: CyclosId(idx),
    891                 root: root.clone(),
    892             },
    893             name,
    894         ))
    895     }
    896     fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>(
    897         &self,
    898         idx: I,
    899         name: I,
    900         root: &CompactString,
    901     ) -> sqlx::Result<PaytoURI> {
    902         let idx = self.try_get(idx)?;
    903         let name = self.try_get(name)?;
    904         Ok(CyclosAccount {
    905             id: CyclosId(idx),
    906             root: root.clone(),
    907         }
    908         .as_full_payto(name))
    909     }
    910 }
    911 
    912 #[cfg(test)]
    913 mod test {
    914     use std::sync::LazyLock;
    915 
    916     use compact_str::CompactString;
    917     use jiff::{Span, Timestamp};
    918     use serde_json::json;
    919     use sqlx::{PgPool, Postgres, Row as _, pool::PoolConnection, postgres::PgRow};
    920     use taler_api::{
    921         db::TypeHelper,
    922         notification::dummy_listen,
    923         subject::{IncomingSubject, OutgoingSubject},
    924     };
    925     use taler_common::{
    926         api_common::{EddsaPublicKey, HashCode, ShortHashCode},
    927         api_params::{History, Page},
    928         api_wire::TransferState,
    929         types::{
    930             amount::{Currency, decimal},
    931             url,
    932             utils::now_sql_stable_ts,
    933         },
    934     };
    935 
    936     use crate::{
    937         constants::CONFIG_SOURCE,
    938         db::{
    939             self, AddIncomingResult, AddOutgoingResult, BounceResult, ChargebackFailureResult,
    940             Transfer, TransferResult, TxIn, TxInAdmin, TxOut, TxOutKind,
    941         },
    942     };
    943 
    /// Currency handed to the history and transfer queries in these tests; parsed
    /// from `"TEST"` on first access.
    pub static CURRENCY: LazyLock<Currency> = LazyLock::new(|| "TEST".parse().unwrap());
    /// Root handed to the history and transfer queries in these tests.
    pub const ROOT: CompactString = CompactString::const_new("localhost");
    946 
    /// Obtain the database handles used by each test: a single connection and a
    /// pool, both produced by `taler_test_utils::db::db_test_setup` for
    /// `CONFIG_SOURCE`.
    async fn setup() -> (PoolConnection<Postgres>, PgPool) {
        taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await
    }
    950 
    951     #[tokio::test]
    952     async fn kv() {
    953         let (mut db, _) = setup().await;
    954 
    955         let value = json!({
    956             "name": "Mr Smith",
    957             "no way": 32
    958         });
    959 
    960         assert_eq!(
    961             db::kv_get::<serde_json::Value>(&mut db, "value")
    962                 .await
    963                 .unwrap(),
    964             None
    965         );
    966         db::kv_set(&mut db, "value", &value).await.unwrap();
    967         db::kv_set(&mut db, "value", &value).await.unwrap();
    968         assert_eq!(
    969             db::kv_get::<serde_json::Value>(&mut db, "value")
    970                 .await
    971                 .unwrap(),
    972             Some(value)
    973         );
    974     }
    975 
    976     #[tokio::test]
    977     async fn tx_in() {
    978         let (mut db, pool) = setup().await;
    979 
    980         let mut routine = async |first: &Option<IncomingSubject>,
    981                                  second: &Option<IncomingSubject>| {
    982             let id = sqlx::query("SELECT count(*) + 1 FROM tx_in")
    983                 .try_map(|r: PgRow| r.try_get_u64(0))
    984                 .fetch_one(&mut *db)
    985                 .await
    986                 .unwrap();
    987             let now = now_sql_stable_ts();
    988             let later = now + Span::new().hours(2);
    989             let tx = TxIn {
    990                 transfer_id: now.as_microsecond() as i64,
    991                 tx_id: None,
    992                 amount: decimal("10"),
    993                 subject: "subject".to_owned(),
    994                 debtor_id: 31000163100000000,
    995                 debtor_name: "Name".into(),
    996                 valued_at: now,
    997             };
    998             // Insert
    999             assert_eq!(
   1000                 db::register_tx_in(&mut db, &tx, first, &now)
   1001                     .await
   1002                     .expect("register tx in"),
   1003                 AddIncomingResult::Success {
   1004                     new: true,
   1005                     pending: false,
   1006                     row_id: id,
   1007                     valued_at: now,
   1008                 }
   1009             );
   1010             // Idempotent
   1011             assert_eq!(
   1012                 db::register_tx_in(
   1013                     &mut db,
   1014                     &TxIn {
   1015                         valued_at: later,
   1016                         ..tx.clone()
   1017                     },
   1018                     first,
   1019                     &now
   1020                 )
   1021                 .await
   1022                 .expect("register tx in"),
   1023                 AddIncomingResult::Success {
   1024                     new: false,
   1025                     pending: false,
   1026                     row_id: id,
   1027                     valued_at: now
   1028                 }
   1029             );
   1030             // Many
   1031             assert_eq!(
   1032                 db::register_tx_in(
   1033                     &mut db,
   1034                     &TxIn {
   1035                         transfer_id: later.as_microsecond() as i64,
   1036                         valued_at: later,
   1037                         ..tx
   1038                     },
   1039                     second,
   1040                     &now
   1041                 )
   1042                 .await
   1043                 .expect("register tx in"),
   1044                 AddIncomingResult::Success {
   1045                     new: true,
   1046                     pending: false,
   1047                     row_id: id + 1,
   1048                     valued_at: later
   1049                 }
   1050             );
   1051         };
   1052 
   1053         // Empty db
   1054         assert_eq!(
   1055             db::revenue_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen)
   1056                 .await
   1057                 .unwrap(),
   1058             Vec::new()
   1059         );
   1060         assert_eq!(
   1061             db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen)
   1062                 .await
   1063                 .unwrap(),
   1064             Vec::new()
   1065         );
   1066 
   1067         // Regular transaction
   1068         routine(&None, &None).await;
   1069 
   1070         // Reserve transaction
   1071         routine(
   1072             &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())),
   1073             &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())),
   1074         )
   1075         .await;
   1076 
   1077         // Kyc transaction
   1078         routine(
   1079             &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())),
   1080             &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())),
   1081         )
   1082         .await;
   1083 
   1084         // History
   1085         assert_eq!(
   1086             db::revenue_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen)
   1087                 .await
   1088                 .unwrap()
   1089                 .len(),
   1090             6
   1091         );
   1092         assert_eq!(
   1093             db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen)
   1094                 .await
   1095                 .unwrap()
   1096                 .len(),
   1097             4
   1098         );
   1099     }
   1100 
   1101     #[tokio::test]
   1102     async fn tx_in_admin() {
   1103         let (_, pool) = setup().await;
   1104 
   1105         // Empty db
   1106         assert_eq!(
   1107             db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen)
   1108                 .await
   1109                 .unwrap(),
   1110             Vec::new()
   1111         );
   1112 
   1113         let now = now_sql_stable_ts();
   1114         let later = now + Span::new().hours(2);
   1115         let tx = TxInAdmin {
   1116             amount: decimal("10"),
   1117             subject: "subject".to_owned(),
   1118             debtor_id: 31000163100000000,
   1119             debtor_name: "Name".into(),
   1120             metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
   1121         };
   1122         // Insert
   1123         assert_eq!(
   1124             db::register_tx_in_admin(&pool, &tx, &now)
   1125                 .await
   1126                 .expect("register tx in"),
   1127             AddIncomingResult::Success {
   1128                 new: true,
   1129                 pending: false,
   1130                 row_id: 1,
   1131                 valued_at: now
   1132             }
   1133         );
   1134         // Many
   1135         assert_eq!(
   1136             db::register_tx_in_admin(
   1137                 &pool,
   1138                 &TxInAdmin {
   1139                     subject: "Other".to_owned(),
   1140                     metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
   1141                     ..tx.clone()
   1142                 },
   1143                 &later
   1144             )
   1145             .await
   1146             .expect("register tx in"),
   1147             AddIncomingResult::Success {
   1148                 new: true,
   1149                 pending: false,
   1150                 row_id: 2,
   1151                 valued_at: later
   1152             }
   1153         );
   1154 
   1155         // History
   1156         assert_eq!(
   1157             db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen)
   1158                 .await
   1159                 .unwrap()
   1160                 .len(),
   1161             2
   1162         );
   1163     }
   1164 
   1165     #[tokio::test]
   1166     async fn tx_out() {
   1167         let (mut db, pool) = setup().await;
   1168 
   1169         let mut routine = async |first: &TxOutKind, second: &TxOutKind| {
   1170             let transfer_id = sqlx::query("SELECT count(*) + 1 FROM tx_out")
   1171                 .try_map(|r: PgRow| r.try_get(0))
   1172                 .fetch_one(&mut *db)
   1173                 .await
   1174                 .unwrap();
   1175             let now = now_sql_stable_ts();
   1176             let later = now + Span::new().hours(2);
   1177             let tx = TxOut {
   1178                 transfer_id,
   1179                 tx_id: Some(transfer_id),
   1180                 amount: decimal("10"),
   1181                 subject: "subject".to_owned(),
   1182                 creditor_id: 31000163100000000,
   1183                 creditor_name: "Name".into(),
   1184                 valued_at: now,
   1185             };
   1186             assert!(matches!(
   1187                 db::make_transfer(
   1188                     &pool,
   1189                     &Transfer {
   1190                         request_uid: HashCode::rand(),
   1191                         amount: decimal("10"),
   1192                         exchange_base_url: url("https://exchange.test.com/"),
   1193                         metadata: None,
   1194                         wtid: ShortHashCode::rand(),
   1195                         creditor_id: 31000163100000000,
   1196                         creditor_name: "Name".into()
   1197                     },
   1198                     &now
   1199                 )
   1200                 .await
   1201                 .unwrap(),
   1202                 TransferResult::Success { .. }
   1203             ));
   1204             db::initiated_submit_success(&mut db, 1, &Timestamp::now(), transfer_id)
   1205                 .await
   1206                 .expect("status success");
   1207 
   1208             // Insert
   1209             assert_eq!(
   1210                 db::register_tx_out(&mut db, &tx, first, &now)
   1211                     .await
   1212                     .expect("register tx out"),
   1213                 AddOutgoingResult {
   1214                     result: db::RegisterResult::known,
   1215                     row_id: transfer_id,
   1216                 }
   1217             );
   1218             // Idempotent
   1219             assert_eq!(
   1220                 db::register_tx_out(
   1221                     &mut db,
   1222                     &TxOut {
   1223                         valued_at: later,
   1224                         ..tx.clone()
   1225                     },
   1226                     first,
   1227                     &now
   1228                 )
   1229                 .await
   1230                 .expect("register tx out"),
   1231                 AddOutgoingResult {
   1232                     result: db::RegisterResult::idempotent,
   1233                     row_id: transfer_id,
   1234                 }
   1235             );
   1236             // Recovered
   1237             assert_eq!(
   1238                 db::register_tx_out(
   1239                     &mut db,
   1240                     &TxOut {
   1241                         transfer_id: transfer_id + 1,
   1242                         tx_id: Some(transfer_id + 1),
   1243                         valued_at: later,
   1244                         ..tx.clone()
   1245                     },
   1246                     second,
   1247                     &now
   1248                 )
   1249                 .await
   1250                 .expect("register tx out"),
   1251                 AddOutgoingResult {
   1252                     result: db::RegisterResult::recovered,
   1253                     row_id: transfer_id + 1,
   1254                 }
   1255             );
   1256         };
   1257 
   1258         // Empty db
   1259         assert_eq!(
   1260             db::outgoing_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen)
   1261                 .await
   1262                 .unwrap(),
   1263             Vec::new()
   1264         );
   1265 
   1266         // Regular transaction
   1267         routine(&TxOutKind::Simple, &TxOutKind::Simple).await;
   1268 
   1269         // Talerable transaction
   1270         routine(
   1271             &TxOutKind::Talerable(OutgoingSubject::rand()),
   1272             &TxOutKind::Talerable(OutgoingSubject::rand()),
   1273         )
   1274         .await;
   1275 
   1276         // Bounced transaction
   1277         routine(&TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await;
   1278 
   1279         // History
   1280         assert_eq!(
   1281             db::outgoing_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen)
   1282                 .await
   1283                 .unwrap()
   1284                 .len(),
   1285             2
   1286         );
   1287     }
   1288 
   1289     // TODO tx out failure
   1290 
   1291     #[tokio::test]
   1292     async fn transfer() {
   1293         let (_, pool) = setup().await;
   1294 
   1295         // Empty db
   1296         assert_eq!(
   1297             db::transfer_by_id(&pool, 0, &CURRENCY, &ROOT)
   1298                 .await
   1299                 .unwrap(),
   1300             None
   1301         );
   1302         assert_eq!(
   1303             db::transfer_page(&pool, &None, &CURRENCY, &ROOT, &Page::default())
   1304                 .await
   1305                 .unwrap(),
   1306             Vec::new()
   1307         );
   1308 
   1309         let req = Transfer {
   1310             request_uid: HashCode::rand(),
   1311             amount: decimal("10"),
   1312             exchange_base_url: url("https://exchange.test.com/"),
   1313             metadata: None,
   1314             wtid: ShortHashCode::rand(),
   1315             creditor_id: 31000163100000000,
   1316             creditor_name: "Name".into(),
   1317         };
   1318         let now = now_sql_stable_ts();
   1319         let later = now + Span::new().hours(2);
   1320         // Insert
   1321         assert_eq!(
   1322             db::make_transfer(&pool, &req, &now)
   1323                 .await
   1324                 .expect("transfer"),
   1325             TransferResult::Success {
   1326                 id: 1,
   1327                 initiated_at: now
   1328             }
   1329         );
   1330         // Idempotent
   1331         assert_eq!(
   1332             db::make_transfer(&pool, &req, &later)
   1333                 .await
   1334                 .expect("transfer"),
   1335             TransferResult::Success {
   1336                 id: 1,
   1337                 initiated_at: now
   1338             }
   1339         );
   1340         // Request UID reuse
   1341         assert_eq!(
   1342             db::make_transfer(
   1343                 &pool,
   1344                 &Transfer {
   1345                     wtid: ShortHashCode::rand(),
   1346                     ..req.clone()
   1347                 },
   1348                 &now
   1349             )
   1350             .await
   1351             .expect("transfer"),
   1352             TransferResult::RequestUidReuse
   1353         );
   1354         // wtid reuse
   1355         assert_eq!(
   1356             db::make_transfer(
   1357                 &pool,
   1358                 &Transfer {
   1359                     request_uid: HashCode::rand(),
   1360                     ..req.clone()
   1361                 },
   1362                 &now
   1363             )
   1364             .await
   1365             .expect("transfer"),
   1366             TransferResult::WtidReuse
   1367         );
   1368         // Many
   1369         assert_eq!(
   1370             db::make_transfer(
   1371                 &pool,
   1372                 &Transfer {
   1373                     request_uid: HashCode::rand(),
   1374                     wtid: ShortHashCode::rand(),
   1375                     ..req
   1376                 },
   1377                 &later
   1378             )
   1379             .await
   1380             .expect("transfer"),
   1381             TransferResult::Success {
   1382                 id: 2,
   1383                 initiated_at: later
   1384             }
   1385         );
   1386 
   1387         // Get
   1388         assert!(
   1389             db::transfer_by_id(&pool, 1, &CURRENCY, &ROOT)
   1390                 .await
   1391                 .unwrap()
   1392                 .is_some()
   1393         );
   1394         assert!(
   1395             db::transfer_by_id(&pool, 2, &CURRENCY, &ROOT)
   1396                 .await
   1397                 .unwrap()
   1398                 .is_some()
   1399         );
   1400         assert!(
   1401             db::transfer_by_id(&pool, 3, &CURRENCY, &ROOT)
   1402                 .await
   1403                 .unwrap()
   1404                 .is_none()
   1405         );
   1406         assert_eq!(
   1407             db::transfer_page(&pool, &None, &CURRENCY, &ROOT, &Page::default())
   1408                 .await
   1409                 .unwrap()
   1410                 .len(),
   1411             2
   1412         );
   1413     }
   1414 
   1415     #[tokio::test]
   1416     async fn bounce() {
   1417         let (mut db, _) = setup().await;
   1418 
   1419         let amount = decimal("10");
   1420         let now = now_sql_stable_ts();
   1421 
   1422         // Bounce
   1423         assert_eq!(
   1424             db::register_bounced_tx_in(
   1425                 &mut db,
   1426                 &TxIn {
   1427                     transfer_id: 12,
   1428                     tx_id: None,
   1429                     amount,
   1430                     subject: "subject".to_owned(),
   1431                     debtor_id: 31000163100000000,
   1432                     debtor_name: "Name".into(),
   1433                     valued_at: now
   1434                 },
   1435                 22,
   1436                 "good reason",
   1437                 &now
   1438             )
   1439             .await
   1440             .expect("bounce"),
   1441             BounceResult {
   1442                 tx_id: 1,
   1443                 tx_new: true
   1444             }
   1445         );
   1446         // Idempotent
   1447         assert_eq!(
   1448             db::register_bounced_tx_in(
   1449                 &mut db,
   1450                 &TxIn {
   1451                     transfer_id: 12,
   1452                     tx_id: None,
   1453                     amount,
   1454                     subject: "subject".to_owned(),
   1455                     debtor_id: 31000163100000000,
   1456                     debtor_name: "Name".into(),
   1457                     valued_at: now
   1458                 },
   1459                 22,
   1460                 "good reason",
   1461                 &now
   1462             )
   1463             .await
   1464             .expect("bounce"),
   1465             BounceResult {
   1466                 tx_id: 1,
   1467                 tx_new: false
   1468             }
   1469         );
   1470 
   1471         // Many
   1472         assert_eq!(
   1473             db::register_bounced_tx_in(
   1474                 &mut db,
   1475                 &TxIn {
   1476                     transfer_id: 13,
   1477                     tx_id: None,
   1478                     amount,
   1479                     subject: "subject".to_owned(),
   1480                     debtor_id: 31000163100000000,
   1481                     debtor_name: "Name".into(),
   1482                     valued_at: now
   1483                 },
   1484                 23,
   1485                 "good reason",
   1486                 &now
   1487             )
   1488             .await
   1489             .expect("bounce"),
   1490             BounceResult {
   1491                 tx_id: 2,
   1492                 tx_new: true
   1493             }
   1494         );
   1495     }
   1496 
   1497     #[tokio::test]
   1498     async fn status() {
   1499         let (mut db, pool) = setup().await;
   1500 
   1501         let check_status = async |id: u64, status: TransferState, msg: Option<&str>| {
   1502             let transfer = db::transfer_by_id(&pool, id, &CURRENCY, &ROOT)
   1503                 .await
   1504                 .unwrap()
   1505                 .unwrap();
   1506             assert_eq!(
   1507                 (status, msg),
   1508                 (transfer.status, transfer.status_msg.as_deref())
   1509             );
   1510         };
   1511 
   1512         // Unknown transfer
   1513         db::initiated_submit_permanent_failure(&mut db, 1, "msg")
   1514             .await
   1515             .unwrap();
   1516         db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12)
   1517             .await
   1518             .unwrap();
   1519         assert_eq!(
   1520             db::initiated_chargeback_failure(&mut db, 1).await.unwrap(),
   1521             ChargebackFailureResult::Unknown
   1522         );
   1523 
   1524         // Failure
   1525         db::make_transfer(
   1526             &pool,
   1527             &Transfer {
   1528                 request_uid: HashCode::rand(),
   1529                 amount: decimal("1"),
   1530                 exchange_base_url: url("https://exchange.test.com/"),
   1531                 metadata: None,
   1532                 wtid: ShortHashCode::rand(),
   1533                 creditor_id: 31000163100000000,
   1534                 creditor_name: "Name".into(),
   1535             },
   1536             &Timestamp::now(),
   1537         )
   1538         .await
   1539         .expect("transfer");
   1540         check_status(1, TransferState::pending, None).await;
   1541         db::initiated_submit_permanent_failure(&mut db, 1, "error status")
   1542             .await
   1543             .unwrap();
   1544         check_status(1, TransferState::permanent_failure, Some("error status")).await;
   1545 
   1546         // Success
   1547         db::make_transfer(
   1548             &pool,
   1549             &Transfer {
   1550                 request_uid: HashCode::rand(),
   1551                 amount: decimal("1"),
   1552                 exchange_base_url: url("https://exchange.test.com/"),
   1553                 metadata: None,
   1554                 wtid: ShortHashCode::rand(),
   1555                 creditor_id: 31000163100000000,
   1556                 creditor_name: "Name".into(),
   1557             },
   1558             &Timestamp::now(),
   1559         )
   1560         .await
   1561         .expect("transfer");
   1562         check_status(2, TransferState::pending, None).await;
   1563         db::initiated_submit_success(&mut db, 2, &Timestamp::now(), 3)
   1564             .await
   1565             .unwrap();
   1566         check_status(2, TransferState::pending, None).await;
   1567         db::register_tx_out(
   1568             &mut db,
   1569             &TxOut {
   1570                 transfer_id: 5,
   1571                 tx_id: Some(3),
   1572                 amount: decimal("2"),
   1573                 subject: "".to_string(),
   1574                 creditor_id: 31000163100000000,
   1575                 creditor_name: "Name".into(),
   1576                 valued_at: Timestamp::now(),
   1577             },
   1578             &TxOutKind::Simple,
   1579             &Timestamp::now(),
   1580         )
   1581         .await
   1582         .unwrap();
   1583         check_status(2, TransferState::success, None).await;
   1584 
   1585         // Chargeback
   1586         assert_eq!(
   1587             db::initiated_chargeback_failure(&mut db, 5).await.unwrap(),
   1588             ChargebackFailureResult::Known(2)
   1589         );
   1590         check_status(2, TransferState::late_failure, Some("charged back")).await;
   1591         assert_eq!(
   1592             db::initiated_chargeback_failure(&mut db, 5).await.unwrap(),
   1593             ChargebackFailureResult::Idempotent(2)
   1594         );
   1595     }
   1596 
   1597     #[tokio::test]
   1598     async fn batch() {
   1599         let (mut db, pool) = setup().await;
   1600         let start = Timestamp::now();
   1601 
   1602         // Empty db
   1603         let pendings = db::pending_batch(&mut db, &start)
   1604             .await
   1605             .expect("pending_batch");
   1606         assert_eq!(pendings.len(), 0);
   1607 
   1608         // Some transfers
   1609         for i in 0..3 {
   1610             db::make_transfer(
   1611                 &pool,
   1612                 &Transfer {
   1613                     request_uid: HashCode::rand(),
   1614                     amount: decimal(format!("{}", i + 1)),
   1615                     exchange_base_url: url("https://exchange.test.com/"),
   1616                     metadata: None,
   1617                     wtid: ShortHashCode::rand(),
   1618                     creditor_id: 31000163100000000,
   1619                     creditor_name: "Name".into(),
   1620                 },
   1621                 &Timestamp::now(),
   1622             )
   1623             .await
   1624             .expect("transfer");
   1625         }
   1626         let pendings = db::pending_batch(&mut db, &start)
   1627             .await
   1628             .expect("pending_batch");
   1629         assert_eq!(pendings.len(), 3);
   1630 
   1631         // Max 100 txs in batch
   1632         for i in 0..100 {
   1633             db::make_transfer(
   1634                 &pool,
   1635                 &Transfer {
   1636                     request_uid: HashCode::rand(),
   1637                     amount: decimal(format!("{}", i + 1)),
   1638                     exchange_base_url: url("https://exchange.test.com/"),
   1639                     metadata: None,
   1640                     wtid: ShortHashCode::rand(),
   1641                     creditor_id: 31000163100000000,
   1642                     creditor_name: "Name".into(),
   1643                 },
   1644                 &Timestamp::now(),
   1645             )
   1646             .await
   1647             .expect("transfer");
   1648         }
   1649         let pendings = db::pending_batch(&mut db, &start)
   1650             .await
   1651             .expect("pending_batch");
   1652         assert_eq!(pendings.len(), 100);
   1653 
   1654         // Skip uploaded
   1655         for i in 0..=10 {
   1656             db::initiated_submit_success(&mut db, i, &Timestamp::now(), i)
   1657                 .await
   1658                 .expect("status success");
   1659         }
   1660         let pendings = db::pending_batch(&mut db, &start)
   1661             .await
   1662             .expect("pending_batch");
   1663         assert_eq!(pendings.len(), 93);
   1664 
   1665         // Skip failed
   1666         for i in 0..=10 {
   1667             db::initiated_submit_permanent_failure(&mut db, 10 + i, "failure")
   1668                 .await
   1669                 .expect("status failure");
   1670         }
   1671         let pendings = db::pending_batch(&mut db, &start)
   1672             .await
   1673             .expect("pending_batch");
   1674         assert_eq!(pendings.len(), 83);
   1675     }
   1676 }