taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

commit dfa67e614a67ce85367b70d606e1869ecaba883e
parent 559e9869f102c4aa36d00b5c864be2f75cc25b90
Author: Antoine A <>
Date:   Thu,  8 Jan 2026 15:33:58 +0100

common: use common HTTP client logic

Diffstat:
MCargo.toml | 8+++-----
Acommon/http-client/Cargo.toml | 34++++++++++++++++++++++++++++++++++
Acommon/http-client/src/builder.rs | 281+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acommon/http-client/src/headers.rs | 62++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acommon/http-client/src/lib.rs | 117+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Acommon/http-client/src/sse.rs | 221+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mtaler-cyclos/Cargo.toml | 12+++---------
Mtaler-cyclos/src/bin/cyclos-codegen.rs | 23++++++++++++++++++-----
Mtaler-cyclos/src/bin/cyclos-harness.rs | 6+++---
Mtaler-cyclos/src/config.rs | 4++--
Mtaler-cyclos/src/cyclos_api/api.rs | 369+++++++++++--------------------------------------------------------------------
Mtaler-cyclos/src/cyclos_api/client.rs | 12+++++++-----
Mtaler-cyclos/src/db.rs | 4++--
Mtaler-cyclos/src/dev.rs | 4++--
Mtaler-cyclos/src/main.rs | 6+++---
Mtaler-cyclos/src/notification.rs | 6++----
Mtaler-cyclos/src/setup.rs | 4++--
Mtaler-cyclos/src/worker.rs | 15++++++++-------
Mtaler-magnet-bank/Cargo.toml | 5+++--
Mtaler-magnet-bank/src/bin/magnet-bank-harness.rs | 8++++----
Mtaler-magnet-bank/src/config.rs | 4++--
Mtaler-magnet-bank/src/db.rs | 4++--
Mtaler-magnet-bank/src/dev.rs | 4++--
Mtaler-magnet-bank/src/magnet_api/api.rs | 146++++++++++++++++++++++++++-----------------------------------------------------
Mtaler-magnet-bank/src/magnet_api/client.rs | 30++++++++++++++----------------
Mtaler-magnet-bank/src/magnet_api/oauth.rs | 14+++++++-------
Mtaler-magnet-bank/src/main.rs | 4++--
Mtaler-magnet-bank/src/setup.rs | 12++++++------
Mtaler-magnet-bank/src/worker.rs | 21+++++++--------------
29 files changed, 918 insertions(+), 522 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "common/taler-build", "common/taler-test-utils", "common/failure-injection", + "common/http-client", "taler-magnet-bank", "taler-cyclos", ] @@ -50,6 +51,8 @@ taler-api = { path = "common/taler-api" } taler-test-utils = { path = "common/taler-test-utils" } taler-build = { path = "common/taler-build" } failure-injection = { path = "common/failure-injection" } +http-client = { path = "common/http-client" } +hyper = { version = "1.8.1", features = ["client", "http1", "http2"] } anyhow = "1" http-body-util = "0.1.2" libdeflater = "1.22.0" @@ -60,8 +63,3 @@ ed25519-dalek = { version = "2.1.1", default-features = false, features = [ ] } rand_core = { version = "0.6.4" } compact_str = { version = "0.9.0", features = ["serde", "sqlx-postgres"] } -reqwest = { version = "0.13", default-features = false, features = [ - "json", - "rustls", - "query" -] } diff --git a/common/http-client/Cargo.toml b/common/http-client/Cargo.toml @@ -0,0 +1,33 @@ +[package] +name = "http-client" +version.workspace = true +edition.workspace = true +authors.workspace = true +homepage.workspace = true +repository.workspace = true +license-file.workspace = true + +[dependencies] +serde_json = { workspace = true, features = ["raw_value"] } +serde.workspace = true +serde_path_to_error.workspace = true +serde_urlencoded.workspace = true +thiserror.workspace = true +tracing.workspace = true +taler-common.workspace = true +compact_str.workspace = true +url.workspace = true +anyhow.workspace = true +base64.workspace = true +hyper.workspace = true +tokio.workspace = true +tokio-util = { version = "0.7.17", default-features = false, features = [ + "codec", + "io", +] } +futures-util = { version = "0.3", default-features = false } +http-body-util = { version = "0.1" } +hyper-util = { version = "0.1", features = ["client-legacy", "http1", "http2"] } +hyper-rustls = { version = "0.27", features = ["http2"] } +rustls = "0.23" +http = "1.4" +\ No newline at end of file diff --git a/common/http-client/src/builder.rs b/common/http-client/src/builder.rs @@ -0,0 +1,281 @@ +/* + This file is part of TALER + Copyright (C) 2026 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use std::{borrow::Cow, fmt}; + +use base64::Engine; +use http::{ + HeaderMap, HeaderName, HeaderValue, StatusCode, + header::{self}, +}; +use http_body_util::BodyExt; +use http_body_util::{BodyDataStream, Full}; +use hyper::{Method, body::Bytes}; +use serde::{Serialize, de::DeserializeOwned}; +use url::Url; + +use crate::{Client, ClientErr, Ctx, headers::HeaderParser, sse::SseClient}; + +struct Builder { + headers: HeaderMap, + body: Full<Bytes>, +} + +pub struct Req { + client: Client, + url: Url, + builder: Result<Builder, ClientErr>, + ctx: Ctx, +} + +impl Req { + pub fn new( + client: &Client, + method: Method, + base_url: &Url, + path: impl Into<Cow<'static, str>>, + ) -> Self { + let path = path.into(); + let url = base_url.join(&path).unwrap(); + Self { + client: client.clone(), + url, + builder: Ok(Builder { + headers: HeaderMap::new(), + body: Full::default(), + }), + ctx: Ctx { + path, + method, + status: None, + }, + } + } + + pub fn method(&self) -> &Method { + &self.ctx.method + } + + pub fn url(&self) -> &Url { + &self.url + } + + pub fn header<K, V>(mut self, key: K, value: V) -> Self + where + K: TryInto<HeaderName>, + <K as TryInto<HeaderName>>::Error: Into<http::Error>, + V: TryInto<HeaderValue>, + <V as TryInto<HeaderValue>>::Error: Into<http::Error>, + { + self.builder = self.builder.and_then(move |mut builder| { + let name = key.try_into().map_err(Into::into)?; + let value = value.try_into().map_err(Into::into)?; + builder.headers.insert(name, value); + Ok(builder) + }); + self + } + + pub fn sensitive_header<K, V>(mut self, key: K, value: V) -> Self + where + K: TryInto<HeaderName>, + <K as TryInto<HeaderName>>::Error: Into<http::Error>, + V: TryInto<HeaderValue>, + <V as TryInto<HeaderValue>>::Error: Into<http::Error>, + { + self.builder = self.builder.and_then(move |mut builder| { + let name = key.try_into().map_err(Into::into)?; + let mut value = value.try_into().map_err(Into::into)?; + value.set_sensitive(true); + builder.headers.insert(name, value); + Ok(builder) + }); + self + } + + pub fn query<T: Serialize>(mut self, name: &str, value: T) -> Self { + if self.builder.is_ok() { + let mut pairs = self.url.query_pairs_mut(); + let serializer = serde_urlencoded::Serializer::new(&mut pairs); + if let Err(e) = [(name, value)].serialize(serializer) { + drop(pairs); + self.builder = Err(e.into()); + return self; + } + } + + self + } + + pub fn json<T: Serialize + ?Sized>(mut self, json: &T) -> Self { + let mut buf = Vec::new(); + let serializer: &mut serde_json::Serializer<&mut Vec<u8>> = + &mut serde_json::Serializer::new(&mut buf); + if let Err(e) = serde_path_to_error::serialize(json, serializer).map_err(ClientErr::ReqJson) + { + self.builder = Err(e); + return self; + }; + if let Ok(builder) = &mut self.builder { + builder.headers.insert( + header::CONTENT_TYPE, + HeaderValue::from_static("application/json"), + ); + builder.body = Full::new(buf.into()) + } + self + } + + pub fn basic_auth<U, P>(self, username: U, password: P) -> Req + where + U: fmt::Display, + P: fmt::Display, + { + let token = format!("{username}:{password}"); + let mut header = "Basic ".to_string(); + base64::engine::general_purpose::STANDARD.encode_string(token, &mut header); + self.sensitive_header(header::AUTHORIZATION, header) + } + + pub fn bearer_auth<T>(self, token: T) -> Req + where + T: fmt::Display, + { + let header = format!("Bearer {token}"); + self.sensitive_header(header::AUTHORIZATION, header) + } + + pub fn req_sse(mut self, client: &SseClient) -> Req { + self = self + .header( + header::ACCEPT, + HeaderValue::from_static("text/event-stream"), + ) + .header(header::CACHE_CONTROL, HeaderValue::from_static("no-cache")); + if let Some(id) = &client.last_event_id { + self = self.header( + HeaderName::from_static("Last-Event-ID"), + HeaderValue::from_str(id).unwrap(), + ); + } + self + } + + pub async fn send(self) -> Result<(Ctx, Res), (Ctx, ClientErr)> { + let Self { + client, + ctx, + builder, + url, + } = self; + let req = match async { + let Builder { headers, body } = builder?; + let mut builder = http::request::Request::builder() + .uri(url.as_str()) + .method(ctx.method.clone()); + if let Some(headers_mut) = builder.headers_mut() { + *headers_mut = headers; + } + let req = builder.body(body)?; + Ok(req) + } + .await + { + Ok(it) => it, + Err(e) => return Err((ctx, e)), + }; + match client.request(req).await { + Ok(res) => { + let (head, body) = res.into_parts(); + Ok((ctx, Res { head, body })) + } + Err(e) => Err((ctx, ClientErr::ReqTransport(e.into()))), + } + } +} + +pub struct Res { + head: http::response::Parts, + body: hyper::body::Incoming, +} + +impl Res { + pub fn status(&self) -> StatusCode { + self.head.status + } + + pub fn str_header(&self, name: &'static str) -> Result<String, ClientErr> { + self.head + .headers + .str_header(name) + .map_err(ClientErr::Headers) + } + + pub fn int_header(&self, name: &'static str) -> Result<u64, ClientErr> { + self.head + .headers + .int_header(name) + .map_err(ClientErr::Headers) + } + + pub fn bool_header(&self, name: &'static str) -> Result<bool, ClientErr> { + self.head + .headers + .bool_header(name) + .map_err(ClientErr::Headers) + } + + pub fn sse(self, client: &mut SseClient) -> Result<(), ClientErr> { + // TODO check content type? + // TODO check status + client.connect(BodyDataStream::new(self.body)); + Ok(()) + } + + async fn full_body(self) -> Result<Bytes, ClientErr> { + // TODO body size limit ? + self.body + .collect() + .await + .map(|it| it.to_bytes()) + .map_err(|e| ClientErr::ResTransport(e.into())) + } + + /** Parse request body into a JSON type */ + pub async fn json<T: DeserializeOwned>(self) -> Result<T, ClientErr> { + // TODO check content type? + let body = self.full_body().await?; + let deserializer = &mut serde_json::Deserializer::from_slice(&body); + let parsed = serde_path_to_error::deserialize(deserializer).map_err(ClientErr::ResJson)?; + Ok(parsed) + } + + /** Parse request body into a URL encoded type */ + pub async fn urlencoded<T: DeserializeOwned>(self) -> Result<T, ClientErr> { + // TODO check content type? + let body = self.full_body().await?; + let parsed = serde_urlencoded::from_bytes(&body).map_err(ClientErr::Form)?; + Ok(parsed) + } + + /** Parse request body into as text */ + pub async fn text(self) -> Result<String, ClientErr> { + let body = self.full_body().await?; + let parsed = + String::from_utf8(body.to_vec()).map_err(|e| ClientErr::Text(e.utf8_error()))?; + Ok(parsed) + } +} diff --git a/common/http-client/src/headers.rs b/common/http-client/src/headers.rs @@ -0,0 +1,62 @@ +/* + This file is part of TALER + Copyright (C) 2026 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use hyper::{HeaderMap, header::HeaderName}; + +#[derive(Debug, thiserror::Error)] +pub enum HeaderError { + #[error("Missing header {0}")] + Missing(&'static str), + #[error("Malformed header {0} expected string got binary")] + NotStr(&'static str), + #[error("Malformed header {0} expected {1} got '{2}'")] + Malformed(&'static str, &'static str, Box<str>), +} + +pub trait HeaderParser { + fn parse<T>( + &self, + name: &'static str, + kind: &'static str, + transform: impl FnOnce(&str) -> Result<T, ()>, + ) -> Result<T, HeaderError>; + fn str_header(&self, name: &'static str) -> Result<String, HeaderError> { + self.parse(name, "string", |s| Ok(s.to_owned())) + } + fn int_header(&self, name: &'static str) -> Result<u64, HeaderError> { + self.parse(name, "integer", |s| s.parse().map_err(|_| ())) + } + fn bool_header(&self, name: &'static str) -> Result<bool, HeaderError> { + self.parse(name, "boolean", |s| s.parse().map_err(|_| ())) + } +} + +impl HeaderParser for HeaderMap { + fn parse<T>( + &self, + name: &'static str, + kind: &'static str, + transform: impl FnOnce(&str) -> Result<T, ()>, + ) -> Result<T, HeaderError> { + let Some(value) = self.get(HeaderName::from_static(name)) else { + return Err(HeaderError::Missing(name)); + }; + let Ok(str) = value.to_str() else { + return Err(HeaderError::NotStr(name)); + }; + transform(str).map_err(|_| HeaderError::Malformed(name, kind, str.into())) + } +} diff --git a/common/http-client/src/lib.rs b/common/http-client/src/lib.rs @@ -0,0 +1,117 @@ +/* + This file is part of TALER + Copyright (C) 2026 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use std::{borrow::Cow, fmt::Display, str::Utf8Error}; + +use http_body_util::Full; +use hyper::{Method, StatusCode, body::Bytes}; +use hyper_rustls::ConfigBuilderExt as _; +use hyper_util::rt::TokioExecutor; +use taler_common::error::FmtSource; +use thiserror::Error; + +use crate::headers::HeaderError; + +pub mod builder; +pub mod headers; +pub mod sse; + +pub type Client = hyper_util::client::legacy::Client< + hyper_rustls::HttpsConnector<hyper_util::client::legacy::connect::HttpConnector>, + Full<Bytes>, +>; + +#[derive(Error, Debug)] +/// API call errors +pub enum ClientErr { + #[error("request: {0}")] + Http(#[from] http::Error), + #[error("request query: {0}")] + Query(#[from] serde_urlencoded::ser::Error), + #[error("request JSON body: {0}")] + ReqJson(serde_path_to_error::Error<serde_json::Error>), + #[error("request: {0}")] + ReqTransport(FmtSource<hyper_util::client::legacy::Error>), + #[error("response JSON body: {0}")] + ResJson(serde_path_to_error::Error<serde_json::Error>), + #[error("response txt body: {0}")] + Text(#[from] Utf8Error), + #[error("response form body: {0}")] + Form(#[from] serde_urlencoded::de::Error), + #[error("response headers: {0}")] + Headers(#[from] HeaderError), + #[error("response: {0}")] + ResTransport(FmtSource<hyper::Error>), +} + +pub fn client() -> anyhow::Result<Client> { + rustls::crypto::aws_lc_rs::default_provider() + .install_default() + .expect("failed to install the default TLS provider"); + + // Prepare the TLS client config + let tls = rustls::ClientConfig::builder() + .with_native_roots()? + .with_no_client_auth(); + + // Prepare the HTTPS connector + let https = hyper_rustls::HttpsConnectorBuilder::new() + .with_tls_config(tls) + .https_or_http() + .enable_http1() + .enable_http2() + .build(); + + // Build the hyper client from the HTTPS connector. + let client = hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build(https); + Ok(client) +} + +#[derive(Debug, Clone)] +pub struct Ctx { + path: Cow<'static, str>, + method: Method, + status: Option<StatusCode>, +} + +impl Ctx { + pub fn wrap<E: std::error::Error>(self, err: E) -> ApiErr<E> { + ApiErr { ctx: self, err } + } +} + +impl Display for Ctx { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Self { + path, + method, + status, + } = self; + write!(f, "{path} {method} ")?; + if let Some(status) = status { + write!(f, "{status}")?; + } + Ok(()) + } +} + +#[derive(Debug, Error)] +/// Error happening whith api request context +#[error("{ctx} {err}")] +pub struct ApiErr<E: std::error::Error> { + pub ctx: Ctx, + pub err: E, +} diff --git a/common/http-client/src/sse.rs b/common/http-client/src/sse.rs @@ -0,0 +1,221 @@ +/* + This file is part of TALER + Copyright (C) 2026 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use std::pin::Pin; + +use compact_str::CompactString; +use futures_util::{Stream, StreamExt as _, stream}; +use tokio_util::{ + bytes::Bytes, + codec::{FramedRead, LinesCodec, LinesCodecError}, + io::StreamReader, +}; +use tracing::trace; + +#[derive(Debug, Default, PartialEq, Eq)] +pub struct SseMessage { + pub event: CompactString, + pub data: String, +} + +type SseStream = dyn Stream<Item = std::result::Result<String, LinesCodecError>> + Send; + +/// Server-sent event client +pub struct SseClient { + pub last_event_id: Option<CompactString>, + pub reconnection_time: Option<u64>, + stream: Pin<Box<SseStream>>, +} + +impl SseClient { + pub fn new() -> Self { + Self { + last_event_id: None, + reconnection_time: None, + stream: Box::pin(stream::empty()), + } + } + + pub fn connect<E: std::error::Error + Send + Sync + 'static>( + &mut self, + stream: impl Stream<Item = Result<Bytes, E>> + 'static + Send, + ) { + let stream = stream.map(|it| it.map_err(std::io::Error::other)); + let lines = FramedRead::new(StreamReader::new(stream), LinesCodec::new()); + self.stream = Box::pin(lines); + } + + pub async fn next(&mut self) -> Option<Result<SseMessage, LinesCodecError>> { + // TODO add tests + let mut event = CompactString::new("message"); + let mut data = None::<String>; + while let Some(res) = self.stream.next().await { + let line = match res { + Ok(line) => line, + Err(e) => return Some(Err(e)), + }; + // Parse line + let (field, value): (&str, &str) = if line.is_empty() { + if let Some(data) = data.take() { + return Some(Ok(SseMessage { event, data })); + } else { + event = CompactString::new("message"); + continue; + } + } else if let Some(comment) = line.strip_prefix(':') { + trace!(target: "sse", "{comment}"); + continue; + } else if let Some((k, v)) = line.split_once(':') { + (k, v.strip_prefix(' ').unwrap_or(v)) + } else { + (&line, "") + }; + + // Process field + match field { + "event" => event = CompactString::new(value), + "data" => match data.as_mut() { + Some(data) => { + data.push('\n'); + data.push_str(value); + } + None => data = Some(value.to_string()), + }, + "id" => { + if !value.contains('\0') { + self.last_event_id = Some(CompactString::new(value)) + } + } + "retry" => { + if value.as_bytes().iter().all(|c| c.is_ascii_digit()) + && let Ok(int) = value.parse::<u64>() + { + self.reconnection_time = Some(int) + } + } + _ => continue, + } + } + None + } +} + +impl Default for SseClient { + fn default() -> Self { + Self::new() + } +} + +#[tokio::test] +pub async fn protocol() { + pub async fn test( + stream: &'static str, + result: &[(&str, &str)], + last_event_id: Option<&str>, + reconnection_time: Option<u64>, + ) { + let stream = stream::iter( + stream + .as_bytes() + .chunks(12) + .map(|chunk| std::io::Result::Ok(Bytes::from_static(chunk))), + ); + let mut client = SseClient::new(); + client.connect(stream); + let mut res = Vec::new(); + while let Some(msg) = client.next().await { + res.push(msg.unwrap()); + } + assert_eq!( + result, + &res.iter() + .map(|m| (m.event.as_ref(), m.data.as_ref())) + .collect::<Vec<_>>() + ); + assert_eq!(client.last_event_id.as_deref(), last_event_id); + assert_eq!(client.reconnection_time, reconnection_time); + } + + macro_rules! check { + // Handle multiple tuples + optional id and retry + ($stream:expr $(, ($e:expr, $d:expr))* $(, id: $id:expr)? $(, retry: $retry:expr)?) => { + test( + $stream, + &[ $( ($e, $d) ),* ], + { let mut _id = None; $( _id = Some($id); )? _id }, + { let mut _r = None; $( _r = Some($retry); )? _r } + ).await + }; + } + + check!("data\n\n", ("message", "")); + check!("data:key:value\n\n", ("message", "key:value")); + check!("data: value\n\n", ("message", "value")); + check!("data:first\ndata:second\n\n", ("message", "first\nsecond")); + + check!( + "event:first\nevent:second\ndata:test\n\n", + ("second", "test") + ); + + check!("data:test\r\n\r\n", ("message", "test")); + check!("data:test\r\r"); + check!("data:line1\r\ndata:line2\n\n", ("message", "line1\nline2")); + check!("data:test\n"); + check!("data:test\n\n\n", ("message", "test")); + check!("data:\ndata:\n\n", ("message", "\n")); + check!("data:\ndata:content\ndata:\n\n", ("message", "\ncontent\n")); + check!("data:Hello 世界 🌍\n\n", ("message", "Hello 世界 🌍")); + check!("data: \n\n", ("message", " ")); + check!("id:123\ndata:test\n\n", ("message", "test"), id: "123"); + check!("id:first\nid:second\ndata:test\n\n", ("message", "test"), id: "second"); + check!("id:first\nid:second\nid:\ndata:test\n\n", ("message", "test"), id: ""); + check!("id:\ndata:test\n\n", ("message", "test"), id: ""); + check!( + "id:test:123\ndata:test\n\n", + ("message", "test"), + id: "test:123" + ); + check!("id:123\x00456\ndata:test\n\n", ("message", "test")); + check!("id:123\n\n", id: "123"); + check!( + "id:123\ndata:first\n\ndata:second\n\n", + ("message", "first"), ("message", "second"), + id: "123" + ); + check!("event:customEvent\ndata:test\n\n", ("customEvent", "test")); + check!("event:\ndata:test\n\n", ("", "test")); + check!("event:my event\ndata:test\n\n", ("my event", "test")); + + check!("retry:3000\n\n", retry: 3000); + check!("retry:0\n\n", retry: 0); + check!("retry:abc\n\n"); + check!("retry:-1000\n\n"); + check!("retry:1000.5\n\n"); + check!("retry:1000\nretry:2000\n\n", retry: 2000); + + check!(":comment\n\n"); + check!(": comment\n\n"); + check!(":comment\ndata:test\n\n", ("message", "test")); + check!("unknown:value\ndata:test\n\n", ("message", "test")); + check!("datta:test\n\n"); + check!(" data:test\n\n"); + check!("data :test\n\n"); + check!("data:\tvalue\n\n", ("message", "\tvalue")); + check!("id:123\n\n", id: "123"); + check!("event:test\n\n"); + check!("event:test\n\ndata:value\n\n", ("message", "value")); +} diff --git a/taler-cyclos/Cargo.toml b/taler-cyclos/Cargo.toml @@ -9,32 +9,26 @@ repository.workspace = true license-file.workspace = true [dependencies] -reqwest = { workspace = true, features = ["stream"] } sqlx.workspace = true serde_json = { workspace = true, features = ["raw_value"] } jiff = { workspace = true, features = ["serde"] } taler-common.workspace = true taler-api.workspace = true taler-build.workspace = true +http-client.workspace = true clap.workspace = true serde.workspace = true serde_with.workspace = true serde_path_to_error.workspace = true -serde_urlencoded.workspace = true thiserror.workspace = true tracing.workspace = true tokio.workspace = true -tokio-util = { version = "0.7.17", default-features = false, features = [ - "codec", - "io", -] } -futures-util = { version = "0.3", default-features = false } compact_str.workspace = true - anyhow.workspace = true -base64.workspace = true owo-colors.workspace = true failure-injection.workspace = true +hyper.workspace = true +url.workspace = true [dev-dependencies] taler-test-utils.workspace = true diff --git a/taler-cyclos/src/bin/cyclos-codegen.rs b/taler-cyclos/src/bin/cyclos-codegen.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2025 Taler Systems SA + Copyright (C) 2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -18,7 +18,10 @@ use std::io::Write; use std::io::stdout; use clap::Parser; +use http_client::builder::Req; +use hyper::Method; use taler_build::long_version; +use url::Url; /// Cyclos API schema codegen #[derive(clap::Parser, Debug)] @@ -37,10 +40,19 @@ enum Kind { #[tokio::main] async fn main() -> anyhow::Result<()> { let args = Args::parse(); - let api: serde_json::Value = reqwest::get("https://demo.cyclos.org/api/openapi.json") - .await? - .json() - .await?; + let client = http_client::client()?; + let api: serde_json::Value = Req::new( + &client, + Method::GET, + &Url::parse("https://demo.cyclos.org/api/").unwrap(), + "openapi.json", + ) + .send() + .await + .unwrap() + .1 + .json() + .await?; let schemas = &api["components"]["schemas"]; let out = &mut stdout().lock(); @@ -80,5 +92,6 @@ async fn main() -> anyhow::Result<()> { )?; } writeln!(out, "}}")?; + Ok(()) } diff --git a/taler-cyclos/src/bin/cyclos-harness.rs b/taler-cyclos/src/bin/cyclos-harness.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2025 Taler Systems SA + Copyright (C) 2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -313,7 +313,7 @@ async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> { step("Prepare db"); let pool = dbinit(cfg, reset).await?; - let client = reqwest::Client::new(); + let client = http_client::client()?; setup::setup(cfg, reset, &client).await?; let cfg = HarnessCfg::parse(cfg)?; let wire = Client { @@ -530,7 +530,7 @@ async fn online_harness(config: &Config, reset: bool) -> anyhow::Result<()> { step("Prepare db"); let pool = dbinit(config, reset).await?; - let http_client = reqwest::Client::new(); + let http_client = http_client::client()?; setup::setup(config, reset, &http_client).await?; let cfg = HarnessCfg::parse(config)?; let wire = Client { diff --git a/taler-cyclos/src/config.rs b/taler-cyclos/src/config.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2025 Taler Systems SA + Copyright (C) 2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -17,7 +17,6 @@ use std::time::Duration; use compact_str::{CompactString, format_compact}; -use reqwest::Url; use taler_api::{ Serve, config::{ApiCfg, DbCfg}, @@ -27,6 +26,7 @@ use taler_common::{ map_config, types::{amount::Currency, payto::PaytoURI}, }; +use url::Url; use crate::{CyclosAccount, CyclosId, FullCyclosPayto}; diff --git a/taler-cyclos/src/cyclos_api/api.rs b/taler-cyclos/src/cyclos_api/api.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2025 Taler Systems SA + Copyright (C) 2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -14,23 +14,20 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use std::{borrow::Cow, fmt::Display, pin::Pin}; +use std::borrow::Cow; -use compact_str::CompactString; -use futures_util::{Stream, StreamExt, stream}; -use reqwest::{ - Client, Method, RequestBuilder, Response, StatusCode, Url, - header::{self, HeaderMap, HeaderName, HeaderValue}, +use http_client::{ + ApiErr, Client, ClientErr, Ctx, + builder::{Req, Res}, + sse::SseClient, }; -use serde::{Deserialize, Serialize, de::DeserializeOwned}; -use taler_common::error::FmtSource; -use thiserror::Error; -use tokio_util::{ - bytes::Bytes, - codec::{FramedRead, LinesCodec, LinesCodecError}, - io::StreamReader, +use hyper::{ + Method, StatusCode, + header::{HeaderName, HeaderValue}, }; -use tracing::trace; +use serde::{Serialize, de::DeserializeOwned}; +use thiserror::Error; +use url::Url; use crate::cyclos_api::types::{ ForbiddenError, InputError, NotFoundError, Pagination, UnauthorizedError, UnexpectedError, @@ -43,37 +40,7 @@ pub enum CyclosAuth { } #[derive(Error, Debug)] -pub struct ApiErr { - pub path: Cow<'static, str>, - pub method: Method, - pub status: Option<StatusCode>, - pub kind: ErrKind, -} - -impl Display for ApiErr { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let Self { - path, - method, - status, - kind, - } = self; - write!(f, "{path} {method} ")?; - if let Some(status) = status { - write!(f, "{status} ")?; - } - write!(f, "{kind}") - } -} - -#[derive(Error, Debug)] -pub enum ErrKind { - #[error(transparent)] - Transport(FmtSource<reqwest::Error>), - #[error("JSON body: {0}")] - Json(#[from] serde_path_to_error::Error<serde_json::Error>), - #[error("headers: {0}")] - Headers(#[from] HeaderError), +pub enum CyclosErr { #[error("unauthorized: {0}")] Unauthorized(#[from] UnauthorizedError), #[error("forbidden: {0}")] @@ -86,34 +53,13 @@ pub enum ErrKind { Input(#[from] InputError), #[error("status {0}")] UnexpectedStatus(StatusCode), + #[error(transparent)] + Client(#[from] ClientErr), } -impl From<reqwest::Error> for ErrKind { - fn from(value: reqwest::Error) -> Self { - Self::Transport(value.without_url().into()) - } -} - -pub type ApiResult<R> = std::result::Result<R, ApiErr>; - -/** Parse JSON and track error path */ -fn parse<'de, T: Deserialize<'de>>(str: &'de str) -> Result<T, ErrKind> { - let deserializer = &mut serde_json::Deserializer::from_str(str); - serde_path_to_error::deserialize(deserializer).map_err(ErrKind::Json) -} - -async fn json_body<T: DeserializeOwned>(res: reqwest::Response) -> Result<T, ErrKind> { - // TODO check content type? - let body = res.text().await?; - trace!("JSON body: {body}"); - let parsed = parse(&body)?; - Ok(parsed) -} - +pub type ApiResult<R> = std::result::Result<R, ApiErr<CyclosErr>>; pub struct CyclosRequest<'a> { - path: Cow<'static, str>, - method: Method, - builder: RequestBuilder, + req: Req, auth: &'a CyclosAuth, } @@ -125,293 +71,80 @@ impl<'a> CyclosRequest<'a> { path: impl Into<Cow<'static, str>>, auth: &'a CyclosAuth, ) -> Self { - let path = path.into(); - let url = base_url.join(&path).unwrap(); - let builder = client.request(method.clone(), url); Self { - path, - method, - builder, + req: Req::new(client, method, base_url, path), auth, } } pub fn query<T: Serialize>(mut self, name: &str, value: T) -> Self { - self.builder = self.builder.query(&[(name, value)]); + self.req = self.req.query(name, value); self } pub fn header(mut self, key: impl Into<HeaderName>, value: impl Into<HeaderValue>) -> Self { - self.builder = self.builder.header(key, value); + self.req = self.req.header(key, value); self } pub fn json<T: Serialize + ?Sized>(mut self, json: &T) -> Self { - self.builder = self.builder.json(json); + self.req = self.req.json(json); self } - async fn send(builder: RequestBuilder, auth: &CyclosAuth) -> Result<Response, ErrKind> { - let (client, req) = match auth { - CyclosAuth::None => builder, - CyclosAuth::Basic { username, password } => { - builder.basic_auth(username, Some(password)) - } + async fn send(self) -> ApiResult<(Ctx, Res)> { + let Self { req, auth } = self; + match auth { + CyclosAuth::None => req, + CyclosAuth::Basic { username, password } => req.basic_auth(username, password), } - .build_split(); - Ok(client.execute(req?).await?) + .send() + .await + .map_err(|(ctx, e)| ctx.wrap(e.into())) } - async fn error_handling(res: Response) -> Result<Response, ErrKind> { + async fn error_handling(res: Res) -> Result<Res, CyclosErr> { match res.status() { StatusCode::OK | StatusCode::CREATED => Ok(res), - StatusCode::UNAUTHORIZED => Err(ErrKind::Unauthorized(json_body(res).await?)), - StatusCode::FORBIDDEN => Err(ErrKind::Forbidden(json_body(res).await?)), - StatusCode::NOT_FOUND => Err(ErrKind::Unknown(json_body(res).await?)), - StatusCode::UNPROCESSABLE_ENTITY => Err(ErrKind::Input(json_body(res).await?)), - StatusCode::INTERNAL_SERVER_ERROR => Err(ErrKind::Forbidden(json_body(res).await?)), - unexpected => Err(ErrKind::UnexpectedStatus(unexpected)), + StatusCode::UNAUTHORIZED => Err(CyclosErr::Unauthorized(res.json().await?)), + StatusCode::FORBIDDEN => Err(CyclosErr::Forbidden(res.json().await?)), + StatusCode::NOT_FOUND => Err(CyclosErr::Unknown(res.json().await?)), + StatusCode::UNPROCESSABLE_ENTITY => Err(CyclosErr::Input(res.json().await?)), + StatusCode::INTERNAL_SERVER_ERROR => Err(CyclosErr::Forbidden(res.json().await?)), + unexpected => Err(CyclosErr::UnexpectedStatus(unexpected)), } } - pub async fn into_sse(mut self, sse_client: &mut SseClient) -> ApiResult<()> { - self.builder = self - .builder - .header( - header::ACCEPT, - HeaderValue::from_static("text/event-stream"), - ) - .header(header::CACHE_CONTROL, HeaderValue::from_static("no-cache")); - if let Some(id) = &sse_client.last_event_id { - self.builder = self - .builder - .header(HeaderName::from_static("Last-Event-ID"), id.as_str()); - } - let Self { - path, - builder, - method, - auth, - } = self; - match Self::send(builder, auth).await { - Ok(res) => sse_client.connect(res.bytes_stream()), - Err(kind) => { - return Err(ApiErr { - path, - method, - kind, - status: None, - }); - } - } - - Ok(()) + pub async fn into_sse(mut self, client: &mut SseClient) -> ApiResult<()> { + self.req = self.req.req_sse(client); + let (ctx, res) = self.send().await?; + res.sse(client).map_err(|e| ctx.wrap(e.into())) } pub async fn parse_json<T: DeserializeOwned>(self) -> ApiResult<T> { - let Self { - path, - builder, - method, - auth, - } = self; - let res = match Self::send(builder, auth).await { - Ok(res) => res, - Err(kind) => { - return Err(ApiErr { - path, - method, - kind, - status: None, - }); - } - }; - let status = res.status(); - match async { + let (ctx, res) = self.send().await?; + async { let res = Self::error_handling(res).await?; - json_body(res).await + let json = res.json().await?; + Ok(json) } .await - { - Ok(res) => Ok(res), - Err(kind) => Err(ApiErr { - path, - method, - kind, - status: Some(status), - }), - } + .map_err(|e| ctx.wrap(e)) } pub async fn parse_pagination<T: DeserializeOwned>(self) -> ApiResult<Pagination<T>> { - let Self { - path, - builder, - method, - auth, - } = self; - let res = match Self::send(builder, auth).await { - Ok(res) => res, - Err(kind) => { - return Err(ApiErr { - path, - method, - kind, - status: None, - }); - } - }; - let status = res.status(); - match async { + let (ctx, res) = self.send().await?; + async { let res = Self::error_handling(res).await?; - let headers = res.headers(); - let current_page = headers.int_header("x-current-page")?; - let has_next_page = headers.bool_header("x-has-next-page")?; + let current_page = res.int_header("x-current-page")?; + let has_next_page = res.bool_header("x-has-next-page")?; Ok(Pagination { - page: json_body(res).await?, + page: res.json().await?, current_page, has_next_page, }) } .await - { - Ok(res) => Ok(res), - Err(kind) => Err(ApiErr { - path, - method, - kind, - status: Some(status), - }), - } - } -} - -#[derive(Debug, Default)] -pub struct SseMessage { - pub event: CompactString, - pub data: String, -} - -type SseStream = dyn Stream<Item = std::result::Result<String, LinesCodecError>> + Send; - -pub struct SseClient { - last_event_id: Option<CompactString>, - reconnection_time: Option<u64>, - stream: Pin<Box<SseStream>>, -} - -impl SseClient { - pub fn new() -> Self { - Self { - last_event_id: None, - reconnection_time: None, - stream: Box::pin(stream::empty()), - } - } - - pub fn connect<E: std::error::Error + Send + Sync + 'static>( - &mut self, - stream: impl Stream<Item = Result<Bytes, E>> + 'static + Send, - ) { - let stream = stream.map(|it| it.map_err(std::io::Error::other)); - let lines = FramedRead::new(StreamReader::new(stream), LinesCodec::new()); - self.stream = Box::pin(lines); - } - - pub async fn next(&mut self) -> Option<Result<SseMessage, LinesCodecError>> { - // TODO add tests - let mut event = CompactString::new("message"); - let mut data = String::new(); - while let Some(res) = self.stream.next().await { - let line = match res { - Ok(line) => line, - Err(e) => return Some(Err(e)), - }; - // Parse line - let (field, value): (&str, &str) = if line.is_empty() { - if data.ends_with('\n') { - data.pop(); - } - return Some(Ok(SseMessage { event, data })); - } else if let Some(comment) = line.strip_prefix(':') { - trace!(target: "sse", "{comment}"); - continue; - } else if let Some((k, v)) = line.split_once(':') { - (k, v.trim_start_matches(' ')) - } else { - (&line, "") - }; - - // Process field - match field { - "event" => event = CompactString::new(value), - "data" => { - data.push_str(value); - data.push('\n'); - } - "id" => { - if value.contains('\0') { - self.last_event_id = Some(CompactString::new(value)) - } - } - "retry" => { - if value.as_bytes().iter().all(|c| c.is_ascii_digit()) - && let Ok(int) = value.parse::<u64>() - { - self.reconnection_time = Some(int) - } - } - _ => continue, - } - } - None - } -} - -impl Default for SseClient { - fn default() -> Self { - Self::new() - } -} - -#[derive(Debug, thiserror::Error)] -pub enum HeaderError { - #[error("Missing header {0}")] - Missing(&'static str), - #[error("Malformed header {0} expected string got binary")] - NotStr(&'static str), - #[error("Malformed header {0} expected {1} got '{2}'")] - Malformed(&'static str, &'static str, Box<str>), -} - -trait HeaderParser { - fn parse<T>( - &self, - name: &'static str, - kind: &'static str, - transform: impl FnOnce(&str) -> Result<T, ()>, - ) -> Result<T, HeaderError>; - fn int_header(&self, name: &'static str) -> Result<u64, HeaderError> { - self.parse(name, "integer", |s| s.parse().map_err(|_| ())) - } - fn bool_header(&self, name: &'static str) -> Result<bool, HeaderError> { - self.parse(name, "boolean", |s| s.parse().map_err(|_| ())) - } -} - -impl HeaderParser for HeaderMap { - fn parse<T>( - &self, - name: &'static str, - kind: &'static str, - transform: impl FnOnce(&str) -> Result<T, ()>, - ) -> Result<T, HeaderError> { - let Some(value) = self.get(HeaderName::from_static(name)) else { - return Err(HeaderError::Missing(name)); - }; - let Ok(str) = value.to_str() else { - return Err(HeaderError::NotStr(name)); - }; - transform(str).map_err(|_| HeaderError::Malformed(name, kind, str.into())) + .map_err(|e| ctx.wrap(e)) } } diff --git a/taler-cyclos/src/cyclos_api/client.rs b/taler-cyclos/src/cyclos_api/client.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2025 Taler Systems SA + Copyright (C) 2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -16,20 +16,22 @@ use std::borrow::Cow; -use reqwest::Method; +use http_client::sse::SseClient; +use hyper::Method; use serde_json::json; use taler_common::types::amount::Decimal; +use url::Url; use crate::cyclos_api::{ - api::{ApiResult, CyclosAuth, CyclosRequest, SseClient}, + api::{ApiResult, CyclosAuth, CyclosRequest}, types::{ Account, DataForTransaction, HistoryItem, OrderBy, Pagination, Transaction, Transfer, User, }, }; pub struct Client<'a> { - pub client: &'a reqwest::Client, - pub api_url: &'a reqwest::Url, + pub client: &'a http_client::Client, + pub api_url: &'a Url, pub auth: &'a CyclosAuth, } diff --git a/taler-cyclos/src/db.rs b/taler-cyclos/src/db.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2025 Taler Systems SA + Copyright (C) 2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -18,7 +18,6 @@ use std::fmt::Display; use compact_str::CompactString; use jiff::Timestamp; -use reqwest::Url; use serde::{Serialize, de::DeserializeOwned}; use sqlx::{PgConnection, PgExecutor, PgPool, QueryBuilder, Row, postgres::PgRow}; use taler_api::{ @@ -40,6 +39,7 @@ use taler_common::{ }, }; use tokio::sync::watch::{Receiver, Sender}; +use url::Url; use crate::{CyclosAccount, CyclosId, FullCyclosPayto, config::parse_db_cfg}; diff --git a/taler-cyclos/src/dev.rs b/taler-cyclos/src/dev.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2025 Taler Systems SA + Copyright (C) 2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -43,7 +43,7 @@ pub enum DevCmd { } pub async fn dev(cfg: &Config, cmd: DevCmd) -> anyhow::Result<()> { - let client = reqwest::Client::new(); + let client = http_client::client()?; let cfg = HarnessCfg::parse(cfg)?; let wire = Client { client: &client, diff --git a/taler-cyclos/src/main.rs b/taler-cyclos/src/main.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2025 Taler Systems SA + Copyright (C) 2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -89,7 +89,7 @@ async fn run(cmd: Command, cfg: &Config) -> anyhow::Result<()> { dbinit(cfg, reset).await?; } Command::Setup { reset } => { - let client = reqwest::Client::new(); + let client = http_client::client()?; setup::setup(cfg, reset, &client).await? } Command::Serve { check } => { @@ -105,7 +105,7 @@ async fn run(cmd: Command, cfg: &Config) -> anyhow::Result<()> { } Command::Worker { transient } => { let pool = pool(cfg).await?; - let client = reqwest::Client::new(); + let client = http_client::client()?; run_worker(cfg, &pool, &client, transient).await?; } Command::Config(cmd) => cmd.run(cfg)?, diff --git a/taler-cyclos/src/notification.rs b/taler-cyclos/src/notification.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2025 Taler Systems SA + Copyright (C) 2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -14,13 +14,13 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ +use http_client::sse::SseClient; use jiff::Timestamp; use taler_common::ExpoBackoffDecorr; use tokio::sync::Notify; use tracing::{debug, error, trace}; use crate::cyclos_api::{ - api::SseClient, client::Client, types::{NotificationEntityType, NotificationStatus}, }; @@ -50,11 +50,9 @@ pub async fn watch_notification(client: &Client<'_>, notify: &Notify) -> ! { { notify.notify_waiters(); } - // Find a way to buffer all transactions } } - tokio::time::sleep( jitter.backoff()).await; } } diff --git a/taler-cyclos/src/setup.rs b/taler-cyclos/src/setup.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2025 Taler Systems SA + Copyright (C) 2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -24,7 +24,7 @@ use crate::{ cyclos_api::{api::CyclosAuth, client::Client}, }; -pub async fn setup(cfg: &Config, _reset: bool, client: &reqwest::Client) -> anyhow::Result<()> { +pub async fn setup(cfg: &Config, _reset: bool, client: &http_client::Client) -> anyhow::Result<()> { let cfg = SetupCfg::parse(cfg)?; let client = Client { client, diff --git a/taler-cyclos/src/worker.rs b/taler-cyclos/src/worker.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2025 Taler Systems SA + Copyright (C) 2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -17,6 +17,7 @@ use std::time::Duration; use failure_injection::{InjectedErr, fail_point}; +use http_client::ApiErr; use jiff::Timestamp; use sqlx::{Acquire as _, PgConnection, PgPool, postgres::PgListener}; use taler_api::subject::{self, parse_incoming_unstructured}; @@ -31,7 +32,7 @@ use tracing::{debug, error, info, trace, warn}; use crate::{ config::{AccountType, WorkerCfg}, cyclos_api::{ - api::{ApiErr, CyclosAuth, ErrKind}, + api::{CyclosAuth, CyclosErr}, client::Client, types::{AccountKind, HistoryItem, NotFoundError, OrderBy}, }, @@ -46,7 +47,7 @@ pub enum WorkerError { #[error(transparent)] Db(#[from] sqlx::Error), #[error(transparent)] - Api(#[from] ApiErr), + Api(#[from] ApiErr<CyclosErr>), #[error("Another worker is running concurrently")] Concurrency, #[error(transparent)] @@ -58,7 +59,7 @@ pub type WorkerResult = Result<(), WorkerError>; pub async fn run_worker( cfg: &Config, pool: &PgPool, - client: &reqwest::Client, + client: &http_client::Client, transient: bool, ) -> anyhow::Result<()> { let cfg = WorkerCfg::parse(cfg)?; @@ -224,11 +225,11 @@ impl Worker<'_> { trace!(target: "worker", "init tx {}", tx.id); } Err(e) => { - let msg = match e.kind { - ErrKind::Unknown(NotFoundError { entity_type, key }) => { + let msg = match e.err { + CyclosErr::Unknown(NotFoundError { entity_type, key }) => { format!("unknown {entity_type} {key}") } - ErrKind::Forbidden(err) => err.to_string(), + CyclosErr::Forbidden(err) => err.to_string(), _ => return Err(e.into()), }; // TODO is permission should be considered are hard or soft failure ? diff --git a/taler-magnet-bank/Cargo.toml b/taler-magnet-bank/Cargo.toml @@ -15,13 +15,13 @@ p256 = { version = "0.13.2", features = ["alloc", "ecdsa"] } form_urlencoded = "1.2" percent-encoding = "2.3" rpassword = "7.4" -reqwest.workspace = true sqlx.workspace = true serde_json = { workspace = true, features = ["raw_value"] } jiff = { workspace = true, features = ["serde"] } taler-common.workspace = true taler-api.workspace = true taler-build.workspace = true +http-client.workspace = true clap.workspace = true serde.workspace = true serde_with.workspace = true @@ -35,7 +35,8 @@ base64.workspace = true rand_core.workspace = true owo-colors.workspace = true failure-injection.workspace = true -compact_str.workspace = true +hyper.workspace = true +url.workspace = true [dev-dependencies] taler-test-utils.workspace = true diff --git a/taler-magnet-bank/src/bin/magnet-bank-harness.rs b/taler-magnet-bank/src/bin/magnet-bank-harness.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2025 Taler Systems SA + Copyright (C) 2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -86,7 +86,7 @@ struct Harness<'a> { impl<'a> Harness<'a> { async fn new( cfg: &'a HarnessCfg, - client: &'a reqwest::Client, + client: &'a http_client::Client, pool: &'a PgPool, keys: &'a Keys, ) -> Self { @@ -332,7 +332,7 @@ async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> { let cfg = HarnessCfg::parse(cfg)?; let keys = setup::load(&cfg.worker)?; - let client = reqwest::Client::new(); + let client = http_client::client()?; let harness = Harness::new(&cfg, &client, &pool, &keys).await; @@ -519,7 +519,7 @@ async fn online_harness(config: &Config, reset: bool) -> anyhow::Result<()> { let cfg = HarnessCfg::parse(config)?; let keys = setup::load(&cfg.worker)?; - let client = reqwest::Client::new(); + let client = http_client::client()?; let harness = Harness::new(&cfg, &client, &pool, &keys).await; diff --git a/taler-magnet-bank/src/config.rs b/taler-magnet-bank/src/config.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2025 Taler Systems SA + Copyright (C) 2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -17,7 +17,6 @@ use std::time::Duration; use jiff::civil::Date; -use reqwest::Url; use taler_api::{ Serve, config::{ApiCfg, DbCfg}, @@ -27,6 +26,7 @@ use taler_common::{ map_config, types::payto::PaytoURI, }; +use url::Url; use crate::{FullHuPayto, HuIban, magnet_api::oauth::Token}; diff --git a/taler-magnet-bank/src/db.rs b/taler-magnet-bank/src/db.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2025 Taler Systems SA + Copyright (C) 2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -17,7 +17,6 @@ use std::fmt::Display; use jiff::{Timestamp, civil::Date, tz::TimeZone}; -use reqwest::Url; use serde::{Serialize, de::DeserializeOwned}; use sqlx::{PgConnection, PgExecutor, PgPool, QueryBuilder, Row, postgres::PgRow}; use taler_api::{ @@ -39,6 +38,7 @@ use taler_common::{ }, }; use tokio::sync::watch::{Receiver, Sender}; +use url::Url; use crate::{FullHuPayto, config::parse_db_cfg, constants::CURRENCY, magnet_api::types::TxStatus}; diff --git a/taler-magnet-bank/src/dev.rs b/taler-magnet-bank/src/dev.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2025 Taler Systems SA + Copyright (C) 2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -67,7 +67,7 @@ pub enum DevCmd { pub async fn dev(cfg: &Config, cmd: DevCmd) -> anyhow::Result<()> { let cfg = WorkerCfg::parse(cfg)?; let keys = setup::load(&cfg)?; - let client = reqwest::Client::new(); + let client = http_client::client()?; let client = AuthClient::new(&client, &cfg.api_url, &cfg.consumer).upgrade(&keys.access_token); match cmd { DevCmd::Accounts => { diff --git a/taler-magnet-bank/src/magnet_api/api.rs b/taler-magnet-bank/src/magnet_api/api.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2025 Taler Systems SA + Copyright (C) 2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -16,11 +16,15 @@ use std::borrow::Cow; -use reqwest::{Client, Method, RequestBuilder, Response, StatusCode, Url, header}; +use http_client::{ + ApiErr, Client, ClientErr, Ctx, + builder::{Req, Res}, +}; +use hyper::{Method, StatusCode, header}; use serde::{Deserialize, Serialize, de::DeserializeOwned}; -use taler_common::error::FmtSource; use thiserror::Error; use tracing::Level; +use url::Url; use crate::magnet_api::oauth::{Token, oauth}; @@ -45,92 +49,47 @@ pub struct MagnetError { } #[derive(Error, Debug)] -#[error("{method} {path} {kind}")] -pub struct ApiErr { - pub path: Cow<'static, str>, - pub method: Method, - pub kind: ErrKind, -} - -#[derive(Error, Debug)] -pub enum ErrKind { - #[error("transport: {0}")] - Transport(FmtSource<reqwest::Error>), +pub enum MagnetErr { #[error("magnet {0}")] Magnet(#[from] MagnetError), - #[error("JSON body: {0}")] - Json(#[from] serde_path_to_error::Error<serde_json::Error>), - #[error("form body: {0}")] - Form(#[from] serde_urlencoded::de::Error), #[error("status {0}")] Status(StatusCode), #[error("status {0} '{1}'")] StatusCause(StatusCode, String), + #[error(transparent)] + Client(#[from] ClientErr), } - -impl From<reqwest::Error> for ErrKind { - fn from(value: reqwest::Error) -> Self { - Self::Transport(value.into()) - } -} - -pub type ApiResult<R> = std::result::Result<R, ApiErr>; +pub type ApiResult<R> = std::result::Result<R, ApiErr<MagnetErr>>; /** Handle error from magnet API calls */ -async fn error_handling(res: reqwest::Result<Response>) -> Result<String, ErrKind> { - let res = res?; +async fn error_handling(res: Res) -> Result<Res, MagnetErr> { let status = res.status(); match status { - StatusCode::OK => Ok(res.text().await?), - StatusCode::BAD_REQUEST => Err(ErrKind::Status(status)), + StatusCode::OK => Ok(res), + StatusCode::BAD_REQUEST => Err(MagnetErr::Status(status)), StatusCode::FORBIDDEN => { - let cause = res - .headers() - .get(header::WWW_AUTHENTICATE) - .map(|s| s.to_str().unwrap_or_default()) - .unwrap_or_default(); - Err(ErrKind::StatusCause(status, cause.to_string())) + let cause = res.str_header(header::WWW_AUTHENTICATE.as_str())?; + Err(MagnetErr::StatusCause(status, cause.to_string())) } _ => { if tracing::enabled!(Level::DEBUG) { - tracing::debug!("unexpected error: {:?}", &res); let body = res.text().await; tracing::debug!("unexpected error body: {:?}", body); } - Err(ErrKind::Status(status)) + Err(MagnetErr::Status(status)) } } } /** Parse JSON and track error path */ -fn parse<'de, T: Deserialize<'de>>(str: &'de str) -> Result<T, ErrKind> { +fn parse<'de, T: Deserialize<'de>>(str: &'de str) -> Result<T, MagnetErr> { let deserializer = &mut serde_json::Deserializer::from_str(str); - serde_path_to_error::deserialize(deserializer).map_err(ErrKind::Json) -} - -/** Parse magnet JSON response */ -async fn magnet_json<T: DeserializeOwned>(res: reqwest::Result<Response>) -> Result<T, ErrKind> { - let body = error_handling(res).await?; - let header: Header = parse(&body)?; - if header.error_code.unwrap_or(200) == 200 { - parse(&body) - } else { - Err(ErrKind::Magnet(parse(&body)?)) - } -} - -/** Parse magnet URL encoded response into our own type */ -async fn magnet_url<T: DeserializeOwned>( - response: reqwest::Result<Response>, -) -> Result<T, ErrKind> { - let body = error_handling(response).await?; - serde_urlencoded::from_str(&body).map_err(ErrKind::Form) + serde_path_to_error::deserialize(deserializer) + .map_err(|e| MagnetErr::Client(ClientErr::ResJson(e))) } pub struct MagnetRequest<'a> { - path: Cow<'static, str>, - method: Method, - builder: RequestBuilder, + req: Req, consumer: &'a Token, access: Option<&'a Token>, verifier: Option<&'a str>, @@ -146,67 +105,58 @@ impl<'a> MagnetRequest<'a> { access: Option<&'a Token>, verifier: Option<&'a str>, ) -> Self { - let path = path.into(); - let url = base_url.join(&path).unwrap(); - let builder = client.request(method.clone(), url); Self { - path, - method, - builder, + req: Req::new(client, method, base_url, path), consumer, access, verifier, } } - pub fn query<T: Serialize + ?Sized>(mut self, query: &T) -> Self { - self.builder = self.builder.query(query); + pub fn query<T: Serialize>(mut self, name: &str, value: T) -> Self { + self.req = self.req.query(name, value); self } pub fn json<T: Serialize + ?Sized>(mut self, json: &T) -> Self { - self.builder = self.builder.json(json); + self.req = self.req.json(json); self } - pub async fn parse_url<T: DeserializeOwned>(self) -> ApiResult<T> { + async fn send(self) -> ApiResult<(Ctx, Res)> { let Self { - path, - builder, - method, + req, consumer, access, verifier, } = self; - let (client, req) = builder.build_split(); - async { - let mut req = req?; - oauth(&mut req, consumer, access, verifier); - let res = client.execute(req).await; - magnet_url(res).await - } - .await - .map_err(|kind| ApiErr { path, method, kind }) + oauth(req, consumer, access, verifier) + .send() + .await + .map_err(|(ctx, e)| ctx.wrap(e.into())) + } + + pub async fn parse_url<T: DeserializeOwned>(self) -> ApiResult<T> { + let (ctx, res) = self.send().await?; + async { Ok(error_handling(res).await?.urlencoded().await?) } + .await + .map_err(|e| ctx.wrap(e)) } pub async fn parse_json<T: DeserializeOwned>(self) -> ApiResult<T> { - let Self { - path, - builder, - method, - consumer, - access, - verifier, - } = self; - let (client, req) = builder.build_split(); + let (ctx, res) = self.send().await?; async { - let mut req = req?; - oauth(&mut req, consumer, access, verifier); - let res = client.execute(req).await; - magnet_json(res).await + let res = error_handling(res).await?; + let raw: Box<serde_json::value::RawValue> = res.json().await?; + let header: Header = parse(raw.get())?; + if header.error_code.unwrap_or(200) == 200 { + Ok(parse(raw.get())?) + } else { + Err(MagnetErr::Magnet(parse(raw.get())?)) + } } .await - .map_err(|kind| ApiErr { path, method, kind }) + .map_err(|e| ctx.wrap(e)) } pub async fn parse_empty(self) -> ApiResult<()> { diff --git a/taler-magnet-bank/src/magnet_api/client.rs b/taler-magnet-bank/src/magnet_api/client.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2025 Taler Systems SA + Copyright (C) 2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -17,12 +17,12 @@ use std::borrow::Cow; use base64::{Engine as _, prelude::BASE64_STANDARD}; +use hyper::Method; use p256::{ PublicKey, ecdsa::{DerSignature, SigningKey, signature::Signer as _}, pkcs8::EncodePublicKey, }; -use reqwest::Method; use serde::{Deserialize, Serialize}; use serde_json::{Value, json}; @@ -48,15 +48,15 @@ pub struct AccountWrapper { } pub struct AuthClient<'a> { - client: &'a reqwest::Client, - pub api_url: &'a reqwest::Url, + client: &'a http_client::Client, + pub api_url: &'a url::Url, consumer: &'a Token, } impl<'a> AuthClient<'a> { pub fn new( - client: &'a reqwest::Client, - api_url: &'a reqwest::Url, + client: &'a http_client::Client, + api_url: &'a url::Url, consumer: &'a Token, ) -> Self { Self { @@ -86,7 +86,7 @@ impl<'a> AuthClient<'a> { pub async fn token_request(&self) -> ApiResult<Token> { self.request(Method::GET, "/NetBankOAuth/token/request", None, None) - .query(&[("oauth_callback", "oob")]) + .query("oauth_callback", "oob") .parse_url() .await } @@ -117,8 +117,8 @@ impl<'a> AuthClient<'a> { } pub struct ApiClient<'a> { - pub client: &'a reqwest::Client, - api_url: &'a reqwest::Url, + pub client: &'a http_client::Client, + api_url: &'a url::Url, consumer: &'a Token, access: &'a Token, } @@ -220,14 +220,12 @@ impl ApiClient<'_> { ); if let Some(next) = next { req = req - .query(&[("nextId", next.next_id)]) - .query(&[("nextTipus", &next.next_type)]); + .query("nextId", next.next_id) + .query("nextTipus", &next.next_type); } - req.query(&[("terheles", direction)]) - .query(&[ - ("tranzakciofrissites", sync), - ("ascending", order == Order::Ascending), - ]) + req.query("terheles", direction) + .query("tranzakciofrissites", sync) + .query("ascending", order == Order::Ascending) .parse_json() .await } diff --git a/taler-magnet-bank/src/magnet_api/oauth.rs b/taler-magnet-bank/src/magnet_api/oauth.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2025 Taler Systems SA + Copyright (C) 2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -18,9 +18,10 @@ use std::{borrow::Cow, time::SystemTime}; use base64::{Engine as _, prelude::BASE64_STANDARD}; use hmac::{Hmac, Mac}; +use http_client::builder::Req; +use hyper::header; use percent_encoding::NON_ALPHANUMERIC; use rand_core::RngCore; -use reqwest::{Request, header::HeaderValue}; use serde::{Deserialize, Serialize}; use sha1::Sha1; @@ -59,8 +60,8 @@ fn oauth_timestamp() -> u64 { /** Generate a valid OAuth Authorization header */ fn oauth_header( - method: &reqwest::Method, - url: &reqwest::Url, + method: &hyper::Method, + url: &url::Url, consumer: &Token, access: Option<&Token>, verifier: Option<&str>, @@ -152,8 +153,7 @@ fn oauth_header( } /** Perform OAuth on an HTTP request */ -pub fn oauth(req: &mut Request, consumer: &Token, access: Option<&Token>, verifier: Option<&str>) { +pub fn oauth(req: Req, consumer: &Token, access: Option<&Token>, verifier: Option<&str>) -> Req { let header = oauth_header(req.method(), req.url(), consumer, access, verifier); - req.headers_mut() - .append("Authorization", HeaderValue::from_str(&header).unwrap()); + req.sensitive_header(header::AUTHORIZATION, header) } diff --git a/taler-magnet-bank/src/main.rs b/taler-magnet-bank/src/main.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2025 Taler Systems SA + Copyright (C) 2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -105,7 +105,7 @@ async fn run(cmd: Command, cfg: &Config) -> anyhow::Result<()> { } Command::Worker { transient } => { let pool = pool(cfg).await?; - let client = reqwest::Client::new(); + let client = http_client::client()?; run_worker(cfg, &pool, &client, transient).await?; } Command::Config(cmd) => cmd.run(cfg)?, diff --git a/taler-magnet-bank/src/setup.rs b/taler-magnet-bank/src/setup.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2025 Taler Systems SA + Copyright (C) 2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -20,7 +20,7 @@ use p256::ecdsa::SigningKey; use taler_common::{json_file, types::base32::Base32}; use tracing::{info, warn}; -use crate::magnet_api::{api::ErrKind, client::AuthClient, oauth::Token}; +use crate::magnet_api::{api::MagnetErr, client::AuthClient, oauth::Token}; use crate::{ config::WorkerCfg, magnet_api::{api::MagnetError, oauth::TokenAuth}, @@ -81,7 +81,7 @@ pub async fn setup(cfg: WorkerCfg, reset: bool) -> anyhow::Result<()> { Err(e) if e.kind() == ErrorKind::NotFound => KeysFile::default(), Err(e) => Err(e)?, }; - let client = reqwest::Client::new(); + let client = http_client::client()?; let client = AuthClient::new(&client, &cfg.api_url, &cfg.consumer); info!("Setup OAuth access token"); @@ -99,7 +99,7 @@ pub async fn setup(cfg: WorkerCfg, reset: bool) -> anyhow::Result<()> { token_request.key ); let auth_url = rpassword::prompt_password("Enter the result URL>")?; - let auth_url = reqwest::Url::parse(&auth_url)?; + let auth_url = url::Url::parse(&auth_url)?; let token_auth: TokenAuth = serde_urlencoded::from_str(auth_url.query().unwrap_or_default())?; assert_eq!(token_request.key, token_auth.oauth_token); @@ -122,7 +122,7 @@ pub async fn setup(cfg: WorkerCfg, reset: bool) -> anyhow::Result<()> { let sca_code = rpassword::prompt_password("Enter the code>")?; if let Err(e) = client.perform_sca(&sca_code).await { // Ignore error if SCA already performed - if !matches!(e.kind, ErrKind::Magnet(MagnetError { ref short_message, .. }) if short_message == "TOKEN_SCA_HITELESITETT") + if !matches!(e.err, MagnetErr::Magnet(MagnetError { ref short_message, .. }) if short_message == "TOKEN_SCA_HITELESITETT") { return Err(e.into()); } @@ -143,7 +143,7 @@ pub async fn setup(cfg: WorkerCfg, reset: bool) -> anyhow::Result<()> { }; if let Err(e) = client.upload_public_key(&signing_key).await { // Ignore error if public key already uploaded - if !matches!(e.kind, ErrKind::Magnet(MagnetError { ref short_message, .. }) if short_message== "KULCS_MAR_HASZNALATBAN") + if !matches!(e.err, MagnetErr::Magnet(MagnetError { ref short_message, .. }) if short_message== "KULCS_MAR_HASZNALATBAN") { return Err(e.into()); } diff --git a/taler-magnet-bank/src/worker.rs b/taler-magnet-bank/src/worker.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2025 Taler Systems SA + Copyright (C) 2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -17,6 +17,7 @@ use std::{num::ParseIntError, time::Duration}; use failure_injection::{InjectedErr, fail_point}; +use http_client::ApiErr; use jiff::{Timestamp, Zoned, civil::Date}; use p256::ecdsa::SigningKey; use sqlx::{Acquire as _, PgConnection, PgPool, postgres::PgListener}; @@ -36,7 +37,7 @@ use crate::{ config::{AccountType, WorkerCfg}, db::{self, AddIncomingResult, Initiated, RegisterResult, TxIn, TxOut, TxOutKind}, magnet_api::{ - api::{ApiErr, ErrKind}, + api::MagnetErr, client::{ApiClient, AuthClient}, types::{Direction, Next, Order, TxDto, TxStatus}, }, @@ -50,7 +51,7 @@ pub enum WorkerError { #[error(transparent)] Db(#[from] sqlx::Error), #[error(transparent)] - Api(#[from] ApiErr), + Api(#[from] ApiErr<MagnetErr>), #[error("Another worker is running concurrently")] Concurrency, #[error(transparent)] @@ -62,7 +63,7 @@ pub type WorkerResult = Result<(), WorkerError>; pub async fn run_worker( cfg: &Config, pool: &PgPool, - client: &reqwest::Client, + client: &http_client::Client, transient: bool, ) -> anyhow::Result<()> { let cfg = WorkerCfg::parse(cfg)?; @@ -499,11 +500,7 @@ impl Worker<'_> { info } Err(e) => { - if let ApiErr { - kind: ErrKind::Magnet(e), - .. - } = &e - { + if let MagnetErr::Magnet(e) = &e.err { // Check if error is permanent if matches!( (e.error_code, e.short_message.as_str()), @@ -557,11 +554,7 @@ impl Worker<'_> { { Ok(_) => Ok(()), Err(e) => { - if let ApiErr { - kind: ErrKind::Magnet(e), - .. - } = &e - { + if let MagnetErr::Magnet(e) = &e.err { // Check if soft failure if matches!( (e.error_code, e.short_message.as_str()),