db.rs (49234B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::fmt::Display; 18 19 use compact_str::CompactString; 20 use jiff::Timestamp; 21 use serde::{Serialize, de::DeserializeOwned}; 22 use sqlx::{PgConnection, PgPool, QueryBuilder, Row, postgres::PgRow}; 23 use taler_api::{ 24 db::{BindHelper, TypeHelper, history, page}, 25 serialized, 26 subject::{IncomingSubject, OutgoingSubject, fmt_out_subject}, 27 }; 28 use taler_common::{ 29 api_common::{HashCode, ShortHashCode}, 30 api_params::{History, Page}, 31 api_revenue::RevenueIncomingBankTransaction, 32 api_transfer::{RegistrationRequest, Unregistration}, 33 api_wire::{ 34 IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferState, 35 TransferStatus, 36 }, 37 config::Config, 38 db::IncomingType, 39 types::{ 40 amount::{Currency, Decimal}, 41 payto::{PaytoImpl as _, PaytoURI}, 42 }, 43 }; 44 use tokio::sync::watch::{Receiver, Sender}; 45 use url::Url; 46 47 use crate::{ 48 config::parse_db_cfg, 49 payto::{CyclosAccount, CyclosId, FullCyclosPayto}, 50 }; 51 52 const SCHEMA: &str = "cyclos"; 53 54 pub async fn pool(cfg: &Config) -> anyhow::Result<PgPool> { 55 let db = parse_db_cfg(cfg)?; 56 let pool = taler_common::db::pool(db.cfg, SCHEMA).await?; 57 Ok(pool) 58 } 59 60 pub async fn dbinit(cfg: &Config, reset: bool) -> 
anyhow::Result<PgPool> { 61 let db_cfg = parse_db_cfg(cfg)?; 62 let pool = taler_common::db::pool(db_cfg.cfg, SCHEMA).await?; 63 let mut db = pool.acquire().await?; 64 taler_common::db::dbinit(&mut db, db_cfg.sql_dir.as_ref(), SCHEMA, reset).await?; 65 Ok(pool) 66 } 67 68 pub async fn notification_listener( 69 pool: PgPool, 70 in_channel: Sender<i64>, 71 taler_in_channel: Sender<i64>, 72 out_channel: Sender<i64>, 73 taler_out_channel: Sender<i64>, 74 ) -> sqlx::Result<()> { 75 taler_api::notification::notification_listener!(&pool, 76 "tx_in" => (row_id: i64) { 77 in_channel.send_replace(row_id); 78 }, 79 "taler_in" => (row_id: i64) { 80 taler_in_channel.send_replace(row_id); 81 }, 82 "tx_out" => (row_id: i64) { 83 out_channel.send_replace(row_id); 84 }, 85 "taler_out" => (row_id: i64) { 86 taler_out_channel.send_replace(row_id); 87 } 88 ) 89 } 90 91 #[derive(Debug, Clone)] 92 pub struct TxIn { 93 pub transfer_id: i64, 94 pub tx_id: Option<i64>, 95 pub amount: Decimal, 96 pub subject: String, 97 pub debtor_id: i64, 98 pub debtor_name: CompactString, 99 pub valued_at: Timestamp, 100 } 101 102 impl Display for TxIn { 103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 104 let Self { 105 transfer_id, 106 tx_id, 107 amount, 108 subject, 109 valued_at, 110 debtor_id, 111 debtor_name, 112 } = self; 113 let tx_id = match tx_id { 114 Some(id) => format_args!(":{}", *id), 115 None => format_args!(""), 116 }; 117 write!( 118 f, 119 "{valued_at} {transfer_id}{tx_id} {amount} ({debtor_id} {debtor_name}) '{subject}'" 120 ) 121 } 122 } 123 124 #[derive(Debug, Clone)] 125 pub struct TxOut { 126 pub transfer_id: i64, 127 pub tx_id: Option<i64>, 128 pub amount: Decimal, 129 pub subject: String, 130 pub creditor_id: i64, 131 pub creditor_name: CompactString, 132 pub valued_at: Timestamp, 133 } 134 135 impl Display for TxOut { 136 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 137 let Self { 138 transfer_id, 139 tx_id, 140 amount, 141 
subject, 142 creditor_id, 143 creditor_name, 144 valued_at, 145 } = self; 146 let tx_id = match tx_id { 147 Some(id) => format_args!(":{}", *id), 148 None => format_args!(""), 149 }; 150 write!( 151 f, 152 "{valued_at} {transfer_id}{tx_id} {amount} ({creditor_id} {creditor_name}) '{subject}'" 153 ) 154 } 155 } 156 157 #[derive(Debug, PartialEq, Eq)] 158 pub struct Initiated { 159 pub id: i64, 160 pub amount: Decimal, 161 pub subject: String, 162 pub creditor_id: i64, 163 pub creditor_name: CompactString, 164 } 165 166 impl Display for Initiated { 167 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 168 let Self { 169 id, 170 amount, 171 subject, 172 creditor_id, 173 creditor_name, 174 } = self; 175 write!( 176 f, 177 "{id} {amount} ({creditor_id} {creditor_name}) '{subject}'" 178 ) 179 } 180 } 181 182 #[derive(Debug, Clone)] 183 pub struct TxInAdmin { 184 pub amount: Decimal, 185 pub subject: String, 186 pub debtor_id: i64, 187 pub debtor_name: CompactString, 188 pub metadata: IncomingSubject, 189 } 190 191 #[derive(Debug, PartialEq, Eq)] 192 pub enum AddIncomingResult { 193 Success { 194 new: bool, 195 pending: bool, 196 row_id: u64, 197 valued_at: Timestamp, 198 }, 199 ReservePubReuse, 200 UnknownMapping, 201 MappingReuse, 202 } 203 204 /// Lock the database for worker execution 205 pub async fn worker_lock(e: &mut PgConnection) -> sqlx::Result<bool> { 206 sqlx::query("SELECT pg_try_advisory_lock(42)") 207 .try_map(|r: PgRow| r.try_get(0)) 208 .fetch_one(e) 209 .await 210 } 211 212 pub async fn register_tx_in_admin( 213 db: &PgPool, 214 tx: &TxInAdmin, 215 now: &Timestamp, 216 ) -> sqlx::Result<AddIncomingResult> { 217 serialized!( 218 sqlx::query( 219 " 220 SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending 221 FROM register_tx_in(NULL, NULL, $1, $2, $3, $4, $5, $6, $7, $5) 222 ", 223 ) 224 .bind(tx.amount) 225 .bind(&tx.subject) 226 .bind(tx.debtor_id) 227 
.bind(&tx.debtor_name) 228 .bind_timestamp(now) 229 .bind(tx.metadata.ty()) 230 .bind(tx.metadata.key()) 231 .try_map(|r: PgRow| { 232 Ok(if r.try_get_flag(0)? { 233 AddIncomingResult::ReservePubReuse 234 } else if r.try_get_flag(1)? { 235 AddIncomingResult::MappingReuse 236 } else if r.try_get_flag(2)? { 237 AddIncomingResult::UnknownMapping 238 } else { 239 AddIncomingResult::Success { 240 row_id: r.try_get_u64(3)?, 241 valued_at: r.try_get_timestamp(4)?, 242 new: r.try_get(5)?, 243 pending: r.try_get(6)? 244 } 245 }) 246 }) 247 .fetch_one(db) 248 ) 249 } 250 251 pub async fn register_tx_in( 252 db: &mut PgConnection, 253 tx: &TxIn, 254 subject: &Option<IncomingSubject>, 255 now: &Timestamp, 256 ) -> sqlx::Result<AddIncomingResult> { 257 serialized!( 258 sqlx::query( 259 " 260 SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending 261 FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) 262 ", 263 ) 264 .bind(tx.transfer_id) 265 .bind(tx.tx_id) 266 .bind(tx.amount) 267 .bind(&tx.subject) 268 .bind(tx.debtor_id) 269 .bind(&tx.debtor_name) 270 .bind(tx.valued_at.as_microsecond()) 271 .bind(subject.as_ref().map(|it| it.ty())) 272 .bind(subject.as_ref().map(|it| it.key())) 273 .bind(now.as_microsecond()) 274 .try_map(|r: PgRow| { 275 Ok(if r.try_get_flag(0)? { 276 AddIncomingResult::ReservePubReuse 277 } else if r.try_get_flag(1)? { 278 AddIncomingResult::MappingReuse 279 } else if r.try_get_flag(2)? { 280 AddIncomingResult::UnknownMapping 281 } else { 282 AddIncomingResult::Success { 283 row_id: r.try_get_u64(3)?, 284 valued_at: r.try_get_timestamp(4)?, 285 new: r.try_get(5)?, 286 pending: r.try_get(6)? 
287 } 288 }) 289 }) 290 .fetch_one(&mut *db) 291 ) 292 } 293 294 #[derive(Debug)] 295 pub enum TxOutKind { 296 Simple, 297 Bounce(i64), 298 Talerable(OutgoingSubject), 299 } 300 301 #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)] 302 #[allow(non_camel_case_types)] 303 #[sqlx(type_name = "register_result")] 304 pub enum RegisterResult { 305 /// Already registered 306 idempotent, 307 /// Initiated transaction 308 known, 309 /// Recovered unknown outgoing transaction 310 recovered, 311 } 312 313 #[derive(Debug, PartialEq, Eq)] 314 pub struct AddOutgoingResult { 315 pub result: RegisterResult, 316 pub row_id: i64, 317 } 318 319 pub async fn register_tx_out( 320 db: &mut PgConnection, 321 tx: &TxOut, 322 kind: &TxOutKind, 323 now: &Timestamp, 324 ) -> sqlx::Result<AddOutgoingResult> { 325 serialized!({ 326 let query = sqlx::query( 327 " 328 SELECT out_result, out_tx_row_id 329 FROM register_tx_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) 330 ", 331 ) 332 .bind(tx.transfer_id) 333 .bind(tx.tx_id) 334 .bind(tx.amount) 335 .bind(&tx.subject) 336 .bind(tx.creditor_id) 337 .bind(&tx.creditor_name) 338 .bind_timestamp(&tx.valued_at); 339 let query = match kind { 340 TxOutKind::Simple => query 341 .bind(None::<&[u8]>) 342 .bind(None::<&str>) 343 .bind(None::<&str>) 344 .bind(None::<i64>), 345 TxOutKind::Bounce(bounced) => query 346 .bind(None::<&[u8]>) 347 .bind(None::<&str>) 348 .bind(None::<&str>) 349 .bind(*bounced), 350 TxOutKind::Talerable(subject) => query 351 .bind(&subject.wtid) 352 .bind(subject.exchange_base_url.as_str()) 353 .bind(&subject.metadata) 354 .bind(None::<i64>), 355 }; 356 query 357 .bind_timestamp(now) 358 .try_map(|r: PgRow| { 359 Ok(AddOutgoingResult { 360 result: r.try_get(0)?, 361 row_id: r.try_get(1)?, 362 }) 363 }) 364 .fetch_one(&mut *db) 365 }) 366 } 367 368 #[derive(Debug, PartialEq, Eq)] 369 pub enum TransferResult { 370 Success { id: u64, initiated_at: Timestamp }, 371 RequestUidReuse, 372 WtidReuse, 373 } 374 375 
#[derive(Debug, Clone)] 376 pub struct Transfer { 377 pub request_uid: HashCode, 378 pub amount: Decimal, 379 pub exchange_base_url: Url, 380 pub metadata: Option<CompactString>, 381 pub wtid: ShortHashCode, 382 pub creditor_id: i64, 383 pub creditor_name: CompactString, 384 } 385 386 pub async fn make_transfer( 387 db: &PgPool, 388 tx: &Transfer, 389 now: &Timestamp, 390 ) -> sqlx::Result<TransferResult> { 391 let subject = fmt_out_subject(&tx.wtid, &tx.exchange_base_url, tx.metadata.as_deref()); 392 serialized!( 393 sqlx::query( 394 " 395 SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at 396 FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8, $9) 397 ", 398 ) 399 .bind(&tx.request_uid) 400 .bind(&tx.wtid) 401 .bind(&subject) 402 .bind(tx.amount) 403 .bind(tx.exchange_base_url.as_str()) 404 .bind(&tx.metadata) 405 .bind(tx.creditor_id) 406 .bind(&tx.creditor_name) 407 .bind_timestamp(now) 408 .try_map(|r: PgRow| { 409 Ok(if r.try_get_flag(0)? { 410 TransferResult::RequestUidReuse 411 } else if r.try_get_flag(1)? 
{ 412 TransferResult::WtidReuse 413 } else { 414 TransferResult::Success { 415 id: r.try_get_u64(2)?, 416 initiated_at: r.try_get_timestamp(3)?, 417 } 418 }) 419 }) 420 .fetch_one(db) 421 ) 422 } 423 424 #[derive(Debug, PartialEq, Eq)] 425 pub struct BounceResult { 426 pub tx_id: u64, 427 pub tx_new: bool, 428 } 429 430 pub async fn register_bounced_tx_in( 431 db: &mut PgConnection, 432 tx: &TxIn, 433 chargeback_id: i64, 434 reason: &str, 435 now: &Timestamp, 436 ) -> sqlx::Result<BounceResult> { 437 serialized!( 438 sqlx::query( 439 " 440 SELECT out_tx_row_id, out_tx_new 441 FROM register_bounced_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) 442 ", 443 ) 444 .bind(tx.transfer_id) 445 .bind(tx.tx_id) 446 .bind(tx.amount) 447 .bind(&tx.subject) 448 .bind(tx.debtor_id) 449 .bind(&tx.debtor_name) 450 .bind_timestamp(&tx.valued_at) 451 .bind(chargeback_id) 452 .bind(reason) 453 .bind_timestamp(now) 454 .try_map(|r: PgRow| { 455 Ok(BounceResult { 456 tx_id: r.try_get_u64(0)?, 457 tx_new: r.try_get(1)?, 458 }) 459 }) 460 .fetch_one(&mut *db) 461 ) 462 } 463 464 pub async fn transfer_page( 465 db: &PgPool, 466 status: &Option<TransferState>, 467 currency: &Currency, 468 root: &CompactString, 469 params: &Page, 470 ) -> sqlx::Result<Vec<TransferListStatus>> { 471 page( 472 db, 473 "initiated_id", 474 params, 475 || { 476 let mut builder = QueryBuilder::new( 477 " 478 SELECT 479 initiated_id, 480 status, 481 amount, 482 credit_account, 483 credit_name, 484 initiated_at 485 FROM transfer 486 JOIN initiated USING (initiated_id) 487 WHERE 488 ", 489 ); 490 if let Some(status) = status { 491 builder.push(" status = ").push_bind(status).push(" AND "); 492 } 493 builder 494 }, 495 |r: PgRow| { 496 Ok(TransferListStatus { 497 row_id: r.try_get_safeu64(0)?, 498 status: r.try_get(1)?, 499 amount: r.try_get_amount(2, currency)?, 500 credit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?, 501 timestamp: r.try_get_timestamp(5)?.into(), 502 }) 503 }, 504 ) 505 .await 506 } 507 508 
/// List outgoing Taler transactions, keyed on `tx_out_id`.
///
/// Rows come from `taler_out` joined with `tx_out`; `debit_fee` is always `None`.
pub async fn outgoing_history(
    db: &PgPool,
    params: &History,
    currency: &Currency,
    root: &CompactString,
    listen: impl FnOnce() -> Receiver<i64>,
) -> sqlx::Result<Vec<OutgoingBankTransaction>> {
    history(
        db,
        "tx_out_id",
        params,
        listen,
        || {
            QueryBuilder::new(
                "
                SELECT
                    tx_out_id,
                    amount,
                    credit_account,
                    credit_name,
                    valued_at,
                    exchange_base_url,
                    metadata,
                    wtid
                FROM taler_out
                    JOIN tx_out USING (tx_out_id)
                WHERE
                ",
            )
        },
        |r: PgRow| {
            Ok(OutgoingBankTransaction {
                row_id: r.try_get_safeu64(0)?,
                amount: r.try_get_amount(1, currency)?,
                debit_fee: None,
                credit_account: r.try_get_cyclos_fullpaytouri(2, 3, root)?,
                date: r.try_get_timestamp(4)?.into(),
                exchange_base_url: r.try_get_url(5)?,
                metadata: r.try_get(6)?,
                wtid: r.try_get(7)?,
            })
        },
    )
    .await
}

