db.rs (49062B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::fmt::Display; 18 19 use compact_str::CompactString; 20 use jiff::Timestamp; 21 use serde::{Serialize, de::DeserializeOwned}; 22 use sqlx::{PgConnection, PgPool, QueryBuilder, Row, postgres::PgRow}; 23 use taler_api::{ 24 db::{BindHelper, TypeHelper, history, page}, 25 serialized, 26 subject::{IncomingSubject, OutgoingSubject, fmt_out_subject}, 27 }; 28 use taler_common::{ 29 api::{ 30 HashCode, ShortHashCode, 31 params::{History, Page}, 32 prepared::{RegistrationRequest, Unregistration}, 33 revenue::RevenueIncomingBankTransaction, 34 wire::{ 35 IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferState, 36 TransferStatus, 37 }, 38 }, 39 config::Config, 40 db::IncomingType, 41 types::{ 42 amount::{Currency, Decimal}, 43 payto::{PaytoImpl as _, PaytoURI}, 44 }, 45 }; 46 use tokio::sync::watch::{Receiver, Sender}; 47 use url::Url; 48 49 use crate::{ 50 config::parse_db_cfg, 51 payto::{CyclosAccount, CyclosId, FullCyclosPayto}, 52 }; 53 54 const SCHEMA: &str = "cyclos"; 55 56 pub async fn pool(cfg: &Config) -> anyhow::Result<PgPool> { 57 let db = parse_db_cfg(cfg)?; 58 let pool = taler_common::db::pool(db.cfg, SCHEMA).await?; 59 Ok(pool) 60 } 61 62 pub async fn dbinit(cfg: &Config, reset: bool) -> 
anyhow::Result<PgPool> { 63 let db_cfg = parse_db_cfg(cfg)?; 64 let pool = taler_common::db::pool(db_cfg.cfg, SCHEMA).await?; 65 let mut db = pool.acquire().await?; 66 taler_common::db::dbinit(&mut db, db_cfg.sql_dir.as_ref(), SCHEMA, reset).await?; 67 Ok(pool) 68 } 69 70 pub async fn notification_listener( 71 pool: PgPool, 72 in_channel: Sender<i64>, 73 taler_in_channel: Sender<i64>, 74 out_channel: Sender<i64>, 75 taler_out_channel: Sender<i64>, 76 ) { 77 taler_api::notification::notification_listener!(&pool, 78 "tx_in" => (row_id: i64) { 79 in_channel.send_replace(row_id); 80 }, 81 "taler_in" => (row_id: i64) { 82 taler_in_channel.send_replace(row_id); 83 }, 84 "tx_out" => (row_id: i64) { 85 out_channel.send_replace(row_id); 86 }, 87 "taler_out" => (row_id: i64) { 88 taler_out_channel.send_replace(row_id); 89 } 90 ) 91 } 92 93 #[derive(Debug, Clone)] 94 pub struct TxIn { 95 pub transfer_id: i64, 96 pub tx_id: Option<i64>, 97 pub amount: Decimal, 98 pub subject: String, 99 pub debtor_id: i64, 100 pub debtor_name: CompactString, 101 pub valued_at: Timestamp, 102 } 103 104 impl Display for TxIn { 105 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 106 let Self { 107 transfer_id, 108 tx_id, 109 amount, 110 subject, 111 valued_at, 112 debtor_id, 113 debtor_name, 114 } = self; 115 let tx_id = match tx_id { 116 Some(id) => format_args!(":{}", *id), 117 None => format_args!(""), 118 }; 119 write!( 120 f, 121 "{valued_at} {transfer_id}{tx_id} {amount} ({debtor_id} {debtor_name}) '{subject}'" 122 ) 123 } 124 } 125 126 #[derive(Debug, Clone)] 127 pub struct TxOut { 128 pub transfer_id: i64, 129 pub tx_id: Option<i64>, 130 pub amount: Decimal, 131 pub subject: String, 132 pub creditor_id: i64, 133 pub creditor_name: CompactString, 134 pub valued_at: Timestamp, 135 } 136 137 impl Display for TxOut { 138 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 139 let Self { 140 transfer_id, 141 tx_id, 142 amount, 143 subject, 144 creditor_id, 
145 creditor_name, 146 valued_at, 147 } = self; 148 let tx_id = match tx_id { 149 Some(id) => format_args!(":{}", *id), 150 None => format_args!(""), 151 }; 152 write!( 153 f, 154 "{valued_at} {transfer_id}{tx_id} {amount} ({creditor_id} {creditor_name}) '{subject}'" 155 ) 156 } 157 } 158 159 #[derive(Debug, PartialEq, Eq)] 160 pub struct Initiated { 161 pub id: i64, 162 pub amount: Decimal, 163 pub subject: String, 164 pub creditor_id: i64, 165 pub creditor_name: CompactString, 166 } 167 168 impl Display for Initiated { 169 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 170 let Self { 171 id, 172 amount, 173 subject, 174 creditor_id, 175 creditor_name, 176 } = self; 177 write!( 178 f, 179 "{id} {amount} ({creditor_id} {creditor_name}) '{subject}'" 180 ) 181 } 182 } 183 184 #[derive(Debug, Clone)] 185 pub struct TxInAdmin { 186 pub amount: Decimal, 187 pub subject: String, 188 pub debtor_id: i64, 189 pub debtor_name: CompactString, 190 pub metadata: IncomingSubject, 191 } 192 193 #[derive(Debug, PartialEq, Eq)] 194 pub enum AddIncomingResult { 195 Success { 196 new: bool, 197 pending: bool, 198 row_id: u64, 199 valued_at: Timestamp, 200 }, 201 ReservePubReuse, 202 UnknownMapping, 203 MappingReuse, 204 } 205 206 /// Lock the database for worker execution 207 pub async fn worker_lock(e: &mut PgConnection) -> sqlx::Result<bool> { 208 sqlx::query("SELECT pg_try_advisory_lock(42)") 209 .try_map(|r: PgRow| r.try_get(0)) 210 .fetch_one(e) 211 .await 212 } 213 214 pub async fn register_tx_in_admin( 215 db: &PgPool, 216 tx: &TxInAdmin, 217 now: &Timestamp, 218 ) -> sqlx::Result<AddIncomingResult> { 219 serialized!( 220 sqlx::query( 221 " 222 SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending 223 FROM register_tx_in(NULL, NULL, $1, $2, $3, $4, $5, $6, $7, $5) 224 ", 225 ) 226 .bind(tx.amount) 227 .bind(&tx.subject) 228 .bind(tx.debtor_id) 229 .bind(&tx.debtor_name) 230 
.bind_timestamp(now) 231 .bind(tx.metadata.ty()) 232 .bind(tx.metadata.key()) 233 .try_map(|r: PgRow| { 234 Ok(if r.try_get_flag(0)? { 235 AddIncomingResult::ReservePubReuse 236 } else if r.try_get_flag(1)? { 237 AddIncomingResult::MappingReuse 238 } else if r.try_get_flag(2)? { 239 AddIncomingResult::UnknownMapping 240 } else { 241 AddIncomingResult::Success { 242 row_id: r.try_get_u64(3)?, 243 valued_at: r.try_get_timestamp(4)?, 244 new: r.try_get(5)?, 245 pending: r.try_get(6)? 246 } 247 }) 248 }) 249 .fetch_one(db) 250 ) 251 } 252 253 pub async fn register_tx_in( 254 db: &mut PgConnection, 255 tx: &TxIn, 256 subject: &Option<IncomingSubject>, 257 now: &Timestamp, 258 ) -> sqlx::Result<AddIncomingResult> { 259 serialized!( 260 sqlx::query( 261 " 262 SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending 263 FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) 264 ", 265 ) 266 .bind(tx.transfer_id) 267 .bind(tx.tx_id) 268 .bind(tx.amount) 269 .bind(&tx.subject) 270 .bind(tx.debtor_id) 271 .bind(&tx.debtor_name) 272 .bind(tx.valued_at.as_microsecond()) 273 .bind(subject.as_ref().map(|it| it.ty())) 274 .bind(subject.as_ref().map(|it| it.key())) 275 .bind(now.as_microsecond()) 276 .try_map(|r: PgRow| { 277 Ok(if r.try_get_flag(0)? { 278 AddIncomingResult::ReservePubReuse 279 } else if r.try_get_flag(1)? { 280 AddIncomingResult::MappingReuse 281 } else if r.try_get_flag(2)? { 282 AddIncomingResult::UnknownMapping 283 } else { 284 AddIncomingResult::Success { 285 row_id: r.try_get_u64(3)?, 286 valued_at: r.try_get_timestamp(4)?, 287 new: r.try_get(5)?, 288 pending: r.try_get(6)? 
289 } 290 }) 291 }) 292 .fetch_one(&mut *db) 293 ) 294 } 295 296 #[derive(Debug)] 297 pub enum TxOutKind { 298 Simple, 299 Bounce(i64), 300 Talerable(OutgoingSubject), 301 } 302 303 #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)] 304 #[allow(non_camel_case_types)] 305 #[sqlx(type_name = "register_result")] 306 pub enum RegisterResult { 307 /// Already registered 308 idempotent, 309 /// Initiated transaction 310 known, 311 /// Recovered unknown outgoing transaction 312 recovered, 313 } 314 315 #[derive(Debug, PartialEq, Eq)] 316 pub struct AddOutgoingResult { 317 pub result: RegisterResult, 318 pub row_id: i64, 319 } 320 321 pub async fn register_tx_out( 322 db: &mut PgConnection, 323 tx: &TxOut, 324 kind: &TxOutKind, 325 now: &Timestamp, 326 ) -> sqlx::Result<AddOutgoingResult> { 327 serialized!({ 328 let query = sqlx::query( 329 " 330 SELECT out_result, out_tx_row_id 331 FROM register_tx_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) 332 ", 333 ) 334 .bind(tx.transfer_id) 335 .bind(tx.tx_id) 336 .bind(tx.amount) 337 .bind(&tx.subject) 338 .bind(tx.creditor_id) 339 .bind(&tx.creditor_name) 340 .bind_timestamp(&tx.valued_at); 341 let query = match kind { 342 TxOutKind::Simple => query 343 .bind(None::<&[u8]>) 344 .bind(None::<&str>) 345 .bind(None::<&str>) 346 .bind(None::<i64>), 347 TxOutKind::Bounce(bounced) => query 348 .bind(None::<&[u8]>) 349 .bind(None::<&str>) 350 .bind(None::<&str>) 351 .bind(*bounced), 352 TxOutKind::Talerable(subject) => query 353 .bind(&subject.wtid) 354 .bind(subject.exchange_base_url.as_str()) 355 .bind(&subject.metadata) 356 .bind(None::<i64>), 357 }; 358 query 359 .bind_timestamp(now) 360 .try_map(|r: PgRow| { 361 Ok(AddOutgoingResult { 362 result: r.try_get(0)?, 363 row_id: r.try_get(1)?, 364 }) 365 }) 366 .fetch_one(&mut *db) 367 }) 368 } 369 370 #[derive(Debug, PartialEq, Eq)] 371 pub enum TransferResult { 372 Success { id: u64, initiated_at: Timestamp }, 373 RequestUidReuse, 374 WtidReuse, 375 } 376 377 
/// A transfer request as handed to [`make_transfer`].
#[derive(Debug, Clone)]
pub struct Transfer {
    pub request_uid: HashCode,
    pub amount: Decimal,
    pub exchange_base_url: Url,
    pub metadata: Option<CompactString>,
    pub wtid: ShortHashCode,
    pub creditor_id: i64,
    pub creditor_name: CompactString,
}

