commit b826430a32e7a8c9f909ef072658329977a15e93
parent c9a55ed01615c3e28c4c2921aea585f720444a61
Author: Antoine A <>
Date: Tue, 30 Dec 2025 11:39:51 +0100
common: worker lock and worker frequency config
Diffstat:
18 files changed, 182 insertions(+), 91 deletions(-)
diff --git a/common/taler-common/src/config.rs b/common/taler-common/src/config.rs
@@ -20,9 +20,11 @@ use std::{
os::unix::fs::PermissionsExt,
path::PathBuf,
str::FromStr,
+ time::Duration,
};
use indexmap::IndexMap;
+use jiff::SignedDuration;
use url::Url;
use crate::types::{
@@ -866,6 +868,14 @@ impl<'cfg, 'arg> Section<'cfg, 'arg> {
pub fn date(&self, option: &'arg str) -> Value<'arg, jiff::civil::Date> {
self.parse("Date", option)
}
+
+    /**
+     * Access [option] as a non-negative duration
+     *
+     * The value is parsed as a [SignedDuration] and then converted to a
+     * [Duration]. Negative values are reported as a parsing error.
+     */
+    pub fn duration(&self, option: &'arg str) -> Value<'arg, Duration> {
+        self.value("temporal", option, |it| {
+            let tmp = SignedDuration::from_str(it).map_err(|e| e.to_string())?;
+            // Checked conversion: an `as u64` cast of the millisecond count would
+            // silently wrap a negative duration into a huge positive one.
+            Duration::try_from(tmp).map_err(|e| e.to_string())
+        })
+    }
}
pub struct Value<'arg, T> {
diff --git a/debian/etc/taler-cyclos/conf.d/cyclos-worker.conf b/debian/etc/taler-cyclos/conf.d/cyclos-worker.conf
@@ -1,6 +1,9 @@
# Configuration the cyclos adapter worker.
[cyclos-worker]
+# How often should worker run when no notification is received
+# FREQUENCY =
+
# Cyclos account type ID to index
ACCOUNT_TYPE_ID =
diff --git a/debian/etc/taler-magnet-bank/conf.d/magnet-bank-worker.conf b/debian/etc/taler-magnet-bank/conf.d/magnet-bank-worker.conf
@@ -1,5 +1,8 @@
# Configuration the magnet bank adapter worker.
[magnet-bank-worker]
+# How often should worker run
+# FREQUENCY =
+
KEYS_FILE = ${MAGNET_BANK_HOME}/keys.json
@inline-secret@ magnet-bank-worker ../secrets/magnet-bank-worker.secret.conf
diff --git a/taler-cyclos/cyclos.conf b/taler-cyclos/cyclos.conf
@@ -9,6 +9,9 @@ ACCOUNT_ID =
NAME =
[cyclos-worker]
+# How often should worker run when no notification is received
+FREQUENCY = 30m
+
# URL of the Cyclos API server
API_URL =
diff --git a/taler-cyclos/src/bin/cyclos-harness.rs b/taler-cyclos/src/bin/cyclos-harness.rs
@@ -554,7 +554,6 @@ async fn online_harness(config: &Config, reset: bool) -> anyhow::Result<()> {
tokio::spawn(async move { run_worker(&config, &pool, &client, false).await })
};
tokio::time::sleep(Duration::from_secs(5)).await;
- harness.worker().await.unwrap();
let now = Timestamp::now();
let balance = &mut Balances::new(&harness).await;
diff --git a/taler-cyclos/src/config.rs b/taler-cyclos/src/config.rs
@@ -14,6 +14,8 @@
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
+use std::time::Duration;
+
use reqwest::Url;
use taler_api::{
Serve,
@@ -126,6 +128,7 @@ impl SetupCfg {
/// taler-cyclos worker config
pub struct WorkerCfg {
pub currency: Currency,
+ pub frequency: Duration,
pub host: HostCfg,
pub account_type: AccountType,
pub account_type_id: CyclosId,
@@ -138,6 +141,7 @@ impl WorkerCfg {
let sect = cfg.section("cyclos-worker");
Ok(Self {
currency: main_sect.parse("currency", "CURRENCY").require()?,
+ frequency: sect.duration("FREQUENCY").require()?,
account_type: map_config!(sect, "account type", "ACCOUNT_TYPE",
"exchange" => { Ok(AccountType::Exchange) },
"normal" => { Ok(AccountType::Normal) }
diff --git a/taler-cyclos/src/constants.rs b/taler-cyclos/src/constants.rs
@@ -14,9 +14,6 @@
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
-use std::time::Duration;
-
use taler_common::config::parser::ConfigSource;
pub const CONFIG_SOURCE: ConfigSource = ConfigSource::new("taler-cyclos", "cyclos", "taler-cyclos");
-pub const WORKER_FREQUENCY: Duration = Duration::from_secs(600);
diff --git a/taler-cyclos/src/db.rs b/taler-cyclos/src/db.rs
@@ -185,6 +185,14 @@ pub enum AddIncomingResult {
ReservePubReuse,
}
+/// Try to take the worker advisory lock on this database connection
+///
+/// Runs `pg_try_advisory_lock(42)` on `e` and returns the boolean it
+/// reports: `true` when the lock was obtained, `false` otherwise.
+pub async fn worker_lock(e: &mut PgConnection) -> sqlx::Result<bool> {
+    let acquired: bool = sqlx::query_scalar("SELECT pg_try_advisory_lock(42)")
+        .fetch_one(e)
+        .await?;
+    Ok(acquired)
+}
+
+
pub async fn register_tx_in_admin(
db: &PgPool,
tx: &TxInAdmin,
diff --git a/taler-cyclos/src/notification.rs b/taler-cyclos/src/notification.rs
@@ -61,6 +61,6 @@ pub async fn watch_notification(client: &Client<'_>, notify: &Notify) -> ! {
.await;
let err = res.unwrap_err();
error!(target: "notification", "{err}");
- tokio::time::sleep( jitter.backoff()).await;
+ tokio::time::sleep(jitter.backoff()).await;
}
}
diff --git a/taler-cyclos/src/worker.rs b/taler-cyclos/src/worker.rs
@@ -14,6 +14,8 @@
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
+use std::time::Duration;
+
use failure_injection::{InjectedErr, fail_point};
use jiff::Timestamp;
use sqlx::{Acquire as _, PgConnection, PgPool, postgres::PgListener};
@@ -29,7 +31,6 @@ use tracing::{debug, error, info, trace, warn};
use crate::{
FullCyclosPayto,
config::{AccountType, WorkerCfg},
- constants::WORKER_FREQUENCY,
cyclos_api::{
api::{ApiErr, CyclosAuth, ErrKind},
client::Client,
@@ -47,6 +48,8 @@ pub enum WorkerError {
Db(#[from] sqlx::Error),
#[error(transparent)]
Api(#[from] ApiErr),
+ #[error("Another worker is running concurrently")]
+ Concurrency,
#[error(transparent)]
Injected(#[from] InjectedErr),
}
@@ -56,12 +59,12 @@ pub type WorkerResult = Result<(), WorkerError>;
pub async fn run_worker(
cfg: &Config,
pool: &PgPool,
- http_client: &reqwest::Client,
+ client: &reqwest::Client,
transient: bool,
) -> anyhow::Result<()> {
let cfg = WorkerCfg::parse(cfg)?;
let client = Client {
- client: http_client,
+ client,
api_url: &cfg.host.api_url,
auth: &CyclosAuth::Basic {
username: cfg.host.username,
@@ -72,7 +75,7 @@ pub async fn run_worker(
let mut conn = pool.acquire().await?;
Worker {
client: &client,
- db: &mut *conn,
+ db: &mut conn,
account_type_id: *cfg.account_type_id,
payment_type_id: *cfg.payment_type_id,
account_type: cfg.account_type,
@@ -94,8 +97,6 @@ pub async fn run_worker(
let res: WorkerResult = async {
let db = &mut PgListener::connect_with(pool).await?;
- // TODO take a postgresql lock ?
-
// Listen to all channels
db.listen_all(["transfer"]).await?;
@@ -115,7 +116,7 @@ pub async fn run_worker(
jitter.reset();
tokio::select! {
- _ = tokio::time::sleep(WORKER_FREQUENCY) => {
+ _ = tokio::time::sleep(cfg.frequency) => {
info!(target: "worker", "running at frequency");
}
res = db.try_recv() => {
@@ -136,6 +137,12 @@ pub async fn run_worker(
.await;
let err = res.unwrap_err();
error!(target: "worker", "{err}");
+
+ if matches!(err, WorkerError::Concurrency) {
+ // This error won't resolve by itself easily and it means we are actually making progress
+ // in another worker, so we can back off more aggressively
+ tokio::time::sleep(Duration::from_secs(15)).await;
+ }
tokio::time::sleep(jitter.backoff()).await;
}
};
@@ -155,6 +162,12 @@ pub struct Worker<'a> {
impl Worker<'_> {
/// Run a single worker pass
pub async fn run(&mut self) -> WorkerResult {
+ // Some worker operations are not idempotent, therefore it's not safe to have multiple workers
+ // running concurrently. We use a global Postgres advisory lock to prevent it.
+ if !db::worker_lock(self.db).await? {
+ return Err(WorkerError::Concurrency);
+ };
+
// Sync transactions
//let mut next: Option<Next> = None; //kv_get(&mut *self.db, TXS_CURSOR_KEY).await?; TODO cursor logic is broken and cannot be stored & reused
diff --git a/taler-magnet-bank/magnet-bank.conf b/taler-magnet-bank/magnet-bank.conf
@@ -6,6 +6,9 @@ IBAN =
NAME =
[magnet-bank-worker]
+# How often should worker run
+FREQUENCY = 1m
+
# URL of the Magnet Bank API server
API_URL = "https://mobil.magnetbank.hu"
diff --git a/taler-magnet-bank/src/bin/magnet-bank-harness.rs b/taler-magnet-bank/src/bin/magnet-bank-harness.rs
@@ -42,9 +42,8 @@ use taler_magnet_bank::{
client::{ApiClient, AuthClient},
types::{Account, Direction, Order, TxDto, TxStatus},
},
- run_worker,
setup::{self, Keys},
- worker::{Worker, WorkerError, WorkerResult},
+ worker::{Worker, WorkerError, WorkerResult, run_worker},
};
// TODO macro for retry/expect logic
@@ -536,7 +535,7 @@ async fn online_harness(config: &Config, reset: bool) -> anyhow::Result<()> {
let client = client.clone();
let pool = pool.clone();
let config = config.clone();
- tokio::spawn(async move { run_worker(&config, &pool, &client).await })
+ tokio::spawn(async move { run_worker(&config, &pool, &client, false).await })
};
tokio::time::sleep(Duration::from_secs(25)).await;
diff --git a/taler-magnet-bank/src/config.rs b/taler-magnet-bank/src/config.rs
@@ -14,6 +14,8 @@
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
+use std::time::Duration;
+
use jiff::civil::Date;
use reqwest::Url;
use taler_api::{
@@ -77,6 +79,7 @@ pub enum AccountType {
/// taler-magnet-bank worker config
pub struct WorkerCfg {
pub payto: FullHuPayto,
+ pub frequency: Duration,
pub api_url: Url,
pub consumer: Token,
pub keys_path: String,
@@ -91,6 +94,7 @@ impl WorkerCfg {
let sect = cfg.section("magnet-bank-worker");
Ok(Self {
payto,
+ frequency: sect.duration("FREQUENCY").require()?,
account_type: map_config!(sect, "account type", "ACCOUNT_TYPE",
"exchange" => { Ok(AccountType::Exchange) },
"normal" => { Ok(AccountType::Normal) }
diff --git a/taler-magnet-bank/src/constants.rs b/taler-magnet-bank/src/constants.rs
@@ -14,7 +14,7 @@
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
-use std::{sync::LazyLock, time::Duration};
+use std::sync::LazyLock;
use taler_common::{config::parser::ConfigSource, types::amount::Currency};
@@ -22,4 +22,3 @@ pub static CURRENCY: LazyLock<Currency> = LazyLock::new(|| "HUF".parse().unwrap(
pub const MAX_MAGNET_BBAN_SIZE: usize = 24;
pub const CONFIG_SOURCE: ConfigSource =
ConfigSource::new("taler-magnet-bank", "magnet-bank", "taler-magnet-bank");
-pub const WORKER_FREQUENCY: Duration = Duration::from_secs(60);
diff --git a/taler-magnet-bank/src/db.rs b/taler-magnet-bank/src/db.rs
@@ -167,6 +167,14 @@ pub struct TxInAdmin {
pub metadata: IncomingSubject,
}
+/// Try to take the worker advisory lock on this database connection
+///
+/// Runs `pg_try_advisory_lock(42)` on `e` and returns the boolean it
+/// reports: `true` when the lock was obtained, `false` otherwise.
+pub async fn worker_lock(e: &mut PgConnection) -> sqlx::Result<bool> {
+    let acquired: bool = sqlx::query_scalar("SELECT pg_try_advisory_lock(42)")
+        .fetch_one(e)
+        .await?;
+    Ok(acquired)
+}
+
+
#[derive(Debug, PartialEq, Eq)]
pub enum AddIncomingResult {
Success {
diff --git a/taler-magnet-bank/src/lib.rs b/taler-magnet-bank/src/lib.rs
@@ -16,25 +16,17 @@
use std::{borrow::Cow, str::FromStr, sync::Arc};
-use sqlx::{Acquire, PgPool, postgres::PgListener};
+use sqlx::PgPool;
use taler_api::api::{Router, TalerRouter as _};
use taler_common::{
- ExpoBackoffDecorr,
config::Config,
types::{
iban::{Country, IBAN, IbanErrorKind, ParseIbanError},
payto::{FullPayto, IbanPayto, Payto, PaytoErr, PaytoImpl, PaytoURI, TransferPayto},
},
};
-use tracing::{debug, error};
-use crate::{
- api::MagnetApi,
- config::{ServeCfg, WorkerCfg},
- constants::WORKER_FREQUENCY,
- magnet_api::client::AuthClient,
- worker::{Worker, WorkerResult},
-};
+use crate::{api::MagnetApi, config::ServeCfg};
pub mod api;
pub mod config;
@@ -59,60 +51,6 @@ pub async fn run_serve(cfg: &Config, pool: PgPool) -> anyhow::Result<()> {
Ok(())
}
-pub async fn run_worker(
- cfg: &Config,
- pool: &PgPool,
- client: &reqwest::Client,
-) -> anyhow::Result<()> {
- let cfg = WorkerCfg::parse(cfg)?;
- let keys = setup::load(&cfg)?;
- let client = AuthClient::new(client, &cfg.api_url, &cfg.consumer).upgrade(&keys.access_token);
- let mut jitter = ExpoBackoffDecorr::default();
- // TODO run in loop and handle errors
- loop {
- let res: WorkerResult = async {
- let account = client.account(cfg.payto.bban()).await?;
- let db = &mut PgListener::connect_with(pool).await?;
-
- // TODO take a postgresql lock ?
-
- // Listen to all channels
- db.listen_all(["transfer"]).await?;
-
- loop {
- debug!(target: "worker", "running");
- Worker {
- client: &client,
- db: db.acquire().await?,
- account_number: &account.number,
- account_code: account.code,
- key: &keys.signing_key,
- account_type: cfg.account_type,
- ignore_tx_before: cfg.ignore_tx_before,
- ignore_bounces_before: cfg.ignore_bounces_before,
- }
- .run()
- .await?;
- jitter.reset();
-
- // Wait for notifications or sync timeout
- if let Ok(res) = tokio::time::timeout(WORKER_FREQUENCY, db.try_recv()).await {
- let mut ntf = res?;
- // Conflate all notifications
- while let Some(n) = ntf {
- debug!(target: "worker", "notification from {}", n.channel());
- ntf = db.next_buffered();
- }
- }
- }
- }
- .await;
- let err = res.unwrap_err();
- error!(target: "worker", "{err}");
- tokio::time::sleep(jitter.backoff()).await;
- }
-}
-
#[derive(
Debug, Clone, PartialEq, Eq, serde_with::DeserializeFromStr, serde_with::SerializeDisplay,
)]
diff --git a/taler-magnet-bank/src/main.rs b/taler-magnet-bank/src/main.rs
@@ -24,7 +24,8 @@ use taler_magnet_bank::{
constants::CONFIG_SOURCE,
db::{dbinit, pool},
dev::{self, DevCmd},
- run_serve, run_worker, setup,
+ run_serve, setup,
+ worker::run_worker,
};
#[derive(clap::Parser, Debug)]
@@ -102,10 +103,10 @@ async fn run(cmd: Command, cfg: &Config) -> anyhow::Result<()> {
run_serve(cfg, pool).await?;
}
}
- Command::Worker { transient: _ } => {
+ Command::Worker { transient } => {
let pool = pool(cfg).await?;
let client = reqwest::Client::new();
- run_worker(cfg, &pool, &client).await?;
+ run_worker(cfg, &pool, &client, transient).await?;
}
Command::Config(cfg_cmd) => cfg_cmd.run(cfg)?,
Command::Dev(dev_cmd) => dev::dev(cfg, dev_cmd).await?,
diff --git a/taler-magnet-bank/src/worker.rs b/taler-magnet-bank/src/worker.rs
@@ -14,28 +14,33 @@
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
-use std::num::ParseIntError;
+use std::{num::ParseIntError, time::Duration};
use failure_injection::{InjectedErr, fail_point};
use jiff::{Timestamp, Zoned, civil::Date};
use p256::ecdsa::SigningKey;
-use sqlx::PgConnection;
+use sqlx::{Acquire as _, PgConnection, PgPool, postgres::PgListener};
use taler_api::subject::{self, parse_incoming_unstructured};
-use taler_common::types::{
- amount::{self},
- iban::IBAN,
+use taler_common::{
+ ExpoBackoffDecorr,
+ config::Config,
+ types::{
+ amount::{self},
+ iban::IBAN,
+ },
};
use tracing::{debug, error, info, trace, warn};
use crate::{
FullHuPayto, HuIban,
- config::AccountType,
+ config::{AccountType, WorkerCfg},
db::{self, AddIncomingResult, Initiated, RegisterResult, TxIn, TxOut, TxOutKind},
magnet_api::{
api::{ApiErr, ErrKind},
- client::ApiClient,
+ client::{ApiClient, AuthClient},
types::{Direction, Next, Order, TxDto, TxStatus},
},
+ setup,
};
// const TXS_CURSOR_KEY: &str = "txs_cursor"; TODO cursor is broken
@@ -46,12 +51,100 @@ pub enum WorkerError {
Db(#[from] sqlx::Error),
#[error(transparent)]
Api(#[from] ApiErr),
+ #[error("Another worker is running concurrently")]
+ Concurrency,
#[error(transparent)]
Injected(#[from] InjectedErr),
}
pub type WorkerResult = Result<(), WorkerError>;
+/// Run the magnet bank worker
+///
+/// When `transient` is true, a single worker pass is run on a connection
+/// acquired from `pool` and its result is returned.
+///
+/// Otherwise the worker runs forever: after each successful pass it waits
+/// for a notification on the `transfer` channel or for the configured
+/// frequency to elapse, then runs again. When a pass fails, the error is
+/// logged and, after a backoff, the account lookup and the listener
+/// connection are set up again before resuming.
+///
+/// # Errors
+///
+/// Fails if the worker configuration or the keys cannot be loaded or, in
+/// transient mode, if acquiring the connection, fetching the account or the
+/// single pass fails.
+pub async fn run_worker(
+    cfg: &Config,
+    pool: &PgPool,
+    client: &reqwest::Client,
+    transient: bool,
+) -> anyhow::Result<()> {
+    let cfg = WorkerCfg::parse(cfg)?;
+    let keys = setup::load(&cfg)?;
+    let client = AuthClient::new(client, &cfg.api_url, &cfg.consumer).upgrade(&keys.access_token);
+
+    if transient {
+        let mut conn = pool.acquire().await?;
+        let account = client.account(cfg.payto.bban()).await?;
+        Worker {
+            client: &client,
+            db: &mut conn,
+            account_number: &account.number,
+            account_code: account.code,
+            key: &keys.signing_key,
+            account_type: cfg.account_type,
+            ignore_tx_before: cfg.ignore_tx_before,
+            ignore_bounces_before: cfg.ignore_bounces_before,
+        }
+        .run()
+        .await?;
+        return Ok(());
+    }
+
+    let mut jitter = ExpoBackoffDecorr::default();
+
+    loop {
+        let res: WorkerResult = async {
+            let account = client.account(cfg.payto.bban()).await?;
+            let db = &mut PgListener::connect_with(pool).await?;
+
+            // Listen to all channels
+            db.listen_all(["transfer"]).await?;
+
+            info!(target: "worker", "running at initialisation");
+
+            loop {
+                debug!(target: "worker", "running");
+                Worker {
+                    client: &client,
+                    db: db.acquire().await?,
+                    account_number: &account.number,
+                    account_code: account.code,
+                    key: &keys.signing_key,
+                    account_type: cfg.account_type,
+                    ignore_tx_before: cfg.ignore_tx_before,
+                    ignore_bounces_before: cfg.ignore_bounces_before,
+                }
+                .run()
+                .await?;
+                jitter.reset();
+
+                // Wait for notifications or sync timeout
+                let triggered = match tokio::time::timeout(cfg.frequency, db.try_recv()).await {
+                    Ok(res) => {
+                        let mut ntf = res?;
+                        // Record the trigger before draining: once the loop below is
+                        // done `ntf` is always `None`
+                        let received = ntf.is_some();
+                        // Conflate all notifications
+                        while let Some(n) = ntf {
+                            debug!(target: "worker", "notification from {}", n.channel());
+                            ntf = db.next_buffered();
+                        }
+                        received
+                    }
+                    // The frequency elapsed before any notification was received
+                    Err(_) => false,
+                };
+
+                if triggered {
+                    info!(target: "worker", "running at db trigger");
+                } else {
+                    info!(target: "worker", "running at frequency");
+                }
+            }
+        }
+        .await;
+        let err = res.unwrap_err();
+        error!(target: "worker", "{err}");
+
+        if matches!(err, WorkerError::Concurrency) {
+            // This error won't resolve by itself easily and it means we are actually making
+            // progress in another worker, so we can back off more aggressively
+            tokio::time::sleep(Duration::from_secs(15)).await;
+        }
+        tokio::time::sleep(jitter.backoff()).await;
+    }
+}
+
+
pub struct Worker<'a> {
pub client: &'a ApiClient<'a>,
pub db: &'a mut PgConnection,
@@ -66,6 +159,12 @@ pub struct Worker<'a> {
impl Worker<'_> {
/// Run a single worker pass
pub async fn run(&mut self) -> WorkerResult {
+ // Some worker operations are not idempotent, therefore it's not safe to have multiple workers
+ // running concurrently. We use a global Postgres advisory lock to prevent it.
+ if !db::worker_lock(self.db).await? {
+ return Err(WorkerError::Concurrency);
+ };
+
// Sync transactions
let mut next: Option<Next> = None; //kv_get(&mut *self.db, TXS_CURSOR_KEY).await?; TODO cursor logic is broken and cannot be stored & reused
let mut all_final = true;