db.rs (49068B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::fmt::Display; 18 19 use compact_str::CompactString; 20 use jiff::Timestamp; 21 use serde::{Serialize, de::DeserializeOwned}; 22 use sqlx::{PgConnection, PgPool, QueryBuilder, Row, postgres::PgRow}; 23 use taler_api::{ 24 db::{BindHelper, TypeHelper, history, page}, 25 serialized, 26 subject::{IncomingSubject, OutgoingSubject, fmt_out_subject}, 27 }; 28 use taler_common::{ 29 api_common::{HashCode, ShortHashCode}, 30 api_params::{History, Page}, 31 api_revenue::RevenueIncomingBankTransaction, 32 api_transfer::{RegistrationRequest, Unregistration}, 33 api_wire::{ 34 IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferState, 35 TransferStatus, 36 }, 37 config::Config, 38 db::IncomingType, 39 types::{ 40 amount::{Currency, Decimal}, 41 payto::{PaytoImpl as _, PaytoURI}, 42 }, 43 }; 44 use tokio::sync::watch::{Receiver, Sender}; 45 use url::Url; 46 47 use crate::{ 48 config::parse_db_cfg, 49 payto::{CyclosAccount, CyclosId, FullCyclosPayto}, 50 }; 51 52 const SCHEMA: &str = "cyclos"; 53 54 pub async fn pool(cfg: &Config) -> anyhow::Result<PgPool> { 55 let db = parse_db_cfg(cfg)?; 56 let pool = taler_common::db::pool(db.cfg, SCHEMA).await?; 57 Ok(pool) 58 } 59 60 pub async fn dbinit(cfg: &Config, reset: bool) -> 
anyhow::Result<PgPool> { 61 let db_cfg = parse_db_cfg(cfg)?; 62 let pool = taler_common::db::pool(db_cfg.cfg, SCHEMA).await?; 63 let mut db = pool.acquire().await?; 64 taler_common::db::dbinit(&mut db, db_cfg.sql_dir.as_ref(), SCHEMA, reset).await?; 65 Ok(pool) 66 } 67 68 pub async fn notification_listener( 69 pool: PgPool, 70 in_channel: Sender<i64>, 71 taler_in_channel: Sender<i64>, 72 out_channel: Sender<i64>, 73 taler_out_channel: Sender<i64>, 74 ) -> sqlx::Result<()> { 75 taler_api::notification::notification_listener!(&pool, 76 "tx_in" => (row_id: i64) { 77 in_channel.send_replace(row_id); 78 }, 79 "taler_in" => (row_id: i64) { 80 taler_in_channel.send_replace(row_id); 81 }, 82 "tx_out" => (row_id: i64) { 83 out_channel.send_replace(row_id); 84 }, 85 "taler_out" => (row_id: i64) { 86 taler_out_channel.send_replace(row_id); 87 } 88 ) 89 } 90 91 #[derive(Debug, Clone)] 92 pub struct TxIn { 93 pub transfer_id: i64, 94 pub tx_id: Option<i64>, 95 pub amount: Decimal, 96 pub subject: String, 97 pub debtor_id: i64, 98 pub debtor_name: CompactString, 99 pub valued_at: Timestamp, 100 } 101 102 impl Display for TxIn { 103 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 104 let Self { 105 transfer_id, 106 tx_id, 107 amount, 108 subject, 109 valued_at, 110 debtor_id, 111 debtor_name, 112 } = self; 113 let tx_id = match tx_id { 114 Some(id) => format_args!(":{}", *id), 115 None => format_args!(""), 116 }; 117 write!( 118 f, 119 "{valued_at} {transfer_id}{tx_id} {amount} ({debtor_id} {debtor_name}) '{subject}'" 120 ) 121 } 122 } 123 124 #[derive(Debug, Clone)] 125 pub struct TxOut { 126 pub transfer_id: i64, 127 pub tx_id: Option<i64>, 128 pub amount: Decimal, 129 pub subject: String, 130 pub creditor_id: i64, 131 pub creditor_name: CompactString, 132 pub valued_at: Timestamp, 133 } 134 135 impl Display for TxOut { 136 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 137 let Self { 138 transfer_id, 139 tx_id, 140 amount, 141 
subject, 142 creditor_id, 143 creditor_name, 144 valued_at, 145 } = self; 146 let tx_id = match tx_id { 147 Some(id) => format_args!(":{}", *id), 148 None => format_args!(""), 149 }; 150 write!( 151 f, 152 "{valued_at} {transfer_id}{tx_id} {amount} ({creditor_id} {creditor_name}) '{subject}'" 153 ) 154 } 155 } 156 157 #[derive(Debug, PartialEq, Eq)] 158 pub struct Initiated { 159 pub id: i64, 160 pub amount: Decimal, 161 pub subject: String, 162 pub creditor_id: i64, 163 pub creditor_name: CompactString, 164 } 165 166 impl Display for Initiated { 167 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 168 let Self { 169 id, 170 amount, 171 subject, 172 creditor_id, 173 creditor_name, 174 } = self; 175 write!( 176 f, 177 "{id} {amount} ({creditor_id} {creditor_name}) '{subject}'" 178 ) 179 } 180 } 181 182 #[derive(Debug, Clone)] 183 pub struct TxInAdmin { 184 pub amount: Decimal, 185 pub subject: String, 186 pub debtor_id: i64, 187 pub debtor_name: CompactString, 188 pub metadata: IncomingSubject, 189 } 190 191 #[derive(Debug, PartialEq, Eq)] 192 pub enum AddIncomingResult { 193 Success { 194 new: bool, 195 pending: bool, 196 row_id: u64, 197 valued_at: Timestamp, 198 }, 199 ReservePubReuse, 200 UnknownMapping, 201 MappingReuse, 202 } 203 204 /// Lock the database for worker execution 205 pub async fn worker_lock(e: &mut PgConnection) -> sqlx::Result<bool> { 206 sqlx::query("SELECT pg_try_advisory_lock(42)") 207 .try_map(|r: PgRow| r.try_get(0)) 208 .fetch_one(e) 209 .await 210 } 211 212 pub async fn register_tx_in_admin( 213 db: &PgPool, 214 tx: &TxInAdmin, 215 now: &Timestamp, 216 ) -> sqlx::Result<AddIncomingResult> { 217 serialized!( 218 sqlx::query( 219 " 220 SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending 221 FROM register_tx_in(NULL, NULL, $1, $2, $3, $4, $5, $6, $7, $5) 222 ", 223 ) 224 .bind(tx.amount) 225 .bind(&tx.subject) 226 .bind(tx.debtor_id) 227 
.bind(&tx.debtor_name) 228 .bind_timestamp(now) 229 .bind(tx.metadata.ty()) 230 .bind(tx.metadata.key()) 231 .try_map(|r: PgRow| { 232 Ok(if r.try_get_flag(0)? { 233 AddIncomingResult::ReservePubReuse 234 } else if r.try_get_flag(1)? { 235 AddIncomingResult::MappingReuse 236 } else if r.try_get_flag(2)? { 237 AddIncomingResult::UnknownMapping 238 } else { 239 AddIncomingResult::Success { 240 row_id: r.try_get_u64(3)?, 241 valued_at: r.try_get_timestamp(4)?, 242 new: r.try_get(5)?, 243 pending: r.try_get(6)? 244 } 245 }) 246 }) 247 .fetch_one(db) 248 ) 249 } 250 251 pub async fn register_tx_in( 252 db: &mut PgConnection, 253 tx: &TxIn, 254 subject: &Option<IncomingSubject>, 255 now: &Timestamp, 256 ) -> sqlx::Result<AddIncomingResult> { 257 serialized!( 258 sqlx::query( 259 " 260 SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending 261 FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) 262 ", 263 ) 264 .bind(tx.transfer_id) 265 .bind(tx.tx_id) 266 .bind(tx.amount) 267 .bind(&tx.subject) 268 .bind(tx.debtor_id) 269 .bind(&tx.debtor_name) 270 .bind(tx.valued_at.as_microsecond()) 271 .bind(subject.as_ref().map(|it| it.ty())) 272 .bind(subject.as_ref().map(|it| it.key())) 273 .bind(now.as_microsecond()) 274 .try_map(|r: PgRow| { 275 Ok(if r.try_get_flag(0)? { 276 AddIncomingResult::ReservePubReuse 277 } else if r.try_get_flag(1)? { 278 AddIncomingResult::MappingReuse 279 } else if r.try_get_flag(2)? { 280 AddIncomingResult::UnknownMapping 281 } else { 282 AddIncomingResult::Success { 283 row_id: r.try_get_u64(3)?, 284 valued_at: r.try_get_timestamp(4)?, 285 new: r.try_get(5)?, 286 pending: r.try_get(6)? 
287 } 288 }) 289 }) 290 .fetch_one(&mut *db) 291 ) 292 } 293 294 #[derive(Debug)] 295 pub enum TxOutKind { 296 Simple, 297 Bounce(i64), 298 Talerable(OutgoingSubject), 299 } 300 301 #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)] 302 #[allow(non_camel_case_types)] 303 #[sqlx(type_name = "register_result")] 304 pub enum RegisterResult { 305 /// Already registered 306 idempotent, 307 /// Initiated transaction 308 known, 309 /// Recovered unknown outgoing transaction 310 recovered, 311 } 312 313 #[derive(Debug, PartialEq, Eq)] 314 pub struct AddOutgoingResult { 315 pub result: RegisterResult, 316 pub row_id: i64, 317 } 318 319 pub async fn register_tx_out( 320 db: &mut PgConnection, 321 tx: &TxOut, 322 kind: &TxOutKind, 323 now: &Timestamp, 324 ) -> sqlx::Result<AddOutgoingResult> { 325 serialized!({ 326 let query = sqlx::query( 327 " 328 SELECT out_result, out_tx_row_id 329 FROM register_tx_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12) 330 ", 331 ) 332 .bind(tx.transfer_id) 333 .bind(tx.tx_id) 334 .bind(tx.amount) 335 .bind(&tx.subject) 336 .bind(tx.creditor_id) 337 .bind(&tx.creditor_name) 338 .bind_timestamp(&tx.valued_at); 339 let query = match kind { 340 TxOutKind::Simple => query 341 .bind(None::<&[u8]>) 342 .bind(None::<&str>) 343 .bind(None::<&str>) 344 .bind(None::<i64>), 345 TxOutKind::Bounce(bounced) => query 346 .bind(None::<&[u8]>) 347 .bind(None::<&str>) 348 .bind(None::<&str>) 349 .bind(*bounced), 350 TxOutKind::Talerable(subject) => query 351 .bind(&subject.wtid) 352 .bind(subject.exchange_base_url.as_str()) 353 .bind(&subject.metadata) 354 .bind(None::<i64>), 355 }; 356 query 357 .bind_timestamp(now) 358 .try_map(|r: PgRow| { 359 Ok(AddOutgoingResult { 360 result: r.try_get(0)?, 361 row_id: r.try_get(1)?, 362 }) 363 }) 364 .fetch_one(&mut *db) 365 }) 366 } 367 368 #[derive(Debug, PartialEq, Eq)] 369 pub enum TransferResult { 370 Success { id: u64, initiated_at: Timestamp }, 371 RequestUidReuse, 372 WtidReuse, 373 } 374 375 
#[derive(Debug, Clone)] 376 pub struct Transfer { 377 pub request_uid: HashCode, 378 pub amount: Decimal, 379 pub exchange_base_url: Url, 380 pub metadata: Option<CompactString>, 381 pub wtid: ShortHashCode, 382 pub creditor_id: i64, 383 pub creditor_name: CompactString, 384 } 385 386 pub async fn make_transfer( 387 db: &PgPool, 388 tx: &Transfer, 389 now: &Timestamp, 390 ) -> sqlx::Result<TransferResult> { 391 let subject = fmt_out_subject(&tx.wtid, &tx.exchange_base_url, tx.metadata.as_deref()); 392 serialized!( 393 sqlx::query( 394 " 395 SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at 396 FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8, $9) 397 ", 398 ) 399 .bind(&tx.request_uid) 400 .bind(&tx.wtid) 401 .bind(&subject) 402 .bind(tx.amount) 403 .bind(tx.exchange_base_url.as_str()) 404 .bind(&tx.metadata) 405 .bind(tx.creditor_id) 406 .bind(&tx.creditor_name) 407 .bind_timestamp(now) 408 .try_map(|r: PgRow| { 409 Ok(if r.try_get_flag(0)? { 410 TransferResult::RequestUidReuse 411 } else if r.try_get_flag(1)? 
{ 412 TransferResult::WtidReuse 413 } else { 414 TransferResult::Success { 415 id: r.try_get_u64(2)?, 416 initiated_at: r.try_get_timestamp(3)?, 417 } 418 }) 419 }) 420 .fetch_one(db) 421 ) 422 } 423 424 #[derive(Debug, PartialEq, Eq)] 425 pub struct BounceResult { 426 pub tx_id: u64, 427 pub tx_new: bool, 428 } 429 430 pub async fn register_bounced_tx_in( 431 db: &mut PgConnection, 432 tx: &TxIn, 433 chargeback_id: i64, 434 reason: &str, 435 now: &Timestamp, 436 ) -> sqlx::Result<BounceResult> { 437 serialized!( 438 sqlx::query( 439 " 440 SELECT out_tx_row_id, out_tx_new 441 FROM register_bounced_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) 442 ", 443 ) 444 .bind(tx.transfer_id) 445 .bind(tx.tx_id) 446 .bind(tx.amount) 447 .bind(&tx.subject) 448 .bind(tx.debtor_id) 449 .bind(&tx.debtor_name) 450 .bind_timestamp(&tx.valued_at) 451 .bind(chargeback_id) 452 .bind(reason) 453 .bind_timestamp(now) 454 .try_map(|r: PgRow| { 455 Ok(BounceResult { 456 tx_id: r.try_get_u64(0)?, 457 tx_new: r.try_get(1)?, 458 }) 459 }) 460 .fetch_one(&mut *db) 461 ) 462 } 463 464 pub async fn transfer_page( 465 db: &PgPool, 466 status: &Option<TransferState>, 467 currency: &Currency, 468 root: &CompactString, 469 params: &Page, 470 ) -> sqlx::Result<Vec<TransferListStatus>> { 471 page( 472 db, 473 "initiated_id", 474 params, 475 || { 476 let mut builder = QueryBuilder::new( 477 " 478 SELECT 479 initiated_id, 480 status, 481 amount, 482 credit_account, 483 credit_name, 484 initiated_at 485 FROM transfer 486 JOIN initiated USING (initiated_id) 487 WHERE 488 ", 489 ); 490 if let Some(status) = status { 491 builder.push(" status = ").push_bind(status).push(" AND "); 492 } 493 builder 494 }, 495 |r: PgRow| { 496 Ok(TransferListStatus { 497 row_id: r.try_get_safeu64(0)?, 498 status: r.try_get(1)?, 499 amount: r.try_get_amount(2, currency)?, 500 credit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?, 501 timestamp: r.try_get_timestamp(5)?.into(), 502 }) 503 }, 504 ) 505 .await 506 } 507 508 
/// List outgoing Taler transactions
///
/// The query text ends on an open `WHERE`; the remainder is appended by the
/// `history` helper, keyed on `tx_out_id`.
pub async fn outgoing_history(
    db: &PgPool,
    params: &History,
    currency: &Currency,
    root: &CompactString,
    listen: impl FnOnce() -> Receiver<i64>,
) -> sqlx::Result<Vec<OutgoingBankTransaction>> {
    history(
        db,
        "tx_out_id",
        params,
        listen,
        || {
            QueryBuilder::new(
                "
                    SELECT
                        tx_out_id,
                        amount,
                        credit_account,
                        credit_name,
                        valued_at,
                        exchange_base_url,
                        metadata,
                        wtid
                    FROM taler_out
                    JOIN tx_out USING (tx_out_id)
                    WHERE
                ",
            )
        },
        |r: PgRow| {
            Ok(OutgoingBankTransaction {
                row_id: r.try_get_safeu64(0)?,
                amount: r.try_get_amount(1, currency)?,
                debit_fee: None,
                credit_account: r.try_get_cyclos_fullpaytouri(2, 3, root)?,
                date: r.try_get_timestamp(4)?.into(),
                exchange_base_url: r.try_get_url(5)?,
                metadata: r.try_get(6)?,
                wtid: r.try_get(7)?,
            })
        },
    )
    .await
}

