taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

commit 6b86233c35e7d0d3d065ad8195512d8691e93fcf
parent c75da6c72bce6dc7633611a8bd058d9d30774992
Author: Antoine A <>
Date:   Mon,  9 Dec 2024 17:28:00 +0100

utils: refactor db utils and add uint helpers

Diffstat:
Dtaler-api/schema.sql | 175-------------------------------------------------------------------------------
Mtaler-api/src/db.rs | 439+++++++++++++++++++++++++++++--------------------------------------------------
Dtaler-api/src/db/query_helper.rs | 107-------------------------------------------------------------------------------
Dtaler-api/src/db/type_helper.rs | 93-------------------------------------------------------------------------------
Dtaler-api/tests/common.rs | 191-------------------------------------------------------------------------------
Ataler-api/tests/common/db.rs | 312+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Ataler-api/tests/common/mod.rs | 194+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Ataler-api/tests/schema.sql | 190+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mtaler-common/src/api_common.rs | 5+++--
9 files changed, 860 insertions(+), 846 deletions(-)

diff --git a/taler-api/schema.sql b/taler-api/schema.sql @@ -1,175 +0,0 @@ -BEGIN; -DROP SCHEMA public CASCADE; -CREATE SCHEMA public; - -CREATE TYPE taler_amount - AS (val INT8, frac INT4); -COMMENT ON TYPE taler_amount - IS 'Stores an amount, fraction is in units of 1/100000000 of the base value'; - -CREATE TYPE transfer_status AS ENUM ( - 'pending' - ,'transient_failure' - ,'permanent_failure' - ,'success' -); - -CREATE TABLE transfers ( - transfer_id INT8 PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY - ,amount taler_amount NOT NULL - ,exchange_base_url TEXT NOT NULL - ,subject TEXT NOT NULL - ,credit_payto TEXT NOT NULL - ,request_uid BYTEA UNIQUE NOT NULL CHECK (LENGTH(request_uid)=64) - ,wtid BYTEA UNIQUE NOT NULL CHECK (LENGTH(wtid)=32) - ,status transfer_status NOT NULL - ,status_msg TEXT - ,transfer_time INT8 NOT NULL -); - -CREATE TYPE incoming_type AS ENUM - ('reserve' ,'kyc', 'wad'); -CREATE TABLE incoming_transactions ( - incoming_transaction_id INT8 PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY - ,amount taler_amount NOT NULL - ,debit_payto TEXT NOT NULL - ,creation_time INT8 NOT NULL - ,type incoming_type NOT NULL - ,reserve_pub BYTEA UNIQUE CHECK (LENGTH(reserve_pub)=32) - ,account_pub BYTEA CHECK (LENGTH(account_pub)=32) - ,wad_id BYTEA CHECK (LENGTH(wad_id)=24) - ,origin_exchange_url TEXT - ,CONSTRAINT incoming_polymorphism CHECK( - CASE type - WHEN 'reserve' THEN reserve_pub IS NOT NULL AND account_pub IS NULL AND origin_exchange_url IS NULL AND wad_id IS NULL - WHEN 'kyc' THEN reserve_pub IS NULL AND account_pub IS NOT NULL AND origin_exchange_url IS NULL AND wad_id IS NULL - WHEN 'wad' THEN reserve_pub IS NULL AND account_pub IS NULL AND origin_exchange_url IS NOT NULL AND wad_id IS NOT NULL - END - ) -); - -CREATE FUNCTION taler_transfer( - IN in_amount taler_amount, - IN in_exchange_base_url TEXT, - IN in_subject TEXT, - IN in_credit_payto TEXT, - IN in_request_uid BYTEA, - IN in_wtid BYTEA, - IN in_timestamp INT8, - -- Error status - OUT out_request_uid_reuse BOOLEAN, - -- Success return - OUT out_tx_row_id INT8, - OUT out_timestamp INT8 -) -LANGUAGE plpgsql AS $$ -BEGIN --- Check for idempotence and conflict -SELECT (amount != in_amount - OR credit_payto != in_credit_payto - OR exchange_base_url != in_exchange_base_url - OR wtid != in_wtid) - ,transfer_id, transfer_time - INTO out_request_uid_reuse, out_tx_row_id, out_timestamp - FROM transfers - WHERE request_uid = in_request_uid; -IF found THEN - RETURN; -END IF; -out_request_uid_reuse=false; -out_timestamp=in_timestamp; --- Register exchange -INSERT INTO transfers ( - amount, - exchange_base_url, - subject, - credit_payto, - request_uid, - wtid, - status, - status_msg, - transfer_time -) VALUES ( - in_amount, - in_exchange_base_url, - in_subject, - in_credit_payto, - in_request_uid, - in_wtid, - 'success', - NULL, - in_timestamp -) RETURNING transfer_id INTO out_tx_row_id; --- Notify new transaction -PERFORM pg_notify('outgoing_tx', out_tx_row_id || ''); -END $$; -COMMENT ON FUNCTION taler_transfer IS 'Create an outgoing taler transaction and register it'; - -CREATE FUNCTION add_incoming( - IN in_key BYTEA, - IN in_subject TEXT, - IN in_amount taler_amount, - IN in_debit_payto TEXT, - IN in_timestamp INT8, - IN in_type incoming_type, - -- Error status - OUT out_reserve_pub_reuse BOOLEAN, - -- Success return - OUT out_tx_row_id INT8 -) -LANGUAGE plpgsql AS $$ -BEGIN --- Check conflict -SELECT in_type = 'reserve'::incoming_type AND EXISTS(SELECT FROM incoming_transactions WHERE reserve_pub = in_key) - INTO out_reserve_pub_reuse; -IF out_reserve_pub_reuse THEN - RETURN; -END IF; --- Register incoming transaction -CASE in_type - WHEN 'reserve' THEN INSERT INTO incoming_transactions ( - amount, - debit_payto, - creation_time, - type, - reserve_pub, - account_pub, - wad_id, - origin_exchange_url - ) VALUES ( - in_amount, - in_debit_payto, - in_timestamp, - in_type, - in_key, - NULL, - NULL, - NULL - ) RETURNING incoming_transaction_id INTO out_tx_row_id; - WHEN 'kyc' THEN INSERT INTO incoming_transactions ( - amount, - debit_payto, - creation_time, - type, - reserve_pub, - account_pub, - wad_id, - origin_exchange_url - ) VALUES ( - in_amount, - in_debit_payto, - in_timestamp, - in_type, - NULL, - in_key, - NULL, - NULL - ) RETURNING incoming_transaction_id INTO out_tx_row_id; - ELSE RAISE EXCEPTION 'Unsupported incoming type %', in_type; -END CASE; --- Notify new incoming transaction -PERFORM pg_notify('incoming_tx', out_tx_row_id || ''); -END $$; -COMMENT ON FUNCTION add_incoming IS 'Create an incoming taler transaction and register it'; - -COMMIT; diff --git a/taler-api/src/db.rs b/taler-api/src/db.rs @@ -14,33 +14,20 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use query_helper::{history, page, SqlQueryHelper}; -use sqlx::{ - postgres::{PgListener, PgRow}, - QueryBuilder, Row, -}; -use sqlx::{PgPool, Postgres}; +use std::time::Duration; + +use sqlx::{error::BoxDynError, postgres::PgRow, query::Query, Decode, Error, QueryBuilder, Type}; +use sqlx::{PgPool, Postgres, Row}; use taler_common::{ - amount::Amount, - api_common::{EddsaPublicKey, SafeU64, Timestamp}, + amount::{Amount, Decimal}, + api_common::{Base32, SafeU64, Timestamp}, api_params::{History, Page}, - api_wire::{ - IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest, - TransferResponse, TransferState, TransferStatus, - }, }; -use tokio::sync::watch::{Receiver, Sender}; -use tracing::debug; -use type_helper::SqlTypeHelper; +use tokio::sync::watch::Receiver; use url::Url; -use crate::error::{ApiError, ApiResult}; - pub type PgQueryBuilder<'b> = QueryBuilder<'b, Postgres>; -pub mod query_helper; -pub mod type_helper; - #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)] #[allow(non_camel_case_types)] #[sqlx(type_name = "incoming_type")] @@ -50,278 +37,174 @@ pub enum IncomingType { wad, } -pub async fn notification_listener( - pool: PgPool, - outgoing_channel: Sender<i64>, - incoming_channel: Sender<i64>, -) -> Result<(), ApiError> { - let mut listener = PgListener::connect_with(&pool).await?; - listener.listen_all(["outgoing_tx", "incoming_tx"]).await?; - loop { - while let Some(notification) = listener.try_recv().await? { - debug!( - "db notification: {} - {}", - notification.channel(), - notification.payload() - ); - match notification.channel() { - "outgoing_tx" => { - let row_id: i64 = notification.payload().parse().unwrap(); - outgoing_channel.send_replace(row_id); - } - "incoming_tx" => { - let row_id: i64 = notification.payload().parse().unwrap(); - incoming_channel.send_replace(row_id); - } - unknown => unreachable!("{}", unknown), +/* ----- Routines ------ */ + +pub async fn page<'a, R: Send + Unpin>( + pool: &PgPool, + id_col: &str, + params: &Page, + prepare: impl Fn() -> QueryBuilder<'a, Postgres>, + map: impl Fn(PgRow) -> Result<R, Error> + Send, +) -> Result<Vec<R>, Error> { + let mut builder = prepare(); + if let Some(offset) = params.offset { + builder + .push(format_args!( + " {id_col} {}", + if params.backward() { '<' } else { '>' } + )) + .push_bind(offset); + } else { + builder.push("TRUE"); + } + builder.push(format_args!( + " ORDER BY {id_col} {} LIMIT ", + if params.backward() { "DESC" } else { "ASC" } + )); + builder + .push_bind(params.limit.abs()) + .build() + .try_map(map) + .fetch_all(pool) + .await +} + +pub async fn history<'a, R: Send + Unpin>( + pool: &PgPool, + id_col: &str, + params: &History, + listen: impl FnOnce() -> Receiver<i64>, + prepare: impl Fn() -> QueryBuilder<'a, Postgres> + Copy, + map: impl Fn(PgRow) -> Result<R, Error> + Send + Copy, +) -> Result<Vec<R>, Error> { + let load = || async { page(pool, id_col, &params.page, prepare, map).await }; + + // When going backward there is always at least one transaction or none + if params.page.limit >= 0 && params.timeout_ms.is_some_and(|it| it > 0) { + let mut listener = listen(); + let init = load().await?; + // Long polling if we found no transactions + if init.is_empty() { + let pooling = tokio::time::timeout( + Duration::from_millis(params.timeout_ms.unwrap_or(0)), + async { + listener + .wait_for(|id| params.page.offset.is_none_or(|offset| *id > offset)) + .await + .ok(); + }, + ) + .await; + match pooling { + Ok(_) => load().await, + Err(_) => Ok(init), } + } else { + Ok(init) } - // TODO wait before reconnect + } else { + load().await } } -pub enum TransferResult { - Success(TransferResponse), - RequestUidReuse, -} +/* ----- Bind ----- */ -pub async fn transfer(db: &PgPool, transfer: TransferRequest) -> ApiResult<TransferResult> { - Ok(sqlx::query( - " - SELECT out_request_uid_reuse, out_tx_row_id, out_timestamp - FROM taler_transfer(($1, $2)::taler_amount, $3, $4, $5, $6, $7, $8) - ", - ) - .bind_amount(&transfer.amount) - .bind(transfer.exchange_base_url.as_str()) - .bind(format!("{} {}", transfer.wtid, transfer.exchange_base_url)) - .bind(transfer.credit_account.as_str()) - .bind(transfer.request_uid.as_slice()) - .bind(transfer.wtid.as_slice()) - .bind_timestamp(&Timestamp::now()) - .try_map(|r: PgRow| { - Ok(if r.try_get("out_request_uid_reuse")? { - TransferResult::RequestUidReuse - } else { - TransferResult::Success(TransferResponse { - timestamp: r.try_get_timestamp("out_timestamp")?, - row_id: r.try_get_safeu64("out_tx_row_id")?, - }) - }) - }) - .fetch_one(db) - .await?) +pub trait BindHelper { + fn bind_amount(self, amount: &Amount) -> Self; + fn bind_decimal(self, decimal: &Decimal) -> Self; + fn bind_timestamp(self, timestamp: &Timestamp) -> Self; } -pub async fn transfer_page( - db: &PgPool, - status: &Option<TransferState>, - params: &Page, - currency: &str, -) -> ApiResult<Vec<TransferListStatus>> { - Ok(page( - db, - "transfer_id", - params, - || { - let mut builder = QueryBuilder::new( - " - SELECT - transfer_id, - status, - (amount).val as amount_val, - (amount).frac as amount_frac, - credit_payto, - transfer_time - FROM transfers WHERE - ", - ); - if let Some(status) = status { - builder.push(" status = ").push_bind(status).push(" AND "); - } - builder - }, - |r: PgRow| { - Ok(TransferListStatus { - row_id: r.try_get_safeu64("transfer_id")?, - status: r.try_get("status")?, - amount: r.try_get_amount("amount", currency)?, - credit_account: r.try_get_url("credit_payto")?, - timestamp: r.try_get_timestamp("transfer_time")?, - }) - }, - ) - .await?) -} +impl<'q> BindHelper for Query<'q, Postgres, <Postgres as sqlx::Database>::Arguments<'q>> { + fn bind_amount(self, amount: &Amount) -> Self { + self.bind_decimal(&amount.decimal) + } -pub async fn transfer_by_id( - db: &PgPool, - id: u64, - currency: &str, -) -> ApiResult<Option<TransferStatus>> { - Ok(sqlx::query( - " - SELECT - status, - status_msg, - (amount).val as amount_val, - (amount).frac as amount_frac, - exchange_base_url, - wtid, - credit_payto, - transfer_time - FROM transfers WHERE transfer_id = $1 - ", - ) - .bind(id as i64) - .try_map(|r: PgRow| { - Ok(TransferStatus { - status: r.try_get("status")?, - status_msg: r.try_get("status_msg")?, - amount: r.try_get_amount("amount", currency)?, - origin_exchange_url: r.try_get("exchange_base_url")?, - wtid: r.try_get_base32("wtid")?, - credit_account: r.try_get_url("credit_payto")?, - timestamp: r.try_get_timestamp("transfer_time")?, - }) - }) - .fetch_optional(db) - .await?) -} + fn bind_decimal(self, decimal: &Decimal) -> Self { + self.bind(decimal.value as i64) + .bind(decimal.fraction as i32) + } -pub async fn outgoing_page( - db: &PgPool, - params: &History, - currency: &str, - listen: impl FnOnce() -> Receiver<i64>, -) -> ApiResult<Vec<OutgoingBankTransaction>> { - Ok(history( - db, - "transfer_id", - params, - listen, - || { - QueryBuilder::new( - " - SELECT - transfer_id, - (amount).val as amount_val, - (amount).frac as amount_frac, - exchange_base_url, - wtid, - credit_payto, - transfer_time - FROM transfers WHERE status = 'success' AND - ", - ) - }, - |r| { - Ok(OutgoingBankTransaction { - amount: r.try_get_amount("amount", currency)?, - wtid: r.try_get_base32("wtid")?, - credit_account: r.try_get_url("credit_payto")?, - row_id: r.try_get_safeu64("transfer_id")?, - date: r.try_get_timestamp("transfer_time")?, - exchange_base_url: r.try_get_url("exchange_base_url")?, - }) - }, - ) - .await?) + fn bind_timestamp(self, timestamp: &Timestamp) -> Self { + self.bind(timestamp.as_sql_micros()) + } } -pub enum AddIncomingResult { - Success(SafeU64), - ReservePubReuse, +/* ----- Get ----- */ + +pub trait TypeHelper { + fn try_get_map< + 'r, + I: sqlx::ColumnIndex<Self>, + T: Decode<'r, Postgres> + Type<Postgres>, + E: Into<BoxDynError>, + R, + M: FnOnce(T) -> Result<R, E>, + >( + &'r self, + index: I, + map: M, + ) -> sqlx::Result<R>; + fn try_get_timestamp<I: sqlx::ColumnIndex<Self>>(&self, index: I) -> sqlx::Result<Timestamp> { + self.try_get_map(index, Timestamp::from_sql_micros) + } + fn try_get_u32<I: sqlx::ColumnIndex<Self>>(&self, index: I) -> sqlx::Result<u32> { + self.try_get_map(index, |signed: i32| signed.try_into()) + } + fn try_get_u64<I: sqlx::ColumnIndex<Self>>(&self, index: I) -> sqlx::Result<u64> { + self.try_get_map(index, |signed: i64| signed.try_into()) + } + fn try_get_safeu64<I: sqlx::ColumnIndex<Self>>(&self, index: I) -> sqlx::Result<SafeU64> { + self.try_get_map(index, |signed: i64| SafeU64::try_from(signed)) + } + fn try_get_base32<I: sqlx::ColumnIndex<Self>, const L: usize>( + &self, + index: I, + ) -> sqlx::Result<Base32<L>> { + self.try_get_map(index, |slice: &[u8]| Base32::try_from(slice)) + } + fn try_get_url<I: sqlx::ColumnIndex<Self>>(&self, index: I) -> sqlx::Result<Url> { + self.try_get_map(index, Url::parse) + } + fn try_get_amount(&self, index: &str, currency: &str) -> sqlx::Result<Amount>; + fn try_get_amount_i(&self, index: usize, currency: &str) -> sqlx::Result<Amount>; } -pub async fn add_incoming( - db: &PgPool, - amount: &Amount, - debit_account: &Url, - subject: &str, - timestamp: &Timestamp, - kind: IncomingType, - key: &EddsaPublicKey, -) -> ApiResult<AddIncomingResult> { - Ok(sqlx::query( - " - SELECT out_reserve_pub_reuse, out_tx_row_id - FROM add_incoming($1, $2, ($3, $4)::taler_amount, $5, $6, $7) - ", - ) - .bind(key.as_slice()) - .bind(subject) - .bind_amount(amount) - .bind(debit_account.as_str()) - .bind_timestamp(timestamp) - .bind(kind) - .try_map(|r: PgRow| { - Ok(if r.try_get("out_reserve_pub_reuse")? { - AddIncomingResult::ReservePubReuse - } else { - AddIncomingResult::Success(r.try_get_safeu64("out_tx_row_id")?) +impl TypeHelper for PgRow { + fn try_get_map< + 'r, + I: sqlx::ColumnIndex<Self>, + T: Decode<'r, Postgres> + Type<Postgres>, + E: Into<BoxDynError>, + R, + M: FnOnce(T) -> Result<R, E>, + >( + &'r self, + index: I, + map: M, + ) -> sqlx::Result<R> { + let primitive: T = self.try_get(&index)?; + map(primitive).map_err(|source| sqlx::Error::ColumnDecode { + index: format!("{index:?}"), + source: source.into(), }) - }) - .fetch_one(db) - .await?) -} + } -pub async fn incoming_page( - db: &PgPool, - params: &History, - currency: &str, - listen: impl FnOnce() -> Receiver<i64>, -) -> ApiResult<Vec<IncomingBankTransaction>> { - Ok(history( - db, - "incoming_transaction_id", - params, - listen, - || { - QueryBuilder::new( - " - SELECT - type, - incoming_transaction_id, - (amount).val as amount_val, - (amount).frac as amount_frac, - creation_time, - debit_payto, - reserve_pub, - account_pub, - wad_id, - origin_exchange_url - FROM incoming_transactions WHERE - ", - ) - }, - |r: PgRow| { - let kind: IncomingType = r.try_get("type")?; - Ok(match kind { - IncomingType::reserve => IncomingBankTransaction::IncomingReserveTransaction { - row_id: r.try_get_safeu64("incoming_transaction_id")?, - date: r.try_get_timestamp("creation_time")?, - amount: r.try_get_amount("amount", currency)?, - debit_account: r.try_get_url("debit_payto")?, - reserve_pub: r.try_get_base32("reserve_pub")?, - }, - IncomingType::kyc => IncomingBankTransaction::IncomingKycAuthTransaction { - row_id: r.try_get_safeu64("incoming_transaction_id")?, - date: r.try_get_timestamp("creation_time")?, - amount: r.try_get_amount("amount", currency)?, - debit_account: r.try_get_url("debit_payto")?, - account_pub: r.try_get_base32("account_pub")?, - }, - IncomingType::wad => IncomingBankTransaction::IncomingWadTransaction { - row_id: r.try_get_safeu64("incoming_transaction_id")?, - date: r.try_get_timestamp("creation_time")?, - amount: r.try_get_amount("amount", currency)?, - debit_account: r.try_get_url("debit_payto")?, - origin_exchange_url: r.try_get_url("origin_exchange_url")?, - wad_id: r.try_get_base32("wa_id")?, - }, - }) - }, - ) - .await?) + fn try_get_amount(&self, index: &str, currency: &str) -> sqlx::Result<Amount> { + let val_idx = format!("{index}_val"); + let frac_idx = format!("{index}_frac"); + let val_idx = val_idx.as_str(); + let frac_idx = frac_idx.as_str(); + let val = self.try_get_u64(val_idx)?; + let frac = self.try_get_u32(frac_idx)?; + + Ok(Amount::new(currency, val, frac)) + } + + fn try_get_amount_i(&self, index: usize, currency: &str) -> sqlx::Result<Amount> { + let val = self.try_get_u64(index)?; + let frac = self.try_get_u32(index + 1)?; + + Ok(Amount::new(currency, val, frac)) + } } diff --git a/taler-api/src/db/query_helper.rs b/taler-api/src/db/query_helper.rs @@ -1,107 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2024 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ - -use std::time::Duration; - -use sqlx::{postgres::PgRow, query::Query, Error, PgPool, Postgres, QueryBuilder}; -use taler_common::{ - amount::{Amount, Decimal}, api_common::Timestamp, api_params::{History, Page} -}; -use tokio::sync::watch::Receiver; - -pub trait SqlQueryHelper { - fn bind_amount(self, amount: &Amount) -> Self; - fn bind_decimal(self, decimal: &Decimal) -> Self; - fn bind_timestamp(self, timestamp: &Timestamp) -> Self; -} - -impl<'q> SqlQueryHelper for Query<'q, Postgres, <Postgres as sqlx::Database>::Arguments<'q>> { - fn bind_amount(self, amount: &Amount) -> Self { - self.bind_decimal(&amount.decimal) - } - - fn bind_decimal(self, decimal: &Decimal) -> Self { - self.bind(decimal.value as i64) - .bind(decimal.fraction as i32) - } - - fn bind_timestamp(self, timestamp: &Timestamp) -> Self { - self.bind(timestamp.as_sql_micros()) - } -} - -pub async fn page<'a, R: Send + Unpin>( - pool: &PgPool, - id_col: &str, - params: &Page, - prepare: impl Fn() -> QueryBuilder<'a, Postgres>, - map: impl Fn(PgRow) -> Result<R, Error> + Send, -) -> Result<Vec<R>, Error> { - let mut builder = prepare(); - if let Some(offset) = params.offset { - builder - .push(format_args!( - " {id_col} {}", - if params.backward() { '<' } else { '>' } - )) - .push_bind(offset); - } else { - builder.push("TRUE"); - } - builder.push(format_args!( - " ORDER BY {id_col} {} LIMIT ", - if params.backward() { "DESC" } else { "ASC" } - )); - builder.push_bind(params.limit.abs()).build().try_map(map).fetch_all(pool).await -} - -pub async fn history<'a, R: Send + Unpin>( - pool: &PgPool, - id_col: &str, - params: &History, - listen: impl FnOnce() -> Receiver<i64>, - prepare: impl Fn() -> QueryBuilder<'a, Postgres> + Copy, - map: impl Fn(PgRow) -> Result<R, Error> + Send + Copy, -) -> Result<Vec<R>, Error> { - let load = || async { page(pool, id_col, &params.page, prepare, map).await }; - - // When going backward there is always at least one transaction or none - if params.page.limit >= 0 && params.timeout_ms.is_some_and(|it| it > 0) { - let mut listener = listen(); - let init = load().await?; - // Long polling if we found no transactions - if init.is_empty() { - let pooling = tokio::time::timeout( - Duration::from_millis(params.timeout_ms.unwrap_or(0)), - async { - listener - .wait_for(|id| params.page.offset.is_none_or(|offset| *id > offset)) - .await - .ok(); - }, - ) - .await; - match pooling { - Ok(_) => load().await, - Err(_) => Ok(init), - } - } else { - Ok(init) - } - } else { - load().await - } -} diff --git a/taler-api/src/db/type_helper.rs b/taler-api/src/db/type_helper.rs @@ -1,93 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2024 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ - -use sqlx::{error::BoxDynError, postgres::PgRow, Decode, Postgres, Row, Type}; -use taler_common::{ - amount::Amount, - api_common::{Base32, SafeU64, Timestamp}, -}; -use url::Url; - -pub trait SqlTypeHelper { - fn try_get_map< - 'r, - I: sqlx::ColumnIndex<Self>, - T: Decode<'r, Postgres> + Type<Postgres>, - E: Into<BoxDynError>, - R, - M: FnOnce(T) -> Result<R, E>, - >( - &'r self, - index: I, - map: M, - ) -> sqlx::Result<R>; - fn try_get_timestamp<I: sqlx::ColumnIndex<Self>>(&self, index: I) -> sqlx::Result<Timestamp> { - self.try_get_map(index, Timestamp::from_sql_micros) - } - fn try_get_safeu64<I: sqlx::ColumnIndex<Self>>(&self, index: I) -> sqlx::Result<SafeU64> { - self.try_get_map(index, |signed: i64| SafeU64::try_from(signed)) - } - fn try_get_base32<I: sqlx::ColumnIndex<Self>, const L: usize>( - &self, - index: I, - ) -> sqlx::Result<Base32<L>> { - self.try_get_map(index, |slice: &[u8]| Base32::try_from(slice)) - } - fn try_get_url<I: sqlx::ColumnIndex<Self>>(&self, index: I) -> sqlx::Result<Url> { - self.try_get_map(index, Url::parse) - } - fn try_get_amount(&self, index: &str, currency: &str) -> sqlx::Result<Amount>; - fn try_get_amount_i(&self, index: usize, currency: &str) -> sqlx::Result<Amount>; -} - -impl SqlTypeHelper for PgRow { - fn try_get_map< - 'r, - I: sqlx::ColumnIndex<Self>, - T: Decode<'r, Postgres> + Type<Postgres>, - E: Into<BoxDynError>, - R, - M: FnOnce(T) -> Result<R, E>, - >( - &'r self, - index: I, - map: M, - ) -> sqlx::Result<R> { - let primitive: T = self.try_get(&index)?; - map(primitive).map_err(|source| sqlx::Error::ColumnDecode { - index: format!("{index:?}"), - source: source.into(), - }) - } - - fn try_get_amount(&self, index: &str, currency: &str) -> sqlx::Result<Amount> { - let val_idx = format!("{index}_val"); - let frac_idx = format!("{index}_frac"); - let val_idx = val_idx.as_str(); - let frac_idx = frac_idx.as_str(); - let val: i64 = self.try_get(val_idx)?; - let frac: i32 = self.try_get(frac_idx)?; - - Ok(Amount::new(currency, val as u64, frac as u32)) - } - - fn try_get_amount_i(&self, index: usize, currency: &str) -> sqlx::Result<Amount> { - let val: i64 = self.try_get(index)?; - let frac: i32 = self.try_get(index + 1)?; - - Ok(Amount::new(currency, val as u64, frac as u32)) - } -} diff --git a/taler-api/tests/common.rs b/taler-api/tests/common.rs @@ -1,191 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2024 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ - -use std::sync::Arc; - -use axum::Router; -use sqlx::PgPool; -use taler_api::{ - db::{self, notification_listener}, - error::{failure, ApiResult}, - wire_gateway_api, WireGatewayImpl, -}; -use taler_common::{ - api_common::Timestamp, - api_params::{History, Page}, - api_wire::{ - AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddKycauthResponse, - IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse, - TransferState, TransferStatus, - }, - error_code::ErrorCode, -}; -use tokio::sync::watch::Sender; -use url::Url; - -/// Sample Wire Gateway implementation for tests -pub struct SampleState { - currency: String, - pool: PgPool, - outgoing_channel: Sender<i64>, - incoming_channel: Sender<i64>, -} - -impl WireGatewayImpl for SampleState { - fn currency(&self) -> &str { - &self.currency - } - - fn implementation(&self) -> Option<&str> { - None - } - - async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> { - let result = db::transfer(&self.pool, req).await?; - match result { - db::TransferResult::Success(transfer_response) => Ok(transfer_response), - db::TransferResult::RequestUidReuse => Err(failure( - ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED, - "request_uid used already", - )), - } - } - - async fn transfer_page( - &self, - page: Page, - status: Option<TransferState>, - ) -> ApiResult<TransferList> { - Ok(TransferList { - transfers: db::transfer_page(&self.pool, &status, &page, &self.currency).await?, - debit_account: Url::parse("payto://todo").unwrap(), - }) - } - - async fn transfer_by_id(&self, id: u64) -> ApiResult<Option<TransferStatus>> { - db::transfer_by_id(&self.pool, id, &self.currency).await - } - - async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> { - let txs = db::outgoing_page(&self.pool, &params, &self.currency, || { - self.outgoing_channel.subscribe() - }) - .await?; - Ok(OutgoingHistory { - outgoing_transactions: txs, - debit_account: Url::parse("payto://todo").unwrap(), - }) - } - - async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> { - let txs = db::incoming_page(&self.pool, &params, &self.currency, || { - self.incoming_channel.subscribe() - }) - .await?; - Ok(IncomingHistory { - incoming_transactions: txs, - credit_account: Url::parse("payto://todo").unwrap(), - }) - } - - async fn add_incoming_reserve( - &self, - req: AddIncomingRequest, - ) -> ApiResult<AddIncomingResponse> { - let timestamp = Timestamp::now(); - let res = db::add_incoming( - &self.pool, - &req.amount, - &req.debit_account, - "", - &timestamp, - db::IncomingType::reserve, - &req.reserve_pub, - ) - .await?; - match res { - db::AddIncomingResult::Success(row_id) => Ok(AddIncomingResponse { timestamp, row_id }), - db::AddIncomingResult::ReservePubReuse => Err(failure( - ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT, - "reserve_pub used already".to_owned(), - )), - } - } - - async fn add_incoming_kyc(&self, req: AddKycauthRequest) -> ApiResult<AddKycauthResponse> { - let timestamp = Timestamp::now(); - let res = db::add_incoming( - &self.pool, - &req.amount, - &req.debit_account, - "", - &timestamp, - db::IncomingType::kyc, - &req.account_pub, - ) - .await?; - match res { - db::AddIncomingResult::Success(row_id) => Ok(AddKycauthResponse { timestamp, row_id }), - db::AddIncomingResult::ReservePubReuse => Err(failure( - ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT, - "reserve_pub used already".to_owned(), - )), - } - } - - fn check_currency(&self, amount: &taler_common::amount::Amount) -> ApiResult<()> { - let currency = self.currency(); - if amount.currency.as_ref() != currency { - Err(failure( - ErrorCode::GENERIC_CURRENCY_MISMATCH, - std::format!( - "Wrong currency: expected {} got {}", - currency, - amount.currency - ), - )) - } else { - Ok(()) - } - } -} - -pub async fn sample_wire_gateway_api(pool: Option<PgPool>, currency: String) -> Router { - // Create pool - let pool = match pool { - Some(it) => it, - None => PgPool::connect("postgre:///magnetcheck").await.unwrap(), - }; - // Reset db - sqlx::raw_sql(include_str!("../schema.sql")) - .execute(&pool) - .await - .unwrap(); - let outgoing_channel = Sender::new(0); - let incoming_channel = Sender::new(0); - let wg = SampleState { - currency, - pool: pool.clone(), - outgoing_channel: outgoing_channel.clone(), - incoming_channel: incoming_channel.clone(), - }; - tokio::spawn(notification_listener( - pool, - outgoing_channel, - incoming_channel, - )); - wire_gateway_api(Arc::new(wg)) -} diff --git a/taler-api/tests/common/db.rs b/taler-api/tests/common/db.rs @@ -0,0 +1,312 @@ +/* + This file is part of TALER + Copyright (C) 2024 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use sqlx::{ + postgres::{PgListener, PgRow}, + PgPool, QueryBuilder, Row, +}; +use taler_api::{ + db::{history, page, BindHelper, IncomingType, TypeHelper}, + error::{ApiError, ApiResult}, +}; +use taler_common::{ + amount::Amount, + api_common::{EddsaPublicKey, SafeU64, Timestamp}, + api_params::{History, Page}, + api_wire::{ + IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest, + TransferResponse, TransferState, TransferStatus, + }, +}; +use tokio::sync::watch::{Receiver, Sender}; +use tracing::debug; +use url::Url; + +pub async fn notification_listener( + pool: PgPool, + outgoing_channel: Sender<i64>, + incoming_channel: Sender<i64>, +) -> Result<(), ApiError> { + let mut listener = PgListener::connect_with(&pool).await?; + listener.listen_all(["outgoing_tx", "incoming_tx"]).await?; + loop { + while let Some(notification) = listener.try_recv().await? { + debug!( + "db notification: {} - {}", + notification.channel(), + notification.payload() + ); + match notification.channel() { + "outgoing_tx" => { + let row_id: i64 = notification.payload().parse().unwrap(); + outgoing_channel.send_replace(row_id); + } + "incoming_tx" => { + let row_id: i64 = notification.payload().parse().unwrap(); + incoming_channel.send_replace(row_id); + } + unknown => unreachable!("{}", unknown), + } + } + // TODO wait before reconnect + } +} + +pub enum TransferResult { + Success(TransferResponse), + RequestUidReuse, +} + +pub async fn transfer(db: &PgPool, transfer: TransferRequest) -> ApiResult<TransferResult> { + Ok(sqlx::query( + " + SELECT out_request_uid_reuse, out_tx_row_id, out_timestamp + FROM taler_transfer(($1, $2)::taler_amount, $3, $4, $5, $6, $7, $8) + ", + ) + .bind_amount(&transfer.amount) + .bind(transfer.exchange_base_url.as_str()) + .bind(format!("{} {}", transfer.wtid, transfer.exchange_base_url)) + .bind(transfer.credit_account.as_str()) + .bind(transfer.request_uid.as_slice()) + .bind(transfer.wtid.as_slice()) + .bind_timestamp(&Timestamp::now()) + .try_map(|r: PgRow| { + Ok(if r.try_get("out_request_uid_reuse")? { + TransferResult::RequestUidReuse + } else { + TransferResult::Success(TransferResponse { + timestamp: r.try_get_timestamp("out_timestamp")?, + row_id: r.try_get_safeu64("out_tx_row_id")?, + }) + }) + }) + .fetch_one(db) + .await?) +} + +pub async fn transfer_page( + db: &PgPool, + status: &Option<TransferState>, + params: &Page, + currency: &str, +) -> ApiResult<Vec<TransferListStatus>> { + Ok(page( + db, + "transfer_id", + params, + || { + let mut builder = QueryBuilder::new( + " + SELECT + transfer_id, + status, + (amount).val as amount_val, + (amount).frac as amount_frac, + credit_payto, + transfer_time + FROM transfers WHERE + ", + ); + if let Some(status) = status { + builder.push(" status = ").push_bind(status).push(" AND "); + } + builder + }, + |r: PgRow| { + Ok(TransferListStatus { + row_id: r.try_get_safeu64("transfer_id")?, + status: r.try_get("status")?, + amount: r.try_get_amount("amount", currency)?, + credit_account: r.try_get_url("credit_payto")?, + timestamp: r.try_get_timestamp("transfer_time")?, + }) + }, + ) + .await?) +} + +pub async fn transfer_by_id( + db: &PgPool, + id: u64, + currency: &str, +) -> ApiResult<Option<TransferStatus>> { + Ok(sqlx::query( + " + SELECT + status, + status_msg, + (amount).val as amount_val, + (amount).frac as amount_frac, + exchange_base_url, + wtid, + credit_payto, + transfer_time + FROM transfers WHERE transfer_id = $1 + ", + ) + .bind(id as i64) + .try_map(|r: PgRow| { + Ok(TransferStatus { + status: r.try_get("status")?, + status_msg: r.try_get("status_msg")?, + amount: r.try_get_amount("amount", currency)?, + origin_exchange_url: r.try_get("exchange_base_url")?, + wtid: r.try_get_base32("wtid")?, + credit_account: r.try_get_url("credit_payto")?, + timestamp: r.try_get_timestamp("transfer_time")?, + }) + }) + .fetch_optional(db) + .await?) +} + +pub async fn outgoing_page( + db: &PgPool, + params: &History, + currency: &str, + listen: impl FnOnce() -> Receiver<i64>, +) -> ApiResult<Vec<OutgoingBankTransaction>> { + Ok(history( + db, + "transfer_id", + params, + listen, + || { + QueryBuilder::new( + " + SELECT + transfer_id, + (amount).val as amount_val, + (amount).frac as amount_frac, + exchange_base_url, + wtid, + credit_payto, + transfer_time + FROM transfers WHERE status = 'success' AND + ", + ) + }, + |r| { + Ok(OutgoingBankTransaction { + amount: r.try_get_amount("amount", currency)?, + wtid: r.try_get_base32("wtid")?, + credit_account: r.try_get_url("credit_payto")?, + row_id: r.try_get_safeu64("transfer_id")?, + date: r.try_get_timestamp("transfer_time")?, + exchange_base_url: r.try_get_url("exchange_base_url")?, + }) + }, + ) + .await?) +} + +pub enum AddIncomingResult { + Success(SafeU64), + ReservePubReuse, +} + +pub async fn add_incoming( + db: &PgPool, + amount: &Amount, + debit_account: &Url, + subject: &str, + timestamp: &Timestamp, + kind: IncomingType, + key: &EddsaPublicKey, +) -> ApiResult<AddIncomingResult> { + Ok(sqlx::query( + " + SELECT out_reserve_pub_reuse, out_tx_row_id + FROM add_incoming($1, $2, ($3, $4)::taler_amount, $5, $6, $7) + ", + ) + .bind(key.as_slice()) + .bind(subject) + .bind_amount(amount) + .bind(debit_account.as_str()) + .bind_timestamp(timestamp) + .bind(kind) + .try_map(|r: PgRow| { + Ok(if r.try_get("out_reserve_pub_reuse")? { + AddIncomingResult::ReservePubReuse + } else { + AddIncomingResult::Success(r.try_get_safeu64("out_tx_row_id")?) + }) + }) + .fetch_one(db) + .await?) +} + +pub async fn incoming_page( + db: &PgPool, + params: &History, + currency: &str, + listen: impl FnOnce() -> Receiver<i64>, +) -> ApiResult<Vec<IncomingBankTransaction>> { + Ok(history( + db, + "incoming_transaction_id", + params, + listen, + || { + QueryBuilder::new( + " + SELECT + type, + incoming_transaction_id, + (amount).val as amount_val, + (amount).frac as amount_frac, + creation_time, + debit_payto, + reserve_pub, + account_pub, + wad_id, + origin_exchange_url + FROM incoming_transactions WHERE + ", + ) + }, + |r: PgRow| { + let kind: IncomingType = r.try_get("type")?; + Ok(match kind { + IncomingType::reserve => IncomingBankTransaction::IncomingReserveTransaction { + row_id: r.try_get_safeu64("incoming_transaction_id")?, + date: r.try_get_timestamp("creation_time")?, + amount: r.try_get_amount("amount", currency)?, + debit_account: r.try_get_url("debit_payto")?, + reserve_pub: r.try_get_base32("reserve_pub")?, + }, + IncomingType::kyc => IncomingBankTransaction::IncomingKycAuthTransaction { + row_id: r.try_get_safeu64("incoming_transaction_id")?, + date: r.try_get_timestamp("creation_time")?, + amount: r.try_get_amount("amount", currency)?, + debit_account: r.try_get_url("debit_payto")?, + account_pub: r.try_get_base32("account_pub")?, + }, + IncomingType::wad => IncomingBankTransaction::IncomingWadTransaction { + row_id: r.try_get_safeu64("incoming_transaction_id")?, + date: r.try_get_timestamp("creation_time")?, + amount: r.try_get_amount("amount", currency)?, + debit_account: r.try_get_url("debit_payto")?, + origin_exchange_url: r.try_get_url("origin_exchange_url")?, + wad_id: r.try_get_base32("wa_id")?, + }, + }) + }, + ) + .await?) +} diff --git a/taler-api/tests/common/mod.rs b/taler-api/tests/common/mod.rs @@ -0,0 +1,194 @@ +/* + This file is part of TALER + Copyright (C) 2024 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use std::sync::Arc; + +use axum::Router; +use db::notification_listener; +use sqlx::PgPool; +use taler_api::{ + db::IncomingType, + error::{failure, ApiResult}, + wire_gateway_api, WireGatewayImpl, +}; +use taler_common::{ + api_common::Timestamp, + api_params::{History, Page}, + api_wire::{ + AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddKycauthResponse, + IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse, + TransferState, TransferStatus, + }, + error_code::ErrorCode, +}; +use tokio::sync::watch::Sender; +use url::Url; + +pub mod db; + +/// Sample Wire Gateway implementation for tests +pub struct SampleState { + currency: String, + pool: PgPool, + outgoing_channel: Sender<i64>, + incoming_channel: Sender<i64>, +} + +impl WireGatewayImpl for SampleState { + fn currency(&self) -> &str { + &self.currency + } + + fn implementation(&self) -> Option<&str> { + None + } + + async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> { + let result = db::transfer(&self.pool, req).await?; + match result { + db::TransferResult::Success(transfer_response) => Ok(transfer_response), + db::TransferResult::RequestUidReuse => Err(failure( + ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED, + "request_uid used already", + )), + } + } + + async fn transfer_page( + &self, + page: Page, + status: Option<TransferState>, + ) -> ApiResult<TransferList> { + Ok(TransferList { + transfers: db::transfer_page(&self.pool, &status, &page, &self.currency).await?, + debit_account: Url::parse("payto://todo").unwrap(), + }) + } + + async fn transfer_by_id(&self, id: u64) -> ApiResult<Option<TransferStatus>> { + db::transfer_by_id(&self.pool, id, &self.currency).await + } + + async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> { + let txs = db::outgoing_page(&self.pool, &params, &self.currency, || { + self.outgoing_channel.subscribe() + }) + .await?; + Ok(OutgoingHistory { + outgoing_transactions: txs, + debit_account: Url::parse("payto://todo").unwrap(), + }) + } + + async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> { + let txs = db::incoming_page(&self.pool, &params, &self.currency, || { + self.incoming_channel.subscribe() + }) + .await?; + Ok(IncomingHistory { + incoming_transactions: txs, + credit_account: Url::parse("payto://todo").unwrap(), + }) + } + + async fn add_incoming_reserve( + &self, + req: AddIncomingRequest, + ) -> ApiResult<AddIncomingResponse> { + let timestamp = Timestamp::now(); + let res = db::add_incoming( + &self.pool, + &req.amount, + &req.debit_account, + "", + &timestamp, + IncomingType::reserve, + &req.reserve_pub, + ) + .await?; + match res { + db::AddIncomingResult::Success(row_id) => Ok(AddIncomingResponse { timestamp, row_id }), + db::AddIncomingResult::ReservePubReuse => Err(failure( + ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT, + "reserve_pub used already".to_owned(), + )), + } + } + + async fn add_incoming_kyc(&self, req: AddKycauthRequest) -> ApiResult<AddKycauthResponse> { + let timestamp = Timestamp::now(); + let res = db::add_incoming( + &self.pool, + &req.amount, + &req.debit_account, + "", + &timestamp, + IncomingType::kyc, + &req.account_pub, + ) + .await?; + match res { + db::AddIncomingResult::Success(row_id) => Ok(AddKycauthResponse { timestamp, row_id }), + db::AddIncomingResult::ReservePubReuse => Err(failure( + ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT, + "reserve_pub used already".to_owned(), + )), + } + } + + fn check_currency(&self, amount: &taler_common::amount::Amount) -> ApiResult<()> { + let currency = self.currency(); + if amount.currency.as_ref() != currency { + Err(failure( + ErrorCode::GENERIC_CURRENCY_MISMATCH, + std::format!( + "Wrong currency: expected {} got {}", + currency, + amount.currency + ), + )) + } else { + Ok(()) + } + } +} + +pub async fn sample_wire_gateway_api(pool: Option<PgPool>, currency: String) -> Router { + // Create pool + let pool = match pool { + Some(it) => it, + None => PgPool::connect("postgre:///magnetcheck").await.unwrap(), + }; + // Reset db + sqlx::raw_sql(include_str!("../schema.sql")) + .execute(&pool) + .await + .unwrap(); + let outgoing_channel = Sender::new(0); + let incoming_channel = Sender::new(0); + let wg = SampleState { + currency, + pool: pool.clone(), + outgoing_channel: outgoing_channel.clone(), + incoming_channel: incoming_channel.clone(), + }; + tokio::spawn(notification_listener( + pool, + outgoing_channel, + incoming_channel, + )); + wire_gateway_api(Arc::new(wg)) +} diff --git a/taler-api/tests/schema.sql b/taler-api/tests/schema.sql @@ -0,0 +1,190 @@ +-- +-- This file is part of TALER +-- Copyright (C) 2024 Taler Systems SA +-- +-- TALER is free software; you can redistribute it and/or modify it under the +-- terms of the GNU General Public License as published by the Free Software +-- Foundation; either version 3, or (at your option) any later version. +-- +-- TALER is distributed in the hope that it will be useful, but WITHOUT ANY +-- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +-- A PARTICULAR PURPOSE. See the GNU General Public License for more details. +-- +-- You should have received a copy of the GNU General Public License along with +-- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> + +BEGIN; +DROP SCHEMA public CASCADE; +CREATE SCHEMA public; + +CREATE TYPE taler_amount + AS (val INT8, frac INT4); +COMMENT ON TYPE taler_amount + IS 'Stores an amount, fraction is in units of 1/100000000 of the base value'; + +CREATE TYPE transfer_status AS ENUM ( + 'pending' + ,'transient_failure' + ,'permanent_failure' + ,'success' +); + +CREATE TABLE transfers ( + transfer_id INT8 PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY + ,amount taler_amount NOT NULL + ,exchange_base_url TEXT NOT NULL + ,subject TEXT NOT NULL + ,credit_payto TEXT NOT NULL + ,request_uid BYTEA UNIQUE NOT NULL CHECK (LENGTH(request_uid)=64) + ,wtid BYTEA UNIQUE NOT NULL CHECK (LENGTH(wtid)=32) + ,status transfer_status NOT NULL + ,status_msg TEXT + ,transfer_time INT8 NOT NULL +); + +CREATE TYPE incoming_type AS ENUM + ('reserve' ,'kyc', 'wad'); +CREATE TABLE incoming_transactions ( + incoming_transaction_id INT8 PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY + ,amount taler_amount NOT NULL + ,debit_payto TEXT NOT NULL + ,creation_time INT8 NOT NULL + ,type incoming_type NOT NULL + ,reserve_pub BYTEA UNIQUE CHECK (LENGTH(reserve_pub)=32) + ,account_pub BYTEA CHECK (LENGTH(account_pub)=32) + ,wad_id BYTEA CHECK (LENGTH(wad_id)=24) + ,origin_exchange_url TEXT + ,CONSTRAINT incoming_polymorphism CHECK( + CASE type + WHEN 'reserve' THEN reserve_pub IS NOT NULL AND account_pub IS NULL AND origin_exchange_url IS NULL AND wad_id IS NULL + WHEN 'kyc' THEN reserve_pub IS NULL AND account_pub IS NOT NULL AND origin_exchange_url IS NULL AND wad_id IS NULL + WHEN 'wad' THEN reserve_pub IS NULL AND account_pub IS NULL AND origin_exchange_url IS NOT NULL AND wad_id IS NOT NULL + END + ) +); + +CREATE FUNCTION taler_transfer( + IN in_amount taler_amount, + IN in_exchange_base_url TEXT, + IN in_subject TEXT, + IN in_credit_payto TEXT, + IN in_request_uid BYTEA, + IN in_wtid BYTEA, + IN in_timestamp INT8, + -- Error status + OUT out_request_uid_reuse BOOLEAN, + -- Success return + OUT out_tx_row_id INT8, + OUT out_timestamp INT8 +) +LANGUAGE plpgsql AS $$ +BEGIN +-- Check for idempotence and conflict +SELECT (amount != in_amount + OR credit_payto != in_credit_payto + OR exchange_base_url != in_exchange_base_url + OR wtid != in_wtid) + ,transfer_id, transfer_time + INTO out_request_uid_reuse, out_tx_row_id, out_timestamp + FROM transfers + WHERE request_uid = in_request_uid; +IF found THEN + RETURN; +END IF; +out_request_uid_reuse=false; +out_timestamp=in_timestamp; +-- Register exchange +INSERT INTO transfers ( + amount, + exchange_base_url, + subject, + credit_payto, + request_uid, + wtid, + status, + status_msg, + transfer_time +) VALUES ( + in_amount, + in_exchange_base_url, + in_subject, + in_credit_payto, + in_request_uid, + in_wtid, + 'success', + NULL, + in_timestamp +) RETURNING transfer_id INTO out_tx_row_id; +-- Notify new transaction +PERFORM pg_notify('outgoing_tx', out_tx_row_id || ''); +END $$; +COMMENT ON FUNCTION taler_transfer IS 'Create an outgoing taler transaction and register it'; + +CREATE FUNCTION add_incoming( + IN in_key BYTEA, + IN in_subject TEXT, + IN in_amount taler_amount, + IN in_debit_payto TEXT, + IN in_timestamp INT8, + IN in_type incoming_type, + -- Error status + OUT out_reserve_pub_reuse BOOLEAN, + -- Success return + OUT out_tx_row_id INT8 +) +LANGUAGE plpgsql AS $$ +BEGIN +-- Check conflict +SELECT in_type = 'reserve'::incoming_type AND EXISTS(SELECT FROM incoming_transactions WHERE reserve_pub = in_key) + INTO out_reserve_pub_reuse; +IF out_reserve_pub_reuse THEN + RETURN; +END IF; +-- Register incoming transaction +CASE in_type + WHEN 'reserve' THEN INSERT INTO incoming_transactions ( + amount, + debit_payto, + creation_time, + type, + reserve_pub, + account_pub, + wad_id, + origin_exchange_url + ) VALUES ( + in_amount, + in_debit_payto, + in_timestamp, + in_type, + in_key, + NULL, + NULL, + NULL + ) RETURNING incoming_transaction_id INTO out_tx_row_id; + WHEN 'kyc' THEN INSERT INTO incoming_transactions ( + amount, + debit_payto, + creation_time, + type, + reserve_pub, + account_pub, + wad_id, + origin_exchange_url + ) VALUES ( + in_amount, + in_debit_payto, + in_timestamp, + in_type, + NULL, + in_key, + NULL, + NULL + ) RETURNING incoming_transaction_id INTO out_tx_row_id; + ELSE RAISE EXCEPTION 'Unsupported incoming type %', in_type; +END CASE; +-- Notify new incoming transaction +PERFORM pg_notify('incoming_tx', out_tx_row_id || ''); +END $$; +COMMENT ON FUNCTION add_incoming IS 'Create an incoming taler transaction and register it'; + +COMMIT; diff --git a/taler-common/src/api_common.rs b/taler-common/src/api_common.rs @@ -59,13 +59,14 @@ impl Timestamp { } pub fn as_sql_micros(&self) -> i64 { - // TODO fail is i64::MAX match self { Timestamp::Never => i64::MAX, Timestamp::Time(system_time) => system_time .duration_since(SystemTime::UNIX_EPOCH) .unwrap() - .as_micros() as i64, + .as_micros() + .try_into() + .expect("expect timestamp to be a valid i64 number"), } }