omni_orchestrator/autoscalar/mod.rs

use lighthouse::{
    policies, utils, CallbackContext, LighthouseConfig, LighthouseResult, MetricsProvider,
    ResourceMetrics, ScaleAction, ScalingExecutor, ScaleDirection, LighthouseEngine,
    LighthouseCallbacks, LighthouseHandle, ResourceConfig, ScalingPolicy, ScalingThreshold,
};
use std::sync::Arc;
use std::collections::HashMap;
use async_trait::async_trait;
use colored::Colorize;

pub mod types;

// Trait implementations live at module scope (not inside functions!).
#[async_trait]
impl MetricsProvider for types::OmniMetricsProvider {
    async fn get_metrics(
        &self,
        resource_id: &str,
        _context: &CallbackContext,
    ) -> LighthouseResult<Option<ResourceMetrics>> {
        // Placeholder values standing in for real metrics collection;
        // see the hedged sketch below this impl for one way to gather live data.
        let metrics = vec![
            ("cpu_usage", 75.0),
            ("memory_usage", 65.0),
            ("disk_io", 120.0),
            ("network_io", 200.0),
        ];

        Ok(Some(utils::multi_metrics(
            resource_id,
            "omni_application",
            metrics,
        )))
    }
}
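
// Hedged sketch, not a lighthouse API: the metrics above are hardcoded
// placeholders. Real collection could look like the following, assuming a
// recent `sysinfo` crate is added as a dependency (verify the exact method
// names against the version pinned in Cargo.toml).
#[allow(dead_code)]
fn collect_host_metrics() -> Vec<(&'static str, f64)> {
    use sysinfo::System;

    // Snapshot the host and refresh all statistics before reading them.
    let mut sys = System::new_all();
    sys.refresh_all();

    vec![
        ("cpu_usage", sys.global_cpu_usage() as f64),
        ("memory_usage", 100.0 * sys.used_memory() as f64 / sys.total_memory() as f64),
    ]
}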

#[async_trait]
impl ScalingExecutor for types::OmniScalar {
    async fn execute_scale_action(
        &self,
        action: &ScaleAction,
        _context: &CallbackContext,
    ) -> LighthouseResult<bool> {
        match action.direction {
            ScaleDirection::Up => {
                self.scale_deployment(&action.resource_id, action.scale_factor)
                    .await
                    .map_err(|e| lighthouse::LighthouseError::unexpected(e.to_string()))?;
                println!("✅ Scaled up {}: {}", action.resource_id, action.reason);
            }
            ScaleDirection::Down => {
                self.scale_deployment(&action.resource_id, action.scale_factor)
                    .await
                    .map_err(|e| lighthouse::LighthouseError::unexpected(e.to_string()))?;
                println!("📉 Scaled down {}: {}", action.resource_id, action.reason);
            }
            ScaleDirection::Maintain => {
                println!("➡️ Maintaining {}", action.resource_id);
            }
        }
        Ok(true)
    }

    async fn is_safe_to_scale(
        &self,
        _action: &ScaleAction,
        _context: &CallbackContext,
    ) -> LighthouseResult<bool> {
        let has_capacity = self
            .check_cluster_capacity()
            .await
            .map_err(|e| lighthouse::LighthouseError::unexpected(e.to_string()))?;

        Ok(has_capacity && !self.is_maintenance_window())
    }

    async fn get_current_capacity(
        &self,
        _resource_id: &str,
        _context: &CallbackContext,
    ) -> LighthouseResult<Option<u32>> {
        // Return the current instance count, or None if unknown.
        Ok(Some(3)) // Example: currently running 3 instances
    }
}
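
// Illustrative helper, not part of lighthouse: one plausible way a
// `scale_factor` could be turned into a concrete replica count inside
// `scale_deployment`, clamped to the policy's capacity bounds.
#[allow(dead_code)]
fn target_replicas(current: u32, factor: f64, min: u32, max: u32) -> u32 {
    // e.g. target_replicas(3, 1.5, 1, 10) == 5
    ((current as f64 * factor).round() as u32).clamp(min, max)
}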

// The main initialization function: builds the configured engine and a
// handle for driving it.
pub fn init() -> (LighthouseEngine, LighthouseHandle) {
    log::info!("{}", "Initializing Lighthouse autoscaler...".yellow());

    // 1. Create the configuration with concrete values.
    let config = LighthouseConfig::builder()
        .evaluation_interval(30) // Check every 30 seconds
        .enable_logging(true)
        .add_resource_config("omni_application", ResourceConfig {
            resource_type: "omni_application".to_string(),
            policies: vec![
                // CPU-based scaling policy
                ScalingPolicy {
                    name: "cpu-scaling".to_string(),
                    thresholds: vec![ScalingThreshold {
                        metric_name: "cpu_usage".to_string(),
                        scale_up_threshold: 80.0,   // Scale up when CPU > 80%
                        scale_down_threshold: 30.0, // Scale down when CPU < 30%
                        scale_factor: 1.5,          // Scale by 50%
                        cooldown_seconds: 300,      // Wait 5 minutes between actions
                    }],
                    min_capacity: Some(1),
                    max_capacity: Some(10),
                    enabled: true,
                },
                // Memory-based scaling policy
                ScalingPolicy {
                    name: "memory-scaling".to_string(),
                    thresholds: vec![ScalingThreshold {
                        metric_name: "memory_usage".to_string(),
                        scale_up_threshold: 85.0,
                        scale_down_threshold: 40.0,
                        scale_factor: 1.3,
                        cooldown_seconds: 600, // 10-minute cooldown for memory
                    }],
                    min_capacity: Some(1),
                    max_capacity: Some(10),
                    enabled: true,
                },
            ],
            default_policy: Some("cpu-scaling".to_string()),
            settings: HashMap::new(),
        })
        .global_setting("environment", "production")
        .global_setting("cluster_name", "omni-cluster")
        .build();

    // 2. Create the callback implementations.
    let metrics_provider = Arc::new(types::OmniMetricsProvider::new());
    let scaling_executor = Arc::new(types::OmniScalar::new());

    let callbacks = LighthouseCallbacks::new(metrics_provider, scaling_executor);

    // 3. Create the engine and a handle to it.
    let engine = LighthouseEngine::new(config, callbacks);
    let handle = engine.handle();

    log::info!("{}", "✅ Lighthouse autoscaler initialized successfully!".green());

    (engine, handle)
}
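
// Worked example of the cpu-scaling policy above, under one plausible reading
// of the threshold semantics: 3 replicas reporting 85% CPU exceed the 80%
// scale-up threshold, so capacity grows by the 1.5 factor (3 -> ~5 replicas,
// never above max_capacity 10), and no further action fires during the
// 300-second cooldown.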

// Helper that starts the engine in a background task and returns its handle.
pub async fn start_autoscaler() -> LighthouseHandle {
    let (engine, handle) = init();

    // Run the engine in a background task.
    tokio::spawn(async move {
        if let Err(e) = engine.start().await {
            log::error!("Lighthouse engine failed: {}", e);
        }
    });

    // Give the engine a moment to start.
    tokio::time::sleep(std::time::Duration::from_millis(100)).await;

    log::info!("{}", "🚀 Lighthouse autoscaler started!".green());
    handle
}
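
// Hedged convenience sketch: run the autoscaler until Ctrl-C, then stop it
// through the handle (the same `shutdown()` call used in the example below).
// Assumes tokio's `signal` feature is enabled in Cargo.toml.
#[allow(dead_code)]
pub async fn run_until_ctrl_c() {
    let handle = start_autoscaler().await;
    tokio::signal::ctrl_c().await.expect("failed to listen for Ctrl-C");
    if let Err(e) = handle.shutdown().await {
        log::error!("Error during Lighthouse shutdown: {}", e);
    }
}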

// Usage example:
#[cfg(test)]
mod example_usage {
    use super::*;

    #[tokio::test]
    async fn example_usage() {
        // Option 1: Manual control
        let (engine, handle) = init();

        // Start the engine in the background
        tokio::spawn(async move {
            engine.start().await.unwrap();
        });

        // Send some metrics
        handle.update_metrics(utils::single_metric(
            "my-app-instance-1",
            "omni_application",
            "cpu_usage",
            85.0, // High CPU should trigger a scale-up
        )).await.unwrap();

        // Option 2: Simple start (recommended)
        let handle = start_autoscaler().await;

        // The engine is now running and will automatically scale based on
        // incoming metrics. You can still send metrics manually:
        handle.update_metrics(utils::multi_metrics(
            "my-app-instance-1",
            "omni_application",
            vec![
                ("cpu_usage", 85.0),
                ("memory_usage", 70.0),
            ],
        )).await.unwrap();

        // Get the current status
        let status = handle.get_status().await.unwrap();
        println!("Engine status: {:?}", status);

        // Shut down when done
        handle.shutdown().await.unwrap();
    }
}
207}