taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

db.rs (50372B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::fmt::Display;
     18 
     19 use compact_str::CompactString;
     20 use jiff::{Timestamp, civil::Date, tz::TimeZone};
     21 use serde::{Serialize, de::DeserializeOwned};
     22 use sqlx::{PgConnection, PgPool, QueryBuilder, Row, postgres::PgRow};
     23 use taler_api::{
     24     db::{BindHelper, TypeHelper, history, page},
     25     serialized,
     26     subject::{IncomingSubject, OutgoingSubject, fmt_out_subject},
     27 };
     28 use taler_common::{
     29     api_common::{HashCode, ShortHashCode},
     30     api_params::{History, Page},
     31     api_revenue::RevenueIncomingBankTransaction,
     32     api_transfer::{RegistrationRequest, Unregistration},
     33     api_wire::{
     34         IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferState,
     35         TransferStatus,
     36     },
     37     config::Config,
     38     db::IncomingType,
     39     types::{
     40         amount::{Amount, Decimal},
     41         payto::PaytoImpl as _,
     42     },
     43 };
     44 use tokio::sync::watch::{Receiver, Sender};
     45 use url::Url;
     46 
     47 use crate::{FullHuPayto, config::parse_db_cfg, constants::CURRENCY, magnet_api::types::TxStatus};
     48 
     49 const SCHEMA: &str = "magnet_bank";
     50 
     51 pub async fn pool(cfg: &Config) -> anyhow::Result<PgPool> {
     52     let db = parse_db_cfg(cfg)?;
     53     let pool = taler_common::db::pool(db.cfg, SCHEMA).await?;
     54     Ok(pool)
     55 }
     56 
     57 pub async fn dbinit(cfg: &Config, reset: bool) -> anyhow::Result<PgPool> {
     58     let db_cfg = parse_db_cfg(cfg)?;
     59     let pool = taler_common::db::pool(db_cfg.cfg, SCHEMA).await?;
     60     let mut db = pool.acquire().await?;
     61     taler_common::db::dbinit(&mut db, db_cfg.sql_dir.as_ref(), "magnet-bank", reset).await?;
     62     Ok(pool)
     63 }
     64 
     65 pub async fn notification_listener(
     66     pool: PgPool,
     67     in_channel: Sender<i64>,
     68     taler_in_channel: Sender<i64>,
     69     out_channel: Sender<i64>,
     70     taler_out_channel: Sender<i64>,
     71 ) -> sqlx::Result<()> {
     72     taler_api::notification::notification_listener!(&pool,
     73         "tx_in" => (row_id: i64) {
     74             in_channel.send_replace(row_id);
     75         },
     76         "taler_in" => (row_id: i64) {
     77             taler_in_channel.send_replace(row_id);
     78         },
     79         "tx_out" => (row_id: i64) {
     80             out_channel.send_replace(row_id);
     81         },
     82         "taler_out" => (row_id: i64) {
     83             taler_out_channel.send_replace(row_id);
     84         }
     85     )
     86 }
     87 
     88 #[derive(Debug, Clone)]
     89 pub struct TxIn {
     90     pub code: u64,
     91     pub amount: Amount,
     92     pub subject: Box<str>,
     93     pub debtor: FullHuPayto,
     94     pub value_date: Date,
     95     pub status: TxStatus,
     96 }
     97 
     98 impl Display for TxIn {
     99     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    100         let Self {
    101             code,
    102             amount,
    103             subject,
    104             debtor,
    105             value_date,
    106             status,
    107         } = self;
    108         write!(
    109             f,
    110             "{value_date} {code} {amount} ({} {}) {status:?} '{subject}'",
    111             debtor.bban(),
    112             debtor.name
    113         )
    114     }
    115 }
    116 
    117 #[derive(Debug, Clone)]
    118 pub struct TxOut {
    119     pub code: u64,
    120     pub amount: Amount,
    121     pub subject: Box<str>,
    122     pub creditor: FullHuPayto,
    123     pub value_date: Date,
    124     pub status: TxStatus,
    125 }
    126 
    127 impl Display for TxOut {
    128     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    129         let Self {
    130             code,
    131             amount,
    132             subject,
    133             creditor,
    134             value_date,
    135             status,
    136         } = self;
    137         write!(
    138             f,
    139             "{value_date} {code} {amount} ({} {}) {status:?} '{subject}'",
    140             creditor.bban(),
    141             &creditor.name
    142         )
    143     }
    144 }
    145 
    146 #[derive(Debug, PartialEq, Eq)]
    147 pub struct Initiated {
    148     pub id: u64,
    149     pub amount: Amount,
    150     pub subject: Box<str>,
    151     pub creditor: FullHuPayto,
    152 }
    153 
    154 impl Display for Initiated {
    155     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    156         let Self {
    157             id,
    158             amount,
    159             subject,
    160             creditor,
    161         } = self;
    162         write!(
    163             f,
    164             "{id} {amount} ({} {}) '{subject}'",
    165             creditor.bban(),
    166             &creditor.name
    167         )
    168     }
    169 }
    170 
    171 #[derive(Debug, Clone)]
    172 pub struct TxInAdmin {
    173     pub amount: Amount,
    174     pub subject: String,
    175     pub debtor: FullHuPayto,
    176     pub metadata: IncomingSubject,
    177 }
    178 
    179 /// Lock the database for worker execution
    180 pub async fn worker_lock(e: &mut PgConnection) -> sqlx::Result<bool> {
    181     sqlx::query("SELECT pg_try_advisory_lock(42)")
    182         .try_map(|r: PgRow| r.try_get(0))
    183         .fetch_one(e)
    184         .await
    185 }
    186 
    187 #[derive(Debug, PartialEq, Eq)]
    188 pub enum AddIncomingResult {
    189     Success {
    190         new: bool,
    191         pending: bool,
    192         row_id: u64,
    193         valued_at: Date,
    194     },
    195     ReservePubReuse,
    196     UnknownMapping,
    197     MappingReuse,
    198 }
    199 
    200 pub async fn register_tx_in_admin(
    201     db: &PgPool,
    202     tx: &TxInAdmin,
    203     now: &Timestamp,
    204 ) -> sqlx::Result<AddIncomingResult> {
    205     serialized!(
    206         sqlx::query(
    207             "
    208                 SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending
    209                 FROM register_tx_in(NULL, $1, $2, $3, $4, $5, $6, $7, $5)
    210             ",
    211         )
    212         .bind(&tx.amount)
    213         .bind(&tx.subject)
    214         .bind(tx.debtor.iban())
    215         .bind(&tx.debtor.name)
    216         .bind_date(&now.to_zoned(TimeZone::UTC).date())
    217         .bind(tx.metadata.ty())
    218         .bind(tx.metadata.key())
    219         .try_map(|r: PgRow| {
    220             Ok(if r.try_get_flag(0)? {
    221                 AddIncomingResult::ReservePubReuse
    222             } else if r.try_get_flag(1)? {
    223                 AddIncomingResult::MappingReuse
    224             } else if r.try_get_flag(2)? {
    225                 AddIncomingResult::UnknownMapping
    226             } else {
    227                 AddIncomingResult::Success {
    228                     row_id: r.try_get_u64(3)?,
    229                     valued_at: r.try_get_date(4)?,
    230                     new: r.try_get(5)?,
    231                     pending: r.try_get(6)?
    232                 }
    233             })
    234         })
    235         .fetch_one(db)
    236     )
    237 }
    238 
    239 pub async fn register_tx_in(
    240     db: &mut PgConnection,
    241     tx: &TxIn,
    242     subject: &Option<IncomingSubject>,
    243     now: &Timestamp,
    244 ) -> sqlx::Result<AddIncomingResult> {
    245     serialized!(
    246         sqlx::query(
    247             "
    248                 SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending
    249                 FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9)
    250             ",
    251         )
    252         .bind(tx.code as i64)
    253         .bind(&tx.amount)
    254         .bind(&tx.subject)
    255         .bind(tx.debtor.iban())
    256         .bind(&tx.debtor.name)
    257         .bind_date(&tx.value_date)
    258         .bind(subject.as_ref().map(|it| it.ty()))
    259         .bind(subject.as_ref().map(|it| it.key()))
    260         .bind_timestamp(now)
    261         .try_map(|r: PgRow| {
    262             Ok(if r.try_get_flag(0)? {
    263                 AddIncomingResult::ReservePubReuse
    264             } else if r.try_get_flag(1)? {
    265                 AddIncomingResult::MappingReuse
    266             } else if r.try_get_flag(2)? {
    267                 AddIncomingResult::UnknownMapping
    268             } else {
    269                 AddIncomingResult::Success {
    270                     row_id: r.try_get_u64(3)?,
    271                     valued_at: r.try_get_date(4)?,
    272                     new: r.try_get(5)?,
    273                     pending: r.try_get(6)?
    274                 }
    275             })
    276         })
    277         .fetch_one(&mut *db)
    278     )
    279 }
    280 
    281 #[derive(Debug)]
    282 pub enum TxOutKind {
    283     Simple,
    284     Bounce(u32),
    285     Talerable(OutgoingSubject),
    286 }
    287 
    288 #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)]
    289 #[allow(non_camel_case_types)]
    290 #[sqlx(type_name = "register_result")]
    291 pub enum RegisterResult {
    292     /// Already registered
    293     idempotent,
    294     /// Initiated transaction
    295     known,
    296     /// Recovered unknown outgoing transaction
    297     recovered,
    298 }
    299 
    300 #[derive(Debug, PartialEq, Eq)]
    301 pub struct AddOutgoingResult {
    302     pub result: RegisterResult,
    303     pub row_id: u64,
    304 }
    305 
    306 pub async fn register_tx_out(
    307     db: &mut PgConnection,
    308     tx: &TxOut,
    309     kind: &TxOutKind,
    310     now: &Timestamp,
    311 ) -> sqlx::Result<AddOutgoingResult> {
    312     serialized!({
    313         let query = sqlx::query(
    314             "
    315                 SELECT out_result, out_tx_row_id 
    316                 FROM register_tx_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
    317             ",
    318         )
    319         .bind(tx.code as i64)
    320         .bind(&tx.amount)
    321         .bind(&tx.subject)
    322         .bind(tx.creditor.iban())
    323         .bind(&tx.creditor.name)
    324         .bind_date(&tx.value_date);
    325         let query = match kind {
    326             TxOutKind::Simple => query
    327                 .bind(None::<&[u8]>)
    328                 .bind(None::<&str>)
    329                 .bind(None::<&str>)
    330                 .bind(None::<i64>),
    331             TxOutKind::Bounce(bounced) => query
    332                 .bind(None::<&[u8]>)
    333                 .bind(None::<&str>)
    334                 .bind(None::<&str>)
    335                 .bind(*bounced as i64),
    336             TxOutKind::Talerable(subject) => query
    337                 .bind(&subject.wtid)
    338                 .bind(subject.exchange_base_url.as_str())
    339                 .bind(&subject.metadata)
    340                 .bind(None::<i64>),
    341         };
    342         query
    343             .bind_timestamp(now)
    344             .try_map(|r: PgRow| {
    345                 Ok(AddOutgoingResult {
    346                     result: r.try_get(0)?,
    347                     row_id: r.try_get_u64(1)?,
    348                 })
    349             })
    350             .fetch_one(&mut *db)
    351     })
    352 }
    353 
    354 #[derive(Debug, PartialEq, Eq)]
    355 pub struct OutFailureResult {
    356     pub initiated_id: Option<u64>,
    357     pub new: bool,
    358 }
    359 
    360 pub async fn register_tx_out_failure(
    361     db: &mut PgConnection,
    362     code: u64,
    363     bounced: Option<u32>,
    364     now: &Timestamp,
    365 ) -> sqlx::Result<OutFailureResult> {
    366     serialized!(
    367         sqlx::query(
    368             "
    369                 SELECT out_new, out_initiated_id 
    370                 FROM register_tx_out_failure($1, $2, $3)
    371             ",
    372         )
    373         .bind(code as i64)
    374         .bind(bounced.map(|i| i as i32))
    375         .bind_timestamp(now)
    376         .try_map(|r: PgRow| {
    377             Ok(OutFailureResult {
    378                 new: r.try_get(0)?,
    379                 initiated_id: r.try_get::<Option<i64>, _>(1)?.map(|i| i as u64),
    380             })
    381         })
    382         .fetch_one(&mut *db)
    383     )
    384 }
    385 
    386 #[derive(Debug, PartialEq, Eq)]
    387 pub enum TransferResult {
    388     Success { id: u64, initiated_at: Timestamp },
    389     RequestUidReuse,
    390     WtidReuse,
    391 }
    392 
    393 #[derive(Debug, Clone)]
    394 pub struct Transfer {
    395     pub request_uid: HashCode,
    396     pub amount: Decimal,
    397     pub exchange_base_url: Url,
    398     pub metadata: Option<CompactString>,
    399     pub wtid: ShortHashCode,
    400     pub creditor: FullHuPayto,
    401 }
    402 
    403 pub async fn make_transfer(
    404     db: &PgPool,
    405     tx: &Transfer,
    406     now: &Timestamp,
    407 ) -> sqlx::Result<TransferResult> {
    408     let subject = fmt_out_subject(&tx.wtid, &tx.exchange_base_url, tx.metadata.as_deref());
    409     serialized!(
    410         sqlx::query(
    411             "
    412                 SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at
    413                 FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8, $9)
    414             ",
    415         )
    416         .bind(&tx.request_uid)
    417         .bind(&tx.wtid)
    418         .bind(&subject)
    419         .bind(tx.amount)
    420         .bind(tx.exchange_base_url.as_str())
    421         .bind(&tx.metadata)
    422         .bind(tx.creditor.iban())
    423         .bind(&tx.creditor.name)
    424         .bind_timestamp(now)
    425         .try_map(|r: PgRow| {
    426             Ok(if r.try_get_flag(0)? {
    427                 TransferResult::RequestUidReuse
    428             } else if r.try_get_flag(1)? {
    429                 TransferResult::WtidReuse
    430             } else {
    431                 TransferResult::Success {
    432                     id: r.try_get_u64(2)?,
    433                     initiated_at: r.try_get_timestamp(3)?,
    434                 }
    435             })
    436         })
    437         .fetch_one(db)
    438     )
    439 }
    440 
    441 #[derive(Debug, PartialEq, Eq)]
    442 pub struct BounceResult {
    443     pub tx_id: u64,
    444     pub tx_new: bool,
    445     pub bounce_id: u64,
    446     pub bounce_new: bool,
    447 }
    448 
    449 pub async fn register_bounce_tx_in(
    450     db: &mut PgConnection,
    451     tx: &TxIn,
    452     reason: &str,
    453     now: &Timestamp,
    454 ) -> sqlx::Result<BounceResult> {
    455     serialized!(
    456         sqlx::query(
    457             "
    458                 SELECT out_tx_row_id, out_tx_new, out_bounce_row_id, out_bounce_new
    459                 FROM register_bounce_tx_in($1, $2, $3, $4, $5, $6, $7, $8)
    460             ",
    461         )
    462         .bind(tx.code as i64)
    463         .bind(&tx.amount)
    464         .bind(&tx.subject)
    465         .bind(tx.debtor.iban())
    466         .bind(&tx.debtor.name)
    467         .bind_date(&tx.value_date)
    468         .bind(reason)
    469         .bind_timestamp(now)
    470         .try_map(|r: PgRow| {
    471             Ok(BounceResult {
    472                 tx_id: r.try_get_u64(0)?,
    473                 tx_new: r.try_get(1)?,
    474                 bounce_id: r.try_get_u64(2)?,
    475                 bounce_new: r.try_get(3)?,
    476             })
    477         })
    478         .fetch_one(&mut *db)
    479     )
    480 }
    481 
    482 pub async fn transfer_page(
    483     db: &PgPool,
    484     status: &Option<TransferState>,
    485     params: &Page,
    486 ) -> sqlx::Result<Vec<TransferListStatus>> {
    487     page(
    488         db,
    489         "initiated_id",
    490         params,
    491         || {
    492             let mut builder = QueryBuilder::new(
    493                 "
    494                     SELECT
    495                         initiated_id,
    496                         status,
    497                         amount,
    498                         credit_account,
    499                         credit_name,
    500                         initiated_at
    501                     FROM transfer 
    502                     JOIN initiated USING (initiated_id)
    503                     WHERE
    504                 ",
    505             );
    506             if let Some(status) = status {
    507                 builder.push(" status = ").push_bind(status).push(" AND ");
    508             }
    509             builder
    510         },
    511         |r: PgRow| {
    512             Ok(TransferListStatus {
    513                 row_id: r.try_get_safeu64(0)?,
    514                 status: r.try_get(1)?,
    515                 amount: r.try_get_amount(2, &CURRENCY)?,
    516                 credit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?),
    517                 timestamp: r.try_get_timestamp(5)?.into(),
    518             })
    519         },
    520     )
    521     .await
    522 }
    523 
    524 pub async fn outgoing_history(
    525     db: &PgPool,
    526     params: &History,
    527     listen: impl FnOnce() -> Receiver<i64>,
    528 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> {
    529     history(
    530         db,
    531         "tx_out_id",
    532         params,
    533         listen,
    534         || {
    535             QueryBuilder::new(
    536                 "
    537                 SELECT
    538                     tx_out_id,
    539                     amount,
    540                     credit_account,
    541                     credit_name,
    542                     valued_at,
    543                     exchange_base_url,
    544                     metadata,
    545                     wtid
    546                 FROM taler_out
    547                 JOIN tx_out USING (tx_out_id)
    548                 WHERE
    549             ",
    550             )
    551         },
    552         |r: PgRow| {
    553             Ok(OutgoingBankTransaction {
    554                 row_id: r.try_get_safeu64(0)?,
    555                 amount: r.try_get_amount(1, &CURRENCY)?,
    556                 debit_fee: None,
    557                 credit_account: r.try_get_iban(2)?.as_full_payto(r.try_get(3)?),
    558                 date: r.try_get_timestamp(4)?.into(),
    559                 exchange_base_url: r.try_get_url(5)?,
    560                 metadata: r.try_get(6)?,
    561                 wtid: r.try_get(7)?,
    562             })
    563         },
    564     )
    565     .await
    566 }
    567 
    568 pub async fn incoming_history(
    569     db: &PgPool,
    570     params: &History,
    571     listen: impl FnOnce() -> Receiver<i64>,
    572 ) -> sqlx::Result<Vec<IncomingBankTransaction>> {
    573     history(
    574         db,
    575         "tx_in_id",
    576         params,
    577         listen,
    578         || {
    579             QueryBuilder::new(
    580                 "
    581                 SELECT
    582                     type,
    583                     tx_in_id,
    584                     amount,
    585                     debit_account,
    586                     debit_name,
    587                     valued_at,
    588                     metadata,
    589                     authorization_pub,
    590                     authorization_sig
    591                 FROM taler_in
    592                 JOIN tx_in USING (tx_in_id)
    593                 WHERE
    594             ",
    595             )
    596         },
    597         |r: PgRow| {
    598             Ok(match r.try_get(0)? {
    599                 IncomingType::reserve => IncomingBankTransaction::Reserve {
    600                     row_id: r.try_get_safeu64(1)?,
    601                     amount: r.try_get_amount(2, &CURRENCY)?,
    602                     credit_fee: None,
    603                     debit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?),
    604                     date: r.try_get_timestamp(5)?.into(),
    605                     reserve_pub: r.try_get(6)?,
    606                     authorization_pub: r.try_get(7)?,
    607                     authorization_sig: r.try_get(8)?,
    608                 },
    609                 IncomingType::kyc => IncomingBankTransaction::Kyc {
    610                     row_id: r.try_get_safeu64(1)?,
    611                     amount: r.try_get_amount(2, &CURRENCY)?,
    612                     credit_fee: None,
    613                     debit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?),
    614                     date: r.try_get_timestamp(5)?.into(),
    615                     account_pub: r.try_get(6)?,
    616                     authorization_pub: r.try_get(7)?,
    617                     authorization_sig: r.try_get(8)?,
    618                 },
    619                 IncomingType::map => unimplemented!("MAP are never listed in the history"),
    620             })
    621         },
    622     )
    623     .await
    624 }
    625 
    626 pub async fn revenue_history(
    627     db: &PgPool,
    628     params: &History,
    629     listen: impl FnOnce() -> Receiver<i64>,
    630 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> {
    631     history(
    632         db,
    633         "tx_in_id",
    634         params,
    635         listen,
    636         || {
    637             QueryBuilder::new(
    638                 "
    639                 SELECT
    640                     tx_in_id,
    641                     valued_at,
    642                     amount,
    643                     debit_account,
    644                     debit_name,
    645                     subject
    646                 FROM tx_in
    647                 WHERE
    648             ",
    649             )
    650         },
    651         |r: PgRow| {
    652             Ok(RevenueIncomingBankTransaction {
    653                 row_id: r.try_get_safeu64(0)?,
    654                 date: r.try_get_timestamp(1)?.into(),
    655                 amount: r.try_get_amount(2, &CURRENCY)?,
    656                 credit_fee: None,
    657                 debit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?),
    658                 subject: r.try_get(5)?,
    659             })
    660         },
    661     )
    662     .await
    663 }
    664 
    665 pub async fn transfer_by_id(db: &PgPool, id: u64) -> sqlx::Result<Option<TransferStatus>> {
    666     serialized!(
    667         sqlx::query(
    668             "
    669                 SELECT
    670                     status,
    671                     status_msg,
    672                     amount,
    673                     exchange_base_url,
    674                     metadata,
    675                     wtid,
    676                     credit_account,
    677                     credit_name,
    678                     initiated_at
    679                 FROM transfer 
    680                 JOIN initiated USING (initiated_id) 
    681                 WHERE initiated_id = $1
    682             ",
    683         )
    684         .bind(id as i64)
    685         .try_map(|r: PgRow| {
    686             Ok(TransferStatus {
    687                 status: r.try_get(0)?,
    688                 status_msg: r.try_get(1)?,
    689                 amount: r.try_get_amount(2, &CURRENCY)?,
    690                 origin_exchange_url: r.try_get(3)?,
    691                 metadata: r.try_get(4)?,
    692                 wtid: r.try_get(5)?,
    693                 credit_account: r.try_get_iban(6)?.as_full_payto(r.try_get(7)?),
    694                 timestamp: r.try_get_timestamp(8)?.into(),
    695             })
    696         })
    697         .fetch_optional(db)
    698     )
    699 }
    700 
    701 /** Get a batch of pending initiated transactions not attempted since [start] */
    702 pub async fn pending_batch(
    703     db: &mut PgConnection,
    704     start: &Timestamp,
    705 ) -> sqlx::Result<Vec<Initiated>> {
    706     serialized!(
    707         sqlx::query(
    708             "
    709                 SELECT initiated_id, amount, subject, credit_account, credit_name
    710                 FROM initiated 
    711                 WHERE magnet_code IS NULL
    712                     AND status='pending'
    713                     AND (last_submitted IS NULL OR last_submitted < $1)
    714                 LIMIT 100
    715             ",
    716         )
    717         .bind_timestamp(start)
    718         .try_map(|r: PgRow| {
    719             Ok(Initiated {
    720                 id: r.try_get_u64(0)?,
    721                 amount: r.try_get_amount(1, &CURRENCY)?,
    722                 subject: r.try_get(2)?,
    723                 creditor: FullHuPayto::new(r.try_get_parse(3)?, r.try_get(4)?),
    724             })
    725         })
    726         .fetch_all(&mut *db)
    727     )
    728 }
    729 
    730 /** Get an initiated transaction matching the given magnet [code] */
    731 pub async fn initiated_by_code(
    732     db: &mut PgConnection,
    733     code: u64,
    734 ) -> sqlx::Result<Option<Initiated>> {
    735     serialized!(
    736         sqlx::query(
    737             "
    738                 SELECT initiated_id, amount, subject, credit_account, credit_name
    739                 FROM initiated 
    740                 WHERE magnet_code IS $1
    741             ",
    742         )
    743         .bind(code as i64)
    744         .try_map(|r: PgRow| {
    745             Ok(Initiated {
    746                 id: r.try_get_u64(0)?,
    747                 amount: r.try_get_amount(1, &CURRENCY)?,
    748                 subject: r.try_get(2)?,
    749                 creditor: FullHuPayto::new(r.try_get_parse(3)?, r.try_get(4)?),
    750             })
    751         })
    752         .fetch_optional(&mut *db)
    753     )
    754 }
    755 
    756 /** Update status of a successful submitted initiated transaction */
    757 pub async fn initiated_submit_success(
    758     db: &mut PgConnection,
    759     id: u64,
    760     timestamp: &Timestamp,
    761     magnet_code: u64,
    762 ) -> sqlx::Result<()> {
    763     serialized!(
    764         sqlx::query(
    765             "
    766                 UPDATE initiated
    767                 SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, magnet_code=$2
    768                 WHERE initiated_id=$3
    769             "
    770         ).bind_timestamp(timestamp)
    771         .bind(magnet_code as i64)
    772         .bind(id as i64)
    773         .execute(&mut *db)
    774     )?;
    775     Ok(())
    776 }
    777 
    778 /** Update status of a permanently failed initiated transaction */
    779 pub async fn initiated_submit_permanent_failure(
    780     db: &mut PgConnection,
    781     id: u64,
    782     timestamp: &Timestamp,
    783     msg: &str,
    784 ) -> sqlx::Result<()> {
    785     serialized!(
    786         sqlx::query(
    787             "
    788                 UPDATE initiated
    789                 SET status='permanent_failure', status_msg=$2
    790                 WHERE initiated_id=$3
    791             ",
    792         )
    793         .bind_timestamp(timestamp)
    794         .bind(msg)
    795         .bind(id as i64)
    796         .execute(&mut *db)
    797     )?;
    798     Ok(())
    799 }
    800 
    801 /** Check if an initiated transaction exist for a magnet code */
    802 pub async fn initiated_exists_for_code(
    803     db: &mut PgConnection,
    804     code: u64,
    805 ) -> sqlx::Result<Option<u64>> {
    806     serialized!(
    807         sqlx::query("SELECT initiated_id FROM initiated WHERE magnet_code=$1")
    808             .bind(code as i64)
    809             .try_map(|r| Ok(r.try_get::<i64, _>(0)? as u64))
    810             .fetch_optional(&mut *db)
    811     )
    812 }
    813 
    814 /** Get JSON value from KV table */
    815 pub async fn kv_get<T: DeserializeOwned + Unpin + Send>(
    816     db: &mut PgConnection,
    817     key: &str,
    818 ) -> sqlx::Result<Option<T>> {
    819     serialized!(
    820         sqlx::query("SELECT value FROM kv WHERE key=$1")
    821             .bind(key)
    822             .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0))
    823             .fetch_optional(&mut *db)
    824     )
    825 }
    826 
    827 /** Set JSON value in KV table */
    828 pub async fn kv_set<T: Serialize>(db: &mut PgConnection, key: &str, value: &T) -> sqlx::Result<()> {
    829     serialized!(
    830         sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value")
    831             .bind(key)
    832             .bind(sqlx::types::Json(value))
    833             .execute(&mut *db)
    834     )?;
    835     Ok(())
    836 }
    837 
    838 pub enum RegistrationResult {
    839     Success,
    840     ReservePubReuse,
    841 }
    842 
    843 pub async fn transfer_register(
    844     db: &PgPool,
    845     req: &RegistrationRequest,
    846 ) -> sqlx::Result<RegistrationResult> {
    847     let ty: IncomingType = req.r#type.into();
    848     serialized!(
    849         sqlx::query(
    850             "SELECT out_reserve_pub_reuse FROM register_prepared_transfers($1,$2,$3,$4,$5,$6)"
    851         )
    852         .bind(ty)
    853         .bind(&req.account_pub)
    854         .bind(&req.authorization_pub)
    855         .bind(&req.authorization_sig)
    856         .bind(req.recurrent)
    857         .bind_timestamp(&Timestamp::now())
    858         .try_map(|r: PgRow| {
    859             Ok(if r.try_get_flag("out_reserve_pub_reuse")? {
    860                 RegistrationResult::ReservePubReuse
    861             } else {
    862                 RegistrationResult::Success
    863             })
    864         })
    865         .fetch_one(db)
    866     )
    867 }
    868 
    869 pub async fn transfer_unregister(db: &PgPool, req: &Unregistration) -> sqlx::Result<bool> {
    870     serialized!(
    871         sqlx::query("SELECT out_found FROM delete_prepared_transfers($1,$2)")
    872             .bind(&req.authorization_pub)
    873             .bind_timestamp(&Timestamp::now())
    874             .try_map(|r: PgRow| r.try_get_flag("out_found"))
    875             .fetch_one(db)
    876     )
    877 }
    878 
    879 #[cfg(test)]
    880 mod test {
    881     use jiff::{Span, Timestamp, Zoned};
    882     use serde_json::json;
    883     use sqlx::{PgPool, Postgres, pool::PoolConnection, postgres::PgRow};
    884     use taler_api::{
    885         db::TypeHelper,
    886         notification::dummy_listen,
    887         subject::{IncomingSubject, OutgoingSubject},
    888     };
    889     use taler_common::{
    890         api_common::{EddsaPublicKey, HashCode, ShortHashCode},
    891         api_params::{History, Page},
    892         types::{
    893             amount::{amount, decimal},
    894             url,
    895             utils::now_sql_stable_ts,
    896         },
    897     };
    898 
    899     use super::TxInAdmin;
    900     use crate::{
    901         constants::CONFIG_SOURCE,
    902         db::{
    903             self, AddIncomingResult, AddOutgoingResult, BounceResult, Initiated, OutFailureResult,
    904             TransferResult, TxIn, TxOut, TxOutKind, kv_get, kv_set, make_transfer,
    905             register_bounce_tx_in, register_tx_in, register_tx_in_admin, register_tx_out,
    906         },
    907         magnet_api::types::TxStatus,
    908         magnet_payto,
    909     };
    910 
    911     async fn setup() -> (PoolConnection<Postgres>, PgPool) {
    912         taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await
    913     }
    914 
    915     #[tokio::test]
    916     async fn kv() {
    917         let (mut db, _) = setup().await;
    918 
    919         let value = json!({
    920             "name": "Mr Smith",
    921             "no way": 32
    922         });
    923 
    924         assert_eq!(
    925             kv_get::<serde_json::Value>(&mut db, "value").await.unwrap(),
    926             None
    927         );
    928         kv_set(&mut db, "value", &value).await.unwrap();
    929         kv_set(&mut db, "value", &value).await.unwrap();
    930         assert_eq!(
    931             kv_get::<serde_json::Value>(&mut db, "value").await.unwrap(),
    932             Some(value)
    933         );
    934     }
    935 
    936     #[tokio::test]
    937     async fn tx_in() {
    938         let (mut db, pool) = setup().await;
    939 
    940         let mut routine = async |first: &Option<IncomingSubject>,
    941                                  second: &Option<IncomingSubject>| {
    942             let (id, code) =
    943                 sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_in")
    944                     .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?)))
    945                     .fetch_one(&mut *db)
    946                     .await
    947                     .unwrap();
    948             let now = now_sql_stable_ts();
    949             let date = Zoned::now().date();
    950             let later = date.tomorrow().unwrap();
    951             let tx = TxIn {
    952                 code,
    953                 amount: amount("EUR:10"),
    954                 subject: "subject".into(),
    955                 debtor: magnet_payto(
    956                     "payto://iban/HU30162000031000163100000000?receiver-name=name",
    957                 ),
    958                 value_date: date,
    959                 status: TxStatus::Completed,
    960             };
    961             // Insert
    962             assert_eq!(
    963                 register_tx_in(&mut db, &tx, first, &now)
    964                     .await
    965                     .expect("register tx in"),
    966                 AddIncomingResult::Success {
    967                     new: true,
    968                     pending: false,
    969                     row_id: id,
    970                     valued_at: date
    971                 }
    972             );
    973             // Idempotent
    974             assert_eq!(
    975                 register_tx_in(
    976                     &mut db,
    977                     &TxIn {
    978                         value_date: later,
    979                         ..tx.clone()
    980                     },
    981                     first,
    982                     &now
    983                 )
    984                 .await
    985                 .expect("register tx in"),
    986                 AddIncomingResult::Success {
    987                     new: false,
    988                     pending: false,
    989                     row_id: id,
    990                     valued_at: date
    991                 }
    992             );
    993             // Many
    994             assert_eq!(
    995                 register_tx_in(
    996                     &mut db,
    997                     &TxIn {
    998                         code: code + 1,
    999                         value_date: later,
   1000                         ..tx
   1001                     },
   1002                     second,
   1003                     &now
   1004                 )
   1005                 .await
   1006                 .expect("register tx in"),
   1007                 AddIncomingResult::Success {
   1008                     new: true,
   1009                     pending: false,
   1010                     row_id: id + 1,
   1011                     valued_at: later
   1012                 }
   1013             );
   1014         };
   1015 
   1016         // Empty db
   1017         assert_eq!(
   1018             db::revenue_history(&pool, &History::default(), dummy_listen)
   1019                 .await
   1020                 .unwrap(),
   1021             Vec::new()
   1022         );
   1023         assert_eq!(
   1024             db::incoming_history(&pool, &History::default(), dummy_listen)
   1025                 .await
   1026                 .unwrap(),
   1027             Vec::new()
   1028         );
   1029 
   1030         // Regular transaction
   1031         routine(&None, &None).await;
   1032 
   1033         // Reserve transaction
   1034         routine(
   1035             &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())),
   1036             &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())),
   1037         )
   1038         .await;
   1039 
   1040         // Kyc transaction
   1041         routine(
   1042             &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())),
   1043             &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())),
   1044         )
   1045         .await;
   1046 
   1047         // History
   1048         assert_eq!(
   1049             db::revenue_history(&pool, &History::default(), dummy_listen)
   1050                 .await
   1051                 .unwrap()
   1052                 .len(),
   1053             6
   1054         );
   1055         assert_eq!(
   1056             db::incoming_history(&pool, &History::default(), dummy_listen)
   1057                 .await
   1058                 .unwrap()
   1059                 .len(),
   1060             4
   1061         );
   1062     }
   1063 
   1064     #[tokio::test]
   1065     async fn tx_in_admin() {
   1066         let (_, pool) = setup().await;
   1067 
   1068         // Empty db
   1069         assert_eq!(
   1070             db::incoming_history(&pool, &History::default(), dummy_listen)
   1071                 .await
   1072                 .unwrap(),
   1073             Vec::new()
   1074         );
   1075 
   1076         let now = now_sql_stable_ts();
   1077         let later = now + Span::new().hours(2);
   1078         let date = Zoned::now().date();
   1079         let tx = TxInAdmin {
   1080             amount: amount("EUR:10"),
   1081             subject: "subject".to_owned(),
   1082             debtor: magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"),
   1083             metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
   1084         };
   1085         // Insert
   1086         assert_eq!(
   1087             register_tx_in_admin(&pool, &tx, &now)
   1088                 .await
   1089                 .expect("register tx in"),
   1090             AddIncomingResult::Success {
   1091                 new: true,
   1092                 pending: false,
   1093                 row_id: 1,
   1094                 valued_at: date
   1095             }
   1096         );
   1097         // Many
   1098         assert_eq!(
   1099             register_tx_in_admin(
   1100                 &pool,
   1101                 &TxInAdmin {
   1102                     subject: "Other".to_owned(),
   1103                     metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
   1104                     ..tx.clone()
   1105                 },
   1106                 &later
   1107             )
   1108             .await
   1109             .expect("register tx in"),
   1110             AddIncomingResult::Success {
   1111                 new: true,
   1112                 pending: false,
   1113                 row_id: 2,
   1114                 valued_at: date
   1115             }
   1116         );
   1117 
   1118         // History
   1119         assert_eq!(
   1120             db::incoming_history(&pool, &History::default(), dummy_listen)
   1121                 .await
   1122                 .unwrap()
   1123                 .len(),
   1124             2
   1125         );
   1126     }
   1127 
   1128     #[tokio::test]
   1129     async fn tx_out() {
   1130         let (mut db, pool) = setup().await;
   1131 
   1132         let mut routine = async |first: &TxOutKind, second: &TxOutKind| {
   1133             let (id, code) =
   1134                 sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_out")
   1135                     .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?)))
   1136                     .fetch_one(&mut *db)
   1137                     .await
   1138                     .unwrap();
   1139             let now = now_sql_stable_ts();
   1140             let date = Zoned::now().date();
   1141             let later = date.tomorrow().unwrap();
   1142             let tx = TxOut {
   1143                 code,
   1144                 amount: amount("HUF:10"),
   1145                 subject: "subject".into(),
   1146                 creditor: magnet_payto(
   1147                     "payto://iban/HU30162000031000163100000000?receiver-name=name",
   1148                 ),
   1149                 value_date: date,
   1150                 status: TxStatus::Completed,
   1151             };
   1152             assert!(matches!(
   1153                 make_transfer(
   1154                     &pool,
   1155                     &db::Transfer {
   1156                         request_uid: HashCode::rand(),
   1157                         amount: decimal("10"),
   1158                         exchange_base_url: url("https://exchange.test.com/"),
   1159                         metadata: None,
   1160                         wtid: ShortHashCode::rand(),
   1161                         creditor: tx.creditor.clone()
   1162                     },
   1163                     &now
   1164                 )
   1165                 .await
   1166                 .unwrap(),
   1167                 TransferResult::Success { .. }
   1168             ));
   1169             db::initiated_submit_success(&mut db, 1, &Timestamp::now(), tx.code)
   1170                 .await
   1171                 .expect("status success");
   1172 
   1173             // Insert
   1174             assert_eq!(
   1175                 register_tx_out(&mut db, &tx, first, &now)
   1176                     .await
   1177                     .expect("register tx out"),
   1178                 AddOutgoingResult {
   1179                     result: db::RegisterResult::known,
   1180                     row_id: id,
   1181                 }
   1182             );
   1183             // Idempotent
   1184             assert_eq!(
   1185                 register_tx_out(
   1186                     &mut db,
   1187                     &TxOut {
   1188                         value_date: later,
   1189                         ..tx.clone()
   1190                     },
   1191                     first,
   1192                     &now
   1193                 )
   1194                 .await
   1195                 .expect("register tx out"),
   1196                 AddOutgoingResult {
   1197                     result: db::RegisterResult::idempotent,
   1198                     row_id: id,
   1199                 }
   1200             );
   1201             // Recovered
   1202             assert_eq!(
   1203                 register_tx_out(
   1204                     &mut db,
   1205                     &TxOut {
   1206                         code: code + 1,
   1207                         value_date: later,
   1208                         ..tx.clone()
   1209                     },
   1210                     second,
   1211                     &now
   1212                 )
   1213                 .await
   1214                 .expect("register tx out"),
   1215                 AddOutgoingResult {
   1216                     result: db::RegisterResult::recovered,
   1217                     row_id: id + 1,
   1218                 }
   1219             );
   1220         };
   1221 
   1222         // Empty db
   1223         assert_eq!(
   1224             db::outgoing_history(&pool, &History::default(), dummy_listen)
   1225                 .await
   1226                 .unwrap(),
   1227             Vec::new()
   1228         );
   1229 
   1230         // Regular transaction
   1231         routine(&TxOutKind::Simple, &TxOutKind::Simple).await;
   1232 
   1233         // Talerable transaction
   1234         routine(
   1235             &TxOutKind::Talerable(OutgoingSubject::rand()),
   1236             &TxOutKind::Talerable(OutgoingSubject::rand()),
   1237         )
   1238         .await;
   1239 
   1240         // Bounced transaction
   1241         routine(&TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await;
   1242 
   1243         // History
   1244         assert_eq!(
   1245             db::outgoing_history(&pool, &History::default(), dummy_listen)
   1246                 .await
   1247                 .unwrap()
   1248                 .len(),
   1249             2
   1250         );
   1251     }
   1252 
   1253     #[tokio::test]
   1254     async fn tx_out_failure() {
   1255         let (mut db, pool) = setup().await;
   1256 
   1257         let now = now_sql_stable_ts();
   1258 
   1259         // Unknown
   1260         assert_eq!(
   1261             db::register_tx_out_failure(&mut db, 42, None, &now)
   1262                 .await
   1263                 .unwrap(),
   1264             OutFailureResult {
   1265                 initiated_id: None,
   1266                 new: false
   1267             }
   1268         );
   1269         assert_eq!(
   1270             db::register_tx_out_failure(&mut db, 42, Some(12), &now)
   1271                 .await
   1272                 .unwrap(),
   1273             OutFailureResult {
   1274                 initiated_id: None,
   1275                 new: false
   1276             }
   1277         );
   1278 
   1279         // Initiated
   1280         let req = db::Transfer {
   1281             request_uid: HashCode::rand(),
   1282             amount: decimal("10"),
   1283             exchange_base_url: url("https://exchange.test.com/"),
   1284             metadata: None,
   1285             wtid: ShortHashCode::rand(),
   1286             creditor: magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"),
   1287         };
   1288         let payto = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name");
   1289         assert_eq!(
   1290             make_transfer(&pool, &req, &now).await.unwrap(),
   1291             TransferResult::Success {
   1292                 id: 1,
   1293                 initiated_at: now
   1294             }
   1295         );
   1296         db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 34)
   1297             .await
   1298             .expect("status success");
   1299         assert_eq!(
   1300             db::register_tx_out_failure(&mut db, 34, None, &now)
   1301                 .await
   1302                 .unwrap(),
   1303             OutFailureResult {
   1304                 initiated_id: Some(1),
   1305                 new: true
   1306             }
   1307         );
   1308         assert_eq!(
   1309             db::register_tx_out_failure(&mut db, 34, None, &now)
   1310                 .await
   1311                 .unwrap(),
   1312             OutFailureResult {
   1313                 initiated_id: Some(1),
   1314                 new: false
   1315             }
   1316         );
   1317 
   1318         // Recovered bounce
   1319         let tx = TxIn {
   1320             code: 12,
   1321             amount: amount("HUF:11"),
   1322             subject: "malformed transaction".into(),
   1323             debtor: payto,
   1324             value_date: Zoned::now().date(),
   1325             status: TxStatus::Completed,
   1326         };
   1327         assert_eq!(
   1328             db::register_bounce_tx_in(&mut db, &tx, "no reason", &now)
   1329                 .await
   1330                 .unwrap(),
   1331             BounceResult {
   1332                 tx_id: 1,
   1333                 tx_new: true,
   1334                 bounce_id: 2,
   1335                 bounce_new: true
   1336             }
   1337         );
   1338         assert_eq!(
   1339             db::register_tx_out_failure(&mut db, 10, Some(12), &now)
   1340                 .await
   1341                 .unwrap(),
   1342             OutFailureResult {
   1343                 initiated_id: Some(2),
   1344                 new: true
   1345             }
   1346         );
   1347         assert_eq!(
   1348             db::register_tx_out_failure(&mut db, 10, Some(12), &now)
   1349                 .await
   1350                 .unwrap(),
   1351             OutFailureResult {
   1352                 initiated_id: Some(2),
   1353                 new: false
   1354             }
   1355         );
   1356     }
   1357 
   1358     #[tokio::test]
   1359     async fn transfer() {
   1360         let (_, pool) = setup().await;
   1361 
   1362         // Empty db
   1363         assert_eq!(db::transfer_by_id(&pool, 0).await.unwrap(), None);
   1364         assert_eq!(
   1365             db::transfer_page(&pool, &None, &Page::default())
   1366                 .await
   1367                 .unwrap(),
   1368             Vec::new()
   1369         );
   1370 
   1371         let req = db::Transfer {
   1372             request_uid: HashCode::rand(),
   1373             amount: decimal("10"),
   1374             exchange_base_url: url("https://exchange.test.com/"),
   1375             metadata: None,
   1376             wtid: ShortHashCode::rand(),
   1377             creditor: magnet_payto("payto://iban/HU02162000031000164800000000?receiver-name=name"),
   1378         };
   1379         let now = now_sql_stable_ts();
   1380         let later = now + Span::new().hours(2);
   1381         // Insert
   1382         assert_eq!(
   1383             make_transfer(&pool, &req, &now).await.expect("transfer"),
   1384             TransferResult::Success {
   1385                 id: 1,
   1386                 initiated_at: now
   1387             }
   1388         );
   1389         // Idempotent
   1390         assert_eq!(
   1391             make_transfer(&pool, &req, &later).await.expect("transfer"),
   1392             TransferResult::Success {
   1393                 id: 1,
   1394                 initiated_at: now
   1395             }
   1396         );
   1397         // Request UID reuse
   1398         assert_eq!(
   1399             make_transfer(
   1400                 &pool,
   1401                 &db::Transfer {
   1402                     wtid: ShortHashCode::rand(),
   1403                     ..req.clone()
   1404                 },
   1405                 &now
   1406             )
   1407             .await
   1408             .expect("transfer"),
   1409             TransferResult::RequestUidReuse
   1410         );
   1411         // wtid reuse
   1412         assert_eq!(
   1413             make_transfer(
   1414                 &pool,
   1415                 &db::Transfer {
   1416                     request_uid: HashCode::rand(),
   1417                     ..req.clone()
   1418                 },
   1419                 &now
   1420             )
   1421             .await
   1422             .expect("transfer"),
   1423             TransferResult::WtidReuse
   1424         );
   1425         // Many
   1426         assert_eq!(
   1427             make_transfer(
   1428                 &pool,
   1429                 &db::Transfer {
   1430                     request_uid: HashCode::rand(),
   1431                     wtid: ShortHashCode::rand(),
   1432                     ..req
   1433                 },
   1434                 &later
   1435             )
   1436             .await
   1437             .expect("transfer"),
   1438             TransferResult::Success {
   1439                 id: 2,
   1440                 initiated_at: later
   1441             }
   1442         );
   1443 
   1444         // Get
   1445         assert!(db::transfer_by_id(&pool, 1).await.unwrap().is_some());
   1446         assert!(db::transfer_by_id(&pool, 2).await.unwrap().is_some());
   1447         assert!(db::transfer_by_id(&pool, 3).await.unwrap().is_none());
   1448         assert_eq!(
   1449             db::transfer_page(&pool, &None, &Page::default())
   1450                 .await
   1451                 .unwrap()
   1452                 .len(),
   1453             2
   1454         );
   1455     }
   1456 
   1457     #[tokio::test]
   1458     async fn bounce() {
   1459         let (mut db, _) = setup().await;
   1460 
   1461         let amount = amount("HUF:10");
   1462         let payto = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name");
   1463         let now = now_sql_stable_ts();
   1464         let date = Zoned::now().date();
   1465 
   1466         // Empty db
   1467         assert!(db::pending_batch(&mut db, &now).await.unwrap().is_empty());
   1468 
   1469         // Insert
   1470         assert_eq!(
   1471             register_tx_in(
   1472                 &mut db,
   1473                 &TxIn {
   1474                     code: 13,
   1475                     amount: amount.clone(),
   1476                     subject: "subject".into(),
   1477                     debtor: payto.clone(),
   1478                     value_date: date,
   1479                     status: TxStatus::Completed
   1480                 },
   1481                 &None,
   1482                 &now
   1483             )
   1484             .await
   1485             .expect("register tx in"),
   1486             AddIncomingResult::Success {
   1487                 new: true,
   1488                 pending: false,
   1489                 row_id: 1,
   1490                 valued_at: date
   1491             }
   1492         );
   1493 
   1494         // Bounce
   1495         assert_eq!(
   1496             register_bounce_tx_in(
   1497                 &mut db,
   1498                 &TxIn {
   1499                     code: 12,
   1500                     amount: amount.clone(),
   1501                     subject: "subject".into(),
   1502                     debtor: payto.clone(),
   1503                     value_date: date,
   1504                     status: TxStatus::Completed
   1505                 },
   1506                 "good reason",
   1507                 &now
   1508             )
   1509             .await
   1510             .expect("bounce"),
   1511             BounceResult {
   1512                 tx_id: 2,
   1513                 tx_new: true,
   1514                 bounce_id: 1,
   1515                 bounce_new: true
   1516             }
   1517         );
   1518         // Idempotent
   1519         assert_eq!(
   1520             register_bounce_tx_in(
   1521                 &mut db,
   1522                 &TxIn {
   1523                     code: 12,
   1524                     amount: amount.clone(),
   1525                     subject: "subject".into(),
   1526                     debtor: payto.clone(),
   1527                     value_date: date,
   1528                     status: TxStatus::Completed
   1529                 },
   1530                 "good reason",
   1531                 &now
   1532             )
   1533             .await
   1534             .expect("bounce"),
   1535             BounceResult {
   1536                 tx_id: 2,
   1537                 tx_new: false,
   1538                 bounce_id: 1,
   1539                 bounce_new: false
   1540             }
   1541         );
   1542 
   1543         // Bounce registered
   1544         assert_eq!(
   1545             register_bounce_tx_in(
   1546                 &mut db,
   1547                 &TxIn {
   1548                     code: 13,
   1549                     amount: amount.clone(),
   1550                     subject: "subject".into(),
   1551                     debtor: payto.clone(),
   1552                     value_date: date,
   1553                     status: TxStatus::Completed
   1554                 },
   1555                 "good reason",
   1556                 &now
   1557             )
   1558             .await
   1559             .expect("bounce"),
   1560             BounceResult {
   1561                 tx_id: 1,
   1562                 tx_new: false,
   1563                 bounce_id: 2,
   1564                 bounce_new: true
   1565             }
   1566         );
   1567         // Idempotent registered
   1568         assert_eq!(
   1569             register_bounce_tx_in(
   1570                 &mut db,
   1571                 &TxIn {
   1572                     code: 13,
   1573                     amount: amount.clone(),
   1574                     subject: "subject".into(),
   1575                     debtor: payto.clone(),
   1576                     value_date: date,
   1577                     status: TxStatus::Completed
   1578                 },
   1579                 "good reason",
   1580                 &now
   1581             )
   1582             .await
   1583             .expect("bounce"),
   1584             BounceResult {
   1585                 tx_id: 1,
   1586                 tx_new: false,
   1587                 bounce_id: 2,
   1588                 bounce_new: false
   1589             }
   1590         );
   1591 
   1592         // Batch
   1593         assert_eq!(
   1594             db::pending_batch(&mut db, &now).await.unwrap(),
   1595             &[
   1596                 Initiated {
   1597                     id: 1,
   1598                     amount: amount.clone(),
   1599                     subject: "bounce: 12".into(),
   1600                     creditor: payto.clone()
   1601                 },
   1602                 Initiated {
   1603                     id: 2,
   1604                     amount,
   1605                     subject: "bounce: 13".into(),
   1606                     creditor: payto
   1607                 }
   1608             ]
   1609         );
   1610     }
   1611 
   1612     #[tokio::test]
   1613     async fn status() {
   1614         let (mut db, _) = setup().await;
   1615 
   1616         // Unknown transfer
   1617         db::initiated_submit_permanent_failure(&mut db, 1, &Timestamp::now(), "msg")
   1618             .await
   1619             .unwrap();
   1620         db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12)
   1621             .await
   1622             .unwrap();
   1623     }
   1624 
   1625     #[tokio::test]
   1626     async fn batch() {
   1627         let (mut db, pool) = setup().await;
   1628         let start = Timestamp::now();
   1629         let magnet_payto =
   1630             magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name");
   1631 
   1632         // Empty db
   1633         let pendings = db::pending_batch(&mut db, &start)
   1634             .await
   1635             .expect("pending_batch");
   1636         assert_eq!(pendings.len(), 0);
   1637 
   1638         // Some transfers
   1639         for i in 0..3 {
   1640             make_transfer(
   1641                 &pool,
   1642                 &db::Transfer {
   1643                     request_uid: HashCode::rand(),
   1644                     amount: decimal(format!("{}", i + 1)),
   1645                     exchange_base_url: url("https://exchange.test.com/"),
   1646                     metadata: None,
   1647                     wtid: ShortHashCode::rand(),
   1648                     creditor: magnet_payto.clone(),
   1649                 },
   1650                 &Timestamp::now(),
   1651             )
   1652             .await
   1653             .expect("transfer");
   1654         }
   1655         let pendings = db::pending_batch(&mut db, &start)
   1656             .await
   1657             .expect("pending_batch");
   1658         assert_eq!(pendings.len(), 3);
   1659 
   1660         // Max 100 txs in batch
   1661         for i in 0..100 {
   1662             make_transfer(
   1663                 &pool,
   1664                 &db::Transfer {
   1665                     request_uid: HashCode::rand(),
   1666                     amount: decimal(format!("{}", i + 1)),
   1667                     exchange_base_url: url("https://exchange.test.com/"),
   1668                     metadata: None,
   1669                     wtid: ShortHashCode::rand(),
   1670                     creditor: magnet_payto.clone(),
   1671                 },
   1672                 &Timestamp::now(),
   1673             )
   1674             .await
   1675             .expect("transfer");
   1676         }
   1677         let pendings = db::pending_batch(&mut db, &start)
   1678             .await
   1679             .expect("pending_batch");
   1680         assert_eq!(pendings.len(), 100);
   1681 
   1682         // Skip uploaded
   1683         for i in 0..=10 {
   1684             db::initiated_submit_success(&mut db, i, &Timestamp::now(), i)
   1685                 .await
   1686                 .expect("status success");
   1687         }
   1688         let pendings = db::pending_batch(&mut db, &start)
   1689             .await
   1690             .expect("pending_batch");
   1691         assert_eq!(pendings.len(), 93);
   1692 
   1693         // Skip failed
   1694         for i in 0..=10 {
   1695             db::initiated_submit_permanent_failure(&mut db, 10 + i, &Timestamp::now(), "failure")
   1696                 .await
   1697                 .expect("status failure");
   1698         }
   1699         let pendings = db::pending_batch(&mut db, &start)
   1700             .await
   1701             .expect("pending_batch");
   1702         assert_eq!(pendings.len(), 83);
   1703     }
   1704 }