kych

OAuth 2.0 API for Swiyu to enable Taler integration of Swiyu for KYC (experimental)
Log | Files | Refs

notification_webhooks.rs (5872B)


      1 // Database operations for notification_pending_webhooks table
      2 
      3 use sqlx::PgPool;
      4 use anyhow::Result;
      5 use uuid::Uuid;
      6 
      7 /// Pending webhook record with authorization code for client notification
      8 #[derive(Debug, Clone)]
      9 pub struct PendingWebhook {
     10     pub webhook_pending_serial: i64,
     11     pub session_id: Uuid,
     12     pub client_id: Uuid,
     13     pub url: String,
     14     pub http_method: String,
     15     pub header: Option<String>,
     16     pub body: String,
     17     pub retries: i32,
     18     /// Authorization code to include in the webhook payload
     19     pub code: String,
     20 }
     21 
     22 /// Fetch pending webhooks ready to be sent
     23 ///
     24 /// Only returns webhooks where next_attempt <= current epoch time.
     25 ///
     26 /// Used by the background worker
     27 pub async fn get_pending_webhooks(
     28     pool: &PgPool,
     29     limit: i64,
     30 ) -> Result<Vec<PendingWebhook>> {
     31     let webhooks = sqlx::query(
     32         r#"
     33         SELECT
     34             npw.webhook_pending_serial,
     35             npw.session_id,
     36             npw.client_id,
     37             npw.url,
     38             npw.http_method,
     39             npw.header,
     40             npw.body,
     41             npw.retries,
     42             ac.code
     43         FROM oauth2gw.notification_pending_webhooks npw
     44         INNER JOIN oauth2gw.authorization_codes ac ON ac.session_id = npw.session_id
     45         WHERE npw.next_attempt <= EXTRACT(EPOCH FROM NOW())
     46         ORDER BY npw.webhook_pending_serial
     47         LIMIT $1
     48         "#
     49     )
     50     .bind(limit)
     51     .fetch_all(pool)
     52     .await?;
     53 
     54     Ok(webhooks.into_iter().map(|row: sqlx::postgres::PgRow| {
     55         use sqlx::Row;
     56         PendingWebhook {
     57             webhook_pending_serial: row.get("webhook_pending_serial"),
     58             session_id: row.get("session_id"),
     59             client_id: row.get("client_id"),
     60             url: row.get("url"),
     61             http_method: row.get("http_method"),
     62             header: row.get("header"),
     63             body: row.get("body"),
     64             retries: row.get("retries"),
     65             code: row.get("code"),
     66         }
     67     }).collect())
     68 }
     69 
     70 /// Delete a webhook after successful delivery
     71 pub async fn delete_webhook(
     72     pool: &PgPool,
     73     webhook_pending_serial: i64,
     74 ) -> Result<bool> {
     75     let result = sqlx::query(
     76         r#"
     77         DELETE FROM oauth2gw.notification_pending_webhooks
     78         WHERE webhook_pending_serial = $1
     79         "#
     80     )
     81     .bind(webhook_pending_serial)
     82     .execute(pool)
     83     .await?;
     84 
     85     Ok(result.rows_affected() > 0)
     86 }
     87 
     88 /// Disable a webhook (client error - no retry)
     89 ///
     90 /// Sets next_attempt to max i64 value so it never gets picked up again
     91 pub async fn disable_webhook(
     92     pool: &PgPool,
     93     webhook_pending_serial: i64,
     94 ) -> Result<bool> {
     95     let result = sqlx::query(
     96         r#"
     97         UPDATE oauth2gw.notification_pending_webhooks
     98         SET next_attempt = 9223372036854775807
     99         WHERE webhook_pending_serial = $1
    100         "#
    101     )
    102     .bind(webhook_pending_serial)
    103     .execute(pool)
    104     .await?;
    105 
    106     Ok(result.rows_affected() > 0)
    107 }
    108 
    109 /// Schedule retry with exponential backoff
    110 ///
    111 /// - Server errors (5xx): retry after delay_seconds (default 60)
    112 /// - Network/timeout: retry after longer delay (default 3600)
    113 pub async fn schedule_retry(
    114     pool: &PgPool,
    115     webhook_pending_serial: i64,
    116     delay_seconds: i64,
    117 ) -> Result<bool> {
    118     let result = sqlx::query(
    119         r#"
    120         UPDATE oauth2gw.notification_pending_webhooks
    121         SET next_attempt = EXTRACT(EPOCH FROM NOW()) + $1,
    122             retries = retries + 1
    123         WHERE webhook_pending_serial = $2
    124         "#
    125     )
    126     .bind(delay_seconds)
    127     .bind(webhook_pending_serial)
    128     .execute(pool)
    129     .await?;
    130 
    131     Ok(result.rows_affected() > 0)
    132 }
    133 
    134 /// Insert a new pending webhook notification
    135 ///
    136 /// Triggered by the /notification handler after verification is complete
    137 pub async fn insert_pending_webhook(
    138     pool: &PgPool,
    139     session_id: Uuid,
    140     client_id: Uuid,
    141     url: &str,
    142     body: &str,
    143 ) -> Result<i64> {
    144     let result = sqlx::query_scalar::<_, i64>(
    145         r#"
    146         INSERT INTO oauth2gw.notification_pending_webhooks
    147             (session_id, client_id, url, body, next_attempt)
    148         VALUES ($1, $2, $3, $4, 0)
    149         RETURNING webhook_pending_serial
    150         "#
    151     )
    152     .bind(session_id)
    153     .bind(client_id)
    154     .bind(url)
    155     .bind(body)
    156     .fetch_one(pool)
    157     .await?;
    158 
    159     Ok(result)
    160 }
    161 
    162 /// Delete old disabled webhooks (garbage collection)
    163 ///
    164 /// Removes webhooks that have been disabled (next_attempt = MAX) for cleanup
    165 pub async fn delete_disabled_webhooks(pool: &PgPool) -> Result<u64> {
    166     let result = sqlx::query(
    167         r#"
    168         DELETE FROM oauth2gw.notification_pending_webhooks
    169         WHERE next_attempt = 9223372036854775807
    170         "#
    171     )
    172     .execute(pool)
    173     .await?;
    174 
    175     Ok(result.rows_affected())
    176 }
    177 
    178 /// Delete webhooks that have exceeded max retries
    179 pub async fn delete_max_retries(
    180     pool: &PgPool,
    181     max_retries: i32,
    182 ) -> Result<u64> {
    183     let result = sqlx::query(
    184         r#"
    185         DELETE FROM oauth2gw.notification_pending_webhooks
    186         WHERE retries >= $1
    187         "#
    188     )
    189     .bind(max_retries)
    190     .execute(pool)
    191     .await?;
    192 
    193     Ok(result.rows_affected())
    194 }
    195 
    196 /// Get the next scheduled webhook attempt time
    197 ///
    198 /// Returns the earliest `next_attempt` timestamp for webhooks that are:
    199 /// - Not disabled (next_attempt < MAX_INT8)
    200 /// - Scheduled for the future (next_attempt > NOW)
    201 ///
    202 /// Used by worker to schedule precise wake-up for retries.
    203 /// Returns None if no future webhooks are scheduled.
    204 pub async fn get_next_scheduled_webhook(pool: &PgPool) -> Result<Option<i64>> {
    205     let result = sqlx::query_scalar::<_, i64>(
    206         r#"
    207         SELECT next_attempt
    208         FROM oauth2gw.notification_pending_webhooks
    209         WHERE next_attempt > EXTRACT(EPOCH FROM NOW())
    210           AND next_attempt < 9223372036854775807
    211         ORDER BY next_attempt ASC
    212         LIMIT 1
    213         "#
    214     )
    215     .fetch_optional(pool)
    216     .await?;
    217 
    218     Ok(result)
    219 }