/// List incoming Taler transactions, keyed on `tx_in_id`.
///
/// Rows come from `taler_in` joined with `tx_in`. The `type` column selects the variant:
/// the `metadata` column is decoded as `reserve_pub` for reserve rows and as `account_pub`
/// for kyc rows. `credit_fee` is always `None`.
///
/// # Errors
/// Besides database errors, a row of type `map` yields `sqlx::Error::Decode`: such rows are
/// not expected in this listing.
pub async fn incoming_history(
    db: &PgPool,
    params: &History,
    currency: &Currency,
    root: &CompactString,
    listen: impl FnOnce() -> Receiver<i64>,
) -> sqlx::Result<Vec<IncomingBankTransaction>> {
    history(
        db,
        "tx_in_id",
        params,
        listen,
        || {
            QueryBuilder::new(
                "
                SELECT
                    type,
                    tx_in_id,
                    amount,
                    debit_account,
                    debit_name,
                    valued_at,
                    metadata,
                    authorization_pub,
                    authorization_sig
                FROM taler_in
                    JOIN tx_in USING (tx_in_id)
                WHERE
                ",
            )
        },
        |r: PgRow| {
            let ty: IncomingType = r.try_get(0)?;
            match ty {
                IncomingType::reserve => Ok(IncomingBankTransaction::Reserve {
                    row_id: r.try_get_safeu64(1)?,
                    amount: r.try_get_amount(2, currency)?,
                    credit_fee: None,
                    debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
                    date: r.try_get_timestamp(5)?.into(),
                    reserve_pub: r.try_get(6)?,
                    authorization_pub: r.try_get(7)?,
                    authorization_sig: r.try_get(8)?,
                }),
                IncomingType::kyc => Ok(IncomingBankTransaction::Kyc {
                    row_id: r.try_get_safeu64(1)?,
                    amount: r.try_get_amount(2, currency)?,
                    credit_fee: None,
                    debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
                    date: r.try_get_timestamp(5)?.into(),
                    account_pub: r.try_get(6)?,
                    authorization_pub: r.try_get(7)?,
                    authorization_sig: r.try_get(8)?,
                }),
                // The value comes from the database: report an unexpected row as a decoding
                // error instead of panicking in the middle of a request
                IncomingType::map => Err(sqlx::Error::Decode(
                    "MAP are never listed in the history".into(),
                )),
            }
        },
    )
    .await
}

