commit 75cf00cfbe6d004077fc13d6948983e33e425e68
parent 80b72aa6968330a0a27421e8786faf411c8fafa6
Author: Henrique Chan Carvalho Machado <henriqueccmachado@tecnico.ulisboa.pt>
Date: Thu, 11 Dec 2025 21:39:19 +0100
oauth2_gateway: refactor token, webhook settings to config file. refactor random byte dependent functions
Diffstat:
5 files changed, 139 insertions(+), 46 deletions(-)
diff --git a/oauth2_gateway/src/bin/webhook_worker.rs b/oauth2_gateway/src/bin/webhook_worker.rs
@@ -89,7 +89,7 @@ async fn main() -> Result<()> {
});
// Run the worker
- worker::run_worker(pool, args.test_mode, shutdown_rx).await?;
+ worker::run_worker(pool, &config.webhook_worker, args.test_mode, shutdown_rx).await?;
tracing::info!("Webhook worker exited cleanly");
Ok(())
diff --git a/oauth2_gateway/src/config.rs b/oauth2_gateway/src/config.rs
@@ -6,7 +6,9 @@ use std::path::Path;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Config {
pub server: ServerConfig,
- pub database: DatabaseConfig
+ pub database: DatabaseConfig,
+ pub crypto: CryptoConfig,
+ pub webhook_worker: WebhookWorkerConfig,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@@ -46,6 +48,22 @@ pub struct DatabaseConfig {
pub url: String,
}
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct CryptoConfig {
+ pub nonce_bytes: usize,
+ pub token_bytes: usize,
+ pub authorization_code_bytes: usize,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct WebhookWorkerConfig {
+ pub retry_delay_server_error: i64,
+ pub retry_delay_forbidden: i64,
+ pub retry_delay_other: i64,
+ pub fallback_poll_secs: u64,
+ pub batch_size: i64,
+}
+
impl Config {
pub fn from_file<P: AsRef<Path>>(path: P) -> Result<Self> {
let ini = Ini::load_from_file(path.as_ref())
@@ -82,9 +100,65 @@ impl Config {
.to_string(),
};
+ let crypto_section = ini
+ .section(Some("crypto"))
+ .context("Missing [crypto] section")?;
+
+ let crypto = CryptoConfig {
+ nonce_bytes: crypto_section
+ .get("nonce_bytes")
+ .context("Missing crypto.nonce_bytes")?
+ .parse()
+ .context("Invalid crypto.nonce_bytes")?,
+ token_bytes: crypto_section
+ .get("token_bytes")
+ .context("Missing crypto.token_bytes")?
+ .parse()
+ .context("Invalid crypto.token_bytes")?,
+ authorization_code_bytes: crypto_section
+ .get("authorization_code_bytes")
+ .context("Missing crypto.authorization_code_bytes")?
+ .parse()
+ .context("Invalid crypto.authorization_code_bytes")?,
+ };
+
+ let webhook_worker_section = ini
+ .section(Some("webhook_worker"))
+ .context("Missing [webhook_worker] section")?;
+
+ let webhook_worker = WebhookWorkerConfig {
+ retry_delay_server_error: webhook_worker_section
+ .get("retry_delay_server_error")
+ .context("Missing webhook_worker.retry_delay_server_error")?
+ .parse()
+ .context("Invalid webhook_worker.retry_delay_server_error")?,
+ retry_delay_forbidden: webhook_worker_section
+ .get("retry_delay_forbidden")
+ .context("Missing webhook_worker.retry_delay_forbidden")?
+ .parse()
+ .context("Invalid webhook_worker.retry_delay_forbidden")?,
+ retry_delay_other: webhook_worker_section
+ .get("retry_delay_other")
+ .context("Missing webhook_worker.retry_delay_other")?
+ .parse()
+ .context("Invalid webhook_worker.retry_delay_other")?,
+ fallback_poll_secs: webhook_worker_section
+ .get("fallback_poll_secs")
+ .context("Missing webhook_worker.fallback_poll_secs")?
+ .parse()
+ .context("Invalid webhook_worker.fallback_poll_secs")?,
+ batch_size: webhook_worker_section
+ .get("batch_size")
+ .context("Missing webhook_worker.batch_size")?
+ .parse()
+ .context("Invalid webhook_worker.batch_size")?,
+ };
+
Ok(Config {
server,
database,
+ crypto,
+ webhook_worker,
})
}
}
diff --git a/oauth2_gateway/src/crypto.rs b/oauth2_gateway/src/crypto.rs
@@ -1,18 +1,43 @@
use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _};
use rand::Rng;
+/// Generate cryptographically secure random bytes
+///
+/// Returns a vector of random bytes of the specified length
+pub fn generate_random_bytes(length: usize) -> Vec<u8> {
+ let mut rng = rand::thread_rng();
+ let mut bytes = vec![0u8; length];
+ rng.fill(&mut bytes[..]);
+ bytes
+}
+
/// Generate a cryptographically secure nonce
///
/// Format: base64 encoded random bytes
-/// Length: 256 bits (43 base64 characters)
///
/// Example: "k7E9mZqYvXwPxR2nT8uL5sA6fH3jC1dG4bN0iM9oU2p"
-pub fn generate_nonce() -> String {
- let mut rng = rand::thread_rng();
- let mut bytes = [0u8; 32]; // 32 bytes
- rng.fill(&mut bytes);
+pub fn generate_nonce(bytes_len: usize) -> String {
+ let bytes = generate_random_bytes(bytes_len);
+ URL_SAFE_NO_PAD.encode(bytes)
+}
+
+/// Generate a cryptographically secure access token
+///
+/// Format: base64 encoded random bytes
+///
+/// Example: "xR2nT8uL5sA6fH3jC1dG4bN0iM9oU2pk7E9mZqYvXwP"
+pub fn generate_token(bytes_len: usize) -> String {
+ let bytes = generate_random_bytes(bytes_len);
+ URL_SAFE_NO_PAD.encode(bytes)
+}
- // base64 encoding (no padding)
+/// Generate a cryptographically secure authorization code
+///
+/// Format: base64 encoded random bytes
+///
+/// Example: "a1B2c3D4e5F6g7H8i9J0k1L2m3N4o5P6q7R8s9T0u1V"
+pub fn generate_authorization_code(bytes_len: usize) -> String {
+ let bytes = generate_random_bytes(bytes_len);
URL_SAFE_NO_PAD.encode(bytes)
}
@@ -23,7 +48,7 @@ mod tests {
#[test]
fn test_nonce_generation() {
- let nonce = generate_nonce();
+ let nonce = generate_nonce(32);
// Check length (32 bytes base64 = 43 chars without padding)
assert_eq!(nonce.len(), 43);
@@ -38,7 +63,7 @@ mod tests {
// Generate 1000 nonces, all should be unique
for _ in 0..1000 {
- let nonce = generate_nonce();
+ let nonce = generate_nonce(32);
assert!(nonces.insert(nonce), "Duplicate nonce generated!");
}
}
@@ -46,7 +71,7 @@ mod tests {
#[test]
fn test_no_padding_in_tokens() {
// Ensure no '=' padding characters
- let nonce = generate_nonce();
+ let nonce = generate_nonce(32);
assert!(!nonce.contains('='));
}
diff --git a/oauth2_gateway/src/handlers.rs b/oauth2_gateway/src/handlers.rs
@@ -82,7 +82,7 @@ pub async fn setup(
));
}
- let nonce = crypto::generate_nonce();
+ let nonce = crypto::generate_nonce(state.config.crypto.nonce_bytes);
tracing::debug!("Generated nonce: {}", nonce);
@@ -482,7 +482,7 @@ pub async fn token(
}
// Generate new token and complete session
- let access_token = crypto::generate_nonce();
+ let access_token = crypto::generate_token(state.config.crypto.token_bytes);
let token = crate::db::tokens::create_token_and_complete_session(
&state.pool,
data.session_id,
@@ -671,7 +671,7 @@ pub async fn notification_webhook(
};
// Generate authorization code
- let authorization_code = crypto::generate_nonce();
+ let authorization_code = crypto::generate_authorization_code(state.config.crypto.authorization_code_bytes);
// Construct GET request URL: redirect_uri?code=XXX&state=YYY
let redirect_uri = session_data.redirect_uri.as_ref()
diff --git a/oauth2_gateway/src/worker.rs b/oauth2_gateway/src/worker.rs
@@ -12,31 +12,24 @@ use std::time::Duration;
use tokio::sync::watch;
use tracing::{debug, error, info, warn};
-use crate::db::notification_webhooks::{
- delete_webhook, disable_webhook, get_next_scheduled_webhook, get_pending_webhooks,
- schedule_retry, PendingWebhook,
+use crate::{
+ config::WebhookWorkerConfig,
+ db::notification_webhooks::{
+ delete_webhook, disable_webhook, get_next_scheduled_webhook, get_pending_webhooks,
+ schedule_retry, PendingWebhook,
+ },
};
-/// Retry delays based on HTTP response codes
-const RETRY_DELAY_SERVER_ERROR: i64 = 60; // 500
-const RETRY_DELAY_FORBIDDEN: i64 = 60; // 403
-const RETRY_DELAY_OTHER: i64 = 3600; // default
-
-/// Fallback polling interval when no webhooks are pending
-const FALLBACK_POLL_SECS: u64 = 300;
-
-/// Maximum webhooks to fetch per batch
-const BATCH_SIZE: i64 = 100;
-
/// Run the webhook worker
///
/// This function blocks until shutdown signal is received.
/// 1. Fetch pending webhooks (next_attempt <= NOW)
/// 2. Execute HTTP requests, wait for all to complete
/// 3. If no pending, check for future webhooks and schedule wake-up
-/// 4. Fallback poll every 5 minutes
+/// 4. Fallback poll configured interval
pub async fn run_worker(
pool: PgPool,
+ config: &WebhookWorkerConfig,
test_mode: bool,
mut shutdown: watch::Receiver<bool>,
) -> Result<()> {
@@ -55,7 +48,7 @@ pub async fn run_worker(
loop {
// select work: process pending webhooks (keep looping while there's work)
loop {
- let has_work = select_work(&pool, &http_client).await;
+ let has_work = select_work(&pool, &http_client, config).await;
if !has_work {
if test_mode {
@@ -76,12 +69,12 @@ pub async fn run_worker(
Duration::from_secs(delay)
}
Ok(None) => {
- debug!("No future webhooks, fallback poll in {} seconds", FALLBACK_POLL_SECS);
- Duration::from_secs(FALLBACK_POLL_SECS)
+ debug!("No future webhooks, fallback poll in {} seconds", config.fallback_poll_secs);
+ Duration::from_secs(config.fallback_poll_secs)
}
Err(e) => {
error!("Failed to get next scheduled webhook: {}", e);
- Duration::from_secs(FALLBACK_POLL_SECS)
+ Duration::from_secs(config.fallback_poll_secs)
}
};
@@ -123,8 +116,8 @@ pub async fn run_worker(
/// Process all pending webhooks
///
/// Returns true if any webhooks were processed.
-async fn select_work(pool: &PgPool, http_client: &reqwest::Client) -> bool {
- let webhooks = match get_pending_webhooks(pool, BATCH_SIZE).await {
+async fn select_work(pool: &PgPool, http_client: &reqwest::Client, config: &WebhookWorkerConfig) -> bool {
+ let webhooks = match get_pending_webhooks(pool, config.batch_size).await {
Ok(w) => w,
Err(e) => {
error!("Failed to fetch pending webhooks: {}", e);
@@ -147,10 +140,11 @@ async fn select_work(pool: &PgPool, http_client: &reqwest::Client) -> bool {
let pool = pool.clone();
let client = http_client.clone();
let counter = in_flight.clone();
+ let cfg = config.clone();
// Fire HTTP request
let handle = tokio::spawn(async move {
- execute_webhook(&pool, &client, webhook).await;
+ execute_webhook(&pool, &client, webhook, &cfg).await;
counter.fetch_sub(1, Ordering::SeqCst);
});
@@ -168,7 +162,7 @@ async fn select_work(pool: &PgPool, http_client: &reqwest::Client) -> bool {
}
/// Execute a single webhook HTTP request
-async fn execute_webhook(pool: &PgPool, client: &reqwest::Client, webhook: PendingWebhook) {
+async fn execute_webhook(pool: &PgPool, client: &reqwest::Client, webhook: PendingWebhook, config: &WebhookWorkerConfig) {
let serial = webhook.webhook_pending_serial;
debug!(
@@ -210,23 +204,23 @@ async fn execute_webhook(pool: &PgPool, client: &reqwest::Client, webhook: Pendi
Ok(r) => r,
Err(e) => {
warn!("Network error for webhook {}: {}", serial, e);
- handle_webhook_response(pool, serial, 0).await;
+ handle_webhook_response(pool, serial, 0, config).await;
return;
}
};
let status_code = response.status().as_u16();
- handle_webhook_response(pool, serial, status_code.into()).await;
+ handle_webhook_response(pool, serial, status_code.into(), config).await;
}
/// Handle webhook response
///
/// - 2xx: delete pending webhook (success)
/// - 400: next_attempt = FOREVER (never retry)
-/// - 500: next_attempt = NOW + 1 minute
-/// - 403: next_attempt = NOW + 1 minute
-/// - other: next_attempt = NOW + 1 hour
-async fn handle_webhook_response(pool: &PgPool, serial: i64, response_code: i64) {
+/// - 500: next_attempt = NOW + configured delay
+/// - 403: next_attempt = NOW + configured delay
+/// - other: next_attempt = NOW + configured delay
+async fn handle_webhook_response(pool: &PgPool, serial: i64, response_code: i64, config: &WebhookWorkerConfig) {
info!("Webhook {} returned with status {}", serial, response_code);
// 200 success - delete from queue
@@ -252,9 +246,9 @@ async fn handle_webhook_response(pool: &PgPool, serial: i64, response_code: i64)
let _ = disable_webhook(pool, serial).await;
return;
}
- 500 => RETRY_DELAY_SERVER_ERROR,
- 403 => RETRY_DELAY_FORBIDDEN,
- _ => RETRY_DELAY_OTHER,
+ 500 => config.retry_delay_server_error,
+ 403 => config.retry_delay_forbidden,
+ _ => config.retry_delay_other,
};
debug!("Scheduling webhook {} retry in {} seconds", serial, delay);