commit d72b55d18e46edb5cc63bd84d3b8124ef7c289d3
parent 9c58a4c727e36c10fe6e1eec241d2a7530cfbc97
Author: Henrique Chan Carvalho Machado <henriqueccmachado@tecnico.ulisboa.pt>
Date: Mon, 19 Jan 2026 21:31:55 +0100
Remove webhook worker and notification queue
Remove the unused webhook worker, its database queue/trigger, and related
notification code. This eliminates dead binaries, worker logic, and schema
elements that were no longer referenced by the gateway.
Diffstat:
5 files changed, 2 insertions(+), 636 deletions(-)
diff --git a/kych_oauth2_gateway/oauth2_gatewaydb/oauth2gw-0001.sql b/kych_oauth2_gateway/oauth2_gatewaydb/oauth2gw-0001.sql
@@ -13,10 +13,9 @@ CREATE TABLE IF NOT EXISTS clients (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
client_id VARCHAR(255) UNIQUE NOT NULL,
secret_hash VARCHAR(255) NOT NULL,
- webhook_url TEXT NOT NULL,
verifier_url TEXT NOT NULL,
verifier_management_api_path VARCHAR(255) DEFAULT '/management/api/verifications',
- redirect_uri TEXT,
+ redirect_uri TEXT NOT NULL,
accepted_issuer_dids TEXT,
created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
@@ -31,10 +30,8 @@ COMMENT ON COLUMN clients.redirect_uri
IS 'Default OAuth2 redirect URI for this client';
COMMENT ON COLUMN clients.accepted_issuer_dids
IS 'Comma-separated list of accepted DID issuers for credential verification';
-COMMENT ON COLUMN clients.webhook_url
- IS 'Client URL where oauth2 gateway will callback';
COMMENT ON COLUMN clients.verifier_url
- IS 'Client URL where oauth2 gateway will callback';
+ IS 'Base URL of the Swiyu verifier';
COMMENT ON COLUMN clients.verifier_management_api_path
IS 'Swiyu verifier api endpoint to create verification requests';
@@ -136,55 +133,5 @@ COMMENT ON COLUMN authorization_codes.code
COMMENT ON COLUMN authorization_codes.used
IS 'Whether code has been exchanged for an access token';
-CREATE TABLE IF NOT EXISTS notification_pending_webhooks (
- webhook_pending_serial BIGSERIAL PRIMARY KEY,
- session_id UUID NOT NULL REFERENCES verification_sessions(id) ON DELETE CASCADE,
- client_id UUID NOT NULL REFERENCES clients(id) ON DELETE CASCADE,
- next_attempt INT8 NOT NULL DEFAULT 0,
- retries INT4 NOT NULL DEFAULT 0,
- url TEXT NOT NULL,
- http_method TEXT NOT NULL DEFAULT 'POST',
- header TEXT,
- body TEXT NOT NULL
-);
-
-CREATE INDEX idx_notification_pending_webhooks_next_attempt
- ON notification_pending_webhooks(next_attempt);
-CREATE INDEX idx_notification_pending_webhooks_session_id
- ON notification_pending_webhooks(session_id);
-
-COMMENT ON TABLE notification_pending_webhooks
- IS 'Pending client notifications to be sent by background worker';
-COMMENT ON COLUMN notification_pending_webhooks.next_attempt
- IS 'Unix timestamp when to attempt sending (0 = execute now, max value = never retry)';
-COMMENT ON COLUMN notification_pending_webhooks.retries
- IS 'Number of failed delivery attempts';
-COMMENT ON COLUMN notification_pending_webhooks.url
- IS 'Client webhook URL to POST notification';
-COMMENT ON COLUMN notification_pending_webhooks.http_method
- IS 'HTTP method for webhook (always POST for notifications)';
-COMMENT ON COLUMN notification_pending_webhooks.header
- IS 'Newline-separated HTTP headers for the webhook request';
-COMMENT ON COLUMN notification_pending_webhooks.body
- IS 'JSON body to send (stringified, contains nonce, status, code, verification_id)';
-
--- Trigger function to notify webhook worker when new webhooks are queued
-CREATE OR REPLACE FUNCTION notify_webhook_pending()
-RETURNS TRIGGER AS $$
-BEGIN
- -- Notify the webhook worker daemon
- PERFORM pg_notify('oauth2gw_webhook_pending', NEW.webhook_pending_serial::TEXT);
- RETURN NEW;
-END;
-$$ LANGUAGE plpgsql;
-
-CREATE TRIGGER trigger_webhook_pending
- AFTER INSERT ON notification_pending_webhooks
- FOR EACH ROW
- EXECUTE FUNCTION notify_webhook_pending();
-
-COMMENT ON FUNCTION notify_webhook_pending()
- IS 'Sends PostgreSQL NOTIFY to wake up webhook worker when new webhooks are queued';
-
-- Complete transaction
COMMIT;
diff --git a/kych_oauth2_gateway/src/bin/webhook_worker.rs b/kych_oauth2_gateway/src/bin/webhook_worker.rs
@@ -1,102 +0,0 @@
-//! Webhook worker daemon binary
-//!
-//! Background process that delivers webhooks to client endpoints.
-//! Uses PostgreSQL NOTIFY/LISTEN for event-driven wake-up.
-//!
-//! Usage:
-//! webhook-worker -c config.ini # Run normally
-//! webhook-worker -c config.ini -t # Test mode (exit when idle)
-
-use kych_oauth2_gateway_lib::{config::Config, db, worker};
-use anyhow::Result;
-use clap::Parser;
-use tokio::signal;
-use tokio::sync::watch;
-use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
-
-#[derive(Parser, Debug)]
-#[command(name = "webhook-worker")]
-#[command(version)]
-#[command(about = "Background process that executes webhooks")]
-struct Args {
- #[arg(short = 'c', long = "config", value_name = "FILE")]
- config: String,
-
- #[arg(short = 't', long = "test")]
- test_mode: bool,
-
- #[arg(short = 'L', long = "log-level", value_name = "LEVEL", default_value = "INFO")]
- log_level: String,
-}
-
-#[tokio::main]
-async fn main() -> Result<()> {
- let args = Args::parse();
-
- let level = args.log_level.to_lowercase();
- let filter = format!(
- "kych_webhook_worker={},kych_oauth2_gateway_lib={},tower_http={},sqlx=warn",
- level, level, level
- );
-
- tracing_subscriber::registry()
- .with(
- tracing_subscriber::EnvFilter::try_from_default_env()
- .unwrap_or_else(|_| filter.into()),
- )
- .with(
- tracing_subscriber::fmt::layer()
- .compact()
- .with_ansi(false)
- .with_timer(tracing_subscriber::fmt::time::LocalTime::rfc_3339()),
- )
- .init();
-
- tracing::info!("Starting webhook worker v{}", env!("CARGO_PKG_VERSION"));
- tracing::info!("Loading configuration from: {}", args.config);
-
- let config = Config::from_file(&args.config)?;
-
- tracing::info!("Connecting to database: {}", config.database.url);
- let pool = db::create_pool(&config.database.url).await?;
-
- // Set up shutdown signal handling
- let (shutdown_tx, shutdown_rx) = watch::channel(false);
-
- // Spawn signal handler task
- tokio::spawn(async move {
- let ctrl_c = async {
- signal::ctrl_c()
- .await
- .expect("Failed to install Ctrl+C handler");
- };
-
- #[cfg(unix)]
- let terminate = async {
- signal::unix::signal(signal::unix::SignalKind::terminate())
- .expect("Failed to install SIGTERM handler")
- .recv()
- .await;
- };
-
- #[cfg(not(unix))]
- let terminate = std::future::pending::<()>();
-
- tokio::select! {
- _ = ctrl_c => {
- tracing::info!("Received Ctrl+C, initiating shutdown");
- }
- _ = terminate => {
- tracing::info!("Received SIGTERM, initiating shutdown");
- }
- }
-
- let _ = shutdown_tx.send(true);
- });
-
- // Run the worker
- worker::run_worker(pool, &config.webhook_worker, args.test_mode, shutdown_rx).await?;
-
- tracing::info!("Webhook worker exited cleanly");
- Ok(())
-}
diff --git a/kych_oauth2_gateway/src/db/mod.rs b/kych_oauth2_gateway/src/db/mod.rs
@@ -8,7 +8,6 @@ pub mod sessions;
pub mod tokens;
pub mod clients;
pub mod authorization_codes;
-pub mod notification_webhooks;
/// Create a PostgreSQL connection pool
///
diff --git a/kych_oauth2_gateway/src/db/notification_webhooks.rs b/kych_oauth2_gateway/src/db/notification_webhooks.rs
@@ -1,219 +0,0 @@
-// Database operations for notification_pending_webhooks table
-
-use sqlx::PgPool;
-use anyhow::Result;
-use uuid::Uuid;
-
-/// Pending webhook record with authorization code for client notification
-#[derive(Debug, Clone)]
-pub struct PendingWebhook {
- pub webhook_pending_serial: i64,
- pub session_id: Uuid,
- pub client_id: Uuid,
- pub url: String,
- pub http_method: String,
- pub header: Option<String>,
- pub body: String,
- pub retries: i32,
- /// Authorization code to include in the webhook payload
- pub code: String,
-}
-
-/// Fetch pending webhooks ready to be sent
-///
-/// Only returns webhooks where next_attempt <= current epoch time.
-///
-/// Used by the background worker
-pub async fn get_pending_webhooks(
- pool: &PgPool,
- limit: i64,
-) -> Result<Vec<PendingWebhook>> {
- let webhooks = sqlx::query(
- r#"
- SELECT
- npw.webhook_pending_serial,
- npw.session_id,
- npw.client_id,
- npw.url,
- npw.http_method,
- npw.header,
- npw.body,
- npw.retries,
- ac.code
- FROM oauth2gw.notification_pending_webhooks npw
- INNER JOIN oauth2gw.authorization_codes ac ON ac.session_id = npw.session_id
- WHERE npw.next_attempt <= EXTRACT(EPOCH FROM NOW())
- ORDER BY npw.webhook_pending_serial
- LIMIT $1
- "#
- )
- .bind(limit)
- .fetch_all(pool)
- .await?;
-
- Ok(webhooks.into_iter().map(|row: sqlx::postgres::PgRow| {
- use sqlx::Row;
- PendingWebhook {
- webhook_pending_serial: row.get("webhook_pending_serial"),
- session_id: row.get("session_id"),
- client_id: row.get("client_id"),
- url: row.get("url"),
- http_method: row.get("http_method"),
- header: row.get("header"),
- body: row.get("body"),
- retries: row.get("retries"),
- code: row.get("code"),
- }
- }).collect())
-}
-
-/// Delete a webhook after successful delivery
-pub async fn delete_webhook(
- pool: &PgPool,
- webhook_pending_serial: i64,
-) -> Result<bool> {
- let result = sqlx::query(
- r#"
- DELETE FROM oauth2gw.notification_pending_webhooks
- WHERE webhook_pending_serial = $1
- "#
- )
- .bind(webhook_pending_serial)
- .execute(pool)
- .await?;
-
- Ok(result.rows_affected() > 0)
-}
-
-/// Disable a webhook (client error - no retry)
-///
-/// Sets next_attempt to max i64 value so it never gets picked up again
-pub async fn disable_webhook(
- pool: &PgPool,
- webhook_pending_serial: i64,
-) -> Result<bool> {
- let result = sqlx::query(
- r#"
- UPDATE oauth2gw.notification_pending_webhooks
- SET next_attempt = 9223372036854775807
- WHERE webhook_pending_serial = $1
- "#
- )
- .bind(webhook_pending_serial)
- .execute(pool)
- .await?;
-
- Ok(result.rows_affected() > 0)
-}
-
-/// Schedule retry with exponential backoff
-///
-/// - Server errors (5xx): retry after delay_seconds (default 60)
-/// - Network/timeout: retry after longer delay (default 3600)
-pub async fn schedule_retry(
- pool: &PgPool,
- webhook_pending_serial: i64,
- delay_seconds: i64,
-) -> Result<bool> {
- let result = sqlx::query(
- r#"
- UPDATE oauth2gw.notification_pending_webhooks
- SET next_attempt = EXTRACT(EPOCH FROM NOW()) + $1,
- retries = retries + 1
- WHERE webhook_pending_serial = $2
- "#
- )
- .bind(delay_seconds)
- .bind(webhook_pending_serial)
- .execute(pool)
- .await?;
-
- Ok(result.rows_affected() > 0)
-}
-
-/// Insert a new pending webhook notification
-///
-/// Triggered by the /notification handler after verification is complete
-pub async fn insert_pending_webhook(
- pool: &PgPool,
- session_id: Uuid,
- client_id: Uuid,
- url: &str,
- body: &str,
-) -> Result<i64> {
- let result = sqlx::query_scalar::<_, i64>(
- r#"
- INSERT INTO oauth2gw.notification_pending_webhooks
- (session_id, client_id, url, body, next_attempt)
- VALUES ($1, $2, $3, $4, 0)
- RETURNING webhook_pending_serial
- "#
- )
- .bind(session_id)
- .bind(client_id)
- .bind(url)
- .bind(body)
- .fetch_one(pool)
- .await?;
-
- Ok(result)
-}
-
-/// Delete old disabled webhooks (garbage collection)
-///
-/// Removes webhooks that have been disabled (next_attempt = MAX) for cleanup
-pub async fn delete_disabled_webhooks(pool: &PgPool) -> Result<u64> {
- let result = sqlx::query(
- r#"
- DELETE FROM oauth2gw.notification_pending_webhooks
- WHERE next_attempt = 9223372036854775807
- "#
- )
- .execute(pool)
- .await?;
-
- Ok(result.rows_affected())
-}
-
-/// Delete webhooks that have exceeded max retries
-pub async fn delete_max_retries(
- pool: &PgPool,
- max_retries: i32,
-) -> Result<u64> {
- let result = sqlx::query(
- r#"
- DELETE FROM oauth2gw.notification_pending_webhooks
- WHERE retries >= $1
- "#
- )
- .bind(max_retries)
- .execute(pool)
- .await?;
-
- Ok(result.rows_affected())
-}
-
-/// Get the next scheduled webhook attempt time
-///
-/// Returns the earliest `next_attempt` timestamp for webhooks that are:
-/// - Not disabled (next_attempt < MAX_INT8)
-/// - Scheduled for the future (next_attempt > NOW)
-///
-/// Used by worker to schedule precise wake-up for retries.
-/// Returns None if no future webhooks are scheduled.
-pub async fn get_next_scheduled_webhook(pool: &PgPool) -> Result<Option<i64>> {
- let result = sqlx::query_scalar::<_, i64>(
- r#"
- SELECT next_attempt
- FROM oauth2gw.notification_pending_webhooks
- WHERE next_attempt > EXTRACT(EPOCH FROM NOW())
- AND next_attempt < 9223372036854775807
- ORDER BY next_attempt ASC
- LIMIT 1
- "#
- )
- .fetch_optional(pool)
- .await?;
-
- Ok(result)
-}
-\ No newline at end of file
diff --git a/kych_oauth2_gateway/src/worker.rs b/kych_oauth2_gateway/src/worker.rs
@@ -1,258 +0,0 @@
-//! Webhook worker daemon
-//!
-//! Background process that delivers webhooks to client endpoints.
-//! Uses PostgreSQL NOTIFY/LISTEN for event-driven wake-up.
-
-use anyhow::Result;
-use sqlx::postgres::PgListener;
-use sqlx::PgPool;
-use std::sync::atomic::{AtomicUsize, Ordering};
-use std::sync::Arc;
-use std::time::Duration;
-use tokio::sync::watch;
-use tracing::{debug, error, info, warn};
-
-use crate::{
- config::WebhookWorkerConfig,
- db::notification_webhooks::{
- delete_webhook, disable_webhook, get_next_scheduled_webhook, get_pending_webhooks,
- schedule_retry, PendingWebhook,
- },
-};
-
-/// Run the webhook worker
-///
-/// This function blocks until shutdown signal is received.
-/// 1. Fetch pending webhooks (next_attempt <= NOW)
-/// 2. Execute HTTP requests, wait for all to complete
-/// 3. If no pending, check for future webhooks and schedule wake-up
-/// 4. Fallback poll configured interval
-pub async fn run_worker(
- pool: PgPool,
- config: &WebhookWorkerConfig,
- test_mode: bool,
- mut shutdown: watch::Receiver<bool>,
-) -> Result<()> {
- info!("Starting webhook worker (test_mode={})", test_mode);
-
- let http_client = reqwest::Client::builder()
- .timeout(Duration::from_secs(30))
- .redirect(reqwest::redirect::Policy::limited(5))
- .build()?;
-
- // Set up psql LISTEN
- let mut listener = PgListener::connect_with(&pool).await?;
- listener.listen("oauth2gw_webhook_pending").await?;
- info!("Listening on channel 'oauth2gw_webhook_pending'");
-
- loop {
- // select work: process pending webhooks (keep looping while there's work)
- loop {
- let has_work = select_work(&pool, &http_client, config).await;
-
- if !has_work {
- if test_mode {
- info!("Test mode: no pending webhooks, exiting");
- return Ok(());
- }
- break; // Exit inner loop, wait for NOTIFY/timer
- }
- // If has_work, loop again to check for more pending webhooks
- }
-
- // Determine how long to sleep
- let sleep_duration = match get_next_scheduled_webhook(&pool).await {
- Ok(Some(next_attempt)) => {
- let now = chrono::Utc::now().timestamp();
- let delay = (next_attempt - now).max(0) as u64;
- debug!("Next webhook scheduled in {} seconds", delay);
- Duration::from_secs(delay)
- }
- Ok(None) => {
- debug!("No future webhooks, fallback poll in {} seconds", config.fallback_poll_secs);
- Duration::from_secs(config.fallback_poll_secs)
- }
- Err(e) => {
- error!("Failed to get next scheduled webhook: {}", e);
- Duration::from_secs(config.fallback_poll_secs)
- }
- };
-
- tokio::select! {
- // Shutdown signal
- _ = shutdown.changed() => {
- if *shutdown.borrow() {
- info!("Shutdown signal received");
- break;
- }
- }
-
- // psql NOTIFY received - immediately process
- notification = listener.recv() => {
- match notification {
- Ok(n) => {
- debug!("NOTIFY received: {}", n.payload());
- // Don't sleep, go straight to select_work
- continue;
- }
- Err(e) => {
- error!("LISTEN error: {}", e);
- tokio::time::sleep(Duration::from_secs(1)).await;
- }
- }
- }
-
- // Sleep until next scheduled webhook or fallback poll
- _ = tokio::time::sleep(sleep_duration) => {
- debug!("Timer expired, checking for work");
- }
- }
- }
-
- info!("Webhook worker stopped");
- Ok(())
-}
-
-/// Process all pending webhooks
-///
-/// Returns true if any webhooks were processed.
-async fn select_work(pool: &PgPool, http_client: &reqwest::Client, config: &WebhookWorkerConfig) -> bool {
- let webhooks = match get_pending_webhooks(pool, config.batch_size).await {
- Ok(w) => w,
- Err(e) => {
- error!("Failed to fetch pending webhooks: {}", e);
- return false;
- }
- };
-
- if webhooks.is_empty() {
- debug!("No pending webhooks");
- return false;
- }
-
- info!("Processing {} pending webhooks", webhooks.len());
-
- // Track in-flight jobs
- let in_flight = Arc::new(AtomicUsize::new(webhooks.len()));
- let mut handles = Vec::with_capacity(webhooks.len());
-
- for webhook in webhooks {
- let pool = pool.clone();
- let client = http_client.clone();
- let counter = in_flight.clone();
- let cfg = config.clone();
-
- // Fire HTTP request
- let handle = tokio::spawn(async move {
- execute_webhook(&pool, &client, webhook, &cfg).await;
- counter.fetch_sub(1, Ordering::SeqCst);
- });
-
- handles.push(handle);
- }
-
- // Wait for all jobs to complete
- for handle in handles {
- if let Err(e) = handle.await {
- error!("Webhook task panicked: {}", e);
- }
- }
-
- true
-}
-
-/// Execute a single webhook HTTP request
-async fn execute_webhook(pool: &PgPool, client: &reqwest::Client, webhook: PendingWebhook, config: &WebhookWorkerConfig) {
- let serial = webhook.webhook_pending_serial;
-
- debug!(
- "Webhook {}: {} {} (retry #{})",
- serial, webhook.http_method, webhook.url, webhook.retries
- );
-
- // Build request based on http_method
- let mut request = match webhook.http_method.to_uppercase().as_str() {
- "POST" => client.post(&webhook.url),
- "PUT" => client.put(&webhook.url),
- "GET" => client.get(&webhook.url),
- "DELETE" => client.delete(&webhook.url),
- method => {
- error!("Unsupported HTTP method '{}' for webhook {}", method, serial);
- let _ = disable_webhook(pool, serial).await;
- return;
- }
- };
-
- // Add headers if present
- if let Some(headers) = &webhook.header {
- for line in headers.lines() {
- if let Some((name, value)) = line.split_once(':') {
- request = request.header(name.trim(), value.trim());
- }
- }
- }
-
- // Add body for POST/PUT
- if matches!(webhook.http_method.to_uppercase().as_str(), "POST" | "PUT") {
- request = request
- .header("Content-Type", "application/json")
- .body(webhook.body.clone());
- }
-
- // Execute request
- let response = match request.send().await {
- Ok(r) => r,
- Err(e) => {
- warn!("Network error for webhook {}: {}", serial, e);
- handle_webhook_response(pool, serial, 0, config).await;
- return;
- }
- };
-
- let status_code = response.status().as_u16();
- handle_webhook_response(pool, serial, status_code.into(), config).await;
-}
-
-/// Handle webhook response
-///
-/// - 2xx: delete pending webhook (success)
-/// - 400: next_attempt = FOREVER (never retry)
-/// - 500: next_attempt = NOW + configured delay
-/// - 403: next_attempt = NOW + configured delay
-/// - other: next_attempt = NOW + configured delay
-async fn handle_webhook_response(pool: &PgPool, serial: i64, response_code: i64, config: &WebhookWorkerConfig) {
- info!("Webhook {} returned with status {}", serial, response_code);
-
- // 200 success - delete from queue
- if response_code >= 200 && response_code < 300 {
- match delete_webhook(pool, serial).await {
- Ok(true) => {
- debug!("Webhook {} deleted successfully", serial);
- }
- Ok(false) => {
- warn!("Webhook {} not found for deletion", serial);
- }
- Err(e) => {
- error!("Failed to delete webhook {}: {}", serial, e);
- }
- }
- return;
- }
-
- // Determine retry delay based on response code
- let delay = match response_code {
- 400 => {
- warn!("Webhook {} got 400, disabling permanently", serial);
- let _ = disable_webhook(pool, serial).await;
- return;
- }
- 500 => config.retry_delay_server_error,
- 403 => config.retry_delay_forbidden,
- _ => config.retry_delay_other,
- };
-
- debug!("Scheduling webhook {} retry in {} seconds", serial, delay);
- if let Err(e) = schedule_retry(pool, serial, delay).await {
- error!("Failed to schedule retry for webhook {}: {}", serial, e);
- }
-}