depolymerization

wire gateway for Bitcoin/Ethereum
Log | Files | Refs | Submodules | README | LICENSE

api.rs (14685B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::sync::{
     18     Arc,
     19     atomic::{AtomicBool, Ordering},
     20 };
     21 
     22 use axum::{
     23     extract::{Request, State},
     24     http::StatusCode,
     25     middleware::Next,
     26     response::{IntoResponse as _, Response},
     27 };
     28 use jiff::Timestamp;
     29 use sqlx::{PgPool, postgres::PgListener};
     30 use taler_api::{
     31     api::{
     32         TalerApi,
     33         prepared::{PreparedTransfer, simple_subject},
     34         revenue::Revenue,
     35         wire::WireGateway,
     36     },
     37     error::{ApiResult, failure_code, failure_status},
     38     subject::IncomingSubject,
     39 };
     40 use taler_common::{
     41     ExpoBackoffDecorr,
     42     api::{
     43         params::{History, Page},
     44         prepared::{RegistrationRequest, RegistrationResponse, SubjectFormat, Unregistration},
     45         revenue::RevenueIncomingHistory,
     46         wire::{
     47             AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddMappedRequest,
     48             IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse,
     49             TransferState, TransferStatus,
     50         },
     51     },
     52     error_code::ErrorCode,
     53     types::{
     54         amount::{Amount, Currency},
     55         payto::PaytoURI,
     56         timestamp::TalerTimestamp,
     57     },
     58 };
     59 use tokio::{sync::watch::Sender, time::sleep};
     60 use tracing::{debug, error, warn};
     61 
     62 use crate::{
     63     db::{
     64         self, AddIncomingResult, RegistrationResult, TransferResult, get_status,
     65         register_tx_in_admin, revenue_history, transfer, transfer_register, transfer_unregister,
     66     },
     67     payto::{BtcPayto, FullBtcPayto},
     68 };
     69 
/// Shared state behind the HTTP API handlers implemented in this file.
pub struct ServerState {
    /// Postgres connection pool used by every database call.
    pool: PgPool,
    /// Payto identity of the gateway's own account. It is returned as the
    /// debit/credit account of listings and compared against the creditor of
    /// registration requests.
    payto: FullBtcPayto,
    /// Currency reported by the API and forwarded to database queries.
    currency: Currency,
    /// Worker health flag: written by `status_watcher`, read by
    /// `status_middleware`. Starts as `true`.
    status: AtomicBool,
    /// Watch channel receiving row ids from the "tx_in" notification.
    in_channel: Sender<i64>,
    /// Watch channel receiving row ids from the "taler_in" notification.
    taler_in_channel: Sender<i64>,
    /// Watch channel receiving row ids from the "taler_out" notification.
    taler_out_channel: Sender<i64>,
}
     79 
/// Forwards database notifications to the in-process watch channels.
///
/// Delegates to `taler_api::notification::notification_listener!` with three
/// handlers: a row id received on "tx_in", "taler_in" or "taler_out" replaces
/// the current value of `in_channel`, `taler_in_channel` or
/// `taler_out_channel` respectively.
///
/// NOTE(review): connection handling, retry policy and the conditions under
/// which this function returns are defined by the macro and are not visible
/// here; confirm them in `taler_api`.
pub async fn notification_listener(
    pool: PgPool,
    in_channel: Sender<i64>,
    taler_in_channel: Sender<i64>,
    taler_out_channel: Sender<i64>,
) -> sqlx::Result<()> {
    taler_api::notification::notification_listener!(&pool,
        "tx_in" => (row_id: i64) {
            // `send_replace` stores the value even when nobody is subscribed
            in_channel.send_replace(row_id);
        },
        "taler_in" => (row_id: i64) {
            taler_in_channel.send_replace(row_id);
        },
        "taler_out" => (row_id: i64) {
            taler_out_channel.send_replace(row_id);
        }
    )
}
     98 
     99 impl ServerState {
    100     pub async fn start(pool: sqlx::PgPool, payto: FullBtcPayto, currency: Currency) -> Arc<Self> {
    101         let in_channel = Sender::new(0);
    102         let taler_in_channel = Sender::new(0);
    103         let taler_out_channel = Sender::new(0);
    104         let tmp = Self {
    105             pool: pool.clone(),
    106             payto,
    107             currency,
    108             status: AtomicBool::new(true),
    109             in_channel: in_channel.clone(),
    110             taler_in_channel: taler_in_channel.clone(),
    111             taler_out_channel: taler_out_channel.clone(),
    112         };
    113         let state = Arc::new(tmp);
    114         tokio::spawn(status_watcher(state.clone()));
    115         tokio::spawn(notification_listener(
    116             pool,
    117             in_channel,
    118             taler_in_channel,
    119             taler_out_channel,
    120         ));
    121         state
    122     }
    123 }
    124 
/// Generic API metadata for this gateway.
impl TalerApi for ServerState {
    /// Returns the currency this gateway was started with.
    fn currency(&self) -> Currency {
        self.currency
    }

    /// Returns the fixed URN identifying this implementation.
    fn implementation(&self) -> &'static str {
        "urn:net:taler:specs:depolymerizer-bitcoin:depolymerization"
    }
}
    134 
    135 async fn add_incoming(
    136     db: &PgPool,
    137     amount: Amount,
    138     debit_account: PaytoURI,
    139     subject: &IncomingSubject,
    140 ) -> ApiResult<AddIncomingResponse> {
    141     let debtor = FullBtcPayto::try_from(&debit_account)?;
    142     match register_tx_in_admin(db, &amount, &debtor.0, &Timestamp::now(), subject).await? {
    143         AddIncomingResult::Success {
    144             row_id, valued_at, ..
    145         } => Ok(AddIncomingResponse {
    146             row_id,
    147             timestamp: valued_at.into(),
    148         }),
    149         AddIncomingResult::ReservePubReuse => {
    150             Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT))
    151         }
    152         AddIncomingResult::MappingReuse => {
    153             Err(failure_code(ErrorCode::BANK_TRANSFER_MAPPING_REUSED))
    154         }
    155         AddIncomingResult::UnknownMapping => {
    156             Err(failure_code(ErrorCode::BANK_TRANSFER_MAPPING_UNKNOWN))
    157         }
    158     }
    159 }
    160 
    161 impl WireGateway for ServerState {
    162     async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> {
    163         let creditor = FullBtcPayto::try_from(&req.credit_account)?;
    164         match transfer(&self.pool, &creditor, &req).await? {
    165             TransferResult::Success(transfer_response) => Ok(transfer_response),
    166             TransferResult::RequestUidReuse => {
    167                 Err(failure_code(ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED))
    168             }
    169             TransferResult::WtidReuse => Err(failure_code(ErrorCode::BANK_TRANSFER_WTID_REUSED)),
    170         }
    171     }
    172 
    173     async fn transfer_page(
    174         &self,
    175         params: Page,
    176         status: Option<TransferState>,
    177     ) -> ApiResult<TransferList> {
    178         let transfers = db::transfer_page(&self.pool, &status, &params, &self.currency).await?;
    179         Ok(TransferList {
    180             transfers,
    181             debit_account: self.payto.as_uri(),
    182         })
    183     }
    184 
    185     async fn transfer_by_id(&self, id: u64) -> ApiResult<Option<TransferStatus>> {
    186         let status = db::transfer_by_id(&self.pool, id, &self.currency).await?;
    187         Ok(status)
    188     }
    189 
    190     async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> {
    191         let outgoing_transactions =
    192             db::outgoing_history(&self.pool, &params, &self.currency, || {
    193                 self.taler_out_channel.subscribe()
    194             })
    195             .await?;
    196         Ok(OutgoingHistory {
    197             debit_account: self.payto.as_uri(),
    198             outgoing_transactions,
    199         })
    200     }
    201 
    202     async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> {
    203         let incoming_transactions =
    204             db::incoming_history(&self.pool, &params, &self.currency, || {
    205                 self.taler_in_channel.subscribe()
    206             })
    207             .await?;
    208         Ok(IncomingHistory {
    209             credit_account: self.payto.as_uri(),
    210             incoming_transactions,
    211         })
    212     }
    213 
    214     async fn add_incoming_reserve(
    215         &self,
    216         req: AddIncomingRequest,
    217     ) -> ApiResult<AddIncomingResponse> {
    218         add_incoming(
    219             &self.pool,
    220             req.amount,
    221             req.debit_account,
    222             &IncomingSubject::Reserve(req.reserve_pub),
    223         )
    224         .await
    225     }
    226 
    227     async fn add_incoming_kyc(&self, req: AddKycauthRequest) -> ApiResult<AddIncomingResponse> {
    228         add_incoming(
    229             &self.pool,
    230             req.amount,
    231             req.debit_account,
    232             &IncomingSubject::Kyc(req.account_pub),
    233         )
    234         .await
    235     }
    236 
    237     async fn add_incoming_mapped(&self, req: AddMappedRequest) -> ApiResult<AddIncomingResponse> {
    238         add_incoming(
    239             &self.pool,
    240             req.amount,
    241             req.debit_account,
    242             &IncomingSubject::Map(req.authorization_pub),
    243         )
    244         .await
    245     }
    246 
    247     fn support_account_check(&self) -> bool {
    248         // TODO we might be able to check this ?
    249         false
    250     }
    251 }
    252 
    253 impl Revenue for ServerState {
    254     async fn history(&self, params: History) -> ApiResult<RevenueIncomingHistory> {
    255         Ok(RevenueIncomingHistory {
    256             incoming_transactions: revenue_history(&self.pool, &params, &self.currency, || {
    257                 self.in_channel.subscribe()
    258             })
    259             .await?,
    260             credit_account: self.payto.as_uri(),
    261         })
    262     }
    263 }
    264 
    265 impl PreparedTransfer for ServerState {
    266     // TODO bitcoin subject format
    267     fn supported_formats(&self) -> &[SubjectFormat] {
    268         &[SubjectFormat::SIMPLE]
    269     }
    270 
    271     async fn registration(&self, req: RegistrationRequest) -> ApiResult<RegistrationResponse> {
    272         let creditor = BtcPayto::try_from(&req.credit_account)?;
    273         if creditor.0 != self.payto.0 {
    274             return Err(failure_code(ErrorCode::BANK_UNKNOWN_CREDITOR));
    275         }
    276         match transfer_register(
    277             &self.pool,
    278             req.r#type.into(),
    279             &req.account_pub,
    280             &req.authorization_pub,
    281             &req.authorization_sig,
    282             req.recurrent,
    283             &Timestamp::now(),
    284         )
    285         .await?
    286         {
    287             RegistrationResult::Success => ApiResult::Ok(RegistrationResponse {
    288                 subjects: vec![simple_subject(req)],
    289                 expiration: TalerTimestamp::Never,
    290             }),
    291             RegistrationResult::ReservePubReuse => {
    292                 ApiResult::Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT))
    293             }
    294             RegistrationResult::SubjectReuse => {
    295                 ApiResult::Err(failure_code(ErrorCode::BANK_DERIVATION_REUSE))
    296             }
    297         }
    298     }
    299 
    300     async fn unregistration(&self, req: Unregistration) -> ApiResult<bool> {
    301         Ok(transfer_unregister(&self.pool, &req.authorization_pub, &Timestamp::now()).await?)
    302     }
    303 }
    304 
    305 pub async fn status_middleware(
    306     State(state): State<Arc<ServerState>>,
    307     request: Request,
    308     next: Next,
    309 ) -> Response {
    310     if !state.status.load(Ordering::Relaxed) {
    311         failure_status(
    312             ErrorCode::GENERIC_INTERNAL_INVARIANT_FAILURE,
    313             "Currency backing is compromised until the transaction reappear",
    314             StatusCode::BAD_GATEWAY,
    315         )
    316         .into_response()
    317     } else {
    318         next.run(request).await
    319     }
    320 }
    321 
    322 /// Listen to backend status change
    323 async fn status_watcher(state: Arc<ServerState>) {
    324     let mut jitter = ExpoBackoffDecorr::default();
    325     async fn inner(
    326         state: &ServerState,
    327         jitter: &mut ExpoBackoffDecorr,
    328     ) -> Result<(), sqlx::error::Error> {
    329         let mut listener = PgListener::connect_with(&state.pool).await?;
    330         listener.listen("status").await?;
    331         loop {
    332             // Sync state
    333             if let Some([status]) = get_status(&state.pool).await? {
    334                 assert!(status < 2);
    335                 if status == 1 {
    336                     debug!(target: "status-watcher", "Worker healthy");
    337                 } else {
    338                     debug!(target: "status-watcher", "Worker down");
    339                 }
    340                 state.status.store(status == 1, Ordering::SeqCst);
    341             } else {
    342                 warn!(target: "status-watcher", "Status not setup");
    343             }
    344             // Wait for next notification
    345             listener.recv().await?;
    346             jitter.reset();
    347         }
    348     }
    349 
    350     loop {
    351         if let Err(err) = inner(&state, &mut jitter).await {
    352             error!(target: "status-watcher", "{err}");
    353             sleep(jitter.backoff()).await;
    354         }
    355     }
    356 }
    357 
    358 #[cfg(test)]
    359 pub mod test {
    360 
    361     use std::{str::FromStr, sync::LazyLock};
    362 
    363     use axum::Router;
    364     use jiff::Timestamp;
    365     use sqlx::PgPool;
    366     use taler_api::{api::TalerRouter as _, auth::AuthMethod, subject::OutgoingSubject};
    367     use taler_common::{
    368         api::wire::{TransferState, WireConfig},
    369         types::amount::{Currency, amount},
    370     };
    371     use taler_test_utils::{
    372         db::db_test_setup,
    373         routine::{admin_add_incoming_routine, out_history_routine, transfer_routine},
    374         server::TestServer,
    375         tasks,
    376     };
    377 
    378     use crate::{
    379         CONFIG_SOURCE,
    380         api::ServerState,
    381         db::{TxOutKind, sync_out, test::rand_tx_id},
    382         payto::FullBtcPayto,
    383     };
    384 
    385     pub static EXCHANGE: LazyLock<FullBtcPayto> = LazyLock::new(|| {
    386         FullBtcPayto::from_str(
    387             "payto://bitcoin/1FfmbHfnpaZjKFvyi1okTjJJusN455paPH?receiver-name=Exchange",
    388         )
    389         .unwrap()
    390     });
    391 
    392     pub static CLIENT: LazyLock<FullBtcPayto> = LazyLock::new(|| {
    393         FullBtcPayto::from_str(
    394             "payto://bitcoin/1FfmbHfnpaZjKFvyi1okTjJJusN455paPH?receiver-name=Anonymous",
    395         )
    396         .unwrap()
    397     });
    398 
    399     async fn setup() -> (Router, PgPool) {
    400         let (_, pool) = db_test_setup(CONFIG_SOURCE).await;
    401         let api = ServerState::start(
    402             pool.clone(),
    403             EXCHANGE.clone(),
    404             Currency::from_str("BTC").unwrap(),
    405         )
    406         .await;
    407         let server = Router::new()
    408             .wire_gateway(api.clone(), AuthMethod::None)
    409             .prepared_transfer(api.clone())
    410             .revenue(api.clone(), AuthMethod::None)
    411             .finalize();
    412 
    413         (server, pool)
    414     }
    415 
    416     #[tokio::test]
    417     async fn config() {
    418         let (server, _) = setup().await;
    419         server
    420             .get("/taler-wire-gateway/config")
    421             .await
    422             .assert_ok_json::<WireConfig>();
    423     }
    424 
    425     #[tokio::test]
    426     async fn transfer() {
    427         let (server, _) = setup().await;
    428         transfer_routine(
    429             &server.prefix("/taler-wire-gateway"),
    430             TransferState::pending,
    431             &CLIENT.as_uri(),
    432         )
    433         .await;
    434     }
    435 
    436     #[tokio::test]
    437     async fn outgoing_history() {
    438         let (server, db) = setup().await;
    439         out_history_routine(
    440             &server.prefix("/taler-wire-gateway"),
    441             tasks!({
    442                 let sub = &OutgoingSubject::rand();
    443                 sync_out(
    444                     &db,
    445                     &rand_tx_id(),
    446                     None,
    447                     &amount("BTC:10"),
    448                     &EXCHANGE.0,
    449                     &TxOutKind::Talerable {
    450                         wtid: &sub.wtid,
    451                         url: &sub.exchange_base_url,
    452                         metadata: sub.metadata.as_deref(),
    453                     },
    454                     &Timestamp::now(),
    455                 )
    456                 .await
    457                 .unwrap();
    458             }),
    459             tasks!(),
    460         )
    461         .await;
    462     }
    463 
    464     #[tokio::test]
    465     async fn admin_add_incoming() {
    466         let (server, _) = setup().await;
    467         admin_add_incoming_routine(
    468             &server.prefix("/taler-wire-gateway"),
    469             &server.prefix("/taler-prepared-transfer"),
    470             &CLIENT.as_uri(),
    471             &EXCHANGE.as_uri(),
    472         )
    473         .await;
    474     }
    475 }