omni_orchestrator/schemas/v1/db/
mod.rs

1pub mod utils;
2pub mod queries;
3
4use rocket::{http::Status, serde::json::{self, Json}};
5use rocket::serde::json::json;
6use sqlx::{Acquire, MySql};
7use utils::split_sql_statements;
8
9use libomni::types::db::v1 as types;
10use types::platform::Platform;
11use crate::PROJECT_ROOT;
12
13pub async fn init_deployment_schema(version: i64, pool: &sqlx::Pool<MySql>) -> Result<(), sqlx::Error> {
14    println!("Initializing schema version {}", version);
15
16    // Load base schema
17    let base_schema_path = format!("{}/sql/v{}/omni_up.sql", PROJECT_ROOT, version);
18    let base_schema_sql = std::fs::read_to_string(&base_schema_path)
19        .map_err(|e| {
20            println!("Failed to read base schema file '{}': {}", base_schema_path, e);
21            sqlx::Error::Io(e)
22        })?;
23    let mut statements = split_sql_statements(&base_schema_sql);
24
25    // Add all versions up to the requested schema version
26    for v in 1..=version {
27        let version_file = format!("{}/sql/versions/V{}/omni_up.sql", PROJECT_ROOT, v);
28        if let Ok(sql) = std::fs::read_to_string(version_file.clone()) {
29            println!("Stepping up to version {} using {}", v, version_file);
30            statements.extend(split_sql_statements(&sql));
31        }
32    }
33
34    // Execute each statement separately
35    for statement in statements {
36        if !statement.trim().is_empty() {
37            println!("Executing statement: {}", statement);
38            sqlx::query(&statement).execute(&*pool).await?;
39        }
40    }
41
42    Ok(())
43}
44
45pub async fn init_platform_schema(
46    platform_name: &str,
47    platform_id: i64,
48    version: i64,
49    db_manager: &crate::db_manager::DatabaseManager,
50) -> Result<(), sqlx::Error> {
51    println!("Initializing schema version {}", version);
52
53    // Get platform-specific database pool
54    let platform_name_string = platform_name.to_string();
55    let pool = match db_manager.get_platform_pool(&platform_name_string, platform_id).await {
56        Ok(pool) => pool,
57        Err(e) => {
58            println!("Failed to connect to platform database: {}", e);
59            return Err(sqlx::Error::Io(std::io::Error::new(
60                std::io::ErrorKind::Other,
61                "Failed to connect to platform database",
62            )));
63        }
64    };
65
66    // Load base schema
67    let base_schema_path = format!("{}/sql/v{}/platform_up.sql", PROJECT_ROOT, version);
68    let base_schema_sql = std::fs::read_to_string(&base_schema_path)
69        .map_err(|e| {
70            println!("Failed to read base schema file '{}': {}", base_schema_path, e);
71            sqlx::Error::Io(e)
72        })?;
73    let mut statements = split_sql_statements(&base_schema_sql);
74
75    // Add all versions up to the requested schema version
76    for v in 1..=version {
77        let version_file = format!("{}/sql/versions/V{}/platform_up.sql", PROJECT_ROOT, v);
78        if let Ok(sql) = std::fs::read_to_string(version_file.clone()) {
79            println!("Stepping up to version {} using {}", v, version_file);
80            statements.extend(split_sql_statements(&sql));
81        }
82    }
83
84    // Execute each statement separately
85    for statement in statements {
86        if !statement.trim().is_empty() {
87            println!("Executing statement: {}", statement);
88            sqlx::query(&statement).execute(&pool).await?;
89        }
90    }
91
92    Ok(())
93}
94
95pub async fn sample_deployment_data(pool: &sqlx::Pool<MySql>, version: i64) -> Result<(), sqlx::Error> {
96    let mut conn = pool.acquire().await?;
97    let _trans = conn.begin().await?; // Changed to _trans since it's not used
98    let sample_data_path = format!("{}/sql/v{}/omni_sample_data.sql", PROJECT_ROOT, version);
99    let sample_data_sql = std::fs::read_to_string(&sample_data_path)
100        .map_err(|e| {
101            println!("Failed to read sample data file '{}': {}", sample_data_path, e);
102            sqlx::Error::Io(e)
103        })?;
104    let statements = split_sql_statements(&sample_data_sql);
105
106    // Execute each statement separately
107    for statement in statements {
108        if !statement.trim().is_empty() {
109            println!("Executing statement: {}", statement);
110            sqlx::query(&statement).execute(pool).await?;
111        }
112    }
113
114    Ok(())
115}
116
117pub async fn sample_platform_data(pool: &sqlx::Pool<MySql>, version: i64) -> Result<(), sqlx::Error> {
118    let mut conn = pool.acquire().await?;
119    let _trans = conn.begin().await?; // Changed to _trans since it's not used
120    let sample_data_path = format!("{}/sql/v{}/platform_sample_data.sql", PROJECT_ROOT, version);
121    let sample_data_sql = std::fs::read_to_string(&sample_data_path)
122        .map_err(|e| {
123            println!("Failed to read sample data file '{}': {}", sample_data_path, e);
124            sqlx::Error::Io(e)
125        })?;
126    let statements = split_sql_statements(&sample_data_sql);
127
128    // Execute each statement separately
129    for statement in statements {
130        if !statement.trim().is_empty() {
131            println!("Executing statement: {}", statement);
132            sqlx::query(&statement).execute(pool).await?;
133        }
134    }
135
136    Ok(())
137}