/// List incoming Taler transactions
///
/// The query text ends on an open `WHERE`; the remainder is appended by the
/// `history` helper, keyed on `tx_in_id`.
///
/// # Errors
///
/// Besides database errors, a row of type `map` yields a decode error.
pub async fn incoming_history(
    db: &PgPool,
    params: &History,
    currency: &Currency,
    root: &CompactString,
    listen: impl FnOnce() -> Receiver<i64>,
) -> sqlx::Result<Vec<IncomingBankTransaction>> {
    history(
        db,
        "tx_in_id",
        params,
        listen,
        || {
            QueryBuilder::new(
                "
                    SELECT
                        type,
                        tx_in_id,
                        amount,
                        debit_account,
                        debit_name,
                        valued_at,
                        metadata,
                        authorization_pub,
                        authorization_sig
                    FROM taler_in
                    JOIN tx_in USING (tx_in_id)
                    WHERE
                ",
            )
        },
        |r: PgRow| {
            Ok(match r.try_get(0)? {
                IncomingType::reserve => IncomingBankTransaction::Reserve {
                    row_id: r.try_get_safeu64(1)?,
                    amount: r.try_get_amount(2, currency)?,
                    credit_fee: None,
                    debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
                    date: r.try_get_timestamp(5)?.into(),
                    reserve_pub: r.try_get(6)?,
                    authorization_pub: r.try_get(7)?,
                    authorization_sig: r.try_get(8)?,
                },
                IncomingType::kyc => IncomingBankTransaction::Kyc {
                    row_id: r.try_get_safeu64(1)?,
                    amount: r.try_get_amount(2, currency)?,
                    credit_fee: None,
                    debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
                    date: r.try_get_timestamp(5)?.into(),
                    account_pub: r.try_get(6)?,
                    authorization_pub: r.try_get(7)?,
                    authorization_sig: r.try_get(8)?,
                },
                // Unexpected data must fail this request with an error, not panic
                IncomingType::map => {
                    return Err(sqlx::Error::Decode(
                        "MAP are never listed in the history".into(),
                    ));
                }
            })
        },
    )
    .await
}

