taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

db.rs (47336B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::fmt::Display;
     18 
     19 use jiff::{Timestamp, civil::Date, tz::TimeZone};
     20 use serde::{Serialize, de::DeserializeOwned};
     21 use sqlx::{PgConnection, PgExecutor, PgPool, QueryBuilder, Row, postgres::PgRow};
     22 use taler_api::{
     23     db::{BindHelper, IncomingType, TypeHelper, history, page},
     24     subject::{IncomingSubject, OutgoingSubject},
     25 };
     26 use taler_common::{
     27     api_common::{HashCode, ShortHashCode},
     28     api_params::{History, Page},
     29     api_revenue::RevenueIncomingBankTransaction,
     30     api_wire::{
     31         IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferState,
     32         TransferStatus,
     33     },
     34     config::Config,
     35     types::{
     36         amount::{Amount, Decimal},
     37         payto::PaytoImpl as _,
     38     },
     39 };
     40 use tokio::sync::watch::{Receiver, Sender};
     41 use url::Url;
     42 
     43 use crate::{FullHuPayto, config::parse_db_cfg, constants::CURRENCY, magnet_api::types::TxStatus};
     44 
     45 const SCHEMA: &str = "magnet_bank";
     46 
     47 pub async fn pool(cfg: &Config) -> anyhow::Result<PgPool> {
     48     let db = parse_db_cfg(cfg)?;
     49     let pool = taler_common::db::pool(db.cfg, SCHEMA).await?;
     50     Ok(pool)
     51 }
     52 
     53 pub async fn dbinit(cfg: &Config, reset: bool) -> anyhow::Result<PgPool> {
     54     let db_cfg = parse_db_cfg(cfg)?;
     55     let pool = taler_common::db::pool(db_cfg.cfg, SCHEMA).await?;
     56     let mut db = pool.acquire().await?;
     57     taler_common::db::dbinit(&mut db, db_cfg.sql_dir.as_ref(), "magnet-bank", reset).await?;
     58     Ok(pool)
     59 }
     60 
     61 pub async fn notification_listener(
     62     pool: PgPool,
     63     in_channel: Sender<i64>,
     64     taler_in_channel: Sender<i64>,
     65     out_channel: Sender<i64>,
     66     taler_out_channel: Sender<i64>,
     67 ) -> sqlx::Result<()> {
     68     taler_api::notification::notification_listener!(&pool,
     69         "tx_in" => (row_id: i64) {
     70             in_channel.send_replace(row_id);
     71         },
     72         "taler_in" => (row_id: i64) {
     73             taler_in_channel.send_replace(row_id);
     74         },
     75         "tx_out" => (row_id: i64) {
     76             out_channel.send_replace(row_id);
     77         },
     78         "taler_out" => (row_id: i64) {
     79             taler_out_channel.send_replace(row_id);
     80         }
     81     )
     82 }
     83 
     84 #[derive(Debug, Clone)]
     85 pub struct TxIn {
     86     pub code: u64,
     87     pub amount: Amount,
     88     pub subject: String,
     89     pub debtor: FullHuPayto,
     90     pub value_date: Date,
     91     pub status: TxStatus,
     92 }
     93 
     94 impl Display for TxIn {
     95     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     96         let Self {
     97             code,
     98             amount,
     99             subject,
    100             debtor,
    101             value_date,
    102             status,
    103         } = self;
    104         write!(
    105             f,
    106             "{value_date} {code} {amount} ({} {}) {status:?} '{subject}'",
    107             debtor.bban(),
    108             debtor.name
    109         )
    110     }
    111 }
    112 
    113 #[derive(Debug, Clone)]
    114 pub struct TxOut {
    115     pub code: u64,
    116     pub amount: Amount,
    117     pub subject: String,
    118     pub creditor: FullHuPayto,
    119     pub value_date: Date,
    120     pub status: TxStatus,
    121 }
    122 
    123 impl Display for TxOut {
    124     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    125         let Self {
    126             code,
    127             amount,
    128             subject,
    129             creditor,
    130             value_date,
    131             status,
    132         } = self;
    133         write!(
    134             f,
    135             "{value_date} {code} {amount} ({} {}) {status:?} '{subject}'",
    136             creditor.bban(),
    137             &creditor.name
    138         )
    139     }
    140 }
    141 
    142 #[derive(Debug, PartialEq, Eq)]
    143 pub struct Initiated {
    144     pub id: u64,
    145     pub amount: Amount,
    146     pub subject: String,
    147     pub creditor: FullHuPayto,
    148 }
    149 
    150 impl Display for Initiated {
    151     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    152         let Self {
    153             id,
    154             amount,
    155             subject,
    156             creditor,
    157         } = self;
    158         write!(
    159             f,
    160             "{id} {amount} ({} {}) '{subject}'",
    161             creditor.bban(),
    162             &creditor.name
    163         )
    164     }
    165 }
    166 
    167 #[derive(Debug, Clone)]
    168 pub struct TxInAdmin {
    169     pub amount: Amount,
    170     pub subject: String,
    171     pub debtor: FullHuPayto,
    172     pub metadata: IncomingSubject,
    173 }
    174 
    175 /// Lock the database for worker execution
    176 pub async fn worker_lock(e: &mut PgConnection) -> sqlx::Result<bool> {
    177     sqlx::query("SELECT pg_try_advisory_lock(42)")
    178         .try_map(|r: PgRow| r.try_get(0))
    179         .fetch_one(e)
    180         .await
    181 }
    182 
    183 #[derive(Debug, PartialEq, Eq)]
    184 pub enum AddIncomingResult {
    185     Success {
    186         new: bool,
    187         row_id: u64,
    188         valued_at: Date,
    189     },
    190     ReservePubReuse,
    191 }
    192 
    193 pub async fn register_tx_in_admin(
    194     db: &PgPool,
    195     tx: &TxInAdmin,
    196     now: &Timestamp,
    197 ) -> sqlx::Result<AddIncomingResult> {
    198     sqlx::query(
    199         "
    200             SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new
    201             FROM register_tx_in(NULL, $1, $2, $3, $4, $5, $6, $7, $5)
    202         ",
    203     )
    204     .bind(&tx.amount)
    205     .bind(&tx.subject)
    206     .bind(tx.debtor.iban())
    207     .bind(&tx.debtor.name)
    208     .bind_date(&now.to_zoned(TimeZone::UTC).date())
    209     .bind(tx.metadata.ty())
    210     .bind(tx.metadata.key())
    211     .try_map(|r: PgRow| {
    212         Ok(if r.try_get_flag(0)? {
    213             AddIncomingResult::ReservePubReuse
    214         } else {
    215             AddIncomingResult::Success {
    216                 row_id: r.try_get_u64(1)?,
    217                 valued_at: r.try_get_date(2)?,
    218                 new: r.try_get(3)?,
    219             }
    220         })
    221     })
    222     .fetch_one(db)
    223     .await
    224 }
    225 
    226 pub async fn register_tx_in(
    227     db: &mut PgConnection,
    228     tx: &TxIn,
    229     subject: &Option<IncomingSubject>,
    230     now: &Timestamp,
    231 ) -> sqlx::Result<AddIncomingResult> {
    232     sqlx::query(
    233         "
    234             SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new
    235             FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9)
    236         ",
    237     )
    238     .bind(tx.code as i64)
    239     .bind(&tx.amount)
    240     .bind(&tx.subject)
    241     .bind(tx.debtor.iban())
    242     .bind(&tx.debtor.name)
    243     .bind_date(&tx.value_date)
    244     .bind(subject.as_ref().map(|it| it.ty()))
    245     .bind(subject.as_ref().map(|it| it.key()))
    246     .bind_timestamp(now)
    247     .try_map(|r: PgRow| {
    248         Ok(if r.try_get_flag(0)? {
    249             AddIncomingResult::ReservePubReuse
    250         } else {
    251             AddIncomingResult::Success {
    252                 row_id: r.try_get_u64(1)?,
    253                 valued_at: r.try_get_date(2)?,
    254                 new: r.try_get(3)?,
    255             }
    256         })
    257     })
    258     .fetch_one(db)
    259     .await
    260 }
    261 
    262 #[derive(Debug)]
    263 pub enum TxOutKind {
    264     Simple,
    265     Bounce(u32),
    266     Talerable(OutgoingSubject),
    267 }
    268 
    269 #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)]
    270 #[allow(non_camel_case_types)]
    271 #[sqlx(type_name = "register_result")]
    272 pub enum RegisterResult {
    273     /// Already registered
    274     idempotent,
    275     /// Initiated transaction
    276     known,
    277     /// Recovered unknown outgoing transaction
    278     recovered,
    279 }
    280 
    281 #[derive(Debug, PartialEq, Eq)]
    282 pub struct AddOutgoingResult {
    283     pub result: RegisterResult,
    284     pub row_id: u64,
    285 }
    286 
    287 pub async fn register_tx_out(
    288     db: &mut PgConnection,
    289     tx: &TxOut,
    290     kind: &TxOutKind,
    291     now: &Timestamp,
    292 ) -> sqlx::Result<AddOutgoingResult> {
    293     let query = sqlx::query(
    294         "
    295             SELECT out_result, out_tx_row_id 
    296             FROM register_tx_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
    297         ",
    298     )
    299     .bind(tx.code as i64)
    300     .bind(&tx.amount)
    301     .bind(&tx.subject)
    302     .bind(tx.creditor.iban())
    303     .bind(&tx.creditor.name)
    304     .bind_date(&tx.value_date);
    305     let query = match kind {
    306         TxOutKind::Simple => query
    307             .bind(None::<&[u8]>)
    308             .bind(None::<&str>)
    309             .bind(None::<i64>),
    310         TxOutKind::Bounce(bounced) => query
    311             .bind(None::<&[u8]>)
    312             .bind(None::<&str>)
    313             .bind(*bounced as i64),
    314         TxOutKind::Talerable(subject) => query
    315             .bind(subject.wtid.as_ref())
    316             .bind(subject.exchange_base_url.as_ref())
    317             .bind(None::<i64>),
    318     };
    319     query
    320         .bind_timestamp(now)
    321         .try_map(|r: PgRow| {
    322             Ok(AddOutgoingResult {
    323                 result: r.try_get(0)?,
    324                 row_id: r.try_get_u64(1)?,
    325             })
    326         })
    327         .fetch_one(db)
    328         .await
    329 }
    330 
    331 #[derive(Debug, PartialEq, Eq)]
    332 pub struct OutFailureResult {
    333     pub initiated_id: Option<u64>,
    334     pub new: bool,
    335 }
    336 
    337 pub async fn register_tx_out_failure(
    338     db: &mut PgConnection,
    339     code: u64,
    340     bounced: Option<u32>,
    341     now: &Timestamp,
    342 ) -> sqlx::Result<OutFailureResult> {
    343     sqlx::query(
    344         "
    345             SELECT out_new, out_initiated_id 
    346             FROM register_tx_out_failure($1, $2, $3)
    347         ",
    348     )
    349     .bind(code as i64)
    350     .bind(bounced.map(|i| i as i32))
    351     .bind_timestamp(now)
    352     .try_map(|r: PgRow| {
    353         Ok(OutFailureResult {
    354             new: r.try_get(0)?,
    355             initiated_id: r.try_get::<Option<i64>, _>(1)?.map(|i| i as u64),
    356         })
    357     })
    358     .fetch_one(db)
    359     .await
    360 }
    361 
    362 #[derive(Debug, PartialEq, Eq)]
    363 pub enum TransferResult {
    364     Success { id: u64, initiated_at: Timestamp },
    365     RequestUidReuse,
    366     WtidReuse,
    367 }
    368 
    369 #[derive(Debug, Clone)]
    370 pub struct Transfer {
    371     pub request_uid: HashCode,
    372     pub amount: Decimal,
    373     pub exchange_base_url: Url,
    374     pub wtid: ShortHashCode,
    375     pub creditor: FullHuPayto,
    376 }
    377 
    378 pub async fn make_transfer<'a>(
    379     db: impl PgExecutor<'a>,
    380     tx: &Transfer,
    381     now: &Timestamp,
    382 ) -> sqlx::Result<TransferResult> {
    383     let subject = format!("{} {}", tx.wtid, tx.exchange_base_url);
    384     sqlx::query(
    385         "
    386             SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at
    387             FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8)
    388         ",
    389     )
    390     .bind(tx.request_uid.as_ref())
    391     .bind(tx.wtid.as_ref())
    392     .bind(&subject)
    393     .bind(tx.amount)
    394     .bind(tx.exchange_base_url.as_str())
    395     .bind(tx.creditor.iban())
    396     .bind(&tx.creditor.name)
    397     .bind_timestamp(now)
    398     .try_map(|r: PgRow| {
    399         Ok(if r.try_get_flag(0)? {
    400             TransferResult::RequestUidReuse
    401         } else if r.try_get_flag(1)? {
    402             TransferResult::WtidReuse
    403         } else {
    404             TransferResult::Success {
    405                 id: r.try_get_u64(2)?,
    406                 initiated_at: r.try_get_timestamp(3)?,
    407             }
    408         })
    409     })
    410     .fetch_one(db)
    411     .await
    412 }
    413 
    414 #[derive(Debug, PartialEq, Eq)]
    415 pub struct BounceResult {
    416     pub tx_id: u64,
    417     pub tx_new: bool,
    418     pub bounce_id: u64,
    419     pub bounce_new: bool,
    420 }
    421 
    422 pub async fn register_bounce_tx_in(
    423     db: &mut PgConnection,
    424     tx: &TxIn,
    425     amount: &Amount,
    426     reason: &str,
    427     now: &Timestamp,
    428 ) -> sqlx::Result<BounceResult> {
    429     sqlx::query(
    430         "
    431             SELECT out_tx_row_id, out_tx_new, out_bounce_row_id, out_bounce_new
    432             FROM register_bounce_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9)
    433         ",
    434     )
    435     .bind(tx.code as i64)
    436     .bind(&tx.amount)
    437     .bind(&tx.subject)
    438     .bind(tx.debtor.iban())
    439     .bind(&tx.debtor.name)
    440     .bind_date(&tx.value_date)
    441     .bind(amount)
    442     .bind(reason)
    443     .bind_timestamp(now)
    444     .try_map(|r: PgRow| {
    445         Ok(BounceResult {
    446             tx_id: r.try_get_u64(0)?,
    447             tx_new: r.try_get(1)?,
    448             bounce_id: r.try_get_u64(2)?,
    449             bounce_new: r.try_get(3)?,
    450         })
    451     })
    452     .fetch_one(db)
    453     .await
    454 }
    455 
    456 pub async fn transfer_page<'a>(
    457     db: impl PgExecutor<'a>,
    458     status: &Option<TransferState>,
    459     params: &Page,
    460 ) -> sqlx::Result<Vec<TransferListStatus>> {
    461     page(
    462         db,
    463         "initiated_id",
    464         params,
    465         || {
    466             let mut builder = QueryBuilder::new(
    467                 "
    468                     SELECT
    469                         initiated_id,
    470                         status,
    471                         amount,
    472                         credit_account,
    473                         credit_name,
    474                         initiated_at
    475                     FROM transfer 
    476                     JOIN initiated USING (initiated_id)
    477                     WHERE
    478                 ",
    479             );
    480             if let Some(status) = status {
    481                 builder.push(" status = ").push_bind(status).push(" AND ");
    482             }
    483             builder
    484         },
    485         |r: PgRow| {
    486             Ok(TransferListStatus {
    487                 row_id: r.try_get_safeu64(0)?,
    488                 status: r.try_get(1)?,
    489                 amount: r.try_get_amount(2, &CURRENCY)?,
    490                 credit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?),
    491                 timestamp: r.try_get_timestamp(5)?.into(),
    492             })
    493         },
    494     )
    495     .await
    496 }
    497 
    498 pub async fn outgoing_history(
    499     db: &PgPool,
    500     params: &History,
    501     listen: impl FnOnce() -> Receiver<i64>,
    502 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> {
    503     history(
    504         db,
    505         "tx_out_id",
    506         params,
    507         listen,
    508         || {
    509             QueryBuilder::new(
    510                 "
    511                 SELECT
    512                     tx_out_id,
    513                     amount,
    514                     credit_account,
    515                     credit_name,
    516                     valued_at,
    517                     exchange_base_url,
    518                     wtid
    519                 FROM taler_out
    520                 JOIN tx_out USING (tx_out_id)
    521                 WHERE
    522             ",
    523             )
    524         },
    525         |r: PgRow| {
    526             Ok(OutgoingBankTransaction {
    527                 row_id: r.try_get_safeu64(0)?,
    528                 amount: r.try_get_amount(1, &CURRENCY)?,
    529                 debit_fee: None,
    530                 credit_account: r.try_get_iban(2)?.as_full_payto(r.try_get(3)?),
    531                 date: r.try_get_timestamp(4)?.into(),
    532                 exchange_base_url: r.try_get_url(5)?,
    533                 wtid: r.try_get(6)?,
    534             })
    535         },
    536     )
    537     .await
    538 }
    539 
    540 pub async fn incoming_history(
    541     db: &PgPool,
    542     params: &History,
    543     listen: impl FnOnce() -> Receiver<i64>,
    544 ) -> sqlx::Result<Vec<IncomingBankTransaction>> {
    545     history(
    546         db,
    547         "tx_in_id",
    548         params,
    549         listen,
    550         || {
    551             QueryBuilder::new(
    552                 "
    553                 SELECT
    554                     type,
    555                     tx_in_id,
    556                     amount,
    557                     debit_account,
    558                     debit_name,
    559                     valued_at,
    560                     metadata
    561                 FROM taler_in
    562                 JOIN tx_in USING (tx_in_id)
    563                 WHERE
    564             ",
    565             )
    566         },
    567         |r: PgRow| {
    568             Ok(match r.try_get(0)? {
    569                 IncomingType::reserve => IncomingBankTransaction::Reserve {
    570                     row_id: r.try_get_safeu64(1)?,
    571                     amount: r.try_get_amount(2, &CURRENCY)?,
    572                     credit_fee: None,
    573                     debit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?),
    574                     date: r.try_get_timestamp(5)?.into(),
    575                     reserve_pub: r.try_get(6)?,
    576                     authorization_pub: None,
    577                     authorization_sig: None,
    578                 },
    579                 IncomingType::kyc => IncomingBankTransaction::Kyc {
    580                     row_id: r.try_get_safeu64(1)?,
    581                     amount: r.try_get_amount(2, &CURRENCY)?,
    582                     credit_fee: None,
    583                     debit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?),
    584                     date: r.try_get_timestamp(5)?.into(),
    585                     account_pub: r.try_get(6)?,
    586                     authorization_pub: None,
    587                     authorization_sig: None,
    588                 },
    589                 IncomingType::wad => {
    590                     unimplemented!("WAD is not yet supported")
    591                 }
    592             })
    593         },
    594     )
    595     .await
    596 }
    597 
    598 pub async fn revenue_history(
    599     db: &PgPool,
    600     params: &History,
    601     listen: impl FnOnce() -> Receiver<i64>,
    602 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> {
    603     history(
    604         db,
    605         "tx_in_id",
    606         params,
    607         listen,
    608         || {
    609             QueryBuilder::new(
    610                 "
    611                 SELECT
    612                     tx_in_id,
    613                     valued_at,
    614                     amount,
    615                     debit_account,
    616                     debit_name,
    617                     subject
    618                 FROM tx_in
    619                 WHERE
    620             ",
    621             )
    622         },
    623         |r: PgRow| {
    624             Ok(RevenueIncomingBankTransaction {
    625                 row_id: r.try_get_safeu64(0)?,
    626                 date: r.try_get_timestamp(1)?.into(),
    627                 amount: r.try_get_amount(2, &CURRENCY)?,
    628                 credit_fee: None,
    629                 debit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?),
    630                 subject: r.try_get(5)?,
    631             })
    632         },
    633     )
    634     .await
    635 }
    636 
    637 pub async fn transfer_by_id<'a>(
    638     db: impl PgExecutor<'a>,
    639     id: u64,
    640 ) -> sqlx::Result<Option<TransferStatus>> {
    641     sqlx::query(
    642         "
    643             SELECT
    644                 status,
    645                 status_msg,
    646                 amount,
    647                 exchange_base_url,
    648                 wtid,
    649                 credit_account,
    650                 credit_name,
    651                 initiated_at
    652             FROM transfer 
    653             JOIN initiated USING (initiated_id) 
    654             WHERE initiated_id = $1
    655         ",
    656     )
    657     .bind(id as i64)
    658     .try_map(|r: PgRow| {
    659         Ok(TransferStatus {
    660             status: r.try_get(0)?,
    661             status_msg: r.try_get(1)?,
    662             amount: r.try_get_amount(2, &CURRENCY)?,
    663             origin_exchange_url: r.try_get(3)?,
    664             wtid: r.try_get(4)?,
    665             credit_account: r.try_get_iban(5)?.as_full_payto(r.try_get(6)?),
    666             timestamp: r.try_get_timestamp(7)?.into(),
    667         })
    668     })
    669     .fetch_optional(db)
    670     .await
    671 }
    672 
    673 /** Get a batch of pending initiated transactions not attempted since [start] */
    674 pub async fn pending_batch<'a>(
    675     db: impl PgExecutor<'a>,
    676     start: &Timestamp,
    677 ) -> sqlx::Result<Vec<Initiated>> {
    678     sqlx::query(
    679         "
    680             SELECT initiated_id, amount, subject, credit_account, credit_name
    681             FROM initiated 
    682             WHERE magnet_code IS NULL
    683                 AND status='pending'
    684                 AND (last_submitted IS NULL OR last_submitted < $1)
    685             LIMIT 100
    686         ",
    687     )
    688     .bind_timestamp(start)
    689     .try_map(|r: PgRow| {
    690         Ok(Initiated {
    691             id: r.try_get_u64(0)?,
    692             amount: r.try_get_amount(1, &CURRENCY)?,
    693             subject: r.try_get(2)?,
    694             creditor: FullHuPayto::new(r.try_get_parse(3)?, r.try_get(4)?),
    695         })
    696     })
    697     .fetch_all(db)
    698     .await
    699 }
    700 
    701 /** Get an initiated transaction matching the given magnet [code] */
    702 pub async fn initiated_by_code<'a>(
    703     db: impl PgExecutor<'a>,
    704     code: u64,
    705 ) -> sqlx::Result<Option<Initiated>> {
    706     sqlx::query(
    707         "
    708             SELECT initiated_id, amount, subject, credit_account, credit_name
    709             FROM initiated 
    710             WHERE magnet_code IS $1
    711         ",
    712     )
    713     .bind(code as i64)
    714     .try_map(|r: PgRow| {
    715         Ok(Initiated {
    716             id: r.try_get_u64(0)?,
    717             amount: r.try_get_amount(1, &CURRENCY)?,
    718             subject: r.try_get(2)?,
    719             creditor: FullHuPayto::new(r.try_get_parse(3)?, r.try_get(4)?),
    720         })
    721     })
    722     .fetch_optional(db)
    723     .await
    724 }
    725 
    726 /** Update status of a successful submitted initiated transaction */
    727 pub async fn initiated_submit_success<'a>(
    728     db: impl PgExecutor<'a>,
    729     id: u64,
    730     timestamp: &Timestamp,
    731     magnet_code: u64,
    732 ) -> sqlx::Result<()> {
    733     sqlx::query(
    734         "
    735             UPDATE initiated
    736             SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, magnet_code=$2
    737             WHERE initiated_id=$3
    738         "
    739     ).bind_timestamp(timestamp)
    740     .bind(magnet_code as i64)
    741     .bind(id as i64)
    742     .execute(db).await?;
    743     Ok(())
    744 }
    745 
    746 /** Update status of a permanently failed initiated transaction */
    747 pub async fn initiated_submit_permanent_failure<'a>(
    748     db: impl PgExecutor<'a>,
    749     id: u64,
    750     timestamp: &Timestamp,
    751     msg: &str,
    752 ) -> sqlx::Result<()> {
    753     sqlx::query(
    754         "
    755             UPDATE initiated
    756             SET status='permanent_failure', status_msg=$2
    757             WHERE initiated_id=$3
    758         ",
    759     )
    760     .bind_timestamp(timestamp)
    761     .bind(msg)
    762     .bind(id as i64)
    763     .execute(db)
    764     .await?;
    765     Ok(())
    766 }
    767 
    768 /** Check if an initiated transaction exist for a magnet code */
    769 pub async fn initiated_exists_for_code<'a>(
    770     db: impl PgExecutor<'a>,
    771     code: u64,
    772 ) -> sqlx::Result<Option<u64>> {
    773     sqlx::query("SELECT initiated_id FROM initiated WHERE magnet_code=$1")
    774         .bind(code as i64)
    775         .try_map(|r| Ok(r.try_get::<i64, _>(0)? as u64))
    776         .fetch_optional(db)
    777         .await
    778 }
    779 
    780 /** Get JSON value from KV table */
    781 pub async fn kv_get<'a, T: DeserializeOwned + Unpin + Send>(
    782     db: impl PgExecutor<'a>,
    783     key: &str,
    784 ) -> sqlx::Result<Option<T>> {
    785     sqlx::query("SELECT value FROM kv WHERE key=$1")
    786         .bind(key)
    787         .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0))
    788         .fetch_optional(db)
    789         .await
    790 }
    791 
    792 /** Set JSON value in KV table */
    793 pub async fn kv_set<'a, T: Serialize>(
    794     db: impl PgExecutor<'a>,
    795     key: &str,
    796     value: &T,
    797 ) -> sqlx::Result<()> {
    798     sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value")
    799         .bind(key)
    800         .bind(sqlx::types::Json(value))
    801         .execute(db)
    802         .await?;
    803     Ok(())
    804 }
    805 
    806 #[cfg(test)]
    807 mod test {
    808     use jiff::{Span, Timestamp, Zoned};
    809     use serde_json::json;
    810     use sqlx::{PgConnection, PgPool, postgres::PgRow};
    811     use taler_api::{
    812         db::TypeHelper,
    813         notification::dummy_listen,
    814         subject::{IncomingSubject, OutgoingSubject},
    815     };
    816     use taler_common::{
    817         api_common::{EddsaPublicKey, HashCode, ShortHashCode},
    818         api_params::{History, Page},
    819         types::{
    820             amount::{amount, decimal},
    821             url,
    822             utils::now_sql_stable_timestamp,
    823         },
    824     };
    825 
    826     use crate::{
    827         constants::CONFIG_SOURCE,
    828         db::{
    829             self, AddIncomingResult, AddOutgoingResult, BounceResult, Initiated, OutFailureResult,
    830             TransferResult, TxIn, TxOut, TxOutKind, kv_get, kv_set, make_transfer,
    831             register_bounce_tx_in, register_tx_in, register_tx_in_admin, register_tx_out,
    832         },
    833         magnet_api::types::TxStatus,
    834         magnet_payto,
    835     };
    836 
    837     use super::TxInAdmin;
    838 
    839     async fn setup() -> (PgConnection, PgPool) {
    840         let pool = taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await;
    841         let conn = pool.acquire().await.unwrap().leak();
    842         (conn, pool)
    843     }
    844 
    845     #[tokio::test]
    846     async fn kv() {
    847         let (mut db, _) = setup().await;
    848 
    849         let value = json!({
    850             "name": "Mr Smith",
    851             "no way": 32
    852         });
    853 
    854         assert_eq!(
    855             kv_get::<serde_json::Value>(&mut db, "value").await.unwrap(),
    856             None
    857         );
    858         kv_set(&mut db, "value", &value).await.unwrap();
    859         kv_set(&mut db, "value", &value).await.unwrap();
    860         assert_eq!(
    861             kv_get::<serde_json::Value>(&mut db, "value").await.unwrap(),
    862             Some(value)
    863         );
    864     }
    865 
    866     #[tokio::test]
    867     async fn tx_in() {
    868         let (mut db, pool) = setup().await;
    869 
    870         async fn routine(
    871             db: &mut PgConnection,
    872             first: &Option<IncomingSubject>,
    873             second: &Option<IncomingSubject>,
    874         ) {
    875             let (id, code) =
    876                 sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_in")
    877                     .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?)))
    878                     .fetch_one(&mut *db)
    879                     .await
    880                     .unwrap();
    881             let now = now_sql_stable_timestamp();
    882             let date = Zoned::now().date();
    883             let later = date.tomorrow().unwrap();
    884             let tx = TxIn {
    885                 code: code,
    886                 amount: amount("EUR:10"),
    887                 subject: "subject".to_owned(),
    888                 debtor: magnet_payto(
    889                     "payto://iban/HU30162000031000163100000000?receiver-name=name",
    890                 ),
    891                 value_date: date,
    892                 status: TxStatus::Completed,
    893             };
    894             // Insert
    895             assert_eq!(
    896                 register_tx_in(db, &tx, &first, &now)
    897                     .await
    898                     .expect("register tx in"),
    899                 AddIncomingResult::Success {
    900                     new: true,
    901                     row_id: id,
    902                     valued_at: date
    903                 }
    904             );
    905             // Idempotent
    906             assert_eq!(
    907                 register_tx_in(
    908                     db,
    909                     &TxIn {
    910                         value_date: later,
    911                         ..tx.clone()
    912                     },
    913                     &first,
    914                     &now
    915                 )
    916                 .await
    917                 .expect("register tx in"),
    918                 AddIncomingResult::Success {
    919                     new: false,
    920                     row_id: id,
    921                     valued_at: date
    922                 }
    923             );
    924             // Many
    925             assert_eq!(
    926                 register_tx_in(
    927                     db,
    928                     &TxIn {
    929                         code: code + 1,
    930                         value_date: later,
    931                         ..tx
    932                     },
    933                     &second,
    934                     &now
    935                 )
    936                 .await
    937                 .expect("register tx in"),
    938                 AddIncomingResult::Success {
    939                     new: true,
    940                     row_id: id + 1,
    941                     valued_at: later
    942                 }
    943             );
    944         }
    945 
    946         // Empty db
    947         assert_eq!(
    948             db::revenue_history(&pool, &History::default(), dummy_listen)
    949                 .await
    950                 .unwrap(),
    951             Vec::new()
    952         );
    953         assert_eq!(
    954             db::incoming_history(&pool, &History::default(), dummy_listen)
    955                 .await
    956                 .unwrap(),
    957             Vec::new()
    958         );
    959 
    960         // Regular transaction
    961         routine(&mut db, &None, &None).await;
    962 
    963         // Reserve transaction
    964         routine(
    965             &mut db,
    966             &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())),
    967             &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())),
    968         )
    969         .await;
    970 
    971         // Kyc transaction
    972         routine(
    973             &mut db,
    974             &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())),
    975             &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())),
    976         )
    977         .await;
    978 
    979         // History
    980         assert_eq!(
    981             db::revenue_history(&pool, &History::default(), dummy_listen)
    982                 .await
    983                 .unwrap()
    984                 .len(),
    985             6
    986         );
    987         assert_eq!(
    988             db::incoming_history(&pool, &History::default(), dummy_listen)
    989                 .await
    990                 .unwrap()
    991                 .len(),
    992             4
    993         );
    994     }
    995 
    996     #[tokio::test]
    997     async fn tx_in_admin() {
    998         let (_, pool) = setup().await;
    999 
   1000         // Empty db
   1001         assert_eq!(
   1002             db::incoming_history(&pool, &History::default(), dummy_listen)
   1003                 .await
   1004                 .unwrap(),
   1005             Vec::new()
   1006         );
   1007 
   1008         let now = now_sql_stable_timestamp();
   1009         let later = now + Span::new().hours(2);
   1010         let date = Zoned::now().date();
   1011         let tx = TxInAdmin {
   1012             amount: amount("EUR:10"),
   1013             subject: "subject".to_owned(),
   1014             debtor: magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"),
   1015             metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
   1016         };
   1017         // Insert
   1018         assert_eq!(
   1019             register_tx_in_admin(&pool, &tx, &now)
   1020                 .await
   1021                 .expect("register tx in"),
   1022             AddIncomingResult::Success {
   1023                 new: true,
   1024                 row_id: 1,
   1025                 valued_at: date
   1026             }
   1027         );
   1028         // Idempotent
   1029         assert_eq!(
   1030             register_tx_in_admin(&pool, &tx, &later)
   1031                 .await
   1032                 .expect("register tx in"),
   1033             AddIncomingResult::Success {
   1034                 new: false,
   1035                 row_id: 1,
   1036                 valued_at: date
   1037             }
   1038         );
   1039         // Many
   1040         assert_eq!(
   1041             register_tx_in_admin(
   1042                 &pool,
   1043                 &TxInAdmin {
   1044                     subject: "Other".to_owned(),
   1045                     metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
   1046                     ..tx.clone()
   1047                 },
   1048                 &later
   1049             )
   1050             .await
   1051             .expect("register tx in"),
   1052             AddIncomingResult::Success {
   1053                 new: true,
   1054                 row_id: 2,
   1055                 valued_at: date
   1056             }
   1057         );
   1058 
   1059         // History
   1060         assert_eq!(
   1061             db::incoming_history(&pool, &History::default(), dummy_listen)
   1062                 .await
   1063                 .unwrap()
   1064                 .len(),
   1065             2
   1066         );
   1067     }
   1068 
   1069     #[tokio::test]
   1070     async fn tx_out() {
   1071         let (mut db, pool) = setup().await;
   1072 
   1073         async fn routine(db: &mut PgConnection, first: &TxOutKind, second: &TxOutKind) {
   1074             let (id, code) =
   1075                 sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_out")
   1076                     .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?)))
   1077                     .fetch_one(&mut *db)
   1078                     .await
   1079                     .unwrap();
   1080             let now = now_sql_stable_timestamp();
   1081             let date = Zoned::now().date();
   1082             let later = date.tomorrow().unwrap();
   1083             let tx = TxOut {
   1084                 code,
   1085                 amount: amount("HUF:10"),
   1086                 subject: "subject".to_owned(),
   1087                 creditor: magnet_payto(
   1088                     "payto://iban/HU30162000031000163100000000?receiver-name=name",
   1089                 ),
   1090                 value_date: date,
   1091                 status: TxStatus::Completed,
   1092             };
   1093             assert!(matches!(
   1094                 make_transfer(
   1095                     &mut *db,
   1096                     &db::Transfer {
   1097                         request_uid: HashCode::rand(),
   1098                         amount: decimal("10"),
   1099                         exchange_base_url: url("https://exchange.test.com/"),
   1100                         wtid: ShortHashCode::rand(),
   1101                         creditor: tx.creditor.clone()
   1102                     },
   1103                     &now
   1104                 )
   1105                 .await
   1106                 .unwrap(),
   1107                 TransferResult::Success { .. }
   1108             ));
   1109             db::initiated_submit_success(&mut *db, 1, &Timestamp::now(), tx.code)
   1110                 .await
   1111                 .expect("status success");
   1112 
   1113             // Insert
   1114             assert_eq!(
   1115                 register_tx_out(&mut *db, &tx, first, &now)
   1116                     .await
   1117                     .expect("register tx out"),
   1118                 AddOutgoingResult {
   1119                     result: db::RegisterResult::known,
   1120                     row_id: id,
   1121                 }
   1122             );
   1123             // Idempotent
   1124             assert_eq!(
   1125                 register_tx_out(
   1126                     &mut *db,
   1127                     &TxOut {
   1128                         value_date: later,
   1129                         ..tx.clone()
   1130                     },
   1131                     first,
   1132                     &now
   1133                 )
   1134                 .await
   1135                 .expect("register tx out"),
   1136                 AddOutgoingResult {
   1137                     result: db::RegisterResult::idempotent,
   1138                     row_id: id,
   1139                 }
   1140             );
   1141             // Recovered
   1142             assert_eq!(
   1143                 register_tx_out(
   1144                     &mut *db,
   1145                     &TxOut {
   1146                         code: code + 1,
   1147                         value_date: later,
   1148                         ..tx.clone()
   1149                     },
   1150                     second,
   1151                     &now
   1152                 )
   1153                 .await
   1154                 .expect("register tx out"),
   1155                 AddOutgoingResult {
   1156                     result: db::RegisterResult::recovered,
   1157                     row_id: id + 1,
   1158                 }
   1159             );
   1160         }
   1161 
   1162         // Empty db
   1163         assert_eq!(
   1164             db::outgoing_history(&pool, &History::default(), dummy_listen)
   1165                 .await
   1166                 .unwrap(),
   1167             Vec::new()
   1168         );
   1169 
   1170         // Regular transaction
   1171         routine(&mut db, &TxOutKind::Simple, &TxOutKind::Simple).await;
   1172 
   1173         // Talerable transaction
   1174         routine(
   1175             &mut db,
   1176             &TxOutKind::Talerable(OutgoingSubject::rand()),
   1177             &TxOutKind::Talerable(OutgoingSubject::rand()),
   1178         )
   1179         .await;
   1180 
   1181         // Bounced transaction
   1182         routine(&mut db, &TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await;
   1183 
   1184         // History
   1185         assert_eq!(
   1186             db::outgoing_history(&pool, &History::default(), dummy_listen)
   1187                 .await
   1188                 .unwrap()
   1189                 .len(),
   1190             2
   1191         );
   1192     }
   1193 
   1194     #[tokio::test]
   1195     async fn tx_out_failure() {
   1196         let (mut db, _) = setup().await;
   1197 
   1198         let now = now_sql_stable_timestamp();
   1199 
   1200         // Unknown
   1201         assert_eq!(
   1202             db::register_tx_out_failure(&mut db, 42, None, &now)
   1203                 .await
   1204                 .unwrap(),
   1205             OutFailureResult {
   1206                 initiated_id: None,
   1207                 new: false
   1208             }
   1209         );
   1210         assert_eq!(
   1211             db::register_tx_out_failure(&mut db, 42, Some(12), &now)
   1212                 .await
   1213                 .unwrap(),
   1214             OutFailureResult {
   1215                 initiated_id: None,
   1216                 new: false
   1217             }
   1218         );
   1219 
   1220         // Initiated
   1221         let req = db::Transfer {
   1222             request_uid: HashCode::rand(),
   1223             amount: decimal("10"),
   1224             exchange_base_url: url("https://exchange.test.com/"),
   1225             wtid: ShortHashCode::rand(),
   1226             creditor: magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"),
   1227         };
   1228         let payto = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name");
   1229         assert_eq!(
   1230             make_transfer(&mut db, &req, &now).await.unwrap(),
   1231             TransferResult::Success {
   1232                 id: 1,
   1233                 initiated_at: now
   1234             }
   1235         );
   1236         db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 34)
   1237             .await
   1238             .expect("status success");
   1239         assert_eq!(
   1240             db::register_tx_out_failure(&mut db, 34, None, &now)
   1241                 .await
   1242                 .unwrap(),
   1243             OutFailureResult {
   1244                 initiated_id: Some(1),
   1245                 new: true
   1246             }
   1247         );
   1248         assert_eq!(
   1249             db::register_tx_out_failure(&mut db, 34, None, &now)
   1250                 .await
   1251                 .unwrap(),
   1252             OutFailureResult {
   1253                 initiated_id: Some(1),
   1254                 new: false
   1255             }
   1256         );
   1257 
   1258         // Recovered bounce
   1259         let tx = TxIn {
   1260             code: 12,
   1261             amount: amount("HUF:11"),
   1262             subject: "malformed transaction".to_owned(),
   1263             debtor: payto,
   1264             value_date: Zoned::now().date(),
   1265             status: TxStatus::Completed,
   1266         };
   1267         assert_eq!(
   1268             db::register_bounce_tx_in(&mut db, &tx, &tx.amount, "no reason", &now)
   1269                 .await
   1270                 .unwrap(),
   1271             BounceResult {
   1272                 tx_id: 1,
   1273                 tx_new: true,
   1274                 bounce_id: 2,
   1275                 bounce_new: true
   1276             }
   1277         );
   1278         assert_eq!(
   1279             db::register_tx_out_failure(&mut db, 10, Some(12), &now)
   1280                 .await
   1281                 .unwrap(),
   1282             OutFailureResult {
   1283                 initiated_id: Some(2),
   1284                 new: true
   1285             }
   1286         );
   1287         assert_eq!(
   1288             db::register_tx_out_failure(&mut db, 10, Some(12), &now)
   1289                 .await
   1290                 .unwrap(),
   1291             OutFailureResult {
   1292                 initiated_id: Some(2),
   1293                 new: false
   1294             }
   1295         );
   1296     }
   1297 
   1298     #[tokio::test]
   1299     async fn transfer() {
   1300         let (mut db, _) = setup().await;
   1301 
   1302         // Empty db
   1303         assert_eq!(db::transfer_by_id(&mut db, 0).await.unwrap(), None);
   1304         assert_eq!(
   1305             db::transfer_page(&mut db, &None, &Page::default())
   1306                 .await
   1307                 .unwrap(),
   1308             Vec::new()
   1309         );
   1310 
   1311         let req = db::Transfer {
   1312             request_uid: HashCode::rand(),
   1313             amount: decimal("10"),
   1314             exchange_base_url: url("https://exchange.test.com/"),
   1315             wtid: ShortHashCode::rand(),
   1316             creditor: magnet_payto("payto://iban/HU02162000031000164800000000?receiver-name=name"),
   1317         };
   1318         let now = now_sql_stable_timestamp();
   1319         let later = now + Span::new().hours(2);
   1320         // Insert
   1321         assert_eq!(
   1322             make_transfer(&mut db, &req, &now).await.expect("transfer"),
   1323             TransferResult::Success {
   1324                 id: 1,
   1325                 initiated_at: now.into()
   1326             }
   1327         );
   1328         // Idempotent
   1329         assert_eq!(
   1330             make_transfer(&mut db, &req, &later)
   1331                 .await
   1332                 .expect("transfer"),
   1333             TransferResult::Success {
   1334                 id: 1,
   1335                 initiated_at: now.into()
   1336             }
   1337         );
   1338         // Request UID reuse
   1339         assert_eq!(
   1340             make_transfer(
   1341                 &mut db,
   1342                 &db::Transfer {
   1343                     wtid: ShortHashCode::rand(),
   1344                     ..req.clone()
   1345                 },
   1346                 &now
   1347             )
   1348             .await
   1349             .expect("transfer"),
   1350             TransferResult::RequestUidReuse
   1351         );
   1352         // wtid reuse
   1353         assert_eq!(
   1354             make_transfer(
   1355                 &mut db,
   1356                 &db::Transfer {
   1357                     request_uid: HashCode::rand(),
   1358                     ..req.clone()
   1359                 },
   1360                 &now
   1361             )
   1362             .await
   1363             .expect("transfer"),
   1364             TransferResult::WtidReuse
   1365         );
   1366         // Many
   1367         assert_eq!(
   1368             make_transfer(
   1369                 &mut db,
   1370                 &db::Transfer {
   1371                     request_uid: HashCode::rand(),
   1372                     wtid: ShortHashCode::rand(),
   1373                     ..req
   1374                 },
   1375                 &later
   1376             )
   1377             .await
   1378             .expect("transfer"),
   1379             TransferResult::Success {
   1380                 id: 2,
   1381                 initiated_at: later.into()
   1382             }
   1383         );
   1384 
   1385         // Get
   1386         assert!(db::transfer_by_id(&mut db, 1).await.unwrap().is_some());
   1387         assert!(db::transfer_by_id(&mut db, 2).await.unwrap().is_some());
   1388         assert!(db::transfer_by_id(&mut db, 3).await.unwrap().is_none());
   1389         assert_eq!(
   1390             db::transfer_page(&mut db, &None, &Page::default())
   1391                 .await
   1392                 .unwrap()
   1393                 .len(),
   1394             2
   1395         );
   1396     }
   1397 
   1398     #[tokio::test]
   1399     async fn bounce() {
   1400         let (mut db, _) = setup().await;
   1401 
   1402         let amount = amount("HUF:10");
   1403         let payto = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name");
   1404         let now = now_sql_stable_timestamp();
   1405         let date = Zoned::now().date();
   1406 
   1407         // Empty db
   1408         assert!(db::pending_batch(&mut db, &now).await.unwrap().is_empty());
   1409 
   1410         // Insert
   1411         assert_eq!(
   1412             register_tx_in(
   1413                 &mut db,
   1414                 &TxIn {
   1415                     code: 13,
   1416                     amount: amount.clone(),
   1417                     subject: "subject".to_owned(),
   1418                     debtor: payto.clone(),
   1419                     value_date: date,
   1420                     status: TxStatus::Completed
   1421                 },
   1422                 &None,
   1423                 &now
   1424             )
   1425             .await
   1426             .expect("register tx in"),
   1427             AddIncomingResult::Success {
   1428                 new: true,
   1429                 row_id: 1,
   1430                 valued_at: date
   1431             }
   1432         );
   1433 
   1434         // Bounce
   1435         assert_eq!(
   1436             register_bounce_tx_in(
   1437                 &mut db,
   1438                 &TxIn {
   1439                     code: 12,
   1440                     amount: amount.clone(),
   1441                     subject: "subject".to_owned(),
   1442                     debtor: payto.clone(),
   1443                     value_date: date,
   1444                     status: TxStatus::Completed
   1445                 },
   1446                 &amount,
   1447                 "good reason",
   1448                 &now
   1449             )
   1450             .await
   1451             .expect("bounce"),
   1452             BounceResult {
   1453                 tx_id: 2,
   1454                 tx_new: true,
   1455                 bounce_id: 1,
   1456                 bounce_new: true
   1457             }
   1458         );
   1459         // Idempotent
   1460         assert_eq!(
   1461             register_bounce_tx_in(
   1462                 &mut db,
   1463                 &TxIn {
   1464                     code: 12,
   1465                     amount: amount.clone(),
   1466                     subject: "subject".to_owned(),
   1467                     debtor: payto.clone(),
   1468                     value_date: date,
   1469                     status: TxStatus::Completed
   1470                 },
   1471                 &amount,
   1472                 "good reason",
   1473                 &now
   1474             )
   1475             .await
   1476             .expect("bounce"),
   1477             BounceResult {
   1478                 tx_id: 2,
   1479                 tx_new: false,
   1480                 bounce_id: 1,
   1481                 bounce_new: false
   1482             }
   1483         );
   1484 
   1485         // Bounce registered
   1486         assert_eq!(
   1487             register_bounce_tx_in(
   1488                 &mut db,
   1489                 &TxIn {
   1490                     code: 13,
   1491                     amount: amount.clone(),
   1492                     subject: "subject".to_owned(),
   1493                     debtor: payto.clone(),
   1494                     value_date: date,
   1495                     status: TxStatus::Completed
   1496                 },
   1497                 &amount,
   1498                 "good reason",
   1499                 &now
   1500             )
   1501             .await
   1502             .expect("bounce"),
   1503             BounceResult {
   1504                 tx_id: 1,
   1505                 tx_new: false,
   1506                 bounce_id: 2,
   1507                 bounce_new: true
   1508             }
   1509         );
   1510         // Idempotent registered
   1511         assert_eq!(
   1512             register_bounce_tx_in(
   1513                 &mut db,
   1514                 &TxIn {
   1515                     code: 13,
   1516                     amount: amount.clone(),
   1517                     subject: "subject".to_owned(),
   1518                     debtor: payto.clone(),
   1519                     value_date: date,
   1520                     status: TxStatus::Completed
   1521                 },
   1522                 &amount,
   1523                 "good reason",
   1524                 &now
   1525             )
   1526             .await
   1527             .expect("bounce"),
   1528             BounceResult {
   1529                 tx_id: 1,
   1530                 tx_new: false,
   1531                 bounce_id: 2,
   1532                 bounce_new: false
   1533             }
   1534         );
   1535 
   1536         // Batch
   1537         assert_eq!(
   1538             db::pending_batch(&mut db, &now).await.unwrap(),
   1539             &[
   1540                 Initiated {
   1541                     id: 1,
   1542                     amount: amount.clone(),
   1543                     subject: "bounce: 12".to_owned(),
   1544                     creditor: payto.clone()
   1545                 },
   1546                 Initiated {
   1547                     id: 2,
   1548                     amount,
   1549                     subject: "bounce: 13".to_owned(),
   1550                     creditor: payto
   1551                 }
   1552             ]
   1553         );
   1554     }
   1555 
   1556     #[tokio::test]
   1557     async fn status() {
   1558         let (mut db, _) = setup().await;
   1559 
   1560         // Unknown transfer
   1561         db::initiated_submit_permanent_failure(&mut db, 1, &Timestamp::now(), "msg")
   1562             .await
   1563             .unwrap();
   1564         db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12)
   1565             .await
   1566             .unwrap();
   1567     }
   1568 
   1569     #[tokio::test]
   1570     async fn batch() {
   1571         let (mut db, _) = setup().await;
   1572         let start = Timestamp::now();
   1573         let magnet_payto =
   1574             magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name");
   1575 
   1576         // Empty db
   1577         let pendings = db::pending_batch(&mut db, &start)
   1578             .await
   1579             .expect("pending_batch");
   1580         assert_eq!(pendings.len(), 0);
   1581 
   1582         // Some transfers
   1583         for i in 0..3 {
   1584             make_transfer(
   1585                 &mut db,
   1586                 &db::Transfer {
   1587                     request_uid: HashCode::rand(),
   1588                     amount: decimal(format!("{}", i + 1)),
   1589                     exchange_base_url: url("https://exchange.test.com/"),
   1590                     wtid: ShortHashCode::rand(),
   1591                     creditor: magnet_payto.clone(),
   1592                 },
   1593                 &Timestamp::now(),
   1594             )
   1595             .await
   1596             .expect("transfer");
   1597         }
   1598         let pendings = db::pending_batch(&mut db, &start)
   1599             .await
   1600             .expect("pending_batch");
   1601         assert_eq!(pendings.len(), 3);
   1602 
   1603         // Max 100 txs in batch
   1604         for i in 0..100 {
   1605             make_transfer(
   1606                 &mut db,
   1607                 &db::Transfer {
   1608                     request_uid: HashCode::rand(),
   1609                     amount: decimal(format!("{}", i + 1)),
   1610                     exchange_base_url: url("https://exchange.test.com/"),
   1611                     wtid: ShortHashCode::rand(),
   1612                     creditor: magnet_payto.clone(),
   1613                 },
   1614                 &Timestamp::now(),
   1615             )
   1616             .await
   1617             .expect("transfer");
   1618         }
   1619         let pendings = db::pending_batch(&mut db, &start)
   1620             .await
   1621             .expect("pending_batch");
   1622         assert_eq!(pendings.len(), 100);
   1623 
   1624         // Skip uploaded
   1625         for i in 0..=10 {
   1626             db::initiated_submit_success(&mut db, i, &Timestamp::now(), i)
   1627                 .await
   1628                 .expect("status success");
   1629         }
   1630         let pendings = db::pending_batch(&mut db, &start)
   1631             .await
   1632             .expect("pending_batch");
   1633         assert_eq!(pendings.len(), 93);
   1634 
   1635         // Skip failed
   1636         for i in 0..=10 {
   1637             db::initiated_submit_permanent_failure(&mut db, 10 + i, &Timestamp::now(), "failure")
   1638                 .await
   1639                 .expect("status failure");
   1640         }
   1641         let pendings = db::pending_batch(&mut db, &start)
   1642             .await
   1643             .expect("pending_batch");
   1644         assert_eq!(pendings.len(), 83);
   1645     }
   1646 }