taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

db.rs (47527B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::fmt::Display;
     18 
     19 use jiff::{Timestamp, civil::Date, tz::TimeZone};
     20 use serde::{Serialize, de::DeserializeOwned};
     21 use sqlx::{PgConnection, PgExecutor, PgPool, QueryBuilder, Row, postgres::PgRow};
     22 use taler_api::{
     23     db::{BindHelper, IncomingType, TypeHelper, history, page},
     24     subject::{IncomingSubject, OutgoingSubject},
     25 };
     26 use taler_common::{
     27     api_params::{History, Page},
     28     api_revenue::RevenueIncomingBankTransaction,
     29     api_wire::{
     30         IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest,
     31         TransferState, TransferStatus,
     32     },
     33     types::{amount::Amount, payto::PaytoImpl as _},
     34 };
     35 use tokio::sync::watch::{Receiver, Sender};
     36 
     37 use crate::{FullHuPayto, constants::CURRENCY, magnet_api::types::TxStatus};
     38 
     39 pub async fn notification_listener(
     40     pool: PgPool,
     41     in_channel: Sender<i64>,
     42     taler_in_channel: Sender<i64>,
     43     out_channel: Sender<i64>,
     44     taler_out_channel: Sender<i64>,
     45 ) -> sqlx::Result<()> {
     46     taler_api::notification::notification_listener!(&pool,
     47         "tx_in" => (row_id: i64) {
     48             in_channel.send_replace(row_id);
     49         },
     50         "taler_in" => (row_id: i64) {
     51             taler_in_channel.send_replace(row_id);
     52         },
     53         "tx_out" => (row_id: i64) {
     54             out_channel.send_replace(row_id);
     55         },
     56         "taler_out" => (row_id: i64) {
     57             taler_out_channel.send_replace(row_id);
     58         }
     59     )
     60 }
     61 
     62 #[derive(Debug, Clone)]
     63 pub struct TxIn {
     64     pub code: u64,
     65     pub amount: Amount,
     66     pub subject: String,
     67     pub debtor: FullHuPayto,
     68     pub value_date: Date,
     69     pub status: TxStatus,
     70 }
     71 
     72 impl Display for TxIn {
     73     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     74         let Self {
     75             code,
     76             amount,
     77             subject,
     78             debtor,
     79             value_date,
     80             status,
     81         } = self;
     82         write!(
     83             f,
     84             "{value_date} {code} {amount} ({} {}) {status:?} '{subject}'",
     85             debtor.bban(),
     86             debtor.name
     87         )
     88     }
     89 }
     90 
     91 #[derive(Debug, Clone)]
     92 pub struct TxOut {
     93     pub code: u64,
     94     pub amount: Amount,
     95     pub subject: String,
     96     pub creditor: FullHuPayto,
     97     pub value_date: Date,
     98     pub status: TxStatus,
     99 }
    100 
    101 impl Display for TxOut {
    102     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    103         let Self {
    104             code,
    105             amount,
    106             subject,
    107             creditor,
    108             value_date,
    109             status,
    110         } = self;
    111         write!(
    112             f,
    113             "{value_date} {code} {amount} ({} {}) {status:?} '{subject}'",
    114             creditor.bban(),
    115             &creditor.name
    116         )
    117     }
    118 }
    119 
    120 #[derive(Debug, PartialEq, Eq)]
    121 pub struct Initiated {
    122     pub id: u64,
    123     pub amount: Amount,
    124     pub subject: String,
    125     pub creditor: FullHuPayto,
    126 }
    127 
    128 impl Display for Initiated {
    129     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    130         let Self {
    131             id,
    132             amount,
    133             subject,
    134             creditor,
    135         } = self;
    136         write!(
    137             f,
    138             "{id} {amount} ({} {}) '{subject}'",
    139             creditor.bban(),
    140             &creditor.name
    141         )
    142     }
    143 }
    144 
    145 #[derive(Debug, Clone)]
    146 pub struct TxInAdmin {
    147     pub amount: Amount,
    148     pub subject: String,
    149     pub debtor: FullHuPayto,
    150     pub metadata: IncomingSubject,
    151 }
    152 
    153 #[derive(Debug, PartialEq, Eq)]
    154 pub enum AddIncomingResult {
    155     Success {
    156         new: bool,
    157         row_id: u64,
    158         valued_at: Date,
    159     },
    160     ReservePubReuse,
    161 }
    162 
    163 pub async fn register_tx_in_admin(
    164     db: &PgPool,
    165     tx: &TxInAdmin,
    166     now: &Timestamp,
    167 ) -> sqlx::Result<AddIncomingResult> {
    168     sqlx::query(
    169         "
    170             SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new
    171             FROM register_tx_in(NULL, ($1, $2)::taler_amount, $3, $4, $5, $6, $7, $8, $6)
    172         ",
    173     )
    174     .bind_amount(&tx.amount)
    175     .bind(&tx.subject)
    176     .bind(tx.debtor.iban())
    177     .bind(&tx.debtor.name)
    178     .bind_date(&now.to_zoned(TimeZone::UTC).date())
    179     .bind(tx.metadata.ty())
    180     .bind(tx.metadata.key())
    181     .try_map(|r: PgRow| {
    182         Ok(if r.try_get(0)? {
    183             AddIncomingResult::ReservePubReuse
    184         } else {
    185             AddIncomingResult::Success {
    186                 row_id: r.try_get_u64(1)?,
    187                 valued_at: r.try_get_date(2)?,
    188                 new: r.try_get(3)?,
    189             }
    190         })
    191     })
    192     .fetch_one(db)
    193     .await
    194 }
    195 
    196 pub async fn register_tx_in(
    197     db: &mut PgConnection,
    198     tx: &TxIn,
    199     subject: &Option<IncomingSubject>,
    200     now: &Timestamp,
    201 ) -> sqlx::Result<AddIncomingResult> {
    202     sqlx::query(
    203         "
    204             SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new
    205             FROM register_tx_in($1, ($2, $3)::taler_amount, $4, $5, $6, $7, $8, $9, $10)
    206         ",
    207     )
    208     .bind(tx.code as i64)
    209     .bind_amount(&tx.amount)
    210     .bind(&tx.subject)
    211     .bind(tx.debtor.iban())
    212     .bind(&tx.debtor.name)
    213     .bind_date(&tx.value_date)
    214     .bind(subject.as_ref().map(|it| it.ty()))
    215     .bind(subject.as_ref().map(|it| it.key()))
    216     .bind_timestamp(now)
    217     .try_map(|r: PgRow| {
    218         Ok(if r.try_get(0)? {
    219             AddIncomingResult::ReservePubReuse
    220         } else {
    221             AddIncomingResult::Success {
    222                 row_id: r.try_get_u64(1)?,
    223                 valued_at: r.try_get_date(2)?,
    224                 new: r.try_get(3)?,
    225             }
    226         })
    227     })
    228     .fetch_one(db)
    229     .await
    230 }
    231 
    232 #[derive(Debug)]
    233 pub enum TxOutKind {
    234     Simple,
    235     Bounce(u32),
    236     Talerable(OutgoingSubject),
    237 }
    238 
    239 #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)]
    240 #[allow(non_camel_case_types)]
    241 #[sqlx(type_name = "register_result")]
    242 pub enum RegisterResult {
    243     /// Already registered
    244     idempotent,
    245     /// Initiated transaction
    246     known,
    247     /// Recovered unknown outgoing transaction
    248     recovered,
    249 }
    250 
    251 #[derive(Debug, PartialEq, Eq)]
    252 pub struct AddOutgoingResult {
    253     pub result: RegisterResult,
    254     pub row_id: u64,
    255 }
    256 
    257 pub async fn register_tx_out(
    258     db: &mut PgConnection,
    259     tx: &TxOut,
    260     kind: &TxOutKind,
    261     now: &Timestamp,
    262 ) -> sqlx::Result<AddOutgoingResult> {
    263     let query = sqlx::query(
    264         "
    265             SELECT out_result, out_tx_row_id 
    266             FROM register_tx_out($1, ($2, $3)::taler_amount, $4, $5, $6, $7, $8, $9, $10, $11)
    267         ",
    268     )
    269     .bind(tx.code as i64)
    270     .bind_amount(&tx.amount)
    271     .bind(&tx.subject)
    272     .bind(tx.creditor.iban())
    273     .bind(&tx.creditor.name)
    274     .bind_date(&tx.value_date);
    275     let query = match kind {
    276         TxOutKind::Simple => query
    277             .bind(None::<&[u8]>)
    278             .bind(None::<&str>)
    279             .bind(None::<i64>),
    280         TxOutKind::Bounce(bounced) => query
    281             .bind(None::<&[u8]>)
    282             .bind(None::<&str>)
    283             .bind(*bounced as i64),
    284         TxOutKind::Talerable(subject) => query
    285             .bind(subject.0.as_ref())
    286             .bind(subject.1.as_ref())
    287             .bind(None::<i64>),
    288     };
    289     query
    290         .bind_timestamp(now)
    291         .try_map(|r: PgRow| {
    292             Ok(AddOutgoingResult {
    293                 result: r.try_get(0)?,
    294                 row_id: r.try_get_u64(1)?,
    295             })
    296         })
    297         .fetch_one(db)
    298         .await
    299 }
    300 
    301 #[derive(Debug, PartialEq, Eq)]
    302 pub struct OutFailureResult {
    303     pub initiated_id: Option<u64>,
    304     pub new: bool,
    305 }
    306 
    307 pub async fn register_tx_out_failure(
    308     db: &mut PgConnection,
    309     code: u64,
    310     bounced: Option<u32>,
    311     now: &Timestamp,
    312 ) -> sqlx::Result<OutFailureResult> {
    313     sqlx::query(
    314         "
    315             SELECT out_new, out_initiated_id 
    316             FROM register_tx_out_failure($1, $2, $3)
    317         ",
    318     )
    319     .bind(code as i64)
    320     .bind(bounced.map(|i| i as i32))
    321     .bind_timestamp(now)
    322     .try_map(|r: PgRow| {
    323         Ok(OutFailureResult {
    324             new: r.try_get(0)?,
    325             initiated_id: r.try_get::<Option<i64>, _>(1)?.map(|i| i as u64),
    326         })
    327     })
    328     .fetch_one(db)
    329     .await
    330 }
    331 
    332 #[derive(Debug, PartialEq, Eq)]
    333 pub enum TransferResult {
    334     Success { id: u64, initiated_at: Timestamp },
    335     RequestUidReuse,
    336     WtidReuse,
    337 }
    338 
    339 pub async fn make_transfer<'a>(
    340     db: impl PgExecutor<'a>,
    341     req: &TransferRequest,
    342     creditor: &FullHuPayto,
    343     now: &Timestamp,
    344 ) -> sqlx::Result<TransferResult> {
    345     let subject = format!("{} {}", req.wtid, req.exchange_base_url);
    346     sqlx::query(
    347         "
    348             SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at
    349             FROM taler_transfer($1, $2, $3, ($4, $5)::taler_amount, $6, $7, $8, $9)
    350         ",
    351     )
    352     .bind(req.request_uid.as_ref())
    353     .bind(req.wtid.as_ref())
    354     .bind(&subject)
    355     .bind_amount(&req.amount)
    356     .bind(req.exchange_base_url.as_str())
    357     .bind(creditor.iban())
    358     .bind(&creditor.name)
    359     .bind_timestamp(now)
    360     .try_map(|r: PgRow| {
    361         Ok(if r.try_get(0)? {
    362             TransferResult::RequestUidReuse
    363         } else if r.try_get(1)? {
    364             TransferResult::WtidReuse
    365         } else {
    366             TransferResult::Success {
    367                 id: r.try_get_u64(2)?,
    368                 initiated_at: r.try_get_timestamp(3)?,
    369             }
    370         })
    371     })
    372     .fetch_one(db)
    373     .await
    374 }
    375 
    376 #[derive(Debug, PartialEq, Eq)]
    377 pub struct BounceResult {
    378     pub tx_id: u64,
    379     pub tx_new: bool,
    380     pub bounce_id: u64,
    381     pub bounce_new: bool,
    382 }
    383 
    384 pub async fn register_bounce_tx_in(
    385     db: &mut PgConnection,
    386     tx: &TxIn,
    387     amount: &Amount,
    388     reason: &str,
    389     now: &Timestamp,
    390 ) -> sqlx::Result<BounceResult> {
    391     sqlx::query(
    392         "
    393             SELECT out_tx_row_id, out_tx_new, out_bounce_row_id, out_bounce_new
    394             FROM register_bounce_tx_in($1, ($2, $3)::taler_amount, $4, $5, $6, $7, ($8, $9)::taler_amount, $10, $11)
    395         ",
    396     )
    397     .bind(tx.code as i64)
    398     .bind_amount(&tx.amount)
    399     .bind(&tx.subject)
    400     .bind(tx.debtor.iban())
    401     .bind(&tx.debtor.name)
    402      .bind_date(&tx.value_date)
    403     .bind_amount(amount)
    404     .bind(reason)
    405     .bind_timestamp(now)
    406     .try_map(|r: PgRow| {
    407         Ok(BounceResult {
    408             tx_id: r.try_get_u64(0)?,
    409             tx_new: r.try_get(1)?,
    410             bounce_id: r.try_get_u64(2)?,
    411             bounce_new: r.try_get(3)?,
    412         })
    413     })
    414     .fetch_one(db)
    415     .await
    416 }
    417 
    418 pub async fn transfer_page<'a>(
    419     db: impl PgExecutor<'a>,
    420     status: &Option<TransferState>,
    421     params: &Page,
    422 ) -> sqlx::Result<Vec<TransferListStatus>> {
    423     page(
    424         db,
    425         "initiated_id",
    426         params,
    427         || {
    428             let mut builder = QueryBuilder::new(
    429                 "
    430                     SELECT
    431                         initiated_id,
    432                         status,
    433                         (amount).val as amount_val,
    434                         (amount).frac as amount_frac,
    435                         credit_account,
    436                         credit_name,
    437                         initiated_at
    438                     FROM transfer 
    439                     JOIN initiated USING (initiated_id)
    440                     WHERE
    441                 ",
    442             );
    443             if let Some(status) = status {
    444                 builder.push(" status = ").push_bind(status).push(" AND ");
    445             }
    446             builder
    447         },
    448         |r: PgRow| {
    449             Ok(TransferListStatus {
    450                 row_id: r.try_get_safeu64(0)?,
    451                 status: r.try_get(1)?,
    452                 amount: r.try_get_amount_i(2, &CURRENCY)?,
    453                 credit_account: r.try_get_iban(4)?.as_full_payto(r.try_get(5)?),
    454                 timestamp: r.try_get_timestamp(6)?.into(),
    455             })
    456         },
    457     )
    458     .await
    459 }
    460 
    461 pub async fn outgoing_history(
    462     db: &PgPool,
    463     params: &History,
    464     listen: impl FnOnce() -> Receiver<i64>,
    465 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> {
    466     history(
    467         db,
    468         "tx_out_id",
    469         params,
    470         listen,
    471         || {
    472             QueryBuilder::new(
    473                 "
    474                 SELECT
    475                     tx_out_id,
    476                     (amount).val as amount_val,
    477                     (amount).frac as amount_frac,
    478                     credit_account,
    479                     credit_name,
    480                     valued_at,
    481                     exchange_base_url,
    482                     wtid
    483                 FROM taler_out
    484                 JOIN tx_out USING (tx_out_id)
    485                 WHERE
    486             ",
    487             )
    488         },
    489         |r: PgRow| {
    490             Ok(OutgoingBankTransaction {
    491                 row_id: r.try_get_safeu64(0)?,
    492                 amount: r.try_get_amount_i(1, &CURRENCY)?,
    493                 credit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?),
    494                 date: r.try_get_timestamp(5)?.into(),
    495                 exchange_base_url: r.try_get_url(6)?,
    496                 wtid: r.try_get_base32(7)?,
    497             })
    498         },
    499     )
    500     .await
    501 }
    502 
    503 pub async fn incoming_history(
    504     db: &PgPool,
    505     params: &History,
    506     listen: impl FnOnce() -> Receiver<i64>,
    507 ) -> sqlx::Result<Vec<IncomingBankTransaction>> {
    508     history(
    509         db,
    510         "tx_in_id",
    511         params,
    512         listen,
    513         || {
    514             QueryBuilder::new(
    515                 "
    516                 SELECT
    517                     type,
    518                     tx_in_id,
    519                     (amount).val as amount_val,
    520                     (amount).frac as amount_frac,
    521                     debit_account,
    522                     debit_name,
    523                     valued_at,
    524                     metadata
    525                 FROM taler_in
    526                 JOIN tx_in USING (tx_in_id)
    527                 WHERE
    528             ",
    529             )
    530         },
    531         |r: PgRow| {
    532             Ok(match r.try_get(0)? {
    533                 IncomingType::reserve => IncomingBankTransaction::Reserve {
    534                     row_id: r.try_get_safeu64(1)?,
    535                     amount: r.try_get_amount_i(2, &CURRENCY)?,
    536                     debit_account: r.try_get_iban(4)?.as_full_payto(r.try_get(5)?),
    537                     date: r.try_get_timestamp(6)?.into(),
    538                     reserve_pub: r.try_get_base32(7)?,
    539                 },
    540                 IncomingType::kyc => IncomingBankTransaction::Kyc {
    541                     row_id: r.try_get_safeu64(1)?,
    542                     amount: r.try_get_amount_i(2, &CURRENCY)?,
    543                     debit_account: r.try_get_iban(4)?.as_full_payto(r.try_get(5)?),
    544                     date: r.try_get_timestamp(6)?.into(),
    545                     account_pub: r.try_get_base32(7)?,
    546                 },
    547                 IncomingType::wad => {
    548                     unimplemented!("WAD is not yet supported")
    549                 }
    550             })
    551         },
    552     )
    553     .await
    554 }
    555 
    556 pub async fn revenue_history(
    557     db: &PgPool,
    558     params: &History,
    559     listen: impl FnOnce() -> Receiver<i64>,
    560 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> {
    561     history(
    562         db,
    563         "tx_in_id",
    564         params,
    565         listen,
    566         || {
    567             QueryBuilder::new(
    568                 "
    569                 SELECT
    570                     tx_in_id,
    571                     valued_at,
    572                     (amount).val as amount_val,
    573                     (amount).frac as amount_frac,
    574                     debit_account,
    575                     debit_name,
    576                     subject
    577                 FROM tx_in
    578                 WHERE
    579             ",
    580             )
    581         },
    582         |r: PgRow| {
    583             Ok(RevenueIncomingBankTransaction {
    584                 row_id: r.try_get_safeu64(0)?,
    585                 date: r.try_get_timestamp(1)?.into(),
    586                 amount: r.try_get_amount_i(2, &CURRENCY)?,
    587                 credit_fee: None,
    588                 debit_account: r.try_get_iban(4)?.as_full_payto(r.try_get(5)?),
    589                 subject: r.try_get(6)?,
    590             })
    591         },
    592     )
    593     .await
    594 }
    595 
    596 pub async fn transfer_by_id<'a>(
    597     db: impl PgExecutor<'a>,
    598     id: u64,
    599 ) -> sqlx::Result<Option<TransferStatus>> {
    600     sqlx::query(
    601         "
    602             SELECT
    603                 status,
    604                 status_msg,
    605                 (amount).val as amount_val,
    606                 (amount).frac as amount_frac,
    607                 exchange_base_url,
    608                 wtid,
    609                 credit_account,
    610                 credit_name,
    611                 initiated_at
    612             FROM transfer 
    613             JOIN initiated USING (initiated_id) 
    614             WHERE initiated_id = $1
    615         ",
    616     )
    617     .bind(id as i64)
    618     .try_map(|r: PgRow| {
    619         Ok(TransferStatus {
    620             status: r.try_get(0)?,
    621             status_msg: r.try_get(1)?,
    622             amount: r.try_get_amount_i(2, &CURRENCY)?,
    623             origin_exchange_url: r.try_get(4)?,
    624             wtid: r.try_get_base32(5)?,
    625             credit_account: r.try_get_iban(6)?.as_full_payto(r.try_get(7)?),
    626             timestamp: r.try_get_timestamp(8)?.into(),
    627         })
    628     })
    629     .fetch_optional(db)
    630     .await
    631 }
    632 
    633 /** Get a batch of pending initiated transactions not attempted since [start] */
    634 pub async fn pending_batch<'a>(
    635     db: impl PgExecutor<'a>,
    636     start: &Timestamp,
    637 ) -> sqlx::Result<Vec<Initiated>> {
    638     sqlx::query(
    639         "
    640             SELECT initiated_id, (amount).val, (amount).frac, subject, credit_account, credit_name
    641             FROM initiated 
    642             WHERE magnet_code IS NULL
    643                 AND status='pending'
    644                 AND (last_submitted IS NULL OR last_submitted < $1)
    645             LIMIT 100
    646         ",
    647     )
    648     .bind_timestamp(start)
    649     .try_map(|r: PgRow| {
    650         Ok(Initiated {
    651             id: r.try_get_u64(0)?,
    652             amount: r.try_get_amount_i(1, &CURRENCY)?,
    653             subject: r.try_get(3)?,
    654             creditor: FullHuPayto::new(r.try_get_parse(4)?, r.try_get(5)?),
    655         })
    656     })
    657     .fetch_all(db)
    658     .await
    659 }
    660 
    661 /** Get an initiated transaction matching the given magnet [code] */
    662 pub async fn initiated_by_code<'a>(
    663     db: impl PgExecutor<'a>,
    664     code: u64,
    665 ) -> sqlx::Result<Option<Initiated>> {
    666     sqlx::query(
    667         "
    668             SELECT initiated_id, (amount).val, (amount).frac, subject, credit_account, credit_name
    669             FROM initiated 
    670             WHERE magnet_code IS $1
    671         ",
    672     )
    673     .bind(code as i64)
    674     .try_map(|r: PgRow| {
    675         Ok(Initiated {
    676             id: r.try_get_u64(0)?,
    677             amount: r.try_get_amount_i(1, &CURRENCY)?,
    678             subject: r.try_get(3)?,
    679             creditor: FullHuPayto::new(r.try_get_parse(4)?, r.try_get(5)?),
    680         })
    681     })
    682     .fetch_optional(db)
    683     .await
    684 }
    685 
    686 /** Update status of a successful submitted initiated transaction */
    687 pub async fn initiated_submit_success<'a>(
    688     db: impl PgExecutor<'a>,
    689     id: u64,
    690     timestamp: &Timestamp,
    691     magnet_code: u64,
    692 ) -> sqlx::Result<()> {
    693     sqlx::query(
    694         "
    695             UPDATE initiated
    696             SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, magnet_code=$2
    697             WHERE initiated_id=$3
    698         "
    699     ).bind_timestamp(timestamp)
    700     .bind(magnet_code as i64)
    701     .bind(id as i64)
    702     .execute(db).await?;
    703     Ok(())
    704 }
    705 
    706 /** Update status of a permanently failed initiated transaction */
    707 pub async fn initiated_submit_permanent_failure<'a>(
    708     db: impl PgExecutor<'a>,
    709     id: u64,
    710     timestamp: &Timestamp,
    711     msg: &str,
    712 ) -> sqlx::Result<()> {
    713     sqlx::query(
    714         "
    715             UPDATE initiated
    716             SET status='permanent_failure', status_msg=$2
    717             WHERE initiated_id=$3
    718         ",
    719     )
    720     .bind_timestamp(timestamp)
    721     .bind(msg)
    722     .bind(id as i64)
    723     .execute(db)
    724     .await?;
    725     Ok(())
    726 }
    727 
    728 /** Check if an initiated transaction exist for a magnet code */
    729 pub async fn initiated_exists_for_code<'a>(
    730     db: impl PgExecutor<'a>,
    731     code: u64,
    732 ) -> sqlx::Result<Option<u64>> {
    733     sqlx::query("SELECT initiated_id FROM initiated WHERE magnet_code=$1")
    734         .bind(code as i64)
    735         .try_map(|r| Ok(r.try_get::<i64, _>(0)? as u64))
    736         .fetch_optional(db)
    737         .await
    738 }
    739 
    740 /** Get JSON value from KV table */
    741 pub async fn kv_get<'a, T: DeserializeOwned + Unpin + Send>(
    742     db: impl PgExecutor<'a>,
    743     key: &str,
    744 ) -> sqlx::Result<Option<T>> {
    745     sqlx::query("SELECT value FROM kv WHERE key=$1")
    746         .bind(key)
    747         .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0))
    748         .fetch_optional(db)
    749         .await
    750 }
    751 
    752 /** Set JSON value in KV table */
    753 pub async fn kv_set<'a, T: Serialize>(
    754     db: impl PgExecutor<'a>,
    755     key: &str,
    756     value: &T,
    757 ) -> sqlx::Result<()> {
    758     sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value")
    759         .bind(key)
    760         .bind(sqlx::types::Json(value))
    761         .execute(db)
    762         .await?;
    763     Ok(())
    764 }
    765 
    766 #[cfg(test)]
    767 mod test {
    768     use jiff::{Span, Timestamp, Zoned};
    769     use serde_json::json;
    770     use sqlx::{PgConnection, PgPool, postgres::PgRow};
    771     use taler_api::{
    772         db::TypeHelper,
    773         subject::{IncomingSubject, OutgoingSubject},
    774     };
    775     use taler_common::{
    776         api_common::{EddsaPublicKey, HashCode, ShortHashCode},
    777         api_params::{History, Page},
    778         api_wire::TransferRequest,
    779         types::{amount::amount, payto::payto, url, utils::now_sql_stable_timestamp},
    780     };
    781     use tokio::sync::watch::Receiver;
    782 
    783     use crate::{
    784         constants::{CONFIG_SOURCE, CURRENCY},
    785         db::{
    786             self, AddIncomingResult, AddOutgoingResult, BounceResult, Initiated, OutFailureResult,
    787             TransferResult, TxIn, TxOut, TxOutKind, kv_get, kv_set, make_transfer,
    788             register_bounce_tx_in, register_tx_in, register_tx_in_admin, register_tx_out,
    789         },
    790         magnet_api::types::TxStatus,
    791         magnet_payto,
    792     };
    793 
    794     use super::TxInAdmin;
    795 
    796     fn fake_listen<T: Default>() -> Receiver<T> {
    797         tokio::sync::watch::channel(T::default()).1
    798     }
    799 
    800     async fn setup() -> (PgConnection, PgPool) {
    801         let pool = taler_test_utils::db_test_setup(CONFIG_SOURCE).await;
    802         let conn = pool.acquire().await.unwrap().leak();
    803         (conn, pool)
    804     }
    805 
    806     #[tokio::test]
    807     async fn kv() {
    808         let (mut db, _) = setup().await;
    809 
    810         let value = json!({
    811             "name": "Mr Smith",
    812             "no way": 32
    813         });
    814 
    815         assert_eq!(
    816             kv_get::<serde_json::Value>(&mut db, "value").await.unwrap(),
    817             None
    818         );
    819         kv_set(&mut db, "value", &value).await.unwrap();
    820         kv_set(&mut db, "value", &value).await.unwrap();
    821         assert_eq!(
    822             kv_get::<serde_json::Value>(&mut db, "value").await.unwrap(),
    823             Some(value)
    824         );
    825     }
    826 
    827     #[tokio::test]
    828     async fn tx_in() {
    829         let (mut db, pool) = setup().await;
    830 
    831         async fn routine(
    832             db: &mut PgConnection,
    833             first: &Option<IncomingSubject>,
    834             second: &Option<IncomingSubject>,
    835         ) {
    836             let (id, code) =
    837                 sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_in")
    838                     .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?)))
    839                     .fetch_one(&mut *db)
    840                     .await
    841                     .unwrap();
    842             let now = now_sql_stable_timestamp();
    843             let date = Zoned::now().date();
    844             let later = date.tomorrow().unwrap();
    845             let tx = TxIn {
    846                 code: code,
    847                 amount: amount("EUR:10"),
    848                 subject: "subject".to_owned(),
    849                 debtor: magnet_payto(
    850                     "payto://iban/HU30162000031000163100000000?receiver-name=name",
    851                 ),
    852                 value_date: date,
    853                 status: TxStatus::Completed,
    854             };
    855             // Insert
    856             assert_eq!(
    857                 register_tx_in(db, &tx, &first, &now)
    858                     .await
    859                     .expect("register tx in"),
    860                 AddIncomingResult::Success {
    861                     new: true,
    862                     row_id: id,
    863                     valued_at: date
    864                 }
    865             );
    866             // Idempotent
    867             assert_eq!(
    868                 register_tx_in(
    869                     db,
    870                     &TxIn {
    871                         value_date: later,
    872                         ..tx.clone()
    873                     },
    874                     &first,
    875                     &now
    876                 )
    877                 .await
    878                 .expect("register tx in"),
    879                 AddIncomingResult::Success {
    880                     new: false,
    881                     row_id: id,
    882                     valued_at: date
    883                 }
    884             );
    885             // Many
    886             assert_eq!(
    887                 register_tx_in(
    888                     db,
    889                     &TxIn {
    890                         code: code + 1,
    891                         value_date: later,
    892                         ..tx
    893                     },
    894                     &second,
    895                     &now
    896                 )
    897                 .await
    898                 .expect("register tx in"),
    899                 AddIncomingResult::Success {
    900                     new: true,
    901                     row_id: id + 1,
    902                     valued_at: later
    903                 }
    904             );
    905         }
    906 
    907         // Empty db
    908         assert_eq!(
    909             db::revenue_history(&pool, &History::default(), fake_listen)
    910                 .await
    911                 .unwrap(),
    912             Vec::new()
    913         );
    914         assert_eq!(
    915             db::incoming_history(&pool, &History::default(), fake_listen)
    916                 .await
    917                 .unwrap(),
    918             Vec::new()
    919         );
    920 
    921         // Regular transaction
    922         routine(&mut db, &None, &None).await;
    923 
    924         // Reserve transaction
    925         routine(
    926             &mut db,
    927             &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())),
    928             &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())),
    929         )
    930         .await;
    931 
    932         // Kyc transaction
    933         routine(
    934             &mut db,
    935             &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())),
    936             &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())),
    937         )
    938         .await;
    939 
    940         // History
    941         assert_eq!(
    942             db::revenue_history(&pool, &History::default(), fake_listen)
    943                 .await
    944                 .unwrap()
    945                 .len(),
    946             6
    947         );
    948         assert_eq!(
    949             db::incoming_history(&pool, &History::default(), fake_listen)
    950                 .await
    951                 .unwrap()
    952                 .len(),
    953             4
    954         );
    955     }
    956 
    957     #[tokio::test]
    958     async fn tx_in_admin() {
    959         let (_, pool) = setup().await;
    960 
    961         // Empty db
    962         assert_eq!(
    963             db::incoming_history(&pool, &History::default(), fake_listen)
    964                 .await
    965                 .unwrap(),
    966             Vec::new()
    967         );
    968 
    969         let now = now_sql_stable_timestamp();
    970         let later = now + Span::new().hours(2);
    971         let date = Zoned::now().date();
    972         let tx = TxInAdmin {
    973             amount: amount("EUR:10"),
    974             subject: "subject".to_owned(),
    975             debtor: magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"),
    976             metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
    977         };
    978         // Insert
    979         assert_eq!(
    980             register_tx_in_admin(&pool, &tx, &now)
    981                 .await
    982                 .expect("register tx in"),
    983             AddIncomingResult::Success {
    984                 new: true,
    985                 row_id: 1,
    986                 valued_at: date
    987             }
    988         );
    989         // Idempotent
    990         assert_eq!(
    991             register_tx_in_admin(&pool, &tx, &later)
    992                 .await
    993                 .expect("register tx in"),
    994             AddIncomingResult::Success {
    995                 new: false,
    996                 row_id: 1,
    997                 valued_at: date
    998             }
    999         );
   1000         // Many
   1001         assert_eq!(
   1002             register_tx_in_admin(
   1003                 &pool,
   1004                 &TxInAdmin {
   1005                     subject: "Other".to_owned(),
   1006                     metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
   1007                     ..tx.clone()
   1008                 },
   1009                 &later
   1010             )
   1011             .await
   1012             .expect("register tx in"),
   1013             AddIncomingResult::Success {
   1014                 new: true,
   1015                 row_id: 2,
   1016                 valued_at: date
   1017             }
   1018         );
   1019 
   1020         // History
   1021         assert_eq!(
   1022             db::incoming_history(&pool, &History::default(), fake_listen)
   1023                 .await
   1024                 .unwrap()
   1025                 .len(),
   1026             2
   1027         );
   1028     }
   1029 
   1030     #[tokio::test]
   1031     async fn tx_out() {
   1032         let (mut db, pool) = setup().await;
   1033 
   1034         async fn routine(db: &mut PgConnection, first: &TxOutKind, second: &TxOutKind) {
   1035             let (id, code) =
   1036                 sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_out")
   1037                     .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?)))
   1038                     .fetch_one(&mut *db)
   1039                     .await
   1040                     .unwrap();
   1041             let now = now_sql_stable_timestamp();
   1042             let date = Zoned::now().date();
   1043             let later = date.tomorrow().unwrap();
   1044             let tx = TxOut {
   1045                 code,
   1046                 amount: amount("HUF:10"),
   1047                 subject: "subject".to_owned(),
   1048                 creditor: magnet_payto(
   1049                     "payto://iban/HU30162000031000163100000000?receiver-name=name",
   1050                 ),
   1051                 value_date: date,
   1052                 status: TxStatus::Completed,
   1053             };
   1054             assert!(matches!(
   1055                 make_transfer(
   1056                     &mut *db,
   1057                     &TransferRequest {
   1058                         request_uid: HashCode::rand(),
   1059                         amount: amount("HUF:10"),
   1060                         exchange_base_url: url("https://exchange.test.com/"),
   1061                         wtid: ShortHashCode::rand(),
   1062                         credit_account: payto(
   1063                             "payto://iban/HU02162000031000164800000000?receiver-name=name"
   1064                         ),
   1065                     },
   1066                     &tx.creditor,
   1067                     &now
   1068                 )
   1069                 .await
   1070                 .unwrap(),
   1071                 TransferResult::Success { .. }
   1072             ));
   1073             db::initiated_submit_success(&mut *db, 1, &Timestamp::now(), tx.code)
   1074                 .await
   1075                 .expect("status success");
   1076 
   1077             // Insert
   1078             assert_eq!(
   1079                 register_tx_out(&mut *db, &tx, first, &now)
   1080                     .await
   1081                     .expect("register tx out"),
   1082                 AddOutgoingResult {
   1083                     result: db::RegisterResult::known,
   1084                     row_id: id,
   1085                 }
   1086             );
   1087             // Idempotent
   1088             assert_eq!(
   1089                 register_tx_out(
   1090                     &mut *db,
   1091                     &TxOut {
   1092                         value_date: later,
   1093                         ..tx.clone()
   1094                     },
   1095                     first,
   1096                     &now
   1097                 )
   1098                 .await
   1099                 .expect("register tx out"),
   1100                 AddOutgoingResult {
   1101                     result: db::RegisterResult::idempotent,
   1102                     row_id: id,
   1103                 }
   1104             );
   1105             // Recovered
   1106             assert_eq!(
   1107                 register_tx_out(
   1108                     &mut *db,
   1109                     &TxOut {
   1110                         code: code + 1,
   1111                         value_date: later,
   1112                         ..tx.clone()
   1113                     },
   1114                     second,
   1115                     &now
   1116                 )
   1117                 .await
   1118                 .expect("register tx out"),
   1119                 AddOutgoingResult {
   1120                     result: db::RegisterResult::recovered,
   1121                     row_id: id + 1,
   1122                 }
   1123             );
   1124         }
   1125 
   1126         // Empty db
   1127         assert_eq!(
   1128             db::outgoing_history(&pool, &History::default(), fake_listen)
   1129                 .await
   1130                 .unwrap(),
   1131             Vec::new()
   1132         );
   1133 
   1134         // Regular transaction
   1135         routine(&mut db, &TxOutKind::Simple, &TxOutKind::Simple).await;
   1136 
   1137         // Talerable transaction
   1138         routine(
   1139             &mut db,
   1140             &TxOutKind::Talerable(OutgoingSubject(
   1141                 ShortHashCode::rand(),
   1142                 url("https://exchange.com"),
   1143             )),
   1144             &TxOutKind::Talerable(OutgoingSubject(
   1145                 ShortHashCode::rand(),
   1146                 url("https://exchange.com"),
   1147             )),
   1148         )
   1149         .await;
   1150 
   1151         // Bounced transaction
   1152         routine(&mut db, &TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await;
   1153 
   1154         // History
   1155         assert_eq!(
   1156             db::outgoing_history(&pool, &History::default(), fake_listen)
   1157                 .await
   1158                 .unwrap()
   1159                 .len(),
   1160             2
   1161         );
   1162     }
   1163 
   1164     #[tokio::test]
   1165     async fn tx_out_failure() {
   1166         let (mut db, _) = setup().await;
   1167 
   1168         let now = now_sql_stable_timestamp();
   1169 
   1170         // Unknown
   1171         assert_eq!(
   1172             db::register_tx_out_failure(&mut db, 42, None, &now)
   1173                 .await
   1174                 .unwrap(),
   1175             OutFailureResult {
   1176                 initiated_id: None,
   1177                 new: false
   1178             }
   1179         );
   1180         assert_eq!(
   1181             db::register_tx_out_failure(&mut db, 42, Some(12), &now)
   1182                 .await
   1183                 .unwrap(),
   1184             OutFailureResult {
   1185                 initiated_id: None,
   1186                 new: false
   1187             }
   1188         );
   1189 
   1190         // Initiated
   1191         let req = TransferRequest {
   1192             request_uid: HashCode::rand(),
   1193             amount: amount("HUF:10"),
   1194             exchange_base_url: url("https://exchange.test.com/"),
   1195             wtid: ShortHashCode::rand(),
   1196             credit_account: payto("payto://iban/HU02162000031000164800000000?receiver-name=name"),
   1197         };
   1198         let payto = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name");
   1199         assert_eq!(
   1200             make_transfer(&mut db, &req, &payto, &now).await.unwrap(),
   1201             TransferResult::Success {
   1202                 id: 1,
   1203                 initiated_at: now
   1204             }
   1205         );
   1206         db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 34)
   1207             .await
   1208             .expect("status success");
   1209         assert_eq!(
   1210             db::register_tx_out_failure(&mut db, 34, None, &now)
   1211                 .await
   1212                 .unwrap(),
   1213             OutFailureResult {
   1214                 initiated_id: Some(1),
   1215                 new: true
   1216             }
   1217         );
   1218         assert_eq!(
   1219             db::register_tx_out_failure(&mut db, 34, None, &now)
   1220                 .await
   1221                 .unwrap(),
   1222             OutFailureResult {
   1223                 initiated_id: Some(1),
   1224                 new: false
   1225             }
   1226         );
   1227 
   1228         // Recovered bounce
   1229         let tx = TxIn {
   1230             code: 12,
   1231             amount: amount("HUF:11"),
   1232             subject: "malformed transaction".to_owned(),
   1233             debtor: payto,
   1234             value_date: Zoned::now().date(),
   1235             status: TxStatus::Completed,
   1236         };
   1237         assert_eq!(
   1238             db::register_bounce_tx_in(&mut db, &tx, &tx.amount, "no reason", &now)
   1239                 .await
   1240                 .unwrap(),
   1241             BounceResult {
   1242                 tx_id: 1,
   1243                 tx_new: true,
   1244                 bounce_id: 2,
   1245                 bounce_new: true
   1246             }
   1247         );
   1248         assert_eq!(
   1249             db::register_tx_out_failure(&mut db, 10, Some(12), &now)
   1250                 .await
   1251                 .unwrap(),
   1252             OutFailureResult {
   1253                 initiated_id: Some(2),
   1254                 new: true
   1255             }
   1256         );
   1257         assert_eq!(
   1258             db::register_tx_out_failure(&mut db, 10, Some(12), &now)
   1259                 .await
   1260                 .unwrap(),
   1261             OutFailureResult {
   1262                 initiated_id: Some(2),
   1263                 new: false
   1264             }
   1265         );
   1266     }
   1267 
   1268     #[tokio::test]
   1269     async fn transfer() {
   1270         let (mut db, _) = setup().await;
   1271 
   1272         // Empty db
   1273         assert_eq!(db::transfer_by_id(&mut db, 0).await.unwrap(), None);
   1274         assert_eq!(
   1275             db::transfer_page(&mut db, &None, &Page::default())
   1276                 .await
   1277                 .unwrap(),
   1278             Vec::new()
   1279         );
   1280 
   1281         let req = TransferRequest {
   1282             request_uid: HashCode::rand(),
   1283             amount: amount("HUF:10"),
   1284             exchange_base_url: url("https://exchange.test.com/"),
   1285             wtid: ShortHashCode::rand(),
   1286             credit_account: payto("payto://iban/HU02162000031000164800000000?receiver-name=name"),
   1287         };
   1288         let payto = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name");
   1289         let now = now_sql_stable_timestamp();
   1290         let later = now + Span::new().hours(2);
   1291         // Insert
   1292         assert_eq!(
   1293             make_transfer(&mut db, &req, &payto, &now)
   1294                 .await
   1295                 .expect("transfer"),
   1296             TransferResult::Success {
   1297                 id: 1,
   1298                 initiated_at: now.into()
   1299             }
   1300         );
   1301         // Idempotent
   1302         assert_eq!(
   1303             make_transfer(&mut db, &req, &payto, &later)
   1304                 .await
   1305                 .expect("transfer"),
   1306             TransferResult::Success {
   1307                 id: 1,
   1308                 initiated_at: now.into()
   1309             }
   1310         );
   1311         // Request UID reuse
   1312         assert_eq!(
   1313             make_transfer(
   1314                 &mut db,
   1315                 &TransferRequest {
   1316                     wtid: ShortHashCode::rand(),
   1317                     ..req.clone()
   1318                 },
   1319                 &payto,
   1320                 &now
   1321             )
   1322             .await
   1323             .expect("transfer"),
   1324             TransferResult::RequestUidReuse
   1325         );
   1326         // wtid reuse
   1327         assert_eq!(
   1328             make_transfer(
   1329                 &mut db,
   1330                 &TransferRequest {
   1331                     request_uid: HashCode::rand(),
   1332                     ..req.clone()
   1333                 },
   1334                 &payto,
   1335                 &now
   1336             )
   1337             .await
   1338             .expect("transfer"),
   1339             TransferResult::WtidReuse
   1340         );
   1341         // Many
   1342         assert_eq!(
   1343             make_transfer(
   1344                 &mut db,
   1345                 &TransferRequest {
   1346                     request_uid: HashCode::rand(),
   1347                     wtid: ShortHashCode::rand(),
   1348                     ..req
   1349                 },
   1350                 &payto,
   1351                 &later
   1352             )
   1353             .await
   1354             .expect("transfer"),
   1355             TransferResult::Success {
   1356                 id: 2,
   1357                 initiated_at: later.into()
   1358             }
   1359         );
   1360 
   1361         // Get
   1362         assert!(db::transfer_by_id(&mut db, 1).await.unwrap().is_some());
   1363         assert!(db::transfer_by_id(&mut db, 2).await.unwrap().is_some());
   1364         assert!(db::transfer_by_id(&mut db, 3).await.unwrap().is_none());
   1365         assert_eq!(
   1366             db::transfer_page(&mut db, &None, &Page::default())
   1367                 .await
   1368                 .unwrap()
   1369                 .len(),
   1370             2
   1371         );
   1372     }
   1373 
   1374     #[tokio::test]
   1375     async fn bounce() {
   1376         let (mut db, _) = setup().await;
   1377 
   1378         let amount = amount("HUF:10");
   1379         let payto = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name");
   1380         let now = now_sql_stable_timestamp();
   1381         let date = Zoned::now().date();
   1382 
   1383         // Empty db
   1384         assert!(db::pending_batch(&mut db, &now).await.unwrap().is_empty());
   1385 
   1386         // Insert
   1387         assert_eq!(
   1388             register_tx_in(
   1389                 &mut db,
   1390                 &TxIn {
   1391                     code: 13,
   1392                     amount: amount.clone(),
   1393                     subject: "subject".to_owned(),
   1394                     debtor: payto.clone(),
   1395                     value_date: date,
   1396                     status: TxStatus::Completed
   1397                 },
   1398                 &None,
   1399                 &now
   1400             )
   1401             .await
   1402             .expect("register tx in"),
   1403             AddIncomingResult::Success {
   1404                 new: true,
   1405                 row_id: 1,
   1406                 valued_at: date
   1407             }
   1408         );
   1409 
   1410         // Bounce
   1411         assert_eq!(
   1412             register_bounce_tx_in(
   1413                 &mut db,
   1414                 &TxIn {
   1415                     code: 12,
   1416                     amount: amount.clone(),
   1417                     subject: "subject".to_owned(),
   1418                     debtor: payto.clone(),
   1419                     value_date: date,
   1420                     status: TxStatus::Completed
   1421                 },
   1422                 &amount,
   1423                 "good reason",
   1424                 &now
   1425             )
   1426             .await
   1427             .expect("bounce"),
   1428             BounceResult {
   1429                 tx_id: 2,
   1430                 tx_new: true,
   1431                 bounce_id: 1,
   1432                 bounce_new: true
   1433             }
   1434         );
   1435         // Idempotent
   1436         assert_eq!(
   1437             register_bounce_tx_in(
   1438                 &mut db,
   1439                 &TxIn {
   1440                     code: 12,
   1441                     amount: amount.clone(),
   1442                     subject: "subject".to_owned(),
   1443                     debtor: payto.clone(),
   1444                     value_date: date,
   1445                     status: TxStatus::Completed
   1446                 },
   1447                 &amount,
   1448                 "good reason",
   1449                 &now
   1450             )
   1451             .await
   1452             .expect("bounce"),
   1453             BounceResult {
   1454                 tx_id: 2,
   1455                 tx_new: false,
   1456                 bounce_id: 1,
   1457                 bounce_new: false
   1458             }
   1459         );
   1460 
   1461         // Bounce registered
   1462         assert_eq!(
   1463             register_bounce_tx_in(
   1464                 &mut db,
   1465                 &TxIn {
   1466                     code: 13,
   1467                     amount: amount.clone(),
   1468                     subject: "subject".to_owned(),
   1469                     debtor: payto.clone(),
   1470                     value_date: date,
   1471                     status: TxStatus::Completed
   1472                 },
   1473                 &amount,
   1474                 "good reason",
   1475                 &now
   1476             )
   1477             .await
   1478             .expect("bounce"),
   1479             BounceResult {
   1480                 tx_id: 1,
   1481                 tx_new: false,
   1482                 bounce_id: 2,
   1483                 bounce_new: true
   1484             }
   1485         );
   1486         // Idempotent registered
   1487         assert_eq!(
   1488             register_bounce_tx_in(
   1489                 &mut db,
   1490                 &TxIn {
   1491                     code: 13,
   1492                     amount: amount.clone(),
   1493                     subject: "subject".to_owned(),
   1494                     debtor: payto.clone(),
   1495                     value_date: date,
   1496                     status: TxStatus::Completed
   1497                 },
   1498                 &amount,
   1499                 "good reason",
   1500                 &now
   1501             )
   1502             .await
   1503             .expect("bounce"),
   1504             BounceResult {
   1505                 tx_id: 1,
   1506                 tx_new: false,
   1507                 bounce_id: 2,
   1508                 bounce_new: false
   1509             }
   1510         );
   1511 
   1512         // Batch
   1513         assert_eq!(
   1514             db::pending_batch(&mut db, &now).await.unwrap(),
   1515             &[
   1516                 Initiated {
   1517                     id: 1,
   1518                     amount: amount.clone(),
   1519                     subject: "bounce: 12".to_owned(),
   1520                     creditor: payto.clone()
   1521                 },
   1522                 Initiated {
   1523                     id: 2,
   1524                     amount,
   1525                     subject: "bounce: 13".to_owned(),
   1526                     creditor: payto
   1527                 }
   1528             ]
   1529         );
   1530     }
   1531 
   1532     #[tokio::test]
   1533     async fn status() {
   1534         let (mut db, _) = setup().await;
   1535 
   1536         // Unknown transfer
   1537         db::initiated_submit_permanent_failure(&mut db, 1, &Timestamp::now(), "msg")
   1538             .await
   1539             .unwrap();
   1540         db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12)
   1541             .await
   1542             .unwrap();
   1543     }
   1544 
   1545     #[tokio::test]
   1546     async fn batch() {
   1547         let (mut db, _) = setup().await;
   1548         let start = Timestamp::now();
   1549         let magnet_payto =
   1550             magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name");
   1551 
   1552         // Empty db
   1553         let pendings = db::pending_batch(&mut db, &start)
   1554             .await
   1555             .expect("pending_batch");
   1556         assert_eq!(pendings.len(), 0);
   1557 
   1558         // Some transfers
   1559         for i in 0..3 {
   1560             make_transfer(
   1561                 &mut db,
   1562                 &TransferRequest {
   1563                     request_uid: HashCode::rand(),
   1564                     amount: amount(format!("{}:{}", *CURRENCY, i + 1)),
   1565                     exchange_base_url: url("https://exchange.test.com/"),
   1566                     wtid: ShortHashCode::rand(),
   1567                     credit_account: payto(
   1568                         "payto://iban/HU02162000031000164800000000?receiver-name=name",
   1569                     ),
   1570                 },
   1571                 &magnet_payto,
   1572                 &Timestamp::now(),
   1573             )
   1574             .await
   1575             .expect("transfer");
   1576         }
   1577         let pendings = db::pending_batch(&mut db, &start)
   1578             .await
   1579             .expect("pending_batch");
   1580         assert_eq!(pendings.len(), 3);
   1581 
   1582         // Max 100 txs in batch
   1583         for i in 0..100 {
   1584             make_transfer(
   1585                 &mut db,
   1586                 &TransferRequest {
   1587                     request_uid: HashCode::rand(),
   1588                     amount: amount(format!("{}:{}", *CURRENCY, i + 1)),
   1589                     exchange_base_url: url("https://exchange.test.com/"),
   1590                     wtid: ShortHashCode::rand(),
   1591                     credit_account: payto(
   1592                         "payto://iban/HU02162000031000164800000000?receiver-name=name",
   1593                     ),
   1594                 },
   1595                 &magnet_payto,
   1596                 &Timestamp::now(),
   1597             )
   1598             .await
   1599             .expect("transfer");
   1600         }
   1601         let pendings = db::pending_batch(&mut db, &start)
   1602             .await
   1603             .expect("pending_batch");
   1604         assert_eq!(pendings.len(), 100);
   1605 
   1606         // Skip uploaded
   1607         for i in 0..=10 {
   1608             db::initiated_submit_success(&mut db, i, &Timestamp::now(), i)
   1609                 .await
   1610                 .expect("status success");
   1611         }
   1612         let pendings = db::pending_batch(&mut db, &start)
   1613             .await
   1614             .expect("pending_batch");
   1615         assert_eq!(pendings.len(), 93);
   1616 
   1617         // Skip failed
   1618         for i in 0..=10 {
   1619             db::initiated_submit_permanent_failure(&mut db, 10 + i, &Timestamp::now(), "failure")
   1620                 .await
   1621                 .expect("status failure");
   1622         }
   1623         let pendings = db::pending_batch(&mut db, &start)
   1624             .await
   1625             .expect("pending_batch");
   1626         assert_eq!(pendings.len(), 83);
   1627     }
   1628 }