depolymerization

wire gateway for Bitcoin/Ethereum
Log | Files | Refs | Submodules | README | LICENSE

commit 9ef4dce283e9c5c6c8fb14ff296692e82a433e5c
parent 42a5714dd465d7699a0907bf3a2083ab8430aabf
Author: Antoine A <>
Date:   Fri, 19 Jun 2026 18:37:28 +0200

common: big upgrade
-support all new APIs at the exception of bitcoin specific subject format
-improve outgoing transaction finality logic
-support outgoing transaction metadata
-logic for kyc and mapping
-support bitcoin v31.0
-improve testbench

Diffstat:
MCargo.toml | 6++++--
MREADME.md | 2+-
Mcontrib/ci/Containerfile | 3++-
Mcontrib/ci/jobs/3-deb-amd64/job.sh | 2+-
Mdebian/etc/depolymerizer-bitcoin/conf.d/depolymerizer-bitcoin-httpd.conf | 11+++++++++--
Mdebian/etc/depolymerizer-bitcoin/conf.d/depolymerizer-bitcoin-worker.conf | 5++++-
Mdebian/etc/depolymerizer-bitcoin/depolymerizer-bitcoin.conf | 7++++++-
Mdebian/etc/depolymerizer-bitcoin/secrets/depolymerizer-bitcoin-httpd.secret.conf | 4++++
Mdepolymerizer-bitcoin/Cargo.toml | 70+++++-----------------------------------------------------------------
Mdepolymerizer-bitcoin/benches/metadata.rs | 7++++---
Mdepolymerizer-bitcoin/db/depolymerizer-bitcoin-0001.sql | 109++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------
Mdepolymerizer-bitcoin/db/depolymerizer-bitcoin-procedures.sql | 377+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------
Mdepolymerizer-bitcoin/depolymerizer-bitcoin.conf | 27+++++++++++++++++++++++++--
Mdepolymerizer-bitcoin/src/api.rs | 351++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------
Mdepolymerizer-bitcoin/src/bin/segwit-demo.rs | 4++--
Mdepolymerizer-bitcoin/src/cli.rs | 19+++++++++----------
Mdepolymerizer-bitcoin/src/config.rs | 35+++++++++++++++++++----------------
Mdepolymerizer-bitcoin/src/db.rs | 1232+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++------------------
Mdepolymerizer-bitcoin/src/lib.rs | 20++++++--------------
Mdepolymerizer-bitcoin/src/loops/watcher.rs | 5++---
Mdepolymerizer-bitcoin/src/loops/worker.rs | 463++++++++++++++++++++++++++++++++++++++++++++-----------------------------------
Mdepolymerizer-bitcoin/src/main.rs | 6+++---
Mdepolymerizer-bitcoin/src/payto.rs | 13+++++++------
Mdepolymerizer-bitcoin/src/rpc.rs | 91++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------------
Mdepolymerizer-bitcoin/src/segwit.rs | 17++++++++++-------
Mdepolymerizer-bitcoin/src/setup.rs | 8++++----
Mdepolymerizer-bitcoin/src/sql.rs | 13+++++--------
Ddepolymerizer-bitcoin/tests/api.rs | 105-------------------------------------------------------------------------------
Mdepolymerizer-common/Cargo.toml | 5+++--
Mdepolymerizer-common/src/lib.rs | 6+++---
Mdepolymerizer-common/src/metadata.rs | 111++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------------
Mdepolymerizer-common/src/status.rs | 12+++++++++---
Mmakefile | 24++++++++++++++----------
Arustfmt.toml | 2++
Mscript/prepare.sh | 2+-
Mtestbench/Cargo.toml | 3+--
Mtestbench/src/btc.rs | 372++++++++++++++++++++++++++++++++++++++++++++-----------------------------------
Mtestbench/src/main.rs | 42++++++++++++++++++++++--------------------
Mtestbench/src/utils.rs | 55++++++++++++++++++++++++++++++++++---------------------
Muri-pack/src/lib.rs | 8++++----
40 files changed, 2442 insertions(+), 1212 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml @@ -44,7 +44,9 @@ hex = { package = "const-hex", version = "1.9.1" } clap = { version = "4.5", features = ["derive"] } anyhow = "1" tracing = "0.1" -criterion = "0.7" +criterion = "0.8.2" base64 = "0.22.1" -rand = { version = "0.9.0" } +rand = { version = "0.10.1" } tracing-subscriber = "0.3" +jiff = { version = "0.2", default-features = false, features = ["perf-inline", "std"] } +compact_str = { version = "0.9.0", features = ["serde", "sqlx-postgres"] } diff --git a/README.md b/README.md @@ -7,7 +7,7 @@ Depolymerization is a GNU Taler project to create blockchain adapters ### depolymerizer-bitcoin -Depolymerizer for the bitcoin blockchain using bitcoind v29. Supports the +Depolymerizer for the bitcoin blockchain using bitcoind v31. Supports the [Taler Wire Gateway API](https://docs.taler.net/core/api-bank-wire.html). ## Project structure diff --git a/contrib/ci/Containerfile b/contrib/ci/Containerfile @@ -15,7 +15,8 @@ RUN apt-get update -yq && \ make \ postgresql \ debhelper \ - build-essential && \ + build-essential \ + git-buildpackage && \ # Install rustup toolchain rustup default stable diff --git a/contrib/ci/jobs/3-deb-amd64/job.sh b/contrib/ci/jobs/3-deb-amd64/job.sh @@ -4,11 +4,11 @@ set -exuo pipefail # Update system apt-get update -yq apt-get upgrade -yq -cargo install cargo-deb # Build package export VERSION="$(./contrib/ci/version.sh)" echo "Building package version ${VERSION}" +EMAIL=none gbp dch --dch-opt=-b --ignore-branch --debian-tag="%(version)s" --git-author --new-version="${VERSION}" make deb # Test package diff --git a/debian/etc/depolymerizer-bitcoin/conf.d/depolymerizer-bitcoin-httpd.conf b/debian/etc/depolymerizer-bitcoin/conf.d/depolymerizer-bitcoin-httpd.conf @@ -1,5 +1,13 @@ # Configuration the bitcoin depolymerizer worker REST API. +[depolymerizer-bitcoin-httpd] +SERVE = systemd + [depolymerizer-bitcoin-httpd-wire-gateway-api] # ENABLED = YES -@inline-secret@ depolymerizer-bitcoin-httpd-wire-gateway-api ../secrets/depolymerizer-bitcoin-httpd.secret.conf -\ No newline at end of file +@inline-secret@ depolymerizer-bitcoin-httpd-wire-gateway-api ../secrets/depolymerizer-bitcoin-httpd.secret.conf + +[depolymerizer-bitcoin-httpd-revenue-api] +# ENABLED = YES +@inline-secret@ depolymerizer-bitcoin-httpd-revenue-api ../secrets/depolymerizer-bitcoin-httpd.secret.conf + diff --git a/debian/etc/depolymerizer-bitcoin/conf.d/depolymerizer-bitcoin-worker.conf b/debian/etc/depolymerizer-bitcoin/conf.d/depolymerizer-bitcoin-worker.conf @@ -1,6 +1,9 @@ # Configuration the bitcoin depolymerizer worker. [depolymerizer-bitcoin-worker] -RPC_COOKIE_FILE = /var/lib/bitcoind/.cookie +# Name of the wallet to sync WALLET_NAME = + +RPC_COOKIE_FILE = /var/lib/bitcoind/.cookie + @inline-secret@ depolymerizer-bitcoin-worker ../secrets/depolymerizer-bitcoin-worker.secret.conf \ No newline at end of file diff --git a/debian/etc/depolymerizer-bitcoin/depolymerizer-bitcoin.conf b/debian/etc/depolymerizer-bitcoin/depolymerizer-bitcoin.conf @@ -26,6 +26,11 @@ @inline@ overrides.conf [depolymerizer-bitcoin] -CURRENCY = +# Adapter currency +CURRENCY = BTC + +# Bitcoin wallet address to advertise WALLET = + +# Legal entity that is associated with the Bitcoin wallet NAME = \ No newline at end of file diff --git a/debian/etc/depolymerizer-bitcoin/secrets/depolymerizer-bitcoin-httpd.secret.conf b/debian/etc/depolymerizer-bitcoin/secrets/depolymerizer-bitcoin-httpd.secret.conf @@ -1,3 +1,7 @@ [depolymerizer-bitcoin-httpd-wire-gateway-api] # AUTH_METHOD = bearer +# TOKEN = + +[depolymerizer-bitcoin-httpd-revenue-api] +# AUTH_METHOD = bearer # TOKEN = \ No newline at end of file diff --git a/depolymerizer-bitcoin/Cargo.toml b/depolymerizer-bitcoin/Cargo.toml @@ -13,7 +13,7 @@ license-file.workspace = true fail = [] [dependencies] -bech32 = "0.11.0" +bech32 = "0.12.0" serde_repr = "0.1.16" depolymerizer-common.workspace = true bitcoin.workspace = true @@ -33,6 +33,8 @@ axum.workspace = true base64.workspace = true rand.workspace = true url.workspace = true +jiff.workspace = true +compact_str.workspace = true [dev-dependencies] criterion.workspace = true @@ -40,67 +42,4 @@ taler-test-utils.workspace = true [[bench]] name = "metadata" -harness = false - - -[package.metadata.deb] -name = "depolymerizer-bitcoin" -priority = "optional" -section = "net" -maintainer = "Taler Systems SA <deb@taler.net>" -maintainer-scripts = "../debian/" -systemd-units = [ - { unit-name = "depolymerizer-bitcoin", enable = false, start = false, stop-on-upgrade = false }, - { unit-name = "depolymerizer-bitcoin-httpd", enable = false, start = false, stop-on-upgrade = false }, - { unit-name = "depolymerizer-bitcoin-worker", enable = false, start = false, stop-on-upgrade = false }, - { unit-name = "depolymerizer-bitcoin-node", enable = false, start = false, stop-on-upgrade = false }, -] -recommends = ["nginx | apache2 | httpd", "postgresql (>= 15.0)"] -assets = [ - # Binary - [ - "target/release/depolymerizer-bitcoin", - "/usr/bin/", - "755", - ], - # Scripts - [ - "../contrib/depolymerizer-bitcoin-dbconfig", - "/usr/bin/", - "755", - ], - # Sql - [ - "../depolymerizer-common/db/versioning.sql", - "/usr/share/depolymerizer-bitcoin/sql/", - "644", - ], - [ - "db/depolymerizer-bitcoin*.sql", - "/usr/share/depolymerizer-bitcoin/sql/", - "644", - ], - # Default config - [ - "depolymerizer-bitcoin.conf", - "/usr/share/depolymerizer-bitcoin/config.d/", - "644", - ], - # Configs - [ - "../debian/etc/**/*", - "/etc", - "644", - ], - # Man pages - [ - "../doc/prebuilt/man/depolymerizer-bitcoin.1", - "/usr/share/man/man1/", - "644", - ], - [ - "../doc/prebuilt/man/depolymerizer-bitcoin.conf.5", - "/usr/share/man/man5/", - "644", - ], -] +harness = false +\ No newline at end of file diff --git a/depolymerizer-bitcoin/benches/metadata.rs b/depolymerizer-bitcoin/benches/metadata.rs @@ -1,7 +1,6 @@ -use bech32::Hrp; /* This file is part of TALER - Copyright (C) 2022-2025 Taler Systems SA + Copyright (C) 2022-2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -14,15 +13,17 @@ use bech32::Hrp; You should have received a copy of the GNU Affero General Public License along with TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ +use bech32::Hrp; use criterion::{Criterion, criterion_group, criterion_main}; use depolymerizer_bitcoin::segwit::{decode_segwit_msg, encode_segwit_key, rand_addresses}; +use depolymerizer_common::rand_slice; fn criterion_benchmark(c: &mut Criterion) { let mut group = c.benchmark_group("SegWit addresses"); let hrp = Hrp::parse("bench").unwrap(); group.bench_function("encode", |b| { b.iter_batched( - || rand_slice(), + rand_slice, |key| encode_segwit_key(hrp, &key), criterion::BatchSize::SmallInput, ); diff --git a/depolymerizer-bitcoin/db/depolymerizer-bitcoin-0001.sql b/depolymerizer-bitcoin/db/depolymerizer-bitcoin-0001.sql @@ -1,6 +1,6 @@ -- -- This file is part of TALER --- Copyright (C) 2025 Taler Systems SA +-- Copyright (C) 2025, 2026 Taler Systems SA -- -- TALER is free software; you can redistribute it and/or modify it under the -- terms of the GNU General Public License as published by the Free Software @@ -21,19 +21,6 @@ SET search_path TO depolymerizer_bitcoin; CREATE TYPE taler_amount AS (val INT8, frac INT4); COMMENT ON TYPE taler_amount IS 'Stores an amount, fraction is in units of 1/100000000 of the base value'; -CREATE TYPE debit_status AS ENUM( - 'requested', - 'sent' -); -COMMENT ON TYPE debit_status IS 'Status of an outgoing transaction'; - -CREATE TYPE bounce_status AS ENUM( - 'requested', - 'ignored', - 'sent' -); -COMMENT ON TYPE bounce_status IS 'Status of a bounce'; - CREATE TABLE state ( name TEXT NOT NULL PRIMARY KEY, value BYTEA NOT NULL @@ -41,34 +28,100 @@ CREATE TABLE state ( COMMENT ON TABLE state IS 'Key value state'; CREATE TABLE tx_in ( - id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, - received INT8 NOT NULL, + tx_in_id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + received_at INT8 NOT NULL, amount taler_amount NOT NULL, - reserve_pub BYTEA NOT NULL UNIQUE CHECK (LENGTH(reserve_pub)=32), debit_acc TEXT NOT NULL, txid BYTEA UNIQUE CHECK (LENGTH(txid)=32) ); -COMMENT ON TABLE state IS 'Incoming transactions'; +COMMENT ON TABLE tx_in IS 'Incoming transactions'; CREATE TABLE tx_out ( - id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, - created INT8 NOT NULL, + tx_out_id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + created_at INT8 NOT NULL, + amount taler_amount NOT NULL, + credit_acc TEXT NOT NULL, + txid BYTEA NOT NULL UNIQUE CHECK (LENGTH(txid)=32) +); +COMMENT ON TABLE tx_out IS 'Outgoing transactions'; + +CREATE TYPE incoming_type AS ENUM + ('reserve' ,'kyc', 'map'); +COMMENT ON TYPE incoming_type IS 'Types of incoming talerable transactions'; + +CREATE TABLE taler_in ( + tx_in_id INT8 PRIMARY KEY REFERENCES tx_in(tx_in_id) ON DELETE CASCADE, + type incoming_type NOT NULL, + metadata BYTEA NOT NULL CHECK (LENGTH(metadata)=32), + authorization_pub BYTEA CHECK (LENGTH(authorization_pub)=32), + authorization_sig BYTEA CHECK (LENGTH(authorization_sig)=64) +); +COMMENT ON TABLE tx_in IS 'Incoming talerable transactions'; + +CREATE UNIQUE INDEX taler_in_unique_reserve_pub ON taler_in (metadata) WHERE type = 'reserve'; + +CREATE TABLE taler_out( + tx_out_id INT8 PRIMARY KEY REFERENCES tx_out(tx_out_id) ON DELETE CASCADE, + wtid BYTEA NOT NULL UNIQUE CHECK (LENGTH(wtid)=32), + exchange_base_url TEXT NOT NULL, + metadata TEXT +); +COMMENT ON TABLE taler_out IS 'Outgoing talerable transactions'; + +CREATE TABLE prepared_in ( + type incoming_type NOT NULL, + account_pub BYTEA NOT NULL CHECK (LENGTH(account_pub)=32), + authorization_pub BYTEA UNIQUE NOT NULL CHECK (LENGTH(authorization_pub)=32), + authorization_sig BYTEA NOT NULL CHECK (LENGTH(authorization_sig)=64), + recurrent BOOLEAN NOT NULL, + registered_at INT8 NOT NULL, + tx_in_id INT8 UNIQUE REFERENCES tx_in(tx_in_id) ON DELETE CASCADE +); +COMMENT ON TABLE prepared_in IS 'Prepared incoming transaction'; +CREATE UNIQUE INDEX prepared_in_unique_reserve_pub + ON prepared_in (account_pub) WHERE type = 'reserve'; + +CREATE TABLE pending_recurrent_in( + tx_in_id INT8 NOT NULL UNIQUE REFERENCES tx_in(tx_in_id) ON DELETE CASCADE, + authorization_pub BYTEA NOT NULL REFERENCES prepared_in(authorization_pub) +); +CREATE INDEX pending_recurrent_inc_auth_pub + ON pending_recurrent_in (authorization_pub); +COMMENT ON TABLE pending_recurrent_in IS 'Pending recurrent incoming transaction'; + +CREATE TYPE debit_status AS ENUM( + 'requested', + 'sent', + 'confirmed' +); +COMMENT ON TYPE debit_status IS 'Status of an outgoing transaction'; + +CREATE TABLE transfer ( + transfer_id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, + txid BYTEA UNIQUE CHECK (LENGTH(txid)=32), + created_at INT8 NOT NULL, amount taler_amount NOT NULL, wtid BYTEA NOT NULL UNIQUE CHECK (LENGTH(wtid)=32), credit_acc TEXT NOT NULL, credit_name TEXT, exchange_url TEXT NOT NULL, - request_uid BYTEA UNIQUE CHECK (LENGTH(request_uid)=64), - status debit_status NOT NULL, - txid BYTEA UNIQUE CHECK (LENGTH(txid)=32) + request_uid BYTEA NOT NULL UNIQUE CHECK (LENGTH(request_uid)=64), + metadata TEXT, + status debit_status NOT NULL ); -COMMENT ON TABLE state IS 'Outgoing transactions'; +COMMENT ON TABLE transfer IS 'Wire Gateway transfers'; + +CREATE TYPE bounce_status AS ENUM( + 'requested', + 'ignored', + 'sent', + 'confirmed' +); +COMMENT ON TYPE bounce_status IS 'Status of a bounce'; -CREATE TABLE bounce ( - id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, - bounced BYTEA UNIQUE NOT NULL, +CREATE TABLE bounced ( + tx_in_id INT8 NOT NULL UNIQUE REFERENCES tx_in(tx_in_id) ON DELETE SET NULL, txid BYTEA UNIQUE CHECK (LENGTH(txid)=32), - created INT8 NOT NULL, reason TEXT, status bounce_status NOT NULL ); diff --git a/depolymerizer-bitcoin/db/depolymerizer-bitcoin-procedures.sql b/depolymerizer-bitcoin/db/depolymerizer-bitcoin-procedures.sql @@ -1,6 +1,6 @@ -- -- This file is part of TALER --- Copyright (C) 2025 Taler Systems SA +-- Copyright (C) 2025, 2026 Taler Systems SA -- -- TALER is free software; you can redistribute it and/or modify it under the -- terms of the GNU General Public License as published by the Free Software @@ -45,6 +45,7 @@ CREATE FUNCTION taler_transfer( IN in_credit_name TEXT, IN in_request_uid BYTEA, IN in_wtid BYTEA, + IN in_metadata TEXT, IN in_now INT8, -- Error status OUT out_request_uid_reuse BOOLEAN, @@ -61,25 +62,24 @@ SELECT (amount != in_amount OR credit_name != in_credit_name OR exchange_url != in_exchange_base_url OR wtid != in_wtid) - ,id, created + ,transfer_id, created_at INTO out_request_uid_reuse, out_transfer_row_id, out_created_at - FROM tx_out + FROM transfer WHERE request_uid = in_request_uid; IF FOUND THEN - out_wtid_reuse=FALSE; RETURN; END IF; -out_request_uid_reuse=FALSE; --- Register exchange -INSERT INTO tx_out ( +-- Register a transfer operation +INSERT INTO transfer ( amount, exchange_url, credit_acc, credit_name, request_uid, wtid, - created, + metadata, + created_at, status ) VALUES ( in_amount, @@ -88,72 +88,161 @@ INSERT INTO tx_out ( in_credit_name, in_request_uid, in_wtid, + in_metadata, in_now, 'requested' ) ON CONFLICT (wtid) DO NOTHING - RETURNING id, created INTO out_transfer_row_id, out_created_at; + RETURNING transfer_id, created_at INTO out_transfer_row_id, out_created_at; out_wtid_reuse=NOT FOUND; IF out_wtid_reuse THEN RETURN; END IF; -- Notify new transaction -PERFORM pg_notify('taler_out', out_transfer_row_id || ''); +PERFORM pg_notify('transfer', out_transfer_row_id || ''); END $$; COMMENT ON FUNCTION taler_transfer IS 'Create an outgoing taler transaction and register it'; CREATE FUNCTION register_tx_in( + IN in_txid BYTEA, IN in_amount taler_amount, IN in_debit_acc TEXT, - IN in_reserve_pub BYTEA, - IN in_received INT8, - IN in_txid BYTEA, + IN in_received_at INT8, + IN in_type incoming_type, + IN in_metadata BYTEA, -- Error status OUT out_reserve_pub_reuse BOOLEAN, + OUT out_mapping_reuse BOOLEAN, + OUT out_unknown_mapping BOOLEAN, -- Success return OUT out_tx_row_id INT8, OUT out_valued_at INT8, - OUT out_new BOOLEAN + OUT out_new BOOLEAN, + OUT out_pending BOOLEAN ) LANGUAGE plpgsql AS $$ +DECLARE +local_authorization_pub BYTEA; +local_authorization_sig BYTEA; BEGIN --- Check for idempotence and conflict -SELECT (amount != in_amount OR debit_acc != in_debit_acc OR txid != in_txid), id, received - INTO out_reserve_pub_reuse, out_tx_row_id, out_valued_at - FROM tx_in - WHERE reserve_pub = in_reserve_pub; +out_pending=false; + +-- Check for idempotence, txid is a hash of the transaction data, if the txid match all info match +SELECT tx_in_id, received_at INTO out_tx_row_id, out_valued_at FROM tx_in WHERE txid = in_txid; out_new=NOT FOUND; -IF FOUND THEN +IF NOT out_new THEN + RETURN; +END IF; + +-- Resolve mapping logic +IF in_type = 'map' THEN + SELECT type, account_pub, authorization_pub, authorization_sig, + tx_in_id IS NOT NULL AND NOT recurrent, + tx_in_id IS NOT NULL AND recurrent + INTO in_type, in_metadata, local_authorization_pub, local_authorization_sig, out_mapping_reuse, out_pending + FROM prepared_in + WHERE authorization_pub = in_metadata; + out_unknown_mapping = NOT FOUND; + IF out_unknown_mapping OR out_mapping_reuse THEN + RETURN; + END IF; +END IF; + +-- Check conflict +out_reserve_pub_reuse=NOT out_pending AND in_type = 'reserve' AND EXISTS(SELECT FROM taler_in WHERE metadata = in_metadata AND type = 'reserve'); +IF out_reserve_pub_reuse THEN RETURN; END IF; -out_reserve_pub_reuse=false; -- Insert new incoming transaction INSERT INTO tx_in ( txid, amount, debit_acc, - reserve_pub, - received + received_at ) VALUES ( in_txid, in_amount, in_debit_acc, - in_reserve_pub, - in_received -) RETURNING id, received INTO out_tx_row_id, out_valued_at; -PERFORM pg_notify('taler_in', out_tx_row_id || ''); + in_received_at +) RETURNING tx_in_id, received_at INTO out_tx_row_id, out_valued_at; +-- Notify new incoming transaction registration +PERFORM pg_notify('tx_in', out_tx_row_id || ''); + +IF out_pending THEN + -- Delay talerable registration until mapping again + INSERT INTO pending_recurrent_in (tx_in_id, authorization_pub) + VALUES (out_tx_row_id, local_authorization_pub); +ELSIF in_type IS NOT NULL THEN + UPDATE prepared_in + SET tx_in_id = out_tx_row_id + WHERE (tx_in_id IS NULL AND account_pub = in_metadata) OR authorization_pub = local_authorization_pub; + -- Insert new incoming talerable tranreceived_atsaction + INSERT INTO taler_in ( + tx_in_id, + type, + metadata, + authorization_pub, + authorization_sig + ) VALUES ( + out_tx_row_id, + in_type, + in_metadata, + local_authorization_pub, + local_authorization_sig + ); + -- Notify new incoming talerable transaction registration + PERFORM pg_notify('taler_in', out_tx_row_id || ''); +END IF; END $$; COMMENT ON FUNCTION register_tx_in IS 'Register an incoming transaction idempotently'; + +CREATE FUNCTION register_bounce_tx_in( + IN in_txid BYTEA, + IN in_amount taler_amount, + IN in_debit_acc TEXT, + IN in_received_at INT8, + IN in_reason TEXT, + IN in_now INT8, + -- Success return + OUT out_tx_row_id INT8, + OUT out_tx_new BOOLEAN, + OUT out_bounce_row_id INT8, + OUT out_bounce_new BOOLEAN +) +LANGUAGE plpgsql AS $$ +BEGIN +-- Register incoming transaction idempotently +SELECT register_tx_in.out_tx_row_id, register_tx_in.out_new +INTO out_tx_row_id, out_tx_new +FROM register_tx_in(in_txid, in_amount, in_debit_acc, in_received_at, NULL, NULL); + +-- Register bounce +INSERT INTO bounced( + tx_in_id, + reason, + status +) VALUES ( + out_tx_row_id, + in_reason, + 'requested' +) ON CONFLICT (tx_in_id) DO NOTHING; +END $$; +COMMENT ON FUNCTION register_bounce_tx_in IS 'Register an incoming transaction and bounce it idempotently'; + CREATE FUNCTION sync_out( IN in_txid BYTEA, IN in_replace_txid BYTEA, IN in_amount taler_amount, - IN in_exchange_base_url TEXT, IN in_credit_acc TEXT, IN in_wtid BYTEA, - IN in_created INT8, + IN in_exchange_base_url TEXT, + IN in_metadata TEXT, + IN in_bounced_txid BYTEA, + IN in_created_at INT8, + IN in_now INT8, -- Success return + OUT out_tx_row_id INT8, OUT out_new BOOLEAN, OUT out_replaced BOOLEAN, OUT out_recovered BOOLEAN @@ -162,76 +251,192 @@ LANGUAGE plpgsql AS $$ DECLARE local_id INT8; BEGIN --- Check replacement -IF in_replace_txid IS NOT NULL THEN - UPDATE tx_out SET txid=in_txid, status='status' WHERE txid=in_replace_txid; - IF FOUND THEN - out_new=FALSE; - out_replaced=FALSE; - out_recovered=FALSE; +UPDATE tx_out SET txid=in_txid WHERE txid=in_replace_txid; +out_replaced=FOUND; +out_new=NOT EXISTS(SELECT FROM tx_out WHERE txid = in_txid); +IF NOT out_new THEN RETURN; - END IF; END IF; -out_replaced=FALSE; -out_new=NOT EXISTS(SELECT FROM tx_out WHERE wtid = in_wtid); -IF out_new THEN - INSERT INTO tx_out ( - amount, +-- Insert new outgoing transaction +INSERT INTO tx_out ( + amount, + credit_acc, + txid, + created_at +) VALUES ( + in_amount, + in_credit_acc, + in_txid, + in_created_at +) RETURNING tx_out_id INTO out_tx_row_id; +-- Notify new outgoing transaction registration +PERFORM pg_notify('tx_out', out_tx_row_id || ''); + +IF in_wtid IS NOT NULL THEN + -- Insert new outgoing talerable transaction + INSERT INTO taler_out ( + tx_out_id, wtid, - credit_acc, - exchange_url, - status, - txid, - created, - request_uid + exchange_base_url, + metadata ) VALUES ( - in_amount, + out_tx_row_id, in_wtid, - in_credit_acc, in_exchange_base_url, - 'sent', - in_txid, - in_created, - NULL - ) RETURNING id INTO local_id; - out_recovered=FALSE; - PERFORM pg_notify('taler_out', local_id || ''); -ELSE - UPDATE tx_out SET txid=in_txid, status='sent' WHERE status='requested' AND wtid=in_wtid; + in_metadata + ) ON CONFLICT (wtid) DO NOTHING; + IF FOUND THEN + -- Notify new outgoing talerable transaction registration + PERFORM pg_notify('taler_out', out_tx_row_id || ''); + END IF; + -- Update transfer state + UPDATE transfer SET status='confirmed',txid=in_txid WHERE wtid=in_wtid; out_recovered=FOUND; +ELSIF in_bounced_txid IS NOT NULL THEN + -- Update bounce state + IF NOT EXISTS(SELECT FROM bounced JOIN tx_in USING (tx_in_id) WHERE tx_in.txid=in_bounced_txid) THEN + INSERT INTO bounced ( + tx_in_id, + txid, + status + ) VALUES ( + (SELECT tx_in_id FROM tx_in WHERE txid=in_bounced_txid), + in_txid, + 'confirmed' + ); + out_recovered=TRUE; + ELSE + UPDATE bounced SET status='confirmed',txid=in_txid + FROM tx_in + WHERE bounced.tx_in_id=tx_in.tx_in_id AND tx_in.txid=in_bounced_txid; + out_recovered=FALSE; + END IF; END IF; END $$; COMMENT ON FUNCTION sync_out IS 'Sync a debit blockchain state with local state'; -CREATE FUNCTION sync_bounce( - IN in_txid BYTEA, - IN in_bounced BYTEA, - IN in_created INT8, - -- Success return - OUT out_new BOOLEAN, - OUT out_recovered BOOLEAN + +CREATE FUNCTION register_prepared_transfers ( + IN in_type incoming_type, + IN in_account_pub BYTEA, + IN in_authorization_pub BYTEA, + IN in_authorization_sig BYTEA, + IN in_recurrent BOOLEAN, + IN in_timestamp INT8, + -- Error status + OUT out_reserve_pub_reuse BOOLEAN ) LANGUAGE plpgsql AS $$ +DECLARE + talerable_tx INT8; + idempotent BOOLEAN; BEGIN -out_new=NOT EXISTS(SELECT FROM bounce WHERE bounced = in_bounced); - -IF out_new THEN - INSERT INTO bounce ( - created, - bounced, - txid, - status - ) VALUES ( - in_created, - in_bounced, - in_txid, - 'sent' - ); - out_recovered=false; -ELSE - UPDATE bounce SET txid=in_txid, status='sent' WHERE status!='sent' AND bounced=in_bounced; - out_recovered=FOUND; + +-- Check idempotency +SELECT type = in_type + AND account_pub = in_account_pub + AND recurrent = in_recurrent +INTO idempotent +FROM prepared_in +WHERE authorization_pub = in_authorization_pub; + +-- Check idempotency and delay garbage collection +IF FOUND AND idempotent THEN + UPDATE prepared_in + SET registered_at=in_timestamp + WHERE authorization_pub=in_authorization_pub; + RETURN; +END IF; + +-- Check reserve pub reuse +out_reserve_pub_reuse=in_type = 'reserve' AND ( + EXISTS(SELECT FROM taler_in WHERE metadata = in_account_pub AND type = 'reserve') + OR EXISTS(SELECT FROM prepared_in WHERE account_pub = in_account_pub AND type = 'reserve' AND authorization_pub != in_authorization_pub) +); +IF out_reserve_pub_reuse THEN + RETURN; +END IF; + +IF in_recurrent THEN + -- Finalize one pending right now + WITH moved_tx AS ( + DELETE FROM pending_recurrent_in + WHERE tx_in_id = ( + SELECT txid + FROM tx_in + JOIN tx_in USING (tx_in_id) + WHERE authorization_pub = in_authorization_pub + ORDER BY created_at ASC + LIMIT 1 + ) + RETURNING tx_in_id + ) + INSERT INTO taler_in (tx_in_id, type, metadata, authorization_pub, authorization_sig) + SELECT moved_tx.tx_in_id, in_type, in_account_pub, in_authorization_pub, in_authorization_sig + FROM moved_tx + RETURNING tx_in_id INTO talerable_tx; + IF talerable_tx IS NOT NULL THEN + PERFORM pg_notify('taler_in', talerable_tx::text); + END IF; +ELSE + -- Bounce all pending + WITH bounced AS ( + DELETE FROM pending_recurrent_in + WHERE authorization_pub = in_authorization_pub + RETURNING tx_in_id + ) + INSERT INTO bounced (tx_in_id, reason, status) + SELECT tx_in_id, 'cancelled mapping', 'requested' FROM bounced; END IF; + +-- Upsert registration +INSERT INTO prepared_in ( + type, + account_pub, + authorization_pub, + authorization_sig, + recurrent, + registered_at, + tx_in_id +) VALUES ( + in_type, + in_account_pub, + in_authorization_pub, + in_authorization_sig, + in_recurrent, + in_timestamp, + talerable_tx +) ON CONFLICT (authorization_pub) +DO UPDATE SET + type = EXCLUDED.type, + account_pub = EXCLUDED.account_pub, + recurrent = EXCLUDED.recurrent, + registered_at = EXCLUDED.registered_at, + tx_in_id = EXCLUDED.tx_in_id, + authorization_sig = EXCLUDED.authorization_sig; END $$; -COMMENT ON FUNCTION sync_bounce IS 'Sync a bounce blockchain state with local state'; -\ No newline at end of file + +CREATE FUNCTION delete_prepared_transfers ( + IN in_authorization_pub BYTEA, + IN in_timestamp INT8, + OUT out_found BOOLEAN +) +LANGUAGE plpgsql AS $$ +BEGIN + +-- Bounce all pending +WITH bounced AS ( + DELETE FROM pending_recurrent_in + WHERE authorization_pub = in_authorization_pub + RETURNING tx_in_id +) +INSERT INTO bounced (tx_in_id, reason, status) +SELECT tx_in_id, 'cancelled mapping', 'requested' FROM bounced; + +-- Delete registration +DELETE FROM prepared_in +WHERE authorization_pub = in_authorization_pub; +out_found = FOUND; + +END $$; +\ No newline at end of file diff --git a/depolymerizer-bitcoin/depolymerizer-bitcoin.conf b/depolymerizer-bitcoin/depolymerizer-bitcoin.conf @@ -1,8 +1,11 @@ [depolymerizer-bitcoin] +# Adapter currency +CURRENCY = + # Bitcoin wallet address to advertise WALLET = -# Legal entity that is associated with the Bitcoin wallet +# Legal entity that is associated with the Bitcoin wallet NAME = [depolymerizer-bitcoin-worker] @@ -12,10 +15,15 @@ WALLET_NAME = # Password of the encrypted wallet PASSWORD = +# Specify the account type and therefore the indexing behavior. +# This can either can be normal or exchange. +# Exchange accounts bounce invalid incoming Taler transactions. +ACCOUNT_TYPE = exchange + # Number of blocks to consider a transactions durable CONFIRMATION = 6 -# An additional fee to deduce from the bounced amount +# An additional fee to deduce from the bounced amount # BOUNCE_FEE = BTC:0 # Number of worker's loops before worker shutdown (0 means never) @@ -74,6 +82,21 @@ AUTH_METHOD = bearer # Token for bearer authentication scheme TOKEN = +[depolymerizer-bitcoin-httpd-revenue-api] +# Whether to serve the Revenue API +ENABLED = NO + +# Authentication scheme, this can either can be basic, bearer or none. +AUTH_METHOD = bearer + +# User name for basic authentication scheme +# USERNAME = + +# Password for basic authentication scheme +# PASSWORD = + +# Token for bearer authentication scheme +TOKEN = [depolymerizer-bitcoindb-postgres] # DB connection string diff --git a/depolymerizer-bitcoin/src/api.rs b/depolymerizer-bitcoin/src/api.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2025 Taler Systems SA + Copyright (C) 2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -25,45 +25,68 @@ use axum::{ middleware::Next, response::{IntoResponse as _, Response}, }; -use sqlx::{PgPool, Row, postgres::PgListener}; +use jiff::Timestamp; +use sqlx::{PgPool, postgres::PgListener}; use taler_api::{ - api::{TalerApi, wire::WireGateway}, - error::{ApiResult, failure, failure_status, not_implemented}, + api::{ + TalerApi, + prepared::{PreparedTransfer, simple_subject}, + revenue::Revenue, + wire::WireGateway, + }, + error::{ApiResult, failure_code, failure_status}, + subject::IncomingSubject, }; use taler_common::{ ExpoBackoffDecorr, - api_params::{History, Page}, - api_wire::{ - AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddKycauthResponse, - IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse, - TransferState, TransferStatus, + api::{ + params::{History, Page}, + prepared::{RegistrationRequest, RegistrationResponse, SubjectFormat, Unregistration}, + revenue::RevenueIncomingHistory, + wire::{ + AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddMappedRequest, + IncomingHistory, OutgoingHistory, TransferList, TransferRequest, TransferResponse, + TransferState, TransferStatus, + }, }, error_code::ErrorCode, - types::{amount::Currency, payto::PaytoURI, timestamp::Timestamp}, + types::{ + amount::{Amount, Currency}, + payto::PaytoURI, + timestamp::TalerTimestamp, + }, }; use tokio::{sync::watch::Sender, time::sleep}; -use tracing::error; +use tracing::{debug, error, warn}; use crate::{ - db::{self}, - payto::FullBtcPayto, + db::{ + self, AddIncomingResult, RegistrationResult, TransferResult, get_status, + register_tx_in_admin, revenue_history, transfer, transfer_register, transfer_unregister, + }, + payto::{BtcPayto, FullBtcPayto}, }; pub struct ServerState { pool: PgPool, - payto: PaytoURI, + payto: FullBtcPayto, currency: Currency, status: AtomicBool, + in_channel: Sender<i64>, taler_in_channel: Sender<i64>, taler_out_channel: Sender<i64>, } pub async fn notification_listener( pool: PgPool, + in_channel: Sender<i64>, taler_in_channel: Sender<i64>, taler_out_channel: Sender<i64>, ) -> sqlx::Result<()> { taler_api::notification::notification_listener!(&pool, + "tx_in" => (row_id: i64) { + in_channel.send_replace(row_id); + }, "taler_in" => (row_id: i64) { taler_in_channel.send_replace(row_id); }, @@ -74,7 +97,8 @@ pub async fn notification_listener( } impl ServerState { - pub async fn start(pool: sqlx::PgPool, payto: PaytoURI, currency: Currency) -> Arc<Self> { + pub async fn start(pool: sqlx::PgPool, payto: FullBtcPayto, currency: Currency) -> Arc<Self> { + let in_channel = Sender::new(0); let taler_in_channel = Sender::new(0); let taler_out_channel = Sender::new(0); let tmp = Self { @@ -82,6 +106,7 @@ impl ServerState { payto, currency, status: AtomicBool::new(true), + in_channel: in_channel.clone(), taler_in_channel: taler_in_channel.clone(), taler_out_channel: taler_out_channel.clone(), }; @@ -89,6 +114,7 @@ impl ServerState { tokio::spawn(status_watcher(state.clone())); tokio::spawn(notification_listener( pool, + in_channel, taler_in_channel, taler_out_channel, )); @@ -97,29 +123,50 @@ impl ServerState { } impl TalerApi for ServerState { - fn currency(&self) -> &str { - self.currency.as_ref() + fn currency(&self) -> Currency { + self.currency } - fn implementation(&self) -> Option<&str> { - None + fn implementation(&self) -> &'static str { + "urn:net:taler:specs:depolymerizer-bitcoin:depolymerization" + } +} + +async fn add_incoming( + db: &PgPool, + amount: Amount, + debit_account: PaytoURI, + subject: &IncomingSubject, +) -> ApiResult<AddIncomingResponse> { + let debtor = FullBtcPayto::try_from(&debit_account)?; + match register_tx_in_admin(db, &amount, &debtor.0, &Timestamp::now(), subject).await? { + AddIncomingResult::Success { + row_id, valued_at, .. + } => Ok(AddIncomingResponse { + row_id, + timestamp: valued_at.into(), + }), + AddIncomingResult::ReservePubReuse => { + Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) + } + AddIncomingResult::MappingReuse => { + Err(failure_code(ErrorCode::BANK_TRANSFER_MAPPING_REUSED)) + } + AddIncomingResult::UnknownMapping => { + Err(failure_code(ErrorCode::BANK_TRANSFER_MAPPING_UNKNOWN)) + } } } impl WireGateway for ServerState { async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> { let creditor = FullBtcPayto::try_from(&req.credit_account)?; - - match db::transfer(&self.pool, &creditor, &req).await? { - db::TransferResult::Success(transfer_response) => Ok(transfer_response), - db::TransferResult::RequestUidReuse => Err(failure( - ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED, - format!("Request UID {} already used", req.request_uid), - )), - db::TransferResult::WtidReuse => Err(failure( - ErrorCode::BANK_TRANSFER_WTID_REUSED, - format!("wtid {} already used", req.request_uid), - )), + match transfer(&self.pool, &creditor, &req).await? { + TransferResult::Success(transfer_response) => Ok(transfer_response), + TransferResult::RequestUidReuse => { + Err(failure_code(ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED)) + } + TransferResult::WtidReuse => Err(failure_code(ErrorCode::BANK_TRANSFER_WTID_REUSED)), } } @@ -131,7 +178,7 @@ impl WireGateway for ServerState { let transfers = db::transfer_page(&self.pool, &status, &params, &self.currency).await?; Ok(TransferList { transfers, - debit_account: self.payto.clone(), + debit_account: self.payto.as_uri(), }) } @@ -147,7 +194,7 @@ impl WireGateway for ServerState { }) .await?; Ok(OutgoingHistory { - debit_account: self.payto.clone(), + debit_account: self.payto.as_uri(), outgoing_transactions, }) } @@ -159,7 +206,7 @@ impl WireGateway for ServerState { }) .await?; Ok(IncomingHistory { - credit_account: self.payto.clone(), + credit_account: self.payto.as_uri(), incoming_transactions, }) } @@ -168,43 +215,93 @@ impl WireGateway for ServerState { &self, req: AddIncomingRequest, ) -> ApiResult<AddIncomingResponse> { - let debtor = FullBtcPayto::try_from(&req.debit_account)?; - let timestamp = Timestamp::now(); - match db::register_tx_in_admin( + add_incoming( &self.pool, - &req.amount, - &debtor.0, - &timestamp, - &req.reserve_pub, + req.amount, + req.debit_account, + &IncomingSubject::Reserve(req.reserve_pub), ) - .await? - { - db::AddIncomingResult::Success { - new: _, - row_id, - valued_at, - } => Ok(AddIncomingResponse { - timestamp: valued_at, - row_id, - }), - db::AddIncomingResult::ReservePubReuse => Err(failure( - ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT, - "reserve_pub used already".to_owned(), - )), - } + .await } - async fn add_incoming_kyc(&self, _req: AddKycauthRequest) -> ApiResult<AddKycauthResponse> { - Err(not_implemented( - "depolymerizer-bitcoin does not supports KYC", - )) + async fn add_incoming_kyc(&self, req: AddKycauthRequest) -> ApiResult<AddIncomingResponse> { + add_incoming( + &self.pool, + req.amount, + req.debit_account, + &IncomingSubject::Kyc(req.account_pub), + ) + .await + } + + async fn add_incoming_mapped(&self, req: AddMappedRequest) -> ApiResult<AddIncomingResponse> { + add_incoming( + &self.pool, + req.amount, + req.debit_account, + &IncomingSubject::Map(req.authorization_pub), + ) + .await } fn support_account_check(&self) -> bool { + // TODO we might be able to check this ? false } } +impl Revenue for ServerState { + async fn history(&self, params: History) -> ApiResult<RevenueIncomingHistory> { + Ok(RevenueIncomingHistory { + incoming_transactions: revenue_history(&self.pool, &params, &self.currency, || { + self.in_channel.subscribe() + }) + .await?, + credit_account: self.payto.as_uri(), + }) + } +} + +impl PreparedTransfer for ServerState { + // TODO bitcoin subject format + fn supported_formats(&self) -> &[SubjectFormat] { + &[SubjectFormat::SIMPLE] + } + + async fn registration(&self, req: RegistrationRequest) -> ApiResult<RegistrationResponse> { + let creditor = BtcPayto::try_from(&req.credit_account)?; + if creditor.0 != self.payto.0 { + return Err(failure_code(ErrorCode::BANK_UNKNOWN_CREDITOR)); + } + match transfer_register( + &self.pool, + req.r#type.into(), + &req.account_pub, + &req.authorization_pub, + &req.authorization_sig, + req.recurrent, + &Timestamp::now(), + ) + .await? + { + RegistrationResult::Success => ApiResult::Ok(RegistrationResponse { + subjects: vec![simple_subject(req)], + expiration: TalerTimestamp::Never, + }), + RegistrationResult::ReservePubReuse => { + ApiResult::Err(failure_code(ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT)) + } + RegistrationResult::SubjectReuse => { + ApiResult::Err(failure_code(ErrorCode::BANK_DERIVATION_REUSE)) + } + } + } + + async fn unregistration(&self, req: Unregistration) -> ApiResult<bool> { + Ok(transfer_unregister(&self.pool, &req.authorization_pub, &Timestamp::now()).await?) + } +} + pub async fn status_middleware( State(state): State<Arc<ServerState>>, request: Request, @@ -233,12 +330,17 @@ async fn status_watcher(state: Arc<ServerState>) { listener.listen("status").await?; loop { // Sync state - let row = sqlx::query("SELECT value FROM state WHERE name = 'status'") - .fetch_one(&state.pool) - .await?; - let status: &[u8] = row.try_get(0)?; - assert!(status.len() == 1 && status[0] < 2); - state.status.store(status[0] == 1, Ordering::SeqCst); + if let Some([status]) = get_status(&state.pool).await? { + assert!(status < 2); + if status == 1 { + debug!(target: "status-watcher", "Worker healthy"); + } else { + debug!(target: "status-watcher", "Worker down"); + } + state.status.store(status == 1, Ordering::SeqCst); + } else { + warn!(target: "status-watcher", "Status not setup"); + } // Wait for next notification listener.recv().await?; jitter.reset(); @@ -252,3 +354,122 @@ async fn status_watcher(state: Arc<ServerState>) { } } } + +#[cfg(test)] +pub mod test { + + use std::{str::FromStr, sync::LazyLock}; + + use axum::Router; + use jiff::Timestamp; + use sqlx::PgPool; + use taler_api::{api::TalerRouter as _, auth::AuthMethod, subject::OutgoingSubject}; + use taler_common::{ + api::wire::{TransferState, WireConfig}, + types::amount::{Currency, amount}, + }; + use taler_test_utils::{ + db::db_test_setup, + routine::{admin_add_incoming_routine, out_history_routine, transfer_routine}, + server::TestServer, + tasks, + }; + + use crate::{ + CONFIG_SOURCE, + api::ServerState, + db::{TxOutKind, sync_out, test::rand_tx_id}, + payto::FullBtcPayto, + }; + + pub static EXCHANGE: LazyLock<FullBtcPayto> = LazyLock::new(|| { + FullBtcPayto::from_str( + "payto://bitcoin/1FfmbHfnpaZjKFvyi1okTjJJusN455paPH?receiver-name=Exchange", + ) + .unwrap() + }); + + pub static CLIENT: LazyLock<FullBtcPayto> = LazyLock::new(|| { + FullBtcPayto::from_str( + "payto://bitcoin/1FfmbHfnpaZjKFvyi1okTjJJusN455paPH?receiver-name=Anonymous", + ) + .unwrap() + }); + + async fn setup() -> (Router, PgPool) { + let (_, pool) = db_test_setup(CONFIG_SOURCE).await; + let api = ServerState::start( + pool.clone(), + EXCHANGE.clone(), + Currency::from_str("BTC").unwrap(), + ) + .await; + let server = Router::new() + .wire_gateway(api.clone(), AuthMethod::None) + .prepared_transfer(api.clone()) + .revenue(api.clone(), AuthMethod::None) + .finalize(); + + (server, pool) + } + + #[tokio::test] + async fn config() { + let (server, _) = setup().await; + server + .get("/taler-wire-gateway/config") + .await + .assert_ok_json::<WireConfig>(); + } + + #[tokio::test] + async fn transfer() { + let (server, _) = setup().await; + transfer_routine( + &server.prefix("/taler-wire-gateway"), + TransferState::pending, + &CLIENT.as_uri(), + ) + .await; + } + + #[tokio::test] + async fn outgoing_history() { + let (server, db) = setup().await; + out_history_routine( + &server.prefix("/taler-wire-gateway"), + tasks!({ + let sub = &OutgoingSubject::rand(); + sync_out( + &db, + &rand_tx_id(), + None, + &amount("BTC:10"), + &EXCHANGE.0, + &TxOutKind::Talerable { + wtid: &sub.wtid, + url: &sub.exchange_base_url, + metadata: sub.metadata.as_deref(), + }, + &Timestamp::now(), + ) + .await + .unwrap(); + }), + tasks!(), + ) + .await; + } + + #[tokio::test] + async fn admin_add_incoming() { + let (server, _) = setup().await; + admin_add_incoming_routine( + &server.prefix("/taler-wire-gateway"), + &server.prefix("/taler-prepared-transfer"), + &CLIENT.as_uri(), + &EXCHANGE.as_uri(), + ) + .await; + } +} diff --git a/depolymerizer-bitcoin/src/bin/segwit-demo.rs b/depolymerizer-bitcoin/src/bin/segwit-demo.rs @@ -7,7 +7,7 @@ use depolymerizer_bitcoin::{ segwit::{decode_segwit_msg, encode_segwit_addr}, }; use depolymerizer_common::rand_slice; -use taler_common::types::base32; +use taler_common::encoding::base32; pub fn main() { let address = Address::from_str("tb1qhxrhccqexg0dv4nltgkuw4fg2ce7muplmjsn0v") @@ -22,7 +22,7 @@ pub fn main() { println!("Send {btc} BTC to {address} with reserve public key {reserve_pub}"); println!("\nⅡ - Generate fake segwit addresses"); - let decoded: [u8; 32] = base32::decode(reserve_pub.as_bytes()).unwrap(); + let decoded: [u8; 32] = base32::decode_static(reserve_pub.as_bytes()).unwrap(); println!("Decode reserve public key: 0x{}", hex::encode(&decoded[..])); let prefix: [u8; 4] = rand_slice(); println!("Generate random prefix 0x{}", hex::encode(prefix)); diff --git a/depolymerizer-bitcoin/src/cli.rs b/depolymerizer-bitcoin/src/cli.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2025 Taler Systems SA + Copyright (C) 2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -124,19 +124,18 @@ pub async fn run(cmd: Command, cfg: &Config) -> anyhow::Result<()> { let cfg = ServeCfg::parse(cfg)?; let api = ServerState::start(pool, cfg.payto, cfg.currency).await; let mut router = Router::new(); - if let Some(cfg) = cfg.wire_gateway { - router = router.wire_gateway(api.clone(), cfg.auth.method()); - } else { - panic!("lol") + router = router + .wire_gateway(api.clone(), cfg.auth.method()) + .prepared_transfer(api.clone()) + } + if let Some(cfg) = cfg.revenue { + router = router.revenue(api.clone(), cfg.auth.method()); } router - .layer(middleware::from_fn_with_state( - api.clone(), - status_middleware, - )) - .serve(cfg.serve, cfg.lifetime) + .layer(middleware::from_fn_with_state(api, status_middleware)) + .serve(&cfg.serve, cfg.lifetime) .await?; } } diff --git a/depolymerizer-bitcoin/src/config.rs b/depolymerizer-bitcoin/src/config.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2025 Taler Systems SA + Copyright (C) 2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -25,7 +25,7 @@ use taler_api::{ use taler_common::{ config::{Config, ValueErr}, map_config, - types::{amount::Currency, payto::PaytoURI}, + types::amount::Currency, }; use crate::{ @@ -40,9 +40,9 @@ pub fn parse_db_cfg(cfg: &Config) -> Result<DbCfg, ValueErr> { pub fn parse_account_payto(cfg: &Config) -> Result<FullBtcPayto, ValueErr> { let sect = cfg.section("depolymerizer-bitcoin"); let wallet: BtcWallet = sect.parse("bitcoin wallet address", "WALLET").require()?; - let name = sect.str("NAME").require()?; + let name = sect.cstr("NAME").require()?; - Ok(FullBtcPayto::new(wallet, name)) + Ok(FullBtcPayto::new(wallet, &name)) } #[derive(Debug, Clone)] @@ -52,9 +52,10 @@ pub enum RpcAuth { } pub struct ServeCfg { - pub payto: PaytoURI, + pub payto: FullBtcPayto, pub serve: Serve, pub wire_gateway: Option<ApiCfg>, + pub revenue: Option<ApiCfg>, pub currency: Currency, pub lifetime: Option<u32>, } @@ -63,22 +64,24 @@ impl ServeCfg { pub fn parse(cfg: &Config) -> Result<Self, ValueErr> { let payto = parse_account_payto(cfg)?; - let sect = cfg.section("depolymerizer-bitcoin-httpd"); + let s = cfg.section("depolymerizer-bitcoin-httpd"); - let lifetime = sect.number("LIFETIME").opt()?.filter(|it| *it != 0); + let lifetime = s.number("LIFETIME").opt()?.filter(|it| *it != 0); - let serve = Serve::parse(sect)?; + let serve = Serve::parse(&s)?; let wire_gateway = ApiCfg::parse(cfg.section("depolymerizer-bitcoin-httpd-wire-gateway-api"))?; + let revenue = ApiCfg::parse(cfg.section("depolymerizer-bitcoin-httpd-revenue-api"))?; let sect = cfg.section("depolymerizer-bitcoin"); Ok(Self { currency: sect.parse("currency", "CURRENCY").require()?, lifetime, - payto: payto.as_payto(), + payto, serve, wire_gateway, + revenue, }) } } @@ -86,8 +89,8 @@ impl ServeCfg { #[derive(Debug, Clone)] pub struct WorkerCfg { - pub confirmation: u32, - pub max_confirmation: u32, + pub conf: u32, + pub max_conf: u32, pub bounce_fee: Amount, pub lifetime: Option<u32>, pub bump_delay: Option<u32>, @@ -108,10 +111,10 @@ impl WorkerCfg { let confirmation = sect.number("CONFIRMATION").require()?; Ok(Self { - confirmation, - max_confirmation: confirmation * 2, + conf: confirmation, + max_conf: confirmation * 2, bounce_fee: sect - .amount("BOUNCE_FEE", currency.as_ref()) + .amount("BOUNCE_FEE", &currency) .opt()? .map(|it| taler_to_btc(&it)) .unwrap_or_default(), @@ -161,10 +164,10 @@ impl RpcCfg { "basic" => { let username = sect.str("RPC_USERNAME").require()?; let password = sect.str("RPC_PASSWORD").require()?; - Ok(RpcAuth::Basic(format!("{username}:{password}"))) + RpcAuth::Basic(format!("{username}:{password}")) }, "cookie" => { - Ok(RpcAuth::Cookie(sect.path("RPC_COOKIE_FILE").require()?)) + RpcAuth::Cookie(sect.path("RPC_COOKIE_FILE").require()?) } ) .require()?; diff --git a/depolymerizer-bitcoin/src/db.rs b/depolymerizer-bitcoin/src/db.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2025 Taler Systems SA + Copyright (C) 2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -14,25 +14,28 @@ TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use bitcoin::{Address, BlockHash}; -use bitcoin::{Txid, hashes::Hash}; -use depolymerizer_common::status::{BounceStatus, DebitStatus}; -use sqlx::{ - PgExecutor, PgPool, QueryBuilder, Row, - postgres::{PgListener, PgRow}, +use bitcoin::{Address, BlockHash, Txid, hashes::Hash}; +use compact_str::CompactString; +use depolymerizer_common::status::DebitStatus; +use jiff::Timestamp; +use sqlx::{PgConnection, PgExecutor, PgPool, QueryBuilder, Row, postgres::PgRow}; +use taler_api::{ + db::{BindHelper as _, TypeHelper as _, history, page}, + serialized, + subject::IncomingSubject, }; -use taler_api::db::{BindHelper as _, TypeHelper as _, history, page}; use taler_common::{ - api_common::{EddsaPublicKey, SafeU64, ShortHashCode}, - api_params::{History, Page}, - api_wire::{ - IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest, - TransferResponse, TransferState, TransferStatus, - }, - types::{ - amount::{Amount, Currency}, - timestamp::Timestamp, + api::{ + EddsaPublicKey, EddsaSignature, ShortHashCode, + params::{History, Page}, + revenue::RevenueIncomingBankTransaction, + wire::{ + IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest, + TransferResponse, TransferState, TransferStatus, + }, }, + db::IncomingType, + types::amount::{Amount, Currency}, }; use tokio::sync::watch::Receiver; use url::Url; @@ -42,74 +45,70 @@ use crate::{ sql::{sql_addr, sql_btc_amount, sql_generic_payto, sql_payto}, }; -/// Lock the database for worker execution -pub async fn worker_lock<'a>(e: impl PgExecutor<'a>) -> sqlx::Result<bool> { - sqlx::query("SELECT pg_try_advisory_lock(42)") - .try_map(|r: PgRow| r.try_get(0)) - .fetch_one(e) - .await -} - /// Initialize the worker status -pub async fn init_status<'a>(e: impl PgExecutor<'a>) -> sqlx::Result<()> { +pub async fn init_status(db: &PgPool) -> sqlx::Result<()> { sqlx::query( "INSERT INTO state (name, value) VALUES ('status', $1) ON CONFLICT (name) DO NOTHING", ) .bind([1u8]) - .execute(e) + .execute(db) .await?; Ok(()) } +/// Get the worker status +pub async fn get_status(db: &PgPool) -> sqlx::Result<Option<[u8; 1]>> { + sqlx::query_scalar("SELECT value FROM state WHERE name = 'status'") + .fetch_optional(db) + .await +} + /// Update the worker status -pub async fn update_status(e: &mut PgListener, new_status: bool) -> sqlx::Result<()> { +pub async fn update_status(db: &mut PgConnection, new_status: bool) -> sqlx::Result<()> { sqlx::query("UPDATE state SET value=$1 WHERE name='status'") .bind([new_status as u8]) - .execute(&mut *e) + .execute(&mut *db) .await?; - sqlx::query("NOTIFY status").execute(e).await?; + sqlx::query("NOTIFY status").execute(db).await?; Ok(()) } /// Initialize the worker sync state -pub async fn init_sync_state<'a>( - e: impl PgExecutor<'a>, - hash: &BlockHash, - reset: bool, -) -> sqlx::Result<()> { +pub async fn init_sync_state(db: &PgPool, hash: &BlockHash, reset: bool) -> sqlx::Result<()> { sqlx::query(if reset { "INSERT INTO state (name, value) VALUES ('last_hash', $1) ON CONFLICT (name) DO UPDATE SET value=$1" } else { "INSERT INTO state (name, value) VALUES ('last_hash', $1) ON CONFLICT (name) DO NOTHING" }) .bind(hash.as_byte_array()) - .execute(e) + .execute(db) .await?; Ok(()) } /// Get the current worker sync state -pub async fn get_sync_state<'a>(e: impl PgExecutor<'a>) -> sqlx::Result<BlockHash> { +pub async fn get_sync_state(db: &mut PgConnection) -> sqlx::Result<BlockHash> { sqlx::query("SELECT value FROM state WHERE name='last_hash'") .try_map(|r: PgRow| r.try_get_map(0, BlockHash::from_slice)) - .fetch_one(e) + .fetch_one(db) .await } /// Update the worker sync state if it hasn't changed yet -pub async fn swap_sync_state<'a>( - e: impl PgExecutor<'a>, +pub async fn swap_sync_state( + db: &mut PgConnection, from: &BlockHash, to: &BlockHash, ) -> sqlx::Result<()> { sqlx::query("UPDATE state SET value=$1 WHERE name='last_hash' AND value=$2") .bind(to.as_byte_array()) .bind(from.as_byte_array()) - .execute(e) + .execute(db) .await?; Ok(()) } +#[derive(Debug)] pub enum TransferResult { Success(TransferResponse), RequestUidReuse, @@ -117,43 +116,45 @@ pub enum TransferResult { } /// Initiate a new Taler transfer idempotently -pub async fn transfer<'a>( - e: impl PgExecutor<'a>, +pub async fn transfer( + db: &PgPool, creditor: &FullBtcPayto, transfer: &TransferRequest, ) -> sqlx::Result<TransferResult> { - sqlx::query( - " + serialized!( + sqlx::query( + " SELECT out_request_uid_reuse, out_wtid_reuse, out_transfer_row_id, out_created_at - FROM taler_transfer(($1, $2)::taler_amount, $3, $4, $5, $6, $7, $8) + FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8) ", - ) - .bind_amount(&transfer.amount) - .bind(transfer.exchange_base_url.as_str()) - .bind(creditor.0.to_string()) - .bind(&creditor.name) - .bind(transfer.request_uid.as_slice()) - .bind(transfer.wtid.as_slice()) - .bind_timestamp(&Timestamp::now()) - .try_map(|r: PgRow| { - Ok(if r.try_get("out_request_uid_reuse")? { - TransferResult::RequestUidReuse - } else if r.try_get("out_wtid_reuse")? { - TransferResult::WtidReuse - } else { - TransferResult::Success(TransferResponse { - row_id: r.try_get_safeu64("out_transfer_row_id")?, - timestamp: r.try_get_timestamp("out_created_at")?, + ) + .bind(transfer.amount) + .bind(transfer.exchange_base_url.as_str()) + .bind(creditor.0.to_string()) + .bind(&creditor.name) + .bind(transfer.request_uid.as_slice()) + .bind(transfer.wtid.as_slice()) + .bind(transfer.metadata.as_deref()) + .bind_timestamp(&Timestamp::now()) + .try_map(|r: PgRow| { + Ok(if r.try_get_flag("out_request_uid_reuse")? { + TransferResult::RequestUidReuse + } else if r.try_get_flag("out_wtid_reuse")? { + TransferResult::WtidReuse + } else { + TransferResult::Success(TransferResponse { + row_id: r.try_get_u64("out_transfer_row_id")?, + timestamp: r.try_get_taler_timestamp("out_created_at")?, + }) }) }) - }) - .fetch_one(e) - .await + .fetch_one(db) + ) } /// Paginate initiated Taler transfers -pub async fn transfer_page<'a>( - e: impl PgExecutor<'a>, +pub async fn transfer_page( + db: &PgPool, status: &Option<TransferState>, params: &Page, currency: &Currency, @@ -162,7 +163,9 @@ pub async fn transfer_page<'a>( Some(s) => match s { TransferState::pending => Some(DebitStatus::requested), TransferState::success => Some(DebitStatus::sent), - TransferState::transient_failure | TransferState::permanent_failure => { + TransferState::transient_failure + | TransferState::permanent_failure + | TransferState::late_failure => { return Ok(Vec::new()); } }, @@ -170,21 +173,20 @@ pub async fn transfer_page<'a>( }; page( - e, - "id", + db, params, + "transfer_id", || { let mut sql = QueryBuilder::new( " SELECT - id, + transfer_id, status, - (amount).val as amount_val, - (amount).frac as amount_frac, + amount, credit_acc, credit_name, - created - FROM tx_out WHERE request_uid IS NOT NULL AND + created_at + FROM transfer WHERE ", ); if let Some(status) = status { @@ -194,14 +196,14 @@ pub async fn transfer_page<'a>( }, |r: PgRow| { Ok(TransferListStatus { - row_id: r.try_get_safeu64(0)?, + row_id: r.try_get_u64(0)?, status: match r.try_get(1)? { - DebitStatus::requested => TransferState::pending, - DebitStatus::sent => TransferState::success, + DebitStatus::requested | DebitStatus::sent => TransferState::pending, + DebitStatus::confirmed => TransferState::success, }, - amount: r.try_get_amount("amount", currency)?, - credit_account: sql_payto(&r, "credit_acc", "credit_name")?, - timestamp: r.try_get_timestamp("created")?, + amount: r.try_get_amount(2, currency)?, + credit_account: sql_payto(&r, 3, 4)?, + timestamp: r.try_get_taler_timestamp(5)?, }) }, ) @@ -209,42 +211,44 @@ pub async fn transfer_page<'a>( } /// Get a Taler transfer info -pub async fn transfer_by_id<'a>( - e: impl PgExecutor<'a>, +pub async fn transfer_by_id( + db: &PgPool, id: u64, currency: &Currency, ) -> sqlx::Result<Option<TransferStatus>> { - sqlx::query( - " + serialized!( + sqlx::query( + " SELECT status, - (amount).val as amount_val, - (amount).frac as amount_frac, + amount, exchange_url, wtid, credit_acc, credit_name, - created - FROM tx_out WHERE request_uid IS NOT NULL AND id = $1 + metadata, + created_at + FROM transfer WHERE transfer_id = $1 ", - ) - .bind(id as i64) - .try_map(|r: PgRow| { - Ok(TransferStatus { - status: match r.try_get(0)? { - DebitStatus::requested => TransferState::pending, - DebitStatus::sent => TransferState::success, - }, - status_msg: None, - amount: r.try_get_amount_i(1, currency)?, - origin_exchange_url: r.try_get(3)?, - wtid: r.try_get_base32(4)?, - credit_account: sql_payto(&r, 5, 6)?, - timestamp: r.try_get_timestamp(7)?, + ) + .bind(id as i64) + .try_map(|r: PgRow| { + Ok(TransferStatus { + status: match r.try_get(0)? { + DebitStatus::requested | DebitStatus::sent => TransferState::pending, + DebitStatus::confirmed => TransferState::success, + }, + status_msg: None, + amount: r.try_get_amount(1, currency)?, + origin_exchange_url: r.try_get(2)?, + wtid: r.try_get(3)?, + credit_account: sql_payto(&r, 4, 5)?, + metadata: r.try_get(6)?, + timestamp: r.try_get_taler_timestamp(7)?, + }) }) - }) - .fetch_optional(e) - .await + .fetch_optional(db) + ) } /// Fetch outgoing Taler transactions history @@ -256,21 +260,42 @@ pub async fn outgoing_history( ) -> sqlx::Result<Vec<OutgoingBankTransaction>> { history( db, - "id", + "tx_out_id", params, listen, - || QueryBuilder::new( - "SELECT id, created, (amount).val, (amount).frac, wtid, credit_acc, credit_name, exchange_url FROM tx_out WHERE" - ), |r| { - Ok(OutgoingBankTransaction { - row_id: r.try_get_safeu64(0)?, - date: r.try_get_timestamp(1)?, - amount: r.try_get_amount_i(2, currency)?, - wtid: r.try_get_base32(4)?, - credit_account: sql_payto(&r, 5, 6)?, - exchange_base_url: r.try_get_url(7)?, - }) - }).await + || { + QueryBuilder::new( + " + SELECT + tx_out_id, + tx_out.created_at, + tx_out.amount, + taler_out.wtid, + tx_out.credit_acc, + transfer.credit_name, + taler_out.exchange_base_url, + taler_out.metadata + FROM tx_out + JOIN taler_out USING (tx_out_id) + LEFT JOIN transfer USING (txid) + WHERE + ", + ) + }, + |r| { + Ok(OutgoingBankTransaction { + row_id: r.try_get_u64(0)?, + date: r.try_get_taler_timestamp(1)?, + amount: r.try_get_amount(2, currency)?, + wtid: r.try_get(3)?, + credit_account: sql_payto(&r, 4, 5)?, + exchange_base_url: r.try_get_url(6)?, + debit_fee: None, // TODO we can actually get this information + metadata: r.try_get(7)?, + }) + }, + ) + .await } /// Fetch incoming Taler transactions history @@ -282,62 +307,143 @@ pub async fn incoming_history( ) -> sqlx::Result<Vec<IncomingBankTransaction>> { history( db, - "id", + "tx_in_id", params, listen, - || QueryBuilder::new( - "SELECT id, received, (amount).val, (amount).frac, reserve_pub, debit_acc FROM tx_in WHERE" - ), |r| { - Ok(IncomingBankTransaction::Reserve { - row_id: r.try_get_safeu64(0)?, - date: r.try_get_timestamp(1)?, - amount: r.try_get_amount_i(2, currency)?, - reserve_pub: r.try_get_base32(4)?, - debit_account: sql_generic_payto(&r, 5)?, - }) - }).await + || { + QueryBuilder::new( + " + SELECT + tx_in_id, + received_at, + amount, + debit_acc, + type, + metadata, + authorization_pub, + authorization_sig + FROM tx_in JOIN taler_in USING (tx_in_id) + WHERE + ", + ) + }, + |r| { + Ok(match r.try_get(4)? { + IncomingType::reserve => IncomingBankTransaction::Reserve { + row_id: r.try_get_u64(0)?, + date: r.try_get_taler_timestamp(1)?, + amount: r.try_get_amount(2, currency)?, + reserve_pub: r.try_get(5)?, + debit_account: sql_generic_payto(&r, 3)?, + credit_fee: None, // TODO store this + authorization_pub: r.try_get(6)?, + authorization_sig: r.try_get(7)?, + }, + IncomingType::kyc => IncomingBankTransaction::Kyc { + row_id: r.try_get_u64(0)?, + date: r.try_get_taler_timestamp(1)?, + amount: r.try_get_amount(2, currency)?, + account_pub: r.try_get(5)?, + debit_account: sql_generic_payto(&r, 3)?, + credit_fee: None, // TODO store this + authorization_pub: r.try_get(6)?, + authorization_sig: r.try_get(7)?, + }, + IncomingType::map => unimplemented!("MAP are never listed in the history"), + }) + }, + ) + .await +} + +/// Fetch incoming Taler transactions history +pub async fn revenue_history( + db: &PgPool, + params: &History, + currency: &Currency, + listen: impl FnOnce() -> Receiver<i64>, +) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> { + history( + db, + "tx_in_id", + params, + listen, + || { + QueryBuilder::new( + " + SELECT + tx_in_id, + received_at, + amount, + debit_acc + FROM tx_in + WHERE + ", + ) + }, + |r| { + Ok(RevenueIncomingBankTransaction { + row_id: r.try_get_u64(0)?, + date: r.try_get_taler_timestamp(1)?, + amount: r.try_get_amount(2, currency)?, + debit_account: sql_generic_payto(&r, 3)?, + credit_fee: None, // TODO store this + subject: String::new(), + }) + }, + ) + .await } #[derive(Debug, PartialEq, Eq)] pub enum AddIncomingResult { Success { new: bool, - row_id: SafeU64, + pending: bool, + row_id: u64, valued_at: Timestamp, }, ReservePubReuse, + UnknownMapping, + MappingReuse, } /// Register a fake Taler credit -pub async fn register_tx_in_admin<'a>( - e: impl PgExecutor<'a>, +pub async fn register_tx_in_admin( + db: &PgPool, amount: &Amount, debit_acc: &Address, received: &Timestamp, - reserve_pub: &EddsaPublicKey, + metadata: &IncomingSubject, ) -> sqlx::Result<AddIncomingResult> { sqlx::query( " - SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new - FROM register_tx_in(($1, $2)::taler_amount, $3, $4, $5, NULL) + SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending + FROM register_tx_in(NULL, $1, $2, $3, $4, $5) ", ) - .bind_amount(amount) + .bind(amount) .bind(debit_acc.to_string()) - .bind(reserve_pub.as_slice()) .bind_timestamp(received) + .bind(metadata.ty()) + .bind(metadata.key()) .try_map(|r: PgRow| { - Ok(if r.try_get(0)? { + Ok(if r.try_get_flag(0)? { AddIncomingResult::ReservePubReuse + } else if r.try_get_flag(1)? { + AddIncomingResult::MappingReuse + } else if r.try_get_flag(2)? { + AddIncomingResult::UnknownMapping } else { AddIncomingResult::Success { - row_id: r.try_get_safeu64(1)?, - valued_at: r.try_get_timestamp(2)?, - new: r.try_get(3)?, + row_id: r.try_get_u64(3)?, + valued_at: r.try_get_timestamp(4)?, + new: r.try_get_flag(5)?, + pending: r.try_get_flag(6)? } }) }) - .fetch_one(e) + .fetch_one(db) .await } @@ -348,27 +454,33 @@ pub async fn register_tx_in<'a>( amount: &Amount, debit_acc: &Address, received: &Timestamp, - reserve_pub: &EddsaPublicKey, + subject: &Option<IncomingSubject>, ) -> sqlx::Result<AddIncomingResult> { sqlx::query( " - SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new - FROM register_tx_in(($1, $2)::taler_amount, $3, $4, $5, $6) + SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending + FROM register_tx_in($1, $2, $3, $4, $5, $6) ", ) - .bind_amount(amount) + .bind(txid.as_byte_array()) + .bind(amount) .bind(debit_acc.to_string()) - .bind(reserve_pub.as_slice()) .bind_timestamp(received) - .bind(txid.as_byte_array()) + .bind(subject.as_ref().map(|it| it.ty())) + .bind(subject.as_ref().map(|it| it.key())) .try_map(|r: PgRow| { - Ok(if r.try_get(0)? { + Ok(if r.try_get_flag(0)? { AddIncomingResult::ReservePubReuse + } else if r.try_get_flag(1)? { + AddIncomingResult::MappingReuse + } else if r.try_get_flag(2)? { + AddIncomingResult::UnknownMapping } else { AddIncomingResult::Success { - row_id: r.try_get_safeu64(1)?, - valued_at: r.try_get_timestamp(2)?, - new: r.try_get(3)?, + row_id: r.try_get_u64(3)?, + valued_at: r.try_get_timestamp(4)?, + new: r.try_get_flag(5)?, + pending: r.try_get_flag(6)?, } }) }) @@ -376,106 +488,151 @@ pub async fn register_tx_in<'a>( .await } -/// Update a transaction id after bumping it -pub async fn bump_tx_id<'a>( - e: impl PgExecutor<'a>, - from: &Txid, - to: &Txid, -) -> sqlx::Result<ShortHashCode> { - sqlx::query("UPDATE tx_out SET txid=$1 WHERE txid=$2 RETURNING wtid") - .bind(to.as_byte_array()) - .bind(from.as_byte_array()) - .try_map(|r: PgRow| r.try_get_base32(0)) - .fetch_one(e) - .await +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RegistrationResult { + Success, + ReservePubReuse, + SubjectReuse, } -/// Reset the state of a conflicted debit -pub async fn conflict_tx_out<'a>(e: impl PgExecutor<'a>, id: &Txid) -> sqlx::Result<bool> { - Ok( - sqlx::query("UPDATE tx_out SET status=$1, txid=NULL where txid=$2") - .bind(DebitStatus::requested) - .bind(id.as_byte_array()) - .execute(e) - .await? - .rows_affected() - > 0, +pub async fn transfer_register( + db: &PgPool, + ty: IncomingType, + account_pub: &EddsaPublicKey, + auth_pub: &EddsaPublicKey, + auth_sig: &EddsaSignature, + recurrent: bool, + timestamp: &Timestamp, +) -> sqlx::Result<RegistrationResult> { + serialized!( + sqlx::query( + " + SELECT out_reserve_pub_reuse + FROM register_prepared_transfers ( + $1,$2,$3,$4,$5,$6 + ) + ", + ) + .bind(ty) + .bind(account_pub) + .bind(auth_pub) + .bind(auth_sig) + .bind(recurrent) + .bind_timestamp(timestamp) + .try_map(|r: PgRow| { + Ok(if r.try_get_flag(0)? { + RegistrationResult::ReservePubReuse + } else { + RegistrationResult::Success + }) + }) + .fetch_one(db) ) } -/// Reset the state of a conflicted bounce -pub async fn conflict_bounce<'a>(e: impl PgExecutor<'a>, id: &Txid) -> sqlx::Result<bool> { - Ok( - sqlx::query("UPDATE bounce SET status=$1, txid=NULL where txid=$2") - .bind(BounceStatus::requested) - .bind(id.as_byte_array()) - .execute(e) - .await? - .rows_affected() - > 0, +pub async fn transfer_unregister( + db: &PgPool, + auth_pub: &EddsaPublicKey, + timestamp: &Timestamp, +) -> sqlx::Result<bool> { + serialized!( + sqlx::query_scalar("SELECT out_found FROM delete_prepared_transfers($1,$2)") + .bind(auth_pub) + .bind_timestamp(timestamp) + .fetch_one(db) ) } +/// Update a transaction id after bumping it +pub async fn bump_tx_id( + db: &mut PgConnection, + to: &Txid, + wtid: &ShortHashCode, +) -> sqlx::Result<()> { + sqlx::query("UPDATE transfer SET txid=$1 WHERE wtid=$2") + .bind(to.as_byte_array()) + .bind(wtid) + .execute(db) + .await?; + Ok(()) +} + /// Initiate a bounce -pub async fn bounce<'a>(e: impl PgExecutor<'a>, txid: &Txid, reason: &str) -> sqlx::Result<()> { - sqlx::query("INSERT INTO bounce (created, bounced, reason, status) VALUES ($1, $2, $3, 'requested') ON CONFLICT (bounced) DO NOTHING") - .bind_timestamp(&Timestamp::now()) +pub async fn bounce<'a>( + e: impl PgExecutor<'a>, + txid: &Txid, + amount: &Amount, + debit_acc: &Address, + received: &Timestamp, + reason: &str, +) -> sqlx::Result<()> { + sqlx::query("SELECT FROM register_bounce_tx_in($1, $2, $3, $4, $5, $6)") .bind(txid.as_byte_array()) + .bind(amount) + .bind(debit_acc.to_string()) + .bind_timestamp(received) .bind(reason) + .bind_timestamp(&Timestamp::now()) .execute(e) .await?; Ok(()) } +#[derive(Debug, PartialEq, Eq)] pub enum ProblematicTx { - In { + Taler { txid: Txid, addr: Address, - reserve_pub: EddsaPublicKey, + ty: IncomingType, + metadata: EddsaPublicKey, }, Bounce { txid: Txid, - bounced: Txid, + bounced_in: Txid, + }, + Simple { + txid: Txid, }, } /// Handle transactions being removed during a reorganization pub async fn reorg<'a>(e: impl PgExecutor<'a>, ids: &[Txid]) -> sqlx::Result<Vec<ProblematicTx>> { - // A removed incoming transaction is a correctness issues in only two cases: - // - it is a confirmed credit registered in the database - // - it is an invalid transactions already bounced - // Those two cases can compromise bitcoin backing - // Removed outgoing transactions will be retried automatically by the node + // Any incoming transactions that is currently considered final ('confirmed') is a potential correctness issues + // Removed outgoing transactions will be retried automatically by the node/wallet and therefore + // do not mandate a full adapter stop + sqlx::query( " - SELECT tx_in.txid, NULL, debit_acc, tx_in.reserve_pub - FROM tx_in WHERE tx_in.txid = ANY($1) + SELECT txid, NULL, type, debit_acc, metadata + FROM tx_in JOIN taler_in USING (tx_in_id) WHERE txid = ANY($1) UNION ALL - SELECT bounce.txid, bounce.bounced, NULL, NULL - from bounce WHERE bounce.bounced = ANY($1); + SELECT tx_in.txid, bounced.txid, NULL, NULL, NULL + from tx_in JOIN bounced USING (tx_in_id) WHERE tx_in.txid = ANY($1) ", ) .bind(ids.iter().map(|it| it.as_byte_array()).collect::<Vec<_>>()) .try_map(|r: PgRow| { let txid = r.try_get_map(0, Txid::from_slice)?; - let check: Option<&[u8]> = r.try_get(1)?; - Ok(if check.is_some() { - ProblematicTx::Bounce { - txid, - bounced: r.try_get_map(1, Txid::from_slice)?, - } - } else { - ProblematicTx::In { - txid, - addr: sql_addr(&r, 2)?, - reserve_pub: r.try_get_base32(3)?, - } - }) + Ok( + if let Some(bounced_in) = r.try_get_opt_map(1, Txid::from_slice)? { + ProblematicTx::Bounce { txid, bounced_in } + } else if let Some(ty) = r.try_get(2)? { + ProblematicTx::Taler { + txid, + ty, + addr: sql_addr(&r, 3)?, + metadata: r.try_get(4)?, + } + } else { + ProblematicTx::Simple { txid } + }, + ) }) .fetch_all(e) .await } +#[derive(Debug)] pub enum SyncOutResult { New, Replaced, @@ -483,35 +640,65 @@ pub enum SyncOutResult { None, } +#[derive(Debug)] +pub enum TxOutKind<'a> { + Simple, + Bounce(Txid), + Talerable { + wtid: &'a ShortHashCode, + url: &'a Url, + metadata: Option<&'a str>, + }, +} + pub async fn sync_out<'a>( e: impl PgExecutor<'a>, txid: &Txid, replace_txid: Option<&Txid>, amount: &Amount, - exchange_url: &Url, credit_acc: &Address, - wtid: &ShortHashCode, + kind: &TxOutKind<'_>, created: &Timestamp, ) -> sqlx::Result<SyncOutResult> { - sqlx::query( + let query = sqlx::query( " SELECT out_replaced, out_recovered, out_new - FROM sync_out($1, $2, ($3, $4)::taler_amount, $5, $6, $7, $8) + FROM sync_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) ", ) .bind(txid.as_byte_array()) .bind(replace_txid.map(|it| it.as_byte_array())) - .bind_amount(amount) - .bind(exchange_url.to_string()) - .bind(credit_acc.to_string()) - .bind(wtid.as_slice()) + .bind(amount) + .bind(credit_acc.to_string()); + match kind { + TxOutKind::Simple => query + .bind(None::<&[u8]>) + .bind(None::<&str>) + .bind(None::<&str>) + .bind(None::<&[u8]>), + TxOutKind::Bounce(bounced) => query + .bind(None::<&[u8]>) + .bind(None::<&str>) + .bind(None::<&str>) + .bind(bounced.as_byte_array()), + TxOutKind::Talerable { + wtid, + url, + metadata, + } => query + .bind(wtid) + .bind(url.as_str()) + .bind(metadata) + .bind(None::<&[u8]>), + } .bind_timestamp(created) + .bind_timestamp(&Timestamp::now()) .try_map(|r: PgRow| { - Ok(if r.try_get(0)? { + Ok(if r.try_get_flag(0)? { SyncOutResult::Replaced - } else if r.try_get(1)? { + } else if r.try_get_flag(1)? { SyncOutResult::Recovered - } else if r.try_get(2)? { + } else if r.try_get_flag(2)? { SyncOutResult::New } else { SyncOutResult::None @@ -521,40 +708,81 @@ pub async fn sync_out<'a>( .await } -pub async fn pending_debit<'a>( +pub async fn pending_transfer<'a>( e: impl PgExecutor<'a>, currency: &Currency, -) -> sqlx::Result<Option<(i64, bitcoin::Amount, ShortHashCode, Address, Url)>> { +) -> sqlx::Result< + Option<( + i64, + bitcoin::Amount, + ShortHashCode, + Address, + Url, + Option<CompactString>, + )>, +> { sqlx::query( - "SELECT id, (amount).val, (amount).frac, wtid, credit_acc, exchange_url FROM tx_out WHERE status='requested' ORDER BY created LIMIT 1", + " + SELECT + transfer_id, + amount, + wtid, + credit_acc, + exchange_url, + metadata + FROM transfer + WHERE status='requested' + ORDER BY created_at LIMIT 1", ) .try_map(|r: PgRow| { Ok(( r.try_get(0)?, sql_btc_amount(&r, 1, currency)?, - r.try_get_base32(3)?, - sql_addr(&r, 4)?, - r.try_get_parse(5)? + r.try_get(2)?, + sql_addr(&r, 3)?, + r.try_get_parse(4)?, + r.try_get(5)?, )) }) .fetch_optional(e) .await } -pub async fn debit_sent<'a>(e: impl PgExecutor<'a>, id: i64, txid: &Txid) -> sqlx::Result<()> { - sqlx::query("UPDATE tx_out SET status='sent', txid=$1 WHERE id=$2") - .bind(txid.as_byte_array()) +/// Update transfer status to 'sent' and bind it to a txid +pub async fn transfer_sent<'a>(e: impl PgExecutor<'a>, id: i64, txid: &Txid) -> sqlx::Result<()> { + sqlx::query("UPDATE transfer SET status='sent', txid=$2 WHERE transfer_id=$1") .bind(id) + .bind(txid.as_byte_array()) .execute(e) .await?; Ok(()) } +/// Reset the state of a conflicted transfer +pub async fn transfer_conflict<'a>(e: impl PgExecutor<'a>, id: &Txid) -> sqlx::Result<bool> { + Ok( + sqlx::query("UPDATE transfer SET status='requested',txid=NULL WHERE txid=$1") + .bind(id.as_byte_array()) + .execute(e) + .await? + .rows_affected() + > 0, + ) +} + pub async fn pending_bounce<'a>( e: impl PgExecutor<'a>, ) -> sqlx::Result<Option<(i64, Txid, Option<String>)>> { sqlx::query( - "SELECT id, bounced, reason FROM bounce WHERE status='requested' ORDER BY created LIMIT 1", + " + SELECT + tx_in_id, + tx_in.txid, + reason + FROM bounced + JOIN tx_in USING (tx_in_id) + WHERE status='requested' ORDER BY received_at LIMIT 1 + ", ) .try_map(|r: PgRow| { Ok(( @@ -567,51 +795,513 @@ pub async fn pending_bounce<'a>( .await } -pub async fn bounce_set_status<'a>( - e: impl PgExecutor<'a>, - id: i64, - txid: Option<&Txid>, - status: &BounceStatus, -) -> sqlx::Result<()> { - sqlx::query("UPDATE bounce SET txid=$1, status=$2 WHERE id=$3") - .bind(txid.map(|it| it.as_byte_array())) - .bind(status) +/// Update bounce status to 'sent' and bind it to a txid +pub async fn bounce_sent<'a>(e: impl PgExecutor<'a>, id: i64, txid: &Txid) -> sqlx::Result<()> { + sqlx::query("UPDATE bounced SET status='sent', txid=$2 WHERE tx_in_id=$1") .bind(id) + .bind(txid.as_byte_array()) .execute(e) .await?; Ok(()) } +/// Reset the state of a conflicted bounce +pub async fn bounce_conflict<'a>(e: impl PgExecutor<'a>, id: &Txid) -> sqlx::Result<bool> { + Ok( + sqlx::query("UPDATE bounced SET status='requested',txid=NULL where txid=$1") + .bind(id.as_byte_array()) + .execute(e) + .await? + .rows_affected() + > 0, + ) +} + pub enum SyncBounceResult { New, Recovered, None, } -pub async fn sync_bounce<'a>( - e: impl PgExecutor<'a>, - txid: &Txid, - bounced: &Txid, - created: &Timestamp, -) -> sqlx::Result<SyncBounceResult> { - sqlx::query( - " - SELECT out_recovered, out_new - FROM sync_bounce($1, $2, $3) - ", - ) - .bind(txid.as_byte_array()) - .bind(bounced.as_byte_array()) - .bind_timestamp(created) - .try_map(|r: PgRow| { - Ok(if r.try_get(0)? { - SyncBounceResult::Recovered - } else if r.try_get(1)? { - SyncBounceResult::New - } else { - SyncBounceResult::None - }) - }) - .fetch_one(e) - .await +#[cfg(test)] +pub mod test { + use std::{assert_matches, str::FromStr, sync::LazyLock}; + + use bitcoin::{ + Address, BlockHash, Txid, + address::NetworkUnchecked, + hashes::{Hash as _, sha256d::Hash}, + }; + use jiff::Span; + use sqlx::{PgPool, Postgres, pool::PoolConnection, postgres::PgRow}; + use taler_api::{db::TypeHelper as _, notification::dummy_listen, subject::IncomingSubject}; + use taler_common::{ + api::{EddsaPublicKey, HashCode, ShortHashCode, params::History, wire::TransferRequest}, + types::{ + amount::{Currency, amount}, + url, + utils::now_sql_stable_ts, + }, + }; + + use crate::{ + CONFIG_SOURCE, + api::test::CLIENT, + db::{ + AddIncomingResult, ProblematicTx, SyncOutResult, TransferResult, TxOutKind, bounce, + bounce_sent, bump_tx_id, get_sync_state, incoming_history, init_status, + init_sync_state, pending_bounce, register_tx_in, register_tx_in_admin, reorg, + revenue_history, swap_sync_state, sync_out, transfer, update_status, + }, + }; + + pub const CURR: Currency = Currency::TEST; + + async fn setup() -> (PoolConnection<Postgres>, PgPool) { + taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await + } + + #[tokio::test] + async fn kv() { + let (mut db, pool) = setup().await; + + // Empty status + update_status(&mut db, false).await.unwrap(); + update_status(&mut db, true).await.unwrap(); + + // Init status + init_status(&pool).await.unwrap(); + update_status(&mut db, false).await.unwrap(); + update_status(&mut db, true).await.unwrap(); + + // Sync state + let first = BlockHash::from_raw_hash(Hash::all_zeros()); + let second = BlockHash::from_raw_hash(Hash::from_byte_array([ + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 1, + ])); + init_sync_state(&pool, &first, true).await.unwrap(); + assert_eq!(get_sync_state(&mut db).await.unwrap(), first); + init_sync_state(&pool, &second, false).await.unwrap(); + assert_eq!(get_sync_state(&mut db).await.unwrap(), first); + init_sync_state(&pool, &second, true).await.unwrap(); + assert_eq!(get_sync_state(&mut db).await.unwrap(), second); + swap_sync_state(&mut db, &second, &first).await.unwrap(); + assert_eq!(get_sync_state(&mut db).await.unwrap(), first); + swap_sync_state(&mut db, &second, &first).await.unwrap(); + assert_eq!(get_sync_state(&mut db).await.unwrap(), first); + } + + pub fn rand_tx_id() -> Txid { + Txid::from_byte_array(rand::random()) + } + + static ADDR: LazyLock<Address> = LazyLock::new(|| { + Address::<NetworkUnchecked>::from_str("bcrt1qpw3pjhtf9myl0qk9cxt54qt8qxu2mj955c7esx") + .unwrap() + .assume_checked() + }); + + #[tokio::test] + async fn tx_in() { + let (mut db, pool) = setup().await; + let amount = amount("KUDOS:10"); + + let mut routine = async |first: &Option<IncomingSubject>, + second: &Option<IncomingSubject>| { + let id = sqlx::query("SELECT count(*) + 1 FROM tx_in") + .try_map(|r: PgRow| r.try_get_u64(0)) + .fetch_one(&mut *db) + .await + .unwrap(); + let now = now_sql_stable_ts(); + let later = now + Span::new().hours(2); + let txid = rand_tx_id(); + // Insert + assert_eq!( + register_tx_in(&pool, &txid, &amount, &ADDR, &now, first) + .await + .unwrap(), + AddIncomingResult::Success { + new: true, + pending: false, + row_id: id, + valued_at: now, + } + ); + // Idempotent + assert_eq!( + register_tx_in(&pool, &txid, &amount, &ADDR, &later, first) + .await + .expect("register tx in"), + AddIncomingResult::Success { + new: false, + pending: false, + row_id: id, + valued_at: now + } + ); + // Many + assert_eq!( + register_tx_in(&pool, &rand_tx_id(), &amount, &ADDR, &later, second) + .await + .expect("register tx in"), + AddIncomingResult::Success { + new: true, + pending: false, + row_id: id + 1, + valued_at: later + } + ); + }; + + // Empty db + assert_eq!( + revenue_history(&pool, &History::default(), &CURR, dummy_listen) + .await + .unwrap(), + Vec::new() + ); + assert_eq!( + incoming_history(&pool, &History::default(), &CURR, dummy_listen) + .await + .unwrap(), + Vec::new() + ); + + // Regular transaction + routine(&None, &None).await; + + let first = EddsaPublicKey::rand(); + let second = EddsaPublicKey::rand(); + + // Reserve transaction + routine( + &Some(IncomingSubject::Reserve(first.clone())), + &Some(IncomingSubject::Reserve(second)), + ) + .await; + + // Kyc transaction + routine( + &Some(IncomingSubject::Kyc(first.clone())), + &Some(IncomingSubject::Kyc(first)), + ) + .await; + + // History + assert_eq!( + revenue_history(&pool, &History::default(), &CURR, dummy_listen) + .await + .unwrap() + .len(), + 6 + ); + assert_eq!( + incoming_history(&pool, &History::default(), &CURR, dummy_listen) + .await + .unwrap() + .len(), + 4 + ); + } + + #[tokio::test] + async fn tx_in_admin() { + let (_, pool) = setup().await; + + let amount = amount("KUDOS:10"); + + // Empty db + assert_eq!( + incoming_history(&pool, &History::default(), &CURR, dummy_listen) + .await + .unwrap(), + Vec::new() + ); + + let now = now_sql_stable_ts(); + let later = now + Span::new().hours(2); + // Insert + assert_eq!( + register_tx_in_admin( + &pool, + &amount, + &ADDR, + &now, + &IncomingSubject::Reserve(EddsaPublicKey::rand()) + ) + .await + .expect("register tx in"), + AddIncomingResult::Success { + new: true, + pending: false, + row_id: 1, + valued_at: now + } + ); + // Many + assert_eq!( + register_tx_in_admin( + &pool, + &amount, + &ADDR, + &later, + &IncomingSubject::Reserve(EddsaPublicKey::rand()) + ) + .await + .expect("register tx in"), + AddIncomingResult::Success { + new: true, + pending: false, + row_id: 2, + valued_at: later + } + ); + + // History + assert_eq!( + incoming_history(&pool, &History::default(), &CURR, dummy_listen) + .await + .unwrap() + .len(), + 2 + ); + } + + #[tokio::test] + async fn bounces() { + let (_, db) = setup().await; + let amount = amount("KUDOS:10"); + let now = now_sql_stable_ts(); + + // No bounces + assert_eq!(pending_bounce(&db).await.unwrap(), None); + bounce_sent(&db, 12, &rand_tx_id()).await.unwrap(); + + // Bounced + let bounced_txid = rand_tx_id(); + let bounce_txid = rand_tx_id(); + bounce(&db, &bounced_txid, &amount, &ADDR, &now, "invalid format") + .await + .unwrap(); + bounce(&db, &bounced_txid, &amount, &ADDR, &now, "invalid format") + .await + .unwrap(); + match pending_bounce(&db).await.unwrap() { + Some((id, txid, _)) if txid == bounced_txid => { + bounce_sent(&db, id, &txid).await.unwrap(); + bounce_sent(&db, id, &txid).await.unwrap(); + } + _ => unreachable!(), + } + assert_matches!( + sync_out( + &db, + &bounce_txid, + None, + &amount, + &ADDR, + &TxOutKind::Bounce(bounced_txid), + &now, + ) + .await + .unwrap(), + SyncOutResult::New + ); + assert_eq!(pending_bounce(&db).await.unwrap(), None); + assert_matches!( + sync_out( + &db, + &bounce_txid, + None, + &amount, + &ADDR, + &TxOutKind::Bounce(bounced_txid), + &now, + ) + .await + .unwrap(), + SyncOutResult::None + ); + + // Recovered + let bounced_txid = rand_tx_id(); + let bounce_txid = rand_tx_id(); + assert_matches!( + register_tx_in(&db, &bounced_txid, &amount, &ADDR, &now, &None) + .await + .expect("register tx in"), + AddIncomingResult::Success { + new: true, + pending: false, + .. + } + ); + assert_matches!( + sync_out( + &db, + &bounce_txid, + None, + &amount, + &ADDR, + &TxOutKind::Bounce(bounced_txid), + &now, + ) + .await + .unwrap(), + SyncOutResult::Recovered + ); + assert_matches!( + sync_out( + &db, + &bounce_txid, + None, + &amount, + &ADDR, + &TxOutKind::Bounce(bounced_txid), + &now, + ) + .await + .unwrap(), + SyncOutResult::None + ); + assert_eq!(pending_bounce(&db).await.unwrap(), None); + } + + #[tokio::test] + async fn sync_out_talerable_and_replace() { + let (mut db, poll) = setup().await; + let amount = amount("KUDOS:10"); + let now = now_sql_stable_ts(); + + // 1. Simple Sync Out + let txid = rand_tx_id(); + assert_matches!( + sync_out(&poll, &txid, None, &amount, &ADDR, &TxOutKind::Simple, &now,) + .await + .unwrap(), + SyncOutResult::New + ); + assert_matches!( + sync_out(&poll, &txid, None, &amount, &ADDR, &TxOutKind::Simple, &now,) + .await + .unwrap(), + SyncOutResult::None + ); + + // 2. Replace (Fee Bump) + assert_matches!( + sync_out( + &poll, + &rand_tx_id(), + Some(&txid), + &amount, + &ADDR, + &TxOutKind::Simple, + &now, + ) + .await + .unwrap(), + SyncOutResult::Replaced + ); + + // 3. Recover Talerable Transfer + let t = TransferRequest { + amount, + exchange_base_url: url("https://exchange.example.com"), + request_uid: HashCode::rand(), + wtid: ShortHashCode::rand(), + metadata: None, + credit_account: CLIENT.as_uri(), + }; + + // Create the pending transfer + assert_matches!( + transfer(&poll, &CLIENT, &t).await.unwrap(), + TransferResult::Success(_) + ); + + let txid = rand_tx_id(); + // Sync it out + assert_matches!( + sync_out( + &poll, + &txid, + None, + &amount, + &ADDR, + &TxOutKind::Talerable { + wtid: &t.wtid, + url: &t.exchange_base_url, + metadata: None, + }, + &now, + ) + .await + .unwrap(), + SyncOutResult::Recovered + ); + + // Bump fee + bump_tx_id(&mut db, &rand_tx_id(), &t.wtid).await.unwrap(); + } + + #[tokio::test] + async fn reorgs() { + let (_, pool) = setup().await; + let amount = amount("KUDOS:10"); + let now = now_sql_stable_ts(); + + // 1. Setup a normal incoming transaction (Credit) + let txid_normal = rand_tx_id(); + let reserve_pub = EddsaPublicKey::rand(); + register_tx_in( + &pool, + &txid_normal, + &amount, + &ADDR, + &now, + &Some(IncomingSubject::Reserve(reserve_pub.clone())), + ) + .await + .unwrap(); + + // 2. Setup a bounced transaction that was successfully synced out + let txid_bounced = rand_tx_id(); + let txid_bounce = rand_tx_id(); + bounce(&pool, &txid_bounced, &amount, &ADDR, &now, "bad data") + .await + .unwrap(); + sync_out( + &pool, + &txid_bounce, + None, + &amount, + &ADDR, + &TxOutKind::Bounce(txid_bounced), + &now, + ) + .await + .unwrap(); + + // 3. Trigger a Reorg dropping both the incoming reserve and the outgoing bounce + let problematic = reorg(&pool, &[txid_normal, txid_bounced]).await.unwrap(); + + assert_eq!( + problematic.as_slice(), + &[ + ProblematicTx::Taler { + txid: txid_normal, + addr: ADDR.clone(), + ty: taler_common::db::IncomingType::reserve, + metadata: reserve_pub + }, + ProblematicTx::Bounce { + txid: txid_bounced, + bounced_in: txid_bounce + } + ] + ); + } } diff --git a/depolymerizer-bitcoin/src/lib.rs b/depolymerizer-bitcoin/src/lib.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2022-2025 Taler Systems SA + Copyright (C) 2022-2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -19,7 +19,9 @@ use bitcoin::{Address, Amount, Network, Txid, hashes::hex::FromHex}; use rpc::{Category, Rpc, Transaction}; use rpc_utils::{segwit_min_amount, sender_address}; use segwit::{decode_segwit_msg, encode_segwit_key}; -use taler_common::{api_common::EddsaPublicKey, config::parser::ConfigSource}; +use taler_common::{api::EddsaPublicKey, config::parser::ConfigSource}; + +use crate::segwit::DecodeSegWitErr; pub mod api; pub mod cli; @@ -39,14 +41,6 @@ pub const CONFIG_SOURCE: ConfigSource = ConfigSource::simple("depolymerizer-bitc pub const DB_SCHEMA: &str = "depolymerizer_bitcoin"; #[derive(Debug, thiserror::Error)] -pub enum GetSegwitErr { - #[error(transparent)] - Decode(#[from] segwit::DecodeSegWitErr), - #[error(transparent)] - RPC(#[from] rpc::Error), -} - -#[derive(Debug, thiserror::Error)] pub enum GetOpReturnErr { #[error("Missing opreturn")] MissingOpReturn, @@ -85,7 +79,7 @@ impl Rpc { pub async fn get_tx_segwit_key( &mut self, id: &Txid, - ) -> Result<(Transaction, EddsaPublicKey), GetSegwitErr> { + ) -> Result<(Transaction, Result<EddsaPublicKey, DecodeSegWitErr>), rpc::Error> { let full = self.get_tx(id).await?; let addresses: Vec<String> = full @@ -100,9 +94,7 @@ impl Rpc { }) .collect(); - let metadata = decode_segwit_msg(&addresses)?; - - Ok((full, metadata)) + Ok((full, decode_segwit_msg(&addresses))) } /// Get detailed information about an in-wallet transaction and its op_return metadata diff --git a/depolymerizer-bitcoin/src/loops/watcher.rs b/depolymerizer-bitcoin/src/loops/watcher.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2022-2025 Taler Systems SA + Copyright (C) 2022-2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -19,9 +19,8 @@ use taler_common::ExpoBackoffDecorr; use tokio::time::sleep; use tracing::error; -use crate::{config::RpcCfg, rpc::rpc_common}; - use super::LoopResult; +use crate::{config::RpcCfg, rpc::rpc_common}; /// Wait for new block and notify arrival with postgreSQL notifications pub async fn watcher(rpc_cfg: RpcCfg, pool: PgPool) { diff --git a/depolymerizer-bitcoin/src/loops/worker.rs b/depolymerizer-bitcoin/src/loops/worker.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2022-2025 Taler Systems SA + Copyright (C) 2022-2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -16,27 +16,28 @@ use std::{fmt::Write, time::SystemTime}; use bitcoin::{Amount as BtcAmount, Txid, hashes::Hash}; -use depolymerizer_common::{metadata::OutMetadata, status::BounceStatus}; -use sqlx::{PgPool, postgres::PgListener}; -use taler_common::{ExpoBackoffDecorr, api_common::ShortHashCode, types::timestamp::Timestamp}; +use depolymerizer_common::metadata::OutMetadata; +use jiff::Timestamp; +use sqlx::{ + Acquire, Either, PgConnection, PgPool, + postgres::{PgAdvisoryLock, PgAdvisoryLockKey, PgListener}, +}; +use taler_api::subject::IncomingSubject; +use taler_common::{ExpoBackoffDecorr, api::ShortHashCode, db::IncomingType}; use tokio::time::sleep; use tracing::{debug, error, info, trace, warn}; -use url::Url; +use super::{LoopError, LoopResult, analysis::analysis}; use crate::{ - GetOpReturnErr, GetSegwitErr, + GetOpReturnErr, config::WorkerCfg, - db::{self, AddIncomingResult, SyncBounceResult, SyncOutResult, worker_lock}, + db::{self, AddIncomingResult, SyncOutResult, TxOutKind}, fail_point::fail_point, - rpc::{ - self, Category, ErrorCode, ListSinceBlock, ListTransaction, Rpc, Transaction, rpc_wallet, - }, + rpc::{self, Category, ErrorCode, ListSinceBlock, ListTransaction, Rpc, rpc_wallet}, rpc_utils::sender_address, taler_utils::btc_to_taler, }; -use super::{LoopError, LoopResult, analysis::analysis}; - pub async fn worker_loop(mut state: WorkerCfg, pool: PgPool) { let mut jitter = ExpoBackoffDecorr::default(); let mut lifetime = state.lifetime; @@ -49,21 +50,8 @@ pub async fn worker_loop(mut state: WorkerCfg, pool: PgPool) { let rpc = &mut rpc_wallet(&state.rpc_cfg, &state.wallet_cfg).await?; let db = &mut PgListener::connect_with(&pool).await?; - // It is not possible to atomically update the blockchain and the database. - // When we failed to sync the database and the blockchain state we rely on - // sync_chain to recover the lost updates. - // When this function is running concurrently, it not possible to known another - // execution has failed, and this can lead to a transaction being sent multiple time. - // To ensure only a single version of this function is running at a given time we rely - // on postgres advisory lock - - // Take the lock - if !worker_lock(&mut *db).await? { - return Err(LoopError::Concurrency); - } - // Listen to all channels - db.listen_all(["new_block", "taler_out"]).await?; + db.listen_all(["new_block", "transfer"]).await?; loop { // Wait for the next notification @@ -97,11 +85,26 @@ pub async fn worker_loop(mut state: WorkerCfg, pool: PgPool) { debug!(target: "worker", "syncing blockchain"); + let mut db = db.acquire().await?; + + // It is not possible to atomically update the blockchain and the database. + // When we failed to sync the database and the blockchain state we rely on + // sync_chain to recover the lost updates. + // When this function is running concurrently, it not possible to known another + // execution has failed, and this can lead to a transaction being sent multiple time. + // To ensure only a single version of this function is running at a given time we rely + // on postgres advisory lock + + // Take the lock + let lock = PgAdvisoryLock::with_key(PgAdvisoryLockKey::BigInt(42)); + let Either::Left(mut lock) = lock.try_acquire(&mut db).await? else { + return Err(LoopError::Concurrency); + }; + // Perform analysis - state.confirmation = - analysis(rpc, state.confirmation, state.max_confirmation).await?; + state.conf = analysis(rpc, state.conf, state.max_conf).await?; - worker_step(rpc, db, &mut state, &mut status).await?; + worker_step(rpc, lock.as_mut(), &mut state, &mut status).await?; skip_notification = false; jitter.reset(); @@ -114,13 +117,14 @@ pub async fn worker_loop(mut state: WorkerCfg, pool: PgPool) { // Bitcoin error codes are generic. We need to match the msg to get precise ones. Some errors // can resolve themselves when a new block is mined (new fees, new transactions). Our simple // approach is to wait for the next loop when an RPC error is caught to prevent endless logged errors. - skip_notification = matches!( - e, - LoopError::Rpc(rpc::Error::Transport(_)) - | LoopError::DB(_) - | LoopError::Injected(_) - | LoopError::Concurrency - ); + skip_notification = match e { + LoopError::DB(_) | LoopError::Injected(_) | LoopError::Concurrency => true, + LoopError::Rpc(e) => match e { + rpc::Error::Transport(_) | rpc::Error::Connect(_) => true, + rpc::Error::RPC { code, .. } => code == ErrorCode::RpcWalletError, + rpc::Error::Bitcoin(_) | rpc::Error::Json { .. } | rpc::Error::Null => false, + }, + }; sleep(jitter.backoff()).await; } else { return; @@ -133,7 +137,7 @@ pub async fn worker_transient(mut state: WorkerCfg, pool: PgPool) -> LoopResult< // Connect let rpc = &mut rpc_wallet(&state.rpc_cfg, &state.wallet_cfg).await?; - let db = &mut PgListener::connect_with(&pool).await?; + let mut db = pool.acquire().await?; // It is not possible to atomically update the blockchain and the database. // When we failed to sync the database and the blockchain state we rely on @@ -144,18 +148,19 @@ pub async fn worker_transient(mut state: WorkerCfg, pool: PgPool) -> LoopResult< // on postgres advisory lock // Take the lock - if !worker_lock(&mut *db).await? { + let lock = PgAdvisoryLock::with_key(PgAdvisoryLockKey::BigInt(42)); + let Either::Left(mut lock) = lock.try_acquire(&mut db).await? else { return Err(LoopError::Concurrency); - } + }; - worker_step(rpc, db, &mut state, &mut status).await?; + worker_step(rpc, lock.as_mut(), &mut state, &mut status).await?; Ok(()) } /// Synchronize local db with blockchain and perform transactions async fn worker_step( rpc: &mut Rpc, - db: &mut PgListener, + db: &mut PgConnection, state: &mut WorkerCfg, status: &mut bool, ) -> LoopResult<()> { @@ -167,11 +172,11 @@ async fn worker_step( while debit(db, rpc, state).await? {} // Bump stuck transactions - for id in stuck { - let bump = rpc.bump_fee(&id).await?; + for (txid, wtid) in stuck { + let bump = rpc.bump_fee(&txid).await?; fail_point("(injected) fail bump", 0.3)?; - let wtid = db::bump_tx_id(&mut *db, &id, &bump.txid).await?; - info!(target: "worker", ">> (bump) {wtid} replace {id} with {}", bump.txid); + db::bump_tx_id(&mut *db, &bump.txid, &wtid).await?; + info!(target: "worker", ">> (bump) {wtid} {txid} -> {}", bump.txid); } // Send requested bounce @@ -183,26 +188,29 @@ async fn worker_step( /// Parse new transactions, return stuck transactions if the database is up to date with the latest mined block async fn sync_chain( rpc: &mut Rpc, - db: &mut PgListener, + db: &mut PgConnection, state: &WorkerCfg, status: &mut bool, -) -> LoopResult<Option<Vec<Txid>>> { +) -> LoopResult<Option<Vec<(Txid, ShortHashCode)>>> { // Get stored last_hash let sync_state = db::get_sync_state(&mut *db).await?; - // Get the current confirmation delay - let conf_delay = state.confirmation; // Get all transactions made since this block let ListSinceBlock { - transactions, - removed, + mut transactions, + mut removed, lastblock, - } = rpc.list_since_block(Some(&sync_state), conf_delay).await?; + } = rpc.list_since_block(Some(&sync_state), state.conf).await?; + transactions.sort_unstable_by_key(|it| (it.confirmations, it.txid)); + transactions.dedup_by_key(|it| it.txid); + removed.sort_unstable_by_key(|it| (it.confirmations, it.txid)); + removed.dedup_by_key(|it| it.txid); // Check if a confirmed incoming transaction have been removed by a blockchain reorganization - let new_status = sync_chain_removed(removed, db, conf_delay as i32).await?; + let conflict = sync_chain_removed(removed, db, state.conf as i32).await?; - // Sync status with database + // Sync server status with database + let new_status = !conflict.stop_server(); if *status != new_status { db::update_status(db, new_status).await?; *status = new_status; @@ -210,7 +218,8 @@ async fn sync_chain( info!(target: "worker", "Recovered lost transactions"); } } - if !new_status { + + if conflict.stop_worker() { return Ok(None); } @@ -219,11 +228,13 @@ async fn sync_chain( for tx in transactions { match tx.category { Category::Send => { - if sync_chain_outgoing(&tx.txid, tx.confirmations, rpc, db, state).await? { - stuck.push(tx.txid); + if let Some(wtid) = + sync_chain_outgoing(&tx.txid, tx.confirmations, rpc, db, state).await? + { + stuck.push((tx.txid, wtid)); } } - Category::Receive if tx.confirmations >= conf_delay as i32 => { + Category::Receive if tx.confirmations >= state.conf as i32 => { sync_chain_incoming_confirmed(&tx.txid, rpc, db, state).await? } _ => { @@ -238,18 +249,29 @@ async fn sync_chain( Ok(Some(stuck)) } +#[derive(Debug, Clone, Copy)] +enum ReorgConflict { + BackingCompromised, + IncomingCompromised, + Ok, +} + +impl ReorgConflict { + pub fn stop_server(&self) -> bool { + matches!(self, Self::BackingCompromised) + } + + pub fn stop_worker(&self) -> bool { + matches!(self, Self::BackingCompromised | Self::IncomingCompromised) + } +} + /// Sync database with removed transactions, return false if bitcoin backing is compromised async fn sync_chain_removed( removed: Vec<ListTransaction>, - db: &mut PgListener, + db: &mut PgConnection, min_confirmations: i32, -) -> LoopResult<bool> { - // A removed incoming transaction is a correctness issues in only two cases: - // - it is a confirmed credit registered in the database - // - it is an invalid transactions already bounced - // Those two cases can compromise bitcoin backing - // Removed outgoing transactions will be retried automatically by the node - +) -> LoopResult<ReorgConflict> { let potential_problematic_ids: Vec<Txid> = removed .into_iter() .filter_map(|tx| { @@ -261,46 +283,71 @@ async fn sync_chain_removed( // Only keep incoming transaction that are not reconfirmed let problematic_tx = db::reorg(&mut *db, &potential_problematic_ids).await?; if problematic_tx.is_empty() { - return Ok(true); + return Ok(ReorgConflict::Ok); } - let mut buf = "The following transaction have been removed from the blockchain, bitcoin backing is compromised until the transaction reappear:".to_string(); - for tx in problematic_tx { + // Bitcoin backing can be compromised in only two cases: + // - a confirmed reserve + // - a confirmed bounced + + // TODO use partition_in_place when stable + let (compromise, problematic): (Vec<_>, Vec<_>) = + problematic_tx.iter().partition(|it| match it { + db::ProblematicTx::Taler { ty, .. } => *ty == IncomingType::reserve, + db::ProblematicTx::Bounce { .. } => true, + db::ProblematicTx::Simple { .. } => false, + }); + let mut buf = "The following transaction have been removed from the blockchain, ".to_string(); + let (txs, state) = if compromise.is_empty() { + buf.push_str("waiting until they reappear:"); + (problematic, ReorgConflict::IncomingCompromised) + } else { + buf.push_str("bitcoin backing is compromised until they reappear:"); + (compromise, ReorgConflict::BackingCompromised) + }; + for tx in txs { match tx { - db::ProblematicTx::In { + db::ProblematicTx::Taler { txid, addr, - reserve_pub, + ty, + metadata, } => { - write!(&mut buf, "\n\tcredit {reserve_pub} in {txid} from {addr}",).unwrap(); + write!(&mut buf, "\n\t{txid} {ty} {metadata} from {addr}",).unwrap(); } - db::ProblematicTx::Bounce { txid, bounced } => { - write!(&mut buf, "\n\tbounced {txid} in {bounced}").unwrap(); + db::ProblematicTx::Bounce { txid, bounced_in } => { + write!(&mut buf, "\n\t{txid} bounced in {bounced_in}").unwrap(); + } + db::ProblematicTx::Simple { txid } => { + write!(&mut buf, "\n\t{txid}").unwrap(); } } } error!(target: "worker", "{buf}"); - Ok(false) + Ok(state) } /// Sync database with an incoming confirmed transaction async fn sync_chain_incoming_confirmed( - id: &Txid, + txid: &Txid, rpc: &mut Rpc, - db: &mut PgListener, + db: &mut PgConnection, state: &WorkerCfg, ) -> Result<(), LoopError> { - match rpc.get_tx_segwit_key(id).await { - Ok((full, reserve_pub)) => { - // Store transactions in database - let debit_addr = sender_address(rpc, &full).await?; - let amount = btc_to_taler(&full.amount, &state.currency); + let (tx, metadata) = rpc.get_tx_segwit_key(txid).await?; + // Store transactions in database + let debit_addr = sender_address(rpc, &tx).await?; + let amount = btc_to_taler(&tx.amount, &state.currency); + let time = Timestamp::from_second(tx.time as i64).unwrap(); + let ty = IncomingType::reserve; + match metadata { + Ok(reserve_pub) => { match db::register_tx_in( &mut *db, - id, + txid, &amount, &debit_addr, - &Timestamp::from_sql_micros((full.time * 1000000) as i64).unwrap(), - &reserve_pub, + &Timestamp::from_second(tx.time as i64).unwrap(), + &Some(IncomingSubject::Reserve(reserve_pub.clone())), ) .await? { @@ -308,161 +355,169 @@ async fn sync_chain_incoming_confirmed( new, row_id: _, valued_at: _, + pending: _, } => { if new { - info!(target: "worker", "<< {amount} {reserve_pub} in {id} from {debit_addr}"); + info!(target: "worker", "<< {ty} {reserve_pub} {txid} {debit_addr} {amount}"); } } AddIncomingResult::ReservePubReuse => { - db::bounce(db, id, "reserve_pub reuse").await? + db::bounce(db, txid, &amount, &debit_addr, &time, "reserve_pub reuse").await? } + AddIncomingResult::UnknownMapping => todo!(), + AddIncomingResult::MappingReuse => todo!(), } } - Err(err) => match err { - GetSegwitErr::Decode(e) => db::bounce(db, id, &e.to_string()).await?, - GetSegwitErr::RPC(e) => return Err(e.into()), - }, - } - Ok(()) -} - -/// Sync database with a debit transaction, return true if stuck -async fn sync_chain_debit( - txid: &Txid, - full: &Transaction, - wtid: &ShortHashCode, - exchange_url: &Url, - db: &mut PgListener, - confirmations: i32, - state: &WorkerCfg, -) -> LoopResult<bool> { - let credit_addr = full.details[0].address.clone().unwrap().assume_checked(); - let amount = btc_to_taler(&full.amount, &state.currency); - - if confirmations < 0 { - // Handle conflicting tx - if full.replaced_by_txid.is_none() && db::conflict_tx_out(db, txid).await? { - warn!(target: "worker", ">> (conflict) {wtid} in {txid} to {credit_addr}"); - } - } else { - match db::sync_out( - db, - txid, - full.replaced_by_txid.as_ref(), - &amount, - exchange_url, - &credit_addr, - wtid, - &Timestamp::from_sql_micros((full.time * 1000000) as i64).unwrap(), - ) - .await? - { - SyncOutResult::New => { - warn!(target: "worker", ">> (onchain) {amount} {wtid} in {txid} to {credit_addr}"); - } - SyncOutResult::Replaced => { - info!( - target: "worker", - ">> (recovered) {wtid} replace {txid} with {}", - full.replaced_by_txid.unwrap() - ) - } - SyncOutResult::Recovered => { - warn!(target: "worker", ">> (recovered) {amount} {wtid} in {txid} to {credit_addr}") - } - SyncOutResult::None => {} - } - - // Check if stuck - if let Some(delay) = state.bump_delay - && confirmations == 0 - && full.replaced_by_txid.is_none() - { - let now = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap() - .as_secs(); - if now - full.time > delay as u64 { - return Ok(true); - } - } - } - Ok(false) -} - -/// Sync database with an outgoing bounce transaction -async fn sync_chain_bounce( - txid: &Txid, - bounced: &Txid, - db: &mut PgListener, - confirmations: i32, -) -> LoopResult<()> { - if confirmations < 0 { - // Handle conflicting tx - if db::conflict_bounce(db, txid).await? { - warn!(target: "worker", "|| (conflict) {bounced} in {txid}"); - } - } else { - match db::sync_bounce(db, txid, bounced, &Timestamp::now()).await? { - SyncBounceResult::New => warn!(target: "worker", "|| (onchain) {bounced} in {txid}"), - SyncBounceResult::Recovered => { - warn!(target: "worker", "|| (recovered) {bounced} in {txid}") - } - SyncBounceResult::None => {} - } + Err(e) => db::bounce(db, txid, &amount, &debit_addr, &time, &e.to_string()).await?, } - Ok(()) } /// Sync database with an outgoing transaction, return true if stuck async fn sync_chain_outgoing( - id: &Txid, + txid: &Txid, confirmations: i32, rpc: &mut Rpc, - db: &mut PgListener, + db: &mut PgConnection, state: &WorkerCfg, -) -> LoopResult<bool> { +) -> LoopResult<Option<ShortHashCode>> { match rpc - .get_tx_op_return(id) + .get_tx_op_return(txid) .await - .map(|(full, bytes)| (full, OutMetadata::decode(&bytes))) + .map(|(tx, bytes)| (tx, OutMetadata::decode(&bytes))) { - Ok((full, Ok(info))) => match info { - OutMetadata::Debit { wtid, url } => { - return sync_chain_debit(id, &full, &wtid, &url, db, confirmations, state).await; - } - OutMetadata::Bounce { bounced } => { - sync_chain_bounce(id, &Txid::from_byte_array(bounced), db, confirmations).await? + Ok((tx, Ok(info))) => { + let credit_addr = tx.details[0].address.clone().unwrap().assume_checked(); + let amount = btc_to_taler(&tx.amount, &state.currency); + let created_at = Timestamp::from_second(tx.time as i64).unwrap(); + match info { + OutMetadata::Debit { + wtid, + url, + metadata, + } => { + if confirmations < 0 { + // Handle conflicting tx + if tx.replaced_by_txid.is_none() && db::transfer_conflict(db, txid).await? { + warn!(target: "worker", ">> (conflict) {wtid} {txid} {credit_addr} {amount}"); + } + } else if confirmations > state.conf as i32 { + match db::sync_out( + db, + txid, + tx.replaced_by_txid.as_ref(), + &amount, + &credit_addr, + &TxOutKind::Talerable { + wtid: &wtid, + url: &url, + metadata: metadata.as_deref(), + }, + &created_at, + ) + .await? + { + SyncOutResult::New => { + info!(target: "worker", ">> (onchain) {wtid} {txid} {credit_addr} {amount}"); + } + SyncOutResult::Replaced => { + info!( + target: "worker", + ">> (recovered) {wtid} {txid} -> {} {credit_addr} {amount}", + tx.replaced_by_txid.unwrap() + ) + } + SyncOutResult::Recovered => { + warn!(target: "worker", ">> (recovered) {wtid} {txid} {credit_addr} {amount}") + } + SyncOutResult::None => {} + } + } else { + // TODO sync transfer sent ? + + // Check if stuck + if let Some(delay) = state.bump_delay + && confirmations == 0 + && tx.replaced_by_txid.is_none() + { + let now = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_secs(); + if now - tx.time > delay as u64 { + return Ok(Some(wtid)); + } + } + } + } + OutMetadata::Bounce { bounced } => { + let bounced = Txid::from_byte_array(bounced); + if confirmations < 0 { + // Handle conflicting tx + if db::bounce_conflict(db, txid).await? { + warn!(target: "worker", "|| (conflict) {bounced} {txid}"); + } + } else if confirmations > state.conf as i32 { + match db::sync_out( + db, + txid, + tx.replaces_txid.as_ref(), + &amount, + &credit_addr, + &TxOutKind::Bounce(bounced), + &created_at, + ) + .await? + { + SyncOutResult::New => { + info!(target: "worker", "|| (onchain) {bounced} {txid}") + } + SyncOutResult::Replaced => { + info!( + target: "worker", + "|| (recovered) {bounced} {txid} -> {}", + tx.replaced_by_txid.unwrap() + ) + } + SyncOutResult::Recovered => { + warn!(target: "worker", "|| (recovered) {bounced} {txid}") + } + SyncOutResult::None => {} + } + } else { + // TODO sync bounce sent ? + } + } } - }, - Ok((_, Err(e))) => warn!(target: "worker", "send: decode-info {id} - {e}"), + } + Ok((_, Err(e))) => warn!(target: "worker", "send: decode-info {txid} - {e}"), Err(e) => match e { GetOpReturnErr::MissingOpReturn => { /* Ignore */ } GetOpReturnErr::RPC(e) => return Err(e)?, }, } - Ok(false) + Ok(None) } /// Send a debit transaction on the blockchain, return false if no more requested transactions are found -async fn debit(db: &mut PgListener, rpc: &mut Rpc, state: &WorkerCfg) -> LoopResult<bool> { +async fn debit(db: &mut PgConnection, rpc: &mut Rpc, state: &WorkerCfg) -> LoopResult<bool> { // We rely on the advisory lock to ensure we are the only one sending transactions - if let Some((id, amount, wtid, addr, url)) = - db::pending_debit(&mut *db, &state.currency).await? + if let Some((id, amount, wtid, addr, url, metadata)) = + db::pending_transfer(&mut *db, &state.currency).await? { let metadata = OutMetadata::Debit { wtid: wtid.clone(), url, + metadata, }; let txid = rpc .send(&addr, &amount, Some(&metadata.encode().unwrap()), false) .await?; fail_point("(injected) fail debit", 0.3)?; - db::debit_sent(db, id, &txid).await?; + db::transfer_sent(db, id, &txid).await?; let amount = btc_to_taler(&amount.to_signed().unwrap(), &state.currency); - info!(target: "worker", ">> {amount} {wtid} in {txid} to {addr}"); + info!(target: "worker", ">> (sent) {wtid} {txid} {addr} {amount}"); Ok(true) } else { Ok(false) @@ -470,7 +525,7 @@ async fn debit(db: &mut PgListener, rpc: &mut Rpc, state: &WorkerCfg) -> LoopRes } /// Bounce a transaction on the blockchain, return false if no more requested transactions are found -async fn bounce(db: &mut PgListener, rpc: &mut Rpc, fee: &BtcAmount) -> LoopResult<bool> { +async fn bounce(db: &mut PgConnection, rpc: &mut Rpc, fee: &BtcAmount) -> LoopResult<bool> { // We rely on the advisory lock to ensure we are the only one sending transactions if let Some((id, bounced, reason)) = db::pending_bounce(&mut *db).await? { let metadata = OutMetadata::Bounce { @@ -483,17 +538,17 @@ async fn bounce(db: &mut PgListener, rpc: &mut Rpc, fee: &BtcAmount) -> LoopResu { Ok(txid) => { fail_point("(injected) fail bounce", 0.3)?; - db::bounce_set_status(db, id, Some(&txid), &BounceStatus::sent).await?; + db::bounce_sent(db, id, &txid).await?; if let Some(reason) = reason { - info!(target: "worker", "|| {bounced} in {txid}: {reason}"); + info!(target: "worker", "|| (sent) {bounced} {txid}: {reason}"); } else { - info!(target: "worker", "|| {bounced} in {txid}"); + info!(target: "worker", "|| (sent) {bounced} {txid}"); } } Err(err) => match err { rpc::Error::RPC { code: ErrorCode::RpcWalletInsufficientFunds | ErrorCode::RpcWalletError, - msg: _, + .. } => { warn!(target: "worker", "{err}"); return Ok(false); diff --git a/depolymerizer-bitcoin/src/main.rs b/depolymerizer-bitcoin/src/main.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2022-2025 Taler Systems SA + Copyright (C) 2022-2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -23,7 +23,7 @@ use taler_common::taler_main; fn main() { let args = Args::parse(); - taler_main(CONFIG_SOURCE, args.common, |cfg| async move { - run(args.cmd, &cfg).await + taler_main(CONFIG_SOURCE, args.common, async |cfg| { + run(args.cmd, cfg).await }) } diff --git a/depolymerizer-bitcoin/src/payto.rs b/depolymerizer-bitcoin/src/payto.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2025 Taler Systems SA + Copyright (C) 2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -29,7 +29,7 @@ const BITCOIN: &str = "bitcoin"; pub struct MissingAddr; impl PaytoImpl for BtcWallet { - fn as_payto(&self) -> PaytoURI { + fn as_uri(&self) -> PaytoURI { PaytoURI::from_parts(BITCOIN, format_args!("/{}", self.0)) } @@ -38,16 +38,17 @@ impl PaytoImpl for BtcWallet { if url.domain() != Some(BITCOIN) { return Err(PaytoErr::UnsupportedKind( BITCOIN, - url.domain().unwrap_or_default().to_owned(), + url.domain().unwrap_or_default().into(), )); } let Some(mut segments) = url.path_segments() else { - return Err(PaytoErr::custom(MissingAddr)); + return Err(PaytoErr::MissingSegment("wallet addr")); }; let Some(first) = segments.next() else { - return Err(PaytoErr::custom(MissingAddr)); + return Err(PaytoErr::MissingSegment("wallet addr")); }; - let address = Address::from_str(first).map_err(PaytoErr::custom)?; + let address = + Address::from_str(first).map_err(|e| PaytoErr::malformed_segment("wallet addr", e))?; Ok(Self(address.assume_checked())) } } diff --git a/depolymerizer-bitcoin/src/rpc.rs b/depolymerizer-bitcoin/src/rpc.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2022-2025 Taler Systems SA + Copyright (C) 2022-2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -25,10 +25,6 @@ //! //! bitcoincore RPC documentation: <https://bitcoincore.org/en/doc/29.0.0/> -use base64::Engine; -use base64::prelude::BASE64_STANDARD; -use bitcoin::{Address, Amount, BlockHash, SignedAmount, Txid, address::NetworkUnchecked}; -use serde_json::{Value, json}; use std::{ fmt::Debug, io::{ErrorKind, IoSlice, Write as _}, @@ -37,6 +33,11 @@ use std::{ str::FromStr as _, time::Duration, }; + +use base64::{Engine, prelude::BASE64_STANDARD}; +use bitcoin::{Address, Amount, BlockHash, SignedAmount, Txid, address::NetworkUnchecked}; +use compact_str::{CompactString, format_compact}; +use serde_json::{Value, json}; use tokio::{ io::{self, AsyncReadExt, AsyncWriteExt as _}, net::TcpStream, @@ -76,25 +77,32 @@ enum RpcResponse<T> { error: Option<RpcError>, id: u64, }, - Error(String), + Error(CompactString), } #[derive(Debug, serde::Deserialize)] struct RpcError { code: ErrorCode, - message: String, + message: CompactString, } #[derive(Debug, thiserror::Error)] pub enum Error { #[error("IO: {0:?}")] Transport(#[from] std::io::Error), - #[error("RPC: {code:?} - {msg}")] - RPC { code: ErrorCode, msg: String }, + #[error("RPC {method}: {code:?} - {msg}")] + RPC { + method: CompactString, + code: ErrorCode, + msg: CompactString, + }, #[error("BTC: {0}")] - Bitcoin(String), - #[error("JSON: {0}")] - Json(#[from] serde_json::Error), + Bitcoin(CompactString), + #[error("JSON {method}: {e}")] + Json { + method: CompactString, + e: serde_json::Error, + }, #[error("connect: {0}")] Connect(#[from] RpcConnectErr), #[error("Null rpc, no result or error")] @@ -113,21 +121,24 @@ fn expect_null(result: Result<()>) -> Result<()> { } pub struct JsonSocket { - path: String, - cookie: String, + path: CompactString, + cookie: CompactString, sock: TcpStream, buf: Vec<u8>, } impl JsonSocket { - async fn call<T>(&mut self, body: &impl serde::Serialize) -> Result<T> + async fn call<T>(&mut self, method: &str, body: &impl serde::Serialize) -> Result<T> where T: serde::de::DeserializeOwned, { let buf = &mut self.buf; let sock = &mut self.sock; buf.clear(); - serde_json::to_writer(&mut *buf, body)?; + serde_json::to_writer(&mut *buf, body).map_err(|e| Error::Json { + method: method.into(), + e, + })?; let body_len = buf.len(); // Write HTTP request @@ -149,7 +160,6 @@ impl JsonSocket { sock.flush().await?; // Skip response - buf.clear(); let header_pos = loop { let amount = sock.read_buf(buf).await?; @@ -166,7 +176,10 @@ impl JsonSocket { } }; // Read body - let response = serde_json::from_slice(&buf[header_pos + 4..])?; + let response = serde_json::from_slice(&buf[header_pos + 4..]).map_err(|e| Error::Json { + method: method.into(), + e, + })?; Ok(response) } } @@ -200,9 +213,9 @@ impl Rpc { async fn new(cfg: &RpcCfg, wallet: Option<&str>) -> std::result::Result<Self, RpcConnectErr> { let path = if let Some(wallet) = wallet { - format!("/wallet/{wallet}") + format_compact!("/wallet/{wallet}") } else { - String::from("/") + CompactString::const_new("/") }; let token = match &cfg.auth { @@ -230,7 +243,7 @@ impl Rpc { id: 0, socket: JsonSocket { path, - cookie: format!("Basic {}", BASE64_STANDARD.encode(&token)), + cookie: format_compact!("Basic {}", BASE64_STANDARD.encode(&token)), sock, buf: Vec::with_capacity(16 * 1024), }, @@ -247,7 +260,7 @@ impl Rpc { id: self.id, params, }; - let response: RpcResponse<T> = self.socket.call(&request).await?; + let response: RpcResponse<T> = self.socket.call(method, &request).await?; trace!("RPC < {response:?}"); match response { RpcResponse::RpcResponse { result, error, id } => { @@ -258,6 +271,7 @@ impl Rpc { } else { Err(match error { Some(err) => Error::RPC { + method: method.into(), code: err.code, msg: err.message, }, @@ -330,6 +344,11 @@ impl Rpc { self.call("getblockchaininfo", &EMPTY).await } + /// Get mempool info + pub async fn get_mempool_info(&mut self) -> Result<MemPoolInfo> { + self.call("getmempoolinfo", &EMPTY).await + } + /// Get blockchain info pub async fn get_wallet_info(&mut self) -> Result<WalletInfo> { self.call("getwalletinfo", &EMPTY).await @@ -448,8 +467,11 @@ impl Rpc { hash: Option<&BlockHash>, confirmation: u32, ) -> Result<ListSinceBlock> { - self.call("listsinceblock", &(hash, confirmation.max(1), (), true)) - .await + self.call( + "listsinceblock", + &(hash, confirmation.max(1), (), true, false), + ) + .await } /* ----- Cluster ----- */ @@ -490,15 +512,21 @@ pub struct BlockchainInfo { } #[derive(Clone, Debug, serde::Deserialize)] -pub struct WalletInfo { - pub walletname: String, - pub scanning: Option<Scanning>, +pub struct MemPoolInfo { + pub size: u32, } #[derive(Clone, Debug, serde::Deserialize)] -pub struct Scanning { - pub duration: u64, - pub progress: f32, +#[serde(untagged)] +pub enum Scanning { + Active { duration: u64, progress: f32 }, + Inactive(bool), +} + +#[derive(Clone, Debug, serde::Deserialize)] +pub struct WalletInfo { + pub walletname: String, + pub scanning: Scanning, } #[derive(Debug, serde::Deserialize)] @@ -547,7 +575,6 @@ pub struct ListTransaction { pub confirmations: i32, pub txid: Txid, pub category: Category, - pub vout: i32, } #[derive(Debug, serde::Deserialize)] @@ -708,6 +735,8 @@ pub enum ErrorCode { RpcWalletNotSpecified = -19, /// This same wallet is already loaded RpcWalletAlreadyLoaded = -35, + /// There is already a wallet with the same name + RpcWalletAlreadyExists = -36, /// Server is in safe mode, and command is not allowed in safe mode RpcForbiddenBySafeMode = -2, } diff --git a/depolymerizer-bitcoin/src/segwit.rs b/depolymerizer-bitcoin/src/segwit.rs @@ -13,11 +13,12 @@ You should have received a copy of the GNU Affero General Public License along with TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ +use std::cmp::Ordering; + use bech32::Hrp; use depolymerizer_common::rand_slice; use rand::rngs::ThreadRng; -use std::cmp::Ordering; -use taler_common::api_common::EddsaPublicKey; +use taler_common::api::{EddsaPublicKey, EddsaPublicKeyError}; // TODO use segwit v1 to only use a single address @@ -60,7 +61,7 @@ pub fn encode_segwit_key(hrp: Hrp, msg: &[u8; 32]) -> [String; 2] { ] } -#[derive(Debug, Clone, thiserror::Error)] +#[derive(Debug, thiserror::Error)] pub enum DecodeSegWitErr { #[error("There is less than 2 segwit addresses")] MissingSegWitAddress, @@ -68,6 +69,8 @@ pub enum DecodeSegWitErr { NoPrefixMatch, #[error("More than two addresses are sharing a common prefix")] PrefixCollision, + #[error(transparent)] + Malformed(EddsaPublicKeyError), } /// Decode a 32B key into from addresses @@ -117,7 +120,7 @@ pub fn decode_segwit_msg( for (is_first, _, half) in matches { key[*is_first as usize * 16..][..16].copy_from_slice(half); } - Ok(EddsaPublicKey::from(key)) + EddsaPublicKey::try_from(key).map_err(DecodeSegWitErr::Malformed) } Ordering::Greater => Err(DecodeSegWitErr::PrefixCollision), Ordering::Less => Err(DecodeSegWitErr::MissingSegWitAddress), @@ -142,7 +145,7 @@ pub fn rand_addresses(hrp: Hrp, key: &[u8; 32]) -> Vec<String> { #[cfg(test)] mod test { use rand::{prelude::SliceRandom, rngs::ThreadRng}; - use taler_common::types::base32::Base32; + use taler_common::api::EddsaPublicKey; use crate::segwit::{decode_segwit_msg, encode_segwit_key, rand_addresses}; @@ -150,7 +153,7 @@ mod test { fn test_shuffle() { let mut rng = ThreadRng::default(); for _ in 0..1000 { - let key = Base32::rand(); + let key = EddsaPublicKey::rand(); let mut addresses = encode_segwit_key(bech32::hrp::TB, &key); addresses.shuffle(&mut rng); let decoded = @@ -163,7 +166,7 @@ mod test { #[test] fn test_shuffle_many() { for _ in 0..1000 { - let key = Base32::rand(); + let key = EddsaPublicKey::rand(); let addresses = rand_addresses(bech32::hrp::TB, &key); let decoded = decode_segwit_msg(&addresses.iter().map(|s| s.as_str()).collect::<Vec<&str>>()) diff --git a/depolymerizer-bitcoin/src/setup.rs b/depolymerizer-bitcoin/src/setup.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2025 Taler Systems SA + Copyright (C) 2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -24,7 +24,7 @@ use crate::{ config::{WorkerCfg, parse_db_cfg}, db, payto::BtcWallet, - rpc::Rpc, + rpc::{Rpc, Scanning}, }; pub async fn setup(cfg: &Config, reset: bool) -> anyhow::Result<()> { @@ -84,8 +84,8 @@ pub async fn setup(cfg: &Config, reset: bool) -> anyhow::Result<()> { } } else { let wallet = rpc.get_wallet_info().await?; - if let Some(sync) = wallet.scanning { - warn!(target: "setup", "worker wallet is scanning at {:.2}%", sync.progress) + if let Scanning::Active { progress, .. } = wallet.scanning { + warn!(target: "setup", "worker wallet is scanning at {progress:.2}%") } } diff --git a/depolymerizer-bitcoin/src/sql.rs b/depolymerizer-bitcoin/src/sql.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2022-2025 Taler Systems SA + Copyright (C) 2022-2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -15,8 +15,7 @@ */ use bitcoin::{Address, Amount as BtcAmount, Txid, address::NetworkUnchecked, hashes::Hash}; -use sqlx::Row; -use sqlx::postgres::PgRow; +use sqlx::{Row, postgres::PgRow}; use taler_api::db::TypeHelper; use taler_common::types::{ amount::Currency, @@ -27,7 +26,7 @@ use crate::{payto::BtcWallet, taler_utils::taler_to_btc}; /// Bitcoin amount from sql pub fn sql_btc_amount(row: &PgRow, idx: usize, currency: &Currency) -> sqlx::Result<BtcAmount> { - let amount = row.try_get_amount_i(idx, currency)?; + let amount = row.try_get_amount(idx, currency)?; Ok(taler_to_btc(&amount)) } @@ -53,9 +52,7 @@ pub fn sql_payto<I: sqlx::ColumnIndex<PgRow>>( .assume_checked(); let name: Option<&str> = r.try_get(name)?; - Ok(BtcWallet(addr) - .as_payto() - .as_full_payto(name.unwrap_or("Bitcoin User"))) + Ok(BtcWallet(addr).as_full_uri(name.unwrap_or("Bitcoin User"))) } pub fn sql_generic_payto<I: sqlx::ColumnIndex<PgRow>>( @@ -66,5 +63,5 @@ pub fn sql_generic_payto<I: sqlx::ColumnIndex<PgRow>>( .try_get_parse::<_, _, bitcoin::Address<NetworkUnchecked>>(idx)? .assume_checked(); - Ok(BtcWallet(addr).as_payto().as_full_payto("Bitcoin User")) + Ok(BtcWallet(addr).as_full_uri("Bitcoin User")) } diff --git a/depolymerizer-bitcoin/tests/api.rs b/depolymerizer-bitcoin/tests/api.rs @@ -1,105 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2025 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ - -use std::str::FromStr; - -use axum::Router; -use depolymerizer_bitcoin::{CONFIG_SOURCE, api::ServerState}; -use sqlx::PgPool; -use taler_api::{api::TalerRouter as _, auth::AuthMethod}; -use taler_common::{ - api_common::{HashCode, ShortHashCode}, - api_wire::{OutgoingHistory, TransferState, WireConfig}, - types::{amount::Currency, payto::payto}, -}; -use taler_test_utils::{ - db_test_setup, json, - routine::{admin_add_incoming_routine, routine_pagination, transfer_routine}, - server::TestServer, -}; - -async fn setup() -> (Router, PgPool) { - let pool = db_test_setup(CONFIG_SOURCE).await; - let api = ServerState::start( - pool.clone(), - payto("payto://bitcoin/1FfmbHfnpaZjKFvyi1okTjJJusN455paPH"), - Currency::from_str("BTC").unwrap(), - ) - .await; - let server = Router::new() - .wire_gateway(api.clone(), AuthMethod::None) - .finalize(); - - (server, pool) -} - -#[tokio::test] -async fn config() { - let (server, _) = setup().await; - server - .get("/taler-wire-gateway/config") - .await - .assert_ok_json::<WireConfig>(); -} - -#[tokio::test] -async fn transfer() { - let (server, _) = setup().await; - transfer_routine( - &server, - TransferState::pending, - &payto("payto://bitcoin/1FfmbHfnpaZjKFvyi1okTjJJusN455paPH?receiver-name=Anonymous"), - ) - .await; -} - -#[tokio::test] -async fn outgoing_history() { - let (server, _) = setup().await; - routine_pagination::<OutgoingHistory, _>( - &server, - "/taler-wire-gateway/history/outgoing", - |it| { - it.outgoing_transactions - .into_iter() - .map(|it| *it.row_id as i64) - .collect() - }, - |s, _| async { - s.post("/taler-wire-gateway/transfer").json( - &json!({ - "request_uid": HashCode::rand(), - "amount": "BTC:10", - "exchange_base_url": "http://exchange.taler/", - "wtid": ShortHashCode::rand(), - "credit_account": "payto://bitcoin/1FfmbHfnpaZjKFvyi1okTjJJusN455paPH?receiver-name=Anonymous", - }) - ).await; - }, - ) - .await; -} - -#[tokio::test] -async fn admin_add_incoming() { - let (server, _) = setup().await; - admin_add_incoming_routine( - &server, - &payto("payto://bitcoin/1FfmbHfnpaZjKFvyi1okTjJJusN455paPH?receiver-name=Anonymous"), - false, - ) - .await; -} diff --git a/depolymerizer-common/Cargo.toml b/depolymerizer-common/Cargo.toml @@ -16,4 +16,5 @@ thiserror.workspace = true # Optimized uri binary format uri-pack = { path = "../uri-pack" } taler-common.workspace = true -rand.workspace = true -\ No newline at end of file +rand.workspace = true +compact_str.workspace = true +\ No newline at end of file diff --git a/depolymerizer-common/src/lib.rs b/depolymerizer-common/src/lib.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2022-2025 Taler Systems SA + Copyright (C) 2022-2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -13,7 +13,7 @@ You should have received a copy of the GNU Affero General Public License along with TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> */ -use rand::{RngCore as _, rngs::ThreadRng}; +use rand::{TryRng as _, rngs::SysRng}; pub mod metadata; pub mod status; @@ -21,6 +21,6 @@ pub mod status; /// Secure random slice generator using getrandom pub fn rand_slice<const N: usize>() -> [u8; N] { let mut slice = [0; N]; - ThreadRng::default().fill_bytes(&mut slice); + SysRng.try_fill_bytes(&mut slice).unwrap(); slice } diff --git a/depolymerizer-common/src/metadata.rs b/depolymerizer-common/src/metadata.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2022-2025 Taler Systems SA + Copyright (C) 2022-2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -16,7 +16,8 @@ use std::fmt::Debug; -use taler_common::api_common::{EddsaPublicKey, ShortHashCode}; +use compact_str::CompactString; +use taler_common::api::{EddsaPublicKey, ShortHashCode}; use url::Url; #[derive(Debug, Clone, thiserror::Error)] @@ -25,6 +26,8 @@ pub enum DecodeErr { UnknownFirstByte(u8), #[error(transparent)] UriPack(#[from] uri_pack::DecodeErr), + #[error(transparent)] + Metadata(#[from] std::str::Utf8Error), #[error("Unexpected end of file")] UnexpectedEOF, } @@ -38,25 +41,43 @@ pub enum EncodeErr { /// Encoded metadata for outgoing transaction #[derive(Debug, Clone, PartialEq, Eq)] pub enum OutMetadata { - Debit { wtid: ShortHashCode, url: Url }, - Bounce { bounced: [u8; 32] }, + Debit { + wtid: ShortHashCode, + url: Url, + metadata: Option<CompactString>, + }, + Bounce { + bounced: [u8; 32], + }, } // We leave a potential special meaning for u8::MAX -const BOUNCE_BYTE: u8 = u8::MAX - 1; +const BOUNCE_BYTE: u8 = 0b11111110; +const METADATA_FLAG: u8 = 0b10000000; impl OutMetadata { pub fn encode(&self) -> Result<Vec<u8>, EncodeErr> { let mut buffer = Vec::new(); match self { - OutMetadata::Debit { wtid, url } => { - let scheme_id = match url.scheme() { + OutMetadata::Debit { + wtid, + url, + metadata, + } => { + let mut scheme_id = match url.scheme() { "https" => 0, "http" => 1, scheme => return Err(EncodeErr::UnsupportedScheme(scheme.to_string())), }; + if metadata.is_some() { + scheme_id |= METADATA_FLAG; + } buffer.push(scheme_id); buffer.extend_from_slice(wtid.as_slice()); + if let Some(metadata) = metadata { + buffer.push(metadata.len() as u8); + buffer.extend_from_slice(metadata.as_bytes()); + } let parts = format!("{}{}", url.domain().unwrap_or(""), url.path()); let packed = uri_pack::pack_uri(&parts).unwrap(); buffer.extend_from_slice(&packed); @@ -73,32 +94,38 @@ impl OutMetadata { if bytes.is_empty() { return Err(DecodeErr::UnexpectedEOF); } - match bytes[0] { - 0..=1 => { - if bytes.len() < 33 { - return Err(DecodeErr::UnexpectedEOF); - } - let scheme = match bytes[0] { - 0 => "https", - 1 => "http", - _ => unreachable!(), - }; - let packed = format!("{}://{}", scheme, uri_pack::unpack_uri(&bytes[33..])?,); - let url = Url::parse(&packed).unwrap(); - Ok(OutMetadata::Debit { - wtid: bytes[1..33].try_into().unwrap(), - url, - }) + let id = bytes[0]; + if id == BOUNCE_BYTE { + if bytes.len() < 33 { + return Err(DecodeErr::UnexpectedEOF); } - BOUNCE_BYTE => { - if bytes.len() < 33 { - return Err(DecodeErr::UnexpectedEOF); - } - Ok(OutMetadata::Bounce { - bounced: bytes[1..33].try_into().unwrap(), - }) + Ok(OutMetadata::Bounce { + bounced: bytes[1..33].try_into().unwrap(), + }) + } else { + let scheme_id = id & !METADATA_FLAG; + if bytes.len() < 33 { + return Err(DecodeErr::UnexpectedEOF); } - unknown => Err(DecodeErr::UnknownFirstByte(unknown)), + let scheme = match scheme_id { + 0 => "https", + 1 => "http", + _ => return Err(DecodeErr::UnknownFirstByte(id)), + }; + let (metadata, uri) = if (id & METADATA_FLAG) != 0 { + let len = bytes[33] as usize; + let metadata = CompactString::from_utf8(&bytes[33 + 1..][..len])?; + (Some(metadata), (33 + len + 1)..) + } else { + (None, 33..) + }; + let packed = format!("{}://{}", scheme, uri_pack::unpack_uri(&bytes[uri])?,); + let url = Url::parse(&packed).unwrap(); + Ok(OutMetadata::Debit { + wtid: bytes[1..33].try_into().unwrap(), + url, + metadata, + }) } } } @@ -142,7 +169,7 @@ impl InMetadata { #[cfg(test)] mod test { - use taler_common::api_common::{EddsaPublicKey, ShortHashCode}; + use taler_common::api::{EddsaPublicKey, ShortHashCode}; use url::Url; use crate::{ @@ -170,13 +197,20 @@ mod test { "http://git.taler.net/", "http://git.taler.net/depolymerization.git/", ]; + let metadatas = [None, Some("REFERENCE"), Some("internal-id")]; for url in urls { - let wtid = ShortHashCode::rand(); - let url = Url::parse(url).unwrap(); - let metadata = OutMetadata::Debit { wtid, url }; - let encoded = metadata.encode().unwrap(); - let decoded = OutMetadata::decode(&encoded).unwrap(); - assert_eq!(decoded, metadata); + for metadata in metadatas { + let wtid = ShortHashCode::rand(); + let url = Url::parse(url).unwrap(); + let metadata = OutMetadata::Debit { + wtid, + url, + metadata: metadata.map(|it| it.into()), + }; + let encoded = metadata.encode().unwrap(); + let decoded = OutMetadata::decode(&encoded).unwrap(); + assert_eq!(decoded, metadata); + } } } @@ -187,6 +221,7 @@ mod test { let metadata = OutMetadata::Debit { wtid: ShortHashCode::rand(), url, + metadata: None, }; let encoded = metadata.encode(); assert!(encoded.is_err()) diff --git a/depolymerizer-common/src/status.rs b/depolymerizer-common/src/status.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2022 Taler Systems SA + Copyright (C) 2022, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -18,7 +18,8 @@ /// Debit transaction status /// -> Requested API request /// Requested -> Sent Announced to the bitcoin network -/// Sent -> Requested Conflicting transaction (reorg) +/// Sent -> Confirmed Mined in an N old block +/// Confirmed -> Requested Conflicting transaction (reorg) #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)] #[allow(non_camel_case_types)] #[sqlx(type_name = "debit_status")] @@ -27,13 +28,16 @@ pub enum DebitStatus { requested, /// Debit have been announced to the bitcoin network sent, + /// Debit have been mined in an N old block + confirmed, } /// Bounce transaction status /// -> Requested Credit in wrong format /// Requested -> Ignored Insufficient found /// Requested -> Sent Announced to the bitcoin network -/// Sent -> Requested Conflicting transaction (reorg) +/// Sent -> Confirmed Mined in an N old block +/// Confirmed -> Requested Conflicting transaction (reorg) #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)] #[allow(non_camel_case_types)] #[sqlx(type_name = "bounce_status")] @@ -44,4 +48,6 @@ pub enum BounceStatus { ignored, /// Bounce have been announced to the bitcoin network sent, + /// Bounce have been mined in an N old block + confirmed, } diff --git a/makefile b/makefile @@ -22,20 +22,15 @@ install-nobuild-files: install -m 644 -D -t $(share_dir)/depolymerizer-bitcoin/sql depolymerizer-bitcoin/db/depolymerizer-bitcoin*.sql install -m 644 -D -t $(man_dir)/man1 doc/prebuilt/man/depolymerizer-bitcoin.1 install -m 644 -D -t $(man_dir)/man5 doc/prebuilt/man/depolymerizer-bitcoin.conf.5 - -.PHONY: install-nobuild-binaries -install-nobuild-binaries: install -D -t $(bin_dir) contrib/depolymerizer-bitcoin-dbconfig - install -D -t $(bin_dir) target/release/depolymerizer-bitcoin - -.PHONY: install-nobuild -install-nobuild: install-nobuild-files install-nobuild-binaries .PHONY: install -install: build install-nobuild +install: build install-nobuild-files + install -D -t $(bin_dir) target/release/depolymerizer-bitcoin .PHONY: check check: install-nobuild-files + cargo clippy --all-targets cargo test .PHONY: test @@ -52,4 +47,13 @@ deb: .PHONY: ci ci: - contrib/ci/run-all-jobs.sh -\ No newline at end of file + contrib/ci/run-all-jobs.sh + +.PHONY: fmt +fmt: + rustfmt-unstable --apply + + +.PHONY: coverage-cyclos +coverage: + cargo llvm-cov test +\ No newline at end of file diff --git a/rustfmt.toml b/rustfmt.toml @@ -0,0 +1,2 @@ +imports_granularity = "Crate" +group_imports = "StdExternalCrate" diff --git a/script/prepare.sh b/script/prepare.sh @@ -16,7 +16,7 @@ function cleanup() { trap cleanup EXIT -BTC_VER="29.0" +BTC_VER="31.0" CALLER_DIR="$PWD" cd $DIR diff --git a/testbench/Cargo.toml b/testbench/Cargo.toml @@ -10,7 +10,6 @@ license-file.workspace = true [dependencies] # Cli args parser clap.workspace = true -depolymerizer-common.workspace = true # Bitcoin depolymerizer-bitcoin = { path = "../depolymerizer-bitcoin" } bitcoin.workspace = true @@ -22,7 +21,7 @@ owo-colors = "4.0.0" rust-ini = "0.21.0" # Progress reporting indicatif = "0.18.0" -reedline = "0.41.0" +reedline = "0.48.0" taler-common.workspace = true tokio.workspace = true anyhow.workspace = true diff --git a/testbench/src/btc.rs b/testbench/src/btc.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2022-2025 Taler Systems SA + Copyright (C) 2022-2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -26,16 +26,19 @@ use depolymerizer_bitcoin::{ CONFIG_SOURCE, config::{RpcAuth, RpcCfg, ServeCfg, WorkerCfg}, payto::BtcWallet, - rpc::{Category, Rpc}, + rpc::{Category, Error, ErrorCode, Rpc}, rpc_utils::segwit_min_amount, taler_utils::btc_to_taler, }; use ini::Ini; use taler_common::{ - api_common::{EddsaPublicKey, ShortHashCode}, - types::base32::Base32, + api::{EddsaPublicKey, ShortHashCode}, + config::Config, + types::{ + base32::Base32, + payto::{Payto, PaytoImpl}, + }, }; -use taler_common::{config::Config, types::payto::Payto}; use crate::{ retry, retry_opt, @@ -104,7 +107,7 @@ impl BtcCtx { }, ); // Load config - let cfg = Config::from_file(CONFIG_SOURCE, Some(ctx.dir.join("config.conf"))).unwrap(); + let cfg = Config::load(CONFIG_SOURCE, Some(ctx.dir.join("config.conf"))).unwrap(); let rpc_cfg = RpcCfg::parse(&cfg).unwrap(); // Start bitcoin nodes let _node = cmd_redirect( @@ -113,13 +116,11 @@ impl BtcCtx { ctx.log("bitcoind"), ); // Connect - retry_opt! { - async { - let mut client = Rpc::common(&rpc_cfg).await?; - client.get_blockchain_info().await?; - Ok::<_, anyhow::Error>(()) - }.await - }; + retry_opt!(async { + let mut client = Rpc::common(&rpc_cfg).await?; + client.get_blockchain_info().await?; + Ok::<_, anyhow::Error>(()) + }); } fn patch_btc_config(from: impl AsRef<Path>, to: impl AsRef<Path>, port: u16, rpc_port: u16) { @@ -163,7 +164,7 @@ impl BtcCtx { }); // Load config - let config = Config::from_file(CONFIG_SOURCE, Some(&ctx.conf)).unwrap(); + let config = Config::load(CONFIG_SOURCE, Some(&ctx.conf)).unwrap(); let cfg = WorkerCfg::parse(&config).unwrap(); // Start bitcoin nodes let btc_node = cmd_redirect( @@ -178,24 +179,33 @@ impl BtcCtx { ); // Setup wallets - let mut common_rpc = retry_opt! { Rpc::common(&cfg.rpc_cfg).await }; - retry_opt! { common_rpc.get_blockchain_info().await }; + let mut common_rpc = retry_opt!(Rpc::common(&cfg.rpc_cfg)); + retry_opt!(common_rpc.get_blockchain_info()); let node2_addr = format!("127.0.0.1:{btc2_port}"); common_rpc.add_node(&node2_addr).await.unwrap(); for name in ["wire", "client", "reserve"] { - common_rpc.create_wallet(name, "").await.unwrap(); + if let Err(e) = common_rpc.load_wallet(name).await { + if let Error::RPC { + code: ErrorCode::RpcWalletNotFound, + .. + } = e + { + common_rpc.create_wallet(name, "").await.unwrap(); + } else { + break; + } + } } - let common_rpc2 = retry_opt! { - Rpc::common(&RpcCfg { - addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), btc2_rpc_port), - auth: RpcAuth::Cookie(ctx - .wire2_dir - .join("regtest/.cookie") - .to_string_lossy() - .to_string()), - }).await - }; + let common_rpc2 = retry_opt!(Rpc::common(&RpcCfg { + addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), btc2_rpc_port), + auth: RpcAuth::Cookie( + ctx.wire2_dir + .join("regtest/.cookie") + .to_string_lossy() + .to_string(), + ), + })); // Generate money let mut reserve_rpc = Rpc::wallet(&cfg.rpc_cfg, "reserve").await.unwrap(); @@ -217,7 +227,7 @@ impl BtcCtx { .set("WALLET", wire_addr.to_string()); }); - let config = Config::from_file(CONFIG_SOURCE, Some(&ctx.conf)).unwrap(); + let config = Config::load(CONFIG_SOURCE, Some(&ctx.conf)).unwrap(); let serve_cfg = ServeCfg::parse(&config).unwrap(); // Setup & run @@ -235,7 +245,7 @@ impl BtcCtx { wire_addr, client_addr, reserve_addr, - conf: cfg.confirmation as u16, + conf: cfg.conf as u16, worker_cfg: cfg, serve_cfg, _btc_node2, @@ -262,15 +272,16 @@ impl BtcCtx { .unwrap(); } - pub async fn cluster_fork(&mut self) { + pub async fn cluster_fork(&mut self) -> u16 { let node1_height = self.common_rpc.get_blockchain_info().await.unwrap().blocks; let node2_height = self.common_rpc2.get_blockchain_info().await.unwrap().blocks; let diff = node1_height - node2_height; self.common_rpc2 - .mine((diff + 1) as u16, &self.reserve_addr) + .mine(diff as u16 + 1, &self.reserve_addr) .await .unwrap(); self.common_rpc.add_node(&self.node2_addr).await.unwrap(); + diff as u16 + 2 } pub async fn restart_node(&mut self, additional_args: &[&str]) { @@ -283,7 +294,7 @@ impl BtcCtx { let mut args = vec![datadir.as_str()]; args.extend_from_slice(additional_args); self.btc_node = cmd_redirect("bitcoind", &args, self.ctx.log("bitcoind")); - self.common_rpc = retry_opt! { Rpc::common(&self.worker_cfg.rpc_cfg).await }; + self.common_rpc = retry_opt!(Rpc::common(&self.worker_cfg.rpc_cfg)); self.common_rpc.add_node(&self.node2_addr).await.unwrap(); for name in ["client", "reserve", "wire"] { self.common_rpc.load_wallet(name).await.ok(); @@ -296,24 +307,34 @@ impl BtcCtx { .await .unwrap(); self.wire_rpc = Rpc::wallet(&self.worker_cfg.rpc_cfg, "wire").await.unwrap(); + tokio::time::sleep(Duration::from_millis(100)).await; } /* ----- Transaction ------ */ pub async fn credit(&mut self, amount: Amount, metadata: &EddsaPublicKey) { - self.client_rpc + while let Err(e) = self + .client_rpc .send_segwit_key(&self.wire_addr, &amount, metadata) .await - .unwrap(); + { + match e { + Error::RPC { + code: ErrorCode::RpcWalletError, + .. + } => { + self.mine(1).await; + } + _ => panic!("{e:?}"), + } + } } pub async fn debit(&mut self, amount: Amount, metadata: &ShortHashCode) { transfer( &self.ctx.gateway_url, metadata, - Payto::new(BtcWallet(self.client_addr.clone())) - .as_payto() - .as_full_payto("name"), + Payto::new(BtcWallet(self.client_addr.clone())).as_full_uri("name"), &btc_to_taler(&amount.to_signed().unwrap(), &self.worker_cfg.currency), ) .await @@ -366,6 +387,17 @@ impl BtcCtx { self.mine(1).await } + pub async fn mine_pending(&mut self) { + while self.common_rpc.get_mempool_info().await.unwrap().size > 0 { + self.mine(1).await + } + } + + pub async fn mine_conf(&mut self) { + self.mine_pending().await; + self.next_conf().await; + } + /* ----- Balances ----- */ pub async fn client_balance(&mut self) -> Amount { @@ -376,29 +408,39 @@ impl BtcCtx { self.wire_rpc.get_balance().await.unwrap() } - pub async fn expect_client_balance(&mut self, balance: Amount, mine: bool) { - retry! {{ + pub async fn expect_c_balance(&mut self, balance: Amount, mine: bool) { + retry!(async { let check = balance == self.client_balance().await; if !check && mine { - self.next_block().await; + self.mine_conf().await; } check - }} + }); } - pub async fn expect_wire_balance(&mut self, balance: Amount, mine: bool) { - retry! {{ + pub async fn expect_w_balance(&mut self, balance: Amount, mine: bool) { + retry!(async { let check = balance == self.wire_balance().await; if !check && mine { - self.next_block().await; + self.mine_conf().await; } check - }} + }); + } + + pub async fn expect_w_balance_less(&mut self, balance: Amount, mine: bool) { + retry!(async { + let check = self.wire_balance().await < balance; + if !check && mine { + self.mine_conf().await; + } + check + }); } /* ----- Wire Gateway ----- */ - pub async fn expect_credits(&self, txs: &[(EddsaPublicKey, Amount)]) { + pub async fn expect_credits(&mut self, txs: &[(EddsaPublicKey, Amount)], mine: bool) { let txs: Vec<_> = txs .iter() .map(|(metadata, amount)| { @@ -408,10 +450,16 @@ impl BtcCtx { ) }) .collect(); - self.ctx.expect_credits(&txs).await + retry!(async { + let check = self.ctx.expect_credits(&txs).await; + if !check && mine { + self.mine_conf().await; + } + check + }); } - pub async fn expect_debits(&self, txs: &[(ShortHashCode, Amount)]) { + pub async fn expect_debits(&mut self, txs: &[(ShortHashCode, Amount)], mine: bool) { let txs: Vec<_> = txs .iter() .map(|(metadata, amount)| { @@ -421,7 +469,13 @@ impl BtcCtx { ) }) .collect(); - self.ctx.expect_debits(&txs).await + retry!(async { + let check = self.ctx.expect_debits(&txs).await; + if !check && mine { + self.mine_conf().await; + } + check + }); } } @@ -436,16 +490,14 @@ pub async fn wire(ctx: TestCtx) { let mut balance = ctx.wire_balance().await; let mut txs = Vec::new(); for n in 10..100 { - let metadata = Base32::rand(); + let metadata = EddsaPublicKey::rand(); let amount = Amount::from_sat(n * 1000); ctx.credit(amount, &metadata).await; txs.push((metadata, amount)); balance += amount; - ctx.next_block().await; } - ctx.next_conf().await; - ctx.expect_credits(&txs).await; - ctx.expect_wire_balance(balance, false).await; + ctx.expect_credits(&txs, true).await; + ctx.expect_w_balance(balance, false).await; }; ctx.step("Debit"); @@ -459,22 +511,19 @@ pub async fn wire(ctx: TestCtx) { ctx.debit(amount, &metadata).await; txs.push((metadata, amount)); } - ctx.next_block().await; - ctx.expect_debits(&txs).await; - ctx.expect_client_balance(balance, true).await; + ctx.expect_debits(&txs, true).await; + ctx.expect_c_balance(balance, false).await; } ctx.step("Bounce"); { ctx.reset_wallet().await; - // Send bad transactions let mut balance = ctx.wire_balance().await; for n in 10..40 { ctx.malformed_credit(&Amount::from_sat(n * 1000)).await; balance += ctx.worker_cfg.bounce_fee; } - ctx.next_conf().await; - ctx.expect_wire_balance(balance, true).await; + ctx.expect_w_balance(balance, true).await; } } @@ -484,25 +533,29 @@ pub async fn lifetime(ctx: TestCtx) { let mut ctx = BtcCtx::setup(&ctx, "taler_btc_lifetime.conf", false).await; ctx.step("Check lifetime"); // Start up - retry! { ctx.wire_running() && ctx.gateway_running() }; + retry!(async { ctx.wire_running() && ctx.gateway_running() }); // Consume wire lifetime - for _ in 0..=ctx.worker_cfg.lifetime.unwrap() + 2 { - ctx.credit(segwit_min_amount(), &Base32::rand()).await; - ctx.next_block().await; - tokio::time::sleep(Duration::from_millis(100)).await; + for _ in 0..ctx.worker_cfg.lifetime.unwrap() { + ctx.mine(1).await; } - retry! { !ctx.wire_running() }; + retry!(async { + let check = !ctx.wire_running() && ctx.gateway_running(); + if !check { + ctx.mine(1).await; + } + check + }); // Consume gateway lifetime for _ in 0..=ctx.serve_cfg.lifetime.unwrap() { ctx.debit(segwit_min_amount(), &Base32::rand()).await; - ctx.next_block().await; } // End down - retry! { !ctx.wire_running() && !ctx.gateway_running() }; + retry!(async { !ctx.wire_running() && !ctx.gateway_running() }); } /// Check the capacity of wire-gateway and btc-wire to recover from database and node loss pub async fn reconnect(ctx: TestCtx) { + // TODO check recover metadata ctx.step("Setup"); let mut ctx = BtcCtx::setup(&ctx, "taler_btc.conf", false).await; @@ -511,20 +564,19 @@ pub async fn reconnect(ctx: TestCtx) { ctx.step("With DB"); { - let metadata = Base32::rand(); + let metadata = EddsaPublicKey::rand(); let amount = Amount::from_sat(42000); ctx.credit(amount, &metadata).await; credits.push((metadata, amount)); - ctx.next_block().await; - ctx.next_conf().await; - ctx.expect_credits(&credits).await; + ctx.mine_conf().await; + ctx.expect_credits(&credits, false).await; }; ctx.step("Without DB"); { ctx.stop_db(); ctx.malformed_credit(&Amount::from_sat(24000)).await; - let metadata = Base32::rand(); + let metadata = EddsaPublicKey::rand(); let amount = Amount::from_sat(40000); ctx.credit(amount, &metadata).await; credits.push((metadata, amount)); @@ -540,19 +592,17 @@ pub async fn reconnect(ctx: TestCtx) { let amount = Amount::from_sat(2000); ctx.debit(amount, &metadata).await; debits.push((metadata, amount)); - ctx.next_conf().await; - ctx.expect_debits(&debits).await; - ctx.expect_credits(&credits).await; + ctx.expect_debits(&debits, true).await; + ctx.expect_credits(&credits, false).await; } ctx.step("Recover DB"); { let balance = ctx.wire_balance().await; ctx.reset_db(); - ctx.next_block().await; - ctx.expect_debits(&debits).await; - ctx.expect_credits(&credits).await; - ctx.expect_wire_balance(balance, true).await; + ctx.expect_debits(&debits, false).await; + ctx.expect_credits(&credits, false).await; + ctx.expect_w_balance(balance, false).await; } } @@ -568,16 +618,14 @@ pub async fn stress(ctx: TestCtx) { { let mut balance = ctx.wire_balance().await; for n in 10..30 { - let metadata = Base32::rand(); - let amount = Amount::from_sat(n * 1000); + let metadata = EddsaPublicKey::rand(); + let amount = Amount::from_sat(n * 10000); ctx.credit(amount, &metadata).await; credits.push((metadata, amount)); balance += amount; - ctx.next_block().await; } - ctx.next_conf().await; - ctx.expect_credits(&credits).await; - ctx.expect_wire_balance(balance, true).await; + ctx.expect_credits(&credits, true).await; + ctx.expect_w_balance(balance, false).await; }; ctx.step("Debit"); @@ -590,9 +638,8 @@ pub async fn stress(ctx: TestCtx) { ctx.debit(amount, &metadata).await; debits.push((metadata, amount)); } - ctx.next_block().await; - ctx.expect_debits(&debits).await; - ctx.expect_client_balance(balance, true).await; + ctx.expect_debits(&debits, true).await; + ctx.expect_c_balance(balance, false).await; } ctx.step("Bounce"); @@ -603,18 +650,16 @@ pub async fn stress(ctx: TestCtx) { ctx.malformed_credit(&Amount::from_sat(n * 1000)).await; balance += ctx.worker_cfg.bounce_fee; } - ctx.next_conf().await; - ctx.expect_wire_balance(balance, true).await; + ctx.expect_w_balance(balance, true).await; } ctx.step("Recover DB"); { let balance = ctx.wire_balance().await; ctx.reset_db(); - ctx.next_block().await; - ctx.expect_debits(&debits).await; - ctx.expect_credits(&credits).await; - ctx.expect_wire_balance(balance, true).await; + ctx.expect_debits(&debits, false).await; + ctx.expect_credits(&credits, false).await; + ctx.expect_w_balance(balance, false).await; } } @@ -627,37 +672,37 @@ pub async fn conflict(tctx: TestCtx) { { // Perform credit let amount = Amount::from_sat(4200000); - ctx.credit(amount, &Base32::rand()).await; + ctx.credit(amount, &EddsaPublicKey::rand()).await; ctx.next_conf().await; - ctx.expect_wire_balance(amount, true).await; + ctx.expect_w_balance(amount, false).await; let client = ctx.client_balance().await; let wire = ctx.wire_balance().await; // Perform debit ctx.debit(Amount::from_sat(400000), &Base32::rand()).await; - retry! { ctx.wire_balance().await < wire }; + ctx.expect_w_balance_less(wire, false).await; // Abandon pending transaction ctx.restart_node(&["-minrelaytxfee=0.0001"]).await; ctx.abandon_wire().await; - ctx.expect_client_balance(client, false).await; - ctx.expect_wire_balance(wire, false).await; + ctx.expect_c_balance(client, false).await; + ctx.expect_w_balance(wire, false).await; // Generate conflict ctx.debit(Amount::from_sat(500000), &Base32::rand()).await; - retry! { ctx.wire_balance().await < wire }; + ctx.expect_w_balance_less(wire, false).await; // Resend conflicting transaction - ctx.restart_node(&[]).await; - ctx.next_block().await; let wire = ctx.wire_balance().await; - retry! { ctx.wire_balance().await < wire }; + ctx.restart_node(&[]).await; + ctx.expect_w_balance_less(wire, true).await; } ctx.step("Setup"); drop(ctx); let mut ctx = BtcCtx::setup(&tctx, "taler_btc.conf", false).await; - ctx.credit(Amount::from_sat(3000000), &Base32::rand()).await; + ctx.credit(Amount::from_sat(3000000), &EddsaPublicKey::rand()) + .await; ctx.next_block().await; ctx.step("Conflict bounce"); @@ -668,23 +713,21 @@ pub async fn conflict(tctx: TestCtx) { ctx.malformed_credit(&bounce_amount).await; ctx.next_conf().await; let fee = ctx.worker_cfg.bounce_fee; - ctx.expect_wire_balance(wire + fee, true).await; + ctx.expect_w_balance(wire + fee, true).await; // Abandon pending transaction ctx.restart_node(&["-minrelaytxfee=0.0001"]).await; ctx.abandon_wire().await; - ctx.expect_wire_balance(wire + bounce_amount, false).await; + ctx.expect_w_balance(wire + bounce_amount, false).await; // Generate conflict - let amount = Amount::from_sat(50000); - ctx.debit(amount, &Base32::rand()).await; - retry! { ctx.wire_balance().await < (wire + bounce_amount) }; + ctx.debit(Amount::from_sat(50000), &Base32::rand()).await; + ctx.expect_w_balance_less(wire + bounce_amount, false).await; // Resend conflicting transaction - ctx.restart_node(&[]).await; let wire = ctx.wire_balance().await; - ctx.next_block().await; - retry! { ctx.wire_balance().await < wire }; + ctx.restart_node(&[]).await; + ctx.expect_w_balance_less(wire, true).await; } } @@ -701,21 +744,21 @@ pub async fn reorg(ctx: TestCtx) { // Perform credits let before = ctx.wire_balance().await; for n in 10..21 { - ctx.credit(Amount::from_sat(n * 10000), &Base32::rand()) + ctx.credit(Amount::from_sat(n * 10000), &EddsaPublicKey::rand()) .await; - ctx.next_block().await; } + ctx.mine_conf().await; let after = ctx.wire_balance().await; // Perform fork and check btc-wire hard error ctx.expect_gateway_up().await; - ctx.cluster_fork().await; - ctx.expect_wire_balance(before, false).await; + let fork = ctx.cluster_fork().await; + ctx.expect_w_balance(before, false).await; ctx.expect_gateway_down().await; // Recover orphaned transaction - ctx.mine(12).await; - ctx.expect_wire_balance(after, false).await; + ctx.mine(fork).await; + ctx.expect_w_balance(after, false).await; ctx.expect_gateway_up().await; } @@ -732,18 +775,17 @@ pub async fn reorg(ctx: TestCtx) { ctx.debit(amount, &Base32::rand()).await; after += amount; } - ctx.next_block().await; - ctx.expect_client_balance(after, true).await; + ctx.expect_c_balance(after, true).await; // Perform fork and check btc-wire still up ctx.expect_gateway_up().await; ctx.cluster_fork().await; - ctx.expect_client_balance(before, false).await; + ctx.expect_c_balance(before, false).await; ctx.expect_gateway_up().await; // Recover orphaned transaction ctx.next_conf().await; - ctx.expect_client_balance(after, false).await; + ctx.expect_c_balance(after, false).await; } ctx.step("Handle reorg bounce"); @@ -760,18 +802,17 @@ pub async fn reorg(ctx: TestCtx) { ctx.malformed_credit(&Amount::from_sat(n * 1000)).await; after += ctx.worker_cfg.bounce_fee; } - ctx.next_conf().await; - ctx.expect_wire_balance(after, true).await; + ctx.expect_w_balance(after, true).await; // Perform fork and check btc-wire hard error ctx.expect_gateway_up().await; - ctx.cluster_fork().await; - ctx.expect_wire_balance(before, false).await; + let fork = ctx.cluster_fork().await; + ctx.expect_w_balance(before, false).await; ctx.expect_gateway_down().await; // Recover orphaned transaction - ctx.mine(12).await; - ctx.expect_wire_balance(after, false).await; + ctx.mine(fork).await; + ctx.expect_w_balance(after, false).await; ctx.expect_gateway_up().await; } } @@ -795,14 +836,14 @@ pub async fn hell(tctx: TestCtx) { $ctx.restart_node(&["-minrelaytxfee=0.001"]).await; $ctx.abandon_client().await; let amount = Amount::from_sat(54000); - $ctx.credit(amount, &Base32::rand()).await; - $ctx.expect_wire_balance(amount, true).await; + $ctx.credit(amount, &EddsaPublicKey::rand()).await; + $ctx.expect_w_balance(amount, true).await; // Check btc-wire suspend operation let bounce_amount = Amount::from_sat(34000); $ctx.malformed_credit(&bounce_amount).await; $ctx.next_conf().await; - $ctx.expect_wire_balance(amount + bounce_amount, true).await; + $ctx.expect_w_balance(amount + bounce_amount, true).await; $ctx.expect_gateway_down().await; }; } @@ -812,21 +853,21 @@ pub async fn hell(tctx: TestCtx) { ctx.step("Handle reorg conflicting incoming credit"); step!(ctx, { let amount = Amount::from_sat(420000); - ctx.credit(amount, &Base32::rand()).await; - ctx.next_conf().await; - ctx.expect_wire_balance(amount, true).await; + ctx.credit(amount, &EddsaPublicKey::rand()).await; + ctx.mine_conf().await; + ctx.expect_w_balance(amount, true).await; }); drop(ctx); tctx.step("Setup"); let mut ctx = BtcCtx::setup(&tctx, "taler_btc.conf", false).await; - ctx.step("Handle reorg conflicting incoming credit"); + ctx.step("Handle reorg conflicting bounce"); step!(ctx, { let amount = Amount::from_sat(420000); ctx.malformed_credit(&amount).await; - ctx.next_conf().await; + ctx.mine_conf().await; let fee = ctx.worker_cfg.bounce_fee; - ctx.expect_wire_balance(fee, true).await; + ctx.expect_w_balance(fee, true).await; }); } @@ -842,20 +883,21 @@ pub async fn analysis(ctx: TestCtx) { // Perform credit let before = ctx.wire_balance().await; - ctx.credit(Amount::from_sat(42000), &Base32::rand()).await; - ctx.next_conf().await; + ctx.credit(Amount::from_sat(42000), &EddsaPublicKey::rand()) + .await; + ctx.mine_conf().await; let after = ctx.wire_balance().await; // Perform fork and check btc-wire hard error ctx.expect_gateway_up().await; ctx.cluster_fork().await; - ctx.expect_wire_balance(before, false).await; + ctx.expect_w_balance(before, false).await; ctx.expect_gateway_down().await; // Recover orphaned transaction - ctx.next_conf().await; + ctx.mine_conf().await; ctx.next_block().await; // Conf have changed - ctx.expect_wire_balance(after, false).await; + ctx.expect_w_balance(after, false).await; ctx.expect_gateway_up().await; // Loose second bitcoin node @@ -863,13 +905,14 @@ pub async fn analysis(ctx: TestCtx) { // Perform credit let before = ctx.wire_balance().await; - ctx.credit(Amount::from_sat(42000), &Base32::rand()).await; - ctx.next_conf().await; + ctx.credit(Amount::from_sat(42000), &EddsaPublicKey::rand()) + .await; + ctx.mine_conf().await; // Perform fork and check btc-wire learned from previous attack ctx.expect_gateway_up().await; ctx.cluster_fork().await; - ctx.expect_wire_balance(before, false).await; + ctx.expect_w_balance(before, false).await; ctx.expect_gateway_up().await; } @@ -880,11 +923,10 @@ pub async fn bumpfee(tctx: TestCtx) { // Perform credits to allow wire to perform debits latter for n in 10..13 { - ctx.credit(Amount::from_sat(n * 100000), &Base32::rand()) + ctx.credit(Amount::from_sat(n * 100000), &EddsaPublicKey::rand()) .await; - ctx.next_block().await; } - ctx.next_conf().await; + ctx.mine_conf().await; ctx.step("Bump fee"); { @@ -893,14 +935,14 @@ pub async fn bumpfee(tctx: TestCtx) { let wire = ctx.wire_balance().await; let amount = Amount::from_sat(40000); ctx.debit(amount, &Base32::rand()).await; - retry! { ctx.wire_balance().await < wire }; + ctx.expect_w_balance_less(wire, false).await; // Bump min relay fee making the previous debit stuck ctx.restart_node(&["-minrelaytxfee=0.0001"]).await; // Check bump happen client += amount; - ctx.expect_client_balance(client, true).await; + ctx.expect_c_balance(client, true).await; } ctx.step("Bump fee reorg"); @@ -913,7 +955,7 @@ pub async fn bumpfee(tctx: TestCtx) { let wire = ctx.wire_balance().await; let amount = Amount::from_sat(40000); ctx.debit(amount, &Base32::rand()).await; - retry! { ctx.wire_balance().await < wire }; + ctx.expect_w_balance_less(wire, false).await; // Bump min relay fee and fork making the previous debit stuck and problematic ctx.cluster_fork().await; @@ -921,7 +963,7 @@ pub async fn bumpfee(tctx: TestCtx) { // Check bump happen client += amount; - ctx.expect_client_balance(client, true).await; + ctx.expect_c_balance(client, true).await; } ctx.step("Setup"); drop(ctx); @@ -929,11 +971,10 @@ pub async fn bumpfee(tctx: TestCtx) { // Perform credits to allow wire to perform debits latter for n in 10..61 { - ctx.credit(Amount::from_sat(n * 100000), &Base32::rand()) + ctx.credit(Amount::from_sat(n * 100000), &EddsaPublicKey::rand()) .await; - ctx.next_block().await; } - ctx.next_conf().await; + ctx.mine_conf().await; ctx.step("Bump fee stress"); { @@ -944,18 +985,18 @@ pub async fn bumpfee(tctx: TestCtx) { let client = ctx.client_balance().await; let wire = ctx.wire_balance().await; let mut total_amount = Amount::ZERO; - for n in 10..31 { + for n in 10..30 { let amount = Amount::from_sat(n * 10000); total_amount += amount; ctx.debit(amount, &Base32::rand()).await; } - retry! { ctx.wire_balance().await < wire - total_amount }; + ctx.expect_w_balance_less(wire - total_amount, false).await; // Bump min relay fee making the previous debits stuck ctx.restart_node(&["-minrelaytxfee=0.0001"]).await; // Check bump happen - ctx.expect_client_balance(client + total_amount, true).await; + ctx.expect_c_balance(client + total_amount, true).await; } } @@ -966,11 +1007,10 @@ pub async fn maxfee(ctx: TestCtx) { // Perform credits to allow wire to perform debits latter for n in 10..31 { - ctx.credit(Amount::from_sat(n * 100000), &Base32::rand()) + ctx.credit(Amount::from_sat(n * 100000), &EddsaPublicKey::rand()) .await; - ctx.next_block().await; } - ctx.next_conf().await; + ctx.mine_pending().await; let client = ctx.client_balance().await; let wire = ctx.wire_balance().await; @@ -988,11 +1028,11 @@ pub async fn maxfee(ctx: TestCtx) { total_amount += amount; ctx.debit(amount, &Base32::rand()).await; } - ctx.mine(2).await; + ctx.mine_pending().await; // Check no transaction happen - ctx.expect_wire_balance(wire, false).await; - ctx.expect_client_balance(client, false).await; + ctx.expect_w_balance(wire, false).await; + ctx.expect_c_balance(client, false).await; } ctx.step("Good feed"); @@ -1001,7 +1041,7 @@ pub async fn maxfee(ctx: TestCtx) { ctx.restart_node(&[]).await; // Check transaction now have been made - ctx.expect_client_balance(client + total_amount, true).await; + ctx.expect_c_balance(client + total_amount, true).await; } } diff --git a/testbench/src/main.rs b/testbench/src/main.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2022-2025 Taler Systems SA + Copyright (C) 2022-2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -40,8 +40,7 @@ use owo_colors::OwoColorize; use reedline::{Prompt, Reedline, Signal}; use sqlx::PgPool; use taler_common::{ - api_common::{EddsaPublicKey, HashCode, ShortHashCode}, - api_wire::TransferRequest, + api::{EddsaPublicKey, HashCode, ShortHashCode, wire::TransferRequest}, config::Config, db::pool, log::taler_logger, @@ -122,7 +121,7 @@ impl<'a> Tmp<'a> { #[tokio::main] pub async fn main() { - taler_logger(None).init(); + taler_logger(None, false).init(); match Testbench::parse() { Testbench::Instrumentation { filters } => instrumentation(filters).await, Testbench::Bitcoin { network } => bitcoin(network).await, @@ -154,11 +153,12 @@ impl BtcEnv { Network::Test => "btc-test", }; println!("Setup bitcoin {network:?} env in {network_dir}"); - let root_dir = PathBuf::from_str("testbench/env") + let root_dir = PathBuf::from_str("./testbench/env") .unwrap() - .join(network_dir); + .join(network_dir) + .canonicalize() + .unwrap(); std::fs::create_dir_all(root_dir.join("bitcoin")).unwrap(); - let root_dir = root_dir.canonicalize().unwrap(); // Generate bitcoind config let cfg = match network { @@ -215,14 +215,14 @@ impl BtcEnv { )], root_dir.join("bitcoind.log"), ); - let cfg = Config::from_file(CONFIG_SOURCE, Some(&taler_cfg_path)).unwrap(); + let cfg = Config::load(CONFIG_SOURCE, Some(&taler_cfg_path)).unwrap(); let worker_cfg = WorkerCfg::parse(&cfg).unwrap(); - let mut rpc = retry_opt! { rpc_common(&worker_cfg.rpc_cfg).await }; + let mut rpc = retry_opt!(rpc_common(&worker_cfg.rpc_cfg)); for wallet in ["wire", "client"] { loop { let res = rpc.load_wallet(wallet).await; - if let Err(Error::RPC { code, msg: _ }) = res { + if let Err(Error::RPC { code, .. }) = res { match code { ErrorCode::RpcInWarmup => continue, ErrorCode::RpcWalletNotFound => { @@ -263,7 +263,7 @@ impl BtcEnv { ini.with_section(Some("depolymerizer-bitcoin")) .set("WALLET", wire_addr.to_string()); }); - let cfg = Config::from_file(CONFIG_SOURCE, Some(taler_cfg_path)).unwrap(); + let cfg = Config::load(CONFIG_SOURCE, Some(taler_cfg_path)).unwrap(); // Wait for db to start db.wait_running(); @@ -337,15 +337,16 @@ async fn run_cmd(env: &mut BtcEnv, buffer: &str) -> anyhow::Result<bool> { info!(target: "testbench", "Credit {reserve_pub} {amount} {txid} to {}", env.wire_addr); } Shell::Debit => { - let creditor = FullPayto::new(BtcWallet(env.client_addr.clone()), "client".to_string()); + let creditor = FullPayto::new(BtcWallet(env.client_addr.clone()), "client"); let wtid = ShortHashCode::rand(); let amount = amount("DEVBTC:0.0001"); let transfer = TransferRequest { request_uid: HashCode::rand(), - amount: amount.clone(), + amount, exchange_base_url: Url::parse("https://test.com/").unwrap(), wtid: wtid.clone(), - credit_account: creditor.as_payto(), + credit_account: creditor.as_uri(), + metadata: None, }; db::transfer(&env.pool, &creditor, &transfer).await?; info!(target: "testbench", "Debit {wtid} {amount} to {}", env.wire_addr); @@ -427,25 +428,26 @@ async fn bitcoin(network: Network) { progress: format!("{:.6}", info.verification_progress), }; let sig = line_editor.read_line(&prompt).unwrap(); - match sig { - Signal::Success(buffer) => match run_cmd(&mut env, &buffer).await { + while let Signal::Success(buffer) = &sig { + match run_cmd(&mut env, buffer).await { Ok(exit) => { if exit { break; } } Err(e) => error!(target: "testbench", "{e}"), - }, - Signal::CtrlC | Signal::CtrlD => break, + } } } } pub async fn instrumentation(filters: Vec<String>) { - let root = PathBuf::from_str("testbench/instrumentation").unwrap(); + let root = PathBuf::from_str("testbench/instrumentation") + .unwrap() + .canonicalize() + .unwrap(); std::fs::remove_dir_all(&root).ok(); std::fs::create_dir_all(root.join("bin")).unwrap(); - let root = root.canonicalize().unwrap(); // Set panic hook let failures = Arc::new(Mutex::new(Vec::new())); diff --git a/testbench/src/utils.rs b/testbench/src/utils.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2022-2025 Taler Systems SA + Copyright (C) 2022-2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -29,13 +29,15 @@ use std::{ use indicatif::ProgressBar; use ini::Ini; use taler_common::{ - api_common::{EddsaPublicKey, ShortHashCode}, - api_wire::{IncomingBankTransaction, IncomingHistory, OutgoingHistory, TransferRequest}, + api::{ + EddsaPublicKey, ShortHashCode, + wire::{IncomingBankTransaction, IncomingHistory, OutgoingHistory, TransferRequest}, + }, types::{amount::Amount, base32::Base32, payto::PaytoURI}, }; use url::Url; -const LOG: &str = "INFO"; +const LOG: &str = "DEBUG"; #[must_use] pub async fn check_incoming(base_url: &str, txs: &[(EddsaPublicKey, Amount)]) -> bool { @@ -86,10 +88,11 @@ pub async fn transfer(base_url: &str, wtid: &[u8; 32], credit_account: PaytoURI, loop { let res = ureq::post(&format!("{base_url}transfer")).send_json(TransferRequest { request_uid: Base32::rand(), - amount: amount.clone(), + amount: *amount, exchange_base_url: Url::parse("https://exchange.test/").unwrap(), wtid: Base32::from(*wtid), credit_account: credit_account.clone(), + metadata: None, }); if !matches!(res, Err(ureq::Error::StatusCode(502))) { res.unwrap(); @@ -173,9 +176,9 @@ macro_rules! retry_opt { async { let start = std::time::Instant::now(); loop { - let result = $expr; + let result = $expr.await; if result.is_err() && start.elapsed() < std::time::Duration::from_secs(30) { - tokio::time::sleep(std::time::Duration::from_millis(500)).await; + tokio::time::sleep(std::time::Duration::from_millis(300)).await; } else { return result.unwrap(); } @@ -189,10 +192,11 @@ macro_rules! retry_opt { macro_rules! retry { ($expr:expr) => { $crate::retry_opt! { - $expr.then_some(()).ok_or("failure") + async { $expr.await.then_some(()).ok_or("failure") } } }; } + #[derive(Clone)] pub struct TestCtx { pub name: String, @@ -322,11 +326,20 @@ impl TalerCtx { pub fn reset_db(&self) { // Reset db - cmd_redirect_ok( - &self.wire_bin_path, - &["-c", self.conf.to_string_lossy().as_ref(), "dbinit", "-r"], - self.log("cmd"), - "wire dbinit reset", + self.db.execute_sql( + " + FOR r IN ( + SELECT schemaname, tablename + FROM pg_tables + WHERE schemaname NOT IN ('pg_catalog', 'information_schema') + ) LOOP + EXECUTE format( + 'TRUNCATE TABLE %I.%I RESTART IDENTITY CASCADE', + r.schemaname, + r.tablename + ); + END LOOP; + ", ); } @@ -334,7 +347,7 @@ impl TalerCtx { // Init db cmd_redirect_ok( &self.wire_bin_path, - &["-c", self.conf.to_string_lossy().as_ref(), "setup"], + &["-c", self.conf.to_string_lossy().as_ref(), "setup", "-r"], self.log("cmd"), "wire setup", ); @@ -382,7 +395,7 @@ impl TalerCtx { // Wait for gateway to be up retry_opt! { - tokio::net::TcpStream::connect(SocketAddrV4::new(Ipv4Addr::LOCALHOST, self.gateway_port)).await + tokio::net::TcpStream::connect(SocketAddrV4::new(Ipv4Addr::LOCALHOST, self.gateway_port)) }; } @@ -406,20 +419,20 @@ impl TalerCtx { /* ----- Wire Gateway -----*/ - pub async fn expect_credits(&self, txs: &[(EddsaPublicKey, Amount)]) { - retry! { check_incoming(&self.gateway_url, txs).await } + pub async fn expect_credits(&self, txs: &[(EddsaPublicKey, Amount)]) -> bool { + check_incoming(&self.gateway_url, txs).await } - pub async fn expect_debits(&self, txs: &[(ShortHashCode, Amount)]) { - retry! { check_outgoing(&self.gateway_url, txs).await } + pub async fn expect_debits(&self, txs: &[(ShortHashCode, Amount)]) -> bool { + check_outgoing(&self.gateway_url, txs).await } pub async fn expect_gateway_up(&self) { - retry! { check_gateway_up(&self.gateway_url).await } + retry!(check_gateway_up(&self.gateway_url)) } pub async fn expect_gateway_down(&self) { - retry! { check_gateway_down(&self.gateway_url).await } + retry!(check_gateway_down(&self.gateway_url)) } } diff --git a/uri-pack/src/lib.rs b/uri-pack/src/lib.rs @@ -1,6 +1,6 @@ /* This file is part of TALER - Copyright (C) 2022-2025 Taler Systems SA + Copyright (C) 2022-2025, 2026 Taler Systems SA TALER is free software; you can redistribute it and/or modify it under the terms of the GNU Affero General Public License as published by the Free Software @@ -258,9 +258,9 @@ mod test { if href.chars().any(|c| !c.is_ascii_graphic() || c != ' ') { continue; // extended ascii } - let encoded = pack_uri(&href).expect(&format!("Failed to encode {}", &href)); - let decoded = - unpack_uri(&encoded).expect(&format!("Failed to decode encoded {}", &href)); + let encoded = pack_uri(href).unwrap_or_else(|_| panic!("Failed to encode {}", &href)); + let decoded = unpack_uri(&encoded) + .unwrap_or_else(|_| panic!("Failed to decode encoded {}", &href)); assert_eq!(href, decoded); } }