commit 527c75419ded4d76d25ad26d76da11b45c5292bd
parent e93f49639c6229d5f2dbbdabb05e5301f0b9b82e
Author: Antoine A <>
Date: Wed, 29 Jan 2025 17:30:07 +0100
magnet-bank: revenue API, more rename and other improvements
Diffstat:
35 files changed, 2309 insertions(+), 2109 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
@@ -2593,7 +2593,7 @@ dependencies = [
]
[[package]]
-name = "taler-magnet-bank-adapter"
+name = "taler-magnet-bank"
version = "0.1.0"
dependencies = [
"anyhow",
diff --git a/Cargo.toml b/Cargo.toml
@@ -4,7 +4,7 @@ members = [
"common/taler-api",
"common/taler-common",
"common/taler-test-utils",
- "adapter/taler-magnet-bank-adapter",
+ "taler-magnet-bank",
]
[profile.dev]
diff --git a/adapter/taler-magnet-bank-adapter/Cargo.toml b/adapter/taler-magnet-bank-adapter/Cargo.toml
@@ -1,40 +0,0 @@
-[package]
-name = "taler-magnet-bank-adapter"
-version = "0.1.0"
-edition = "2021"
-
-[dependencies]
-rand_core = { version = "0.6.4" }
-reqwest = { version = "0.12", default-features = false, features = [
- "json",
- "native-tls",
-] }
-hmac = "0.12"
-sha1 = "0.10"
-p256 = { version = "0.13.2", features = ["alloc", "ecdsa"] }
-spki = "0.7.3"
-base64 = "0.22"
-form_urlencoded = "1.2"
-percent-encoding = "2.3"
-passterm = "2.0"
-sqlx = { workspace = true, features = [
- "postgres",
- "runtime-tokio-native-tls",
- "tls-native-tls",
-] }
-serde_json = { workspace = true, features = ["raw_value"] }
-jiff = { workspace = true, features = ["serde"] }
-taler-common.workspace = true
-taler-api.workspace = true
-clap.workspace = true
-serde.workspace = true
-serde_path_to_error.workspace = true
-serde_urlencoded.workspace = true
-thiserror.workspace = true
-tracing.workspace = true
-tracing-subscriber.workspace = true
-tokio.workspace = true
-anyhow.workspace = true
-
-[dev-dependencies]
-taler-test-utils.workspace = true
diff --git a/adapter/taler-magnet-bank-adapter/src/adapter.rs b/adapter/taler-magnet-bank-adapter/src/adapter.rs
@@ -1,189 +0,0 @@
-/*
- This file is part of TALER
- Copyright (C) 2025 Taler Systems SA
-
- TALER is free software; you can redistribute it and/or modify it under the
- terms of the GNU Affero General Public License as published by the Free Software
- Foundation; either version 3, or (at your option) any later version.
-
- TALER is distributed in the hope that it will be useful, but WITHOUT ANY
- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
- A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License along with
- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
-*/
-
-use taler_api::{
- api::{wire::WireGateway, TalerComponent},
- error::{failure, ApiResult},
- subject::IncomingSubject,
-};
-use taler_common::{
- api_common::{safe_u64, SafeU64},
- api_params::{History, Page},
- api_wire::{
- AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddKycauthResponse,
- IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse,
- TransferState, TransferStatus,
- },
- error_code::ErrorCode,
- types::{payto::Payto, timestamp::Timestamp},
-};
-use tokio::sync::watch::Sender;
-
-use crate::{
- constant::CURRENCY,
- db::{self, AddIncomingResult, TxInAdmin},
- MagnetPayto,
-};
-
-pub struct MagnetAdapter {
- pub pool: sqlx::PgPool,
- pub payto: Payto,
- pub in_channel: Sender<i64>,
- pub taler_in_channel: Sender<i64>,
- pub out_channel: Sender<i64>,
- pub taler_out_channel: Sender<i64>,
-}
-
-impl MagnetAdapter {
- pub async fn start(pool: sqlx::PgPool, payto: Payto) -> Self {
- let in_channel = Sender::new(0);
- let taler_in_channel = Sender::new(0);
- let out_channel = Sender::new(0);
- let taler_out_channel = Sender::new(0);
- let tmp = Self {
- pool: pool.clone(),
- payto,
- in_channel: in_channel.clone(),
- taler_in_channel: taler_in_channel.clone(),
- out_channel: out_channel.clone(),
- taler_out_channel: taler_out_channel.clone(),
- };
- tokio::spawn(db::notification_listener(
- pool,
- in_channel,
- taler_in_channel,
- out_channel,
- taler_out_channel,
- ));
- tmp
- }
-}
-
-impl TalerComponent for MagnetAdapter {
- fn currency(&self) -> &str {
- CURRENCY
- }
-
- fn implementation(&self) -> Option<&str> {
- Some("taler-magnet-bank-adapter")
- }
-}
-
-impl WireGateway for MagnetAdapter {
- async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> {
- let creditor = MagnetPayto::try_from(&req.credit_account)?;
- let result = db::make_transfer(&self.pool, &req, &creditor, &Timestamp::now()).await?;
- match result {
- db::TransferResult::Success { id, timestamp } => Ok(TransferResponse {
- timestamp,
- row_id: SafeU64::try_from(id).unwrap(),
- }),
- db::TransferResult::RequestUidReuse => Err(failure(
- ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED,
- "request_uid used already",
- )),
- db::TransferResult::WtidReuse => unimplemented!(),
- }
- }
-
- async fn transfer_page(
- &self,
- page: Page,
- status: Option<TransferState>,
- ) -> ApiResult<TransferList> {
- Ok(TransferList {
- transfers: db::transfer_page(&self.pool, &status, &page).await?,
- debit_account: self.payto.clone(),
- })
- }
-
- async fn transfer_by_id(&self, id: u64) -> ApiResult<Option<TransferStatus>> {
- Ok(db::transfer_by_id(&self.pool, id).await?)
- }
-
- async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> {
- Ok(OutgoingHistory {
- outgoing_transactions: db::outgoing_history(&self.pool, ¶ms, || {
- self.taler_out_channel.subscribe()
- })
- .await?,
- debit_account: self.payto.clone(),
- })
- }
-
- async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> {
- Ok(IncomingHistory {
- incoming_transactions: db::incoming_history(&self.pool, ¶ms, || {
- self.taler_in_channel.subscribe()
- })
- .await?,
- credit_account: self.payto.clone(),
- })
- }
-
- async fn add_incoming_reserve(
- &self,
- req: AddIncomingRequest,
- ) -> ApiResult<AddIncomingResponse> {
- let debtor = MagnetPayto::try_from(&req.debit_account)?;
- let res = db::register_tx_in_admin(
- &self.pool,
- &TxInAdmin {
- amount: req.amount,
- subject: format!("Admin incoming {}", req.reserve_pub),
- debtor,
- timestamp: Timestamp::now(),
- metadata: IncomingSubject::Reserve(req.reserve_pub),
- },
- )
- .await?;
- match res {
- AddIncomingResult::Success(res) => Ok(AddIncomingResponse {
- row_id: safe_u64(res.row_id),
- timestamp: res.timestamp,
- }),
- AddIncomingResult::ReservePubReuse => Err(failure(
- ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT,
- "reserve_pub used already".to_owned(),
- )),
- }
- }
-
- async fn add_incoming_kyc(&self, req: AddKycauthRequest) -> ApiResult<AddKycauthResponse> {
- let debtor = MagnetPayto::try_from(&req.debit_account)?;
- let res = db::register_tx_in_admin(
- &self.pool,
- &TxInAdmin {
- amount: req.amount,
- subject: format!("Admin incoming KYC:{}", req.account_pub),
- debtor,
- timestamp: Timestamp::now(),
- metadata: IncomingSubject::Kyc(req.account_pub),
- },
- )
- .await?;
- match res {
- AddIncomingResult::Success(res) => Ok(AddKycauthResponse {
- row_id: safe_u64(res.row_id),
- timestamp: res.timestamp,
- }),
- AddIncomingResult::ReservePubReuse => Err(failure(
- ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT,
- "reserve_pub used already".to_owned(),
- )),
- }
- }
-}
diff --git a/adapter/taler-magnet-bank-adapter/src/config.rs b/adapter/taler-magnet-bank-adapter/src/config.rs
@@ -1,101 +0,0 @@
-/*
- This file is part of TALER
- Copyright (C) 2025 Taler Systems SA
-
- TALER is free software; you can redistribute it and/or modify it under the
- terms of the GNU Affero General Public License as published by the Free Software
- Foundation; either version 3, or (at your option) any later version.
-
- TALER is distributed in the hope that it will be useful, but WITHOUT ANY
- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
- A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License along with
- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
-*/
-
-use std::net::{IpAddr, SocketAddr};
-
-use base64::{prelude::BASE64_STANDARD, Engine};
-use reqwest::Url;
-use sqlx::postgres::PgConnectOptions;
-use taler_api::{auth::AuthMethod, Serve};
-use taler_common::config::{map_config, Config, ValueErr};
-
-use crate::magnet::Token;
-
-pub struct DbConfig {
- pub cfg: PgConnectOptions,
-}
-
-impl DbConfig {
- pub fn parse(cfg: &Config) -> Result<Self, ValueErr> {
- let sect = cfg.section("taler-magnet-bank-adapter-postgres");
- Ok(Self {
- cfg: sect.postgres("CONFIG").require()?,
- })
- }
-}
-
-pub struct WireGatewayConfig {
- pub serve: Serve,
- pub auth: AuthMethod,
-}
-
-impl WireGatewayConfig {
- pub fn parse(cfg: &Config) -> Result<Self, ValueErr> {
- let sect = cfg.section("taler-magnet-bank-adapter");
-
- let serve = map_config!(sect, "serve", "SERVE",
- "tcp" => {
- let port = sect.number("PORT").require()?;
- let ip: IpAddr = sect.parse("IP addr", "BIND_TO").require()?;
- Ok::<Serve, ValueErr>(Serve::Tcp(SocketAddr::new(ip, port)))
- },
- "unix" => {
- let path = sect.path("UNIXPATH").require()?;
- let permission = sect.unix_mode("UNIXPATH_MODE").require()?;
- Ok::<Serve, ValueErr>(Serve::Unix { path, permission })
- }
- )
- .require()?;
- let auth = map_config!(sect, "auth_method", "AUTH_METHOD",
- "none" => {
- Ok(AuthMethod::None)
- },
- "basic" => {
- let username = sect.str("USERNAME").require()?;
- let password = sect.str("PASSWORD").require()?;
- Ok(AuthMethod::Basic(
- BASE64_STANDARD.encode(format!("{username}:{password}")),
- ))
- },
- "bearer" => {
- Ok(AuthMethod::Bearer(sect.str("AUTH_TOKEN").require()?))
- }
- )
- .require()?;
-
- Ok(Self { serve, auth })
- }
-}
-
-pub struct MagnetConfig {
- pub api_url: Url,
- pub consumer: Token,
- pub keys_path: String,
-}
-
-impl MagnetConfig {
- pub fn parse(cfg: &Config) -> Result<Self, ValueErr> {
- let sect = cfg.section("taler-magnet-bank-adapter");
- Ok(Self {
- api_url: sect.parse("URL", "API_URL").require()?,
- consumer: Token {
- key: sect.str("CONSUMER_KEY").require()?,
- secret: sect.str("CONSUMER_SECRET").require()?,
- },
- keys_path: sect.path("KEYS_FILE").require()?,
- })
- }
-}
diff --git a/adapter/taler-magnet-bank-adapter/src/db.rs b/adapter/taler-magnet-bank-adapter/src/db.rs
@@ -1,1103 +0,0 @@
-/*
- This file is part of TALER
- Copyright (C) 2025 Taler Systems SA
-
- TALER is free software; you can redistribute it and/or modify it under the
- terms of the GNU Affero General Public License as published by the Free Software
- Foundation; either version 3, or (at your option) any later version.
-
- TALER is distributed in the hope that it will be useful, but WITHOUT ANY
- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
- A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License along with
- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
-*/
-
-use std::fmt::Display;
-
-use sqlx::{postgres::PgRow, PgConnection, PgExecutor, PgPool, QueryBuilder, Row};
-use taler_api::{
- db::{history, page, BindHelper, IncomingType, TypeHelper},
- subject::{IncomingSubject, OutgoingSubject},
-};
-use taler_common::{
- api_params::{History, Page},
- api_wire::{
- IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest,
- TransferState, TransferStatus,
- },
- types::{amount::Amount, timestamp::Timestamp},
-};
-use tokio::sync::watch::{Receiver, Sender};
-
-use crate::{constant::CURRENCY, MagnetPayto};
-
-pub async fn notification_listener(
- pool: PgPool,
- in_channel: Sender<i64>,
- taler_in_channel: Sender<i64>,
- out_channel: Sender<i64>,
- taler_out_channel: Sender<i64>,
-) -> sqlx::Result<()> {
- taler_api::notification::notification_listener!(&pool,
- "tx_in" => (row_id: i64) {
- in_channel.send_replace(row_id);
- },
- "taler_in" => (row_id: i64) {
- taler_in_channel.send_replace(row_id);
- },
- "tx_out" => (row_id: i64) {
- out_channel.send_replace(row_id);
- },
- "taler_out" => (row_id: i64) {
- taler_out_channel.send_replace(row_id);
- }
- )
-}
-
-#[derive(Debug, Clone)]
-pub struct TxIn {
- pub code: u64,
- pub amount: Amount,
- pub subject: String,
- pub debtor: MagnetPayto,
- pub timestamp: Timestamp,
-}
-
-impl Display for TxIn {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- let TxIn {
- code,
- amount,
- subject,
- debtor,
- timestamp,
- } = self;
- write!(f, "{timestamp} {amount} {code} {debtor} '{subject}'")
- }
-}
-
-#[derive(Debug, Clone)]
-pub struct TxOut {
- pub code: u64,
- pub amount: Amount,
- pub subject: String,
- pub creditor: MagnetPayto,
- pub timestamp: Timestamp,
-}
-
-impl Display for TxOut {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- let TxOut {
- code,
- amount,
- subject,
- creditor,
- timestamp,
- } = self;
- write!(f, "{timestamp} {amount} {code} {creditor} '{subject}'")
- }
-}
-
-#[derive(Debug, Clone)]
-pub struct TxInAdmin {
- pub amount: Amount,
- pub subject: String,
- pub debtor: MagnetPayto,
- pub timestamp: Timestamp,
- pub metadata: IncomingSubject,
-}
-
-#[derive(Debug, PartialEq, Eq)]
-pub struct RegisteredTx {
- pub new: bool,
- pub row_id: u64,
- pub timestamp: Timestamp,
-}
-
-#[derive(Debug, PartialEq, Eq)]
-pub enum AddIncomingResult {
- Success(RegisteredTx),
- ReservePubReuse,
-}
-
-#[derive(Debug, PartialEq, Eq)]
-pub struct Initiated {
- pub id: u64,
- pub amount: Amount,
- pub subject: String,
- pub creditor: MagnetPayto,
-}
-
-impl Display for Initiated {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(
- f,
- "{} {} {} '{}'",
- self.id, self.amount, self.creditor, self.subject
- )
- }
-}
-
-pub async fn db_init(db: &PgPool, reset: bool) -> sqlx::Result<()> {
- let mut tx = db.begin().await?;
- if reset {
- sqlx::raw_sql("DROP SCHEMA public CASCADE;CREATE SCHEMA public;")
- .execute(&mut *tx)
- .await?;
- }
- // TODO migrations
- sqlx::raw_sql(include_str!("../db/schema.sql"))
- .execute(&mut *tx)
- .await?;
- tx.commit().await?;
- Ok(())
-}
-
-pub async fn register_tx_in_admin(db: &PgPool, tx: &TxInAdmin) -> sqlx::Result<AddIncomingResult> {
- sqlx::query(
- "
- SELECT out_reserve_pub_reuse, out_new, out_tx_row_id, out_timestamp
- FROM register_tx_in(NULL, ($1, $2)::taler_amount, $3, $4, $5, $6, $7, $8)
- ",
- )
- .bind_amount(&tx.amount)
- .bind(&tx.subject)
- .bind(&tx.debtor.number)
- .bind(&tx.debtor.name)
- .bind_timestamp(&tx.timestamp)
- .bind(tx.metadata.ty())
- .bind(tx.metadata.key())
- .try_map(|r: PgRow| {
- Ok(if r.try_get(0)? {
- AddIncomingResult::ReservePubReuse
- } else {
- AddIncomingResult::Success(RegisteredTx {
- new: r.try_get(1)?,
- row_id: r.try_get_u64(2)?,
- timestamp: r.try_get_timestamp(3)?,
- })
- })
- })
- .fetch_one(db)
- .await
-}
-
-pub async fn register_tx_in(
- db: &mut PgConnection,
- tx: &TxIn,
- subject: &Option<IncomingSubject>,
-) -> sqlx::Result<AddIncomingResult> {
- sqlx::query(
- "
- SELECT out_reserve_pub_reuse, out_new, out_tx_row_id, out_timestamp
- FROM register_tx_in($1, ($2, $3)::taler_amount, $4, $5, $6, $7, $8, $9)
- ",
- )
- .bind(tx.code as i64)
- .bind_amount(&tx.amount)
- .bind(&tx.subject)
- .bind(&tx.debtor.number)
- .bind(&tx.debtor.name)
- .bind_timestamp(&tx.timestamp)
- .bind(subject.as_ref().map(|it| it.ty()))
- .bind(subject.as_ref().map(|it| it.key()))
- .try_map(|r: PgRow| {
- Ok(if r.try_get(0)? {
- AddIncomingResult::ReservePubReuse
- } else {
- AddIncomingResult::Success(RegisteredTx {
- new: r.try_get(1)?,
- row_id: r.try_get_u64(2)?,
- timestamp: r.try_get_timestamp(3)?,
- })
- })
- })
- .fetch_one(db)
- .await
-}
-
-pub async fn register_tx_out(
- db: &mut PgConnection,
- tx: &TxOut,
- subject: &Option<OutgoingSubject>,
-) -> sqlx::Result<RegisteredTx> {
- sqlx::query(
- "
- SELECT out_new, out_tx_row_id, out_timestamp
- FROM register_tx_out($1, ($2, $3)::taler_amount, $4, $5, $6, $7, $8, $9)
- ",
- )
- .bind(tx.code as i64)
- .bind_amount(&tx.amount)
- .bind(&tx.subject)
- .bind(&tx.creditor.number)
- .bind(&tx.creditor.name)
- .bind_timestamp(&tx.timestamp)
- .bind(subject.as_ref().map(|it| it.0.as_ref()))
- .bind(subject.as_ref().map(|it| it.1.as_str()))
- .try_map(|r: PgRow| {
- Ok(RegisteredTx {
- new: r.try_get(0)?,
- row_id: r.try_get_u64(1)?,
- timestamp: r.try_get_timestamp(2)?,
- })
- })
- .fetch_one(db)
- .await
-}
-
-#[derive(Debug, PartialEq, Eq)]
-pub enum TransferResult {
- Success { id: u64, timestamp: Timestamp },
- RequestUidReuse,
- WtidReuse,
-}
-
-pub async fn make_transfer<'a>(
- db: impl PgExecutor<'a>,
- req: &TransferRequest,
- creditor: &MagnetPayto,
- timestamp: &Timestamp,
-) -> sqlx::Result<TransferResult> {
- let subject = format!("{} {}", req.wtid, req.exchange_base_url);
- sqlx::query(
- "
- SELECT out_request_uid_reuse, out_wtid_reuse, out_tx_row_id, out_timestamp
- FROM taler_transfer($1, $2, $3, ($4, $5)::taler_amount, $6, $7, $8, $9)
- ",
- )
- .bind(req.request_uid.as_ref())
- .bind(req.wtid.as_ref())
- .bind(&subject)
- .bind_amount(&req.amount)
- .bind(req.exchange_base_url.as_str())
- .bind(&creditor.number)
- .bind(&creditor.name)
- .bind_timestamp(timestamp)
- .try_map(|r: PgRow| {
- Ok(if r.try_get(0)? {
- TransferResult::RequestUidReuse
- } else if r.try_get(1)? {
- TransferResult::WtidReuse
- } else {
- TransferResult::Success {
- id: r.try_get_u64(2)?,
- timestamp: r.try_get_timestamp(3)?,
- }
- })
- })
- .fetch_one(db)
- .await
-}
-
-pub async fn transfer_page<'a>(
- db: impl PgExecutor<'a>,
- status: &Option<TransferState>,
- params: &Page,
-) -> sqlx::Result<Vec<TransferListStatus>> {
- page(
- db,
- "initiated_id",
- params,
- || {
- let mut builder = QueryBuilder::new(
- "
- SELECT
- initiated_id,
- status,
- (amount).val as amount_val,
- (amount).frac as amount_frac,
- credit_account,
- credit_name,
- created
- FROM transfer
- JOIN initiated USING (initiated_id)
- WHERE
- ",
- );
- if let Some(status) = status {
- builder.push(" status = ").push_bind(status).push(" AND ");
- }
- builder
- },
- |r: PgRow| {
- Ok(TransferListStatus {
- row_id: r.try_get_safeu64(0)?,
- status: r.try_get(1)?,
- amount: r.try_get_amount_i(2, CURRENCY)?,
- credit_account: MagnetPayto {
- number: r.try_get(4)?,
- name: r.try_get(4)?,
- }
- .as_payto(),
- timestamp: r.try_get_timestamp(6)?,
- })
- },
- )
- .await
-}
-
-pub async fn outgoing_history(
- db: &PgPool,
- params: &History,
- listen: impl FnOnce() -> Receiver<i64>,
-) -> sqlx::Result<Vec<OutgoingBankTransaction>> {
- history(
- db,
- "tx_out_id",
- params,
- listen,
- || {
- QueryBuilder::new(
- "
- SELECT
- tx_out_id,
- (amount).val as amount_val,
- (amount).frac as amount_frac,
- credit_account,
- credit_name,
- created,
- exchange_base_url,
- wtid
- FROM taler_out
- JOIN tx_out USING (tx_out_id)
- WHERE
- ",
- )
- },
- |r: PgRow| {
- Ok(OutgoingBankTransaction {
- row_id: r.try_get_safeu64(0)?,
- amount: r.try_get_amount_i(1, CURRENCY)?,
- credit_account: MagnetPayto {
- number: r.try_get(3)?,
- name: r.try_get(4)?,
- }
- .as_payto(),
- date: r.try_get_timestamp(5)?,
- exchange_base_url: r.try_get_url(6)?,
- wtid: r.try_get_base32(7)?,
- })
- },
- )
- .await
-}
-
-pub async fn incoming_history(
- db: &PgPool,
- params: &History,
- listen: impl FnOnce() -> Receiver<i64>,
-) -> sqlx::Result<Vec<IncomingBankTransaction>> {
- history(
- db,
- "tx_in_id",
- params,
- listen,
- || {
- QueryBuilder::new(
- "
- SELECT
- type,
- tx_in_id,
- (amount).val as amount_val,
- (amount).frac as amount_frac,
- debit_account,
- debit_name,
- created,
- metadata
- FROM taler_in
- JOIN tx_in USING (tx_in_id)
- WHERE
- ",
- )
- },
- |r: PgRow| {
- Ok(match r.try_get(0)? {
- IncomingType::reserve => IncomingBankTransaction::Reserve {
- row_id: r.try_get_safeu64(1)?,
- amount: r.try_get_amount_i(2, CURRENCY)?,
- debit_account: MagnetPayto {
- number: r.try_get(4)?,
- name: r.try_get(5)?,
- }
- .as_payto(),
- date: r.try_get_timestamp(6)?,
- reserve_pub: r.try_get_base32(7)?,
- },
- IncomingType::kyc => IncomingBankTransaction::Kyc {
- row_id: r.try_get_safeu64(1)?,
- amount: r.try_get_amount_i(2, CURRENCY)?,
- debit_account: MagnetPayto {
- number: r.try_get(4)?,
- name: r.try_get(5)?,
- }
- .as_payto(),
- date: r.try_get_timestamp(6)?,
- account_pub: r.try_get_base32(7)?,
- },
- IncomingType::wad => {
- unimplemented!("WAD is not yet supported")
- }
- })
- },
- )
- .await
-}
-
-pub async fn transfer_by_id<'a>(
- db: impl PgExecutor<'a>,
- id: u64,
-) -> sqlx::Result<Option<TransferStatus>> {
- sqlx::query(
- "
- SELECT
- status,
- status_msg,
- (amount).val as amount_val,
- (amount).frac as amount_frac,
- exchange_base_url,
- wtid,
- credit_account,
- credit_name,
- created
- FROM transfer
- JOIN initiated USING (initiated_id)
- WHERE initiated_id = $1
- ",
- )
- .bind(id as i64)
- .try_map(|r: PgRow| {
- Ok(TransferStatus {
- status: r.try_get(0)?,
- status_msg: r.try_get(1)?,
- amount: r.try_get_amount_i(2, CURRENCY)?,
- origin_exchange_url: r.try_get(4)?,
- wtid: r.try_get_base32(5)?,
- credit_account: MagnetPayto {
- number: r.try_get(6)?,
- name: r.try_get(7)?,
- }
- .as_payto(),
- timestamp: r.try_get_timestamp(8)?,
- })
- })
- .fetch_optional(db)
- .await
-}
-
-pub async fn pending_batch<'a>(
- db: impl PgExecutor<'a>,
- start: &Timestamp,
-) -> sqlx::Result<Vec<Initiated>> {
- sqlx::query(
- "
- SELECT initiated_id, (amount).val, (amount).frac, subject, credit_account, credit_name
- FROM initiated
- WHERE magnet_code IS NULL AND (last_submitted IS NULL OR last_submitted < $1)
- LIMIT 100
- ",
- )
- .bind_timestamp(start)
- .try_map(|r: PgRow| {
- Ok(Initiated {
- id: r.try_get_u64(0)?,
- amount: r.try_get_amount_i(1, CURRENCY)?,
- subject: r.try_get(3)?,
- creditor: MagnetPayto {
- number: r.try_get(4)?,
- name: r.try_get(5)?,
- },
- })
- })
- .fetch_all(db)
- .await
-}
-
-/** Update status of a sucessfull submitted initiated transaction */
-pub async fn initiated_submit_success<'a>(
- db: impl PgExecutor<'a>,
- id: u64,
- timestamp: &Timestamp,
- magnet_code: u64,
-) -> sqlx::Result<()> {
- sqlx::query(
- "
- UPDATE initiated
- SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, magnet_code=$2
- WHERE initiated_id=$3
- "
- ).bind_timestamp(timestamp)
- .bind(magnet_code as i64)
- .bind(id as i64)
- .execute(db).await?;
- Ok(())
-}
-
-/** Update status of a sucessfull submitted initiated transaction */
-pub async fn initiated_submit_failure<'a>(
- db: impl PgExecutor<'a>,
- id: u64,
- timestamp: &Timestamp,
- msg: &str,
-) -> sqlx::Result<()> {
- sqlx::query(
- "
- UPDATE initiated
- SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, status_msg=$2
- WHERE initiated_id=$3
- ",
- )
- .bind_timestamp(timestamp)
- .bind(msg)
- .bind(id as i64)
- .execute(db)
- .await?;
- Ok(())
-}
-
-#[cfg(test)]
-mod test {
-
- use sqlx::{postgres::PgRow, PgConnection, PgPool};
- use taler_api::{
- db::TypeHelper,
- subject::{IncomingSubject, OutgoingSubject},
- };
- use taler_common::{
- api_common::{EddsaPublicKey, HashCode, ShortHashCode},
- api_params::{History, Page},
- api_wire::TransferRequest,
- types::{amount::amount, payto::payto, timestamp::Timestamp, url},
- };
- use tokio::sync::watch::Receiver;
-
- use crate::{
- constant::CURRENCY,
- db::{
- self, make_transfer, register_tx_in, register_tx_in_admin, register_tx_out,
- AddIncomingResult, RegisteredTx, TransferResult, TxIn, TxOut,
- },
- MagnetPayto,
- };
-
- use super::TxInAdmin;
-
- fn fake_listen<T: Default>() -> Receiver<T> {
- tokio::sync::watch::channel(T::default()).1
- }
-
- async fn setup() -> (PgConnection, PgPool) {
- let pool = taler_test_utils::db_test_setup().await;
- db::db_init(&pool, false).await.expect("dbinit");
- let conn = pool.acquire().await.expect("aquire conn").leak();
- (conn, pool)
- }
-
- #[tokio::test]
- async fn tx_in() {
- let (mut db, pool) = setup().await;
-
- async fn routine(
- db: &mut PgConnection,
- first: &Option<IncomingSubject>,
- second: &Option<IncomingSubject>,
- ) {
- let (id, code) =
- sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_in")
- .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?)))
- .fetch_one(&mut *db)
- .await
- .unwrap();
- let tx = TxIn {
- code: code,
- amount: amount("EUR:10"),
- subject: "subject".to_owned(),
- debtor: MagnetPayto {
- number: "number".to_owned(),
- name: "name".to_owned(),
- },
- timestamp: Timestamp::now_stable(),
- };
- // Insert
- assert_eq!(
- register_tx_in(db, &tx, &first)
- .await
- .expect("register tx in"),
- AddIncomingResult::Success(RegisteredTx {
- new: true,
- row_id: id,
- timestamp: tx.timestamp
- })
- );
- // Idempotent
- assert_eq!(
- register_tx_in(
- db,
- &TxIn {
- timestamp: Timestamp::now(),
- ..tx.clone()
- },
- &first
- )
- .await
- .expect("register tx in"),
- AddIncomingResult::Success(RegisteredTx {
- new: false,
- row_id: id,
- timestamp: tx.timestamp
- })
- );
- // Many
- assert_eq!(
- register_tx_in(
- db,
- &TxIn {
- code: code + 1,
- ..tx
- },
- &second
- )
- .await
- .expect("register tx in"),
- AddIncomingResult::Success(RegisteredTx {
- new: true,
- row_id: id + 1,
- timestamp: tx.timestamp
- })
- );
- }
-
- // Empty db
- assert_eq!(
- db::incoming_history(&pool, &History::default(), fake_listen)
- .await
- .unwrap(),
- Vec::new()
- );
-
- // Regular transaction
- routine(&mut db, &None, &None).await;
-
- // Reserve transaction
- routine(
- &mut db,
- &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())),
- &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())),
- )
- .await;
-
- // Kyc transaction
- routine(
- &mut db,
- &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())),
- &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())),
- )
- .await;
-
- // History
- assert_eq!(
- db::incoming_history(&pool, &History::default(), fake_listen)
- .await
- .unwrap()
- .len(),
- 4
- );
- }
-
- #[tokio::test]
- async fn tx_in_admin() {
- let (_, pool) = setup().await;
-
- // Empty db
- assert_eq!(
- db::incoming_history(&pool, &History::default(), fake_listen)
- .await
- .unwrap(),
- Vec::new()
- );
-
- let tx = TxInAdmin {
- amount: amount("EUR:10"),
- subject: "subject".to_owned(),
- debtor: MagnetPayto {
- number: "number".to_owned(),
- name: "name".to_owned(),
- },
- timestamp: Timestamp::now_stable(),
- metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
- };
- // Insert
- assert_eq!(
- register_tx_in_admin(&pool, &tx)
- .await
- .expect("register tx in"),
- AddIncomingResult::Success(RegisteredTx {
- new: true,
- row_id: 1,
- timestamp: tx.timestamp
- })
- );
- // Idempotent
- assert_eq!(
- register_tx_in_admin(
- &pool,
- &TxInAdmin {
- timestamp: Timestamp::now(),
- ..tx.clone()
- }
- )
- .await
- .expect("register tx in"),
- AddIncomingResult::Success(RegisteredTx {
- new: false,
- row_id: 1,
- timestamp: tx.timestamp
- })
- );
- // Many
- assert_eq!(
- register_tx_in_admin(
- &pool,
- &TxInAdmin {
- subject: "Other".to_owned(),
- metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
- ..tx.clone()
- }
- )
- .await
- .expect("register tx in"),
- AddIncomingResult::Success(RegisteredTx {
- new: true,
- row_id: 2,
- timestamp: tx.timestamp
- })
- );
-
- // History
- assert_eq!(
- db::incoming_history(&pool, &History::default(), fake_listen)
- .await
- .unwrap()
- .len(),
- 2
- );
- }
-
- #[tokio::test]
- async fn tx_out() {
- let (mut db, pool) = setup().await;
-
- async fn routine(
- db: &mut PgConnection,
- first: &Option<OutgoingSubject>,
- second: &Option<OutgoingSubject>,
- ) {
- let (id, code) =
- sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_out")
- .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?)))
- .fetch_one(&mut *db)
- .await
- .unwrap();
- let tx = TxOut {
- code,
- amount: amount("EUR:10"),
- subject: "subject".to_owned(),
- creditor: MagnetPayto {
- number: "number".to_owned(),
- name: "name".to_owned(),
- },
- timestamp: Timestamp::now_stable(),
- };
- // Insert
- assert_eq!(
- register_tx_out(db, &tx, &first)
- .await
- .expect("register tx out"),
- RegisteredTx {
- new: true,
- row_id: id,
- timestamp: tx.timestamp
- }
- );
- // Idempotent
- assert_eq!(
- register_tx_out(
- db,
- &TxOut {
- timestamp: Timestamp::now(),
- ..tx.clone()
- },
- &first
- )
- .await
- .expect("register tx out"),
- RegisteredTx {
- new: false,
- row_id: id,
- timestamp: tx.timestamp
- }
- );
- // Many
- assert_eq!(
- register_tx_out(
- db,
- &TxOut {
- code: code + 1,
- ..tx.clone()
- },
- &second
- )
- .await
- .expect("register tx out"),
- RegisteredTx {
- new: true,
- row_id: id + 1,
- timestamp: tx.timestamp
- }
- );
- }
-
- // Empty db
- assert_eq!(
- db::outgoing_history(&pool, &History::default(), fake_listen)
- .await
- .unwrap(),
- Vec::new()
- );
-
- // Regular transaction
- routine(&mut db, &None, &None).await;
-
- // Talerable transaction
- routine(
- &mut db,
- &Some(OutgoingSubject(
- ShortHashCode::rand(),
- url("https://exchange.com"),
- )),
- &Some(OutgoingSubject(
- ShortHashCode::rand(),
- url("https://exchange.com"),
- )),
- )
- .await;
-
- // History
- assert_eq!(
- db::outgoing_history(&pool, &History::default(), fake_listen)
- .await
- .unwrap()
- .len(),
- 2
- );
- }
-
- #[tokio::test]
- async fn transfer() {
- let (mut db, _) = setup().await;
-
- // Empty db
- assert_eq!(db::transfer_by_id(&mut db, 0).await.unwrap(), None);
- assert_eq!(
- db::transfer_page(&mut db, &None, &Page::default())
- .await
- .unwrap(),
- Vec::new()
- );
-
- let req = TransferRequest {
- request_uid: HashCode::rand(),
- amount: amount("EUR:10"),
- exchange_base_url: url("https://exchange.test.com/"),
- wtid: ShortHashCode::rand(),
- credit_account: payto("payto://magnet-bank/todo"),
- };
- let payto = MagnetPayto {
- number: "number".to_owned(),
- name: "name".to_owned(),
- };
- let timestamp = Timestamp::now_stable();
- // Insert
- assert_eq!(
- make_transfer(&mut db, &req, &payto, ×tamp)
- .await
- .expect("transfer"),
- TransferResult::Success {
- id: 1,
- timestamp: timestamp
- }
- );
- // Idempotent
- assert_eq!(
- make_transfer(&mut db, &req, &payto, &Timestamp::now())
- .await
- .expect("transfer"),
- TransferResult::Success {
- id: 1,
- timestamp: timestamp
- }
- );
- // Request UID reuse
- assert_eq!(
- make_transfer(
- &mut db,
- &TransferRequest {
- wtid: ShortHashCode::rand(),
- ..req.clone()
- },
- &payto,
- &Timestamp::now()
- )
- .await
- .expect("transfer"),
- TransferResult::RequestUidReuse
- );
- // wtid reuse
- assert_eq!(
- make_transfer(
- &mut db,
- &TransferRequest {
- request_uid: HashCode::rand(),
- ..req.clone()
- },
- &payto,
- &Timestamp::now()
- )
- .await
- .expect("transfer"),
- TransferResult::WtidReuse
- );
- // Many
- assert_eq!(
- make_transfer(
- &mut db,
- &TransferRequest {
- request_uid: HashCode::rand(),
- wtid: ShortHashCode::rand(),
- ..req
- },
- &payto,
- ×tamp
- )
- .await
- .expect("transfer"),
- TransferResult::Success {
- id: 2,
- timestamp: timestamp
- }
- );
-
- // Get
- assert!(db::transfer_by_id(&mut db, 1).await.unwrap().is_some());
- assert!(db::transfer_by_id(&mut db, 2).await.unwrap().is_some());
- assert!(db::transfer_by_id(&mut db, 3).await.unwrap().is_none());
- assert_eq!(
- db::transfer_page(&mut db, &None, &Page::default())
- .await
- .unwrap()
- .len(),
- 2
- );
- }
-
- #[tokio::test]
- async fn status() {
- let (mut db, _) = setup().await;
-
- // Unknown transfer
- db::initiated_submit_failure(&mut db, 1, &Timestamp::now(), "msg")
- .await
- .unwrap();
- db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12)
- .await
- .unwrap();
- }
-
- #[tokio::test]
- async fn batch() {
- let (mut db, _) = setup().await;
- let start = Timestamp::now();
- let magnet_payto = MagnetPayto {
- number: "number".to_owned(),
- name: "name".to_owned(),
- };
-
- // Empty db
- let pendings = db::pending_batch(&mut db, &start)
- .await
- .expect("pending_batch");
- assert_eq!(pendings.len(), 0);
-
- // Some transfers
- for i in 0..3 {
- make_transfer(
- &mut db,
- &TransferRequest {
- request_uid: HashCode::rand(),
- amount: amount(format!("{CURRENCY}:{}", i + 1)),
- exchange_base_url: url("https://exchange.test.com/"),
- wtid: ShortHashCode::rand(),
- credit_account: payto("payto://magnet-bank/todo"),
- },
- &magnet_payto,
- &&Timestamp::now(),
- )
- .await
- .expect("transfer");
- }
- let pendings = db::pending_batch(&mut db, &start)
- .await
- .expect("pending_batch");
- assert_eq!(pendings.len(), 3);
-
- // Max 100 txs in batch
- for i in 0..100 {
- make_transfer(
- &mut db,
- &TransferRequest {
- request_uid: HashCode::rand(),
- amount: amount(format!("{CURRENCY}:{}", i + 1)),
- exchange_base_url: url("https://exchange.test.com/"),
- wtid: ShortHashCode::rand(),
- credit_account: payto("payto://magnet-bank/todo"),
- },
- &magnet_payto,
- &Timestamp::now(),
- )
- .await
- .expect("transfer");
- }
- let pendings = db::pending_batch(&mut db, &start)
- .await
- .expect("pending_batch");
- assert_eq!(pendings.len(), 100);
-
- // Skip uploaded
- for i in 0..=10 {
- db::initiated_submit_success(&mut db, i, &Timestamp::now(), i)
- .await
- .expect("status success");
- }
- let pendings = db::pending_batch(&mut db, &start)
- .await
- .expect("pending_batch");
- assert_eq!(pendings.len(), 93);
-
- // Skip tried since start
- for i in 0..=10 {
- db::initiated_submit_failure(&mut db, 10 + i, &Timestamp::now(), "failure")
- .await
- .expect("status failure");
- }
- let pendings = db::pending_batch(&mut db, &start)
- .await
- .expect("pending_batch");
- assert_eq!(pendings.len(), 83);
- let pendings = db::pending_batch(&mut db, &Timestamp::now())
- .await
- .expect("pending_batch");
- assert_eq!(pendings.len(), 93);
- }
-}
diff --git a/adapter/taler-magnet-bank-adapter/src/dev.rs b/adapter/taler-magnet-bank-adapter/src/dev.rs
@@ -1,147 +0,0 @@
-/*
- This file is part of TALER
- Copyright (C) 2025 Taler Systems SA
-
- TALER is free software; you can redistribute it and/or modify it under the
- terms of the GNU Affero General Public License as published by the Free Software
- Foundation; either version 3, or (at your option) any later version.
-
- TALER is distributed in the hope that it will be useful, but WITHOUT ANY
- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
- A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License along with
- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
-*/
-
-use clap::ValueEnum;
-use jiff::Zoned;
-use taler_common::{
- config::Config,
- types::{
- amount::Amount,
- payto::{FullPayto, Payto},
- },
-};
-use tracing::info;
-
-use crate::{
- config::MagnetConfig,
- keys,
- magnet::{AuthClient, Direction},
- worker::{extract_tx_info, Tx},
- MagnetPayto,
-};
-
-#[derive(Debug, Clone, PartialEq, Eq, ValueEnum)]
-pub enum DirArg {
- #[value(alias("in"))]
- Incoming,
- #[value(alias("out"))]
- Outgoing,
- Both,
-}
-
-#[derive(clap::Subcommand, Debug)]
-pub enum DevCmd {
- /// Print account info
- Accounts,
- Tx {
- account: Payto,
- #[clap(long, short, value_enum, default_value_t = DirArg::Both)]
- direction: DirArg,
- },
- Transfer {
- #[clap(long)]
- debtor: Payto,
- #[clap(long)]
- creditor: Payto,
- #[clap(long)]
- amount: Amount,
- #[clap(long)]
- subject: String,
- },
-}
-
-pub async fn dev(cfg: Config, cmd: DevCmd) -> anyhow::Result<()> {
- let cfg = MagnetConfig::parse(&cfg)?;
- let keys = keys::load(&cfg)?;
- let client = reqwest::Client::new();
- let client = AuthClient::new(&client, &cfg.api_url, &cfg.consumer).upgrade(&keys.access_token);
- match cmd {
- DevCmd::Accounts => {
- let res = client.list_accounts().await?;
- for partner in res.partners {
- for account in partner.bank_accounts {
- let payto = MagnetPayto {
- number: account.number,
- name: partner.partner.name.clone(),
- };
- info!("{} {} {payto}", account.code, account.currency.symbol);
- }
- }
- }
- DevCmd::Tx { account, direction } => {
- let account = MagnetPayto::try_from(&account)?;
- let dir = match direction {
- DirArg::Incoming => Direction::Incoming,
- DirArg::Outgoing => Direction::Outgoing,
- DirArg::Both => Direction::Both,
- };
- // Register incoming
- let mut next = None;
- loop {
- let page = client
- .page_tx(dir, 5, &account.number, &next, &None)
- .await?;
- next = page.next;
- for item in page.list {
- let tx = extract_tx_info(item.tx);
- match tx {
- Tx::In(tx_in) => info!("in {tx_in}"),
- Tx::Out(tx_out) => info!("out {tx_out}"),
- }
- }
- if next.is_none() {
- break;
- }
- }
- }
- DevCmd::Transfer {
- debtor,
- creditor,
- amount,
- subject,
- } => {
- let full: FullPayto = creditor.query()?;
- let debtor = MagnetPayto::try_from(&debtor)?;
- let creditor = MagnetPayto::try_from(&creditor)?;
- let debtor = client.account(&debtor.number).await?;
- let now = Zoned::now();
- let date = now.date();
-
- let init = client
- .init_tx(
- debtor.code,
- amount.val as f64,
- &subject,
- &date,
- &full.receiver_name,
- &creditor.number,
- )
- .await?
- .tx;
- client
- .sign_tx(
- &keys.signing_key,
- &debtor.number,
- init.code,
- init.amount,
- &date,
- &creditor.number,
- )
- .await?;
- }
- }
- Ok(())
-}
diff --git a/adapter/taler-magnet-bank-adapter/src/keys.rs b/adapter/taler-magnet-bank-adapter/src/keys.rs
@@ -1,147 +0,0 @@
-/*
- This file is part of TALER
- Copyright (C) 2025 Taler Systems SA
-
- TALER is free software; you can redistribute it and/or modify it under the
- terms of the GNU Affero General Public License as published by the Free Software
- Foundation; either version 3, or (at your option) any later version.
-
- TALER is distributed in the hope that it will be useful, but WITHOUT ANY
- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
- A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License along with
- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
-*/
-
-use std::io::ErrorKind;
-
-use p256::ecdsa::SigningKey;
-use taler_common::{json_file, types::base32::Base32};
-use tracing::info;
-
-use crate::{
- config::MagnetConfig,
- magnet::{
- error::{ApiError, MagnetError},
- AuthClient, Token, TokenAuth,
- },
-};
-
-#[derive(Default, Debug, serde::Deserialize, serde::Serialize)]
-struct KeysFile {
- access_token: Option<Token>,
- signing_key: Option<Base32<32>>,
-}
-
-#[derive(Debug)]
-pub struct Keys {
- pub access_token: Token,
- pub signing_key: SigningKey,
-}
-
-pub fn load(cfg: &MagnetConfig) -> anyhow::Result<Keys> {
- // Load JSON file
- let file: KeysFile = match json_file::load(&cfg.keys_path) {
- Ok(file) => file,
- Err(e) => return Err(anyhow::anyhow!("Could not magnet keys: {e}")),
- };
-
- fn incomplete_err() -> anyhow::Error {
- anyhow::anyhow!("Missing magnet keys, run 'taler-magnet-bank-adapter setup' first")
- }
-
- // Check full
- let access_token = file.access_token.ok_or_else(incomplete_err)?;
- let signing_key = file.signing_key.ok_or_else(incomplete_err)?;
-
- // Load signing key
-
- let signing_key = SigningKey::from_slice(&*signing_key)?;
-
- Ok(Keys {
- access_token,
- signing_key,
- })
-}
-
-pub async fn setup(cfg: MagnetConfig, reset: bool) -> anyhow::Result<()> {
- if reset {
- if let Err(e) = std::fs::remove_file(&cfg.keys_path) {
- if e.kind() != ErrorKind::NotFound {
- Err(e)?;
- }
- }
- }
- let mut keys = match json_file::load(&cfg.keys_path) {
- Ok(existing) => existing,
- Err(e) if e.kind() == ErrorKind::NotFound => KeysFile::default(),
- Err(e) => Err(e)?,
- };
- let client = reqwest::Client::new();
- let client = AuthClient::new(&client, &cfg.api_url, &cfg.consumer);
-
- info!("Setup OAuth access token");
- if keys.access_token.is_none() {
- let token_request = client.token_request().await?;
-
- // TODO how to do it in a generic way ?
- // TODO Ask MagnetBank if they could support out-of-band configuration
- println!(
- "Login at {}?oauth_token={}",
- client.join("/NetBankOAuth/authtoken.xhtml"),
- token_request.key
- );
- let auth_url = passterm::prompt_password_tty(Some("Enter the result URL>"))?;
- let auth_url = reqwest::Url::parse(&auth_url)?;
- let token_auth: TokenAuth =
- serde_urlencoded::from_str(auth_url.query().unwrap_or_default())?;
- assert_eq!(token_request.key, token_auth.oauth_token);
-
- let access_token = client.token_access(&token_request, &token_auth).await?;
- keys.access_token = Some(access_token);
- json_file::persist(&cfg.keys_path, &keys)?;
- }
-
- let client = client.upgrade(keys.access_token.as_ref().unwrap());
-
- info!("Setup Strong Customer Authentication");
- // TODO find a proper way to check if SCA is required without trigerring SCA.GLOBAL_FEATURE_NOT_ENABLED
- let request = client.request_sms_code().await?;
- println!(
- "A SCA code have been sent through {} to {}",
- request.channel,
- request.sent_to.join(", ")
- );
- let sca_code = passterm::prompt_password_tty(Some("Enter the code>"))?;
- if let Err(e) = client.perform_sca(&sca_code).await {
- // Ignore error if SCA already performed
- if !matches!(e, ApiError::Magnet(MagnetError { ref short_message, .. }) if short_message == "TOKEN_SCA_HITELESITETT")
- {
- return Err(e.into());
- }
- }
-
- info!("Setup public key");
- // TODO find a proper way to check if a public key have been setup
- // TODO use the better from/to_array API in the next version of the crypto lib
- let signing_key = match keys.signing_key {
- Some(bytes) => SigningKey::from_slice(bytes.as_ref())?,
- None => {
- let rand = SigningKey::random(&mut rand_core::OsRng);
- let array: [u8; 32] = rand.to_bytes().as_slice().try_into().unwrap();
- keys.signing_key = Some(Base32::from(array));
- json_file::persist(&cfg.keys_path, &keys)?;
- rand
- }
- };
- if let Err(e) = client.upload_public_key(&signing_key).await {
- // Ignore error if public key already uploaded
- if !matches!(e, ApiError::Magnet(MagnetError { ref short_message, .. }) if short_message == "KULCS_MAR_HASZNALATBAN")
- {
- return Err(e.into());
- }
- }
-
- Ok(())
-}
diff --git a/adapter/taler-magnet-bank-adapter/src/lib.rs b/adapter/taler-magnet-bank-adapter/src/lib.rs
@@ -1,87 +0,0 @@
-/*
- This file is part of TALER
- Copyright (C) 2025 Taler Systems SA
-
- TALER is free software; you can redistribute it and/or modify it under the
- terms of the GNU Affero General Public License as published by the Free Software
- Foundation; either version 3, or (at your option) any later version.
-
- TALER is distributed in the hope that it will be useful, but WITHOUT ANY
- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
- A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License along with
- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
-*/
-
-use taler_common::types::payto::{FullPayto, Payto, PaytoErr};
-
-pub mod config;
-pub mod constant;
-pub mod db;
-pub mod dev;
-pub mod keys;
-pub mod magnet;
-pub mod adapter;
-pub mod worker;
-pub mod failure_injection {
- pub fn fail_point(_name: &'static str) {
- // TODO inject failures for error handling tests
- }
-}
-
-#[derive(Debug, Clone, PartialEq, Eq)]
-pub struct MagnetPayto {
- pub number: String,
- pub name: String,
-}
-
-impl MagnetPayto {
- pub fn as_payto(&self) -> Payto {
- Payto::from_parts(
- format_args!("{MAGNET_BANK}/{}", self.number),
- [("receiver-name", &self.name)],
- )
- }
-}
-
-impl std::fmt::Display for MagnetPayto {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- self.as_payto().fmt(f)
- }
-}
-#[derive(Debug, thiserror::Error)]
-pub enum MagnetPaytoErr {
- #[error("missing Magnet Bank account number in path")]
- MissingAccount,
-}
-
-const MAGNET_BANK: &str = "magnet-bank";
-
-impl TryFrom<&Payto> for MagnetPayto {
- type Error = PaytoErr;
-
- fn try_from(value: &Payto) -> Result<Self, Self::Error> {
- let url = value.as_ref();
- if url.domain() != Some(MAGNET_BANK) {
- return Err(PaytoErr::UnsupportedKind(
- MAGNET_BANK,
- url.domain().unwrap_or_default().to_owned(),
- ));
- }
- let Some(mut segments) = url.path_segments() else {
- return Err(PaytoErr::custom(MagnetPaytoErr::MissingAccount));
- };
- let Some(account) = segments.next() else {
- return Err(PaytoErr::custom(MagnetPaytoErr::MissingAccount));
- };
- if segments.next().is_some() {
- return Err(PaytoErr::TooLong(MAGNET_BANK));
- }
- let full: FullPayto = value.query()?;
- Ok(Self {
- number: account.to_owned(),
- name: full.receiver_name,
- })
- }
-}
diff --git a/adapter/taler-magnet-bank-adapter/src/main.rs b/adapter/taler-magnet-bank-adapter/src/main.rs
@@ -1,131 +0,0 @@
-/*
- This file is part of TALER
- Copyright (C) 2025 Taler Systems SA
-
- TALER is free software; you can redistribute it and/or modify it under the
- terms of the GNU Affero General Public License as published by the Free Software
- Foundation; either version 3, or (at your option) any later version.
-
- TALER is distributed in the hope that it will be useful, but WITHOUT ANY
- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
- A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License along with
- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
-*/
-
-use std::sync::Arc;
-
-use clap::Parser;
-use taler_magnet_bank_adapter::{
- adapter::MagnetAdapter,
- config::{DbConfig, MagnetConfig, WireGatewayConfig},
- db,
- dev::{self, DevCmd},
- keys,
- magnet::AuthClient,
- worker::Worker,
- MagnetPayto,
-};
-use sqlx::PgPool;
-use taler_api::api::TalerApi;
-use taler_common::{
- cli::ConfigCmd,
- config::{parser::ConfigSource, Config},
- taler_main,
- types::payto::{payto, Payto},
- CommonArgs,
-};
-
-#[derive(clap::Parser, Debug)]
-#[command(version, about, long_about = None)]
-struct Args {
- #[clap(flatten)]
- common: CommonArgs,
-
- #[command(subcommand)]
- cmd: Command,
-}
-
-#[derive(clap::Subcommand, Debug)]
-enum Command {
- /// Setup taler-magnet-bank-adapter auth token and account settings for Wire Gateway use
- Setup {
- #[clap(long, short)]
- reset: bool,
- },
- /// Initialize taler-magnet-bank-adapter database
- Dbinit {
- #[clap(long, short)]
- reset: bool,
- },
- /// Run taler-magnet-bank-adapter HTTP server
- Serve,
- /// Run taler-magnet-bank-adapter worker
- Worker {
- // TODO account in config
- account: Payto,
- },
- #[command(subcommand)]
- Config(ConfigCmd),
- /// Hidden dev commands
- #[command(subcommand, hide(true))]
- Dev(DevCmd),
-}
-
-async fn app(args: Args, cfg: Config) -> anyhow::Result<()> {
- match args.cmd {
- Command::Setup { reset } => {
- let cfg = MagnetConfig::parse(&cfg)?;
- keys::setup(cfg, reset).await?
- }
- Command::Dbinit { reset } => {
- let db = DbConfig::parse(&cfg)?;
- let pool = PgPool::connect_with(db.cfg).await?;
- db::db_init(&pool, reset).await?;
- }
- Command::Serve => {
- let db = DbConfig::parse(&cfg)?;
- let pool = PgPool::connect_with(db.cfg).await?;
- let cfg = WireGatewayConfig::parse(&cfg)?;
- let gateway = MagnetAdapter::start(pool, payto("payto://magnet-bank/todo")).await;
- TalerApi::new()
- .wire_gateway(Arc::new(gateway), cfg.auth)
- .serve(cfg.serve, None)
- .await?;
- }
- Command::Worker { account } => {
- let db = DbConfig::parse(&cfg)?;
- let pool = PgPool::connect_with(db.cfg).await?;
- let cfg = MagnetConfig::parse(&cfg)?;
- let keys = keys::load(&cfg)?;
- let client = reqwest::Client::new();
- let client =
- AuthClient::new(&client, &cfg.api_url, &cfg.consumer).upgrade(&keys.access_token);
- let account = MagnetPayto::try_from(&account)?;
- let account = client.account(&account.number).await?;
- let mut db = pool.acquire().await?.detach();
- // TODO run in loop and handle errors
- let mut worker = Worker {
- client: &client,
- db: &mut db,
- account_number: &account.number,
- account_code: account.code,
- key: &keys.signing_key,
- };
- worker.run().await?;
- }
- Command::Config(cfg_cmd) => cfg_cmd.run(cfg)?,
- Command::Dev(dev_cmd) => dev::dev(cfg, dev_cmd).await?,
- }
- Ok(())
-}
-
-fn main() {
- let args = Args::parse();
- taler_main(
- ConfigSource::new("taler-magnet-bank-adapter", "taler-magnet-bank-adapter", "taler-magnet-bank-adapter"),
- args.common.clone(),
- |cfg| async move { app(args, cfg).await },
- );
-}
diff --git a/adapter/taler-magnet-bank-adapter/tests/api.rs b/adapter/taler-magnet-bank-adapter/tests/api.rs
@@ -1,110 +0,0 @@
-/*
- This file is part of TALER
- Copyright (C) 2025 Taler Systems SA
-
- TALER is free software; you can redistribute it and/or modify it under the
- terms of the GNU Affero General Public License as published by the Free Software
- Foundation; either version 3, or (at your option) any later version.
-
- TALER is distributed in the hope that it will be useful, but WITHOUT ANY
- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
- A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License along with
- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
-*/
-
-use std::sync::Arc;
-
-use sqlx::PgPool;
-use taler_api::{api::TalerApi, auth::AuthMethod, subject::OutgoingSubject};
-use taler_common::{
- api_common::ShortHashCode,
- api_wire::{OutgoingHistory, TransferState},
- types::{amount::amount, payto::payto, timestamp::Timestamp, url},
-};
-use taler_magnet_bank_adapter::{adapter::MagnetAdapter, db, MagnetPayto};
-use taler_test_utils::{
- axum_test::TestServer,
- db_test_setup,
- helpers::TestResponseHelper,
- routine::{admin_add_incoming_routine, routine_pagination, transfer_routine},
-};
-
-async fn setup() -> (TestServer, PgPool) {
- let pool = db_test_setup().await;
- db::db_init(&pool, false).await.unwrap();
- let gateway = MagnetAdapter::start(pool.clone(), payto("payto://magnet-bank/todo")).await;
- let api = TalerApi::new()
- .wire_gateway(Arc::new(gateway), AuthMethod::None)
- .finalize();
- let server = TestServer::new(api).unwrap();
-
- (server, pool)
-}
-
-#[tokio::test]
-async fn transfer() {
- let (server, _) = setup().await;
- transfer_routine(
- &server,
- TransferState::pending,
- &payto("payto://magnet-bank/account?receiver-name=John+Smith"),
- )
- .await;
-}
-
-#[tokio::test]
-async fn outgoing_history() {
- let (server, pool) = setup().await;
- server
- .get("/taler-wire-gateway/history/outgoing")
- .await
- .assert_no_content();
- routine_pagination::<OutgoingHistory, _>(
- &server,
- "/taler-wire-gateway/history/outgoing",
- |it| {
- it.outgoing_transactions
- .into_iter()
- .map(|it| *it.row_id as i64)
- .collect()
- },
- |_, i| {
- let acquire = pool.acquire();
- async move {
- let mut conn = acquire.await.unwrap();
- db::register_tx_out(
- &mut *conn,
- &db::TxOut {
- code: i as u64,
- amount: amount("EUR:10"),
- subject: "subject".to_owned(),
- creditor: MagnetPayto {
- number: "number".to_owned(),
- name: "name".to_owned(),
- },
- timestamp: Timestamp::now_stable(),
- },
- &Some(OutgoingSubject(
- ShortHashCode::rand(),
- url("https://exchange.test"),
- )),
- )
- .await
- .unwrap();
- }
- },
- )
- .await;
-}
-
-#[tokio::test]
-async fn admin_add_incoming() {
- let (server, _) = setup().await;
- admin_add_incoming_routine(
- &server,
- &payto("payto://magnet-bank/account?receiver-name=John+Smith"),
- )
- .await;
-}
diff --git a/common/taler-api/src/api.rs b/common/taler-api/src/api.rs
@@ -43,7 +43,7 @@ use crate::{
pub mod revenue;
pub mod wire;
-pub trait TalerComponent: Send + Sync + 'static {
+pub trait TalerApi: Send + Sync + 'static {
fn currency(&self) -> &str;
fn implementation(&self) -> Option<&str>;
fn check_currency(&self, amount: &Amount) -> ApiResult<()> {
@@ -62,11 +62,11 @@ pub trait TalerComponent: Send + Sync + 'static {
}
}
-pub struct TalerApi {
+pub struct TalerApiBuilder {
router: Router,
}
-impl TalerApi {
+impl TalerApiBuilder {
pub fn new() -> Self {
Self {
router: Router::new(),
diff --git a/common/taler-api/src/api/revenue.rs b/common/taler-api/src/api/revenue.rs
@@ -33,12 +33,12 @@ use crate::{
error::ApiResult,
};
-use super::TalerComponent;
+use super::TalerApi;
-pub trait Revenue: TalerComponent {
+pub trait Revenue: TalerApi {
fn history(
&self,
- history: History,
+ params: History,
) -> impl std::future::Future<Output = ApiResult<RevenueIncomingHistory>> + Send;
}
diff --git a/common/taler-api/src/api/wire.rs b/common/taler-api/src/api/wire.rs
@@ -39,9 +39,9 @@ use crate::{
json::Req,
};
-use super::TalerComponent;
+use super::TalerApi;
-pub trait WireGateway: TalerComponent {
+pub trait WireGateway: TalerApi {
fn transfer(
&self,
req: TransferRequest,
diff --git a/common/taler-api/tests/api.rs b/common/taler-api/tests/api.rs
@@ -28,7 +28,7 @@ use taler_test_utils::{
db_test_setup,
helpers::TestResponseHelper,
json,
- routine::{admin_add_incoming_routine, routine_pagination, transfer_routine},
+ routine::{admin_add_incoming_routine, revenue_routine, routine_pagination, transfer_routine},
};
mod common;
@@ -77,11 +77,6 @@ async fn transfer() {
#[tokio::test]
async fn outgoing_history() {
let (server, _) = setup().await;
- server
- .get("/taler-wire-gateway/history/outgoing")
- .await
- .assert_no_content();
-
routine_pagination::<OutgoingHistory, _>(
&server,
"/taler-wire-gateway/history/outgoing",
@@ -113,3 +108,9 @@ async fn admin_add_incoming() {
let (server, _) = setup().await;
admin_add_incoming_routine(&server, &payto("payto://test")).await;
}
+
+#[tokio::test]
+async fn revenue() {
+ let (server, _) = setup().await;
+ revenue_routine(&server, &payto("payto://test")).await;
+}
diff --git a/common/taler-api/tests/common/mod.rs b/common/taler-api/tests/common/mod.rs
@@ -19,7 +19,7 @@ use std::sync::Arc;
use db::notification_listener;
use sqlx::PgPool;
use taler_api::{
- api::{revenue::Revenue, wire::WireGateway, TalerApi, TalerComponent},
+ api::{revenue::Revenue, wire::WireGateway, TalerApi, TalerApiBuilder},
auth::AuthMethod,
db::IncomingType,
error::{failure, ApiResult},
@@ -40,14 +40,14 @@ use tokio::sync::watch::Sender;
pub mod db;
/// Taler API implementation for tests
-pub struct TestAdapter {
+pub struct TestApi {
currency: String,
pool: PgPool,
outgoing_channel: Sender<i64>,
incoming_channel: Sender<i64>,
}
-impl TalerComponent for TestAdapter {
+impl TalerApi for TestApi {
fn currency(&self) -> &str {
&self.currency
}
@@ -57,7 +57,7 @@ impl TalerComponent for TestAdapter {
}
}
-impl WireGateway for TestAdapter {
+impl WireGateway for TestApi {
async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> {
let result = db::transfer(&self.pool, req).await?;
match result {
@@ -152,9 +152,9 @@ impl WireGateway for TestAdapter {
}
}
-impl Revenue for TestAdapter {
- async fn history(&self, history: History) -> ApiResult<RevenueIncomingHistory> {
- let txs = db::revenue_history(&self.pool, &history, &self.currency, || {
+impl Revenue for TestApi {
+ async fn history(&self, params: History) -> ApiResult<RevenueIncomingHistory> {
+ let txs = db::revenue_history(&self.pool, ¶ms, &self.currency, || {
self.incoming_channel.subscribe()
})
.await?;
@@ -165,7 +165,7 @@ impl Revenue for TestAdapter {
}
}
-pub async fn test_api(pool: PgPool, currency: String) -> TalerApi {
+pub async fn test_api(pool: PgPool, currency: String) -> TalerApiBuilder {
// Reset db
sqlx::raw_sql(include_str!("../schema.sql"))
.execute(&pool)
@@ -173,7 +173,7 @@ pub async fn test_api(pool: PgPool, currency: String) -> TalerApi {
.unwrap();
let outgoing_channel = Sender::new(0);
let incoming_channel = Sender::new(0);
- let wg = TestAdapter {
+ let wg = TestApi {
currency,
pool: pool.clone(),
outgoing_channel: outgoing_channel.clone(),
@@ -185,7 +185,7 @@ pub async fn test_api(pool: PgPool, currency: String) -> TalerApi {
incoming_channel,
));
let state = Arc::new(wg);
- TalerApi::new()
+ TalerApiBuilder::new()
.wire_gateway(state.clone(), AuthMethod::None)
.revenue(state, AuthMethod::None)
}
diff --git a/common/taler-api/tests/schema.sql b/common/taler-api/tests/schema.sql
@@ -46,6 +46,7 @@ CREATE TYPE incoming_type AS ENUM
CREATE TABLE incoming_transactions (
incoming_transaction_id INT8 PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY
,amount taler_amount NOT NULL
+ ,subject TEXT NOT NULL
,debit_payto TEXT NOT NULL
,creation_time INT8 NOT NULL
,type incoming_type NOT NULL
@@ -143,6 +144,7 @@ INSERT INTO incoming_transactions (
amount,
debit_payto,
creation_time,
+ subject,
type,
metadata,
origin_exchange_url
@@ -150,6 +152,7 @@ INSERT INTO incoming_transactions (
in_amount,
in_debit_payto,
in_timestamp,
+ in_subject,
in_type,
in_key,
NULL
diff --git a/common/taler-test-utils/src/routine.rs b/common/taler-test-utils/src/routine.rs
@@ -29,6 +29,7 @@ use taler_api::db::IncomingType;
use taler_common::{
api_common::{EddsaPublicKey, HashCode, ShortHashCode},
api_params::PageParams,
+ api_revenue::RevenueIncomingHistory,
api_wire::{
IncomingBankTransaction, IncomingHistory, TransferList, TransferResponse, TransferState,
TransferStatus,
@@ -489,15 +490,55 @@ async fn add_incoming_routine(
.assert_error(ErrorCode::GENERIC_JSON_INVALID);
}
+/// Test standard behavior of the revenue endpoints
+pub async fn revenue_routine(server: &TestServer, debit_acount: &Payto) {
+ let currency = &get_currency(server).await;
+
+ routine_history(
+ server,
+ "/taler-revenue/history",
+ |it: RevenueIncomingHistory| {
+ it.incoming_transactions
+ .into_iter()
+ .map(|it| *it.row_id as i64)
+ .collect()
+ },
+ 2,
+ |server, i| async move {
+ if i % 2 == 0 {
+ server
+ .post("/taler-wire-gateway/admin/add-incoming")
+ .json(&json!({
+ "amount": format!("{currency}:0.0{i}"),
+ "reserve_pub": EddsaPublicKey::rand(),
+ "debit_account": debit_acount,
+ }))
+ .await
+ .assert_ok_json::<TransferResponse>();
+ } else {
+ server
+ .post("/taler-wire-gateway/admin/add-kycauth")
+ .json(&json!({
+ "amount": format!("{currency}:0.0{i}"),
+ "account_pub": EddsaPublicKey::rand(),
+ "debit_account": debit_acount,
+ }))
+ .await
+ .assert_ok_json::<TransferResponse>();
+ }
+ },
+ 0,
+ |_, _| async move {},
+ )
+ .await;
+}
+
/// Test standard behavior of the admin add incoming endpoints
pub async fn admin_add_incoming_routine(server: &TestServer, debit_acount: &Payto) {
let currency = &get_currency(server).await;
// History
- server
- .get("/taler-wire-gateway/history/incoming")
- .await
- .assert_no_content();
+ // TODO check non taler some are ignored
routine_history(
server,
"/taler-wire-gateway/history/incoming",
diff --git a/taler-magnet-bank-adapter.conf b/taler-magnet-bank-adapter.conf
@@ -1,23 +0,0 @@
-[taler-magnet-bank-adapter]
-API_URL = "https://mobil.magnetbank.hu"
-CONSUMER_KEY = "Consumer"
-CONSUMER_SECRET = "qikgjxc5y06tiil7qgrmh09l7rfi5a8e"
-KEYS_FILE = keys.json
-
-# How "taler-magnet-bank-adapter serve" serves its API, this can either be tcp or unix
-SERVE = tcp
-
-# Port on which the HTTP server listens, e.g. 9967. Only used if SERVE is tcp.
-PORT = 8080
-
-# Which IP address should we bind to? E.g. ``127.0.0.1`` or ``::1``for loopback. Only used if SERVE is tcp.
-BIND_TO = 0.0.0.0
-
-# Which unix domain path should we bind to? Only used if SERVE is unix.
-# UNIXPATH = libeufin-bank.sock
-
-# What should be the file access permissions for UNIXPATH? Only used if SERVE is unix.
-# UNIXPATH_MODE = 660
-
-[taler-magnet-bank-adapter-postgres]
-CONFIG = postgres:/taler-magnet-bank-adapter
-\ No newline at end of file
diff --git a/taler-magnet-bank.conf b/taler-magnet-bank.conf
@@ -0,0 +1,58 @@
+[magnet-bank]
+API_URL = "https://mobil.magnetbank.hu"
+CONSUMER_KEY = "Consumer"
+CONSUMER_SECRET = "qikgjxc5y06tiil7qgrmh09l7rfi5a8e"
+KEYS_FILE = keys.json
+
+# How "taler-magnet-bank-adapter serve" serves its API, this can either be tcp or unix
+SERVE = tcp
+
+# Port on which the HTTP server listens, e.g. 9967. Only used if SERVE is tcp.
+PORT = 8080
+
+# Which IP address should we bind to? E.g. ``127.0.0.1`` or ``::1``for loopback. Only used if SERVE is tcp.
+BIND_TO = 0.0.0.0
+
+# Which unix domain path should we bind to? Only used if SERVE is unix.
+# UNIXPATH = libeufin-bank.sock
+
+# What should be the file access permissions for UNIXPATH? Only used if SERVE is unix.
+# UNIXPATH_MODE = 660
+
+
+[magnet-bank-wire-gateway-api]
+# Whether to serve the Wire Gateway API
+ENABLED = NO
+
+# Authentication scheme, this can either can be basic, bearer or none.
+AUTH_METHOD = bearer
+
+# User name for basic authentication scheme
+# USERNAME =
+
+# Password for basic authentication scheme
+# PASSWORD =
+
+# Token for bearer authentication scheme
+TOKEN =
+
+
+[magnet-bank-revenue-api]
+# Whether to serve the Revenue API
+ENABLED = NO
+
+# Authentication scheme, this can either can be basic, bearer or none.
+AUTH_METHOD = bearer
+
+# User name for basic authentication scheme
+# USERNAME =
+
+# Password for basic authentication scheme
+#Â PASSWORD =
+
+# Token for bearer authentication scheme
+TOKEN =
+
+
+[magnet-bank-db-postgres]
+CONFIG = postgres:/taler-magnet-bank-adapter
+\ No newline at end of file
diff --git a/taler-magnet-bank/Cargo.toml b/taler-magnet-bank/Cargo.toml
@@ -0,0 +1,40 @@
+[package]
+name = "taler-magnet-bank"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+rand_core = { version = "0.6.4" }
+reqwest = { version = "0.12", default-features = false, features = [
+ "json",
+ "native-tls",
+] }
+hmac = "0.12"
+sha1 = "0.10"
+p256 = { version = "0.13.2", features = ["alloc", "ecdsa"] }
+spki = "0.7.3"
+base64 = "0.22"
+form_urlencoded = "1.2"
+percent-encoding = "2.3"
+passterm = "2.0"
+sqlx = { workspace = true, features = [
+ "postgres",
+ "runtime-tokio-native-tls",
+ "tls-native-tls",
+] }
+serde_json = { workspace = true, features = ["raw_value"] }
+jiff = { workspace = true, features = ["serde"] }
+taler-common.workspace = true
+taler-api.workspace = true
+clap.workspace = true
+serde.workspace = true
+serde_path_to_error.workspace = true
+serde_urlencoded.workspace = true
+thiserror.workspace = true
+tracing.workspace = true
+tracing-subscriber.workspace = true
+tokio.workspace = true
+anyhow.workspace = true
+
+[dev-dependencies]
+taler-test-utils.workspace = true
diff --git a/adapter/taler-magnet-bank-adapter/db/schema.sql b/taler-magnet-bank/db/schema.sql
diff --git a/taler-magnet-bank/src/adapter.rs b/taler-magnet-bank/src/adapter.rs
@@ -0,0 +1,202 @@
+/*
+ This file is part of TALER
+ Copyright (C) 2025 Taler Systems SA
+
+ TALER is free software; you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License along with
+ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+*/
+
+use taler_api::{
+ api::{revenue::Revenue, wire::WireGateway, TalerApi},
+ error::{failure, ApiResult},
+ subject::IncomingSubject,
+};
+use taler_common::{
+ api_common::{safe_u64, SafeU64},
+ api_params::{History, Page},
+ api_revenue::RevenueIncomingHistory,
+ api_wire::{
+ AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddKycauthResponse,
+ IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse,
+ TransferState, TransferStatus,
+ },
+ error_code::ErrorCode,
+ types::{payto::Payto, timestamp::Timestamp},
+};
+use tokio::sync::watch::Sender;
+
+use crate::{
+ constant::CURRENCY,
+ db::{self, AddIncomingResult, TxInAdmin},
+ MagnetPayto,
+};
+
+pub struct MagnetApi {
+ pub pool: sqlx::PgPool,
+ pub payto: Payto,
+ pub in_channel: Sender<i64>,
+ pub taler_in_channel: Sender<i64>,
+ pub out_channel: Sender<i64>,
+ pub taler_out_channel: Sender<i64>,
+}
+
+impl MagnetApi {
+ pub async fn start(pool: sqlx::PgPool, payto: Payto) -> Self {
+ let in_channel = Sender::new(0);
+ let taler_in_channel = Sender::new(0);
+ let out_channel = Sender::new(0);
+ let taler_out_channel = Sender::new(0);
+ let tmp = Self {
+ pool: pool.clone(),
+ payto,
+ in_channel: in_channel.clone(),
+ taler_in_channel: taler_in_channel.clone(),
+ out_channel: out_channel.clone(),
+ taler_out_channel: taler_out_channel.clone(),
+ };
+ tokio::spawn(db::notification_listener(
+ pool,
+ in_channel,
+ taler_in_channel,
+ out_channel,
+ taler_out_channel,
+ ));
+ tmp
+ }
+}
+
+impl TalerApi for MagnetApi {
+ fn currency(&self) -> &str {
+ CURRENCY
+ }
+
+ fn implementation(&self) -> Option<&str> {
+ Some("taler-magnet-bank")
+ }
+}
+
+impl WireGateway for MagnetApi {
+ async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> {
+ let creditor = MagnetPayto::try_from(&req.credit_account)?;
+ let result = db::make_transfer(&self.pool, &req, &creditor, &Timestamp::now()).await?;
+ match result {
+ db::TransferResult::Success { id, timestamp } => Ok(TransferResponse {
+ timestamp,
+ row_id: SafeU64::try_from(id).unwrap(),
+ }),
+ db::TransferResult::RequestUidReuse => Err(failure(
+ ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED,
+ "request_uid used already",
+ )),
+ db::TransferResult::WtidReuse => unimplemented!(),
+ }
+ }
+
+ async fn transfer_page(
+ &self,
+ page: Page,
+ status: Option<TransferState>,
+ ) -> ApiResult<TransferList> {
+ Ok(TransferList {
+ transfers: db::transfer_page(&self.pool, &status, &page).await?,
+ debit_account: self.payto.clone(),
+ })
+ }
+
+ async fn transfer_by_id(&self, id: u64) -> ApiResult<Option<TransferStatus>> {
+ Ok(db::transfer_by_id(&self.pool, id).await?)
+ }
+
+ async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> {
+ Ok(OutgoingHistory {
+ outgoing_transactions: db::outgoing_history(&self.pool, ¶ms, || {
+ self.taler_out_channel.subscribe()
+ })
+ .await?,
+ debit_account: self.payto.clone(),
+ })
+ }
+
+ async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> {
+ Ok(IncomingHistory {
+ incoming_transactions: db::incoming_history(&self.pool, ¶ms, || {
+ self.taler_in_channel.subscribe()
+ })
+ .await?,
+ credit_account: self.payto.clone(),
+ })
+ }
+
+ async fn add_incoming_reserve(
+ &self,
+ req: AddIncomingRequest,
+ ) -> ApiResult<AddIncomingResponse> {
+ let debtor = MagnetPayto::try_from(&req.debit_account)?;
+ let res = db::register_tx_in_admin(
+ &self.pool,
+ &TxInAdmin {
+ amount: req.amount,
+ subject: format!("Admin incoming {}", req.reserve_pub),
+ debtor,
+ timestamp: Timestamp::now(),
+ metadata: IncomingSubject::Reserve(req.reserve_pub),
+ },
+ )
+ .await?;
+ match res {
+ AddIncomingResult::Success(res) => Ok(AddIncomingResponse {
+ row_id: safe_u64(res.row_id),
+ timestamp: res.timestamp,
+ }),
+ AddIncomingResult::ReservePubReuse => Err(failure(
+ ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT,
+ "reserve_pub used already".to_owned(),
+ )),
+ }
+ }
+
+ async fn add_incoming_kyc(&self, req: AddKycauthRequest) -> ApiResult<AddKycauthResponse> {
+ let debtor = MagnetPayto::try_from(&req.debit_account)?;
+ let res = db::register_tx_in_admin(
+ &self.pool,
+ &TxInAdmin {
+ amount: req.amount,
+ subject: format!("Admin incoming KYC:{}", req.account_pub),
+ debtor,
+ timestamp: Timestamp::now(),
+ metadata: IncomingSubject::Kyc(req.account_pub),
+ },
+ )
+ .await?;
+ match res {
+ AddIncomingResult::Success(res) => Ok(AddKycauthResponse {
+ row_id: safe_u64(res.row_id),
+ timestamp: res.timestamp,
+ }),
+ AddIncomingResult::ReservePubReuse => Err(failure(
+ ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT,
+ "reserve_pub used already".to_owned(),
+ )),
+ }
+ }
+}
+
+impl Revenue for MagnetApi {
+ async fn history(&self, params: History) -> ApiResult<RevenueIncomingHistory> {
+ Ok(RevenueIncomingHistory {
+ incoming_transactions: db::revenue_history(&self.pool, ¶ms, || {
+ self.in_channel.subscribe()
+ })
+ .await?,
+ credit_account: self.payto.clone(),
+ })
+ }
+}
diff --git a/taler-magnet-bank/src/config.rs b/taler-magnet-bank/src/config.rs
@@ -0,0 +1,123 @@
+/*
+ This file is part of TALER
+ Copyright (C) 2025 Taler Systems SA
+
+ TALER is free software; you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License along with
+ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+*/
+
+use std::net::{IpAddr, SocketAddr};
+
+use base64::{prelude::BASE64_STANDARD, Engine};
+use reqwest::Url;
+use sqlx::postgres::PgConnectOptions;
+use taler_api::{auth::AuthMethod, Serve};
+use taler_common::config::{map_config, Config, Section, ValueErr};
+
+use crate::magnet::Token;
+
+pub struct DbCfg {
+ pub cfg: PgConnectOptions,
+}
+
+impl DbCfg {
+ pub fn parse(cfg: &Config) -> Result<Self, ValueErr> {
+ let sect = cfg.section("magnet-bankdb-postgres");
+ Ok(Self {
+ cfg: sect.postgres("CONFIG").require()?,
+ })
+ }
+}
+
+pub struct ApiCfg {
+ pub auth: AuthMethod,
+}
+
+impl ApiCfg {
+ pub fn parse(sect: Section) -> Result<Option<Self>, ValueErr> {
+ Ok(if sect.boolean("ENABLED").require()? {
+ let auth = map_config!(sect, "auth_method", "AUTH_METHOD",
+ "none" => {
+ Ok(AuthMethod::None)
+ },
+ "basic" => {
+ let username = sect.str("USERNAME").require()?;
+ let password = sect.str("PASSWORD").require()?;
+ Ok(AuthMethod::Basic(
+ BASE64_STANDARD.encode(format!("{username}:{password}")),
+ ))
+ },
+ "bearer" => {
+ Ok(AuthMethod::Bearer(sect.str("AUTH_TOKEN").require()?))
+ }
+ )
+ .require()?;
+ Some(Self { auth })
+ } else {
+ None
+ })
+ }
+}
+
+pub struct ServeCfg {
+ pub serve: Serve,
+ pub wire_gateway: Option<ApiCfg>,
+ pub revenue: Option<ApiCfg>,
+}
+
+impl ServeCfg {
+ pub fn parse(cfg: &Config) -> Result<Self, ValueErr> {
+ let sect = cfg.section("magnet-bank");
+
+ let serve = map_config!(sect, "serve", "SERVE",
+ "tcp" => {
+ let port = sect.number("PORT").require()?;
+ let ip: IpAddr = sect.parse("IP addr", "BIND_TO").require()?;
+ Ok::<Serve, ValueErr>(Serve::Tcp(SocketAddr::new(ip, port)))
+ },
+ "unix" => {
+ let path = sect.path("UNIXPATH").require()?;
+ let permission = sect.unix_mode("UNIXPATH_MODE").require()?;
+ Ok::<Serve, ValueErr>(Serve::Unix { path, permission })
+ }
+ )
+ .require()?;
+
+ let wire_gateway = ApiCfg::parse(cfg.section("magnet-bank-wire-gateway-api"))?;
+ let revenue = ApiCfg::parse(cfg.section("magnet-bank-revenue-api"))?;
+
+ Ok(Self {
+ serve,
+ wire_gateway,
+ revenue,
+ })
+ }
+}
+
+pub struct WorkerCfg {
+ pub api_url: Url,
+ pub consumer: Token,
+ pub keys_path: String,
+}
+
+impl WorkerCfg {
+ pub fn parse(cfg: &Config) -> Result<Self, ValueErr> {
+ let sect = cfg.section("magnet-bank");
+ Ok(Self {
+ api_url: sect.parse("URL", "API_URL").require()?,
+ consumer: Token {
+ key: sect.str("CONSUMER_KEY").require()?,
+ secret: sect.str("CONSUMER_SECRET").require()?,
+ },
+ keys_path: sect.path("KEYS_FILE").require()?,
+ })
+ }
+}
diff --git a/adapter/taler-magnet-bank-adapter/src/constant.rs b/taler-magnet-bank/src/constant.rs
diff --git a/taler-magnet-bank/src/db.rs b/taler-magnet-bank/src/db.rs
@@ -0,0 +1,1161 @@
+/*
+ This file is part of TALER
+ Copyright (C) 2025 Taler Systems SA
+
+ TALER is free software; you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License along with
+ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+*/
+
+use std::fmt::Display;
+
+use sqlx::{postgres::PgRow, PgConnection, PgExecutor, PgPool, QueryBuilder, Row};
+use taler_api::{
+ db::{history, page, BindHelper, IncomingType, TypeHelper},
+ subject::{IncomingSubject, OutgoingSubject},
+};
+use taler_common::{
+ api_params::{History, Page},
+ api_revenue::RevenueIncomingBankTransaction,
+ api_wire::{
+ IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest,
+ TransferState, TransferStatus,
+ },
+ types::{amount::Amount, timestamp::Timestamp},
+};
+use tokio::sync::watch::{Receiver, Sender};
+
+use crate::{constant::CURRENCY, MagnetPayto};
+
+pub async fn notification_listener(
+ pool: PgPool,
+ in_channel: Sender<i64>,
+ taler_in_channel: Sender<i64>,
+ out_channel: Sender<i64>,
+ taler_out_channel: Sender<i64>,
+) -> sqlx::Result<()> {
+ taler_api::notification::notification_listener!(&pool,
+ "tx_in" => (row_id: i64) {
+ in_channel.send_replace(row_id);
+ },
+ "taler_in" => (row_id: i64) {
+ taler_in_channel.send_replace(row_id);
+ },
+ "tx_out" => (row_id: i64) {
+ out_channel.send_replace(row_id);
+ },
+ "taler_out" => (row_id: i64) {
+ taler_out_channel.send_replace(row_id);
+ }
+ )
+}
+
+#[derive(Debug, Clone)]
+pub struct TxIn {
+ pub code: u64,
+ pub amount: Amount,
+ pub subject: String,
+ pub debtor: MagnetPayto,
+ pub timestamp: Timestamp,
+}
+
+impl Display for TxIn {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ let TxIn {
+ code,
+ amount,
+ subject,
+ debtor,
+ timestamp,
+ } = self;
+ write!(f, "{timestamp} {amount} {code} {debtor} '{subject}'")
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct TxOut {
+ pub code: u64,
+ pub amount: Amount,
+ pub subject: String,
+ pub creditor: MagnetPayto,
+ pub timestamp: Timestamp,
+}
+
+impl Display for TxOut {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ let TxOut {
+ code,
+ amount,
+ subject,
+ creditor,
+ timestamp,
+ } = self;
+ write!(f, "{timestamp} {amount} {code} {creditor} '{subject}'")
+ }
+}
+
+#[derive(Debug, Clone)]
+pub struct TxInAdmin {
+ pub amount: Amount,
+ pub subject: String,
+ pub debtor: MagnetPayto,
+ pub timestamp: Timestamp,
+ pub metadata: IncomingSubject,
+}
+
+#[derive(Debug, PartialEq, Eq)]
+pub struct RegisteredTx {
+ pub new: bool,
+ pub row_id: u64,
+ pub timestamp: Timestamp,
+}
+
+#[derive(Debug, PartialEq, Eq)]
+pub enum AddIncomingResult {
+ Success(RegisteredTx),
+ ReservePubReuse,
+}
+
+#[derive(Debug, PartialEq, Eq)]
+pub struct Initiated {
+ pub id: u64,
+ pub amount: Amount,
+ pub subject: String,
+ pub creditor: MagnetPayto,
+}
+
+impl Display for Initiated {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ write!(
+ f,
+ "{} {} {} '{}'",
+ self.id, self.amount, self.creditor, self.subject
+ )
+ }
+}
+
+pub async fn db_init(db: &PgPool, reset: bool) -> sqlx::Result<()> {
+ let mut tx = db.begin().await?;
+ if reset {
+ sqlx::raw_sql("DROP SCHEMA public CASCADE;CREATE SCHEMA public;")
+ .execute(&mut *tx)
+ .await?;
+ }
+ // TODO migrations
+ sqlx::raw_sql(include_str!("../db/schema.sql"))
+ .execute(&mut *tx)
+ .await?;
+ tx.commit().await?;
+ Ok(())
+}
+
+pub async fn register_tx_in_admin(db: &PgPool, tx: &TxInAdmin) -> sqlx::Result<AddIncomingResult> {
+ sqlx::query(
+ "
+ SELECT out_reserve_pub_reuse, out_new, out_tx_row_id, out_timestamp
+ FROM register_tx_in(NULL, ($1, $2)::taler_amount, $3, $4, $5, $6, $7, $8)
+ ",
+ )
+ .bind_amount(&tx.amount)
+ .bind(&tx.subject)
+ .bind(&tx.debtor.number)
+ .bind(&tx.debtor.name)
+ .bind_timestamp(&tx.timestamp)
+ .bind(tx.metadata.ty())
+ .bind(tx.metadata.key())
+ .try_map(|r: PgRow| {
+ Ok(if r.try_get(0)? {
+ AddIncomingResult::ReservePubReuse
+ } else {
+ AddIncomingResult::Success(RegisteredTx {
+ new: r.try_get(1)?,
+ row_id: r.try_get_u64(2)?,
+ timestamp: r.try_get_timestamp(3)?,
+ })
+ })
+ })
+ .fetch_one(db)
+ .await
+}
+
+pub async fn register_tx_in(
+ db: &mut PgConnection,
+ tx: &TxIn,
+ subject: &Option<IncomingSubject>,
+) -> sqlx::Result<AddIncomingResult> {
+ sqlx::query(
+ "
+ SELECT out_reserve_pub_reuse, out_new, out_tx_row_id, out_timestamp
+ FROM register_tx_in($1, ($2, $3)::taler_amount, $4, $5, $6, $7, $8, $9)
+ ",
+ )
+ .bind(tx.code as i64)
+ .bind_amount(&tx.amount)
+ .bind(&tx.subject)
+ .bind(&tx.debtor.number)
+ .bind(&tx.debtor.name)
+ .bind_timestamp(&tx.timestamp)
+ .bind(subject.as_ref().map(|it| it.ty()))
+ .bind(subject.as_ref().map(|it| it.key()))
+ .try_map(|r: PgRow| {
+ Ok(if r.try_get(0)? {
+ AddIncomingResult::ReservePubReuse
+ } else {
+ AddIncomingResult::Success(RegisteredTx {
+ new: r.try_get(1)?,
+ row_id: r.try_get_u64(2)?,
+ timestamp: r.try_get_timestamp(3)?,
+ })
+ })
+ })
+ .fetch_one(db)
+ .await
+}
+
+pub async fn register_tx_out(
+ db: &mut PgConnection,
+ tx: &TxOut,
+ subject: &Option<OutgoingSubject>,
+) -> sqlx::Result<RegisteredTx> {
+ sqlx::query(
+ "
+ SELECT out_new, out_tx_row_id, out_timestamp
+ FROM register_tx_out($1, ($2, $3)::taler_amount, $4, $5, $6, $7, $8, $9)
+ ",
+ )
+ .bind(tx.code as i64)
+ .bind_amount(&tx.amount)
+ .bind(&tx.subject)
+ .bind(&tx.creditor.number)
+ .bind(&tx.creditor.name)
+ .bind_timestamp(&tx.timestamp)
+ .bind(subject.as_ref().map(|it| it.0.as_ref()))
+ .bind(subject.as_ref().map(|it| it.1.as_str()))
+ .try_map(|r: PgRow| {
+ Ok(RegisteredTx {
+ new: r.try_get(0)?,
+ row_id: r.try_get_u64(1)?,
+ timestamp: r.try_get_timestamp(2)?,
+ })
+ })
+ .fetch_one(db)
+ .await
+}
+
+#[derive(Debug, PartialEq, Eq)]
+pub enum TransferResult {
+ Success { id: u64, timestamp: Timestamp },
+ RequestUidReuse,
+ WtidReuse,
+}
+
+pub async fn make_transfer<'a>(
+ db: impl PgExecutor<'a>,
+ req: &TransferRequest,
+ creditor: &MagnetPayto,
+ timestamp: &Timestamp,
+) -> sqlx::Result<TransferResult> {
+ let subject = format!("{} {}", req.wtid, req.exchange_base_url);
+ sqlx::query(
+ "
+ SELECT out_request_uid_reuse, out_wtid_reuse, out_tx_row_id, out_timestamp
+ FROM taler_transfer($1, $2, $3, ($4, $5)::taler_amount, $6, $7, $8, $9)
+ ",
+ )
+ .bind(req.request_uid.as_ref())
+ .bind(req.wtid.as_ref())
+ .bind(&subject)
+ .bind_amount(&req.amount)
+ .bind(req.exchange_base_url.as_str())
+ .bind(&creditor.number)
+ .bind(&creditor.name)
+ .bind_timestamp(timestamp)
+ .try_map(|r: PgRow| {
+ Ok(if r.try_get(0)? {
+ TransferResult::RequestUidReuse
+ } else if r.try_get(1)? {
+ TransferResult::WtidReuse
+ } else {
+ TransferResult::Success {
+ id: r.try_get_u64(2)?,
+ timestamp: r.try_get_timestamp(3)?,
+ }
+ })
+ })
+ .fetch_one(db)
+ .await
+}
+
+pub async fn transfer_page<'a>(
+ db: impl PgExecutor<'a>,
+ status: &Option<TransferState>,
+ params: &Page,
+) -> sqlx::Result<Vec<TransferListStatus>> {
+ page(
+ db,
+ "initiated_id",
+ params,
+ || {
+ let mut builder = QueryBuilder::new(
+ "
+ SELECT
+ initiated_id,
+ status,
+ (amount).val as amount_val,
+ (amount).frac as amount_frac,
+ credit_account,
+ credit_name,
+ created
+ FROM transfer
+ JOIN initiated USING (initiated_id)
+ WHERE
+ ",
+ );
+ if let Some(status) = status {
+ builder.push(" status = ").push_bind(status).push(" AND ");
+ }
+ builder
+ },
+ |r: PgRow| {
+ Ok(TransferListStatus {
+ row_id: r.try_get_safeu64(0)?,
+ status: r.try_get(1)?,
+ amount: r.try_get_amount_i(2, CURRENCY)?,
+ credit_account: MagnetPayto {
+ number: r.try_get(4)?,
+ name: r.try_get(4)?,
+ }
+ .as_payto(),
+ timestamp: r.try_get_timestamp(6)?,
+ })
+ },
+ )
+ .await
+}
+
+pub async fn outgoing_history(
+ db: &PgPool,
+ params: &History,
+ listen: impl FnOnce() -> Receiver<i64>,
+) -> sqlx::Result<Vec<OutgoingBankTransaction>> {
+ history(
+ db,
+ "tx_out_id",
+ params,
+ listen,
+ || {
+ QueryBuilder::new(
+ "
+ SELECT
+ tx_out_id,
+ (amount).val as amount_val,
+ (amount).frac as amount_frac,
+ credit_account,
+ credit_name,
+ created,
+ exchange_base_url,
+ wtid
+ FROM taler_out
+ JOIN tx_out USING (tx_out_id)
+ WHERE
+ ",
+ )
+ },
+ |r: PgRow| {
+ Ok(OutgoingBankTransaction {
+ row_id: r.try_get_safeu64(0)?,
+ amount: r.try_get_amount_i(1, CURRENCY)?,
+ credit_account: MagnetPayto {
+ number: r.try_get(3)?,
+ name: r.try_get(4)?,
+ }
+ .as_payto(),
+ date: r.try_get_timestamp(5)?,
+ exchange_base_url: r.try_get_url(6)?,
+ wtid: r.try_get_base32(7)?,
+ })
+ },
+ )
+ .await
+}
+
+pub async fn incoming_history(
+ db: &PgPool,
+ params: &History,
+ listen: impl FnOnce() -> Receiver<i64>,
+) -> sqlx::Result<Vec<IncomingBankTransaction>> {
+ history(
+ db,
+ "tx_in_id",
+ params,
+ listen,
+ || {
+ QueryBuilder::new(
+ "
+ SELECT
+ type,
+ tx_in_id,
+ (amount).val as amount_val,
+ (amount).frac as amount_frac,
+ debit_account,
+ debit_name,
+ created,
+ metadata
+ FROM taler_in
+ JOIN tx_in USING (tx_in_id)
+ WHERE
+ ",
+ )
+ },
+ |r: PgRow| {
+ Ok(match r.try_get(0)? {
+ IncomingType::reserve => IncomingBankTransaction::Reserve {
+ row_id: r.try_get_safeu64(1)?,
+ amount: r.try_get_amount_i(2, CURRENCY)?,
+ debit_account: MagnetPayto {
+ number: r.try_get(4)?,
+ name: r.try_get(5)?,
+ }
+ .as_payto(),
+ date: r.try_get_timestamp(6)?,
+ reserve_pub: r.try_get_base32(7)?,
+ },
+ IncomingType::kyc => IncomingBankTransaction::Kyc {
+ row_id: r.try_get_safeu64(1)?,
+ amount: r.try_get_amount_i(2, CURRENCY)?,
+ debit_account: MagnetPayto {
+ number: r.try_get(4)?,
+ name: r.try_get(5)?,
+ }
+ .as_payto(),
+ date: r.try_get_timestamp(6)?,
+ account_pub: r.try_get_base32(7)?,
+ },
+ IncomingType::wad => {
+ unimplemented!("WAD is not yet supported")
+ }
+ })
+ },
+ )
+ .await
+}
+
+pub async fn revenue_history(
+ db: &PgPool,
+ params: &History,
+ listen: impl FnOnce() -> Receiver<i64>,
+) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> {
+ history(
+ db,
+ "tx_in_id",
+ params,
+ listen,
+ || {
+ QueryBuilder::new(
+ "
+ SELECT
+ tx_in_id,
+ created,
+ (amount).val as amount_val,
+ (amount).frac as amount_frac,
+ debit_account,
+ debit_name,
+ subject
+ FROM tx_in
+ WHERE
+ ",
+ )
+ },
+ |r: PgRow| {
+ Ok(RevenueIncomingBankTransaction {
+ row_id: r.try_get_safeu64(0)?,
+ date: r.try_get_timestamp(1)?,
+ amount: r.try_get_amount_i(2, CURRENCY)?,
+ credit_fee: None,
+ debit_account: MagnetPayto {
+ number: r.try_get(4)?,
+ name: r.try_get(5)?,
+ }
+ .as_payto(),
+ subject: r.try_get(6)?,
+ })
+ },
+ )
+ .await
+}
+
+pub async fn transfer_by_id<'a>(
+ db: impl PgExecutor<'a>,
+ id: u64,
+) -> sqlx::Result<Option<TransferStatus>> {
+ sqlx::query(
+ "
+ SELECT
+ status,
+ status_msg,
+ (amount).val as amount_val,
+ (amount).frac as amount_frac,
+ exchange_base_url,
+ wtid,
+ credit_account,
+ credit_name,
+ created
+ FROM transfer
+ JOIN initiated USING (initiated_id)
+ WHERE initiated_id = $1
+ ",
+ )
+ .bind(id as i64)
+ .try_map(|r: PgRow| {
+ Ok(TransferStatus {
+ status: r.try_get(0)?,
+ status_msg: r.try_get(1)?,
+ amount: r.try_get_amount_i(2, CURRENCY)?,
+ origin_exchange_url: r.try_get(4)?,
+ wtid: r.try_get_base32(5)?,
+ credit_account: MagnetPayto {
+ number: r.try_get(6)?,
+ name: r.try_get(7)?,
+ }
+ .as_payto(),
+ timestamp: r.try_get_timestamp(8)?,
+ })
+ })
+ .fetch_optional(db)
+ .await
+}
+
+pub async fn pending_batch<'a>(
+ db: impl PgExecutor<'a>,
+ start: &Timestamp,
+) -> sqlx::Result<Vec<Initiated>> {
+ sqlx::query(
+ "
+ SELECT initiated_id, (amount).val, (amount).frac, subject, credit_account, credit_name
+ FROM initiated
+ WHERE magnet_code IS NULL AND (last_submitted IS NULL OR last_submitted < $1)
+ LIMIT 100
+ ",
+ )
+ .bind_timestamp(start)
+ .try_map(|r: PgRow| {
+ Ok(Initiated {
+ id: r.try_get_u64(0)?,
+ amount: r.try_get_amount_i(1, CURRENCY)?,
+ subject: r.try_get(3)?,
+ creditor: MagnetPayto {
+ number: r.try_get(4)?,
+ name: r.try_get(5)?,
+ },
+ })
+ })
+ .fetch_all(db)
+ .await
+}
+
+/** Update status of a sucessfull submitted initiated transaction */
+pub async fn initiated_submit_success<'a>(
+ db: impl PgExecutor<'a>,
+ id: u64,
+ timestamp: &Timestamp,
+ magnet_code: u64,
+) -> sqlx::Result<()> {
+ sqlx::query(
+ "
+ UPDATE initiated
+ SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, magnet_code=$2
+ WHERE initiated_id=$3
+ "
+ ).bind_timestamp(timestamp)
+ .bind(magnet_code as i64)
+ .bind(id as i64)
+ .execute(db).await?;
+ Ok(())
+}
+
+/** Update status of a sucessfull submitted initiated transaction */
+pub async fn initiated_submit_failure<'a>(
+ db: impl PgExecutor<'a>,
+ id: u64,
+ timestamp: &Timestamp,
+ msg: &str,
+) -> sqlx::Result<()> {
+ sqlx::query(
+ "
+ UPDATE initiated
+ SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, status_msg=$2
+ WHERE initiated_id=$3
+ ",
+ )
+ .bind_timestamp(timestamp)
+ .bind(msg)
+ .bind(id as i64)
+ .execute(db)
+ .await?;
+ Ok(())
+}
+
+#[cfg(test)]
+mod test {
+
+ use sqlx::{postgres::PgRow, PgConnection, PgPool};
+ use taler_api::{
+ db::TypeHelper,
+ subject::{IncomingSubject, OutgoingSubject},
+ };
+ use taler_common::{
+ api_common::{EddsaPublicKey, HashCode, ShortHashCode},
+ api_params::{History, Page},
+ api_wire::TransferRequest,
+ types::{amount::amount, payto::payto, timestamp::Timestamp, url},
+ };
+ use tokio::sync::watch::Receiver;
+
+ use crate::{
+ constant::CURRENCY,
+ db::{
+ self, make_transfer, register_tx_in, register_tx_in_admin, register_tx_out,
+ AddIncomingResult, RegisteredTx, TransferResult, TxIn, TxOut,
+ },
+ MagnetPayto,
+ };
+
+ use super::TxInAdmin;
+
+ fn fake_listen<T: Default>() -> Receiver<T> {
+ tokio::sync::watch::channel(T::default()).1
+ }
+
+ async fn setup() -> (PgConnection, PgPool) {
+ let pool = taler_test_utils::db_test_setup().await;
+ db::db_init(&pool, false).await.expect("dbinit");
+ let conn = pool.acquire().await.expect("aquire conn").leak();
+ (conn, pool)
+ }
+
+ #[tokio::test]
+ async fn tx_in() {
+ let (mut db, pool) = setup().await;
+
+ async fn routine(
+ db: &mut PgConnection,
+ first: &Option<IncomingSubject>,
+ second: &Option<IncomingSubject>,
+ ) {
+ let (id, code) =
+ sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_in")
+ .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?)))
+ .fetch_one(&mut *db)
+ .await
+ .unwrap();
+ let tx = TxIn {
+ code: code,
+ amount: amount("EUR:10"),
+ subject: "subject".to_owned(),
+ debtor: MagnetPayto {
+ number: "number".to_owned(),
+ name: "name".to_owned(),
+ },
+ timestamp: Timestamp::now_stable(),
+ };
+ // Insert
+ assert_eq!(
+ register_tx_in(db, &tx, &first)
+ .await
+ .expect("register tx in"),
+ AddIncomingResult::Success(RegisteredTx {
+ new: true,
+ row_id: id,
+ timestamp: tx.timestamp
+ })
+ );
+ // Idempotent
+ assert_eq!(
+ register_tx_in(
+ db,
+ &TxIn {
+ timestamp: Timestamp::now(),
+ ..tx.clone()
+ },
+ &first
+ )
+ .await
+ .expect("register tx in"),
+ AddIncomingResult::Success(RegisteredTx {
+ new: false,
+ row_id: id,
+ timestamp: tx.timestamp
+ })
+ );
+ // Many
+ assert_eq!(
+ register_tx_in(
+ db,
+ &TxIn {
+ code: code + 1,
+ ..tx
+ },
+ &second
+ )
+ .await
+ .expect("register tx in"),
+ AddIncomingResult::Success(RegisteredTx {
+ new: true,
+ row_id: id + 1,
+ timestamp: tx.timestamp
+ })
+ );
+ }
+
+ // Empty db
+ assert_eq!(
+ db::revenue_history(&pool, &History::default(), fake_listen)
+ .await
+ .unwrap(),
+ Vec::new()
+ );
+ assert_eq!(
+ db::incoming_history(&pool, &History::default(), fake_listen)
+ .await
+ .unwrap(),
+ Vec::new()
+ );
+
+ // Regular transaction
+ routine(&mut db, &None, &None).await;
+
+ // Reserve transaction
+ routine(
+ &mut db,
+ &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())),
+ &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())),
+ )
+ .await;
+
+ // Kyc transaction
+ routine(
+ &mut db,
+ &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())),
+ &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())),
+ )
+ .await;
+
+ // History
+ assert_eq!(
+ db::revenue_history(&pool, &History::default(), fake_listen)
+ .await
+ .unwrap()
+ .len(),
+ 6
+ );
+ assert_eq!(
+ db::incoming_history(&pool, &History::default(), fake_listen)
+ .await
+ .unwrap()
+ .len(),
+ 4
+ );
+ }
+
+ #[tokio::test]
+ async fn tx_in_admin() {
+ let (_, pool) = setup().await;
+
+ // Empty db
+ assert_eq!(
+ db::incoming_history(&pool, &History::default(), fake_listen)
+ .await
+ .unwrap(),
+ Vec::new()
+ );
+
+ let tx = TxInAdmin {
+ amount: amount("EUR:10"),
+ subject: "subject".to_owned(),
+ debtor: MagnetPayto {
+ number: "number".to_owned(),
+ name: "name".to_owned(),
+ },
+ timestamp: Timestamp::now_stable(),
+ metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
+ };
+ // Insert
+ assert_eq!(
+ register_tx_in_admin(&pool, &tx)
+ .await
+ .expect("register tx in"),
+ AddIncomingResult::Success(RegisteredTx {
+ new: true,
+ row_id: 1,
+ timestamp: tx.timestamp
+ })
+ );
+ // Idempotent
+ assert_eq!(
+ register_tx_in_admin(
+ &pool,
+ &TxInAdmin {
+ timestamp: Timestamp::now(),
+ ..tx.clone()
+ }
+ )
+ .await
+ .expect("register tx in"),
+ AddIncomingResult::Success(RegisteredTx {
+ new: false,
+ row_id: 1,
+ timestamp: tx.timestamp
+ })
+ );
+ // Many
+ assert_eq!(
+ register_tx_in_admin(
+ &pool,
+ &TxInAdmin {
+ subject: "Other".to_owned(),
+ metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()),
+ ..tx.clone()
+ }
+ )
+ .await
+ .expect("register tx in"),
+ AddIncomingResult::Success(RegisteredTx {
+ new: true,
+ row_id: 2,
+ timestamp: tx.timestamp
+ })
+ );
+
+ // History
+ assert_eq!(
+ db::incoming_history(&pool, &History::default(), fake_listen)
+ .await
+ .unwrap()
+ .len(),
+ 2
+ );
+ }
+
+ #[tokio::test]
+ async fn tx_out() {
+ let (mut db, pool) = setup().await;
+
+ async fn routine(
+ db: &mut PgConnection,
+ first: &Option<OutgoingSubject>,
+ second: &Option<OutgoingSubject>,
+ ) {
+ let (id, code) =
+ sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_out")
+ .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?)))
+ .fetch_one(&mut *db)
+ .await
+ .unwrap();
+ let tx = TxOut {
+ code,
+ amount: amount("EUR:10"),
+ subject: "subject".to_owned(),
+ creditor: MagnetPayto {
+ number: "number".to_owned(),
+ name: "name".to_owned(),
+ },
+ timestamp: Timestamp::now_stable(),
+ };
+ // Insert
+ assert_eq!(
+ register_tx_out(db, &tx, &first)
+ .await
+ .expect("register tx out"),
+ RegisteredTx {
+ new: true,
+ row_id: id,
+ timestamp: tx.timestamp
+ }
+ );
+ // Idempotent
+ assert_eq!(
+ register_tx_out(
+ db,
+ &TxOut {
+ timestamp: Timestamp::now(),
+ ..tx.clone()
+ },
+ &first
+ )
+ .await
+ .expect("register tx out"),
+ RegisteredTx {
+ new: false,
+ row_id: id,
+ timestamp: tx.timestamp
+ }
+ );
+ // Many
+ assert_eq!(
+ register_tx_out(
+ db,
+ &TxOut {
+ code: code + 1,
+ ..tx.clone()
+ },
+ &second
+ )
+ .await
+ .expect("register tx out"),
+ RegisteredTx {
+ new: true,
+ row_id: id + 1,
+ timestamp: tx.timestamp
+ }
+ );
+ }
+
+ // Empty db
+ assert_eq!(
+ db::outgoing_history(&pool, &History::default(), fake_listen)
+ .await
+ .unwrap(),
+ Vec::new()
+ );
+
+ // Regular transaction
+ routine(&mut db, &None, &None).await;
+
+ // Talerable transaction
+ routine(
+ &mut db,
+ &Some(OutgoingSubject(
+ ShortHashCode::rand(),
+ url("https://exchange.com"),
+ )),
+ &Some(OutgoingSubject(
+ ShortHashCode::rand(),
+ url("https://exchange.com"),
+ )),
+ )
+ .await;
+
+ // History
+ assert_eq!(
+ db::outgoing_history(&pool, &History::default(), fake_listen)
+ .await
+ .unwrap()
+ .len(),
+ 2
+ );
+ }
+
+ #[tokio::test]
+ async fn transfer() {
+ let (mut db, _) = setup().await;
+
+ // Empty db
+ assert_eq!(db::transfer_by_id(&mut db, 0).await.unwrap(), None);
+ assert_eq!(
+ db::transfer_page(&mut db, &None, &Page::default())
+ .await
+ .unwrap(),
+ Vec::new()
+ );
+
+ let req = TransferRequest {
+ request_uid: HashCode::rand(),
+ amount: amount("EUR:10"),
+ exchange_base_url: url("https://exchange.test.com/"),
+ wtid: ShortHashCode::rand(),
+ credit_account: payto("payto://magnet-bank/todo"),
+ };
+ let payto = MagnetPayto {
+ number: "number".to_owned(),
+ name: "name".to_owned(),
+ };
+ let timestamp = Timestamp::now_stable();
+ // Insert
+ assert_eq!(
+ make_transfer(&mut db, &req, &payto, ×tamp)
+ .await
+ .expect("transfer"),
+ TransferResult::Success {
+ id: 1,
+ timestamp: timestamp
+ }
+ );
+ // Idempotent
+ assert_eq!(
+ make_transfer(&mut db, &req, &payto, &Timestamp::now())
+ .await
+ .expect("transfer"),
+ TransferResult::Success {
+ id: 1,
+ timestamp: timestamp
+ }
+ );
+ // Request UID reuse
+ assert_eq!(
+ make_transfer(
+ &mut db,
+ &TransferRequest {
+ wtid: ShortHashCode::rand(),
+ ..req.clone()
+ },
+ &payto,
+ &Timestamp::now()
+ )
+ .await
+ .expect("transfer"),
+ TransferResult::RequestUidReuse
+ );
+ // wtid reuse
+ assert_eq!(
+ make_transfer(
+ &mut db,
+ &TransferRequest {
+ request_uid: HashCode::rand(),
+ ..req.clone()
+ },
+ &payto,
+ &Timestamp::now()
+ )
+ .await
+ .expect("transfer"),
+ TransferResult::WtidReuse
+ );
+ // Many
+ assert_eq!(
+ make_transfer(
+ &mut db,
+ &TransferRequest {
+ request_uid: HashCode::rand(),
+ wtid: ShortHashCode::rand(),
+ ..req
+ },
+ &payto,
+ ×tamp
+ )
+ .await
+ .expect("transfer"),
+ TransferResult::Success {
+ id: 2,
+ timestamp: timestamp
+ }
+ );
+
+ // Get
+ assert!(db::transfer_by_id(&mut db, 1).await.unwrap().is_some());
+ assert!(db::transfer_by_id(&mut db, 2).await.unwrap().is_some());
+ assert!(db::transfer_by_id(&mut db, 3).await.unwrap().is_none());
+ assert_eq!(
+ db::transfer_page(&mut db, &None, &Page::default())
+ .await
+ .unwrap()
+ .len(),
+ 2
+ );
+ }
+
+ #[tokio::test]
+ async fn status() {
+ let (mut db, _) = setup().await;
+
+ // Unknown transfer
+ db::initiated_submit_failure(&mut db, 1, &Timestamp::now(), "msg")
+ .await
+ .unwrap();
+ db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12)
+ .await
+ .unwrap();
+ }
+
+ #[tokio::test]
+ async fn batch() {
+ let (mut db, _) = setup().await;
+ let start = Timestamp::now();
+ let magnet_payto = MagnetPayto {
+ number: "number".to_owned(),
+ name: "name".to_owned(),
+ };
+
+ // Empty db
+ let pendings = db::pending_batch(&mut db, &start)
+ .await
+ .expect("pending_batch");
+ assert_eq!(pendings.len(), 0);
+
+ // Some transfers
+ for i in 0..3 {
+ make_transfer(
+ &mut db,
+ &TransferRequest {
+ request_uid: HashCode::rand(),
+ amount: amount(format!("{CURRENCY}:{}", i + 1)),
+ exchange_base_url: url("https://exchange.test.com/"),
+ wtid: ShortHashCode::rand(),
+ credit_account: payto("payto://magnet-bank/todo"),
+ },
+ &magnet_payto,
+ &&Timestamp::now(),
+ )
+ .await
+ .expect("transfer");
+ }
+ let pendings = db::pending_batch(&mut db, &start)
+ .await
+ .expect("pending_batch");
+ assert_eq!(pendings.len(), 3);
+
+ // Max 100 txs in batch
+ for i in 0..100 {
+ make_transfer(
+ &mut db,
+ &TransferRequest {
+ request_uid: HashCode::rand(),
+ amount: amount(format!("{CURRENCY}:{}", i + 1)),
+ exchange_base_url: url("https://exchange.test.com/"),
+ wtid: ShortHashCode::rand(),
+ credit_account: payto("payto://magnet-bank/todo"),
+ },
+ &magnet_payto,
+ &Timestamp::now(),
+ )
+ .await
+ .expect("transfer");
+ }
+ let pendings = db::pending_batch(&mut db, &start)
+ .await
+ .expect("pending_batch");
+ assert_eq!(pendings.len(), 100);
+
+ // Skip uploaded
+ for i in 0..=10 {
+ db::initiated_submit_success(&mut db, i, &Timestamp::now(), i)
+ .await
+ .expect("status success");
+ }
+ let pendings = db::pending_batch(&mut db, &start)
+ .await
+ .expect("pending_batch");
+ assert_eq!(pendings.len(), 93);
+
+ // Skip tried since start
+ for i in 0..=10 {
+ db::initiated_submit_failure(&mut db, 10 + i, &Timestamp::now(), "failure")
+ .await
+ .expect("status failure");
+ }
+ let pendings = db::pending_batch(&mut db, &start)
+ .await
+ .expect("pending_batch");
+ assert_eq!(pendings.len(), 83);
+ let pendings = db::pending_batch(&mut db, &Timestamp::now())
+ .await
+ .expect("pending_batch");
+ assert_eq!(pendings.len(), 93);
+ }
+}
diff --git a/taler-magnet-bank/src/dev.rs b/taler-magnet-bank/src/dev.rs
@@ -0,0 +1,147 @@
+/*
+ This file is part of TALER
+ Copyright (C) 2025 Taler Systems SA
+
+ TALER is free software; you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License along with
+ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+*/
+
+use clap::ValueEnum;
+use jiff::Zoned;
+use taler_common::{
+ config::Config,
+ types::{
+ amount::Amount,
+ payto::{FullPayto, Payto},
+ },
+};
+use tracing::info;
+
+use crate::{
+ config::WorkerCfg,
+ keys,
+ magnet::{AuthClient, Direction},
+ worker::{extract_tx_info, Tx},
+ MagnetPayto,
+};
+
+#[derive(Debug, Clone, PartialEq, Eq, ValueEnum)]
+pub enum DirArg {
+ #[value(alias("in"))]
+ Incoming,
+ #[value(alias("out"))]
+ Outgoing,
+ Both,
+}
+
+#[derive(clap::Subcommand, Debug)]
+pub enum DevCmd {
+ /// Print account info
+ Accounts,
+ Tx {
+ account: Payto,
+ #[clap(long, short, value_enum, default_value_t = DirArg::Both)]
+ direction: DirArg,
+ },
+ Transfer {
+ #[clap(long)]
+ debtor: Payto,
+ #[clap(long)]
+ creditor: Payto,
+ #[clap(long)]
+ amount: Amount,
+ #[clap(long)]
+ subject: String,
+ },
+}
+
+pub async fn dev(cfg: Config, cmd: DevCmd) -> anyhow::Result<()> {
+ let cfg = WorkerCfg::parse(&cfg)?;
+ let keys = keys::load(&cfg)?;
+ let client = reqwest::Client::new();
+ let client = AuthClient::new(&client, &cfg.api_url, &cfg.consumer).upgrade(&keys.access_token);
+ match cmd {
+ DevCmd::Accounts => {
+ let res = client.list_accounts().await?;
+ for partner in res.partners {
+ for account in partner.bank_accounts {
+ let payto = MagnetPayto {
+ number: account.number,
+ name: partner.partner.name.clone(),
+ };
+ info!("{} {} {payto}", account.code, account.currency.symbol);
+ }
+ }
+ }
+ DevCmd::Tx { account, direction } => {
+ let account = MagnetPayto::try_from(&account)?;
+ let dir = match direction {
+ DirArg::Incoming => Direction::Incoming,
+ DirArg::Outgoing => Direction::Outgoing,
+ DirArg::Both => Direction::Both,
+ };
+ // Register incoming
+ let mut next = None;
+ loop {
+ let page = client
+ .page_tx(dir, 5, &account.number, &next, &None)
+ .await?;
+ next = page.next;
+ for item in page.list {
+ let tx = extract_tx_info(item.tx);
+ match tx {
+ Tx::In(tx_in) => info!("in {tx_in}"),
+ Tx::Out(tx_out) => info!("out {tx_out}"),
+ }
+ }
+ if next.is_none() {
+ break;
+ }
+ }
+ }
+ DevCmd::Transfer {
+ debtor,
+ creditor,
+ amount,
+ subject,
+ } => {
+ let full: FullPayto = creditor.query()?;
+ let debtor = MagnetPayto::try_from(&debtor)?;
+ let creditor = MagnetPayto::try_from(&creditor)?;
+ let debtor = client.account(&debtor.number).await?;
+ let now = Zoned::now();
+ let date = now.date();
+
+ let init = client
+ .init_tx(
+ debtor.code,
+ amount.val as f64,
+ &subject,
+ &date,
+ &full.receiver_name,
+ &creditor.number,
+ )
+ .await?
+ .tx;
+ client
+ .sign_tx(
+ &keys.signing_key,
+ &debtor.number,
+ init.code,
+ init.amount,
+ &date,
+ &creditor.number,
+ )
+ .await?;
+ }
+ }
+ Ok(())
+}
diff --git a/taler-magnet-bank/src/keys.rs b/taler-magnet-bank/src/keys.rs
@@ -0,0 +1,147 @@
+/*
+ This file is part of TALER
+ Copyright (C) 2025 Taler Systems SA
+
+ TALER is free software; you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License along with
+ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+*/
+
+use std::io::ErrorKind;
+
+use p256::ecdsa::SigningKey;
+use taler_common::{json_file, types::base32::Base32};
+use tracing::info;
+
+use crate::{
+ config::WorkerCfg,
+ magnet::{
+ error::{ApiError, MagnetError},
+ AuthClient, Token, TokenAuth,
+ },
+};
+
+#[derive(Default, Debug, serde::Deserialize, serde::Serialize)]
+struct KeysFile {
+ access_token: Option<Token>,
+ signing_key: Option<Base32<32>>,
+}
+
+#[derive(Debug)]
+pub struct Keys {
+ pub access_token: Token,
+ pub signing_key: SigningKey,
+}
+
+pub fn load(cfg: &WorkerCfg) -> anyhow::Result<Keys> {
+ // Load JSON file
+ let file: KeysFile = match json_file::load(&cfg.keys_path) {
+ Ok(file) => file,
+ Err(e) => return Err(anyhow::anyhow!("Could not magnet keys: {e}")),
+ };
+
+ fn incomplete_err() -> anyhow::Error {
+ anyhow::anyhow!("Missing magnet keys, run 'taler-magnet-bank setup' first")
+ }
+
+ // Check full
+ let access_token = file.access_token.ok_or_else(incomplete_err)?;
+ let signing_key = file.signing_key.ok_or_else(incomplete_err)?;
+
+ // Load signing key
+
+ let signing_key = SigningKey::from_slice(&*signing_key)?;
+
+ Ok(Keys {
+ access_token,
+ signing_key,
+ })
+}
+
+pub async fn setup(cfg: WorkerCfg, reset: bool) -> anyhow::Result<()> {
+ if reset {
+ if let Err(e) = std::fs::remove_file(&cfg.keys_path) {
+ if e.kind() != ErrorKind::NotFound {
+ Err(e)?;
+ }
+ }
+ }
+ let mut keys = match json_file::load(&cfg.keys_path) {
+ Ok(existing) => existing,
+ Err(e) if e.kind() == ErrorKind::NotFound => KeysFile::default(),
+ Err(e) => Err(e)?,
+ };
+ let client = reqwest::Client::new();
+ let client = AuthClient::new(&client, &cfg.api_url, &cfg.consumer);
+
+ info!("Setup OAuth access token");
+ if keys.access_token.is_none() {
+ let token_request = client.token_request().await?;
+
+ // TODO how to do it in a generic way ?
+ // TODO Ask MagnetBank if they could support out-of-band configuration
+ println!(
+ "Login at {}?oauth_token={}",
+ client.join("/NetBankOAuth/authtoken.xhtml"),
+ token_request.key
+ );
+ let auth_url = passterm::prompt_password_tty(Some("Enter the result URL>"))?;
+ let auth_url = reqwest::Url::parse(&auth_url)?;
+ let token_auth: TokenAuth =
+ serde_urlencoded::from_str(auth_url.query().unwrap_or_default())?;
+ assert_eq!(token_request.key, token_auth.oauth_token);
+
+ let access_token = client.token_access(&token_request, &token_auth).await?;
+ keys.access_token = Some(access_token);
+ json_file::persist(&cfg.keys_path, &keys)?;
+ }
+
+ let client = client.upgrade(keys.access_token.as_ref().unwrap());
+
+ info!("Setup Strong Customer Authentication");
+ // TODO find a proper way to check if SCA is required without trigerring SCA.GLOBAL_FEATURE_NOT_ENABLED
+ let request = client.request_sms_code().await?;
+ println!(
+ "A SCA code have been sent through {} to {}",
+ request.channel,
+ request.sent_to.join(", ")
+ );
+ let sca_code = passterm::prompt_password_tty(Some("Enter the code>"))?;
+ if let Err(e) = client.perform_sca(&sca_code).await {
+ // Ignore error if SCA already performed
+ if !matches!(e, ApiError::Magnet(MagnetError { ref short_message, .. }) if short_message == "TOKEN_SCA_HITELESITETT")
+ {
+ return Err(e.into());
+ }
+ }
+
+ info!("Setup public key");
+ // TODO find a proper way to check if a public key have been setup
+ // TODO use the better from/to_array API in the next version of the crypto lib
+ let signing_key = match keys.signing_key {
+ Some(bytes) => SigningKey::from_slice(bytes.as_ref())?,
+ None => {
+ let rand = SigningKey::random(&mut rand_core::OsRng);
+ let array: [u8; 32] = rand.to_bytes().as_slice().try_into().unwrap();
+ keys.signing_key = Some(Base32::from(array));
+ json_file::persist(&cfg.keys_path, &keys)?;
+ rand
+ }
+ };
+ if let Err(e) = client.upload_public_key(&signing_key).await {
+ // Ignore error if public key already uploaded
+ if !matches!(e, ApiError::Magnet(MagnetError { ref short_message, .. }) if short_message == "KULCS_MAR_HASZNALATBAN")
+ {
+ return Err(e.into());
+ }
+ }
+
+ Ok(())
+}
diff --git a/taler-magnet-bank/src/lib.rs b/taler-magnet-bank/src/lib.rs
@@ -0,0 +1,87 @@
+/*
+ This file is part of TALER
+ Copyright (C) 2025 Taler Systems SA
+
+ TALER is free software; you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License along with
+ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+*/
+
+use taler_common::types::payto::{FullPayto, Payto, PaytoErr};
+
+pub mod adapter;
+pub mod config;
+pub mod constant;
+pub mod db;
+pub mod dev;
+pub mod keys;
+pub mod magnet;
+pub mod worker;
+pub mod failure_injection {
+ pub fn fail_point(_name: &'static str) {
+ // TODO inject failures for error handling tests
+ }
+}
+
+#[derive(Debug, Clone, PartialEq, Eq)]
+pub struct MagnetPayto {
+ pub number: String,
+ pub name: String,
+}
+
+impl MagnetPayto {
+ pub fn as_payto(&self) -> Payto {
+ Payto::from_parts(
+ format_args!("{MAGNET_BANK}/{}", self.number),
+ [("receiver-name", &self.name)],
+ )
+ }
+}
+
+impl std::fmt::Display for MagnetPayto {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ self.as_payto().fmt(f)
+ }
+}
+#[derive(Debug, thiserror::Error)]
+pub enum MagnetPaytoErr {
+ #[error("missing Magnet Bank account number in path")]
+ MissingAccount,
+}
+
+const MAGNET_BANK: &str = "magnet-bank";
+
+impl TryFrom<&Payto> for MagnetPayto {
+ type Error = PaytoErr;
+
+ fn try_from(value: &Payto) -> Result<Self, Self::Error> {
+ let url = value.as_ref();
+ if url.domain() != Some(MAGNET_BANK) {
+ return Err(PaytoErr::UnsupportedKind(
+ MAGNET_BANK,
+ url.domain().unwrap_or_default().to_owned(),
+ ));
+ }
+ let Some(mut segments) = url.path_segments() else {
+ return Err(PaytoErr::custom(MagnetPaytoErr::MissingAccount));
+ };
+ let Some(account) = segments.next() else {
+ return Err(PaytoErr::custom(MagnetPaytoErr::MissingAccount));
+ };
+ if segments.next().is_some() {
+ return Err(PaytoErr::TooLong(MAGNET_BANK));
+ }
+ let full: FullPayto = value.query()?;
+ Ok(Self {
+ number: account.to_owned(),
+ name: full.receiver_name,
+ })
+ }
+}
diff --git a/adapter/taler-magnet-bank-adapter/src/magnet.rs b/taler-magnet-bank/src/magnet.rs
diff --git a/adapter/taler-magnet-bank-adapter/src/magnet/error.rs b/taler-magnet-bank/src/magnet/error.rs
diff --git a/adapter/taler-magnet-bank-adapter/src/magnet/oauth.rs b/taler-magnet-bank/src/magnet/oauth.rs
diff --git a/taler-magnet-bank/src/main.rs b/taler-magnet-bank/src/main.rs
@@ -0,0 +1,152 @@
+/*
+ This file is part of TALER
+ Copyright (C) 2025 Taler Systems SA
+
+ TALER is free software; you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License along with
+ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+*/
+
+use std::sync::Arc;
+
+use clap::Parser;
+use sqlx::PgPool;
+use taler_api::api::TalerApiBuilder;
+use taler_common::{
+ cli::ConfigCmd,
+ config::{parser::ConfigSource, Config},
+ taler_main,
+ types::payto::{payto, Payto},
+ CommonArgs,
+};
+use taler_magnet_bank::{
+ adapter::MagnetApi,
+ config::{DbCfg, ServeCfg, WorkerCfg},
+ db,
+ dev::{self, DevCmd},
+ keys,
+ magnet::AuthClient,
+ worker::Worker,
+ MagnetPayto,
+};
+
+#[derive(clap::Parser, Debug)]
+#[command(version, about, long_about = None)]
+struct Args {
+ #[clap(flatten)]
+ common: CommonArgs,
+
+ #[command(subcommand)]
+ cmd: Command,
+}
+
+#[derive(clap::Subcommand, Debug)]
+enum Command {
+ /// Setup taler-magnet-bank auth token and account settings for Wire Gateway use
+ Setup {
+ #[clap(long, short)]
+ reset: bool,
+ },
+ /// Initialize taler-magnet-bank database
+ Dbinit {
+ /// Reset database (DANGEROUS: All existing data is lost)
+ #[clap(long, short)]
+ reset: bool,
+ },
+ /// Run taler-magnet-bank HTTP server
+ Serve {
+ /// Check whether an API is in use (if it's useful to start the HTTP
+ /// server). Exit with 0 if at least one API is enabled, otherwise 1
+ #[clap(long)]
+ check: bool,
+ },
+ /// Run taler-magnet-bank worker
+ Worker {
+ // TODO account in config
+ account: Payto,
+ },
+ #[command(subcommand)]
+ Config(ConfigCmd),
+ /// Hidden dev commands
+ #[command(subcommand, hide(true))]
+ Dev(DevCmd),
+}
+
+async fn app(args: Args, cfg: Config) -> anyhow::Result<()> {
+ match args.cmd {
+ Command::Setup { reset } => {
+ let cfg = WorkerCfg::parse(&cfg)?;
+ keys::setup(cfg, reset).await?
+ }
+ Command::Dbinit { reset } => {
+ let db = DbCfg::parse(&cfg)?;
+ let pool = PgPool::connect_with(db.cfg).await?;
+ db::db_init(&pool, reset).await?;
+ }
+ Command::Serve { check } => {
+ if check {
+ let cfg = ServeCfg::parse(&cfg)?;
+ if cfg.revenue.is_none() && cfg.wire_gateway.is_none() {
+ std::process::exit(1);
+ }
+ } else {
+ let db = DbCfg::parse(&cfg)?;
+ let pool = PgPool::connect_with(db.cfg).await?;
+ let cfg = ServeCfg::parse(&cfg)?;
+ let api = Arc::new(MagnetApi::start(pool, payto("payto://magnet-bank/todo")).await);
+ let mut builder = TalerApiBuilder::new();
+ if let Some(cfg) = cfg.wire_gateway {
+ builder = builder.wire_gateway(api.clone(), cfg.auth);
+ }
+ if let Some(cfg) = cfg.revenue {
+ builder = builder.revenue(api, cfg.auth);
+ }
+ builder.serve(cfg.serve, None).await?;
+ }
+ }
+ Command::Worker { account } => {
+ let db = DbCfg::parse(&cfg)?;
+ let pool = PgPool::connect_with(db.cfg).await?;
+ let cfg = WorkerCfg::parse(&cfg)?;
+ let keys = keys::load(&cfg)?;
+ let client = reqwest::Client::new();
+ let client =
+ AuthClient::new(&client, &cfg.api_url, &cfg.consumer).upgrade(&keys.access_token);
+ let account = MagnetPayto::try_from(&account)?;
+ let account = client.account(&account.number).await?;
+ let mut db = pool.acquire().await?.detach();
+ // TODO run in loop and handle errors
+ let mut worker = Worker {
+ client: &client,
+ db: &mut db,
+ account_number: &account.number,
+ account_code: account.code,
+ key: &keys.signing_key,
+ };
+ worker.run().await?;
+ }
+ Command::Config(cfg_cmd) => cfg_cmd.run(cfg)?,
+ Command::Dev(dev_cmd) => dev::dev(cfg, dev_cmd).await?,
+ }
+ Ok(())
+}
+
+fn main() {
+ let args = Args::parse();
+ taler_main(
+ ConfigSource::new(
+ "taler-magnet-bank",
+ "taler-magnet-bank",
+ "taler-magnet-bank",
+ ),
+ args.common.clone(),
+ |cfg| async move { app(args, cfg).await },
+ );
+}
diff --git a/adapter/taler-magnet-bank-adapter/src/worker.rs b/taler-magnet-bank/src/worker.rs
diff --git a/taler-magnet-bank/tests/api.rs b/taler-magnet-bank/tests/api.rs
@@ -0,0 +1,116 @@
+/*
+ This file is part of TALER
+ Copyright (C) 2025 Taler Systems SA
+
+ TALER is free software; you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License along with
+ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+*/
+
+use std::sync::Arc;
+
+use sqlx::PgPool;
+use taler_api::{api::TalerApiBuilder, auth::AuthMethod, subject::OutgoingSubject};
+use taler_common::{
+ api_common::ShortHashCode,
+ api_wire::{OutgoingHistory, TransferState},
+ types::{amount::amount, payto::payto, timestamp::Timestamp, url},
+};
+use taler_magnet_bank::{adapter::MagnetApi, db, MagnetPayto};
+use taler_test_utils::{
+ axum_test::TestServer,
+ db_test_setup,
+ routine::{admin_add_incoming_routine, revenue_routine, routine_pagination, transfer_routine},
+};
+
+async fn setup() -> (TestServer, PgPool) {
+ let pool = db_test_setup().await;
+ db::db_init(&pool, false).await.unwrap();
+ let api = Arc::new(MagnetApi::start(pool.clone(), payto("payto://magnet-bank/todo")).await);
+ let builder = TalerApiBuilder::new()
+ .wire_gateway(api.clone(), AuthMethod::None)
+ .revenue(api, AuthMethod::None)
+ .finalize();
+ let server = TestServer::new(builder).unwrap();
+
+ (server, pool)
+}
+
+#[tokio::test]
+async fn transfer() {
+ let (server, _) = setup().await;
+ transfer_routine(
+ &server,
+ TransferState::pending,
+ &payto("payto://magnet-bank/account?receiver-name=John+Smith"),
+ )
+ .await;
+}
+
+#[tokio::test]
+async fn outgoing_history() {
+ let (server, pool) = setup().await;
+ routine_pagination::<OutgoingHistory, _>(
+ &server,
+ "/taler-wire-gateway/history/outgoing",
+ |it| {
+ it.outgoing_transactions
+ .into_iter()
+ .map(|it| *it.row_id as i64)
+ .collect()
+ },
+ |_, i| {
+ let acquire = pool.acquire();
+ async move {
+ let mut conn = acquire.await.unwrap();
+ db::register_tx_out(
+ &mut *conn,
+ &db::TxOut {
+ code: i as u64,
+ amount: amount("EUR:10"),
+ subject: "subject".to_owned(),
+ creditor: MagnetPayto {
+ number: "number".to_owned(),
+ name: "name".to_owned(),
+ },
+ timestamp: Timestamp::now_stable(),
+ },
+ &Some(OutgoingSubject(
+ ShortHashCode::rand(),
+ url("https://exchange.test"),
+ )),
+ )
+ .await
+ .unwrap();
+ }
+ },
+ )
+ .await;
+}
+
+#[tokio::test]
+async fn admin_add_incoming() {
+ let (server, _) = setup().await;
+ admin_add_incoming_routine(
+ &server,
+ &payto("payto://magnet-bank/account?receiver-name=John+Smith"),
+ )
+ .await;
+}
+
+#[tokio::test]
+async fn revenue() {
+ let (server, _) = setup().await;
+ revenue_routine(
+ &server,
+ &payto("payto://magnet-bank/account?receiver-name=John+Smith"),
+ )
+ .await;
+}