taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

db.rs (44717B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::fmt::Display;
     18 
     19 use jiff::Timestamp;
     20 use serde::{Serialize, de::DeserializeOwned};
     21 use sqlx::{PgConnection, PgExecutor, PgPool, QueryBuilder, Row, postgres::PgRow};
     22 use taler_api::{
     23     db::{BindHelper, IncomingType, TypeHelper, history, page},
     24     subject::{IncomingSubject, OutgoingSubject},
     25 };
     26 use taler_common::{
     27     api_params::{History, Page},
     28     api_revenue::RevenueIncomingBankTransaction,
     29     api_wire::{
     30         IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest,
     31         TransferState, TransferStatus,
     32     },
     33     types::{
     34         amount::{Currency, Decimal},
     35         payto::{PaytoImpl as _, PaytoURI},
     36     },
     37 };
     38 use tokio::sync::watch::Receiver;
     39 
     40 use crate::{CyclosId, FullCyclosPayto};
     41 
/// Incoming transaction, as passed to [`register_tx_in`] and [`register_bounced_tx_in`].
#[derive(Debug, Clone)]
pub struct TxIn {
    /// Transfer identifier, bound as the first argument of the SQL functions
    pub transfer_id: u64,
    /// Optional transaction identifier, bound as `NULL` when absent
    pub tx_id: Option<u64>,
    /// Transferred amount, without currency
    pub amount: Decimal,
    /// Raw transfer subject
    pub subject: String,
    /// Account and name of the debtor
    pub debtor: FullCyclosPayto,
    /// Value date of the transaction
    pub valued_at: Timestamp,
}
     51 
     52 impl Display for TxIn {
     53     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     54         let Self {
     55             transfer_id,
     56             tx_id,
     57             amount,
     58             subject,
     59             debtor,
     60             valued_at,
     61         } = self;
     62         let tx_id = match tx_id {
     63             Some(id) => format_args!(":{}", *id),
     64             None => format_args!(""),
     65         };
     66         write!(
     67             f,
     68             "{valued_at} {transfer_id}{tx_id} {amount} ({} {}) '{subject}'",
     69             debtor.0, debtor.name
     70         )
     71     }
     72 }
     73 
/// Outgoing transaction, as passed to [`register_tx_out`].
#[derive(Debug, Clone)]
pub struct TxOut {
    /// Transfer identifier, bound as the first argument of the SQL function
    pub transfer_id: u64,
    /// Optional transaction identifier, bound as `NULL` when absent
    pub tx_id: Option<u64>,
    /// Transferred amount, without currency
    pub amount: Decimal,
    /// Raw transfer subject
    pub subject: String,
    /// Account and name of the creditor
    pub creditor: FullCyclosPayto,
    /// Value date of the transaction
    pub valued_at: Timestamp,
}
     83 
     84 impl Display for TxOut {
     85     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
     86         let Self {
     87             transfer_id,
     88             tx_id,
     89             amount,
     90             subject,
     91             creditor,
     92             valued_at,
     93         } = self;
     94         let tx_id = match tx_id {
     95             Some(id) => format_args!(":{}", *id),
     96             None => format_args!(""),
     97         };
     98         write!(
     99             f,
    100             "{valued_at} {transfer_id}{tx_id} {amount} ({} {}) '{subject}'",
    101             creditor.0, creditor.name
    102         )
    103     }
    104 }
    105 
/// Initiated outgoing transaction, as returned by [`pending_batch`].
#[derive(Debug, PartialEq, Eq)]
pub struct Initiated {
    /// Value of the `initiated_id` column
    pub id: u64,
    /// Amount to transfer, without currency
    pub amount: Decimal,
    /// Transfer subject
    pub subject: String,
    /// Account and name of the creditor
    pub creditor: FullCyclosPayto,
}
    113 
    114 impl Display for Initiated {
    115     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    116         let Self {
    117             id,
    118             amount,
    119             subject,
    120             creditor,
    121         } = self;
    122         write!(
    123             f,
    124             "{id} {amount} ({} {}) '{subject}'",
    125             creditor.0, creditor.name
    126         )
    127     }
    128 }
    129 
/// Incoming transaction registered through [`register_tx_in_admin`],
/// i.e. without transfer or transaction identifier.
#[derive(Debug, Clone)]
pub struct TxInAdmin {
    /// Transferred amount, without currency
    pub amount: Decimal,
    /// Raw transfer subject
    pub subject: String,
    /// Account and name of the debtor
    pub debtor: FullCyclosPayto,
    /// Parsed subject; its `ty()` and `key()` are stored alongside the transaction
    pub metadata: IncomingSubject,
}
    137 
/// Outcome of the `register_tx_in` SQL function.
#[derive(Debug, PartialEq, Eq)]
pub enum AddIncomingResult {
    /// The transaction is registered
    Success {
        /// Value of `out_new`
        new: bool,
        /// Value of `out_tx_row_id`
        row_id: u64,
        /// Value of `out_valued_at`
        valued_at: Timestamp,
    },
    /// The SQL function reported `out_reserve_pub_reuse`
    ReservePubReuse,
}
    147 
    148 pub async fn register_tx_in_admin(
    149     db: &PgPool,
    150     tx: &TxInAdmin,
    151     now: &Timestamp,
    152 ) -> sqlx::Result<AddIncomingResult> {
    153     sqlx::query(
    154         "
    155             SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new
    156             FROM register_tx_in(NULL, NULL, ($1, $2)::taler_amount, $3, $4, $5, $6, $7, $8, $6)
    157         ",
    158     )
    159     .bind_decimal(&tx.amount)
    160     .bind(&tx.subject)
    161     .bind(tx.debtor.0 as i64)
    162     .bind(&tx.debtor.name)
    163     .bind_timestamp(now)
    164     .bind(tx.metadata.ty())
    165     .bind(tx.metadata.key())
    166     .try_map(|r: PgRow| {
    167         Ok(if r.try_get(0)? {
    168             AddIncomingResult::ReservePubReuse
    169         } else {
    170             AddIncomingResult::Success {
    171                 row_id: r.try_get_u64(1)?,
    172                 valued_at: r.try_get_timestamp(2)?,
    173                 new: r.try_get(3)?,
    174             }
    175         })
    176     })
    177     .fetch_one(db)
    178     .await
    179 }
    180 
    181 pub async fn register_tx_in(
    182     db: &mut PgConnection,
    183     tx: &TxIn,
    184     subject: &Option<IncomingSubject>,
    185     now: &Timestamp,
    186 ) -> sqlx::Result<AddIncomingResult> {
    187     sqlx::query(
    188         "
    189             SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new
    190             FROM register_tx_in($1, $2, ($3, $4)::taler_amount, $5, $6, $7, $8, $9, $10, $11)
    191         ",
    192     )
    193     .bind(tx.transfer_id as i64)
    194     .bind(tx.tx_id.map(|it| it as i64))
    195     .bind_decimal(&tx.amount)
    196     .bind(&tx.subject)
    197     .bind(tx.debtor.0 as i64)
    198     .bind(&tx.debtor.name)
    199     .bind(tx.valued_at.as_microsecond())
    200     .bind(subject.as_ref().map(|it| it.ty()))
    201     .bind(subject.as_ref().map(|it| it.key()))
    202     .bind(now.as_microsecond())
    203     .try_map(|r: PgRow| {
    204         Ok(if r.try_get(0)? {
    205             AddIncomingResult::ReservePubReuse
    206         } else {
    207             AddIncomingResult::Success {
    208                 row_id: r.try_get_u64(1)?,
    209                 valued_at: r.try_get_timestamp(2)?,
    210                 new: r.try_get(3)?,
    211             }
    212         })
    213     })
    214     .fetch_one(db)
    215     .await
    216 }
    217 
