db.rs (46515B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::fmt::Display; 18 19 use compact_str::CompactString; 20 use jiff::Timestamp; 21 use serde::{Serialize, de::DeserializeOwned}; 22 use sqlx::{PgConnection, PgExecutor, PgPool, QueryBuilder, Row, postgres::PgRow}; 23 use taler_api::{ 24 db::{BindHelper, IncomingType, TypeHelper, history, page}, 25 subject::{IncomingSubject, OutgoingSubject}, 26 }; 27 use taler_common::{ 28 api_common::{HashCode, ShortHashCode}, 29 api_params::{History, Page}, 30 api_revenue::RevenueIncomingBankTransaction, 31 api_wire::{ 32 IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferState, 33 TransferStatus, 34 }, 35 config::Config, 36 types::{ 37 amount::{Currency, Decimal}, 38 payto::{PaytoImpl as _, PaytoURI}, 39 }, 40 }; 41 use tokio::sync::watch::{Receiver, Sender}; 42 use url::Url; 43 44 use crate::{ 45 config::parse_db_cfg, 46 payto::{CyclosAccount, CyclosId, FullCyclosPayto}, 47 }; 48 49 const SCHEMA: &str = "cyclos"; 50 51 pub async fn pool(cfg: &Config) -> anyhow::Result<PgPool> { 52 let db = parse_db_cfg(cfg)?; 53 let pool = taler_common::db::pool(db.cfg, SCHEMA).await?; 54 Ok(pool) 55 } 56 57 pub async fn dbinit(cfg: &Config, reset: bool) -> anyhow::Result<PgPool> { 58 let db_cfg = parse_db_cfg(cfg)?; 59 let pool = taler_common::db::pool(db_cfg.cfg, SCHEMA).await?; 60 let mut db = pool.acquire().await?; 61 taler_common::db::dbinit(&mut db, db_cfg.sql_dir.as_ref(), SCHEMA, reset).await?; 62 Ok(pool) 63 } 64 65 pub async fn notification_listener( 66 pool: PgPool, 67 in_channel: Sender<i64>, 68 taler_in_channel: Sender<i64>, 69 out_channel: Sender<i64>, 70 taler_out_channel: Sender<i64>, 71 ) -> sqlx::Result<()> { 72 taler_api::notification::notification_listener!(&pool, 73 "tx_in" => (row_id: i64) { 74 in_channel.send_replace(row_id); 75 }, 76 "taler_in" => (row_id: i64) { 77 taler_in_channel.send_replace(row_id); 78 }, 79 "tx_out" => (row_id: i64) { 80 out_channel.send_replace(row_id); 81 }, 82 "taler_out" => (row_id: i64) { 83 taler_out_channel.send_replace(row_id); 84 } 85 ) 86 } 87 88 #[derive(Debug, Clone)] 89 pub struct TxIn { 90 pub transfer_id: i64, 91 pub tx_id: Option<i64>, 92 pub amount: Decimal, 93 pub subject: String, 94 pub debtor_id: i64, 95 pub debtor_name: String, 96 pub valued_at: Timestamp, 97 } 98 99 impl Display for TxIn { 100 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 101 let Self { 102 transfer_id, 103 tx_id, 104 amount, 105 subject, 106 valued_at, 107 debtor_id, 108 debtor_name, 109 } = self; 110 let tx_id = match tx_id { 111 Some(id) => format_args!(":{}", *id), 112 None => format_args!(""), 113 }; 114 write!( 115 f, 116 "{valued_at} {transfer_id}{tx_id} {amount} ({debtor_id} {debtor_name}) '{subject}'" 117 ) 118 } 119 } 120 121 #[derive(Debug, Clone)] 122 pub struct TxOut { 123 pub transfer_id: i64, 124 pub tx_id: Option<i64>, 125 pub amount: Decimal, 126 pub subject: String, 127 pub creditor_id: i64, 128 pub creditor_name: String, 129 pub valued_at: Timestamp, 130 } 131 132 impl Display for TxOut { 133 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 134 let Self { 135 transfer_id, 136 tx_id, 137 amount, 138 subject, 139 creditor_id, 140 creditor_name, 141 valued_at, 142 } = self; 143 let tx_id = match tx_id { 144 Some(id) => format_args!(":{}", *id), 145 None => format_args!(""), 146 }; 147 write!( 148 f, 149 "{valued_at} {transfer_id}{tx_id} {amount} ({creditor_id} {creditor_name}) '{subject}'" 150 ) 151 } 152 } 153 154 #[derive(Debug, PartialEq, Eq)] 155 pub struct Initiated { 156 pub id: i64, 157 pub amount: Decimal, 158 pub subject: String, 159 pub creditor_id: i64, 160 pub creditor_name: String, 161 } 162 163 impl Display for Initiated { 164 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 165 let Self { 166 id, 167 amount, 168 subject, 169 creditor_id, 170 creditor_name, 171 } = self; 172 write!( 173 f, 174 "{id} {amount} ({creditor_id} {creditor_name}) '{subject}'" 175 ) 176 } 177 } 178 179 #[derive(Debug, Clone)] 180 pub struct TxInAdmin { 181 pub amount: Decimal, 182 pub subject: String, 183 pub debtor_id: i64, 184 pub debtor_name: CompactString, 185 pub metadata: IncomingSubject, 186 } 187 188 #[derive(Debug, PartialEq, Eq)] 189 pub enum AddIncomingResult { 190 Success { 191 new: bool, 192 row_id: u64, 193 valued_at: Timestamp, 194 }, 195 ReservePubReuse, 196 } 197 198 /// Lock the database for worker execution 199 pub async fn worker_lock(e: &mut PgConnection) -> sqlx::Result<bool> { 200 sqlx::query("SELECT pg_try_advisory_lock(42)") 201 .try_map(|r: PgRow| r.try_get(0)) 202 .fetch_one(e) 203 .await 204 } 205 206 pub async fn register_tx_in_admin( 207 db: &PgPool, 208 tx: &TxInAdmin, 209 now: &Timestamp, 210 ) -> sqlx::Result<AddIncomingResult> { 211 sqlx::query( 212 " 213 SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new 214 FROM register_tx_in(NULL, NULL, $1, $2, $3, $4, $5, $6, $7, $5) 215 ", 216 ) 217 .bind(tx.amount) 218 .bind(&tx.subject) 219 .bind(tx.debtor_id) 220 .bind(&tx.debtor_name) 221 .bind_timestamp(now) 222 .bind(tx.metadata.ty()) 223 .bind(tx.metadata.key()) 224 .try_map(|r: PgRow| { 225 Ok(if r.try_get_flag(0)? { 226 AddIncomingResult::ReservePubReuse 227 } else { 228 AddIncomingResult::Success { 229 row_id: r.try_get_u64(1)?, 230 valued_at: r.try_get_timestamp(2)?, 231 new: r.try_get(3)?, 232 } 233 }) 234 }) 235 .fetch_one(db) 236 .await 237 } 238 239 pub async fn register_tx_in( 240 db: &mut PgConnection, 241 tx: &TxIn, 242 subject: &Option<IncomingSubject>, 243 now: &Timestamp, 244 ) -> sqlx::Result<AddIncomingResult> { 245 sqlx::query( 246 " 247 SELECT out_reserve_pub_reuse, out_tx_row_id, out_valued_at, out_new 248 FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) 249 ", 250 ) 251 .bind(tx.transfer_id) 252 .bind(tx.tx_id) 253 .bind(tx.amount) 254 .bind(&tx.subject) 255 .bind(tx.debtor_id) 256 .bind(&tx.debtor_name) 257 .bind(tx.valued_at.as_microsecond()) 258 .bind(subject.as_ref().map(|it| it.ty())) 259 .bind(subject.as_ref().map(|it| it.key())) 260 .bind(now.as_microsecond()) 261 .try_map(|r: PgRow| { 262 Ok(if r.try_get_flag(0)? { 263 AddIncomingResult::ReservePubReuse 264 } else { 265 AddIncomingResult::Success { 266 row_id: r.try_get_u64(1)?, 267 valued_at: r.try_get_timestamp(2)?, 268 new: r.try_get(3)?, 269 } 270 }) 271 }) 272 .fetch_one(db) 273 .await 274 } 275 276 #[derive(Debug)] 277 pub enum TxOutKind { 278 Simple, 279 Bounce(i64), 280 Talerable(OutgoingSubject), 281 } 282 283 #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)] 284 #[allow(non_camel_case_types)] 285 #[sqlx(type_name = "register_result")] 286 pub enum RegisterResult { 287 /// Already registered 288 idempotent, 289 /// Initiated transaction 290 known, 291 /// Recovered unknown outgoing transaction 292 recovered, 293 } 294 295 #[derive(Debug, PartialEq, Eq)] 296 pub struct AddOutgoingResult { 297 pub result: RegisterResult, 298 pub row_id: i64, 299 } 300 301 pub async fn register_tx_out( 302 db: &mut PgConnection, 303 tx: &TxOut, 304 kind: &TxOutKind, 305 now: &Timestamp, 306 ) -> sqlx::Result<AddOutgoingResult> { 307 let query = sqlx::query( 308 " 309 SELECT out_result, out_tx_row_id 310 FROM register_tx_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) 311 ", 312 ) 313 .bind(tx.transfer_id) 314 .bind(tx.tx_id) 315 .bind(tx.amount) 316 .bind(&tx.subject) 317 .bind(tx.creditor_id) 318 .bind(&tx.creditor_name) 319 .bind_timestamp(&tx.valued_at); 320 let query = match kind { 321 TxOutKind::Simple => query 322 .bind(None::<&[u8]>) 323 .bind(None::<&str>) 324 .bind(None::<i64>), 325 TxOutKind::Bounce(bounced) => query.bind(None::<&[u8]>).bind(None::<&str>).bind(*bounced), 326 TxOutKind::Talerable(subject) => query 327 .bind(subject.wtid.as_ref()) 328 .bind(subject.exchange_base_url.as_ref()) 329 .bind(None::<i64>), 330 }; 331 query 332 .bind_timestamp(now) 333 .try_map(|r: PgRow| { 334 Ok(AddOutgoingResult { 335 result: r.try_get(0)?, 336 row_id: r.try_get(1)?, 337 }) 338 }) 339 .fetch_one(db) 340 .await 341 } 342 343 #[derive(Debug, PartialEq, Eq)] 344 pub enum TransferResult { 345 Success { id: u64, initiated_at: Timestamp }, 346 RequestUidReuse, 347 WtidReuse, 348 } 349 350 #[derive(Debug, Clone)] 351 pub struct Transfer { 352 pub request_uid: HashCode, 353 pub amount: Decimal, 354 pub exchange_base_url: Url, 355 pub wtid: ShortHashCode, 356 pub creditor_id: i64, 357 pub creditor_name: CompactString, 358 } 359 360 pub async fn make_transfer<'a>( 361 db: impl PgExecutor<'a>, 362 tx: &Transfer, 363 now: &Timestamp, 364 ) -> sqlx::Result<TransferResult> { 365 let subject = format!("{} {}", tx.wtid, tx.exchange_base_url); 366 sqlx::query( 367 " 368 SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at 369 FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8) 370 ", 371 ) 372 .bind(tx.request_uid.as_ref()) 373 .bind(tx.wtid.as_ref()) 374 .bind(&subject) 375 .bind(tx.amount) 376 .bind(tx.exchange_base_url.as_str()) 377 .bind(tx.creditor_id) 378 .bind(&tx.creditor_name) 379 .bind_timestamp(now) 380 .try_map(|r: PgRow| { 381 Ok(if r.try_get_flag(0)? { 382 TransferResult::RequestUidReuse 383 } else if r.try_get_flag(1)? { 384 TransferResult::WtidReuse 385 } else { 386 TransferResult::Success { 387 id: r.try_get_u64(2)?, 388 initiated_at: r.try_get_timestamp(3)?, 389 } 390 }) 391 }) 392 .fetch_one(db) 393 .await 394 } 395 396 #[derive(Debug, PartialEq, Eq)] 397 pub struct BounceResult { 398 pub tx_id: u64, 399 pub tx_new: bool, 400 } 401 402 pub async fn register_bounced_tx_in( 403 db: &mut PgConnection, 404 tx: &TxIn, 405 chargeback_id: i64, 406 reason: &str, 407 now: &Timestamp, 408 ) -> sqlx::Result<BounceResult> { 409 sqlx::query( 410 " 411 SELECT out_tx_row_id, out_tx_new 412 FROM register_bounced_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) 413 ", 414 ) 415 .bind(tx.transfer_id) 416 .bind(tx.tx_id) 417 .bind(tx.amount) 418 .bind(&tx.subject) 419 .bind(tx.debtor_id) 420 .bind(&tx.debtor_name) 421 .bind_timestamp(&tx.valued_at) 422 .bind(chargeback_id) 423 .bind(reason) 424 .bind_timestamp(now) 425 .try_map(|r: PgRow| { 426 Ok(BounceResult { 427 tx_id: r.try_get_u64(0)?, 428 tx_new: r.try_get(1)?, 429 }) 430 }) 431 .fetch_one(db) 432 .await 433 } 434 435 pub async fn transfer_page<'a>( 436 db: impl PgExecutor<'a>, 437 status: &Option<TransferState>, 438 currency: &Currency, 439 root: &CompactString, 440 params: &Page, 441 ) -> sqlx::Result<Vec<TransferListStatus>> { 442 page( 443 db, 444 "initiated_id", 445 params, 446 || { 447 let mut builder = QueryBuilder::new( 448 " 449 SELECT 450 initiated_id, 451 status, 452 amount, 453 credit_account, 454 credit_name, 455 initiated_at 456 FROM transfer 457 JOIN initiated USING (initiated_id) 458 WHERE 459 ", 460 ); 461 if let Some(status) = status { 462 builder.push(" status = ").push_bind(status).push(" AND "); 463 } 464 builder 465 }, 466 |r: PgRow| { 467 Ok(TransferListStatus { 468 row_id: r.try_get_safeu64(0)?, 469 status: r.try_get(1)?, 470 amount: r.try_get_amount(2, currency)?, 471 credit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?, 472 timestamp: r.try_get_timestamp(5)?.into(), 473 }) 474 }, 475 ) 476 .await 477 } 478 479 pub async fn outgoing_history( 480 db: &PgPool, 481 params: &History, 482 currency: &Currency, 483 root: &CompactString, 484 listen: impl FnOnce() -> Receiver<i64>, 485 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> { 486 history( 487 db, 488 "tx_out_id", 489 params, 490 listen, 491 || { 492 QueryBuilder::new( 493 " 494 SELECT 495 tx_out_id, 496 amount, 497 credit_account, 498 credit_name, 499 valued_at, 500 exchange_base_url, 501 wtid 502 FROM taler_out 503 JOIN tx_out USING (tx_out_id) 504 WHERE 505 ", 506 ) 507 }, 508 |r: PgRow| { 509 Ok(OutgoingBankTransaction { 510 row_id: r.try_get_safeu64(0)?, 511 amount: r.try_get_amount(1, currency)?, 512 debit_fee: None, 513 credit_account: r.try_get_cyclos_fullpaytouri(2, 3, root)?, 514 date: r.try_get_timestamp(4)?.into(), 515 exchange_base_url: r.try_get_url(5)?, 516 wtid: r.try_get(6)?, 517 }) 518 }, 519 ) 520 .await 521 } 522 523 pub async fn incoming_history( 524 db: &PgPool, 525 params: &History, 526 currency: &Currency, 527 root: &CompactString, 528 listen: impl FnOnce() -> Receiver<i64>, 529 ) -> sqlx::Result<Vec<IncomingBankTransaction>> { 530 history( 531 db, 532 "tx_in_id", 533 params, 534 listen, 535 || { 536 QueryBuilder::new( 537 " 538 SELECT 539 type, 540 tx_in_id, 541 amount, 542 debit_account, 543 debit_name, 544 valued_at, 545 metadata 546 FROM taler_in 547 JOIN tx_in USING (tx_in_id) 548 WHERE 549 ", 550 ) 551 }, 552 |r: PgRow| { 553 Ok(match r.try_get(0)? { 554 IncomingType::reserve => IncomingBankTransaction::Reserve { 555 row_id: r.try_get_safeu64(1)?, 556 amount: r.try_get_amount(2, currency)?, 557 credit_fee: None, 558 debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?, 559 date: r.try_get_timestamp(5)?.into(), 560 reserve_pub: r.try_get(6)?, 561 authorization_pub: None, 562 authorization_sig: None, 563 }, 564 IncomingType::kyc => IncomingBankTransaction::Kyc { 565 row_id: r.try_get_safeu64(1)?, 566 amount: r.try_get_amount(2, currency)?, 567 credit_fee: None, 568 debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?, 569 date: r.try_get_timestamp(5)?.into(), 570 account_pub: r.try_get(6)?, 571 authorization_pub: None, 572 authorization_sig: None, 573 }, 574 IncomingType::wad => { 575 unimplemented!("WAD is not yet supported") 576 } 577 }) 578 }, 579 ) 580 .await 581 } 582 583 pub async fn revenue_history( 584 db: &PgPool, 585 params: &History, 586 currency: &Currency, 587 root: &CompactString, 588 listen: impl FnOnce() -> Receiver<i64>, 589 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> { 590 history( 591 db, 592 "tx_in_id", 593 params, 594 listen, 595 || { 596 QueryBuilder::new( 597 " 598 SELECT 599 tx_in_id, 600 valued_at, 601 amount, 602 debit_account, 603 debit_name, 604 subject 605 FROM tx_in 606 WHERE 607 ", 608 ) 609 }, 610 |r: PgRow| { 611 Ok(RevenueIncomingBankTransaction { 612 row_id: r.try_get_safeu64(0)?, 613 date: r.try_get_timestamp(1)?.into(), 614 amount: r.try_get_amount(2, currency)?, 615 credit_fee: None, 616 debit_account: r.try_get_cyclos_fullpaytouri(3, 4, root)?, 617 subject: r.try_get(5)?, 618 }) 619 }, 620 ) 621 .await 622 } 623 624 pub async fn transfer_by_id<'a>( 625 db: impl PgExecutor<'a>, 626 id: u64, 627 currency: &Currency, 628 root: &CompactString, 629 ) -> sqlx::Result<Option<TransferStatus>> { 630 sqlx::query( 631 " 632 SELECT 633 status, 634 status_msg, 635 amount, 636 exchange_base_url, 637 wtid, 638 credit_account, 639 credit_name, 640 initiated_at 641 FROM transfer 642 JOIN initiated USING (initiated_id) 643 WHERE initiated_id = $1 644 ", 645 ) 646 .bind(id as i64) 647 .try_map(|r: PgRow| { 648 Ok(TransferStatus { 649 status: r.try_get(0)?, 650 status_msg: r.try_get(1)?, 651 amount: r.try_get_amount(2, currency)?, 652 origin_exchange_url: r.try_get(3)?, 653 wtid: r.try_get(4)?, 654 credit_account: r.try_get_cyclos_fullpaytouri(5, 6, root)?, 655 timestamp: r.try_get_timestamp(7)?.into(), 656 }) 657 }) 658 .fetch_optional(db) 659 .await 660 } 661 662 /** Get a batch of pending initiated transactions not attempted since [start] */ 663 pub async fn pending_batch<'a>( 664 db: impl PgExecutor<'a>, 665 start: &Timestamp, 666 ) -> sqlx::Result<Vec<Initiated>> { 667 sqlx::query( 668 " 669 SELECT initiated_id, amount, subject, credit_account, credit_name 670 FROM initiated 671 WHERE tx_id IS NULL 672 AND status='pending' 673 AND (last_submitted IS NULL OR last_submitted < $1) 674 LIMIT 100 675 ", 676 ) 677 .bind_timestamp(start) 678 .try_map(|r: PgRow| { 679 Ok(Initiated { 680 id: r.try_get(0)?, 681 amount: r.try_get(1)?, 682 subject: r.try_get(2)?, 683 creditor_id: r.try_get(3)?, 684 creditor_name: r.try_get(4)?, 685 }) 686 }) 687 .fetch_all(db) 688 .await 689 } 690 691 /** Update status of a successful submitted initiated transaction */ 692 pub async fn initiated_submit_success<'a>( 693 db: impl PgExecutor<'a>, 694 initiated_id: i64, 695 timestamp: &Timestamp, 696 tx_id: i64, 697 ) -> sqlx::Result<()> { 698 sqlx::query( 699 " 700 UPDATE initiated 701 SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, tx_id=$2 702 WHERE initiated_id=$3 703 " 704 ).bind_timestamp(timestamp) 705 .bind(tx_id) 706 .bind(initiated_id) 707 .execute(db).await?; 708 Ok(()) 709 } 710 711 /** Update status of a permanently failed initiated transaction */ 712 pub async fn initiated_submit_permanent_failure<'a>( 713 db: impl PgExecutor<'a>, 714 initiated_id: i64, 715 msg: &str, 716 ) -> sqlx::Result<()> { 717 sqlx::query( 718 " 719 UPDATE initiated 720 SET status='permanent_failure', status_msg=$1 721 WHERE initiated_id=$2 722 ", 723 ) 724 .bind(msg) 725 .bind(initiated_id) 726 .execute(db) 727 .await?; 728 Ok(()) 729 } 730 731 #[derive(Debug, Clone, Copy, PartialEq, Eq)] 732 pub enum ChargebackFailureResult { 733 Unknown, 734 Known(u64), 735 Idempotent(u64), 736 } 737 738 /** Update status of a charged back initiated transaction */ 739 pub async fn initiated_chargeback_failure( 740 db: &mut PgConnection, 741 transfer_id: i64, 742 ) -> sqlx::Result<ChargebackFailureResult> { 743 Ok( 744 sqlx::query("SELECT out_initiated_id, out_new FROM register_charge_back_failure($1)") 745 .bind(transfer_id) 746 .try_map(|r: PgRow| { 747 let id = r.try_get_u64(0)?; 748 Ok(if id == 0 { 749 ChargebackFailureResult::Unknown 750 } else if r.try_get(1)? { 751 ChargebackFailureResult::Known(id) 752 } else { 753 ChargebackFailureResult::Idempotent(id) 754 }) 755 }) 756 .fetch_optional(db) 757 .await? 758 .unwrap_or(ChargebackFailureResult::Unknown), 759 ) 760 } 761 762 /** Get JSON value from KV table */ 763 pub async fn kv_get<'a, T: DeserializeOwned + Unpin + Send>( 764 db: impl PgExecutor<'a>, 765 key: &str, 766 ) -> sqlx::Result<Option<T>> { 767 sqlx::query("SELECT value FROM kv WHERE key=$1") 768 .bind(key) 769 .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0)) 770 .fetch_optional(db) 771 .await 772 } 773 774 /** Set JSON value in KV table */ 775 pub async fn kv_set<'a, T: Serialize>( 776 db: impl PgExecutor<'a>, 777 key: &str, 778 value: &T, 779 ) -> sqlx::Result<()> { 780 sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value") 781 .bind(key) 782 .bind(sqlx::types::Json(value)) 783 .execute(db) 784 .await?; 785 Ok(()) 786 } 787 788 pub trait CyclosTypeHelper { 789 fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>( 790 &self, 791 idx: I, 792 name: I, 793 root: &CompactString, 794 ) -> sqlx::Result<FullCyclosPayto>; 795 fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>( 796 &self, 797 idx: I, 798 name: I, 799 root: &CompactString, 800 ) -> sqlx::Result<PaytoURI>; 801 } 802 803 impl CyclosTypeHelper for PgRow { 804 fn try_get_cyclos_fullpayto<I: sqlx::ColumnIndex<Self>>( 805 &self, 806 idx: I, 807 name: I, 808 root: &CompactString, 809 ) -> sqlx::Result<FullCyclosPayto> { 810 let idx = self.try_get(idx)?; 811 let name = self.try_get(name)?; 812 Ok(FullCyclosPayto::new( 813 CyclosAccount { 814 id: CyclosId(idx), 815 root: root.clone(), 816 }, 817 name, 818 )) 819 } 820 fn try_get_cyclos_fullpaytouri<I: sqlx::ColumnIndex<Self>>( 821 &self, 822 idx: I, 823 name: I, 824 root: &CompactString, 825 ) -> sqlx::Result<PaytoURI> { 826 let idx = self.try_get(idx)?; 827 let name = self.try_get(name)?; 828 Ok(CyclosAccount { 829 id: CyclosId(idx), 830 root: root.clone(), 831 } 832 .as_full_payto(name)) 833 } 834 } 835 836 #[cfg(test)] 837 mod test { 838 use std::sync::LazyLock; 839 840 use compact_str::CompactString; 841 use jiff::{Span, Timestamp}; 842 use serde_json::json; 843 use sqlx::{PgConnection, PgPool, Row as _, postgres::PgRow}; 844 use taler_api::{ 845 db::TypeHelper, 846 notification::dummy_listen, 847 subject::{IncomingSubject, OutgoingSubject}, 848 }; 849 use taler_common::{ 850 api_common::{EddsaPublicKey, HashCode, ShortHashCode}, 851 api_params::{History, Page}, 852 api_wire::TransferState, 853 types::{ 854 amount::{Currency, decimal}, 855 url, 856 utils::now_sql_stable_timestamp, 857 }, 858 }; 859 860 use crate::{ 861 constants::CONFIG_SOURCE, 862 db::{ 863 self, AddIncomingResult, AddOutgoingResult, BounceResult, ChargebackFailureResult, 864 Transfer, TransferResult, TxIn, TxInAdmin, TxOut, TxOutKind, 865 }, 866 }; 867 868 pub static CURRENCY: LazyLock<Currency> = LazyLock::new(|| "TEST".parse().unwrap()); 869 pub const ROOT: CompactString = CompactString::const_new("localhost"); 870 871 async fn setup() -> (PgConnection, PgPool) { 872 let pool = taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await; 873 let conn = pool.acquire().await.unwrap().leak(); 874 (conn, pool) 875 } 876 877 #[tokio::test] 878 async fn kv() { 879 let (mut db, _) = setup().await; 880 881 let value = json!({ 882 "name": "Mr Smith", 883 "no way": 32 884 }); 885 886 assert_eq!( 887 db::kv_get::<serde_json::Value>(&mut db, "value") 888 .await 889 .unwrap(), 890 None 891 ); 892 db::kv_set(&mut db, "value", &value).await.unwrap(); 893 db::kv_set(&mut db, "value", &value).await.unwrap(); 894 assert_eq!( 895 db::kv_get::<serde_json::Value>(&mut db, "value") 896 .await 897 .unwrap(), 898 Some(value) 899 ); 900 } 901 902 #[tokio::test] 903 async fn tx_in() { 904 let (mut db, pool) = setup().await; 905 906 async fn routine( 907 db: &mut PgConnection, 908 first: &Option<IncomingSubject>, 909 second: &Option<IncomingSubject>, 910 ) { 911 let id = sqlx::query("SELECT count(*) + 1 FROM tx_in") 912 .try_map(|r: PgRow| r.try_get_u64(0)) 913 .fetch_one(&mut *db) 914 .await 915 .unwrap(); 916 let now = now_sql_stable_timestamp(); 917 let later = now + Span::new().hours(2); 918 let tx = TxIn { 919 transfer_id: now.as_microsecond() as i64, 920 tx_id: None, 921 amount: decimal("10"), 922 subject: "subject".to_owned(), 923 debtor_id: 31000163100000000, 924 debtor_name: "Name".to_string(), 925 valued_at: now, 926 }; 927 // Insert 928 assert_eq!( 929 db::register_tx_in(db, &tx, &first, &now) 930 .await 931 .expect("register tx in"), 932 AddIncomingResult::Success { 933 new: true, 934 row_id: id, 935 valued_at: now 936 } 937 ); 938 // Idempotent 939 assert_eq!( 940 db::register_tx_in( 941 db, 942 &TxIn { 943 valued_at: later, 944 ..tx.clone() 945 }, 946 &first, 947 &now 948 ) 949 .await 950 .expect("register tx in"), 951 AddIncomingResult::Success { 952 new: false, 953 row_id: id, 954 valued_at: now 955 } 956 ); 957 // Many 958 assert_eq!( 959 db::register_tx_in( 960 db, 961 &TxIn { 962 transfer_id: later.as_microsecond() as i64, 963 valued_at: later, 964 ..tx 965 }, 966 &second, 967 &now 968 ) 969 .await 970 .expect("register tx in"), 971 AddIncomingResult::Success { 972 new: true, 973 row_id: id + 1, 974 valued_at: later 975 } 976 ); 977 } 978 979 // Empty db 980 assert_eq!( 981 db::revenue_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) 982 .await 983 .unwrap(), 984 Vec::new() 985 ); 986 assert_eq!( 987 db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) 988 .await 989 .unwrap(), 990 Vec::new() 991 ); 992 993 // Regular transaction 994 routine(&mut db, &None, &None).await; 995 996 // Reserve transaction 997 routine( 998 &mut db, 999 &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), 1000 &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), 1001 ) 1002 .await; 1003 1004 // Kyc transaction 1005 routine( 1006 &mut db, 1007 &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), 1008 &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), 1009 ) 1010 .await; 1011 1012 // History 1013 assert_eq!( 1014 db::revenue_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) 1015 .await 1016 .unwrap() 1017 .len(), 1018 6 1019 ); 1020 assert_eq!( 1021 db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) 1022 .await 1023 .unwrap() 1024 .len(), 1025 4 1026 ); 1027 } 1028 1029 #[tokio::test] 1030 async fn tx_in_admin() { 1031 let (_, pool) = setup().await; 1032 1033 // Empty db 1034 assert_eq!( 1035 db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) 1036 .await 1037 .unwrap(), 1038 Vec::new() 1039 ); 1040 1041 let now = now_sql_stable_timestamp(); 1042 let later = now + Span::new().hours(2); 1043 let tx = TxInAdmin { 1044 amount: decimal("10"), 1045 subject: "subject".to_owned(), 1046 debtor_id: 31000163100000000, 1047 debtor_name: "Name".into(), 1048 metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), 1049 }; 1050 // Insert 1051 assert_eq!( 1052 db::register_tx_in_admin(&pool, &tx, &now) 1053 .await 1054 .expect("register tx in"), 1055 AddIncomingResult::Success { 1056 new: true, 1057 row_id: 1, 1058 valued_at: now 1059 } 1060 ); 1061 // Idempotent 1062 assert_eq!( 1063 db::register_tx_in_admin(&pool, &tx, &later) 1064 .await 1065 .expect("register tx in"), 1066 AddIncomingResult::Success { 1067 new: false, 1068 row_id: 1, 1069 valued_at: now 1070 } 1071 ); 1072 // Many 1073 assert_eq!( 1074 db::register_tx_in_admin( 1075 &pool, 1076 &TxInAdmin { 1077 subject: "Other".to_owned(), 1078 metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), 1079 ..tx.clone() 1080 }, 1081 &later 1082 ) 1083 .await 1084 .expect("register tx in"), 1085 AddIncomingResult::Success { 1086 new: true, 1087 row_id: 2, 1088 valued_at: later 1089 } 1090 ); 1091 1092 // History 1093 assert_eq!( 1094 db::incoming_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) 1095 .await 1096 .unwrap() 1097 .len(), 1098 2 1099 ); 1100 } 1101 1102 #[tokio::test] 1103 async fn tx_out() { 1104 let (mut db, pool) = setup().await; 1105 1106 async fn routine(db: &mut PgConnection, first: &TxOutKind, second: &TxOutKind) { 1107 let transfer_id = sqlx::query("SELECT count(*) + 1 FROM tx_out") 1108 .try_map(|r: PgRow| r.try_get(0)) 1109 .fetch_one(&mut *db) 1110 .await 1111 .unwrap(); 1112 let now = now_sql_stable_timestamp(); 1113 let later = now + Span::new().hours(2); 1114 let tx = TxOut { 1115 transfer_id, 1116 tx_id: Some(transfer_id), 1117 amount: decimal("10"), 1118 subject: "subject".to_owned(), 1119 creditor_id: 31000163100000000, 1120 creditor_name: "Name".to_string(), 1121 valued_at: now, 1122 }; 1123 assert!(matches!( 1124 db::make_transfer( 1125 &mut *db, 1126 &Transfer { 1127 request_uid: HashCode::rand(), 1128 amount: decimal("10"), 1129 exchange_base_url: url("https://exchange.test.com/"), 1130 wtid: ShortHashCode::rand(), 1131 creditor_id: 31000163100000000, 1132 creditor_name: "Name".into() 1133 }, 1134 &now 1135 ) 1136 .await 1137 .unwrap(), 1138 TransferResult::Success { .. } 1139 )); 1140 db::initiated_submit_success(&mut *db, 1, &Timestamp::now(), transfer_id) 1141 .await 1142 .expect("status success"); 1143 1144 // Insert 1145 assert_eq!( 1146 db::register_tx_out(&mut *db, &tx, first, &now) 1147 .await 1148 .expect("register tx out"), 1149 AddOutgoingResult { 1150 result: db::RegisterResult::known, 1151 row_id: transfer_id, 1152 } 1153 ); 1154 // Idempotent 1155 assert_eq!( 1156 db::register_tx_out( 1157 &mut *db, 1158 &TxOut { 1159 valued_at: later, 1160 ..tx.clone() 1161 }, 1162 first, 1163 &now 1164 ) 1165 .await 1166 .expect("register tx out"), 1167 AddOutgoingResult { 1168 result: db::RegisterResult::idempotent, 1169 row_id: transfer_id, 1170 } 1171 ); 1172 // Recovered 1173 assert_eq!( 1174 db::register_tx_out( 1175 &mut *db, 1176 &TxOut { 1177 transfer_id: transfer_id + 1, 1178 tx_id: Some(transfer_id + 1), 1179 valued_at: later, 1180 ..tx.clone() 1181 }, 1182 second, 1183 &now 1184 ) 1185 .await 1186 .expect("register tx out"), 1187 AddOutgoingResult { 1188 result: db::RegisterResult::recovered, 1189 row_id: transfer_id + 1, 1190 } 1191 ); 1192 } 1193 1194 // Empty db 1195 assert_eq!( 1196 db::outgoing_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) 1197 .await 1198 .unwrap(), 1199 Vec::new() 1200 ); 1201 1202 // Regular transaction 1203 routine(&mut db, &TxOutKind::Simple, &TxOutKind::Simple).await; 1204 1205 // Talerable transaction 1206 routine( 1207 &mut db, 1208 &TxOutKind::Talerable(OutgoingSubject::rand()), 1209 &TxOutKind::Talerable(OutgoingSubject::rand()), 1210 ) 1211 .await; 1212 1213 // Bounced transaction 1214 routine(&mut db, &TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await; 1215 1216 // History 1217 assert_eq!( 1218 db::outgoing_history(&pool, &History::default(), &CURRENCY, &ROOT, dummy_listen) 1219 .await 1220 .unwrap() 1221 .len(), 1222 2 1223 ); 1224 } 1225 1226 // TODO tx out failure 1227 1228 #[tokio::test] 1229 async fn transfer() { 1230 let (mut db, _) = setup().await; 1231 1232 // Empty db 1233 assert_eq!( 1234 db::transfer_by_id(&mut db, 0, &CURRENCY, &ROOT) 1235 .await 1236 .unwrap(), 1237 None 1238 ); 1239 assert_eq!( 1240 db::transfer_page(&mut db, &None, &CURRENCY, &ROOT, &Page::default()) 1241 .await 1242 .unwrap(), 1243 Vec::new() 1244 ); 1245 1246 let req = Transfer { 1247 request_uid: HashCode::rand(), 1248 amount: decimal("10"), 1249 exchange_base_url: url("https://exchange.test.com/"), 1250 wtid: ShortHashCode::rand(), 1251 creditor_id: 31000163100000000, 1252 creditor_name: "Name".into(), 1253 }; 1254 let now = now_sql_stable_timestamp(); 1255 let later = now + Span::new().hours(2); 1256 // Insert 1257 assert_eq!( 1258 db::make_transfer(&mut db, &req, &now) 1259 .await 1260 .expect("transfer"), 1261 TransferResult::Success { 1262 id: 1, 1263 initiated_at: now 1264 } 1265 ); 1266 // Idempotent 1267 assert_eq!( 1268 db::make_transfer(&mut db, &req, &later) 1269 .await 1270 .expect("transfer"), 1271 TransferResult::Success { 1272 id: 1, 1273 initiated_at: now 1274 } 1275 ); 1276 // Request UID reuse 1277 assert_eq!( 1278 db::make_transfer( 1279 &mut db, 1280 &Transfer { 1281 wtid: ShortHashCode::rand(), 1282 ..req.clone() 1283 }, 1284 &now 1285 ) 1286 .await 1287 .expect("transfer"), 1288 TransferResult::RequestUidReuse 1289 ); 1290 // wtid reuse 1291 assert_eq!( 1292 db::make_transfer( 1293 &mut db, 1294 &Transfer { 1295 request_uid: HashCode::rand(), 1296 ..req.clone() 1297 }, 1298 &now 1299 ) 1300 .await 1301 .expect("transfer"), 1302 TransferResult::WtidReuse 1303 ); 1304 // Many 1305 assert_eq!( 1306 db::make_transfer( 1307 &mut db, 1308 &Transfer { 1309 request_uid: HashCode::rand(), 1310 wtid: ShortHashCode::rand(), 1311 ..req 1312 }, 1313 &later 1314 ) 1315 .await 1316 .expect("transfer"), 1317 TransferResult::Success { 1318 id: 2, 1319 initiated_at: later.into() 1320 } 1321 ); 1322 1323 // Get 1324 assert!( 1325 db::transfer_by_id(&mut db, 1, &CURRENCY, &ROOT) 1326 .await 1327 .unwrap() 1328 .is_some() 1329 ); 1330 assert!( 1331 db::transfer_by_id(&mut db, 2, &CURRENCY, &ROOT) 1332 .await 1333 .unwrap() 1334 .is_some() 1335 ); 1336 assert!( 1337 db::transfer_by_id(&mut db, 3, &CURRENCY, &ROOT) 1338 .await 1339 .unwrap() 1340 .is_none() 1341 ); 1342 assert_eq!( 1343 db::transfer_page(&mut db, &None, &CURRENCY, &ROOT, &Page::default()) 1344 .await 1345 .unwrap() 1346 .len(), 1347 2 1348 ); 1349 } 1350 1351 #[tokio::test] 1352 async fn bounce() { 1353 let (mut db, _) = setup().await; 1354 1355 let amount = decimal("10"); 1356 let now = now_sql_stable_timestamp(); 1357 1358 // Bounce 1359 assert_eq!( 1360 db::register_bounced_tx_in( 1361 &mut db, 1362 &TxIn { 1363 transfer_id: 12, 1364 tx_id: None, 1365 amount, 1366 subject: "subject".to_owned(), 1367 debtor_id: 31000163100000000, 1368 debtor_name: "Name".to_string(), 1369 valued_at: now 1370 }, 1371 22, 1372 "good reason", 1373 &now 1374 ) 1375 .await 1376 .expect("bounce"), 1377 BounceResult { 1378 tx_id: 1, 1379 tx_new: true 1380 } 1381 ); 1382 // Idempotent 1383 assert_eq!( 1384 db::register_bounced_tx_in( 1385 &mut db, 1386 &TxIn { 1387 transfer_id: 12, 1388 tx_id: None, 1389 amount: amount.clone(), 1390 subject: "subject".to_owned(), 1391 debtor_id: 31000163100000000, 1392 debtor_name: "Name".to_string(), 1393 valued_at: now 1394 }, 1395 22, 1396 "good reason", 1397 &now 1398 ) 1399 .await 1400 .expect("bounce"), 1401 BounceResult { 1402 tx_id: 1, 1403 tx_new: false 1404 } 1405 ); 1406 1407 // Many 1408 assert_eq!( 1409 db::register_bounced_tx_in( 1410 &mut db, 1411 &TxIn { 1412 transfer_id: 13, 1413 tx_id: None, 1414 amount: amount.clone(), 1415 subject: "subject".to_owned(), 1416 debtor_id: 31000163100000000, 1417 debtor_name: "Name".to_string(), 1418 valued_at: now 1419 }, 1420 23, 1421 "good reason", 1422 &now 1423 ) 1424 .await 1425 .expect("bounce"), 1426 BounceResult { 1427 tx_id: 2, 1428 tx_new: true 1429 } 1430 ); 1431 } 1432 1433 #[tokio::test] 1434 async fn status() { 1435 let (mut db, _) = setup().await; 1436 1437 async fn check_status( 1438 db: &mut PgConnection, 1439 id: u64, 1440 status: TransferState, 1441 msg: Option<&str>, 1442 ) { 1443 let transfer = db::transfer_by_id(db, id, &CURRENCY, &ROOT) 1444 .await 1445 .unwrap() 1446 .unwrap(); 1447 assert_eq!( 1448 (status, msg), 1449 (transfer.status, transfer.status_msg.as_deref()) 1450 ); 1451 } 1452 1453 // Unknown transfer 1454 db::initiated_submit_permanent_failure(&mut db, 1, "msg") 1455 .await 1456 .unwrap(); 1457 db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12) 1458 .await 1459 .unwrap(); 1460 assert_eq!( 1461 db::initiated_chargeback_failure(&mut db, 1).await.unwrap(), 1462 ChargebackFailureResult::Unknown 1463 ); 1464 1465 // Failure 1466 db::make_transfer( 1467 &mut db, 1468 &Transfer { 1469 request_uid: HashCode::rand(), 1470 amount: decimal("1"), 1471 exchange_base_url: url("https://exchange.test.com/"), 1472 wtid: ShortHashCode::rand(), 1473 creditor_id: 31000163100000000, 1474 creditor_name: "Name".into(), 1475 }, 1476 &Timestamp::now(), 1477 ) 1478 .await 1479 .expect("transfer"); 1480 check_status(&mut db, 1, TransferState::pending, None).await; 1481 db::initiated_submit_permanent_failure(&mut db, 1, "error status") 1482 .await 1483 .unwrap(); 1484 check_status( 1485 &mut db, 1486 1, 1487 TransferState::permanent_failure, 1488 Some("error status"), 1489 ) 1490 .await; 1491 1492 // Success 1493 db::make_transfer( 1494 &mut db, 1495 &Transfer { 1496 request_uid: HashCode::rand(), 1497 amount: decimal("1"), 1498 exchange_base_url: url("https://exchange.test.com/"), 1499 wtid: ShortHashCode::rand(), 1500 creditor_id: 31000163100000000, 1501 creditor_name: "Name".into(), 1502 }, 1503 &Timestamp::now(), 1504 ) 1505 .await 1506 .expect("transfer"); 1507 check_status(&mut db, 2, TransferState::pending, None).await; 1508 db::initiated_submit_success(&mut db, 2, &Timestamp::now(), 3) 1509 .await 1510 .unwrap(); 1511 check_status(&mut db, 2, TransferState::pending, None).await; 1512 db::register_tx_out( 1513 &mut db, 1514 &TxOut { 1515 transfer_id: 5, 1516 tx_id: Some(3), 1517 amount: decimal("2"), 1518 subject: "".to_string(), 1519 creditor_id: 31000163100000000, 1520 creditor_name: "Name".to_string(), 1521 valued_at: Timestamp::now(), 1522 }, 1523 &TxOutKind::Simple, 1524 &Timestamp::now(), 1525 ) 1526 .await 1527 .unwrap(); 1528 check_status(&mut db, 2, TransferState::success, None).await; 1529 1530 // Chargeback 1531 assert_eq!( 1532 db::initiated_chargeback_failure(&mut db, 5).await.unwrap(), 1533 ChargebackFailureResult::Known(2) 1534 ); 1535 check_status( 1536 &mut db, 1537 2, 1538 TransferState::late_failure, 1539 Some("charged back"), 1540 ) 1541 .await; 1542 assert_eq!( 1543 db::initiated_chargeback_failure(&mut db, 5).await.unwrap(), 1544 ChargebackFailureResult::Idempotent(2) 1545 ); 1546 } 1547 1548 #[tokio::test] 1549 async fn batch() { 1550 let (mut db, _) = setup().await; 1551 let start = Timestamp::now(); 1552 1553 // Empty db 1554 let pendings = db::pending_batch(&mut db, &start) 1555 .await 1556 .expect("pending_batch"); 1557 assert_eq!(pendings.len(), 0); 1558 1559 // Some transfers 1560 for i in 0..3 { 1561 db::make_transfer( 1562 &mut db, 1563 &Transfer { 1564 request_uid: HashCode::rand(), 1565 amount: decimal(format!("{}", i + 1)), 1566 exchange_base_url: url("https://exchange.test.com/"), 1567 wtid: ShortHashCode::rand(), 1568 creditor_id: 31000163100000000, 1569 creditor_name: "Name".into(), 1570 }, 1571 &Timestamp::now(), 1572 ) 1573 .await 1574 .expect("transfer"); 1575 } 1576 let pendings = db::pending_batch(&mut db, &start) 1577 .await 1578 .expect("pending_batch"); 1579 assert_eq!(pendings.len(), 3); 1580 1581 // Max 100 txs in batch 1582 for i in 0..100 { 1583 db::make_transfer( 1584 &mut db, 1585 &Transfer { 1586 request_uid: HashCode::rand(), 1587 amount: decimal(format!("{}", i + 1)), 1588 exchange_base_url: url("https://exchange.test.com/"), 1589 wtid: ShortHashCode::rand(), 1590 creditor_id: 31000163100000000, 1591 creditor_name: "Name".into(), 1592 }, 1593 &Timestamp::now(), 1594 ) 1595 .await 1596 .expect("transfer"); 1597 } 1598 let pendings = db::pending_batch(&mut db, &start) 1599 .await 1600 .expect("pending_batch"); 1601 assert_eq!(pendings.len(), 100); 1602 1603 // Skip uploaded 1604 for i in 0..=10 { 1605 db::initiated_submit_success(&mut db, i, &Timestamp::now(), i) 1606 .await 1607 .expect("status success"); 1608 } 1609 let pendings = db::pending_batch(&mut db, &start) 1610 .await 1611 .expect("pending_batch"); 1612 assert_eq!(pendings.len(), 93); 1613 1614 // Skip failed 1615 for i in 0..=10 { 1616 db::initiated_submit_permanent_failure(&mut db, 10 + i, "failure") 1617 .await 1618 .expect("status failure"); 1619 } 1620 let pendings = db::pending_batch(&mut db, &start) 1621 .await 1622 .expect("pending_batch"); 1623 assert_eq!(pendings.len(), 83); 1624 } 1625 }