db.rs (3872B)
1 /* 2 This file is part of TALER 3 Copyright (C) 2025 Taler Systems SA 4 5 TALER is free software; you can redistribute it and/or modify it under the 6 terms of the GNU Affero General Public License as published by the Free Software 7 Foundation; either version 3, or (at your option) any later version. 8 9 TALER is distributed in the hope that it will be useful, but WITHOUT ANY 10 WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR 11 A PARTICULAR PURPOSE. See the GNU Affero General Public License for more details. 12 13 You should have received a copy of the GNU Affero General Public License along with 14 TALER; see the file COPYING. If not, see <http://www.gnu.org/licenses/> 15 */ 16 17 use std::{ 18 io::ErrorKind, 19 path::{Path, PathBuf}, 20 sync::Arc, 21 }; 22 23 use sqlx::{ 24 Connection, Executor, PgPool, Row, 25 postgres::{PgConnectOptions, PgPoolOptions}, 26 }; 27 use sqlx::{PgConnection, postgres::PgRow}; 28 use tracing::{debug, info}; 29 30 /* ----- Pool ----- */ 31 32 pub async fn pool(cfg: PgConnectOptions, schema: &str) -> sqlx::Result<PgPool> { 33 let init_sql = Arc::new(format!( 34 "SET search_path TO {schema};SET default_transaction_isolation = 'serializable';" 35 )); 36 let pool = PgPoolOptions::new() 37 .after_connect(move |conn, _meta| { 38 let init_sql = init_sql.clone(); 39 Box::pin(async move { 40 conn.execute(init_sql.as_str()).await?; 41 42 Ok(()) 43 }) 44 }) 45 .connect_with(cfg) 46 .await?; 47 48 // TODO check postgresql version ? 49 50 Ok(pool) 51 } 52 53 /* ----- Migration----- */ 54 55 #[derive(Debug, thiserror::Error)] 56 pub enum MigrationErr { 57 #[error("could not read patch at '{}': {}", .0.to_string_lossy(), .1.kind())] 58 Io(PathBuf, std::io::Error), 59 #[error(transparent)] 60 Sql(#[from] sqlx::Error), 61 } 62 63 pub async fn dbinit( 64 conn: &mut PgConnection, 65 sql_dir: &Path, 66 prefix: &str, 67 reset: bool, 68 ) -> Result<(), MigrationErr> { 69 let mut tx = conn.begin().await?; 70 71 let exec_sql_file = 72 async |conn: &mut PgConnection, file: &str, action: &str| -> Result<(), MigrationErr> { 73 let path = sql_dir.join(file); 74 match std::fs::read_to_string(&path) { 75 Ok(content) => { 76 info!(target: "dbinit", "applying {action}"); 77 sqlx::raw_sql(&content).execute(conn).await?; 78 Ok(()) 79 } 80 Err(e) => Err(MigrationErr::Io(path, e)), 81 } 82 }; 83 84 if reset { 85 info!(target: "dbinit", "reset, sqlqir '{}'", sql_dir.to_string_lossy()); 86 exec_sql_file(&mut *tx, &format!("{prefix}-drop.sql"), "drop").await?; 87 } 88 89 info!(target: "dbinit", "initialization, sqlqir '{}'", sql_dir.to_string_lossy()); 90 91 exec_sql_file(&mut *tx, "versioning.sql", "versioning").await?; 92 93 let applied: Vec<String> = sqlx::query("SELECT patch_name FROM _v.patches") 94 .try_map(|r: PgRow| r.try_get(0)) 95 .fetch_all(&mut *tx) 96 .await?; 97 for n in 1..9999 { 98 let patch = format!("{prefix}-{n:0>4}"); 99 if applied.contains(&patch) { 100 debug!(target: "dbinit", "patch {patch} already applied"); 101 continue; 102 } 103 104 if let Err(e) = 105 exec_sql_file(&mut *tx, &format!("{patch}.sql"), &format!("patch {patch}")).await 106 { 107 if let MigrationErr::Io(path, e) = &e 108 && e.kind() == ErrorKind::NotFound 109 { 110 debug!( 111 target: "dbinit", 112 "path '{}' doesn't exist anymore, stopping", 113 path.to_string_lossy() 114 ); 115 break; 116 } 117 return Err(e); 118 } 119 } 120 121 exec_sql_file(&mut *tx, &format!("{prefix}-procedures.sql"), "procedures").await?; 122 123 tx.commit().await?; 124 125 Ok(()) 126 }