// omni_orchestrator/worker_autoscaler/director.rs

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Instant;

use async_trait::async_trait;
use log::{error, info};

use super::error::AutoscalerError;
use super::node_types::{Node, NodeType};
use super::vm::{VM, VMState};

11/// Interface for a director that manages VM operations on nodes
12#[async_trait]
13pub trait Director: Send + Sync + std::fmt::Debug {
14    /// Get the unique ID of this director
15    async fn id(&self) -> String;
16    
17    /// Get the nodes managed by this director
18    async fn get_nodes(&self) -> Result<Vec<Node>, AutoscalerError>;
19    
20    /// Get information about a specific node
21    async fn get_node(&self, node_id: &str) -> Result<Node, AutoscalerError>;
22    
23    /// Create a new VM on a specific node
24    async fn create_vm(&self, node_id: &str, name: &str, cpu: u32, memory: u32, storage: u32) 
25        -> Result<VM, AutoscalerError>;
26    
27    /// Terminate a VM
28    async fn terminate_vm(&self, vm_id: &str) -> Result<(), AutoscalerError>;
29    
30    /// Get information about a specific VM
31    async fn get_vm(&self, vm_id: &str) -> Result<VM, AutoscalerError>;
32    
33    /// Get all VMs managed by this director
34    async fn get_vms(&self) -> Result<Vec<VM>, AutoscalerError>;
35    
36    /// Get metrics for a specific VM
37    async fn get_vm_metrics(&self, vm_id: &str) -> Result<HashMap<String, f32>, AutoscalerError>;
38}
39
40/// Implementation of a cloud director (AWS, Azure, GCP)
41#[derive(Debug)]
42pub struct CloudDirector {
43    /// Unique ID of this director
44    id: String,
45    /// Name of the cloud provider
46    provider: String,
47    /// Region for this cloud provider
48    region: String,
49    /// Simulated nodes for this director
50    nodes: Arc<Mutex<HashMap<String, Node>>>,
51    /// Simulated VMs for this director
52    vms: Arc<Mutex<HashMap<String, VM>>>,
53}
54
55impl CloudDirector {
56    /// Create a new cloud director
57    pub fn new(id: String, provider: String, region: String) -> Self {
58        let mut nodes = HashMap::new();
59        
60        // Create a single "infinite" capacity cloud node
61        let node_id = format!("{}-{}-node", provider, region);
62        let node = Node::new_cloud(
63            node_id.clone(),
64            format!("{} {} Default Node", provider, region),
65            id.clone(),
66        );
67        
68        nodes.insert(node_id, node);
69        
70        Self {
71            id,
72            provider,
73            region,
74            nodes: Arc::new(Mutex::new(nodes)),
75            vms: Arc::new(Mutex::new(HashMap::new())),
76        }
77    }
78}
79
80#[async_trait]
81impl Director for CloudDirector {
82    async fn id(&self) -> String {
83        self.id.clone()
84    }
85    
86    async fn get_nodes(&self) -> Result<Vec<Node>, AutoscalerError> {
87        let nodes = self.nodes.lock().unwrap();
88        Ok(nodes.values().cloned().collect())
89    }
90    
91    async fn get_node(&self, node_id: &str) -> Result<Node, AutoscalerError> {
92        let nodes = self.nodes.lock().unwrap();
93        nodes.get(node_id).cloned().ok_or_else(|| 
94            AutoscalerError::NodeNotFound(format!("Node {} not found", node_id)))
95    }
96    
97    async fn create_vm(&self, node_id: &str, name: &str, cpu: u32, memory: u32, storage: u32) 
98        -> Result<VM, AutoscalerError> {
99        // Verify the node exists
100        let mut nodes = self.nodes.lock().unwrap();
101        let node = nodes.get_mut(node_id).ok_or_else(|| 
102            AutoscalerError::NodeNotFound(format!("Node {} not found", node_id)))?;
103        
104        // Reserve capacity on the node
105        node.reserve_capacity(cpu, memory, storage)?;
106        
107        // Create a new VM
108        let vm_id = format!("{}-{}", node_id, uuid::Uuid::new_v4());
109        let vm = VM::new(
110            vm_id.clone(),
111            name.to_string(),
112            node_id.to_string(),
113            cpu,
114            memory,
115            storage,
116        );
117        
118        // Store the VM
119        let mut vms = self.vms.lock().unwrap();
120        vms.insert(vm_id, vm.clone());
121        
122        // Simulate API call to cloud provider
123        info!("Cloud Director {} creating VM {} on node {}", self.id, vm.id, node_id);
124        
125        // In a real implementation, this would make API calls to the cloud provider
126        
127        Ok(vm)
128    }
129    
130    async fn terminate_vm(&self, vm_id: &str) -> Result<(), AutoscalerError> {
131        // Find the VM
132        let mut vms = self.vms.lock().unwrap();
133        let vm = vms.get_mut(vm_id).ok_or_else(|| 
134            AutoscalerError::VMNotFound(format!("VM {} not found", vm_id)))?;
135        
136        // Update VM state
137        vm.state = VMState::Terminating;
138        vm.updated_at = Instant::now();
139        
140        // Simulate API call to cloud provider
141        info!("Cloud Director {} terminating VM {}", self.id, vm_id);
142        
143        // Release capacity on the node
144        let mut nodes = self.nodes.lock().unwrap();
145        if let Some(node) = nodes.get_mut(&vm.node_id) {
146            node.release_capacity(vm.cpu, vm.memory, vm.storage);
147        }
148        
149        // In a real implementation, this would make API calls to the cloud provider
150        
151        // Mark VM as terminated
152        vm.state = VMState::Terminated;
153        
154        Ok(())
155    }
156    
157    async fn get_vm(&self, vm_id: &str) -> Result<VM, AutoscalerError> {
158        let vms = self.vms.lock().unwrap();
159        vms.get(vm_id).cloned().ok_or_else(|| 
160            AutoscalerError::VMNotFound(format!("VM {} not found", vm_id)))
161    }
162    
163    async fn get_vms(&self) -> Result<Vec<VM>, AutoscalerError> {
164        let vms = self.vms.lock().unwrap();
165        Ok(vms.values().cloned().collect())
166    }
167    
168    async fn get_vm_metrics(&self, vm_id: &str) -> Result<HashMap<String, f32>, AutoscalerError> {
169        // Verify the VM exists
170        let vms = self.vms.lock().unwrap();
171        let _vm = vms.get(vm_id).ok_or_else(|| 
172            AutoscalerError::VMNotFound(format!("VM {} not found", vm_id)))?;
173        
174        // Simulate gathering metrics
175        // In a real implementation, this would make API calls to the cloud provider
176        let mut metrics = HashMap::new();
177        metrics.insert("cpu_utilization".to_string(), 50.0 + (rand::random::<f32>() * 30.0 - 15.0));
178        metrics.insert("memory_utilization".to_string(), 60.0 + (rand::random::<f32>() * 20.0 - 10.0));
179        
180        Ok(metrics)
181    }
182}