1use anyhow::{Result, Context};
3use chrono::{DateTime, Duration, Utc};
4use rand::{thread_rng, Rng};
5use rand::distributions::Alphanumeric;
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::iter;
9use std::net::{IpAddr, Ipv4Addr, SocketAddr};
10use std::path::Path;
11use std::sync::Arc;
12use std::time::{SystemTime, UNIX_EPOCH};
13use tokio::fs;
14use tokio::io::AsyncWriteExt;
15use tokio::sync::RwLock;
16use tokio::time::sleep;
17use tracing::{debug, info, warn, error};
18use uuid::Uuid;
19
20pub fn random_id() -> String {
22 Uuid::new_v4().to_string()
23}
24
25pub fn random_string(length: usize) -> String {
27 let mut rng = thread_rng();
28 iter::repeat(())
29 .map(|()| rng.sample(Alphanumeric))
30 .map(char::from)
31 .take(length)
32 .collect()
33}
34
35pub fn weighted_random_index(weights: &[u32]) -> Option<usize> {
37 if weights.is_empty() {
38 return None;
39 }
40
41 let sum: u32 = weights.iter().sum();
42 if sum == 0 {
43 return None;
44 }
45
46 let mut rng = thread_rng();
47 let rand_val = rng.gen_range(0..sum);
48
49 let mut cumulative = 0;
50 for (i, &weight) in weights.iter().enumerate() {
51 cumulative += weight;
52 if rand_val < cumulative {
53 return Some(i);
54 }
55 }
56
57 Some(0)
59}
60
61pub struct RoundRobinBalancer {
63 current: RwLock<usize>,
65
66 targets: usize,
68}
69
70impl RoundRobinBalancer {
71 pub fn new(targets: usize) -> Self {
73 Self {
74 current: RwLock::new(0),
75 targets,
76 }
77 }
78
79 pub async fn next(&self) -> Option<usize> {
81 if self.targets == 0 {
82 return None;
83 }
84
85 let mut current = self.current.write().await;
86 let idx = *current;
87 *current = (*current + 1) % self.targets;
88
89 Some(idx)
90 }
91
92 pub async fn update_targets(&mut self, targets: usize) {
94 self.targets = targets;
95
96 let mut current = self.current.write().await;
97 if self.targets > 0 {
98 *current = *current % self.targets;
99 } else {
100 *current = 0;
101 }
102 }
103}
104
105pub struct WeightedRoundRobinBalancer {
107 current_sum: RwLock<i64>,
109
110 max_weight: i64,
112
113 gcd_weight: i64,
115
116 weights: Vec<i64>,
118}
119
120impl WeightedRoundRobinBalancer {
121 pub fn new(weights: Vec<u32>) -> Self {
123 let weights_i64: Vec<i64> = weights.iter().map(|&w| w as i64).collect();
124
125 let max_weight = *weights_i64.iter().max().unwrap_or(&0);
127
128 let gcd_weight = if weights_i64.len() > 1 {
130 weights_i64
131 .iter()
132 .skip(1)
133 .fold(weights_i64[0], |gcd, &weight| gcd_of_two(gcd, weight))
134 } else {
135 weights_i64.get(0).copied().unwrap_or(1)
136 };
137
138 Self {
139 current_sum: RwLock::new(0),
140 max_weight,
141 gcd_weight,
142 weights: weights_i64,
143 }
144 }
145
146 pub async fn next(&self) -> Option<usize> {
148 if self.weights.is_empty() {
149 return None;
150 }
151
152 let mut i = 0;
153 loop {
154 let mut current_sum = self.current_sum.write().await;
155
156 i = i % self.weights.len();
157
158 if i == 0 {
159 *current_sum -= self.gcd_weight;
160 if *current_sum <= 0 {
161 *current_sum = self.max_weight;
162 if *current_sum == 0 {
163 return Some(0);
164 }
165 }
166 }
167
168 if self.weights[i] >= *current_sum {
169 return Some(i);
170 }
171
172 i += 1;
173 }
174 }
175}
176
/// Greatest common divisor of `a` and `b` via the Euclidean algorithm.
///
/// The result is non-negative; `gcd_of_two(x, 0)` is `|x|` and
/// `gcd_of_two(0, 0)` is `0`.
fn gcd_of_two(a: i64, b: i64) -> i64 {
    let (mut dividend, mut divisor) = (a, b);
    while divisor != 0 {
        let remainder = dividend % divisor;
        dividend = divisor;
        divisor = remainder;
    }
    dividend.abs()
}
185
/// Computes an exponential backoff delay in milliseconds.
///
/// The delay is `base_ms * 2^attempt`, capped at `max_ms`. `attempt` is
/// zero-based: attempt 0 yields `base_ms` (subject to the cap).
///
/// The arithmetic saturates instead of overflowing, so arbitrarily large
/// `attempt` values simply yield the cap.
pub fn exponential_backoff(attempt: u32, base_ms: u64, max_ms: u64) -> u64 {
    // 2^attempt does not fit in u64 for attempt >= 64; treat that as "huge".
    let factor = 2_u64.checked_pow(attempt).unwrap_or(u64::MAX);
    base_ms.saturating_mul(factor).min(max_ms)
}
191
192pub async fn is_port_available(port: u16) -> bool {
194 let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), port);
195 tokio::net::TcpListener::bind(addr).await.is_ok()
196}
197
198pub async fn find_available_port(start_port: u16, end_port: u16) -> Option<u16> {
200 for port in start_port..=end_port {
201 if is_port_available(port).await {
202 return Some(port);
203 }
204 }
205 None
206}
207
208pub async fn ensure_directory(dir: impl AsRef<Path>) -> Result<()> {
210 let path = dir.as_ref();
211 if !path.exists() {
212 fs::create_dir_all(path).await
213 .with_context(|| format!("Failed to create directory: {}", path.display()))?;
214 }
215 Ok(())
216}
217
218pub async fn atomic_write_file(path: impl AsRef<Path>, data: &[u8]) -> Result<()> {
220 let path = path.as_ref();
222 let temp_path = path.with_extension("tmp");
223
224 if let Some(parent) = path.parent() {
226 ensure_directory(parent).await?;
227 }
228
229 let mut file = fs::File::create(&temp_path).await
231 .with_context(|| format!("Failed to create temporary file: {}", temp_path.display()))?;
232
233 file.write_all(data).await
234 .with_context(|| format!("Failed to write to temporary file: {}", temp_path.display()))?;
235
236 file.flush().await
237 .with_context(|| format!("Failed to flush temporary file: {}", temp_path.display()))?;
238
239 fs::rename(&temp_path, path).await
241 .with_context(|| format!("Failed to rename temporary file to {}", path.display()))?;
242
243 Ok(())
244}
245
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, `0` is returned.
pub fn current_time_millis() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
253
254pub fn format_timestamp(timestamp: u64) -> String {
256 let dt = DateTime::<Utc>::from_utc(
257 chrono::NaiveDateTime::from_timestamp_opt((timestamp / 1000) as i64, 0).unwrap_or_default(),
258 Utc,
259 );
260 dt.to_rfc3339()
261}
262
263pub fn parse_timestamp(timestamp: &str) -> Result<u64> {
265 let dt = DateTime::parse_from_rfc3339(timestamp)
266 .with_context(|| format!("Invalid timestamp format: {}", timestamp))?;
267
268 Ok(dt.timestamp_millis() as u64)
269}
270
271pub fn time_diff_seconds(from: &DateTime<Utc>, to: &DateTime<Utc>) -> i64 {
273 (*to - *from).num_seconds()
274}
275
276#[derive(Debug, Clone, Serialize, Deserialize)]
278pub struct SystemInfo {
279 pub node_id: String,
281
282 pub hostname: String,
284
285 pub cpu_count: u32,
287
288 pub total_memory_mb: u64,
290
291 pub available_memory_mb: u64,
293
294 pub uptime_seconds: u64,
296
297 pub os_type: String,
299
300 pub os_version: String,
302}
303
304impl SystemInfo {
305 pub fn get() -> Result<Self> {
307 Ok(Self {
310 node_id: random_id(),
311 hostname: hostname::get()
312 .map(|h| h.to_string_lossy().to_string())
313 .unwrap_or_else(|_| "unknown".to_string()),
314 cpu_count: num_cpus::get() as u32,
315 total_memory_mb: 16384, available_memory_mb: 8192, uptime_seconds: 3600, os_type: std::env::consts::OS.to_string(),
319 os_version: "1.0.0".to_string(),
320 })
321 }
322
323 pub fn as_hashmap(&self) -> HashMap<String, String> {
325 let mut map = HashMap::new();
326
327 map.insert("node_id".to_string(), self.node_id.clone());
328 map.insert("hostname".to_string(), self.hostname.clone());
329 map.insert("cpu_count".to_string(), self.cpu_count.to_string());
330 map.insert("total_memory_mb".to_string(), self.total_memory_mb.to_string());
331 map.insert("available_memory_mb".to_string(), self.available_memory_mb.to_string());
332 map.insert("uptime_seconds".to_string(), self.uptime_seconds.to_string());
333 map.insert("os_type".to_string(), self.os_type.clone());
334 map.insert("os_version".to_string(), self.os_version.clone());
335
336 map
337 }
338}
339
340pub async fn retry_with_backoff<F, Fut, T, E>(
342 f: F,
343 attempts: u32,
344 base_delay_ms: u64,
345 max_delay_ms: u64,
346) -> Result<T, E>
347where
348 F: Fn() -> Fut,
349 Fut: std::future::Future<Output = Result<T, E>>,
350{
351 let mut attempt = 0;
352 loop {
353 match f().await {
354 Ok(result) => return Ok(result),
355 Err(err) => {
356 attempt += 1;
357 if attempt >= attempts {
358 return Err(err);
359 }
360
361 let delay_ms = exponential_backoff(attempt, base_delay_ms, max_delay_ms);
362 sleep(tokio::time::Duration::from_millis(delay_ms)).await;
363 }
364 }
365 }
366}
367
368pub async fn retry<F, Fut, T, E>(f: F, attempts: u32, delay_ms: u64) -> Result<T, E>
370where
371 F: Fn() -> Fut,
372 Fut: std::future::Future<Output = Result<T, E>>,
373{
374 let mut attempt = 0;
375 loop {
376 match f().await {
377 Ok(result) => return Ok(result),
378 Err(err) => {
379 attempt += 1;
380 if attempt >= attempts {
381 return Err(err);
382 }
383
384 sleep(tokio::time::Duration::from_millis(delay_ms)).await;
385 }
386 }
387 }
388}
389
390#[derive(Debug)]
392pub struct CircularCounter {
393 value: RwLock<u64>,
394 max: u64,
395}
396
397impl CircularCounter {
398 pub fn new(start: u64, max: u64) -> Self {
400 Self {
401 value: RwLock::new(start % (max + 1)),
402 max,
403 }
404 }
405
406 pub async fn get(&self) -> u64 {
408 *self.value.read().await
409 }
410
411 pub async fn increment(&self) -> u64 {
413 let mut value = self.value.write().await;
414 *value = (*value + 1) % (self.max + 1);
415 *value
416 }
417
418 pub async fn set(&self, new_value: u64) {
420 let mut value = self.value.write().await;
421 *value = new_value % (self.max + 1);
422 }
423}