commit 21a3305026bcd4a57e10266aa4b2310f1e50236a
parent ac63613a10968694202b4dfd11548fd8f44649ea
Author: Antoine A <>
Date: Tue, 28 Oct 2025 17:33:19 +0100
magnet-bank: init harness and improve worker logic
Diffstat:
18 files changed, 723 insertions(+), 194 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
@@ -211,9 +211,9 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "cc"
-version = "1.2.41"
+version = "1.2.43"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ac9fe6cdbb24b6ade63616c0a0688e45bb56732262c158df3c0c4bea4ca47cb7"
+checksum = "739eb0f94557554b3ca9a86d2d37bebd49c5e6d0c1d2bda35ba5bdac830befc2"
dependencies = [
"find-msvc-tools",
"shlex",
@@ -874,11 +874,11 @@ dependencies = [
[[package]]
name = "home"
-version = "0.5.11"
+version = "0.5.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "589533453244b0995c858700322199b2becb13b627df2851f64a2775d024abcf"
+checksum = "cc627f471c528ff0c4a49e1d5e60450c8f6461dd6d10ba9dcd3a61d3dff7728d"
dependencies = [
- "windows-sys 0.59.0",
+ "windows-sys 0.61.2",
]
[[package]]
@@ -1177,9 +1177,9 @@ dependencies = [
[[package]]
name = "js-sys"
-version = "0.3.81"
+version = "0.3.82"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ec48937a97411dcb524a265206ccd4c90bb711fca92b2792c407f268825b9305"
+checksum = "b011eec8cc36da2aab2d5cff675ec18454fad408585853910a202391cf9f8e65"
dependencies = [
"once_cell",
"wasm-bindgen",
@@ -1476,9 +1476,9 @@ dependencies = [
[[package]]
name = "proc-macro2"
-version = "1.0.101"
+version = "1.0.103"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "89ae43fd86e4158d6db51ad8e2b80f313af9cc74f5c0e03ccb87de09998732de"
+checksum = "5ee95bc4ef87b8d5ba32e8b7714ccc834865276eab0aed5c9958d00ec45f49e8"
dependencies = [
"unicode-ident",
]
@@ -1756,9 +1756,9 @@ dependencies = [
[[package]]
name = "rustls-pki-types"
-version = "1.12.0"
+version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "229a4a4c221013e7e1f1a043678c5cc39fe5171437c88fb47151a21e6f5b5c79"
+checksum = "94182ad936a0c91c324cd46c6511b9510ed16af436d7b5bab34beab0afd55f7a"
dependencies = [
"web-time",
"zeroize",
@@ -2152,9 +2152,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]]
name = "syn"
-version = "2.0.107"
+version = "2.0.108"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2a26dbd934e5451d21ef060c018dae56fc073894c5a7896f882928a76e6d081b"
+checksum = "da58917d35242480a05c2897064da0a80589a2a0476c9a3f2fdc83b53502e917"
dependencies = [
"proc-macro2",
"quote",
@@ -2192,6 +2192,7 @@ dependencies = [
"ed25519-dalek",
"fastrand",
"http-body-util",
+ "jiff",
"libdeflater",
"listenfd",
"serde",
@@ -2250,6 +2251,7 @@ dependencies = [
"serde_json",
"serde_path_to_error",
"serde_urlencoded",
+ "serde_with",
"sha1",
"spki",
"sqlx",
@@ -2643,9 +2645,9 @@ checksum = "b8dad83b4f25e74f184f64c43b150b91efe7647395b42289f38e50566d82855b"
[[package]]
name = "wasm-bindgen"
-version = "0.2.104"
+version = "0.2.105"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "c1da10c01ae9f1ae40cbfac0bac3b1e724b320abfcf52229f80b547c0d250e2d"
+checksum = "da95793dfc411fbbd93f5be7715b0578ec61fe87cb1a42b12eb625caa5c5ea60"
dependencies = [
"cfg-if",
"once_cell",
@@ -2655,24 +2657,10 @@ dependencies = [
]
[[package]]
-name = "wasm-bindgen-backend"
-version = "0.2.104"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "671c9a5a66f49d8a47345ab942e2cb93c7d1d0339065d4f8139c486121b43b19"
-dependencies = [
- "bumpalo",
- "log",
- "proc-macro2",
- "quote",
- "syn",
- "wasm-bindgen-shared",
-]
-
-[[package]]
name = "wasm-bindgen-futures"
-version = "0.4.54"
+version = "0.4.55"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7e038d41e478cc73bae0ff9b36c60cff1c98b8f38f8d7e8061e79ee63608ac5c"
+checksum = "551f88106c6d5e7ccc7cd9a16f312dd3b5d36ea8b4954304657d5dfba115d4a0"
dependencies = [
"cfg-if",
"js-sys",
@@ -2683,9 +2671,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro"
-version = "0.2.104"
+version = "0.2.105"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7ca60477e4c59f5f2986c50191cd972e3a50d8a95603bc9434501cf156a9a119"
+checksum = "04264334509e04a7bf8690f2384ef5265f05143a4bff3889ab7a3269adab59c2"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
@@ -2693,31 +2681,31 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro-support"
-version = "0.2.104"
+version = "0.2.105"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9f07d2f20d4da7b26400c9f4a0511e6e0345b040694e8a75bd41d578fa4421d7"
+checksum = "420bc339d9f322e562942d52e115d57e950d12d88983a14c79b86859ee6c7ebc"
dependencies = [
+ "bumpalo",
"proc-macro2",
"quote",
"syn",
- "wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
-version = "0.2.104"
+version = "0.2.105"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "bad67dc8b2a1a6e5448428adec4c3e84c43e561d8c9ee8a9e5aabeb193ec41d1"
+checksum = "76f218a38c84bcb33c25ec7059b07847d465ce0e0a76b995e134a45adcb6af76"
dependencies = [
"unicode-ident",
]
[[package]]
name = "web-sys"
-version = "0.3.81"
+version = "0.3.82"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9367c417a924a74cae129e6a2ae3b47fabb1f8995595ab474029da749a8be120"
+checksum = "3a1f95c0d03a47f4ae1f7a64643a6bb97465d9b740f0fa8f90ea33915c99a9a1"
dependencies = [
"js-sys",
"wasm-bindgen",
diff --git a/Cargo.toml b/Cargo.toml
@@ -23,6 +23,9 @@ serde_json = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_path_to_error = "0.1"
serde_urlencoded = "0.7"
+serde_with = { version = "3.11.0", default-features = false, features = [
+ "macros",
+] }
tokio = { version = "1.42", features = ["macros"] }
axum = "0.8"
sqlx = { version = "0.8", default-features = false, features = [
diff --git a/common/taler-api/Cargo.toml b/common/taler-api/Cargo.toml
@@ -24,6 +24,7 @@ url.workspace = true
thiserror.workspace = true
taler-common.workspace = true
sqlx.workspace = true
+jiff.workspace = true
[dev-dependencies]
taler-test-utils.workspace = true
diff --git a/common/taler-common/Cargo.toml b/common/taler-common/Cargo.toml
@@ -8,9 +8,6 @@ repository.workspace = true
license-file.workspace = true
[dependencies]
-serde_with = { version = "3.11.0", default-features = false, features = [
- "macros",
-] }
glob = "0.3"
indexmap = "2.7"
tempfile.workspace = true
@@ -18,6 +15,7 @@ jiff.workspace = true
serde.workspace = true
serde_json = { workspace = true, features = ["raw_value"] }
serde_path_to_error.workspace = true
+serde_with.workspace = true
serde_urlencoded.workspace = true
url.workspace = true
thiserror.workspace = true
diff --git a/common/taler-common/src/config.rs b/common/taler-common/src/config.rs
@@ -733,6 +733,7 @@ impl<'cfg, 'arg> Section<'cfg, 'arg> {
self.value("path", option, |it| self.config.pathsub(it, 0))
}
+ /** Access [option] as UNIX permissions */
pub fn unix_mode(&self, option: &'arg str) -> Value<'arg, Permissions> {
self.value("unix mode", option, |it| {
u32::from_str_radix(it, 8)
@@ -787,6 +788,16 @@ impl<'cfg, 'arg> Section<'cfg, 'arg> {
pub fn postgres(&self, option: &'arg str) -> Value<'arg, sqlx::postgres::PgConnectOptions> {
self.parse("Postgres URI", option)
}
+
+ /** Access [option] as a timestamp */
+ pub fn timestamp(&self, option: &'arg str) -> Value<'arg, jiff::Timestamp> {
+ self.parse("Timestamp", option)
+ }
+
+ /** Access [option] as a date time */
+ pub fn date(&self, option: &'arg str) -> Value<'arg, jiff::civil::Date> {
+ self.parse("Date", option)
+ }
}
pub struct Value<'arg, T> {
diff --git a/common/taler-common/src/types/timestamp.rs b/common/taler-common/src/types/timestamp.rs
@@ -21,11 +21,8 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer, de::Error, ser::Se
use serde_json::Value;
/// <https://docs.taler.net/core/api-common.html#tsref-type-Timestamp>
-#[derive(Debug, Clone, Copy, PartialEq, Eq)]
-pub enum Timestamp {
- Never,
- Time(jiff::Timestamp),
-}
+#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
+pub struct Timestamp(jiff::Timestamp);
#[derive(Serialize, Deserialize)]
struct TimestampImpl {
@@ -35,7 +32,7 @@ struct TimestampImpl {
impl Timestamp {
/** Timestamp corresponding to "now" */
pub fn now() -> Self {
- Self::Time(jiff::Timestamp::now())
+ Self(jiff::Timestamp::now())
}
/** Timestamp corresponding to now as it would be stored in db */
@@ -46,25 +43,31 @@ impl Timestamp {
/** Timestamp corresponding to "never" */
pub const fn never() -> Self {
- Self::Never
+ Self(jiff::Timestamp::MAX)
+ }
+
+ /** Whether timestamp correspond to "never" */
+ pub fn is_never(&self) -> bool {
+ self.0 == jiff::Timestamp::MAX
}
/** I64 equivalent of this timestamp for db storage */
pub fn as_sql_micros(&self) -> i64 {
- match self {
- Timestamp::Never => i64::MAX,
- Timestamp::Time(timestamp) => timestamp.as_microsecond(),
+ if self.is_never() {
+ i64::MAX
+ } else {
+ self.0.as_microsecond()
}
}
/** Timestamp equivalent of as i64 as stored in db */
pub fn from_sql_micros(micros: i64) -> Result<Self, String> {
if micros == i64::MAX {
- Ok(Self::Never)
+ Ok(Self::never())
} else {
let timestamp = jiff::Timestamp::from_microsecond(micros)
.map_err(|e| format!("expected timestamp micros got overflowing {micros}: {e}"))?;
- Ok(Self::Time(timestamp))
+ Ok(Self(timestamp))
}
}
}
@@ -79,13 +82,13 @@ impl<'de> Deserialize<'de> for Timestamp {
Value::Number(s) => {
if let Some(since_epoch_s) = s.as_i64() {
jiff::Timestamp::from_second(since_epoch_s)
- .map(Self::Time)
+ .map(Self)
.map_err(Error::custom)
} else {
Err(Error::custom("Expected epoch time"))
}
}
- Value::String(str) if str == "never" => Ok(Self::Never),
+ Value::String(str) if str == "never" => Ok(Self::never()),
_ => Err(Error::custom("Expected epoch time or 'never'")),
}
}
@@ -97,10 +100,11 @@ impl Serialize for Timestamp {
S: Serializer,
{
let mut se_struct = se.serialize_struct("Timestamp", 1)?;
- match self {
- Timestamp::Never => se_struct.serialize_field("t_s", "never")?,
- Timestamp::Time(time) => se_struct.serialize_field("t_s", &time.as_second())?,
- };
+ if self.is_never() {
+ se_struct.serialize_field("t_s", "never")?
+ } else {
+ se_struct.serialize_field("t_s", &self.0.as_second())?
+ }
se_struct.end()
}
@@ -108,16 +112,17 @@ impl Serialize for Timestamp {
impl Display for Timestamp {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- match self {
- Timestamp::Never => f.write_str("never"),
- Timestamp::Time(timestamp) => timestamp.fmt(f),
+ if self.is_never() {
+ f.write_str("never")
+ } else {
+ self.0.fmt(f)
}
}
}
impl From<jiff::Timestamp> for Timestamp {
fn from(time: jiff::Timestamp) -> Self {
- Self::Time(time)
+ Self(time)
}
}
@@ -135,9 +140,18 @@ impl Add<jiff::Span> for Timestamp {
type Output = Self;
fn add(self, rhs: jiff::Span) -> Self::Output {
- match self {
- Timestamp::Never => Self::never(),
- Timestamp::Time(timestamp) => Self::Time(timestamp + rhs),
- }
+ Self(self.0 + rhs)
+ }
+}
+
+impl PartialEq<jiff::Timestamp> for Timestamp {
+ fn eq(&self, other: &jiff::Timestamp) -> bool {
+ self.0.eq(other)
+ }
+}
+
+impl PartialOrd<jiff::Timestamp> for Timestamp {
+ fn partial_cmp(&self, other: &jiff::Timestamp) -> Option<std::cmp::Ordering> {
+ self.0.partial_cmp(other)
}
}
diff --git a/taler-magnet-bank/Cargo.toml b/taler-magnet-bank/Cargo.toml
@@ -28,6 +28,7 @@ taler-common.workspace = true
taler-api.workspace = true
clap.workspace = true
serde.workspace = true
+serde_with.workspace = true
serde_path_to_error.workspace = true
serde_urlencoded.workspace = true
thiserror.workspace = true
diff --git a/taler-magnet-bank/db/magnet-bank-procedures.sql b/taler-magnet-bank/db/magnet-bank-procedures.sql
@@ -124,6 +124,7 @@ CREATE FUNCTION register_tx_out(
IN in_valued_at INT8,
IN in_wtid BYTEA,
IN in_origin_exchange_url TEXT,
+ IN in_bounced INT8,
IN in_now INT8,
-- Success return
OUT out_tx_row_id INT8,
@@ -172,6 +173,14 @@ IF out_new THEN
);
-- Notify new outgoing talerable transaction registration
PERFORM pg_notify('taler_out', out_tx_row_id || '');
+ ELSIF in_bounced IS NOT NULL THEN
+ UPDATE initiated
+ SET
+ tx_out_id = out_tx_row_id,
+ status = 'success',
+ status_msg = NULL
+ FROM bounced JOIN tx_in USING (tx_in_id)
+ WHERE initiated.initiated_id = bounced.initiated_id AND tx_in.magnet_code = in_bounced;
END IF;
END IF;
END $$;
diff --git a/taler-magnet-bank/magnet-bank.conf b/taler-magnet-bank/magnet-bank.conf
@@ -22,6 +22,12 @@ API_URL = "https://mobil.magnetbank.hu"
# This can either can be normal or exchange.
# Exchange accounts bounce invalid incoming Taler transactions.
ACCOUNT_TYPE = exchange
+
+# Ignore all transactions prior to a certain date, useful when you want to use an existing account with old transactions that should not be bounced.
+# IGNORE_TRANSACTIONS_BEFORE = YYYY-MM-DD
+
+# Ignore all malformed transactions prior to a certain date, useful when you want to import old transactions without bouncing the malformed ones a second time
+# IGNORE_BOUNCES_BEFORE = YYYY-MM-DD
[magnet-bank-httpd]
# How "taler-magnet-bank serve" serves its API, this can either be tcp or unix
diff --git a/taler-magnet-bank/src/bin/magnet-bank-harness.rs b/taler-magnet-bank/src/bin/magnet-bank-harness.rs
@@ -0,0 +1,225 @@
+/*
+ This file is part of TALER
+ Copyright (C) 2025 Taler Systems SA
+
+ TALER is free software; you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License along with
+ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+*/
+
+use std::time::Duration;
+
+use clap::Parser as _;
+use jiff::{Timestamp, Zoned};
+use p256::ecdsa::SigningKey;
+use taler_common::{
+ CommonArgs,
+ cli::long_version,
+ db::{dbinit, pool},
+ taler_main,
+};
+use taler_magnet_bank::{
+ CONFIG_SOURCE,
+ config::{HarnessCfg, parse_db_cfg},
+ magnet::{Account, ApiClient, AuthClient},
+ setup,
+ worker::Worker,
+};
+
+/// Taler Magnet Bank Adapter harness test suite
+#[derive(clap::Parser, Debug)]
+#[command(long_version = long_version(), about, long_about = None)]
+struct Args {
+ #[clap(flatten)]
+ common: CommonArgs,
+
+ /// Reset database (DANGEROUS: All existing data is lost)
+ #[clap(long, short)]
+ reset: bool,
+}
+
+struct HarnessClient<'a> {
+ api: &'a ApiClient<'a>,
+ exchange: &'a Account,
+ client: &'a Account,
+ signing_key: &'a SigningKey,
+}
+
+impl HarnessClient<'_> {
+ async fn expect_balance(&self, exchange: u32, client: u32) -> anyhow::Result<()> {
+ let mut attemps = 0;
+ loop {
+ let current = self.balance().await?;
+ if current == (exchange, client) {
+ return Ok(());
+ }
+ if attemps > 20 {
+ assert_eq!(current, (exchange, client));
+ }
+ attemps += 1;
+ tokio::time::sleep(Duration::from_secs(1)).await;
+ }
+ }
+
+ async fn balance(&self) -> anyhow::Result<(u32, u32)> {
+ let exchange_balance = self.api.balance_mini(self.exchange.iban.bban()).await?;
+ let client_balance = self.api.balance_mini(self.client.iban.bban()).await?;
+ return Ok((
+ exchange_balance.balance as u32,
+ client_balance.balance as u32,
+ ));
+ }
+
+ async fn send_tx(
+ &self,
+ from: &Account,
+ to: &Account,
+ subject: &str,
+ amount: u32,
+ ) -> anyhow::Result<()> {
+ let now = Zoned::now();
+ let info = self
+ .api
+ .init_tx(
+ from.code,
+ amount as f64,
+ subject,
+ &now.date(),
+ "Name",
+ to.iban.bban(),
+ )
+ .await?;
+ self.api
+ .sign_tx(
+ self.signing_key,
+ &from.number,
+ info.code,
+ info.amount,
+ &now.date(),
+ to.iban.bban(),
+ )
+ .await?;
+ Ok(())
+ }
+
+ async fn client_send(&self, subject: &str, amount: u32) -> anyhow::Result<()> {
+ self.send_tx(self.client, self.exchange, subject, amount)
+ .await
+ }
+}
+
+fn main() {
+ let args = Args::parse();
+ taler_main(CONFIG_SOURCE, args.common, |cfg| async move {
+ // Prepare db
+ let db_cfg = parse_db_cfg(&cfg)?;
+ let pool = pool(db_cfg.cfg, "magnet_bank").await?;
+ let mut db = pool.acquire().await?.detach();
+ dbinit(&mut db, db_cfg.sql_dir.as_ref(), "magnet-bank", args.reset).await?;
+
+ let cfg = HarnessCfg::parse(&cfg)?;
+ let keys = setup::load(&cfg.worker)?;
+ let client = reqwest::Client::new();
+ let client = AuthClient::new(&client, &cfg.worker.api_url, &cfg.worker.consumer)
+ .upgrade(&keys.access_token);
+ let exchange_account = client.account(cfg.worker.payto.bban()).await?;
+ let client_account = client.account(cfg.client_payto.bban()).await?;
+
+ let mut worker = Worker {
+ client: &client,
+ db: &mut db,
+ account_number: &exchange_account.number,
+ account_code: exchange_account.code,
+ key: &keys.signing_key,
+ account_type: cfg.worker.account_type,
+ ignore_tx_before: cfg.worker.ignore_tx_before,
+ ignore_bounces_before: cfg.worker.ignore_bounces_before,
+ };
+ let harness = HarnessClient {
+ api: &client,
+ exchange: &exchange_account,
+ client: &client_account,
+ signing_key: &keys.signing_key,
+ };
+
+ // Fill existing info
+ worker.run().await?;
+
+ let now = Timestamp::now();
+
+ // Load current balance
+ let (mut exchange_balance, mut client_balance) = harness.balance().await?;
+ // Send malformed transaction
+ let amount = 34;
+ harness
+ .client_send(&format!("Malformed test {now}"), amount)
+ .await?;
+ // Wait for transaction to finalize
+ harness
+ .expect_balance(exchange_balance + amount, client_balance - amount)
+ .await?;
+ // Sync and bounce
+ worker.run().await?;
+ // Wait for bounce to finalize
+ harness
+ .expect_balance(exchange_balance, client_balance)
+ .await?;
+
+ /*
+ harness.client_send("subject", 4).await?;
+ exchange_balance += 4;
+ client_balance -= 4;
+ harness
+ .expect_balance(exchange_balance, client_balance)
+ .await?;*/
+
+ /*send_tx(
+ &client,
+ &keys.signing_key,
+ &client_account,
+ &exchange_account,
+ "Test tx",
+ 33,
+ )
+ .await?;
+
+ let exchange_balance = client.balance_mini(exchange_account.iban.bban()).await?;
+ let client_balance = client.balance_mini(client_account.iban.bban()).await?;
+ dbg!(exchange_balance, client_balance);*/
+
+ // Println WTF
+ /*
+ let wtid = EddsaPublicKey::rand();
+ let now = Zoned::now();
+ let info = client
+ .init_tx(
+ client_account.code,
+ 123.0,
+ &format!("Taler test {wtid}"),
+ &now.date(),
+ "Name",
+ exchange_account.iban.bban(),
+ )
+ .await?;
+ worker.run().await?;
+ client
+ .sign_tx(
+ &keys.signing_key,
+ &client_account.number,
+ info.code,
+ info.amount,
+ &now.date(),
+ exchange_account.iban.bban(),
+ )
+ .await?;
+ worker.run().await?;*/
+ Ok(())
+ });
+}
diff --git a/taler-magnet-bank/src/config.rs b/taler-magnet-bank/src/config.rs
@@ -14,6 +14,7 @@
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
+use jiff::tz::TimeZone;
use reqwest::Url;
use taler_api::{
Serve,
@@ -21,6 +22,7 @@ use taler_api::{
};
use taler_common::{
config::{Config, ValueErr},
+ map_config,
types::payto::PaytoURI,
};
@@ -38,6 +40,7 @@ pub fn parse_account_payto(cfg: &Config) -> Result<FullHuPayto, ValueErr> {
Ok(FullHuPayto::new(iban, name))
}
+/// taler-magnet-bank httpd config
pub struct ServeCfg {
pub payto: PaytoURI,
pub serve: Serve,
@@ -65,11 +68,21 @@ impl ServeCfg {
}
}
+#[derive(Debug, Clone, Copy)]
+pub enum AccountType {
+ Exchange,
+ Normal,
+}
+
+/// taler-magnet-bank worker config
pub struct WorkerCfg {
pub payto: FullHuPayto,
pub api_url: Url,
pub consumer: Token,
pub keys_path: String,
+ pub account_type: AccountType,
+ pub ignore_tx_before: Option<jiff::Timestamp>,
+ pub ignore_bounces_before: Option<jiff::Timestamp>,
}
impl WorkerCfg {
@@ -78,12 +91,46 @@ impl WorkerCfg {
let sect = cfg.section("magnet-bank-worker");
Ok(Self {
payto,
+ account_type: map_config!(sect, "account type", "ACCOUNT_TYPE",
+ "exchange" => { Ok(AccountType::Exchange) },
+ "normal" => { Ok(AccountType::Normal) }
+ )
+ .require()?,
api_url: sect.parse("URL", "API_URL").require()?,
consumer: Token {
key: sect.str("CONSUMER_KEY").require()?,
secret: sect.str("CONSUMER_SECRET").require()?,
},
keys_path: sect.path("KEYS_FILE").require()?,
+ ignore_tx_before: sect
+ .date("IGNORE_TRANSACTIONS_BEFORE")
+ .opt()?
+ .map(|d| d.to_zoned(TimeZone::system()).unwrap().timestamp()),
+ ignore_bounces_before: sect
+ .date("IGNORE_BOUNCES_BEFORE")
+ .opt()?
+ .map(|d| d.to_zoned(TimeZone::system()).unwrap().timestamp()),
+ })
+ }
+}
+
+/// magnet-bank-harness config
+pub struct HarnessCfg {
+ pub worker: WorkerCfg,
+ pub client_payto: FullHuPayto,
+}
+
+impl HarnessCfg {
+ pub fn parse(cfg: &Config) -> Result<Self, ValueErr> {
+ let worker = WorkerCfg::parse(cfg)?;
+
+ let sect = cfg.section("magnet-bank-harness");
+ let iban: HuIban = sect.parse("iban", "IBAN").require()?;
+ let name = sect.str("NAME").require()?;
+
+ Ok(Self {
+ worker,
+ client_payto: FullHuPayto::new(iban, name),
})
}
}
diff --git a/taler-magnet-bank/src/db.rs b/taler-magnet-bank/src/db.rs
@@ -68,14 +68,19 @@ pub struct TxIn {
impl Display for TxIn {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- let TxIn {
+ let Self {
code,
amount,
subject,
debtor,
value_date,
} = self;
- write!(f, "{value_date} {amount} {code} {debtor} '{subject}'")
+ write!(
+ f,
+ "{value_date} {amount} {code} ({} {}) '{subject}'",
+ debtor.bban(),
+ debtor.name
+ )
}
}
@@ -90,14 +95,44 @@ pub struct TxOut {
impl Display for TxOut {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- let TxOut {
+ let Self {
code,
amount,
subject,
creditor,
value_date,
} = self;
- write!(f, "{value_date} {amount} {code} {creditor} '{subject}'")
+ write!(
+ f,
+ "{value_date} {amount} {code} ({} {}) '{subject}'",
+ creditor.bban(),
+ &creditor.name
+ )
+ }
+}
+
+#[derive(Debug, PartialEq, Eq)]
+pub struct Initiated {
+ pub id: u64,
+ pub amount: Amount,
+ pub subject: String,
+ pub creditor: FullHuPayto,
+}
+
+impl Display for Initiated {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ let Self {
+ id,
+ amount,
+ subject,
+ creditor,
+ } = self;
+ write!(
+ f,
+ "{id} {amount} ({} {}) '{subject}'",
+ creditor.bban(),
+ &creditor.name
+ )
}
}
@@ -125,24 +160,6 @@ pub enum AddIncomingResult {
ReservePubReuse,
}
-#[derive(Debug, PartialEq, Eq)]
-pub struct Initiated {
- pub id: u64,
- pub amount: Amount,
- pub subject: String,
- pub creditor: FullHuPayto,
-}
-
-impl Display for Initiated {
- fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
- write!(
- f,
- "{} {} {} '{}'",
- self.id, self.amount, self.creditor, self.subject
- )
- }
-}
-
pub async fn register_tx_in_admin(
db: &PgPool,
tx: &TxInAdmin,
@@ -212,16 +229,23 @@ pub async fn register_tx_in(
.await
}
+#[derive(Debug)]
+pub enum TxOutKind {
+ Simple,
+ Bounce(u32),
+ Talerable(OutgoingSubject),
+}
+
pub async fn register_tx_out(
db: &mut PgConnection,
tx: &TxOut,
- subject: &Option<OutgoingSubject>,
+ kind: &TxOutKind,
now: &Timestamp,
) -> sqlx::Result<AddOutgoingResult> {
- sqlx::query(
+ let query = sqlx::query(
"
SELECT out_new, out_tx_row_id
- FROM register_tx_out($1, ($2, $3)::taler_amount, $4, $5, $6, $7, $8, $9, $10)
+ FROM register_tx_out($1, ($2, $3)::taler_amount, $4, $5, $6, $7, $8, $9, $10, $11)
",
)
.bind(tx.code as i64)
@@ -229,18 +253,31 @@ pub async fn register_tx_out(
.bind(&tx.subject)
.bind(tx.creditor.iban())
.bind(&tx.creditor.name)
- .bind_timestamp(&tx.value_date)
- .bind(subject.as_ref().map(|it| it.0.as_ref()))
- .bind(subject.as_ref().map(|it| it.1.as_str()))
- .bind_timestamp(now)
- .try_map(|r: PgRow| {
- Ok(AddOutgoingResult {
- new: r.try_get(0)?,
- row_id: r.try_get_u64(1)?,
+ .bind_timestamp(&tx.value_date);
+ let query = match kind {
+ TxOutKind::Simple => query
+ .bind(None::<&[u8]>)
+ .bind(None::<&str>)
+ .bind(None::<i64>),
+ TxOutKind::Bounce(bounced) => query
+ .bind(None::<&[u8]>)
+ .bind(None::<&str>)
+ .bind(*bounced as i64),
+ TxOutKind::Talerable(subject) => query
+ .bind(subject.0.as_ref())
+ .bind(subject.1.as_ref())
+ .bind(None::<i64>),
+ };
+ query
+ .bind_timestamp(now)
+ .try_map(|r: PgRow| {
+ Ok(AddOutgoingResult {
+ new: r.try_get(0)?,
+ row_id: r.try_get_u64(1)?,
+ })
})
- })
- .fetch_one(db)
- .await
+ .fetch_one(db)
+ .await
}
#[derive(Debug, PartialEq, Eq)]
@@ -552,7 +589,9 @@ pub async fn pending_batch<'a>(
"
SELECT initiated_id, (amount).val, (amount).frac, subject, credit_account, credit_name
FROM initiated
- WHERE magnet_code IS NULL AND (last_submitted IS NULL OR last_submitted < $1)
+ WHERE magnet_code IS NULL
+ AND status='pending'
+ AND (last_submitted IS NULL OR last_submitted < $1)
LIMIT 100
",
)
@@ -589,8 +628,8 @@ pub async fn initiated_submit_success<'a>(
Ok(())
}
-/** Update status of a successful submitted initiated transaction */
-pub async fn initiated_submit_failure<'a>(
+/** Update status of a permanently failed initiated transaction */
+pub async fn initiated_submit_permanent_failure<'a>(
db: impl PgExecutor<'a>,
id: u64,
timestamp: &Timestamp,
@@ -599,7 +638,7 @@ pub async fn initiated_submit_failure<'a>(
sqlx::query(
"
UPDATE initiated
- SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, status_msg=$2
+ SET status='permanent_failure', status_msg=$2
WHERE initiated_id=$3
",
)
@@ -613,7 +652,6 @@ pub async fn initiated_submit_failure<'a>(
#[cfg(test)]
mod test {
-
use jiff::Span;
use sqlx::{PgConnection, PgPool, postgres::PgRow};
use taler_api::{
@@ -633,7 +671,7 @@ mod test {
constant::CURRENCY,
db::{
self, AddIncomingResult, AddOutgoingResult, BounceResult, Initiated, TransferResult,
- TxIn, TxOut, make_transfer, register_bounce_tx_in, register_tx_in,
+ TxIn, TxOut, TxOutKind, make_transfer, register_bounce_tx_in, register_tx_in,
register_tx_in_admin, register_tx_out,
},
magnet_payto,
@@ -855,11 +893,7 @@ mod test {
async fn tx_out() {
let (mut db, pool) = setup().await;
- async fn routine(
- db: &mut PgConnection,
- first: &Option<OutgoingSubject>,
- second: &Option<OutgoingSubject>,
- ) {
+ async fn routine(db: &mut PgConnection, first: &TxOutKind, second: &TxOutKind) {
let (id, code) =
sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_out")
.try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?)))
@@ -879,7 +913,7 @@ mod test {
};
// Insert
assert_eq!(
- register_tx_out(db, &tx, &first, &now)
+ register_tx_out(&mut *db, &tx, first, &now)
.await
.expect("register tx out"),
AddOutgoingResult {
@@ -890,12 +924,12 @@ mod test {
// Idempotent
assert_eq!(
register_tx_out(
- db,
+ &mut *db,
&TxOut {
value_date: later,
..tx.clone()
},
- &first,
+ first,
&now
)
.await
@@ -908,13 +942,13 @@ mod test {
// Many
assert_eq!(
register_tx_out(
- db,
+ &mut *db,
&TxOut {
code: code + 1,
value_date: later,
..tx.clone()
},
- &second,
+ second,
&now
)
.await
@@ -935,22 +969,25 @@ mod test {
);
// Regular transaction
- routine(&mut db, &None, &None).await;
+ routine(&mut db, &TxOutKind::Simple, &TxOutKind::Simple).await;
// Talerable transaction
routine(
&mut db,
- &Some(OutgoingSubject(
+ &TxOutKind::Talerable(OutgoingSubject(
ShortHashCode::rand(),
url("https://exchange.com"),
)),
- &Some(OutgoingSubject(
+ &TxOutKind::Talerable(OutgoingSubject(
ShortHashCode::rand(),
url("https://exchange.com"),
)),
)
.await;
+ // Bounced transaction
+ routine(&mut db, &TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await;
+
// History
assert_eq!(
db::outgoing_history(&pool, &History::default(), fake_listen)
@@ -1224,7 +1261,7 @@ mod test {
let (mut db, _) = setup().await;
// Unknown transfer
- db::initiated_submit_failure(&mut db, 1, &Timestamp::now(), "msg")
+ db::initiated_submit_permanent_failure(&mut db, 1, &Timestamp::now(), "msg")
.await
.unwrap();
db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12)
@@ -1304,9 +1341,9 @@ mod test {
.expect("pending_batch");
assert_eq!(pendings.len(), 93);
- // Skip tried since start
+ // Skip failed
for i in 0..=10 {
- db::initiated_submit_failure(&mut db, 10 + i, &Timestamp::now(), "failure")
+ db::initiated_submit_permanent_failure(&mut db, 10 + i, &Timestamp::now(), "failure")
.await
.expect("status failure");
}
@@ -1314,9 +1351,5 @@ mod test {
.await
.expect("pending_batch");
assert_eq!(pendings.len(), 83);
- let pendings = db::pending_batch(&mut db, &Timestamp::now())
- .await
- .expect("pending_batch");
- assert_eq!(pendings.len(), 93);
}
}
diff --git a/taler-magnet-bank/src/lib.rs b/taler-magnet-bank/src/lib.rs
@@ -42,7 +42,9 @@ pub const MAX_MAGNET_BBAN_SIZE: usize = 24;
pub const CONFIG_SOURCE: ConfigSource =
ConfigSource::new("taler-magnet-bank", "magnet-bank", "taler-magnet-bank");
-#[derive(Debug, Clone, PartialEq, Eq)]
+#[derive(
+ Debug, Clone, PartialEq, Eq, serde_with::DeserializeFromStr, serde_with::SerializeDisplay,
+)]
pub struct HuIban(IBAN);
impl HuIban {
@@ -165,6 +167,12 @@ impl FromStr for HuIban {
}
}
+impl std::fmt::Display for HuIban {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ self.0.fmt(f)
+ }
+}
+
/// Parse a magnet payto URI, panic if malformed
pub fn magnet_payto(url: impl AsRef<str>) -> FullHuPayto {
url.as_ref().parse().expect("invalid magnet payto")
diff --git a/taler-magnet-bank/src/magnet.rs b/taler-magnet-bank/src/magnet.rs
@@ -24,9 +24,12 @@ use p256::{
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use spki::EncodePublicKey;
-use taler_common::types::{amount, iban::IBAN};
+use taler_common::types::amount;
-use crate::magnet::{error::MagnetBuilder, oauth::OAuthBuilder};
+use crate::{
+ HuIban,
+ magnet::{error::MagnetBuilder, oauth::OAuthBuilder},
+};
pub mod error;
mod oauth;
@@ -123,7 +126,7 @@ pub struct Account {
#[serde(rename = "deviza")]
pub currency: Currency,
#[serde(rename = "ibanSzamlaszam")]
- pub iban: IBAN,
+ pub iban: HuIban,
#[serde(rename = "kod")]
pub code: u64,
#[serde(rename = "szamlaszam")]
@@ -138,6 +141,14 @@ pub struct Account {
}
#[derive(Debug, Deserialize)]
+pub struct BalanceMini {
+ #[serde(rename = "pozicio")]
+ pub balance: f64,
+ #[serde(rename = "frissites")]
+ pub last_update_time: jiff::Timestamp,
+}
+
+#[derive(Debug, Deserialize)]
pub struct PartnerAccounts {
pub partner: Partner,
#[serde(rename = "bankszamlaList")]
@@ -428,6 +439,15 @@ impl ApiClient<'_> {
.account)
}
+ pub async fn balance_mini(&self, bban: &str) -> ApiResult<BalanceMini> {
+ self.client
+ .get(self.join(&format!("/RESTApi/resources/v2/egyenleg/{bban}/szukitett")))
+ .oauth(self.consumer, Some(self.access), None)
+ .await
+ .magnet_json()
+ .await
+ }
+
pub async fn page_tx(
&self,
direction: Direction,
@@ -448,7 +468,7 @@ impl ApiClient<'_> {
req = req.query(&[("statusz", status)]);
}
req.query(&[("terheles", direction)])
- .query(&[("tranzakciofrissites", true)])
+ .query(&[("tranzakciofrissites", true), ("ascending", true)])
.oauth(self.consumer, Some(self.access), None)
.await
.magnet_call()
diff --git a/taler-magnet-bank/src/main.rs b/taler-magnet-bank/src/main.rs
@@ -129,6 +129,9 @@ async fn run(cmd: Command, cfg: &Config) -> anyhow::Result<()> {
account_number: &account.number,
account_code: account.code,
key: &keys.signing_key,
+ account_type: cfg.account_type,
+ ignore_tx_before: cfg.ignore_tx_before,
+ ignore_bounces_before: cfg.ignore_bounces_before,
};
worker.run().await?;
}
diff --git a/taler-magnet-bank/src/setup.rs b/taler-magnet-bank/src/setup.rs
@@ -153,7 +153,7 @@ pub async fn setup(cfg: WorkerCfg, reset: bool) -> anyhow::Result<()> {
let mut ibans = Vec::new();
for partner in res.partners {
for account in partner.bank_accounts {
- if cfg.payto.0 == account.iban {
+ if *cfg.payto == account.iban {
if partner.partner.name != cfg.payto.name {
warn!(
"Expected name '{}' from config got '{}' from bank",
diff --git a/taler-magnet-bank/src/worker.rs b/taler-magnet-bank/src/worker.rs
@@ -14,6 +14,8 @@
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
+use std::num::ParseIntError;
+
use jiff::{Zoned, civil::Date};
use p256::ecdsa::SigningKey;
use sqlx::PgConnection;
@@ -23,13 +25,17 @@ use taler_common::types::{
iban::IBAN,
timestamp::Timestamp,
};
-use tracing::{debug, info};
+use tracing::{debug, error, info, trace, warn};
use crate::{
FullHuPayto, HuIban,
- db::{self, AddIncomingResult, Initiated, TxIn, TxOut},
+ config::AccountType,
+ db::{self, AddIncomingResult, Initiated, TxIn, TxOut, TxOutKind},
failure_injection::fail_point,
- magnet::{ApiClient, Direction, Transaction, error::ApiError},
+ magnet::{
+ ApiClient, Direction, Transaction,
+ error::{ApiError, MagnetError},
+ },
};
#[derive(Debug, thiserror::Error)]
@@ -48,13 +54,16 @@ pub struct Worker<'a> {
pub account_number: &'a str,
pub account_code: u64,
pub key: &'a SigningKey,
+ pub account_type: AccountType,
+ pub ignore_tx_before: Option<jiff::Timestamp>,
+ pub ignore_bounces_before: Option<jiff::Timestamp>,
}
impl Worker<'_> {
/// Run a single worker pass
pub async fn run(&mut self) -> WorkerResult {
// Sync transactions
- let mut next = None;
+ let mut next = None; // TODO Load current state from the db
loop {
let page = self
.client
@@ -65,74 +74,168 @@ impl Worker<'_> {
let tx = extract_tx_info(item.tx);
match tx {
Tx::In(tx_in) => {
+ if let Some(before) = self.ignore_tx_before
+ && tx_in.value_date < before
+ {
+ debug!(target: "worker", "ignore in {tx_in}");
+ continue;
+ }
let bounce = async |db: &mut PgConnection,
reason: &str|
-> Result<(), WorkerError> {
- let res = db::register_bounce_tx_in(
- db,
- &tx_in,
- &tx_in.amount,
- reason,
- &Timestamp::now(),
- )
- .await?;
-
- if res.tx_new {
- info!("incoming {tx_in} bounced in {}: {reason}", res.bounce_id);
+ if let Some(before) = self.ignore_bounces_before
+ && tx_in.value_date < before
+ {
+ match db::register_tx_in(db, &tx_in, &None, &Timestamp::now())
+ .await?
+ {
+ AddIncomingResult::Success { new, .. } => {
+ if new {
+ info!(target: "worker", "in {tx_in} skip bounce: {reason}");
+ } else {
+ debug!(target: "worker", "in {tx_in} already skil bounce ");
+ }
+ }
+ AddIncomingResult::ReservePubReuse => unreachable!(),
+ }
} else {
- debug!(
- "incoming {tx_in} already seen and bounced in {}: {reason}",
- res.bounce_id
- );
- }
- Ok(())
- };
- match parse_incoming_unstructured(&tx_in.subject) {
- Ok(None) => bounce(self.db, "missing public key").await?,
- Ok(Some(subject)) => {
- let res = db::register_tx_in(
- self.db,
+ let res = db::register_bounce_tx_in(
+ db,
&tx_in,
- &Some(subject),
+ &tx_in.amount,
+ reason,
&Timestamp::now(),
)
.await?;
- match res {
+
+ if res.tx_new {
+ info!(target: "worker",
+ "in {tx_in} bounced in {}: {reason}",
+ res.bounce_id
+ );
+ } else {
+ debug!(target: "worker",
+ "in {tx_in} already seen and bounced in {}: {reason}",
+ res.bounce_id
+ );
+ }
+ }
+ Ok(())
+ };
+ match self.account_type {
+ AccountType::Exchange => {
+ match parse_incoming_unstructured(&tx_in.subject) {
+ Ok(None) => bounce(self.db, "missing public key").await?,
+ Ok(Some(subject)) => match db::register_tx_in(
+ self.db,
+ &tx_in,
+ &Some(subject),
+ &Timestamp::now(),
+ )
+ .await?
+ {
+ AddIncomingResult::Success { new, .. } => {
+ if new {
+ info!(target: "worker", "in {tx_in}");
+ } else {
+ debug!(target: "worker", "in {tx_in} already seen");
+ }
+ }
+ AddIncomingResult::ReservePubReuse => {
+ bounce(self.db, "reserve pub reuse").await?
+ }
+ },
+ Err(e) => bounce(self.db, &e.to_string()).await?,
+ }
+ }
+ AccountType::Normal => {
+ match db::register_tx_in(self.db, &tx_in, &None, &Timestamp::now())
+ .await?
+ {
AddIncomingResult::Success { new, .. } => {
if new {
- info!("incoming {tx_in}");
+ info!(target: "worker", "in {tx_in}");
} else {
- debug!("incoming {tx_in}");
+ debug!(target: "worker", "in {tx_in} already seen");
}
}
- AddIncomingResult::ReservePubReuse => {
- bounce(self.db, "reserve pub reuse").await?
- }
+ AddIncomingResult::ReservePubReuse => unreachable!(),
}
}
- Err(e) => bounce(self.db, &e.to_string()).await?,
}
}
Tx::Out(tx_out) => {
- let subject = subject::parse_outgoing(&tx_out.subject);
- let res =
- db::register_tx_out(self.db, &tx_out, &subject.ok(), &Timestamp::now())
+ match self.account_type {
+ AccountType::Exchange => {
+ // TODO log status (known | recovered | founded)
+ if let Ok(subject) = subject::parse_outgoing(&tx_out.subject) {
+ let res = db::register_tx_out(
+ self.db,
+ &tx_out,
+ &TxOutKind::Talerable(subject),
+ &Timestamp::now(),
+ )
+ .await?;
+ if res.new {
+ info!(target: "worker", "out {tx_out}");
+ } else {
+ debug!(target: "worker", "out {tx_out} already seen");
+ }
+ } else if let Ok(bounced) = parse_bounce_outgoing(&tx_out.subject) {
+ let res = db::register_tx_out(
+ self.db,
+ &tx_out,
+ &TxOutKind::Bounce(bounced),
+ &Timestamp::now(),
+ )
+ .await?;
+ if res.new {
+ info!(target: "worker", "out (bounce) {tx_out}");
+ } else {
+ debug!(target: "worker", "out (bounce) {tx_out} already seen");
+ }
+ } else {
+ let res = db::register_tx_out(
+ self.db,
+ &tx_out,
+ &TxOutKind::Simple,
+ &Timestamp::now(),
+ )
+ .await?;
+ if res.new {
+ warn!(target: "worker", "out (malformed) {tx_out}");
+ } else {
+ debug!(target: "worker", "out (malformed) {tx_out} already seen");
+ }
+ }
+ }
+ AccountType::Normal => {
+ let res = db::register_tx_out(
+ self.db,
+ &tx_out,
+ &TxOutKind::Simple,
+ &Timestamp::now(),
+ )
.await?;
- // TODO log recovered & log malformed
- if res.new {
- info!("outgoing {tx_out}");
- } else {
- debug!("outgoing {tx_out} already seen");
+ if res.new {
+ info!(target: "worker", "out {tx_out}");
+ } else {
+ debug!(target: "worker", "out {tx_out} already seen");
+ }
+ }
}
}
}
}
+
if next.is_none() {
break;
+ } else {
+ // TODO Store current state in the db
}
}
- // Send transactions
+ // Send transactions
let start = Timestamp::now();
let now = Zoned::now();
loop {
@@ -141,6 +244,7 @@ impl Worker<'_> {
break;
}
for tx in batch {
+ debug!(target: "worker", "send tx {tx}");
self.create_tx(&tx, &now).await?;
}
}
@@ -157,18 +261,18 @@ impl Worker<'_> {
// we delete it
// TODO
self.client.delete_tx(tx.code).await?;
- debug!("outgoing {}: delete uncompleted orphan", tx.code);
+ debug!(target: "worker", "out {}: delete uncompleted orphan", tx.code);
Ok(())
}
/// Create and sign a forint transfer
pub async fn create_tx(&mut self, tx: &Initiated, now: &Zoned) -> WorkerResult {
- debug!("create tx {tx}");
+ trace!(target: "worker", "create tx {tx}");
assert_eq!(tx.amount.frac, 0);
let date = now.date();
// Initialize the new transaction, on failure an orphan initiated transaction can be created
- let info = self
+ let res = self
.client
.init_tx(
self.account_code,
@@ -178,12 +282,45 @@ impl Worker<'_> {
&tx.creditor.name,
tx.creditor.bban(),
)
- .await?;
- debug!("created tx {}", info.code);
- // TODO handle API errors gracefully
- fail_point("submit-success");
- // Update transaction status, on failure the initiated transaction will be orphan
- db::initiated_submit_success(&mut *self.db, tx.id, &Timestamp::now(), info.code).await?;
+ .await;
+ fail_point("submit-create-tx");
+ let info = match res {
+ // Check if succeeded
+ Ok(info) => {
+ // Update transaction status, on failure the initiated transaction will be orphan
+ db::initiated_submit_success(&mut *self.db, tx.id, &Timestamp::now(), info.code)
+ .await?;
+ info
+ }
+ // Check if error is permanent
+ Err(ApiError::Magnet(MagnetError {
+ error_code,
+ short_message,
+ long_message,
+ })) if matches!(
+ (error_code, short_message.as_str()),
+ (404, "BSZLA_NEM_TALALHATO") // Unknown account
+ | (409, "FORRAS_SZAMLA_ESZAMLA_EGYEZIK") // Same account
+ ) =>
+ {
+ let e = MagnetError {
+ error_code,
+ short_message,
+ long_message,
+ };
+ db::initiated_submit_permanent_failure(
+ &mut *self.db,
+ tx.id,
+ &Timestamp::now(),
+ &e.to_string(),
+ )
+ .await?;
+ error!(target: "worker", "initiated tx {tx} failed: {e}");
+ return WorkerResult::Ok(());
+ }
+ Err(e) => return WorkerResult::Err(WorkerError::Api(e)),
+ };
+ trace!(target: "worker", "created tx {}", info.code);
// Sign transaction
self.sign_tx(info.code, info.amount, &date, tx.creditor.bban())
@@ -199,7 +336,7 @@ impl Worker<'_> {
date: &Date,
creditor: &str,
) -> WorkerResult {
- debug!("sign tx {tx_code}");
+ debug!(target: "worker", "sign tx {tx_code}");
fail_point("sign-tx");
// Sign initiated transaction, on failure we will retry
self.client
@@ -212,7 +349,6 @@ impl Worker<'_> {
creditor,
)
.await?;
- // TODO handle API errors gracefully
Ok(())
}
}
@@ -251,3 +387,24 @@ pub fn extract_tx_info(tx: Transaction) -> Tx {
})
}
}
+
+#[derive(Debug, thiserror::Error)]
+pub enum BounceSubjectErr {
+ #[error("missing parts")]
+ MissingParts,
+ #[error("not a bounce")]
+ NotBounce,
+ #[error("malformed bounced id: {0}")]
+ Id(#[from] ParseIntError),
+}
+
+pub fn parse_bounce_outgoing(subject: &str) -> Result<u32, BounceSubjectErr> {
+ let (prefix, id) = subject
+ .rsplit_once(" ")
+ .ok_or(BounceSubjectErr::MissingParts)?;
+ if !prefix.starts_with("bounce") {
+ return Err(BounceSubjectErr::NotBounce);
+ }
+ let id: u32 = id.parse()?;
+ Ok(id)
+}
diff --git a/taler-magnet-bank/tests/api.rs b/taler-magnet-bank/tests/api.rs
@@ -24,7 +24,12 @@ use taler_common::{
api_wire::{OutgoingHistory, TransferState, WireConfig},
types::{amount::amount, payto::payto, timestamp::Timestamp, url},
};
-use taler_magnet_bank::{CONFIG_SOURCE, api::MagnetApi, db, magnet_payto};
+use taler_magnet_bank::{
+ CONFIG_SOURCE,
+ api::MagnetApi,
+ db::{self, TxOutKind},
+ magnet_payto,
+};
use taler_test_utils::{
Router, db_test_setup,
routine::{admin_add_incoming_routine, revenue_routine, routine_pagination, transfer_routine},
@@ -100,7 +105,7 @@ async fn outgoing_history() {
),
value_date: now,
},
- &Some(OutgoingSubject(
+ &TxOutKind::Talerable(OutgoingSubject(
ShortHashCode::rand(),
url("https://exchange.test"),
)),