commit 2ee83d5e05d211489505bdc46154842d2914a7c2
parent 02ba8844ac5e62515dd6b4bbba900cf27cf4582b
Author: Antoine A <>
Date: Fri, 19 Dec 2025 19:01:35 +0100
cyclos: improve chargeback handling
Diffstat:
16 files changed, 329 insertions(+), 151 deletions(-)
diff --git a/common/taler-api/tests/common/mod.rs b/common/taler-api/tests/common/mod.rs
@@ -36,7 +36,7 @@ use taler_common::{
error_code::ErrorCode,
types::{amount::Currency, payto::payto},
};
-use taler_test_utils::db_test_setup_manual;
+use taler_test_utils::db::db_test_setup_manual;
use tokio::sync::watch::Sender;
pub mod db;
diff --git a/common/taler-common/src/api_wire.rs b/common/taler-common/src/api_wire.rs
@@ -172,6 +172,7 @@ pub enum TransferState {
pending,
transient_failure,
permanent_failure,
+ late_failure,
success,
}
@@ -181,6 +182,7 @@ impl AsRef<str> for TransferState {
TransferState::pending => "pending",
TransferState::transient_failure => "transient_failure",
TransferState::permanent_failure => "permanent_failure",
+ TransferState::late_failure => "late_failure",
TransferState::success => "success",
}
}
diff --git a/common/taler-test-utils/src/db.rs b/common/taler-test-utils/src/db.rs
@@ -0,0 +1,87 @@
+/*
+ This file is part of TALER
+ Copyright (C) 2025 Taler Systems SA
+
+ TALER is free software; you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License along with
+ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+*/
+
+use std::{
+ path::Path,
+ str::FromStr as _,
+ sync::{
+ OnceLock,
+ atomic::{AtomicUsize, Ordering},
+ },
+};
+
+use sqlx::{
+ PgPool,
+ postgres::{PgConnectOptions, PgPoolOptions},
+};
+use taler_api::config::DbCfg;
+use taler_common::{
+ config::{Config, parser::ConfigSource},
+ db::{dbinit, pool},
+};
+
+use crate::setup_tracing;
+
+pub async fn db_test_setup(src: ConfigSource) -> PgPool {
+ let cfg = Config::from_file(src, None::<&str>).unwrap();
+ let name = format!("{}db-postgres", src.component_name);
+ let sect = cfg.section(&name);
+ let db_cfg = DbCfg::parse(sect).unwrap();
+ db_test_setup_manual(db_cfg.sql_dir.as_ref(), src.component_name).await
+}
+
+pub async fn db_test_setup_manual(sql_dir: &Path, component_name: &str) -> PgPool {
+ println!("{sql_dir:?} {component_name}");
+ setup_tracing();
+ let cfg = test_db().await;
+ let pool = pool(cfg, &component_name.replace("-", "_")).await.unwrap();
+ let mut conn = pool.acquire().await.unwrap();
+
+ dbinit(&mut conn, sql_dir, component_name, true)
+ .await
+ .unwrap();
+ pool
+}
+
+static MASTER_POOL: OnceLock<PgPool> = OnceLock::new();
+
+const DB: &str = "postgres:///taler_rust_check";
+static NB_DB: AtomicUsize = AtomicUsize::new(0);
+
+async fn test_db() -> PgConnectOptions {
+ let master = MASTER_POOL.get_or_init(|| {
+ PgPoolOptions::new()
+ .max_connections(20)
+ .test_before_acquire(false)
+ .after_release(|_conn, _| Box::pin(async move { Ok(false) }))
+ .connect_lazy(DB)
+ .expect("pg pool")
+ });
+ let idx = NB_DB.fetch_add(1, Ordering::Relaxed);
+ // Cleanup test db
+ let name = format!("taler_rust_test_{idx}");
+ let mut conn = master.acquire().await.unwrap();
+ sqlx::raw_sql(&format!("DROP DATABASE IF EXISTS {name}"))
+ .execute(&mut *conn)
+ .await
+ .unwrap();
+ sqlx::raw_sql(&format!("CREATE DATABASE {name}"))
+ .execute(&mut *conn)
+ .await
+ .unwrap();
+ drop(conn);
+ PgConnectOptions::from_str(&format!("postgresql:/{name}")).unwrap()
+}
diff --git a/common/taler-test-utils/src/lib.rs b/common/taler-test-utils/src/lib.rs
@@ -14,84 +14,15 @@
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
-use std::{
- path::Path,
- str::FromStr,
- sync::{
- OnceLock,
- atomic::{AtomicUsize, Ordering},
- },
-};
-
-use sqlx::{
- PgPool,
- postgres::{PgConnectOptions, PgPoolOptions},
-};
-
-use taler_api::config::DbCfg;
-use taler_common::{
- config::{Config, parser::ConfigSource},
- db::{dbinit, pool},
- log::taler_logger,
-};
+use taler_common::log::taler_logger;
use tracing_subscriber::util::SubscriberInitExt;
pub use axum::Router;
+pub mod db;
pub mod json;
pub mod routine;
pub mod server;
-pub async fn db_test_setup(src: ConfigSource) -> PgPool {
- let cfg = Config::from_file(src, None::<&str>).unwrap();
- let name = format!("{}db-postgres", src.component_name);
- let sect = cfg.section(&name);
- let db_cfg = DbCfg::parse(sect).unwrap();
- db_test_setup_manual(db_cfg.sql_dir.as_ref(), src.component_name).await
-}
-
-pub async fn db_test_setup_manual(sql_dir: &Path, component_name: &str) -> PgPool {
- println!("{sql_dir:?} {component_name}");
- setup_tracing();
- let cfg = test_db().await;
- let pool = pool(cfg, &component_name.replace("-", "_")).await.unwrap();
- let mut conn = pool.acquire().await.unwrap();
-
- dbinit(&mut conn, sql_dir, component_name, true)
- .await
- .unwrap();
- pool
-}
-
-static MASTER_POOL: OnceLock<PgPool> = OnceLock::new();
-
-const DB: &str = "postgres:///taler_rust_check";
-static NB_DB: AtomicUsize = AtomicUsize::new(0);
-
-async fn test_db() -> PgConnectOptions {
- let master = MASTER_POOL.get_or_init(|| {
- PgPoolOptions::new()
- .max_connections(20)
- .test_before_acquire(false)
- .after_release(|_conn, _| Box::pin(async move { Ok(false) }))
- .connect_lazy(DB)
- .expect("pg pool")
- });
- let idx = NB_DB.fetch_add(1, Ordering::Relaxed);
- // Cleanup test db
- let name = format!("taler_rust_test_{idx}");
- let mut conn = master.acquire().await.unwrap();
- sqlx::raw_sql(&format!("DROP DATABASE IF EXISTS {name}"))
- .execute(&mut *conn)
- .await
- .unwrap();
- sqlx::raw_sql(&format!("CREATE DATABASE {name}"))
- .execute(&mut *conn)
- .await
- .unwrap();
- drop(conn);
- PgConnectOptions::from_str(&format!("postgresql:/{name}")).unwrap()
-}
-
fn setup_tracing() {
taler_logger(None).try_init().ok();
}
diff --git a/docker-compose.yml b/docker-compose.yml
@@ -1,13 +1,13 @@
services:
cyclos-db:
- image: docker.io/kartoza/postgis:latest
+ image: docker.io/nickblah/postgis:latest
container_name: cyclos-db
environment:
POSTGRES_DBNAME: cyclos
POSTGRES_USER: cyclos
POSTGRES_PASSWORD: password
volumes:
- - cyclos-db-data:/var/lib/postgresql/data
+ - cyclos-db-data:/var/lib/postgresql
networks:
- cyclos-network
restart: unless-stopped
diff --git a/taler-cyclos/README.md b/taler-cyclos/README.md
@@ -14,12 +14,14 @@ ANything will do
- give money at creation
- enable channel for all users
+- enable transactions ?
+Enable chargeback in Product details
# Create two users
wire f20n4X3qV44dNoZUmpeU
client 1EkY5JJMrkwyvv9yK7x4
-Enable chargeback Product details
+
Type
Member
\ No newline at end of file
diff --git a/taler-cyclos/db/cyclos-procedures.sql b/taler-cyclos/db/cyclos-procedures.sql
@@ -203,6 +203,39 @@ END IF;
END $$;
COMMENT ON FUNCTION register_tx_out IS 'Register an outgoing transaction idempotently';
+CREATE FUNCTION register_charge_back_failure(
+ IN in_transfer_id INT8,
+ -- Success return
+ OUT out_initiated_id INT8,
+ OUT out_new BOOLEAN
+)
+LANGUAGE plpgsql AS $$
+DECLARE
+current_status transfer_status;
+BEGIN
+-- Found existing initiated transaction
+SELECT status, initiated_id
+INTO current_status, out_initiated_id
+FROM initiated
+JOIN tx_out USING (tx_out_id)
+WHERE transfer_id = in_transfer_id;
+IF NOT FOUND THEN
+ out_initiated_id=0;
+ RETURN;
+END IF;
+
+-- Update status if new
+out_new = current_status != 'late_failure';
+IF out_new THEN
+ UPDATE initiated
+ SET
+ status = 'late_failure',
+ status_msg = 'charged back'
+ WHERE initiated_id = out_initiated_id;
+END IF;
+END $$;
+COMMENT ON FUNCTION register_charge_back_failure IS 'Register an outgoing transaction chargeback idempotently';
+
CREATE FUNCTION taler_transfer(
IN in_request_uid BYTEA,
IN in_wtid BYTEA,
diff --git a/taler-cyclos/src/bin/cyclos-harness.rs b/taler-cyclos/src/bin/cyclos-harness.rs
@@ -19,7 +19,8 @@ use std::{str::FromStr as _, time::Duration};
use clap::Parser as _;
use jiff::Timestamp;
use owo_colors::OwoColorize as _;
-use sqlx::PgPool;
+use sqlx::{PgPool, Row, postgres::PgRow};
+use taler_api::db::TypeHelper as _;
use taler_build::long_version;
use taler_common::{
CommonArgs,
@@ -38,7 +39,7 @@ use taler_cyclos::{
CyclosId, FullCyclosPayto,
config::{AccountType, parse_db_cfg},
constants::CONFIG_SOURCE,
- cyclos_api::{api::CyclosAuth, client::Client},
+ cyclos_api::{api::CyclosAuth, client::Client, types::HistoryItem},
db::{self, TransferResult},
worker::{Worker, WorkerResult},
};
@@ -92,19 +93,37 @@ impl<'a> Harness<'a> {
}
/// Send transaction from client to exchange
- async fn client_send(&self, subject: &str, amount: Decimal) {
- self.client
+ async fn client_send(&self, subject: &str, amount: Decimal) -> u64 {
+ *self
+ .client
.direct_payment(self.wire_id, amount, subject)
.await
- .unwrap();
+ .unwrap()
+ .id
}
/// Send transaction from exchange to client
- async fn exchange_send(&self, subject: &str, amount: Decimal) {
- self.wire
+ async fn exchange_send(&self, subject: &str, amount: Decimal) -> u64 {
+ *self
+ .wire
.direct_payment(self.client_id, amount, subject)
.await
- .unwrap();
+ .unwrap()
+ .id
+ }
+
+ /// Chargeback a transfer
+ async fn chargeback(&self, id: u64) -> u64 {
+ self.client.chargeback(id).await.unwrap()
+ }
+
+ /// Fetch last transfer related to client
+ async fn client_last_transfer(&self) -> HistoryItem {
+ self.client
+ .transfers(self.client_id)
+ .await
+ .unwrap()
+ .remove(0)
}
/// Run the worker once
@@ -164,6 +183,21 @@ impl<'a> Harness<'a> {
}
}
+ async fn transfer_id(&self, transfer_id: u64) -> u64 {
+ sqlx::query(
+ "SELECT transfer_id
+ FROM transfer
+ JOIN initiated USING (initiated_id)
+ JOIN tx_out USING (tx_out_id)
+ WHERE initiated_id=$1",
+ )
+ .bind(transfer_id as i64)
+ .try_map(|r: PgRow| r.try_get_u64(0))
+ .fetch_one(self.pool)
+ .await
+ .unwrap()
+ }
+
async fn expect_transfer_status(&self, id: u64, status: TransferState, msg: Option<&str>) {
let mut attempts = 0;
loop {
@@ -268,7 +302,6 @@ async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> {
step("Warmup");
harness.worker().await.unwrap();
-
let now = Timestamp::now();
let balance = &mut Balances::new(&harness).await;
@@ -339,7 +372,7 @@ async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> {
.await;
step("Test transfer to unknown account");
- // Init a transfer to self
+ // Init a transfer to unknown
let transfer_id = harness
.custom_transfer(
decimal("10.1"),
@@ -368,7 +401,53 @@ async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> {
balance.expect_sub(amount).await;
harness.worker().await?;
- step("Finish");
+ step("Test transfer chargeback");
+ let amount = decimal("10.1");
+ // Init a transfer to client
+ let transfer_id = harness
+ .custom_transfer(
+ amount,
+ &FullCyclosPayto::new(CyclosId(harness.client_id), "Chargeback".to_string()),
+ )
+ .await;
+ harness
+ .expect_transfer_status(transfer_id, TransferState::pending, None)
+ .await;
+ // Send
+ harness.worker().await?;
+ balance.expect_sub(amount).await;
+ harness
+ .expect_transfer_status(transfer_id, TransferState::pending, None)
+ .await;
+ // Sync
+ harness.worker().await?;
+ harness
+ .expect_transfer_status(transfer_id, TransferState::success, None)
+ .await;
+ // Chargeback
+ harness
+ .chargeback(harness.transfer_id(transfer_id).await)
+ .await;
+ balance.expect_add(amount).await;
+ harness.worker().await?;
+ harness
+ .expect_transfer_status(transfer_id, TransferState::late_failure, Some("charged back"))
+ .await;
+
+ step("Test recover unexpected chargeback");
+ let amount = decimal("10.22");
+ // Manual tx from the exchange
+ harness
+ .exchange_send(&format!("What is this chargebacked ? {now}"), amount)
+ .await;
+ balance.expect_sub(amount).await;
+ // Chargeback
+ harness
+ .chargeback(*harness.client_last_transfer().await.id)
+ .await;
+ balance.expect_add(amount).await;
+ // Sync
+ harness.worker().await?;
Ok(())
}
diff --git a/taler-cyclos/src/cyclos_api/client.rs b/taler-cyclos/src/cyclos_api/client.rs
@@ -37,9 +37,7 @@ impl Client<'_> {
}
pub async fn whoami(&self) -> ApiResult<User> {
- self.request(Method::GET, "users/self")
- .parse_json()
- .await
+ self.request(Method::GET, "users/self").parse_json().await
}
pub async fn accounts(&self) -> ApiResult<Vec<Account>> {
diff --git a/taler-cyclos/src/cyclos_api/types.rs b/taler-cyclos/src/cyclos_api/types.rs
@@ -123,7 +123,7 @@ pub struct Transfer {
pub kind: TransferKind,
pub currency: Currency,
pub chargeback_of: Option<TransferView>,
- pub chargeback_by: Option<TransferView>,
+ pub charged_back_by: Option<TransferView>,
#[serde(rename = "type")]
pub ty: Type,
}
diff --git a/taler-cyclos/src/db.rs b/taler-cyclos/src/db.rs
@@ -640,17 +640,15 @@ pub async fn initiated_submit_success<'a>(
pub async fn initiated_submit_permanent_failure<'a>(
db: impl PgExecutor<'a>,
initiated_id: u64,
- timestamp: &Timestamp,
msg: &str,
) -> sqlx::Result<()> {
sqlx::query(
"
UPDATE initiated
- SET status='permanent_failure', status_msg=$2
- WHERE initiated_id=$3
+ SET status='permanent_failure', status_msg=$1
+ WHERE initiated_id=$2
",
)
- .bind_timestamp(timestamp)
.bind(msg)
.bind(initiated_id as i64)
.execute(db)
@@ -658,37 +656,35 @@ pub async fn initiated_submit_permanent_failure<'a>(
Ok(())
}
-#[derive(Debug, PartialEq, Eq)]
-pub struct OutFailureResult {
- pub initiated_id: Option<u64>,
- pub new: bool,
+#[derive(Debug, Clone, Copy, PartialEq, Eq)]
+pub enum ChargebackFailureResult {
+ Unknown,
+ Known(u64),
+ Idempotent(u64),
}
-/** Update status of a charged back failed initiated transaction */
+/** Update status of a charged back initiated transaction */
pub async fn initiated_chargeback_failure(
db: &mut PgConnection,
- code: u64,
- bounced: Option<u32>,
- now: &Timestamp,
-) -> sqlx::Result<OutFailureResult> {
- todo!();
- sqlx::query(
- "
- SELECT out_new, out_initiated_id
- FROM register_tx_out_failure($1, $2, $3)
- ",
+ transfer_id: u64,
+) -> sqlx::Result<ChargebackFailureResult> {
+ Ok(
+ sqlx::query("SELECT out_initiated_id, out_new FROM register_charge_back_failure($1)")
+ .bind(transfer_id as i64)
+ .try_map(|r: PgRow| {
+ let id = r.try_get_u64(0)?;
+ Ok(if id == 0 {
+ ChargebackFailureResult::Unknown
+ } else if r.try_get(1)? {
+ ChargebackFailureResult::Known(id)
+ } else {
+ ChargebackFailureResult::Idempotent(id)
+ })
+ })
+ .fetch_optional(db)
+ .await?
+ .unwrap_or(ChargebackFailureResult::Unknown),
)
- .bind(code as i64)
- .bind(bounced.map(|i| i as i32))
- .bind_timestamp(now)
- .try_map(|r: PgRow| {
- Ok(OutFailureResult {
- new: r.try_get(0)?,
- initiated_id: r.try_get::<Option<i64>, _>(1)?.map(|i| i as u64),
- })
- })
- .fetch_one(db)
- .await
}
/** Get JSON value from KV table */
@@ -765,7 +761,7 @@ mod test {
use taler_common::{
api_common::{EddsaPublicKey, HashCode, ShortHashCode},
api_params::{History, Page},
- api_wire::{TransferRequest, TransferState, TransferStatus},
+ api_wire::{TransferRequest, TransferState},
types::{
amount::{Currency, amount, decimal},
payto::payto,
@@ -779,8 +775,8 @@ mod test {
constants::CONFIG_SOURCE,
cyclos_payto,
db::{
- self, AddIncomingResult, AddOutgoingResult, BounceResult, Initiated, TransferResult,
- TxIn, TxInAdmin, TxOut, TxOutKind,
+ self, AddIncomingResult, AddOutgoingResult, BounceResult, ChargebackFailureResult,
+ TransferResult, TxIn, TxInAdmin, TxOut, TxOutKind,
},
};
@@ -791,7 +787,7 @@ mod test {
}
async fn setup() -> (PgConnection, PgPool) {
- let pool = taler_test_utils::db_test_setup(CONFIG_SOURCE).await;
+ let pool = taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await;
let conn = pool.acquire().await.unwrap().leak();
(conn, pool)
}
@@ -1378,12 +1374,16 @@ mod test {
}
// Unknown transfer
- db::initiated_submit_permanent_failure(&mut db, 1, &Timestamp::now(), "msg")
+ db::initiated_submit_permanent_failure(&mut db, 1, "msg")
.await
.unwrap();
db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12)
.await
.unwrap();
+ assert_eq!(
+ db::initiated_chargeback_failure(&mut db, 1).await.unwrap(),
+ ChargebackFailureResult::Unknown
+ );
// Failure
db::make_transfer(
@@ -1403,7 +1403,7 @@ mod test {
.await
.expect("transfer");
check_status(&mut db, 1, TransferState::pending, None).await;
- db::initiated_submit_permanent_failure(&mut db, 1, &Timestamp::now(), "error status")
+ db::initiated_submit_permanent_failure(&mut db, 1, "error status")
.await
.unwrap();
check_status(
@@ -1439,7 +1439,7 @@ mod test {
db::register_tx_out(
&mut db,
&TxOut {
- transfer_id: 2,
+ transfer_id: 5,
tx_id: Some(3),
amount: decimal("2"),
subject: "".to_string(),
@@ -1452,6 +1452,23 @@ mod test {
.await
.unwrap();
check_status(&mut db, 2, TransferState::success, None).await;
+
+ // Chargeback
+ assert_eq!(
+ db::initiated_chargeback_failure(&mut db, 5).await.unwrap(),
+ ChargebackFailureResult::Known(2)
+ );
+ check_status(
+ &mut db,
+ 2,
+ TransferState::late_failure,
+ Some("charged back"),
+ )
+ .await;
+ assert_eq!(
+ db::initiated_chargeback_failure(&mut db, 5).await.unwrap(),
+ ChargebackFailureResult::Idempotent(2)
+ );
}
#[tokio::test]
@@ -1527,7 +1544,7 @@ mod test {
// Skip failed
for i in 0..=10 {
- db::initiated_submit_permanent_failure(&mut db, 10 + i, &Timestamp::now(), "failure")
+ db::initiated_submit_permanent_failure(&mut db, 10 + i, "failure")
.await
.expect("status failure");
}
diff --git a/taler-cyclos/src/lib.rs b/taler-cyclos/src/lib.rs
@@ -51,7 +51,7 @@ impl FromStr for CyclosId {
type Err = CyclosIdError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
- Ok(Self(u64::from_str(s).map_err(CyclosIdError)?))
+ Ok(Self(u64::from_str(s.trim_start_matches('-')).map_err(CyclosIdError)?))
}
}
diff --git a/taler-cyclos/src/worker.rs b/taler-cyclos/src/worker.rs
@@ -28,7 +28,9 @@ use crate::{
client::Client,
types::{AccountKind, HistoryItem, NotFoundError},
},
- db::{self, AddIncomingResult, RegisterResult, TxIn, TxOut, TxOutKind},
+ db::{
+ self, AddIncomingResult, ChargebackFailureResult, RegisterResult, TxIn, TxOut, TxOutKind,
+ },
};
#[derive(Debug, thiserror::Error)]
@@ -105,13 +107,8 @@ impl Worker<'_> {
_ => return Err(e.into()),
};
// TODO is permission should be considered are hard or soft failure ?
- db::initiated_submit_permanent_failure(
- &mut *self.db,
- initiated.id,
- &Timestamp::now(),
- &msg,
- )
- .await?;
+ db::initiated_submit_permanent_failure(&mut *self.db, initiated.id, &msg)
+ .await?;
error!(target: "worker", "initiated failure {initiated}: {msg}");
}
}
@@ -129,7 +126,7 @@ impl Worker<'_> {
reason: &str|
-> Result<(), WorkerError> {
// Fetch existing transaction
- if let Some(chargeback) = transfer.chargeback_by {
+ if let Some(chargeback) = transfer.charged_back_by {
let res = db::register_bounced_tx_in(
db,
&tx,
@@ -177,7 +174,30 @@ impl Worker<'_> {
Ok(())
};
if let Some(chargeback) = transfer.chargeback_of {
- warn!("{tx} - This is a transaction failure, we need to handle it");
+ // This a chargeback of one of our transaction, if we bounce we might enter a loop
+ match db::initiated_chargeback_failure(&mut *self.db, *chargeback.id).await? {
+ ChargebackFailureResult::Unknown => {
+ trace!(target: "worker", "initiated failure unknown: charged back")
+ }
+ ChargebackFailureResult::Known(initiated) => {
+ error!(target: "worker", "initiated failure {initiated}: charged back")
+ }
+ ChargebackFailureResult::Idempotent(initiated) => {
+ trace!(target: "worker", "initiated failure {initiated} already seen: charged back")
+ }
+ }
+ // Sill register the incoming transaction as an incoming one
+ match db::register_tx_in(self.db, &tx, &None, &Timestamp::now()).await? {
+ AddIncomingResult::Success { new, .. } => {
+ if new {
+ info!(target: "worker", "in {tx} chargeback");
+ } else {
+ trace!(target: "worker", "in {tx} chargeback already seen");
+ }
+ }
+ AddIncomingResult::ReservePubReuse => unreachable!(),
+ }
+
return Ok(());
}
match parse_incoming_unstructured(&tx.subject) {
@@ -222,19 +242,28 @@ impl Worker<'_> {
AccountType::Exchange => {
let transfer = self.client.transfer(tx.transfer_id).await?;
+ if let Some(_) = transfer.charged_back_by {
+ match db::initiated_chargeback_failure(&mut *self.db, *transfer.id).await? {
+ ChargebackFailureResult::Unknown => {
+ trace!(target: "worker", "initiated failure unknown: charged back")
+ }
+ ChargebackFailureResult::Known(initiated) => {
+ error!(target: "worker", "initiated failure {initiated}: charged back")
+ }
+ ChargebackFailureResult::Idempotent(initiated) => {
+ trace!(target: "worker", "initiated failure {initiated} already seen: charged back")
+ }
+ }
+ }
+
let kind = if let Ok(subject) = subject::parse_outgoing(&tx.subject) {
TxOutKind::Talerable(subject)
- } else if let Some(chargeback) = transfer.chargeback_of {
+ } else if let Some(chargeback) = &transfer.chargeback_of {
TxOutKind::Bounce(*chargeback.id)
} else {
TxOutKind::Simple
};
- if let Some(chargeback) = transfer.chargeback_by {
- warn!("{tx} - This is a transaction failure, we need to handle it");
- return Ok(());
- }
-
let res = db::register_tx_out(self.db, &tx, &kind, &Timestamp::now()).await?;
match res.result {
RegisterResult::idempotent => match kind {
diff --git a/taler-cyclos/tests/api.rs b/taler-cyclos/tests/api.rs
@@ -26,12 +26,13 @@ use taler_common::{
types::{amount::amount, payto::payto, url},
};
use taler_test_utils::{
- Router, db_test_setup,
+ Router,
+ db::db_test_setup,
routine::{admin_add_incoming_routine, revenue_routine, routine_pagination, transfer_routine},
server::TestServer,
};
-/*
+/*
async fn setup() -> (Router, PgPool) {
let pool = db_test_setup(CONFIG_SOURCE).await;
@@ -138,4 +139,4 @@ async fn revenue() {
)
.await;
}
-*/
-\ No newline at end of file
+*/
diff --git a/taler-magnet-bank/src/db.rs b/taler-magnet-bank/src/db.rs
@@ -798,7 +798,7 @@ mod test {
}
async fn setup() -> (PgConnection, PgPool) {
- let pool = taler_test_utils::db_test_setup(CONFIG_SOURCE).await;
+ let pool = taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await;
let conn = pool.acquire().await.unwrap().leak();
(conn, pool)
}
diff --git a/taler-magnet-bank/tests/api.rs b/taler-magnet-bank/tests/api.rs
@@ -33,7 +33,7 @@ use taler_magnet_bank::{
magnet_payto,
};
use taler_test_utils::{
- Router, db_test_setup,
+ Router, db::db_test_setup,
routine::{admin_add_incoming_routine, revenue_routine, routine_pagination, transfer_routine},
server::TestServer,
};