omni_orchestrator/worker_autoscaler/
director.rs1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3use std::time::Instant;
4use log::{info, error};
5use async_trait::async_trait;
6
7use super::error::AutoscalerError;
8use super::node_types::{Node, NodeType};
9use super::vm::{VM, VMState};
10
11#[async_trait]
13pub trait Director: Send + Sync + std::fmt::Debug {
14 async fn id(&self) -> String;
16
17 async fn get_nodes(&self) -> Result<Vec<Node>, AutoscalerError>;
19
20 async fn get_node(&self, node_id: &str) -> Result<Node, AutoscalerError>;
22
23 async fn create_vm(&self, node_id: &str, name: &str, cpu: u32, memory: u32, storage: u32)
25 -> Result<VM, AutoscalerError>;
26
27 async fn terminate_vm(&self, vm_id: &str) -> Result<(), AutoscalerError>;
29
30 async fn get_vm(&self, vm_id: &str) -> Result<VM, AutoscalerError>;
32
33 async fn get_vms(&self) -> Result<Vec<VM>, AutoscalerError>;
35
36 async fn get_vm_metrics(&self, vm_id: &str) -> Result<HashMap<String, f32>, AutoscalerError>;
38}
39
40#[derive(Debug)]
42pub struct CloudDirector {
43 id: String,
45 provider: String,
47 region: String,
49 nodes: Arc<Mutex<HashMap<String, Node>>>,
51 vms: Arc<Mutex<HashMap<String, VM>>>,
53}
54
55impl CloudDirector {
56 pub fn new(id: String, provider: String, region: String) -> Self {
58 let mut nodes = HashMap::new();
59
60 let node_id = format!("{}-{}-node", provider, region);
62 let node = Node::new_cloud(
63 node_id.clone(),
64 format!("{} {} Default Node", provider, region),
65 id.clone(),
66 );
67
68 nodes.insert(node_id, node);
69
70 Self {
71 id,
72 provider,
73 region,
74 nodes: Arc::new(Mutex::new(nodes)),
75 vms: Arc::new(Mutex::new(HashMap::new())),
76 }
77 }
78}
79
80#[async_trait]
81impl Director for CloudDirector {
82 async fn id(&self) -> String {
83 self.id.clone()
84 }
85
86 async fn get_nodes(&self) -> Result<Vec<Node>, AutoscalerError> {
87 let nodes = self.nodes.lock().unwrap();
88 Ok(nodes.values().cloned().collect())
89 }
90
91 async fn get_node(&self, node_id: &str) -> Result<Node, AutoscalerError> {
92 let nodes = self.nodes.lock().unwrap();
93 nodes.get(node_id).cloned().ok_or_else(||
94 AutoscalerError::NodeNotFound(format!("Node {} not found", node_id)))
95 }
96
97 async fn create_vm(&self, node_id: &str, name: &str, cpu: u32, memory: u32, storage: u32)
98 -> Result<VM, AutoscalerError> {
99 let mut nodes = self.nodes.lock().unwrap();
101 let node = nodes.get_mut(node_id).ok_or_else(||
102 AutoscalerError::NodeNotFound(format!("Node {} not found", node_id)))?;
103
104 node.reserve_capacity(cpu, memory, storage)?;
106
107 let vm_id = format!("{}-{}", node_id, uuid::Uuid::new_v4());
109 let vm = VM::new(
110 vm_id.clone(),
111 name.to_string(),
112 node_id.to_string(),
113 cpu,
114 memory,
115 storage,
116 );
117
118 let mut vms = self.vms.lock().unwrap();
120 vms.insert(vm_id, vm.clone());
121
122 info!("Cloud Director {} creating VM {} on node {}", self.id, vm.id, node_id);
124
125 Ok(vm)
128 }
129
130 async fn terminate_vm(&self, vm_id: &str) -> Result<(), AutoscalerError> {
131 let mut vms = self.vms.lock().unwrap();
133 let vm = vms.get_mut(vm_id).ok_or_else(||
134 AutoscalerError::VMNotFound(format!("VM {} not found", vm_id)))?;
135
136 vm.state = VMState::Terminating;
138 vm.updated_at = Instant::now();
139
140 info!("Cloud Director {} terminating VM {}", self.id, vm_id);
142
143 let mut nodes = self.nodes.lock().unwrap();
145 if let Some(node) = nodes.get_mut(&vm.node_id) {
146 node.release_capacity(vm.cpu, vm.memory, vm.storage);
147 }
148
149 vm.state = VMState::Terminated;
153
154 Ok(())
155 }
156
157 async fn get_vm(&self, vm_id: &str) -> Result<VM, AutoscalerError> {
158 let vms = self.vms.lock().unwrap();
159 vms.get(vm_id).cloned().ok_or_else(||
160 AutoscalerError::VMNotFound(format!("VM {} not found", vm_id)))
161 }
162
163 async fn get_vms(&self) -> Result<Vec<VM>, AutoscalerError> {
164 let vms = self.vms.lock().unwrap();
165 Ok(vms.values().cloned().collect())
166 }
167
168 async fn get_vm_metrics(&self, vm_id: &str) -> Result<HashMap<String, f32>, AutoscalerError> {
169 let vms = self.vms.lock().unwrap();
171 let _vm = vms.get(vm_id).ok_or_else(||
172 AutoscalerError::VMNotFound(format!("VM {} not found", vm_id)))?;
173
174 let mut metrics = HashMap::new();
177 metrics.insert("cpu_utilization".to_string(), 50.0 + (rand::random::<f32>() * 30.0 - 15.0));
178 metrics.insert("memory_utilization".to_string(), 60.0 + (rand::random::<f32>() * 20.0 - 10.0));
179
180 Ok(metrics)
181 }
182}