// omni_forge/autoscalar/mod.rs

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use anyhow::Result;

/// A snapshot of the metrics that can be gathered from an `AppInstance`.
///
/// Every metric is optional because not every runtime or in-container monitor
/// can report every value.
///
/// # Fields
///
/// * `cpu_load` - Generally applicable CPU load for any container (gathered from the runtime if supported)
/// * `ram_pressure` - Generally applicable memory pressure for any container (gathered from the runtime if supported)
/// * `ram_usage` - Generally applicable memory usage for any container (gathered from the runtime if supported)
/// * `clients` - For more advanced services that are client-aware
/// * `app_response_time` - For more advanced use cases in which an in-container monitor exists to monitor app response time
/// * `network_latency` - For more advanced use cases in which an in-container monitor exists to monitor external response time
/// * `disk_bandwidth` - Generally applicable disk bandwidth usage for any container (gathered from the runtime if supported)
///
/// # Example
///
/// ```ignore
/// let metric = InstanceMetrics {
///     cpu_load: Some(50),
///     ram_pressure: Some(50),
///     ram_usage: Some(50),
///     clients: Some(50),
///     app_response_time: Some(50),
///     network_latency: Some(50),
///     disk_bandwidth: Some(50),
/// };
/// ```
#[derive(Default, Debug, Clone, Copy, PartialEq, Eq)]
struct InstanceMetrics {
    cpu_load: Option<u64>,
    ram_pressure: Option<u64>,
    ram_usage: Option<u64>,
    clients: Option<u64>,
    app_response_time: Option<u64>,
    network_latency: Option<u64>,
    disk_bandwidth: Option<u64>,
}

impl InstanceMetrics {
    /// Builds a snapshot from individually supplied readings (`None` = not reported).
    fn new(
        cpu_load: Option<u64>,
        ram_pressure: Option<u64>,
        ram_usage: Option<u64>,
        clients: Option<u64>,
        app_response_time: Option<u64>,
        network_latency: Option<u64>,
        disk_bandwidth: Option<u64>,
    ) -> Self {
        InstanceMetrics {
            cpu_load,
            ram_pressure,
            ram_usage,
            clients,
            app_response_time,
            network_latency,
            disk_bandwidth,
        }
    }

    /// Returns every metric paired with its field name, in declaration order.
    ///
    /// The names are exactly the struct's field names, so they round-trip through
    /// `set_field`. They are `'static`, so callers may keep them after `self` is gone
    /// (e.g. after releasing a lock guard).
    fn get_fields(&self) -> Vec<(&'static str, Option<u64>)> {
        vec![
            ("cpu_load", self.cpu_load),
            ("ram_pressure", self.ram_pressure),
            ("ram_usage", self.ram_usage),
            ("clients", self.clients),
            ("app_response_time", self.app_response_time),
            ("network_latency", self.network_latency),
            ("disk_bandwidth", self.disk_bandwidth),
        ]
    }

    /// Sets the metric named `field_name` to `field_value`.
    ///
    /// `field_name` is one of the names returned by `get_fields`. The legacy
    /// misspelling `"disk_bandwith"` is also accepted for backward compatibility.
    /// Unknown names are ignored.
    fn set_field(&mut self, field_name: &str, field_value: Option<u64>) {
        let slot = match field_name {
            "cpu_load" => &mut self.cpu_load,
            "ram_pressure" => &mut self.ram_pressure,
            "ram_usage" => &mut self.ram_usage,
            "clients" => &mut self.clients,
            "app_response_time" => &mut self.app_response_time,
            "network_latency" => &mut self.network_latency,
            "disk_bandwidth" | "disk_bandwith" => &mut self.disk_bandwidth,
            // Ignored rather than panicking: this method previously accepted any name as a no-op.
            _ => return,
        };
        *slot = field_value;
    }
}

