omni_orchestrator/app_autoscaler/
agent.rs1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3use std::time::Instant;
4use log::{info, error};
5use async_trait::async_trait;
6use rand;
7
8use super::error::AutoscalerError;
9use super::node_types::{Node, NodeType};
10use super::app::{AppInstance, AppInstanceState};
11
12#[async_trait]
14pub trait Agent: Send + Sync + std::fmt::Debug {
15 async fn id(&self) -> String;
17
18 async fn get_nodes(&self) -> Result<Vec<Node>, AutoscalerError>;
20
21 async fn get_node(&self, node_id: &str) -> Result<Node, AutoscalerError>;
23
24 async fn create_instance(&self, node_id: &str, name: &str, cpu: u32, memory: u32, storage: u32)
26 -> Result<AppInstance, AutoscalerError>;
27
28 async fn terminate_instance(&self, instance_id: &str) -> Result<(), AutoscalerError>;
30
31 async fn get_instance(&self, instance_id: &str) -> Result<AppInstance, AutoscalerError>;
33
34 async fn get_instances(&self) -> Result<Vec<AppInstance>, AutoscalerError>;
36
37 async fn get_instance_metrics(&self, instance_id: &str) -> Result<HashMap<String, f32>, AutoscalerError>;
39}
40
41#[derive(Debug)]
43pub struct CloudAgent {
44 id: String,
46 provider: String,
48 region: String,
50 nodes: Arc<Mutex<HashMap<String, Node>>>,
52 instances: Arc<Mutex<HashMap<String, AppInstance>>>,
54}
55
56impl CloudAgent {
57 pub fn new(id: String, provider: String, region: String) -> Self {
59 let mut nodes = HashMap::new();
60
61 let node_id = format!("{}-{}-node", provider, region);
63 let node = Node::new_cloud(
64 node_id.clone(),
65 format!("{} {} Default Node", provider, region),
66 id.clone(),
67 );
68
69 nodes.insert(node_id, node);
70
71 Self {
72 id,
73 provider,
74 region,
75 nodes: Arc::new(Mutex::new(nodes)),
76 instances: Arc::new(Mutex::new(HashMap::new())),
77 }
78 }
79}
80
81#[async_trait]
82impl Agent for CloudAgent {
83 async fn id(&self) -> String {
84 self.id.clone()
85 }
86
87 async fn get_nodes(&self) -> Result<Vec<Node>, AutoscalerError> {
88 let nodes = self.nodes.lock().unwrap();
89 Ok(nodes.values().cloned().collect())
90 }
91
92 async fn get_node(&self, node_id: &str) -> Result<Node, AutoscalerError> {
93 let nodes = self.nodes.lock().unwrap();
94 nodes.get(node_id).cloned().ok_or_else(||
95 AutoscalerError::NodeNotFound(format!("Node {} not found", node_id)))
96 }
97
98 async fn create_instance(&self, node_id: &str, name: &str, cpu: u32, memory: u32, storage: u32)
99 -> Result<AppInstance, AutoscalerError> {
100 let mut nodes = self.nodes.lock().unwrap();
102 let node = nodes.get_mut(node_id).ok_or_else(||
103 AutoscalerError::NodeNotFound(format!("Node {} not found", node_id)))?;
104
105 node.reserve_capacity(cpu, memory, storage)?;
107
108 let instance_id = format!("{}-{}", node_id, uuid::Uuid::new_v4());
110 let instance = AppInstance::new(
111 instance_id.clone(),
112 name.to_string(),
113 node_id.to_string(),
114 cpu,
115 memory,
116 storage,
117 );
118
119 let mut instances = self.instances.lock().unwrap();
121 instances.insert(instance_id, instance.clone());
122
123 info!("Cloud Agent {} creating app instance {} on node {}", self.id, instance.id, node_id);
125
126 Ok(instance)
129 }
130
131 async fn terminate_instance(&self, instance_id: &str) -> Result<(), AutoscalerError> {
132 let mut instances = self.instances.lock().unwrap();
134 let instance = instances.get_mut(instance_id).ok_or_else(||
135 AutoscalerError::InstanceNotFound(format!("App instance {} not found", instance_id)))?;
136
137 instance.state = AppInstanceState::Terminating;
139 instance.updated_at = Instant::now();
140
141 info!("Cloud Agent {} terminating app instance {}", self.id, instance_id);
143
144 let mut nodes = self.nodes.lock().unwrap();
146 if let Some(node) = nodes.get_mut(&instance.node_id) {
147 node.release_capacity(instance.cpu, instance.memory, instance.storage);
148 }
149
150 instance.state = AppInstanceState::Terminated;
154
155 Ok(())
156 }
157
158 async fn get_instance(&self, instance_id: &str) -> Result<AppInstance, AutoscalerError> {
159 let instances = self.instances.lock().unwrap();
160 instances.get(instance_id).cloned().ok_or_else(||
161 AutoscalerError::InstanceNotFound(format!("App instance {} not found", instance_id)))
162 }
163
164 async fn get_instances(&self) -> Result<Vec<AppInstance>, AutoscalerError> {
165 let instances = self.instances.lock().unwrap();
166 Ok(instances.values().cloned().collect())
167 }
168
169 async fn get_instance_metrics(&self, instance_id: &str) -> Result<HashMap<String, f32>, AutoscalerError> {
170 let instances = self.instances.lock().unwrap();
172 let _instance = instances.get(instance_id).ok_or_else(||
173 AutoscalerError::InstanceNotFound(format!("App instance {} not found", instance_id)))?;
174
175 let mut metrics = HashMap::new();
178 metrics.insert("cpu_utilization".to_string(), 50.0 + (rand::random::<f32>() * 30.0 - 15.0));
179 metrics.insert("memory_utilization".to_string(), 60.0 + (rand::random::<f32>() * 20.0 - 10.0));
180
181 Ok(metrics)
182 }
183}