depolymerization

wire gateway for Bitcoin/Ethereum
Log | Files | Refs | Submodules | README | LICENSE

worker.rs (18398B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2022-2025 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 use std::{fmt::Write, time::SystemTime};
     17 
     18 use bitcoin::{Amount as BtcAmount, Txid, hashes::Hash};
     19 use depolymerizer_common::{metadata::OutMetadata, status::BounceStatus};
     20 use sqlx::{PgPool, postgres::PgListener};
     21 use taler_common::{ExpoBackoffDecorr, api_common::ShortHashCode, types::timestamp::Timestamp};
     22 use tokio::time::sleep;
     23 use tracing::{debug, error, info, trace, warn};
     24 use url::Url;
     25 
     26 use crate::{
     27     GetOpReturnErr, GetSegwitErr,
     28     config::WorkerCfg,
     29     db::{self, AddIncomingResult, SyncBounceResult, SyncOutResult, worker_lock},
     30     fail_point::fail_point,
     31     rpc::{
     32         self, Category, ErrorCode, ListSinceBlock, ListTransaction, Rpc, Transaction, rpc_wallet,
     33     },
     34     rpc_utils::sender_address,
     35     taler_utils::btc_to_taler,
     36 };
     37 
     38 use super::{LoopError, LoopResult, analysis::analysis};
     39 
     40 pub async fn worker_loop(mut state: WorkerCfg, pool: PgPool) {
     41     let mut jitter = ExpoBackoffDecorr::default();
     42     let mut lifetime = state.lifetime;
     43     let mut status = true;
     44     let mut skip_notification = true;
     45 
     46     loop {
     47         let result: LoopResult<()> = async {
     48             // Connect
     49             let rpc = &mut rpc_wallet(&state.rpc_cfg, &state.wallet_cfg).await?;
     50             let db = &mut PgListener::connect_with(&pool).await?;
     51 
     52             // It is not possible to atomically update the blockchain and the database.
     53             // When we failed to sync the database and the blockchain state we rely on
     54             // sync_chain to recover the lost updates.
     55             // When this function is running concurrently, it not possible to known another
     56             // execution has failed, and this can lead to a transaction being sent multiple time.
     57             // To ensure only a single version of this function is running at a given time we rely
     58             // on postgres advisory lock
     59 
     60             // Take the lock
     61             if !worker_lock(&mut *db).await? {
     62                 return Err(LoopError::Concurrency);
     63             }
     64 
     65             // Listen to all channels
     66             db.listen_all(["new_block", "taler_out"]).await?;
     67 
     68             loop {
     69                 // Wait for the next notification
     70                 {
     71                     let ntf = db.next_buffered();
     72                     if let Some(ntf) = &ntf {
     73                         trace!(target: "worker", "notification from {}", ntf.channel())
     74                     }
     75                     if !skip_notification && ntf.is_none() {
     76                         debug!(target: "worker", "waiting for notifications");
     77                         // Block until next notification
     78                         if let Some(ntf) = db.try_recv().await? {
     79                             trace!(target: "worker", "notification from {}", ntf.channel())
     80                         }
     81                     }
     82                     // Conflate all notifications
     83                     while let Some(ntf) = db.next_buffered() {
     84                         trace!(target: "worker", "notification from {}", ntf.channel())
     85                     }
     86                 }
     87 
     88                 // Check lifetime
     89                 if let Some(nb) = lifetime.as_mut() {
     90                     if *nb == 0 {
     91                         info!(target: "worker", "Reach end of lifetime");
     92                         return Ok(());
     93                     } else {
     94                         *nb -= 1;
     95                     }
     96                 }
     97 
     98                 debug!(target: "worker", "syncing blockchain");
     99 
    100                 // Perform analysis
    101                 state.confirmation =
    102                     analysis(rpc, state.confirmation, state.max_confirmation).await?;
    103 
    104                 worker_step(rpc, db, &mut state, &mut status).await?;
    105 
    106                 skip_notification = false;
    107                 jitter.reset();
    108             }
    109         }
    110         .await;
    111         if let Err(e) = result {
    112             error!(target: "worker", "{e}");
    113             // When we catch an error, we sometimes want to retry immediately (eg. reconnect to RPC or DB).
    114             // Bitcoin error codes are generic. We need to match the msg to get precise ones. Some errors
    115             // can resolve themselves when a new block is mined (new fees, new transactions). Our simple
    116             // approach is to wait for the next loop when an RPC error is caught to prevent endless logged errors.
    117             skip_notification = matches!(
    118                 e,
    119                 LoopError::Rpc(rpc::Error::Transport(_))
    120                     | LoopError::DB(_)
    121                     | LoopError::Injected(_)
    122                     | LoopError::Concurrency
    123             );
    124             sleep(jitter.backoff()).await;
    125         } else {
    126             return;
    127         }
    128     }
    129 }
    130 
    131 pub async fn worker_transient(mut state: WorkerCfg, pool: PgPool) -> LoopResult<()> {
    132     let mut status = true;
    133 
    134     // Connect
    135     let rpc = &mut rpc_wallet(&state.rpc_cfg, &state.wallet_cfg).await?;
    136     let db = &mut PgListener::connect_with(&pool).await?;
    137 
    138     // It is not possible to atomically update the blockchain and the database.
    139     // When we failed to sync the database and the blockchain state we rely on
    140     // sync_chain to recover the lost updates.
    141     // When this function is running concurrently, it not possible to known another
    142     // execution has failed, and this can lead to a transaction being sent multiple time.
    143     // To ensure only a single version of this function is running at a given time we rely
    144     // on postgres advisory lock
    145 
    146     // Take the lock
    147     if !worker_lock(&mut *db).await? {
    148         return Err(LoopError::Concurrency);
    149     }
    150 
    151     worker_step(rpc, db, &mut state, &mut status).await?;
    152     Ok(())
    153 }
    154 
    155 /// Synchronize local db with blockchain and perform transactions
    156 async fn worker_step(
    157     rpc: &mut Rpc,
    158     db: &mut PgListener,
    159     state: &mut WorkerCfg,
    160     status: &mut bool,
    161 ) -> LoopResult<()> {
    162     // Sync chain
    163     if let Some(stuck) = sync_chain(rpc, db, state, status).await? {
    164         // As we are now in sync with the blockchain if a transaction has Requested status it have not been sent
    165 
    166         // Send requested debits
    167         while debit(db, rpc, state).await? {}
    168 
    169         // Bump stuck transactions
    170         for id in stuck {
    171             let bump = rpc.bump_fee(&id).await?;
    172             fail_point("(injected) fail bump", 0.3)?;
    173             let wtid = db::bump_tx_id(&mut *db, &id, &bump.txid).await?;
    174             info!(target: "worker", ">> (bump) {wtid} replace {id} with {}", bump.txid);
    175         }
    176 
    177         // Send requested bounce
    178         while bounce(db, rpc, &state.bounce_fee).await? {}
    179     }
    180     Ok(())
    181 }
    182 
    183 /// Parse new transactions, return stuck transactions if the database is up to date with the latest mined block
    184 async fn sync_chain(
    185     rpc: &mut Rpc,
    186     db: &mut PgListener,
    187     state: &WorkerCfg,
    188     status: &mut bool,
    189 ) -> LoopResult<Option<Vec<Txid>>> {
    190     // Get stored last_hash
    191     let sync_state = db::get_sync_state(&mut *db).await?;
    192     // Get the current confirmation delay
    193     let conf_delay = state.confirmation;
    194 
    195     // Get all transactions made since this block
    196     let ListSinceBlock {
    197         transactions,
    198         removed,
    199         lastblock,
    200     } = rpc.list_since_block(Some(&sync_state), conf_delay).await?;
    201 
    202     // Check if a confirmed incoming transaction have been removed by a blockchain reorganization
    203     let new_status = sync_chain_removed(removed, db, conf_delay as i32).await?;
    204 
    205     // Sync status with database
    206     if *status != new_status {
    207         db::update_status(db, new_status).await?;
    208         *status = new_status;
    209         if new_status {
    210             info!(target: "worker", "Recovered lost transactions");
    211         }
    212     }
    213     if !new_status {
    214         return Ok(None);
    215     }
    216 
    217     let mut stuck = vec![];
    218 
    219     for tx in transactions {
    220         match tx.category {
    221             Category::Send => {
    222                 if sync_chain_outgoing(&tx.txid, tx.confirmations, rpc, db, state).await? {
    223                     stuck.push(tx.txid);
    224                 }
    225             }
    226             Category::Receive if tx.confirmations >= conf_delay as i32 => {
    227                 sync_chain_incoming_confirmed(&tx.txid, rpc, db, state).await?
    228             }
    229             _ => {
    230                 // Ignore coinbase and unconfirmed send transactions
    231             }
    232         }
    233     }
    234 
    235     // Move last_hash forward
    236     db::swap_sync_state(db, &sync_state, &lastblock).await?;
    237 
    238     Ok(Some(stuck))
    239 }
    240 
    241 /// Sync database with removed transactions, return false if bitcoin backing is compromised
    242 async fn sync_chain_removed(
    243     removed: Vec<ListTransaction>,
    244     db: &mut PgListener,
    245     min_confirmations: i32,
    246 ) -> LoopResult<bool> {
    247     // A removed incoming transaction is a correctness issues in only two cases:
    248     // - it is a confirmed credit registered in the database
    249     // - it is an invalid transactions already bounced
    250     // Those two cases can compromise bitcoin backing
    251     // Removed outgoing transactions will be retried automatically by the node
    252 
    253     let potential_problematic_ids: Vec<Txid> = removed
    254         .into_iter()
    255         .filter_map(|tx| {
    256             (tx.category == Category::Receive && tx.confirmations < min_confirmations)
    257                 .then_some(tx.txid)
    258         })
    259         .collect();
    260 
    261     // Only keep incoming transaction that are not reconfirmed
    262     let problematic_tx = db::reorg(&mut *db, &potential_problematic_ids).await?;
    263     if problematic_tx.is_empty() {
    264         return Ok(true);
    265     }
    266     let mut buf = "The following transaction have been removed from the blockchain, bitcoin backing is compromised until the transaction reappear:".to_string();
    267     for tx in problematic_tx {
    268         match tx {
    269             db::ProblematicTx::In {
    270                 txid,
    271                 addr,
    272                 reserve_pub,
    273             } => {
    274                 write!(&mut buf, "\n\tcredit {reserve_pub} in {txid} from {addr}",).unwrap();
    275             }
    276             db::ProblematicTx::Bounce { txid, bounced } => {
    277                 write!(&mut buf, "\n\tbounced {txid} in {bounced}").unwrap();
    278             }
    279         }
    280     }
    281     error!(target: "worker", "{buf}");
    282     Ok(false)
    283 }
    284 
    285 /// Sync database with an incoming confirmed transaction
    286 async fn sync_chain_incoming_confirmed(
    287     id: &Txid,
    288     rpc: &mut Rpc,
    289     db: &mut PgListener,
    290     state: &WorkerCfg,
    291 ) -> Result<(), LoopError> {
    292     match rpc.get_tx_segwit_key(id).await {
    293         Ok((full, reserve_pub)) => {
    294             // Store transactions in database
    295             let debit_addr = sender_address(rpc, &full).await?;
    296             let amount = btc_to_taler(&full.amount, &state.currency);
    297             match db::register_tx_in(
    298                 &mut *db,
    299                 id,
    300                 &amount,
    301                 &debit_addr,
    302                 &Timestamp::from_sql_micros((full.time * 1000000) as i64).unwrap(),
    303                 &reserve_pub,
    304             )
    305             .await?
    306             {
    307                 AddIncomingResult::Success {
    308                     new,
    309                     row_id: _,
    310                     valued_at: _,
    311                 } => {
    312                     if new {
    313                         info!(target: "worker", "<< {amount} {reserve_pub} in {id} from {debit_addr}");
    314                     }
    315                 }
    316                 AddIncomingResult::ReservePubReuse => {
    317                     db::bounce(db, id, "reserve_pub reuse").await?
    318                 }
    319             }
    320         }
    321         Err(err) => match err {
    322             GetSegwitErr::Decode(e) => db::bounce(db, id, &e.to_string()).await?,
    323             GetSegwitErr::RPC(e) => return Err(e.into()),
    324         },
    325     }
    326     Ok(())
    327 }
    328 
    329 /// Sync database with a debit transaction, return true if stuck
    330 async fn sync_chain_debit(
    331     txid: &Txid,
    332     full: &Transaction,
    333     wtid: &ShortHashCode,
    334     exchange_url: &Url,
    335     db: &mut PgListener,
    336     confirmations: i32,
    337     state: &WorkerCfg,
    338 ) -> LoopResult<bool> {
    339     let credit_addr = full.details[0].address.clone().unwrap().assume_checked();
    340     let amount = btc_to_taler(&full.amount, &state.currency);
    341 
    342     if confirmations < 0 {
    343         // Handle conflicting tx
    344         if full.replaced_by_txid.is_none() && db::conflict_tx_out(db, txid).await? {
    345             warn!(target: "worker", ">> (conflict) {wtid} in {txid} to {credit_addr}");
    346         }
    347     } else {
    348         match db::sync_out(
    349             db,
    350             txid,
    351             full.replaced_by_txid.as_ref(),
    352             &amount,
    353             exchange_url,
    354             &credit_addr,
    355             wtid,
    356             &Timestamp::from_sql_micros((full.time * 1000000) as i64).unwrap(),
    357         )
    358         .await?
    359         {
    360             SyncOutResult::New => {
    361                 warn!(target: "worker", ">> (onchain) {amount} {wtid} in {txid} to {credit_addr}");
    362             }
    363             SyncOutResult::Replaced => {
    364                 info!(
    365                     target: "worker",
    366                     ">> (recovered) {wtid} replace {txid} with {}",
    367                     full.replaced_by_txid.unwrap()
    368                 )
    369             }
    370             SyncOutResult::Recovered => {
    371                 warn!(target: "worker", ">> (recovered) {amount} {wtid} in {txid} to {credit_addr}")
    372             }
    373             SyncOutResult::None => {}
    374         }
    375 
    376         // Check if stuck
    377         if let Some(delay) = state.bump_delay
    378             && confirmations == 0
    379             && full.replaced_by_txid.is_none()
    380         {
    381             let now = SystemTime::now()
    382                 .duration_since(SystemTime::UNIX_EPOCH)
    383                 .unwrap()
    384                 .as_secs();
    385             if now - full.time > delay as u64 {
    386                 return Ok(true);
    387             }
    388         }
    389     }
    390     Ok(false)
    391 }
    392 
    393 /// Sync database with an outgoing bounce transaction
    394 async fn sync_chain_bounce(
    395     txid: &Txid,
    396     bounced: &Txid,
    397     db: &mut PgListener,
    398     confirmations: i32,
    399 ) -> LoopResult<()> {
    400     if confirmations < 0 {
    401         // Handle conflicting tx
    402         if db::conflict_bounce(db, txid).await? {
    403             warn!(target: "worker", "|| (conflict) {bounced} in {txid}");
    404         }
    405     } else {
    406         match db::sync_bounce(db, txid, bounced, &Timestamp::now()).await? {
    407             SyncBounceResult::New => warn!(target: "worker", "|| (onchain) {bounced} in {txid}"),
    408             SyncBounceResult::Recovered => {
    409                 warn!(target: "worker", "|| (recovered) {bounced} in {txid}")
    410             }
    411             SyncBounceResult::None => {}
    412         }
    413     }
    414 
    415     Ok(())
    416 }
    417 
    418 /// Sync database with an outgoing transaction, return true if stuck
    419 async fn sync_chain_outgoing(
    420     id: &Txid,
    421     confirmations: i32,
    422     rpc: &mut Rpc,
    423     db: &mut PgListener,
    424     state: &WorkerCfg,
    425 ) -> LoopResult<bool> {
    426     match rpc
    427         .get_tx_op_return(id)
    428         .await
    429         .map(|(full, bytes)| (full, OutMetadata::decode(&bytes)))
    430     {
    431         Ok((full, Ok(info))) => match info {
    432             OutMetadata::Debit { wtid, url } => {
    433                 return sync_chain_debit(id, &full, &wtid, &url, db, confirmations, state).await;
    434             }
    435             OutMetadata::Bounce { bounced } => {
    436                 sync_chain_bounce(id, &Txid::from_byte_array(bounced), db, confirmations).await?
    437             }
    438         },
    439         Ok((_, Err(e))) => warn!(target: "worker", "send: decode-info {id} - {e}"),
    440         Err(e) => match e {
    441             GetOpReturnErr::MissingOpReturn => { /* Ignore */ }
    442             GetOpReturnErr::RPC(e) => return Err(e)?,
    443         },
    444     }
    445     Ok(false)
    446 }
    447 
    448 /// Send a debit transaction on the blockchain, return false if no more requested transactions are found
    449 async fn debit(db: &mut PgListener, rpc: &mut Rpc, state: &WorkerCfg) -> LoopResult<bool> {
    450     // We rely on the advisory lock to ensure we are the only one sending transactions
    451     if let Some((id, amount, wtid, addr, url)) =
    452         db::pending_debit(&mut *db, &state.currency).await?
    453     {
    454         let metadata = OutMetadata::Debit {
    455             wtid: wtid.clone(),
    456             url,
    457         };
    458 
    459         let txid = rpc
    460             .send(&addr, &amount, Some(&metadata.encode().unwrap()), false)
    461             .await?;
    462         fail_point("(injected) fail debit", 0.3)?;
    463         db::debit_sent(db, id, &txid).await?;
    464         let amount = btc_to_taler(&amount.to_signed().unwrap(), &state.currency);
    465         info!(target: "worker", ">> {amount} {wtid} in {txid} to {addr}");
    466         Ok(true)
    467     } else {
    468         Ok(false)
    469     }
    470 }
    471 
    472 /// Bounce a transaction on the blockchain, return false if no more requested transactions are found
    473 async fn bounce(db: &mut PgListener, rpc: &mut Rpc, fee: &BtcAmount) -> LoopResult<bool> {
    474     // We rely on the advisory lock to ensure we are the only one sending transactions
    475     if let Some((id, bounced, reason)) = db::pending_bounce(&mut *db).await? {
    476         let metadata = OutMetadata::Bounce {
    477             bounced: *bounced.as_byte_array(),
    478         };
    479 
    480         match rpc
    481             .bounce(&bounced, fee, Some(&metadata.encode().unwrap()))
    482             .await
    483         {
    484             Ok(txid) => {
    485                 fail_point("(injected) fail bounce", 0.3)?;
    486                 db::bounce_set_status(db, id, Some(&txid), &BounceStatus::sent).await?;
    487                 if let Some(reason) = reason {
    488                     info!(target: "worker", "|| {bounced} in {txid}: {reason}");
    489                 } else {
    490                     info!(target: "worker", "|| {bounced} in {txid}");
    491                 }
    492             }
    493             Err(err) => match err {
    494                 rpc::Error::RPC {
    495                     code: ErrorCode::RpcWalletInsufficientFunds | ErrorCode::RpcWalletError,
    496                     msg: _,
    497                 } => {
    498                     warn!(target: "worker", "{err}");
    499                     return Ok(false);
    500                 }
    501                 e => Err(e)?,
    502             },
    503         }
    504         Ok(true)
    505     } else {
    506         Ok(false)
    507     }
    508 }