/// A scaling decision, as produced by `AutoscalerThresholds::decide_all` and
/// consumed by `AutoScaler::scale`.
///
/// What each direction means in terms of concrete resources is up to the
/// `AutoScaler` implementation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ScaleAction {
    /// Scale up (`ExampleScaler` adds CPU and memory).
    ScaleUp,
    /// Scale down (`ExampleScaler` removes CPU and memory).
    ScaleDown,
    /// Scale left (`ExampleScaler` lowers disk bandwidth).
    ScaleLeft,
    /// Scale right (`ExampleScaler` raises network bandwidth).
    ScaleRight,
    /// Leave the instance unchanged.
    NoAction,
}

/// A resource category for which `AutoscalerThresholds::decide_all` makes an
/// independent scaling decision.
// Upper-case acronym variants are part of the existing interface, so the lint is silenced.
#[allow(clippy::upper_case_acronyms)]
#[derive(Debug, PartialEq, Eq, Hash)]
enum ResourceType {
    /// Decided from `InstanceMetrics::cpu_load`.
    CPU,
    /// Decided from `InstanceMetrics::ram_usage`.
    RAM,
    /// Decided from `InstanceMetrics::clients`.
    Clients,
    /// Decided from `InstanceMetrics::app_response_time`.
    ResponseTime,
}

90struct AutoscalerThresholds {
91    cpu_threshold:           u64,
92    ram_threshold:           u64,
93    client_threshold:        u64,
94    response_time_threshold: u64,
95}
96
97impl AutoscalerThresholds {
98    fn new(
99        cpu_threshold:           u64,
100        ram_threshold:           u64,
101        client_threshold:        u64,
102        response_time_threshold: u64,
103    ) -> Self {
104        AutoscalerThresholds {
105            cpu_threshold,
106            ram_threshold,
107            client_threshold,
108            response_time_threshold,
109        }
110    }
111
112    fn decide_all(&self, metrics: &InstanceMetrics) -> HashMap<ResourceType, ScaleAction> {
113        let mut actions = HashMap::new();
114
115        let cpu_action = match metrics.cpu_load {
116            Some(cpu_load) if cpu_load > self.cpu_threshold => ScaleAction::ScaleUp,
117            Some(cpu_load) if cpu_load < self.cpu_threshold / 2 => ScaleAction::ScaleDown,
118            _ => ScaleAction::NoAction,
119        };
120        actions.insert(ResourceType::CPU, cpu_action);
121
122        let ram_action = match metrics.ram_usage {
123            Some(ram_usage) if ram_usage > self.ram_threshold => ScaleAction::ScaleUp,
124            Some(ram_usage) if ram_usage < self.ram_threshold / 2 => ScaleAction::ScaleLeft,
125            _ => ScaleAction::NoAction,
126        };
127        actions.insert(ResourceType::RAM, ram_action);
128
129        let client_action = match metrics.clients {
130            Some(clients) if clients > self.client_threshold => ScaleAction::ScaleRight,
131            _ => ScaleAction::NoAction,
132        };
133        actions.insert(ResourceType::Clients, client_action);
134
135        let response_time_action = match metrics.app_response_time {
136            Some(response_time) if response_time > self.response_time_threshold => ScaleAction::ScaleRight,
137            _ => ScaleAction::NoAction,
138        };
139        actions.insert(ResourceType::ResponseTime, response_time_action);
140
141        actions // Return the actions to complete for each scaling category
142    }
143}
144
/// An application instance together with the resources allocated to it.
///
/// # Fields
///
/// * `state` - Current health of the instance (see `ApplicationState`)
/// * `allocated_memory` - Memory allocated to the instance, in MB
/// * `allocated_cpu` - CPU allocated to the instance, as a percentage with 100 per core
/// * `allocated_disk_bandwidth` - Disk bandwidth allocated to the instance, in MB/s
/// * `allocated_network_bandwidth` - Network bandwidth allocated to the instance, in MB/s
///
/// # Example
///
/// ```ignore
/// let app_instance = AppInstance {
///     state: ApplicationState::Healthy,
///     allocated_memory: 1024,
///     allocated_cpu: 100,
///     allocated_disk_bandwidth: 100,
///     allocated_network_bandwidth: 100,
/// };
/// ```
#[derive(Debug, Clone, PartialEq, Eq)]
struct AppInstance {
    state: ApplicationState,
    allocated_memory: u64,
    allocated_cpu: u64,
    allocated_disk_bandwidth: u64,
    allocated_network_bandwidth: u64,
}