/// Kind of outgoing transaction; selects what [`register_tx_out`] binds for
/// the three kind-specific arguments of the SQL function.
#[derive(Debug)]
pub enum TxOutKind {
    /// All three kind-specific arguments are `NULL`
    Simple,
    /// Carries an identifier bound as the third kind-specific argument
    /// (presumably the bounced incoming transaction — confirm against the SQL function)
    Bounce(u64),
    /// Carries a parsed subject whose two components are bound as the first two
    /// kind-specific arguments
    Talerable(OutgoingSubject),
}
    224 
/// Value of the `out_result` column returned by the `register_tx_out` SQL
/// function, decoded from the `register_result` database type.
#[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)]
// Lowercase variant names: presumably kept identical to the labels of the
// database enum — confirm against the schema
#[allow(non_camel_case_types)]
#[sqlx(type_name = "register_result")]
pub enum RegisterResult {
    /// Already registered
    idempotent,
    /// Initiated transaction
    known,
    /// Recovered unknown outgoing transaction
    recovered,
}
    236 
/// Outcome of [`register_tx_out`].
#[derive(Debug, PartialEq, Eq)]
pub struct AddOutgoingResult {
    /// Value of `out_result`
    pub result: RegisterResult,
    /// Value of `out_tx_row_id`
    pub row_id: u64,
}
    242 
    243 pub async fn register_tx_out(
    244     db: &mut PgConnection,
    245     tx: &TxOut,
    246     kind: &TxOutKind,
    247     now: &Timestamp,
    248 ) -> sqlx::Result<AddOutgoingResult> {
    249     let query = sqlx::query(
    250         "
    251             SELECT out_result, out_tx_row_id 
    252             FROM register_tx_out($1, $2, ($3, $4)::taler_amount, $5, $6, $7, $8, $9, $10, $11, $12)
    253         ",
    254     )
    255     .bind(tx.transfer_id as i64)
    256     .bind(tx.tx_id.map(|it| it as i64))
    257     .bind_decimal(&tx.amount)
    258     .bind(&tx.subject)
    259     .bind(tx.creditor.0 as i64)
    260     .bind(&tx.creditor.name)
    261     .bind_timestamp(&tx.valued_at);
    262     let query = match kind {
    263         TxOutKind::Simple => query
    264             .bind(None::<&[u8]>)
    265             .bind(None::<&str>)
    266             .bind(None::<i64>),
    267         TxOutKind::Bounce(bounced) => query
    268             .bind(None::<&[u8]>)
    269             .bind(None::<&str>)
    270             .bind(*bounced as i64),
    271         TxOutKind::Talerable(subject) => query
    272             .bind(subject.0.as_ref())
    273             .bind(subject.1.as_ref())
    274             .bind(None::<i64>),
    275     };
    276     query
    277         .bind_timestamp(now)
    278         .try_map(|r: PgRow| {
    279             Ok(AddOutgoingResult {
    280                 result: r.try_get(0)?,
    281                 row_id: r.try_get_u64(1)?,
    282             })
    283         })
    284         .fetch_one(db)
    285         .await
    286 }
    287 
/// Outcome of [`make_transfer`].
#[derive(Debug, PartialEq, Eq)]
pub enum TransferResult {
    /// The transfer is recorded; fields hold `out_initiated_row_id` and `out_initiated_at`
    Success { id: u64, initiated_at: Timestamp },
    /// The SQL function reported `out_request_uid_reuse`
    RequestUidReuse,
    /// The SQL function reported `out_wtid_reuse`
    WtidReuse,
}
    294 
    295 pub async fn make_transfer<'a>(
    296     db: impl PgExecutor<'a>,
    297     req: &TransferRequest,
    298     creditor: &FullCyclosPayto,
    299     now: &Timestamp,
    300 ) -> sqlx::Result<TransferResult> {
    301     let subject = format!("{} {}", req.wtid, req.exchange_base_url);
    302     sqlx::query(
    303         "
    304             SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at
    305             FROM taler_transfer($1, $2, $3, ($4, $5)::taler_amount, $6, $7, $8, $9)
    306         ",
    307     )
    308     .bind(req.request_uid.as_ref())
    309     .bind(req.wtid.as_ref())
    310     .bind(&subject)
    311     .bind_amount(&req.amount)
    312     .bind(req.exchange_base_url.as_str())
    313     .bind(creditor.0 as i64)
    314     .bind(&creditor.name)
    315     .bind_timestamp(now)
    316     .try_map(|r: PgRow| {
    317         Ok(if r.try_get(0)? {
    318             TransferResult::RequestUidReuse
    319         } else if r.try_get(1)? {
    320             TransferResult::WtidReuse
    321         } else {
    322             TransferResult::Success {
    323                 id: r.try_get_u64(2)?,
    324                 initiated_at: r.try_get_timestamp(3)?,
    325             }
    326         })
    327     })
    328     .fetch_one(db)
    329     .await
    330 }
    331 
