omni_orchestrator/
main.rs

//=============================================================================
// OmniOrchestrator - A distributed system for managing and orchestrating the OmniCloud platform
//=============================================================================
// Maintained by: Tristan J. Poland, Maxine Deandrade, Caznix, Haywood Spartian
// and the OmniCloud community.
//=============================================================================
// This is the entry point for the OmniOrchestrator server application.
// It manages the entirety of the OmniCloud platform and its components:
//    - Database
//    - API
//    - Cluster Management
//    - Bootstrapping
//    - Load Balancing
//=============================================================================

// +-------------+
// | MODULES     |
// +-------------+
mod autoscalar;
mod cluster;
mod config;
mod db_manager;
mod leader;
mod network;
mod schemas;
mod state;

// +-------------+
// | IMPORTS     |
// +-------------+
// Third-party dependencies
use anyhow::anyhow;
use anyhow::Result;
use clickhouse;
use colored::Colorize;
use core::panic;
use env_logger::Builder;
use lazy_static::lazy_static;
use libomni::types::db::auth::AuthConfig;
use reqwest::Client;
use rocket::{
    fairing::{Fairing, Info, Kind},
    http::Header,
    Build, Request, Response, Rocket,
};
use serde::{Deserialize, Serialize};
use std::io::Write;
use std::time::Duration;
use std::{env, sync::Arc};
use tokio::sync::RwLock;

// Internal imports
use crate::cluster::{ClusterManager, NodeInfo};
use crate::config::ServerConfig;
use crate::config::SERVER_CONFIG;
use crate::db_manager::DatabaseManager;
use crate::leader::LeaderElection;
use crate::state::SharedState;

use libomni::types::db::v1 as types;
use lighthouse::{
    policies, LighthouseConfig, ResourceConfig, ScalingPolicy, ScalingThreshold
};
use schemas::v1::api;

// The unused_imports lint misfires on this macro import, reporting it as
// unused even when the macros are in use, so the warning is silenced here.
#[allow(unused_imports)]
#[macro_use]
extern crate rocket;

pub static PROJECT_ROOT: &str = env!("CARGO_MANIFEST_DIR");

// +---------------------+
// | CORS IMPLEMENTATION |
// +---------------------+
// CORS Fairing struct to add CORS headers to responses
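/// A sketch of how the fairing is attached, mirroring the real call site in
/// `main` below:
///
/// ```rust,ignore
/// rocket::build().attach(CORS);
/// ```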
pub struct CORS;

#[rocket::async_trait]
impl Fairing for CORS {
    fn info(&self) -> Info {
        Info {
            name: "Add comprehensive CORS headers to responses",
            kind: Kind::Response,
        }
    }

    async fn on_response<'r>(&self, _request: &'r Request<'_>, response: &mut Response<'r>) {
        response.set_header(Header::new("Access-Control-Allow-Origin", "*"));
        response.set_header(Header::new(
            "Access-Control-Allow-Methods",
            "GET, POST, PUT, PATCH, DELETE, OPTIONS, HEAD",
        ));
        response.set_header(Header::new(
            "Access-Control-Allow-Headers",
            "Authorization, Content-Type, Accept, Origin, X-Requested-With",
        ));
        response.set_header(Header::new("Access-Control-Allow-Credentials", "true"));
        response.set_header(Header::new("Access-Control-Max-Age", "86400")); // Cache preflight for 24 hours
    }
}

// CORS Preflight handler for OPTIONS requests
#[options("/<_..>")]
fn cors_preflight() -> &'static str {
    ""
}

// +-------------+
// | EXTENSIONS  |
// +-------------+
/// Extension trait for mounting multiple routes to a Rocket instance
trait RocketExt {
    /// Mount multiple route groups at once to simplify route registration
    ///
    /// # Arguments
    ///
    /// * `routes` - A vector of path and route pairs to mount
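    ///
    /// # Example
    ///
    /// A minimal sketch, mirroring the call in `main` below; the route names
    /// are the ones defined in this file:
    ///
    /// ```rust,ignore
    /// let rocket = rocket::build().mount_routes(vec![
    ///     ("/", routes![health_check, cluster_status]),
    ///     ("/api/v1", api::routes()),
    /// ]);
    /// ```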
    fn mount_routes(self, routes: Vec<(&'static str, Vec<rocket::Route>)>) -> Self;
}

impl RocketExt for Rocket<Build> {
    fn mount_routes(self, routes: Vec<(&'static str, Vec<rocket::Route>)>) -> Self {
        let mut rocket = self;
        for (path, routes) in routes {
            log::info!("{}", format!("Mounting routes at {}", path).green());
            rocket = rocket.mount(path, routes);
        }
        rocket
    }
}

// +-------------+
// | GLOBALS     |
// +-------------+
// Global singleton instance of the cluster manager
// Manages node discovery and peer connections
lazy_static! {
    static ref CLUSTER_MANAGER: Arc<RwLock<ClusterManager>> = {
        let bind_address = format!("{}:{}", SERVER_CONFIG.address, SERVER_CONFIG.port);
        let state = SharedState::new(bind_address.into());
        let shared_state = Arc::new(RwLock::new(state));
        Arc::new(RwLock::new(ClusterManager::new(shared_state)))
    };
}

// +-------------+
// | MODELS      |
// +-------------+
/// Message format for cluster status API responses
#[derive(Debug, Serialize, Deserialize)]
struct ClusterStatusMessage {
    /// Current role of the node (leader/follower)
    node_roles: String,
    /// List of nodes in the cluster
    cluster_nodes: Vec<NodeInfo>,
}

/// Standard API response format for cluster operations
#[derive(Serialize, Deserialize)]
struct ApiResponse {
    /// Status of the operation ("ok" or "error")
    status: String,
    /// Response message containing cluster information
    message: ClusterStatusMessage,
}

// +-------------+
// | METHODS     |
// +-------------+
impl ClusterManager {
    /// Discovers and connects to peer nodes in the cluster
    ///
    /// # Arguments
    ///
    /// * `config` - Server configuration containing instance information
    /// * `my_port` - Current node's port to avoid self-connection
    ///
    /// # Returns
    ///
    /// Result indicating success or failure of the discovery process
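    ///
    /// # Example
    ///
    /// A sketch of the call made by the discovery background task in `main`;
    /// the port value here is illustrative:
    ///
    /// ```rust,ignore
    /// CLUSTER_MANAGER
    ///     .read()
    ///     .await
    ///     .discover_peers(&SERVER_CONFIG, 8000)
    ///     .await?;
    /// ```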
    pub async fn discover_peers(&self, config: &ServerConfig, my_port: u16) -> Result<()> {
        let client = Client::new();
        log::info!("{}", "Starting peer discovery...".cyan());

        for instance in &config.instances {
            log::info!("{}", format!("Discovered: {:#?}", instance).blue().bold());
            if instance.port == my_port {
                log::debug!("Skipping self-connection at port {}", my_port);
                continue;
            }

            let node_uri = format!("{}:{}", instance.address, instance.port);

            match self.connect_to_peer(&client, &node_uri).await {
                Ok(_) => log::info!(
                    "{}",
                    format!("Successfully connected to peer: {}", node_uri).green()
                ),
                Err(e) => {
                    log::warn!(
                        "{}",
                        format!("Failed to connect to peer: {} {}", node_uri, e).yellow()
                    );
                    self.remove_node(node_uri.into()).await;
                }
            }
        }

        log::info!("{}", "Peer discovery completed".cyan());
        Ok(())
    }

    /// Attempts to establish a connection with a peer node
    ///
    /// # Arguments
    ///
    /// * `client` - HTTP client for making requests
    /// * `node_address` - Address of the peer node
    ///
    /// # Returns
    ///
    /// Result indicating success or failure of the connection attempt
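    ///
    /// # Example
    ///
    /// Illustrative only; `manager` stands in for a `ClusterManager` and the
    /// address follows the `"<host>:<port>"` form produced by `discover_peers`:
    ///
    /// ```rust,ignore
    /// let client = Client::new();
    /// manager.connect_to_peer(&client, "127.0.0.1:8000").await?;
    /// ```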
    async fn connect_to_peer(&self, client: &Client, node_address: &str) -> Result<()> {
        let health_url = format!("{}/health", node_address);
        log::debug!("Checking health at: {}", health_url);

        let response = client
            .get(&health_url)
            .timeout(Duration::from_secs(5))
            .send()
            .await?;

        if response.status().is_success() {
            let port = node_address
                .split(':')
                .next_back()
                .unwrap_or("80")
                .parse::<u16>()
                .unwrap_or(80);

            let node_info = NodeInfo {
                id: node_address.into(),
                address: node_address.into(),
                port,
            };

            self.register_node(node_info).await;
            log::debug!("Node registered: {}", node_address);
            Ok(())
        } else {
            Err(anyhow!("Node health check failed"))
        }
    }
}

// +-------------+
// | ENDPOINTS   |
// +-------------+
/// Health check endpoint to verify node status
///
/// # Returns
///
/// JSON response with basic health status
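///
/// # Example response
///
/// Illustrative of the JSON shape this handler produces:
///
/// ```json
/// { "status": "ok", "message": { "node_roles": "unknown", "cluster_nodes": [] } }
/// ```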
#[get("/health")]
async fn health_check() -> rocket::serde::json::Json<ApiResponse> {
    log::debug!("Health check endpoint called");
    rocket::serde::json::Json(ApiResponse {
        status: "ok".to_string(),
        message: ClusterStatusMessage {
            node_roles: "unknown".to_string(),
            cluster_nodes: vec![],
        },
    })
}

/// Cluster status endpoint providing detailed information about the cluster
///
/// # Arguments
///
/// * `state` - Shared state containing leadership information
/// * `cluster` - Cluster manager containing node information
///
/// # Returns
///
/// JSON response with cluster status details
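///
/// # Example response
///
/// Illustrative of the JSON shape returned when this node currently holds
/// leadership (the node entry is a made-up example):
///
/// ```json
/// {
///   "status": "ok",
///   "message": {
///     "node_roles": "leader",
///     "cluster_nodes": [
///       { "id": "127.0.0.1:8001", "address": "127.0.0.1:8001", "port": 8001 }
///     ]
///   }
/// }
/// ```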
#[get("/cluster/status")]
async fn cluster_status(
    state: &rocket::State<Arc<RwLock<SharedState>>>,
    cluster: &rocket::State<Arc<RwLock<ClusterManager>>>,
) -> rocket::serde::json::Json<ApiResponse> {
    log::debug!("Cluster status endpoint called");
    let state = state.read().await;
    let nodes = cluster.read().await;

    let role = if state.is_leader {
        "leader".to_string()
    } else {
        "follower".to_string()
    };

    log::info!("{}", format!("Current node role: {}", role).cyan());

    let response = ApiResponse {
        status: "ok".to_string(),
        message: ClusterStatusMessage {
            node_roles: role,
            cluster_nodes: nodes.get_nodes().await,
        },
    };

    rocket::serde::json::Json(response)
}

// +-------------+
// | MAIN        |
// +-------------+
/// Main entry point for the OmniOrchestrator server
#[rocket::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // ====================== INITIALIZATION ======================
    let port = SERVER_CONFIG.port;
    println!(
        "{}",
        "╔═══════════════════════════════════════════════════════════════╗".bright_cyan()
    );
    println!(
        "{}",
        "║               OMNI ORCHESTRATOR SERVER STARTING               ║".bright_cyan()
    );
    println!(
        "{}",
        "╚═══════════════════════════════════════════════════════════════╝".bright_cyan()
    );
    println!("{}", format!("⇒ Starting server on port {}", port).green());

    // Setup logging
    Builder::new()
        .filter_level(log::LevelFilter::Info)
        .format(|buf, record| writeln!(buf, "{}: {}", record.level(), record.args()))
        .init();

    log::info!("{}", "Logger initialized successfully".green());

    // ====================== DATABASE SETUP ======================
    println!(
        "{}",
        "╔═══════════════════════════════════════════════════════════════╗".bright_blue()
    );
    println!(
        "{}",
        "║                 Deployment Database Connection                ║".bright_blue()
    );
    println!(
        "{}",
        "╚═══════════════════════════════════════════════════════════════╝".bright_blue()
    );

    // Get the database URL from environment or use default
    let database_url = env::var("DATABASE_URL").unwrap_or_else(|_| {
        dotenv::dotenv().ok();
        env::var("DEFAULT_DATABASE_URL")
            .unwrap_or_else(|_| "mysql://root:root@localhost:4001".to_string())
    });

    // Initialize database manager
    log::info!("{}", format!("Database URL: {}", database_url).blue());
    let db_manager = Arc::new(DatabaseManager::new(&database_url).await?);

    // Get main database pool for further operations
    let pool = db_manager.get_main_pool();

    // Platform database initialization
    println!(
        "{}",
        "╔═══════════════════════════════════════════════════════════════╗".bright_blue()
    );
    println!(
        "{}",
        "║                 Platform Database Registration                ║".bright_blue()
    );
    println!(
        "{}",
        "╚═══════════════════════════════════════════════════════════════╝".bright_blue()
    );

    // Get all platforms and initialize their database pools
    let platforms = db_manager.get_all_platforms().await?;
    log::info!("{}", format!("Found {} platforms", platforms.len()).blue());

    for platform in &platforms {
        log::info!(
            "{}",
            format!(
                "Pre-initializing connection for platform: {}",
                platform.name
            )
            .blue()
        );
        db_manager
            .get_platform_pool(&platform.name, platform.id.unwrap_or(0))
            .await?;
    }

    // ======================= ClickHouse SETUP ======================
    println!(
        "{}",
        "╔═══════════════════════════════════════════════════════════════╗".blue()
    );
    println!(
        "{}",
        "║                  CLICKHOUSE CONNECTION                        ║".blue()
    );
    println!(
        "{}",
        "╚═══════════════════════════════════════════════════════════════╝".blue()
    );
    // Initialize ClickHouse connection pool
    let clickhouse_url = env::var("CLICKHOUSE_URL").unwrap_or_else(|_| {
        dotenv::dotenv().ok(); // Load environment variables from a .env file if available
        env::var("DEFAULT_CLICKHOUSE_URL").unwrap_or_else(|_| "http://localhost:8123".to_string())
    });
    log::info!("{}", format!("ClickHouse URL: {}", clickhouse_url).blue());
    log::info!("{}", "Initializing ClickHouse connection...".blue());

    // Build the ClickHouse client (the credentials below are hard-coded
    // placeholders and should come from configuration in a real deployment)
    let clickhouse_client = clickhouse::Client::default()
        .with_url(&clickhouse_url)
        .with_database("default")
        .with_user("default")
        .with_password("your_secure_password");

    // Add a simple ping test before attempting schema initialization
    match clickhouse_client.query("SELECT 1").execute().await {
        Ok(_) => log::info!("✓ ClickHouse connection test successful"),
        Err(e) => {
            log::error!("ClickHouse connection test failed: {:?}", e);
            panic!("Cannot connect to ClickHouse");
        }
    }

    log::info!("{}", "✓ ClickHouse connection established".green());
    log::info!("{}", "✓ ClickHouse connection pool initialized".green());

    // ====================== Schema SETUP ======================

    // Load schema based on version
    log::info!("{}", "Loading schema files...".blue());
    let schema_version =
        schemas::v1::db::queries::metadata::get_meta_value(pool, "omni_schema_version")
            .await
            .unwrap_or_else(|_| "1".to_string());

    let schema_path = format!("{}/sql/v{}/clickhouse_up.sql", PROJECT_ROOT, schema_version);
    log::info!(
        "{}",
        format!("Loading schema from path: {}", schema_path).blue()
    );

    // Initialize ClickHouse schema
    log::info!("{}", "Initializing ClickHouse schema...".blue());
    match api::logging::init_clickhouse_db(&clickhouse_client, &schema_path).await {
        Ok(_) => log::info!("{}", "✓ ClickHouse schema initialized".green()),
        Err(e) => {
            log::error!(
                "{}",
                format!("Failed to initialize ClickHouse schema: {:?}", e).red()
            );
            panic!("Failed to initialize ClickHouse schema");
        }
    };

    // ====================== CLUSTER SETUP ======================
    println!(
        "{}",
        "╔═══════════════════════════════════════════════════════════════╗".bright_magenta()
    );
    println!(
        "{}",
        "║                     CLUSTER MANAGEMENT                        ║".bright_magenta()
    );
    println!(
        "{}",
        "╚═══════════════════════════════════════════════════════════════╝".bright_magenta()
    );

    // Initialize node state and cluster management
    let node_id: Arc<str> = format!("{}:{}", SERVER_CONFIG.address, SERVER_CONFIG.port).into();
    log::info!("{}", format!("Node ID: {}", node_id).magenta());

    let shared_state: Arc<RwLock<SharedState>> =
        Arc::new(RwLock::new(SharedState::new(node_id.clone())));

    // Start peer discovery in background task
    log::info!("{}", "Starting peer discovery background task".magenta());
    tokio::task::spawn({
        let cluster_manager = CLUSTER_MANAGER.clone();
        let server_config = SERVER_CONFIG.clone();
        async move {
            loop {
                if let Err(e) = cluster_manager
                    .read()
                    .await
                    .discover_peers(&server_config, port)
                    .await
                {
                    log::error!("{}", format!("Failed to discover peers: {e}").red());
                }
                tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await;
            }
        }
    });

    // ====================== AUTOSCALER SETUP ======================
    println!(
        "{}",
        "╔═══════════════════════════════════════════════════════════════╗".bright_yellow()
    );
    println!(
        "{}",
        "║                    AUTOSCALER SETUP                           ║".bright_yellow()
    );
    println!(
        "{}",
        "╚═══════════════════════════════════════════════════════════════╝".bright_yellow()
    );

    // Initialize the autoscaler engine
    let (engine, handle) = autoscalar::init();
    log::info!("{}", "Autoscaler engine initialized successfully".yellow());

    let config = LighthouseConfig::builder()
        .evaluation_interval(30) // Check every 30 seconds
        .add_resource_config(
            "web-tier",
            ResourceConfig {
                resource_type: "kubernetes-deployment".to_string(),
                policies: vec![
                    // Multi-metric policy
                    policies::multi_metric_policy(
                        "web-scaling",
                        (75.0, 25.0), // CPU thresholds
                        (80.0, 30.0), // Memory thresholds
                        1.5,          // Scale factor
                        300,          // 5 minute cooldown
                    ),
                    // Custom policy for request rate
                    ScalingPolicy {
                        name: "request-rate".to_string(),
                        thresholds: vec![ScalingThreshold {
                            metric_name: "requests_per_second".to_string(),
                            scale_up_threshold: 1000.0,
                            scale_down_threshold: 200.0,
                            scale_factor: 2.0,
                            cooldown_seconds: 180,
                        }],
                        min_capacity: Some(2),
                        max_capacity: Some(50),
                        enabled: true,
                    },
                ],
                default_policy: Some("web-scaling".to_string()),
                settings: [
                    ("cluster_name".to_string(), "prod-us-west".to_string()),
                    ("namespace".to_string(), "web-services".to_string()),
                ]
                .into(),
            },
        )
        .global_setting("environment", "production")
        .enable_logging(true)
        .build();

    // Update configuration live
    handle.update_config(config).await?;

    // ====================== LEADER ELECTION ======================
    println!(
        "{}",
        "╔═══════════════════════════════════════════════════════════════╗".bright_green()
    );
    println!(
        "{}",
        "║                      LEADER ELECTION                          ║".bright_green()
    );
    println!(
        "{}",
        "╚═══════════════════════════════════════════════════════════════╝".bright_green()
    );

    // Initialize and start leader election
    log::info!("{}", "Initializing leader election process".green());
    let _leader_election = LeaderElection::new(node_id, shared_state.clone());
    log::info!("{}", "✓ Leader election initialized".green());

    // ====================== SERVER STARTUP ======================
    println!(
        "{}",
        "╔═══════════════════════════════════════════════════════════════╗".bright_cyan()
    );
    println!(
        "{}",
        "║                       SERVER STARTUP                          ║".bright_cyan()
    );
    println!(
        "{}",
        "╚═══════════════════════════════════════════════════════════════╝".bright_cyan()
    );

    // Define routes to mount
    log::info!("{}", "Defining API routes".cyan());
    let routes = vec![
        (
            "/",
            routes![
                health_check,
                api::index::routes_ui,
                cluster_status,
                cors_preflight
            ],
        ),
        ("/api/v1", api::routes()),
    ];

    let auth_config = AuthConfig {
        jwt_secret: std::env::var("JWT_SECRET")
            .expect("Environment variable JWT_SECRET must be set for secure operation."),
        token_expiry_hours: std::env::var("TOKEN_EXPIRY_HOURS")
            .unwrap_or_else(|_| "24".to_string())
            .parse()
            .expect("Invalid value for TOKEN_EXPIRY_HOURS"),
    };

    // Build Rocket instance with base configuration
    log::info!("{}", "Building Rocket instance".cyan());
    let rocket_instance = rocket::build()
        .configure(rocket::Config {
            port,
            address: std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
            ..Default::default()
        })
        // Add database manager to Rocket's state
        .manage(db_manager.clone())
        // This is the pool for the core deployment database. This is meant to
        // store SYSTEM METADATA and not platform-specific data.
        .manage(pool.clone())
        .manage(CLUSTER_MANAGER.clone())
        .manage(clickhouse_client)
        .manage(shared_state)
        .manage(auth_config)
        .attach(CORS); // Attach the CORS fairing

    // Mount routes to the Rocket instance
    log::info!("{}", "Mounting API routes".cyan());
    let rocket_with_routes = rocket_instance.mount_routes(routes);

    // Collect routes information before launch
    api::index::collect_routes(&rocket_with_routes);

    // Launch server
    log::info!("{}", "🚀 LAUNCHING SERVER...".bright_cyan().bold());
    let _rocket = rocket_with_routes.launch().await?;

    Ok(())
}