webhook_worker.rs (2918B)
1 //! Webhook worker daemon binary 2 //! 3 //! Background process that delivers webhooks to client endpoints. 4 //! Uses PostgreSQL NOTIFY/LISTEN for event-driven wake-up. 5 //! 6 //! Usage: 7 //! webhook-worker -c config.ini # Run normally 8 //! webhook-worker -c config.ini -t # Test mode (exit when idle) 9 10 use oauth2_gateway::{config::Config, db, worker}; 11 use anyhow::Result; 12 use clap::Parser; 13 use tokio::signal; 14 use tokio::sync::watch; 15 use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; 16 17 #[derive(Parser, Debug)] 18 #[command(name = "webhook-worker")] 19 #[command(version)] 20 #[command(about = "Background process that executes webhooks")] 21 struct Args { 22 /// Configuration file path 23 #[arg(short = 'c', long = "config", value_name = "FILE")] 24 config: String, 25 26 /// Run in test mode (exit when idle) 27 #[arg(short = 't', long = "test")] 28 test_mode: bool, 29 } 30 31 #[tokio::main] 32 async fn main() -> Result<()> { 33 // Init logging, tracing 34 tracing_subscriber::registry() 35 .with( 36 tracing_subscriber::EnvFilter::try_from_default_env() 37 .unwrap_or_else(|_| "oauth2_gateway=info,tower_http=info,sqlx=warn".into()), 38 ) 39 .with( 40 tracing_subscriber::fmt::layer() 41 .compact() 42 .with_ansi(false) 43 .with_timer(tracing_subscriber::fmt::time::LocalTime::rfc_3339()), 44 ) 45 .init(); 46 47 let args = Args::parse(); 48 49 tracing::info!("Starting webhook worker v{}", env!("CARGO_PKG_VERSION")); 50 tracing::info!("Loading configuration from: {}", args.config); 51 52 let config = Config::from_file(&args.config)?; 53 54 tracing::info!("Connecting to database: {}", config.database.url); 55 let pool = db::create_pool(&config.database.url).await?; 56 57 // Set up shutdown signal handling 58 let (shutdown_tx, shutdown_rx) = watch::channel(false); 59 60 // Spawn signal handler task 61 tokio::spawn(async move { 62 let ctrl_c = async { 63 signal::ctrl_c() 64 .await 65 .expect("Failed to install Ctrl+C handler"); 66 }; 67 68 #[cfg(unix)] 69 let terminate = async { 70 signal::unix::signal(signal::unix::SignalKind::terminate()) 71 .expect("Failed to install SIGTERM handler") 72 .recv() 73 .await; 74 }; 75 76 #[cfg(not(unix))] 77 let terminate = std::future::pending::<()>(); 78 79 tokio::select! { 80 _ = ctrl_c => { 81 tracing::info!("Received Ctrl+C, initiating shutdown"); 82 } 83 _ = terminate => { 84 tracing::info!("Received SIGTERM, initiating shutdown"); 85 } 86 } 87 88 let _ = shutdown_tx.send(true); 89 }); 90 91 // Run the worker 92 worker::run_worker(pool, &config.webhook_worker, args.test_mode, shutdown_rx).await?; 93 94 tracing::info!("Webhook worker exited cleanly"); 95 Ok(()) 96 }