commit 72832a7d9de80cc40a8307f0f66152d45ccfa77d
parent 0e445837bc785c9885e85812591ce7935943cc58
Author: Antoine A <>
Date: Wed, 15 Jan 2025 18:41:38 +0100
magnet-bank: long polling
Diffstat:
16 files changed, 887 insertions(+), 429 deletions(-)
diff --git a/Cargo.lock b/Cargo.lock
@@ -196,9 +196,9 @@ dependencies = [
[[package]]
name = "axum-test"
-version = "17.0.2"
+version = "17.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "fb67e8e9ef63a57d8f494ce291b92a215413ab9752a12bbf7de4969acb7b8cdd"
+checksum = "375ec4f6db373ce6d696839249203c57049aefe1213cfa77bb2e12e10ed43d08"
dependencies = [
"anyhow",
"assert-json-diff",
@@ -259,9 +259,9 @@ checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b"
[[package]]
name = "bitflags"
-version = "2.7.0"
+version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1be3f42a67d6d345ecd59f675f3f012d6974981560836e938c22b424b85ce1be"
+checksum = "8f68f53c83ab957f72c32642f3868eec03eb974d1fb82e453128456482613d36"
[[package]]
name = "block-buffer"
@@ -2606,6 +2606,7 @@ dependencies = [
"serde",
"serde_json",
"sqlx",
+ "taler-api",
"taler-common",
"tempfile",
"tokio",
diff --git a/common/taler-api/src/notification.rs b/common/taler-api/src/notification.rs
@@ -1,6 +1,6 @@
/*
This file is part of TALER
- Copyright (C) 2024 Taler Systems SA
+ Copyright (C) 2024-2025 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free Software
@@ -20,6 +20,40 @@ use std::sync::Arc;
use dashmap::DashMap;
use tokio::sync::watch;
+pub mod de;
+
+/// Listen for many postgres notification channels using a single connection
+#[macro_export]
+macro_rules! notification_listener {
+ ($pool: expr, $($channel:expr => ($($arg:ident: $type:ty),*) $lambda:block),*$(,)?) => {
+ {
+ let mut listener = ::sqlx::postgres::PgListener::connect_with($pool).await?;
+ listener.listen_all([$($channel,)*]).await?;
+ loop {
+ while let Some(notification) = listener.try_recv().await? {
+ tracing::debug!(
+ "db notification: {} - {}",
+ notification.channel(),
+ notification.payload()
+ );
+ match notification.channel() {
+ $($channel => {
+ let ($($arg,)*): ($($type,)*) =
+ ::taler_api::notification::de::from_str(notification.payload()).unwrap();// TODO error handling
+ $lambda
+ }),*
+ unknown => unreachable!("{}", unknown),
+ }
+ }
+ // TODO wait before reconnect
+ }
+ }
+
+ }
+}
+
+pub use notification_listener;
+
type CountedNotify<T> = watch::Sender<Option<T>>;
#[derive(Default)]
diff --git a/common/taler-api/src/notification/de.rs b/common/taler-api/src/notification/de.rs
@@ -0,0 +1,219 @@
+/*
+ This file is part of TALER
+ Copyright (C) 2025 Taler Systems SA
+
+ TALER is free software; you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License along with
+ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+*/
+
+use serde::{
+ de::{self, value::SeqDeserializer, Error, Visitor},
+ forward_to_deserialize_any, Deserialize,
+};
+
+/// Parse notification message content
+pub fn from_str<'de, T: Deserialize<'de>>(
+ notification: &'de str,
+) -> Result<T, serde::de::value::Error> {
+ T::deserialize(Deserializer(notification))
+}
+
+macro_rules! gen_fn {
+ (deserialize_unit_struct, $self:ident, $v:ident, $content:block) => {
+ gen_fn!(deserialize_unit_struct, $self, (_name: &'static str, $v: V), $content);
+ };
+ (deserialize_newtype_struct, $self:ident, $v:ident, $content:block) => {
+ gen_fn!(deserialize_newtype_struct, $self, (_name: &'static str, $v: V), $content);
+ };
+ (deserialize_tuple, $self:ident, $v:ident, $content:block) => {
+ gen_fn!(deserialize_tuple, $self, (_len: usize, $v: V), $content);
+ };
+ (deserialize_tuple_struct, $self:ident, $v:ident, $content:block) => {
+ gen_fn!(deserialize_tuple_struct, $self, (_name: &'static str, _len: usize, $v: V), $content);
+ };
+ (deserialize_struct, $self:ident, $v:ident, $content:block) => {
+ gen_fn!(deserialize_struct, $self, (_name: &'static str, _fields: &'static [&'static str], $v: V), $content);
+ };
+ (deserialize_enum, $self:ident, $v:ident, $content:block) => {
+ gen_fn!(deserialize_enum, $self, (_name: &'static str, _variants: &'static [&'static str], $v: V), $content);
+ };
+ ($fn:ident, $self:ident, $v:ident, $content:block) => { gen_fn!($fn, $self, ($v: V), $content); };
+ ($fn:ident, $self:ident, ($($arg:ident: $type:ty),*), $content:block) => {
+ fn $fn<V: Visitor<'de>>($self$(, $arg: $type)*) -> Result<V::Value, de::value::Error> {
+ $content
+ }
+ };
+}
+
+macro_rules! forward_to_parse {
+ ($(($from:ident, $visit_func:ident)),*$(,)?) => {
+ $(
+ gen_fn!($from, self, visitor, {
+ visitor.$visit_func(
+ self.0
+ .parse()
+ .map_err(|e| Error::custom(format!("{}", e)))?,
+ )
+ });
+ )*
+ }
+}
+
+macro_rules! forward_to_other {
+ ($(($from:ident, $to:ident)),*$(,)?) => {
+ $(
+ gen_fn!($from, self, visitor, {
+ self.$to(visitor)
+ });
+ )*
+ }
+}
+
+macro_rules! fail_unsupported {
+ ($(($from:ident, $ty:expr)),*$(,)?) => {
+ $(
+ gen_fn!($from, self, _visitor, {
+ Err(Error::custom(format!("cannot deserialize non primitive type {}", $ty)))
+ });
+ )*
+ }
+}
+
+/// Space-separated values deserializer for Postgres notification
+/// Support tuples only
+struct Deserializer<'de>(&'de str);
+
+impl<'de> serde::de::Deserializer<'de> for Deserializer<'de> {
+ type Error = de::value::Error;
+
+ fn deserialize_any<V>(self, visitor: V) -> Result<V::Value, Self::Error>
+ where
+ V: Visitor<'de>,
+ {
+ let de = SeqDeserializer::new(self.0.split(' ').map(PlainDe));
+ visitor.visit_seq(de)
+ }
+
+ fn deserialize_unit<V>(self, visitor: V) -> Result<V::Value, Self::Error>
+ where
+ V: Visitor<'de>,
+ {
+ PlainDe(self.0).deserialize_unit(visitor)
+ }
+
+ forward_to_deserialize_any! {
+ bool i8 i16 i32 i64 i128 u8 u16 u32 u64 u128 f32 f64 char str string
+ bytes byte_buf option unit_struct newtype_struct seq tuple
+ tuple_struct map struct enum identifier ignored_any
+ }
+}
+
+/// Deserializer for any plain value that can be parsed from a string
+struct PlainDe<'de>(&'de str);
+
+impl<'de> serde::de::IntoDeserializer<'de> for PlainDe<'de> {
+ type Deserializer = Self;
+
+ fn into_deserializer(self) -> Self::Deserializer {
+ self
+ }
+}
+
+impl<'de> serde::de::Deserializer<'de> for PlainDe<'de> {
+ type Error = serde::de::value::Error;
+
+ fn deserialize_str<V>(self, visitor: V) -> Result<V::Value, Self::Error>
+ where
+ V: Visitor<'de>,
+ {
+ visitor.visit_borrowed_str(self.0)
+ }
+
+ fn deserialize_option<V>(self, visitor: V) -> Result<V::Value, Self::Error>
+ where
+ V: Visitor<'de>,
+ {
+ if self.0.is_empty() {
+ visitor.visit_none()
+ } else {
+ visitor.visit_some(self)
+ }
+ }
+
+ fn deserialize_unit<V>(self, visitor: V) -> Result<V::Value, Self::Error>
+ where
+ V: Visitor<'de>,
+ {
+ if self.0.is_empty() {
+ visitor.visit_unit()
+ } else {
+ Err(Error::custom("expected empty string for unit"))
+ }
+ }
+
+ forward_to_parse!(
+ (deserialize_i8, visit_i8),
+ (deserialize_i16, visit_i16),
+ (deserialize_i32, visit_i32),
+ (deserialize_i64, visit_i64),
+ (deserialize_i128, visit_i128),
+ (deserialize_u8, visit_u8),
+ (deserialize_u16, visit_u16),
+ (deserialize_u32, visit_u32),
+ (deserialize_u64, visit_u64),
+ (deserialize_u128, visit_u128),
+ (deserialize_f32, visit_f32),
+ (deserialize_f64, visit_f64),
+ (deserialize_char, visit_char),
+ (deserialize_bool, visit_bool)
+ );
+
+ forward_to_other!(
+ (deserialize_string, deserialize_str),
+ (deserialize_any, deserialize_str),
+ (deserialize_unit_struct, deserialize_unit),
+ );
+
+ fail_unsupported!(
+ (deserialize_bytes, "bytes"),
+ (deserialize_byte_buf, "bytes"),
+ (deserialize_seq, "seq"),
+ (deserialize_tuple_struct, "tuple"),
+ (deserialize_tuple, "tuple"),
+ (deserialize_map, "map"),
+ (deserialize_struct, "struct"),
+ (deserialize_newtype_struct, "struct"),
+ (deserialize_enum, "enum"),
+ (deserialize_identifier, "identifier"),
+ (deserialize_ignored_any, "any")
+ );
+}
+
+#[test]
+pub fn parse() {
+ // Parse simple message
+ assert_eq!((1,), from_str::<(u8,)>("1").unwrap());
+ assert_eq!((false,), from_str::<(bool,)>("false").unwrap());
+ assert_eq!(("username",), from_str::<(&str,)>("username").unwrap());
+
+ // Parse composite message
+ assert_eq!((1, false), from_str::<(u8, bool)>("1 false").unwrap());
+ assert_eq!([1, 2, 3, 4], from_str::<[u8; 4]>("1 2 3 4").unwrap());
+
+ // Parse corner case
+ assert_eq!((), from_str::<()>("").unwrap());
+ assert_eq!((1, None), from_str::<(u8, Option<bool>)>("1 ").unwrap());
+ assert_eq!((1, ()), from_str::<(u8, ())>("1 ").unwrap());
+ assert_eq!(
+ (1, (), true),
+ from_str::<(u8, (), bool)>("1 true").unwrap()
+ );
+}
diff --git a/common/taler-api/tests/api.rs b/common/taler-api/tests/api.rs
@@ -16,22 +16,19 @@
use common::sample_wire_gateway_api;
use sqlx::PgPool;
-use taler_api::{auth::AuthMethod, db::IncomingType, standard_layer};
+use taler_api::{auth::AuthMethod, standard_layer};
use taler_common::{
- api_common::{EddsaPublicKey, HashCode, ShortHashCode},
- api_wire::{
- IncomingBankTransaction, IncomingHistory, OutgoingHistory, TransferList, TransferResponse,
- TransferState, TransferStatus,
- },
+ api_common::{HashCode, ShortHashCode},
+ api_wire::{OutgoingHistory, TransferResponse, TransferState},
error_code::ErrorCode,
- types::{amount::amount, base32::Base32, url},
+ types::{amount::amount, url},
};
use test_utils::{
axum_test::TestServer,
db_test_setup,
helpers::TestResponseHelper,
json,
- routine::{routine_history, routine_pagination},
+ routine::{admin_add_incoming_routine, routine_pagination, transfer_routine},
};
mod common;
@@ -70,145 +67,7 @@ async fn config() {
#[tokio::test]
async fn transfer() {
let (server, _) = setup().await;
- let valid_request = json!({
- "request_uid": HashCode::rand(),
- "amount": "EUR:42",
- "exchange_base_url": "http://exchange.taler",
- "wtid": ShortHashCode::rand(),
- "credit_account": "payto://todo",
- });
-
- // Check OK
- let first = server
- .post("/transfer")
- .json(&valid_request)
- .await
- .assert_ok_json::<TransferResponse>();
- // Check idempotent
- let second = server
- .post("/transfer")
- .json(&valid_request)
- .await
- .assert_ok_json::<TransferResponse>();
- assert_eq!(first.row_id, second.row_id);
- assert_eq!(first.timestamp, second.timestamp);
-
- // Check request uid reuse
- server
- .post("/transfer")
- .json(&json!(valid_request + {
- "wtid": ShortHashCode::rand()
- }))
- .await
- .assert_error(ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED);
-
- // Check currency mismatch
- server
- .post("/transfer")
- .json(&json!(valid_request + {
- "amount": "CHF:42"
- }))
- .await
- .assert_error(ErrorCode::GENERIC_CURRENCY_MISMATCH);
-}
-
-#[tokio::test]
-async fn transfer_by_id() {
- let (server, _) = setup().await;
-
- let wtid = ShortHashCode::rand();
- let resp = server
- .post("/transfer")
- .json(&json!({
- "request_uid": HashCode::rand(),
- "amount": "EUR:55",
- "exchange_base_url": "http://exchange.taler",
- "wtid": wtid,
- "credit_account": "payto://todo",
- }))
- .await
- .assert_ok_json::<TransferResponse>();
-
- // Check OK
- let tx = server
- .get(&format!("/transfers/{}", resp.row_id))
- .await
- .assert_ok_json::<TransferStatus>();
- assert_eq!(TransferState::success, tx.status);
- assert_eq!(amount("EUR:55"), tx.amount);
- assert_eq!("http://exchange.taler/", tx.origin_exchange_url);
- assert_eq!(wtid, tx.wtid);
- assert_eq!(resp.timestamp, tx.timestamp);
-
- // Check unknown transaction
- server
- .get("/transfers/42")
- .await
- .assert_error(ErrorCode::BANK_TRANSACTION_NOT_FOUND);
-}
-
-#[tokio::test]
-async fn transfer_page() {
- let (server, _) = setup().await;
- server.get("/transfers").await.assert_no_content();
- server
- .get("/transfers?status=success")
- .await
- .assert_no_content();
-
- for _ in 0..6 {
- server
- .post("/transfer")
- .json(&json!({
- "request_uid": HashCode::rand(),
- "amount": "EUR:55",
- "exchange_base_url": "http://exchange.taler",
- "wtid": ShortHashCode::rand(),
- "credit_account": "payto://todo",
- }))
- .await
- .assert_ok_json::<TransferResponse>();
- }
- {
- let list = server
- .get("/transfers")
- .await
- .assert_ok_json::<TransferList>();
- assert_eq!(list.transfers.len(), 6);
- assert_eq!(
- list,
- server
- .get("/transfers?status=success")
- .await
- .assert_ok_json::<TransferList>()
- );
- }
-
- // Pagination test
- routine_pagination::<TransferList, _>(
- &server,
- "/transfers",
- |it| {
- it.transfers
- .into_iter()
- .map(|it| *it.row_id as i64)
- .collect()
- },
- |server, i| async move {
- server
- .post("/transfer")
- .json(&json!({
- "request_uid": HashCode::rand(),
- "amount": amount(&format!("EUR:0.0{i}")),
- "exchange_base_url": url("http://exchange.taler"),
- "wtid": ShortHashCode::rand(),
- "credit_account": url("payto://todo"),
- }))
- .await
- .assert_ok_json::<TransferResponse>();
- },
- )
- .await;
+ transfer_routine(&server, TransferState::success).await;
}
#[tokio::test]
@@ -243,126 +102,7 @@ async fn outgoing_history() {
}
#[tokio::test]
-async fn incoming_history() {
- let (server, _) = setup().await;
- server.get("/history/incoming").await.assert_no_content();
-
- routine_history(
- &server,
- "/history/incoming",
- |it: IncomingHistory| {
- it.incoming_transactions
- .into_iter()
- .map(|it| match it {
- IncomingBankTransaction::Reserve { row_id, .. }
- | IncomingBankTransaction::Wad { row_id, .. }
- | IncomingBankTransaction::Kyc { row_id, .. } => *row_id as i64,
- })
- .collect()
- },
- 2,
- |server, i| async move {
- if i % 2 == 0 {
- server
- .post("/admin/add-incoming")
- .json(&json!({
- "amount": format!("EUR:0.0{i}"),
- "reserve_pub": EddsaPublicKey::rand(),
- "debit_account": "payto://todo",
- }))
- .await
- .assert_ok_json::<TransferResponse>();
- } else {
- server
- .post("/admin/add-kycauth")
- .json(&json!({
- "amount": format!("EUR:0.0{i}"),
- "account_pub": EddsaPublicKey::rand(),
- "debit_account": "payto://todo",
- }))
- .await
- .assert_ok_json::<TransferResponse>();
- }
- },
- 0,
- |_, _| async move {},
- )
- .await;
-}
-
-async fn add_incoming_routine(server: TestServer, kind: IncomingType) {
- let (path, key) = match kind {
- IncomingType::reserve => ("/admin/add-incoming", "reserve_pub"),
- IncomingType::kyc => ("/admin/add-kycauth", "account_pub"),
- IncomingType::wad => unreachable!(),
- };
- let valid_req = json!({
- "amount": "EUR:44",
- key: EddsaPublicKey::rand(),
- "debit_account": "payto://todo",
- });
-
- // Check OK
- server.post(&path).json(&valid_req).await.assert_ok();
-
- match kind {
- IncomingType::reserve => {
- // Trigger conflict due to reused reserve_pub
- server
- .post(&path)
- .json(&valid_req)
- .await
- .assert_error(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)
- }
- IncomingType::kyc => {
- // Non conflict on reuse
- server.post(&path).json(&valid_req).await.assert_ok();
- }
- IncomingType::wad => unreachable!(),
- }
-
- // Currency mismatch
- server
- .post(&path)
- .json(&json!(valid_req + {
- "amount": "CHF:33"
- }))
- .await
- .assert_error(ErrorCode::GENERIC_CURRENCY_MISMATCH);
-
- // Bad BASE32 reserve_pub
- server
- .post(&path)
- .json(&json!(valid_req + {
- key: "I love chocolate"
- }))
- .await
- .assert_error(ErrorCode::GENERIC_JSON_INVALID);
-
- server
- .post(&path)
- .json(&json!(valid_req + {
- key: Base32::<31>::rand()
- }))
- .await
- .assert_error(ErrorCode::GENERIC_JSON_INVALID);
- /*
- // Bad payto kind
- client.postA("/taler-wire-gateway/admin/$path") {
- json(valid_req) {
- "debit_account" to "payto://x-taler-bank/bank.hostname.test/bar"
- }
- }.assertBadRequest()*/
-}
-
-#[tokio::test]
-async fn add_incoming_reserve() {
- let (server, _) = setup().await;
- add_incoming_routine(server, IncomingType::reserve).await;
-}
-
-#[tokio::test]
-async fn add_incoming_kyc() {
+async fn admin_add_incoming() {
let (server, _) = setup().await;
- add_incoming_routine(server, IncomingType::kyc).await;
+ admin_add_incoming_routine(&server).await;
}
diff --git a/common/taler-api/tests/common/db.rs b/common/taler-api/tests/common/db.rs
@@ -14,14 +14,8 @@
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
-use sqlx::{
- postgres::{PgListener, PgRow},
- PgPool, QueryBuilder, Row,
-};
-use taler_api::{
- db::{history, page, BindHelper, IncomingType, TypeHelper},
- error::ApiError,
-};
+use sqlx::{postgres::PgRow, PgPool, QueryBuilder, Row};
+use taler_api::db::{history, page, BindHelper, IncomingType, TypeHelper};
use taler_common::{
api_common::{EddsaPublicKey, SafeU64},
api_params::{History, Page},
@@ -32,36 +26,20 @@ use taler_common::{
types::{amount::Amount, payto::Payto, timestamp::Timestamp},
};
use tokio::sync::watch::{Receiver, Sender};
-use tracing::debug;
pub async fn notification_listener(
pool: PgPool,
outgoing_channel: Sender<i64>,
incoming_channel: Sender<i64>,
-) -> Result<(), ApiError> {
- let mut listener = PgListener::connect_with(&pool).await?;
- listener.listen_all(["outgoing_tx", "incoming_tx"]).await?;
- loop {
- while let Some(notification) = listener.try_recv().await? {
- debug!(
- "db notification: {} - {}",
- notification.channel(),
- notification.payload()
- );
- match notification.channel() {
- "outgoing_tx" => {
- let row_id: i64 = notification.payload().parse().unwrap();
- outgoing_channel.send_replace(row_id);
- }
- "incoming_tx" => {
- let row_id: i64 = notification.payload().parse().unwrap();
- incoming_channel.send_replace(row_id);
- }
- unknown => unreachable!("{}", unknown),
- }
+) -> sqlx::Result<()> {
+ taler_api::notification::notification_listener!(&pool,
+ "outgoing_tx" => (row_id: i64) {
+ outgoing_channel.send_replace(row_id);
+ },
+ "incoming_tx" => (row_id: i64) {
+ incoming_channel.send_replace(row_id);
}
- // TODO wait before reconnect
- }
+ )
}
pub enum TransferResult {
diff --git a/common/taler-api/tests/schema.sql b/common/taler-api/tests/schema.sql
@@ -13,7 +13,6 @@
-- You should have received a copy of the GNU General Public License along with
-- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
-BEGIN;
DROP SCHEMA public CASCADE;
CREATE SCHEMA public;
@@ -158,6 +157,4 @@ INSERT INTO incoming_transactions (
-- Notify new incoming transaction
PERFORM pg_notify('incoming_tx', out_tx_row_id || '');
END $$;
-COMMENT ON FUNCTION add_incoming IS 'Create an incoming taler transaction and register it';
-
-COMMIT;
+COMMENT ON FUNCTION add_incoming IS 'Create an incoming taler transaction and register it';
+\ No newline at end of file
diff --git a/common/taler-common/src/api_wire.rs b/common/taler-common/src/api_wire.rs
@@ -173,3 +173,14 @@ pub enum TransferState {
permanent_failure,
success,
}
+
+impl AsRef<str> for TransferState {
+ fn as_ref(&self) -> &str {
+ match self {
+ TransferState::pending => "pending",
+ TransferState::transient_failure => "transient_failure",
+ TransferState::permanent_failure => "permanent_failure",
+ TransferState::success => "success",
+ }
+ }
+}
diff --git a/common/test-utils/Cargo.toml b/common/test-utils/Cargo.toml
@@ -10,6 +10,7 @@ tokio.workspace = true
serde_json.workspace = true
serde.workspace = true
taler-common.workspace = true
+taler-api.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
tempfile.workspace = true
diff --git a/common/test-utils/src/json.rs b/common/test-utils/src/json.rs
@@ -14,27 +14,24 @@
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
-use serde_json::Value;
-
-pub fn merge(a: &mut Value, b: Value) {
- match (a, b) {
- (a @ &mut Value::Object(_), Value::Object(b)) => {
- let a = a.as_object_mut().unwrap();
- for (k, v) in b {
- merge(a.entry(k).or_insert(Value::Null), v);
- }
- }
- (a, b) => *a = b,
- }
-}
-
#[macro_export]
macro_rules! json {
($object:ident + $($json:tt)+) => {
{
+ pub fn merge(a: &mut ::serde_json::Value, b: ::serde_json::Value) {
+ match (a, b) {
+ (a @ &mut ::serde_json::Value::Object(_), ::serde_json::Value::Object(b)) => {
+ let a = a.as_object_mut().unwrap();
+ for (k, v) in b {
+ merge(a.entry(k).or_insert(::serde_json::Value::Null), v);
+ }
+ }
+ (a, b) => *a = b,
+ }
+ }
let change = json!($($json)+);
let mut original = $object.clone();
- test_utils::json::merge(&mut original, change);
+ merge(&mut original, change);
original
}
};
diff --git a/common/test-utils/src/lib.rs b/common/test-utils/src/lib.rs
@@ -70,9 +70,11 @@ async fn db_pool() -> PgPool {
.expect("pg pool")
}
+const TEST_TRACING_LEVEL: tracing::Level = Level::WARN;
+
fn setup_tracing() {
FmtSubscriber::builder()
- .with_max_level(Level::TRACE)
+ .with_max_level(TEST_TRACING_LEVEL)
.with_writer(std::io::stderr)
.finish()
.try_init()
diff --git a/common/test-utils/src/routine.rs b/common/test-utils/src/routine.rs
@@ -1,6 +1,6 @@
/*
This file is part of TALER
- Copyright (C) 2024 Taler Systems SA
+ Copyright (C) 2024-2025 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free Software
@@ -25,16 +25,26 @@ use std::{
use axum::{extract::Query, http::Uri};
use axum_test::{TestResponse, TestServer};
use serde::de::DeserializeOwned;
-use taler_common::api_params::PageParams;
+use taler_api::db::IncomingType;
+use taler_common::{
+ api_common::{EddsaPublicKey, HashCode, ShortHashCode},
+ api_params::PageParams,
+ api_wire::{
+ IncomingBankTransaction, IncomingHistory, TransferList, TransferResponse, TransferState,
+ TransferStatus,
+ },
+ error_code::ErrorCode,
+ types::{amount::amount, base32::Base32, url},
+};
use tokio::time::sleep;
-use crate::helpers::TestResponseHelper;
+use crate::{helpers::TestResponseHelper, json};
pub async fn routine_pagination<'a, T: DeserializeOwned, F: Future<Output = ()>>(
server: &'a TestServer,
url: &str,
ids: fn(T) -> Vec<i64>,
- register: fn(&'a TestServer, usize) -> F,
+ mut register: impl FnMut(&'a TestServer, usize) -> F,
) {
// Check history is following specs
let assert_history = |args: Cow<'static, str>, size: usize| async move {
@@ -81,9 +91,9 @@ pub async fn routine_history<
url: &str,
ids: fn(T) -> Vec<i64>,
nb_register: usize,
- register: fn(&'a TestServer, usize) -> FR,
+ mut register: impl FnMut(&'a TestServer, usize) -> FR,
nb_ignore: usize,
- ignore: fn(&'a TestServer, usize) -> FI,
+ mut ignore: impl FnMut(&'a TestServer, usize) -> FI,
) {
// Check history is following specs
let assert_history = |args: String, size: usize| async move {
@@ -241,3 +251,273 @@ fn assert_history_ids<T: DeserializeOwned>(
}
history
}
+
+// Get currency from config
+async fn get_currency(server: &TestServer) -> String {
+ let config = server
+ .get("/config")
+ .await
+ .assert_ok_json::<serde_json::Value>();
+ let currency = config["currency"].as_str().unwrap();
+ currency.to_owned()
+}
+
+/// Test standard behavior of the transfer endpoints
+pub async fn transfer_routine(server: &TestServer, default_status: TransferState) {
+ let currency = &get_currency(server).await;
+ let default_amount = amount(format!("{currency}:42"));
+ let transfer_request = json!({
+ "request_uid": HashCode::rand(),
+ "amount": default_amount,
+ "exchange_base_url": "http://exchange.taler",
+ "wtid": ShortHashCode::rand(),
+ "credit_account": "payto://todo",
+ });
+
+ // Check empty db
+ server.get("/transfers").await.assert_no_content();
+ server
+ .get(&format!("/transfers?status={}", default_status.as_ref()))
+ .await
+ .assert_no_content();
+
+ // Check create transfer
+ {
+ // Check OK
+ let first = server
+ .post("/transfer")
+ .json(&transfer_request)
+ .await
+ .assert_ok_json::<TransferResponse>();
+ // Check idempotent
+ let second = server
+ .post("/transfer")
+ .json(&transfer_request)
+ .await
+ .assert_ok_json::<TransferResponse>();
+ assert_eq!(first.row_id, second.row_id);
+ assert_eq!(first.timestamp, second.timestamp);
+
+ // Check request uid reuse
+ server
+ .post("/transfer")
+ .json(&json!(transfer_request + {
+ "wtid": ShortHashCode::rand()
+ }))
+ .await
+ .assert_error(ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED);
+
+ // Check currency mismatch
+ server
+ .post("/transfer")
+ .json(&json!(transfer_request + {
+ "amount": "BAD:42"
+ }))
+ .await
+ .assert_error(ErrorCode::GENERIC_CURRENCY_MISMATCH);
+ }
+
+ // Check transfer by id
+ {
+ let wtid = ShortHashCode::rand();
+ let resp = server
+ .post("/transfer")
+ .json(&json!(transfer_request + {
+ "request_uid": HashCode::rand(),
+ "wtid": wtid,
+ }))
+ .await
+ .assert_ok_json::<TransferResponse>();
+
+ // Check OK
+ let tx = server
+ .get(&format!("/transfers/{}", resp.row_id))
+ .await
+ .assert_ok_json::<TransferStatus>();
+ assert_eq!(default_status, tx.status);
+ assert_eq!(default_amount, tx.amount);
+ assert_eq!("http://exchange.taler/", tx.origin_exchange_url);
+ assert_eq!(wtid, tx.wtid);
+ assert_eq!(resp.timestamp, tx.timestamp);
+
+ // Check unknown transaction
+ server
+ .get("/transfers/42")
+ .await
+ .assert_error(ErrorCode::BANK_TRANSACTION_NOT_FOUND);
+ }
+
+ // Check transfer page
+ {
+ for _ in 0..4 {
+ server
+ .post("/transfer")
+ .json(&json!(transfer_request + {
+ "request_uid": HashCode::rand(),
+ "wtid": ShortHashCode::rand(),
+ }))
+ .await
+ .assert_ok_json::<TransferResponse>();
+ }
+ {
+ let list = server
+ .get("/transfers")
+ .await
+ .assert_ok_json::<TransferList>();
+ assert_eq!(list.transfers.len(), 6);
+ assert_eq!(
+ list,
+ server
+ .get(&format!("/transfers?status={}", default_status.as_ref()))
+ .await
+ .assert_ok_json::<TransferList>()
+ );
+ }
+
+ // Pagination test
+ routine_pagination::<TransferList, _>(
+ server,
+ "/transfers",
+ |it| {
+ it.transfers
+ .into_iter()
+ .map(|it| *it.row_id as i64)
+ .collect()
+ },
+ |server, i| async move {
+ server
+ .post("/transfer")
+ .json(&json!({
+ "request_uid": HashCode::rand(),
+ "amount": amount(format!("{currency}:0.0{i}")),
+ "exchange_base_url": url("http://exchange.taler"),
+ "wtid": ShortHashCode::rand(),
+ "credit_account": url("payto://todo"),
+ }))
+ .await
+ .assert_ok_json::<TransferResponse>();
+ },
+ )
+ .await;
+ }
+}
+
+async fn add_incoming_routine(server: &TestServer, currency: &str, kind: IncomingType) {
+ let (path, key) = match kind {
+ IncomingType::reserve => ("/admin/add-incoming", "reserve_pub"),
+ IncomingType::kyc => ("/admin/add-kycauth", "account_pub"),
+ IncomingType::wad => unreachable!(),
+ };
+ let valid_req = json!({
+ "amount": format!("{currency}:44"),
+ key: EddsaPublicKey::rand(),
+ "debit_account": "payto://todo",
+ });
+
+ // Check OK
+ server.post(path).json(&valid_req).await.assert_ok();
+
+ match kind {
+ IncomingType::reserve => {
+ // Trigger conflict due to reused reserve_pub
+ server
+ .post(path)
+ .json(&json!(valid_req + {
+ "amount": format!("{currency}:44.1"),
+ }))
+ .await
+ .assert_error(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)
+ }
+ IncomingType::kyc => {
+ // Non conflict on reuse
+ server.post(path).json(&valid_req).await.assert_ok();
+ }
+ IncomingType::wad => unreachable!(),
+ }
+
+ // Currency mismatch
+ server
+ .post(path)
+ .json(&json!(valid_req + {
+ "amount": "BAD:33"
+ }))
+ .await
+ .assert_error(ErrorCode::GENERIC_CURRENCY_MISMATCH);
+
+ // Bad BASE32 reserve_pub
+ server
+ .post(path)
+ .json(&json!(valid_req + {
+ key: "I love chocolate"
+ }))
+ .await
+ .assert_error(ErrorCode::GENERIC_JSON_INVALID);
+
+ server
+ .post(path)
+ .json(&json!(valid_req + {
+ key: Base32::<31>::rand()
+ }))
+ .await
+ .assert_error(ErrorCode::GENERIC_JSON_INVALID);
+ /*
+ // Bad payto kind
+ client.postA("/taler-wire-gateway/admin/$path") {
+ json(valid_req) {
+ "debit_account" to "payto://x-taler-bank/bank.hostname.test/bar"
+ }
+ }.assertBadRequest()*/
+}
+
+/// Test standard behavior of the admin add incoming endpoints
+pub async fn admin_add_incoming_routine(server: &TestServer) {
+ let currency = &get_currency(server).await;
+
+ // History
+ server.get("/history/incoming").await.assert_no_content();
+ routine_history(
+ server,
+ "/history/incoming",
+ |it: IncomingHistory| {
+ it.incoming_transactions
+ .into_iter()
+ .map(|it| match it {
+ IncomingBankTransaction::Reserve { row_id, .. }
+ | IncomingBankTransaction::Wad { row_id, .. }
+ | IncomingBankTransaction::Kyc { row_id, .. } => *row_id as i64,
+ })
+ .collect()
+ },
+ 2,
+ |server, i| async move {
+ if i % 2 == 0 {
+ server
+ .post("/admin/add-incoming")
+ .json(&json!({
+ "amount": format!("{currency}:0.0{i}"),
+ "reserve_pub": EddsaPublicKey::rand(),
+ "debit_account": "payto://todo",
+ }))
+ .await
+ .assert_ok_json::<TransferResponse>();
+ } else {
+ server
+ .post("/admin/add-kycauth")
+ .json(&json!({
+ "amount": format!("{currency}:0.0{i}"),
+ "account_pub": EddsaPublicKey::rand(),
+ "debit_account": "payto://todo",
+ }))
+ .await
+ .assert_ok_json::<TransferResponse>();
+ }
+ },
+ 0,
+ |_, _| async move {},
+ )
+ .await;
+ // Add incoming reserve
+ add_incoming_routine(server, currency, IncomingType::reserve).await;
+ // Add incoming kyc
+ add_incoming_routine(server, currency, IncomingType::kyc).await;
+}
diff --git a/wire-gateway/magnet-bank/db/schema.sql b/wire-gateway/magnet-bank/db/schema.sql
@@ -13,7 +13,6 @@
-- You should have received a copy of the GNU General Public License along with
-- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
-BEGIN;
CREATE TYPE taler_amount AS (val INT8, frac INT4);
COMMENT ON TYPE taler_amount IS 'Stores an amount, fraction is in units of 1/100000000 of the base value';
@@ -104,6 +103,8 @@ CREATE FUNCTION register_tx_in(
IN in_timestamp INT8,
IN in_type incoming_type,
IN in_metadata BYTEA,
+ -- Error status
+ OUT out_reserve_pub_reuse BOOLEAN,
-- Success return
OUT out_tx_row_id INT8,
OUT out_timestamp INT8,
@@ -117,41 +118,50 @@ INTO out_tx_row_id, out_timestamp
FROM tx_in
WHERE (in_code IS NOT NULL AND magnet_code = in_code) -- Magnet transaction
OR (in_code IS NULL AND amount = in_amount AND debit_payto = in_debit_payto AND subject = in_subject); -- Admin transaction
-
out_new = NOT found;
-IF out_new THEN
- -- Insert new incoming transaction
- INSERT INTO tx_in (
- magnet_code,
- amount,
- subject,
- debit_payto,
- created
+IF NOT out_new THEN
+ out_reserve_pub_reuse=false;
+ RETURN;
+END IF;
+
+-- Check conflict
+SELECT in_type = 'reserve'::incoming_type AND EXISTS(SELECT FROM taler_in WHERE metadata = in_metadata AND type = 'reserve')
+ INTO out_reserve_pub_reuse;
+IF out_reserve_pub_reuse THEN
+ RETURN;
+END IF;
+
+-- Insert new incoming transaction
+INSERT INTO tx_in (
+ magnet_code,
+ amount,
+ subject,
+ debit_payto,
+ created
+) VALUES (
+ in_code,
+ in_amount,
+ in_subject,
+ in_debit_payto,
+ in_timestamp
+)
+RETURNING tx_in_id, created
+INTO out_tx_row_id, out_timestamp;
+-- Notify new incoming transaction registration
+PERFORM pg_notify('tx_in', out_tx_row_id || '');
+IF in_type IS NOT NULL THEN
+ -- Insert new incoming talerable transaction
+ INSERT INTO taler_in (
+ tx_in_id,
+ type,
+ metadata
) VALUES (
- in_code,
- in_amount,
- in_subject,
- in_debit_payto,
- in_timestamp
- )
- RETURNING tx_in_id, created
- INTO out_tx_row_id, out_timestamp;
- -- Notify new incoming transaction registration
- PERFORM pg_notify('tx_in', out_tx_row_id || '');
- IF in_type IS NOT NULL THEN
- -- Insert new incoming talerable transaction
- INSERT INTO taler_in (
- tx_in_id,
- type,
- metadata
- ) VALUES (
- out_tx_row_id,
- in_type,
- in_metadata
- );
- -- Notify new incoming talerable transaction registration
- PERFORM pg_notify('taler_in', out_tx_row_id || '');
- END IF;
+ out_tx_row_id,
+ in_type,
+ in_metadata
+ );
+ -- Notify new incoming talerable transaction registration
+ PERFORM pg_notify('taler_in', out_tx_row_id || '');
END IF;
END $$;
COMMENT ON FUNCTION register_tx_in IS 'Register an incoming transaction idempotently';
@@ -304,6 +314,4 @@ BEGIN
WHERE initiated_id = in_initiated_id;
END IF;
END IF;
-END $$;
-
-COMMIT;
+END $$;
+\ No newline at end of file
diff --git a/wire-gateway/magnet-bank/src/db.rs b/wire-gateway/magnet-bank/src/db.rs
@@ -29,9 +29,33 @@ use taler_common::{
},
types::{amount::Amount, payto::Payto, timestamp::Timestamp},
};
+use tokio::sync::watch::{Receiver, Sender};
use crate::constant::CURRENCY;
+pub async fn notification_listener(
+ pool: PgPool,
+ in_channel: Sender<i64>,
+ taler_in_channel: Sender<i64>,
+ out_channel: Sender<i64>,
+ taler_out_channel: Sender<i64>,
+) -> sqlx::Result<()> {
+ taler_api::notification::notification_listener!(&pool,
+ "tx_in" => (row_id: i64) {
+ in_channel.send_replace(row_id);
+ },
+ "taler_in" => (row_id: i64) {
+ taler_in_channel.send_replace(row_id);
+ },
+ "tx_out" => (row_id: i64) {
+ out_channel.send_replace(row_id);
+ },
+ "taler_out" => (row_id: i64) {
+ taler_out_channel.send_replace(row_id);
+ }
+ )
+}
+
#[derive(Debug, Clone)]
pub struct TxIn {
pub code: u64,
@@ -67,6 +91,12 @@ pub struct RegisteredTx {
}
#[derive(Debug, PartialEq, Eq)]
+pub enum AddIncomingResult {
+ Success(RegisteredTx),
+ ReservePubReuse,
+}
+
+#[derive(Debug, PartialEq, Eq)]
pub struct Initiated {
pub id: u64,
pub amount: Amount,
@@ -99,10 +129,10 @@ pub async fn db_init(db: &PgPool, reset: bool) -> sqlx::Result<()> {
Ok(())
}
-pub async fn register_tx_in_admin(db: &PgPool, tx: &TxInAdmin) -> sqlx::Result<RegisteredTx> {
+pub async fn register_tx_in_admin(db: &PgPool, tx: &TxInAdmin) -> sqlx::Result<AddIncomingResult> {
sqlx::query(
"
- SELECT out_new, out_tx_row_id, out_timestamp
+ SELECT out_reserve_pub_reuse, out_new, out_tx_row_id, out_timestamp
FROM register_tx_in(NULL, ($1, $2)::taler_amount, $3, $4, $5, $6, $7)
",
)
@@ -113,10 +143,14 @@ pub async fn register_tx_in_admin(db: &PgPool, tx: &TxInAdmin) -> sqlx::Result<R
.bind(tx.metadata.ty())
.bind(tx.metadata.key())
.try_map(|r: PgRow| {
- Ok(RegisteredTx {
- new: r.try_get(0)?,
- row_id: r.try_get_u64(1)?,
- timestamp: r.try_get_timestamp(2)?,
+ Ok(if r.try_get(0)? {
+ AddIncomingResult::ReservePubReuse
+ } else {
+ AddIncomingResult::Success(RegisteredTx {
+ new: r.try_get(1)?,
+ row_id: r.try_get_u64(2)?,
+ timestamp: r.try_get_timestamp(3)?,
+ })
})
})
.fetch_one(db)
@@ -127,10 +161,10 @@ pub async fn register_tx_in(
db: &mut PgConnection,
tx: &TxIn,
subject: &Option<IncomingSubject>,
-) -> sqlx::Result<RegisteredTx> {
+) -> sqlx::Result<AddIncomingResult> {
sqlx::query(
"
- SELECT out_new, out_tx_row_id, out_timestamp
+ SELECT out_reserve_pub_reuse, out_new, out_tx_row_id, out_timestamp
FROM register_tx_in($1, ($2, $3)::taler_amount, $4, $5, $6, $7, $8)
",
)
@@ -142,10 +176,14 @@ pub async fn register_tx_in(
.bind(subject.as_ref().map(|it| it.ty()))
.bind(subject.as_ref().map(|it| it.key()))
.try_map(|r: PgRow| {
- Ok(RegisteredTx {
- new: r.try_get(0)?,
- row_id: r.try_get_u64(1)?,
- timestamp: r.try_get_timestamp(2)?,
+ Ok(if r.try_get(0)? {
+ AddIncomingResult::ReservePubReuse
+ } else {
+ AddIncomingResult::Success(RegisteredTx {
+ new: r.try_get(1)?,
+ row_id: r.try_get_u64(2)?,
+ timestamp: r.try_get_timestamp(3)?,
+ })
})
})
.fetch_one(db)
@@ -268,12 +306,13 @@ pub async fn transfer_page<'a>(
pub async fn outgoing_history(
db: &PgPool,
params: &History,
+ listen: impl FnOnce() -> Receiver<i64>,
) -> sqlx::Result<Vec<OutgoingBankTransaction>> {
history(
db,
"tx_out_id",
params,
- || tokio::sync::watch::channel(0).1,
+ listen,
|| {
QueryBuilder::new(
"
@@ -308,12 +347,13 @@ pub async fn outgoing_history(
pub async fn incoming_history(
db: &PgPool,
params: &History,
+ listen: impl FnOnce() -> Receiver<i64>,
) -> sqlx::Result<Vec<IncomingBankTransaction>> {
history(
db,
"tx_in_id",
params,
- || tokio::sync::watch::channel(0).1,
+ listen,
|| {
QueryBuilder::new(
"
@@ -379,13 +419,13 @@ pub async fn transfer_by_id<'a>(
.bind(id as i64)
.try_map(|r: PgRow| {
Ok(TransferStatus {
- status: r.try_get(1)?,
- status_msg: r.try_get(2)?,
- amount: r.try_get_amount_i(3, CURRENCY)?,
- origin_exchange_url: r.try_get(5)?,
- wtid: r.try_get_base32(6)?,
- credit_account: r.try_get_payto(7)?,
- timestamp: r.try_get_timestamp(8)?,
+ status: r.try_get(0)?,
+ status_msg: r.try_get(1)?,
+ amount: r.try_get_amount_i(2, CURRENCY)?,
+ origin_exchange_url: r.try_get(4)?,
+ wtid: r.try_get_base32(5)?,
+ credit_account: r.try_get_payto(6)?,
+ timestamp: r.try_get_timestamp(7)?,
})
})
.fetch_optional(db)
@@ -473,17 +513,22 @@ mod test {
api_wire::TransferRequest,
types::{amount::amount, payto::payto, timestamp::Timestamp, url},
};
+ use tokio::sync::watch::Receiver;
use crate::{
constant::CURRENCY,
db::{
self, make_transfer, register_tx_in, register_tx_in_admin, register_tx_out,
- RegisteredTx, TransferResult, TxIn, TxOut,
+ AddIncomingResult, RegisteredTx, TransferResult, TxIn, TxOut,
},
};
use super::TxInAdmin;
+ fn fake_listen<T: Default>() -> Receiver<T> {
+ tokio::sync::watch::channel(T::default()).1
+ }
+
async fn setup() -> (PgConnection, PgPool) {
let pool = test_utils::db_test_setup().await;
db::db_init(&pool, false).await.expect("dbinit");
@@ -518,11 +563,11 @@ mod test {
register_tx_in(db, &tx, &first)
.await
.expect("register tx in"),
- RegisteredTx {
+ AddIncomingResult::Success(RegisteredTx {
new: true,
row_id: id,
timestamp: tx.timestamp
- }
+ })
);
// Idempotent
assert_eq!(
@@ -536,11 +581,11 @@ mod test {
)
.await
.expect("register tx in"),
- RegisteredTx {
+ AddIncomingResult::Success(RegisteredTx {
new: false,
row_id: id,
timestamp: tx.timestamp
- }
+ })
);
// Many
assert_eq!(
@@ -554,17 +599,17 @@ mod test {
)
.await
.expect("register tx in"),
- RegisteredTx {
+ AddIncomingResult::Success(RegisteredTx {
new: true,
row_id: id + 1,
timestamp: tx.timestamp
- }
+ })
);
}
// Empty db
assert_eq!(
- db::incoming_history(&pool, &History::default())
+ db::incoming_history(&pool, &History::default(), fake_listen)
.await
.unwrap(),
Vec::new()
@@ -591,7 +636,7 @@ mod test {
// History
assert_eq!(
- db::incoming_history(&pool, &History::default())
+ db::incoming_history(&pool, &History::default(), fake_listen)
.await
.unwrap()
.len(),
@@ -601,11 +646,11 @@ mod test {
#[tokio::test]
async fn tx_in_admin() {
- let (mut db, pool) = setup().await;
+ let (_, pool) = setup().await;
// Empty db
assert_eq!(
- db::incoming_history(&pool, &History::default())
+ db::incoming_history(&pool, &History::default(), fake_listen)
.await
.unwrap(),
Vec::new()
@@ -623,11 +668,11 @@ mod test {
register_tx_in_admin(&pool, &tx)
.await
.expect("register tx in"),
- RegisteredTx {
+ AddIncomingResult::Success(RegisteredTx {
new: true,
row_id: 1,
timestamp: tx.timestamp
- }
+ })
);
// Idempotent
assert_eq!(
@@ -640,11 +685,11 @@ mod test {
)
.await
.expect("register tx in"),
- RegisteredTx {
+ AddIncomingResult::Success(RegisteredTx {
new: false,
row_id: 1,
timestamp: tx.timestamp
- }
+ })
);
// Many
assert_eq!(
@@ -658,16 +703,16 @@ mod test {
)
.await
.expect("register tx in"),
- RegisteredTx {
+ AddIncomingResult::Success(RegisteredTx {
new: true,
row_id: 2,
timestamp: tx.timestamp
- }
+ })
);
// History
assert_eq!(
- db::incoming_history(&pool, &History::default())
+ db::incoming_history(&pool, &History::default(), fake_listen)
.await
.unwrap()
.len(),
@@ -691,7 +736,7 @@ mod test {
.await
.unwrap();
let tx = TxOut {
- code: code,
+ code,
amount: amount("EUR:10"),
subject: "subject".to_owned(),
credit_payto: payto("payto://"),
@@ -748,7 +793,7 @@ mod test {
// Empty db
assert_eq!(
- db::outgoing_history(&pool, &History::default())
+ db::outgoing_history(&pool, &History::default(), fake_listen)
.await
.unwrap(),
Vec::new()
@@ -773,7 +818,7 @@ mod test {
// History
assert_eq!(
- db::outgoing_history(&pool, &History::default())
+ db::outgoing_history(&pool, &History::default(), fake_listen)
.await
.unwrap()
.len(),
@@ -870,8 +915,9 @@ mod test {
);
// Get
- //assert!(db::transfer_by_id(&mut db, 1).await.unwrap().is_some());
- //assert!(db::transfer_by_id(&mut db, 2).await.unwrap().is_some());
+ assert!(db::transfer_by_id(&mut db, 1).await.unwrap().is_some());
+ assert!(db::transfer_by_id(&mut db, 2).await.unwrap().is_some());
+ assert!(db::transfer_by_id(&mut db, 3).await.unwrap().is_none());
assert_eq!(
db::transfer_page(&mut db, &None, &Page::default())
.await
diff --git a/wire-gateway/magnet-bank/src/main.rs b/wire-gateway/magnet-bank/src/main.rs
@@ -102,11 +102,9 @@ async fn app(args: Args) -> Result<(), anyhow::Error> {
let db = DbConfig::parse(&cfg)?;
let pool = PgPool::connect_with(db.cfg).await?;
let cfg = WireGatewayConfig::parse(&cfg)?;
+ let gateway = MagnetWireGateway::start(pool, payto("payto://todo")).await;
taler_api::server(
- taler_api::wire_gateway_api(Arc::new(MagnetWireGateway {
- pool,
- payto: payto("payto://todo"),
- })),
+ taler_api::wire_gateway_api(Arc::new(gateway)),
cfg.serve,
cfg.auth,
None,
diff --git a/wire-gateway/magnet-bank/src/wire_gateway.rs b/wire-gateway/magnet-bank/src/wire_gateway.rs
@@ -33,15 +33,45 @@ use taler_common::{
timestamp::Timestamp,
},
};
+use tokio::sync::watch::Sender;
use crate::{
constant::CURRENCY,
- db::{self, TxInAdmin},
+ db::{self, AddIncomingResult, TxInAdmin},
};
pub struct MagnetWireGateway {
pub pool: sqlx::PgPool,
pub payto: Payto,
+ pub in_channel: Sender<i64>,
+ pub taler_in_channel: Sender<i64>,
+ pub out_channel: Sender<i64>,
+ pub taler_out_channel: Sender<i64>,
+}
+
+impl MagnetWireGateway {
+ pub async fn start(pool: sqlx::PgPool, payto: Payto) -> Self {
+ let in_channel = Sender::new(0);
+ let taler_in_channel = Sender::new(0);
+ let out_channel = Sender::new(0);
+ let taler_out_channel = Sender::new(0);
+ let tmp = Self {
+ pool: pool.clone(),
+ payto,
+ in_channel: in_channel.clone(),
+ taler_in_channel: taler_in_channel.clone(),
+ out_channel: out_channel.clone(),
+ taler_out_channel: taler_out_channel.clone(),
+ };
+ tokio::spawn(db::notification_listener(
+ pool,
+ in_channel,
+ taler_in_channel,
+ out_channel,
+ taler_out_channel,
+ ));
+ tmp
+ }
}
impl WireGatewayImpl for MagnetWireGateway {
@@ -89,14 +119,20 @@ impl WireGatewayImpl for MagnetWireGateway {
async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> {
Ok(OutgoingHistory {
- outgoing_transactions: db::outgoing_history(&self.pool, ¶ms).await?,
+ outgoing_transactions: db::outgoing_history(&self.pool, ¶ms, || {
+ self.taler_out_channel.subscribe()
+ })
+ .await?,
debit_account: self.payto.clone(),
})
}
async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> {
Ok(IncomingHistory {
- incoming_transactions: db::incoming_history(&self.pool, ¶ms).await?,
+ incoming_transactions: db::incoming_history(&self.pool, ¶ms, || {
+ self.taler_in_channel.subscribe()
+ })
+ .await?,
credit_account: self.payto.clone(),
})
}
@@ -105,7 +141,7 @@ impl WireGatewayImpl for MagnetWireGateway {
&self,
req: AddIncomingRequest,
) -> ApiResult<AddIncomingResponse> {
- let result = db::register_tx_in_admin(
+ let res = db::register_tx_in_admin(
&self.pool,
&TxInAdmin {
amount: req.amount,
@@ -116,14 +152,20 @@ impl WireGatewayImpl for MagnetWireGateway {
},
)
.await?;
- Ok(AddIncomingResponse {
- row_id: safe_u64(result.row_id),
- timestamp: result.timestamp,
- })
+ match res {
+ AddIncomingResult::Success(res) => Ok(AddIncomingResponse {
+ row_id: safe_u64(res.row_id),
+ timestamp: res.timestamp,
+ }),
+ AddIncomingResult::ReservePubReuse => Err(failure(
+ ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT,
+ "reserve_pub used already".to_owned(),
+ )),
+ }
}
async fn add_incoming_kyc(&self, req: AddKycauthRequest) -> ApiResult<AddKycauthResponse> {
- let result = db::register_tx_in_admin(
+ let res = db::register_tx_in_admin(
&self.pool,
&TxInAdmin {
amount: req.amount,
@@ -134,9 +176,15 @@ impl WireGatewayImpl for MagnetWireGateway {
},
)
.await?;
- Ok(AddKycauthResponse {
- row_id: safe_u64(result.row_id),
- timestamp: result.timestamp,
- })
+ match res {
+ AddIncomingResult::Success(res) => Ok(AddKycauthResponse {
+ row_id: safe_u64(res.row_id),
+ timestamp: res.timestamp,
+ }),
+ AddIncomingResult::ReservePubReuse => Err(failure(
+ ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT,
+ "reserve_pub used already".to_owned(),
+ )),
+ }
}
}
diff --git a/wire-gateway/magnet-bank/tests/api.rs b/wire-gateway/magnet-bank/tests/api.rs
@@ -0,0 +1,96 @@
+/*
+ This file is part of TALER
+ Copyright (C) 2025 Taler Systems SA
+
+ TALER is free software; you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License along with
+ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+*/
+
+use std::sync::Arc;
+
+use magnet_bank::{db, wire_gateway::MagnetWireGateway};
+use sqlx::PgPool;
+use taler_api::{auth::AuthMethod, standard_layer, subject::OutgoingSubject};
+use taler_common::{
+ api_common::ShortHashCode,
+ api_wire::{OutgoingHistory, TransferState},
+ types::{amount::amount, payto::payto, timestamp::Timestamp, url},
+};
+use test_utils::{
+ axum_test::TestServer,
+ db_test_setup,
+ helpers::TestResponseHelper,
+ routine::{admin_add_incoming_routine, routine_pagination, transfer_routine},
+};
+
+async fn setup() -> (TestServer, PgPool) {
+ let pool = db_test_setup().await;
+ db::db_init(&pool, false).await.unwrap();
+ let gateway = MagnetWireGateway::start(pool.clone(), payto("payto://test")).await;
+ let server = TestServer::new(standard_layer(
+ taler_api::wire_gateway_api(Arc::new(gateway)),
+ AuthMethod::None,
+ ))
+ .unwrap();
+
+ (server, pool)
+}
+
+#[tokio::test]
+async fn transfer() {
+ let (server, _) = setup().await;
+ transfer_routine(&server, TransferState::pending).await;
+}
+
+#[tokio::test]
+async fn outgoing_history() {
+ let (server, pool) = setup().await;
+ server.get("/history/outgoing").await.assert_no_content();
+ routine_pagination::<OutgoingHistory, _>(
+ &server,
+ "/history/outgoing",
+ |it| {
+ it.outgoing_transactions
+ .into_iter()
+ .map(|it| *it.row_id as i64)
+ .collect()
+ },
+ |_, i| {
+ let acquire = pool.acquire();
+ async move {
+ let mut conn = acquire.await.unwrap();
+ db::register_tx_out(
+ &mut *conn,
+ &db::TxOut {
+ code: i as u64,
+ amount: amount("EUR:10"),
+ subject: "subject".to_owned(),
+ credit_payto: payto("payto://"),
+ timestamp: Timestamp::now_stable(),
+ },
+ &Some(OutgoingSubject(
+ ShortHashCode::rand(),
+ url("https://exchange.test"),
+ )),
+ )
+ .await
+ .unwrap();
+ }
+ },
+ )
+ .await;
+}
+
+#[tokio::test]
+async fn admin_add_incoming() {
+ let (server, _) = setup().await;
+ admin_add_incoming_routine(&server).await;
+}