commit cceaddc0c78603f03b68fc717eab496655188d05
parent bf97302f72d421d47874387ad9c5f2fff2cf2270
Author: Antoine A <>
Date: Sat, 14 Feb 2026 21:26:06 +0100
cyclos: in-db timestamp cursor
Diffstat:
11 files changed, 93 insertions(+), 82 deletions(-)
diff --git a/taler-cyclos/README.md b/taler-cyclos/README.md
@@ -40,7 +40,7 @@ PASSWORD = password
#### Run
```sh
-cargo run --bin cyclos-harness -- -c dev.conf logic -L DEBUG
+cargo run --bin cyclos-harness -- -c dev.conf logic -L DEBUG
```
### Online tests
@@ -66,5 +66,5 @@ PASSWORD = password
#### Run
```sh
-cargo run --bin cyclos-harness -- -c dev.conf online -L DEBUG
+cargo run --bin cyclos-harness -- -c dev.conf online -L DEBUG
```
\ No newline at end of file
diff --git a/taler-cyclos/src/bin/cyclos-harness.rs b/taler-cyclos/src/bin/cyclos-harness.rs
@@ -21,8 +21,7 @@ use compact_str::CompactString;
use failure_injection::{InjectedErr, set_failure_scenario};
use jiff::Timestamp;
use owo_colors::OwoColorize as _;
-use sqlx::{PgPool, postgres::PgRow};
-use taler_api::db::TypeHelper as _;
+use sqlx::{PgPool, Row as _, postgres::PgRow};
use taler_build::long_version;
use taler_common::{
CommonArgs,
@@ -85,8 +84,8 @@ struct Harness<'a> {
wire: Client<'a>,
client_payto: FullCyclosPayto,
wire_payto: FullCyclosPayto,
- payment_type_id: u64,
- account_type_id: u64,
+ payment_type_id: i64,
+ account_type_id: i64,
currency: Currency,
root: CompactString,
}
@@ -108,7 +107,7 @@ impl<'a> Harness<'a> {
}
/// Send transaction from client to exchange
- async fn client_send(&self, subject: &str, amount: Decimal) -> u64 {
+ async fn client_send(&self, subject: &str, amount: Decimal) -> i64 {
*self
.client
.direct_payment(*self.wire_payto.id, self.payment_type_id, amount, subject)
@@ -118,7 +117,7 @@ impl<'a> Harness<'a> {
}
/// Send transaction from exchange to client
- async fn exchange_send(&self, subject: &str, amount: Decimal) -> u64 {
+ async fn exchange_send(&self, subject: &str, amount: Decimal) -> i64 {
*self
.wire
.direct_payment(*self.client_payto.id, self.payment_type_id, amount, subject)
@@ -128,14 +127,14 @@ impl<'a> Harness<'a> {
}
/// Chargeback a transfer
- async fn chargeback(&self, id: u64) -> u64 {
+ async fn chargeback(&self, id: i64) -> i64 {
self.client.chargeback(id).await.unwrap()
}
/// Fetch last transfer related to client
async fn client_last_transfer(&self) -> HistoryItem {
self.client
- .history(*self.client_payto.id, OrderBy::DateDesc, 0)
+ .history(*self.client_payto.id, OrderBy::DateDesc, 0, None)
.await
.unwrap()
.page
@@ -179,7 +178,7 @@ impl<'a> Harness<'a> {
));
}
- async fn custom_transfer(&self, amount: Decimal, creditor_id: u64, creditor_name: &str) -> u64 {
+ async fn custom_transfer(&self, amount: Decimal, creditor_id: i64, creditor_name: &str) -> u64 {
let res = db::make_transfer(
self.pool,
&db::Transfer {
@@ -205,7 +204,7 @@ impl<'a> Harness<'a> {
.await
}
- async fn transfer_id(&self, transfer_id: u64) -> u64 {
+ async fn transfer_id(&self, transfer_id: u64) -> i64 {
sqlx::query(
"SELECT transfer_id
FROM transfer
@@ -214,7 +213,7 @@ impl<'a> Harness<'a> {
WHERE initiated_id=$1",
)
.bind(transfer_id as i64)
- .try_map(|r: PgRow| r.try_get_u64(0))
+ .try_map(|r: PgRow| r.try_get(0))
.fetch_one(self.pool)
.await
.unwrap()
diff --git a/taler-cyclos/src/constants.rs b/taler-cyclos/src/constants.rs
@@ -1,6 +1,6 @@
/*
This file is part of TALER
- Copyright (C) 2025 Taler Systems SA
+ Copyright (C) 2025, 2026 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free Software
@@ -17,3 +17,4 @@
use taler_common::config::parser::ConfigSource;
pub const CONFIG_SOURCE: ConfigSource = ConfigSource::new("taler-cyclos", "cyclos", "taler-cyclos");
+pub const SYNC_CURSOR_KEY: &str = "sync_cursor";
diff --git a/taler-cyclos/src/cyclos_api/client.rs b/taler-cyclos/src/cyclos_api/client.rs
@@ -18,6 +18,7 @@ use std::borrow::Cow;
use http_client::sse::SseClient;
use hyper::Method;
+use jiff::Timestamp;
use serde_json::json;
use taler_common::types::amount::Decimal;
use url::Url;
@@ -57,7 +58,7 @@ impl Client<'_> {
.await
}
- pub async fn balance(&self, account_type_id: u64) -> ApiResult<Account> {
+ pub async fn balance(&self, account_type_id: i64) -> ApiResult<Account> {
self.request(Method::GET, format!("self/accounts/{account_type_id}"))
.parse_json()
.await
@@ -71,8 +72,8 @@ impl Client<'_> {
pub async fn direct_payment(
&self,
- creditor: u64,
- payment_type_id: u64,
+ creditor: i64,
+ payment_type_id: i64,
amount: Decimal,
description: &str,
) -> ApiResult<Transaction> {
@@ -88,7 +89,7 @@ impl Client<'_> {
.await
}
- pub async fn chargeback(&self, transfer_id: u64) -> ApiResult<u64> {
+ pub async fn chargeback(&self, transfer_id: i64) -> ApiResult<i64> {
self.request(Method::POST, format!("transfers/{transfer_id}/chargeback"))
.parse_json()
.await
@@ -96,22 +97,26 @@ impl Client<'_> {
pub async fn history(
&self,
- account_type_id: u64,
+ account_type_id: i64,
order_by: OrderBy,
page: u64,
+ since: Option<Timestamp>,
) -> ApiResult<Pagination<Vec<HistoryItem>>> {
- self.request(
- Method::GET,
- format!("self/accounts/{account_type_id}/history"),
- )
- .query("orderBy", order_by)
- .query("skipTotalCount", false)
- .query("page", page)
- .parse_pagination()
- .await
+ let mut req = self
+ .request(
+ Method::GET,
+ format!("self/accounts/{account_type_id}/history"),
+ )
+ .query("orderBy", order_by)
+ .query("skipTotalCount", false)
+ .query("page", page);
+ if let Some(since) = since {
+ req = req.query("datePeriod", since)
+ }
+ req.query("begin", since).parse_pagination().await
}
- pub async fn transfer(&self, transfer_id: u64) -> ApiResult<Transfer> {
+ pub async fn transfer(&self, transfer_id: i64) -> ApiResult<Transfer> {
self.request(Method::GET, format!("transfers/{transfer_id}"))
.parse_json()
.await
@@ -119,7 +124,7 @@ impl Client<'_> {
pub async fn push_notifications(
&self,
- client_id: u64,
+ client_id: i64,
sse_client: &mut SseClient,
) -> ApiResult<()> {
self.request(Method::GET, "push/subscribe")
diff --git a/taler-cyclos/src/cyclos_api/types.rs b/taler-cyclos/src/cyclos_api/types.rs
@@ -107,6 +107,7 @@ pub struct RelatedAccount {
pub kind: AccountKind,
}
+#[derive(Debug)]
pub struct Pagination<T> {
pub page: T,
pub current_page: u64,
diff --git a/taler-cyclos/src/db.rs b/taler-cyclos/src/db.rs
@@ -87,11 +87,11 @@ pub async fn notification_listener(
#[derive(Debug, Clone)]
pub struct TxIn {
- pub transfer_id: u64,
- pub tx_id: Option<u64>,
+ pub transfer_id: i64,
+ pub tx_id: Option<i64>,
pub amount: Decimal,
pub subject: String,
- pub debtor_id: u64,
+ pub debtor_id: i64,
pub debtor_name: String,
pub valued_at: Timestamp,
}
@@ -120,11 +120,11 @@ impl Display for TxIn {
#[derive(Debug, Clone)]
pub struct TxOut {
- pub transfer_id: u64,
- pub tx_id: Option<u64>,
+ pub transfer_id: i64,
+ pub tx_id: Option<i64>,
pub amount: Decimal,
pub subject: String,
- pub creditor_id: u64,
+ pub creditor_id: i64,
pub creditor_name: String,
pub valued_at: Timestamp,
}
@@ -153,10 +153,10 @@ impl Display for TxOut {
#[derive(Debug, PartialEq, Eq)]
pub struct Initiated {
- pub id: u64,
+ pub id: i64,
pub amount: Decimal,
pub subject: String,
- pub creditor_id: u64,
+ pub creditor_id: i64,
pub creditor_name: String,
}
@@ -180,7 +180,7 @@ impl Display for Initiated {
pub struct TxInAdmin {
pub amount: Decimal,
pub subject: String,
- pub debtor_id: u64,
+ pub debtor_id: i64,
pub debtor_name: CompactString,
pub metadata: IncomingSubject,
}
@@ -216,7 +216,7 @@ pub async fn register_tx_in_admin(
)
.bind_decimal(&tx.amount)
.bind(&tx.subject)
- .bind(tx.debtor_id as i64)
+ .bind(tx.debtor_id)
.bind(&tx.debtor_name)
.bind_timestamp(now)
.bind(tx.metadata.ty())
@@ -248,11 +248,11 @@ pub async fn register_tx_in(
FROM register_tx_in($1, $2, ($3, $4)::taler_amount, $5, $6, $7, $8, $9, $10, $11)
",
)
- .bind(tx.transfer_id as i64)
- .bind(tx.tx_id.map(|it| it as i64))
+ .bind(tx.transfer_id)
+ .bind(tx.tx_id.map(|it| it))
.bind_decimal(&tx.amount)
.bind(&tx.subject)
- .bind(tx.debtor_id as i64)
+ .bind(tx.debtor_id)
.bind(&tx.debtor_name)
.bind(tx.valued_at.as_microsecond())
.bind(subject.as_ref().map(|it| it.ty()))
@@ -276,7 +276,7 @@ pub async fn register_tx_in(
#[derive(Debug)]
pub enum TxOutKind {
Simple,
- Bounce(u64),
+ Bounce(i64),
Talerable(OutgoingSubject),
}
@@ -295,7 +295,7 @@ pub enum RegisterResult {
#[derive(Debug, PartialEq, Eq)]
pub struct AddOutgoingResult {
pub result: RegisterResult,
- pub row_id: u64,
+ pub row_id: i64,
}
pub async fn register_tx_out(
@@ -310,11 +310,11 @@ pub async fn register_tx_out(
FROM register_tx_out($1, $2, ($3, $4)::taler_amount, $5, $6, $7, $8, $9, $10, $11, $12)
",
)
- .bind(tx.transfer_id as i64)
- .bind(tx.tx_id.map(|it| it as i64))
+ .bind(tx.transfer_id)
+ .bind(tx.tx_id)
.bind_decimal(&tx.amount)
.bind(&tx.subject)
- .bind(tx.creditor_id as i64)
+ .bind(tx.creditor_id)
.bind(&tx.creditor_name)
.bind_timestamp(&tx.valued_at);
let query = match kind {
@@ -336,7 +336,7 @@ pub async fn register_tx_out(
.try_map(|r: PgRow| {
Ok(AddOutgoingResult {
result: r.try_get(0)?,
- row_id: r.try_get_u64(1)?,
+ row_id: r.try_get(1)?,
})
})
.fetch_one(db)
@@ -356,7 +356,7 @@ pub struct Transfer {
pub amount: Decimal,
pub exchange_base_url: Url,
pub wtid: ShortHashCode,
- pub creditor_id: u64,
+ pub creditor_id: i64,
pub creditor_name: CompactString,
}
@@ -405,7 +405,7 @@ pub struct BounceResult {
pub async fn register_bounced_tx_in(
db: &mut PgConnection,
tx: &TxIn,
- chargeback_id: u64,
+ chargeback_id: i64,
reason: &str,
now: &Timestamp,
) -> sqlx::Result<BounceResult> {
@@ -678,10 +678,10 @@ pub async fn pending_batch<'a>(
.bind_timestamp(start)
.try_map(|r: PgRow| {
Ok(Initiated {
- id: r.try_get_u64(0)?,
+ id: r.try_get(0)?,
amount: r.try_get_decimal(1, 2)?,
subject: r.try_get(3)?,
- creditor_id: r.try_get_u64(4)?,
+ creditor_id: r.try_get(4)?,
creditor_name: r.try_get(5)?,
})
})
@@ -692,9 +692,9 @@ pub async fn pending_batch<'a>(
/** Update status of a successful submitted initiated transaction */
pub async fn initiated_submit_success<'a>(
db: impl PgExecutor<'a>,
- initiated_id: u64,
+ initiated_id: i64,
timestamp: &Timestamp,
- tx_id: u64,
+ tx_id: i64,
) -> sqlx::Result<()> {
sqlx::query(
"
@@ -703,8 +703,8 @@ pub async fn initiated_submit_success<'a>(
WHERE initiated_id=$3
"
).bind_timestamp(timestamp)
- .bind(tx_id as i64)
- .bind(initiated_id as i64)
+ .bind(tx_id)
+ .bind(initiated_id)
.execute(db).await?;
Ok(())
}
@@ -712,7 +712,7 @@ pub async fn initiated_submit_success<'a>(
/** Update status of a permanently failed initiated transaction */
pub async fn initiated_submit_permanent_failure<'a>(
db: impl PgExecutor<'a>,
- initiated_id: u64,
+ initiated_id: i64,
msg: &str,
) -> sqlx::Result<()> {
sqlx::query(
@@ -723,7 +723,7 @@ pub async fn initiated_submit_permanent_failure<'a>(
",
)
.bind(msg)
- .bind(initiated_id as i64)
+ .bind(initiated_id)
.execute(db)
.await?;
Ok(())
@@ -739,7 +739,7 @@ pub enum ChargebackFailureResult {
/** Update status of a charged back initiated transaction */
pub async fn initiated_chargeback_failure(
db: &mut PgConnection,
- transfer_id: u64,
+ transfer_id: i64,
) -> sqlx::Result<ChargebackFailureResult> {
Ok(
sqlx::query("SELECT out_initiated_id, out_new FROM register_charge_back_failure($1)")
@@ -808,7 +808,7 @@ impl CyclosTypeHelper for PgRow {
name: I,
root: &CompactString,
) -> sqlx::Result<FullCyclosPayto> {
- let idx = self.try_get_u64(idx)?;
+ let idx = self.try_get(idx)?;
let name = self.try_get(name)?;
Ok(FullCyclosPayto::new(
CyclosAccount {
@@ -824,7 +824,7 @@ impl CyclosTypeHelper for PgRow {
name: I,
root: &CompactString,
) -> sqlx::Result<PaytoURI> {
- let idx = self.try_get_u64(idx)?;
+ let idx = self.try_get(idx)?;
let name = self.try_get(name)?;
Ok(CyclosAccount {
id: CyclosId(idx),
@@ -841,7 +841,7 @@ mod test {
use compact_str::CompactString;
use jiff::{Span, Timestamp};
use serde_json::json;
- use sqlx::{PgConnection, PgPool, postgres::PgRow};
+ use sqlx::{PgConnection, PgPool, Row as _, postgres::PgRow};
use taler_api::{
db::TypeHelper,
subject::{IncomingSubject, OutgoingSubject},
@@ -921,7 +921,7 @@ mod test {
let now = now_sql_stable_timestamp();
let later = now + Span::new().hours(2);
let tx = TxIn {
- transfer_id: now.as_microsecond() as u64,
+ transfer_id: now.as_microsecond() as i64,
tx_id: None,
amount: decimal("10"),
subject: "subject".to_owned(),
@@ -964,7 +964,7 @@ mod test {
db::register_tx_in(
db,
&TxIn {
- transfer_id: later.as_microsecond() as u64,
+ transfer_id: later.as_microsecond() as i64,
valued_at: later,
..tx
},
@@ -1110,7 +1110,7 @@ mod test {
async fn routine(db: &mut PgConnection, first: &TxOutKind, second: &TxOutKind) {
let transfer_id = sqlx::query("SELECT count(*) + 1 FROM tx_out")
- .try_map(|r: PgRow| r.try_get_u64(0))
+ .try_map(|r: PgRow| r.try_get(0))
.fetch_one(&mut *db)
.await
.unwrap();
diff --git a/taler-cyclos/src/dev.rs b/taler-cyclos/src/dev.rs
@@ -70,7 +70,7 @@ pub async fn dev(cfg: &Config, cmd: DevCmd) -> anyhow::Result<()> {
let mut page_idx = 0;
loop {
let page = client
- .history(*cfg.worker.account_type_id, OrderBy::DateAsc, page_idx)
+ .history(*cfg.worker.account_type_id, OrderBy::DateAsc, page_idx, None)
.await?;
for transfer in page.page {
let tx = extract_tx_info(transfer);
diff --git a/taler-cyclos/src/notification.rs b/taler-cyclos/src/notification.rs
@@ -26,7 +26,7 @@ use crate::cyclos_api::{
};
pub async fn watch_notification(client: &Client<'_>, notify: &Notify) -> ! {
- let client_id = Timestamp::now().as_microsecond() as u64;
+ let client_id = Timestamp::now().as_microsecond() as i64;
let mut sse_client = SseClient::new();
let mut jitter = ExpoBackoffDecorr::default();
loop {
diff --git a/taler-cyclos/src/payto.rs b/taler-cyclos/src/payto.rs
@@ -28,10 +28,10 @@ pub struct CyclosAccount {
#[derive(
Debug, Clone, Copy, PartialEq, Eq, serde_with::DeserializeFromStr, serde_with::SerializeDisplay,
)]
-pub struct CyclosId(pub u64);
+pub struct CyclosId(pub i64);
impl Deref for CyclosId {
- type Target = u64;
+ type Target = i64;
fn deref(&self) -> &Self::Target {
&self.0
@@ -52,7 +52,7 @@ impl FromStr for CyclosId {
type Err = CyclosIdError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
- Ok(Self(u64::from_str(s).map_err(CyclosIdError)?))
+ Ok(Self(i64::from_str(s).map_err(CyclosIdError)?))
}
}
diff --git a/taler-cyclos/src/worker.rs b/taler-cyclos/src/worker.rs
@@ -31,6 +31,7 @@ use tracing::{debug, error, info, trace, warn};
use crate::{
config::{AccountType, WorkerCfg},
+ constants::SYNC_CURSOR_KEY,
cyclos_api::{
api::{CyclosAuth, CyclosErr},
client::Client,
@@ -38,6 +39,7 @@ use crate::{
},
db::{
self, AddIncomingResult, ChargebackFailureResult, RegisterResult, TxIn, TxOut, TxOutKind,
+ kv_get, kv_set,
},
notification::watch_notification,
};
@@ -154,8 +156,8 @@ pub struct Worker<'a> {
pub client: &'a Client<'a>,
pub db: &'a mut PgConnection,
pub currency: Currency,
- pub account_type_id: u64,
- pub payment_type_id: u64,
+ pub account_type_id: i64,
+ pub payment_type_id: i64,
pub account_type: AccountType,
}
@@ -169,15 +171,19 @@ impl Worker<'_> {
};
// Sync transactions
- //let mut next: Option<Next> = None; //kv_get(&mut *self.db, TXS_CURSOR_KEY).await?; TODO cursor logic is broken and cannot be stored & reused
+ let mut cursor: Timestamp = kv_get(&mut *self.db, SYNC_CURSOR_KEY)
+ .await?
+ .unwrap_or_default();
- let mut page_idx = 0;
loop {
let page = self
.client
- .history(self.account_type_id, OrderBy::DateAsc, page_idx)
+ .history(self.account_type_id, OrderBy::DateAsc, 0, Some(cursor))
.await?;
for transfer in page.page {
+ if transfer.date > cursor {
+ cursor = transfer.date;
+ }
let tx = extract_tx_info(transfer);
match tx {
Tx::In(tx_in) => self.ingest_in(tx_in).await?,
@@ -185,12 +191,11 @@ impl Worker<'_> {
}
}
+ kv_set(&mut *self.db, SYNC_CURSOR_KEY, &cursor).await?;
+
if !page.has_next_page {
break;
- } else {
- page_idx += 1;
}
- // TODO store cursor
}
// Send transactions
diff --git a/taler-cyclos/tests/api.rs b/taler-cyclos/tests/api.rs
@@ -1,6 +1,6 @@
/*
This file is part of TALER
- Copyright (C) 2025 Taler Systems SA
+ Copyright (C) 2025, 2026 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free Software
@@ -104,9 +104,9 @@ async fn outgoing_history() {
db::register_tx_out(
&mut *conn,
&db::TxOut {
- transfer_id: i as u64,
+ transfer_id: i as i64,
tx_id: if i % 2 == 0 {
- Some((i % 2) as u64)
+ Some((i % 2) as i64)
} else {
None
},