/// List every incoming transaction of `tx_in`, keyed on `tx_in_id`.
///
/// `credit_fee` is always `None`.
pub async fn revenue_history(
    db: &PgPool,
    params: &History,
    currency: &Currency,
    root: &CompactString,
    listen: impl FnOnce() -> Receiver<i64>,
) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> {
    history(
        db,
        "tx_in_id",
        params,
        listen,
        || {
            QueryBuilder::new(
                "
                SELECT
                    tx_in_id,
                    valued_at,
                    amount,
                    debit_account,
                    debit_name,
                    subject
                FROM tx_in
                WHERE
                ",
            )
        },
        |r: PgRow| {
            Ok(RevenueIncomingBankTransaction {
                row_id: r.try_get_safeu64(0)?,
                date: r.try_get_timestamp(1)?.into(),
                amount: r.try_get_amount(2, currency)?,
                credit_fee: None,
                debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
                subject: r.try_get(5)?,
            })
        },
    )
    .await
}

/// Get the status of the transfer whose `initiated_id` is `id`.
///
/// Returns `Ok(None)` when no such transfer exists, which includes every `id` that does not
/// fit the signed 64-bit value bound to the query.
pub async fn transfer_by_id(
    db: &PgPool,
    id: u64,
    currency: &Currency,
    root: &CompactString,
) -> sqlx::Result<Option<TransferStatus>> {
    // The id is bound as `i64`: a wrapping `as` cast would silently turn a too large id into
    // a negative one and look that up instead.
    let Ok(id) = i64::try_from(id) else {
        return Ok(None);
    };
    serialized!(
        sqlx::query(
            "
                SELECT
                    status,
                    status_msg,
                    amount,
                    exchange_base_url,
                    metadata,
                    wtid,
                    credit_account,
                    credit_name,
                    initiated_at
                FROM transfer
                    JOIN initiated USING (initiated_id)
                WHERE initiated_id = $1
            ",
        )
        .bind(id)
        .try_map(|r: PgRow| {
            Ok(TransferStatus {
                status: r.try_get(0)?,
                status_msg: r.try_get(1)?,
                amount: r.try_get_amount(2, currency)?,
                origin_exchange_url: r.try_get(3)?,
                metadata: r.try_get(4)?,
                wtid: r.try_get(5)?,
                credit_account: r.try_get_cyclos_fullpaytouri(6, 7, root)?,
                timestamp: r.try_get_timestamp(8)?.into(),
            })
        })
        .fetch_optional(db)
    )
}