/// Record a transfer request through the `taler_transfer` SQL function.
///
/// The transaction subject is derived from the wtid, the exchange base URL
/// and the optional metadata with `fmt_out_subject`.
pub async fn make_transfer(
    db: &PgPool,
    tx: &Transfer,
    now: &Timestamp,
) -> sqlx::Result<TransferResult> {
    let subject = fmt_out_subject(&tx.wtid, &tx.exchange_base_url, tx.metadata.as_deref());
    serialized!(
        sqlx::query(
            "
                SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at
                FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8, $9)
            ",
        )
        .bind(&tx.request_uid)
        .bind(&tx.wtid)
        .bind(&subject)
        .bind(tx.amount)
        .bind(tx.exchange_base_url.as_str())
        .bind(&tx.metadata)
        .bind(tx.creditor_id)
        .bind(&tx.creditor_name)
        .bind_timestamp(now)
        .try_map(|r: PgRow| {
            // Conflict flags first; the id/timestamp columns are only read on success.
            Ok(if r.try_get_flag(0)? {
                TransferResult::RequestUidReuse
            } else if r.try_get_flag(1)? {
                TransferResult::WtidReuse
            } else {
                TransferResult::Success {
                    id: r.try_get_u64(2)?,
                    initiated_at: r.try_get_timestamp(3)?,
                }
            })
        })
        .fetch_one(db)
    )
}

