lib.rs (10464B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::{borrow::Cow, str::FromStr, sync::Arc}; 18 19 use sqlx::{Acquire, PgPool, postgres::PgListener}; 20 use taler_api::api::{Router, TalerRouter as _}; 21 use taler_common::{ 22 ExpoBackoffDecorr, 23 config::Config, 24 types::{ 25 iban::{Country, IBAN, IbanErrorKind, ParseIbanError}, 26 payto::{FullPayto, IbanPayto, Payto, PaytoErr, PaytoImpl, PaytoURI, TransferPayto}, 27 }, 28 }; 29 use tracing::{debug, error}; 30 31 use crate::{ 32 api::MagnetApi, 33 config::{ServeCfg, WorkerCfg}, 34 constants::WORKER_FREQUENCY, 35 magnet_api::client::AuthClient, 36 worker::{Worker, WorkerResult}, 37 }; 38 39 pub mod api; 40 pub mod config; 41 pub mod constants; 42 pub mod db; 43 pub mod dev; 44 pub mod magnet_api; 45 pub mod setup; 46 pub mod worker; 47 pub mod failure_injection { 48 use std::sync::Mutex; 49 50 #[derive(Debug, Clone, Copy, PartialEq, Eq, thiserror::Error)] 51 #[error("injected failure at {0}")] 52 pub struct InjectedErr(pub &'static str); 53 54 pub enum FailureLogic { 55 None, 56 Always(&'static str), 57 History(Vec<&'static str>), 58 } 59 60 impl FailureLogic { 61 /// Check whether this step should fail 62 pub fn check(&mut self, name: &'static str) -> bool { 63 match self { 64 FailureLogic::None => false, 65 FailureLogic::Always(step) => *step == name, 66 FailureLogic::History(items) => { 67 if let Some(step) = items.first() 68 && *step == name 69 { 70 items.remove(0); 71 true 72 } else { 73 false 74 } 75 } 76 } 77 } 78 } 79 80 static FAILURE_STATE: Mutex<FailureLogic> = Mutex::new(FailureLogic::None); 81 82 pub fn set_failure_logic(logic: FailureLogic) { 83 let mut lock = FAILURE_STATE.lock().unwrap(); 84 *lock = logic; 85 } 86 87 pub fn fail_point(step: &'static str) -> Result<(), InjectedErr> { 88 if FAILURE_STATE.lock().unwrap().check(step) { 89 Err(InjectedErr(step)) 90 } else { 91 Ok(()) 92 } 93 } 94 } 95 96 pub async fn run_serve(cfg: &Config, pool: PgPool) -> anyhow::Result<()> { 97 let cfg = ServeCfg::parse(cfg)?; 98 let api = Arc::new(MagnetApi::start(pool, cfg.payto).await); 99 let mut router = Router::new(); 100 if let Some(cfg) = cfg.wire_gateway { 101 router = router.wire_gateway(api.clone(), cfg.auth.method()); 102 } 103 if let Some(cfg) = cfg.revenue { 104 router = router.revenue(api, cfg.auth.method()); 105 } 106 router.serve(cfg.serve, None).await?; 107 Ok(()) 108 } 109 110 pub async fn run_worker( 111 cfg: &Config, 112 pool: &PgPool, 113 client: &reqwest::Client, 114 ) -> anyhow::Result<()> { 115 let cfg = WorkerCfg::parse(cfg)?; 116 let keys = setup::load(&cfg)?; 117 let client = AuthClient::new(client, &cfg.api_url, &cfg.consumer).upgrade(&keys.access_token); 118 let mut jitter = ExpoBackoffDecorr::default(); 119 // TODO run in loop and handle errors 120 loop { 121 let res: WorkerResult = async { 122 let account = client.account(cfg.payto.bban()).await?; 123 let db = &mut PgListener::connect_with(pool).await?; 124 125 // TODO take a postgresql lock ? 126 127 // Listen to all channels 128 db.listen_all(["transfer"]).await?; 129 130 loop { 131 debug!(target: "worker", "running"); 132 Worker { 133 client: &client, 134 db: db.acquire().await?, 135 account_number: &account.number, 136 account_code: account.code, 137 key: &keys.signing_key, 138 account_type: cfg.account_type, 139 ignore_tx_before: cfg.ignore_tx_before, 140 ignore_bounces_before: cfg.ignore_bounces_before, 141 } 142 .run() 143 .await?; 144 jitter.reset(); 145 146 // Wait for notifications or sync timeout 147 if let Ok(res) = tokio::time::timeout(WORKER_FREQUENCY, db.try_recv()).await { 148 let mut ntf = res?; 149 // Conflate all notifications 150 while let Some(n) = ntf { 151 debug!(target: "worker", "notification from {}", n.channel()); 152 ntf = db.next_buffered(); 153 } 154 } 155 } 156 } 157 .await; 158 let err = res.unwrap_err(); 159 error!(target: "worker", "{err}"); 160 tokio::time::sleep(jitter.backoff()).await; 161 } 162 } 163 164 #[derive( 165 Debug, Clone, PartialEq, Eq, serde_with::DeserializeFromStr, serde_with::SerializeDisplay, 166 )] 167 pub struct HuIban(IBAN); 168 169 impl HuIban { 170 #[allow(clippy::identity_op)] 171 pub fn checksum(b: &[u8]) -> Result<(), (u8, u8)> { 172 let expected_digit = b[7] - b'0'; 173 let sum = ((b[0] - b'0') * 9) as u16 174 + ((b[1] - b'0') * 7) as u16 175 + ((b[2] - b'0') * 3) as u16 176 + ((b[3] - b'0') * 1) as u16 177 + ((b[4] - b'0') * 9) as u16 178 + ((b[5] - b'0') * 7) as u16 179 + ((b[6] - b'0') * 3) as u16; 180 let modulo = ((10 - (sum % 10)) % 10) as u8; 181 if expected_digit != modulo { 182 Err((expected_digit, modulo)) 183 } else { 184 Ok(()) 185 } 186 } 187 188 fn check_bban(bban: &str) -> Result<(), HuIbanErr> { 189 let bban = bban.as_bytes(); 190 if bban.len() != 16 && bban.len() != 24 { 191 return Err(HuIbanErr::BbanSize(bban.len())); 192 } else if !bban.iter().all(u8::is_ascii_digit) { 193 return Err(HuIbanErr::Invalid); 194 } 195 Self::checksum(&bban[..8]).map_err(|e| HuIbanErr::checksum("bank-branch number", e))?; 196 if bban.len() == 16 { 197 Self::checksum(&bban[8..]).map_err(|e| HuIbanErr::checksum("account number", e))?; 198 } else { 199 Self::checksum(&bban[8..16]) 200 .map_err(|e| HuIbanErr::checksum("account number first group", e))?; 201 Self::checksum(&bban[16..]) 202 .map_err(|e| HuIbanErr::checksum("account number second group", e))?; 203 } 204 Ok(()) 205 } 206 207 pub fn from_bban(bban: &str) -> Result<Self, HuIbanErr> { 208 Self::check_bban(bban)?; 209 let full_bban = if bban.len() == 16 { 210 Cow::Owned(format!("{bban}00000000")) 211 } else { 212 Cow::Borrowed(bban) 213 }; 214 let iban = IBAN::from_parts(Country::HU, &full_bban); 215 Ok(Self(iban)) 216 } 217 218 pub fn bban(&self) -> &str { 219 let bban = self.0.bban(); 220 bban.strip_suffix("00000000").unwrap_or(bban) 221 } 222 223 pub fn iban(&self) -> &str { 224 self.0.as_ref() 225 } 226 } 227 228 #[derive(Debug, thiserror::Error)] 229 pub enum HuIbanErr { 230 #[error("contains illegal characters (only 0-9 allowed)")] 231 Invalid, 232 #[error("expected an hungarian IBAN starting with HU got {0}")] 233 Country(Country), 234 #[error("invalid length expected 16 or 24 chars got {0}")] 235 BbanSize(usize), 236 #[error("invalid checksum for {0} expected {1} got {2}")] 237 Checksum(&'static str, u8, u8), 238 #[error(transparent)] 239 Iban(IbanErrorKind), 240 } 241 242 impl From<ParseIbanError> for HuIbanErr { 243 fn from(value: ParseIbanError) -> Self { 244 Self::Iban(value.kind) 245 } 246 } 247 248 impl HuIbanErr { 249 fn checksum(part: &'static str, (expected, checksum): (u8, u8)) -> Self { 250 Self::Checksum(part, expected, checksum) 251 } 252 } 253 254 impl TryFrom<IBAN> for HuIban { 255 type Error = HuIbanErr; 256 257 fn try_from(iban: IBAN) -> Result<Self, Self::Error> { 258 let country = iban.country(); 259 if country != Country::HU { 260 return Err(HuIbanErr::Country(country)); 261 } 262 263 Self::check_bban(iban.bban())?; 264 265 Ok(Self(iban)) 266 } 267 } 268 269 impl PaytoImpl for HuIban { 270 fn as_payto(&self) -> PaytoURI { 271 PaytoURI::from_parts("iban", format_args!("/{}", self.0)) 272 } 273 274 fn parse(raw: &PaytoURI) -> Result<Self, PaytoErr> { 275 let iban_payto = IbanPayto::try_from(raw).map_err(PaytoErr::custom)?; 276 HuIban::try_from(iban_payto.into_inner().iban).map_err(PaytoErr::custom) 277 } 278 } 279 280 impl FromStr for HuIban { 281 type Err = HuIbanErr; 282 283 fn from_str(s: &str) -> Result<Self, Self::Err> { 284 let iban: IBAN = s.parse()?; 285 Self::try_from(iban) 286 } 287 } 288 289 impl std::fmt::Display for HuIban { 290 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 291 self.0.fmt(f) 292 } 293 } 294 295 /// Parse a magnet payto URI, panic if malformed 296 pub fn magnet_payto(url: impl AsRef<str>) -> FullHuPayto { 297 url.as_ref().parse().expect("invalid magnet payto") 298 } 299 300 pub type HuPayto = Payto<HuIban>; 301 pub type FullHuPayto = FullPayto<HuIban>; 302 pub type TransferHuPayto = TransferPayto<HuIban>; 303 304 #[cfg(test)] 305 mod test { 306 use taler_common::types::{ 307 iban::IBAN, 308 payto::{Payto, PaytoImpl, payto}, 309 }; 310 311 use crate::HuIban; 312 313 #[test] 314 fn hu_iban() { 315 for (valid, account) in [ 316 ( 317 payto("payto://iban/HU30162000031000163100000000"), 318 "1620000310001631", 319 ), 320 ( 321 payto("payto://iban/HU02162000031000164800000000"), 322 "1620000310001648", 323 ), 324 ( 325 payto("payto://iban/HU60162000101006446300000000"), 326 "1620001010064463", 327 ), 328 ] { 329 // Parsing 330 let iban_payto: Payto<IBAN> = (&valid).try_into().unwrap(); 331 let hu_payto: HuIban = iban_payto.into_inner().try_into().unwrap(); 332 assert_eq!(hu_payto.bban(), account); 333 // Roundtrip 334 let iban = HuIban::from_bban(&account).unwrap(); 335 let payto = iban.as_payto(); 336 assert_eq!(payto, valid); 337 } 338 } 339 }