kych

OAuth 2.0 API for Swiyu to enable Taler integration of Swiyu for KYC (experimental)
Log | Files | Refs

worker.rs (8622B)


      1 //! Webhook worker daemon
      2 //!
      3 //! Background process that delivers webhooks to client endpoints.
      4 //! Uses PostgreSQL NOTIFY/LISTEN for event-driven wake-up.
      5 
      6 use anyhow::Result;
      7 use sqlx::postgres::PgListener;
      8 use sqlx::PgPool;
      9 use std::sync::atomic::{AtomicUsize, Ordering};
     10 use std::sync::Arc;
     11 use std::time::Duration;
     12 use tokio::sync::watch;
     13 use tracing::{debug, error, info, warn};
     14 
     15 use crate::{
     16     config::WebhookWorkerConfig,
     17     db::notification_webhooks::{
     18         delete_webhook, disable_webhook, get_next_scheduled_webhook, get_pending_webhooks,
     19         schedule_retry, PendingWebhook,
     20     },
     21 };
     22 
     23 /// Run the webhook worker
     24 ///
     25 /// This function blocks until shutdown signal is received.
     26 /// 1. Fetch pending webhooks (next_attempt <= NOW)
     27 /// 2. Execute HTTP requests, wait for all to complete
     28 /// 3. If no pending, check for future webhooks and schedule wake-up
     29 /// 4. Fallback poll configured interval
     30 pub async fn run_worker(
     31     pool: PgPool,
     32     config: &WebhookWorkerConfig,
     33     test_mode: bool,
     34     mut shutdown: watch::Receiver<bool>,
     35 ) -> Result<()> {
     36     info!("Starting webhook worker (test_mode={})", test_mode);
     37 
     38     let http_client = reqwest::Client::builder()
     39         .timeout(Duration::from_secs(30))
     40         .redirect(reqwest::redirect::Policy::limited(5))
     41         .build()?;
     42 
     43     // Set up psql LISTEN
     44     let mut listener = PgListener::connect_with(&pool).await?;
     45     listener.listen("oauth2gw_webhook_pending").await?;
     46     info!("Listening on channel 'oauth2gw_webhook_pending'");
     47 
     48     loop {
     49         // select work: process pending webhooks (keep looping while there's work)
     50         loop {
     51             let has_work = select_work(&pool, &http_client, config).await;
     52 
     53             if !has_work {
     54                 if test_mode {
     55                     info!("Test mode: no pending webhooks, exiting");
     56                     return Ok(());
     57                 }
     58                 break; // Exit inner loop, wait for NOTIFY/timer
     59             }
     60             // If has_work, loop again to check for more pending webhooks
     61         }
     62 
     63         // Determine how long to sleep
     64         let sleep_duration = match get_next_scheduled_webhook(&pool).await {
     65             Ok(Some(next_attempt)) => {
     66                 let now = chrono::Utc::now().timestamp();
     67                 let delay = (next_attempt - now).max(0) as u64;
     68                 debug!("Next webhook scheduled in {} seconds", delay);
     69                 Duration::from_secs(delay)
     70             }
     71             Ok(None) => {
     72                 debug!("No future webhooks, fallback poll in {} seconds", config.fallback_poll_secs);
     73                 Duration::from_secs(config.fallback_poll_secs)
     74             }
     75             Err(e) => {
     76                 error!("Failed to get next scheduled webhook: {}", e);
     77                 Duration::from_secs(config.fallback_poll_secs)
     78             }
     79         };
     80 
     81         tokio::select! {
     82             // Shutdown signal
     83             _ = shutdown.changed() => {
     84                 if *shutdown.borrow() {
     85                     info!("Shutdown signal received");
     86                     break;
     87                 }
     88             }
     89 
     90             // psql NOTIFY received - immediately process
     91             notification = listener.recv() => {
     92                 match notification {
     93                     Ok(n) => {
     94                         debug!("NOTIFY received: {}", n.payload());
     95                         // Don't sleep, go straight to select_work
     96                         continue;
     97                     }
     98                     Err(e) => {
     99                         error!("LISTEN error: {}", e);
    100                         tokio::time::sleep(Duration::from_secs(1)).await;
    101                     }
    102                 }
    103             }
    104 
    105             // Sleep until next scheduled webhook or fallback poll
    106             _ = tokio::time::sleep(sleep_duration) => {
    107                 debug!("Timer expired, checking for work");
    108             }
    109         }
    110     }
    111 
    112     info!("Webhook worker stopped");
    113     Ok(())
    114 }
    115 
    116 /// Process all pending webhooks
    117 ///
    118 /// Returns true if any webhooks were processed.
    119 async fn select_work(pool: &PgPool, http_client: &reqwest::Client, config: &WebhookWorkerConfig) -> bool {
    120     let webhooks = match get_pending_webhooks(pool, config.batch_size).await {
    121         Ok(w) => w,
    122         Err(e) => {
    123             error!("Failed to fetch pending webhooks: {}", e);
    124             return false;
    125         }
    126     };
    127 
    128     if webhooks.is_empty() {
    129         debug!("No pending webhooks");
    130         return false;
    131     }
    132 
    133     info!("Processing {} pending webhooks", webhooks.len());
    134 
    135     // Track in-flight jobs
    136     let in_flight = Arc::new(AtomicUsize::new(webhooks.len()));
    137     let mut handles = Vec::with_capacity(webhooks.len());
    138 
    139     for webhook in webhooks {
    140         let pool = pool.clone();
    141         let client = http_client.clone();
    142         let counter = in_flight.clone();
    143         let cfg = config.clone();
    144 
    145         // Fire HTTP request
    146         let handle = tokio::spawn(async move {
    147             execute_webhook(&pool, &client, webhook, &cfg).await;
    148             counter.fetch_sub(1, Ordering::SeqCst);
    149         });
    150 
    151         handles.push(handle);
    152     }
    153 
    154     // Wait for all jobs to complete
    155     for handle in handles {
    156         if let Err(e) = handle.await {
    157             error!("Webhook task panicked: {}", e);
    158         }
    159     }
    160 
    161     true
    162 }
    163 
    164 /// Execute a single webhook HTTP request
    165 async fn execute_webhook(pool: &PgPool, client: &reqwest::Client, webhook: PendingWebhook, config: &WebhookWorkerConfig) {
    166     let serial = webhook.webhook_pending_serial;
    167 
    168     debug!(
    169         "Webhook {}: {} {} (retry #{})",
    170         serial, webhook.http_method, webhook.url, webhook.retries
    171     );
    172 
    173     // Build request based on http_method
    174     let mut request = match webhook.http_method.to_uppercase().as_str() {
    175         "POST" => client.post(&webhook.url),
    176         "PUT" => client.put(&webhook.url),
    177         "GET" => client.get(&webhook.url),
    178         "DELETE" => client.delete(&webhook.url),
    179         method => {
    180             error!("Unsupported HTTP method '{}' for webhook {}", method, serial);
    181             let _ = disable_webhook(pool, serial).await;
    182             return;
    183         }
    184     };
    185 
    186     // Add headers if present
    187     if let Some(headers) = &webhook.header {
    188         for line in headers.lines() {
    189             if let Some((name, value)) = line.split_once(':') {
    190                 request = request.header(name.trim(), value.trim());
    191             }
    192         }
    193     }
    194 
    195     // Add body for POST/PUT
    196     if matches!(webhook.http_method.to_uppercase().as_str(), "POST" | "PUT") {
    197         request = request
    198             .header("Content-Type", "application/json")
    199             .body(webhook.body.clone());
    200     }
    201 
    202     // Execute request
    203     let response = match request.send().await {
    204         Ok(r) => r,
    205         Err(e) => {
    206             warn!("Network error for webhook {}: {}", serial, e);
    207             handle_webhook_response(pool, serial, 0, config).await;
    208             return;
    209         }
    210     };
    211 
    212     let status_code = response.status().as_u16();
    213     handle_webhook_response(pool, serial, status_code.into(), config).await;
    214 }
    215 
    216 /// Handle webhook response
    217 ///
    218 /// - 2xx: delete pending webhook (success)
    219 /// - 400: next_attempt = FOREVER (never retry)
    220 /// - 500: next_attempt = NOW + configured delay
    221 /// - 403: next_attempt = NOW + configured delay
    222 /// - other: next_attempt = NOW + configured delay
    223 async fn handle_webhook_response(pool: &PgPool, serial: i64, response_code: i64, config: &WebhookWorkerConfig) {
    224     info!("Webhook {} returned with status {}", serial, response_code);
    225 
    226     // 200 success - delete from queue
    227     if response_code >= 200 && response_code < 300 {
    228         match delete_webhook(pool, serial).await {
    229             Ok(true) => {
    230                 debug!("Webhook {} deleted successfully", serial);
    231             }
    232             Ok(false) => {
    233                 warn!("Webhook {} not found for deletion", serial);
    234             }
    235             Err(e) => {
    236                 error!("Failed to delete webhook {}: {}", serial, e);
    237             }
    238         }
    239         return;
    240     }
    241 
    242     // Determine retry delay based on response code
    243     let delay = match response_code {
    244         400 => {
    245             warn!("Webhook {} got 400, disabling permanently", serial);
    246             let _ = disable_webhook(pool, serial).await;
    247             return;
    248         }
    249         500 => config.retry_delay_server_error,
    250         403 => config.retry_delay_forbidden,
    251         _ => config.retry_delay_other,
    252     };
    253 
    254     debug!("Scheduling webhook {} retry in {} seconds", serial, delay);
    255     if let Err(e) = schedule_retry(pool, serial, delay).await {
    256         error!("Failed to schedule retry for webhook {}: {}", serial, e);
    257     }
    258 }