/// Outcome of [`register_bounced_tx_in`].
#[derive(Debug, PartialEq, Eq)]
pub struct BounceResult {
    pub tx_id: u64,
    pub tx_new: bool,
}

/// Register an incoming transaction together with its chargeback, through
/// the `register_bounced_tx_in` SQL function.
pub async fn register_bounced_tx_in(
    db: &mut PgConnection,
    tx: &TxIn,
    chargeback_id: i64,
    reason: &str,
    now: &Timestamp,
) -> sqlx::Result<BounceResult> {
    serialized!(
        sqlx::query(
            "
                SELECT out_tx_row_id, out_tx_new
                FROM register_bounced_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
            ",
        )
        .bind(tx.transfer_id)
        .bind(tx.tx_id)
        .bind(tx.amount)
        .bind(&tx.subject)
        .bind(tx.debtor_id)
        .bind(&tx.debtor_name)
        .bind_timestamp(&tx.valued_at)
        .bind(chargeback_id)
        .bind(reason)
        .bind_timestamp(now)
        .try_map(|r: PgRow| {
            Ok(BounceResult {
                tx_id: r.try_get_u64(0)?,
                tx_new: r.try_get(1)?,
            })
        })
        .fetch_one(&mut *db)
    )
}

/// List transfers, paginated on `initiated_id`, optionally restricted to one
/// `status`.
///
/// The query text deliberately ends with an open `WHERE` (followed by
/// `status = … AND` when a status is given); the `page` helper is expected to
/// append the pagination condition.
pub async fn transfer_page(
    db: &PgPool,
    status: &Option<TransferState>,
    currency: &Currency,
    root: &CompactString,
    params: &Page,
) -> sqlx::Result<Vec<TransferListStatus>> {
    page(
        db,
        params,
        "initiated_id",
        || {
            let mut builder = QueryBuilder::new(
                "
                SELECT
                    initiated_id,
                    status,
                    amount,
                    credit_account,
                    credit_name,
                    initiated_at
                FROM transfer
                JOIN initiated USING (initiated_id)
                WHERE
            ",
            );
            if let Some(status) = status {
                builder.push(" status = ").push_bind(status).push(" AND ");
            }
            builder
        },
        |r: PgRow| {
            Ok(TransferListStatus {
                row_id: r.try_get_u64(0)?,
                status: r.try_get(1)?,
                amount: r.try_get_amount(2, currency)?,
                credit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
                timestamp: r.try_get_timestamp(5)?.into(),
            })
        },
    )
    .await
}

/// History of Taler outgoing transactions (`taler_out` joined with `tx_out`),
/// keyed on `tx_out_id`.
///
/// The query text ends with an open `WHERE`; the `history` helper is expected
/// to append its own condition.
pub async fn outgoing_history(
    db: &PgPool,
    params: &History,
    currency: &Currency,
    root: &CompactString,
    listen: impl FnOnce() -> Receiver<i64>,
) -> sqlx::Result<Vec<OutgoingBankTransaction>> {
    history(
        db,
        "tx_out_id",
        params,
        listen,
        || {
            QueryBuilder::new(
                "
                SELECT
                    tx_out_id,
                    amount,
                    credit_account,
                    credit_name,
                    valued_at,
                    exchange_base_url,
                    metadata,
                    wtid
                FROM taler_out
                JOIN tx_out USING (tx_out_id)
                WHERE
            ",
            )
        },
        |r: PgRow| {
            Ok(OutgoingBankTransaction {
                row_id: r.try_get_u64(0)?,
                amount: r.try_get_amount(1, currency)?,
                debit_fee: None,
                credit_account: r.try_get_cyclos_fullpaytouri(2, 3, root)?,
                date: r.try_get_timestamp(4)?.into(),
                exchange_base_url: r.try_get_url(5)?,
                metadata: r.try_get(6)?,
                wtid: r.try_get(7)?,
            })
        },
    )
    .await
}

/// History of Taler incoming transactions (`taler_in` joined with `tx_in`),
/// keyed on `tx_in_id`.
///
/// # Errors
/// Besides database errors, a row whose type is `map` yields a
/// `sqlx::Error::Decode`: such rows are not expected in this history and the
/// request fails instead of panicking inside the row mapper.
pub async fn incoming_history(
    db: &PgPool,
    params: &History,
    currency: &Currency,
    root: &CompactString,
    listen: impl FnOnce() -> Receiver<i64>,
) -> sqlx::Result<Vec<IncomingBankTransaction>> {
    history(
        db,
        "tx_in_id",
        params,
        listen,
        || {
            QueryBuilder::new(
                "
                SELECT
                    type,
                    tx_in_id,
                    amount,
                    debit_account,
                    debit_name,
                    valued_at,
                    metadata,
                    authorization_pub,
                    authorization_sig
                FROM taler_in
                JOIN tx_in USING (tx_in_id)
                WHERE
            ",
            )
        },
        |r: PgRow| {
            let ty: IncomingType = r.try_get(0)?;
            match ty {
                IncomingType::reserve => Ok(IncomingBankTransaction::Reserve {
                    row_id: r.try_get_u64(1)?,
                    amount: r.try_get_amount(2, currency)?,
                    credit_fee: None,
                    debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
                    date: r.try_get_timestamp(5)?.into(),
                    reserve_pub: r.try_get(6)?,
                    authorization_pub: r.try_get(7)?,
                    authorization_sig: r.try_get(8)?,
                }),
                IncomingType::kyc => Ok(IncomingBankTransaction::Kyc {
                    row_id: r.try_get_u64(1)?,
                    amount: r.try_get_amount(2, currency)?,
                    credit_fee: None,
                    debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
                    date: r.try_get_timestamp(5)?.into(),
                    account_pub: r.try_get(6)?,
                    authorization_pub: r.try_get(7)?,
                    authorization_sig: r.try_get(8)?,
                }),
                IncomingType::map => Err(sqlx::Error::Decode(
                    "MAP are never listed in the history".into(),
                )),
            }
        },
    )
    .await
}

