taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

worker.rs (26544B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::{num::ParseIntError, time::Duration};
     18 
     19 use aws_lc_rs::signature::EcdsaKeyPair;
     20 use failure_injection::{InjectedErr, fail_point};
     21 use http_client::ApiErr;
     22 use jiff::{Timestamp, Zoned, civil::Date};
     23 use sqlx::{Acquire as _, PgConnection, PgPool, postgres::PgListener};
     24 use taler_api::subject::{self, parse_incoming_unstructured};
     25 use taler_common::{
     26     ExpoBackoffDecorr,
     27     config::Config,
     28     types::{
     29         amount::{self},
     30         iban::IBAN,
     31     },
     32 };
     33 use tracing::{debug, error, info, trace, warn};
     34 
     35 use crate::{
     36     FullHuPayto, HuIban,
     37     config::{AccountType, WorkerCfg},
     38     db::{self, AddIncomingResult, Initiated, RegisterResult, TxIn, TxOut, TxOutKind},
     39     magnet_api::{
     40         api::MagnetErr,
     41         client::{ApiClient, AuthClient},
     42         types::{Direction, Next, Order, TxDto, TxStatus},
     43     },
     44     setup,
     45 };
     46 
     47 // const TXS_CURSOR_KEY: &str = "txs_cursor"; TODO cursor is broken
     48 
     49 #[derive(Debug, thiserror::Error)]
     50 pub enum WorkerError {
     51     #[error(transparent)]
     52     Db(#[from] sqlx::Error),
     53     #[error(transparent)]
     54     Api(#[from] ApiErr<MagnetErr>),
     55     #[error("Another worker is running concurrently")]
     56     Concurrency,
     57     #[error(transparent)]
     58     Injected(#[from] InjectedErr),
     59 }
     60 
     61 pub type WorkerResult = Result<(), WorkerError>;
     62 
     63 pub async fn run_worker(
     64     cfg: &Config,
     65     pool: &PgPool,
     66     client: &http_client::Client,
     67     transient: bool,
     68 ) -> anyhow::Result<()> {
     69     let cfg = WorkerCfg::parse(cfg)?;
     70     let keys = setup::load(&cfg)?;
     71     let client = AuthClient::new(client, &cfg.api_url, &cfg.consumer).upgrade(&keys.access_token);
     72 
     73     if transient {
     74         let mut conn = pool.acquire().await?;
     75         let account = client.account(cfg.payto.bban()).await?;
     76         Worker {
     77             client: &client,
     78             db: &mut conn,
     79             account_number: &account.number,
     80             account_code: account.code,
     81             key: &keys.signing_key,
     82             account_type: cfg.account_type,
     83             ignore_tx_before: cfg.ignore_tx_before,
     84             ignore_bounces_before: cfg.ignore_bounces_before,
     85         }
     86         .run()
     87         .await?;
     88         return Ok(());
     89     }
     90 
     91     let mut jitter = ExpoBackoffDecorr::default();
     92 
     93     loop {
     94         let res: WorkerResult = async {
     95             let account = client.account(cfg.payto.bban()).await?;
     96             let db = &mut PgListener::connect_with(pool).await?;
     97 
     98             // Listen to all channels
     99             db.listen_all(["transfer"]).await?;
    100 
    101             info!(target: "worker", "running at initialisation");
    102 
    103             loop {
    104                 debug!(target: "worker", "running");
    105                 Worker {
    106                     client: &client,
    107                     db: db.acquire().await?,
    108                     account_number: &account.number,
    109                     account_code: account.code,
    110                     key: &keys.signing_key,
    111                     account_type: cfg.account_type,
    112                     ignore_tx_before: cfg.ignore_tx_before,
    113                     ignore_bounces_before: cfg.ignore_bounces_before,
    114                 }
    115                 .run()
    116                 .await?;
    117                 jitter.reset();
    118 
    119                 // Wait for notifications or sync timeout
    120                 if let Ok(res) = tokio::time::timeout(cfg.frequency, db.try_recv()).await {
    121                     let mut ntf = res?;
    122                     // Conflate all notifications
    123                     while let Some(n) = ntf {
    124                         debug!(target: "worker", "notification from {}", n.channel());
    125                         ntf = db.next_buffered();
    126                     }
    127 
    128                     if ntf.is_some() {
    129                         info!(target: "worker", "running at db trigger");
    130                     } else {
    131                         info!(target: "worker", "running at frequency");
    132                     }
    133                 }
    134             }
    135         }
    136         .await;
    137         let err = res.unwrap_err();
    138         error!(target: "worker", "{err}");
    139 
    140         if matches!(err, WorkerError::Concurrency) {
    141             // This error won't resolve by itself easily and it mean we are actually making progress
    142             // in another worker so we can jitter more aggressively
    143             tokio::time::sleep(Duration::from_secs(15)).await;
    144         }
    145         tokio::time::sleep(jitter.backoff()).await;
    146     }
    147 }
    148 
    149 pub struct Worker<'a> {
    150     pub client: &'a ApiClient<'a>,
    151     pub db: &'a mut PgConnection,
    152     pub account_number: &'a str,
    153     pub account_code: u64,
    154     pub key: &'a EcdsaKeyPair,
    155     pub account_type: AccountType,
    156     pub ignore_tx_before: Option<Date>,
    157     pub ignore_bounces_before: Option<Date>,
    158 }
    159 
    160 impl Worker<'_> {
    161     /// Run a single worker pass
    162     pub async fn run(&mut self) -> WorkerResult {
    163         // Some worker operations are not idempotent, therefore it's not safe to have multiple worker
    164         // running concurrently. We use a global Postgres advisory lock to prevent it.
    165         if !db::worker_lock(self.db).await? {
    166             return Err(WorkerError::Concurrency);
    167         };
    168 
    169         // Sync transactions
    170         let mut next: Option<Next> = None; //kv_get(&mut *self.db, TXS_CURSOR_KEY).await?; TODO cursor logic is broken and cannot be stored & reused
    171         let mut all_final = true;
    172         let mut first = true;
    173         loop {
    174             let page = self
    175                 .client
    176                 .page_tx(
    177                     Direction::Both,
    178                     Order::Ascending,
    179                     100,
    180                     self.account_number,
    181                     &next,
    182                     first,
    183                 )
    184                 .await?;
    185             first = false;
    186             next = page.next;
    187             for item in page.list {
    188                 all_final &= item.tx.status.is_final();
    189                 let tx = extract_tx_info(item.tx);
    190                 match tx {
    191                     Tx::In(tx_in) => {
    192                         // We only register final successful incoming transactions
    193                         if tx_in.status != TxStatus::Completed {
    194                             debug!(target: "worker", "pending or failed in {tx_in}");
    195                             continue;
    196                         }
    197 
    198                         if let Some(before) = self.ignore_tx_before
    199                             && tx_in.value_date < before
    200                         {
    201                             debug!(target: "worker", "ignore in {tx_in}");
    202                             continue;
    203                         }
    204                         let bounce = async |db: &mut PgConnection,
    205                                             reason: &str|
    206                                -> Result<(), WorkerError> {
    207                             if let Some(before) = self.ignore_bounces_before
    208                                 && tx_in.value_date < before
    209                             {
    210                                 match db::register_tx_in(db, &tx_in, &None, &Timestamp::now())
    211                                     .await?
    212                                 {
    213                                     AddIncomingResult::Success { new, .. } => {
    214                                         if new {
    215                                             info!(target: "worker", "in  {tx_in} skip bounce: {reason}");
    216                                         } else {
    217                                             trace!(target: "worker", "in  {tx_in} already skip bounce ");
    218                                         }
    219                                     }
    220                                     AddIncomingResult::ReservePubReuse => unreachable!(),
    221                                 }
    222                             } else {
    223                                 let res = db::register_bounce_tx_in(
    224                                     db,
    225                                     &tx_in,
    226                                     &tx_in.amount,
    227                                     reason,
    228                                     &Timestamp::now(),
    229                                 )
    230                                 .await?;
    231 
    232                                 if res.tx_new {
    233                                     info!(target: "worker",
    234                                         "in  {tx_in} bounced in {}: {reason}",
    235                                         res.bounce_id
    236                                     );
    237                                 } else {
    238                                     trace!(target: "worker",
    239                                         "in  {tx_in} already seen and bounced in {}: {reason}",
    240                                         res.bounce_id
    241                                     );
    242                                 }
    243                             }
    244                             Ok(())
    245                         };
    246                         match self.account_type {
    247                             AccountType::Exchange => {
    248                                 match parse_incoming_unstructured(&tx_in.subject) {
    249                                     Ok(None) => bounce(self.db, "missing public key").await?,
    250                                     Ok(Some(subject)) => match db::register_tx_in(
    251                                         self.db,
    252                                         &tx_in,
    253                                         &Some(subject),
    254                                         &Timestamp::now(),
    255                                     )
    256                                     .await?
    257                                     {
    258                                         AddIncomingResult::Success { new, .. } => {
    259                                             if new {
    260                                                 info!(target: "worker", "in  {tx_in}");
    261                                             } else {
    262                                                 trace!(target: "worker", "in  {tx_in} already seen");
    263                                             }
    264                                         }
    265                                         AddIncomingResult::ReservePubReuse => {
    266                                             bounce(self.db, "reserve pub reuse").await?
    267                                         }
    268                                     },
    269                                     Err(e) => bounce(self.db, &e.to_string()).await?,
    270                                 }
    271                             }
    272                             AccountType::Normal => {
    273                                 match db::register_tx_in(self.db, &tx_in, &None, &Timestamp::now())
    274                                     .await?
    275                                 {
    276                                     AddIncomingResult::Success { new, .. } => {
    277                                         if new {
    278                                             info!(target: "worker", "in  {tx_in}");
    279                                         } else {
    280                                             trace!(target: "worker", "in  {tx_in} already seen");
    281                                         }
    282                                     }
    283                                     AddIncomingResult::ReservePubReuse => unreachable!(),
    284                                 }
    285                             }
    286                         }
    287                     }
    288                     Tx::Out(tx_out) => {
    289                         match tx_out.status {
    290                             TxStatus::ToBeRecorded => {
    291                                 self.recover_tx(&tx_out).await?;
    292                                 continue;
    293                             }
    294                             TxStatus::PendingFirstSignature
    295                             | TxStatus::PendingSecondSignature
    296                             | TxStatus::PendingProcessing
    297                             | TxStatus::Verified
    298                             | TxStatus::PartiallyCompleted
    299                             | TxStatus::UnderReview => {
    300                                 // Still pending
    301                                 debug!(target: "worker", "pending out {tx_out}");
    302                                 continue;
    303                             }
    304                             TxStatus::Rejected | TxStatus::Canceled | TxStatus::Completed => {}
    305                         }
    306                         match self.account_type {
    307                             AccountType::Exchange => {
    308                                 let kind = if let Ok(subject) =
    309                                     subject::parse_outgoing(&tx_out.subject)
    310                                 {
    311                                     TxOutKind::Talerable(subject)
    312                                 } else if let Ok(bounced) = parse_bounce_outgoing(&tx_out.subject) {
    313                                     TxOutKind::Bounce(bounced)
    314                                 } else {
    315                                     TxOutKind::Simple
    316                                 };
    317                                 if tx_out.status == TxStatus::Completed {
    318                                     let res = db::register_tx_out(
    319                                         self.db,
    320                                         &tx_out,
    321                                         &kind,
    322                                         &Timestamp::now(),
    323                                     )
    324                                     .await?;
    325                                     match res.result {
    326                                         RegisterResult::idempotent => match kind {
    327                                             TxOutKind::Simple => {
    328                                                 trace!(target: "worker", "out malformed {tx_out} already seen")
    329                                             }
    330                                             TxOutKind::Bounce(_) => {
    331                                                 trace!(target: "worker", "out bounce {tx_out} already seen")
    332                                             }
    333                                             TxOutKind::Talerable(_) => {
    334                                                 trace!(target: "worker", "out {tx_out} already seen")
    335                                             }
    336                                         },
    337                                         RegisterResult::known => match kind {
    338                                             TxOutKind::Simple => {
    339                                                 warn!(target: "worker", "out malformed {tx_out}")
    340                                             }
    341                                             TxOutKind::Bounce(_) => {
    342                                                 info!(target: "worker", "out bounce {tx_out}")
    343                                             }
    344                                             TxOutKind::Talerable(_) => {
    345                                                 info!(target: "worker", "out {tx_out}")
    346                                             }
    347                                         },
    348                                         RegisterResult::recovered => match kind {
    349                                             TxOutKind::Simple => {
    350                                                 warn!(target: "worker", "out malformed (recovered) {tx_out}")
    351                                             }
    352                                             TxOutKind::Bounce(_) => {
    353                                                 warn!(target: "worker", "out bounce (recovered) {tx_out}")
    354                                             }
    355                                             TxOutKind::Talerable(_) => {
    356                                                 warn!(target: "worker", "out (recovered) {tx_out}")
    357                                             }
    358                                         },
    359                                     }
    360                                 } else {
    361                                     let bounced = match kind {
    362                                         TxOutKind::Simple => None,
    363                                         TxOutKind::Bounce(bounced) => Some(bounced),
    364                                         TxOutKind::Talerable(_) => None,
    365                                     };
    366                                     let res = db::register_tx_out_failure(
    367                                         self.db,
    368                                         tx_out.code,
    369                                         bounced,
    370                                         &Timestamp::now(),
    371                                     )
    372                                     .await?;
    373                                     if let Some(id) = res.initiated_id {
    374                                         if res.new {
    375                                             error!(target: "worker", "out failure {id} {tx_out}");
    376                                         } else {
    377                                             trace!(target: "worker", "out failure {id} {tx_out} already seen");
    378                                         }
    379                                     }
    380                                 }
    381                             }
    382                             AccountType::Normal => {
    383                                 if tx_out.status == TxStatus::Completed {
    384                                     let res = db::register_tx_out(
    385                                         self.db,
    386                                         &tx_out,
    387                                         &TxOutKind::Simple,
    388                                         &Timestamp::now(),
    389                                     )
    390                                     .await?;
    391                                     match res.result {
    392                                         RegisterResult::idempotent => {
    393                                             trace!(target: "worker", "out {tx_out} already seen");
    394                                         }
    395                                         RegisterResult::known => {
    396                                             info!(target: "worker", "out {tx_out}");
    397                                         }
    398                                         RegisterResult::recovered => {
    399                                             warn!(target: "worker", "out (recovered) {tx_out}");
    400                                         }
    401                                     }
    402                                 } else {
    403                                     let res = db::register_tx_out_failure(
    404                                         self.db,
    405                                         tx_out.code,
    406                                         None,
    407                                         &Timestamp::now(),
    408                                     )
    409                                     .await?;
    410                                     if let Some(id) = res.initiated_id {
    411                                         if res.new {
    412                                             error!(target: "worker", "out failure {id} {tx_out}");
    413                                         } else {
    414                                             trace!(target: "worker", "out failure {id} {tx_out} already seen");
    415                                         }
    416                                     }
    417                                 }
    418                             }
    419                         }
    420                     }
    421                 }
    422             }
    423 
    424             if let Some(_next) = &next {
    425                 // Update in db cursor only if all previous transactions where final
    426                 if all_final {
    427                     // debug!(target: "worker", "advance cursor {next:?}");
    428                     // kv_set(&mut *self.db, TXS_CURSOR_KEY, &next).await?; TODO cursor is broken
    429                 }
    430             } else {
    431                 break;
    432             }
    433         }
    434 
    435         // Send transactions
    436         let start = Timestamp::now();
    437         let now = Zoned::now();
    438         loop {
    439             let batch = db::pending_batch(&mut *self.db, &start).await?;
    440             if batch.is_empty() {
    441                 break;
    442             }
    443             for tx in batch {
    444                 debug!(target: "worker", "send tx {tx}");
    445                 self.init_tx(&tx, &now).await?;
    446             }
    447         }
    448         Ok(())
    449     }
    450 
    451     /// Try to sign an unsigned initiated transaction
    452     pub async fn recover_tx(&mut self, tx: &TxOut) -> WorkerResult {
    453         if db::initiated_exists_for_code(&mut *self.db, tx.code)
    454             .await?
    455             .is_some()
    456         {
    457             // Known initiated we submit it
    458             assert_eq!(tx.amount.frac, 0);
    459             self.submit_tx(
    460                 tx.code,
    461                 -(tx.amount.val as f64),
    462                 &tx.value_date,
    463                 tx.creditor.bban(),
    464             )
    465             .await?;
    466         } else {
    467             // The transaction is unknown (we failed after creating it and before storing it in the db)
    468             // we delete it
    469             self.client.delete_tx(tx.code).await?;
    470             debug!(target: "worker", "out {}: delete uncompleted orphan", tx.code);
    471         }
    472 
    473         Ok(())
    474     }
    475 
    476     /// Create and sign a forint transfer
    477     pub async fn init_tx(&mut self, tx: &Initiated, now: &Zoned) -> WorkerResult {
    478         trace!(target: "worker", "init tx {tx}");
    479         assert_eq!(tx.amount.frac, 0);
    480         let date = now.date();
    481         // Initialize the new transaction, on failure an orphan initiated transaction can be created
    482         let res = self
    483             .client
    484             .init_tx(
    485                 self.account_code,
    486                 tx.amount.val as f64,
    487                 &tx.subject,
    488                 &date,
    489                 &tx.creditor.name,
    490                 tx.creditor.bban(),
    491             )
    492             .await;
    493         fail_point("init-tx")?;
    494         let info = match res {
    495             // Check if succeeded
    496             Ok(info) => {
    497                 // Update transaction status, on failure the initiated transaction will be orphan
    498                 db::initiated_submit_success(&mut *self.db, tx.id, &Timestamp::now(), info.code)
    499                     .await?;
    500                 info
    501             }
    502             Err(e) => {
    503                 if let MagnetErr::Magnet(e) = &e.err {
    504                     // Check if error is permanent
    505                     if matches!(
    506                         (e.error_code, e.short_message.as_str()),
    507                         (404, "BSZLA_NEM_TALALHATO") // Unknown account
    508                          | (409, "FORRAS_SZAMLA_ESZAMLA_EGYEZIK") // Same account
    509                     ) {
    510                         db::initiated_submit_permanent_failure(
    511                             &mut *self.db,
    512                             tx.id,
    513                             &Timestamp::now(),
    514                             &e.to_string(),
    515                         )
    516                         .await?;
    517                         error!(target: "worker", "initiated failure {tx}: {e}");
    518                         return WorkerResult::Ok(());
    519                     }
    520                 }
    521                 return Err(e.into());
    522             }
    523         };
    524         trace!(target: "worker", "init tx {}", info.code);
    525 
    526         // Sign transaction
    527         self.submit_tx(info.code, info.amount, &date, tx.creditor.bban())
    528             .await?;
    529         Ok(())
    530     }
    531 
    532     /** Submit an initiated forint transfer */
    533     pub async fn submit_tx(
    534         &mut self,
    535         tx_code: u64,
    536         amount: f64,
    537         date: &Date,
    538         creditor: &str,
    539     ) -> WorkerResult {
    540         debug!(target: "worker", "submit tx {tx_code}");
    541         fail_point("submit-tx")?;
    542         // Submit an initiated transaction, on failure we will retry
    543         match self
    544             .client
    545             .submit_tx(
    546                 self.key,
    547                 self.account_number,
    548                 tx_code,
    549                 amount,
    550                 date,
    551                 creditor,
    552             )
    553             .await
    554         {
    555             Ok(_) => Ok(()),
    556             Err(e) => {
    557                 if let MagnetErr::Magnet(e) = &e.err {
    558                     // Check if soft failure
    559                     if matches!(
    560                         (e.error_code, e.short_message.as_str()),
    561                         (409, "TRANZAKCIO_ROSSZ_STATUS") // Already summited or cannot be signed
    562                     ) {
    563                         warn!(target: "worker", "submit tx {tx_code}: {e}");
    564                         return Ok(());
    565                     }
    566                 }
    567                 Err(e.into())
    568             }
    569         }
    570     }
    571 }
    572 
    573 pub enum Tx {
    574     In(TxIn),
    575     Out(TxOut),
    576 }
    577 
    578 pub fn extract_tx_info(tx: TxDto) -> Tx {
    579     // TODO amount from f64 without allocations
    580     let amount = amount::amount(format!("{}:{}", tx.currency, tx.amount.abs()));
    581     // TODO we should support non hungarian account and error handling
    582     let iban = if tx.counter_account.starts_with("HU") {
    583         let iban: IBAN = tx.counter_account.parse().unwrap();
    584         HuIban::try_from(iban).unwrap()
    585     } else {
    586         HuIban::from_bban(&tx.counter_account).unwrap()
    587     };
    588     let counter_account = FullHuPayto::new(iban, &tx.counter_name);
    589     if tx.amount.is_sign_positive() {
    590         Tx::In(TxIn {
    591             code: tx.code,
    592             amount,
    593             subject: tx.subject.unwrap_or_default(),
    594             debtor: counter_account,
    595             value_date: tx.value_date,
    596             status: tx.status,
    597         })
    598     } else {
    599         Tx::Out(TxOut {
    600             code: tx.code,
    601             amount,
    602             subject: tx.subject.unwrap_or_default(),
    603             creditor: counter_account,
    604             value_date: tx.value_date,
    605             status: tx.status,
    606         })
    607     }
    608 }
    609 
    610 #[derive(Debug, thiserror::Error)]
    611 pub enum BounceSubjectErr {
    612     #[error("missing parts")]
    613     MissingParts,
    614     #[error("not a bounce")]
    615     NotBounce,
    616     #[error("malformed bounced id: {0}")]
    617     Id(#[from] ParseIntError),
    618 }
    619 
    620 pub fn parse_bounce_outgoing(subject: &str) -> Result<u32, BounceSubjectErr> {
    621     let (prefix, id) = subject
    622         .rsplit_once(" ")
    623         .ok_or(BounceSubjectErr::MissingParts)?;
    624     if !prefix.starts_with("bounce") {
    625         return Err(BounceSubjectErr::NotBounce);
    626     }
    627     let id: u32 = id.parse()?;
    628     Ok(id)
    629 }