taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

db.rs (46515B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::fmt::Display;
     18 
     19 use compact_str::CompactString;
     20 use jiff::Timestamp;
     21 use serde::{Serialize, de::DeserializeOwned};
     22 use sqlx::{PgConnection, PgExecutor, PgPool, QueryBuilder, Row, postgres::PgRow};
     23 use taler_api::{
     24     db::{BindHelper, IncomingType, TypeHelper, history, page},
     25     subject::{IncomingSubject, OutgoingSubject},
     26 };
     27 use taler_common::{
     28     api_common::{HashCode, ShortHashCode},
     29     api_params::{History, Page},
     30     api_revenue::RevenueIncomingBankTransaction,
     31     api_wire::{
     32         IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferState,
     33         TransferStatus,
     34     },
     35     config::Config,
     36     types::{
     37         amount::{Currency, Decimal},
     38         payto::{PaytoImpl as _, PaytoURI},
     39     },
     40 };
     41 use tokio::sync::watch::{Receiver, Sender};
     42 use url::Url;
     43 
     44 use crate::{
     45     config::parse_db_cfg,
     46     payto::{CyclosAccount, CyclosId, FullCyclosPayto},
     47 };
     48 
     49 const SCHEMA: &str = "cyclos";
     50 
     51 pub async fn pool(cfg: &Config) -> anyhow::Result<PgPool> {
     52     let db = parse_db_cfg(cfg)?;
     53     let pool = taler_common::db::pool(db.cfg, SCHEMA).await?;
     54     Ok(pool)
     55 }
     56 
     57 pub async fn dbinit(cfg: &Config, reset: bool) -> anyhow::Result<PgPool> {
     58     let db_cfg = parse_db_cfg(cfg)?;
     59     let pool = taler_common::db::pool(db_cfg.cfg, SCHEMA).await?;
     60     let mut db = pool.acquire().await?;
     61     taler_common::db::dbinit(&mut db, db_cfg.sql_dir.as_ref(), SCHEMA, reset).await?;
     62     Ok(pool)
     63 }
     64 
     65 pub async fn notification_listener(
     66     pool: PgPool,
     67     in_channel: Sender<i64>,
     68     taler_in_channel: Sender<i64>,
     69     out_channel: Sender<i64>,
     70     taler_out_channel: Sender<i64>,
     71 ) -> sqlx::Result<()> {
     72     taler_api::notification::notification_listener!(&pool,
     73         "tx_in" => (row_id: i64) {
     74             in_channel.send_replace(row_id);
     75         },
     76         "taler_in" => (row_id: i64) {
     77             taler_in_channel.send_replace(row_id);
     78         },
     79         "tx_out" => (row_id: i64) {
     80             out_channel.send_replace(row_id);
     81         },
     82         "taler_out" => (row_id: i64) {
     83             taler_out_channel.send_replace(row_id);
     84         }
     85     )
     86 }
     87 
     88 #[derive(Debug, Clone)]
     89 pub struct TxIn {
     90     pub transfer_id: i64,
     91     pub tx_id: Option<i64>,
     92     pub amount: Decimal,
     93     pub subject: String,
     94     pub debtor_id: i64,
     95     pub debtor_name: String,
     96     pub valued_at: Timestamp,
     97 }
     98 
     99 impl Display for TxIn {
    100     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    101         let Self {
    102             transfer_id,
    103             tx_id,
    104             amount,
    105             subject,
    106             valued_at,
    107             debtor_id,
    108             debtor_name,
    109         } = self;
    110         let tx_id = match tx_id {
    111             Some(id) => format_args!(":{}", *id),
    112             None => format_args!(""),
    113         };
    114         write!(
    115             f,
    116             "{valued_at} {transfer_id}{tx_id} {amount} ({debtor_id} {debtor_name}) '{subject}'"
    117         )
    118     }
    119 }
    120 
    121 #[derive(Debug, Clone)]
    122 pub struct TxOut {
    123     pub transfer_id: i64,
    124     pub tx_id: Option<i64>,
    125     pub amount: Decimal,
    126     pub subject: String,
    127     pub creditor_id: i64,
    128     pub creditor_name: String,
    129     pub valued_at: Timestamp,
    130 }
    131 
    132 impl Display for TxOut {
    133     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    134         let Self {
    135             transfer_id,
    136             tx_id,
    137             amount,
    138             subject,
    139             creditor_id,
    140             creditor_name,
    141             valued_at,
    142         } = self;
    143         let tx_id = match tx_id {
    144             Some(id) => format_args!(":{}", *id),
    145             None => format_args!(""),
    146         };
    147         write!(
    148             f,
    149             "{valued_at} {transfer_id}{tx_id} {amount} ({creditor_id} {creditor_name}) '{subject}'"
    150         )
    151     }
    152 }
    153 
    154 #[derive(Debug, PartialEq, Eq)]
    155 pub struct Initiated {
    156     pub id: i64,
    157     pub amount: Decimal,
    158     pub subject: String,
    159     pub creditor_id: i64,
    160     pub creditor_name: String,
    161 }
    162 
    163 impl Display for Initiated {
    164     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
    165         let Self {
    166             id,
    167             amount,
    168             subject,
    169             creditor_id,
    170             creditor_name,
    171         } = self;
    172         write!(
    173             f,
    174             "{id} {amount} ({creditor_id} {creditor_name}) '{subject}'"
    175         )
    176     }
    177 }
    178 
    179 #[derive(Debug, Clone)]
    180 pub struct TxInAdmin {
    181     pub amount: Decimal,
    182     pub subject: String,
    183     pub debtor_id: i64,
    184     pub debtor_name: CompactString,
    185     pub metadata: IncomingSubject,
    186 }
    187 
    188 #[derive(Debug, PartialEq, Eq)]
    189 pub enum AddIncomingResult {
    190     Success {
    191         new: bool,
    192         row_id: u64,
    193         valued_at: Timestamp,
    194     },
    195     ReservePubReuse,
    196 }
    197 
    198 /// Lock the database for worker execution
    199 pub async fn worker_lock(e: &mut PgConnection) -> sqlx::Result<bool> {
    200     sqlx::query("SELECT pg_try_advisory_lock(42)")
    201         .try_map(|r: PgRow| r.try_get(0))
    202         .fetch_one(e)
    203         .await
    204 }
    205 
    206 pub async fn register_tx_in_admin(
    207     db: &PgPool,
    208     tx: &TxInAdmin,
    209     now: &Timestamp,
    210 ) -> sqlx::Result<AddIncomingResult> {
    211     sqlx::query(
    212         "
    213             SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new
    214             FROM register_tx_in(NULL, NULL, $1, $2, $3, $4, $5, $6, $7, $5)
    215         ",
    216     )
    217     .bind(tx.amount)
    218     .bind(&tx.subject)
    219     .bind(tx.debtor_id)
    220     .bind(&tx.debtor_name)
    221     .bind_timestamp(now)
    222     .bind(tx.metadata.ty())
    223     .bind(tx.metadata.key())
    224     .try_map(|r: PgRow| {
    225         Ok(if r.try_get_flag(0)? {
    226             AddIncomingResult::ReservePubReuse
    227         } else {
    228             AddIncomingResult::Success {
    229                 row_id: r.try_get_u64(1)?,
    230                 valued_at: r.try_get_timestamp(2)?,
    231                 new: r.try_get(3)?,
    232             }
    233         })
    234     })
    235     .fetch_one(db)
    236     .await
    237 }
    238 
    239 pub async fn register_tx_in(
    240     db: &mut PgConnection,
    241     tx: &TxIn,
    242     subject: &Option<IncomingSubject>,
    243     now: &Timestamp,
    244 ) -> sqlx::Result<AddIncomingResult> {
    245     sqlx::query(
    246         "
    247             SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new
    248             FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
    249         ",
    250     )
    251     .bind(tx.transfer_id)
    252     .bind(tx.tx_id)
    253     .bind(tx.amount)
    254     .bind(&tx.subject)
    255     .bind(tx.debtor_id)
    256     .bind(&tx.debtor_name)
    257     .bind(tx.valued_at.as_microsecond())
    258     .bind(subject.as_ref().map(|it| it.ty()))
    259     .bind(subject.as_ref().map(|it| it.key()))
    260     .bind(now.as_microsecond())
    261     .try_map(|r: PgRow| {
    262         Ok(if r.try_get_flag(0)? {
    263             AddIncomingResult::ReservePubReuse
    264         } else {
    265             AddIncomingResult::Success {
    266                 row_id: r.try_get_u64(1)?,
    267                 valued_at: r.try_get_timestamp(2)?,
    268                 new: r.try_get(3)?,
    269             }
    270         })
    271     })
    272     .fetch_one(db)
    273     .await
    274 }
    275 
    276 #[derive(Debug)]
    277 pub enum TxOutKind {
    278     Simple,
    279     Bounce(i64),
    280     Talerable(OutgoingSubject),
    281 }
    282 
    283 #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)]
    284 #[allow(non_camel_case_types)]
    285 #[sqlx(type_name = "register_result")]
    286 pub enum RegisterResult {
    287     /// Already registered
    288     idempotent,
    289     /// Initiated transaction
    290     known,
    291     /// Recovered unknown outgoing transaction
    292     recovered,
    293 }
    294 
    295 #[derive(Debug, PartialEq, Eq)]
    296 pub struct AddOutgoingResult {
    297     pub result: RegisterResult,
    298     pub row_id: i64,
    299 }
    300 
    301 pub async fn register_tx_out(
    302     db: &mut PgConnection,
    303     tx: &TxOut,
    304     kind: &TxOutKind,
    305     now: &Timestamp,
    306 ) -> sqlx::Result<AddOutgoingResult> {
    307     let query = sqlx::query(
    308         "
    309             SELECT out_result, out_tx_row_id
    310             FROM register_tx_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
    311         ",
    312     )
    313     .bind(tx.transfer_id)
    314     .bind(tx.tx_id)
    315     .bind(tx.amount)
    316     .bind(&tx.subject)
    317     .bind(tx.creditor_id)
    318     .bind(&tx.creditor_name)
    319     .bind_timestamp(&tx.valued_at);
    320     let query = match kind {
    321         TxOutKind::Simple => query
    322             .bind(None::<&[u8]>)
    323             .bind(None::<&str>)
    324             .bind(None::<i64>),
    325         TxOutKind::Bounce(bounced) => query.bind(None::<&[u8]>).bind(None::<&str>).bind(*bounced),
    326         TxOutKind::Talerable(subject) => query
    327             .bind(subject.wtid.as_ref())
    328             .bind(subject.exchange_base_url.as_ref())
    329             .bind(None::<i64>),
    330     };
    331     query
    332         .bind_timestamp(now)
    333         .try_map(|r: PgRow| {
    334             Ok(AddOutgoingResult {
    335                 result: r.try_get(0)?,
    336                 row_id: r.try_get(1)?,
    337             })
    338         })
    339         .fetch_one(db)
    340         .await
    341 }
    342 
    343 #[derive(Debug, PartialEq, Eq)]
    344 pub enum TransferResult {
    345     Success { id: u64, initiated_at: Timestamp },
    346     RequestUidReuse,
    347     WtidReuse,
    348 }
    349 
    350 #[derive(Debug, Clone)]
    351 pub struct Transfer {
    352     pub request_uid: HashCode,
    353     pub amount: Decimal,
    354     pub exchange_base_url: Url,
    355     pub wtid: ShortHashCode,
    356     pub creditor_id: i64,
    357     pub creditor_name: CompactString,
    358 }
    359 
    360 pub async fn make_transfer<'a>(
    361     db: impl PgExecutor<'a>,
    362     tx: &Transfer,
    363     now: &Timestamp,
    364 ) -> sqlx::Result<TransferResult> {
    365     let subject = format!("{} {}", tx.wtid, tx.exchange_base_url);
    366     sqlx::query(
    367         "
    368             SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at
    369             FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8)
    370         ",
    371     )
    372     .bind(tx.request_uid.as_ref())
    373     .bind(tx.wtid.as_ref())
    374     .bind(&subject)
    375     .bind(tx.amount)
    376     .bind(tx.exchange_base_url.as_str())
    377     .bind(tx.creditor_id)
    378     .bind(&tx.creditor_name)
    379     .bind_timestamp(now)
    380     .try_map(|r: PgRow| {
    381         Ok(if r.try_get_flag(0)? {
    382             TransferResult::RequestUidReuse
    383         } else if r.try_get_flag(1)? {
    384             TransferResult::WtidReuse
    385         } else {
    386             TransferResult::Success {
    387                 id: r.try_get_u64(2)?,
    388                 initiated_at: r.try_get_timestamp(3)?,
    389             }
    390         })
    391     })
    392     .fetch_one(db)
    393     .await
    394 }
    395 
    396 #[derive(Debug, PartialEq, Eq)]
    397 pub struct BounceResult {
    398     pub tx_id: u64,
    399     pub tx_new: bool,
    400 }
    401 
    402 pub async fn register_bounced_tx_in(
    403     db: &mut PgConnection,
    404     tx: &TxIn,
    405     chargeback_id: i64,
    406     reason: &str,
    407     now: &Timestamp,
    408 ) -> sqlx::Result<BounceResult> {
    409     sqlx::query(
    410         "
    411             SELECT out_tx_row_id, out_tx_new
    412             FROM register_bounced_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
    413         ",
    414     )
    415     .bind(tx.transfer_id)
    416     .bind(tx.tx_id)
    417     .bind(tx.amount)
    418     .bind(&tx.subject)
    419     .bind(tx.debtor_id)
    420     .bind(&tx.debtor_name)
    421     .bind_timestamp(&tx.valued_at)
    422     .bind(chargeback_id)
    423     .bind(reason)
    424     .bind_timestamp(now)
    425     .try_map(|r: PgRow| {
    426         Ok(BounceResult {
    427             tx_id: r.try_get_u64(0)?,
    428             tx_new: r.try_get(1)?,
    429         })
    430     })
    431     .fetch_one(db)
    432     .await
    433 }
    434 
    435 pub async fn transfer_page<'a>(
    436     db: impl PgExecutor<'a>,
    437     status: &Option<TransferState>,
    438     currency: &Currency,
    439     root: &CompactString,
    440     params: &Page,
    441 ) -> sqlx::Result<Vec<TransferListStatus>> {
    442     page(
    443         db,
    444         "initiated_id",
    445         params,
    446         || {
    447             let mut builder = QueryBuilder::new(
    448                 "
    449                     SELECT
    450                         initiated_id,
    451                         status,
    452                         amount,
    453                         credit_account,
    454                         credit_name,
    455                         initiated_at
    456                     FROM transfer
    457                     JOIN initiated USING (initiated_id)
    458                     WHERE
    459                 ",
    460             );
    461             if let Some(status) = status {
    462                 builder.push(" status = ").push_bind(status).push(" AND ");
    463             }
    464             builder
    465         },
    466         |r: PgRow| {
    467             Ok(TransferListStatus {
    468                 row_id: r.try_get_safeu64(0)?,
    469                 status: r.try_get(1)?,
    470                 amount: r.try_get_amount(2, currency)?,
    471                 credit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
    472                 timestamp: r.try_get_timestamp(5)?.into(),
    473             })
    474         },
    475     )
    476     .await
    477 }
    478 
    479 pub async fn outgoing_history(
    480     db: &PgPool,
    481     params: &History,
    482     currency: &Currency,
    483     root: &CompactString,
    484     listen: impl FnOnce() -> Receiver<i64>,
    485 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> {
    486     history(
    487         db,
    488         "tx_out_id",
    489         params,
    490         listen,
    491         || {
    492             QueryBuilder::new(
    493                 "
    494                 SELECT
    495                     tx_out_id,
    496                     amount,
    497                     credit_account,
    498                     credit_name,
    499                     valued_at,
    500                     exchange_base_url,
    501                     wtid
    502                 FROM taler_out
    503                 JOIN tx_out USING (tx_out_id)
    504                 WHERE
    505             ",
    506             )
    507         },
    508         |r: PgRow| {
    509             Ok(OutgoingBankTransaction {
    510                 row_id: r.try_get_safeu64(0)?,
    511                 amount: r.try_get_amount(1, currency)?,
    512                 debit_fee: None,
    513                 credit_account: r.try_get_cyclos_fullpaytouri(2, 3, root)?,
    514                 date: r.try_get_timestamp(4)?.into(),
    515                 exchange_base_url: r.try_get_url(5)?,
    516                 wtid: r.try_get(6)?,
    517             })
    518         },
    519     )
    520     .await
    521 }
    522 
    523 pub async fn incoming_history(
    524     db: &PgPool,
    525     params: &History,
    526     currency: &Currency,
    527     root: &CompactString,
    528     listen: impl FnOnce() -> Receiver<i64>,
    529 ) -> sqlx::Result<Vec<IncomingBankTransaction>> {
    530     history(
    531         db,
    532         "tx_in_id",
    533         params,
    534         listen,
    535         || {
    536             QueryBuilder::new(
    537                 "
    538                 SELECT
    539                     type,
    540                     tx_in_id,
    541                     amount,
    542                     debit_account,
    543                     debit_name,
    544                     valued_at,
    545                     metadata
    546                 FROM taler_in
    547                 JOIN tx_in USING (tx_in_id)
    548                 WHERE
    549             ",
    550             )
    551         },
    552         |r: PgRow| {
    553             Ok(match r.try_get(0)? {
    554                 IncomingType::reserve => IncomingBankTransaction::Reserve {
    555                     row_id: r.try_get_safeu64(1)?,
    556                     amount: r.try_get_amount(2, currency)?,
    557                     credit_fee: None,
    558                     debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
    559                     date: r.try_get_timestamp(5)?.into(),
    560                     reserve_pub: r.try_get(6)?,
    561                     authorization_pub: None,
    562                     authorization_sig: None,
    563                 },
    564                 IncomingType::kyc => IncomingBankTransaction::Kyc {
    565                     row_id: r.try_get_safeu64(1)?,
    566                     amount: r.try_get_amount(2, currency)?,
    567                     credit_fee: None,
    568                     debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
    569                     date: r.try_get_timestamp(5)?.into(),
    570                     account_pub: r.try_get(6)?,
    571                     authorization_pub: None,
    572                     authorization_sig: None,
    573                 },
    574                 IncomingType::wad => {
    575                     unimplemented!("WAD is not yet supported")
    576                 }
    577             })
    578         },
    579     )
    580     .await
    581 }
    582 
    583 pub async fn revenue_history(
    584     db: &PgPool,
    585     params: &History,
    586     currency: &Currency,
    587     root: &CompactString,
    588     listen: impl FnOnce() -> Receiver<i64>,
    589 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> {
    590     history(
    591         db,
    592         "tx_in_id",
    593         params,
    594         listen,
    595         || {
    596             QueryBuilder::new(
    597                 "
    598                 SELECT
    599                     tx_in_id,
    600                     valued_at,
    601                     amount,
    602                     debit_account,
    603                     debit_name,
    604                     subject
    605                 FROM tx_in
    606                 WHERE
    607             ",
    608             )
    609         },
    610         |r: PgRow| {
    611             Ok(RevenueIncomingBankTransaction {
    612                 row_id: r.try_get_safeu64(0)?,
    613                 date: r.try_get_timestamp(1)?.into(),
    614                 amount: r.try_get_amount(2, currency)?,
    615                 credit_fee: None,
    616                 debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
    617                 subject: r.try_get(5)?,
    618             })
    619         },
    620     )
    621     .await
    622 }
    623 
    624 pub async fn transfer_by_id<'a>(
    625     db: impl PgExecutor<'a>,
    626     id: u64,
    627     currency: &Currency,
    628     root: &CompactString,
    629 ) -> sqlx::Result<Option<TransferStatus>> {
    630     sqlx::query(
    631         "
    632             SELECT
    633                 status,
    634                 status_msg,
    635                 amount,
    636                 exchange_base_url,
    637                 wtid,
    638                 credit_account,
    639                 credit_name,
    640                 initiated_at
    641             FROM transfer
    642             JOIN initiated USING (initiated_id)
    643             WHERE initiated_id = $1
    644         ",
    645     )
    646     .bind(id as i64)
    647     .try_map(|r: PgRow| {
    648         Ok(TransferStatus {
    649             status: r.try_get(0)?,
    650             status_msg: r.try_get(1)?,
    651             amount: r.try_get_amount(2, currency)?,
    652             origin_exchange_url: r.try_get(3)?,
    653             wtid: r.try_get(4)?,
    654             credit_account: r.try_get_cyclos_fullpaytouri(5, 6, root)?,
    655             timestamp: r.try_get_timestamp(7)?.into(),
    656         })
    657     })
    658     .fetch_optional(db)
    659     .await
    660 }
    661 
    662 /** Get a batch of pending initiated transactions not attempted since [start] */
    663 pub async fn pending_batch<'a>(
    664     db: impl PgExecutor<'a>,
    665     start: &Timestamp,
    666 ) -> sqlx::Result<Vec<Initiated>> {
    667     sqlx::query(
    668         "
    669             SELECT initiated_id, amount, subject, credit_account, credit_name
    670             FROM initiated
    671             WHERE tx_id IS NULL
    672                 AND status='pending'
    673                 AND (last_submitted IS NULL OR last_submitted < $1)
    674             LIMIT 100
    675         ",
    676     )
    677     .bind_timestamp(start)
    678     .try_map(|r: PgRow| {
    679         Ok(Initiated {
    680             id: r.try_get(0)?,
    681             amount: r.try_get(1)?,
    682             subject: r.try_get(2)?,
    683             creditor_id: r.try_get(3)?,
    684             creditor_name: r.try_get(4)?,
    685         })
    686     })
    687     .fetch_all(db)
    688     .await
    689 }
    690 
    691 /** Update status of a successful submitted initiated transaction */
    692 pub async fn initiated_submit_success<'a>(
    693     db: impl PgExecutor<'a>,
    694     initiated_id: i64,
    695     timestamp: &Timestamp,
    696     tx_id: i64,
    697 ) -> sqlx::Result<()> {
    698     sqlx::query(
    699         "
    700             UPDATE initiated
    701             SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, tx_id=$2
    702             WHERE initiated_id=$3
    703         "
    704     ).bind_timestamp(timestamp)
    705     .bind(tx_id)
    706     .bind(initiated_id)
    707     .execute(db).await?;
    708     Ok(())
    709 }
    710 
    711 /** Update status of a permanently failed initiated transaction */
    712 pub async fn initiated_submit_permanent_failure<'a>(
    713     db: impl PgExecutor<'a>,
    714     initiated_id: i64,
    715     msg: &str,
    716 ) -> sqlx::Result<()> {
    717     sqlx::query(
    718         "
    719             UPDATE initiated
    720             SET status='permanent_failure', status_msg=$1
    721             WHERE initiated_id=$2
    722         ",
    723     )
    724     .bind(msg)
    725     .bind(initiated_id)
    726     .execute(db)
    727     .await?;
    728     Ok(())
    729 }
    730 
    731 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    732 pub enum ChargebackFailureResult {
    733     Unknown,
    734     Known(u64),
    735     Idempotent(u64),
    736 }
    737 
    738 /** Update status of a charged back initiated transaction */
    739 pub async fn initiated_chargeback_failure(
    740     db: &mut PgConnection,
    741     transfer_id: i64,
    742 ) -> sqlx::Result<ChargebackFailureResult> {
    743     Ok(
    744         sqlx::query("SELECT out_initiated_id, out_new FROM register_charge_back_failure($1)")
    745             .bind(transfer_id)
    746             .try_map(|r: PgRow| {
    747                 let id = r.try_get_u64(0)?;
    748                 Ok(if id == 0 {
    749                     ChargebackFailureResult::Unknown
    750                 } else if r.try_get(1)? {
    751                     ChargebackFailureResult::Known(id)
    752                 } else {
    753                     ChargebackFailureResult::Idempotent(id)
    754                 })
    755             })
    756             .fetch_optional(db)
    757             .await?
    758             .unwrap_or(ChargebackFailureResult::Unknown),
    759     )
    760 }
    761 
    762 /** Get JSON value from KV table */
    763 pub async fn kv_get<'a, T: DeserializeOwned + Unpin + Send>(
    764     db: impl PgExecutor<'a>,
    765     key: &str,
    766 ) -> sqlx::Result<Option<T>> {
    767     sqlx::query("SELECT value FROM kv WHERE key=$1")
    768         .bind(key)
    769         .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0))
    770         .fetch_optional(db)
    771         .await
    772 }
    773 
    774 /** Set JSON value in KV table */
    775 pub async fn kv_set<'a, T: Serialize>(
    776     db: impl PgExecutor<'a>,
    777     key: &str,
    778     value: &T,
    779 ) -> sqlx::Result<()> {
    780     sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value")
    781         .bind(key)
    782         .bind(sqlx::types::Json(value))
    783         .execute(db)
    784         .await?;
    785     Ok(())
    786 }
    787 
    788 pub trait CyclosTypeHelper {
    789     fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>(
    790         &self,
    791         idx: I,
    792         name: I,
    793         root: &CompactString,
    794     ) -> sqlx::Result<FullCyclosPayto>;
    795     fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>(
    796         &self,
    797         idx: I,
    798         name: I,
    799         root: &CompactString,
    800     ) -> sqlx::Result<PaytoURI>;
    801 }
    802 
    803 impl CyclosTypeHelper for PgRow {
    804     fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>(
    805         &self,
    806         idx: I,
    807         name: I,
    808         root: &CompactString,
    809     ) -> sqlx::Result<FullCyclosPayto> {
    810         let idx = self.try_get(idx)?;
    811         let name = self.try_get(name)?;
    812         Ok(FullCyclosPayto::new(
    813             CyclosAccount {
    814                 id: CyclosId(idx),
    815                 root: root.clone(),
    816             },
    817             name,
    818         ))
    819     }
    820     fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>(
    821         &self,
    822         idx: I,
    823         name: I,
    824         root: &CompactString,
    825     ) -> sqlx::Result<PaytoURI> {
    826         let idx = self.try_get(idx)?;
    827         let name = self.try_get(name)?;
    828         Ok(CyclosAccount {
    829             id: CyclosId(idx),
    830             root: root.clone(),
    831         }
    832         .as_full_payto(name))
    833     }
    834 }
    835 
    836 #[cfg(test)]
    837 mod test {
    838     use std::sync::LazyLock;
    839 
    840     use compact_str::CompactString;
    841     use jiff::{Span, Timestamp};
    842     use serde_json::json;
    843     use sqlx::{PgConnection, PgPool, Row as _, postgres::PgRow};
    844     use taler_api::{
    845         db::TypeHelper,
    846         notification::dummy_listen,
    847         subject::{IncomingSubject, OutgoingSubject},
    848     };
    849     use taler_common::{
    850         api_common::{EddsaPublicKey, HashCode, ShortHashCode},
    851         api_params::{History, Page},
    852         api_wire::TransferState,
    853         types::{
    854             amount::{Currency, decimal},
    855             url,
    856             utils::now_sql_stable_timestamp,
    857         },
    858     };
    859 
    860     use crate::{
    861         constants::CONFIG_SOURCE,
    862         db::{
    863             self, AddIncomingResult, AddOutgoingResult, BounceResult, ChargebackFailureResult,
    864             Transfer, TransferResult, TxIn, TxInAdmin, TxOut, TxOutKind,
    865         },
    866     };
    867 
    868     pub static CURRENCY: LazyLock<Currency> = LazyLock::new(|| "TEST".parse().unwrap());
    869     pub const ROOT: CompactString = CompactString::const_new("localhost");
    870 
    871     async fn setup() -> (PgConnection, PgPool) {
    872         let pool = taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await;
    873         let conn = pool.acquire().await.unwrap().leak();
    874         (conn, pool)
    875     }
    876 
    877     #[tokio::test]
    878     async fn kv() {
    879         let (mut db, _) = setup().await;
    880 
    881         let value = json!({
    882             "name": "Mr Smith",
    883             "no way": 32
    884         });
    885 
    886         assert_eq!(
    887             db::kv_get::<serde_json::Value>(&mut db, "value")
    888                 .await
    889                 .unwrap(),
    890             None
    891         );
    892         db::kv_set(&mut db, "value", &value).await.unwrap();
    893         db::kv_set(&mut db, "value", &value).await.unwrap();
    894         assert_eq!(
    895             db::kv_get::<serde_json::Value>(&mut db, "value")
    896                 .await
    897                 .unwrap(),
    898             Some(value)
    899         );
    900     }
    901 
    902     #[tokio::test]
    903     async fn tx_in() {
    904         let (mut db, pool) = setup().await;
    905 
    906         async fn routine(
    907             db: &mut PgConnection,
    908             first: &Option<IncomingSubject>,
    909             second: &Option<IncomingSubject>,
    910         ) {
    911             let id = sqlx::query("SELECT count(*) + 1 FROM tx_in")
    912                 .try_map(|r: PgRow| r.try_get_u64(0))
    913                 .fetch_one(&mut *db)
    914                 .await
    915                 .unwrap();
    916             let now = now_sql_stable_timestamp();
    917             let later = now + Span::new().hours(2);
    918             let tx = TxIn {
    919                 transfer_id: now.as_microsecond() as i64,
    920                 tx_id: None,
    921                 amount: decimal("10"),
    922                 subject: "subject".to_owned(),
    923                 debtor_id: 31000163100000000,
    924                 debtor_name: "Name".to_string(),
    925                 valued_at: now,
    926             };
    927             // Insert
    928             assert_eq!(
    929                 db::register_tx_in(db, &tx, &first, &now)
    930                     .await
    931                     .expect("register tx in"),
    932                 AddIncomingResult::Success {
    933                     new: true,
    934                     row_id: id,
    935                     valued_at: now
    936                 }
    937             );
    938             // Idempotent
    939             assert_eq!(
    940                 db::register_tx_in(
    941                     db,
    942                     &TxIn {
    943                         valued_at: later,
    944                         ..tx.clone()
    945                     },
    946                     &first,
    947                     &now
    948                 )
    949                 .await
    950                 .expect("register tx in"),
    951                 AddIncomingResult::Success {
    952                     new: false,
    953                     row_id: id,
    954                     valued_at: now
    955                 }
    956             );
    957             // Many
    958             assert_eq!(
    959                 db::register_tx_in(
    960                     db,
    961                     &TxIn {
    962                         transfer_id: later.as_microsecond() as i64,
    963                         valued_at: later,
    964                         ..tx
    965                     },
    966                     &second,
    967                     &now
    968                 )
    969                 .await
    970                 .expect("register tx in"),
    971                 AddIncomingResult::Success {
    972                     new: true,
    973                     row_id: id + 1,
    974                     valued_at: later
    975                 }
    976             );
    977         }
    978 
    979         // Empty db
    980         assert_eq!(
    981             db::revenue_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen)
    982                 .await
    983                 .unwrap(),
    984             Vec::new()
    985         );
    986         assert_eq!(
    987             db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen)
    988                 .await
    989                 .unwrap(),
    990             Vec::new()
    991         );
    992 
    993         // Regular transaction
    994         routine(&mut db, &None, &None).await;
    995 
    996         // Reserve transaction
    997         routine(
    998             &mut db,
    999             &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())),
   1000             &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())),
   1001         )
   1002         .await;
   1003 
   1004         // Kyc transaction
   1005         routine(
   1006             &mut db,
   1007             &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())),
   1008             &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())),
   1009         )
   1010         .await;
   1011 
   1012         // History
   1013         assert_eq!(
   1014             db::revenue_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen)
   1015                 .await
   1016                 .unwrap()
   1017                 .len(),
   1018             6
   1019         );
   1020         assert_eq!(
   1021             db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen)
   1022                 .await
   1023                 .unwrap()
   1024                 .len(),
   1025             4
   1026         );
   1027     }
   1028 
   1029     #[tokio::test]
   1030     async fn tx_in_admin() {
   1031         let (_, pool) = setup().await;
   1032 
   1033         // Empty db
   1034         assert_eq!(
   1035             db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen)
   1036                 .await
   1037                 .unwrap(),
   1038             Vec::new()
   1039         );
   1040 
   1041         let now = now_sql_stable_timestamp();
   1042         let later = now + Span::new().hours(2);
   1043         let tx = TxInAdmin {
   1044             amount: decimal("10"),
   1045             subject: "subject".to_owned(),
   1046             debtor_id: 31000163100000000,
   1047             debtor_name: "Name".into(),
   1048             metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
   1049         };
   1050         // Insert
   1051         assert_eq!(
   1052             db::register_tx_in_admin(&pool, &tx, &now)
   1053                 .await
   1054                 .expect("register tx in"),
   1055             AddIncomingResult::Success {
   1056                 new: true,
   1057                 row_id: 1,
   1058                 valued_at: now
   1059             }
   1060         );
   1061         // Idempotent
   1062         assert_eq!(
   1063             db::register_tx_in_admin(&pool, &tx, &later)
   1064                 .await
   1065                 .expect("register tx in"),
   1066             AddIncomingResult::Success {
   1067                 new: false,
   1068                 row_id: 1,
   1069                 valued_at: now
   1070             }
   1071         );
   1072         // Many
   1073         assert_eq!(
   1074             db::register_tx_in_admin(
   1075                 &pool,
   1076                 &TxInAdmin {
   1077                     subject: "Other".to_owned(),
   1078                     metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
   1079                     ..tx.clone()
   1080                 },
   1081                 &later
   1082             )
   1083             .await
   1084             .expect("register tx in"),
   1085             AddIncomingResult::Success {
   1086                 new: true,
   1087                 row_id: 2,
   1088                 valued_at: later
   1089             }
   1090         );
   1091 
   1092         // History
   1093         assert_eq!(
   1094             db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen)
   1095                 .await
   1096                 .unwrap()
   1097                 .len(),
   1098             2
   1099         );
   1100     }
   1101 
   1102     #[tokio::test]
   1103     async fn tx_out() {
   1104         let (mut db, pool) = setup().await;
   1105 
   1106         async fn routine(db: &mut PgConnection, first: &TxOutKind, second: &TxOutKind) {
   1107             let transfer_id = sqlx::query("SELECT count(*) + 1 FROM tx_out")
   1108                 .try_map(|r: PgRow| r.try_get(0))
   1109                 .fetch_one(&mut *db)
   1110                 .await
   1111                 .unwrap();
   1112             let now = now_sql_stable_timestamp();
   1113             let later = now + Span::new().hours(2);
   1114             let tx = TxOut {
   1115                 transfer_id,
   1116                 tx_id: Some(transfer_id),
   1117                 amount: decimal("10"),
   1118                 subject: "subject".to_owned(),
   1119                 creditor_id: 31000163100000000,
   1120                 creditor_name: "Name".to_string(),
   1121                 valued_at: now,
   1122             };
   1123             assert!(matches!(
   1124                 db::make_transfer(
   1125                     &mut *db,
   1126                     &Transfer {
   1127                         request_uid: HashCode::rand(),
   1128                         amount: decimal("10"),
   1129                         exchange_base_url: url("https://exchange.test.com/"),
   1130                         wtid: ShortHashCode::rand(),
   1131                         creditor_id: 31000163100000000,
   1132                         creditor_name: "Name".into()
   1133                     },
   1134                     &now
   1135                 )
   1136                 .await
   1137                 .unwrap(),
   1138                 TransferResult::Success { .. }
   1139             ));
   1140             db::initiated_submit_success(&mut *db, 1, &Timestamp::now(), transfer_id)
   1141                 .await
   1142                 .expect("status success");
   1143 
   1144             // Insert
   1145             assert_eq!(
   1146                 db::register_tx_out(&mut *db, &tx, first, &now)
   1147                     .await
   1148                     .expect("register tx out"),
   1149                 AddOutgoingResult {
   1150                     result: db::RegisterResult::known,
   1151                     row_id: transfer_id,
   1152                 }
   1153             );
   1154             // Idempotent
   1155             assert_eq!(
   1156                 db::register_tx_out(
   1157                     &mut *db,
   1158                     &TxOut {
   1159                         valued_at: later,
   1160                         ..tx.clone()
   1161                     },
   1162                     first,
   1163                     &now
   1164                 )
   1165                 .await
   1166                 .expect("register tx out"),
   1167                 AddOutgoingResult {
   1168                     result: db::RegisterResult::idempotent,
   1169                     row_id: transfer_id,
   1170                 }
   1171             );
   1172             // Recovered
   1173             assert_eq!(
   1174                 db::register_tx_out(
   1175                     &mut *db,
   1176                     &TxOut {
   1177                         transfer_id: transfer_id + 1,
   1178                         tx_id: Some(transfer_id + 1),
   1179                         valued_at: later,
   1180                         ..tx.clone()
   1181                     },
   1182                     second,
   1183                     &now
   1184                 )
   1185                 .await
   1186                 .expect("register tx out"),
   1187                 AddOutgoingResult {
   1188                     result: db::RegisterResult::recovered,
   1189                     row_id: transfer_id + 1,
   1190                 }
   1191             );
   1192         }
   1193 
   1194         // Empty db
   1195         assert_eq!(
   1196             db::outgoing_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen)
   1197                 .await
   1198                 .unwrap(),
   1199             Vec::new()
   1200         );
   1201 
   1202         // Regular transaction
   1203         routine(&mut db, &TxOutKind::Simple, &TxOutKind::Simple).await;
   1204 
   1205         // Talerable transaction
   1206         routine(
   1207             &mut db,
   1208             &TxOutKind::Talerable(OutgoingSubject::rand()),
   1209             &TxOutKind::Talerable(OutgoingSubject::rand()),
   1210         )
   1211         .await;
   1212 
   1213         // Bounced transaction
   1214         routine(&mut db, &TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await;
   1215 
   1216         // History
   1217         assert_eq!(
   1218             db::outgoing_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen)
   1219                 .await
   1220                 .unwrap()
   1221                 .len(),
   1222             2
   1223         );
   1224     }
   1225 
   1226     // TODO tx out failure
   1227 
   1228     #[tokio::test]
   1229     async fn transfer() {
   1230         let (mut db, _) = setup().await;
   1231 
   1232         // Empty db
   1233         assert_eq!(
   1234             db::transfer_by_id(&mut db, 0, &CURRENCY, &ROOT)
   1235                 .await
   1236                 .unwrap(),
   1237             None
   1238         );
   1239         assert_eq!(
   1240             db::transfer_page(&mut db, &None, &CURRENCY, &ROOT, &Page::default())
   1241                 .await
   1242                 .unwrap(),
   1243             Vec::new()
   1244         );
   1245 
   1246         let req = Transfer {
   1247             request_uid: HashCode::rand(),
   1248             amount: decimal("10"),
   1249             exchange_base_url: url("https://exchange.test.com/"),
   1250             wtid: ShortHashCode::rand(),
   1251             creditor_id: 31000163100000000,
   1252             creditor_name: "Name".into(),
   1253         };
   1254         let now = now_sql_stable_timestamp();
   1255         let later = now + Span::new().hours(2);
   1256         // Insert
   1257         assert_eq!(
   1258             db::make_transfer(&mut db, &req, &now)
   1259                 .await
   1260                 .expect("transfer"),
   1261             TransferResult::Success {
   1262                 id: 1,
   1263                 initiated_at: now
   1264             }
   1265         );
   1266         // Idempotent
   1267         assert_eq!(
   1268             db::make_transfer(&mut db, &req, &later)
   1269                 .await
   1270                 .expect("transfer"),
   1271             TransferResult::Success {
   1272                 id: 1,
   1273                 initiated_at: now
   1274             }
   1275         );
   1276         // Request UID reuse
   1277         assert_eq!(
   1278             db::make_transfer(
   1279                 &mut db,
   1280                 &Transfer {
   1281                     wtid: ShortHashCode::rand(),
   1282                     ..req.clone()
   1283                 },
   1284                 &now
   1285             )
   1286             .await
   1287             .expect("transfer"),
   1288             TransferResult::RequestUidReuse
   1289         );
   1290         // wtid reuse
   1291         assert_eq!(
   1292             db::make_transfer(
   1293                 &mut db,
   1294                 &Transfer {
   1295                     request_uid: HashCode::rand(),
   1296                     ..req.clone()
   1297                 },
   1298                 &now
   1299             )
   1300             .await
   1301             .expect("transfer"),
   1302             TransferResult::WtidReuse
   1303         );
   1304         // Many
   1305         assert_eq!(
   1306             db::make_transfer(
   1307                 &mut db,
   1308                 &Transfer {
   1309                     request_uid: HashCode::rand(),
   1310                     wtid: ShortHashCode::rand(),
   1311                     ..req
   1312                 },
   1313                 &later
   1314             )
   1315             .await
   1316             .expect("transfer"),
   1317             TransferResult::Success {
   1318                 id: 2,
   1319                 initiated_at: later.into()
   1320             }
   1321         );
   1322 
   1323         // Get
   1324         assert!(
   1325             db::transfer_by_id(&mut db, 1, &CURRENCY, &ROOT)
   1326                 .await
   1327                 .unwrap()
   1328                 .is_some()
   1329         );
   1330         assert!(
   1331             db::transfer_by_id(&mut db, 2, &CURRENCY, &ROOT)
   1332                 .await
   1333                 .unwrap()
   1334                 .is_some()
   1335         );
   1336         assert!(
   1337             db::transfer_by_id(&mut db, 3, &CURRENCY, &ROOT)
   1338                 .await
   1339                 .unwrap()
   1340                 .is_none()
   1341         );
   1342         assert_eq!(
   1343             db::transfer_page(&mut db, &None, &CURRENCY, &ROOT, &Page::default())
   1344                 .await
   1345                 .unwrap()
   1346                 .len(),
   1347             2
   1348         );
   1349     }
   1350 
   1351     #[tokio::test]
   1352     async fn bounce() {
   1353         let (mut db, _) = setup().await;
   1354 
   1355         let amount = decimal("10");
   1356         let now = now_sql_stable_timestamp();
   1357 
   1358         // Bounce
   1359         assert_eq!(
   1360             db::register_bounced_tx_in(
   1361                 &mut db,
   1362                 &TxIn {
   1363                     transfer_id: 12,
   1364                     tx_id: None,
   1365                     amount,
   1366                     subject: "subject".to_owned(),
   1367                     debtor_id: 31000163100000000,
   1368                     debtor_name: "Name".to_string(),
   1369                     valued_at: now
   1370                 },
   1371                 22,
   1372                 "good reason",
   1373                 &now
   1374             )
   1375             .await
   1376             .expect("bounce"),
   1377             BounceResult {
   1378                 tx_id: 1,
   1379                 tx_new: true
   1380             }
   1381         );
   1382         // Idempotent
   1383         assert_eq!(
   1384             db::register_bounced_tx_in(
   1385                 &mut db,
   1386                 &TxIn {
   1387                     transfer_id: 12,
   1388                     tx_id: None,
   1389                     amount: amount.clone(),
   1390                     subject: "subject".to_owned(),
   1391                     debtor_id: 31000163100000000,
   1392                     debtor_name: "Name".to_string(),
   1393                     valued_at: now
   1394                 },
   1395                 22,
   1396                 "good reason",
   1397                 &now
   1398             )
   1399             .await
   1400             .expect("bounce"),
   1401             BounceResult {
   1402                 tx_id: 1,
   1403                 tx_new: false
   1404             }
   1405         );
   1406 
   1407         // Many
   1408         assert_eq!(
   1409             db::register_bounced_tx_in(
   1410                 &mut db,
   1411                 &TxIn {
   1412                     transfer_id: 13,
   1413                     tx_id: None,
   1414                     amount: amount.clone(),
   1415                     subject: "subject".to_owned(),
   1416                     debtor_id: 31000163100000000,
   1417                     debtor_name: "Name".to_string(),
   1418                     valued_at: now
   1419                 },
   1420                 23,
   1421                 "good reason",
   1422                 &now
   1423             )
   1424             .await
   1425             .expect("bounce"),
   1426             BounceResult {
   1427                 tx_id: 2,
   1428                 tx_new: true
   1429             }
   1430         );
   1431     }
   1432 
   1433     #[tokio::test]
   1434     async fn status() {
   1435         let (mut db, _) = setup().await;
   1436 
   1437         async fn check_status(
   1438             db: &mut PgConnection,
   1439             id: u64,
   1440             status: TransferState,
   1441             msg: Option<&str>,
   1442         ) {
   1443             let transfer = db::transfer_by_id(db, id, &CURRENCY, &ROOT)
   1444                 .await
   1445                 .unwrap()
   1446                 .unwrap();
   1447             assert_eq!(
   1448                 (status, msg),
   1449                 (transfer.status, transfer.status_msg.as_deref())
   1450             );
   1451         }
   1452 
   1453         // Unknown transfer
   1454         db::initiated_submit_permanent_failure(&mut db, 1, "msg")
   1455             .await
   1456             .unwrap();
   1457         db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12)
   1458             .await
   1459             .unwrap();
   1460         assert_eq!(
   1461             db::initiated_chargeback_failure(&mut db, 1).await.unwrap(),
   1462             ChargebackFailureResult::Unknown
   1463         );
   1464 
   1465         // Failure
   1466         db::make_transfer(
   1467             &mut db,
   1468             &Transfer {
   1469                 request_uid: HashCode::rand(),
   1470                 amount: decimal("1"),
   1471                 exchange_base_url: url("https://exchange.test.com/"),
   1472                 wtid: ShortHashCode::rand(),
   1473                 creditor_id: 31000163100000000,
   1474                 creditor_name: "Name".into(),
   1475             },
   1476             &Timestamp::now(),
   1477         )
   1478         .await
   1479         .expect("transfer");
   1480         check_status(&mut db, 1, TransferState::pending, None).await;
   1481         db::initiated_submit_permanent_failure(&mut db, 1, "error status")
   1482             .await
   1483             .unwrap();
   1484         check_status(
   1485             &mut db,
   1486             1,
   1487             TransferState::permanent_failure,
   1488             Some("error status"),
   1489         )
   1490         .await;
   1491 
   1492         // Success
   1493         db::make_transfer(
   1494             &mut db,
   1495             &Transfer {
   1496                 request_uid: HashCode::rand(),
   1497                 amount: decimal("1"),
   1498                 exchange_base_url: url("https://exchange.test.com/"),
   1499                 wtid: ShortHashCode::rand(),
   1500                 creditor_id: 31000163100000000,
   1501                 creditor_name: "Name".into(),
   1502             },
   1503             &Timestamp::now(),
   1504         )
   1505         .await
   1506         .expect("transfer");
   1507         check_status(&mut db, 2, TransferState::pending, None).await;
   1508         db::initiated_submit_success(&mut db, 2, &Timestamp::now(), 3)
   1509             .await
   1510             .unwrap();
   1511         check_status(&mut db, 2, TransferState::pending, None).await;
   1512         db::register_tx_out(
   1513             &mut db,
   1514             &TxOut {
   1515                 transfer_id: 5,
   1516                 tx_id: Some(3),
   1517                 amount: decimal("2"),
   1518                 subject: "".to_string(),
   1519                 creditor_id: 31000163100000000,
   1520                 creditor_name: "Name".to_string(),
   1521                 valued_at: Timestamp::now(),
   1522             },
   1523             &TxOutKind::Simple,
   1524             &Timestamp::now(),
   1525         )
   1526         .await
   1527         .unwrap();
   1528         check_status(&mut db, 2, TransferState::success, None).await;
   1529 
   1530         // Chargeback
   1531         assert_eq!(
   1532             db::initiated_chargeback_failure(&mut db, 5).await.unwrap(),
   1533             ChargebackFailureResult::Known(2)
   1534         );
   1535         check_status(
   1536             &mut db,
   1537             2,
   1538             TransferState::late_failure,
   1539             Some("charged back"),
   1540         )
   1541         .await;
   1542         assert_eq!(
   1543             db::initiated_chargeback_failure(&mut db, 5).await.unwrap(),
   1544             ChargebackFailureResult::Idempotent(2)
   1545         );
   1546     }
   1547 
   1548     #[tokio::test]
   1549     async fn batch() {
   1550         let (mut db, _) = setup().await;
   1551         let start = Timestamp::now();
   1552 
   1553         // Empty db
   1554         let pendings = db::pending_batch(&mut db, &start)
   1555             .await
   1556             .expect("pending_batch");
   1557         assert_eq!(pendings.len(), 0);
   1558 
   1559         // Some transfers
   1560         for i in 0..3 {
   1561             db::make_transfer(
   1562                 &mut db,
   1563                 &Transfer {
   1564                     request_uid: HashCode::rand(),
   1565                     amount: decimal(format!("{}", i + 1)),
   1566                     exchange_base_url: url("https://exchange.test.com/"),
   1567                     wtid: ShortHashCode::rand(),
   1568                     creditor_id: 31000163100000000,
   1569                     creditor_name: "Name".into(),
   1570                 },
   1571                 &Timestamp::now(),
   1572             )
   1573             .await
   1574             .expect("transfer");
   1575         }
   1576         let pendings = db::pending_batch(&mut db, &start)
   1577             .await
   1578             .expect("pending_batch");
   1579         assert_eq!(pendings.len(), 3);
   1580 
   1581         // Max 100 txs in batch
   1582         for i in 0..100 {
   1583             db::make_transfer(
   1584                 &mut db,
   1585                 &Transfer {
   1586                     request_uid: HashCode::rand(),
   1587                     amount: decimal(format!("{}", i + 1)),
   1588                     exchange_base_url: url("https://exchange.test.com/"),
   1589                     wtid: ShortHashCode::rand(),
   1590                     creditor_id: 31000163100000000,
   1591                     creditor_name: "Name".into(),
   1592                 },
   1593                 &Timestamp::now(),
   1594             )
   1595             .await
   1596             .expect("transfer");
   1597         }
   1598         let pendings = db::pending_batch(&mut db, &start)
   1599             .await
   1600             .expect("pending_batch");
   1601         assert_eq!(pendings.len(), 100);
   1602 
   1603         // Skip uploaded
   1604         for i in 0..=10 {
   1605             db::initiated_submit_success(&mut db, i, &Timestamp::now(), i)
   1606                 .await
   1607                 .expect("status success");
   1608         }
   1609         let pendings = db::pending_batch(&mut db, &start)
   1610             .await
   1611             .expect("pending_batch");
   1612         assert_eq!(pendings.len(), 93);
   1613 
   1614         // Skip failed
   1615         for i in 0..=10 {
   1616             db::initiated_submit_permanent_failure(&mut db, 10 + i, "failure")
   1617                 .await
   1618                 .expect("status failure");
   1619         }
   1620         let pendings = db::pending_batch(&mut db, &start)
   1621             .await
   1622             .expect("pending_batch");
   1623         assert_eq!(pendings.len(), 83);
   1624     }
   1625 }