notification.rs (2719B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use http_client::sse::SseClient; 18 use jiff::Timestamp; 19 use taler_common::ExpoBackoffDecorr; 20 use tokio::sync::Notify; 21 use tracing::{debug, error, trace}; 22 23 use crate::cyclos_api::{ 24 client::Client, 25 types::{NotificationEntityType, NotificationStatus}, 26 }; 27 28 pub async fn watch_notification(client: &Client<'_>, notify: &Notify) -> ! { 29 let client_id = Timestamp::now().as_microsecond(); 30 let mut sse_client = SseClient::new(); 31 let mut jitter = ExpoBackoffDecorr::default(); 32 loop { 33 let res: anyhow::Result<()> = async { 34 loop { 35 // Register listener 36 client 37 .push_notifications(client_id, &mut sse_client) 38 .await?; 39 jitter.reset(); 40 // Read available ones 41 while let Some(message) = sse_client.next().await { 42 let msg = message?; 43 trace!(target: "notification", "new message {}: {}", msg.event, msg.data); 44 if msg.event == "newNotification" { 45 let deserializer = &mut serde_json::Deserializer::from_str(&msg.data); 46 let status: NotificationStatus = 47 serde_path_to_error::deserialize(deserializer)?; 48 debug!(target: "notification", "new notification {} {:?} {:?}", status.notification.id, status.notification.ty, status.notification.entity_type); 49 if status.notification.entity_type == Some(NotificationEntityType::Transfer) 50 { 51 notify.notify_waiters(); 52 } 53 // Find a way to buffer all transactions 54 } 55 } 56 tokio::time::sleep( jitter.backoff()).await; 57 } 58 } 59 .await; 60 let err = res.unwrap_err(); 61 error!(target: "notification", "{err}"); 62 tokio::time::sleep(jitter.backoff()).await; 63 } 64 }