taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

worker.rs (13349B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use jiff::Timestamp;
     18 use sqlx::PgConnection;
     19 use taler_api::subject::{self, parse_incoming_unstructured};
     20 use taler_common::types::amount::{self, Currency};
     21 use tracing::{debug, error, info, trace, warn};
     22 
     23 use crate::{
     24     FullCyclosPayto,
     25     config::AccountType,
     26     cyclos_api::{
     27         api::{ApiErr, ErrKind},
     28         client::Client,
     29         types::{AccountKind, HistoryItem, NotFoundError},
     30     },
     31     db::{self, AddIncomingResult, RegisterResult, TxIn, TxOut, TxOutKind},
     32 };
     33 
     34 #[derive(Debug, thiserror::Error)]
     35 pub enum WorkerError {
     36     #[error(transparent)]
     37     Db(#[from] sqlx::Error),
     38     #[error(transparent)]
     39     Api(#[from] ApiErr),
     40     //#[error(transparent)]
     41     //Injected(#[from] InjectedErr),
     42 }
     43 
     44 pub type WorkerResult = Result<(), WorkerError>;
     45 
     46 pub struct Worker<'a> {
     47     pub client: &'a Client<'a>,
     48     pub db: &'a mut PgConnection,
     49     pub currency: Currency,
     50     pub account_type_id: u64,
     51     pub account_type: AccountType,
     52 }
     53 
     54 impl Worker<'_> {
     55     /// Run a single worker pass
     56     pub async fn run(&mut self) -> WorkerResult {
     57         // Sync transactions
     58         //let mut next: Option<Next> = None; //kv_get(&mut *self.db, TXS_CURSOR_KEY).await?; TODO cursor logic is broken and cannot be stored & reused
     59 
     60         loop {
     61             let transfers = self.client.transfers(self.account_type_id).await?;
     62             for transfer in transfers {
     63                 let tx = extract_tx_info(transfer);
     64                 match tx {
     65                     Tx::In(tx_in) => self.ingest_in(tx_in).await?,
     66                     Tx::Out(tx_out) => self.ingest_out(tx_out).await?,
     67                 }
     68             }
     69 
     70             break; // TODO pagination
     71         }
     72 
     73         // Send transactions
     74         let start = Timestamp::now();
     75         loop {
     76             let batch = db::pending_batch(&mut *self.db, &start).await?;
     77             if batch.is_empty() {
     78                 break;
     79             }
     80             for initiated in batch {
     81                 debug!(target: "worker", "send tx {initiated}");
     82                 let res = self
     83                     .client
     84                     .direct_payment(initiated.creditor.0, initiated.amount, &initiated.subject)
     85                     .await;
     86                 // TODO fail_point("init-tx")?;
     87                 match res {
     88                     Ok(tx) => {
     89                         // Update transaction status, on failure the initiated transaction will be orphan
     90                         db::initiated_submit_success(
     91                             &mut *self.db,
     92                             initiated.id,
     93                             &tx.date,
     94                             tx.id.0,
     95                         )
     96                         .await?;
     97                         trace!(target: "worker", "init tx {}", tx.id);
     98                     }
     99                     Err(e) => {
    100                         let msg = match e.kind {
    101                             ErrKind::Unknown(NotFoundError { entity_type, key }) => {
    102                                 format!("unknown {entity_type} {key}")
    103                             }
    104                             ErrKind::Forbidden(err) => err.to_string(),
    105                             _ => return Err(e.into()),
    106                         };
    107                         // TODO is permission should be considered are hard or soft failure ?
    108                         db::initiated_submit_permanent_failure(
    109                             &mut *self.db,
    110                             initiated.id,
    111                             &Timestamp::now(),
    112                             &msg,
    113                         )
    114                         .await?;
    115                         error!(target: "worker", "initiated failure {initiated}: {msg}");
    116                     }
    117                 }
    118             }
    119         }
    120         Ok(())
    121     }
    122 
    123     /// Ingest an incoming transaction
    124     async fn ingest_in(&mut self, tx: TxIn) -> WorkerResult {
    125         match self.account_type {
    126             AccountType::Exchange => {
    127                 let transfer = self.client.transfer(tx.transfer_id).await?;
    128                 let bounce = async |db: &mut PgConnection,
    129                                     reason: &str|
    130                        -> Result<(), WorkerError> {
    131                     // Fetch existing transaction
    132                     if let Some(chargeback) = transfer.chargeback_by {
    133                         let res = db::register_bounced_tx_in(
    134                             db,
    135                             &tx,
    136                             *chargeback.id,
    137                             reason,
    138                             &Timestamp::now(),
    139                         )
    140                         .await?;
    141                         if res.tx_new {
    142                             info!(target: "worker",
    143                                 "in {tx} bounced (recovered) in {}: {reason}", chargeback.id
    144                             );
    145                         } else {
    146                             trace!(target: "worker",
    147                                 "in {tx} already seen and bounced in {}: {reason}",chargeback.id
    148                             );
    149                         }
    150                     } else if !transfer.can_chargeback {
    151                         match db::register_tx_in(db, &tx, &None, &Timestamp::now()).await? {
    152                             AddIncomingResult::Success { new, .. } => {
    153                                 if new {
    154                                     warn!(target: "worker", "in {tx} cannot bounce: {reason}");
    155                                 } else {
    156                                     trace!(target: "worker", "in {tx} already seen and cannot bounce ");
    157                                 }
    158                             }
    159                             AddIncomingResult::ReservePubReuse => unreachable!(),
    160                         }
    161                     } else {
    162                         let chargeback_id = self.client.chargeback(*transfer.id).await?;
    163                         let res = db::register_bounced_tx_in(
    164                             db,
    165                             &tx,
    166                             chargeback_id,
    167                             reason,
    168                             &Timestamp::now(),
    169                         )
    170                         .await?;
    171                         if res.tx_new {
    172                             info!(target: "worker", "in {tx} bounced in {chargeback_id}: {reason}");
    173                         } else {
    174                             trace!(target: "worker", "in {tx} already seen and bounced in {chargeback_id}: {reason}");
    175                         }
    176                     }
    177                     Ok(())
    178                 };
    179                 if let Some(chargeback) = transfer.chargeback_of {
    180                     warn!("{tx} - This is a transaction failure, we need to handle it");
    181                     return Ok(());
    182                 }
    183                 match parse_incoming_unstructured(&tx.subject) {
    184                     Ok(None) => bounce(self.db, "missing public key").await?,
    185                     Ok(Some(subject)) => {
    186                         match db::register_tx_in(self.db, &tx, &Some(subject), &Timestamp::now())
    187                             .await?
    188                         {
    189                             AddIncomingResult::Success { new, .. } => {
    190                                 if new {
    191                                     info!(target: "worker", "in {tx}");
    192                                 } else {
    193                                     trace!(target: "worker", "in {tx} already seen");
    194                                 }
    195                             }
    196                             AddIncomingResult::ReservePubReuse => {
    197                                 bounce(self.db, "reserve pub reuse").await?
    198                             }
    199                         }
    200                     }
    201                     Err(e) => bounce(self.db, &e.to_string()).await?,
    202                 }
    203             }
    204             AccountType::Normal => {
    205                 match db::register_tx_in(self.db, &tx, &None, &Timestamp::now()).await? {
    206                     AddIncomingResult::Success { new, .. } => {
    207                         if new {
    208                             info!(target: "worker", "in {tx}");
    209                         } else {
    210                             trace!(target: "worker", "in {tx} already seen");
    211                         }
    212                     }
    213                     AddIncomingResult::ReservePubReuse => unreachable!(),
    214                 }
    215             }
    216         }
    217         Ok(())
    218     }
    219 
    220     async fn ingest_out(&mut self, tx: TxOut) -> WorkerResult {
    221         match self.account_type {
    222             AccountType::Exchange => {
    223                 let transfer = self.client.transfer(tx.transfer_id).await?;
    224 
    225                 let kind = if let Ok(subject) = subject::parse_outgoing(&tx.subject) {
    226                     TxOutKind::Talerable(subject)
    227                 } else if let Some(chargeback) = transfer.chargeback_of {
    228                     TxOutKind::Bounce(*chargeback.id)
    229                 } else {
    230                     TxOutKind::Simple
    231                 };
    232 
    233                 if let Some(chargeback) = transfer.chargeback_by {
    234                     warn!("{tx} - This is a transaction failure, we need to handle it");
    235                     return Ok(());
    236                 }
    237 
    238                 let res = db::register_tx_out(self.db, &tx, &kind, &Timestamp::now()).await?;
    239                 match res.result {
    240                     RegisterResult::idempotent => match kind {
    241                         TxOutKind::Simple => {
    242                             trace!(target: "worker", "out malformed {tx} already seen")
    243                         }
    244                         TxOutKind::Bounce(_) => {
    245                             trace!(target: "worker", "out bounce {tx} already seen")
    246                         }
    247                         TxOutKind::Talerable(_) => {
    248                             trace!(target: "worker", "out {tx} already seen")
    249                         }
    250                     },
    251                     RegisterResult::known => match kind {
    252                         TxOutKind::Simple => {
    253                             warn!(target: "worker", "out malformed {tx}")
    254                         }
    255                         TxOutKind::Bounce(_) => {
    256                             info!(target: "worker", "out bounce {tx}")
    257                         }
    258                         TxOutKind::Talerable(_) => {
    259                             info!(target: "worker", "out {tx}")
    260                         }
    261                     },
    262                     RegisterResult::recovered => match kind {
    263                         TxOutKind::Simple => {
    264                             warn!(target: "worker", "out malformed (recovered) {tx}")
    265                         }
    266                         TxOutKind::Bounce(_) => {
    267                             warn!(target: "worker", "out bounce (recovered) {tx}")
    268                         }
    269                         TxOutKind::Talerable(_) => {
    270                             warn!(target: "worker", "out (recovered) {tx}")
    271                         }
    272                     },
    273                 }
    274             }
    275             AccountType::Normal => {
    276                 let res = db::register_tx_out(self.db, &tx, &TxOutKind::Simple, &Timestamp::now())
    277                     .await?;
    278                 match res.result {
    279                     RegisterResult::idempotent => {
    280                         trace!(target: "worker", "out {tx} already seen");
    281                     }
    282                     RegisterResult::known => {
    283                         info!(target: "worker", "out {tx}");
    284                     }
    285                     RegisterResult::recovered => {
    286                         warn!(target: "worker", "out (recovered) {tx}");
    287                     }
    288                 }
    289             }
    290         }
    291         Ok(())
    292     }
    293 }
    294 
    295 pub enum Tx {
    296     In(TxIn),
    297     Out(TxOut),
    298 }
    299 
    300 pub fn extract_tx_info(tx: HistoryItem) -> Tx {
    301     let amount = amount::decimal(tx.amount.trim_start_matches('-'));
    302     let payto = match tx.related_account.kind {
    303         AccountKind::System => {
    304             FullCyclosPayto::new(tx.related_account.ty.id, tx.related_account.ty.name)
    305         }
    306         AccountKind::User { user } => FullCyclosPayto::new(user.id, user.display),
    307     };
    308     if tx.amount.starts_with("-") {
    309         Tx::Out(TxOut {
    310             transfer_id: *tx.id,
    311             tx_id: tx.transaction.map(|it| *it.id),
    312             amount,
    313             subject: tx.description.unwrap_or_default(),
    314             creditor: payto,
    315             valued_at: tx.date,
    316         })
    317     } else {
    318         Tx::In(TxIn {
    319             transfer_id: *tx.id,
    320             tx_id: tx.transaction.map(|it| *it.id),
    321             amount,
    322             subject: tx.description.unwrap_or_default(),
    323             debtor: payto,
    324             valued_at: tx.date,
    325         })
    326     }
    327 }