worker.rs (8622B)
1 //! Webhook worker daemon 2 //! 3 //! Background process that delivers webhooks to client endpoints. 4 //! Uses PostgreSQL NOTIFY/LISTEN for event-driven wake-up. 5 6 use anyhow::Result; 7 use sqlx::postgres::PgListener; 8 use sqlx::PgPool; 9 use std::sync::atomic::{AtomicUsize, Ordering}; 10 use std::sync::Arc; 11 use std::time::Duration; 12 use tokio::sync::watch; 13 use tracing::{debug, error, info, warn}; 14 15 use crate::{ 16 config::WebhookWorkerConfig, 17 db::notification_webhooks::{ 18 delete_webhook, disable_webhook, get_next_scheduled_webhook, get_pending_webhooks, 19 schedule_retry, PendingWebhook, 20 }, 21 }; 22 23 /// Run the webhook worker 24 /// 25 /// This function blocks until shutdown signal is received. 26 /// 1. Fetch pending webhooks (next_attempt <= NOW) 27 /// 2. Execute HTTP requests, wait for all to complete 28 /// 3. If no pending, check for future webhooks and schedule wake-up 29 /// 4. Fallback poll configured interval 30 pub async fn run_worker( 31 pool: PgPool, 32 config: &WebhookWorkerConfig, 33 test_mode: bool, 34 mut shutdown: watch::Receiver<bool>, 35 ) -> Result<()> { 36 info!("Starting webhook worker (test_mode={})", test_mode); 37 38 let http_client = reqwest::Client::builder() 39 .timeout(Duration::from_secs(30)) 40 .redirect(reqwest::redirect::Policy::limited(5)) 41 .build()?; 42 43 // Set up psql LISTEN 44 let mut listener = PgListener::connect_with(&pool).await?; 45 listener.listen("oauth2gw_webhook_pending").await?; 46 info!("Listening on channel 'oauth2gw_webhook_pending'"); 47 48 loop { 49 // select work: process pending webhooks (keep looping while there's work) 50 loop { 51 let has_work = select_work(&pool, &http_client, config).await; 52 53 if !has_work { 54 if test_mode { 55 info!("Test mode: no pending webhooks, exiting"); 56 return Ok(()); 57 } 58 break; // Exit inner loop, wait for NOTIFY/timer 59 } 60 // If has_work, loop again to check for more pending webhooks 61 } 62 63 // Determine how long to sleep 64 let sleep_duration = match get_next_scheduled_webhook(&pool).await { 65 Ok(Some(next_attempt)) => { 66 let now = chrono::Utc::now().timestamp(); 67 let delay = (next_attempt - now).max(0) as u64; 68 debug!("Next webhook scheduled in {} seconds", delay); 69 Duration::from_secs(delay) 70 } 71 Ok(None) => { 72 debug!("No future webhooks, fallback poll in {} seconds", config.fallback_poll_secs); 73 Duration::from_secs(config.fallback_poll_secs) 74 } 75 Err(e) => { 76 error!("Failed to get next scheduled webhook: {}", e); 77 Duration::from_secs(config.fallback_poll_secs) 78 } 79 }; 80 81 tokio::select! { 82 // Shutdown signal 83 _ = shutdown.changed() => { 84 if *shutdown.borrow() { 85 info!("Shutdown signal received"); 86 break; 87 } 88 } 89 90 // psql NOTIFY received - immediately process 91 notification = listener.recv() => { 92 match notification { 93 Ok(n) => { 94 debug!("NOTIFY received: {}", n.payload()); 95 // Don't sleep, go straight to select_work 96 continue; 97 } 98 Err(e) => { 99 error!("LISTEN error: {}", e); 100 tokio::time::sleep(Duration::from_secs(1)).await; 101 } 102 } 103 } 104 105 // Sleep until next scheduled webhook or fallback poll 106 _ = tokio::time::sleep(sleep_duration) => { 107 debug!("Timer expired, checking for work"); 108 } 109 } 110 } 111 112 info!("Webhook worker stopped"); 113 Ok(()) 114 } 115 116 /// Process all pending webhooks 117 /// 118 /// Returns true if any webhooks were processed. 119 async fn select_work(pool: &PgPool, http_client: &reqwest::Client, config: &WebhookWorkerConfig) -> bool { 120 let webhooks = match get_pending_webhooks(pool, config.batch_size).await { 121 Ok(w) => w, 122 Err(e) => { 123 error!("Failed to fetch pending webhooks: {}", e); 124 return false; 125 } 126 }; 127 128 if webhooks.is_empty() { 129 debug!("No pending webhooks"); 130 return false; 131 } 132 133 info!("Processing {} pending webhooks", webhooks.len()); 134 135 // Track in-flight jobs 136 let in_flight = Arc::new(AtomicUsize::new(webhooks.len())); 137 let mut handles = Vec::with_capacity(webhooks.len()); 138 139 for webhook in webhooks { 140 let pool = pool.clone(); 141 let client = http_client.clone(); 142 let counter = in_flight.clone(); 143 let cfg = config.clone(); 144 145 // Fire HTTP request 146 let handle = tokio::spawn(async move { 147 execute_webhook(&pool, &client, webhook, &cfg).await; 148 counter.fetch_sub(1, Ordering::SeqCst); 149 }); 150 151 handles.push(handle); 152 } 153 154 // Wait for all jobs to complete 155 for handle in handles { 156 if let Err(e) = handle.await { 157 error!("Webhook task panicked: {}", e); 158 } 159 } 160 161 true 162 } 163 164 /// Execute a single webhook HTTP request 165 async fn execute_webhook(pool: &PgPool, client: &reqwest::Client, webhook: PendingWebhook, config: &WebhookWorkerConfig) { 166 let serial = webhook.webhook_pending_serial; 167 168 debug!( 169 "Webhook {}: {} {} (retry #{})", 170 serial, webhook.http_method, webhook.url, webhook.retries 171 ); 172 173 // Build request based on http_method 174 let mut request = match webhook.http_method.to_uppercase().as_str() { 175 "POST" => client.post(&webhook.url), 176 "PUT" => client.put(&webhook.url), 177 "GET" => client.get(&webhook.url), 178 "DELETE" => client.delete(&webhook.url), 179 method => { 180 error!("Unsupported HTTP method '{}' for webhook {}", method, serial); 181 let _ = disable_webhook(pool, serial).await; 182 return; 183 } 184 }; 185 186 // Add headers if present 187 if let Some(headers) = &webhook.header { 188 for line in headers.lines() { 189 if let Some((name, value)) = line.split_once(':') { 190 request = request.header(name.trim(), value.trim()); 191 } 192 } 193 } 194 195 // Add body for POST/PUT 196 if matches!(webhook.http_method.to_uppercase().as_str(), "POST" | "PUT") { 197 request = request 198 .header("Content-Type", "application/json") 199 .body(webhook.body.clone()); 200 } 201 202 // Execute request 203 let response = match request.send().await { 204 Ok(r) => r, 205 Err(e) => { 206 warn!("Network error for webhook {}: {}", serial, e); 207 handle_webhook_response(pool, serial, 0, config).await; 208 return; 209 } 210 }; 211 212 let status_code = response.status().as_u16(); 213 handle_webhook_response(pool, serial, status_code.into(), config).await; 214 } 215 216 /// Handle webhook response 217 /// 218 /// - 2xx: delete pending webhook (success) 219 /// - 400: next_attempt = FOREVER (never retry) 220 /// - 500: next_attempt = NOW + configured delay 221 /// - 403: next_attempt = NOW + configured delay 222 /// - other: next_attempt = NOW + configured delay 223 async fn handle_webhook_response(pool: &PgPool, serial: i64, response_code: i64, config: &WebhookWorkerConfig) { 224 info!("Webhook {} returned with status {}", serial, response_code); 225 226 // 200 success - delete from queue 227 if response_code >= 200 && response_code < 300 { 228 match delete_webhook(pool, serial).await { 229 Ok(true) => { 230 debug!("Webhook {} deleted successfully", serial); 231 } 232 Ok(false) => { 233 warn!("Webhook {} not found for deletion", serial); 234 } 235 Err(e) => { 236 error!("Failed to delete webhook {}: {}", serial, e); 237 } 238 } 239 return; 240 } 241 242 // Determine retry delay based on response code 243 let delay = match response_code { 244 400 => { 245 warn!("Webhook {} got 400, disabling permanently", serial); 246 let _ = disable_webhook(pool, serial).await; 247 return; 248 } 249 500 => config.retry_delay_server_error, 250 403 => config.retry_delay_forbidden, 251 _ => config.retry_delay_other, 252 }; 253 254 debug!("Scheduling webhook {} retry in {} seconds", serial, delay); 255 if let Err(e) = schedule_retry(pool, serial, delay).await { 256 error!("Failed to schedule retry for webhook {}: {}", serial, e); 257 } 258 }