1mod autoscalar;
20mod cluster;
21mod config;
22mod db_manager;
23mod leader;
24mod network;
25mod schemas;
26mod state;
27
28use anyhow::anyhow;
33use anyhow::Result;
34use clickhouse;
35use colored::Colorize;
36use core::panic;
37use env_logger::Builder;
38use lazy_static::lazy_static;
39use libomni::types::db::auth::AuthConfig;
40use reqwest::Client;
41use rocket::{
42 fairing::{Fairing, Info, Kind},
43 http::Header,
44 Build, Request, Response, Rocket,
45};
46use serde::{Deserialize, Serialize};
47use std::time::Duration;
48use std::{env, sync::Arc};
49use tokio::sync::RwLock;
50use std::io::Write;
51
52use crate::cluster::{ClusterManager, NodeInfo};
54use crate::config::ServerConfig;
55use crate::config::SERVER_CONFIG;
56use crate::db_manager::DatabaseManager;
57use crate::leader::LeaderElection;
58use crate::state::SharedState; use libomni::types::db::v1 as types;
61use lighthouse::{
62 policies, LighthouseConfig, ResourceConfig, ScalingPolicy, ScalingThreshold
63};
64use schemas::v1::api;
65
66#[allow(unused_imports)]
69#[macro_use]
70extern crate rocket;
71
/// Path of the crate's manifest directory, baked in at compile time from the
/// `CARGO_MANIFEST_DIR` environment variable that Cargo sets during the build.
///
/// `main` uses it as the base directory when building the path to the
/// versioned ClickHouse schema file (`<root>/sql/v<N>/clickhouse_up.sql`).
pub static PROJECT_ROOT: &str = env!("CARGO_MANIFEST_DIR");
73
74pub struct CORS;
79
80#[rocket::async_trait]
81impl Fairing for CORS {
82 fn info(&self) -> Info {
83 Info {
84 name: "Add comprehensive CORS headers to responses",
85 kind: Kind::Response,
86 }
87 }
88
89 async fn on_response<'r>(&self, _request: &'r Request<'_>, response: &mut Response<'r>) {
90 response.set_header(Header::new("Access-Control-Allow-Origin", "*"));
91 response.set_header(Header::new(
92 "Access-Control-Allow-Methods",
93 "GET, POST, PUT, PATCH, DELETE, OPTIONS, HEAD",
94 ));
95 response.set_header(Header::new(
96 "Access-Control-Allow-Headers",
97 "Authorization, Content-Type, Accept, Origin, X-Requested-With",
98 ));
99 response.set_header(Header::new("Access-Control-Allow-Credentials", "true"));
100 response.set_header(Header::new("Access-Control-Max-Age", "86400")); }
102}
103
/// Catch-all handler for `OPTIONS` requests (CORS preflight).
///
/// The `/<_..>` pattern matches any path below the mount point, and the
/// handler answers with an empty body. The CORS headers themselves are added
/// by the [`CORS`] fairing, not here.
#[options("/<_..>")]
fn cors_preflight() -> &'static str {
    ""
}
109
110trait RocketExt {
115 fn mount_routes(self, routes: Vec<(&'static str, Vec<rocket::Route>)>) -> Self;
121}
122
123impl RocketExt for Rocket<Build> {
124 fn mount_routes(self, routes: Vec<(&'static str, Vec<rocket::Route>)>) -> Self {
125 let mut rocket = self;
126 for (path, routes) in routes {
127 log::info!("{}", format!("Mounting routes at {}", path).green());
128 rocket = rocket.mount(path, routes);
129 }
130 rocket
131 }
132}
133
134lazy_static! {
140 static ref CLUSTER_MANAGER: Arc<RwLock<ClusterManager>> = {
141 let state = format!("{}:{}", SERVER_CONFIG.address, SERVER_CONFIG.port);
142
143 let bind1 = &state;
144 let bind = bind1.clone();
145 let state = SharedState::new(bind.into());
146 let shared_state = Arc::new(RwLock::new(state));
147 Arc::new(RwLock::new(ClusterManager::new(shared_state)))
148 };
149}
150
/// Payload carried inside [`ApiResponse`]: this node's role plus the cluster
/// members it currently knows about.
#[derive(Debug, Serialize, Deserialize)]
struct ClusterStatusMessage {
    /// Role of the responding node. The handlers in this file emit
    /// `"leader"`, `"follower"`, or `"unknown"` (health check).
    node_roles: String,
    /// Nodes reported by the cluster manager; the health check always sends
    /// an empty list.
    cluster_nodes: Vec<NodeInfo>,
}
162
/// JSON envelope returned by the `/health` and `/cluster/status` endpoints.
#[derive(Serialize, Deserialize)]
struct ApiResponse {
    /// Outcome marker; both handlers in this file always set it to `"ok"`.
    status: String,
    /// Role and membership details for the responding node.
    message: ClusterStatusMessage,
}
171
172impl ClusterManager {
176 pub async fn discover_peers(&self, config: &ServerConfig, my_port: u16) -> Result<()> {
187 let client = Client::new();
188 log::info!("{}", "Starting peer discovery...".cyan());
189
190 for instance in &config.instances {
191 let string = format!("{:#?}", instance);
192 log::info!("{}", format!("Discovered: {}", string).blue().bold());
193 if instance.port == my_port {
194 log::debug!("Skipping self-connection at port {}", my_port);
195 continue;
196 }
197
198 let node_address: Arc<str> = format!("{}:{}", instance.address, instance.port).into();
199 let node_uri = format!("{}", node_address);
200
201 match self.connect_to_peer(&client, &node_uri.clone()).await {
202 Ok(_) => log::info!(
203 "{}",
204 format!("Successfully connected to peer: {}", node_uri).green()
205 ),
206 Err(e) => {
207 log::warn!(
208 "{}",
209 format!("Failed to connect to peer: {} {}", node_uri, e).yellow()
210 );
211 self.remove_node(node_uri.into()).await;
212 }
213 }
214 }
215
216 log::info!("{}", "Peer discovery completed".cyan());
217 Ok(())
218 }
219
220 async fn connect_to_peer(&self, client: &Client, node_address: &str) -> Result<()> {
231 let health_url = format!("{}/health", node_address);
232 log::debug!("Checking health at: {}", health_url);
233
234 let response = client
235 .get(&health_url)
236 .timeout(Duration::from_secs(5))
237 .send()
238 .await?;
239
240 if response.status().is_success() {
241 let port = node_address
242 .split(':')
243 .next_back()
244 .unwrap_or("80")
245 .parse::<u16>()
246 .unwrap_or(80);
247
248 let node_info = NodeInfo {
249 id: node_address.into(),
250 address: node_address.into(),
251 port,
252 };
253
254 self.register_node(node_info).await;
255 log::debug!("Node registered: {}", node_address);
256 Ok(())
257 } else {
258 Err(anyhow!("Node health check failed"))
259 }
260 }
261}
262
263#[get("/health")]
272async fn health_check() -> rocket::serde::json::Json<ApiResponse> {
273 log::debug!("Health check endpoint called");
274 rocket::serde::json::Json(ApiResponse {
275 status: "ok".to_string(),
276 message: ClusterStatusMessage {
277 node_roles: "unknown".to_string(),
278 cluster_nodes: vec![],
279 },
280 })
281}
282
283#[get("/cluster/status")]
294async fn cluster_status(
295 state: &rocket::State<Arc<RwLock<SharedState>>>,
296 cluster: &rocket::State<Arc<RwLock<ClusterManager>>>,
297) -> rocket::serde::json::Json<ApiResponse> {
298 log::debug!("Cluster status endpoint called");
299 let state = state.read().await;
300 let nodes = cluster.read().await;
301
302 let role = if state.is_leader {
303 "leader".to_string()
304 } else {
305 "follower".to_string()
306 };
307
308 log::info!("{}", format!("Current node role: {}", role).cyan());
309
310 let response = ApiResponse {
311 status: "ok".to_string(),
312 message: ClusterStatusMessage {
313 node_roles: role,
314 cluster_nodes: nodes.get_nodes().await,
315 },
316 };
317
318 rocket::serde::json::Json(response)
319}
320
321#[rocket::main]
326async fn main() -> Result<(), Box<dyn std::error::Error>> {
327 let port = SERVER_CONFIG.port;
329 println!(
330 "{}",
331 "╔═══════════════════════════════════════════════════════════════╗".bright_cyan()
332 );
333 println!(
334 "{}",
335 "║ OMNI ORCHESTRATOR SERVER STARTING ║".bright_cyan()
336 );
337 println!(
338 "{}",
339 "╚═══════════════════════════════════════════════════════════════╝".bright_cyan()
340 );
341 println!("{}", format!("⇒ Starting server on port {}", port).green());
342
343 Builder::new()
345 .filter_level(log::LevelFilter::Info)
346 .format(|buf, record| {
347 let _style = buf.default_level_style(record.level());
349 writeln!(buf, "{}: {}", record.level(), format!("{}", record.args()))
350 })
351 .init();
352
353 log::info!("{}", "Logger initialized successfully".green());
354
355 println!(
357 "{}",
358 "╔═══════════════════════════════════════════════════════════════╗".bright_blue()
359 );
360 println!(
361 "{}",
362 "║ Deployment Database Connection ║".bright_blue()
363 );
364 println!(
365 "{}",
366 "╚═══════════════════════════════════════════════════════════════╝".bright_blue()
367 );
368
369 let database_url = env::var("DATABASE_URL").unwrap_or_else(|_| {
371 dotenv::dotenv().ok();
372 env::var("DEFAULT_DATABASE_URL")
373 .unwrap_or_else(|_| "mysql://root:root@localhost:4001".to_string())
374 });
375
376 log::info!("{}", format!("Database URL: {}", database_url).blue());
378 let db_manager = Arc::new(DatabaseManager::new(&database_url).await?);
379
380 let pool = db_manager.get_main_pool();
382
383 println!(
385 "{}",
386 "╔═══════════════════════════════════════════════════════════════╗".bright_blue()
387 );
388 println!(
389 "{}",
390 "║ Platform Database Registration ║".bright_blue()
391 );
392 println!(
393 "{}",
394 "╚═══════════════════════════════════════════════════════════════╝".bright_blue()
395 );
396
397 let platforms = db_manager.get_all_platforms().await?;
399 log::info!("{}", format!("Found {} platforms", platforms.len()).blue());
400
401 for platform in &platforms {
402 log::info!(
403 "{}",
404 format!(
405 "Pre-initializing connection for platform: {}",
406 platform.name
407 )
408 .blue()
409 );
410 db_manager
411 .get_platform_pool(&platform.name, platform.id.unwrap_or(0))
412 .await?;
413 }
414
415 println!(
417 "{}",
418 "╔═══════════════════════════════════════════════════════════════╗".blue()
419 );
420 println!(
421 "{}",
422 "║ CLICKHOUSE CONNECTION ║".blue()
423 );
424 println!(
425 "{}",
426 "╚═══════════════════════════════════════════════════════════════╝".blue()
427 );
428 let clickhouse_url = env::var("CLICKHOUSE_URL").unwrap_or_else(|_| {
430 dotenv::dotenv().ok(); env::var("DEFAULT_CLICKHOUSE_URL").unwrap_or_else(|_| "http://localhost:8123".to_string())
432 });
433 log::info!("{}", format!("ClickHouse URL: {}", clickhouse_url).blue());
434 log::info!("{}", "Initializing ClickHouse connection...".blue());
435
436 let clickhouse_client = clickhouse::Client::default()
438 .with_url(&clickhouse_url)
439 .with_database("default")
440 .with_user("default")
441 .with_password("your_secure_password");
442
443 match clickhouse_client.query("SELECT 1").execute().await {
445 Ok(_) => log::info!("✓ ClickHouse connection test successful"),
446 Err(e) => {
447 log::error!("ClickHouse connection test failed: {:?}", e);
448 panic!("Cannot connect to ClickHouse");
449 }
450 }
451
452 log::info!("{}", "✓ ClickHouse connection established".green());
453 log::info!("{}", "✓ ClickHouse connection pool initialized".green());
454
455 log::info!("{}", "Loading schema files...".blue());
459 let schema_version =
460 schemas::v1::db::queries::metadata::get_meta_value(pool, "omni_schema_version")
461 .await
462 .unwrap_or_else(|_| "1".to_string());
463
464 let schema_path = format!("{}/sql/v{}/clickhouse_up.sql", PROJECT_ROOT, schema_version);
465 log::info!(
466 "{}",
467 format!("Loading schema from path: {}", schema_path).blue()
468 );
469
470 log::info!("{}", "Initializing ClickHouse schema...".blue());
472 match api::logging::init_clickhouse_db(&clickhouse_client, &schema_path).await {
473 Ok(_) => log::info!("{}", "✓ ClickHouse schema initialized".green()),
474 Err(e) => {
475 log::error!(
476 "{}",
477 format!("Failed to initialize ClickHouse schema: {:?}", e).red()
478 );
479 panic!("Failed to initialize ClickHouse schema");
480 }
481 };
482
483 println!(
485 "{}",
486 "╔═══════════════════════════════════════════════════════════════╗".bright_magenta()
487 );
488 println!(
489 "{}",
490 "║ CLUSTER MANAGEMENT ║".bright_magenta()
491 );
492 println!(
493 "{}",
494 "╚═══════════════════════════════════════════════════════════════╝".bright_magenta()
495 );
496
497 let node_id: Arc<str> =
499 format!("{}:{}", SERVER_CONFIG.address.clone(), SERVER_CONFIG.port).into();
500 log::info!("{}", format!("Node ID: {}", node_id).magenta());
501
502 let shared_state: Arc<RwLock<SharedState>> =
503 Arc::new(RwLock::new(SharedState::new(node_id.clone())));
504
505 log::info!("{}", "Starting peer discovery background task".magenta());
507 tokio::task::spawn({
508 let cluster_manager = CLUSTER_MANAGER.clone();
509 let server_config = SERVER_CONFIG.clone();
510 async move {
511 loop {
512 if let Err(e) = cluster_manager
513 .read()
514 .await
515 .discover_peers(&server_config, port)
516 .await
517 {
518 log::error!("{}", format!("Failed to discover peers: {e}").red());
519 }
520 tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await;
521 }
522 }
523 });
524
525 println!(
527 "{}",
528 "╔═══════════════════════════════════════════════════════════════╗".bright_yellow()
529 );
530 println!(
531 "{}",
532 "║ AUTOSCALER SETUP ║".bright_yellow()
533 );
534 println!(
535 "{}",
536 "╚═══════════════════════════════════════════════════════════════╝".bright_magenta()
537 );
538
539 let (engine, handle) = autoscalar::init();
541 log::info!("{}", "Autoscaler engine initialized successfully".yellow());
542
543 let config = LighthouseConfig::builder()
544 .evaluation_interval(30) .add_resource_config(
546 "web-tier",
547 ResourceConfig {
548 resource_type: "kubernetes-deployment".to_string(),
549 policies: vec![
550 policies::multi_metric_policy(
552 "web-scaling",
553 (75.0, 25.0), (80.0, 30.0), 1.5, 300, ),
558 ScalingPolicy {
560 name: "request-rate".to_string(),
561 thresholds: vec![ScalingThreshold {
562 metric_name: "requests_per_second".to_string(),
563 scale_up_threshold: 1000.0,
564 scale_down_threshold: 200.0,
565 scale_factor: 2.0,
566 cooldown_seconds: 180,
567 }],
568 min_capacity: Some(2),
569 max_capacity: Some(50),
570 enabled: true,
571 },
572 ],
573 default_policy: Some("web-scaling".to_string()),
574 settings: [
575 ("cluster_name".to_string(), "prod-us-west".to_string()),
576 ("namespace".to_string(), "web-services".to_string()),
577 ]
578 .into(),
579 },
580 )
581 .global_setting("environment", "production")
582 .enable_logging(true)
583 .build();
584
585 handle.update_config(config).await?;
587
588 println!(
590 "{}",
591 "╔═══════════════════════════════════════════════════════════════╗".bright_green()
592 );
593 println!(
594 "{}",
595 "║ LEADER ELECTION ║".bright_green()
596 );
597 println!(
598 "{}",
599 "╚═══════════════════════════════════════════════════════════════╝".bright_magenta()
600 );
601
602 log::info!("{}", "Initializing leader election process".green());
604 let _leader_election = LeaderElection::new(node_id, shared_state.clone());
605 log::info!("{}", "✓ Leader election initialized".green());
606
607 println!(
609 "{}",
610 "╔═══════════════════════════════════════════════════════════════╗".bright_cyan()
611 );
612 println!(
613 "{}",
614 "║ SERVER STARTUP ║".bright_cyan()
615 );
616 println!(
617 "{}",
618 "╚═══════════════════════════════════════════════════════════════╝".bright_magenta()
619 );
620
621 log::info!("{}", "Defining API routes".cyan());
623 let routes = vec![
624 (
625 "/",
626 routes![
627 health_check,
628 api::index::routes_ui,
629 cluster_status,
630 cors_preflight
631 ],
632 ),
633 ("/api/v1", api::routes()),
634 ];
635
636 let auth_config = AuthConfig {
637 jwt_secret: std::env::var("JWT_SECRET")
638 .expect("Environment variable JWT_SECRET must be set for secure operation."),
639 token_expiry_hours: std::env::var("TOKEN_EXPIRY_HOURS")
640 .unwrap_or_else(|_| "24".to_string())
641 .parse()
642 .expect("Invalid value for TOKEN_EXPIRY_HOURS"),
643 };
644
645 log::info!("{}", "Building Rocket instance".cyan());
647 let rocket_instance = rocket::build()
648 .configure(rocket::Config {
649 port,
650 address: std::net::IpAddr::V4(std::net::Ipv4Addr::new(0, 0, 0, 0)),
651 ..Default::default()
652 })
653 .manage(db_manager.clone())
655 .manage(pool.clone()) .manage(CLUSTER_MANAGER.clone())
658 .manage(clickhouse_client)
659 .manage(shared_state)
660 .manage(auth_config)
661 .attach(CORS); log::info!("{}", "Mounting API routes".cyan());
665 let rocket_with_routes = rocket_instance.mount_routes(routes);
666
667 api::index::collect_routes(&rocket_with_routes);
669
670 log::info!("{}", "🚀 LAUNCHING SERVER...".bright_cyan().bold());
672 let _rocket = rocket_with_routes.launch().await?;
673
674 Ok(())
675}