1mod config;
20mod leader;
21mod state;
22mod app_autoscaler;
23mod cluster;
24mod network;
25mod worker_autoscaler;
26mod db_manager;
27mod schemas;
28
29use anyhow::anyhow;
34use anyhow::Result;
35use anyhow::Error;
36use clickhouse;
37use colored::Colorize;
38use schemas::v1::models::platform;
39use schemas::v1::models::platform::Platform;
40use core::panic;
41use env_logger::Builder;
42use lazy_static::lazy_static;
43use reqwest::Client;
44use rocket::fairing::{Fairing, Info, Kind};
45use rocket::http::Header;
46use rocket::Build;
47use rocket::Rocket;
48use rocket::{Request, Response};
49use schemas::auth::AuthConfig;
50use serde::{Deserialize, Serialize};
51use sqlx::mysql::MySqlPool;
52use sqlx::{MySql, Pool};
53use std::collections::HashMap;
54use std::io::Write;
55use std::time::Duration;
56use std::{env, sync::Arc};
57use tokio::sync::RwLock;
58use worker_autoscaler::create_default_cpu_memory_scaling_policy;
59use worker_autoscaler::WorkerAutoscaler;
60use worker_autoscaler::{CloudDirector, VMConfig, VMTemplate};
61
62use crate::cluster::{ClusterManager, NodeInfo};
64use crate::config::ServerConfig;
65use crate::config::SERVER_CONFIG;
66use crate::leader::LeaderElection;
67use crate::state::SharedState;
68use crate::db_manager::DatabaseManager; use schemas::v1::{api, models};
71
72#[allow(unused_imports)]
75#[macro_use]
76extern crate rocket;
77
/// Absolute path of this crate's root directory (where its `Cargo.toml` lives),
/// captured at compile time via `env!`. `main` uses it to locate the bundled
/// `sql/v<version>/clickhouse_up.sql` schema files.
pub static PROJECT_ROOT: &str = env!("CARGO_MANIFEST_DIR");
79
80
81
82
83
84
85
86
87pub struct CORS;
92
93#[rocket::async_trait]
94impl Fairing for CORS {
95 fn info(&self) -> Info {
96 Info {
97 name: "Add comprehensive CORS headers to responses",
98 kind: Kind::Response,
99 }
100 }
101
102 async fn on_response<'r>(&self, _request: &'r Request<'_>, response: &mut Response<'r>) {
103 response.set_header(Header::new("Access-Control-Allow-Origin", "*"));
104 response.set_header(Header::new(
105 "Access-Control-Allow-Methods",
106 "GET, POST, PUT, PATCH, DELETE, OPTIONS, HEAD",
107 ));
108 response.set_header(Header::new(
109 "Access-Control-Allow-Headers",
110 "Authorization, Content-Type, Accept, Origin, X-Requested-With",
111 ));
112 response.set_header(Header::new("Access-Control-Allow-Credentials", "true"));
113 response.set_header(Header::new("Access-Control-Max-Age", "86400")); }
115}
116
117#[options("/<_..>")]
119fn cors_preflight() -> &'static str {
120 ""
121}
122
123trait RocketExt {
128 fn mount_routes(self, routes: Vec<(&'static str, Vec<rocket::Route>)>) -> Self;
134}
135
136impl RocketExt for Rocket<Build> {
137 fn mount_routes(self, routes: Vec<(&'static str, Vec<rocket::Route>)>) -> Self {
138 let mut rocket = self;
139 for (path, routes) in routes {
140 log::info!("{}", format!("Mounting routes at {}", path).green());
141 rocket = rocket.mount(path, routes);
142 }
143 rocket
144 }
145}
146
147lazy_static! {
153 static ref CLUSTER_MANAGER: Arc<RwLock<ClusterManager>> = {
154 let state = format!("{}:{}", SERVER_CONFIG.address, SERVER_CONFIG.port);
155
156 let bind1 = &state;
157 let bind = bind1.clone();
158 let state = SharedState::new(bind.into());
159 let shared_state = Arc::new(RwLock::new(state));
160 Arc::new(RwLock::new(ClusterManager::new(shared_state)))
161 };
162}
163
164#[derive(Debug, Serialize, Deserialize)]
169struct ClusterStatusMessage {
170 node_roles: String,
172 cluster_nodes: Vec<NodeInfo>,
174}
175
176#[derive(Serialize, Deserialize)]
178struct ApiResponse {
179 status: String,
181 message: ClusterStatusMessage,
183}
184
185impl ClusterManager {
189 pub async fn discover_peers(&self, config: &ServerConfig, my_port: u16) -> Result<()> {
200 let client = Client::new();
201 log::info!("{}", "Starting peer discovery...".cyan());
202
203 for instance in &config.instances {
204 let string = format!("{:#?}", instance);
205 log::info!("{}", format!("Discovered: {}", string).blue().bold());
206 if instance.port == my_port {
207 log::debug!("Skipping self-connection at port {}", my_port);
208 continue;
209 }
210
211 let node_address: Arc<str> = format!("{}:{}", instance.address, instance.port).into();
212 let node_uri = format!("{}", node_address);
213
214 match self.connect_to_peer(&client, &node_uri.clone()).await {
215 Ok(_) => log::info!(
216 "{}",
217 format!("Successfully connected to peer: {}", node_uri).green()
218 ),
219 Err(e) => {
220 log::warn!(
221 "{}",
222 format!("Failed to connect to peer: {} {}", node_uri, e).yellow()
223 );
224 self.remove_node(node_uri.into()).await;
225 }
226 }
227 }
228
229 log::info!("{}", "Peer discovery completed".cyan());
230 Ok(())
231 }
232
233 async fn connect_to_peer(&self, client: &Client, node_address: &str) -> Result<()> {
244 let health_url = format!("{}/health", node_address);
245 log::debug!("Checking health at: {}", health_url);
246
247 let response = client
248 .get(&health_url)
249 .timeout(Duration::from_secs(5))
250 .send()
251 .await?;
252
253 if response.status().is_success() {
254 let port = node_address
255 .split(':')
256 .next_back()
257 .unwrap_or("80")
258 .parse::<u16>()
259 .unwrap_or(80);
260
261 let node_info = NodeInfo {
262 id: node_address.into(),
263 address: node_address.into(),
264 port,
265 };
266
267 self.register_node(node_info).await;
268 log::debug!("Node registered: {}", node_address);
269 Ok(())
270 } else {
271 Err(anyhow!("Node health check failed"))
272 }
273 }
274}
275
276#[get("/health")]
285async fn health_check() -> rocket::serde::json::Json<ApiResponse> {
286 log::debug!("Health check endpoint called");
287 rocket::serde::json::Json(ApiResponse {
288 status: "ok".to_string(),
289 message: ClusterStatusMessage {
290 node_roles: "unknown".to_string(),
291 cluster_nodes: vec![],
292 },
293 })
294}
295
296#[get("/cluster/status")]
307async fn cluster_status(
308 state: &rocket::State<Arc<RwLock<SharedState>>>,
309 cluster: &rocket::State<Arc<RwLock<ClusterManager>>>,
310) -> rocket::serde::json::Json<ApiResponse> {
311 log::debug!("Cluster status endpoint called");
312 let state = state.read().await;
313 let nodes = cluster.read().await;
314
315 let role = if state.is_leader {
316 "leader".to_string()
317 } else {
318 "follower".to_string()
319 };
320
321 log::info!("{}", format!("Current node role: {}", role).cyan());
322
323 let response = ApiResponse {
324 status: "ok".to_string(),
325 message: ClusterStatusMessage {
326 node_roles: role,
327 cluster_nodes: nodes.get_nodes().await,
328 },
329 };
330
331 rocket::serde::json::Json(response)
332}
333
334#[rocket::main]
339async fn main() -> Result<(), Box<dyn std::error::Error>> {
340 let port = SERVER_CONFIG.port;
342 println!(
343 "{}",
344 "╔═══════════════════════════════════════════════════════════════╗".bright_cyan()
345 );
346 println!(
347 "{}",
348 "║ OMNI ORCHESTRATOR SERVER STARTING ║".bright_cyan()
349 );
350 println!(
351 "{}",
352 "╚═══════════════════════════════════════════════════════════════╝".bright_cyan()
353 );
354 println!("{}", format!("⇒ Starting server on port {}", port).green());
355
356 Builder::new()
358 .filter_level(log::LevelFilter::Info)
359 .format(|buf, record| {
360 let _style = buf.default_level_style(record.level());
362 writeln!(buf, "{}: {}", record.level(), format!("{}", record.args()))
363 })
364 .init();
365
366 log::info!("{}", "Logger initialized successfully".green());
367
368 println!(
370 "{}",
371 "╔═══════════════════════════════════════════════════════════════╗".bright_blue()
372 );
373 println!(
374 "{}",
375 "║ Deployment Database Connection ║".bright_blue()
376 );
377 println!(
378 "{}",
379 "╚═══════════════════════════════════════════════════════════════╝".bright_blue()
380 );
381
382 let database_url = env::var("DATABASE_URL").unwrap_or_else(|_| {
384 dotenv::dotenv().ok();
385 env::var("DEFAULT_DATABASE_URL")
386 .unwrap_or_else(|_| "mysql://root:root@localhost:4001".to_string())
387 });
388
389 log::info!("{}", format!("Database URL: {}", database_url).blue());
391 let db_manager = Arc::new(DatabaseManager::new(&database_url).await?);
392
393 let pool = db_manager.get_main_pool();
395
396 println!(
398 "{}",
399 "╔═══════════════════════════════════════════════════════════════╗".bright_blue()
400 );
401 println!(
402 "{}",
403 "║ Platform Database Registration ║".bright_blue()
404 );
405 println!(
406 "{}",
407 "╚═══════════════════════════════════════════════════════════════╝".bright_blue()
408 );
409
410 let platforms = db_manager.get_all_platforms().await?;
412 log::info!("{}", format!("Found {} platforms", platforms.len()).blue());
413
414 for platform in &platforms {
415 log::info!(
416 "{}",
417 format!(
418 "Pre-initializing connection for platform: {}",
419 platform.name
420 )
421 .blue()
422 );
423 db_manager.get_platform_pool(&platform.name, platform.id.unwrap_or(0)).await?;
424 }
425
426 println!(
428 "{}",
429 "╔═══════════════════════════════════════════════════════════════╗".blue()
430 );
431 println!(
432 "{}",
433 "║ CLICKHOUSE CONNECTION ║".blue()
434 );
435 println!(
436 "{}",
437 "╚═══════════════════════════════════════════════════════════════╝".blue()
438 );
439 let clickhouse_url = env::var("CLICKHOUSE_URL").unwrap_or_else(|_| {
441 dotenv::dotenv().ok(); env::var("DEFAULT_CLICKHOUSE_URL").unwrap_or_else(|_| "http://localhost:8123".to_string())
443 });
444 log::info!("{}", format!("ClickHouse URL: {}", clickhouse_url).blue());
445 log::info!("{}", "Initializing ClickHouse connection...".blue());
446
447 let clickhouse_client = clickhouse::Client::default()
449 .with_url(&clickhouse_url)
450 .with_database("default")
451 .with_user("default")
452 .with_password("your_secure_password");
453
454 match clickhouse_client.query("SELECT 1").execute().await {
456 Ok(_) => log::info!("✓ ClickHouse connection test successful"),
457 Err(e) => {
458 log::error!("ClickHouse connection test failed: {:?}", e);
459 panic!("Cannot connect to ClickHouse");
460 }
461 }
462
463 log::info!("{}", "✓ ClickHouse connection established".green());
464 log::info!("{}", "✓ ClickHouse connection pool initialized".green());
465
466 log::info!("{}", "Loading schema files...".blue());
470 let schema_version =
471 schemas::v1::db::queries::metadata::get_meta_value(pool, "omni_schema_version")
472 .await
473 .unwrap_or_else(|_| "1".to_string());
474
475 let schema_path = format!("{}/sql/v{}/clickhouse_up.sql", PROJECT_ROOT, schema_version);
476 log::info!(
477 "{}",
478 format!("Loading schema from path: {}", schema_path).blue()
479 );
480
481 log::info!("{}", "Initializing ClickHouse schema...".blue());
483 match api::logging::init_clickhouse_db(&clickhouse_client, &schema_path).await {
484 Ok(_) => log::info!("{}", "✓ ClickHouse schema initialized".green()),
485 Err(e) => {
486 log::error!(
487 "{}",
488 format!("Failed to initialize ClickHouse schema: {:?}", e).red()
489 );
490 panic!("Failed to initialize ClickHouse schema");
491 }
492 };
493
494 println!(
496 "{}",
497 "╔═══════════════════════════════════════════════════════════════╗".bright_magenta()
498 );
499 println!(
500 "{}",
501 "║ CLUSTER MANAGEMENT ║".bright_magenta()
502 );
503 println!(
504 "{}",
505 "╚═══════════════════════════════════════════════════════════════╝".bright_magenta()
506 );
507
508 let node_id: Arc<str> =
510 format!("{}:{}", SERVER_CONFIG.address.clone(), SERVER_CONFIG.port).into();
511 log::info!("{}", format!("Node ID: {}", node_id).magenta());
512
513 let shared_state: Arc<RwLock<SharedState>> =
514 Arc::new(RwLock::new(SharedState::new(node_id.clone())));
515
516 log::info!("{}", "Starting peer discovery background task".magenta());
518 tokio::task::spawn({
519 let cluster_manager = CLUSTER_MANAGER.clone();
520 let server_config = SERVER_CONFIG.clone();
521 async move {
522 loop {
523 if let Err(e) = cluster_manager
524 .read()
525 .await
526 .discover_peers(&server_config, port)
527 .await
528 {
529 log::error!("{}", format!("Failed to discover peers: {e}").red());
530 }
531 tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await;
532 }
533 }
534 });
535
536 println!(
538 "{}",
539 "╔═══════════════════════════════════════════════════════════════╗".bright_yellow()
540 );
541 println!(
542 "{}",
543 "║ AUTOSCALER SETUP ║".bright_yellow()
544 );
545 println!(
546 "{}",
547 "╚═══════════════════════════════════════════════════════════════╝".bright_magenta()
548 );
549
550 log::info!(
552 "{}",
553 "Creating worker autoscaler with default policy".yellow()
554 );
555 let policy = create_default_cpu_memory_scaling_policy();
556 let mut autoscaler = WorkerAutoscaler::new(1, 1, policy);
557
558 log::info!("{}", "Adding cloud director (AWS/us-east-1)".yellow());
560 let cloud_director = Arc::new(CloudDirector::new(
561 "cloud-1".to_string(),
562 "aws".to_string(),
563 "us-east-1".to_string(),
564 ));
565 autoscaler.add_director(cloud_director);
566
567 log::info!("{}", "Setting up VM template for worker nodes".yellow());
569 let mut vm_template = VMTemplate::default();
570 vm_template.base_name = "omni-worker".to_string();
571 vm_template.config = VMConfig {
572 cpu: 2,
573 memory: 4096, storage: 80, options: HashMap::new(),
576 };
577 autoscaler.set_vm_template(vm_template);
578 log::info!("{}", "✓ Worker autoscaler configured".green());
579
580 log::info!("{}", "Starting worker autoscaler discovery tasks".yellow());
582 tokio::spawn({
583 let mut autoscaler = autoscaler;
584 async move {
585 log::debug!("Sleeping for 1.5 seconds before discovery...");
587 tokio::time::sleep(Duration::from_millis(1500)).await;
588 loop {
589 log::info!("{}", "Discovering nodes and VMs...".yellow());
590 if let Err(e) = autoscaler.discover_nodes().await {
591 log::error!("{}", format!("Node discovery error: {}", e).red());
592 }
593 if let Err(e) = autoscaler.discover_vms().await {
594 log::error!("{}", format!("VM discovery error: {}", e).red());
595 }
596 let metrics: HashMap<String, f32> = HashMap::new(); if let Err(e) = autoscaler.check_scaling(&metrics) {
598 log::error!("{}", format!("Worker scaling error: {}", e).red());
599 }
600 log::debug!("Sleeping autoscaling thread for 3 seconds...");
602 tokio::time::sleep(Duration::from_secs(3)).await;
603 }
604 }
605 });
606
607 log::info!(
609 "{}",
610 "Creating application autoscaler with default policy".yellow()
611 );
612 let app_policy = app_autoscaler::policy::create_default_cpu_memory_scaling_policy();
613 let app_autoscaler = app_autoscaler::app_autoscaler::AppAutoscaler::new(
614 1, 10, app_policy,
617 );
618 log::info!("{}", "✓ Application autoscaler configured".green());
619
620 log::info!(
622 "{}",
623 "Starting application autoscaler discovery tasks".yellow()
624 );
625 tokio::spawn({
626 let mut app_autoscaler = app_autoscaler;
627 async move {
628 log::debug!("Sleeping for 1.5 seconds before app autoscaler discovery...");
630 tokio::time::sleep(Duration::from_millis(1500)).await;
631 loop {
632 log::info!("{}", "Discovering app instances...".yellow());
633 if let Err(e) = app_autoscaler.discover_app_instances().await {
634 log::error!("{}", format!("App instance discovery error: {}", e).red());
635 }
636
637 let metrics: HashMap<String, f32> = HashMap::new(); if let Err(e) = app_autoscaler.check_scaling(&metrics) {
639 log::error!("{}", format!("App scaling error: {}", e).red());
640 }
641
642 log::debug!("Sleeping app autoscaling thread for 3 seconds...");
644 tokio::time::sleep(Duration::from_secs(3)).await;
645 }
646 }
647 });
648
649 println!(
651 "{}",
652 "╔═══════════════════════════════════════════════════════════════╗".bright_green()
653 );
654 println!(
655 "{}",
656 "║ LEADER ELECTION ║".bright_green()
657 );
658 println!(
659 "{}",
660 "╚═══════════════════════════════════════════════════════════════╝".bright_magenta()
661 );
662
663 log::info!("{}", "Initializing leader election process".green());
665 let _leader_election = LeaderElection::new(node_id, shared_state.clone());
666 log::info!("{}", "✓ Leader election initialized".green());
667
668 println!(
670 "{}",
671 "╔═══════════════════════════════════════════════════════════════╗".bright_cyan()
672 );
673 println!(
674 "{}",
675 "║ SERVER STARTUP ║".bright_cyan()
676 );
677 println!(
678 "{}",
679 "╚═══════════════════════════════════════════════════════════════╝".bright_magenta()
680 );
681
682 log::info!("{}", "Defining API routes".cyan());
684 let routes = vec![
685 (
686 "/",
687 routes![
688 health_check,
689 api::index::routes_ui,
690 cluster_status,
691 cors_preflight
692 ],
693 ),
694 ("/api/v1", api::routes()),
695 ];
696
697 let auth_config = AuthConfig {
698 jwt_secret: std::env::var("JWT_SECRET")
699 .expect("Environment variable JWT_SECRET must be set for secure operation."),
700 token_expiry_hours: std::env::var("TOKEN_EXPIRY_HOURS")
701 .unwrap_or_else(|_| "24".to_string())
702 .parse()
703 .expect("Invalid value for TOKEN_EXPIRY_HOURS"),
704 };
705
706 log::info!("{}", "Building Rocket instance".cyan());
708 let rocket_instance = rocket::build()
709 .configure(rocket::Config {
710 port,
711 address: std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
712 ..Default::default()
713 })
714 .manage(db_manager.clone())
716 .manage(pool.clone()) .manage(CLUSTER_MANAGER.clone())
719 .manage(clickhouse_client)
720 .manage(shared_state)
721 .manage(auth_config)
722 .attach(CORS); log::info!("{}", "Mounting API routes".cyan());
726 let rocket_with_routes = rocket_instance.mount_routes(routes);
727
728 api::index::collect_routes(&rocket_with_routes);
730
731 log::info!("{}", "🚀 LAUNCHING SERVER...".bright_cyan().bold());
733 let _rocket = rocket_with_routes.launch().await?;
734
735 Ok(())
736}