/// List all incoming transactions for the revenue API
///
/// The query text ends on an open `WHERE`; the remainder is appended by the
/// `history` helper, keyed on `tx_in_id`.
pub async fn revenue_history(
    db: &PgPool,
    params: &History,
    currency: &Currency,
    root: &CompactString,
    listen: impl FnOnce() -> Receiver<i64>,
) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> {
    history(
        db,
        "tx_in_id",
        params,
        listen,
        || {
            QueryBuilder::new(
                "
                    SELECT
                        tx_in_id,
                        valued_at,
                        amount,
                        debit_account,
                        debit_name,
                        subject
                    FROM tx_in
                    WHERE
                ",
            )
        },
        |r: PgRow| {
            Ok(RevenueIncomingBankTransaction {
                row_id: r.try_get_safeu64(0)?,
                date: r.try_get_timestamp(1)?.into(),
                amount: r.try_get_amount(2, currency)?,
                credit_fee: None,
                debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?,
                subject: r.try_get(5)?,
            })
        },
    )
    .await
}

/// Get the status of a transfer by its `initiated_id`
///
/// Returns `Ok(None)` when no such transfer exists, including when `id` does not
/// fit in the signed 64-bit integer bound to the query.
pub async fn transfer_by_id(
    db: &PgPool,
    id: u64,
    currency: &Currency,
    root: &CompactString,
) -> sqlx::Result<Option<TransferStatus>> {
    // A plain `as` cast would wrap large values into negative ids
    let Ok(id) = i64::try_from(id) else {
        return Ok(None);
    };
    serialized!(
        sqlx::query(
            "
                SELECT
                    status,
                    status_msg,
                    amount,
                    exchange_base_url,
                    metadata,
                    wtid,
                    credit_account,
                    credit_name,
                    initiated_at
                FROM transfer
                JOIN initiated USING (initiated_id)
                WHERE initiated_id = $1
            ",
        )
        .bind(id)
        .try_map(|r: PgRow| {
            Ok(TransferStatus {
                status: r.try_get(0)?,
                status_msg: r.try_get(1)?,
                amount: r.try_get_amount(2, currency)?,
                origin_exchange_url: r.try_get(3)?,
                metadata: r.try_get(4)?,
                wtid: r.try_get(5)?,
                credit_account: r.try_get_cyclos_fullpaytouri(6, 7, root)?,
                timestamp: r.try_get_timestamp(8)?.into(),
            })
        })
        .fetch_optional(db)
    )
}

