taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

worker.rs (6327B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::time::Duration;
     18 
     19 use jiff::Timestamp;
     20 use sqlx::PgPool;
     21 use taler_common::{ExpoBackoffDecorr, config::Config};
     22 use tracing::{error, info, trace, warn};
     23 
     24 use crate::{
     25     apns::{ApnsError, Client, Reason},
     26     config::WorkerCfg,
     27     db,
     28 };
     29 
     30 #[derive(Debug, thiserror::Error)]
     31 pub enum WorkerError {
     32     #[error(transparent)]
     33     Db(#[from] sqlx::Error),
     34     #[error(transparent)]
     35     Apns(#[from] ApnsError),
     36 }
     37 
     38 const NB_RETRY: usize = 5;
     39 
     40 async fn wakeup(pool: &PgPool, client: &mut Client) -> Result<(), WorkerError> {
     41     let tokens = db::all_registrations(pool).await?;
     42     info!(target: "worker", "send notification to {} devices", tokens.len());
     43 
     44     // Wait at least 15 minutes before retrying API calls as per: https://developer.apple.com/documentation/usernotifications/sending-notification-requests-to-apns#Follow-best-practices-while-sending-push-notifications-with-APNs
     45     // After 15 minutes, you can retry JSON payloads that receive response status codes that begin with 5XX.
     46     // While retrying, you may employ a back-off technique. Most notifications with the status code 4XX can
     47     // be retried after you fix the error noted in the reason field. Don’t retry notification responses with
     48     // the error code BadDeviceToken, DeviceTokenNotForTopic, Forbidden, ExpiredToken, Unregistered, or PayloadTooLarge.
     49     // You can retry with a delay, if you get the error code TooManyRequests.
     50     let mut jitter = ExpoBackoffDecorr::new(Duration::from_mins(15), Duration::from_hours(1), 2.5);
     51 
     52     // TODO paginate tokens
     53     // TODO send in batches
     54 
     55     for token in tokens {
     56         trace!(target: "worker", "send background notification to {token}");
     57         let mut attempt = 0;
     58         while let Err(e) = client.send(&token).await {
     59             trace!(target: "worker", "notification to {token} failed {e}");
     60             if let ApnsError::Err { reason, timestamp } = e {
     61                 match reason {
     62                     // Fatal error
     63                     Reason::BadCollapseId
     64                     | Reason::BadDeviceToken
     65                     | Reason::BadExpirationDate
     66                     | Reason::BadMessageId
     67                     | Reason::BadPriority
     68                     | Reason::BadTopic
     69                     | Reason::DuplicateHeaders
     70                     | Reason::InvalidPushType
     71                     | Reason::MissingDeviceToken
     72                     | Reason::MissingTopic
     73                     | Reason::PayloadEmpty
     74                     | Reason::BadPath
     75                     | Reason::MethodNotAllowed => {
     76                         error!(target: "worker", "fatal error the service is broken: {e}");
     77                         std::process::exit(9);
     78                     }
     79                     // Config error
     80                     Reason::TopicDisallowed
     81                     | Reason::BadCertificate
     82                     | Reason::BadCertificateEnvironment
     83                     | Reason::ExpiredProviderToken
     84                     | Reason::Forbidden
     85                     | Reason::InvalidProviderToken
     86                     | Reason::MissingProviderToken
     87                     | Reason::UnrelatedKeyIdInToken
     88                     | Reason::BadEnvironmentKeyIdInToken
     89                     | Reason::PayloadTooLarge => {
     90                         error!(target: "worker", "config error, check the configuration: {e}");
     91                         std::process::exit(9);
     92                     }
     93                     // Unregister
     94                     Reason::DeviceTokenNotForTopic
     95                     | Reason::ExpiredToken
     96                     | Reason::Unregistered => {
     97                         db::unregister(
     98                             pool,
     99                             &token,
    100                             &timestamp
    101                                 .and_then(|s| Timestamp::from_second(s as i64).ok())
    102                                 .unwrap_or_else(Timestamp::now),
    103                         )
    104                         .await?;
    105                         break;
    106                     }
    107                     // Wait before retry
    108                     Reason::IdleTimeout
    109                     | Reason::TooManyProviderTokenUpdates
    110                     | Reason::TooManyRequests
    111                     | Reason::InternalServerError
    112                     | Reason::ServiceUnavailable
    113                     | Reason::Shutdown => {}
    114                 }
    115             }
    116 
    117             attempt += 1;
    118             if attempt >= NB_RETRY {
    119                 error!(target: "worker", "notification to {token} failed more than {NB_RETRY} times, stopping worker");
    120                 return Err(WorkerError::Apns(e));
    121             } else {
    122                 warn!(target: "worker", "notification to {token} failed {e}, retrying after back-off");
    123                 tokio::time::sleep(jitter.backoff()).await;
    124             }
    125         }
    126         info!(target: "trace", "notification to {token} succeeded");
    127     }
    128 
    129     Ok(())
    130 }
    131 
    132 pub async fn run(cfg: &Config, pool: &PgPool, transient: bool) -> anyhow::Result<()> {
    133     let cfg = WorkerCfg::parse(cfg)?;
    134     let mut client = Client::new(&cfg.apns)?;
    135     let mut jitter = ExpoBackoffDecorr::default();
    136 
    137     if transient {
    138         wakeup(pool, &mut client).await?;
    139         return Ok(());
    140     }
    141 
    142     info!(target: "worker", "running at initialisation");
    143     loop {
    144         while let Err(WorkerError::Db(e)) = wakeup(pool, &mut client).await {
    145             error!(target: "worker", "{e}");
    146             tokio::time::sleep(jitter.backoff()).await;
    147         }
    148 
    149         // TODO take sending time into account
    150         tokio::time::sleep(cfg.frequency).await;
    151         info!(target: "worker", "running at frequency");
    152     }
    153 }