commit 1f583d2ee6e8b2aebdef2556ddd7d3aca80e52e7
parent 96b8c243dfaba804f39aa58240ee9f272c5c3ae6
Author: Antoine A <>
Date: Mon, 13 Jan 2025 18:05:56 +0100
magnet-bank: add api and serve cmd
Diffstat:
10 files changed, 400 insertions(+), 23 deletions(-)
diff --git a/common/taler-api/src/lib.rs b/common/taler-api/src/lib.rs
@@ -1,6 +1,6 @@
/*
This file is part of TALER
- Copyright (C) 2024 Taler Systems SA
+ Copyright (C) 2024-2025 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free Software
@@ -15,6 +15,10 @@
*/
use std::{
+ fs::Permissions,
+ io::ErrorKind,
+ net::SocketAddr,
+ os::unix::fs::PermissionsExt,
sync::{
atomic::{AtomicU32, Ordering},
Arc,
@@ -35,6 +39,7 @@ use axum::{
use constants::{MAX_BODY_LENGTH, MAX_PAGE_SIZE, MAX_TIMEOUT_MS, WIRE_GATEWAY_API_VERSION};
use error::{failure, failure_code, ApiError, ApiResult};
use http_body_util::BodyExt;
+use listenfd::ListenFd;
use serde::de::DeserializeOwned;
use taler_common::{
api_params::{History, HistoryParams, Page, TransferParams},
@@ -46,7 +51,10 @@ use taler_common::{
error_code::ErrorCode,
types::amount::Amount,
};
-use tokio::{net::TcpListener, signal};
+use tokio::{
+ net::{TcpListener, UnixListener},
+ signal,
+};
use tracing::info;
pub mod auth;
@@ -173,6 +181,7 @@ where
}
pub trait WireGatewayImpl: Send + Sync {
+ fn name(&self) -> &str;
fn currency(&self) -> &str;
fn implementation(&self) -> Option<&str>;
fn transfer(
@@ -231,9 +240,66 @@ pub fn standard_layer(router: Router, auth: AuthMethod) -> Router {
))
}
+pub enum Serve {
+ Tcp(SocketAddr),
+ Unix {
+ path: String,
+ permission: Permissions,
+ },
+}
+
+impl Serve {
+ /// Resolve listener from a config and environement
+ fn resolve(&self) -> Result<Listener, std::io::Error> {
+ // Check if systemd is passing a socket
+ let mut listenfd = ListenFd::from_env();
+ if let Ok(Some(unix)) = listenfd.take_unix_listener(0) {
+ info!(
+ "Server listening on activated unix socket {:?}",
+ unix.local_addr()
+ );
+ Ok(Listener::Unix(UnixListener::from_std(unix)?))
+ } else if let Ok(Some(tcp)) = listenfd.take_tcp_listener(0) {
+ info!(
+ "Server listening on activated TCP socket {:?}",
+ tcp.local_addr()
+ );
+ Ok(Listener::Tcp(TcpListener::from_std(tcp)?))
+ } else {
+ match self {
+ Serve::Tcp(socket_addr) => {
+ info!("Server listening on {socket_addr}");
+ let listener = std::net::TcpListener::bind(socket_addr)?;
+ Ok(Listener::Tcp(TcpListener::from_std(listener)?))
+ }
+ Serve::Unix { path, permission } => {
+ info!(
+ "Server listening on unxis domain socket {path} {:o}",
+ permission.mode()
+ );
+ if let Err(e) = std::fs::remove_file(path) {
+ let kind = e.kind();
+ if kind != ErrorKind::NotFound {
+ return Err(e);
+ }
+ }
+ let listener = std::os::unix::net::UnixListener::bind(path)?;
+ std::fs::set_permissions(path, permission.clone())?;
+ Ok(Listener::Unix(UnixListener::from_std(listener)?))
+ }
+ }
+ }
+ }
+}
+
+enum Listener {
+ Tcp(TcpListener),
+ Unix(UnixListener),
+}
+
pub async fn server(
- tcp_listener: TcpListener,
mut router: Router,
+ serve: Serve,
auth: AuthMethod,
lifetime_counter: Option<AtomicU32>,
) -> Result<(), std::io::Error> {
@@ -248,13 +314,29 @@ pub async fn server(
))
}
let router = standard_layer(router, auth);
+ let listener = serve.resolve()?;
+
+ match listener {
+ Listener::Tcp(tcp_listener) => {
+ axum::serve(
+ tcp_listener,
+ router.layer(middleware::from_fn(logger_middleware)),
+ )
+ .with_graceful_shutdown(shutdown_signal(notify))
+ .await?;
+ }
+ Listener::Unix(unix_listener) => {
+ axum::serve(
+ unix_listener,
+ router.layer(middleware::from_fn(logger_middleware)),
+ )
+ .with_graceful_shutdown(shutdown_signal(notify))
+ .await?;
+ }
+ }
- axum::serve(
- tcp_listener,
- router.layer(middleware::from_fn(logger_middleware)),
- )
- .with_graceful_shutdown(shutdown_signal(notify))
- .await
+ info!("Server stopped");
+ Ok(())
}
struct LifetimeMiddlewareState {
@@ -334,7 +416,7 @@ pub fn wire_gateway_api<I: WireGatewayImpl + 'static>(wg: Arc<I>) -> Router {
"/config",
get(|State(state): State<Arc<I>>| async move {
Json(WireConfig {
- name: "taler-wire-gateway",
+ name: state.name(),
version: WIRE_GATEWAY_API_VERSION,
currency: state.currency(),
implementation: state.implementation(),
diff --git a/common/taler-api/tests/common/mod.rs b/common/taler-api/tests/common/mod.rs
@@ -47,6 +47,10 @@ pub struct SampleState {
}
impl WireGatewayImpl for SampleState {
+ fn name(&self) -> &str {
+ "taler-wire-gateway"
+ }
+
fn currency(&self) -> &str {
&self.currency
}
diff --git a/common/taler-common/src/config.rs b/common/taler-common/src/config.rs
@@ -14,7 +14,7 @@
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
-use std::{fmt::Debug, str::FromStr};
+use std::{fmt::Debug, fs::Permissions, os::unix::fs::PermissionsExt, str::FromStr};
use indexmap::IndexMap;
use url::Url;
@@ -584,6 +584,38 @@ impl<'cfg, 'arg> Section<'cfg, 'arg> {
self.value(ty, option, |it| it.parse::<T>().map_err(|e| e.to_string()))
}
+ pub fn map<T: Copy>(
+ &self,
+ ty: &'arg str,
+ option: &'arg str,
+ map: &[(&str, T)],
+ ) -> Value<'arg, T> {
+ self.value(ty, option, |value| {
+ map.iter()
+ .find_map(|(k, v)| (*k == value).then_some(*v))
+ .ok_or_else(|| {
+ let mut buf = "expected '".to_owned();
+ match map {
+ [] => unreachable!("you must provide at least one mapping"),
+ [(unique, _)] => buf.push_str(unique),
+ [(first, _), other @ .., (second, _)] => {
+ buf.push_str(first);
+ for (k, _) in other {
+ buf.push_str("', '");
+ buf.push_str(k);
+ }
+ buf.push_str("' or '");
+ buf.push_str(second);
+ }
+ }
+ buf.push_str("' got '");
+ buf.push_str(value);
+ buf.push('\'');
+ buf
+ })
+ })
+ }
+
/** Access [option] as str */
pub fn str(&self, option: &'arg str) -> Value<'arg, String> {
self.value("string", option, |it| Ok::<_, &str>(it.to_owned()))
@@ -594,6 +626,14 @@ impl<'cfg, 'arg> Section<'cfg, 'arg> {
self.value("path", option, |it| self.config.pathsub(it, 0))
}
+ pub fn unix_mode(&self, option: &'arg str) -> Value<'arg, Permissions> {
+ self.value("unix mode", option, |it| {
+ u32::from_str_radix(it, 8)
+ .map(Permissions::from_mode)
+ .map_err(|_| format!("'{it}' not a valid number"))
+ })
+ }
+
/** Access [option] as a number */
pub fn number<T: FromStr>(&self, option: &'arg str) -> Value<'arg, T> {
self.value("number", option, |it| {
diff --git a/magnet-bank.conf b/magnet-bank.conf
@@ -4,5 +4,20 @@ CONSUMER_KEY = "Consumer"
CONSUMER_SECRET = "qikgjxc5y06tiil7qgrmh09l7rfi5a8e"
KEYS_FILE = keys.json
+# How "magnet-bank serve" serves its API, this can either be tcp or unix
+SERVE = tcp
+
+# Port on which the HTTP server listens, e.g. 9967. Only used if SERVE is tcp.
+PORT = 8080
+
+# Which IP address should we bind to? E.g. ``127.0.0.1`` or ``::1``for loopback. Only used if SERVE is tcp.
+BIND_TO = 0.0.0.0
+
+# Which unix domain path should we bind to? Only used if SERVE is unix.
+# UNIXPATH = libeufin-bank.sock
+
+# What should be the file access permissions for UNIXPATH? Only used if SERVE is unix.
+# UNIXPATH_MODE = 660
+
[magnet-bank-postgres]
CONFIG = postgres:/magnet-bank
\ No newline at end of file
diff --git a/wire-gateway/magnet-bank/src/config.rs b/wire-gateway/magnet-bank/src/config.rs
@@ -14,8 +14,12 @@
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
+use std::net::{IpAddr, SocketAddr};
+
+use base64::{prelude::BASE64_STANDARD, Engine};
use reqwest::Url;
use sqlx::postgres::PgConnectOptions;
+use taler_api::{auth::AuthMethod, Serve};
use taler_common::config::{Config, ValueError};
use crate::magnet::Token;
@@ -33,6 +37,56 @@ impl DbConfig {
}
}
+pub struct WireGatewayConfig {
+ pub serve: Serve,
+ pub auth: AuthMethod,
+}
+
+impl WireGatewayConfig {
+ pub fn parse(cfg: &Config) -> Result<Self, ValueError> {
+ let sect = cfg.section("magnet-bank");
+
+ let parse_tcp = || {
+ let port = sect.number("PORT").require()?;
+ let ip: IpAddr = sect.parse("IP addr", "BIND_TO").require()?;
+ Ok(Serve::Tcp(SocketAddr::new(ip, port)))
+ };
+ let parse_unix = || {
+ let path = sect.path("UNIXPATH").require()?;
+ let permission = sect.unix_mode("UNIXPATH_MODE").require()?;
+ Ok(Serve::Unix { path, permission })
+ };
+ let serve = sect
+ .map::<&dyn Fn() -> Result<Serve, ValueError>>(
+ "serve",
+ "SERVE",
+ &[("tcp", &parse_tcp), ("unix", &parse_unix)],
+ )
+ .require()?()?;
+
+ let parse_basic = || {
+ let username = sect.str("USERNAME").require()?;
+ let password = sect.str("PASSWORD").require()?;
+ Ok(AuthMethod::Basic(
+ BASE64_STANDARD.encode(format!("{username}:{password}")),
+ ))
+ };
+ let parse_bearer = || Ok(AuthMethod::Bearer(sect.str("AUTH_TOKEN").require()?));
+ let auth = sect
+ .map::<&dyn Fn() -> Result<AuthMethod, ValueError>>(
+ "auth_method",
+ "AUTH_METHOD",
+ &[
+ ("none", &|| Ok(AuthMethod::None)),
+ ("basic", &parse_basic),
+ ("bearer", &parse_bearer),
+ ],
+ )
+ .require()?()?;
+ Ok(Self { serve, auth })
+ }
+}
+
pub struct MagnetConfig {
pub api_url: Url,
pub consumer: Token,
diff --git a/wire-gateway/magnet-bank/src/db.rs b/wire-gateway/magnet-bank/src/db.rs
@@ -486,7 +486,7 @@ mod test {
async fn setup() -> (PgConnection, PgPool) {
let pool = test_utils::db_test_setup().await;
- db::db_init(&pool, true).await.expect("dbinit");
+ db::db_init(&pool, false).await.expect("dbinit");
let conn = pool.acquire().await.expect("aquire conn").leak();
(conn, pool)
}
diff --git a/wire-gateway/magnet-bank/src/lib.rs b/wire-gateway/magnet-bank/src/lib.rs
@@ -0,0 +1,22 @@
+/*
+ This file is part of TALER
+ Copyright (C) 2025 Taler Systems SA
+
+ TALER is free software; you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License along with
+ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+*/
+
+pub mod config;
+pub mod constant;
+pub mod db;
+pub mod keys;
+pub mod magnet;
+pub mod wire_gateway;
diff --git a/wire-gateway/magnet-bank/src/magnet/error.rs b/wire-gateway/magnet-bank/src/magnet/error.rs
@@ -141,7 +141,7 @@ async fn magnet_url<T: DeserializeOwned>(response: reqwest::Result<Response>) ->
serde_urlencoded::from_str(&body).map_err(ApiError::Form)
}
-pub trait MagnetBuilder {
+pub(crate) trait MagnetBuilder {
async fn magnet_call_encoded<T: DeserializeOwned>(self) -> ApiResult<T>;
async fn magnet_call<T: DeserializeOwned>(self) -> ApiResult<T>;
async fn magnet_empty(self) -> ApiResult<()>;
diff --git a/wire-gateway/magnet-bank/src/main.rs b/wire-gateway/magnet-bank/src/main.rs
@@ -14,21 +14,22 @@
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
-use std::{future::Future, path::PathBuf};
+use std::{future::Future, path::PathBuf, sync::Arc};
use clap::Parser;
-use config::{DbConfig, MagnetConfig};
+use magnet_bank::{
+ config::{DbConfig, MagnetConfig, WireGatewayConfig},
+ db, keys,
+ wire_gateway::MagnetWireGateway,
+};
use sqlx::PgPool;
-use taler_common::config::{parser::ConfigSource, Config};
+use taler_common::{
+ config::{parser::ConfigSource, Config},
+ types::payto::payto,
+};
use tracing::{error, Level};
use tracing_subscriber::{util::SubscriberInitExt as _, FmtSubscriber};
-mod config;
-mod constant;
-mod db;
-mod keys;
-mod magnet;
-
#[derive(clap::Parser, Debug)]
#[command(version, about, long_about = None)]
struct Args {
@@ -48,13 +49,15 @@ struct Args {
#[derive(clap::Subcommand, Debug)]
enum Command {
- /// Setup Magnet Bank auth token and account settings for Wire Gateway use
+ /// Setup magnet-bank auth token and account settings for Wire Gateway use
Setup,
/// Initialize magnet-bank database
Dbinit {
#[clap(long, short)]
reset: bool,
},
+ /// Run magnet-bank HTTP server
+ Serve,
}
fn setup(level: Option<tracing::Level>, app: impl Future<Output = Result<(), anyhow::Error>>) {
@@ -95,6 +98,21 @@ async fn app(args: Args) -> Result<(), anyhow::Error> {
let pool = PgPool::connect_with(db.cfg).await?;
db::db_init(&pool, reset).await?;
}
+ Command::Serve => {
+ let db = DbConfig::parse(&cfg)?;
+ let pool = PgPool::connect_with(db.cfg).await?;
+ let cfg = WireGatewayConfig::parse(&cfg)?;
+ taler_api::server(
+ taler_api::wire_gateway_api(Arc::new(MagnetWireGateway {
+ pool,
+ payto: payto("payto://todo"),
+ })),
+ cfg.serve,
+ cfg.auth,
+ None,
+ )
+ .await?;
+ }
}
Ok(())
}
diff --git a/wire-gateway/magnet-bank/src/wire_gateway.rs b/wire-gateway/magnet-bank/src/wire_gateway.rs
@@ -0,0 +1,142 @@
+/*
+ This file is part of TALER
+ Copyright (C) 2025 Taler Systems SA
+
+ TALER is free software; you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License along with
+ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+*/
+
+use taler_api::{
+ error::{failure, ApiResult},
+ subject::IncomingSubject,
+ WireGatewayImpl,
+};
+use taler_common::{
+ api_common::{safe_u64, SafeU64},
+ api_params::{History, Page},
+ api_wire::{
+ AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddKycauthResponse,
+ IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse,
+ TransferState, TransferStatus,
+ },
+ error_code::ErrorCode,
+ types::{
+ payto::{payto, Payto},
+ timestamp::Timestamp,
+ },
+};
+
+use crate::{
+ constant::CURRENCY,
+ db::{self, TxInAdmin},
+};
+
+pub struct MagnetWireGateway {
+ pub pool: sqlx::PgPool,
+ pub payto: Payto,
+}
+
+impl WireGatewayImpl for MagnetWireGateway {
+ fn name(&self) -> &str {
+ "magnet-bank"
+ }
+
+ fn currency(&self) -> &str {
+ CURRENCY
+ }
+
+ fn implementation(&self) -> Option<&str> {
+ None
+ }
+
+ async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> {
+ let result = db::make_transfer(&self.pool, &req, &Timestamp::now()).await?;
+ match result {
+ db::TransferResult::Success { id, timestamp } => Ok(TransferResponse {
+ timestamp,
+ row_id: SafeU64::try_from(id).unwrap(),
+ }),
+ db::TransferResult::RequestUidReuse => Err(failure(
+ ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED,
+ "request_uid used already",
+ )),
+ db::TransferResult::WtidReuse => unimplemented!(),
+ }
+ }
+
+ async fn transfer_page(
+ &self,
+ page: Page,
+ status: Option<TransferState>,
+ ) -> ApiResult<TransferList> {
+ Ok(TransferList {
+ transfers: db::transfer_page(&self.pool, &status, &page).await?,
+ debit_account: payto("payto://todo"),
+ })
+ }
+
+ async fn transfer_by_id(&self, id: u64) -> ApiResult<Option<TransferStatus>> {
+ Ok(db::transfer_by_id(&self.pool, id).await?)
+ }
+
+ async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> {
+ Ok(OutgoingHistory {
+ outgoing_transactions: db::outgoing_history(&self.pool, ¶ms).await?,
+ debit_account: self.payto.clone(),
+ })
+ }
+
+ async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> {
+ Ok(IncomingHistory {
+ incoming_transactions: db::incoming_history(&self.pool, ¶ms).await?,
+ credit_account: self.payto.clone(),
+ })
+ }
+
+ async fn add_incoming_reserve(
+ &self,
+ req: AddIncomingRequest,
+ ) -> ApiResult<AddIncomingResponse> {
+ let result = db::register_tx_in_admin(
+ &self.pool,
+ &TxInAdmin {
+ amount: req.amount,
+ subject: format!("Admin incoming {}", req.reserve_pub),
+ debit_payto: req.debit_account,
+ timestamp: Timestamp::now(),
+ metadata: IncomingSubject::Reserve(req.reserve_pub),
+ },
+ )
+ .await?;
+ Ok(AddIncomingResponse {
+ row_id: safe_u64(result.row_id),
+ timestamp: result.timestamp,
+ })
+ }
+
+ async fn add_incoming_kyc(&self, req: AddKycauthRequest) -> ApiResult<AddKycauthResponse> {
+ let result = db::register_tx_in_admin(
+ &self.pool,
+ &TxInAdmin {
+ amount: req.amount,
+ subject: format!("Admin incoming KYC:{}", req.account_pub),
+ debit_payto: req.debit_account,
+ timestamp: Timestamp::now(),
+ metadata: IncomingSubject::Kyc(req.account_pub),
+ },
+ )
+ .await?;
+ Ok(AddKycauthResponse {
+ row_id: safe_u64(result.row_id),
+ timestamp: result.timestamp,
+ })
+ }
+}