/**
 * Get a batch of pending initiated transactions not attempted since [start]
 *
 * Only rows without a `tx_id` and with status `pending` are returned, at most 100 per call.
 */
pub async fn pending_batch(
    db: &mut PgConnection,
    start: &Timestamp,
) -> sqlx::Result<Vec<Initiated>> {
    serialized!(
        sqlx::query(
            "
                SELECT initiated_id, amount, subject, credit_account, credit_name
                FROM initiated
                WHERE tx_id IS NULL
                    AND status='pending'
                    AND (last_submitted IS NULL OR last_submitted < $1)
                LIMIT 100
            ",
        )
        .bind_timestamp(start)
        .try_map(|r: PgRow| {
            Ok(Initiated {
                id: r.try_get(0)?,
                amount: r.try_get(1)?,
                subject: r.try_get(2)?,
                creditor_id: r.try_get(3)?,
                creditor_name: r.try_get(4)?,
            })
        })
        .fetch_all(&mut *db)
    )
}

/**
 * Update status of a successful submitted initiated transaction
 *
 * The status is set to `pending`, the submission counter is incremented, and both the
 * submission time and the transaction id are recorded.
 */
pub async fn initiated_submit_success(
    db: &mut PgConnection,
    initiated_id: i64,
    timestamp: &Timestamp,
    tx_id: i64,
) -> sqlx::Result<()> {
    serialized!(
        sqlx::query(
            "
                UPDATE initiated
                SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, tx_id=$2
                WHERE initiated_id=$3
            "
        )
        .bind_timestamp(timestamp)
        .bind(tx_id)
        .bind(initiated_id)
        .execute(&mut *db)
    )?;
    Ok(())
}

/**
 * Update status of a permanently failed initiated transaction
 *
 * The status becomes `permanent_failure` and `msg` is stored as the status message.
 */
pub async fn initiated_submit_permanent_failure(
    db: &mut PgConnection,
    initiated_id: i64,
    msg: &str,
) -> sqlx::Result<()> {
    serialized!(
        sqlx::query(
            "
                UPDATE initiated
                SET status='permanent_failure', status_msg=$1
                WHERE initiated_id=$2
            ",
        )
        .bind(msg)
        .bind(initiated_id)
        .execute(&mut *db)
    )?;
    Ok(())
}

/// Outcome of [`initiated_chargeback_failure`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChargebackFailureResult {
    /// No row was returned, or the returned initiated id was `0`
    Unknown,
    /// `out_new` was set for the returned initiated id
    Known(u64),
    /// `out_new` was not set for the returned initiated id
    Idempotent(u64),
}

/**
 * Update status of a charged back initiated transaction
 *
 * Calls the `register_charge_back_failure` SQL function with `transfer_id` and maps its
 * optional row to a [ChargebackFailureResult].
 */
pub async fn initiated_chargeback_failure(
    db: &mut PgConnection,
    transfer_id: i64,
) -> sqlx::Result<ChargebackFailureResult> {
    Ok(serialized!(
        sqlx::query("SELECT out_initiated_id, out_new FROM register_charge_back_failure($1)")
            .bind(transfer_id)
            .try_map(|r: PgRow| {
                let id = r.try_get_u64(0)?;
                Ok(if id == 0 {
                    ChargebackFailureResult::Unknown
                } else if r.try_get(1)? {
                    ChargebackFailureResult::Known(id)
                } else {
                    ChargebackFailureResult::Idempotent(id)
                })
            })
            .fetch_optional(&mut *db)
    )?
    .unwrap_or(ChargebackFailureResult::Unknown))
}

/**
 * Get JSON value from KV table
 *
 * Returns `None` when `key` has no row; a stored value that cannot be decoded as JSON for `T`
 * is reported as an error.
 */
pub async fn kv_get<T: DeserializeOwned + Unpin + Send>(
    db: &mut PgConnection,
    key: &str,
) -> sqlx::Result<Option<T>> {
    serialized!(
        sqlx::query("SELECT value FROM kv WHERE key=$1")
            .bind(key)
            .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0))
            .fetch_optional(&mut *db)
    )
}

/**
 * Set JSON value in KV table
 *
 * Inserts the value, or overwrites the stored one when `key` already exists.
 */