/// Outcome of [`register_bounced_tx_in`].
#[derive(Debug, PartialEq, Eq)]
pub struct BounceResult {
    /// Value of `out_tx_row_id`
    pub tx_id: u64,
    /// Value of `out_tx_new`
    pub tx_new: bool,
}
    337 
    338 pub async fn register_bounced_tx_in(
    339     db: &mut PgConnection,
    340     tx: &TxIn,
    341     chargeback_id: u64,
    342     reason: &str,
    343     now: &Timestamp,
    344 ) -> sqlx::Result<BounceResult> {
    345     sqlx::query(
    346         "
    347             SELECT out_tx_row_id, out_tx_new
    348             FROM register_bounced_tx_in($1, $2, ($3, $4)::taler_amount, $5, $6, $7, $8, $9, $10, $11)
    349         ",
    350     )
    351     .bind(tx.transfer_id as i64)
    352     .bind(tx.tx_id.map(|it| it as i64))
    353     .bind_decimal(&tx.amount)
    354     .bind(&tx.subject)
    355     .bind(tx.debtor.0 as i64)
    356     .bind(&tx.debtor.name)
    357     .bind_timestamp(&tx.valued_at)
    358     .bind(chargeback_id as i64)
    359     .bind(reason)
    360     .bind_timestamp(now)
    361     .try_map(|r: PgRow| {
    362         Ok(BounceResult {
    363             tx_id: r.try_get_u64(0)?,
    364             tx_new: r.try_get(1)?,
    365         })
    366     })
    367     .fetch_one(db)
    368     .await
    369 }
    370 
    371 pub async fn transfer_page<'a>(
    372     db: impl PgExecutor<'a>,
    373     status: &Option<TransferState>,
    374     currency: &Currency,
    375     params: &Page,
    376 ) -> sqlx::Result<Vec<TransferListStatus>> {
    377     page(
    378         db,
    379         "initiated_id",
    380         params,
    381         || {
    382             let mut builder = QueryBuilder::new(
    383                 "
    384                     SELECT
    385                         initiated_id,
    386                         status,
    387                         (amount).val as amount_val,
    388                         (amount).frac as amount_frac,
    389                         credit_account,
    390                         credit_name,
    391                         initiated_at
    392                     FROM transfer 
    393                     JOIN initiated USING (initiated_id)
    394                     WHERE
    395                 ",
    396             );
    397             if let Some(status) = status {
    398                 builder.push(" status = ").push_bind(status).push(" AND ");
    399             }
    400             builder
    401         },
    402         |r: PgRow| {
    403             Ok(TransferListStatus {
    404                 row_id: r.try_get_safeu64(0)?,
    405                 status: r.try_get(1)?,
    406                 amount: r.try_get_amount_i(2, currency)?,
    407                 credit_account: r.try_get_cyclos_fullpaytouri(4, 5)?,
    408                 timestamp: r.try_get_timestamp(6)?.into(),
    409             })
    410         },
    411     )
    412     .await
    413 }
    414 
    415 pub async fn outgoing_history(
    416     db: &PgPool,
    417     params: &History,
    418     currency: &Currency,
    419     listen: impl FnOnce() -> Receiver<i64>,
    420 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> {
    421     history(
    422         db,
    423         "tx_out_id",
    424         params,
    425         listen,
    426         || {
    427             QueryBuilder::new(
    428                 "
    429                 SELECT
    430                     tx_out_id,
    431                     (amount).val as amount_val,
    432                     (amount).frac as amount_frac,
    433                     credit_account,
    434                     credit_name,
    435                     valued_at,
    436                     exchange_base_url,
    437                     wtid
    438                 FROM taler_out
    439                 JOIN tx_out USING (tx_out_id)
    440                 WHERE
    441             ",
    442             )
    443         },
    444         |r: PgRow| {
    445             Ok(OutgoingBankTransaction {
    446                 row_id: r.try_get_safeu64(0)?,
    447                 amount: r.try_get_amount_i(1, currency)?,
    448                 credit_account: r.try_get_cyclos_fullpaytouri(3, 4)?,
    449                 date: r.try_get_timestamp(5)?.into(),
    450                 exchange_base_url: r.try_get_url(6)?,
    451                 wtid: r.try_get_base32(7)?,
    452             })
    453         },
    454     )
    455     .await
    456 }
    457 
    458 pub async fn incoming_history(
    459     db: &PgPool,
    460     params: &History,
    461     currency: &Currency,
    462     listen: impl FnOnce() -> Receiver<i64>,
    463 ) -> sqlx::Result<Vec<IncomingBankTransaction>> {
    464     history(
    465         db,
    466         "tx_in_id",
    467         params,
    468         listen,
    469         || {
    470             QueryBuilder::new(
    471                 "
    472                 SELECT
    473                     type,
    474                     tx_in_id,
    475                     (amount).val as amount_val,
    476                     (amount).frac as amount_frac,
    477                     debit_account,
    478                     debit_name,
    479                     valued_at,
    480                     metadata
    481                 FROM taler_in
    482                 JOIN tx_in USING (tx_in_id)
    483                 WHERE
    484             ",
    485             )
    486         },
    487         |r: PgRow| {
    488             Ok(match r.try_get(0)? {
    489                 IncomingType::reserve => IncomingBankTransaction::Reserve {
    490                     row_id: r.try_get_safeu64(1)?,
    491                     amount: r.try_get_amount_i(2, currency)?,
    492                     debit_account: r.try_get_cyclos_fullpaytouri(4, 5)?,
    493                     date: r.try_get_timestamp(6)?.into(),
    494                     reserve_pub: r.try_get_base32(7)?,
    495                 },
    496                 IncomingType::kyc => IncomingBankTransaction::Kyc {
    497                     row_id: r.try_get_safeu64(1)?,
    498                     amount: r.try_get_amount_i(2, currency)?,
    499                     debit_account: r.try_get_cyclos_fullpaytouri(4, 5)?,
    500                     date: r.try_get_timestamp(6)?.into(),
    501                     account_pub: r.try_get_base32(7)?,
    502                 },
    503                 IncomingType::wad => {
    504                     unimplemented!("WAD is not yet supported")
    505                 }
    506             })
    507         },
    508     )
    509     .await
    510 }
    511 
    512 pub async fn revenue_history(
    513     db: &PgPool,
    514     params: &History,
    515     currency: &Currency,
    516     listen: impl FnOnce() -> Receiver<i64>,
    517 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> {
    518     history(
    519         db,
    520         "tx_in_id",
    521         params,
    522         listen,
    523         || {
    524             QueryBuilder::new(
    525                 "
    526                 SELECT
    527                     tx_in_id,
    528                     valued_at,
    529                     (amount).val as amount_val,
    530                     (amount).frac as amount_frac,
    531                     debit_account,
    532                     debit_name,
    533                     subject
    534                 FROM tx_in
    535                 WHERE
    536             ",
    537             )
    538         },
    539         |r: PgRow| {
    540             Ok(RevenueIncomingBankTransaction {
    541                 row_id: r.try_get_safeu64(0)?,
    542                 date: r.try_get_timestamp(1)?.into(),
    543                 amount: r.try_get_amount_i(2, currency)?,
    544                 credit_fee: None,
    545                 debit_account: r.try_get_cyclos_fullpaytouri(4, 5)?,
    546                 subject: r.try_get(6)?,
    547             })
    548         },
    549     )
    550     .await
    551 }
    552 
    553 pub async fn transfer_by_id<'a>(
    554     db: impl PgExecutor<'a>,
    555     id: u64,
    556     currency: &Currency,
    557 ) -> sqlx::Result<Option<TransferStatus>> {
    558     sqlx::query(
    559         "
    560             SELECT
    561                 status,
    562                 status_msg,
    563                 (amount).val as amount_val,
    564                 (amount).frac as amount_frac,
    565                 exchange_base_url,
    566                 wtid,
    567                 credit_account,
    568                 credit_name,
    569                 initiated_at
    570             FROM transfer 
    571             JOIN initiated USING (initiated_id) 
    572             WHERE initiated_id = $1
    573         ",
    574     )
    575     .bind(id as i64)
    576     .try_map(|r: PgRow| {
    577         Ok(TransferStatus {
    578             status: r.try_get(0)?,
    579             status_msg: r.try_get(1)?,
    580             amount: r.try_get_amount_i(2, currency)?,
    581             origin_exchange_url: r.try_get(4)?,
    582             wtid: r.try_get_base32(5)?,
    583             credit_account: r.try_get_cyclos_fullpaytouri(6, 7)?,
    584             timestamp: r.try_get_timestamp(8)?.into(),
    585         })
    586     })
    587     .fetch_optional(db)
    588     .await
    589 }
    590 
    591 /** Get a batch of pending initiated transactions not attempted since [start] */
    592 pub async fn pending_batch<'a>(
    593     db: impl PgExecutor<'a>,
    594     start: &Timestamp,
    595 ) -> sqlx::Result<Vec<Initiated>> {
    596     sqlx::query(
    597         "
    598             SELECT initiated_id, (amount).val, (amount).frac, subject, credit_account, credit_name
    599             FROM initiated 
    600             WHERE tx_id IS NULL
    601                 AND status='pending'
    602                 AND (last_submitted IS NULL OR last_submitted < $1)
    603             LIMIT 100
    604         ",
    605     )
    606     .bind_timestamp(start)
    607     .try_map(|r: PgRow| {
    608         Ok(Initiated {
    609             id: r.try_get_u64(0)?,
    610             amount: r.try_get_decimal(1, 2)?,
    611             subject: r.try_get(3)?,
    612             creditor: r.try_get_cyclos_fullpayto(4, 5)?,
    613         })
    614     })
    615     .fetch_all(db)
    616     .await
    617 }
    618 
    619 /** Update status of a successful submitted initiated transaction */
    620 pub async fn initiated_submit_success<'a>(
    621     db: impl PgExecutor<'a>,
    622     initiated_id: u64,
    623     timestamp: &Timestamp,
    624     tx_id: u64,
    625 ) -> sqlx::Result<()> {
    626     sqlx::query(
    627         "
    628             UPDATE initiated
    629             SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, tx_id=$2
    630             WHERE initiated_id=$3
    631         "
    632     ).bind_timestamp(timestamp)
    633     .bind(tx_id as i64)
    634     .bind(initiated_id as i64)
    635     .execute(db).await?;
    636     Ok(())
    637 }
    638 
    639 /** Update status of a permanently failed initiated transaction */
    640 pub async fn initiated_submit_permanent_failure<'a>(
    641     db: impl PgExecutor<'a>,
    642     initiated_id: u64,
    643     timestamp: &Timestamp,
    644     msg: &str,
    645 ) -> sqlx::Result<()> {
    646     sqlx::query(
    647         "
    648             UPDATE initiated
    649             SET status='permanent_failure', status_msg=$2
    650             WHERE initiated_id=$3
    651         ",
    652     )
    653     .bind_timestamp(timestamp)
    654     .bind(msg)
    655     .bind(initiated_id as i64)
    656     .execute(db)
    657     .await?;
    658     Ok(())
    659 }
    660 
