depolymerization

wire gateway for Bitcoin/Ethereum
Log | Files | Refs | Submodules | README | LICENSE

commit 635e0e1d0ce6773f7bea4b4fed7c22f614ba2829
parent c97f15ab23c310dfb630f78c1513c38803ab0f2a
Author: Antoine A <>
Date:   Thu,  1 Feb 2024 18:18:49 +0100

Update taler wire api to the newest specification

Diffstat:
Mcommon/src/api_wire.rs | 199++++++++++++++++++++++++++++++++++++++++---------------------------------------
Minstrumentation/src/main.rs | 2+-
Minstrumentation/src/utils.rs | 50+++++++++++++++++++++++++++++++-------------------
Mwire-gateway/src/json.rs | 2+-
Mwire-gateway/src/main.rs | 1084++++++++++++++++++++++++++++++++++++++++---------------------------------------
5 files changed, 687 insertions(+), 650 deletions(-)

diff --git a/common/src/api_wire.rs b/common/src/api_wire.rs @@ -1,98 +1,101 @@ -/* - This file is part of TALER - Copyright (C) 2022 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ -use url::Url; - -use crate::api_common::{Amount, EddsaPublicKey, HashCode, SafeU64, ShortHashCode, Timestamp}; - -/// <https://docs.taler.net/core/api-wire.html#tsref-type-TransferResponse> -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct TransferResponse { - pub timestamp: Timestamp, - pub row_id: SafeU64, -} - -/// <https://docs.taler.net/core/api-wire.html#tsref-type-TransferRequest> -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] -pub struct TransferRequest { - pub request_uid: HashCode, - pub amount: Amount, - pub exchange_base_url: Url, - pub wtid: ShortHashCode, - pub credit_account: Url, -} - -/// <https://docs.taler.net/core/api-wire.html#tsref-type-OutgoingHistory> -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct OutgoingHistory { - pub outgoing_transactions: Vec<OutgoingBankTransaction>, -} - -/// <https://docs.taler.net/core/api-wire.html#tsref-type-OutgoingBankTransaction> -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct OutgoingBankTransaction { - pub row_id: SafeU64, - pub date: Timestamp, - pub amount: Amount, - pub credit_account: Url, - pub debit_account: Url, - pub wtid: ShortHashCode, - pub exchange_base_url: Url, -} - -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct IncomingHistory { - pub incoming_transactions: Vec<IncomingBankTransaction>, -} -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -#[serde(tag = "type")] -pub enum IncomingBankTransaction { - #[serde(rename = "RESERVE")] - IncomingReserveTransaction { - row_id: SafeU64, - date: Timestamp, - amount: Amount, - credit_account: Url, - debit_account: Url, - reserve_pub: EddsaPublicKey, - }, - #[serde(rename = "WAD")] - IncomingWadTransaction { - // TODO not yet supported - }, -} - -/// <https://docs.taler.net/core/api-wire.html#tsref-type-AddIncomingRequest> -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct AddIncomingRequest { - pub amount: Amount, - pub reserve_pub: EddsaPublicKey, - pub debit_account: Url, -} - -/// <https://docs.taler.net/core/api-wire.html#tsref-type-AddIncomingResponse> -#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] -pub struct AddIncomingResponse { - pub row_id: SafeU64, - pub timestamp: Timestamp, -} - -/// <https://docs.taler.net/core/api-wire.html#querying-the-transaction-history> -#[derive(Debug, Clone, serde::Deserialize)] -pub struct HistoryParams { - pub start: Option<u64>, - pub delta: i64, - pub long_pool_ms: Option<u64>, -} +/* + This file is part of TALER + Copyright (C) 2022 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +/// Type for the Taler Wire Gateway HTTP API <https://docs.taler.net/core/api-bank-wire.html#taler-wire-gateway-http-api> +use url::Url; + +use crate::api_common::{Amount, EddsaPublicKey, HashCode, SafeU64, ShortHashCode, Timestamp}; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct WireConfig { + pub name: &'static str, + pub version: &'static str, + pub currency: &'static str, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct TransferResponse { + pub timestamp: Timestamp, + pub row_id: SafeU64, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] +pub struct TransferRequest { + pub request_uid: HashCode, + pub amount: Amount, + pub exchange_base_url: Url, + pub wtid: ShortHashCode, + pub credit_account: Url, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct OutgoingHistory { + pub outgoing_transactions: Vec<OutgoingBankTransaction>, + pub debit_account: Url, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct OutgoingBankTransaction { + pub row_id: SafeU64, + pub date: Timestamp, + pub amount: Amount, + pub credit_account: Url, + pub wtid: ShortHashCode, + pub exchange_base_url: Url, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct IncomingHistory { + pub credit_account: Url, + pub incoming_transactions: Vec<IncomingBankTransaction>, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[serde(tag = "type")] +pub enum IncomingBankTransaction { + #[serde(rename = "RESERVE")] + IncomingReserveTransaction { + row_id: SafeU64, + date: Timestamp, + amount: Amount, + debit_account: Url, + reserve_pub: EddsaPublicKey, + }, + #[serde(rename = "WAD")] + IncomingWadTransaction { + // TODO not yet supported + }, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct AddIncomingRequest { + pub amount: Amount, + pub reserve_pub: EddsaPublicKey, + pub debit_account: Url, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct AddIncomingResponse { + pub row_id: SafeU64, + pub timestamp: Timestamp, +} + +#[derive(Debug, Clone, serde::Deserialize)] +pub struct HistoryParams { + pub start: Option<u64>, + pub delta: i64, + pub long_pool_ms: Option<u64>, +} diff --git a/instrumentation/src/main.rs b/instrumentation/src/main.rs @@ -152,7 +152,7 @@ pub fn main() { let len = results.len(); m.clear().unwrap(); for ((result, _, out, msg), name) in &results { - if let Err(_) = result { + if result.is_err() { println!("{} {}\n{}", name.magenta(), msg.red(), out.bright_black()); } } diff --git a/instrumentation/src/utils.rs b/instrumentation/src/utils.rs @@ -44,16 +44,21 @@ pub fn print_now(disp: impl Display) { #[must_use] pub fn check_incoming(base_url: &str, txs: &[([u8; 32], Amount)]) -> bool { - let history: IncomingHistory = ureq::get(&format!("{}history/incoming", base_url)) + let res = ureq::get(&format!("{}history/incoming", base_url)) .query("delta", &format!("-{}", txs.len())) .call() - .unwrap() - .into_json() .unwrap(); + if txs.is_empty() { + res.status() == 204 + } else { + if res.status() != 200 { + return false; + } + let history: IncomingHistory = res.into_json().unwrap(); - history.incoming_transactions.len() == txs.len() - && txs.iter().all(|(reserve_pub_key, taler_amount)| { - history.incoming_transactions.iter().any(|h| { + history.incoming_transactions.len() == txs.len() + && txs.iter().all(|(reserve_pub_key, taler_amount)| { + history.incoming_transactions.iter().any(|h| { matches!( h, IncomingBankTransaction::IncomingReserveTransaction { @@ -63,7 +68,8 @@ pub fn check_incoming(base_url: &str, txs: &[([u8; 32], Amount)]) -> bool { } if reserve_pub == &Base32::from(*reserve_pub_key) && amount == taler_amount ) }) - }) + }) + } } pub fn gateway_error(path: &str, error: u16) { @@ -96,10 +102,7 @@ pub fn check_gateway_down(base_url: &str) -> bool { #[must_use] pub fn check_gateway_up(base_url: &str) -> bool { - ureq::get(&format!("{}history/incoming", base_url)) - .query("delta", "-5") - .call() - .is_ok() + ureq::get(&format!("{}config", base_url)).call().is_ok() } pub fn transfer(base_url: &str, wtid: &[u8; 32], url: &Url, credit_account: Url, amount: &Amount) { @@ -116,18 +119,27 @@ pub fn transfer(base_url: &str, wtid: &[u8; 32], url: &Url, credit_account: Url, #[must_use] pub fn check_outgoing(base_url: &str, url: &Url, txs: &[([u8; 32], Amount)]) -> bool { - let history: OutgoingHistory = ureq::get(&format!("{}history/outgoing", base_url)) + let res = ureq::get(&format!("{}history/outgoing", base_url)) .query("delta", &format!("-{}", txs.len())) .call() - .unwrap() - .into_json() .unwrap(); - history.outgoing_transactions.len() == txs.len() - && txs.iter().all(|(wtid, amount)| { - history.outgoing_transactions.iter().any(|h| { - h.wtid == Base32::from(*wtid) && &h.exchange_base_url == url && &h.amount == amount + if txs.is_empty() { + res.status() == 204 + } else { + if res.status() != 200 { + return false; + } + let history: OutgoingHistory = res.into_json().unwrap(); + + history.outgoing_transactions.len() == txs.len() + && txs.iter().all(|(wtid, amount)| { + history.outgoing_transactions.iter().any(|h| { + h.wtid == Base32::from(*wtid) + && &h.exchange_base_url == url + && &h.amount == amount + }) }) - }) + } } pub struct ChildGuard(pub Child); diff --git a/wire-gateway/src/json.rs b/wire-gateway/src/json.rs @@ -73,7 +73,7 @@ pub enum EncodeBodyError { Json(#[from] serde_json::Error), } -pub async fn encode_body<J: serde::Serialize>( +pub fn encode_body<J: serde::Serialize>( parts: &Parts, status: StatusCode, json: &J, diff --git a/wire-gateway/src/main.rs b/wire-gateway/src/main.rs @@ -1,531 +1,553 @@ -/* - This file is part of TALER - Copyright (C) 2022 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ -use clap::Parser; -use common::{ - api_common::{ShortHashCode, Timestamp}, - api_wire::{ - HistoryParams, IncomingBankTransaction, IncomingHistory, OutgoingBankTransaction, - OutgoingHistory, TransferRequest, TransferResponse, - }, - config::{AuthMethod, TalerConfig}, - currency::Currency, - error_codes::ErrorCode, - log::{ - fail, - log::{error, info, log, Level}, - }, - postgres::{self, fallible_iterator::FallibleIterator}, - sql::{sql_amount, sql_array, sql_safe_u64, sql_url}, - url::Url, -}; -use deadpool_postgres::{Pool, Runtime}; -use error::{CatchResult, ServerError}; -use hyper::{ - http::request::Parts, - service::{make_service_fn, service_fn}, - Body, Method, Response, Server, StatusCode, -}; -use json::{encode_body, parse_body}; -use listenfd::ListenFd; -use std::{ - convert::Infallible, - path::PathBuf, - str::FromStr, - sync::atomic::{AtomicBool, AtomicU32, Ordering}, - time::{Duration, Instant}, -}; -use tokio::sync::Notify; -use tokio_postgres::{config::Host, NoTls}; - -mod error; -mod json; - -struct ServerState { - pool: Pool, - db_config: postgres::Config, - payto: Url, - currency: Currency, - notify: Notify, - lifetime: Option<AtomicU32>, - status: AtomicBool, - auth: AuthMethod, -} - -impl ServerState { - /// Decrease lifetime, triggering graceful shutdown when reach lifetime end - pub fn step(&self) { - if let Some(lifetime) = &self.lifetime { - let mut current = lifetime.load(Ordering::Relaxed); - loop { - if current == 0 { - self.notify.notify_one(); - } - match lifetime.compare_exchange_weak( - current, - current - 1, - Ordering::SeqCst, - Ordering::Relaxed, - ) { - Ok(_) => break, - Err(new) => current = new, - } - } - } - } - - pub async fn shutdown_signal(&self) { - self.notify.notified().await; - info!("Reach end of lifetime"); - } -} - -/// Taler wire gateway server for depolymerizer -#[derive(clap::Parser, Debug)] -struct Args { - /// Override default configuration file path - #[clap(global = true, short, long)] - config: Option<PathBuf>, -} - -#[tokio::main] -async fn main() { - common::log::init(); - let args = Args::parse(); - let taler_config = TalerConfig::load(args.config.as_deref()); - - #[cfg(feature = "test")] - common::log::log::warn!("Running with test admin endpoint unsuitable for production"); - - // Parse postgres url - let db_config = taler_config.db_config(); - // TODO find a way to clean this ugly mess - let mut cfg = deadpool_postgres::Config::new(); - cfg.user = db_config.get_user().map(|it| it.to_string()); - cfg.password = db_config - .get_password() - .map(|it| String::from_utf8(it.to_vec()).unwrap()); - cfg.dbname = db_config.get_dbname().map(|it| it.to_string()); - cfg.options = db_config.get_options().map(|it| it.to_string()); - cfg.host = Some( - db_config - .get_hosts() - .iter() - .map(|it| match it { - Host::Tcp(it) => it.to_string(), - #[cfg(unix)] - Host::Unix(it) => it.to_str().unwrap().to_string(), - }) - .collect(), - ); - cfg.ports = Some(db_config.get_ports().to_vec()); - cfg.application_name = db_config.get_application_name().map(|it| it.to_string()); - cfg.connect_timeout = db_config.get_connect_timeout().cloned(); - - let pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls).unwrap(); - let payto = taler_config.payto(); - let state = ServerState { - pool, - notify: Notify::new(), - lifetime: taler_config.http_lifetime().map(AtomicU32::new), - status: AtomicBool::new(true), - db_config, - payto, - currency: taler_config.currency, - auth: taler_config.auth_method(), - }; - let state: &'static ServerState = Box::leak(Box::new(state)); - std::thread::spawn(move || status_watcher(state)); - let service = service_fn(move |req| async move { - state.step(); - let start = Instant::now(); - let (parts, body) = req.into_parts(); - let (response, msg) = match router(&parts, body, state).await { - Ok(resp) => (resp, String::new()), - Err(err) => err.response(), - }; - let status = response.status().as_u16(); - let level = if status >= 500 { - Level::Error - } else if status >= 400 { - Level::Warn - } else { - Level::Info - }; - log!( - level, - "{} {} {}ms {} - {}", - parts.method, - status, - start.elapsed().as_millis(), - parts.uri.path(), - msg - ); - Ok::<Response<Body>, Infallible>(response) - }); - let make_service = make_service_fn(move |_| async move { Ok::<_, Infallible>(service) }); - let make_service_unix = make_service_fn(move |_| async move { Ok::<_, Infallible>(service) }); - - let mut listenfd = ListenFd::from_env(); - - if let Some(listener) = listenfd.take_tcp_listener(0).unwrap() { - info!( - "Server listening on activated socket {}", - listener.local_addr().unwrap() - ); - let server = Server::from_tcp(listener) - .unwrap() - .serve(make_service) - .with_graceful_shutdown(state.shutdown_signal()); - if let Err(e) = server.await { - error!("server: {}", e); - } - } else if let Some(path) = taler_config.unix_path() { - use hyperlocal::UnixServerExt; - info!("Server listening on unix domain socket {:?}", path); - if let Err(err) = std::fs::remove_file(&path) { - if err.kind() != std::io::ErrorKind::NotFound { - fail(format_args!("{}", err)); - } - } - let server = Server::bind_unix(path) - .unwrap() - .serve(make_service_unix) - .with_graceful_shutdown(state.shutdown_signal()); - if let Err(e) = server.await { - error!("server: {}", e); - } - } else { - let addr = ([0, 0, 0, 0], taler_config.port()).into(); - info!("Server listening on http://{}", &addr); - let server = Server::bind(&addr) - .serve(make_service) - .with_graceful_shutdown(state.shutdown_signal()); - if let Err(e) = server.await { - error!("server: {}", e); - } - }; - info!("wire-gateway stopped"); -} - -/// Check if an url if a valid payto url for the configured currency -fn check_payto(url: &Url, currency: Currency) -> bool { - match currency { - Currency::ETH(_) => check_pay_to_eth(url), - Currency::BTC(_) => check_pay_to_btc(url), - } -} - -/// Check if an url is a valid bitcoin payto url -fn check_pay_to_btc(url: &Url) -> bool { - return url.domain() == Some("bitcoin") - && url.scheme() == "payto" - && url.username() == "" - && url.password().is_none() - && url.query().is_none() - && url.fragment().is_none() - && bitcoin::Address::from_str(url.path().trim_start_matches('/')).is_ok(); -} - -/// Check if an url is a valid ethereum payto url -fn check_pay_to_eth(url: &Url) -> bool { - return url.domain() == Some("ethereum") - && url.scheme() == "payto" - && url.username() == "" - && url.password().is_none() - && url.query().is_none() - && url.fragment().is_none() - && ethereum_types::H160::from_str(url.path().trim_start_matches('/')).is_ok(); -} - -/// Assert request method match expected -fn assert_method(parts: &Parts, method: Method) -> Result<(), ServerError> { - if parts.method == method { - Ok(()) - } else { - Err(ServerError::code( - StatusCode::METHOD_NOT_ALLOWED, - ErrorCode::GENERIC_METHOD_INVALID, - )) - } -} - -/// Parse history params from request -fn history_params(parts: &Parts) -> Result<HistoryParams, ServerError> { - let params: HistoryParams = serde_urlencoded::from_str(parts.uri.query().unwrap_or("")) - .catch_code( - StatusCode::BAD_REQUEST, - ErrorCode::GENERIC_PARAMETER_MALFORMED, - )?; - if params.delta == 0 { - return Err(ServerError::code( - StatusCode::BAD_REQUEST, - ErrorCode::GENERIC_PARAMETER_MALFORMED, - )); - } - Ok(params) -} - -/// Generate sql query filter from history params -fn sql_history_filter(params: &HistoryParams) -> String { - let asc = params.delta > 0; - let limit = params.delta.abs(); - let order_sql = if asc { "ASC" } else { "DESC" }; - let where_sql = if let Some(start) = params.start { - format!("WHERE id {} {}", if asc { '>' } else { '<' }, start) - } else { - String::new() - }; - format!("{} ORDER BY id {} LIMIT {}", where_sql, order_sql, limit) -} - -async fn router( - parts: &Parts, - body: Body, - state: &'static ServerState, -) -> Result<Response<Body>, ServerError> { - // Check status error - if !state.status.load(Ordering::SeqCst) { - return Ok(Response::builder() - .status(StatusCode::BAD_GATEWAY) - .body(Body::empty()) - .unwrap()); - } - - // Check auth method - match &state.auth { - AuthMethod::Basic(auth) => { - if !matches!(parts.headers - .get(hyper::header::AUTHORIZATION) - .and_then(|h| h.to_str().ok()) - .and_then(|s| s.strip_prefix("Basic ")), - Some(token) if token == auth ) - { - return Ok(Response::builder() - .status(StatusCode::UNAUTHORIZED) - .body(Body::empty()) - .unwrap()); - } - } - AuthMethod::None => {} - } - - let response = match parts.uri.path() { - "/transfer" => { - assert_method(parts, Method::POST)?; - let request: TransferRequest = parse_body(parts, body).await.catch_code( - StatusCode::BAD_REQUEST, - ErrorCode::GENERIC_PARAMETER_MALFORMED, - )?; - if !check_payto(&request.credit_account, state.currency) { - return Err(ServerError::code( - StatusCode::BAD_REQUEST, - ErrorCode::GENERIC_PAYTO_URI_MALFORMED, - )); - } - if Currency::from_str(&request.amount.currency) != Ok(state.currency) { - return Err(ServerError::code( - StatusCode::BAD_REQUEST, - ErrorCode::GENERIC_PARAMETER_MALFORMED, - )); - } - let mut db = state.pool.get().await.catch_code( - StatusCode::GATEWAY_TIMEOUT, - ErrorCode::GENERIC_DB_FETCH_FAILED, - )?; - // Handle idempotence, check previous transaction with the same request_uid - let row = db.query_opt("SELECT amount, exchange_url, wtid, credit_acc, id, _date FROM tx_out WHERE request_uid = $1", &[&request.request_uid.as_slice()]) - .await?; - if let Some(row) = row { - let prev = TransferRequest { - request_uid: request.request_uid.clone(), - amount: sql_amount(&row, 0), - exchange_base_url: sql_url(&row, 1), - wtid: ShortHashCode::from(sql_array(&row, 2)), - credit_account: sql_url(&row, 3), - }; - if prev == request { - // Idempotence - return encode_body( - parts, - StatusCode::OK, - &TransferResponse { - timestamp: Timestamp::Time(row.get(5)), - row_id: sql_safe_u64(&row, 4), - }, - ) - .await - .unexpected(); - } else { - return Err(ServerError::status(StatusCode::CONFLICT)); - } - } - - let timestamp = Timestamp::now(); - let tx = db.transaction().await?; - let row = tx.query_one("INSERT INTO tx_out (amount, wtid, debit_acc, credit_acc, exchange_url, request_uid) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id", &[ - &request.amount.to_string(), &request.wtid.as_slice(), &state.payto.as_ref(), &request.credit_account.as_ref(), &request.exchange_base_url.as_ref(), &request.request_uid.as_slice() - ]).await?; - tx.execute("NOTIFY new_tx", &[]).await?; - tx.commit().await?; - encode_body( - parts, - StatusCode::OK, - &TransferResponse { - timestamp, - row_id: sql_safe_u64(&row, 0), - }, - ) - .await - .unexpected()? - } - "/history/incoming" => { - assert_method(parts, Method::GET)?; - let params = history_params(parts)?; - let filter = sql_history_filter(&params); - let db = state.pool.get().await.catch_code( - StatusCode::GATEWAY_TIMEOUT, - ErrorCode::GENERIC_DB_FETCH_FAILED, - )?; - let transactions = db - .query( - &format!("SELECT id, _date, amount, reserve_pub, debit_acc, credit_acc FROM tx_in {}", filter), - &[], - ) - .await.catch_code( - StatusCode::BAD_GATEWAY, - ErrorCode::GENERIC_DB_FETCH_FAILED, - )? - .into_iter() - .map(|row| IncomingBankTransaction::IncomingReserveTransaction { - row_id: sql_safe_u64(&row, 0), - date: Timestamp::Time(row.get(1)), - amount: sql_amount(&row, 2), - reserve_pub: ShortHashCode::from(sql_array(&row, 3)), - debit_account: sql_url(&row, 4), - credit_account: sql_url(&row, 5), - }) - .collect(); - encode_body( - parts, - StatusCode::OK, - &IncomingHistory { - incoming_transactions: transactions, - }, - ) - .await - .unexpected()? - } - "/history/outgoing" => { - assert_method(parts, Method::GET)?; - let params = history_params(parts)?; - let filter = sql_history_filter(&params); - - let db = state.pool.get().await.catch_code( - StatusCode::GATEWAY_TIMEOUT, - ErrorCode::GENERIC_DB_FETCH_FAILED, - )?; - let transactions = db - .query( - &format!("SELECT id, _date, amount, wtid, debit_acc, credit_acc, exchange_url FROM tx_out {}",filter), - &[], - ) - .await.catch_code( - StatusCode::BAD_GATEWAY, - ErrorCode::GENERIC_DB_FETCH_FAILED, - )? - .into_iter() - .map(|row| OutgoingBankTransaction { - row_id: sql_safe_u64(&row, 0), - date: Timestamp::Time(row.get(1)), - amount: sql_amount(&row, 2), - wtid: ShortHashCode::from(sql_array(&row, 3)), - debit_account: sql_url(&row, 4), - credit_account: sql_url(&row, 5), - exchange_base_url:sql_url(&row, 6), - }) - .collect(); - encode_body( - parts, - StatusCode::OK, - &OutgoingHistory { - outgoing_transactions: transactions, - }, - ) - .await - .unexpected()? - } - #[cfg(feature = "test")] - "/admin/add-incoming" => { - // We do not check input as this is a test admin endpoint - assert_method(&parts, Method::POST).unwrap(); - let request: common::api_wire::AddIncomingRequest = - parse_body(&parts, body).await.unwrap(); - let timestamp = Timestamp::now(); - let db = state.pool.get().await.catch_code( - StatusCode::GATEWAY_TIMEOUT, - ErrorCode::GENERIC_DB_FETCH_FAILED, - )?; - let row = db.query_one("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES (now(), $1, $2, $3, $4) RETURNING id", &[ - &request.amount.to_string(), &request.reserve_pub.as_slice(), &request.debit_account.as_ref(), &"payto://bitcoin/bcrt1qgkgxkjj27g3f7s87mcvjjsghay7gh34cx39prj" - ]).await.catch_code( - StatusCode::BAD_GATEWAY, - ErrorCode::GENERIC_DB_FETCH_FAILED, - )?; - encode_body( - parts, - StatusCode::OK, - &TransferResponse { - timestamp, - row_id: sql_safe_u64(&row, 0), - }, - ) - .await - .unexpected()? - } - _ => { - return Err(ServerError::code( - StatusCode::NOT_FOUND, - ErrorCode::GENERIC_ENDPOINT_UNKNOWN, - )) - } - }; - Ok(response) -} - -/// Listen to backend status change -fn status_watcher(state: &'static ServerState) { - fn inner(state: &'static ServerState) -> Result<(), Box<dyn std::error::Error>> { - let mut db = state.db_config.connect(NoTls)?; - // Register as listener - db.batch_execute("LISTEN status")?; - loop { - // Sync state - let row = db.query_one("SELECT value FROM state WHERE name = 'status'", &[])?; - let status: &[u8] = row.get(0); - assert!(status.len() == 1 && status[0] < 2); - state.status.store(status[0] == 1, Ordering::SeqCst); - // Wait for next notification - db.notifications().blocking_iter().next()?; - } - } - - loop { - if let Err(err) = inner(state) { - error!("status-watcher: {}", err); - std::thread::sleep(Duration::from_secs(5)); - } - } -} +/* + This file is part of TALER + Copyright (C) 2022 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ +use clap::Parser; +use common::{ + api_common::{ShortHashCode, Timestamp}, + api_wire::{ + HistoryParams, IncomingBankTransaction, IncomingHistory, OutgoingBankTransaction, + OutgoingHistory, TransferRequest, TransferResponse, WireConfig, + }, + config::{AuthMethod, TalerConfig}, + currency::Currency, + error_codes::ErrorCode, + log::{ + fail, + log::{error, info, log, Level}, + }, + postgres::{self, fallible_iterator::FallibleIterator}, + sql::{sql_amount, sql_array, sql_safe_u64, sql_url}, + url::Url, +}; +use deadpool_postgres::{Pool, Runtime}; +use error::{CatchResult, ServerError}; +use hyper::{ + http::request::Parts, + service::{make_service_fn, service_fn}, + Body, Method, Response, Server, StatusCode, +}; +use json::{encode_body, parse_body}; +use listenfd::ListenFd; +use std::{ + convert::Infallible, + path::PathBuf, + str::FromStr, + sync::atomic::{AtomicBool, AtomicU32, Ordering}, + time::{Duration, Instant}, +}; +use tokio::sync::Notify; +use tokio_postgres::{config::Host, NoTls}; + +mod error; +mod json; + +struct ServerState { + pool: Pool, + db_config: postgres::Config, + payto: Url, + currency: Currency, + notify: Notify, + lifetime: Option<AtomicU32>, + status: AtomicBool, + auth: AuthMethod, +} + +impl ServerState { + /// Decrease lifetime, triggering graceful shutdown when reach lifetime end + pub fn step(&self) { + if let Some(lifetime) = &self.lifetime { + let mut current = lifetime.load(Ordering::Relaxed); + loop { + if current == 0 { + self.notify.notify_one(); + } + match lifetime.compare_exchange_weak( + current, + current - 1, + Ordering::SeqCst, + Ordering::Relaxed, + ) { + Ok(_) => break, + Err(new) => current = new, + } + } + } + } + + pub async fn shutdown_signal(&self) { + self.notify.notified().await; + info!("Reach end of lifetime"); + } +} + +/// Taler wire gateway server for depolymerizer +#[derive(clap::Parser, Debug)] +struct Args { + /// Override default configuration file path + #[clap(global = true, short, long)] + config: Option<PathBuf>, +} + +#[tokio::main] +async fn main() { + common::log::init(); + let args = Args::parse(); + let taler_config = TalerConfig::load(args.config.as_deref()); + + #[cfg(feature = "test")] + common::log::log::warn!("Running with test admin endpoint unsuitable for production"); + + // Parse postgres url + let db_config = taler_config.db_config(); + // TODO find a way to clean this ugly mess + let mut cfg = deadpool_postgres::Config::new(); + cfg.user = db_config.get_user().map(|it| it.to_string()); + cfg.password = db_config + .get_password() + .map(|it| String::from_utf8(it.to_vec()).unwrap()); + cfg.dbname = db_config.get_dbname().map(|it| it.to_string()); + cfg.options = db_config.get_options().map(|it| it.to_string()); + cfg.host = Some( + db_config + .get_hosts() + .iter() + .map(|it| match it { + Host::Tcp(it) => it.to_string(), + #[cfg(unix)] + Host::Unix(it) => it.to_str().unwrap().to_string(), + }) + .collect(), + ); + cfg.ports = Some(db_config.get_ports().to_vec()); + cfg.application_name = db_config.get_application_name().map(|it| it.to_string()); + cfg.connect_timeout = db_config.get_connect_timeout().cloned(); + + let pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls).unwrap(); + let payto = taler_config.payto(); + let state = ServerState { + pool, + notify: Notify::new(), + lifetime: taler_config.http_lifetime().map(AtomicU32::new), + status: AtomicBool::new(true), + db_config, + payto, + currency: taler_config.currency, + auth: taler_config.auth_method(), + }; + let state: &'static ServerState = Box::leak(Box::new(state)); + std::thread::spawn(move || status_watcher(state)); + let service = service_fn(move |req| async move { + state.step(); + let start = Instant::now(); + let (parts, body) = req.into_parts(); + let (response, msg) = match router(&parts, body, state).await { + Ok(resp) => (resp, String::new()), + Err(err) => err.response(), + }; + let status = response.status().as_u16(); + let level = if status >= 500 { + Level::Error + } else if status >= 400 { + Level::Warn + } else { + Level::Info + }; + log!( + level, + "{} {} {}ms {} - {}", + parts.method, + status, + start.elapsed().as_millis(), + parts.uri.path(), + msg + ); + Ok::<Response<Body>, Infallible>(response) + }); + let make_service = make_service_fn(move |_| async move { Ok::<_, Infallible>(service) }); + let make_service_unix = make_service_fn(move |_| async move { Ok::<_, Infallible>(service) }); + + let mut listenfd = ListenFd::from_env(); + + if let Some(listener) = listenfd.take_tcp_listener(0).unwrap() { + info!( + "Server listening on activated socket {}", + listener.local_addr().unwrap() + ); + let server = Server::from_tcp(listener) + .unwrap() + .serve(make_service) + .with_graceful_shutdown(state.shutdown_signal()); + if let Err(e) = server.await { + error!("server: {}", e); + } + } else if let Some(path) = taler_config.unix_path() { + use hyperlocal::UnixServerExt; + info!("Server listening on unix domain socket {:?}", path); + if let Err(err) = std::fs::remove_file(&path) { + if err.kind() != std::io::ErrorKind::NotFound { + fail(format_args!("{}", err)); + } + } + let server = Server::bind_unix(path) + .unwrap() + .serve(make_service_unix) + .with_graceful_shutdown(state.shutdown_signal()); + if let Err(e) = server.await { + error!("server: {}", e); + } + } else { + let addr = ([0, 0, 0, 0], taler_config.port()).into(); + info!("Server listening on http://{}", &addr); + let server = Server::bind(&addr) + .serve(make_service) + .with_graceful_shutdown(state.shutdown_signal()); + if let Err(e) = server.await { + error!("server: {}", e); + } + }; + info!("wire-gateway stopped"); +} + +/// Check if an url if a valid payto url for the configured currency +fn check_payto(url: &Url, currency: Currency) -> bool { + match currency { + Currency::ETH(_) => check_pay_to_eth(url), + Currency::BTC(_) => check_pay_to_btc(url), + } +} + +/// Check if an url is a valid bitcoin payto url +fn check_pay_to_btc(url: &Url) -> bool { + return url.domain() == Some("bitcoin") + && url.scheme() == "payto" + && url.username() == "" + && url.password().is_none() + && url.query().is_none() + && url.fragment().is_none() + && bitcoin::Address::from_str(url.path().trim_start_matches('/')).is_ok(); +} + +/// Check if an url is a valid ethereum payto url +fn check_pay_to_eth(url: &Url) -> bool { + return url.domain() == Some("ethereum") + && url.scheme() == "payto" + && url.username() == "" + && url.password().is_none() + && url.query().is_none() + && url.fragment().is_none() + && ethereum_types::H160::from_str(url.path().trim_start_matches('/')).is_ok(); +} + +/// Assert request method match expected +fn assert_method(parts: &Parts, method: Method) -> Result<(), ServerError> { + if parts.method == method { + Ok(()) + } else { + Err(ServerError::code( + StatusCode::METHOD_NOT_ALLOWED, + ErrorCode::GENERIC_METHOD_INVALID, + )) + } +} + +/// Parse history params from request +fn history_params(parts: &Parts) -> Result<HistoryParams, ServerError> { + let params: HistoryParams = serde_urlencoded::from_str(parts.uri.query().unwrap_or("")) + .catch_code( + StatusCode::BAD_REQUEST, + ErrorCode::GENERIC_PARAMETER_MALFORMED, + )?; + if params.delta == 0 { + return Err(ServerError::code( + StatusCode::BAD_REQUEST, + ErrorCode::GENERIC_PARAMETER_MALFORMED, + )); + } + Ok(params) +} + +/// Generate sql query filter from history params +fn sql_history_filter(params: &HistoryParams) -> String { + let asc = params.delta > 0; + let limit = params.delta.abs(); + let order_sql = if asc { "ASC" } else { "DESC" }; + let where_sql = if let Some(start) = params.start { + format!("WHERE id {} {}", if asc { '>' } else { '<' }, start) + } else { + String::new() + }; + format!("{} ORDER BY id {} LIMIT {}", where_sql, order_sql, limit) +} + +async fn router( + parts: &Parts, + body: Body, + state: &'static ServerState, +) -> Result<Response<Body>, ServerError> { + // Check status error + if !state.status.load(Ordering::SeqCst) { + return Ok(Response::builder() + .status(StatusCode::BAD_GATEWAY) + .body(Body::empty()) + .unwrap()); + } + + // Check auth method + match &state.auth { + AuthMethod::Basic(auth) => { + if !matches!(parts.headers + .get(hyper::header::AUTHORIZATION) + .and_then(|h| h.to_str().ok()) + .and_then(|s| s.strip_prefix("Basic ")), + Some(token) if token == auth ) + { + return Ok(Response::builder() + .status(StatusCode::UNAUTHORIZED) + .body(Body::empty()) + .unwrap()); + } + } + AuthMethod::None => {} + } + + let response = match parts.uri.path() { + "/config" => { + assert_method(parts, Method::GET)?; + encode_body( + parts, + StatusCode::OK, + &WireConfig { + name: "taler-wire-gateway", + version: "0.0.0", + currency: state.currency.to_str(), + }, + ) + .unexpected()? + } + "/transfer" => { + assert_method(parts, Method::POST)?; + let request: TransferRequest = parse_body(parts, body).await.catch_code( + StatusCode::BAD_REQUEST, + ErrorCode::GENERIC_PARAMETER_MALFORMED, + )?; + if !check_payto(&request.credit_account, state.currency) { + return Err(ServerError::code( + StatusCode::BAD_REQUEST, + ErrorCode::GENERIC_PAYTO_URI_MALFORMED, + )); + } + if Currency::from_str(&request.amount.currency) != Ok(state.currency) { + return Err(ServerError::code( + StatusCode::BAD_REQUEST, + ErrorCode::GENERIC_PARAMETER_MALFORMED, + )); + } + let mut db = state.pool.get().await.catch_code( + StatusCode::GATEWAY_TIMEOUT, + ErrorCode::GENERIC_DB_FETCH_FAILED, + )?; + // Handle idempotence, check previous transaction with the same request_uid + let row = db.query_opt("SELECT amount, exchange_url, wtid, credit_acc, id, _date FROM tx_out WHERE request_uid = $1", &[&request.request_uid.as_slice()]) + .await?; + if let Some(row) = row { + let prev = TransferRequest { + request_uid: request.request_uid.clone(), + amount: sql_amount(&row, 0), + exchange_base_url: sql_url(&row, 1), + wtid: ShortHashCode::from(sql_array(&row, 2)), + credit_account: sql_url(&row, 3), + }; + if prev == request { + // Idempotence + return encode_body( + parts, + StatusCode::OK, + &TransferResponse { + timestamp: Timestamp::Time(row.get(5)), + row_id: sql_safe_u64(&row, 4), + }, + ) + .unexpected(); + } else { + return Err(ServerError::status(StatusCode::CONFLICT)); + } + } + + let timestamp = Timestamp::now(); + let tx = db.transaction().await?; + let row = tx.query_one("INSERT INTO tx_out (amount, wtid, debit_acc, credit_acc, exchange_url, request_uid) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id", &[ + &request.amount.to_string(), &request.wtid.as_slice(), &state.payto.as_ref(), &request.credit_account.as_ref(), &request.exchange_base_url.as_ref(), &request.request_uid.as_slice() + ]).await?; + tx.execute("NOTIFY new_tx", &[]).await?; + tx.commit().await?; + encode_body( + parts, + StatusCode::OK, + &TransferResponse { + timestamp, + row_id: sql_safe_u64(&row, 0), + }, + ) + .unexpected()? + } + "/history/incoming" => { + assert_method(parts, Method::GET)?; + let params = history_params(parts)?; + let filter = sql_history_filter(&params); + let db = state.pool.get().await.catch_code( + StatusCode::GATEWAY_TIMEOUT, + ErrorCode::GENERIC_DB_FETCH_FAILED, + )?; + let transactions: Vec<_> = db + .query( + &format!("SELECT id, _date, amount, reserve_pub, debit_acc FROM tx_in {}", filter), + &[], + ) + .await.catch_code( + StatusCode::BAD_GATEWAY, + ErrorCode::GENERIC_DB_FETCH_FAILED, + )? + .into_iter() + .map(| row| { + IncomingBankTransaction::IncomingReserveTransaction { + row_id: sql_safe_u64(&row, 0), + date: Timestamp::Time(row.get(1)), + amount: sql_amount(&row, 2), + reserve_pub: ShortHashCode::from(sql_array(&row, 3)), + debit_account: sql_url(&row, 4), + } + }).collect(); + if transactions.is_empty() { + Response::builder() + .status(StatusCode::NO_CONTENT) + .body(Body::empty()) + .unwrap() + } else { + encode_body( + parts, + StatusCode::OK, + &IncomingHistory { + credit_account: state.payto.clone(), + incoming_transactions: transactions, + }, + ) + .unexpected()? + } + } + "/history/outgoing" => { + assert_method(parts, Method::GET)?; + let params = history_params(parts)?; + let filter = sql_history_filter(&params); + + let db = state.pool.get().await.catch_code( + StatusCode::GATEWAY_TIMEOUT, + ErrorCode::GENERIC_DB_FETCH_FAILED, + )?; + let transactions: Vec<_> =db + .query( + &format!("SELECT id, _date, amount, wtid, credit_acc, exchange_url FROM tx_out {}", filter), + &[], + ) + .await.catch_code( + StatusCode::BAD_GATEWAY, + ErrorCode::GENERIC_DB_FETCH_FAILED, + )? + .into_iter() + .map(|row| OutgoingBankTransaction { + row_id: sql_safe_u64(&row, 0), + date: Timestamp::Time(row.get(1)), + amount: sql_amount(&row, 2), + wtid: ShortHashCode::from(sql_array(&row, 3)), + credit_account: sql_url(&row, 4), + exchange_base_url:sql_url(&row, 5), + }).collect(); + if transactions.is_empty() { + Response::builder() + .status(StatusCode::NO_CONTENT) + .body(Body::empty()) + .unwrap() + } else { + encode_body( + parts, + StatusCode::OK, + &OutgoingHistory { + debit_account: state.payto.clone(), + outgoing_transactions: transactions, + }, + ) + .unexpected()? + } + } + #[cfg(feature = "test")] + "/admin/add-incoming" => { + // We do not check input as this is a test admin endpoint + assert_method(&parts, Method::POST).unwrap(); + let request: common::api_wire::AddIncomingRequest = + parse_body(&parts, body).await.unwrap(); + let timestamp = Timestamp::now(); + let db = state.pool.get().await.catch_code( + StatusCode::GATEWAY_TIMEOUT, + ErrorCode::GENERIC_DB_FETCH_FAILED, + )?; + let row = db.query_one("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES (now(), $1, $2, $3, $4) RETURNING id", &[ + &request.amount.to_string(), &request.reserve_pub.as_slice(), &request.debit_account.as_ref(), &"payto://bitcoin/bcrt1qgkgxkjj27g3f7s87mcvjjsghay7gh34cx39prj" + ]).await.catch_code( + StatusCode::BAD_GATEWAY, + ErrorCode::GENERIC_DB_FETCH_FAILED, + )?; + encode_body( + parts, + StatusCode::OK, + &TransferResponse { + timestamp, + row_id: sql_safe_u64(&row, 0), + }, + ) + .unexpected()? + } + _ => { + return Err(ServerError::code( + StatusCode::NOT_FOUND, + ErrorCode::GENERIC_ENDPOINT_UNKNOWN, + )) + } + }; + Ok(response) +} + +/// Listen to backend status change +fn status_watcher(state: &'static ServerState) { + fn inner(state: &'static ServerState) -> Result<(), Box<dyn std::error::Error>> { + let mut db = state.db_config.connect(NoTls)?; + // Register as listener + db.batch_execute("LISTEN status")?; + loop { + // Sync state + let row = db.query_one("SELECT value FROM state WHERE name = 'status'", &[])?; + let status: &[u8] = row.get(0); + assert!(status.len() == 1 && status[0] < 2); + state.status.store(status[0] == 1, Ordering::SeqCst); + // Wait for next notification + db.notifications().blocking_iter().next()?; + } + } + + loop { + if let Err(err) = inner(state) { + error!("status-watcher: {}", err); + std::thread::sleep(Duration::from_secs(5)); + } + } +}