depolymerization

wire gateway for Bitcoin/Ethereum
Log | Files | Refs | Submodules | README | LICENSE

api.rs (7935B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::sync::{
     18     Arc,
     19     atomic::{AtomicBool, Ordering},
     20 };
     21 
     22 use axum::{
     23     extract::{Request, State},
     24     http::StatusCode,
     25     middleware::Next,
     26     response::{IntoResponse as _, Response},
     27 };
     28 use sqlx::{PgPool, Row, postgres::PgListener};
     29 use taler_api::{
     30     api::{TalerApi, wire::WireGateway},
     31     error::{ApiResult, failure, failure_status, not_implemented},
     32 };
     33 use taler_common::{
     34     ExpoBackoffDecorr,
     35     api_params::{History, Page},
     36     api_wire::{
     37         AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddKycauthResponse,
     38         IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse,
     39         TransferState, TransferStatus,
     40     },
     41     error_code::ErrorCode,
     42     types::{amount::Currency, payto::PaytoURI, timestamp::Timestamp},
     43 };
     44 use tokio::{sync::watch::Sender, time::sleep};
     45 use tracing::error;
     46 
     47 use crate::{
     48     db::{self},
     49     payto::FullBtcPayto,
     50 };
     51 
/// Shared state of the wire gateway HTTP server.
pub struct ServerState {
    /// Database connection pool used by every API handler and by the status watcher
    pool: PgPool,
    /// Payto URI reported as the debit/credit account in transfer and history responses
    payto: PaytoURI,
    /// Currency returned by [`TalerApi::currency`] and passed to the database queries
    currency: Currency,
    /// Backend status flag: written by `status_watcher`, read by `status_middleware`,
    /// which rejects requests while it is `false`
    status: AtomicBool,
    /// Receives the row id carried by each `taler_in` database notification
    taler_in_channel: Sender<i64>,
    /// Receives the row id carried by each `taler_out` database notification
    taler_out_channel: Sender<i64>,
}
     60 
/// Forward database notifications to the in-process watch channels.
///
/// Delegates to the `taler_api::notification::notification_listener!` macro, registering
/// one handler per channel name:
/// - `taler_in`: the received `row_id` replaces the current value of `taler_in_channel`
/// - `taler_out`: the received `row_id` replaces the current value of `taler_out_channel`
///
/// NOTE(review): connection handling, retry policy and the conditions under which this
/// returns are defined by the macro and are not visible here; confirm against its docs.
pub async fn notification_listener(
    pool: PgPool,
    taler_in_channel: Sender<i64>,
    taler_out_channel: Sender<i64>,
) -> sqlx::Result<()> {
    taler_api::notification::notification_listener!(&pool,
        "taler_in" => (row_id: i64) {
            taler_in_channel.send_replace(row_id);
        },
        "taler_out" => (row_id: i64) {
            taler_out_channel.send_replace(row_id);
        }
    )
}
     75 
     76 impl ServerState {
     77     pub async fn start(pool: sqlx::PgPool, payto: PaytoURI, currency: Currency) -> Arc<Self> {
     78         let taler_in_channel = Sender::new(0);
     79         let taler_out_channel = Sender::new(0);
     80         let tmp = Self {
     81             pool: pool.clone(),
     82             payto,
     83             currency,
     84             status: AtomicBool::new(true),
     85             taler_in_channel: taler_in_channel.clone(),
     86             taler_out_channel: taler_out_channel.clone(),
     87         };
     88         let state = Arc::new(tmp);
     89         tokio::spawn(status_watcher(state.clone()));
     90         tokio::spawn(notification_listener(
     91             pool,
     92             taler_in_channel,
     93             taler_out_channel,
     94         ));
     95         state
     96     }
     97 }
     98 
impl TalerApi for ServerState {
    /// Currency this gateway was started with, as a string slice.
    fn currency(&self) -> &str {
        self.currency.as_ref()
    }

    /// No implementation identifier is reported: always `None`.
    fn implementation(&self) -> Option<&str> {
        None
    }
}
    108 
    109 impl WireGateway for ServerState {
    110     async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> {
    111         let creditor = FullBtcPayto::try_from(&req.credit_account)?;
    112 
    113         match db::transfer(&self.pool, &creditor, &req).await? {
    114             db::TransferResult::Success(transfer_response) => Ok(transfer_response),
    115             db::TransferResult::RequestUidReuse => Err(failure(
    116                 ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED,
    117                 format!("Request UID {} already used", req.request_uid),
    118             )),
    119             db::TransferResult::WtidReuse => Err(failure(
    120                 ErrorCode::BANK_TRANSFER_WTID_REUSED,
    121                 format!("wtid {} already used", req.request_uid),
    122             )),
    123         }
    124     }
    125 
    126     async fn transfer_page(
    127         &self,
    128         params: Page,
    129         status: Option<TransferState>,
    130     ) -> ApiResult<TransferList> {
    131         let transfers = db::transfer_page(&self.pool, &status, &params, &self.currency).await?;
    132         Ok(TransferList {
    133             transfers,
    134             debit_account: self.payto.clone(),
    135         })
    136     }
    137 
    138     async fn transfer_by_id(&self, id: u64) -> ApiResult<Option<TransferStatus>> {
    139         let status = db::transfer_by_id(&self.pool, id, &self.currency).await?;
    140         Ok(status)
    141     }
    142 
    143     async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> {
    144         let outgoing_transactions =
    145             db::outgoing_history(&self.pool, &params, &self.currency, || {
    146                 self.taler_out_channel.subscribe()
    147             })
    148             .await?;
    149         Ok(OutgoingHistory {
    150             debit_account: self.payto.clone(),
    151             outgoing_transactions,
    152         })
    153     }
    154 
    155     async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> {
    156         let incoming_transactions =
    157             db::incoming_history(&self.pool, &params, &self.currency, || {
    158                 self.taler_in_channel.subscribe()
    159             })
    160             .await?;
    161         Ok(IncomingHistory {
    162             credit_account: self.payto.clone(),
    163             incoming_transactions,
    164         })
    165     }
    166 
    167     async fn add_incoming_reserve(
    168         &self,
    169         req: AddIncomingRequest,
    170     ) -> ApiResult<AddIncomingResponse> {
    171         let debtor = FullBtcPayto::try_from(&req.debit_account)?;
    172         let timestamp = Timestamp::now();
    173         match db::register_tx_in_admin(
    174             &self.pool,
    175             &req.amount,
    176             &debtor.0,
    177             &timestamp,
    178             &req.reserve_pub,
    179         )
    180         .await?
    181         {
    182             db::AddIncomingResult::Success {
    183                 new: _,
    184                 row_id,
    185                 valued_at,
    186             } => Ok(AddIncomingResponse {
    187                 timestamp: valued_at,
    188                 row_id,
    189             }),
    190             db::AddIncomingResult::ReservePubReuse => Err(failure(
    191                 ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT,
    192                 "reserve_pub used already".to_owned(),
    193             )),
    194         }
    195     }
    196 
    197     async fn add_incoming_kyc(&self, _req: AddKycauthRequest) -> ApiResult<AddKycauthResponse> {
    198         Err(not_implemented(
    199             "depolymerizer-bitcoin does not supports KYC",
    200         ))
    201     }
    202 
    203     fn support_account_check(&self) -> bool {
    204         false
    205     }
    206 }
    207 
    208 pub async fn status_middleware(
    209     State(state): State<Arc<ServerState>>,
    210     request: Request,
    211     next: Next,
    212 ) -> Response {
    213     if !state.status.load(Ordering::Relaxed) {
    214         failure_status(
    215             ErrorCode::GENERIC_INTERNAL_INVARIANT_FAILURE,
    216             "Currency backing is compromised until the transaction reappear",
    217             StatusCode::BAD_GATEWAY,
    218         )
    219         .into_response()
    220     } else {
    221         next.run(request).await
    222     }
    223 }
    224 
    225 /// Listen to backend status change
    226 async fn status_watcher(state: Arc<ServerState>) {
    227     let mut jitter = ExpoBackoffDecorr::default();
    228     async fn inner(
    229         state: &ServerState,
    230         jitter: &mut ExpoBackoffDecorr,
    231     ) -> Result<(), sqlx::error::Error> {
    232         let mut listener = PgListener::connect_with(&state.pool).await?;
    233         listener.listen("status").await?;
    234         loop {
    235             // Sync state
    236             let row = sqlx::query("SELECT value FROM state WHERE name = 'status'")
    237                 .fetch_one(&state.pool)
    238                 .await?;
    239             let status: &[u8] = row.try_get(0)?;
    240             assert!(status.len() == 1 && status[0] < 2);
    241             state.status.store(status[0] == 1, Ordering::SeqCst);
    242             // Wait for next notification
    243             listener.recv().await?;
    244             jitter.reset();
    245         }
    246     }
    247 
    248     loop {
    249         if let Err(err) = inner(&state, &mut jitter).await {
    250             error!(target: "status-watcher", "{err}");
    251             sleep(jitter.backoff()).await;
    252         }
    253     }
    254 }