omni_orchestrator/db_manager/
migration.rs1use sqlx::{MySql, Pool};
2use std::env;
3use log::{info, warn, error};
4use colored::Colorize;
5use crate::db_manager;
6use crate::db_manager::error::DatabaseError;
7
8use libomni::types::db::v1 as types;
10use types::platform::{self, Platform};
11
12pub struct MigrationManager;
14
15impl MigrationManager {
16 pub async fn initialize_main_schema(
18 db_manager: &db_manager::DatabaseManager
19 ) -> Result<(), DatabaseError> {
20 info!("Initializing main database schema...");
21
22 let pool = db_manager.get_main_pool();
23
24 Self::initialize_metadata_system(pool).await?;
25
26 let target_version = Self::get_target_schema_version()?;
27 let current_version = Self::get_current_schema_version(pool).await?;
28
29 if current_version == target_version {
30 info!("Schema version check: OK (version {})", current_version);
31 return Ok(());
32 }
33
34 Self::migrate_schema(db_manager, current_version, target_version, None, None).await
35 }
36
37 pub async fn initialize_platform_schema(
39 db_manager: &db_manager::DatabaseManager,
40 platform_name: String,
41 platform_id: i64
42 ) -> Result<(), DatabaseError> {
43 info!("Initializing platform database schema for {}...", platform_name);
44
45 let pool = db_manager.get_platform_pool(&platform_name, platform_id).await?;
46
47 Self::initialize_metadata_system(&pool).await?;
49
50 let target_version = Self::get_target_schema_version()?;
51 let current_version = Self::get_current_schema_version(&pool).await?;
52
53 if current_version == target_version {
54 info!("Schema version check: OK (version {})", current_version);
55 return Ok(());
56 }
57
58 Self::migrate_schema(db_manager, current_version, target_version, Some(platform_name), Some(platform_id)).await
59 }
60
61 fn get_target_schema_version() -> Result<i64, DatabaseError> {
63 let version = env::var("OMNI_ORCH_SCHEMA_VERSION")
64 .unwrap_or_else(|_| "1".to_string())
65 .parse::<i64>()
66 .map_err(|_| DatabaseError::Other("Invalid schema version".into()))?;
67
68 Ok(version)
69 }
70
71 async fn get_current_schema_version(pool: &Pool<MySql>) -> Result<i64, DatabaseError> {
73 let metadata_exists = sqlx::query("SHOW TABLES LIKE 'metadata'")
75 .fetch_optional(pool)
76 .await
77 .map_err(|e| DatabaseError::SqlxError(e))?
78 .is_some();
79
80 if !metadata_exists {
81 return Ok(0); }
83
84 let version = crate::schemas::v1::db::queries::metadata::get_meta_value(pool, "omni_schema_version")
85 .await
86 .unwrap_or_else(|_| "0".to_string())
87 .parse::<i64>()
88 .unwrap_or(0);
89
90 Ok(version)
91 }
92
93 async fn initialize_metadata_system(pool: &Pool<MySql>) -> Result<(), DatabaseError> {
95 info!("Initializing metadata system...");
96
97 crate::schemas::v1::db::queries::metadata::initialize_metadata_system(pool)
98 .await
99 .map_err(|e| DatabaseError::MigrationError(format!(
100 "Failed to initialize metadata system: {}", e
101 )))?;
102
103 info!("✓ Metadata system initialized");
104 Ok(())
105 }
106
107 async fn migrate_schema(
109 db_manager: &super::DatabaseManager,
110 current_version: i64,
111 target_version: i64,
112 platform_name: Option<String>,
113 platform_id: Option<i64>,
114 ) -> Result<(), DatabaseError> {
115 warn!(
116 "{}",
117 format!(
118 "Schema version mismatch! Current: {}, Target: {}",
119 current_version, target_version
120 ).yellow()
121 );
122
123 let should_proceed = if env::var("OMNI_ORCH_BYPASS_CONFIRM").unwrap_or_default() == "confirm" {
125 warn!("{}", "Bypassing schema update confirmation due to env var".yellow());
126 true
127 } else {
128 warn!("Type 'confirm' to update schema version:");
131 true
133 };
134
135 if !should_proceed {
136 return Err(DatabaseError::Other("Schema update cancelled by user".into()));
137 }
138
139 let mut pool: Pool<MySql>;
140
141 match (&platform_name, platform_id) {
143 (Some(platform), Some(platform_id_val)) => {
144 info!("Initializing platform database schema...");
146
147 pool = db_manager.get_platform_pool(platform, platform_id_val).await?;
148
149 crate::schemas::v1::db::init_platform_schema(platform, platform_id_val, target_version, db_manager)
150 .await
151 .map_err(|e| DatabaseError::MigrationError(format!(
152 "Failed to migrate platform schema: {}", e
153 )))?;
154
155 info!("✓ Platform database schema initialized");
156
157 info!("Initializing platform sample data...");
159 crate::schemas::v1::db::sample_platform_data(&pool, target_version)
160 .await
161 .map_err(|e| DatabaseError::MigrationError(format!(
162 "Failed to initialize platform sample data: {}", e
163 )))?;
164
165 info!("✓ Platform sample data initialized");
166 },
167 _ => {
168 info!("Initializing deployment database schema...");
170
171 pool = db_manager.get_main_pool().clone();
172
173 crate::schemas::v1::db::init_deployment_schema(target_version, &pool)
174 .await
175 .map_err(|e| DatabaseError::MigrationError(format!(
176 "Failed to migrate deployment schema: {}", e
177 )))?;
178
179 info!("✓ Deployment database schema initialized");
180
181 info!("Initializing deployment sample data...");
183 crate::schemas::v1::db::sample_deployment_data(&pool, target_version)
184 .await
185 .map_err(|e| DatabaseError::MigrationError(format!(
186 "Failed to initialize deployment sample data: {}", e
187 )))?;
188
189 info!("✓ Deployment sample data initialized");
190 }
191 }
192
193 crate::schemas::v1::db::queries::metadata::set_meta_value(
195 &pool,
196 "omni_schema_version",
197 &target_version.to_string(),
198 )
199 .await
200 .map_err(|e| DatabaseError::MigrationError(format!(
201 "Failed to update schema version: {}", e
202 )))?;
203
204 info!("Schema migrated from version {} to {}", current_version, target_version);
205
206 Ok(())
207 }
208}