use std::collections::HashMap;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tokio::sync::{mpsc, RwLock};
use tokio::time::{interval, Duration};
use tracing::{debug, error, info, warn};

use crate::callbacks::{CallbackContext, LighthouseCallbacks};
use crate::error::{LighthouseError, LighthouseResult};
use crate::types::{
    LighthouseConfig, ResourceId, ResourceMetrics, ScaleAction, ScaleDirection, ScalingPolicy,
};

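/// Commands accepted by the engine over its command channel
/// (typically sent through a [`LighthouseHandle`]).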
#[derive(Debug)]
pub enum EngineCommand {
    /// Submit fresh metrics for a resource.
    UpdateMetrics(ResourceMetrics),
    /// Replace the engine configuration at runtime.
    UpdateConfig(LighthouseConfig),
    /// Ask for a scaling recommendation for a single resource.
    GetRecommendation {
        resource_id: ResourceId,
        response: tokio::sync::oneshot::Sender<LighthouseResult<Option<ScaleAction>>>,
    },
    /// Ask for a snapshot of the engine's current status.
    GetStatus {
        response: tokio::sync::oneshot::Sender<EngineStatus>,
    },
    /// Stop the engine's run loop.
    Shutdown,
}

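/// A point-in-time snapshot of the engine's state.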
#[derive(Debug, Clone)]
pub struct EngineStatus {
    pub is_running: bool,
    pub resources_tracked: usize,
    pub total_recommendations: u64,
    /// Unix timestamp (seconds) of the last evaluation pass, if any.
    pub last_evaluation: Option<u64>,
}

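/// Remembers when each resource was last scaled so the engine can
/// enforce a cooldown period between scaling actions.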
#[derive(Debug)]
struct CooldownTracker {
    last_scale_time: HashMap<ResourceId, u64>,
}

impl CooldownTracker {
    fn new() -> Self {
        Self {
            last_scale_time: HashMap::new(),
        }
    }

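    /// Returns true if the resource has never been scaled, or if at least
    /// `cooldown_seconds` have elapsed since its last scaling action.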
    fn is_cooled_down(&self, resource_id: &ResourceId, cooldown_seconds: u64) -> bool {
        let now = current_timestamp();
        match self.last_scale_time.get(resource_id) {
            Some(last_time) => now >= last_time + cooldown_seconds,
            None => true,
        }
    }

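    /// Records the current time as the resource's last scaling action.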
    fn record_scale_action(&mut self, resource_id: &ResourceId) {
        self.last_scale_time
            .insert(resource_id.clone(), current_timestamp());
    }
}

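/// The core autoscaling engine. It owns the configuration, the metrics
/// cache, and the cooldown state, and processes [`EngineCommand`]s on a
/// single task.
///
/// A minimal usage sketch (assumes `config`, `callbacks`, and `metrics`
/// values are constructed elsewhere; error handling elided):
///
/// ```ignore
/// let engine = LighthouseEngine::new(config, callbacks);
/// let handle = engine.handle();
///
/// // `start` consumes the engine, so grab the handle first and run the
/// // engine on its own task.
/// tokio::spawn(engine.start());
///
/// handle.update_metrics(metrics).await?;
/// let status = handle.get_status().await?;
/// ```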
pub struct LighthouseEngine {
    config: Arc<RwLock<LighthouseConfig>>,
    callbacks: LighthouseCallbacks,
    command_tx: mpsc::UnboundedSender<EngineCommand>,
    // Taken by `start()`; `None` once the engine is running.
    command_rx: Option<mpsc::UnboundedReceiver<EngineCommand>>,
    status: Arc<RwLock<EngineStatus>>,
    metrics_cache: Arc<RwLock<HashMap<ResourceId, ResourceMetrics>>>,
    cooldown_tracker: Arc<RwLock<CooldownTracker>>,
}

impl LighthouseEngine {
    /// Creates a new engine from a configuration and a set of callbacks.
    pub fn new(config: LighthouseConfig, callbacks: LighthouseCallbacks) -> Self {
        let (command_tx, command_rx) = mpsc::unbounded_channel();

        Self {
            config: Arc::new(RwLock::new(config)),
            callbacks,
            command_tx,
            command_rx: Some(command_rx),
            status: Arc::new(RwLock::new(EngineStatus {
                is_running: false,
                resources_tracked: 0,
                total_recommendations: 0,
                last_evaluation: None,
            })),
            metrics_cache: Arc::new(RwLock::new(HashMap::new())),
            cooldown_tracker: Arc::new(RwLock::new(CooldownTracker::new())),
        }
    }

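    /// Returns a cloneable handle for sending commands to the engine.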
    pub fn handle(&self) -> LighthouseHandle {
        LighthouseHandle {
            command_tx: self.command_tx.clone(),
        }
    }

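    /// Runs the engine's command-processing and evaluation loop until a
    /// `Shutdown` command arrives or the command channel closes. Consumes
    /// the engine, so obtain a [`LighthouseHandle`] via [`Self::handle`]
    /// first.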
    pub async fn start(mut self) -> LighthouseResult<()> {
        let mut command_rx = self
            .command_rx
            .take()
            .ok_or_else(|| LighthouseError::engine_not_running("Engine already started"))?;

        {
            let mut status = self.status.write().await;
            status.is_running = true;
        }

        info!("Lighthouse engine starting...");

        // The evaluation interval is read once at startup; a later
        // `UpdateConfig` does not retune the running timer.
        let config = self.config.read().await;
        let interval_duration = Duration::from_secs(config.evaluation_interval_seconds);
        drop(config);

        let mut evaluation_timer = interval(interval_duration);

        loop {
            tokio::select! {
                command = command_rx.recv() => {
                    match command {
                        // Intercept `Shutdown` here so it actually stops the
                        // loop instead of merely being logged as an error.
                        Some(EngineCommand::Shutdown) => {
                            info!("Shutdown command received, stopping engine");
                            break;
                        }
                        Some(cmd) => {
                            if let Err(e) = self.handle_command(cmd).await {
                                error!("Error handling command: {}", e);
                            }
                        }
                        None => {
                            info!("Command channel closed, shutting down engine");
                            break;
                        }
                    }
                }

                _ = evaluation_timer.tick() => {
                    if let Err(e) = self.evaluate_all_resources().await {
                        error!("Error during evaluation: {}", e);
                    }
                }
            }
        }

        {
            let mut status = self.status.write().await;
            status.is_running = false;
        }

        info!("Lighthouse engine stopped");
        Ok(())
    }

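    /// Dispatches a single command to the appropriate engine method.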
    async fn handle_command(&self, command: EngineCommand) -> LighthouseResult<()> {
        match command {
            EngineCommand::UpdateMetrics(metrics) => {
                self.update_metrics(metrics).await?;
            }
            EngineCommand::UpdateConfig(new_config) => {
                self.update_config(new_config).await?;
            }
            EngineCommand::GetRecommendation { resource_id, response } => {
                let recommendation = self.get_recommendation(&resource_id).await;
                // The requester may have dropped the receiver; ignore send errors.
                let _ = response.send(recommendation);
            }
            EngineCommand::GetStatus { response } => {
                let status = self.status.read().await.clone();
                let _ = response.send(status);
            }
            EngineCommand::Shutdown => {
                // Handled directly in the run loop; nothing to do here.
            }
        }
        Ok(())
    }

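    /// Validates incoming metrics via the metrics-provider callback and,
    /// if accepted, stores them in the cache.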
    async fn update_metrics(&self, metrics: ResourceMetrics) -> LighthouseResult<()> {
        let context = CallbackContext {
            timestamp: current_timestamp(),
            metadata: HashMap::new(),
        };

        let validated_metrics = self
            .callbacks
            .metrics_provider
            .validate_metrics(&metrics, &context)
            .await?;

        if let Some(validated) = validated_metrics {
            let mut cache = self.metrics_cache.write().await;
            cache.insert(validated.resource_id.clone(), validated);

            let mut status = self.status.write().await;
            status.resources_tracked = cache.len();
        }

        Ok(())
    }

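    /// Swaps in a new configuration for subsequent evaluations.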
    async fn update_config(&self, new_config: LighthouseConfig) -> LighthouseResult<()> {
        let mut config = self.config.write().await;
        *config = new_config;
        info!("Configuration updated");
        Ok(())
    }

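    /// Evaluates every enabled policy for a resource and returns the first
    /// scaling action whose cooldown has expired, if any.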
    async fn get_recommendation(&self, resource_id: &ResourceId) -> LighthouseResult<Option<ScaleAction>> {
        let metrics_cache = self.metrics_cache.read().await;
        let config = self.config.read().await;

        let metrics = match metrics_cache.get(resource_id) {
            Some(m) => m,
            None => return Ok(None),
        };

        let resource_config = match config.resource_configs.get(&metrics.resource_type) {
            Some(rc) => rc,
            None => return Ok(None),
        };

        for policy in &resource_config.policies {
            if !policy.enabled {
                continue;
            }

            if let Some(action) = self.evaluate_policy(metrics, policy).await? {
                // Enforce the longest cooldown among the policy's thresholds.
                let cooldown_tracker = self.cooldown_tracker.read().await;
                let cooldown_seconds = policy
                    .thresholds
                    .iter()
                    .map(|t| t.cooldown_seconds)
                    .max()
                    .unwrap_or(0);

                if !cooldown_tracker.is_cooled_down(resource_id, cooldown_seconds) {
                    debug!("Scaling action skipped due to cooldown: {}", resource_id);
                    continue;
                }

                return Ok(Some(action));
            }
        }

        Ok(None)
    }

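    /// Checks a policy's thresholds against the current metrics and returns
    /// a scale-up or scale-down action for the first threshold that fires.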
    async fn evaluate_policy(
        &self,
        metrics: &ResourceMetrics,
        policy: &ScalingPolicy,
    ) -> LighthouseResult<Option<ScaleAction>> {
        for threshold in &policy.thresholds {
            let metric_value = match metrics.metrics.get(&threshold.metric_name) {
                Some(v) => *v,
                None => continue,
            };

            if metric_value > threshold.scale_up_threshold {
                return Ok(Some(ScaleAction {
                    resource_id: metrics.resource_id.clone(),
                    resource_type: metrics.resource_type.clone(),
                    direction: ScaleDirection::Up,
                    target_capacity: None,
                    scale_factor: Some(threshold.scale_factor),
                    reason: format!(
                        "{} ({:.2}) exceeded scale-up threshold ({:.2})",
                        threshold.metric_name, metric_value, threshold.scale_up_threshold
                    ),
                    confidence: 0.8,
                    timestamp: current_timestamp(),
                    metadata: HashMap::new(),
                }));
            }

            if metric_value < threshold.scale_down_threshold {
                return Ok(Some(ScaleAction {
                    resource_id: metrics.resource_id.clone(),
                    resource_type: metrics.resource_type.clone(),
                    direction: ScaleDirection::Down,
                    target_capacity: None,
                    // Invert the factor so a scale-down shrinks capacity by
                    // the same ratio that a scale-up would grow it.
                    scale_factor: Some(1.0 / threshold.scale_factor),
                    reason: format!(
                        "{} ({:.2}) below scale-down threshold ({:.2})",
                        threshold.metric_name, metric_value, threshold.scale_down_threshold
                    ),
                    confidence: 0.8,
                    timestamp: current_timestamp(),
                    metadata: HashMap::new(),
                }));
            }
        }

        Ok(None)
    }

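    /// Runs one evaluation pass over every tracked resource, executing any
    /// recommended scaling actions.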
    async fn evaluate_all_resources(&self) -> LighthouseResult<()> {
        // Snapshot the resource ids so the cache lock is not held while
        // evaluating and executing actions.
        let metrics_cache = self.metrics_cache.read().await;
        let resource_ids: Vec<ResourceId> = metrics_cache.keys().cloned().collect();
        drop(metrics_cache);

        let mut status = self.status.write().await;
        status.last_evaluation = Some(current_timestamp());
        drop(status);

        for resource_id in resource_ids {
            if let Some(action) = self.get_recommendation(&resource_id).await? {
                if let Err(e) = self.execute_scaling_action(action).await {
                    error!("Failed to execute scaling action for {}: {}", resource_id, e);
                }
            }
        }

        Ok(())
    }

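    /// Executes a scaling action: notifies observers, runs the safety check,
    /// delegates to the scaling executor, and records cooldown state.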
    async fn execute_scaling_action(&self, action: ScaleAction) -> LighthouseResult<()> {
        let context = CallbackContext {
            timestamp: current_timestamp(),
            metadata: HashMap::new(),
        };

        // Tell observers about the decision before anything happens.
        for observer in &self.callbacks.observers {
            if let Err(e) = observer.on_scaling_decision(&action, &context).await {
                warn!("Observer error on scaling decision: {}", e);
            }
        }

        let is_safe = self
            .callbacks
            .scaling_executor
            .is_safe_to_scale(&action, &context)
            .await?;

        if !is_safe {
            for observer in &self.callbacks.observers {
                if let Err(e) = observer.on_scaling_skipped(&action, "Safety check failed", &context).await {
                    warn!("Observer error on scaling skipped: {}", e);
                }
            }
            return Ok(());
        }

        match self.callbacks.scaling_executor.execute_scale_action(&action, &context).await {
            Ok(executed) => {
                if executed {
                    let mut cooldown_tracker = self.cooldown_tracker.write().await;
                    cooldown_tracker.record_scale_action(&action.resource_id);

                    let mut status = self.status.write().await;
                    status.total_recommendations += 1;

                    info!("Scaling action executed: {} - {}", action.resource_id, action.reason);
                }

                for observer in &self.callbacks.observers {
                    if let Err(e) = observer.on_scaling_executed(&action, executed, &context).await {
                        warn!("Observer error on scaling executed: {}", e);
                    }
                }
            }
            Err(e) => {
                error!("Failed to execute scaling action: {}", e);

                for observer in &self.callbacks.observers {
                    if let Err(err) = observer.on_scaling_error(&action, &e, &context).await {
                        warn!("Observer error on scaling error: {}", err);
                    }
                }

                return Err(e);
            }
        }

        Ok(())
    }
}

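/// A cheap, cloneable handle for communicating with a running
/// [`LighthouseEngine`]. All methods send commands over an unbounded
/// channel, so they never block the engine.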
#[derive(Clone)]
pub struct LighthouseHandle {
    command_tx: mpsc::UnboundedSender<EngineCommand>,
}

impl LighthouseHandle {
    /// Sends fresh metrics to the engine.
    pub async fn update_metrics(&self, metrics: ResourceMetrics) -> LighthouseResult<()> {
        self.command_tx.send(EngineCommand::UpdateMetrics(metrics))?;
        Ok(())
    }

    /// Replaces the engine's configuration.
    pub async fn update_config(&self, config: LighthouseConfig) -> LighthouseResult<()> {
        self.command_tx.send(EngineCommand::UpdateConfig(config))?;
        Ok(())
    }

    /// Requests a scaling recommendation for a resource and awaits the reply.
    pub async fn get_recommendation(&self, resource_id: ResourceId) -> LighthouseResult<Option<ScaleAction>> {
        let (response_tx, response_rx) = tokio::sync::oneshot::channel();
        self.command_tx.send(EngineCommand::GetRecommendation {
            resource_id,
            response: response_tx,
        })?;
        response_rx.await?
    }

    /// Fetches a snapshot of the engine's current status.
    pub async fn get_status(&self) -> LighthouseResult<EngineStatus> {
        let (response_tx, response_rx) = tokio::sync::oneshot::channel();
        self.command_tx.send(EngineCommand::GetStatus {
            response: response_tx,
        })?;
        Ok(response_rx.await?)
    }

    /// Asks the engine to shut down gracefully.
    pub async fn shutdown(&self) -> LighthouseResult<()> {
        self.command_tx.send(EngineCommand::Shutdown)?;
        Ok(())
    }
}

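/// Returns the current time as seconds since the Unix epoch, falling back
/// to 0 if the system clock is set before the epoch.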
fn current_timestamp() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs()
}