taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

commit 1628e68a3bb29cbb42f3f76e2bd222a1463d601e
parent acd8bcc08592bdeacebdd806d897baf48badf652
Author: Antoine A <>
Date:   Tue, 23 Dec 2025 12:44:39 +0100

cyclos: add api and cli

Diffstat:
Mtaler-cyclos/cyclos.conf | 6++++++
Ataler-cyclos/src/api.rs | 221+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mtaler-cyclos/src/bin/cyclos-harness.rs | 25++++++++++++-------------
Mtaler-cyclos/src/config.rs | 49+++++++++++++++++++++++++++++++++++++++++++++++--
Mtaler-cyclos/src/db.rs | 44++++++++++++++++++++++++++++++++++++++++++--
Mtaler-cyclos/src/lib.rs | 27+++++++++++++++++++++++++--
Mtaler-cyclos/src/main.rs | 129+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
Ataler-cyclos/src/setup.rs | 22++++++++++++++++++++++
Mtaler-cyclos/tests/api.rs | 45++++++++++++++++++++++++++++-----------------
Mtaler-magnet-bank/src/db.rs | 19++++++++++++++++++-
Mtaler-magnet-bank/src/main.rs | 22++++++----------------
Mtaler-magnet-bank/tests/api.rs | 3++-
12 files changed, 556 insertions(+), 56 deletions(-)

