commit 8ea13f1b751f0a4ee98dec9d4074942854c88690
parent 6c313f6bf54c394179988f761e87cbbbf27290b9
Author: Antoine A <>
Date: Thu, 23 Jan 2025 12:02:58 +0100
magnet: WIP worker
Diffstat:
7 files changed, 336 insertions(+), 70 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
@@ -142,9 +142,9 @@ checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"
[[package]]
name = "axum"
-version = "0.8.2"
+version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "efea76243612a2436fb4074ba0cf3ba9ea29efdeb72645d8fc63f116462be1de"
+checksum = "6d6fd624c75e18b3b4c6b9caf42b1afe24437daaee904069137d8bab077be8b8"
dependencies = [
"axum-core",
"bytes",
@@ -176,12 +176,12 @@ dependencies = [
[[package]]
name = "axum-core"
-version = "0.5.1"
+version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "eab1b0df7cded837c40dacaa2e1c33aa17c84fc3356ae67b5645f1e83190753e"
+checksum = "df1362f362fd16024ae199c1970ce98f9661bf5ef94b9808fee734bc3698b733"
dependencies = [
"bytes",
- "futures-core",
+ "futures-util",
"http 1.2.0",
"http-body",
"http-body-util",
@@ -1360,9 +1360,9 @@ checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130"
[[package]]
name = "is-terminal"
-version = "0.4.14"
+version = "0.4.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3f187290c0ed3dfe3f7c85bedddd320949b68fc86ca0ceb71adfb05b3dc3af2a"
+checksum = "e19b23d53f35ce9f56aebc7d1bb4e6ac1e9c0db7ac85c8d1760c04379edced37"
dependencies = [
"hermit-abi",
"libc",
diff --git a/common/taler-api/src/subject.rs b/common/taler-api/src/subject.rs
@@ -90,6 +90,12 @@ pub enum IncomingSubjectResult {
Ambiguous,
}
+#[derive(Debug, PartialEq, Eq, thiserror::Error)]
+pub enum IncomingSubjectErr {
+ #[error("found multiple public keys")]
+ Ambiguous,
+}
+
#[derive(Debug, thiserror::Error)]
pub enum OutgoingSubjectErr {
#[error("missing parts")]
@@ -121,7 +127,9 @@ pub fn parse_outgoing(subject: &str) -> Result<OutgoingSubject, OutgoingSubjectE
* To parse them while ignoring user errors, we reconstruct valid keys from key
* parts, resolving ambiguities where possible.
**/
-pub fn parse_incoming_unstructured(subject: &str) -> Option<IncomingSubjectResult> {
+pub fn parse_incoming_unstructured(
+ subject: &str,
+) -> Result<Option<IncomingSubject>, IncomingSubjectErr> {
// We expect subject to be less than 65KB
assert!(subject.len() <= u16::MAX as usize);
@@ -206,7 +214,7 @@ pub fn parse_incoming_unstructured(subject: &str) -> Option<IncomingSubjectResul
(IncomingType::kyc | IncomingType::wad, IncomingType::reserve)
)
{
- return Some(IncomingSubjectResult::Ambiguous);
+ return Err(IncomingSubjectErr::Ambiguous);
}
}
None => best = Some(other),
@@ -215,7 +223,7 @@ pub fn parse_incoming_unstructured(subject: &str) -> Option<IncomingSubjectResul
}
}
- best.map(|it| IncomingSubjectResult::Success(it.subject))
+ Ok(best.map(|it| it.subject))
}
#[test]
@@ -238,7 +246,7 @@ fn parse() {
let other_standard = &format!("{prefix}{other}");
let other_mixed = &format!("{prefix}TEGY6d9mh9pgwvwpgs0z0095z854xegfy7jj202yd0esp8p0za60");
- let result = Some(IncomingSubjectResult::Success(match ty {
+ let result = Ok(Some(match ty {
IncomingType::reserve => {
IncomingSubject::Reserve(EddsaPublicKey::from_str(key).unwrap())
}
@@ -297,7 +305,7 @@ fn parse() {
] {
assert_eq!(
parse_incoming_unstructured(&case),
- Some(IncomingSubjectResult::Ambiguous)
+ Err(IncomingSubjectErr::Ambiguous)
);
}
@@ -313,7 +321,7 @@ fn parse() {
for case in [format!("{mixed_l}-{mixed_r} {standard_l}-{standard_r}")] {
let res = parse_incoming_unstructured(&case);
if ty == IncomingType::reserve {
- assert_eq!(res, Some(IncomingSubjectResult::Ambiguous));
+ assert_eq!(res, Err(IncomingSubjectErr::Ambiguous));
} else {
assert_eq!(res, result);
}
@@ -325,7 +333,7 @@ fn parse() {
&standard[1..], // Check fail if missing char
"2MZT6RS3RVB3B0E2RDMYW0YRA3Y0VPHYV0CYDE6XBB0YMPFXCEG0", // Check fail if not a valid key
] {
- assert_eq!(parse_incoming_unstructured(&case), None);
+ assert_eq!(parse_incoming_unstructured(&case), Ok(None));
}
if ty == IncomingType::kyc {
@@ -360,7 +368,7 @@ fn real() {
),
] {
assert_eq!(
- Some(IncomingSubjectResult::Success(IncomingSubject::Reserve(
+ Ok(Some(IncomingSubject::Reserve(
EddsaPublicKey::from_str(key).unwrap(),
))),
parse_incoming_unstructured(subject)
@@ -372,7 +380,7 @@ fn real() {
"JW398X85FWPKKMS0EYB6TQ1799RMY5DDXTZFPW4YC3WJ2DWSJT70",
)] {
assert_eq!(
- Some(IncomingSubjectResult::Success(IncomingSubject::Kyc(
+ Ok(Some(IncomingSubject::Kyc(
EddsaPublicKey::from_str(key).unwrap(),
))),
parse_incoming_unstructured(subject)
diff --git a/wire-gateway/magnet-bank/src/dev.rs b/wire-gateway/magnet-bank/src/dev.rs
@@ -21,16 +21,15 @@ use taler_common::{
types::{
amount::Amount,
payto::{FullPayto, Payto},
- timestamp::Timestamp,
},
};
use tracing::info;
use crate::{
config::MagnetConfig,
- db::{TxIn, TxOut},
keys,
magnet::{AuthClient, Direction},
+ worker::{extract_tx_info, Tx},
MagnetPayto,
};
@@ -97,33 +96,10 @@ pub async fn dev(cfg: Config, cmd: DevCmd) -> anyhow::Result<()> {
.await?;
next = page.next;
for item in page.list {
- let tx = item.tx;
- if tx.amount.is_sign_positive() {
- let amount = format!("{}:{}", tx.currency, tx.amount);
- let tx = TxIn {
- code: tx.code,
- amount: amount.parse().unwrap(),
- subject: tx.subject,
- debtor: MagnetPayto {
- number: tx.bank_account,
- name: tx.bank_account_owner,
- },
- timestamp: Timestamp::from(tx.value_date),
- };
- info!("in {tx}");
- } else {
- let amount = format!("{}:{}", tx.currency, -tx.amount);
- let tx = TxOut {
- code: tx.code,
- amount: amount.parse().unwrap(),
- subject: tx.subject,
- creditor: MagnetPayto {
- number: tx.bank_account,
- name: tx.bank_account_owner,
- },
- timestamp: Timestamp::from(tx.value_date),
- };
- info!("out {tx}");
+ let tx = extract_tx_info(item.tx);
+ match tx {
+ Tx::In(tx_in) => info!("in {tx_in}"),
+ Tx::Out(tx_out) => info!("out {tx_out}"),
}
}
if next.is_none() {
@@ -149,18 +125,18 @@ pub async fn dev(cfg: Config, cmd: DevCmd) -> anyhow::Result<()> {
debtor.code,
amount.val as f64,
&subject,
- date,
+ &date,
&full.receiver_name,
&creditor.number,
)
- .await?;
+ .await?.tx;
client
.sign_tx(
&keys.signing_key,
&debtor.number,
init.code,
init.amount,
- date,
+ &date,
&creditor.number,
)
.await?;
diff --git a/wire-gateway/magnet-bank/src/lib.rs b/wire-gateway/magnet-bank/src/lib.rs
@@ -23,6 +23,12 @@ pub mod dev;
pub mod keys;
pub mod magnet;
pub mod wire_gateway;
+pub mod worker;
+pub mod failure_injection {
+ pub fn fail_point(_name: &'static str) {
+ // TODO inject failures for error handling tests
+ }
+}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct MagnetPayto {
diff --git a/wire-gateway/magnet-bank/src/magnet.rs b/wire-gateway/magnet-bank/src/magnet.rs
@@ -188,6 +188,8 @@ pub enum Direction {
#[derive(Debug, Deserialize)]
pub struct TxInfo {
+ #[serde(flatten)]
+ pub tx: Transaction,
#[serde(rename = "alairas1idopont")]
pub first_signature: Option<Timestamp>,
#[serde(rename = "alairas2idopont")]
@@ -196,25 +198,10 @@ pub struct TxInfo {
pub first_signatory: Option<Partner>,
#[serde(rename = "alairo2")]
pub second_signatory: Option<Partner>,
- #[serde(rename = "kod")]
- pub code: u64,
- #[serde(rename = "bankszamla")]
- pub account: Account,
- #[serde(rename = "deviza")]
- pub currency: Currency,
- #[serde(rename = "eszamla")]
- pub counter_account: String,
- #[serde(rename = "epartner")]
- pub counter_name: String,
- #[serde(rename = "statusz")]
- pub status: TxStatus,
- #[serde(rename = "osszegSigned")]
- pub amount: f64,
#[serde(rename = "reszteljesites")]
pub partial_execution: bool,
#[serde(rename = "sorbaallitas")]
pub queued: bool,
- pub eam: Option<u64>,
}
#[derive(Debug, Deserialize)]
@@ -252,9 +239,12 @@ pub struct Transaction {
#[serde(rename = "erteknap")]
pub value_date: jiff::Timestamp,
#[serde(rename = "eszamla")]
- pub debtor: String,
+ pub counter_account: String,
+ #[serde(rename = "epartner")]
+ pub counter_name: String,
#[serde(rename = "tranzakcioTipus")]
pub ty: Option<String>,
+ pub eam: Option<u64>,
}
#[derive(Debug, Deserialize)]
@@ -429,7 +419,8 @@ impl ApiClient<'_> {
if let Some(next) = next {
req = req
.query(&[("nextId", next.next_id)])
- .query(&[("nextTipus", &next.next_type)]);
+ .query(&[("nextTipus", &next.next_type)])
+ .query(&[("tranzakciofrissite", true)]);
}
if let Some(status) = status {
req = req.query(&[("statusz", status)]);
@@ -450,7 +441,7 @@ impl ApiClient<'_> {
account_code: u64,
amount: f64,
subject: &str,
- date: jiff::civil::Date,
+ date: &jiff::civil::Date,
creditor_name: &str,
creditor_account: &str,
) -> ApiResult<TxInfo> {
@@ -463,7 +454,7 @@ impl ApiClient<'_> {
#[serde(rename = "kozlemeny")]
subject: &'a str,
#[serde(rename = "ertekNap")]
- date: jiff::civil::Date,
+ date: &'a jiff::civil::Date,
#[serde(rename = "ellenpartner")]
creditor_name: &'a str,
#[serde(rename = "ellenszamla")]
@@ -494,7 +485,7 @@ impl ApiClient<'_> {
account: &str,
tx_code: u64,
amount: f64,
- date: jiff::civil::Date,
+ date: &jiff::civil::Date,
creditor: &str,
) -> ApiResult<TxInfo> {
#[derive(Serialize)]
@@ -508,7 +499,7 @@ impl ApiClient<'_> {
#[serde(rename = "osszeg")]
amount: f64,
#[serde(rename = "ertekNap")]
- date: jiff::civil::Date,
+ date: &'a jiff::civil::Date,
signature: &'a str,
}
@@ -532,4 +523,13 @@ impl ApiClient<'_> {
.await?
.info)
}
+
+ pub async fn delete_tx(&self, tx_code: u64) -> ApiResult<()> {
+ self.client
+ .delete(self.join(&format!("/RESTApi/resources/v2/tranzakcio/{tx_code}")))
+ .oauth(self.consumer, Some(self.access), None)
+ .await
+ .magnet_empty()
+ .await
+ }
}
diff --git a/wire-gateway/magnet-bank/src/main.rs b/wire-gateway/magnet-bank/src/main.rs
@@ -22,12 +22,15 @@ use magnet_bank::{
db,
dev::{self, DevCmd},
keys,
+ magnet::AuthClient,
wire_gateway::MagnetWireGateway,
+ worker::Worker,
+ MagnetPayto,
};
use sqlx::PgPool;
use taler_common::{
config::{parser::ConfigSource, Config},
- types::payto::payto,
+ types::payto::{payto, Payto},
};
use tracing::{error, Level};
use tracing_subscriber::{util::SubscriberInitExt as _, FmtSubscriber};
@@ -63,6 +66,11 @@ enum Command {
},
/// Run magnet-bank HTTP server
Serve,
+ /// Run magnet-bank worker
+ Worker {
+ // TODO account in config
+ account: Payto,
+ },
/// Hidden dev commands
#[command(subcommand, hide(true))]
Dev(DevCmd),
@@ -119,6 +127,27 @@ async fn app(args: Args) -> anyhow::Result<()> {
)
.await?;
}
+ Command::Worker { account } => {
+ let db = DbConfig::parse(&cfg)?;
+ let pool = PgPool::connect_with(db.cfg).await?;
+ let cfg = MagnetConfig::parse(&cfg)?;
+ let keys = keys::load(&cfg)?;
+ let client = reqwest::Client::new();
+ let client =
+ AuthClient::new(&client, &cfg.api_url, &cfg.consumer).upgrade(&keys.access_token);
+ let account = MagnetPayto::try_from(&account)?;
+ let account = client.account(&account.number).await?;
+ let mut db = pool.acquire().await?.detach();
+ // TODO run in loop and handle errors
+ let mut worker = Worker {
+ client: &client,
+ db: &mut db,
+ account_number: &account.number,
+ account_code: account.code,
+ key: &keys.signing_key,
+ };
+ worker.run().await?;
+ }
Command::Dev(dev_cmd) => dev::dev(cfg, dev_cmd).await?,
}
Ok(())
diff --git a/wire-gateway/magnet-bank/src/worker.rs b/wire-gateway/magnet-bank/src/worker.rs
@@ -0,0 +1,247 @@
+/*
+ This file is part of TALER
+ Copyright (C) 2025 Taler Systems SA
+
+ TALER is free software; you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License along with
+ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+*/
+
+use jiff::{civil::Date, Zoned};
+use p256::ecdsa::SigningKey;
+use sqlx::PgConnection;
+use taler_api::subject::{self, parse_incoming_unstructured};
+use taler_common::types::{
+ amount::{self},
+ timestamp::Timestamp,
+};
+use tracing::{debug, info};
+
+use crate::{
+ db::{self, AddIncomingResult, Initiated, TxIn, TxOut},
+ failure_injection::fail_point,
+ magnet::{error::ApiError, ApiClient, Direction, Transaction},
+ MagnetPayto,
+};
+
+#[derive(Debug, thiserror::Error)]
+pub enum WorkerError {
+ #[error(transparent)]
+ Db(#[from] sqlx::Error),
+ #[error(transparent)]
+ Api(#[from] ApiError),
+}
+
+type WorkerResult = Result<(), WorkerError>;
+
+pub struct Worker<'a> {
+ pub client: &'a ApiClient<'a>,
+ pub db: &'a mut PgConnection,
+ pub account_number: &'a str,
+ pub account_code: u64,
+ pub key: &'a SigningKey,
+}
+
+impl Worker<'_> {
+ /// Run a single worker pass
+ pub async fn run(&mut self) -> WorkerResult {
+ // Sync transactions
+ let mut next = None;
+ loop {
+ let page = self
+ .client
+ .page_tx(Direction::Both, 100, self.account_number, &next, &None)
+ .await?;
+ next = page.next;
+ for item in page.list {
+ let tx = extract_tx_info(item.tx);
+ match tx {
+ Tx::In(tx_in) => {
+ async fn bounce(
+ db: &mut PgConnection,
+ tx_in: &TxIn,
+ _reason: &str,
+ ) -> Result<(), WorkerError> {
+ // TODO bounce in db
+ let res = db::register_tx_in(db, &tx_in, &None).await?;
+ match res {
+ AddIncomingResult::Success(registered_tx) => {
+ if registered_tx.new {
+ info!("incoming {tx_in} bounced in TODO");
+ } else {
+ debug!("incoming {tx_in} already seen and bounced in TODO");
+ }
+ }
+ AddIncomingResult::ReservePubReuse => {
+ unreachable!("no reserve pub key")
+ }
+ }
+
+ Ok(())
+ }
+ match parse_incoming_unstructured(&tx_in.subject) {
+ Ok(None) => bounce(self.db, &tx_in, "missing public key").await?,
+ Ok(Some(subject)) => {
+ let res =
+ db::register_tx_in(self.db, &tx_in, &Some(subject)).await?;
+ match res {
+ AddIncomingResult::Success(registered_tx) => {
+ if registered_tx.new {
+ info!("incoming {tx_in}");
+ } else {
+ debug!("incoming {tx_in}");
+ }
+ }
+ AddIncomingResult::ReservePubReuse => {
+ bounce(self.db, &tx_in, "reserve pub reuse").await?
+ }
+ }
+ }
+ Err(e) => bounce(self.db, &tx_in, &e.to_string()).await?,
+ }
+ }
+ Tx::Out(tx_out) => {
+ let subject = subject::parse_outgoing(&tx_out.subject);
+ let res = db::register_tx_out(self.db, &tx_out, &subject.ok()).await?;
+ // TODO log recovered & log malformed
+ if res.new {
+ info!("outgoing {tx_out}");
+ } else {
+ debug!("outgoing {tx_out} already seen");
+ }
+ }
+ }
+ }
+ if next.is_none() {
+ break;
+ }
+ }
+
+ // Send transactions
+ let start = Timestamp::now();
+ let now = Zoned::now();
+ loop {
+ let batch = db::pending_batch(&mut *self.db, &start).await?;
+ if batch.is_empty() {
+ break;
+ }
+ for tx in batch {
+ self.create_tx(&tx, &now).await?;
+ }
+ }
+ Ok(())
+ }
+
+ /// Try to sign an unsigned initiated transaction
+ pub async fn recover_tx(&mut self, tx: &Transaction) -> WorkerResult {
+ // This transaction have not been signed yet, something went wrong
+ // if in db
+ // Then try to sign it -> we completed the transaction
+ // else
+ // The transaction is unknowned (we failed after creating it and before storing it in the db)
+ // we delete it
+ // TODO
+ self.client.delete_tx(tx.code).await?;
+ debug!("outgoing {}: delete uncompleted orphan", tx.code);
+
+ Ok(())
+ }
+
+ /// Create and sign a forint transfer
+ pub async fn create_tx<'a>(&mut self, tx: &Initiated, now: &Zoned) -> WorkerResult {
+ debug!("create tx {tx}");
+ assert_eq!(tx.amount.frac, 0);
+ let date = now.date();
+ // Initialize the new transaction, on failure an orphan initiated transaction can be created
+ let info = self
+ .client
+ .init_tx(
+ self.account_code,
+ tx.amount.val as f64,
+ &tx.subject,
+ &date,
+ &tx.creditor.name,
+ &tx.creditor.number,
+ )
+ .await?.tx;
+ debug!("created tx {}", info.code);
+ // TODO handle API errors gracefully
+ fail_point("submit-success");
+ // Update transaction status, on failure the initiated transaction will be orphan
+ db::initiated_submit_success(&mut *self.db, tx.id, &Timestamp::now(), info.code).await?;
+
+ // Sign transaction
+ self.sign_tx(info.code, info.amount, &date, &tx.creditor.number)
+ .await?;
+ Ok(())
+ }
+
+ /** Sign an initiated forint transfer */
+ pub async fn sign_tx(
+ &mut self,
+ tx_code: u64,
+ amount: f64,
+ date: &Date,
+ creditor: &str,
+ ) -> WorkerResult {
+ debug!("sign tx {tx_code}");
+ fail_point("sign-tx");
+ // Sign initiated transaction, on failure we will retry
+ self.client
+ .sign_tx(
+ &self.key,
+ self.account_number,
+ tx_code,
+ amount,
+ date,
+ creditor,
+ )
+ .await?;
+ // TODO handle API errors gracefully
+ Ok(())
+ }
+}
+
+pub enum Tx {
+ In(TxIn),
+ Out(TxOut),
+}
+
+pub fn extract_tx_info(tx: Transaction) -> Tx {
+ // TODO amount from f64 without allocations
+ let amount = amount::amount(format!("{}:{}", tx.currency, tx.amount.abs()));
+ if tx.amount.is_sign_positive() {
+ let tx = TxIn {
+ code: tx.code,
+ amount,
+ subject: tx.subject,
+ // TODO this is our account, we only have the account number of the debtor
+ debtor: MagnetPayto {
+ number: tx.counter_account,
+ name: tx.counter_name,
+ },
+ timestamp: Timestamp::from(tx.value_date),
+ };
+ Tx::In(tx)
+ } else {
+ let tx = TxOut {
+ code: tx.code,
+ amount,
+ subject: tx.subject,
+ // TODO this is our account, we only have the account number of the debtor
+ creditor: MagnetPayto {
+ number: tx.counter_account,
+ name: tx.counter_name,
+ },
+ timestamp: Timestamp::from(tx.value_date),
+ };
+ Tx::Out(tx)
+ }
+}