db.rs (38035B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use bitcoin::{Address, BlockHash, Txid, hashes::Hash}; 18 use compact_str::CompactString; 19 use depolymerizer_common::status::DebitStatus; 20 use jiff::Timestamp; 21 use sqlx::{PgConnection, PgExecutor, PgPool, QueryBuilder, Row, postgres::PgRow}; 22 use taler_api::{ 23 db::{BindHelper as _, TypeHelper as _, history, page}, 24 serialized, 25 subject::IncomingSubject, 26 }; 27 use taler_common::{ 28 api::{ 29 EddsaPublicKey, EddsaSignature, ShortHashCode, 30 params::{History, Page}, 31 revenue::RevenueIncomingBankTransaction, 32 wire::{ 33 IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest, 34 TransferResponse, TransferState, TransferStatus, 35 }, 36 }, 37 db::IncomingType, 38 types::amount::{Amount, Currency}, 39 }; 40 use tokio::sync::watch::Receiver; 41 use url::Url; 42 43 use crate::{ 44 payto::FullBtcPayto, 45 sql::{sql_addr, sql_btc_amount, sql_generic_payto, sql_payto}, 46 }; 47 48 /// Initialize the worker status 49 pub async fn init_status(db: &PgPool) -> sqlx::Result<()> { 50 sqlx::query( 51 "INSERT INTO state (name, value) VALUES ('status', $1) ON CONFLICT (name) DO NOTHING", 52 ) 53 .bind([1u8]) 54 .execute(db) 55 .await?; 56 Ok(()) 57 } 58 59 /// Get the worker status 60 pub async fn get_status(db: &PgPool) -> sqlx::Result<Option<[u8; 1]>> { 61 sqlx::query_scalar("SELECT value FROM state WHERE name = 'status'") 62 .fetch_optional(db) 63 .await 64 } 65 66 /// Update the worker status 67 pub async fn update_status(db: &mut PgConnection, new_status: bool) -> sqlx::Result<()> { 68 sqlx::query("UPDATE state SET value=$1 WHERE name='status'") 69 .bind([new_status as u8]) 70 .execute(&mut *db) 71 .await?; 72 sqlx::query("NOTIFY status").execute(db).await?; 73 Ok(()) 74 } 75 76 /// Initialize the worker sync state 77 pub async fn init_sync_state(db: &PgPool, hash: &BlockHash, reset: bool) -> sqlx::Result<()> { 78 sqlx::query(if reset { 79 "INSERT INTO state (name, value) VALUES ('last_hash', $1) ON CONFLICT (name) DO UPDATE SET value=$1" 80 } else { 81 "INSERT INTO state (name, value) VALUES ('last_hash', $1) ON CONFLICT (name) DO NOTHING" 82 }) 83 .bind(hash.as_byte_array()) 84 .execute(db) 85 .await?; 86 Ok(()) 87 } 88 89 /// Get the current worker sync state 90 pub async fn get_sync_state(db: &mut PgConnection) -> sqlx::Result<BlockHash> { 91 sqlx::query("SELECT value FROM state WHERE name='last_hash'") 92 .try_map(|r: PgRow| r.try_get_map(0, BlockHash::from_slice)) 93 .fetch_one(db) 94 .await 95 } 96 97 /// Update the worker sync state if it hasn't changed yet 98 pub async fn swap_sync_state( 99 db: &mut PgConnection, 100 from: &BlockHash, 101 to: &BlockHash, 102 ) -> sqlx::Result<()> { 103 sqlx::query("UPDATE state SET value=$1 WHERE name='last_hash' AND value=$2") 104 .bind(to.as_byte_array()) 105 .bind(from.as_byte_array()) 106 .execute(db) 107 .await?; 108 Ok(()) 109 } 110 111 #[derive(Debug)] 112 pub enum TransferResult { 113 Success(TransferResponse), 114 RequestUidReuse, 115 WtidReuse, 116 } 117 118 /// Initiate a new Taler transfer idempotently 119 pub async fn transfer( 120 db: &PgPool, 121 creditor: &FullBtcPayto, 122 transfer: &TransferRequest, 123 ) -> sqlx::Result<TransferResult> { 124 serialized!( 125 sqlx::query( 126 " 127 SELECT out_request_uid_reuse, out_wtid_reuse, out_transfer_row_id, out_created_at 128 FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8) 129 ", 130 ) 131 .bind(transfer.amount) 132 .bind(transfer.exchange_base_url.as_str()) 133 .bind(creditor.0.to_string()) 134 .bind(&creditor.name) 135 .bind(transfer.request_uid.as_slice()) 136 .bind(transfer.wtid.as_slice()) 137 .bind(transfer.metadata.as_deref()) 138 .bind_timestamp(&Timestamp::now()) 139 .try_map(|r: PgRow| { 140 Ok(if r.try_get_flag("out_request_uid_reuse")? { 141 TransferResult::RequestUidReuse 142 } else if r.try_get_flag("out_wtid_reuse")? { 143 TransferResult::WtidReuse 144 } else { 145 TransferResult::Success(TransferResponse { 146 row_id: r.try_get_u64("out_transfer_row_id")?, 147 timestamp: r.try_get_taler_timestamp("out_created_at")?, 148 }) 149 }) 150 }) 151 .fetch_one(db) 152 ) 153 } 154 155 /// Paginate initiated Taler transfers 156 pub async fn transfer_page( 157 db: &PgPool, 158 status: &Option<TransferState>, 159 params: &Page, 160 currency: &Currency, 161 ) -> sqlx::Result<Vec<TransferListStatus>> { 162 let status = match status { 163 Some(s) => match s { 164 TransferState::pending => Some(DebitStatus::requested), 165 TransferState::success => Some(DebitStatus::sent), 166 TransferState::transient_failure 167 | TransferState::permanent_failure 168 | TransferState::late_failure => { 169 return Ok(Vec::new()); 170 } 171 }, 172 None => None, 173 }; 174 175 page( 176 db, 177 params, 178 "transfer_id", 179 || { 180 let mut sql = QueryBuilder::new( 181 " 182 SELECT 183 transfer_id, 184 status, 185 amount, 186 credit_acc, 187 credit_name, 188 created_at 189 FROM transfer WHERE 190 ", 191 ); 192 if let Some(status) = status { 193 sql.push(" status = ").push_bind(status).push(" AND "); 194 } 195 sql 196 }, 197 |r: PgRow| { 198 Ok(TransferListStatus { 199 row_id: r.try_get_u64(0)?, 200 status: match r.try_get(1)? { 201 DebitStatus::requested | DebitStatus::sent => TransferState::pending, 202 DebitStatus::confirmed => TransferState::success, 203 }, 204 amount: r.try_get_amount(2, currency)?, 205 credit_account: sql_payto(&r, 3, 4)?, 206 timestamp: r.try_get_taler_timestamp(5)?, 207 }) 208 }, 209 ) 210 .await 211 } 212 213 /// Get a Taler transfer info 214 pub async fn transfer_by_id( 215 db: &PgPool, 216 id: u64, 217 currency: &Currency, 218 ) -> sqlx::Result<Option<TransferStatus>> { 219 serialized!( 220 sqlx::query( 221 " 222 SELECT 223 status, 224 amount, 225 exchange_url, 226 wtid, 227 credit_acc, 228 credit_name, 229 metadata, 230 created_at 231 FROM transfer WHERE transfer_id = $1 232 ", 233 ) 234 .bind(id as i64) 235 .try_map(|r: PgRow| { 236 Ok(TransferStatus { 237 status: match r.try_get(0)? { 238 DebitStatus::requested | DebitStatus::sent => TransferState::pending, 239 DebitStatus::confirmed => TransferState::success, 240 }, 241 status_msg: None, 242 amount: r.try_get_amount(1, currency)?, 243 origin_exchange_url: r.try_get(2)?, 244 wtid: r.try_get(3)?, 245 credit_account: sql_payto(&r, 4, 5)?, 246 metadata: r.try_get(6)?, 247 timestamp: r.try_get_taler_timestamp(7)?, 248 }) 249 }) 250 .fetch_optional(db) 251 ) 252 } 253 254 /// Fetch outgoing Taler transactions history 255 pub async fn outgoing_history( 256 db: &PgPool, 257 params: &History, 258 currency: &Currency, 259 listen: impl FnOnce() -> Receiver<i64>, 260 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> { 261 history( 262 db, 263 "tx_out_id", 264 params, 265 listen, 266 || { 267 QueryBuilder::new( 268 " 269 SELECT 270 tx_out_id, 271 tx_out.created_at, 272 tx_out.amount, 273 taler_out.wtid, 274 tx_out.credit_acc, 275 transfer.credit_name, 276 taler_out.exchange_base_url, 277 taler_out.metadata 278 FROM tx_out 279 JOIN taler_out USING (tx_out_id) 280 LEFT JOIN transfer USING (txid) 281 WHERE 282 ", 283 ) 284 }, 285 |r| { 286 Ok(OutgoingBankTransaction { 287 row_id: r.try_get_u64(0)?, 288 date: r.try_get_taler_timestamp(1)?, 289 amount: r.try_get_amount(2, currency)?, 290 wtid: r.try_get(3)?, 291 credit_account: sql_payto(&r, 4, 5)?, 292 exchange_base_url: r.try_get_url(6)?, 293 debit_fee: None, // TODO we can actually get this information 294 metadata: r.try_get(7)?, 295 }) 296 }, 297 ) 298 .await 299 } 300 301 /// Fetch incoming Taler transactions history 302 pub async fn incoming_history( 303 db: &PgPool, 304 params: &History, 305 currency: &Currency, 306 listen: impl FnOnce() -> Receiver<i64>, 307 ) -> sqlx::Result<Vec<IncomingBankTransaction>> { 308 history( 309 db, 310 "tx_in_id", 311 params, 312 listen, 313 || { 314 QueryBuilder::new( 315 " 316 SELECT 317 tx_in_id, 318 received_at, 319 amount, 320 debit_acc, 321 type, 322 metadata, 323 authorization_pub, 324 authorization_sig 325 FROM tx_in JOIN taler_in USING (tx_in_id) 326 WHERE 327 ", 328 ) 329 }, 330 |r| { 331 Ok(match r.try_get(4)? { 332 IncomingType::reserve => IncomingBankTransaction::Reserve { 333 row_id: r.try_get_u64(0)?, 334 date: r.try_get_taler_timestamp(1)?, 335 amount: r.try_get_amount(2, currency)?, 336 reserve_pub: r.try_get(5)?, 337 debit_account: sql_generic_payto(&r, 3)?, 338 credit_fee: None, // TODO store this 339 authorization_pub: r.try_get(6)?, 340 authorization_sig: r.try_get(7)?, 341 }, 342 IncomingType::kyc => IncomingBankTransaction::Kyc { 343 row_id: r.try_get_u64(0)?, 344 date: r.try_get_taler_timestamp(1)?, 345 amount: r.try_get_amount(2, currency)?, 346 account_pub: r.try_get(5)?, 347 debit_account: sql_generic_payto(&r, 3)?, 348 credit_fee: None, // TODO store this 349 authorization_pub: r.try_get(6)?, 350 authorization_sig: r.try_get(7)?, 351 }, 352 IncomingType::map => unimplemented!("MAP are never listed in the history"), 353 }) 354 }, 355 ) 356 .await 357 } 358 359 /// Fetch incoming Taler transactions history 360 pub async fn revenue_history( 361 db: &PgPool, 362 params: &History, 363 currency: &Currency, 364 listen: impl FnOnce() -> Receiver<i64>, 365 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> { 366 history( 367 db, 368 "tx_in_id", 369 params, 370 listen, 371 || { 372 QueryBuilder::new( 373 " 374 SELECT 375 tx_in_id, 376 received_at, 377 amount, 378 debit_acc 379 FROM tx_in 380 WHERE 381 ", 382 ) 383 }, 384 |r| { 385 Ok(RevenueIncomingBankTransaction { 386 row_id: r.try_get_u64(0)?, 387 date: r.try_get_taler_timestamp(1)?, 388 amount: r.try_get_amount(2, currency)?, 389 debit_account: sql_generic_payto(&r, 3)?, 390 credit_fee: None, // TODO store this 391 subject: String::new(), 392 }) 393 }, 394 ) 395 .await 396 } 397 398 #[derive(Debug, PartialEq, Eq)] 399 pub enum AddIncomingResult { 400 Success { 401 new: bool, 402 pending: bool, 403 row_id: u64, 404 valued_at: Timestamp, 405 }, 406 ReservePubReuse, 407 UnknownMapping, 408 MappingReuse, 409 } 410 411 /// Register a fake Taler credit 412 pub async fn register_tx_in_admin( 413 db: &PgPool, 414 amount: &Amount, 415 debit_acc: &Address, 416 received: &Timestamp, 417 metadata: &IncomingSubject, 418 ) -> sqlx::Result<AddIncomingResult> { 419 sqlx::query( 420 " 421 SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending 422 FROM register_tx_in(NULL, $1, $2, $3, $4, $5) 423 ", 424 ) 425 .bind(amount) 426 .bind(debit_acc.to_string()) 427 .bind_timestamp(received) 428 .bind(metadata.ty()) 429 .bind(metadata.key()) 430 .try_map(|r: PgRow| { 431 Ok(if r.try_get_flag(0)? { 432 AddIncomingResult::ReservePubReuse 433 } else if r.try_get_flag(1)? { 434 AddIncomingResult::MappingReuse 435 } else if r.try_get_flag(2)? { 436 AddIncomingResult::UnknownMapping 437 } else { 438 AddIncomingResult::Success { 439 row_id: r.try_get_u64(3)?, 440 valued_at: r.try_get_timestamp(4)?, 441 new: r.try_get_flag(5)?, 442 pending: r.try_get_flag(6)? 443 } 444 }) 445 }) 446 .fetch_one(db) 447 .await 448 } 449 450 /// Register a Taler credit 451 pub async fn register_tx_in<'a>( 452 e: impl PgExecutor<'a>, 453 txid: &Txid, 454 amount: &Amount, 455 debit_acc: &Address, 456 received: &Timestamp, 457 subject: &Option<IncomingSubject>, 458 ) -> sqlx::Result<AddIncomingResult> { 459 sqlx::query( 460 " 461 SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending 462 FROM register_tx_in($1, $2, $3, $4, $5, $6) 463 ", 464 ) 465 .bind(txid.as_byte_array()) 466 .bind(amount) 467 .bind(debit_acc.to_string()) 468 .bind_timestamp(received) 469 .bind(subject.as_ref().map(|it| it.ty())) 470 .bind(subject.as_ref().map(|it| it.key())) 471 .try_map(|r: PgRow| { 472 Ok(if r.try_get_flag(0)? { 473 AddIncomingResult::ReservePubReuse 474 } else if r.try_get_flag(1)? { 475 AddIncomingResult::MappingReuse 476 } else if r.try_get_flag(2)? { 477 AddIncomingResult::UnknownMapping 478 } else { 479 AddIncomingResult::Success { 480 row_id: r.try_get_u64(3)?, 481 valued_at: r.try_get_timestamp(4)?, 482 new: r.try_get_flag(5)?, 483 pending: r.try_get_flag(6)?, 484 } 485 }) 486 }) 487 .fetch_one(e) 488 .await 489 } 490 491 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 492 pub enum RegistrationResult { 493 Success, 494 ReservePubReuse, 495 SubjectReuse, 496 } 497 498 pub async fn transfer_register( 499 db: &PgPool, 500 ty: IncomingType, 501 account_pub: &EddsaPublicKey, 502 auth_pub: &EddsaPublicKey, 503 auth_sig: &EddsaSignature, 504 recurrent: bool, 505 timestamp: &Timestamp, 506 ) -> sqlx::Result<RegistrationResult> { 507 serialized!( 508 sqlx::query( 509 " 510 SELECT out_reserve_pub_reuse 511 FROM register_prepared_transfers ( 512 $1,$2,$3,$4,$5,$6 513 ) 514 ", 515 ) 516 .bind(ty) 517 .bind(account_pub) 518 .bind(auth_pub) 519 .bind(auth_sig) 520 .bind(recurrent) 521 .bind_timestamp(timestamp) 522 .try_map(|r: PgRow| { 523 Ok(if r.try_get_flag(0)? { 524 RegistrationResult::ReservePubReuse 525 } else { 526 RegistrationResult::Success 527 }) 528 }) 529 .fetch_one(db) 530 ) 531 } 532 533 pub async fn transfer_unregister( 534 db: &PgPool, 535 auth_pub: &EddsaPublicKey, 536 timestamp: &Timestamp, 537 ) -> sqlx::Result<bool> { 538 serialized!( 539 sqlx::query_scalar("SELECT out_found FROM delete_prepared_transfers($1,$2)") 540 .bind(auth_pub) 541 .bind_timestamp(timestamp) 542 .fetch_one(db) 543 ) 544 } 545 546 /// Update a transaction id after bumping it 547 pub async fn bump_tx_id( 548 db: &mut PgConnection, 549 to: &Txid, 550 wtid: &ShortHashCode, 551 ) -> sqlx::Result<()> { 552 sqlx::query("UPDATE transfer SET txid=$1 WHERE wtid=$2") 553 .bind(to.as_byte_array()) 554 .bind(wtid) 555 .execute(db) 556 .await?; 557 Ok(()) 558 } 559 560 /// Initiate a bounce 561 pub async fn bounce<'a>( 562 e: impl PgExecutor<'a>, 563 txid: &Txid, 564 amount: &Amount, 565 debit_acc: &Address, 566 received: &Timestamp, 567 reason: &str, 568 ) -> sqlx::Result<()> { 569 sqlx::query("SELECT FROM register_bounce_tx_in($1, $2, $3, $4, $5, $6)") 570 .bind(txid.as_byte_array()) 571 .bind(amount) 572 .bind(debit_acc.to_string()) 573 .bind_timestamp(received) 574 .bind(reason) 575 .bind_timestamp(&Timestamp::now()) 576 .execute(e) 577 .await?; 578 Ok(()) 579 } 580 581 #[derive(Debug, PartialEq, Eq)] 582 pub enum ProblematicTx { 583 Taler { 584 txid: Txid, 585 addr: Address, 586 ty: IncomingType, 587 metadata: EddsaPublicKey, 588 }, 589 Bounce { 590 txid: Txid, 591 bounced_in: Txid, 592 }, 593 Simple { 594 txid: Txid, 595 }, 596 } 597 598 /// Handle transactions being removed during a reorganization 599 pub async fn reorg<'a>(e: impl PgExecutor<'a>, ids: &[Txid]) -> sqlx::Result<Vec<ProblematicTx>> { 600 // Any incoming transactions that is currently considered final ('confirmed') is a potential correctness issues 601 // Removed outgoing transactions will be retried automatically by the node/wallet and therefore 602 // do not mandate a full adapter stop 603 604 sqlx::query( 605 " 606 SELECT txid, NULL, type, debit_acc, metadata 607 FROM tx_in JOIN taler_in USING (tx_in_id) WHERE txid = ANY($1) 608 UNION ALL 609 SELECT tx_in.txid, bounced.txid, NULL, NULL, NULL 610 from tx_in JOIN bounced USING (tx_in_id) WHERE tx_in.txid = ANY($1) 611 ", 612 ) 613 .bind(ids.iter().map(|it| it.as_byte_array()).collect::<Vec<_>>()) 614 .try_map(|r: PgRow| { 615 let txid = r.try_get_map(0, Txid::from_slice)?; 616 Ok( 617 if let Some(bounced_in) = r.try_get_opt_map(1, Txid::from_slice)? { 618 ProblematicTx::Bounce { txid, bounced_in } 619 } else if let Some(ty) = r.try_get(2)? { 620 ProblematicTx::Taler { 621 txid, 622 ty, 623 addr: sql_addr(&r, 3)?, 624 metadata: r.try_get(4)?, 625 } 626 } else { 627 ProblematicTx::Simple { txid } 628 }, 629 ) 630 }) 631 .fetch_all(e) 632 .await 633 } 634 635 #[derive(Debug)] 636 pub enum SyncOutResult { 637 New, 638 Replaced, 639 Recovered, 640 None, 641 } 642 643 #[derive(Debug)] 644 pub enum TxOutKind<'a> { 645 Simple, 646 Bounce(Txid), 647 Talerable { 648 wtid: &'a ShortHashCode, 649 url: &'a Url, 650 metadata: Option<&'a str>, 651 }, 652 } 653 654 pub async fn sync_out<'a>( 655 e: impl PgExecutor<'a>, 656 txid: &Txid, 657 replace_txid: Option<&Txid>, 658 amount: &Amount, 659 credit_acc: &Address, 660 kind: &TxOutKind<'_>, 661 created: &Timestamp, 662 ) -> sqlx::Result<SyncOutResult> { 663 let query = sqlx::query( 664 " 665 SELECT out_replaced, out_recovered, out_new 666 FROM sync_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) 667 ", 668 ) 669 .bind(txid.as_byte_array()) 670 .bind(replace_txid.map(|it| it.as_byte_array())) 671 .bind(amount) 672 .bind(credit_acc.to_string()); 673 match kind { 674 TxOutKind::Simple => query 675 .bind(None::<&[u8]>) 676 .bind(None::<&str>) 677 .bind(None::<&str>) 678 .bind(None::<&[u8]>), 679 TxOutKind::Bounce(bounced) => query 680 .bind(None::<&[u8]>) 681 .bind(None::<&str>) 682 .bind(None::<&str>) 683 .bind(bounced.as_byte_array()), 684 TxOutKind::Talerable { 685 wtid, 686 url, 687 metadata, 688 } => query 689 .bind(wtid) 690 .bind(url.as_str()) 691 .bind(metadata) 692 .bind(None::<&[u8]>), 693 } 694 .bind_timestamp(created) 695 .bind_timestamp(&Timestamp::now()) 696 .try_map(|r: PgRow| { 697 Ok(if r.try_get_flag(0)? { 698 SyncOutResult::Replaced 699 } else if r.try_get_flag(1)? { 700 SyncOutResult::Recovered 701 } else if r.try_get_flag(2)? { 702 SyncOutResult::New 703 } else { 704 SyncOutResult::None 705 }) 706 }) 707 .fetch_one(e) 708 .await 709 } 710 711 pub async fn pending_transfer<'a>( 712 e: impl PgExecutor<'a>, 713 currency: &Currency, 714 ) -> sqlx::Result< 715 Option<( 716 i64, 717 bitcoin::Amount, 718 ShortHashCode, 719 Address, 720 Url, 721 Option<CompactString>, 722 )>, 723 > { 724 sqlx::query( 725 " 726 SELECT 727 transfer_id, 728 amount, 729 wtid, 730 credit_acc, 731 exchange_url, 732 metadata 733 FROM transfer 734 WHERE status='requested' 735 ORDER BY created_at LIMIT 1", 736 ) 737 .try_map(|r: PgRow| { 738 Ok(( 739 r.try_get(0)?, 740 sql_btc_amount(&r, 1, currency)?, 741 r.try_get(2)?, 742 sql_addr(&r, 3)?, 743 r.try_get_parse(4)?, 744 r.try_get(5)?, 745 )) 746 }) 747 .fetch_optional(e) 748 .await 749 } 750 751 /// Update transfer status to 'sent' and bind it to a txid 752 pub async fn transfer_sent<'a>(e: impl PgExecutor<'a>, id: i64, txid: &Txid) -> sqlx::Result<()> { 753 sqlx::query("UPDATE transfer SET status='sent', txid=$2 WHERE transfer_id=$1") 754 .bind(id) 755 .bind(txid.as_byte_array()) 756 .execute(e) 757 .await?; 758 Ok(()) 759 } 760 761 /// Reset the state of a conflicted transfer 762 pub async fn transfer_conflict<'a>(e: impl PgExecutor<'a>, id: &Txid) -> sqlx::Result<bool> { 763 Ok( 764 sqlx::query("UPDATE transfer SET status='requested',txid=NULL WHERE txid=$1") 765 .bind(id.as_byte_array()) 766 .execute(e) 767 .await? 768 .rows_affected() 769 > 0, 770 ) 771 } 772 773 pub async fn pending_bounce<'a>( 774 e: impl PgExecutor<'a>, 775 ) -> sqlx::Result<Option<(i64, Txid, Option<String>)>> { 776 sqlx::query( 777 " 778 SELECT 779 tx_in_id, 780 tx_in.txid, 781 reason 782 FROM bounced 783 JOIN tx_in USING (tx_in_id) 784 WHERE status='requested' ORDER BY received_at LIMIT 1 785 ", 786 ) 787 .try_map(|r: PgRow| { 788 Ok(( 789 r.try_get(0)?, 790 r.try_get_map(1, Txid::from_slice)?, 791 r.try_get(2)?, 792 )) 793 }) 794 .fetch_optional(e) 795 .await 796 } 797 798 /// Update bounce status to 'sent' and bind it to a txid 799 pub async fn bounce_sent<'a>(e: impl PgExecutor<'a>, id: i64, txid: &Txid) -> sqlx::Result<()> { 800 sqlx::query("UPDATE bounced SET status='sent', txid=$2 WHERE tx_in_id=$1") 801 .bind(id) 802 .bind(txid.as_byte_array()) 803 .execute(e) 804 .await?; 805 Ok(()) 806 } 807 808 /// Reset the state of a conflicted bounce 809 pub async fn bounce_conflict<'a>(e: impl PgExecutor<'a>, id: &Txid) -> sqlx::Result<bool> { 810 Ok( 811 sqlx::query("UPDATE bounced SET status='requested',txid=NULL where txid=$1") 812 .bind(id.as_byte_array()) 813 .execute(e) 814 .await? 815 .rows_affected() 816 > 0, 817 ) 818 } 819 820 pub enum SyncBounceResult { 821 New, 822 Recovered, 823 None, 824 } 825 826 #[cfg(test)] 827 pub mod test { 828 use std::{assert_matches, str::FromStr, sync::LazyLock}; 829 830 use bitcoin::{ 831 Address, BlockHash, Txid, 832 address::NetworkUnchecked, 833 hashes::{Hash as _, sha256d::Hash}, 834 }; 835 use jiff::Span; 836 use sqlx::{PgPool, Postgres, pool::PoolConnection, postgres::PgRow}; 837 use taler_api::{db::TypeHelper as _, notification::dummy_listen, subject::IncomingSubject}; 838 use taler_common::{ 839 api::{EddsaPublicKey, HashCode, ShortHashCode, params::History, wire::TransferRequest}, 840 types::{ 841 amount::{Currency, amount}, 842 url, 843 utils::now_sql_stable_ts, 844 }, 845 }; 846 847 use crate::{ 848 CONFIG_SOURCE, 849 api::test::CLIENT, 850 db::{ 851 AddIncomingResult, ProblematicTx, SyncOutResult, TransferResult, TxOutKind, bounce, 852 bounce_sent, bump_tx_id, get_sync_state, incoming_history, init_status, 853 init_sync_state, pending_bounce, register_tx_in, register_tx_in_admin, reorg, 854 revenue_history, swap_sync_state, sync_out, transfer, update_status, 855 }, 856 }; 857 858 pub const CURR: Currency = Currency::TEST; 859 860 async fn setup() -> (PoolConnection<Postgres>, PgPool) { 861 taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await 862 } 863 864 #[tokio::test] 865 async fn kv() { 866 let (mut db, pool) = setup().await; 867 868 // Empty status 869 update_status(&mut db, false).await.unwrap(); 870 update_status(&mut db, true).await.unwrap(); 871 872 // Init status 873 init_status(&pool).await.unwrap(); 874 update_status(&mut db, false).await.unwrap(); 875 update_status(&mut db, true).await.unwrap(); 876 877 // Sync state 878 let first = BlockHash::from_raw_hash(Hash::all_zeros()); 879 let second = BlockHash::from_raw_hash(Hash::from_byte_array([ 880 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 881 0, 0, 1, 882 ])); 883 init_sync_state(&pool, &first, true).await.unwrap(); 884 assert_eq!(get_sync_state(&mut db).await.unwrap(), first); 885 init_sync_state(&pool, &second, false).await.unwrap(); 886 assert_eq!(get_sync_state(&mut db).await.unwrap(), first); 887 init_sync_state(&pool, &second, true).await.unwrap(); 888 assert_eq!(get_sync_state(&mut db).await.unwrap(), second); 889 swap_sync_state(&mut db, &second, &first).await.unwrap(); 890 assert_eq!(get_sync_state(&mut db).await.unwrap(), first); 891 swap_sync_state(&mut db, &second, &first).await.unwrap(); 892 assert_eq!(get_sync_state(&mut db).await.unwrap(), first); 893 } 894 895 pub fn rand_tx_id() -> Txid { 896 Txid::from_byte_array(rand::random()) 897 } 898 899 static ADDR: LazyLock<Address> = LazyLock::new(|| { 900 Address::<NetworkUnchecked>::from_str("bcrt1qpw3pjhtf9myl0qk9cxt54qt8qxu2mj955c7esx") 901 .unwrap() 902 .assume_checked() 903 }); 904 905 #[tokio::test] 906 async fn tx_in() { 907 let (mut db, pool) = setup().await; 908 let amount = amount("KUDOS:10"); 909 910 let mut routine = async |first: &Option<IncomingSubject>, 911 second: &Option<IncomingSubject>| { 912 let id = sqlx::query("SELECT count(*) + 1 FROM tx_in") 913 .try_map(|r: PgRow| r.try_get_u64(0)) 914 .fetch_one(&mut *db) 915 .await 916 .unwrap(); 917 let now = now_sql_stable_ts(); 918 let later = now + Span::new().hours(2); 919 let txid = rand_tx_id(); 920 // Insert 921 assert_eq!( 922 register_tx_in(&pool, &txid, &amount, &ADDR, &now, first) 923 .await 924 .unwrap(), 925 AddIncomingResult::Success { 926 new: true, 927 pending: false, 928 row_id: id, 929 valued_at: now, 930 } 931 ); 932 // Idempotent 933 assert_eq!( 934 register_tx_in(&pool, &txid, &amount, &ADDR, &later, first) 935 .await 936 .expect("register tx in"), 937 AddIncomingResult::Success { 938 new: false, 939 pending: false, 940 row_id: id, 941 valued_at: now 942 } 943 ); 944 // Many 945 assert_eq!( 946 register_tx_in(&pool, &rand_tx_id(), &amount, &ADDR, &later, second) 947 .await 948 .expect("register tx in"), 949 AddIncomingResult::Success { 950 new: true, 951 pending: false, 952 row_id: id + 1, 953 valued_at: later 954 } 955 ); 956 }; 957 958 // Empty db 959 assert_eq!( 960 revenue_history(&pool, &History::default(), &CURR, dummy_listen) 961 .await 962 .unwrap(), 963 Vec::new() 964 ); 965 assert_eq!( 966 incoming_history(&pool, &History::default(), &CURR, dummy_listen) 967 .await 968 .unwrap(), 969 Vec::new() 970 ); 971 972 // Regular transaction 973 routine(&None, &None).await; 974 975 let first = EddsaPublicKey::rand(); 976 let second = EddsaPublicKey::rand(); 977 978 // Reserve transaction 979 routine( 980 &Some(IncomingSubject::Reserve(first.clone())), 981 &Some(IncomingSubject::Reserve(second)), 982 ) 983 .await; 984 985 // Kyc transaction 986 routine( 987 &Some(IncomingSubject::Kyc(first.clone())), 988 &Some(IncomingSubject::Kyc(first)), 989 ) 990 .await; 991 992 // History 993 assert_eq!( 994 revenue_history(&pool, &History::default(), &CURR, dummy_listen) 995 .await 996 .unwrap() 997 .len(), 998 6 999 ); 1000 assert_eq!( 1001 incoming_history(&pool, &History::default(), &CURR, dummy_listen) 1002 .await 1003 .unwrap() 1004 .len(), 1005 4 1006 ); 1007 } 1008 1009 #[tokio::test] 1010 async fn tx_in_admin() { 1011 let (_, pool) = setup().await; 1012 1013 let amount = amount("KUDOS:10"); 1014 1015 // Empty db 1016 assert_eq!( 1017 incoming_history(&pool, &History::default(), &CURR, dummy_listen) 1018 .await 1019 .unwrap(), 1020 Vec::new() 1021 ); 1022 1023 let now = now_sql_stable_ts(); 1024 let later = now + Span::new().hours(2); 1025 // Insert 1026 assert_eq!( 1027 register_tx_in_admin( 1028 &pool, 1029 &amount, 1030 &ADDR, 1031 &now, 1032 &IncomingSubject::Reserve(EddsaPublicKey::rand()) 1033 ) 1034 .await 1035 .expect("register tx in"), 1036 AddIncomingResult::Success { 1037 new: true, 1038 pending: false, 1039 row_id: 1, 1040 valued_at: now 1041 } 1042 ); 1043 // Many 1044 assert_eq!( 1045 register_tx_in_admin( 1046 &pool, 1047 &amount, 1048 &ADDR, 1049 &later, 1050 &IncomingSubject::Reserve(EddsaPublicKey::rand()) 1051 ) 1052 .await 1053 .expect("register tx in"), 1054 AddIncomingResult::Success { 1055 new: true, 1056 pending: false, 1057 row_id: 2, 1058 valued_at: later 1059 } 1060 ); 1061 1062 // History 1063 assert_eq!( 1064 incoming_history(&pool, &History::default(), &CURR, dummy_listen) 1065 .await 1066 .unwrap() 1067 .len(), 1068 2 1069 ); 1070 } 1071 1072 #[tokio::test] 1073 async fn bounces() { 1074 let (_, db) = setup().await; 1075 let amount = amount("KUDOS:10"); 1076 let now = now_sql_stable_ts(); 1077 1078 // No bounces 1079 assert_eq!(pending_bounce(&db).await.unwrap(), None); 1080 bounce_sent(&db, 12, &rand_tx_id()).await.unwrap(); 1081 1082 // Bounced 1083 let bounced_txid = rand_tx_id(); 1084 let bounce_txid = rand_tx_id(); 1085 bounce(&db, &bounced_txid, &amount, &ADDR, &now, "invalid format") 1086 .await 1087 .unwrap(); 1088 bounce(&db, &bounced_txid, &amount, &ADDR, &now, "invalid format") 1089 .await 1090 .unwrap(); 1091 match pending_bounce(&db).await.unwrap() { 1092 Some((id, txid, _)) if txid == bounced_txid => { 1093 bounce_sent(&db, id, &txid).await.unwrap(); 1094 bounce_sent(&db, id, &txid).await.unwrap(); 1095 } 1096 _ => unreachable!(), 1097 } 1098 assert_matches!( 1099 sync_out( 1100 &db, 1101 &bounce_txid, 1102 None, 1103 &amount, 1104 &ADDR, 1105 &TxOutKind::Bounce(bounced_txid), 1106 &now, 1107 ) 1108 .await 1109 .unwrap(), 1110 SyncOutResult::New 1111 ); 1112 assert_eq!(pending_bounce(&db).await.unwrap(), None); 1113 assert_matches!( 1114 sync_out( 1115 &db, 1116 &bounce_txid, 1117 None, 1118 &amount, 1119 &ADDR, 1120 &TxOutKind::Bounce(bounced_txid), 1121 &now, 1122 ) 1123 .await 1124 .unwrap(), 1125 SyncOutResult::None 1126 ); 1127 1128 // Recovered 1129 let bounced_txid = rand_tx_id(); 1130 let bounce_txid = rand_tx_id(); 1131 assert_matches!( 1132 register_tx_in(&db, &bounced_txid, &amount, &ADDR, &now, &None) 1133 .await 1134 .expect("register tx in"), 1135 AddIncomingResult::Success { 1136 new: true, 1137 pending: false, 1138 .. 1139 } 1140 ); 1141 assert_matches!( 1142 sync_out( 1143 &db, 1144 &bounce_txid, 1145 None, 1146 &amount, 1147 &ADDR, 1148 &TxOutKind::Bounce(bounced_txid), 1149 &now, 1150 ) 1151 .await 1152 .unwrap(), 1153 SyncOutResult::Recovered 1154 ); 1155 assert_matches!( 1156 sync_out( 1157 &db, 1158 &bounce_txid, 1159 None, 1160 &amount, 1161 &ADDR, 1162 &TxOutKind::Bounce(bounced_txid), 1163 &now, 1164 ) 1165 .await 1166 .unwrap(), 1167 SyncOutResult::None 1168 ); 1169 assert_eq!(pending_bounce(&db).await.unwrap(), None); 1170 } 1171 1172 #[tokio::test] 1173 async fn sync_out_talerable_and_replace() { 1174 let (mut db, poll) = setup().await; 1175 let amount = amount("KUDOS:10"); 1176 let now = now_sql_stable_ts(); 1177 1178 // 1. Simple Sync Out 1179 let txid = rand_tx_id(); 1180 assert_matches!( 1181 sync_out(&poll, &txid, None, &amount, &ADDR, &TxOutKind::Simple, &now,) 1182 .await 1183 .unwrap(), 1184 SyncOutResult::New 1185 ); 1186 assert_matches!( 1187 sync_out(&poll, &txid, None, &amount, &ADDR, &TxOutKind::Simple, &now,) 1188 .await 1189 .unwrap(), 1190 SyncOutResult::None 1191 ); 1192 1193 // 2. Replace (Fee Bump) 1194 assert_matches!( 1195 sync_out( 1196 &poll, 1197 &rand_tx_id(), 1198 Some(&txid), 1199 &amount, 1200 &ADDR, 1201 &TxOutKind::Simple, 1202 &now, 1203 ) 1204 .await 1205 .unwrap(), 1206 SyncOutResult::Replaced 1207 ); 1208 1209 // 3. Recover Talerable Transfer 1210 let t = TransferRequest { 1211 amount, 1212 exchange_base_url: url("https://exchange.example.com"), 1213 request_uid: HashCode::rand(), 1214 wtid: ShortHashCode::rand(), 1215 metadata: None, 1216 credit_account: CLIENT.as_uri(), 1217 }; 1218 1219 // Create the pending transfer 1220 assert_matches!( 1221 transfer(&poll, &CLIENT, &t).await.unwrap(), 1222 TransferResult::Success(_) 1223 ); 1224 1225 let txid = rand_tx_id(); 1226 // Sync it out 1227 assert_matches!( 1228 sync_out( 1229 &poll, 1230 &txid, 1231 None, 1232 &amount, 1233 &ADDR, 1234 &TxOutKind::Talerable { 1235 wtid: &t.wtid, 1236 url: &t.exchange_base_url, 1237 metadata: None, 1238 }, 1239 &now, 1240 ) 1241 .await 1242 .unwrap(), 1243 SyncOutResult::Recovered 1244 ); 1245 1246 // Bump fee 1247 bump_tx_id(&mut db, &rand_tx_id(), &t.wtid).await.unwrap(); 1248 } 1249 1250 #[tokio::test] 1251 async fn reorgs() { 1252 let (_, pool) = setup().await; 1253 let amount = amount("KUDOS:10"); 1254 let now = now_sql_stable_ts(); 1255 1256 // 1. Setup a normal incoming transaction (Credit) 1257 let txid_normal = rand_tx_id(); 1258 let reserve_pub = EddsaPublicKey::rand(); 1259 register_tx_in( 1260 &pool, 1261 &txid_normal, 1262 &amount, 1263 &ADDR, 1264 &now, 1265 &Some(IncomingSubject::Reserve(reserve_pub.clone())), 1266 ) 1267 .await 1268 .unwrap(); 1269 1270 // 2. Setup a bounced transaction that was successfully synced out 1271 let txid_bounced = rand_tx_id(); 1272 let txid_bounce = rand_tx_id(); 1273 bounce(&pool, &txid_bounced, &amount, &ADDR, &now, "bad data") 1274 .await 1275 .unwrap(); 1276 sync_out( 1277 &pool, 1278 &txid_bounce, 1279 None, 1280 &amount, 1281 &ADDR, 1282 &TxOutKind::Bounce(txid_bounced), 1283 &now, 1284 ) 1285 .await 1286 .unwrap(); 1287 1288 // 3. Trigger a Reorg dropping both the incoming reserve and the outgoing bounce 1289 let problematic = reorg(&pool, &[txid_normal, txid_bounced]).await.unwrap(); 1290 1291 assert_eq!( 1292 problematic.as_slice(), 1293 &[ 1294 ProblematicTx::Taler { 1295 txid: txid_normal, 1296 addr: ADDR.clone(), 1297 ty: taler_common::db::IncomingType::reserve, 1298 metadata: reserve_pub 1299 }, 1300 ProblematicTx::Bounce { 1301 txid: txid_bounced, 1302 bounced_in: txid_bounce 1303 } 1304 ] 1305 ); 1306 } 1307 }