db.rs (4039B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025, 2026 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::{ 18 io::ErrorKind, 19 path::{Path, PathBuf}, 20 sync::Arc, 21 }; 22 23 use sqlx::{ 24 Connection, Executor, PgConnection, PgPool, Row, 25 postgres::{PgConnectOptions, PgPoolOptions, PgRow}, 26 }; 27 use tracing::{debug, info}; 28 29 #[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)] 30 #[allow(non_camel_case_types)] 31 #[sqlx(type_name = "incoming_type")] 32 pub enum IncomingType { 33 reserve, 34 kyc, 35 map, 36 } 37 38 /* ----- Pool ----- */ 39 40 pub async fn pool(cfg: PgConnectOptions, schema: &str) -> sqlx::Result<PgPool> { 41 let init_sql = Arc::new(format!( 42 "SET search_path TO {schema};SET default_transaction_isolation = 'serializable';" 43 )); 44 let pool = PgPoolOptions::new() 45 .after_connect(move |conn, _meta| { 46 let init_sql = init_sql.clone(); 47 Box::pin(async move { 48 conn.execute(init_sql.as_str()).await?; 49 50 Ok(()) 51 }) 52 }) 53 .connect_with(cfg) 54 .await?; 55 56 // TODO check postgresql version ? 57 58 Ok(pool) 59 } 60 61 /* ----- Migration----- */ 62 63 #[derive(Debug, thiserror::Error)] 64 pub enum MigrationErr { 65 #[error("could not read patch at '{}': {}", .0.to_string_lossy(), .1.kind())] 66 Io(PathBuf, std::io::Error), 67 #[error(transparent)] 68 Sql(#[from] sqlx::Error), 69 } 70 71 pub async fn dbinit( 72 conn: &mut PgConnection, 73 sql_dir: &Path, 74 prefix: &str, 75 reset: bool, 76 ) -> Result<(), MigrationErr> { 77 let mut tx = conn.begin().await?; 78 79 let exec_sql_file = 80 async |conn: &mut PgConnection, file: &str, action: &str| -> Result<(), MigrationErr> { 81 let path = sql_dir.join(file); 82 match std::fs::read_to_string(&path) { 83 Ok(content) => { 84 info!(target: "dbinit", "applying {action}"); 85 sqlx::raw_sql(&content).execute(conn).await?; 86 Ok(()) 87 } 88 Err(e) => Err(MigrationErr::Io(path, e)), 89 } 90 }; 91 92 if reset { 93 info!(target: "dbinit", "reset, sqlqir '{}'", sql_dir.to_string_lossy()); 94 exec_sql_file(&mut *tx, &format!("{prefix}-drop.sql"), "drop").await?; 95 } 96 97 info!(target: "dbinit", "initialization, sqlqir '{}'", sql_dir.to_string_lossy()); 98 99 exec_sql_file(&mut *tx, "versioning.sql", "versioning").await?; 100 101 let applied: Vec<String> = sqlx::query("SELECT patch_name FROM _v.patches") 102 .try_map(|r: PgRow| r.try_get(0)) 103 .fetch_all(&mut *tx) 104 .await?; 105 for n in 1..9999 { 106 let patch = format!("{prefix}-{n:0>4}"); 107 if applied.contains(&patch) { 108 debug!(target: "dbinit", "patch {patch} already applied"); 109 continue; 110 } 111 112 if let Err(e) = 113 exec_sql_file(&mut *tx, &format!("{patch}.sql"), &format!("patch {patch}")).await 114 { 115 if let MigrationErr::Io(path, e) = &e 116 && e.kind() == ErrorKind::NotFound 117 { 118 debug!( 119 target: "dbinit", 120 "path '{}' doesn't exist anymore, stopping", 121 path.to_string_lossy() 122 ); 123 break; 124 } 125 return Err(e); 126 } 127 } 128 129 exec_sql_file(&mut *tx, &format!("{prefix}-procedures.sql"), "procedures").await?; 130 131 tx.commit().await?; 132 133 Ok(()) 134 }