depolymerization

wire gateway for Bitcoin/Ethereum
Log | Files | Refs | Submodules | README | LICENSE

commit 05720fcf2dd2829009adf0f145f1341d6e8260a2
parent 9c49f61e1591b445fcb07efb68248a2238df1b70
Author: Antoine A <>
Date:   Wed,  8 Dec 2021 15:48:29 +0100

Add reponse compression and improve error handling

Diffstat:
Mmakefile | 3+--
Mscript/generate_in_tx.sh | 7+++----
Mwire-gateway/src/api_common.rs | 4++--
Mwire-gateway/src/main.rs | 152+++++++++++++++++++++++++++++++++++++++++++++++++++++++++----------------------
4 files changed, 116 insertions(+), 50 deletions(-)

diff --git a/makefile b/makefile @@ -1,6 +1,5 @@ install: - cargo install --path btc-wire - cargo install --path btc-wire --bin btc-wire-cli + cargo install --path btc-wire --bin btc-wire-cli btc-wire cargo install --path wire-gateway test: diff --git a/script/generate_in_tx.sh b/script/generate_in_tx.sh @@ -1,7 +1,6 @@ -for n in `seq 0 $1`; do - RAND=$RANDOM - btc-wire-cli transfer 0.0000$RANDOM - echo "BTC0.0000$RANDOM" +for n in `shuf -i 000-999 -n $1`; do + btc-wire-cli transfer 0.0000$n + echo "BTC0.0000$n" done btc-wire-cli nblock echo "$1 incoming transaction generated" \ No newline at end of file diff --git a/wire-gateway/src/api_common.rs b/wire-gateway/src/api_common.rs @@ -56,11 +56,11 @@ impl<'de> Deserialize<'de> for Timestamp { SystemTime::UNIX_EPOCH + Duration::from_millis(since_epoch_ms), )) } else { - Err(todo!()) + Err(Error::custom("Expected epoch time millis")) } } Value::String(str) if str == "never" => Ok(Self::Never), - it => Err(todo!()), + _ => Err(Error::custom("Expected epoch time millis or 'never'")), } } } diff --git a/wire-gateway/src/main.rs b/wire-gateway/src/main.rs @@ -1,17 +1,17 @@ use std::{process::exit, str::FromStr}; use api_common::{Amount, SafeUint64, ShortHashCode, Timestamp}; -use api_wire::{AddIncomingRequest, OutgoingBankTransaction, OutgoingHistory}; -use async_compression::tokio::bufread::ZlibDecoder; +use api_wire::{OutgoingBankTransaction, OutgoingHistory}; +use async_compression::tokio::{bufread::ZlibDecoder, write::ZlibEncoder}; use error_codes::ErrorCode; use hyper::{ header, http::request::Parts, service::{make_service_fn, service_fn}, - Body, Error, Method, Request, Response, Server, StatusCode, + Body, Error, Method, Response, Server, StatusCode, }; -use tokio::io::AsyncReadExt; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio_postgres::{Client, NoTls}; use url::Url; @@ -34,7 +34,6 @@ fn check_pay_to(url: &Url) -> bool { } const SELF_PAYTO: &str = "payto://bitcoin/bcrt1qgkgxkjj27g3f7s87mcvjjsghay7gh34cx39prj"; -const SELF_URL: &str = "http://localhost:8080"; #[cfg(target_family = "windows")] const DB_URL: &str = "postgres://localhost/wire_gateway?user=postgres"; #[cfg(target_family = "unix")] @@ -55,27 +54,33 @@ async fn main() { let addr = ([0, 0, 0, 0], 8080).into(); let make_service = make_service_fn(move |_| async move { Ok::<_, Error>(service_fn(move |req| async move { - let response = match router(req, state).await { + let (parts, body) = req.into_parts(); + let response = match router(&parts, body, state).await { Ok(resp) => resp, - Err((status, err)) => { - json_response( - status, - &ErrorDetail { - code: err as i64, - hint: None, - detail: None, - parameter: None, - path: None, - offset: None, - index: None, - object: None, - currency: None, - type_expected: None, - type_actual: None, - }, - ) - .await - } + Err((status, err)) => json_response( + &parts, + status, + &ErrorDetail { + code: err as i64, + hint: None, + detail: None, + parameter: None, + path: None, + offset: None, + index: None, + object: None, + currency: None, + type_expected: None, + type_actual: None, + }, + ) + .await + .unwrap_or( + Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::empty()) + .unwrap(), + ), }; Ok::<Response<Body>, Error>(response) })) @@ -97,8 +102,21 @@ struct ServerState { pub mod api_common; pub mod api_wire; -async fn parse_json<J: serde::de::DeserializeOwned>(parts: &Parts, body: Body) -> J { - let bytes = hyper::body::to_bytes(body).await.unwrap(); +#[derive(Debug, thiserror::Error)] +enum ParseError { + #[error(transparent)] + Body(#[from] hyper::Error), + #[error(transparent)] + Json(#[from] serde_json::Error), + #[error(transparent)] + Deflate(#[from] tokio::io::Error), +} + +async fn parse_json<J: serde::de::DeserializeOwned>( + parts: &Parts, + body: Body, +) -> Result<J, ParseError> { + let bytes = hyper::body::to_bytes(body).await?; let mut buf = Vec::new(); let decompressed = if parts .headers @@ -107,23 +125,53 @@ async fn parse_json<J: serde::de::DeserializeOwned>(parts: &Parts, body: Body) - .unwrap_or(false) { let mut decoder = ZlibDecoder::new(bytes.as_ref()); - decoder.read_to_end(&mut buf).await.unwrap(); + decoder.read_to_end(&mut buf).await?; &buf } else { bytes.as_ref() }; - serde_json::from_slice(&decompressed).unwrap() + Ok(serde_json::from_slice(&decompressed)?) } -async fn json_response<J: serde::Serialize>(status: StatusCode, json: &J) -> Response<Body> { - let json = serde_json::to_vec(json).unwrap(); - // TODO investigate why curl do not like my async compression - Response::builder() - .status(status) - .header(header::CONTENT_TYPE, "application/json") - .body(Body::from(json)) - .unwrap() +#[derive(Debug, thiserror::Error)] +enum JsonRespError { + #[error(transparent)] + Json(#[from] serde_json::Error), + #[error(transparent)] + Deflate(#[from] tokio::io::Error), +} + +async fn json_response<J: serde::Serialize>( + parts: &Parts, + status: StatusCode, + json: &J, +) -> Result<Response<Body>, JsonRespError> { + let json = serde_json::to_vec(json)?; + if parts + .headers + .get(header::ACCEPT_ENCODING) + .and_then(|it| it.to_str().ok()) + .map(|str| str.contains("deflate")) + .unwrap_or(false) + { + let mut encoder = ZlibEncoder::new(Vec::new()); + encoder.write_all(&json).await?; + encoder.shutdown().await?; + let compressed = encoder.into_inner(); + Ok(Response::builder() + .status(status) + .header(header::CONTENT_TYPE, "application/json") + .header(header::CONTENT_ENCODING, "deflate") + .body(Body::from(compressed)) + .unwrap()) + } else { + Ok(Response::builder() + .status(status) + .header(header::CONTENT_TYPE, "application/json") + .body(Body::from(json)) + .unwrap()) + } } type ServerErr = (StatusCode, ErrorCode); @@ -169,14 +217,19 @@ fn sql_history_filter(params: &HistoryParams) -> String { } async fn router( - req: Request<Body>, + parts: &Parts, + body: Body, state: &'static ServerState, ) -> Result<Response<Body>, (StatusCode, ErrorCode)> { - let (parts, body) = req.into_parts(); let response = match parts.uri.path() { "/transfer" => { assert_method(&parts, Method::POST)?; - let request: TransferRequest = parse_json(&parts, body).await; + let request: TransferRequest = parse_json(&parts, body).await.map_err(|_| { + ( + StatusCode::BAD_REQUEST, + ErrorCode::GENERIC_PARAMETER_MALFORMED, + ) + })?; if !check_pay_to(&request.credit_account) { return Err(( StatusCode::BAD_REQUEST, @@ -194,6 +247,7 @@ async fn router( &request.amount.to_string(), &request.wtid.as_ref(), &SELF_PAYTO, &request.credit_account.to_string(), &request.exchange_base_url.to_string(), &0i16 ]).await.unwrap(); json_response( + parts, StatusCode::OK, &TransferResponse { timestamp, @@ -236,6 +290,7 @@ async fn router( }) .collect(); json_response( + parts, StatusCode::OK, &IncomingHistory { incoming_transactions: transactions, @@ -275,6 +330,7 @@ async fn router( }) .collect(); json_response( + parts, StatusCode::OK, &OutgoingHistory { outgoing_transactions: transactions, @@ -285,13 +341,20 @@ async fn router( #[cfg(feature = "test")] "/admin/add-incoming" => { assert_method(&parts, Method::POST)?; - let request: AddIncomingRequest = parse_json(&parts, body).await; + let request: api_wire::AddIncomingRequest = + parse_json(&parts, body).await.map_err(|_| { + ( + StatusCode::BAD_REQUEST, + ErrorCode::GENERIC_PARAMETER_MALFORMED, + ) + })?; // We do not check input as this is an admin endpoint let timestamp = Timestamp::now(); let row = state.client.query_one("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES (now(), $1, $2, $3, $4) RETURNING id", &[ &request.amount.to_string(), &request.reserve_pub.as_ref(), &request.debit_account.to_string(), &"payto://bitcoin/bcrt1qgkgxkjj27g3f7s87mcvjjsghay7gh34cx39prj" ]).await.unwrap(); json_response( + parts, StatusCode::OK, &TransferResponse { timestamp, @@ -305,5 +368,10 @@ async fn router( } _ => return Err((StatusCode::NOT_FOUND, ErrorCode::GENERIC_ENDPOINT_UNKNOWN)), }; - return Ok(response); + return Ok(response.unwrap_or( + Response::builder() + .status(StatusCode::INTERNAL_SERVER_ERROR) + .body(Body::empty()) + .unwrap(), + )); }