Lodestone/
util.rs

1// src/util.rs
2use anyhow::{Result, Context};
3use chrono::{DateTime, Duration, Utc};
4use rand::{thread_rng, Rng};
5use rand::distributions::Alphanumeric;
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::iter;
9use std::net::{IpAddr, Ipv4Addr, SocketAddr};
10use std::path::Path;
11use std::sync::Arc;
12use std::time::{SystemTime, UNIX_EPOCH};
13use tokio::fs;
14use tokio::io::AsyncWriteExt;
15use tokio::sync::RwLock;
16use tokio::time::sleep;
17use tracing::{debug, info, warn, error};
18use uuid::Uuid;
19
20/// Create a random ID
21pub fn random_id() -> String {
22    Uuid::new_v4().to_string()
23}
24
25/// Create a random string of specific length
26pub fn random_string(length: usize) -> String {
27    let mut rng = thread_rng();
28    iter::repeat(())
29        .map(|()| rng.sample(Alphanumeric))
30        .map(char::from)
31        .take(length)
32        .collect()
33}
34
35/// Generate a weighted random index based on weights
36pub fn weighted_random_index(weights: &[u32]) -> Option<usize> {
37    if weights.is_empty() {
38        return None;
39    }
40    
41    let sum: u32 = weights.iter().sum();
42    if sum == 0 {
43        return None;
44    }
45    
46    let mut rng = thread_rng();
47    let rand_val = rng.gen_range(0..sum);
48    
49    let mut cumulative = 0;
50    for (i, &weight) in weights.iter().enumerate() {
51        cumulative += weight;
52        if rand_val < cumulative {
53            return Some(i);
54        }
55    }
56    
57    // Fallback (should not happen)
58    Some(0)
59}
60
61/// Round-robin load balancer implementation
62pub struct RoundRobinBalancer {
63    /// Current index
64    current: RwLock<usize>,
65    
66    /// Number of targets
67    targets: usize,
68}
69
70impl RoundRobinBalancer {
71    /// Create a new round-robin balancer
72    pub fn new(targets: usize) -> Self {
73        Self {
74            current: RwLock::new(0),
75            targets,
76        }
77    }
78    
79    /// Get the next index
80    pub async fn next(&self) -> Option<usize> {
81        if self.targets == 0 {
82            return None;
83        }
84        
85        let mut current = self.current.write().await;
86        let idx = *current;
87        *current = (*current + 1) % self.targets;
88        
89        Some(idx)
90    }
91    
92    /// Update the number of targets
93    pub async fn update_targets(&mut self, targets: usize) {
94        self.targets = targets;
95        
96        let mut current = self.current.write().await;
97        if self.targets > 0 {
98            *current = *current % self.targets;
99        } else {
100            *current = 0;
101        }
102    }
103}
104
105/// Weighted round-robin load balancer implementation
106pub struct WeightedRoundRobinBalancer {
107    /// Current running sum
108    current_sum: RwLock<i64>,
109    
110    /// Maximum weight
111    max_weight: i64,
112    
113    /// Greatest common divisor of all weights
114    gcd_weight: i64,
115    
116    /// Weights
117    weights: Vec<i64>,
118}
119
120impl WeightedRoundRobinBalancer {
121    /// Create a new weighted round-robin balancer
122    pub fn new(weights: Vec<u32>) -> Self {
123        let weights_i64: Vec<i64> = weights.iter().map(|&w| w as i64).collect();
124        
125        // Find max weight
126        let max_weight = *weights_i64.iter().max().unwrap_or(&0);
127        
128        // Calculate GCD of all weights
129        let gcd_weight = if weights_i64.len() > 1 {
130            weights_i64
131                .iter()
132                .skip(1)
133                .fold(weights_i64[0], |gcd, &weight| gcd_of_two(gcd, weight))
134        } else {
135            weights_i64.get(0).copied().unwrap_or(1)
136        };
137        
138        Self {
139            current_sum: RwLock::new(0),
140            max_weight,
141            gcd_weight,
142            weights: weights_i64,
143        }
144    }
145    
146    /// Get the next index
147    pub async fn next(&self) -> Option<usize> {
148        if self.weights.is_empty() {
149            return None;
150        }
151        
152        let mut i = 0;
153        loop {
154            let mut current_sum = self.current_sum.write().await;
155            
156            i = i % self.weights.len();
157            
158            if i == 0 {
159                *current_sum -= self.gcd_weight;
160                if *current_sum <= 0 {
161                    *current_sum = self.max_weight;
162                    if *current_sum == 0 {
163                        return Some(0);
164                    }
165                }
166            }
167            
168            if self.weights[i] >= *current_sum {
169                return Some(i);
170            }
171            
172            i += 1;
173        }
174    }
175}
176
177/// Calculate the greatest common divisor of two numbers
178fn gcd_of_two(a: i64, b: i64) -> i64 {
179    if b == 0 {
180        a.abs()
181    } else {
182        gcd_of_two(b, a % b)
183    }
184}
185
186/// Calculate exponential backoff delay
187pub fn exponential_backoff(attempt: u32, base_ms: u64, max_ms: u64) -> u64 {
188    let delay = base_ms * (2_u64.pow(attempt as u32));
189    delay.min(max_ms)
190}
191
192/// Check if a port is available
193pub async fn is_port_available(port: u16) -> bool {
194    let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
195    tokio::net::TcpListener::bind(addr).await.is_ok()
196}
197
198/// Find an available port
199pub async fn find_available_port(start_port: u16, end_port: u16) -> Option<u16> {
200    for port in start_port..=end_port {
201        if is_port_available(port).await {
202            return Some(port);
203        }
204    }
205    None
206}
207
208/// Create directory if it doesn't exist
209pub async fn ensure_directory(dir: impl AsRef<Path>) -> Result<()> {
210    let path = dir.as_ref();
211    if !path.exists() {
212        fs::create_dir_all(path).await
213            .with_context(|| format!("Failed to create directory: {}", path.display()))?;
214    }
215    Ok(())
216}
217
218/// Write to a file atomically
219pub async fn atomic_write_file(path: impl AsRef<Path>, data: &[u8]) -> Result<()> {
220    // Write to a temporary file first
221    let path = path.as_ref();
222    let temp_path = path.with_extension("tmp");
223    
224    // Create parent directory if needed
225    if let Some(parent) = path.parent() {
226        ensure_directory(parent).await?;
227    }
228    
229    // Write to temporary file
230    let mut file = fs::File::create(&temp_path).await
231        .with_context(|| format!("Failed to create temporary file: {}", temp_path.display()))?;
232    
233    file.write_all(data).await
234        .with_context(|| format!("Failed to write to temporary file: {}", temp_path.display()))?;
235    
236    file.flush().await
237        .with_context(|| format!("Failed to flush temporary file: {}", temp_path.display()))?;
238    
239    // Rename temporary file to target file
240    fs::rename(&temp_path, path).await
241        .with_context(|| format!("Failed to rename temporary file to {}", path.display()))?;
242    
243    Ok(())
244}
245
246/// Get current timestamp in milliseconds
247pub fn current_time_millis() -> u64 {
248    SystemTime::now()
249        .duration_since(UNIX_EPOCH)
250        .unwrap_or_default()
251        .as_millis() as u64
252}
253
254/// Format timestamp as ISO 8601
255pub fn format_timestamp(timestamp: u64) -> String {
256    let dt = DateTime::<Utc>::from_utc(
257        chrono::NaiveDateTime::from_timestamp_opt((timestamp / 1000) as i64, 0).unwrap_or_default(),
258        Utc,
259    );
260    dt.to_rfc3339()
261}
262
263/// Parse ISO 8601 timestamp
264pub fn parse_timestamp(timestamp: &str) -> Result<u64> {
265    let dt = DateTime::parse_from_rfc3339(timestamp)
266        .with_context(|| format!("Invalid timestamp format: {}", timestamp))?;
267    
268    Ok(dt.timestamp_millis() as u64)
269}
270
271/// Calculate time difference in seconds
272pub fn time_diff_seconds(from: &DateTime<Utc>, to: &DateTime<Utc>) -> i64 {
273    (*to - *from).num_seconds()
274}
275
276/// System information
277#[derive(Debug, Clone, Serialize, Deserialize)]
278pub struct SystemInfo {
279    /// Node ID
280    pub node_id: String,
281    
282    /// Hostname
283    pub hostname: String,
284    
285    /// CPU count
286    pub cpu_count: u32,
287    
288    /// Total memory in MB
289    pub total_memory_mb: u64,
290    
291    /// Available memory in MB
292    pub available_memory_mb: u64,
293    
294    /// Uptime in seconds
295    pub uptime_seconds: u64,
296    
297    /// OS type
298    pub os_type: String,
299    
300    /// OS version
301    pub os_version: String,
302}
303
304impl SystemInfo {
305    /// Get system information
306    pub fn get() -> Result<Self> {
307        // Note: In a real implementation, this would use system-specific APIs
308        // For this example, we'll create dummy values
309        Ok(Self {
310            node_id: random_id(),
311            hostname: hostname::get()
312                .map(|h| h.to_string_lossy().to_string())
313                .unwrap_or_else(|_| "unknown".to_string()),
314            cpu_count: num_cpus::get() as u32,
315            total_memory_mb: 16384, // 16 GB
316            available_memory_mb: 8192, // 8 GB
317            uptime_seconds: 3600, // 1 hour
318            os_type: std::env::consts::OS.to_string(),
319            os_version: "1.0.0".to_string(),
320        })
321    }
322    
323    /// Get as HashMap
324    pub fn as_hashmap(&self) -> HashMap<String, String> {
325        let mut map = HashMap::new();
326        
327        map.insert("node_id".to_string(), self.node_id.clone());
328        map.insert("hostname".to_string(), self.hostname.clone());
329        map.insert("cpu_count".to_string(), self.cpu_count.to_string());
330        map.insert("total_memory_mb".to_string(), self.total_memory_mb.to_string());
331        map.insert("available_memory_mb".to_string(), self.available_memory_mb.to_string());
332        map.insert("uptime_seconds".to_string(), self.uptime_seconds.to_string());
333        map.insert("os_type".to_string(), self.os_type.clone());
334        map.insert("os_version".to_string(), self.os_version.clone());
335        
336        map
337    }
338}
339
340/// Retry with exponential backoff
341pub async fn retry_with_backoff<F, Fut, T, E>(
342    f: F,
343    attempts: u32,
344    base_delay_ms: u64,
345    max_delay_ms: u64,
346) -> Result<T, E>
347where
348    F: Fn() -> Fut,
349    Fut: std::future::Future<Output = Result<T, E>>,
350{
351    let mut attempt = 0;
352    loop {
353        match f().await {
354            Ok(result) => return Ok(result),
355            Err(err) => {
356                attempt += 1;
357                if attempt >= attempts {
358                    return Err(err);
359                }
360                
361                let delay_ms = exponential_backoff(attempt, base_delay_ms, max_delay_ms);
362                sleep(tokio::time::Duration::from_millis(delay_ms)).await;
363            }
364        }
365    }
366}
367
368/// Simple retry
369pub async fn retry<F, Fut, T, E>(f: F, attempts: u32, delay_ms: u64) -> Result<T, E>
370where
371    F: Fn() -> Fut,
372    Fut: std::future::Future<Output = Result<T, E>>,
373{
374    let mut attempt = 0;
375    loop {
376        match f().await {
377            Ok(result) => return Ok(result),
378            Err(err) => {
379                attempt += 1;
380                if attempt >= attempts {
381                    return Err(err);
382                }
383                
384                sleep(tokio::time::Duration::from_millis(delay_ms)).await;
385            }
386        }
387    }
388}
389
390/// Circularly increment counter with wraparound
391#[derive(Debug)]
392pub struct CircularCounter {
393    value: RwLock<u64>,
394    max: u64,
395}
396
397impl CircularCounter {
398    /// Create a new circular counter
399    pub fn new(start: u64, max: u64) -> Self {
400        Self {
401            value: RwLock::new(start % (max + 1)),
402            max,
403        }
404    }
405    
406    /// Get the current value
407    pub async fn get(&self) -> u64 {
408        *self.value.read().await
409    }
410    
411    /// Increment and get the new value
412    pub async fn increment(&self) -> u64 {
413        let mut value = self.value.write().await;
414        *value = (*value + 1) % (self.max + 1);
415        *value
416    }
417    
418    /// Set to a specific value
419    pub async fn set(&self, new_value: u64) {
420        let mut value = self.value.write().await;
421        *value = new_value % (self.max + 1);
422    }
423}