//! omni_orchestrator/autoscalar/mod.rs
use lighthouse::{
    policies, utils, CallbackContext, LighthouseConfig, LighthouseResult, MetricsProvider,
    ResourceMetrics, ScaleAction, ScalingExecutor, ScaleDirection, LighthouseEngine,
    LighthouseCallbacks, LighthouseHandle, ResourceConfig, ScalingPolicy, ScalingThreshold
};
use std::sync::Arc;
use std::collections::HashMap;
use async_trait::async_trait;
use colored::Colorize;

pub mod types;
13#[async_trait]
15impl MetricsProvider for types::OmniMetricsProvider {
16 async fn get_metrics(
17 &self,
18 resource_id: &str,
19 _context: &CallbackContext,
20 ) -> LighthouseResult<Option<ResourceMetrics>> {
21 let metrics = vec![
23 ("cpu_usage", 75.0),
24 ("memory_usage", 65.0),
25 ("disk_io", 120.0),
26 ("network_io", 200.0),
27 ];
28
29 Ok(Some(utils::multi_metrics(
30 resource_id,
31 "omni_application",
32 metrics,
33 )))
34 }
35}
36
37#[async_trait]
38impl ScalingExecutor for types::OmniScalar {
39 async fn execute_scale_action(
40 &self,
41 action: &ScaleAction,
42 _context: &CallbackContext,
43 ) -> LighthouseResult<bool> {
44 match action.direction {
45 ScaleDirection::Up => {
46 self.scale_deployment(&action.resource_id, action.scale_factor).await
47 .map_err(|e| lighthouse::LighthouseError::unexpected(e.to_string()))?;
48 println!("✅ Scaled up {}: {}", action.resource_id, action.reason);
49 }
50 ScaleDirection::Down => {
51 self.scale_deployment(&action.resource_id, action.scale_factor).await
52 .map_err(|e| lighthouse::LighthouseError::unexpected(e.to_string()))?;
53 println!("📉 Scaled down {}: {}", action.resource_id, action.reason);
54 }
55 ScaleDirection::Maintain => {
56 println!("➡️ Maintaining {}", action.resource_id);
57 }
58 }
59 Ok(true)
60 }
61
62 async fn is_safe_to_scale(
63 &self,
64 _action: &ScaleAction,
65 _context: &CallbackContext,
66 ) -> LighthouseResult<bool> {
67 let has_capacity = self.check_cluster_capacity().await
68 .map_err(|e| lighthouse::LighthouseError::unexpected(e.to_string()))?;
69
70 Ok(has_capacity && !self.is_maintenance_window())
71 }
72
73 async fn get_current_capacity(
74 &self,
75 _resource_id: &str,
76 _context: &CallbackContext,
77 ) -> LighthouseResult<Option<u32>> {
78 Ok(Some(3)) }
81}
82
83pub fn init() -> (LighthouseEngine, LighthouseHandle) {
85 log::info!("{}", "Initializing Lighthouse autoscaler...".yellow());
86
87 let config = LighthouseConfig::builder()
89 .evaluation_interval(30) .enable_logging(true)
91 .add_resource_config("omni_application", ResourceConfig {
92 resource_type: "omni_application".to_string(),
93 policies: vec![
94 ScalingPolicy {
96 name: "cpu-scaling".to_string(),
97 thresholds: vec![ScalingThreshold {
98 metric_name: "cpu_usage".to_string(),
99 scale_up_threshold: 80.0, scale_down_threshold: 30.0, scale_factor: 1.5, cooldown_seconds: 300, }],
104 min_capacity: Some(1),
105 max_capacity: Some(10),
106 enabled: true,
107 },
108 ScalingPolicy {
110 name: "memory-scaling".to_string(),
111 thresholds: vec![ScalingThreshold {
112 metric_name: "memory_usage".to_string(),
113 scale_up_threshold: 85.0,
114 scale_down_threshold: 40.0,
115 scale_factor: 1.3,
116 cooldown_seconds: 600, }],
118 min_capacity: Some(1),
119 max_capacity: Some(10),
120 enabled: true,
121 },
122 ],
123 default_policy: Some("cpu-scaling".to_string()),
124 settings: HashMap::new(),
125 })
126 .global_setting("environment", "production")
127 .global_setting("cluster_name", "omni-cluster")
128 .build();
129
130 let metrics_provider = Arc::new(types::OmniMetricsProvider::new());
132 let scaling_executor = Arc::new(types::OmniScalar::new());
133
134 let callbacks = LighthouseCallbacks::new(metrics_provider, scaling_executor);
135
136 let engine = LighthouseEngine::new(config, callbacks);
138 let handle = engine.handle();
139
140 log::info!("{}", "✅ Lighthouse autoscaler initialized successfully!".green());
141
142 (engine, handle)
143}
144
145pub async fn start_autoscaler() -> LighthouseHandle {
147 let (engine, handle) = init();
148
149 tokio::spawn(async move {
151 if let Err(e) = engine.start().await {
152 log::error!("Lighthouse engine failed: {}", e);
153 }
154 });
155
156 tokio::time::sleep(std::time::Duration::from_millis(100)).await;
158
159 log::info!("{}", "🚀 Lighthouse autoscaler started!".green());
160 handle
161}
162
#[cfg(test)]
mod example_usage {
    use super::*;

    /// End-to-end smoke test: start one engine, push single- and
    /// multi-metric updates, read status, and shut down cleanly.
    ///
    /// Drives exactly one engine via `start_autoscaler` so the final
    /// `shutdown` tears down everything that was started. (Starting a
    /// second engine and shadowing the first handle would leak the first
    /// engine's background task with no way to stop it.)
    #[tokio::test]
    async fn example_usage() {
        let handle = start_autoscaler().await;

        // One metric for one resource...
        handle.update_metrics(utils::single_metric(
            "my-app-instance-1",
            "omni_application",
            "cpu_usage",
            85.0,
        )).await.unwrap();

        // ...then a batch update for the same resource.
        handle.update_metrics(utils::multi_metrics(
            "my-app-instance-1",
            "omni_application",
            vec![
                ("cpu_usage", 85.0),
                ("memory_usage", 70.0),
            ],
        )).await.unwrap();

        let status = handle.get_status().await.unwrap();
        println!("Engine status: {:?}", status);

        handle.shutdown().await.unwrap();
    }
}