taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

worker.rs (20520B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::time::Duration;
     18 
     19 use failure_injection::{InjectedErr, fail_point};
     20 use http_client::ApiErr;
     21 use jiff::Timestamp;
     22 use sqlx::{Acquire as _, PgConnection, PgPool, postgres::PgListener};
     23 use taler_api::subject::{self, parse_incoming_unstructured};
     24 use taler_common::{
     25     ExpoBackoffDecorr,
     26     config::Config,
     27     types::amount::{self, Currency},
     28 };
     29 use tokio::{join, sync::Notify};
     30 use tracing::{debug, error, info, trace, warn};
     31 
     32 use crate::{
     33     config::{AccountType, WorkerCfg},
     34     constants::SYNC_CURSOR_KEY,
     35     cyclos_api::{
     36         api::{CyclosAuth, CyclosErr},
     37         client::Client,
     38         types::{AccountKind, HistoryItem, InputError, NotFoundError, OrderBy},
     39     },
     40     db::{
     41         self, AddIncomingResult, ChargebackFailureResult, RegisterResult, TxIn, TxOut, TxOutKind,
     42         kv_get, kv_set,
     43     },
     44     notification::watch_notification,
     45 };
     46 
/// Errors that can interrupt a worker pass.
#[derive(Debug, thiserror::Error)]
pub enum WorkerError {
    /// Database error, converted from [`sqlx::Error`].
    #[error(transparent)]
    Db(#[from] sqlx::Error),
    /// Error returned by a Cyclos API call.
    #[error(transparent)]
    Api(#[from] ApiErr<CyclosErr>),
    /// The worker lock could not be taken at the start of a pass.
    #[error("Another worker is running concurrently")]
    Concurrency,
    /// Error raised by a failure injection point.
    #[error(transparent)]
    Injected(#[from] InjectedErr),
}
     58 
/// Result of a worker operation that yields no value on success.
pub type WorkerResult = Result<(), WorkerError>;
     60 
/// Run the worker, either for a single pass or continuously.
///
/// The worker configuration is parsed from `cfg` and a Cyclos client using
/// basic authentication is built on top of `client`.
///
/// * When `transient` is `true`, a single pass is run on a connection acquired
///   from `pool` and its outcome is returned.
/// * Otherwise a notification watcher and a worker loop are driven
///   concurrently. The worker loop has no exit: on error it logs, sleeps for a
///   backoff delay and reconnects.
///
/// # Errors
///
/// Fails if the configuration cannot be parsed, or, in transient mode, if the
/// connection cannot be acquired or the pass fails.
pub async fn run_worker(
    cfg: &Config,
    pool: &PgPool,
    client: &http_client::Client,
    transient: bool,
) -> anyhow::Result<()> {
    let cfg = WorkerCfg::parse(cfg)?;
    let client = Client {
        client,
        api_url: &cfg.host.api_url,
        auth: &CyclosAuth::Basic {
            username: cfg.host.username,
            password: cfg.host.password,
        },
    };
    if transient {
        // One-shot mode: run a single pass and report its outcome to the caller
        let mut conn = pool.acquire().await?;
        Worker {
            client: &client,
            db: &mut conn,
            account_type_id: *cfg.account_type_id,
            payment_type_id: *cfg.payment_type_id,
            account_type: cfg.account_type,
            currency: cfg.currency.clone(),
        }
        .run()
        .await?;
        return Ok(());
    }

    // Wakes up the worker loop, shared with the notification watcher
    let notification = Notify::new();

    let watcher = async {
        watch_notification(&client, &notification).await;
    };
    let worker = async {
        let mut jitter = ExpoBackoffDecorr::default();
        // When true, the next pass runs immediately instead of waiting for a trigger
        let mut skip_notifications: bool = true;
        loop {
            // Logged on every (re)connection, not only on the first one
            info!(target: "worker", "running at initialisation");
            // This inner block only ever completes with an error: its loop has no exit
            let res: WorkerResult = async {
                let db = &mut PgListener::connect_with(pool).await?;

                // Listen to all channels
                db.listen_all(["transfer"]).await?;

                loop {
                    if !skip_notifications {
                        // Wait for whichever trigger fires first
                        tokio::select! {
                            _ = tokio::time::sleep(cfg.frequency) => {
                                info!(target: "worker", "running at frequency");
                            }
                            res = db.try_recv() => {
                                let mut ntf = res?;
                                // Conflate all notifications
                                while let Some(n) = ntf {
                                    debug!(target: "worker", "notification from {}", n.channel());
                                    ntf = db.next_buffered();
                                }
                                info!(target: "worker", "running at db trigger");
                            }
                            _ = notification.notified() => {
                                info!(target: "worker", "running at notification trigger");
                            }
                        };
                    }
                    skip_notifications = false;
                    // The pass runs on the listener's own connection
                    Worker {
                        client: &client,
                        db: db.acquire().await?,
                        account_type_id: *cfg.account_type_id,
                        payment_type_id: *cfg.payment_type_id,
                        account_type: cfg.account_type,
                        currency: cfg.currency.clone(),
                    }
                    .run()
                    .await?;
                    // A successful pass resets the backoff
                    jitter.reset();
                }
            }
            .await;
            let err = res.unwrap_err();
            error!(target: "worker", "{err}");

            match err {
                WorkerError::Concurrency => {
                    // This error will not resolve quickly by itself, and it means that another
                    // worker is making progress, so we can back off more aggressively
                    tokio::time::sleep(Duration::from_secs(15)).await;
                    skip_notifications = false;
                }
                WorkerError::Api(ApiErr {
                    ctx: _,
                    err: CyclosErr::Input(InputError::Validation { .. }),
                }) => {
                    // On a validation failure we do not want to retry right away, as it could
                    // DoS the service
                    skip_notifications = false;
                }
                WorkerError::Api(_) | WorkerError::Db(_) | WorkerError::Injected(_) => {
                    // Retry a pass right after the backoff, without waiting for a trigger
                    skip_notifications = true;
                }
            }
            tokio::time::sleep(jitter.backoff()).await;
        }
    };
    join!(watcher, worker); // TODO try_join
    Ok(())
}
    169 
/// State needed to run a worker pass.
pub struct Worker<'a> {
    /// Cyclos API client used to read the history, fetch transfers, send
    /// payments and issue chargebacks
    pub client: &'a Client<'a>,
    /// Database connection used for every query of the pass
    pub db: &'a mut PgConnection,
    /// Currency taken from the worker configuration
    // NOTE(review): not read by any method visible in this file, confirm usage
    pub currency: Currency,
    /// Account type id passed to the history request
    pub account_type_id: i64,
    /// Payment type id passed when sending a direct payment
    pub payment_type_id: i64,
    /// Selects how incoming and outgoing transactions are ingested
    pub account_type: AccountType,
}
    178 
    179 impl Worker<'_> {
    180     /// Run a single worker pass
    181     pub async fn run(&mut self) -> WorkerResult {
    182         // Some worker operations are not idempotent, therefore it's not safe to have multiple worker
    183         // running concurrently. We use a global Postgres advisory lock to prevent it.
    184         if !db::worker_lock(self.db).await? {
    185             return Err(WorkerError::Concurrency);
    186         };
    187 
    188         // Sync transactions
    189         let mut cursor: Timestamp = kv_get(&mut *self.db, SYNC_CURSOR_KEY)
    190             .await?
    191             .unwrap_or_default();
    192 
    193         loop {
    194             let page = self
    195                 .client
    196                 .history(self.account_type_id, OrderBy::DateAsc, 0, Some(cursor))
    197                 .await?;
    198             for transfer in page.page {
    199                 if transfer.date > cursor {
    200                     cursor = transfer.date;
    201                 }
    202                 let tx = extract_tx_info(transfer);
    203                 match tx {
    204                     Tx::In(tx_in) => self.ingest_in(tx_in).await?,
    205                     Tx::Out(tx_out) => self.ingest_out(tx_out).await?,
    206                 }
    207             }
    208 
    209             kv_set(&mut *self.db, SYNC_CURSOR_KEY, &cursor).await?;
    210 
    211             if !page.has_next_page {
    212                 break;
    213             }
    214         }
    215 
    216         // Send transactions
    217         let start = Timestamp::now();
    218         loop {
    219             let batch = db::pending_batch(&mut *self.db, &start).await?;
    220             if batch.is_empty() {
    221                 break;
    222             }
    223             for initiated in batch {
    224                 debug!(target: "worker", "send tx {initiated}");
    225                 let res = self
    226                     .client
    227                     .direct_payment(
    228                         initiated.creditor_id,
    229                         self.payment_type_id,
    230                         initiated.amount,
    231                         &initiated.subject,
    232                     )
    233                     .await;
    234                 fail_point("direct-payment")?;
    235                 match res {
    236                     Ok(tx) => {
    237                         // Update transaction status, on failure the initiated transaction will be orphan
    238                         db::initiated_submit_success(
    239                             &mut *self.db,
    240                             initiated.id,
    241                             &tx.date,
    242                             tx.id.0,
    243                         )
    244                         .await?;
    245                         trace!(target: "worker", "init tx {}", tx.id);
    246                     }
    247                     Err(e) => {
    248                         let msg = match e.err {
    249                             CyclosErr::Unknown(NotFoundError { entity_type, key }) => {
    250                                 format!("unknown {entity_type} {key}")
    251                             }
    252                             CyclosErr::Forbidden(err) => err.to_string(),
    253                             _ => return Err(e.into()),
    254                         };
    255                         // TODO is permission should be considered are hard or soft failure ?
    256                         db::initiated_submit_permanent_failure(&mut *self.db, initiated.id, &msg)
    257                             .await?;
    258                         error!(target: "worker", "initiated failure {initiated}: {msg}");
    259                     }
    260                 }
    261             }
    262         }
    263         Ok(())
    264     }
    265 
    266     /// Ingest an incoming transaction
    267     async fn ingest_in(&mut self, tx: TxIn) -> WorkerResult {
    268         match self.account_type {
    269             AccountType::Exchange => {
    270                 let transfer = self.client.transfer(tx.transfer_id).await?;
    271                 let bounce = async |db: &mut PgConnection,
    272                                     reason: &str|
    273                        -> Result<(), WorkerError> {
    274                     // Fetch existing transaction
    275                     if let Some(chargeback) = transfer.charged_back_by {
    276                         let res = db::register_bounced_tx_in(
    277                             db,
    278                             &tx,
    279                             *chargeback.id,
    280                             reason,
    281                             &Timestamp::now(),
    282                         )
    283                         .await?;
    284                         if res.tx_new {
    285                             info!(target: "worker",
    286                                 "in {tx} bounced (recovered) in {}: {reason}", chargeback.id
    287                             );
    288                         } else {
    289                             trace!(target: "worker",
    290                                 "in {tx} already seen and bounced in {}: {reason}",chargeback.id
    291                             );
    292                         }
    293                     } else if !transfer.can_chargeback {
    294                         match db::register_tx_in(db, &tx, &None, &Timestamp::now()).await? {
    295                             AddIncomingResult::Success { new, .. } => {
    296                                 if new {
    297                                     warn!(target: "worker", "in {tx} cannot bounce: {reason}");
    298                                 } else {
    299                                     trace!(target: "worker", "in {tx} already seen and cannot bounce ");
    300                                 }
    301                             }
    302                             AddIncomingResult::ReservePubReuse => unreachable!(),
    303                         }
    304                     } else {
    305                         let chargeback_id = self.client.chargeback(*transfer.id).await?;
    306                         fail_point("chargeback")?;
    307                         let res = db::register_bounced_tx_in(
    308                             db,
    309                             &tx,
    310                             chargeback_id,
    311                             reason,
    312                             &Timestamp::now(),
    313                         )
    314                         .await?;
    315                         if res.tx_new {
    316                             info!(target: "worker", "in {tx} bounced in {chargeback_id}: {reason}");
    317                         } else {
    318                             trace!(target: "worker", "in {tx} already seen and bounced in {chargeback_id}: {reason}");
    319                         }
    320                     }
    321                     Ok(())
    322                 };
    323                 if let Some(chargeback) = transfer.chargeback_of {
    324                     // This a chargeback of one of our transaction, if we bounce we might enter a loop
    325                     match db::initiated_chargeback_failure(&mut *self.db, *chargeback.id).await? {
    326                         ChargebackFailureResult::Unknown => {
    327                             trace!(target: "worker", "initiated failure unknown: charged back")
    328                         }
    329                         ChargebackFailureResult::Known(initiated) => {
    330                             error!(target: "worker", "initiated failure {initiated}: charged back")
    331                         }
    332                         ChargebackFailureResult::Idempotent(initiated) => {
    333                             trace!(target: "worker", "initiated failure {initiated} already seen: charged back")
    334                         }
    335                     }
    336                     // Sill register the incoming transaction as an incoming one
    337                     match db::register_tx_in(self.db, &tx, &None, &Timestamp::now()).await? {
    338                         AddIncomingResult::Success { new, .. } => {
    339                             if new {
    340                                 info!(target: "worker", "in {tx} chargeback");
    341                             } else {
    342                                 trace!(target: "worker", "in {tx} chargeback already seen");
    343                             }
    344                         }
    345                         AddIncomingResult::ReservePubReuse => unreachable!(),
    346                     }
    347 
    348                     return Ok(());
    349                 }
    350                 match parse_incoming_unstructured(&tx.subject) {
    351                     Ok(None) => bounce(self.db, "missing public key").await?,
    352                     Ok(Some(subject)) => {
    353                         match db::register_tx_in(self.db, &tx, &Some(subject), &Timestamp::now())
    354                             .await?
    355                         {
    356                             AddIncomingResult::Success { new, .. } => {
    357                                 if new {
    358                                     info!(target: "worker", "in {tx}");
    359                                 } else {
    360                                     trace!(target: "worker", "in {tx} already seen");
    361                                 }
    362                             }
    363                             AddIncomingResult::ReservePubReuse => {
    364                                 bounce(self.db, "reserve pub reuse").await?
    365                             }
    366                         }
    367                     }
    368                     Err(e) => bounce(self.db, &e.to_string()).await?,
    369                 }
    370             }
    371             AccountType::Normal => {
    372                 match db::register_tx_in(self.db, &tx, &None, &Timestamp::now()).await? {
    373                     AddIncomingResult::Success { new, .. } => {
    374                         if new {
    375                             info!(target: "worker", "in {tx}");
    376                         } else {
    377                             trace!(target: "worker", "in {tx} already seen");
    378                         }
    379                     }
    380                     AddIncomingResult::ReservePubReuse => unreachable!(),
    381                 }
    382             }
    383         }
    384         Ok(())
    385     }
    386 
    387     async fn ingest_out(&mut self, tx: TxOut) -> WorkerResult {
    388         match self.account_type {
    389             AccountType::Exchange => {
    390                 let transfer = self.client.transfer(tx.transfer_id).await?;
    391 
    392                 if transfer.charged_back_by.is_some() {
    393                     match db::initiated_chargeback_failure(&mut *self.db, *transfer.id).await? {
    394                         ChargebackFailureResult::Unknown => {
    395                             trace!(target: "worker", "initiated failure unknown: charged back")
    396                         }
    397                         ChargebackFailureResult::Known(initiated) => {
    398                             error!(target: "worker", "initiated failure {initiated}: charged back")
    399                         }
    400                         ChargebackFailureResult::Idempotent(initiated) => {
    401                             trace!(target: "worker", "initiated failure {initiated} already seen: charged back")
    402                         }
    403                     }
    404                 }
    405 
    406                 let kind = if let Ok(subject) = subject::parse_outgoing(&tx.subject) {
    407                     TxOutKind::Talerable(subject)
    408                 } else if let Some(chargeback) = &transfer.chargeback_of {
    409                     TxOutKind::Bounce(*chargeback.id)
    410                 } else {
    411                     TxOutKind::Simple
    412                 };
    413 
    414                 let res = db::register_tx_out(self.db, &tx, &kind, &Timestamp::now()).await?;
    415                 match res.result {
    416                     RegisterResult::idempotent => match kind {
    417                         TxOutKind::Simple => {
    418                             trace!(target: "worker", "out malformed {tx} already seen")
    419                         }
    420                         TxOutKind::Bounce(_) => {
    421                             trace!(target: "worker", "out bounce {tx} already seen")
    422                         }
    423                         TxOutKind::Talerable(_) => {
    424                             trace!(target: "worker", "out {tx} already seen")
    425                         }
    426                     },
    427                     RegisterResult::known => match kind {
    428                         TxOutKind::Simple => {
    429                             warn!(target: "worker", "out malformed {tx}")
    430                         }
    431                         TxOutKind::Bounce(_) => {
    432                             info!(target: "worker", "out bounce {tx}")
    433                         }
    434                         TxOutKind::Talerable(_) => {
    435                             info!(target: "worker", "out {tx}")
    436                         }
    437                     },
    438                     RegisterResult::recovered => match kind {
    439                         TxOutKind::Simple => {
    440                             warn!(target: "worker", "out malformed (recovered) {tx}")
    441                         }
    442                         TxOutKind::Bounce(_) => {
    443                             warn!(target: "worker", "out bounce (recovered) {tx}")
    444                         }
    445                         TxOutKind::Talerable(_) => {
    446                             warn!(target: "worker", "out (recovered) {tx}")
    447                         }
    448                     },
    449                 }
    450             }
    451             AccountType::Normal => {
    452                 let res = db::register_tx_out(self.db, &tx, &TxOutKind::Simple, &Timestamp::now())
    453                     .await?;
    454                 match res.result {
    455                     RegisterResult::idempotent => {
    456                         trace!(target: "worker", "out {tx} already seen");
    457                     }
    458                     RegisterResult::known => {
    459                         info!(target: "worker", "out {tx}");
    460                     }
    461                     RegisterResult::recovered => {
    462                         warn!(target: "worker", "out (recovered) {tx}");
    463                     }
    464                 }
    465             }
    466         }
    467         Ok(())
    468     }
    469 }
    470 
/// A history item classified by direction.
pub enum Tx {
    /// Incoming transaction
    In(TxIn),
    /// Outgoing transaction
    Out(TxOut),
}
    475 
    476 pub fn extract_tx_info(tx: HistoryItem) -> Tx {
    477     let amount = amount::decimal(tx.amount.trim_start_matches('-'));
    478     let (id, name) = match tx.related_account.kind {
    479         AccountKind::System => (tx.related_account.ty.id, tx.related_account.ty.name),
    480         AccountKind::User { user } => (user.id, user.display),
    481     };
    482     if tx.amount.starts_with("-") {
    483         Tx::Out(TxOut {
    484             transfer_id: *tx.id,
    485             tx_id: tx.transaction.map(|it| *it.id),
    486             amount,
    487             subject: tx.description.unwrap_or_default(),
    488             creditor_id: *id,
    489             creditor_name: name,
    490             valued_at: tx.date,
    491         })
    492     } else {
    493         Tx::In(TxIn {
    494             transfer_id: *tx.id,
    495             tx_id: tx.transaction.map(|it| *it.id),
    496             amount,
    497             subject: tx.description.unwrap_or_default(),
    498             debtor_id: *id,
    499             debtor_name: name,
    500             valued_at: tx.date,
    501         })
    502     }
    503 }