omni_orchestrator/schemas/v1/db/
mod.rs1pub mod utils;
2pub mod queries;
3
4use rocket::{http::Status, serde::json::{self, Json}};
5use rocket::serde::json::json;
6use sqlx::{Acquire, MySql};
7use utils::split_sql_statements;
8
9use libomni::types::db::v1 as types;
10use types::platform::Platform;
11use crate::PROJECT_ROOT;
12
13pub async fn init_deployment_schema(version: i64, pool: &sqlx::Pool<MySql>) -> Result<(), sqlx::Error> {
14 println!("Initializing schema version {}", version);
15
16 let base_schema_path = format!("{}/sql/v{}/omni_up.sql", PROJECT_ROOT, version);
18 let base_schema_sql = std::fs::read_to_string(&base_schema_path)
19 .map_err(|e| {
20 println!("Failed to read base schema file '{}': {}", base_schema_path, e);
21 sqlx::Error::Io(e)
22 })?;
23 let mut statements = split_sql_statements(&base_schema_sql);
24
25 for v in 1..=version {
27 let version_file = format!("{}/sql/versions/V{}/omni_up.sql", PROJECT_ROOT, v);
28 if let Ok(sql) = std::fs::read_to_string(version_file.clone()) {
29 println!("Stepping up to version {} using {}", v, version_file);
30 statements.extend(split_sql_statements(&sql));
31 }
32 }
33
34 for statement in statements {
36 if !statement.trim().is_empty() {
37 println!("Executing statement: {}", statement);
38 sqlx::query(&statement).execute(&*pool).await?;
39 }
40 }
41
42 Ok(())
43}
44
45pub async fn init_platform_schema(
46 platform_name: &str,
47 platform_id: i64,
48 version: i64,
49 db_manager: &crate::db_manager::DatabaseManager,
50) -> Result<(), sqlx::Error> {
51 println!("Initializing schema version {}", version);
52
53 let platform_name_string = platform_name.to_string();
55 let pool = match db_manager.get_platform_pool(&platform_name_string, platform_id).await {
56 Ok(pool) => pool,
57 Err(e) => {
58 println!("Failed to connect to platform database: {}", e);
59 return Err(sqlx::Error::Io(std::io::Error::new(
60 std::io::ErrorKind::Other,
61 "Failed to connect to platform database",
62 )));
63 }
64 };
65
66 let base_schema_path = format!("{}/sql/v{}/platform_up.sql", PROJECT_ROOT, version);
68 let base_schema_sql = std::fs::read_to_string(&base_schema_path)
69 .map_err(|e| {
70 println!("Failed to read base schema file '{}': {}", base_schema_path, e);
71 sqlx::Error::Io(e)
72 })?;
73 let mut statements = split_sql_statements(&base_schema_sql);
74
75 for v in 1..=version {
77 let version_file = format!("{}/sql/versions/V{}/platform_up.sql", PROJECT_ROOT, v);
78 if let Ok(sql) = std::fs::read_to_string(version_file.clone()) {
79 println!("Stepping up to version {} using {}", v, version_file);
80 statements.extend(split_sql_statements(&sql));
81 }
82 }
83
84 for statement in statements {
86 if !statement.trim().is_empty() {
87 println!("Executing statement: {}", statement);
88 sqlx::query(&statement).execute(&pool).await?;
89 }
90 }
91
92 Ok(())
93}
94
95pub async fn sample_deployment_data(pool: &sqlx::Pool<MySql>, version: i64) -> Result<(), sqlx::Error> {
96 let mut conn = pool.acquire().await?;
97 let _trans = conn.begin().await?; let sample_data_path = format!("{}/sql/v{}/omni_sample_data.sql", PROJECT_ROOT, version);
99 let sample_data_sql = std::fs::read_to_string(&sample_data_path)
100 .map_err(|e| {
101 println!("Failed to read sample data file '{}': {}", sample_data_path, e);
102 sqlx::Error::Io(e)
103 })?;
104 let statements = split_sql_statements(&sample_data_sql);
105
106 for statement in statements {
108 if !statement.trim().is_empty() {
109 println!("Executing statement: {}", statement);
110 sqlx::query(&statement).execute(pool).await?;
111 }
112 }
113
114 Ok(())
115}
116
117pub async fn sample_platform_data(pool: &sqlx::Pool<MySql>, version: i64) -> Result<(), sqlx::Error> {
118 let mut conn = pool.acquire().await?;
119 let _trans = conn.begin().await?; let sample_data_path = format!("{}/sql/v{}/platform_sample_data.sql", PROJECT_ROOT, version);
121 let sample_data_sql = std::fs::read_to_string(&sample_data_path)
122 .map_err(|e| {
123 println!("Failed to read sample data file '{}': {}", sample_data_path, e);
124 sqlx::Error::Io(e)
125 })?;
126 let statements = split_sql_statements(&sample_data_sql);
127
128 for statement in statements {
130 if !statement.trim().is_empty() {
131 println!("Executing statement: {}", statement);
132 sqlx::query(&statement).execute(pool).await?;
133 }
134 }
135
136 Ok(())
137}