//! Lodestone metrics (metrics.rs): Prometheus counters, gauges, and
//! histograms for service discovery, routing, and Raft, plus per-node
//! in-process metric helpers.

1use std::sync::Arc;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::time::Instant;
4use std::collections::HashMap;
5use tokio::sync::Mutex;
6use lazy_static::lazy_static;
7use prometheus::{
8    IntCounterVec, Counter, Gauge, Histogram, HistogramOpts, HistogramVec, Registry,
9    register_int_counter_vec, register_counter, register_gauge, register_histogram,
10};
11use tracing::info;
12
// Define a global registry for Prometheus metrics
lazy_static! {
    // Optional custom registry, populated lazily by `init_metrics()`.
    // NOTE(review): none of the `register_*!` invocations below are handed
    // this registry — they are the macro forms that register against the
    // crate's default registry — so REGISTRY holds an empty Registry.
    // Confirm whether the custom registry is actually intended to be used.
    static ref REGISTRY: Mutex<Option<Registry>> = Mutex::new(None);

    // Service discovery metrics

    // Monotonic count of registrations, labeled by service name.
    static ref SERVICE_REGISTERED_COUNTER: IntCounterVec = register_int_counter_vec!(
        "lodestone_service_registered_total",
        "Total number of service instances registered",
        &["service_name"]
    ).expect("Failed to register SERVICE_REGISTERED_COUNTER");

    // Monotonic count of deregistrations, labeled by service name.
    static ref SERVICE_DEREGISTERED_COUNTER: IntCounterVec = register_int_counter_vec!(
        "lodestone_service_deregistered_total", 
        "Total number of service instances deregistered",
        &["service_name"]
    ).expect("Failed to register SERVICE_DEREGISTERED_COUNTER");

    // Health-check outcomes; the `result` label is "healthy" or "unhealthy"
    // (values produced by `record_service_health_check`).
    static ref SERVICE_HEALTH_CHECK_COUNTER: IntCounterVec = register_int_counter_vec!(
        "lodestone_service_health_check_total",
        "Total number of service health checks",
        &["service_name", "result"]
    ).expect("Failed to register SERVICE_HEALTH_CHECK_COUNTER");

    // Current registered-instance count; inc/dec'd by the record_* helpers.
    static ref SERVICE_INSTANCE_GAUGE: Gauge = register_gauge!(
        "lodestone_service_instances",
        "Current number of registered service instances"
    ).expect("Failed to register SERVICE_INSTANCE_GAUGE");

    // Current healthy-instance count; set by `update_service_health_gauges`.
    static ref SERVICE_HEALTHY_GAUGE: Gauge = register_gauge!(
        "lodestone_service_healthy_instances",
        "Current number of healthy service instances"
    ).expect("Failed to register SERVICE_HEALTHY_GAUGE");

    // Router metrics

    // Requests by route and coarse status class ("1xx".."5xx"/"unknown").
    static ref ROUTER_REQUEST_COUNTER: IntCounterVec = register_int_counter_vec!(
        "lodestone_router_requests_total",
        "Total number of requests processed by the router",
        &["route", "status"]
    ).expect("Failed to register ROUTER_REQUEST_COUNTER");

    // Per-route request-latency histogram; buckets span 5 ms .. 10 s.
    static ref ROUTER_REQUEST_DURATION: HistogramVec = {
        let opts = HistogramOpts::new(
            "lodestone_router_request_duration_seconds",
            "Request duration in seconds"
        )
        .buckets(vec![0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]);
        
        prometheus::register_histogram_vec!(opts, &["route"]).expect("Failed to register ROUTER_REQUEST_DURATION")
    };

    // Currently open router connections.
    static ref ROUTER_ACTIVE_CONNECTIONS: Gauge = register_gauge!(
        "lodestone_router_active_connections",
        "Current number of active connections"
    ).expect("Failed to register ROUTER_ACTIVE_CONNECTIONS");

    // Raft metrics

    // Leadership transitions observed by this node.
    static ref RAFT_LEADER_CHANGES: Counter = register_counter!(
        "lodestone_raft_leader_changes_total",
        "Total number of Raft leader changes"
    ).expect("Failed to register RAFT_LEADER_CHANGES");

    // Latest committed log index (stored as f64; see `update_raft_commit_index`).
    static ref RAFT_COMMIT_INDEX: Gauge = register_gauge!(
        "lodestone_raft_commit_index",
        "Current Raft commit index"
    ).expect("Failed to register RAFT_COMMIT_INDEX");

    // Raft log length in entries.
    static ref RAFT_LOG_SIZE: Gauge = register_gauge!(
        "lodestone_raft_log_size",
        "Current size of the Raft log in entries"
    ).expect("Failed to register RAFT_LOG_SIZE");

    // Apply-path latency; buckets span 1 ms .. 500 ms.
    static ref RAFT_APPLY_DURATION: Histogram = register_histogram!(
        HistogramOpts::new(
            "lodestone_raft_apply_duration_seconds",
            "Time taken to apply Raft operations"
        )
        .buckets(vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5])
    ).expect("Failed to register RAFT_APPLY_DURATION");
}
92
93/// Initialize metrics
94pub async fn init_metrics() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
95    let mut registry_guard = REGISTRY.lock().await;
96    
97    if registry_guard.is_none() {
98        let registry = Registry::new();
99        
100        // Register default metrics collectors
101        *registry_guard = Some(registry);
102        
103        info!("Metrics initialized");
104    }
105    
106    Ok(())
107}
108
109/// Expose metrics in Prometheus format
110pub fn gather_metrics() -> String {
111    // This would use prometheus client to gather metrics
112    // For simplicity, we just return an example metrics string
113    let mut output = String::new();
114    
115    // Headers and samples
116    output.push_str("# HELP lodestone_service_instances Current number of registered service instances\n");
117    output.push_str("# TYPE lodestone_service_instances gauge\n");
118    output.push_str("lodestone_service_instances ");
119    output.push_str(&SERVICE_INSTANCE_GAUGE.get().to_string());
120    output.push_str("\n");
121    
122    // More metrics...
123    
124    output
125}
126
127/// Record a service registration
128pub fn record_service_registered(service_name: &str) {
129    SERVICE_REGISTERED_COUNTER.with_label_values(&[service_name]).inc();
130    SERVICE_INSTANCE_GAUGE.inc();
131}
132
133/// Record a service deregistration
134pub fn record_service_deregistered(service_name: &str) {
135    SERVICE_DEREGISTERED_COUNTER.with_label_values(&[service_name]).inc();
136    SERVICE_INSTANCE_GAUGE.dec();
137}
138
139/// Record a service health check
140pub fn record_service_health_check(service_name: &str, healthy: bool) {
141    let result = if healthy { "healthy" } else { "unhealthy" };
142    SERVICE_HEALTH_CHECK_COUNTER.with_label_values(&[service_name, result]).inc();
143}
144
145/// Update service health gauges
146pub fn update_service_health_gauges(healthy_count: usize) {
147    SERVICE_HEALTHY_GAUGE.set(healthy_count as f64);
148}
149
150/// Record a router request
151pub fn record_router_request(route: &str, status_code: u16) {
152    let status_range = match status_code {
153        100..=199 => "1xx",
154        200..=299 => "2xx",
155        300..=399 => "3xx",
156        400..=499 => "4xx",
157        500..=599 => "5xx",
158        _ => "unknown",
159    };
160    
161    ROUTER_REQUEST_COUNTER.with_label_values(&[route, status_range]).inc();
162}
163
164/// Track router request duration
165pub struct RequestTimer {
166    route: String,
167    start: Instant,
168}
169
170impl RequestTimer {
171    /// Create a new request timer
172    pub fn new(route: &str) -> Self {
173        Self {
174            route: route.to_string(),
175            start: Instant::now(),
176        }
177    }
178    
179    /// Record the request duration
180    pub fn observe(self) {
181        let duration = self.start.elapsed();
182        ROUTER_REQUEST_DURATION
183            .with_label_values(&[&self.route])
184            .observe(duration.as_secs_f64());
185    }
186}
187
188/// Increment active connections
189pub fn increment_active_connections() {
190    ROUTER_ACTIVE_CONNECTIONS.inc();
191}
192
193/// Decrement active connections
194pub fn decrement_active_connections() {
195    ROUTER_ACTIVE_CONNECTIONS.dec();
196}
197
198/// Record a Raft leader change
199pub fn record_raft_leader_change() {
200    RAFT_LEADER_CHANGES.inc();
201}
202
203/// Update Raft commit index
204pub fn update_raft_commit_index(index: u64) {
205    RAFT_COMMIT_INDEX.set(index as f64);
206}
207
208/// Update Raft log size
209pub fn update_raft_log_size(size: usize) {
210    RAFT_LOG_SIZE.set(size as f64);
211}
212
213/// Measure Raft apply operation duration
214pub struct RaftApplyTimer {
215    start: Instant,
216}
217
218impl RaftApplyTimer {
219    /// Create a new Raft apply timer
220    pub fn new() -> Self {
221        Self {
222            start: Instant::now(),
223        }
224    }
225    
226    /// Record the apply duration
227    pub fn observe(self) {
228        let duration = self.start.elapsed();
229        RAFT_APPLY_DURATION.observe(duration.as_secs_f64());
230    }
231}
232
233/// Reset all metrics to their initial state
234pub fn reset_metrics() {
235    // Since we can't easily reset Prometheus metrics, 
236    // this is more for testing purposes
237    SERVICE_INSTANCE_GAUGE.set(0.0);
238    SERVICE_HEALTHY_GAUGE.set(0.0);
239    ROUTER_ACTIVE_CONNECTIONS.set(0.0);
240    RAFT_COMMIT_INDEX.set(0.0);
241    RAFT_LOG_SIZE.set(0.0);
242}
243
/// Utility struct for tracking metrics in a single node
///
/// The atomic counters are wrapped in `Arc`, so `Clone` yields a handle that
/// shares those counters, while the plain fields (`is_leader`, `uptime`,
/// `cpu_usage`, `memory_usage`) are copied independently.
#[derive(Debug, Clone)]
pub struct NodeMetrics {
    /// Node identifier
    pub node_id: String,
    
