db.rs (18044B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use bitcoin::{Address, BlockHash}; 18 use bitcoin::{Txid, hashes::Hash}; 19 use depolymerizer_common::status::{BounceStatus, DebitStatus}; 20 use sqlx::{ 21 PgExecutor, PgPool, QueryBuilder, Row, 22 postgres::{PgListener, PgRow}, 23 }; 24 use taler_api::db::{BindHelper as _, TypeHelper as _, history, page}; 25 use taler_common::{ 26 api_common::{EddsaPublicKey, SafeU64, ShortHashCode}, 27 api_params::{History, Page}, 28 api_wire::{ 29 IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest, 30 TransferResponse, TransferState, TransferStatus, 31 }, 32 types::{ 33 amount::{Amount, Currency}, 34 timestamp::Timestamp, 35 }, 36 }; 37 use tokio::sync::watch::Receiver; 38 use url::Url; 39 40 use crate::{ 41 payto::FullBtcPayto, 42 sql::{sql_addr, sql_btc_amount, sql_generic_payto, sql_payto}, 43 }; 44 45 /// Lock the database for worker execution 46 pub async fn worker_lock<'a>(e: impl PgExecutor<'a>) -> sqlx::Result<bool> { 47 sqlx::query("SELECT pg_try_advisory_lock(42)") 48 .try_map(|r: PgRow| r.try_get(0)) 49 .fetch_one(e) 50 .await 51 } 52 53 /// Initialize the worker status 54 pub async fn init_status<'a>(e: impl PgExecutor<'a>) -> sqlx::Result<()> { 55 sqlx::query( 56 "INSERT INTO state (name, value) VALUES ('status', $1) ON CONFLICT (name) DO NOTHING", 57 ) 58 .bind([1u8]) 59 .execute(e) 60 .await?; 61 Ok(()) 62 } 63 64 /// Update the worker status 65 pub async fn update_status(e: &mut PgListener, new_status: bool) -> sqlx::Result<()> { 66 sqlx::query("UPDATE state SET value=$1 WHERE name='status'") 67 .bind([new_status as u8]) 68 .execute(&mut *e) 69 .await?; 70 sqlx::query("NOTIFY status").execute(e).await?; 71 Ok(()) 72 } 73 74 /// Initialize the worker sync state 75 pub async fn init_sync_state<'a>( 76 e: impl PgExecutor<'a>, 77 hash: &BlockHash, 78 reset: bool, 79 ) -> sqlx::Result<()> { 80 sqlx::query(if reset { 81 "INSERT INTO state (name, value) VALUES ('last_hash', $1) ON CONFLICT (name) DO UPDATE SET value=$1" 82 } else { 83 "INSERT INTO state (name, value) VALUES ('last_hash', $1) ON CONFLICT (name) DO NOTHING" 84 }) 85 .bind(hash.as_byte_array()) 86 .execute(e) 87 .await?; 88 Ok(()) 89 } 90 91 /// Get the current worker sync state 92 pub async fn get_sync_state<'a>(e: impl PgExecutor<'a>) -> sqlx::Result<BlockHash> { 93 sqlx::query("SELECT value FROM state WHERE name='last_hash'") 94 .try_map(|r: PgRow| r.try_get_map(0, BlockHash::from_slice)) 95 .fetch_one(e) 96 .await 97 } 98 99 /// Update the worker sync state if it hasn't changed yet 100 pub async fn swap_sync_state<'a>( 101 e: impl PgExecutor<'a>, 102 from: &BlockHash, 103 to: &BlockHash, 104 ) -> sqlx::Result<()> { 105 sqlx::query("UPDATE state SET value=$1 WHERE name='last_hash' AND value=$2") 106 .bind(to.as_byte_array()) 107 .bind(from.as_byte_array()) 108 .execute(e) 109 .await?; 110 Ok(()) 111 } 112 113 pub enum TransferResult { 114 Success(TransferResponse), 115 RequestUidReuse, 116 WtidReuse, 117 } 118 119 /// Initiate a new Taler transfer idempotently 120 pub async fn transfer<'a>( 121 e: impl PgExecutor<'a>, 122 creditor: &FullBtcPayto, 123 transfer: &TransferRequest, 124 ) -> sqlx::Result<TransferResult> { 125 sqlx::query( 126 " 127 SELECT out_request_uid_reuse, out_wtid_reuse, out_transfer_row_id, out_created_at 128 FROM taler_transfer(($1, $2)::taler_amount, $3, $4, $5, $6, $7, $8) 129 ", 130 ) 131 .bind_amount(&transfer.amount) 132 .bind(transfer.exchange_base_url.as_str()) 133 .bind(creditor.0.to_string()) 134 .bind(&creditor.name) 135 .bind(transfer.request_uid.as_slice()) 136 .bind(transfer.wtid.as_slice()) 137 .bind_timestamp(&Timestamp::now()) 138 .try_map(|r: PgRow| { 139 Ok(if r.try_get("out_request_uid_reuse")? { 140 TransferResult::RequestUidReuse 141 } else if r.try_get("out_wtid_reuse")? { 142 TransferResult::WtidReuse 143 } else { 144 TransferResult::Success(TransferResponse { 145 row_id: r.try_get_safeu64("out_transfer_row_id")?, 146 timestamp: r.try_get_timestamp("out_created_at")?, 147 }) 148 }) 149 }) 150 .fetch_one(e) 151 .await 152 } 153 154 /// Paginate initiated Taler transfers 155 pub async fn transfer_page<'a>( 156 e: impl PgExecutor<'a>, 157 status: &Option<TransferState>, 158 params: &Page, 159 currency: &Currency, 160 ) -> sqlx::Result<Vec<TransferListStatus>> { 161 let status = match status { 162 Some(s) => match s { 163 TransferState::pending => Some(DebitStatus::requested), 164 TransferState::success => Some(DebitStatus::sent), 165 TransferState::transient_failure | TransferState::permanent_failure => { 166 return Ok(Vec::new()); 167 } 168 }, 169 None => None, 170 }; 171 172 page( 173 e, 174 "id", 175 params, 176 || { 177 let mut sql = QueryBuilder::new( 178 " 179 SELECT 180 id, 181 status, 182 (amount).val as amount_val, 183 (amount).frac as amount_frac, 184 credit_acc, 185 credit_name, 186 created 187 FROM tx_out WHERE request_uid IS NOT NULL AND 188 ", 189 ); 190 if let Some(status) = status { 191 sql.push(" status = ").push_bind(status).push(" AND "); 192 } 193 sql 194 }, 195 |r: PgRow| { 196 Ok(TransferListStatus { 197 row_id: r.try_get_safeu64(0)?, 198 status: match r.try_get(1)? { 199 DebitStatus::requested => TransferState::pending, 200 DebitStatus::sent => TransferState::success, 201 }, 202 amount: r.try_get_amount("amount", currency)?, 203 credit_account: sql_payto(&r, "credit_acc", "credit_name")?, 204 timestamp: r.try_get_timestamp("created")?, 205 }) 206 }, 207 ) 208 .await 209 } 210 211 /// Get a Taler transfer info 212 pub async fn transfer_by_id<'a>( 213 e: impl PgExecutor<'a>, 214 id: u64, 215 currency: &Currency, 216 ) -> sqlx::Result<Option<TransferStatus>> { 217 sqlx::query( 218 " 219 SELECT 220 status, 221 (amount).val as amount_val, 222 (amount).frac as amount_frac, 223 exchange_url, 224 wtid, 225 credit_acc, 226 credit_name, 227 created 228 FROM tx_out WHERE request_uid IS NOT NULL AND id = $1 229 ", 230 ) 231 .bind(id as i64) 232 .try_map(|r: PgRow| { 233 Ok(TransferStatus { 234 status: match r.try_get(0)? { 235 DebitStatus::requested => TransferState::pending, 236 DebitStatus::sent => TransferState::success, 237 }, 238 status_msg: None, 239 amount: r.try_get_amount_i(1, currency)?, 240 origin_exchange_url: r.try_get(3)?, 241 wtid: r.try_get_base32(4)?, 242 credit_account: sql_payto(&r, 5, 6)?, 243 timestamp: r.try_get_timestamp(7)?, 244 }) 245 }) 246 .fetch_optional(e) 247 .await 248 } 249 250 /// Fetch outgoing Taler transactions history 251 pub async fn outgoing_history( 252 db: &PgPool, 253 params: &History, 254 currency: &Currency, 255 listen: impl FnOnce() -> Receiver<i64>, 256 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> { 257 history( 258 db, 259 "id", 260 params, 261 listen, 262 || QueryBuilder::new( 263 "SELECT id, created, (amount).val, (amount).frac, wtid, credit_acc, credit_name, exchange_url FROM tx_out WHERE" 264 ), |r| { 265 Ok(OutgoingBankTransaction { 266 row_id: r.try_get_safeu64(0)?, 267 date: r.try_get_timestamp(1)?, 268 amount: r.try_get_amount_i(2, currency)?, 269 wtid: r.try_get_base32(4)?, 270 credit_account: sql_payto(&r, 5, 6)?, 271 exchange_base_url: r.try_get_url(7)?, 272 }) 273 }).await 274 } 275 276 /// Fetch incoming Taler transactions history 277 pub async fn incoming_history( 278 db: &PgPool, 279 params: &History, 280 currency: &Currency, 281 listen: impl FnOnce() -> Receiver<i64>, 282 ) -> sqlx::Result<Vec<IncomingBankTransaction>> { 283 history( 284 db, 285 "id", 286 params, 287 listen, 288 || QueryBuilder::new( 289 "SELECT id, received, (amount).val, (amount).frac, reserve_pub, debit_acc FROM tx_in WHERE" 290 ), |r| { 291 Ok(IncomingBankTransaction::Reserve { 292 row_id: r.try_get_safeu64(0)?, 293 date: r.try_get_timestamp(1)?, 294 amount: r.try_get_amount_i(2, currency)?, 295 reserve_pub: r.try_get_base32(4)?, 296 debit_account: sql_generic_payto(&r, 5)?, 297 }) 298 }).await 299 } 300 301 #[derive(Debug, PartialEq, Eq)] 302 pub enum AddIncomingResult { 303 Success { 304 new: bool, 305 row_id: SafeU64, 306 valued_at: Timestamp, 307 }, 308 ReservePubReuse, 309 } 310 311 /// Register a fake Taler credit 312 pub async fn register_tx_in_admin<'a>( 313 e: impl PgExecutor<'a>, 314 amount: &Amount, 315 debit_acc: &Address, 316 received: &Timestamp, 317 reserve_pub: &EddsaPublicKey, 318 ) -> sqlx::Result<AddIncomingResult> { 319 sqlx::query( 320 " 321 SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new 322 FROM register_tx_in(($1, $2)::taler_amount, $3, $4, $5, NULL) 323 ", 324 ) 325 .bind_amount(amount) 326 .bind(debit_acc.to_string()) 327 .bind(reserve_pub.as_slice()) 328 .bind_timestamp(received) 329 .try_map(|r: PgRow| { 330 Ok(if r.try_get(0)? { 331 AddIncomingResult::ReservePubReuse 332 } else { 333 AddIncomingResult::Success { 334 row_id: r.try_get_safeu64(1)?, 335 valued_at: r.try_get_timestamp(2)?, 336 new: r.try_get(3)?, 337 } 338 }) 339 }) 340 .fetch_one(e) 341 .await 342 } 343 344 /// Register a Taler credit 345 pub async fn register_tx_in<'a>( 346 e: impl PgExecutor<'a>, 347 txid: &Txid, 348 amount: &Amount, 349 debit_acc: &Address, 350 received: &Timestamp, 351 reserve_pub: &EddsaPublicKey, 352 ) -> sqlx::Result<AddIncomingResult> { 353 sqlx::query( 354 " 355 SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new 356 FROM register_tx_in(($1, $2)::taler_amount, $3, $4, $5, $6) 357 ", 358 ) 359 .bind_amount(amount) 360 .bind(debit_acc.to_string()) 361 .bind(reserve_pub.as_slice()) 362 .bind_timestamp(received) 363 .bind(txid.as_byte_array()) 364 .try_map(|r: PgRow| { 365 Ok(if r.try_get(0)? { 366 AddIncomingResult::ReservePubReuse 367 } else { 368 AddIncomingResult::Success { 369 row_id: r.try_get_safeu64(1)?, 370 valued_at: r.try_get_timestamp(2)?, 371 new: r.try_get(3)?, 372 } 373 }) 374 }) 375 .fetch_one(e) 376 .await 377 } 378 379 /// Update a transaction id after bumping it 380 pub async fn bump_tx_id<'a>( 381 e: impl PgExecutor<'a>, 382 from: &Txid, 383 to: &Txid, 384 ) -> sqlx::Result<ShortHashCode> { 385 sqlx::query("UPDATE tx_out SET txid=$1 WHERE txid=$2 RETURNING wtid") 386 .bind(to.as_byte_array()) 387 .bind(from.as_byte_array()) 388 .try_map(|r: PgRow| r.try_get_base32(0)) 389 .fetch_one(e) 390 .await 391 } 392 393 /// Reset the state of a conflicted debit 394 pub async fn conflict_tx_out<'a>(e: impl PgExecutor<'a>, id: &Txid) -> sqlx::Result<bool> { 395 Ok( 396 sqlx::query("UPDATE tx_out SET status=$1, txid=NULL where txid=$2") 397 .bind(DebitStatus::requested) 398 .bind(id.as_byte_array()) 399 .execute(e) 400 .await? 401 .rows_affected() 402 > 0, 403 ) 404 } 405 406 /// Reset the state of a conflicted bounce 407 pub async fn conflict_bounce<'a>(e: impl PgExecutor<'a>, id: &Txid) -> sqlx::Result<bool> { 408 Ok( 409 sqlx::query("UPDATE bounce SET status=$1, txid=NULL where txid=$2") 410 .bind(BounceStatus::requested) 411 .bind(id.as_byte_array()) 412 .execute(e) 413 .await? 414 .rows_affected() 415 > 0, 416 ) 417 } 418 419 /// Initiate a bounce 420 pub async fn bounce<'a>(e: impl PgExecutor<'a>, txid: &Txid, reason: &str) -> sqlx::Result<()> { 421 sqlx::query("INSERT INTO bounce (created, bounced, reason, status) VALUES ($1, $2, $3, 'requested') ON CONFLICT (bounced) DO NOTHING") 422 .bind_timestamp(&Timestamp::now()) 423 .bind(txid.as_byte_array()) 424 .bind(reason) 425 .execute(e) 426 .await?; 427 Ok(()) 428 } 429 430 pub enum ProblematicTx { 431 In { 432 txid: Txid, 433 addr: Address, 434 reserve_pub: EddsaPublicKey, 435 }, 436 Bounce { 437 txid: Txid, 438 bounced: Txid, 439 }, 440 } 441 442 /// Handle transactions being removed during a reorganization 443 pub async fn reorg<'a>(e: impl PgExecutor<'a>, ids: &[Txid]) -> sqlx::Result<Vec<ProblematicTx>> { 444 // A removed incoming transaction is a correctness issues in only two cases: 445 // - it is a confirmed credit registered in the database 446 // - it is an invalid transactions already bounced 447 // Those two cases can compromise bitcoin backing 448 // Removed outgoing transactions will be retried automatically by the node 449 sqlx::query( 450 " 451 SELECT tx_in.txid, NULL, debit_acc, tx_in.reserve_pub 452 FROM tx_in WHERE tx_in.txid = ANY($1) 453 UNION ALL 454 SELECT bounce.txid, bounce.bounced, NULL, NULL 455 from bounce WHERE bounce.bounced = ANY($1); 456 ", 457 ) 458 .bind(ids.iter().map(|it| it.as_byte_array()).collect::<Vec<_>>()) 459 .try_map(|r: PgRow| { 460 let txid = r.try_get_map(0, Txid::from_slice)?; 461 let check: Option<&[u8]> = r.try_get(1)?; 462 Ok(if check.is_some() { 463 ProblematicTx::Bounce { 464 txid, 465 bounced: r.try_get_map(1, Txid::from_slice)?, 466 } 467 } else { 468 ProblematicTx::In { 469 txid, 470 addr: sql_addr(&r, 2)?, 471 reserve_pub: r.try_get_base32(3)?, 472 } 473 }) 474 }) 475 .fetch_all(e) 476 .await 477 } 478 479 pub enum SyncOutResult { 480 New, 481 Replaced, 482 Recovered, 483 None, 484 } 485 486 pub async fn sync_out<'a>( 487 e: impl PgExecutor<'a>, 488 txid: &Txid, 489 replace_txid: Option<&Txid>, 490 amount: &Amount, 491 exchange_url: &Url, 492 credit_acc: &Address, 493 wtid: &ShortHashCode, 494 created: &Timestamp, 495 ) -> sqlx::Result<SyncOutResult> { 496 sqlx::query( 497 " 498 SELECT out_replaced, out_recovered, out_new 499 FROM sync_out($1, $2, ($3, $4)::taler_amount, $5, $6, $7, $8) 500 ", 501 ) 502 .bind(txid.as_byte_array()) 503 .bind(replace_txid.map(|it| it.as_byte_array())) 504 .bind_amount(amount) 505 .bind(exchange_url.to_string()) 506 .bind(credit_acc.to_string()) 507 .bind(wtid.as_slice()) 508 .bind_timestamp(created) 509 .try_map(|r: PgRow| { 510 Ok(if r.try_get(0)? { 511 SyncOutResult::Replaced 512 } else if r.try_get(1)? { 513 SyncOutResult::Recovered 514 } else if r.try_get(2)? { 515 SyncOutResult::New 516 } else { 517 SyncOutResult::None 518 }) 519 }) 520 .fetch_one(e) 521 .await 522 } 523 524 pub async fn pending_debit<'a>( 525 e: impl PgExecutor<'a>, 526 currency: &Currency, 527 ) -> sqlx::Result<Option<(i64, bitcoin::Amount, ShortHashCode, Address, Url)>> { 528 sqlx::query( 529 "SELECT id, (amount).val, (amount).frac, wtid, credit_acc, exchange_url FROM tx_out WHERE status='requested' ORDER BY created LIMIT 1", 530 ) 531 .try_map(|r: PgRow| { 532 Ok(( 533 r.try_get(0)?, 534 sql_btc_amount(&r, 1, currency)?, 535 r.try_get_base32(3)?, 536 sql_addr(&r, 4)?, 537 r.try_get_parse(5)? 538 )) 539 }) 540 .fetch_optional(e) 541 .await 542 } 543 544 pub async fn debit_sent<'a>(e: impl PgExecutor<'a>, id: i64, txid: &Txid) -> sqlx::Result<()> { 545 sqlx::query("UPDATE tx_out SET status='sent', txid=$1 WHERE id=$2") 546 .bind(txid.as_byte_array()) 547 .bind(id) 548 .execute(e) 549 .await?; 550 Ok(()) 551 } 552 553 pub async fn pending_bounce<'a>( 554 e: impl PgExecutor<'a>, 555 ) -> sqlx::Result<Option<(i64, Txid, Option<String>)>> { 556 sqlx::query( 557 "SELECT id, bounced, reason FROM bounce WHERE status='requested' ORDER BY created LIMIT 1", 558 ) 559 .try_map(|r: PgRow| { 560 Ok(( 561 r.try_get(0)?, 562 r.try_get_map(1, Txid::from_slice)?, 563 r.try_get(2)?, 564 )) 565 }) 566 .fetch_optional(e) 567 .await 568 } 569 570 pub async fn bounce_set_status<'a>( 571 e: impl PgExecutor<'a>, 572 id: i64, 573 txid: Option<&Txid>, 574 status: &BounceStatus, 575 ) -> sqlx::Result<()> { 576 sqlx::query("UPDATE bounce SET txid=$1, status=$2 WHERE id=$3") 577 .bind(txid.map(|it| it.as_byte_array())) 578 .bind(status) 579 .bind(id) 580 .execute(e) 581 .await?; 582 Ok(()) 583 } 584 585 pub enum SyncBounceResult { 586 New, 587 Recovered, 588 None, 589 } 590 591 pub async fn sync_bounce<'a>( 592 e: impl PgExecutor<'a>, 593 txid: &Txid, 594 bounced: &Txid, 595 created: &Timestamp, 596 ) -> sqlx::Result<SyncBounceResult> { 597 sqlx::query( 598 " 599 SELECT out_recovered, out_new 600 FROM sync_bounce($1, $2, $3) 601 ", 602 ) 603 .bind(txid.as_byte_array()) 604 .bind(bounced.as_byte_array()) 605 .bind_timestamp(created) 606 .try_map(|r: PgRow| { 607 Ok(if r.try_get(0)? { 608 SyncBounceResult::Recovered 609 } else if r.try_get(1)? { 610 SyncBounceResult::New 611 } else { 612 SyncBounceResult::None 613 }) 614 }) 615 .fetch_one(e) 616 .await 617 }