pub async fn kv_set<T: Serialize>(db: &mut PgConnection, key: &str, value: &T) -> sqlx::Result<()> {
    serialized!(
        sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value")
            .bind(key)
            .bind(sqlx::types::Json(value))
            .execute(&mut *db)
    )?;
    Ok(())
}

/// Outcome of [`transfer_register`].
#[derive(Debug, PartialEq, Eq)]
pub enum RegistrationResult {
    /// `out_reserve_pub_reuse` was not set
    Success,
    /// `out_reserve_pub_reuse` was set
    ReservePubReuse,
}

/// Register a prepared transfer through the `register_prepared_transfers` SQL function.
///
/// The request type is converted to an [`IncomingType`] and the current time
/// (`Timestamp::now()`) is bound as the last argument.
pub async fn transfer_register(
    db: &PgPool,
    req: &RegistrationRequest,
) -> sqlx::Result<RegistrationResult> {
    let ty: IncomingType = req.r#type.into();
    serialized!(
        sqlx::query(
            "SELECT out_reserve_pub_reuse FROM register_prepared_transfers($1,$2,$3,$4,$5,$6)"
        )
        .bind(ty)
        .bind(&req.account_pub)
        .bind(&req.authorization_pub)
        .bind(&req.authorization_sig)
        .bind(req.recurrent)
        .bind_timestamp(&Timestamp::now())
        .try_map(|r: PgRow| {
            Ok(if r.try_get_flag("out_reserve_pub_reuse")? {
                RegistrationResult::ReservePubReuse
            } else {
                RegistrationResult::Success
            })
        })
        .fetch_one(db)
    )
}

/// Delete the prepared transfers registered for `req.authorization_pub`.
///
/// Returns the `out_found` flag of the `delete_prepared_transfers` SQL function.
pub async fn transfer_unregister(db: &PgPool, req: &Unregistration) -> sqlx::Result<bool> {
    serialized!(
        sqlx::query("SELECT out_found FROM delete_prepared_transfers($1)")
            .bind(&req.authorization_pub)
            .try_map(|r: PgRow| r.try_get_flag("out_found"))
            .fetch_one(db)
    )
}

/// Row helpers building Cyclos payto values from an account id column and a name column.
pub trait CyclosTypeHelper {
    /// Read the account id from column `idx` and the name from column `name`, and combine
    /// them with `root` into a [`FullCyclosPayto`].
    fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>(
        &self,
        idx: I,
        name: I,
        root: &CompactString,
    ) -> sqlx::Result<FullCyclosPayto>;
    /// Same inputs as [`CyclosTypeHelper::try_get_cyclos_fullpayto`], rendered as a full
    /// payto URI.
    fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>(
        &self,
        idx: I,
        name: I,
        root: &CompactString,
    ) -> sqlx::Result<PaytoURI>;
}

impl CyclosTypeHelper for PgRow {
    fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>(
        &self,
        idx: I,
        name: I,
        root: &CompactString,
    ) -> sqlx::Result<FullCyclosPayto> {
        let idx = self.try_get(idx)?;
        let name = self.try_get(name)?;
        Ok(FullCyclosPayto::new(
            CyclosAccount {
                id: CyclosId(idx),
                root: root.clone(),
            },
            name,
        ))
    }
    fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>(
        &self,
        idx: I,
        name: I,
        root: &CompactString,
    ) -> sqlx::Result<PaytoURI> {
        let idx = self.try_get(idx)?;
        let name = self.try_get(name)?;
        Ok(CyclosAccount {
            id: CyclosId(idx),
            root: root.clone(),
        }
        .as_full_payto(name))
    }
}

#[cfg(test)]
mod test {
    use std::sync::LazyLock;

