worker.rs (26544B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::{num::ParseIntError, time::Duration}; 18 19 use aws_lc_rs::signature::EcdsaKeyPair; 20 use failure_injection::{InjectedErr, fail_point}; 21 use http_client::ApiErr; 22 use jiff::{Timestamp, Zoned, civil::Date}; 23 use sqlx::{Acquire as _, PgConnection, PgPool, postgres::PgListener}; 24 use taler_api::subject::{self, parse_incoming_unstructured}; 25 use taler_common::{ 26 ExpoBackoffDecorr, 27 config::Config, 28 types::{ 29 amount::{self}, 30 iban::IBAN, 31 }, 32 }; 33 use tracing::{debug, error, info, trace, warn}; 34 35 use crate::{ 36 FullHuPayto, HuIban, 37 config::{AccountType, WorkerCfg}, 38 db::{self, AddIncomingResult, Initiated, RegisterResult, TxIn, TxOut, TxOutKind}, 39 magnet_api::{ 40 api::MagnetErr, 41 client::{ApiClient, AuthClient}, 42 types::{Direction, Next, Order, TxDto, TxStatus}, 43 }, 44 setup, 45 }; 46 47 // const TXS_CURSOR_KEY: &str = "txs_cursor"; TODO cursor is broken 48 49 #[derive(Debug, thiserror::Error)] 50 pub enum WorkerError { 51 #[error(transparent)] 52 Db(#[from] sqlx::Error), 53 #[error(transparent)] 54 Api(#[from] ApiErr<MagnetErr>), 55 #[error("Another worker is running concurrently")] 56 Concurrency, 57 #[error(transparent)] 58 Injected(#[from] InjectedErr), 59 } 60 61 pub type WorkerResult = Result<(), WorkerError>; 62 63 pub async fn run_worker( 64 cfg: &Config, 65 pool: &PgPool, 66 client: &http_client::Client, 67 transient: bool, 68 ) -> anyhow::Result<()> { 69 let cfg = WorkerCfg::parse(cfg)?; 70 let keys = setup::load(&cfg)?; 71 let client = AuthClient::new(client, &cfg.api_url, &cfg.consumer).upgrade(&keys.access_token); 72 73 if transient { 74 let mut conn = pool.acquire().await?; 75 let account = client.account(cfg.payto.bban()).await?; 76 Worker { 77 client: &client, 78 db: &mut conn, 79 account_number: &account.number, 80 account_code: account.code, 81 key: &keys.signing_key, 82 account_type: cfg.account_type, 83 ignore_tx_before: cfg.ignore_tx_before, 84 ignore_bounces_before: cfg.ignore_bounces_before, 85 } 86 .run() 87 .await?; 88 return Ok(()); 89 } 90 91 let mut jitter = ExpoBackoffDecorr::default(); 92 93 loop { 94 let res: WorkerResult = async { 95 let account = client.account(cfg.payto.bban()).await?; 96 let db = &mut PgListener::connect_with(pool).await?; 97 98 // Listen to all channels 99 db.listen_all(["transfer"]).await?; 100 101 info!(target: "worker", "running at initialisation"); 102 103 loop { 104 debug!(target: "worker", "running"); 105 Worker { 106 client: &client, 107 db: db.acquire().await?, 108 account_number: &account.number, 109 account_code: account.code, 110 key: &keys.signing_key, 111 account_type: cfg.account_type, 112 ignore_tx_before: cfg.ignore_tx_before, 113 ignore_bounces_before: cfg.ignore_bounces_before, 114 } 115 .run() 116 .await?; 117 jitter.reset(); 118 119 // Wait for notifications or sync timeout 120 if let Ok(res) = tokio::time::timeout(cfg.frequency, db.try_recv()).await { 121 let mut ntf = res?; 122 // Conflate all notifications 123 while let Some(n) = ntf { 124 debug!(target: "worker", "notification from {}", n.channel()); 125 ntf = db.next_buffered(); 126 } 127 128 if ntf.is_some() { 129 info!(target: "worker", "running at db trigger"); 130 } else { 131 info!(target: "worker", "running at frequency"); 132 } 133 } 134 } 135 } 136 .await; 137 let err = res.unwrap_err(); 138 error!(target: "worker", "{err}"); 139 140 if matches!(err, WorkerError::Concurrency) { 141 // This error won't resolve by itself easily and it mean we are actually making progress 142 // in another worker so we can jitter more aggressively 143 tokio::time::sleep(Duration::from_secs(15)).await; 144 } 145 tokio::time::sleep(jitter.backoff()).await; 146 } 147 } 148 149 pub struct Worker<'a> { 150 pub client: &'a ApiClient<'a>, 151 pub db: &'a mut PgConnection, 152 pub account_number: &'a str, 153 pub account_code: u64, 154 pub key: &'a EcdsaKeyPair, 155 pub account_type: AccountType, 156 pub ignore_tx_before: Option<Date>, 157 pub ignore_bounces_before: Option<Date>, 158 } 159 160 impl Worker<'_> { 161 /// Run a single worker pass 162 pub async fn run(&mut self) -> WorkerResult { 163 // Some worker operations are not idempotent, therefore it's not safe to have multiple worker 164 // running concurrently. We use a global Postgres advisory lock to prevent it. 165 if !db::worker_lock(self.db).await? { 166 return Err(WorkerError::Concurrency); 167 }; 168 169 // Sync transactions 170 let mut next: Option<Next> = None; //kv_get(&mut *self.db, TXS_CURSOR_KEY).await?; TODO cursor logic is broken and cannot be stored & reused 171 let mut all_final = true; 172 let mut first = true; 173 loop { 174 let page = self 175 .client 176 .page_tx( 177 Direction::Both, 178 Order::Ascending, 179 100, 180 self.account_number, 181 &next, 182 first, 183 ) 184 .await?; 185 first = false; 186 next = page.next; 187 for item in page.list { 188 all_final &= item.tx.status.is_final(); 189 let tx = extract_tx_info(item.tx); 190 match tx { 191 Tx::In(tx_in) => { 192 // We only register final successful incoming transactions 193 if tx_in.status != TxStatus::Completed { 194 debug!(target: "worker", "pending or failed in {tx_in}"); 195 continue; 196 } 197 198 if let Some(before) = self.ignore_tx_before 199 && tx_in.value_date < before 200 { 201 debug!(target: "worker", "ignore in {tx_in}"); 202 continue; 203 } 204 let bounce = async |db: &mut PgConnection, 205 reason: &str| 206 -> Result<(), WorkerError> { 207 if let Some(before) = self.ignore_bounces_before 208 && tx_in.value_date < before 209 { 210 match db::register_tx_in(db, &tx_in, &None, &Timestamp::now()) 211 .await? 212 { 213 AddIncomingResult::Success { new, .. } => { 214 if new { 215 info!(target: "worker", "in {tx_in} skip bounce: {reason}"); 216 } else { 217 trace!(target: "worker", "in {tx_in} already skip bounce "); 218 } 219 } 220 AddIncomingResult::ReservePubReuse => unreachable!(), 221 } 222 } else { 223 let res = db::register_bounce_tx_in( 224 db, 225 &tx_in, 226 &tx_in.amount, 227 reason, 228 &Timestamp::now(), 229 ) 230 .await?; 231 232 if res.tx_new { 233 info!(target: "worker", 234 "in {tx_in} bounced in {}: {reason}", 235 res.bounce_id 236 ); 237 } else { 238 trace!(target: "worker", 239 "in {tx_in} already seen and bounced in {}: {reason}", 240 res.bounce_id 241 ); 242 } 243 } 244 Ok(()) 245 }; 246 match self.account_type { 247 AccountType::Exchange => { 248 match parse_incoming_unstructured(&tx_in.subject) { 249 Ok(None) => bounce(self.db, "missing public key").await?, 250 Ok(Some(subject)) => match db::register_tx_in( 251 self.db, 252 &tx_in, 253 &Some(subject), 254 &Timestamp::now(), 255 ) 256 .await? 257 { 258 AddIncomingResult::Success { new, .. } => { 259 if new { 260 info!(target: "worker", "in {tx_in}"); 261 } else { 262 trace!(target: "worker", "in {tx_in} already seen"); 263 } 264 } 265 AddIncomingResult::ReservePubReuse => { 266 bounce(self.db, "reserve pub reuse").await? 267 } 268 }, 269 Err(e) => bounce(self.db, &e.to_string()).await?, 270 } 271 } 272 AccountType::Normal => { 273 match db::register_tx_in(self.db, &tx_in, &None, &Timestamp::now()) 274 .await? 275 { 276 AddIncomingResult::Success { new, .. } => { 277 if new { 278 info!(target: "worker", "in {tx_in}"); 279 } else { 280 trace!(target: "worker", "in {tx_in} already seen"); 281 } 282 } 283 AddIncomingResult::ReservePubReuse => unreachable!(), 284 } 285 } 286 } 287 } 288 Tx::Out(tx_out) => { 289 match tx_out.status { 290 TxStatus::ToBeRecorded => { 291 self.recover_tx(&tx_out).await?; 292 continue; 293 } 294 TxStatus::PendingFirstSignature 295 | TxStatus::PendingSecondSignature 296 | TxStatus::PendingProcessing 297 | TxStatus::Verified 298 | TxStatus::PartiallyCompleted 299 | TxStatus::UnderReview => { 300 // Still pending 301 debug!(target: "worker", "pending out {tx_out}"); 302 continue; 303 } 304 TxStatus::Rejected | TxStatus::Canceled | TxStatus::Completed => {} 305 } 306 match self.account_type { 307 AccountType::Exchange => { 308 let kind = if let Ok(subject) = 309 subject::parse_outgoing(&tx_out.subject) 310 { 311 TxOutKind::Talerable(subject) 312 } else if let Ok(bounced) = parse_bounce_outgoing(&tx_out.subject) { 313 TxOutKind::Bounce(bounced) 314 } else { 315 TxOutKind::Simple 316 }; 317 if tx_out.status == TxStatus::Completed { 318 let res = db::register_tx_out( 319 self.db, 320 &tx_out, 321 &kind, 322 &Timestamp::now(), 323 ) 324 .await?; 325 match res.result { 326 RegisterResult::idempotent => match kind { 327 TxOutKind::Simple => { 328 trace!(target: "worker", "out malformed {tx_out} already seen") 329 } 330 TxOutKind::Bounce(_) => { 331 trace!(target: "worker", "out bounce {tx_out} already seen") 332 } 333 TxOutKind::Talerable(_) => { 334 trace!(target: "worker", "out {tx_out} already seen") 335 } 336 }, 337 RegisterResult::known => match kind { 338 TxOutKind::Simple => { 339 warn!(target: "worker", "out malformed {tx_out}") 340 } 341 TxOutKind::Bounce(_) => { 342 info!(target: "worker", "out bounce {tx_out}") 343 } 344 TxOutKind::Talerable(_) => { 345 info!(target: "worker", "out {tx_out}") 346 } 347 }, 348 RegisterResult::recovered => match kind { 349 TxOutKind::Simple => { 350 warn!(target: "worker", "out malformed (recovered) {tx_out}") 351 } 352 TxOutKind::Bounce(_) => { 353 warn!(target: "worker", "out bounce (recovered) {tx_out}") 354 } 355 TxOutKind::Talerable(_) => { 356 warn!(target: "worker", "out (recovered) {tx_out}") 357 } 358 }, 359 } 360 } else { 361 let bounced = match kind { 362 TxOutKind::Simple => None, 363 TxOutKind::Bounce(bounced) => Some(bounced), 364 TxOutKind::Talerable(_) => None, 365 }; 366 let res = db::register_tx_out_failure( 367 self.db, 368 tx_out.code, 369 bounced, 370 &Timestamp::now(), 371 ) 372 .await?; 373 if let Some(id) = res.initiated_id { 374 if res.new { 375 error!(target: "worker", "out failure {id} {tx_out}"); 376 } else { 377 trace!(target: "worker", "out failure {id} {tx_out} already seen"); 378 } 379 } 380 } 381 } 382 AccountType::Normal => { 383 if tx_out.status == TxStatus::Completed { 384 let res = db::register_tx_out( 385 self.db, 386 &tx_out, 387 &TxOutKind::Simple, 388 &Timestamp::now(), 389 ) 390 .await?; 391 match res.result { 392 RegisterResult::idempotent => { 393 trace!(target: "worker", "out {tx_out} already seen"); 394 } 395 RegisterResult::known => { 396 info!(target: "worker", "out {tx_out}"); 397 } 398 RegisterResult::recovered => { 399 warn!(target: "worker", "out (recovered) {tx_out}"); 400 } 401 } 402 } else { 403 let res = db::register_tx_out_failure( 404 self.db, 405 tx_out.code, 406 None, 407 &Timestamp::now(), 408 ) 409 .await?; 410 if let Some(id) = res.initiated_id { 411 if res.new { 412 error!(target: "worker", "out failure {id} {tx_out}"); 413 } else { 414 trace!(target: "worker", "out failure {id} {tx_out} already seen"); 415 } 416 } 417 } 418 } 419 } 420 } 421 } 422 } 423 424 if let Some(_next) = &next { 425 // Update in db cursor only if all previous transactions where final 426 if all_final { 427 // debug!(target: "worker", "advance cursor {next:?}"); 428 // kv_set(&mut *self.db, TXS_CURSOR_KEY, &next).await?; TODO cursor is broken 429 } 430 } else { 431 break; 432 } 433 } 434 435 // Send transactions 436 let start = Timestamp::now(); 437 let now = Zoned::now(); 438 loop { 439 let batch = db::pending_batch(&mut *self.db, &start).await?; 440 if batch.is_empty() { 441 break; 442 } 443 for tx in batch { 444 debug!(target: "worker", "send tx {tx}"); 445 self.init_tx(&tx, &now).await?; 446 } 447 } 448 Ok(()) 449 } 450 451 /// Try to sign an unsigned initiated transaction 452 pub async fn recover_tx(&mut self, tx: &TxOut) -> WorkerResult { 453 if db::initiated_exists_for_code(&mut *self.db, tx.code) 454 .await? 455 .is_some() 456 { 457 // Known initiated we submit it 458 assert_eq!(tx.amount.frac, 0); 459 self.submit_tx( 460 tx.code, 461 -(tx.amount.val as f64), 462 &tx.value_date, 463 tx.creditor.bban(), 464 ) 465 .await?; 466 } else { 467 // The transaction is unknown (we failed after creating it and before storing it in the db) 468 // we delete it 469 self.client.delete_tx(tx.code).await?; 470 debug!(target: "worker", "out {}: delete uncompleted orphan", tx.code); 471 } 472 473 Ok(()) 474 } 475 476 /// Create and sign a forint transfer 477 pub async fn init_tx(&mut self, tx: &Initiated, now: &Zoned) -> WorkerResult { 478 trace!(target: "worker", "init tx {tx}"); 479 assert_eq!(tx.amount.frac, 0); 480 let date = now.date(); 481 // Initialize the new transaction, on failure an orphan initiated transaction can be created 482 let res = self 483 .client 484 .init_tx( 485 self.account_code, 486 tx.amount.val as f64, 487 &tx.subject, 488 &date, 489 &tx.creditor.name, 490 tx.creditor.bban(), 491 ) 492 .await; 493 fail_point("init-tx")?; 494 let info = match res { 495 // Check if succeeded 496 Ok(info) => { 497 // Update transaction status, on failure the initiated transaction will be orphan 498 db::initiated_submit_success(&mut *self.db, tx.id, &Timestamp::now(), info.code) 499 .await?; 500 info 501 } 502 Err(e) => { 503 if let MagnetErr::Magnet(e) = &e.err { 504 // Check if error is permanent 505 if matches!( 506 (e.error_code, e.short_message.as_str()), 507 (404, "BSZLA_NEM_TALALHATO") // Unknown account 508 | (409, "FORRAS_SZAMLA_ESZAMLA_EGYEZIK") // Same account 509 ) { 510 db::initiated_submit_permanent_failure( 511 &mut *self.db, 512 tx.id, 513 &Timestamp::now(), 514 &e.to_string(), 515 ) 516 .await?; 517 error!(target: "worker", "initiated failure {tx}: {e}"); 518 return WorkerResult::Ok(()); 519 } 520 } 521 return Err(e.into()); 522 } 523 }; 524 trace!(target: "worker", "init tx {}", info.code); 525 526 // Sign transaction 527 self.submit_tx(info.code, info.amount, &date, tx.creditor.bban()) 528 .await?; 529 Ok(()) 530 } 531 532 /** Submit an initiated forint transfer */ 533 pub async fn submit_tx( 534 &mut self, 535 tx_code: u64, 536 amount: f64, 537 date: &Date, 538 creditor: &str, 539 ) -> WorkerResult { 540 debug!(target: "worker", "submit tx {tx_code}"); 541 fail_point("submit-tx")?; 542 // Submit an initiated transaction, on failure we will retry 543 match self 544 .client 545 .submit_tx( 546 self.key, 547 self.account_number, 548 tx_code, 549 amount, 550 date, 551 creditor, 552 ) 553 .await 554 { 555 Ok(_) => Ok(()), 556 Err(e) => { 557 if let MagnetErr::Magnet(e) = &e.err { 558 // Check if soft failure 559 if matches!( 560 (e.error_code, e.short_message.as_str()), 561 (409, "TRANZAKCIO_ROSSZ_STATUS") // Already summited or cannot be signed 562 ) { 563 warn!(target: "worker", "submit tx {tx_code}: {e}"); 564 return Ok(()); 565 } 566 } 567 Err(e.into()) 568 } 569 } 570 } 571 } 572 573 pub enum Tx { 574 In(TxIn), 575 Out(TxOut), 576 } 577 578 pub fn extract_tx_info(tx: TxDto) -> Tx { 579 // TODO amount from f64 without allocations 580 let amount = amount::amount(format!("{}:{}", tx.currency, tx.amount.abs())); 581 // TODO we should support non hungarian account and error handling 582 let iban = if tx.counter_account.starts_with("HU") { 583 let iban: IBAN = tx.counter_account.parse().unwrap(); 584 HuIban::try_from(iban).unwrap() 585 } else { 586 HuIban::from_bban(&tx.counter_account).unwrap() 587 }; 588 let counter_account = FullHuPayto::new(iban, &tx.counter_name); 589 if tx.amount.is_sign_positive() { 590 Tx::In(TxIn { 591 code: tx.code, 592 amount, 593 subject: tx.subject.unwrap_or_default(), 594 debtor: counter_account, 595 value_date: tx.value_date, 596 status: tx.status, 597 }) 598 } else { 599 Tx::Out(TxOut { 600 code: tx.code, 601 amount, 602 subject: tx.subject.unwrap_or_default(), 603 creditor: counter_account, 604 value_date: tx.value_date, 605 status: tx.status, 606 }) 607 } 608 } 609 610 #[derive(Debug, thiserror::Error)] 611 pub enum BounceSubjectErr { 612 #[error("missing parts")] 613 MissingParts, 614 #[error("not a bounce")] 615 NotBounce, 616 #[error("malformed bounced id: {0}")] 617 Id(#[from] ParseIntError), 618 } 619 620 pub fn parse_bounce_outgoing(subject: &str) -> Result<u32, BounceSubjectErr> { 621 let (prefix, id) = subject 622 .rsplit_once(" ") 623 .ok_or(BounceSubjectErr::MissingParts)?; 624 if !prefix.starts_with("bounce") { 625 return Err(BounceSubjectErr::NotBounce); 626 } 627 let id: u32 = id.parse()?; 628 Ok(id) 629 }