/// History of all incoming transactions (`tx_in`), keyed on `tx_in_id`.
pub async fn revenue_history(
    db: &PgPool,
    params: &History,
    currency: &Currency,
    root: &CompactString,
    listen: impl FnOnce() -> Receiver<i64>,
) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> {
    history(
        db,
        "tx_in_id",
        params,
        listen,
        || {
            QueryBuilder::new(
                "
                SELECT
                    tx_in_id,
                    valued_at,
                    amount,
                    debit_account,
                    debit_name,
                    subject
                FROM tx_in
                WHERE
            ",
            )
        },
        |r: PgRow| {
            Ok(RevenueIncomingBankTransaction {
                row_id: r.try_get_u64(0)?,
                date: r.try_get_timestamp(1)?.into(),
                amount: r.try_get_amount(2, currency)?,
                credit_fee: None,
                debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
                subject: r.try_get(5)?,
            })
        },
    )
    .await
}

/// Look up one transfer by its `initiated_id`.
///
/// Returns `Ok(None)` when no such transfer exists, including when `id` does
/// not fit the signed 64-bit column (no stored id can match in that case).
pub async fn transfer_by_id(
    db: &PgPool,
    id: u64,
    currency: &Currency,
    root: &CompactString,
) -> sqlx::Result<Option<TransferStatus>> {
    // Checked conversion instead of a wrapping `as` cast.
    let Ok(id) = i64::try_from(id) else {
        return Ok(None);
    };
    serialized!(
        sqlx::query(
            "
                SELECT
                    status,
                    status_msg,
                    amount,
                    exchange_base_url,
                    metadata,
                    wtid,
                    credit_account,
                    credit_name,
                    initiated_at
                FROM transfer
                JOIN initiated USING (initiated_id)
                WHERE initiated_id = $1
            ",
        )
        .bind(id)
        .try_map(|r: PgRow| {
            Ok(TransferStatus {
                status: r.try_get(0)?,
                status_msg: r.try_get(1)?,
                amount: r.try_get_amount(2, currency)?,
                origin_exchange_url: r.try_get(3)?,
                metadata: r.try_get(4)?,
                wtid: r.try_get(5)?,
                credit_account: r.try_get_cyclos_fullpaytouri(6, 7, root)?,
                timestamp: r.try_get_timestamp(8)?.into(),
            })
        })
        .fetch_optional(db)
    )
}

/// Get a batch of pending initiated transactions not attempted since `start`.
///
/// Selects at most 100 rows of `initiated` that have no `tx_id`, are in the
/// `pending` status, and were never submitted or last submitted before
/// `start`. No ordering is requested from the database.
pub async fn pending_batch(
    db: &mut PgConnection,
    start: &Timestamp,
) -> sqlx::Result<Vec<Initiated>> {
    serialized!(
        sqlx::query(
            "
                SELECT initiated_id, amount, subject, credit_account, credit_name
                FROM initiated
                WHERE tx_id IS NULL
                    AND status='pending'
                    AND (last_submitted IS NULL OR last_submitted < $1)
                LIMIT 100
            ",
        )
        .bind_timestamp(start)
        .try_map(|r: PgRow| {
            Ok(Initiated {
                id: r.try_get(0)?,
                amount: r.try_get(1)?,
                subject: r.try_get(2)?,
                creditor_id: r.try_get(3)?,
                creditor_name: r.try_get(4)?,
            })
        })
        .fetch_all(&mut *db)
    )
}

/// Update status of a successfully submitted initiated transaction.
///
/// Keeps the status at `pending`, increments `submission_counter`, and
/// records the submission time and the `tx_id`. Updating an unknown
/// `initiated_id` is not an error.
pub async fn initiated_submit_success(
    db: &mut PgConnection,
    initiated_id: i64,
    timestamp: &Timestamp,
    tx_id: i64,
) -> sqlx::Result<()> {
    serialized!(
        sqlx::query(
            "
                UPDATE initiated
                SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, tx_id=$2
                WHERE initiated_id=$3
            "
        )
        .bind_timestamp(timestamp)
        .bind(tx_id)
        .bind(initiated_id)
        .execute(&mut *db)
    )?;
    Ok(())
}

/// Update status of a permanently failed initiated transaction.
///
/// Sets the status to `permanent_failure` and stores `msg` as the status
/// message. Updating an unknown `initiated_id` is not an error.
pub async fn initiated_submit_permanent_failure(
    db: &mut PgConnection,
    initiated_id: i64,
    msg: &str,
) -> sqlx::Result<()> {
    serialized!(
        sqlx::query(
            "
                UPDATE initiated
                SET status='permanent_failure', status_msg=$1
                WHERE initiated_id=$2
            ",
        )
        .bind(msg)
        .bind(initiated_id)
        .execute(&mut *db)
    )?;
    Ok(())
}

/// Outcome of [`initiated_chargeback_failure`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChargebackFailureResult {
    /// No row was returned, or the returned initiated id was 0.
    Unknown,
    /// The failure was newly registered for this initiated id.
    Known(u64),
    /// The failure was already registered for this initiated id.
    Idempotent(u64),
}

