worker.rs (22018B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2022-2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 use std::{fmt::Write, time::SystemTime}; 17 18 use bitcoin::{Amount as BtcAmount, Txid, hashes::Hash}; 19 use depolymerizer_common::metadata::OutMetadata; 20 use jiff::Timestamp; 21 use sqlx::{ 22 Acquire, Either, PgConnection, PgPool, 23 postgres::{PgAdvisoryLock, PgAdvisoryLockKey, PgListener}, 24 }; 25 use taler_api::subject::IncomingSubject; 26 use taler_common::{ExpoBackoffDecorr, api::ShortHashCode, db::IncomingType}; 27 use tokio::time::sleep; 28 use tracing::{debug, error, info, trace, warn}; 29 30 use super::{LoopError, LoopResult, analysis::analysis}; 31 use crate::{ 32 GetOpReturnErr, 33 config::WorkerCfg, 34 db::{self, AddIncomingResult, SyncOutResult, TxOutKind}, 35 fail_point::fail_point, 36 rpc::{self, Category, ErrorCode, ListSinceBlock, ListTransaction, Rpc, rpc_wallet}, 37 rpc_utils::sender_address, 38 taler_utils::btc_to_taler, 39 }; 40 41 pub async fn worker_loop(mut state: WorkerCfg, pool: PgPool) { 42 let mut jitter = ExpoBackoffDecorr::default(); 43 let mut lifetime = state.lifetime; 44 let mut status = true; 45 let mut skip_notification = true; 46 47 loop { 48 let result: LoopResult<()> = async { 49 // Connect 50 let rpc = &mut rpc_wallet(&state.rpc_cfg, &state.wallet_cfg).await?; 51 let db = &mut PgListener::connect_with(&pool).await?; 52 53 // Listen to all channels 54 db.listen_all(["new_block", "transfer"]).await?; 55 56 loop { 57 // Wait for the next notification 58 { 59 let ntf = db.next_buffered(); 60 if let Some(ntf) = &ntf { 61 trace!(target: "worker", "notification from {}", ntf.channel()) 62 } 63 if !skip_notification && ntf.is_none() { 64 debug!(target: "worker", "waiting for notifications"); 65 // Block until next notification 66 if let Some(ntf) = db.try_recv().await? { 67 trace!(target: "worker", "notification from {}", ntf.channel()) 68 } 69 } 70 // Conflate all notifications 71 while let Some(ntf) = db.next_buffered() { 72 trace!(target: "worker", "notification from {}", ntf.channel()) 73 } 74 } 75 76 // Check lifetime 77 if let Some(nb) = lifetime.as_mut() { 78 if *nb == 0 { 79 info!(target: "worker", "Reach end of lifetime"); 80 return Ok(()); 81 } else { 82 *nb -= 1; 83 } 84 } 85 86 debug!(target: "worker", "syncing blockchain"); 87 88 let mut db = db.acquire().await?; 89 90 // It is not possible to atomically update the blockchain and the database. 91 // When we failed to sync the database and the blockchain state we rely on 92 // sync_chain to recover the lost updates. 93 // When this function is running concurrently, it not possible to known another 94 // execution has failed, and this can lead to a transaction being sent multiple time. 95 // To ensure only a single version of this function is running at a given time we rely 96 // on postgres advisory lock 97 98 // Take the lock 99 let lock = PgAdvisoryLock::with_key(PgAdvisoryLockKey::BigInt(42)); 100 let Either::Left(mut lock) = lock.try_acquire(&mut db).await? else { 101 return Err(LoopError::Concurrency); 102 }; 103 104 // Perform analysis 105 state.conf = analysis(rpc, state.conf, state.max_conf).await?; 106 107 worker_step(rpc, lock.as_mut(), &mut state, &mut status).await?; 108 109 skip_notification = false; 110 jitter.reset(); 111 } 112 } 113 .await; 114 if let Err(e) = result { 115 error!(target: "worker", "{e}"); 116 // When we catch an error, we sometimes want to retry immediately (eg. reconnect to RPC or DB). 117 // Bitcoin error codes are generic. We need to match the msg to get precise ones. Some errors 118 // can resolve themselves when a new block is mined (new fees, new transactions). Our simple 119 // approach is to wait for the next loop when an RPC error is caught to prevent endless logged errors. 120 skip_notification = match e { 121 LoopError::DB(_) | LoopError::Injected(_) | LoopError::Concurrency => true, 122 LoopError::Rpc(e) => match e { 123 rpc::Error::Transport(_) | rpc::Error::Connect(_) => true, 124 rpc::Error::RPC { code, .. } => code == ErrorCode::RpcWalletError, 125 rpc::Error::Bitcoin(_) | rpc::Error::Json { .. } | rpc::Error::Null => false, 126 }, 127 }; 128 sleep(jitter.backoff()).await; 129 } else { 130 return; 131 } 132 } 133 } 134 135 pub async fn worker_transient(mut state: WorkerCfg, pool: PgPool) -> LoopResult<()> { 136 let mut status = true; 137 138 // Connect 139 let rpc = &mut rpc_wallet(&state.rpc_cfg, &state.wallet_cfg).await?; 140 let mut db = pool.acquire().await?; 141 142 // It is not possible to atomically update the blockchain and the database. 143 // When we failed to sync the database and the blockchain state we rely on 144 // sync_chain to recover the lost updates. 145 // When this function is running concurrently, it not possible to known another 146 // execution has failed, and this can lead to a transaction being sent multiple time. 147 // To ensure only a single version of this function is running at a given time we rely 148 // on postgres advisory lock 149 150 // Take the lock 151 let lock = PgAdvisoryLock::with_key(PgAdvisoryLockKey::BigInt(42)); 152 let Either::Left(mut lock) = lock.try_acquire(&mut db).await? else { 153 return Err(LoopError::Concurrency); 154 }; 155 156 worker_step(rpc, lock.as_mut(), &mut state, &mut status).await?; 157 Ok(()) 158 } 159 160 /// Synchronize local db with blockchain and perform transactions 161 async fn worker_step( 162 rpc: &mut Rpc, 163 db: &mut PgConnection, 164 state: &mut WorkerCfg, 165 status: &mut bool, 166 ) -> LoopResult<()> { 167 // Sync chain 168 if let Some(stuck) = sync_chain(rpc, db, state, status).await? { 169 // As we are now in sync with the blockchain if a transaction has Requested status it have not been sent 170 171 // Send requested debits 172 while debit(db, rpc, state).await? {} 173 174 // Bump stuck transactions 175 for (txid, wtid) in stuck { 176 let bump = rpc.bump_fee(&txid).await?; 177 fail_point("(injected) fail bump", 0.3)?; 178 db::bump_tx_id(&mut *db, &bump.txid, &wtid).await?; 179 info!(target: "worker", ">> (bump) {wtid} {txid} -> {}", bump.txid); 180 } 181 182 // Send requested bounce 183 while bounce(db, rpc, &state.bounce_fee).await? {} 184 } 185 Ok(()) 186 } 187 188 /// Parse new transactions, return stuck transactions if the database is up to date with the latest mined block 189 async fn sync_chain( 190 rpc: &mut Rpc, 191 db: &mut PgConnection, 192 state: &WorkerCfg, 193 status: &mut bool, 194 ) -> LoopResult<Option<Vec<(Txid, ShortHashCode)>>> { 195 // Get stored last_hash 196 let sync_state = db::get_sync_state(&mut *db).await?; 197 198 // Get all transactions made since this block 199 let ListSinceBlock { 200 mut transactions, 201 mut removed, 202 lastblock, 203 } = rpc.list_since_block(Some(&sync_state), state.conf).await?; 204 transactions.sort_unstable_by_key(|it| (it.confirmations, it.txid)); 205 transactions.dedup_by_key(|it| it.txid); 206 removed.sort_unstable_by_key(|it| (it.confirmations, it.txid)); 207 removed.dedup_by_key(|it| it.txid); 208 209 // Check if a confirmed incoming transaction have been removed by a blockchain reorganization 210 let conflict = sync_chain_removed(removed, db, state.conf as i32).await?; 211 212 // Sync server status with database 213 let new_status = !conflict.stop_server(); 214 if *status != new_status { 215 db::update_status(db, new_status).await?; 216 *status = new_status; 217 if new_status { 218 info!(target: "worker", "Recovered lost transactions"); 219 } 220 } 221 222 if conflict.stop_worker() { 223 return Ok(None); 224 } 225 226 let mut stuck = vec![]; 227 228 for tx in transactions { 229 match tx.category { 230 Category::Send => { 231 if let Some(wtid) = 232 sync_chain_outgoing(&tx.txid, tx.confirmations, rpc, db, state).await? 233 { 234 stuck.push((tx.txid, wtid)); 235 } 236 } 237 Category::Receive if tx.confirmations >= state.conf as i32 => { 238 sync_chain_incoming_confirmed(&tx.txid, rpc, db, state).await? 239 } 240 _ => { 241 // Ignore coinbase and unconfirmed send transactions 242 } 243 } 244 } 245 246 // Move last_hash forward 247 db::swap_sync_state(db, &sync_state, &lastblock).await?; 248 249 Ok(Some(stuck)) 250 } 251 252 #[derive(Debug, Clone, Copy)] 253 enum ReorgConflict { 254 BackingCompromised, 255 IncomingCompromised, 256 Ok, 257 } 258 259 impl ReorgConflict { 260 pub fn stop_server(&self) -> bool { 261 matches!(self, Self::BackingCompromised) 262 } 263 264 pub fn stop_worker(&self) -> bool { 265 matches!(self, Self::BackingCompromised | Self::IncomingCompromised) 266 } 267 } 268 269 /// Sync database with removed transactions, return false if bitcoin backing is compromised 270 async fn sync_chain_removed( 271 removed: Vec<ListTransaction>, 272 db: &mut PgConnection, 273 min_confirmations: i32, 274 ) -> LoopResult<ReorgConflict> { 275 let potential_problematic_ids: Vec<Txid> = removed 276 .into_iter() 277 .filter_map(|tx| { 278 (tx.category == Category::Receive && tx.confirmations < min_confirmations) 279 .then_some(tx.txid) 280 }) 281 .collect(); 282 283 // Only keep incoming transaction that are not reconfirmed 284 let problematic_tx = db::reorg(&mut *db, &potential_problematic_ids).await?; 285 if problematic_tx.is_empty() { 286 return Ok(ReorgConflict::Ok); 287 } 288 // Bitcoin backing can be compromised in only two cases: 289 // - a confirmed reserve 290 // - a confirmed bounced 291 292 // TODO use partition_in_place when stable 293 let (compromise, problematic): (Vec<_>, Vec<_>) = 294 problematic_tx.iter().partition(|it| match it { 295 db::ProblematicTx::Taler { ty, .. } => *ty == IncomingType::reserve, 296 db::ProblematicTx::Bounce { .. } => true, 297 db::ProblematicTx::Simple { .. } => false, 298 }); 299 let mut buf = "The following transaction have been removed from the blockchain, ".to_string(); 300 let (txs, state) = if compromise.is_empty() { 301 buf.push_str("waiting until they reappear:"); 302 (problematic, ReorgConflict::IncomingCompromised) 303 } else { 304 buf.push_str("bitcoin backing is compromised until they reappear:"); 305 (compromise, ReorgConflict::BackingCompromised) 306 }; 307 for tx in txs { 308 match tx { 309 db::ProblematicTx::Taler { 310 txid, 311 addr, 312 ty, 313 metadata, 314 } => { 315 write!(&mut buf, "\n\t{txid} {ty} {metadata} from {addr}",).unwrap(); 316 } 317 db::ProblematicTx::Bounce { txid, bounced_in } => { 318 write!(&mut buf, "\n\t{txid} bounced in {bounced_in}").unwrap(); 319 } 320 db::ProblematicTx::Simple { txid } => { 321 write!(&mut buf, "\n\t{txid}").unwrap(); 322 } 323 } 324 } 325 error!(target: "worker", "{buf}"); 326 Ok(state) 327 } 328 329 /// Sync database with an incoming confirmed transaction 330 async fn sync_chain_incoming_confirmed( 331 txid: &Txid, 332 rpc: &mut Rpc, 333 db: &mut PgConnection, 334 state: &WorkerCfg, 335 ) -> Result<(), LoopError> { 336 let (tx, metadata) = rpc.get_tx_segwit_key(txid).await?; 337 // Store transactions in database 338 let debit_addr = sender_address(rpc, &tx).await?; 339 let amount = btc_to_taler(&tx.amount, &state.currency); 340 let time = Timestamp::from_second(tx.time as i64).unwrap(); 341 let ty = IncomingType::reserve; 342 match metadata { 343 Ok(reserve_pub) => { 344 match db::register_tx_in( 345 &mut *db, 346 txid, 347 &amount, 348 &debit_addr, 349 &Timestamp::from_second(tx.time as i64).unwrap(), 350 &Some(IncomingSubject::Reserve(reserve_pub.clone())), 351 ) 352 .await? 353 { 354 AddIncomingResult::Success { 355 new, 356 row_id: _, 357 valued_at: _, 358 pending: _, 359 } => { 360 if new { 361 info!(target: "worker", "<< {ty} {reserve_pub} {txid} {debit_addr} {amount}"); 362 } 363 } 364 AddIncomingResult::ReservePubReuse => { 365 db::bounce(db, txid, &amount, &debit_addr, &time, "reserve_pub reuse").await? 366 } 367 AddIncomingResult::UnknownMapping => todo!(), 368 AddIncomingResult::MappingReuse => todo!(), 369 } 370 } 371 Err(e) => db::bounce(db, txid, &amount, &debit_addr, &time, &e.to_string()).await?, 372 } 373 Ok(()) 374 } 375 376 /// Sync database with an outgoing transaction, return true if stuck 377 async fn sync_chain_outgoing( 378 txid: &Txid, 379 confirmations: i32, 380 rpc: &mut Rpc, 381 db: &mut PgConnection, 382 state: &WorkerCfg, 383 ) -> LoopResult<Option<ShortHashCode>> { 384 match rpc 385 .get_tx_op_return(txid) 386 .await 387 .map(|(tx, bytes)| (tx, OutMetadata::decode(&bytes))) 388 { 389 Ok((tx, Ok(info))) => { 390 let credit_addr = tx.details[0].address.clone().unwrap().assume_checked(); 391 let amount = btc_to_taler(&tx.amount, &state.currency); 392 let created_at = Timestamp::from_second(tx.time as i64).unwrap(); 393 match info { 394 OutMetadata::Debit { 395 wtid, 396 url, 397 metadata, 398 } => { 399 if confirmations < 0 { 400 // Handle conflicting tx 401 if tx.replaced_by_txid.is_none() && db::transfer_conflict(db, txid).await? { 402 warn!(target: "worker", ">> (conflict) {wtid} {txid} {credit_addr} {amount}"); 403 } 404 } else if confirmations > state.conf as i32 { 405 match db::sync_out( 406 db, 407 txid, 408 tx.replaced_by_txid.as_ref(), 409 &amount, 410 &credit_addr, 411 &TxOutKind::Talerable { 412 wtid: &wtid, 413 url: &url, 414 metadata: metadata.as_deref(), 415 }, 416 &created_at, 417 ) 418 .await? 419 { 420 SyncOutResult::New => { 421 info!(target: "worker", ">> (onchain) {wtid} {txid} {credit_addr} {amount}"); 422 } 423 SyncOutResult::Replaced => { 424 info!( 425 target: "worker", 426 ">> (recovered) {wtid} {txid} -> {} {credit_addr} {amount}", 427 tx.replaced_by_txid.unwrap() 428 ) 429 } 430 SyncOutResult::Recovered => { 431 warn!(target: "worker", ">> (recovered) {wtid} {txid} {credit_addr} {amount}") 432 } 433 SyncOutResult::None => {} 434 } 435 } else { 436 // TODO sync transfer sent ? 437 438 // Check if stuck 439 if let Some(delay) = state.bump_delay 440 && confirmations == 0 441 && tx.replaced_by_txid.is_none() 442 { 443 let now = SystemTime::now() 444 .duration_since(SystemTime::UNIX_EPOCH) 445 .unwrap() 446 .as_secs(); 447 if now - tx.time > delay as u64 { 448 return Ok(Some(wtid)); 449 } 450 } 451 } 452 } 453 OutMetadata::Bounce { bounced } => { 454 let bounced = Txid::from_byte_array(bounced); 455 if confirmations < 0 { 456 // Handle conflicting tx 457 if db::bounce_conflict(db, txid).await? { 458 warn!(target: "worker", "|| (conflict) {bounced} {txid}"); 459 } 460 } else if confirmations > state.conf as i32 { 461 match db::sync_out( 462 db, 463 txid, 464 tx.replaces_txid.as_ref(), 465 &amount, 466 &credit_addr, 467 &TxOutKind::Bounce(bounced), 468 &created_at, 469 ) 470 .await? 471 { 472 SyncOutResult::New => { 473 info!(target: "worker", "|| (onchain) {bounced} {txid}") 474 } 475 SyncOutResult::Replaced => { 476 info!( 477 target: "worker", 478 "|| (recovered) {bounced} {txid} -> {}", 479 tx.replaced_by_txid.unwrap() 480 ) 481 } 482 SyncOutResult::Recovered => { 483 warn!(target: "worker", "|| (recovered) {bounced} {txid}") 484 } 485 SyncOutResult::None => {} 486 } 487 } else { 488 // TODO sync bounce sent ? 489 } 490 } 491 } 492 } 493 Ok((_, Err(e))) => warn!(target: "worker", "send: decode-info {txid} - {e}"), 494 Err(e) => match e { 495 GetOpReturnErr::MissingOpReturn => { /* Ignore */ } 496 GetOpReturnErr::RPC(e) => return Err(e)?, 497 }, 498 } 499 Ok(None) 500 } 501 502 /// Send a debit transaction on the blockchain, return false if no more requested transactions are found 503 async fn debit(db: &mut PgConnection, rpc: &mut Rpc, state: &WorkerCfg) -> LoopResult<bool> { 504 // We rely on the advisory lock to ensure we are the only one sending transactions 505 if let Some((id, amount, wtid, addr, url, metadata)) = 506 db::pending_transfer(&mut *db, &state.currency).await? 507 { 508 let metadata = OutMetadata::Debit { 509 wtid: wtid.clone(), 510 url, 511 metadata, 512 }; 513 514 let txid = rpc 515 .send(&addr, &amount, Some(&metadata.encode().unwrap()), false) 516 .await?; 517 fail_point("(injected) fail debit", 0.3)?; 518 db::transfer_sent(db, id, &txid).await?; 519 let amount = btc_to_taler(&amount.to_signed().unwrap(), &state.currency); 520 info!(target: "worker", ">> (sent) {wtid} {txid} {addr} {amount}"); 521 Ok(true) 522 } else { 523 Ok(false) 524 } 525 } 526 527 /// Bounce a transaction on the blockchain, return false if no more requested transactions are found 528 async fn bounce(db: &mut PgConnection, rpc: &mut Rpc, fee: &BtcAmount) -> LoopResult<bool> { 529 // We rely on the advisory lock to ensure we are the only one sending transactions 530 if let Some((id, bounced, reason)) = db::pending_bounce(&mut *db).await? { 531 let metadata = OutMetadata::Bounce { 532 bounced: *bounced.as_byte_array(), 533 }; 534 535 match rpc 536 .bounce(&bounced, fee, Some(&metadata.encode().unwrap())) 537 .await 538 { 539 Ok(txid) => { 540 fail_point("(injected) fail bounce", 0.3)?; 541 db::bounce_sent(db, id, &txid).await?; 542 if let Some(reason) = reason { 543 info!(target: "worker", "|| (sent) {bounced} {txid}: {reason}"); 544 } else { 545 info!(target: "worker", "|| (sent) {bounced} {txid}"); 546 } 547 } 548 Err(err) => match err { 549 rpc::Error::RPC { 550 code: ErrorCode::RpcWalletInsufficientFunds | ErrorCode::RpcWalletError, 551 .. 552 } => { 553 warn!(target: "worker", "{err}"); 554 return Ok(false); 555 } 556 e => Err(e)?, 557 }, 558 } 559 Ok(true) 560 } else { 561 Ok(false) 562 } 563 }