notification_webhooks.rs (5872B)
1 // Database operations for notification_pending_webhooks table 2 3 use sqlx::PgPool; 4 use anyhow::Result; 5 use uuid::Uuid; 6 7 /// Pending webhook record with authorization code for client notification 8 #[derive(Debug, Clone)] 9 pub struct PendingWebhook { 10 pub webhook_pending_serial: i64, 11 pub session_id: Uuid, 12 pub client_id: Uuid, 13 pub url: String, 14 pub http_method: String, 15 pub header: Option<String>, 16 pub body: String, 17 pub retries: i32, 18 /// Authorization code to include in the webhook payload 19 pub code: String, 20 } 21 22 /// Fetch pending webhooks ready to be sent 23 /// 24 /// Only returns webhooks where next_attempt <= current epoch time. 25 /// 26 /// Used by the background worker 27 pub async fn get_pending_webhooks( 28 pool: &PgPool, 29 limit: i64, 30 ) -> Result<Vec<PendingWebhook>> { 31 let webhooks = sqlx::query( 32 r#" 33 SELECT 34 npw.webhook_pending_serial, 35 npw.session_id, 36 npw.client_id, 37 npw.url, 38 npw.http_method, 39 npw.header, 40 npw.body, 41 npw.retries, 42 ac.code 43 FROM oauth2gw.notification_pending_webhooks npw 44 INNER JOIN oauth2gw.authorization_codes ac ON ac.session_id = npw.session_id 45 WHERE npw.next_attempt <= EXTRACT(EPOCH FROM NOW()) 46 ORDER BY npw.webhook_pending_serial 47 LIMIT $1 48 "# 49 ) 50 .bind(limit) 51 .fetch_all(pool) 52 .await?; 53 54 Ok(webhooks.into_iter().map(|row: sqlx::postgres::PgRow| { 55 use sqlx::Row; 56 PendingWebhook { 57 webhook_pending_serial: row.get("webhook_pending_serial"), 58 session_id: row.get("session_id"), 59 client_id: row.get("client_id"), 60 url: row.get("url"), 61 http_method: row.get("http_method"), 62 header: row.get("header"), 63 body: row.get("body"), 64 retries: row.get("retries"), 65 code: row.get("code"), 66 } 67 }).collect()) 68 } 69 70 /// Delete a webhook after successful delivery 71 pub async fn delete_webhook( 72 pool: &PgPool, 73 webhook_pending_serial: i64, 74 ) -> Result<bool> { 75 let result = sqlx::query( 76 r#" 77 DELETE FROM oauth2gw.notification_pending_webhooks 78 WHERE webhook_pending_serial = $1 79 "# 80 ) 81 .bind(webhook_pending_serial) 82 .execute(pool) 83 .await?; 84 85 Ok(result.rows_affected() > 0) 86 } 87 88 /// Disable a webhook (client error - no retry) 89 /// 90 /// Sets next_attempt to max i64 value so it never gets picked up again 91 pub async fn disable_webhook( 92 pool: &PgPool, 93 webhook_pending_serial: i64, 94 ) -> Result<bool> { 95 let result = sqlx::query( 96 r#" 97 UPDATE oauth2gw.notification_pending_webhooks 98 SET next_attempt = 9223372036854775807 99 WHERE webhook_pending_serial = $1 100 "# 101 ) 102 .bind(webhook_pending_serial) 103 .execute(pool) 104 .await?; 105 106 Ok(result.rows_affected() > 0) 107 } 108 109 /// Schedule retry with exponential backoff 110 /// 111 /// - Server errors (5xx): retry after delay_seconds (default 60) 112 /// - Network/timeout: retry after longer delay (default 3600) 113 pub async fn schedule_retry( 114 pool: &PgPool, 115 webhook_pending_serial: i64, 116 delay_seconds: i64, 117 ) -> Result<bool> { 118 let result = sqlx::query( 119 r#" 120 UPDATE oauth2gw.notification_pending_webhooks 121 SET next_attempt = EXTRACT(EPOCH FROM NOW()) + $1, 122 retries = retries + 1 123 WHERE webhook_pending_serial = $2 124 "# 125 ) 126 .bind(delay_seconds) 127 .bind(webhook_pending_serial) 128 .execute(pool) 129 .await?; 130 131 Ok(result.rows_affected() > 0) 132 } 133 134 /// Insert a new pending webhook notification 135 /// 136 /// Triggered by the /notification handler after verification is complete 137 pub async fn insert_pending_webhook( 138 pool: &PgPool, 139 session_id: Uuid, 140 client_id: Uuid, 141 url: &str, 142 body: &str, 143 ) -> Result<i64> { 144 let result = sqlx::query_scalar::<_, i64>( 145 r#" 146 INSERT INTO oauth2gw.notification_pending_webhooks 147 (session_id, client_id, url, body, next_attempt) 148 VALUES ($1, $2, $3, $4, 0) 149 RETURNING webhook_pending_serial 150 "# 151 ) 152 .bind(session_id) 153 .bind(client_id) 154 .bind(url) 155 .bind(body) 156 .fetch_one(pool) 157 .await?; 158 159 Ok(result) 160 } 161 162 /// Delete old disabled webhooks (garbage collection) 163 /// 164 /// Removes webhooks that have been disabled (next_attempt = MAX) for cleanup 165 pub async fn delete_disabled_webhooks(pool: &PgPool) -> Result<u64> { 166 let result = sqlx::query( 167 r#" 168 DELETE FROM oauth2gw.notification_pending_webhooks 169 WHERE next_attempt = 9223372036854775807 170 "# 171 ) 172 .execute(pool) 173 .await?; 174 175 Ok(result.rows_affected()) 176 } 177 178 /// Delete webhooks that have exceeded max retries 179 pub async fn delete_max_retries( 180 pool: &PgPool, 181 max_retries: i32, 182 ) -> Result<u64> { 183 let result = sqlx::query( 184 r#" 185 DELETE FROM oauth2gw.notification_pending_webhooks 186 WHERE retries >= $1 187 "# 188 ) 189 .bind(max_retries) 190 .execute(pool) 191 .await?; 192 193 Ok(result.rows_affected()) 194 } 195 196 /// Get the next scheduled webhook attempt time 197 /// 198 /// Returns the earliest `next_attempt` timestamp for webhooks that are: 199 /// - Not disabled (next_attempt < MAX_INT8) 200 /// - Scheduled for the future (next_attempt > NOW) 201 /// 202 /// Used by worker to schedule precise wake-up for retries. 203 /// Returns None if no future webhooks are scheduled. 204 pub async fn get_next_scheduled_webhook(pool: &PgPool) -> Result<Option<i64>> { 205 let result = sqlx::query_scalar::<_, i64>( 206 r#" 207 SELECT next_attempt 208 FROM oauth2gw.notification_pending_webhooks 209 WHERE next_attempt > EXTRACT(EPOCH FROM NOW()) 210 AND next_attempt < 9223372036854775807 211 ORDER BY next_attempt ASC 212 LIMIT 1 213 "# 214 ) 215 .fetch_optional(pool) 216 .await?; 217 218 Ok(result) 219 }