worker.rs (6327B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::time::Duration; 18 19 use jiff::Timestamp; 20 use sqlx::PgPool; 21 use taler_common::{ExpoBackoffDecorr, config::Config}; 22 use tracing::{error, info, trace, warn}; 23 24 use crate::{ 25 apns::{ApnsError, Client, Reason}, 26 config::WorkerCfg, 27 db, 28 }; 29 30 #[derive(Debug, thiserror::Error)] 31 pub enum WorkerError { 32 #[error(transparent)] 33 Db(#[from] sqlx::Error), 34 #[error(transparent)] 35 Apns(#[from] ApnsError), 36 } 37 38 const NB_RETRY: usize = 5; 39 40 async fn wakeup(pool: &PgPool, client: &mut Client) -> Result<(), WorkerError> { 41 let tokens = db::all_registrations(pool).await?; 42 info!(target: "worker", "send notification to {} devices", tokens.len()); 43 44 // Wait at least 15 minutes before retrying API calls as per: https://developer.apple.com/documentation/usernotifications/sending-notification-requests-to-apns#Follow-best-practices-while-sending-push-notifications-with-APNs 45 // After 15 minutes, you can retry JSON payloads that receive response status codes that begin with 5XX. 46 // While retrying, you may employ a back-off technique. Most notifications with the status code 4XX can 47 // be retried after you fix the error noted in the reason field. Don’t retry notification responses with 48 // the error code BadDeviceToken, DeviceTokenNotForTopic, Forbidden, ExpiredToken, Unregistered, or PayloadTooLarge. 49 // You can retry with a delay, if you get the error code TooManyRequests. 50 let mut jitter = ExpoBackoffDecorr::new(Duration::from_mins(15), Duration::from_hours(1), 2.5); 51 52 // TODO paginate tokens 53 // TODO send in batches 54 55 for token in tokens { 56 trace!(target: "worker", "send background notification to {token}"); 57 let mut attempt = 0; 58 while let Err(e) = client.send(&token).await { 59 trace!(target: "worker", "notification to {token} failed {e}"); 60 if let ApnsError::Err { reason, timestamp } = e { 61 match reason { 62 // Fatal error 63 Reason::BadCollapseId 64 | Reason::BadDeviceToken 65 | Reason::BadExpirationDate 66 | Reason::BadMessageId 67 | Reason::BadPriority 68 | Reason::BadTopic 69 | Reason::DuplicateHeaders 70 | Reason::InvalidPushType 71 | Reason::MissingDeviceToken 72 | Reason::MissingTopic 73 | Reason::PayloadEmpty 74 | Reason::BadPath 75 | Reason::MethodNotAllowed => { 76 error!(target: "worker", "fatal error the service is broken: {e}"); 77 std::process::exit(9); 78 } 79 // Config error 80 Reason::TopicDisallowed 81 | Reason::BadCertificate 82 | Reason::BadCertificateEnvironment 83 | Reason::ExpiredProviderToken 84 | Reason::Forbidden 85 | Reason::InvalidProviderToken 86 | Reason::MissingProviderToken 87 | Reason::UnrelatedKeyIdInToken 88 | Reason::BadEnvironmentKeyIdInToken 89 | Reason::PayloadTooLarge => { 90 error!(target: "worker", "config error, check the configuration: {e}"); 91 std::process::exit(9); 92 } 93 // Unregister 94 Reason::DeviceTokenNotForTopic 95 | Reason::ExpiredToken 96 | Reason::Unregistered => { 97 db::unregister( 98 pool, 99 &token, 100 ×tamp 101 .and_then(|s| Timestamp::from_second(s as i64).ok()) 102 .unwrap_or_else(Timestamp::now), 103 ) 104 .await?; 105 break; 106 } 107 // Wait before retry 108 Reason::IdleTimeout 109 | Reason::TooManyProviderTokenUpdates 110 | Reason::TooManyRequests 111 | Reason::InternalServerError 112 | Reason::ServiceUnavailable 113 | Reason::Shutdown => {} 114 } 115 } 116 117 attempt += 1; 118 if attempt >= NB_RETRY { 119 error!(target: "worker", "notification to {token} failed more than {NB_RETRY} times, stopping worker"); 120 return Err(WorkerError::Apns(e)); 121 } else { 122 warn!(target: "worker", "notification to {token} failed {e}, retrying after back-off"); 123 tokio::time::sleep(jitter.backoff()).await; 124 } 125 } 126 info!(target: "trace", "notification to {token} succeeded"); 127 } 128 129 Ok(()) 130 } 131 132 pub async fn run(cfg: &Config, pool: &PgPool, transient: bool) -> anyhow::Result<()> { 133 let cfg = WorkerCfg::parse(cfg)?; 134 let mut client = Client::new(&cfg.apns)?; 135 let mut jitter = ExpoBackoffDecorr::default(); 136 137 if transient { 138 wakeup(pool, &mut client).await?; 139 return Ok(()); 140 } 141 142 info!(target: "worker", "running at initialisation"); 143 loop { 144 while let Err(WorkerError::Db(e)) = wakeup(pool, &mut client).await { 145 error!(target: "worker", "{e}"); 146 tokio::time::sleep(jitter.backoff()).await; 147 } 148 149 // TODO take sending time into account 150 tokio::time::sleep(cfg.frequency).await; 151 info!(target: "worker", "running at frequency"); 152 } 153 }