worker.rs (20520B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::time::Duration; 18 19 use failure_injection::{InjectedErr, fail_point}; 20 use http_client::ApiErr; 21 use jiff::Timestamp; 22 use sqlx::{Acquire as _, PgConnection, PgPool, postgres::PgListener}; 23 use taler_api::subject::{self, parse_incoming_unstructured}; 24 use taler_common::{ 25 ExpoBackoffDecorr, 26 config::Config, 27 types::amount::{self, Currency}, 28 }; 29 use tokio::{join, sync::Notify}; 30 use tracing::{debug, error, info, trace, warn}; 31 32 use crate::{ 33 config::{AccountType, WorkerCfg}, 34 constants::SYNC_CURSOR_KEY, 35 cyclos_api::{ 36 api::{CyclosAuth, CyclosErr}, 37 client::Client, 38 types::{AccountKind, HistoryItem, InputError, NotFoundError, OrderBy}, 39 }, 40 db::{ 41 self, AddIncomingResult, ChargebackFailureResult, RegisterResult, TxIn, TxOut, TxOutKind, 42 kv_get, kv_set, 43 }, 44 notification::watch_notification, 45 }; 46 47 #[derive(Debug, thiserror::Error)] 48 pub enum WorkerError { 49 #[error(transparent)] 50 Db(#[from] sqlx::Error), 51 #[error(transparent)] 52 Api(#[from] ApiErr<CyclosErr>), 53 #[error("Another worker is running concurrently")] 54 Concurrency, 55 #[error(transparent)] 56 Injected(#[from] InjectedErr), 57 } 58 59 pub type WorkerResult = Result<(), WorkerError>; 60 61 pub async fn run_worker( 62 cfg: &Config, 63 pool: &PgPool, 64 client: &http_client::Client, 65 transient: bool, 66 ) -> anyhow::Result<()> { 67 let cfg = WorkerCfg::parse(cfg)?; 68 let client = Client { 69 client, 70 api_url: &cfg.host.api_url, 71 auth: &CyclosAuth::Basic { 72 username: cfg.host.username, 73 password: cfg.host.password, 74 }, 75 }; 76 if transient { 77 let mut conn = pool.acquire().await?; 78 Worker { 79 client: &client, 80 db: &mut conn, 81 account_type_id: *cfg.account_type_id, 82 payment_type_id: *cfg.payment_type_id, 83 account_type: cfg.account_type, 84 currency: cfg.currency.clone(), 85 } 86 .run() 87 .await?; 88 return Ok(()); 89 } 90 91 let notification = Notify::new(); 92 93 let watcher = async { 94 watch_notification(&client, ¬ification).await; 95 }; 96 let worker = async { 97 let mut jitter = ExpoBackoffDecorr::default(); 98 let mut skip_notifications: bool = true; 99 loop { 100 info!(target: "worker", "running at initialisation"); 101 let res: WorkerResult = async { 102 let db = &mut PgListener::connect_with(pool).await?; 103 104 // Listen to all channels 105 db.listen_all(["transfer"]).await?; 106 107 loop { 108 if !skip_notifications { 109 tokio::select! { 110 _ = tokio::time::sleep(cfg.frequency) => { 111 info!(target: "worker", "running at frequency"); 112 } 113 res = db.try_recv() => { 114 let mut ntf = res?; 115 // Conflate all notifications 116 while let Some(n) = ntf { 117 debug!(target: "worker", "notification from {}", n.channel()); 118 ntf = db.next_buffered(); 119 } 120 info!(target: "worker", "running at db trigger"); 121 } 122 _ = notification.notified() => { 123 info!(target: "worker", "running at notification trigger"); 124 } 125 }; 126 } 127 skip_notifications = false; 128 Worker { 129 client: &client, 130 db: db.acquire().await?, 131 account_type_id: *cfg.account_type_id, 132 payment_type_id: *cfg.payment_type_id, 133 account_type: cfg.account_type, 134 currency: cfg.currency.clone(), 135 } 136 .run() 137 .await?; 138 jitter.reset(); 139 } 140 } 141 .await; 142 let err = res.unwrap_err(); 143 error!(target: "worker", "{err}"); 144 145 match err { 146 WorkerError::Concurrency => { 147 // This error won't resolve by itself easily and it mean we are actually making progress 148 // in another worker so we can jitter more aggressively 149 tokio::time::sleep(Duration::from_secs(15)).await; 150 skip_notifications = false; 151 } 152 WorkerError::Api(ApiErr { 153 ctx: _, 154 err: CyclosErr::Input(InputError::Validation { .. }), 155 }) => { 156 // In case of validation failure we do not want to retry right away as it can DOS the service 157 skip_notifications = false; 158 } 159 WorkerError::Api(_) | WorkerError::Db(_) | WorkerError::Injected(_) => { 160 skip_notifications = true; 161 } 162 } 163 tokio::time::sleep(jitter.backoff()).await; 164 } 165 }; 166 join!(watcher, worker); // TODO try_join 167 Ok(()) 168 } 169 170 pub struct Worker<'a> { 171 pub client: &'a Client<'a>, 172 pub db: &'a mut PgConnection, 173 pub currency: Currency, 174 pub account_type_id: i64, 175 pub payment_type_id: i64, 176 pub account_type: AccountType, 177 } 178 179 impl Worker<'_> { 180 /// Run a single worker pass 181 pub async fn run(&mut self) -> WorkerResult { 182 // Some worker operations are not idempotent, therefore it's not safe to have multiple worker 183 // running concurrently. We use a global Postgres advisory lock to prevent it. 184 if !db::worker_lock(self.db).await? { 185 return Err(WorkerError::Concurrency); 186 }; 187 188 // Sync transactions 189 let mut cursor: Timestamp = kv_get(&mut *self.db, SYNC_CURSOR_KEY) 190 .await? 191 .unwrap_or_default(); 192 193 loop { 194 let page = self 195 .client 196 .history(self.account_type_id, OrderBy::DateAsc, 0, Some(cursor)) 197 .await?; 198 for transfer in page.page { 199 if transfer.date > cursor { 200 cursor = transfer.date; 201 } 202 let tx = extract_tx_info(transfer); 203 match tx { 204 Tx::In(tx_in) => self.ingest_in(tx_in).await?, 205 Tx::Out(tx_out) => self.ingest_out(tx_out).await?, 206 } 207 } 208 209 kv_set(&mut *self.db, SYNC_CURSOR_KEY, &cursor).await?; 210 211 if !page.has_next_page { 212 break; 213 } 214 } 215 216 // Send transactions 217 let start = Timestamp::now(); 218 loop { 219 let batch = db::pending_batch(&mut *self.db, &start).await?; 220 if batch.is_empty() { 221 break; 222 } 223 for initiated in batch { 224 debug!(target: "worker", "send tx {initiated}"); 225 let res = self 226 .client 227 .direct_payment( 228 initiated.creditor_id, 229 self.payment_type_id, 230 initiated.amount, 231 &initiated.subject, 232 ) 233 .await; 234 fail_point("direct-payment")?; 235 match res { 236 Ok(tx) => { 237 // Update transaction status, on failure the initiated transaction will be orphan 238 db::initiated_submit_success( 239 &mut *self.db, 240 initiated.id, 241 &tx.date, 242 tx.id.0, 243 ) 244 .await?; 245 trace!(target: "worker", "init tx {}", tx.id); 246 } 247 Err(e) => { 248 let msg = match e.err { 249 CyclosErr::Unknown(NotFoundError { entity_type, key }) => { 250 format!("unknown {entity_type} {key}") 251 } 252 CyclosErr::Forbidden(err) => err.to_string(), 253 _ => return Err(e.into()), 254 }; 255 // TODO is permission should be considered are hard or soft failure ? 256 db::initiated_submit_permanent_failure(&mut *self.db, initiated.id, &msg) 257 .await?; 258 error!(target: "worker", "initiated failure {initiated}: {msg}"); 259 } 260 } 261 } 262 } 263 Ok(()) 264 } 265 266 /// Ingest an incoming transaction 267 async fn ingest_in(&mut self, tx: TxIn) -> WorkerResult { 268 match self.account_type { 269 AccountType::Exchange => { 270 let transfer = self.client.transfer(tx.transfer_id).await?; 271 let bounce = async |db: &mut PgConnection, 272 reason: &str| 273 -> Result<(), WorkerError> { 274 // Fetch existing transaction 275 if let Some(chargeback) = transfer.charged_back_by { 276 let res = db::register_bounced_tx_in( 277 db, 278 &tx, 279 *chargeback.id, 280 reason, 281 &Timestamp::now(), 282 ) 283 .await?; 284 if res.tx_new { 285 info!(target: "worker", 286 "in {tx} bounced (recovered) in {}: {reason}", chargeback.id 287 ); 288 } else { 289 trace!(target: "worker", 290 "in {tx} already seen and bounced in {}: {reason}",chargeback.id 291 ); 292 } 293 } else if !transfer.can_chargeback { 294 match db::register_tx_in(db, &tx, &None, &Timestamp::now()).await? { 295 AddIncomingResult::Success { new, .. } => { 296 if new { 297 warn!(target: "worker", "in {tx} cannot bounce: {reason}"); 298 } else { 299 trace!(target: "worker", "in {tx} already seen and cannot bounce "); 300 } 301 } 302 AddIncomingResult::ReservePubReuse => unreachable!(), 303 } 304 } else { 305 let chargeback_id = self.client.chargeback(*transfer.id).await?; 306 fail_point("chargeback")?; 307 let res = db::register_bounced_tx_in( 308 db, 309 &tx, 310 chargeback_id, 311 reason, 312 &Timestamp::now(), 313 ) 314 .await?; 315 if res.tx_new { 316 info!(target: "worker", "in {tx} bounced in {chargeback_id}: {reason}"); 317 } else { 318 trace!(target: "worker", "in {tx} already seen and bounced in {chargeback_id}: {reason}"); 319 } 320 } 321 Ok(()) 322 }; 323 if let Some(chargeback) = transfer.chargeback_of { 324 // This a chargeback of one of our transaction, if we bounce we might enter a loop 325 match db::initiated_chargeback_failure(&mut *self.db, *chargeback.id).await? { 326 ChargebackFailureResult::Unknown => { 327 trace!(target: "worker", "initiated failure unknown: charged back") 328 } 329 ChargebackFailureResult::Known(initiated) => { 330 error!(target: "worker", "initiated failure {initiated}: charged back") 331 } 332 ChargebackFailureResult::Idempotent(initiated) => { 333 trace!(target: "worker", "initiated failure {initiated} already seen: charged back") 334 } 335 } 336 // Sill register the incoming transaction as an incoming one 337 match db::register_tx_in(self.db, &tx, &None, &Timestamp::now()).await? { 338 AddIncomingResult::Success { new, .. } => { 339 if new { 340 info!(target: "worker", "in {tx} chargeback"); 341 } else { 342 trace!(target: "worker", "in {tx} chargeback already seen"); 343 } 344 } 345 AddIncomingResult::ReservePubReuse => unreachable!(), 346 } 347 348 return Ok(()); 349 } 350 match parse_incoming_unstructured(&tx.subject) { 351 Ok(None) => bounce(self.db, "missing public key").await?, 352 Ok(Some(subject)) => { 353 match db::register_tx_in(self.db, &tx, &Some(subject), &Timestamp::now()) 354 .await? 355 { 356 AddIncomingResult::Success { new, .. } => { 357 if new { 358 info!(target: "worker", "in {tx}"); 359 } else { 360 trace!(target: "worker", "in {tx} already seen"); 361 } 362 } 363 AddIncomingResult::ReservePubReuse => { 364 bounce(self.db, "reserve pub reuse").await? 365 } 366 } 367 } 368 Err(e) => bounce(self.db, &e.to_string()).await?, 369 } 370 } 371 AccountType::Normal => { 372 match db::register_tx_in(self.db, &tx, &None, &Timestamp::now()).await? { 373 AddIncomingResult::Success { new, .. } => { 374 if new { 375 info!(target: "worker", "in {tx}"); 376 } else { 377 trace!(target: "worker", "in {tx} already seen"); 378 } 379 } 380 AddIncomingResult::ReservePubReuse => unreachable!(), 381 } 382 } 383 } 384 Ok(()) 385 } 386 387 async fn ingest_out(&mut self, tx: TxOut) -> WorkerResult { 388 match self.account_type { 389 AccountType::Exchange => { 390 let transfer = self.client.transfer(tx.transfer_id).await?; 391 392 if transfer.charged_back_by.is_some() { 393 match db::initiated_chargeback_failure(&mut *self.db, *transfer.id).await? { 394 ChargebackFailureResult::Unknown => { 395 trace!(target: "worker", "initiated failure unknown: charged back") 396 } 397 ChargebackFailureResult::Known(initiated) => { 398 error!(target: "worker", "initiated failure {initiated}: charged back") 399 } 400 ChargebackFailureResult::Idempotent(initiated) => { 401 trace!(target: "worker", "initiated failure {initiated} already seen: charged back") 402 } 403 } 404 } 405 406 let kind = if let Ok(subject) = subject::parse_outgoing(&tx.subject) { 407 TxOutKind::Talerable(subject) 408 } else if let Some(chargeback) = &transfer.chargeback_of { 409 TxOutKind::Bounce(*chargeback.id) 410 } else { 411 TxOutKind::Simple 412 }; 413 414 let res = db::register_tx_out(self.db, &tx, &kind, &Timestamp::now()).await?; 415 match res.result { 416 RegisterResult::idempotent => match kind { 417 TxOutKind::Simple => { 418 trace!(target: "worker", "out malformed {tx} already seen") 419 } 420 TxOutKind::Bounce(_) => { 421 trace!(target: "worker", "out bounce {tx} already seen") 422 } 423 TxOutKind::Talerable(_) => { 424 trace!(target: "worker", "out {tx} already seen") 425 } 426 }, 427 RegisterResult::known => match kind { 428 TxOutKind::Simple => { 429 warn!(target: "worker", "out malformed {tx}") 430 } 431 TxOutKind::Bounce(_) => { 432 info!(target: "worker", "out bounce {tx}") 433 } 434 TxOutKind::Talerable(_) => { 435 info!(target: "worker", "out {tx}") 436 } 437 }, 438 RegisterResult::recovered => match kind { 439 TxOutKind::Simple => { 440 warn!(target: "worker", "out malformed (recovered) {tx}") 441 } 442 TxOutKind::Bounce(_) => { 443 warn!(target: "worker", "out bounce (recovered) {tx}") 444 } 445 TxOutKind::Talerable(_) => { 446 warn!(target: "worker", "out (recovered) {tx}") 447 } 448 }, 449 } 450 } 451 AccountType::Normal => { 452 let res = db::register_tx_out(self.db, &tx, &TxOutKind::Simple, &Timestamp::now()) 453 .await?; 454 match res.result { 455 RegisterResult::idempotent => { 456 trace!(target: "worker", "out {tx} already seen"); 457 } 458 RegisterResult::known => { 459 info!(target: "worker", "out {tx}"); 460 } 461 RegisterResult::recovered => { 462 warn!(target: "worker", "out (recovered) {tx}"); 463 } 464 } 465 } 466 } 467 Ok(()) 468 } 469 } 470 471 pub enum Tx { 472 In(TxIn), 473 Out(TxOut), 474 } 475 476 pub fn extract_tx_info(tx: HistoryItem) -> Tx { 477 let amount = amount::decimal(tx.amount.trim_start_matches('-')); 478 let (id, name) = match tx.related_account.kind { 479 AccountKind::System => (tx.related_account.ty.id, tx.related_account.ty.name), 480 AccountKind::User { user } => (user.id, user.display), 481 }; 482 if tx.amount.starts_with("-") { 483 Tx::Out(TxOut { 484 transfer_id: *tx.id, 485 tx_id: tx.transaction.map(|it| *it.id), 486 amount, 487 subject: tx.description.unwrap_or_default(), 488 creditor_id: *id, 489 creditor_name: name, 490 valued_at: tx.date, 491 }) 492 } else { 493 Tx::In(TxIn { 494 transfer_id: *tx.id, 495 tx_id: tx.transaction.map(|it| *it.id), 496 amount, 497 subject: tx.description.unwrap_or_default(), 498 debtor_id: *id, 499 debtor_name: name, 500 valued_at: tx.date, 501 }) 502 } 503 }