taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

commit 19db9e56b96f9a6b9ce85c5ad246080e9b775adf
parent 4edbc19380455a0f7079345ec1b897b89c4f2782
Author: Antoine A <>
Date:   Tue, 16 Dec 2025 12:06:51 +0100

cyclos: init taler-cyclos

Diffstat:
MCargo.toml | 1+
MMakefile | 18+++++++++++-------
Mcommon/taler-api/src/db.rs | 24+++++++++++++++++-------
Mcommon/taler-api/tests/common/db.rs | 2+-
Acommon/taler-common/src/error.rs | 46++++++++++++++++++++++++++++++++++++++++++++++
Mcommon/taler-common/src/lib.rs | 1+
Mcommon/taler-common/src/log.rs | 7++++---
Mcommon/taler-common/src/types/amount.rs | 21++++++++++++++++++++-
Acontrib/taler-cyclos-dbconfig | 167+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Adocker-compose.yml | 48++++++++++++++++++++++++++++++++++++++++++++++++
Ataler-cyclos/Cargo.toml | 35+++++++++++++++++++++++++++++++++++
Ataler-cyclos/cyclos.conf | 82+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Ataler-cyclos/db/cyclos-0001.sql | 128+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Ataler-cyclos/db/cyclos-drop.sql | 30++++++++++++++++++++++++++++++
Ataler-cyclos/db/cyclos-procedures.sql | 334+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Ataler-cyclos/src/bin/cyclos-codegen.rs | 84+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Ataler-cyclos/src/bin/cyclos-harness.rs | 329+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Ataler-cyclos/src/config.rs | 28++++++++++++++++++++++++++++
Ataler-cyclos/src/constants.rs | 19+++++++++++++++++++
Ataler-cyclos/src/cyclos_api/api.rs | 150+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Ataler-cyclos/src/cyclos_api/client.rs | 100+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Ataler-cyclos/src/cyclos_api/mod.rs | 19+++++++++++++++++++
Ataler-cyclos/src/cyclos_api/types.rs | 428+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Ataler-cyclos/src/db.rs | 1420+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Ataler-cyclos/src/lib.rs | 95+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Ataler-cyclos/src/main.rs | 19+++++++++++++++++++
Ataler-cyclos/src/worker.rs | 299+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mtaler-magnet-bank/db/magnet-bank-procedures.sql | 109++++++++++++++++++++++++++++++++++++++++---------------------------------------
Mtaler-magnet-bank/src/magnet_api/api.rs | 34++--------------------------------
Rtaler-magnet-bank/src/magnet_api.rs -> taler-magnet-bank/src/magnet_api/mod.rs | 0
Mtaler-magnet-bank/src/worker.rs | 8++++----
31 files changed, 3976 insertions(+), 109 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml @@ -6,6 +6,7 @@ members = [ "common/taler-build", "common/taler-test-utils", "taler-magnet-bank", + "taler-cyclos", ] [workspace.package] diff --git a/Makefile b/Makefile @@ -4,30 +4,34 @@ # Absolute DESTDIR or empty string if DESTDIR unset/empty abs_destdir=$(abspath $(DESTDIR)) -man_dir=$(abs_destdir)$(prefix)/share/man -sql_dir=$(abs_destdir)$(prefix)/share/taler-magnet-bank/sql -config_dir=$(abs_destdir)$(prefix)/share/taler-magnet-bank/config.d bin_dir=$(abs_destdir)$(prefix)/bin lib_dir=$(abs_destdir)$(prefix)/lib +share_dir=$(abs_destdir)$(prefix)/share +man_dir=$(abs_destdir)$(prefix)/share/man all: build .PHONY: build build: - cargo build --release --bin taler-magnet-bank + cargo build --release --bin taler-magnet-bank --bin taler-cyclos .PHONY: install-nobuild-files install-nobuild-files: - install -m 644 -D -t $(config_dir) taler-magnet-bank/magnet-bank.conf - install -m 644 -D -t $(sql_dir) common/taler-common/db/versioning.sql - install -m 644 -D -t $(sql_dir) taler-magnet-bank/db/magnet-bank*.sql + install -m 644 -D -t $(share_dir)/taler-magnet-bank/config.d taler-magnet-bank/magnet-bank.conf + install -m 644 -D -t $(share_dir)/taler-magnet-bank/sql common/taler-common/db/versioning.sql + install -m 644 -D -t $(share_dir)/taler-magnet-bank/sql taler-magnet-bank/db/magnet-bank*.sql install -m 644 -D -t $(man_dir)/man1 doc/prebuilt/man/taler-magnet-bank.1 install -m 644 -D -t $(man_dir)/man5 doc/prebuilt/man/taler-magnet-bank.conf.5 + install -m 644 -D -t $(share_dir)/taler-cyclos/config.d taler-cyclos/cyclos.conf + install -m 644 -D -t $(share_dir)/taler-cyclos/sql common/taler-common/db/versioning.sql + install -m 644 -D -t $(share_dir)/taler-cyclos/sql taler-cyclos/db/cyclos*.sql .PHONY: install install: build install-nobuild-files install -D -t $(bin_dir) contrib/taler-magnet-bank-dbconfig install -D -t $(bin_dir) target/release/taler-magnet-bank + install -D -t $(bin_dir) contrib/taler-cyclos-dbconfig + install -D -t $(bin_dir) target/release/taler-cyclos .PHONY: check check: install-nobuild-files diff --git a/common/taler-api/src/db.rs b/common/taler-api/src/db.rs @@ -203,6 +203,15 @@ pub trait TypeHelper { fn try_get_iban<I: sqlx::ColumnIndex<Self>>(&self, index: I) -> sqlx::Result<IBAN> { self.try_get_parse(index) } + fn try_get_decimal<I: sqlx::ColumnIndex<Self>>( + &self, + val: I, + frac: I, + ) -> sqlx::Result<Decimal> { + let val = self.try_get_u64(val)?; + let frac = self.try_get_u32(frac)?; + Ok(Decimal::new(val, frac)) + } fn try_get_amount(&self, index: &str, currency: &Currency) -> sqlx::Result<Amount>; fn try_get_amount_i(&self, index: usize, currency: &Currency) -> sqlx::Result<Amount>; } @@ -239,16 +248,17 @@ impl TypeHelper for PgRow { let frac_idx = format!("{index}_frac"); let val_idx = val_idx.as_str(); let frac_idx = frac_idx.as_str(); - let val = self.try_get_u64(val_idx)?; - let frac = self.try_get_u32(frac_idx)?; - Ok(Amount::new(currency, val, frac)) + Ok(Amount::new_decimal( + currency, + self.try_get_decimal(val_idx, frac_idx)?, + )) } fn try_get_amount_i(&self, index: usize, currency: &Currency) -> sqlx::Result<Amount> { - let val = self.try_get_u64(index)?; - let frac = self.try_get_u32(index + 1)?; - - Ok(Amount::new(currency, val, frac)) + Ok(Amount::new_decimal( + currency, + self.try_get_decimal(index, index + 1)?, + )) } } diff --git a/common/taler-api/tests/common/db.rs b/common/taler-api/tests/common/db.rs @@ -231,7 +231,7 @@ pub async fn add_incoming( } else { AddIncomingResult::Success { id: r.try_get_safeu64("out_tx_row_id")?, - created_at: r.try_get_timestamp("out_created_at")?, + created_at: r.try_get_timestamp("out_created_at")?.into(), } }) }) diff --git a/common/taler-common/src/error.rs b/common/taler-common/src/error.rs @@ -0,0 +1,46 @@ +/* + This file is part of TALER + Copyright (C) 2025 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +/// Error wrapper than display an error with its sources +#[derive(Debug)] +pub struct FmtSource<E: std::error::Error>(E); + +fn fmt_with_source( + f: &mut std::fmt::Formatter<'_>, + mut e: &dyn std::error::Error, +) -> std::fmt::Result { + loop { + write!(f, "{}", &e)?; + if let Some(source) = e.source() { + write!(f, ": ")?; + e = source; + } else { + return Ok(()); + } + } +} + +impl<E: std::error::Error> std::fmt::Display for FmtSource<E> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + fmt_with_source(f, &self.0) + } +} + +impl<E: std::error::Error> From<E> for FmtSource<E> { + fn from(value: E) -> Self { + Self(value) + } +} diff --git a/common/taler-common/src/lib.rs b/common/taler-common/src/lib.rs @@ -29,6 +29,7 @@ pub mod api_wire; pub mod cli; pub mod config; pub mod db; +pub mod error; pub mod error_code; pub mod json_file; pub mod log; diff --git a/common/taler-common/src/log.rs b/common/taler-common/src/log.rs @@ -69,8 +69,9 @@ pub fn taler_logger(max_level: Option<Level>) -> impl SubscriberInitExt { ) .with(tracing_subscriber::filter::filter_fn(move |metadata| { let target = metadata.target(); - max_level == Level::TRACE - || (*metadata.level() <= max_level - && !(target.starts_with("sqlx") || target.starts_with("hyper_util"))) + *metadata.level() <= max_level + && !(target.starts_with("sqlx") + || target.starts_with("hyper_util") + || target.starts_with("reqwest")) })) } diff --git a/common/taler-common/src/types/amount.rs b/common/taler-common/src/types/amount.rs @@ -99,7 +99,7 @@ impl Display for Currency { } #[derive( - Debug, Clone, PartialEq, Eq, serde_with::DeserializeFromStr, serde_with::SerializeDisplay, + Debug, Clone, Copy, PartialEq, Eq, serde_with::DeserializeFromStr, serde_with::SerializeDisplay, )] pub struct Decimal { /** Integer part */ @@ -109,6 +109,10 @@ pub struct Decimal { } impl Decimal { + pub fn new(val: u64, frac: u32) -> Self { + Self { val, frac } + } + pub const fn max() -> Self { Self { val: MAX_VALUE, @@ -137,6 +141,16 @@ impl Decimal { .expect("amount fraction overflow should never happen with normalized amounts"); self.normalize() } + + pub fn try_sub(mut self, rhs: &Self) -> Option<Self> { + if rhs.frac > self.frac { + self.val = self.val.checked_sub(1)?; + self.frac += FRAC_BASE; + } + self.val = self.val.checked_sub(rhs.val)?; + self.frac = self.frac.checked_sub(rhs.frac)?; + self.normalize() + } } #[derive(Debug, thiserror::Error)] @@ -205,6 +219,11 @@ impl Display for Decimal { } } +#[track_caller] +pub fn decimal(decimal: impl AsRef<str>) -> Decimal { + decimal.as_ref().parse().expect("Invalid decimal constant") +} + /// <https://docs.taler.net/core/api-common.html#tsref-type-Amount> #[derive( Debug, Clone, PartialEq, Eq, serde_with::DeserializeFromStr, serde_with::SerializeDisplay, diff --git a/contrib/taler-cyclos-dbconfig b/contrib/taler-cyclos-dbconfig @@ -0,0 +1,167 @@ +#!/bin/bash +# This file is part of GNU TALER. +# Copyright (C) 2025 Taler Systems SA +# +# TALER is free software; you can redistribute it and/or modify it under the +# terms of the GNU Lesser General Public License as published by the Free Software +# Foundation; either version 2.1, or (at your option) any later version. +# +# TALER is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. +# +# You should have received a copy of the GNU Lesser General Public License along with +# TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +# +# @author Antoine d'Aligny + +# Error checking on +set -eu + +# 1 is true, 0 is false +RESET_DB=0 +FORCE_PERMS=0 +SKIP_INIT=0 +DBUSER="taler-cyclos-httpd" +DBGROUP="taler-cyclos-db" +CFGFILE="/etc/taler-cyclos/taler-cyclos.conf" + +# Parse command-line options +while getopts 'c:g:hprsu:' OPTION; do + case "$OPTION" in + c) + CFGFILE="$OPTARG" + ;; + g) + DBGROUP="$OPTARG" + ;; + h) + echo 'Supported options:' + echo " -c FILENAME -- use configuration FILENAME (default: $CFGFILE)" + echo " -g GROUP -- taler-cyclos to be run by GROUP (default: $DBGROUP)" + echo " -h -- print this help text" + echo " -r -- reset database (dangerous)" + echo " -p -- force permission setup even without database initialization" + echo " -s -- skip database initialization" + echo " -u USER -- taler-cyclos to be run by USER (default: $DBUSER)" + exit 0 + ;; + p) + FORCE_PERMS="1" + ;; + r) + RESET_DB="1" + ;; + s) + SKIP_INIT="1" + ;; + u) + DBUSER="$OPTARG" + ;; + ?) + echo "Unrecognized command line option '$OPTION'" 1 &>2 + exit 1 + ;; + esac +done + +function exit_fail() { + echo "$@" >&2 + exit 1 +} + +if ! id postgres >/dev/null; then + exit_fail "Could not find 'postgres' user. Please install Postgresql first" +fi + +if ! taler-cyclos --version 2>/dev/null; then + exit_fail "Required 'taler-cyclos' not found. Please fix your installation." +fi + +if [ "$(id -u)" -ne 0 ]; then + exit_fail "This script must be run as root" +fi + +# Check OS users exist +if ! id "$DBUSER" >/dev/null; then + exit_fail "Could not find '$DBUSER' user. Please set it up first" +fi + +# Create DB user matching OS user name +echo "Setting up database user '$DBUSER'." 1>&2 +if ! sudo -i -u postgres createuser "$DBUSER" 2>/dev/null; then + echo "Database user '$DBUSER' already existed. Continuing anyway." 1>&2 +fi + +# Check database name +DBPATH=$(taler-cyclos -c "$CFGFILE" config get cyclosdb-postgres CONFIG) +if ! echo "$DBPATH" | grep "postgres://" >/dev/null; then + exit_fail "Invalid database configuration value '$DBPATH'." 1>&2 +fi +DBNAME=$(echo "$DBPATH" | sed -e "s/postgres:\/\/.*\///" -e "s/?.*//") + +# Reset database +if sudo -i -u postgres psql "$DBNAME" </dev/null 2>/dev/null; then + if [ 1 = "$RESET_DB" ]; then + echo "Deleting existing database '$DBNAME'." 1>&2 + if ! sudo -i -u postgres dropdb "$DBNAME"; then + exit_fail "Failed to delete existing database '$DBNAME'" + fi + DO_CREATE=1 + else + echo "Database '$DBNAME' already exists, continuing anyway." + DO_CREATE=0 + fi +else + DO_CREATE=1 +fi + +# Create database +if [ 1 = "$DO_CREATE" ]; then + echo "Creating database '$DBNAME'." 1>&2 + if ! sudo -i -u postgres createdb -O "$DBUSER" "$DBNAME"; then + exit_fail "Failed to create database '$DBNAME'" + fi +fi + +# Run dbinit +if [ 0 = "$SKIP_INIT" ]; then + echo "Initialize database schema" + if ! sudo -u "$DBUSER" taler-cyclos dbinit -c "$CFGFILE"; then + exit_fail "Failed to initialize database schema" + fi +fi + +# Set permission for group user +if [ 0 = "$SKIP_INIT" ] || [ 1 = "$FORCE_PERMS" ]; then + # Create DB group matching OS group name + echo "Setting up database group '$DBGROUP'." 1>&2 + if ! sudo -i -u postgres createuser "$DBGROUP" 2>/dev/null; then + echo "Database group '$DBGROUP' already existed. Continuing anyway." 1>&2 + fi + if ! sudo -i -u postgres psql "$DBNAME" <<-EOF + GRANT ALL ON SCHEMA cyclos TO "$DBGROUP"; + GRANT SELECT ON ALL TABLES IN SCHEMA cyclos TO "$DBGROUP"; +EOF + then + exit_fail "Failed to grant access to '$DBGROUP'." + fi + + # Update group users rights + DB_GRP="$(getent group "$DBGROUP" | sed -e "s/.*://g" -e "s/,/ /g")" + echo "Initializing permissions for '$DB_GRP' users." 1>&2 + for GROUPIE in $DB_GRP; do + if [ "$GROUPIE" != "$DBUSER" ]; then + if ! sudo -i -u postgres createuser "$GROUPIE" 2>/dev/null; then + echo "Database user '$GROUPIE' already existed. Continuing anyway." 1>&2 + fi + fi + + if ! echo "GRANT \"$DBGROUP\" TO \"$GROUPIE\"" | + sudo -i -u postgres psql "$DBNAME"; then + exit_fail "Failed to make '$GROUPIE' part of '$DBGROUP' db group." + fi + done +fi + +echo "Database configuration finished." 1>&2 diff --git a/docker-compose.yml b/docker-compose.yml @@ -0,0 +1,47 @@ +services: + cyclos-db: + image: docker.io/kartoza/postgis:latest + container_name: cyclos-db + environment: + POSTGRES_DBNAME: cyclos + POSTGRES_USER: cyclos + POSTGRES_PASSWORD: password + volumes: + - cyclos-db-data:/var/lib/postgresql/data + networks: + - cyclos-network + restart: unless-stopped + healthcheck: + test: ["CMD-SHELL", "pg_isready -U cyclos"] + interval: 10s + timeout: 5s + retries: 5 + + cyclos-app: + image: docker.io/cyclos/cyclos:latest + container_name: cyclos-app + environment: + DB_HOST: cyclos-db + DB_PORT: 5432 + DB_NAME: cyclos + DB_USER: cyclos + DB_PASSWORD: password + CYCLOS_ROOT_URL: http://localhost:8080 + ports: + - "8080:8080" + volumes: + - cyclos-app-data:/usr/local/cyclos/data + networks: + - cyclos-network + depends_on: + cyclos-db: + condition: service_healthy + restart: unless-stopped + +networks: + cyclos-network: + driver: bridge + +volumes: + cyclos-db-data: + cyclos-app-data: +\ No newline at end of file diff --git a/taler-cyclos/Cargo.toml b/taler-cyclos/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "taler-cyclos" +version = "0.0.0" +description = "Taler Cyclos adapter" +edition.workspace = true +authors.workspace = true +homepage.workspace = true +repository.workspace = true +license-file.workspace = true + +[dependencies] +reqwest = { version = "0.12", default-features = false, features = [ + "json", + "rustls-tls", +] } +sqlx.workspace = true +serde_json = { workspace = true, features = ["raw_value"] } +jiff = { workspace = true, features = ["serde"] } +taler-common.workspace = true +taler-api.workspace = true +taler-build.workspace = true +clap.workspace = true +serde.workspace = true +serde_with.workspace = true +serde_path_to_error.workspace = true +serde_urlencoded.workspace = true +thiserror.workspace = true +tracing.workspace = true +tokio.workspace = true +anyhow.workspace = true +base64.workspace = true +owo-colors.workspace = true + +[dev-dependencies] +taler-test-utils.workspace = true diff --git a/taler-cyclos/cyclos.conf b/taler-cyclos/cyclos.conf @@ -0,0 +1,81 @@ +[cyclos] +# IBAN of the Magnet Bank account to sync +IBAN = + +# Legal entity that is associated with the Magnet Bank account +NAME = + +[magnet-bank-worker] +# URL of the Magnet Bank API server +API_URL = "https://mobil.magnetbank.hu" + +# Your Magnet Bank API unique identifier +# CONSUMER_KEY = "Consumer" + +# Your Magnet Bank API confidential key +# CONSUMER_SECRET = "qikgjxc5y06tiil7qgrmh09l7rfi5a8e" + +# File that holds the crypto keys and access token. +# KEYS_FILE = ${MAGNET_BANK_HOME}/keys.json + +# Specify the account type and therefore the indexing behavior. +# This can either can be normal or exchange. +# Exchange accounts bounce invalid incoming Taler transactions. +ACCOUNT_TYPE = exchange + +[magnet-bank-httpd] +# How "taler-magnet-bank serve" serves its API, this can either be tcp or unix +SERVE = tcp + +# Port on which the HTTP server listens, e.g. 9967. Only used if SERVE is tcp. +PORT = 8080 + +# Which IP address should we bind to? E.g. ``127.0.0.1`` or ``::1``for loopback. Only used if SERVE is tcp. +BIND_TO = 0.0.0.0 + +# Which unix domain path should we bind to? Only used if SERVE is unix. +# UNIXPATH = libeufin-bank.sock + +# What should be the file access permissions for UNIXPATH? Only used if SERVE is unix. +# UNIXPATH_MODE = 660 + +[magnet-bank-httpd-wire-gateway-api] +# Whether to serve the Wire Gateway API +ENABLED = NO + +# Authentication scheme, this can either can be basic, bearer or none. +AUTH_METHOD = bearer + +# User name for basic authentication scheme +# USERNAME = + +# Password for basic authentication scheme +# PASSWORD = + +# Token for bearer authentication scheme +TOKEN = + + +[magnet-bank-httpd-revenue-api] +# Whether to serve the Revenue API +ENABLED = NO + +# Authentication scheme, this can either can be basic, bearer or none. +AUTH_METHOD = bearer + +# User name for basic authentication scheme +# USERNAME = + +# Password for basic authentication scheme +# PASSWORD = + +# Token for bearer authentication scheme +TOKEN = + + +[cyclosdb-postgres] +# DB connection string +CONFIG = postgres:///taler-cyclos + +# Where are the SQL files to setup our tables? +SQL_DIR = ${DATADIR}/sql/ +\ No newline at end of file diff --git a/taler-cyclos/db/cyclos-0001.sql b/taler-cyclos/db/cyclos-0001.sql @@ -0,0 +1,127 @@ +-- +-- This file is part of TALER +-- Copyright (C) 2025 Taler Systems SA +-- +-- TALER is free software; you can redistribute it and/or modify it under the +-- terms of the GNU General Public License as published by the Free Software +-- Foundation; either version 3, or (at your option) any later version. +-- +-- TALER is distributed in the hope that it will be useful, but WITHOUT ANY +-- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +-- A PARTICULAR PURPOSE. See the GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License along with +-- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> + +SELECT _v.register_patch('cyclos-0001', NULL, NULL); + +CREATE SCHEMA cyclos; +SET search_path TO cyclos; + +CREATE TYPE taler_amount AS (val INT8, frac INT4); +COMMENT ON TYPE taler_amount IS 'Stores an amount, fraction is in units of 1/100000000 of the base value'; + +CREATE TABLE tx_in( + tx_in_id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + transfer_id INT8 UNIQUE, + amount taler_amount NOT NULL, + subject TEXT NOT NULL, + debit_account INT8 NOT NULL, + debit_name TEXT NOT NULL, + valued_at INT8 NOT NULL, + registered_at INT8 NOT NULL +); +COMMENT ON TABLE tx_in IS 'Incoming transactions'; + +CREATE TABLE tx_out( + tx_out_id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + transfer_id INT8 UNIQUE, + amount taler_amount NOT NULL, + subject TEXT NOT NULL, + credit_account INT8 NOT NULL, + credit_name TEXT NOT NULL, + valued_at INT8 NOT NULL, + registered_at INT8 NOT NULL +); +COMMENT ON TABLE tx_in IS 'Outgoing transactions'; + +CREATE TYPE incoming_type AS ENUM + ('reserve' ,'kyc', 'wad'); +COMMENT ON TYPE incoming_type IS 'Types of incoming talerable transactions'; + +CREATE TABLE taler_in( + tx_in_id INT8 PRIMARY KEY REFERENCES tx_in(tx_in_id) ON DELETE CASCADE, + type incoming_type NOT NULL, + metadata BYTEA NOT NULL, + origin_exchange_url TEXT, + CONSTRAINT polymorphism CHECK( + CASE type + WHEN 'wad' THEN LENGTH(metadata)=24 AND origin_exchange_url IS NOT NULL + ELSE LENGTH(metadata)=32 AND origin_exchange_url IS NULL + END + ) +); +COMMENT ON TABLE tx_in IS 'Incoming talerable transactions'; + +CREATE UNIQUE INDEX taler_in_unique_reserve_pub ON taler_in (metadata) WHERE type = 'reserve'; + +CREATE TABLE taler_out( + tx_out_id INT8 PRIMARY KEY REFERENCES tx_out(tx_out_id) ON DELETE CASCADE, + wtid BYTEA NOT NULL UNIQUE CHECK (LENGTH(wtid)=32), + exchange_base_url TEXT NOT NULL +); +COMMENT ON TABLE tx_in IS 'Outgoing talerable transactions'; + +CREATE TYPE transfer_status AS ENUM( + 'pending', + 'transient_failure', + 'permanent_failure', + 'success', + 'late_failure' +); +COMMENT ON TYPE transfer_status IS 'Status of an initiated outgoing transaction'; + +CREATE TABLE initiated( + initiated_id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + amount taler_amount NOT NULL, + subject TEXT NOT NULL, + credit_account INT8 NOT NULL, + credit_name TEXT NOT NULL, + status transfer_status NOT NULL DEFAULT 'pending', + status_msg TEXT, + tx_id INT8 UNIQUE, + last_submitted INT8, + submission_counter INT2 NOT NULL DEFAULT 0, + tx_out_id INT8 UNIQUE REFERENCES tx_out(tx_out_id) ON DELETE CASCADE, + initiated_at INT8 NOT NULL +); +COMMENT ON TABLE tx_in IS 'Initiated outgoing transactions'; + +CREATE TABLE transfer( + initiated_id INT8 PRIMARY KEY REFERENCES initiated(initiated_id) ON DELETE CASCADE, + request_uid BYTEA UNIQUE NOT NULL CHECK (LENGTH(request_uid)=64), + wtid BYTEA UNIQUE NOT NULL CHECK (LENGTH(wtid)=32), + exchange_base_url TEXT NOT NULL +); +COMMENT ON TABLE transfer IS 'Wire Gateway transfers'; + +CREATE TABLE bounced( + tx_in_id INT8 NOT NULL UNIQUE REFERENCES tx_in(tx_in_id) ON DELETE CASCADE, + tx_out_id INT8 UNIQUE REFERENCES tx_out(tx_out_id) ON DELETE CASCADE, + chargeback_id INT8 NOT NULL UNIQUE, + reason TEXT NOT NULL +); +COMMENT ON TABLE tx_in IS 'Bounced transactions'; + +CREATE TABLE kv( + key TEXT NOT NULL UNIQUE PRIMARY KEY, + value JSONB NOT NULL +); +COMMENT ON TABLE kv IS 'KV table'; + +CREATE TYPE register_result AS ENUM( + 'idempotent', + 'known', + 'recovered' +); +COMMENT ON TYPE register_result IS 'Status of a registered transaction'; +\ No newline at end of file diff --git a/taler-cyclos/db/cyclos-drop.sql b/taler-cyclos/db/cyclos-drop.sql @@ -0,0 +1,29 @@ +-- +-- This file is part of TALER +-- Copyright (C) 2025 Taler Systems SA +-- +-- TALER is free software; you can redistribute it and/or modify it under the +-- terms of the GNU General Public License as published by the Free Software +-- Foundation; either version 3, or (at your option) any later version. +-- +-- TALER is distributed in the hope that it will be useful, but WITHOUT ANY +-- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +-- A PARTICULAR PURPOSE. See the GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License along with +-- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> + +DO +$do$ +DECLARE + patch text; +BEGIN + IF EXISTS(SELECT FROM information_schema.schemata WHERE schema_name='_v') THEN + FOR patch IN SELECT patch_name FROM _v.patches WHERE patch_name LIKE 'cyclos_%' LOOP + PERFORM _v.unregister_patch(patch); + END LOOP; + END IF; +END +$do$; + +DROP SCHEMA IF EXISTS cyclos CASCADE; +\ No newline at end of file diff --git a/taler-cyclos/db/cyclos-procedures.sql b/taler-cyclos/db/cyclos-procedures.sql @@ -0,0 +1,333 @@ +-- +-- This file is part of TALER +-- Copyright (C) 2025 Taler Systems SA +-- +-- TALER is free software; you can redistribute it and/or modify it under the +-- terms of the GNU General Public License as published by the Free Software +-- Foundation; either version 3, or (at your option) any later version. +-- +-- TALER is distributed in the hope that it will be useful, but WITHOUT ANY +-- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +-- A PARTICULAR PURPOSE. See the GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License along with +-- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> + +SET search_path TO cyclos; + +-- Remove all existing functions +DO +$do$ +DECLARE + _sql text; +BEGIN + SELECT INTO _sql + string_agg(format('DROP %s %s CASCADE;' + , CASE prokind + WHEN 'f' THEN 'FUNCTION' + WHEN 'p' THEN 'PROCEDURE' + END + , oid::regprocedure) + , E'\n') + FROM pg_proc + WHERE pronamespace = 'cyclos'::regnamespace; + + IF _sql IS NOT NULL THEN + EXECUTE _sql; + END IF; +END +$do$; + +CREATE FUNCTION register_tx_in( + IN in_transfer_id INT8, + IN in_amount taler_amount, + IN in_subject TEXT, + IN in_debit_account INT8, + IN in_debit_name TEXT, + IN in_valued_at INT8, + IN in_type incoming_type, + IN in_metadata BYTEA, + IN in_now INT8, + -- Error status + OUT out_reserve_pub_reuse BOOLEAN, + -- Success return + OUT out_tx_row_id INT8, + OUT out_valued_at INT8, + OUT out_new BOOLEAN +) +LANGUAGE plpgsql AS $$ +BEGIN +-- Check for idempotence +SELECT tx_in_id, valued_at +INTO out_tx_row_id, out_valued_at +FROM tx_in +WHERE (in_transfer_id IS NOT NULL AND transfer_id = in_transfer_id) -- Cyclos transaction + OR (in_transfer_id IS NULL AND amount = in_amount AND debit_account = in_debit_account AND subject = in_subject); -- Admin transaction +out_new = NOT found; +IF NOT out_new THEN + out_reserve_pub_reuse=false; + RETURN; +END IF; + +-- Check conflict +SELECT in_type = 'reserve'::incoming_type AND EXISTS(SELECT FROM taler_in WHERE metadata = in_metadata AND type = 'reserve') + INTO out_reserve_pub_reuse; +IF out_reserve_pub_reuse THEN + RETURN; +END IF; + +-- Insert new incoming transaction +out_valued_at = in_valued_at; +INSERT INTO tx_in ( + transfer_id, + amount, + subject, + debit_account, + debit_name, + valued_at, + registered_at +) VALUES ( + in_transfer_id, + in_amount, + in_subject, + in_debit_account, + in_debit_name, + in_valued_at, + in_now +) +RETURNING tx_in_id INTO out_tx_row_id; +-- Notify new incoming transaction registration +PERFORM pg_notify('tx_in', out_tx_row_id || ''); +IF in_type IS NOT NULL THEN + -- Insert new incoming talerable transaction + INSERT INTO taler_in ( + tx_in_id, + type, + metadata + ) VALUES ( + out_tx_row_id, + in_type, + in_metadata + ); + -- Notify new incoming talerable transaction registration + PERFORM pg_notify('taler_in', out_tx_row_id || ''); +END IF; +END $$; +COMMENT ON FUNCTION register_tx_in IS 'Register an incoming transaction idempotently'; + +CREATE FUNCTION register_tx_out( + IN in_transfer_id INT8, + IN in_amount taler_amount, + IN in_subject TEXT, + IN in_credit_account INT8, + IN in_credit_name TEXT, + IN in_valued_at INT8, + IN in_wtid BYTEA, + IN in_origin_exchange_url TEXT, + IN in_bounced INT8, + IN in_now INT8, + -- Success return + OUT out_tx_row_id INT8, + OUT out_result register_result +) +LANGUAGE plpgsql AS $$ +BEGIN +-- Check for idempotence +SELECT tx_out_id INTO out_tx_row_id +FROM tx_out WHERE transfer_id = in_transfer_id; + +IF FOUND THEN + out_result = 'idempotent'; + RETURN; +END IF; + +-- Insert new outgoing transaction +INSERT INTO tx_out ( + transfer_id, + amount, + subject, + credit_account, + credit_name, + valued_at, + registered_at +) VALUES ( + in_transfer_id, + in_amount, + in_subject, + in_credit_account, + in_credit_name, + in_valued_at, + in_now +) +RETURNING tx_out_id INTO out_tx_row_id; +-- Notify new outgoing transaction registration +PERFORM pg_notify('tx_out', out_tx_row_id || ''); + +-- Update initiated status +UPDATE initiated +SET + tx_out_id = out_tx_row_id, + status = 'success', + status_msg = NULL +WHERE tx_id = in_transfer_id; -- This will not work, should we pass the transaction id ? +IF FOUND THEN + out_result = 'known'; +ELSE + out_result = 'recovered'; +END IF; + +IF in_wtid IS NOT NULL THEN + -- Insert new outgoing talerable transaction + INSERT INTO taler_out ( + tx_out_id, + wtid, + exchange_base_url + ) VALUES ( + out_tx_row_id, + in_wtid, + in_origin_exchange_url + ) ON CONFLICT (wtid) DO NOTHING; + IF FOUND THEN + -- Notify new outgoing talerable transaction registration + PERFORM pg_notify('taler_out', out_tx_row_id || ''); + END IF; +ELSIF in_bounced IS NOT NULL THEN + --UPDATE initiated + --SET + -- tx_out_id = out_tx_row_id, + -- status = 'success', + -- status_msg = NULL + --FROM bounced JOIN tx_in USING (tx_in_id) + --WHERE initiated.initiated_id = bounced.initiated_id AND tx_in.transfer_id = in_transfer_id; +END IF; +END $$; +COMMENT ON FUNCTION register_tx_out IS 'Register an outgoing transaction idempotently'; + +CREATE FUNCTION taler_transfer( + IN in_request_uid BYTEA, + IN in_wtid BYTEA, + IN in_subject TEXT, + IN in_amount taler_amount, + IN in_exchange_base_url TEXT, + IN in_credit_account INT8, + IN in_credit_name TEXT, + IN in_now INT8, + -- Error return + OUT out_request_uid_reuse BOOLEAN, + OUT out_wtid_reuse BOOLEAN, + -- Success return + OUT out_initiated_row_id INT8, + OUT out_initiated_at INT8 +) +LANGUAGE plpgsql AS $$ +BEGIN +-- Check for idempotence and conflict +SELECT (amount != in_amount + OR credit_account != in_credit_account + OR exchange_base_url != in_exchange_base_url + OR wtid != in_wtid) + ,initiated_id, initiated_at +INTO out_request_uid_reuse, out_initiated_row_id, out_initiated_at +FROM transfer JOIN initiated USING (initiated_id) +WHERE request_uid = in_request_uid; +IF FOUND THEN + out_wtid_reuse=FALSE; + RETURN; +END IF; +out_request_uid_reuse=FALSE; +-- Check for wtid reuse +out_wtid_reuse = EXISTS(SELECT FROM transfer WHERE wtid=in_wtid); +IF out_wtid_reuse THEN + RETURN; +END IF; +-- Insert an initiated outgoing transaction +out_initiated_at = in_now; +INSERT INTO initiated ( + amount, + subject, + credit_account, + credit_name, + initiated_at +) VALUES ( + in_amount, + in_subject, + in_credit_account, + in_credit_name, + in_now +) RETURNING initiated_id +INTO out_initiated_row_id; +-- Insert a transfer operation +INSERT INTO transfer ( + initiated_id, + request_uid, + wtid, + exchange_base_url +) VALUES ( + out_initiated_row_id, + in_request_uid, + in_wtid, + in_exchange_base_url +); +PERFORM pg_notify('transfer', out_initiated_row_id || ''); +END $$; + +CREATE FUNCTION initiated_status_update( + IN in_initiated_id INT8, + IN in_status transfer_status, + IN in_status_msg TEXT +) +RETURNS void +LANGUAGE plpgsql AS $$ +DECLARE +current_status transfer_status; +BEGIN + -- Check current status + SELECT status INTO current_status FROM initiated + WHERE initiated_id = in_initiated_id; + IF FOUND THEN + -- Update unsettled transaction status + IF current_status = 'success' AND in_status = 'permanent_failure' THEN + UPDATE initiated + SET status = 'late_failure', status_msg = in_status_msg + WHERE initiated_id = in_initiated_id; + ELSIF current_status NOT IN ('success', 'permanent_failure', 'late_failure') THEN + UPDATE initiated + SET status = in_status, status_msg = in_status_msg + WHERE initiated_id = in_initiated_id; + END IF; + END IF; +END $$; + +CREATE FUNCTION register_bounced_tx_in( + IN in_transfer_id INT8, + IN in_amount taler_amount, + IN in_subject TEXT, + IN in_debit_account INT8, + IN in_debit_name TEXT, + IN in_valued_at INT8, + IN in_chargeback_id INT8, + IN in_reason TEXT, + IN in_now INT8, + -- Success return + OUT out_tx_row_id INT8, + OUT out_tx_new BOOLEAN +) +LANGUAGE plpgsql AS $$ +BEGIN +-- Register incoming transaction idempotently +SELECT register_tx_in.out_tx_row_id, register_tx_in.out_new +INTO out_tx_row_id, out_tx_new +FROM register_tx_in(in_transfer_id, in_amount, in_subject, in_debit_account, in_debit_name, in_valued_at, NULL, NULL, in_now); + +-- Register the bounce +INSERT INTO bounced ( + tx_in_id, + chargeback_id, + reason +) VALUES ( + out_tx_row_id, + in_chargeback_id, + in_reason +) ON CONFLICT (chargeback_id) DO NOTHING; +END $$; +COMMENT ON FUNCTION register_bounced_tx_in IS 'Register a bounced incoming transaction idempotently'; +\ No newline at end of file diff --git a/taler-cyclos/src/bin/cyclos-codegen.rs b/taler-cyclos/src/bin/cyclos-codegen.rs @@ -0,0 +1,84 @@ +/* + This file is part of TALER + Copyright (C) 2025 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use std::io::Write; +use std::io::stdout; + +use clap::Parser; +use taler_build::long_version; + +/// Cyclos API schema codegen +#[derive(clap::Parser, Debug)] +#[command(long_version = long_version(), about, long_about = None)] +struct Args { + kind: Kind, + ty: String, +} + +#[derive(Debug, Clone, Copy, clap::ValueEnum)] +enum Kind { + Error, + Enum, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let args = Args::parse(); + let api: serde_json::Value = reqwest::get("https://demo.cyclos.org/api/openapi.json") + .await? + .json() + .await?; + let schemas = &api["components"]["schemas"]; + + let out = &mut stdout().lock(); + let ty = &schemas[&args.ty]; + let lines: Vec<_> = ty["description"].as_str().unwrap().lines().collect(); + + match args.kind { + Kind::Error => writeln!( + out, + "#[derive(Debug, serde::Deserialize, thiserror::Error)]" + )?, + Kind::Enum => writeln!(out, "#[derive(Debug, serde::Deserialize)]")?, + } + writeln!(out, "#[serde(rename_all = \"camelCase\")]")?; + writeln!(out, "/// {}", lines[0])?; + writeln!(out, "pub enum {} {{", args.ty)?; + for (i, l) in lines[2..].iter().enumerate() { + let name = ty["enum"][i].as_str().unwrap(); + let mut split = l.split("`").skip(1); + let code = split.next().unwrap(); + let msg = split + .next() + .unwrap() + .strip_prefix(": ") + .unwrap() + .trim_end_matches('.'); + assert_eq!(name, code); + match args.kind { + Kind::Error => writeln!(out, " #[error(\"{name} - {msg}\")]")?, + Kind::Enum => writeln!(out, " /// {msg}")?, + } + writeln!( + out, + " {}{},", + name.chars().next().unwrap().to_uppercase(), + &name[1..] + )?; + } + writeln!(out, "}}")?; + Ok(()) +} diff --git a/taler-cyclos/src/bin/cyclos-harness.rs b/taler-cyclos/src/bin/cyclos-harness.rs @@ -0,0 +1,329 @@ +/* + This file is part of TALER + Copyright (C) 2025 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use std::str::FromStr as _; + +use clap::Parser as _; +use jiff::Timestamp; +use owo_colors::OwoColorize as _; +use sqlx::PgPool; +use taler_build::long_version; +use taler_common::{ + CommonArgs, + api_common::{EddsaPublicKey, HashCode, ShortHashCode, rand_edsa_pub_key}, + api_params::{History, Page}, + api_wire::{IncomingBankTransaction, TransferRequest}, + cli, + config::Config, + db::{dbinit, pool}, + taler_main, + types::{ + amount::{Amount, Currency, Decimal, decimal}, + url, + }, +}; +use taler_cyclos::{ + CyclosId, FullCyclosPayto, + config::{AccountType, parse_db_cfg}, + constants::CONFIG_SOURCE, + cyclos_api::{api::CyclosAuth, client::Client}, + db::{self, TransferResult}, + worker::{Worker, WorkerResult}, +}; + +/// Cyclos Adapter harness test suite +#[derive(clap::Parser, Debug)] +#[command(long_version = long_version(), about, long_about = None)] +struct Args { + #[clap(flatten)] + common: CommonArgs, + + #[command(subcommand)] + cmd: Command, +} + +#[derive(clap::Subcommand, Debug)] +enum Command { + /// Run logic tests + Logic { + #[clap(long, short)] + reset: bool, + }, + /// Run online tests + Online { + #[clap(long, short)] + reset: bool, + }, +} + +fn step(step: &str) { + println!("{}", step.green()); +} + +struct Harness<'a> { + pool: &'a PgPool, + client: Client<'a>, + wire: Client<'a>, + client_id: u64, + wire_id: u64, + currency: Currency, +} + +impl<'a> Harness<'a> { + async fn balance(&self) -> (Decimal, Decimal) { + let (exchange, client) = + tokio::try_join!(self.wire.accounts(), self.client.accounts()).unwrap(); + ( + exchange[0].status.available_balance, + client[0].status.available_balance, + ) + } + + /// Send transaction from client to exchange + async fn client_send(&self, subject: &str, amount: Decimal) { + self.client + .direct_payment(self.wire_id, amount, subject) + .await + .unwrap(); + } + + /// Send transaction from exchange to client + async fn exchange_send(&self, subject: &str, amount: Decimal) { + self.wire + .direct_payment(self.client_id, amount, subject) + .await + .unwrap(); + } + + /// Run the worker once + async fn worker(&'a self) -> WorkerResult { + let db = &mut self.pool.acquire().await.unwrap().detach(); + let account = self.wire.accounts().await.unwrap()[0].clone(); + Worker { + db, + currency: Currency::from_str("TEST").unwrap(), + client: &self.wire, + account_type_id: *account.ty.id, + account_type: AccountType::Exchange, + } + .run() + .await + } + + async fn expect_incoming(&self, key: EddsaPublicKey) { + let transfer = db::incoming_history( + self.pool, + &History { + page: Page { + limit: -1, + offset: None, + }, + timeout_ms: None, + }, + &self.currency, + || tokio::sync::watch::channel(0).1, + ) + .await + .unwrap(); + assert!(matches!( + transfer.first().unwrap(), + IncomingBankTransaction::Reserve { reserve_pub, .. } if *reserve_pub == key, + )); + } + + async fn custom_transfer(&self, amount: Decimal, creditor: &FullCyclosPayto) -> u64 { + let res = db::make_transfer( + self.pool, + &TransferRequest { + request_uid: HashCode::rand(), + amount: Amount::new_decimal(&self.currency, amount), + exchange_base_url: url("https://test.com"), + wtid: ShortHashCode::rand(), + credit_account: creditor.as_payto(), + }, + creditor, + &Timestamp::now(), + ) + .await + .unwrap(); + match res { + TransferResult::Success { id, .. } => id, + TransferResult::RequestUidReuse | TransferResult::WtidReuse => unreachable!(), + } + } +} + +struct Balances<'a> { + client: &'a Harness<'a>, + exchange_balance: Decimal, + client_balance: Decimal, +} + +impl<'a> Balances<'a> { + pub async fn new(client: &'a Harness<'a>) -> Self { + let (exchange_balance, client_balance) = client.balance().await; + Self { + client, + exchange_balance, + client_balance, + } + } + + async fn expect_add(&mut self, diff: Decimal) { + self.exchange_balance = self.exchange_balance.try_add(&diff).unwrap(); + self.client_balance = self.client_balance.try_sub(&diff).unwrap(); + + let current = self.client.balance().await; + assert_eq!( + current, + (self.exchange_balance, self.client_balance), + "{current:?} {diff}" + ); + } + + async fn expect_sub(&mut self, diff: Decimal) { + self.exchange_balance = self.exchange_balance.try_sub(&diff).unwrap(); + self.client_balance = self.client_balance.try_add(&diff).unwrap(); + + let current = self.client.balance().await; + assert_eq!( + current, + (self.exchange_balance, self.client_balance), + "{current:?} {diff}" + ); + } +} + +/// Run logic tests against real Cyclos backend +async fn logic_harness(cfg: &Config, reset: bool) -> anyhow::Result<()> { + step("Run Magnet Bank logic harness tests"); + + step("Prepare db"); + let db_cfg = parse_db_cfg(cfg)?; + let pool = pool(db_cfg.cfg, "cyclos").await?; + let mut db = pool.acquire().await?.detach(); + dbinit(&mut db, db_cfg.sql_dir.as_ref(), "cyclos", reset).await?; + + let client = reqwest::Client::new(); + let api_url = reqwest::Url::from_str("http://localhost:8080/api/").unwrap(); + let wire = Client { + client: &client, + api_url: &api_url, + auth: &CyclosAuth::Basic { + username: "wire".into(), + password: "f20n4X3qV44dNoZUmpeU".into(), + }, + }; + let client = Client { + client: &client, + api_url: &api_url, + auth: &CyclosAuth::Basic { + username: "client".into(), + password: "1EkY5JJMrkwyvv9yK7x4".into(), + }, + }; + let currency = Currency::from_str("TEST").unwrap(); + + let harness = Harness { + pool: &pool, + client_id: *client.whoami().await.unwrap().id, + wire_id: *wire.whoami().await.unwrap().id, + client, + wire, + currency, + }; + + step("Warmup"); + harness.worker().await.unwrap(); + + let now = Timestamp::now(); + let balance = &mut Balances::new(&harness).await; + + /*step("Test incoming talerable transaction"); + // Send talerable transaction + let reserve_pub = rand_edsa_pub_key(); + let amount = decimal("3.3"); + harness + .client_send(&format!("Taler {reserve_pub}"), amount) + .await; + // Sync and register + harness.worker().await?; + harness.expect_incoming(reserve_pub).await; + balance.expect_add(amount).await; + + step("Test incoming malformed transaction"); + // Send malformed transaction + let amount = decimal("3.4"); + harness + .client_send(&format!("Malformed test {now}"), amount) + .await; + balance.expect_add(amount).await; + // Sync and bounce + harness.worker().await?; + balance.expect_sub(amount).await;*/ + + step("Test transfer to self"); + // Init a transfer to self + let transfer_id = harness + .custom_transfer( + decimal("10.1"), + &FullCyclosPayto::new(CyclosId(42), "Self".to_string()), + ) + .await; + // Should failed + harness.worker().await?; + // Check transfer failed + /*harness + .expect_transfer_status( + transfer_id, + TransferState::permanent_failure, + Some("409 FORRAS_SZAMLA_ESZAMLA_EGYEZIK 'A forrás és az ellenszámla egyezik!'"), + ) + .await;*/ + + /*step("Test unexpected outgoing"); + // Manual tx from the exchange + let amount = decimal("4"); + harness + .exchange_send(&format!("What is this ? {now}"), amount) + .await; + harness.worker().await?; + // Wait for transaction to finalize + balance.expect_sub(amount).await; + harness.worker().await?;*/ + step("Finish"); + + Ok(()) +} + +/// Run online tests against real Cyclos backend +async fn online_harness(config: &Config, reset: bool) -> anyhow::Result<()> { + step("Run Cyclos harness tests"); + + step("Finish"); + + Ok(()) +} + +fn main() { + let args = Args::parse(); + taler_main(CONFIG_SOURCE, args.common, |cfg| async move { + match args.cmd { + Command::Logic { reset } => logic_harness(&cfg, reset).await, + Command::Online { reset } => online_harness(&cfg, reset).await, + } + }); +} diff --git a/taler-cyclos/src/config.rs b/taler-cyclos/src/config.rs @@ -0,0 +1,28 @@ +/* + This file is part of TALER + Copyright (C) 2025 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use taler_api::config::DbCfg; +use taler_common::config::{Config, ValueErr}; + +#[derive(Debug, Clone, Copy)] +pub enum AccountType { + Exchange, + Normal, +} + +pub fn parse_db_cfg(cfg: &Config) -> Result<DbCfg, ValueErr> { + DbCfg::parse(cfg.section("cyclosdb-postgres")) +} diff --git a/taler-cyclos/src/constants.rs b/taler-cyclos/src/constants.rs @@ -0,0 +1,19 @@ +/* + This file is part of TALER + Copyright (C) 2025 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use taler_common::config::parser::ConfigSource; + +pub const CONFIG_SOURCE: ConfigSource = ConfigSource::new("taler-cyclos", "cyclos", "taler-cyclos"); diff --git a/taler-cyclos/src/cyclos_api/api.rs b/taler-cyclos/src/cyclos_api/api.rs @@ -0,0 +1,150 @@ +/* + This file is part of TALER + Copyright (C) 2025 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use std::borrow::Cow; + +use reqwest::{Client, Method, RequestBuilder, StatusCode, Url}; +use serde::{Deserialize, Serialize, de::DeserializeOwned}; +use taler_common::error::FmtSource; +use thiserror::Error; + +use crate::cyclos_api::types::{ + ForbiddenError, InputError, NotFoundError, UnauthorizedError, UnexpectedError, +}; + +pub enum CyclosAuth { + None, + Basic { username: String, password: String }, +} + +#[derive(Error, Debug)] +#[error("{method} {path} {kind}")] +pub struct ApiErr { + pub path: Cow<'static, str>, + pub method: Method, + pub kind: ErrKind, +} + +#[derive(Error, Debug)] +pub enum ErrKind { + #[error("transport: {0}")] + Transport(FmtSource<reqwest::Error>), + #[error("JSON body: {0}")] + Json(#[from] serde_path_to_error::Error<serde_json::Error>), + #[error("unauthorized: {0}")] + Unauthorized(#[from] UnauthorizedError), + #[error("forbidden: {0}")] + Forbidden(#[from] ForbiddenError), + #[error("server: {0}")] + Server(#[from] UnexpectedError), + #[error("unknown: {0}")] + Unknown(#[from] NotFoundError), + #[error("input: {0}")] + Input(#[from] InputError), + #[error("status {0}")] + UnexpectedStatus(StatusCode), +} + +impl From<reqwest::Error> for ErrKind { + fn from(value: reqwest::Error) -> Self { + Self::Transport(value.into()) + } +} + +pub type ApiResult<R> = std::result::Result<R, ApiErr>; + +/** Parse JSON and track error path */ +fn parse<'de, T: Deserialize<'de>>(str: &'de str) -> Result<T, ErrKind> { + let deserializer = &mut serde_json::Deserializer::from_str(str); + serde_path_to_error::deserialize(deserializer).map_err(ErrKind::Json) +} + +async fn json_body<T: DeserializeOwned>(res: reqwest::Response) -> Result<T, ErrKind> { + // TODO check content type? + let body = res.text().await?; + //println!("{body}"); + let parsed = parse(&body)?; + Ok(parsed) +} + +pub struct CyclosRequest<'a> { + path: Cow<'static, str>, + method: Method, + builder: RequestBuilder, + auth: &'a CyclosAuth, +} + +impl<'a> CyclosRequest<'a> { + pub fn new( + client: &Client, + method: Method, + base_url: &Url, + path: impl Into<Cow<'static, str>>, + auth: &'a CyclosAuth, + ) -> Self { + let path = path.into(); + let url = base_url.join(&path).unwrap(); + let builder = client.request(method.clone(), url); + Self { + path, + method, + builder, + auth, + } + } + + pub fn query<T: Serialize + ?Sized>(mut self, query: &T) -> Self { + self.builder = self.builder.query(query); + self + } + + pub fn json<T: Serialize + ?Sized>(mut self, json: &T) -> Self { + self.builder = self.builder.json(json); + self + } + + pub async fn parse_json<T: DeserializeOwned>(self) -> ApiResult<T> { + let Self { + path, + builder, + method, + auth, + } = self; + let (client, req) = match auth { + CyclosAuth::None => builder, + CyclosAuth::Basic { username, password } => { + builder.basic_auth(username, Some(password)) + } + } + .build_split(); + async { + let req = req?; + let res = client.execute(req).await?; + let status = res.status(); + match status { + StatusCode::OK | StatusCode::CREATED => json_body(res).await, + StatusCode::UNAUTHORIZED => Err(ErrKind::Unauthorized(json_body(res).await?)), + StatusCode::FORBIDDEN => Err(ErrKind::Forbidden(json_body(res).await?)), + StatusCode::NOT_FOUND => Err(ErrKind::Unknown(json_body(res).await?)), + StatusCode::UNPROCESSABLE_ENTITY => Err(ErrKind::Input(json_body(res).await?)), + StatusCode::INTERNAL_SERVER_ERROR => Err(ErrKind::Forbidden(json_body(res).await?)), + _ => Err(ErrKind::UnexpectedStatus(status)), + } + } + .await + .map_err(|kind| ApiErr { path, method, kind }) + } +} diff --git a/taler-cyclos/src/cyclos_api/client.rs b/taler-cyclos/src/cyclos_api/client.rs @@ -0,0 +1,100 @@ +/* + This file is part of TALER + Copyright (C) 2025 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use std::borrow::Cow; + +use reqwest::Method; +use serde_json::json; +use taler_common::types::amount::Decimal; + +use crate::cyclos_api::{ + api::{ApiResult, CyclosAuth, CyclosRequest}, + types::{Account, DataForTransaction, HistoryItem, Transaction, Transfer, User}, +}; + +pub struct Client<'a> { + pub client: &'a reqwest::Client, + pub api_url: &'a reqwest::Url, + pub auth: &'a CyclosAuth, +} + +impl Client<'_> { + fn request(&self, method: Method, path: impl Into<Cow<'static, str>>) -> CyclosRequest<'_> { + CyclosRequest::new(self.client, method, self.api_url, path, self.auth) + } + + pub async fn whoami(&self) -> ApiResult<User> { + self.request(Method::GET, "users/self") + .parse_json() + .await + } + + pub async fn accounts(&self) -> ApiResult<Vec<Account>> { + self.request(Method::GET, "self/accounts") + .parse_json() + .await + } + + pub async fn balance(&self, account_type_id: u64) -> ApiResult<Account> { + self.request(Method::GET, format!("self/accounts/{account_type_id}")) + .parse_json() + .await + } + + pub async fn payment_data(&self) -> ApiResult<DataForTransaction> { + self.request(Method::GET, "self/payments/data-for-perform") + .parse_json() + .await + } + + pub async fn direct_payment( + &self, + account_id: u64, + amount: Decimal, + description: &str, + ) -> ApiResult<Transaction> { + self.request(Method::POST, "self/payments") + .json(&json!({ + "description": description, + "scheduling": "direct", + "subject": account_id, + "amount": amount + })) + .parse_json() + .await + } + + pub async fn chargeback(&self, transfer_id: u64) -> ApiResult<u64> { + self.request(Method::POST, format!("transfers/{transfer_id}/chargeback")) + .parse_json() + .await + } + + pub async fn transfers(&self, account_type_id: u64) -> ApiResult<Vec<HistoryItem>> { + self.request( + Method::GET, + format!("self/accounts/{account_type_id}/history"), + ) + .parse_json() + .await + } + + pub async fn transfer(&self, transfer_id: u64) -> ApiResult<Transfer> { + self.request(Method::GET, format!("transfers/{transfer_id}")) + .parse_json() + .await + } +} diff --git a/taler-cyclos/src/cyclos_api/mod.rs b/taler-cyclos/src/cyclos_api/mod.rs @@ -0,0 +1,19 @@ +/* + This file is part of TALER + Copyright (C) 2025 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +pub mod api; +pub mod client; +pub mod types; diff --git a/taler-cyclos/src/cyclos_api/types.rs b/taler-cyclos/src/cyclos_api/types.rs @@ -0,0 +1,428 @@ +/* + This file is part of TALER + Copyright (C) 2025 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use std::collections::BTreeMap; + +use jiff::Timestamp; +use serde::Deserialize; +use taler_common::types::amount::Decimal; + +use crate::CyclosId; + +#[derive(Debug, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct Type { + pub id: CyclosId, + pub internal_name: String, + pub name: String, +} + +#[derive(Debug, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct Currency { + #[serde(flatten)] + pub ty: Type, + pub decimal_digits: u8, + pub suffix: String, + pub symbol: String, +} + +#[derive(Debug, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct AccountStatus { + pub available_balance: Decimal, + pub balance: Decimal, + pub credit_limit: Decimal, + pub reserved_amount: Decimal, +} + +#[derive(Debug, Deserialize, Clone)] +#[serde(rename_all = "camelCase")] +pub struct Account { + pub id: CyclosId, + pub currency: Currency, + pub status: AccountStatus, + #[serde(rename = "type")] + pub ty: Type, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct User { + pub id: CyclosId, + pub name: String, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct PaymentData { + #[serde(flatten)] + pub ty: Type, + pub currency: Currency, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct DataForTransaction { + pub payment_type_data: PaymentData, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct RelatedAccount { + pub id: CyclosId, + #[serde(rename = "type")] + pub ty: Type, + #[serde(flatten)] + pub kind: AccountKind, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct HistoryItem { + pub id: CyclosId, + pub date: Timestamp, + pub amount: String, + #[serde(rename = "type")] + pub ty: Type, + pub description: Option<String>, + pub related_account: RelatedAccount, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct TransferView { + pub id: CyclosId, + pub date: Timestamp, + pub amount: Decimal, + #[serde(rename = "type")] + pub ty: Type, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Transfer { + pub id: CyclosId, + pub date: Timestamp, + pub amount: Decimal, + pub can_chargeback: bool, + pub kind: TransferKind, + pub currency: Currency, + pub chargeback_of: Option<TransferView>, + pub chargeback_by: Option<TransferView>, + #[serde(rename = "type")] + pub ty: Type, +} + +#[derive(Debug, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct Transaction { + pub id: CyclosId, + pub date: Timestamp, + pub amount: Decimal, + pub kind: TxKind, + pub currency: Currency, + #[serde(rename = "type")] + pub ty: Type, +} + +#[derive(Debug, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +/// Indicates the reason the transfer was created. +pub enum TransferKind { + /// A transfer generated by an account fee charge + AccountFee, + /// A transfer which either withdraws or credits funds of an account being disposed + BalanceDisposal, + /// A transfer which is a chargeback of another transfer + Chargeback, + /// An imported transfer + Import, + /// A transfer which is the initial credit for a newly created account + InitialCredit, + /// A transfer generated when processing a scheduled / recurring payment installment / occurrence + Installment, + /// A transfer generated by a direct payment or accepting a webshop order + Payment, + /// A transfer generated by a transfer fee charge + TransferFee, +} + +#[derive(Debug, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum TxKind { + Chargeback, + ExternalPayment, + Import, + Order, + Payment, + PaymentRequest, + RecurringPayment, + ScheduledPayment, + Ticket, +} + +#[derive(Debug, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum InvalidDeviceConfirmation { + InvalidConfirmation, + InvalidDevice, + MaxCheckAtemptsReached, +} + +#[derive(Debug, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct UserInfo { + pub id: CyclosId, + pub display: String, +} + +#[derive(Debug, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +#[serde(tag = "kind")] +pub enum AccountKind { + System, + User { user: UserInfo }, +} + +#[derive(Debug, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum UserStatus { + Active, + Blocked, + Disabled, + Pending, + Purged, + Removed, +} + +#[derive(Debug, serde::Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum PasswordStatus { + Active, + Disabled, + Expired, + IndefinitelyBlocked, + NeverCreated, + Pending, + Reset, + TemporarilyBlocked, +} + +#[derive(Debug, serde::Deserialize, thiserror::Error)] +#[serde(tag = "code", rename_all = "camelCase")] +pub enum UnauthorizedError { + #[error("blockedAccessClient - The access client used for access is blocked")] + BlockedAccessClient, + #[error("invalidAccessClient - The access client used for access is invalid")] + InvalidAccessClient, + #[error( + "invalidAccessToken '{error}' - The OAuth2 / OpenID Connect access token used for access is invalid" + )] + InvalidAccessToken { error: String }, + #[error( + "invalidChannelUsage - Attempt to login on a stateless-only channel, or use stateless in a stateful-only channel, or invoke as guest in a channel configuration which is only for users" + )] + InvalidChannelUsage, + #[error( + "invalidNetwork - Attempt to access a network that has been disabled, or a restricted global administrator is trying to login to a inaccessible network" + )] + InvalidNetwork, + #[error("loggedOut - The session token used for access is invalid")] + LoggedOut, + #[error( + "login {invalid_device_confirmation:?} {user_status:?} {pasword_status:?} - Either user identification (principal) or password are invalid. May have additional information, such as the user / password status" + )] + #[serde(rename = "camelCase")] + Login { + invalid_device_confirmation: InvalidDeviceConfirmation, + user_status: UserStatus, + pasword_status: PasswordStatus, + }, + #[error( + "missingAuthorization - Attempt to access an operation as guest, but the operation requires authentication" + )] + MissingAuthorization, + #[error( + "remoteAddressBlocked - The IP address being used for access has been blocked by exceeding tries with invalid users" + )] + RemoteAddressBlocked, + #[error( + "unauthorizedAddress - The user cannot access the system using an IP address that is not white-listed" + )] + UnauthorizedAddress, + #[error( + "unauthorizedUrl - The user's configuration demands access using a specific URL, and this access is being done using another one" + )] + UnauthorizedUrl, +} + +#[derive(Debug, serde::Deserialize, thiserror::Error)] +#[serde(tag = "code", rename_all = "camelCase")] +/// Error codes for 403 Forbidden HTTP status. +pub enum ForbiddenError { + #[error("blockedByTotp - The user was blocked for exceeding the TOTP attempts")] + BlockedByTotp, + #[error("devicePinRemoved - The device pin was removed by exceeding the allowed attempts")] + DevicePinRemoved, + #[error("disabledPassword - The password being used was disabled")] + DisabledPassword, + #[error("expiredPassword - The password being used has expired")] + ExpiredPassword, + #[error("illegalAction - Attempt to perform an action that is not allowed on this context")] + IllegalAction, + #[error("inaccessibleChannel - This channel cannot be accessed by the user")] + InaccessibleChannel, + #[error( + "inaccessibleGroup - An administrator logging in another user which is not of the managed groups. Should be handled generically, in case more group-specific login restrictions are added to Cyclos" + )] + InaccessibleGroup, + #[error( + "inaccessibleNetwork - A restricted global administrator is trying to login to a network they can't access" + )] + InaccessibleNetwork, + #[error( + "inaccessiblePrincipal - The used identification method (principal type) cannot be used in this channel" + )] + InaccessiblePrincipal, + #[error( + "indefinitelyBlocked - The password was indefinitely blocked by exceeding the allowed attempts" + )] + IndefinitelyBlocked, + #[error("invalidDeviceActivationCode - The device activation code was no valid")] + InvalidDeviceActivationCode, + #[error( + "invalidDeviceConfirmation - The device confirmation being used is invalid (normally as a confirmation password)" + )] + InvalidDeviceConfirmation, + #[error( + "invalidPassword - The password being used is invalid (normally the confirmation password)" + )] + InvalidPassword, + #[error("invalidTotp - A given TOTP is invalid")] + InvalidTotp, + #[error("loginConfirmation - The user needs to confirm the login before proceeding")] + LoginConfirmation, + #[error( + "operatorWithPendingAgreements - The operator cannot access because his owner member has pending agreements" + )] + OperatorWithPendingAgreements, + #[error("otpInvalidated - The OTP was invalidated")] + OtpInvalidated, + #[error( + "pendingAgreements - There is at least one agreement which needs to be accepted in order to access the system" + )] + PendingAgreements, + #[error( + "permissionDenied - The operation was denied because a required permission was not granted" + )] + PermissionDenied, + #[error("resetPassword - The password being used was manually reset")] + ResetPassword, + #[error( + "temporarilyBlocked - The password was temporarily blocked by exceeding the allowed attempts" + )] + TemporarilyBlocked, +} + +#[derive(Debug, serde::Deserialize, thiserror::Error)] +#[serde(tag = "code", rename_all = "camelCase")] +/// Error codes for 404 Service Unavailable entity HTTP status. It means that some required service couldn't be contacted +pub enum UnavailableError { + #[error("emailSending - An error has occurred trying to send the a required email")] + Emailsending, + #[error("smsSending - An error has occurred trying to send a required SMS message")] + Smssending, +} + +#[derive(Debug, serde::Deserialize, thiserror::Error)] +#[serde(tag = "code", rename_all = "camelCase")] +#[error("{entity_type} {key}")] +/// Error codes for 404 Not Found +pub struct NotFoundError { + entity_type: String, + key: String, +} + +#[derive(Debug, serde::Deserialize, thiserror::Error)] +#[serde(tag = "code", rename_all = "camelCase")] +/// Error types associated to the HTTP Status 500. +pub enum UnexpectedError { + #[error("buyVoucher - An error has occurred when buying a voucher")] + BuyVoucher, + #[error("forgottenPassword - An error has occurred when changing a forgotten password")] + ForgottenPassword, + #[error("general - An unexpected error has occurred")] + General, + #[error("initializeNfc - An error has occurred when initializing a NFC token")] + InitializeNfc, + #[error("nested - An error which has another internal error at a given property / index")] + Nested, + #[error("nfcAuth - An error has occurred when making an external NFC authentication")] + NfcAuth, + #[error("oidc - An error for an OpenID Connect / OAuth 2 operation")] + Oidc, + #[error("payment - An error has occurred when making a payment")] + Payment, + #[error("personalizeNfc - An error has occurred when personalizing a NFC token")] + PersonalizeNfc, + #[error("pos - An error has occurred when receiving a payment on a POS operation")] + Pos, + #[error("redeemVoucher - An error has occurred when redeeming a voucher")] + RedeemVoucher, + #[error("shoppingCart - An error has occurred when interacting with a shopping cart")] + ShoppingCart, + #[error("shoppingCartCheckout - An error has occurred when checking out a shopping cart")] + ShoppingCartCheckout, + #[error("topUpVoucher - An error has occurred on a voucher top-up")] + TopUpVoucher, +} + +#[derive(Debug, serde::Deserialize, thiserror::Error)] +#[serde(tag = "code", rename_all = "camelCase")] +/// Error codes for 422 Unprocessable entity HTTP status. It means there was an error with the input sent to the operation. +pub enum InputError { + #[error("aggregated - Represents an aggregation of other input errors")] + Aggregated, + #[error( + "dataConversion {value} - Some data conversion has failed. For example, when sending a date with an invalid format" + )] + DataConversion { value: String }, + #[error("fileUploadSize {max_file_size} - An uploaded file size exceeds the maximum allowed")] + #[serde(rename_all = "camelCase")] + FileUploadSize { max_file_size: usize }, + #[error( + "maxItems {max_items} - There was an attempt to create an item, but the maximum number of allowed items was exceeded" + )] + #[serde(rename_all = "camelCase")] + MaxItems { max_items: usize }, + #[error("missingParameter {name} - Missing a required request parameter")] + MissingParameter { name: String }, + #[error("queryParse {value} - A full-text query keywords contained an invalid text")] + QueryParse { value: String }, + #[error( + "validation {general_errors:?} {property_errors:?} - One or more of the fields sent contains invalid values" + )] + #[serde(rename_all = "camelCase")] + Validation { + #[serde(default)] + general_errors: Vec<String>, + #[serde(default)] + properties: Vec<String>, + #[serde(default)] + property_errors: BTreeMap<String, Vec<String>>, + }, +} diff --git a/taler-cyclos/src/db.rs b/taler-cyclos/src/db.rs @@ -0,0 +1,1420 @@ +/* + This file is part of TALER + Copyright (C) 2025 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use std::fmt::Display; + +use jiff::Timestamp; +use serde::{Serialize, de::DeserializeOwned}; +use sqlx::{PgConnection, PgExecutor, PgPool, QueryBuilder, Row, postgres::PgRow}; +use taler_api::{ + db::{BindHelper, IncomingType, TypeHelper, history, page}, + subject::{IncomingSubject, OutgoingSubject}, +}; +use taler_common::{ + api_params::{History, Page}, + api_revenue::RevenueIncomingBankTransaction, + api_wire::{ + IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest, + TransferState, TransferStatus, + }, + types::{ + amount::{Currency, Decimal}, + payto::{PaytoImpl as _, PaytoURI}, + }, +}; +use tokio::sync::watch::Receiver; + +use crate::{CyclosId, FullCyclosPayto}; + +#[derive(Debug, Clone)] +pub struct TxIn { + pub id: u64, + pub amount: Decimal, + pub subject: String, + pub debtor: FullCyclosPayto, + pub valued_at: Timestamp, +} + +impl Display for TxIn { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Self { + id, + amount, + subject, + debtor, + valued_at, + } = self; + write!( + f, + "{valued_at} {id} {amount} ({} {}) '{subject}'", + debtor.0, debtor.name + ) + } +} + +#[derive(Debug, Clone)] +pub struct TxOut { + pub id: u64, + pub amount: Decimal, + pub subject: String, + pub creditor: FullCyclosPayto, + pub valued_at: Timestamp, +} + +impl Display for TxOut { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Self { + id, + amount, + subject, + creditor, + valued_at, + } = self; + write!( + f, + "{valued_at} {id} {amount} ({} {}) '{subject}'", + creditor.0, creditor.name + ) + } +} + +#[derive(Debug, PartialEq, Eq)] +pub struct Initiated { + pub id: u64, + pub amount: Decimal, + pub subject: String, + pub creditor: FullCyclosPayto, +} + +impl Display for Initiated { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + let Self { + id, + amount, + subject, + creditor, + } = self; + write!( + f, + "{id} {amount} ({} {}) '{subject}'", + creditor.0, creditor.name + ) + } +} + +#[derive(Debug, Clone)] +pub struct TxInAdmin { + pub amount: Decimal, + pub subject: String, + pub debtor: FullCyclosPayto, + pub metadata: IncomingSubject, +} + +#[derive(Debug, PartialEq, Eq)] +pub enum AddIncomingResult { + Success { + new: bool, + row_id: u64, + valued_at: Timestamp, + }, + ReservePubReuse, +} + +pub async fn register_tx_in_admin( + db: &PgPool, + tx: &TxInAdmin, + now: &Timestamp, +) -> sqlx::Result<AddIncomingResult> { + sqlx::query( + " + SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new + FROM register_tx_in(NULL, ($1, $2)::taler_amount, $3, $4, $5, $6, $7, $8, $6) + ", + ) + .bind_decimal(&tx.amount) + .bind(&tx.subject) + .bind(tx.debtor.0 as i64) + .bind(&tx.debtor.name) + .bind_timestamp(now) + .bind(tx.metadata.ty()) + .bind(tx.metadata.key()) + .try_map(|r: PgRow| { + Ok(if r.try_get(0)? { + AddIncomingResult::ReservePubReuse + } else { + AddIncomingResult::Success { + row_id: r.try_get_u64(1)?, + valued_at: r.try_get_timestamp(2)?, + new: r.try_get(3)?, + } + }) + }) + .fetch_one(db) + .await +} + +pub async fn register_tx_in( + db: &mut PgConnection, + tx: &TxIn, + subject: &Option<IncomingSubject>, + now: &Timestamp, +) -> sqlx::Result<AddIncomingResult> { + sqlx::query( + " + SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new + FROM register_tx_in($1, ($2, $3)::taler_amount, $4, $5, $6, $7, $8, $9, $10) + ", + ) + .bind(tx.id as i64) + .bind_decimal(&tx.amount) + .bind(&tx.subject) + .bind(tx.debtor.0 as i64) + .bind(&tx.debtor.name) + .bind(tx.valued_at.as_microsecond()) + .bind(subject.as_ref().map(|it| it.ty())) + .bind(subject.as_ref().map(|it| it.key())) + .bind(now.as_microsecond()) + .try_map(|r: PgRow| { + Ok(if r.try_get(0)? { + AddIncomingResult::ReservePubReuse + } else { + AddIncomingResult::Success { + row_id: r.try_get_u64(1)?, + valued_at: r.try_get_timestamp(2)?, + new: r.try_get(3)?, + } + }) + }) + .fetch_one(db) + .await +} + +#[derive(Debug)] +pub enum TxOutKind { + Simple, + Bounce(u64), + Talerable(OutgoingSubject), +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)] +#[allow(non_camel_case_types)] +#[sqlx(type_name = "register_result")] +pub enum RegisterResult { + /// Already registered + idempotent, + /// Initiated transaction + known, + /// Recovered unknown outgoing transaction + recovered, +} + +#[derive(Debug, PartialEq, Eq)] +pub struct AddOutgoingResult { + pub result: RegisterResult, + pub row_id: u64, +} + +pub async fn register_tx_out( + db: &mut PgConnection, + tx: &TxOut, + kind: &TxOutKind, + now: &Timestamp, +) -> sqlx::Result<AddOutgoingResult> { + let query = sqlx::query( + " + SELECT out_result, out_tx_row_id + FROM register_tx_out($1, ($2, $3)::taler_amount, $4, $5, $6, $7, $8, $9, $10, $11) + ", + ) + .bind(tx.id as i64) + .bind_decimal(&tx.amount) + .bind(&tx.subject) + .bind(tx.creditor.0 as i64) + .bind(&tx.creditor.name) + .bind_timestamp(&tx.valued_at); + let query = match kind { + TxOutKind::Simple => query + .bind(None::<&[u8]>) + .bind(None::<&str>) + .bind(None::<i64>), + TxOutKind::Bounce(bounced) => query + .bind(None::<&[u8]>) + .bind(None::<&str>) + .bind(*bounced as i64), + TxOutKind::Talerable(subject) => query + .bind(subject.0.as_ref()) + .bind(subject.1.as_ref()) + .bind(None::<i64>), + }; + query + .bind_timestamp(now) + .try_map(|r: PgRow| { + Ok(AddOutgoingResult { + result: r.try_get(0)?, + row_id: r.try_get_u64(1)?, + }) + }) + .fetch_one(db) + .await +} + +#[derive(Debug, PartialEq, Eq)] +pub enum TransferResult { + Success { id: u64, initiated_at: Timestamp }, + RequestUidReuse, + WtidReuse, +} + +pub async fn make_transfer<'a>( + db: impl PgExecutor<'a>, + req: &TransferRequest, + creditor: &FullCyclosPayto, + now: &Timestamp, +) -> sqlx::Result<TransferResult> { + let subject = format!("{} {}", req.wtid, req.exchange_base_url); + sqlx::query( + " + SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at + FROM taler_transfer($1, $2, $3, ($4, $5)::taler_amount, $6, $7, $8, $9) + ", + ) + .bind(req.request_uid.as_ref()) + .bind(req.wtid.as_ref()) + .bind(&subject) + .bind_amount(&req.amount) + .bind(req.exchange_base_url.as_str()) + .bind(creditor.0 as i64) + .bind(&creditor.name) + .bind_timestamp(now) + .try_map(|r: PgRow| { + Ok(if r.try_get(0)? { + TransferResult::RequestUidReuse + } else if r.try_get(1)? { + TransferResult::WtidReuse + } else { + TransferResult::Success { + id: r.try_get_u64(2)?, + initiated_at: r.try_get_timestamp(3)?, + } + }) + }) + .fetch_one(db) + .await +} + +#[derive(Debug, PartialEq, Eq)] +pub struct BounceResult { + pub tx_id: u64, + pub tx_new: bool, +} + +pub async fn register_bounced_tx_in( + db: &mut PgConnection, + tx: &TxIn, + chargeback_id: u64, + reason: &str, + now: &Timestamp, +) -> sqlx::Result<BounceResult> { + sqlx::query( + " + SELECT out_tx_row_id, out_tx_new + FROM register_bounced_tx_in($1, ($2, $3)::taler_amount, $4, $5, $6, $7, $8, $9, $10) + ", + ) + .bind(tx.id as i64) + .bind_decimal(&tx.amount) + .bind(&tx.subject) + .bind(tx.debtor.0 as i64) + .bind(&tx.debtor.name) + .bind_timestamp(&tx.valued_at) + .bind(chargeback_id as i64) + .bind(reason) + .bind_timestamp(now) + .try_map(|r: PgRow| { + Ok(BounceResult { + tx_id: r.try_get_u64(0)?, + tx_new: r.try_get(1)?, + }) + }) + .fetch_one(db) + .await +} + +pub async fn transfer_page<'a>( + db: impl PgExecutor<'a>, + status: &Option<TransferState>, + currency: &Currency, + params: &Page, +) -> sqlx::Result<Vec<TransferListStatus>> { + page( + db, + "initiated_id", + params, + || { + let mut builder = QueryBuilder::new( + " + SELECT + initiated_id, + status, + (amount).val as amount_val, + (amount).frac as amount_frac, + credit_account, + credit_name, + initiated_at + FROM transfer + JOIN initiated USING (initiated_id) + WHERE + ", + ); + if let Some(status) = status { + builder.push(" status = ").push_bind(status).push(" AND "); + } + builder + }, + |r: PgRow| { + Ok(TransferListStatus { + row_id: r.try_get_safeu64(0)?, + status: r.try_get(1)?, + amount: r.try_get_amount_i(2, currency)?, + credit_account: r.try_get_cyclos_fullpaytouri(4, 5)?, + timestamp: r.try_get_timestamp(6)?.into(), + }) + }, + ) + .await +} + +pub async fn outgoing_history( + db: &PgPool, + params: &History, + currency: &Currency, + listen: impl FnOnce() -> Receiver<i64>, +) -> sqlx::Result<Vec<OutgoingBankTransaction>> { + history( + db, + "tx_out_id", + params, + listen, + || { + QueryBuilder::new( + " + SELECT + tx_out_id, + (amount).val as amount_val, + (amount).frac as amount_frac, + credit_account, + credit_name, + valued_at, + exchange_base_url, + wtid + FROM taler_out + JOIN tx_out USING (tx_out_id) + WHERE + ", + ) + }, + |r: PgRow| { + Ok(OutgoingBankTransaction { + row_id: r.try_get_safeu64(0)?, + amount: r.try_get_amount_i(1, currency)?, + credit_account: r.try_get_cyclos_fullpaytouri(3, 4)?, + date: r.try_get_timestamp(5)?.into(), + exchange_base_url: r.try_get_url(6)?, + wtid: r.try_get_base32(7)?, + }) + }, + ) + .await +} + +pub async fn incoming_history( + db: &PgPool, + params: &History, + currency: &Currency, + listen: impl FnOnce() -> Receiver<i64>, +) -> sqlx::Result<Vec<IncomingBankTransaction>> { + history( + db, + "tx_in_id", + params, + listen, + || { + QueryBuilder::new( + " + SELECT + type, + tx_in_id, + (amount).val as amount_val, + (amount).frac as amount_frac, + debit_account, + debit_name, + valued_at, + metadata + FROM taler_in + JOIN tx_in USING (tx_in_id) + WHERE + ", + ) + }, + |r: PgRow| { + Ok(match r.try_get(0)? { + IncomingType::reserve => IncomingBankTransaction::Reserve { + row_id: r.try_get_safeu64(1)?, + amount: r.try_get_amount_i(2, currency)?, + debit_account: r.try_get_cyclos_fullpaytouri(4, 5)?, + date: r.try_get_timestamp(6)?.into(), + reserve_pub: r.try_get_base32(7)?, + }, + IncomingType::kyc => IncomingBankTransaction::Kyc { + row_id: r.try_get_safeu64(1)?, + amount: r.try_get_amount_i(2, currency)?, + debit_account: r.try_get_cyclos_fullpaytouri(4, 5)?, + date: r.try_get_timestamp(6)?.into(), + account_pub: r.try_get_base32(7)?, + }, + IncomingType::wad => { + unimplemented!("WAD is not yet supported") + } + }) + }, + ) + .await +} + +pub async fn revenue_history( + db: &PgPool, + params: &History, + currency: &Currency, + listen: impl FnOnce() -> Receiver<i64>, +) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> { + history( + db, + "tx_in_id", + params, + listen, + || { + QueryBuilder::new( + " + SELECT + tx_in_id, + valued_at, + (amount).val as amount_val, + (amount).frac as amount_frac, + debit_account, + debit_name, + subject + FROM tx_in + WHERE + ", + ) + }, + |r: PgRow| { + Ok(RevenueIncomingBankTransaction { + row_id: r.try_get_safeu64(0)?, + date: r.try_get_timestamp(1)?.into(), + amount: r.try_get_amount_i(2, currency)?, + credit_fee: None, + debit_account: r.try_get_cyclos_fullpaytouri(4, 5)?, + subject: r.try_get(6)?, + }) + }, + ) + .await +} + +pub async fn transfer_by_id<'a>( + db: impl PgExecutor<'a>, + id: u64, + currency: &Currency, +) -> sqlx::Result<Option<TransferStatus>> { + sqlx::query( + " + SELECT + status, + status_msg, + (amount).val as amount_val, + (amount).frac as amount_frac, + exchange_base_url, + wtid, + credit_account, + credit_name, + initiated_at + FROM transfer + JOIN initiated USING (initiated_id) + WHERE initiated_id = $1 + ", + ) + .bind(id as i64) + .try_map(|r: PgRow| { + Ok(TransferStatus { + status: r.try_get(0)?, + status_msg: r.try_get(1)?, + amount: r.try_get_amount_i(2, currency)?, + origin_exchange_url: r.try_get(4)?, + wtid: r.try_get_base32(5)?, + credit_account: r.try_get_cyclos_fullpaytouri(6, 7)?, + timestamp: r.try_get_timestamp(8)?.into(), + }) + }) + .fetch_optional(db) + .await +} + +/** Get a batch of pending initiated transactions not attempted since [start] */ +pub async fn pending_batch<'a>( + db: impl PgExecutor<'a>, + start: &Timestamp, +) -> sqlx::Result<Vec<Initiated>> { + sqlx::query( + " + SELECT initiated_id, (amount).val, (amount).frac, subject, credit_account, credit_name + FROM initiated + WHERE tx_id IS NULL + AND status='pending' + AND (last_submitted IS NULL OR last_submitted < $1) + LIMIT 100 + ", + ) + .bind_timestamp(start) + .try_map(|r: PgRow| { + Ok(Initiated { + id: r.try_get_u64(0)?, + amount: r.try_get_decimal(1, 2)?, + subject: r.try_get(3)?, + creditor: r.try_get_cyclos_fullpayto(4, 5)?, + }) + }) + .fetch_all(db) + .await +} + +/** Update status of a successful submitted initiated transaction */ +pub async fn initiated_submit_success<'a>( + db: impl PgExecutor<'a>, + initiated_id: u64, + timestamp: &Timestamp, + tx_id: u64, +) -> sqlx::Result<()> { + sqlx::query( + " + UPDATE initiated + SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, tx_id=$2 + WHERE initiated_id=$3 + " + ).bind_timestamp(timestamp) + .bind(tx_id as i64) + .bind(initiated_id as i64) + .execute(db).await?; + Ok(()) +} + +/** Update status of a permanently failed initiated transaction */ +pub async fn initiated_submit_permanent_failure<'a>( + db: impl PgExecutor<'a>, + initiated_id: u64, + timestamp: &Timestamp, + msg: &str, +) -> sqlx::Result<()> { + sqlx::query( + " + UPDATE initiated + SET status='permanent_failure', status_msg=$2 + WHERE initiated_id=$3 + ", + ) + .bind_timestamp(timestamp) + .bind(msg) + .bind(initiated_id as i64) + .execute(db) + .await?; + Ok(()) +} + +#[derive(Debug, PartialEq, Eq)] +pub struct OutFailureResult { + pub initiated_id: Option<u64>, + pub new: bool, +} + +/** Update status of a charged back failed initiated transaction */ +pub async fn initiated_chargeback_failure( + db: &mut PgConnection, + code: u64, + bounced: Option<u32>, + now: &Timestamp, +) -> sqlx::Result<OutFailureResult> { + todo!(); + sqlx::query( + " + SELECT out_new, out_initiated_id + FROM register_tx_out_failure($1, $2, $3) + ", + ) + .bind(code as i64) + .bind(bounced.map(|i| i as i32)) + .bind_timestamp(now) + .try_map(|r: PgRow| { + Ok(OutFailureResult { + new: r.try_get(0)?, + initiated_id: r.try_get::<Option<i64>, _>(1)?.map(|i| i as u64), + }) + }) + .fetch_one(db) + .await +} + +/** Get JSON value from KV table */ +pub async fn kv_get<'a, T: DeserializeOwned + Unpin + Send>( + db: impl PgExecutor<'a>, + key: &str, +) -> sqlx::Result<Option<T>> { + sqlx::query("SELECT value FROM kv WHERE key=$1") + .bind(key) + .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0)) + .fetch_optional(db) + .await +} + +/** Set JSON value in KV table */ +pub async fn kv_set<'a, T: Serialize>( + db: impl PgExecutor<'a>, + key: &str, + value: &T, +) -> sqlx::Result<()> { + sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value") + .bind(key) + .bind(sqlx::types::Json(value)) + .execute(db) + .await?; + Ok(()) +} + +pub trait CyclosTypeHelper { + fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>( + &self, + idx: I, + name: I, + ) -> sqlx::Result<FullCyclosPayto>; + fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>( + &self, + idx: I, + name: I, + ) -> sqlx::Result<PaytoURI>; +} + +impl CyclosTypeHelper for PgRow { + fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>( + &self, + idx: I, + name: I, + ) -> sqlx::Result<FullCyclosPayto> { + let idx = self.try_get_u64(idx)?; + let name = self.try_get(name)?; + Ok(FullCyclosPayto::new(CyclosId(idx), name)) + } + fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>( + &self, + idx: I, + name: I, + ) -> sqlx::Result<PaytoURI> { + let idx = self.try_get_u64(idx)?; + let name = self.try_get(name)?; + Ok(CyclosId(idx).as_full_payto(name)) + } +} + +#[cfg(test)] +mod test { + use std::sync::LazyLock; + + use jiff::{Span, Timestamp}; + use serde_json::json; + use sqlx::{PgConnection, PgPool, postgres::PgRow}; + use taler_api::{ + db::TypeHelper, + subject::{IncomingSubject, OutgoingSubject}, + }; + use taler_common::{ + api_common::{EddsaPublicKey, HashCode, ShortHashCode}, + api_params::{History, Page}, + api_wire::TransferRequest, + types::{ + amount::{Currency, amount, decimal}, + payto::payto, + url, + utils::now_sql_stable_timestamp, + }, + }; + use tokio::sync::watch::Receiver; + + use crate::{ + constants::CONFIG_SOURCE, + cyclos_payto, + db::{ + self, AddIncomingResult, AddOutgoingResult, BounceResult, TransferResult, TxIn, + TxInAdmin, TxOut, TxOutKind, + }, + }; + + pub static CURRENCY: LazyLock<Currency> = LazyLock::new(|| "TEST".parse().unwrap()); + + fn fake_listen<T: Default>() -> Receiver<T> { + tokio::sync::watch::channel(T::default()).1 + } + + async fn setup() -> (PgConnection, PgPool) { + let pool = taler_test_utils::db_test_setup(CONFIG_SOURCE).await; + let conn = pool.acquire().await.unwrap().leak(); + (conn, pool) + } + + #[tokio::test] + async fn kv() { + let (mut db, _) = setup().await; + + let value = json!({ + "name": "Mr Smith", + "no way": 32 + }); + + assert_eq!( + db::kv_get::<serde_json::Value>(&mut db, "value") + .await + .unwrap(), + None + ); + db::kv_set(&mut db, "value", &value).await.unwrap(); + db::kv_set(&mut db, "value", &value).await.unwrap(); + assert_eq!( + db::kv_get::<serde_json::Value>(&mut db, "value") + .await + .unwrap(), + Some(value) + ); + } + + #[tokio::test] + async fn tx_in() { + let (mut db, pool) = setup().await; + + async fn routine( + db: &mut PgConnection, + first: &Option<IncomingSubject>, + second: &Option<IncomingSubject>, + ) { + let id = sqlx::query("SELECT count(*) + 1 FROM tx_in") + .try_map(|r: PgRow| r.try_get_u64(0)) + .fetch_one(&mut *db) + .await + .unwrap(); + let now = now_sql_stable_timestamp(); + let later = now + Span::new().hours(2); + let tx = TxIn { + id: now.as_microsecond() as u64, + amount: decimal("10"), + subject: "subject".to_owned(), + debtor: cyclos_payto("payto://cyclos/31000163100000000?receiver-name=name"), + valued_at: now, + }; + // Insert + assert_eq!( + db::register_tx_in(db, &tx, &first, &now) + .await + .expect("register tx in"), + AddIncomingResult::Success { + new: true, + row_id: id, + valued_at: now + } + ); + // Idempotent + assert_eq!( + db::register_tx_in( + db, + &TxIn { + valued_at: later, + ..tx.clone() + }, + &first, + &now + ) + .await + .expect("register tx in"), + AddIncomingResult::Success { + new: false, + row_id: id, + valued_at: now + } + ); + // Many + assert_eq!( + db::register_tx_in( + db, + &TxIn { + id: later.as_microsecond() as u64, + valued_at: later, + ..tx + }, + &second, + &now + ) + .await + .expect("register tx in"), + AddIncomingResult::Success { + new: true, + row_id: id + 1, + valued_at: later + } + ); + } + + // Empty db + assert_eq!( + db::revenue_history(&pool, &History::default(), &CURRENCY, fake_listen) + .await + .unwrap(), + Vec::new() + ); + assert_eq!( + db::incoming_history(&pool, &History::default(), &CURRENCY, fake_listen) + .await + .unwrap(), + Vec::new() + ); + + // Regular transaction + routine(&mut db, &None, &None).await; + + // Reserve transaction + routine( + &mut db, + &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), + &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), + ) + .await; + + // Kyc transaction + routine( + &mut db, + &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), + &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), + ) + .await; + + // History + assert_eq!( + db::revenue_history(&pool, &History::default(), &CURRENCY, fake_listen) + .await + .unwrap() + .len(), + 6 + ); + assert_eq!( + db::incoming_history(&pool, &History::default(), &CURRENCY, fake_listen) + .await + .unwrap() + .len(), + 4 + ); + } + + #[tokio::test] + async fn tx_in_admin() { + let (_, pool) = setup().await; + + // Empty db + assert_eq!( + db::incoming_history(&pool, &History::default(), &CURRENCY, fake_listen) + .await + .unwrap(), + Vec::new() + ); + + let now = now_sql_stable_timestamp(); + let later = now + Span::new().hours(2); + let tx = TxInAdmin { + amount: decimal("10"), + subject: "subject".to_owned(), + debtor: cyclos_payto("payto://cyclos/31000163100000000?receiver-name=name"), + metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), + }; + // Insert + assert_eq!( + db::register_tx_in_admin(&pool, &tx, &now) + .await + .expect("register tx in"), + AddIncomingResult::Success { + new: true, + row_id: 1, + valued_at: now + } + ); + // Idempotent + assert_eq!( + db::register_tx_in_admin(&pool, &tx, &later) + .await + .expect("register tx in"), + AddIncomingResult::Success { + new: false, + row_id: 1, + valued_at: now + } + ); + // Many + assert_eq!( + db::register_tx_in_admin( + &pool, + &TxInAdmin { + subject: "Other".to_owned(), + metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), + ..tx.clone() + }, + &later + ) + .await + .expect("register tx in"), + AddIncomingResult::Success { + new: true, + row_id: 2, + valued_at: later + } + ); + + // History + assert_eq!( + db::incoming_history(&pool, &History::default(), &CURRENCY, fake_listen) + .await + .unwrap() + .len(), + 2 + ); + } + + #[tokio::test] + async fn tx_out() { + let (mut db, pool) = setup().await; + + async fn routine(db: &mut PgConnection, first: &TxOutKind, second: &TxOutKind) { + let id = sqlx::query("SELECT count(*) + 1 FROM tx_out") + .try_map(|r: PgRow| r.try_get_u64(0)) + .fetch_one(&mut *db) + .await + .unwrap(); + let now = now_sql_stable_timestamp(); + let later = now + Span::new().hours(2); + let tx = TxOut { + id, + amount: decimal("10"), + subject: "subject".to_owned(), + creditor: cyclos_payto("payto://cyclos/31000163100000000?receiver-name=name"), + valued_at: now, + }; + assert!(matches!( + db::make_transfer( + &mut *db, + &TransferRequest { + request_uid: HashCode::rand(), + amount: amount("HUF:10"), + exchange_base_url: url("https://exchange.test.com/"), + wtid: ShortHashCode::rand(), + credit_account: payto( + "payto://cyclos/31000163100000000?receiver-name=name" + ), + }, + &tx.creditor, + &now + ) + .await + .unwrap(), + TransferResult::Success { .. } + )); + db::initiated_submit_success(&mut *db, 1, &Timestamp::now(), id) + .await + .expect("status success"); + + // Insert + assert_eq!( + db::register_tx_out(&mut *db, &tx, first, &now) + .await + .expect("register tx out"), + AddOutgoingResult { + result: db::RegisterResult::known, + row_id: id, + } + ); + // Idempotent + assert_eq!( + db::register_tx_out( + &mut *db, + &TxOut { + valued_at: later, + ..tx.clone() + }, + first, + &now + ) + .await + .expect("register tx out"), + AddOutgoingResult { + result: db::RegisterResult::idempotent, + row_id: id, + } + ); + // Recovered + assert_eq!( + db::register_tx_out( + &mut *db, + &TxOut { + id: id + 1, + valued_at: later, + ..tx.clone() + }, + second, + &now + ) + .await + .expect("register tx out"), + AddOutgoingResult { + result: db::RegisterResult::recovered, + row_id: id + 1, + } + ); + } + + // Empty db + assert_eq!( + db::outgoing_history(&pool, &History::default(), &CURRENCY, fake_listen) + .await + .unwrap(), + Vec::new() + ); + + // Regular transaction + routine(&mut db, &TxOutKind::Simple, &TxOutKind::Simple).await; + + // Talerable transaction + routine( + &mut db, + &TxOutKind::Talerable(OutgoingSubject( + ShortHashCode::rand(), + url("https://exchange.com"), + )), + &TxOutKind::Talerable(OutgoingSubject( + ShortHashCode::rand(), + url("https://exchange.com"), + )), + ) + .await; + + // Bounced transaction + routine(&mut db, &TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await; + + // History + assert_eq!( + db::outgoing_history(&pool, &History::default(), &CURRENCY, fake_listen) + .await + .unwrap() + .len(), + 2 + ); + } + + // TODO tx out failure + + #[tokio::test] + async fn transfer() { + let (mut db, _) = setup().await; + + // Empty db + assert_eq!( + db::transfer_by_id(&mut db, 0, &CURRENCY).await.unwrap(), + None + ); + assert_eq!( + db::transfer_page(&mut db, &None, &CURRENCY, &Page::default()) + .await + .unwrap(), + Vec::new() + ); + + let req = TransferRequest { + request_uid: HashCode::rand(), + amount: amount("HUF:10"), + exchange_base_url: url("https://exchange.test.com/"), + wtid: ShortHashCode::rand(), + credit_account: payto("payto://iban/HU02162000031000164800000000?receiver-name=name"), + }; + let payto = cyclos_payto("payto://cyclos/31000163100000000?receiver-name=name"); + let now = now_sql_stable_timestamp(); + let later = now + Span::new().hours(2); + // Insert + assert_eq!( + db::make_transfer(&mut db, &req, &payto, &now) + .await + .expect("transfer"), + TransferResult::Success { + id: 1, + initiated_at: now + } + ); + // Idempotent + assert_eq!( + db::make_transfer(&mut db, &req, &payto, &later) + .await + .expect("transfer"), + TransferResult::Success { + id: 1, + initiated_at: now + } + ); + // Request UID reuse + assert_eq!( + db::make_transfer( + &mut db, + &TransferRequest { + wtid: ShortHashCode::rand(), + ..req.clone() + }, + &payto, + &now + ) + .await + .expect("transfer"), + TransferResult::RequestUidReuse + ); + // wtid reuse + assert_eq!( + db::make_transfer( + &mut db, + &TransferRequest { + request_uid: HashCode::rand(), + ..req.clone() + }, + &payto, + &now + ) + .await + .expect("transfer"), + TransferResult::WtidReuse + ); + // Many + assert_eq!( + db::make_transfer( + &mut db, + &TransferRequest { + request_uid: HashCode::rand(), + wtid: ShortHashCode::rand(), + ..req + }, + &payto, + &later + ) + .await + .expect("transfer"), + TransferResult::Success { + id: 2, + initiated_at: later.into() + } + ); + + // Get + assert!( + db::transfer_by_id(&mut db, 1, &CURRENCY) + .await + .unwrap() + .is_some() + ); + assert!( + db::transfer_by_id(&mut db, 2, &CURRENCY) + .await + .unwrap() + .is_some() + ); + assert!( + db::transfer_by_id(&mut db, 3, &CURRENCY) + .await + .unwrap() + .is_none() + ); + assert_eq!( + db::transfer_page(&mut db, &None, &CURRENCY, &Page::default()) + .await + .unwrap() + .len(), + 2 + ); + } + + #[tokio::test] + async fn bounce() { + let (mut db, _) = setup().await; + + let amount = decimal("10"); + let payto = cyclos_payto("payto://cyclos/31000163100000000?receiver-name=name"); + let now = now_sql_stable_timestamp(); + + // Bounce + assert_eq!( + db::register_bounced_tx_in( + &mut db, + &TxIn { + id: 12, + amount, + subject: "subject".to_owned(), + debtor: payto.clone(), + valued_at: now + }, + 22, + "good reason", + &now + ) + .await + .expect("bounce"), + BounceResult { + tx_id: 1, + tx_new: true + } + ); + // Idempotent + assert_eq!( + db::register_bounced_tx_in( + &mut db, + &TxIn { + id: 12, + amount: amount.clone(), + subject: "subject".to_owned(), + debtor: payto.clone(), + valued_at: now + }, + 22, + "good reason", + &now + ) + .await + .expect("bounce"), + BounceResult { + tx_id: 1, + tx_new: false + } + ); + + // Many + assert_eq!( + db::register_bounced_tx_in( + &mut db, + &TxIn { + id: 13, + amount: amount.clone(), + subject: "subject".to_owned(), + debtor: payto.clone(), + valued_at: now + }, + 23, + "good reason", + &now + ) + .await + .expect("bounce"), + BounceResult { + tx_id: 2, + tx_new: true + } + ); + } + + #[tokio::test] + async fn batch() { + let (mut db, _) = setup().await; + let start = Timestamp::now(); + let cyclos_payto = cyclos_payto("payto://cyclos/31000163100000000?receiver-name=name"); + + // Empty db + let pendings = db::pending_batch(&mut db, &start) + .await + .expect("pending_batch"); + assert_eq!(pendings.len(), 0); + + // Some transfers + for i in 0..3 { + db::make_transfer( + &mut db, + &TransferRequest { + request_uid: HashCode::rand(), + amount: amount(format!("{}:{}", *CURRENCY, i + 1)), + exchange_base_url: url("https://exchange.test.com/"), + wtid: ShortHashCode::rand(), + credit_account: payto( + "payto://iban/HU02162000031000164800000000?receiver-name=name", + ), + }, + &cyclos_payto, + &Timestamp::now(), + ) + .await + .expect("transfer"); + } + let pendings = db::pending_batch(&mut db, &start) + .await + .expect("pending_batch"); + assert_eq!(pendings.len(), 3); + + // Max 100 txs in batch + for i in 0..100 { + db::make_transfer( + &mut db, + &TransferRequest { + request_uid: HashCode::rand(), + amount: amount(format!("{}:{}", *CURRENCY, i + 1)), + exchange_base_url: url("https://exchange.test.com/"), + wtid: ShortHashCode::rand(), + credit_account: payto( + "payto://iban/HU02162000031000164800000000?receiver-name=name", + ), + }, + &cyclos_payto, + &Timestamp::now(), + ) + .await + .expect("transfer"); + } + let pendings = db::pending_batch(&mut db, &start) + .await + .expect("pending_batch"); + assert_eq!(pendings.len(), 100); + + // Skip uploaded + for i in 0..=10 { + db::initiated_submit_success(&mut db, i, &Timestamp::now(), i) + .await + .expect("status success"); + } + let pendings = db::pending_batch(&mut db, &start) + .await + .expect("pending_batch"); + assert_eq!(pendings.len(), 93); + + // Skip failed + for i in 0..=10 { + db::initiated_submit_permanent_failure(&mut db, 10 + i, &Timestamp::now(), "failure") + .await + .expect("status failure"); + } + let pendings = db::pending_batch(&mut db, &start) + .await + .expect("pending_batch"); + assert_eq!(pendings.len(), 83); + } +} diff --git a/taler-cyclos/src/lib.rs b/taler-cyclos/src/lib.rs @@ -0,0 +1,95 @@ +/* + This file is part of TALER + Copyright (C) 2025 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use std::{fmt::Display, num::ParseIntError, ops::Deref, str::FromStr}; + +use taler_common::types::payto::{FullPayto, Payto, PaytoErr, PaytoImpl, PaytoURI, TransferPayto}; + +pub mod config; +pub mod constants; +pub mod cyclos_api; +pub mod db; +pub mod worker; + +#[derive( + Debug, Clone, PartialEq, Eq, serde_with::DeserializeFromStr, serde_with::SerializeDisplay, +)] +pub struct CyclosId(pub u64); + +impl Deref for CyclosId { + type Target = u64; + + fn deref(&self) -> &Self::Target { + &self.0 + } +} + +impl Display for CyclosId { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + self.0.fmt(f) + } +} + +#[derive(Debug, thiserror::Error)] +#[error("malformed cyclos id: {0}")] +pub struct CyclosIdError(ParseIntError); + +impl FromStr for CyclosId { + type Err = CyclosIdError; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + Ok(Self(u64::from_str(s).map_err(CyclosIdError)?)) + } +} + +const CYCLOS: &str = "cyclos"; + +#[derive(Debug, thiserror::Error)] +#[error("missing cyclos account id in path")] +pub struct MissingCyclos; + +impl PaytoImpl for CyclosId { + fn as_payto(&self) -> PaytoURI { + PaytoURI::from_parts(CYCLOS, format_args!("/{}", self.0)) + } + + fn parse(raw: &PaytoURI) -> Result<Self, PaytoErr> { + let url = raw.as_ref(); + if url.domain() != Some(CYCLOS) { + return Err(PaytoErr::UnsupportedKind( + CYCLOS, + url.domain().unwrap_or_default().to_owned(), + )); + } + let Some(mut segments) = url.path_segments() else { + return Err(PaytoErr::custom(MissingCyclos)); + }; + let Some(first) = segments.next() else { + return Err(PaytoErr::custom(MissingCyclos)); + }; + + CyclosId::from_str(first).map_err(PaytoErr::custom) + } +} + +/// Parse a cyclos payto URI, panic if malformed +pub fn cyclos_payto(url: impl AsRef<str>) -> FullCyclosPayto { + url.as_ref().parse().expect("invalid cyclos payto") +} + +pub type CyclosPayto = Payto<CyclosId>; +pub type FullCyclosPayto = FullPayto<CyclosId>; +pub type TransferCyclosPayto = TransferPayto<CyclosId>; diff --git a/taler-cyclos/src/main.rs b/taler-cyclos/src/main.rs @@ -0,0 +1,19 @@ +/* + This file is part of TALER + Copyright (C) 2025 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +pub fn main() { + // TODO +} diff --git a/taler-cyclos/src/worker.rs b/taler-cyclos/src/worker.rs @@ -0,0 +1,299 @@ +/* + This file is part of TALER + Copyright (C) 2025 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use jiff::Timestamp; +use sqlx::PgConnection; +use taler_api::subject::{self, parse_incoming_unstructured}; +use taler_common::types::amount::{self, Currency}; +use tracing::{debug, info, trace, warn}; + +use crate::{ + FullCyclosPayto, + config::AccountType, + cyclos_api::{ + api::ApiErr, + client::Client, + types::{AccountKind, HistoryItem}, + }, + db::{self, AddIncomingResult, RegisterResult, TxIn, TxOut, TxOutKind}, +}; + +#[derive(Debug, thiserror::Error)] +pub enum WorkerError { + #[error(transparent)] + Db(#[from] sqlx::Error), + #[error(transparent)] + Api(#[from] ApiErr), + //#[error(transparent)] + //Injected(#[from] InjectedErr), +} + +pub type WorkerResult = Result<(), WorkerError>; + +pub struct Worker<'a> { + pub client: &'a Client<'a>, + pub db: &'a mut PgConnection, + pub currency: Currency, + pub account_type_id: u64, + pub account_type: AccountType, +} + +impl Worker<'_> { + /// Run a single worker pass + pub async fn run(&mut self) -> WorkerResult { + // Sync transactions + //let mut next: Option<Next> = None; //kv_get(&mut *self.db, TXS_CURSOR_KEY).await?; TODO cursor logic is broken and cannot be stored & reused + + loop { + let transfers = self.client.transfers(self.account_type_id).await?; + for transfer in transfers { + let tx = extract_tx_info(transfer); + match tx { + Tx::In(tx_in) => self.ingest_in(tx_in).await?, + Tx::Out(tx_out) => self.ingest_out(tx_out).await?, + } + } + + break; // TODO pagination + } + + // Send transactions + let start = Timestamp::now(); + loop { + let batch = db::pending_batch(&mut *self.db, &start).await?; + if batch.is_empty() { + break; + } + for tx in batch { + debug!(target: "worker", "send tx {tx}"); + let res = self + .client + .direct_payment(tx.creditor.0, tx.amount, &tx.subject) + .await; + match res { + Ok(_) => todo!(), + Err(_) => todo!(), + } + // TODO store success + } + } + Ok(()) + } + + /// Ingest an incoming transaction + async fn ingest_in(&mut self, tx: TxIn) -> WorkerResult { + match self.account_type { + AccountType::Exchange => { + let transfer = self.client.transfer(tx.id).await?; + let bounce = async |db: &mut PgConnection, + reason: &str| + -> Result<(), WorkerError> { + // Fetch existing transaction + if let Some(chargeback) = transfer.chargeback_by { + let res = db::register_bounced_tx_in( + db, + &tx, + *chargeback.id, + reason, + &Timestamp::now(), + ) + .await?; + if res.tx_new { + info!(target: "worker", + "in {tx} bounced (recovered) in {}: {reason}", chargeback.id + ); + } else { + trace!(target: "worker", + "in {tx} already seen and bounced in {}: {reason}",chargeback.id + ); + } + } else if !transfer.can_chargeback { + match db::register_tx_in(db, &tx, &None, &Timestamp::now()).await? { + AddIncomingResult::Success { new, .. } => { + if new { + info!(target: "worker", "in {tx} cannot bounce: {reason}"); + } else { + trace!(target: "worker", "in {tx} already seen and cannot bounce "); + } + } + AddIncomingResult::ReservePubReuse => unreachable!(), + } + } else { + let chargeback_id = self.client.chargeback(*transfer.id).await?; + let res = db::register_bounced_tx_in( + db, + &tx, + chargeback_id, + reason, + &Timestamp::now(), + ) + .await?; + if res.tx_new { + info!(target: "worker", "in {tx} bounced in {chargeback_id}: {reason}"); + } else { + trace!(target: "worker", "in {tx} already seen and bounced in {chargeback_id}: {reason}"); + } + } + Ok(()) + }; + if let Some(chargeback) = transfer.chargeback_of { + warn!("{tx} - This is a transaction failure, we need to handle it"); + return Ok(()); + } + match parse_incoming_unstructured(&tx.subject) { + Ok(None) => bounce(self.db, "missing public key").await?, + Ok(Some(subject)) => { + match db::register_tx_in(self.db, &tx, &Some(subject), &Timestamp::now()) + .await? + { + AddIncomingResult::Success { new, .. } => { + if new { + info!(target: "worker", "in {tx}"); + } else { + trace!(target: "worker", "in {tx} already seen"); + } + } + AddIncomingResult::ReservePubReuse => { + bounce(self.db, "reserve pub reuse").await? + } + } + info!(target: "worker", "in {tx}"); + } + Err(e) => bounce(self.db, &e.to_string()).await?, + } + } + AccountType::Normal => { + match db::register_tx_in(self.db, &tx, &None, &Timestamp::now()).await? { + AddIncomingResult::Success { new, .. } => { + if new { + info!(target: "worker", "in {tx}"); + } else { + trace!(target: "worker", "in {tx} already seen"); + } + } + AddIncomingResult::ReservePubReuse => unreachable!(), + } + } + } + Ok(()) + } + + async fn ingest_out(&mut self, tx: TxOut) -> WorkerResult { + match self.account_type { + AccountType::Exchange => { + let transfer = self.client.transfer(tx.id).await?; + + let kind = if let Ok(subject) = subject::parse_outgoing(&tx.subject) { + TxOutKind::Talerable(subject) + } else if let Some(chargeback) = transfer.chargeback_of { + TxOutKind::Bounce(*chargeback.id) + } else { + TxOutKind::Simple + }; + + if let Some(chargeback) = transfer.chargeback_by { + warn!("{tx} - This is a transaction failure, we need to handle it"); + return Ok(()); + } + + let res = db::register_tx_out(self.db, &tx, &kind, &Timestamp::now()).await?; + match res.result { + RegisterResult::idempotent => match kind { + TxOutKind::Simple => { + trace!(target: "worker", "out malformed {tx} already seen") + } + TxOutKind::Bounce(_) => { + trace!(target: "worker", "out bounce {tx} already seen") + } + TxOutKind::Talerable(_) => { + trace!(target: "worker", "out {tx} already seen") + } + }, + RegisterResult::known => match kind { + TxOutKind::Simple => { + warn!(target: "worker", "out malformed {tx}") + } + TxOutKind::Bounce(_) => { + info!(target: "worker", "out bounce {tx}") + } + TxOutKind::Talerable(_) => { + info!(target: "worker", "out {tx}") + } + }, + RegisterResult::recovered => match kind { + TxOutKind::Simple => { + warn!(target: "worker", "out malformed (recovered) {tx}") + } + TxOutKind::Bounce(_) => { + warn!(target: "worker", "out bounce (recovered) {tx}") + } + TxOutKind::Talerable(_) => { + warn!(target: "worker", "out (recovered) {tx}") + } + }, + } + } + AccountType::Normal => { + let res = db::register_tx_out(self.db, &tx, &TxOutKind::Simple, &Timestamp::now()) + .await?; + match res.result { + RegisterResult::idempotent => { + trace!(target: "worker", "out {tx} already seen"); + } + RegisterResult::known => { + info!(target: "worker", "out {tx}"); + } + RegisterResult::recovered => { + warn!(target: "worker", "out (recovered) {tx}"); + } + } + } + } + Ok(()) + } +} + +pub enum Tx { + In(TxIn), + Out(TxOut), +} + +pub fn extract_tx_info(tx: HistoryItem) -> Tx { + let amount = amount::decimal(tx.amount.trim_start_matches('-')); + let payto = match tx.related_account.kind { + AccountKind::System => { + FullCyclosPayto::new(tx.related_account.ty.id, tx.related_account.ty.name) + } + AccountKind::User { user } => FullCyclosPayto::new(user.id, user.display), + }; + if tx.amount.starts_with("-") { + Tx::Out(TxOut { + id: *tx.id, + amount, + subject: tx.description.unwrap_or_default(), + creditor: payto, + valued_at: tx.date, + }) + } else { + Tx::In(TxIn { + id: *tx.id, + amount, + subject: tx.description.unwrap_or_default(), + debtor: payto, + valued_at: tx.date, + }) + } +} diff --git a/taler-magnet-bank/db/magnet-bank-procedures.sql b/taler-magnet-bank/db/magnet-bank-procedures.sql @@ -138,66 +138,67 @@ FROM tx_out WHERE magnet_code = in_code; IF FOUND THEN out_result = 'idempotent'; + RETURN; +END IF; + +-- Insert new outgoing transaction +INSERT INTO tx_out ( + magnet_code, + amount, + subject, + credit_account, + credit_name, + valued_at, + registered_at +) VALUES ( + in_code, + in_amount, + in_subject, + in_credit_account, + in_credit_name, + in_valued_at, + in_now +) +RETURNING tx_out_id INTO out_tx_row_id; +-- Notify new outgoing transaction registration +PERFORM pg_notify('tx_out', out_tx_row_id || ''); + +-- Update initiated status +UPDATE initiated +SET + tx_out_id = out_tx_row_id, + status = 'success', + status_msg = NULL +WHERE magnet_code = in_code; +IF FOUND THEN + out_result = 'known'; ELSE - -- Insert new outgoing transaction - INSERT INTO tx_out ( - magnet_code, - amount, - subject, - credit_account, - credit_name, - valued_at, - registered_at - ) VALUES ( - in_code, - in_amount, - in_subject, - in_credit_account, - in_credit_name, - in_valued_at, - in_now - ) - RETURNING tx_out_id INTO out_tx_row_id; - -- Notify new outgoing transaction registration - PERFORM pg_notify('tx_out', out_tx_row_id || ''); + out_result = 'recovered'; +END IF; - -- Update initiated status +IF in_wtid IS NOT NULL THEN + -- Insert new outgoing talerable transaction + INSERT INTO taler_out ( + tx_out_id, + wtid, + exchange_base_url + ) VALUES ( + out_tx_row_id, + in_wtid, + in_origin_exchange_url + ) ON CONFLICT (wtid) DO NOTHING; + IF FOUND THEN + -- Notify new outgoing talerable transaction registration + PERFORM pg_notify('taler_out', out_tx_row_id || ''); + END IF; +ELSIF in_bounced IS NOT NULL THEN UPDATE initiated - SET + SET tx_out_id = out_tx_row_id, status = 'success', status_msg = NULL - WHERE magnet_code = in_code; - IF FOUND THEN - out_result = 'known'; - ELSE - out_result = 'recovered'; - END IF; - - IF in_wtid IS NOT NULL THEN - -- Insert new outgoing talerable transaction - INSERT INTO taler_out ( - tx_out_id, - wtid, - exchange_base_url - ) VALUES ( - out_tx_row_id, - in_wtid, - in_origin_exchange_url - ) ON CONFLICT (wtid) DO NOTHING; - IF FOUND THEN - -- Notify new outgoing talerable transaction registration - PERFORM pg_notify('taler_out', out_tx_row_id || ''); - END IF; - ELSIF in_bounced IS NOT NULL THEN - UPDATE initiated - SET - tx_out_id = out_tx_row_id, - status = 'success', - status_msg = NULL - FROM bounced JOIN tx_in USING (tx_in_id) - WHERE initiated.initiated_id = bounced.initiated_id AND tx_in.magnet_code = in_bounced; - END IF; + FROM bounced JOIN tx_in USING (tx_in_id) + WHERE initiated.initiated_id = bounced.initiated_id AND tx_in.magnet_code = in_bounced; END IF; END $$; COMMENT ON FUNCTION register_tx_out IS 'Register an outgoing transaction idempotently'; diff --git a/taler-magnet-bank/src/magnet_api/api.rs b/taler-magnet-bank/src/magnet_api/api.rs @@ -18,6 +18,7 @@ use std::borrow::Cow; use reqwest::{Client, Method, RequestBuilder, Response, StatusCode, Url, header}; use serde::{Deserialize, Serialize, de::DeserializeOwned}; +use taler_common::error::FmtSource; use thiserror::Error; use tracing::Level; @@ -67,40 +68,9 @@ pub enum ErrKind { StatusCause(StatusCode, String), } -#[derive(Debug)] -pub struct FmtSource<E: std::error::Error>(E); - -fn fmt_with_source( - f: &mut std::fmt::Formatter<'_>, - mut e: &dyn std::error::Error, -) -> std::fmt::Result { - loop { - write!(f, "{}", &e)?; - if let Some(source) = e.source() { - write!(f, ": ")?; - e = source; - } else { - return Ok(()); - } - } -} - -impl<E: std::error::Error> std::fmt::Display for FmtSource<E> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - fmt_with_source(f, &self.0) - } -} - -impl<E: std::error::Error> From<E> for FmtSource<E> { - fn from(value: E) -> Self { - Self(value) - } -} - impl From<reqwest::Error> for ErrKind { fn from(value: reqwest::Error) -> Self { - // We remove the URL as we already provide the API path - Self::Transport(FmtSource(value.without_url())) + Self::Transport(value.into()) } } diff --git a/taler-magnet-bank/src/magnet_api.rs b/taler-magnet-bank/src/magnet_api/mod.rs diff --git a/taler-magnet-bank/src/worker.rs b/taler-magnet-bank/src/worker.rs @@ -114,7 +114,7 @@ impl Worker<'_> { if new { info!(target: "worker", "in {tx_in} skip bounce: {reason}"); } else { - trace!(target: "worker", "in {tx_in} already skil bounce "); + trace!(target: "worker", "in {tx_in} already skip bounce "); } } AddIncomingResult::ReservePubReuse => unreachable!(), @@ -225,13 +225,13 @@ impl Worker<'_> { match res.result { RegisterResult::idempotent => match kind { TxOutKind::Simple => { - trace!(target: "worker", "out malformed {tx_out} already see") + trace!(target: "worker", "out malformed {tx_out} already seen") } TxOutKind::Bounce(_) => { - trace!(target: "worker", "out bounce {tx_out} already see") + trace!(target: "worker", "out bounce {tx_out} already seen") } TxOutKind::Talerable(_) => { - trace!(target: "worker", "out {tx_out} already see") + trace!(target: "worker", "out {tx_out} already seen") } }, RegisterResult::known => match kind {