commit b6bbec29bc0ff1b5361cc3a9347efaeb31f2f176
parent 5b3c3146dc8e8fd24fac5e3da7a6d6975fad51fd
Author: Antoine A <>
Date: Sun, 15 Feb 2026 18:53:44 +0100
cyclos: improve error handling
Diffstat:
1 file changed, 40 insertions(+), 25 deletions(-)
diff --git a/taler-cyclos/src/worker.rs b/taler-cyclos/src/worker.rs
@@ -35,7 +35,7 @@ use crate::{
cyclos_api::{
api::{CyclosAuth, CyclosErr},
client::Client,
- types::{AccountKind, HistoryItem, NotFoundError, OrderBy},
+ types::{AccountKind, HistoryItem, InputError, NotFoundError, OrderBy},
},
db::{
self, AddIncomingResult, ChargebackFailureResult, RegisterResult, TxIn, TxOut, TxOutKind,
@@ -95,16 +95,36 @@ pub async fn run_worker(
};
let worker = async {
let mut jitter = ExpoBackoffDecorr::default();
+ let mut skip_notifications: bool = true;
loop {
+ info!(target: "worker", "running at initialisation");
let res: WorkerResult = async {
let db = &mut PgListener::connect_with(pool).await?;
// Listen to all channels
db.listen_all(["transfer"]).await?;
- info!(target: "worker", "running at initialisation");
-
loop {
+ if !skip_notifications {
+ tokio::select! {
+ _ = tokio::time::sleep(cfg.frequency) => {
+ info!(target: "worker", "running at frequency");
+ }
+ res = db.try_recv() => {
+ let mut ntf = res?;
+ // Conflate all notifications
+ while let Some(n) = ntf {
+ debug!(target: "worker", "notification from {}", n.channel());
+ ntf = db.next_buffered();
+ }
+ info!(target: "worker", "running at db trigger");
+ }
+ _ = notification.notified() => {
+ info!(target: "worker", "running at notification trigger");
+ }
+ };
+ }
+ skip_notifications = false;
Worker {
client: &client,
db: db.acquire().await?,
@@ -116,34 +136,29 @@ pub async fn run_worker(
.run()
.await?;
jitter.reset();
-
- tokio::select! {
- _ = tokio::time::sleep(cfg.frequency) => {
- info!(target: "worker", "running at frequency");
- }
- res = db.try_recv() => {
- let mut ntf = res?;
- // Conflate all notifications
- while let Some(n) = ntf {
- debug!(target: "worker", "notification from {}", n.channel());
- ntf = db.next_buffered();
- }
- info!(target: "worker", "running at db trigger");
- }
- _ = notification.notified() => {
- info!(target: "worker", "running at notification trigger");
- }
- };
}
}
.await;
let err = res.unwrap_err();
error!(target: "worker", "{err}");
- if matches!(err, WorkerError::Concurrency) {
- // This error won't resolve by itself easily and it mean we are actually making progress
- // in another worker so we can jitter more aggressively
- tokio::time::sleep(Duration::from_secs(15)).await;
+ match err {
+ WorkerError::Concurrency => {
+ // This error won't resolve by itself easily and it mean we are actually making progress
+ // in another worker so we can jitter more aggressively
+ tokio::time::sleep(Duration::from_secs(15)).await;
+ skip_notifications = false;
+ }
+ WorkerError::Api(ApiErr {
+ ctx: _,
+ err: CyclosErr::Input(InputError::Validation { .. }),
+ }) => {
+ // In case of validation failure we do not want to retry right away as it can DOS the service
+ skip_notifications = false;
+ }
+ WorkerError::Api(_) | WorkerError::Db(_) | WorkerError::Injected(_) => {
+ skip_notifications = true;
+ }
}
tokio::time::sleep(jitter.backoff()).await;
}