/** Get a batch of pending initiated transactions not attempted since [start] */
pub async fn pending_batch(
    db: &mut PgConnection,
    start: &Timestamp,
) -> sqlx::Result<Vec<Initiated>> {
    serialized!(
        sqlx::query(
            "
                SELECT initiated_id, amount, subject, credit_account, credit_name
                FROM initiated
                WHERE tx_id IS NULL
                    AND status='pending'
                    AND (last_submitted IS NULL OR last_submitted < $1)
                LIMIT 100
            ",
        )
        .bind_timestamp(start)
        .try_map(|r: PgRow| {
            Ok(Initiated {
                id: r.try_get(0)?,
                amount: r.try_get(1)?,
                subject: r.try_get(2)?,
                creditor_id: r.try_get(3)?,
                creditor_name: r.try_get(4)?,
            })
        })
        .fetch_all(&mut *db)
    )
}

/** Update status of a successful submitted initiated transaction */
pub async fn initiated_submit_success(
    db: &mut PgConnection,
    initiated_id: i64,
    timestamp: &Timestamp,
    tx_id: i64,
) -> sqlx::Result<()> {
    // The row stays 'pending'; only the submission bookkeeping and tx_id are recorded
    serialized!(
        sqlx::query(
            "
                UPDATE initiated
                SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, tx_id=$2
                WHERE initiated_id=$3
            "
        )
        .bind_timestamp(timestamp)
        .bind(tx_id)
        .bind(initiated_id)
        .execute(&mut *db)
    )?;
    Ok(())
}

/** Update status of a permanently failed initiated transaction */
pub async fn initiated_submit_permanent_failure(
    db: &mut PgConnection,
    initiated_id: i64,
    msg: &str,
) -> sqlx::Result<()> {
    serialized!(
        sqlx::query(
            "
                UPDATE initiated
                SET status='permanent_failure', status_msg=$1
                WHERE initiated_id=$2
            ",
        )
        .bind(msg)
        .bind(initiated_id)
        .execute(&mut *db)
    )?;
    Ok(())
}

/// Outcome of registering a chargeback on an initiated transaction
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChargebackFailureResult {
    /// No row was returned, or the returned id was 0
    Unknown,
    /// The SQL function reported the failure as new
    Known(u64),
    /// The SQL function reported the failure as already registered
    Idempotent(u64),
}

/** Update status of a charged back initiated transaction */
pub async fn initiated_chargeback_failure(
    db: &mut PgConnection,
    transfer_id: i64,
) -> sqlx::Result<ChargebackFailureResult> {
    Ok(serialized!(
        sqlx::query("SELECT out_initiated_id, out_new FROM register_charge_back_failure($1)")
            .bind(transfer_id)
            .try_map(|r: PgRow| {
                let id = r.try_get_u64(0)?;
                Ok(if id == 0 {
                    ChargebackFailureResult::Unknown
                } else if r.try_get(1)? {
                    ChargebackFailureResult::Known(id)
                } else {
                    ChargebackFailureResult::Idempotent(id)
                })
            })
            .fetch_optional(&mut *db)
    )?
    .unwrap_or(ChargebackFailureResult::Unknown))
}

/** Get JSON value from KV table */
pub async fn kv_get<T: DeserializeOwned + Unpin + Send>(
    db: &mut PgConnection,
    key: &str,
) -> sqlx::Result<Option<T>> {
    serialized!(
        sqlx::query("SELECT value FROM kv WHERE key=$1")
            .bind(key)
            .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0))
            .fetch_optional(&mut *db)
    )
}

/** Set JSON value in KV table */
pub async fn kv_set<T: Serialize>(db: &mut PgConnection, key: &str, value: &T) -> sqlx::Result<()> {
    serialized!(
        sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value")
            .bind(key)
            .bind(sqlx::types::Json(value))
            .execute(&mut *db)
    )?;
    Ok(())
}