/// The health state of an application instance.
///
/// # Variants
///
/// * `Healthy` - Host is online and functioning within expected parameters.
/// * `Suspicious` - Host is online but is exhibiting behavior that is outside of the expected parameters.
/// * `Erroneous` - Host is exhibiting unexpected behavior and has been lowered on the load balancer's
///   priority list, avoiding routing traffic to the erroneous host if possible.
/// * `Blacklisted` - Host has been blacklisted from the network by an operator.
/// * `Down` - Host is offline or otherwise unreachable. (It's probably DNS's fault.)
///
/// # Example
///
/// ```ignore
/// let state = ApplicationState::Healthy;
/// ```
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum ApplicationState {
    Healthy,
    Suspicious,
    Erroneous,
    Blacklisted,
    Down,
}

/// Outcome of an `AutoScaler::scale` call.
///
/// `Success` is currently the only outcome; `ExampleScaler` returns it for every
/// `ScaleAction`, including `ScaleAction::NoAction`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ScaleResult {
    /// The scale request was handled.
    Success,
}

216/// A trait that represents an AutoScaler
217/// # Methods
218/// scale - Scales the app instance based on the metrics
219/// query - Queries the metrics
220/// query_over_period - Queries the metrics over a period of time
221/// reallocate_memory - Set new targets for the total memory avail to an app, this will be split among the instances which are rounded up
222/// reallocate_cpu - Set new targets for the total cpu avail to an app, this will be split among the instances which are rounded up
223/// reallocate_disk_bandwidth - Set new targets for the total disk avail to an app, this will be split among the instances which are rounded up
224/// reallocate_network_bandwidth - Set new targets for the total network avail to an app, this will be split among the instances which are rounded up
225/// 
226/// # Example
227/// ```
228/// struct ExampleScaler {
229///   metrics: Arc<Mutex<InstanceMetrics>>,
230/// }
231trait AutoScaler {
232    fn scale(&mut self, instance: &mut AppInstance,action: ScaleAction) 
233      -> ScaleResult;
234    fn query(&self) -> InstanceMetrics;
235    async fn query_over_period(&self, duration: Duration)
236      -> Vec<(InstanceMetrics, chrono::DateTime<chrono::Utc>)>;
237    fn reallocate_memory(&self, megabytes: u64);                      
238    fn reallocate_cpu(&self, percentage: u64);                        
239    fn reallocate_disk_bandwidth(&self, megabytes_per_second: u64);   
240    fn reallocate_network_bandwidth(&self, megabytes_per_second: u64);
241}
242
243struct ExampleScaler {
244    metrics: Arc<Mutex<InstanceMetrics>>,
245}
246
247impl AutoScaler for ExampleScaler {
248    fn query(&self) -> InstanceMetrics {
249        let mut metric_collection = InstanceMetrics::new(None, None, None, None, None, None, None);
250        for field in self.metrics.try_lock().unwrap().get_fields() {
251            println!("{field:#?}");
252            metric_collection.set_field(field.0, field.1);
253        }
254        println!("{:#?}", self.metrics.lock().unwrap().get_fields());
255
256        metric_collection
257    }
258    
259    fn scale(&mut self, instance: &mut AppInstance, action: ScaleAction) -> ScaleResult {
260        match action {
261            ScaleAction::ScaleUp => {
262                instance.allocated_cpu += 10;
263                instance.allocated_memory += 1024;
264            }
265            ScaleAction::ScaleDown => {
266                instance.allocated_cpu = instance.allocated_cpu.saturating_sub(10);
267                instance.allocated_memory = instance.allocated_memory.saturating_sub(1024);
268            }
269            ScaleAction::ScaleLeft => {
270                instance.allocated_disk_bandwidth = instance.allocated_disk_bandwidth.saturating_sub(10);
271            }
272            ScaleAction::ScaleRight => {
273                instance.allocated_network_bandwidth += 10;
274            }
275            ScaleAction::NoAction => {}
276        }
277        ScaleResult::Success
278    }
279    
280    async fn query_over_period(&self, duration: Duration) -> Vec<(InstanceMetrics, chrono::DateTime<chrono::Utc>)> {
281        let mut results = Vec::new();
282        let start = chrono::offset::Utc::now();
283        let end = start + chrono::Duration::from_std(duration).unwrap();
284        let mut current_time = start;
285
286        while current_time < end {
287            let metrics = self.query();
288            results.push((metrics, current_time));
289            current_time = chrono::offset::Utc::now();
290            tokio::time::sleep(Duration::from_secs(1)).await;
291        }
292
293        results
294    }
295    
296    fn reallocate_memory(&self, megabytes: u64) {
297        if let Ok(mut metrics) = self.metrics.lock() {
298            metrics.ram_usage = Some(megabytes);
299        } else {
300            eprintln!("Failed to acquire lock for reallocate_memory");
301        }
302    }
303    
304    fn reallocate_cpu(&self, percentage: u64) {
305        if let Ok(mut metrics) = self.metrics.lock() {
306            metrics.cpu_load = Some(percentage);
307        } else {
308            eprintln!("Failed to acquire lock for reallocate_cpu");
309        }
310    }
311    
312    fn reallocate_disk_bandwidth(&self, megabytes_per_second: u64) {
313        if let Ok(mut metrics) = self.metrics.lock() {
314            metrics.disk_bandwidth = Some(megabytes_per_second);
315        } else {
316            eprintln!("Failed to acquire lock for reallocate_disk_bandwidth");
317        }
318    }
319    
320    fn reallocate_network_bandwidth(&self, megabytes_per_second: u64) {
321        if let Ok(mut metrics) = self.metrics.lock() {
322            metrics.network_latency = Some(megabytes_per_second);
323        } else {
324            eprintln!("Failed to acquire lock for reallocate_network_bandwidth");
325        }
326    }
327}
328
/// Runs `AutoscalerThresholds::decide_all` on a fixed snapshot, checks every
/// decision against the configured thresholds, and prints them.
#[test]
fn run() {
    let metrics = InstanceMetrics::new(Some(75), Some(60), Some(100), Some(150), Some(200), Some(50), Some(100));
    let autoscaler = AutoscalerThresholds::new(80, 75, 100, 180);

    let actions = autoscaler.decide_all(&metrics);

    assert_eq!(actions.len(), 4);
    // cpu_load 75 lies between 80 / 2 and 80, so nothing changes.
    assert!(matches!(actions.get(&ResourceType::CPU), Some(ScaleAction::NoAction)));
    // ram_usage 100 exceeds 75.
    assert!(matches!(actions.get(&ResourceType::RAM), Some(ScaleAction::ScaleUp)));
    // 150 clients exceed 100.
    assert!(matches!(actions.get(&ResourceType::Clients), Some(ScaleAction::ScaleRight)));
    // Response time 200 exceeds 180.
    assert!(matches!(actions.get(&ResourceType::ResponseTime), Some(ScaleAction::ScaleRight)));

    for (resource, action) in actions {
        match action {
            ScaleAction::ScaleUp => println!("Scaling up {:?}...", resource),
            ScaleAction::ScaleDown => println!("Scaling down {:?}...", resource),
            ScaleAction::ScaleLeft => println!("Scaling left {:?}...", resource),
            ScaleAction::ScaleRight => println!("Scaling right {:?}...", resource),
            ScaleAction::NoAction => println!("No scaling action needed for {:?}.", resource),
        }
    }
}

347//#[cfg(test)]
348fn test(scaler: impl AutoScaler) {
349    scaler.query();
350}