    use compact_str::CompactString;
    use jiff::{Span, Timestamp};
    use serde_json::json;
    use sqlx::{PgPool, Postgres, Row as _, pool::PoolConnection, postgres::PgRow};
    use taler_api::{
        db::TypeHelper,
        notification::dummy_listen,
        subject::{IncomingSubject, OutgoingSubject},
    };
    use taler_common::{
        api_common::{EddsaPublicKey, HashCode,
ShortHashCode}, 927 api_params::{History, Page}, 928 api_wire::TransferState, 929 types::{ 930 amount::{Currency, decimal}, 931 url, 932 utils::now_sql_stable_ts, 933 }, 934 }; 935 936 use crate::{ 937 constants::CONFIG_SOURCE, 938 db::{ 939 self, AddIncomingResult, AddOutgoingResult, BounceResult, ChargebackFailureResult, 940 Transfer, TransferResult, TxIn, TxInAdmin, TxOut, TxOutKind, 941 }, 942 }; 943 944 pub static CURRENCY: LazyLock<Currency> = LazyLock::new(|| "TEST".parse().unwrap()); 945 pub const ROOT: CompactString = CompactString::const_new("localhost"); 946 947 async fn setup() -> (PoolConnection<Postgres>, PgPool) { 948 taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await 949 } 950 951 #[tokio::test] 952 async fn kv() { 953 let (mut db, _) = setup().await; 954 955 let value = json!({ 956 "name": "Mr Smith", 957 "no way": 32 958 }); 959 960 assert_eq!( 961 db::kv_get::<serde_json::Value>(&mut db, "value") 962 .await 963 .unwrap(), 964 None 965 ); 966 db::kv_set(&mut db, "value", &value).await.unwrap(); 967 db::kv_set(&mut db, "value", &value).await.unwrap(); 968 assert_eq!( 969 db::kv_get::<serde_json::Value>(&mut db, "value") 970 .await 971 .unwrap(), 972 Some(value) 973 ); 974 } 975 976 #[tokio::test] 977 async fn tx_in() { 978 let (mut db, pool) = setup().await; 979 980 let mut routine = async |first: &Option<IncomingSubject>, 981 second: &Option<IncomingSubject>| { 982 let id = sqlx::query("SELECT count(*) + 1 FROM tx_in") 983 .try_map(|r: PgRow| r.try_get_u64(0)) 984 .fetch_one(&mut *db) 985 .await 986 .unwrap(); 987 let now = now_sql_stable_ts(); 988 let later = now + Span::new().hours(2); 989 let tx = TxIn { 990 transfer_id: now.as_microsecond() as i64, 991 tx_id: None, 992 amount: decimal("10"), 993 subject: "subject".to_owned(), 994 debtor_id: 31000163100000000, 995 debtor_name: "Name".into(), 996 valued_at: now, 997 }; 998 // Insert 999 assert_eq!( 1000 db::register_tx_in(&mut db, &tx, first, &now) 1001 .await 1002 .expect("register tx 
in"), 1003 AddIncomingResult::Success { 1004 new: true, 1005 pending: false, 1006 row_id: id, 1007 valued_at: now, 1008 } 1009 ); 1010 // Idempotent 1011 assert_eq!( 1012 db::register_tx_in( 1013 &mut db, 1014 &TxIn { 1015 valued_at: later, 1016 ..tx.clone() 1017 }, 1018 first, 1019 &now 1020 ) 1021 .await 1022 .expect("register tx in"), 1023 AddIncomingResult::Success { 1024 new: false, 1025 pending: false, 1026 row_id: id, 1027 valued_at: now 1028 } 1029 ); 1030 // Many 1031 assert_eq!( 1032 db::register_tx_in( 1033 &mut db, 1034 &TxIn { 1035 transfer_id: later.as_microsecond() as i64, 1036 valued_at: later, 1037 ..tx 1038 }, 1039 second, 1040 &now 1041 ) 1042 .await 1043 .expect("register tx in"), 1044 AddIncomingResult::Success { 1045 new: true, 1046 pending: false, 1047 row_id: id + 1, 1048 valued_at: later 1049 } 1050 ); 1051 }; 1052 1053 // Empty db 1054 assert_eq!( 1055 db::revenue_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) 1056 .await 1057 .unwrap(), 1058 Vec::new() 1059 ); 1060 assert_eq!( 1061 db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) 1062 .await 1063 .unwrap(), 1064 Vec::new() 1065 ); 1066 1067 // Regular transaction 1068 routine(&None, &None).await; 1069 1070 // Reserve transaction 1071 routine( 1072 &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), 1073 &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), 1074 ) 1075 .await; 1076 1077 // Kyc transaction 1078 routine( 1079 &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), 1080 &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), 1081 ) 1082 .await; 1083 1084 // History 1085 assert_eq!( 1086 db::revenue_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) 1087 .await 1088 .unwrap() 1089 .len(), 1090 6 1091 ); 1092 assert_eq!( 1093 db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) 1094 .await 1095 .unwrap() 1096 .len(), 1097 4 1098 ); 1099 } 1100 1101 #[tokio::test] 1102 async fn 
tx_in_admin() { 1103 let (_, pool) = setup().await; 1104 1105 // Empty db 1106 assert_eq!( 1107 db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) 1108 .await 1109 .unwrap(), 1110 Vec::new() 1111 ); 1112 1113 let now = now_sql_stable_ts(); 1114 let later = now + Span::new().hours(2); 1115 let tx = TxInAdmin { 1116 amount: decimal("10"), 1117 subject: "subject".to_owned(), 1118 debtor_id: 31000163100000000, 1119 debtor_name: "Name".into(), 1120 metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), 1121 }; 1122 // Insert 1123 assert_eq!( 1124 db::register_tx_in_admin(&pool, &tx, &now) 1125 .await 1126 .expect("register tx in"), 1127 AddIncomingResult::Success { 1128 new: true, 1129 pending: false, 1130 row_id: 1, 1131 valued_at: now 1132 } 1133 ); 1134 // Many 1135 assert_eq!( 1136 db::register_tx_in_admin( 1137 &pool, 1138 &TxInAdmin { 1139 subject: "Other".to_owned(), 1140 metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), 1141 ..tx.clone() 1142 }, 1143 &later 1144 ) 1145 .await 1146 .expect("register tx in"), 1147 AddIncomingResult::Success { 1148 new: true, 1149 pending: false, 1150 row_id: 2, 1151 valued_at: later 1152 } 1153 ); 1154 1155 // History 1156 assert_eq!( 1157 db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) 1158 .await 1159 .unwrap() 1160 .len(), 1161 2 1162 ); 1163 } 1164 1165 #[tokio::test] 1166 async fn tx_out() { 1167 let (mut db, pool) = setup().await; 1168 1169 let mut routine = async |first: &TxOutKind, second: &TxOutKind| { 1170 let transfer_id = sqlx::query("SELECT count(*) + 1 FROM tx_out") 1171 .try_map(|r: PgRow| r.try_get(0)) 1172 .fetch_one(&mut *db) 1173 .await 1174 .unwrap(); 1175 let now = now_sql_stable_ts(); 1176 let later = now + Span::new().hours(2); 1177 let tx = TxOut { 1178 transfer_id, 1179 tx_id: Some(transfer_id), 1180 amount: decimal("10"), 1181 subject: "subject".to_owned(), 1182 creditor_id: 31000163100000000, 1183 creditor_name: "Name".into(), 
1184 valued_at: now, 1185 }; 1186 assert!(matches!( 1187 db::make_transfer( 1188 &pool, 1189 &Transfer { 1190 request_uid: HashCode::rand(), 1191 amount: decimal("10"), 1192 exchange_base_url: url("https://exchange.test.com/"), 1193 metadata: None, 1194 wtid: ShortHashCode::rand(), 1195 creditor_id: 31000163100000000, 1196 creditor_name: "Name".into() 1197 }, 1198 &now 1199 ) 1200 .await 1201 .unwrap(), 1202 TransferResult::Success { .. } 1203 )); 1204 db::initiated_submit_success(&mut db, 1, &Timestamp::now(), transfer_id) 1205 .await 1206 .expect("status success"); 1207 1208 // Insert 1209 assert_eq!( 1210 db::register_tx_out(&mut db, &tx, first, &now) 1211 .await 1212 .expect("register tx out"), 1213 AddOutgoingResult { 1214 result: db::RegisterResult::known, 1215 row_id: transfer_id, 1216 } 1217 ); 1218 // Idempotent 1219 assert_eq!( 1220 db::register_tx_out( 1221 &mut db, 1222 &TxOut { 1223 valued_at: later, 1224 ..tx.clone() 1225 }, 1226 first, 1227 &now 1228 ) 1229 .await 1230 .expect("register tx out"), 1231 AddOutgoingResult { 1232 result: db::RegisterResult::idempotent, 1233 row_id: transfer_id, 1234 } 1235 ); 1236 // Recovered 1237 assert_eq!( 1238 db::register_tx_out( 1239 &mut db, 1240 &TxOut { 1241 transfer_id: transfer_id + 1, 1242 tx_id: Some(transfer_id + 1), 1243 valued_at: later, 1244 ..tx.clone() 1245 }, 1246 second, 1247 &now 1248 ) 1249 .await 1250 .expect("register tx out"), 1251 AddOutgoingResult { 1252 result: db::RegisterResult::recovered, 1253 row_id: transfer_id + 1, 1254 } 1255 ); 1256 }; 1257 1258 // Empty db 1259 assert_eq!( 1260 db::outgoing_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) 1261 .await 1262 .unwrap(), 1263 Vec::new() 1264 ); 1265 1266 // Regular transaction 1267 routine(&TxOutKind::Simple, &TxOutKind::Simple).await; 1268 1269 // Talerable transaction 1270 routine( 1271 &TxOutKind::Talerable(OutgoingSubject::rand()), 1272 &TxOutKind::Talerable(OutgoingSubject::rand()), 1273 ) 1274 .await; 1275 1276 
// Bounced transaction 1277 routine(&TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await; 1278 1279 // History 1280 assert_eq!( 1281 db::outgoing_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) 1282 .await 1283 .unwrap() 1284 .len(), 1285 2 1286 ); 1287 } 1288 1289 // TODO tx out failure 1290 1291 #[tokio::test] 1292 async fn transfer() { 1293 let (_, pool) = setup().await; 1294 1295 // Empty db 1296 assert_eq!( 1297 db::transfer_by_id(&pool, 0, &CURRENCY, &ROOT) 1298 .await 1299 .unwrap(), 1300 None 1301 ); 1302 assert_eq!( 1303 db::transfer_page(&pool, &None, &CURRENCY, &ROOT, &Page::default()) 1304 .await 1305 .unwrap(), 1306 Vec::new() 1307 ); 1308 1309 let req = Transfer { 1310 request_uid: HashCode::rand(), 1311 amount: decimal("10"), 1312 exchange_base_url: url("https://exchange.test.com/"), 1313 metadata: None, 1314 wtid: ShortHashCode::rand(), 1315 creditor_id: 31000163100000000, 1316 creditor_name: "Name".into(), 1317 }; 1318 let now = now_sql_stable_ts(); 1319 let later = now + Span::new().hours(2); 1320 // Insert 1321 assert_eq!( 1322 db::make_transfer(&pool, &req, &now) 1323 .await 1324 .expect("transfer"), 1325 TransferResult::Success { 1326 id: 1, 1327 initiated_at: now 1328 } 1329 ); 1330 // Idempotent 1331 assert_eq!( 1332 db::make_transfer(&pool, &req, &later) 1333 .await 1334 .expect("transfer"), 1335 TransferResult::Success { 1336 id: 1, 1337 initiated_at: now 1338 } 1339 ); 1340 // Request UID reuse 1341 assert_eq!( 1342 db::make_transfer( 1343 &pool, 1344 &Transfer { 1345 wtid: ShortHashCode::rand(), 1346 ..req.clone() 1347 }, 1348 &now 1349 ) 1350 .await 1351 .expect("transfer"), 1352 TransferResult::RequestUidReuse 1353 ); 1354 // wtid reuse 1355 assert_eq!( 1356 db::make_transfer( 1357 &pool, 1358 &Transfer { 1359 request_uid: HashCode::rand(), 1360 ..req.clone() 1361 }, 1362 &now 1363 ) 1364 .await 1365 .expect("transfer"), 1366 TransferResult::WtidReuse 1367 ); 1368 // Many 1369 assert_eq!( 1370 db::make_transfer( 
1371 &pool, 1372 &Transfer { 1373 request_uid: HashCode::rand(), 1374 wtid: ShortHashCode::rand(), 1375 ..req 1376 }, 1377 &later 1378 ) 1379 .await 1380 .expect("transfer"), 1381 TransferResult::Success { 1382 id: 2, 1383 initiated_at: later 1384 } 1385 ); 1386 1387 // Get 1388 assert!( 1389 db::transfer_by_id(&pool, 1, &CURRENCY, &ROOT) 1390 .await 1391 .unwrap() 1392 .is_some() 1393 ); 1394 assert!( 1395 db::transfer_by_id(&pool, 2, &CURRENCY, &ROOT) 1396 .await 1397 .unwrap() 1398 .is_some() 1399 ); 1400 assert!( 1401 db::transfer_by_id(&pool, 3, &CURRENCY, &ROOT) 1402 .await 1403 .unwrap() 1404 .is_none() 1405 ); 1406 assert_eq!( 1407 db::transfer_page(&pool, &None, &CURRENCY, &ROOT, &Page::default()) 1408 .await 1409 .unwrap() 1410 .len(), 1411 2 1412 ); 1413 } 1414 1415 #[tokio::test] 1416 async fn bounce() { 1417 let (mut db, _) = setup().await; 1418 1419 let amount = decimal("10"); 1420 let now = now_sql_stable_ts(); 1421 1422 // Bounce 1423 assert_eq!( 1424 db::register_bounced_tx_in( 1425 &mut db, 1426 &TxIn { 1427 transfer_id: 12, 1428 tx_id: None, 1429 amount, 1430 subject: "subject".to_owned(), 1431 debtor_id: 31000163100000000, 1432 debtor_name: "Name".into(), 1433 valued_at: now 1434 }, 1435 22, 1436 "good reason", 1437 &now 1438 ) 1439 .await 1440 .expect("bounce"), 1441 BounceResult { 1442 tx_id: 1, 1443 tx_new: true 1444 } 1445 ); 1446 // Idempotent 1447 assert_eq!( 1448 db::register_bounced_tx_in( 1449 &mut db, 1450 &TxIn { 1451 transfer_id: 12, 1452 tx_id: None, 1453 amount, 1454 subject: "subject".to_owned(), 1455 debtor_id: 31000163100000000, 1456 debtor_name: "Name".into(), 1457 valued_at: now 1458 }, 1459 22, 1460 "good reason", 1461 &now 1462 ) 1463 .await 1464 .expect("bounce"), 1465 BounceResult { 1466 tx_id: 1, 1467 tx_new: false 1468 } 1469 ); 1470 1471 // Many 1472 assert_eq!( 1473 db::register_bounced_tx_in( 1474 &mut db, 1475 &TxIn { 1476 transfer_id: 13, 1477 tx_id: None, 1478 amount, 1479 subject: "subject".to_owned(), 1480 
debtor_id: 31000163100000000, 1481 debtor_name: "Name".into(), 1482 valued_at: now 1483 }, 1484 23, 1485 "good reason", 1486 &now 1487 ) 1488 .await 1489 .expect("bounce"), 1490 BounceResult { 1491 tx_id: 2, 1492 tx_new: true 1493 } 1494 ); 1495 } 1496 1497 #[tokio::test] 1498 async fn status() { 1499 let (mut db, pool) = setup().await; 1500 1501 let check_status = async |id: u64, status: TransferState, msg: Option<&str>| { 1502 let transfer = db::transfer_by_id(&pool, id, &CURRENCY, &ROOT) 1503 .await 1504 .unwrap() 1505 .unwrap(); 1506 assert_eq!( 1507 (status, msg), 1508 (transfer.status, transfer.status_msg.as_deref()) 1509 ); 1510 }; 1511 1512 // Unknown transfer 1513 db::initiated_submit_permanent_failure(&mut db, 1, "msg") 1514 .await 1515 .unwrap(); 1516 db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12) 1517 .await 1518 .unwrap(); 1519 assert_eq!( 1520 db::initiated_chargeback_failure(&mut db, 1).await.unwrap(), 1521 ChargebackFailureResult::Unknown 1522 ); 1523 1524 // Failure 1525 db::make_transfer( 1526 &pool, 1527 &Transfer { 1528 request_uid: HashCode::rand(), 1529 amount: decimal("1"), 1530 exchange_base_url: url("https://exchange.test.com/"), 1531 metadata: None, 1532 wtid: ShortHashCode::rand(), 1533 creditor_id: 31000163100000000, 1534 creditor_name: "Name".into(), 1535 }, 1536 &Timestamp::now(), 1537 ) 1538 .await 1539 .expect("transfer"); 1540 check_status(1, TransferState::pending, None).await; 1541 db::initiated_submit_permanent_failure(&mut db, 1, "error status") 1542 .await 1543 .unwrap(); 1544 check_status(1, TransferState::permanent_failure, Some("error status")).await; 1545 1546 // Success 1547 db::make_transfer( 1548 &pool, 1549 &Transfer { 1550 request_uid: HashCode::rand(), 1551 amount: decimal("1"), 1552 exchange_base_url: url("https://exchange.test.com/"), 1553 metadata: None, 1554 wtid: ShortHashCode::rand(), 1555 creditor_id: 31000163100000000, 1556 creditor_name: "Name".into(), 1557 }, 1558 &Timestamp::now(), 1559 ) 
1560 .await 1561 .expect("transfer"); 1562 check_status(2, TransferState::pending, None).await; 1563 db::initiated_submit_success(&mut db, 2, &Timestamp::now(), 3) 1564 .await 1565 .unwrap(); 1566 check_status(2, TransferState::pending, None).await; 1567 db::register_tx_out( 1568 &mut db, 1569 &TxOut { 1570 transfer_id: 5, 1571 tx_id: Some(3), 1572 amount: decimal("2"), 1573 subject: "".to_string(), 1574 creditor_id: 31000163100000000, 1575 creditor_name: "Name".into(), 1576 valued_at: Timestamp::now(), 1577 }, 1578 &TxOutKind::Simple, 1579 &Timestamp::now(), 1580 ) 1581 .await 1582 .unwrap(); 1583 check_status(2, TransferState::success, None).await; 1584 1585 // Chargeback 1586 assert_eq!( 1587 db::initiated_chargeback_failure(&mut db, 5).await.unwrap(), 1588 ChargebackFailureResult::Known(2) 1589 ); 1590 check_status(2, TransferState::late_failure, Some("charged back")).await; 1591 assert_eq!( 1592 db::initiated_chargeback_failure(&mut db, 5).await.unwrap(), 1593 ChargebackFailureResult::Idempotent(2) 1594 ); 1595 } 1596 1597 #[tokio::test] 1598 async fn batch() { 1599 let (mut db, pool) = setup().await; 1600 let start = Timestamp::now(); 1601 1602 // Empty db 1603 let pendings = db::pending_batch(&mut db, &start) 1604 .await 1605 .expect("pending_batch"); 1606 assert_eq!(pendings.len(), 0); 1607 1608 // Some transfers 1609 for i in 0..3 { 1610 db::make_transfer( 1611 &pool, 1612 &Transfer { 1613 request_uid: HashCode::rand(), 1614 amount: decimal(format!("{}", i + 1)), 1615 exchange_base_url: url("https://exchange.test.com/"), 1616 metadata: None, 1617 wtid: ShortHashCode::rand(), 1618 creditor_id: 31000163100000000, 1619 creditor_name: "Name".into(), 1620 }, 1621 &Timestamp::now(), 1622 ) 1623 .await 1624 .expect("transfer"); 1625 } 1626 let pendings = db::pending_batch(&mut db, &start) 1627 .await 1628 .expect("pending_batch"); 1629 assert_eq!(pendings.len(), 3); 1630 1631 // Max 100 txs in batch 1632 for i in 0..100 { 1633 db::make_transfer( 1634 &pool, 1635 
&Transfer { 1636 request_uid: HashCode::rand(), 1637 amount: decimal(format!("{}", i + 1)), 1638 exchange_base_url: url("https://exchange.test.com/"), 1639 metadata: None, 1640 wtid: ShortHashCode::rand(), 1641 creditor_id: 31000163100000000, 1642 creditor_name: "Name".into(), 1643 }, 1644 &Timestamp::now(), 1645 ) 1646 .await 1647 .expect("transfer"); 1648 } 1649 let pendings = db::pending_batch(&mut db, &start) 1650 .await 1651 .expect("pending_batch"); 1652 assert_eq!(pendings.len(), 100); 1653 1654 // Skip uploaded 1655 for i in 0..=10 { 1656 db::initiated_submit_success(&mut db, i, &Timestamp::now(), i) 1657 .await 1658 .expect("status success"); 1659 } 1660 let pendings = db::pending_batch(&mut db, &start) 1661 .await 1662 .expect("pending_batch"); 1663 assert_eq!(pendings.len(), 93); 1664 1665 // Skip failed 1666 for i in 0..=10 { 1667 db::initiated_submit_permanent_failure(&mut db, 10 + i, "failure") 1668 .await 1669 .expect("status failure"); 1670 } 1671 let pendings = db::pending_batch(&mut db, &start) 1672 .await 1673 .expect("pending_batch"); 1674 assert_eq!(pendings.len(), 83); 1675 } 1676 }