worker.rs (13349B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use jiff::Timestamp; 18 use sqlx::PgConnection; 19 use taler_api::subject::{self, parse_incoming_unstructured}; 20 use taler_common::types::amount::{self, Currency}; 21 use tracing::{debug, error, info, trace, warn}; 22 23 use crate::{ 24 FullCyclosPayto, 25 config::AccountType, 26 cyclos_api::{ 27 api::{ApiErr, ErrKind}, 28 client::Client, 29 types::{AccountKind, HistoryItem, NotFoundError}, 30 }, 31 db::{self, AddIncomingResult, RegisterResult, TxIn, TxOut, TxOutKind}, 32 }; 33 34 #[derive(Debug, thiserror::Error)] 35 pub enum WorkerError { 36 #[error(transparent)] 37 Db(#[from] sqlx::Error), 38 #[error(transparent)] 39 Api(#[from] ApiErr), 40 //#[error(transparent)] 41 //Injected(#[from] InjectedErr), 42 } 43 44 pub type WorkerResult = Result<(), WorkerError>; 45 46 pub struct Worker<'a> { 47 pub client: &'a Client<'a>, 48 pub db: &'a mut PgConnection, 49 pub currency: Currency, 50 pub account_type_id: u64, 51 pub account_type: AccountType, 52 } 53 54 impl Worker<'_> { 55 /// Run a single worker pass 56 pub async fn run(&mut self) -> WorkerResult { 57 // Sync transactions 58 //let mut next: Option<Next> = None; //kv_get(&mut *self.db, TXS_CURSOR_KEY).await?; TODO cursor logic is broken and cannot be stored & reused 59 60 loop { 61 let transfers = self.client.transfers(self.account_type_id).await?; 62 for transfer in transfers { 63 let tx = extract_tx_info(transfer); 64 match tx { 65 Tx::In(tx_in) => self.ingest_in(tx_in).await?, 66 Tx::Out(tx_out) => self.ingest_out(tx_out).await?, 67 } 68 } 69 70 break; // TODO pagination 71 } 72 73 // Send transactions 74 let start = Timestamp::now(); 75 loop { 76 let batch = db::pending_batch(&mut *self.db, &start).await?; 77 if batch.is_empty() { 78 break; 79 } 80 for initiated in batch { 81 debug!(target: "worker", "send tx {initiated}"); 82 let res = self 83 .client 84 .direct_payment(initiated.creditor.0, initiated.amount, &initiated.subject) 85 .await; 86 // TODO fail_point("init-tx")?; 87 match res { 88 Ok(tx) => { 89 // Update transaction status, on failure the initiated transaction will be orphan 90 db::initiated_submit_success( 91 &mut *self.db, 92 initiated.id, 93 &tx.date, 94 tx.id.0, 95 ) 96 .await?; 97 trace!(target: "worker", "init tx {}", tx.id); 98 } 99 Err(e) => { 100 let msg = match e.kind { 101 ErrKind::Unknown(NotFoundError { entity_type, key }) => { 102 format!("unknown {entity_type} {key}") 103 } 104 ErrKind::Forbidden(err) => err.to_string(), 105 _ => return Err(e.into()), 106 }; 107 // TODO is permission should be considered are hard or soft failure ? 108 db::initiated_submit_permanent_failure( 109 &mut *self.db, 110 initiated.id, 111 &Timestamp::now(), 112 &msg, 113 ) 114 .await?; 115 error!(target: "worker", "initiated failure {initiated}: {msg}"); 116 } 117 } 118 } 119 } 120 Ok(()) 121 } 122 123 /// Ingest an incoming transaction 124 async fn ingest_in(&mut self, tx: TxIn) -> WorkerResult { 125 match self.account_type { 126 AccountType::Exchange => { 127 let transfer = self.client.transfer(tx.transfer_id).await?; 128 let bounce = async |db: &mut PgConnection, 129 reason: &str| 130 -> Result<(), WorkerError> { 131 // Fetch existing transaction 132 if let Some(chargeback) = transfer.chargeback_by { 133 let res = db::register_bounced_tx_in( 134 db, 135 &tx, 136 *chargeback.id, 137 reason, 138 &Timestamp::now(), 139 ) 140 .await?; 141 if res.tx_new { 142 info!(target: "worker", 143 "in {tx} bounced (recovered) in {}: {reason}", chargeback.id 144 ); 145 } else { 146 trace!(target: "worker", 147 "in {tx} already seen and bounced in {}: {reason}",chargeback.id 148 ); 149 } 150 } else if !transfer.can_chargeback { 151 match db::register_tx_in(db, &tx, &None, &Timestamp::now()).await? { 152 AddIncomingResult::Success { new, .. } => { 153 if new { 154 warn!(target: "worker", "in {tx} cannot bounce: {reason}"); 155 } else { 156 trace!(target: "worker", "in {tx} already seen and cannot bounce "); 157 } 158 } 159 AddIncomingResult::ReservePubReuse => unreachable!(), 160 } 161 } else { 162 let chargeback_id = self.client.chargeback(*transfer.id).await?; 163 let res = db::register_bounced_tx_in( 164 db, 165 &tx, 166 chargeback_id, 167 reason, 168 &Timestamp::now(), 169 ) 170 .await?; 171 if res.tx_new { 172 info!(target: "worker", "in {tx} bounced in {chargeback_id}: {reason}"); 173 } else { 174 trace!(target: "worker", "in {tx} already seen and bounced in {chargeback_id}: {reason}"); 175 } 176 } 177 Ok(()) 178 }; 179 if let Some(chargeback) = transfer.chargeback_of { 180 warn!("{tx} - This is a transaction failure, we need to handle it"); 181 return Ok(()); 182 } 183 match parse_incoming_unstructured(&tx.subject) { 184 Ok(None) => bounce(self.db, "missing public key").await?, 185 Ok(Some(subject)) => { 186 match db::register_tx_in(self.db, &tx, &Some(subject), &Timestamp::now()) 187 .await? 188 { 189 AddIncomingResult::Success { new, .. } => { 190 if new { 191 info!(target: "worker", "in {tx}"); 192 } else { 193 trace!(target: "worker", "in {tx} already seen"); 194 } 195 } 196 AddIncomingResult::ReservePubReuse => { 197 bounce(self.db, "reserve pub reuse").await? 198 } 199 } 200 } 201 Err(e) => bounce(self.db, &e.to_string()).await?, 202 } 203 } 204 AccountType::Normal => { 205 match db::register_tx_in(self.db, &tx, &None, &Timestamp::now()).await? { 206 AddIncomingResult::Success { new, .. } => { 207 if new { 208 info!(target: "worker", "in {tx}"); 209 } else { 210 trace!(target: "worker", "in {tx} already seen"); 211 } 212 } 213 AddIncomingResult::ReservePubReuse => unreachable!(), 214 } 215 } 216 } 217 Ok(()) 218 } 219 220 async fn ingest_out(&mut self, tx: TxOut) -> WorkerResult { 221 match self.account_type { 222 AccountType::Exchange => { 223 let transfer = self.client.transfer(tx.transfer_id).await?; 224 225 let kind = if let Ok(subject) = subject::parse_outgoing(&tx.subject) { 226 TxOutKind::Talerable(subject) 227 } else if let Some(chargeback) = transfer.chargeback_of { 228 TxOutKind::Bounce(*chargeback.id) 229 } else { 230 TxOutKind::Simple 231 }; 232 233 if let Some(chargeback) = transfer.chargeback_by { 234 warn!("{tx} - This is a transaction failure, we need to handle it"); 235 return Ok(()); 236 } 237 238 let res = db::register_tx_out(self.db, &tx, &kind, &Timestamp::now()).await?; 239 match res.result { 240 RegisterResult::idempotent => match kind { 241 TxOutKind::Simple => { 242 trace!(target: "worker", "out malformed {tx} already seen") 243 } 244 TxOutKind::Bounce(_) => { 245 trace!(target: "worker", "out bounce {tx} already seen") 246 } 247 TxOutKind::Talerable(_) => { 248 trace!(target: "worker", "out {tx} already seen") 249 } 250 }, 251 RegisterResult::known => match kind { 252 TxOutKind::Simple => { 253 warn!(target: "worker", "out malformed {tx}") 254 } 255 TxOutKind::Bounce(_) => { 256 info!(target: "worker", "out bounce {tx}") 257 } 258 TxOutKind::Talerable(_) => { 259 info!(target: "worker", "out {tx}") 260 } 261 }, 262 RegisterResult::recovered => match kind { 263 TxOutKind::Simple => { 264 warn!(target: "worker", "out malformed (recovered) {tx}") 265 } 266 TxOutKind::Bounce(_) => { 267 warn!(target: "worker", "out bounce (recovered) {tx}") 268 } 269 TxOutKind::Talerable(_) => { 270 warn!(target: "worker", "out (recovered) {tx}") 271 } 272 }, 273 } 274 } 275 AccountType::Normal => { 276 let res = db::register_tx_out(self.db, &tx, &TxOutKind::Simple, &Timestamp::now()) 277 .await?; 278 match res.result { 279 RegisterResult::idempotent => { 280 trace!(target: "worker", "out {tx} already seen"); 281 } 282 RegisterResult::known => { 283 info!(target: "worker", "out {tx}"); 284 } 285 RegisterResult::recovered => { 286 warn!(target: "worker", "out (recovered) {tx}"); 287 } 288 } 289 } 290 } 291 Ok(()) 292 } 293 } 294 295 pub enum Tx { 296 In(TxIn), 297 Out(TxOut), 298 } 299 300 pub fn extract_tx_info(tx: HistoryItem) -> Tx { 301 let amount = amount::decimal(tx.amount.trim_start_matches('-')); 302 let payto = match tx.related_account.kind { 303 AccountKind::System => { 304 FullCyclosPayto::new(tx.related_account.ty.id, tx.related_account.ty.name) 305 } 306 AccountKind::User { user } => FullCyclosPayto::new(user.id, user.display), 307 }; 308 if tx.amount.starts_with("-") { 309 Tx::Out(TxOut { 310 transfer_id: *tx.id, 311 tx_id: tx.transaction.map(|it| *it.id), 312 amount, 313 subject: tx.description.unwrap_or_default(), 314 creditor: payto, 315 valued_at: tx.date, 316 }) 317 } else { 318 Tx::In(TxIn { 319 transfer_id: *tx.id, 320 tx_id: tx.transaction.map(|it| *it.id), 321 amount, 322 subject: tx.description.unwrap_or_default(), 323 debtor: payto, 324 valued_at: tx.date, 325 }) 326 } 327 }