// omni_orchestrator/db_manager/migration.rs
use sqlx::{MySql, Pool};
2use std::env;
3use log::{info, warn, error};
4use colored::Colorize;
5use crate::db_manager;
6use crate::db_manager::error::DatabaseError;
7use crate::schemas::v1::models::platform::{self, Platform};
8
/// Stateless entry point for database schema initialisation and migration.
///
/// The type carries no data; its functionality is exposed through associated
/// functions that take the database manager or a connection pool as arguments.
#[derive(Debug)]
pub struct MigrationManager;
11
12impl MigrationManager {
13 pub async fn initialize_main_schema(
15 db_manager: &db_manager::DatabaseManager
16 ) -> Result<(), DatabaseError> {
17 info!("Initializing main database schema...");
18
19 let pool = db_manager.get_main_pool();
20
21 Self::initialize_metadata_system(pool).await?;
22
23 let target_version = Self::get_target_schema_version()?;
24 let current_version = Self::get_current_schema_version(pool).await?;
25
26 if current_version == target_version {
27 info!("Schema version check: OK (version {})", current_version);
28 return Ok(());
29 }
30
31 Self::migrate_schema(db_manager, current_version, target_version, None, None).await
32 }
33
34 pub async fn initialize_platform_schema(
36 db_manager: &db_manager::DatabaseManager,
37 platform_name: String,
38 platform_id: i64
39 ) -> Result<(), DatabaseError> {
40 info!("Initializing platform database schema for {}...", platform_name);
41
42 let pool = db_manager.get_platform_pool(&platform_name, platform_id).await?;
43
44 Self::initialize_metadata_system(&pool).await?;
46
47 let target_version = Self::get_target_schema_version()?;
48 let current_version = Self::get_current_schema_version(&pool).await?;
49
50 if current_version == target_version {
51 info!("Schema version check: OK (version {})", current_version);
52 return Ok(());
53 }
54
55 Self::migrate_schema(db_manager, current_version, target_version, Some(platform_name), Some(platform_id)).await
56 }
57
58 fn get_target_schema_version() -> Result<i64, DatabaseError> {
60 let version = env::var("OMNI_ORCH_SCHEMA_VERSION")
61 .unwrap_or_else(|_| "1".to_string())
62 .parse::<i64>()
63 .map_err(|_| DatabaseError::Other("Invalid schema version".into()))?;
64
65 Ok(version)
66 }
67
68 async fn get_current_schema_version(pool: &Pool<MySql>) -> Result<i64, DatabaseError> {
70 let metadata_exists = sqlx::query("SHOW TABLES LIKE 'metadata'")
72 .fetch_optional(pool)
73 .await
74 .map_err(|e| DatabaseError::SqlxError(e))?
75 .is_some();
76
77 if !metadata_exists {
78 return Ok(0); }
80
81 let version = crate::schemas::v1::db::queries::metadata::get_meta_value(pool, "omni_schema_version")
82 .await
83 .unwrap_or_else(|_| "0".to_string())
84 .parse::<i64>()
85 .unwrap_or(0);
86
87 Ok(version)
88 }
89
90 async fn initialize_metadata_system(pool: &Pool<MySql>) -> Result<(), DatabaseError> {
92 info!("Initializing metadata system...");
93
94 crate::schemas::v1::db::queries::metadata::initialize_metadata_system(pool)
95 .await
96 .map_err(|e| DatabaseError::MigrationError(format!(
97 "Failed to initialize metadata system: {}", e
98 )))?;
99
100 info!("✓ Metadata system initialized");
101 Ok(())
102 }
103
104 async fn migrate_schema(
106 db_manager: &super::DatabaseManager,
107 current_version: i64,
108 target_version: i64,
109 platform_name: Option<String>,
110 platform_id: Option<i64>,
111 ) -> Result<(), DatabaseError> {
112 warn!(
113 "{}",
114 format!(
115 "Schema version mismatch! Current: {}, Target: {}",
116 current_version, target_version
117 ).yellow()
118 );
119
120 let should_proceed = if env::var("OMNI_ORCH_BYPASS_CONFIRM").unwrap_or_default() == "confirm" {
122 warn!("{}", "Bypassing schema update confirmation due to env var".yellow());
123 true
124 } else {
125 warn!("Type 'confirm' to update schema version:");
128 true
130 };
131
132 if !should_proceed {
133 return Err(DatabaseError::Other("Schema update cancelled by user".into()));
134 }
135
136 let mut pool: Pool<MySql>;
137
138 match (&platform_name, platform_id) {
140 (Some(platform), Some(platform_id_val)) => {
141 info!("Initializing platform database schema...");
143
144 pool = db_manager.get_platform_pool(platform, platform_id_val).await?;
145
146 crate::schemas::v1::db::init_platform_schema(platform, platform_id_val, target_version, db_manager)
147 .await
148 .map_err(|e| DatabaseError::MigrationError(format!(
149 "Failed to migrate platform schema: {}", e
150 )))?;
151
152 info!("✓ Platform database schema initialized");
153
154 info!("Initializing platform sample data...");
156 crate::schemas::v1::db::sample_platform_data(&pool, target_version)
157 .await
158 .map_err(|e| DatabaseError::MigrationError(format!(
159 "Failed to initialize platform sample data: {}", e
160 )))?;
161
162 info!("✓ Platform sample data initialized");
163 },
164 _ => {
165 info!("Initializing deployment database schema...");
167
168 pool = db_manager.get_main_pool().clone();
169
170 crate::schemas::v1::db::init_deployment_schema(target_version, &pool)
171 .await
172 .map_err(|e| DatabaseError::MigrationError(format!(
173 "Failed to migrate deployment schema: {}", e
174 )))?;
175
176 info!("✓ Deployment database schema initialized");
177
178 info!("Initializing deployment sample data...");
180 crate::schemas::v1::db::sample_deployment_data(&pool, target_version)
181 .await
182 .map_err(|e| DatabaseError::MigrationError(format!(
183 "Failed to initialize deployment sample data: {}", e
184 )))?;
185
186 info!("✓ Deployment sample data initialized");
187 }
188 }
189
190 crate::schemas::v1::db::queries::metadata::set_meta_value(
192 &pool,
193 "omni_schema_version",
194 &target_version.to_string(),
195 )
196 .await
197 .map_err(|e| DatabaseError::MigrationError(format!(
198 "Failed to update schema version: {}", e
199 )))?;
200
201 info!("Schema migrated from version {} to {}", current_version, target_version);
202
203 Ok(())
204 }
205}