commit 776bb94a5802c12dc3d321e3c771ee4284a2b487
parent abdaebd3a58a4b85526c79405a580bc3e71a930f
Author: Henrique Chan Carvalho Machado <henriqueccmachado@tecnico.ulisboa.pt>
Date: Sun, 26 Oct 2025 15:27:15 +0100
oauth2_gateway: implemented DB v1 with versioning
Diffstat:
15 files changed, 1549 insertions(+), 24 deletions(-)
diff --git a/oauth2_gateway/.gitignore b/oauth2_gateway/.gitignore
@@ -1,8 +1,13 @@
# Rust
-/target/
+.cargo/
Cargo.lock
+/target/
.idea/
.vscode/
.DS_Store
*.log
-*.tmp
-\ No newline at end of file
+*.tmp
+.sqlx/
+
+# Safeguard Env
+.env
+\ No newline at end of file
diff --git a/oauth2_gateway/Cargo.toml b/oauth2_gateway/Cargo.toml
@@ -44,3 +44,9 @@ anyhow = "1.0.100"
# Cryptography
rand = "0.8.5"
base64 = "0.22.1"
+
+# Database
+sqlx = { version = "0.8", features = ["runtime-tokio", "postgres", "uuid", "chrono", "json"] }
+
+[dev-dependencies]
+tempfile = "3.8"
+\ No newline at end of file
diff --git a/oauth2_gateway/migrations/drop.sql b/oauth2_gateway/migrations/drop.sql
@@ -0,0 +1,15 @@
+-- Drop script for OAuth2 Gateway
+-- WARNING: This destroys all data in the oauth2gw schema
+BEGIN;
+
+-- Drop application schema (CASCADE removes all tables, indexes, functions)
+DROP SCHEMA IF EXISTS oauth2gw CASCADE;
+
+-- Unregister all oauth2gw patches from versioning system
+-- This allows re-running migrations from scratch
+SELECT _v.unregister_patch(patch_name)
+FROM _v.patches
+WHERE patch_name LIKE 'oauth2gw-%'
+ORDER BY patch_name DESC;
+
+COMMIT;
diff --git a/oauth2_gateway/migrations/oauth2gw-0001.sql b/oauth2_gateway/migrations/oauth2gw-0001.sql
@@ -0,0 +1,163 @@
+-- OAuth2 Gateway Initial Schema
+-- This migration creates the core tables for the OAuth2 Gateway service
+BEGIN;
+
+-- Register this patch with the depesz versioning system
+SELECT _v.register_patch('oauth2gw-0001', NULL, NULL);
+
+-- Create application schema
+CREATE SCHEMA oauth2gw;
+
+-- Set search path for this migration
+SET search_path TO oauth2gw;
+
+-- ============================================================================
+-- Table: clients
+-- Stores registered clients (e.g. Taler Exchange)
+-- ============================================================================
+CREATE TABLE clients (
+ id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+ client_id VARCHAR(255) UNIQUE NOT NULL,
+ client_secret VARCHAR(255) NOT NULL, -- Should be hashed in production
+ notification_url TEXT NOT NULL, -- Client's webhook URL for notifications
+ verifier_base_url TEXT NOT NULL, -- Swiyu Verifier URL for this client
+ verifier_management_api_path VARCHAR(255) DEFAULT '/management/api/verifications',
+ created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
+ updated_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP
+);
+
+CREATE INDEX idx_clients_client_id ON clients(client_id);
+
+COMMENT ON TABLE clients IS 'Registered clients using the OAuth2 Gateway';
+COMMENT ON COLUMN clients.client_secret IS 'Shared secret for server-to-server authentication';
+COMMENT ON COLUMN clients.notification_url IS 'Client webhook endpoint called when verification completes';
+
+-- ============================================================================
+-- Table: verification_sessions
+-- Tracks the entire verification flow from setup to completion
+-- ============================================================================
+CREATE TABLE verification_sessions (
+ id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+ client_id UUID NOT NULL REFERENCES clients(id) ON DELETE CASCADE,
+ nonce VARCHAR(255) UNIQUE NOT NULL, -- Cryptographically secure random value from /setup
+ scope TEXT NOT NULL, -- Requested verification scope (space-delimited attributes)
+
+ -- Swiyu Verifier data (populated after /authorize)
+ verification_url TEXT, -- URL/QR code for user to scan
+ request_id VARCHAR(255), -- Swiyu's verification request ID
+
+ -- Session status tracking
+ status VARCHAR(50) NOT NULL DEFAULT 'pending',
+
+ -- Timestamps
+ created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
+ authorized_at TIMESTAMPTZ, -- When /authorize was called
+ verified_at TIMESTAMPTZ, -- When Swiyu webhook confirmed verification
+ completed_at TIMESTAMPTZ, -- When Client retrieved the VC via /info
+ expires_at TIMESTAMPTZ NOT NULL, -- Session expiration (e.g., 15 minutes from creation)
+
+ CONSTRAINT verification_sessions_status_check
+ CHECK (status IN ('pending', 'authorized', 'verified', 'completed', 'expired', 'failed'))
+);
+
+CREATE INDEX idx_verification_sessions_nonce ON verification_sessions(nonce);
+CREATE INDEX idx_verification_sessions_request_id ON verification_sessions(request_id);
+CREATE INDEX idx_verification_sessions_status ON verification_sessions(status);
+CREATE INDEX idx_verification_sessions_expires_at ON verification_sessions(expires_at);
+
+COMMENT ON TABLE verification_sessions IS 'Tracks KYC verification sessions from setup to completion';
+COMMENT ON COLUMN verification_sessions.nonce IS 'Cryptographically secure random value used as OAuth2 authorization code';
+COMMENT ON COLUMN verification_sessions.request_id IS 'Swiyu Verifier request ID for tracking the OID4VP session';
+COMMENT ON COLUMN verification_sessions.status IS 'pending: created via /setup, authorized: /authorize called, verified: Swiyu webhook confirmed verification, completed: Client retrieved VC, expired: session timeout, failed: error occurred';
+
+-- ============================================================================
+-- Table: access_tokens
+-- Stores access tokens issued by /token endpoint
+-- ============================================================================
+CREATE TABLE access_tokens (
+ id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+ session_id UUID NOT NULL REFERENCES verification_sessions(id) ON DELETE CASCADE,
+ token VARCHAR(255) UNIQUE NOT NULL, -- The access_token value (cryptographically secure)
+ token_type VARCHAR(50) NOT NULL DEFAULT 'Bearer',
+ expires_at TIMESTAMPTZ NOT NULL, -- Token expiration (e.g., 1 hour from issuance)
+ created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
+ revoked BOOLEAN DEFAULT FALSE,
+ revoked_at TIMESTAMPTZ
+);
+
+CREATE INDEX idx_access_tokens_token ON access_tokens(token);
+CREATE INDEX idx_access_tokens_session_id ON access_tokens(session_id);
+CREATE INDEX idx_access_tokens_expires_at ON access_tokens(expires_at);
+CREATE INDEX idx_access_tokens_revoked ON access_tokens(revoked);
+
+COMMENT ON TABLE access_tokens IS 'OAuth2 access tokens for retrieving VCs from /info endpoint';
+COMMENT ON COLUMN access_tokens.token IS 'Bearer token value (256-bit random, base64-encoded)';
+
+-- ============================================================================
+-- Table: webhook_logs
+-- Audit log for incoming webhooks from Swiyu Verifier
+-- ============================================================================
+CREATE TABLE webhook_logs (
+ id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+ request_id VARCHAR(255), -- Swiyu's request_id from webhook path
+ session_id UUID REFERENCES verification_sessions(id) ON DELETE SET NULL,
+ payload JSONB NOT NULL, -- Full webhook payload
+ status_code INTEGER, -- HTTP status code of our response
+ processed BOOLEAN DEFAULT FALSE,
+ error_message TEXT,
+ received_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
+ processed_at TIMESTAMPTZ
+);
+
+CREATE INDEX idx_webhook_logs_request_id ON webhook_logs(request_id);
+CREATE INDEX idx_webhook_logs_session_id ON webhook_logs(session_id);
+CREATE INDEX idx_webhook_logs_processed ON webhook_logs(processed);
+CREATE INDEX idx_webhook_logs_received_at ON webhook_logs(received_at);
+
+COMMENT ON TABLE webhook_logs IS 'Audit log for incoming webhooks from Swiyu Verifier';
+COMMENT ON COLUMN webhook_logs.processed IS 'Whether webhook was successfully processed and session updated';
+
+-- ============================================================================
+-- Table: notification_logs
+-- Audit log for outgoing notifications to Client
+-- ============================================================================
+CREATE TABLE notification_logs (
+ id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
+ session_id UUID NOT NULL REFERENCES verification_sessions(id) ON DELETE CASCADE,
+ client_id UUID NOT NULL REFERENCES clients(id) ON DELETE CASCADE,
+ notification_url TEXT NOT NULL, -- Client endpoint called
+ payload JSONB NOT NULL, -- Notification body sent
+ status_code INTEGER, -- HTTP response code from Client
+ success BOOLEAN DEFAULT FALSE,
+ error_message TEXT,
+ created_at TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
+ completed_at TIMESTAMPTZ
+);
+
+CREATE INDEX idx_notification_logs_session_id ON notification_logs(session_id);
+CREATE INDEX idx_notification_logs_client_id ON notification_logs(client_id);
+CREATE INDEX idx_notification_logs_success ON notification_logs(success);
+CREATE INDEX idx_notification_logs_created_at ON notification_logs(created_at);
+
+COMMENT ON TABLE notification_logs IS 'Audit log for outgoing notifications to Client';
+COMMENT ON COLUMN notification_logs.success IS 'Whether notification was successfully delivered to Client';
+
+-- ============================================================================
+-- Trigger: Auto-update updated_at column
+-- ============================================================================
+CREATE OR REPLACE FUNCTION update_updated_at_column()
+RETURNS TRIGGER AS $$
+BEGIN
+ NEW.updated_at = CURRENT_TIMESTAMP;
+ RETURN NEW;
+END;
+$$ language 'plpgsql';
+
+CREATE TRIGGER update_clients_updated_at
+ BEFORE UPDATE ON clients
+ FOR EACH ROW
+ EXECUTE FUNCTION update_updated_at_column();
+
+COMMENT ON FUNCTION update_updated_at_column() IS 'Automatically updates updated_at timestamp on row modification';
+
+COMMIT;
diff --git a/oauth2_gateway/migrations/versioning.sql b/oauth2_gateway/migrations/versioning.sql
@@ -0,0 +1,294 @@
+-- LICENSE AND COPYRIGHT
+--
+-- Copyright (C) 2010 Hubert depesz Lubaczewski
+--
+-- This program is distributed under the (Revised) BSD License:
+-- L<http://www.opensource.org/licenses/bsd-license.php>
+--
+-- Redistribution and use in source and binary forms, with or without
+-- modification, are permitted provided that the following conditions
+-- are met:
+--
+-- * Redistributions of source code must retain the above copyright
+-- notice, this list of conditions and the following disclaimer.
+--
+-- * Redistributions in binary form must reproduce the above copyright
+-- notice, this list of conditions and the following disclaimer in the
+-- documentation and/or other materials provided with the distribution.
+--
+-- * Neither the name of Hubert depesz Lubaczewski's Organization
+-- nor the names of its contributors may be used to endorse or
+-- promote products derived from this software without specific
+-- prior written permission.
+--
+-- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+-- AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+-- IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+-- DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE
+-- FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+-- DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+-- SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+-- CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+-- OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+-- OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+--
+-- Code origin: https://gitlab.com/depesz/Versioning/blob/master/install.versioning.sql
+--
+--
+-- # NAME
+--
+-- **Versioning** - simplistic take on tracking and applying changes to databases.
+--
+-- # DESCRIPTION
+--
+-- This project strives to provide simple way to manage changes to
+-- database.
+--
+-- Instead of making changes on development server, then finding
+-- differences between production and development, deciding which ones
+-- should be installed on production, and finding a way to install them -
+-- you start with writing diffs themselves!
+--
+-- # INSTALLATION
+--
+-- To install versioning simply run install.versioning.sql in your database
+-- (all of them: production, stage, test, devel, ...).
+--
+-- # USAGE
+--
+-- In your files with patches to database, put whole logic in single
+-- transaction, and use \_v.\* functions - usually \_v.register_patch() at
+-- least to make sure everything is OK.
+--
+-- For example. Let's assume you have patch files:
+--
+-- ## 0001.sql:
+--
+-- ```
+-- create table users (id serial primary key, username text);
+-- ```
+--
+-- ## 0002.sql:
+--
+-- ```
+-- insert into users (username) values ('depesz');
+-- ```
+-- To change it to use versioning you would change the files, to this
+-- state:
+--
+-- 0000.sql:
+--
+-- ```
+-- BEGIN;
+-- select _v.register_patch('000-base', NULL, NULL);
+-- create table users (id serial primary key, username text);
+-- COMMIT;
+-- ```
+--
+-- ## 0002.sql:
+--
+-- ```
+-- BEGIN;
+-- select _v.register_patch('001-users', ARRAY['000-base'], NULL);
+-- insert into users (username) values ('depesz');
+-- COMMIT;
+-- ```
+--
+-- This will make sure that patch 001-users can only be applied after
+-- 000-base.
+--
+-- # AVAILABLE FUNCTIONS
+--
+-- ## \_v.register_patch( TEXT )
+--
+-- Registers named patch, or dies if it is already registered.
+--
+-- Returns integer which is id of patch in \_v.patches table - only if it
+-- succeeded.
+--
+-- ## \_v.register_patch( TEXT, TEXT[] )
+--
+-- Same as \_v.register_patch( TEXT ), but checks is all given patches (given as
+-- array in second argument) are already registered.
+--
+-- ## \_v.register_patch( TEXT, TEXT[], TEXT[] )
+--
+-- Same as \_v.register_patch( TEXT, TEXT[] ), but also checks if there are no conflicts with preexisting patches.
+--
+-- Third argument is array of names of patches that conflict with current one. So
+-- if any of them is installed - register_patch will error out.
+--
+-- ## \_v.unregister_patch( TEXT )
+--
+-- Removes information about given patch from the versioning data.
+--
+-- It doesn't remove objects that were created by this patch - just removes
+-- metainformation.
+--
+-- ## \_v.assert_user_is_superuser()
+--
+-- Make sure that current patch is being loaded by superuser.
+--
+-- If it's not - it will raise exception, and break transaction.
+--
+-- ## \_v.assert_user_is_not_superuser()
+--
+-- Make sure that current patch is not being loaded by superuser.
+--
+-- If it is - it will raise exception, and break transaction.
+--
+-- ## \_v.assert_user_is_one_of(TEXT, TEXT, ... )
+--
+-- Make sure that current patch is being loaded by one of listed users.
+--
+-- If ```current_user``` is not listed as one of arguments - function will raise
+-- exception and break the transaction.
+
+BEGIN;
+
+
+-- This file adds versioning support to database it will be loaded to.
+-- It requires that PL/pgSQL is already loaded - will raise exception otherwise.
+-- All versioning "stuff" (tables, functions) is in "_v" schema.
+
+-- All functions are defined as 'RETURNS SETOF INT4' to be able to make them to RETURN literally nothing (0 rows).
+-- >> RETURNS VOID<< IS similar, but it still outputs "empty line" in psql when calling
+CREATE SCHEMA IF NOT EXISTS _v;
+COMMENT ON SCHEMA _v IS 'Schema for versioning data and functionality.';
+
+CREATE TABLE IF NOT EXISTS _v.patches (
+ patch_name TEXT PRIMARY KEY,
+ applied_tsz TIMESTAMPTZ NOT NULL DEFAULT now(),
+ applied_by TEXT NOT NULL,
+ requires TEXT[],
+ conflicts TEXT[]
+);
+COMMENT ON TABLE _v.patches IS 'Contains information about what patches are currently applied on database.';
+COMMENT ON COLUMN _v.patches.patch_name IS 'Name of patch, has to be unique for every patch.';
+COMMENT ON COLUMN _v.patches.applied_tsz IS 'When the patch was applied.';
+COMMENT ON COLUMN _v.patches.applied_by IS 'Who applied this patch (PostgreSQL username)';
+COMMENT ON COLUMN _v.patches.requires IS 'List of patches that are required for given patch.';
+COMMENT ON COLUMN _v.patches.conflicts IS 'List of patches that conflict with given patch.';
+
+CREATE OR REPLACE FUNCTION _v.register_patch( IN in_patch_name TEXT, IN in_requirements TEXT[], in_conflicts TEXT[], OUT versioning INT4 ) RETURNS setof INT4 AS $$
+DECLARE
+ t_text TEXT;
+ t_text_a TEXT[];
+ i INT4;
+BEGIN
+ -- Thanks to this we know only one patch will be applied at a time
+ LOCK TABLE _v.patches IN EXCLUSIVE MODE;
+
+ SELECT patch_name INTO t_text FROM _v.patches WHERE patch_name = in_patch_name;
+ IF FOUND THEN
+ RAISE EXCEPTION 'Patch % is already applied!', in_patch_name;
+ END IF;
+
+ t_text_a := ARRAY( SELECT patch_name FROM _v.patches WHERE patch_name = any( in_conflicts ) );
+ IF array_upper( t_text_a, 1 ) IS NOT NULL THEN
+ RAISE EXCEPTION 'Versioning patches conflict. Conflicting patche(s) installed: %.', array_to_string( t_text_a, ', ' );
+ END IF;
+
+ IF array_upper( in_requirements, 1 ) IS NOT NULL THEN
+ t_text_a := '{}';
+ FOR i IN array_lower( in_requirements, 1 ) .. array_upper( in_requirements, 1 ) LOOP
+ SELECT patch_name INTO t_text FROM _v.patches WHERE patch_name = in_requirements[i];
+ IF NOT FOUND THEN
+ t_text_a := t_text_a || in_requirements[i];
+ END IF;
+ END LOOP;
+ IF array_upper( t_text_a, 1 ) IS NOT NULL THEN
+ RAISE EXCEPTION 'Missing prerequisite(s): %.', array_to_string( t_text_a, ', ' );
+ END IF;
+ END IF;
+
+ INSERT INTO _v.patches (patch_name, applied_tsz, applied_by, requires, conflicts ) VALUES ( in_patch_name, now(), current_user, coalesce( in_requirements, '{}' ), coalesce( in_conflicts, '{}' ) );
+ RETURN;
+END;
+$$ language plpgsql;
+COMMENT ON FUNCTION _v.register_patch( TEXT, TEXT[], TEXT[] ) IS 'Function to register patches in database. Raises exception if there are conflicts, prerequisites are not installed or the migration has already been installed.';
+
+CREATE OR REPLACE FUNCTION _v.register_patch( TEXT, TEXT[] ) RETURNS setof INT4 AS $$
+ SELECT _v.register_patch( $1, $2, NULL );
+$$ language sql;
+COMMENT ON FUNCTION _v.register_patch( TEXT, TEXT[] ) IS 'Wrapper to allow registration of patches without conflicts.';
+CREATE OR REPLACE FUNCTION _v.register_patch( TEXT ) RETURNS setof INT4 AS $$
+ SELECT _v.register_patch( $1, NULL, NULL );
+$$ language sql;
+COMMENT ON FUNCTION _v.register_patch( TEXT ) IS 'Wrapper to allow registration of patches without requirements and conflicts.';
+
+CREATE OR REPLACE FUNCTION _v.unregister_patch( IN in_patch_name TEXT, OUT versioning INT4 ) RETURNS setof INT4 AS $$
+DECLARE
+ i INT4;
+ t_text_a TEXT[];
+BEGIN
+ -- Thanks to this we know only one patch will be applied at a time
+ LOCK TABLE _v.patches IN EXCLUSIVE MODE;
+
+ t_text_a := ARRAY( SELECT patch_name FROM _v.patches WHERE in_patch_name = ANY( requires ) );
+ IF array_upper( t_text_a, 1 ) IS NOT NULL THEN
+ RAISE EXCEPTION 'Cannot uninstall %, as it is required by: %.', in_patch_name, array_to_string( t_text_a, ', ' );
+ END IF;
+
+ DELETE FROM _v.patches WHERE patch_name = in_patch_name;
+ GET DIAGNOSTICS i = ROW_COUNT;
+ IF i < 1 THEN
+ RAISE EXCEPTION 'Patch % is not installed, so it can''t be uninstalled!', in_patch_name;
+ END IF;
+
+ RETURN;
+END;
+$$ language plpgsql;
+COMMENT ON FUNCTION _v.unregister_patch( TEXT ) IS 'Function to unregister patches in database. Dies if the patch is not registered, or if unregistering it would break dependencies.';
+
+CREATE OR REPLACE FUNCTION _v.assert_patch_is_applied( IN in_patch_name TEXT ) RETURNS TEXT as $$
+DECLARE
+ t_text TEXT;
+BEGIN
+ SELECT patch_name INTO t_text FROM _v.patches WHERE patch_name = in_patch_name;
+ IF NOT FOUND THEN
+ RAISE EXCEPTION 'Patch % is not applied!', in_patch_name;
+ END IF;
+ RETURN format('Patch %s is applied.', in_patch_name);
+END;
+$$ language plpgsql;
+COMMENT ON FUNCTION _v.assert_patch_is_applied( TEXT ) IS 'Function that can be used to make sure that patch has been applied.';
+
+CREATE OR REPLACE FUNCTION _v.assert_user_is_superuser() RETURNS TEXT as $$
+DECLARE
+ v_super bool;
+BEGIN
+ SELECT usesuper INTO v_super FROM pg_user WHERE usename = current_user;
+ IF v_super THEN
+ RETURN 'assert_user_is_superuser: OK';
+ END IF;
+ RAISE EXCEPTION 'Current user is not superuser - cannot continue.';
+END;
+$$ language plpgsql;
+COMMENT ON FUNCTION _v.assert_user_is_superuser() IS 'Function that can be used to make sure that patch is being applied using superuser account.';
+
+CREATE OR REPLACE FUNCTION _v.assert_user_is_not_superuser() RETURNS TEXT as $$
+DECLARE
+ v_super bool;
+BEGIN
+ SELECT usesuper INTO v_super FROM pg_user WHERE usename = current_user;
+ IF v_super THEN
+ RAISE EXCEPTION 'Current user is superuser - cannot continue.';
+ END IF;
+ RETURN 'assert_user_is_not_superuser: OK';
+END;
+$$ language plpgsql;
+COMMENT ON FUNCTION _v.assert_user_is_not_superuser() IS 'Function that can be used to make sure that patch is being applied using normal (not superuser) account.';
+
+CREATE OR REPLACE FUNCTION _v.assert_user_is_one_of(VARIADIC p_acceptable_users TEXT[] ) RETURNS TEXT as $$
+DECLARE
+BEGIN
+ IF current_user = any( p_acceptable_users ) THEN
+ RETURN 'assert_user_is_one_of: OK';
+ END IF;
+ RAISE EXCEPTION 'User is not one of: % - cannot continue.', p_acceptable_users;
+END;
+$$ language plpgsql;
+COMMENT ON FUNCTION _v.assert_user_is_one_of(TEXT[]) IS 'Function that can be used to make sure that patch is being applied by one of defined users.';
+
+COMMIT;
diff --git a/oauth2_gateway/scripts/setup_test_db.sh b/oauth2_gateway/scripts/setup_test_db.sh
@@ -0,0 +1,88 @@
+#!/bin/bash
+
+DB_PORT=5432
+DB_NAME=oauth2gw
+DB_USER=oauth2gw
+DB_PASS=password
+DB_ADMIN=${DB_ADMIN:-}
+
+echo "Setting up PostgreSQL database for OAuth2 Gateway test environment..."
+echo
+echo "WARNING: Test Environment!"
+echo
+
+if ! command -v psql &> /dev/null
+then
+ echo "psql could not be found, please install PostgreSQL first."
+ exit 1
+fi
+
+if ! pg_isready -h localhost -p "$DB_PORT" >/dev/null 2>&1; then
+ echo "PostgreSQL is not running."
+ exit 1
+fi
+
+PSQL_CMD="psql -h localhost -p $DB_PORT"
+if [ -n "$DB_ADMIN" ]; then
+ PSQL_CMD="$PSQL_CMD -U $DB_ADMIN"
+fi
+
+# Create user if not exists
+echo "Creating database user..."
+$PSQL_CMD -d postgres -tc "SELECT 1 FROM pg_roles WHERE rolname='$DB_USER'" | grep -q 1 || \
+$PSQL_CMD -d postgres -c "CREATE USER $DB_USER WITH PASSWORD '$DB_PASS';"
+
+# Create database if not exists
+echo "Creating database..."
+$PSQL_CMD -d postgres -tc "SELECT 1 FROM pg_database WHERE datname='$DB_NAME'" | grep -q 1 || \
+$PSQL_CMD -d postgres -c "CREATE DATABASE $DB_NAME OWNER $DB_USER;"
+
+# Grant privileges
+echo "Granting privileges..."
+$PSQL_CMD -d "$DB_NAME" -c "GRANT ALL PRIVILEGES ON DATABASE $DB_NAME TO $DB_USER;"
+
+# Set connection URL
+DB_URL="postgresql://${DB_USER}:${DB_PASS}@localhost:${DB_PORT}/${DB_NAME}"
+
+# Install versioning system (idempotent - safe to run multiple times)
+echo "Installing depesz versioning system..."
+MIGRATIONS_DIR="$(dirname "$0")/../migrations"
+$PSQL_CMD -d "$DB_NAME" -f "$MIGRATIONS_DIR/versioning.sql"
+
+# Apply patches (versioning.sql handles duplicate detection)
+echo "Applying database patches..."
+for patch_file in "$MIGRATIONS_DIR"/oauth2gw-*.sql; do
+ if [ -f "$patch_file" ]; then
+ patch_name=$(basename "$patch_file" .sql)
+ echo "Applying patch: $patch_name"
+ $PSQL_CMD -d "$DB_NAME" -f "$patch_file" 2>&1 | grep -v "Patch .* is already applied" || true
+ fi
+done
+
+# Seed test data
+echo "Seeding test data..."
+$PSQL_CMD -d "$DB_NAME" <<EOF
+INSERT INTO oauth2gw.clients (client_id, client_secret, notification_url, verifier_base_url, verifier_management_api_path)
+VALUES (
+ 'test-exchange-001',
+ 'test-secret-123',
+ 'http://localhost:9000/kyc/webhook',
+ 'http://localhost:8080',
+ '/management/api/verifications'
+)
+ON CONFLICT (client_id) DO NOTHING;
+EOF
+
+echo
+echo "Setup completed."
+echo
+echo "Database: $DB_NAME"
+echo "User: $DB_USER"
+echo "Host: localhost"
+echo "Port: $DB_PORT"
+echo
+echo "Connection string:"
+echo " $DB_URL"
+echo
+echo "Set TEST_DATABASE_URL environment variable:"
+echo " export TEST_DATABASE_URL=\"$DB_URL\""
diff --git a/oauth2_gateway/src/db/clients.rs b/oauth2_gateway/src/db/clients.rs
@@ -0,0 +1,191 @@
+// Database operations for clients table
+
+use sqlx::PgPool;
+use anyhow::Result;
+use uuid::Uuid;
+use chrono::{DateTime, Utc};
+
+/// Client registration record
+#[derive(Debug, Clone, sqlx::FromRow)]
+pub struct Client {
+ pub id: Uuid,
+ pub client_id: String,
+ pub client_secret: String,
+ pub notification_url: String,
+ pub verifier_base_url: String,
+ pub verifier_management_api_path: String,
+ pub created_at: DateTime<Utc>,
+ pub updated_at: DateTime<Utc>,
+}
+
+/// Register a new client
+pub async fn register_client(
+ pool: &PgPool,
+ client_id: &str,
+ client_secret: &str,
+ notification_url: &str,
+ verifier_base_url: &str,
+ verifier_management_api_path: Option<&str>,
+) -> Result<Client> {
+ let api_path = verifier_management_api_path
+ .unwrap_or("/management/api/verifications");
+
+ let client = sqlx::query_as::<_, Client>(
+ r#"
+ INSERT INTO oauth2gw.clients
+ (client_id, client_secret, notification_url, verifier_base_url, verifier_management_api_path)
+ VALUES ($1, $2, $3, $4, $5)
+ RETURNING id, client_id, client_secret, notification_url, verifier_base_url,
+ verifier_management_api_path, created_at, updated_at
+ "#
+ )
+ .bind(client_id)
+ .bind(client_secret)
+ .bind(notification_url)
+ .bind(verifier_base_url)
+ .bind(api_path)
+ .fetch_one(pool)
+ .await?;
+
+ Ok(client)
+}
+
+/// Lookup client by client_id
+pub async fn get_client_by_id(
+ pool: &PgPool,
+ client_id: &str
+) -> Result<Option<Client>> {
+ let client = sqlx::query_as::<_, Client>(
+ r#"
+ SELECT id, client_id, client_secret, notification_url, verifier_base_url,
+ verifier_management_api_path, created_at, updated_at
+ FROM oauth2gw.clients
+ WHERE client_id = $1
+ "#
+ )
+ .bind(client_id)
+ .fetch_optional(pool)
+ .await?;
+
+ Ok(client)
+}
+
+/// Lookup client by UUID
+pub async fn get_client_by_uuid(
+ pool: &PgPool,
+ id: Uuid
+) -> Result<Option<Client>> {
+ let client = sqlx::query_as::<_, Client>(
+ r#"
+ SELECT id, client_id, client_secret, notification_url, verifier_base_url,
+ verifier_management_api_path, created_at, updated_at
+ FROM oauth2gw.clients
+ WHERE id = $1
+ "#
+ )
+ .bind(id)
+ .fetch_optional(pool)
+ .await?;
+
+ Ok(client)
+}
+
+/// Authenticate a client by validating client_secret
+///
+/// Returns the client if authentication succeeds, None otherwise
+pub async fn authenticate_client(
+ pool: &PgPool,
+ client_id: &str,
+ client_secret: &str
+) -> Result<Option<Client>> {
+ let client = sqlx::query_as::<_, Client>(
+ r#"
+ SELECT id, client_id, client_secret, notification_url, verifier_base_url,
+ verifier_management_api_path, created_at, updated_at
+ FROM oauth2gw.clients
+ WHERE client_id = $1 AND client_secret = $2
+ "#
+ )
+ .bind(client_id)
+ .bind(client_secret)
+ .fetch_optional(pool)
+ .await?;
+
+ Ok(client)
+}
+
+/// Update client configuration
+pub async fn update_client(
+ pool: &PgPool,
+ id: Uuid,
+ notification_url: Option<&str>,
+ verifier_base_url: Option<&str>,
+ verifier_management_api_path: Option<&str>,
+) -> Result<Client> {
+ // Get current client to use for fields that aren't being updated
+ let current = get_client_by_uuid(pool, id).await?
+ .ok_or_else(|| anyhow::anyhow!("Client not found"))?;
+
+    let new_notification_url = notification_url.unwrap_or(&current.notification_url);
+    let new_verifier_base_url = verifier_base_url.unwrap_or(&current.verifier_base_url);
+    let new_verifier_api_path = verifier_management_api_path
+        .unwrap_or(&current.verifier_management_api_path);
+
+ let client = sqlx::query_as::<_, Client>(
+ r#"
+ UPDATE oauth2gw.clients
+ SET
+ notification_url = $1,
+ verifier_base_url = $2,
+ verifier_management_api_path = $3,
+ updated_at = CURRENT_TIMESTAMP
+ WHERE id = $4
+ RETURNING id, client_id, client_secret, notification_url, verifier_base_url,
+ verifier_management_api_path, created_at, updated_at
+ "#
+ )
+ .bind(new_notification_url)
+ .bind(new_verifier_base_url)
+ .bind(new_verifier_api_path)
+ .bind(id)
+ .fetch_one(pool)
+ .await?;
+
+ Ok(client)
+}
+
+/// Delete a client (and cascade delete all associated sessions)
+///
+/// WARNING: Deletes all associated data as well! (e.g., verifications)!
+pub async fn delete_client(
+ pool: &PgPool,
+ id: Uuid
+) -> Result<bool> {
+ let result = sqlx::query(
+ r#"
+ DELETE FROM oauth2gw.clients
+ WHERE id = $1
+ "#
+ )
+ .bind(id)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+/// List all registered clients
+pub async fn list_clients(pool: &PgPool) -> Result<Vec<Client>> {
+ let clients = sqlx::query_as::<_, Client>(
+ r#"
+ SELECT id, client_id, client_secret, notification_url, verifier_base_url,
+ verifier_management_api_path, created_at, updated_at
+ FROM oauth2gw.clients
+ ORDER BY created_at DESC
+ "#
+ )
+ .fetch_all(pool)
+ .await?;
+
+ Ok(clients)
+}
+\ No newline at end of file
diff --git a/oauth2_gateway/src/db/logs.rs b/oauth2_gateway/src/db/logs.rs
@@ -0,0 +1,311 @@
+// Database operations for webhook_logs and notification_logs tables
+
+use sqlx::PgPool;
+use anyhow::Result;
+use uuid::Uuid;
+use chrono::{DateTime, Utc, Duration};
+use serde_json::Value as JsonValue;
+
/// Webhook log record
///
/// One row of `oauth2gw.webhook_logs`: an inbound webhook captured for
/// auditing and retry purposes.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct WebhookLog {
    /// Primary key of the log row.
    pub id: Uuid,
    /// Request identifier carried by the webhook, if any
    /// (presumably the Swiyu request_id — confirm against the webhook handler).
    pub request_id: Option<String>,
    /// Verification session this webhook was matched to, if one was found.
    pub session_id: Option<Uuid>,
    /// Raw JSON payload as received.
    pub payload: JsonValue,
    /// Status code recorded when the webhook was processed, if any.
    pub status_code: Option<i32>,
    /// Whether the webhook has been successfully processed.
    pub processed: bool,
    /// Error detail recorded when processing failed.
    pub error_message: Option<String>,
    /// When the webhook was received.
    pub received_at: DateTime<Utc>,
    /// When the last processing attempt finished; None if never attempted.
    pub processed_at: Option<DateTime<Utc>>,
}
+
/// Notification log record
///
/// One row of `oauth2gw.notification_logs`: an outbound notification to a
/// client, tracked from send attempt to completion.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct NotificationLog {
    /// Primary key of the log row.
    pub id: Uuid,
    /// Verification session this notification belongs to.
    pub session_id: Uuid,
    /// Client (internal UUID) the notification was sent to.
    pub client_id: Uuid,
    /// URL the notification was delivered to.
    pub notification_url: String,
    /// JSON payload that was sent.
    pub payload: JsonValue,
    /// HTTP status code of the delivery attempt, if one was recorded.
    pub status_code: Option<i32>,
    /// Whether delivery succeeded.
    pub success: bool,
    /// Error detail recorded when delivery failed.
    pub error_message: Option<String>,
    /// When the notification row was created (send initiated).
    pub created_at: DateTime<Utc>,
    /// When the delivery attempt completed (success or failure).
    pub completed_at: Option<DateTime<Utc>>,
}
+
+// ============================================================================
+// Webhook Logs (Incoming from Swiyu Verifier)
+// ============================================================================
+
+/// Log an incoming webhook from Swiyu Verifier
+pub async fn log_webhook_received(
+ pool: &PgPool,
+ request_id: Option<&str>,
+ session_id: Option<Uuid>,
+ payload: &JsonValue,
+) -> Result<Uuid> {
+ let log_id = sqlx::query_scalar::<_, Uuid>(
+ r#"
+ INSERT INTO oauth2gw.webhook_logs (request_id, session_id, payload, processed)
+ VALUES ($1, $2, $3, FALSE)
+ RETURNING id
+ "#
+ )
+ .bind(request_id)
+ .bind(session_id)
+ .bind(payload)
+ .fetch_one(pool)
+ .await?;
+
+ Ok(log_id)
+}
+
+/// Mark webhook as successfully processed
+pub async fn mark_webhook_processed(
+ pool: &PgPool,
+ log_id: Uuid,
+ status_code: i32,
+) -> Result<()> {
+ sqlx::query(
+ r#"
+ UPDATE oauth2gw.webhook_logs
+ SET
+ processed = TRUE,
+ status_code = $1,
+ processed_at = CURRENT_TIMESTAMP
+ WHERE id = $2
+ "#
+ )
+ .bind(status_code)
+ .bind(log_id)
+ .execute(pool)
+ .await?;
+
+ Ok(())
+}
+
+/// Mark webhook as failed with error
+pub async fn mark_webhook_failed(
+ pool: &PgPool,
+ log_id: Uuid,
+ status_code: i32,
+ error_message: &str,
+) -> Result<()> {
+ sqlx::query(
+ r#"
+ UPDATE oauth2gw.webhook_logs
+ SET
+ processed = FALSE,
+ status_code = $1,
+ error_message = $2,
+ processed_at = CURRENT_TIMESTAMP
+ WHERE id = $3
+ "#
+ )
+ .bind(status_code)
+ .bind(error_message)
+ .bind(log_id)
+ .execute(pool)
+ .await?;
+
+ Ok(())
+}
+
+/// Get webhook logs for a session
+pub async fn get_webhook_logs_for_session(
+ pool: &PgPool,
+ session_id: Uuid,
+) -> Result<Vec<WebhookLog>> {
+ let logs = sqlx::query_as::<_, WebhookLog>(
+ r#"
+ SELECT id, request_id, session_id, payload, status_code, processed,
+ error_message, received_at, processed_at
+ FROM oauth2gw.webhook_logs
+ WHERE session_id = $1
+ ORDER BY received_at DESC
+ "#
+ )
+ .bind(session_id)
+ .fetch_all(pool)
+ .await?;
+
+ Ok(logs)
+}
+
+/// Get unprocessed webhooks (for retry logic)
+pub async fn get_unprocessed_webhooks(pool: &PgPool, limit: i64) -> Result<Vec<WebhookLog>> {
+ let logs = sqlx::query_as::<_, WebhookLog>(
+ r#"
+ SELECT id, request_id, session_id, payload, status_code, processed,
+ error_message, received_at, processed_at
+ FROM oauth2gw.webhook_logs
+ WHERE processed = FALSE
+ AND received_at > CURRENT_TIMESTAMP - INTERVAL '1 hour'
+ ORDER BY received_at ASC
+ LIMIT $1
+ "#
+ )
+ .bind(limit)
+ .fetch_all(pool)
+ .await?;
+
+ Ok(logs)
+}
+
+/// Delete old webhook logs (garbage collection)
+pub async fn delete_old_webhook_logs(pool: &PgPool, older_than_days: i64) -> Result<u64> {
+ let cutoff = Utc::now() - Duration::days(older_than_days);
+
+ let result = sqlx::query(
+ r#"
+ DELETE FROM oauth2gw.webhook_logs
+ WHERE received_at < $1
+ "#
+ )
+ .bind(cutoff)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected())
+}
+
+// ============================================================================
+// Notification Logs (Outgoing to Client)
+// ============================================================================
+
+/// Log an outgoing notification to Client
+pub async fn log_notification_sent(
+ pool: &PgPool,
+ session_id: Uuid,
+ client_id: Uuid,
+ notification_url: &str,
+ payload: &JsonValue,
+) -> Result<Uuid> {
+ let log_id = sqlx::query_scalar::<_, Uuid>(
+ r#"
+ INSERT INTO oauth2gw.notification_logs
+ (session_id, client_id, notification_url, payload, success)
+ VALUES ($1, $2, $3, $4, FALSE)
+ RETURNING id
+ "#
+ )
+ .bind(session_id)
+ .bind(client_id)
+ .bind(notification_url)
+ .bind(payload)
+ .fetch_one(pool)
+ .await?;
+
+ Ok(log_id)
+}
+
+/// Mark notification as successfully delivered
+pub async fn mark_notification_success(
+ pool: &PgPool,
+ log_id: Uuid,
+ status_code: i32,
+) -> Result<()> {
+ sqlx::query(
+ r#"
+ UPDATE oauth2gw.notification_logs
+ SET
+ success = TRUE,
+ status_code = $1,
+ completed_at = CURRENT_TIMESTAMP
+ WHERE id = $2
+ "#
+ )
+ .bind(status_code)
+ .bind(log_id)
+ .execute(pool)
+ .await?;
+
+ Ok(())
+}
+
+/// Mark notification as failed
+pub async fn mark_notification_failed(
+ pool: &PgPool,
+ log_id: Uuid,
+ status_code: Option<i32>,
+ error_message: &str,
+) -> Result<()> {
+ sqlx::query(
+ r#"
+ UPDATE oauth2gw.notification_logs
+ SET
+ success = FALSE,
+ status_code = $1,
+ error_message = $2,
+ completed_at = CURRENT_TIMESTAMP
+ WHERE id = $3
+ "#
+ )
+ .bind(status_code)
+ .bind(error_message)
+ .bind(log_id)
+ .execute(pool)
+ .await?;
+
+ Ok(())
+}
+
+/// Get notification logs for a session
+pub async fn get_notification_logs_for_session(
+ pool: &PgPool,
+ session_id: Uuid,
+) -> Result<Vec<NotificationLog>> {
+ let logs = sqlx::query_as::<_, NotificationLog>(
+ r#"
+ SELECT id, session_id, client_id, notification_url, payload, status_code,
+ success, error_message, created_at, completed_at
+ FROM oauth2gw.notification_logs
+ WHERE session_id = $1
+ ORDER BY created_at DESC
+ "#
+ )
+ .bind(session_id)
+ .fetch_all(pool)
+ .await?;
+
+ Ok(logs)
+}
+
+/// Get failed notifications for retry
+pub async fn get_failed_notifications(pool: &PgPool, limit: i64) -> Result<Vec<NotificationLog>> {
+ let logs = sqlx::query_as::<_, NotificationLog>(
+ r#"
+ SELECT id, session_id, client_id, notification_url, payload, status_code,
+ success, error_message, created_at, completed_at
+ FROM oauth2gw.notification_logs
+ WHERE success = FALSE
+ AND created_at > CURRENT_TIMESTAMP - INTERVAL '1 hour'
+ ORDER BY created_at ASC
+ LIMIT $1
+ "#
+ )
+ .bind(limit)
+ .fetch_all(pool)
+ .await?;
+
+ Ok(logs)
+}
+
+/// Delete old notification logs (garbage collection)
+pub async fn delete_old_notification_logs(pool: &PgPool, older_than_days: i64) -> Result<u64> {
+ let cutoff = Utc::now() - Duration::days(older_than_days);
+
+ let result = sqlx::query(
+ r#"
+ DELETE FROM oauth2gw.notification_logs
+ WHERE created_at < $1
+ "#
+ )
+ .bind(cutoff)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected())
+}
+\ No newline at end of file
diff --git a/oauth2_gateway/src/db/mod.rs b/oauth2_gateway/src/db/mod.rs
@@ -0,0 +1,32 @@
+// Database module for OAuth2 Gateway
+// Provides database connection pooling and business logic operations
+
+use sqlx::{PgPool, postgres::PgPoolOptions};
+use anyhow::{Result, Context};
+
+pub mod sessions;
+pub mod tokens;
+pub mod clients;
+pub mod logs;
+
+/// Create a PostgreSQL connection pool
+///
+/// # Arguments
+/// * `database_url` - PostgreSQL connection string (e.g., "postgresql://user:pass@localhost/dbname")
+///
+/// # Returns
+/// Connection pool ready for use
+///
+/// # Notes
+/// Assumes the database schema is already set up via migrations.
+/// Run scripts/setup_test_db.sh to initialize the database.
+pub async fn create_pool(database_url: &str) -> Result<PgPool> {
+ let pool = PgPoolOptions::new()
+ .max_connections(10)
+ .connect(database_url)
+ .await
+ .context("Failed to connect to PostgreSQL")?;
+
+ tracing::info!("Database connection pool created");
+ Ok(pool)
+}
diff --git a/oauth2_gateway/src/db/sessions.rs b/oauth2_gateway/src/db/sessions.rs
@@ -0,0 +1,258 @@
+// Database operations for verification_sessions table
+
+use sqlx::PgPool;
+use anyhow::Result;
+use uuid::Uuid;
+use chrono::{DateTime, Utc, Duration};
+
/// Status of a verification session
///
/// Persisted as lowercase text in `oauth2gw.verification_sessions.status`.
#[derive(Debug, Clone, sqlx::Type, serde::Serialize, serde::Deserialize, PartialEq)]
#[sqlx(type_name = "text")]
pub enum SessionStatus {
    /// Created via /setup; the verifier has not been contacted yet.
    #[sqlx(rename = "pending")]
    Pending,
    /// /authorize succeeded; verification_url and request_id are set.
    #[sqlx(rename = "authorized")]
    Authorized,
    /// The webhook handler reported a successful verification.
    #[sqlx(rename = "verified")]
    Verified,
    /// Terminal: /info returned the VC to the client.
    #[sqlx(rename = "completed")]
    Completed,
    /// Terminal: the session passed expires_at without completing.
    #[sqlx(rename = "expired")]
    Expired,
    /// Terminal: the session was marked failed.
    #[sqlx(rename = "failed")]
    Failed,
}
+
/// Verification session record
///
/// One row of `oauth2gw.verification_sessions`, tracking a verification from
/// /setup through completion or expiry.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct VerificationSession {
    /// Primary key.
    pub id: Uuid,
    /// Internal UUID of the owning client.
    pub client_id: Uuid,
    /// Nonce issued at /setup; used to look the session up later.
    pub nonce: String,
    /// Requested OAuth2 scope.
    pub scope: String,
    /// Verification URL returned by the verifier; set once authorized.
    pub verification_url: Option<String>,
    /// Verifier-side request id (Swiyu); set once authorized.
    pub request_id: Option<String>,
    /// Current lifecycle state.
    pub status: SessionStatus,
    /// When the session was created.
    pub created_at: DateTime<Utc>,
    /// When /authorize succeeded, if it has.
    pub authorized_at: Option<DateTime<Utc>>,
    /// When the webhook reported verification, if it has.
    pub verified_at: Option<DateTime<Utc>>,
    /// When /info handed out the VC, if it has.
    pub completed_at: Option<DateTime<Utc>>,
    /// Hard expiry; past this the session is garbage-collected.
    pub expires_at: DateTime<Utc>,
}
+
+/// Create a new verification session
+///
+/// This is called by the /setup endpoint
+pub async fn create_session(
+ pool: &PgPool,
+ client_id: Uuid,
+ nonce: &str,
+ scope: &str,
+ expires_in_minutes: i64,
+) -> Result<VerificationSession> {
+ let expires_at = Utc::now() + Duration::minutes(expires_in_minutes);
+
+ let session = sqlx::query_as::<_, VerificationSession>(
+ r#"
+ INSERT INTO oauth2gw.verification_sessions (client_id, nonce, scope, expires_at, status)
+ VALUES ($1, $2, $3, $4, 'pending')
+ RETURNING
+ id, client_id, nonce, scope, verification_url, request_id, status,
+ created_at, authorized_at, verified_at, completed_at, expires_at
+ "#
+ )
+ .bind(client_id)
+ .bind(nonce)
+ .bind(scope)
+ .bind(expires_at)
+ .fetch_one(pool)
+ .await?;
+
+ Ok(session)
+}
+
+/// Lookup session by nonce
+///
+/// This is used by /authorize and /token endpoints
+pub async fn get_session_by_nonce(
+ pool: &PgPool,
+ nonce: &str
+) -> Result<Option<VerificationSession>> {
+ let session = sqlx::query_as::<_, VerificationSession>(
+ r#"
+ SELECT
+ id, client_id, nonce, scope, verification_url, request_id, status,
+ created_at, authorized_at, verified_at, completed_at, expires_at
+ FROM oauth2gw.verification_sessions
+ WHERE nonce = $1
+ "#
+ )
+ .bind(nonce)
+ .fetch_optional(pool)
+ .await?;
+
+ Ok(session)
+}
+
+/// Lookup session by Swiyu request_id
+///
+/// This is used by the webhook handler
+pub async fn get_session_by_request_id(
+ pool: &PgPool,
+ request_id: &str
+) -> Result<Option<VerificationSession>> {
+ let session = sqlx::query_as::<_, VerificationSession>(
+ r#"
+ SELECT
+ id, client_id, nonce, scope, verification_url, request_id, status,
+ created_at, authorized_at, verified_at, completed_at, expires_at
+ FROM oauth2gw.verification_sessions
+ WHERE request_id = $1
+ "#
+ )
+ .bind(request_id)
+ .fetch_optional(pool)
+ .await?;
+
+ Ok(session)
+}
+
+/// Update session with Swiyu verification data after calling /authorize
+pub async fn update_session_authorized(
+ pool: &PgPool,
+ session_id: Uuid,
+ verification_url: &str,
+ request_id: &str,
+) -> Result<()> {
+ sqlx::query(
+ r#"
+ UPDATE oauth2gw.verification_sessions
+ SET
+ verification_url = $1,
+ request_id = $2,
+ status = 'authorized',
+ authorized_at = CURRENT_TIMESTAMP
+ WHERE id = $3
+ "#
+ )
+ .bind(verification_url)
+ .bind(request_id)
+ .bind(session_id)
+ .execute(pool)
+ .await?;
+
+ Ok(())
+}
+
+/// Mark session as verified (called by webhook handler)
+pub async fn mark_session_verified(
+ pool: &PgPool,
+ session_id: Uuid,
+) -> Result<()> {
+ sqlx::query(
+ r#"
+ UPDATE oauth2gw.verification_sessions
+ SET
+ status = 'verified',
+ verified_at = CURRENT_TIMESTAMP
+ WHERE id = $1
+ "#
+ )
+ .bind(session_id)
+ .execute(pool)
+ .await?;
+
+ Ok(())
+}
+
+/// Mark session as completed (called after /info returns VC)
+pub async fn mark_session_completed(
+ pool: &PgPool,
+ session_id: Uuid,
+) -> Result<()> {
+ sqlx::query(
+ r#"
+ UPDATE oauth2gw.verification_sessions
+ SET
+ status = 'completed',
+ completed_at = CURRENT_TIMESTAMP
+ WHERE id = $1
+ "#
+ )
+ .bind(session_id)
+ .execute(pool)
+ .await?;
+
+ Ok(())
+}
+
+/// Mark session as failed with error
+pub async fn mark_session_failed(
+ pool: &PgPool,
+ session_id: Uuid,
+) -> Result<()> {
+ sqlx::query(
+ r#"
+ UPDATE oauth2gw.verification_sessions
+ SET status = 'failed'
+ WHERE id = $1
+ "#
+ )
+ .bind(session_id)
+ .execute(pool)
+ .await?;
+
+ Ok(())
+}
+
+/// Get all expired sessions for garbage collection
+pub async fn get_expired_sessions(pool: &PgPool) -> Result<Vec<VerificationSession>> {
+ let sessions = sqlx::query_as::<_, VerificationSession>(
+ r#"
+ SELECT
+ id, client_id, nonce, scope, verification_url, request_id, status,
+ created_at, authorized_at, verified_at, completed_at, expires_at
+ FROM oauth2gw.verification_sessions
+ WHERE expires_at < CURRENT_TIMESTAMP
+ AND status NOT IN ('completed', 'expired', 'failed')
+ "#
+ )
+ .fetch_all(pool)
+ .await?;
+
+ Ok(sessions)
+}
+
+/// Mark expired sessions as expired (garbage collection)
+pub async fn mark_expired_sessions(pool: &PgPool) -> Result<u64> {
+ let result = sqlx::query(
+ r#"
+ UPDATE oauth2gw.verification_sessions
+ SET status = 'expired'
+ WHERE expires_at < CURRENT_TIMESTAMP
+ AND status NOT IN ('completed', 'expired', 'failed')
+ "#
+ )
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected())
+}
+
+/// Delete old completed/expired/failed sessions (garbage collection)
+pub async fn delete_old_sessions(pool: &PgPool, older_than_days: i64) -> Result<u64> {
+ let cutoff = Utc::now() - Duration::days(older_than_days);
+
+ let result = sqlx::query(
+ r#"
+ DELETE FROM oauth2gw.verification_sessions
+ WHERE created_at < $1
+ AND status IN ('completed', 'expired', 'failed')
+ "#
+ )
+ .bind(cutoff)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected())
+}
diff --git a/oauth2_gateway/src/db/tokens.rs b/oauth2_gateway/src/db/tokens.rs
@@ -0,0 +1,148 @@
+// Database operations for access_tokens table
+
+use sqlx::PgPool;
+use anyhow::Result;
+use uuid::Uuid;
+use chrono::{DateTime, Utc, Duration};
+
/// Access token record
///
/// One row of `oauth2gw.access_tokens`: a bearer token bound to a
/// verification session.
#[derive(Debug, Clone, sqlx::FromRow)]
pub struct AccessToken {
    /// Primary key.
    pub id: Uuid,
    /// Verification session this token belongs to.
    pub session_id: Uuid,
    /// Opaque token value presented by the client.
    pub token: String,
    /// Token type; rows created here always use 'Bearer'.
    pub token_type: String,
    /// Expiry time; tokens past this fail verification.
    pub expires_at: DateTime<Utc>,
    /// When the token was issued.
    pub created_at: DateTime<Utc>,
    /// Whether the token has been explicitly revoked.
    pub revoked: bool,
    /// When the token was revoked, if it was.
    pub revoked_at: Option<DateTime<Utc>>,
}
+
+/// Create a new access token for a session
+///
+/// Called by the /token endpoint
+pub async fn create_access_token(
+ pool: &PgPool,
+ session_id: Uuid,
+ token: &str,
+ expires_in_seconds: i64,
+) -> Result<AccessToken> {
+ let expires_at = Utc::now() + Duration::seconds(expires_in_seconds);
+
+ let access_token = sqlx::query_as::<_, AccessToken>(
+ r#"
+ INSERT INTO oauth2gw.access_tokens (session_id, token, token_type, expires_at)
+ VALUES ($1, $2, 'Bearer', $3)
+ RETURNING id, session_id, token, token_type, expires_at, created_at, revoked, revoked_at
+ "#
+ )
+ .bind(session_id)
+ .bind(token)
+ .bind(expires_at)
+ .fetch_one(pool)
+ .await?;
+
+ Ok(access_token)
+}
+
+/// Lookup and validate an access token
+///
+/// Returns None if:
+/// - Token doesn't exist
+/// - Token is expired
+/// - Token is revoked
+///
+/// Called by the /info endpoint
+pub async fn verify_access_token(
+ pool: &PgPool,
+ token: &str
+) -> Result<Option<AccessToken>> {
+ let access_token = sqlx::query_as::<_, AccessToken>(
+ r#"
+ SELECT id, session_id, token, token_type, expires_at, created_at, revoked, revoked_at
+ FROM oauth2gw.access_tokens
+ WHERE token = $1
+ AND expires_at > CURRENT_TIMESTAMP
+ AND revoked = FALSE
+ "#
+ )
+ .bind(token)
+ .fetch_optional(pool)
+ .await?;
+
+ Ok(access_token)
+}
+
+/// Get access token by session ID
+pub async fn get_token_by_session(
+ pool: &PgPool,
+ session_id: Uuid
+) -> Result<Option<AccessToken>> {
+ let access_token = sqlx::query_as::<_, AccessToken>(
+ r#"
+ SELECT id, session_id, token, token_type, expires_at, created_at, revoked, revoked_at
+ FROM oauth2gw.access_tokens
+ WHERE session_id = $1
+ ORDER BY created_at DESC
+ LIMIT 1
+ "#
+ )
+ .bind(session_id)
+ .fetch_optional(pool)
+ .await?;
+
+ Ok(access_token)
+}
+
+/// Revoke an access token
+pub async fn revoke_token(
+ pool: &PgPool,
+ token: &str
+) -> Result<bool> {
+ let result = sqlx::query(
+ r#"
+ UPDATE oauth2gw.access_tokens
+ SET
+ revoked = TRUE,
+ revoked_at = CURRENT_TIMESTAMP
+ WHERE token = $1 AND revoked = FALSE
+ "#
+ )
+ .bind(token)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected() > 0)
+}
+
+/// Delete expired access tokens (garbage collection)
+pub async fn delete_expired_tokens(pool: &PgPool) -> Result<u64> {
+ let result = sqlx::query(
+ r#"
+ DELETE FROM oauth2gw.access_tokens
+ WHERE expires_at < CURRENT_TIMESTAMP
+ "#
+ )
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected())
+}
+
+/// Delete old revoked tokens (garbage collection)
+pub async fn delete_old_revoked_tokens(pool: &PgPool, older_than_days: i64) -> Result<u64> {
+ let cutoff = Utc::now() - Duration::days(older_than_days);
+
+ let result = sqlx::query(
+ r#"
+ DELETE FROM oauth2gw.access_tokens
+ WHERE revoked = TRUE
+ AND revoked_at < $1
+ "#
+ )
+ .bind(cutoff)
+ .execute(pool)
+ .await?;
+
+ Ok(result.rows_affected())
+}
diff --git a/oauth2_gateway/src/lib.rs b/oauth2_gateway/src/lib.rs
@@ -2,4 +2,5 @@ pub mod config;
pub mod handlers;
pub mod models;
pub mod state;
-pub mod crypto;
-\ No newline at end of file
+pub mod crypto;
+pub mod db;
+\ No newline at end of file
diff --git a/oauth2_gateway/src/main.rs b/oauth2_gateway/src/main.rs
@@ -1,4 +1,4 @@
-use oauth2_gateway::{config::Config, handlers, state::AppState};
+use oauth2_gateway::{config::Config, db, handlers, state::AppState};
use clap::Parser;
use tower_http::trace::TraceLayer;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
@@ -21,7 +21,7 @@ async fn main() -> Result<()> {
tracing_subscriber::registry()
.with(
tracing_subscriber::EnvFilter::try_from_default_env()
- .unwrap_or_else(|_| "oauth2_gateway=info,tower_http=info".into()),
+ .unwrap_or_else(|_| "oauth2_gateway=info,tower_http=info,sqlx=warn".into()),
)
.with(tracing_subscriber::fmt::layer())
.init();
@@ -33,7 +33,10 @@ async fn main() -> Result<()> {
let config = Config::from_file(&args.config)?;
- let state = AppState::new(config.clone());
+ tracing::info!("Connecting to database: {}", config.database.url);
+ let pool = db::create_pool(&config.database.url).await?;
+
+ let state = AppState::new(config.clone(), pool);
let app = Router::new()
.route("/health", get(handlers::health_check))
diff --git a/oauth2_gateway/src/state.rs b/oauth2_gateway/src/state.rs
@@ -1,15 +1,18 @@
use crate::config::Config;
+use sqlx::PgPool;
use std::sync::Arc;
#[derive(Clone)]
pub struct AppState {
pub config: Arc<Config>,
+ pub pool: PgPool,
}
impl AppState {
- pub fn new(config: Config) -> Self {
+ pub fn new(config: Config, pool: PgPool) -> Self {
Self {
config: Arc::new(config),
+ pool,
}
}
}
diff --git a/oauth2_gateway/tests/api_tests.rs b/oauth2_gateway/tests/api_tests.rs
@@ -1,25 +1,28 @@
use axum::{routing::*, Router};
use axum_test::TestServer;
-use oauth2_gateway::{config::*, handlers, models::*, state::AppState};
+use oauth2_gateway::{config::*, db, handlers, models::*, state::AppState};
use serde_json::json;
-fn create_test_app() -> Router {
+/// Create a test app with mock configuration
+/// Note: These tests use handlers that return mock data (no real DB operations yet)
+async fn create_test_app() -> Router {
let config = Config {
server: ServerConfig {
host: "127.0.0.1".to_string(),
port: 8080,
},
- exchange: ExchangeConfig {
- base_url: "http://test-exchange.com".to_string(),
- notification_endpoint: "/notify".to_string(),
- },
- verifier: VerifierConfig {
- base_url: "http://test-verifier.com".to_string(),
- management_api_path: "/api".to_string(),
+ database: DatabaseConfig {
+ // Use a test db
+ url: std::env::var("TEST_DATABASE_URL")
+ .unwrap_or_else(|_| "postgresql://localhost/oauth2gw_test".to_string()),
},
};
+
+ let pool = db::create_pool(&config.database.url)
+ .await
+ .expect("Failed to connect to test database");
- let state = AppState::new(config);
+ let state = AppState::new(config, pool);
Router::new()
.route("/health", get(handlers::health_check))
@@ -33,7 +36,7 @@ fn create_test_app() -> Router {
#[tokio::test]
async fn test_health_check() {
- let app = create_test_app();
+ let app = create_test_app().await;
let server = TestServer::new(app).unwrap();
let response = server.get("/health").await;
@@ -48,7 +51,7 @@ async fn test_health_check() {
#[tokio::test]
async fn test_setup_endpoint() {
- let app = create_test_app();
+ let app = create_test_app().await;
let server = TestServer::new(app).unwrap();
let response = server
@@ -68,7 +71,7 @@ async fn test_setup_endpoint() {
#[tokio::test]
async fn test_setup_different_clients() {
- let app = create_test_app();
+ let app = create_test_app().await;
let server = TestServer::new(app).unwrap();
let response1 = server
@@ -92,7 +95,7 @@ async fn test_setup_different_clients() {
#[tokio::test]
async fn test_authorize_endpoint() {
- let app = create_test_app();
+ let app = create_test_app().await;
let server = TestServer::new(app).unwrap();
// Get a nonce from setup
@@ -113,4 +116,4 @@ async fn test_authorize_endpoint() {
let body: AuthorizeResponse = response.json();
assert!(!body.verification_url.is_empty());
println!("Verification URL: {}", body.verification_url);
-}
+}
+\ No newline at end of file