kych

OAuth 2.0 API gateway for Swiyu, enabling Taler to use Swiyu for KYC (experimental)

commit 227a247fb717169a6b9969d1b42f2d775200ed2c
parent 972f4685094d17b5d25d90b9305bc9a3c1f937d8
Author: Henrique Chan Carvalho Machado <henriqueccmachado@tecnico.ulisboa.pt>
Date:   Sun, 23 Nov 2025 16:22:07 +0100

oauth2_gateway: db: refactor for oauth2 code and webhook worker implementation
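
The new notification_pending_webhooks table, its notify_webhook_pending trigger, and the helpers in src/db/notification_webhooks.rs are meant to be consumed by a background delivery worker. The worker itself is not part of this commit; the following is only a rough sketch of how such a loop could use these helpers, assuming sqlx's PgListener for LISTEN/NOTIFY and reqwest for HTTP delivery (run_webhook_worker, the batch size of 32, and the 60-second backoff are invented for the illustration):

    // Rough sketch (not part of this commit) of a webhook delivery worker built
    // on the helpers added in src/db/notification_webhooks.rs. Assumes sqlx's
    // PgListener for LISTEN/NOTIFY and reqwest for HTTP delivery.
    use std::time::Duration;

    use anyhow::Result;
    use sqlx::postgres::PgListener;
    use sqlx::PgPool;

    use crate::db::notification_webhooks::{
        delete_webhook, disable_webhook, get_pending_webhooks, schedule_retry,
    };

    pub async fn run_webhook_worker(pool: PgPool, database_url: &str) -> Result<()> {
        // Wake up whenever the notify_webhook_pending trigger fires.
        let mut listener = PgListener::connect(database_url).await?;
        listener.listen("oauth2gw_webhook_pending").await?;

        let http = reqwest::Client::new();
        loop {
            for webhook in get_pending_webhooks(&pool, 32).await? {
                // http_method is always POST per the schema comment.
                let sent = http
                    .post(webhook.url.as_str())
                    .header("Content-Type", "application/json")
                    .body(webhook.body.clone())
                    .send()
                    .await;
                match sent {
                    // Delivered: remove the queue entry.
                    Ok(r) if r.status().is_success() => {
                        delete_webhook(&pool, webhook.webhook_pending_serial).await?;
                    }
                    // Client error: retrying will not help, park the entry.
                    Ok(r) if r.status().is_client_error() => {
                        disable_webhook(&pool, webhook.webhook_pending_serial).await?;
                    }
                    // Server error or network failure: back off and retry.
                    _ => {
                        schedule_retry(&pool, webhook.webhook_pending_serial, 60).await?;
                    }
                }
            }
            // Sleep until a NOTIFY arrives or a periodic poll interval elapses.
            let _ = tokio::time::timeout(Duration::from_secs(60), listener.recv()).await;
        }
    }

On success the row is deleted, on a 4xx the entry is parked by setting next_attempt to the INT8 maximum (disable_webhook), and on a 5xx or network failure it is rescheduled with schedule_retry, matching the column comments in oauth2gw-0001.sql.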

Diffstat:
Doauth2_gateway/migrations/oauth2gw-0001.sql | 158-------------------------------------------------------------------------------
Roauth2_gateway/migrations/drop.sql -> oauth2_gateway/oauth2_gatewaydb/drop.sql | 0
Aoauth2_gateway/oauth2_gatewaydb/oauth2gw-0001.sql | 177+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Roauth2_gateway/migrations/versioning.sql -> oauth2_gateway/oauth2_gatewaydb/versioning.sql | 0
Aoauth2_gateway/src/db/authorization_codes.rs | 154+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Moauth2_gateway/src/db/clients.rs | 44++++++++++++++++++++++----------------------
Doauth2_gateway/src/db/logs.rs | 312-------------------------------------------------------------------------------
Moauth2_gateway/src/db/mod.rs | 3++-
Aoauth2_gateway/src/db/notification_webhooks.rs | 221+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Moauth2_gateway/src/db/sessions.rs | 349+++++++++++++++++++++++++++++++++++++++++++++----------------------------------
Moauth2_gateway/src/db/tokens.rs | 167++++++++++++++++++++++++++++++-------------------------------------------------
11 files changed, 839 insertions(+), 746 deletions(-)

diff --git a/oauth2_gateway/migrations/oauth2gw-0001.sql b/oauth2_gateway/migrations/oauth2gw-0001.sql @@ -1,158 +0,0 @@ --- OAuth2 Gateway Initial Schema --- This migration creates the core tables for the OAuth2 Gateway service -BEGIN; -SELECT _v.register_patch('oauth2gw-0001', NULL, NULL); -CREATE SCHEMA oauth2gw; -SET search_path TO oauth2gw; - --- ============================================================================ --- Table: clients --- Stores registered clients (e.g. Taler Exchange) --- ============================================================================ -CREATE TABLE clients ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - client_id VARCHAR(255) UNIQUE NOT NULL, - client_secret VARCHAR(255) NOT NULL, - notification_url TEXT NOT NULL, -- Client's webhook URL for notifications - verifier_base_url TEXT NOT NULL, -- Swiyu Verifier URL for this client - verifier_management_api_path VARCHAR(255) DEFAULT '/management/api/verifications', - created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, - updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP -); - -CREATE INDEX idx_clients_client_id ON clients(client_id); - -COMMENT ON TABLE clients IS 'Registered clients using the OAuth2 Gateway'; -COMMENT ON COLUMN clients.client_secret IS 'Shared secret for server-to-server authentication'; -COMMENT ON COLUMN clients.notification_url IS 'Client webhook endpoint called when verification completes'; - --- ============================================================================ --- Table: verification_sessions --- Tracks the entire verification flow from setup to completion --- ============================================================================ -CREATE TABLE verification_sessions ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - client_id UUID NOT NULL REFERENCES clients(id) ON DELETE CASCADE, - nonce VARCHAR(255) UNIQUE NOT NULL, -- Cryptographically secure random value from /setup - scope TEXT NOT NULL, -- Requested verification scope (space-delimited attributes) - - -- Swiyu Verifier data (populated after /authorize) - verification_url TEXT, - request_id VARCHAR(255), - - -- Session status tracking - status VARCHAR(50) NOT NULL DEFAULT 'pending', - - -- Timestamps - created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, - authorized_at TIMESTAMPTZ, - verified_at TIMESTAMPTZ, - completed_at TIMESTAMPTZ, - failed_at TIMESTAMPTZ, - expires_at TIMESTAMPTZ NOT NULL, - - CONSTRAINT verification_sessions_status_check - CHECK (status IN ('pending', 'authorized', 'verified', 'completed', 'expired', 'failed')) -); - -CREATE INDEX idx_verification_sessions_nonce ON verification_sessions(nonce); -CREATE INDEX idx_verification_sessions_request_id ON verification_sessions(request_id); -CREATE INDEX idx_verification_sessions_status ON verification_sessions(status); -CREATE INDEX idx_verification_sessions_expires_at ON verification_sessions(expires_at); - -COMMENT ON TABLE verification_sessions IS 'Tracks KYC verification sessions from setup to completion'; -COMMENT ON COLUMN verification_sessions.nonce IS 'Cryptographically secure random value used as OAuth2 authorization code'; -COMMENT ON COLUMN verification_sessions.request_id IS 'Swiyu Verifier request ID for tracking the OID4VP session'; -COMMENT ON COLUMN verification_sessions.status IS 'pending: created via /setup, authorized: /authorize called, completed: Client retrieved VC, expired: session timeout, failed: error occurred'; - --- ============================================================================ --- Table: access_tokens 
--- Stores access tokens issued by /token endpoint --- ============================================================================ -CREATE TABLE access_tokens ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - session_id UUID NOT NULL REFERENCES verification_sessions(id) ON DELETE CASCADE, - token VARCHAR(255) UNIQUE NOT NULL, - token_type VARCHAR(50) NOT NULL DEFAULT 'Bearer', - expires_at TIMESTAMPTZ NOT NULL, - created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, - revoked BOOLEAN DEFAULT FALSE, - revoked_at TIMESTAMPTZ -); - -CREATE INDEX idx_access_tokens_token ON access_tokens(token); -CREATE INDEX idx_access_tokens_session_id ON access_tokens(session_id); -CREATE INDEX idx_access_tokens_expires_at ON access_tokens(expires_at); -CREATE INDEX idx_access_tokens_revoked ON access_tokens(revoked); - -COMMENT ON TABLE access_tokens IS 'OAuth2 access tokens for retrieving VCs from /info endpoint'; -COMMENT ON COLUMN access_tokens.token IS 'Bearer token value (256-bit random, base64-encoded)'; - --- ============================================================================ --- Table: webhook_logs --- Audit log for incoming webhooks from Swiyu Verifier --- ============================================================================ -CREATE TABLE webhook_logs ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - request_id VARCHAR(255), - session_id UUID REFERENCES verification_sessions(id) ON DELETE SET NULL, - payload JSONB NOT NULL, - status_code INTEGER, - processed BOOLEAN DEFAULT FALSE, - error_message TEXT, - received_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, - processed_at TIMESTAMPTZ -); - -CREATE INDEX idx_webhook_logs_request_id ON webhook_logs(request_id); -CREATE INDEX idx_webhook_logs_session_id ON webhook_logs(session_id); -CREATE INDEX idx_webhook_logs_processed ON webhook_logs(processed); -CREATE INDEX idx_webhook_logs_received_at ON webhook_logs(received_at); - -COMMENT ON TABLE webhook_logs IS 'Audit log for incoming webhooks from Swiyu Verifier'; -COMMENT ON COLUMN webhook_logs.processed IS 'Whether webhook was successfully processed and session updated'; - --- ============================================================================ --- Table: notification_logs --- Audit log for outgoing notifications to Client --- ============================================================================ -CREATE TABLE notification_logs ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - session_id UUID NOT NULL REFERENCES verification_sessions(id) ON DELETE CASCADE, - client_id UUID NOT NULL REFERENCES clients(id) ON DELETE CASCADE, - notification_url TEXT NOT NULL, -- Client endpoint called - payload JSONB NOT NULL, -- Notification body sent - status_code INTEGER, -- HTTP response code from Client - success BOOLEAN DEFAULT FALSE, - error_message TEXT, - created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, - completed_at TIMESTAMPTZ -); - -CREATE INDEX idx_notification_logs_session_id ON notification_logs(session_id); -CREATE INDEX idx_notification_logs_client_id ON notification_logs(client_id); -CREATE INDEX idx_notification_logs_success ON notification_logs(success); -CREATE INDEX idx_notification_logs_created_at ON notification_logs(created_at); - -COMMENT ON TABLE notification_logs IS 'Audit log for outgoing notifications to Client'; -COMMENT ON COLUMN notification_logs.success IS 'Whether notification was successfully delivered to Client'; - --- ============================================================================ --- Trigger: Auto-update updated_at column --- 
============================================================================ -CREATE OR REPLACE FUNCTION update_updated_at_column() -RETURNS TRIGGER AS $$ -BEGIN - NEW.updated_at = CURRENT_TIMESTAMP; - RETURN NEW; -END; -$$ language 'plpgsql'; - -CREATE TRIGGER update_clients_updated_at - BEFORE UPDATE ON clients - FOR EACH ROW - EXECUTE FUNCTION update_updated_at_column(); - -COMMENT ON FUNCTION update_updated_at_column() IS 'Automatically updates updated_at timestamp on row modification'; - -COMMIT; diff --git a/oauth2_gateway/migrations/drop.sql b/oauth2_gateway/oauth2_gatewaydb/drop.sql diff --git a/oauth2_gateway/oauth2_gatewaydb/oauth2gw-0001.sql b/oauth2_gateway/oauth2_gatewaydb/oauth2gw-0001.sql @@ -0,0 +1,177 @@ +-- Everything in one big transaction +BEGIN; + +-- Check patch versioning is in place. +SELECT _v.register_patch('oauth2gw-0001', NULL, NULL); + +CREATE SCHEMA oauth2gw; +COMMENT ON SCHEMA oauth2gw IS 'oauth2 gateway data'; + +SET search_path TO oauth2gw; + +CREATE TABLE IF NOT EXISTS clients ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + client_id VARCHAR(255) UNIQUE NOT NULL, + secret_hash VARCHAR(255) NOT NULL, + webhook_url TEXT NOT NULL, + verifier_url TEXT NOT NULL, + verifier_management_api_path VARCHAR(255) DEFAULT '/management/api/verifications', + created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, + updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP +); +COMMENT ON TABLE clients + IS 'Clients using the oauth2gw for selective VC disclosure'; +COMMENT ON COLUMN clients.client_id + IS 'ID used for client identification before oauth2 gateway'; +COMMENT ON COLUMN clients.secret_hash + IS 'hash of shared secret used for client authentication before oauth2 gateway'; +COMMENT ON COLUMN clients.webhook_url + IS 'Client URL where oauth2 gateway will callback'; +COMMENT ON COLUMN clients.verifier_url + IS 'Client URL where oauth2 gateway will callback'; +COMMENT ON COLUMN clients.verifier_management_api_path + IS 'Swiyu verifier api endpoint to create verification requests'; + +CREATE INDEX IF NOT EXISTS idx_clients_client_id ON clients(client_id); + +CREATE TABLE verification_sessions ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + client_id UUID NOT NULL REFERENCES clients(id) ON DELETE CASCADE, + nonce VARCHAR(255) UNIQUE NOT NULL, + scope TEXT NOT NULL, + verification_url TEXT, + request_id VARCHAR(255), + verifier_nonce VARCHAR(255), + verifiable_credential JSONB, + status VARCHAR(50) NOT NULL DEFAULT 'pending', + created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, + authorized_at TIMESTAMPTZ, + verified_at TIMESTAMPTZ, + completed_at TIMESTAMPTZ, + failed_at TIMESTAMPTZ, + expires_at TIMESTAMPTZ NOT NULL, + + CONSTRAINT verification_sessions_status_check + CHECK (status IN ('pending', 'authorized', 'verified', 'completed', + 'expired', 'failed')) +); + +CREATE INDEX IF NOT EXISTS + idx_verification_sessions_nonce ON verification_sessions(nonce); +CREATE INDEX IF NOT EXISTS + idx_verification_sessions_request_id ON verification_sessions(request_id); +CREATE INDEX IF NOT EXISTS + idx_verification_sessions_status ON verification_sessions(status); +CREATE INDEX IF NOT EXISTS + idx_verification_sessions_expires_at ON verification_sessions(expires_at); + +COMMENT ON COLUMN verification_sessions.nonce + IS 'Cryptographically secure 256-bit random value used as OAuth2 authorization code'; +COMMENT ON COLUMN verification_sessions.scope + IS 'Space-delimited requested verification attributes (e.g., "first_name last_name")'; +COMMENT ON COLUMN 
verification_sessions.verification_url + IS 'URL for user wallet to complete verification (populated after /authorize)'; +COMMENT ON COLUMN verification_sessions.request_id + IS 'Swiyu Verifier request ID for tracking the OID4VP session'; +COMMENT ON COLUMN verification_sessions.verifier_nonce + IS 'Nonce returned by verifier for replay protection'; +COMMENT ON COLUMN verification_sessions.status + IS 'pending: created via /setup, authorized: /authorize called, verified: verification complete, completed: Client retrieved VC, expired: session timeout, failed: error occurred'; + +CREATE TABLE IF NOT EXISTS access_tokens ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + session_id UUID NOT NULL REFERENCES verification_sessions(id) ON DELETE CASCADE, + token VARCHAR(255) UNIQUE NOT NULL, + token_type VARCHAR(50) NOT NULL DEFAULT 'Bearer', + expires_at TIMESTAMPTZ NOT NULL, + created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, + revoked BOOLEAN DEFAULT FALSE, + revoked_at TIMESTAMPTZ +); + +CREATE INDEX idx_access_tokens_token ON access_tokens(token); +CREATE INDEX idx_access_tokens_session_id ON access_tokens(session_id); +CREATE INDEX idx_access_tokens_expires_at ON access_tokens(expires_at); +CREATE INDEX idx_access_tokens_revoked ON access_tokens(revoked); + +COMMENT ON COLUMN access_tokens.token + IS 'Bearer token value (256-bit random, base64-encoded)'; +COMMENT ON COLUMN access_tokens.token_type + IS 'OAuth2 token type (always Bearer)'; +COMMENT ON COLUMN access_tokens.revoked + IS 'Whether token has been explicitly revoked before expiration'; + +CREATE TABLE IF NOT EXISTS authorization_codes ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + session_id UUID NOT NULL REFERENCES verification_sessions(id) ON DELETE CASCADE, + code VARCHAR(255) UNIQUE NOT NULL, + expires_at TIMESTAMPTZ NOT NULL, + used BOOLEAN DEFAULT FALSE, + used_at TIMESTAMPTZ, + created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP +); + +CREATE INDEX idx_authorization_codes_code ON authorization_codes(code); +CREATE INDEX idx_authorization_codes_session_id ON authorization_codes(session_id); +CREATE INDEX idx_authorization_codes_expires_at ON authorization_codes(expires_at); +CREATE INDEX idx_authorization_codes_used ON authorization_codes(used); + +COMMENT ON TABLE authorization_codes + IS 'OAuth2 authorization codes issued after verification completion'; +COMMENT ON COLUMN authorization_codes.code + IS 'Authorization code value (256-bit random, base64-encoded)'; +COMMENT ON COLUMN authorization_codes.used + IS 'Whether code has been exchanged for an access token'; + +CREATE TABLE IF NOT EXISTS notification_pending_webhooks ( + webhook_pending_serial BIGSERIAL PRIMARY KEY, + session_id UUID NOT NULL REFERENCES verification_sessions(id) ON DELETE CASCADE, + client_id UUID NOT NULL REFERENCES clients(id) ON DELETE CASCADE, + next_attempt INT8 NOT NULL DEFAULT 0, + retries INT4 NOT NULL DEFAULT 0, + url TEXT NOT NULL, + http_method TEXT NOT NULL DEFAULT 'POST', + header TEXT, + body TEXT NOT NULL +); + +CREATE INDEX idx_notification_pending_webhooks_next_attempt + ON notification_pending_webhooks(next_attempt); +CREATE INDEX idx_notification_pending_webhooks_session_id + ON notification_pending_webhooks(session_id); + +COMMENT ON TABLE notification_pending_webhooks + IS 'Pending client notifications to be sent by background worker'; +COMMENT ON COLUMN notification_pending_webhooks.next_attempt + IS 'Unix timestamp when to attempt sending (0 = execute now, max value = never retry)'; +COMMENT ON COLUMN 
notification_pending_webhooks.retries + IS 'Number of failed delivery attempts'; +COMMENT ON COLUMN notification_pending_webhooks.url + IS 'Client webhook URL to POST notification'; +COMMENT ON COLUMN notification_pending_webhooks.http_method + IS 'HTTP method for webhook (always POST for notifications)'; +COMMENT ON COLUMN notification_pending_webhooks.header + IS 'Newline-separated HTTP headers for the webhook request'; +COMMENT ON COLUMN notification_pending_webhooks.body + IS 'JSON body to send (stringified, contains nonce, status, code, verification_id)'; + +-- Trigger function to notify webhook worker when new webhooks are queued +CREATE OR REPLACE FUNCTION notify_webhook_pending() +RETURNS TRIGGER AS $$ +BEGIN + -- Notify the webhook worker daemon + PERFORM pg_notify('oauth2gw_webhook_pending', NEW.webhook_pending_serial::TEXT); + RETURN NEW; +END; +$$ LANGUAGE plpgsql; + +CREATE TRIGGER trigger_webhook_pending + AFTER INSERT ON notification_pending_webhooks + FOR EACH ROW + EXECUTE FUNCTION notify_webhook_pending(); + +COMMENT ON FUNCTION notify_webhook_pending() + IS 'Sends PostgreSQL NOTIFY to wake up webhook worker when new webhooks are queued'; + +-- Complete transaction +COMMIT; diff --git a/oauth2_gateway/migrations/versioning.sql b/oauth2_gateway/oauth2_gatewaydb/versioning.sql diff --git a/oauth2_gateway/src/db/authorization_codes.rs b/oauth2_gateway/src/db/authorization_codes.rs @@ -0,0 +1,153 @@ +// Database operations for authorization_codes table + +use sqlx::PgPool; +use anyhow::Result; +use uuid::Uuid; +use chrono::{DateTime, Utc}; + +use super::sessions::SessionStatus; + +/// Authorization code record +#[derive(Debug, Clone, sqlx::FromRow)] +pub struct AuthorizationCode { + pub id: Uuid, + pub session_id: Uuid, + pub code: String, + pub expires_at: DateTime<Utc>, + pub used: bool, + pub used_at: Option<DateTime<Utc>>, + pub created_at: DateTime<Utc>, +} + +/// Result of code exchange query - includes session and existing token data for idempotency +#[derive(Debug, Clone)] +pub struct CodeExchangeData { + pub code_id: Uuid, + pub was_already_used: bool, + pub session_id: Uuid, + pub session_status: SessionStatus, + /// Existing token if one was already created (for idempotent response) + pub existing_token: Option<String>, + pub existing_token_expires_at: Option<DateTime<Utc>>, +} + +/// Create a new authorization code for a session +/// +/// Called after verification completes successfully +pub async fn create_authorization_code( + pool: &PgPool, + session_id: Uuid, + code: &str, + expires_in_minutes: i64, +) -> Result<AuthorizationCode> { + let auth_code = sqlx::query_as::<_, AuthorizationCode>( + r#" + INSERT INTO oauth2gw.authorization_codes (session_id, code, expires_at) + VALUES ($1, $2, NOW() + $3 * INTERVAL '1 minute') + RETURNING id, session_id, code, expires_at, used, used_at, created_at + "# + ) + .bind(session_id) + .bind(code) + .bind(expires_in_minutes) + .fetch_one(pool) + .await?; + + Ok(auth_code) +} + +/// Atomically mark code as used and fetch session + existing token data +/// +/// This is the idempotent query for /token endpoint: +/// - Capture OLD used value before update +/// - JOINs with session to get status +/// - LEFT JOINs with access_tokens to get existing token (for idempotent response) +/// - Returns None if code doesn't exist or is expired +/// +/// Used by the /token endpoint +pub async fn get_code_for_token_exchange( + pool: &PgPool, + code: &str, +) -> Result<Option<CodeExchangeData>> { + // Use CTE to capture old 'used' value before 
the UPDATE changes it + let result = sqlx::query( + r#" + WITH code_data AS ( + SELECT id, used AS was_already_used, session_id + FROM oauth2gw.authorization_codes + WHERE code = $1 AND expires_at > NOW() + FOR UPDATE + ), + updated_code AS ( + UPDATE oauth2gw.authorization_codes ac + SET used = TRUE, + used_at = CASE WHEN NOT ac.used THEN NOW() ELSE ac.used_at END + FROM code_data cd + WHERE ac.id = cd.id + RETURNING ac.id, ac.session_id + ) + SELECT + uc.id AS code_id, + cd.was_already_used, + uc.session_id, + vs.status AS session_status, + at.token AS existing_token, + at.expires_at AS existing_token_expires_at + FROM updated_code uc + JOIN code_data cd ON uc.id = cd.id + JOIN oauth2gw.verification_sessions vs ON vs.id = uc.session_id + LEFT JOIN oauth2gw.access_tokens at + ON at.session_id = vs.id AND at.revoked = FALSE + "# + ) + .bind(code) + .fetch_optional(pool) + .await?; + + Ok(result.map(|row: sqlx::postgres::PgRow| { + use sqlx::Row; + CodeExchangeData { + code_id: row.get("code_id"), + was_already_used: row.get("was_already_used"), + session_id: row.get("session_id"), + session_status: row.get("session_status"), + existing_token: row.get("existing_token"), + existing_token_expires_at: row.get("existing_token_expires_at"), + } + })) +} + +/// Get authorization code by session ID +pub async fn get_code_by_session( + pool: &PgPool, + session_id: Uuid, +) -> Result<Option<AuthorizationCode>> { + let auth_code = sqlx::query_as::<_, AuthorizationCode>( + r#" + SELECT id, session_id, code, expires_at, used, used_at, created_at + FROM oauth2gw.authorization_codes + WHERE session_id = $1 + ORDER BY created_at DESC + LIMIT 1 + "# + ) + .bind(session_id) + .fetch_optional(pool) + .await?; + + Ok(auth_code) +} + +/// Delete expired authorization codes (garbage collection) +pub async fn delete_expired_codes(pool: &PgPool) -> Result<u64> { + let result = sqlx::query( + r#" + DELETE FROM oauth2gw.authorization_codes + WHERE expires_at < CURRENT_TIMESTAMP + "# + ) + .execute(pool) + .await?; + + Ok(result.rows_affected()) +} +\ No newline at end of file diff --git a/oauth2_gateway/src/db/clients.rs b/oauth2_gateway/src/db/clients.rs @@ -10,9 +10,9 @@ use chrono::{DateTime, Utc}; pub struct Client { pub id: Uuid, pub client_id: String, - pub client_secret: String, - pub notification_url: String, - pub verifier_base_url: String, + pub secret_hash: String, + pub webhook_url: String, + pub verifier_url: String, pub verifier_management_api_path: String, pub created_at: DateTime<Utc>, pub updated_at: DateTime<Utc>, @@ -23,8 +23,8 @@ pub async fn register_client( pool: &PgPool, client_id: &str, client_secret: &str, - notification_url: &str, - verifier_base_url: &str, + webhook_url: &str, + verifier_url: &str, verifier_management_api_path: Option<&str>, ) -> Result<Client> { let api_path = verifier_management_api_path @@ -33,16 +33,16 @@ pub async fn register_client( let client = sqlx::query_as::<_, Client>( r#" INSERT INTO oauth2gw.clients - (client_id, client_secret, notification_url, verifier_base_url, verifier_management_api_path) + (client_id, secret_hash, webhook_url, verifier_url, verifier_management_api_path) VALUES ($1, $2, $3, $4, $5) - RETURNING id, client_id, client_secret, notification_url, verifier_base_url, + RETURNING id, client_id, secret_hash, webhook_url, verifier_url, verifier_management_api_path, created_at, updated_at "# ) .bind(client_id) .bind(client_secret) - .bind(notification_url) - .bind(verifier_base_url) + .bind(webhook_url) + .bind(verifier_url) .bind(api_path) 
.fetch_one(pool) .await?; @@ -57,7 +57,7 @@ pub async fn get_client_by_id( ) -> Result<Option<Client>> { let client = sqlx::query_as::<_, Client>( r#" - SELECT id, client_id, client_secret, notification_url, verifier_base_url, + SELECT id, client_id, secret_hash, webhook_url, verifier_url, verifier_management_api_path, created_at, updated_at FROM oauth2gw.clients WHERE client_id = $1 @@ -77,7 +77,7 @@ pub async fn get_client_by_uuid( ) -> Result<Option<Client>> { let client = sqlx::query_as::<_, Client>( r#" - SELECT id, client_id, client_secret, notification_url, verifier_base_url, + SELECT id, client_id, secret_hash, webhook_url, verifier_url, verifier_management_api_path, created_at, updated_at FROM oauth2gw.clients WHERE id = $1 @@ -100,10 +100,10 @@ pub async fn authenticate_client( ) -> Result<Option<Client>> { let client = sqlx::query_as::<_, Client>( r#" - SELECT id, client_id, client_secret, notification_url, verifier_base_url, + SELECT id, client_id, secret_hash, webhook_url, verifier_url, verifier_management_api_path, created_at, updated_at FROM oauth2gw.clients - WHERE client_id = $1 AND client_secret = $2 + WHERE client_id = $1 AND secret_hash = $2 "# ) .bind(client_id) @@ -118,16 +118,16 @@ pub async fn authenticate_client( pub async fn update_client( pool: &PgPool, id: Uuid, - notification_url: Option<&str>, - verifier_base_url: Option<&str>, + webhook_url: Option<&str>, + verifier_url: Option<&str>, verifier_management_api_path: Option<&str>, ) -> Result<Client> { // Get current client to use for fields that aren't being updated let current = get_client_by_uuid(pool, id).await? .ok_or_else(|| anyhow::anyhow!("Client not found"))?; - let new_notification_url = notification_url.unwrap_or(&current.notification_url); - let new_verifier_base_url = verifier_base_url.unwrap_or(&current.verifier_base_url); + let new_webhook_url = webhook_url.unwrap_or(&current.webhook_url); + let new_verifier_url = verifier_url.unwrap_or(&current.verifier_url); let new_verifier_api_path = verifier_management_api_path .unwrap_or(&current.verifier_management_api_path); @@ -135,17 +135,17 @@ pub async fn update_client( r#" UPDATE oauth2gw.clients SET - notification_url = $1, - verifier_base_url = $2, + webhook_url = $1, + verifier_url = $2, verifier_management_api_path = $3, updated_at = CURRENT_TIMESTAMP WHERE id = $4 - RETURNING id, client_id, client_secret, notification_url, verifier_base_url, + RETURNING id, client_id, secret_hash, webhook_url, verifier_url, verifier_management_api_path, created_at, updated_at "# ) - .bind(new_notification_url) - .bind(new_verifier_base_url) + .bind(new_webhook_url) + .bind(new_verifier_url) .bind(new_verifier_api_path) .bind(id) .fetch_one(pool) diff --git a/oauth2_gateway/src/db/logs.rs b/oauth2_gateway/src/db/logs.rs @@ -1,311 +0,0 @@ -// Database operations for webhook_logs and notification_logs tables - -use sqlx::PgPool; -use anyhow::Result; -use uuid::Uuid; -use chrono::{DateTime, Utc, Duration}; -use serde_json::Value as JsonValue; - -/// Webhook log record -#[derive(Debug, Clone, sqlx::FromRow)] -pub struct WebhookLog { - pub id: Uuid, - pub request_id: Option<String>, - pub session_id: Option<Uuid>, - pub payload: JsonValue, - pub status_code: Option<i32>, - pub processed: bool, - pub error_message: Option<String>, - pub received_at: DateTime<Utc>, - pub processed_at: Option<DateTime<Utc>>, -} - -/// Notification log record -#[derive(Debug, Clone, sqlx::FromRow)] -pub struct NotificationLog { - pub id: Uuid, - pub session_id: Uuid, - pub client_id: Uuid, 
- pub notification_url: String, - pub payload: JsonValue, - pub status_code: Option<i32>, - pub success: bool, - pub error_message: Option<String>, - pub created_at: DateTime<Utc>, - pub completed_at: Option<DateTime<Utc>>, -} - -// ============================================================================ -// Webhook Logs (Incoming from Swiyu Verifier) -// ============================================================================ - -/// Log an incoming webhook from Swiyu Verifier -pub async fn log_webhook_received( - pool: &PgPool, - request_id: Option<&str>, - session_id: Option<Uuid>, - payload: &JsonValue, -) -> Result<Uuid> { - let log_id = sqlx::query_scalar::<_, Uuid>( - r#" - INSERT INTO oauth2gw.webhook_logs (request_id, session_id, payload, processed) - VALUES ($1, $2, $3, FALSE) - RETURNING id - "# - ) - .bind(request_id) - .bind(session_id) - .bind(payload) - .fetch_one(pool) - .await?; - - Ok(log_id) -} - -/// Mark webhook as successfully processed -pub async fn mark_webhook_processed( - pool: &PgPool, - log_id: Uuid, - status_code: i32, -) -> Result<()> { - sqlx::query( - r#" - UPDATE oauth2gw.webhook_logs - SET - processed = TRUE, - status_code = $1, - processed_at = CURRENT_TIMESTAMP - WHERE id = $2 - "# - ) - .bind(status_code) - .bind(log_id) - .execute(pool) - .await?; - - Ok(()) -} - -/// Mark webhook as failed with error -pub async fn mark_webhook_failed( - pool: &PgPool, - log_id: Uuid, - status_code: i32, - error_message: &str, -) -> Result<()> { - sqlx::query( - r#" - UPDATE oauth2gw.webhook_logs - SET - processed = FALSE, - status_code = $1, - error_message = $2, - processed_at = CURRENT_TIMESTAMP - WHERE id = $3 - "# - ) - .bind(status_code) - .bind(error_message) - .bind(log_id) - .execute(pool) - .await?; - - Ok(()) -} - -/// Get webhook logs for a session -pub async fn get_webhook_logs_for_session( - pool: &PgPool, - session_id: Uuid, -) -> Result<Vec<WebhookLog>> { - let logs = sqlx::query_as::<_, WebhookLog>( - r#" - SELECT id, request_id, session_id, payload, status_code, processed, - error_message, received_at, processed_at - FROM oauth2gw.webhook_logs - WHERE session_id = $1 - ORDER BY received_at DESC - "# - ) - .bind(session_id) - .fetch_all(pool) - .await?; - - Ok(logs) -} - -/// Get unprocessed webhooks (for retry logic) -pub async fn get_unprocessed_webhooks(pool: &PgPool, limit: i64) -> Result<Vec<WebhookLog>> { - let logs = sqlx::query_as::<_, WebhookLog>( - r#" - SELECT id, request_id, session_id, payload, status_code, processed, - error_message, received_at, processed_at - FROM oauth2gw.webhook_logs - WHERE processed = FALSE - AND received_at > CURRENT_TIMESTAMP - INTERVAL '1 hour' - ORDER BY received_at ASC - LIMIT $1 - "# - ) - .bind(limit) - .fetch_all(pool) - .await?; - - Ok(logs) -} - -/// Delete old webhook logs (garbage collection) -pub async fn delete_old_webhook_logs(pool: &PgPool, older_than_days: i64) -> Result<u64> { - let cutoff = Utc::now() - Duration::days(older_than_days); - - let result = sqlx::query( - r#" - DELETE FROM oauth2gw.webhook_logs - WHERE received_at < $1 - "# - ) - .bind(cutoff) - .execute(pool) - .await?; - - Ok(result.rows_affected()) -} - -// ============================================================================ -// Notification Logs (Outgoing to Client) -// ============================================================================ - -/// Log an outgoing notification to Client -pub async fn log_notification_sent( - pool: &PgPool, - session_id: Uuid, - client_id: Uuid, - notification_url: &str, - 
payload: &JsonValue, -) -> Result<Uuid> { - let log_id = sqlx::query_scalar::<_, Uuid>( - r#" - INSERT INTO oauth2gw.notification_logs - (session_id, client_id, notification_url, payload, success) - VALUES ($1, $2, $3, $4, FALSE) - RETURNING id - "# - ) - .bind(session_id) - .bind(client_id) - .bind(notification_url) - .bind(payload) - .fetch_one(pool) - .await?; - - Ok(log_id) -} - -/// Mark notification as successfully delivered -pub async fn mark_notification_success( - pool: &PgPool, - log_id: Uuid, - status_code: i32, -) -> Result<()> { - sqlx::query( - r#" - UPDATE oauth2gw.notification_logs - SET - success = TRUE, - status_code = $1, - completed_at = CURRENT_TIMESTAMP - WHERE id = $2 - "# - ) - .bind(status_code) - .bind(log_id) - .execute(pool) - .await?; - - Ok(()) -} - -/// Mark notification as failed -pub async fn mark_notification_failed( - pool: &PgPool, - log_id: Uuid, - status_code: Option<i32>, - error_message: &str, -) -> Result<()> { - sqlx::query( - r#" - UPDATE oauth2gw.notification_logs - SET - success = FALSE, - status_code = $1, - error_message = $2, - completed_at = CURRENT_TIMESTAMP - WHERE id = $3 - "# - ) - .bind(status_code) - .bind(error_message) - .bind(log_id) - .execute(pool) - .await?; - - Ok(()) -} - -/// Get notification logs for a session -pub async fn get_notification_logs_for_session( - pool: &PgPool, - session_id: Uuid, -) -> Result<Vec<NotificationLog>> { - let logs = sqlx::query_as::<_, NotificationLog>( - r#" - SELECT id, session_id, client_id, notification_url, payload, status_code, - success, error_message, created_at, completed_at - FROM oauth2gw.notification_logs - WHERE session_id = $1 - ORDER BY created_at DESC - "# - ) - .bind(session_id) - .fetch_all(pool) - .await?; - - Ok(logs) -} - -/// Get failed notifications for retry -pub async fn get_failed_notifications(pool: &PgPool, limit: i64) -> Result<Vec<NotificationLog>> { - let logs = sqlx::query_as::<_, NotificationLog>( - r#" - SELECT id, session_id, client_id, notification_url, payload, status_code, - success, error_message, created_at, completed_at - FROM oauth2gw.notification_logs - WHERE success = FALSE - AND created_at > CURRENT_TIMESTAMP - INTERVAL '1 hour' - ORDER BY created_at ASC - LIMIT $1 - "# - ) - .bind(limit) - .fetch_all(pool) - .await?; - - Ok(logs) -} - -/// Delete old notification logs (garbage collection) -pub async fn delete_old_notification_logs(pool: &PgPool, older_than_days: i64) -> Result<u64> { - let cutoff = Utc::now() - Duration::days(older_than_days); - - let result = sqlx::query( - r#" - DELETE FROM oauth2gw.notification_logs - WHERE created_at < $1 - "# - ) - .bind(cutoff) - .execute(pool) - .await?; - - Ok(result.rows_affected()) -} -\ No newline at end of file diff --git a/oauth2_gateway/src/db/mod.rs b/oauth2_gateway/src/db/mod.rs @@ -7,7 +7,8 @@ use anyhow::{Result, Context}; pub mod sessions; pub mod tokens; pub mod clients; -pub mod logs; +pub mod authorization_codes; +pub mod notification_webhooks; /// Create a PostgreSQL connection pool /// diff --git a/oauth2_gateway/src/db/notification_webhooks.rs b/oauth2_gateway/src/db/notification_webhooks.rs @@ -0,0 +1,220 @@ +// Database operations for notification_pending_webhooks table + +use sqlx::PgPool; +use anyhow::Result; +use uuid::Uuid; + +/// Pending webhook record with authorization code for client notification +#[derive(Debug, Clone)] +pub struct PendingWebhook { + pub webhook_pending_serial: i64, + pub session_id: Uuid, + pub client_id: Uuid, + pub url: String, + pub http_method: String, + pub 
header: Option<String>, + pub body: String, + pub retries: i32, + /// Authorization code to include in the webhook payload + pub code: String, +} + +/// Fetch pending webhooks ready to be sent +/// +/// JOINs with authorization_codes to get the code value. +/// Only returns webhooks where next_attempt <= current epoch time. +/// +/// Used by the background worker +pub async fn get_pending_webhooks( + pool: &PgPool, + limit: i64, +) -> Result<Vec<PendingWebhook>> { + let webhooks = sqlx::query( + r#" + SELECT + npw.webhook_pending_serial, + npw.session_id, + npw.client_id, + npw.url, + npw.http_method, + npw.header, + npw.body, + npw.retries, + ac.code + FROM oauth2gw.notification_pending_webhooks npw + INNER JOIN oauth2gw.authorization_codes ac ON ac.session_id = npw.session_id + WHERE npw.next_attempt <= EXTRACT(EPOCH FROM NOW()) + ORDER BY npw.webhook_pending_serial + LIMIT $1 + "# + ) + .bind(limit) + .fetch_all(pool) + .await?; + + Ok(webhooks.into_iter().map(|row: sqlx::postgres::PgRow| { + use sqlx::Row; + PendingWebhook { + webhook_pending_serial: row.get("webhook_pending_serial"), + session_id: row.get("session_id"), + client_id: row.get("client_id"), + url: row.get("url"), + http_method: row.get("http_method"), + header: row.get("header"), + body: row.get("body"), + retries: row.get("retries"), + code: row.get("code"), + } + }).collect()) +} + +/// Delete a webhook after successful delivery +pub async fn delete_webhook( + pool: &PgPool, + webhook_pending_serial: i64, +) -> Result<bool> { + let result = sqlx::query( + r#" + DELETE FROM oauth2gw.notification_pending_webhooks + WHERE webhook_pending_serial = $1 + "# + ) + .bind(webhook_pending_serial) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// Disable a webhook (client error - no retry) +/// +/// Sets next_attempt to max i64 value so it never gets picked up again +pub async fn disable_webhook( + pool: &PgPool, + webhook_pending_serial: i64, +) -> Result<bool> { + let result = sqlx::query( + r#" + UPDATE oauth2gw.notification_pending_webhooks + SET next_attempt = 9223372036854775807 + WHERE webhook_pending_serial = $1 + "# + ) + .bind(webhook_pending_serial) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// Schedule retry with exponential backoff +/// +/// - Server errors (5xx): retry after delay_seconds (default 60) +/// - Network/timeout: retry after longer delay (default 3600) +pub async fn schedule_retry( + pool: &PgPool, + webhook_pending_serial: i64, + delay_seconds: i64, +) -> Result<bool> { + let result = sqlx::query( + r#" + UPDATE oauth2gw.notification_pending_webhooks + SET next_attempt = EXTRACT(EPOCH FROM NOW()) + $1, + retries = retries + 1 + WHERE webhook_pending_serial = $2 + "# + ) + .bind(delay_seconds) + .bind(webhook_pending_serial) + .execute(pool) + .await?; + + Ok(result.rows_affected() > 0) +} + +/// Insert a new pending webhook notification +/// +/// Triggered by the /notification handler after verification is complete +pub async fn insert_pending_webhook( + pool: &PgPool, + session_id: Uuid, + client_id: Uuid, + url: &str, + body: &str, +) -> Result<i64> { + let result = sqlx::query_scalar::<_, i64>( + r#" + INSERT INTO oauth2gw.notification_pending_webhooks + (session_id, client_id, url, body, next_attempt) + VALUES ($1, $2, $3, $4, 0) + RETURNING webhook_pending_serial + "# + ) + .bind(session_id) + .bind(client_id) + .bind(url) + .bind(body) + .fetch_one(pool) + .await?; + + Ok(result) +} + +/// Delete old disabled webhooks (garbage collection) +/// +/// 
Removes webhooks that have been disabled (next_attempt = MAX) for cleanup +pub async fn delete_disabled_webhooks(pool: &PgPool) -> Result<u64> { + let result = sqlx::query( + r#" + DELETE FROM oauth2gw.notification_pending_webhooks + WHERE next_attempt = 9223372036854775807 + "# + ) + .execute(pool) + .await?; + + Ok(result.rows_affected()) +} + +/// Delete webhooks that have exceeded max retries +pub async fn delete_max_retries( + pool: &PgPool, + max_retries: i32, +) -> Result<u64> { + let result = sqlx::query( + r#" + DELETE FROM oauth2gw.notification_pending_webhooks + WHERE retries >= $1 + "# + ) + .bind(max_retries) + .execute(pool) + .await?; + + Ok(result.rows_affected()) +} + +/// Get the next scheduled webhook attempt time +/// +/// Returns the earliest `next_attempt` timestamp for webhooks that are: +/// - Not disabled (next_attempt < MAX_INT8) +/// - Scheduled for the future (next_attempt > NOW) +/// +/// Used by worker to schedule precise wake-up for retries. +/// Returns None if no future webhooks are scheduled. +pub async fn get_next_scheduled_webhook(pool: &PgPool) -> Result<Option<i64>> { + let result = sqlx::query_scalar::<_, i64>( + r#" + SELECT next_attempt + FROM oauth2gw.notification_pending_webhooks + WHERE next_attempt > EXTRACT(EPOCH FROM NOW()) + AND next_attempt < 9223372036854775807 + ORDER BY next_attempt ASC + LIMIT 1 + "# + ) + .fetch_optional(pool) + .await?; + + Ok(result) +} +\ No newline at end of file diff --git a/oauth2_gateway/src/db/sessions.rs b/oauth2_gateway/src/db/sessions.rs @@ -3,7 +3,7 @@ use sqlx::PgPool; use anyhow::Result; use uuid::Uuid; -use chrono::{DateTime, Utc, Duration}; +use chrono::{DateTime, Utc}; /// Status of a verification session #[derive(Debug, Clone, sqlx::Type, serde::Serialize, serde::Deserialize, PartialEq)] @@ -23,7 +23,7 @@ pub enum SessionStatus { Failed, } -/// Verification session record +/// Verification session record used in /setup endpoint #[derive(Debug, Clone, sqlx::FromRow)] pub struct VerificationSession { pub id: Uuid, @@ -32,6 +32,7 @@ pub struct VerificationSession { pub scope: String, pub verification_url: Option<String>, pub request_id: Option<String>, + pub verifier_nonce: Option<String>, pub status: SessionStatus, pub created_at: DateTime<Utc>, pub authorized_at: Option<DateTime<Utc>>, @@ -41,214 +42,264 @@ pub struct VerificationSession { pub expires_at: DateTime<Utc>, } +/// Authorize record data used in /authorize endpoint +#[derive(Debug, Clone)] +pub struct AuthorizeSessionData { + // Session fields + pub session_id: Uuid, + pub status: SessionStatus, + pub expires_at: DateTime<Utc>, + pub scope: String, + pub verification_url: Option<String>, + pub request_id: Option<String>, + pub verifier_nonce: Option<String>, + // Client fields + pub verifier_url: String, + pub verifier_management_api_path: String, +} + +/// Notification record data used in /notification webhook endpoint +#[derive(Debug, Clone)] +pub struct NotificationSessionData { + // Session fields + pub session_id: Uuid, + pub nonce: String, + pub status: SessionStatus, + // Client fields + pub client_id: Uuid, + pub webhook_url: String, + pub verifier_url: String, + pub verifier_management_api_path: String, +} + + /// Create a new verification session /// -/// This is called by the /setup endpoint +/// Returns None if client_id doesn't exist +/// +/// Used by the /setup endpoint pub async fn create_session( pool: &PgPool, - client_id: Uuid, + client_id: &str, nonce: &str, scope: &str, expires_in_minutes: i64, -) -> 
Result<VerificationSession> { - let expires_at = Utc::now() + Duration::minutes(expires_in_minutes); - +) -> Result<Option<VerificationSession>> { let session = sqlx::query_as::<_, VerificationSession>( r#" - INSERT INTO oauth2gw.verification_sessions (client_id, nonce, scope, expires_at, status) - VALUES ($1, $2, $3, $4, 'pending') + INSERT INTO oauth2gw.verification_sessions (client_id, nonce, scope, + expires_at, status) + SELECT c.id, $1, $2, NOW() + $3 * INTERVAL '1 minute', 'pending' + FROM oauth2gw.clients c + WHERE c.client_id = $4 RETURNING - id, client_id, nonce, scope, verification_url, request_id, status, - created_at, authorized_at, verified_at, completed_at, failed_at, expires_at + id, client_id, nonce, scope, verification_url, request_id, + verifier_nonce, status, created_at, authorized_at, + verified_at, completed_at, failed_at, expires_at "# ) - .bind(client_id) - .bind(nonce) - .bind(scope) - .bind(expires_at) - .fetch_one(pool) - .await?; + .bind(nonce) + .bind(scope) + .bind(expires_in_minutes) + .bind(client_id) + .fetch_optional(pool) + .await?; Ok(session) } -/// Lookup session by nonce +/// Fetch session and client data for /authorize endpoint +/// +/// Returns all data needed for backend validation and idempotent responses. /// -/// This is used by /authorize and /token endpoints -pub async fn get_session_by_nonce( +/// Used by the /authorize endpoint +pub async fn get_session_for_authorize( pool: &PgPool, - nonce: &str -) -> Result<Option<VerificationSession>> { - let session = sqlx::query_as::<_, VerificationSession>( + nonce: &str, + client_id: &str, +) -> Result<Option<AuthorizeSessionData>> { + let result = sqlx::query( r#" - SELECT - id, client_id, nonce, scope, verification_url, request_id, status, - created_at, authorized_at, verified_at, completed_at, failed_at, expires_at - FROM oauth2gw.verification_sessions - WHERE nonce = $1 + UPDATE oauth2gw.verification_sessions s + SET status = s.status + FROM oauth2gw.clients c + WHERE s.client_id = c.id + AND s.nonce = $1 + AND c.client_id = $2 + RETURNING + s.id AS session_id, + s.status, + s.expires_at, + s.scope, + s.verification_url, + s.request_id, + s.verifier_nonce, + c.verifier_url, + c.verifier_management_api_path "# ) .bind(nonce) + .bind(client_id) .fetch_optional(pool) .await?; - Ok(session) + Ok(result.map(|row: sqlx::postgres::PgRow| { + use sqlx::Row; + AuthorizeSessionData { + session_id: row.get("session_id"), + status: row.get("status"), + expires_at: row.get("expires_at"), + scope: row.get("scope"), + verification_url: row.get("verification_url"), + request_id: row.get("request_id"), + verifier_nonce: row.get("verifier_nonce"), + verifier_url: row.get("verifier_url"), + verifier_management_api_path: row.get("verifier_management_api_path"), + } + })) } -/// Lookup session by Swiyu request_id +/// Fetch session and client data for /notification webhook +/// +/// Returns all data needed for backend validation and client notification. 
/// -/// This is used by the webhook handler -pub async fn get_session_by_request_id( +/// Used by the /notification endpoint (incoming webhook from Swiyu) +pub async fn get_session_for_notification( pool: &PgPool, - request_id: &str -) -> Result<Option<VerificationSession>> { - let session = sqlx::query_as::<_, VerificationSession>( + request_id: &str, +) -> Result<Option<NotificationSessionData>> { + let result = sqlx::query( r#" - SELECT - id, client_id, nonce, scope, verification_url, request_id, status, - created_at, authorized_at, verified_at, completed_at, failed_at, expires_at - FROM oauth2gw.verification_sessions - WHERE request_id = $1 + UPDATE oauth2gw.verification_sessions s + SET status = s.status + FROM oauth2gw.clients c + WHERE s.client_id = c.id + AND s.request_id = $1 + RETURNING + s.id AS session_id, + s.nonce, + s.status, + c.id AS client_id, + c.webhook_url, + c.verifier_url, + c.verifier_management_api_path "# ) .bind(request_id) .fetch_optional(pool) .await?; - Ok(session) -} - -/// Update session status and set timestamp -pub async fn update_session_status_with_timestamp( - pool: &PgPool, - session_id: Uuid, - status: SessionStatus, -) -> Result<()> { - let timestamp_field = match status { - SessionStatus::Authorized => "authorized_at", - SessionStatus::Verified => "verified_at", - SessionStatus::Completed => "completed_at", - SessionStatus::Failed => "failed_at", - SessionStatus::Pending | SessionStatus::Expired => { - return Err(anyhow::anyhow!( - "Status {:?} has no timestamp field. Use set_session_status() instead.", - status - )); + Ok(result.map(|row: sqlx::postgres::PgRow| { + use sqlx::Row; + NotificationSessionData { + session_id: row.get("session_id"), + nonce: row.get("nonce"), + status: row.get("status"), + client_id: row.get("client_id"), + webhook_url: row.get("webhook_url"), + verifier_url: row.get("verifier_url"), + verifier_management_api_path: row.get("verifier_management_api_path"), } - }; - - let query = format!( - "UPDATE oauth2gw.verification_sessions SET status = $1, {} = CURRENT_TIMESTAMP WHERE id = $2", - timestamp_field - ); - - sqlx::query(&query) - .bind(status) - .bind(session_id) - .execute(pool) - .await?; - - Ok(()) + })) } -/// Set session status without updating timestamp -pub async fn set_session_status( - pool: &PgPool, - session_id: Uuid, - status: SessionStatus, -) -> Result<()> { - sqlx::query( - r#" - UPDATE oauth2gw.verification_sessions - SET status = $1 - WHERE id = $2 - "# - ) - .bind(status) - .bind(session_id) - .execute(pool) - .await?; - - Ok(()) +/// Data returned after updating session to authorized +#[derive(Debug, Clone)] +pub struct AuthorizedSessionResult { + pub request_id: String, + pub verification_url: String, } -/// Update session with Swiyu verification data after calling /authorize +/// Update session to authorized with verifier response data +/// +/// Called after successful POST to Swiyu Verifier. +/// Sets status to 'authorized' and stores verification_url, request_id, verifier_nonce. +/// Returns the request_id (verification_id) and verification_url. /// -/// Sets verification_url, request_id, status=authorized, and authorized_at timestamp. 
-pub async fn set_session_authorized( +/// Used by the /authorize endpoint +pub async fn update_session_authorized( pool: &PgPool, session_id: Uuid, verification_url: &str, request_id: &str, -) -> Result<()> { - sqlx::query( + verifier_nonce: Option<&str>, +) -> Result<AuthorizedSessionResult> { + let result = sqlx::query( r#" UPDATE oauth2gw.verification_sessions - SET + SET status = 'authorized', verification_url = $1, request_id = $2, - status = 'authorized', - authorized_at = CURRENT_TIMESTAMP - WHERE id = $3 + verifier_nonce = $3, + authorized_at = NOW() + WHERE id = $4 + RETURNING request_id, verification_url "# ) .bind(verification_url) .bind(request_id) + .bind(verifier_nonce) .bind(session_id) - .execute(pool) + .fetch_one(pool) .await?; - Ok(()) + use sqlx::Row; + Ok(AuthorizedSessionResult { + request_id: result.get("request_id"), + verification_url: result.get("verification_url"), + }) } -/// Get all expired sessions for garbage collection -pub async fn get_expired_sessions(pool: &PgPool) -> Result<Vec<VerificationSession>> { - let sessions = sqlx::query_as::<_, VerificationSession>( - r#" - SELECT - id, client_id, nonce, scope, verification_url, request_id, status, - created_at, authorized_at, verified_at, completed_at, failed_at, expires_at - FROM oauth2gw.verification_sessions - WHERE expires_at < CURRENT_TIMESTAMP - AND status NOT IN ('completed', 'expired', 'failed') - "# - ) - .fetch_all(pool) - .await?; - - Ok(sessions) -} +/// Atomically update session to verified, create authorization code, and queue webhook +/// +/// Returns the generated authorization code on success. +pub async fn verify_session_and_queue_notification( + pool: &PgPool, + session_id: Uuid, + status: SessionStatus, + authorization_code: &str, + code_expires_in_minutes: i64, + client_id: Uuid, + webhook_url: &str, + webhook_body: &str, +) -> Result<String> { + let timestamp_field = match status { + SessionStatus::Verified => "verified_at", + SessionStatus::Failed => "failed_at", + _ => return Err(anyhow::anyhow!("Invalid status for notification: must be Verified or Failed")), + }; -/// Mark expired sessions as expired (garbage collection) -pub async fn mark_expired_sessions(pool: &PgPool) -> Result<u64> { - let result = sqlx::query( + let query = format!( r#" - UPDATE oauth2gw.verification_sessions - SET status = 'expired' - WHERE expires_at < CURRENT_TIMESTAMP - AND status NOT IN ('completed', 'expired', 'failed') - "# - ) - .execute(pool) - .await?; - - Ok(result.rows_affected()) -} - -/// Delete old completed/expired/failed sessions (garbage collection) -pub async fn delete_old_sessions(pool: &PgPool, older_than_days: i64) -> Result<u64> { - let cutoff = Utc::now() - Duration::days(older_than_days); + WITH updated_session AS ( + UPDATE oauth2gw.verification_sessions + SET status = $1, {} = NOW() + WHERE id = $2 + RETURNING id + ), + inserted_code AS ( + INSERT INTO oauth2gw.authorization_codes (session_id, code, expires_at) + VALUES ($2, $3, NOW() + $4 * INTERVAL '1 minute') + RETURNING code + ) + INSERT INTO oauth2gw.notification_pending_webhooks + (session_id, client_id, url, body, next_attempt) + VALUES ($2, $5, $6, $7, 0) + RETURNING (SELECT code FROM inserted_code) + "#, + timestamp_field + ); - let result = sqlx::query( - r#" - DELETE FROM oauth2gw.verification_sessions - WHERE created_at < $1 - AND status IN ('completed', 'expired', 'failed') - "# - ) - .bind(cutoff) - .execute(pool) - .await?; + let code = sqlx::query_scalar::<_, String>(&query) + .bind(status) + .bind(session_id) + 
.bind(authorization_code) + .bind(code_expires_in_minutes) + .bind(client_id) + .bind(webhook_url) + .bind(webhook_body) + .fetch_one(pool) + .await?; - Ok(result.rows_affected()) + Ok(code) } diff --git a/oauth2_gateway/src/db/tokens.rs b/oauth2_gateway/src/db/tokens.rs @@ -3,7 +3,7 @@ use sqlx::PgPool; use anyhow::Result; use uuid::Uuid; -use chrono::{DateTime, Utc, Duration}; +use chrono::{DateTime, Utc}; /// Access token record #[derive(Debug, Clone, sqlx::FromRow)] @@ -18,131 +18,90 @@ pub struct AccessToken { pub revoked_at: Option<DateTime<Utc>>, } -/// Create a new access token for a session -/// -/// Called by the /token endpoint -pub async fn create_access_token( - pool: &PgPool, - session_id: Uuid, - token: &str, - expires_in_seconds: i64, -) -> Result<AccessToken> { - let expires_at = Utc::now() + Duration::seconds(expires_in_seconds); - - let access_token = sqlx::query_as::<_, AccessToken>( - r#" - INSERT INTO oauth2gw.access_tokens (session_id, token, token_type, expires_at) - VALUES ($1, $2, 'Bearer', $3) - RETURNING id, session_id, token, token_type, expires_at, created_at, revoked, revoked_at - "# - ) - .bind(session_id) - .bind(token) - .bind(expires_at) - .fetch_one(pool) - .await?; +use super::sessions::SessionStatus; - Ok(access_token) +/// Data returned by get_token_with_session for backend validation +#[derive(Debug, Clone)] +pub struct TokenWithSessionData { + // Token fields + pub token_id: Uuid, + pub revoked: bool, + // Session fields + pub session_status: SessionStatus, + pub verifiable_credential: Option<serde_json::Value>, } -/// Lookup and validate an access token +/// Fetch token and session data for /info endpoint /// -/// Returns None if: -/// - Token doesn't exist -/// - Token is expired -/// - Token is revoked +/// Uses UPDATE...SET revoked = revoked (no-op) for row-level locking. +/// Returns all data needed for backend validation. +/// Does NOT filter by revoked status - backend handles validation. /// -/// Called by the /info endpoint -pub async fn verify_access_token( +/// Used by the /info endpoint +pub async fn get_token_with_session( pool: &PgPool, - token: &str -) -> Result<Option<AccessToken>> { - let access_token = sqlx::query_as::<_, AccessToken>( + token: &str, +) -> Result<Option<TokenWithSessionData>> { + let result = sqlx::query( r#" - SELECT id, session_id, token, token_type, expires_at, created_at, revoked, revoked_at - FROM oauth2gw.access_tokens - WHERE token = $1 - AND expires_at > CURRENT_TIMESTAMP - AND revoked = FALSE + UPDATE oauth2gw.access_tokens t + SET revoked = t.revoked + FROM oauth2gw.verification_sessions s + WHERE t.session_id = s.id + AND t.token = $1 + AND t.expires_at > NOW() + RETURNING + t.id AS token_id, + t.revoked, + s.status AS session_status, + s.verifiable_credential "# ) .bind(token) .fetch_optional(pool) .await?; - Ok(access_token) + Ok(result.map(|row: sqlx::postgres::PgRow| { + use sqlx::Row; + TokenWithSessionData { + token_id: row.get("token_id"), + revoked: row.get("revoked"), + session_status: row.get("session_status"), + verifiable_credential: row.get("verifiable_credential"), + } + })) } -/// Get access token by session ID -pub async fn get_token_by_session( +/// Atomically create access token and update session to completed +/// +/// This is the atomicZ operation for /token endpoint. +/// Both operations succeed or fail together. 
+/// +/// Used by the /token endpoint after validating authorization code +pub async fn create_token_and_complete_session( pool: &PgPool, - session_id: Uuid -) -> Result<Option<AccessToken>> { + session_id: Uuid, + token: &str, + token_expires_in_seconds: i64, +) -> Result<AccessToken> { let access_token = sqlx::query_as::<_, AccessToken>( r#" - SELECT id, session_id, token, token_type, expires_at, created_at, revoked, revoked_at - FROM oauth2gw.access_tokens - WHERE session_id = $1 - ORDER BY created_at DESC - LIMIT 1 + WITH updated_session AS ( + UPDATE oauth2gw.verification_sessions + SET status = 'completed', completed_at = NOW() + WHERE id = $1 + RETURNING id + ) + INSERT INTO oauth2gw.access_tokens (session_id, token, token_type, expires_at) + VALUES ($1, $2, 'Bearer', NOW() + $3 * INTERVAL '1 second') + RETURNING id, session_id, token, token_type, expires_at, created_at, revoked, revoked_at "# ) .bind(session_id) - .fetch_optional(pool) - .await?; - - Ok(access_token) -} - -/// Revoke an access token -pub async fn revoke_token( - pool: &PgPool, - token: &str -) -> Result<bool> { - let result = sqlx::query( - r#" - UPDATE oauth2gw.access_tokens - SET - revoked = TRUE, - revoked_at = CURRENT_TIMESTAMP - WHERE token = $1 AND revoked = FALSE - "# - ) .bind(token) - .execute(pool) - .await?; - - Ok(result.rows_affected() > 0) -} - -/// Delete expired access tokens (garbage collection) -pub async fn delete_expired_tokens(pool: &PgPool) -> Result<u64> { - let result = sqlx::query( - r#" - DELETE FROM oauth2gw.access_tokens - WHERE expires_at < CURRENT_TIMESTAMP - "# - ) - .execute(pool) - .await?; - - Ok(result.rows_affected()) -} - -/// Delete old revoked tokens (garbage collection) -pub async fn delete_old_revoked_tokens(pool: &PgPool, older_than_days: i64) -> Result<u64> { - let cutoff = Utc::now() - Duration::days(older_than_days); - - let result = sqlx::query( - r#" - DELETE FROM oauth2gw.access_tokens - WHERE revoked = TRUE - AND revoked_at < $1 - "# - ) - .bind(cutoff) - .execute(pool) + .bind(token_expires_in_seconds) + .fetch_one(pool) .await?; - Ok(result.rows_affected()) + Ok(access_token) }
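
For orientation, the intended composition of the new helpers in the /token path is: get_code_for_token_exchange atomically marks the code as used and reports whether it had already been consumed, and create_token_and_complete_session mints the bearer token while flipping the session to 'completed'. A simplified, hypothetical handler body is sketched below; TokenResponse, TOKEN_TTL_SECS, and new_random_token are invented names, and request parsing, client authentication, and error mapping are omitted:

    // Hypothetical core of the /token handler, showing how the new helpers
    // compose. TokenResponse, TOKEN_TTL_SECS, and new_random_token are invented
    // for this sketch; the real handler lives outside this diff.
    use sqlx::PgPool;

    use crate::db::authorization_codes::get_code_for_token_exchange;
    use crate::db::sessions::SessionStatus;
    use crate::db::tokens::create_token_and_complete_session;

    const TOKEN_TTL_SECS: i64 = 3600;

    pub struct TokenResponse {
        pub access_token: String,
        pub token_type: &'static str,
        pub expires_in: i64,
    }

    pub async fn exchange_code(pool: &PgPool, code: &str) -> anyhow::Result<TokenResponse> {
        // Atomically marks the code as used; also returns the session status and
        // any previously issued token for an idempotent response.
        let data = get_code_for_token_exchange(pool, code)
            .await?
            .ok_or_else(|| anyhow::anyhow!("invalid_grant: unknown or expired code"))?;

        if data.was_already_used {
            // Replay of the same code: hand back the token minted the first time.
            let access_token = data
                .existing_token
                .ok_or_else(|| anyhow::anyhow!("invalid_grant: code already consumed"))?;
            let expires_in = data
                .existing_token_expires_at
                .map(|t| (t - chrono::Utc::now()).num_seconds())
                .unwrap_or(0);
            return Ok(TokenResponse { access_token, token_type: "Bearer", expires_in });
        }

        if data.session_status != SessionStatus::Verified {
            anyhow::bail!("invalid_grant: session is not in 'verified' state");
        }

        // First exchange: mint the bearer token and mark the session 'completed'
        // in one statement via create_token_and_complete_session.
        let minted = create_token_and_complete_session(
            pool,
            data.session_id,
            &new_random_token(),
            TOKEN_TTL_SECS,
        )
        .await?;
        Ok(TokenResponse {
            access_token: minted.token,
            token_type: "Bearer",
            expires_in: TOKEN_TTL_SECS,
        })
    }

    // Placeholder only: a real token should be 256 random bits, base64-encoded,
    // as documented on access_tokens.token in oauth2gw-0001.sql.
    fn new_random_token() -> String {
        uuid::Uuid::new_v4().simple().to_string()
    }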