commit 3cbe83571b65c0a4f1ff69f83b68542e2b076c03
parent 431cef0b0ccdb5a397152b186043fa9a2f606f43
Author: Antoine A <>
Date: Tue, 30 Dec 2025 12:40:31 +0100
cyclos: handle pagination
Diffstat:
6 files changed, 164 insertions(+), 49 deletions(-)
diff --git a/taler-cyclos/src/bin/cyclos-harness.rs b/taler-cyclos/src/bin/cyclos-harness.rs
@@ -39,7 +39,11 @@ use taler_cyclos::{
CyclosId, FullCyclosPayto,
config::{AccountType, HarnessCfg},
constants::CONFIG_SOURCE,
- cyclos_api::{api::CyclosAuth, client::Client, types::HistoryItem},
+ cyclos_api::{
+ api::CyclosAuth,
+ client::Client,
+ types::{HistoryItem, OrderBy},
+ },
db::{self, TransferResult, dbinit},
setup,
worker::{Worker, WorkerError, WorkerResult, run_worker},
@@ -129,9 +133,10 @@ impl<'a> Harness<'a> {
/// Fetch last transfer related to client
async fn client_last_transfer(&self) -> HistoryItem {
self.client
- .history(**self.client_payto)
+ .history(**self.client_payto, OrderBy::DateDesc, 0)
.await
.unwrap()
+ .page
.remove(0)
}
diff --git a/taler-cyclos/src/cyclos_api/api.rs b/taler-cyclos/src/cyclos_api/api.rs
@@ -19,8 +19,8 @@ use std::{borrow::Cow, fmt::Display, pin::Pin};
use compact_str::CompactString;
use futures_util::{Stream, StreamExt, stream};
use reqwest::{
- Client, Method, RequestBuilder, StatusCode, Url,
- header::{self, HeaderName, HeaderValue},
+ Client, Method, RequestBuilder, Response, StatusCode, Url,
+ header::{self, HeaderMap, HeaderName, HeaderValue},
};
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use taler_common::error::FmtSource;
@@ -33,7 +33,7 @@ use tokio_util::{
use tracing::trace;
use crate::cyclos_api::types::{
- ForbiddenError, InputError, NotFoundError, UnauthorizedError, UnexpectedError,
+ ForbiddenError, InputError, NotFoundError, Pagination, UnauthorizedError, UnexpectedError,
};
#[derive(Debug)]
@@ -72,6 +72,8 @@ pub enum ErrKind {
Transport(FmtSource<reqwest::Error>),
#[error("JSON body: {0}")]
Json(#[from] serde_path_to_error::Error<serde_json::Error>),
+ #[error("headers: {0}")]
+ Headers(#[from] HeaderError),
#[error("unauthorized: {0}")]
Unauthorized(#[from] UnauthorizedError),
#[error("forbidden: {0}")]
@@ -134,7 +136,7 @@ impl<'a> CyclosRequest<'a> {
}
}
- pub fn query<T: Serialize + ?Sized>(mut self, name: &str, value: &T) -> Self {
+ pub fn query<T: Serialize>(mut self, name: &str, value: T) -> Self {
self.builder = self.builder.query(&[(name, value)]);
self
}
@@ -149,6 +151,29 @@ impl<'a> CyclosRequest<'a> {
self
}
+ async fn send(builder: RequestBuilder, auth: &CyclosAuth) -> Result<Response, ErrKind> {
+ let (client, req) = match auth {
+ CyclosAuth::None => builder,
+ CyclosAuth::Basic { username, password } => {
+ builder.basic_auth(username, Some(password))
+ }
+ }
+ .build_split();
+ Ok(client.execute(req?).await?)
+ }
+
+ async fn error_handling(res: Response) -> Result<Response, ErrKind> {
+ match res.status() {
+ StatusCode::OK | StatusCode::CREATED => Ok(res),
+ StatusCode::UNAUTHORIZED => Err(ErrKind::Unauthorized(json_body(res).await?)),
+ StatusCode::FORBIDDEN => Err(ErrKind::Forbidden(json_body(res).await?)),
+ StatusCode::NOT_FOUND => Err(ErrKind::Unknown(json_body(res).await?)),
+ StatusCode::UNPROCESSABLE_ENTITY => Err(ErrKind::Input(json_body(res).await?)),
+ StatusCode::INTERNAL_SERVER_ERROR => Err(ErrKind::Forbidden(json_body(res).await?)),
+ unexpected => Err(ErrKind::UnexpectedStatus(unexpected)),
+ }
+ }
+
pub async fn into_sse(mut self, sse_client: &mut SseClient) -> ApiResult<()> {
self.builder = self
.builder
@@ -168,20 +193,7 @@ impl<'a> CyclosRequest<'a> {
method,
auth,
} = self;
- let (client, req) = match auth {
- CyclosAuth::None => builder,
- CyclosAuth::Basic { username, password } => {
- builder.basic_auth(username, Some(password))
- }
- }
- .build_split();
- match async {
- let req = req?;
- let res = client.execute(req).await?;
- Ok(res)
- }
- .await
- {
+ match Self::send(builder, auth).await {
Ok(res) => sse_client.connect(res.bytes_stream()),
Err(kind) => {
return Err(ApiErr {
@@ -203,20 +215,42 @@ impl<'a> CyclosRequest<'a> {
method,
auth,
} = self;
- let (client, req) = match auth {
- CyclosAuth::None => builder,
- CyclosAuth::Basic { username, password } => {
- builder.basic_auth(username, Some(password))
+ let res = match Self::send(builder, auth).await {
+ Ok(res) => res,
+ Err(kind) => {
+ return Err(ApiErr {
+ path,
+ method,
+ kind,
+ status: None,
+ });
}
- }
- .build_split();
- let res = match async {
- let req = req?;
- let res = client.execute(req).await?;
- Ok(res)
+ };
+ let status = res.status();
+ match async {
+ let res = Self::error_handling(res).await?;
+ json_body(res).await
}
.await
{
+ Ok(res) => Ok(res),
+ Err(kind) => Err(ApiErr {
+ path,
+ method,
+ kind,
+ status: Some(status),
+ }),
+ }
+ }
+
+ pub async fn parse_pagination<T: DeserializeOwned>(self) -> ApiResult<Pagination<T>> {
+ let Self {
+ path,
+ builder,
+ method,
+ auth,
+ } = self;
+ let res = match Self::send(builder, auth).await {
Ok(res) => res,
Err(kind) => {
return Err(ApiErr {
@@ -229,15 +263,15 @@ impl<'a> CyclosRequest<'a> {
};
let status = res.status();
match async {
- match status {
- StatusCode::OK | StatusCode::CREATED => json_body(res).await,
- StatusCode::UNAUTHORIZED => Err(ErrKind::Unauthorized(json_body(res).await?)),
- StatusCode::FORBIDDEN => Err(ErrKind::Forbidden(json_body(res).await?)),
- StatusCode::NOT_FOUND => Err(ErrKind::Unknown(json_body(res).await?)),
- StatusCode::UNPROCESSABLE_ENTITY => Err(ErrKind::Input(json_body(res).await?)),
- StatusCode::INTERNAL_SERVER_ERROR => Err(ErrKind::Forbidden(json_body(res).await?)),
- _ => Err(ErrKind::UnexpectedStatus(status)),
- }
+ let res = Self::error_handling(res).await?;
+ let headers = res.headers();
+ let current_page = headers.int_header("x-current-page")?;
+ let has_next_page = headers.bool_header("x-has-next-page")?;
+ Ok(Pagination {
+ page: json_body(res).await?,
+ current_page,
+ has_next_page,
+ })
}
.await
{
@@ -338,3 +372,45 @@ impl Default for SseClient {
Self::new()
}
}
+
+#[derive(Debug, thiserror::Error)]
+pub enum HeaderError {
+ #[error("Missing header {0}")]
+ Missing(&'static str),
+ #[error("Malformed header {0} expected string got binary")]
+ NotStr(&'static str),
+ #[error("Malformed header {0} expected {1} got '{2}'")]
+ Malformed(&'static str, &'static str, Box<str>),
+}
+
+trait HeaderParser {
+ fn parse<T>(
+ &self,
+ name: &'static str,
+ kind: &'static str,
+ transform: impl FnOnce(&str) -> Result<T, ()>,
+ ) -> Result<T, HeaderError>;
+ fn int_header(&self, name: &'static str) -> Result<u64, HeaderError> {
+ self.parse(name, "integer", |s| s.parse().map_err(|_| ()))
+ }
+ fn bool_header(&self, name: &'static str) -> Result<bool, HeaderError> {
+ self.parse(name, "boolean", |s| s.parse().map_err(|_| ()))
+ }
+}
+
+impl HeaderParser for HeaderMap {
+ fn parse<T>(
+ &self,
+ name: &'static str,
+ kind: &'static str,
+ transform: impl FnOnce(&str) -> Result<T, ()>,
+ ) -> Result<T, HeaderError> {
+ let Some(value) = self.get(HeaderName::from_static(name)) else {
+ return Err(HeaderError::Missing(name));
+ };
+ let Ok(str) = value.to_str() else {
+ return Err(HeaderError::NotStr(name));
+ };
+ transform(str).map_err(|_| HeaderError::Malformed(name, kind, str.into()))
+ }
+}
diff --git a/taler-cyclos/src/cyclos_api/client.rs b/taler-cyclos/src/cyclos_api/client.rs
@@ -22,7 +22,9 @@ use taler_common::types::amount::Decimal;
use crate::cyclos_api::{
api::{ApiResult, CyclosAuth, CyclosRequest, SseClient},
- types::{Account, DataForTransaction, HistoryItem, Transaction, Transfer, User},
+ types::{
+ Account, DataForTransaction, HistoryItem, OrderBy, Pagination, Transaction, Transfer, User,
+ },
};
pub struct Client<'a> {
@@ -83,12 +85,20 @@ impl Client<'_> {
.await
}
- pub async fn history(&self, account_type_id: u64) -> ApiResult<Vec<HistoryItem>> {
+ pub async fn history(
+ &self,
+ account_type_id: u64,
+ order_by: OrderBy,
+ page: u64,
+ ) -> ApiResult<Pagination<Vec<HistoryItem>>> {
self.request(
Method::GET,
format!("self/accounts/{account_type_id}/history"),
)
- .parse_json()
+ .query("orderBy", order_by)
+ .query("skipTotalCount", false)
+ .query("page", page)
+ .parse_pagination()
.await
}
@@ -104,7 +114,7 @@ impl Client<'_> {
sse_client: &mut SseClient,
) -> ApiResult<()> {
self.request(Method::GET, "push/subscribe")
- .query("clientId", &client_id)
+ .query("clientId", client_id)
.query("kinds", "newNotification")
.into_sse(sse_client)
.await
diff --git a/taler-cyclos/src/cyclos_api/types.rs b/taler-cyclos/src/cyclos_api/types.rs
@@ -17,11 +17,20 @@
use std::collections::BTreeMap;
use jiff::Timestamp;
-use serde::Deserialize;
+use serde::{Deserialize, Serialize};
use taler_common::types::{amount::Decimal, payto::FullPayto};
use crate::{CyclosId, FullCyclosPayto};
+#[derive(Debug, Serialize, Clone)]
+#[serde(rename_all = "camelCase")]
+pub enum OrderBy {
+ AmountAsc,
+ AmountDesc,
+ DateAsc,
+ DateDesc,
+}
+
#[derive(Debug, Deserialize, Clone)]
#[serde(rename_all = "camelCase")]
pub struct Type {
@@ -96,6 +105,12 @@ pub struct RelatedAccount {
pub kind: AccountKind,
}
+pub struct Pagination<T> {
+ pub page: T,
+ pub current_page: u64,
+ pub has_next_page: bool,
+}
+
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct HistoryItem {
diff --git a/taler-cyclos/src/notification.rs b/taler-cyclos/src/notification.rs
@@ -48,7 +48,7 @@ pub async fn watch_notification(client: &Client<'_>, notify: &Notify) -> ! {
debug!(target: "notification", "new notification {} {:?} {:?}", status.notification.id, status.notification.ty, status.notification.entity_type);
if status.notification.entity_type == Some(NotificationEntityType::Transfer)
{
- notify.notify_one();
+ notify.notify_waiters();
}
// Find a way to buffer all transactions
diff --git a/taler-cyclos/src/worker.rs b/taler-cyclos/src/worker.rs
@@ -34,7 +34,7 @@ use crate::{
cyclos_api::{
api::{ApiErr, CyclosAuth, ErrKind},
client::Client,
- types::{AccountKind, HistoryItem, NotFoundError},
+ types::{AccountKind, HistoryItem, NotFoundError, OrderBy},
},
db::{
self, AddIncomingResult, ChargebackFailureResult, RegisterResult, TxIn, TxOut, TxOutKind,
@@ -171,9 +171,13 @@ impl Worker<'_> {
// Sync transactions
//let mut next: Option<Next> = None; //kv_get(&mut *self.db, TXS_CURSOR_KEY).await?; TODO cursor logic is broken and cannot be stored & reused
+ let mut page_idx = 0;
loop {
- let transfers = self.client.history(self.account_type_id).await?;
- for transfer in transfers {
+ let page = self
+ .client
+ .history(self.account_type_id, OrderBy::DateAsc, page_idx)
+ .await?;
+ for transfer in page.page {
let tx = extract_tx_info(transfer);
match tx {
Tx::In(tx_in) => self.ingest_in(tx_in).await?,
@@ -181,7 +185,12 @@ impl Worker<'_> {
}
}
- break; // TODO pagination
+ if !page.has_next_page {
+ break;
+ } else {
+ page_idx += 1;
+ }
+ // TODO store cursor
}
// Send transactions