taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

commit f44a17e74091ce8fdd42f2be507a046c51623844
parent d74a04e7b9e16049e5f3fcc1ffdaf0aa19d24d59
Author: Antoine A <>
Date:   Sun, 28 Dec 2025 21:17:51 +0100

cyclos: improve setup and support multi accounts and payments types

Diffstat:
Mdebian/etc/taler-cyclos/conf.d/cyclos-worker.conf | 6++++++
Mdebian/etc/taler-cyclos/taler-cyclos.conf | 2+-
Mtaler-cyclos/cyclos.conf | 8+++++++-
Mtaler-cyclos/src/bin/cyclos-harness.rs | 41++++++++++++++++++++++++-----------------
Mtaler-cyclos/src/config.rs | 65++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-------
Mtaler-cyclos/src/cyclos_api/api.rs | 3++-
Mtaler-cyclos/src/cyclos_api/client.rs | 8+++++---
Mtaler-cyclos/src/cyclos_api/types.rs | 2+-
Mtaler-cyclos/src/main.rs | 6+++---
Mtaler-cyclos/src/setup.rs | 132++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---
Mtaler-cyclos/src/worker.rs | 20+++++++++++++-------
11 files changed, 248 insertions(+), 45 deletions(-)

diff --git a/debian/etc/taler-cyclos/conf.d/cyclos-worker.conf b/debian/etc/taler-cyclos/conf.d/cyclos-worker.conf @@ -1,4 +1,10 @@ # Configuration the cyclos adapter worker. [cyclos-worker] +# Cyclos account type ID to index +ACCOUNT_TYPE_ID = + +# Cyclos payment type ID to use for transfers +PAYMENT_TYPE_ID = + @inline-secret@ cyclos-worker ../secrets/cyclos-worker.secret.conf diff --git a/debian/etc/taler-cyclos/taler-cyclos.conf b/debian/etc/taler-cyclos/taler-cyclos.conf @@ -24,7 +24,7 @@ # Adapter currency CURRENCY = -# IBAN of the Cyclos account to sync +# Account ID of the Cyclos account to sync ACCOUNT_ID = # Legal entity that is associated with the Cyclos account diff --git a/taler-cyclos/cyclos.conf b/taler-cyclos/cyclos.conf @@ -2,7 +2,7 @@ # Adapter currency CURRENCY = -# IBAN of the Cyclos account to sync +# Account ID of the Cyclos account to sync ACCOUNT_ID = # Legal entity that is associated with the Cyclos account @@ -22,6 +22,12 @@ PASSWORD = # This can either can be normal or exchange. # Exchange accounts bounce invalid incoming Taler transactions. ACCOUNT_TYPE = exchange + +# Cyclos account type ID to index +ACCOUNT_TYPE_ID = + +# Cyclos payment type ID to use for transfers +PAYMENT_TYPE_ID = [cyclos-httpd] # How "taler-cyclos serve" serves its API, this can either be tcp or unix diff --git a/taler-cyclos/src/bin/cyclos-harness.rs b/taler-cyclos/src/bin/cyclos-harness.rs @@ -41,6 +41,7 @@ use taler_cyclos::{ constants::CONFIG_SOURCE, cyclos_api::{api::CyclosAuth, client::Client, types::HistoryItem}, db::{self, TransferResult, dbinit}, + setup, worker::{Worker, WorkerError, WorkerResult, run_worker}, }; @@ -79,6 +80,8 @@ struct Harness<'a> { wire: Client<'a>, client_payto: FullCyclosPayto, wire_payto: FullCyclosPayto, + payment_type_id: u64, + account_type_id: u64, currency: Currency, } @@ -102,7 +105,7 @@ impl<'a> Harness<'a> { async fn client_send(&self, subject: &str, amount: Decimal) -> u64 { *self .client - .direct_payment(**self.wire_payto, amount, subject) + .direct_payment(**self.wire_payto, self.payment_type_id, amount, subject) .await .unwrap() .id @@ -112,7 +115,7 @@ impl<'a> Harness<'a> { async fn exchange_send(&self, subject: &str, amount: Decimal) -> u64 { *self .wire - .direct_payment(**self.client_payto, amount, subject) + .direct_payment(**self.client_payto, self.payment_type_id, amount, subject) .await .unwrap() .id @@ -126,7 +129,7 @@ impl<'a> Harness<'a> { /// Fetch last transfer related to client async fn client_last_transfer(&self) -> HistoryItem { self.client - .transfers(**self.client_payto) + .history(**self.client_payto) .await .unwrap() .remove(0) @@ -135,13 +138,13 @@ impl<'a> Harness<'a> { /// Run the worker once async fn worker(&'a self) -> WorkerResult { let db = &mut self.pool.acquire().await.unwrap().detach(); - let account = self.wire.accounts().await.unwrap()[0].clone(); Worker { db, currency: self.currency.clone(), client: &self.wire, - account_type_id: *account.ty.id, account_type: AccountType::Exchange, + account_type_id: self.account_type_id, + payment_type_id: self.payment_type_id, } .run() .await @@ -297,25 +300,25 @@ async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> { step("Prepare db"); let pool = dbinit(cfg, reset).await?; - let cfg = HarnessCfg::parse(cfg)?; let client = reqwest::Client::new(); + setup::setup(cfg, reset, &client).await?; + let cfg = HarnessCfg::parse(cfg)?; let wire = Client { client: &client, - api_url: &cfg.worker.api_url, + api_url: &cfg.worker.host.api_url, auth: &CyclosAuth::Basic { - username: cfg.worker.username, - password: cfg.worker.password, + username: cfg.worker.host.username, + password: cfg.worker.host.password, }, }; let client = Client { client: &client, - api_url: &cfg.worker.api_url, + api_url: &cfg.worker.host.api_url, auth: &CyclosAuth::Basic { username: cfg.username, password: cfg.password, }, }; - let harness = Harness { pool: &pool, client_payto: client.whoami().await.unwrap().payto(), @@ -323,6 +326,8 @@ async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> { client, wire, currency: cfg.worker.currency, + payment_type_id: *cfg.worker.payment_type_id, + account_type_id: *cfg.worker.account_type_id, }; step("Warmup"); @@ -506,20 +511,20 @@ async fn online_harness(config: &Config, reset: bool) -> anyhow::Result<()> { step("Prepare db"); let pool = dbinit(config, reset).await?; - - let cfg = HarnessCfg::parse(config)?; let http_client = reqwest::Client::new(); + setup::setup(config, reset, &http_client).await?; + let cfg = HarnessCfg::parse(config)?; let wire = Client { client: &http_client, - api_url: &cfg.worker.api_url, + api_url: &cfg.worker.host.api_url, auth: &CyclosAuth::Basic { - username: cfg.worker.username, - password: cfg.worker.password, + username: cfg.worker.host.username, + password: cfg.worker.host.password, }, }; let client = Client { client: &http_client, - api_url: &cfg.worker.api_url, + api_url: &cfg.worker.host.api_url, auth: &CyclosAuth::Basic { username: cfg.username, password: cfg.password, @@ -533,6 +538,8 @@ async fn online_harness(config: &Config, reset: bool) -> anyhow::Result<()> { client, wire, currency: cfg.worker.currency, + payment_type_id: *cfg.worker.payment_type_id, + account_type_id: *cfg.worker.account_type_id, }; step("Warmup worker"); diff --git a/taler-cyclos/src/config.rs b/taler-cyclos/src/config.rs @@ -25,7 +25,7 @@ use taler_common::{ types::{amount::Currency, payto::PaytoURI}, }; -use crate::FullCyclosPayto; +use crate::{CyclosId, FullCyclosPayto}; #[derive(Debug, Clone, Copy)] pub enum AccountType { @@ -76,13 +76,60 @@ impl ServeCfg { } } +/// Cyclos API config +pub struct HostCfg { + pub api_url: Url, + pub username: String, + pub password: String, +} + +impl HostCfg { + pub fn parse(cfg: &Config) -> Result<Self, ValueErr> { + let sect = cfg.section("cyclos-worker"); + Ok(Self { + username: sect.str("USERNAME").require()?, + api_url: sect.parse("URL", "API_URL").require()?, + password: sect.str("PASSWORD").require()?, + }) + } +} + +/// taler-cyclos setup config +pub struct SetupCfg { + pub currency: Currency, + pub host: HostCfg, + pub id: Option<CyclosId>, + pub name: Option<String>, + pub account_type_id: Option<CyclosId>, + pub payment_type_id: Option<CyclosId>, +} + +impl SetupCfg { + pub fn parse(cfg: &Config) -> Result<Self, ValueErr> { + let main_s = cfg.section("cyclos"); + let worker_s = cfg.section("cyclos-worker"); + Ok(Self { + currency: main_s.parse("currency", "CURRENCY").require()?, + id: main_s.parse("cyclos account id", "ACCOUNT_ID").opt()?, + account_type_id: worker_s + .parse("cyclos account type id", "ACCOUNT_TYPE_ID") + .opt()?, + payment_type_id: worker_s + .parse("cyclos payment type id", "PAYMENT_TYPE_ID") + .opt()?, + name: main_s.str("NAME").opt()?, + host: HostCfg::parse(cfg)?, + }) + } +} + /// taler-cyclos worker config pub struct WorkerCfg { pub currency: Currency, - pub username: String, - pub api_url: Url, - pub password: String, + pub host: HostCfg, pub account_type: AccountType, + pub account_type_id: CyclosId, + pub payment_type_id: CyclosId, } impl WorkerCfg { @@ -91,14 +138,18 @@ impl WorkerCfg { let sect = cfg.section("cyclos-worker"); Ok(Self { currency: main_sect.parse("currency", "CURRENCY").require()?, - username: sect.str("USERNAME").require()?, account_type: map_config!(sect, "account type", "ACCOUNT_TYPE", "exchange" => { Ok(AccountType::Exchange) }, "normal" => { Ok(AccountType::Normal) } ) .require()?, - api_url: sect.parse("URL", "API_URL").require()?, - password: sect.str("PASSWORD").require()?, + account_type_id: sect + .parse("cyclos account type id", "ACCOUNT_TYPE_ID") + .require()?, + payment_type_id: sect + .parse("cyclos payment type id", "PAYMENT_TYPE_ID") + .require()?, + host: HostCfg::parse(cfg)?, }) } } diff --git a/taler-cyclos/src/cyclos_api/api.rs b/taler-cyclos/src/cyclos_api/api.rs @@ -20,6 +20,7 @@ use reqwest::{Client, Method, RequestBuilder, StatusCode, Url}; use serde::{Deserialize, Serialize, de::DeserializeOwned}; use taler_common::error::FmtSource; use thiserror::Error; +use tracing::trace; use crate::cyclos_api::types::{ ForbiddenError, InputError, NotFoundError, UnauthorizedError, UnexpectedError, @@ -92,7 +93,7 @@ fn parse<'de, T: Deserialize<'de>>(str: &'de str) -> Result<T, ErrKind> { async fn json_body<T: DeserializeOwned>(res: reqwest::Response) -> Result<T, ErrKind> { // TODO check content type? let body = res.text().await?; - //println!("{body}"); + trace!("JSON body: {body}"); let parsed = parse(&body)?; Ok(parsed) } diff --git a/taler-cyclos/src/cyclos_api/client.rs b/taler-cyclos/src/cyclos_api/client.rs @@ -60,7 +60,8 @@ impl Client<'_> { pub async fn direct_payment( &self, - account_id: u64, + creditor: u64, + payment_type_id: u64, amount: Decimal, description: &str, ) -> ApiResult<Transaction> { @@ -68,7 +69,8 @@ impl Client<'_> { .json(&json!({ "description": description, "scheduling": "direct", - "subject": account_id, + "subject": creditor, + "type": payment_type_id, "amount": amount })) .parse_json() @@ -81,7 +83,7 @@ impl Client<'_> { .await } - pub async fn transfers(&self, account_type_id: u64) -> ApiResult<Vec<HistoryItem>> { + pub async fn history(&self, account_type_id: u64) -> ApiResult<Vec<HistoryItem>> { self.request( Method::GET, format!("self/accounts/{account_type_id}/history"), diff --git a/taler-cyclos/src/cyclos_api/types.rs b/taler-cyclos/src/cyclos_api/types.rs @@ -52,7 +52,6 @@ pub struct AccountStatus { #[derive(Debug, Deserialize, Clone)] #[serde(rename_all = "camelCase")] pub struct Account { - pub id: CyclosId, pub currency: Currency, pub status: AccountStatus, #[serde(rename = "type")] @@ -84,6 +83,7 @@ pub struct PaymentData { #[serde(rename_all = "camelCase")] pub struct DataForTransaction { pub payment_type_data: PaymentData, + pub payment_types: Vec<PaymentData>, } #[derive(Debug, Deserialize)] diff --git a/taler-cyclos/src/main.rs b/taler-cyclos/src/main.rs @@ -20,7 +20,7 @@ use taler_api::config::{ApiCfg, AuthCfg}; use taler_build::long_version; use taler_common::{CommonArgs, cli::ConfigCmd, config::Config, taler_main}; use taler_cyclos::{ - config::{ServeCfg, WorkerCfg, parse_account_payto}, + config::{ServeCfg, parse_account_payto}, constants::CONFIG_SOURCE, db::{dbinit, pool}, run_serve, setup, @@ -85,8 +85,8 @@ async fn run(cmd: Command, cfg: &Config) -> anyhow::Result<()> { dbinit(cfg, reset).await?; } Command::Setup { reset } => { - let cfg = WorkerCfg::parse(cfg)?; - setup::setup(cfg, reset).await? + let client = reqwest::Client::new(); + setup::setup(cfg, reset, &client).await? } Command::Serve { check } => { if check { diff --git a/taler-cyclos/src/setup.rs b/taler-cyclos/src/setup.rs @@ -14,9 +14,133 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use crate::config::WorkerCfg; +use std::fmt::Write as _; -pub async fn setup(_cfg: WorkerCfg, _reset: bool) -> anyhow::Result<()> { - // TODO - Ok(()) +use taler_common::config::Config; +use tracing::{debug, error, info, warn}; + +use crate::{ + config::SetupCfg, + cyclos_api::{api::CyclosAuth, client::Client}, +}; + +pub async fn setup(cfg: &Config, _reset: bool, client: &reqwest::Client) -> anyhow::Result<()> { + let cfg = SetupCfg::parse(cfg)?; + let client = Client { + client, + api_url: &cfg.host.api_url, + auth: &CyclosAuth::Basic { + username: cfg.host.username, + password: cfg.host.password, + }, + }; + + info!(target: "setup", "Check API access and configuration"); + let mut ready = true; + + let whoami = client.whoami().await?; + if let Some(id) = cfg.id { + if whoami.id != id { + error!( + target: "setup", + "Expected ACCOUNT_ID {id} from config got {} from server", + whoami.id + ); + ready = false; + } else { + debug!(target: "setup", "ACCOUNT_ID {id} from config match server") + } + } else { + error!(target: "setup", "Missing ACCOUNT_ID got {} from server", whoami.id); + ready = false; + } + if let Some(name) = cfg.name { + if whoami.name != name { + warn!( + target: "setup", + "Expected NAME '{name}' from config got '{}' from server", + whoami.name + ) + } else { + debug!(target: "setup", "NAME {name} from config match server") + } + } else { + error!(target: "setup", "Missing NAME got '{}' from server", whoami.name); + ready = false; + } + + // Check account type + let accounts = client.accounts().await?; + let accounts_fmt = { + let mut s = String::new(); + for a in &accounts { + write!( + &mut s, + "\n{} '{}' in {}", + a.ty.id, a.ty.name, a.currency.ty.name, + ) + .unwrap(); + } + s + }; + let currency = if let Some(id) = cfg.account_type_id { + if let Some(p) = accounts.iter().find(|it| it.ty.id == id) { + debug!(target: "setup", "config ACCOUNT_TYPE_ID {id} one of:{accounts_fmt}"); + Some(&p.currency) + } else { + error!(target: "setup", "got unknown ACCOUNT_TYPE_ID {id} in config, expected one of:{accounts_fmt}"); + ready = false; + None + } + } else { + error!(target: "setup", "no ACCOUNT_TYPE_ID in config, must be one of:{accounts_fmt}"); + ready = false; + None + }; + + if let Some(currency) = currency { + // Check currency + if currency.ty.name != cfg.currency.as_ref() { + warn!(target: "setup", + "Expected CURRENCY '{}' from config got '{}' from server", + cfg.currency, currency.ty.name + ); + } + + // Check payment type + let data = client.payment_data().await?; + let payment_fmt = { + let mut s = String::new(); + for p in &data.payment_types { + if p.currency.ty.id == currency.ty.id { + write!(&mut s, "\n{} {} {}", p.ty.id, p.currency.ty.name, p.ty.name).unwrap(); + } + } + s + }; + if let Some(id) = cfg.payment_type_id { + if let Some(p) = data.payment_types.iter().find(|it| it.ty.id == id) { + if p.currency.ty.id != currency.ty.id { + error!(target: "setup", "PAYMENT_TYPE_ID {} in config use currency {} expected {}, should be one of:{payment_fmt}", p.ty.id, p.currency.ty.name, currency.ty.name); + ready = false + } else { + debug!(target: "setup", "PAYMENT_TYPE_ID {id} config is one of:{payment_fmt}"); + } + } else { + error!(target: "setup", "unknown PAYMENT_TYPE_ID {id} in config, should be one of:{payment_fmt}"); + ready = false; + } + } else { + error!(target: "setup", "no PAYMENT_TYPE_ID in config, must be one of:{payment_fmt}"); + ready = false; + } + } + + if ready { + info!(target: "setup", "Setup finished"); + Ok(()) + } else { + error!(target: "setup", "Setup failed, config need editing"); + std::process::exit(0); + } } diff --git a/taler-cyclos/src/worker.rs b/taler-cyclos/src/worker.rs @@ -59,17 +59,16 @@ pub async fn run_worker( let cfg = WorkerCfg::parse(cfg)?; let client = Client { client, - api_url: &cfg.api_url, + api_url: &cfg.host.api_url, auth: &CyclosAuth::Basic { - username: cfg.username, - password: cfg.password, + username: cfg.host.username, + password: cfg.host.password, }, }; let mut jitter = ExpoBackoffDecorr::default(); loop { let res: WorkerResult = async { let db = &mut PgListener::connect_with(pool).await?; - let account = client.accounts().await.unwrap()[0].clone(); // TODO take a postgresql lock ? @@ -82,7 +81,8 @@ pub async fn run_worker( Worker { client: &client, db: db.acquire().await?, - account_type_id: *account.ty.id, + account_type_id: *cfg.account_type_id, + payment_type_id: *cfg.payment_type_id, account_type: cfg.account_type, currency: cfg.currency.clone(), } @@ -113,6 +113,7 @@ pub struct Worker<'a> { pub db: &'a mut PgConnection, pub currency: Currency, pub account_type_id: u64, + pub payment_type_id: u64, pub account_type: AccountType, } @@ -123,7 +124,7 @@ impl Worker<'_> { //let mut next: Option<Next> = None; //kv_get(&mut *self.db, TXS_CURSOR_KEY).await?; TODO cursor logic is broken and cannot be stored & reused loop { - let transfers = self.client.transfers(self.account_type_id).await?; + let transfers = self.client.history(self.account_type_id).await?; for transfer in transfers { let tx = extract_tx_info(transfer); match tx { @@ -146,7 +147,12 @@ impl Worker<'_> { debug!(target: "worker", "send tx {initiated}"); let res = self .client - .direct_payment(initiated.creditor.0, initiated.amount, &initiated.subject) + .direct_payment( + initiated.creditor.0, + self.payment_type_id, + initiated.amount, + &initiated.subject, + ) .await; fail_point("direct-payment")?; match res {