omni_orchestrator/
main.rs

//=============================================================================
// OmniOrchestrator - A distributed system for managing and orchestrating the
// OmniCloud platform
//=============================================================================
// Maintained by: Tristan J. Poland, Maxine Deandrade, Caznix, Haywood Spartian
// and the OmniCloud community.
//=============================================================================
// This is the entry point for the OmniOrchestrator server application.
// It manages the entirety of the OmniCloud platform and its components:
//    - Database
//    - API
//    - Cluster Management
//    - Bootstrapping
//    - Load Balancing
//=============================================================================

// +-------------+
// | MODULES     |
// +-------------+
mod config;
mod leader;
mod state;
mod app_autoscaler;
mod cluster;
mod network;
mod worker_autoscaler;
mod db_manager;
mod schemas;

// +-------------+
// | IMPORTS     |
// +-------------+
// Third-party dependencies
use anyhow::{anyhow, Error, Result};
use clickhouse;
use colored::Colorize;
use schemas::v1::models::platform;
use schemas::v1::models::platform::Platform;
use core::panic;
use env_logger::Builder;
use lazy_static::lazy_static;
use reqwest::Client;
use rocket::fairing::{Fairing, Info, Kind};
use rocket::http::Header;
use rocket::Build;
use rocket::Rocket;
use rocket::{Request, Response};
use schemas::auth::AuthConfig;
use serde::{Deserialize, Serialize};
use sqlx::mysql::MySqlPool;
use sqlx::{MySql, Pool};
use std::collections::HashMap;
use std::io::Write;
use std::time::Duration;
use std::{env, sync::Arc};
use tokio::sync::RwLock;
use worker_autoscaler::create_default_cpu_memory_scaling_policy;
use worker_autoscaler::WorkerAutoscaler;
use worker_autoscaler::{CloudDirector, VMConfig, VMTemplate};

// Internal imports
use crate::cluster::{ClusterManager, NodeInfo};
use crate::config::ServerConfig;
use crate::config::SERVER_CONFIG;
use crate::leader::LeaderElection;
use crate::state::SharedState;
use crate::db_manager::DatabaseManager;

use schemas::v1::{api, models};

// The compiler flags this import as unused even though the Rocket
// macros it provides are used throughout this file.
#[allow(unused_imports)]
#[macro_use]
extern crate rocket;

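/// Crate root at compile time; used below to locate the bundled SQL schema files.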
pub static PROJECT_ROOT: &str = env!("CARGO_MANIFEST_DIR");

// +---------------------+
// | CORS IMPLEMENTATION |
// +---------------------+
// CORS fairing that adds CORS headers to every response
pub struct CORS;

#[rocket::async_trait]
impl Fairing for CORS {
    fn info(&self) -> Info {
        Info {
            name: "Add comprehensive CORS headers to responses",
            kind: Kind::Response,
        }
    }

    async fn on_response<'r>(&self, _request: &'r Request<'_>, response: &mut Response<'r>) {
        response.set_header(Header::new("Access-Control-Allow-Origin", "*"));
        response.set_header(Header::new(
            "Access-Control-Allow-Methods",
            "GET, POST, PUT, PATCH, DELETE, OPTIONS, HEAD",
        ));
        response.set_header(Header::new(
            "Access-Control-Allow-Headers",
            "Authorization, Content-Type, Accept, Origin, X-Requested-With",
        ));
        response.set_header(Header::new("Access-Control-Allow-Credentials", "true"));
        response.set_header(Header::new("Access-Control-Max-Age", "86400")); // Cache preflight for 24 hours
    }
}
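
// Note: per the Fetch spec, browsers reject credentialed requests when
// `Access-Control-Allow-Origin` is the wildcard `*`; if cookie-based auth is
// ever needed, the request's origin would have to be echoed back instead.
// The fairing is attached once on the Rocket builder (see `main` below);
// minimal sketch:
//
//     rocket::build().attach(CORS);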

// CORS preflight handler for OPTIONS requests
#[options("/<_..>")]
fn cors_preflight() -> &'static str {
    ""
}

// +-------------+
// | EXTENSIONS  |
// +-------------+
/// Extension trait for mounting multiple routes to a Rocket instance
trait RocketExt {
    /// Mount multiple route groups at once to simplify route registration
    ///
    /// # Arguments
    ///
    /// * `routes` - A vector of path and route pairs to mount
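    ///
    /// # Example
    ///
    /// A minimal sketch (`ignore`d, illustrative only); the route lists shown
    /// are the ones assembled in `main` below:
    ///
    /// ```ignore
    /// let rocket = rocket::build().mount_routes(vec![
    ///     ("/", routes![health_check, cluster_status]),
    ///     ("/api/v1", api::routes()),
    /// ]);
    /// ```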
    fn mount_routes(self, routes: Vec<(&'static str, Vec<rocket::Route>)>) -> Self;
}

impl RocketExt for Rocket<Build> {
    fn mount_routes(self, routes: Vec<(&'static str, Vec<rocket::Route>)>) -> Self {
        let mut rocket = self;
        for (path, routes) in routes {
            log::info!("{}", format!("Mounting routes at {}", path).green());
            rocket = rocket.mount(path, routes);
        }
        rocket
    }
}

// +-------------+
// | GLOBALS     |
// +-------------+
// Global singleton instance of the cluster manager
// Manages node discovery and peer connections
lazy_static! {
    static ref CLUSTER_MANAGER: Arc<RwLock<ClusterManager>> = {
        let bind_address = format!("{}:{}", SERVER_CONFIG.address, SERVER_CONFIG.port);
        let state = SharedState::new(bind_address.into());
        let shared_state = Arc::new(RwLock::new(state));
        Arc::new(RwLock::new(ClusterManager::new(shared_state)))
    };
}

// +-------------+
// | MODELS      |
// +-------------+
/// Message format for cluster status API responses
#[derive(Debug, Serialize, Deserialize)]
struct ClusterStatusMessage {
    /// Current role of the node (leader/follower)
    node_roles: String,
    /// List of nodes in the cluster
    cluster_nodes: Vec<NodeInfo>,
}

/// Standard API response format for cluster operations
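///
/// Serializes to JSON of the form (illustrative values):
///
/// ```json
/// { "status": "ok", "message": { "node_roles": "leader", "cluster_nodes": [] } }
/// ```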
#[derive(Serialize, Deserialize)]
struct ApiResponse {
    /// Status of the operation ("ok" or "error")
    status: String,
    /// Response message containing cluster information
    message: ClusterStatusMessage,
}

// +-------------+
// | METHODS     |
// +-------------+
impl ClusterManager {
    /// Discovers and connects to peer nodes in the cluster
    ///
    /// # Arguments
    ///
    /// * `config` - Server configuration containing instance information
    /// * `my_port` - Current node's port to avoid self-connection
    ///
    /// # Returns
    ///
    /// Result indicating success or failure of the discovery process
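    ///
    /// # Example
    ///
    /// A minimal sketch (`ignore`d, illustrative only; the port is a placeholder):
    ///
    /// ```ignore
    /// CLUSTER_MANAGER.read().await.discover_peers(&SERVER_CONFIG, 8000).await?;
    /// ```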
    pub async fn discover_peers(&self, config: &ServerConfig, my_port: u16) -> Result<()> {
        let client = Client::new();
        log::info!("{}", "Starting peer discovery...".cyan());

        for instance in &config.instances {
            log::info!("{}", format!("Discovered: {:#?}", instance).blue().bold());
            if instance.port == my_port {
                log::debug!("Skipping self-connection at port {}", my_port);
                continue;
            }

            let node_uri = format!("{}:{}", instance.address, instance.port);

            match self.connect_to_peer(&client, &node_uri).await {
                Ok(_) => log::info!(
                    "{}",
                    format!("Successfully connected to peer: {}", node_uri).green()
                ),
                Err(e) => {
                    log::warn!(
                        "{}",
                        format!("Failed to connect to peer: {} {}", node_uri, e).yellow()
                    );
                    self.remove_node(node_uri.into()).await;
                }
            }
        }

        log::info!("{}", "Peer discovery completed".cyan());
        Ok(())
    }

    /// Attempts to establish a connection with a peer node
    ///
    /// # Arguments
    ///
    /// * `client` - HTTP client for making requests
    /// * `node_address` - Address of the peer node
    ///
    /// # Returns
    ///
    /// Result indicating success or failure of the connection attempt
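    ///
    /// # Note
    ///
    /// `node_address` is used verbatim to build the request URL, so it is
    /// assumed to already carry a scheme (e.g. `http://host:port`); `reqwest`
    /// rejects bare `host:port` strings as relative URLs.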
    async fn connect_to_peer(&self, client: &Client, node_address: &str) -> Result<()> {
        let health_url = format!("{}/health", node_address);
        log::debug!("Checking health at: {}", health_url);

        let response = client
            .get(&health_url)
            .timeout(Duration::from_secs(5))
            .send()
            .await?;

        if response.status().is_success() {
            // Parse the port from the trailing ":<port>" segment, defaulting to 80
            let port = node_address
                .split(':')
                .next_back()
                .unwrap_or("80")
                .parse::<u16>()
                .unwrap_or(80);

            let node_info = NodeInfo {
                id: node_address.into(),
                address: node_address.into(),
                port,
            };

            self.register_node(node_info).await;
            log::debug!("Node registered: {}", node_address);
            Ok(())
        } else {
            Err(anyhow!("Node health check failed"))
        }
    }
}

// +-------------+
// | ENDPOINTS   |
// +-------------+
/// Health check endpoint to verify node status
///
/// # Returns
///
/// JSON response with basic health status
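///
/// The body always has this shape (the values are the literals below):
///
/// ```json
/// { "status": "ok", "message": { "node_roles": "unknown", "cluster_nodes": [] } }
/// ```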
#[get("/health")]
async fn health_check() -> rocket::serde::json::Json<ApiResponse> {
    log::debug!("Health check endpoint called");
    rocket::serde::json::Json(ApiResponse {
        status: "ok".to_string(),
        message: ClusterStatusMessage {
            node_roles: "unknown".to_string(),
            cluster_nodes: vec![],
        },
    })
}

/// Cluster status endpoint providing detailed information about the cluster
///
/// # Arguments
///
/// * `state` - Shared state containing leadership information
/// * `cluster` - Cluster manager containing node information
///
/// # Returns
///
/// JSON response with cluster status details
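///
/// Example body for a leader node with no registered peers (illustrative):
///
/// ```json
/// { "status": "ok", "message": { "node_roles": "leader", "cluster_nodes": [] } }
/// ```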
#[get("/cluster/status")]
async fn cluster_status(
    state: &rocket::State<Arc<RwLock<SharedState>>>,
    cluster: &rocket::State<Arc<RwLock<ClusterManager>>>,
) -> rocket::serde::json::Json<ApiResponse> {
    log::debug!("Cluster status endpoint called");
    let state = state.read().await;
    let nodes = cluster.read().await;

    let role = if state.is_leader {
        "leader".to_string()
    } else {
        "follower".to_string()
    };

    log::info!("{}", format!("Current node role: {}", role).cyan());

    let response = ApiResponse {
        status: "ok".to_string(),
        message: ClusterStatusMessage {
            node_roles: role,
            cluster_nodes: nodes.get_nodes().await,
        },
    };

    rocket::serde::json::Json(response)
}

// +-------------+
// | MAIN        |
// +-------------+
/// Main entry point for the OmniOrchestrator server
#[rocket::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // ====================== INITIALIZATION ======================
    let port = SERVER_CONFIG.port;
    println!(
        "{}",
        "╔═══════════════════════════════════════════════════════════════╗".bright_cyan()
    );
    println!(
        "{}",
        "║               OMNI ORCHESTRATOR SERVER STARTING               ║".bright_cyan()
    );
    println!(
        "{}",
        "╚═══════════════════════════════════════════════════════════════╝".bright_cyan()
    );
    println!("{}", format!("⇒ Starting server on port {}", port).green());

    // Setup logging
    Builder::new()
        .filter_level(log::LevelFilter::Info)
        .format(|buf, record| writeln!(buf, "{}: {}", record.level(), record.args()))
        .init();

    log::info!("{}", "Logger initialized successfully".green());

    // ====================== DATABASE SETUP ======================
    println!(
        "{}",
        "╔═══════════════════════════════════════════════════════════════╗".bright_blue()
    );
    println!(
        "{}",
        "║                 Deployment Database Connection                ║".bright_blue()
    );
    println!(
        "{}",
        "╚═══════════════════════════════════════════════════════════════╝".bright_blue()
    );

    // Load .env (if present) before reading any configuration, then resolve
    // the database URL from the environment with a local-dev fallback
    dotenv::dotenv().ok();
    let database_url = env::var("DATABASE_URL")
        .or_else(|_| env::var("DEFAULT_DATABASE_URL"))
        .unwrap_or_else(|_| "mysql://root:root@localhost:4001".to_string());
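
    // Illustrative .env entries consumed above (values are placeholders):
    //   DATABASE_URL=mysql://user:pass@db-host:4001
    //   DEFAULT_DATABASE_URL=mysql://root:root@localhost:4001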

    // Initialize database manager
    log::info!("{}", format!("Database URL: {}", database_url).blue());
    let db_manager = Arc::new(DatabaseManager::new(&database_url).await?);

    // Get main database pool for further operations
    let pool = db_manager.get_main_pool();

    // Platform database initialization
    println!(
        "{}",
        "╔═══════════════════════════════════════════════════════════════╗".bright_blue()
    );
    println!(
        "{}",
        "║                 Platform Database Registration                ║".bright_blue()
    );
    println!(
        "{}",
        "╚═══════════════════════════════════════════════════════════════╝".bright_blue()
    );

    // Get all platforms and initialize their database pools
    let platforms = db_manager.get_all_platforms().await?;
    log::info!("{}", format!("Found {} platforms", platforms.len()).blue());

    for platform in &platforms {
        log::info!(
            "{}",
            format!(
                "Pre-initializing connection for platform: {}",
                platform.name
            )
            .blue()
        );
        db_manager
            .get_platform_pool(&platform.name, platform.id.unwrap_or(0))
            .await?;
    }

    // ======================= ClickHouse SETUP ======================
    println!(
        "{}",
        "╔═══════════════════════════════════════════════════════════════╗".blue()
    );
    println!(
        "{}",
        "║                  CLICKHOUSE CONNECTION                        ║".blue()
    );
    println!(
        "{}",
        "╚═══════════════════════════════════════════════════════════════╝".blue()
    );
    // Resolve the ClickHouse URL from the environment with a local-dev fallback
    let clickhouse_url = env::var("CLICKHOUSE_URL")
        .or_else(|_| env::var("DEFAULT_CLICKHOUSE_URL"))
        .unwrap_or_else(|_| "http://localhost:8123".to_string());
    log::info!("{}", format!("ClickHouse URL: {}", clickhouse_url).blue());
    log::info!("{}", "Initializing ClickHouse connection...".blue());

    // Build the ClickHouse client; credentials come from the environment
    // rather than being hardcoded (the CLICKHOUSE_USER / CLICKHOUSE_PASSWORD
    // variable names are an assumption, replacing a placeholder password)
    let clickhouse_client = clickhouse::Client::default()
        .with_url(&clickhouse_url)
        .with_database("default")
        .with_user(env::var("CLICKHOUSE_USER").unwrap_or_else(|_| "default".to_string()))
        .with_password(env::var("CLICKHOUSE_PASSWORD").unwrap_or_default());
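
    // Illustrative .env entries for the ClickHouse connection (placeholders):
    //   CLICKHOUSE_URL=http://clickhouse-host:8123
    //   CLICKHOUSE_USER=default
    //   CLICKHOUSE_PASSWORD=<secret>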

    // Run a simple ping test before attempting schema initialization
    match clickhouse_client.query("SELECT 1").execute().await {
        Ok(_) => log::info!("✓ ClickHouse connection test successful"),
        Err(e) => {
            log::error!("ClickHouse connection test failed: {:?}", e);
            panic!("Cannot connect to ClickHouse");
        }
    }

    log::info!("{}", "✓ ClickHouse connection established".green());

    // ====================== Schema SETUP ======================

    // Load schema based on version
    log::info!("{}", "Loading schema files...".blue());
    let schema_version =
        schemas::v1::db::queries::metadata::get_meta_value(pool, "omni_schema_version")
            .await
            .unwrap_or_else(|_| "1".to_string());

    let schema_path = format!("{}/sql/v{}/clickhouse_up.sql", PROJECT_ROOT, schema_version);
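    // Resolves to e.g. "<CARGO_MANIFEST_DIR>/sql/v1/clickhouse_up.sql"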
    log::info!(
        "{}",
        format!("Loading schema from path: {}", schema_path).blue()
    );

    // Initialize ClickHouse schema
    log::info!("{}", "Initializing ClickHouse schema...".blue());
    match api::logging::init_clickhouse_db(&clickhouse_client, &schema_path).await {
        Ok(_) => log::info!("{}", "✓ ClickHouse schema initialized".green()),
        Err(e) => {
            log::error!(
                "{}",
                format!("Failed to initialize ClickHouse schema: {:?}", e).red()
            );
            panic!("Failed to initialize ClickHouse schema");
        }
    }

    // ====================== CLUSTER SETUP ======================
    println!(
        "{}",
        "╔═══════════════════════════════════════════════════════════════╗".bright_magenta()
    );
    println!(
        "{}",
        "║                     CLUSTER MANAGEMENT                        ║".bright_magenta()
    );
    println!(
        "{}",
        "╚═══════════════════════════════════════════════════════════════╝".bright_magenta()
    );

    // Initialize node state and cluster management
    let node_id: Arc<str> =
        format!("{}:{}", SERVER_CONFIG.address.clone(), SERVER_CONFIG.port).into();
    log::info!("{}", format!("Node ID: {}", node_id).magenta());

    let shared_state: Arc<RwLock<SharedState>> =
        Arc::new(RwLock::new(SharedState::new(node_id.clone())));

    // Start peer discovery in background task
    log::info!("{}", "Starting peer discovery background task".magenta());
    tokio::task::spawn({
        let cluster_manager = CLUSTER_MANAGER.clone();
        let server_config = SERVER_CONFIG.clone();
        async move {
            loop {
                if let Err(e) = cluster_manager
                    .read()
                    .await
                    .discover_peers(&server_config, port)
                    .await
                {
                    log::error!("{}", format!("Failed to discover peers: {e}").red());
                }
                tokio::time::sleep(Duration::from_secs(2)).await;
            }
        }
    });

    // ====================== AUTOSCALER SETUP ======================
    println!(
        "{}",
        "╔═══════════════════════════════════════════════════════════════╗".bright_yellow()
    );
    println!(
        "{}",
        "║                    AUTOSCALER SETUP                           ║".bright_yellow()
    );
    println!(
        "{}",
        "╚═══════════════════════════════════════════════════════════════╝".bright_yellow()
    );

    // Initialize worker autoscaler with default policy
    log::info!(
        "{}",
        "Creating worker autoscaler with default policy".yellow()
    );
    let policy = create_default_cpu_memory_scaling_policy();
    let mut autoscaler = WorkerAutoscaler::new(1, 1, policy);

    // Add cloud director for managing VMs
    log::info!("{}", "Adding cloud director (AWS/us-east-1)".yellow());
    let cloud_director = Arc::new(CloudDirector::new(
        "cloud-1".to_string(),
        "aws".to_string(),
        "us-east-1".to_string(),
    ));
    autoscaler.add_director(cloud_director);

    // Set up VM template for worker nodes
    log::info!("{}", "Setting up VM template for worker nodes".yellow());
    let mut vm_template = VMTemplate::default();
    vm_template.base_name = "omni-worker".to_string();
    vm_template.config = VMConfig {
        cpu: 2,
        memory: 4096, // 4 GB
        storage: 80,  // 80 GB
        options: HashMap::new(),
    };
    autoscaler.set_vm_template(vm_template);
    log::info!("{}", "✓ Worker autoscaler configured".green());

    // Start discovery tasks
    log::info!("{}", "Starting worker autoscaler discovery tasks".yellow());
    tokio::spawn({
        let mut autoscaler = autoscaler;
        async move {
            // Sleep for 1.5 seconds before starting discovery
            log::debug!("Sleeping for 1.5 seconds before discovery...");
            tokio::time::sleep(Duration::from_millis(1500)).await;
            loop {
                log::info!("{}", "Discovering nodes and VMs...".yellow());
                if let Err(e) = autoscaler.discover_nodes().await {
                    log::error!("{}", format!("Node discovery error: {}", e).red());
                }
                if let Err(e) = autoscaler.discover_vms().await {
                    log::error!("{}", format!("VM discovery error: {}", e).red());
                }
                let metrics: HashMap<String, f32> = HashMap::new(); // TODO: populate with actual metrics
                if let Err(e) = autoscaler.check_scaling(&metrics) {
                    log::error!("{}", format!("Worker scaling error: {}", e).red());
                }
                // Sleep for 3 seconds before next discovery
                log::debug!("Sleeping autoscaling thread for 3 seconds...");
                tokio::time::sleep(Duration::from_secs(3)).await;
            }
        }
    });

    // Initialize the app autoscaler
    log::info!(
        "{}",
        "Creating application autoscaler with default policy".yellow()
    );
    let app_policy = app_autoscaler::policy::create_default_cpu_memory_scaling_policy();
    let app_autoscaler = app_autoscaler::app_autoscaler::AppAutoscaler::new(
        1,  // min instances
        10, // max instances
        app_policy,
    );
    log::info!("{}", "✓ Application autoscaler configured".green());

    // Spawn a task to run the app autoscaler discovery and scaling loop
    log::info!(
        "{}",
        "Starting application autoscaler discovery tasks".yellow()
    );
    tokio::spawn({
        let mut app_autoscaler = app_autoscaler;
        async move {
            // Sleep for 1.5 seconds before starting discovery
            log::debug!("Sleeping for 1.5 seconds before app autoscaler discovery...");
            tokio::time::sleep(Duration::from_millis(1500)).await;
            loop {
                log::info!("{}", "Discovering app instances...".yellow());
                if let Err(e) = app_autoscaler.discover_app_instances().await {
                    log::error!("{}", format!("App instance discovery error: {}", e).red());
                }

                let metrics: HashMap<String, f32> = HashMap::new(); // TODO: populate with actual metrics
                if let Err(e) = app_autoscaler.check_scaling(&metrics) {
                    log::error!("{}", format!("App scaling error: {}", e).red());
                }

                // Sleep for 3 seconds before next discovery
                log::debug!("Sleeping app autoscaling thread for 3 seconds...");
                tokio::time::sleep(Duration::from_secs(3)).await;
            }
        }
    });

    // ====================== LEADER ELECTION ======================
    println!(
        "{}",
        "╔═══════════════════════════════════════════════════════════════╗".bright_green()
    );
    println!(
        "{}",
        "║                      LEADER ELECTION                          ║".bright_green()
    );
    println!(
        "{}",
        "╚═══════════════════════════════════════════════════════════════╝".bright_green()
    );

    // Initialize and start leader election
    log::info!("{}", "Initializing leader election process".green());
    let _leader_election = LeaderElection::new(node_id, shared_state.clone());
    log::info!("{}", "✓ Leader election initialized".green());

    // ====================== SERVER STARTUP ======================
    println!(
        "{}",
        "╔═══════════════════════════════════════════════════════════════╗".bright_cyan()
    );
    println!(
        "{}",
        "║                       SERVER STARTUP                          ║".bright_cyan()
    );
    println!(
        "{}",
        "╚═══════════════════════════════════════════════════════════════╝".bright_cyan()
    );

    // Define routes to mount
    log::info!("{}", "Defining API routes".cyan());
    let routes = vec![
        (
            "/",
            routes![
                health_check,
                api::index::routes_ui,
                cluster_status,
                cors_preflight
            ],
        ),
        ("/api/v1", api::routes()),
    ];

    let auth_config = AuthConfig {
        jwt_secret: std::env::var("JWT_SECRET")
            .expect("Environment variable JWT_SECRET must be set for secure operation."),
        token_expiry_hours: std::env::var("TOKEN_EXPIRY_HOURS")
            .unwrap_or_else(|_| "24".to_string())
            .parse()
            .expect("Invalid value for TOKEN_EXPIRY_HOURS"),
    };
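
    // Illustrative environment values for the auth config (placeholders):
    //   JWT_SECRET=<long random secret>
    //   TOKEN_EXPIRY_HOURS=24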

    // Build Rocket instance with base configuration
    log::info!("{}", "Building Rocket instance".cyan());
    let rocket_instance = rocket::build()
        .configure(rocket::Config {
            port,
            address: std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
            ..Default::default()
        })
        // Add database manager to Rocket's state
        .manage(db_manager.clone())
        // Pool for the core deployment database; it stores system metadata,
        // not platform-specific data
        .manage(pool.clone())
        .manage(CLUSTER_MANAGER.clone())
        .manage(clickhouse_client)
        .manage(shared_state)
        .manage(auth_config)
        .attach(CORS); // Attach the CORS fairing

    // Mount routes to the Rocket instance
    log::info!("{}", "Mounting API routes".cyan());
    let rocket_with_routes = rocket_instance.mount_routes(routes);

    // Collect routes information before launch
    api::index::collect_routes(&rocket_with_routes);

    // Launch server
    log::info!("{}", "🚀 LAUNCHING SERVER...".bright_cyan().bold());
    let _rocket = rocket_with_routes.launch().await?;

    Ok(())
}