/// Update status of a charged back initiated transaction.
///
/// Calls `register_charge_back_failure` with `transfer_id`; a missing row and
/// an initiated id of 0 are both reported as
/// [`ChargebackFailureResult::Unknown`].
pub async fn initiated_chargeback_failure(
    db: &mut PgConnection,
    transfer_id: i64,
) -> sqlx::Result<ChargebackFailureResult> {
    Ok(serialized!(
        sqlx::query("SELECT out_initiated_id, out_new FROM register_charge_back_failure($1)")
            .bind(transfer_id)
            .try_map(|r: PgRow| {
                let id = r.try_get_u64(0)?;
                Ok(if id == 0 {
                    ChargebackFailureResult::Unknown
                } else if r.try_get(1)? {
                    ChargebackFailureResult::Known(id)
                } else {
                    ChargebackFailureResult::Idempotent(id)
                })
            })
            .fetch_optional(&mut *db)
    )?
    .unwrap_or(ChargebackFailureResult::Unknown))
}

/// Get JSON value from KV table.
///
/// Returns `Ok(None)` when `key` is absent; a stored value that does not
/// deserialize into `T` is reported as an error.
pub async fn kv_get<T: DeserializeOwned + Unpin + Send>(
    db: &mut PgConnection,
    key: &str,
) -> sqlx::Result<Option<T>> {
    serialized!(
        sqlx::query("SELECT value FROM kv WHERE key=$1")
            .bind(key)
            .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0))
            .fetch_optional(&mut *db)
    )
}

/// Set JSON value in KV table, replacing any value already stored under `key`.
pub async fn kv_set<T: Serialize>(db: &mut PgConnection, key: &str, value: &T) -> sqlx::Result<()> {
    serialized!(
        sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value")
            .bind(key)
            .bind(sqlx::types::Json(value))
            .execute(&mut *db)
    )?;
    Ok(())
}

/// Outcome of [`transfer_register`].
#[derive(Debug, PartialEq, Eq)]
pub enum RegistrationResult {
    Success,
    /// `out_reserve_pub_reuse` was set.
    ReservePubReuse,
}

/// Register a prepared transfer through `register_prepared_transfers`.
///
/// The current time is taken with `Timestamp::now()` inside this function
/// rather than supplied by the caller.
pub async fn transfer_register(
    db: &PgPool,
    req: &RegistrationRequest,
) -> sqlx::Result<RegistrationResult> {
    let ty: IncomingType = req.r#type.into();
    serialized!(
        sqlx::query(
            "SELECT out_reserve_pub_reuse FROM register_prepared_transfers($1,$2,$3,$4,$5,$6)"
        )
        .bind(ty)
        .bind(&req.account_pub)
        .bind(&req.authorization_pub)
        .bind(&req.authorization_sig)
        .bind(req.recurrent)
        .bind_timestamp(&Timestamp::now())
        .try_map(|r: PgRow| {
            Ok(if r.try_get_flag("out_reserve_pub_reuse")? {
                RegistrationResult::ReservePubReuse
            } else {
                RegistrationResult::Success
            })
        })
        .fetch_one(db)
    )
}

/// Delete a prepared transfer through `delete_prepared_transfers`; returns
/// the `out_found` flag.
pub async fn transfer_unregister(db: &PgPool, req: &Unregistration) -> sqlx::Result<bool> {
    serialized!(
        sqlx::query("SELECT out_found FROM delete_prepared_transfers($1)")
            .bind(&req.authorization_pub)
            .try_map(|r: PgRow| r.try_get_flag("out_found"))
            .fetch_one(db)
    )
}

/// Row helpers that build Cyclos payto values from two columns.
pub trait CyclosTypeHelper {
    /// Read the account id from column `idx` and the account name from column
    /// `name`, and combine them with `root` into a [`FullCyclosPayto`].
    fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>(
        &self,
        idx: I,
        name: I,
        root: &CompactString,
    ) -> sqlx::Result<FullCyclosPayto>;
    /// Same inputs as [`CyclosTypeHelper::try_get_cyclos_fullpayto`], but
    /// produces the full payto URI.
    fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>(
        &self,
        idx: I,
        name: I,
        root: &CompactString,
    ) -> sqlx::Result<PaytoURI>;
}

impl CyclosTypeHelper for PgRow {
    fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>(
        &self,
        idx: I,
        name: I,
        root: &CompactString,
    ) -> sqlx::Result<FullCyclosPayto> {
        let idx = self.try_get(idx)?;
        let name = self.try_get(name)?;
        Ok(FullCyclosPayto::new(
            CyclosAccount {
                id: CyclosId(idx),
                root: root.clone(),
            },
            name,
        ))
    }
    fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>(
        &self,
        idx: I,
        name: I,
        root: &CompactString,
    ) -> sqlx::Result<PaytoURI> {
        let idx = self.try_get(idx)?;
        let name = self.try_get(name)?;
        Ok(CyclosAccount {
            id: CyclosId(idx),
            root: root.clone(),
        }
        .as_full_uri(name))
    }
}

