depolymerization

wire gateway for Bitcoin/Ethereum
Log | Files | Refs | Submodules | README | LICENSE

commit 050c5fac288d15a6bc8a123cf085fcc833d2e1e4
parent 74d55ab427d989e1adfff9ce5b6de4206574e549
Author: Antoine A <>
Date:   Tue,  8 Jul 2025 13:55:39 +0200

common: drop ethereum support for now

Diffstat:
MCargo.lock | 106-------------------------------------------------------------------------------
MCargo.toml | 4----
DREADME.md | 238-------------------------------------------------------------------------------
Mcommon/Cargo.toml | 1-
Ddatabase-versioning/depolymerizer-ethereum-0001.sql | 62--------------------------------------------------------------
Ddatabase-versioning/depolymerizer-ethereum-drop.sql | 30------------------------------
Ddatabase-versioning/depolymerizer-ethereum-procedures.sql | 40----------------------------------------
Ddepolymerizer-bitcoin/README.md | 69---------------------------------------------------------------------
Ddepolymerizer-ethereum/Cargo.toml | 38--------------------------------------
Ddepolymerizer-ethereum/README.md | 13-------------
Ddepolymerizer-ethereum/depolymerizer-ethereum.conf | 91-------------------------------------------------------------------------------
Ddepolymerizer-ethereum/src/api.rs | 410-------------------------------------------------------------------------------
Ddepolymerizer-ethereum/src/config.rs | 129-------------------------------------------------------------------------------
Ddepolymerizer-ethereum/src/fail_point.rs | 31-------------------------------
Ddepolymerizer-ethereum/src/lib.rs | 248-------------------------------------------------------------------------------
Ddepolymerizer-ethereum/src/loops.rs | 38--------------------------------------
Ddepolymerizer-ethereum/src/loops/analysis.rs | 33---------------------------------
Ddepolymerizer-ethereum/src/loops/watcher.rs | 43-------------------------------------------
Ddepolymerizer-ethereum/src/loops/worker.rs | 530-------------------------------------------------------------------------------
Ddepolymerizer-ethereum/src/main.rs | 214-------------------------------------------------------------------------------
Ddepolymerizer-ethereum/src/payto.rs | 75---------------------------------------------------------------------------
Ddepolymerizer-ethereum/src/rpc.rs | 551-------------------------------------------------------------------------------
Ddepolymerizer-ethereum/src/sql.rs | 40----------------------------------------
Ddepolymerizer-ethereum/src/taler_util.rs | 36------------------------------------
Ddepolymerizer-ethereum/tests/api.rs | 105-------------------------------------------------------------------------------
Minstrumentation/Cargo.toml | 4----
Dinstrumentation/README.md | 39---------------------------------------
Dinstrumentation/src/eth.rs | 1059-------------------------------------------------------------------------------
Minstrumentation/src/main.rs | 14++------------
Mmakefile | 6------
Mscript/prepare.sh | 13-------------
31 files changed, 2 insertions(+), 4308 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock @@ -413,7 +413,6 @@ version = "0.1.0" dependencies = [ "bitcoin", "const-hex", - "ethereum-types", "postgres", "rand 0.9.1", "sqlx", @@ -753,28 +752,6 @@ dependencies = [ ] [[package]] -name = "depolymerizer-ethereum" -version = "0.1.0" -dependencies = [ - "anyhow", - "axum", - "clap", - "common", - "const-hex", - "ethereum-types", - "rustc-hex", - "serde", - "serde_json", - "sqlx", - "taler-api", - "taler-common", - "taler-test-utils", - "thiserror", - "tokio", - "tracing", -] - -[[package]] name = "der" version = "0.7.10" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -915,31 +892,6 @@ dependencies = [ ] [[package]] -name = "ethbloom" -version = "0.14.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c321610643004cf908ec0f5f2aa0d8f1f8e14b540562a2887a1111ff1ecbf7b" -dependencies = [ - "crunchy", - "fixed-hash", - "impl-serde", - "tiny-keccak", -] - -[[package]] -name = "ethereum-types" -version = "0.15.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ab15ed80916029f878e0267c3a9f92b67df55e79af370bf66199059ae2b4ee3" -dependencies = [ - "ethbloom", - "fixed-hash", - "impl-serde", - "primitive-types", - "uint", -] - -[[package]] name = "event-listener" version = "5.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -969,17 +921,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28dea519a9695b9977216879a3ebfddf92f1c08c05d984f8996aecd6ecdc811d" [[package]] -name = "fixed-hash" -version = "0.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "835c052cb0c08c1acf6ffd71c022172e18723949c8282f2b9f27efbc51e64534" -dependencies = [ - "byteorder", - "rustc-hex", - "static_assertions", -] - -[[package]] name = "flate2" version = "1.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1435,15 +1376,6 @@ dependencies = [ ] [[package]] -name = "impl-serde" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4a143eada6a1ec4aefa5049037a26a6d597bfd64f8c026d07b77133e02b7dd0b" -dependencies = [ - "serde", -] - -[[package]] name = "indexmap" version = "2.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -1474,10 +1406,7 @@ dependencies = [ "bitcoin", "clap", "common", - "const-hex", "depolymerizer-bitcoin", - "depolymerizer-ethereum", - "ethereum-types", "fastrand", "indicatif", "owo-colors", @@ -2031,17 +1960,6 @@ dependencies = [ ] [[package]] -name = "primitive-types" -version = "0.13.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d15600a7d856470b7d278b3fe0e311fe28c2526348549f8ef2ff7db3299c87f5" -dependencies = [ - "fixed-hash", - "impl-serde", - "uint", -] - -[[package]] name = "proc-macro2" version = "1.0.95" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2280,12 +2198,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "989e6739f80c4ad5b13e0fd7fe89531180375b18520cc8c82080e4dc4035b84f" [[package]] -name = "rustc-hex" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e75f6a532d0fd9f7f13144f392b6ad56a32696bfcd9c78f797f16bbb6f072d6" - -[[package]] name = "rustc_version" version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -2799,12 +2711,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" [[package]] -name = "static_assertions" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" - -[[package]] name = "stringprep" version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" @@ -3217,18 +3123,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1dccffe3ce07af9386bfd29e80c0ab1a8205a2fc34e4bcd40364df902cfa8f3f" [[package]] -name = "uint" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "909988d098b2f738727b161a106cfc7cab00c539c2687a8836f8e565976fb53e" -dependencies = [ - "byteorder", - "crunchy", - "hex", - "static_assertions", -] - -[[package]] name = "unarray" version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" diff --git a/Cargo.toml b/Cargo.toml @@ -2,7 +2,6 @@ resolver = "3" members = [ "depolymerizer-bitcoin", - "depolymerizer-ethereum", "uri-pack", "common", "instrumentation", @@ -38,9 +37,6 @@ bitcoin = { version = "0.32.5", features = [ "std", "serde", ], default-features = false } -ethereum-types = { version = "0.15.1", default-features = false, features = [ - "serialize", -] } hex = { package = "const-hex", version = "1.9.1" } clap = { version = "4.5", features = ["derive"] } anyhow = "1" diff --git a/README.md b/README.md @@ -1,237 +0,0 @@ -# Depolymerization - -## Project structure - -- **wire-gateway**: - [Taler Wire Gateway HTTP API](https://docs.taler.net/core/api-wire.html) - server -- **btc-wire**: Taler wire implementation for - [bitcoincore](https://bitcoincore.org/en/about/) node -- **eth-wire**: Taler wire implementation for - [go-ethereum](https://geth.ethereum.org/) node -- **uri-pack**: Efficient probabilistic binary encoding for URI -- **db**: Database schemas -- **docs**: Documentation files -- **test**: Test scripts -- **instrumentation**: Instrumentation test tool - -## Install from source - -Cargo version 1.72.1 or above is required. You can get rustup from your -distribution package manager or from [rustup.rs](https://rustup.rs/). - -``` -git clone https://git.taler.net/depolymerization.git/ -cd depolymerization -make install -``` - -## Getting started - -### Dependencies - -Depolymerizer require: - -- taler-config from [Taler exchange](https://git.taler.net/exchange.git/) - ap -- PostgreSQL - -#### Bitcoin - -[Bitcoind](https://bitcoincore.org/) version 29.0 is expected - -#### Ethereum - -[Geth](https://geth.ethereum.org/) version 1.13.5 is expected - -### Initialization - -We expect you to already have written a [configuration](#configuration) and -that PostgreSQL and the node (bitcoind or geth) is configured and running. - -We will use `btc-wire` in command, but you can use `eth-wire` interchangeably to -depolymerize Ethereum. - -If you want to use a specific configuration file, you can add `-c CONF_PATH` to -every command. - -#### Database initialization - -``` -btc-wire initdb -``` - -#### Wallet initialization - -Depolymerization uses an encrypted wallet, so you need to provide a password in -the environment variable `PASSWORD`. - -``` -btc-wire initwallet -``` - -You then need to update your configuration with the provided `PAYTO` config -value. - -#### Start depolymerization - -``` -btc-wire -``` - -You also need to run the wire gateway `wire-gateway`. It can run from another -user as long it can access the database and the configuration file. - -## Configuration - -The configuration is based on -[taler.conf](https://docs.taler.net/manpages/taler.conf.5.html). - -You can find configurations example for each implementation: - -- btc-wire: [minimal](docs/taler-btc-min.conf) or - [full](docs/taler-btc-full.conf) -- eth-wire: [minimal](docs/taler-eth-min.conf) or - [full](docs/taler-eth-full.conf) - -### Initialization - -This is the required configuration for initialization: - -```ini -# taler.conf - (fill all ___) -[taler] -# Exchange currency -CURRENCY = ___ - -[exchange] -# Exchange base url -BASE_URL = ___ - -[depolymerizer-___] -# Postgres connection URL -DB_URL = ___ -``` - -`PAYTO` is to be added after wallet initialization. - -### Currency - -Unlike Bitcoin or Ethereum, we differentiate currencies between networks. The -following currencies are supported: - -- Bitcoin currencies (btc-wire): - - BITCOINBTC for main network - - TESTBTC for test network - - DEVBTC for regtest network -- Ethereum currencies (eth-wire): - - ETHEREUMETH for main network - - ROPSTENETH for ropsten network - - DEVETH for dev network - -### btc-wire - -btc-wire will automatically read the bitcoin configuration file (bitcoin.conf) -to connect to the RPC server. Two flags are mandatory: - -- `txindex=1`: btc-wire needs access to transactions not linked to the wire - wallet -- `maxtxfee=?`: bitcoin transactions fees can exceed taler wire fees, putting - your wire in bankruptcy. You must specify an acceptable transaction fee cap. - -It is also recommended to disable RPC client timeout with `rpcservertimeout=0` -or to set a timeout delay superior than the block delay (e.g -`rpcservertimeout=720`) to prevent recurrent "Broken pipe" errors. - -```ini -[depolymerizer-bitcoin] -# Datadir or configuration file path -CONF_PATH = ~/.bitcoin -# Number of blocks to consider a transactions durable -CONFIRMATION = 6 -# Amount to keep when bouncing malformed credit -BOUNCE_FEE = 0.00001 -``` - -### eth-wire - -```ini -[depolymerizer-ethereum] -# Datadir or ipc file path -IPC_PATH = ~/.ethereum/geth/geth.ipc -# Number of blocks to consider a transactions durable -CONFIRMATION = 37 -# Amount to keep when bouncing malformed credit -BOUNCE_FEE = 0.00001 -``` - -### Wire gateway - -```ini -[depolymerizer-___] -# Port on which the server listen -PORT = 8080 -# Path on which the server listen (replace port) -UNIXPATH = -# HTTP Authentication Scheme (basic or none) -AUTH_METHOD = -# Authentification token (base64 for basic auth) -AUTH_TOKEN = -``` - -### Process lifetime - -You may want to restart depolymerization processes regularly to improve -stability (ex. fix memory fragmentation). It is possible to configure a lifetime -that triggers a graceful shutdown every time a specific amount of work has been -done. - -```ini -[depolymerizer-___] -# Number of requests to serve before gateway shutdown (0 mean never) -HTTP_LIFETIME = 0 -# Number of worker's loops before wire implementation shutdown (0 mean never) -WIRE_LIFETIME = 0 -``` - -### Stuck transaction - -When we send a transaction with a fee too small, it may not be confirmed in a -timely fashion. To unstuck those transactions, it is possible to replace them -with other transactions with more fees. Depolymerizer can be configured to do -this automatically: - -```ini -[depolymerizer-___] -# Delay in seconds before bumping an unconfirmed transaction fee (0 mean never) -BUMP_DELAY = 0 -``` - -## Security - -Depolymerizer only use an encrypted wallet and provides an easy way to create -them. It is the administrator's responsibility to back up his wallet and -password. - -Only the wire adapter needs to have the password stored in its environment. - -## Log format - -Wire logs use an ASCII code for transaction logs: - -- `<<` for a credit -- `>>` for a debit -- `||` for a bounce - -You can have an additional context: - -- `bump ` a stuck transaction has been bumped -- `conflict ` a transaction has been canceled by a conflicting blockchain - transaction -- `recovered ` a successful transaction that we failed to register in the local - database has been found in the blockchain -- `onchain ` an unknown transaction has been found in the blockchain - -## Test - -Instrumentation test documentation can be founded in the [instrumentation](instrumentation/README.md) directory. -\ No newline at end of file diff --git a/common/Cargo.toml b/common/Cargo.toml @@ -22,6 +22,5 @@ taler-common.workspace = true taler-api.workspace = true sqlx.workspace = true bitcoin.workspace = true -ethereum-types.workspace = true hex.workspace = true tracing.workspace = true \ No newline at end of file diff --git a/database-versioning/depolymerizer-ethereum-0001.sql b/database-versioning/depolymerizer-ethereum-0001.sql @@ -1,61 +0,0 @@ --- --- This file is part of TALER --- Copyright (C) 2025 Taler Systems SA --- --- TALER is free software; you can redistribute it and/or modify it under the --- terms of the GNU General Public License as published by the Free Software --- Foundation; either version 3, or (at your option) any later version. --- --- TALER is distributed in the hope that it will be useful, but WITHOUT ANY --- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR --- A PARTICULAR PURPOSE. See the GNU General Public License for more details. --- --- You should have received a copy of the GNU General Public License along with --- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> - -SELECT _v.register_patch('depolymerizer-ethereum-0001', NULL, NULL); - -CREATE SCHEMA depolymerizer_ethereum; -SET search_path TO depolymerizer_ethereum; - -CREATE TYPE taler_amount AS (val INT8, frac INT4); -COMMENT ON TYPE taler_amount IS 'Stores an amount, fraction is in units of 1/100000000 of the base value'; - -CREATE TABLE state ( - name TEXT NOT NULL PRIMARY KEY, - value BYTEA NOT NULL -); -COMMENT ON TABLE state IS 'Key value state'; - -CREATE TABLE tx_in ( - id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, - received INT8 NOT NULL, - amount taler_amount NOT NULL, - reserve_pub BYTEA NOT NULL UNIQUE CHECK (LENGTH(reserve_pub)=32), - debit_acc BYTEA NOT NULL CHECK (LENGTH(debit_acc)=20) -); -COMMENT ON TABLE state IS 'Incoming transactions'; - -CREATE TABLE tx_out ( - id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, - created INT8 NOT NULL, - amount taler_amount NOT NULL, - wtid BYTEA NOT NULL UNIQUE CHECK (LENGTH(wtid)=32), - credit_acc BYTEA NOT NULL CHECK (LENGTH(credit_acc)=20), - credit_name TEXT, - exchange_url TEXT NOT NULL, - request_uid BYTEA UNIQUE CHECK (LENGTH(request_uid)=64), - status SMALLINT NOT NULL DEFAULT 0, - txid BYTEA UNIQUE CHECK (LENGTH(txid)=32), - sent INT8 -); -COMMENT ON TABLE state IS 'Outgoing transactions'; - -CREATE TABLE bounce ( - id INT8 PRIMARY KEY GENERATED ALWAYS AS IDENTITY, - bounced BYTEA UNIQUE NOT NULL CHECK (LENGTH(bounced)=32), - txid BYTEA UNIQUE CHECK (LENGTH(txid)=32), - created INT8 NOT NULL, - status SMALLINT NOT NULL DEFAULT 0 -); -COMMENT ON TABLE state IS 'Bounced incoming transactions'; -\ No newline at end of file diff --git a/database-versioning/depolymerizer-ethereum-drop.sql b/database-versioning/depolymerizer-ethereum-drop.sql @@ -1,29 +0,0 @@ --- --- This file is part of TALER --- Copyright (C) 2025 Taler Systems SA --- --- TALER is free software; you can redistribute it and/or modify it under the --- terms of the GNU General Public License as published by the Free Software --- Foundation; either version 3, or (at your option) any later version. --- --- TALER is distributed in the hope that it will be useful, but WITHOUT ANY --- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR --- A PARTICULAR PURPOSE. See the GNU General Public License for more details. --- --- You should have received a copy of the GNU General Public License along with --- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> - -DO -$do$ -DECLARE - patch text; -BEGIN - IF EXISTS(SELECT FROM information_schema.schemata WHERE schema_name='_v') THEN - FOR patch IN SELECT patch_name FROM _v.patches WHERE patch_name LIKE 'depolymerizer-ethereum-%' LOOP - PERFORM _v.unregister_patch(patch); - END LOOP; - END IF; -END -$do$; - -DROP SCHEMA IF EXISTS depolymerizer_ethereum CASCADE; -\ No newline at end of file diff --git a/database-versioning/depolymerizer-ethereum-procedures.sql b/database-versioning/depolymerizer-ethereum-procedures.sql @@ -1,39 +0,0 @@ --- --- This file is part of TALER --- Copyright (C) 2025 Taler Systems SA --- --- TALER is free software; you can redistribute it and/or modify it under the --- terms of the GNU General Public License as published by the Free Software --- Foundation; either version 3, or (at your option) any later version. --- --- TALER is distributed in the hope that it will be useful, but WITHOUT ANY --- WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR --- A PARTICULAR PURPOSE. See the GNU General Public License for more details. --- --- You should have received a copy of the GNU General Public License along with --- TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> - -SET search_path TO depolymerizer_ethereum; - --- Remove all existing functions -DO -$do$ -DECLARE - _sql text; -BEGIN - SELECT INTO _sql - string_agg(format('DROP %s %s CASCADE;' - , CASE prokind - WHEN 'f' THEN 'FUNCTION' - WHEN 'p' THEN 'PROCEDURE' - END - , oid::regprocedure) - , E'\n') - FROM pg_proc - WHERE pronamespace = 'depolymerizer_ethereum'::regnamespace; - - IF _sql IS NOT NULL THEN - EXECUTE _sql; - END IF; -END -$do$; -\ No newline at end of file diff --git a/depolymerizer-bitcoin/README.md b/depolymerizer-bitcoin/README.md @@ -1,69 +0,0 @@ -# btc-wire - -btc-wire is taler wire adapter for -[bitcoincore](https://bitcoincore.org/en/about/) node - -## Credit metadata format - -Starting from a bitcoin payto URI you will have to generate fake segwit -addresses to encode the reserve public key as metadata into a common bitcoin -transaction. - -A single segwit address can contain 20B of chosen data. The reserve pub key -being 32B we need two addresses. Therefore, we use two fake addresses consisting of the two key halves prepended -with the same random pattern, except for the first bit which must be 0 for the -first half and 1 for the second one. You must then send a single transaction -with three addresses as recipients. - -Segwit addresses are encoded using a bitcoin specific format: -[bech32](https://github.com/bitcoin/bips/blob/master/bip-0173.mediawiki) - -As a few lines of code can carry more meaning than many words, you can find a -[simple rust example](src/bin/segwit-demo.rs) in this project and run it with -`make segwit_demo`. - -``` -Ⅰ - Parse payto uri -Got payto uri: payto://bitcoin/bc1qw508d6qejxtdg4y5r3zarvary0c5xw7kv8f3t4?amount=BTC:0.1&subject=0ZSX8SH0M30KHX8K3Y1DAMVGDQV82XEF9DG1HC4QMQ3QWYT4AF00 -Send 0.1 BTC to bc1qw508d6qejxtdg4y5r3zarvary0c5xw7kv8f3t4 with reserve public key 0ZSX8SH0M30KHX8K3Y1DAMVGDQV82XEF9DG1HC4QMQ3QWYT4AF00 - -Ⅱ - Generate fake segwit addresses -Decode reserve public key: 0x07f3d46620a0c138f5131f82d553706df68175cf4b6018b097a5c77e7b4453c0 -Generate random prefix 0x7ea4c272 -Split reserve public key in two: -0x07f3d46620a0c138f5131f82d553706d -0xf68175cf4b6018b097a5c77e7b4453c0 -Concatenate random prefix with each reserve public key half: -0x7ea4c27207f3d46620a0c138f5131f82d553706d -0x7ea4c272f68175cf4b6018b097a5c77e7b4453c0 -Set first bit of the first half: -0x7ea4c27207f3d46620a0c138f5131f82d553706d -Unset first bit of the second half: -0xfea4c272f68175cf4b6018b097a5c77e7b4453c0 -Encode each half using bech32 to generate a segwit address: -bc1q06jvyus8702xvg9qcyu02yclst24xurdjvsnqz -bc1ql6jvyuhks96u7jmqrzcf0fw80ea5g57q2eccn6 - -Ⅲ - Send to many -Send a single bitcoin transaction with the three addresses as recipient as follow: - -In bitcoincore wallet use 'Add Recipient' button to add two additional recipient and copy adresses and amounts -bc1qw508d6qejxtdg4y5r3zarvary0c5xw7kv8f3t4 0.10000000 BTC -bc1q06jvyus8702xvg9qcyu02yclst24xurdjvsnqz 0.00000294 BTC -bc1ql6jvyuhks96u7jmqrzcf0fw80ea5g57q2eccn6 0.00000294 BTC - -In Electrum wallet paste the following three lines in 'Pay to' field : -bc1qw508d6qejxtdg4y5r3zarvary0c5xw7kv8f3t4,0.10000000 -bc1q06jvyus8702xvg9qcyu02yclst24xurdjvsnqz,0.00000294 -bc1ql6jvyuhks96u7jmqrzcf0fw80ea5g57q2eccn6,0.00000294 -Make sure the amount show 0.10000588 BTC, else you have to change the base unit to BTC -``` - -## Implementation details - -### Stuck transaction - -We resolve stuck transactions by always sending replaceable transactions using -[BIP 125](https://github.com/bitcoin/bips/blob/master/bip-0125.mediawiki). - -TODO diff --git a/depolymerizer-ethereum/Cargo.toml b/depolymerizer-ethereum/Cargo.toml @@ -1,38 +0,0 @@ -[package] -name = "depolymerizer-ethereum" -version = "0.1.0" -edition.workspace = true -authors.workspace = true -homepage.workspace = true -repository.workspace = true -license-file.workspace = true - -[features] -# Enable random failures -fail = [] - -[dependencies] -# Cli args -clap.workspace = true -# Serialization library -serde.workspace = true -serde_json.workspace = true -# Hexadecimal encoding -hex.workspace = true -# Ethereum serializable types -ethereum-types.workspace = true -# Error macros -thiserror.workspace = true -# Common lib -common = { path = "../common" } -taler-api.workspace = true -taler-common.workspace = true -rustc-hex = "2.1" -anyhow.workspace = true -sqlx.workspace = true -axum.workspace = true -tokio.workspace = true -tracing.workspace = true - -[dev-dependencies] -taler-test-utils.workspace = true diff --git a/depolymerizer-ethereum/README.md b/depolymerizer-ethereum/README.md @@ -1,12 +0,0 @@ -# eth-wire - -eth-wire is taler wire adapter for [go-ethereum](https://geth.ethereum.org/) -node - -## Credit metadata format - -TODO - -## Implementation details - -TODO -\ No newline at end of file diff --git a/depolymerizer-ethereum/depolymerizer-ethereum.conf b/depolymerizer-ethereum/depolymerizer-ethereum.conf @@ -1,90 +0,0 @@ -[depolymerizer-ethereum] -# Ethereum account address to sync -ACCOUNT = - -# Legal name of the account owner -NAME = - -[depolymerizer-ethereum-worker] -# Password of the encrypted wallet -PASSWORD = - -# Number of blocks to consider a transaction confirmed -CONFIRMATION = 37 - -# An additional fee to deduce from the bounced amount -# BOUNCE_FEE = BTC:0 - -# Specify the account type and therefore the indexing behavior. -# This can either can be normal or exchange. -# Exchange accounts bounce invalid incoming Taler transactions. -ACCOUNT_TYPE = exchange - -# Number of worker's loops before wire implementation shut -LIFETIME = 0 - -# Delay in seconds before bumping an unconfirmed transaction fee (0 mean never) -BUMP_DELAY = 0 - -# Path to the ethereum RPC server -IPC_PATH = $HOME/.ethereum - -[depolymerizer-ethereum-httpd] -# How "depolymerizer-ethereum serve" serves its API, this can either be tcp or unix -SERVE = tcp - -# Port on which the HTTP server listens, e.g. 9967. Only used if SERVE is tcp. -PORT = 8080 - -# Which IP address should we bind to? E.g. ``127.0.0.1`` or ``::1``for loopback. Only used if SERVE is tcp. -BIND_TO = 0.0.0.0 - -# Which unix domain path should we bind to? Only used if SERVE is unix. -# UNIXPATH = depolymerizer-ethereum.sock - -# What should be the file access permissions for UNIXPATH? Only used if SERVE is unix. -# UNIXPATH_MODE = 660 - -# Number of requests to serve before server shutdown (0 mean never) -LIFETIME = 0 - -[depolymerizer-ethereum-httpd-wire-gateway-api] -# Whether to serve the Wire Gateway API -ENABLED = NO - -# Authentication scheme, this can either can be basic, bearer or none. -AUTH_METHOD = bearer - -# User name for basic authentication scheme -# USERNAME = - -# Password for basic authentication scheme -# PASSWORD = - -# Token for bearer authentication scheme -TOKEN = - - -[depolymerizer-ethereum-httpd-revenue-api] -# Whether to serve the Revenue API -ENABLED = NO - -# Authentication scheme, this can either can be basic, bearer or none. -AUTH_METHOD = bearer - -# User name for basic authentication scheme -# USERNAME = - -# Password for basic authentication scheme -# PASSWORD = - -# Token for bearer authentication scheme -TOKEN = - - -[depolymerizer-ethereumdb-postgres] -# DB connection string -CONFIG = postgres:///depolymerizer-ethereum - -# Where are the SQL files to setup our tables? -SQL_DIR = ${DATADIR}/sql/ -\ No newline at end of file diff --git a/depolymerizer-ethereum/src/api.rs b/depolymerizer-ethereum/src/api.rs @@ -1,410 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2025 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ - -use std::{ - sync::{ - Arc, - atomic::{AtomicBool, Ordering}, - }, - time::Duration, -}; - -use axum::{ - extract::{Request, State}, - http::StatusCode, - middleware::Next, - response::{IntoResponse as _, Response}, -}; -use common::reconnect::client_jitter; -use sqlx::{ - PgPool, QueryBuilder, Row, - postgres::{PgListener, PgRow}, -}; -use taler_api::{ - api::{TalerApi, wire::WireGateway}, - db::{BindHelper as _, TypeHelper as _, history, page}, - error::{ApiResult, failure, failure_status, not_implemented}, -}; -use taler_common::{ - ExpoBackoffDecorr, - api_params::{History, Page}, - api_wire::{ - AddIncomingRequest, AddIncomingResponse, AddKycauthRequest, AddKycauthResponse, - IncomingBankTransaction, IncomingHistory, OutgoingBankTransaction, OutgoingHistory, - TransferList, TransferRequest, TransferResponse, TransferState, TransferStatus, - }, - error_code::ErrorCode, - types::{amount::Currency, payto::PaytoURI, timestamp::Timestamp}, -}; -use taler_common::{api_wire::TransferListStatus, types::payto::PaytoImpl}; -use tokio::{sync::watch::Sender, time::sleep}; -use tracing::error; - -use crate::payto::{EthAccount, FullEthPayto}; - -pub struct ServerState { - pool: PgPool, - payto: PaytoURI, - currency: Currency, - status: AtomicBool, - taler_in_channel: Sender<i64>, - taler_out_channel: Sender<i64>, -} - -pub async fn notification_listener( - pool: PgPool, - taler_in_channel: Sender<i64>, - taler_out_channel: Sender<i64>, -) -> sqlx::Result<()> { - taler_api::notification::notification_listener!(&pool, - "taler_in" => (row_id: i64) { - taler_in_channel.send_replace(row_id); - }, - "taler_out" => (row_id: i64) { - taler_out_channel.send_replace(row_id); - } - ) -} - -impl ServerState { - pub async fn start(pool: sqlx::PgPool, payto: PaytoURI, currency: Currency) -> Arc<Self> { - let taler_in_channel = Sender::new(0); - let taler_out_channel = Sender::new(0); - let tmp = Self { - pool: pool.clone(), - payto, - currency, - status: AtomicBool::new(true), - taler_in_channel: taler_in_channel.clone(), - taler_out_channel: taler_out_channel.clone(), - }; - let state = Arc::new(tmp); - tokio::spawn(status_watcher(state.clone())); - tokio::spawn(notification_listener( - pool, - taler_in_channel, - taler_out_channel, - )); - state - } -} - -impl TalerApi for ServerState { - fn currency(&self) -> &str { - self.currency.as_ref() - } - - fn implementation(&self) -> Option<&str> { - None - } -} - -fn sql_payto<I: sqlx::ColumnIndex<PgRow>>(r: &PgRow, addr: I, name: I) -> sqlx::Result<PaytoURI> { - let it: [u8; 20] = r.try_get(addr)?; - let addr = ethereum_types::Address::from_slice(&it); - let name: Option<&str> = r.try_get(name)?; - - Ok(EthAccount(addr) - .as_payto() - .as_full_payto(name.unwrap_or("Ethereum User"))) -} - -fn sql_generic_payto<I: sqlx::ColumnIndex<PgRow>>(row: &PgRow, idx: I) -> sqlx::Result<PaytoURI> { - let it: [u8; 20] = row.try_get(idx)?; - let addr = ethereum_types::Address::from_slice(&it); - - Ok(EthAccount(addr).as_payto().as_full_payto("Ethereum User")) -} - -impl WireGateway for ServerState { - async fn transfer(&self, req: TransferRequest) -> ApiResult<TransferResponse> { - let creditor = FullEthPayto::try_from(&req.credit_account)?; - - // TODO use plpgsql transaction - // Handle idempotence, check previous transaction with the same request_uid - let row = sqlx::query("SELECT (amount).val, (amount).frac, exchange_url, wtid, credit_acc, credit_name, id, created FROM tx_out WHERE request_uid = $1").bind(req.request_uid.as_slice()) - .fetch_optional(&self.pool) - .await?; - if let Some(r) = row { - // TODO store names? - let prev: TransferRequest = TransferRequest { - request_uid: req.request_uid.clone(), - amount: r.try_get_amount_i(0, &self.currency)?, - exchange_base_url: r.try_get_url("exchange_url")?, - wtid: r.try_get_base32("wtid")?, - credit_account: sql_payto(&r, "credit_acc", "credit_name")?, - }; - if prev == req { - // Idempotence - return Ok(TransferResponse { - row_id: r.try_get_safeu64("id")?, - timestamp: r.try_get_timestamp("created")?, - }); - } else { - return Err(failure( - ErrorCode::BANK_TRANSFER_REQUEST_UID_REUSED, - format!("Request UID {} already used", req.request_uid), - )); - } - } - - let timestamp = Timestamp::now(); - let r = sqlx::query( - "INSERT INTO tx_out (created, amount, wtid, credit_acc, credit_name, exchange_url, request_uid) VALUES ($1, ($2, $3)::taler_amount, $4, $5, $6, $7, $8) ON CONFLICT (wtid) DO NOTHING RETURNING id" - ) - .bind_timestamp(&Timestamp::now()) - .bind_amount(&req.amount) - .bind(req.wtid.as_slice()) - .bind(creditor.0.as_bytes()) - .bind(&creditor.name) - .bind(req.exchange_base_url.as_str()) - .bind(req.request_uid.as_slice()) - .fetch_optional(&self.pool) - .await?; - let Some(r) = r else { - return Err(failure( - ErrorCode::BANK_TRANSFER_WTID_REUSED, - format!("wtid {} already used", req.request_uid), - )); - }; - let row_id = r.try_get_safeu64(0)?; - sqlx::query("NOTIFY new_tx").execute(&self.pool).await?; - sqlx::query("SELECT pg_notify('taler_out', '' || $1)") - .bind(*row_id as i64) - .execute(&self.pool) - .await?; - - Ok(TransferResponse { timestamp, row_id }) - } - - async fn transfer_page( - &self, - params: Page, - status: Option<TransferState>, - ) -> ApiResult<TransferList> { - let debit_account = self.payto.clone(); - if status.is_some_and(|s| s != TransferState::success) { - return Ok(TransferList { - transfers: Vec::new(), - debit_account, - }); - } - let transfers = page( - &self.pool, - "id", - &params, - || { - QueryBuilder::new( - " - SELECT - id, - status, - (amount).val as amount_val, - (amount).frac as amount_frac, - credit_acc, - credit_name, - created - FROM tx_out WHERE request_uid IS NOT NULL AND - ", - ) - }, - |r: PgRow| { - Ok(TransferListStatus { - row_id: r.try_get_safeu64("id")?, - // TODO Fetch inner status - status: TransferState::success, - amount: r.try_get_amount("amount", &self.currency)?, - credit_account: sql_payto(&r, "credit_acc", "credit_name")?, - timestamp: r.try_get_timestamp("created")?, - }) - }, - ) - .await?; - Ok(TransferList { - transfers, - debit_account, - }) - } - - async fn transfer_by_id(&self, id: u64) -> ApiResult<Option<TransferStatus>> { - Ok(sqlx::query( - " - SELECT - status, - (amount).val as amount_val, - (amount).frac as amount_frac, - exchange_url, - wtid, - credit_acc, - credit_name, - created - FROM tx_out WHERE request_uid IS NOT NULL AND id = $1 - ", - ) - .bind(id as i64) - .try_map(|r: PgRow| { - Ok(TransferStatus { - // TODO Fetch inner status - status: TransferState::success, - status_msg: None, - amount: r.try_get_amount("amount", &self.currency)?, - origin_exchange_url: r.try_get("exchange_url")?, - wtid: r.try_get_base32("wtid")?, - credit_account: sql_payto(&r, "credit_acc", "credit_name")?, - timestamp: r.try_get_timestamp("created")?, - }) - }) - .fetch_optional(&self.pool) - .await?) - } - - async fn outgoing_history(&self, params: History) -> ApiResult<OutgoingHistory> { - let outgoing_transactions = history( - &self.pool, - "id", - &params, - || self.taler_out_channel.subscribe(), - || QueryBuilder::new( - "SELECT id, created, (amount).val, (amount).frac, wtid, credit_acc, credit_name, exchange_url FROM tx_out WHERE" - ), |r| { - Ok(OutgoingBankTransaction { - row_id: r.try_get_safeu64(0)?, - date: r.try_get_timestamp(1)?, - amount: r.try_get_amount_i(2, &self.currency)?, - wtid: r.try_get_base32(4)?, - credit_account: sql_payto(&r, "credit_acc", "credit_name")?, - exchange_base_url: r.try_get_url("exchange_url")?, - }) - }).await?; - Ok(OutgoingHistory { - debit_account: self.payto.clone(), - outgoing_transactions, - }) - } - - async fn incoming_history(&self, params: History) -> ApiResult<IncomingHistory> { - let incoming_transactions = history( - &self.pool, - "id", - &params, - || self.taler_in_channel.subscribe(), - || { - QueryBuilder::new( - "SELECT id, received, (amount).val, (amount).frac, reserve_pub, debit_acc FROM tx_in WHERE" - ) - }, - |r| { - Ok(IncomingBankTransaction::Reserve { - row_id: r.try_get_safeu64(0)?, - date: r.try_get_timestamp(1)?, - amount: r.try_get_amount_i(2, &self.currency)?, - reserve_pub: r.try_get_base32(4)?, - debit_account: sql_generic_payto(&r, 5)?, - }) - }, - ) - .await?; - Ok(IncomingHistory { - credit_account: self.payto.clone(), - incoming_transactions, - }) - } - - async fn add_incoming_reserve( - &self, - req: AddIncomingRequest, - ) -> ApiResult<AddIncomingResponse> { - let debtor = FullEthPayto::try_from(&req.debit_account)?; - let timestamp = Timestamp::now(); - let r = sqlx::query("INSERT INTO tx_in (received, amount, reserve_pub, debit_acc) VALUES ($1, ($2, $3)::taler_amount, $4, $5) ON CONFLICT (reserve_pub) DO NOTHING RETURNING id") - .bind_timestamp(&Timestamp::now()) - .bind_amount(&req.amount) - .bind(req.reserve_pub.as_slice()) - .bind(debtor.0.as_bytes()) - .fetch_optional(&self.pool).await?; - let Some(r) = r else { - return Err(failure( - ErrorCode::BANK_DUPLICATE_RESERVE_PUB_SUBJECT, - "reserve_pub used already".to_owned(), - )); - }; - let row_id = r.try_get_safeu64(0)?; - sqlx::query("SELECT pg_notify('taler_in', '' || $1)") - .bind(*row_id as i64) - .execute(&self.pool) - .await?; - Ok(AddIncomingResponse { timestamp, row_id }) - } - - async fn add_incoming_kyc(&self, _req: AddKycauthRequest) -> ApiResult<AddKycauthResponse> { - Err(not_implemented( - "depolymerizer-ethereum does not supports KYC", - )) - } - - fn support_account_check(&self) -> bool { - false - } -} - -pub async fn status_middleware( - State(state): State<Arc<ServerState>>, - request: Request, - next: Next, -) -> Response { - if !state.status.load(Ordering::Relaxed) { - failure_status( - ErrorCode::GENERIC_INTERNAL_INVARIANT_FAILURE, - "Currency backing is compromised until the transaction reappear", - StatusCode::BAD_GATEWAY, - ) - .into_response() - } else { - next.run(request).await - } -} - -/// Listen to backend status change -async fn status_watcher(state: Arc<ServerState>) { - let mut jitter = client_jitter(); - async fn inner( - state: &ServerState, - jitter: &mut ExpoBackoffDecorr, - ) -> Result<(), sqlx::error::Error> { - let mut listener = PgListener::connect_with(&state.pool).await?; - listener.listen("status").await?; - loop { - // Sync state - let row = sqlx::query("SELECT value FROM state WHERE name = 'status'") - .fetch_one(&state.pool) - .await?; - let status: &[u8] = row.try_get(0)?; - assert!(status.len() == 1 && status[0] < 2); - state.status.store(status[0] == 1, Ordering::SeqCst); - // Wait for next notification - listener.recv().await?; - jitter.reset(); - } - } - - loop { - if let Err(err) = inner(&state, &mut jitter).await { - error!("status-watcher: {}", err); - // TODO better sleep - sleep(Duration::from_secs(jitter.next() as u64)).await; - } - } -} diff --git a/depolymerizer-ethereum/src/config.rs b/depolymerizer-ethereum/src/config.rs @@ -1,129 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2025 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ - -use common::postgres; -use ethereum_types::U256; -use taler_api::{ - Serve, - config::{ApiCfg, DbCfg}, -}; -use taler_common::{ - config::{Config, ValueErr}, - types::{amount::Currency, payto::PaytoURI}, -}; - -use crate::{ - payto::{EthAccount, FullEthPayto}, - taler_util::taler_to_eth, -}; - -pub fn parse_db_cfg(cfg: &Config) -> Result<DbCfg, ValueErr> { - DbCfg::parse(cfg.section("depolymerizer-ethereumdb-postgres")) -} - -pub fn parse_account_payto(cfg: &Config) -> Result<FullEthPayto, ValueErr> { - let sect = cfg.section("depolymerizer-ethereum"); - let wallet: EthAccount = sect - .parse("ethereum account address", "ACCOUNT") - .require()?; - let name = sect.str("NAME").require()?; - - Ok(FullEthPayto::new(wallet, name)) -} - -pub struct ServeCfg { - pub payto: PaytoURI, - pub serve: Serve, - pub wire_gateway: Option<ApiCfg>, - pub revenue: Option<ApiCfg>, - pub currency: Currency, - pub lifetime: Option<u32>, -} - -impl ServeCfg { - pub fn parse(cfg: &Config) -> Result<Self, ValueErr> { - let payto = parse_account_payto(cfg)?; - - let sect = cfg.section("depolymerizer-ethereum-httpd"); - - let lifetime = sect.number("LIFETIME").opt()?.filter(|it| *it != 0); - - let serve = Serve::parse(sect)?; - - let wire_gateway = - ApiCfg::parse(cfg.section("depolymerizer-ethereum-httpd-wire-gateway-api"))?; - let revenue = ApiCfg::parse(cfg.section("depolymerizer-ethereum-httpd-revenue-api"))?; - - let sect = cfg.section("depolymerizer-ethereum"); - Ok(Self { - currency: sect.parse("currency", "CURRENCY").require()?, - lifetime, - payto: payto.as_payto(), - serve, - wire_gateway, - revenue, - }) - } -} - -#[derive(Debug, Clone)] - -pub struct WorkerCfg { - pub confirmation: u32, - pub max_confirmation: u32, - pub bounce_fee: U256, - pub ipc_path: String, - pub lifetime: Option<u32>, - pub bump_delay: Option<u32>, - pub db_config: postgres::Config, - pub currency: Currency, - pub account: EthAccount, - pub password: String, -} - -impl WorkerCfg { - pub fn parse(cfg: &Config) -> Result<Self, ValueErr> { - let sect = cfg.section("depolymerizer-ethereum"); - let currency: Currency = sect.parse("currency", "CURRENCY").require()?; - let account = sect.parse("Ethereum account", "ACCOUNT").require()?; - - let sect = cfg.section("depolymerizer-ethereum-worker"); - let confirmation = sect.number("CONFIRMATION").require()?; - let ipc_path = sect.path("IPC_PATH").require()?; - let password = sect.str("PASSWORD").require()?; - - Ok(Self { - account, - ipc_path, - confirmation, - max_confirmation: confirmation * 2, - bounce_fee: sect - .amount("BOUNCE_FEE", currency.as_ref()) - .opt()? - .map(|it| taler_to_eth(&it)) - .unwrap_or(U256::zero()), - lifetime: sect.number("LIFETIME").opt()?.filter(|it| *it != 0), - bump_delay: sect.number("BUMP_DELAY").opt()?.filter(|it| *it != 0), - db_config: cfg - .section("depolymerizer-ethereumdb-postgres") - .parse("Postgres", "CONFIG") - .require() - .unwrap(), - currency, - password, - }) - } -} diff --git a/depolymerizer-ethereum/src/fail_point.rs b/depolymerizer-ethereum/src/fail_point.rs @@ -1,31 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2022-2025 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ -#[derive(Debug, thiserror::Error)] -#[error("{0}")] -pub struct Injected(&'static str); - -/// Inject random failure when 'fail' feature is used -#[allow(unused_variables)] -pub fn fail_point(msg: &'static str, prob: f32) -> Result<(), Injected> { - #[cfg(feature = "fail")] - return if common::rand::random::<f32>() < prob { - Err(Injected(msg)) - } else { - Ok(()) - }; - - Ok(()) -} diff --git a/depolymerizer-ethereum/src/lib.rs b/depolymerizer-ethereum/src/lib.rs @@ -1,248 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2022-2025 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ - -use std::fmt::Debug; - -use common::{ - log::OrFail, - metadata::{InMetadata, OutMetadata}, - taler_common::api_common::{EddsaPublicKey, ShortHashCode}, - url::Url, -}; -use ethereum_types::{Address, H256, U64, U256}; -use rpc::{Rpc, RpcClient, RpcStream, Transaction, hex::Hex}; -use serde::de::DeserializeOwned; -use taler_common::config::parser::ConfigSource; - -pub mod api; -pub mod config; -pub mod payto; -pub mod rpc; -pub mod taler_util; - -pub const CONFIG_SOURCE: ConfigSource = ConfigSource::simple("depolymerizer-ethereum"); -pub const DB_SCHEMA: &str = "depolymerizer_ethereum"; - -/// An extended geth JSON-RPC api client who can send and retrieve metadata with their transaction -pub trait RpcExtended: RpcClient { - /// Perform a wire credit - fn credit( - &mut self, - from: Address, - to: Address, - value: U256, - reserve_pub: EddsaPublicKey, - ) -> rpc::Result<H256> { - let metadata = InMetadata::Credit { reserve_pub }; - self.send_transaction(&rpc::TransactionRequest { - from, - to, - value, - nonce: None, - gas_price: None, - data: Hex(metadata.encode()), - }) - } - - /// Perform a wire debit - fn debit( - &mut self, - from: Address, - to: Address, - value: U256, - wtid: ShortHashCode, - url: Url, - ) -> rpc::Result<H256> { - let metadata = OutMetadata::Debit { wtid, url }; - self.send_transaction(&rpc::TransactionRequest { - from, - to, - value, - nonce: None, - gas_price: None, - data: Hex(metadata.encode().or_fail(|e| format!("{e}"))), - }) - } - - /// Perform a Taler bounce - fn bounce(&mut self, hash: H256, bounce_fee: U256) -> rpc::Result<Option<H256>> { - let tx = self - .get_transaction(&hash)? - .expect("Cannot bounce a non existent transaction"); - let bounce_value = tx.value.saturating_sub(bounce_fee); - let metadata = OutMetadata::Bounce { bounced: hash.0 }; - let mut request = rpc::TransactionRequest { - from: tx.to.expect("Cannot bounce contract transaction"), - to: tx.from.expect("Cannot bounce coinbase transaction"), - value: bounce_value, - nonce: None, - gas_price: None, - data: Hex(metadata.encode().or_fail(|e| format!("{e}"))), - }; - // Estimate fee price using node - let fill = self.fill_transaction(&request)?; - // Deduce fee price from bounced value - request.value = request - .value - .saturating_sub(fill.tx.gas * fill.tx.gas_price.or(fill.tx.max_fee_per_gas).unwrap()); - Ok(if request.value.is_zero() { - None - } else { - Some(self.send_transaction(&request)?) - }) - } - - /// List new and removed transaction since the last sync state and the size of the reorganized fork if any, returning a new sync state - fn list_since_sync( - &mut self, - address: &Address, - state: SyncState, - min_confirmation: u32, - ) -> rpc::Result<ListSinceSync> { - let match_tx = |txs: Vec<Transaction>, confirmations: u32| -> Vec<SyncTransaction> { - txs.into_iter() - .filter_map(|tx| { - (tx.from == Some(*address) || tx.to == Some(*address)) - .then_some(SyncTransaction { tx, confirmations }) - }) - .collect() - }; - - let mut txs = Vec::new(); - let mut removed = Vec::new(); - let mut fork_len = 0; - - // Add pending transaction - txs.extend(match_tx(self.pending_transactions()?, 0)); - - let latest = self.latest_block()?; - - let mut confirmation = 1; - let mut chain_cursor = latest.clone(); - - // Move until tip height - while chain_cursor.number.unwrap() != state.tip_height { - txs.extend(match_tx(chain_cursor.transactions, confirmation)); - chain_cursor = self - .block(&chain_cursor.parent_hash)? - .expect("broken blockchain"); - confirmation += 1; - } - - // Check if fork - if chain_cursor.hash.unwrap() != state.tip_hash { - let mut fork_cursor = self.block(&state.tip_hash)?.expect("broken blockchain"); - // Move until found common parent - while fork_cursor.hash != chain_cursor.hash { - txs.extend(match_tx(chain_cursor.transactions, confirmation)); - removed.extend(match_tx(fork_cursor.transactions, confirmation)); - chain_cursor = self - .block(&chain_cursor.parent_hash)? - .expect("broken blockchain"); - fork_cursor = self - .block(&fork_cursor.parent_hash)? - .expect("broken blockchain"); - confirmation += 1; - fork_len += 1; - } - } - - // Move until last conf - while chain_cursor.number.unwrap() > state.conf_height { - txs.extend(match_tx(chain_cursor.transactions, confirmation)); - chain_cursor = self - .block(&chain_cursor.parent_hash)? - .expect("broken blockchain"); - confirmation += 1; - } - - Ok(ListSinceSync { - txs, - removed, - fork_len, - state: SyncState { - tip_hash: latest.hash.unwrap(), - tip_height: latest.number.unwrap(), - conf_height: latest - .number - .unwrap() - .saturating_sub(U64::from(min_confirmation)), - }, - }) - } -} - -impl RpcExtended for Rpc {} -impl<N: Debug + DeserializeOwned> RpcExtended for RpcStream<'_, N> {} - -pub struct SyncTransaction { - pub tx: Transaction, - pub confirmations: u32, -} - -pub struct ListSinceSync { - pub txs: Vec<SyncTransaction>, - pub removed: Vec<SyncTransaction>, - pub state: SyncState, - pub fork_len: u32, -} - -#[derive(Debug, Clone, Copy, PartialEq, Eq)] -pub struct SyncState { - pub tip_hash: H256, - pub tip_height: U64, - pub conf_height: U64, -} - -impl SyncState { - pub fn to_bytes(&self) -> [u8; 48] { - let mut bytes = [0; 48]; - bytes[..32].copy_from_slice(self.tip_hash.as_bytes()); - bytes[32..40].copy_from_slice(&self.tip_height.to_little_endian()); - bytes[40..].copy_from_slice(&self.conf_height.to_little_endian()); - bytes - } - - pub fn from_bytes(bytes: &[u8; 48]) -> Self { - Self { - tip_hash: H256::from_slice(&bytes[..32]), - tip_height: U64::from_little_endian(&bytes[32..40]), - conf_height: U64::from_little_endian(&bytes[40..]), - } - } -} - -#[cfg(test)] -mod test { - use common::{rand::random, rand_slice}; - use ethereum_types::{H256, U64}; - - use crate::SyncState; - - #[test] - fn to_from_bytes_block_state() { - for _ in 0..4 { - let state = SyncState { - tip_hash: H256::from_slice(&rand_slice::<32>()), - tip_height: U64::from(random::<u64>()), - conf_height: U64::from(random::<u64>()), - }; - let encoded = state.to_bytes(); - let decoded = SyncState::from_bytes(&encoded); - assert_eq!(state, decoded); - } - } -} diff --git a/depolymerizer-ethereum/src/loops.rs b/depolymerizer-ethereum/src/loops.rs @@ -1,38 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2022-2025 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ - -use common::postgres; -use depolymerizer_ethereum::rpc; - -use crate::fail_point::Injected; - -pub mod analysis; -pub mod watcher; -pub mod worker; - -#[derive(Debug, thiserror::Error)] -pub enum LoopError { - #[error("RPC {0}")] - Rpc(#[from] rpc::Error), - #[error("DB {0}")] - DB(#[from] postgres::Error), - #[error("Another eth-wire process is running concurrently")] - Concurrency, - #[error(transparent)] - Injected(#[from] Injected), -} - -pub type LoopResult<T> = Result<T, LoopError>; diff --git a/depolymerizer-ethereum/src/loops/analysis.rs b/depolymerizer-ethereum/src/loops/analysis.rs @@ -1,33 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2022-2025 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ - -use tracing::warn; - -use super::LoopResult; - -/// Analyse blockchain behavior and adapt confirmations in real time -pub fn analysis(fork: u32, current: u32, max: u32) -> LoopResult<u32> { - // If new fork is bigger than what current confirmation delay protect against - if fork >= current { - // Limit confirmation growth - let new_conf = fork.saturating_add(1).min(max); - warn!( - "analysis: found dangerous fork of {fork} blocks, adapt confirmation to {new_conf} blocks capped at {max}, you should update taler.conf" - ); - return Ok(new_conf); - } - Ok(current) -} diff --git a/depolymerizer-ethereum/src/loops/watcher.rs b/depolymerizer-ethereum/src/loops/watcher.rs @@ -1,43 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2022-2025 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ -use std::time::Duration; - -use common::reconnect::{client_jitter, connect_db}; -use depolymerizer_ethereum::{DB_SCHEMA, config::WorkerCfg, rpc::rpc_common}; -use tracing::error; - -use super::LoopResult; - -/// Wait for new block and notify arrival with postgreSQL notifications -pub fn watcher(state: &WorkerCfg) { - let mut jitter = client_jitter(); - loop { - let result: LoopResult<()> = (|| { - let rpc = &mut rpc_common(&state.ipc_path)?; - let db = &mut connect_db(&state.db_config, DB_SCHEMA)?; - let mut notifier = rpc.subscribe_new_head()?; - loop { - db.execute("NOTIFY new_block", &[])?; - notifier.next()?; - jitter.reset(); - } - })(); - if let Err(e) = result { - error!("watcher: {e}"); - std::thread::sleep(Duration::from_millis(jitter.next() as u64)); - } - } -} diff --git a/depolymerizer-ethereum/src/loops/worker.rs b/depolymerizer-ethereum/src/loops/worker.rs @@ -1,530 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2022-2025 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ -use std::{fmt::Write, time::Duration}; - -use common::{ - metadata::{InMetadata, OutMetadata}, - postgres::{Client, fallible_iterator::FallibleIterator}, - reconnect::{client_jitter, connect_db}, - sql::{sql_array, sql_base_32, sql_url}, - status::{BounceStatus, DebitStatus}, - taler_common::{api_common::ShortHashCode, types::timestamp::Timestamp}, -}; -use depolymerizer_ethereum::{ - DB_SCHEMA, ListSinceSync, RpcExtended, SyncState, SyncTransaction, - config::WorkerCfg, - rpc::{self, Rpc, RpcClient, Transaction, TransactionRequest, rpc_wallet}, - taler_util::eth_to_taler, -}; -use ethereum_types::{Address, H256, U256}; -use tracing::{error, info, warn}; - -use crate::{ - fail_point::fail_point, - loops::LoopError, - sql::{sql_addr, sql_eth_amount, sql_hash}, -}; - -use super::{LoopResult, analysis::analysis}; - -pub fn worker(mut state: WorkerCfg) { - let mut jitter = client_jitter(); - let mut lifetime = state.lifetime; - let mut status = true; - let mut skip_notification = true; - - loop { - let result: LoopResult<()> = (|| { - // Connect - let rpc = &mut rpc_wallet(&state.ipc_path, &state.password, &state.account.0)?; - let db = &mut connect_db(&state.db_config, DB_SCHEMA)?; - - loop { - // Listen to all channels - db.batch_execute("LISTEN new_block; LISTEN new_tx")?; - // Wait for the next notification - { - let mut ntf = db.notifications(); - if !skip_notification && ntf.is_empty() { - // Block until next notification - ntf.blocking_iter().next()?; - } - // Conflate all notifications - let mut iter = ntf.iter(); - while iter.next()?.is_some() {} - } - - // Check lifetime - if let Some(nb) = lifetime.as_mut() { - if *nb == 0 { - info!("Reach end of lifetime"); - return Ok(()); - } else { - *nb -= 1; - } - } - - // It is not possible to atomically update the blockchain and the database. - // When we failed to sync the database and the blockchain state we rely on - // sync_chain to recover the lost updates. - // When this function is running concurrently, it not possible to known another - // execution has failed, and this can lead to a transaction being sent multiple time. - // To ensure only a single version of this function is running at a given time we rely - // on postgres advisory lock - - // Take the lock - let row = db.query_one("SELECT pg_try_advisory_lock(42)", &[])?; - let locked: bool = row.get(0); - if !locked { - return Err(LoopError::Concurrency); - } - - // Get stored sync state - let row = db.query_one("SELECT value FROM state WHERE name='sync'", &[])?; - let sync_state = SyncState::from_bytes(&sql_array(&row, 0)); - - // Get changes - let list = rpc.list_since_sync(&state.account.0, sync_state, state.confirmation)?; - - // Perform analysis - state.confirmation = - analysis(list.fork_len, state.confirmation, state.max_confirmation)?; - - // Sync chain - if sync_chain(db, &state, &mut status, list)? { - // As we are now in sync with the blockchain if a transaction has Requested status it have not been sent - - // Send requested debits - while debit(db, rpc, &state)? {} - - // Bump stuck transactions - while bump(db, rpc, &state)? {} - - // Send requested bounce - while bounce(db, rpc, state.bounce_fee)? {} - } - - skip_notification = false; - jitter.reset(); - } - })(); - - if let Err(e) = result { - error!("worker: {e}"); - // When we catch an error, we sometimes want to retry immediately (eg. reconnect to RPC or DB). - // Rpc error codes are generic. We need to match the msg to get precise ones. Some errors - // can resolve themselves when a new block is mined (new fees, new transactions). Our simple - // approach is to wait for the next loop when an RPC error is caught to prevent endless logged errors. - skip_notification = matches!( - e, - LoopError::Rpc(rpc::Error::Transport(_)) - | LoopError::DB(_) - | LoopError::Injected(_) - ); - std::thread::sleep(Duration::from_millis(jitter.next() as u64)); - } else { - return; - } - } -} - -/// Parse new transactions, return true if the database is up to date with the latest mined block -fn sync_chain( - db: &mut Client, - state: &WorkerCfg, - status: &mut bool, - list: ListSinceSync, -) -> LoopResult<bool> { - // Get the current confirmation delay - let conf_delay = state.confirmation; - - // Check if a confirmed incoming transaction have been removed by a blockchain reorganization - let new_status = - sync_chain_removed(&list.txs, &list.removed, db, &state.account.0, conf_delay)?; - - // Sync status with database - if *status != new_status { - let mut tx = db.transaction()?; - tx.execute( - "UPDATE state SET value=$1 WHERE name='status'", - &[&[new_status as u8].as_slice()], - )?; - tx.execute("NOTIFY status", &[])?; - tx.commit()?; - *status = new_status; - if new_status { - info!("Recovered lost transactions"); - } - } - if !new_status { - return Ok(false); - } - - for sync_tx in list.txs { - let tx = &sync_tx.tx; - if tx.to == Some(state.account.0) && sync_tx.confirmations >= conf_delay { - sync_chain_incoming_confirmed(tx, db, state)?; - } else if tx.from == Some(state.account.0) { - sync_chain_outgoing(&sync_tx, db, state)?; - } - } - - db.execute( - "UPDATE state SET value=$1 WHERE name='sync'", - &[&list.state.to_bytes().as_slice()], - )?; - Ok(true) -} - -/// Sync database with removed transactions, return false if ethereum backing is compromised -fn sync_chain_removed( - txs: &[SyncTransaction], - removed: &[SyncTransaction], - db: &mut Client, - addr: &Address, - min_confirmation: u32, -) -> LoopResult<bool> { - // A removed incoming transaction is a correctness issues in only two cases: - // - it is a confirmed credit registered in the database - // - it is an invalid transactions already bounced - // Those two cases can compromise ethereum backing - // Removed outgoing transactions will be retried automatically by the node - - let mut blocking_credit = Vec::new(); - let mut blocking_bounce = Vec::new(); - - // Only keep incoming transaction that are not reconfirmed - // TODO study risk of accepting only mined transactions for faster recovery - for tx in removed - .iter() - .filter(|sync_tx| { - sync_tx.tx.to == Some(*addr) - && txs - .iter() - .all(|it| it.tx.hash != sync_tx.tx.hash || it.confirmations < min_confirmation) - }) - .map(|s| &s.tx) - { - match InMetadata::decode(&tx.input) { - Ok(metadata) => match metadata { - InMetadata::Credit { reserve_pub } => { - // Credits are only problematic if not reconfirmed and stored in the database - if db - .query_opt( - "SELECT 1 FROM tx_in WHERE reserve_pub=$1", - &[&reserve_pub.as_slice()], - )? - .is_some() - { - blocking_credit.push((reserve_pub, tx.hash, tx.from.unwrap())); - } - } - }, - Err(_) => { - // Invalid tx are only problematic if if not reconfirmed and already bounced - if let Some(row) = db.query_opt( - "SELECT txid FROM bounce WHERE bounced=$1 AND txid IS NOT NULL", - &[&tx.hash.as_ref()], - )? { - blocking_bounce.push((sql_hash(&row, 0), tx.hash)); - } else { - // Remove transaction from bounce table - db.execute("DELETE FROM bounce WHERE bounced=$1", &[&tx.hash.as_ref()])?; - } - } - } - } - - if !blocking_bounce.is_empty() || !blocking_credit.is_empty() { - let mut buf = "The following transaction have been removed from the blockchain, ethereum backing is compromised until the transaction reappear:".to_string(); - for (key, id, addr) in blocking_credit { - write!( - &mut buf, - "\n\tcredit {key} in {} from {}", - hex::encode(id), - hex::encode(addr) - ) - .unwrap(); - } - for (id, bounced) in blocking_bounce { - write!( - &mut buf, - "\n\tbounce {} in {}", - hex::encode(id), - hex::encode(bounced) - ) - .unwrap(); - } - error!("{buf}"); - Ok(false) - } else { - Ok(true) - } -} - -/// Sync database with an incoming confirmed transaction -fn sync_chain_incoming_confirmed( - tx: &Transaction, - db: &mut Client, - state: &WorkerCfg, -) -> Result<(), LoopError> { - match InMetadata::decode(&tx.input) { - Ok(metadata) => match metadata { - InMetadata::Credit { reserve_pub } => { - let amount = eth_to_taler(&tx.value, &state.currency); - let credit_addr = tx.from.expect("Not coinbase"); - let nb = db.execute("INSERT INTO tx_in (received, amount, reserve_pub, debit_acc) VALUES ($1, ($2, $3)::taler_amount, $4, $5) ON CONFLICT (reserve_pub) DO NOTHING ", &[ - &Timestamp::now().as_sql_micros(), &(amount.val as i64), &(amount.frac as i32), &reserve_pub.as_slice(), &credit_addr.as_bytes() - ])?; - if nb > 0 { - info!( - "<< {amount} {reserve_pub} in {} from {}", - hex::encode(tx.hash), - hex::encode(credit_addr), - ); - } - } - }, - Err(_) => { - // If encoding is wrong request a bounce - db.execute( - "INSERT INTO bounce (created, bounced) VALUES ($1, $2) ON CONFLICT (bounced) DO NOTHING", - &[&Timestamp::now().as_sql_micros(), &tx.hash.as_ref()], - )?; - } - } - Ok(()) -} - -/// Sync database with an outgoing transaction -fn sync_chain_outgoing(tx: &SyncTransaction, db: &mut Client, state: &WorkerCfg) -> LoopResult<()> { - let SyncTransaction { tx, confirmations } = tx; - match OutMetadata::decode(&tx.input) { - Ok(metadata) => match metadata { - OutMetadata::Debit { wtid, url } => { - let amount = eth_to_taler(&tx.value, &state.currency); - let credit_addr = tx.to.unwrap(); - // Get previous out tx - let row = db.query_opt( - "SELECT id, status, sent FROM tx_out WHERE wtid=$1 FOR UPDATE", - &[&wtid.as_slice()], - )?; - if let Some(row) = row { - // If already in database, sync status - let row_id: i64 = row.get(0); - let status: i16 = row.get(1); - let sent: Option<i64> = row.get(2); - - let expected_status = DebitStatus::Sent as i16; - let expected_send = sent.filter(|_| *confirmations == 0); - if status != expected_status || sent != expected_send { - let nb_row = db.execute( - "UPDATE tx_out SET status=$1, txid=$2, sent=NULL WHERE id=$3 AND status=$4", - &[ - &(DebitStatus::Sent as i16), - &tx.hash.as_ref(), - &row_id, - &status, - ], - )?; - if nb_row > 0 { - match DebitStatus::try_from(status as u8).unwrap() { - DebitStatus::Requested => { - warn!( - ">> (recovered) {amount} {wtid} in {} to {}", - hex::encode(tx.hash), - hex::encode(credit_addr) - ); - } - DebitStatus::Sent => { /* Status is correct */ } - } - } - } - } else { - // Else add to database - let nb = db.execute( - "INSERT INTO tx_out (created, amount, wtid, credit_acc, exchange_url, status, txid, request_uid) VALUES ($1, ($2, $3)::taler_amount, $4, $5, $6, $7, $8, $9) ON CONFLICT (wtid) DO NOTHING", - &[&Timestamp::now().as_sql_micros(), &(amount.val as i64), &(amount.frac as i32), &wtid.as_slice(), &credit_addr.as_bytes(), &url.to_string(), &(DebitStatus::Sent as i16), &tx.hash.as_ref(), &None::<&[u8]>], - )?; - if nb > 0 { - warn!( - ">> (onchain) {amount} {wtid} in {} to {}", - hex::encode(tx.hash), - hex::encode(credit_addr) - ); - } - } - } - OutMetadata::Bounce { bounced } => { - let bounced = H256::from_slice(&bounced); - // Get previous bounce - let row = db.query_opt( - "SELECT id, status FROM bounce WHERE bounced=$1", - &[&bounced.as_ref()], - )?; - if let Some(row) = row { - // If already in database, sync status - let row_id: i64 = row.get(0); - let status: i16 = row.get(1); - match BounceStatus::try_from(status as u8).unwrap() { - BounceStatus::Requested => { - let nb_row = db.execute( - "UPDATE bounce SET status=$1, txid=$2 WHERE id=$3 AND status=$4", - &[ - &(BounceStatus::Sent as i16), - &tx.hash.as_ref(), - &row_id, - &status, - ], - )?; - if nb_row > 0 { - warn!( - "|| (recovered) {} in {}", - hex::encode(bounced), - hex::encode(tx.hash) - ); - } - } - BounceStatus::Ignored => error!( - "watcher: ignored bounce {} found in chain at {}", - bounced, - hex::encode(tx.hash) - ), - BounceStatus::Sent => { /* Status is correct */ } - } - } else { - // Else add to database - let nb = db.execute( - "INSERT INTO bounce (created, bounced, txid, status) VALUES ($1, $2, $3, $4) ON CONFLICT (txid) DO NOTHING", - &[&Timestamp::now().as_sql_micros(), &bounced.as_ref(), &tx.hash.as_ref(), &(BounceStatus::Sent as i16)], - )?; - if nb > 0 { - warn!( - "|| (onchain) {} in {}", - hex::encode(bounced), - hex::encode(tx.hash) - ); - } - } - } - }, - Err(_) => { /* Ignore */ } - } - Ok(()) -} - -/// Send a debit transaction on the blockchain, return false if no more requested transactions are found -fn debit(db: &mut Client, rpc: &mut Rpc, state: &WorkerCfg) -> LoopResult<bool> { - // We rely on the advisory lock to ensure we are the only one sending transactions - let row = db.query_opt( -"SELECT id, (amount).val, (amount).frac, wtid, credit_acc, exchange_url FROM tx_out WHERE status=$1 ORDER BY created LIMIT 1", -&[&(DebitStatus::Requested as i16)], -)?; - if let Some(row) = &row { - let id: i64 = row.get(0); - let amount = sql_eth_amount(row, 1, &state.currency); - let wtid: ShortHashCode = sql_base_32(row, 3); - let addr = sql_addr(row, 4); - let url = sql_url(row, 5); - let now = Timestamp::now(); - let tx_id = rpc.debit(state.account.0, addr, amount, wtid.clone(), url)?; - fail_point("(injected) fail debit", 0.3)?; - db.execute( - "UPDATE tx_out SET status=$1, txid=$2, sent=$3 WHERE id=$4", - &[ - &(DebitStatus::Sent as i16), - &tx_id.as_ref(), - &now.as_sql_micros(), - &id, - ], - )?; - let amount = eth_to_taler(&amount, &state.currency); - info!( - ">> {amount} {wtid} in {} to {}", - hex::encode(tx_id), - hex::encode(addr) - ); - } - Ok(row.is_some()) -} - -/// Bump a stuck transaction, return false if no more stuck transactions are found -fn bump(db: &mut Client, rpc: &mut Rpc, state: &WorkerCfg) -> LoopResult<bool> { - if let Some(delay) = state.bump_delay { - let now = Timestamp::now().as_sql_micros(); - // We rely on the advisory lock to ensure we are the only one sending transactions - let row = db.query_opt( - "SELECT id, txid FROM tx_out WHERE status=$1 AND $2 - sent > $3 ORDER BY created LIMIT 1", - &[&(DebitStatus::Sent as i16), &now, &((delay * 1000000) as i64)], - )?; - if let Some(row) = &row { - let now = Timestamp::now(); - let id: i64 = row.get(0); - let txid = sql_hash(row, 1); - let tx = rpc.get_transaction(&txid)?.expect("Bump existing tx"); - rpc.send_transaction(&TransactionRequest { - from: tx.from.unwrap(), - to: tx.to.unwrap(), - value: tx.value, - gas_price: None, - data: tx.input, - nonce: Some(tx.nonce), - })?; - let row = db.query_one( - "UPDATE tx_out SET sent=$1 WHERE id=$2 RETURNING wtid", - &[&now.as_sql_micros(), &id], - )?; - let wtid: ShortHashCode = sql_base_32(&row, 0); - info!(">> (bump) {wtid} in {}", hex::encode(txid)); - } - Ok(row.is_some()) - } else { - Ok(false) - } -} - -/// Bounce a transaction on the blockchain, return false if no more requested transactions are found -fn bounce(db: &mut Client, rpc: &mut Rpc, fee: U256) -> LoopResult<bool> { - // We rely on the advisory lock to ensure we are the only one sending transactions - let row = db.query_opt( - "SELECT id, bounced FROM bounce WHERE status=$1 ORDER BY created LIMIT 1", - &[&(BounceStatus::Requested as i16)], - )?; - if let Some(row) = &row { - let id: i64 = row.get(0); - let bounced: H256 = sql_hash(row, 1); - - let bounce = rpc.bounce(bounced, fee)?; - match bounce { - Some(hash) => { - fail_point("(injected) fail bounce", 0.3)?; - db.execute( - "UPDATE bounce SET txid=$1, status=$2 WHERE id=$3", - &[&hash.as_ref(), &(BounceStatus::Sent as i16), &id], - )?; - info!("|| {} in {}", hex::encode(bounced), hex::encode(hash)); - } - None => { - db.execute( - "UPDATE bounce SET status=$1 WHERE id=$2", - &[&(BounceStatus::Ignored as i16), &id], - )?; - info!("|| (ignore) {} ", hex::encode(bounced)); - } - } - } - Ok(row.is_some()) -} diff --git a/depolymerizer-ethereum/src/main.rs b/depolymerizer-ethereum/src/main.rs @@ -1,214 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2022-2025 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ - -use axum::{Router, middleware}; -use clap::Parser; -use common::{ - named_spawn, - taler_common::{ - CommonArgs, - cli::{ConfigCmd, long_version}, - config::Config, - }, -}; -use depolymerizer_ethereum::{ - CONFIG_SOURCE, DB_SCHEMA, SyncState, - api::{ServerState, status_middleware}, - config::{ServeCfg, WorkerCfg, parse_db_cfg}, - rpc::{Rpc, RpcClient}, -}; -use loops::{watcher::watcher, worker::worker}; -use taler_api::api::TalerRouter as _; -use taler_common::{ - db::{dbinit, pool}, - taler_main, -}; -use tracing::info; - -mod fail_point; -mod loops; -mod sql; - -/// Taler wire for geth -#[derive(clap::Parser, Debug)] -#[command(long_version = long_version(), about, long_about = None)] -struct Args { - #[clap(flatten)] - common: CommonArgs, - #[clap(subcommand)] - cmd: Command, -} - -#[derive(clap::Subcommand, Debug)] -enum Command { - /// Initialize btc-wire database - Dbinit { - /// Reset database (DANGEROUS: All existing data is lost) - #[clap(long, short)] - reset: bool, - }, - /// TODO - Setup { - #[clap(long, short)] - reset: bool, - }, - /// Run btc-wire worker - Worker { - /// Execute once and return - #[clap(long, short)] - transient: bool, - }, - /// Run btc-wire HTTP server - Serve { - /// Check whether an API is in use (if it's useful to start the HTTP - /// server). Exit with 0 if at least one API is enabled, otherwise 1 - #[clap(long)] - check: bool, - }, - #[command(subcommand)] - Config(ConfigCmd), -} - -/* -fn run(config: Option<PathBuf>) { - let state = WireState::load_taler_config(config.as_deref()); - - let rpc_worker = auto_rpc_wallet(state.ipc_path.clone(), state.account.0); - let rpc_watcher = auto_rpc_common(state.ipc_path.clone()); - - let db_watcher = auto_reconnect_db(state.db_config.clone(), "TODO".to_owned()); - let db_worker = auto_reconnect_db(state.db_config.clone(), "TODO".to_owned()); - - named_spawn("watcher", move || watcher(rpc_watcher, db_watcher)); - worker(rpc_worker, db_worker, state); - info!("eth-wire stopped"); -}*/ - -async fn app(args: Args, cfg: Config) -> anyhow::Result<()> { - match args.cmd { - Command::Dbinit { reset } => { - let cfg = parse_db_cfg(&cfg)?; - let pool = pool(cfg.cfg, DB_SCHEMA).await?; - let mut conn = pool.acquire().await?; - dbinit( - &mut conn, - cfg.sql_dir.as_ref(), - CONFIG_SOURCE.component_name, - reset, - ) - .await?; - } - Command::Setup { reset } => { - info!("Connect to geth"); - let state = WorkerCfg::parse(&cfg)?; - let mut rpc = Rpc::new(&state.ipc_path)?; - let block = rpc.earliest_block()?; - - /*#[cfg(feature = "fail")] - if info.chain != "regtest" { - anyhow::bail!("Running with random failures is unsuitable for production"); - }*/ - - //TODO - // TODO wait for the blockchain to sync - // TODO Check wire wallet own config PAYTO address - - info!("Check wallet"); - // TODO - - info!("Setup database state"); - let db_cfg = parse_db_cfg(&cfg)?; - let state = SyncState { - tip_hash: block.hash.unwrap(), - tip_height: block.number.unwrap(), - conf_height: block.number.unwrap(), - }; - let pool = pool(db_cfg.cfg, DB_SCHEMA).await?; - - // Init status to true - sqlx::query("INSERT INTO state (name, value) VALUES ('status', $1) ON CONFLICT (name) DO NOTHING") - .bind([1u8]) - .execute( &pool).await?; - sqlx::query( - "INSERT INTO state (name, value) VALUES ('sync', $1) ON CONFLICT (name) DO NOTHING", - ) - .bind(state.to_bytes()) - .execute(&pool) - .await?; - // TODO reset ? - - println!("Database initialised"); - } - Command::Worker { transient } => { - let state = WorkerCfg::parse(&cfg)?; - - #[cfg(feature = "fail")] - tracing::warn!("Running with random failures"); - // TODO Check wire wallet own config PAYTO address - - named_spawn("worker", move || { - let tmp = state.clone(); - named_spawn("watcher", move || watcher(&tmp)); - worker(state) - }) - .join() - .unwrap(); - - info!("btc-wire stopped"); - } - Command::Serve { check } => { - if check { - let cfg = ServeCfg::parse(&cfg)?; - if cfg.revenue.is_none() && cfg.wire_gateway.is_none() { - std::process::exit(1); - } - } else { - let db = parse_db_cfg(&cfg)?; - let pool = pool(db.cfg, DB_SCHEMA).await?; - let cfg = ServeCfg::parse(&cfg)?; - let api = ServerState::start(pool, cfg.payto, cfg.currency).await; - let mut router = Router::new(); - - if let Some(cfg) = cfg.wire_gateway { - router = router.wire_gateway(api.clone(), cfg.auth); - } else { - panic!("lol") - } - - /*if let Some(cfg) = cfg.revenue { - router = router.revenue(api, cfg.auth); - }*/ - // TODO http lifetime - router - .layer(middleware::from_fn_with_state( - api.clone(), - status_middleware, - )) - .serve(cfg.serve, cfg.lifetime) - .await?; - } - } - Command::Config(cfg_cmd) => cfg_cmd.run(cfg)?, - } - Ok(()) -} - -fn main() { - let args = Args::parse(); - taler_main(CONFIG_SOURCE, args.common.clone(), |cfg| async move { - app(args, cfg).await - }) -} diff --git a/depolymerizer-ethereum/src/payto.rs b/depolymerizer-ethereum/src/payto.rs @@ -1,75 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2025 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ - -use std::str::FromStr; - -use ethereum_types::Address; -use rustc_hex::FromHexError; -use taler_common::types::payto::{FullPayto, Payto, PaytoErr, PaytoImpl, PaytoURI, TransferPayto}; - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct EthAccount(pub Address); - -const ETHEREUM: &str = "ethereum"; - -#[derive(Debug, thiserror::Error)] -pub enum EthErr { - #[error("missing ethereum address in path")] - MissingAddr, - #[error("malformed ethereum address: {0}")] - Addr(FromHexError), -} - -impl PaytoImpl for EthAccount { - fn as_payto(&self) -> PaytoURI { - PaytoURI::from_parts(ETHEREUM, format_args!("/{}", hex::encode(self.0))) - } - - fn parse(raw: &PaytoURI) -> Result<Self, PaytoErr> { - let url = raw.as_ref(); - if url.domain() != Some(ETHEREUM) { - return Err(PaytoErr::UnsupportedKind( - ETHEREUM, - url.domain().unwrap_or_default().to_owned(), - )); - } - let Some(mut segments) = url.path_segments() else { - return Err(PaytoErr::custom(EthErr::MissingAddr)); - }; - let Some(first) = segments.next() else { - return Err(PaytoErr::custom(EthErr::MissingAddr)); - }; - let addr = Address::from_str(first).map_err(|e| PaytoErr::custom(EthErr::Addr(e)))?; - Ok(Self(addr)) - } -} - -impl FromStr for EthAccount { - type Err = FromHexError; - - fn from_str(s: &str) -> Result<Self, Self::Err> { - Ok(Self(Address::from_str(s)?)) - } -} - -/// Parse an ethereum payto URI, panic if malformed -pub fn eth_payto(url: impl AsRef<str>) -> FullEthPayto { - url.as_ref().parse().expect("invalid eth payto") -} - -pub type EthPayto = Payto<EthAccount>; -pub type FullEthPayto = FullPayto<EthAccount>; -pub type TransferEthPayto = TransferPayto<EthAccount>; diff --git a/depolymerizer-ethereum/src/rpc.rs b/depolymerizer-ethereum/src/rpc.rs @@ -1,551 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2022-2025 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ -//! This is a very simple RPC client designed only for a specific geth version -//! and to use on an secure unix domain socket to a trusted node -//! -//! We only parse the thing we actually use, this reduce memory usage and -//! make our code more compatible with future deprecation - -use common::url::Url; -use ethereum_types::{Address, H256, U64, U256}; -use serde::de::DeserializeOwned; -use std::{ - fmt::Debug, - io::{self, BufWriter, ErrorKind, Read, Write}, - os::unix::net::UnixStream, - path::Path, -}; - -use self::hex::Hex; - -/// Create a rpc connection with an unlocked wallet -pub fn rpc_wallet(ipc_path: impl AsRef<Path>, password: &str, address: &Address) -> Result<Rpc> { - let mut rpc = Rpc::new(ipc_path)?; - rpc.unlock_account(address, password)?; - Ok(rpc) -} - -/// Create a rpc connection -pub fn rpc_common(ipc_path: impl AsRef<Path>) -> Result<Rpc> { - Ok(Rpc::new(ipc_path)?) -} - -#[derive(Debug, serde::Serialize)] -struct RpcRequest<'a, T: serde::Serialize> { - jsonrpc: &'static str, - method: &'a str, - id: u64, - params: &'a T, -} - -#[derive(Debug, serde::Deserialize)] -struct RpcResponse<T> { - result: Option<T>, - error: Option<RpcErr>, - id: u64, -} - -#[derive(Debug, serde::Deserialize)] -struct RpcErr { - code: i64, - message: String, -} - -#[derive(Debug, thiserror::Error)] -pub enum Error { - #[error("{0:?}")] - Transport(#[from] std::io::Error), - #[error("{code:?} - {msg}")] - RPC { code: i64, msg: String }, - #[error("JSON: {0}")] - Json(#[from] serde_json::Error), - #[error("Null rpc, no result or error")] - Null, -} - -pub type Result<T> = std::result::Result<T, Error>; - -const EMPTY: [(); 0] = []; - -/// Ethereum RPC connection -pub struct Rpc { - id: u64, - conn: BufWriter<UnixStream>, - read_buf: Vec<u8>, - cursor: usize, -} - -impl Rpc { - /// Start a RPC connection, path can be datadir or ipc path - pub fn new(path: impl AsRef<Path>) -> io::Result<Self> { - let path = path.as_ref(); - - let conn = if path.is_dir() { - UnixStream::connect(path.join("geth.ipc")) - } else { - UnixStream::connect(path) - }?; - - Ok(Self { - id: 0, - conn: BufWriter::new(conn), - read_buf: vec![0u8; 8 * 1024], - cursor: 0, - }) - } - - fn send(&mut self, method: &str, params: &impl serde::Serialize) -> Result<()> { - let request = RpcRequest { - method, - id: self.id, - params, - jsonrpc: "2.0", - }; - - // Send request - serde_json::to_writer(&mut self.conn, &request)?; - self.conn.flush()?; - Ok(()) - } - - fn receive<T>(&mut self) -> Result<T> - where - T: serde::de::DeserializeOwned + Debug, - { - loop { - // Read one - let pos = self.read_buf[..self.cursor] - .iter() - .position(|c| *c == b'\n') - .map(|pos| pos + 1); // Move after newline - if let Some(pos) = pos { - match serde_json::from_slice(&self.read_buf[..pos]) { - Ok(response) => { - self.read_buf.copy_within(pos..self.cursor, 0); - self.cursor -= pos; - return Ok(response); - } - Err(err) => return Err(err)?, - } - } // Or read more - - // Double buffer size if full - if self.cursor == self.read_buf.len() { - self.read_buf.resize(self.cursor * 2, 0); - } - match self.conn.get_mut().read(&mut self.read_buf[self.cursor..]) { - Ok(0) => Err(std::io::Error::new( - ErrorKind::UnexpectedEof, - "RPC EOF".to_string(), - ))?, - Ok(nb) => self.cursor += nb, - Err(e) if e.kind() == ErrorKind::Interrupted => {} - Err(e) => Err(e)?, - } - } - } - - pub fn subscribe_new_head(&mut self) -> Result<RpcStream<Nothing>> { - let id: String = self.call("eth_subscribe", &["newHeads"])?; - Ok(RpcStream::new(self, id)) - } - - fn handle_response<T>(&mut self, response: RpcResponse<T>) -> Result<T> { - assert_eq!(self.id, response.id); - self.id += 1; - if let Some(ok) = response.result { - Ok(ok) - } else { - Err(match response.error { - Some(err) => Error::RPC { - code: err.code, - msg: err.message, - }, - None => Error::Null, - }) - } - } -} - -impl RpcClient for Rpc { - fn call<T>(&mut self, method: &str, params: &impl serde::Serialize) -> Result<T> - where - T: serde::de::DeserializeOwned + Debug, - { - self.send(method, params)?; - let response = self.receive()?; - self.handle_response(response) - } -} - -#[derive(Debug, serde::Deserialize)] -pub struct NotificationContent<T> { - subscription: String, - result: T, -} - -#[derive(Debug, serde::Deserialize)] - -struct Notification<T> { - params: NotificationContent<T>, -} - -#[derive(Debug, serde::Deserialize)] -#[serde(untagged)] -enum NotificationOrResponse<T, N> { - Notification(Notification<N>), - Response(RpcResponse<T>), -} -#[derive(Debug, serde::Deserialize)] -#[serde(untagged)] -enum SubscribeDirtyFix { - Fix(RpcResponse<bool>), - Id(RpcResponse<String>), -} - -/// A notification stream wrapping an rpc client -pub struct RpcStream<'a, N: Debug + DeserializeOwned> { - rpc: &'a mut Rpc, - id: String, - buff: Vec<N>, -} - -impl<'a, N: Debug + DeserializeOwned> RpcStream<'a, N> { - fn new(rpc: &'a mut Rpc, id: String) -> Self { - Self { - rpc, - id, - buff: vec![], - } - } - - /// Block until next notification - pub fn next(&mut self) -> Result<N> { - match self.buff.pop() { - // Consume buffered notifications - Some(prev) => Ok(prev), - // Else read next one - None => { - let notification: Notification<N> = self.rpc.receive()?; - let notification = notification.params; - assert_eq!(self.id, notification.subscription); - Ok(notification.result) - } - } - } -} - -impl<N: Debug + DeserializeOwned> Drop for RpcStream<'_, N> { - fn drop(&mut self) { - let Self { rpc, id, .. } = self; - // Request unsubscription, ignoring error - rpc.send("eth_unsubscribe", &[id]).ok(); - // Ignore all buffered notification until subscription response - while let Ok(response) = rpc.receive::<NotificationOrResponse<bool, N>>() { - match response { - NotificationOrResponse::Notification(_) => { /* Ignore */ } - NotificationOrResponse::Response(_) => return, - } - } - } -} - -impl<N: Debug + DeserializeOwned> RpcClient for RpcStream<'_, N> { - fn call<T>(&mut self, method: &str, params: &impl serde::Serialize) -> Result<T> - where - T: serde::de::DeserializeOwned + Debug, - { - self.rpc.send(method, params)?; - loop { - // Buffer notifications until response - let response: NotificationOrResponse<T, N> = self.rpc.receive()?; - match response { - NotificationOrResponse::Notification(n) => { - let n = n.params; - assert_eq!(self.id, n.subscription); - self.buff.push(n.result); - } - NotificationOrResponse::Response(response) => { - return self.rpc.handle_response(response); - } - } - } - } -} - -pub trait RpcClient { - fn call<T>(&mut self, method: &str, params: &impl serde::Serialize) -> Result<T> - where - T: serde::de::DeserializeOwned + Debug; - - /* ----- Account management ----- */ - - /// List registered account - fn list_accounts(&mut self) -> Result<Vec<Address>> { - self.call("personal_listAccounts", &EMPTY) - } - - /// Create a new encrypted account - fn new_account(&mut self, passwd: &str) -> Result<Address> { - self.call("personal_newAccount", &[passwd]) - } - - /// Unlock an existing account - fn unlock_account(&mut self, account: &Address, passwd: &str) -> Result<bool> { - self.call("personal_unlockAccount", &(account, passwd, 0)) - } - - /* ----- Getter ----- */ - - /// Get a transaction by hash - fn get_transaction(&mut self, hash: &H256) -> Result<Option<Transaction>> { - match self.call("eth_getTransactionByHash", &[hash]) { - Err(Error::Null) => Ok(None), - r => r, - } - } - - /// Get a transaction receipt by hash - fn get_transaction_receipt(&mut self, hash: &H256) -> Result<Option<TransactionReceipt>> { - match self.call("eth_getTransactionReceipt", &[hash]) { - Err(Error::Null) => Ok(None), - r => r, - } - } - - /// Get block by hash - fn block(&mut self, hash: &H256) -> Result<Option<Block>> { - match self.call("eth_getBlockByHash", &(hash, &true)) { - Err(Error::Null) => Ok(None), - r => r, - } - } - - /// Get pending transactions - fn pending_transactions(&mut self) -> Result<Vec<Transaction>> { - self.call("eth_pendingTransactions", &EMPTY) - } - - /// Get latest block - fn latest_block(&mut self) -> Result<Block> { - self.call("eth_getBlockByNumber", &("latest", &true)) - } - - /// Get earliest block (genesis if not pruned) - fn earliest_block(&mut self) -> Result<Block> { - self.call("eth_getBlockByNumber", &("earliest", &true)) - } - - /// Get latest account balance - fn get_balance_latest(&mut self, addr: &Address) -> Result<U256> { - self.call("eth_getBalance", &(addr, "latest")) - } - - /// Get pending account balance - fn get_balance_pending(&mut self, addr: &Address) -> Result<U256> { - self.call("eth_getBalance", &(addr, "pending")) - } - - /// Get current chain height - fn height(&mut self) -> Result<U64> { - self.call("eth_blockNumber", &EMPTY) - } - - /// Get node info - fn node_info(&mut self) -> Result<NodeInfo> { - self.call("admin_nodeInfo", &EMPTY) - } - - /* ----- Transactions ----- */ - - /// Fill missing options from transaction request with default values - fn fill_transaction(&mut self, req: &TransactionRequest) -> Result<Filled> { - self.call("eth_fillTransaction", &[req]) - } - - /// Send ethereum transaction - fn send_transaction(&mut self, req: &TransactionRequest) -> Result<H256> { - self.call("eth_sendTransaction", &[req]) - } - - /* ----- Peer management ----- */ - - fn export_chain(&mut self, path: &str) -> Result<bool> { - self.call("admin_exportChain", &[path]) - } - - fn import_chain(&mut self, path: &str) -> Result<bool> { - self.call("admin_importChain", &[path]) - } -} - -#[derive(Debug, Clone, serde::Deserialize)] -pub struct Block { - pub hash: Option<H256>, - /// Block number (None if pending) - pub number: Option<U64>, - #[serde(rename = "parentHash")] - pub parent_hash: H256, - pub transactions: Vec<Transaction>, -} - -#[derive(Debug, serde::Deserialize)] -pub struct Nothing {} - -/// Description of a Transaction, pending or in the chain. -#[derive(Debug, Clone, serde::Deserialize)] -pub struct Transaction { - pub hash: H256, - pub nonce: U256, - /// Sender address (None when coinbase) - pub from: Option<Address>, - /// Recipient address (None when contract creation) - pub to: Option<Address>, - /// Transferred value - pub value: U256, - /// Input data - pub input: Hex, -} - -/// Description of a Transaction, pending or in the chain. -#[derive(Debug, Clone, serde::Deserialize)] -pub struct TransactionReceipt { - /// Gas used by this transaction alone. - #[serde(rename = "gasUsed")] - pub gas_used: U256, - /// Effective gas price - #[serde(rename = "effectiveGasPrice")] - pub effective_gas_price: Option<U256>, -} - -/// Fill result -#[derive(Debug, serde::Deserialize)] -pub struct Filled { - pub tx: FilledGas, -} - -/// Filles gas -#[derive(Debug, serde::Deserialize)] -pub struct FilledGas { - /// Supplied gas - pub gas: U256, - #[serde(rename = "gasPrice")] - pub gas_price: Option<U256>, - #[serde(rename = "maxFeePerGas")] - pub max_fee_per_gas: Option<U256>, -} - -/// Send Transaction Parameters -#[derive(Debug, serde::Serialize)] -pub struct TransactionRequest { - /// Sender address - pub from: Address, - /// Recipient address - pub to: Address, - /// Transferred value - pub value: U256, - /// Gas price (None for sensible default) - #[serde(rename = "gasPrice")] - pub gas_price: Option<U256>, - /// Transaction data - pub data: Hex, - /// Transaction nonce (None for next available nonce) - #[serde(skip_serializing_if = "Option::is_none")] - pub nonce: Option<U256>, -} - -#[derive(Debug, serde::Deserialize)] -pub struct NodeInfo { - pub enode: Url, -} - -pub mod hex { - use std::{ - fmt, - ops::{Deref, DerefMut}, - }; - - use serde::{ - Deserialize, Deserializer, Serialize, Serializer, - de::{Error, Unexpected, Visitor}, - }; - - /// Raw bytes wrapper - #[derive(Clone, Debug, Default, PartialEq, Eq, Hash)] - pub struct Hex(pub Vec<u8>); - - impl Deref for Hex { - type Target = Vec<u8>; - - fn deref(&self) -> &Self::Target { - &self.0 - } - } - - impl DerefMut for Hex { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.0 - } - } - - impl Serialize for Hex { - fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> - where - S: Serializer, - { - serializer.serialize_str(&hex::encode_prefixed(&self.0)) - } - } - - impl<'a> Deserialize<'a> for Hex { - fn deserialize<D>(deserializer: D) -> Result<Hex, D::Error> - where - D: Deserializer<'a>, - { - deserializer.deserialize_identifier(BytesVisitor) - } - } - - struct BytesVisitor; - - impl Visitor<'_> for BytesVisitor { - type Value = Hex; - - fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { - write!(formatter, "a 0x-prefixed hex-encoded vector of bytes") - } - - fn visit_str<E>(self, value: &str) -> Result<Self::Value, E> - where - E: Error, - { - if value.len() >= 2 && &value[0..2] == "0x" { - let bytes = hex::decode(&value[2..]) - .map_err(|e| Error::custom(format!("Invalid hex: {e}")))?; - Ok(Hex(bytes)) - } else { - Err(Error::invalid_value(Unexpected::Str(value), &"0x prefix")) - } - } - - fn visit_string<E>(self, value: String) -> Result<Self::Value, E> - where - E: Error, - { - self.visit_str(value.as_ref()) - } - } -} diff --git a/depolymerizer-ethereum/src/sql.rs b/depolymerizer-ethereum/src/sql.rs @@ -1,40 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2022-2025 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ -use common::{ - postgres::Row, - sql::{sql_amount, sql_array}, -}; -use depolymerizer_ethereum::taler_util::taler_to_eth; -use ethereum_types::{H160, H256, U256}; -use taler_common::types::amount::Currency; - -/// Ethereum amount from sql -pub fn sql_eth_amount(row: &Row, idx: usize, currency: &Currency) -> U256 { - let amount = sql_amount(row, idx, currency); - taler_to_eth(&amount) -} - -/// Ethereum address from sql -pub fn sql_addr(row: &Row, idx: usize) -> H160 { - let array: [u8; 20] = sql_array(row, idx); - H160::from_slice(&array) -} - -/// Ethereum hash from sql -pub fn sql_hash(row: &Row, idx: usize) -> H256 { - let array: [u8; 32] = sql_array(row, idx); - H256::from_slice(&array) -} diff --git a/depolymerizer-ethereum/src/taler_util.rs b/depolymerizer-ethereum/src/taler_util.rs @@ -1,36 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2022-2025 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ - -use common::taler_common::types::amount::{Amount, FRAC_BASE}; -use ethereum_types::U256; -use taler_common::types::amount::Currency; - -pub const WEI: u64 = 1_000_000_000_000_000_000; -pub const TRUNC: u64 = WEI / FRAC_BASE as u64; - -/// Transform a eth amount into a taler amount -pub fn eth_to_taler(amount: &U256, currency: &Currency) -> Amount { - Amount::new( - currency, - (amount / WEI).as_u64(), - ((amount % WEI) / TRUNC).as_u32(), - ) -} - -/// Transform a eth amount into a btc amount -pub fn taler_to_eth(amount: &Amount) -> U256 { - U256::from(amount.val) * WEI + U256::from(amount.frac) * TRUNC -} diff --git a/depolymerizer-ethereum/tests/api.rs b/depolymerizer-ethereum/tests/api.rs @@ -1,105 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2025 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ - -use std::str::FromStr; - -use axum::Router; -use depolymerizer_ethereum::{CONFIG_SOURCE, api::ServerState}; -use sqlx::PgPool; -use taler_api::{api::TalerRouter as _, auth::AuthMethod}; -use taler_common::{ - api_common::{HashCode, ShortHashCode}, - api_wire::{OutgoingHistory, TransferState, WireConfig}, - types::{amount::Currency, payto::payto}, -}; -use taler_test_utils::{ - db_test_setup, json, - routine::{admin_add_incoming_routine, routine_pagination, transfer_routine}, - server::TestServer, -}; - -async fn setup() -> (Router, PgPool) { - let pool = db_test_setup(CONFIG_SOURCE).await; - let api = ServerState::start( - pool.clone(), - payto("payto://ethereum/06012c8cf97bead5deae237070f9587f8e7a266d"), - Currency::from_str("ETH").unwrap(), - ) - .await; - let server = Router::new() - .wire_gateway(api.clone(), AuthMethod::None) - .finalize(); - - (server, pool) -} - -#[tokio::test] -async fn config() { - let (server, _) = setup().await; - server - .get("/taler-wire-gateway/config") - .await - .assert_ok_json::<WireConfig>(); -} - -#[tokio::test] -async fn transfer() { - let (server, _) = setup().await; - transfer_routine( - &server, - TransferState::success, - &payto("payto://ethereum/06012c8cf97bead5deae237070f9587f8e7a266d?receiver-name=Anonymous"), - ) - .await; -} - -#[tokio::test] -async fn outgoing_history() { - let (server, _) = setup().await; - routine_pagination::<OutgoingHistory, _>( - &server, - "/taler-wire-gateway/history/outgoing", - |it| { - it.outgoing_transactions - .into_iter() - .map(|it| *it.row_id as i64) - .collect() - }, - |s, _| async { - s.post("/taler-wire-gateway/transfer").json( - &json!({ - "request_uid": HashCode::rand(), - "amount": "ETH:10", - "exchange_base_url": "http://exchange.taler/", - "wtid": ShortHashCode::rand(), - "credit_account": "payto://ethereum/06012c8cf97bead5deae237070f9587f8e7a266d?receiver-name=Anonymous", - }) - ).await; - }, - ) - .await; -} - -#[tokio::test] -async fn admin_add_incoming() { - let (server, _) = setup().await; - admin_add_incoming_routine( - &server, - &payto("payto://ethereum/06012c8cf97bead5deae237070f9587f8e7a266d?receiver-name=Anonymous"), - false, - ) - .await; -} diff --git a/instrumentation/Cargo.toml b/instrumentation/Cargo.toml @@ -14,10 +14,6 @@ common = { path = "../common" } # Bitcoin depolymerizer-bitcoin = { path = "../depolymerizer-bitcoin" } bitcoin.workspace = true -# Ethereum -depolymerizer-ethereum = { path = "../depolymerizer-ethereum" } -ethereum-types.workspace = true -hex.workspace = true # Wire Gateway ureq = { version = "3.0.0", features = ["json"] } # Generate temporary files diff --git a/instrumentation/README.md b/instrumentation/README.md @@ -1,38 +0,0 @@ -# Depolymerizer instrumentation test - -## Install - -`cargo install --path instrumentation` - -## Offline local tests - -Local tests require additional binaries. The following binaries must be in the -local user PATH: - -- `pg_ctl` and `psql` from PostgreSQL -- `geth` from [go-ethereum](https://geth.ethereum.org/downloads/) -- `bitcoind` and `bitcoin-cli` from - [bitcoincore](https://bitcoincore.org/en/download/) -- `taler-config` and `taler-exchange-wire-gateway-client` from the - [Taler exchange ](https://git.taler.net/exchange.git/) - -You can use the [prepare](script/prepare.sh) script to download and extract -blockchain binaries and find the path of the local Postgres installation. -However, taler binaries need to be compiled from source for now. - -Run `instrumentation --offline` to run all tests. - -## Online tests - -Local tests running on a private development network are meant to test the good -behavior in case of extreme situations but do not attest our capacity to handle -real network behavior. - -First, follow a normal setup for the adapter and then run `instrumentation --online`. The -tested blockchain will be determined based on the taler configuration. - -### Temporary database - -If you do not want to use a persistent database for instrumentation tests, there -is a [script](../script/tmp_db.sh) to generate a temporary database similar to -local tests. -\ No newline at end of file diff --git a/instrumentation/src/eth.rs b/instrumentation/src/eth.rs @@ -1,1059 +0,0 @@ -/* - This file is part of TALER - Copyright (C) 2022-2025 Taler Systems SA - - TALER is free software; you can redistribute it and/or modify it under the - terms of the GNU Affero General Public License as published by the Free Software - Foundation; either version 3, or (at your option) any later version. - - TALER is distributed in the hope that it will be useful, but WITHOUT ANY - WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR - A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License along with - TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> -*/ - -use std::{ - ops::{Deref, DerefMut}, - path::PathBuf, - sync::LazyLock, -}; - -use common::taler_common::{ - api_common::{EddsaPublicKey, ShortHashCode}, - types::{base32::Base32, payto::Payto}, -}; -use depolymerizer_ethereum::{ - CONFIG_SOURCE, RpcExtended, - config::{ServeCfg, WorkerCfg}, - payto::EthAccount, - rpc::{Rpc, RpcClient, TransactionRequest, hex::Hex}, - taler_util::{TRUNC, eth_to_taler}, -}; -use ethereum_types::{H160, U256}; -use taler_common::config::Config; - -use crate::utils::{ - ChildGuard, TalerCtx, TestCtx, cmd_out, cmd_redirect, cmd_redirect_ok, patch_config, retry, - retry_opt, transfer, -}; - -static NETWORK_FEE: LazyLock<U256> = LazyLock::new(|| U256::from(166u32)); - -struct EthCtx { - node: ChildGuard, - rpc: Rpc, - wire_addr: H160, - client_addr: H160, - reserve_addr: H160, - worker_cfg: WorkerCfg, - serve_cfg: ServeCfg, - ctx: TalerCtx, - passwd: String, - pswd_path: PathBuf, -} - -impl Deref for EthCtx { - type Target = TalerCtx; - - fn deref(&self) -> &Self::Target { - &self.ctx - } -} - -impl DerefMut for EthCtx { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.ctx - } -} - -impl EthCtx { - pub fn setup(ctx: &TestCtx, config: &str, stressed: bool) -> Self { - let mut ctx = TalerCtx::new(ctx, "depolymerizer-ethereum", config, stressed); - - ctx.dbinit(); - - // Init chain - let passwd: String = (0..30).map(|_| fastrand::alphanumeric()).collect(); - let pswd_path = ctx.dir.path().join("pswd"); - std::fs::write(&pswd_path, &passwd).unwrap(); - for _ in ["reserve", "client", "wire"] { - cmd_redirect_ok( - "geth", - &[ - "--datadir", - ctx.wire_dir.to_str().unwrap(), - "--lightkdf", - "account", - "new", - "--password", - pswd_path.to_str().unwrap(), - ], - ctx.log("geth"), - "create account", - ) - } - let list = cmd_out( - "geth", - &[ - "--datadir", - ctx.wire_dir.to_str().unwrap(), - "--lightkdf", - "account", - "list", - ], - ); - let mut addrs = list.lines().map(|l| &l[13..][..40]); - let (reserve, client, wire) = ( - addrs.next().unwrap(), - addrs.next().unwrap(), - addrs.next().unwrap(), - ); - let genesis = r#" - { - "config": { - "chainId": 1337, - "homesteadBlock": 0, - "eip150Block": 0, - "eip155Block": 0, - "eip158Block": 0, - "byzantiumBlock": 0, - "constantinopleBlock": 0, - "petersburgBlock": 0, - "istanbulBlock": 0, - "muirGlacierBlock": 0, - "berlinBlock": 0, - "londonBlock": 0, - "arrowGlacierBlock": 0, - "grayGlacierBlock": 0, - "shanghaiTime": 0, - "cancunTime": 0, - "terminalTotalDifficulty": 0, - "terminalTotalDifficultyPassed": true, - "depositContractAddress": "0x0000000000000000000000000000000000000000" - }, - "nonce": "0x0", - "timestamp": "0x0", - "extraData": "0x", - "gasLimit": "0x0", - "difficulty": "0x0", - "mixHash": "0x0000000000000000000000000000000000000000000000000000000000000000", - "coinbase": "0x0000000000000000000000000000000000000000", - "alloc": { - "0000000000000000000000000000000000000001": { - "balance": "0x1" - }, - "0000000000000000000000000000000000000002": { - "balance": "0x1" - }, - "0000000000000000000000000000000000000003": { - "balance": "0x1" - }, - "0000000000000000000000000000000000000004": { - "balance": "0x1" - }, - "0000000000000000000000000000000000000005": { - "balance": "0x1" - }, - "0000000000000000000000000000000000000006": { - "balance": "0x1" - }, - "0000000000000000000000000000000000000007": { - "balance": "0x1" - }, - "0000000000000000000000000000000000000008": { - "balance": "0x1" - }, - "0000000000000000000000000000000000000009": { - "balance": "0x1" - }, - "CLIENT": { - "balance": "0x100000000000000000" - }, - "RESERVE": { - "balance": "0x100000000000000000" - } - }, - "number": "0x0", - "gasUsed": "0x0", - "parentHash": "0x0000000000000000000000000000000000000000000000000000000000000000", - "baseFeePerGas": "0x0", - "excessBlobGas": null, - "blobGasUsed": null - } - "# - .replace("CLIENT", client) - .replace("RESERVE", reserve); - std::fs::write(ctx.wire_dir.join("genesis.json"), genesis.as_bytes()).unwrap(); - - cmd_redirect_ok( - "geth", - &[ - "--dev", - "--datadir", - ctx.wire_dir.to_str().unwrap(), - "--password", - pswd_path.to_str().unwrap(), - "init", - ctx.wire_dir.join("genesis.json").to_str().unwrap(), - ], - ctx.log("geth"), - "init chain", - ); - cmd_redirect_ok( - "geth", - &[ - "--dev", - "--datadir", - ctx.wire2_dir.to_str().unwrap(), - "--password", - pswd_path.to_str().unwrap(), - "init", - ctx.wire_dir.join("genesis.json").to_str().unwrap(), - ], - ctx.log("geth2"), - "init chain2", - ); - let node = cmd_redirect( - "geth", - &[ - "--datadir", - ctx.wire_dir.to_str().unwrap(), - "--cache", - "16", - "--lightkdf", - "--rpc.enabledeprecatedpersonal", - "--dev", - "--miner.gasprice", - "1", - "--password", - pswd_path.to_str().unwrap(), - ], - ctx.log("geth"), - ); - let mut rpc = retry_opt(|| Rpc::new(&ctx.wire_dir)); - - // Generate wallet - patch_config(&ctx.conf, &ctx.conf, |cfg| { - cfg.with_section(Some("depolymerizer-ethereum")) - .set("NAME", "Exchange Owner") - .set("ACCOUNT", wire); - cfg.with_section(Some("depolymerizer-ethereum-worker")) - .set("PASSWORD", &passwd) - .set("IPC_PATH", ctx.wire_dir.to_str().unwrap()); - }); - - let cfg = Config::from_file(CONFIG_SOURCE, Some(&ctx.conf)).unwrap(); - let worker_cfg = WorkerCfg::parse(&cfg).unwrap(); - let serve_cfg = ServeCfg::parse(&cfg).unwrap(); - - // Setup & run - ctx.setup(); - ctx.run(); - - let accounts = rpc.list_accounts().unwrap(); - let reserve_addr = accounts[0]; - let client_addr = accounts[1]; - let wire_addr = accounts[2]; - for addr in [&client_addr, &reserve_addr] { - rpc.unlock_account(addr, &passwd).unwrap(); - } - - let mut tmp = Self { - node, - rpc, - reserve_addr, - client_addr, - wire_addr, - worker_cfg, - serve_cfg, - ctx, - passwd, - pswd_path, - }; - tmp.mine(0); - tmp - } - - pub fn reset_db(&mut self) { - self.ctx.reset_db(); - self.ctx.setup(); - } - - pub fn stop_node(&mut self) { - // We need to kill node gracefully to avoid corruption - #[cfg(unix)] - { - cmd_redirect_ok( - "kill", - &[&self.node.0.id().to_string()], - "/dev/null", - "fill btc node", - ); - self.node.0.wait().unwrap(); - } - } - - // We use the import/export chain functionality to simulate a connected node peer - // because local network peer are not reliable - // importChain RPC crash so we have to use the cli for now - - fn export(rpc: &mut Rpc, path: &str) { - std::fs::remove_file(path).ok(); - assert!(rpc.export_chain(path).unwrap()) - } - - pub fn cluster_deco(&mut self) { - let path = self.ctx.dir.path().join("chain"); - let path = path.to_str().unwrap(); - Self::export(&mut self.rpc, path); - cmd_redirect_ok( - "geth", - &[ - "--datadir", - self.ctx.wire2_dir.to_str().unwrap(), - "--keystore", - self.ctx.wire_dir.join("keystore").to_str().unwrap(), - "--cache", - "16", - "--lightkdf", - "--password", - self.pswd_path.to_str().unwrap(), - "import", - path, - ], - self.ctx.log("geth2"), - "import chain", - ); - } - - pub fn cluster_fork(&mut self) { - let node2 = cmd_redirect( - "geth", - &[ - "--dev", - "--datadir", - self.ctx.wire2_dir.to_str().unwrap(), - "--keystore", - self.ctx.wire_dir.join("keystore").to_str().unwrap(), - "--cache", - "16", - "--lightkdf", - "--rpc.enabledeprecatedpersonal", - "--password", - self.pswd_path.to_str().unwrap(), - ], - self.ctx.log("geth2"), - ); - let mut rpc = retry_opt(|| Rpc::new(&self.ctx.wire2_dir)); - let node1_height = self.rpc.height().unwrap(); - let node2_height = rpc.height().unwrap(); - let diff = node1_height - node2_height; - self._mine(Some(&mut rpc), diff.as_u32() as u16 + 10); - let path = self.ctx.dir.path().join("chain"); - let path = path.to_str().unwrap(); - Self::export(&mut rpc, path); - drop(node2); - self.stop_node(); - cmd_redirect_ok( - "geth", - &[ - "--datadir", - self.ctx.wire_dir.to_str().unwrap(), - "--lightkdf", - "--password", - self.pswd_path.to_str().unwrap(), - "import", - path, - ], - self.ctx.log("geth"), - "import chain", - ); - self.resume_node(&[]); - } - - pub fn restart_node(&mut self, additional_args: &[&str]) { - self.stop_node(); - self.resume_node(additional_args); - } - - pub fn resume_node(&mut self, additional_args: &[&str]) { - let mut args = vec![ - "--datadir", - self.ctx.wire_dir.to_str().unwrap(), - "--lightkdf", - "--dev", - "--cache", - "16", - "--rpc.enabledeprecatedpersonal", - "--password", - self.pswd_path.to_str().unwrap(), - ]; - args.extend_from_slice(additional_args); - self.node = cmd_redirect("geth", &args, self.ctx.log("geth")); - self.rpc = retry_opt(|| Rpc::new(&self.ctx.wire_dir)); - for addr in [&self.wire_addr, &self.client_addr, &self.reserve_addr] { - self.rpc.unlock_account(addr, &self.passwd).unwrap(); - } - } - - pub fn amount(&self, amount: u32) -> U256 { - U256::from(amount) * TRUNC - } - - /* ----- Transaction ------ */ - - pub fn credit(&mut self, amount: U256, reserve_pub: &EddsaPublicKey) { - self.rpc - .credit( - self.client_addr, - self.wire_addr, - amount, - reserve_pub.clone(), - ) - .unwrap(); - } - - pub fn debit(&mut self, amount: U256, wtid: &ShortHashCode) { - transfer( - &self.ctx.gateway_url, - wtid, - Payto::new(EthAccount(self.client_addr)) - .as_payto() - .as_full_payto("Anonymous"), - &eth_to_taler(&amount, &self.worker_cfg.currency), - ); - } - - pub fn malformed_credit(&mut self, amount: U256) { - self.rpc - .send_transaction(&TransactionRequest { - from: self.client_addr, - to: self.wire_addr, - value: amount, - nonce: None, - gas_price: None, - data: Hex(vec![]), - }) - .unwrap(); - } - - pub fn abandon(&mut self) { - let pending = self.rpc.pending_transactions().unwrap(); - for tx in pending - .into_iter() - .filter(|t| t.from == Some(self.client_addr)) - { - // Replace transaction value with 0 - self.rpc - .send_transaction(&TransactionRequest { - from: self.client_addr, - to: tx.to.unwrap(), - value: U256::zero(), - gas_price: Some(U256::from(110)), // Bigger gas price to replace fee - data: Hex(vec![]), - nonce: Some(tx.nonce), - }) - .unwrap(); - } - } - - /* ----- Mining ----- */ - - fn dummy_tx(&self) -> TransactionRequest { - TransactionRequest { - from: self.reserve_addr, - to: self.reserve_addr, - value: U256::zero(), - gas_price: None, - data: Hex::default(), - nonce: None, - } - } - - fn resume_miner(&mut self) { - self.rpc.send_transaction(&self.dummy_tx()).unwrap(); - self.mine(10); - } - - fn _mine(&mut self, custom: Option<&mut Rpc>, mut nb: u16) { - let dummy_tx = self.dummy_tx(); - let mut rpc = custom - .unwrap_or(&mut self.rpc) - .subscribe_new_head() - .unwrap(); - while !rpc.pending_transactions().unwrap().is_empty() { - rpc.next().unwrap(); - nb = nb.saturating_sub(1); - } - for _ in 0..nb { - rpc.send_transaction(&dummy_tx).unwrap(); - rpc.next().unwrap(); - } - } - - fn mine(&mut self, nb: u16) { - self._mine(None, nb); - } - - pub fn next_conf(&mut self) { - self.mine(0); - self.mine(self.worker_cfg.confirmation as u16) - } - - pub fn mine_pending(&mut self) { - self.mine(0) - } - - /* ----- Balances ----- */ - - pub fn client_balance(&mut self) -> U256 { - self.rpc.get_balance_latest(&self.client_addr).unwrap() - } - - pub fn wire_balance(&mut self) -> U256 { - self.rpc.get_balance_latest(&self.wire_addr).unwrap() - } - - pub fn wire_balance_pending(&mut self) -> U256 { - self.rpc.get_balance_pending(&self.wire_addr).unwrap() - } - - fn expect_balance(&mut self, balance: U256, lambda: fn(&mut Self) -> U256) { - retry( - || { - let current = lambda(self); - if current != balance { - // dbg!(current.abs_diff(balance), current.abs_diff(balance) / 30); - } - balance == current - }, - "balance", - ); - } - - pub fn expect_client_balance(&mut self, balance: U256) { - self.expect_balance(balance, Self::client_balance) - } - - pub fn expect_wire_balance(&mut self, balance: U256) { - self.expect_balance(balance, Self::wire_balance) - } - - /* ----- Wire Gateway ----- */ - - pub fn expect_credits(&self, txs: &[(EddsaPublicKey, U256)]) { - let txs: Vec<_> = txs - .iter() - .map(|(metadata, amount)| { - ( - metadata.clone(), - eth_to_taler(amount, &self.worker_cfg.currency), - ) - }) - .collect(); - self.ctx.expect_credits(&txs) - } - - pub fn expect_debits(&self, txs: &[(ShortHashCode, U256)]) { - let txs: Vec<_> = txs - .iter() - .map(|(metadata, amount)| { - ( - metadata.clone(), - eth_to_taler(amount, &self.worker_cfg.currency), - ) - }) - .collect(); - self.ctx.expect_debits(&txs) - } -} - -/// Test eth-wire correctly receive and send transactions on the blockchain -pub fn wire(ctx: TestCtx) { - ctx.step("Setup"); - let mut ctx = EthCtx::setup(&ctx, "taler_eth.conf", false); - - ctx.step("Credit"); - { - // Send transactions - let mut balance = ctx.wire_balance(); - let mut txs = Vec::new(); - for n in 10..100 { - let reserve_pub = Base32::rand(); - let amount = ctx.amount(n * 1000); - ctx.credit(amount, &reserve_pub); - txs.push((reserve_pub, amount)); - balance += amount; - } - ctx.next_conf(); - ctx.expect_credits(&txs); - ctx.expect_wire_balance(balance); - }; - - ctx.step("Debit"); - { - let mut balance = ctx.client_balance(); - let mut txs = Vec::new(); - for n in 10..100 { - let metadata = Base32::rand(); - let amount = ctx.amount(n * 100); - balance += amount; - ctx.debit(amount, &metadata); - txs.push((metadata, amount)); - } - ctx.expect_debits(&txs); - ctx.expect_client_balance(balance); - } - - ctx.step("Bounce"); - { - // Send bad transactions - let mut wire_balance = ctx.wire_balance(); - for n in 10..40 { - let amount = ctx.amount(n * 101); - ctx.malformed_credit(amount); - wire_balance += ctx.worker_cfg.bounce_fee + *NETWORK_FEE; - } - - ctx.next_conf(); - ctx.expect_wire_balance(wire_balance); - } -} - -/// Test eth-wire and wire-gateway correctly stop when a lifetime limit is configured -pub fn lifetime(ctx: TestCtx) { - ctx.step("Setup"); - let mut ctx = EthCtx::setup(&ctx, "taler_eth_lifetime.conf", false); - ctx.step("Check lifetime"); - // Start up - retry( - || ctx.wire_running() && ctx.gateway_running(), - "both running", - ); - // Consume lifetime - for n in 0..=ctx.worker_cfg.lifetime.unwrap() { - ctx.credit(ctx.amount(n * 1000), &Base32::rand()); - ctx.mine_pending(); - } - for n in 0..=ctx.serve_cfg.lifetime.unwrap() { - ctx.debit(ctx.amount(n * 1000), &Base32::rand()); - } - // End down - retry( - || !ctx.wire_running() && !ctx.gateway_running(), - "both down", - ); -} - -/// Check the capacity of wire-gateway and eth-wire to recover from database and node loss -pub fn reconnect(ctx: TestCtx) { - ctx.step("Setup"); - let mut ctx = EthCtx::setup(&ctx, "taler_eth.conf", false); - - let mut credits = Vec::new(); - let mut debits = Vec::new(); - - ctx.step("With DB"); - { - let metadata = Base32::rand(); - let amount = ctx.amount(42000); - ctx.credit(amount, &metadata); - credits.push((metadata, amount)); - ctx.next_conf(); - ctx.expect_credits(&credits); - }; - - ctx.step("Without DB"); - { - ctx.stop_db(); - ctx.malformed_credit(ctx.amount(24000)); - let metadata = Base32::rand(); - let amount = ctx.amount(4000); - ctx.credit(amount, &metadata); - credits.push((metadata, amount)); - ctx.stop_node(); - ctx.expect_gateway_down(); - } - - ctx.step("Reconnect DB"); - { - ctx.resume_db(); - ctx.resume_node(&[]); - let metadata = Base32::rand(); - let amount = ctx.amount(2000); - ctx.debit(amount, &metadata); - debits.push((metadata, amount)); - ctx.next_conf(); - ctx.expect_debits(&debits); - ctx.expect_credits(&credits); - } - - ctx.step("Recover DB"); - { - let balance = ctx.wire_balance(); - ctx.reset_db(); - ctx.expect_debits(&debits); - ctx.expect_credits(&credits); - ctx.expect_wire_balance(balance); - } -} - -/// Test eth-wire ability to recover from errors in correctness critical paths and prevent concurrent sending -pub fn stress(ctx: TestCtx) { - ctx.step("Setup"); - let mut ctx = EthCtx::setup(&ctx, "taler_eth.conf", true); - - let mut credits = Vec::new(); - let mut debits = Vec::new(); - - ctx.step("Credit"); - { - let mut balance = ctx.wire_balance(); - for n in 10..30 { - let metadata = Base32::rand(); - let amount = ctx.amount(n * 1000); - ctx.credit(amount, &metadata); - credits.push((metadata, amount)); - balance += amount; - } - ctx.next_conf(); - ctx.expect_credits(&credits); - ctx.expect_wire_balance(balance); - }; - - ctx.step("Debit"); - { - let mut balance = ctx.client_balance(); - for n in 10..30 { - let metadata = Base32::rand(); - let amount = ctx.amount(n * 100); - balance += amount; - ctx.debit(amount, &metadata); - debits.push((metadata, amount)); - } - ctx.expect_debits(&debits); - ctx.expect_client_balance(balance); - } - - ctx.step("Bounce"); - { - let mut wire_balance = ctx.wire_balance(); - for n in 10..30 { - ctx.malformed_credit(ctx.amount(n * 101)); - wire_balance += ctx.worker_cfg.bounce_fee + *NETWORK_FEE; - } - ctx.next_conf(); - ctx.expect_wire_balance(wire_balance); - } - - ctx.step("Recover DB"); - { - let balance = ctx.wire_balance(); - ctx.reset_db(); - ctx.expect_debits(&debits); - ctx.expect_credits(&credits); - ctx.expect_wire_balance(balance); - } -} - -/// Test eth-wire correctness when a blockchain reorganization occurs -pub fn reorg(ctx: TestCtx) { - ctx.step("Setup"); - let mut ctx = EthCtx::setup(&ctx, "taler_eth.conf", false); - - ctx.step("Handle reorg incoming transactions"); - { - // Loose second node - ctx.cluster_deco(); - - // Perform credits - let before = ctx.wire_balance(); - for n in 10..21 { - ctx.credit(ctx.amount(n * 10000), &Base32::rand()); - } - ctx.next_conf(); - let after = ctx.wire_balance(); - - // Perform fork and check eth-wire hard error - ctx.expect_gateway_up(); - ctx.cluster_fork(); - ctx.expect_wire_balance(before); - ctx.expect_gateway_down(); - - // Recover orphaned transaction - ctx.resume_miner(); - ctx.next_conf(); - ctx.expect_wire_balance(after); - ctx.expect_gateway_up(); - } - - ctx.step("Handle reorg outgoing transactions"); - { - // Loose second node - ctx.cluster_deco(); - - // Perform debits - let before = ctx.client_balance(); - let mut after = ctx.client_balance(); - for n in 10..21 { - let amount = ctx.amount(n * 100); - ctx.debit(amount, &Base32::rand()); - after += amount; - } - ctx.mine_pending(); - ctx.expect_client_balance(after); - - // Perform fork and check eth-wire still up - ctx.expect_gateway_up(); - ctx.cluster_fork(); - ctx.expect_client_balance(before); - ctx.expect_gateway_up(); - - // Recover orphaned transaction - ctx.resume_miner(); - ctx.next_conf(); - ctx.expect_client_balance(after); - } - - ctx.step("Handle reorg bounce"); - { - // Loose second node - ctx.cluster_deco(); - - // Perform bounce - let before = ctx.wire_balance(); - let mut after = ctx.wire_balance(); - for n in 10..21 { - ctx.malformed_credit(ctx.amount(n * 1000)); - after += ctx.worker_cfg.bounce_fee + *NETWORK_FEE; - } - ctx.next_conf(); - ctx.expect_wire_balance(after); - - // Perform fork and check eth-wire hard error - ctx.expect_gateway_up(); - ctx.cluster_fork(); - ctx.expect_wire_balance(before); - ctx.expect_gateway_down(); - - // Recover orphaned transaction - ctx.resume_miner(); - ctx.expect_wire_balance(after); - ctx.expect_gateway_up(); - } -} - -/// Test eth-wire correctness when a blockchain reorganization occurs leading to past incoming transaction conflict -pub fn hell(ctx: TestCtx) { - fn step(ctx: &TestCtx, name: &str, action: impl FnOnce(&mut EthCtx)) { - ctx.step("Setup"); - let mut ctx = EthCtx::setup(ctx, "taler_eth.conf", false); - ctx.step(name); - - // Loose second node - ctx.cluster_deco(); - - // Perform action - action(&mut ctx); - - // Perform fork and check eth-wire hard error - ctx.expect_gateway_up(); - ctx.cluster_fork(); - ctx.expect_gateway_down(); - - // Generate conflict - ctx.restart_node(&["--miner.gasprice", "1000"]); - ctx.abandon(); - ctx.resume_miner(); - let amount = ctx.amount(54000); - ctx.credit(amount, &Base32::rand()); - ctx.expect_wire_balance(amount); - - // Check eth-wire suspend operation - let bounce_amount = ctx.amount(34000); - ctx.malformed_credit(bounce_amount); - ctx.next_conf(); - ctx.expect_wire_balance(amount + bounce_amount); - ctx.expect_gateway_down(); - } - - step(&ctx, "Handle reorg conflicting incoming credit", |ctx| { - let amount = ctx.amount(420000); - ctx.credit(amount, &Base32::rand()); - ctx.next_conf(); - ctx.expect_wire_balance(amount); - }); - - step(&ctx, "Handle reorg conflicting incoming bounce", |ctx| { - let wire_balance = ctx.wire_balance(); - let amount = ctx.amount(420000); - ctx.malformed_credit(amount); - ctx.next_conf(); - ctx.expect_wire_balance(wire_balance + ctx.worker_cfg.bounce_fee + *NETWORK_FEE); - }); -} - -/// Test eth-wire ability to learn and protect itself from blockchain behavior -pub fn analysis(ctx: TestCtx) { - ctx.step("Setup"); - let mut ctx = EthCtx::setup(&ctx, "taler_eth.conf", false); - - ctx.step("Learn from reorg"); - - // Loose second node - ctx.cluster_deco(); - - // Perform credit - let before = ctx.wire_balance(); - ctx.credit(ctx.amount(42000), &Base32::rand()); - ctx.next_conf(); - let after = ctx.wire_balance(); - - // Perform fork and check eth-wire hard error - ctx.expect_gateway_up(); - ctx.cluster_fork(); - ctx.expect_wire_balance(before); - ctx.expect_gateway_down(); - - // Recover orphaned transaction - ctx.mine(6); - ctx.expect_wire_balance(after); - ctx.expect_gateway_up(); - - // Loose second node - ctx.cluster_deco(); - - // Perform credit - let before = ctx.wire_balance(); - ctx.credit(ctx.amount(42000), &Base32::rand()); - ctx.next_conf(); - - // Perform fork and check eth-wire learned from previous attack - ctx.expect_gateway_up(); - ctx.cluster_fork(); - ctx.expect_wire_balance(before); - ctx.expect_gateway_up(); -} - -/// Test eth-wire ability to handle stuck transaction correctly -pub fn bumpfee(tctx: TestCtx) { - tctx.step("Setup"); - let mut ctx = EthCtx::setup(&tctx, "taler_eth_bump.conf", false); - - // Perform credits to allow wire to perform debits latter - ctx.credit(ctx.amount(90000000), &Base32::rand()); - ctx.next_conf(); - - ctx.step("Bump fee"); - { - // Perform debit - let client = ctx.client_balance(); - let wire = ctx.wire_balance(); - let amount = ctx.amount(40000); - ctx.debit(amount, &Base32::rand()); - retry(|| ctx.wire_balance_pending() < wire, "balance"); - - // Bump min relay fee making the previous debit stuck - ctx.restart_node(&["--miner.gasprice", "100"]); - - // Check bump happen - ctx.expect_client_balance(client); - } - - ctx.step("Bump fee reorg"); - { - // Loose second node - ctx.cluster_deco(); - - // Perform debit - let mut client = ctx.client_balance(); - let wire = ctx.wire_balance(); - let amount = ctx.amount(40000); - ctx.debit(amount, &Base32::rand()); - retry(|| ctx.wire_balance_pending() < wire, "balance"); - - // Bump min relay fee and fork making the previous debit stuck and problematic - ctx.cluster_fork(); - ctx.restart_node(&["--miner.gasprice", "200"]); - - // Check bump happen - client += amount; - ctx.expect_client_balance(client); - } - - ctx.step("Setup"); - drop(ctx); - let mut ctx = EthCtx::setup(&tctx, "taler_eth_bump.conf", true); - - // Perform credit to allow wire to perform debits latter - ctx.credit(ctx.amount(9000000), &Base32::rand()); - ctx.next_conf(); - - ctx.step("Bump fee stress"); - { - // Loose second node - ctx.cluster_deco(); - - // Perform debits - let client = ctx.client_balance(); - let wire = ctx.wire_balance(); - let mut total_amount = U256::zero(); - for n in 10..31 { - let amount = ctx.amount(n * 10000); - total_amount += amount; - ctx.debit(amount, &Base32::rand()); - } - retry( - || ctx.wire_balance_pending() < wire - total_amount, - "balance", - ); - - // Bump min relay fee making the previous debits stuck - ctx.restart_node(&["--miner.gasprice", "1000"]); - - // Check bump happen - ctx.expect_client_balance(client + total_amount); - } -} - -/// Test eth-wire handle transaction fees exceeding limits -pub fn maxfee(ctx: TestCtx) { - ctx.step("Setup"); - let mut ctx = EthCtx::setup(&ctx, "taler_eth.conf", false); - - // Perform credit to allow wire to perform debits latter - ctx.credit(ctx.amount(9000000), &Base32::rand()); - ctx.next_conf(); - - let client = ctx.client_balance(); - let wire = ctx.wire_balance(); - let mut total_amount = U256::zero(); - - ctx.step("Too high fee"); - { - // Change fee config - ctx.restart_node(&["--rpc.txfeecap", "0.00001"]); - - // Perform debits - for n in 10..31 { - let amount = ctx.amount(n * 10000); - total_amount += amount; - ctx.debit(amount, &Base32::rand()); - } - - // Check no transaction happen - ctx.expect_wire_balance(wire); - ctx.expect_client_balance(client); - } - - ctx.step("Good feed"); - { - // Restore default config - ctx.restart_node(&[""]); - - // Check transaction now have been made - ctx.expect_client_balance(client + total_amount); - } -} diff --git a/instrumentation/src/main.rs b/instrumentation/src/main.rs @@ -29,7 +29,6 @@ use utils::TestDb; use crate::utils::{TestCtx, try_cmd_redirect}; mod btc; -mod eth; mod utils; /// Depolymerizer instrumentation test @@ -49,7 +48,7 @@ pub fn main() { let p = ProgressBar::new_spinner(); p.set_style(ProgressStyle::with_template("building {msg} {elapsed:.dim}").unwrap()); p.enable_steady_tick(Duration::from_millis(1000)); - for name in ["depolymerizer-bitcoin", "depolymerizer-ethereum"] { + for name in ["depolymerizer-bitcoin"] { build_bin(&p, name, None, name); build_bin(&p, name, Some("fail"), &format!("{name}-fail")); } @@ -169,14 +168,5 @@ pub const TESTS: &[(fn(TestCtx), &str)] = &[ (btc::analysis, "btc_analysis"), (btc::bumpfee, "btc_bumpfee"), (btc::maxfee, "btc_maxfee"), - (btc::config, "btc_config"), - (eth::wire, "eth_wire"), - (eth::lifetime, "eth_lifetime"), - (eth::reconnect, "eth_reconnect"), - (eth::stress, "eth_stress"), - (eth::reorg, "eth_reorg"), - (eth::hell, "eth_hell"), - (eth::analysis, "eth_analysis"), - (eth::bumpfee, "eth_bumpfee"), - (eth::maxfee, "eth_maxfee"), + (btc::config, "btc_config") ]; diff --git a/makefile b/makefile @@ -25,12 +25,6 @@ install-nobuild: install -m 644 -D -t $(share_dir)/depolymerizer-bitcoin/sql database-versioning/depolymerizer-bitcoin*.sql install -D -t $(bin_dir) contrib/depolymerizer-bitcoin-dbconfig install -D -t $(bin_dir) target/release/depolymerizer-bitcoin - # Ethereum - install -m 644 -D -t $(share_dir)/depolymerizer-ethereum/config.d depolymerizer-ethereum/depolymerizer-ethereum.conf - install -m 644 -D -t $(share_dir)/depolymerizer-ethereum/sql database-versioning/versioning.sql - install -m 644 -D -t $(share_dir)/depolymerizer-ethereum/sql database-versioning/depolymerizer-ethereum*.sql - install -D -t $(bin_dir) contrib/depolymerizer-ethereum-dbconfig - install -D -t $(bin_dir) target/release/depolymerizer-ethereum .PHONY: install install: build install-nobuild diff --git a/script/prepare.sh b/script/prepare.sh @@ -17,8 +17,6 @@ function cleanup() { trap cleanup EXIT BTC_VER="29.0" -ETH_VER="1.14.11-f3c696fa" -FOUNDRY_VER="1.2.3" CALLER_DIR="$PWD" cd $DIR @@ -36,17 +34,6 @@ curl -L https://bitcoincore.org/bin/bitcoin-core-$BTC_VER/bitcoin-$BTC_VER-$ARCH tar xvzf btc.tar.gz mv -v bitcoin-$BTC_VER/bin/* $CALLER_DIR/tools -echo "Ⅲ - Install Go Ethereum (Geth) v$ETH_VER" -ARCH=`dpkg --print-architecture` -curl -L https://gethstore.blob.core.windows.net/builds/geth-linux-$ARCH-$ETH_VER.tar.gz -o geth.tar.gz -tar xvzf geth.tar.gz -mv -v geth-linux-$ARCH-$ETH_VER/geth $CALLER_DIR/tools - -echo "IV - Install Foundry v$FOUNDRY_VER" -curl -L https://github.com/foundry-rs/foundry/releases/download/v$FOUNDRY_VER/foundry_v${FOUNDRY_VER}_linux_$ARCH.tar.gz -o foundry.tar.gz -tar xvzf foundry.tar.gz -mv -v anvil chisel cast forge $CALLER_DIR/tools - echo "Ⅴ - PATH" echo "Add PATH=\"$CALLER_DIR/tools:/usr/lib/postgresql/$PG_VER/bin:\$PATH\" to your bash profile" \ No newline at end of file