omni_orchestrator/db_manager/
migration.rs

1use sqlx::{MySql, Pool};
2use std::env;
3use log::{info, warn, error};
4use colored::Colorize;
5use crate::db_manager;
6use crate::db_manager::error::DatabaseError;
7use crate::schemas::v1::models::platform::{self, Platform};
8
/// Manages database schema migrations.
///
/// This is a stateless unit type: all functionality is exposed through
/// associated functions (e.g. `MigrationManager::initialize_main_schema`),
/// so no instance ever needs to be constructed. The common traits are derived
/// so the type can still be logged, copied, or default-constructed if it ends
/// up embedded in another structure.
#[derive(Debug, Clone, Copy, Default)]
pub struct MigrationManager;
11
12impl MigrationManager {
13    /// Initializes and migrates the main database schema
14    pub async fn initialize_main_schema(
15        db_manager: &db_manager::DatabaseManager
16    ) -> Result<(), DatabaseError> {
17        info!("Initializing main database schema...");
18        
19        let pool = db_manager.get_main_pool();
20
21        Self::initialize_metadata_system(pool).await?;
22
23        let target_version = Self::get_target_schema_version()?;
24        let current_version = Self::get_current_schema_version(pool).await?;
25        
26        if current_version == target_version {
27            info!("Schema version check: OK (version {})", current_version);
28            return Ok(());
29        }
30        
31        Self::migrate_schema(db_manager, current_version, target_version, None, None).await
32    }
33    
34    /// Initializes and migrates a platform database schema
35    pub async fn initialize_platform_schema(
36        db_manager: &db_manager::DatabaseManager,
37        platform_name: String,
38        platform_id: i64
39    ) -> Result<(), DatabaseError> {
40        info!("Initializing platform database schema for {}...", platform_name);
41        
42        let pool = db_manager.get_platform_pool(&platform_name, platform_id).await?;
43
44        // Initialize metadata system if needed
45        Self::initialize_metadata_system(&pool).await?;
46        
47        let target_version = Self::get_target_schema_version()?;
48        let current_version = Self::get_current_schema_version(&pool).await?;
49        
50        if current_version == target_version {
51            info!("Schema version check: OK (version {})", current_version);
52            return Ok(());
53        }
54        
55        Self::migrate_schema(db_manager, current_version, target_version, Some(platform_name), Some(platform_id)).await
56    }
57    
58    /// Gets the target schema version from environment or defaults to 1
59    fn get_target_schema_version() -> Result<i64, DatabaseError> {
60        let version = env::var("OMNI_ORCH_SCHEMA_VERSION")
61            .unwrap_or_else(|_| "1".to_string())
62            .parse::<i64>()
63            .map_err(|_| DatabaseError::Other("Invalid schema version".into()))?;
64            
65        Ok(version)
66    }
67    
68    /// Gets the current schema version from the database
69    async fn get_current_schema_version(pool: &Pool<MySql>) -> Result<i64, DatabaseError> {
70        // Check if metadata table exists
71        let metadata_exists = sqlx::query("SHOW TABLES LIKE 'metadata'")
72            .fetch_optional(pool)
73            .await
74            .map_err(|e| DatabaseError::SqlxError(e))?
75            .is_some();
76            
77        if !metadata_exists {
78            return Ok(0); // No schema version yet
79        }
80        
81        let version = crate::schemas::v1::db::queries::metadata::get_meta_value(pool, "omni_schema_version")
82            .await
83            .unwrap_or_else(|_| "0".to_string())
84            .parse::<i64>()
85            .unwrap_or(0);
86            
87        Ok(version)
88    }
89    
90    /// Initializes the metadata system if it doesn't exist
91    async fn initialize_metadata_system(pool: &Pool<MySql>) -> Result<(), DatabaseError> {
92        info!("Initializing metadata system...");
93        
94        crate::schemas::v1::db::queries::metadata::initialize_metadata_system(pool)
95            .await
96            .map_err(|e| DatabaseError::MigrationError(format!(
97                "Failed to initialize metadata system: {}", e
98            )))?;
99            
100        info!("✓ Metadata system initialized");
101        Ok(())
102    }
103    
104    /// Migrates a schema from one version to another
105    async fn migrate_schema(
106        db_manager: &super::DatabaseManager,
107        current_version: i64, 
108        target_version: i64,
109        platform_name: Option<String>,
110        platform_id: Option<i64>,
111    ) -> Result<(), DatabaseError> {
112        warn!(
113            "{}",
114            format!(
115                "Schema version mismatch! Current: {}, Target: {}",
116                current_version, target_version
117            ).yellow()
118        );
119        
120        // Check for migration confirmation
121        let should_proceed = if env::var("OMNI_ORCH_BYPASS_CONFIRM").unwrap_or_default() == "confirm" {
122            warn!("{}", "Bypassing schema update confirmation due to env var".yellow());
123            true
124        } else {
125            // In an actual implementation, you would prompt the user here
126            // For simplicity, we'll just log a message and proceed
127            warn!("Type 'confirm' to update schema version:");
128            // Assume confirmed for this example
129            true
130        };
131        
132        if !should_proceed {
133            return Err(DatabaseError::Other("Schema update cancelled by user".into()));
134        }
135        
136        let mut pool: Pool<MySql>;
137
138        // Perform the migration
139        match (&platform_name, platform_id) {
140            (Some(platform), Some(platform_id_val)) => {
141                // Platform-specific schema
142                info!("Initializing platform database schema...");
143
144                pool = db_manager.get_platform_pool(platform, platform_id_val).await?;
145
146                crate::schemas::v1::db::init_platform_schema(platform, platform_id_val, target_version, db_manager)
147                    .await
148                    .map_err(|e| DatabaseError::MigrationError(format!(
149                        "Failed to migrate platform schema: {}", e
150                    )))?;
151                
152                info!("✓ Platform database schema initialized");
153                    
154                // Also initialize sample data
155                info!("Initializing platform sample data...");
156                crate::schemas::v1::db::sample_platform_data(&pool, target_version)
157                    .await
158                    .map_err(|e| DatabaseError::MigrationError(format!(
159                        "Failed to initialize platform sample data: {}", e
160                    )))?;
161                
162                info!("✓ Platform sample data initialized");
163            },
164            _ => {
165                // Main schema
166                info!("Initializing deployment database schema...");
167
168                pool = db_manager.get_main_pool().clone();
169
170                crate::schemas::v1::db::init_deployment_schema(target_version, &pool)
171                    .await
172                    .map_err(|e| DatabaseError::MigrationError(format!(
173                        "Failed to migrate deployment schema: {}", e
174                    )))?;
175                
176                info!("✓ Deployment database schema initialized");
177                    
178                // Also initialize sample data
179                info!("Initializing deployment sample data...");
180                crate::schemas::v1::db::sample_deployment_data(&pool, target_version)
181                    .await
182                    .map_err(|e| DatabaseError::MigrationError(format!(
183                        "Failed to initialize deployment sample data: {}", e
184                    )))?;
185                
186                info!("✓ Deployment sample data initialized");
187            }
188        }
189        
190        // Update schema version
191        crate::schemas::v1::db::queries::metadata::set_meta_value(
192            &pool,
193            "omni_schema_version",
194            &target_version.to_string(),
195        )
196        .await
197        .map_err(|e| DatabaseError::MigrationError(format!(
198            "Failed to update schema version: {}", e
199        )))?;
200        
201        info!("Schema migrated from version {} to {}", current_version, target_version);
202        
203        Ok(())
204    }
205}