diff --git a/taler-cyclos/cyclos.conf b/taler-cyclos/cyclos.conf @@ -2,6 +2,12 @@ # Adapter currency CURRENCY = +# IBAN of the Cyclos account to sync +ACCOUNT_ID = + +# Legal entity that is associated with the Cyclos account +NAME = + [cyclos-worker] # URL of the Cyclos API server API_URL = "https://mobil.magnetbank.hu" diff --git a/taler-cyclos/src/api.rs b/taler-cyclos/src/api.rs @@ -0,0 +1,221 @@ +/* + This file is part of TALER + Copyright (C) 2025 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use jiff::Timestamp; +use taler_api::{ + api::{TalerApi, revenue::Revenue, wire::WireGateway}, + error::{ApiResult, failure}, + subject::IncomingSubject, +}; +use taler_common::{ + api_common::{SafeU64, safe_u64}, + api_params::{History, Page}, + api_revenue::RevenueIncomingHistory, + api_wire::{ + AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddKycauthResponse, + IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse, + TransferState, TransferStatus, + }, + error_code::ErrorCode, + types::{amount::Currency, payto::PaytoURI}, +}; +use tokio::sync::watch::Sender; + +use crate::{ + FullCyclosPayto, + db::{self, AddIncomingResult, TxInAdmin}, +}; + +pub struct CyclosApi { + pub pool: sqlx::PgPool, + pub currency: Currency, + pub payto: PaytoURI, + pub in_channel: Sender<i64>, + pub taler_in_channel: Sender<i64>, + pub out_channel: Sender<i64>, + pub taler_out_channel: Sender<i64>, +} + +impl CyclosApi { + pub async fn start(pool: sqlx::PgPool, payto: PaytoURI, currency: Currency) -> Self { + let in_channel = Sender::new(0); + let taler_in_channel = Sender::new(0); + let out_channel = Sender::new(0); + let taler_out_channel = Sender::new(0); + let tmp = Self { + pool: pool.clone(), + payto, + currency, + in_channel: in_channel.clone(), + taler_in_channel: taler_in_channel.clone(), + out_channel: out_channel.clone(), + taler_out_channel: taler_out_channel.clone(), + }; + tokio::spawn(db::notification_listener( + pool, + in_channel, + taler_in_channel, + out_channel, + taler_out_channel, + )); + tmp + } +} + +impl TalerApi for CyclosApi { + fn currency(&self) -> &str { + self.currency.as_ref() + } + + fn implementation(&self) -> Option<&str> { + Some("taler-cyclos") + } +} + +impl WireGateway for CyclosApi { + async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> { + let creditor = FullCyclosPayto::try_from(&req.credit_account)?; + let result = db::make_transfer(&self.pool, &req, &creditor, &Timestamp::now()).await?; + match result { + db::TransferResult::Success { id, initiated_at } => Ok(TransferResponse { + timestamp: initiated_at.into(), + row_id: SafeU64::try_from(id).unwrap(), + }), + db::TransferResult::RequestUidReuse => Err(failure( + ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED, + "request_uid used already", + )), + db::TransferResult::WtidReuse => Err(failure( + ErrorCode::BANK_TRANSFER_WTID_REUSED, + "wtid used already", + )), + } + } + + async fn transfer_page( + &self, + page: Page, + status: Option<TransferState>, + ) -> ApiResult<TransferList> { + Ok(TransferList { + transfers: db::transfer_page(&self.pool, &status, &self.currency, &page).await?, + debit_account: self.payto.clone(), + }) + } + + async fn transfer_by_id(&self, id: u64) -> ApiResult<Option<TransferStatus>> { + Ok(db::transfer_by_id(&self.pool, id, &self.currency).await?) + } + + async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> { + Ok(OutgoingHistory { + outgoing_transactions: db::outgoing_history( + &self.pool, + &params, + &self.currency, + || self.taler_out_channel.subscribe(), + ) + .await?, + debit_account: self.payto.clone(), + }) + } + + async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> { + Ok(IncomingHistory { + incoming_transactions: db::incoming_history( + &self.pool, + &params, + &self.currency, + || self.taler_in_channel.subscribe(), + ) + .await?, + credit_account: self.payto.clone(), + }) + } + + async fn add_incoming_reserve( + &self, + req: AddIncomingRequest, + ) -> ApiResult<AddIncomingResponse> { + let debtor = FullCyclosPayto::try_from(&req.debit_account)?; + let res = db::register_tx_in_admin( + &self.pool, + &TxInAdmin { + amount: req.amount.decimal(), + subject: format!("Admin incoming {}", req.reserve_pub), + debtor, + metadata: IncomingSubject::Reserve(req.reserve_pub), + }, + &Timestamp::now(), + ) + .await?; + match res { + AddIncomingResult::Success { + row_id, valued_at, .. + } => Ok(AddIncomingResponse { + row_id: safe_u64(row_id), + timestamp: valued_at.into(), + }), + AddIncomingResult::ReservePubReuse => Err(failure( + ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT, + "reserve_pub used already".to_owned(), + )), + } + } + + async fn add_incoming_kyc(&self, req: AddKycauthRequest) -> ApiResult<AddKycauthResponse> { + let debtor = FullCyclosPayto::try_from(&req.debit_account)?; + let res = db::register_tx_in_admin( + &self.pool, + &TxInAdmin { + amount: req.amount.decimal(), + subject: format!("Admin incoming KYC:{}", req.account_pub), + debtor, + metadata: IncomingSubject::Kyc(req.account_pub), + }, + &Timestamp::now(), + ) + .await?; + match res { + AddIncomingResult::Success { + row_id, valued_at, .. + } => Ok(AddKycauthResponse { + row_id: safe_u64(row_id), + timestamp: valued_at.into(), + }), + AddIncomingResult::ReservePubReuse => Err(failure( + ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT, + "reserve_pub used already".to_owned(), + )), + } + } + + fn support_account_check(&self) -> bool { + false + } +} + +impl Revenue for CyclosApi { + async fn history(&self, params: History) -> ApiResult<RevenueIncomingHistory> { + Ok(RevenueIncomingHistory { + incoming_transactions: db::revenue_history(&self.pool, &params, &self.currency, || { + self.in_channel.subscribe() + }) + .await?, + credit_account: self.payto.clone(), + }) + } +} diff --git a/taler-cyclos/src/bin/cyclos-harness.rs b/taler-cyclos/src/bin/cyclos-harness.rs @@ -29,7 +29,6 @@ use taler_common::{ api_params::{History, Page}, api_wire::{IncomingBankTransaction, TransferRequest, TransferState}, config::Config, - db::{dbinit, pool}, taler_main, types::{ amount::{Amount, Currency, Decimal, decimal}, @@ -38,10 +37,10 @@ use taler_common::{ }; use taler_cyclos::{ CyclosId, FullCyclosPayto, - config::{AccountType, HarnessCfg, parse_db_cfg}, + config::{AccountType, HarnessCfg}, constants::CONFIG_SOURCE, cyclos_api::{api::CyclosAuth, client::Client, types::HistoryItem}, - db::{self, TransferResult}, + db::{self, TransferResult, dbinit}, worker::{Worker, WorkerError, WorkerResult, run_worker}, }; @@ -88,8 +87,14 @@ impl<'a> Harness<'a> { let (exchange, client) = tokio::try_join!(self.wire.accounts(), self.client.accounts()).unwrap(); ( - exchange[0].status.available_balance.unwrap_or(exchange[0].status.balance), - client[0].status.available_balance.unwrap_or(client[0].status.balance), + exchange[0] + .status + .available_balance + .unwrap_or(exchange[0].status.balance), + client[0] + .status + .available_balance + .unwrap_or(client[0].status.balance), ) } @@ -290,10 +295,7 @@ async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> { step("Run Cyclos logic harness tests"); step("Prepare db"); - let db_cfg = parse_db_cfg(cfg)?; - let pool = pool(db_cfg.cfg, "cyclos").await?; - let mut db = pool.acquire().await?.detach(); - dbinit(&mut db, db_cfg.sql_dir.as_ref(), "cyclos", reset).await?; + let pool = dbinit(cfg, reset).await?; let cfg = HarnessCfg::parse(cfg)?; let client = reqwest::Client::new(); @@ -503,10 +505,7 @@ async fn online_harness(config: &Config, reset: bool) -> anyhow::Result<()> { step("Run Cyclos online harness tests"); step("Prepare db"); - let db_cfg = parse_db_cfg(config)?; - let pool = pool(db_cfg.cfg, "cyclos").await?; - let mut db = pool.acquire().await?.detach(); - dbinit(&mut db, db_cfg.sql_dir.as_ref(), "cyclos", reset).await?; + let pool = dbinit(config, reset).await?; let cfg = HarnessCfg::parse(config)?; let http_client = reqwest::Client::new(); diff --git a/taler-cyclos/src/config.rs b/taler-cyclos/src/config.rs @@ -15,12 +15,18 @@ */ use reqwest::Url; -use taler_api::config::DbCfg; +use taler_api::{ + Serve, + config::{ApiCfg, DbCfg}, +}; use taler_common::{ config::{Config, ValueErr}, - map_config, types::amount::Currency, + map_config, + types::{amount::Currency, payto::PaytoURI}, }; +use crate::FullCyclosPayto; + #[derive(Debug, Clone, Copy)] pub enum AccountType { Exchange, @@ -31,6 +37,45 @@ pub fn parse_db_cfg(cfg: &Config) -> Result<DbCfg, ValueErr> { DbCfg::parse(cfg.section("cyclosdb-postgres")) } +pub fn parse_account_payto(cfg: &Config) -> Result<FullCyclosPayto, ValueErr> { + let sect = cfg.section("cyclos"); + let id = sect.parse("cyclos account id", "ACCOUNT_ID").require()?; + let name = sect.str("NAME").require()?; + + Ok(FullCyclosPayto::new(id, name)) +} + +/// taler-cyclos httpd config +pub struct ServeCfg { + pub payto: PaytoURI, + pub currency: Currency, + pub serve: Serve, + pub wire_gateway: Option<ApiCfg>, + pub revenue: Option<ApiCfg>, +} + +impl ServeCfg { + pub fn parse(cfg: &Config) -> Result<Self, ValueErr> { + let payto = parse_account_payto(cfg)?; + + let main_sect = cfg.section("cyclos"); + let sect = cfg.section("cyclos-httpd"); + + let serve = Serve::parse(sect)?; + + let wire_gateway = ApiCfg::parse(cfg.section("cyclos-httpd-wire-gateway-api"))?; + let revenue = ApiCfg::parse(cfg.section("cyclos-httpd-revenue-api"))?; + + Ok(Self { + payto: payto.as_payto(), + currency: main_sect.parse("currency", "CURRENCY").require()?, + serve, + wire_gateway, + revenue, + }) + } +} + /// taler-cyclos worker config pub struct WorkerCfg { pub currency: Currency, diff --git a/taler-cyclos/src/db.rs b/taler-cyclos/src/db.rs @@ -30,14 +30,54 @@ use taler_common::{ IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest, TransferState, TransferStatus, }, + config::Config, types::{ amount::{Currency, Decimal}, payto::{PaytoImpl as _, PaytoURI}, }, }; -use tokio::sync::watch::Receiver; +use tokio::sync::watch::{Receiver, Sender}; -use crate::{CyclosId, FullCyclosPayto}; +use crate::{CyclosId, FullCyclosPayto, config::parse_db_cfg}; + +const SCHEMA: &str = "cyclos"; + +pub async fn pool(cfg: &Config) -> anyhow::Result<PgPool> { + let db = parse_db_cfg(cfg)?; + let pool = taler_common::db::pool(db.cfg, SCHEMA).await?; + Ok(pool) +} + +pub async fn dbinit(cfg: &Config, reset: bool) -> anyhow::Result<PgPool> { + let db_cfg = parse_db_cfg(cfg)?; + let pool = taler_common::db::pool(db_cfg.cfg, SCHEMA).await?; + let mut db = pool.acquire().await?; + taler_common::db::dbinit(&mut db, db_cfg.sql_dir.as_ref(), SCHEMA, reset).await?; + Ok(pool) +} + +pub async fn notification_listener( + pool: PgPool, + in_channel: Sender<i64>, + taler_in_channel: Sender<i64>, + out_channel: Sender<i64>, + taler_out_channel: Sender<i64>, +) -> sqlx::Result<()> { + taler_api::notification::notification_listener!(&pool, + "tx_in" => (row_id: i64) { + in_channel.send_replace(row_id); + }, + "taler_in" => (row_id: i64) { + taler_in_channel.send_replace(row_id); + }, + "tx_out" => (row_id: i64) { + out_channel.send_replace(row_id); + }, + "taler_out" => (row_id: i64) { + taler_out_channel.send_replace(row_id); + } + ) +} #[derive(Debug, Clone)] pub struct TxIn { diff --git a/taler-cyclos/src/lib.rs b/taler-cyclos/src/lib.rs @@ -14,16 +14,39 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use std::{fmt::Display, num::ParseIntError, ops::Deref, str::FromStr}; +use std::{fmt::Display, num::ParseIntError, ops::Deref, str::FromStr, sync::Arc}; -use taler_common::types::payto::{FullPayto, Payto, PaytoErr, PaytoImpl, PaytoURI, TransferPayto}; +use sqlx::PgPool; +use taler_api::api::{Router, TalerRouter as _}; +use taler_common::{ + config::Config, + types::payto::{FullPayto, Payto, PaytoErr, PaytoImpl, PaytoURI, TransferPayto}, +}; +use crate::{api::CyclosApi, config::ServeCfg}; + +pub mod api; pub mod config; pub mod constants; pub mod cyclos_api; pub mod db; +pub mod setup; pub mod worker; +pub async fn run_serve(cfg: &Config, pool: PgPool) -> anyhow::Result<()> { + let cfg = ServeCfg::parse(cfg)?; + let api = Arc::new(CyclosApi::start(pool, cfg.payto, cfg.currency).await); + let mut router = Router::new(); + if let Some(cfg) = cfg.wire_gateway { + router = router.wire_gateway(api.clone(), cfg.auth.method()); + } + if let Some(cfg) = cfg.revenue { + router = router.revenue(api, cfg.auth.method()); + } + router.serve(cfg.serve, None).await?; + Ok(()) +} + #[derive( Debug, Clone, Copy, PartialEq, Eq, serde_with::DeserializeFromStr, serde_with::SerializeDisplay, )] diff --git a/taler-cyclos/src/main.rs b/taler-cyclos/src/main.rs @@ -14,6 +14,131 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -pub fn main() { - // TODO +use anyhow::bail; +use clap::Parser as _; +use taler_api::config::{ApiCfg, AuthCfg}; +use taler_build::long_version; +use taler_common::{CommonArgs, cli::ConfigCmd, config::Config, taler_main}; +use taler_cyclos::{ + config::{ServeCfg, WorkerCfg, parse_account_payto}, + constants::CONFIG_SOURCE, + db::{dbinit, pool}, + run_serve, setup, + worker::run_worker, +}; + +#[derive(clap::Parser, Debug)] +#[command(long_version = long_version(), about, long_about = None)] +struct Args { + #[clap(flatten)] + common: CommonArgs, + + #[command(subcommand)] + cmd: Command, +} + +#[derive(clap::Subcommand, Debug)] +enum Command { + /// Initialize taler-cyclos database + Dbinit { + /// Reset database (DANGEROUS: All existing data is lost) + #[clap(long, short)] + reset: bool, + }, + /// Check taler-cyclos config + Setup { + /// Reset connection info and overwrite keys file + #[clap(long, short)] + reset: bool, + }, + /// Run taler-cyclos worker + Worker { + /// Execute once and return + #[clap(long, short)] + transient: bool, + }, + /// Run taler-cyclos HTTP server + Serve { + /// Check whether an API is in use (if it's useful to start the HTTP + /// server). Exit with 0 if at least one API is enabled, otherwise 1 + #[clap(long)] + check: bool, + }, + #[command(subcommand)] + TalerDeployment(TalerDeployment), + #[command(subcommand)] + Config(ConfigCmd), +} + +/// Helpers to integrate taler-magnet-bank with taler-exchange +#[derive(clap::Subcommand, Debug)] +enum TalerDeployment { + /// Output the exchange payto + ExchangePayto, + /// Output the wire gateway credentials configuration + WireGatewayCredentials, +} + +async fn run(cmd: Command, cfg: &Config) -> anyhow::Result<()> { + match cmd { + Command::Dbinit { reset } => { + dbinit(cfg, reset).await?; + } + Command::Setup { reset } => { + let cfg = WorkerCfg::parse(cfg)?; + setup::setup(cfg, reset).await? + } + Command::Serve { check } => { + if check { + let cfg = ServeCfg::parse(cfg)?; + if cfg.revenue.is_none() && cfg.wire_gateway.is_none() { + std::process::exit(1); + } + } else { + let pool = pool(cfg).await?; + run_serve(cfg, pool).await?; + } + } + Command::Worker { transient: _ } => { + let pool = pool(cfg).await?; + let client = reqwest::Client::new(); + run_worker(cfg, &pool, &client).await?; + } + Command::Config(cfg_cmd) => cfg_cmd.run(cfg)?, + Command::TalerDeployment(cmd) => match cmd { + TalerDeployment::ExchangePayto => { + let payto = parse_account_payto(cfg)?; + println!("{payto}"); + } + TalerDeployment::WireGatewayCredentials => { + let wire_gateway = ApiCfg::parse(cfg.section("cyclos-httpd-wire-gateway-api"))?; + let Some(wire_gateway) = wire_gateway else { + bail!("Wire Gateway API is disabled"); + }; + + match wire_gateway.auth { + AuthCfg::Basic { username, password } => { + println!("WIRE_GATEWAY_AUTH_METHOD = basic"); + println!("USERNAME = {username}"); + println!("PASSWORD = {password}"); + } + AuthCfg::Bearer(token) => { + println!("WIRE_GATEWAY_AUTH_METHOD = bearer"); + println!("AUTH_TOKEN = {token}"); + } + AuthCfg::None => { + println!("WIRE_GATEWAY_AUTH_METHOD = none"); + } + } + } + }, + } + Ok(()) +} + +fn main() { + let args = Args::parse(); + taler_main(CONFIG_SOURCE, args.common, |cfg| async move { + run(args.cmd, &cfg).await + }); } diff --git a/taler-cyclos/src/setup.rs b/taler-cyclos/src/setup.rs @@ -0,0 +1,22 @@ +/* + This file is part of TALER + Copyright (C) 2025 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use crate::config::WorkerCfg; + +pub async fn setup(_cfg: WorkerCfg, _reset: bool) -> anyhow::Result<()> { + // TODO + Ok(()) +} diff --git a/taler-cyclos/tests/api.rs b/taler-cyclos/tests/api.rs @@ -14,16 +14,26 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use std::sync::Arc; +use std::{str::FromStr, sync::Arc}; -use jiff::{Timestamp, Zoned}; +use jiff::Timestamp; use sqlx::PgPool; use taler_api::{api::TalerRouter as _, auth::AuthMethod, subject::OutgoingSubject}; use taler_common::{ api_common::ShortHashCode, api_revenue::RevenueConfig, api_wire::{OutgoingHistory, TransferState, WireConfig}, - types::{amount::amount, payto::payto, url}, + types::{ + amount::{Currency, decimal}, + payto::payto, + url, + }, +}; +use taler_cyclos::{ + api::CyclosApi, + constants::CONFIG_SOURCE, + cyclos_payto, + db::{self, TxOutKind}, }; use taler_test_utils::{ Router, @@ -32,14 +42,13 @@ use taler_test_utils::{ server::TestServer, }; -/* - async fn setup() -> (Router, PgPool) { let pool = db_test_setup(CONFIG_SOURCE).await; let api = Arc::new( - MagnetApi::start( + CyclosApi::start( pool.clone(), payto("payto://iban/HU02162000031000164800000000?receiver-name=name"), + Currency::from_str("TEST").unwrap(), ) .await, ); @@ -70,7 +79,7 @@ async fn transfer() { transfer_routine( &server, TransferState::pending, - &payto("payto://iban/HU02162000031000164800000000?receiver-name=name"), + &payto("payto://cyclos/7762070814178012479?receiver-name=name"), ) .await; } @@ -91,18 +100,21 @@ async fn outgoing_history() { let acquire = pool.acquire(); async move { let mut conn = acquire.await.unwrap(); - let now = Zoned::now().date(); db::register_tx_out( &mut *conn, &db::TxOut { - code: i as u64, - amount: amount("EUR:10"), + transfer_id: i as u64, + tx_id: if i % 2 == 0 { + Some((i % 2) as u64) + } else { + None + }, + amount: decimal("10"), subject: "subject".to_owned(), - creditor: magnet_payto( - "payto://iban/HU30162000031000163100000000?receiver-name=name", + creditor: cyclos_payto( + "payto://cyclos/7762070814178012479?receiver-name=name", ), - value_date: now, - status: TxStatus::Completed, + valued_at: Timestamp::now(), }, &TxOutKind::Talerable(OutgoingSubject( ShortHashCode::rand(), @@ -123,7 +135,7 @@ async fn admin_add_incoming() { let (server, _) = setup().await; admin_add_incoming_routine( &server, - &payto("payto://iban/HU02162000031000164800000000?receiver-name=name"), + &payto("payto://cyclos/7762070814178012479?receiver-name=name"), true, ) .await; @@ -134,9 +146,8 @@ async fn revenue() { let (server, _) = setup().await; revenue_routine( &server, - &payto("payto://iban/HU02162000031000164800000000?receiver-name=name"), + &payto("payto://cyclos/7762070814178012479?receiver-name=name"), true, ) .await; } -*/ diff --git a/taler-magnet-bank/src/db.rs b/taler-magnet-bank/src/db.rs @@ -30,11 +30,28 @@ use taler_common::{ IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest, TransferState, TransferStatus, }, + config::Config, types::{amount::Amount, payto::PaytoImpl as _}, }; use tokio::sync::watch::{Receiver, Sender}; -use crate::{FullHuPayto, constants::CURRENCY, magnet_api::types::TxStatus}; +use crate::{FullHuPayto, config::parse_db_cfg, constants::CURRENCY, magnet_api::types::TxStatus}; + +const SCHEMA: &str = "magnet_bank"; + +pub async fn pool(cfg: &Config) -> anyhow::Result<PgPool> { + let db = parse_db_cfg(cfg)?; + let pool = taler_common::db::pool(db.cfg, SCHEMA).await?; + Ok(pool) +} + +pub async fn dbinit(cfg: &Config, reset: bool) -> anyhow::Result<PgPool> { + let db_cfg = parse_db_cfg(cfg)?; + let pool = taler_common::db::pool(db_cfg.cfg, SCHEMA).await?; + let mut db = pool.acquire().await?; + taler_common::db::dbinit(&mut db, db_cfg.sql_dir.as_ref(), SCHEMA, reset).await?; + Ok(pool) +} pub async fn notification_listener( pool: PgPool, diff --git a/taler-magnet-bank/src/main.rs b/taler-magnet-bank/src/main.rs @@ -18,16 +18,11 @@ use anyhow::bail; use clap::Parser; use taler_api::config::{ApiCfg, AuthCfg}; use taler_build::long_version; -use taler_common::{ - CommonArgs, - cli::ConfigCmd, - config::Config, - db::{dbinit, pool}, - taler_main, -}; +use taler_common::{CommonArgs, cli::ConfigCmd, config::Config, taler_main}; use taler_magnet_bank::{ - config::{ServeCfg, WorkerCfg, parse_account_payto, parse_db_cfg}, + config::{ServeCfg, WorkerCfg, parse_account_payto}, constants::CONFIG_SOURCE, + db::{dbinit, pool}, dev::{self, DevCmd}, run_serve, run_worker, setup, }; @@ -90,10 +85,7 @@ enum TalerDeployment { async fn run(cmd: Command, cfg: &Config) -> anyhow::Result<()> { match cmd { Command::Dbinit { reset } => { - let cfg = parse_db_cfg(cfg)?; - let pool = pool(cfg.cfg, "magnet_bank").await?; - let mut conn = pool.acquire().await?; - dbinit(&mut conn, cfg.sql_dir.as_ref(), "magnet-bank", reset).await?; + dbinit(cfg, reset).await?; } Command::Setup { reset } => { let cfg = WorkerCfg::parse(cfg)?; @@ -106,14 +98,12 @@ async fn run(cmd: Command, cfg: &Config) -> anyhow::Result<()> { std::process::exit(1); } } else { - let db = parse_db_cfg(cfg)?; - let pool = pool(db.cfg, "magnet_bank").await?; + let pool = pool(cfg).await?; run_serve(cfg, pool).await?; } } Command::Worker { transient: _ } => { - let db = parse_db_cfg(cfg)?; - let pool = pool(db.cfg, "magnet_bank").await?; + let pool = pool(cfg).await?; let client = reqwest::Client::new(); run_worker(cfg, &pool, &client).await?; } diff --git a/taler-magnet-bank/tests/api.rs b/taler-magnet-bank/tests/api.rs @@ -33,7 +33,8 @@ use taler_magnet_bank::{ magnet_payto, }; use taler_test_utils::{ - Router, db::db_test_setup, + Router, + db::db_test_setup, routine::{admin_add_incoming_routine, revenue_routine, routine_pagination, transfer_routine}, server::TestServer, };