omni_orchestrator/app_autoscaler/
agent.rs

1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3use std::time::Instant;
4use log::{info, error};
5use async_trait::async_trait;
6use rand;
7
8use super::error::AutoscalerError;
9use super::node_types::{Node, NodeType};
10use super::app::{AppInstance, AppInstanceState};
11
12/// Interface for an agent that manages app instance operations on nodes
13#[async_trait]
14pub trait Agent: Send + Sync + std::fmt::Debug {
15    /// Get the unique ID of this agent
16    async fn id(&self) -> String;
17    
18    /// Get the nodes managed by this agent
19    async fn get_nodes(&self) -> Result<Vec<Node>, AutoscalerError>;
20    
21    /// Get information about a specific node
22    async fn get_node(&self, node_id: &str) -> Result<Node, AutoscalerError>;
23    
24    /// Create a new app instance on a specific node
25    async fn create_instance(&self, node_id: &str, name: &str, cpu: u32, memory: u32, storage: u32) 
26        -> Result<AppInstance, AutoscalerError>;
27    
28    /// Terminate an app instance
29    async fn terminate_instance(&self, instance_id: &str) -> Result<(), AutoscalerError>;
30    
31    /// Get information about a specific app instance
32    async fn get_instance(&self, instance_id: &str) -> Result<AppInstance, AutoscalerError>;
33    
34    /// Get all app instances managed by this agent
35    async fn get_instances(&self) -> Result<Vec<AppInstance>, AutoscalerError>;
36    
37    /// Get metrics for a specific app instance
38    async fn get_instance_metrics(&self, instance_id: &str) -> Result<HashMap<String, f32>, AutoscalerError>;
39}
40
41/// Implementation of a cloud agent (AWS, Azure, GCP)
42#[derive(Debug)]
43pub struct CloudAgent {
44    /// Unique ID of this agent
45    id: String,
46    /// Name of the cloud provider
47    provider: String,
48    /// Region for this cloud provider
49    region: String,
50    /// Simulated nodes for this agent
51    nodes: Arc<Mutex<HashMap<String, Node>>>,
52    /// Simulated app instances for this agent
53    instances: Arc<Mutex<HashMap<String, AppInstance>>>,
54}
55
56impl CloudAgent {
57    /// Create a new cloud agent
58    pub fn new(id: String, provider: String, region: String) -> Self {
59        let mut nodes = HashMap::new();
60        
61        // Create a single "infinite" capacity cloud node
62        let node_id = format!("{}-{}-node", provider, region);
63        let node = Node::new_cloud(
64            node_id.clone(),
65            format!("{} {} Default Node", provider, region),
66            id.clone(),
67        );
68        
69        nodes.insert(node_id, node);
70        
71        Self {
72            id,
73            provider,
74            region,
75            nodes: Arc::new(Mutex::new(nodes)),
76            instances: Arc::new(Mutex::new(HashMap::new())),
77        }
78    }
79}
80
81#[async_trait]
82impl Agent for CloudAgent {
83    async fn id(&self) -> String {
84        self.id.clone()
85    }
86    
87    async fn get_nodes(&self) -> Result<Vec<Node>, AutoscalerError> {
88        let nodes = self.nodes.lock().unwrap();
89        Ok(nodes.values().cloned().collect())
90    }
91    
92    async fn get_node(&self, node_id: &str) -> Result<Node, AutoscalerError> {
93        let nodes = self.nodes.lock().unwrap();
94        nodes.get(node_id).cloned().ok_or_else(|| 
95            AutoscalerError::NodeNotFound(format!("Node {} not found", node_id)))
96    }
97    
98    async fn create_instance(&self, node_id: &str, name: &str, cpu: u32, memory: u32, storage: u32) 
99        -> Result<AppInstance, AutoscalerError> {
100        // Verify the node exists
101        let mut nodes = self.nodes.lock().unwrap();
102        let node = nodes.get_mut(node_id).ok_or_else(|| 
103            AutoscalerError::NodeNotFound(format!("Node {} not found", node_id)))?;
104        
105        // Reserve capacity on the node
106        node.reserve_capacity(cpu, memory, storage)?;
107        
108        // Create a new app instance
109        let instance_id = format!("{}-{}", node_id, uuid::Uuid::new_v4());
110        let instance = AppInstance::new(
111            instance_id.clone(),
112            name.to_string(),
113            node_id.to_string(),
114            cpu,
115            memory,
116            storage,
117        );
118        
119        // Store the app instance
120        let mut instances = self.instances.lock().unwrap();
121        instances.insert(instance_id, instance.clone());
122        
123        // Simulate API call to cloud provider
124        info!("Cloud Agent {} creating app instance {} on node {}", self.id, instance.id, node_id);
125        
126        // In a real implementation, this would make API calls to the cloud provider
127        
128        Ok(instance)
129    }
130    
131    async fn terminate_instance(&self, instance_id: &str) -> Result<(), AutoscalerError> {
132        // Find the app instance
133        let mut instances = self.instances.lock().unwrap();
134        let instance = instances.get_mut(instance_id).ok_or_else(|| 
135            AutoscalerError::InstanceNotFound(format!("App instance {} not found", instance_id)))?;
136        
137        // Update app instance state
138        instance.state = AppInstanceState::Terminating;
139        instance.updated_at = Instant::now();
140        
141        // Simulate API call to cloud provider
142        info!("Cloud Agent {} terminating app instance {}", self.id, instance_id);
143        
144        // Release capacity on the node
145        let mut nodes = self.nodes.lock().unwrap();
146        if let Some(node) = nodes.get_mut(&instance.node_id) {
147            node.release_capacity(instance.cpu, instance.memory, instance.storage);
148        }
149        
150        // In a real implementation, this would make API calls to the cloud provider
151        
152        // Mark app instance as terminated
153        instance.state = AppInstanceState::Terminated;
154        
155        Ok(())
156    }
157    
158    async fn get_instance(&self, instance_id: &str) -> Result<AppInstance, AutoscalerError> {
159        let instances = self.instances.lock().unwrap();
160        instances.get(instance_id).cloned().ok_or_else(|| 
161            AutoscalerError::InstanceNotFound(format!("App instance {} not found", instance_id)))
162    }
163    
164    async fn get_instances(&self) -> Result<Vec<AppInstance>, AutoscalerError> {
165        let instances = self.instances.lock().unwrap();
166        Ok(instances.values().cloned().collect())
167    }
168    
169    async fn get_instance_metrics(&self, instance_id: &str) -> Result<HashMap<String, f32>, AutoscalerError> {
170        // Verify the app instance exists
171        let instances = self.instances.lock().unwrap();
172        let _instance = instances.get(instance_id).ok_or_else(|| 
173            AutoscalerError::InstanceNotFound(format!("App instance {} not found", instance_id)))?;
174        
175        // Simulate gathering metrics
176        // In a real implementation, this would make API calls to the cloud provider
177        let mut metrics = HashMap::new();
178        metrics.insert("cpu_utilization".to_string(), 50.0 + (rand::random::<f32>() * 30.0 - 15.0));
179        metrics.insert("memory_utilization".to_string(), 60.0 + (rand::random::<f32>() * 20.0 - 10.0));
180        
181        Ok(metrics)
182    }
183}