taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

commit 3d298ba6af0174cabf1a811c27a784d9129f30b0
parent 64112a95a5942591bc42647098630c7bc21ed173
Author: Antoine A <>
Date:   Thu, 13 Feb 2025 14:09:49 +0100

common: sql migration

Diffstat:
MMakefile | 2++
Mcommon/taler-api/tests/api.rs | 2+-
Mcommon/taler-api/tests/common/mod.rs | 5-----
Dcommon/taler-api/tests/schema.sql | 164-------------------------------------------------------------------------------
Mcommon/taler-common/Cargo.toml | 2+-
Acommon/taler-common/src/db.rs | 140+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mcommon/taler-common/src/lib.rs | 1+
Mcommon/taler-test-utils/src/lib.rs | 42++++++++++++++++++++++++++++++------------
Adatabase-versioning/magnet-bank-0001.sql | 112+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Adatabase-versioning/magnet-bank-drop.sql | 30++++++++++++++++++++++++++++++
Adatabase-versioning/magnet-bank-procedures.sql | 325+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Adatabase-versioning/taler-api-0001.sql | 66++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Adatabase-versioning/taler-api-drop.sql | 30++++++++++++++++++++++++++++++
Adatabase-versioning/taler-api-procedures.sql | 140+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Adatabase-versioning/versioning.sql | 294+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mtaler-magnet-bank/Cargo.toml | 35+++++++++++++++++++++++++++++++----
Dtaler-magnet-bank/db/schema.sql | 392-------------------------------------------------------------------------------
Mtaler-magnet-bank/magnet-bank.conf | 8++++++--
Mtaler-magnet-bank/src/config.rs | 2++
Mtaler-magnet-bank/src/db.rs | 20++------------------
Mtaler-magnet-bank/src/main.rs | 14+++++++-------
Mtaler-magnet-bank/tests/api.rs | 3+--
22 files changed, 1221 insertions(+), 608 deletions(-)