// NOTE(review): the rest of this source line is the head of the test module, which is truncated in this view; it is kept verbatim and not edited.
914 #[cfg(test)] 915 mod test { 916 use compact_str::CompactString; 917 use jiff::{Span, Timestamp}; 918 use serde_json::json; 919 use sqlx::{PgPool, Postgres, Row as _, pool::PoolConnection, postgres::PgRow}; 920 use taler_api::{ 921 db::TypeHelper, 922 notification::dummy_listen, 923 subject::{IncomingSubject, OutgoingSubject}, 924 }; 925 use taler_common::{ 926 api::{ 927 EddsaPublicKey, HashCode, ShortHashCode, 928 params::{History, Page}, 
929 wire::TransferState, 930 }, 931 types::{ 932 amount::{Currency, decimal}, 933 url, 934 utils::now_sql_stable_ts, 935 }, 936 }; 937 938 use crate::{ 939 constants::CONFIG_SOURCE, 940 db::{ 941 self, AddIncomingResult, AddOutgoingResult, BounceResult, ChargebackFailureResult, 942 Transfer, TransferResult, TxIn, TxInAdmin, TxOut, TxOutKind, 943 }, 944 }; 945 946 pub const CURR: Currency = Currency::TEST; 947 pub const ROOT: CompactString = CompactString::const_new("localhost"); 948 949 async fn setup() -> (PoolConnection<Postgres>, PgPool) { 950 taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await 951 } 952 953 #[tokio::test] 954 async fn kv() { 955 let (mut db, _) = setup().await; 956 957 let value = json!({ 958 "name": "Mr Smith", 959 "no way": 32 960 }); 961 962 assert_eq!( 963 db::kv_get::<serde_json::Value>(&mut db, "value") 964 .await 965 .unwrap(), 966 None 967 ); 968 db::kv_set(&mut db, "value", &value).await.unwrap(); 969 db::kv_set(&mut db, "value", &value).await.unwrap(); 970 assert_eq!( 971 db::kv_get::<serde_json::Value>(&mut db, "value") 972 .await 973 .unwrap(), 974 Some(value) 975 ); 976 } 977 978 #[tokio::test] 979 async fn tx_in() { 980 let (mut db, pool) = setup().await; 981 982 let mut routine = async |first: &Option<IncomingSubject>, 983 second: &Option<IncomingSubject>| { 984 let id = sqlx::query("SELECT count(*) + 1 FROM tx_in") 985 .try_map(|r: PgRow| r.try_get_u64(0)) 986 .fetch_one(&mut *db) 987 .await 988 .unwrap(); 989 let now = now_sql_stable_ts(); 990 let later = now + Span::new().hours(2); 991 let tx = TxIn { 992 transfer_id: now.as_microsecond() as i64, 993 tx_id: None, 994 amount: decimal("10"), 995 subject: "subject".to_owned(), 996 debtor_id: 31000163100000000, 997 debtor_name: "Name".into(), 998 valued_at: now, 999 }; 1000 // Insert 1001 assert_eq!( 1002 db::register_tx_in(&mut db, &tx, first, &now) 1003 .await 1004 .expect("register tx in"), 1005 AddIncomingResult::Success { 1006 new: true, 1007 pending: false, 1008 row_id: 
id, 1009 valued_at: now, 1010 } 1011 ); 1012 // Idempotent 1013 assert_eq!( 1014 db::register_tx_in( 1015 &mut db, 1016 &TxIn { 1017 valued_at: later, 1018 ..tx.clone() 1019 }, 1020 first, 1021 &now 1022 ) 1023 .await 1024 .expect("register tx in"), 1025 AddIncomingResult::Success { 1026 new: false, 1027 pending: false, 1028 row_id: id, 1029 valued_at: now 1030 } 1031 ); 1032 // Many 1033 assert_eq!( 1034 db::register_tx_in( 1035 &mut db, 1036 &TxIn { 1037 transfer_id: later.as_microsecond() as i64, 1038 valued_at: later, 1039 ..tx 1040 }, 1041 second, 1042 &now 1043 ) 1044 .await 1045 .expect("register tx in"), 1046 AddIncomingResult::Success { 1047 new: true, 1048 pending: false, 1049 row_id: id + 1, 1050 valued_at: later 1051 } 1052 ); 1053 }; 1054 1055 // Empty db 1056 assert_eq!( 1057 db::revenue_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1058 .await 1059 .unwrap(), 1060 Vec::new() 1061 ); 1062 assert_eq!( 1063 db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1064 .await 1065 .unwrap(), 1066 Vec::new() 1067 ); 1068 1069 // Regular transaction 1070 routine(&None, &None).await; 1071 1072 // Reserve transaction 1073 routine( 1074 &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), 1075 &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), 1076 ) 1077 .await; 1078 1079 // Kyc transaction 1080 routine( 1081 &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), 1082 &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), 1083 ) 1084 .await; 1085 1086 // History 1087 assert_eq!( 1088 db::revenue_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1089 .await 1090 .unwrap() 1091 .len(), 1092 6 1093 ); 1094 assert_eq!( 1095 db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1096 .await 1097 .unwrap() 1098 .len(), 1099 4 1100 ); 1101 } 1102 1103 #[tokio::test] 1104 async fn tx_in_admin() { 1105 let (_, pool) = setup().await; 1106 1107 // Empty db 1108 assert_eq!( 1109 
db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1110 .await 1111 .unwrap(), 1112 Vec::new() 1113 ); 1114 1115 let now = now_sql_stable_ts(); 1116 let later = now + Span::new().hours(2); 1117 let tx = TxInAdmin { 1118 amount: decimal("10"), 1119 subject: "subject".to_owned(), 1120 debtor_id: 31000163100000000, 1121 debtor_name: "Name".into(), 1122 metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), 1123 }; 1124 // Insert 1125 assert_eq!( 1126 db::register_tx_in_admin(&pool, &tx, &now) 1127 .await 1128 .expect("register tx in"), 1129 AddIncomingResult::Success { 1130 new: true, 1131 pending: false, 1132 row_id: 1, 1133 valued_at: now 1134 } 1135 ); 1136 // Many 1137 assert_eq!( 1138 db::register_tx_in_admin( 1139 &pool, 1140 &TxInAdmin { 1141 subject: "Other".to_owned(), 1142 metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), 1143 ..tx.clone() 1144 }, 1145 &later 1146 ) 1147 .await 1148 .expect("register tx in"), 1149 AddIncomingResult::Success { 1150 new: true, 1151 pending: false, 1152 row_id: 2, 1153 valued_at: later 1154 } 1155 ); 1156 1157 // History 1158 assert_eq!( 1159 db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1160 .await 1161 .unwrap() 1162 .len(), 1163 2 1164 ); 1165 } 1166 1167 #[tokio::test] 1168 async fn tx_out() { 1169 let (mut db, pool) = setup().await; 1170 1171 let mut routine = async |first: &TxOutKind, second: &TxOutKind| { 1172 let transfer_id = sqlx::query("SELECT count(*) + 1 FROM tx_out") 1173 .try_map(|r: PgRow| r.try_get(0)) 1174 .fetch_one(&mut *db) 1175 .await 1176 .unwrap(); 1177 let now = now_sql_stable_ts(); 1178 let later = now + Span::new().hours(2); 1179 let tx = TxOut { 1180 transfer_id, 1181 tx_id: Some(transfer_id), 1182 amount: decimal("10"), 1183 subject: "subject".to_owned(), 1184 creditor_id: 31000163100000000, 1185 creditor_name: "Name".into(), 1186 valued_at: now, 1187 }; 1188 assert!(matches!( 1189 db::make_transfer( 1190 &pool, 1191 &Transfer { 
1192 request_uid: HashCode::rand(), 1193 amount: decimal("10"), 1194 exchange_base_url: url("https://exchange.test.com/"), 1195 metadata: None, 1196 wtid: ShortHashCode::rand(), 1197 creditor_id: 31000163100000000, 1198 creditor_name: "Name".into() 1199 }, 1200 &now 1201 ) 1202 .await 1203 .unwrap(), 1204 TransferResult::Success { .. } 1205 )); 1206 db::initiated_submit_success(&mut db, 1, &Timestamp::now(), transfer_id) 1207 .await 1208 .expect("status success"); 1209 1210 // Insert 1211 assert_eq!( 1212 db::register_tx_out(&mut db, &tx, first, &now) 1213 .await 1214 .expect("register tx out"), 1215 AddOutgoingResult { 1216 result: db::RegisterResult::known, 1217 row_id: transfer_id, 1218 } 1219 ); 1220 // Idempotent 1221 assert_eq!( 1222 db::register_tx_out( 1223 &mut db, 1224 &TxOut { 1225 valued_at: later, 1226 ..tx.clone() 1227 }, 1228 first, 1229 &now 1230 ) 1231 .await 1232 .expect("register tx out"), 1233 AddOutgoingResult { 1234 result: db::RegisterResult::idempotent, 1235 row_id: transfer_id, 1236 } 1237 ); 1238 // Recovered 1239 assert_eq!( 1240 db::register_tx_out( 1241 &mut db, 1242 &TxOut { 1243 transfer_id: transfer_id + 1, 1244 tx_id: Some(transfer_id + 1), 1245 valued_at: later, 1246 ..tx.clone() 1247 }, 1248 second, 1249 &now 1250 ) 1251 .await 1252 .expect("register tx out"), 1253 AddOutgoingResult { 1254 result: db::RegisterResult::recovered, 1255 row_id: transfer_id + 1, 1256 } 1257 ); 1258 }; 1259 1260 // Empty db 1261 assert_eq!( 1262 db::outgoing_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1263 .await 1264 .unwrap(), 1265 Vec::new() 1266 ); 1267 1268 // Regular transaction 1269 routine(&TxOutKind::Simple, &TxOutKind::Simple).await; 1270 1271 // Talerable transaction 1272 routine( 1273 &TxOutKind::Talerable(OutgoingSubject::rand()), 1274 &TxOutKind::Talerable(OutgoingSubject::rand()), 1275 ) 1276 .await; 1277 1278 // Bounced transaction 1279 routine(&TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await; 1280 1281 // 
// History
        assert_eq!(
            db::outgoing_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen)
                .await
                .unwrap()
                .len(),
            2
        );
    }

    // TODO tx out failure

    /// Exercises `db::make_transfer` together with `db::transfer_by_id` and
    /// `db::transfer_page`: first insertion, idempotent replay, request UID
    /// reuse, wtid reuse, and a second independent transfer.
    #[tokio::test]
    async fn transfer() {
        let (_, pool) = setup().await;

        // Nothing stored yet: lookup and paging both come back empty
        let missing = db::transfer_by_id(&pool, 0, &CURR, &ROOT).await.unwrap();
        assert_eq!(missing, None);
        let empty_page = db::transfer_page(&pool, &None, &CURR, &ROOT, &Page::default())
            .await
            .unwrap();
        assert_eq!(empty_page, Vec::new());

        let base = Transfer {
            request_uid: HashCode::rand(),
            amount: decimal("10"),
            exchange_base_url: url("https://exchange.test.com/"),
            metadata: None,
            wtid: ShortHashCode::rand(),
            creditor_id: 31000163100000000,
            creditor_name: "Name".into(),
        };
        let first_at = now_sql_stable_ts();
        let second_at = first_at + Span::new().hours(2);

        // First submission is stored as transfer 1
        let created = db::make_transfer(&pool, &base, &first_at)
            .await
            .expect("transfer");
        assert_eq!(
            created,
            TransferResult::Success {
                id: 1,
                initiated_at: first_at
            }
        );

        // Replaying the same request later reports the original row and timestamp
        let replayed = db::make_transfer(&pool, &base, &second_at)
            .await
            .expect("transfer");
        assert_eq!(
            replayed,
            TransferResult::Success {
                id: 1,
                initiated_at: first_at
            }
        );

        // Same request UID with a different wtid is rejected
        let same_uid = Transfer {
            wtid: ShortHashCode::rand(),
            ..base.clone()
        };
        let uid_clash = db::make_transfer(&pool, &same_uid, &first_at)
            .await
            .expect("transfer");
        assert_eq!(uid_clash, TransferResult::RequestUidReuse);

        // Same wtid with a different request UID is rejected
        let same_wtid = Transfer {
            request_uid: HashCode::rand(),
            ..base.clone()
        };
        let wtid_clash = db::make_transfer(&pool, &same_wtid, &first_at)
            .await
            .expect("transfer");
        assert_eq!(wtid_clash, TransferResult::WtidReuse);

        // A request with fresh identifiers becomes transfer 2
        let unrelated = Transfer {
            request_uid: HashCode::rand(),
            wtid: ShortHashCode::rand(),
            ..base
        };
        let second = db::make_transfer(&pool, &unrelated, &second_at)
            .await
            .expect("transfer");
        assert_eq!(
            second,
            TransferResult::Success {
                id: 2,
                initiated_at: second_at
            }
        );

        // Lookup by id: 1 and 2 exist, 3 does not
        for (id, known) in [(1, true), (2, true), (3, false)] {
            let found = db::transfer_by_id(&pool, id, &CURR, &ROOT).await.unwrap();
            assert_eq!(found.is_some(), known);
        }
        let listed = db::transfer_page(&pool, &None, &CURR, &ROOT, &Page::default())
            .await
            .unwrap();
        assert_eq!(listed.len(), 2);
    }

    /// Exercises `db::register_bounced_tx_in`: a new bounce, the same bounce
    /// registered again, and a bounce for another incoming transfer.
    #[tokio::test]
    async fn bounce() {
        let (mut db, _) = setup().await;

        let amount = decimal("10");
        let now = now_sql_stable_ts();
        // Every case uses the same incoming transaction apart from its transfer id
        let incoming = |transfer_id: i64| TxIn {
            transfer_id,
            tx_id: None,
            amount,
            subject: "subject".to_owned(),
            debtor_id: 31000163100000000,
            debtor_name: "Name".into(),
            valued_at: now,
        };

        // (incoming transfer id, third call argument, expected tx_id, expected tx_new)
        let cases = [
            // Bounce
            (12, 22, 1, true),
            // Idempotent
            (12, 22, 1, false),
            // Many
            (13, 23, 2, true),
        ];
        for (transfer_id, ref_id, tx_id, tx_new) in cases {
            let outcome = db::register_bounced_tx_in(
                &mut db,
                &incoming(transfer_id),
                ref_id,
                "good reason",
                &now,
            )
            .await
            .expect("bounce");
            assert_eq!(outcome, BounceResult { tx_id, tx_new });
        }
    }

    /// Follows the status reported by `db::transfer_by_id` as a transfer is
    /// marked failed, submitted, matched to an outgoing transaction and
    /// finally charged back.
    #[tokio::test]
    async fn status() {
        let (mut db, pool) = setup().await;

        // Loads transfer `id` and compares its status and status message
        let expect_state = async |id: u64, state: TransferState, note: Option<&str>| {
            let found = db::transfer_by_id(&pool, id, &CURR, &ROOT)
                .await
                .unwrap()
                .unwrap();
            assert_eq!((state, note), (found.status, found.status_msg.as_deref()));
        };
        // Builds a transfer request with fresh random identifiers
        let fresh_transfer = || Transfer {
            request_uid: HashCode::rand(),
            amount: decimal("1"),
            exchange_base_url: url("https://exchange.test.com/"),
            metadata: None,
            wtid: ShortHashCode::rand(),
            creditor_id: 31000163100000000,
            creditor_name: "Name".into(),
        };

        // Status updates on an unknown transfer must not error
        db::initiated_submit_permanent_failure(&mut db, 1, "msg")
            .await
            .unwrap();
        db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12)
            .await
            .unwrap();
        let unknown_chargeback = db::initiated_chargeback_failure(&mut db, 1).await.unwrap();
        assert_eq!(unknown_chargeback, ChargebackFailureResult::Unknown);

        // Failure: transfer 1 goes from pending to permanent failure
        db::make_transfer(&pool, &fresh_transfer(), &Timestamp::now())
            .await
            .expect("transfer");
        expect_state(1, TransferState::pending, None).await;
        db::initiated_submit_permanent_failure(&mut db, 1, "error status")
            .await
            .unwrap();
        expect_state(1, TransferState::permanent_failure, Some("error status")).await;

        // Success: transfer 2 stays pending after submission and only becomes
        // successful once the outgoing transaction is registered
        db::make_transfer(&pool, &fresh_transfer(), &Timestamp::now())
            .await
            .expect("transfer");
        expect_state(2, TransferState::pending, None).await;
        db::initiated_submit_success(&mut db, 2, &Timestamp::now(), 3)
            .await
            .unwrap();
        expect_state(2, TransferState::pending, None).await;
        let settled = TxOut {
            transfer_id: 5,
            tx_id: Some(3),
            amount: decimal("2"),
            subject: String::new(),
            creditor_id: 31000163100000000,
            creditor_name: "Name".into(),
            valued_at: Timestamp::now(),
        };
        db::register_tx_out(&mut db, &settled, &TxOutKind::Simple, &Timestamp::now())
            .await
            .unwrap();
        expect_state(2, TransferState::success, None).await;

        // Chargeback: the first report is known, the repeated one is idempotent
        let first_report = db::initiated_chargeback_failure(&mut db, 5).await.unwrap();
        assert_eq!(first_report, ChargebackFailureResult::Known(2));
        expect_state(2, TransferState::late_failure, Some("charged back")).await;
        let second_report = db::initiated_chargeback_failure(&mut db, 5).await.unwrap();
        assert_eq!(second_report, ChargebackFailureResult::Idempotent(2));
    }

    /// Checks how many transfers `db::pending_batch` returns as transfers are
    /// created, submitted and failed.
    #[tokio::test]
    async fn batch() {
        let (mut db, pool) = setup().await;
        let start = Timestamp::now();

        // Creates `count` transfers with amounts 1, 2, ..., count
        let insert_transfers = async |count: u32| {
            for amount in 1..=count {
                db::make_transfer(
                    &pool,
                    &Transfer {
                        request_uid: HashCode::rand(),
                        amount: decimal(format!("{amount}")),
                        exchange_base_url: url("https://exchange.test.com/"),
                        metadata: None,
                        wtid: ShortHashCode::rand(),
                        creditor_id: 31000163100000000,
                        creditor_name: "Name".into(),
                    },
                    &Timestamp::now(),
                )
                .await
                .expect("transfer");
            }
        };

        // Empty db
        assert_eq!(
            db::pending_batch(&mut db, &start)
                .await
                .expect("pending_batch")
                .len(),
            0
        );

        // Some transfers
        insert_transfers(3).await;
        assert_eq!(
            db::pending_batch(&mut db, &start)
                .await
                .expect("pending_batch")
                .len(),
            3
        );

        // Max 100 txs in batch
        insert_transfers(100).await;
        assert_eq!(
            db::pending_batch(&mut db, &start)
                .await
                .expect("pending_batch")
                .len(),
            100
        );

        // Skip uploaded
        // NOTE(review): presumably id 0 matches no row, so ten transfers are marked; confirm
        for id in 0..=10 {
            db::initiated_submit_success(&mut db, id, &Timestamp::now(), id)
                .await
                .expect("status success");
        }
        assert_eq!(
            db::pending_batch(&mut db, &start)
                .await
                .expect("pending_batch")
                .len(),
            93
        );

        // Skip failed (id 10 was already marked as submitted above)
        for id in 10..=20 {
            db::initiated_submit_permanent_failure(&mut db, id, "failure")
                .await
                .expect("status failure");
        }
        assert_eq!(
            db::pending_batch(&mut db, &start)
                .await
                .expect("pending_batch")
                .len(),
            83
        );
    }
}