taler-rust

GNU Taler code in Rust. Largely core banking integrations.
Log | Files | Refs | Submodules | README | LICENSE

db.rs (4039B)


      1 /*
      2   This file is part of TALER
      3   Copyright (C) 2025, 2026 Taler Systems SA
      4 
      5   TALER is free software; you can redistribute it and/or modify it under the
      6   terms of the GNU Affero General Public License as published by the Free Software
      7   Foundation; either version 3, or (at your option) any later version.
      8 
      9   TALER is distributed in the hope that it will be useful, but WITHOUT ANY
     10   WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
     11   A PARTICULAR PURPOSE.  See the GNU Affero General Public License for more details.
     12 
     13   You should have received a copy of the GNU Affero General Public License along with
     14   TALER; see the file COPYING.  If not, see <http://www.gnu.org/licenses/>
     15 */
     16 
     17 use std::{
     18     io::ErrorKind,
     19     path::{Path, PathBuf},
     20     sync::Arc,
     21 };
     22 
     23 use sqlx::{
     24     Connection, Executor, PgConnection, PgPool, Row,
     25     postgres::{PgConnectOptions, PgPoolOptions, PgRow},
     26 };
     27 use tracing::{debug, info};
     28 
/// Rust mapping of the PostgreSQL type named `incoming_type`.
///
/// NOTE(review): the variant names are lowercase presumably so that they
/// match the labels of the SQL enum verbatim — confirm against the schema.
#[derive(Debug, Clone, Copy, PartialEq, Eq, sqlx::Type)]
// The lowercase variant names are intentional, hence the lint is silenced.
#[allow(non_camel_case_types)]
#[sqlx(type_name = "incoming_type")]
pub enum IncomingType {
    reserve,
    kyc,
    map,
}
     37 
     38 /* ----- Pool ----- */
     39 
     40 pub async fn pool(cfg: PgConnectOptions, schema: &str) -> sqlx::Result<PgPool> {
     41     let init_sql = Arc::new(format!(
     42         "SET search_path TO {schema};SET default_transaction_isolation = 'serializable';"
     43     ));
     44     let pool = PgPoolOptions::new()
     45         .after_connect(move |conn, _meta| {
     46             let init_sql = init_sql.clone();
     47             Box::pin(async move {
     48                 conn.execute(init_sql.as_str()).await?;
     49 
     50                 Ok(())
     51             })
     52         })
     53         .connect_with(cfg)
     54         .await?;
     55 
     56     // TODO check postgresql version ?
     57 
     58     Ok(pool)
     59 }
     60 
     61 /* ----- Migration ----- */
     62 
/// Errors returned by [`dbinit`].
#[derive(Debug, thiserror::Error)]
pub enum MigrationErr {
    /// A SQL file could not be read: holds the path of the file and the
    /// underlying I/O error. Only the error kind appears in the message.
    #[error("could not read patch at '{}': {}", .0.to_string_lossy(), .1.kind())]
    Io(PathBuf, std::io::Error),
    /// A database error, displayed transparently; `#[from]` lets `?` convert
    /// a `sqlx::Error` into this variant.
    #[error(transparent)]
    Sql(#[from] sqlx::Error),
}
     70 
     71 pub async fn dbinit(
     72     conn: &mut PgConnection,
     73     sql_dir: &Path,
     74     prefix: &str,
     75     reset: bool,
     76 ) -> Result<(), MigrationErr> {
     77     let mut tx = conn.begin().await?;
     78 
     79     let exec_sql_file =
     80         async |conn: &mut PgConnection, file: &str, action: &str| -> Result<(), MigrationErr> {
     81             let path = sql_dir.join(file);
     82             match std::fs::read_to_string(&path) {
     83                 Ok(content) => {
     84                     info!(target: "dbinit", "applying {action}");
     85                     sqlx::raw_sql(&content).execute(conn).await?;
     86                     Ok(())
     87                 }
     88                 Err(e) => Err(MigrationErr::Io(path, e)),
     89             }
     90         };
     91 
     92     if reset {
     93         info!(target: "dbinit", "reset, sqlqir '{}'", sql_dir.to_string_lossy());
     94         exec_sql_file(&mut *tx, &format!("{prefix}-drop.sql"), "drop").await?;
     95     }
     96 
     97     info!(target: "dbinit", "initialization, sqlqir '{}'", sql_dir.to_string_lossy());
     98 
     99     exec_sql_file(&mut *tx, "versioning.sql", "versioning").await?;
    100 
    101     let applied: Vec<String> = sqlx::query("SELECT patch_name FROM _v.patches")
    102         .try_map(|r: PgRow| r.try_get(0))
    103         .fetch_all(&mut *tx)
    104         .await?;
    105     for n in 1..9999 {
    106         let patch = format!("{prefix}-{n:0>4}");
    107         if applied.contains(&patch) {
    108             debug!(target: "dbinit", "patch {patch} already applied");
    109             continue;
    110         }
    111 
    112         if let Err(e) =
    113             exec_sql_file(&mut *tx, &format!("{patch}.sql"), &format!("patch {patch}")).await
    114         {
    115             if let MigrationErr::Io(path, e) = &e
    116                 && e.kind() == ErrorKind::NotFound
    117             {
    118                 debug!(
    119                     target: "dbinit",
    120                     "path '{}' doesn't exist anymore, stopping",
    121                     path.to_string_lossy()
    122                 );
    123                 break;
    124             }
    125             return Err(e);
    126         }
    127     }
    128 
    129     exec_sql_file(&mut *tx, &format!("{prefix}-procedures.sql"), "procedures").await?;
    130 
    131     tx.commit().await?;
    132 
    133     Ok(())
    134 }