db.rs (50372B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::fmt::Display; 18 19 use compact_str::CompactString; 20 use jiff::{Timestamp, civil::Date, tz::TimeZone}; 21 use serde::{Serialize, de::DeserializeOwned}; 22 use sqlx::{PgConnection, PgPool, QueryBuilder, Row, postgres::PgRow}; 23 use taler_api::{ 24 db::{BindHelper, TypeHelper, history, page}, 25 serialized, 26 subject::{IncomingSubject, OutgoingSubject, fmt_out_subject}, 27 }; 28 use taler_common::{ 29 api_common::{HashCode, ShortHashCode}, 30 api_params::{History, Page}, 31 api_revenue::RevenueIncomingBankTransaction, 32 api_transfer::{RegistrationRequest, Unregistration}, 33 api_wire::{ 34 IncomingBankTransaction, OutgoingBankTransaction, TransferListStatus, TransferState, 35 TransferStatus, 36 }, 37 config::Config, 38 db::IncomingType, 39 types::{ 40 amount::{Amount, Decimal}, 41 payto::PaytoImpl as _, 42 }, 43 }; 44 use tokio::sync::watch::{Receiver, Sender}; 45 use url::Url; 46 47 use crate::{FullHuPayto, config::parse_db_cfg, constants::CURRENCY, magnet_api::types::TxStatus}; 48 49 const SCHEMA: &str = "magnet_bank"; 50 51 pub async fn pool(cfg: &Config) -> anyhow::Result<PgPool> { 52 let db = parse_db_cfg(cfg)?; 53 let pool = taler_common::db::pool(db.cfg, SCHEMA).await?; 54 Ok(pool) 55 } 56 57 pub async fn dbinit(cfg: &Config, reset: bool) -> anyhow::Result<PgPool> { 58 let db_cfg = parse_db_cfg(cfg)?; 59 let pool = taler_common::db::pool(db_cfg.cfg, SCHEMA).await?; 60 let mut db = pool.acquire().await?; 61 taler_common::db::dbinit(&mut db, db_cfg.sql_dir.as_ref(), "magnet-bank", reset).await?; 62 Ok(pool) 63 } 64 65 pub async fn notification_listener( 66 pool: PgPool, 67 in_channel: Sender<i64>, 68 taler_in_channel: Sender<i64>, 69 out_channel: Sender<i64>, 70 taler_out_channel: Sender<i64>, 71 ) -> sqlx::Result<()> { 72 taler_api::notification::notification_listener!(&pool, 73 "tx_in" => (row_id: i64) { 74 in_channel.send_replace(row_id); 75 }, 76 "taler_in" => (row_id: i64) { 77 taler_in_channel.send_replace(row_id); 78 }, 79 "tx_out" => (row_id: i64) { 80 out_channel.send_replace(row_id); 81 }, 82 "taler_out" => (row_id: i64) { 83 taler_out_channel.send_replace(row_id); 84 } 85 ) 86 } 87 88 #[derive(Debug, Clone)] 89 pub struct TxIn { 90 pub code: u64, 91 pub amount: Amount, 92 pub subject: Box<str>, 93 pub debtor: FullHuPayto, 94 pub value_date: Date, 95 pub status: TxStatus, 96 } 97 98 impl Display for TxIn { 99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 100 let Self { 101 code, 102 amount, 103 subject, 104 debtor, 105 value_date, 106 status, 107 } = self; 108 write!( 109 f, 110 "{value_date} {code} {amount} ({} {}) {status:?} '{subject}'", 111 debtor.bban(), 112 debtor.name 113 ) 114 } 115 } 116 117 #[derive(Debug, Clone)] 118 pub struct TxOut { 119 pub code: u64, 120 pub amount: Amount, 121 pub subject: Box<str>, 122 pub creditor: FullHuPayto, 123 pub value_date: Date, 124 pub status: TxStatus, 125 } 126 127 impl Display for TxOut { 128 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 129 let Self { 130 code, 131 amount, 132 subject, 133 creditor, 134 value_date, 135 status, 136 } = self; 137 write!( 138 f, 139 "{value_date} {code} {amount} ({} {}) {status:?} '{subject}'", 140 creditor.bban(), 141 &creditor.name 142 ) 143 } 144 } 145 146 #[derive(Debug, PartialEq, Eq)] 147 pub struct Initiated { 148 pub id: u64, 149 pub amount: Amount, 150 pub subject: Box<str>, 151 pub creditor: FullHuPayto, 152 } 153 154 impl Display for Initiated { 155 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 156 let Self { 157 id, 158 amount, 159 subject, 160 creditor, 161 } = self; 162 write!( 163 f, 164 "{id} {amount} ({} {}) '{subject}'", 165 creditor.bban(), 166 &creditor.name 167 ) 168 } 169 } 170 171 #[derive(Debug, Clone)] 172 pub struct TxInAdmin { 173 pub amount: Amount, 174 pub subject: String, 175 pub debtor: FullHuPayto, 176 pub metadata: IncomingSubject, 177 } 178 179 /// Lock the database for worker execution 180 pub async fn worker_lock(e: &mut PgConnection) -> sqlx::Result<bool> { 181 sqlx::query("SELECT pg_try_advisory_lock(42)") 182 .try_map(|r: PgRow| r.try_get(0)) 183 .fetch_one(e) 184 .await 185 } 186 187 #[derive(Debug, PartialEq, Eq)] 188 pub enum AddIncomingResult { 189 Success { 190 new: bool, 191 pending: bool, 192 row_id: u64, 193 valued_at: Date, 194 }, 195 ReservePubReuse, 196 UnknownMapping, 197 MappingReuse, 198 } 199 200 pub async fn register_tx_in_admin( 201 db: &PgPool, 202 tx: &TxInAdmin, 203 now: &Timestamp, 204 ) -> sqlx::Result<AddIncomingResult> { 205 serialized!( 206 sqlx::query( 207 " 208 SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending 209 FROM register_tx_in(NULL, $1, $2, $3, $4, $5, $6, $7, $5) 210 ", 211 ) 212 .bind(&tx.amount) 213 .bind(&tx.subject) 214 .bind(tx.debtor.iban()) 215 .bind(&tx.debtor.name) 216 .bind_date(&now.to_zoned(TimeZone::UTC).date()) 217 .bind(tx.metadata.ty()) 218 .bind(tx.metadata.key()) 219 .try_map(|r: PgRow| { 220 Ok(if r.try_get_flag(0)? { 221 AddIncomingResult::ReservePubReuse 222 } else if r.try_get_flag(1)? { 223 AddIncomingResult::MappingReuse 224 } else if r.try_get_flag(2)? { 225 AddIncomingResult::UnknownMapping 226 } else { 227 AddIncomingResult::Success { 228 row_id: r.try_get_u64(3)?, 229 valued_at: r.try_get_date(4)?, 230 new: r.try_get(5)?, 231 pending: r.try_get(6)? 232 } 233 }) 234 }) 235 .fetch_one(db) 236 ) 237 } 238 239 pub async fn register_tx_in( 240 db: &mut PgConnection, 241 tx: &TxIn, 242 subject: &Option<IncomingSubject>, 243 now: &Timestamp, 244 ) -> sqlx::Result<AddIncomingResult> { 245 serialized!( 246 sqlx::query( 247 " 248 SELECT out_reserve_pub_reuse, out_mapping_reuse, out_unknown_mapping, out_tx_row_id, out_valued_at, out_new, out_pending 249 FROM register_tx_in($1, $2, $3, $4, $5, $6, $7, $8, $9) 250 ", 251 ) 252 .bind(tx.code as i64) 253 .bind(&tx.amount) 254 .bind(&tx.subject) 255 .bind(tx.debtor.iban()) 256 .bind(&tx.debtor.name) 257 .bind_date(&tx.value_date) 258 .bind(subject.as_ref().map(|it| it.ty())) 259 .bind(subject.as_ref().map(|it| it.key())) 260 .bind_timestamp(now) 261 .try_map(|r: PgRow| { 262 Ok(if r.try_get_flag(0)? { 263 AddIncomingResult::ReservePubReuse 264 } else if r.try_get_flag(1)? { 265 AddIncomingResult::MappingReuse 266 } else if r.try_get_flag(2)? { 267 AddIncomingResult::UnknownMapping 268 } else { 269 AddIncomingResult::Success { 270 row_id: r.try_get_u64(3)?, 271 valued_at: r.try_get_date(4)?, 272 new: r.try_get(5)?, 273 pending: r.try_get(6)? 274 } 275 }) 276 }) 277 .fetch_one(&mut *db) 278 ) 279 } 280 281 #[derive(Debug)] 282 pub enum TxOutKind { 283 Simple, 284 Bounce(u32), 285 Talerable(OutgoingSubject), 286 } 287 288 #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)] 289 #[allow(non_camel_case_types)] 290 #[sqlx(type_name = "register_result")] 291 pub enum RegisterResult { 292 /// Already registered 293 idempotent, 294 /// Initiated transaction 295 known, 296 /// Recovered unknown outgoing transaction 297 recovered, 298 } 299 300 #[derive(Debug, PartialEq, Eq)] 301 pub struct AddOutgoingResult { 302 pub result: RegisterResult, 303 pub row_id: u64, 304 } 305 306 pub async fn register_tx_out( 307 db: &mut PgConnection, 308 tx: &TxOut, 309 kind: &TxOutKind, 310 now: &Timestamp, 311 ) -> sqlx::Result<AddOutgoingResult> { 312 serialized!({ 313 let query = sqlx::query( 314 " 315 SELECT out_result, out_tx_row_id 316 FROM register_tx_out($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) 317 ", 318 ) 319 .bind(tx.code as i64) 320 .bind(&tx.amount) 321 .bind(&tx.subject) 322 .bind(tx.creditor.iban()) 323 .bind(&tx.creditor.name) 324 .bind_date(&tx.value_date); 325 let query = match kind { 326 TxOutKind::Simple => query 327 .bind(None::<&[u8]>) 328 .bind(None::<&str>) 329 .bind(None::<&str>) 330 .bind(None::<i64>), 331 TxOutKind::Bounce(bounced) => query 332 .bind(None::<&[u8]>) 333 .bind(None::<&str>) 334 .bind(None::<&str>) 335 .bind(*bounced as i64), 336 TxOutKind::Talerable(subject) => query 337 .bind(&subject.wtid) 338 .bind(subject.exchange_base_url.as_str()) 339 .bind(&subject.metadata) 340 .bind(None::<i64>), 341 }; 342 query 343 .bind_timestamp(now) 344 .try_map(|r: PgRow| { 345 Ok(AddOutgoingResult { 346 result: r.try_get(0)?, 347 row_id: r.try_get_u64(1)?, 348 }) 349 }) 350 .fetch_one(&mut *db) 351 }) 352 } 353 354 #[derive(Debug, PartialEq, Eq)] 355 pub struct OutFailureResult { 356 pub initiated_id: Option<u64>, 357 pub new: bool, 358 } 359 360 pub async fn register_tx_out_failure( 361 db: &mut PgConnection, 362 code: u64, 363 bounced: Option<u32>, 364 now: &Timestamp, 365 ) -> sqlx::Result<OutFailureResult> { 366 serialized!( 367 sqlx::query( 368 " 369 SELECT out_new, out_initiated_id 370 FROM register_tx_out_failure($1, $2, $3) 371 ", 372 ) 373 .bind(code as i64) 374 .bind(bounced.map(|i| i as i32)) 375 .bind_timestamp(now) 376 .try_map(|r: PgRow| { 377 Ok(OutFailureResult { 378 new: r.try_get(0)?, 379 initiated_id: r.try_get::<Option<i64>, _>(1)?.map(|i| i as u64), 380 }) 381 }) 382 .fetch_one(&mut *db) 383 ) 384 } 385 386 #[derive(Debug, PartialEq, Eq)] 387 pub enum TransferResult { 388 Success { id: u64, initiated_at: Timestamp }, 389 RequestUidReuse, 390 WtidReuse, 391 } 392 393 #[derive(Debug, Clone)] 394 pub struct Transfer { 395 pub request_uid: HashCode, 396 pub amount: Decimal, 397 pub exchange_base_url: Url, 398 pub metadata: Option<CompactString>, 399 pub wtid: ShortHashCode, 400 pub creditor: FullHuPayto, 401 } 402 403 pub async fn make_transfer( 404 db: &PgPool, 405 tx: &Transfer, 406 now: &Timestamp, 407 ) -> sqlx::Result<TransferResult> { 408 let subject = fmt_out_subject(&tx.wtid, &tx.exchange_base_url, tx.metadata.as_deref()); 409 serialized!( 410 sqlx::query( 411 " 412 SELECT out_request_uid_reuse, out_wtid_reuse, out_initiated_row_id, out_initiated_at 413 FROM taler_transfer($1, $2, $3, $4, $5, $6, $7, $8, $9) 414 ", 415 ) 416 .bind(&tx.request_uid) 417 .bind(&tx.wtid) 418 .bind(&subject) 419 .bind(tx.amount) 420 .bind(tx.exchange_base_url.as_str()) 421 .bind(&tx.metadata) 422 .bind(tx.creditor.iban()) 423 .bind(&tx.creditor.name) 424 .bind_timestamp(now) 425 .try_map(|r: PgRow| { 426 Ok(if r.try_get_flag(0)? { 427 TransferResult::RequestUidReuse 428 } else if r.try_get_flag(1)? { 429 TransferResult::WtidReuse 430 } else { 431 TransferResult::Success { 432 id: r.try_get_u64(2)?, 433 initiated_at: r.try_get_timestamp(3)?, 434 } 435 }) 436 }) 437 .fetch_one(db) 438 ) 439 } 440 441 #[derive(Debug, PartialEq, Eq)] 442 pub struct BounceResult { 443 pub tx_id: u64, 444 pub tx_new: bool, 445 pub bounce_id: u64, 446 pub bounce_new: bool, 447 } 448 449 pub async fn register_bounce_tx_in( 450 db: &mut PgConnection, 451 tx: &TxIn, 452 reason: &str, 453 now: &Timestamp, 454 ) -> sqlx::Result<BounceResult> { 455 serialized!( 456 sqlx::query( 457 " 458 SELECT out_tx_row_id, out_tx_new, out_bounce_row_id, out_bounce_new 459 FROM register_bounce_tx_in($1, $2, $3, $4, $5, $6, $7, $8) 460 ", 461 ) 462 .bind(tx.code as i64) 463 .bind(&tx.amount) 464 .bind(&tx.subject) 465 .bind(tx.debtor.iban()) 466 .bind(&tx.debtor.name) 467 .bind_date(&tx.value_date) 468 .bind(reason) 469 .bind_timestamp(now) 470 .try_map(|r: PgRow| { 471 Ok(BounceResult { 472 tx_id: r.try_get_u64(0)?, 473 tx_new: r.try_get(1)?, 474 bounce_id: r.try_get_u64(2)?, 475 bounce_new: r.try_get(3)?, 476 }) 477 }) 478 .fetch_one(&mut *db) 479 ) 480 } 481 482 pub async fn transfer_page( 483 db: &PgPool, 484 status: &Option<TransferState>, 485 params: &Page, 486 ) -> sqlx::Result<Vec<TransferListStatus>> { 487 page( 488 db, 489 "initiated_id", 490 params, 491 || { 492 let mut builder = QueryBuilder::new( 493 " 494 SELECT 495 initiated_id, 496 status, 497 amount, 498 credit_account, 499 credit_name, 500 initiated_at 501 FROM transfer 502 JOIN initiated USING (initiated_id) 503 WHERE 504 ", 505 ); 506 if let Some(status) = status { 507 builder.push(" status = ").push_bind(status).push(" AND "); 508 } 509 builder 510 }, 511 |r: PgRow| { 512 Ok(TransferListStatus { 513 row_id: r.try_get_safeu64(0)?, 514 status: r.try_get(1)?, 515 amount: r.try_get_amount(2, &CURRENCY)?, 516 credit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?), 517 timestamp: r.try_get_timestamp(5)?.into(), 518 }) 519 }, 520 ) 521 .await 522 } 523 524 pub async fn outgoing_history( 525 db: &PgPool, 526 params: &History, 527 listen: impl FnOnce() -> Receiver<i64>, 528 ) -> sqlx::Result<Vec<OutgoingBankTransaction>> { 529 history( 530 db, 531 "tx_out_id", 532 params, 533 listen, 534 || { 535 QueryBuilder::new( 536 " 537 SELECT 538 tx_out_id, 539 amount, 540 credit_account, 541 credit_name, 542 valued_at, 543 exchange_base_url, 544 metadata, 545 wtid 546 FROM taler_out 547 JOIN tx_out USING (tx_out_id) 548 WHERE 549 ", 550 ) 551 }, 552 |r: PgRow| { 553 Ok(OutgoingBankTransaction { 554 row_id: r.try_get_safeu64(0)?, 555 amount: r.try_get_amount(1, &CURRENCY)?, 556 debit_fee: None, 557 credit_account: r.try_get_iban(2)?.as_full_payto(r.try_get(3)?), 558 date: r.try_get_timestamp(4)?.into(), 559 exchange_base_url: r.try_get_url(5)?, 560 metadata: r.try_get(6)?, 561 wtid: r.try_get(7)?, 562 }) 563 }, 564 ) 565 .await 566 } 567 568 pub async fn incoming_history( 569 db: &PgPool, 570 params: &History, 571 listen: impl FnOnce() -> Receiver<i64>, 572 ) -> sqlx::Result<Vec<IncomingBankTransaction>> { 573 history( 574 db, 575 "tx_in_id", 576 params, 577 listen, 578 || { 579 QueryBuilder::new( 580 " 581 SELECT 582 type, 583 tx_in_id, 584 amount, 585 debit_account, 586 debit_name, 587 valued_at, 588 metadata, 589 authorization_pub, 590 authorization_sig 591 FROM taler_in 592 JOIN tx_in USING (tx_in_id) 593 WHERE 594 ", 595 ) 596 }, 597 |r: PgRow| { 598 Ok(match r.try_get(0)? { 599 IncomingType::reserve => IncomingBankTransaction::Reserve { 600 row_id: r.try_get_safeu64(1)?, 601 amount: r.try_get_amount(2, &CURRENCY)?, 602 credit_fee: None, 603 debit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?), 604 date: r.try_get_timestamp(5)?.into(), 605 reserve_pub: r.try_get(6)?, 606 authorization_pub: r.try_get(7)?, 607 authorization_sig: r.try_get(8)?, 608 }, 609 IncomingType::kyc => IncomingBankTransaction::Kyc { 610 row_id: r.try_get_safeu64(1)?, 611 amount: r.try_get_amount(2, &CURRENCY)?, 612 credit_fee: None, 613 debit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?), 614 date: r.try_get_timestamp(5)?.into(), 615 account_pub: r.try_get(6)?, 616 authorization_pub: r.try_get(7)?, 617 authorization_sig: r.try_get(8)?, 618 }, 619 IncomingType::map => unimplemented!("MAP are never listed in the history"), 620 }) 621 }, 622 ) 623 .await 624 } 625 626 pub async fn revenue_history( 627 db: &PgPool, 628 params: &History, 629 listen: impl FnOnce() -> Receiver<i64>, 630 ) -> sqlx::Result<Vec<RevenueIncomingBankTransaction>> { 631 history( 632 db, 633 "tx_in_id", 634 params, 635 listen, 636 || { 637 QueryBuilder::new( 638 " 639 SELECT 640 tx_in_id, 641 valued_at, 642 amount, 643 debit_account, 644 debit_name, 645 subject 646 FROM tx_in 647 WHERE 648 ", 649 ) 650 }, 651 |r: PgRow| { 652 Ok(RevenueIncomingBankTransaction { 653 row_id: r.try_get_safeu64(0)?, 654 date: r.try_get_timestamp(1)?.into(), 655 amount: r.try_get_amount(2, &CURRENCY)?, 656 credit_fee: None, 657 debit_account: r.try_get_iban(3)?.as_full_payto(r.try_get(4)?), 658 subject: r.try_get(5)?, 659 }) 660 }, 661 ) 662 .await 663 } 664 665 pub async fn transfer_by_id(db: &PgPool, id: u64) -> sqlx::Result<Option<TransferStatus>> { 666 serialized!( 667 sqlx::query( 668 " 669 SELECT 670 status, 671 status_msg, 672 amount, 673 exchange_base_url, 674 metadata, 675 wtid, 676 credit_account, 677 credit_name, 678 initiated_at 679 FROM transfer 680 JOIN initiated USING (initiated_id) 681 WHERE initiated_id = $1 682 ", 683 ) 684 .bind(id as i64) 685 .try_map(|r: PgRow| { 686 Ok(TransferStatus { 687 status: r.try_get(0)?, 688 status_msg: r.try_get(1)?, 689 amount: r.try_get_amount(2, &CURRENCY)?, 690 origin_exchange_url: r.try_get(3)?, 691 metadata: r.try_get(4)?, 692 wtid: r.try_get(5)?, 693 credit_account: r.try_get_iban(6)?.as_full_payto(r.try_get(7)?), 694 timestamp: r.try_get_timestamp(8)?.into(), 695 }) 696 }) 697 .fetch_optional(db) 698 ) 699 } 700 701 /** Get a batch of pending initiated transactions not attempted since [start] */ 702 pub async fn pending_batch( 703 db: &mut PgConnection, 704 start: &Timestamp, 705 ) -> sqlx::Result<Vec<Initiated>> { 706 serialized!( 707 sqlx::query( 708 " 709 SELECT initiated_id, amount, subject, credit_account, credit_name 710 FROM initiated 711 WHERE magnet_code IS NULL 712 AND status='pending' 713 AND (last_submitted IS NULL OR last_submitted < $1) 714 LIMIT 100 715 ", 716 ) 717 .bind_timestamp(start) 718 .try_map(|r: PgRow| { 719 Ok(Initiated { 720 id: r.try_get_u64(0)?, 721 amount: r.try_get_amount(1, &CURRENCY)?, 722 subject: r.try_get(2)?, 723 creditor: FullHuPayto::new(r.try_get_parse(3)?, r.try_get(4)?), 724 }) 725 }) 726 .fetch_all(&mut *db) 727 ) 728 } 729 730 /** Get an initiated transaction matching the given magnet [code] */ 731 pub async fn initiated_by_code( 732 db: &mut PgConnection, 733 code: u64, 734 ) -> sqlx::Result<Option<Initiated>> { 735 serialized!( 736 sqlx::query( 737 " 738 SELECT initiated_id, amount, subject, credit_account, credit_name 739 FROM initiated 740 WHERE magnet_code IS $1 741 ", 742 ) 743 .bind(code as i64) 744 .try_map(|r: PgRow| { 745 Ok(Initiated { 746 id: r.try_get_u64(0)?, 747 amount: r.try_get_amount(1, &CURRENCY)?, 748 subject: r.try_get(2)?, 749 creditor: FullHuPayto::new(r.try_get_parse(3)?, r.try_get(4)?), 750 }) 751 }) 752 .fetch_optional(&mut *db) 753 ) 754 } 755 756 /** Update status of a successful submitted initiated transaction */ 757 pub async fn initiated_submit_success( 758 db: &mut PgConnection, 759 id: u64, 760 timestamp: &Timestamp, 761 magnet_code: u64, 762 ) -> sqlx::Result<()> { 763 serialized!( 764 sqlx::query( 765 " 766 UPDATE initiated 767 SET status='pending', submission_counter=submission_counter+1, last_submitted=$1, magnet_code=$2 768 WHERE initiated_id=$3 769 " 770 ).bind_timestamp(timestamp) 771 .bind(magnet_code as i64) 772 .bind(id as i64) 773 .execute(&mut *db) 774 )?; 775 Ok(()) 776 } 777 778 /** Update status of a permanently failed initiated transaction */ 779 pub async fn initiated_submit_permanent_failure( 780 db: &mut PgConnection, 781 id: u64, 782 timestamp: &Timestamp, 783 msg: &str, 784 ) -> sqlx::Result<()> { 785 serialized!( 786 sqlx::query( 787 " 788 UPDATE initiated 789 SET status='permanent_failure', status_msg=$2 790 WHERE initiated_id=$3 791 ", 792 ) 793 .bind_timestamp(timestamp) 794 .bind(msg) 795 .bind(id as i64) 796 .execute(&mut *db) 797 )?; 798 Ok(()) 799 } 800 801 /** Check if an initiated transaction exist for a magnet code */ 802 pub async fn initiated_exists_for_code( 803 db: &mut PgConnection, 804 code: u64, 805 ) -> sqlx::Result<Option<u64>> { 806 serialized!( 807 sqlx::query("SELECT initiated_id FROM initiated WHERE magnet_code=$1") 808 .bind(code as i64) 809 .try_map(|r| Ok(r.try_get::<i64, _>(0)? as u64)) 810 .fetch_optional(&mut *db) 811 ) 812 } 813 814 /** Get JSON value from KV table */ 815 pub async fn kv_get<T: DeserializeOwned + Unpin + Send>( 816 db: &mut PgConnection, 817 key: &str, 818 ) -> sqlx::Result<Option<T>> { 819 serialized!( 820 sqlx::query("SELECT value FROM kv WHERE key=$1") 821 .bind(key) 822 .try_map(|r| Ok(r.try_get::<sqlx::types::Json<T>, _>(0)?.0)) 823 .fetch_optional(&mut *db) 824 ) 825 } 826 827 /** Set JSON value in KV table */ 828 pub async fn kv_set<T: Serialize>(db: &mut PgConnection, key: &str, value: &T) -> sqlx::Result<()> { 829 serialized!( 830 sqlx::query("INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value=EXCLUDED.value") 831 .bind(key) 832 .bind(sqlx::types::Json(value)) 833 .execute(&mut *db) 834 )?; 835 Ok(()) 836 } 837 838 pub enum RegistrationResult { 839 Success, 840 ReservePubReuse, 841 } 842 843 pub async fn transfer_register( 844 db: &PgPool, 845 req: &RegistrationRequest, 846 ) -> sqlx::Result<RegistrationResult> { 847 let ty: IncomingType = req.r#type.into(); 848 serialized!( 849 sqlx::query( 850 "SELECT out_reserve_pub_reuse FROM register_prepared_transfers($1,$2,$3,$4,$5,$6)" 851 ) 852 .bind(ty) 853 .bind(&req.account_pub) 854 .bind(&req.authorization_pub) 855 .bind(&req.authorization_sig) 856 .bind(req.recurrent) 857 .bind_timestamp(&Timestamp::now()) 858 .try_map(|r: PgRow| { 859 Ok(if r.try_get_flag("out_reserve_pub_reuse")? { 860 RegistrationResult::ReservePubReuse 861 } else { 862 RegistrationResult::Success 863 }) 864 }) 865 .fetch_one(db) 866 ) 867 } 868 869 pub async fn transfer_unregister(db: &PgPool, req: &Unregistration) -> sqlx::Result<bool> { 870 serialized!( 871 sqlx::query("SELECT out_found FROM delete_prepared_transfers($1,$2)") 872 .bind(&req.authorization_pub) 873 .bind_timestamp(&Timestamp::now()) 874 .try_map(|r: PgRow| r.try_get_flag("out_found")) 875 .fetch_one(db) 876 ) 877 } 878 879 #[cfg(test)] 880 mod test { 881 use jiff::{Span, Timestamp, Zoned}; 882 use serde_json::json; 883 use sqlx::{PgPool, Postgres, pool::PoolConnection, postgres::PgRow}; 884 use taler_api::{ 885 db::TypeHelper, 886 notification::dummy_listen, 887 subject::{IncomingSubject, OutgoingSubject}, 888 }; 889 use taler_common::{ 890 api_common::{EddsaPublicKey, HashCode, ShortHashCode}, 891 api_params::{History, Page}, 892 types::{ 893 amount::{amount, decimal}, 894 url, 895 utils::now_sql_stable_ts, 896 }, 897 }; 898 899 use super::TxInAdmin; 900 use crate::{ 901 constants::CONFIG_SOURCE, 902 db::{ 903 self, AddIncomingResult, AddOutgoingResult, BounceResult, Initiated, OutFailureResult, 904 TransferResult, TxIn, TxOut, TxOutKind, kv_get, kv_set, make_transfer, 905 register_bounce_tx_in, register_tx_in, register_tx_in_admin, register_tx_out, 906 }, 907 magnet_api::types::TxStatus, 908 magnet_payto, 909 }; 910 911 async fn setup() -> (PoolConnection<Postgres>, PgPool) { 912 taler_test_utils::db::db_test_setup(CONFIG_SOURCE).await 913 } 914 915 #[tokio::test] 916 async fn kv() { 917 let (mut db, _) = setup().await; 918 919 let value = json!({ 920 "name": "Mr Smith", 921 "no way": 32 922 }); 923 924 assert_eq!( 925 kv_get::<serde_json::Value>(&mut db, "value").await.unwrap(), 926 None 927 ); 928 kv_set(&mut db, "value", &value).await.unwrap(); 929 kv_set(&mut db, "value", &value).await.unwrap(); 930 assert_eq!( 931 kv_get::<serde_json::Value>(&mut db, "value").await.unwrap(), 932 Some(value) 933 ); 934 } 935 936 #[tokio::test] 937 async fn tx_in() { 938 let (mut db, pool) = setup().await; 939 940 let mut routine = async |first: &Option<IncomingSubject>, 941 second: &Option<IncomingSubject>| { 942 let (id, code) = 943 sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_in") 944 .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?))) 945 .fetch_one(&mut *db) 946 .await 947 .unwrap(); 948 let now = now_sql_stable_ts(); 949 let date = Zoned::now().date(); 950 let later = date.tomorrow().unwrap(); 951 let tx = TxIn { 952 code, 953 amount: amount("EUR:10"), 954 subject: "subject".into(), 955 debtor: magnet_payto( 956 "payto://iban/HU30162000031000163100000000?receiver-name=name", 957 ), 958 value_date: date, 959 status: TxStatus::Completed, 960 }; 961 // Insert 962 assert_eq!( 963 register_tx_in(&mut db, &tx, first, &now) 964 .await 965 .expect("register tx in"), 966 AddIncomingResult::Success { 967 new: true, 968 pending: false, 969 row_id: id, 970 valued_at: date 971 } 972 ); 973 // Idempotent 974 assert_eq!( 975 register_tx_in( 976 &mut db, 977 &TxIn { 978 value_date: later, 979 ..tx.clone() 980 }, 981 first, 982 &now 983 ) 984 .await 985 .expect("register tx in"), 986 AddIncomingResult::Success { 987 new: false, 988 pending: false, 989 row_id: id, 990 valued_at: date 991 } 992 ); 993 // Many 994 assert_eq!( 995 register_tx_in( 996 &mut db, 997 &TxIn { 998 code: code + 1, 999 value_date: later, 1000 ..tx 1001 }, 1002 second, 1003 &now 1004 ) 1005 .await 1006 .expect("register tx in"), 1007 AddIncomingResult::Success { 1008 new: true, 1009 pending: false, 1010 row_id: id + 1, 1011 valued_at: later 1012 } 1013 ); 1014 }; 1015 1016 // Empty db 1017 assert_eq!( 1018 db::revenue_history(&pool, &History::default(), dummy_listen) 1019 .await 1020 .unwrap(), 1021 Vec::new() 1022 ); 1023 assert_eq!( 1024 db::incoming_history(&pool, &History::default(), dummy_listen) 1025 .await 1026 .unwrap(), 1027 Vec::new() 1028 ); 1029 1030 // Regular transaction 1031 routine(&None, &None).await; 1032 1033 // Reserve transaction 1034 routine( 1035 &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), 1036 &Some(IncomingSubject::Reserve(EddsaPublicKey::rand())), 1037 ) 1038 .await; 1039 1040 // Kyc transaction 1041 routine( 1042 &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), 1043 &Some(IncomingSubject::Kyc(EddsaPublicKey::rand())), 1044 ) 1045 .await; 1046 1047 // History 1048 assert_eq!( 1049 db::revenue_history(&pool, &History::default(), dummy_listen) 1050 .await 1051 .unwrap() 1052 .len(), 1053 6 1054 ); 1055 assert_eq!( 1056 db::incoming_history(&pool, &History::default(), dummy_listen) 1057 .await 1058 .unwrap() 1059 .len(), 1060 4 1061 ); 1062 } 1063 1064 #[tokio::test] 1065 async fn tx_in_admin() { 1066 let (_, pool) = setup().await; 1067 1068 // Empty db 1069 assert_eq!( 1070 db::incoming_history(&pool, &History::default(), dummy_listen) 1071 .await 1072 .unwrap(), 1073 Vec::new() 1074 ); 1075 1076 let now = now_sql_stable_ts(); 1077 let later = now + Span::new().hours(2); 1078 let date = Zoned::now().date(); 1079 let tx = TxInAdmin { 1080 amount: amount("EUR:10"), 1081 subject: "subject".to_owned(), 1082 debtor: magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"), 1083 metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), 1084 }; 1085 // Insert 1086 assert_eq!( 1087 register_tx_in_admin(&pool, &tx, &now) 1088 .await 1089 .expect("register tx in"), 1090 AddIncomingResult::Success { 1091 new: true, 1092 pending: false, 1093 row_id: 1, 1094 valued_at: date 1095 } 1096 ); 1097 // Many 1098 assert_eq!( 1099 register_tx_in_admin( 1100 &pool, 1101 &TxInAdmin { 1102 subject: "Other".to_owned(), 1103 metadata: IncomingSubject::Reserve(EddsaPublicKey::rand()), 1104 ..tx.clone() 1105 }, 1106 &later 1107 ) 1108 .await 1109 .expect("register tx in"), 1110 AddIncomingResult::Success { 1111 new: true, 1112 pending: false, 1113 row_id: 2, 1114 valued_at: date 1115 } 1116 ); 1117 1118 // History 1119 assert_eq!( 1120 db::incoming_history(&pool, &History::default(), dummy_listen) 1121 .await 1122 .unwrap() 1123 .len(), 1124 2 1125 ); 1126 } 1127 1128 #[tokio::test] 1129 async fn tx_out() { 1130 let (mut db, pool) = setup().await; 1131 1132 let mut routine = async |first: &TxOutKind, second: &TxOutKind| { 1133 let (id, code) = 1134 sqlx::query("SELECT count(*) + 1, COALESCE(max(magnet_code), 0) + 20 FROM tx_out") 1135 .try_map(|r: PgRow| Ok((r.try_get_u64(0)?, r.try_get_u64(1)?))) 1136 .fetch_one(&mut *db) 1137 .await 1138 .unwrap(); 1139 let now = now_sql_stable_ts(); 1140 let date = Zoned::now().date(); 1141 let later = date.tomorrow().unwrap(); 1142 let tx = TxOut { 1143 code, 1144 amount: amount("HUF:10"), 1145 subject: "subject".into(), 1146 creditor: magnet_payto( 1147 "payto://iban/HU30162000031000163100000000?receiver-name=name", 1148 ), 1149 value_date: date, 1150 status: TxStatus::Completed, 1151 }; 1152 assert!(matches!( 1153 make_transfer( 1154 &pool, 1155 &db::Transfer { 1156 request_uid: HashCode::rand(), 1157 amount: decimal("10"), 1158 exchange_base_url: url("https://exchange.test.com/"), 1159 metadata: None, 1160 wtid: ShortHashCode::rand(), 1161 creditor: tx.creditor.clone() 1162 }, 1163 &now 1164 ) 1165 .await 1166 .unwrap(), 1167 TransferResult::Success { .. } 1168 )); 1169 db::initiated_submit_success(&mut db, 1, &Timestamp::now(), tx.code) 1170 .await 1171 .expect("status success"); 1172 1173 // Insert 1174 assert_eq!( 1175 register_tx_out(&mut db, &tx, first, &now) 1176 .await 1177 .expect("register tx out"), 1178 AddOutgoingResult { 1179 result: db::RegisterResult::known, 1180 row_id: id, 1181 } 1182 ); 1183 // Idempotent 1184 assert_eq!( 1185 register_tx_out( 1186 &mut db, 1187 &TxOut { 1188 value_date: later, 1189 ..tx.clone() 1190 }, 1191 first, 1192 &now 1193 ) 1194 .await 1195 .expect("register tx out"), 1196 AddOutgoingResult { 1197 result: db::RegisterResult::idempotent, 1198 row_id: id, 1199 } 1200 ); 1201 // Recovered 1202 assert_eq!( 1203 register_tx_out( 1204 &mut db, 1205 &TxOut { 1206 code: code + 1, 1207 value_date: later, 1208 ..tx.clone() 1209 }, 1210 second, 1211 &now 1212 ) 1213 .await 1214 .expect("register tx out"), 1215 AddOutgoingResult { 1216 result: db::RegisterResult::recovered, 1217 row_id: id + 1, 1218 } 1219 ); 1220 }; 1221 1222 // Empty db 1223 assert_eq!( 1224 db::outgoing_history(&pool, &History::default(), dummy_listen) 1225 .await 1226 .unwrap(), 1227 Vec::new() 1228 ); 1229 1230 // Regular transaction 1231 routine(&TxOutKind::Simple, &TxOutKind::Simple).await; 1232 1233 // Talerable transaction 1234 routine( 1235 &TxOutKind::Talerable(OutgoingSubject::rand()), 1236 &TxOutKind::Talerable(OutgoingSubject::rand()), 1237 ) 1238 .await; 1239 1240 // Bounced transaction 1241 routine(&TxOutKind::Bounce(21), &TxOutKind::Bounce(42)).await; 1242 1243 // History 1244 assert_eq!( 1245 db::outgoing_history(&pool, &History::default(), dummy_listen) 1246 .await 1247 .unwrap() 1248 .len(), 1249 2 1250 ); 1251 } 1252 1253 #[tokio::test] 1254 async fn tx_out_failure() { 1255 let (mut db, pool) = setup().await; 1256 1257 let now = now_sql_stable_ts(); 1258 1259 // Unknown 1260 assert_eq!( 1261 db::register_tx_out_failure(&mut db, 42, None, &now) 1262 .await 1263 .unwrap(), 1264 OutFailureResult { 1265 initiated_id: None, 1266 new: false 1267 } 1268 ); 1269 assert_eq!( 1270 db::register_tx_out_failure(&mut db, 42, Some(12), &now) 1271 .await 1272 .unwrap(), 1273 OutFailureResult { 1274 initiated_id: None, 1275 new: false 1276 } 1277 ); 1278 1279 // Initiated 1280 let req = db::Transfer { 1281 request_uid: HashCode::rand(), 1282 amount: decimal("10"), 1283 exchange_base_url: url("https://exchange.test.com/"), 1284 metadata: None, 1285 wtid: ShortHashCode::rand(), 1286 creditor: magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"), 1287 }; 1288 let payto = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"); 1289 assert_eq!( 1290 make_transfer(&pool, &req, &now).await.unwrap(), 1291 TransferResult::Success { 1292 id: 1, 1293 initiated_at: now 1294 } 1295 ); 1296 db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 34) 1297 .await 1298 .expect("status success"); 1299 assert_eq!( 1300 db::register_tx_out_failure(&mut db, 34, None, &now) 1301 .await 1302 .unwrap(), 1303 OutFailureResult { 1304 initiated_id: Some(1), 1305 new: true 1306 } 1307 ); 1308 assert_eq!( 1309 db::register_tx_out_failure(&mut db, 34, None, &now) 1310 .await 1311 .unwrap(), 1312 OutFailureResult { 1313 initiated_id: Some(1), 1314 new: false 1315 } 1316 ); 1317 1318 // Recovered bounce 1319 let tx = TxIn { 1320 code: 12, 1321 amount: amount("HUF:11"), 1322 subject: "malformed transaction".into(), 1323 debtor: payto, 1324 value_date: Zoned::now().date(), 1325 status: TxStatus::Completed, 1326 }; 1327 assert_eq!( 1328 db::register_bounce_tx_in(&mut db, &tx, "no reason", &now) 1329 .await 1330 .unwrap(), 1331 BounceResult { 1332 tx_id: 1, 1333 tx_new: true, 1334 bounce_id: 2, 1335 bounce_new: true 1336 } 1337 ); 1338 assert_eq!( 1339 db::register_tx_out_failure(&mut db, 10, Some(12), &now) 1340 .await 1341 .unwrap(), 1342 OutFailureResult { 1343 initiated_id: Some(2), 1344 new: true 1345 } 1346 ); 1347 assert_eq!( 1348 db::register_tx_out_failure(&mut db, 10, Some(12), &now) 1349 .await 1350 .unwrap(), 1351 OutFailureResult { 1352 initiated_id: Some(2), 1353 new: false 1354 } 1355 ); 1356 } 1357 1358 #[tokio::test] 1359 async fn transfer() { 1360 let (_, pool) = setup().await; 1361 1362 // Empty db 1363 assert_eq!(db::transfer_by_id(&pool, 0).await.unwrap(), None); 1364 assert_eq!( 1365 db::transfer_page(&pool, &None, &Page::default()) 1366 .await 1367 .unwrap(), 1368 Vec::new() 1369 ); 1370 1371 let req = db::Transfer { 1372 request_uid: HashCode::rand(), 1373 amount: decimal("10"), 1374 exchange_base_url: url("https://exchange.test.com/"), 1375 metadata: None, 1376 wtid: ShortHashCode::rand(), 1377 creditor: magnet_payto("payto://iban/HU02162000031000164800000000?receiver-name=name"), 1378 }; 1379 let now = now_sql_stable_ts(); 1380 let later = now + Span::new().hours(2); 1381 // Insert 1382 assert_eq!( 1383 make_transfer(&pool, &req, &now).await.expect("transfer"), 1384 TransferResult::Success { 1385 id: 1, 1386 initiated_at: now 1387 } 1388 ); 1389 // Idempotent 1390 assert_eq!( 1391 make_transfer(&pool, &req, &later).await.expect("transfer"), 1392 TransferResult::Success { 1393 id: 1, 1394 initiated_at: now 1395 } 1396 ); 1397 // Request UID reuse 1398 assert_eq!( 1399 make_transfer( 1400 &pool, 1401 &db::Transfer { 1402 wtid: ShortHashCode::rand(), 1403 ..req.clone() 1404 }, 1405 &now 1406 ) 1407 .await 1408 .expect("transfer"), 1409 TransferResult::RequestUidReuse 1410 ); 1411 // wtid reuse 1412 assert_eq!( 1413 make_transfer( 1414 &pool, 1415 &db::Transfer { 1416 request_uid: HashCode::rand(), 1417 ..req.clone() 1418 }, 1419 &now 1420 ) 1421 .await 1422 .expect("transfer"), 1423 TransferResult::WtidReuse 1424 ); 1425 // Many 1426 assert_eq!( 1427 make_transfer( 1428 &pool, 1429 &db::Transfer { 1430 request_uid: HashCode::rand(), 1431 wtid: ShortHashCode::rand(), 1432 ..req 1433 }, 1434 &later 1435 ) 1436 .await 1437 .expect("transfer"), 1438 TransferResult::Success { 1439 id: 2, 1440 initiated_at: later 1441 } 1442 ); 1443 1444 // Get 1445 assert!(db::transfer_by_id(&pool, 1).await.unwrap().is_some()); 1446 assert!(db::transfer_by_id(&pool, 2).await.unwrap().is_some()); 1447 assert!(db::transfer_by_id(&pool, 3).await.unwrap().is_none()); 1448 assert_eq!( 1449 db::transfer_page(&pool, &None, &Page::default()) 1450 .await 1451 .unwrap() 1452 .len(), 1453 2 1454 ); 1455 } 1456 1457 #[tokio::test] 1458 async fn bounce() { 1459 let (mut db, _) = setup().await; 1460 1461 let amount = amount("HUF:10"); 1462 let payto = magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"); 1463 let now = now_sql_stable_ts(); 1464 let date = Zoned::now().date(); 1465 1466 // Empty db 1467 assert!(db::pending_batch(&mut db, &now).await.unwrap().is_empty()); 1468 1469 // Insert 1470 assert_eq!( 1471 register_tx_in( 1472 &mut db, 1473 &TxIn { 1474 code: 13, 1475 amount: amount.clone(), 1476 subject: "subject".into(), 1477 debtor: payto.clone(), 1478 value_date: date, 1479 status: TxStatus::Completed 1480 }, 1481 &None, 1482 &now 1483 ) 1484 .await 1485 .expect("register tx in"), 1486 AddIncomingResult::Success { 1487 new: true, 1488 pending: false, 1489 row_id: 1, 1490 valued_at: date 1491 } 1492 ); 1493 1494 // Bounce 1495 assert_eq!( 1496 register_bounce_tx_in( 1497 &mut db, 1498 &TxIn { 1499 code: 12, 1500 amount: amount.clone(), 1501 subject: "subject".into(), 1502 debtor: payto.clone(), 1503 value_date: date, 1504 status: TxStatus::Completed 1505 }, 1506 "good reason", 1507 &now 1508 ) 1509 .await 1510 .expect("bounce"), 1511 BounceResult { 1512 tx_id: 2, 1513 tx_new: true, 1514 bounce_id: 1, 1515 bounce_new: true 1516 } 1517 ); 1518 // Idempotent 1519 assert_eq!( 1520 register_bounce_tx_in( 1521 &mut db, 1522 &TxIn { 1523 code: 12, 1524 amount: amount.clone(), 1525 subject: "subject".into(), 1526 debtor: payto.clone(), 1527 value_date: date, 1528 status: TxStatus::Completed 1529 }, 1530 "good reason", 1531 &now 1532 ) 1533 .await 1534 .expect("bounce"), 1535 BounceResult { 1536 tx_id: 2, 1537 tx_new: false, 1538 bounce_id: 1, 1539 bounce_new: false 1540 } 1541 ); 1542 1543 // Bounce registered 1544 assert_eq!( 1545 register_bounce_tx_in( 1546 &mut db, 1547 &TxIn { 1548 code: 13, 1549 amount: amount.clone(), 1550 subject: "subject".into(), 1551 debtor: payto.clone(), 1552 value_date: date, 1553 status: TxStatus::Completed 1554 }, 1555 "good reason", 1556 &now 1557 ) 1558 .await 1559 .expect("bounce"), 1560 BounceResult { 1561 tx_id: 1, 1562 tx_new: false, 1563 bounce_id: 2, 1564 bounce_new: true 1565 } 1566 ); 1567 // Idempotent registered 1568 assert_eq!( 1569 register_bounce_tx_in( 1570 &mut db, 1571 &TxIn { 1572 code: 13, 1573 amount: amount.clone(), 1574 subject: "subject".into(), 1575 debtor: payto.clone(), 1576 value_date: date, 1577 status: TxStatus::Completed 1578 }, 1579 "good reason", 1580 &now 1581 ) 1582 .await 1583 .expect("bounce"), 1584 BounceResult { 1585 tx_id: 1, 1586 tx_new: false, 1587 bounce_id: 2, 1588 bounce_new: false 1589 } 1590 ); 1591 1592 // Batch 1593 assert_eq!( 1594 db::pending_batch(&mut db, &now).await.unwrap(), 1595 &[ 1596 Initiated { 1597 id: 1, 1598 amount: amount.clone(), 1599 subject: "bounce: 12".into(), 1600 creditor: payto.clone() 1601 }, 1602 Initiated { 1603 id: 2, 1604 amount, 1605 subject: "bounce: 13".into(), 1606 creditor: payto 1607 } 1608 ] 1609 ); 1610 } 1611 1612 #[tokio::test] 1613 async fn status() { 1614 let (mut db, _) = setup().await; 1615 1616 // Unknown transfer 1617 db::initiated_submit_permanent_failure(&mut db, 1, &Timestamp::now(), "msg") 1618 .await 1619 .unwrap(); 1620 db::initiated_submit_success(&mut db, 1, &Timestamp::now(), 12) 1621 .await 1622 .unwrap(); 1623 } 1624 1625 #[tokio::test] 1626 async fn batch() { 1627 let (mut db, pool) = setup().await; 1628 let start = Timestamp::now(); 1629 let magnet_payto = 1630 magnet_payto("payto://iban/HU30162000031000163100000000?receiver-name=name"); 1631 1632 // Empty db 1633 let pendings = db::pending_batch(&mut db, &start) 1634 .await 1635 .expect("pending_batch"); 1636 assert_eq!(pendings.len(), 0); 1637 1638 // Some transfers 1639 for i in 0..3 { 1640 make_transfer( 1641 &pool, 1642 &db::Transfer { 1643 request_uid: HashCode::rand(), 1644 amount: decimal(format!("{}", i + 1)), 1645 exchange_base_url: url("https://exchange.test.com/"), 1646 metadata: None, 1647 wtid: ShortHashCode::rand(), 1648 creditor: magnet_payto.clone(), 1649 }, 1650 &Timestamp::now(), 1651 ) 1652 .await 1653 .expect("transfer"); 1654 } 1655 let pendings = db::pending_batch(&mut db, &start) 1656 .await 1657 .expect("pending_batch"); 1658 assert_eq!(pendings.len(), 3); 1659 1660 // Max 100 txs in batch 1661 for i in 0..100 { 1662 make_transfer( 1663 &pool, 1664 &db::Transfer { 1665 request_uid: HashCode::rand(), 1666 amount: decimal(format!("{}", i + 1)), 1667 exchange_base_url: url("https://exchange.test.com/"), 1668 metadata: None, 1669 wtid: ShortHashCode::rand(), 1670 creditor: magnet_payto.clone(), 1671 }, 1672 &Timestamp::now(), 1673 ) 1674 .await 1675 .expect("transfer"); 1676 } 1677 let pendings = db::pending_batch(&mut db, &start) 1678 .await 1679 .expect("pending_batch"); 1680 assert_eq!(pendings.len(), 100); 1681 1682 // Skip uploaded 1683 for i in 0..=10 { 1684 db::initiated_submit_success(&mut db, i, &Timestamp::now(), i) 1685 .await 1686 .expect("status success"); 1687 } 1688 let pendings = db::pending_batch(&mut db, &start) 1689 .await 1690 .expect("pending_batch"); 1691 assert_eq!(pendings.len(), 93); 1692 1693 // Skip failed 1694 for i in 0..=10 { 1695 db::initiated_submit_permanent_failure(&mut db, 10 + i, &Timestamp::now(), "failure") 1696 .await 1697 .expect("status failure"); 1698 } 1699 let pendings = db::pending_batch(&mut db, &start) 1700 .await 1701 .expect("pending_batch"); 1702 assert_eq!(pendings.len(), 83); 1703 } 1704 }