/// Outcome of registering a prepared transfer
#[derive(Debug, PartialEq, Eq)]
pub enum RegistrationResult {
    Success,
    ReservePubReuse,
}

/// Register a prepared transfer
///
/// The registration time is the current time, taken inside this function.
pub async fn transfer_register(
    db: &PgPool,
    req: &RegistrationRequest,
) -> sqlx::Result<RegistrationResult> {
    let ty: IncomingType = req.r#type.into();
    serialized!(
        sqlx::query(
            "SELECT out_reserve_pub_reuse FROM register_prepared_transfers($1,$2,$3,$4,$5,$6)"
        )
        .bind(ty)
        .bind(&req.account_pub)
        .bind(&req.authorization_pub)
        .bind(&req.authorization_sig)
        .bind(req.recurrent)
        .bind_timestamp(&Timestamp::now())
        .try_map(|r: PgRow| {
            Ok(if r.try_get_flag("out_reserve_pub_reuse")? {
                RegistrationResult::ReservePubReuse
            } else {
                RegistrationResult::Success
            })
        })
        .fetch_one(db)
    )
}

/// Delete a prepared transfer by its authorization public key
///
/// Returns the `out_found` flag reported by the SQL function.
pub async fn transfer_unregister(db: &PgPool, req: &Unregistration) -> sqlx::Result<bool> {
    serialized!(
        sqlx::query("SELECT out_found FROM delete_prepared_transfers($1)")
            .bind(&req.authorization_pub)
            .try_map(|r: PgRow| r.try_get_flag("out_found"))
            .fetch_one(db)
    )
}

/// Row accessors building Cyclos payto values from an account id column and a name column
pub trait CyclosTypeHelper {
    /// Read the account id at `idx` and the name at `name` as a [`FullCyclosPayto`]
    fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>(
        &self,
        idx: I,
        name: I,
        root: &CompactString,
    ) -> sqlx::Result<FullCyclosPayto>;
    /// Read the account id at `idx` and the name at `name` as a [`PaytoURI`]
    fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>(
        &self,
        idx: I,
        name: I,
        root: &CompactString,
    ) -> sqlx::Result<PaytoURI>;
}

impl CyclosTypeHelper for PgRow {
    fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>(
        &self,
        idx: I,
        name: I,
        root: &CompactString,
    ) -> sqlx::Result<FullCyclosPayto> {
        let idx = self.try_get(idx)?;
        let name = self.try_get(name)?;
        Ok(FullCyclosPayto::new(
            CyclosAccount {
                id: CyclosId(idx),
                root: root.clone(),
            },
            name,
        ))
    }
    fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>(
        &self,
        idx: I,
        name: I,
        root: &CompactString,
    ) -> sqlx::Result<PaytoURI> {
        let idx = self.try_get(idx)?;
        let name = self.try_get(name)?;
        Ok(CyclosAccount {
            id: CyclosId(idx),
            root: root.clone(),
        }
        .as_full_payto(name))
    }
}

