kych

OAuth 2.0 API for Swiyu to enable Taler integration of Swiyu for KYC (experimental)
Log | Files | Refs

commit 70ba61ef26d99196ab86ff8564d0ed107ff06578
parent 227a247fb717169a6b9969d1b42f2d775200ed2c
Author: Henrique Chan Carvalho Machado <henriqueccmachado@tecnico.ulisboa.pt>
Date:   Sun, 23 Nov 2025 16:46:47 +0100

oauth2_gateway: implement api endpoints according to new sequence diagrams; add webhook worker

Diffstat:
Moauth2_gateway/Cargo.toml | 4++++
Moauth2_gateway/config.example.ini | 6------
Doauth2_gateway/scripts/test_integration.sh | 171-------------------------------------------------------------------------------
Aoauth2_gateway/src/bin/cli.rs | 306+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Aoauth2_gateway/src/bin/webhook_worker.rs | 92+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Moauth2_gateway/src/config.rs | 4+---
Moauth2_gateway/src/db/clients.rs | 2+-
Moauth2_gateway/src/handlers.rs | 591++++++++++++++++++++++++++++++++++++++++++++-----------------------------------
Moauth2_gateway/src/lib.rs | 5+++--
Moauth2_gateway/src/main.rs | 5+++--
Moauth2_gateway/src/models.rs | 23+++++++++++++++++------
Moauth2_gateway/src/state.rs | 2++
Aoauth2_gateway/src/worker.rs | 264+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
13 files changed, 1024 insertions(+), 451 deletions(-)

