omni_orchestrator/schemas/v1/db/
mod.rs1pub mod utils;
2pub mod queries;
3
4use rocket::{http::Status, serde::json::{self, Json}};
5use rocket::serde::json::json;
6use sqlx::{Acquire, MySql};
7use utils::split_sql_statements;
8use crate::{models::platform::Platform, PROJECT_ROOT};
9
10pub async fn init_deployment_schema(version: i64, pool: &sqlx::Pool<MySql>) -> Result<(), sqlx::Error> {
11 println!("Initializing schema version {}", version);
12
13 let base_schema_path = format!("{}/sql/v{}/omni_up.sql", PROJECT_ROOT, version);
15 let base_schema_sql = std::fs::read_to_string(&base_schema_path)
16 .map_err(|e| {
17 println!("Failed to read base schema file '{}': {}", base_schema_path, e);
18 sqlx::Error::Io(e)
19 })?;
20 let mut statements = split_sql_statements(&base_schema_sql);
21
22 for v in 1..=version {
24 let version_file = format!("{}/sql/versions/V{}/omni_up.sql", PROJECT_ROOT, v);
25 if let Ok(sql) = std::fs::read_to_string(version_file.clone()) {
26 println!("Stepping up to version {} using {}", v, version_file);
27 statements.extend(split_sql_statements(&sql));
28 }
29 }
30
31 for statement in statements {
33 if !statement.trim().is_empty() {
34 println!("Executing statement: {}", statement);
35 sqlx::query(&statement).execute(&*pool).await?;
36 }
37 }
38
39 Ok(())
40}
41
42pub async fn init_platform_schema(
43 platform_name: &str,
44 platform_id: i64,
45 version: i64,
46 db_manager: &crate::db_manager::DatabaseManager,
47) -> Result<(), sqlx::Error> {
48 println!("Initializing schema version {}", version);
49
50 let platform_name_string = platform_name.to_string();
52 let pool = match db_manager.get_platform_pool(&platform_name_string, platform_id).await {
53 Ok(pool) => pool,
54 Err(e) => {
55 println!("Failed to connect to platform database: {}", e);
56 return Err(sqlx::Error::Io(std::io::Error::new(
57 std::io::ErrorKind::Other,
58 "Failed to connect to platform database",
59 )));
60 }
61 };
62
63 let base_schema_path = format!("{}/sql/v{}/platform_up.sql", PROJECT_ROOT, version);
65 let base_schema_sql = std::fs::read_to_string(&base_schema_path)
66 .map_err(|e| {
67 println!("Failed to read base schema file '{}': {}", base_schema_path, e);
68 sqlx::Error::Io(e)
69 })?;
70 let mut statements = split_sql_statements(&base_schema_sql);
71
72 for v in 1..=version {
74 let version_file = format!("{}/sql/versions/V{}/platform_up.sql", PROJECT_ROOT, v);
75 if let Ok(sql) = std::fs::read_to_string(version_file.clone()) {
76 println!("Stepping up to version {} using {}", v, version_file);
77 statements.extend(split_sql_statements(&sql));
78 }
79 }
80
81 for statement in statements {
83 if !statement.trim().is_empty() {
84 println!("Executing statement: {}", statement);
85 sqlx::query(&statement).execute(&pool).await?;
86 }
87 }
88
89 Ok(())
90}
91
92pub async fn sample_deployment_data(pool: &sqlx::Pool<MySql>, version: i64) -> Result<(), sqlx::Error> {
93 let mut conn = pool.acquire().await?;
94 let _trans = conn.begin().await?; let sample_data_path = format!("{}/sql/v{}/omni_sample_data.sql", PROJECT_ROOT, version);
96 let sample_data_sql = std::fs::read_to_string(&sample_data_path)
97 .map_err(|e| {
98 println!("Failed to read sample data file '{}': {}", sample_data_path, e);
99 sqlx::Error::Io(e)
100 })?;
101 let statements = split_sql_statements(&sample_data_sql);
102
103 for statement in statements {
105 if !statement.trim().is_empty() {
106 println!("Executing statement: {}", statement);
107 sqlx::query(&statement).execute(pool).await?;
108 }
109 }
110
111 Ok(())
112}
113
114pub async fn sample_platform_data(pool: &sqlx::Pool<MySql>, version: i64) -> Result<(), sqlx::Error> {
115 let mut conn = pool.acquire().await?;
116 let _trans = conn.begin().await?; let sample_data_path = format!("{}/sql/v{}/platform_sample_data.sql", PROJECT_ROOT, version);
118 let sample_data_sql = std::fs::read_to_string(&sample_data_path)
119 .map_err(|e| {
120 println!("Failed to read sample data file '{}': {}", sample_data_path, e);
121 sqlx::Error::Io(e)
122 })?;
123 let statements = split_sql_statements(&sample_data_sql);
124
125 for statement in statements {
127 if !statement.trim().is_empty() {
128 println!("Executing statement: {}", statement);
129 sqlx::query(&statement).execute(pool).await?;
130 }
131 }
132
133 Ok(())
134}