diff --git a/Makefile b/Makefile @@ -21,6 +21,8 @@ build: .PHONY: install-nobuild install-nobuild: install -m 644 -D -t $(config_dir) taler-magnet-bank/magnet-bank.conf + install -m 644 -D -t $(sql_dir) database-versioning/versioning.sql + install -m 644 -D -t $(sql_dir) database-versioning/magnet-bank*.sql install -D -t $(bin_dir) contrib/taler-magnet-bank-dbconfig install -D -t $(bin_dir) target/release/taler-magnet-bank diff --git a/common/taler-api/tests/api.rs b/common/taler-api/tests/api.rs @@ -34,7 +34,7 @@ use taler_test_utils::{ mod common; async fn setup() -> (TestServer, PgPool) { - let pool = db_test_setup().await; + let pool = db_test_setup("taler-api").await; let api = test_api(pool.clone(), "EUR".to_string()).await; let server = TestServer::new(api.finalize()).unwrap(); diff --git a/common/taler-api/tests/common/mod.rs b/common/taler-api/tests/common/mod.rs @@ -166,11 +166,6 @@ impl Revenue for TestApi { } pub async fn test_api(pool: PgPool, currency: String) -> TalerApiBuilder { - // Reset db - sqlx::raw_sql(include_str!("../schema.sql")) - .execute(&pool) - .await - .unwrap(); let outgoing_channel = Sender::new(0); let incoming_channel = Sender::new(0); let wg = TestApi { diff --git a/common/taler-api/tests/schema.sql b/common/taler-api/tests/schema.sql @@ -1,163 +0,0 @@ --- --- This file is part of TALER --- Copyright (C) 2024-2025 Taler Systems SA --- --- TALER is free software; you can redistribute it and/or modify it under the --- terms of the GNU General Public License as published by the Free Software --- Foundation; either version 3, or (at your option) any later version. --- --- TALER is distributed in the hope that it will be useful, but WITHOUT ANY --- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR --- A PARTICULAR PURPOSE. See the GNU General Public License for more details. --- --- You should have received a copy of the GNU General Public License along with --- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> - -DROP SCHEMA public CASCADE; -CREATE SCHEMA public; - -CREATE TYPE taler_amount - AS (val INT8, frac INT4); -COMMENT ON TYPE taler_amount - IS 'Stores an amount, fraction is in units of 1/100000000 of the base value'; - -CREATE TYPE transfer_status AS ENUM ( - 'pending' - ,'transient_failure' - ,'permanent_failure' - ,'success' -); - -CREATE TABLE transfers ( - transfer_id INT8 PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY - ,amount taler_amount NOT NULL - ,exchange_base_url TEXT NOT NULL - ,subject TEXT NOT NULL - ,credit_payto TEXT NOT NULL - ,request_uid BYTEA UNIQUE NOT NULL CHECK (LENGTH(request_uid)=64) - ,wtid BYTEA UNIQUE NOT NULL CHECK (LENGTH(wtid)=32) - ,status transfer_status NOT NULL - ,status_msg TEXT - ,transfer_time INT8 NOT NULL -); - -CREATE TYPE incoming_type AS ENUM - ('reserve' ,'kyc', 'wad'); -CREATE TABLE incoming_transactions ( - incoming_transaction_id INT8 PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY - ,amount taler_amount NOT NULL - ,subject TEXT NOT NULL - ,debit_payto TEXT NOT NULL - ,creation_time INT8 NOT NULL - ,type incoming_type NOT NULL - ,metadata BYTEA NOT NULL - ,origin_exchange_url TEXT - ,CONSTRAINT polymorphism CHECK( - CASE type - WHEN 'wad' THEN LENGTH(metadata)=24 AND origin_exchange_url IS NOT NULL - ELSE LENGTH(metadata)=32 AND origin_exchange_url IS NULL - END - ) -); - -CREATE UNIQUE INDEX incoming_transactions_unique_reserve_pub ON incoming_transactions (metadata) WHERE type = 'reserve'; - -CREATE FUNCTION taler_transfer( - IN in_amount taler_amount, - IN in_exchange_base_url TEXT, - IN in_subject TEXT, - IN in_credit_payto TEXT, - IN in_request_uid BYTEA, - IN in_wtid BYTEA, - IN in_timestamp INT8, - -- Error status - OUT out_request_uid_reuse BOOLEAN, - -- Success return - OUT out_tx_row_id INT8, - OUT out_timestamp INT8 -) -LANGUAGE plpgsql AS $$ -BEGIN --- Check for idempotence and conflict -SELECT (amount != in_amount - OR credit_payto != in_credit_payto - OR exchange_base_url != in_exchange_base_url - OR wtid != in_wtid) - ,transfer_id, transfer_time - INTO out_request_uid_reuse, out_tx_row_id, out_timestamp - FROM transfers - WHERE request_uid = in_request_uid; -IF found THEN - RETURN; -END IF; -out_request_uid_reuse=false; -out_timestamp=in_timestamp; --- Register exchange -INSERT INTO transfers ( - amount, - exchange_base_url, - subject, - credit_payto, - request_uid, - wtid, - status, - status_msg, - transfer_time -) VALUES ( - in_amount, - in_exchange_base_url, - in_subject, - in_credit_payto, - in_request_uid, - in_wtid, - 'success', - NULL, - in_timestamp -) RETURNING transfer_id INTO out_tx_row_id; --- Notify new transaction -PERFORM pg_notify('outgoing_tx', out_tx_row_id || ''); -END $$; -COMMENT ON FUNCTION taler_transfer IS 'Create an outgoing taler transaction and register it'; - -CREATE FUNCTION add_incoming( - IN in_key BYTEA, - IN in_subject TEXT, - IN in_amount taler_amount, - IN in_debit_payto TEXT, - IN in_timestamp INT8, - IN in_type incoming_type, - -- Error status - OUT out_reserve_pub_reuse BOOLEAN, - -- Success return - OUT out_tx_row_id INT8 -) -LANGUAGE plpgsql AS $$ -BEGIN --- Check conflict -SELECT in_type = 'reserve'::incoming_type AND EXISTS(SELECT FROM incoming_transactions WHERE metadata = in_key AND type = 'reserve') - INTO out_reserve_pub_reuse; -IF out_reserve_pub_reuse THEN - RETURN; -END IF; --- Register incoming transaction -INSERT INTO incoming_transactions ( - amount, - debit_payto, - creation_time, - subject, - type, - metadata, - origin_exchange_url -) VALUES ( - in_amount, - in_debit_payto, - in_timestamp, - in_subject, - in_type, - in_key, - NULL -) RETURNING incoming_transaction_id INTO out_tx_row_id; --- Notify new incoming transaction -PERFORM pg_notify('incoming_tx', out_tx_row_id || ''); -END $$; -COMMENT ON FUNCTION add_incoming IS 'Create an incoming taler transaction and register it'; -\ No newline at end of file diff --git a/common/taler-common/Cargo.toml b/common/taler-common/Cargo.toml @@ -25,7 +25,7 @@ clap.workspace = true anyhow.workspace = true tracing-subscriber.workspace = true tokio = { workspace = true, features = ["rt-multi-thread"] } -sqlx = { workspace = true, features = ["macros", "postgres"] } +sqlx = { workspace = true, features = ["macros"] } [dev-dependencies] criterion.workspace = true diff --git a/common/taler-common/src/db.rs b/common/taler-common/src/db.rs @@ -0,0 +1,140 @@ +/* + This file is part of TALER + Copyright (C) 2025 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use std::{ + io::ErrorKind, + path::{Path, PathBuf}, + sync::Arc, +}; + +use sqlx::{postgres::PgRow, PgConnection}; +use sqlx::{ + postgres::{PgConnectOptions, PgPoolOptions}, + Connection, Executor, PgPool, Row, +}; +use tracing::{debug, info}; + +/* ----- Pool ----- */ + +pub async fn pool(cfg: PgConnectOptions, schema: &str) -> sqlx::Result<PgPool> { + let init_sql = Arc::new(format!( + "SET search_path TO {schema};SET default_transaction_isolation = 'serializable';" + )); + let pool = PgPoolOptions::new() + .after_connect(move |conn, _meta| { + let init_sql = init_sql.clone(); + Box::pin(async move { + conn.execute(init_sql.as_str()).await?; + + Ok(()) + }) + }) + .connect_with(cfg) + .await?; + + // TODO check postgresql version ? + + Ok(pool) +} + +/* ----- Migration----- */ + +#[derive(Debug, thiserror::Error)] +pub enum MigrationErr { + #[error("could not read patch at '{}': {}", .0.to_string_lossy(), .1.kind())] + Io(PathBuf, std::io::Error), + #[error(transparent)] + Sql(#[from] sqlx::Error), +} + +pub async fn dbinit( + conn: &mut PgConnection, + sql_dir: &Path, + prefix: &str, + reset: bool, +) -> Result<(), MigrationErr> { + let mut tx = conn.begin().await?; + + async fn exec_sql_file( + conn: &mut PgConnection, + sql_dir: &Path, + file: &str, + action: &str, + ) -> Result<(), MigrationErr> { + let path = sql_dir.join(file); + match std::fs::read_to_string(&path) { + Ok(content) => { + info!("applying {action}"); + sqlx::raw_sql(&content).execute(&mut *conn).await?; + Ok(()) + } + Err(e) => Err(MigrationErr::Io(path, e)), + } + } + + if reset { + info!("DB reset, sqlqir '{}'", sql_dir.to_string_lossy()); + exec_sql_file(&mut tx, sql_dir, &format!("{prefix}-drop.sql"), "drop").await?; + } + + info!("DB initialization, sqlqir '{}'", sql_dir.to_string_lossy()); + + exec_sql_file(&mut tx, sql_dir, "versioning.sql", "versioning").await?; + + let applied: Vec<String> = sqlx::query("SELECT patch_name FROM _v.patches") + .try_map(|r: PgRow| r.try_get(1)) + .fetch_all(&mut *tx) + .await?; + for n in 1..9999 { + let patch = format!("{prefix}-{n:0>4}"); + if applied.contains(&patch) { + debug!("patch {patch} already applied"); + continue; + } + + if let Err(e) = exec_sql_file( + &mut tx, + sql_dir, + &format!("{patch}.sql"), + &format!("patch {patch}"), + ) + .await + { + if let MigrationErr::Io(path, e) = &e { + if e.kind() == ErrorKind::NotFound { + debug!( + "path '{}' doesn't exist anymore, stopping", + path.to_string_lossy() + ); + break; + } + } + return Err(e); + } + } + + exec_sql_file( + &mut tx, + sql_dir, + &format!("{prefix}-procedures.sql"), + "procedures", + ) + .await?; + + tx.commit().await?; + + Ok(()) +} diff --git a/common/taler-common/src/lib.rs b/common/taler-common/src/lib.rs @@ -31,6 +31,7 @@ pub mod error_code; pub mod json_file; mod log; pub mod types; +pub mod db; #[derive(clap::Parser, Debug, Clone)] pub struct CommonArgs { diff --git a/common/taler-test-utils/src/lib.rs b/common/taler-test-utils/src/lib.rs @@ -14,23 +14,45 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - OnceLock, +use std::{ + path::Path, + str::FromStr, + sync::{ + atomic::{AtomicUsize, Ordering}, + OnceLock, + }, }; -use sqlx::{postgres::PgPoolOptions, PgPool}; +use sqlx::{ + postgres::{PgConnectOptions, PgPoolOptions}, + PgPool, +}; pub use axum_test; +use taler_common::db::{dbinit, pool}; use tracing::Level; use tracing_subscriber::{util::SubscriberInitExt, FmtSubscriber}; pub mod helpers; pub mod json; pub mod routine; -pub async fn db_test_setup() -> PgPool { +pub async fn db_test_setup(prefix: &str) -> PgPool { + let schema = prefix.replace("-", "_"); setup_tracing(); - db_pool().await + let cfg = test_db().await; + let pool = pool(cfg, &schema).await.unwrap(); + let mut conn = pool.acquire().await.unwrap(); + let path: &Path = env!("CARGO_MANIFEST_DIR").as_ref(); + + dbinit( + &mut conn, + &path.join("../../database-versioning"), + prefix, + true, + ) + .await + .unwrap(); + pool } static MASTER_POOL: OnceLock<PgPool> = OnceLock::new(); @@ -38,7 +60,7 @@ static MASTER_POOL: OnceLock<PgPool> = OnceLock::new(); const DB: &str = "postgres:///taler_rust_check"; static NB_DB: AtomicUsize = AtomicUsize::new(0); -async fn db_pool() -> PgPool { +async fn test_db() -> PgConnectOptions { let master = MASTER_POOL.get_or_init(|| { PgPoolOptions::new() .max_connections(20) @@ -60,11 +82,7 @@ async fn db_pool() -> PgPool { .await .unwrap(); drop(conn); - - PgPoolOptions::new() - .test_before_acquire(false) - .connect_lazy(&format!("postgresql:/{name}")) - .expect("pg pool") + PgConnectOptions::from_str(&format!("postgresql:/{name}")).unwrap() } const TEST_TRACING_LEVEL: tracing::Level = Level::INFO; diff --git a/database-versioning/magnet-bank-0001.sql b/database-versioning/magnet-bank-0001.sql @@ -0,0 +1,111 @@ +-- +-- This file is part of TALER +-- Copyright (C) 2025 Taler Systems SA +-- +-- TALER is free software; you can redistribute it and/or modify it under the +-- terms of the GNU General Public License as published by the Free Software +-- Foundation; either version 3, or (at your option) any later version. +-- +-- TALER is distributed in the hope that it will be useful, but WITHOUT ANY +-- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +-- A PARTICULAR PURPOSE. See the GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License along with +-- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> + +SELECT _v.register_patch('magnet-bank-0001', NULL, NULL); + +CREATE SCHEMA magnet_bank; +SET search_path TO magnet_bank; + +CREATE TYPE taler_amount AS (val INT8, frac INT4); +COMMENT ON TYPE taler_amount IS 'Stores an amount, fraction is in units of 1/100000000 of the base value'; + +CREATE TABLE tx_in( + tx_in_id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + magnet_code INT8 UNIQUE, + amount taler_amount NOT NULL, + subject TEXT NOT NULL, + debit_account TEXT NOT NULL, + debit_name TEXT NOT NULL, + created INT8 NOT NULL +); +COMMENT ON TABLE tx_in IS 'Incoming transactions'; + +CREATE TABLE tx_out( + tx_out_id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + magnet_code INT8 UNIQUE, + amount taler_amount NOT NULL, + subject TEXT NOT NULL, + credit_account TEXT NOT NULL, + credit_name TEXT NOT NULL, + created INT8 NOT NULL +); +COMMENT ON TABLE tx_in IS 'Outgoing transactions'; + +CREATE TYPE incoming_type AS ENUM + ('reserve' ,'kyc', 'wad'); +COMMENT ON TYPE incoming_type IS 'Types of incoming talerable transactions'; + +CREATE TABLE taler_in( + tx_in_id INT8 PRIMARY KEY REFERENCES tx_in(tx_in_id) ON DELETE CASCADE, + type incoming_type NOT NULL, + metadata BYTEA NOT NULL, + origin_exchange_url TEXT, + CONSTRAINT polymorphism CHECK( + CASE type + WHEN 'wad' THEN LENGTH(metadata)=24 AND origin_exchange_url IS NOT NULL + ELSE LENGTH(metadata)=32 AND origin_exchange_url IS NULL + END + ) +); +COMMENT ON TABLE tx_in IS 'Incoming talerable transactions'; + +CREATE UNIQUE INDEX taler_in_unique_reserve_pub ON taler_in (metadata) WHERE type = 'reserve'; + +CREATE TABLE taler_out( + tx_out_id INT8 PRIMARY KEY REFERENCES tx_out(tx_out_id) ON DELETE CASCADE, + wtid BYTEA NOT NULL UNIQUE CHECK (LENGTH(wtid)=32), + exchange_base_url TEXT NOT NULL +); +COMMENT ON TABLE tx_in IS 'Outgoing talerable transactions'; + +CREATE TYPE transfer_status AS ENUM( + 'pending', + 'transient_failure', + 'permanent_failure', + 'success', + 'late_failure' +); +COMMENT ON TYPE transfer_status IS 'Status of an initiated outgoing transaction'; + +CREATE TABLE initiated( + initiated_id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + amount taler_amount NOT NULL, + subject TEXT NOT NULL, + credit_account TEXT NOT NULL, + credit_name TEXT NOT NULL, + status transfer_status NOT NULL DEFAULT 'pending', + status_msg TEXT, + magnet_code INT8 UNIQUE, + last_submitted INT8, + submission_counter INT2 NOT NULL DEFAULT 0, + tx_out_id INT8 UNIQUE REFERENCES tx_out(tx_out_id) ON DELETE CASCADE, + created INT8 NOT NULL +); +COMMENT ON TABLE tx_in IS 'Initiated outgoing transactions'; + +CREATE TABLE transfer( + initiated_id INT8 PRIMARY KEY REFERENCES initiated(initiated_id) ON DELETE CASCADE, + request_uid BYTEA UNIQUE NOT NULL CHECK (LENGTH(request_uid)=64), + wtid BYTEA UNIQUE NOT NULL CHECK (LENGTH(wtid)=32), + exchange_base_url TEXT NOT NULL +); +COMMENT ON TABLE transfer IS 'Wire Gateway transfers'; + +CREATE TABLE bounced( + tx_in_id INT8 NOT NULL UNIQUE REFERENCES tx_in(tx_in_id) ON DELETE CASCADE, + initiated_id INT8 NOT NULL UNIQUE REFERENCES initiated(initiated_id) ON DELETE CASCADE, + reason TEXT NOT NULL +); +COMMENT ON TABLE tx_in IS 'Bounced transactions'; +\ No newline at end of file diff --git a/database-versioning/magnet-bank-drop.sql b/database-versioning/magnet-bank-drop.sql @@ -0,0 +1,29 @@ +-- +-- This file is part of TALER +-- Copyright (C) 2025 Taler Systems SA +-- +-- TALER is free software; you can redistribute it and/or modify it under the +-- terms of the GNU General Public License as published by the Free Software +-- Foundation; either version 3, or (at your option) any later version. +-- +-- TALER is distributed in the hope that it will be useful, but WITHOUT ANY +-- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +-- A PARTICULAR PURPOSE. See the GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License along with +-- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> + +DO +$do$ +DECLARE + patch text; +BEGIN + IF EXISTS(SELECT FROM information_schema.schemata WHERE schema_name='_v') THEN + FOR patch IN SELECT patch_name FROM _v.patches WHERE patch_name LIKE 'magnet_bank_%' LOOP + PERFORM _v.unregister_patch(patch); + END LOOP; + END IF; +END +$do$; + +DROP SCHEMA IF EXISTS magnet_bank CASCADE; +\ No newline at end of file diff --git a/database-versioning/magnet-bank-procedures.sql b/database-versioning/magnet-bank-procedures.sql @@ -0,0 +1,324 @@ +-- +-- This file is part of TALER +-- Copyright (C) 2025 Taler Systems SA +-- +-- TALER is free software; you can redistribute it and/or modify it under the +-- terms of the GNU General Public License as published by the Free Software +-- Foundation; either version 3, or (at your option) any later version. +-- +-- TALER is distributed in the hope that it will be useful, but WITHOUT ANY +-- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +-- A PARTICULAR PURPOSE. See the GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License along with +-- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> + +SET search_path TO magnet_bank; + +-- Remove all existing functions +DO +$do$ +DECLARE + _sql text; +BEGIN + SELECT INTO _sql + string_agg(format('DROP %s %s CASCADE;' + , CASE prokind + WHEN 'f' THEN 'FUNCTION' + WHEN 'p' THEN 'PROCEDURE' + END + , oid::regprocedure) + , E'\n') + FROM pg_proc + WHERE pronamespace = 'magnet_bank'::regnamespace; + + IF _sql IS NOT NULL THEN + EXECUTE _sql; + END IF; +END +$do$; + +CREATE FUNCTION register_tx_in( + IN in_code INT8, + IN in_amount taler_amount, + IN in_subject TEXT, + IN in_debit_account TEXT, + IN in_debit_name TEXT, + IN in_timestamp INT8, + IN in_type incoming_type, + IN in_metadata BYTEA, + -- Error status + OUT out_reserve_pub_reuse BOOLEAN, + -- Success return + OUT out_tx_row_id INT8, + OUT out_timestamp INT8, + OUT out_new BOOLEAN +) +LANGUAGE plpgsql AS $$ +BEGIN +-- Check for idempotence +SELECT tx_in_id, created +INTO out_tx_row_id, out_timestamp +FROM tx_in +WHERE (in_code IS NOT NULL AND magnet_code = in_code) -- Magnet transaction + OR (in_code IS NULL AND amount = in_amount AND debit_account = in_debit_account AND subject = in_subject); -- Admin transaction +out_new = NOT found; +IF NOT out_new THEN + out_reserve_pub_reuse=false; + RETURN; +END IF; + +-- Check conflict +SELECT in_type = 'reserve'::incoming_type AND EXISTS(SELECT FROM taler_in WHERE metadata = in_metadata AND type = 'reserve') + INTO out_reserve_pub_reuse; +IF out_reserve_pub_reuse THEN + RETURN; +END IF; + +-- Insert new incoming transaction +INSERT INTO tx_in ( + magnet_code, + amount, + subject, + debit_account, + debit_name, + created +) VALUES ( + in_code, + in_amount, + in_subject, + in_debit_account, + in_debit_name, + in_timestamp +) +RETURNING tx_in_id, created +INTO out_tx_row_id, out_timestamp; +-- Notify new incoming transaction registration +PERFORM pg_notify('tx_in', out_tx_row_id || ''); +IF in_type IS NOT NULL THEN + -- Insert new incoming talerable transaction + INSERT INTO taler_in ( + tx_in_id, + type, + metadata + ) VALUES ( + out_tx_row_id, + in_type, + in_metadata + ); + -- Notify new incoming talerable transaction registration + PERFORM pg_notify('taler_in', out_tx_row_id || ''); +END IF; +END $$; +COMMENT ON FUNCTION register_tx_in IS 'Register an incoming transaction idempotently'; + +CREATE FUNCTION register_tx_out( + IN in_code INT8, + IN in_amount taler_amount, + IN in_subject TEXT, + IN in_credit_account TEXT, + IN in_credit_name TEXT, + IN in_timestamp INT8, + IN in_wtid BYTEA, + IN in_origin_exchange_url TEXT, + -- Success return + OUT out_tx_row_id INT8, + OUT out_timestamp INT8, + OUT out_new BOOLEAN +) +LANGUAGE plpgsql AS $$ +BEGIN +-- Check for idempotence +SELECT tx_out_id, created +INTO out_tx_row_id, out_timestamp +FROM tx_out WHERE magnet_code = in_code; + +out_new = NOT found; +IF out_new THEN + -- Insert new outgoing transaction + INSERT INTO tx_out ( + magnet_code, + amount, + subject, + credit_account, + credit_name, + created + ) VALUES ( + in_code, + in_amount, + in_subject, + in_credit_account, + in_credit_name, + in_timestamp + ) + RETURNING tx_out_id, created + INTO out_tx_row_id, out_timestamp; + -- Notify new outgoing transaction registration + PERFORM pg_notify('tx_out', out_tx_row_id || ''); + + IF in_wtid IS NOT NULL THEN + -- Insert new outgoing talerable transaction + INSERT INTO taler_out ( + tx_out_id, + wtid, + exchange_base_url + ) VALUES ( + out_tx_row_id, + in_wtid, + in_origin_exchange_url + ); + -- Notify new outgoing talerable transaction registration + PERFORM pg_notify('taler_out', out_tx_row_id || ''); + END IF; +END IF; +END $$; +COMMENT ON FUNCTION register_tx_out IS 'Register an outgoing transaction idempotently'; + +CREATE FUNCTION taler_transfer( + IN in_request_uid BYTEA, + IN in_wtid BYTEA, + IN in_subject TEXT, + IN in_amount taler_amount, + IN in_exchange_base_url TEXT, + IN in_credit_account TEXT, + IN in_credit_name TEXT, + IN in_timestamp INT8, + -- Error return + OUT out_request_uid_reuse BOOLEAN, + OUT out_wtid_reuse BOOLEAN, + -- Success return + OUT out_tx_row_id INT8, + OUT out_timestamp INT8 +) +LANGUAGE plpgsql AS $$ +BEGIN +-- Check for idempotence and conflict +SELECT (amount != in_amount + OR credit_account != in_credit_account + OR exchange_base_url != in_exchange_base_url + OR wtid != in_wtid) + ,transfer.initiated_id, created +INTO out_request_uid_reuse, out_tx_row_id, out_timestamp +FROM transfer JOIN initiated USING (initiated_id) +WHERE request_uid = in_request_uid; +IF FOUND THEN + out_wtid_reuse=FALSE; + RETURN; +END IF; +out_request_uid_reuse=FALSE; +-- Check for wtid reuse +out_wtid_reuse = EXISTS(SELECT FROM transfer WHERE wtid=in_wtid); +IF out_wtid_reuse THEN + RETURN; +END IF; +-- Insert an initiated outgoing transaction +INSERT INTO initiated ( + amount, + subject, + credit_account, + credit_name, + created +) VALUES ( + in_amount, + in_subject, + in_credit_account, + in_credit_name, + in_timestamp +) RETURNING initiated_id, created +INTO out_tx_row_id, out_timestamp; +-- Insert a transfer operation +INSERT INTO transfer ( + initiated_id, + request_uid, + wtid, + exchange_base_url +) VALUES ( + out_tx_row_id, + in_request_uid, + in_wtid, + in_exchange_base_url +); +PERFORM pg_notify('transfer', out_tx_row_id || ''); +END $$; + +CREATE FUNCTION initiated_status_update( + IN in_initiated_id INT8, + IN in_status transfer_status, + IN in_status_msg TEXT +) +RETURNS void +LANGUAGE plpgsql AS $$ +DECLARE +current_status transfer_status; +BEGIN + -- Check current status + SELECT status INTO current_status FROM initiated + WHERE initiated_id = in_initiated_id; + IF FOUND THEN + -- Update unsettled transaction status + IF current_status = 'success' AND in_status = 'permanent_failure' THEN + UPDATE initiated + SET status = 'late_failure', status_msg = in_status_msg + WHERE initiated_id = in_initiated_id; + ELSIF current_status NOT IN ('success', 'permanent_failure', 'late_failure') THEN + UPDATE initiated + SET status = in_status, status_msg = in_status_msg + WHERE initiated_id = in_initiated_id; + END IF; + END IF; +END $$; + +CREATE FUNCTION bounce( + IN in_tx_in_id INT8, + IN in_amount taler_amount, + IN in_reason TEXT, + IN in_timestamp INT8, + OUT out_tx_row_id INT8, + OUT out_timestamp INT8 +) +LANGUAGE plpgsql AS $$ +DECLARE +local_debit_account TEXT; +local_debit_name TEXT; +local_magnet_code INT8; +BEGIN +-- Check if already bounce +SELECT initiated_id, created + INTO out_tx_row_id, out_timestamp + FROM bounced JOIN initiated USING (initiated_id) + WHERE tx_in_id = in_tx_in_id; + +-- Else initiate the bounce transaction +IF NOT FOUND THEN + -- Get incoming transaction bank ID and creditor + SELECT debit_account, debit_name, magnet_code + INTO local_debit_account, local_debit_name, local_magnet_code + FROM tx_in + WHERE tx_in_id = in_tx_in_id; + -- Initiate the bounce transaction + INSERT INTO initiated ( + amount, + subject, + credit_account, + credit_name, + created + ) VALUES ( + in_amount, + 'bounce: ' || local_magnet_code, + local_debit_account, + local_debit_name, + in_timestamp + ) + RETURNING initiated_id, created INTO out_tx_row_id, out_timestamp; + -- Register the bounce + INSERT INTO bounced ( + tx_in_id, + initiated_id, + reason + ) VALUES ( + in_tx_in_id, + out_tx_row_id, + in_reason + ); +END IF; +END$$; +\ No newline at end of file diff --git a/database-versioning/taler-api-0001.sql b/database-versioning/taler-api-0001.sql @@ -0,0 +1,65 @@ +-- +-- This file is part of TALER +-- Copyright (C) 2024-2025 Taler Systems SA +-- +-- TALER is free software; you can redistribute it and/or modify it under the +-- terms of the GNU General Public License as published by the Free Software +-- Foundation; either version 3, or (at your option) any later version. +-- +-- TALER is distributed in the hope that it will be useful, but WITHOUT ANY +-- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +-- A PARTICULAR PURPOSE. See the GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License along with +-- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> + +SELECT _v.register_patch('taler-api-0001', NULL, NULL); + +CREATE SCHEMA taler_api; +SET search_path TO taler_api; + +CREATE TYPE taler_amount + AS (val INT8, frac INT4); +COMMENT ON TYPE taler_amount + IS 'Stores an amount, fraction is in units of 1/100000000 of the base value'; + +CREATE TYPE transfer_status AS ENUM ( + 'pending' + ,'transient_failure' + ,'permanent_failure' + ,'success' +); + +CREATE TABLE transfers ( + transfer_id INT8 PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY + ,amount taler_amount NOT NULL + ,exchange_base_url TEXT NOT NULL + ,subject TEXT NOT NULL + ,credit_payto TEXT NOT NULL + ,request_uid BYTEA UNIQUE NOT NULL CHECK (LENGTH(request_uid)=64) + ,wtid BYTEA UNIQUE NOT NULL CHECK (LENGTH(wtid)=32) + ,status transfer_status NOT NULL + ,status_msg TEXT + ,transfer_time INT8 NOT NULL +); + +CREATE TYPE incoming_type AS ENUM + ('reserve' ,'kyc', 'wad'); +CREATE TABLE incoming_transactions ( + incoming_transaction_id INT8 PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY + ,amount taler_amount NOT NULL + ,subject TEXT NOT NULL + ,debit_payto TEXT NOT NULL + ,creation_time INT8 NOT NULL + ,type incoming_type NOT NULL + ,metadata BYTEA NOT NULL + ,origin_exchange_url TEXT + ,CONSTRAINT polymorphism CHECK( + CASE type + WHEN 'wad' THEN LENGTH(metadata)=24 AND origin_exchange_url IS NOT NULL + ELSE LENGTH(metadata)=32 AND origin_exchange_url IS NULL + END + ) +); + +CREATE UNIQUE INDEX incoming_transactions_unique_reserve_pub ON incoming_transactions (metadata) WHERE type = 'reserve'; +\ No newline at end of file diff --git a/database-versioning/taler-api-drop.sql b/database-versioning/taler-api-drop.sql @@ -0,0 +1,29 @@ +-- +-- This file is part of TALER +-- Copyright (C) 2025 Taler Systems SA +-- +-- TALER is free software; you can redistribute it and/or modify it under the +-- terms of the GNU General Public License as published by the Free Software +-- Foundation; either version 3, or (at your option) any later version. +-- +-- TALER is distributed in the hope that it will be useful, but WITHOUT ANY +-- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +-- A PARTICULAR PURPOSE. See the GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License along with +-- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> + +DO +$do$ +DECLARE + patch text; +BEGIN + IF EXISTS(SELECT FROM information_schema.schemata WHERE schema_name='_v') THEN + FOR patch IN SELECT patch_name FROM _v.patches WHERE patch_name LIKE 'taler_api_%' LOOP + PERFORM _v.unregister_patch(patch); + END LOOP; + END IF; +END +$do$; + +DROP SCHEMA IF EXISTS taler_api CASCADE; +\ No newline at end of file diff --git a/database-versioning/taler-api-procedures.sql b/database-versioning/taler-api-procedures.sql @@ -0,0 +1,139 @@ +-- +-- This file is part of TALER +-- Copyright (C) 2025 Taler Systems SA +-- +-- TALER is free software; you can redistribute it and/or modify it under the +-- terms of the GNU General Public License as published by the Free Software +-- Foundation; either version 3, or (at your option) any later version. +-- +-- TALER is distributed in the hope that it will be useful, but WITHOUT ANY +-- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +-- A PARTICULAR PURPOSE. See the GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License along with +-- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> + +SET search_path TO taler_api; + +-- Remove all existing functions +DO +$do$ +DECLARE + _sql text; +BEGIN + SELECT INTO _sql + string_agg(format('DROP %s %s CASCADE;' + , CASE prokind + WHEN 'f' THEN 'FUNCTION' + WHEN 'p' THEN 'PROCEDURE' + END + , oid::regprocedure) + , E'\n') + FROM pg_proc + WHERE pronamespace = 'taler_api'::regnamespace; + + IF _sql IS NOT NULL THEN + EXECUTE _sql; + END IF; +END +$do$; + +CREATE FUNCTION taler_transfer( + IN in_amount taler_amount, + IN in_exchange_base_url TEXT, + IN in_subject TEXT, + IN in_credit_payto TEXT, + IN in_request_uid BYTEA, + IN in_wtid BYTEA, + IN in_timestamp INT8, + -- Error status + OUT out_request_uid_reuse BOOLEAN, + -- Success return + OUT out_tx_row_id INT8, + OUT out_timestamp INT8 +) +LANGUAGE plpgsql AS $$ +BEGIN +-- Check for idempotence and conflict +SELECT (amount != in_amount + OR credit_payto != in_credit_payto + OR exchange_base_url != in_exchange_base_url + OR wtid != in_wtid) + ,transfer_id, transfer_time + INTO out_request_uid_reuse, out_tx_row_id, out_timestamp + FROM transfers + WHERE request_uid = in_request_uid; +IF found THEN + RETURN; +END IF; +out_request_uid_reuse=false; +out_timestamp=in_timestamp; +-- Register exchange +INSERT INTO transfers ( + amount, + exchange_base_url, + subject, + credit_payto, + request_uid, + wtid, + status, + status_msg, + transfer_time +) VALUES ( + in_amount, + in_exchange_base_url, + in_subject, + in_credit_payto, + in_request_uid, + in_wtid, + 'success', + NULL, + in_timestamp +) RETURNING transfer_id INTO out_tx_row_id; +-- Notify new transaction +PERFORM pg_notify('outgoing_tx', out_tx_row_id || ''); +END $$; +COMMENT ON FUNCTION taler_transfer IS 'Create an outgoing taler transaction and register it'; + +CREATE FUNCTION add_incoming( + IN in_key BYTEA, + IN in_subject TEXT, + IN in_amount taler_amount, + IN in_debit_payto TEXT, + IN in_timestamp INT8, + IN in_type incoming_type, + -- Error status + OUT out_reserve_pub_reuse BOOLEAN, + -- Success return + OUT out_tx_row_id INT8 +) +LANGUAGE plpgsql AS $$ +BEGIN +-- Check conflict +SELECT in_type = 'reserve'::incoming_type AND EXISTS(SELECT FROM incoming_transactions WHERE metadata = in_key AND type = 'reserve') + INTO out_reserve_pub_reuse; +IF out_reserve_pub_reuse THEN + RETURN; +END IF; +-- Register incoming transaction +INSERT INTO incoming_transactions ( + amount, + debit_payto, + creation_time, + subject, + type, + metadata, + origin_exchange_url +) VALUES ( + in_amount, + in_debit_payto, + in_timestamp, + in_subject, + in_type, + in_key, + NULL +) RETURNING incoming_transaction_id INTO out_tx_row_id; +-- Notify new incoming transaction +PERFORM pg_notify('incoming_tx', out_tx_row_id || ''); +END $$; +COMMENT ON FUNCTION add_incoming IS 'Create an incoming taler transaction and register it'; +\ No newline at end of file diff --git a/database-versioning/versioning.sql b/database-versioning/versioning.sql @@ -0,0 +1,294 @@ +-- LICENSE AND COPYRIGHT +-- +-- Copyright (C) 2010 Hubert depesz Lubaczewski +-- +-- This program is distributed under the (Revised) BSD License: +-- L<http://www.opensource.org/licenses/bsd-license.php> +-- +-- Redistribution and use in source and binary forms, with or without +-- modification, are permitted provided that the following conditions +-- are met: +-- +-- * Redistributions of source code must retain the above copyright +-- notice, this list of conditions and the following disclaimer. +-- +-- * Redistributions in binary form must reproduce the above copyright +-- notice, this list of conditions and the following disclaimer in the +-- documentation and/or other materials provided with the distribution. +-- +-- * Neither the name of Hubert depesz Lubaczewski's Organization +-- nor the names of its contributors may be used to endorse or +-- promote products derived from this software without specific +-- prior written permission. +-- +-- THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +-- AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +-- IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +-- DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE +-- FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +-- DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +-- SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +-- CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +-- OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +-- OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. +-- +-- Code origin: https://gitlab.com/depesz/Versioning/blob/master/install.versioning.sql +-- +-- +-- # NAME +-- +-- **Versioning** - simplistic take on tracking and applying changes to databases. +-- +-- # DESCRIPTION +-- +-- This project strives to provide simple way to manage changes to +-- database. +-- +-- Instead of making changes on development server, then finding +-- differences between production and development, deciding which ones +-- should be installed on production, and finding a way to install them - +-- you start with writing diffs themselves! +-- +-- # INSTALLATION +-- +-- To install versioning simply run install.versioning.sql in your database +-- (all of them: production, stage, test, devel, ...). +-- +-- # USAGE +-- +-- In your files with patches to database, put whole logic in single +-- transaction, and use \_v.\* functions - usually \_v.register_patch() at +-- least to make sure everything is OK. +-- +-- For example. Let's assume you have patch files: +-- +-- ## 0001.sql: +-- +-- ``` +-- create table users (id serial primary key, username text); +-- ``` +-- +-- ## 0002.sql: +-- +-- ``` +-- insert into users (username) values ('depesz'); +-- ``` +-- To change it to use versioning you would change the files, to this +-- state: +-- +-- 0000.sql: +-- +-- ``` +-- BEGIN; +-- select _v.register_patch('000-base', NULL, NULL); +-- create table users (id serial primary key, username text); +-- COMMIT; +-- ``` +-- +-- ## 0002.sql: +-- +-- ``` +-- BEGIN; +-- select _v.register_patch('001-users', ARRAY['000-base'], NULL); +-- insert into users (username) values ('depesz'); +-- COMMIT; +-- ``` +-- +-- This will make sure that patch 001-users can only be applied after +-- 000-base. +-- +-- # AVAILABLE FUNCTIONS +-- +-- ## \_v.register_patch( TEXT ) +-- +-- Registers named patch, or dies if it is already registered. +-- +-- Returns integer which is id of patch in \_v.patches table - only if it +-- succeeded. +-- +-- ## \_v.register_patch( TEXT, TEXT[] ) +-- +-- Same as \_v.register_patch( TEXT ), but checks is all given patches (given as +-- array in second argument) are already registered. +-- +-- ## \_v.register_patch( TEXT, TEXT[], TEXT[] ) +-- +-- Same as \_v.register_patch( TEXT, TEXT[] ), but also checks if there are no conflicts with preexisting patches. +-- +-- Third argument is array of names of patches that conflict with current one. So +-- if any of them is installed - register_patch will error out. +-- +-- ## \_v.unregister_patch( TEXT ) +-- +-- Removes information about given patch from the versioning data. +-- +-- It doesn't remove objects that were created by this patch - just removes +-- metainformation. +-- +-- ## \_v.assert_user_is_superuser() +-- +-- Make sure that current patch is being loaded by superuser. +-- +-- If it's not - it will raise exception, and break transaction. +-- +-- ## \_v.assert_user_is_not_superuser() +-- +-- Make sure that current patch is not being loaded by superuser. +-- +-- If it is - it will raise exception, and break transaction. +-- +-- ## \_v.assert_user_is_one_of(TEXT, TEXT, ... ) +-- +-- Make sure that current patch is being loaded by one of listed users. +-- +-- If ```current_user``` is not listed as one of arguments - function will raise +-- exception and break the transaction. + +BEGIN; + + +-- This file adds versioning support to database it will be loaded to. +-- It requires that PL/pgSQL is already loaded - will raise exception otherwise. +-- All versioning "stuff" (tables, functions) is in "_v" schema. + +-- All functions are defined as 'RETURNS SETOF INT4' to be able to make them to RETURN literally nothing (0 rows). +-- >> RETURNS VOID<< IS similar, but it still outputs "empty line" in psql when calling +CREATE SCHEMA IF NOT EXISTS _v; +COMMENT ON SCHEMA _v IS 'Schema for versioning data and functionality.'; + +CREATE TABLE IF NOT EXISTS _v.patches ( + patch_name TEXT PRIMARY KEY, + applied_tsz TIMESTAMPTZ NOT NULL DEFAULT now(), + applied_by TEXT NOT NULL, + requires TEXT[], + conflicts TEXT[] +); +COMMENT ON TABLE _v.patches IS 'Contains information about what patches are currently applied on database.'; +COMMENT ON COLUMN _v.patches.patch_name IS 'Name of patch, has to be unique for every patch.'; +COMMENT ON COLUMN _v.patches.applied_tsz IS 'When the patch was applied.'; +COMMENT ON COLUMN _v.patches.applied_by IS 'Who applied this patch (PostgreSQL username)'; +COMMENT ON COLUMN _v.patches.requires IS 'List of patches that are required for given patch.'; +COMMENT ON COLUMN _v.patches.conflicts IS 'List of patches that conflict with given patch.'; + +CREATE OR REPLACE FUNCTION _v.register_patch( IN in_patch_name TEXT, IN in_requirements TEXT[], in_conflicts TEXT[], OUT versioning INT4 ) RETURNS setof INT4 AS $$ +DECLARE + t_text TEXT; + t_text_a TEXT[]; + i INT4; +BEGIN + -- Thanks to this we know only one patch will be applied at a time + LOCK TABLE _v.patches IN EXCLUSIVE MODE; + + SELECT patch_name INTO t_text FROM _v.patches WHERE patch_name = in_patch_name; + IF FOUND THEN + RAISE EXCEPTION 'Patch % is already applied!', in_patch_name; + END IF; + + t_text_a := ARRAY( SELECT patch_name FROM _v.patches WHERE patch_name = any( in_conflicts ) ); + IF array_upper( t_text_a, 1 ) IS NOT NULL THEN + RAISE EXCEPTION 'Versioning patches conflict. Conflicting patche(s) installed: %.', array_to_string( t_text_a, ', ' ); + END IF; + + IF array_upper( in_requirements, 1 ) IS NOT NULL THEN + t_text_a := '{}'; + FOR i IN array_lower( in_requirements, 1 ) .. array_upper( in_requirements, 1 ) LOOP + SELECT patch_name INTO t_text FROM _v.patches WHERE patch_name = in_requirements[i]; + IF NOT FOUND THEN + t_text_a := t_text_a || in_requirements[i]; + END IF; + END LOOP; + IF array_upper( t_text_a, 1 ) IS NOT NULL THEN + RAISE EXCEPTION 'Missing prerequisite(s): %.', array_to_string( t_text_a, ', ' ); + END IF; + END IF; + + INSERT INTO _v.patches (patch_name, applied_tsz, applied_by, requires, conflicts ) VALUES ( in_patch_name, now(), current_user, coalesce( in_requirements, '{}' ), coalesce( in_conflicts, '{}' ) ); + RETURN; +END; +$$ language plpgsql; +COMMENT ON FUNCTION _v.register_patch( TEXT, TEXT[], TEXT[] ) IS 'Function to register patches in database. Raises exception if there are conflicts, prerequisites are not installed or the migration has already been installed.'; + +CREATE OR REPLACE FUNCTION _v.register_patch( TEXT, TEXT[] ) RETURNS setof INT4 AS $$ + SELECT _v.register_patch( $1, $2, NULL ); +$$ language sql; +COMMENT ON FUNCTION _v.register_patch( TEXT, TEXT[] ) IS 'Wrapper to allow registration of patches without conflicts.'; +CREATE OR REPLACE FUNCTION _v.register_patch( TEXT ) RETURNS setof INT4 AS $$ + SELECT _v.register_patch( $1, NULL, NULL ); +$$ language sql; +COMMENT ON FUNCTION _v.register_patch( TEXT ) IS 'Wrapper to allow registration of patches without requirements and conflicts.'; + +CREATE OR REPLACE FUNCTION _v.unregister_patch( IN in_patch_name TEXT, OUT versioning INT4 ) RETURNS setof INT4 AS $$ +DECLARE + i INT4; + t_text_a TEXT[]; +BEGIN + -- Thanks to this we know only one patch will be applied at a time + LOCK TABLE _v.patches IN EXCLUSIVE MODE; + + t_text_a := ARRAY( SELECT patch_name FROM _v.patches WHERE in_patch_name = ANY( requires ) ); + IF array_upper( t_text_a, 1 ) IS NOT NULL THEN + RAISE EXCEPTION 'Cannot uninstall %, as it is required by: %.', in_patch_name, array_to_string( t_text_a, ', ' ); + END IF; + + DELETE FROM _v.patches WHERE patch_name = in_patch_name; + GET DIAGNOSTICS i = ROW_COUNT; + IF i < 1 THEN + RAISE EXCEPTION 'Patch % is not installed, so it can''t be uninstalled!', in_patch_name; + END IF; + + RETURN; +END; +$$ language plpgsql; +COMMENT ON FUNCTION _v.unregister_patch( TEXT ) IS 'Function to unregister patches in database. Dies if the patch is not registered, or if unregistering it would break dependencies.'; + +CREATE OR REPLACE FUNCTION _v.assert_patch_is_applied( IN in_patch_name TEXT ) RETURNS TEXT as $$ +DECLARE + t_text TEXT; +BEGIN + SELECT patch_name INTO t_text FROM _v.patches WHERE patch_name = in_patch_name; + IF NOT FOUND THEN + RAISE EXCEPTION 'Patch % is not applied!', in_patch_name; + END IF; + RETURN format('Patch %s is applied.', in_patch_name); +END; +$$ language plpgsql; +COMMENT ON FUNCTION _v.assert_patch_is_applied( TEXT ) IS 'Function that can be used to make sure that patch has been applied.'; + +CREATE OR REPLACE FUNCTION _v.assert_user_is_superuser() RETURNS TEXT as $$ +DECLARE + v_super bool; +BEGIN + SELECT usesuper INTO v_super FROM pg_user WHERE usename = current_user; + IF v_super THEN + RETURN 'assert_user_is_superuser: OK'; + END IF; + RAISE EXCEPTION 'Current user is not superuser - cannot continue.'; +END; +$$ language plpgsql; +COMMENT ON FUNCTION _v.assert_user_is_superuser() IS 'Function that can be used to make sure that patch is being applied using superuser account.'; + +CREATE OR REPLACE FUNCTION _v.assert_user_is_not_superuser() RETURNS TEXT as $$ +DECLARE + v_super bool; +BEGIN + SELECT usesuper INTO v_super FROM pg_user WHERE usename = current_user; + IF v_super THEN + RAISE EXCEPTION 'Current user is superuser - cannot continue.'; + END IF; + RETURN 'assert_user_is_not_superuser: OK'; +END; +$$ language plpgsql; +COMMENT ON FUNCTION _v.assert_user_is_not_superuser() IS 'Function that can be used to make sure that patch is being applied using normal (not superuser) account.'; + +CREATE OR REPLACE FUNCTION _v.assert_user_is_one_of(VARIADIC p_acceptable_users TEXT[] ) RETURNS TEXT as $$ +DECLARE +BEGIN + IF current_user = any( p_acceptable_users ) THEN + RETURN 'assert_user_is_one_of: OK'; + END IF; + RAISE EXCEPTION 'User is not one of: % - cannot continue.', p_acceptable_users; +END; +$$ language plpgsql; +COMMENT ON FUNCTION _v.assert_user_is_one_of(TEXT[]) IS 'Function that can be used to make sure that patch is being applied by one of defined users.'; + +COMMIT; diff --git a/taler-magnet-bank/Cargo.toml b/taler-magnet-bank/Cargo.toml @@ -53,11 +53,38 @@ systemd-units = [ recommends = ["apache2 | nginx | httpd", "postgresql (>= 15.0)"] assets = [ # Binary - ["target/release/taler-magnet-bank", "/usr/bin/", "755"], + [ + "target/release/taler-magnet-bank", + "/usr/bin/", + "755", + ], # Scripts - ["../contrib/taler-magnet-bank-dbconfig", "/usr/bin/", "755"], + [ + "../contrib/taler-magnet-bank-dbconfig", + "/usr/bin/", + "755", + ], + # Sql + [ + "../database-versioning/versioning.sql", + "/usr/share/taler-magnet-bank/sql/", + "644", + ], + [ + "../database-versioning/magnet-bank*.sql", + "/usr/share/taler-magnet-bank/sql/", + "644", + ], # Default config - ["magnet-bank.conf", "/usr/share/taler-magnet-bank/config.d/", "644"], + [ + "magnet-bank.conf", + "/usr/share/taler-magnet-bank/config.d/", + "644", + ], # Configs - ["../debian/etc/**/*", "/etc", "644"], + [ + "../debian/etc/**/*", + "/etc", + "644", + ], ] diff --git a/taler-magnet-bank/db/schema.sql b/taler-magnet-bank/db/schema.sql @@ -1,391 +0,0 @@ --- --- This file is part of TALER --- Copyright (C) 2025 Taler Systems SA --- --- TALER is free software; you can redistribute it and/or modify it under the --- terms of the GNU General Public License as published by the Free Software --- Foundation; either version 3, or (at your option) any later version. --- --- TALER is distributed in the hope that it will be useful, but WITHOUT ANY --- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR --- A PARTICULAR PURPOSE. See the GNU General Public License for more details. --- --- You should have received a copy of the GNU General Public License along with --- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> - -CREATE TYPE taler_amount AS (val INT8, frac INT4); -COMMENT ON TYPE taler_amount IS 'Stores an amount, fraction is in units of 1/100000000 of the base value'; - -CREATE TABLE tx_in( - tx_in_id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, - magnet_code INT8 UNIQUE, - amount taler_amount NOT NULL, - subject TEXT NOT NULL, - debit_account TEXT NOT NULL, - debit_name TEXT NOT NULL, - created INT8 NOT NULL -); -COMMENT ON TABLE tx_in IS 'Incoming transactions'; - -CREATE TABLE tx_out( - tx_out_id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, - magnet_code INT8 UNIQUE, - amount taler_amount NOT NULL, - subject TEXT NOT NULL, - credit_account TEXT NOT NULL, - credit_name TEXT NOT NULL, - created INT8 NOT NULL -); -COMMENT ON TABLE tx_in IS 'Outgoing transactions'; - -CREATE TYPE incoming_type AS ENUM - ('reserve' ,'kyc', 'wad'); -COMMENT ON TYPE incoming_type IS 'Types of incoming talerable transactions'; - -CREATE TABLE taler_in( - tx_in_id INT8 PRIMARY KEY REFERENCES tx_in(tx_in_id) ON DELETE CASCADE, - type incoming_type NOT NULL, - metadata BYTEA NOT NULL, - origin_exchange_url TEXT, - CONSTRAINT polymorphism CHECK( - CASE type - WHEN 'wad' THEN LENGTH(metadata)=24 AND origin_exchange_url IS NOT NULL - ELSE LENGTH(metadata)=32 AND origin_exchange_url IS NULL - END - ) -); -COMMENT ON TABLE tx_in IS 'Incoming talerable transactions'; - -CREATE UNIQUE INDEX taler_in_unique_reserve_pub ON taler_in (metadata) WHERE type = 'reserve'; - -CREATE TABLE taler_out( - tx_out_id INT8 PRIMARY KEY REFERENCES tx_out(tx_out_id) ON DELETE CASCADE, - wtid BYTEA NOT NULL UNIQUE CHECK (LENGTH(wtid)=32), - exchange_base_url TEXT NOT NULL -); -COMMENT ON TABLE tx_in IS 'Outgoing talerable transactions'; - -CREATE TYPE transfer_status AS ENUM( - 'pending', - 'transient_failure', - 'permanent_failure', - 'success', - 'late_failure' -); -COMMENT ON TYPE transfer_status IS 'Status of an initiated outgoing transaction'; - -CREATE TABLE initiated( - initiated_id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, - amount taler_amount NOT NULL, - subject TEXT NOT NULL, - credit_account TEXT NOT NULL, - credit_name TEXT NOT NULL, - status transfer_status NOT NULL DEFAULT 'pending', - status_msg TEXT, - magnet_code INT8 UNIQUE, - last_submitted INT8, - submission_counter INT2 NOT NULL DEFAULT 0, - tx_out_id INT8 UNIQUE REFERENCES tx_out(tx_out_id) ON DELETE CASCADE, - created INT8 NOT NULL -); -COMMENT ON TABLE tx_in IS 'Initiated outgoing transactions'; - -CREATE TABLE transfer( - initiated_id INT8 PRIMARY KEY REFERENCES initiated(initiated_id) ON DELETE CASCADE, - request_uid BYTEA UNIQUE NOT NULL CHECK (LENGTH(request_uid)=64), - wtid BYTEA UNIQUE NOT NULL CHECK (LENGTH(wtid)=32), - exchange_base_url TEXT NOT NULL -); -COMMENT ON TABLE transfer IS 'Wire Gateway transfers'; - -CREATE TABLE bounced( - tx_in_id INT8 NOT NULL UNIQUE REFERENCES tx_in(tx_in_id) ON DELETE CASCADE, - initiated_id INT8 NOT NULL UNIQUE REFERENCES initiated(initiated_id) ON DELETE CASCADE, - reason TEXT NOT NULL -); -COMMENT ON TABLE tx_in IS 'Bounced transactions'; - -CREATE FUNCTION register_tx_in( - IN in_code INT8, - IN in_amount taler_amount, - IN in_subject TEXT, - IN in_debit_account TEXT, - IN in_debit_name TEXT, - IN in_timestamp INT8, - IN in_type incoming_type, - IN in_metadata BYTEA, - -- Error status - OUT out_reserve_pub_reuse BOOLEAN, - -- Success return - OUT out_tx_row_id INT8, - OUT out_timestamp INT8, - OUT out_new BOOLEAN -) -LANGUAGE plpgsql AS $$ -BEGIN --- Check for idempotence -SELECT tx_in_id, created -INTO out_tx_row_id, out_timestamp -FROM tx_in -WHERE (in_code IS NOT NULL AND magnet_code = in_code) -- Magnet transaction - OR (in_code IS NULL AND amount = in_amount AND debit_account = in_debit_account AND subject = in_subject); -- Admin transaction -out_new = NOT found; -IF NOT out_new THEN - out_reserve_pub_reuse=false; - RETURN; -END IF; - --- Check conflict -SELECT in_type = 'reserve'::incoming_type AND EXISTS(SELECT FROM taler_in WHERE metadata = in_metadata AND type = 'reserve') - INTO out_reserve_pub_reuse; -IF out_reserve_pub_reuse THEN - RETURN; -END IF; - --- Insert new incoming transaction -INSERT INTO tx_in ( - magnet_code, - amount, - subject, - debit_account, - debit_name, - created -) VALUES ( - in_code, - in_amount, - in_subject, - in_debit_account, - in_debit_name, - in_timestamp -) -RETURNING tx_in_id, created -INTO out_tx_row_id, out_timestamp; --- Notify new incoming transaction registration -PERFORM pg_notify('tx_in', out_tx_row_id || ''); -IF in_type IS NOT NULL THEN - -- Insert new incoming talerable transaction - INSERT INTO taler_in ( - tx_in_id, - type, - metadata - ) VALUES ( - out_tx_row_id, - in_type, - in_metadata - ); - -- Notify new incoming talerable transaction registration - PERFORM pg_notify('taler_in', out_tx_row_id || ''); -END IF; -END $$; -COMMENT ON FUNCTION register_tx_in IS 'Register an incoming transaction idempotently'; - -CREATE FUNCTION register_tx_out( - IN in_code INT8, - IN in_amount taler_amount, - IN in_subject TEXT, - IN in_credit_account TEXT, - IN in_credit_name TEXT, - IN in_timestamp INT8, - IN in_wtid BYTEA, - IN in_origin_exchange_url TEXT, - -- Success return - OUT out_tx_row_id INT8, - OUT out_timestamp INT8, - OUT out_new BOOLEAN -) -LANGUAGE plpgsql AS $$ -BEGIN --- Check for idempotence -SELECT tx_out_id, created -INTO out_tx_row_id, out_timestamp -FROM tx_out WHERE magnet_code = in_code; - -out_new = NOT found; -IF out_new THEN - -- Insert new outgoing transaction - INSERT INTO tx_out ( - magnet_code, - amount, - subject, - credit_account, - credit_name, - created - ) VALUES ( - in_code, - in_amount, - in_subject, - in_credit_account, - in_credit_name, - in_timestamp - ) - RETURNING tx_out_id, created - INTO out_tx_row_id, out_timestamp; - -- Notify new outgoing transaction registration - PERFORM pg_notify('tx_out', out_tx_row_id || ''); - - IF in_wtid IS NOT NULL THEN - -- Insert new outgoing talerable transaction - INSERT INTO taler_out ( - tx_out_id, - wtid, - exchange_base_url - ) VALUES ( - out_tx_row_id, - in_wtid, - in_origin_exchange_url - ); - -- Notify new outgoing talerable transaction registration - PERFORM pg_notify('taler_out', out_tx_row_id || ''); - END IF; -END IF; -END $$; -COMMENT ON FUNCTION register_tx_out IS 'Register an outgoing transaction idempotently'; - -CREATE FUNCTION taler_transfer( - IN in_request_uid BYTEA, - IN in_wtid BYTEA, - IN in_subject TEXT, - IN in_amount taler_amount, - IN in_exchange_base_url TEXT, - IN in_credit_account TEXT, - IN in_credit_name TEXT, - IN in_timestamp INT8, - -- Error return - OUT out_request_uid_reuse BOOLEAN, - OUT out_wtid_reuse BOOLEAN, - -- Success return - OUT out_tx_row_id INT8, - OUT out_timestamp INT8 -) -LANGUAGE plpgsql AS $$ -BEGIN --- Check for idempotence and conflict -SELECT (amount != in_amount - OR credit_account != in_credit_account - OR exchange_base_url != in_exchange_base_url - OR wtid != in_wtid) - ,transfer.initiated_id, created -INTO out_request_uid_reuse, out_tx_row_id, out_timestamp -FROM transfer JOIN initiated USING (initiated_id) -WHERE request_uid = in_request_uid; -IF FOUND THEN - out_wtid_reuse=FALSE; - RETURN; -END IF; -out_request_uid_reuse=FALSE; --- Check for wtid reuse -out_wtid_reuse = EXISTS(SELECT FROM transfer WHERE wtid=in_wtid); -IF out_wtid_reuse THEN - RETURN; -END IF; --- Insert an initiated outgoing transaction -INSERT INTO initiated ( - amount, - subject, - credit_account, - credit_name, - created -) VALUES ( - in_amount, - in_subject, - in_credit_account, - in_credit_name, - in_timestamp -) RETURNING initiated_id, created -INTO out_tx_row_id, out_timestamp; --- Insert a transfer operation -INSERT INTO transfer ( - initiated_id, - request_uid, - wtid, - exchange_base_url -) VALUES ( - out_tx_row_id, - in_request_uid, - in_wtid, - in_exchange_base_url -); -PERFORM pg_notify('transfer', out_tx_row_id || ''); -END $$; - -CREATE FUNCTION initiated_status_update( - IN in_initiated_id INT8, - IN in_status transfer_status, - IN in_status_msg TEXT -) -RETURNS void -LANGUAGE plpgsql AS $$ -DECLARE -current_status transfer_status; -BEGIN - -- Check current status - SELECT status INTO current_status FROM initiated - WHERE initiated_id = in_initiated_id; - IF FOUND THEN - -- Update unsettled transaction status - IF current_status = 'success' AND in_status = 'permanent_failure' THEN - UPDATE initiated - SET status = 'late_failure', status_msg = in_status_msg - WHERE initiated_id = in_initiated_id; - ELSIF current_status NOT IN ('success', 'permanent_failure', 'late_failure') THEN - UPDATE initiated - SET status = in_status, status_msg = in_status_msg - WHERE initiated_id = in_initiated_id; - END IF; - END IF; -END $$; - -CREATE FUNCTION bounce( - IN in_tx_in_id INT8, - IN in_amount taler_amount, - IN in_reason TEXT, - IN in_timestamp INT8, - OUT out_tx_row_id INT8, - OUT out_timestamp INT8 -) -LANGUAGE plpgsql AS $$ -DECLARE -local_debit_account TEXT; -local_debit_name TEXT; -local_magnet_code INT8; -BEGIN --- Check if already bounce -SELECT initiated_id, created - INTO out_tx_row_id, out_timestamp - FROM bounced JOIN initiated USING (initiated_id) - WHERE tx_in_id = in_tx_in_id; - --- Else initiate the bounce transaction -IF NOT FOUND THEN - -- Get incoming transaction bank ID and creditor - SELECT debit_account, debit_name, magnet_code - INTO local_debit_account, local_debit_name, local_magnet_code - FROM tx_in - WHERE tx_in_id = in_tx_in_id; - -- Initiate the bounce transaction - INSERT INTO initiated ( - amount, - subject, - credit_account, - credit_name, - created - ) VALUES ( - in_amount, - 'bounce: ' || local_magnet_code, - local_debit_account, - local_debit_name, - in_timestamp - ) - RETURNING initiated_id, created INTO out_tx_row_id, out_timestamp; - -- Register the bounce - INSERT INTO bounced ( - tx_in_id, - initiated_id, - reason - ) VALUES ( - in_tx_in_id, - out_tx_row_id, - in_reason - ); -END IF; -END$$; -\ No newline at end of file diff --git a/taler-magnet-bank/magnet-bank.conf b/taler-magnet-bank/magnet-bank.conf @@ -74,4 +74,8 @@ TOKEN = [magnet-bankdb-postgres] -CONFIG = postgres:///taler-magnet-bank -\ No newline at end of file +# DB connection string +CONFIG = postgres:///taler-magnet-bank + +# Where are the SQL files to setup our tables? +SQL_DIR = ${DATADIR}/sql/ +\ No newline at end of file diff --git a/taler-magnet-bank/src/config.rs b/taler-magnet-bank/src/config.rs @@ -26,6 +26,7 @@ use crate::magnet::Token; pub struct DbCfg { pub cfg: PgConnectOptions, + pub sql_dir: String, } impl DbCfg { @@ -33,6 +34,7 @@ impl DbCfg { let sect = cfg.section("magnet-bankdb-postgres"); Ok(Self { cfg: sect.postgres("CONFIG").require()?, + sql_dir: sect.path("SQL_DIR").require()?, }) } } diff --git a/taler-magnet-bank/src/db.rs b/taler-magnet-bank/src/db.rs @@ -145,21 +145,6 @@ impl Display for Initiated { } } -pub async fn db_init(db: &PgPool, reset: bool) -> sqlx::Result<()> { - let mut tx = db.begin().await?; - if reset { - sqlx::raw_sql("DROP SCHEMA public CASCADE;CREATE SCHEMA public;") - .execute(&mut *tx) - .await?; - } - // TODO migrations - sqlx::raw_sql(include_str!("../db/schema.sql")) - .execute(&mut *tx) - .await?; - tx.commit().await?; - Ok(()) -} - pub async fn register_tx_in_admin(db: &PgPool, tx: &TxInAdmin) -> sqlx::Result<AddIncomingResult> { sqlx::query( " @@ -645,9 +630,8 @@ mod test { } async fn setup() -> (PgConnection, PgPool) { - let pool = taler_test_utils::db_test_setup().await; - db::db_init(&pool, false).await.expect("dbinit"); - let conn = pool.acquire().await.expect("acquire conn").leak(); + let pool = taler_test_utils::db_test_setup("magnet-bank").await; + let conn = pool.acquire().await.unwrap().leak(); (conn, pool) } diff --git a/taler-magnet-bank/src/main.rs b/taler-magnet-bank/src/main.rs @@ -17,11 +17,11 @@ use std::sync::Arc; use clap::Parser; -use sqlx::PgPool; use taler_api::api::TalerApiBuilder; use taler_common::{ cli::ConfigCmd, config::{parser::ConfigSource, Config}, + db::{dbinit, pool}, taler_main, types::payto::{payto, PaytoURI}, CommonArgs, @@ -29,7 +29,6 @@ use taler_common::{ use taler_magnet_bank::{ adapter::MagnetApi, config::{DbCfg, ServeCfg, WorkerCfg}, - db, dev::{self, DevCmd}, keys, magnet::AuthClient, @@ -101,9 +100,10 @@ async fn app(args: Args, cfg: Config) -> anyhow::Result<()> { keys::setup(cfg, reset).await? } Command::Dbinit { reset } => { - let db = DbCfg::parse(&cfg)?; - let pool = PgPool::connect_with(db.cfg).await?; - db::db_init(&pool, reset).await?; + let cfg = DbCfg::parse(&cfg)?; + let pool = pool(cfg.cfg, "magnet_bank").await?; + let mut conn = pool.acquire().await?; + dbinit(&mut conn, cfg.sql_dir.as_ref(), "magnet-bank", reset).await?; } Command::Serve { check } => { if check { @@ -113,7 +113,7 @@ async fn app(args: Args, cfg: Config) -> anyhow::Result<()> { } } else { let db = DbCfg::parse(&cfg)?; - let pool = PgPool::connect_with(db.cfg).await?; + let pool = pool(db.cfg, "magnet_bank").await?; let cfg = ServeCfg::parse(&cfg)?; let api = Arc::new( MagnetApi::start( @@ -137,7 +137,7 @@ async fn app(args: Args, cfg: Config) -> anyhow::Result<()> { transient: _, } => { let db = DbCfg::parse(&cfg)?; - let pool = PgPool::connect_with(db.cfg).await?; + let pool = pool(db.cfg, "magnet_bank").await?; let cfg = WorkerCfg::parse(&cfg)?; let keys = keys::load(&cfg)?; let client = reqwest::Client::new(); diff --git a/taler-magnet-bank/tests/api.rs b/taler-magnet-bank/tests/api.rs @@ -31,8 +31,7 @@ use taler_test_utils::{ }; async fn setup() -> (TestServer, PgPool) { - let pool = db_test_setup().await; - db::db_init(&pool, false).await.unwrap(); + let pool = db_test_setup("magnet-bank").await; let api = Arc::new( MagnetApi::start( pool.clone(),