taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

db.rs (3872B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::{
     18     io::ErrorKind,
     19     path::{Path, PathBuf},
     20     sync::Arc,
     21 };
     22 
     23 use sqlx::{
     24     Connection, Executor, PgPool, Row,
     25     postgres::{PgConnectOptions, PgPoolOptions},
     26 };
     27 use sqlx::{PgConnection, postgres::PgRow};
     28 use tracing::{debug, info};
     29 
     30 /* ----- Pool ----- */
     31 
     32 pub async fn pool(cfg: PgConnectOptions, schema: &str) -> sqlx::Result<PgPool> {
     33     let init_sql = Arc::new(format!(
     34         "SET search_path TO {schema};SET default_transaction_isolation = 'serializable';"
     35     ));
     36     let pool = PgPoolOptions::new()
     37         .after_connect(move |conn, _meta| {
     38             let init_sql = init_sql.clone();
     39             Box::pin(async move {
     40                 conn.execute(init_sql.as_str()).await?;
     41 
     42                 Ok(())
     43             })
     44         })
     45         .connect_with(cfg)
     46         .await?;
     47 
     48     // TODO check postgresql version ?
     49 
     50     Ok(pool)
     51 }
     52 
     53 /* ----- Migration ----- */
     54 
     55 #[derive(Debug, thiserror::Error)]
     56 pub enum MigrationErr {
     57     #[error("could not read patch at '{}': {}", .0.to_string_lossy(), .1.kind())]
     58     Io(PathBuf, std::io::Error),
     59     #[error(transparent)]
     60     Sql(#[from] sqlx::Error),
     61 }
     62 
     63 pub async fn dbinit(
     64     conn: &mut PgConnection,
     65     sql_dir: &Path,
     66     prefix: &str,
     67     reset: bool,
     68 ) -> Result<(), MigrationErr> {
     69     let mut tx = conn.begin().await?;
     70 
     71     let exec_sql_file =
     72         async |conn: &mut PgConnection, file: &str, action: &str| -> Result<(), MigrationErr> {
     73             let path = sql_dir.join(file);
     74             match std::fs::read_to_string(&path) {
     75                 Ok(content) => {
     76                     info!(target: "dbinit", "applying {action}");
     77                     sqlx::raw_sql(&content).execute(conn).await?;
     78                     Ok(())
     79                 }
     80                 Err(e) => Err(MigrationErr::Io(path, e)),
     81             }
     82         };
     83 
     84     if reset {
     85         info!(target: "dbinit", "reset, sqlqir '{}'", sql_dir.to_string_lossy());
     86         exec_sql_file(&mut *tx, &format!("{prefix}-drop.sql"), "drop").await?;
     87     }
     88 
     89     info!(target: "dbinit", "initialization, sqlqir '{}'", sql_dir.to_string_lossy());
     90 
     91     exec_sql_file(&mut *tx, "versioning.sql", "versioning").await?;
     92 
     93     let applied: Vec<String> = sqlx::query("SELECT patch_name FROM _v.patches")
     94         .try_map(|r: PgRow| r.try_get(0))
     95         .fetch_all(&mut *tx)
     96         .await?;
     97     for n in 1..9999 {
     98         let patch = format!("{prefix}-{n:0>4}");
     99         if applied.contains(&patch) {
    100             debug!(target: "dbinit", "patch {patch} already applied");
    101             continue;
    102         }
    103 
    104         if let Err(e) =
    105             exec_sql_file(&mut *tx, &format!("{patch}.sql"), &format!("patch {patch}")).await
    106         {
    107             if let MigrationErr::Io(path, e) = &e
    108                 && e.kind() == ErrorKind::NotFound
    109             {
    110                 debug!(
    111                     target: "dbinit",
    112                     "path '{}' doesn't exist anymore, stopping",
    113                     path.to_string_lossy()
    114                 );
    115                 break;
    116             }
    117             return Err(e);
    118         }
    119     }
    120 
    121     exec_sql_file(&mut *tx, &format!("{prefix}-procedures.sql"), "procedures").await?;
    122 
    123     tx.commit().await?;
    124 
    125     Ok(())
    126 }