/// Outcome of [`initiated_chargeback_failure`].
#[derive(Debug, PartialEq, Eq)]
pub struct OutFailureResult {
    /// Value of `out_initiated_id`, `None` when the SQL function returns `NULL`
    pub initiated_id: Option<u64>,
    /// Value of `out_new`
    pub new: bool,
}
    666 
    667 /** Update status of a charged back failed initiated transaction */
    668 pub async fn initiated_chargeback_failure(
    669     db: &mut PgConnection,
    670     code: u64,
    671     bounced: Option<u32>,
    672     now: &Timestamp,
    673 ) -> sqlx::Result<OutFailureResult> {
    674     todo!();
    675     sqlx::query(
    676         "
    677             SELECT out_new, out_initiated_id 
    678             FROM register_tx_out_failure($1, $2, $3)
    679         ",
    680     )
    681     .bind(code as i64)
    682     .bind(bounced.map(|i| i as i32))
    683     .bind_timestamp(now)
    684     .try_map(|r: PgRow| {
    685         Ok(OutFailureResult {
    686             new: r.try_get(0)?,
    687             initiated_id: r.try_get::<Option<i64>, _>(1)?.map(|i| i as u64),
    688         })
    689     })
    690     .fetch_one(db)
    691     .await
    692 }
    693 
    694 /** Get JSON value from KV table */
    695 pub async fn kv_get<'a, T: DeserializeOwned + Unpin + Send>(
    696     db: impl PgExecutor<'a>,
    697     key: &str,
    698 ) -> sqlx::Result<Option<T>> {
    699     sqlx::query("SELECT value FROM kv WHERE key=$1")
    700         .bind(key)
    701         .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0))
    702         .fetch_optional(db)
    703         .await
    704 }
    705 
    706 /** Set JSON value in KV table */
    707 pub async fn kv_set<'a, T: Serialize>(
    708     db: impl PgExecutor<'a>,
    709     key: &str,
    710     value: &T,
    711 ) -> sqlx::Result<()> {
    712     sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value")
    713         .bind(key)
    714         .bind(sqlx::types::Json(value))
    715         .execute(db)
    716         .await?;
    717     Ok(())
    718 }
    719 
