lighthouse/
engine.rs

1// src/engine.rs
2
3use std::collections::HashMap;
4use std::sync::Arc;
5use std::time::{SystemTime, UNIX_EPOCH};
6use tokio::sync::{mpsc, RwLock};
7use tokio::time::{interval, Duration};
8use tracing::{debug, error, info, warn};
9
10use crate::callbacks::{CallbackContext, LighthouseCallbacks};
11use crate::error::{LighthouseError, LighthouseResult};
12use crate::types::{
13    LighthouseConfig, ResourceId, ResourceMetrics, ScaleAction, ScaleDirection, ScalingPolicy,
14};
15
/// Commands that can be sent to the lighthouse engine
#[derive(Debug)]
pub enum EngineCommand {
    /// Add or update metrics for a resource
    UpdateMetrics(ResourceMetrics),
    /// Update the configuration
    UpdateConfig(LighthouseConfig),
    /// Request scaling recommendation for a resource
    GetRecommendation {
        resource_id: ResourceId,
        /// One-shot channel the engine replies on; `None` means no
        /// recommendation (no metrics, no matching policy, or cooldown).
        response: tokio::sync::oneshot::Sender<LighthouseResult<Option<ScaleAction>>>,
    },
    /// Get current engine status
    GetStatus {
        /// One-shot channel the engine replies on with a status snapshot.
        response: tokio::sync::oneshot::Sender<EngineStatus>,
    },
    /// Shutdown the engine
    Shutdown,
}
35
/// Status information about the lighthouse engine
#[derive(Debug, Clone)]
pub struct EngineStatus {
    /// True while the engine's main loop is running.
    pub is_running: bool,
    /// Number of resources currently held in the metrics cache.
    pub resources_tracked: usize,
    /// Count of scaling actions successfully executed so far.
    pub total_recommendations: u64,
    /// Unix timestamp (seconds) of the most recent periodic evaluation,
    /// or `None` if no evaluation pass has happened yet.
    pub last_evaluation: Option<u64>,
}
44
/// Tracks cooldown periods for resources to prevent flapping
#[derive(Debug)]
struct CooldownTracker {
    // Maps resource id -> Unix timestamp (seconds) of its last scale action.
    last_scale_time: HashMap<ResourceId, u64>,
}
50
51impl CooldownTracker {
52    fn new() -> Self {
53        Self {
54            last_scale_time: HashMap::new(),
55        }
56    }
57
58    fn is_cooled_down(&self, resource_id: &ResourceId, cooldown_seconds: u64) -> bool {
59        let now = current_timestamp();
60        match self.last_scale_time.get(resource_id) {
61            Some(last_time) => now >= last_time + cooldown_seconds,
62            None => true, // Never scaled before
63        }
64    }
65
66    fn record_scale_action(&mut self, resource_id: &ResourceId) {
67        self.last_scale_time.insert(resource_id.clone(), current_timestamp());
68    }
69}
70
/// The main lighthouse autoscaling engine
pub struct LighthouseEngine {
    /// Live configuration; replaceable at runtime via `UpdateConfig`.
    config: Arc<RwLock<LighthouseConfig>>,
    /// User-supplied hooks: metrics validation, scaling execution, observers.
    callbacks: LighthouseCallbacks,
    /// Sender half cloned into every `LighthouseHandle`.
    command_tx: mpsc::UnboundedSender<EngineCommand>,
    /// Receiver half; taken (set to `None`) when `start()` runs, so a second
    /// `start()` can be detected and rejected.
    command_rx: Option<mpsc::UnboundedReceiver<EngineCommand>>,
    /// Shared status snapshot served to `GetStatus` requests.
    status: Arc<RwLock<EngineStatus>>,
    /// Latest validated metrics per resource.
    metrics_cache: Arc<RwLock<HashMap<ResourceId, ResourceMetrics>>>,
    /// Per-resource cooldown bookkeeping to prevent flapping.
    cooldown_tracker: Arc<RwLock<CooldownTracker>>,
}
81
82impl LighthouseEngine {
83    /// Create a new lighthouse engine
84    pub fn new(config: LighthouseConfig, callbacks: LighthouseCallbacks) -> Self {
85        let (command_tx, command_rx) = mpsc::unbounded_channel();
86
87        Self {
88            config: Arc::new(RwLock::new(config)),
89            callbacks,
90            command_tx,
91            command_rx: Some(command_rx),
92            status: Arc::new(RwLock::new(EngineStatus {
93                is_running: false,
94                resources_tracked: 0,
95                total_recommendations: 0,
96                last_evaluation: None,
97            })),
98            metrics_cache: Arc::new(RwLock::new(HashMap::new())),
99            cooldown_tracker: Arc::new(RwLock::new(CooldownTracker::new())),
100        }
101    }
102
103    /// Get a handle to send commands to the engine
104    pub fn handle(&self) -> LighthouseHandle {
105        LighthouseHandle {
106            command_tx: self.command_tx.clone(),
107        }
108    }
109
110    /// Start the lighthouse engine (consumes self)
111    pub async fn start(mut self) -> LighthouseResult<()> {
112        let mut command_rx = self.command_rx.take()
113            .ok_or_else(|| LighthouseError::engine_not_running("Engine already started"))?;
114
115        // Update status
116        {
117            let mut status = self.status.write().await;
118            status.is_running = true;
119        }
120
121        info!("Lighthouse engine starting...");
122
123        // Create evaluation timer
124        let config = self.config.read().await;
125        let interval_duration = Duration::from_secs(config.evaluation_interval_seconds);
126        drop(config);
127
128        let mut evaluation_timer = interval(interval_duration);
129
130        // Main engine loop
131        loop {
132            tokio::select! {
133                // Handle commands
134                command = command_rx.recv() => {
135                    match command {
136                        Some(cmd) => {
137                            if let Err(e) = self.handle_command(cmd).await {
138                                error!("Error handling command: {}", e);
139                            }
140                        }
141                        None => {
142                            info!("Command channel closed, shutting down engine");
143                            break;
144                        }
145                    }
146                }
147
148                // Periodic evaluation
149                _ = evaluation_timer.tick() => {
150                    if let Err(e) = self.evaluate_all_resources().await {
151                        error!("Error during evaluation: {}", e);
152                    }
153                }
154            }
155        }
156
157        // Update status
158        {
159            let mut status = self.status.write().await;
160            status.is_running = false;
161        }
162
163        info!("Lighthouse engine stopped");
164        Ok(())
165    }
166
167    /// Handle incoming commands
168    async fn handle_command(&self, command: EngineCommand) -> LighthouseResult<()> {
169        match command {
170            EngineCommand::UpdateMetrics(metrics) => {
171                self.update_metrics(metrics).await?;
172            }
173            EngineCommand::UpdateConfig(new_config) => {
174                self.update_config(new_config).await?;
175            }
176            EngineCommand::GetRecommendation { resource_id, response } => {
177                let recommendation = self.get_recommendation(&resource_id).await;
178                let _ = response.send(recommendation); // Ignore send errors
179            }
180            EngineCommand::GetStatus { response } => {
181                let status = self.status.read().await.clone();
182                let _ = response.send(status); // Ignore send errors
183            }
184            EngineCommand::Shutdown => {
185                info!("Shutdown command received");
186                return Err(LighthouseError::engine_not_running("Shutdown requested"));
187            }
188        }
189        Ok(())
190    }
191
192    /// Update metrics for a resource
193    async fn update_metrics(&self, metrics: ResourceMetrics) -> LighthouseResult<()> {
194        // Validate metrics using callback
195        let context = CallbackContext {
196            timestamp: current_timestamp(),
197            metadata: HashMap::new(),
198        };
199
200        let validated_metrics = self.callbacks.metrics_provider
201            .validate_metrics(&metrics, &context).await?;
202
203        if let Some(validated) = validated_metrics {
204            let mut cache = self.metrics_cache.write().await;
205            cache.insert(validated.resource_id.clone(), validated);
206
207            // Update status
208            let mut status = self.status.write().await;
209            status.resources_tracked = cache.len();
210        }
211
212        Ok(())
213    }
214
215    /// Update the engine configuration
216    async fn update_config(&self, new_config: LighthouseConfig) -> LighthouseResult<()> {
217        let mut config = self.config.write().await;
218        *config = new_config;
219        info!("Configuration updated");
220        Ok(())
221    }
222
223    /// Get scaling recommendation for a specific resource
224    async fn get_recommendation(&self, resource_id: &ResourceId) -> LighthouseResult<Option<ScaleAction>> {
225        let metrics_cache = self.metrics_cache.read().await;
226        let config = self.config.read().await;
227
228        let metrics = match metrics_cache.get(resource_id) {
229            Some(m) => m,
230            None => return Ok(None), // No metrics available
231        };
232
233        // Find appropriate scaling policy
234        let resource_config = config.resource_configs.get(&metrics.resource_type);
235        let resource_config = match resource_config {
236            Some(rc) => rc,
237            None => return Ok(None), // No configuration for this resource type
238        };
239
240        // Evaluate all policies for this resource
241        for policy in &resource_config.policies {
242            if !policy.enabled {
243                continue;
244            }
245
246            if let Some(action) = self.evaluate_policy(metrics, policy).await? {
247                // Check cooldown
248                let cooldown_tracker = self.cooldown_tracker.read().await;
249                let cooldown_seconds = policy.thresholds.iter()
250                    .map(|t| t.cooldown_seconds)
251                    .max()
252                    .unwrap_or(0);
253
254                if !cooldown_tracker.is_cooled_down(resource_id, cooldown_seconds) {
255                    debug!("Scaling action skipped due to cooldown: {}", resource_id);
256                    continue;
257                }
258
259                return Ok(Some(action));
260            }
261        }
262
263        Ok(None)
264    }
265
266    /// Evaluate a scaling policy against metrics
267    async fn evaluate_policy(
268        &self,
269        metrics: &ResourceMetrics,
270        policy: &ScalingPolicy,
271    ) -> LighthouseResult<Option<ScaleAction>> {
272        for threshold in &policy.thresholds {
273            let metric_value = match metrics.metrics.get(&threshold.metric_name) {
274                Some(v) => *v,
275                None => continue, // Metric not available
276            };
277
278            // Check for scale up
279            if metric_value > threshold.scale_up_threshold {
280                return Ok(Some(ScaleAction {
281                    resource_id: metrics.resource_id.clone(),
282                    resource_type: metrics.resource_type.clone(),
283                    direction: ScaleDirection::Up,
284                    target_capacity: None,
285                    scale_factor: Some(threshold.scale_factor),
286                    reason: format!(
287                        "{} ({:.2}) exceeded scale-up threshold ({:.2})",
288                        threshold.metric_name, metric_value, threshold.scale_up_threshold
289                    ),
290                    confidence: 0.8, // TODO: Make this configurable
291                    timestamp: current_timestamp(),
292                    metadata: HashMap::new(),
293                }));
294            }
295
296            // Check for scale down
297            if metric_value < threshold.scale_down_threshold {
298                return Ok(Some(ScaleAction {
299                    resource_id: metrics.resource_id.clone(),
300                    resource_type: metrics.resource_type.clone(),
301                    direction: ScaleDirection::Down,
302                    target_capacity: None,
303                    scale_factor: Some(1.0 / threshold.scale_factor),
304                    reason: format!(
305                        "{} ({:.2}) below scale-down threshold ({:.2})",
306                        threshold.metric_name, metric_value, threshold.scale_down_threshold
307                    ),
308                    confidence: 0.8, // TODO: Make this configurable
309                    timestamp: current_timestamp(),
310                    metadata: HashMap::new(),
311                }));
312            }
313        }
314
315        Ok(None)
316    }
317
318    /// Evaluate all resources and execute scaling actions
319    async fn evaluate_all_resources(&self) -> LighthouseResult<()> {
320        let metrics_cache = self.metrics_cache.read().await;
321        let resource_ids: Vec<ResourceId> = metrics_cache.keys().cloned().collect();
322        drop(metrics_cache);
323
324        let mut status = self.status.write().await;
325        status.last_evaluation = Some(current_timestamp());
326        drop(status);
327
328        for resource_id in resource_ids {
329            if let Some(action) = self.get_recommendation(&resource_id).await? {
330                if let Err(e) = self.execute_scaling_action(action).await {
331                    error!("Failed to execute scaling action for {}: {}", resource_id, e);
332                }
333            }
334        }
335
336        Ok(())
337    }
338
339    /// Execute a scaling action
340    async fn execute_scaling_action(&self, action: ScaleAction) -> LighthouseResult<()> {
341        let context = CallbackContext {
342            timestamp: current_timestamp(),
343            metadata: HashMap::new(),
344        };
345
346        // Notify observers of decision
347        for observer in &self.callbacks.observers {
348            if let Err(e) = observer.on_scaling_decision(&action, &context).await {
349                warn!("Observer error on scaling decision: {}", e);
350            }
351        }
352
353        // Check safety
354        let is_safe = self.callbacks.scaling_executor
355            .is_safe_to_scale(&action, &context).await?;
356
357        if !is_safe {
358            for observer in &self.callbacks.observers {
359                if let Err(e) = observer.on_scaling_skipped(&action, "Safety check failed", &context).await {
360                    warn!("Observer error on scaling skipped: {}", e);
361                }
362            }
363            return Ok(());
364        }
365
366        // Execute the action
367        match self.callbacks.scaling_executor.execute_scale_action(&action, &context).await {
368            Ok(executed) => {
369                if executed {
370                    // Record successful scaling
371                    let mut cooldown_tracker = self.cooldown_tracker.write().await;
372                    cooldown_tracker.record_scale_action(&action.resource_id);
373
374                    let mut status = self.status.write().await;
375                    status.total_recommendations += 1;
376
377                    info!("Scaling action executed: {} - {}", action.resource_id, action.reason);
378                }
379
380                // Notify observers
381                for observer in &self.callbacks.observers {
382                    if let Err(e) = observer.on_scaling_executed(&action, executed, &context).await {
383                        warn!("Observer error on scaling executed: {}", e);
384                    }
385                }
386            }
387            Err(e) => {
388                error!("Failed to execute scaling action: {}", e);
389                
390                // Notify observers of error
391                for observer in &self.callbacks.observers {
392                    if let Err(err) = observer.on_scaling_error(&action, &e, &context).await {
393                        warn!("Observer error on scaling error: {}", err);
394                    }
395                }
396
397                return Err(e);
398            }
399        }
400
401        Ok(())
402    }
403}
404
/// Handle for interacting with a running lighthouse engine
///
/// Cheap to clone: each clone shares the same unbounded command channel.
#[derive(Clone)]
pub struct LighthouseHandle {
    // Sender side of the engine's command channel.
    command_tx: mpsc::UnboundedSender<EngineCommand>,
}
410
411impl LighthouseHandle {
412    /// Send metrics to the engine
413    pub async fn update_metrics(&self, metrics: ResourceMetrics) -> LighthouseResult<()> {
414        self.command_tx.send(EngineCommand::UpdateMetrics(metrics))?;
415        Ok(())
416    }
417
418    /// Update the engine configuration
419    pub async fn update_config(&self, config: LighthouseConfig) -> LighthouseResult<()> {
420        self.command_tx.send(EngineCommand::UpdateConfig(config))?;
421        Ok(())
422    }
423
424    /// Get a scaling recommendation for a resource
425    pub async fn get_recommendation(&self, resource_id: ResourceId) -> LighthouseResult<Option<ScaleAction>> {
426        let (response_tx, response_rx) = tokio::sync::oneshot::channel();
427        self.command_tx.send(EngineCommand::GetRecommendation {
428            resource_id,
429            response: response_tx,
430        })?;
431        response_rx.await?
432    }
433
434    /// Get current engine status
435    pub async fn get_status(&self) -> LighthouseResult<EngineStatus> {
436        let (response_tx, response_rx) = tokio::sync::oneshot::channel();
437        self.command_tx.send(EngineCommand::GetStatus {
438            response: response_tx,
439        })?;
440        Ok(response_rx.await?)
441    }
442
443    /// Shutdown the engine
444    pub async fn shutdown(&self) -> LighthouseResult<()> {
445        self.command_tx.send(EngineCommand::Shutdown)?;
446        Ok(())
447    }
448}
449
/// Current Unix time in whole seconds.
///
/// Clamps to 0 if the system clock reads earlier than the Unix epoch,
/// matching the original `unwrap_or_default()` behavior.
fn current_timestamp() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or(0)
}