db.rs (44717B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::fmt::Display; 18 19 use jiff::Timestamp; 20 use serde::{Serialize, de::DeserializeOwned}; 21 use sqlx::{PgConnection, PgExecutor, PgPool, QueryBuilder, Row, postgres::PgRow}; 22 use taler_api::{ 23 db::{BindHelper, IncomingType, TypeHelper, history, page}, 24 subject::{IncomingSubject, OutgoingSubject}, 25 }; 26 use taler_common::{ 27 api_params::{History, Page}, 28 api_revenue::RevenueIncomingBankTransaction, 29 api_wire::{ 30 IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest, 31 TransferState, TransferStatus, 32 }, 33 types::{ 34 amount::{Currency, Decimal}, 35 payto::{PaytoImpl as _, PaytoURI}, 36 }, 37 }; 38 use tokio::sync::watch::Receiver; 39 40 use crate::{CyclosId, FullCyclosPayto}; 41 42 #[derive(Debug, Clone)] 43 pub struct TxIn { 44 pub transfer_id: u64, 45 pub tx_id: Option<u64>, 46 pub amount: Decimal, 47 pub subject: String, 48 pub debtor: FullCyclosPayto, 49 pub valued_at: Timestamp, 50 } 51 52 impl Display for TxIn { 53 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 54 let Self { 55 transfer_id, 56 tx_id, 57 amount, 58 subject, 59 debtor, 60 valued_at, 61 } = self; 62 let tx_id = match tx_id { 63 Some(id) => format_args!(":{}", *id), 64 None => format_args!(""), 65 }; 66 
write!( 67 f, 68 "{valued_at} {transfer_id}{tx_id} {amount} ({} {}) '{subject}'", 69 debtor.0, debtor.name 70 ) 71 } 72 } 73 74 #[derive(Debug, Clone)] 75 pub struct TxOut { 76 pub transfer_id: u64, 77 pub tx_id: Option<u64>, 78 pub amount: Decimal, 79 pub subject: String, 80 pub creditor: FullCyclosPayto, 81 pub valued_at: Timestamp, 82 } 83 84 impl Display for TxOut { 85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 86 let Self { 87 transfer_id, 88 tx_id, 89 amount, 90 subject, 91 creditor, 92 valued_at, 93 } = self; 94 let tx_id = match tx_id { 95 Some(id) => format_args!(":{}", *id), 96 None => format_args!(""), 97 }; 98 write!( 99 f, 100 "{valued_at} {transfer_id}{tx_id} {amount} ({} {}) '{subject}'", 101 creditor.0, creditor.name 102 ) 103 } 104 } 105 106 #[derive(Debug, PartialEq, Eq)] 107 pub struct Initiated { 108 pub id: u64, 109 pub amount: Decimal, 110 pub subject: String, 111 pub creditor: FullCyclosPayto, 112 } 113 114 impl Display for Initiated { 115 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 116 let Self { 117 id, 118 amount, 119 subject, 120 creditor, 121 } = self; 122 write!( 123 f, 124 "{id} {amount} ({} {}) '{subject}'", 125 creditor.0, creditor.name 126 ) 127 } 128 } 129 130 #[derive(Debug, Clone)] 131 pub struct TxInAdmin { 132 pub amount: Decimal, 133 pub subject: String, 134 pub debtor: FullCyclosPayto, 135 pub metadata: IncomingSubject, 136 } 137 138 #[derive(Debug, PartialEq, Eq)] 139 pub enum AddIncomingResult { 140 Success { 141 new: bool, 142 row_id: u64, 143 valued_at: Timestamp, 144 }, 145 ReservePubReuse, 146 } 147 148 pub async fn register_tx_in_admin( 149 db: &PgPool, 150 tx: &TxInAdmin, 151 now: &Timestamp, 152 ) -> sqlx::Result<AddIncomingResult> { 153 sqlx::query( 154 " 155 SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new 156 FROM register_tx_in(NULL, NULL, ($1, $2)::taler_amount, $3, $4, $5, $6, $7, $8, $6) 157 ", 158 ) 159 .bind_decimal(&tx.amount) 160 
.bind(&tx.subject) 161 .bind(tx.debtor.0 as i64) 162 .bind(&tx.debtor.name) 163 .bind_timestamp(now) 164 .bind(tx.metadata.ty()) 165 .bind(tx.metadata.key()) 166 .try_map(|r: PgRow| { 167 Ok(if r.try_get(0)? { 168 AddIncomingResult::ReservePubReuse 169 } else { 170 AddIncomingResult::Success { 171 row_id: r.try_get_u64(1)?, 172 valued_at: r.try_get_timestamp(2)?, 173 new: r.try_get(3)?, 174 } 175 }) 176 }) 177 .fetch_one(db) 178 .await 179 } 180 181 pub async fn register_tx_in( 182 db: &mut PgConnection, 183 tx: &TxIn, 184 subject: &Option<IncomingSubject>, 185 now: &Timestamp, 186 ) -> sqlx::Result<AddIncomingResult> { 187 sqlx::query( 188 " 189 SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new 190 FROM register_tx_in($1, $2, ($3, $4)::taler_amount, $5, $6, $7, $8, $9, $10, $11) 191 ", 192 ) 193 .bind(tx.transfer_id as i64) 194 .bind(tx.tx_id.map(|it| it as i64)) 195 .bind_decimal(&tx.amount) 196 .bind(&tx.subject) 197 .bind(tx.debtor.0 as i64) 198 .bind(&tx.debtor.name) 199 .bind(tx.valued_at.as_microsecond()) 200 .bind(subject.as_ref().map(|it| it.ty())) 201 .bind(subject.as_ref().map(|it| it.key())) 202 .bind(now.as_microsecond()) 203 .try_map(|r: PgRow| { 204 Ok(if r.try_get(0)? 
{ 205 AddIncomingResult::ReservePubReuse 206 } else { 207 AddIncomingResult::Success { 208 row_id: r.try_get_u64(1)?, 209 valued_at: r.try_get_timestamp(2)?, 210 new: r.try_get(3)?, 211 } 212 }) 213 }) 214 .fetch_one(db) 215 .await 216 } 217 218 #[derive(Debug)] 219 pub enum TxOutKind { 220 Simple, 221 Bounce(u64), 222 Talerable(OutgoingSubject), 223 } 224 225 #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)] 226 #[allow(non_camel_case_types)] 227 #[sqlx(type_name = "register_result")] 228 pub enum RegisterResult { 229 /// Already registered 230 idempotent, 231 /// Initiated transaction 232 known, 233 /// Recovered unknown outgoing transaction 234 recovered, 235 } 236 237 #[derive(Debug, PartialEq, Eq)] 238 pub struct AddOutgoingResult { 239 pub result: RegisterResult, 240 pub row_id: u64, 241 } 242 243 pub async fn register_tx_out( 244 db: &mut PgConnection, 245 tx: &TxOut, 246 kind: &TxOutKind, 247 now: &Timestamp, 248 ) -> sqlx::Result<AddOutgoingResult> { 249 let query = sqlx::query( 250 " 251 SELECT out_result, out_tx_row_id 252 FROM register_tx_out($1, $2, ($3, $4)::taler_amount, $5, $6, $7, $8, $9, $10, $11, $12) 253 ", 254 ) 255 .bind(tx.transfer_id as i64) 256 .bind(tx.tx_id.map(|it| it as i64)) 257 .bind_decimal(&tx.amount) 258 .bind(&tx.subject) 259 .bind(tx.creditor.0 as i64) 260 .bind(&tx.creditor.name) 261 .bind_timestamp(&tx.valued_at); 262 let query = match kind { 263 TxOutKind::Simple => query 264 .bind(None::<&[u8]>) 265 .bind(None::<&str>) 266 .bind(None::<i64>), 267 TxOutKind::Bounce(bounced) => query 268 .bind(None::<&[u8]>) 269 .bind(None::<&str>) 270 .bind(*bounced as i64), 271 TxOutKind::Talerable(subject) => query 272 .bind(subject.0.as_ref()) 273 .bind(subject.1.as_ref()) 274 .bind(None::<i64>), 275 }; 276 query 277 .bind_timestamp(now) 278 .try_map(|r: PgRow| { 279 Ok(AddOutgoingResult { 280 result: r.try_get(0)?, 281 row_id: r.try_get_u64(1)?, 282 }) 283 }) 284 .fetch_one(db) 285 .await 286 } 287 288 #[derive(Debug, PartialEq, 
Eq)] 289 pub enum TransferResult { 290 Success { id: u64, initiated_at: Timestamp }, 291 RequestUidReuse, 292 WtidReuse, 293 } 294 295 pub async fn make_transfer<'a>( 296 db: impl PgExecutor<'a>, 297 req: &TransferRequest, 298 creditor: &FullCyclosPayto, 299 now: &Timestamp, 300 ) -> sqlx::Result<TransferResult> { 301 let subject = format!("{} {}", req.wtid, req.exchange_base_url); 302 sqlx::query( 303 " 304 SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at 305 FROM taler_transfer($1, $2, $3, ($4, $5)::taler_amount, $6, $7, $8, $9) 306 ", 307 ) 308 .bind(req.request_uid.as_ref()) 309 .bind(req.wtid.as_ref()) 310 .bind(&subject) 311 .bind_amount(&req.amount) 312 .bind(req.exchange_base_url.as_str()) 313 .bind(creditor.0 as i64) 314 .bind(&creditor.name) 315 .bind_timestamp(now) 316 .try_map(|r: PgRow| { 317 Ok(if r.try_get(0)? { 318 TransferResult::RequestUidReuse 319 } else if r.try_get(1)? { 320 TransferResult::WtidReuse 321 } else { 322 TransferResult::Success { 323 id: r.try_get_u64(2)?, 324 initiated_at: r.try_get_timestamp(3)?, 325 } 326 }) 327 }) 328 .fetch_one(db) 329 .await 330 } 331 332 #[derive(Debug, PartialEq, Eq)] 333 pub struct BounceResult { 334 pub tx_id: u64, 335 pub tx_new: bool, 336 } 337 338 pub async fn register_bounced_tx_in( 339 db: &mut PgConnection, 340 tx: &TxIn, 341 chargeback_id: u64, 342 reason: &str, 343 now: &Timestamp, 344 ) -> sqlx::Result<BounceResult> { 345 sqlx::query( 346 " 347 SELECT out_tx_row_id, out_tx_new 348 FROM register_bounced_tx_in($1, $2, ($3, $4)::taler_amount, $5, $6, $7, $8, $9, $10, $11) 349 ", 350 ) 351 .bind(tx.transfer_id as i64) 352 .bind(tx.tx_id.map(|it| it as i64)) 353 .bind_decimal(&tx.amount) 354 .bind(&tx.subject) 355 .bind(tx.debtor.0 as i64) 356 .bind(&tx.debtor.name) 357 .bind_timestamp(&tx.valued_at) 358 .bind(chargeback_id as i64) 359 .bind(reason) 360 .bind_timestamp(now) 361 .try_map(|r: PgRow| { 362 Ok(BounceResult { 363 tx_id: r.try_get_u64(0)?, 364 tx_new: 
r.try_get(1)?, 365 }) 366 }) 367 .fetch_one(db) 368 .await 369 } 370 371 pub async fn transfer_page<'a>( 372 db: impl PgExecutor<'a>, 373 status: &Option<TransferState>, 374 currency: &Currency, 375 params: &Page, 376 ) -> sqlx::Result<Vec<TransferListStatus>> { 377 page( 378 db, 379 "initiated_id", 380 params, 381 || { 382 let mut builder = QueryBuilder::new( 383 " 384 SELECT 385 initiated_id, 386 status, 387 (amount).val as amount_val, 388 (amount).frac as amount_frac, 389 credit_account, 390 credit_name, 391 initiated_at 392 FROM transfer 393 JOIN initiated USING (initiated_id) 394 WHERE 395 ", 396 ); 397 if let Some(status) = status { 398 builder.push(" status = ").push_bind(status).push(" AND "); 399 } 400 builder 401 }, 402 |r: PgRow| { 403 Ok(TransferListStatus { 404 row_id: r.try_get_safeu64(0)?, 405 status: r.try_get(1)?, 406 amount: r.try_get_amount_i(2, currency)?, 407 credit_account: r.try_get_cyclos_fullpaytouri(4, 5)?, 408 timestamp: r.try_get_timestamp(6)?.into(), 409 }) 410 }, 411 ) 412 .await 413 } 414 415 pub async fn outgoing_history( 416 db: &PgPool, 417 params: &History, 418 currency: &Currency, 419 listen: impl FnOnce() -> Receiver<i64>, 420 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> { 421 history( 422 db, 423 "tx_out_id", 424 params, 425 listen, 426 || { 427 QueryBuilder::new( 428 " 429 SELECT 430 tx_out_id, 431 (amount).val as amount_val, 432 (amount).frac as amount_frac, 433 credit_account, 434 credit_name, 435 valued_at, 436 exchange_base_url, 437 wtid 438 FROM taler_out 439 JOIN tx_out USING (tx_out_id) 440 WHERE 441 ", 442 ) 443 }, 444 |r: PgRow| { 445 Ok(OutgoingBankTransaction { 446 row_id: r.try_get_safeu64(0)?, 447 amount: r.try_get_amount_i(1, currency)?, 448 credit_account: r.try_get_cyclos_fullpaytouri(3, 4)?, 449 date: r.try_get_timestamp(5)?.into(), 450 exchange_base_url: r.try_get_url(6)?, 451 wtid: r.try_get_base32(7)?, 452 }) 453 }, 454 ) 455 .await 456 } 457 458 pub async fn incoming_history( 459 db: &PgPool, 460 
params: &History, 461 currency: &Currency, 462 listen: impl FnOnce() -> Receiver<i64>, 463 ) -> sqlx::Result<Vec<IncomingBankTransaction>> { 464 history( 465 db, 466 "tx_in_id", 467 params, 468 listen, 469 || { 470 QueryBuilder::new( 471 " 472 SELECT 473 type, 474 tx_in_id, 475 (amount).val as amount_val, 476 (amount).frac as amount_frac, 477 debit_account, 478 debit_name, 479 valued_at, 480 metadata 481 FROM taler_in 482 JOIN tx_in USING (tx_in_id) 483 WHERE 484 ", 485 ) 486 }, 487 |r: PgRow| { 488 Ok(match r.try_get(0)? { 489 IncomingType::reserve => IncomingBankTransaction::Reserve { 490 row_id: r.try_get_safeu64(1)?, 491 amount: r.try_get_amount_i(2, currency)?, 492 debit_account: r.try_get_cyclos_fullpaytouri(4, 5)?, 493 date: r.try_get_timestamp(6)?.into(), 494 reserve_pub: r.try_get_base32(7)?, 495 }, 496 IncomingType::kyc => IncomingBankTransaction::Kyc { 497 row_id: r.try_get_safeu64(1)?, 498 amount: r.try_get_amount_i(2, currency)?, 499 debit_account: r.try_get_cyclos_fullpaytouri(4, 5)?, 500 date: r.try_get_timestamp(6)?.into(), 501 account_pub: r.try_get_base32(7)?, 502 }, 503 IncomingType::wad => { 504 unimplemented!("WAD is not yet supported") 505 } 506 }) 507 }, 508 ) 509 .await 510 } 511 512 pub async fn revenue_history( 513 db: &PgPool, 514 params: &History, 515 currency: &Currency, 516 listen: impl FnOnce() -> Receiver<i64>, 517 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> { 518 history( 519 db, 520 "tx_in_id", 521 params, 522 listen, 523 || { 524 QueryBuilder::new( 525 " 526 SELECT 527 tx_in_id, 528 valued_at, 529 (amount).val as amount_val, 530 (amount).frac as amount_frac, 531 debit_account, 532 debit_name, 533 subject 534 FROM tx_in 535 WHERE 536 ", 537 ) 538 }, 539 |r: PgRow| { 540 Ok(RevenueIncomingBankTransaction { 541 row_id: r.try_get_safeu64(0)?, 542 date: r.try_get_timestamp(1)?.into(), 543 amount: r.try_get_amount_i(2, currency)?, 544 credit_fee: None, 545 debit_account: r.try_get_cyclos_fullpaytouri(4, 5)?, 546 subject: 
r.try_get(6)?, 547 }) 548 }, 549 ) 550 .await 551 } 552 553 pub async fn transfer_by_id<'a>( 554 db: impl PgExecutor<'a>, 555 id: u64, 556 currency: &Currency, 557 ) -> sqlx::Result<Option<TransferStatus>> { 558 sqlx::query( 559 " 560 SELECT 561 status, 562 status_msg, 563 (amount).val as amount_val, 564 (amount).frac as amount_frac, 565 exchange_base_url, 566 wtid, 567 credit_account, 568 credit_name, 569 initiated_at 570 FROM transfer 571 JOIN initiated USING (initiated_id) 572 WHERE initiated_id = $1 573 ", 574 ) 575 .bind(id as i64) 576 .try_map(|r: PgRow| { 577 Ok(TransferStatus { 578 status: r.try_get(0)?, 579 status_msg: r.try_get(1)?, 580 amount: r.try_get_amount_i(2, currency)?, 581 origin_exchange_url: r.try_get(4)?, 582 wtid: r.try_get_base32(5)?, 583 credit_account: r.try_get_cyclos_fullpaytouri(6, 7)?, 584 timestamp: r.try_get_timestamp(8)?.into(), 585 }) 586 }) 587 .fetch_optional(db) 588 .await 589 } 590 591 /** Get a batch of pending initiated transactions not attempted since [start] */ 592 pub async fn pending_batch<'a>( 593 db: impl PgExecutor<'a>, 594 start: &Timestamp, 595 ) -> sqlx::Result<Vec<Initiated>> { 596 sqlx::query( 597 " 598 SELECT initiated_id, (amount).val, (amount).frac, subject, credit_account, credit_name 599 FROM initiated 600 WHERE tx_id IS NULL 601 AND status='pending' 602 AND (last_submitted IS NULL OR last_submitted < $1) 603 LIMIT 100 604 ", 605 ) 606 .bind_timestamp(start) 607 .try_map(|r: PgRow| { 608 Ok(Initiated { 609 id: r.try_get_u64(0)?, 610 amount: r.try_get_decimal(1, 2)?, 611 subject: r.try_get(3)?, 612 creditor: r.try_get_cyclos_fullpayto(4, 5)?, 613 }) 614 }) 615 .fetch_all(db) 616 .await 617 } 618 619 /** Update status of a successful submitted initiated transaction */ 620 pub async fn initiated_submit_success<'a>( 621 db: impl PgExecutor<'a>, 622 initiated_id: u64, 623 timestamp: &Timestamp, 624 tx_id: u64, 625 ) -> sqlx::Result<()> { 626 sqlx::query( 627 " 628 UPDATE initiated 629 SET status='pending', 
submission_counter=submission_counter+1, last_submitted=$1, tx_id=$2 630 WHERE initiated_id=$3 631 " 632 ).bind_timestamp(timestamp) 633 .bind(tx_id as i64) 634 .bind(initiated_id as i64) 635 .execute(db).await?; 636 Ok(()) 637 } 638 639 /** Update status of a permanently failed initiated transaction */ 640 pub async fn initiated_submit_permanent_failure<'a>( 641 db: impl PgExecutor<'a>, 642 initiated_id: u64, 643 timestamp: &Timestamp, 644 msg: &str, 645 ) -> sqlx::Result<()> { 646 sqlx::query( 647 " 648 UPDATE initiated 649 SET status='permanent_failure', status_msg=$2 650 WHERE initiated_id=$3 651 ", 652 ) 653 .bind_timestamp(timestamp) 654 .bind(msg) 655 .bind(initiated_id as i64) 656 .execute(db) 657 .await?; 658 Ok(()) 659 } 660 661 #[derive(Debug, PartialEq, Eq)] 662 pub struct OutFailureResult { 663 pub initiated_id: Option<u64>, 664 pub new: bool, 665 } 666 667 /** Update status of a charged back failed initiated transaction */ 668 pub async fn initiated_chargeback_failure( 669 db: &mut PgConnection, 670 code: u64, 671 bounced: Option<u32>, 672 now: &Timestamp, 673 ) -> sqlx::Result<OutFailureResult> { 674 todo!(); 675 sqlx::query( 676 " 677 SELECT out_new, out_initiated_id 678 FROM register_tx_out_failure($1, $2, $3) 679 ", 680 ) 681 .bind(code as i64) 682 .bind(bounced.map(|i| i as i32)) 683 .bind_timestamp(now) 684 .try_map(|r: PgRow| { 685 Ok(OutFailureResult { 686 new: r.try_get(0)?, 687 initiated_id: r.try_get::<Option<i64>, _>(1)?.map(|i| i as u64), 688 }) 689 }) 690 .fetch_one(db) 691 .await 692 } 693 694 /** Get JSON value from KV table */ 695 pub async fn kv_get<'a, T: DeserializeOwned + Unpin + Send>( 696 db: impl PgExecutor<'a>, 697 key: &str, 698 ) -> sqlx::Result<Option<T>> { 699 sqlx::query("SELECT value FROM kv WHERE key=$1") 700 .bind(key) 701 .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0)) 702 .fetch_optional(db) 703 .await 704 } 705 706 /** Set JSON value in KV table */ 707 pub async fn kv_set<'a, T: Serialize>( 708 db: impl 
PgExecutor<'a>, 709 key: &str, 710 value: &T, 711 ) -> sqlx::Result<()> { 712 sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value") 713 .bind(key) 714 .bind(sqlx::types::Json(value)) 715 .execute(db) 716 .await?; 717 Ok(()) 718 } 719 720 pub trait CyclosTypeHelper { 721 fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>( 722 &self, 723 idx: I, 724 name: I, 725 ) -> sqlx::Result<FullCyclosPayto>; 726 fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>( 727 &self, 728 idx: I, 729 name: I, 730 ) -> sqlx::Result<PaytoURI>; 731 } 732 733 impl CyclosTypeHelper for PgRow { 734 fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>( 735 &self, 736 idx: I, 737 name: I, 738 ) -> sqlx::Result<FullCyclosPayto> { 739 let idx = self.try_get_u64(idx)?; 740 let name = self.try_get(name)?; 741 Ok(FullCyclosPayto::new(CyclosId(idx), name)) 742 } 743 fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>( 744 &self, 745 idx: I, 746 name: I, 747 ) -> sqlx::Result<PaytoURI> { 748 let idx = self.try_get_u64(idx)?; 749 let name = self.try_get(name)?; 750 Ok(CyclosId(idx).as_full_payto(name)) 751 } 752 } 753 754 #[cfg(test)] 755 mod test { 756 use std::sync::LazyLock; 757 758 use jiff::{Span, Timestamp}; 759 use serde_json::json; 760 use sqlx::{PgConnection, PgPool, postgres::PgRow}; 761 use taler_api::{ 762 db::TypeHelper, 763 subject::{IncomingSubject, OutgoingSubject}, 764 }; 765 use taler_common::{ 766 api_common::{EddsaPublicKey, HashCode, ShortHashCode}, 767 api_params::{History, Page}, 768 api_wire::{TransferRequest, TransferState, TransferStatus}, 769 types::{ 770 amount::{Currency, amount, decimal}, 771 payto::payto, 772 url, 773 utils::now_sql_stable_timestamp, 774 }, 775 }; 776 use tokio::sync::watch::Receiver; 777 778 use crate::{ 779 constants::CONFIG_SOURCE, 780 cyclos_payto, 781 db::{ 782 self, AddIncomingResult, AddOutgoingResult, BounceResult, Initiated, TransferResult, 783 TxIn, TxInAdmin, TxOut, 
TxOutKind, 784 }, 785 }; 786 787 pub static CURRENCY: LazyLock<Currency> = LazyLock::new(|| "TEST".parse().unwrap()); 788 789 fn fake_listen<T: Default>() -> Receiver<T> { 790 tokio::sync::watch::channel(T::default()).1 791 } 792 793 async fn setup() -> (PgConnection, PgPool) { 794 let pool = taler_test_utils::db_test_setup(CONFIG_SOURCE).await; 795 let conn = pool.acquire().await.unwrap().leak(); 796 (conn, pool) 797 } 798 799 #[tokio::test] 800 async fn kv() { 801 let (mut db, _) = setup().await; 802 803 let value = json!({ 804 "name": "Mr Smith", 805 "no way": 32 806 }); 807 808 assert_eq!( 809 db::kv_get::<serde_json::Value>(&mut db, "value") 810 .await 811 .unwrap(), 812 None 813 ); 814 db::kv_set(&mut db, "value", &value).await.unwrap(); 815 db::kv_set(&mut db, "value", &value).await.unwrap(); 816 assert_eq!( 817 db::kv_get::<serde_json::Value>(&mut db, "value") 818 .await 819 .unwrap(), 820 Some(value) 821 ); 822 } 823 824 #[tokio::test] 825 async fn tx_in() { 826 let (mut db, pool) = setup().await; 827 828 async fn routine( 829 db: &mut PgConnection, 830 first: &Option<IncomingSubject>, 831 second: &Option<IncomingSubject>, 832 ) { 833 let id = sqlx::query("SELECT count(*) + 1 FROM tx_in") 834 .try_map(|r: PgRow| r.try_get_u64(0)) 835 .fetch_one(&mut *db) 836 .await 837 .unwrap(); 838 let now = now_sql_stable_timestamp(); 839 let later = now + Span::new().hours(2); 840 let tx = TxIn { 841 transfer_id: now.as_microsecond() as u64, 842 tx_id: None, 843 amount: decimal("10"), 844 subject: "subject".to_owned(), 845 debtor: cyclos_payto("payto://cyclos/31000163100000000?receiver-name=name"), 846 valued_at: now, 847 }; 848 // Insert 849 assert_eq!( 850 db::register_tx_in(db, &tx, &first, &now) 851 .await 852 .expect("register tx in"), 853 AddIncomingResult::Success { 854 new: true, 855 row_id: id, 856 valued_at: now 857 } 858 ); 859 // Idempotent 860 assert_eq!( 861 db::register_tx_in( 862 db, 863 &TxIn { 864 valued_at: later, 865 ..tx.clone() 866 }, 867 &first, 
868 &now 869 ) 870 .await 871 .expect("register tx in"), 872 AddIncomingResult::Success { 873 new: false, 874 row_id: id, 875 valued_at: now 876 } 877 ); 878 // Many 879 assert_eq!( 880 db::register_tx_in( 881 db, 882 &TxIn { 883 transfer_id: later.as_microsecond() as u64, 884 valued_at: later, 885 ..tx 886 }, 887 &second, 888 &now 889 ) 890 .await 891 .expect("register tx in"), 892 AddIncomingResult::Success { 893 new: true, 894 row_id: id + 1, 895 valued_at: later 896 } 897 ); 898 } 899 900 // Empty db 901 assert_eq!( 902 db::revenue_history(&pool, &History::default(), &CURRENCY, fake_listen) 903 .await 904 .unwrap(), 905 Vec::new() 906 ); 907 assert_eq!( 908 db::incoming_history(&pool, &History::default(), &CURRENCY, fake_listen) 909 .await 910 .unwrap(), 911 Vec::new() 912 ); 913 914 // Regular transaction 915 routine(&mut db, &None, &None).await; 916 917 // Reserve transaction 918 routine( 919 &mut db, 920 &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), 921 &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), 922 ) 923 .await; 924 925 // Kyc transaction 926 routine( 927 &mut db, 928 &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), 929 &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), 930 ) 931 .await; 932 933 // History 934 assert_eq!( 935 db::revenue_history(&pool, &History::default(), &CURRENCY, fake_listen) 936 .await 937 .unwrap() 938 .len(), 939 6 940 ); 941 assert_eq!( 942 db::incoming_history(&pool, &History::default(), &CURRENCY, fake_listen) 943 .await 944 .unwrap() 945 .len(), 946 4 947 ); 948 } 949 950 #[tokio::test] 951 async fn tx_in_admin() { 952 let (_, pool) = setup().await; 953 954 // Empty db 955 assert_eq!( 956 db::incoming_history(&pool, &History::default(), &CURRENCY, fake_listen) 957 .await 958 .unwrap(), 959 Vec::new() 960 ); 961 962 let now = now_sql_stable_timestamp(); 963 let later = now + Span::new().hours(2); 964 let tx = TxInAdmin { 965 amount: decimal("10"), 966 subject: "subject".to_owned(), 967 debtor: 
cyclos_payto("payto://cyclos/31000163100000000?receiver-name=name"), 968 metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), 969 }; 970 // Insert 971 assert_eq!( 972 db::register_tx_in_admin(&pool, &tx, &now) 973 .await 974 .expect("register tx in"), 975 AddIncomingResult::Success { 976 new: true, 977 row_id: 1, 978 valued_at: now 979 } 980 ); 981 // Idempotent 982 assert_eq!( 983 db::register_tx_in_admin(&pool, &tx, &later) 984 .await 985 .expect("register tx in"), 986 AddIncomingResult::Success { 987 new: false, 988 row_id: 1, 989 valued_at: now 990 } 991 ); 992 // Many 993 assert_eq!( 994 db::register_tx_in_admin( 995 &pool, 996 &TxInAdmin { 997 subject: "Other".to_owned(), 998 metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), 999 ..tx.clone() 1000 }, 1001 &later 1002 ) 1003 .await 1004 .expect("register tx in"), 1005 AddIncomingResult::Success { 1006 new: true, 1007 row_id: 2, 1008 valued_at: later 1009 } 1010 ); 1011 1012 // History 1013 assert_eq!( 1014 db::incoming_history(&pool, &History::default(), &CURRENCY, fake_listen) 1015 .await 1016 .unwrap() 1017 .len(), 1018 2 1019 ); 1020 } 1021 1022 #[tokio::test] 1023 async fn tx_out() { 1024 let (mut db, pool) = setup().await; 1025 1026 async fn routine(db: &mut PgConnection, first: &TxOutKind, second: &TxOutKind) { 1027 let transfer_id = sqlx::query("SELECT count(*) + 1 FROM tx_out") 1028 .try_map(|r: PgRow| r.try_get_u64(0)) 1029 .fetch_one(&mut *db) 1030 .await 1031 .unwrap(); 1032 let now = now_sql_stable_timestamp(); 1033 let later = now + Span::new().hours(2); 1034 let tx = TxOut { 1035 transfer_id, 1036 tx_id: Some(transfer_id), 1037 amount: decimal("10"), 1038 subject: "subject".to_owned(), 1039 creditor: cyclos_payto("payto://cyclos/31000163100000000?receiver-name=name"), 1040 valued_at: now, 1041 }; 1042 assert!(matches!( 1043 db::make_transfer( 1044 &mut *db, 1045 &TransferRequest { 1046 request_uid: HashCode::rand(), 1047 amount: amount("HUF:10"), 1048 exchange_base_url: 
url("https://exchange.test.com/"), 1049 wtid: ShortHashCode::rand(), 1050 credit_account: payto( 1051 "payto://cyclos/31000163100000000?receiver-name=name" 1052 ), 1053 }, 1054 &tx.creditor, 1055 &now 1056 ) 1057 .await 1058 .unwrap(), 1059 TransferResult::Success { .. } 1060 )); 1061 db::initiated_submit_success(&mut *db, 1, &Timestamp::now(), transfer_id) 1062 .await 1063 .expect("status success"); 1064 1065 // Insert 1066 assert_eq!( 1067 db::register_tx_out(&mut *db, &tx, first, &now) 1068 .await 1069 .expect("register tx out"), 1070 AddOutgoingResult { 1071 result: db::RegisterResult::known, 1072 row_id: transfer_id, 1073 } 1074 ); 1075 // Idempotent 1076 assert_eq!( 1077 db::register_tx_out( 1078 &mut *db, 1079 &TxOut { 1080 valued_at: later, 1081 ..tx.clone() 1082 }, 1083 first, 1084 &now 1085 ) 1086 .await 1087 .expect("register tx out"), 1088 AddOutgoingResult { 1089 result: db::RegisterResult::idempotent, 1090 row_id: transfer_id, 1091 } 1092 ); 1093 // Recovered 1094 assert_eq!( 1095 db::register_tx_out( 1096 &mut *db, 1097 &TxOut { 1098 transfer_id: transfer_id + 1, 1099 tx_id: Some(transfer_id + 1), 1100 valued_at: later, 1101 ..tx.clone() 1102 }, 1103 second, 1104 &now 1105 ) 1106 .await 1107 .expect("register tx out"), 1108 AddOutgoingResult { 1109 result: db::RegisterResult::recovered, 1110 row_id: transfer_id + 1, 1111 } 1112 ); 1113 } 1114 1115 // Empty db 1116 assert_eq!( 1117 db::outgoing_history(&pool, &History::default(), &CURRENCY, fake_listen) 1118 .await 1119 .unwrap(), 1120 Vec::new() 1121 ); 1122 1123 // Regular transaction 1124 routine(&mut db, &TxOutKind::Simple, &TxOutKind::Simple).await; 1125 1126 // Talerable transaction 1127 routine( 1128 &mut db, 1129 &TxOutKind::Talerable(OutgoingSubject( 1130 ShortHashCode::rand(), 1131 url("https://exchange.com"), 1132 )), 1133 &TxOutKind::Talerable(OutgoingSubject( 1134 ShortHashCode::rand(), 1135 url("https://exchange.com"), 1136 )), 1137 ) 1138 .await; 1139 1140 // Bounced transaction 1141 
routine(&mut db, &TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await; 1142 1143 // History 1144 assert_eq!( 1145 db::outgoing_history(&pool, &History::default(), &CURRENCY, fake_listen) 1146 .await 1147 .unwrap() 1148 .len(), 1149 2 1150 ); 1151 } 1152 1153 // TODO tx out failure 1154 1155 #[tokio::test] 1156 async fn transfer() { 1157 let (mut db, _) = setup().await; 1158 1159 // Empty db 1160 assert_eq!( 1161 db::transfer_by_id(&mut db, 0, &CURRENCY).await.unwrap(), 1162 None 1163 ); 1164 assert_eq!( 1165 db::transfer_page(&mut db, &None, &CURRENCY, &Page::default()) 1166 .await 1167 .unwrap(), 1168 Vec::new() 1169 ); 1170 1171 let req = TransferRequest { 1172 request_uid: HashCode::rand(), 1173 amount: amount("HUF:10"), 1174 exchange_base_url: url("https://exchange.test.com/"), 1175 wtid: ShortHashCode::rand(), 1176 credit_account: payto("payto://iban/HU02162000031000164800000000?receiver-name=name"), 1177 }; 1178 let payto = cyclos_payto("payto://cyclos/31000163100000000?receiver-name=name"); 1179 let now = now_sql_stable_timestamp(); 1180 let later = now + Span::new().hours(2); 1181 // Insert 1182 assert_eq!( 1183 db::make_transfer(&mut db, &req, &payto, &now) 1184 .await 1185 .expect("transfer"), 1186 TransferResult::Success { 1187 id: 1, 1188 initiated_at: now 1189 } 1190 ); 1191 // Idempotent 1192 assert_eq!( 1193 db::make_transfer(&mut db, &req, &payto, &later) 1194 .await 1195 .expect("transfer"), 1196 TransferResult::Success { 1197 id: 1, 1198 initiated_at: now 1199 } 1200 ); 1201 // Request UID reuse 1202 assert_eq!( 1203 db::make_transfer( 1204 &mut db, 1205 &TransferRequest { 1206 wtid: ShortHashCode::rand(), 1207 ..req.clone() 1208 }, 1209 &payto, 1210 &now 1211 ) 1212 .await 1213 .expect("transfer"), 1214 TransferResult::RequestUidReuse 1215 ); 1216 // wtid reuse 1217 assert_eq!( 1218 db::make_transfer( 1219 &mut db, 1220 &TransferRequest { 1221 request_uid: HashCode::rand(), 1222 ..req.clone() 1223 }, 1224 &payto, 1225 &now 1226 ) 1227 .await 
1228 .expect("transfer"), 1229 TransferResult::WtidReuse 1230 ); 1231 // Many 1232 assert_eq!( 1233 db::make_transfer( 1234 &mut db, 1235 &TransferRequest { 1236 request_uid: HashCode::rand(), 1237 wtid: ShortHashCode::rand(), 1238 ..req 1239 }, 1240 &payto, 1241 &later 1242 ) 1243 .await 1244 .expect("transfer"), 1245 TransferResult::Success { 1246 id: 2, 1247 initiated_at: later.into() 1248 } 1249 ); 1250 1251 // Get 1252 assert!( 1253 db::transfer_by_id(&mut db, 1, &CURRENCY) 1254 .await 1255 .unwrap() 1256 .is_some() 1257 ); 1258 assert!( 1259 db::transfer_by_id(&mut db, 2, &CURRENCY) 1260 .await 1261 .unwrap() 1262 .is_some() 1263 ); 1264 assert!( 1265 db::transfer_by_id(&mut db, 3, &CURRENCY) 1266 .await 1267 .unwrap() 1268 .is_none() 1269 ); 1270 assert_eq!( 1271 db::transfer_page(&mut db, &None, &CURRENCY, &Page::default()) 1272 .await 1273 .unwrap() 1274 .len(), 1275 2 1276 ); 1277 } 1278 1279 #[tokio::test] 1280 async fn bounce() { 1281 let (mut db, _) = setup().await; 1282 1283 let amount = decimal("10"); 1284 let payto = cyclos_payto("payto://cyclos/31000163100000000?receiver-name=name"); 1285 let now = now_sql_stable_timestamp(); 1286 1287 // Bounce 1288 assert_eq!( 1289 db::register_bounced_tx_in( 1290 &mut db, 1291 &TxIn { 1292 transfer_id: 12, 1293 tx_id: None, 1294 amount, 1295 subject: "subject".to_owned(), 1296 debtor: payto.clone(), 1297 valued_at: now 1298 }, 1299 22, 1300 "good reason", 1301 &now 1302 ) 1303 .await 1304 .expect("bounce"), 1305 BounceResult { 1306 tx_id: 1, 1307 tx_new: true 1308 } 1309 ); 1310 // Idempotent 1311 assert_eq!( 1312 db::register_bounced_tx_in( 1313 &mut db, 1314 &TxIn { 1315 transfer_id: 12, 1316 tx_id: None, 1317 amount: amount.clone(), 1318 subject: "subject".to_owned(), 1319 debtor: payto.clone(), 1320 valued_at: now 1321 }, 1322 22, 1323 "good reason", 1324 &now 1325 ) 1326 .await 1327 .expect("bounce"), 1328 BounceResult { 1329 tx_id: 1, 1330 tx_new: false 1331 } 1332 ); 1333 1334 // Many 1335 assert_eq!( 
db::register_bounced_tx_in(
                &mut db,
                &TxIn {
                    transfer_id: 13,
                    tx_id: None,
                    amount: amount.clone(),
                    subject: "subject".to_owned(),
                    debtor: payto.clone(),
                    valued_at: now
                },
                23,
                "good reason",
                &now
            )
            .await
            .expect("bounce"),
            BounceResult {
                tx_id: 2,
                tx_new: true
            }
        );
    }

    /// Exercises the status transitions of initiated transfers.
    ///
    /// Covers three situations: status updates on a transfer id for which no
    /// transfer has been created yet (they must not return an error), a
    /// transfer that is marked as permanently failed, and a transfer that is
    /// submitted successfully and only reported as `success` once the matching
    /// outgoing transaction has been registered.
    #[tokio::test]
    async fn status() {
        /// Loads transfer `id` and asserts both its state and its status message.
        async fn expect_state(
            conn: &mut PgConnection,
            id: u64,
            state: TransferState,
            note: Option<&str>,
        ) {
            let found = db::transfer_by_id(conn, id, &CURRENCY)
                .await
                .unwrap()
                .unwrap();
            let actual = (found.status, found.status_msg.as_deref());
            assert_eq!((state, note), actual);
        }

        let (mut db, _) = setup().await;
        let creditor = cyclos_payto("payto://cyclos/31000163100000000?receiver-name=name");

        // Unknown transfer: no transfer exists yet, both updates must still return Ok
        db::initiated_submit_permanent_failure(&mut db, 1, &Timestamp::now(), "msg")
            .await
            .unwrap();
        db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12)
            .await
            .unwrap();

        // Failure: pending -> permanent_failure, keeping the error message
        let failing_request = TransferRequest {
            request_uid: HashCode::rand(),
            amount: amount(format!("{}:1", *CURRENCY)),
            exchange_base_url: url("https://exchange.test.com/"),
            wtid: ShortHashCode::rand(),
            credit_account: payto(
                "payto://iban/HU02162000031000164800000000?receiver-name=name",
            ),
        };
        db::make_transfer(&mut db, &failing_request, &creditor, &Timestamp::now())
            .await
            .expect("transfer");
        expect_state(&mut db, 1, TransferState::pending, None).await;
        db::initiated_submit_permanent_failure(&mut db, 1, &Timestamp::now(), "error status")
            .await
            .unwrap();
        expect_state(
            &mut db,
            1,
            TransferState::permanent_failure,
            Some("error status"),
        )
        .await;

        // Success: stays pending after submission, succeeds once the tx out is registered
        let succeeding_request = TransferRequest {
            request_uid: HashCode::rand(),
            amount: amount(format!("{}:2", *CURRENCY)),
            exchange_base_url: url("https://exchange.test.com/"),
            wtid: ShortHashCode::rand(),
            credit_account: payto(
                "payto://iban/HU02162000031000164800000000?receiver-name=name",
            ),
        };
        db::make_transfer(&mut db, &succeeding_request, &creditor, &Timestamp::now())
            .await
            .expect("transfer");
        expect_state(&mut db, 2, TransferState::pending, None).await;
        db::initiated_submit_success(&mut db, 2, &Timestamp::now(), 3)
            .await
            .unwrap();
        expect_state(&mut db, 2, TransferState::pending, None).await;
        let outgoing = TxOut {
            transfer_id: 2,
            tx_id: Some(3),
            amount: decimal("2"),
            subject: String::new(),
            creditor,
            valued_at: Timestamp::now(),
        };
        db::register_tx_out(&mut db, &outgoing, &TxOutKind::Simple, &Timestamp::now())
            .await
            .unwrap();
        expect_state(&mut db, 2, TransferState::success, None).await;
    }

    /// Exercises `db::pending_batch`.
    ///
    /// Checks the number of returned transfers on an empty database, after a
    /// few transfers, after more transfers than the asserted batch size of 100,
    /// and after some transfers were marked as submitted or permanently failed.
    #[tokio::test]
    async fn batch() {
        let (mut db, _) = setup().await;
        let start = Timestamp::now();
        let creditor = cyclos_payto("payto://cyclos/31000163100000000?receiver-name=name");

        // Builds a fresh transfer request worth `value` units of the test currency
        let request_for = |value: u32| TransferRequest {
            request_uid: HashCode::rand(),
            amount: amount(format!("{}:{}", *CURRENCY, value)),
            exchange_base_url: url("https://exchange.test.com/"),
            wtid: ShortHashCode::rand(),
            credit_account: payto(
                "payto://iban/HU02162000031000164800000000?receiver-name=name",
            ),
        };

        // Empty db
        let pending_count = db::pending_batch(&mut db, &start)
            .await
            .expect("pending_batch")
            .len();
        assert_eq!(pending_count, 0);

        // Some transfers
        for value in 1..=3 {
            db::make_transfer(&mut db, &request_for(value), &creditor, &Timestamp::now())
                .await
                .expect("transfer");
        }
        let pending_count = db::pending_batch(&mut db, &start)
            .await
            .expect("pending_batch")
            .len();
        assert_eq!(pending_count, 3);

        // Max 100 txs in batch
        for value in 1..=100 {
            db::make_transfer(&mut db, &request_for(value), &creditor, &Timestamp::now())
                .await
                .expect("transfer");
        }
        let pending_count = db::pending_batch(&mut db, &start)
            .await
            .expect("pending_batch")
            .len();
        assert_eq!(pending_count, 100);

        // Skip uploaded
        // NOTE(review): eleven ids (0..=10) are updated but the count only drops by
        // ten; presumably id 0 matches no transfer, confirm against the schema.
        for id in 0..=10 {
            db::initiated_submit_success(&mut db, id, &Timestamp::now(), id)
                .await
                .expect("status success");
        }
        let pending_count = db::pending_batch(&mut db, &start)
            .await
            .expect("pending_batch")
            .len();
        assert_eq!(pending_count, 93);

        // Skip failed
        // Id 10 was already marked as submitted by the previous loop.
        for id in 10..=20 {
            db::initiated_submit_permanent_failure(&mut db, id, &Timestamp::now(), "failure")
                .await
                .expect("status failure");
        }
        let pending_count = db::pending_batch(&mut db, &start)
            .await
            .expect("pending_batch")
            .len();
        assert_eq!(pending_count, 83);
    }
}