taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

commit acd8bcc08592bdeacebdd806d897baf48badf652
parent 13d35fb71aad9b9b02c555629ab84902852b09b8
Author: Antoine A <>
Date:   Tue, 23 Dec 2025 12:13:38 +0100

cyclos: improve config, doc and spec compliance and add online tests

Diffstat:
Mtaler-cyclos/README.md | 73++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------
Mtaler-cyclos/cyclos.conf | 34+++++++++++++---------------------
Rdocker-compose.yml -> taler-cyclos/docker-compose.yml | 0
Mtaler-cyclos/src/bin/cyclos-harness.rs | 188++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------
Mtaler-cyclos/src/config.rs | 54+++++++++++++++++++++++++++++++++++++++++++++++++++++-
Mtaler-cyclos/src/constants.rs | 3+++
Mtaler-cyclos/src/cyclos_api/api.rs | 52++++++++++++++++++++++++++++++++++++++++++++++------
Mtaler-cyclos/src/cyclos_api/types.rs | 25+++++++++++++------------
Mtaler-cyclos/src/lib.rs | 6++++--
Mtaler-cyclos/src/worker.rs | 72+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-----
Mtaler-magnet-bank/src/bin/magnet-bank-harness.rs | 17+----------------
Mtaler-magnet-bank/src/worker.rs | 5++++-
12 files changed, 401 insertions(+), 128 deletions(-)