    /// Number of registered services
    pub registered_services: Arc<AtomicU64>,
    
    /// Number of healthy instances
    pub healthy_instances: Arc<AtomicU64>,
    
    /// Number of active connections
    pub active_connections: Arc<AtomicU64>,
    
    /// Is node leader
    pub is_leader: bool,
    
    /// Uptime in seconds
    pub uptime: u64,
    
    /// CPU usage percentage
    pub cpu_usage: f64,
    
    /// Memory usage in MB
    pub memory_usage: f64,
}

impl NodeMetrics {
    /// Create new node metrics with all counters at zero and leadership off.
    pub fn new(node_id: &str) -> Self {
        Self {
            node_id: node_id.to_string(),
            registered_services: Arc::new(AtomicU64::new(0)),
            healthy_instances: Arc::new(AtomicU64::new(0)),
            active_connections: Arc::new(AtomicU64::new(0)),
            is_leader: false,
            uptime: 0,
            cpu_usage: 0.0,
            memory_usage: 0.0,
        }
    }
    
    /// Increment registered services
    pub fn increment_services(&self) {
        self.registered_services.fetch_add(1, Ordering::SeqCst);
    }
    
    /// Decrement registered services, saturating at zero.
    ///
    /// A plain `fetch_sub` on a `u64` already at 0 wraps to `u64::MAX`;
    /// `fetch_update` + `checked_sub` leaves the count at 0 instead.
    pub fn decrement_services(&self) {
        let _ = self
            .registered_services
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| v.checked_sub(1));
    }
    
    /// Update healthy instances
    pub fn set_healthy_instances(&self, count: u64) {
        self.healthy_instances.store(count, Ordering::SeqCst);
    }
    
    /// Increment active connections
    pub fn increment_connections(&self) {
        self.active_connections.fetch_add(1, Ordering::SeqCst);
    }
    
    /// Decrement active connections, saturating at zero (see
    /// `decrement_services` for the wrap-around rationale).
    pub fn decrement_connections(&self) {
        let _ = self
            .active_connections
            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| v.checked_sub(1));
    }
    
    /// Get all metrics as a HashMap
    ///
    /// All values are rendered as strings; the two usage fields are
    /// formatted with two decimal places.
    pub fn as_hashmap(&self) -> HashMap<String, String> {
        let mut map = HashMap::new();
        
        map.insert("node_id".to_string(), self.node_id.clone());
        map.insert("registered_services".to_string(), self.registered_services.load(Ordering::SeqCst).to_string());
        map.insert("healthy_instances".to_string(), self.healthy_instances.load(Ordering::SeqCst).to_string());
        map.insert("active_connections".to_string(), self.active_connections.load(Ordering::SeqCst).to_string());
        map.insert("is_leader".to_string(), self.is_leader.to_string());
        map.insert("uptime".to_string(), self.uptime.to_string());
        map.insert("cpu_usage".to_string(), format!("{:.2}", self.cpu_usage));
        map.insert("memory_usage".to_string(), format!("{:.2}", self.memory_usage));
        
        map
    }
}
327}