summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAntoine A <>2024-02-01 18:18:49 +0100
committerAntoine A <>2024-02-01 18:18:49 +0100
commit635e0e1d0ce6773f7bea4b4fed7c22f614ba2829 (patch)
tree008fb2d6af4efe91302528c7aa76089890e68020
parentc97f15ab23c310dfb630f78c1513c38803ab0f2a (diff)
downloaddepolymerization-635e0e1d0ce6773f7bea4b4fed7c22f614ba2829.tar.gz
depolymerization-635e0e1d0ce6773f7bea4b4fed7c22f614ba2829.tar.bz2
depolymerization-635e0e1d0ce6773f7bea4b4fed7c22f614ba2829.zip
Update taler wire api to the newest specification
-rw-r--r--common/src/api_wire.rs199
-rw-r--r--instrumentation/src/main.rs2
-rw-r--r--instrumentation/src/utils.rs50
-rw-r--r--wire-gateway/src/json.rs2
-rw-r--r--wire-gateway/src/main.rs1084
5 files changed, 687 insertions, 650 deletions
diff --git a/common/src/api_wire.rs b/common/src/api_wire.rs
index 8476069..7f7b477 100644
--- a/common/src/api_wire.rs
+++ b/common/src/api_wire.rs
@@ -1,98 +1,101 @@
-/*
- This file is part of TALER
- Copyright (C) 2022 Taler Systems SA
-
- TALER is free software; you can redistribute it and/or modify it under the
- terms of the GNU Affero General Public License as published by the Free Software
- Foundation; either version 3, or (at your option) any later version.
-
- TALER is distributed in the hope that it will be useful, but WITHOUT ANY
- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
- A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License along with
- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
-*/
-use url::Url;
-
-use crate::api_common::{Amount, EddsaPublicKey, HashCode, SafeU64, ShortHashCode, Timestamp};
-
-/// <https://docs.taler.net/core/api-wire.html#tsref-type-TransferResponse>
-#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
-pub struct TransferResponse {
- pub timestamp: Timestamp,
- pub row_id: SafeU64,
-}
-
-/// <https://docs.taler.net/core/api-wire.html#tsref-type-TransferRequest>
-#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
-pub struct TransferRequest {
- pub request_uid: HashCode,
- pub amount: Amount,
- pub exchange_base_url: Url,
- pub wtid: ShortHashCode,
- pub credit_account: Url,
-}
-
-/// <https://docs.taler.net/core/api-wire.html#tsref-type-OutgoingHistory>
-#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
-pub struct OutgoingHistory {
- pub outgoing_transactions: Vec<OutgoingBankTransaction>,
-}
-
-/// <https://docs.taler.net/core/api-wire.html#tsref-type-OutgoingBankTransaction>
-#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
-pub struct OutgoingBankTransaction {
- pub row_id: SafeU64,
- pub date: Timestamp,
- pub amount: Amount,
- pub credit_account: Url,
- pub debit_account: Url,
- pub wtid: ShortHashCode,
- pub exchange_base_url: Url,
-}
-
-#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
-pub struct IncomingHistory {
- pub incoming_transactions: Vec<IncomingBankTransaction>,
-}
-#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
-#[serde(tag = "type")]
-pub enum IncomingBankTransaction {
- #[serde(rename = "RESERVE")]
- IncomingReserveTransaction {
- row_id: SafeU64,
- date: Timestamp,
- amount: Amount,
- credit_account: Url,
- debit_account: Url,
- reserve_pub: EddsaPublicKey,
- },
- #[serde(rename = "WAD")]
- IncomingWadTransaction {
- // TODO not yet supported
- },
-}
-
-/// <https://docs.taler.net/core/api-wire.html#tsref-type-AddIncomingRequest>
-#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
-pub struct AddIncomingRequest {
- pub amount: Amount,
- pub reserve_pub: EddsaPublicKey,
- pub debit_account: Url,
-}
-
-/// <https://docs.taler.net/core/api-wire.html#tsref-type-AddIncomingResponse>
-#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
-pub struct AddIncomingResponse {
- pub row_id: SafeU64,
- pub timestamp: Timestamp,
-}
-
-/// <https://docs.taler.net/core/api-wire.html#querying-the-transaction-history>
-#[derive(Debug, Clone, serde::Deserialize)]
-pub struct HistoryParams {
- pub start: Option<u64>,
- pub delta: i64,
- pub long_pool_ms: Option<u64>,
-}
+/*
+ This file is part of TALER
+ Copyright (C) 2022 Taler Systems SA
+
+ TALER is free software; you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License along with
+ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+*/
+
+/// Type for the Taler Wire Gateway HTTP API <https://docs.taler.net/core/api-bank-wire.html#taler-wire-gateway-http-api>
+use url::Url;
+
+use crate::api_common::{Amount, EddsaPublicKey, HashCode, SafeU64, ShortHashCode, Timestamp};
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct WireConfig {
+ pub name: &'static str,
+ pub version: &'static str,
+ pub currency: &'static str,
+}
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct TransferResponse {
+ pub timestamp: Timestamp,
+ pub row_id: SafeU64,
+}
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
+pub struct TransferRequest {
+ pub request_uid: HashCode,
+ pub amount: Amount,
+ pub exchange_base_url: Url,
+ pub wtid: ShortHashCode,
+ pub credit_account: Url,
+}
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct OutgoingHistory {
+ pub outgoing_transactions: Vec<OutgoingBankTransaction>,
+ pub debit_account: Url,
+}
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct OutgoingBankTransaction {
+ pub row_id: SafeU64,
+ pub date: Timestamp,
+ pub amount: Amount,
+ pub credit_account: Url,
+ pub wtid: ShortHashCode,
+ pub exchange_base_url: Url,
+}
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct IncomingHistory {
+ pub credit_account: Url,
+ pub incoming_transactions: Vec<IncomingBankTransaction>,
+}
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+#[serde(tag = "type")]
+pub enum IncomingBankTransaction {
+ #[serde(rename = "RESERVE")]
+ IncomingReserveTransaction {
+ row_id: SafeU64,
+ date: Timestamp,
+ amount: Amount,
+ debit_account: Url,
+ reserve_pub: EddsaPublicKey,
+ },
+ #[serde(rename = "WAD")]
+ IncomingWadTransaction {
+ // TODO not yet supported
+ },
+}
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct AddIncomingRequest {
+ pub amount: Amount,
+ pub reserve_pub: EddsaPublicKey,
+ pub debit_account: Url,
+}
+
+#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
+pub struct AddIncomingResponse {
+ pub row_id: SafeU64,
+ pub timestamp: Timestamp,
+}
+
+#[derive(Debug, Clone, serde::Deserialize)]
+pub struct HistoryParams {
+ pub start: Option<u64>,
+ pub delta: i64,
+ pub long_pool_ms: Option<u64>,
+}
diff --git a/instrumentation/src/main.rs b/instrumentation/src/main.rs
index b787d5f..fd3e46f 100644
--- a/instrumentation/src/main.rs
+++ b/instrumentation/src/main.rs
@@ -152,7 +152,7 @@ pub fn main() {
let len = results.len();
m.clear().unwrap();
for ((result, _, out, msg), name) in &results {
- if let Err(_) = result {
+ if result.is_err() {
println!("{} {}\n{}", name.magenta(), msg.red(), out.bright_black());
}
}
diff --git a/instrumentation/src/utils.rs b/instrumentation/src/utils.rs
index 5509754..baf3686 100644
--- a/instrumentation/src/utils.rs
+++ b/instrumentation/src/utils.rs
@@ -44,16 +44,21 @@ pub fn print_now(disp: impl Display) {
#[must_use]
pub fn check_incoming(base_url: &str, txs: &[([u8; 32], Amount)]) -> bool {
- let history: IncomingHistory = ureq::get(&format!("{}history/incoming", base_url))
+ let res = ureq::get(&format!("{}history/incoming", base_url))
.query("delta", &format!("-{}", txs.len()))
.call()
- .unwrap()
- .into_json()
.unwrap();
+ if txs.is_empty() {
+ res.status() == 204
+ } else {
+ if res.status() != 200 {
+ return false;
+ }
+ let history: IncomingHistory = res.into_json().unwrap();
- history.incoming_transactions.len() == txs.len()
- && txs.iter().all(|(reserve_pub_key, taler_amount)| {
- history.incoming_transactions.iter().any(|h| {
+ history.incoming_transactions.len() == txs.len()
+ && txs.iter().all(|(reserve_pub_key, taler_amount)| {
+ history.incoming_transactions.iter().any(|h| {
matches!(
h,
IncomingBankTransaction::IncomingReserveTransaction {
@@ -63,7 +68,8 @@ pub fn check_incoming(base_url: &str, txs: &[([u8; 32], Amount)]) -> bool {
} if reserve_pub == &Base32::from(*reserve_pub_key) && amount == taler_amount
)
})
- })
+ })
+ }
}
pub fn gateway_error(path: &str, error: u16) {
@@ -96,10 +102,7 @@ pub fn check_gateway_down(base_url: &str) -> bool {
#[must_use]
pub fn check_gateway_up(base_url: &str) -> bool {
- ureq::get(&format!("{}history/incoming", base_url))
- .query("delta", "-5")
- .call()
- .is_ok()
+ ureq::get(&format!("{}config", base_url)).call().is_ok()
}
pub fn transfer(base_url: &str, wtid: &[u8; 32], url: &Url, credit_account: Url, amount: &Amount) {
@@ -116,18 +119,27 @@ pub fn transfer(base_url: &str, wtid: &[u8; 32], url: &Url, credit_account: Url,
#[must_use]
pub fn check_outgoing(base_url: &str, url: &Url, txs: &[([u8; 32], Amount)]) -> bool {
- let history: OutgoingHistory = ureq::get(&format!("{}history/outgoing", base_url))
+ let res = ureq::get(&format!("{}history/outgoing", base_url))
.query("delta", &format!("-{}", txs.len()))
.call()
- .unwrap()
- .into_json()
.unwrap();
- history.outgoing_transactions.len() == txs.len()
- && txs.iter().all(|(wtid, amount)| {
- history.outgoing_transactions.iter().any(|h| {
- h.wtid == Base32::from(*wtid) && &h.exchange_base_url == url && &h.amount == amount
+ if txs.is_empty() {
+ res.status() == 204
+ } else {
+ if res.status() != 200 {
+ return false;
+ }
+ let history: OutgoingHistory = res.into_json().unwrap();
+
+ history.outgoing_transactions.len() == txs.len()
+ && txs.iter().all(|(wtid, amount)| {
+ history.outgoing_transactions.iter().any(|h| {
+ h.wtid == Base32::from(*wtid)
+ && &h.exchange_base_url == url
+ && &h.amount == amount
+ })
})
- })
+ }
}
pub struct ChildGuard(pub Child);
diff --git a/wire-gateway/src/json.rs b/wire-gateway/src/json.rs
index da0c3c3..96a55b8 100644
--- a/wire-gateway/src/json.rs
+++ b/wire-gateway/src/json.rs
@@ -73,7 +73,7 @@ pub enum EncodeBodyError {
Json(#[from] serde_json::Error),
}
-pub async fn encode_body<J: serde::Serialize>(
+pub fn encode_body<J: serde::Serialize>(
parts: &Parts,
status: StatusCode,
json: &J,
diff --git a/wire-gateway/src/main.rs b/wire-gateway/src/main.rs
index 7024d9b..98c7589 100644
--- a/wire-gateway/src/main.rs
+++ b/wire-gateway/src/main.rs
@@ -1,531 +1,553 @@
-/*
- This file is part of TALER
- Copyright (C) 2022 Taler Systems SA
-
- TALER is free software; you can redistribute it and/or modify it under the
- terms of the GNU Affero General Public License as published by the Free Software
- Foundation; either version 3, or (at your option) any later version.
-
- TALER is distributed in the hope that it will be useful, but WITHOUT ANY
- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
- A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
-
- You should have received a copy of the GNU Affero General Public License along with
- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
-*/
-use clap::Parser;
-use common::{
- api_common::{ShortHashCode, Timestamp},
- api_wire::{
- HistoryParams, IncomingBankTransaction, IncomingHistory, OutgoingBankTransaction,
- OutgoingHistory, TransferRequest, TransferResponse,
- },
- config::{AuthMethod, TalerConfig},
- currency::Currency,
- error_codes::ErrorCode,
- log::{
- fail,
- log::{error, info, log, Level},
- },
- postgres::{self, fallible_iterator::FallibleIterator},
- sql::{sql_amount, sql_array, sql_safe_u64, sql_url},
- url::Url,
-};
-use deadpool_postgres::{Pool, Runtime};
-use error::{CatchResult, ServerError};
-use hyper::{
- http::request::Parts,
- service::{make_service_fn, service_fn},
- Body, Method, Response, Server, StatusCode,
-};
-use json::{encode_body, parse_body};
-use listenfd::ListenFd;
-use std::{
- convert::Infallible,
- path::PathBuf,
- str::FromStr,
- sync::atomic::{AtomicBool, AtomicU32, Ordering},
- time::{Duration, Instant},
-};
-use tokio::sync::Notify;
-use tokio_postgres::{config::Host, NoTls};
-
-mod error;
-mod json;
-
-struct ServerState {
- pool: Pool,
- db_config: postgres::Config,
- payto: Url,
- currency: Currency,
- notify: Notify,
- lifetime: Option<AtomicU32>,
- status: AtomicBool,
- auth: AuthMethod,
-}
-
-impl ServerState {
- /// Decrease lifetime, triggering graceful shutdown when reach lifetime end
- pub fn step(&self) {
- if let Some(lifetime) = &self.lifetime {
- let mut current = lifetime.load(Ordering::Relaxed);
- loop {
- if current == 0 {
- self.notify.notify_one();
- }
- match lifetime.compare_exchange_weak(
- current,
- current - 1,
- Ordering::SeqCst,
- Ordering::Relaxed,
- ) {
- Ok(_) => break,
- Err(new) => current = new,
- }
- }
- }
- }
-
- pub async fn shutdown_signal(&self) {
- self.notify.notified().await;
- info!("Reach end of lifetime");
- }
-}
-
-/// Taler wire gateway server for depolymerizer
-#[derive(clap::Parser, Debug)]
-struct Args {
- /// Override default configuration file path
- #[clap(global = true, short, long)]
- config: Option<PathBuf>,
-}
-
-#[tokio::main]
-async fn main() {
- common::log::init();
- let args = Args::parse();
- let taler_config = TalerConfig::load(args.config.as_deref());
-
- #[cfg(feature = "test")]
- common::log::log::warn!("Running with test admin endpoint unsuitable for production");
-
- // Parse postgres url
- let db_config = taler_config.db_config();
- // TODO find a way to clean this ugly mess
- let mut cfg = deadpool_postgres::Config::new();
- cfg.user = db_config.get_user().map(|it| it.to_string());
- cfg.password = db_config
- .get_password()
- .map(|it| String::from_utf8(it.to_vec()).unwrap());
- cfg.dbname = db_config.get_dbname().map(|it| it.to_string());
- cfg.options = db_config.get_options().map(|it| it.to_string());
- cfg.host = Some(
- db_config
- .get_hosts()
- .iter()
- .map(|it| match it {
- Host::Tcp(it) => it.to_string(),
- #[cfg(unix)]
- Host::Unix(it) => it.to_str().unwrap().to_string(),
- })
- .collect(),
- );
- cfg.ports = Some(db_config.get_ports().to_vec());
- cfg.application_name = db_config.get_application_name().map(|it| it.to_string());
- cfg.connect_timeout = db_config.get_connect_timeout().cloned();
-
- let pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls).unwrap();
- let payto = taler_config.payto();
- let state = ServerState {
- pool,
- notify: Notify::new(),
- lifetime: taler_config.http_lifetime().map(AtomicU32::new),
- status: AtomicBool::new(true),
- db_config,
- payto,
- currency: taler_config.currency,
- auth: taler_config.auth_method(),
- };
- let state: &'static ServerState = Box::leak(Box::new(state));
- std::thread::spawn(move || status_watcher(state));
- let service = service_fn(move |req| async move {
- state.step();
- let start = Instant::now();
- let (parts, body) = req.into_parts();
- let (response, msg) = match router(&parts, body, state).await {
- Ok(resp) => (resp, String::new()),
- Err(err) => err.response(),
- };
- let status = response.status().as_u16();
- let level = if status >= 500 {
- Level::Error
- } else if status >= 400 {
- Level::Warn
- } else {
- Level::Info
- };
- log!(
- level,
- "{} {} {}ms {} - {}",
- parts.method,
- status,
- start.elapsed().as_millis(),
- parts.uri.path(),
- msg
- );
- Ok::<Response<Body>, Infallible>(response)
- });
- let make_service = make_service_fn(move |_| async move { Ok::<_, Infallible>(service) });
- let make_service_unix = make_service_fn(move |_| async move { Ok::<_, Infallible>(service) });
-
- let mut listenfd = ListenFd::from_env();
-
- if let Some(listener) = listenfd.take_tcp_listener(0).unwrap() {
- info!(
- "Server listening on activated socket {}",
- listener.local_addr().unwrap()
- );
- let server = Server::from_tcp(listener)
- .unwrap()
- .serve(make_service)
- .with_graceful_shutdown(state.shutdown_signal());
- if let Err(e) = server.await {
- error!("server: {}", e);
- }
- } else if let Some(path) = taler_config.unix_path() {
- use hyperlocal::UnixServerExt;
- info!("Server listening on unix domain socket {:?}", path);
- if let Err(err) = std::fs::remove_file(&path) {
- if err.kind() != std::io::ErrorKind::NotFound {
- fail(format_args!("{}", err));
- }
- }
- let server = Server::bind_unix(path)
- .unwrap()
- .serve(make_service_unix)
- .with_graceful_shutdown(state.shutdown_signal());
- if let Err(e) = server.await {
- error!("server: {}", e);
- }
- } else {
- let addr = ([0, 0, 0, 0], taler_config.port()).into();
- info!("Server listening on http://{}", &addr);
- let server = Server::bind(&addr)
- .serve(make_service)
- .with_graceful_shutdown(state.shutdown_signal());
- if let Err(e) = server.await {
- error!("server: {}", e);
- }
- };
- info!("wire-gateway stopped");
-}
-
-/// Check if an url if a valid payto url for the configured currency
-fn check_payto(url: &Url, currency: Currency) -> bool {
- match currency {
- Currency::ETH(_) => check_pay_to_eth(url),
- Currency::BTC(_) => check_pay_to_btc(url),
- }
-}
-
-/// Check if an url is a valid bitcoin payto url
-fn check_pay_to_btc(url: &Url) -> bool {
- return url.domain() == Some("bitcoin")
- && url.scheme() == "payto"
- && url.username() == ""
- && url.password().is_none()
- && url.query().is_none()
- && url.fragment().is_none()
- && bitcoin::Address::from_str(url.path().trim_start_matches('/')).is_ok();
-}
-
-/// Check if an url is a valid ethereum payto url
-fn check_pay_to_eth(url: &Url) -> bool {
- return url.domain() == Some("ethereum")
- && url.scheme() == "payto"
- && url.username() == ""
- && url.password().is_none()
- && url.query().is_none()
- && url.fragment().is_none()
- && ethereum_types::H160::from_str(url.path().trim_start_matches('/')).is_ok();
-}
-
-/// Assert request method match expected
-fn assert_method(parts: &Parts, method: Method) -> Result<(), ServerError> {
- if parts.method == method {
- Ok(())
- } else {
- Err(ServerError::code(
- StatusCode::METHOD_NOT_ALLOWED,
- ErrorCode::GENERIC_METHOD_INVALID,
- ))
- }
-}
-
-/// Parse history params from request
-fn history_params(parts: &Parts) -> Result<HistoryParams, ServerError> {
- let params: HistoryParams = serde_urlencoded::from_str(parts.uri.query().unwrap_or(""))
- .catch_code(
- StatusCode::BAD_REQUEST,
- ErrorCode::GENERIC_PARAMETER_MALFORMED,
- )?;
- if params.delta == 0 {
- return Err(ServerError::code(
- StatusCode::BAD_REQUEST,
- ErrorCode::GENERIC_PARAMETER_MALFORMED,
- ));
- }
- Ok(params)
-}
-
-/// Generate sql query filter from history params
-fn sql_history_filter(params: &HistoryParams) -> String {
- let asc = params.delta > 0;
- let limit = params.delta.abs();
- let order_sql = if asc { "ASC" } else { "DESC" };
- let where_sql = if let Some(start) = params.start {
- format!("WHERE id {} {}", if asc { '>' } else { '<' }, start)
- } else {
- String::new()
- };
- format!("{} ORDER BY id {} LIMIT {}", where_sql, order_sql, limit)
-}
-
-async fn router(
- parts: &Parts,
- body: Body,
- state: &'static ServerState,
-) -> Result<Response<Body>, ServerError> {
- // Check status error
- if !state.status.load(Ordering::SeqCst) {
- return Ok(Response::builder()
- .status(StatusCode::BAD_GATEWAY)
- .body(Body::empty())
- .unwrap());
- }
-
- // Check auth method
- match &state.auth {
- AuthMethod::Basic(auth) => {
- if !matches!(parts.headers
- .get(hyper::header::AUTHORIZATION)
- .and_then(|h| h.to_str().ok())
- .and_then(|s| s.strip_prefix("Basic ")),
- Some(token) if token == auth )
- {
- return Ok(Response::builder()
- .status(StatusCode::UNAUTHORIZED)
- .body(Body::empty())
- .unwrap());
- }
- }
- AuthMethod::None => {}
- }
-
- let response = match parts.uri.path() {
- "/transfer" => {
- assert_method(parts, Method::POST)?;
- let request: TransferRequest = parse_body(parts, body).await.catch_code(
- StatusCode::BAD_REQUEST,
- ErrorCode::GENERIC_PARAMETER_MALFORMED,
- )?;
- if !check_payto(&request.credit_account, state.currency) {
- return Err(ServerError::code(
- StatusCode::BAD_REQUEST,
- ErrorCode::GENERIC_PAYTO_URI_MALFORMED,
- ));
- }
- if Currency::from_str(&request.amount.currency) != Ok(state.currency) {
- return Err(ServerError::code(
- StatusCode::BAD_REQUEST,
- ErrorCode::GENERIC_PARAMETER_MALFORMED,
- ));
- }
- let mut db = state.pool.get().await.catch_code(
- StatusCode::GATEWAY_TIMEOUT,
- ErrorCode::GENERIC_DB_FETCH_FAILED,
- )?;
- // Handle idempotence, check previous transaction with the same request_uid
- let row = db.query_opt("SELECT amount, exchange_url, wtid, credit_acc, id, _date FROM tx_out WHERE request_uid = $1", &[&request.request_uid.as_slice()])
- .await?;
- if let Some(row) = row {
- let prev = TransferRequest {
- request_uid: request.request_uid.clone(),
- amount: sql_amount(&row, 0),
- exchange_base_url: sql_url(&row, 1),
- wtid: ShortHashCode::from(sql_array(&row, 2)),
- credit_account: sql_url(&row, 3),
- };
- if prev == request {
- // Idempotence
- return encode_body(
- parts,
- StatusCode::OK,
- &TransferResponse {
- timestamp: Timestamp::Time(row.get(5)),
- row_id: sql_safe_u64(&row, 4),
- },
- )
- .await
- .unexpected();
- } else {
- return Err(ServerError::status(StatusCode::CONFLICT));
- }
- }
-
- let timestamp = Timestamp::now();
- let tx = db.transaction().await?;
- let row = tx.query_one("INSERT INTO tx_out (amount, wtid, debit_acc, credit_acc, exchange_url, request_uid) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id", &[
- &request.amount.to_string(), &request.wtid.as_slice(), &state.payto.as_ref(), &request.credit_account.as_ref(), &request.exchange_base_url.as_ref(), &request.request_uid.as_slice()
- ]).await?;
- tx.execute("NOTIFY new_tx", &[]).await?;
- tx.commit().await?;
- encode_body(
- parts,
- StatusCode::OK,
- &TransferResponse {
- timestamp,
- row_id: sql_safe_u64(&row, 0),
- },
- )
- .await
- .unexpected()?
- }
- "/history/incoming" => {
- assert_method(parts, Method::GET)?;
- let params = history_params(parts)?;
- let filter = sql_history_filter(&params);
- let db = state.pool.get().await.catch_code(
- StatusCode::GATEWAY_TIMEOUT,
- ErrorCode::GENERIC_DB_FETCH_FAILED,
- )?;
- let transactions = db
- .query(
- &format!("SELECT id, _date, amount, reserve_pub, debit_acc, credit_acc FROM tx_in {}", filter),
- &[],
- )
- .await.catch_code(
- StatusCode::BAD_GATEWAY,
- ErrorCode::GENERIC_DB_FETCH_FAILED,
- )?
- .into_iter()
- .map(|row| IncomingBankTransaction::IncomingReserveTransaction {
- row_id: sql_safe_u64(&row, 0),
- date: Timestamp::Time(row.get(1)),
- amount: sql_amount(&row, 2),
- reserve_pub: ShortHashCode::from(sql_array(&row, 3)),
- debit_account: sql_url(&row, 4),
- credit_account: sql_url(&row, 5),
- })
- .collect();
- encode_body(
- parts,
- StatusCode::OK,
- &IncomingHistory {
- incoming_transactions: transactions,
- },
- )
- .await
- .unexpected()?
- }
- "/history/outgoing" => {
- assert_method(parts, Method::GET)?;
- let params = history_params(parts)?;
- let filter = sql_history_filter(&params);
-
- let db = state.pool.get().await.catch_code(
- StatusCode::GATEWAY_TIMEOUT,
- ErrorCode::GENERIC_DB_FETCH_FAILED,
- )?;
- let transactions = db
- .query(
- &format!("SELECT id, _date, amount, wtid, debit_acc, credit_acc, exchange_url FROM tx_out {}",filter),
- &[],
- )
- .await.catch_code(
- StatusCode::BAD_GATEWAY,
- ErrorCode::GENERIC_DB_FETCH_FAILED,
- )?
- .into_iter()
- .map(|row| OutgoingBankTransaction {
- row_id: sql_safe_u64(&row, 0),
- date: Timestamp::Time(row.get(1)),
- amount: sql_amount(&row, 2),
- wtid: ShortHashCode::from(sql_array(&row, 3)),
- debit_account: sql_url(&row, 4),
- credit_account: sql_url(&row, 5),
- exchange_base_url:sql_url(&row, 6),
- })
- .collect();
- encode_body(
- parts,
- StatusCode::OK,
- &OutgoingHistory {
- outgoing_transactions: transactions,
- },
- )
- .await
- .unexpected()?
- }
- #[cfg(feature = "test")]
- "/admin/add-incoming" => {
- // We do not check input as this is a test admin endpoint
- assert_method(&parts, Method::POST).unwrap();
- let request: common::api_wire::AddIncomingRequest =
- parse_body(&parts, body).await.unwrap();
- let timestamp = Timestamp::now();
- let db = state.pool.get().await.catch_code(
- StatusCode::GATEWAY_TIMEOUT,
- ErrorCode::GENERIC_DB_FETCH_FAILED,
- )?;
- let row = db.query_one("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES (now(), $1, $2, $3, $4) RETURNING id", &[
- &request.amount.to_string(), &request.reserve_pub.as_slice(), &request.debit_account.as_ref(), &"payto://bitcoin/bcrt1qgkgxkjj27g3f7s87mcvjjsghay7gh34cx39prj"
- ]).await.catch_code(
- StatusCode::BAD_GATEWAY,
- ErrorCode::GENERIC_DB_FETCH_FAILED,
- )?;
- encode_body(
- parts,
- StatusCode::OK,
- &TransferResponse {
- timestamp,
- row_id: sql_safe_u64(&row, 0),
- },
- )
- .await
- .unexpected()?
- }
- _ => {
- return Err(ServerError::code(
- StatusCode::NOT_FOUND,
- ErrorCode::GENERIC_ENDPOINT_UNKNOWN,
- ))
- }
- };
- Ok(response)
-}
-
-/// Listen to backend status change
-fn status_watcher(state: &'static ServerState) {
- fn inner(state: &'static ServerState) -> Result<(), Box<dyn std::error::Error>> {
- let mut db = state.db_config.connect(NoTls)?;
- // Register as listener
- db.batch_execute("LISTEN status")?;
- loop {
- // Sync state
- let row = db.query_one("SELECT value FROM state WHERE name = 'status'", &[])?;
- let status: &[u8] = row.get(0);
- assert!(status.len() == 1 && status[0] < 2);
- state.status.store(status[0] == 1, Ordering::SeqCst);
- // Wait for next notification
- db.notifications().blocking_iter().next()?;
- }
- }
-
- loop {
- if let Err(err) = inner(state) {
- error!("status-watcher: {}", err);
- std::thread::sleep(Duration::from_secs(5));
- }
- }
-}
+/*
+ This file is part of TALER
+ Copyright (C) 2022 Taler Systems SA
+
+ TALER is free software; you can redistribute it and/or modify it under the
+ terms of the GNU Affero General Public License as published by the Free Software
+ Foundation; either version 3, or (at your option) any later version.
+
+ TALER is distributed in the hope that it will be useful, but WITHOUT ANY
+ WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+ A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
+
+ You should have received a copy of the GNU Affero General Public License along with
+ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
+*/
+use clap::Parser;
+use common::{
+ api_common::{ShortHashCode, Timestamp},
+ api_wire::{
+ HistoryParams, IncomingBankTransaction, IncomingHistory, OutgoingBankTransaction,
+ OutgoingHistory, TransferRequest, TransferResponse, WireConfig,
+ },
+ config::{AuthMethod, TalerConfig},
+ currency::Currency,
+ error_codes::ErrorCode,
+ log::{
+ fail,
+ log::{error, info, log, Level},
+ },
+ postgres::{self, fallible_iterator::FallibleIterator},
+ sql::{sql_amount, sql_array, sql_safe_u64, sql_url},
+ url::Url,
+};
+use deadpool_postgres::{Pool, Runtime};
+use error::{CatchResult, ServerError};
+use hyper::{
+ http::request::Parts,
+ service::{make_service_fn, service_fn},
+ Body, Method, Response, Server, StatusCode,
+};
+use json::{encode_body, parse_body};
+use listenfd::ListenFd;
+use std::{
+ convert::Infallible,
+ path::PathBuf,
+ str::FromStr,
+ sync::atomic::{AtomicBool, AtomicU32, Ordering},
+ time::{Duration, Instant},
+};
+use tokio::sync::Notify;
+use tokio_postgres::{config::Host, NoTls};
+
+mod error;
+mod json;
+
+struct ServerState {
+ pool: Pool,
+ db_config: postgres::Config,
+ payto: Url,
+ currency: Currency,
+ notify: Notify,
+ lifetime: Option<AtomicU32>,
+ status: AtomicBool,
+ auth: AuthMethod,
+}
+
+impl ServerState {
+ /// Decrease lifetime, triggering graceful shutdown when reach lifetime end
+ pub fn step(&self) {
+ if let Some(lifetime) = &self.lifetime {
+ let mut current = lifetime.load(Ordering::Relaxed);
+ loop {
+ if current == 0 {
+ self.notify.notify_one();
+ }
+ match lifetime.compare_exchange_weak(
+ current,
+ current - 1,
+ Ordering::SeqCst,
+ Ordering::Relaxed,
+ ) {
+ Ok(_) => break,
+ Err(new) => current = new,
+ }
+ }
+ }
+ }
+
+ pub async fn shutdown_signal(&self) {
+ self.notify.notified().await;
+ info!("Reach end of lifetime");
+ }
+}
+
+/// Taler wire gateway server for depolymerizer
+#[derive(clap::Parser, Debug)]
+struct Args {
+ /// Override default configuration file path
+ #[clap(global = true, short, long)]
+ config: Option<PathBuf>,
+}
+
+#[tokio::main]
+async fn main() {
+ common::log::init();
+ let args = Args::parse();
+ let taler_config = TalerConfig::load(args.config.as_deref());
+
+ #[cfg(feature = "test")]
+ common::log::log::warn!("Running with test admin endpoint unsuitable for production");
+
+ // Parse postgres url
+ let db_config = taler_config.db_config();
+ // TODO find a way to clean this ugly mess
+ let mut cfg = deadpool_postgres::Config::new();
+ cfg.user = db_config.get_user().map(|it| it.to_string());
+ cfg.password = db_config
+ .get_password()
+ .map(|it| String::from_utf8(it.to_vec()).unwrap());
+ cfg.dbname = db_config.get_dbname().map(|it| it.to_string());
+ cfg.options = db_config.get_options().map(|it| it.to_string());
+ cfg.host = Some(
+ db_config
+ .get_hosts()
+ .iter()
+ .map(|it| match it {
+ Host::Tcp(it) => it.to_string(),
+ #[cfg(unix)]
+ Host::Unix(it) => it.to_str().unwrap().to_string(),
+ })
+ .collect(),
+ );
+ cfg.ports = Some(db_config.get_ports().to_vec());
+ cfg.application_name = db_config.get_application_name().map(|it| it.to_string());
+ cfg.connect_timeout = db_config.get_connect_timeout().cloned();
+
+ let pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls).unwrap();
+ let payto = taler_config.payto();
+ let state = ServerState {
+ pool,
+ notify: Notify::new(),
+ lifetime: taler_config.http_lifetime().map(AtomicU32::new),
+ status: AtomicBool::new(true),
+ db_config,
+ payto,
+ currency: taler_config.currency,
+ auth: taler_config.auth_method(),
+ };
+ let state: &'static ServerState = Box::leak(Box::new(state));
+ std::thread::spawn(move || status_watcher(state));
+ let service = service_fn(move |req| async move {
+ state.step();
+ let start = Instant::now();
+ let (parts, body) = req.into_parts();
+ let (response, msg) = match router(&parts, body, state).await {
+ Ok(resp) => (resp, String::new()),
+ Err(err) => err.response(),
+ };
+ let status = response.status().as_u16();
+ let level = if status >= 500 {
+ Level::Error
+ } else if status >= 400 {
+ Level::Warn
+ } else {
+ Level::Info
+ };
+ log!(
+ level,
+ "{} {} {}ms {} - {}",
+ parts.method,
+ status,
+ start.elapsed().as_millis(),
+ parts.uri.path(),
+ msg
+ );
+ Ok::<Response<Body>, Infallible>(response)
+ });
+ let make_service = make_service_fn(move |_| async move { Ok::<_, Infallible>(service) });
+ let make_service_unix = make_service_fn(move |_| async move { Ok::<_, Infallible>(service) });
+
+ let mut listenfd = ListenFd::from_env();
+
+ if let Some(listener) = listenfd.take_tcp_listener(0).unwrap() {
+ info!(
+ "Server listening on activated socket {}",
+ listener.local_addr().unwrap()
+ );
+ let server = Server::from_tcp(listener)
+ .unwrap()
+ .serve(make_service)
+ .with_graceful_shutdown(state.shutdown_signal());
+ if let Err(e) = server.await {
+ error!("server: {}", e);
+ }
+ } else if let Some(path) = taler_config.unix_path() {
+ use hyperlocal::UnixServerExt;
+ info!("Server listening on unix domain socket {:?}", path);
+ if let Err(err) = std::fs::remove_file(&path) {
+ if err.kind() != std::io::ErrorKind::NotFound {
+ fail(format_args!("{}", err));
+ }
+ }
+ let server = Server::bind_unix(path)
+ .unwrap()
+ .serve(make_service_unix)
+ .with_graceful_shutdown(state.shutdown_signal());
+ if let Err(e) = server.await {
+ error!("server: {}", e);
+ }
+ } else {
+ let addr = ([0, 0, 0, 0], taler_config.port()).into();
+ info!("Server listening on http://{}", &addr);
+ let server = Server::bind(&addr)
+ .serve(make_service)
+ .with_graceful_shutdown(state.shutdown_signal());
+ if let Err(e) = server.await {
+ error!("server: {}", e);
+ }
+ };
+ info!("wire-gateway stopped");
+}
+
+/// Check if an url if a valid payto url for the configured currency
+fn check_payto(url: &Url, currency: Currency) -> bool {
+ match currency {
+ Currency::ETH(_) => check_pay_to_eth(url),
+ Currency::BTC(_) => check_pay_to_btc(url),
+ }
+}
+
+/// Check if an url is a valid bitcoin payto url
+fn check_pay_to_btc(url: &Url) -> bool {
+ return url.domain() == Some("bitcoin")
+ && url.scheme() == "payto"
+ && url.username() == ""
+ && url.password().is_none()
+ && url.query().is_none()
+ && url.fragment().is_none()
+ && bitcoin::Address::from_str(url.path().trim_start_matches('/')).is_ok();
+}
+
+/// Check if an url is a valid ethereum payto url
+fn check_pay_to_eth(url: &Url) -> bool {
+ return url.domain() == Some("ethereum")
+ && url.scheme() == "payto"
+ && url.username() == ""
+ && url.password().is_none()
+ && url.query().is_none()
+ && url.fragment().is_none()
+ && ethereum_types::H160::from_str(url.path().trim_start_matches('/')).is_ok();
+}
+
+/// Assert request method match expected
+fn assert_method(parts: &Parts, method: Method) -> Result<(), ServerError> {
+ if parts.method == method {
+ Ok(())
+ } else {
+ Err(ServerError::code(
+ StatusCode::METHOD_NOT_ALLOWED,
+ ErrorCode::GENERIC_METHOD_INVALID,
+ ))
+ }
+}
+
+/// Parse history params from request
+fn history_params(parts: &Parts) -> Result<HistoryParams, ServerError> {
+ let params: HistoryParams = serde_urlencoded::from_str(parts.uri.query().unwrap_or(""))
+ .catch_code(
+ StatusCode::BAD_REQUEST,
+ ErrorCode::GENERIC_PARAMETER_MALFORMED,
+ )?;
+ if params.delta == 0 {
+ return Err(ServerError::code(
+ StatusCode::BAD_REQUEST,
+ ErrorCode::GENERIC_PARAMETER_MALFORMED,
+ ));
+ }
+ Ok(params)
+}
+
+/// Generate sql query filter from history params
+fn sql_history_filter(params: &HistoryParams) -> String {
+ let asc = params.delta > 0;
+ let limit = params.delta.abs();
+ let order_sql = if asc { "ASC" } else { "DESC" };
+ let where_sql = if let Some(start) = params.start {
+ format!("WHERE id {} {}", if asc { '>' } else { '<' }, start)
+ } else {
+ String::new()
+ };
+ format!("{} ORDER BY id {} LIMIT {}", where_sql, order_sql, limit)
+}
+
+async fn router(
+ parts: &Parts,
+ body: Body,
+ state: &'static ServerState,
+) -> Result<Response<Body>, ServerError> {
+ // Check status error
+ if !state.status.load(Ordering::SeqCst) {
+ return Ok(Response::builder()
+ .status(StatusCode::BAD_GATEWAY)
+ .body(Body::empty())
+ .unwrap());
+ }
+
+ // Check auth method
+ match &state.auth {
+ AuthMethod::Basic(auth) => {
+ if !matches!(parts.headers
+ .get(hyper::header::AUTHORIZATION)
+ .and_then(|h| h.to_str().ok())
+ .and_then(|s| s.strip_prefix("Basic ")),
+ Some(token) if token == auth )
+ {
+ return Ok(Response::builder()
+ .status(StatusCode::UNAUTHORIZED)
+ .body(Body::empty())
+ .unwrap());
+ }
+ }
+ AuthMethod::None => {}
+ }
+
+ let response = match parts.uri.path() {
+ "/config" => {
+ assert_method(parts, Method::GET)?;
+ encode_body(
+ parts,
+ StatusCode::OK,
+ &WireConfig {
+ name: "taler-wire-gateway",
+ version: "0.0.0",
+ currency: state.currency.to_str(),
+ },
+ )
+ .unexpected()?
+ }
+ "/transfer" => {
+ assert_method(parts, Method::POST)?;
+ let request: TransferRequest = parse_body(parts, body).await.catch_code(
+ StatusCode::BAD_REQUEST,
+ ErrorCode::GENERIC_PARAMETER_MALFORMED,
+ )?;
+ if !check_payto(&request.credit_account, state.currency) {
+ return Err(ServerError::code(
+ StatusCode::BAD_REQUEST,
+ ErrorCode::GENERIC_PAYTO_URI_MALFORMED,
+ ));
+ }
+ if Currency::from_str(&request.amount.currency) != Ok(state.currency) {
+ return Err(ServerError::code(
+ StatusCode::BAD_REQUEST,
+ ErrorCode::GENERIC_PARAMETER_MALFORMED,
+ ));
+ }
+ let mut db = state.pool.get().await.catch_code(
+ StatusCode::GATEWAY_TIMEOUT,
+ ErrorCode::GENERIC_DB_FETCH_FAILED,
+ )?;
+ // Handle idempotence, check previous transaction with the same request_uid
+ let row = db.query_opt("SELECT amount, exchange_url, wtid, credit_acc, id, _date FROM tx_out WHERE request_uid = $1", &[&request.request_uid.as_slice()])
+ .await?;
+ if let Some(row) = row {
+ let prev = TransferRequest {
+ request_uid: request.request_uid.clone(),
+ amount: sql_amount(&row, 0),
+ exchange_base_url: sql_url(&row, 1),
+ wtid: ShortHashCode::from(sql_array(&row, 2)),
+ credit_account: sql_url(&row, 3),
+ };
+ if prev == request {
+ // Idempotence
+ return encode_body(
+ parts,
+ StatusCode::OK,
+ &TransferResponse {
+ timestamp: Timestamp::Time(row.get(5)),
+ row_id: sql_safe_u64(&row, 4),
+ },
+ )
+ .unexpected();
+ } else {
+ return Err(ServerError::status(StatusCode::CONFLICT));
+ }
+ }
+
+ let timestamp = Timestamp::now();
+ let tx = db.transaction().await?;
+ let row = tx.query_one("INSERT INTO tx_out (amount, wtid, debit_acc, credit_acc, exchange_url, request_uid) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id", &[
+ &request.amount.to_string(), &request.wtid.as_slice(), &state.payto.as_ref(), &request.credit_account.as_ref(), &request.exchange_base_url.as_ref(), &request.request_uid.as_slice()
+ ]).await?;
+ tx.execute("NOTIFY new_tx", &[]).await?;
+ tx.commit().await?;
+ encode_body(
+ parts,
+ StatusCode::OK,
+ &TransferResponse {
+ timestamp,
+ row_id: sql_safe_u64(&row, 0),
+ },
+ )
+ .unexpected()?
+ }
+ "/history/incoming" => {
+ assert_method(parts, Method::GET)?;
+ let params = history_params(parts)?;
+ let filter = sql_history_filter(&params);
+ let db = state.pool.get().await.catch_code(
+ StatusCode::GATEWAY_TIMEOUT,
+ ErrorCode::GENERIC_DB_FETCH_FAILED,
+ )?;
+ let transactions: Vec<_> = db
+ .query(
+ &format!("SELECT id, _date, amount, reserve_pub, debit_acc FROM tx_in {}", filter),
+ &[],
+ )
+ .await.catch_code(
+ StatusCode::BAD_GATEWAY,
+ ErrorCode::GENERIC_DB_FETCH_FAILED,
+ )?
+ .into_iter()
+ .map(| row| {
+ IncomingBankTransaction::IncomingReserveTransaction {
+ row_id: sql_safe_u64(&row, 0),
+ date: Timestamp::Time(row.get(1)),
+ amount: sql_amount(&row, 2),
+ reserve_pub: ShortHashCode::from(sql_array(&row, 3)),
+ debit_account: sql_url(&row, 4),
+ }
+ }).collect();
+ if transactions.is_empty() {
+ Response::builder()
+ .status(StatusCode::NO_CONTENT)
+ .body(Body::empty())
+ .unwrap()
+ } else {
+ encode_body(
+ parts,
+ StatusCode::OK,
+ &IncomingHistory {
+ credit_account: state.payto.clone(),
+ incoming_transactions: transactions,
+ },
+ )
+ .unexpected()?
+ }
+ }
+ "/history/outgoing" => {
+ assert_method(parts, Method::GET)?;
+ let params = history_params(parts)?;
+ let filter = sql_history_filter(&params);
+
+ let db = state.pool.get().await.catch_code(
+ StatusCode::GATEWAY_TIMEOUT,
+ ErrorCode::GENERIC_DB_FETCH_FAILED,
+ )?;
+ let transactions: Vec<_> =db
+ .query(
+ &format!("SELECT id, _date, amount, wtid, credit_acc, exchange_url FROM tx_out {}", filter),
+ &[],
+ )
+ .await.catch_code(
+ StatusCode::BAD_GATEWAY,
+ ErrorCode::GENERIC_DB_FETCH_FAILED,
+ )?
+ .into_iter()
+ .map(|row| OutgoingBankTransaction {
+ row_id: sql_safe_u64(&row, 0),
+ date: Timestamp::Time(row.get(1)),
+ amount: sql_amount(&row, 2),
+ wtid: ShortHashCode::from(sql_array(&row, 3)),
+ credit_account: sql_url(&row, 4),
+ exchange_base_url:sql_url(&row, 5),
+ }).collect();
+ if transactions.is_empty() {
+ Response::builder()
+ .status(StatusCode::NO_CONTENT)
+ .body(Body::empty())
+ .unwrap()
+ } else {
+ encode_body(
+ parts,
+ StatusCode::OK,
+ &OutgoingHistory {
+ debit_account: state.payto.clone(),
+ outgoing_transactions: transactions,
+ },
+ )
+ .unexpected()?
+ }
+ }
+ #[cfg(feature = "test")]
+ "/admin/add-incoming" => {
+ // We do not check input as this is a test admin endpoint
+ assert_method(&parts, Method::POST).unwrap();
+ let request: common::api_wire::AddIncomingRequest =
+ parse_body(&parts, body).await.unwrap();
+ let timestamp = Timestamp::now();
+ let db = state.pool.get().await.catch_code(
+ StatusCode::GATEWAY_TIMEOUT,
+ ErrorCode::GENERIC_DB_FETCH_FAILED,
+ )?;
+ let row = db.query_one("INSERT INTO tx_in (_date, amount, reserve_pub, debit_acc, credit_acc) VALUES (now(), $1, $2, $3, $4) RETURNING id", &[
+ &request.amount.to_string(), &request.reserve_pub.as_slice(), &request.debit_account.as_ref(), &"payto://bitcoin/bcrt1qgkgxkjj27g3f7s87mcvjjsghay7gh34cx39prj"
+ ]).await.catch_code(
+ StatusCode::BAD_GATEWAY,
+ ErrorCode::GENERIC_DB_FETCH_FAILED,
+ )?;
+ encode_body(
+ parts,
+ StatusCode::OK,
+ &TransferResponse {
+ timestamp,
+ row_id: sql_safe_u64(&row, 0),
+ },
+ )
+ .unexpected()?
+ }
+ _ => {
+ return Err(ServerError::code(
+ StatusCode::NOT_FOUND,
+ ErrorCode::GENERIC_ENDPOINT_UNKNOWN,
+ ))
+ }
+ };
+ Ok(response)
+}
+
+/// Listen to backend status change
+fn status_watcher(state: &'static ServerState) {
+ fn inner(state: &'static ServerState) -> Result<(), Box<dyn std::error::Error>> {
+ let mut db = state.db_config.connect(NoTls)?;
+ // Register as listener
+ db.batch_execute("LISTEN status")?;
+ loop {
+ // Sync state
+ let row = db.query_one("SELECT value FROM state WHERE name = 'status'", &[])?;
+ let status: &[u8] = row.get(0);
+ assert!(status.len() == 1 && status[0] < 2);
+ state.status.store(status[0] == 1, Ordering::SeqCst);
+ // Wait for next notification
+ db.notifications().blocking_iter().next()?;
+ }
+ }
+
+ loop {
+ if let Err(err) = inner(state) {
+ error!("status-watcher: {}", err);
+ std::thread::sleep(Duration::from_secs(5));
+ }
+ }
+}