diff --git a/taler-cyclos/README.md b/taler-cyclos/README.md @@ -1,27 +1,70 @@ +# Taler Cyclos Adapter +Taler adapter for the [Cyclos payment system](https://www.cyclos.org/) v4.16.17. -podman-compose up +## Harness -# Create test deploment +Some tests are not automated and must be run on a real Cyclos backend to ensure the correct behavior of the adapter in real conditions. This is also necessary to ensure we support API upgrades. -ANything will do +### Logical tests -# Create and configure network +Those tests are exhaustive tests of the worker with failure injection it will send a lot of transactions and therefore should not be made using real money. +#### Setup +A [docker-compose.yml](./docker-compose.yml) file is provided for you, you can start it using: -- allow debt -- give money at creation +```sh +cd taler-cyclos && podman-compose up +``` -- enable channel for all users -- enable transactions ? -Enable chargeback in Product details +Got to `localhost:8080` and follow the configuration wizard. -# Create two users +You will need to create two accounts, one for the exchange and one for the client. Both accounts must have the web service enabled (System > Configuration. +s > Channels > Web Services > User access) and some money available. You also need to enable chargeback for user payments (System > Products > Chargeback of payments) -wire f20n4X3qV44dNoZUmpeU -client 1EkY5JJMrkwyvv9yK7x4 +#### Configuration +Configure the exchange account like you would in production and add the additional harness config with the client account info: -Type -Member -\ No newline at end of file +```ini +# dev.json +[cyclos-harness] +API_URL = http://localhost:8080/api/ + +[cyclos-harness] +USERNAME = client +PASSWORD = password +``` + +#### Run + +```sh +cargo run --bin cyclos-harness -- -c dev.conf logic -L DEBUG +``` + +### Online tests + +Those are lighter tests that perform only a few transactions. It's configured like logical test and can be run on a production environment like [demo.cyclos.org](https://demo.cyclos.org). + +#### Setup + +Create manually two accounts with some money. + +#### Configuration + +```ini +# dev.json +[cyclos-harness] +API_URL = https://demo.cyclos.org/api/ + +[cyclos-harness] +USERNAME = client +PASSWORD = password +``` + +#### Run + +```sh +cargo run --bin cyclos-harness -- -c dev.conf online -L DEBUG +``` +\ No newline at end of file diff --git a/taler-cyclos/cyclos.conf b/taler-cyclos/cyclos.conf @@ -1,30 +1,24 @@ [cyclos] -# IBAN of the Magnet Bank account to sync -IBAN = +# Adapter currency +CURRENCY = -# Legal entity that is associated with the Magnet Bank account -NAME = - -[magnet-bank-worker] -# URL of the Magnet Bank API server +[cyclos-worker] +# URL of the Cyclos API server API_URL = "https://mobil.magnetbank.hu" -# Your Magnet Bank API unique identifier -# CONSUMER_KEY = "Consumer" - -# Your Magnet Bank API confidential key -# CONSUMER_SECRET = "qikgjxc5y06tiil7qgrmh09l7rfi5a8e" +# Account username +USERNAME = -# File that holds the crypto keys and access token. -# KEYS_FILE = ${MAGNET_BANK_HOME}/keys.json +# Account password +PASSWORD = # Specify the account type and therefore the indexing behavior. # This can either can be normal or exchange. # Exchange accounts bounce invalid incoming Taler transactions. ACCOUNT_TYPE = exchange -[magnet-bank-httpd] -# How "taler-magnet-bank serve" serves its API, this can either be tcp or unix +[cyclos-httpd] +# How "taler-cyclos serve" serves its API, this can either be tcp or unix SERVE = tcp # Port on which the HTTP server listens, e.g. 9967. Only used if SERVE is tcp. @@ -34,12 +28,12 @@ PORT = 8080 BIND_TO = 0.0.0.0 # Which unix domain path should we bind to? Only used if SERVE is unix. -# UNIXPATH = libeufin-bank.sock +# UNIXPATH = taler-cyclos.sock # What should be the file access permissions for UNIXPATH? Only used if SERVE is unix. # UNIXPATH_MODE = 660 -[magnet-bank-httpd-wire-gateway-api] +[cyclos-httpd-wire-gateway-api] # Whether to serve the Wire Gateway API ENABLED = NO @@ -55,8 +49,7 @@ AUTH_METHOD = bearer # Token for bearer authentication scheme TOKEN = - -[magnet-bank-httpd-revenue-api] +[cyclos-httpd-revenue-api] # Whether to serve the Revenue API ENABLED = NO @@ -72,7 +65,6 @@ AUTH_METHOD = bearer # Token for bearer authentication scheme TOKEN = - [cyclosdb-postgres] # DB connection string CONFIG = postgres:///taler-cyclos diff --git a/docker-compose.yml b/taler-cyclos/docker-compose.yml diff --git a/taler-cyclos/src/bin/cyclos-harness.rs b/taler-cyclos/src/bin/cyclos-harness.rs @@ -14,7 +14,7 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use std::{str::FromStr as _, time::Duration}; +use std::time::Duration; use clap::Parser as _; use failure_injection::{InjectedErr, set_failure_scenario}; @@ -38,11 +38,11 @@ use taler_common::{ }; use taler_cyclos::{ CyclosId, FullCyclosPayto, - config::{AccountType, parse_db_cfg}, + config::{AccountType, HarnessCfg, parse_db_cfg}, constants::CONFIG_SOURCE, cyclos_api::{api::CyclosAuth, client::Client, types::HistoryItem}, db::{self, TransferResult}, - worker::{Worker, WorkerError, WorkerResult}, + worker::{Worker, WorkerError, WorkerResult, run_worker}, }; /// Cyclos Adapter harness test suite @@ -78,8 +78,8 @@ struct Harness<'a> { pool: &'a PgPool, client: Client<'a>, wire: Client<'a>, - client_id: u64, - wire_id: u64, + client_payto: FullCyclosPayto, + wire_payto: FullCyclosPayto, currency: Currency, } @@ -88,8 +88,8 @@ impl<'a> Harness<'a> { let (exchange, client) = tokio::try_join!(self.wire.accounts(), self.client.accounts()).unwrap(); ( - exchange[0].status.available_balance, - client[0].status.available_balance, + exchange[0].status.available_balance.unwrap_or(exchange[0].status.balance), + client[0].status.available_balance.unwrap_or(client[0].status.balance), ) } @@ -97,7 +97,7 @@ impl<'a> Harness<'a> { async fn client_send(&self, subject: &str, amount: Decimal) -> u64 { *self .client - .direct_payment(self.wire_id, amount, subject) + .direct_payment(**self.wire_payto, amount, subject) .await .unwrap() .id @@ -107,7 +107,7 @@ impl<'a> Harness<'a> { async fn exchange_send(&self, subject: &str, amount: Decimal) -> u64 { *self .wire - .direct_payment(self.client_id, amount, subject) + .direct_payment(**self.client_payto, amount, subject) .await .unwrap() .id @@ -121,7 +121,7 @@ impl<'a> Harness<'a> { /// Fetch last transfer related to client async fn client_last_transfer(&self) -> HistoryItem { self.client - .transfers(self.client_id) + .transfers(**self.client_payto) .await .unwrap() .remove(0) @@ -133,7 +133,7 @@ impl<'a> Harness<'a> { let account = self.wire.accounts().await.unwrap()[0].clone(); Worker { db, - currency: Currency::from_str("TEST").unwrap(), + currency: self.currency.clone(), client: &self.wire, account_type_id: *account.ty.id, account_type: AccountType::Exchange, @@ -185,8 +185,7 @@ impl<'a> Harness<'a> { } async fn transfer(&self, amount: Decimal) -> u64 { - let creditor = FullCyclosPayto::new(CyclosId(self.client_id), "Client".to_string()); - self.custom_transfer(amount, &creditor).await + self.custom_transfer(amount, &self.client_payto).await } async fn transfer_id(&self, transfer_id: u64) -> u64 { @@ -245,29 +244,48 @@ impl<'a> Balances<'a> { async fn expect_add(&mut self, diff: Decimal) { self.exchange_balance = self.exchange_balance.try_add(&diff).unwrap(); self.client_balance = self.client_balance.try_sub(&diff).unwrap(); - - let current = self.client.balance().await; - assert_eq!( - current, - (self.exchange_balance, self.client_balance), - "{current:?} {diff}" - ); + let mut attempts = 0; + loop { + let current = self.client.balance().await; + if current == (self.exchange_balance, self.client_balance) { + return; + } + if attempts > 40 { + assert_eq!( + current, + (self.exchange_balance, self.client_balance), + "{current:?} {diff}" + ); + } + attempts += 1; + tokio::time::sleep(Duration::from_millis(200)).await; + } } async fn expect_sub(&mut self, diff: Decimal) { self.exchange_balance = self.exchange_balance.try_sub(&diff).unwrap(); self.client_balance = self.client_balance.try_add(&diff).unwrap(); - let current = self.client.balance().await; - assert_eq!( - current, - (self.exchange_balance, self.client_balance), - "{current:?} {diff}" - ); + let mut attempts = 0; + loop { + let current = self.client.balance().await; + if current == (self.exchange_balance, self.client_balance) { + return; + } + if attempts > 40 { + assert_eq!( + current, + (self.exchange_balance, self.client_balance), + "{current:?} {diff}" + ); + } + attempts += 1; + tokio::time::sleep(Duration::from_millis(200)).await; + } } } -/// Run logic tests against real Cyclos backend +/// Run logic tests against local Cyclos backend async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> { step("Run Cyclos logic harness tests"); @@ -277,33 +295,32 @@ async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> { let mut db = pool.acquire().await?.detach(); dbinit(&mut db, db_cfg.sql_dir.as_ref(), "cyclos", reset).await?; + let cfg = HarnessCfg::parse(cfg)?; let client = reqwest::Client::new(); - let api_url = reqwest::Url::from_str("http://localhost:8080/api/").unwrap(); let wire = Client { client: &client, - api_url: &api_url, + api_url: &cfg.worker.api_url, auth: &CyclosAuth::Basic { - username: "wire".into(), - password: "f20n4X3qV44dNoZUmpeU".into(), + username: cfg.worker.username, + password: cfg.worker.password, }, }; let client = Client { client: &client, - api_url: &api_url, + api_url: &cfg.worker.api_url, auth: &CyclosAuth::Basic { - username: "client".into(), - password: "1EkY5JJMrkwyvv9yK7x4".into(), + username: cfg.username, + password: cfg.password, }, }; - let currency = Currency::from_str("TEST").unwrap(); let harness = Harness { pool: &pool, - client_id: *client.whoami().await.unwrap().id, - wire_id: *wire.whoami().await.unwrap().id, + client_payto: client.whoami().await.unwrap().payto(), + wire_payto: wire.whoami().await.unwrap().payto(), client, wire, - currency, + currency: cfg.worker.currency, }; step("Warmup"); @@ -356,10 +373,7 @@ async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> { step("Test transfer to self"); // Init a transfer to self let transfer_id = harness - .custom_transfer( - decimal("10.1"), - &FullCyclosPayto::new(CyclosId(harness.wire_id), "Self".to_string()), - ) + .custom_transfer(decimal("10.1"), &harness.wire_payto) .await; // Should failed harness.worker().await?; @@ -405,12 +419,7 @@ async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> { step("Test transfer chargeback"); let amount = decimal("10.1"); // Init a transfer to client - let transfer_id = harness - .custom_transfer( - amount, - &FullCyclosPayto::new(CyclosId(harness.client_id), "Chargeback".to_string()), - ) - .await; + let transfer_id = harness.transfer(amount).await; harness .expect_transfer_status(transfer_id, TransferState::pending, None) .await; @@ -491,9 +500,90 @@ async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> { /// Run online tests against real Cyclos backend async fn online_harness(config: &Config, reset: bool) -> anyhow::Result<()> { - step("Run Cyclos harness tests"); + step("Run Cyclos online harness tests"); + + step("Prepare db"); + let db_cfg = parse_db_cfg(config)?; + let pool = pool(db_cfg.cfg, "cyclos").await?; + let mut db = pool.acquire().await?.detach(); + dbinit(&mut db, db_cfg.sql_dir.as_ref(), "cyclos", reset).await?; + + let cfg = HarnessCfg::parse(config)?; + let http_client = reqwest::Client::new(); + let wire = Client { + client: &http_client, + api_url: &cfg.worker.api_url, + auth: &CyclosAuth::Basic { + username: cfg.worker.username, + password: cfg.worker.password, + }, + }; + let client = Client { + client: &http_client, + api_url: &cfg.worker.api_url, + auth: &CyclosAuth::Basic { + username: cfg.username, + password: cfg.password, + }, + }; + + let harness = Harness { + pool: &pool, + client_payto: client.whoami().await.unwrap().payto(), + wire_payto: wire.whoami().await.unwrap().payto(), + client, + wire, + currency: cfg.worker.currency, + }; + + step("Warmup worker"); + let _worker_task = { + let client = http_client.clone(); + let pool = pool.clone(); + let config = config.clone(); + tokio::spawn(async move { run_worker(&config, &pool, &client).await }) + }; + tokio::time::sleep(Duration::from_secs(5)).await; + harness.worker().await.unwrap(); + let now = Timestamp::now(); + let balance = &mut Balances::new(&harness).await; + + step("Test incoming transactions"); + let taler_amount = decimal("3"); + let malformed_amount = decimal("4"); + let reserve_pub = rand_edsa_pub_key(); + harness + .client_send(&format!("Taler {reserve_pub}"), taler_amount) + .await; + harness + .client_send(&format!("Malformed test {now}"), malformed_amount) + .await; + balance.expect_add(taler_amount).await; + harness.expect_incoming(reserve_pub).await; + + step("Test outgoing transactions"); + let self_amount = decimal("1"); + let taler_amount = decimal("2"); + + let transfer_self = harness + .custom_transfer(self_amount, &harness.wire_payto) + .await; + let transfer_id = harness.transfer(taler_amount).await; + balance.expect_sub(taler_amount).await; + harness + .expect_transfer_status( + transfer_self, + TransferState::permanent_failure, + Some("permissionDenied - The operation was denied because a required permission was not granted"), + ) + .await; + harness + .expect_transfer_status(transfer_id, TransferState::success, None) + .await; step("Finish"); + tokio::time::sleep(Duration::from_secs(5)).await; + balance.expect_add(Decimal::zero()).await; Ok(()) } diff --git a/taler-cyclos/src/config.rs b/taler-cyclos/src/config.rs @@ -14,8 +14,12 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ +use reqwest::Url; use taler_api::config::DbCfg; -use taler_common::config::{Config, ValueErr}; +use taler_common::{ + config::{Config, ValueErr}, + map_config, types::amount::Currency, +}; #[derive(Debug, Clone, Copy)] pub enum AccountType { @@ -26,3 +30,51 @@ pub enum AccountType { pub fn parse_db_cfg(cfg: &Config) -> Result<DbCfg, ValueErr> { DbCfg::parse(cfg.section("cyclosdb-postgres")) } + +/// taler-cyclos worker config +pub struct WorkerCfg { + pub currency: Currency, + pub username: String, + pub api_url: Url, + pub password: String, + pub account_type: AccountType, +} + +impl WorkerCfg { + pub fn parse(cfg: &Config) -> Result<Self, ValueErr> { + let main_sect = cfg.section("cyclos"); + let sect = cfg.section("cyclos-worker"); + Ok(Self { + currency: main_sect.parse("currency", "CURRENCY").require()?, + username: sect.str("USERNAME").require()?, + account_type: map_config!(sect, "account type", "ACCOUNT_TYPE", + "exchange" => { Ok(AccountType::Exchange) }, + "normal" => { Ok(AccountType::Normal) } + ) + .require()?, + api_url: sect.parse("URL", "API_URL").require()?, + password: sect.str("PASSWORD").require()?, + }) + } +} + +/// cyclos-harness config +pub struct HarnessCfg { + pub worker: WorkerCfg, + pub username: String, + pub password: String, +} + +impl HarnessCfg { + pub fn parse(cfg: &Config) -> Result<Self, ValueErr> { + let worker = WorkerCfg::parse(cfg)?; + + let sect = cfg.section("cyclos-harness"); + + Ok(Self { + worker, + username: sect.str("USERNAME").require()?, + password: sect.str("PASSWORD").require()?, + }) + } +} diff --git a/taler-cyclos/src/constants.rs b/taler-cyclos/src/constants.rs @@ -14,6 +14,9 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ +use std::time::Duration; + use taler_common::config::parser::ConfigSource; pub const CONFIG_SOURCE: ConfigSource = ConfigSource::new("taler-cyclos", "cyclos", "taler-cyclos"); +pub const WORKER_FREQUENCY: Duration = Duration::from_secs(5); diff --git a/taler-cyclos/src/cyclos_api/api.rs b/taler-cyclos/src/cyclos_api/api.rs @@ -14,7 +14,7 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use std::borrow::Cow; +use std::{borrow::Cow, fmt::Display}; use reqwest::{Client, Method, RequestBuilder, StatusCode, Url}; use serde::{Deserialize, Serialize, de::DeserializeOwned}; @@ -25,19 +25,36 @@ use crate::cyclos_api::types::{ ForbiddenError, InputError, NotFoundError, UnauthorizedError, UnexpectedError, }; +#[derive(Debug)] pub enum CyclosAuth { None, Basic { username: String, password: String }, } #[derive(Error, Debug)] -#[error("{method} {path} {kind}")] pub struct ApiErr { pub path: Cow<'static, str>, pub method: Method, + pub status: Option<StatusCode>, pub kind: ErrKind, } +impl Display for ApiErr { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Self { + path, + method, + status, + kind, + } = self; + write!(f, "{path} {method} ")?; + if let Some(status) = status { + write!(f, "{status} ")?; + } + write!(f, "{kind}") + } +} + #[derive(Error, Debug)] pub enum ErrKind { #[error("transport: {0}")] @@ -75,7 +92,7 @@ fn parse<'de, T: Deserialize<'de>>(str: &'de str) -> Result<T, ErrKind> { async fn json_body<T: DeserializeOwned>(res: reqwest::Response) -> Result<T, ErrKind> { // TODO check content type? let body = res.text().await?; - // println!("{body}"); + //println!("{body}"); let parsed = parse(&body)?; Ok(parsed) } @@ -130,10 +147,25 @@ impl<'a> CyclosRequest<'a> { } } .build_split(); - async { + let res = match async { let req = req?; let res = client.execute(req).await?; - let status = res.status(); + Ok(res) + } + .await + { + Ok(res) => res, + Err(kind) => { + return Err(ApiErr { + path, + method, + kind, + status: None, + }); + } + }; + let status = res.status(); + match async { match status { StatusCode::OK | StatusCode::CREATED => json_body(res).await, StatusCode::UNAUTHORIZED => Err(ErrKind::Unauthorized(json_body(res).await?)), @@ -145,6 +177,14 @@ impl<'a> CyclosRequest<'a> { } } .await - .map_err(|kind| ApiErr { path, method, kind }) + { + Ok(res) => Ok(res), + Err(kind) => Err(ApiErr { + path, + method, + kind, + status: Some(status), + }), + } } } diff --git a/taler-cyclos/src/cyclos_api/types.rs b/taler-cyclos/src/cyclos_api/types.rs @@ -18,15 +18,15 @@ use std::collections::BTreeMap; use jiff::Timestamp; use serde::Deserialize; -use taler_common::types::amount::Decimal; +use taler_common::types::{amount::Decimal, payto::FullPayto}; -use crate::CyclosId; +use crate::{CyclosId, FullCyclosPayto}; #[derive(Debug, Deserialize, Clone)] #[serde(rename_all = "camelCase")] pub struct Type { pub id: CyclosId, - pub internal_name: String, + pub internal_name: Option<String>, pub name: String, } @@ -43,7 +43,7 @@ pub struct Currency { #[derive(Debug, Deserialize, Clone)] #[serde(rename_all = "camelCase")] pub struct AccountStatus { - pub available_balance: Decimal, + pub available_balance: Option<Decimal>, pub balance: Decimal, pub credit_limit: Decimal, pub reserved_amount: Decimal, @@ -66,6 +66,12 @@ pub struct User { pub name: String, } +impl User { + pub fn payto(&self) -> FullCyclosPayto { + FullPayto::new(self.id, self.name.clone()) + } +} + #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] pub struct PaymentData { @@ -100,7 +106,7 @@ pub struct HistoryItem { pub ty: Type, pub description: Option<String>, pub related_account: RelatedAccount, - pub transaction: Option<TransactionMini> + pub transaction: Option<TransactionMini>, } #[derive(Debug, Deserialize)] @@ -252,14 +258,9 @@ pub enum UnauthorizedError { #[error("loggedOut - The session token used for access is invalid")] LoggedOut, #[error( - "login {invalid_device_confirmation:?} {user_status:?} {pasword_status:?} - Either user identification (principal) or password are invalid. May have additional information, such as the user / password status" + "login - Either user identification (principal) or password are invalid. May have additional information, such as the user / password status" )] - #[serde(rename = "camelCase")] - Login { - invalid_device_confirmation: InvalidDeviceConfirmation, - user_status: UserStatus, - pasword_status: PasswordStatus, - }, + Login, #[error( "missingAuthorization - Attempt to access an operation as guest, but the operation requires authentication" )] diff --git a/taler-cyclos/src/lib.rs b/taler-cyclos/src/lib.rs @@ -25,7 +25,7 @@ pub mod db; pub mod worker; #[derive( - Debug, Clone, PartialEq, Eq, serde_with::DeserializeFromStr, serde_with::SerializeDisplay, + Debug, Clone, Copy, PartialEq, Eq, serde_with::DeserializeFromStr, serde_with::SerializeDisplay, )] pub struct CyclosId(pub u64); @@ -51,7 +51,9 @@ impl FromStr for CyclosId { type Err = CyclosIdError; fn from_str(s: &str) -> Result<Self, Self::Err> { - Ok(Self(u64::from_str(s.trim_start_matches('-')).map_err(CyclosIdError)?)) + Ok(Self( + u64::from_str(s.trim_start_matches('-')).map_err(CyclosIdError)?, + )) } } diff --git a/taler-cyclos/src/worker.rs b/taler-cyclos/src/worker.rs @@ -16,16 +16,21 @@ use failure_injection::{InjectedErr, fail_point}; use jiff::Timestamp; -use sqlx::PgConnection; +use sqlx::{Acquire as _, PgConnection, PgPool, postgres::PgListener}; use taler_api::subject::{self, parse_incoming_unstructured}; -use taler_common::types::amount::{self, Currency}; +use taler_common::{ + ExpoBackoffDecorr, + config::Config, + types::amount::{self, Currency}, +}; use tracing::{debug, error, info, trace, warn}; use crate::{ FullCyclosPayto, - config::AccountType, + config::{AccountType, WorkerCfg}, + constants::WORKER_FREQUENCY, cyclos_api::{ - api::{ApiErr, ErrKind}, + api::{ApiErr, CyclosAuth, ErrKind}, client::Client, types::{AccountKind, HistoryItem, NotFoundError}, }, @@ -46,6 +51,63 @@ pub enum WorkerError { pub type WorkerResult = Result<(), WorkerError>; +pub async fn run_worker( + cfg: &Config, + pool: &PgPool, + client: &reqwest::Client, +) -> anyhow::Result<()> { + let cfg = WorkerCfg::parse(cfg)?; + let client = Client { + client, + api_url: &cfg.api_url, + auth: &CyclosAuth::Basic { + username: cfg.username, + password: cfg.password, + }, + }; + let mut jitter = ExpoBackoffDecorr::default(); + loop { + let res: WorkerResult = async { + let db = &mut PgListener::connect_with(pool).await?; + let account = client.accounts().await.unwrap()[0].clone(); + + // TODO take a postgresql lock ? + + // Listen to all channels + db.listen_all(["transfer"]).await?; + + loop { + debug!(target: "worker", "running"); + + Worker { + client: &client, + db: db.acquire().await?, + account_type_id: *account.ty.id, + account_type: cfg.account_type, + currency: cfg.currency.clone(), + } + .run() + .await?; + jitter.reset(); + + // Wait for notifications or sync timeout + if let Ok(res) = tokio::time::timeout(WORKER_FREQUENCY, db.try_recv()).await { + let mut ntf = res?; + // Conflate all notifications + while let Some(n) = ntf { + debug!(target: "worker", "notification from {}", n.channel()); + ntf = db.next_buffered(); + } + } + } + } + .await; + let err = res.unwrap_err(); + error!(target: "worker", "{err}"); + tokio::time::sleep(jitter.backoff()).await; + } +} + pub struct Worker<'a> { pub client: &'a Client<'a>, pub db: &'a mut PgConnection, @@ -244,7 +306,7 @@ impl Worker<'_> { AccountType::Exchange => { let transfer = self.client.transfer(tx.transfer_id).await?; - if let Some(_) = transfer.charged_back_by { + if transfer.charged_back_by.is_some() { match db::initiated_chargeback_failure(&mut *self.db, *transfer.id).await? { ChargebackFailureResult::Unknown => { trace!(target: "worker", "initiated failure unknown: charged back") diff --git a/taler-magnet-bank/src/bin/magnet-bank-harness.rs b/taler-magnet-bank/src/bin/magnet-bank-harness.rs @@ -250,21 +250,6 @@ impl<'a> Harness<'a> { .tx } - async fn expect_latest_tx(&self, account: &Account, mut check: impl FnMut(&TxDto) -> bool) { - let mut attempts = 0; - loop { - let current = self.latest_tx(account).await; - if check(&current) { - return; - } - if attempts > 40 { - assert!(check(&current), "{current:?}"); - } - attempts += 1; - tokio::time::sleep(Duration::from_millis(200)).await; - } - } - /// Send transaction from client to exchange async fn client_send(&self, subject: &str, amount: u32) -> u64 { self.send_tx(&self.client, &self.exchange.iban, subject, amount) @@ -340,7 +325,7 @@ fn step(step: &str) { println!("{}", step.green()); } -/// Run logic tests against real Magnet Bank backend +/// Run logic tests against local Magnet Bank backend async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> { step("Run Magnet Bank logic harness tests"); diff --git a/taler-magnet-bank/src/worker.rs b/taler-magnet-bank/src/worker.rs @@ -350,7 +350,10 @@ impl Worker<'_> { /// Try to sign an unsigned initiated transaction pub async fn recover_tx(&mut self, tx: &TxOut) -> WorkerResult { - if let Some(_) = db::initiated_exists_for_code(&mut *self.db, tx.code).await? { + if db::initiated_exists_for_code(&mut *self.db, tx.code) + .await? + .is_some() + { // Known initiated we submit it assert_eq!(tx.amount.frac, 0); self.submit_tx(