commit 093f7025a62752f257d87f05cf3edecb2e444315
parent 121d46756e448b35397662e519fa2f3cacb9868a
Author: Antoine A <>
Date: Sun, 9 Nov 2025 18:31:38 +0100
magnet-bank: worker that works
Diffstat:
12 files changed, 484 insertions(+), 311 deletions(-)
diff --git a/common/taler-common/src/config.rs b/common/taler-common/src/config.rs
@@ -473,7 +473,7 @@ pub enum PathsubErr {
Unbound(String),
}
-#[derive(Debug)]
+#[derive(Debug, Clone)]
pub struct Config {
sections: IndexMap<String, IndexMap<String, String>>,
install_path: String,
diff --git a/taler-magnet-bank/src/api.rs b/taler-magnet-bank/src/api.rs
@@ -35,7 +35,7 @@ use tokio::sync::watch::Sender;
use crate::{
FullHuPayto,
- constant::CURRENCY,
+ constants::CURRENCY,
db::{self, AddIncomingResult, TxInAdmin},
};
diff --git a/taler-magnet-bank/src/bin/magnet-bank-harness.rs b/taler-magnet-bank/src/bin/magnet-bank-harness.rs
@@ -14,7 +14,7 @@
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
-use std::time::Duration;
+use std::{fmt::Debug, time::Duration};
use clap::Parser as _;
use jiff::{Timestamp, Zoned};
@@ -27,23 +27,28 @@ use taler_common::{
api_params::{History, Page},
api_wire::{IncomingBankTransaction, TransferRequest, TransferState},
cli::long_version,
+ config::Config,
db::{dbinit, pool},
taler_main,
types::{self, amount::amount, url},
};
use taler_magnet_bank::{
- CONFIG_SOURCE, FullHuPayto, HuIban,
- config::{HarnessCfg, parse_db_cfg},
+ FullHuPayto, HuIban,
+ config::{AccountType, HarnessCfg, parse_db_cfg},
+ constants::CONFIG_SOURCE,
db::{self, TransferResult},
failure_injection::{FailureLogic, InjectedErr, set_failure_logic},
magnet_api::{
client::{ApiClient, AuthClient},
types::{Account, Direction, Order, TxDto, TxStatus},
},
- setup,
- worker::{Worker, WorkerError},
+ run_worker,
+ setup::{self, Keys},
+ worker::{Worker, WorkerError, WorkerResult},
};
+// TODO macro for retry/expect logic
+
/// Taler Magnet Bank Adapter harness test suite
#[derive(clap::Parser, Debug)]
#[command(long_version = long_version(), about, long_about = None)]
@@ -51,21 +56,74 @@ struct Args {
#[clap(flatten)]
common: CommonArgs,
- #[clap(long, short)]
- reset: bool,
- #[clap(long, short)]
- warmup: bool,
+ #[command(subcommand)]
+ cmd: Command,
+}
+
+#[derive(clap::Subcommand, Debug)]
+enum Command {
+ /// Run logic tests
+ Logic {
+ #[clap(long, short)]
+ reset: bool,
+ },
+ /// Run online tests
+ Online {
+ #[clap(long, short)]
+ reset: bool,
+ },
}
-struct HarnessClient<'a> {
+/// Custom client for harness actions
+struct Harness<'a> {
+ cfg: &'a HarnessCfg,
pool: &'a PgPool,
- api: &'a ApiClient<'a>,
- exchange: &'a Account,
- client: &'a Account,
+ api: ApiClient<'a>,
+ exchange: Account,
+ client: Account,
signing_key: &'a SigningKey,
}
-impl HarnessClient<'_> {
+impl<'a> Harness<'a> {
+ async fn new(
+ cfg: &'a HarnessCfg,
+ client: &'a reqwest::Client,
+ pool: &'a PgPool,
+ keys: &'a Keys,
+ ) -> Self {
+ let api = AuthClient::new(client, &cfg.worker.api_url, &cfg.worker.consumer)
+ .upgrade(&keys.access_token);
+ let (exchange, client) = tokio::try_join!(
+ api.account(cfg.worker.payto.bban()),
+ api.account(cfg.client_payto.bban())
+ )
+ .unwrap();
+ Self {
+ cfg,
+ pool,
+ api,
+ exchange,
+ client,
+ signing_key: &keys.signing_key,
+ }
+ }
+
+ async fn worker(&'a self) -> WorkerResult {
+ let db = &mut self.pool.acquire().await.unwrap().detach();
+ Worker {
+ client: &self.api,
+ db,
+ account_number: &self.exchange.number,
+ account_code: self.exchange.code,
+ key: self.signing_key,
+ account_type: AccountType::Exchange,
+ ignore_tx_before: self.cfg.worker.ignore_tx_before,
+ ignore_bounces_before: self.cfg.worker.ignore_bounces_before,
+ }
+ .run()
+ .await
+ }
+
async fn balance(&self) -> (u32, u32) {
let (exchange_balance, client_balance) = tokio::try_join!(
self.api.balance_mini(self.exchange.iban.bban()),
@@ -88,7 +146,7 @@ impl HarnessClient<'_> {
wtid: ShortHashCode::rand(),
credit_account: creditor.as_payto(),
},
- &creditor,
+ creditor,
&types::timestamp::Timestamp::now(),
)
.await
@@ -108,11 +166,21 @@ impl HarnessClient<'_> {
}
async fn expect_transfer_status(&self, id: u64, status: TransferState, msg: Option<&str>) {
- let transfer = db::transfer_by_id(self.pool, id).await.unwrap().unwrap();
- assert_eq!(
- (transfer.status, transfer.status_msg.as_deref()),
- (status, msg)
- );
+ let mut attempts = 0;
+ loop {
+ let transfer = db::transfer_by_id(self.pool, id).await.unwrap().unwrap();
+ if (transfer.status, transfer.status_msg.as_deref()) == (status, msg) {
+ return;
+ }
+ if attempts > 40 {
+ assert_eq!(
+ (transfer.status, transfer.status_msg.as_deref()),
+ (status, msg)
+ );
+ }
+ attempts += 1;
+ tokio::time::sleep(Duration::from_millis(200)).await;
+ }
}
async fn expect_incoming(&self, key: EddsaPublicKey) {
@@ -189,7 +257,7 @@ impl HarnessClient<'_> {
if check(¤t) {
return;
}
- if attempts > 20 {
+ if attempts > 40 {
assert!(check(¤t), "{current:?}");
}
attempts += 1;
@@ -199,13 +267,13 @@ impl HarnessClient<'_> {
/// Send transaction from client to exchange
async fn client_send(&self, subject: &str, amount: u32) -> u64 {
- self.send_tx(self.client, &self.exchange.iban, subject, amount)
+ self.send_tx(&self.client, &self.exchange.iban, subject, amount)
.await
}
/// Send transaction from exchange to client
async fn exchange_send_to(&self, subject: &str, amount: u32, to: &HuIban) -> u64 {
- self.send_tx(self.exchange, to, subject, amount).await
+ self.send_tx(&self.exchange, to, subject, amount).await
}
/// Send transaction from exchange to client
@@ -221,7 +289,7 @@ impl HarnessClient<'_> {
if current == status {
return;
}
- if attempts > 20 {
+ if attempts > 40 {
assert_eq!(current, status, "{code}");
}
attempts += 1;
@@ -231,13 +299,13 @@ impl HarnessClient<'_> {
}
struct Balances<'a> {
- client: &'a HarnessClient<'a>,
+ client: &'a Harness<'a>,
exchange_balance: u32,
client_balance: u32,
}
impl<'a> Balances<'a> {
- pub async fn new(client: &'a HarnessClient<'a>) -> Self {
+ pub async fn new(client: &'a Harness<'a>) -> Self {
let (exchange_balance, client_balance) = client.balance().await;
Self {
client,
@@ -255,7 +323,7 @@ impl<'a> Balances<'a> {
if current == (self.exchange_balance, self.client_balance) {
return;
}
- if attempts > 20 {
+ if attempts > 40 {
assert_eq!(
current,
(self.exchange_balance, self.client_balance),
@@ -272,221 +340,278 @@ fn step(step: &str) {
println!("{}", step.green());
}
-fn main() {
- let args = Args::parse();
- taler_main(CONFIG_SOURCE, args.common, |cfg| async move {
- step("Init");
-
- // Prepare db
- let db_cfg = parse_db_cfg(&cfg)?;
- let pool = pool(db_cfg.cfg, "magnet_bank").await?;
- let mut db = pool.acquire().await?.detach();
- dbinit(&mut db, db_cfg.sql_dir.as_ref(), "magnet-bank", args.reset).await?;
-
- let cfg = HarnessCfg::parse(&cfg)?;
- let keys = setup::load(&cfg.worker)?;
- let client = reqwest::Client::new();
- let client = AuthClient::new(&client, &cfg.worker.api_url, &cfg.worker.consumer)
- .upgrade(&keys.access_token);
- let exchange_account = client.account(cfg.worker.payto.bban()).await?;
- let client_account = client.account(cfg.client_payto.bban()).await?;
-
- let mut worker = Worker {
- client: &client,
- db: &mut db,
- account_number: &exchange_account.number,
- account_code: exchange_account.code,
- key: &keys.signing_key,
- account_type: cfg.worker.account_type,
- ignore_tx_before: cfg.worker.ignore_tx_before,
- ignore_bounces_before: cfg.worker.ignore_bounces_before,
- };
- let harness = HarnessClient {
- pool: &pool,
- api: &client,
- exchange: &exchange_account,
- client: &client_account,
- signing_key: &keys.signing_key,
- };
+/// Run logic tests against real Magnet Bank backend
+async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> {
+ step("Run Magnet Bank logic harness tests");
+
+ step("Prepare db");
+ let db_cfg = parse_db_cfg(cfg)?;
+ let pool = pool(db_cfg.cfg, "magnet_bank").await?;
+ let mut db = pool.acquire().await?.detach();
+ dbinit(&mut db, db_cfg.sql_dir.as_ref(), "magnet-bank", reset).await?;
+
+ let cfg = HarnessCfg::parse(cfg)?;
+ let keys = setup::load(&cfg.worker)?;
+ let client = reqwest::Client::new();
+
+ let harness = Harness::new(&cfg, &client, &pool, &keys).await;
+
+ step("Warmup");
+ harness.worker().await?;
+ tokio::time::sleep(Duration::from_secs(5)).await;
+ harness.worker().await?;
+
+ let unknown_account = FullHuPayto::new(
+ HuIban::from_bban("1620000310991642").unwrap(),
+ "Unknown".to_string(),
+ );
+ let now = Timestamp::now();
+ let balance = &mut Balances::new(&harness).await;
+
+ step("Test incoming talerable transaction");
+ // Send talerable transaction
+ let reserve_pub = rand_edsa_pub_key();
+ harness
+ .client_send(&format!("Taler {reserve_pub}"), 33)
+ .await;
+ // Wait for transaction to finalize
+ balance.expect(33).await;
+ // Sync and register
+ harness.worker().await?;
+ harness.expect_incoming(reserve_pub).await;
+
+ step("Test incoming malformed transaction");
+ // Send malformed transaction
+ harness
+ .client_send(&format!("Malformed test {now}"), 34)
+ .await;
+ // Wait for transaction to finalize
+ balance.expect(34).await;
+ // Sync and bounce
+ harness.worker().await?;
+ // Wait for bounce to finalize
+ balance.expect(-34).await;
+ harness.worker().await?;
+
+ step("Test transfer to self");
+ // Init a transfer to self
+ let transfer_id = harness
+ .custom_transfer(
+ 101,
+ &FullHuPayto::new(harness.exchange.iban.clone(), "Self".to_string()),
+ )
+ .await;
+ // Should failed
+ harness.worker().await?;
+ // Check transfer failed
+ harness
+ .expect_transfer_status(
+ transfer_id,
+ TransferState::permanent_failure,
+ Some("409 FORRAS_SZAMLA_ESZAMLA_EGYEZIK 'A forrás és az ellenszámla egyezik!'"),
+ )
+ .await;
+
+ step("Test transfer transactions");
+ // Init a transfer to client
+ let transfer_id = harness
+ .custom_transfer(
+ 102,
+ &FullHuPayto::new(harness.client.iban.clone(), "Client".to_string()),
+ )
+ .await;
+ // Should send
+ harness.worker().await?;
+ // Check transfer is still pending
+ harness
+ .expect_transfer_status(transfer_id, TransferState::pending, None)
+ .await;
+ // Wait for transaction to finalize
+ balance.expect(-102).await;
+ // Should register
+ harness.worker().await?;
+ // Check transfer is now successful
+ harness
+ .expect_transfer_status(transfer_id, TransferState::success, None)
+ .await;
+
+ step("Test transfer to unknown account");
+ let transfer_id = harness.custom_transfer(103, &unknown_account).await;
+ harness.worker().await?;
+ harness
+ .expect_transfer_status(transfer_id, TransferState::pending, None)
+ .await;
+ balance.expect(0).await;
+ harness.worker().await?;
+ harness
+ .expect_transfer_status(transfer_id, TransferState::permanent_failure, None)
+ .await;
+
+ step("Test unexpected outgoing");
+ // Manual tx from the exchange
+ harness
+ .exchange_send(&format!("What is this ? {now}"), 4)
+ .await;
+ harness.worker().await?;
+ // Wait for transaction to finalize
+ balance.expect(-4).await;
+ harness.worker().await?;
+
+ step("Test transfer failure init-tx");
+ harness.transfer(10).await;
+ set_failure_logic(FailureLogic::History(vec!["init-tx"]));
+ assert!(matches!(
+ harness.worker().await,
+ Err(WorkerError::Injected(InjectedErr("init-tx")))
+ ));
+ harness.worker().await?;
+ balance.expect(-10).await;
+ harness.worker().await?;
+
+ step("Test transfer failure submit-tx");
+ harness.transfer(11).await;
+ set_failure_logic(FailureLogic::History(vec!["submit-tx"]));
+ assert!(matches!(
+ harness.worker().await,
+ Err(WorkerError::Injected(InjectedErr("submit-tx")))
+ ));
+ harness.worker().await?;
+ balance.expect(-11).await;
+ harness.worker().await?;
+
+ step("Test transfer all failures");
+ harness.transfer(13).await;
+ set_failure_logic(FailureLogic::History(vec!["init-tx", "submit-tx"]));
+ assert!(matches!(
+ harness.worker().await,
+ Err(WorkerError::Injected(InjectedErr("init-tx")))
+ ));
+ assert!(matches!(
+ harness.worker().await,
+ Err(WorkerError::Injected(InjectedErr("submit-tx")))
+ ));
+ harness.worker().await?;
+ balance.expect(-13).await;
+ harness.worker().await?;
+
+ step("Test recover successful bounces");
+ let code = harness
+ .client_send(&format!("will be bounced {now}"), 2)
+ .await;
+ balance.expect(2).await;
+ harness
+ .exchange_send(&format!("bounced: {}", code + 1), 2)
+ .await;
+ balance.expect(-2).await;
+ harness.worker().await?;
+
+ step("Test recover failed bounces");
+ // Send malformed transaction
+ harness
+ .client_send(&format!("will be failed bounced {now}"), 3)
+ .await;
+ // Wait for it to be received because rejected transaction take too much time to appear in the transactions log
+ balance.expect(3).await;
+ // Bounce it manually
+ let received = harness.latest_tx(&harness.exchange).await;
+ let bounce_code = harness
+ .exchange_send_to(
+ &format!("bounce manually: {}", received.code),
+ 3,
+ &unknown_account,
+ )
+ .await;
+ harness.expect_status(bounce_code, TxStatus::Rejected).await;
+ // Should not bounce and catch the failure
+ harness.worker().await?;
+ // Wait for it to be bounce regardless because rejected transaction take too much time to appear in the transactions log
+ // TODO fix this
+ balance.expect(-3).await;
+
+ step("Finish");
+ tokio::time::sleep(Duration::from_secs(5)).await;
+ harness.worker().await?;
+ balance.expect(0).await;
+ Ok(())
+}
- if args.warmup {
- step("Warmup");
- worker.run().await?;
- tokio::time::sleep(Duration::from_secs(10)).await;
- worker.run().await?;
- }
+/// Run online tests against real Magnet Bank backend
+async fn online_harness(config: &Config, reset: bool) -> anyhow::Result<()> {
+ step("Run Magnet Bank online harness tests");
+
+ step("Prepare db");
+ let db_cfg = parse_db_cfg(config)?;
+ let pool = pool(db_cfg.cfg, "magnet_bank").await?;
+ let mut db = pool.acquire().await?.detach();
+ dbinit(&mut db, db_cfg.sql_dir.as_ref(), "magnet-bank", reset).await?;
+
+ let cfg = HarnessCfg::parse(config)?;
+ let keys = setup::load(&cfg.worker)?;
+ let client = reqwest::Client::new();
+
+ let harness = Harness::new(&cfg, &client, &pool, &keys).await;
+
+ step("Warmup worker");
+ let _worker_task = {
+ let client = client.clone();
+ let pool = pool.clone();
+ let config = config.clone();
+ tokio::spawn(async move { run_worker(&config, &pool, &client).await })
+ };
+ tokio::time::sleep(Duration::from_secs(25)).await;
+
+ let now = Timestamp::now();
+ let balance = &mut Balances::new(&harness).await;
+
+ step("Test incoming transactions");
+ let reserve_pub = rand_edsa_pub_key();
+ harness
+ .client_send(&format!("Taler {reserve_pub}"), 3)
+ .await;
+ harness
+ .client_send(&format!("Malformed test {now}"), 4)
+ .await;
+ balance.expect(3).await;
+ harness.expect_incoming(reserve_pub).await;
+
+ step("Test outgoing transactions");
+ let transfer_self = harness
+ .custom_transfer(
+ 1,
+ &FullHuPayto::new(harness.exchange.iban.clone(), "Self".to_string()),
+ )
+ .await;
+ let transfer_id = harness
+ .custom_transfer(
+ 2,
+ &FullHuPayto::new(harness.client.iban.clone(), "Client".to_string()),
+ )
+ .await;
+ balance.expect(-2).await;
+ harness
+ .expect_transfer_status(
+ transfer_self,
+ TransferState::permanent_failure,
+ Some("409 FORRAS_SZAMLA_ESZAMLA_EGYEZIK 'A forrás és az ellenszámla egyezik!'"),
+ )
+ .await;
+ harness
+ .expect_transfer_status(transfer_id, TransferState::success, None)
+ .await;
- let unknown_account = FullHuPayto::new(
- HuIban::from_bban("1620000310991642").unwrap(),
- "Unknown".to_string(),
- );
- let now = Timestamp::now();
- let balance = &mut Balances::new(&harness).await;
-
- step("Test incoming talerable transaction");
- // Send talerable transaction
- let reserve_pub = rand_edsa_pub_key();
- harness
- .client_send(&format!("Taler {reserve_pub}"), 33)
- .await;
- // Wait for transaction to finalize
- balance.expect(33).await;
- // Sync and register
- worker.run().await?;
- harness.expect_incoming(reserve_pub).await;
-
- step("Test incoming malformed transaction");
- // Send malformed transaction
- harness
- .client_send(&format!("Malformed test {now}"), 34)
- .await;
- // Wait for transaction to finalize
- balance.expect(34).await;
- // Sync and bounce
- worker.run().await?;
- // Wait for bounce to finalize
- balance.expect(-34).await;
- worker.run().await?;
-
- step("Test transfer to self");
- // Init a transfer to self
- let transfer_id = harness
- .custom_transfer(
- 101,
- &FullHuPayto::new(exchange_account.iban.clone(), "Self".to_string()),
- )
- .await;
- // Should failed
- worker.run().await?;
- // Check transfer failed
- harness
- .expect_transfer_status(
- transfer_id,
- TransferState::permanent_failure,
- Some("409 FORRAS_SZAMLA_ESZAMLA_EGYEZIK 'A forrás és az ellenszámla egyezik!'"),
- )
- .await;
-
- step("Test transfer transactions");
- // Init a transfer to client
- let transfer_id = harness
- .custom_transfer(
- 102,
- &FullHuPayto::new(client_account.iban.clone(), "Client".to_string()),
- )
- .await;
- // Should send
- worker.run().await?;
- // Check transfer is still pending
- harness
- .expect_transfer_status(transfer_id, TransferState::pending, None)
- .await;
- // Wait for transaction to finalize
- balance.expect(-102).await;
- // Should register
- worker.run().await?;
- // Check transfer is now successful
- harness
- .expect_transfer_status(transfer_id, TransferState::success, None)
- .await;
-
- step("Test transfer to unknown account");
- let transfer_id = harness.custom_transfer(103, &unknown_account).await;
- worker.run().await?;
- harness
- .expect_transfer_status(transfer_id, TransferState::pending, None)
- .await;
- balance.expect(0).await;
- worker.run().await?;
- harness
- .expect_transfer_status(transfer_id, TransferState::permanent_failure, None)
- .await;
-
- step("Test unexpected outgoing");
- // Manual tx from the exchange
- harness
- .exchange_send(&format!("What is this ? {now}"), 4)
- .await;
- worker.run().await?;
- // Wait for transaction to finalize
- balance.expect(-4).await;
- worker.run().await?;
-
- step("Test transfer failure init-tx");
- harness.transfer(10).await;
- set_failure_logic(FailureLogic::History(vec!["init-tx"]));
- assert!(matches!(
- worker.run().await,
- Err(WorkerError::Injected(InjectedErr("init-tx")))
- ));
- worker.run().await?;
- balance.expect(-10).await;
- worker.run().await?;
+ step("Finish");
+ tokio::time::sleep(Duration::from_secs(5)).await;
+ balance.expect(0).await;
- step("Test transfer failure submit-tx");
- harness.transfer(11).await;
- set_failure_logic(FailureLogic::History(vec!["submit-tx"]));
- assert!(matches!(
- worker.run().await,
- Err(WorkerError::Injected(InjectedErr("submit-tx")))
- ));
- worker.run().await?;
- balance.expect(-11).await;
- worker.run().await?;
+ Ok(())
+}
- step("Test transfer all failures");
- harness.transfer(13).await;
- set_failure_logic(FailureLogic::History(vec!["init-tx", "submit-tx"]));
- assert!(matches!(
- worker.run().await,
- Err(WorkerError::Injected(InjectedErr("init-tx")))
- ));
- assert!(matches!(
- worker.run().await,
- Err(WorkerError::Injected(InjectedErr("submit-tx")))
- ));
- worker.run().await?;
- balance.expect(-13).await;
- worker.run().await?;
-
- step("Test recover successful bounces");
- let code = harness
- .client_send(&format!("will be bounced {now}"), 2)
- .await;
- balance.expect(2).await;
- harness
- .exchange_send(&format!("bounced: {}", code + 1), 2)
- .await;
- balance.expect(-2).await;
- worker.run().await?;
-
- step("Test recover failed bounces");
- // Send malformed transaction
- harness
- .client_send(&format!("will be failed bounced {now}"), 3)
- .await;
- // Wait for it to be received because rejected transaction take too much time to appear in the transactions log
- balance.expect(3).await;
- // Bounce it manually
- let received = harness.latest_tx(&exchange_account).await;
- let bounce_code = harness
- .exchange_send_to(
- &format!("bounce manualy: {}", received.code),
- 3,
- &unknown_account,
- )
- .await;
- harness.expect_status(bounce_code, TxStatus::Rejected).await;
- // Should not bounce and catch the failure
- worker.run().await?;
- // Wait for it to be bounce regardless because rejected transaction take too much time to appear in the transactions log
- // TODO fix this
- balance.expect(-3).await;
-
- step("Finish");
- tokio::time::sleep(Duration::from_secs(10)).await;
- worker.run().await?;
- balance.expect(0).await;
- Ok(())
+fn main() {
+ let args = Args::parse();
+ taler_main(CONFIG_SOURCE, args.common, |cfg| async move {
+ match args.cmd {
+ Command::Logic { reset } => logic_harness(&cfg, reset).await,
+ Command::Online { reset } => online_harness(&cfg, reset).await,
+ }
});
}
diff --git a/taler-magnet-bank/src/constant.rs b/taler-magnet-bank/src/constant.rs
@@ -1,21 +0,0 @@
-/*
- This file is part of TALER
- Copyright (C) 2025 Taler Systems SA
-
- TALER is free software; you can redistribute it and/or modify it under the
- terms of the GNU Affero General Public License as published by the Free Software
- Foundation; either version 3, or (at your option) any later version.
-
- TALER is distributed in the hope that it will be useful, but WITHOUT ANY
- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
- A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License along with
- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
-*/
-
-use std::sync::LazyLock;
-
-use taler_common::types::amount::Currency;
-
-pub static CURRENCY: LazyLock<Currency> = LazyLock::new(|| "HUF".parse().unwrap());
diff --git a/taler-magnet-bank/src/constants.rs b/taler-magnet-bank/src/constants.rs
@@ -0,0 +1,25 @@
+/*
+ This file is part of TALER
+ Copyright (C) 2025 Taler Systems SA
+
+ TALER is free software; you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License along with
+ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+*/
+
+use std::{sync::LazyLock, time::Duration};
+
+use taler_common::{config::parser::ConfigSource, types::amount::Currency};
+
+pub static CURRENCY: LazyLock<Currency> = LazyLock::new(|| "HUF".parse().unwrap());
+pub const MAX_MAGNET_BBAN_SIZE: usize = 24;
+pub const CONFIG_SOURCE: ConfigSource =
+ ConfigSource::new("taler-magnet-bank", "magnet-bank", "taler-magnet-bank");
+pub const WORKER_FREQUENCY: Duration = Duration::from_secs(5);
diff --git a/taler-magnet-bank/src/db.rs b/taler-magnet-bank/src/db.rs
@@ -34,7 +34,7 @@ use taler_common::{
};
use tokio::sync::watch::{Receiver, Sender};
-use crate::{FullHuPayto, constant::CURRENCY, magnet_api::types::TxStatus};
+use crate::{FullHuPayto, constants::CURRENCY, magnet_api::types::TxStatus};
pub async fn notification_listener(
pool: PgPool,
@@ -781,8 +781,7 @@ mod test {
use tokio::sync::watch::Receiver;
use crate::{
- CONFIG_SOURCE,
- constant::CURRENCY,
+ constants::{CONFIG_SOURCE, CURRENCY},
db::{
self, AddIncomingResult, AddOutgoingResult, BounceResult, Initiated, OutFailureResult,
TransferResult, TxIn, TxOut, TxOutKind, kv_get, kv_set, make_transfer,
diff --git a/taler-magnet-bank/src/lib.rs b/taler-magnet-bank/src/lib.rs
@@ -14,19 +14,31 @@
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
-use std::{borrow::Cow, str::FromStr};
+use std::{borrow::Cow, str::FromStr, sync::Arc};
+use sqlx::{Acquire, PgPool, postgres::PgListener};
+use taler_api::api::{Router, TalerRouter as _};
use taler_common::{
- config::parser::ConfigSource,
+ ExpoBackoffDecorr,
+ config::Config,
types::{
iban::{Country, IBAN, IbanErrorKind, ParseIbanError},
payto::{FullPayto, IbanPayto, Payto, PaytoErr, PaytoImpl, PaytoURI, TransferPayto},
},
};
+use tracing::{debug, error};
+
+use crate::{
+ api::MagnetApi,
+ config::{ServeCfg, WorkerCfg},
+ constants::WORKER_FREQUENCY,
+ magnet_api::client::AuthClient,
+ worker::{Worker, WorkerResult},
+};
pub mod api;
pub mod config;
-pub mod constant;
+pub mod constants;
pub mod db;
pub mod dev;
pub mod magnet_api;
@@ -81,9 +93,73 @@ pub mod failure_injection {
}
}
-pub const MAX_MAGNET_BBAN_SIZE: usize = 24;
-pub const CONFIG_SOURCE: ConfigSource =
- ConfigSource::new("taler-magnet-bank", "magnet-bank", "taler-magnet-bank");
+pub async fn run_serve(cfg: &Config, pool: PgPool) -> anyhow::Result<()> {
+ let cfg = ServeCfg::parse(cfg)?;
+ let api = Arc::new(MagnetApi::start(pool, cfg.payto).await);
+ let mut router = Router::new();
+ if let Some(cfg) = cfg.wire_gateway {
+ router = router.wire_gateway(api.clone(), cfg.auth);
+ }
+ if let Some(cfg) = cfg.revenue {
+ router = router.revenue(api, cfg.auth);
+ }
+ router.serve(cfg.serve, None).await?;
+ Ok(())
+}
+
+pub async fn run_worker(
+ cfg: &Config,
+ pool: &PgPool,
+ client: &reqwest::Client,
+) -> anyhow::Result<()> {
+ let cfg = WorkerCfg::parse(cfg)?;
+ let keys = setup::load(&cfg)?;
+ let client = AuthClient::new(client, &cfg.api_url, &cfg.consumer).upgrade(&keys.access_token);
+ let mut jitter = ExpoBackoffDecorr::default();
+ // TODO run in loop and handle errors
+ loop {
+ let res: WorkerResult = async {
+ let account = client.account(cfg.payto.bban()).await?;
+ let db = &mut PgListener::connect_with(pool).await?;
+
+ // TODO take a postgresql lock ?
+
+ // Listen to all channels
+ db.listen_all(["transfer"]).await?;
+
+ loop {
+ debug!(target: "worker", "running");
+ Worker {
+ client: &client,
+ db: db.acquire().await?,
+ account_number: &account.number,
+ account_code: account.code,
+ key: &keys.signing_key,
+ account_type: cfg.account_type,
+ ignore_tx_before: cfg.ignore_tx_before,
+ ignore_bounces_before: cfg.ignore_bounces_before,
+ }
+ .run()
+ .await?;
+ jitter.reset();
+
+ // Wait for notifications or sync timeout
+ if let Ok(res) = tokio::time::timeout(WORKER_FREQUENCY, db.try_recv()).await {
+ let mut ntf = res?;
+ // Conflate all notifications
+ while let Some(n) = ntf {
+ debug!(target: "worker", "notification from {}", n.channel());
+ ntf = db.next_buffered();
+ }
+ }
+ }
+ }
+ .await;
+ let err = res.unwrap_err();
+ error!(target: "worker", "{err}");
+ tokio::time::sleep(jitter.backoff()).await;
+ }
+}
#[derive(
Debug, Clone, PartialEq, Eq, serde_with::DeserializeFromStr, serde_with::SerializeDisplay,
diff --git a/taler-magnet-bank/src/magnet_api/api.rs b/taler-magnet-bank/src/magnet_api/api.rs
@@ -99,7 +99,8 @@ impl<E: std::error::Error> From<E> for FmtSource<E> {
impl From<reqwest::Error> for ErrKind {
fn from(value: reqwest::Error) -> Self {
- Self::Transport(FmtSource(value))
+ // We remove the URL as we already provide the API path
+ Self::Transport(FmtSource(value.without_url()))
}
}
diff --git a/taler-magnet-bank/src/magnet_api/client.rs b/taler-magnet-bank/src/magnet_api/client.rs
@@ -117,7 +117,7 @@ impl<'a> AuthClient<'a> {
}
pub struct ApiClient<'a> {
- client: &'a reqwest::Client,
+ pub client: &'a reqwest::Client,
api_url: &'a reqwest::Url,
consumer: &'a Token,
access: &'a Token,
diff --git a/taler-magnet-bank/src/main.rs b/taler-magnet-bank/src/main.rs
@@ -14,10 +14,7 @@
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
-use std::sync::Arc;
-
use clap::Parser;
-use taler_api::api::{Router, TalerRouter as _};
use taler_common::{
CommonArgs,
cli::{ConfigCmd, long_version},
@@ -26,13 +23,10 @@ use taler_common::{
taler_main,
};
use taler_magnet_bank::{
- CONFIG_SOURCE,
- api::MagnetApi,
config::{ServeCfg, WorkerCfg, parse_db_cfg},
+ constants::CONFIG_SOURCE,
dev::{self, DevCmd},
- magnet_api::client::AuthClient,
- setup,
- worker::Worker,
+ run_serve, run_worker, setup,
};
#[derive(clap::Parser, Debug)]
@@ -100,40 +94,14 @@ async fn run(cmd: Command, cfg: &Config) -> anyhow::Result<()> {
} else {
let db = parse_db_cfg(cfg)?;
let pool = pool(db.cfg, "magnet_bank").await?;
- let cfg = ServeCfg::parse(cfg)?;
- let api = Arc::new(MagnetApi::start(pool, cfg.payto).await);
- let mut router = Router::new();
- if let Some(cfg) = cfg.wire_gateway {
- router = router.wire_gateway(api.clone(), cfg.auth);
- }
- if let Some(cfg) = cfg.revenue {
- router = router.revenue(api, cfg.auth);
- }
- router.serve(cfg.serve, None).await?;
+ run_serve(cfg, pool).await?;
}
}
Command::Worker { transient: _ } => {
let db = parse_db_cfg(cfg)?;
let pool = pool(db.cfg, "magnet_bank").await?;
- let cfg = WorkerCfg::parse(cfg)?;
- let keys = setup::load(&cfg)?;
let client = reqwest::Client::new();
- let client =
- AuthClient::new(&client, &cfg.api_url, &cfg.consumer).upgrade(&keys.access_token);
- let account = client.account(cfg.payto.bban()).await?;
- let mut db = pool.acquire().await?.detach();
- // TODO run in loop and handle errors
- let mut worker = Worker {
- client: &client,
- db: &mut db,
- account_number: &account.number,
- account_code: account.code,
- key: &keys.signing_key,
- account_type: cfg.account_type,
- ignore_tx_before: cfg.ignore_tx_before,
- ignore_bounces_before: cfg.ignore_bounces_before,
- };
- worker.run().await?;
+ run_worker(cfg, &pool, &client).await?;
}
Command::Config(cfg_cmd) => cfg_cmd.run(cfg)?,
Command::Dev(dev_cmd) => dev::dev(cfg, dev_cmd).await?,
diff --git a/taler-magnet-bank/src/worker.rs b/taler-magnet-bank/src/worker.rs
@@ -51,7 +51,7 @@ pub enum WorkerError {
Injected(#[from] InjectedErr),
}
-type WorkerResult = Result<(), WorkerError>;
+pub type WorkerResult = Result<(), WorkerError>;
pub struct Worker<'a> {
pub client: &'a ApiClient<'a>,
diff --git a/taler-magnet-bank/tests/api.rs b/taler-magnet-bank/tests/api.rs
@@ -26,8 +26,8 @@ use taler_common::{
types::{amount::amount, payto::payto, timestamp::Timestamp, url},
};
use taler_magnet_bank::{
- CONFIG_SOURCE,
api::MagnetApi,
+ constants::CONFIG_SOURCE,
db::{self, TxOutKind},
magnet_api::types::TxStatus,
magnet_payto,