depolymerization

wire gateway for Bitcoin/Ethereum
Log | Files | Refs | Submodules | README | LICENSE

commit 1fc2654e3086cad482864f9cf6a11670086ae192
parent db607257abc6f7f2b04f4e2d3fb414d8ba8e4976
Author: Antoine A <>
Date:   Thu, 24 Jul 2025 17:35:00 +0200

bitcoin: rework whole SQL logic and improve worker logic

Diffstat:
MCargo.lock | 1+
Mcommon/Cargo.toml | 1+
Mcommon/src/status.rs | 45+++++++++++----------------------------------
Mdatabase-versioning/depolymerizer-bitcoin-0001.sql | 61+++++++++++++++++++++++++++++++++++++------------------------
Mdatabase-versioning/depolymerizer-bitcoin-procedures.sql | 203++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
Mdepolymerizer-bitcoin/depolymerizer-bitcoin.conf | 17-----------------
Mdepolymerizer-bitcoin/src/api.rs | 254++++++++++++++++---------------------------------------------------------------
Adepolymerizer-bitcoin/src/db.rs | 597+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Mdepolymerizer-bitcoin/src/lib.rs | 2++
Mdepolymerizer-bitcoin/src/loops/worker.rs | 393++++++++++++++++++++++++-------------------------------------------------------
Mdepolymerizer-bitcoin/src/main.rs | 14++++++--------
Mdepolymerizer-bitcoin/src/sql.rs | 35+++++++++++++++++++++++++++++++++--
Mmakefile | 2+-
13 files changed, 1055 insertions(+), 570 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -401,6 +401,7 @@ name = "common" version = "0.1.0" dependencies = [ "rand 0.9.2", + "sqlx", "taler-common", "thiserror", "uri-pack", diff --git a/common/Cargo.toml b/common/Cargo.toml @@ -8,6 +8,7 @@ repository.workspace = true license-file.workspace = true [dependencies] +sqlx = { workspace = true, features = ["macros"] } # Url format url.workspace = true # Error macros diff --git a/common/src/status.rs b/common/src/status.rs @@ -19,25 +19,14 @@ /// -> Requested API request /// Requested -> Sent Announced to the bitcoin network /// Sent -> Requested Conflicting transaction (reorg) -#[repr(u8)] -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)] +#[allow(non_camel_case_types)] +#[sqlx(type_name = "debit_status")] pub enum DebitStatus { /// Debit have been requested (default) - Requested = 0, + requested, /// Debit have been announced to the bitcoin network - Sent = 1, -} - -impl TryFrom<u8> for DebitStatus { - type Error = (); - - fn try_from(v: u8) -> Result<Self, Self::Error> { - match v { - x if x == DebitStatus::Requested as u8 => Ok(DebitStatus::Requested), - x if x == DebitStatus::Sent as u8 => Ok(DebitStatus::Sent), - _ => Err(()), - } - } + sent, } /// Bounce transaction status @@ -45,26 +34,14 @@ impl TryFrom<u8> for DebitStatus { /// Requested -> Ignored Insufficient found /// Requested -> Sent Announced to the bitcoin network /// Sent -> Requested Conflicting transaction (reorg) -#[repr(u8)] -#[derive(Debug, Clone, Copy, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)] +#[allow(non_camel_case_types)] +#[sqlx(type_name = "bounce_status")] pub enum BounceStatus { /// Bounce have been requested (default) - Requested = 0, + requested, /// Bounce will never be sent (e.g: bounce amount smaller than bounce fee) - Ignored = 1, + ignored, /// Bounce have been announced to the bitcoin network - Sent = 2, -} - -impl TryFrom<u8> for BounceStatus { - type Error = (); - - fn try_from(v: u8) -> Result<Self, Self::Error> { - match v { - x if x == BounceStatus::Requested as u8 => Ok(BounceStatus::Requested), - x if x == BounceStatus::Sent as u8 => Ok(BounceStatus::Sent), - x if x == BounceStatus::Ignored as u8 => Ok(BounceStatus::Ignored), - _ => Err(()), - } - } + sent, } diff --git a/database-versioning/depolymerizer-bitcoin-0001.sql b/database-versioning/depolymerizer-bitcoin-0001.sql @@ -21,42 +21,55 @@ SET search_path TO depolymerizer_bitcoin; CREATE TYPE taler_amount AS (val INT8, frac INT4); COMMENT ON TYPE taler_amount IS 'Stores an amount, fraction is in units of 1/100000000 of the base value'; +CREATE TYPE debit_status AS ENUM( + 'requested', + 'sent' +); +COMMENT ON TYPE debit_status IS 'Status of an outgoing transaction'; + +CREATE TYPE bounce_status AS ENUM( + 'requested', + 'ignored', + 'sent' +); +COMMENT ON TYPE bounce_status IS 'Status of a bounce'; + CREATE TABLE state ( - name TEXT NOT NULL PRIMARY KEY, - value BYTEA NOT NULL + name TEXT NOT NULL PRIMARY KEY, + value BYTEA NOT NULL ); COMMENT ON TABLE state IS 'Key value state'; CREATE TABLE tx_in ( - id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, - received INT8 NOT NULL, - amount taler_amount NOT NULL, - reserve_pub BYTEA NOT NULL UNIQUE CHECK (LENGTH(reserve_pub)=32), - debit_acc TEXT NOT NULL, - credit_acc TEXT NOT NULL + id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + received INT8 NOT NULL, + amount taler_amount NOT NULL, + reserve_pub BYTEA NOT NULL UNIQUE CHECK (LENGTH(reserve_pub)=32), + debit_acc TEXT NOT NULL, + txid BYTEA UNIQUE CHECK (LENGTH(txid)=32) ); COMMENT ON TABLE state IS 'Incoming transactions'; CREATE TABLE tx_out ( - id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, - created INT8 NOT NULL, - amount taler_amount NOT NULL, - wtid BYTEA NOT NULL UNIQUE CHECK (LENGTH(wtid)=32), - debit_acc TEXT, - credit_acc TEXT NOT NULL, - credit_name TEXT, - exchange_url TEXT NOT NULL, - request_uid BYTEA UNIQUE CHECK (LENGTH(request_uid)=64), - status SMALLINT NOT NULL DEFAULT 0, - txid BYTEA UNIQUE CHECK (LENGTH(txid)=32) + id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + created INT8 NOT NULL, + amount taler_amount NOT NULL, + wtid BYTEA NOT NULL UNIQUE CHECK (LENGTH(wtid)=32), + credit_acc TEXT NOT NULL, + credit_name TEXT, + exchange_url TEXT NOT NULL, + request_uid BYTEA UNIQUE CHECK (LENGTH(request_uid)=64), + status debit_status NOT NULL, + txid BYTEA UNIQUE CHECK (LENGTH(txid)=32) ); COMMENT ON TABLE state IS 'Outgoing transactions'; CREATE TABLE bounce ( - id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, - bounced BYTEA UNIQUE NOT NULL, - txid BYTEA UNIQUE CHECK (LENGTH(txid)=32), - created INT8 NOT NULL, - status SMALLINT NOT NULL DEFAULT 0 + id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + bounced BYTEA UNIQUE NOT NULL, + txid BYTEA UNIQUE CHECK (LENGTH(txid)=32), + created INT8 NOT NULL, + reason TEXT, + status bounce_status NOT NULL ); COMMENT ON TABLE state IS 'Bounced incoming transactions'; \ No newline at end of file diff --git a/database-versioning/depolymerizer-bitcoin-procedures.sql b/database-versioning/depolymerizer-bitcoin-procedures.sql @@ -36,4 +36,203 @@ BEGIN EXECUTE _sql; END IF; END -$do$; -\ No newline at end of file +$do$; + +CREATE FUNCTION taler_transfer( + IN in_amount taler_amount, + IN in_exchange_base_url TEXT, + IN in_credit_acc TEXT, + IN in_credit_name TEXT, + IN in_request_uid BYTEA, + IN in_wtid BYTEA, + IN in_now INT8, + -- Error status + OUT out_request_uid_reuse BOOLEAN, + OUT out_wtid_reuse BOOLEAN, + -- Success return + OUT out_transfer_row_id INT8, + OUT out_created_at INT8 +) +LANGUAGE plpgsql AS $$ +BEGIN +-- Check for idempotence and conflict +SELECT (amount != in_amount + OR credit_acc != in_credit_acc + OR credit_name != in_credit_name + OR exchange_url != in_exchange_base_url + OR wtid != in_wtid) + ,id, created + INTO out_request_uid_reuse, out_transfer_row_id, out_created_at + FROM tx_out + WHERE request_uid = in_request_uid; +IF FOUND THEN + out_wtid_reuse=FALSE; + RETURN; +END IF; +out_request_uid_reuse=FALSE; + +-- Register exchange +INSERT INTO tx_out ( + amount, + exchange_url, + credit_acc, + credit_name, + request_uid, + wtid, + created, + status +) VALUES ( + in_amount, + in_exchange_base_url, + in_credit_acc, + in_credit_name, + in_request_uid, + in_wtid, + in_now, + 'requested' +) ON CONFLICT (wtid) DO NOTHING + RETURNING id, created INTO out_transfer_row_id, out_created_at; +out_wtid_reuse=NOT FOUND; +IF out_wtid_reuse THEN + RETURN; +END IF; +-- Notify new transaction +NOTIFY new_tx; +PERFORM pg_notify('taler_out', out_transfer_row_id || ''); +END $$; +COMMENT ON FUNCTION taler_transfer IS 'Create an outgoing taler transaction and register it'; + +CREATE FUNCTION register_tx_in( + IN in_amount taler_amount, + IN in_debit_acc TEXT, + IN in_reserve_pub BYTEA, + IN in_received INT8, + IN in_txid BYTEA, + -- Error status + OUT out_reserve_pub_reuse BOOLEAN, + -- Success return + OUT out_tx_row_id INT8, + OUT out_valued_at INT8, + OUT out_new BOOLEAN +) +LANGUAGE plpgsql AS $$ +BEGIN +-- Check for idempotence and conflict +SELECT (amount != in_amount OR debit_acc != in_debit_acc OR txid != in_txid), id, received + INTO out_reserve_pub_reuse, out_tx_row_id, out_valued_at + FROM tx_in + WHERE reserve_pub = in_reserve_pub; +out_new=NOT FOUND; +IF FOUND THEN + RETURN; +END IF; +out_reserve_pub_reuse=false; + +-- Insert new incoming transaction +INSERT INTO tx_in ( + txid, + amount, + debit_acc, + reserve_pub, + received +) VALUES ( + in_txid, + in_amount, + in_debit_acc, + in_reserve_pub, + in_received +) RETURNING id, received INTO out_tx_row_id, out_valued_at; +PERFORM pg_notify('taler_in', out_tx_row_id || ''); +END $$; +COMMENT ON FUNCTION register_tx_in IS 'Register an incoming transaction idempotently'; + +CREATE FUNCTION sync_out( + IN in_txid BYTEA, + IN in_replace_txid BYTEA, + IN in_amount taler_amount, + IN in_exchange_base_url TEXT, + IN in_credit_acc TEXT, + IN in_wtid BYTEA, + IN in_created INT8, + -- Success return + OUT out_new BOOLEAN, + OUT out_replaced BOOLEAN, + OUT out_recovered BOOLEAN +) +LANGUAGE plpgsql AS $$ +DECLARE + local_id INT8; +BEGIN +-- Check replacement +IF in_replace_txid IS NOT NULL THEN + UPDATE tx_out SET txid=in_txid, status='status' WHERE txid=in_replace_txid; + IF FOUND THEN + out_new=FALSE; + out_replaced=FALSE; + out_recovered=FALSE; + RETURN; + END IF; +END IF; +out_replaced=FALSE; +out_new=NOT EXISTS(SELECT FROM tx_out WHERE wtid = in_wtid); + +IF out_new THEN + INSERT INTO tx_out ( + amount, + wtid, + credit_acc, + exchange_url, + status, + txid, + created, + request_uid + ) VALUES ( + in_amount, + in_wtid, + in_credit_acc, + in_exchange_base_url, + 'sent', + in_txid, + in_created, + NULL + ) RETURNING id INTO local_id; + out_recovered=FALSE; + PERFORM pg_notify('taler_out', local_id || ''); +ELSE + UPDATE tx_out SET txid=in_txid, status='sent' WHERE status='requested' AND wtid=in_wtid; + out_recovered=FOUND; +END IF; +END $$; +COMMENT ON FUNCTION sync_out IS 'Sync a debit blockchain state with local state'; + +CREATE FUNCTION sync_bounce( + IN in_txid BYTEA, + IN in_bounced BYTEA, + IN in_created INT8, + -- Success return + OUT out_new BOOLEAN, + OUT out_recovered BOOLEAN +) +LANGUAGE plpgsql AS $$ +BEGIN +out_new=NOT EXISTS(SELECT FROM bounce WHERE bounced = in_bounced); + +IF out_new THEN + INSERT INTO bounce ( + created, + bounced, + txid, + status + ) VALUES ( + in_created, + in_bounced, + in_txid, + 'sent' + ); + out_recovered=false; +ELSE + UPDATE bounce SET txid=in_txid, status='sent' WHERE status!='sent' AND bounced=in_bounced; + out_recovered=FOUND; +END IF; +END $$; +COMMENT ON FUNCTION sync_bounce IS 'Sync a bounce blockchain state with local state'; +\ No newline at end of file diff --git a/depolymerizer-bitcoin/depolymerizer-bitcoin.conf b/depolymerizer-bitcoin/depolymerizer-bitcoin.conf @@ -80,23 +80,6 @@ AUTH_METHOD = bearer TOKEN = -[depolymerizer-bitcoin-httpd-revenue-api] -# Whether to serve the Revenue API -ENABLED = NO - -# Authentication scheme, this can either can be basic, bearer or none. -AUTH_METHOD = bearer - -# User name for basic authentication scheme -# USERNAME = - -# Password for basic authentication scheme -# PASSWORD = - -# Token for bearer authentication scheme -TOKEN = - - [depolymerizer-bitcoindb-postgres] # DB connection string CONFIG = postgres:///depolymerizer-bitcoin diff --git a/depolymerizer-bitcoin/src/api.rs b/depolymerizer-bitcoin/src/api.rs @@ -25,14 +25,9 @@ use axum::{ middleware::Next, response::{IntoResponse as _, Response}, }; -use bitcoin::address::NetworkUnchecked; -use sqlx::{ - PgPool, QueryBuilder, Row, - postgres::{PgListener, PgRow}, -}; +use sqlx::{PgPool, Row, postgres::PgListener}; use taler_api::{ api::{TalerApi, wire::WireGateway}, - db::{BindHelper as _, TypeHelper as _, history, page}, error::{ApiResult, failure, failure_status, not_implemented}, }; use taler_common::{ @@ -40,17 +35,19 @@ use taler_common::{ api_params::{History, Page}, api_wire::{ AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddKycauthResponse, - IncomingBankTransaction, IncomingHistory, OutgoingBankTransaction, OutgoingHistory, - TransferList, TransferRequest, TransferResponse, TransferState, TransferStatus, + IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse, + TransferState, TransferStatus, }, error_code::ErrorCode, types::{amount::Currency, payto::PaytoURI, timestamp::Timestamp}, }; -use taler_common::{api_wire::TransferListStatus, types::payto::PaytoImpl}; use tokio::{sync::watch::Sender, time::sleep}; use tracing::error; -use crate::payto::{BtcWallet, FullBtcPayto}; +use crate::{ + db::{self}, + payto::FullBtcPayto, +}; pub struct ServerState { pool: PgPool, @@ -109,85 +106,21 @@ impl TalerApi for ServerState { } } -fn sql_payto<I: sqlx::ColumnIndex<PgRow>>(r: &PgRow, addr: I, name: I) -> sqlx::Result<PaytoURI> { - let addr = r - .try_get_parse::<_, _, bitcoin::Address<NetworkUnchecked>>(addr)? - .assume_checked(); - let name: Option<&str> = r.try_get(name)?; - - Ok(BtcWallet(addr) - .as_payto() - .as_full_payto(name.unwrap_or("Bitcoin User"))) -} - -fn sql_generic_payto<I: sqlx::ColumnIndex<PgRow>>(row: &PgRow, idx: I) -> sqlx::Result<PaytoURI> { - let addr = row - .try_get_parse::<_, _, bitcoin::Address<NetworkUnchecked>>(idx)? - .assume_checked(); - - Ok(BtcWallet(addr).as_payto().as_full_payto("Bitcoin User")) -} - impl WireGateway for ServerState { async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> { let creditor = FullBtcPayto::try_from(&req.credit_account)?; - // TODO use plpgsql transaction - // Handle idempotence, check previous transaction with the same request_uid - let row = sqlx::query("SELECT (amount).val, (amount).frac, exchange_url, wtid, credit_acc, credit_name, id, created FROM tx_out WHERE request_uid = $1").bind(req.request_uid.as_slice()) - .fetch_optional(&self.pool) - .await?; - if let Some(r) = row { - // TODO store names? - let prev: TransferRequest = TransferRequest { - request_uid: req.request_uid.clone(), - amount: r.try_get_amount_i(0, &self.currency)?, - exchange_base_url: r.try_get_url("exchange_url")?, - wtid: r.try_get_base32("wtid")?, - credit_account: sql_payto(&r, "credit_acc", "credit_name")?, - }; - if prev == req { - // Idempotence - return Ok(TransferResponse { - row_id: r.try_get_safeu64("id")?, - timestamp: r.try_get_timestamp("created")?, - }); - } else { - return Err(failure( - ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED, - format!("Request UID {} already used", req.request_uid), - )); - } - } - - let timestamp = Timestamp::now(); - let r = sqlx::query( - "INSERT INTO tx_out (created, amount, wtid, debit_acc, credit_acc, credit_name, exchange_url, request_uid) VALUES ($1, ($2, $3)::taler_amount, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING RETURNING id" - ) - .bind_timestamp(&Timestamp::now()) - .bind_amount(&req.amount) - .bind(req.wtid.as_slice()) - .bind(self.payto.to_string()) - .bind(creditor.0.to_string()) - .bind(&creditor.name) - .bind(req.exchange_base_url.as_str()) - .bind(req.request_uid.as_slice()) - .fetch_optional(&self.pool) - .await?; - let Some(r) = r else { - return Err(failure( + match db::transfer(&self.pool, &creditor, &req).await? { + db::TransferResult::Success(transfer_response) => Ok(transfer_response), + db::TransferResult::RequestUidReuse => Err(failure( + ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED, + format!("Request UID {} already used", req.request_uid), + )), + db::TransferResult::WtidReuse => Err(failure( ErrorCode::BANK_TRANSFER_WTID_REUSED, format!("wtid {} already used", req.request_uid), - )); - }; - let row_id = r.try_get_safeu64(0)?; - sqlx::query("NOTIFY new_tx").execute(&self.pool).await?; - sqlx::query("SELECT pg_notify('taler_out', '' || $1)") - .bind(*row_id as i64) - .execute(&self.pool) - .await?; - - Ok(TransferResponse { timestamp, row_id }) + )), + } } async fn transfer_page( @@ -195,100 +128,24 @@ impl WireGateway for ServerState { params: Page, status: Option<TransferState>, ) -> ApiResult<TransferList> { - let debit_account = self.payto.clone(); - if status.is_some_and(|s| s != TransferState::success) { - return Ok(TransferList { - transfers: Vec::new(), - debit_account, - }); - } - let transfers = page( - &self.pool, - "id", - &params, - || { - QueryBuilder::new( - " - SELECT - id, - status, - (amount).val as amount_val, - (amount).frac as amount_frac, - credit_acc, - credit_name, - created - FROM tx_out WHERE request_uid IS NOT NULL AND - ", - ) - }, - |r: PgRow| { - Ok(TransferListStatus { - row_id: r.try_get_safeu64("id")?, - // TODO Fetch inner status - status: TransferState::success, - amount: r.try_get_amount("amount", &self.currency)?, - credit_account: sql_payto(&r, "credit_acc", "credit_name")?, - timestamp: r.try_get_timestamp("created")?, - }) - }, - ) - .await?; + let transfers = db::transfer_page(&self.pool, &status, &params, &self.currency).await?; Ok(TransferList { transfers, - debit_account, + debit_account: self.payto.clone(), }) } async fn transfer_by_id(&self, id: u64) -> ApiResult<Option<TransferStatus>> { - Ok(sqlx::query( - " - SELECT - status, - (amount).val as amount_val, - (amount).frac as amount_frac, - exchange_url, - wtid, - credit_acc, - credit_name, - created - FROM tx_out WHERE request_uid IS NOT NULL AND id = $1 - ", - ) - .bind(id as i64) - .try_map(|r: PgRow| { - Ok(TransferStatus { - // TODO Fetch inner status - status: TransferState::success, - status_msg: None, - amount: r.try_get_amount("amount", &self.currency)?, - origin_exchange_url: r.try_get("exchange_url")?, - wtid: r.try_get_base32("wtid")?, - credit_account: sql_payto(&r, "credit_acc", "credit_name")?, - timestamp: r.try_get_timestamp("created")?, - }) - }) - .fetch_optional(&self.pool) - .await?) + let status = db::transfer_by_id(&self.pool, id, &self.currency).await?; + Ok(status) } async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> { - let outgoing_transactions = history( - &self.pool, - "id", - &params, - || self.taler_out_channel.subscribe(), - || QueryBuilder::new( - "SELECT id, created, (amount).val, (amount).frac, wtid, credit_acc, credit_name, exchange_url FROM tx_out WHERE" - ), |r| { - Ok(OutgoingBankTransaction { - row_id: r.try_get_safeu64(0)?, - date: r.try_get_timestamp(1)?, - amount: r.try_get_amount_i(2, &self.currency)?, - wtid: r.try_get_base32(4)?, - credit_account: sql_payto(&r, "credit_acc", "credit_name")?, - exchange_base_url: r.try_get_url("exchange_url")?, + let outgoing_transactions = + db::outgoing_history(&self.pool, &params, &self.currency, || { + self.taler_out_channel.subscribe() }) - }).await?; + .await?; Ok(OutgoingHistory { debit_account: self.payto.clone(), outgoing_transactions, @@ -296,27 +153,11 @@ impl WireGateway for ServerState { } async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> { - let incoming_transactions = history( - &self.pool, - "id", - &params, - || self.taler_in_channel.subscribe(), - || { - QueryBuilder::new( - "SELECT id, received, (amount).val, (amount).frac, reserve_pub, debit_acc FROM tx_in WHERE" - ) - }, - |r| { - Ok(IncomingBankTransaction::Reserve { - row_id: r.try_get_safeu64(0)?, - date: r.try_get_timestamp(1)?, - amount: r.try_get_amount_i(2, &self.currency)?, - reserve_pub: r.try_get_base32(4)?, - debit_account: sql_generic_payto(&r, 5)?, - }) - }, - ) - .await?; + let incoming_transactions = + db::incoming_history(&self.pool, &params, &self.currency, || { + self.taler_in_channel.subscribe() + }) + .await?; Ok(IncomingHistory { credit_account: self.payto.clone(), incoming_transactions, @@ -329,25 +170,28 @@ impl WireGateway for ServerState { ) -> ApiResult<AddIncomingResponse> { let debtor = FullBtcPayto::try_from(&req.debit_account)?; let timestamp = Timestamp::now(); - let r = sqlx::query("INSERT INTO tx_in (received, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, ($2, $3)::taler_amount, $4, $5, $6) ON CONFLICT (reserve_pub) DO NOTHING RETURNING id") - .bind_timestamp(&Timestamp::now()) - .bind_amount(&req.amount) - .bind(req.reserve_pub.as_slice()) - .bind(debtor.0.to_string()) - .bind(self.payto.to_string()) - .fetch_optional(&self.pool).await?; - let Some(r) = r else { - return Err(failure( + match db::register_tx_in_admin( + &self.pool, + &req.amount, + &debtor.0, + &timestamp, + &req.reserve_pub, + ) + .await? + { + db::AddIncomingResult::Success { + new: _, + row_id, + valued_at, + } => Ok(AddIncomingResponse { + timestamp: valued_at, + row_id, + }), + db::AddIncomingResult::ReservePubReuse => Err(failure( ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT, "reserve_pub used already".to_owned(), - )); - }; - let row_id = r.try_get_safeu64(0)?; - sqlx::query("SELECT pg_notify('taler_in', '' || $1)") - .bind(*row_id as i64) - .execute(&self.pool) - .await?; - Ok(AddIncomingResponse { timestamp, row_id }) + )), + } } async fn add_incoming_kyc(&self, _req: AddKycauthRequest) -> ApiResult<AddKycauthResponse> { diff --git a/depolymerizer-bitcoin/src/db.rs b/depolymerizer-bitcoin/src/db.rs @@ -0,0 +1,597 @@ +/* + This file is part of TALER + Copyright (C) 2025 Taler Systems SA + + TALER is free software; you can redistribute it and/or modify it under the + terms of the GNU Affero General Public License as published by the Free Software + Foundation; either version 3, or (at your option) any later version. + + TALER is distributed in the hope that it will be useful, but WITHOUT ANY + WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR + A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License along with + TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> +*/ + +use bitcoin::{Address, BlockHash}; +use bitcoin::{Txid, hashes::Hash}; +use common::{ + status::{BounceStatus, DebitStatus}, + url::Url, +}; +use sqlx::{ + PgExecutor, PgPool, QueryBuilder, Row, + postgres::{PgListener, PgRow}, +}; +use taler_api::db::{BindHelper as _, TypeHelper as _, history, page}; +use taler_common::{ + api_common::{EddsaPublicKey, SafeU64, ShortHashCode}, + api_params::{History, Page}, + api_wire::{ + IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest, + TransferResponse, TransferState, TransferStatus, + }, + types::{ + amount::{Amount, Currency}, + timestamp::Timestamp, + }, +}; +use tokio::sync::watch::Receiver; + +use crate::{ + payto::FullBtcPayto, + sql::{sql_addr, sql_btc_amount, sql_generic_payto, sql_payto}, +}; + +/// Lock the database for worker execution +pub async fn worker_lock<'a>(e: impl PgExecutor<'a>) -> sqlx::Result<bool> { + sqlx::query("SELECT pg_try_advisory_lock(42)") + .try_map(|r: PgRow| r.try_get(0)) + .fetch_one(e) + .await +} + +/// Initialize the worker status +pub async fn init_status<'a>(e: impl PgExecutor<'a>) -> sqlx::Result<()> { + sqlx::query( + "INSERT INTO state (name, value) VALUES ('status', $1) ON CONFLICT (name) DO NOTHING", + ) + .bind([1u8]) + .execute(e) + .await?; + Ok(()) +} + +/// Update the worker status +pub async fn update_status(e: &mut PgListener, new_status: bool) -> sqlx::Result<()> { + sqlx::query("UPDATE state SET value=$1 WHERE name='status'") + .bind([new_status as u8]) + .execute(&mut *e) + .await?; + sqlx::query("NOTIFY status").execute(e).await?; + Ok(()) +} + +/// Initialize the worker sync state +pub async fn init_sync_state<'a>(e: impl PgExecutor<'a>, hash: &BlockHash) -> sqlx::Result<()> { + sqlx::query( + "INSERT INTO state (name, value) VALUES ('last_hash', $1) ON CONFLICT (name) DO NOTHING", + ) + .bind(hash.as_byte_array()) + .execute(e) + .await?; + Ok(()) +} + +/// Get the current worker sync state +pub async fn get_sync_state<'a>(e: impl PgExecutor<'a>) -> sqlx::Result<BlockHash> { + sqlx::query("SELECT value FROM state WHERE name='last_hash'") + .try_map(|r: PgRow| r.try_get_map(0, BlockHash::from_slice)) + .fetch_one(e) + .await +} + +/// Update the worker sync state if it hasn't changed yet +pub async fn swap_sync_state<'a>( + e: impl PgExecutor<'a>, + from: &BlockHash, + to: &BlockHash, +) -> sqlx::Result<()> { + sqlx::query("UPDATE state SET value=$1 WHERE name='last_hash' AND value=$2") + .bind(to.as_byte_array()) + .bind(from.as_byte_array()) + .execute(e) + .await?; + Ok(()) +} + +pub enum TransferResult { + Success(TransferResponse), + RequestUidReuse, + WtidReuse, +} + +/// Initiate a new Taler transfer idempotently +pub async fn transfer<'a>( + e: impl PgExecutor<'a>, + creditor: &FullBtcPayto, + transfer: &TransferRequest, +) -> sqlx::Result<TransferResult> { + sqlx::query( + " + SELECT out_request_uid_reuse, out_wtid_reuse, out_transfer_row_id, out_created_at + FROM taler_transfer(($1, $2)::taler_amount, $3, $4, $5, $6, $7, $8) + ", + ) + .bind_amount(&transfer.amount) + .bind(transfer.exchange_base_url.as_str()) + .bind(creditor.0.to_string()) + .bind(&creditor.name) + .bind(transfer.request_uid.as_slice()) + .bind(transfer.wtid.as_slice()) + .bind_timestamp(&Timestamp::now()) + .try_map(|r: PgRow| { + Ok(if r.try_get("out_request_uid_reuse")? { + TransferResult::RequestUidReuse + } else if r.try_get("out_wtid_reuse")? { + TransferResult::WtidReuse + } else { + TransferResult::Success(TransferResponse { + row_id: r.try_get_safeu64("out_transfer_row_id")?, + timestamp: r.try_get_timestamp("out_created_at")?, + }) + }) + }) + .fetch_one(e) + .await +} + +/// Paginate initiated Taler transfers +pub async fn transfer_page<'a>( + e: impl PgExecutor<'a>, + status: &Option<TransferState>, + params: &Page, + currency: &Currency, +) -> sqlx::Result<Vec<TransferListStatus>> { + if status.is_some_and(|s| s != TransferState::success) { + return Ok(Vec::new()); + } + page( + e, + "id", + params, + || { + QueryBuilder::new( + " + SELECT + id, + status, + (amount).val as amount_val, + (amount).frac as amount_frac, + credit_acc, + credit_name, + created + FROM tx_out WHERE request_uid IS NOT NULL AND + ", + ) + }, + |r: PgRow| { + Ok(TransferListStatus { + row_id: r.try_get_safeu64("id")?, + // TODO Fetch inner status + status: TransferState::success, + amount: r.try_get_amount("amount", currency)?, + credit_account: sql_payto(&r, "credit_acc", "credit_name")?, + timestamp: r.try_get_timestamp("created")?, + }) + }, + ) + .await +} + +/// Get a Taler transfer info +pub async fn transfer_by_id<'a>( + e: impl PgExecutor<'a>, + id: u64, + currency: &Currency, +) -> sqlx::Result<Option<TransferStatus>> { + sqlx::query( + " + SELECT + status, + (amount).val as amount_val, + (amount).frac as amount_frac, + exchange_url, + wtid, + credit_acc, + credit_name, + created + FROM tx_out WHERE request_uid IS NOT NULL AND id = $1 + ", + ) + .bind(id as i64) + .try_map(|r: PgRow| { + Ok(TransferStatus { + // TODO Fetch inner status + status: TransferState::success, + status_msg: None, + amount: r.try_get_amount_i(1, currency)?, + origin_exchange_url: r.try_get(3)?, + wtid: r.try_get_base32(4)?, + credit_account: sql_payto(&r, 5, 6)?, + timestamp: r.try_get_timestamp(7)?, + }) + }) + .fetch_optional(e) + .await +} + +/// Fetch outgoing Taler transactions history +pub async fn outgoing_history( + db: &PgPool, + params: &History, + currency: &Currency, + listen: impl FnOnce() -> Receiver<i64>, +) -> sqlx::Result<Vec<OutgoingBankTransaction>> { + history( + db, + "id", + params, + listen, + || QueryBuilder::new( + "SELECT id, created, (amount).val, (amount).frac, wtid, credit_acc, credit_name, exchange_url FROM tx_out WHERE" + ), |r| { + Ok(OutgoingBankTransaction { + row_id: r.try_get_safeu64(0)?, + date: r.try_get_timestamp(1)?, + amount: r.try_get_amount_i(2, currency)?, + wtid: r.try_get_base32(4)?, + credit_account: sql_payto(&r, 5, 6)?, + exchange_base_url: r.try_get_url(7)?, + }) + }).await +} + +/// Fetch incoming Taler transactions history +pub async fn incoming_history( + db: &PgPool, + params: &History, + currency: &Currency, + listen: impl FnOnce() -> Receiver<i64>, +) -> sqlx::Result<Vec<IncomingBankTransaction>> { + history( + db, + "id", + params, + listen, + || QueryBuilder::new( + "SELECT id, received, (amount).val, (amount).frac, reserve_pub, debit_acc FROM tx_in WHERE" + ), |r| { + Ok(IncomingBankTransaction::Reserve { + row_id: r.try_get_safeu64(0)?, + date: r.try_get_timestamp(1)?, + amount: r.try_get_amount_i(2, currency)?, + reserve_pub: r.try_get_base32(4)?, + debit_account: sql_generic_payto(&r, 5)?, + }) + }).await +} + +#[derive(Debug, PartialEq, Eq)] +pub enum AddIncomingResult { + Success { + new: bool, + row_id: SafeU64, + valued_at: Timestamp, + }, + ReservePubReuse, +} + +/// Register a fake Taler credit +pub async fn register_tx_in_admin<'a>( + e: impl PgExecutor<'a>, + amount: &Amount, + debit_acc: &Address, + received: &Timestamp, + reserve_pub: &EddsaPublicKey, +) -> sqlx::Result<AddIncomingResult> { + sqlx::query( + " + SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new + FROM register_tx_in(($1, $2)::taler_amount, $3, $4, $5, NULL) + ", + ) + .bind_amount(amount) + .bind(debit_acc.to_string()) + .bind(reserve_pub.as_slice()) + .bind_timestamp(received) + .try_map(|r: PgRow| { + Ok(if r.try_get(0)? { + AddIncomingResult::ReservePubReuse + } else { + AddIncomingResult::Success { + row_id: r.try_get_safeu64(1)?, + valued_at: r.try_get_timestamp(2)?, + new: r.try_get(3)?, + } + }) + }) + .fetch_one(e) + .await +} + +/// Register a Taler credit +pub async fn register_tx_in<'a>( + e: impl PgExecutor<'a>, + txid: &Txid, + amount: &Amount, + debit_acc: &Address, + received: &Timestamp, + reserve_pub: &EddsaPublicKey, +) -> sqlx::Result<AddIncomingResult> { + sqlx::query( + " + SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new + FROM register_tx_in(($1, $2)::taler_amount, $3, $4, $5, $6) + ", + ) + .bind_amount(amount) + .bind(debit_acc.to_string()) + .bind(reserve_pub.as_slice()) + .bind_timestamp(received) + .bind(txid.as_byte_array()) + .try_map(|r: PgRow| { + Ok(if r.try_get(0)? { + AddIncomingResult::ReservePubReuse + } else { + AddIncomingResult::Success { + row_id: r.try_get_safeu64(1)?, + valued_at: r.try_get_timestamp(2)?, + new: r.try_get(3)?, + } + }) + }) + .fetch_one(e) + .await +} + +/// Update a transaction id after bumping it +pub async fn bump_tx_id<'a>( + e: impl PgExecutor<'a>, + from: &Txid, + to: &Txid, +) -> sqlx::Result<ShortHashCode> { + sqlx::query("UPDATE tx_out SET txid=$1 WHERE txid=$2 RETURNING wtid") + .bind(to.as_byte_array()) + .bind(from.as_byte_array()) + .try_map(|r: PgRow| r.try_get_base32(0)) + .fetch_one(e) + .await +} + +/// Reset the state of a conflicted debit +pub async fn conflict_tx_out<'a>(e: impl PgExecutor<'a>, id: &Txid) -> sqlx::Result<bool> { + Ok( + sqlx::query("UPDATE tx_out SET status=$1, txid=NULL where txid=$2") + .bind(DebitStatus::requested) + .bind(id.as_byte_array()) + .execute(e) + .await? + .rows_affected() + > 0, + ) +} + +/// Reset the state of a conflicted bounce +pub async fn conflict_bounce<'a>(e: impl PgExecutor<'a>, id: &Txid) -> sqlx::Result<bool> { + Ok( + sqlx::query("UPDATE bounce SET status=$1, txid=NULL where txid=$2") + .bind(BounceStatus::requested) + .bind(id.as_byte_array()) + .execute(e) + .await? + .rows_affected() + > 0, + ) +} + +/// Initiate a bounce +pub async fn bounce<'a>(e: impl PgExecutor<'a>, txid: &Txid, reason: &str) -> sqlx::Result<()> { + sqlx::query("INSERT INTO bounce (created, bounced, reason, status) VALUES ($1, $2, $3, 'requested') ON CONFLICT (bounced) DO NOTHING") + .bind_timestamp(&Timestamp::now()) + .bind(txid.as_byte_array()) + .bind(reason) + .execute(e) + .await?; + Ok(()) +} + +pub enum ProblematicTx { + In { + txid: Txid, + addr: Address, + reserve_pub: EddsaPublicKey, + }, + Bounce { + txid: Txid, + bounced: Txid, + }, +} + +/// Handle transactions being removed during a reorganization +pub async fn reorg<'a>(e: impl PgExecutor<'a>, ids: &[Txid]) -> sqlx::Result<Vec<ProblematicTx>> { + // A removed incoming transaction is a correctness issues in only two cases: + // - it is a confirmed credit registered in the database + // - it is an invalid transactions already bounced + // Those two cases can compromise bitcoin backing + // Removed outgoing transactions will be retried automatically by the node + sqlx::query( + " + SELECT tx_in.txid, NULL, debit_acc, tx_in.reserve_pub + FROM tx_in WHERE tx_in.txid = ANY($1) + UNION ALL + SELECT bounce.txid, bounce.bounced, NULL, NULL + from bounce WHERE bounce.bounced = ANY($1); + ", + ) + .bind(ids.iter().map(|it| it.as_byte_array()).collect::<Vec<_>>()) + .try_map(|r: PgRow| { + let txid = r.try_get_map(0, Txid::from_slice)?; + let check: Option<&[u8]> = r.try_get(1)?; + Ok(if check.is_some() { + ProblematicTx::Bounce { + txid, + bounced: r.try_get_map(1, Txid::from_slice)?, + } + } else { + ProblematicTx::In { + txid, + addr: sql_addr(&r, 2)?, + reserve_pub: r.try_get_base32(3)?, + } + }) + }) + .fetch_all(e) + .await +} + +pub enum SyncOutResult { + New, + Replaced, + Recovered, + None, +} + +pub async fn sync_out<'a>( + e: impl PgExecutor<'a>, + txid: &Txid, + replace_txid: Option<&Txid>, + amount: &Amount, + exchange_url: &Url, + credit_acc: &Address, + wtid: &ShortHashCode, + created: &Timestamp, +) -> sqlx::Result<SyncOutResult> { + sqlx::query( + " + SELECT out_replaced, out_recovered, out_new + FROM sync_out($1, $2, ($3, $4)::taler_amount, $5, $6, $7, $8) + ", + ) + .bind(txid.as_byte_array()) + .bind(replace_txid.map(|it| it.as_byte_array())) + .bind_amount(amount) + .bind(exchange_url.to_string()) + .bind(&credit_acc.to_string()) + .bind(wtid.as_slice()) + .bind_timestamp(created) + .try_map(|r: PgRow| { + Ok(if r.try_get(0)? { + SyncOutResult::Replaced + } else if r.try_get(1)? { + SyncOutResult::Recovered + } else if r.try_get(2)? { + SyncOutResult::New + } else { + SyncOutResult::None + }) + }) + .fetch_one(e) + .await +} + +pub async fn pending_debit<'a>( + e: impl PgExecutor<'a>, + currency: &Currency, +) -> sqlx::Result<Option<(i64, bitcoin::Amount, ShortHashCode, Address, Url)>> { + sqlx::query( + "SELECT id, (amount).val, (amount).frac, wtid, credit_acc, exchange_url FROM tx_out WHERE status='requested' ORDER BY created LIMIT 1", + ) + .try_map(|r: PgRow| { + Ok(( + r.try_get(0)?, + sql_btc_amount(&r, 1, currency)?, + r.try_get_base32(3)?, + sql_addr(&r, 4)?, + r.try_get_parse(5)? + )) + }) + .fetch_optional(e) + .await +} + +pub async fn debit_sent<'a>(e: impl PgExecutor<'a>, id: i64, txid: &Txid) -> sqlx::Result<()> { + sqlx::query("UPDATE tx_out SET status='sent', txid=$1 WHERE id=$2") + .bind(txid.as_byte_array()) + .bind(id) + .execute(e) + .await?; + Ok(()) +} + +pub async fn pending_bounce<'a>( + e: impl PgExecutor<'a>, +) -> sqlx::Result<Option<(i64, Txid, Option<String>)>> { + sqlx::query( + "SELECT id, bounced, reason FROM bounce WHERE status='requested' ORDER BY created LIMIT 1", + ) + .try_map(|r: PgRow| { + Ok(( + r.try_get(0)?, + r.try_get_map(1, Txid::from_slice)?, + r.try_get(2)?, + )) + }) + .fetch_optional(e) + .await +} + +pub async fn bounce_set_status<'a>( + e: impl PgExecutor<'a>, + id: i64, + txid: Option<&Txid>, + status: &BounceStatus, +) -> sqlx::Result<()> { + sqlx::query("UPDATE bounce SET txid=$1, status=$2 WHERE id=$3") + .bind(txid.map(|it| it.as_byte_array())) + .bind(status) + .bind(id) + .execute(e) + .await?; + Ok(()) +} + +pub enum SyncBounceResult { + New, + Recovered, + None, +} + +pub async fn sync_bounce<'a>( + e: impl PgExecutor<'a>, + txid: &Txid, + bounced: &Txid, + created: &Timestamp, +) -> sqlx::Result<SyncBounceResult> { + sqlx::query( + " + SELECT out_recovered, out_new + FROM sync_bounce($1, $2, $3) + ", + ) + .bind(txid.as_byte_array()) + .bind(bounced.as_byte_array()) + .bind_timestamp(created) + .try_map(|r: PgRow| { + Ok(if r.try_get(0)? { + SyncBounceResult::Recovered + } else if r.try_get(1)? { + SyncBounceResult::New + } else { + SyncBounceResult::None + }) + }) + .fetch_one(e) + .await +} diff --git a/depolymerizer-bitcoin/src/lib.rs b/depolymerizer-bitcoin/src/lib.rs @@ -23,10 +23,12 @@ use taler_common::{api_common::EddsaPublicKey, config::parser::ConfigSource}; pub mod api; pub mod config; +pub mod db; pub mod payto; pub mod rpc; pub mod rpc_utils; pub mod segwit; +pub mod sql; pub mod taler_utils; pub const CONFIG_SOURCE: ConfigSource = ConfigSource::simple("depolymerizer-bitcoin"); diff --git a/depolymerizer-bitcoin/src/loops/worker.rs b/depolymerizer-bitcoin/src/loops/worker.rs @@ -18,26 +18,22 @@ use std::{collections::HashMap, fmt::Write, time::SystemTime}; use bitcoin::{Amount as BtcAmount, BlockHash, Txid, hashes::Hash}; use common::{ metadata::OutMetadata, - status::{BounceStatus, DebitStatus}, + status::BounceStatus, taler_common::{api_common::ShortHashCode, types::timestamp::Timestamp}, }; use depolymerizer_bitcoin::{ GetOpReturnErr, GetSegwitErr, + db::{self, AddIncomingResult, SyncBounceResult, SyncOutResult, worker_lock}, rpc::{self, Category, ErrorCode, Rpc, Transaction, rpc_wallet}, rpc_utils::sender_address, taler_utils::btc_to_taler, }; -use sqlx::{PgPool, Row, postgres::PgListener}; -use taler_api::db::{BindHelper as _, TypeHelper}; -use taler_common::ExpoBackoffDecorr; +use sqlx::{PgPool, postgres::PgListener}; +use taler_common::{ExpoBackoffDecorr, types::url}; use tokio::time::sleep; use tracing::{error, info, warn}; -use crate::{ - WorkerCfg, - fail_point::fail_point, - sql::{sql_addr, sql_btc_amount, sql_txid}, -}; +use crate::{WorkerCfg, fail_point::fail_point}; use super::{LoopError, LoopResult, analysis::analysis}; @@ -63,11 +59,7 @@ pub async fn worker(mut state: WorkerCfg, pool: PgPool) { // on postgres advisory lock // Take the lock - let row = sqlx::query("SELECT pg_try_advisory_lock(42)") - .fetch_one(&mut *db) - .await?; - let locked: bool = row.try_get(0)?; - if !locked { + if !worker_lock(&mut *db).await? { return Err(LoopError::Concurrency); } @@ -111,13 +103,7 @@ pub async fn worker(mut state: WorkerCfg, pool: PgPool) { for id in stuck { let bump = rpc.bump_fee(&id).await?; fail_point("(injected) fail bump", 0.3)?; - let row = - sqlx::query("UPDATE tx_out SET txid=$1 WHERE txid=$2 RETURNING wtid") - .bind(bump.txid.as_byte_array()) - .bind(id.as_byte_array()) - .fetch_one(&mut *db) - .await?; - let wtid: ShortHashCode = row.try_get_base32(0)?; + let wtid = db::bump_tx_id(&mut *db, &id, &bump.txid).await?; info!(">> (bump) {wtid} replace {id} with {}", bump.txid); } @@ -149,14 +135,6 @@ pub async fn worker(mut state: WorkerCfg, pool: PgPool) { } } -/// Retrieve last stored hash -async fn last_hash(db: &mut PgListener) -> sqlx::Result<BlockHash> { - let row = sqlx::query("SELECT value FROM state WHERE name='last_hash'") - .fetch_one(db) - .await?; - Ok(BlockHash::from_slice(row.get(0)).unwrap()) -} - /// Parse new transactions, return stuck transactions if the database is up to date with the latest mined block async fn sync_chain( rpc: &mut Rpc, @@ -165,7 +143,7 @@ async fn sync_chain( status: &mut bool, ) -> LoopResult<Option<Vec<Txid>>> { // Get stored last_hash - let last_hash = last_hash(db).await?; + let sync_state = db::get_sync_state(&mut *db).await?; // Get the current confirmation delay let conf_delay = state.confirmation; @@ -176,7 +154,7 @@ async fn sync_chain( BlockHash, ) = { // Get all transactions made since this block - let list = rpc.list_since_block(Some(&last_hash), conf_delay).await?; + let list = rpc.list_since_block(Some(&sync_state), conf_delay).await?; // Only keep ids and category let txs = list .transactions @@ -192,15 +170,11 @@ async fn sync_chain( }; // Check if a confirmed incoming transaction have been removed by a blockchain reorganization - let new_status = sync_chain_removed(&txs, &removed, rpc, db, conf_delay as i32).await?; + let new_status = sync_chain_removed(removed, db, conf_delay as i32).await?; // Sync status with database if *status != new_status { - sqlx::query("UPDATE state SET value=$1 WHERE name='status'") - .bind([new_status as u8].as_slice()) - .execute(&mut *db) - .await?; - sqlx::query("NOTIFY status").execute(&mut *db).await?; + db::update_status(db, new_status).await?; *status = new_status; if new_status { info!("Recovered lost transactions"); @@ -229,20 +203,14 @@ async fn sync_chain( } // Move last_hash forward - sqlx::query("UPDATE state SET value=$1 WHERE name='last_hash' AND value=$2") - .bind(lastblock.as_byte_array()) - .bind(last_hash.as_byte_array().as_slice()) - .execute(db) - .await?; + db::swap_sync_state(db, &sync_state, &lastblock).await?; Ok(Some(stuck)) } /// Sync database with removed transactions, return false if bitcoin backing is compromised async fn sync_chain_removed( - txs: &HashMap<Txid, (Category, i32)>, - removed: &HashMap<Txid, (Category, i32)>, - rpc: &mut Rpc, + removed: HashMap<Txid, (Category, i32)>, db: &mut PgListener, min_confirmations: i32, ) -> LoopResult<bool> { @@ -252,67 +220,35 @@ async fn sync_chain_removed( // Those two cases can compromise bitcoin backing // Removed outgoing transactions will be retried automatically by the node - let mut blocking_debit = Vec::new(); - let mut blocking_bounce = Vec::new(); + let potential_problematic_ids: Vec<Txid> = removed + .into_iter() + .filter_map(|(id, (cat, confirmations))| { + (cat == Category::Receive && confirmations < min_confirmations).then_some(id) + }) + .collect(); // Only keep incoming transaction that are not reconfirmed - // TODO study risk of accepting only mined transactions for faster recovery - for (id, _) in removed.iter().filter(|(id, (cat, _))| { - *cat == Category::Receive - && txs - .get(*id) - .map(|(_, confirmations)| *confirmations < min_confirmations) - .unwrap_or(true) - }) { - match rpc.get_tx_segwit_key(id).await { - Ok((full, key)) => { - // Credits are only problematic if not reconfirmed and stored in the database - if sqlx::query("SELECT 1 FROM tx_in WHERE reserve_pub=$1") - .bind(key.as_slice()) - .fetch_optional(&mut *db) - .await? - .is_some() - { - let debit_addr = sender_address(rpc, &full).await?; - blocking_debit.push((key, id, debit_addr)); - } - } - Err(err) => match err { - GetSegwitErr::Decode(_) => { - // Invalid tx are only problematic if already bounced - if let Some(row) = - sqlx::query("SELECT txid FROM bounce WHERE bounced=$1 AND txid IS NOT NULL") - .bind(id.as_byte_array()) - .fetch_optional(&mut *db) - .await? - { - blocking_bounce.push((sql_txid(&row, 0)?, id)); - } else { - // Remove transaction from bounce table - sqlx::query("DELETE FROM bounce WHERE bounced=$1") - .bind(id.as_byte_array()) - .execute(&mut *db) - .await?; - } - } - GetSegwitErr::RPC(it) => return Err(it.into()), - }, - } + let problematic_tx = db::reorg(&mut *db, &potential_problematic_ids).await?; + if problematic_tx.is_empty() { + return Ok(true); } - - if !blocking_bounce.is_empty() || !blocking_debit.is_empty() { - let mut buf = "The following transaction have been removed from the blockchain, bitcoin backing is compromised until the transaction reappear:".to_string(); - for (key, id, addr) in blocking_debit { - write!(&mut buf, "\n\tcredit {key} in {id} from {addr}",).unwrap(); - } - for (id, bounced) in blocking_bounce { - write!(&mut buf, "\n\tbounced {id} in {bounced}").unwrap(); + let mut buf = "The following transaction have been removed from the blockchain, bitcoin backing is compromised until the transaction reappear:".to_string(); + for tx in problematic_tx { + match tx { + db::ProblematicTx::In { + txid, + addr, + reserve_pub, + } => { + write!(&mut buf, "\n\tcredit {reserve_pub} in {txid} from {addr}",).unwrap(); + } + db::ProblematicTx::Bounce { txid, bounced } => { + write!(&mut buf, "\n\tbounced {txid} in {bounced}").unwrap(); + } } - error!("{buf}"); - Ok(false) - } else { - Ok(true) } + error!("{buf}"); + Ok(false) } /// Sync database with an incoming confirmed transaction @@ -322,27 +258,37 @@ async fn sync_chain_incoming_confirmed( db: &mut PgListener, state: &WorkerCfg, ) -> Result<(), LoopError> { - match rpc.get_tx_segwit_key(id).await { + match rpc.get_tx_segwit_key(&id).await { Ok((full, reserve_pub)) => { // Store transactions in database let debit_addr = sender_address(rpc, &full).await?; - let credit_addr = full.details[0].address.clone().unwrap().assume_checked(); let amount = btc_to_taler(&full.amount, &state.currency); - if let Some(row) = sqlx::query("INSERT INTO tx_in (received, amount, reserve_pub, debit_acc, credit_acc) VALUES ($1, ($2, $3)::taler_amount, $4, $5, $6) ON CONFLICT (reserve_pub) DO NOTHING RETURNING id") - .bind((full.time * 1000000) as i64).bind_amount(&amount).bind(reserve_pub.as_slice()).bind(debit_addr.to_string()).bind(credit_addr.to_string()).fetch_optional(&mut *db).await? { - info!("<< {amount} {reserve_pub} in {id} from {debit_addr}"); - let id: i64 = row.try_get(0)?; - sqlx::query("SELECT pg_notify('taler_in', $1)").bind(id.to_string()).execute(db).await?; + match db::register_tx_in( + &mut *db, + id, + &amount, + &debit_addr, + &Timestamp::from_sql_micros((full.time * 1000000) as i64).unwrap(), + &reserve_pub, + ) + .await? + { + AddIncomingResult::Success { + new, + row_id: _, + valued_at: _, + } => { + if new { + info!("<< {amount} {reserve_pub} in {id} from {debit_addr}"); + } + } + AddIncomingResult::ReservePubReuse => { + db::bounce(db, &id, "reserve_pub reuse").await? + } } } Err(err) => match err { - GetSegwitErr::Decode(_) => { - // If encoding is wrong request a bounce - sqlx::query( - "INSERT INTO bounce (created, bounced) VALUES ($1, $2) ON CONFLICT (bounced) DO NOTHING") - .bind_timestamp(&Timestamp::now()).bind(id.as_byte_array()) - .execute(db).await?; - } + GetSegwitErr::Decode(e) => db::bounce(db, &id, &e.to_string()).await?, GetSegwitErr::RPC(e) => return Err(e.into()), }, } @@ -351,10 +297,9 @@ async fn sync_chain_incoming_confirmed( /// Sync database with a debit transaction, return true if stuck async fn sync_chain_debit( - id: &Txid, + txid: &Txid, full: &Transaction, wtid: &ShortHashCode, - rpc: &mut Rpc, db: &mut PgListener, confirmations: i32, state: &WorkerCfg, @@ -363,84 +308,36 @@ async fn sync_chain_debit( let amount = btc_to_taler(&full.amount, &state.currency); if confirmations < 0 { - if full.replaced_by_txid.is_none() { - // Handle conflicting tx - let nb_row = sqlx::query("UPDATE tx_out SET status=$1, txid=NULL where txid=$2") - .bind(DebitStatus::Requested as i16) - .bind(id.as_byte_array()) - .execute(db) - .await? - .rows_affected(); - if nb_row > 0 { - warn!(">> (conflict) {wtid} in {id} to {credit_addr}"); - } + // Handle conflicting tx + if full.replaced_by_txid.is_none() && db::conflict_tx_out(db, txid).await? { + warn!(">> (conflict) {wtid} in {txid} to {credit_addr}"); } } else { - // Get previous out tx - if let Some(row) = sqlx::query("SELECT id,status,txid FROM tx_out WHERE wtid=$1 FOR UPDATE") - .bind(wtid.as_slice()) - .fetch_optional(&mut *db) - .await? + match db::sync_out( + db, + txid, + full.replaced_by_txid.as_ref(), + &amount, + &url("https://exchange.url.TODO/"), + &credit_addr, + wtid, + &Timestamp::from_sql_micros((full.time * 1000000) as i64).unwrap(), + ) + .await? { - // If already in database, sync status - let row_id: i64 = row.get(0); - let status: i16 = row.get(1); - match DebitStatus::try_from(status as u8).unwrap() { - DebitStatus::Requested => { - let nb_row = sqlx::query( - "UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3 AND status=$4", - ) - .bind(DebitStatus::Sent as i16) - .bind(id.as_byte_array()) - .bind(row_id) - .bind(status) - .execute(db) - .await? - .rows_affected(); - if nb_row > 0 { - warn!(">> (recovered) {amount} {wtid} in {id} to {credit_addr}"); - } - } - DebitStatus::Sent => { - if let Some(txid) = full.replaces_txid { - let stored_id = sql_txid(&row, 2)?; - if txid == stored_id { - let nb_row = sqlx::query("UPDATE tx_out SET txid=$1 WHERE txid=$2") - .bind(id.as_byte_array()) - .bind(txid.as_byte_array()) - .execute(db) - .await? - .rows_affected(); - if nb_row > 0 { - info!(">> (recovered) {wtid} replace {txid} with {id}",); - } - } - } - } + SyncOutResult::New => { + warn!(">> (onchain) {amount} {wtid} in {txid} to {credit_addr}"); } - } else { - // Else add to database - let debit_addr = sender_address(rpc, full).await?; - let update = sqlx::query( - "INSERT INTO tx_out (created, amount, wtid, debit_acc, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, ($2, $3)::taler_amount, $4, $5, $6, $7, $8, $9, $10) ON CONFLICT (wtid) DO NOTHING RETURNING id") - .bind((full.time*1000000) as i64) - .bind_amount(&amount) - .bind(wtid.as_slice()) - .bind(debit_addr.to_string()) - .bind(credit_addr.to_string()) - .bind("https://exchange.url.TODO/") - .bind(DebitStatus::Sent as i16) - .bind(id.as_byte_array()) - .bind(None::<&[u8]>) - .fetch_optional(&mut *db).await?; - if let Some(row) = update { - warn!(">> (onchain) {amount} {wtid} in {id} to {credit_addr}"); - let id: i64 = row.try_get(0)?; - sqlx::query("SELECT pg_notify('taler_out', $1)") - .bind(id) - .execute(db) - .await?; + SyncOutResult::Replaced => { + info!( + ">> (recovered) {wtid} replace {txid} with {}", + full.replaced_by_txid.unwrap() + ) + } + SyncOutResult::Recovered => { + warn!(">> (recovered) {amount} {wtid} in {txid} to {credit_addr}") } + SyncOutResult::None => {} } // Check if stuck @@ -461,61 +358,21 @@ async fn sync_chain_debit( /// Sync database with an outgoing bounce transaction async fn sync_chain_bounce( - id: &Txid, + txid: &Txid, bounced: &Txid, db: &mut PgListener, confirmations: i32, ) -> LoopResult<()> { if confirmations < 0 { // Handle conflicting tx - let nb_row = sqlx::query("UPDATE bounce SET status=$1, txid=NULL where txid=$2") - .bind(BounceStatus::Requested as i16) - .bind(id.as_byte_array()) - .execute(&mut *db) - .await? - .rows_affected(); - if nb_row > 0 { - warn!("|| (conflict) {bounced} in {id}"); + if db::conflict_bounce(db, txid).await? { + warn!("|| (conflict) {bounced} in {txid}"); } } else { - // Get previous bounce - let row = sqlx::query("SELECT id, status FROM bounce WHERE bounced=$1") - .bind(bounced.as_byte_array()) - .fetch_optional(&mut *db) - .await?; - if let Some(row) = row { - // If already in database, sync status - let row_id: i64 = row.get(0); - let status: i16 = row.get(1); - match BounceStatus::try_from(status as u8).unwrap() { - BounceStatus::Requested => { - let nb_row = sqlx::query( - "UPDATE bounce SET status=$1, txid=$2 WHERE id=$3 AND status=$4", - ) - .bind(BounceStatus::Sent as i16) - .bind(id.as_byte_array()) - .bind(row_id) - .bind(status) - .execute(db) - .await? - .rows_affected(); - if nb_row > 0 { - warn!("|| (recovered) {bounced} in {id}"); - } - } - BounceStatus::Ignored => { - error!("watcher: ignored bounce {bounced} found in chain at {id}") - } - BounceStatus::Sent => { /* Status is correct */ } - } - } else { - // Else add to database - let nb = sqlx::query( - "INSERT INTO bounce (created, bounced, txid, status) VALUES ($1, $2, $3, $4) ON CONFLICT (txid) DO NOTHING") - .bind_timestamp(&Timestamp::now()).bind(bounced.as_byte_array()).bind(id.as_byte_array()).bind(BounceStatus::Sent as i16).execute(db).await?.rows_affected(); - if nb > 0 { - warn!("|| (onchain) {bounced} in {id}"); - } + match db::sync_bounce(db, txid, bounced, &Timestamp::now()).await? { + SyncBounceResult::New => warn!("|| (onchain) {bounced} in {txid}"), + SyncBounceResult::Recovered => warn!("|| (recovered) {bounced} in {txid}"), + SyncBounceResult::None => {} } } @@ -537,7 +394,7 @@ async fn sync_chain_outgoing( { Ok((full, Ok(info))) => match info { OutMetadata::Debit { wtid, .. } => { - return sync_chain_debit(id, &full, &wtid, rpc, db, confirmations, state).await; + return sync_chain_debit(id, &full, &wtid, db, confirmations, state).await; } OutMetadata::Bounce { bounced } => { sync_chain_bounce(id, &Txid::from_byte_array(bounced), db, confirmations).await? @@ -555,46 +412,31 @@ async fn sync_chain_outgoing( /// Send a debit transaction on the blockchain, return false if no more requested transactions are found async fn debit(db: &mut PgListener, rpc: &mut Rpc, state: &WorkerCfg) -> LoopResult<bool> { // We rely on the advisory lock to ensure we are the only one sending transactions - let row = sqlx::query( - "SELECT id, (amount).val, (amount).frac, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 ORDER BY created LIMIT 1").bind(DebitStatus::Requested as i16).fetch_optional(&mut *db).await?; - if let Some(row) = &row { - let id: i64 = row.get(0); - let amount = sql_btc_amount(row, 1, &state.currency)?; - let wtid: ShortHashCode = row.try_get_base32(3)?; - let addr = sql_addr(row, 4)?; - let url = row.try_get_parse(5)?; + if let Some((id, amount, wtid, addr, url)) = + db::pending_debit(&mut *db, &state.currency).await? + { let metadata = OutMetadata::Debit { wtid: wtid.clone(), url, }; - let tx_id = rpc + let txid = rpc .send(&addr, &amount, Some(&metadata.encode().unwrap()), false) .await?; fail_point("(injected) fail debit", 0.3)?; - sqlx::query("UPDATE tx_out SET status=$1, txid=$2 WHERE id=$3") - .bind(DebitStatus::Sent as i16) - .bind(tx_id.as_byte_array()) - .bind(id) - .execute(db) - .await?; + db::debit_sent(db, id, &txid).await?; let amount = btc_to_taler(&amount.to_signed().unwrap(), &state.currency); - info!(">> {amount} {wtid} in {tx_id} to {addr}"); + info!(">> {amount} {wtid} in {txid} to {addr}"); + Ok(true) + } else { + Ok(false) } - Ok(row.is_some()) } /// Bounce a transaction on the blockchain, return false if no more requested transactions are found async fn bounce(db: &mut PgListener, rpc: &mut Rpc, fee: &BtcAmount) -> LoopResult<bool> { // We rely on the advisory lock to ensure we are the only one sending transactions - let row = - sqlx::query("SELECT id, bounced FROM bounce WHERE status=$1 ORDER BY created LIMIT 1") - .bind(BounceStatus::Requested as i16) - .fetch_optional(&mut *db) - .await?; - if let Some(row) = &row { - let id: i64 = row.get(0); - let bounced: Txid = sql_txid(row, 1)?; + if let Some((id, bounced, reason)) = db::pending_bounce(&mut *db).await? { let metadata = OutMetadata::Bounce { bounced: *bounced.as_byte_array(), }; @@ -603,31 +445,28 @@ async fn bounce(db: &mut PgListener, rpc: &mut Rpc, fee: &BtcAmount) -> LoopResu .bounce(&bounced, fee, Some(&metadata.encode().unwrap())) .await { - Ok(it) => { + Ok(txid) => { fail_point("(injected) fail bounce", 0.3)?; - sqlx::query("UPDATE bounce SET txid=$1, status=$2 WHERE id=$3") - .bind(it.as_byte_array()) - .bind(BounceStatus::Sent as i16) - .bind(id) - .execute(db) - .await?; - info!("|| {bounced} in {it}"); + db::bounce_set_status(db, id, Some(&txid), &BounceStatus::sent).await?; + if let Some(reason) = reason { + info!("|| {bounced} in {txid}: {reason}"); + } else { + info!("|| {bounced} in {txid}"); + } } Err(err) => match err { rpc::Error::RPC { code: ErrorCode::RpcWalletInsufficientFunds | ErrorCode::RpcWalletError, msg, } => { - sqlx::query("UPDATE bounce SET status=$1 WHERE id=$2") - .bind(BounceStatus::Ignored as i16) - .bind(id) - .execute(db) - .await?; + db::bounce_set_status(db, id, None, &BounceStatus::ignored).await?; info!("|| (ignore) {bounced} because {msg}"); } e => Err(e)?, }, } + Ok(true) + } else { + Ok(false) } - Ok(row.is_some()) } diff --git a/depolymerizer-bitcoin/src/main.rs b/depolymerizer-bitcoin/src/main.rs @@ -14,12 +14,12 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ use axum::{Router, middleware}; -use bitcoin::hashes::Hash; use clap::Parser; use depolymerizer_bitcoin::{ CONFIG_SOURCE, DB_SCHEMA, api::{ServerState, status_middleware}, config::{ServeCfg, WorkerCfg, parse_db_cfg}, + db, rpc::Rpc, }; use taler_api::api::TalerRouter as _; @@ -30,13 +30,13 @@ use taler_common::{ db::{dbinit, pool}, taler_main, }; +use tokio::try_join; use tracing::info; use crate::loops::{watcher::watcher, worker::worker}; mod fail_point; mod loops; -mod sql; /// Taler adapter for bitcoincore #[derive(clap::Parser, Debug)] @@ -120,12 +120,10 @@ async fn app(args: Args, cfg: Config) -> anyhow::Result<()> { let pool = pool(db_cfg.cfg, DB_SCHEMA).await?; // Init status to true - sqlx::query("INSERT INTO state (name, value) VALUES ('status', $1) ON CONFLICT (name) DO NOTHING") - .bind([1u8]) - .execute(&pool).await?; - sqlx::query("INSERT INTO state (name, value) VALUES ('last_hash', $1) ON CONFLICT (name) DO NOTHING") - .bind(genesis_hash.as_byte_array().as_slice()) - .execute(&pool).await?; + try_join!( + db::init_status(&pool), + db::init_sync_state(&pool, &genesis_hash) + )?; // TODO reset ? println!("Database initialised"); diff --git a/depolymerizer-bitcoin/src/sql.rs b/depolymerizer-bitcoin/src/sql.rs @@ -15,10 +15,15 @@ */ use bitcoin::{Address, Amount as BtcAmount, Txid, address::NetworkUnchecked, hashes::Hash}; -use depolymerizer_bitcoin::taler_utils::taler_to_btc; +use sqlx::Row; use sqlx::postgres::PgRow; use taler_api::db::TypeHelper; -use taler_common::types::amount::Currency; +use taler_common::types::{ + amount::Currency, + payto::{PaytoImpl as _, PaytoURI}, +}; + +use crate::{payto::BtcWallet, taler_utils::taler_to_btc}; /// Bitcoin amount from sql pub fn sql_btc_amount(row: &PgRow, idx: usize, currency: &Currency) -> sqlx::Result<BtcAmount> { @@ -37,3 +42,29 @@ pub fn sql_addr(row: &PgRow, idx: usize) -> sqlx::Result<Address> { pub fn sql_txid(row: &PgRow, idx: usize) -> sqlx::Result<Txid> { row.try_get_map(idx, Txid::from_slice) } + +pub fn sql_payto<I: sqlx::ColumnIndex<PgRow>>( + r: &PgRow, + addr: I, + name: I, +) -> sqlx::Result<PaytoURI> { + let addr = r + .try_get_parse::<_, _, bitcoin::Address<NetworkUnchecked>>(addr)? + .assume_checked(); + let name: Option<&str> = r.try_get(name)?; + + Ok(BtcWallet(addr) + .as_payto() + .as_full_payto(name.unwrap_or("Bitcoin User"))) +} + +pub fn sql_generic_payto<I: sqlx::ColumnIndex<PgRow>>( + row: &PgRow, + idx: I, +) -> sqlx::Result<PaytoURI> { + let addr = row + .try_get_parse::<_, _, bitcoin::Address<NetworkUnchecked>>(idx)? + .assume_checked(); + + Ok(BtcWallet(addr).as_payto().as_full_payto("Bitcoin User")) +} diff --git a/makefile b/makefile @@ -41,7 +41,7 @@ check: install-nobuild-files cargo test .PHONY: test -test: install +test: install-nobuild-files RUST_BACKTRACE=true cargo run --profile dev --bin instrumentation .PHONY: doc