omni_orchestrator/schemas/v1/db/
mod.rs

1pub mod utils;
2pub mod queries;
3
4use rocket::{http::Status, serde::json::{self, Json}};
5use rocket::serde::json::json;
6use sqlx::{Acquire, MySql};
7use utils::split_sql_statements;
8use crate::{models::platform::Platform, PROJECT_ROOT};
9
10pub async fn init_deployment_schema(version: i64, pool: &sqlx::Pool<MySql>) -> Result<(), sqlx::Error> {
11    println!("Initializing schema version {}", version);
12
13    // Load base schema
14    let base_schema_path = format!("{}/sql/v{}/omni_up.sql", PROJECT_ROOT, version);
15    let base_schema_sql = std::fs::read_to_string(&base_schema_path)
16        .map_err(|e| {
17            println!("Failed to read base schema file '{}': {}", base_schema_path, e);
18            sqlx::Error::Io(e)
19        })?;
20    let mut statements = split_sql_statements(&base_schema_sql);
21
22    // Add all versions up to the requested schema version
23    for v in 1..=version {
24        let version_file = format!("{}/sql/versions/V{}/omni_up.sql", PROJECT_ROOT, v);
25        if let Ok(sql) = std::fs::read_to_string(version_file.clone()) {
26            println!("Stepping up to version {} using {}", v, version_file);
27            statements.extend(split_sql_statements(&sql));
28        }
29    }
30
31    // Execute each statement separately
32    for statement in statements {
33        if !statement.trim().is_empty() {
34            println!("Executing statement: {}", statement);
35            sqlx::query(&statement).execute(&*pool).await?;
36        }
37    }
38
39    Ok(())
40}
41
42pub async fn init_platform_schema(
43    platform_name: &str,
44    platform_id: i64,
45    version: i64,
46    db_manager: &crate::db_manager::DatabaseManager,
47) -> Result<(), sqlx::Error> {
48    println!("Initializing schema version {}", version);
49
50    // Get platform-specific database pool
51    let platform_name_string = platform_name.to_string();
52    let pool = match db_manager.get_platform_pool(&platform_name_string, platform_id).await {
53        Ok(pool) => pool,
54        Err(e) => {
55            println!("Failed to connect to platform database: {}", e);
56            return Err(sqlx::Error::Io(std::io::Error::new(
57                std::io::ErrorKind::Other,
58                "Failed to connect to platform database",
59            )));
60        }
61    };
62
63    // Load base schema
64    let base_schema_path = format!("{}/sql/v{}/platform_up.sql", PROJECT_ROOT, version);
65    let base_schema_sql = std::fs::read_to_string(&base_schema_path)
66        .map_err(|e| {
67            println!("Failed to read base schema file '{}': {}", base_schema_path, e);
68            sqlx::Error::Io(e)
69        })?;
70    let mut statements = split_sql_statements(&base_schema_sql);
71
72    // Add all versions up to the requested schema version
73    for v in 1..=version {
74        let version_file = format!("{}/sql/versions/V{}/platform_up.sql", PROJECT_ROOT, v);
75        if let Ok(sql) = std::fs::read_to_string(version_file.clone()) {
76            println!("Stepping up to version {} using {}", v, version_file);
77            statements.extend(split_sql_statements(&sql));
78        }
79    }
80
81    // Execute each statement separately
82    for statement in statements {
83        if !statement.trim().is_empty() {
84            println!("Executing statement: {}", statement);
85            sqlx::query(&statement).execute(&pool).await?;
86        }
87    }
88
89    Ok(())
90}
91
92pub async fn sample_deployment_data(pool: &sqlx::Pool<MySql>, version: i64) -> Result<(), sqlx::Error> {
93    let mut conn = pool.acquire().await?;
94    let _trans = conn.begin().await?; // Changed to _trans since it's not used
95    let sample_data_path = format!("{}/sql/v{}/omni_sample_data.sql", PROJECT_ROOT, version);
96    let sample_data_sql = std::fs::read_to_string(&sample_data_path)
97        .map_err(|e| {
98            println!("Failed to read sample data file '{}': {}", sample_data_path, e);
99            sqlx::Error::Io(e)
100        })?;
101    let statements = split_sql_statements(&sample_data_sql);
102
103    // Execute each statement separately
104    for statement in statements {
105        if !statement.trim().is_empty() {
106            println!("Executing statement: {}", statement);
107            sqlx::query(&statement).execute(pool).await?;
108        }
109    }
110
111    Ok(())
112}
113
114pub async fn sample_platform_data(pool: &sqlx::Pool<MySql>, version: i64) -> Result<(), sqlx::Error> {
115    let mut conn = pool.acquire().await?;
116    let _trans = conn.begin().await?; // Changed to _trans since it's not used
117    let sample_data_path = format!("{}/sql/v{}/platform_sample_data.sql", PROJECT_ROOT, version);
118    let sample_data_sql = std::fs::read_to_string(&sample_data_path)
119        .map_err(|e| {
120            println!("Failed to read sample data file '{}': {}", sample_data_path, e);
121            sqlx::Error::Io(e)
122        })?;
123    let statements = split_sql_statements(&sample_data_sql);
124
125    // Execute each statement separately
126    for statement in statements {
127        if !statement.trim().is_empty() {
128            println!("Executing statement: {}", statement);
129            sqlx::query(&statement).execute(pool).await?;
130        }
131    }
132
133    Ok(())
134}