taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

worker.rs (23066B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::num::ParseIntError;
     18 
     19 use jiff::{Timestamp, Zoned, civil::Date};
     20 use p256::ecdsa::SigningKey;
     21 use sqlx::PgConnection;
     22 use taler_api::subject::{self, parse_incoming_unstructured};
     23 use taler_common::types::{
     24     amount::{self},
     25     iban::IBAN,
     26 };
     27 use tracing::{debug, error, info, trace, warn};
     28 
     29 use crate::{
     30     FullHuPayto, HuIban,
     31     config::AccountType,
     32     db::{self, AddIncomingResult, Initiated, RegisterResult, TxIn, TxOut, TxOutKind},
     33     failure_injection::{InjectedErr, fail_point},
     34     magnet_api::{
     35         api::{ApiErr, ErrKind},
     36         client::ApiClient,
     37         types::{Direction, Next, Order, TxDto, TxStatus},
     38     },
     39 };
     40 
     41 // const TXS_CURSOR_KEY: &str = "txs_cursor"; TODO cursor is broken
     42 
     43 #[derive(Debug, thiserror::Error)]
     44 pub enum WorkerError {
     45     #[error(transparent)]
     46     Db(#[from] sqlx::Error),
     47     #[error(transparent)]
     48     Api(#[from] ApiErr),
     49     #[error(transparent)]
     50     Injected(#[from] InjectedErr),
     51 }
     52 
     53 pub type WorkerResult = Result<(), WorkerError>;
     54 
     55 pub struct Worker<'a> {
     56     pub client: &'a ApiClient<'a>,
     57     pub db: &'a mut PgConnection,
     58     pub account_number: &'a str,
     59     pub account_code: u64,
     60     pub key: &'a SigningKey,
     61     pub account_type: AccountType,
     62     pub ignore_tx_before: Option<Date>,
     63     pub ignore_bounces_before: Option<Date>,
     64 }
     65 
     66 impl Worker<'_> {
     67     /// Run a single worker pass
     68     pub async fn run(&mut self) -> WorkerResult {
     69         // Sync transactions
     70         let mut next: Option<Next> = None; //kv_get(&mut *self.db, TXS_CURSOR_KEY).await?; TODO cursor logic is broken and cannot be stored & reused
     71         let mut all_final = true;
     72         let mut first = true;
     73         loop {
     74             let page = self
     75                 .client
     76                 .page_tx(
     77                     Direction::Both,
     78                     Order::Ascending,
     79                     100,
     80                     self.account_number,
     81                     &next,
     82                     first,
     83                 )
     84                 .await?;
     85             first = false;
     86             next = page.next;
     87             for item in page.list {
     88                 all_final &= item.tx.status.is_final();
     89                 let tx = extract_tx_info(item.tx);
     90                 match tx {
     91                     Tx::In(tx_in) => {
     92                         // We only register final successful incoming transactions
     93                         if tx_in.status != TxStatus::Completed {
     94                             debug!(target: "worker", "pending or failed in {tx_in}");
     95                             continue;
     96                         }
     97 
     98                         if let Some(before) = self.ignore_tx_before
     99                             && tx_in.value_date < before
    100                         {
    101                             debug!(target: "worker", "ignore in {tx_in}");
    102                             continue;
    103                         }
    104                         let bounce = async |db: &mut PgConnection,
    105                                             reason: &str|
    106                                -> Result<(), WorkerError> {
    107                             if let Some(before) = self.ignore_bounces_before
    108                                 && tx_in.value_date < before
    109                             {
    110                                 match db::register_tx_in(db, &tx_in, &None, &Timestamp::now())
    111                                     .await?
    112                                 {
    113                                     AddIncomingResult::Success { new, .. } => {
    114                                         if new {
    115                                             info!(target: "worker", "in  {tx_in} skip bounce: {reason}");
    116                                         } else {
    117                                             trace!(target: "worker", "in  {tx_in} already skip bounce ");
    118                                         }
    119                                     }
    120                                     AddIncomingResult::ReservePubReuse => unreachable!(),
    121                                 }
    122                             } else {
    123                                 let res = db::register_bounce_tx_in(
    124                                     db,
    125                                     &tx_in,
    126                                     &tx_in.amount,
    127                                     reason,
    128                                     &Timestamp::now(),
    129                                 )
    130                                 .await?;
    131 
    132                                 if res.tx_new {
    133                                     info!(target: "worker",
    134                                         "in  {tx_in} bounced in {}: {reason}",
    135                                         res.bounce_id
    136                                     );
    137                                 } else {
    138                                     trace!(target: "worker",
    139                                         "in  {tx_in} already seen and bounced in {}: {reason}",
    140                                         res.bounce_id
    141                                     );
    142                                 }
    143                             }
    144                             Ok(())
    145                         };
    146                         match self.account_type {
    147                             AccountType::Exchange => {
    148                                 match parse_incoming_unstructured(&tx_in.subject) {
    149                                     Ok(None) => bounce(self.db, "missing public key").await?,
    150                                     Ok(Some(subject)) => match db::register_tx_in(
    151                                         self.db,
    152                                         &tx_in,
    153                                         &Some(subject),
    154                                         &Timestamp::now(),
    155                                     )
    156                                     .await?
    157                                     {
    158                                         AddIncomingResult::Success { new, .. } => {
    159                                             if new {
    160                                                 info!(target: "worker", "in  {tx_in}");
    161                                             } else {
    162                                                 trace!(target: "worker", "in  {tx_in} already seen");
    163                                             }
    164                                         }
    165                                         AddIncomingResult::ReservePubReuse => {
    166                                             bounce(self.db, "reserve pub reuse").await?
    167                                         }
    168                                     },
    169                                     Err(e) => bounce(self.db, &e.to_string()).await?,
    170                                 }
    171                             }
    172                             AccountType::Normal => {
    173                                 match db::register_tx_in(self.db, &tx_in, &None, &Timestamp::now())
    174                                     .await?
    175                                 {
    176                                     AddIncomingResult::Success { new, .. } => {
    177                                         if new {
    178                                             info!(target: "worker", "in  {tx_in}");
    179                                         } else {
    180                                             trace!(target: "worker", "in  {tx_in} already seen");
    181                                         }
    182                                     }
    183                                     AddIncomingResult::ReservePubReuse => unreachable!(),
    184                                 }
    185                             }
    186                         }
    187                     }
    188                     Tx::Out(tx_out) => {
    189                         match tx_out.status {
    190                             TxStatus::ToBeRecorded => {
    191                                 self.recover_tx(&tx_out).await?;
    192                                 continue;
    193                             }
    194                             TxStatus::PendingFirstSignature
    195                             | TxStatus::PendingSecondSignature
    196                             | TxStatus::PendingProcessing
    197                             | TxStatus::Verified
    198                             | TxStatus::PartiallyCompleted
    199                             | TxStatus::UnderReview => {
    200                                 // Still pending
    201                                 debug!(target: "worker", "pending out {tx_out}");
    202                                 continue;
    203                             }
    204                             TxStatus::Rejected | TxStatus::Canceled | TxStatus::Completed => {}
    205                         }
    206                         match self.account_type {
    207                             AccountType::Exchange => {
    208                                 let kind = if let Ok(subject) =
    209                                     subject::parse_outgoing(&tx_out.subject)
    210                                 {
    211                                     TxOutKind::Talerable(subject)
    212                                 } else if let Ok(bounced) = parse_bounce_outgoing(&tx_out.subject) {
    213                                     TxOutKind::Bounce(bounced)
    214                                 } else {
    215                                     TxOutKind::Simple
    216                                 };
    217                                 if tx_out.status == TxStatus::Completed {
    218                                     let res = db::register_tx_out(
    219                                         self.db,
    220                                         &tx_out,
    221                                         &kind,
    222                                         &Timestamp::now(),
    223                                     )
    224                                     .await?;
    225                                     match res.result {
    226                                         RegisterResult::idempotent => match kind {
    227                                             TxOutKind::Simple => {
    228                                                 trace!(target: "worker", "out malformed {tx_out} already seen")
    229                                             }
    230                                             TxOutKind::Bounce(_) => {
    231                                                 trace!(target: "worker", "out bounce {tx_out} already seen")
    232                                             }
    233                                             TxOutKind::Talerable(_) => {
    234                                                 trace!(target: "worker", "out {tx_out} already seen")
    235                                             }
    236                                         },
    237                                         RegisterResult::known => match kind {
    238                                             TxOutKind::Simple => {
    239                                                 warn!(target: "worker", "out malformed {tx_out}")
    240                                             }
    241                                             TxOutKind::Bounce(_) => {
    242                                                 info!(target: "worker", "out bounce {tx_out}")
    243                                             }
    244                                             TxOutKind::Talerable(_) => {
    245                                                 info!(target: "worker", "out {tx_out}")
    246                                             }
    247                                         },
    248                                         RegisterResult::recovered => match kind {
    249                                             TxOutKind::Simple => {
    250                                                 warn!(target: "worker", "out malformed (recovered) {tx_out}")
    251                                             }
    252                                             TxOutKind::Bounce(_) => {
    253                                                 warn!(target: "worker", "out bounce (recovered) {tx_out}")
    254                                             }
    255                                             TxOutKind::Talerable(_) => {
    256                                                 warn!(target: "worker", "out (recovered) {tx_out}")
    257                                             }
    258                                         },
    259                                     }
    260                                 } else {
    261                                     let bounced = match kind {
    262                                         TxOutKind::Simple => None,
    263                                         TxOutKind::Bounce(bounced) => Some(bounced),
    264                                         TxOutKind::Talerable(_) => None,
    265                                     };
    266                                     let res = db::register_tx_out_failure(
    267                                         self.db,
    268                                         tx_out.code,
    269                                         bounced,
    270                                         &Timestamp::now(),
    271                                     )
    272                                     .await?;
    273                                     if let Some(id) = res.initiated_id {
    274                                         if res.new {
    275                                             error!(target: "worker", "out failure {id} {tx_out}");
    276                                         } else {
    277                                             trace!(target: "worker", "out failure {id} {tx_out} already seen");
    278                                         }
    279                                     }
    280                                 }
    281                             }
    282                             AccountType::Normal => {
    283                                 if tx_out.status == TxStatus::Completed {
    284                                     let res = db::register_tx_out(
    285                                         self.db,
    286                                         &tx_out,
    287                                         &TxOutKind::Simple,
    288                                         &Timestamp::now(),
    289                                     )
    290                                     .await?;
    291                                     match res.result {
    292                                         RegisterResult::idempotent => {
    293                                             trace!(target: "worker", "out {tx_out} already seen");
    294                                         }
    295                                         RegisterResult::known => {
    296                                             info!(target: "worker", "out {tx_out}");
    297                                         }
    298                                         RegisterResult::recovered => {
    299                                             warn!(target: "worker", "out (recovered) {tx_out}");
    300                                         }
    301                                     }
    302                                 } else {
    303                                     let res = db::register_tx_out_failure(
    304                                         self.db,
    305                                         tx_out.code,
    306                                         None,
    307                                         &Timestamp::now(),
    308                                     )
    309                                     .await?;
    310                                     if let Some(id) = res.initiated_id {
    311                                          if res.new {
    312                                             error!(target: "worker", "out failure {id} {tx_out}");
    313                                         } else {
    314                                             trace!(target: "worker", "out failure {id} {tx_out} already seen");
    315                                         }
    316                                     }
    317                                 }
    318                             }
    319                         }
    320                     }
    321                 }
    322             }
    323 
    324             if let Some(_next) = &next {
    325                 // Update in db cursor only if all previous transactions where final
    326                 if all_final {
    327                     // debug!(target: "worker", "advance cursor {next:?}");
    328                     // kv_set(&mut *self.db, TXS_CURSOR_KEY, &next).await?; TODO cursor is broken
    329                 }
    330             } else {
    331                 break;
    332             }
    333         }
    334 
    335         // Send transactions
    336         let start = Timestamp::now();
    337         let now = Zoned::now();
    338         loop {
    339             let batch = db::pending_batch(&mut *self.db, &start).await?;
    340             if batch.is_empty() {
    341                 break;
    342             }
    343             for tx in batch {
    344                 debug!(target: "worker", "send tx {tx}");
    345                 self.init_tx(&tx, &now).await?;
    346             }
    347         }
    348         Ok(())
    349     }
    350 
    351     /// Try to sign an unsigned initiated transaction
    352     pub async fn recover_tx(&mut self, tx: &TxOut) -> WorkerResult {
    353         if let Some(_) = db::initiated_exists_for_code(&mut *self.db, tx.code).await? {
    354             // Known initiated we submit it
    355             assert_eq!(tx.amount.frac, 0);
    356             self.submit_tx(
    357                 tx.code,
    358                 -(tx.amount.val as f64),
    359                 &tx.value_date,
    360                 tx.creditor.bban(),
    361             )
    362             .await?;
    363         } else {
    364             // The transaction is unknown (we failed after creating it and before storing it in the db)
    365             // we delete it
    366             self.client.delete_tx(tx.code).await?;
    367             debug!(target: "worker", "out {}: delete uncompleted orphan", tx.code);
    368         }
    369 
    370         Ok(())
    371     }
    372 
    373     /// Create and sign a forint transfer
    374     pub async fn init_tx(&mut self, tx: &Initiated, now: &Zoned) -> WorkerResult {
    375         trace!(target: "worker", "init tx {tx}");
    376         assert_eq!(tx.amount.frac, 0);
    377         let date = now.date();
    378         // Initialize the new transaction, on failure an orphan initiated transaction can be created
    379         let res = self
    380             .client
    381             .init_tx(
    382                 self.account_code,
    383                 tx.amount.val as f64,
    384                 &tx.subject,
    385                 &date,
    386                 &tx.creditor.name,
    387                 tx.creditor.bban(),
    388             )
    389             .await;
    390         fail_point("init-tx")?;
    391         let info = match res {
    392             // Check if succeeded
    393             Ok(info) => {
    394                 // Update transaction status, on failure the initiated transaction will be orphan
    395                 db::initiated_submit_success(&mut *self.db, tx.id, &Timestamp::now(), info.code)
    396                     .await?;
    397                 info
    398             }
    399             Err(e) => {
    400                 if let ApiErr {
    401                     kind: ErrKind::Magnet(e),
    402                     ..
    403                 } = &e
    404                 {
    405                     // Check if error is permanent
    406                     if matches!(
    407                         (e.error_code, e.short_message.as_str()),
    408                         (404, "BSZLA_NEM_TALALHATO") // Unknown account
    409                          | (409, "FORRAS_SZAMLA_ESZAMLA_EGYEZIK") // Same account
    410                     ) {
    411                         db::initiated_submit_permanent_failure(
    412                             &mut *self.db,
    413                             tx.id,
    414                             &Timestamp::now(),
    415                             &e.to_string(),
    416                         )
    417                         .await?;
    418                         error!(target: "worker", "initiated failure {tx}: {e}");
    419                         return WorkerResult::Ok(());
    420                     }
    421                 }
    422                 return Err(e.into());
    423             }
    424         };
    425         trace!(target: "worker", "init tx {}", info.code);
    426 
    427         // Sign transaction
    428         self.submit_tx(info.code, info.amount, &date, tx.creditor.bban())
    429             .await?;
    430         Ok(())
    431     }
    432 
    433     /** Submit an initiated forint transfer */
    434     pub async fn submit_tx(
    435         &mut self,
    436         tx_code: u64,
    437         amount: f64,
    438         date: &Date,
    439         creditor: &str,
    440     ) -> WorkerResult {
    441         debug!(target: "worker", "submit tx {tx_code}");
    442         fail_point("submit-tx")?;
    443         // Submit an initiated transaction, on failure we will retry
    444         match self
    445             .client
    446             .submit_tx(
    447                 self.key,
    448                 self.account_number,
    449                 tx_code,
    450                 amount,
    451                 date,
    452                 creditor,
    453             )
    454             .await
    455         {
    456             Ok(_) => Ok(()),
    457             Err(e) => {
    458                 if let ApiErr {
    459                     kind: ErrKind::Magnet(e),
    460                     ..
    461                 } = &e
    462                 {
    463                     // Check if soft failure
    464                     if matches!(
    465                         (e.error_code, e.short_message.as_str()),
    466                         (409, "TRANZAKCIO_ROSSZ_STATUS") // Already summited or cannot be signed
    467                     ) {
    468                         warn!(target: "worker", "submit tx {tx_code}: {e}");
    469                         return Ok(());
    470                     }
    471                 }
    472                 Err(e.into())
    473             }
    474         }
    475     }
    476 }
    477 
    478 pub enum Tx {
    479     In(TxIn),
    480     Out(TxOut),
    481 }
    482 
    483 pub fn extract_tx_info(tx: TxDto) -> Tx {
    484     // TODO amount from f64 without allocations
    485     let amount = amount::amount(format!("{}:{}", tx.currency, tx.amount.abs()));
    486     // TODO we should support non hungarian account and error handling
    487     let iban = if tx.counter_account.starts_with("HU") {
    488         let iban: IBAN = tx.counter_account.parse().unwrap();
    489         HuIban::try_from(iban).unwrap()
    490     } else {
    491         HuIban::from_bban(&tx.counter_account).unwrap()
    492     };
    493     let counter_account = FullHuPayto::new(iban, tx.counter_name);
    494     if tx.amount.is_sign_positive() {
    495         Tx::In(TxIn {
    496             code: tx.code,
    497             amount,
    498             subject: tx.subject.unwrap_or_default(),
    499             debtor: counter_account,
    500             value_date: tx.value_date,
    501             status: tx.status,
    502         })
    503     } else {
    504         Tx::Out(TxOut {
    505             code: tx.code,
    506             amount,
    507             subject: tx.subject.unwrap_or_default(),
    508             creditor: counter_account,
    509             value_date: tx.value_date,
    510             status: tx.status,
    511         })
    512     }
    513 }
    514 
    515 #[derive(Debug, thiserror::Error)]
    516 pub enum BounceSubjectErr {
    517     #[error("missing parts")]
    518     MissingParts,
    519     #[error("not a bounce")]
    520     NotBounce,
    521     #[error("malformed bounced id: {0}")]
    522     Id(#[from] ParseIntError),
    523 }
    524 
    525 pub fn parse_bounce_outgoing(subject: &str) -> Result<u32, BounceSubjectErr> {
    526     let (prefix, id) = subject
    527         .rsplit_once(" ")
    528         .ok_or(BounceSubjectErr::MissingParts)?;
    529     if !prefix.starts_with("bounce") {
    530         return Err(BounceSubjectErr::NotBounce);
    531     }
    532     let id: u32 = id.parse()?;
    533     Ok(id)
    534 }