taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

commit c9a55ed01615c3e28c4c2921aea585f720444a61
parent 852e9192220c8fc834a80fe3ed9653bbf4908e76
Author: Antoine A <>
Date:   Mon, 29 Dec 2025 17:48:41 +0100

cyclos: watch transfer notifications

Diffstat:
MCargo.toml | 1+
Mcommon/taler-common/Cargo.toml | 2+-
Mcommon/taler-common/src/error.rs | 2++
Mcommon/taler-common/src/lib.rs | 2+-
Mtaler-cyclos/Cargo.toml | 8++++++++
Mtaler-cyclos/src/bin/cyclos-harness.rs | 10+++++++---
Mtaler-cyclos/src/constants.rs | 2+-
Mtaler-cyclos/src/cyclos_api/api.rs | 161++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
Mtaler-cyclos/src/cyclos_api/client.rs | 14+++++++++++++-
Mtaler-cyclos/src/cyclos_api/types.rs | 289++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mtaler-cyclos/src/lib.rs | 5++---
Mtaler-cyclos/src/main.rs | 4++--
Ataler-cyclos/src/notification.rs | 66++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mtaler-cyclos/src/setup.rs | 54++++++++++++++++++++++++++++++------------------------
Mtaler-cyclos/src/worker.rs | 103+++++++++++++++++++++++++++++++++++++++++++++++++++++--------------------------
15 files changed, 647 insertions(+), 76 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml @@ -59,3 +59,4 @@ ed25519-dalek = { version = "2.1.1", default-features = false, features = [ "rand_core", ] } rand_core = { version = "0.6.4" } +compact_str = { version = "0.9.0", features = ["serde", "sqlx-postgres"] } diff --git a/common/taler-common/Cargo.toml b/common/taler-common/Cargo.toml @@ -28,7 +28,7 @@ ed25519-dalek.workspace = true rand_core.workspace = true tokio = { workspace = true, features = ["rt-multi-thread"] } sqlx = { workspace = true, features = ["macros"] } -compact_str = { version = "0.9.0", features = ["serde", "sqlx-postgres"] } +compact_str.workspace = true [dev-dependencies] criterion.workspace = true diff --git a/common/taler-common/src/error.rs b/common/taler-common/src/error.rs @@ -33,6 +33,8 @@ fn fmt_with_source( } } +impl<E: std::error::Error> std::error::Error for FmtSource<E> {} + impl<E: std::error::Error> std::fmt::Display for FmtSource<E> { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fmt_with_source(f, &self.0) diff --git a/common/taler-common/src/lib.rs b/common/taler-common/src/lib.rs @@ -107,6 +107,6 @@ impl ExpoBackoffDecorr { impl Default for ExpoBackoffDecorr { fn default() -> Self { - Self::new(200, 15 * 1000, 2.0) + Self::new(400, 30 * 1000, 2.5) } } diff --git a/taler-cyclos/Cargo.toml b/taler-cyclos/Cargo.toml @@ -12,6 +12,7 @@ license-file.workspace = true reqwest = { version = "0.12", default-features = false, features = [ "json", "rustls-tls", + "stream", ] } sqlx.workspace = true serde_json = { workspace = true, features = ["raw_value"] } @@ -27,6 +28,13 @@ serde_urlencoded.workspace = true thiserror.workspace = true tracing.workspace = true tokio.workspace = true +tokio-util = { version = "0.7.17", default-features = false, features = [ + "codec", + "io", +] } +futures-util = { version = "0.3", default-features = false } +compact_str.workspace = true + anyhow.workspace = true base64.workspace = true owo-colors.workspace = true diff --git a/taler-cyclos/src/bin/cyclos-harness.rs b/taler-cyclos/src/bin/cyclos-harness.rs @@ -262,7 +262,9 @@ impl<'a> Balances<'a> { assert_eq!( current, (self.exchange_balance, self.client_balance), - "{current:?} {diff}" + "({} {}) +{diff}", + current.0, + current.1 ); } attempts += 1; @@ -284,7 +286,9 @@ impl<'a> Balances<'a> { assert_eq!( current, (self.exchange_balance, self.client_balance), - "{current:?} {diff}" + "({} {}) -{diff}", + current.0, + current.1 ); } attempts += 1; @@ -547,7 +551,7 @@ async fn online_harness(config: &Config, reset: bool) -> anyhow::Result<()> { let client = http_client.clone(); let pool = pool.clone(); let config = config.clone(); - tokio::spawn(async move { run_worker(&config, &pool, &client).await }) + tokio::spawn(async move { run_worker(&config, &pool, &client, false).await }) }; tokio::time::sleep(Duration::from_secs(5)).await; harness.worker().await.unwrap(); diff --git a/taler-cyclos/src/constants.rs b/taler-cyclos/src/constants.rs @@ -19,4 +19,4 @@ use std::time::Duration; use taler_common::config::parser::ConfigSource; pub const CONFIG_SOURCE: ConfigSource = ConfigSource::new("taler-cyclos", "cyclos", "taler-cyclos"); -pub const WORKER_FREQUENCY: Duration = Duration::from_secs(60); +pub const WORKER_FREQUENCY: Duration = Duration::from_secs(600); diff --git a/taler-cyclos/src/cyclos_api/api.rs b/taler-cyclos/src/cyclos_api/api.rs @@ -14,12 +14,22 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use std::{borrow::Cow, fmt::Display}; +use std::{borrow::Cow, fmt::Display, pin::Pin}; -use reqwest::{Client, Method, RequestBuilder, StatusCode, Url}; +use compact_str::CompactString; +use futures_util::{Stream, StreamExt, stream}; +use reqwest::{ + Client, Method, RequestBuilder, StatusCode, Url, + header::{self, HeaderName, HeaderValue}, +}; use serde::{Deserialize, Serialize, de::DeserializeOwned}; use taler_common::error::FmtSource; use thiserror::Error; +use tokio_util::{ + bytes::Bytes, + codec::{FramedRead, LinesCodec, LinesCodecError}, + io::StreamReader, +}; use tracing::trace; use crate::cyclos_api::types::{ @@ -58,7 +68,7 @@ impl Display for ApiErr { #[derive(Error, Debug)] pub enum ErrKind { - #[error("transport: {0}")] + #[error(transparent)] Transport(FmtSource<reqwest::Error>), #[error("JSON body: {0}")] Json(#[from] serde_path_to_error::Error<serde_json::Error>), @@ -78,7 +88,7 @@ pub enum ErrKind { impl From<reqwest::Error> for ErrKind { fn from(value: reqwest::Error) -> Self { - Self::Transport(value.into()) + Self::Transport(value.without_url().into()) } } @@ -124,8 +134,13 @@ impl<'a> CyclosRequest<'a> { } } - pub fn query<T: Serialize + ?Sized>(mut self, query: &T) -> Self { - self.builder = self.builder.query(query); + pub fn query<T: Serialize + ?Sized>(mut self, name: &str, value: &T) -> Self { + self.builder = self.builder.query(&[(name, value)]); + self + } + + pub fn header(mut self, key: impl Into<HeaderName>, value: impl Into<HeaderValue>) -> Self { + self.builder = self.builder.header(key, value); self } @@ -134,6 +149,53 @@ impl<'a> CyclosRequest<'a> { self } + pub async fn into_sse(mut self, sse_client: &mut SseClient) -> ApiResult<()> { + self.builder = self + .builder + .header( + header::ACCEPT, + HeaderValue::from_static("text/event-stream"), + ) + .header(header::CACHE_CONTROL, HeaderValue::from_static("no-cache")); + if let Some(id) = &sse_client.last_event_id { + self.builder = self + .builder + .header(HeaderName::from_static("Last-Event-ID"), id.as_str()); + } + let Self { + path, + builder, + method, + auth, + } = self; + let (client, req) = match auth { + CyclosAuth::None => builder, + CyclosAuth::Basic { username, password } => { + builder.basic_auth(username, Some(password)) + } + } + .build_split(); + match async { + let req = req?; + let res = client.execute(req).await?; + Ok(res) + } + .await + { + Ok(res) => sse_client.connect(res.bytes_stream()), + Err(kind) => { + return Err(ApiErr { + path, + method, + kind, + status: None, + }); + } + } + + Ok(()) + } + pub async fn parse_json<T: DeserializeOwned>(self) -> ApiResult<T> { let Self { path, @@ -189,3 +251,90 @@ impl<'a> CyclosRequest<'a> { } } } + +#[derive(Debug, Default)] +pub struct SseMessage { + pub event: CompactString, + pub data: String, +} + +type SseStream = dyn Stream<Item = std::result::Result<String, LinesCodecError>> + Send; + +pub struct SseClient { + last_event_id: Option<CompactString>, + reconnection_time: Option<u64>, + stream: Pin<Box<SseStream>>, +} + +impl SseClient { + pub fn new() -> Self { + Self { + last_event_id: None, + reconnection_time: None, + stream: Box::pin(stream::empty()), + } + } + + pub fn connect<E: std::error::Error + Send + Sync + 'static>( + &mut self, + stream: impl Stream<Item = Result<Bytes, E>> + 'static + Send, + ) { + let stream = stream.map(|it| it.map_err(std::io::Error::other)); + let lines = FramedRead::new(StreamReader::new(stream), LinesCodec::new()); + self.stream = Box::pin(lines); + } + + pub async fn next(&mut self) -> Option<Result<SseMessage, LinesCodecError>> { + let mut event = CompactString::new("message"); + let mut data = String::new(); + while let Some(res) = self.stream.next().await { + let line = match res { + Ok(line) => line, + Err(e) => return Some(Err(e)), + }; + // Parse line + let (field, value): (&str, &str) = if line.is_empty() { + if data.ends_with('\n') { + data.pop(); + } + return Some(Ok(SseMessage { event, data })); + } else if let Some(comment) = line.strip_prefix(':') { + trace!(target: "sse", "{comment}"); + continue; + } else if let Some((k, v)) = line.split_once(':') { + (k, v.trim_start_matches(' ')) + } else { + (&line, "") + }; + + // Process field + match field { + "event" => event = CompactString::new(value), + "data" => { + data.push_str(value); + data.push('\n'); + } + "id" => { + if value.contains('\0') { + self.last_event_id = Some(CompactString::new(value)) + } + } + "retry" => { + if value.as_bytes().iter().all(|c| c.is_ascii_digit()) + && let Ok(int) = value.parse::<u64>() + { + self.reconnection_time = Some(int) + } + } + _ => continue, + } + } + None + } +} + +impl Default for SseClient { + fn default() -> Self { + Self::new() + } +} diff --git a/taler-cyclos/src/cyclos_api/client.rs b/taler-cyclos/src/cyclos_api/client.rs @@ -21,7 +21,7 @@ use serde_json::json; use taler_common::types::amount::Decimal; use crate::cyclos_api::{ - api::{ApiResult, CyclosAuth, CyclosRequest}, + api::{ApiResult, CyclosAuth, CyclosRequest, SseClient}, types::{Account, DataForTransaction, HistoryItem, Transaction, Transfer, User}, }; @@ -97,4 +97,16 @@ impl Client<'_> { .parse_json() .await } + + pub async fn push_notifications( + &self, + client_id: u64, + sse_client: &mut SseClient, + ) -> ApiResult<()> { + self.request(Method::GET, "push/subscribe") + .query("clientId", &client_id) + .query("kinds", "newNotification") + .into_sse(sse_client) + .await + } } diff --git a/taler-cyclos/src/cyclos_api/types.rs b/taler-cyclos/src/cyclos_api/types.rs @@ -211,6 +211,23 @@ pub enum AccountKind { System, User { user: UserInfo }, } +#[derive(Debug, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct NotificationStatus { + pub new_notifications: u32, + pub unread_notifications: u32, + pub notification: Notification, +} + +#[derive(Debug, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Notification { + pub id: CyclosId, + pub date: Timestamp, + #[serde(rename = "type")] + pub ty: NotificationType, + pub entity_type: Option<NotificationEntityType>, +} #[derive(Debug, serde::Deserialize)] #[serde(rename_all = "camelCase")] @@ -435,3 +452,275 @@ pub enum InputError { property_errors: BTreeMap<String, Vec<String>>, }, } + +#[derive(Debug, serde::Deserialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +/// The different notification types generated for users / administrators. +pub enum NotificationType { + /// A notification generated if a notification created when an advertisement is authorized + AdAuthorized, + /// A notification generated if a notification created when an advertisement expires + AdExpired, + /// A notification generated if a notification created by a new advertisement (Simple or Webshop) + AdInterestNotification, + /// A notification generated if an ad is pending by broker authorization + AdPendingAuthorization, + /// An admin notification generated if an advertisement is pending for authorization + AdPendingByAdminAuthorization, + /// A notification generated if a question answered to some AD (Simple or Webshop) + AdQuestionAnswered, + /// A notification generated if a question created to some AD (Simple or Webshop) + AdQuestionCreated, + /// A notification generated if a notification created when an advertisement authorization is rejected + AdRejected, + /// A notification generated if a user performed a new payment through a channel that is not the SMS channel + AllNonSmsPerformedPayments, + /// An admin notification generated if an application error has occurred + ApplicationError, + /// A notification generated if a webshop product is out of stock + ArticleOutOfStock, + /// A notification generated if the authorization of a payment was canceled. This notification is to be sent to the payer + AuthorizedPaymentCanceled, + /// A notification generated if the authorization of a payment was denied. This notification is to be sent to the payer + AuthorizedPaymentDenied, + /// A notification generated if the authorization of a payment has expired. This notification is to be sent to the payer + AuthorizedPaymentExpired, + /// A notification generated if the authorization of a payment succeeded (the payment went successfully through its final authorization and is now processed). This notification is to be sent to the payer + AuthorizedPaymentSucceeded, + /// Deprecated: Voucher notifications are no longer only for bought.. A notification generated if a one or more bought vouchers are about to expire + BoughtVouchersAboutToExpire, + /// Deprecated: Voucher notifications are no longer only for bought.. A notification generated if a bought voucher has new expiration date + BoughtVouchersExpirationDateChanged, + /// Deprecated: Voucher notifications are no longer only for bought.. A notification generated if one or more bought vouchers have expired + BoughtVouchersExpired, + /// A notification generated if a broker has been assigned to a user + BrokerAssigned, + /// A notification generated if a broker has been unassigned from a user + BrokerUnassigned, + /// A notification generated if the external payment has reached the expiration date + ExternalPaymentExpired, + /// A notification generated if the performed external payment has failed processing + ExternalPaymentPerformedFailed, + /// A notification generated if the received external payment has failed processing + ExternalPaymentReceivedFailed, + /// An admin notification generated if an external payment has expired + ExternalUserPaymentExpired, + /// An admin notification generated if an external payment failed processing + ExternalUserPaymentPerformedFailed, + /// A notification generated if a transaction feedback was modified + FeedbackChanged, + /// A notification generated if a transaction feedback was created + FeedbackCreated, + /// A notification generated if a transaction feedback is about to expire + FeedbackExpirationReminder, + /// A notification generated if a performed payment can have an optional feedback + FeedbackOptional, + /// A notification generated if a transaction feedback was replied + FeedbackReplyCreated, + /// A notification generated if a performed payment needs to be given a feedback + FeedbackRequired, + /// An admin notification generated if a voucher will expire in a few days + GeneratedVouchersAboutToExpire, + /// An admin notification generated if a voucher has expired + GeneratedVouchersExpired, + /// A notification generated if a recurring payment to a user has been canceled (only if the recurring payment is shown to receiver) + IncomingRecurringPaymentCanceled, + /// A notification generated if a recurring payment to a user has failed (only if the recurring payment is shown to receiver) + IncomingRecurringPaymentFailed, + /// A notification generated if a recurring payment to a user was received (only if the recurring payment is shown to receiver) + IncomingRecurringPaymentReceived, + /// A notification generated if a scheduled payment to a user has been canceled (only if the scheduled payment is shown to receiver) + IncomingScheduledPaymentCanceled, + /// A notification generated if a scheduled payment to a user has failed (only if the scheduled payment is shown to receiver) + IncomingScheduledPaymentFailed, + /// A notification generated if a scheduled payment to a user was received (only if the scheduled payment is shown to receiver) + IncomingScheduledPaymentReceived, + /// A notification generated if a limit (lower/upper) has changed on an account + LimitChange, + /// A notification generated if a product with stock quantity under limit + LowStockQuantity, + /// A notification generated if the maximum number of SMS messages per month has been reached + MaxSmsPerMonthReached, + /// A notification generated if an user has been assigned to a broker + MemberAssigned, + /// A notification generated if an user has been unassigned from a broker + MemberUnassigned, + /// An admin notification generated if a network is created + NetworkCreated, + /// A notification generated if a token / card has been created + NewToken, + /// A notification generated if a token / card has been created, but needs to be activated before being used + NewTokenPendingActivation, + /// A notification generated if a payment performed by an operator with authorization type + OperatorAuthorizedPaymentApprovedStillPending, + /// A notification generated if a payment performed by an operator with authorization type + OperatorAuthorizedPaymentCanceled, + /// A notification generated if a payment performed by an operator with authorization type + OperatorAuthorizedPaymentDenied, + /// A notification generated if a payment performed by an operator with authorization type + OperatorAuthorizedPaymentExpired, + /// A notification generated if a payment performed by an operator with authorization type + OperatorAuthorizedPaymentSucceeded, + /// A notification generated if a payment performed by an operator with authorization type + OperatorPaymentAwaitingAuthorization, + /// A notification generated if a pending order has been canceled + OrderCanceledBuyer, + /// A notification generated if a pending order has been canceled + OrderCanceledSeller, + /// A notification generated if a new web shop order created from a shopping cart checkout + OrderCreated, + /// A notification generated if an order payment was canceled by authorizer + OrderPaymentCanceledBuyer, + /// A notification generated if an order payment was canceled by authorizer + OrderPaymentCanceledSeller, + /// A notification generated if an order payment was denied by authorizer + OrderPaymentDeniedBuyer, + /// A notification generated if an order payment was denied by authorizer + OrderPaymentDeniedSeller, + /// A notification generated if an order payment has automatically expired + OrderPaymentExpiredBuyer, + /// A notification generated if an order payment has automatically expired + OrderPaymentExpiredSeller, + /// A notification generated if an order accepted by buyer/seller but the payment is pending for authorization + OrderPendingAuthorizationBuyer, + /// A notification generated if an order accepted by buyer/seller but the payment is pending for authorization + OrderPendingAuthorizationSeller, + /// A notification generated if an order pending buyer approval + OrderPendingBuyer, + /// A notification generated if an order buyer needs to fill in the delivery data + OrderPendingDeliveryDataBuyer, + /// A notification generated if an order seller needs to fill in the delivery data + OrderPendingDeliveryDataSeller, + /// A notification generated if an order accepted by buyer (sent to seller) + OrderRealizedBuyer, + /// A notification generated if an order accepted by seller (sent to buyer) + OrderRealizedSeller, + /// A notification generated if an order rejected by buyer + OrderRejectedByBuyer, + /// A notification generated if an order rejected by seller + OrderRejectedBySeller, + /// A notification generated if a password status has changed + PasswordStatusChanged, + /// An admin notification generated if a payment is awaiting for authorization + PaymentAwaitingAdminAuthorization, + /// A notification generated if a user must authorize a pending payment + PaymentAwaitingAuthorization, + /// An admin notification generated if a payment is performed + PaymentPerformed, + /// A notification generated if a payment performed from a user is charged back + PaymentPerformedChargedBack, + /// A notification generated if a user received a new payment + PaymentReceived, + /// A notification generated if a payment received by a user is charged back + PaymentReceivedChargedBack, + /// A notification generated if a payment request was canceled + PaymentRequestCanceled, + /// A notification generated if a payment request was denied + PaymentRequestDenied, + /// A notification generated if the payment request's expiration date has changed + PaymentRequestExpirationDateChanged, + /// A notification generated if a payment request has expired + PaymentRequestExpired, + /// A notification generated if a payment request was processed + PaymentRequestProcessed, + /// A notification generated if a payment request was received + PaymentRequestReceived, + /// A notification generated if a recurring payment from a user has failed (probably because of lack of funds) + RecurringPaymentFailed, + /// A notification generated if an occurrence of an outgoing recurring payment was processed + RecurringPaymentOccurrenceProcessed, + /// A notification generated if a reference was modified + ReferenceChanged, + /// A notification generated if a reference has been set + ReferenceCreated, + /// A notification generated if a sale pending buyer approval + SalePendingBuyer, + /// A notification generated if a sale accepted by buyer (sent to seller) + SaleRealizedBuyer, + /// A notification generated if a sale rejected by seller + SaleRejectedSeller, + /// A notification generated if a scheduled payment from a user has failed (probably because of lack of funds) + ScheduledPaymentFailed, + /// A notification generated if a scheduled payment to a user has been processed + ScheduledPaymentInstallmentProcessed, + /// A notification generated if a payment request which was scheduled has failed processing (probably because of lack of funds), and is being reopened + ScheduledPaymentRequestFailed, + /// A notification generated if the payment request's expiration date has changed. This notification is to be sent to the sender + SentPaymentRequestExpirationDateChanged, + /// A notification generated if a user performed a new payment through SMS + SmsPerformedPayment, + /// An admin notification generated if a system alert as occurred + SystemAlert, + /// A notification generated if the invocation of a webhook after (a successful) ticket approval has failed + TicketWebhookFailed, + /// A notification generated if a token / card status has changed + TokenStatusChanged, + /// An admin notification generated if a member alert as occurred + UserAlert, + /// An admin notification generated if a user import has been done + UserImport, + /// An admin notification generated if a new user has been registered + UserRegistration, + /// A notification generated if a user status has changed + UserStatusChanged, + /// A notification generated if a one or more bought vouchers are about to expire + VoucherAboutToExpire, + /// A notification generated when a voucher was assigned to the user + VoucherAssigned, + /// An admin notification generated if a voucher type allowing buy is about to expire + VoucherBuyingAboutToExpire, + /// A notification generated if a bought voucher has new expiration date + VoucherExpirationDateChanged, + /// A notification generated if one or more bought vouchers have expired + VoucherExpired, + /// A voucher PIN was blocked by exceeding invalid attempts + VoucherPinBlocked, + /// A voucher was redeemed + VoucherRedeem, + /// A voucher was topped-up + VoucherTopUp, +} + +#[derive(Debug, serde::Deserialize, PartialEq, Eq)] +#[serde(rename_all = "camelCase")] +/// The type of the entity referenced by the notification, if any. +pub enum NotificationEntityType { + /// The entity is an user account + Account, + /// The entity is an advertisement question + AdQuestion, + /// The entity is an error log + ErrorLog, + /// The entity is a transaction feedback + Feedback, + /// The entity is a + Marketplace, + /// The entity is a network + Network, + /// The entity is an order + Order, + /// The entity is a password type + PasswordType, + /// The entity is an user reference + Reference, + /// The entity is a system alert + SystemAlert, + /// The entity is a token (user identification) + Token, + /// The entity is a transaction + Transaction, + /// The entity is a transfer + Transfer, + /// The entity is an user + User, + /// The entity is an user alert + UserAlert, + /// The entity is an user imported file + UserImportedFile, + /// The entity is a voucher + Voucher, + /// The entity is a voucher transaction (redeem or top-up) + VoucherTransaction, + /// The entity is a voucher type + VoucherType, +} diff --git a/taler-cyclos/src/lib.rs b/taler-cyclos/src/lib.rs @@ -30,6 +30,7 @@ pub mod config; pub mod constants; pub mod cyclos_api; pub mod db; +pub mod notification; pub mod setup; pub mod worker; @@ -74,9 +75,7 @@ impl FromStr for CyclosId { type Err = CyclosIdError; fn from_str(s: &str) -> Result<Self, Self::Err> { - Ok(Self( - u64::from_str(s.trim_start_matches('-')).map_err(CyclosIdError)?, - )) + Ok(Self(u64::from_str(s).map_err(CyclosIdError)?)) } } diff --git a/taler-cyclos/src/main.rs b/taler-cyclos/src/main.rs @@ -99,10 +99,10 @@ async fn run(cmd: Command, cfg: &Config) -> anyhow::Result<()> { run_serve(cfg, pool).await?; } } - Command::Worker { transient: _ } => { + Command::Worker { transient } => { let pool = pool(cfg).await?; let client = reqwest::Client::new(); - run_worker(cfg, &pool, &client).await?; + run_worker(cfg, &pool, &client, transient).await?; } Command::Config(cfg_cmd) => cfg_cmd.run(cfg)?, Command::TalerDeployment(cmd) => match cmd { diff --git a/taler-cyclos/src/notification.rs b/taler-cyclos/src/notification.rs @@ -0,0 +1,66 @@ +/* + This file is part of TALER + Copyright (C) 2025 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use jiff::Timestamp; +use taler_common::ExpoBackoffDecorr; +use tokio::sync::Notify; +use tracing::{debug, error, trace}; + +use crate::cyclos_api::{ + api::SseClient, + client::Client, + types::{NotificationEntityType, NotificationStatus}, +}; + +pub async fn watch_notification(client: &Client<'_>, notify: &Notify) -> ! { + let client_id = Timestamp::now().as_microsecond() as u64; + let mut sse_client = SseClient::new(); + let mut jitter = ExpoBackoffDecorr::default(); + loop { + let res: anyhow::Result<()> = async { + loop { + // Register listener + client + .push_notifications(client_id, &mut sse_client) + .await?; + jitter.reset(); + // Read available ones + while let Some(message) = sse_client.next().await { + let msg = message?; + trace!(target: "notification", "new message {}: {}", msg.event, msg.data); + if msg.event == "newNotification" { + let deserializer = &mut serde_json::Deserializer::from_str(&msg.data); + let status: NotificationStatus = + serde_path_to_error::deserialize(deserializer)?; + debug!(target: "notification", "new notification {} {:?} {:?}", status.notification.id, status.notification.ty, status.notification.entity_type); + if status.notification.entity_type == Some(NotificationEntityType::Transfer) + { + notify.notify_one(); + } + + // Find a way to buffer all transactions + } + } + + tokio::time::sleep( jitter.backoff()).await; + } + } + .await; + let err = res.unwrap_err(); + error!(target: "notification", "{err}"); + tokio::time::sleep( jitter.backoff()).await; + } +} diff --git a/taler-cyclos/src/setup.rs b/taler-cyclos/src/setup.rs @@ -106,39 +106,45 @@ pub async fn setup(cfg: &Config, _reset: bool, client: &reqwest::Client) -> anyh cfg.currency, currency.ty.name ); } + } - // Check payment type - let data = client.payment_data().await?; - let payment_fmt = { - let mut s = String::new(); - for p in &data.payment_types { + // Check payment type + let data = client.payment_data().await?; + let payment_fmt = { + let mut s = String::new(); + for p in &data.payment_types { + if let Some(currency) = currency { if p.currency.ty.id == currency.ty.id { - write!( - &mut s, - "\n{} '{}' in {}", - p.ty.id, p.ty.name, p.currency.ty.name - ) - .unwrap(); + write!(&mut s, "\n{} '{}'", p.ty.id, p.ty.name).unwrap(); } + } else { + write!( + &mut s, + "\n{} '{}' in {}", + p.ty.id, p.ty.name, p.currency.ty.name + ) + .unwrap(); } - s - }; - if let Some(id) = cfg.payment_type_id { - if let Some(p) = data.payment_types.iter().find(|it| it.ty.id == id) { - if p.currency.ty.id != currency.ty.id { - error!(target: "setup", "PAYMENT_TYPE_ID {} in config use currency {} expected {}, must be one of:{payment_fmt}", p.ty.id, p.currency.ty.name, currency.ty.name); - ready = false - } else { - debug!(target: "setup", "PAYMENT_TYPE_ID {id} config is one of:{payment_fmt}"); - } + } + s + }; + if let Some(id) = cfg.payment_type_id { + if let Some(p) = data.payment_types.iter().find(|it| it.ty.id == id) { + if let Some(currency) = currency + && p.currency.ty.id != currency.ty.id + { + error!(target: "setup", "PAYMENT_TYPE_ID {} in config use currency {} expected {}, must be one of:{payment_fmt}", p.ty.id, p.currency.ty.name, currency.ty.name); + ready = false } else { - error!(target: "setup", "Unknown PAYMENT_TYPE_ID {id} in config, must be one of:{payment_fmt}"); - ready = false; + debug!(target: "setup", "PAYMENT_TYPE_ID {id} config is one of:{payment_fmt}"); } } else { - error!(target: "setup", "Missing PAYMENT_TYPE_ID in config, must be one of:{payment_fmt}"); + error!(target: "setup", "Unknown PAYMENT_TYPE_ID {id} in config, must be one of:{payment_fmt}"); ready = false; } + } else { + error!(target: "setup", "Missing PAYMENT_TYPE_ID in config, must be one of:{payment_fmt}"); + ready = false; } if ready { diff --git a/taler-cyclos/src/worker.rs b/taler-cyclos/src/worker.rs @@ -23,6 +23,7 @@ use taler_common::{ config::Config, types::amount::{self, Currency}, }; +use tokio::{join, sync::Notify}; use tracing::{debug, error, info, trace, warn}; use crate::{ @@ -37,6 +38,7 @@ use crate::{ db::{ self, AddIncomingResult, ChargebackFailureResult, RegisterResult, TxIn, TxOut, TxOutKind, }, + notification::watch_notification, }; #[derive(Debug, thiserror::Error)] @@ -54,58 +56,91 @@ pub type WorkerResult = Result<(), WorkerError>; pub async fn run_worker( cfg: &Config, pool: &PgPool, - client: &reqwest::Client, + http_client: &reqwest::Client, + transient: bool, ) -> anyhow::Result<()> { let cfg = WorkerCfg::parse(cfg)?; let client = Client { - client, + client: http_client, api_url: &cfg.host.api_url, auth: &CyclosAuth::Basic { username: cfg.host.username, password: cfg.host.password, }, }; - let mut jitter = ExpoBackoffDecorr::default(); - loop { - let res: WorkerResult = async { - let db = &mut PgListener::connect_with(pool).await?; + if transient { + let mut conn = pool.acquire().await?; + Worker { + client: &client, + db: &mut *conn, + account_type_id: *cfg.account_type_id, + payment_type_id: *cfg.payment_type_id, + account_type: cfg.account_type, + currency: cfg.currency.clone(), + } + .run() + .await?; + return Ok(()); + } - // TODO take a postgresql lock ? + let notification = Notify::new(); - // Listen to all channels - db.listen_all(["transfer"]).await?; + let watcher = async { + watch_notification(&client, &notification).await; + }; + let worker = async { + let mut jitter = ExpoBackoffDecorr::default(); + loop { + let res: WorkerResult = async { + let db = &mut PgListener::connect_with(pool).await?; - loop { - debug!(target: "worker", "running"); + // TODO take a postgresql lock ? - Worker { - client: &client, - db: db.acquire().await?, - account_type_id: *cfg.account_type_id, - payment_type_id: *cfg.payment_type_id, - account_type: cfg.account_type, - currency: cfg.currency.clone(), - } - .run() - .await?; - jitter.reset(); + // Listen to all channels + db.listen_all(["transfer"]).await?; + + info!(target: "worker", "running at initialisation"); - // Wait for notifications or sync timeout - if let Ok(res) = tokio::time::timeout(WORKER_FREQUENCY, db.try_recv()).await { - let mut ntf = res?; - // Conflate all notifications - while let Some(n) = ntf { - debug!(target: "worker", "notification from {}", n.channel()); - ntf = db.next_buffered(); + loop { + Worker { + client: &client, + db: db.acquire().await?, + account_type_id: *cfg.account_type_id, + payment_type_id: *cfg.payment_type_id, + account_type: cfg.account_type, + currency: cfg.currency.clone(), } + .run() + .await?; + jitter.reset(); + + tokio::select! { + _ = tokio::time::sleep(WORKER_FREQUENCY) => { + info!(target: "worker", "running at frequency"); + } + res = db.try_recv() => { + let mut ntf = res?; + // Conflate all notifications + while let Some(n) = ntf { + debug!(target: "worker", "notification from {}", n.channel()); + ntf = db.next_buffered(); + } + info!(target: "worker", "running at db trigger"); + } + _ = notification.notified() => { + info!(target: "worker", "running at notification trigger"); + } + }; } } + .await; + let err = res.unwrap_err(); + error!(target: "worker", "{err}"); + tokio::time::sleep(jitter.backoff()).await; } - .await; - let err = res.unwrap_err(); - error!(target: "worker", "{err}"); - tokio::time::sleep(jitter.backoff()).await; - } + }; + join!(watcher, worker); // TODO try_join + Ok(()) } pub struct Worker<'a> {