taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

notification.rs (2719B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use http_client::sse::SseClient;
     18 use jiff::Timestamp;
     19 use taler_common::ExpoBackoffDecorr;
     20 use tokio::sync::Notify;
     21 use tracing::{debug, error, trace};
     22 
     23 use crate::cyclos_api::{
     24     client::Client,
     25     types::{NotificationEntityType, NotificationStatus},
     26 };
     27 
     28 pub async fn watch_notification(client: &Client<'_>, notify: &Notify) -> ! {
     29     let client_id = Timestamp::now().as_microsecond();
     30     let mut sse_client = SseClient::new();
     31     let mut jitter = ExpoBackoffDecorr::default();
     32     loop {
     33         let res: anyhow::Result<()> = async {
     34             loop {
     35                 // Register listener
     36                 client
     37                     .push_notifications(client_id, &mut sse_client)
     38                     .await?;
     39                 jitter.reset();
     40                 // Read available ones
     41                 while let Some(message) = sse_client.next().await {
     42                     let msg = message?;
     43                     trace!(target: "notification", "new message {}: {}", msg.event, msg.data);
     44                     if msg.event == "newNotification" {
     45                         let deserializer = &mut serde_json::Deserializer::from_str(&msg.data);
     46                         let status: NotificationStatus =
     47                             serde_path_to_error::deserialize(deserializer)?;
     48                         debug!(target: "notification", "new notification {} {:?} {:?}", status.notification.id, status.notification.ty, status.notification.entity_type);
     49                         if status.notification.entity_type == Some(NotificationEntityType::Transfer)
     50                         {
     51                             notify.notify_waiters();
     52                         }
     53                         // Find a way to buffer all transactions
     54                     }
     55                 }
     56                 tokio::time::sleep( jitter.backoff()).await;
     57             }
     58         }
     59         .await;
     60         let err = res.unwrap_err();
     61         error!(target: "notification", "{err}");
     62         tokio::time::sleep(jitter.backoff()).await;
     63     }
     64 }