commit 6d73ae9d438c1c2d92c32aadfb4aea32d942d737
parent 2a5852b120d1c3f7c6c22f1113b4edd929a9cad0
Author: Antoine A <>
Date: Wed, 5 Nov 2025 12:53:41 +0100
magnet-bank: optimize worker and prepare in-db cursor sync
Diffstat:
9 files changed, 147 insertions(+), 41 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
@@ -1117,9 +1117,9 @@ checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130"
[[package]]
name = "iri-string"
-version = "0.7.8"
+version = "0.7.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "dbc5ebe9c3a1a7a5127f920a418f7585e9e758e911d0466ed004f393b0e380b2"
+checksum = "4f867b9d1d896b67beb18518eda36fdb77a32ea590de864f1325b294a6d14397"
dependencies = [
"memchr",
"serde",
@@ -1744,9 +1744,9 @@ dependencies = [
[[package]]
name = "rustls"
-version = "0.23.34"
+version = "0.23.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6a9586e9ee2b4f8fab52a0048ca7334d7024eef48e2cb9407e3497bb7cab7fa7"
+checksum = "533f54bc6a7d4f647e46ad909549eda97bf5afc1585190ef692b4286b198bd8f"
dependencies = [
"once_cell",
"ring",
diff --git a/taler-magnet-bank/db/magnet-bank-0001.sql b/taler-magnet-bank/db/magnet-bank-0001.sql
@@ -110,4 +110,10 @@ CREATE TABLE bounced(
initiated_id INT8 NOT NULL UNIQUE REFERENCES initiated(initiated_id) ON DELETE CASCADE,
reason TEXT NOT NULL
);
-COMMENT ON TABLE tx_in IS 'Bounced transactions';
-\ No newline at end of file
+COMMENT ON TABLE tx_in IS 'Bounced transactions';
+
+CREATE TABLE kv(
+ key TEXT NOT NULL UNIQUE PRIMARY KEY,
+ value JSONB NOT NULL
+);
+COMMENT ON TABLE kv IS 'KV table';
+\ No newline at end of file
diff --git a/taler-magnet-bank/src/bin/magnet-bank-harness.rs b/taler-magnet-bank/src/bin/magnet-bank-harness.rs
@@ -51,9 +51,10 @@ struct Args {
#[clap(flatten)]
common: CommonArgs,
- /// Reset database (DANGEROUS: All existing data is lost)
#[clap(long, short)]
reset: bool,
+ #[clap(long, short)]
+ warmup: bool,
}
struct HarnessClient<'a> {
@@ -210,7 +211,11 @@ impl<'a> Balances<'a> {
return Ok(());
}
if attemps > 20 {
- assert_eq!(current, (self.exchange_balance, self.client_balance));
+ assert_eq!(
+ current,
+ (self.exchange_balance, self.client_balance),
+ "{current:?} {diff}"
+ );
}
attemps += 1;
tokio::time::sleep(Duration::from_secs(1)).await;
@@ -259,9 +264,12 @@ fn main() {
signing_key: &keys.signing_key,
};
- // Initial sync
- worker.run().await?;
- tokio::time::sleep(Duration::from_secs(10)).await;
+ if args.warmup {
+ step("Warmup");
+ worker.run().await?;
+ tokio::time::sleep(Duration::from_secs(10)).await;
+ worker.run().await?;
+ }
let now = Timestamp::now();
let balance = &mut Balances::new(&harness).await?;
@@ -314,6 +322,7 @@ fn main() {
.await?;
worker.run().await?;
balance.expect(-4).await?;
+ worker.run().await?;
step("Test transfer transactions to self");
let transfer_id = harness
diff --git a/taler-magnet-bank/src/db.rs b/taler-magnet-bank/src/db.rs
@@ -17,6 +17,7 @@
use std::fmt::Display;
use jiff::{civil::Date, tz::TimeZone};
+use serde::{Serialize, de::DeserializeOwned};
use sqlx::{PgConnection, PgExecutor, PgPool, QueryBuilder, Row, postgres::PgRow};
use taler_api::{
db::{BindHelper, IncomingType, TypeHelper, history, page},
@@ -65,6 +66,7 @@ pub struct TxIn {
pub subject: String,
pub debtor: FullHuPayto,
pub value_date: Date,
+ pub status: TxStatus,
}
impl Display for TxIn {
@@ -75,10 +77,11 @@ impl Display for TxIn {
subject,
debtor,
value_date,
+ status,
} = self;
write!(
f,
- "{value_date} {code} {amount} ({} {}) '{subject}'",
+ "{value_date} {code} {amount} ({} {}) {status:?} '{subject}'",
debtor.bban(),
debtor.name
)
@@ -691,9 +694,36 @@ pub async fn initiated_exists_for_code<'a>(
.await
}
+/** Get JSON value from KV table */
+pub async fn kv_get<'a, T: DeserializeOwned + Unpin + Send>(
+ db: impl PgExecutor<'a>,
+ key: &str,
+) -> sqlx::Result<Option<T>> {
+ sqlx::query("SELECT value FROM kv WHERE key=$1")
+ .bind(key)
+ .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0))
+ .fetch_optional(db)
+ .await
+}
+
+/** Set JSON value in KV table */
+pub async fn kv_set<'a, T: Serialize>(
+ db: impl PgExecutor<'a>,
+ key: &str,
+ value: &T,
+) -> sqlx::Result<()> {
+ sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value")
+ .bind(key)
+ .bind(sqlx::types::Json(value))
+ .execute(db)
+ .await?;
+ Ok(())
+}
+
#[cfg(test)]
mod test {
use jiff::{Span, Zoned};
+ use serde_json::json;
use sqlx::{PgConnection, PgPool, postgres::PgRow};
use taler_api::{
db::TypeHelper,
@@ -712,8 +742,8 @@ mod test {
constant::CURRENCY,
db::{
self, AddIncomingResult, AddOutgoingResult, BounceResult, Initiated, TransferResult,
- TxIn, TxOut, TxOutKind, make_transfer, register_bounce_tx_in, register_tx_in,
- register_tx_in_admin, register_tx_out,
+ TxIn, TxOut, TxOutKind, kv_get, kv_set, make_transfer, register_bounce_tx_in,
+ register_tx_in, register_tx_in_admin, register_tx_out,
},
magnet_api::types::TxStatus,
magnet_payto,
@@ -732,6 +762,27 @@ mod test {
}
#[tokio::test]
+ async fn kv() {
+ let (mut db, _) = setup().await;
+
+ let value = json!({
+ "name": "Mr Smith",
+ "no way": 32
+ });
+
+ assert_eq!(
+ kv_get::<serde_json::Value>(&mut db, "value").await.unwrap(),
+ None
+ );
+ kv_set(&mut db, "value", &value).await.unwrap();
+ kv_set(&mut db, "value", &value).await.unwrap();
+ assert_eq!(
+ kv_get::<serde_json::Value>(&mut db, "value").await.unwrap(),
+ Some(value)
+ );
+ }
+
+ #[tokio::test]
async fn tx_in() {
let (mut db, pool) = setup().await;
@@ -757,6 +808,7 @@ mod test {
"payto://iban/HU30162000031000163100000000?receiver-name=name",
),
value_date: date,
+ status: TxStatus::Completed,
};
// Insert
assert_eq!(
@@ -1171,7 +1223,8 @@ mod test {
amount: amount.clone(),
subject: "subject".to_owned(),
debtor: payto.clone(),
- value_date: date
+ value_date: date,
+ status: TxStatus::Completed
},
&None,
&now
@@ -1194,7 +1247,8 @@ mod test {
amount: amount.clone(),
subject: "subject".to_owned(),
debtor: payto.clone(),
- value_date: date
+ value_date: date,
+ status: TxStatus::Completed
},
&amount,
"good reason",
@@ -1218,7 +1272,8 @@ mod test {
amount: amount.clone(),
subject: "subject".to_owned(),
debtor: payto.clone(),
- value_date: date
+ value_date: date,
+ status: TxStatus::Completed
},
&amount,
"good reason",
@@ -1243,7 +1298,8 @@ mod test {
amount: amount.clone(),
subject: "subject".to_owned(),
debtor: payto.clone(),
- value_date: date
+ value_date: date,
+ status: TxStatus::Completed
},
&amount,
"good reason",
@@ -1267,7 +1323,8 @@ mod test {
amount: amount.clone(),
subject: "subject".to_owned(),
debtor: payto.clone(),
- value_date: date
+ value_date: date,
+ status: TxStatus::Completed
},
&amount,
"good reason",
diff --git a/taler-magnet-bank/src/dev.rs b/taler-magnet-bank/src/dev.rs
@@ -86,7 +86,7 @@ pub async fn dev(cfg: &Config, cmd: DevCmd) -> anyhow::Result<()> {
let mut next = None;
loop {
let page = client
- .page_tx(dir, 100, account.bban(), &next, &None)
+ .page_tx(dir, 100, account.bban(), &next, next.is_none())
.await?;
next = page.next;
for item in page.list {
diff --git a/taler-magnet-bank/src/magnet_api/client.rs b/taler-magnet-bank/src/magnet_api/client.rs
@@ -31,7 +31,7 @@ use crate::magnet_api::{
oauth::{Token, TokenAuth},
types::{
Account, BalanceMini, Direction, Next, PartnerList, SmsCodeSubmission, TokenInfo,
- TransactionPage, Tx, TxStatus,
+ TransactionPage, Tx,
},
};
@@ -211,7 +211,7 @@ impl ApiClient<'_> {
limit: u16,
bban: &str,
next: &Option<Next>,
- status: &Option<TxStatus>,
+ refresh: bool,
) -> ApiResult<TransactionPage> {
let mut req = self.request(
Method::GET,
@@ -222,11 +222,8 @@ impl ApiClient<'_> {
.query(&[("nextId", next.next_id)])
.query(&[("nextTipus", &next.next_type)]);
}
- if let Some(status) = status {
- req = req.query(&[("statusz", status)]);
- }
req.query(&[("terheles", direction)])
- .query(&[("tranzakciofrissites", true), ("ascending", true)])
+ .query(&[("tranzakciofrissites", refresh), ("ascending", true)])
.parse_json()
.await
}
diff --git a/taler-magnet-bank/src/magnet_api/types.rs b/taler-magnet-bank/src/magnet_api/types.rs
@@ -159,6 +159,12 @@ pub enum TxStatus {
UnderReview,
}
+impl TxStatus {
+ pub fn is_final(&self) -> bool {
+ matches!(self, Self::Completed | Self::Rejected | Self::Canceled)
+ }
+}
+
#[derive(Clone, Copy, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub enum Direction {
#[serde(rename = "T")]
@@ -242,7 +248,7 @@ pub struct Transaction {
pub eam: Option<u64>,
}
-#[derive(Debug, Deserialize)]
+#[derive(Debug, Deserialize, Serialize)]
pub struct Next {
#[serde(rename = "next")]
pub next_id: u64,
diff --git a/taler-magnet-bank/src/worker.rs b/taler-magnet-bank/src/worker.rs
@@ -35,10 +35,12 @@ use crate::{
magnet_api::{
api::{ApiErr, ErrKind},
client::ApiClient,
- types::{Direction, Transaction, TxStatus},
+ types::{Direction, Next, Transaction, TxStatus},
},
};
+// const TXS_CURSOR_KEY: &str = "txs_cursor"; TODO cursor is broken
+
#[derive(Debug, thiserror::Error)]
pub enum WorkerError {
#[error(transparent)]
@@ -66,17 +68,27 @@ impl Worker<'_> {
/// Run a single worker pass
pub async fn run(&mut self) -> WorkerResult {
// Sync transactions
- let mut next = None; // TODO Load current state from the db
+ let mut next: Option<Next> = None; // kv_get(&mut *self.db, TXS_CURSOR_KEY).await?; TODO cursor logic is broken and cannot be stored & reused
+ let mut all_final = true;
+ let mut first = true;
loop {
let page = self
.client
- .page_tx(Direction::Both, 100, self.account_number, &next, &None)
+ .page_tx(Direction::Both, 100, self.account_number, &next, first)
.await?;
+ first = false;
next = page.next;
for item in page.list {
+ all_final &= item.tx.status.is_final();
let tx = extract_tx_info(item.tx);
match tx {
Tx::In(tx_in) => {
+ // We only register final successful incoming transactions
+ if tx_in.status != TxStatus::Completed {
+ debug!(target: "worker", "pending or failed in {tx_in}");
+ continue;
+ }
+
if let Some(before) = self.ignore_tx_before
&& tx_in.value_date < before
{
@@ -168,11 +180,26 @@ impl Worker<'_> {
}
}
Tx::Out(tx_out) => {
- if tx_out.status == TxStatus::ToBeRecorded {
- self.recover_tx(&tx_out).await?;
- continue;
- } else if tx_out.status != TxStatus::Completed {
- continue;
+ match tx_out.status {
+ TxStatus::ToBeRecorded => {
+ self.recover_tx(&tx_out).await?;
+ continue;
+ }
+ TxStatus::Rejected | TxStatus::Canceled => {
+ warn!(target: "worker", "out failed {tx_out}");
+ continue;
+ }
+ TxStatus::Completed => {}
+ TxStatus::PendingFirstSignature
+ | TxStatus::PendingSecondSignature
+ | TxStatus::PendingProcessing
+ | TxStatus::Verified
+ | TxStatus::PartiallyCompleted
+ | TxStatus::UnderReview => {
+ // Still pending
+ debug!(target: "worker", "pending out {tx_out}");
+ continue;
+ }
}
match self.account_type {
AccountType::Exchange => {
@@ -237,10 +264,14 @@ impl Worker<'_> {
}
}
- if next.is_none() {
- break;
+ if let Some(_next) = &next {
+ // Update in db cursor only if all previous transactions where final
+ if all_final {
+ // debug!(target: "worker", "advance cursor {next:?}");
+ // kv_set(&mut *self.db, TXS_CURSOR_KEY, next).await?; TODO cursor is broken
+ }
} else {
- // TODO Store current state in the db
+ break;
}
}
@@ -284,7 +315,7 @@ impl Worker<'_> {
/// Create and sign a forint transfer
pub async fn init_tx(&mut self, tx: &Initiated, now: &Zoned) -> WorkerResult {
- trace!(target: "worker", "create tx {tx}");
+ trace!(target: "worker", "init tx {tx}");
assert_eq!(tx.amount.frac, 0);
let date = now.date();
// Initialize the new transaction, on failure an orphan initiated transaction can be created
@@ -404,13 +435,13 @@ pub fn extract_tx_info(tx: Transaction) -> Tx {
};
let counter_account = FullHuPayto::new(iban, tx.counter_name);
if tx.amount.is_sign_positive() {
- assert_eq!(tx.status, TxStatus::Completed, "Can this happen ?");
Tx::In(TxIn {
code: tx.code,
amount,
subject: tx.subject,
debtor: counter_account,
value_date: tx.value_date,
+ status: tx.status,
})
} else {
Tx::Out(TxOut {
diff --git a/taler-magnet-bank/tests/api.rs b/taler-magnet-bank/tests/api.rs
@@ -29,7 +29,7 @@ use taler_magnet_bank::{
CONFIG_SOURCE,
api::MagnetApi,
db::{self, TxOutKind},
- magnet::TxStatus,
+ magnet_api::types::TxStatus,
magnet_payto,
};
use taler_test_utils::{