db.rs (47527B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::fmt::Display; 18 19 use jiff::{Timestamp, civil::Date, tz::TimeZone}; 20 use serde::{Serialize, de::DeserializeOwned}; 21 use sqlx::{PgConnection, PgExecutor, PgPool, QueryBuilder, Row, postgres::PgRow}; 22 use taler_api::{ 23 db::{BindHelper, IncomingType, TypeHelper, history, page}, 24 subject::{IncomingSubject, OutgoingSubject}, 25 }; 26 use taler_common::{ 27 api_params::{History, Page}, 28 api_revenue::RevenueIncomingBankTransaction, 29 api_wire::{ 30 IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferRequest, 31 TransferState, TransferStatus, 32 }, 33 types::{amount::Amount, payto::PaytoImpl as _}, 34 }; 35 use tokio::sync::watch::{Receiver, Sender}; 36 37 use crate::{FullHuPayto, constants::CURRENCY, magnet_api::types::TxStatus}; 38 39 pub async fn notification_listener( 40 pool: PgPool, 41 in_channel: Sender<i64>, 42 taler_in_channel: Sender<i64>, 43 out_channel: Sender<i64>, 44 taler_out_channel: Sender<i64>, 45 ) -> sqlx::Result<()> { 46 taler_api::notification::notification_listener!(&pool, 47 "tx_in" => (row_id: i64) { 48 in_channel.send_replace(row_id); 49 }, 50 "taler_in" => (row_id: i64) { 51 taler_in_channel.send_replace(row_id); 52 }, 53 "tx_out" => (row_id: i64) { 54 out_channel.send_replace(row_id); 
55 }, 56 "taler_out" => (row_id: i64) { 57 taler_out_channel.send_replace(row_id); 58 } 59 ) 60 } 61 62 #[derive(Debug, Clone)] 63 pub struct TxIn { 64 pub code: u64, 65 pub amount: Amount, 66 pub subject: String, 67 pub debtor: FullHuPayto, 68 pub value_date: Date, 69 pub status: TxStatus, 70 } 71 72 impl Display for TxIn { 73 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 74 let Self { 75 code, 76 amount, 77 subject, 78 debtor, 79 value_date, 80 status, 81 } = self; 82 write!( 83 f, 84 "{value_date} {code} {amount} ({} {}) {status:?} '{subject}'", 85 debtor.bban(), 86 debtor.name 87 ) 88 } 89 } 90 91 #[derive(Debug, Clone)] 92 pub struct TxOut { 93 pub code: u64, 94 pub amount: Amount, 95 pub subject: String, 96 pub creditor: FullHuPayto, 97 pub value_date: Date, 98 pub status: TxStatus, 99 } 100 101 impl Display for TxOut { 102 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 103 let Self { 104 code, 105 amount, 106 subject, 107 creditor, 108 value_date, 109 status, 110 } = self; 111 write!( 112 f, 113 "{value_date} {code} {amount} ({} {}) {status:?} '{subject}'", 114 creditor.bban(), 115 &creditor.name 116 ) 117 } 118 } 119 120 #[derive(Debug, PartialEq, Eq)] 121 pub struct Initiated { 122 pub id: u64, 123 pub amount: Amount, 124 pub subject: String, 125 pub creditor: FullHuPayto, 126 } 127 128 impl Display for Initiated { 129 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 130 let Self { 131 id, 132 amount, 133 subject, 134 creditor, 135 } = self; 136 write!( 137 f, 138 "{id} {amount} ({} {}) '{subject}'", 139 creditor.bban(), 140 &creditor.name 141 ) 142 } 143 } 144 145 #[derive(Debug, Clone)] 146 pub struct TxInAdmin { 147 pub amount: Amount, 148 pub subject: String, 149 pub debtor: FullHuPayto, 150 pub metadata: IncomingSubject, 151 } 152 153 #[derive(Debug, PartialEq, Eq)] 154 pub enum AddIncomingResult { 155 Success { 156 new: bool, 157 row_id: u64, 158 valued_at: Date, 159 }, 160 
ReservePubReuse, 161 } 162 163 pub async fn register_tx_in_admin( 164 db: &PgPool, 165 tx: &TxInAdmin, 166 now: &Timestamp, 167 ) -> sqlx::Result<AddIncomingResult> { 168 sqlx::query( 169 " 170 SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new 171 FROM register_tx_in(NULL, ($1, $2)::taler_amount, $3, $4, $5, $6, $7, $8, $6) 172 ", 173 ) 174 .bind_amount(&tx.amount) 175 .bind(&tx.subject) 176 .bind(tx.debtor.iban()) 177 .bind(&tx.debtor.name) 178 .bind_date(&now.to_zoned(TimeZone::UTC).date()) 179 .bind(tx.metadata.ty()) 180 .bind(tx.metadata.key()) 181 .try_map(|r: PgRow| { 182 Ok(if r.try_get(0)? { 183 AddIncomingResult::ReservePubReuse 184 } else { 185 AddIncomingResult::Success { 186 row_id: r.try_get_u64(1)?, 187 valued_at: r.try_get_date(2)?, 188 new: r.try_get(3)?, 189 } 190 }) 191 }) 192 .fetch_one(db) 193 .await 194 } 195 196 pub async fn register_tx_in( 197 db: &mut PgConnection, 198 tx: &TxIn, 199 subject: &Option<IncomingSubject>, 200 now: &Timestamp, 201 ) -> sqlx::Result<AddIncomingResult> { 202 sqlx::query( 203 " 204 SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new 205 FROM register_tx_in($1, ($2, $3)::taler_amount, $4, $5, $6, $7, $8, $9, $10) 206 ", 207 ) 208 .bind(tx.code as i64) 209 .bind_amount(&tx.amount) 210 .bind(&tx.subject) 211 .bind(tx.debtor.iban()) 212 .bind(&tx.debtor.name) 213 .bind_date(&tx.value_date) 214 .bind(subject.as_ref().map(|it| it.ty())) 215 .bind(subject.as_ref().map(|it| it.key())) 216 .bind_timestamp(now) 217 .try_map(|r: PgRow| { 218 Ok(if r.try_get(0)? 
{ 219 AddIncomingResult::ReservePubReuse 220 } else { 221 AddIncomingResult::Success { 222 row_id: r.try_get_u64(1)?, 223 valued_at: r.try_get_date(2)?, 224 new: r.try_get(3)?, 225 } 226 }) 227 }) 228 .fetch_one(db) 229 .await 230 } 231 232 #[derive(Debug)] 233 pub enum TxOutKind { 234 Simple, 235 Bounce(u32), 236 Talerable(OutgoingSubject), 237 } 238 239 #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)] 240 #[allow(non_camel_case_types)] 241 #[sqlx(type_name = "register_result")] 242 pub enum RegisterResult { 243 /// Already registered 244 idempotent, 245 /// Initiated transaction 246 known, 247 /// Recovered unknown outgoing transaction 248 recovered, 249 } 250 251 #[derive(Debug, PartialEq, Eq)] 252 pub struct AddOutgoingResult { 253 pub result: RegisterResult, 254 pub row_id: u64, 255 } 256 257 pub async fn register_tx_out( 258 db: &mut PgConnection, 259 tx: &TxOut, 260 kind: &TxOutKind, 261 now: &Timestamp, 262 ) -> sqlx::Result<AddOutgoingResult> { 263 let query = sqlx::query( 264 " 265 SELECT out_result, out_tx_row_id 266 FROM register_tx_out($1, ($2, $3)::taler_amount, $4, $5, $6, $7, $8, $9, $10, $11) 267 ", 268 ) 269 .bind(tx.code as i64) 270 .bind_amount(&tx.amount) 271 .bind(&tx.subject) 272 .bind(tx.creditor.iban()) 273 .bind(&tx.creditor.name) 274 .bind_date(&tx.value_date); 275 let query = match kind { 276 TxOutKind::Simple => query 277 .bind(None::<&[u8]>) 278 .bind(None::<&str>) 279 .bind(None::<i64>), 280 TxOutKind::Bounce(bounced) => query 281 .bind(None::<&[u8]>) 282 .bind(None::<&str>) 283 .bind(*bounced as i64), 284 TxOutKind::Talerable(subject) => query 285 .bind(subject.0.as_ref()) 286 .bind(subject.1.as_ref()) 287 .bind(None::<i64>), 288 }; 289 query 290 .bind_timestamp(now) 291 .try_map(|r: PgRow| { 292 Ok(AddOutgoingResult { 293 result: r.try_get(0)?, 294 row_id: r.try_get_u64(1)?, 295 }) 296 }) 297 .fetch_one(db) 298 .await 299 } 300 301 #[derive(Debug, PartialEq, Eq)] 302 pub struct OutFailureResult { 303 pub initiated_id: 
Option<u64>, 304 pub new: bool, 305 } 306 307 pub async fn register_tx_out_failure( 308 db: &mut PgConnection, 309 code: u64, 310 bounced: Option<u32>, 311 now: &Timestamp, 312 ) -> sqlx::Result<OutFailureResult> { 313 sqlx::query( 314 " 315 SELECT out_new, out_initiated_id 316 FROM register_tx_out_failure($1, $2, $3) 317 ", 318 ) 319 .bind(code as i64) 320 .bind(bounced.map(|i| i as i32)) 321 .bind_timestamp(now) 322 .try_map(|r: PgRow| { 323 Ok(OutFailureResult { 324 new: r.try_get(0)?, 325 initiated_id: r.try_get::<Option<i64>, _>(1)?.map(|i| i as u64), 326 }) 327 }) 328 .fetch_one(db) 329 .await 330 } 331 332 #[derive(Debug, PartialEq, Eq)] 333 pub enum TransferResult { 334 Success { id: u64, initiated_at: Timestamp }, 335 RequestUidReuse, 336 WtidReuse, 337 } 338 339 pub async fn make_transfer<'a>( 340 db: impl PgExecutor<'a>, 341 req: &TransferRequest, 342 creditor: &FullHuPayto, 343 now: &Timestamp, 344 ) -> sqlx::Result<TransferResult> { 345 let subject = format!("{} {}", req.wtid, req.exchange_base_url); 346 sqlx::query( 347 " 348 SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at 349 FROM taler_transfer($1, $2, $3, ($4, $5)::taler_amount, $6, $7, $8, $9) 350 ", 351 ) 352 .bind(req.request_uid.as_ref()) 353 .bind(req.wtid.as_ref()) 354 .bind(&subject) 355 .bind_amount(&req.amount) 356 .bind(req.exchange_base_url.as_str()) 357 .bind(creditor.iban()) 358 .bind(&creditor.name) 359 .bind_timestamp(now) 360 .try_map(|r: PgRow| { 361 Ok(if r.try_get(0)? { 362 TransferResult::RequestUidReuse 363 } else if r.try_get(1)? 
{ 364 TransferResult::WtidReuse 365 } else { 366 TransferResult::Success { 367 id: r.try_get_u64(2)?, 368 initiated_at: r.try_get_timestamp(3)?, 369 } 370 }) 371 }) 372 .fetch_one(db) 373 .await 374 } 375 376 #[derive(Debug, PartialEq, Eq)] 377 pub struct BounceResult { 378 pub tx_id: u64, 379 pub tx_new: bool, 380 pub bounce_id: u64, 381 pub bounce_new: bool, 382 } 383 384 pub async fn register_bounce_tx_in( 385 db: &mut PgConnection, 386 tx: &TxIn, 387 amount: &Amount, 388 reason: &str, 389 now: &Timestamp, 390 ) -> sqlx::Result<BounceResult> { 391 sqlx::query( 392 " 393 SELECT out_tx_row_id, out_tx_new, out_bounce_row_id, out_bounce_new 394 FROM register_bounce_tx_in($1, ($2, $3)::taler_amount, $4, $5, $6, $7, ($8, $9)::taler_amount, $10, $11) 395 ", 396 ) 397 .bind(tx.code as i64) 398 .bind_amount(&tx.amount) 399 .bind(&tx.subject) 400 .bind(tx.debtor.iban()) 401 .bind(&tx.debtor.name) 402 .bind_date(&tx.value_date) 403 .bind_amount(amount) 404 .bind(reason) 405 .bind_timestamp(now) 406 .try_map(|r: PgRow| { 407 Ok(BounceResult { 408 tx_id: r.try_get_u64(0)?, 409 tx_new: r.try_get(1)?, 410 bounce_id: r.try_get_u64(2)?, 411 bounce_new: r.try_get(3)?, 412 }) 413 }) 414 .fetch_one(db) 415 .await 416 } 417 418 pub async fn transfer_page<'a>( 419 db: impl PgExecutor<'a>, 420 status: &Option<TransferState>, 421 params: &Page, 422 ) -> sqlx::Result<Vec<TransferListStatus>> { 423 page( 424 db, 425 "initiated_id", 426 params, 427 || { 428 let mut builder = QueryBuilder::new( 429 " 430 SELECT 431 initiated_id, 432 status, 433 (amount).val as amount_val, 434 (amount).frac as amount_frac, 435 credit_account, 436 credit_name, 437 initiated_at 438 FROM transfer 439 JOIN initiated USING (initiated_id) 440 WHERE 441 ", 442 ); 443 if let Some(status) = status { 444 builder.push(" status = ").push_bind(status).push(" AND "); 445 } 446 builder 447 }, 448 |r: PgRow| { 449 Ok(TransferListStatus { 450 row_id: r.try_get_safeu64(0)?, 451 status: r.try_get(1)?, 452 amount: 
r.try_get_amount_i(2, &CURRENCY)?, 453 credit_account: r.try_get_iban(4)?.as_full_payto(r.try_get(5)?), 454 timestamp: r.try_get_timestamp(6)?.into(), 455 }) 456 }, 457 ) 458 .await 459 } 460 461 pub async fn outgoing_history( 462 db: &PgPool, 463 params: &History, 464 listen: impl FnOnce() -> Receiver<i64>, 465 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> { 466 history( 467 db, 468 "tx_out_id", 469 params, 470 listen, 471 || { 472 QueryBuilder::new( 473 " 474 SELECT 475 tx_out_id, 476 (amount).val as amount_val, 477 (amount).frac as amount_frac, 478 credit_account, 479 credit_name, 480 valued_at, 481 exchange_base_url, 482 wtid 483 FROM taler_out 484 JOIN tx_out USING (tx_out_id) 485 WHERE 486 ", 487 ) 488 }, 489 |r: PgRow| { 490 Ok(OutgoingBankTransaction { 491 row_id: r.try_get_safeu64(0)?, 492 amount: r.try_get_amount_i(1, &CURRENCY)?, 493 credit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?), 494 date: r.try_get_timestamp(5)?.into(), 495 exchange_base_url: r.try_get_url(6)?, 496 wtid: r.try_get_base32(7)?, 497 }) 498 }, 499 ) 500 .await 501 } 502 503 pub async fn incoming_history( 504 db: &PgPool, 505 params: &History, 506 listen: impl FnOnce() -> Receiver<i64>, 507 ) -> sqlx::Result<Vec<IncomingBankTransaction>> { 508 history( 509 db, 510 "tx_in_id", 511 params, 512 listen, 513 || { 514 QueryBuilder::new( 515 " 516 SELECT 517 type, 518 tx_in_id, 519 (amount).val as amount_val, 520 (amount).frac as amount_frac, 521 debit_account, 522 debit_name, 523 valued_at, 524 metadata 525 FROM taler_in 526 JOIN tx_in USING (tx_in_id) 527 WHERE 528 ", 529 ) 530 }, 531 |r: PgRow| { 532 Ok(match r.try_get(0)? 
{ 533 IncomingType::reserve => IncomingBankTransaction::Reserve { 534 row_id: r.try_get_safeu64(1)?, 535 amount: r.try_get_amount_i(2, &CURRENCY)?, 536 debit_account: r.try_get_iban(4)?.as_full_payto(r.try_get(5)?), 537 date: r.try_get_timestamp(6)?.into(), 538 reserve_pub: r.try_get_base32(7)?, 539 }, 540 IncomingType::kyc => IncomingBankTransaction::Kyc { 541 row_id: r.try_get_safeu64(1)?, 542 amount: r.try_get_amount_i(2, &CURRENCY)?, 543 debit_account: r.try_get_iban(4)?.as_full_payto(r.try_get(5)?), 544 date: r.try_get_timestamp(6)?.into(), 545 account_pub: r.try_get_base32(7)?, 546 }, 547 IncomingType::wad => { 548 unimplemented!("WAD is not yet supported") 549 } 550 }) 551 }, 552 ) 553 .await 554 } 555 556 pub async fn revenue_history( 557 db: &PgPool, 558 params: &History, 559 listen: impl FnOnce() -> Receiver<i64>, 560 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> { 561 history( 562 db, 563 "tx_in_id", 564 params, 565 listen, 566 || { 567 QueryBuilder::new( 568 " 569 SELECT 570 tx_in_id, 571 valued_at, 572 (amount).val as amount_val, 573 (amount).frac as amount_frac, 574 debit_account, 575 debit_name, 576 subject 577 FROM tx_in 578 WHERE 579 ", 580 ) 581 }, 582 |r: PgRow| { 583 Ok(RevenueIncomingBankTransaction { 584 row_id: r.try_get_safeu64(0)?, 585 date: r.try_get_timestamp(1)?.into(), 586 amount: r.try_get_amount_i(2, &CURRENCY)?, 587 credit_fee: None, 588 debit_account: r.try_get_iban(4)?.as_full_payto(r.try_get(5)?), 589 subject: r.try_get(6)?, 590 }) 591 }, 592 ) 593 .await 594 } 595 596 pub async fn transfer_by_id<'a>( 597 db: impl PgExecutor<'a>, 598 id: u64, 599 ) -> sqlx::Result<Option<TransferStatus>> { 600 sqlx::query( 601 " 602 SELECT 603 status, 604 status_msg, 605 (amount).val as amount_val, 606 (amount).frac as amount_frac, 607 exchange_base_url, 608 wtid, 609 credit_account, 610 credit_name, 611 initiated_at 612 FROM transfer 613 JOIN initiated USING (initiated_id) 614 WHERE initiated_id = $1 615 ", 616 ) 617 .bind(id as i64) 618 
.try_map(|r: PgRow| { 619 Ok(TransferStatus { 620 status: r.try_get(0)?, 621 status_msg: r.try_get(1)?, 622 amount: r.try_get_amount_i(2, &CURRENCY)?, 623 origin_exchange_url: r.try_get(4)?, 624 wtid: r.try_get_base32(5)?, 625 credit_account: r.try_get_iban(6)?.as_full_payto(r.try_get(7)?), 626 timestamp: r.try_get_timestamp(8)?.into(), 627 }) 628 }) 629 .fetch_optional(db) 630 .await 631 } 632 633 /** Get a batch of pending initiated transactions not attempted since [start] */ 634 pub async fn pending_batch<'a>( 635 db: impl PgExecutor<'a>, 636 start: &Timestamp, 637 ) -> sqlx::Result<Vec<Initiated>> { 638 sqlx::query( 639 " 640 SELECT initiated_id, (amount).val, (amount).frac, subject, credit_account, credit_name 641 FROM initiated 642 WHERE magnet_code IS NULL 643 AND status='pending' 644 AND (last_submitted IS NULL OR last_submitted < $1) 645 LIMIT 100 646 ", 647 ) 648 .bind_timestamp(start) 649 .try_map(|r: PgRow| { 650 Ok(Initiated { 651 id: r.try_get_u64(0)?, 652 amount: r.try_get_amount_i(1, &CURRENCY)?, 653 subject: r.try_get(3)?, 654 creditor: FullHuPayto::new(r.try_get_parse(4)?, r.try_get(5)?), 655 }) 656 }) 657 .fetch_all(db) 658 .await 659 } 660 661 /** Get an initiated transaction matching the given magnet [code] */ 662 pub async fn initiated_by_code<'a>( 663 db: impl PgExecutor<'a>, 664 code: u64, 665 ) -> sqlx::Result<Option<Initiated>> { 666 sqlx::query( 667 " 668 SELECT initiated_id, (amount).val, (amount).frac, subject, credit_account, credit_name 669 FROM initiated 670 WHERE magnet_code IS $1 671 ", 672 ) 673 .bind(code as i64) 674 .try_map(|r: PgRow| { 675 Ok(Initiated { 676 id: r.try_get_u64(0)?, 677 amount: r.try_get_amount_i(1, &CURRENCY)?, 678 subject: r.try_get(3)?, 679 creditor: FullHuPayto::new(r.try_get_parse(4)?, r.try_get(5)?), 680 }) 681 }) 682 .fetch_optional(db) 683 .await 684 } 685 686 /** Update status of a successful submitted initiated transaction */ 687 pub async fn initiated_submit_success<'a>( 688 db: impl PgExecutor<'a>, 
689 id: u64, 690 timestamp: &Timestamp, 691 magnet_code: u64, 692 ) -> sqlx::Result<()> { 693 sqlx::query( 694 " 695 UPDATE initiated 696 SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, magnet_code=$2 697 WHERE initiated_id=$3 698 " 699 ).bind_timestamp(timestamp) 700 .bind(magnet_code as i64) 701 .bind(id as i64) 702 .execute(db).await?; 703 Ok(()) 704 } 705 706 /** Update status of a permanently failed initiated transaction */ 707 pub async fn initiated_submit_permanent_failure<'a>( 708 db: impl PgExecutor<'a>, 709 id: u64, 710 timestamp: &Timestamp, 711 msg: &str, 712 ) -> sqlx::Result<()> { 713 sqlx::query( 714 " 715 UPDATE initiated 716 SET status='permanent_failure', status_msg=$2 717 WHERE initiated_id=$3 718 ", 719 ) 720 .bind_timestamp(timestamp) 721 .bind(msg) 722 .bind(id as i64) 723 .execute(db) 724 .await?; 725 Ok(()) 726 } 727 728 /** Check if an initiated transaction exist for a magnet code */ 729 pub async fn initiated_exists_for_code<'a>( 730 db: impl PgExecutor<'a>, 731 code: u64, 732 ) -> sqlx::Result<Option<u64>> { 733 sqlx::query("SELECT initiated_id FROM initiated WHERE magnet_code=$1") 734 .bind(code as i64) 735 .try_map(|r| Ok(r.try_get::<i64, _>(0)? 
as u64)) 736 .fetch_optional(db) 737 .await 738 } 739 740 /** Get JSON value from KV table */ 741 pub async fn kv_get<'a, T: DeserializeOwned + Unpin + Send>( 742 db: impl PgExecutor<'a>, 743 key: &str, 744 ) -> sqlx::Result<Option<T>> { 745 sqlx::query("SELECT value FROM kv WHERE key=$1") 746 .bind(key) 747 .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0)) 748 .fetch_optional(db) 749 .await 750 } 751 752 /** Set JSON value in KV table */ 753 pub async fn kv_set<'a, T: Serialize>( 754 db: impl PgExecutor<'a>, 755 key: &str, 756 value: &T, 757 ) -> sqlx::Result<()> { 758 sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value") 759 .bind(key) 760 .bind(sqlx::types::Json(value)) 761 .execute(db) 762 .await?; 763 Ok(()) 764 } 765 766 #[cfg(test)] 767 mod test { 768 use jiff::{Span, Timestamp, Zoned}; 769 use serde_json::json; 770 use sqlx::{PgConnection, PgPool, postgres::PgRow}; 771 use taler_api::{ 772 db::TypeHelper, 773 subject::{IncomingSubject, OutgoingSubject}, 774 }; 775 use taler_common::{ 776 api_common::{EddsaPublicKey, HashCode, ShortHashCode}, 777 api_params::{History, Page}, 778 api_wire::TransferRequest, 779 types::{amount::amount, payto::payto, url, utils::now_sql_stable_timestamp}, 780 }; 781 use tokio::sync::watch::Receiver; 782 783 use crate::{ 784 constants::{CONFIG_SOURCE, CURRENCY}, 785 db::{ 786 self, AddIncomingResult, AddOutgoingResult, BounceResult, Initiated, OutFailureResult, 787 TransferResult, TxIn, TxOut, TxOutKind, kv_get, kv_set, make_transfer, 788 register_bounce_tx_in, register_tx_in, register_tx_in_admin, register_tx_out, 789 }, 790 magnet_api::types::TxStatus, 791 magnet_payto, 792 }; 793 794 use super::TxInAdmin; 795 796 fn fake_listen<T: Default>() -> Receiver<T> { 797 tokio::sync::watch::channel(T::default()).1 798 } 799 800 async fn setup() -> (PgConnection, PgPool) { 801 let pool = taler_test_utils::db_test_setup(CONFIG_SOURCE).await; 802 let conn = 
pool.acquire().await.unwrap().leak(); 803 (conn, pool) 804 } 805 806 #[tokio::test] 807 async fn kv() { 808 let (mut db, _) = setup().await; 809 810 let value = json!({ 811 "name": "Mr Smith", 812 "no way": 32 813 }); 814 815 assert_eq!( 816 kv_get::<serde_json::Value>(&mut db, "value").await.unwrap(), 817 None 818 ); 819 kv_set(&mut db, "value", &value).await.unwrap(); 820 kv_set(&mut db, "value", &value).await.unwrap(); 821 assert_eq!( 822 kv_get::<serde_json::Value>(&mut db, "value").await.unwrap(), 823 Some(value) 824 ); 825 } 826 827 #[tokio::test] 828 async fn tx_in() { 829 let (mut db, pool) = setup().await; 830 831 async fn routine( 832 db: &mut PgConnection, 833 first: &Option<IncomingSubject>, 834 second: &Option<IncomingSubject>, 835 ) { 836 let (id, code) = 837 sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_in") 838 .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?))) 839 .fetch_one(&mut *db) 840 .await 841 .unwrap(); 842 let now = now_sql_stable_timestamp(); 843 let date = Zoned::now().date(); 844 let later = date.tomorrow().unwrap(); 845 let tx = TxIn { 846 code: code, 847 amount: amount("EUR:10"), 848 subject: "subject".to_owned(), 849 debtor: magnet_payto( 850 "payto://iban/HU30162000031000163100000000?receiver-name=name", 851 ), 852 value_date: date, 853 status: TxStatus::Completed, 854 }; 855 // Insert 856 assert_eq!( 857 register_tx_in(db, &tx, &first, &now) 858 .await 859 .expect("register tx in"), 860 AddIncomingResult::Success { 861 new: true, 862 row_id: id, 863 valued_at: date 864 } 865 ); 866 // Idempotent 867 assert_eq!( 868 register_tx_in( 869 db, 870 &TxIn { 871 value_date: later, 872 ..tx.clone() 873 }, 874 &first, 875 &now 876 ) 877 .await 878 .expect("register tx in"), 879 AddIncomingResult::Success { 880 new: false, 881 row_id: id, 882 valued_at: date 883 } 884 ); 885 // Many 886 assert_eq!( 887 register_tx_in( 888 db, 889 &TxIn { 890 code: code + 1, 891 value_date: later, 892 ..tx 893 }, 894 
&second, 895 &now 896 ) 897 .await 898 .expect("register tx in"), 899 AddIncomingResult::Success { 900 new: true, 901 row_id: id + 1, 902 valued_at: later 903 } 904 ); 905 } 906 907 // Empty db 908 assert_eq!( 909 db::revenue_history(&pool, &History::default(), fake_listen) 910 .await 911 .unwrap(), 912 Vec::new() 913 ); 914 assert_eq!( 915 db::incoming_history(&pool, &History::default(), fake_listen) 916 .await 917 .unwrap(), 918 Vec::new() 919 ); 920 921 // Regular transaction 922 routine(&mut db, &None, &None).await; 923 924 // Reserve transaction 925 routine( 926 &mut db, 927 &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), 928 &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), 929 ) 930 .await; 931 932 // Kyc transaction 933 routine( 934 &mut db, 935 &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), 936 &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), 937 ) 938 .await; 939 940 // History 941 assert_eq!( 942 db::revenue_history(&pool, &History::default(), fake_listen) 943 .await 944 .unwrap() 945 .len(), 946 6 947 ); 948 assert_eq!( 949 db::incoming_history(&pool, &History::default(), fake_listen) 950 .await 951 .unwrap() 952 .len(), 953 4 954 ); 955 } 956 957 #[tokio::test] 958 async fn tx_in_admin() { 959 let (_, pool) = setup().await; 960 961 // Empty db 962 assert_eq!( 963 db::incoming_history(&pool, &History::default(), fake_listen) 964 .await 965 .unwrap(), 966 Vec::new() 967 ); 968 969 let now = now_sql_stable_timestamp(); 970 let later = now + Span::new().hours(2); 971 let date = Zoned::now().date(); 972 let tx = TxInAdmin { 973 amount: amount("EUR:10"), 974 subject: "subject".to_owned(), 975 debtor: magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"), 976 metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), 977 }; 978 // Insert 979 assert_eq!( 980 register_tx_in_admin(&pool, &tx, &now) 981 .await 982 .expect("register tx in"), 983 AddIncomingResult::Success { 984 new: true, 985 row_id: 1, 986 
valued_at: date 987 } 988 ); 989 // Idempotent 990 assert_eq!( 991 register_tx_in_admin(&pool, &tx, &later) 992 .await 993 .expect("register tx in"), 994 AddIncomingResult::Success { 995 new: false, 996 row_id: 1, 997 valued_at: date 998 } 999 ); 1000 // Many 1001 assert_eq!( 1002 register_tx_in_admin( 1003 &pool, 1004 &TxInAdmin { 1005 subject: "Other".to_owned(), 1006 metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), 1007 ..tx.clone() 1008 }, 1009 &later 1010 ) 1011 .await 1012 .expect("register tx in"), 1013 AddIncomingResult::Success { 1014 new: true, 1015 row_id: 2, 1016 valued_at: date 1017 } 1018 ); 1019 1020 // History 1021 assert_eq!( 1022 db::incoming_history(&pool, &History::default(), fake_listen) 1023 .await 1024 .unwrap() 1025 .len(), 1026 2 1027 ); 1028 } 1029 1030 #[tokio::test] 1031 async fn tx_out() { 1032 let (mut db, pool) = setup().await; 1033 1034 async fn routine(db: &mut PgConnection, first: &TxOutKind, second: &TxOutKind) { 1035 let (id, code) = 1036 sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_out") 1037 .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?))) 1038 .fetch_one(&mut *db) 1039 .await 1040 .unwrap(); 1041 let now = now_sql_stable_timestamp(); 1042 let date = Zoned::now().date(); 1043 let later = date.tomorrow().unwrap(); 1044 let tx = TxOut { 1045 code, 1046 amount: amount("HUF:10"), 1047 subject: "subject".to_owned(), 1048 creditor: magnet_payto( 1049 "payto://iban/HU30162000031000163100000000?receiver-name=name", 1050 ), 1051 value_date: date, 1052 status: TxStatus::Completed, 1053 }; 1054 assert!(matches!( 1055 make_transfer( 1056 &mut *db, 1057 &TransferRequest { 1058 request_uid: HashCode::rand(), 1059 amount: amount("HUF:10"), 1060 exchange_base_url: url("https://exchange.test.com/"), 1061 wtid: ShortHashCode::rand(), 1062 credit_account: payto( 1063 "payto://iban/HU02162000031000164800000000?receiver-name=name" 1064 ), 1065 }, 1066 &tx.creditor, 1067 &now 1068 ) 1069 
.await 1070 .unwrap(), 1071 TransferResult::Success { .. } 1072 )); 1073 db::initiated_submit_success(&mut *db, 1, &Timestamp::now(), tx.code) 1074 .await 1075 .expect("status success"); 1076 1077 // Insert 1078 assert_eq!( 1079 register_tx_out(&mut *db, &tx, first, &now) 1080 .await 1081 .expect("register tx out"), 1082 AddOutgoingResult { 1083 result: db::RegisterResult::known, 1084 row_id: id, 1085 } 1086 ); 1087 // Idempotent 1088 assert_eq!( 1089 register_tx_out( 1090 &mut *db, 1091 &TxOut { 1092 value_date: later, 1093 ..tx.clone() 1094 }, 1095 first, 1096 &now 1097 ) 1098 .await 1099 .expect("register tx out"), 1100 AddOutgoingResult { 1101 result: db::RegisterResult::idempotent, 1102 row_id: id, 1103 } 1104 ); 1105 // Recovered 1106 assert_eq!( 1107 register_tx_out( 1108 &mut *db, 1109 &TxOut { 1110 code: code + 1, 1111 value_date: later, 1112 ..tx.clone() 1113 }, 1114 second, 1115 &now 1116 ) 1117 .await 1118 .expect("register tx out"), 1119 AddOutgoingResult { 1120 result: db::RegisterResult::recovered, 1121 row_id: id + 1, 1122 } 1123 ); 1124 } 1125 1126 // Empty db 1127 assert_eq!( 1128 db::outgoing_history(&pool, &History::default(), fake_listen) 1129 .await 1130 .unwrap(), 1131 Vec::new() 1132 ); 1133 1134 // Regular transaction 1135 routine(&mut db, &TxOutKind::Simple, &TxOutKind::Simple).await; 1136 1137 // Talerable transaction 1138 routine( 1139 &mut db, 1140 &TxOutKind::Talerable(OutgoingSubject( 1141 ShortHashCode::rand(), 1142 url("https://exchange.com"), 1143 )), 1144 &TxOutKind::Talerable(OutgoingSubject( 1145 ShortHashCode::rand(), 1146 url("https://exchange.com"), 1147 )), 1148 ) 1149 .await; 1150 1151 // Bounced transaction 1152 routine(&mut db, &TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await; 1153 1154 // History 1155 assert_eq!( 1156 db::outgoing_history(&pool, &History::default(), fake_listen) 1157 .await 1158 .unwrap() 1159 .len(), 1160 2 1161 ); 1162 } 1163 1164 #[tokio::test] 1165 async fn tx_out_failure() { 1166 let (mut db, 
_) = setup().await; 1167 1168 let now = now_sql_stable_timestamp(); 1169 1170 // Unknown 1171 assert_eq!( 1172 db::register_tx_out_failure(&mut db, 42, None, &now) 1173 .await 1174 .unwrap(), 1175 OutFailureResult { 1176 initiated_id: None, 1177 new: false 1178 } 1179 ); 1180 assert_eq!( 1181 db::register_tx_out_failure(&mut db, 42, Some(12), &now) 1182 .await 1183 .unwrap(), 1184 OutFailureResult { 1185 initiated_id: None, 1186 new: false 1187 } 1188 ); 1189 1190 // Initiated 1191 let req = TransferRequest { 1192 request_uid: HashCode::rand(), 1193 amount: amount("HUF:10"), 1194 exchange_base_url: url("https://exchange.test.com/"), 1195 wtid: ShortHashCode::rand(), 1196 credit_account: payto("payto://iban/HU02162000031000164800000000?receiver-name=name"), 1197 }; 1198 let payto = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"); 1199 assert_eq!( 1200 make_transfer(&mut db, &req, &payto, &now).await.unwrap(), 1201 TransferResult::Success { 1202 id: 1, 1203 initiated_at: now 1204 } 1205 ); 1206 db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 34) 1207 .await 1208 .expect("status success"); 1209 assert_eq!( 1210 db::register_tx_out_failure(&mut db, 34, None, &now) 1211 .await 1212 .unwrap(), 1213 OutFailureResult { 1214 initiated_id: Some(1), 1215 new: true 1216 } 1217 ); 1218 assert_eq!( 1219 db::register_tx_out_failure(&mut db, 34, None, &now) 1220 .await 1221 .unwrap(), 1222 OutFailureResult { 1223 initiated_id: Some(1), 1224 new: false 1225 } 1226 ); 1227 1228 // Recovered bounce 1229 let tx = TxIn { 1230 code: 12, 1231 amount: amount("HUF:11"), 1232 subject: "malformed transaction".to_owned(), 1233 debtor: payto, 1234 value_date: Zoned::now().date(), 1235 status: TxStatus::Completed, 1236 }; 1237 assert_eq!( 1238 db::register_bounce_tx_in(&mut db, &tx, &tx.amount, "no reason", &now) 1239 .await 1240 .unwrap(), 1241 BounceResult { 1242 tx_id: 1, 1243 tx_new: true, 1244 bounce_id: 2, 1245 bounce_new: true 1246 } 1247 ); 1248 
assert_eq!( 1249 db::register_tx_out_failure(&mut db, 10, Some(12), &now) 1250 .await 1251 .unwrap(), 1252 OutFailureResult { 1253 initiated_id: Some(2), 1254 new: true 1255 } 1256 ); 1257 assert_eq!( 1258 db::register_tx_out_failure(&mut db, 10, Some(12), &now) 1259 .await 1260 .unwrap(), 1261 OutFailureResult { 1262 initiated_id: Some(2), 1263 new: false 1264 } 1265 ); 1266 } 1267 1268 #[tokio::test] 1269 async fn transfer() { 1270 let (mut db, _) = setup().await; 1271 1272 // Empty db 1273 assert_eq!(db::transfer_by_id(&mut db, 0).await.unwrap(), None); 1274 assert_eq!( 1275 db::transfer_page(&mut db, &None, &Page::default()) 1276 .await 1277 .unwrap(), 1278 Vec::new() 1279 ); 1280 1281 let req = TransferRequest { 1282 request_uid: HashCode::rand(), 1283 amount: amount("HUF:10"), 1284 exchange_base_url: url("https://exchange.test.com/"), 1285 wtid: ShortHashCode::rand(), 1286 credit_account: payto("payto://iban/HU02162000031000164800000000?receiver-name=name"), 1287 }; 1288 let payto = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"); 1289 let now = now_sql_stable_timestamp(); 1290 let later = now + Span::new().hours(2); 1291 // Insert 1292 assert_eq!( 1293 make_transfer(&mut db, &req, &payto, &now) 1294 .await 1295 .expect("transfer"), 1296 TransferResult::Success { 1297 id: 1, 1298 initiated_at: now.into() 1299 } 1300 ); 1301 // Idempotent 1302 assert_eq!( 1303 make_transfer(&mut db, &req, &payto, &later) 1304 .await 1305 .expect("transfer"), 1306 TransferResult::Success { 1307 id: 1, 1308 initiated_at: now.into() 1309 } 1310 ); 1311 // Request UID reuse 1312 assert_eq!( 1313 make_transfer( 1314 &mut db, 1315 &TransferRequest { 1316 wtid: ShortHashCode::rand(), 1317 ..req.clone() 1318 }, 1319 &payto, 1320 &now 1321 ) 1322 .await 1323 .expect("transfer"), 1324 TransferResult::RequestUidReuse 1325 ); 1326 // wtid reuse 1327 assert_eq!( 1328 make_transfer( 1329 &mut db, 1330 &TransferRequest { 1331 request_uid: HashCode::rand(), 1332 
..req.clone()
            },
            &payto,
            &now
        )
        .await
        .expect("transfer"),
        TransferResult::WtidReuse
    );
    // Many
    assert_eq!(
        make_transfer(
            &mut db,
            &TransferRequest {
                request_uid: HashCode::rand(),
                wtid: ShortHashCode::rand(),
                ..req
            },
            &payto,
            &later
        )
        .await
        .expect("transfer"),
        TransferResult::Success {
            id: 2,
            initiated_at: later.into()
        }
    );

    // Get
    assert!(db::transfer_by_id(&mut db, 1).await.unwrap().is_some());
    assert!(db::transfer_by_id(&mut db, 2).await.unwrap().is_some());
    assert!(db::transfer_by_id(&mut db, 3).await.unwrap().is_none());
    assert_eq!(
        db::transfer_page(&mut db, &None, &Page::default())
            .await
            .unwrap()
            .len(),
        2
    );
    }

    /// Checks the results of registering bounces.
    ///
    /// A bounce is registered for a transaction that was never registered
    /// before (code 12) and for one that was (code 13). Each is then repeated
    /// to check that the second call reports nothing new. Finally both bounces
    /// are expected in the pending batch.
    #[tokio::test]
    async fn bounce() {
        let (mut db, _) = setup().await;

        let amount = amount("HUF:10");
        let payto = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name");
        let now = now_sql_stable_timestamp();
        let date = Zoned::now().date();

        // Incoming transaction that is registered before being bounced
        let known = TxIn {
            code: 13,
            amount: amount.clone(),
            subject: "subject".to_owned(),
            debtor: payto.clone(),
            value_date: date,
            status: TxStatus::Completed,
        };
        // Incoming transaction that is only ever submitted through a bounce
        let unseen = TxIn {
            code: 12,
            amount: amount.clone(),
            subject: "subject".to_owned(),
            debtor: payto.clone(),
            value_date: date,
            status: TxStatus::Completed,
        };

        // Empty db
        let pending = db::pending_batch(&mut db, &now).await.unwrap();
        assert!(pending.is_empty());

        // Insert
        let registered = register_tx_in(&mut db, &known, &None, &now)
            .await
            .expect("register tx in");
        assert_eq!(
            registered,
            AddIncomingResult::Success {
                new: true,
                row_id: 1,
                valued_at: date
            }
        );

        // Each step bounces one transaction and states the expected outcome
        let steps = [
            // Bounce
            (
                &unseen,
                BounceResult {
                    tx_id: 2,
                    tx_new: true,
                    bounce_id: 1,
                    bounce_new: true,
                },
            ),
            // Idempotent
            (
                &unseen,
                BounceResult {
                    tx_id: 2,
                    tx_new: false,
                    bounce_id: 1,
                    bounce_new: false,
                },
            ),
            // Bounce registered
            (
                &known,
                BounceResult {
                    tx_id: 1,
                    tx_new: false,
                    bounce_id: 2,
                    bounce_new: true,
                },
            ),
            // Idempotent registered
            (
                &known,
                BounceResult {
                    tx_id: 1,
                    tx_new: false,
                    bounce_id: 2,
                    bounce_new: false,
                },
            ),
        ];
        for (tx, expected) in steps {
            let outcome = register_bounce_tx_in(&mut db, tx, &amount, "good reason", &now)
                .await
                .expect("bounce");
            assert_eq!(outcome, expected);
        }

        // Batch
        let expected = [
            Initiated {
                id: 1,
                amount: amount.clone(),
                subject: "bounce: 12".to_owned(),
                creditor: payto.clone(),
            },
            Initiated {
                id: 2,
                amount,
                subject: "bounce: 13".to_owned(),
                creditor: payto,
            },
        ];
        assert_eq!(db::pending_batch(&mut db, &now).await.unwrap(), &expected);
    }

    /// Checks that reporting a submission failure or success for a transfer
    /// id that was never created does not return an error.
    #[tokio::test]
    async fn status() {
        let (mut db, _) = setup().await;

        // Unknown transfer
        let failure =
            db::initiated_submit_permanent_failure(&mut db, 1, &Timestamp::now(), "msg").await;
        failure.unwrap();
        let success = db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12).await;
        success.unwrap();
    }

    /// Checks the size of the pending batch as transfers are created and
    /// then marked as submitted or permanently failed.
    #[tokio::test]
    async fn batch() {
        let (mut db, _) = setup().await;
        let start = Timestamp::now();
        let magnet_payto =
            magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name");

        // Empty db
        let pendings = db::pending_batch(&mut db, &start)
            .await
            .expect("pending_batch");
        assert_eq!(pendings.len(), 0);

        // Some transfers first, then enough to go over the max of 100 txs in a batch
        for (count, expected) in [(3, 3), (100, 100)] {
            for i in 0..count {
                let request = TransferRequest {
                    request_uid: HashCode::rand(),
                    amount: amount(format!("{}:{}", *CURRENCY, i + 1)),
                    exchange_base_url: url("https://exchange.test.com/"),
                    wtid: ShortHashCode::rand(),
                    credit_account: payto(
                        "payto://iban/HU02162000031000164800000000?receiver-name=name",
                    ),
                };
                make_transfer(&mut db, &request, &magnet_payto, &Timestamp::now())
                    .await
                    .expect("transfer");
            }
            let pendings = db::pending_batch(&mut db, &start)
                .await
                .expect("pending_batch");
            assert_eq!(pendings.len(), expected);
        }

        // Skip uploaded
        // NOTE(review): the range starts at 0 yet the batch only shrinks by 10;
        // presumably no transfer has id 0 - confirm.
        for id in 0..=10 {
            db::initiated_submit_success(&mut db, id, &Timestamp::now(), id)
                .await
                .expect("status success");
        }
        let pendings = db::pending_batch(&mut db, &start)
            .await
            .expect("pending_batch");
        assert_eq!(pendings.len(), 93);

        // Skip failed
        // Id 10 was already reported as submitted by the previous loop.
        for id in 10..=20 {
            db::initiated_submit_permanent_failure(&mut db, id, &Timestamp::now(), "failure")
                .await
                .expect("status failure");
        }
        let pendings = db::pending_batch(&mut db, &start)
            .await
            .expect("pending_batch");
        assert_eq!(pendings.len(), 83);
    }
}