db.rs (47336B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::fmt::Display; 18 19 use jiff::{Timestamp, civil::Date, tz::TimeZone}; 20 use serde::{Serialize, de::DeserializeOwned}; 21 use sqlx::{PgConnection, PgExecutor, PgPool, QueryBuilder, Row, postgres::PgRow}; 22 use taler_api::{ 23 db::{BindHelper, IncomingType, TypeHelper, history, page}, 24 subject::{IncomingSubject, OutgoingSubject}, 25 }; 26 use taler_common::{ 27 api_common::{HashCode, ShortHashCode}, 28 api_params::{History, Page}, 29 api_revenue::RevenueIncomingBankTransaction, 30 api_wire::{ 31 IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferState, 32 TransferStatus, 33 }, 34 config::Config, 35 types::{ 36 amount::{Amount, Decimal}, 37 payto::PaytoImpl as _, 38 }, 39 }; 40 use tokio::sync::watch::{Receiver, Sender}; 41 use url::Url; 42 43 use crate::{FullHuPayto, config::parse_db_cfg, constants::CURRENCY, magnet_api::types::TxStatus}; 44 45 const SCHEMA: &str = "magnet_bank"; 46 47 pub async fn pool(cfg: &Config) -> anyhow::Result<PgPool> { 48 let db = parse_db_cfg(cfg)?; 49 let pool = taler_common::db::pool(db.cfg, SCHEMA).await?; 50 Ok(pool) 51 } 52 53 pub async fn dbinit(cfg: &Config, reset: bool) -> anyhow::Result<PgPool> { 54 let db_cfg = parse_db_cfg(cfg)?; 55 let pool = 
taler_common::db::pool(db_cfg.cfg, SCHEMA).await?; 56 let mut db = pool.acquire().await?; 57 taler_common::db::dbinit(&mut db, db_cfg.sql_dir.as_ref(), "magnet-bank", reset).await?; 58 Ok(pool) 59 } 60 61 pub async fn notification_listener( 62 pool: PgPool, 63 in_channel: Sender<i64>, 64 taler_in_channel: Sender<i64>, 65 out_channel: Sender<i64>, 66 taler_out_channel: Sender<i64>, 67 ) -> sqlx::Result<()> { 68 taler_api::notification::notification_listener!(&pool, 69 "tx_in" => (row_id: i64) { 70 in_channel.send_replace(row_id); 71 }, 72 "taler_in" => (row_id: i64) { 73 taler_in_channel.send_replace(row_id); 74 }, 75 "tx_out" => (row_id: i64) { 76 out_channel.send_replace(row_id); 77 }, 78 "taler_out" => (row_id: i64) { 79 taler_out_channel.send_replace(row_id); 80 } 81 ) 82 } 83 84 #[derive(Debug, Clone)] 85 pub struct TxIn { 86 pub code: u64, 87 pub amount: Amount, 88 pub subject: String, 89 pub debtor: FullHuPayto, 90 pub value_date: Date, 91 pub status: TxStatus, 92 } 93 94 impl Display for TxIn { 95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 96 let Self { 97 code, 98 amount, 99 subject, 100 debtor, 101 value_date, 102 status, 103 } = self; 104 write!( 105 f, 106 "{value_date} {code} {amount} ({} {}) {status:?} '{subject}'", 107 debtor.bban(), 108 debtor.name 109 ) 110 } 111 } 112 113 #[derive(Debug, Clone)] 114 pub struct TxOut { 115 pub code: u64, 116 pub amount: Amount, 117 pub subject: String, 118 pub creditor: FullHuPayto, 119 pub value_date: Date, 120 pub status: TxStatus, 121 } 122 123 impl Display for TxOut { 124 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 125 let Self { 126 code, 127 amount, 128 subject, 129 creditor, 130 value_date, 131 status, 132 } = self; 133 write!( 134 f, 135 "{value_date} {code} {amount} ({} {}) {status:?} '{subject}'", 136 creditor.bban(), 137 &creditor.name 138 ) 139 } 140 } 141 142 #[derive(Debug, PartialEq, Eq)] 143 pub struct Initiated { 144 pub id: u64, 145 pub amount: 
Amount, 146 pub subject: String, 147 pub creditor: FullHuPayto, 148 } 149 150 impl Display for Initiated { 151 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 152 let Self { 153 id, 154 amount, 155 subject, 156 creditor, 157 } = self; 158 write!( 159 f, 160 "{id} {amount} ({} {}) '{subject}'", 161 creditor.bban(), 162 &creditor.name 163 ) 164 } 165 } 166 167 #[derive(Debug, Clone)] 168 pub struct TxInAdmin { 169 pub amount: Amount, 170 pub subject: String, 171 pub debtor: FullHuPayto, 172 pub metadata: IncomingSubject, 173 } 174 175 /// Lock the database for worker execution 176 pub async fn worker_lock(e: &mut PgConnection) -> sqlx::Result<bool> { 177 sqlx::query("SELECT pg_try_advisory_lock(42)") 178 .try_map(|r: PgRow| r.try_get(0)) 179 .fetch_one(e) 180 .await 181 } 182 183 #[derive(Debug, PartialEq, Eq)] 184 pub enum AddIncomingResult { 185 Success { 186 new: bool, 187 row_id: u64, 188 valued_at: Date, 189 }, 190 ReservePubReuse, 191 } 192 193 pub async fn register_tx_in_admin( 194 db: &PgPool, 195 tx: &TxInAdmin, 196 now: &Timestamp, 197 ) -> sqlx::Result<AddIncomingResult> { 198 sqlx::query( 199 " 200 SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new 201 FROM register_tx_in(NULL, $1, $2, $3, $4, $5, $6, $7, $5) 202 ", 203 ) 204 .bind(&tx.amount) 205 .bind(&tx.subject) 206 .bind(tx.debtor.iban()) 207 .bind(&tx.debtor.name) 208 .bind_date(&now.to_zoned(TimeZone::UTC).date()) 209 .bind(tx.metadata.ty()) 210 .bind(tx.metadata.key()) 211 .try_map(|r: PgRow| { 212 Ok(if r.try_get_flag(0)? 
{ 213 AddIncomingResult::ReservePubReuse 214 } else { 215 AddIncomingResult::Success { 216 row_id: r.try_get_u64(1)?, 217 valued_at: r.try_get_date(2)?, 218 new: r.try_get(3)?, 219 } 220 }) 221 }) 222 .fetch_one(db) 223 .await 224 } 225 226 pub async fn register_tx_in( 227 db: &mut PgConnection, 228 tx: &TxIn, 229 subject: &Option<IncomingSubject>, 230 now: &Timestamp, 231 ) -> sqlx::Result<AddIncomingResult> { 232 sqlx::query( 233 " 234 SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new 235 FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9) 236 ", 237 ) 238 .bind(tx.code as i64) 239 .bind(&tx.amount) 240 .bind(&tx.subject) 241 .bind(tx.debtor.iban()) 242 .bind(&tx.debtor.name) 243 .bind_date(&tx.value_date) 244 .bind(subject.as_ref().map(|it| it.ty())) 245 .bind(subject.as_ref().map(|it| it.key())) 246 .bind_timestamp(now) 247 .try_map(|r: PgRow| { 248 Ok(if r.try_get_flag(0)? { 249 AddIncomingResult::ReservePubReuse 250 } else { 251 AddIncomingResult::Success { 252 row_id: r.try_get_u64(1)?, 253 valued_at: r.try_get_date(2)?, 254 new: r.try_get(3)?, 255 } 256 }) 257 }) 258 .fetch_one(db) 259 .await 260 } 261 262 #[derive(Debug)] 263 pub enum TxOutKind { 264 Simple, 265 Bounce(u32), 266 Talerable(OutgoingSubject), 267 } 268 269 #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)] 270 #[allow(non_camel_case_types)] 271 #[sqlx(type_name = "register_result")] 272 pub enum RegisterResult { 273 /// Already registered 274 idempotent, 275 /// Initiated transaction 276 known, 277 /// Recovered unknown outgoing transaction 278 recovered, 279 } 280 281 #[derive(Debug, PartialEq, Eq)] 282 pub struct AddOutgoingResult { 283 pub result: RegisterResult, 284 pub row_id: u64, 285 } 286 287 pub async fn register_tx_out( 288 db: &mut PgConnection, 289 tx: &TxOut, 290 kind: &TxOutKind, 291 now: &Timestamp, 292 ) -> sqlx::Result<AddOutgoingResult> { 293 let query = sqlx::query( 294 " 295 SELECT out_result, out_tx_row_id 296 FROM register_tx_out($1, $2, $3, 
$4, $5, $6, $7, $8, $9, $10) 297 ", 298 ) 299 .bind(tx.code as i64) 300 .bind(&tx.amount) 301 .bind(&tx.subject) 302 .bind(tx.creditor.iban()) 303 .bind(&tx.creditor.name) 304 .bind_date(&tx.value_date); 305 let query = match kind { 306 TxOutKind::Simple => query 307 .bind(None::<&[u8]>) 308 .bind(None::<&str>) 309 .bind(None::<i64>), 310 TxOutKind::Bounce(bounced) => query 311 .bind(None::<&[u8]>) 312 .bind(None::<&str>) 313 .bind(*bounced as i64), 314 TxOutKind::Talerable(subject) => query 315 .bind(subject.wtid.as_ref()) 316 .bind(subject.exchange_base_url.as_ref()) 317 .bind(None::<i64>), 318 }; 319 query 320 .bind_timestamp(now) 321 .try_map(|r: PgRow| { 322 Ok(AddOutgoingResult { 323 result: r.try_get(0)?, 324 row_id: r.try_get_u64(1)?, 325 }) 326 }) 327 .fetch_one(db) 328 .await 329 } 330 331 #[derive(Debug, PartialEq, Eq)] 332 pub struct OutFailureResult { 333 pub initiated_id: Option<u64>, 334 pub new: bool, 335 } 336 337 pub async fn register_tx_out_failure( 338 db: &mut PgConnection, 339 code: u64, 340 bounced: Option<u32>, 341 now: &Timestamp, 342 ) -> sqlx::Result<OutFailureResult> { 343 sqlx::query( 344 " 345 SELECT out_new, out_initiated_id 346 FROM register_tx_out_failure($1, $2, $3) 347 ", 348 ) 349 .bind(code as i64) 350 .bind(bounced.map(|i| i as i32)) 351 .bind_timestamp(now) 352 .try_map(|r: PgRow| { 353 Ok(OutFailureResult { 354 new: r.try_get(0)?, 355 initiated_id: r.try_get::<Option<i64>, _>(1)?.map(|i| i as u64), 356 }) 357 }) 358 .fetch_one(db) 359 .await 360 } 361 362 #[derive(Debug, PartialEq, Eq)] 363 pub enum TransferResult { 364 Success { id: u64, initiated_at: Timestamp }, 365 RequestUidReuse, 366 WtidReuse, 367 } 368 369 #[derive(Debug, Clone)] 370 pub struct Transfer { 371 pub request_uid: HashCode, 372 pub amount: Decimal, 373 pub exchange_base_url: Url, 374 pub wtid: ShortHashCode, 375 pub creditor: FullHuPayto, 376 } 377 378 pub async fn make_transfer<'a>( 379 db: impl PgExecutor<'a>, 380 tx: &Transfer, 381 now: &Timestamp, 382 
) -> sqlx::Result<TransferResult> { 383 let subject = format!("{} {}", tx.wtid, tx.exchange_base_url); 384 sqlx::query( 385 " 386 SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at 387 FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8) 388 ", 389 ) 390 .bind(tx.request_uid.as_ref()) 391 .bind(tx.wtid.as_ref()) 392 .bind(&subject) 393 .bind(tx.amount) 394 .bind(tx.exchange_base_url.as_str()) 395 .bind(tx.creditor.iban()) 396 .bind(&tx.creditor.name) 397 .bind_timestamp(now) 398 .try_map(|r: PgRow| { 399 Ok(if r.try_get_flag(0)? { 400 TransferResult::RequestUidReuse 401 } else if r.try_get_flag(1)? { 402 TransferResult::WtidReuse 403 } else { 404 TransferResult::Success { 405 id: r.try_get_u64(2)?, 406 initiated_at: r.try_get_timestamp(3)?, 407 } 408 }) 409 }) 410 .fetch_one(db) 411 .await 412 } 413 414 #[derive(Debug, PartialEq, Eq)] 415 pub struct BounceResult { 416 pub tx_id: u64, 417 pub tx_new: bool, 418 pub bounce_id: u64, 419 pub bounce_new: bool, 420 } 421 422 pub async fn register_bounce_tx_in( 423 db: &mut PgConnection, 424 tx: &TxIn, 425 amount: &Amount, 426 reason: &str, 427 now: &Timestamp, 428 ) -> sqlx::Result<BounceResult> { 429 sqlx::query( 430 " 431 SELECT out_tx_row_id, out_tx_new, out_bounce_row_id, out_bounce_new 432 FROM register_bounce_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9) 433 ", 434 ) 435 .bind(tx.code as i64) 436 .bind(&tx.amount) 437 .bind(&tx.subject) 438 .bind(tx.debtor.iban()) 439 .bind(&tx.debtor.name) 440 .bind_date(&tx.value_date) 441 .bind(amount) 442 .bind(reason) 443 .bind_timestamp(now) 444 .try_map(|r: PgRow| { 445 Ok(BounceResult { 446 tx_id: r.try_get_u64(0)?, 447 tx_new: r.try_get(1)?, 448 bounce_id: r.try_get_u64(2)?, 449 bounce_new: r.try_get(3)?, 450 }) 451 }) 452 .fetch_one(db) 453 .await 454 } 455 456 pub async fn transfer_page<'a>( 457 db: impl PgExecutor<'a>, 458 status: &Option<TransferState>, 459 params: &Page, 460 ) -> sqlx::Result<Vec<TransferListStatus>> { 461 page( 462 db, 463 
"initiated_id", 464 params, 465 || { 466 let mut builder = QueryBuilder::new( 467 " 468 SELECT 469 initiated_id, 470 status, 471 amount, 472 credit_account, 473 credit_name, 474 initiated_at 475 FROM transfer 476 JOIN initiated USING (initiated_id) 477 WHERE 478 ", 479 ); 480 if let Some(status) = status { 481 builder.push(" status = ").push_bind(status).push(" AND "); 482 } 483 builder 484 }, 485 |r: PgRow| { 486 Ok(TransferListStatus { 487 row_id: r.try_get_safeu64(0)?, 488 status: r.try_get(1)?, 489 amount: r.try_get_amount(2, &CURRENCY)?, 490 credit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?), 491 timestamp: r.try_get_timestamp(5)?.into(), 492 }) 493 }, 494 ) 495 .await 496 } 497 498 pub async fn outgoing_history( 499 db: &PgPool, 500 params: &History, 501 listen: impl FnOnce() -> Receiver<i64>, 502 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> { 503 history( 504 db, 505 "tx_out_id", 506 params, 507 listen, 508 || { 509 QueryBuilder::new( 510 " 511 SELECT 512 tx_out_id, 513 amount, 514 credit_account, 515 credit_name, 516 valued_at, 517 exchange_base_url, 518 wtid 519 FROM taler_out 520 JOIN tx_out USING (tx_out_id) 521 WHERE 522 ", 523 ) 524 }, 525 |r: PgRow| { 526 Ok(OutgoingBankTransaction { 527 row_id: r.try_get_safeu64(0)?, 528 amount: r.try_get_amount(1, &CURRENCY)?, 529 debit_fee: None, 530 credit_account: r.try_get_iban(2)?.as_full_payto(r.try_get(3)?), 531 date: r.try_get_timestamp(4)?.into(), 532 exchange_base_url: r.try_get_url(5)?, 533 wtid: r.try_get(6)?, 534 }) 535 }, 536 ) 537 .await 538 } 539 540 pub async fn incoming_history( 541 db: &PgPool, 542 params: &History, 543 listen: impl FnOnce() -> Receiver<i64>, 544 ) -> sqlx::Result<Vec<IncomingBankTransaction>> { 545 history( 546 db, 547 "tx_in_id", 548 params, 549 listen, 550 || { 551 QueryBuilder::new( 552 " 553 SELECT 554 type, 555 tx_in_id, 556 amount, 557 debit_account, 558 debit_name, 559 valued_at, 560 metadata 561 FROM taler_in 562 JOIN tx_in USING (tx_in_id) 563 WHERE 564 
", 565 ) 566 }, 567 |r: PgRow| { 568 Ok(match r.try_get(0)? { 569 IncomingType::reserve => IncomingBankTransaction::Reserve { 570 row_id: r.try_get_safeu64(1)?, 571 amount: r.try_get_amount(2, &CURRENCY)?, 572 credit_fee: None, 573 debit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?), 574 date: r.try_get_timestamp(5)?.into(), 575 reserve_pub: r.try_get(6)?, 576 authorization_pub: None, 577 authorization_sig: None, 578 }, 579 IncomingType::kyc => IncomingBankTransaction::Kyc { 580 row_id: r.try_get_safeu64(1)?, 581 amount: r.try_get_amount(2, &CURRENCY)?, 582 credit_fee: None, 583 debit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?), 584 date: r.try_get_timestamp(5)?.into(), 585 account_pub: r.try_get(6)?, 586 authorization_pub: None, 587 authorization_sig: None, 588 }, 589 IncomingType::wad => { 590 unimplemented!("WAD is not yet supported") 591 } 592 }) 593 }, 594 ) 595 .await 596 } 597 598 pub async fn revenue_history( 599 db: &PgPool, 600 params: &History, 601 listen: impl FnOnce() -> Receiver<i64>, 602 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> { 603 history( 604 db, 605 "tx_in_id", 606 params, 607 listen, 608 || { 609 QueryBuilder::new( 610 " 611 SELECT 612 tx_in_id, 613 valued_at, 614 amount, 615 debit_account, 616 debit_name, 617 subject 618 FROM tx_in 619 WHERE 620 ", 621 ) 622 }, 623 |r: PgRow| { 624 Ok(RevenueIncomingBankTransaction { 625 row_id: r.try_get_safeu64(0)?, 626 date: r.try_get_timestamp(1)?.into(), 627 amount: r.try_get_amount(2, &CURRENCY)?, 628 credit_fee: None, 629 debit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?), 630 subject: r.try_get(5)?, 631 }) 632 }, 633 ) 634 .await 635 } 636 637 pub async fn transfer_by_id<'a>( 638 db: impl PgExecutor<'a>, 639 id: u64, 640 ) -> sqlx::Result<Option<TransferStatus>> { 641 sqlx::query( 642 " 643 SELECT 644 status, 645 status_msg, 646 amount, 647 exchange_base_url, 648 wtid, 649 credit_account, 650 credit_name, 651 initiated_at 652 FROM transfer 653 JOIN 
initiated USING (initiated_id) 654 WHERE initiated_id = $1 655 ", 656 ) 657 .bind(id as i64) 658 .try_map(|r: PgRow| { 659 Ok(TransferStatus { 660 status: r.try_get(0)?, 661 status_msg: r.try_get(1)?, 662 amount: r.try_get_amount(2, &CURRENCY)?, 663 origin_exchange_url: r.try_get(3)?, 664 wtid: r.try_get(4)?, 665 credit_account: r.try_get_iban(5)?.as_full_payto(r.try_get(6)?), 666 timestamp: r.try_get_timestamp(7)?.into(), 667 }) 668 }) 669 .fetch_optional(db) 670 .await 671 } 672 673 /** Get a batch of pending initiated transactions not attempted since [start] */ 674 pub async fn pending_batch<'a>( 675 db: impl PgExecutor<'a>, 676 start: &Timestamp, 677 ) -> sqlx::Result<Vec<Initiated>> { 678 sqlx::query( 679 " 680 SELECT initiated_id, amount, subject, credit_account, credit_name 681 FROM initiated 682 WHERE magnet_code IS NULL 683 AND status='pending' 684 AND (last_submitted IS NULL OR last_submitted < $1) 685 LIMIT 100 686 ", 687 ) 688 .bind_timestamp(start) 689 .try_map(|r: PgRow| { 690 Ok(Initiated { 691 id: r.try_get_u64(0)?, 692 amount: r.try_get_amount(1, &CURRENCY)?, 693 subject: r.try_get(2)?, 694 creditor: FullHuPayto::new(r.try_get_parse(3)?, r.try_get(4)?), 695 }) 696 }) 697 .fetch_all(db) 698 .await 699 } 700 701 /** Get an initiated transaction matching the given magnet [code] */ 702 pub async fn initiated_by_code<'a>( 703 db: impl PgExecutor<'a>, 704 code: u64, 705 ) -> sqlx::Result<Option<Initiated>> { 706 sqlx::query( 707 " 708 SELECT initiated_id, amount, subject, credit_account, credit_name 709 FROM initiated 710 WHERE magnet_code IS $1 711 ", 712 ) 713 .bind(code as i64) 714 .try_map(|r: PgRow| { 715 Ok(Initiated { 716 id: r.try_get_u64(0)?, 717 amount: r.try_get_amount(1, &CURRENCY)?, 718 subject: r.try_get(2)?, 719 creditor: FullHuPayto::new(r.try_get_parse(3)?, r.try_get(4)?), 720 }) 721 }) 722 .fetch_optional(db) 723 .await 724 } 725 726 /** Update status of a successful submitted initiated transaction */ 727 pub async fn 
initiated_submit_success<'a>( 728 db: impl PgExecutor<'a>, 729 id: u64, 730 timestamp: &Timestamp, 731 magnet_code: u64, 732 ) -> sqlx::Result<()> { 733 sqlx::query( 734 " 735 UPDATE initiated 736 SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, magnet_code=$2 737 WHERE initiated_id=$3 738 " 739 ).bind_timestamp(timestamp) 740 .bind(magnet_code as i64) 741 .bind(id as i64) 742 .execute(db).await?; 743 Ok(()) 744 } 745 746 /** Update status of a permanently failed initiated transaction */ 747 pub async fn initiated_submit_permanent_failure<'a>( 748 db: impl PgExecutor<'a>, 749 id: u64, 750 timestamp: &Timestamp, 751 msg: &str, 752 ) -> sqlx::Result<()> { 753 sqlx::query( 754 " 755 UPDATE initiated 756 SET status='permanent_failure', status_msg=$2 757 WHERE initiated_id=$3 758 ", 759 ) 760 .bind_timestamp(timestamp) 761 .bind(msg) 762 .bind(id as i64) 763 .execute(db) 764 .await?; 765 Ok(()) 766 } 767 768 /** Check if an initiated transaction exist for a magnet code */ 769 pub async fn initiated_exists_for_code<'a>( 770 db: impl PgExecutor<'a>, 771 code: u64, 772 ) -> sqlx::Result<Option<u64>> { 773 sqlx::query("SELECT initiated_id FROM initiated WHERE magnet_code=$1") 774 .bind(code as i64) 775 .try_map(|r| Ok(r.try_get::<i64, _>(0)? 
as u64)) 776 .fetch_optional(db) 777 .await 778 } 779 780 /** Get JSON value from KV table */ 781 pub async fn kv_get<'a, T: DeserializeOwned + Unpin + Send>( 782 db: impl PgExecutor<'a>, 783 key: &str, 784 ) -> sqlx::Result<Option<T>> { 785 sqlx::query("SELECT value FROM kv WHERE key=$1") 786 .bind(key) 787 .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0)) 788 .fetch_optional(db) 789 .await 790 } 791 792 /** Set JSON value in KV table */ 793 pub async fn kv_set<'a, T: Serialize>( 794 db: impl PgExecutor<'a>, 795 key: &str, 796 value: &T, 797 ) -> sqlx::Result<()> { 798 sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value") 799 .bind(key) 800 .bind(sqlx::types::Json(value)) 801 .execute(db) 802 .await?; 803 Ok(()) 804 } 805 806 #[cfg(test)] 807 mod test { 808 use jiff::{Span, Timestamp, Zoned}; 809 use serde_json::json; 810 use sqlx::{PgConnection, PgPool, postgres::PgRow}; 811 use taler_api::{ 812 db::TypeHelper, 813 notification::dummy_listen, 814 subject::{IncomingSubject, OutgoingSubject}, 815 }; 816 use taler_common::{ 817 api_common::{EddsaPublicKey, HashCode, ShortHashCode}, 818 api_params::{History, Page}, 819 types::{ 820 amount::{amount, decimal}, 821 url, 822 utils::now_sql_stable_timestamp, 823 }, 824 }; 825 826 use crate::{ 827 constants::CONFIG_SOURCE, 828 db::{ 829 self, AddIncomingResult, AddOutgoingResult, BounceResult, Initiated, OutFailureResult, 830 TransferResult, TxIn, TxOut, TxOutKind, kv_get, kv_set, make_transfer, 831 register_bounce_tx_in, register_tx_in, register_tx_in_admin, register_tx_out, 832 }, 833 magnet_api::types::TxStatus, 834 magnet_payto, 835 }; 836 837 use super::TxInAdmin; 838 839 async fn setup() -> (PgConnection, PgPool) { 840 let pool = taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await; 841 let conn = pool.acquire().await.unwrap().leak(); 842 (conn, pool) 843 } 844 845 #[tokio::test] 846 async fn kv() { 847 let (mut db, _) = setup().await; 848 849 
let value = json!({ 850 "name": "Mr Smith", 851 "no way": 32 852 }); 853 854 assert_eq!( 855 kv_get::<serde_json::Value>(&mut db, "value").await.unwrap(), 856 None 857 ); 858 kv_set(&mut db, "value", &value).await.unwrap(); 859 kv_set(&mut db, "value", &value).await.unwrap(); 860 assert_eq!( 861 kv_get::<serde_json::Value>(&mut db, "value").await.unwrap(), 862 Some(value) 863 ); 864 } 865 866 #[tokio::test] 867 async fn tx_in() { 868 let (mut db, pool) = setup().await; 869 870 async fn routine( 871 db: &mut PgConnection, 872 first: &Option<IncomingSubject>, 873 second: &Option<IncomingSubject>, 874 ) { 875 let (id, code) = 876 sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_in") 877 .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?))) 878 .fetch_one(&mut *db) 879 .await 880 .unwrap(); 881 let now = now_sql_stable_timestamp(); 882 let date = Zoned::now().date(); 883 let later = date.tomorrow().unwrap(); 884 let tx = TxIn { 885 code: code, 886 amount: amount("EUR:10"), 887 subject: "subject".to_owned(), 888 debtor: magnet_payto( 889 "payto://iban/HU30162000031000163100000000?receiver-name=name", 890 ), 891 value_date: date, 892 status: TxStatus::Completed, 893 }; 894 // Insert 895 assert_eq!( 896 register_tx_in(db, &tx, &first, &now) 897 .await 898 .expect("register tx in"), 899 AddIncomingResult::Success { 900 new: true, 901 row_id: id, 902 valued_at: date 903 } 904 ); 905 // Idempotent 906 assert_eq!( 907 register_tx_in( 908 db, 909 &TxIn { 910 value_date: later, 911 ..tx.clone() 912 }, 913 &first, 914 &now 915 ) 916 .await 917 .expect("register tx in"), 918 AddIncomingResult::Success { 919 new: false, 920 row_id: id, 921 valued_at: date 922 } 923 ); 924 // Many 925 assert_eq!( 926 register_tx_in( 927 db, 928 &TxIn { 929 code: code + 1, 930 value_date: later, 931 ..tx 932 }, 933 &second, 934 &now 935 ) 936 .await 937 .expect("register tx in"), 938 AddIncomingResult::Success { 939 new: true, 940 row_id: id + 1, 941 valued_at: 
later 942 } 943 ); 944 } 945 946 // Empty db 947 assert_eq!( 948 db::revenue_history(&pool, &History::default(), dummy_listen) 949 .await 950 .unwrap(), 951 Vec::new() 952 ); 953 assert_eq!( 954 db::incoming_history(&pool, &History::default(), dummy_listen) 955 .await 956 .unwrap(), 957 Vec::new() 958 ); 959 960 // Regular transaction 961 routine(&mut db, &None, &None).await; 962 963 // Reserve transaction 964 routine( 965 &mut db, 966 &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), 967 &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), 968 ) 969 .await; 970 971 // Kyc transaction 972 routine( 973 &mut db, 974 &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), 975 &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), 976 ) 977 .await; 978 979 // History 980 assert_eq!( 981 db::revenue_history(&pool, &History::default(), dummy_listen) 982 .await 983 .unwrap() 984 .len(), 985 6 986 ); 987 assert_eq!( 988 db::incoming_history(&pool, &History::default(), dummy_listen) 989 .await 990 .unwrap() 991 .len(), 992 4 993 ); 994 } 995 996 #[tokio::test] 997 async fn tx_in_admin() { 998 let (_, pool) = setup().await; 999 1000 // Empty db 1001 assert_eq!( 1002 db::incoming_history(&pool, &History::default(), dummy_listen) 1003 .await 1004 .unwrap(), 1005 Vec::new() 1006 ); 1007 1008 let now = now_sql_stable_timestamp(); 1009 let later = now + Span::new().hours(2); 1010 let date = Zoned::now().date(); 1011 let tx = TxInAdmin { 1012 amount: amount("EUR:10"), 1013 subject: "subject".to_owned(), 1014 debtor: magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"), 1015 metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), 1016 }; 1017 // Insert 1018 assert_eq!( 1019 register_tx_in_admin(&pool, &tx, &now) 1020 .await 1021 .expect("register tx in"), 1022 AddIncomingResult::Success { 1023 new: true, 1024 row_id: 1, 1025 valued_at: date 1026 } 1027 ); 1028 // Idempotent 1029 assert_eq!( 1030 register_tx_in_admin(&pool, &tx, &later) 1031 .await 
1032 .expect("register tx in"), 1033 AddIncomingResult::Success { 1034 new: false, 1035 row_id: 1, 1036 valued_at: date 1037 } 1038 ); 1039 // Many 1040 assert_eq!( 1041 register_tx_in_admin( 1042 &pool, 1043 &TxInAdmin { 1044 subject: "Other".to_owned(), 1045 metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), 1046 ..tx.clone() 1047 }, 1048 &later 1049 ) 1050 .await 1051 .expect("register tx in"), 1052 AddIncomingResult::Success { 1053 new: true, 1054 row_id: 2, 1055 valued_at: date 1056 } 1057 ); 1058 1059 // History 1060 assert_eq!( 1061 db::incoming_history(&pool, &History::default(), dummy_listen) 1062 .await 1063 .unwrap() 1064 .len(), 1065 2 1066 ); 1067 } 1068 1069 #[tokio::test] 1070 async fn tx_out() { 1071 let (mut db, pool) = setup().await; 1072 1073 async fn routine(db: &mut PgConnection, first: &TxOutKind, second: &TxOutKind) { 1074 let (id, code) = 1075 sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_out") 1076 .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?))) 1077 .fetch_one(&mut *db) 1078 .await 1079 .unwrap(); 1080 let now = now_sql_stable_timestamp(); 1081 let date = Zoned::now().date(); 1082 let later = date.tomorrow().unwrap(); 1083 let tx = TxOut { 1084 code, 1085 amount: amount("HUF:10"), 1086 subject: "subject".to_owned(), 1087 creditor: magnet_payto( 1088 "payto://iban/HU30162000031000163100000000?receiver-name=name", 1089 ), 1090 value_date: date, 1091 status: TxStatus::Completed, 1092 }; 1093 assert!(matches!( 1094 make_transfer( 1095 &mut *db, 1096 &db::Transfer { 1097 request_uid: HashCode::rand(), 1098 amount: decimal("10"), 1099 exchange_base_url: url("https://exchange.test.com/"), 1100 wtid: ShortHashCode::rand(), 1101 creditor: tx.creditor.clone() 1102 }, 1103 &now 1104 ) 1105 .await 1106 .unwrap(), 1107 TransferResult::Success { .. 
} 1108 )); 1109 db::initiated_submit_success(&mut *db, 1, &Timestamp::now(), tx.code) 1110 .await 1111 .expect("status success"); 1112 1113 // Insert 1114 assert_eq!( 1115 register_tx_out(&mut *db, &tx, first, &now) 1116 .await 1117 .expect("register tx out"), 1118 AddOutgoingResult { 1119 result: db::RegisterResult::known, 1120 row_id: id, 1121 } 1122 ); 1123 // Idempotent 1124 assert_eq!( 1125 register_tx_out( 1126 &mut *db, 1127 &TxOut { 1128 value_date: later, 1129 ..tx.clone() 1130 }, 1131 first, 1132 &now 1133 ) 1134 .await 1135 .expect("register tx out"), 1136 AddOutgoingResult { 1137 result: db::RegisterResult::idempotent, 1138 row_id: id, 1139 } 1140 ); 1141 // Recovered 1142 assert_eq!( 1143 register_tx_out( 1144 &mut *db, 1145 &TxOut { 1146 code: code + 1, 1147 value_date: later, 1148 ..tx.clone() 1149 }, 1150 second, 1151 &now 1152 ) 1153 .await 1154 .expect("register tx out"), 1155 AddOutgoingResult { 1156 result: db::RegisterResult::recovered, 1157 row_id: id + 1, 1158 } 1159 ); 1160 } 1161 1162 // Empty db 1163 assert_eq!( 1164 db::outgoing_history(&pool, &History::default(), dummy_listen) 1165 .await 1166 .unwrap(), 1167 Vec::new() 1168 ); 1169 1170 // Regular transaction 1171 routine(&mut db, &TxOutKind::Simple, &TxOutKind::Simple).await; 1172 1173 // Talerable transaction 1174 routine( 1175 &mut db, 1176 &TxOutKind::Talerable(OutgoingSubject::rand()), 1177 &TxOutKind::Talerable(OutgoingSubject::rand()), 1178 ) 1179 .await; 1180 1181 // Bounced transaction 1182 routine(&mut db, &TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await; 1183 1184 // History 1185 assert_eq!( 1186 db::outgoing_history(&pool, &History::default(), dummy_listen) 1187 .await 1188 .unwrap() 1189 .len(), 1190 2 1191 ); 1192 } 1193 1194 #[tokio::test] 1195 async fn tx_out_failure() { 1196 let (mut db, _) = setup().await; 1197 1198 let now = now_sql_stable_timestamp(); 1199 1200 // Unknown 1201 assert_eq!( 1202 db::register_tx_out_failure(&mut db, 42, None, &now) 1203 .await 1204 
.unwrap(), 1205 OutFailureResult { 1206 initiated_id: None, 1207 new: false 1208 } 1209 ); 1210 assert_eq!( 1211 db::register_tx_out_failure(&mut db, 42, Some(12), &now) 1212 .await 1213 .unwrap(), 1214 OutFailureResult { 1215 initiated_id: None, 1216 new: false 1217 } 1218 ); 1219 1220 // Initiated 1221 let req = db::Transfer { 1222 request_uid: HashCode::rand(), 1223 amount: decimal("10"), 1224 exchange_base_url: url("https://exchange.test.com/"), 1225 wtid: ShortHashCode::rand(), 1226 creditor: magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"), 1227 }; 1228 let payto = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"); 1229 assert_eq!( 1230 make_transfer(&mut db, &req, &now).await.unwrap(), 1231 TransferResult::Success { 1232 id: 1, 1233 initiated_at: now 1234 } 1235 ); 1236 db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 34) 1237 .await 1238 .expect("status success"); 1239 assert_eq!( 1240 db::register_tx_out_failure(&mut db, 34, None, &now) 1241 .await 1242 .unwrap(), 1243 OutFailureResult { 1244 initiated_id: Some(1), 1245 new: true 1246 } 1247 ); 1248 assert_eq!( 1249 db::register_tx_out_failure(&mut db, 34, None, &now) 1250 .await 1251 .unwrap(), 1252 OutFailureResult { 1253 initiated_id: Some(1), 1254 new: false 1255 } 1256 ); 1257 1258 // Recovered bounce 1259 let tx = TxIn { 1260 code: 12, 1261 amount: amount("HUF:11"), 1262 subject: "malformed transaction".to_owned(), 1263 debtor: payto, 1264 value_date: Zoned::now().date(), 1265 status: TxStatus::Completed, 1266 }; 1267 assert_eq!( 1268 db::register_bounce_tx_in(&mut db, &tx, &tx.amount, "no reason", &now) 1269 .await 1270 .unwrap(), 1271 BounceResult { 1272 tx_id: 1, 1273 tx_new: true, 1274 bounce_id: 2, 1275 bounce_new: true 1276 } 1277 ); 1278 assert_eq!( 1279 db::register_tx_out_failure(&mut db, 10, Some(12), &now) 1280 .await 1281 .unwrap(), 1282 OutFailureResult { 1283 initiated_id: Some(2), 1284 new: true 1285 } 1286 ); 1287 
assert_eq!(
            db::register_tx_out_failure(&mut db, 10, Some(12), &now)
                .await
                .unwrap(),
            OutFailureResult {
                initiated_id: Some(2),
                new: false
            }
        );
    }

    /// Exercises transfer creation and lookup: a first insert, an idempotent
    /// replay, the two reuse conflicts (request UID and wtid), a second
    /// distinct transfer, then lookup by id and paging.
    #[tokio::test]
    async fn transfer() {
        let (mut db, _) = setup().await;

        // Empty db: neither lookup by id nor paging returns anything
        assert_eq!(db::transfer_by_id(&mut db, 0).await.unwrap(), None);
        let empty_page = db::transfer_page(&mut db, &None, &Page::default())
            .await
            .unwrap();
        assert_eq!(empty_page, Vec::new());

        let request = db::Transfer {
            request_uid: HashCode::rand(),
            amount: decimal("10"),
            exchange_base_url: url("https://exchange.test.com/"),
            wtid: ShortHashCode::rand(),
            creditor: magnet_payto("payto://iban/HU02162000031000164800000000?receiver-name=name"),
        };
        let now = now_sql_stable_timestamp();
        let later = now + Span::new().hours(2);

        // Insert
        let inserted = make_transfer(&mut db, &request, &now)
            .await
            .expect("transfer");
        assert_eq!(
            inserted,
            TransferResult::Success {
                id: 1,
                initiated_at: now.into()
            }
        );

        // Idempotent: replaying the same request at a later time still
        // reports the first id and the first initiation time
        let replayed = make_transfer(&mut db, &request, &later)
            .await
            .expect("transfer");
        assert_eq!(
            replayed,
            TransferResult::Success {
                id: 1,
                initiated_at: now.into()
            }
        );

        // Request UID reuse: same request_uid, different wtid
        let same_uid_other_wtid = db::Transfer {
            wtid: ShortHashCode::rand(),
            ..request.clone()
        };
        let uid_conflict = make_transfer(&mut db, &same_uid_other_wtid, &now)
            .await
            .expect("transfer");
        assert_eq!(uid_conflict, TransferResult::RequestUidReuse);

        // wtid reuse: same wtid, different request_uid
        let same_wtid_other_uid = db::Transfer {
            request_uid: HashCode::rand(),
            ..request.clone()
        };
        let wtid_conflict = make_transfer(&mut db, &same_wtid_other_uid, &now)
            .await
            .expect("transfer");
        assert_eq!(wtid_conflict, TransferResult::WtidReuse);

        // Many: a request with fresh identifiers gets the next id
        let distinct = db::Transfer {
            request_uid: HashCode::rand(),
            wtid: ShortHashCode::rand(),
            ..request
        };
        let second = make_transfer(&mut db, &distinct, &later)
            .await
            .expect("transfer");
        assert_eq!(
            second,
            TransferResult::Success {
                id: 2,
                initiated_at: later.into()
            }
        );

        // Get: both created ids resolve, the next one does not
        for known_id in [1, 2] {
            assert!(
                db::transfer_by_id(&mut db, known_id)
                    .await
                    .unwrap()
                    .is_some()
            );
        }
        assert!(db::transfer_by_id(&mut db, 3).await.unwrap().is_none());
        let page = db::transfer_page(&mut db, &None, &Page::default())
            .await
            .unwrap();
        assert_eq!(page.len(), 2);
    }

    /// Exercises bounce registration, both for an incoming transaction first
    /// seen through the bounce call (code 12) and for one registered
    /// beforehand (code 13), then checks that both bounces show up in the
    /// pending batch.
    #[tokio::test]
    async fn bounce() {
        let (mut db, _) = setup().await;

        let amount = amount("HUF:10");
        let payto = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name");
        let now = now_sql_stable_timestamp();
        let date = Zoned::now().date();

        // Every incoming transaction in this test differs only by its code
        let incoming = |code: u64| TxIn {
            code,
            amount: amount.clone(),
            subject: "subject".to_owned(),
            debtor: payto.clone(),
            value_date: date,
            status: TxStatus::Completed,
        };

        // Empty db
        let nothing_pending = db::pending_batch(&mut db, &now).await.unwrap();
        assert!(nothing_pending.is_empty());

        // Insert
        let registered = register_tx_in(&mut db, &incoming(13), &None, &now)
            .await
            .expect("register tx in");
        assert_eq!(
            registered,
            AddIncomingResult::Success {
                new: true,
                row_id: 1,
                valued_at: date
            }
        );

        // Bounce: code 12 was never registered, so both rows are new
        let first_bounce =
            register_bounce_tx_in(&mut db, &incoming(12), &amount, "good reason", &now)
                .await
                .expect("bounce");
        assert_eq!(
            first_bounce,
            BounceResult {
                tx_id: 2,
                tx_new: true,
                bounce_id: 1,
                bounce_new: true
            }
        );
        // Idempotent
        let first_bounce_again =
            register_bounce_tx_in(&mut db, &incoming(12), &amount, "good reason", &now)
                .await
                .expect("bounce");
        assert_eq!(
            first_bounce_again,
            BounceResult {
                tx_id: 2,
                tx_new: false,
                bounce_id: 1,
                bounce_new: false
            }
        );

        // Bounce registered: code 13 already has row 1, only the bounce is new
        let second_bounce =
            register_bounce_tx_in(&mut db, &incoming(13), &amount, "good reason", &now)
                .await
                .expect("bounce");
        assert_eq!(
            second_bounce,
            BounceResult {
                tx_id: 1,
                tx_new: false,
                bounce_id: 2,
                bounce_new: true
            }
        );
        // Idempotent registered
        let second_bounce_again =
            register_bounce_tx_in(&mut db, &incoming(13), &amount, "good reason", &now)
                .await
                .expect("bounce");
        assert_eq!(
            second_bounce_again,
            BounceResult {
                tx_id: 1,
                tx_new: false,
                bounce_id: 2,
                bounce_new: false
            }
        );

        // Batch: both bounces are pending, addressed back to the debtor
        let pending = db::pending_batch(&mut db, &now).await.unwrap();
        assert_eq!(
            pending,
            &[
                Initiated {
                    id: 1,
                    amount: amount.clone(),
                    subject: "bounce: 12".to_owned(),
                    creditor: payto.clone()
                },
                Initiated {
                    id: 2,
                    amount,
                    subject: "bounce: 13".to_owned(),
                    creditor: payto
                }
            ]
        );
    }

    /// Checks that both status updates return `Ok` for id 1 when this test
    /// has not created any transfer.
    #[tokio::test]
    async fn status() {
        let (mut db, _) = setup().await;

        // Unknown transfer
        let failure_update =
            db::initiated_submit_permanent_failure(&mut db, 1, &Timestamp::now(), "msg").await;
        failure_update.unwrap();
        let success_update =
            db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12).await;
        success_update.unwrap();
    }

    /// Exercises `pending_batch`: empty result, growth with new transfers,
    /// the 100-entry result seen once 103 transfers exist, and the drop in
    /// count after transfers are marked submitted or permanently failed.
    #[tokio::test]
    async fn batch() {
        let (mut db, _) = setup().await;
        let start = Timestamp::now();
        let creditor =
            magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name");

        // Empty db
        let pendings = db::pending_batch(&mut db, &start)
            .await
            .expect("pending_batch");
        assert_eq!(pendings.len(), 0);

        // Two insert stages: (transfers to add, expected pending count)
        //  - "Some transfers": 3 added, 3 pending
        //  - "Max 100 txs in batch": 100 more added (103 total), 100 pending
        for (to_insert, expected_pending) in [(3, 3), (100, 100)] {
            for i in 0..to_insert {
                let transfer = db::Transfer {
                    request_uid: HashCode::rand(),
                    amount: decimal(format!("{}", i + 1)),
                    exchange_base_url: url("https://exchange.test.com/"),
                    wtid: ShortHashCode::rand(),
                    creditor: creditor.clone(),
                };
                make_transfer(&mut db, &transfer, &Timestamp::now())
                    .await
                    .expect("transfer");
            }
            let pendings = db::pending_batch(&mut db, &start)
                .await
                .expect("pending_batch");
            assert_eq!(pendings.len(), expected_pending);
        }

        // Skip uploaded: ids 0..=10 are marked submitted. The expected count
        // drops by 10 (103 -> 93), presumably because no transfer has id 0.
        for id in 0..=10 {
            db::initiated_submit_success(&mut db, id, &Timestamp::now(), id)
                .await
                .expect("status success");
        }
        let pendings = db::pending_batch(&mut db, &start)
            .await
            .expect("pending_batch");
        assert_eq!(pendings.len(), 93);

        // Skip failed: ids 10..=20 are marked failed. Id 10 was already
        // submitted above, so the expected count drops by 10 again (93 -> 83).
        for id in 10..=20 {
            db::initiated_submit_permanent_failure(&mut db, id, &Timestamp::now(), "failure")
                .await
                .expect("status failure");
        }
        let pendings = db::pending_batch(&mut db, &start)
            .await
            .expect("pending_batch");
        assert_eq!(pendings.len(), 83);
    }
}