omni_orchestrator/db_manager/
migration.rs

1use sqlx::{MySql, Pool};
2use std::env;
3use log::{info, warn, error};
4use colored::Colorize;
5use crate::db_manager;
6use crate::db_manager::error::DatabaseError;
7
8// Import the types we need
9use libomni::types::db::v1 as types;
10use types::platform::{self, Platform};
11
12/// Manages database schema migrations
13pub struct MigrationManager;
14
15impl MigrationManager {
16    /// Initializes and migrates the main database schema
17    pub async fn initialize_main_schema(
18        db_manager: &db_manager::DatabaseManager
19    ) -> Result<(), DatabaseError> {
20        info!("Initializing main database schema...");
21        
22        let pool = db_manager.get_main_pool();
23
24        Self::initialize_metadata_system(pool).await?;
25
26        let target_version = Self::get_target_schema_version()?;
27        let current_version = Self::get_current_schema_version(pool).await?;
28        
29        if current_version == target_version {
30            info!("Schema version check: OK (version {})", current_version);
31            return Ok(());
32        }
33        
34        Self::migrate_schema(db_manager, current_version, target_version, None, None).await
35    }
36    
37    /// Initializes and migrates a platform database schema
38    pub async fn initialize_platform_schema(
39        db_manager: &db_manager::DatabaseManager,
40        platform_name: String,
41        platform_id: i64
42    ) -> Result<(), DatabaseError> {
43        info!("Initializing platform database schema for {}...", platform_name);
44        
45        let pool = db_manager.get_platform_pool(&platform_name, platform_id).await?;
46
47        // Initialize metadata system if needed
48        Self::initialize_metadata_system(&pool).await?;
49        
50        let target_version = Self::get_target_schema_version()?;
51        let current_version = Self::get_current_schema_version(&pool).await?;
52        
53        if current_version == target_version {
54            info!("Schema version check: OK (version {})", current_version);
55            return Ok(());
56        }
57        
58        Self::migrate_schema(db_manager, current_version, target_version, Some(platform_name), Some(platform_id)).await
59    }
60    
61    /// Gets the target schema version from environment or defaults to 1
62    fn get_target_schema_version() -> Result<i64, DatabaseError> {
63        let version = env::var("OMNI_ORCH_SCHEMA_VERSION")
64            .unwrap_or_else(|_| "1".to_string())
65            .parse::<i64>()
66            .map_err(|_| DatabaseError::Other("Invalid schema version".into()))?;
67            
68        Ok(version)
69    }
70    
71    /// Gets the current schema version from the database
72    async fn get_current_schema_version(pool: &Pool<MySql>) -> Result<i64, DatabaseError> {
73        // Check if metadata table exists
74        let metadata_exists = sqlx::query("SHOW TABLES LIKE 'metadata'")
75            .fetch_optional(pool)
76            .await
77            .map_err(|e| DatabaseError::SqlxError(e))?
78            .is_some();
79            
80        if !metadata_exists {
81            return Ok(0); // No schema version yet
82        }
83        
84        let version = crate::schemas::v1::db::queries::metadata::get_meta_value(pool, "omni_schema_version")
85            .await
86            .unwrap_or_else(|_| "0".to_string())
87            .parse::<i64>()
88            .unwrap_or(0);
89            
90        Ok(version)
91    }
92    
93    /// Initializes the metadata system if it doesn't exist
94    async fn initialize_metadata_system(pool: &Pool<MySql>) -> Result<(), DatabaseError> {
95        info!("Initializing metadata system...");
96        
97        crate::schemas::v1::db::queries::metadata::initialize_metadata_system(pool)
98            .await
99            .map_err(|e| DatabaseError::MigrationError(format!(
100                "Failed to initialize metadata system: {}", e
101            )))?;
102            
103        info!("✓ Metadata system initialized");
104        Ok(())
105    }
106    
107    /// Migrates a schema from one version to another
108    async fn migrate_schema(
109        db_manager: &super::DatabaseManager,
110        current_version: i64, 
111        target_version: i64,
112        platform_name: Option<String>,
113        platform_id: Option<i64>,
114    ) -> Result<(), DatabaseError> {
115        warn!(
116            "{}",
117            format!(
118                "Schema version mismatch! Current: {}, Target: {}",
119                current_version, target_version
120            ).yellow()
121        );
122        
123        // Check for migration confirmation
124        let should_proceed = if env::var("OMNI_ORCH_BYPASS_CONFIRM").unwrap_or_default() == "confirm" {
125            warn!("{}", "Bypassing schema update confirmation due to env var".yellow());
126            true
127        } else {
128            // In an actual implementation, you would prompt the user here
129            // For simplicity, we'll just log a message and proceed
130            warn!("Type 'confirm' to update schema version:");
131            // Assume confirmed for this example
132            true
133        };
134        
135        if !should_proceed {
136            return Err(DatabaseError::Other("Schema update cancelled by user".into()));
137        }
138        
139        let mut pool: Pool<MySql>;
140
141        // Perform the migration
142        match (&platform_name, platform_id) {
143            (Some(platform), Some(platform_id_val)) => {
144                // Platform-specific schema
145                info!("Initializing platform database schema...");
146
147                pool = db_manager.get_platform_pool(platform, platform_id_val).await?;
148
149                crate::schemas::v1::db::init_platform_schema(platform, platform_id_val, target_version, db_manager)
150                    .await
151                    .map_err(|e| DatabaseError::MigrationError(format!(
152                        "Failed to migrate platform schema: {}", e
153                    )))?;
154                
155                info!("✓ Platform database schema initialized");
156                    
157                // Also initialize sample data
158                info!("Initializing platform sample data...");
159                crate::schemas::v1::db::sample_platform_data(&pool, target_version)
160                    .await
161                    .map_err(|e| DatabaseError::MigrationError(format!(
162                        "Failed to initialize platform sample data: {}", e
163                    )))?;
164                
165                info!("✓ Platform sample data initialized");
166            },
167            _ => {
168                // Main schema
169                info!("Initializing deployment database schema...");
170
171                pool = db_manager.get_main_pool().clone();
172
173                crate::schemas::v1::db::init_deployment_schema(target_version, &pool)
174                    .await
175                    .map_err(|e| DatabaseError::MigrationError(format!(
176                        "Failed to migrate deployment schema: {}", e
177                    )))?;
178                
179                info!("✓ Deployment database schema initialized");
180                    
181                // Also initialize sample data
182                info!("Initializing deployment sample data...");
183                crate::schemas::v1::db::sample_deployment_data(&pool, target_version)
184                    .await
185                    .map_err(|e| DatabaseError::MigrationError(format!(
186                        "Failed to initialize deployment sample data: {}", e
187                    )))?;
188                
189                info!("✓ Deployment sample data initialized");
190            }
191        }
192        
193        // Update schema version
194        crate::schemas::v1::db::queries::metadata::set_meta_value(
195            &pool,
196            "omni_schema_version",
197            &target_version.to_string(),
198        )
199        .await
200        .map_err(|e| DatabaseError::MigrationError(format!(
201            "Failed to update schema version: {}", e
202        )))?;
203        
204        info!("Schema migrated from version {} to {}", current_version, target_version);
205        
206        Ok(())
207    }
208}