diff --git a/oauth2_gateway/Cargo.toml b/oauth2_gateway/Cargo.toml @@ -11,6 +11,10 @@ path = "src/lib.rs" name = "oauth2-gateway" path = "src/main.rs" +[[bin]] +name = "webhook-worker" +path = "src/bin/webhook_worker.rs" + [dependencies] # Web framework axum = "0.8.6" diff --git a/oauth2_gateway/config.example.ini b/oauth2_gateway/config.example.ini @@ -1,12 +1,6 @@ -# OAuth2 Gateway Configuration -# -# NOTE: Client-specific configuration (notification URLs, verifier URLs) is stored -# in the database, not in this file. - [server] host = 127.0.0.1 port = 9090 [database] -# PostgreSQL connection string url = postgresql://oauth2gw:password@localhost/oauth2gw diff --git a/oauth2_gateway/scripts/test_integration.sh b/oauth2_gateway/scripts/test_integration.sh @@ -1,171 +0,0 @@ -#!/bin/bash - -set -e - -GATEWAY_URL="http://localhost:9090" -VERIFIER_URL="http://localhost:8080" -CLIENT_ID="test-exchange-001" -SCOPE="age_over_18" -QR_CODE_FILE="oauth2gw_qr_code.png" - -echo "================================================================" -echo "OAuth2 Gateway Integration Test" -echo "================================================================" -echo "" -echo "Prerequisites:" -echo " - OAuth2 Gateway running at $GATEWAY_URL" -echo " - Swiyu Verifier running at $VERIFIER_URL" -echo " - Verifier configured with WEBHOOK_CALLBACK_URI=$GATEWAY_URL/notification" -echo " - Test database seeded with test-exchange-001 client" -echo " - qrencode installed (for QR code generation)" -echo "" -echo "================================================================" -echo "" - -echo "[1/4] Testing /setup endpoint..." -SETUP_RESPONSE=$(curl -s -X POST "$GATEWAY_URL/setup/$CLIENT_ID" \ - -H "Content-Type: application/json" \ - -d "{\"scope\": \"$SCOPE\"}") - -echo "Response: $SETUP_RESPONSE" - -NONCE=$(echo "$SETUP_RESPONSE" | jq -r '.nonce') -if [ -z "$NONCE" ] || [ "$NONCE" = "null" ]; then - echo "FAILED: No nonce returned" - exit 1 -fi -echo "SUCCESS: Received nonce: $NONCE" -echo "" - -echo "[2/4] Testing /authorize endpoint..." -AUTHORIZE_RESPONSE=$(curl -s -X GET "$GATEWAY_URL/authorize/$NONCE") - -echo "Response: $AUTHORIZE_RESPONSE" -echo "" - -VERIFICATION_URL=$(echo "$AUTHORIZE_RESPONSE" | jq -r '.verification_url') -VERIFICATION_ID=$(echo "$AUTHORIZE_RESPONSE" | jq -r '.verificationId') - -if [ -z "$VERIFICATION_URL" ] || [ "$VERIFICATION_URL" = "null" ]; then - echo "FAILED: No verification_url returned" - exit 1 -fi - -if [ -z "$VERIFICATION_ID" ] || [ "$VERIFICATION_ID" = "null" ]; then - echo "FAILED: No verificationId returned" - exit 1 -fi - -echo "SUCCESS: Received verification URL: $VERIFICATION_URL" -echo "SUCCESS: Received verification ID: $VERIFICATION_ID" -echo "" - -echo "Generating QR code..." -echo "$VERIFICATION_URL" | tee /dev/tty | xargs qrencode -o "$QR_CODE_FILE" -echo "QR code saved to: $QR_CODE_FILE" -echo "" - -open "$QR_CODE_FILE" - -echo "" -echo "[3/4] Waiting for user to scan QR code with wallet..." -echo "Please scan the QR code and complete the verification in your wallet." -echo "Waiting 60 seconds for verification to complete..." -echo "" - -# Poll for session status changes -MAX_WAIT=60 -ELAPSED=0 -INITIAL_STATUS="" - -while [ $ELAPSED -lt $MAX_WAIT ]; do - # Check session status by querying verifier - VERIFIER_STATUS=$(curl -s "$VERIFIER_URL/management/api/verifications/$VERIFICATION_ID" | jq -r '.state' 2>/dev/null || echo "PENDING") - - if [ -z "$INITIAL_STATUS" ]; then - INITIAL_STATUS="$VERIFIER_STATUS" - echo "Initial verification status: $INITIAL_STATUS" - fi - - if [ "$VERIFIER_STATUS" != "PENDING" ] && [ "$VERIFIER_STATUS" != "null" ]; then - echo "Verification status changed to: $VERIFIER_STATUS" - break - fi - - echo -n "." - sleep 20 - ELAPSED=$((ELAPSED + 20)) -done - -echo "" -echo "" - -if [ "$VERIFIER_STATUS" = "PENDING" ] || [ "$VERIFIER_STATUS" = "null" ]; then - echo "WARNING: Verification still pending after $MAX_WAIT seconds" - echo "Skipping webhook test - you can complete verification later" -else - echo "SUCCESS: Verification completed with status: $VERIFIER_STATUS" - echo "" - echo "[4/4] Testing webhook notification..." - echo "Waiting for Swiyu Verifier to send webhook to Gateway..." - echo "Webhook should be sent within the configured interval (typically 5-30 seconds)." - echo "" - - # Wait for webhook to be sent and processed (webhook interval + processing time) - echo "Waiting 15 seconds for webhook delivery and processing..." - sleep 15 - - # Check if webhook was processed by querying the database - echo "" - echo "Verifying webhook was processed..." - - SESSION_STATUS=$(psql -h localhost -p 5432 -U oauth2gw -d oauth2gw -tAc "SELECT status FROM oauth2gw.verification_sessions WHERE nonce = '$NONCE';" 2>/dev/null || echo "") - - if [ -n "$SESSION_STATUS" ]; then - echo "Session status in database: $SESSION_STATUS" - - if [ "$VERIFIER_STATUS" = "SUCCESS" ] && [ "$SESSION_STATUS" = "verified" ]; then - echo "SUCCESS: Webhook processed correctly - session marked as verified" - elif [ "$VERIFIER_STATUS" = "FAILED" ] && [ "$SESSION_STATUS" = "failed" ]; then - echo "SUCCESS: Webhook processed correctly - session marked as failed" - else - echo "WARNING: Session status '$SESSION_STATUS' doesn't match expected state for verifier status '$VERIFIER_STATUS'" - echo "Check Gateway logs for webhook processing details" - fi - else - echo "INFO: Could not query database to verify webhook processing" - echo "Check Gateway logs manually for:" - echo " - 'Webhook received from Swiyu: verification_id=$VERIFICATION_ID'" - echo " - 'Updated session ... status to Verified' (if SUCCESS)" - echo " - 'Updated session ... status to Failed' (if FAILED)" - fi -fi - -echo "" -echo "================================================================" -echo "Integration test completed" -echo "================================================================" -echo "" -echo "Summary:" -echo " [OK] /setup endpoint: Created session with nonce" -echo " [OK] /authorize endpoint: Got verification URL from Swiyu" -if [ "$VERIFIER_STATUS" = "SUCCESS" ]; then - echo " [OK] User verification: Completed successfully" - if [ -n "$SESSION_STATUS" ] && [ "$SESSION_STATUS" = "verified" ]; then - echo " [OK] Webhook notification: Session status updated to verified" - else - echo " [!!] Webhook notification: Could not verify - check Gateway logs" - fi -elif [ "$VERIFIER_STATUS" = "FAILED" ]; then - echo " [FAIL] User verification: Failed" - if [ -n "$SESSION_STATUS" ] && [ "$SESSION_STATUS" = "failed" ]; then - echo " [OK] Webhook notification: Session status updated to failed" - else - echo " [!!] Webhook notification: Could not verify - check Gateway logs" - fi -else - echo " [WAIT] User verification: Still pending (timeout after ${MAX_WAIT}s)" - echo " - Complete verification manually and webhook will be sent" -fi -echo "" -echo "================================================================" diff --git a/oauth2_gateway/src/bin/cli.rs b/oauth2_gateway/src/bin/cli.rs @@ -0,0 +1,306 @@ +//! OAuth2 Gateway CLI +//! +//! Command-line tool for managing OAuth2 Gateway clients. +//! +//! Usage: +//! oauth2gw-cli -c config.ini client list +//! oauth2gw-cli -c config.ini client show <client_id> +//! oauth2gw-cli -c config.ini client create --client-id <id> --secret <secret> ... +//! oauth2gw-cli -c config.ini client update <client_id> --webhook-url <url> +//! oauth2gw-cli -c config.ini client delete <client_id> + +use anyhow::{Context, Result}; +use clap::{Parser, Subcommand}; +use oauth2_gateway::{config::Config, db}; + +#[derive(Parser, Debug)] +#[command(name = "oauth2gw-cli")] +#[command(version)] +#[command(about = "OAuth2 Gateway administration CLI")] +struct Args { + /// Configuration file path + #[arg(short = 'c', long = "config", value_name = "FILE")] + config: String, + + #[command(subcommand)] + command: Commands, +} + +#[derive(Subcommand, Debug)] +enum Commands { + /// Manage clients + Client { + #[command(subcommand)] + action: ClientAction, + }, +} + +#[derive(Subcommand, Debug)] +enum ClientAction { + /// List all registered clients + List, + + /// Show details for a specific client + Show { + /// Client ID to show + client_id: String, + }, + + /// Register a new client + Create { + /// Unique client identifier + #[arg(long)] + client_id: String, + + /// Client secret (stored as hash) + #[arg(long)] + secret: String, + + /// Webhook URL for notifications + #[arg(long)] + webhook_url: String, + + /// Swiyu verifier base URL + #[arg(long)] + verifier_url: String, + + /// Verifier management API path (default: /management/api/verifications) + #[arg(long)] + verifier_api_path: Option<String>, + }, + + /// Update an existing client + Update { + /// Client ID to update + client_id: String, + + /// New webhook URL + #[arg(long)] + webhook_url: Option<String>, + + /// New verifier URL + #[arg(long)] + verifier_url: Option<String>, + + /// New verifier management API path + #[arg(long)] + verifier_api_path: Option<String>, + }, + + /// Delete a client (WARNING: cascades to all sessions) + Delete { + /// Client ID to delete + client_id: String, + + /// Skip confirmation prompt + #[arg(long, short = 'y')] + yes: bool, + }, +} + +#[tokio::main] +async fn main() -> Result<()> { + let args = Args::parse(); + + let config = Config::from_file(&args.config) + .with_context(|| format!("Failed to load config from: {}", args.config))?; + + let pool = db::create_pool(&config.database.url) + .await + .context("Failed to connect to database")?; + + match args.command { + Commands::Client { action } => match action { + ClientAction::List => cmd_list_clients(&pool).await?, + ClientAction::Show { client_id } => cmd_show_client(&pool, &client_id).await?, + ClientAction::Create { + client_id, + secret, + webhook_url, + verifier_url, + verifier_api_path, + } => { + cmd_create_client( + &pool, + &client_id, + &secret, + &webhook_url, + &verifier_url, + verifier_api_path.as_deref(), + ) + .await? + } + ClientAction::Update { + client_id, + webhook_url, + verifier_url, + verifier_api_path, + } => { + cmd_update_client( + &pool, + &client_id, + webhook_url.as_deref(), + verifier_url.as_deref(), + verifier_api_path.as_deref(), + ) + .await? + } + ClientAction::Delete { client_id, yes } => { + cmd_delete_client(&pool, &client_id, yes).await? + } + }, + } + + Ok(()) +} + +async fn cmd_list_clients(pool: &sqlx::PgPool) -> Result<()> { + let clients = db::clients::list_clients(pool).await?; + + if clients.is_empty() { + println!("No clients registered."); + return Ok(()); + } + + println!("{:<36} {:<20} {:<40} {}", "UUID", "CLIENT_ID", "WEBHOOK_URL", "CREATED"); + println!("{}", "-".repeat(120)); + + for client in clients { + println!( + "{:<36} {:<20} {:<40} {}", + client.id, + truncate(&client.client_id, 20), + truncate(&client.webhook_url, 40), + client.created_at.format("%Y-%m-%d %H:%M") + ); + } + + Ok(()) +} + +async fn cmd_show_client(pool: &sqlx::PgPool, client_id: &str) -> Result<()> { + let client = db::clients::get_client_by_id(pool, client_id) + .await? + .ok_or_else(|| anyhow::anyhow!("Client not found: {}", client_id))?; + + println!("Client Details"); + println!("{}", "=".repeat(60)); + println!("UUID: {}", client.id); + println!("Client ID: {}", client.client_id); + println!("Secret Hash: {}...", &client.secret_hash[..20.min(client.secret_hash.len())]); + println!("Webhook URL: {}", client.webhook_url); + println!("Verifier URL: {}", client.verifier_url); + println!("Verifier API Path: {}", client.verifier_management_api_path); + println!("Created: {}", client.created_at); + println!("Updated: {}", client.updated_at); + + Ok(()) +} + +async fn cmd_create_client( + pool: &sqlx::PgPool, + client_id: &str, + secret: &str, + webhook_url: &str, + verifier_url: &str, + verifier_api_path: Option<&str>, +) -> Result<()> { + let client = db::clients::register_client( + pool, + client_id, + secret, + webhook_url, + verifier_url, + verifier_api_path, + ) + .await + .context("Failed to create client")?; + + println!("Client created successfully."); + println!(); + println!("UUID: {}", client.id); + println!("Client ID: {}", client.client_id); + println!("Webhook URL: {}", client.webhook_url); + + Ok(()) +} + +async fn cmd_update_client( + pool: &sqlx::PgPool, + client_id: &str, + webhook_url: Option<&str>, + verifier_url: Option<&str>, + verifier_api_path: Option<&str>, +) -> Result<()> { + if webhook_url.is_none() && verifier_url.is_none() && verifier_api_path.is_none() { + anyhow::bail!("No fields to update. Specify at least one of: --webhook-url, --verifier-url, --verifier-api-path"); + } + + let client = db::clients::get_client_by_id(pool, client_id) + .await? + .ok_or_else(|| anyhow::anyhow!("Client not found: {}", client_id))?; + + let updated = db::clients::update_client( + pool, + client.id, + webhook_url, + verifier_url, + verifier_api_path, + ) + .await + .context("Failed to update client")?; + + println!("Client updated successfully."); + println!(); + println!("UUID: {}", updated.id); + println!("Client ID: {}", updated.client_id); + println!("Webhook URL: {}", updated.webhook_url); + println!("Verifier URL: {}", updated.verifier_url); + println!("Verifier API Path: {}", updated.verifier_management_api_path); + + Ok(()) +} + +async fn cmd_delete_client(pool: &sqlx::PgPool, client_id: &str, skip_confirm: bool) -> Result<()> { + let client = db::clients::get_client_by_id(pool, client_id) + .await? + .ok_or_else(|| anyhow::anyhow!("Client not found: {}", client_id))?; + + if !skip_confirm { + println!("WARNING: This will delete client '{}' and ALL associated data:", client_id); + println!(" - All sessions"); + println!(" - All tokens"); + println!(" - All pending webhooks"); + println!(); + print!("Type 'yes' to confirm: "); + + use std::io::{self, Write}; + io::stdout().flush()?; + + let mut input = String::new(); + io::stdin().read_line(&mut input)?; + + if input.trim() != "yes" { + println!("Aborted."); + return Ok(()); + } + } + + let deleted = db::clients::delete_client(pool, client.id).await?; + + if deleted { + println!("Client '{}' deleted successfully.", client_id); + } else { + println!("Client not found (may have been deleted already)."); + } + + Ok(()) +} + +fn truncate(s: &str, max_len: usize) -> String { + if s.len() <= max_len { + s.to_string() + } else { + format!("{}...", &s[..max_len - 3]) + } +} diff --git a/oauth2_gateway/src/bin/webhook_worker.rs b/oauth2_gateway/src/bin/webhook_worker.rs @@ -0,0 +1,91 @@ +//! Webhook worker daemon binary +//! +//! Background process that delivers webhooks to client endpoints. +//! Uses PostgreSQL NOTIFY/LISTEN for event-driven wake-up. +//! +//! Usage: +//! webhook-worker -c config.ini # Run normally +//! webhook-worker -c config.ini -t # Test mode (exit when idle) + +use oauth2_gateway::{config::Config, db, worker}; +use anyhow::Result; +use clap::Parser; +use tokio::signal; +use tokio::sync::watch; +use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; + +#[derive(Parser, Debug)] +#[command(name = "webhook-worker")] +#[command(version)] +#[command(about = "Background process that executes webhooks")] +struct Args { + /// Configuration file path + #[arg(short = 'c', long = "config", value_name = "FILE")] + config: String, + + /// Run in test mode (exit when idle) + #[arg(short = 't', long = "test")] + test_mode: bool, +} + +#[tokio::main] +async fn main() -> Result<()> { + // Init logging, tracing + tracing_subscriber::registry() + .with( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| "webhook_worker=info,oauth2_gateway=info,sqlx=warn".into()), + ) + .with(tracing_subscriber::fmt::layer()) + .init(); + + let args = Args::parse(); + + tracing::info!("Starting webhook worker v{}", env!("CARGO_PKG_VERSION")); + tracing::info!("Loading configuration from: {}", args.config); + + let config = Config::from_file(&args.config)?; + + tracing::info!("Connecting to database: {}", config.database.url); + let pool = db::create_pool(&config.database.url).await?; + + // Set up shutdown signal handling + let (shutdown_tx, shutdown_rx) = watch::channel(false); + + // Spawn signal handler task + tokio::spawn(async move { + let ctrl_c = async { + signal::ctrl_c() + .await + .expect("Failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + signal::unix::signal(signal::unix::SignalKind::terminate()) + .expect("Failed to install SIGTERM handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => { + tracing::info!("Received Ctrl+C, initiating shutdown"); + } + _ = terminate => { + tracing::info!("Received SIGTERM, initiating shutdown"); + } + } + + let _ = shutdown_tx.send(true); + }); + + // Run the worker + worker::run_worker(pool, args.test_mode, shutdown_rx).await?; + + tracing::info!("Webhook worker exited cleanly"); + Ok(()) +} +\ No newline at end of file diff --git a/oauth2_gateway/src/config.rs b/oauth2_gateway/src/config.rs @@ -6,7 +6,7 @@ use std::path::Path; #[derive(Debug, Clone, Serialize, Deserialize)] pub struct Config { pub server: ServerConfig, - pub database: DatabaseConfig, + pub database: DatabaseConfig } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -25,7 +25,6 @@ impl Config { let ini = Ini::load_from_file(path.as_ref()) .context("Failed to load config file")?; - // Server section let server_section = ini .section(Some("server")) .context("Missing [server] section")?; @@ -42,7 +41,6 @@ impl Config { .context("Invalid port")?, }; - // Database section let database_section = ini .section(Some("database")) .context("Missing [database] section")?; diff --git a/oauth2_gateway/src/db/clients.rs b/oauth2_gateway/src/db/clients.rs @@ -178,7 +178,7 @@ pub async fn delete_client( pub async fn list_clients(pool: &PgPool) -> Result<Vec<Client>> { let clients = sqlx::query_as::<_, Client>( r#" - SELECT id, client_id, client_secret, notification_url, verifier_base_url, + SELECT id, client_id, secret_hash, webhook_url, verifier_url, verifier_management_api_path, created_at, updated_at FROM oauth2gw.clients ORDER BY created_at DESC diff --git a/oauth2_gateway/src/handlers.rs b/oauth2_gateway/src/handlers.rs @@ -1,12 +1,14 @@ use axum::{ - extract::{Path, State}, - http::StatusCode, + extract::{Path, Query, State}, + http::{header, StatusCode}, response::IntoResponse, Json, }; use serde_json::json; +use chrono::Utc; use crate::{ + db::sessions::SessionStatus, models::*, state::AppState, crypto, @@ -17,7 +19,6 @@ pub async fn health_check() -> impl IntoResponse { Json(json!({ "status": "healthy", "service": "oauth2-gateway", - // "version": env!("CARGO_PKG_VERSION") // leak version info? })) } @@ -27,35 +28,15 @@ pub async fn setup( Path(client_id): Path<String>, Json(request): Json<SetupRequest>, ) -> Result<impl IntoResponse, (StatusCode, Json<ErrorResponse>)> { - tracing::info!("Setup request for client: {}, scope: {}", client_id, request.scope); + tracing::info!("Setup request for client: {}, scope: {}", + client_id, request.scope); - // Look up client in database - let client = crate::db::clients::get_client_by_id(&state.pool, &client_id) - .await - .map_err(|e| { - tracing::error!("Database error looking up client {}: {}", client_id, e); - (StatusCode::INTERNAL_SERVER_ERROR, Json(ErrorResponse::new("internal_error"))) - })?; - - let client = match client { - Some(c) => c, - None => { - tracing::warn!("Client not found: {}", client_id); - return Err((StatusCode::NOT_FOUND, Json(ErrorResponse::new("client_not_found")))); - } - }; - - tracing::debug!("Found client: {} (UUID: {})", client.client_id, client.id); - - // Generate cryptographically secure nonce let nonce = crypto::generate_nonce(); - tracing::info!("Generated nonce: {}", nonce); + tracing::debug!("Generated nonce: {}", nonce); - // Create verification session in database - // TODO: Should this be transactional? - let _session = crate::db::sessions::create_session( + let session = crate::db::sessions::create_session( &state.pool, - client.id, + &client_id, &nonce, &request.scope, 15, // 15 minutes expiration @@ -63,136 +44,163 @@ pub async fn setup( .await .map_err(|e| { tracing::error!("Failed to create session: {}", e); - (StatusCode::INTERNAL_SERVER_ERROR, Json(ErrorResponse::new("internal_error"))) + (StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse::new("internal_error"))) })?; - tracing::info!("Created session for client {} with nonce {}", client_id, nonce); + let session = match session { + Some(s) => s, + None => { + tracing::warn!("Client not found: {}", client_id); + return Err((StatusCode::NOT_FOUND, + Json(ErrorResponse::new("client_not_found")))) + } + }; + + tracing::info!("Created session {} for client {} with nonce {}", + session.id, client_id, nonce); Ok((StatusCode::OK, Json(SetupResponse { nonce }))) } -// GET /authorize/{nonce} +// GET /authorize?response_type=code&client_id={client_id}&nonce={nonce} pub async fn authorize( State(state): State<AppState>, - Path(nonce): Path<String>, + Query(params): Query<AuthorizeQuery>, ) -> Result<impl IntoResponse, (StatusCode, Json<ErrorResponse>)> { - tracing::info!("Authorize request for nonce: {}", nonce); + tracing::info!("Authorize request for client: {}, nonce: {}", + params.client_id, params.nonce); - // Look up session by nonce - let session = crate::db::sessions::get_session_by_nonce(&state.pool, &nonce) - .await - .map_err(|e| { - tracing::error!("Database error looking up session: {}", e); - (StatusCode::INTERNAL_SERVER_ERROR, Json(ErrorResponse::new("internal_error"))) - })?; + // Validate response_type + if params.response_type != "code" { + return Err((StatusCode::BAD_REQUEST, + Json(ErrorResponse::new("invalid_request")))); + } - let session = match session { - Some(js) => js, + // Fetch session and client data (idempotent) + let session_data = crate::db::sessions::get_session_for_authorize( + &state.pool, + &params.nonce, + &params.client_id, + ) + .await + .map_err(|e| { + tracing::error!("DB error in authorize: {}", e); + (StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse::new("internal_error"))) + })?; + + let data = match session_data { + Some(d) => d, None => { - tracing::warn!("Session not found for nonce: {}", nonce); - return Err((StatusCode::NOT_FOUND, Json(ErrorResponse::new("session_not_found")))); + tracing::warn!("Session not found for nonce: {}", params.nonce); + return Err((StatusCode::NOT_FOUND, + Json(ErrorResponse::new("session_not_found")))); } }; - // Validate pending state - if session.status != crate::db::sessions::SessionStatus::Pending { - tracing::warn!("Session {} is not in pending state: {:?}", session.id, session.status); - return Err((StatusCode::BAD_REQUEST, Json(ErrorResponse::new("invalid_session_state")))); + // Backend validation + if data.expires_at < Utc::now() { + tracing::warn!("Session expired: {}", data.session_id); + return Err((StatusCode::GONE, + Json(ErrorResponse::new("session_expired")))); } - // Check if session expired - let now = chrono::Utc::now(); - if now > session.expires_at { - tracing::warn!("Session {} has expired", session.id); - return Err((StatusCode::BAD_REQUEST, Json(ErrorResponse::new("session_expired")))); + // Check status for idempotency + match data.status { + SessionStatus::Authorized => { + // Already authorized - return cached response + tracing::info!("Session {} already authorized, returning cached response", + data.session_id); + let verification_id = data.request_id + .and_then(|id| uuid::Uuid::parse_str(&id).ok()) + .unwrap_or(uuid::Uuid::nil()); + return Ok((StatusCode::OK, Json(AuthorizeResponse { + verification_id, + verification_url: data.verification_url.unwrap_or_default(), + }))); + } + SessionStatus::Pending => { + // Proceed with authorization + } + _ => { + tracing::warn!("Session {} in invalid status: {:?}", + data.session_id, data.status); + return Err((StatusCode::CONFLICT, + Json(ErrorResponse::new("invalid_session_status")))); + } } - // Look up client - let client = crate::db::clients::get_client_by_uuid(&state.pool, session.client_id) - .await - .map_err(|e| { - tracing::error!("Database error looking up client: {}", e); - (StatusCode::INTERNAL_SERVER_ERROR, Json(ErrorResponse::new("internal_error"))) - })? - .ok_or_else(|| { - tracing::error!("Client {} not found for session {}", session.client_id, session.id); - (StatusCode::INTERNAL_SERVER_ERROR, Json(ErrorResponse::new("internal_error"))) - })?; - - tracing::debug!("Found client {} for session {}", client.client_id, session.id); - // Build presentation definition from scope - let presentation_definition = build_presentation_definition(&session.scope); - - // Build Swiyu API request - // Note: response_mode, presentation_definition, configuration_override are REQUIRED - let swiyu_request = SwiyuCreateVerificationRequest { - accepted_issuer_dids: None, // Accept all issuers by default - trust_anchors: None, // No trust anchors by default - jwt_secured_authorization_request: Some(true), // Beta requires true for JWT-signed requests - response_mode: ResponseMode::DirectPost, // REQUIRED - presentation_definition, // REQUIRED - configuration_override: ConfigurationOverride::default(), // REQUIRED (empty is OK) - dcql_query: None, // Using Presentation Exchange, not DCQL + let presentation_definition = build_presentation_definition(&data.scope); + + // Call Swiyu Verifier + let verifier_url = format!("{}{}", + data.verifier_url, + data.verifier_management_api_path); + + let verifier_request = SwiyuCreateVerificationRequest { + accepted_issuer_dids: None, + trust_anchors: None, + jwt_secured_authorization_request: Some(true), + response_mode: ResponseMode::DirectPostJwt, + presentation_definition, + configuration_override: ConfigurationOverride::default(), + dcql_query: None, }; - // Call Swiyu Verifier API - let swiyu_url = format!("{}{}", client.verifier_base_url, client.verifier_management_api_path); - tracing::info!("Calling Swiyu Verifier API: {}", swiyu_url); + tracing::debug!("Calling Swiyu verifier at: {}", verifier_url); - let http_client = reqwest::Client::new(); - let swiyu_response = http_client - .post(&swiyu_url) - .json(&swiyu_request) + let verifier_response = state.http_client + .post(&verifier_url) + .json(&verifier_request) .send() .await .map_err(|e| { - tracing::error!("Failed to call Swiyu Verifier API: {}", e); - (StatusCode::SERVICE_UNAVAILABLE, Json(ErrorResponse::new("verifier_unavailable"))) + tracing::error!("Failed to call Swiyu verifier: {}", e); + (StatusCode::BAD_GATEWAY, + Json(ErrorResponse::new("verifier_unavailable"))) })?; - if !swiyu_response.status().is_success() { - let status = swiyu_response.status(); - let error_body = swiyu_response.text().await.unwrap_or_default(); - tracing::error!("Swiyu Verifier returned error {}: {}", status, error_body); - return Err((StatusCode::BAD_GATEWAY, Json(ErrorResponse::new("verifier_error")))); + if !verifier_response.status().is_success() { + let status = verifier_response.status(); + let body = verifier_response.text().await.unwrap_or_default(); + tracing::error!("Swiyu verifier returned error {}: {}", status, body); + return Err((StatusCode::BAD_GATEWAY, + Json(ErrorResponse::new("verifier_error")))); } - let swiyu_verification: SwiyuManagementResponse = swiyu_response + let swiyu_response: SwiyuManagementResponse = verifier_response .json() .await .map_err(|e| { tracing::error!("Failed to parse Swiyu response: {}", e); - (StatusCode::BAD_GATEWAY, Json(ErrorResponse::new("verifier_invalid_response"))) + (StatusCode::BAD_GATEWAY, + Json(ErrorResponse::new("verifier_invalid_response"))) })?; - tracing::info!( - "Created Swiyu verification: id={}, url={}", - swiyu_verification.id, - swiyu_verification.verification_url - ); - - // Update session with verification data - crate::db::sessions::set_session_authorized( + // Update session with verifier data + let result = crate::db::sessions::update_session_authorized( &state.pool, - session.id, - &swiyu_verification.verification_url, - &swiyu_verification.id.to_string(), + data.session_id, + &swiyu_response.verification_url, + &swiyu_response.id.to_string(), + swiyu_response.request_nonce.as_deref(), ) .await .map_err(|e| { tracing::error!("Failed to update session: {}", e); - (StatusCode::INTERNAL_SERVER_ERROR, Json(ErrorResponse::new("internal_error"))) + (StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse::new("internal_error"))) })?; - tracing::info!("Session {} updated with verification data", session.id); + tracing::info!("Session {} authorized, verification_id: {}", + data.session_id, swiyu_response.id); - let response = AuthorizeResponse { - verification_id: swiyu_verification.id, - verification_url: swiyu_verification.verification_url, - }; - - Ok((StatusCode::OK, Json(response))) + Ok((StatusCode::OK, Json(AuthorizeResponse { + verification_id: swiyu_response.id, + verification_url: result.verification_url, + }))) } /// Build a presentation definition from a space-delimited scope string @@ -202,12 +210,11 @@ fn build_presentation_definition(scope: &str) -> PresentationDefinition { use uuid::Uuid; use std::collections::HashMap; - // Parse scope into individual attributes let attributes: Vec<&str> = scope.split_whitespace().collect(); - tracing::debug!("Building presentation definition for attributes: {:?}", attributes); + tracing::debug!("Building presentation definition for attributes: {:?}", + attributes); - // Create a field for each attribute let fields: Vec<Field> = attributes .iter() .map(|attr| Field { @@ -219,7 +226,6 @@ fn build_presentation_definition(scope: &str) -> PresentationDefinition { }) .collect(); - // Create format specification for SD-JWT with ES256 let mut format = HashMap::new(); format.insert( "vc+sd-jwt".to_string(), @@ -229,7 +235,6 @@ fn build_presentation_definition(scope: &str) -> PresentationDefinition { }, ); - // Build input descriptor let input_descriptor = InputDescriptor { id: Uuid::new_v4().to_string(), name: Some("Requested credentials".to_string()), @@ -249,209 +254,274 @@ fn build_presentation_definition(scope: &str) -> PresentationDefinition { // POST /token pub async fn token( - State(_state): State<AppState>, + State(state): State<AppState>, Json(request): Json<TokenRequest>, ) -> Result<impl IntoResponse, (StatusCode, Json<ErrorResponse>)> { tracing::info!("Token request for code: {}", request.code); + // Validate grant_type if request.grant_type != "authorization_code" { return Err(( StatusCode::BAD_REQUEST, - Json(ErrorResponse::new("invalid_grant_type")) + Json(ErrorResponse::new("unsupported_grant_type")) )); } - // TODO: Validate nonce/code + // Fetch code (idempotent) + let code_data = crate::db::authorization_codes::get_code_for_token_exchange( + &state.pool, + &request.code, + ) + .await + .map_err(|e| { + tracing::error!("DB error in token exchange: {}", e); + (StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse::new("internal_error"))) + })?; + + let data = match code_data { + Some(d) => d, + None => { + tracing::warn!("Authorization code not found or expired: {}", request.code); + return Err((StatusCode::BAD_REQUEST, + Json(ErrorResponse::new("invalid_grant")))); + } + }; + + // Check for existing token + if let Some(existing_token) = data.existing_token { + tracing::info!("Token already exists for session {}, returning cached response", + data.session_id); + return Ok((StatusCode::OK, Json(TokenResponse { + access_token: existing_token, + token_type: "Bearer".to_string(), + expires_in: 3600, + }))); + } + // Check if code was already used + if data.was_already_used { + tracing::warn!("Authorization code {} was already used", request.code); + return Err((StatusCode::BAD_REQUEST, + Json(ErrorResponse::new("invalid_grant")))); + } + + // Validate session status + if data.session_status != SessionStatus::Verified { + tracing::warn!("Session {} not in verified status: {:?}", + data.session_id, data.session_status); + return Err((StatusCode::BAD_REQUEST, + Json(ErrorResponse::new("invalid_grant")))); + } + + // Generate new token and complete session let access_token = crypto::generate_nonce(); - - let response = TokenResponse { - access_token, + let token = crate::db::tokens::create_token_and_complete_session( + &state.pool, + data.session_id, + &access_token, + 3600, // 1 hour + ) + .await + .map_err(|e| { + tracing::error!("Failed to create token: {}", e); + (StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse::new("internal_error"))) + })?; + + tracing::info!("Token created for session {}", data.session_id); + + Ok((StatusCode::OK, Json(TokenResponse { + access_token: token.token, token_type: "Bearer".to_string(), expires_in: 3600, - }; - - Ok((StatusCode::OK, Json(response))) + }))) } // GET /info pub async fn info( - State(_state): State<AppState>, -) -> impl IntoResponse { + State(state): State<AppState>, + headers: axum::http::HeaderMap, +) -> Result<impl IntoResponse, (StatusCode, Json<ErrorResponse>)> { tracing::info!("Info request received"); - // TODO: Validate access token - // TODO: Call the SwiyuVerifier to retrieve the VC data - - let credential = VerifiableCredential { - data: json!({ - "@context": ["https://www.w3.org/2018/credentials/v1"], - "type": ["VerifiableCredential"], - "credentialSubject": { - "givenName": "John", - "familyName": "Doe", - "dateOfBirth": "1990-01-01", - "age_over_18": true, - } - }), + // Extract token from Authorization header + let auth_header = headers + .get(header::AUTHORIZATION) + .and_then(|h| h.to_str().ok()); + + let token = match auth_header { + Some(h) if h.starts_with("Bearer ") => &h[7..], + _ => { + tracing::warn!("Missing or malformed Authorization header"); + return Err((StatusCode::UNAUTHORIZED, + Json(ErrorResponse::new("invalid_token")))); + } }; - (StatusCode::OK, Json(credential)) + // Fetch token with session data (idempotent) + let token_data = crate::db::tokens::get_token_with_session(&state.pool, token) + .await + .map_err(|e| { + tracing::error!("DB error in info: {}", e); + (StatusCode::INTERNAL_SERVER_ERROR, + Json(ErrorResponse::new("internal_error"))) + })?; + + let data = match token_data { + Some(d) => d, + None => { + tracing::warn!("Token not found or expired"); + return Err((StatusCode::UNAUTHORIZED, + Json(ErrorResponse::new("invalid_token")))); + } + }; + + // Validate token + if data.revoked { + tracing::warn!("Token {} is revoked", data.token_id); + return Err((StatusCode::UNAUTHORIZED, + Json(ErrorResponse::new("invalid_token")))); + } + + if data.session_status != SessionStatus::Completed { + tracing::warn!("Session not completed: {:?}", data.session_status); + return Err((StatusCode::UNAUTHORIZED, + Json(ErrorResponse::new("invalid_token")))); + } + + // Return verifiable credential + let credential = VerifiableCredential { + data: data.verifiable_credential.unwrap_or(json!({})), + }; + + tracing::info!("Returning credential for token {}", data.token_id); + + Ok((StatusCode::OK, Json(credential))) } // POST /notification +// Always returns 200 OK to Swiyu - errors are logged internally pub async fn notification_webhook( State(state): State<AppState>, Json(webhook): Json<NotificationRequest>, -) -> Result<impl IntoResponse, (StatusCode, Json<ErrorResponse>)> { +) -> impl IntoResponse { tracing::info!( "Webhook received from Swiyu: verification_id={}, timestamp={}", webhook.verification_id, webhook.timestamp ); - // Look up session by request_id (Swiyu's verification_id) - let session = crate::db::sessions::get_session_by_request_id(&state.pool, &webhook.verification_id.to_string()) - .await - .map_err(|e| { - tracing::error!("Database error looking up session: {}", e); - (StatusCode::INTERNAL_SERVER_ERROR, Json(ErrorResponse::new("internal_error"))) - })?; - - let session = match session { - Some(s) => s, - None => { + // Lookup session by request_id (verification_id) + let session_data = match crate::db::sessions::get_session_for_notification( + &state.pool, + &webhook.verification_id.to_string(), + ).await { + Ok(Some(data)) => data, + Ok(None) => { tracing::warn!("Session not found for verification_id: {}", webhook.verification_id); - return Err((StatusCode::NOT_FOUND, Json(ErrorResponse::new("session_not_found")))); + return StatusCode::OK; } - }; - - tracing::info!("Found session {} for client_id {}", session.id, session.client_id); - - // Get client info to know which verifier to call - let client = crate::db::clients::get_client_by_uuid(&state.pool, session.client_id) - .await - .map_err(|e| { - tracing::error!("Database error looking up client: {}", e); - (StatusCode::INTERNAL_SERVER_ERROR, Json(ErrorResponse::new("internal_error"))) - })?; - - let client = match client { - Some(c) => c, - None => { - tracing::error!("Client {} not found for session {}", session.client_id, session.id); - return Err((StatusCode::INTERNAL_SERVER_ERROR, Json(ErrorResponse::new("internal_error")))); + Err(e) => { + tracing::error!("DB error looking up session: {}", e); + return StatusCode::OK; } }; - // Call Swiyu Verifier to get verification status - let swiyu_url = format!( + // Validate session status + if session_data.status != SessionStatus::Authorized { + tracing::warn!( + "Session {} not in authorized status: {:?}", + session_data.session_id, session_data.status + ); + return StatusCode::OK; + } + + // Call Swiyu verifier to get verification result + let verifier_url = format!( "{}{}/{}", - client.verifier_base_url, - client.verifier_management_api_path, + session_data.verifier_url, + session_data.verifier_management_api_path, webhook.verification_id ); - tracing::info!("Fetching verification status from: {}", swiyu_url); - let http_client = reqwest::Client::new(); - let swiyu_response = http_client - .get(&swiyu_url) - .send() - .await - .map_err(|e| { - tracing::error!("Failed to call Swiyu Verifier API: {}", e); - (StatusCode::SERVICE_UNAVAILABLE, Json(ErrorResponse::new("verifier_unavailable"))) - })?; + tracing::debug!("Fetching verification result from: {}", verifier_url); - if !swiyu_response.status().is_success() { - let status = swiyu_response.status(); - let error_body = swiyu_response.text().await.unwrap_or_default(); - tracing::error!("Swiyu Verifier returned error {}: {}", status, error_body); - return Err((StatusCode::BAD_GATEWAY, Json(ErrorResponse::new("verifier_error")))); + let verifier_response = match state.http_client.get(&verifier_url).send().await { + Ok(resp) => resp, + Err(e) => { + tracing::error!("Failed to call Swiyu verifier: {}", e); + return StatusCode::OK; + } + }; + + if !verifier_response.status().is_success() { + let status = verifier_response.status(); + tracing::error!("Swiyu verifier returned error: {}", status); + return StatusCode::OK; } - let swiyu_status: SwiyuManagementResponse = swiyu_response - .json() - .await - .map_err(|e| { + let swiyu_result: SwiyuManagementResponse = match verifier_response.json().await { + Ok(r) => r, + Err(e) => { tracing::error!("Failed to parse Swiyu response: {}", e); - (StatusCode::BAD_GATEWAY, Json(ErrorResponse::new("verifier_invalid_response"))) - })?; - - tracing::info!( - "Swiyu verification status: {:?} for verification_id={}", - swiyu_status.state, - webhook.verification_id - ); + return StatusCode::OK; + } + }; - // Map Swiyu status to our SessionStatus - let new_status = match swiyu_status.state { - SwiyuVerificationStatus::Success => crate::db::sessions::SessionStatus::Verified, - SwiyuVerificationStatus::Failed => crate::db::sessions::SessionStatus::Failed, + // Determine status based on verification result + let (new_status, status_str) = match swiyu_result.state { + SwiyuVerificationStatus::Success => (SessionStatus::Verified, "verified"), + SwiyuVerificationStatus::Failed => (SessionStatus::Failed, "failed"), SwiyuVerificationStatus::Pending => { - tracing::warn!("Received webhook but verification still pending"); - return Ok(StatusCode::OK); + tracing::info!("Verification {} still pending, ignoring webhook", webhook.verification_id); + return StatusCode::OK; } }; - // Update session status with timestamp - crate::db::sessions::update_session_status_with_timestamp(&state.pool, session.id, new_status.clone()) - .await - .map_err(|e| { - tracing::error!("Failed to update session status: {}", e); - (StatusCode::INTERNAL_SERVER_ERROR, Json(ErrorResponse::new("internal_error"))) - })?; + // Generate authorization code + let authorization_code = crypto::generate_nonce(); - tracing::info!("Updated session {} status to {:?}", session.id, new_status); - - // TODO: Log notification event to database - - // Notify Exchange of verification result - let notification_payload = crate::models::ExchangeNotification { - nonce: session.nonce.clone(), - status: match new_status { - crate::db::sessions::SessionStatus::Verified => "verified".to_string(), - crate::db::sessions::SessionStatus::Failed => "failed".to_string(), - _ => unreachable!("new_status can only be Verified or Failed at this point"), - }, + // Build webhook body for client notification + let client_notification = ClientNotification { + nonce: session_data.nonce.clone(), + status: status_str.to_string(), verification_id: webhook.verification_id, - timestamp: chrono::Utc::now().to_rfc3339(), + timestamp: webhook.timestamp.clone(), }; - tracing::info!( - "Notifying Exchange at {} with status: {}", - client.notification_url, - notification_payload.status - ); - - let exchange_response = http_client - .post(&client.notification_url) - .json(&notification_payload) - .send() - .await; - - match exchange_response { - Ok(resp) if resp.status().is_success() => { - tracing::info!("Successfully notified Exchange at {}", client.notification_url); + let webhook_body = match serde_json::to_string(&client_notification) { + Ok(b) => b, + Err(e) => { + tracing::error!("Failed to serialize client notification: {}", e); + return StatusCode::OK; } - Ok(resp) => { - let status = resp.status(); - let error_body = resp.text().await.unwrap_or_default(); - tracing::warn!( - "Exchange notification endpoint returned error {}: {}", - status, - error_body + }; + + // Update session, create auth code, and queue webhook + match crate::db::sessions::verify_session_and_queue_notification( + &state.pool, + session_data.session_id, + new_status, + &authorization_code, + 10, // 10 minutes for auth code expiry + session_data.client_id, + &session_data.webhook_url, + &webhook_body, + ).await { + Ok(code) => { + tracing::info!( + "Session {} updated to {}, auth code created, webhook queued", + session_data.session_id, status_str ); - // Note: We don't fail the webhook handler if Exchange notification fails - // The session is already updated, Exchange can poll if needed + tracing::debug!("Generated authorization code: {}", code); } Err(e) => { - tracing::error!("Failed to notify Exchange at {}: {}", client.notification_url, e); - // Note: We don't fail the webhook handler if Exchange notification fails + tracing::error!("Failed to update session and queue notification: {}", e); } } - Ok(StatusCode::OK) + StatusCode::OK } #[cfg(test)] @@ -537,7 +607,8 @@ mod tests { // Verify descriptor has proper metadata assert_eq!(descriptor.name, Some("Requested credentials".to_string())); - assert_eq!(descriptor.purpose, Some("KYC verification via OAuth2 Gateway".to_string())); + assert_eq!(descriptor.purpose, + Some("KYC verification via OAuth2 Gateway".to_string())); // Verify format is specified at descriptor level too assert!(descriptor.format.is_some()); diff --git a/oauth2_gateway/src/lib.rs b/oauth2_gateway/src/lib.rs @@ -3,4 +3,5 @@ pub mod handlers; pub mod models; pub mod state; pub mod crypto; -pub mod db; -\ No newline at end of file +pub mod db; +pub mod worker; +\ No newline at end of file diff --git a/oauth2_gateway/src/main.rs b/oauth2_gateway/src/main.rs @@ -11,7 +11,8 @@ use axum::{ #[derive(Parser, Debug)] #[command(version)] struct Args { - #[arg(short, value_name = "FILE")] + /// Configuration file path + #[arg(short = 'c', long = "config", value_name = "FILE")] config: String, } @@ -41,7 +42,7 @@ async fn main() -> Result<()> { let app = Router::new() .route("/health", get(handlers::health_check)) .route("/setup/{client_id}", post(handlers::setup)) - .route("/authorize/{nonce}", get(handlers::authorize)) + .route("/authorize", get(handlers::authorize)) .route("/token", post(handlers::token)) .route("/info", get(handlers::info)) .route("/notification", post(handlers::notification_webhook)) diff --git a/oauth2_gateway/src/models.rs b/oauth2_gateway/src/models.rs @@ -13,18 +13,24 @@ pub struct SetupResponse { pub nonce: String, } +#[derive(Debug, Deserialize)] +pub struct AuthorizeQuery { + pub response_type: String, + pub client_id: String, + pub nonce: String, +} + // Authorize endpoint #[derive(Debug, Deserialize, Serialize)] pub struct AuthorizeResponse { #[serde(rename = "verificationId")] - pub verification_id: Uuid, // Does the browser need this? + pub verification_id: Uuid, pub verification_url: String, } // Token endpoint // WARING: RFC 6749 also requires: // - redirect_uri -// - client_id #[derive(Debug, Deserialize, Serialize)] pub struct TokenRequest { pub grant_type: String, @@ -52,9 +58,9 @@ pub struct NotificationRequest { pub timestamp: String, } -// Notification payload sent to Exchange +// Notification payload sent to Client (Exchange, etc.) #[derive(Debug, Serialize)] -pub struct ExchangeNotification { +pub struct ClientNotification { pub nonce: String, pub status: String, // "verified" or "failed" pub verification_id: Uuid, @@ -77,12 +83,17 @@ impl ErrorResponse { // Swiyu Verifier API models +/// Default issuer DID for Swiyu verification +fn default_accepted_issuer_dids() -> Vec<String> { + vec!["did:tdw:QmPEZPhDFR4nEYSFK5bMnvECqdpf1tPTPJuWs9QrMjCumw:identifier-reg.trust-infra.swiyu-int.admin.ch:api:v1:did:9a5559f0-b81c-4368-a170-e7b4ae424527".to_string()] +} + /// Request body for creating a verification with Swiyu Verifier /// POST /management/api/verifications #[derive(Debug, Serialize, Deserialize)] pub struct SwiyuCreateVerificationRequest { - #[serde(skip_serializing_if = "Option::is_none")] - pub accepted_issuer_dids: Option<Vec<String>>, + #[serde(default = "default_accepted_issuer_dids")] + pub accepted_issuer_dids: Vec<String>, #[serde(skip_serializing_if = "Option::is_none")] pub trust_anchors: Option<Vec<TrustAnchor>>, diff --git a/oauth2_gateway/src/state.rs b/oauth2_gateway/src/state.rs @@ -6,6 +6,7 @@ use std::sync::Arc; pub struct AppState { pub config: Arc<Config>, pub pool: PgPool, + pub http_client: reqwest::Client, } impl AppState { @@ -13,6 +14,7 @@ impl AppState { Self { config: Arc::new(config), pool, + http_client: reqwest::Client::new(), } } } diff --git a/oauth2_gateway/src/worker.rs b/oauth2_gateway/src/worker.rs @@ -0,0 +1,264 @@ +//! Webhook worker daemon +//! +//! Background process that delivers webhooks to client endpoints. +//! Uses PostgreSQL NOTIFY/LISTEN for event-driven wake-up. + +use anyhow::Result; +use sqlx::postgres::PgListener; +use sqlx::PgPool; +use std::sync::atomic::{AtomicUsize, Ordering}; +use std::sync::Arc; +use std::time::Duration; +use tokio::sync::watch; +use tracing::{debug, error, info, warn}; + +use crate::db::notification_webhooks::{ + delete_webhook, disable_webhook, get_next_scheduled_webhook, get_pending_webhooks, + schedule_retry, PendingWebhook, +}; + +/// Retry delays based on HTTP response codes +const RETRY_DELAY_SERVER_ERROR: i64 = 60; // 500 +const RETRY_DELAY_FORBIDDEN: i64 = 60; // 403 +const RETRY_DELAY_OTHER: i64 = 3600; // default + +/// Fallback polling interval when no webhooks are pending +const FALLBACK_POLL_SECS: u64 = 300; + +/// Maximum webhooks to fetch per batch +const BATCH_SIZE: i64 = 100; + +/// Run the webhook worker +/// +/// This function blocks until shutdown signal is received. +/// 1. Fetch pending webhooks (next_attempt <= NOW) +/// 2. Execute HTTP requests, wait for all to complete +/// 3. If no pending, check for future webhooks and schedule wake-up +/// 4. Fallback poll every 5 minutes +pub async fn run_worker( + pool: PgPool, + test_mode: bool, + mut shutdown: watch::Receiver<bool>, +) -> Result<()> { + info!("Starting webhook worker (test_mode={})", test_mode); + + let http_client = reqwest::Client::builder() + .timeout(Duration::from_secs(30)) + .redirect(reqwest::redirect::Policy::limited(5)) + .build()?; + + // Set up psql LISTEN + let mut listener = PgListener::connect_with(&pool).await?; + listener.listen("oauth2gw_webhook_pending").await?; + info!("Listening on channel 'oauth2gw_webhook_pending'"); + + loop { + // select work: process pending webhooks (keep looping while there's work) + loop { + let has_work = select_work(&pool, &http_client).await; + + if !has_work { + if test_mode { + info!("Test mode: no pending webhooks, exiting"); + return Ok(()); + } + break; // Exit inner loop, wait for NOTIFY/timer + } + // If has_work, loop again to check for more pending webhooks + } + + // Determine how long to sleep + let sleep_duration = match get_next_scheduled_webhook(&pool).await { + Ok(Some(next_attempt)) => { + let now = chrono::Utc::now().timestamp(); + let delay = (next_attempt - now).max(0) as u64; + debug!("Next webhook scheduled in {} seconds", delay); + Duration::from_secs(delay) + } + Ok(None) => { + debug!("No future webhooks, fallback poll in {} seconds", FALLBACK_POLL_SECS); + Duration::from_secs(FALLBACK_POLL_SECS) + } + Err(e) => { + error!("Failed to get next scheduled webhook: {}", e); + Duration::from_secs(FALLBACK_POLL_SECS) + } + }; + + tokio::select! { + // Shutdown signal + _ = shutdown.changed() => { + if *shutdown.borrow() { + info!("Shutdown signal received"); + break; + } + } + + // psql NOTIFY received - immediately process + notification = listener.recv() => { + match notification { + Ok(n) => { + debug!("NOTIFY received: {}", n.payload()); + // Don't sleep, go straight to select_work + continue; + } + Err(e) => { + error!("LISTEN error: {}", e); + tokio::time::sleep(Duration::from_secs(1)).await; + } + } + } + + // Sleep until next scheduled webhook or fallback poll + _ = tokio::time::sleep(sleep_duration) => { + debug!("Timer expired, checking for work"); + } + } + } + + info!("Webhook worker stopped"); + Ok(()) +} + +/// Process all pending webhooks +/// +/// Returns true if any webhooks were processed. +async fn select_work(pool: &PgPool, http_client: &reqwest::Client) -> bool { + let webhooks = match get_pending_webhooks(pool, BATCH_SIZE).await { + Ok(w) => w, + Err(e) => { + error!("Failed to fetch pending webhooks: {}", e); + return false; + } + }; + + if webhooks.is_empty() { + debug!("No pending webhooks"); + return false; + } + + info!("Processing {} pending webhooks", webhooks.len()); + + // Track in-flight jobs + let in_flight = Arc::new(AtomicUsize::new(webhooks.len())); + let mut handles = Vec::with_capacity(webhooks.len()); + + for webhook in webhooks { + let pool = pool.clone(); + let client = http_client.clone(); + let counter = in_flight.clone(); + + // Fire HTTP request + let handle = tokio::spawn(async move { + execute_webhook(&pool, &client, webhook).await; + counter.fetch_sub(1, Ordering::SeqCst); + }); + + handles.push(handle); + } + + // Wait for all jobs to complete + for handle in handles { + if let Err(e) = handle.await { + error!("Webhook task panicked: {}", e); + } + } + + true +} + +/// Execute a single webhook HTTP request +async fn execute_webhook(pool: &PgPool, client: &reqwest::Client, webhook: PendingWebhook) { + let serial = webhook.webhook_pending_serial; + + debug!( + "Webhook {}: {} {} (retry #{})", + serial, webhook.http_method, webhook.url, webhook.retries + ); + + // Build request based on http_method + let mut request = match webhook.http_method.to_uppercase().as_str() { + "POST" => client.post(&webhook.url), + "PUT" => client.put(&webhook.url), + "GET" => client.get(&webhook.url), + "DELETE" => client.delete(&webhook.url), + method => { + error!("Unsupported HTTP method '{}' for webhook {}", method, serial); + let _ = disable_webhook(pool, serial).await; + return; + } + }; + + // Add headers if present + if let Some(headers) = &webhook.header { + for line in headers.lines() { + if let Some((name, value)) = line.split_once(':') { + request = request.header(name.trim(), value.trim()); + } + } + } + + // Add body for POST/PUT + if matches!(webhook.http_method.to_uppercase().as_str(), "POST" | "PUT") { + request = request + .header("Content-Type", "application/json") + .body(webhook.body.clone()); + } + + // Execute request + let response = match request.send().await { + Ok(r) => r, + Err(e) => { + warn!("Network error for webhook {}: {}", serial, e); + handle_webhook_response(pool, serial, 0).await; + return; + } + }; + + let status_code = response.status().as_u16(); + handle_webhook_response(pool, serial, status_code.into()).await; +} + +/// Handle webhook response +/// +/// - 2xx: delete pending webhook (success) +/// - 400: next_attempt = FOREVER (never retry) +/// - 500: next_attempt = NOW + 1 minute +/// - 403: next_attempt = NOW + 1 minute +/// - other: next_attempt = NOW + 1 hour +async fn handle_webhook_response(pool: &PgPool, serial: i64, response_code: i64) { + info!("Webhook {} returned with status {}", serial, response_code); + + // 200 success - delete from queue + if response_code >= 200 && response_code < 300 { + match delete_webhook(pool, serial).await { + Ok(true) => { + debug!("Webhook {} deleted successfully", serial); + } + Ok(false) => { + warn!("Webhook {} not found for deletion", serial); + } + Err(e) => { + error!("Failed to delete webhook {}: {}", serial, e); + } + } + return; + } + + // Determine retry delay based on response code + let delay = match response_code { + 400 => { + warn!("Webhook {} got 400, disabling permanently", serial); + let _ = disable_webhook(pool, serial).await; + return; + } + 500 => RETRY_DELAY_SERVER_ERROR, + 403 => RETRY_DELAY_FORBIDDEN, + _ => RETRY_DELAY_OTHER, + }; + + debug!("Scheduling webhook {} retry in {} seconds", serial, delay); + if let Err(e) = schedule_retry(pool, serial, delay).await { + error!("Failed to schedule retry for webhook {}: {}", serial, e); + } +}