kych

OAuth 2.0 API for Swiyu to enable Taler integration of Swiyu for KYC (experimental)
Log | Files | Refs

webhook_worker.rs (2918B)


      1 //! Webhook worker daemon binary
      2 //!
      3 //! Background process that delivers webhooks to client endpoints.
      4 //! Uses PostgreSQL NOTIFY/LISTEN for event-driven wake-up.
      5 //!
      6 //! Usage:
      7 //!   webhook-worker -c config.ini       # Run normally
      8 //!   webhook-worker -c config.ini -t    # Test mode (exit when idle)
      9 
     10 use oauth2_gateway::{config::Config, db, worker};
     11 use anyhow::Result;
     12 use clap::Parser;
     13 use tokio::signal;
     14 use tokio::sync::watch;
     15 use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
     16 
     17 #[derive(Parser, Debug)]
     18 #[command(name = "webhook-worker")]
     19 #[command(version)]
     20 #[command(about = "Background process that executes webhooks")]
     21 struct Args {
     22     /// Configuration file path
     23     #[arg(short = 'c', long = "config", value_name = "FILE")]
     24     config: String,
     25 
     26     /// Run in test mode (exit when idle)
     27     #[arg(short = 't', long = "test")]
     28     test_mode: bool,
     29 }
     30 
     31 #[tokio::main]
     32 async fn main() -> Result<()> {
     33     // Init logging, tracing
     34     tracing_subscriber::registry()
     35         .with(
     36             tracing_subscriber::EnvFilter::try_from_default_env()
     37                 .unwrap_or_else(|_| "oauth2_gateway=info,tower_http=info,sqlx=warn".into()),
     38         )
     39         .with(
     40             tracing_subscriber::fmt::layer()
     41                 .compact()
     42                 .with_ansi(false)
     43                 .with_timer(tracing_subscriber::fmt::time::LocalTime::rfc_3339()),
     44         )
     45         .init();
     46 
     47     let args = Args::parse();
     48 
     49     tracing::info!("Starting webhook worker v{}", env!("CARGO_PKG_VERSION"));
     50     tracing::info!("Loading configuration from: {}", args.config);
     51 
     52     let config = Config::from_file(&args.config)?;
     53 
     54     tracing::info!("Connecting to database: {}", config.database.url);
     55     let pool = db::create_pool(&config.database.url).await?;
     56 
     57     // Set up shutdown signal handling
     58     let (shutdown_tx, shutdown_rx) = watch::channel(false);
     59 
     60     // Spawn signal handler task
     61     tokio::spawn(async move {
     62         let ctrl_c = async {
     63             signal::ctrl_c()
     64                 .await
     65                 .expect("Failed to install Ctrl+C handler");
     66         };
     67 
     68         #[cfg(unix)]
     69         let terminate = async {
     70             signal::unix::signal(signal::unix::SignalKind::terminate())
     71                 .expect("Failed to install SIGTERM handler")
     72                 .recv()
     73                 .await;
     74         };
     75 
     76         #[cfg(not(unix))]
     77         let terminate = std::future::pending::<()>();
     78 
     79         tokio::select! {
     80             _ = ctrl_c => {
     81                 tracing::info!("Received Ctrl+C, initiating shutdown");
     82             }
     83             _ = terminate => {
     84                 tracing::info!("Received SIGTERM, initiating shutdown");
     85             }
     86         }
     87 
     88         let _ = shutdown_tx.send(true);
     89     });
     90 
     91     // Run the worker
     92     worker::run_worker(pool, &config.webhook_worker, args.test_mode, shutdown_rx).await?;
     93 
     94     tracing::info!("Webhook worker exited cleanly");
     95     Ok(())
     96 }