depolymerization

wire gateway for Bitcoin/Ethereum
Log | Files | Refs | Submodules | README | LICENSE

db.rs (18044B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use bitcoin::{Address, BlockHash};
     18 use bitcoin::{Txid, hashes::Hash};
     19 use depolymerizer_common::status::{BounceStatus, DebitStatus};
     20 use sqlx::{
     21     PgExecutor, PgPool, QueryBuilder, Row,
     22     postgres::{PgListener, PgRow},
     23 };
     24 use taler_api::db::{BindHelper as _, TypeHelper as _, history, page};
     25 use taler_common::{
     26     api_common::{EddsaPublicKey, SafeU64, ShortHashCode},
     27     api_params::{History, Page},
     28     api_wire::{
     29         IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest,
     30         TransferResponse, TransferState, TransferStatus,
     31     },
     32     types::{
     33         amount::{Amount, Currency},
     34         timestamp::Timestamp,
     35     },
     36 };
     37 use tokio::sync::watch::Receiver;
     38 use url::Url;
     39 
     40 use crate::{
     41     payto::FullBtcPayto,
     42     sql::{sql_addr, sql_btc_amount, sql_generic_payto, sql_payto},
     43 };
     44 
     45 /// Lock the database for worker execution
     46 pub async fn worker_lock<'a>(e: impl PgExecutor<'a>) -> sqlx::Result<bool> {
     47     sqlx::query("SELECT pg_try_advisory_lock(42)")
     48         .try_map(|r: PgRow| r.try_get(0))
     49         .fetch_one(e)
     50         .await
     51 }
     52 
     53 /// Initialize the worker status
     54 pub async fn init_status<'a>(e: impl PgExecutor<'a>) -> sqlx::Result<()> {
     55     sqlx::query(
     56         "INSERT INTO state (name, value) VALUES ('status', $1) ON CONFLICT (name) DO NOTHING",
     57     )
     58     .bind([1u8])
     59     .execute(e)
     60     .await?;
     61     Ok(())
     62 }
     63 
     64 /// Update the worker status
     65 pub async fn update_status(e: &mut PgListener, new_status: bool) -> sqlx::Result<()> {
     66     sqlx::query("UPDATE state SET value=$1 WHERE name='status'")
     67         .bind([new_status as u8])
     68         .execute(&mut *e)
     69         .await?;
     70     sqlx::query("NOTIFY status").execute(e).await?;
     71     Ok(())
     72 }
     73 
     74 /// Initialize the worker sync state
     75 pub async fn init_sync_state<'a>(
     76     e: impl PgExecutor<'a>,
     77     hash: &BlockHash,
     78     reset: bool,
     79 ) -> sqlx::Result<()> {
     80     sqlx::query(if reset {
     81         "INSERT INTO state (name, value) VALUES ('last_hash', $1) ON CONFLICT (name) DO UPDATE SET value=$1"
     82     } else {
     83         "INSERT INTO state (name, value) VALUES ('last_hash', $1) ON CONFLICT (name) DO NOTHING"
     84     })
     85     .bind(hash.as_byte_array())
     86     .execute(e)
     87     .await?;
     88     Ok(())
     89 }
     90 
     91 /// Get the current worker sync state
     92 pub async fn get_sync_state<'a>(e: impl PgExecutor<'a>) -> sqlx::Result<BlockHash> {
     93     sqlx::query("SELECT value FROM state WHERE name='last_hash'")
     94         .try_map(|r: PgRow| r.try_get_map(0, BlockHash::from_slice))
     95         .fetch_one(e)
     96         .await
     97 }
     98 
     99 /// Update the worker sync state if it hasn't changed yet
    100 pub async fn swap_sync_state<'a>(
    101     e: impl PgExecutor<'a>,
    102     from: &BlockHash,
    103     to: &BlockHash,
    104 ) -> sqlx::Result<()> {
    105     sqlx::query("UPDATE state SET value=$1 WHERE name='last_hash' AND value=$2")
    106         .bind(to.as_byte_array())
    107         .bind(from.as_byte_array())
    108         .execute(e)
    109         .await?;
    110     Ok(())
    111 }
    112 
    113 pub enum TransferResult {
    114     Success(TransferResponse),
    115     RequestUidReuse,
    116     WtidReuse,
    117 }
    118 
    119 /// Initiate a new Taler transfer idempotently
    120 pub async fn transfer<'a>(
    121     e: impl PgExecutor<'a>,
    122     creditor: &FullBtcPayto,
    123     transfer: &TransferRequest,
    124 ) -> sqlx::Result<TransferResult> {
    125     sqlx::query(
    126         "
    127             SELECT out_request_uid_reuse, out_wtid_reuse, out_transfer_row_id, out_created_at
    128             FROM taler_transfer(($1, $2)::taler_amount, $3, $4, $5, $6, $7, $8)
    129         ",
    130     )
    131     .bind_amount(&transfer.amount)
    132     .bind(transfer.exchange_base_url.as_str())
    133     .bind(creditor.0.to_string())
    134     .bind(&creditor.name)
    135     .bind(transfer.request_uid.as_slice())
    136     .bind(transfer.wtid.as_slice())
    137     .bind_timestamp(&Timestamp::now())
    138     .try_map(|r: PgRow| {
    139         Ok(if r.try_get("out_request_uid_reuse")? {
    140             TransferResult::RequestUidReuse
    141         } else if r.try_get("out_wtid_reuse")? {
    142             TransferResult::WtidReuse
    143         } else {
    144             TransferResult::Success(TransferResponse {
    145                 row_id: r.try_get_safeu64("out_transfer_row_id")?,
    146                 timestamp: r.try_get_timestamp("out_created_at")?,
    147             })
    148         })
    149     })
    150     .fetch_one(e)
    151     .await
    152 }
    153 
    154 /// Paginate initiated Taler transfers
    155 pub async fn transfer_page<'a>(
    156     e: impl PgExecutor<'a>,
    157     status: &Option<TransferState>,
    158     params: &Page,
    159     currency: &Currency,
    160 ) -> sqlx::Result<Vec<TransferListStatus>> {
    161     let status = match status {
    162         Some(s) => match s {
    163             TransferState::pending => Some(DebitStatus::requested),
    164             TransferState::success => Some(DebitStatus::sent),
    165             TransferState::transient_failure | TransferState::permanent_failure => {
    166                 return Ok(Vec::new());
    167             }
    168         },
    169         None => None,
    170     };
    171 
    172     page(
    173         e,
    174         "id",
    175         params,
    176         || {
    177             let mut sql = QueryBuilder::new(
    178                 "
    179                     SELECT
    180                         id,
    181                         status,
    182                         (amount).val as amount_val,
    183                         (amount).frac as amount_frac,
    184                         credit_acc,
    185                         credit_name,
    186                         created
    187                     FROM tx_out WHERE request_uid IS NOT NULL AND
    188                 ",
    189             );
    190             if let Some(status) = status {
    191                 sql.push(" status = ").push_bind(status).push(" AND ");
    192             }
    193             sql
    194         },
    195         |r: PgRow| {
    196             Ok(TransferListStatus {
    197                 row_id: r.try_get_safeu64(0)?,
    198                 status: match r.try_get(1)? {
    199                     DebitStatus::requested => TransferState::pending,
    200                     DebitStatus::sent => TransferState::success,
    201                 },
    202                 amount: r.try_get_amount("amount", currency)?,
    203                 credit_account: sql_payto(&r, "credit_acc", "credit_name")?,
    204                 timestamp: r.try_get_timestamp("created")?,
    205             })
    206         },
    207     )
    208     .await
    209 }
    210 
    211 /// Get a Taler transfer info
    212 pub async fn transfer_by_id<'a>(
    213     e: impl PgExecutor<'a>,
    214     id: u64,
    215     currency: &Currency,
    216 ) -> sqlx::Result<Option<TransferStatus>> {
    217     sqlx::query(
    218         "
    219             SELECT
    220                 status,
    221                 (amount).val as amount_val,
    222                 (amount).frac as amount_frac,
    223                 exchange_url,
    224                 wtid,
    225                 credit_acc,
    226                 credit_name,
    227                 created
    228             FROM tx_out WHERE request_uid IS NOT NULL AND id = $1
    229         ",
    230     )
    231     .bind(id as i64)
    232     .try_map(|r: PgRow| {
    233         Ok(TransferStatus {
    234             status: match r.try_get(0)? {
    235                 DebitStatus::requested => TransferState::pending,
    236                 DebitStatus::sent => TransferState::success,
    237             },
    238             status_msg: None,
    239             amount: r.try_get_amount_i(1, currency)?,
    240             origin_exchange_url: r.try_get(3)?,
    241             wtid: r.try_get_base32(4)?,
    242             credit_account: sql_payto(&r, 5, 6)?,
    243             timestamp: r.try_get_timestamp(7)?,
    244         })
    245     })
    246     .fetch_optional(e)
    247     .await
    248 }
    249 
    250 /// Fetch outgoing Taler transactions history
    251 pub async fn outgoing_history(
    252     db: &PgPool,
    253     params: &History,
    254     currency: &Currency,
    255     listen: impl FnOnce() -> Receiver<i64>,
    256 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> {
    257     history(
    258         db,
    259         "id", 
    260         params,
    261         listen,
    262         ||  QueryBuilder::new(
    263         "SELECT id, created, (amount).val, (amount).frac, wtid, credit_acc, credit_name, exchange_url FROM tx_out WHERE"
    264     ), |r| {
    265         Ok(OutgoingBankTransaction {
    266             row_id: r.try_get_safeu64(0)?,
    267             date: r.try_get_timestamp(1)?,
    268             amount: r.try_get_amount_i(2, currency)?,
    269             wtid: r.try_get_base32(4)?,
    270             credit_account: sql_payto(&r, 5, 6)?,
    271             exchange_base_url: r.try_get_url(7)?,
    272         })
    273     }).await
    274 }
    275 
    276 /// Fetch incoming Taler transactions history
    277 pub async fn incoming_history(
    278     db: &PgPool,
    279     params: &History,
    280     currency: &Currency,
    281     listen: impl FnOnce() -> Receiver<i64>,
    282 ) -> sqlx::Result<Vec<IncomingBankTransaction>> {
    283     history(
    284         db,
    285         "id", 
    286         params,
    287         listen,
    288         ||  QueryBuilder::new(
    289         "SELECT id, received, (amount).val, (amount).frac, reserve_pub, debit_acc FROM tx_in WHERE"
    290     ), |r| {
    291         Ok(IncomingBankTransaction::Reserve {
    292             row_id: r.try_get_safeu64(0)?,
    293             date: r.try_get_timestamp(1)?,
    294             amount: r.try_get_amount_i(2, currency)?,
    295             reserve_pub: r.try_get_base32(4)?,
    296             debit_account: sql_generic_payto(&r, 5)?,
    297         })
    298     }).await
    299 }
    300 
    301 #[derive(Debug, PartialEq, Eq)]
    302 pub enum AddIncomingResult {
    303     Success {
    304         new: bool,
    305         row_id: SafeU64,
    306         valued_at: Timestamp,
    307     },
    308     ReservePubReuse,
    309 }
    310 
    311 /// Register a fake Taler credit
    312 pub async fn register_tx_in_admin<'a>(
    313     e: impl PgExecutor<'a>,
    314     amount: &Amount,
    315     debit_acc: &Address,
    316     received: &Timestamp,
    317     reserve_pub: &EddsaPublicKey,
    318 ) -> sqlx::Result<AddIncomingResult> {
    319     sqlx::query(
    320         "
    321             SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new
    322             FROM register_tx_in(($1, $2)::taler_amount, $3, $4, $5, NULL)
    323         ",
    324     )
    325     .bind_amount(amount)
    326     .bind(debit_acc.to_string())
    327     .bind(reserve_pub.as_slice())
    328     .bind_timestamp(received)
    329     .try_map(|r: PgRow| {
    330         Ok(if r.try_get(0)? {
    331             AddIncomingResult::ReservePubReuse
    332         } else {
    333             AddIncomingResult::Success {
    334                 row_id: r.try_get_safeu64(1)?,
    335                 valued_at: r.try_get_timestamp(2)?,
    336                 new: r.try_get(3)?,
    337             }
    338         })
    339     })
    340     .fetch_one(e)
    341     .await
    342 }
    343 
    344 /// Register a Taler credit
    345 pub async fn register_tx_in<'a>(
    346     e: impl PgExecutor<'a>,
    347     txid: &Txid,
    348     amount: &Amount,
    349     debit_acc: &Address,
    350     received: &Timestamp,
    351     reserve_pub: &EddsaPublicKey,
    352 ) -> sqlx::Result<AddIncomingResult> {
    353     sqlx::query(
    354         "
    355             SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new
    356             FROM register_tx_in(($1, $2)::taler_amount, $3, $4, $5, $6)
    357         ",
    358     )
    359     .bind_amount(amount)
    360     .bind(debit_acc.to_string())
    361     .bind(reserve_pub.as_slice())
    362     .bind_timestamp(received)
    363     .bind(txid.as_byte_array())
    364     .try_map(|r: PgRow| {
    365         Ok(if r.try_get(0)? {
    366             AddIncomingResult::ReservePubReuse
    367         } else {
    368             AddIncomingResult::Success {
    369                 row_id: r.try_get_safeu64(1)?,
    370                 valued_at: r.try_get_timestamp(2)?,
    371                 new: r.try_get(3)?,
    372             }
    373         })
    374     })
    375     .fetch_one(e)
    376     .await
    377 }
    378 
    379 /// Update a transaction id after bumping it
    380 pub async fn bump_tx_id<'a>(
    381     e: impl PgExecutor<'a>,
    382     from: &Txid,
    383     to: &Txid,
    384 ) -> sqlx::Result<ShortHashCode> {
    385     sqlx::query("UPDATE tx_out SET txid=$1 WHERE txid=$2 RETURNING wtid")
    386         .bind(to.as_byte_array())
    387         .bind(from.as_byte_array())
    388         .try_map(|r: PgRow| r.try_get_base32(0))
    389         .fetch_one(e)
    390         .await
    391 }
    392 
    393 /// Reset the state of a conflicted debit
    394 pub async fn conflict_tx_out<'a>(e: impl PgExecutor<'a>, id: &Txid) -> sqlx::Result<bool> {
    395     Ok(
    396         sqlx::query("UPDATE tx_out SET status=$1, txid=NULL where txid=$2")
    397             .bind(DebitStatus::requested)
    398             .bind(id.as_byte_array())
    399             .execute(e)
    400             .await?
    401             .rows_affected()
    402             > 0,
    403     )
    404 }
    405 
    406 /// Reset the state of a conflicted bounce
    407 pub async fn conflict_bounce<'a>(e: impl PgExecutor<'a>, id: &Txid) -> sqlx::Result<bool> {
    408     Ok(
    409         sqlx::query("UPDATE bounce SET status=$1, txid=NULL where txid=$2")
    410             .bind(BounceStatus::requested)
    411             .bind(id.as_byte_array())
    412             .execute(e)
    413             .await?
    414             .rows_affected()
    415             > 0,
    416     )
    417 }
    418 
    419 /// Initiate a bounce
    420 pub async fn bounce<'a>(e: impl PgExecutor<'a>, txid: &Txid, reason: &str) -> sqlx::Result<()> {
    421     sqlx::query("INSERT INTO bounce (created, bounced, reason, status) VALUES ($1, $2, $3, 'requested') ON CONFLICT (bounced) DO NOTHING")
    422         .bind_timestamp(&Timestamp::now())
    423         .bind(txid.as_byte_array())
    424         .bind(reason)
    425         .execute(e)
    426         .await?;
    427     Ok(())
    428 }
    429 
    430 pub enum ProblematicTx {
    431     In {
    432         txid: Txid,
    433         addr: Address,
    434         reserve_pub: EddsaPublicKey,
    435     },
    436     Bounce {
    437         txid: Txid,
    438         bounced: Txid,
    439     },
    440 }
    441 
    442 /// Handle transactions being removed during a reorganization
    443 pub async fn reorg<'a>(e: impl PgExecutor<'a>, ids: &[Txid]) -> sqlx::Result<Vec<ProblematicTx>> {
    444     // A removed incoming transaction is a correctness issues in only two cases:
    445     // - it is a confirmed credit registered in the database
    446     // - it is an invalid transactions already bounced
    447     // Those two cases can compromise bitcoin backing
    448     // Removed outgoing transactions will be retried automatically by the node
    449     sqlx::query(
    450         "
    451         SELECT tx_in.txid, NULL, debit_acc, tx_in.reserve_pub
    452         FROM tx_in WHERE tx_in.txid = ANY($1)
    453         UNION ALL
    454         SELECT bounce.txid, bounce.bounced, NULL, NULL
    455         from bounce WHERE bounce.bounced = ANY($1);
    456     ",
    457     )
    458     .bind(ids.iter().map(|it| it.as_byte_array()).collect::<Vec<_>>())
    459     .try_map(|r: PgRow| {
    460         let txid = r.try_get_map(0, Txid::from_slice)?;
    461         let check: Option<&[u8]> = r.try_get(1)?;
    462         Ok(if check.is_some() {
    463             ProblematicTx::Bounce {
    464                 txid,
    465                 bounced: r.try_get_map(1, Txid::from_slice)?,
    466             }
    467         } else {
    468             ProblematicTx::In {
    469                 txid,
    470                 addr: sql_addr(&r, 2)?,
    471                 reserve_pub: r.try_get_base32(3)?,
    472             }
    473         })
    474     })
    475     .fetch_all(e)
    476     .await
    477 }
    478 
    479 pub enum SyncOutResult {
    480     New,
    481     Replaced,
    482     Recovered,
    483     None,
    484 }
    485 
    486 pub async fn sync_out<'a>(
    487     e: impl PgExecutor<'a>,
    488     txid: &Txid,
    489     replace_txid: Option<&Txid>,
    490     amount: &Amount,
    491     exchange_url: &Url,
    492     credit_acc: &Address,
    493     wtid: &ShortHashCode,
    494     created: &Timestamp,
    495 ) -> sqlx::Result<SyncOutResult> {
    496     sqlx::query(
    497         "
    498             SELECT out_replaced, out_recovered, out_new
    499             FROM sync_out($1, $2, ($3, $4)::taler_amount, $5, $6, $7, $8)
    500         ",
    501     )
    502     .bind(txid.as_byte_array())
    503     .bind(replace_txid.map(|it| it.as_byte_array()))
    504     .bind_amount(amount)
    505     .bind(exchange_url.to_string())
    506     .bind(credit_acc.to_string())
    507     .bind(wtid.as_slice())
    508     .bind_timestamp(created)
    509     .try_map(|r: PgRow| {
    510         Ok(if r.try_get(0)? {
    511             SyncOutResult::Replaced
    512         } else if r.try_get(1)? {
    513             SyncOutResult::Recovered
    514         } else if r.try_get(2)? {
    515             SyncOutResult::New
    516         } else {
    517             SyncOutResult::None
    518         })
    519     })
    520     .fetch_one(e)
    521     .await
    522 }
    523 
    524 pub async fn pending_debit<'a>(
    525     e: impl PgExecutor<'a>,
    526     currency: &Currency,
    527 ) -> sqlx::Result<Option<(i64, bitcoin::Amount, ShortHashCode, Address, Url)>> {
    528     sqlx::query(
    529         "SELECT id, (amount).val, (amount).frac, wtid, credit_acc, exchange_url FROM tx_out WHERE status='requested' ORDER BY created LIMIT 1",
    530     )
    531     .try_map(|r: PgRow| {
    532         Ok((
    533             r.try_get(0)?,
    534             sql_btc_amount(&r, 1, currency)?,
    535             r.try_get_base32(3)?,
    536             sql_addr(&r, 4)?,
    537             r.try_get_parse(5)?
    538         ))
    539     })
    540     .fetch_optional(e)
    541     .await
    542 }
    543 
    544 pub async fn debit_sent<'a>(e: impl PgExecutor<'a>, id: i64, txid: &Txid) -> sqlx::Result<()> {
    545     sqlx::query("UPDATE tx_out SET status='sent', txid=$1 WHERE id=$2")
    546         .bind(txid.as_byte_array())
    547         .bind(id)
    548         .execute(e)
    549         .await?;
    550     Ok(())
    551 }
    552 
    553 pub async fn pending_bounce<'a>(
    554     e: impl PgExecutor<'a>,
    555 ) -> sqlx::Result<Option<(i64, Txid, Option<String>)>> {
    556     sqlx::query(
    557         "SELECT id, bounced, reason FROM bounce WHERE status='requested' ORDER BY created LIMIT 1",
    558     )
    559     .try_map(|r: PgRow| {
    560         Ok((
    561             r.try_get(0)?,
    562             r.try_get_map(1, Txid::from_slice)?,
    563             r.try_get(2)?,
    564         ))
    565     })
    566     .fetch_optional(e)
    567     .await
    568 }
    569 
    570 pub async fn bounce_set_status<'a>(
    571     e: impl PgExecutor<'a>,
    572     id: i64,
    573     txid: Option<&Txid>,
    574     status: &BounceStatus,
    575 ) -> sqlx::Result<()> {
    576     sqlx::query("UPDATE bounce SET txid=$1, status=$2 WHERE id=$3")
    577         .bind(txid.map(|it| it.as_byte_array()))
    578         .bind(status)
    579         .bind(id)
    580         .execute(e)
    581         .await?;
    582     Ok(())
    583 }
    584 
    585 pub enum SyncBounceResult {
    586     New,
    587     Recovered,
    588     None,
    589 }
    590 
    591 pub async fn sync_bounce<'a>(
    592     e: impl PgExecutor<'a>,
    593     txid: &Txid,
    594     bounced: &Txid,
    595     created: &Timestamp,
    596 ) -> sqlx::Result<SyncBounceResult> {
    597     sqlx::query(
    598         "
    599             SELECT out_recovered, out_new
    600             FROM sync_bounce($1, $2, $3)
    601         ",
    602     )
    603     .bind(txid.as_byte_array())
    604     .bind(bounced.as_byte_array())
    605     .bind_timestamp(created)
    606     .try_map(|r: PgRow| {
    607         Ok(if r.try_get(0)? {
    608             SyncBounceResult::Recovered
    609         } else if r.try_get(1)? {
    610             SyncBounceResult::New
    611         } else {
    612             SyncBounceResult::None
    613         })
    614     })
    615     .fetch_one(e)
    616     .await
    617 }