depolymerization

wire gateway for Bitcoin/Ethereum
Log | Files | Refs | Submodules | README | LICENSE

worker.rs (22018B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2022-2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 use std::{fmt::Write, time::SystemTime};
     17 
     18 use bitcoin::{Amount as BtcAmount, Txid, hashes::Hash};
     19 use depolymerizer_common::metadata::OutMetadata;
     20 use jiff::Timestamp;
     21 use sqlx::{
     22     Acquire, Either, PgConnection, PgPool,
     23     postgres::{PgAdvisoryLock, PgAdvisoryLockKey, PgListener},
     24 };
     25 use taler_api::subject::IncomingSubject;
     26 use taler_common::{ExpoBackoffDecorr, api::ShortHashCode, db::IncomingType};
     27 use tokio::time::sleep;
     28 use tracing::{debug, error, info, trace, warn};
     29 
     30 use super::{LoopError, LoopResult, analysis::analysis};
     31 use crate::{
     32     GetOpReturnErr,
     33     config::WorkerCfg,
     34     db::{self, AddIncomingResult, SyncOutResult, TxOutKind},
     35     fail_point::fail_point,
     36     rpc::{self, Category, ErrorCode, ListSinceBlock, ListTransaction, Rpc, rpc_wallet},
     37     rpc_utils::sender_address,
     38     taler_utils::btc_to_taler,
     39 };
     40 
     41 pub async fn worker_loop(mut state: WorkerCfg, pool: PgPool) {
     42     let mut jitter = ExpoBackoffDecorr::default();
     43     let mut lifetime = state.lifetime;
     44     let mut status = true;
     45     let mut skip_notification = true;
     46 
     47     loop {
     48         let result: LoopResult<()> = async {
     49             // Connect
     50             let rpc = &mut rpc_wallet(&state.rpc_cfg, &state.wallet_cfg).await?;
     51             let db = &mut PgListener::connect_with(&pool).await?;
     52 
     53             // Listen to all channels
     54             db.listen_all(["new_block", "transfer"]).await?;
     55 
     56             loop {
     57                 // Wait for the next notification
     58                 {
     59                     let ntf = db.next_buffered();
     60                     if let Some(ntf) = &ntf {
     61                         trace!(target: "worker", "notification from {}", ntf.channel())
     62                     }
     63                     if !skip_notification && ntf.is_none() {
     64                         debug!(target: "worker", "waiting for notifications");
     65                         // Block until next notification
     66                         if let Some(ntf) = db.try_recv().await? {
     67                             trace!(target: "worker", "notification from {}", ntf.channel())
     68                         }
     69                     }
     70                     // Conflate all notifications
     71                     while let Some(ntf) = db.next_buffered() {
     72                         trace!(target: "worker", "notification from {}", ntf.channel())
     73                     }
     74                 }
     75 
     76                 // Check lifetime
     77                 if let Some(nb) = lifetime.as_mut() {
     78                     if *nb == 0 {
     79                         info!(target: "worker", "Reach end of lifetime");
     80                         return Ok(());
     81                     } else {
     82                         *nb -= 1;
     83                     }
     84                 }
     85 
     86                 debug!(target: "worker", "syncing blockchain");
     87 
     88                 let mut db = db.acquire().await?;
     89 
     90                 // It is not possible to atomically update the blockchain and the database.
     91                 // When we failed to sync the database and the blockchain state we rely on
     92                 // sync_chain to recover the lost updates.
     93                 // When this function is running concurrently, it not possible to known another
     94                 // execution has failed, and this can lead to a transaction being sent multiple time.
     95                 // To ensure only a single version of this function is running at a given time we rely
     96                 // on postgres advisory lock
     97 
     98                 // Take the lock
     99                 let lock = PgAdvisoryLock::with_key(PgAdvisoryLockKey::BigInt(42));
    100                 let Either::Left(mut lock) = lock.try_acquire(&mut db).await? else {
    101                     return Err(LoopError::Concurrency);
    102                 };
    103 
    104                 // Perform analysis
    105                 state.conf = analysis(rpc, state.conf, state.max_conf).await?;
    106 
    107                 worker_step(rpc, lock.as_mut(), &mut state, &mut status).await?;
    108 
    109                 skip_notification = false;
    110                 jitter.reset();
    111             }
    112         }
    113         .await;
    114         if let Err(e) = result {
    115             error!(target: "worker", "{e}");
    116             // When we catch an error, we sometimes want to retry immediately (eg. reconnect to RPC or DB).
    117             // Bitcoin error codes are generic. We need to match the msg to get precise ones. Some errors
    118             // can resolve themselves when a new block is mined (new fees, new transactions). Our simple
    119             // approach is to wait for the next loop when an RPC error is caught to prevent endless logged errors.
    120             skip_notification = match e {
    121                 LoopError::DB(_) | LoopError::Injected(_) | LoopError::Concurrency => true,
    122                 LoopError::Rpc(e) => match e {
    123                     rpc::Error::Transport(_) | rpc::Error::Connect(_) => true,
    124                     rpc::Error::RPC { code, .. } => code == ErrorCode::RpcWalletError,
    125                     rpc::Error::Bitcoin(_) | rpc::Error::Json { .. } | rpc::Error::Null => false,
    126                 },
    127             };
    128             sleep(jitter.backoff()).await;
    129         } else {
    130             return;
    131         }
    132     }
    133 }
    134 
    135 pub async fn worker_transient(mut state: WorkerCfg, pool: PgPool) -> LoopResult<()> {
    136     let mut status = true;
    137 
    138     // Connect
    139     let rpc = &mut rpc_wallet(&state.rpc_cfg, &state.wallet_cfg).await?;
    140     let mut db = pool.acquire().await?;
    141 
    142     // It is not possible to atomically update the blockchain and the database.
    143     // When we failed to sync the database and the blockchain state we rely on
    144     // sync_chain to recover the lost updates.
    145     // When this function is running concurrently, it not possible to known another
    146     // execution has failed, and this can lead to a transaction being sent multiple time.
    147     // To ensure only a single version of this function is running at a given time we rely
    148     // on postgres advisory lock
    149 
    150     // Take the lock
    151     let lock = PgAdvisoryLock::with_key(PgAdvisoryLockKey::BigInt(42));
    152     let Either::Left(mut lock) = lock.try_acquire(&mut db).await? else {
    153         return Err(LoopError::Concurrency);
    154     };
    155 
    156     worker_step(rpc, lock.as_mut(), &mut state, &mut status).await?;
    157     Ok(())
    158 }
    159 
    160 /// Synchronize local db with blockchain and perform transactions
    161 async fn worker_step(
    162     rpc: &mut Rpc,
    163     db: &mut PgConnection,
    164     state: &mut WorkerCfg,
    165     status: &mut bool,
    166 ) -> LoopResult<()> {
    167     // Sync chain
    168     if let Some(stuck) = sync_chain(rpc, db, state, status).await? {
    169         // As we are now in sync with the blockchain if a transaction has Requested status it have not been sent
    170 
    171         // Send requested debits
    172         while debit(db, rpc, state).await? {}
    173 
    174         // Bump stuck transactions
    175         for (txid, wtid) in stuck {
    176             let bump = rpc.bump_fee(&txid).await?;
    177             fail_point("(injected) fail bump", 0.3)?;
    178             db::bump_tx_id(&mut *db, &bump.txid, &wtid).await?;
    179             info!(target: "worker", ">> (bump) {wtid} {txid} -> {}", bump.txid);
    180         }
    181 
    182         // Send requested bounce
    183         while bounce(db, rpc, &state.bounce_fee).await? {}
    184     }
    185     Ok(())
    186 }
    187 
    188 /// Parse new transactions, return stuck transactions if the database is up to date with the latest mined block
    189 async fn sync_chain(
    190     rpc: &mut Rpc,
    191     db: &mut PgConnection,
    192     state: &WorkerCfg,
    193     status: &mut bool,
    194 ) -> LoopResult<Option<Vec<(Txid, ShortHashCode)>>> {
    195     // Get stored last_hash
    196     let sync_state = db::get_sync_state(&mut *db).await?;
    197 
    198     // Get all transactions made since this block
    199     let ListSinceBlock {
    200         mut transactions,
    201         mut removed,
    202         lastblock,
    203     } = rpc.list_since_block(Some(&sync_state), state.conf).await?;
    204     transactions.sort_unstable_by_key(|it| (it.confirmations, it.txid));
    205     transactions.dedup_by_key(|it| it.txid);
    206     removed.sort_unstable_by_key(|it| (it.confirmations, it.txid));
    207     removed.dedup_by_key(|it| it.txid);
    208 
    209     // Check if a confirmed incoming transaction have been removed by a blockchain reorganization
    210     let conflict = sync_chain_removed(removed, db, state.conf as i32).await?;
    211 
    212     // Sync server status with database
    213     let new_status = !conflict.stop_server();
    214     if *status != new_status {
    215         db::update_status(db, new_status).await?;
    216         *status = new_status;
    217         if new_status {
    218             info!(target: "worker", "Recovered lost transactions");
    219         }
    220     }
    221 
    222     if conflict.stop_worker() {
    223         return Ok(None);
    224     }
    225 
    226     let mut stuck = vec![];
    227 
    228     for tx in transactions {
    229         match tx.category {
    230             Category::Send => {
    231                 if let Some(wtid) =
    232                     sync_chain_outgoing(&tx.txid, tx.confirmations, rpc, db, state).await?
    233                 {
    234                     stuck.push((tx.txid, wtid));
    235                 }
    236             }
    237             Category::Receive if tx.confirmations >= state.conf as i32 => {
    238                 sync_chain_incoming_confirmed(&tx.txid, rpc, db, state).await?
    239             }
    240             _ => {
    241                 // Ignore coinbase and unconfirmed send transactions
    242             }
    243         }
    244     }
    245 
    246     // Move last_hash forward
    247     db::swap_sync_state(db, &sync_state, &lastblock).await?;
    248 
    249     Ok(Some(stuck))
    250 }
    251 
    252 #[derive(Debug, Clone, Copy)]
    253 enum ReorgConflict {
    254     BackingCompromised,
    255     IncomingCompromised,
    256     Ok,
    257 }
    258 
    259 impl ReorgConflict {
    260     pub fn stop_server(&self) -> bool {
    261         matches!(self, Self::BackingCompromised)
    262     }
    263 
    264     pub fn stop_worker(&self) -> bool {
    265         matches!(self, Self::BackingCompromised | Self::IncomingCompromised)
    266     }
    267 }
    268 
    269 /// Sync database with removed transactions, return false if bitcoin backing is compromised
    270 async fn sync_chain_removed(
    271     removed: Vec<ListTransaction>,
    272     db: &mut PgConnection,
    273     min_confirmations: i32,
    274 ) -> LoopResult<ReorgConflict> {
    275     let potential_problematic_ids: Vec<Txid> = removed
    276         .into_iter()
    277         .filter_map(|tx| {
    278             (tx.category == Category::Receive && tx.confirmations < min_confirmations)
    279                 .then_some(tx.txid)
    280         })
    281         .collect();
    282 
    283     // Only keep incoming transaction that are not reconfirmed
    284     let problematic_tx = db::reorg(&mut *db, &potential_problematic_ids).await?;
    285     if problematic_tx.is_empty() {
    286         return Ok(ReorgConflict::Ok);
    287     }
    288     // Bitcoin backing can be compromised in only two cases:
    289     // - a confirmed reserve
    290     // - a confirmed bounced
    291 
    292     // TODO use partition_in_place when stable
    293     let (compromise, problematic): (Vec<_>, Vec<_>) =
    294         problematic_tx.iter().partition(|it| match it {
    295             db::ProblematicTx::Taler { ty, .. } => *ty == IncomingType::reserve,
    296             db::ProblematicTx::Bounce { .. } => true,
    297             db::ProblematicTx::Simple { .. } => false,
    298         });
    299     let mut buf = "The following transaction have been removed from the blockchain, ".to_string();
    300     let (txs, state) = if compromise.is_empty() {
    301         buf.push_str("waiting until they reappear:");
    302         (problematic, ReorgConflict::IncomingCompromised)
    303     } else {
    304         buf.push_str("bitcoin backing is compromised until they reappear:");
    305         (compromise, ReorgConflict::BackingCompromised)
    306     };
    307     for tx in txs {
    308         match tx {
    309             db::ProblematicTx::Taler {
    310                 txid,
    311                 addr,
    312                 ty,
    313                 metadata,
    314             } => {
    315                 write!(&mut buf, "\n\t{txid} {ty} {metadata} from {addr}",).unwrap();
    316             }
    317             db::ProblematicTx::Bounce { txid, bounced_in } => {
    318                 write!(&mut buf, "\n\t{txid} bounced in {bounced_in}").unwrap();
    319             }
    320             db::ProblematicTx::Simple { txid } => {
    321                 write!(&mut buf, "\n\t{txid}").unwrap();
    322             }
    323         }
    324     }
    325     error!(target: "worker", "{buf}");
    326     Ok(state)
    327 }
    328 
    329 /// Sync database with an incoming confirmed transaction
    330 async fn sync_chain_incoming_confirmed(
    331     txid: &Txid,
    332     rpc: &mut Rpc,
    333     db: &mut PgConnection,
    334     state: &WorkerCfg,
    335 ) -> Result<(), LoopError> {
    336     let (tx, metadata) = rpc.get_tx_segwit_key(txid).await?;
    337     // Store transactions in database
    338     let debit_addr = sender_address(rpc, &tx).await?;
    339     let amount = btc_to_taler(&tx.amount, &state.currency);
    340     let time = Timestamp::from_second(tx.time as i64).unwrap();
    341     let ty = IncomingType::reserve;
    342     match metadata {
    343         Ok(reserve_pub) => {
    344             match db::register_tx_in(
    345                 &mut *db,
    346                 txid,
    347                 &amount,
    348                 &debit_addr,
    349                 &Timestamp::from_second(tx.time as i64).unwrap(),
    350                 &Some(IncomingSubject::Reserve(reserve_pub.clone())),
    351             )
    352             .await?
    353             {
    354                 AddIncomingResult::Success {
    355                     new,
    356                     row_id: _,
    357                     valued_at: _,
    358                     pending: _,
    359                 } => {
    360                     if new {
    361                         info!(target: "worker", "<< {ty} {reserve_pub} {txid} {debit_addr} {amount}");
    362                     }
    363                 }
    364                 AddIncomingResult::ReservePubReuse => {
    365                     db::bounce(db, txid, &amount, &debit_addr, &time, "reserve_pub reuse").await?
    366                 }
    367                 AddIncomingResult::UnknownMapping => todo!(),
    368                 AddIncomingResult::MappingReuse => todo!(),
    369             }
    370         }
    371         Err(e) => db::bounce(db, txid, &amount, &debit_addr, &time, &e.to_string()).await?,
    372     }
    373     Ok(())
    374 }
    375 
    376 /// Sync database with an outgoing transaction, return true if stuck
    377 async fn sync_chain_outgoing(
    378     txid: &Txid,
    379     confirmations: i32,
    380     rpc: &mut Rpc,
    381     db: &mut PgConnection,
    382     state: &WorkerCfg,
    383 ) -> LoopResult<Option<ShortHashCode>> {
    384     match rpc
    385         .get_tx_op_return(txid)
    386         .await
    387         .map(|(tx, bytes)| (tx, OutMetadata::decode(&bytes)))
    388     {
    389         Ok((tx, Ok(info))) => {
    390             let credit_addr = tx.details[0].address.clone().unwrap().assume_checked();
    391             let amount = btc_to_taler(&tx.amount, &state.currency);
    392             let created_at = Timestamp::from_second(tx.time as i64).unwrap();
    393             match info {
    394                 OutMetadata::Debit {
    395                     wtid,
    396                     url,
    397                     metadata,
    398                 } => {
    399                     if confirmations < 0 {
    400                         // Handle conflicting tx
    401                         if tx.replaced_by_txid.is_none() && db::transfer_conflict(db, txid).await? {
    402                             warn!(target: "worker", ">> (conflict) {wtid} {txid} {credit_addr} {amount}");
    403                         }
    404                     } else if confirmations > state.conf as i32 {
    405                         match db::sync_out(
    406                             db,
    407                             txid,
    408                             tx.replaced_by_txid.as_ref(),
    409                             &amount,
    410                             &credit_addr,
    411                             &TxOutKind::Talerable {
    412                                 wtid: &wtid,
    413                                 url: &url,
    414                                 metadata: metadata.as_deref(),
    415                             },
    416                             &created_at,
    417                         )
    418                         .await?
    419                         {
    420                             SyncOutResult::New => {
    421                                 info!(target: "worker", ">> (onchain) {wtid} {txid} {credit_addr} {amount}");
    422                             }
    423                             SyncOutResult::Replaced => {
    424                                 info!(
    425                                     target: "worker",
    426                                     ">> (recovered) {wtid} {txid} -> {} {credit_addr} {amount}",
    427                                     tx.replaced_by_txid.unwrap()
    428                                 )
    429                             }
    430                             SyncOutResult::Recovered => {
    431                                 warn!(target: "worker", ">> (recovered) {wtid} {txid} {credit_addr} {amount}")
    432                             }
    433                             SyncOutResult::None => {}
    434                         }
    435                     } else {
    436                         // TODO sync transfer sent ?
    437 
    438                         // Check if stuck
    439                         if let Some(delay) = state.bump_delay
    440                             && confirmations == 0
    441                             && tx.replaced_by_txid.is_none()
    442                         {
    443                             let now = SystemTime::now()
    444                                 .duration_since(SystemTime::UNIX_EPOCH)
    445                                 .unwrap()
    446                                 .as_secs();
    447                             if now - tx.time > delay as u64 {
    448                                 return Ok(Some(wtid));
    449                             }
    450                         }
    451                     }
    452                 }
    453                 OutMetadata::Bounce { bounced } => {
    454                     let bounced = Txid::from_byte_array(bounced);
    455                     if confirmations < 0 {
    456                         // Handle conflicting tx
    457                         if db::bounce_conflict(db, txid).await? {
    458                             warn!(target: "worker", "|| (conflict) {bounced} {txid}");
    459                         }
    460                     } else if confirmations > state.conf as i32 {
    461                         match db::sync_out(
    462                             db,
    463                             txid,
    464                             tx.replaces_txid.as_ref(),
    465                             &amount,
    466                             &credit_addr,
    467                             &TxOutKind::Bounce(bounced),
    468                             &created_at,
    469                         )
    470                         .await?
    471                         {
    472                             SyncOutResult::New => {
    473                                 info!(target: "worker", "|| (onchain) {bounced} {txid}")
    474                             }
    475                             SyncOutResult::Replaced => {
    476                                 info!(
    477                                     target: "worker",
    478                                     "|| (recovered) {bounced} {txid} -> {}",
    479                                     tx.replaced_by_txid.unwrap()
    480                                 )
    481                             }
    482                             SyncOutResult::Recovered => {
    483                                 warn!(target: "worker", "|| (recovered) {bounced} {txid}")
    484                             }
    485                             SyncOutResult::None => {}
    486                         }
    487                     } else {
    488                         // TODO sync bounce sent ?
    489                     }
    490                 }
    491             }
    492         }
    493         Ok((_, Err(e))) => warn!(target: "worker", "send: decode-info {txid} - {e}"),
    494         Err(e) => match e {
    495             GetOpReturnErr::MissingOpReturn => { /* Ignore */ }
    496             GetOpReturnErr::RPC(e) => return Err(e)?,
    497         },
    498     }
    499     Ok(None)
    500 }
    501 
    502 /// Send a debit transaction on the blockchain, return false if no more requested transactions are found
    503 async fn debit(db: &mut PgConnection, rpc: &mut Rpc, state: &WorkerCfg) -> LoopResult<bool> {
    504     // We rely on the advisory lock to ensure we are the only one sending transactions
    505     if let Some((id, amount, wtid, addr, url, metadata)) =
    506         db::pending_transfer(&mut *db, &state.currency).await?
    507     {
    508         let metadata = OutMetadata::Debit {
    509             wtid: wtid.clone(),
    510             url,
    511             metadata,
    512         };
    513 
    514         let txid = rpc
    515             .send(&addr, &amount, Some(&metadata.encode().unwrap()), false)
    516             .await?;
    517         fail_point("(injected) fail debit", 0.3)?;
    518         db::transfer_sent(db, id, &txid).await?;
    519         let amount = btc_to_taler(&amount.to_signed().unwrap(), &state.currency);
    520         info!(target: "worker", ">> (sent) {wtid} {txid} {addr} {amount}");
    521         Ok(true)
    522     } else {
    523         Ok(false)
    524     }
    525 }
    526 
    527 /// Bounce a transaction on the blockchain, return false if no more requested transactions are found
    528 async fn bounce(db: &mut PgConnection, rpc: &mut Rpc, fee: &BtcAmount) -> LoopResult<bool> {
    529     // We rely on the advisory lock to ensure we are the only one sending transactions
    530     if let Some((id, bounced, reason)) = db::pending_bounce(&mut *db).await? {
    531         let metadata = OutMetadata::Bounce {
    532             bounced: *bounced.as_byte_array(),
    533         };
    534 
    535         match rpc
    536             .bounce(&bounced, fee, Some(&metadata.encode().unwrap()))
    537             .await
    538         {
    539             Ok(txid) => {
    540                 fail_point("(injected) fail bounce", 0.3)?;
    541                 db::bounce_sent(db, id, &txid).await?;
    542                 if let Some(reason) = reason {
    543                     info!(target: "worker", "|| (sent) {bounced} {txid}: {reason}");
    544                 } else {
    545                     info!(target: "worker", "|| (sent) {bounced} {txid}");
    546                 }
    547             }
    548             Err(err) => match err {
    549                 rpc::Error::RPC {
    550                     code: ErrorCode::RpcWalletInsufficientFunds | ErrorCode::RpcWalletError,
    551                     ..
    552                 } => {
    553                     warn!(target: "worker", "{err}");
    554                     return Ok(false);
    555                 }
    556                 e => Err(e)?,
    557             },
    558         }
    559         Ok(true)
    560     } else {
    561         Ok(false)
    562     }
    563 }