/// Row decoding helpers for Cyclos accounts stored as an account id column
/// plus a name column.
pub trait CyclosTypeHelper {
    /// Decode the account id at column `idx` and the name at column `name`
    /// into a [`FullCyclosPayto`].
    fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>(
        &self,
        idx: I,
        name: I,
    ) -> sqlx::Result<FullCyclosPayto>;
    /// Decode the account id at column `idx` and the name at column `name`
    /// into a full [`PaytoURI`].
    fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>(
        &self,
        idx: I,
        name: I,
    ) -> sqlx::Result<PaytoURI>;
}
    732 
    733 impl CyclosTypeHelper for PgRow {
    734     fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>(
    735         &self,
    736         idx: I,
    737         name: I,
    738     ) -> sqlx::Result<FullCyclosPayto> {
    739         let idx = self.try_get_u64(idx)?;
    740         let name = self.try_get(name)?;
    741         Ok(FullCyclosPayto::new(CyclosId(idx), name))
    742     }
    743     fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>(
    744         &self,
    745         idx: I,
    746         name: I,
    747     ) -> sqlx::Result<PaytoURI> {
    748         let idx = self.try_get_u64(idx)?;
    749         let name = self.try_get(name)?;
    750         Ok(CyclosId(idx).as_full_payto(name))
    751     }
    752 }
    753 
    754 #[cfg(test)]
    755 mod test {
    756     use std::sync::LazyLock;
    757 
    758     use jiff::{Span, Timestamp};
    759     use serde_json::json;
    760     use sqlx::{PgConnection, PgPool, postgres::PgRow};
    761     use taler_api::{
    762         db::TypeHelper,
    763         subject::{IncomingSubject, OutgoingSubject},
    764     };
    765     use taler_common::{
    766         api_common::{EddsaPublicKey, HashCode, ShortHashCode},
    767         api_params::{History, Page},
    768         api_wire::{TransferRequest, TransferState, TransferStatus},
    769         types::{
    770             amount::{Currency, amount, decimal},
    771             payto::payto,
    772             url,
    773             utils::now_sql_stable_timestamp,
    774         },
    775     };
    776     use tokio::sync::watch::Receiver;
    777 
    778     use crate::{
    779         constants::CONFIG_SOURCE,
    780         cyclos_payto,
    781         db::{
    782             self, AddIncomingResult, AddOutgoingResult, BounceResult, Initiated, TransferResult,
    783             TxIn, TxInAdmin, TxOut, TxOutKind,
    784         },
    785     };
    786 
    787     pub static CURRENCY: LazyLock<Currency> = LazyLock::new(|| "TEST".parse().unwrap());
    788 
    789     fn fake_listen<T: Default>() -> Receiver<T> {
    790         tokio::sync::watch::channel(T::default()).1
    791     }
    792 
    793     async fn setup() -> (PgConnection, PgPool) {
    794         let pool = taler_test_utils::db_test_setup(CONFIG_SOURCE).await;
    795         let conn = pool.acquire().await.unwrap().leak();
    796         (conn, pool)
    797     }
    798 
    799     #[tokio::test]
    800     async fn kv() {
    801         let (mut db, _) = setup().await;
    802 
    803         let value = json!({
    804             "name": "Mr Smith",
    805             "no way": 32
    806         });
    807 
    808         assert_eq!(
    809             db::kv_get::<serde_json::Value>(&mut db, "value")
    810                 .await
    811                 .unwrap(),
    812             None
    813         );
    814         db::kv_set(&mut db, "value", &value).await.unwrap();
    815         db::kv_set(&mut db, "value", &value).await.unwrap();
    816         assert_eq!(
    817             db::kv_get::<serde_json::Value>(&mut db, "value")
    818                 .await
    819                 .unwrap(),
    820             Some(value)
    821         );
    822     }
    823 
    824     #[tokio::test]
    825     async fn tx_in() {
    826         let (mut db, pool) = setup().await;
    827 
    828         async fn routine(
    829             db: &mut PgConnection,
    830             first: &Option<IncomingSubject>,
    831             second: &Option<IncomingSubject>,
    832         ) {
    833             let id = sqlx::query("SELECT count(*) + 1 FROM tx_in")
    834                 .try_map(|r: PgRow| r.try_get_u64(0))
    835                 .fetch_one(&mut *db)
    836                 .await
    837                 .unwrap();
    838             let now = now_sql_stable_timestamp();
    839             let later = now + Span::new().hours(2);
    840             let tx = TxIn {
    841                 transfer_id: now.as_microsecond() as u64,
    842                 tx_id: None,
    843                 amount: decimal("10"),
    844                 subject: "subject".to_owned(),
    845                 debtor: cyclos_payto("payto://cyclos/31000163100000000?receiver-name=name"),
    846                 valued_at: now,
    847             };
    848             // Insert
    849             assert_eq!(
    850                 db::register_tx_in(db, &tx, &first, &now)
    851                     .await
    852                     .expect("register tx in"),
    853                 AddIncomingResult::Success {
    854                     new: true,
    855                     row_id: id,
    856                     valued_at: now
    857                 }
    858             );
    859             // Idempotent
    860             assert_eq!(
    861                 db::register_tx_in(
    862                     db,
    863                     &TxIn {
    864                         valued_at: later,
    865                         ..tx.clone()
    866                     },
    867                     &first,
    868                     &now
    869                 )
    870                 .await
    871                 .expect("register tx in"),
    872                 AddIncomingResult::Success {
    873                     new: false,
    874                     row_id: id,
    875                     valued_at: now
    876                 }
    877             );
    878             // Many
    879             assert_eq!(
    880                 db::register_tx_in(
    881                     db,
    882                     &TxIn {
    883                         transfer_id: later.as_microsecond() as u64,
    884                         valued_at: later,
    885                         ..tx
    886                     },
    887                     &second,
    888                     &now
    889                 )
    890                 .await
    891                 .expect("register tx in"),
    892                 AddIncomingResult::Success {
    893                     new: true,
    894                     row_id: id + 1,
    895                     valued_at: later
    896                 }
    897             );
    898         }
    899 
    900         // Empty db
    901         assert_eq!(
    902             db::revenue_history(&pool, &History::default(), &CURRENCY, fake_listen)
    903                 .await
    904                 .unwrap(),
    905             Vec::new()
    906         );
    907         assert_eq!(
    908             db::incoming_history(&pool, &History::default(), &CURRENCY, fake_listen)
    909                 .await
    910                 .unwrap(),
    911             Vec::new()
    912         );
    913 
    914         // Regular transaction
    915         routine(&mut db, &None, &None).await;
    916 
    917         // Reserve transaction
    918         routine(
    919             &mut db,
    920             &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())),
    921             &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())),
    922         )
    923         .await;
    924 
    925         // Kyc transaction
    926         routine(
    927             &mut db,
    928             &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())),
    929             &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())),
    930         )
    931         .await;
    932 
    933         // History
    934         assert_eq!(
    935             db::revenue_history(&pool, &History::default(), &CURRENCY, fake_listen)
    936                 .await
    937                 .unwrap()
    938                 .len(),
    939             6
    940         );
    941         assert_eq!(
    942             db::incoming_history(&pool, &History::default(), &CURRENCY, fake_listen)
    943                 .await
    944                 .unwrap()
    945                 .len(),
    946             4
    947         );
    948     }
    949 
    950     #[tokio::test]
    951     async fn tx_in_admin() {
    952         let (_, pool) = setup().await;
    953 
    954         // Empty db
    955         assert_eq!(
    956             db::incoming_history(&pool, &History::default(), &CURRENCY, fake_listen)
    957                 .await
    958                 .unwrap(),
    959             Vec::new()
    960         );
    961 
    962         let now = now_sql_stable_timestamp();
    963         let later = now + Span::new().hours(2);
    964         let tx = TxInAdmin {
    965             amount: decimal("10"),
    966             subject: "subject".to_owned(),
    967             debtor: cyclos_payto("payto://cyclos/31000163100000000?receiver-name=name"),
    968             metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
    969         };
    970         // Insert
    971         assert_eq!(
    972             db::register_tx_in_admin(&pool, &tx, &now)
    973                 .await
    974                 .expect("register tx in"),
    975             AddIncomingResult::Success {
    976                 new: true,
    977                 row_id: 1,
    978                 valued_at: now
    979             }
    980         );
    981         // Idempotent
    982         assert_eq!(
    983             db::register_tx_in_admin(&pool, &tx, &later)
    984                 .await
    985                 .expect("register tx in"),
    986             AddIncomingResult::Success {
    987                 new: false,
    988                 row_id: 1,
    989                 valued_at: now
    990             }
    991         );
    992         // Many
    993         assert_eq!(
    994             db::register_tx_in_admin(
    995                 &pool,
    996                 &TxInAdmin {
    997                     subject: "Other".to_owned(),
    998                     metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
    999                     ..tx.clone()
   1000                 },
   1001                 &later
   1002             )
   1003             .await
   1004             .expect("register tx in"),
   1005             AddIncomingResult::Success {
   1006                 new: true,
   1007                 row_id: 2,
   1008                 valued_at: later
   1009             }
   1010         );
   1011 
   1012         // History
   1013         assert_eq!(
   1014             db::incoming_history(&pool, &History::default(), &CURRENCY, fake_listen)
   1015                 .await
   1016                 .unwrap()
   1017                 .len(),
   1018             2
   1019         );
   1020     }
   1021 
   1022     #[tokio::test]
   1023     async fn tx_out() {
   1024         let (mut db, pool) = setup().await;
   1025 
   1026         async fn routine(db: &mut PgConnection, first: &TxOutKind, second: &TxOutKind) {
   1027             let transfer_id = sqlx::query("SELECT count(*) + 1 FROM tx_out")
   1028                 .try_map(|r: PgRow| r.try_get_u64(0))
   1029                 .fetch_one(&mut *db)
   1030                 .await
   1031                 .unwrap();
   1032             let now = now_sql_stable_timestamp();
   1033             let later = now + Span::new().hours(2);
   1034             let tx = TxOut {
   1035                 transfer_id,
   1036                 tx_id: Some(transfer_id),
   1037                 amount: decimal("10"),
   1038                 subject: "subject".to_owned(),
   1039                 creditor: cyclos_payto("payto://cyclos/31000163100000000?receiver-name=name"),
   1040                 valued_at: now,
   1041             };
   1042             assert!(matches!(
   1043                 db::make_transfer(
   1044                     &mut *db,
   1045                     &TransferRequest {
   1046                         request_uid: HashCode::rand(),
   1047                         amount: amount("HUF:10"),
   1048                         exchange_base_url: url("https://exchange.test.com/"),
   1049                         wtid: ShortHashCode::rand(),
   1050                         credit_account: payto(
   1051                             "payto://cyclos/31000163100000000?receiver-name=name"
   1052                         ),
   1053                     },
   1054                     &tx.creditor,
   1055                     &now
   1056                 )
   1057                 .await
   1058                 .unwrap(),
   1059                 TransferResult::Success { .. }
   1060             ));
   1061             db::initiated_submit_success(&mut *db, 1, &Timestamp::now(), transfer_id)
   1062                 .await
   1063                 .expect("status success");
   1064 
   1065             // Insert
   1066             assert_eq!(
   1067                 db::register_tx_out(&mut *db, &tx, first, &now)
   1068                     .await
   1069                     .expect("register tx out"),
   1070                 AddOutgoingResult {
   1071                     result: db::RegisterResult::known,
   1072                     row_id: transfer_id,
   1073                 }
   1074             );
   1075             // Idempotent
   1076             assert_eq!(
   1077                 db::register_tx_out(
   1078                     &mut *db,
   1079                     &TxOut {
   1080                         valued_at: later,
   1081                         ..tx.clone()
   1082                     },
   1083                     first,
   1084                     &now
   1085                 )
   1086                 .await
   1087                 .expect("register tx out"),
   1088                 AddOutgoingResult {
   1089                     result: db::RegisterResult::idempotent,
   1090                     row_id: transfer_id,
   1091                 }
   1092             );
   1093             // Recovered
   1094             assert_eq!(
   1095                 db::register_tx_out(
   1096                     &mut *db,
   1097                     &TxOut {
   1098                         transfer_id: transfer_id + 1,
   1099                         tx_id: Some(transfer_id + 1),
   1100                         valued_at: later,
   1101                         ..tx.clone()
   1102                     },
   1103                     second,
   1104                     &now
   1105                 )
   1106                 .await
   1107                 .expect("register tx out"),
   1108                 AddOutgoingResult {
   1109                     result: db::RegisterResult::recovered,
   1110                     row_id: transfer_id + 1,
   1111                 }
   1112             );
   1113         }
   1114 
   1115         // Empty db
   1116         assert_eq!(
   1117             db::outgoing_history(&pool, &History::default(), &CURRENCY, fake_listen)
   1118                 .await
   1119                 .unwrap(),
   1120             Vec::new()
   1121         );
   1122 
   1123         // Regular transaction
   1124         routine(&mut db, &TxOutKind::Simple, &TxOutKind::Simple).await;
   1125 
   1126         // Talerable transaction
   1127         routine(
   1128             &mut db,
   1129             &TxOutKind::Talerable(OutgoingSubject(
   1130                 ShortHashCode::rand(),
   1131                 url("https://exchange.com"),
   1132             )),
   1133             &TxOutKind::Talerable(OutgoingSubject(
   1134                 ShortHashCode::rand(),
   1135                 url("https://exchange.com"),
   1136             )),
   1137         )
   1138         .await;
   1139 
   1140         // Bounced transaction
   1141         routine(&mut db, &TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await;
   1142 
   1143         // History
   1144         assert_eq!(
   1145             db::outgoing_history(&pool, &History::default(), &CURRENCY, fake_listen)
   1146                 .await
   1147                 .unwrap()
   1148                 .len(),
   1149             2
   1150         );
   1151     }
   1152 
   1153     // TODO tx out failure
   1154 
   1155     #[tokio::test]
   1156     async fn transfer() {
   1157         let (mut db, _) = setup().await;
   1158 
   1159         // Empty db
   1160         assert_eq!(
   1161             db::transfer_by_id(&mut db, 0, &CURRENCY).await.unwrap(),
   1162             None
   1163         );
   1164         assert_eq!(
   1165             db::transfer_page(&mut db, &None, &CURRENCY, &Page::default())
   1166                 .await
   1167                 .unwrap(),
   1168             Vec::new()
   1169         );
   1170 
   1171         let req = TransferRequest {
   1172             request_uid: HashCode::rand(),
   1173             amount: amount("HUF:10"),
   1174             exchange_base_url: url("https://exchange.test.com/"),
   1175             wtid: ShortHashCode::rand(),
   1176             credit_account: payto("payto://iban/HU02162000031000164800000000?receiver-name=name"),
   1177         };
   1178         let payto = cyclos_payto("payto://cyclos/31000163100000000?receiver-name=name");
   1179         let now = now_sql_stable_timestamp();
   1180         let later = now + Span::new().hours(2);
   1181         // Insert
   1182         assert_eq!(
   1183             db::make_transfer(&mut db, &req, &payto, &now)
   1184                 .await
   1185                 .expect("transfer"),
   1186             TransferResult::Success {
   1187                 id: 1,
   1188                 initiated_at: now
   1189             }
   1190         );
   1191         // Idempotent
   1192         assert_eq!(
   1193             db::make_transfer(&mut db, &req, &payto, &later)
   1194                 .await
   1195                 .expect("transfer"),
   1196             TransferResult::Success {
   1197                 id: 1,
   1198                 initiated_at: now
   1199             }
   1200         );
   1201         // Request UID reuse
   1202         assert_eq!(
   1203             db::make_transfer(
   1204                 &mut db,
   1205                 &TransferRequest {
   1206                     wtid: ShortHashCode::rand(),
   1207                     ..req.clone()
   1208                 },
   1209                 &payto,
   1210                 &now
   1211             )
   1212             .await
   1213             .expect("transfer"),
   1214             TransferResult::RequestUidReuse
   1215         );
   1216         // wtid reuse
   1217         assert_eq!(
   1218             db::make_transfer(
   1219                 &mut db,
   1220                 &TransferRequest {
   1221                     request_uid: HashCode::rand(),
   1222                     ..req.clone()
   1223                 },
   1224                 &payto,
   1225                 &now
   1226             )
   1227             .await
   1228             .expect("transfer"),
   1229             TransferResult::WtidReuse
   1230         );
   1231         // Many
   1232         assert_eq!(
   1233             db::make_transfer(
   1234                 &mut db,
   1235                 &TransferRequest {
   1236                     request_uid: HashCode::rand(),
   1237                     wtid: ShortHashCode::rand(),
   1238                     ..req
   1239                 },
   1240                 &payto,
   1241                 &later
   1242             )
   1243             .await
   1244             .expect("transfer"),
   1245             TransferResult::Success {
   1246                 id: 2,
   1247                 initiated_at: later.into()
   1248             }
   1249         );
   1250 
   1251         // Get
   1252         assert!(
   1253             db::transfer_by_id(&mut db, 1, &CURRENCY)
   1254                 .await
   1255                 .unwrap()
   1256                 .is_some()
   1257         );
   1258         assert!(
   1259             db::transfer_by_id(&mut db, 2, &CURRENCY)
   1260                 .await
   1261                 .unwrap()
   1262                 .is_some()
   1263         );
   1264         assert!(
   1265             db::transfer_by_id(&mut db, 3, &CURRENCY)
   1266                 .await
   1267                 .unwrap()
   1268                 .is_none()
   1269         );
   1270         assert_eq!(
   1271             db::transfer_page(&mut db, &None, &CURRENCY, &Page::default())
   1272                 .await
   1273                 .unwrap()
   1274                 .len(),
   1275             2
   1276         );
   1277     }
   1278 
   1279     #[tokio::test]
   1280     async fn bounce() {
   1281         let (mut db, _) = setup().await;
   1282 
   1283         let amount = decimal("10");
   1284         let payto = cyclos_payto("payto://cyclos/31000163100000000?receiver-name=name");
   1285         let now = now_sql_stable_timestamp();
   1286 
   1287         // Bounce
   1288         assert_eq!(
   1289             db::register_bounced_tx_in(
   1290                 &mut db,
   1291                 &TxIn {
   1292                     transfer_id: 12,
   1293                     tx_id: None,
   1294                     amount,
   1295                     subject: "subject".to_owned(),
   1296                     debtor: payto.clone(),
   1297                     valued_at: now
   1298                 },
   1299                 22,
   1300                 "good reason",
   1301                 &now
   1302             )
   1303             .await
   1304             .expect("bounce"),
   1305             BounceResult {
   1306                 tx_id: 1,
   1307                 tx_new: true
   1308             }
   1309         );
   1310         // Idempotent
   1311         assert_eq!(
   1312             db::register_bounced_tx_in(
   1313                 &mut db,
   1314                 &TxIn {
   1315                     transfer_id: 12,
   1316                     tx_id: None,
   1317                     amount: amount.clone(),
   1318                     subject: "subject".to_owned(),
   1319                     debtor: payto.clone(),
   1320                     valued_at: now
   1321                 },
   1322                 22,
   1323                 "good reason",
   1324                 &now
   1325             )
   1326             .await
   1327             .expect("bounce"),
   1328             BounceResult {
   1329                 tx_id: 1,
   1330                 tx_new: false
   1331             }
   1332         );
   1333 
   1334         // Many
   1335         assert_eq!(
   1336             db::register_bounced_tx_in(
   1337                 &mut db,
   1338                 &TxIn {
   1339                     transfer_id: 13,
   1340                     tx_id: None,
   1341                     amount: amount.clone(),
   1342                     subject: "subject".to_owned(),
   1343                     debtor: payto.clone(),
   1344                     valued_at: now
   1345                 },
   1346                 23,
   1347                 "good reason",
   1348                 &now
   1349             )
   1350             .await
   1351             .expect("bounce"),
   1352             BounceResult {
   1353                 tx_id: 2,
   1354                 tx_new: true
   1355             }
   1356         );
   1357     }
   1358 
   1359     #[tokio::test]
   1360     async fn status() {
   1361         let (mut db, _) = setup().await;
   1362         let cyclos_payto = cyclos_payto("payto://cyclos/31000163100000000?receiver-name=name");
   1363 
   1364         async fn check_status(
   1365             db: &mut PgConnection,
   1366             id: u64,
   1367             status: TransferState,
   1368             msg: Option<&str>,
   1369         ) {
   1370             let transfer = db::transfer_by_id(db, id, &CURRENCY)
   1371                 .await
   1372                 .unwrap()
   1373                 .unwrap();
   1374             assert_eq!(
   1375                 (status, msg),
   1376                 (transfer.status, transfer.status_msg.as_deref())
   1377             );
   1378         }
   1379 
   1380         // Unknown transfer
   1381         db::initiated_submit_permanent_failure(&mut db, 1, &Timestamp::now(), "msg")
   1382             .await
   1383             .unwrap();
   1384         db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12)
   1385             .await
   1386             .unwrap();
   1387 
   1388         // Failure
   1389         db::make_transfer(
   1390             &mut db,
   1391             &TransferRequest {
   1392                 request_uid: HashCode::rand(),
   1393                 amount: amount(format!("{}:1", *CURRENCY)),
   1394                 exchange_base_url: url("https://exchange.test.com/"),
   1395                 wtid: ShortHashCode::rand(),
   1396                 credit_account: payto(
   1397                     "payto://iban/HU02162000031000164800000000?receiver-name=name",
   1398                 ),
   1399             },
   1400             &cyclos_payto,
   1401             &Timestamp::now(),
   1402         )
   1403         .await
   1404         .expect("transfer");
   1405         check_status(&mut db, 1, TransferState::pending, None).await;
   1406         db::initiated_submit_permanent_failure(&mut db, 1, &Timestamp::now(), "error status")
   1407             .await
   1408             .unwrap();
   1409         check_status(
   1410             &mut db,
   1411             1,
   1412             TransferState::permanent_failure,
   1413             Some("error status"),
   1414         )
   1415         .await;
   1416 
   1417         // Success
   1418         db::make_transfer(
   1419             &mut db,
   1420             &TransferRequest {
   1421                 request_uid: HashCode::rand(),
   1422                 amount: amount(format!("{}:2", *CURRENCY)),
   1423                 exchange_base_url: url("https://exchange.test.com/"),
   1424                 wtid: ShortHashCode::rand(),
   1425                 credit_account: payto(
   1426                     "payto://iban/HU02162000031000164800000000?receiver-name=name",
   1427                 ),
   1428             },
   1429             &cyclos_payto,
   1430             &Timestamp::now(),
   1431         )
   1432         .await
   1433         .expect("transfer");
   1434         check_status(&mut db, 2, TransferState::pending, None).await;
   1435         db::initiated_submit_success(&mut db, 2, &Timestamp::now(), 3)
   1436             .await
   1437             .unwrap();
   1438         check_status(&mut db, 2, TransferState::pending, None).await;
   1439         db::register_tx_out(
   1440             &mut db,
   1441             &TxOut {
   1442                 transfer_id: 2,
   1443                 tx_id: Some(3),
   1444                 amount: decimal("2"),
   1445                 subject: "".to_string(),
   1446                 creditor: cyclos_payto,
   1447                 valued_at: Timestamp::now(),
   1448             },
   1449             &TxOutKind::Simple,
   1450             &Timestamp::now(),
   1451         )
   1452         .await
   1453         .unwrap();
   1454         check_status(&mut db, 2, TransferState::success, None).await;
   1455     }
   1456 
   1457     #[tokio::test]
   1458     async fn batch() {
   1459         let (mut db, _) = setup().await;
   1460         let start = Timestamp::now();
   1461         let cyclos_payto = cyclos_payto("payto://cyclos/31000163100000000?receiver-name=name");
   1462 
   1463         // Empty db
   1464         let pendings = db::pending_batch(&mut db, &start)
   1465             .await
   1466             .expect("pending_batch");
   1467         assert_eq!(pendings.len(), 0);
   1468 
   1469         // Some transfers
   1470         for i in 0..3 {
   1471             db::make_transfer(
   1472                 &mut db,
   1473                 &TransferRequest {
   1474                     request_uid: HashCode::rand(),
   1475                     amount: amount(format!("{}:{}", *CURRENCY, i + 1)),
   1476                     exchange_base_url: url("https://exchange.test.com/"),
   1477                     wtid: ShortHashCode::rand(),
   1478                     credit_account: payto(
   1479                         "payto://iban/HU02162000031000164800000000?receiver-name=name",
   1480                     ),
   1481                 },
   1482                 &cyclos_payto,
   1483                 &Timestamp::now(),
   1484             )
   1485             .await
   1486             .expect("transfer");
   1487         }
   1488         let pendings = db::pending_batch(&mut db, &start)
   1489             .await
   1490             .expect("pending_batch");
   1491         assert_eq!(pendings.len(), 3);
   1492 
   1493         // Max 100 txs in batch
   1494         for i in 0..100 {
   1495             db::make_transfer(
   1496                 &mut db,
   1497                 &TransferRequest {
   1498                     request_uid: HashCode::rand(),
   1499                     amount: amount(format!("{}:{}", *CURRENCY, i + 1)),
   1500                     exchange_base_url: url("https://exchange.test.com/"),
   1501                     wtid: ShortHashCode::rand(),
   1502                     credit_account: payto(
   1503                         "payto://iban/HU02162000031000164800000000?receiver-name=name",
   1504                     ),
   1505                 },
   1506                 &cyclos_payto,
   1507                 &Timestamp::now(),
   1508             )
   1509             .await
   1510             .expect("transfer");
   1511         }
   1512         let pendings = db::pending_batch(&mut db, &start)
   1513             .await
   1514             .expect("pending_batch");
   1515         assert_eq!(pendings.len(), 100);
   1516 
   1517         // Skip uploaded
   1518         for i in 0..=10 {
   1519             db::initiated_submit_success(&mut db, i, &Timestamp::now(), i)
   1520                 .await
   1521                 .expect("status success");
   1522         }
   1523         let pendings = db::pending_batch(&mut db, &start)
   1524             .await
   1525             .expect("pending_batch");
   1526         assert_eq!(pendings.len(), 93);
   1527 
   1528         // Skip failed
   1529         for i in 0..=10 {
   1530             db::initiated_submit_permanent_failure(&mut db, 10 + i, &Timestamp::now(), "failure")
   1531                 .await
   1532                 .expect("status failure");
   1533         }
   1534         let pendings = db::pending_batch(&mut db, &start)
   1535             .await
   1536             .expect("pending_batch");
   1537         assert_eq!(pendings.len(), 83);
   1538     }
   1539 }