#[cfg(test)]
mod test {
    use compact_str::CompactString;
    use jiff::{Span, Timestamp};
    use serde_json::json;
    use sqlx::{PgPool, Postgres, Row as _, pool::PoolConnection, postgres::PgRow};
    use taler_api::{
        db::TypeHelper,
        notification::dummy_listen,
        subject::{IncomingSubject, OutgoingSubject},
    };
    use taler_common::{
        api_common::{EddsaPublicKey, HashCode, ShortHashCode},
api_params::{History, Page}, 926 api_wire::TransferState, 927 types::{ 928 amount::{Currency, decimal}, 929 url, 930 utils::now_sql_stable_ts, 931 }, 932 }; 933 934 use crate::{ 935 constants::CONFIG_SOURCE, 936 db::{ 937 self, AddIncomingResult, AddOutgoingResult, BounceResult, ChargebackFailureResult, 938 Transfer, TransferResult, TxIn, TxInAdmin, TxOut, TxOutKind, 939 }, 940 }; 941 942 pub const CURR: Currency = Currency::TEST; 943 pub const ROOT: CompactString = CompactString::const_new("localhost"); 944 945 async fn setup() -> (PoolConnection<Postgres>, PgPool) { 946 taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await 947 } 948 949 #[tokio::test] 950 async fn kv() { 951 let (mut db, _) = setup().await; 952 953 let value = json!({ 954 "name": "Mr Smith", 955 "no way": 32 956 }); 957 958 assert_eq!( 959 db::kv_get::<serde_json::Value>(&mut db, "value") 960 .await 961 .unwrap(), 962 None 963 ); 964 db::kv_set(&mut db, "value", &value).await.unwrap(); 965 db::kv_set(&mut db, "value", &value).await.unwrap(); 966 assert_eq!( 967 db::kv_get::<serde_json::Value>(&mut db, "value") 968 .await 969 .unwrap(), 970 Some(value) 971 ); 972 } 973 974 #[tokio::test] 975 async fn tx_in() { 976 let (mut db, pool) = setup().await; 977 978 let mut routine = async |first: &Option<IncomingSubject>, 979 second: &Option<IncomingSubject>| { 980 let id = sqlx::query("SELECT count(*) + 1 FROM tx_in") 981 .try_map(|r: PgRow| r.try_get_u64(0)) 982 .fetch_one(&mut *db) 983 .await 984 .unwrap(); 985 let now = now_sql_stable_ts(); 986 let later = now + Span::new().hours(2); 987 let tx = TxIn { 988 transfer_id: now.as_microsecond() as i64, 989 tx_id: None, 990 amount: decimal("10"), 991 subject: "subject".to_owned(), 992 debtor_id: 31000163100000000, 993 debtor_name: "Name".into(), 994 valued_at: now, 995 }; 996 // Insert 997 assert_eq!( 998 db::register_tx_in(&mut db, &tx, first, &now) 999 .await 1000 .expect("register tx in"), 1001 AddIncomingResult::Success { 1002 new: true, 1003 
pending: false, 1004 row_id: id, 1005 valued_at: now, 1006 } 1007 ); 1008 // Idempotent 1009 assert_eq!( 1010 db::register_tx_in( 1011 &mut db, 1012 &TxIn { 1013 valued_at: later, 1014 ..tx.clone() 1015 }, 1016 first, 1017 &now 1018 ) 1019 .await 1020 .expect("register tx in"), 1021 AddIncomingResult::Success { 1022 new: false, 1023 pending: false, 1024 row_id: id, 1025 valued_at: now 1026 } 1027 ); 1028 // Many 1029 assert_eq!( 1030 db::register_tx_in( 1031 &mut db, 1032 &TxIn { 1033 transfer_id: later.as_microsecond() as i64, 1034 valued_at: later, 1035 ..tx 1036 }, 1037 second, 1038 &now 1039 ) 1040 .await 1041 .expect("register tx in"), 1042 AddIncomingResult::Success { 1043 new: true, 1044 pending: false, 1045 row_id: id + 1, 1046 valued_at: later 1047 } 1048 ); 1049 }; 1050 1051 // Empty db 1052 assert_eq!( 1053 db::revenue_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1054 .await 1055 .unwrap(), 1056 Vec::new() 1057 ); 1058 assert_eq!( 1059 db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1060 .await 1061 .unwrap(), 1062 Vec::new() 1063 ); 1064 1065 // Regular transaction 1066 routine(&None, &None).await; 1067 1068 // Reserve transaction 1069 routine( 1070 &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), 1071 &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), 1072 ) 1073 .await; 1074 1075 // Kyc transaction 1076 routine( 1077 &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), 1078 &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), 1079 ) 1080 .await; 1081 1082 // History 1083 assert_eq!( 1084 db::revenue_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1085 .await 1086 .unwrap() 1087 .len(), 1088 6 1089 ); 1090 assert_eq!( 1091 db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1092 .await 1093 .unwrap() 1094 .len(), 1095 4 1096 ); 1097 } 1098 1099 #[tokio::test] 1100 async fn tx_in_admin() { 1101 let (_, pool) = setup().await; 1102 1103 // Empty db 1104 
assert_eq!( 1105 db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1106 .await 1107 .unwrap(), 1108 Vec::new() 1109 ); 1110 1111 let now = now_sql_stable_ts(); 1112 let later = now + Span::new().hours(2); 1113 let tx = TxInAdmin { 1114 amount: decimal("10"), 1115 subject: "subject".to_owned(), 1116 debtor_id: 31000163100000000, 1117 debtor_name: "Name".into(), 1118 metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), 1119 }; 1120 // Insert 1121 assert_eq!( 1122 db::register_tx_in_admin(&pool, &tx, &now) 1123 .await 1124 .expect("register tx in"), 1125 AddIncomingResult::Success { 1126 new: true, 1127 pending: false, 1128 row_id: 1, 1129 valued_at: now 1130 } 1131 ); 1132 // Many 1133 assert_eq!( 1134 db::register_tx_in_admin( 1135 &pool, 1136 &TxInAdmin { 1137 subject: "Other".to_owned(), 1138 metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), 1139 ..tx.clone() 1140 }, 1141 &later 1142 ) 1143 .await 1144 .expect("register tx in"), 1145 AddIncomingResult::Success { 1146 new: true, 1147 pending: false, 1148 row_id: 2, 1149 valued_at: later 1150 } 1151 ); 1152 1153 // History 1154 assert_eq!( 1155 db::incoming_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1156 .await 1157 .unwrap() 1158 .len(), 1159 2 1160 ); 1161 } 1162 1163 #[tokio::test] 1164 async fn tx_out() { 1165 let (mut db, pool) = setup().await; 1166 1167 let mut routine = async |first: &TxOutKind, second: &TxOutKind| { 1168 let transfer_id = sqlx::query("SELECT count(*) + 1 FROM tx_out") 1169 .try_map(|r: PgRow| r.try_get(0)) 1170 .fetch_one(&mut *db) 1171 .await 1172 .unwrap(); 1173 let now = now_sql_stable_ts(); 1174 let later = now + Span::new().hours(2); 1175 let tx = TxOut { 1176 transfer_id, 1177 tx_id: Some(transfer_id), 1178 amount: decimal("10"), 1179 subject: "subject".to_owned(), 1180 creditor_id: 31000163100000000, 1181 creditor_name: "Name".into(), 1182 valued_at: now, 1183 }; 1184 assert!(matches!( 1185 db::make_transfer( 1186 &pool, 
1187 &Transfer { 1188 request_uid: HashCode::rand(), 1189 amount: decimal("10"), 1190 exchange_base_url: url("https://exchange.test.com/"), 1191 metadata: None, 1192 wtid: ShortHashCode::rand(), 1193 creditor_id: 31000163100000000, 1194 creditor_name: "Name".into() 1195 }, 1196 &now 1197 ) 1198 .await 1199 .unwrap(), 1200 TransferResult::Success { .. } 1201 )); 1202 db::initiated_submit_success(&mut db, 1, &Timestamp::now(), transfer_id) 1203 .await 1204 .expect("status success"); 1205 1206 // Insert 1207 assert_eq!( 1208 db::register_tx_out(&mut db, &tx, first, &now) 1209 .await 1210 .expect("register tx out"), 1211 AddOutgoingResult { 1212 result: db::RegisterResult::known, 1213 row_id: transfer_id, 1214 } 1215 ); 1216 // Idempotent 1217 assert_eq!( 1218 db::register_tx_out( 1219 &mut db, 1220 &TxOut { 1221 valued_at: later, 1222 ..tx.clone() 1223 }, 1224 first, 1225 &now 1226 ) 1227 .await 1228 .expect("register tx out"), 1229 AddOutgoingResult { 1230 result: db::RegisterResult::idempotent, 1231 row_id: transfer_id, 1232 } 1233 ); 1234 // Recovered 1235 assert_eq!( 1236 db::register_tx_out( 1237 &mut db, 1238 &TxOut { 1239 transfer_id: transfer_id + 1, 1240 tx_id: Some(transfer_id + 1), 1241 valued_at: later, 1242 ..tx.clone() 1243 }, 1244 second, 1245 &now 1246 ) 1247 .await 1248 .expect("register tx out"), 1249 AddOutgoingResult { 1250 result: db::RegisterResult::recovered, 1251 row_id: transfer_id + 1, 1252 } 1253 ); 1254 }; 1255 1256 // Empty db 1257 assert_eq!( 1258 db::outgoing_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1259 .await 1260 .unwrap(), 1261 Vec::new() 1262 ); 1263 1264 // Regular transaction 1265 routine(&TxOutKind::Simple, &TxOutKind::Simple).await; 1266 1267 // Talerable transaction 1268 routine( 1269 &TxOutKind::Talerable(OutgoingSubject::rand()), 1270 &TxOutKind::Talerable(OutgoingSubject::rand()), 1271 ) 1272 .await; 1273 1274 // Bounced transaction 1275 routine(&TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await; 
1276 1277 // History 1278 assert_eq!( 1279 db::outgoing_history(&pool, &History::default(), &CURR, &ROOT, dummy_listen) 1280 .await 1281 .unwrap() 1282 .len(), 1283 2 1284 ); 1285 } 1286 1287 // TODO tx out failure 1288 1289 #[tokio::test] 1290 async fn transfer() { 1291 let (_, pool) = setup().await; 1292 1293 // Empty db 1294 assert_eq!( 1295 db::transfer_by_id(&pool, 0, &CURR, &ROOT).await.unwrap(), 1296 None 1297 ); 1298 assert_eq!( 1299 db::transfer_page(&pool, &None, &CURR, &ROOT, &Page::default()) 1300 .await 1301 .unwrap(), 1302 Vec::new() 1303 ); 1304 1305 let req = Transfer { 1306 request_uid: HashCode::rand(), 1307 amount: decimal("10"), 1308 exchange_base_url: url("https://exchange.test.com/"), 1309 metadata: None, 1310 wtid: ShortHashCode::rand(), 1311 creditor_id: 31000163100000000, 1312 creditor_name: "Name".into(), 1313 }; 1314 let now = now_sql_stable_ts(); 1315 let later = now + Span::new().hours(2); 1316 // Insert 1317 assert_eq!( 1318 db::make_transfer(&pool, &req, &now) 1319 .await 1320 .expect("transfer"), 1321 TransferResult::Success { 1322 id: 1, 1323 initiated_at: now 1324 } 1325 ); 1326 // Idempotent 1327 assert_eq!( 1328 db::make_transfer(&pool, &req, &later) 1329 .await 1330 .expect("transfer"), 1331 TransferResult::Success { 1332 id: 1, 1333 initiated_at: now 1334 } 1335 ); 1336 // Request UID reuse 1337 assert_eq!( 1338 db::make_transfer( 1339 &pool, 1340 &Transfer { 1341 wtid: ShortHashCode::rand(), 1342 ..req.clone() 1343 }, 1344 &now 1345 ) 1346 .await 1347 .expect("transfer"), 1348 TransferResult::RequestUidReuse 1349 ); 1350 // wtid reuse 1351 assert_eq!( 1352 db::make_transfer( 1353 &pool, 1354 &Transfer { 1355 request_uid: HashCode::rand(), 1356 ..req.clone() 1357 }, 1358 &now 1359 ) 1360 .await 1361 .expect("transfer"), 1362 TransferResult::WtidReuse 1363 ); 1364 // Many 1365 assert_eq!( 1366 db::make_transfer( 1367 &pool, 1368 &Transfer { 1369 request_uid: HashCode::rand(), 1370 wtid: ShortHashCode::rand(), 1371 ..req 1372 }, 
1373 &later 1374 ) 1375 .await 1376 .expect("transfer"), 1377 TransferResult::Success { 1378 id: 2, 1379 initiated_at: later 1380 } 1381 ); 1382 1383 // Get 1384 assert!( 1385 db::transfer_by_id(&pool, 1, &CURR, &ROOT) 1386 .await 1387 .unwrap() 1388 .is_some() 1389 ); 1390 assert!( 1391 db::transfer_by_id(&pool, 2, &CURR, &ROOT) 1392 .await 1393 .unwrap() 1394 .is_some() 1395 ); 1396 assert!( 1397 db::transfer_by_id(&pool, 3, &CURR, &ROOT) 1398 .await 1399 .unwrap() 1400 .is_none() 1401 ); 1402 assert_eq!( 1403 db::transfer_page(&pool, &None, &CURR, &ROOT, &Page::default()) 1404 .await 1405 .unwrap() 1406 .len(), 1407 2 1408 ); 1409 } 1410 1411 #[tokio::test] 1412 async fn bounce() { 1413 let (mut db, _) = setup().await; 1414 1415 let amount = decimal("10"); 1416 let now = now_sql_stable_ts(); 1417 1418 // Bounce 1419 assert_eq!( 1420 db::register_bounced_tx_in( 1421 &mut db, 1422 &TxIn { 1423 transfer_id: 12, 1424 tx_id: None, 1425 amount, 1426 subject: "subject".to_owned(), 1427 debtor_id: 31000163100000000, 1428 debtor_name: "Name".into(), 1429 valued_at: now 1430 }, 1431 22, 1432 "good reason", 1433 &now 1434 ) 1435 .await 1436 .expect("bounce"), 1437 BounceResult { 1438 tx_id: 1, 1439 tx_new: true 1440 } 1441 ); 1442 // Idempotent 1443 assert_eq!( 1444 db::register_bounced_tx_in( 1445 &mut db, 1446 &TxIn { 1447 transfer_id: 12, 1448 tx_id: None, 1449 amount, 1450 subject: "subject".to_owned(), 1451 debtor_id: 31000163100000000, 1452 debtor_name: "Name".into(), 1453 valued_at: now 1454 }, 1455 22, 1456 "good reason", 1457 &now 1458 ) 1459 .await 1460 .expect("bounce"), 1461 BounceResult { 1462 tx_id: 1, 1463 tx_new: false 1464 } 1465 ); 1466 1467 // Many 1468 assert_eq!( 1469 db::register_bounced_tx_in( 1470 &mut db, 1471 &TxIn { 1472 transfer_id: 13, 1473 tx_id: None, 1474 amount, 1475 subject: "subject".to_owned(), 1476 debtor_id: 31000163100000000, 1477 debtor_name: "Name".into(), 1478 valued_at: now 1479 }, 1480 23, 1481 "good reason", 1482 &now 1483 ) 
1484 .await 1485 .expect("bounce"), 1486 BounceResult { 1487 tx_id: 2, 1488 tx_new: true 1489 } 1490 ); 1491 } 1492 1493 #[tokio::test] 1494 async fn status() { 1495 let (mut db, pool) = setup().await; 1496 1497 let check_status = async |id: u64, status: TransferState, msg: Option<&str>| { 1498 let transfer = db::transfer_by_id(&pool, id, &CURR, &ROOT) 1499 .await 1500 .unwrap() 1501 .unwrap(); 1502 assert_eq!( 1503 (status, msg), 1504 (transfer.status, transfer.status_msg.as_deref()) 1505 ); 1506 }; 1507 1508 // Unknown transfer 1509 db::initiated_submit_permanent_failure(&mut db, 1, "msg") 1510 .await 1511 .unwrap(); 1512 db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12) 1513 .await 1514 .unwrap(); 1515 assert_eq!( 1516 db::initiated_chargeback_failure(&mut db, 1).await.unwrap(), 1517 ChargebackFailureResult::Unknown 1518 ); 1519 1520 // Failure 1521 db::make_transfer( 1522 &pool, 1523 &Transfer { 1524 request_uid: HashCode::rand(), 1525 amount: decimal("1"), 1526 exchange_base_url: url("https://exchange.test.com/"), 1527 metadata: None, 1528 wtid: ShortHashCode::rand(), 1529 creditor_id: 31000163100000000, 1530 creditor_name: "Name".into(), 1531 }, 1532 &Timestamp::now(), 1533 ) 1534 .await 1535 .expect("transfer"); 1536 check_status(1, TransferState::pending, None).await; 1537 db::initiated_submit_permanent_failure(&mut db, 1, "error status") 1538 .await 1539 .unwrap(); 1540 check_status(1, TransferState::permanent_failure, Some("error status")).await; 1541 1542 // Success 1543 db::make_transfer( 1544 &pool, 1545 &Transfer { 1546 request_uid: HashCode::rand(), 1547 amount: decimal("1"), 1548 exchange_base_url: url("https://exchange.test.com/"), 1549 metadata: None, 1550 wtid: ShortHashCode::rand(), 1551 creditor_id: 31000163100000000, 1552 creditor_name: "Name".into(), 1553 }, 1554 &Timestamp::now(), 1555 ) 1556 .await 1557 .expect("transfer"); 1558 check_status(2, TransferState::pending, None).await; 1559 db::initiated_submit_success(&mut db, 2, 
&Timestamp::now(), 3) 1560 .await 1561 .unwrap(); 1562 check_status(2, TransferState::pending, None).await; 1563 db::register_tx_out( 1564 &mut db, 1565 &TxOut { 1566 transfer_id: 5, 1567 tx_id: Some(3), 1568 amount: decimal("2"), 1569 subject: "".to_string(), 1570 creditor_id: 31000163100000000, 1571 creditor_name: "Name".into(), 1572 valued_at: Timestamp::now(), 1573 }, 1574 &TxOutKind::Simple, 1575 &Timestamp::now(), 1576 ) 1577 .await 1578 .unwrap(); 1579 check_status(2, TransferState::success, None).await; 1580 1581 // Chargeback 1582 assert_eq!( 1583 db::initiated_chargeback_failure(&mut db, 5).await.unwrap(), 1584 ChargebackFailureResult::Known(2) 1585 ); 1586 check_status(2, TransferState::late_failure, Some("charged back")).await; 1587 assert_eq!( 1588 db::initiated_chargeback_failure(&mut db, 5).await.unwrap(), 1589 ChargebackFailureResult::Idempotent(2) 1590 ); 1591 } 1592 1593 #[tokio::test] 1594 async fn batch() { 1595 let (mut db, pool) = setup().await; 1596 let start = Timestamp::now(); 1597 1598 // Empty db 1599 let pendings = db::pending_batch(&mut db, &start) 1600 .await 1601 .expect("pending_batch"); 1602 assert_eq!(pendings.len(), 0); 1603 1604 // Some transfers 1605 for i in 0..3 { 1606 db::make_transfer( 1607 &pool, 1608 &Transfer { 1609 request_uid: HashCode::rand(), 1610 amount: decimal(format!("{}", i + 1)), 1611 exchange_base_url: url("https://exchange.test.com/"), 1612 metadata: None, 1613 wtid: ShortHashCode::rand(), 1614 creditor_id: 31000163100000000, 1615 creditor_name: "Name".into(), 1616 }, 1617 &Timestamp::now(), 1618 ) 1619 .await 1620 .expect("transfer"); 1621 } 1622 let pendings = db::pending_batch(&mut db, &start) 1623 .await 1624 .expect("pending_batch"); 1625 assert_eq!(pendings.len(), 3); 1626 1627 // Max 100 txs in batch 1628 for i in 0..100 { 1629 db::make_transfer( 1630 &pool, 1631 &Transfer { 1632 request_uid: HashCode::rand(), 1633 amount: decimal(format!("{}", i + 1)), 1634 exchange_base_url: 
url("https://exchange.test.com/"), 1635 metadata: None, 1636 wtid: ShortHashCode::rand(), 1637 creditor_id: 31000163100000000, 1638 creditor_name: "Name".into(), 1639 }, 1640 &Timestamp::now(), 1641 ) 1642 .await 1643 .expect("transfer"); 1644 } 1645 let pendings = db::pending_batch(&mut db, &start) 1646 .await 1647 .expect("pending_batch"); 1648 assert_eq!(pendings.len(), 100); 1649 1650 // Skip uploaded 1651 for i in 0..=10 { 1652 db::initiated_submit_success(&mut db, i, &Timestamp::now(), i) 1653 .await 1654 .expect("status success"); 1655 } 1656 let pendings = db::pending_batch(&mut db, &start) 1657 .await 1658 .expect("pending_batch"); 1659 assert_eq!(pendings.len(), 93); 1660 1661 // Skip failed 1662 for i in 0..=10 { 1663 db::initiated_submit_permanent_failure(&mut db, 10 + i, "failure") 1664 .await 1665 .expect("status failure"); 1666 } 1667 let pendings = db::pending_batch(&mut db, &start) 1668 .await 1669 .expect("pending_batch"); 1670 assert_eq!(pendings.len(), 83); 1671 } 1672 }