worker.rs (23066B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::num::ParseIntError; 18 19 use jiff::{Timestamp, Zoned, civil::Date}; 20 use p256::ecdsa::SigningKey; 21 use sqlx::PgConnection; 22 use taler_api::subject::{self, parse_incoming_unstructured}; 23 use taler_common::types::{ 24 amount::{self}, 25 iban::IBAN, 26 }; 27 use tracing::{debug, error, info, trace, warn}; 28 29 use crate::{ 30 FullHuPayto, HuIban, 31 config::AccountType, 32 db::{self, AddIncomingResult, Initiated, RegisterResult, TxIn, TxOut, TxOutKind}, 33 failure_injection::{InjectedErr, fail_point}, 34 magnet_api::{ 35 api::{ApiErr, ErrKind}, 36 client::ApiClient, 37 types::{Direction, Next, Order, TxDto, TxStatus}, 38 }, 39 }; 40 41 // const TXS_CURSOR_KEY: &str = "txs_cursor"; TODO cursor is broken 42 43 #[derive(Debug, thiserror::Error)] 44 pub enum WorkerError { 45 #[error(transparent)] 46 Db(#[from] sqlx::Error), 47 #[error(transparent)] 48 Api(#[from] ApiErr), 49 #[error(transparent)] 50 Injected(#[from] InjectedErr), 51 } 52 53 pub type WorkerResult = Result<(), WorkerError>; 54 55 pub struct Worker<'a> { 56 pub client: &'a ApiClient<'a>, 57 pub db: &'a mut PgConnection, 58 pub account_number: &'a str, 59 pub account_code: u64, 60 pub key: &'a SigningKey, 61 pub account_type: AccountType, 62 pub ignore_tx_before: Option<Date>, 63 pub ignore_bounces_before: Option<Date>, 64 } 65 66 impl Worker<'_> { 67 /// Run a single worker pass 68 pub async fn run(&mut self) -> WorkerResult { 69 // Sync transactions 70 let mut next: Option<Next> = None; //kv_get(&mut *self.db, TXS_CURSOR_KEY).await?; TODO cursor logic is broken and cannot be stored & reused 71 let mut all_final = true; 72 let mut first = true; 73 loop { 74 let page = self 75 .client 76 .page_tx( 77 Direction::Both, 78 Order::Ascending, 79 100, 80 self.account_number, 81 &next, 82 first, 83 ) 84 .await?; 85 first = false; 86 next = page.next; 87 for item in page.list { 88 all_final &= item.tx.status.is_final(); 89 let tx = extract_tx_info(item.tx); 90 match tx { 91 Tx::In(tx_in) => { 92 // We only register final successful incoming transactions 93 if tx_in.status != TxStatus::Completed { 94 debug!(target: "worker", "pending or failed in {tx_in}"); 95 continue; 96 } 97 98 if let Some(before) = self.ignore_tx_before 99 && tx_in.value_date < before 100 { 101 debug!(target: "worker", "ignore in {tx_in}"); 102 continue; 103 } 104 let bounce = async |db: &mut PgConnection, 105 reason: &str| 106 -> Result<(), WorkerError> { 107 if let Some(before) = self.ignore_bounces_before 108 && tx_in.value_date < before 109 { 110 match db::register_tx_in(db, &tx_in, &None, &Timestamp::now()) 111 .await? 112 { 113 AddIncomingResult::Success { new, .. } => { 114 if new { 115 info!(target: "worker", "in {tx_in} skip bounce: {reason}"); 116 } else { 117 trace!(target: "worker", "in {tx_in} already skip bounce "); 118 } 119 } 120 AddIncomingResult::ReservePubReuse => unreachable!(), 121 } 122 } else { 123 let res = db::register_bounce_tx_in( 124 db, 125 &tx_in, 126 &tx_in.amount, 127 reason, 128 &Timestamp::now(), 129 ) 130 .await?; 131 132 if res.tx_new { 133 info!(target: "worker", 134 "in {tx_in} bounced in {}: {reason}", 135 res.bounce_id 136 ); 137 } else { 138 trace!(target: "worker", 139 "in {tx_in} already seen and bounced in {}: {reason}", 140 res.bounce_id 141 ); 142 } 143 } 144 Ok(()) 145 }; 146 match self.account_type { 147 AccountType::Exchange => { 148 match parse_incoming_unstructured(&tx_in.subject) { 149 Ok(None) => bounce(self.db, "missing public key").await?, 150 Ok(Some(subject)) => match db::register_tx_in( 151 self.db, 152 &tx_in, 153 &Some(subject), 154 &Timestamp::now(), 155 ) 156 .await? 157 { 158 AddIncomingResult::Success { new, .. } => { 159 if new { 160 info!(target: "worker", "in {tx_in}"); 161 } else { 162 trace!(target: "worker", "in {tx_in} already seen"); 163 } 164 } 165 AddIncomingResult::ReservePubReuse => { 166 bounce(self.db, "reserve pub reuse").await? 167 } 168 }, 169 Err(e) => bounce(self.db, &e.to_string()).await?, 170 } 171 } 172 AccountType::Normal => { 173 match db::register_tx_in(self.db, &tx_in, &None, &Timestamp::now()) 174 .await? 175 { 176 AddIncomingResult::Success { new, .. } => { 177 if new { 178 info!(target: "worker", "in {tx_in}"); 179 } else { 180 trace!(target: "worker", "in {tx_in} already seen"); 181 } 182 } 183 AddIncomingResult::ReservePubReuse => unreachable!(), 184 } 185 } 186 } 187 } 188 Tx::Out(tx_out) => { 189 match tx_out.status { 190 TxStatus::ToBeRecorded => { 191 self.recover_tx(&tx_out).await?; 192 continue; 193 } 194 TxStatus::PendingFirstSignature 195 | TxStatus::PendingSecondSignature 196 | TxStatus::PendingProcessing 197 | TxStatus::Verified 198 | TxStatus::PartiallyCompleted 199 | TxStatus::UnderReview => { 200 // Still pending 201 debug!(target: "worker", "pending out {tx_out}"); 202 continue; 203 } 204 TxStatus::Rejected | TxStatus::Canceled | TxStatus::Completed => {} 205 } 206 match self.account_type { 207 AccountType::Exchange => { 208 let kind = if let Ok(subject) = 209 subject::parse_outgoing(&tx_out.subject) 210 { 211 TxOutKind::Talerable(subject) 212 } else if let Ok(bounced) = parse_bounce_outgoing(&tx_out.subject) { 213 TxOutKind::Bounce(bounced) 214 } else { 215 TxOutKind::Simple 216 }; 217 if tx_out.status == TxStatus::Completed { 218 let res = db::register_tx_out( 219 self.db, 220 &tx_out, 221 &kind, 222 &Timestamp::now(), 223 ) 224 .await?; 225 match res.result { 226 RegisterResult::idempotent => match kind { 227 TxOutKind::Simple => { 228 trace!(target: "worker", "out malformed {tx_out} already seen") 229 } 230 TxOutKind::Bounce(_) => { 231 trace!(target: "worker", "out bounce {tx_out} already seen") 232 } 233 TxOutKind::Talerable(_) => { 234 trace!(target: "worker", "out {tx_out} already seen") 235 } 236 }, 237 RegisterResult::known => match kind { 238 TxOutKind::Simple => { 239 warn!(target: "worker", "out malformed {tx_out}") 240 } 241 TxOutKind::Bounce(_) => { 242 info!(target: "worker", "out bounce {tx_out}") 243 } 244 TxOutKind::Talerable(_) => { 245 info!(target: "worker", "out {tx_out}") 246 } 247 }, 248 RegisterResult::recovered => match kind { 249 TxOutKind::Simple => { 250 warn!(target: "worker", "out malformed (recovered) {tx_out}") 251 } 252 TxOutKind::Bounce(_) => { 253 warn!(target: "worker", "out bounce (recovered) {tx_out}") 254 } 255 TxOutKind::Talerable(_) => { 256 warn!(target: "worker", "out (recovered) {tx_out}") 257 } 258 }, 259 } 260 } else { 261 let bounced = match kind { 262 TxOutKind::Simple => None, 263 TxOutKind::Bounce(bounced) => Some(bounced), 264 TxOutKind::Talerable(_) => None, 265 }; 266 let res = db::register_tx_out_failure( 267 self.db, 268 tx_out.code, 269 bounced, 270 &Timestamp::now(), 271 ) 272 .await?; 273 if let Some(id) = res.initiated_id { 274 if res.new { 275 error!(target: "worker", "out failure {id} {tx_out}"); 276 } else { 277 trace!(target: "worker", "out failure {id} {tx_out} already seen"); 278 } 279 } 280 } 281 } 282 AccountType::Normal => { 283 if tx_out.status == TxStatus::Completed { 284 let res = db::register_tx_out( 285 self.db, 286 &tx_out, 287 &TxOutKind::Simple, 288 &Timestamp::now(), 289 ) 290 .await?; 291 match res.result { 292 RegisterResult::idempotent => { 293 trace!(target: "worker", "out {tx_out} already seen"); 294 } 295 RegisterResult::known => { 296 info!(target: "worker", "out {tx_out}"); 297 } 298 RegisterResult::recovered => { 299 warn!(target: "worker", "out (recovered) {tx_out}"); 300 } 301 } 302 } else { 303 let res = db::register_tx_out_failure( 304 self.db, 305 tx_out.code, 306 None, 307 &Timestamp::now(), 308 ) 309 .await?; 310 if let Some(id) = res.initiated_id { 311 if res.new { 312 error!(target: "worker", "out failure {id} {tx_out}"); 313 } else { 314 trace!(target: "worker", "out failure {id} {tx_out} already seen"); 315 } 316 } 317 } 318 } 319 } 320 } 321 } 322 } 323 324 if let Some(_next) = &next { 325 // Update in db cursor only if all previous transactions where final 326 if all_final { 327 // debug!(target: "worker", "advance cursor {next:?}"); 328 // kv_set(&mut *self.db, TXS_CURSOR_KEY, &next).await?; TODO cursor is broken 329 } 330 } else { 331 break; 332 } 333 } 334 335 // Send transactions 336 let start = Timestamp::now(); 337 let now = Zoned::now(); 338 loop { 339 let batch = db::pending_batch(&mut *self.db, &start).await?; 340 if batch.is_empty() { 341 break; 342 } 343 for tx in batch { 344 debug!(target: "worker", "send tx {tx}"); 345 self.init_tx(&tx, &now).await?; 346 } 347 } 348 Ok(()) 349 } 350 351 /// Try to sign an unsigned initiated transaction 352 pub async fn recover_tx(&mut self, tx: &TxOut) -> WorkerResult { 353 if let Some(_) = db::initiated_exists_for_code(&mut *self.db, tx.code).await? { 354 // Known initiated we submit it 355 assert_eq!(tx.amount.frac, 0); 356 self.submit_tx( 357 tx.code, 358 -(tx.amount.val as f64), 359 &tx.value_date, 360 tx.creditor.bban(), 361 ) 362 .await?; 363 } else { 364 // The transaction is unknown (we failed after creating it and before storing it in the db) 365 // we delete it 366 self.client.delete_tx(tx.code).await?; 367 debug!(target: "worker", "out {}: delete uncompleted orphan", tx.code); 368 } 369 370 Ok(()) 371 } 372 373 /// Create and sign a forint transfer 374 pub async fn init_tx(&mut self, tx: &Initiated, now: &Zoned) -> WorkerResult { 375 trace!(target: "worker", "init tx {tx}"); 376 assert_eq!(tx.amount.frac, 0); 377 let date = now.date(); 378 // Initialize the new transaction, on failure an orphan initiated transaction can be created 379 let res = self 380 .client 381 .init_tx( 382 self.account_code, 383 tx.amount.val as f64, 384 &tx.subject, 385 &date, 386 &tx.creditor.name, 387 tx.creditor.bban(), 388 ) 389 .await; 390 fail_point("init-tx")?; 391 let info = match res { 392 // Check if succeeded 393 Ok(info) => { 394 // Update transaction status, on failure the initiated transaction will be orphan 395 db::initiated_submit_success(&mut *self.db, tx.id, &Timestamp::now(), info.code) 396 .await?; 397 info 398 } 399 Err(e) => { 400 if let ApiErr { 401 kind: ErrKind::Magnet(e), 402 .. 403 } = &e 404 { 405 // Check if error is permanent 406 if matches!( 407 (e.error_code, e.short_message.as_str()), 408 (404, "BSZLA_NEM_TALALHATO") // Unknown account 409 | (409, "FORRAS_SZAMLA_ESZAMLA_EGYEZIK") // Same account 410 ) { 411 db::initiated_submit_permanent_failure( 412 &mut *self.db, 413 tx.id, 414 &Timestamp::now(), 415 &e.to_string(), 416 ) 417 .await?; 418 error!(target: "worker", "initiated failure {tx}: {e}"); 419 return WorkerResult::Ok(()); 420 } 421 } 422 return Err(e.into()); 423 } 424 }; 425 trace!(target: "worker", "init tx {}", info.code); 426 427 // Sign transaction 428 self.submit_tx(info.code, info.amount, &date, tx.creditor.bban()) 429 .await?; 430 Ok(()) 431 } 432 433 /** Submit an initiated forint transfer */ 434 pub async fn submit_tx( 435 &mut self, 436 tx_code: u64, 437 amount: f64, 438 date: &Date, 439 creditor: &str, 440 ) -> WorkerResult { 441 debug!(target: "worker", "submit tx {tx_code}"); 442 fail_point("submit-tx")?; 443 // Submit an initiated transaction, on failure we will retry 444 match self 445 .client 446 .submit_tx( 447 self.key, 448 self.account_number, 449 tx_code, 450 amount, 451 date, 452 creditor, 453 ) 454 .await 455 { 456 Ok(_) => Ok(()), 457 Err(e) => { 458 if let ApiErr { 459 kind: ErrKind::Magnet(e), 460 .. 461 } = &e 462 { 463 // Check if soft failure 464 if matches!( 465 (e.error_code, e.short_message.as_str()), 466 (409, "TRANZAKCIO_ROSSZ_STATUS") // Already summited or cannot be signed 467 ) { 468 warn!(target: "worker", "submit tx {tx_code}: {e}"); 469 return Ok(()); 470 } 471 } 472 Err(e.into()) 473 } 474 } 475 } 476 } 477 478 pub enum Tx { 479 In(TxIn), 480 Out(TxOut), 481 } 482 483 pub fn extract_tx_info(tx: TxDto) -> Tx { 484 // TODO amount from f64 without allocations 485 let amount = amount::amount(format!("{}:{}", tx.currency, tx.amount.abs())); 486 // TODO we should support non hungarian account and error handling 487 let iban = if tx.counter_account.starts_with("HU") { 488 let iban: IBAN = tx.counter_account.parse().unwrap(); 489 HuIban::try_from(iban).unwrap() 490 } else { 491 HuIban::from_bban(&tx.counter_account).unwrap() 492 }; 493 let counter_account = FullHuPayto::new(iban, tx.counter_name); 494 if tx.amount.is_sign_positive() { 495 Tx::In(TxIn { 496 code: tx.code, 497 amount, 498 subject: tx.subject.unwrap_or_default(), 499 debtor: counter_account, 500 value_date: tx.value_date, 501 status: tx.status, 502 }) 503 } else { 504 Tx::Out(TxOut { 505 code: tx.code, 506 amount, 507 subject: tx.subject.unwrap_or_default(), 508 creditor: counter_account, 509 value_date: tx.value_date, 510 status: tx.status, 511 }) 512 } 513 } 514 515 #[derive(Debug, thiserror::Error)] 516 pub enum BounceSubjectErr { 517 #[error("missing parts")] 518 MissingParts, 519 #[error("not a bounce")] 520 NotBounce, 521 #[error("malformed bounced id: {0}")] 522 Id(#[from] ParseIntError), 523 } 524 525 pub fn parse_bounce_outgoing(subject: &str) -> Result<u32, BounceSubjectErr> { 526 let (prefix, id) = subject 527 .rsplit_once(" ") 528 .ok_or(BounceSubjectErr::MissingParts)?; 529 if !prefix.starts_with("bounce") { 530 return Err(BounceSubjectErr::NotBounce); 531 } 532 let id: u32 = id.parse()?; 533 Ok(id) 534 }