diff options
author | Antoine A <> | 2024-02-01 18:18:49 +0100 |
---|---|---|
committer | Antoine A <> | 2024-02-01 18:18:49 +0100 |
commit | 635e0e1d0ce6773f7bea4b4fed7c22f614ba2829 (patch) | |
tree | 008fb2d6af4efe91302528c7aa76089890e68020 | |
parent | c97f15ab23c310dfb630f78c1513c38803ab0f2a (diff) | |
download | depolymerization-635e0e1d0ce6773f7bea4b4fed7c22f614ba2829.tar.gz depolymerization-635e0e1d0ce6773f7bea4b4fed7c22f614ba2829.tar.bz2 depolymerization-635e0e1d0ce6773f7bea4b4fed7c22f614ba2829.zip |
Update taler wire api to the newest specification
-rw-r--r-- | common/src/api_wire.rs | 199 | ||||
-rw-r--r-- | instrumentation/src/main.rs | 2 | ||||
-rw-r--r-- | instrumentation/src/utils.rs | 50 | ||||
-rw-r--r-- | wire-gateway/src/json.rs | 2 | ||||
-rw-r--r-- | wire-gateway/src/main.rs | 1084 |
5 files changed, 687 insertions, 650 deletions
diff --git a/common/src/api_wire.rs b/common/src/api_wire.rs index 8476069..7f7b477 100644 --- a/common/src/api_wire.rs +++ b/common/src/api_wire.rs @@ -1,98 +1,101 @@ -/*
- This file is part of TALER
- Copyright (C) 2022 Taler Systems SA
-
- TALER is free software; you can redistribute it and/or modify it under the
- terms of the GNU Affero General Public License as published by the Free Software
- Foundation; either version 3, or (at your option) any later version.
-
- TALER is distributed in the hope that it will be useful, but WITHOUT ANY
- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
- A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License along with
- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
-*/
-use url::Url;
-
-use crate::api_common::{Amount, EddsaPublicKey, HashCode, SafeU64, ShortHashCode, Timestamp};
-
-/// <https://docs.taler.net/core/api-wire.html#tsref-type-TransferResponse>
-#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
-pub struct TransferResponse {
- pub timestamp: Timestamp,
- pub row_id: SafeU64,
-}
-
-/// <https://docs.taler.net/core/api-wire.html#tsref-type-TransferRequest>
-#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
-pub struct TransferRequest {
- pub request_uid: HashCode,
- pub amount: Amount,
- pub exchange_base_url: Url,
- pub wtid: ShortHashCode,
- pub credit_account: Url,
-}
-
-/// <https://docs.taler.net/core/api-wire.html#tsref-type-OutgoingHistory>
-#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
-pub struct OutgoingHistory {
- pub outgoing_transactions: Vec<OutgoingBankTransaction>,
-}
-
-/// <https://docs.taler.net/core/api-wire.html#tsref-type-OutgoingBankTransaction>
-#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
-pub struct OutgoingBankTransaction {
- pub row_id: SafeU64,
- pub date: Timestamp,
- pub amount: Amount,
- pub credit_account: Url,
- pub debit_account: Url,
- pub wtid: ShortHashCode,
- pub exchange_base_url: Url,
-}
-
-#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
-pub struct IncomingHistory {
- pub incoming_transactions: Vec<IncomingBankTransaction>,
-}
-#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
-#[serde(tag = "type")]
-pub enum IncomingBankTransaction {
- #[serde(rename = "RESERVE")]
- IncomingReserveTransaction {
- row_id: SafeU64,
- date: Timestamp,
- amount: Amount,
- credit_account: Url,
- debit_account: Url,
- reserve_pub: EddsaPublicKey,
- },
- #[serde(rename = "WAD")]
- IncomingWadTransaction {
- // TODO not yet supported
- },
-}
-
-/// <https://docs.taler.net/core/api-wire.html#tsref-type-AddIncomingRequest>
-#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
-pub struct AddIncomingRequest {
- pub amount: Amount,
- pub reserve_pub: EddsaPublicKey,
- pub debit_account: Url,
-}
-
-/// <https://docs.taler.net/core/api-wire.html#tsref-type-AddIncomingResponse>
-#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
-pub struct AddIncomingResponse {
- pub row_id: SafeU64,
- pub timestamp: Timestamp,
-}
-
-/// <https://docs.taler.net/core/api-wire.html#querying-the-transaction-history>
-#[derive(Debug, Clone, serde::Deserialize)]
-pub struct HistoryParams {
- pub start: Option<u64>,
- pub delta: i64,
- pub long_pool_ms: Option<u64>,
-}
+/* + This file is part of TALER + Copyright (C) 2022 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +/// Type for the Taler Wire Gateway HTTP API <https://docs.taler.net/core/api-bank-wire.html#taler-wire-gateway-http-api> +use url::Url; + +use crate::api_common::{Amount, EddsaPublicKey, HashCode, SafeU64, ShortHashCode, Timestamp}; + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct WireConfig { + pub name: &'static str, + pub version: &'static str, + pub currency: &'static str, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct TransferResponse { + pub timestamp: Timestamp, + pub row_id: SafeU64, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)] +pub struct TransferRequest { + pub request_uid: HashCode, + pub amount: Amount, + pub exchange_base_url: Url, + pub wtid: ShortHashCode, + pub credit_account: Url, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct OutgoingHistory { + pub outgoing_transactions: Vec<OutgoingBankTransaction>, + pub debit_account: Url, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct OutgoingBankTransaction { + pub row_id: SafeU64, + pub date: Timestamp, + pub amount: Amount, + pub credit_account: Url, + pub wtid: ShortHashCode, + pub exchange_base_url: Url, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct IncomingHistory { + pub credit_account: Url, + pub incoming_transactions: Vec<IncomingBankTransaction>, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +#[serde(tag = "type")] +pub enum IncomingBankTransaction { + #[serde(rename = "RESERVE")] + IncomingReserveTransaction { + row_id: SafeU64, + date: Timestamp, + amount: Amount, + debit_account: Url, + reserve_pub: EddsaPublicKey, + }, + #[serde(rename = "WAD")] + IncomingWadTransaction { + // TODO not yet supported + }, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct AddIncomingRequest { + pub amount: Amount, + pub reserve_pub: EddsaPublicKey, + pub debit_account: Url, +} + +#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] +pub struct AddIncomingResponse { + pub row_id: SafeU64, + pub timestamp: Timestamp, +} + +#[derive(Debug, Clone, serde::Deserialize)] +pub struct HistoryParams { + pub start: Option<u64>, + pub delta: i64, + pub long_pool_ms: Option<u64>, +} diff --git a/instrumentation/src/main.rs b/instrumentation/src/main.rs index b787d5f..fd3e46f 100644 --- a/instrumentation/src/main.rs +++ b/instrumentation/src/main.rs @@ -152,7 +152,7 @@ pub fn main() { let len = results.len(); m.clear().unwrap(); for ((result, _, out, msg), name) in &results { - if let Err(_) = result { + if result.is_err() { println!("{} {}\n{}", name.magenta(), msg.red(), out.bright_black()); } } diff --git a/instrumentation/src/utils.rs b/instrumentation/src/utils.rs index 5509754..baf3686 100644 --- a/instrumentation/src/utils.rs +++ b/instrumentation/src/utils.rs @@ -44,16 +44,21 @@ pub fn print_now(disp: impl Display) { #[must_use] pub fn check_incoming(base_url: &str, txs: &[([u8; 32], Amount)]) -> bool { - let history: IncomingHistory = ureq::get(&format!("{}history/incoming", base_url)) + let res = ureq::get(&format!("{}history/incoming", base_url)) .query("delta", &format!("-{}", txs.len())) .call() - .unwrap() - .into_json() .unwrap(); + if txs.is_empty() { + res.status() == 204 + } else { + if res.status() != 200 { + return false; + } + let history: IncomingHistory = res.into_json().unwrap(); - history.incoming_transactions.len() == txs.len() - && txs.iter().all(|(reserve_pub_key, taler_amount)| { - history.incoming_transactions.iter().any(|h| { + history.incoming_transactions.len() == txs.len() + && txs.iter().all(|(reserve_pub_key, taler_amount)| { + history.incoming_transactions.iter().any(|h| { matches!( h, IncomingBankTransaction::IncomingReserveTransaction { @@ -63,7 +68,8 @@ pub fn check_incoming(base_url: &str, txs: &[([u8; 32], Amount)]) -> bool { } if reserve_pub == &Base32::from(*reserve_pub_key) && amount == taler_amount ) }) - }) + }) + } } pub fn gateway_error(path: &str, error: u16) { @@ -96,10 +102,7 @@ pub fn check_gateway_down(base_url: &str) -> bool { #[must_use] pub fn check_gateway_up(base_url: &str) -> bool { - ureq::get(&format!("{}history/incoming", base_url)) - .query("delta", "-5") - .call() - .is_ok() + ureq::get(&format!("{}config", base_url)).call().is_ok() } pub fn transfer(base_url: &str, wtid: &[u8; 32], url: &Url, credit_account: Url, amount: &Amount) { @@ -116,18 +119,27 @@ pub fn transfer(base_url: &str, wtid: &[u8; 32], url: &Url, credit_account: Url, #[must_use] pub fn check_outgoing(base_url: &str, url: &Url, txs: &[([u8; 32], Amount)]) -> bool { - let history: OutgoingHistory = ureq::get(&format!("{}history/outgoing", base_url)) + let res = ureq::get(&format!("{}history/outgoing", base_url)) .query("delta", &format!("-{}", txs.len())) .call() - .unwrap() - .into_json() .unwrap(); - history.outgoing_transactions.len() == txs.len() - && txs.iter().all(|(wtid, amount)| { - history.outgoing_transactions.iter().any(|h| { - h.wtid == Base32::from(*wtid) && &h.exchange_base_url == url && &h.amount == amount + if txs.is_empty() { + res.status() == 204 + } else { + if res.status() != 200 { + return false; + } + let history: OutgoingHistory = res.into_json().unwrap(); + + history.outgoing_transactions.len() == txs.len() + && txs.iter().all(|(wtid, amount)| { + history.outgoing_transactions.iter().any(|h| { + h.wtid == Base32::from(*wtid) + && &h.exchange_base_url == url + && &h.amount == amount + }) }) - }) + } } pub struct ChildGuard(pub Child); diff --git a/wire-gateway/src/json.rs b/wire-gateway/src/json.rs index da0c3c3..96a55b8 100644 --- a/wire-gateway/src/json.rs +++ b/wire-gateway/src/json.rs @@ -73,7 +73,7 @@ pub enum EncodeBodyError { Json(#[from] serde_json::Error),
}
-pub async fn encode_body<J: serde::Serialize>(
+pub fn encode_body<J: serde::Serialize>(
parts: &Parts,
status: StatusCode,
json: &J,
diff --git a/wire-gateway/src/main.rs b/wire-gateway/src/main.rs index 7024d9b..98c7589 100644 --- a/wire-gateway/src/main.rs +++ b/wire-gateway/src/main.rs @@ -1,531 +1,553 @@ -/*
- This file is part of TALER
- Copyright (C) 2022 Taler Systems SA
-
- TALER is free software; you can redistribute it and/or modify it under the
- terms of the GNU Affero General Public License as published by the Free Software
- Foundation; either version 3, or (at your option) any later version.
-
- TALER is distributed in the hope that it will be useful, but WITHOUT ANY
- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
- A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License along with
- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
-*/
-use clap::Parser;
-use common::{
- api_common::{ShortHashCode, Timestamp},
- api_wire::{
- HistoryParams, IncomingBankTransaction, IncomingHistory, OutgoingBankTransaction,
- OutgoingHistory, TransferRequest, TransferResponse,
- },
- config::{AuthMethod, TalerConfig},
- currency::Currency,
- error_codes::ErrorCode,
- log::{
- fail,
- log::{error, info, log, Level},
- },
- postgres::{self, fallible_iterator::FallibleIterator},
- sql::{sql_amount, sql_array, sql_safe_u64, sql_url},
- url::Url,
-};
-use deadpool_postgres::{Pool, Runtime};
-use error::{CatchResult, ServerError};
-use hyper::{
- http::request::Parts,
- service::{make_service_fn, service_fn},
- Body, Method, Response, Server, StatusCode,
-};
-use json::{encode_body, parse_body};
-use listenfd::ListenFd;
-use std::{
- convert::Infallible,
- path::PathBuf,
- str::FromStr,
- sync::atomic::{AtomicBool, AtomicU32, Ordering},
- time::{Duration, Instant},
-};
-use tokio::sync::Notify;
-use tokio_postgres::{config::Host, NoTls};
-
-mod error;
-mod json;
-
-struct ServerState {
- pool: Pool,
- db_config: postgres::Config,
- payto: Url,
- currency: Currency,
- notify: Notify,
- lifetime: Option<AtomicU32>,
- status: AtomicBool,
- auth: AuthMethod,
-}
-
-impl ServerState {
- /// Decrease lifetime, triggering graceful shutdown when reach lifetime end
- pub fn step(&self) {
- if let Some(lifetime) = &self.lifetime {
- let mut current = lifetime.load(Ordering::Relaxed);
- loop {
- if current == 0 {
- self.notify.notify_one();
- }
- match lifetime.compare_exchange_weak(
- current,
- current - 1,
- Ordering::SeqCst,
- Ordering::Relaxed,
- ) {
- Ok(_) => break,
- Err(new) => current = new,
- }
- }
- }
- }
-
- pub async fn shutdown_signal(&self) {
- self.notify.notified().await;
- info!("Reach end of lifetime");
- }
-}
-
-/// Taler wire gateway server for depolymerizer
-#[derive(clap::Parser, Debug)]
-struct Args {
- /// Override default configuration file path
- #[clap(global = true, short, long)]
- config: Option<PathBuf>,
-}
-
-#[tokio::main]
-async fn main() {
- common::log::init();
- let args = Args::parse();
- let taler_config = TalerConfig::load(args.config.as_deref());
-
- #[cfg(feature = "test")]
- common::log::log::warn!("Running with test admin endpoint unsuitable for production");
-
- // Parse postgres url
- let db_config = taler_config.db_config();
- // TODO find a way to clean this ugly mess
- let mut cfg = deadpool_postgres::Config::new();
- cfg.user = db_config.get_user().map(|it| it.to_string());
- cfg.password = db_config
- .get_password()
- .map(|it| String::from_utf8(it.to_vec()).unwrap());
- cfg.dbname = db_config.get_dbname().map(|it| it.to_string());
- cfg.options = db_config.get_options().map(|it| it.to_string());
- cfg.host = Some(
- db_config
- .get_hosts()
- .iter()
- .map(|it| match it {
- Host::Tcp(it) => it.to_string(),
- #[cfg(unix)]
- Host::Unix(it) => it.to_str().unwrap().to_string(),
- })
- .collect(),
- );
- cfg.ports = Some(db_config.get_ports().to_vec());
- cfg.application_name = db_config.get_application_name().map(|it| it.to_string());
- cfg.connect_timeout = db_config.get_connect_timeout().cloned();
-
- let pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls).unwrap();
- let payto = taler_config.payto();
- let state = ServerState {
- pool,
- notify: Notify::new(),
- lifetime: taler_config.http_lifetime().map(AtomicU32::new),
- status: AtomicBool::new(true),
- db_config,
- payto,
- currency: taler_config.currency,
- auth: taler_config.auth_method(),
- };
- let state: &'static ServerState = Box::leak(Box::new(state));
- std::thread::spawn(move || status_watcher(state));
- let service = service_fn(move |req| async move {
- state.step();
- let start = Instant::now();
- let (parts, body) = req.into_parts();
- let (response, msg) = match router(&parts, body, state).await {
- Ok(resp) => (resp, String::new()),
- Err(err) => err.response(),
- };
- let status = response.status().as_u16();
- let level = if status >= 500 {
- Level::Error
- } else if status >= 400 {
- Level::Warn
- } else {
- Level::Info
- };
- log!(
- level,
- "{} {} {}ms {} - {}",
- parts.method,
- status,
- start.elapsed().as_millis(),
- parts.uri.path(),
- msg
- );
- Ok::<Response<Body>, Infallible>(response)
- });
- let make_service = make_service_fn(move |_| async move { Ok::<_, Infallible>(service) });
- let make_service_unix = make_service_fn(move |_| async move { Ok::<_, Infallible>(service) });
-
- let mut listenfd = ListenFd::from_env();
-
- if let Some(listener) = listenfd.take_tcp_listener(0).unwrap() {
- info!(
- "Server listening on activated socket {}",
- listener.local_addr().unwrap()
- );
- let server = Server::from_tcp(listener)
- .unwrap()
- .serve(make_service)
- .with_graceful_shutdown(state.shutdown_signal());
- if let Err(e) = server.await {
- error!("server: {}", e);
- }
- } else if let Some(path) = taler_config.unix_path() {
- use hyperlocal::UnixServerExt;
- info!("Server listening on unix domain socket {:?}", path);
- if let Err(err) = std::fs::remove_file(&path) {
- if err.kind() != std::io::ErrorKind::NotFound {
- fail(format_args!("{}", err));
- }
- }
- let server = Server::bind_unix(path)
- .unwrap()
- .serve(make_service_unix)
- .with_graceful_shutdown(state.shutdown_signal());
- if let Err(e) = server.await {
- error!("server: {}", e);
- }
- } else {
- let addr = ([0, 0, 0, 0], taler_config.port()).into();
- info!("Server listening on http://{}", &addr);
- let server = Server::bind(&addr)
- .serve(make_service)
- .with_graceful_shutdown(state.shutdown_signal());
- if let Err(e) = server.await {
- error!("server: {}", e);
- }
- };
- info!("wire-gateway stopped");
-}
-
-/// Check if an url if a valid payto url for the configured currency
-fn check_payto(url: &Url, currency: Currency) -> bool {
- match currency {
- Currency::ETH(_) => check_pay_to_eth(url),
- Currency::BTC(_) => check_pay_to_btc(url),
- }
-}
-
-/// Check if an url is a valid bitcoin payto url
-fn check_pay_to_btc(url: &Url) -> bool {
- return url.domain() == Some("bitcoin")
- && url.scheme() == "payto"
- && url.username() == ""
- && url.password().is_none()
- && url.query().is_none()
- && url.fragment().is_none()
- && bitcoin::Address::from_str(url.path().trim_start_matches('/')).is_ok();
-}
-
-/// Check if an url is a valid ethereum payto url
-fn check_pay_to_eth(url: &Url) -> bool {
- return url.domain() == Some("ethereum")
- && url.scheme() == "payto"
- && url.username() == ""
- && url.password().is_none()
- && url.query().is_none()
- && url.fragment().is_none()
- && ethereum_types::H160::from_str(url.path().trim_start_matches('/')).is_ok();
-}
-
-/// Assert request method match expected
-fn assert_method(parts: &Parts, method: Method) -> Result<(), ServerError> {
- if parts.method == method {
- Ok(())
- } else {
- Err(ServerError::code(
- StatusCode::METHOD_NOT_ALLOWED,
- ErrorCode::GENERIC_METHOD_INVALID,
- ))
- }
-}
-
-/// Parse history params from request
-fn history_params(parts: &Parts) -> Result<HistoryParams, ServerError> {
- let params: HistoryParams = serde_urlencoded::from_str(parts.uri.query().unwrap_or(""))
- .catch_code(
- StatusCode::BAD_REQUEST,
- ErrorCode::GENERIC_PARAMETER_MALFORMED,
- )?;
- if params.delta == 0 {
- return Err(ServerError::code(
- StatusCode::BAD_REQUEST,
- ErrorCode::GENERIC_PARAMETER_MALFORMED,
- ));
- }
- Ok(params)
-}
-
-/// Generate sql query filter from history params
-fn sql_history_filter(params: &HistoryParams) -> String {
- let asc = params.delta > 0;
- let limit = params.delta.abs();
- let order_sql = if asc { "ASC" } else { "DESC" };
- let where_sql = if let Some(start) = params.start {
- format!("WHERE id {} {}", if asc { '>' } else { '<' }, start)
- } else {
- String::new()
- };
- format!("{} ORDER BY id {} LIMIT {}", where_sql, order_sql, limit)
-}
-
-async fn router(
- parts: &Parts,
- body: Body,
- state: &'static ServerState,
-) -> Result<Response<Body>, ServerError> {
- // Check status error
- if !state.status.load(Ordering::SeqCst) {
- return Ok(Response::builder()
- .status(StatusCode::BAD_GATEWAY)
- .body(Body::empty())
- .unwrap());
- }
-
- // Check auth method
- match &state.auth {
- AuthMethod::Basic(auth) => {
- if !matches!(parts.headers
- .get(hyper::header::AUTHORIZATION)
- .and_then(|h| h.to_str().ok())
- .and_then(|s| s.strip_prefix("Basic ")),
- Some(token) if token == auth )
- {
- return Ok(Response::builder()
- .status(StatusCode::UNAUTHORIZED)
- .body(Body::empty())
- .unwrap());
- }
- }
- AuthMethod::None => {}
- }
-
- let response = match parts.uri.path() {
- "/transfer" => {
- assert_method(parts, Method::POST)?;
- let request: TransferRequest = parse_body(parts, body).await.catch_code(
- StatusCode::BAD_REQUEST,
- ErrorCode::GENERIC_PARAMETER_MALFORMED,
- )?;
- if !check_payto(&request.credit_account, state.currency) {
- return Err(ServerError::code(
- StatusCode::BAD_REQUEST,
- ErrorCode::GENERIC_PAYTO_URI_MALFORMED,
- ));
- }
- if Currency::from_str(&request.amount.currency) != Ok(state.currency) {
- return Err(ServerError::code(
- StatusCode::BAD_REQUEST,
- ErrorCode::GENERIC_PARAMETER_MALFORMED,
- ));
- }
- let mut db = state.pool.get().await.catch_code(
- StatusCode::GATEWAY_TIMEOUT,
- ErrorCode::GENERIC_DB_FETCH_FAILED,
- )?;
- // Handle idempotence, check previous transaction with the same request_uid
- let row = db.query_opt("SELECT amount, exchange_url, wtid, credit_acc, id, _date FROM tx_out WHERE request_uid = $1", &[&request.request_uid.as_slice()])
- .await?;
- if let Some(row) = row {
- let prev = TransferRequest {
- request_uid: request.request_uid.clone(),
- amount: sql_amount(&row, 0),
- exchange_base_url: sql_url(&row, 1),
- wtid: ShortHashCode::from(sql_array(&row, 2)),
- credit_account: sql_url(&row, 3),
- };
- if prev == request {
- // Idempotence
- return encode_body(
- parts,
- StatusCode::OK,
- &TransferResponse {
- timestamp: Timestamp::Time(row.get(5)),
- row_id: sql_safe_u64(&row, 4),
- },
- )
- .await
- .unexpected();
- } else {
- return Err(ServerError::status(StatusCode::CONFLICT));
- }
- }
-
- let timestamp = Timestamp::now();
- let tx = db.transaction().await?;
- let row = tx.query_one("INSERT INTO tx_out (amount, wtid, debit_acc, credit_acc, exchange_url, request_uid) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id", &[
- &request.amount.to_string(), &request.wtid.as_slice(), &state.payto.as_ref(), &request.credit_account.as_ref(), &request.exchange_base_url.as_ref(), &request.request_uid.as_slice()
- ]).await?;
- tx.execute("NOTIFY new_tx", &[]).await?;
- tx.commit().await?;
- encode_body(
- parts,
- StatusCode::OK,
- &TransferResponse {
- timestamp,
- row_id: sql_safe_u64(&row, 0),
- },
- )
- .await
- .unexpected()?
- }
- "/history/incoming" => {
- assert_method(parts, Method::GET)?;
- let params = history_params(parts)?;
- let filter = sql_history_filter(¶ms);
- let db = state.pool.get().await.catch_code(
- StatusCode::GATEWAY_TIMEOUT,
- ErrorCode::GENERIC_DB_FETCH_FAILED,
- )?;
- let transactions = db
- .query(
- &format!("SELECT id, _date, amount, reserve_pub, debit_acc, credit_acc FROM tx_in {}", filter),
- &[],
- )
- .await.catch_code(
- StatusCode::BAD_GATEWAY,
- ErrorCode::GENERIC_DB_FETCH_FAILED,
- )?
- .into_iter()
- .map(|row| IncomingBankTransaction::IncomingReserveTransaction {
- row_id: sql_safe_u64(&row, 0),
- date: Timestamp::Time(row.get(1)),
- amount: sql_amount(&row, 2),
- reserve_pub: ShortHashCode::from(sql_array(&row, 3)),
- debit_account: sql_url(&row, 4),
- credit_account: sql_url(&row, 5),
- })
- .collect();
- encode_body(
- parts,
- StatusCode::OK,
- &IncomingHistory {
- incoming_transactions: transactions,
- },
- )
- .await
- .unexpected()?
- }
- "/history/outgoing" => {
- assert_method(parts, Method::GET)?;
- let params = history_params(parts)?;
- let filter = sql_history_filter(¶ms);
-
- let db = state.pool.get().await.catch_code(
- StatusCode::GATEWAY_TIMEOUT,
- ErrorCode::GENERIC_DB_FETCH_FAILED,
- )?;
- let transactions = db
- .query(
- &format!("SELECT id, _date, amount, wtid, debit_acc, credit_acc, exchange_url FROM tx_out {}",filter),
- &[],
- )
- .await.catch_code(
- StatusCode::BAD_GATEWAY,
- ErrorCode::GENERIC_DB_FETCH_FAILED,
- )?
- .into_iter()
- .map(|row| OutgoingBankTransaction {
- row_id: sql_safe_u64(&row, 0),
- date: Timestamp::Time(row.get(1)),
- amount: sql_amount(&row, 2),
- wtid: ShortHashCode::from(sql_array(&row, 3)),
- debit_account: sql_url(&row, 4),
- credit_account: sql_url(&row, 5),
- exchange_base_url:sql_url(&row, 6),
- })
- .collect();
- encode_body(
- parts,
- StatusCode::OK,
- &OutgoingHistory {
- outgoing_transactions: transactions,
- },
- )
- .await
- .unexpected()?
- }
- #[cfg(feature = "test")]
- "/admin/add-incoming" => {
- // We do not check input as this is a test admin endpoint
- assert_method(&parts, Method::POST).unwrap();
- let request: common::api_wire::AddIncomingRequest =
- parse_body(&parts, body).await.unwrap();
- let timestamp = Timestamp::now();
- let db = state.pool.get().await.catch_code(
- StatusCode::GATEWAY_TIMEOUT,
- ErrorCode::GENERIC_DB_FETCH_FAILED,
- )?;
- let row = db.query_one("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES (now(), $1, $2, $3, $4) RETURNING id", &[
- &request.amount.to_string(), &request.reserve_pub.as_slice(), &request.debit_account.as_ref(), &"payto://bitcoin/bcrt1qgkgxkjj27g3f7s87mcvjjsghay7gh34cx39prj"
- ]).await.catch_code(
- StatusCode::BAD_GATEWAY,
- ErrorCode::GENERIC_DB_FETCH_FAILED,
- )?;
- encode_body(
- parts,
- StatusCode::OK,
- &TransferResponse {
- timestamp,
- row_id: sql_safe_u64(&row, 0),
- },
- )
- .await
- .unexpected()?
- }
- _ => {
- return Err(ServerError::code(
- StatusCode::NOT_FOUND,
- ErrorCode::GENERIC_ENDPOINT_UNKNOWN,
- ))
- }
- };
- Ok(response)
-}
-
-/// Listen to backend status change
-fn status_watcher(state: &'static ServerState) {
- fn inner(state: &'static ServerState) -> Result<(), Box<dyn std::error::Error>> {
- let mut db = state.db_config.connect(NoTls)?;
- // Register as listener
- db.batch_execute("LISTEN status")?;
- loop {
- // Sync state
- let row = db.query_one("SELECT value FROM state WHERE name = 'status'", &[])?;
- let status: &[u8] = row.get(0);
- assert!(status.len() == 1 && status[0] < 2);
- state.status.store(status[0] == 1, Ordering::SeqCst);
- // Wait for next notification
- db.notifications().blocking_iter().next()?;
- }
- }
-
- loop {
- if let Err(err) = inner(state) {
- error!("status-watcher: {}", err);
- std::thread::sleep(Duration::from_secs(5));
- }
- }
-}
+/* + This file is part of TALER + Copyright (C) 2022 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ +use clap::Parser; +use common::{ + api_common::{ShortHashCode, Timestamp}, + api_wire::{ + HistoryParams, IncomingBankTransaction, IncomingHistory, OutgoingBankTransaction, + OutgoingHistory, TransferRequest, TransferResponse, WireConfig, + }, + config::{AuthMethod, TalerConfig}, + currency::Currency, + error_codes::ErrorCode, + log::{ + fail, + log::{error, info, log, Level}, + }, + postgres::{self, fallible_iterator::FallibleIterator}, + sql::{sql_amount, sql_array, sql_safe_u64, sql_url}, + url::Url, +}; +use deadpool_postgres::{Pool, Runtime}; +use error::{CatchResult, ServerError}; +use hyper::{ + http::request::Parts, + service::{make_service_fn, service_fn}, + Body, Method, Response, Server, StatusCode, +}; +use json::{encode_body, parse_body}; +use listenfd::ListenFd; +use std::{ + convert::Infallible, + path::PathBuf, + str::FromStr, + sync::atomic::{AtomicBool, AtomicU32, Ordering}, + time::{Duration, Instant}, +}; +use tokio::sync::Notify; +use tokio_postgres::{config::Host, NoTls}; + +mod error; +mod json; + +struct ServerState { + pool: Pool, + db_config: postgres::Config, + payto: Url, + currency: Currency, + notify: Notify, + lifetime: Option<AtomicU32>, + status: AtomicBool, + auth: AuthMethod, +} + +impl ServerState { + /// Decrease lifetime, triggering graceful shutdown when reach lifetime end + pub fn step(&self) { + if let Some(lifetime) = &self.lifetime { + let mut current = lifetime.load(Ordering::Relaxed); + loop { + if current == 0 { + self.notify.notify_one(); + } + match lifetime.compare_exchange_weak( + current, + current - 1, + Ordering::SeqCst, + Ordering::Relaxed, + ) { + Ok(_) => break, + Err(new) => current = new, + } + } + } + } + + pub async fn shutdown_signal(&self) { + self.notify.notified().await; + info!("Reach end of lifetime"); + } +} + +/// Taler wire gateway server for depolymerizer +#[derive(clap::Parser, Debug)] +struct Args { + /// Override default configuration file path + #[clap(global = true, short, long)] + config: Option<PathBuf>, +} + +#[tokio::main] +async fn main() { + common::log::init(); + let args = Args::parse(); + let taler_config = TalerConfig::load(args.config.as_deref()); + + #[cfg(feature = "test")] + common::log::log::warn!("Running with test admin endpoint unsuitable for production"); + + // Parse postgres url + let db_config = taler_config.db_config(); + // TODO find a way to clean this ugly mess + let mut cfg = deadpool_postgres::Config::new(); + cfg.user = db_config.get_user().map(|it| it.to_string()); + cfg.password = db_config + .get_password() + .map(|it| String::from_utf8(it.to_vec()).unwrap()); + cfg.dbname = db_config.get_dbname().map(|it| it.to_string()); + cfg.options = db_config.get_options().map(|it| it.to_string()); + cfg.host = Some( + db_config + .get_hosts() + .iter() + .map(|it| match it { + Host::Tcp(it) => it.to_string(), + #[cfg(unix)] + Host::Unix(it) => it.to_str().unwrap().to_string(), + }) + .collect(), + ); + cfg.ports = Some(db_config.get_ports().to_vec()); + cfg.application_name = db_config.get_application_name().map(|it| it.to_string()); + cfg.connect_timeout = db_config.get_connect_timeout().cloned(); + + let pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls).unwrap(); + let payto = taler_config.payto(); + let state = ServerState { + pool, + notify: Notify::new(), + lifetime: taler_config.http_lifetime().map(AtomicU32::new), + status: AtomicBool::new(true), + db_config, + payto, + currency: taler_config.currency, + auth: taler_config.auth_method(), + }; + let state: &'static ServerState = Box::leak(Box::new(state)); + std::thread::spawn(move || status_watcher(state)); + let service = service_fn(move |req| async move { + state.step(); + let start = Instant::now(); + let (parts, body) = req.into_parts(); + let (response, msg) = match router(&parts, body, state).await { + Ok(resp) => (resp, String::new()), + Err(err) => err.response(), + }; + let status = response.status().as_u16(); + let level = if status >= 500 { + Level::Error + } else if status >= 400 { + Level::Warn + } else { + Level::Info + }; + log!( + level, + "{} {} {}ms {} - {}", + parts.method, + status, + start.elapsed().as_millis(), + parts.uri.path(), + msg + ); + Ok::<Response<Body>, Infallible>(response) + }); + let make_service = make_service_fn(move |_| async move { Ok::<_, Infallible>(service) }); + let make_service_unix = make_service_fn(move |_| async move { Ok::<_, Infallible>(service) }); + + let mut listenfd = ListenFd::from_env(); + + if let Some(listener) = listenfd.take_tcp_listener(0).unwrap() { + info!( + "Server listening on activated socket {}", + listener.local_addr().unwrap() + ); + let server = Server::from_tcp(listener) + .unwrap() + .serve(make_service) + .with_graceful_shutdown(state.shutdown_signal()); + if let Err(e) = server.await { + error!("server: {}", e); + } + } else if let Some(path) = taler_config.unix_path() { + use hyperlocal::UnixServerExt; + info!("Server listening on unix domain socket {:?}", path); + if let Err(err) = std::fs::remove_file(&path) { + if err.kind() != std::io::ErrorKind::NotFound { + fail(format_args!("{}", err)); + } + } + let server = Server::bind_unix(path) + .unwrap() + .serve(make_service_unix) + .with_graceful_shutdown(state.shutdown_signal()); + if let Err(e) = server.await { + error!("server: {}", e); + } + } else { + let addr = ([0, 0, 0, 0], taler_config.port()).into(); + info!("Server listening on http://{}", &addr); + let server = Server::bind(&addr) + .serve(make_service) + .with_graceful_shutdown(state.shutdown_signal()); + if let Err(e) = server.await { + error!("server: {}", e); + } + }; + info!("wire-gateway stopped"); +} + +/// Check if an url if a valid payto url for the configured currency +fn check_payto(url: &Url, currency: Currency) -> bool { + match currency { + Currency::ETH(_) => check_pay_to_eth(url), + Currency::BTC(_) => check_pay_to_btc(url), + } +} + +/// Check if an url is a valid bitcoin payto url +fn check_pay_to_btc(url: &Url) -> bool { + return url.domain() == Some("bitcoin") + && url.scheme() == "payto" + && url.username() == "" + && url.password().is_none() + && url.query().is_none() + && url.fragment().is_none() + && bitcoin::Address::from_str(url.path().trim_start_matches('/')).is_ok(); +} + +/// Check if an url is a valid ethereum payto url +fn check_pay_to_eth(url: &Url) -> bool { + return url.domain() == Some("ethereum") + && url.scheme() == "payto" + && url.username() == "" + && url.password().is_none() + && url.query().is_none() + && url.fragment().is_none() + && ethereum_types::H160::from_str(url.path().trim_start_matches('/')).is_ok(); +} + +/// Assert request method match expected +fn assert_method(parts: &Parts, method: Method) -> Result<(), ServerError> { + if parts.method == method { + Ok(()) + } else { + Err(ServerError::code( + StatusCode::METHOD_NOT_ALLOWED, + ErrorCode::GENERIC_METHOD_INVALID, + )) + } +} + +/// Parse history params from request +fn history_params(parts: &Parts) -> Result<HistoryParams, ServerError> { + let params: HistoryParams = serde_urlencoded::from_str(parts.uri.query().unwrap_or("")) + .catch_code( + StatusCode::BAD_REQUEST, + ErrorCode::GENERIC_PARAMETER_MALFORMED, + )?; + if params.delta == 0 { + return Err(ServerError::code( + StatusCode::BAD_REQUEST, + ErrorCode::GENERIC_PARAMETER_MALFORMED, + )); + } + Ok(params) +} + +/// Generate sql query filter from history params +fn sql_history_filter(params: &HistoryParams) -> String { + let asc = params.delta > 0; + let limit = params.delta.abs(); + let order_sql = if asc { "ASC" } else { "DESC" }; + let where_sql = if let Some(start) = params.start { + format!("WHERE id {} {}", if asc { '>' } else { '<' }, start) + } else { + String::new() + }; + format!("{} ORDER BY id {} LIMIT {}", where_sql, order_sql, limit) +} + +async fn router( + parts: &Parts, + body: Body, + state: &'static ServerState, +) -> Result<Response<Body>, ServerError> { + // Check status error + if !state.status.load(Ordering::SeqCst) { + return Ok(Response::builder() + .status(StatusCode::BAD_GATEWAY) + .body(Body::empty()) + .unwrap()); + } + + // Check auth method + match &state.auth { + AuthMethod::Basic(auth) => { + if !matches!(parts.headers + .get(hyper::header::AUTHORIZATION) + .and_then(|h| h.to_str().ok()) + .and_then(|s| s.strip_prefix("Basic ")), + Some(token) if token == auth ) + { + return Ok(Response::builder() + .status(StatusCode::UNAUTHORIZED) + .body(Body::empty()) + .unwrap()); + } + } + AuthMethod::None => {} + } + + let response = match parts.uri.path() { + "/config" => { + assert_method(parts, Method::GET)?; + encode_body( + parts, + StatusCode::OK, + &WireConfig { + name: "taler-wire-gateway", + version: "0.0.0", + currency: state.currency.to_str(), + }, + ) + .unexpected()? + } + "/transfer" => { + assert_method(parts, Method::POST)?; + let request: TransferRequest = parse_body(parts, body).await.catch_code( + StatusCode::BAD_REQUEST, + ErrorCode::GENERIC_PARAMETER_MALFORMED, + )?; + if !check_payto(&request.credit_account, state.currency) { + return Err(ServerError::code( + StatusCode::BAD_REQUEST, + ErrorCode::GENERIC_PAYTO_URI_MALFORMED, + )); + } + if Currency::from_str(&request.amount.currency) != Ok(state.currency) { + return Err(ServerError::code( + StatusCode::BAD_REQUEST, + ErrorCode::GENERIC_PARAMETER_MALFORMED, + )); + } + let mut db = state.pool.get().await.catch_code( + StatusCode::GATEWAY_TIMEOUT, + ErrorCode::GENERIC_DB_FETCH_FAILED, + )?; + // Handle idempotence, check previous transaction with the same request_uid + let row = db.query_opt("SELECT amount, exchange_url, wtid, credit_acc, id, _date FROM tx_out WHERE request_uid = $1", &[&request.request_uid.as_slice()]) + .await?; + if let Some(row) = row { + let prev = TransferRequest { + request_uid: request.request_uid.clone(), + amount: sql_amount(&row, 0), + exchange_base_url: sql_url(&row, 1), + wtid: ShortHashCode::from(sql_array(&row, 2)), + credit_account: sql_url(&row, 3), + }; + if prev == request { + // Idempotence + return encode_body( + parts, + StatusCode::OK, + &TransferResponse { + timestamp: Timestamp::Time(row.get(5)), + row_id: sql_safe_u64(&row, 4), + }, + ) + .unexpected(); + } else { + return Err(ServerError::status(StatusCode::CONFLICT)); + } + } + + let timestamp = Timestamp::now(); + let tx = db.transaction().await?; + let row = tx.query_one("INSERT INTO tx_out (amount, wtid, debit_acc, credit_acc, exchange_url, request_uid) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id", &[ + &request.amount.to_string(), &request.wtid.as_slice(), &state.payto.as_ref(), &request.credit_account.as_ref(), &request.exchange_base_url.as_ref(), &request.request_uid.as_slice() + ]).await?; + tx.execute("NOTIFY new_tx", &[]).await?; + tx.commit().await?; + encode_body( + parts, + StatusCode::OK, + &TransferResponse { + timestamp, + row_id: sql_safe_u64(&row, 0), + }, + ) + .unexpected()? + } + "/history/incoming" => { + assert_method(parts, Method::GET)?; + let params = history_params(parts)?; + let filter = sql_history_filter(¶ms); + let db = state.pool.get().await.catch_code( + StatusCode::GATEWAY_TIMEOUT, + ErrorCode::GENERIC_DB_FETCH_FAILED, + )?; + let transactions: Vec<_> = db + .query( + &format!("SELECT id, _date, amount, reserve_pub, debit_acc FROM tx_in {}", filter), + &[], + ) + .await.catch_code( + StatusCode::BAD_GATEWAY, + ErrorCode::GENERIC_DB_FETCH_FAILED, + )? + .into_iter() + .map(| row| { + IncomingBankTransaction::IncomingReserveTransaction { + row_id: sql_safe_u64(&row, 0), + date: Timestamp::Time(row.get(1)), + amount: sql_amount(&row, 2), + reserve_pub: ShortHashCode::from(sql_array(&row, 3)), + debit_account: sql_url(&row, 4), + } + }).collect(); + if transactions.is_empty() { + Response::builder() + .status(StatusCode::NO_CONTENT) + .body(Body::empty()) + .unwrap() + } else { + encode_body( + parts, + StatusCode::OK, + &IncomingHistory { + credit_account: state.payto.clone(), + incoming_transactions: transactions, + }, + ) + .unexpected()? + } + } + "/history/outgoing" => { + assert_method(parts, Method::GET)?; + let params = history_params(parts)?; + let filter = sql_history_filter(¶ms); + + let db = state.pool.get().await.catch_code( + StatusCode::GATEWAY_TIMEOUT, + ErrorCode::GENERIC_DB_FETCH_FAILED, + )?; + let transactions: Vec<_> =db + .query( + &format!("SELECT id, _date, amount, wtid, credit_acc, exchange_url FROM tx_out {}", filter), + &[], + ) + .await.catch_code( + StatusCode::BAD_GATEWAY, + ErrorCode::GENERIC_DB_FETCH_FAILED, + )? + .into_iter() + .map(|row| OutgoingBankTransaction { + row_id: sql_safe_u64(&row, 0), + date: Timestamp::Time(row.get(1)), + amount: sql_amount(&row, 2), + wtid: ShortHashCode::from(sql_array(&row, 3)), + credit_account: sql_url(&row, 4), + exchange_base_url:sql_url(&row, 5), + }).collect(); + if transactions.is_empty() { + Response::builder() + .status(StatusCode::NO_CONTENT) + .body(Body::empty()) + .unwrap() + } else { + encode_body( + parts, + StatusCode::OK, + &OutgoingHistory { + debit_account: state.payto.clone(), + outgoing_transactions: transactions, + }, + ) + .unexpected()? + } + } + #[cfg(feature = "test")] + "/admin/add-incoming" => { + // We do not check input as this is a test admin endpoint + assert_method(&parts, Method::POST).unwrap(); + let request: common::api_wire::AddIncomingRequest = + parse_body(&parts, body).await.unwrap(); + let timestamp = Timestamp::now(); + let db = state.pool.get().await.catch_code( + StatusCode::GATEWAY_TIMEOUT, + ErrorCode::GENERIC_DB_FETCH_FAILED, + )?; + let row = db.query_one("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES (now(), $1, $2, $3, $4) RETURNING id", &[ + &request.amount.to_string(), &request.reserve_pub.as_slice(), &request.debit_account.as_ref(), &"payto://bitcoin/bcrt1qgkgxkjj27g3f7s87mcvjjsghay7gh34cx39prj" + ]).await.catch_code( + StatusCode::BAD_GATEWAY, + ErrorCode::GENERIC_DB_FETCH_FAILED, + )?; + encode_body( + parts, + StatusCode::OK, + &TransferResponse { + timestamp, + row_id: sql_safe_u64(&row, 0), + }, + ) + .unexpected()? + } + _ => { + return Err(ServerError::code( + StatusCode::NOT_FOUND, + ErrorCode::GENERIC_ENDPOINT_UNKNOWN, + )) + } + }; + Ok(response) +} + +/// Listen to backend status change +fn status_watcher(state: &'static ServerState) { + fn inner(state: &'static ServerState) -> Result<(), Box<dyn std::error::Error>> { + let mut db = state.db_config.connect(NoTls)?; + // Register as listener + db.batch_execute("LISTEN status")?; + loop { + // Sync state + let row = db.query_one("SELECT value FROM state WHERE name = 'status'", &[])?; + let status: &[u8] = row.get(0); + assert!(status.len() == 1 && status[0] < 2); + state.status.store(status[0] == 1, Ordering::SeqCst); + // Wait for next notification + db.notifications().blocking_iter().next()?; + } + } + + loop { + if let Err(err) = inner(state) { + error!("status-watcher: {}", err); + std::thread::sleep(Duration::from_secs(5)); + } + } +} |