depolymerization

wire gateway for Bitcoin/Ethereum
Log | Files | Refs | Submodules | README | LICENSE

db.rs (38035B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use bitcoin::{Address, BlockHash, Txid, hashes::Hash};
     18 use compact_str::CompactString;
     19 use depolymerizer_common::status::DebitStatus;
     20 use jiff::Timestamp;
     21 use sqlx::{PgConnection, PgExecutor, PgPool, QueryBuilder, Row, postgres::PgRow};
     22 use taler_api::{
     23     db::{BindHelper as _, TypeHelper as _, history, page},
     24     serialized,
     25     subject::IncomingSubject,
     26 };
     27 use taler_common::{
     28     api::{
     29         EddsaPublicKey, EddsaSignature, ShortHashCode,
     30         params::{History, Page},
     31         revenue::RevenueIncomingBankTransaction,
     32         wire::{
     33             IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest,
     34             TransferResponse, TransferState, TransferStatus,
     35         },
     36     },
     37     db::IncomingType,
     38     types::amount::{Amount, Currency},
     39 };
     40 use tokio::sync::watch::Receiver;
     41 use url::Url;
     42 
     43 use crate::{
     44     payto::FullBtcPayto,
     45     sql::{sql_addr, sql_btc_amount, sql_generic_payto, sql_payto},
     46 };
     47 
     48 /// Initialize the worker status
     49 pub async fn init_status(db: &PgPool) -> sqlx::Result<()> {
     50     sqlx::query(
     51         "INSERT INTO state (name, value) VALUES ('status', $1) ON CONFLICT (name) DO NOTHING",
     52     )
     53     .bind([1u8])
     54     .execute(db)
     55     .await?;
     56     Ok(())
     57 }
     58 
     59 /// Get the worker status
     60 pub async fn get_status(db: &PgPool) -> sqlx::Result<Option<[u8; 1]>> {
     61     sqlx::query_scalar("SELECT value FROM state WHERE name = 'status'")
     62         .fetch_optional(db)
     63         .await
     64 }
     65 
     66 /// Update the worker status
     67 pub async fn update_status(db: &mut PgConnection, new_status: bool) -> sqlx::Result<()> {
     68     sqlx::query("UPDATE state SET value=$1 WHERE name='status'")
     69         .bind([new_status as u8])
     70         .execute(&mut *db)
     71         .await?;
     72     sqlx::query("NOTIFY status").execute(db).await?;
     73     Ok(())
     74 }
     75 
     76 /// Initialize the worker sync state
     77 pub async fn init_sync_state(db: &PgPool, hash: &BlockHash, reset: bool) -> sqlx::Result<()> {
     78     sqlx::query(if reset {
     79         "INSERT INTO state (name, value) VALUES ('last_hash', $1) ON CONFLICT (name) DO UPDATE SET value=$1"
     80     } else {
     81         "INSERT INTO state (name, value) VALUES ('last_hash', $1) ON CONFLICT (name) DO NOTHING"
     82     })
     83     .bind(hash.as_byte_array())
     84     .execute(db)
     85     .await?;
     86     Ok(())
     87 }
     88 
     89 /// Get the current worker sync state
     90 pub async fn get_sync_state(db: &mut PgConnection) -> sqlx::Result<BlockHash> {
     91     sqlx::query("SELECT value FROM state WHERE name='last_hash'")
     92         .try_map(|r: PgRow| r.try_get_map(0, BlockHash::from_slice))
     93         .fetch_one(db)
     94         .await
     95 }
     96 
     97 /// Update the worker sync state if it hasn't changed yet
     98 pub async fn swap_sync_state(
     99     db: &mut PgConnection,
    100     from: &BlockHash,
    101     to: &BlockHash,
    102 ) -> sqlx::Result<()> {
    103     sqlx::query("UPDATE state SET value=$1 WHERE name='last_hash' AND value=$2")
    104         .bind(to.as_byte_array())
    105         .bind(from.as_byte_array())
    106         .execute(db)
    107         .await?;
    108     Ok(())
    109 }
    110 
    111 #[derive(Debug)]
    112 pub enum TransferResult {
    113     Success(TransferResponse),
    114     RequestUidReuse,
    115     WtidReuse,
    116 }
    117 
    118 /// Initiate a new Taler transfer idempotently
    119 pub async fn transfer(
    120     db: &PgPool,
    121     creditor: &FullBtcPayto,
    122     transfer: &TransferRequest,
    123 ) -> sqlx::Result<TransferResult> {
    124     serialized!(
    125         sqlx::query(
    126             "
    127             SELECT out_request_uid_reuse, out_wtid_reuse, out_transfer_row_id, out_created_at
    128             FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8)
    129         ",
    130         )
    131         .bind(transfer.amount)
    132         .bind(transfer.exchange_base_url.as_str())
    133         .bind(creditor.0.to_string())
    134         .bind(&creditor.name)
    135         .bind(transfer.request_uid.as_slice())
    136         .bind(transfer.wtid.as_slice())
    137         .bind(transfer.metadata.as_deref())
    138         .bind_timestamp(&Timestamp::now())
    139         .try_map(|r: PgRow| {
    140             Ok(if r.try_get_flag("out_request_uid_reuse")? {
    141                 TransferResult::RequestUidReuse
    142             } else if r.try_get_flag("out_wtid_reuse")? {
    143                 TransferResult::WtidReuse
    144             } else {
    145                 TransferResult::Success(TransferResponse {
    146                     row_id: r.try_get_u64("out_transfer_row_id")?,
    147                     timestamp: r.try_get_taler_timestamp("out_created_at")?,
    148                 })
    149             })
    150         })
    151         .fetch_one(db)
    152     )
    153 }
    154 
    155 /// Paginate initiated Taler transfers
    156 pub async fn transfer_page(
    157     db: &PgPool,
    158     status: &Option<TransferState>,
    159     params: &Page,
    160     currency: &Currency,
    161 ) -> sqlx::Result<Vec<TransferListStatus>> {
    162     let status = match status {
    163         Some(s) => match s {
    164             TransferState::pending => Some(DebitStatus::requested),
    165             TransferState::success => Some(DebitStatus::sent),
    166             TransferState::transient_failure
    167             | TransferState::permanent_failure
    168             | TransferState::late_failure => {
    169                 return Ok(Vec::new());
    170             }
    171         },
    172         None => None,
    173     };
    174 
    175     page(
    176         db,
    177         params,
    178         "transfer_id",
    179         || {
    180             let mut sql = QueryBuilder::new(
    181                 "
    182                     SELECT
    183                         transfer_id,
    184                         status,
    185                         amount,
    186                         credit_acc,
    187                         credit_name,
    188                         created_at
    189                     FROM transfer WHERE
    190                 ",
    191             );
    192             if let Some(status) = status {
    193                 sql.push(" status = ").push_bind(status).push(" AND ");
    194             }
    195             sql
    196         },
    197         |r: PgRow| {
    198             Ok(TransferListStatus {
    199                 row_id: r.try_get_u64(0)?,
    200                 status: match r.try_get(1)? {
    201                     DebitStatus::requested | DebitStatus::sent => TransferState::pending,
    202                     DebitStatus::confirmed => TransferState::success,
    203                 },
    204                 amount: r.try_get_amount(2, currency)?,
    205                 credit_account: sql_payto(&r, 3, 4)?,
    206                 timestamp: r.try_get_taler_timestamp(5)?,
    207             })
    208         },
    209     )
    210     .await
    211 }
    212 
    213 /// Get a Taler transfer info
    214 pub async fn transfer_by_id(
    215     db: &PgPool,
    216     id: u64,
    217     currency: &Currency,
    218 ) -> sqlx::Result<Option<TransferStatus>> {
    219     serialized!(
    220         sqlx::query(
    221             "
    222             SELECT
    223                 status,
    224                 amount,
    225                 exchange_url,
    226                 wtid,
    227                 credit_acc,
    228                 credit_name,
    229                 metadata,
    230                 created_at
    231             FROM transfer WHERE transfer_id = $1
    232         ",
    233         )
    234         .bind(id as i64)
    235         .try_map(|r: PgRow| {
    236             Ok(TransferStatus {
    237                 status: match r.try_get(0)? {
    238                     DebitStatus::requested | DebitStatus::sent => TransferState::pending,
    239                     DebitStatus::confirmed => TransferState::success,
    240                 },
    241                 status_msg: None,
    242                 amount: r.try_get_amount(1, currency)?,
    243                 origin_exchange_url: r.try_get(2)?,
    244                 wtid: r.try_get(3)?,
    245                 credit_account: sql_payto(&r, 4, 5)?,
    246                 metadata: r.try_get(6)?,
    247                 timestamp: r.try_get_taler_timestamp(7)?,
    248             })
    249         })
    250         .fetch_optional(db)
    251     )
    252 }
    253 
    254 /// Fetch outgoing Taler transactions history
    255 pub async fn outgoing_history(
    256     db: &PgPool,
    257     params: &History,
    258     currency: &Currency,
    259     listen: impl FnOnce() -> Receiver<i64>,
    260 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> {
    261     history(
    262         db,
    263         "tx_out_id",
    264         params,
    265         listen,
    266         || {
    267             QueryBuilder::new(
    268                 "
    269         SELECT
    270             tx_out_id,
    271             tx_out.created_at,
    272             tx_out.amount,
    273             taler_out.wtid,
    274             tx_out.credit_acc,
    275             transfer.credit_name,
    276             taler_out.exchange_base_url,
    277             taler_out.metadata
    278         FROM tx_out
    279             JOIN taler_out USING (tx_out_id)
    280             LEFT JOIN transfer USING (txid)
    281         WHERE
    282         ",
    283             )
    284         },
    285         |r| {
    286             Ok(OutgoingBankTransaction {
    287                 row_id: r.try_get_u64(0)?,
    288                 date: r.try_get_taler_timestamp(1)?,
    289                 amount: r.try_get_amount(2, currency)?,
    290                 wtid: r.try_get(3)?,
    291                 credit_account: sql_payto(&r, 4, 5)?,
    292                 exchange_base_url: r.try_get_url(6)?,
    293                 debit_fee: None, // TODO we can actually get this information
    294                 metadata: r.try_get(7)?,
    295             })
    296         },
    297     )
    298     .await
    299 }
    300 
    301 /// Fetch incoming Taler transactions history
    302 pub async fn incoming_history(
    303     db: &PgPool,
    304     params: &History,
    305     currency: &Currency,
    306     listen: impl FnOnce() -> Receiver<i64>,
    307 ) -> sqlx::Result<Vec<IncomingBankTransaction>> {
    308     history(
    309         db,
    310         "tx_in_id",
    311         params,
    312         listen,
    313         || {
    314             QueryBuilder::new(
    315                 "
    316                 SELECT
    317                     tx_in_id,
    318                     received_at,
    319                     amount,
    320                     debit_acc,
    321                     type,
    322                     metadata,
    323                     authorization_pub,
    324                     authorization_sig
    325                 FROM tx_in JOIN taler_in USING (tx_in_id)
    326                 WHERE
    327                 ",
    328             )
    329         },
    330         |r| {
    331             Ok(match r.try_get(4)? {
    332                 IncomingType::reserve => IncomingBankTransaction::Reserve {
    333                     row_id: r.try_get_u64(0)?,
    334                     date: r.try_get_taler_timestamp(1)?,
    335                     amount: r.try_get_amount(2, currency)?,
    336                     reserve_pub: r.try_get(5)?,
    337                     debit_account: sql_generic_payto(&r, 3)?,
    338                     credit_fee: None, // TODO store this
    339                     authorization_pub: r.try_get(6)?,
    340                     authorization_sig: r.try_get(7)?,
    341                 },
    342                 IncomingType::kyc => IncomingBankTransaction::Kyc {
    343                     row_id: r.try_get_u64(0)?,
    344                     date: r.try_get_taler_timestamp(1)?,
    345                     amount: r.try_get_amount(2, currency)?,
    346                     account_pub: r.try_get(5)?,
    347                     debit_account: sql_generic_payto(&r, 3)?,
    348                     credit_fee: None, // TODO store this
    349                     authorization_pub: r.try_get(6)?,
    350                     authorization_sig: r.try_get(7)?,
    351                 },
    352                 IncomingType::map => unimplemented!("MAP are never listed in the history"),
    353             })
    354         },
    355     )
    356     .await
    357 }
    358 
    359 /// Fetch incoming Taler transactions history
    360 pub async fn revenue_history(
    361     db: &PgPool,
    362     params: &History,
    363     currency: &Currency,
    364     listen: impl FnOnce() -> Receiver<i64>,
    365 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> {
    366     history(
    367         db,
    368         "tx_in_id",
    369         params,
    370         listen,
    371         || {
    372             QueryBuilder::new(
    373                 "
    374                 SELECT
    375                     tx_in_id,
    376                     received_at,
    377                     amount,
    378                     debit_acc
    379                 FROM tx_in
    380                 WHERE
    381                 ",
    382             )
    383         },
    384         |r| {
    385             Ok(RevenueIncomingBankTransaction {
    386                 row_id: r.try_get_u64(0)?,
    387                 date: r.try_get_taler_timestamp(1)?,
    388                 amount: r.try_get_amount(2, currency)?,
    389                 debit_account: sql_generic_payto(&r, 3)?,
    390                 credit_fee: None, // TODO store this
    391                 subject: String::new(),
    392             })
    393         },
    394     )
    395     .await
    396 }
    397 
    398 #[derive(Debug, PartialEq, Eq)]
    399 pub enum AddIncomingResult {
    400     Success {
    401         new: bool,
    402         pending: bool,
    403         row_id: u64,
    404         valued_at: Timestamp,
    405     },
    406     ReservePubReuse,
    407     UnknownMapping,
    408     MappingReuse,
    409 }
    410 
    411 /// Register a fake Taler credit
    412 pub async fn register_tx_in_admin(
    413     db: &PgPool,
    414     amount: &Amount,
    415     debit_acc: &Address,
    416     received: &Timestamp,
    417     metadata: &IncomingSubject,
    418 ) -> sqlx::Result<AddIncomingResult> {
    419     sqlx::query(
    420         "
    421             SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending
    422             FROM register_tx_in(NULL, $1, $2, $3, $4, $5)
    423         ",
    424     )
    425     .bind(amount)
    426     .bind(debit_acc.to_string())
    427     .bind_timestamp(received)
    428     .bind(metadata.ty())
    429     .bind(metadata.key())
    430     .try_map(|r: PgRow| {
    431         Ok(if r.try_get_flag(0)? {
    432             AddIncomingResult::ReservePubReuse
    433         } else if r.try_get_flag(1)? {
    434             AddIncomingResult::MappingReuse
    435         } else if r.try_get_flag(2)? {
    436             AddIncomingResult::UnknownMapping
    437         } else {
    438             AddIncomingResult::Success {
    439                 row_id: r.try_get_u64(3)?,
    440                 valued_at: r.try_get_timestamp(4)?,
    441                 new: r.try_get_flag(5)?,
    442                 pending: r.try_get_flag(6)?
    443             }
    444         })
    445     })
    446     .fetch_one(db)
    447     .await
    448 }
    449 
    450 /// Register a Taler credit
    451 pub async fn register_tx_in<'a>(
    452     e: impl PgExecutor<'a>,
    453     txid: &Txid,
    454     amount: &Amount,
    455     debit_acc: &Address,
    456     received: &Timestamp,
    457     subject: &Option<IncomingSubject>,
    458 ) -> sqlx::Result<AddIncomingResult> {
    459     sqlx::query(
    460         "
    461             SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending
    462             FROM register_tx_in($1, $2, $3, $4, $5, $6)
    463         ",
    464     )
    465     .bind(txid.as_byte_array())
    466     .bind(amount)
    467     .bind(debit_acc.to_string())
    468     .bind_timestamp(received)
    469     .bind(subject.as_ref().map(|it| it.ty()))
    470     .bind(subject.as_ref().map(|it| it.key()))
    471     .try_map(|r: PgRow| {
    472         Ok(if r.try_get_flag(0)? {
    473             AddIncomingResult::ReservePubReuse
    474         } else if r.try_get_flag(1)? {
    475             AddIncomingResult::MappingReuse
    476         } else if r.try_get_flag(2)? {
    477             AddIncomingResult::UnknownMapping
    478         } else {
    479             AddIncomingResult::Success {
    480                 row_id: r.try_get_u64(3)?,
    481                 valued_at: r.try_get_timestamp(4)?,
    482                 new: r.try_get_flag(5)?,
    483                 pending: r.try_get_flag(6)?,
    484             }
    485         })
    486     })
    487     .fetch_one(e)
    488     .await
    489 }
    490 
    491 #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    492 pub enum RegistrationResult {
    493     Success,
    494     ReservePubReuse,
    495     SubjectReuse,
    496 }
    497 
    498 pub async fn transfer_register(
    499     db: &PgPool,
    500     ty: IncomingType,
    501     account_pub: &EddsaPublicKey,
    502     auth_pub: &EddsaPublicKey,
    503     auth_sig: &EddsaSignature,
    504     recurrent: bool,
    505     timestamp: &Timestamp,
    506 ) -> sqlx::Result<RegistrationResult> {
    507     serialized!(
    508         sqlx::query(
    509             "
    510         SELECT out_reserve_pub_reuse
    511         FROM register_prepared_transfers (
    512             $1,$2,$3,$4,$5,$6
    513         )
    514         ",
    515         )
    516         .bind(ty)
    517         .bind(account_pub)
    518         .bind(auth_pub)
    519         .bind(auth_sig)
    520         .bind(recurrent)
    521         .bind_timestamp(timestamp)
    522         .try_map(|r: PgRow| {
    523             Ok(if r.try_get_flag(0)? {
    524                 RegistrationResult::ReservePubReuse
    525             } else {
    526                 RegistrationResult::Success
    527             })
    528         })
    529         .fetch_one(db)
    530     )
    531 }
    532 
    533 pub async fn transfer_unregister(
    534     db: &PgPool,
    535     auth_pub: &EddsaPublicKey,
    536     timestamp: &Timestamp,
    537 ) -> sqlx::Result<bool> {
    538     serialized!(
    539         sqlx::query_scalar("SELECT out_found FROM delete_prepared_transfers($1,$2)")
    540             .bind(auth_pub)
    541             .bind_timestamp(timestamp)
    542             .fetch_one(db)
    543     )
    544 }
    545 
    546 /// Update a transaction id after bumping it
    547 pub async fn bump_tx_id(
    548     db: &mut PgConnection,
    549     to: &Txid,
    550     wtid: &ShortHashCode,
    551 ) -> sqlx::Result<()> {
    552     sqlx::query("UPDATE transfer SET txid=$1 WHERE wtid=$2")
    553         .bind(to.as_byte_array())
    554         .bind(wtid)
    555         .execute(db)
    556         .await?;
    557     Ok(())
    558 }
    559 
    560 /// Initiate a bounce
    561 pub async fn bounce<'a>(
    562     e: impl PgExecutor<'a>,
    563     txid: &Txid,
    564     amount: &Amount,
    565     debit_acc: &Address,
    566     received: &Timestamp,
    567     reason: &str,
    568 ) -> sqlx::Result<()> {
    569     sqlx::query("SELECT FROM register_bounce_tx_in($1, $2, $3, $4, $5, $6)")
    570         .bind(txid.as_byte_array())
    571         .bind(amount)
    572         .bind(debit_acc.to_string())
    573         .bind_timestamp(received)
    574         .bind(reason)
    575         .bind_timestamp(&Timestamp::now())
    576         .execute(e)
    577         .await?;
    578     Ok(())
    579 }
    580 
    581 #[derive(Debug, PartialEq, Eq)]
    582 pub enum ProblematicTx {
    583     Taler {
    584         txid: Txid,
    585         addr: Address,
    586         ty: IncomingType,
    587         metadata: EddsaPublicKey,
    588     },
    589     Bounce {
    590         txid: Txid,
    591         bounced_in: Txid,
    592     },
    593     Simple {
    594         txid: Txid,
    595     },
    596 }
    597 
    598 /// Handle transactions being removed during a reorganization
    599 pub async fn reorg<'a>(e: impl PgExecutor<'a>, ids: &[Txid]) -> sqlx::Result<Vec<ProblematicTx>> {
    600     // Any incoming transactions that is currently considered final ('confirmed') is a potential correctness issues
    601     // Removed outgoing transactions will be retried automatically by the node/wallet and therefore
    602     // do not mandate a full adapter stop
    603 
    604     sqlx::query(
    605         "
    606         SELECT txid, NULL, type, debit_acc, metadata
    607         FROM tx_in JOIN taler_in USING (tx_in_id) WHERE txid = ANY($1)
    608         UNION ALL
    609         SELECT tx_in.txid, bounced.txid, NULL, NULL, NULL
    610         from tx_in JOIN bounced USING (tx_in_id) WHERE tx_in.txid = ANY($1)
    611     ",
    612     )
    613     .bind(ids.iter().map(|it| it.as_byte_array()).collect::<Vec<_>>())
    614     .try_map(|r: PgRow| {
    615         let txid = r.try_get_map(0, Txid::from_slice)?;
    616         Ok(
    617             if let Some(bounced_in) = r.try_get_opt_map(1, Txid::from_slice)? {
    618                 ProblematicTx::Bounce { txid, bounced_in }
    619             } else if let Some(ty) = r.try_get(2)? {
    620                 ProblematicTx::Taler {
    621                     txid,
    622                     ty,
    623                     addr: sql_addr(&r, 3)?,
    624                     metadata: r.try_get(4)?,
    625                 }
    626             } else {
    627                 ProblematicTx::Simple { txid }
    628             },
    629         )
    630     })
    631     .fetch_all(e)
    632     .await
    633 }
    634 
    635 #[derive(Debug)]
    636 pub enum SyncOutResult {
    637     New,
    638     Replaced,
    639     Recovered,
    640     None,
    641 }
    642 
    643 #[derive(Debug)]
    644 pub enum TxOutKind<'a> {
    645     Simple,
    646     Bounce(Txid),
    647     Talerable {
    648         wtid: &'a ShortHashCode,
    649         url: &'a Url,
    650         metadata: Option<&'a str>,
    651     },
    652 }
    653 
    654 pub async fn sync_out<'a>(
    655     e: impl PgExecutor<'a>,
    656     txid: &Txid,
    657     replace_txid: Option<&Txid>,
    658     amount: &Amount,
    659     credit_acc: &Address,
    660     kind: &TxOutKind<'_>,
    661     created: &Timestamp,
    662 ) -> sqlx::Result<SyncOutResult> {
    663     let query = sqlx::query(
    664         "
    665             SELECT out_replaced, out_recovered, out_new
    666             FROM sync_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
    667         ",
    668     )
    669     .bind(txid.as_byte_array())
    670     .bind(replace_txid.map(|it| it.as_byte_array()))
    671     .bind(amount)
    672     .bind(credit_acc.to_string());
    673     match kind {
    674         TxOutKind::Simple => query
    675             .bind(None::<&[u8]>)
    676             .bind(None::<&str>)
    677             .bind(None::<&str>)
    678             .bind(None::<&[u8]>),
    679         TxOutKind::Bounce(bounced) => query
    680             .bind(None::<&[u8]>)
    681             .bind(None::<&str>)
    682             .bind(None::<&str>)
    683             .bind(bounced.as_byte_array()),
    684         TxOutKind::Talerable {
    685             wtid,
    686             url,
    687             metadata,
    688         } => query
    689             .bind(wtid)
    690             .bind(url.as_str())
    691             .bind(metadata)
    692             .bind(None::<&[u8]>),
    693     }
    694     .bind_timestamp(created)
    695     .bind_timestamp(&Timestamp::now())
    696     .try_map(|r: PgRow| {
    697         Ok(if r.try_get_flag(0)? {
    698             SyncOutResult::Replaced
    699         } else if r.try_get_flag(1)? {
    700             SyncOutResult::Recovered
    701         } else if r.try_get_flag(2)? {
    702             SyncOutResult::New
    703         } else {
    704             SyncOutResult::None
    705         })
    706     })
    707     .fetch_one(e)
    708     .await
    709 }
    710 
    711 pub async fn pending_transfer<'a>(
    712     e: impl PgExecutor<'a>,
    713     currency: &Currency,
    714 ) -> sqlx::Result<
    715     Option<(
    716         i64,
    717         bitcoin::Amount,
    718         ShortHashCode,
    719         Address,
    720         Url,
    721         Option<CompactString>,
    722     )>,
    723 > {
    724     sqlx::query(
    725         "
    726         SELECT
    727           transfer_id,
    728           amount,
    729           wtid,
    730           credit_acc,
    731           exchange_url,
    732           metadata
    733         FROM transfer
    734         WHERE status='requested'
    735         ORDER BY created_at LIMIT 1",
    736     )
    737     .try_map(|r: PgRow| {
    738         Ok((
    739             r.try_get(0)?,
    740             sql_btc_amount(&r, 1, currency)?,
    741             r.try_get(2)?,
    742             sql_addr(&r, 3)?,
    743             r.try_get_parse(4)?,
    744             r.try_get(5)?,
    745         ))
    746     })
    747     .fetch_optional(e)
    748     .await
    749 }
    750 
    751 /// Update transfer status to 'sent' and bind it to a txid
    752 pub async fn transfer_sent<'a>(e: impl PgExecutor<'a>, id: i64, txid: &Txid) -> sqlx::Result<()> {
    753     sqlx::query("UPDATE transfer SET status='sent', txid=$2 WHERE transfer_id=$1")
    754         .bind(id)
    755         .bind(txid.as_byte_array())
    756         .execute(e)
    757         .await?;
    758     Ok(())
    759 }
    760 
    761 /// Reset the state of a conflicted transfer
    762 pub async fn transfer_conflict<'a>(e: impl PgExecutor<'a>, id: &Txid) -> sqlx::Result<bool> {
    763     Ok(
    764         sqlx::query("UPDATE transfer SET status='requested',txid=NULL WHERE txid=$1")
    765             .bind(id.as_byte_array())
    766             .execute(e)
    767             .await?
    768             .rows_affected()
    769             > 0,
    770     )
    771 }
    772 
    773 pub async fn pending_bounce<'a>(
    774     e: impl PgExecutor<'a>,
    775 ) -> sqlx::Result<Option<(i64, Txid, Option<String>)>> {
    776     sqlx::query(
    777         "
    778         SELECT
    779           tx_in_id,
    780           tx_in.txid,
    781           reason
    782         FROM bounced
    783           JOIN tx_in USING (tx_in_id)
    784         WHERE status='requested' ORDER BY received_at LIMIT 1
    785     ",
    786     )
    787     .try_map(|r: PgRow| {
    788         Ok((
    789             r.try_get(0)?,
    790             r.try_get_map(1, Txid::from_slice)?,
    791             r.try_get(2)?,
    792         ))
    793     })
    794     .fetch_optional(e)
    795     .await
    796 }
    797 
    798 /// Update bounce status to 'sent' and bind it to a txid
    799 pub async fn bounce_sent<'a>(e: impl PgExecutor<'a>, id: i64, txid: &Txid) -> sqlx::Result<()> {
    800     sqlx::query("UPDATE bounced SET status='sent', txid=$2 WHERE tx_in_id=$1")
    801         .bind(id)
    802         .bind(txid.as_byte_array())
    803         .execute(e)
    804         .await?;
    805     Ok(())
    806 }
    807 
    808 /// Reset the state of a conflicted bounce
    809 pub async fn bounce_conflict<'a>(e: impl PgExecutor<'a>, id: &Txid) -> sqlx::Result<bool> {
    810     Ok(
    811         sqlx::query("UPDATE bounced SET status='requested',txid=NULL where txid=$1")
    812             .bind(id.as_byte_array())
    813             .execute(e)
    814             .await?
    815             .rows_affected()
    816             > 0,
    817     )
    818 }
    819 
    820 pub enum SyncBounceResult {
    821     New,
    822     Recovered,
    823     None,
    824 }
    825 
    826 #[cfg(test)]
    827 pub mod test {
    828     use std::{assert_matches, str::FromStr, sync::LazyLock};
    829 
    830     use bitcoin::{
    831         Address, BlockHash, Txid,
    832         address::NetworkUnchecked,
    833         hashes::{Hash as _, sha256d::Hash},
    834     };
    835     use jiff::Span;
    836     use sqlx::{PgPool, Postgres, pool::PoolConnection, postgres::PgRow};
    837     use taler_api::{db::TypeHelper as _, notification::dummy_listen, subject::IncomingSubject};
    838     use taler_common::{
    839         api::{EddsaPublicKey, HashCode, ShortHashCode, params::History, wire::TransferRequest},
    840         types::{
    841             amount::{Currency, amount},
    842             url,
    843             utils::now_sql_stable_ts,
    844         },
    845     };
    846 
    847     use crate::{
    848         CONFIG_SOURCE,
    849         api::test::CLIENT,
    850         db::{
    851             AddIncomingResult, ProblematicTx, SyncOutResult, TransferResult, TxOutKind, bounce,
    852             bounce_sent, bump_tx_id, get_sync_state, incoming_history, init_status,
    853             init_sync_state, pending_bounce, register_tx_in, register_tx_in_admin, reorg,
    854             revenue_history, swap_sync_state, sync_out, transfer, update_status,
    855         },
    856     };
    857 
    858     pub const CURR: Currency = Currency::TEST;
    859 
    860     async fn setup() -> (PoolConnection<Postgres>, PgPool) {
    861         taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await
    862     }
    863 
    864     #[tokio::test]
    865     async fn kv() {
    866         let (mut db, pool) = setup().await;
    867 
    868         // Empty status
    869         update_status(&mut db, false).await.unwrap();
    870         update_status(&mut db, true).await.unwrap();
    871 
    872         // Init status
    873         init_status(&pool).await.unwrap();
    874         update_status(&mut db, false).await.unwrap();
    875         update_status(&mut db, true).await.unwrap();
    876 
    877         // Sync state
    878         let first = BlockHash::from_raw_hash(Hash::all_zeros());
    879         let second = BlockHash::from_raw_hash(Hash::from_byte_array([
    880             0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
    881             0, 0, 1,
    882         ]));
    883         init_sync_state(&pool, &first, true).await.unwrap();
    884         assert_eq!(get_sync_state(&mut db).await.unwrap(), first);
    885         init_sync_state(&pool, &second, false).await.unwrap();
    886         assert_eq!(get_sync_state(&mut db).await.unwrap(), first);
    887         init_sync_state(&pool, &second, true).await.unwrap();
    888         assert_eq!(get_sync_state(&mut db).await.unwrap(), second);
    889         swap_sync_state(&mut db, &second, &first).await.unwrap();
    890         assert_eq!(get_sync_state(&mut db).await.unwrap(), first);
    891         swap_sync_state(&mut db, &second, &first).await.unwrap();
    892         assert_eq!(get_sync_state(&mut db).await.unwrap(), first);
    893     }
    894 
    895     pub fn rand_tx_id() -> Txid {
    896         Txid::from_byte_array(rand::random())
    897     }
    898 
    899     static ADDR: LazyLock<Address> = LazyLock::new(|| {
    900         Address::<NetworkUnchecked>::from_str("bcrt1qpw3pjhtf9myl0qk9cxt54qt8qxu2mj955c7esx")
    901             .unwrap()
    902             .assume_checked()
    903     });
    904 
    905     #[tokio::test]
    906     async fn tx_in() {
    907         let (mut db, pool) = setup().await;
    908         let amount = amount("KUDOS:10");
    909 
    910         let mut routine = async |first: &Option<IncomingSubject>,
    911                                  second: &Option<IncomingSubject>| {
    912             let id = sqlx::query("SELECT count(*) + 1 FROM tx_in")
    913                 .try_map(|r: PgRow| r.try_get_u64(0))
    914                 .fetch_one(&mut *db)
    915                 .await
    916                 .unwrap();
    917             let now = now_sql_stable_ts();
    918             let later = now + Span::new().hours(2);
    919             let txid = rand_tx_id();
    920             // Insert
    921             assert_eq!(
    922                 register_tx_in(&pool, &txid, &amount, &ADDR, &now, first)
    923                     .await
    924                     .unwrap(),
    925                 AddIncomingResult::Success {
    926                     new: true,
    927                     pending: false,
    928                     row_id: id,
    929                     valued_at: now,
    930                 }
    931             );
    932             // Idempotent
    933             assert_eq!(
    934                 register_tx_in(&pool, &txid, &amount, &ADDR, &later, first)
    935                     .await
    936                     .expect("register tx in"),
    937                 AddIncomingResult::Success {
    938                     new: false,
    939                     pending: false,
    940                     row_id: id,
    941                     valued_at: now
    942                 }
    943             );
    944             // Many
    945             assert_eq!(
    946                 register_tx_in(&pool, &rand_tx_id(), &amount, &ADDR, &later, second)
    947                     .await
    948                     .expect("register tx in"),
    949                 AddIncomingResult::Success {
    950                     new: true,
    951                     pending: false,
    952                     row_id: id + 1,
    953                     valued_at: later
    954                 }
    955             );
    956         };
    957 
    958         // Empty db
    959         assert_eq!(
    960             revenue_history(&pool, &History::default(), &CURR, dummy_listen)
    961                 .await
    962                 .unwrap(),
    963             Vec::new()
    964         );
    965         assert_eq!(
    966             incoming_history(&pool, &History::default(), &CURR, dummy_listen)
    967                 .await
    968                 .unwrap(),
    969             Vec::new()
    970         );
    971 
    972         // Regular transaction
    973         routine(&None, &None).await;
    974 
    975         let first = EddsaPublicKey::rand();
    976         let second = EddsaPublicKey::rand();
    977 
    978         // Reserve transaction
    979         routine(
    980             &Some(IncomingSubject::Reserve(first.clone())),
    981             &Some(IncomingSubject::Reserve(second)),
    982         )
    983         .await;
    984 
    985         // Kyc transaction
    986         routine(
    987             &Some(IncomingSubject::Kyc(first.clone())),
    988             &Some(IncomingSubject::Kyc(first)),
    989         )
    990         .await;
    991 
    992         // History
    993         assert_eq!(
    994             revenue_history(&pool, &History::default(), &CURR, dummy_listen)
    995                 .await
    996                 .unwrap()
    997                 .len(),
    998             6
    999         );
   1000         assert_eq!(
   1001             incoming_history(&pool, &History::default(), &CURR, dummy_listen)
   1002                 .await
   1003                 .unwrap()
   1004                 .len(),
   1005             4
   1006         );
   1007     }
   1008 
   1009     #[tokio::test]
   1010     async fn tx_in_admin() {
   1011         let (_, pool) = setup().await;
   1012 
   1013         let amount = amount("KUDOS:10");
   1014 
   1015         // Empty db
   1016         assert_eq!(
   1017             incoming_history(&pool, &History::default(), &CURR, dummy_listen)
   1018                 .await
   1019                 .unwrap(),
   1020             Vec::new()
   1021         );
   1022 
   1023         let now = now_sql_stable_ts();
   1024         let later = now + Span::new().hours(2);
   1025         // Insert
   1026         assert_eq!(
   1027             register_tx_in_admin(
   1028                 &pool,
   1029                 &amount,
   1030                 &ADDR,
   1031                 &now,
   1032                 &IncomingSubject::Reserve(EddsaPublicKey::rand())
   1033             )
   1034             .await
   1035             .expect("register tx in"),
   1036             AddIncomingResult::Success {
   1037                 new: true,
   1038                 pending: false,
   1039                 row_id: 1,
   1040                 valued_at: now
   1041             }
   1042         );
   1043         // Many
   1044         assert_eq!(
   1045             register_tx_in_admin(
   1046                 &pool,
   1047                 &amount,
   1048                 &ADDR,
   1049                 &later,
   1050                 &IncomingSubject::Reserve(EddsaPublicKey::rand())
   1051             )
   1052             .await
   1053             .expect("register tx in"),
   1054             AddIncomingResult::Success {
   1055                 new: true,
   1056                 pending: false,
   1057                 row_id: 2,
   1058                 valued_at: later
   1059             }
   1060         );
   1061 
   1062         // History
   1063         assert_eq!(
   1064             incoming_history(&pool, &History::default(), &CURR, dummy_listen)
   1065                 .await
   1066                 .unwrap()
   1067                 .len(),
   1068             2
   1069         );
   1070     }
   1071 
   1072     #[tokio::test]
   1073     async fn bounces() {
   1074         let (_, db) = setup().await;
   1075         let amount = amount("KUDOS:10");
   1076         let now = now_sql_stable_ts();
   1077 
   1078         // No bounces
   1079         assert_eq!(pending_bounce(&db).await.unwrap(), None);
   1080         bounce_sent(&db, 12, &rand_tx_id()).await.unwrap();
   1081 
   1082         // Bounced
   1083         let bounced_txid = rand_tx_id();
   1084         let bounce_txid = rand_tx_id();
   1085         bounce(&db, &bounced_txid, &amount, &ADDR, &now, "invalid format")
   1086             .await
   1087             .unwrap();
   1088         bounce(&db, &bounced_txid, &amount, &ADDR, &now, "invalid format")
   1089             .await
   1090             .unwrap();
   1091         match pending_bounce(&db).await.unwrap() {
   1092             Some((id, txid, _)) if txid == bounced_txid => {
   1093                 bounce_sent(&db, id, &txid).await.unwrap();
   1094                 bounce_sent(&db, id, &txid).await.unwrap();
   1095             }
   1096             _ => unreachable!(),
   1097         }
   1098         assert_matches!(
   1099             sync_out(
   1100                 &db,
   1101                 &bounce_txid,
   1102                 None,
   1103                 &amount,
   1104                 &ADDR,
   1105                 &TxOutKind::Bounce(bounced_txid),
   1106                 &now,
   1107             )
   1108             .await
   1109             .unwrap(),
   1110             SyncOutResult::New
   1111         );
   1112         assert_eq!(pending_bounce(&db).await.unwrap(), None);
   1113         assert_matches!(
   1114             sync_out(
   1115                 &db,
   1116                 &bounce_txid,
   1117                 None,
   1118                 &amount,
   1119                 &ADDR,
   1120                 &TxOutKind::Bounce(bounced_txid),
   1121                 &now,
   1122             )
   1123             .await
   1124             .unwrap(),
   1125             SyncOutResult::None
   1126         );
   1127 
   1128         // Recovered
   1129         let bounced_txid = rand_tx_id();
   1130         let bounce_txid = rand_tx_id();
   1131         assert_matches!(
   1132             register_tx_in(&db, &bounced_txid, &amount, &ADDR, &now, &None)
   1133                 .await
   1134                 .expect("register tx in"),
   1135             AddIncomingResult::Success {
   1136                 new: true,
   1137                 pending: false,
   1138                 ..
   1139             }
   1140         );
   1141         assert_matches!(
   1142             sync_out(
   1143                 &db,
   1144                 &bounce_txid,
   1145                 None,
   1146                 &amount,
   1147                 &ADDR,
   1148                 &TxOutKind::Bounce(bounced_txid),
   1149                 &now,
   1150             )
   1151             .await
   1152             .unwrap(),
   1153             SyncOutResult::Recovered
   1154         );
   1155         assert_matches!(
   1156             sync_out(
   1157                 &db,
   1158                 &bounce_txid,
   1159                 None,
   1160                 &amount,
   1161                 &ADDR,
   1162                 &TxOutKind::Bounce(bounced_txid),
   1163                 &now,
   1164             )
   1165             .await
   1166             .unwrap(),
   1167             SyncOutResult::None
   1168         );
   1169         assert_eq!(pending_bounce(&db).await.unwrap(), None);
   1170     }
   1171 
   1172     #[tokio::test]
   1173     async fn sync_out_talerable_and_replace() {
   1174         let (mut db, poll) = setup().await;
   1175         let amount = amount("KUDOS:10");
   1176         let now = now_sql_stable_ts();
   1177 
   1178         // 1. Simple Sync Out
   1179         let txid = rand_tx_id();
   1180         assert_matches!(
   1181             sync_out(&poll, &txid, None, &amount, &ADDR, &TxOutKind::Simple, &now,)
   1182                 .await
   1183                 .unwrap(),
   1184             SyncOutResult::New
   1185         );
   1186         assert_matches!(
   1187             sync_out(&poll, &txid, None, &amount, &ADDR, &TxOutKind::Simple, &now,)
   1188                 .await
   1189                 .unwrap(),
   1190             SyncOutResult::None
   1191         );
   1192 
   1193         // 2. Replace (Fee Bump)
   1194         assert_matches!(
   1195             sync_out(
   1196                 &poll,
   1197                 &rand_tx_id(),
   1198                 Some(&txid),
   1199                 &amount,
   1200                 &ADDR,
   1201                 &TxOutKind::Simple,
   1202                 &now,
   1203             )
   1204             .await
   1205             .unwrap(),
   1206             SyncOutResult::Replaced
   1207         );
   1208 
   1209         // 3. Recover Talerable Transfer
   1210         let t = TransferRequest {
   1211             amount,
   1212             exchange_base_url: url("https://exchange.example.com"),
   1213             request_uid: HashCode::rand(),
   1214             wtid: ShortHashCode::rand(),
   1215             metadata: None,
   1216             credit_account: CLIENT.as_uri(),
   1217         };
   1218 
   1219         // Create the pending transfer
   1220         assert_matches!(
   1221             transfer(&poll, &CLIENT, &t).await.unwrap(),
   1222             TransferResult::Success(_)
   1223         );
   1224 
   1225         let txid = rand_tx_id();
   1226         // Sync it out
   1227         assert_matches!(
   1228             sync_out(
   1229                 &poll,
   1230                 &txid,
   1231                 None,
   1232                 &amount,
   1233                 &ADDR,
   1234                 &TxOutKind::Talerable {
   1235                     wtid: &t.wtid,
   1236                     url: &t.exchange_base_url,
   1237                     metadata: None,
   1238                 },
   1239                 &now,
   1240             )
   1241             .await
   1242             .unwrap(),
   1243             SyncOutResult::Recovered
   1244         );
   1245 
   1246         // Bump fee
   1247         bump_tx_id(&mut db, &rand_tx_id(), &t.wtid).await.unwrap();
   1248     }
   1249 
   1250     #[tokio::test]
   1251     async fn reorgs() {
   1252         let (_, pool) = setup().await;
   1253         let amount = amount("KUDOS:10");
   1254         let now = now_sql_stable_ts();
   1255 
   1256         // 1. Setup a normal incoming transaction (Credit)
   1257         let txid_normal = rand_tx_id();
   1258         let reserve_pub = EddsaPublicKey::rand();
   1259         register_tx_in(
   1260             &pool,
   1261             &txid_normal,
   1262             &amount,
   1263             &ADDR,
   1264             &now,
   1265             &Some(IncomingSubject::Reserve(reserve_pub.clone())),
   1266         )
   1267         .await
   1268         .unwrap();
   1269 
   1270         // 2. Setup a bounced transaction that was successfully synced out
   1271         let txid_bounced = rand_tx_id();
   1272         let txid_bounce = rand_tx_id();
   1273         bounce(&pool, &txid_bounced, &amount, &ADDR, &now, "bad data")
   1274             .await
   1275             .unwrap();
   1276         sync_out(
   1277             &pool,
   1278             &txid_bounce,
   1279             None,
   1280             &amount,
   1281             &ADDR,
   1282             &TxOutKind::Bounce(txid_bounced),
   1283             &now,
   1284         )
   1285         .await
   1286         .unwrap();
   1287 
   1288         // 3. Trigger a Reorg dropping both the incoming reserve and the outgoing bounce
   1289         let problematic = reorg(&pool, &[txid_normal, txid_bounced]).await.unwrap();
   1290 
   1291         assert_eq!(
   1292             problematic.as_slice(),
   1293             &[
   1294                 ProblematicTx::Taler {
   1295                     txid: txid_normal,
   1296                     addr: ADDR.clone(),
   1297                     ty: taler_common::db::IncomingType::reserve,
   1298                     metadata: reserve_pub
   1299                 },
   1300                 ProblematicTx::Bounce {
   1301                     txid: txid_bounced,
   1302                     bounced_in: txid_bounce
   1303                 }
   1304             ]
   1305         );
   1306     }
   1307 }