/*
This file is part of TALER
Copyright (C) 2022 Taler Systems SA
TALER is free software; you can redistribute it and/or modify it under the
terms of the GNU Affero General Public License as published by the Free Software
Foundation; either version 3, or (at your option) any later version.
TALER is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License along with
TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/>
*/
use std::{
collections::HashMap,
fmt::Write,
time::{Duration, SystemTime},
};
use bitcoin::{hashes::Hash, Amount as BtcAmount, BlockHash, Txid};
use btc_wire::{
rpc::{self, AutoRpcWallet, Category, ErrorCode, Rpc, Transaction},
rpc_utils::sender_address,
taler_utils::{btc_payto_url, btc_to_taler},
GetOpReturnErr, GetSegwitErr,
};
use common::{
api_common::base32,
log::{
log::{error, info, warn},
OrFail,
},
metadata::OutMetadata,
postgres,
reconnect::AutoReconnectDb,
sql::{sql_array, sql_url},
status::{BounceStatus, DebitStatus},
};
use postgres::{fallible_iterator::FallibleIterator, Client};
use crate::{
fail_point::fail_point,
sql::{sql_addr, sql_btc_amount, sql_txid},
WireState,
};
use super::{analysis::analysis, LoopError, LoopResult};
/// Worker loop: keep the local database in sync with the blockchain and perform
/// the pending transactions (debits, fee bumps and bounces).
///
/// Each iteration waits for a database notification, takes the advisory lock,
/// runs the analysis and the chain synchronization, then sends what is pending.
/// The loop returns once `state.lifetime` iterations have been consumed and runs
/// indefinitely when no lifetime is set. An error raised by an iteration is
/// logged and the loop moves on to the next iteration.
pub fn worker(mut rpc: AutoRpcWallet, mut db: AutoReconnectDb, mut state: WireState) {
    let mut lifetime = state.lifetime;
    let mut status = true;
    let mut skip_notification = false;

    loop {
        // Stop when the iteration budget is exhausted, otherwise consume one unit
        if let Some(remaining) = lifetime.as_mut() {
            if *remaining == 0 {
                info!("Reach end of lifetime");
                return;
            }
            *remaining -= 1;
        }

        // Get the RPC and database clients for this iteration
        let rpc = rpc.client();
        let db = db.client();

        // The iteration body lives in a closure so that `?` can be used for every fallible step
        let result: LoopResult<()> = (|| {
            // Subscribe to every channel we care about
            db.batch_execute("LISTEN new_block; LISTEN new_tx")?;

            // Wait for something to happen, then drain the queue
            {
                let mut ntf = db.notifications();
                let must_wait = !skip_notification && ntf.is_empty();
                if must_wait {
                    // Nothing pending: block until the next notification arrives
                    ntf.blocking_iter().next()?;
                }
                // Conflate all the notifications already received into a single wake-up
                let mut pending = ntf.iter();
                while pending.next()?.is_some() {}
            }

            // The blockchain and the database cannot be updated atomically.
            // When syncing them fails, sync_chain is what recovers the lost updates.
            // If this function ran concurrently, one execution could not know that
            // another one has failed, which could lead to a transaction being sent
            // several times. A postgres advisory lock guarantees that a single
            // execution is running at any given time.
            let locked: bool = db
                .query_one("SELECT pg_try_advisory_lock(42)", &[])?
                .get(0);
            if !locked {
                return Err(LoopError::Concurrency);
            }

            // Run the analysis and keep the confirmation value it returns
            state.confirmation = analysis(rpc, state.confirmation, state.max_confirmation)?;

            // Synchronize with the chain; pending operations are only sent when it yields a list
            if let Some(stuck) = sync_chain(rpc, db, &state, &mut status)? {
                // We are in sync with the blockchain, so a transaction that still
                // has the Requested status has not been sent yet.
                // Send requested debits until none is left
                loop {
                    if !debit(db, rpc, &state)? {
                        break;
                    }
                }

                // Bump the fee of every stuck transaction and record its replacement id
                for stuck_id in stuck {
                    let bumped = rpc.bump_fee(&stuck_id)?;
                    fail_point("(injected) fail bump", 0.3)?;
                    let new_bytes = bumped.txid.as_byte_array().as_slice();
                    let old_bytes = stuck_id.as_byte_array().as_slice();
                    let updated = db.query_one(
                        "UPDATE tx_out SET txid=$1 WHERE txid=$2 RETURNING wtid",
                        &[&new_bytes, &old_bytes],
                    )?;
                    info!(
                        ">> (bump) {} replace {} with {}",
                        base32(updated.get(0)),
                        stuck_id,
                        bumped.txid
                    );
                }

                // Send requested bounces until none is left
                loop {
                    if !bounce(db, rpc, &state.bounce_fee)? {
                        break;
                    }
                }
            }
            Ok(())
        })();

        skip_notification = match result {
            Ok(()) => false,
            Err(e) => {
                error!("worker: {}", e);
                // After an error we sometimes want to retry right away (e.g. to reconnect
                // to the RPC or the DB). Bitcoin error codes are generic and matching on
                // the message would be needed to get precise ones. Some errors resolve
                // themselves once a new block is mined (new fees, new transactions), so the
                // simple approach is to wait for the next notification after an RPC error
                // (or a lock conflict) to avoid logging the same error endlessly.
                !matches!(
                    e,
                    LoopError::Rpc(rpc::Error::RPC { .. } | rpc::Error::Bitcoin(_))
                        | LoopError::Concurrency
                )
            }
        };
    }
}
/// Retrieve last stored hash
fn last_hash(db: &mut Client) -> Result {
let row = db.query_one("SELECT value FROM state WHERE name='last_hash'", &[])?;
Ok(BlockHash::from_slice(row.get(0)).unwrap())
}
/// Parse new transactions, return stuck transactions if the database is up to date with the latest mined block
fn sync_chain(
rpc: &mut Rpc,
db: &mut Client,
state: &WireState,
status: &mut bool,
) -> LoopResult