worker.rs (18398B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2022-2025 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 use std::{fmt::Write, time::SystemTime}; 17 18 use bitcoin::{Amount as BtcAmount, Txid, hashes::Hash}; 19 use depolymerizer_common::{metadata::OutMetadata, status::BounceStatus}; 20 use sqlx::{PgPool, postgres::PgListener}; 21 use taler_common::{ExpoBackoffDecorr, api_common::ShortHashCode, types::timestamp::Timestamp}; 22 use tokio::time::sleep; 23 use tracing::{debug, error, info, trace, warn}; 24 use url::Url; 25 26 use crate::{ 27 GetOpReturnErr, GetSegwitErr, 28 config::WorkerCfg, 29 db::{self, AddIncomingResult, SyncBounceResult, SyncOutResult, worker_lock}, 30 fail_point::fail_point, 31 rpc::{ 32 self, Category, ErrorCode, ListSinceBlock, ListTransaction, Rpc, Transaction, rpc_wallet, 33 }, 34 rpc_utils::sender_address, 35 taler_utils::btc_to_taler, 36 }; 37 38 use super::{LoopError, LoopResult, analysis::analysis}; 39 40 pub async fn worker_loop(mut state: WorkerCfg, pool: PgPool) { 41 let mut jitter = ExpoBackoffDecorr::default(); 42 let mut lifetime = state.lifetime; 43 let mut status = true; 44 let mut skip_notification = true; 45 46 loop { 47 let result: LoopResult<()> = async { 48 // Connect 49 let rpc = &mut rpc_wallet(&state.rpc_cfg, &state.wallet_cfg).await?; 50 let db = &mut PgListener::connect_with(&pool).await?; 51 52 // It is not possible to atomically update the blockchain and the database. 53 // When we failed to sync the database and the blockchain state we rely on 54 // sync_chain to recover the lost updates. 55 // When this function is running concurrently, it not possible to known another 56 // execution has failed, and this can lead to a transaction being sent multiple time. 57 // To ensure only a single version of this function is running at a given time we rely 58 // on postgres advisory lock 59 60 // Take the lock 61 if !worker_lock(&mut *db).await? { 62 return Err(LoopError::Concurrency); 63 } 64 65 // Listen to all channels 66 db.listen_all(["new_block", "taler_out"]).await?; 67 68 loop { 69 // Wait for the next notification 70 { 71 let ntf = db.next_buffered(); 72 if let Some(ntf) = &ntf { 73 trace!(target: "worker", "notification from {}", ntf.channel()) 74 } 75 if !skip_notification && ntf.is_none() { 76 debug!(target: "worker", "waiting for notifications"); 77 // Block until next notification 78 if let Some(ntf) = db.try_recv().await? { 79 trace!(target: "worker", "notification from {}", ntf.channel()) 80 } 81 } 82 // Conflate all notifications 83 while let Some(ntf) = db.next_buffered() { 84 trace!(target: "worker", "notification from {}", ntf.channel()) 85 } 86 } 87 88 // Check lifetime 89 if let Some(nb) = lifetime.as_mut() { 90 if *nb == 0 { 91 info!(target: "worker", "Reach end of lifetime"); 92 return Ok(()); 93 } else { 94 *nb -= 1; 95 } 96 } 97 98 debug!(target: "worker", "syncing blockchain"); 99 100 // Perform analysis 101 state.confirmation = 102 analysis(rpc, state.confirmation, state.max_confirmation).await?; 103 104 worker_step(rpc, db, &mut state, &mut status).await?; 105 106 skip_notification = false; 107 jitter.reset(); 108 } 109 } 110 .await; 111 if let Err(e) = result { 112 error!(target: "worker", "{e}"); 113 // When we catch an error, we sometimes want to retry immediately (eg. reconnect to RPC or DB). 114 // Bitcoin error codes are generic. We need to match the msg to get precise ones. Some errors 115 // can resolve themselves when a new block is mined (new fees, new transactions). Our simple 116 // approach is to wait for the next loop when an RPC error is caught to prevent endless logged errors. 117 skip_notification = matches!( 118 e, 119 LoopError::Rpc(rpc::Error::Transport(_)) 120 | LoopError::DB(_) 121 | LoopError::Injected(_) 122 | LoopError::Concurrency 123 ); 124 sleep(jitter.backoff()).await; 125 } else { 126 return; 127 } 128 } 129 } 130 131 pub async fn worker_transient(mut state: WorkerCfg, pool: PgPool) -> LoopResult<()> { 132 let mut status = true; 133 134 // Connect 135 let rpc = &mut rpc_wallet(&state.rpc_cfg, &state.wallet_cfg).await?; 136 let db = &mut PgListener::connect_with(&pool).await?; 137 138 // It is not possible to atomically update the blockchain and the database. 139 // When we failed to sync the database and the blockchain state we rely on 140 // sync_chain to recover the lost updates. 141 // When this function is running concurrently, it not possible to known another 142 // execution has failed, and this can lead to a transaction being sent multiple time. 143 // To ensure only a single version of this function is running at a given time we rely 144 // on postgres advisory lock 145 146 // Take the lock 147 if !worker_lock(&mut *db).await? { 148 return Err(LoopError::Concurrency); 149 } 150 151 worker_step(rpc, db, &mut state, &mut status).await?; 152 Ok(()) 153 } 154 155 /// Synchronize local db with blockchain and perform transactions 156 async fn worker_step( 157 rpc: &mut Rpc, 158 db: &mut PgListener, 159 state: &mut WorkerCfg, 160 status: &mut bool, 161 ) -> LoopResult<()> { 162 // Sync chain 163 if let Some(stuck) = sync_chain(rpc, db, state, status).await? { 164 // As we are now in sync with the blockchain if a transaction has Requested status it have not been sent 165 166 // Send requested debits 167 while debit(db, rpc, state).await? {} 168 169 // Bump stuck transactions 170 for id in stuck { 171 let bump = rpc.bump_fee(&id).await?; 172 fail_point("(injected) fail bump", 0.3)?; 173 let wtid = db::bump_tx_id(&mut *db, &id, &bump.txid).await?; 174 info!(target: "worker", ">> (bump) {wtid} replace {id} with {}", bump.txid); 175 } 176 177 // Send requested bounce 178 while bounce(db, rpc, &state.bounce_fee).await? {} 179 } 180 Ok(()) 181 } 182 183 /// Parse new transactions, return stuck transactions if the database is up to date with the latest mined block 184 async fn sync_chain( 185 rpc: &mut Rpc, 186 db: &mut PgListener, 187 state: &WorkerCfg, 188 status: &mut bool, 189 ) -> LoopResult<Option<Vec<Txid>>> { 190 // Get stored last_hash 191 let sync_state = db::get_sync_state(&mut *db).await?; 192 // Get the current confirmation delay 193 let conf_delay = state.confirmation; 194 195 // Get all transactions made since this block 196 let ListSinceBlock { 197 transactions, 198 removed, 199 lastblock, 200 } = rpc.list_since_block(Some(&sync_state), conf_delay).await?; 201 202 // Check if a confirmed incoming transaction have been removed by a blockchain reorganization 203 let new_status = sync_chain_removed(removed, db, conf_delay as i32).await?; 204 205 // Sync status with database 206 if *status != new_status { 207 db::update_status(db, new_status).await?; 208 *status = new_status; 209 if new_status { 210 info!(target: "worker", "Recovered lost transactions"); 211 } 212 } 213 if !new_status { 214 return Ok(None); 215 } 216 217 let mut stuck = vec![]; 218 219 for tx in transactions { 220 match tx.category { 221 Category::Send => { 222 if sync_chain_outgoing(&tx.txid, tx.confirmations, rpc, db, state).await? { 223 stuck.push(tx.txid); 224 } 225 } 226 Category::Receive if tx.confirmations >= conf_delay as i32 => { 227 sync_chain_incoming_confirmed(&tx.txid, rpc, db, state).await? 228 } 229 _ => { 230 // Ignore coinbase and unconfirmed send transactions 231 } 232 } 233 } 234 235 // Move last_hash forward 236 db::swap_sync_state(db, &sync_state, &lastblock).await?; 237 238 Ok(Some(stuck)) 239 } 240 241 /// Sync database with removed transactions, return false if bitcoin backing is compromised 242 async fn sync_chain_removed( 243 removed: Vec<ListTransaction>, 244 db: &mut PgListener, 245 min_confirmations: i32, 246 ) -> LoopResult<bool> { 247 // A removed incoming transaction is a correctness issues in only two cases: 248 // - it is a confirmed credit registered in the database 249 // - it is an invalid transactions already bounced 250 // Those two cases can compromise bitcoin backing 251 // Removed outgoing transactions will be retried automatically by the node 252 253 let potential_problematic_ids: Vec<Txid> = removed 254 .into_iter() 255 .filter_map(|tx| { 256 (tx.category == Category::Receive && tx.confirmations < min_confirmations) 257 .then_some(tx.txid) 258 }) 259 .collect(); 260 261 // Only keep incoming transaction that are not reconfirmed 262 let problematic_tx = db::reorg(&mut *db, &potential_problematic_ids).await?; 263 if problematic_tx.is_empty() { 264 return Ok(true); 265 } 266 let mut buf = "The following transaction have been removed from the blockchain, bitcoin backing is compromised until the transaction reappear:".to_string(); 267 for tx in problematic_tx { 268 match tx { 269 db::ProblematicTx::In { 270 txid, 271 addr, 272 reserve_pub, 273 } => { 274 write!(&mut buf, "\n\tcredit {reserve_pub} in {txid} from {addr}",).unwrap(); 275 } 276 db::ProblematicTx::Bounce { txid, bounced } => { 277 write!(&mut buf, "\n\tbounced {txid} in {bounced}").unwrap(); 278 } 279 } 280 } 281 error!(target: "worker", "{buf}"); 282 Ok(false) 283 } 284 285 /// Sync database with an incoming confirmed transaction 286 async fn sync_chain_incoming_confirmed( 287 id: &Txid, 288 rpc: &mut Rpc, 289 db: &mut PgListener, 290 state: &WorkerCfg, 291 ) -> Result<(), LoopError> { 292 match rpc.get_tx_segwit_key(id).await { 293 Ok((full, reserve_pub)) => { 294 // Store transactions in database 295 let debit_addr = sender_address(rpc, &full).await?; 296 let amount = btc_to_taler(&full.amount, &state.currency); 297 match db::register_tx_in( 298 &mut *db, 299 id, 300 &amount, 301 &debit_addr, 302 &Timestamp::from_sql_micros((full.time * 1000000) as i64).unwrap(), 303 &reserve_pub, 304 ) 305 .await? 306 { 307 AddIncomingResult::Success { 308 new, 309 row_id: _, 310 valued_at: _, 311 } => { 312 if new { 313 info!(target: "worker", "<< {amount} {reserve_pub} in {id} from {debit_addr}"); 314 } 315 } 316 AddIncomingResult::ReservePubReuse => { 317 db::bounce(db, id, "reserve_pub reuse").await? 318 } 319 } 320 } 321 Err(err) => match err { 322 GetSegwitErr::Decode(e) => db::bounce(db, id, &e.to_string()).await?, 323 GetSegwitErr::RPC(e) => return Err(e.into()), 324 }, 325 } 326 Ok(()) 327 } 328 329 /// Sync database with a debit transaction, return true if stuck 330 async fn sync_chain_debit( 331 txid: &Txid, 332 full: &Transaction, 333 wtid: &ShortHashCode, 334 exchange_url: &Url, 335 db: &mut PgListener, 336 confirmations: i32, 337 state: &WorkerCfg, 338 ) -> LoopResult<bool> { 339 let credit_addr = full.details[0].address.clone().unwrap().assume_checked(); 340 let amount = btc_to_taler(&full.amount, &state.currency); 341 342 if confirmations < 0 { 343 // Handle conflicting tx 344 if full.replaced_by_txid.is_none() && db::conflict_tx_out(db, txid).await? { 345 warn!(target: "worker", ">> (conflict) {wtid} in {txid} to {credit_addr}"); 346 } 347 } else { 348 match db::sync_out( 349 db, 350 txid, 351 full.replaced_by_txid.as_ref(), 352 &amount, 353 exchange_url, 354 &credit_addr, 355 wtid, 356 &Timestamp::from_sql_micros((full.time * 1000000) as i64).unwrap(), 357 ) 358 .await? 359 { 360 SyncOutResult::New => { 361 warn!(target: "worker", ">> (onchain) {amount} {wtid} in {txid} to {credit_addr}"); 362 } 363 SyncOutResult::Replaced => { 364 info!( 365 target: "worker", 366 ">> (recovered) {wtid} replace {txid} with {}", 367 full.replaced_by_txid.unwrap() 368 ) 369 } 370 SyncOutResult::Recovered => { 371 warn!(target: "worker", ">> (recovered) {amount} {wtid} in {txid} to {credit_addr}") 372 } 373 SyncOutResult::None => {} 374 } 375 376 // Check if stuck 377 if let Some(delay) = state.bump_delay 378 && confirmations == 0 379 && full.replaced_by_txid.is_none() 380 { 381 let now = SystemTime::now() 382 .duration_since(SystemTime::UNIX_EPOCH) 383 .unwrap() 384 .as_secs(); 385 if now - full.time > delay as u64 { 386 return Ok(true); 387 } 388 } 389 } 390 Ok(false) 391 } 392 393 /// Sync database with an outgoing bounce transaction 394 async fn sync_chain_bounce( 395 txid: &Txid, 396 bounced: &Txid, 397 db: &mut PgListener, 398 confirmations: i32, 399 ) -> LoopResult<()> { 400 if confirmations < 0 { 401 // Handle conflicting tx 402 if db::conflict_bounce(db, txid).await? { 403 warn!(target: "worker", "|| (conflict) {bounced} in {txid}"); 404 } 405 } else { 406 match db::sync_bounce(db, txid, bounced, &Timestamp::now()).await? { 407 SyncBounceResult::New => warn!(target: "worker", "|| (onchain) {bounced} in {txid}"), 408 SyncBounceResult::Recovered => { 409 warn!(target: "worker", "|| (recovered) {bounced} in {txid}") 410 } 411 SyncBounceResult::None => {} 412 } 413 } 414 415 Ok(()) 416 } 417 418 /// Sync database with an outgoing transaction, return true if stuck 419 async fn sync_chain_outgoing( 420 id: &Txid, 421 confirmations: i32, 422 rpc: &mut Rpc, 423 db: &mut PgListener, 424 state: &WorkerCfg, 425 ) -> LoopResult<bool> { 426 match rpc 427 .get_tx_op_return(id) 428 .await 429 .map(|(full, bytes)| (full, OutMetadata::decode(&bytes))) 430 { 431 Ok((full, Ok(info))) => match info { 432 OutMetadata::Debit { wtid, url } => { 433 return sync_chain_debit(id, &full, &wtid, &url, db, confirmations, state).await; 434 } 435 OutMetadata::Bounce { bounced } => { 436 sync_chain_bounce(id, &Txid::from_byte_array(bounced), db, confirmations).await? 437 } 438 }, 439 Ok((_, Err(e))) => warn!(target: "worker", "send: decode-info {id} - {e}"), 440 Err(e) => match e { 441 GetOpReturnErr::MissingOpReturn => { /* Ignore */ } 442 GetOpReturnErr::RPC(e) => return Err(e)?, 443 }, 444 } 445 Ok(false) 446 } 447 448 /// Send a debit transaction on the blockchain, return false if no more requested transactions are found 449 async fn debit(db: &mut PgListener, rpc: &mut Rpc, state: &WorkerCfg) -> LoopResult<bool> { 450 // We rely on the advisory lock to ensure we are the only one sending transactions 451 if let Some((id, amount, wtid, addr, url)) = 452 db::pending_debit(&mut *db, &state.currency).await? 453 { 454 let metadata = OutMetadata::Debit { 455 wtid: wtid.clone(), 456 url, 457 }; 458 459 let txid = rpc 460 .send(&addr, &amount, Some(&metadata.encode().unwrap()), false) 461 .await?; 462 fail_point("(injected) fail debit", 0.3)?; 463 db::debit_sent(db, id, &txid).await?; 464 let amount = btc_to_taler(&amount.to_signed().unwrap(), &state.currency); 465 info!(target: "worker", ">> {amount} {wtid} in {txid} to {addr}"); 466 Ok(true) 467 } else { 468 Ok(false) 469 } 470 } 471 472 /// Bounce a transaction on the blockchain, return false if no more requested transactions are found 473 async fn bounce(db: &mut PgListener, rpc: &mut Rpc, fee: &BtcAmount) -> LoopResult<bool> { 474 // We rely on the advisory lock to ensure we are the only one sending transactions 475 if let Some((id, bounced, reason)) = db::pending_bounce(&mut *db).await? { 476 let metadata = OutMetadata::Bounce { 477 bounced: *bounced.as_byte_array(), 478 }; 479 480 match rpc 481 .bounce(&bounced, fee, Some(&metadata.encode().unwrap())) 482 .await 483 { 484 Ok(txid) => { 485 fail_point("(injected) fail bounce", 0.3)?; 486 db::bounce_set_status(db, id, Some(&txid), &BounceStatus::sent).await?; 487 if let Some(reason) = reason { 488 info!(target: "worker", "|| {bounced} in {txid}: {reason}"); 489 } else { 490 info!(target: "worker", "|| {bounced} in {txid}"); 491 } 492 } 493 Err(err) => match err { 494 rpc::Error::RPC { 495 code: ErrorCode::RpcWalletInsufficientFunds | ErrorCode::RpcWalletError, 496 msg: _, 497 } => { 498 warn!(target: "worker", "{err}"); 499 return Ok(false); 500 } 501 e => Err(e)?, 502 }, 503 } 504 Ok(true) 505 } else { 506 Ok(false) 507 } 508 }