1use std::sync::Arc;
2use std::sync::atomic::{AtomicU64, Ordering};
3use std::time::Instant;
4use std::collections::HashMap;
5use tokio::sync::Mutex;
6use lazy_static::lazy_static;
7use prometheus::{
8 IntCounterVec, Counter, Gauge, Histogram, HistogramOpts, HistogramVec, Registry,
9 register_int_counter_vec, register_counter, register_gauge, register_histogram,
10};
11use tracing::info;
12
lazy_static! {
    // Slot for a `Registry`, filled by `init_metrics`; `None` until then.
    // NOTE(review): none of the metrics below are registered into this registry —
    // they use the `register_*!` macros, which (per the prometheus crate docs)
    // register into the crate's default registry. Confirm whether this slot is needed.
    static ref REGISTRY: Mutex<Option<Registry>> = Mutex::new(None);

    // Each metric below is a lazy static: it is constructed and registered the
    // first time it is dereferenced, and the `.expect(...)` panics at that point
    // if registration fails (e.g. presumably a duplicate metric name).

    // ---- Service registry metrics ----

    // Registrations, labelled by service name.
    static ref SERVICE_REGISTERED_COUNTER: IntCounterVec = register_int_counter_vec!(
        "lodestone_service_registered_total",
        "Total number of service instances registered",
        &["service_name"]
    ).expect("Failed to register SERVICE_REGISTERED_COUNTER");

    // Deregistrations, labelled by service name.
    static ref SERVICE_DEREGISTERED_COUNTER: IntCounterVec = register_int_counter_vec!(
        "lodestone_service_deregistered_total",
        "Total number of service instances deregistered",
        &["service_name"]
    ).expect("Failed to register SERVICE_DEREGISTERED_COUNTER");

    // Health-check outcomes, labelled by service name and result string.
    static ref SERVICE_HEALTH_CHECK_COUNTER: IntCounterVec = register_int_counter_vec!(
        "lodestone_service_health_check_total",
        "Total number of service health checks",
        &["service_name", "result"]
    ).expect("Failed to register SERVICE_HEALTH_CHECK_COUNTER");

    // Running total of registered instances (incremented/decremented by callers).
    static ref SERVICE_INSTANCE_GAUGE: Gauge = register_gauge!(
        "lodestone_service_instances",
        "Current number of registered service instances"
    ).expect("Failed to register SERVICE_INSTANCE_GAUGE");

    // Latest healthy-instance count, overwritten wholesale by callers.
    static ref SERVICE_HEALTHY_GAUGE: Gauge = register_gauge!(
        "lodestone_service_healthy_instances",
        "Current number of healthy service instances"
    ).expect("Failed to register SERVICE_HEALTHY_GAUGE");

    // ---- Router metrics ----

    // Requests, labelled by route and status class ("2xx", "5xx", ...).
    static ref ROUTER_REQUEST_COUNTER: IntCounterVec = register_int_counter_vec!(
        "lodestone_router_requests_total",
        "Total number of requests processed by the router",
        &["route", "status"]
    ).expect("Failed to register ROUTER_REQUEST_COUNTER");

    // Per-route request latency; buckets span 5 ms to 10 s.
    static ref ROUTER_REQUEST_DURATION: HistogramVec = {
        let opts = HistogramOpts::new(
            "lodestone_router_request_duration_seconds",
            "Request duration in seconds"
        )
        .buckets(vec![0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0]);

        prometheus::register_histogram_vec!(opts, &["route"]).expect("Failed to register ROUTER_REQUEST_DURATION")
    };

    // Currently open router connections.
    static ref ROUTER_ACTIVE_CONNECTIONS: Gauge = register_gauge!(
        "lodestone_router_active_connections",
        "Current number of active connections"
    ).expect("Failed to register ROUTER_ACTIVE_CONNECTIONS");

    // ---- Raft metrics ----

    // Leader changes observed by this node.
    static ref RAFT_LEADER_CHANGES: Counter = register_counter!(
        "lodestone_raft_leader_changes_total",
        "Total number of Raft leader changes"
    ).expect("Failed to register RAFT_LEADER_CHANGES");

    // Latest commit index (stored as f64 by the gauge).
    static ref RAFT_COMMIT_INDEX: Gauge = register_gauge!(
        "lodestone_raft_commit_index",
        "Current Raft commit index"
    ).expect("Failed to register RAFT_COMMIT_INDEX");

    // Latest Raft log length, in entries.
    static ref RAFT_LOG_SIZE: Gauge = register_gauge!(
        "lodestone_raft_log_size",
        "Current size of the Raft log in entries"
    ).expect("Failed to register RAFT_LOG_SIZE");

    // Apply latency; buckets span 1 ms to 500 ms.
    static ref RAFT_APPLY_DURATION: Histogram = register_histogram!(
        HistogramOpts::new(
            "lodestone_raft_apply_duration_seconds",
            "Time taken to apply Raft operations"
        )
        .buckets(vec![0.001, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5])
    ).expect("Failed to register RAFT_APPLY_DURATION");
}
92
93pub async fn init_metrics() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
95 let mut registry_guard = REGISTRY.lock().await;
96
97 if registry_guard.is_none() {
98 let registry = Registry::new();
99
100 *registry_guard = Some(registry);
102
103 info!("Metrics initialized");
104 }
105
106 Ok(())
107}
108
109pub fn gather_metrics() -> String {
111 let mut output = String::new();
114
115 output.push_str("# HELP lodestone_service_instances Current number of registered service instances\n");
117 output.push_str("# TYPE lodestone_service_instances gauge\n");
118 output.push_str("lodestone_service_instances ");
119 output.push_str(&SERVICE_INSTANCE_GAUGE.get().to_string());
120 output.push_str("\n");
121
122 output
125}
126
127pub fn record_service_registered(service_name: &str) {
129 SERVICE_REGISTERED_COUNTER.with_label_values(&[service_name]).inc();
130 SERVICE_INSTANCE_GAUGE.inc();
131}
132
133pub fn record_service_deregistered(service_name: &str) {
135 SERVICE_DEREGISTERED_COUNTER.with_label_values(&[service_name]).inc();
136 SERVICE_INSTANCE_GAUGE.dec();
137}
138
139pub fn record_service_health_check(service_name: &str, healthy: bool) {
141 let result = if healthy { "healthy" } else { "unhealthy" };
142 SERVICE_HEALTH_CHECK_COUNTER.with_label_values(&[service_name, result]).inc();
143}
144
145pub fn update_service_health_gauges(healthy_count: usize) {
147 SERVICE_HEALTHY_GAUGE.set(healthy_count as f64);
148}
149
150pub fn record_router_request(route: &str, status_code: u16) {
152 let status_range = match status_code {
153 100..=199 => "1xx",
154 200..=299 => "2xx",
155 300..=399 => "3xx",
156 400..=499 => "4xx",
157 500..=599 => "5xx",
158 _ => "unknown",
159 };
160
161 ROUTER_REQUEST_COUNTER.with_label_values(&[route, status_range]).inc();
162}
163
164pub struct RequestTimer {
166 route: String,
167 start: Instant,
168}
169
170impl RequestTimer {
171 pub fn new(route: &str) -> Self {
173 Self {
174 route: route.to_string(),
175 start: Instant::now(),
176 }
177 }
178
179 pub fn observe(self) {
181 let duration = self.start.elapsed();
182 ROUTER_REQUEST_DURATION
183 .with_label_values(&[&self.route])
184 .observe(duration.as_secs_f64());
185 }
186}
187
188pub fn increment_active_connections() {
190 ROUTER_ACTIVE_CONNECTIONS.inc();
191}
192
193pub fn decrement_active_connections() {
195 ROUTER_ACTIVE_CONNECTIONS.dec();
196}
197
198pub fn record_raft_leader_change() {
200 RAFT_LEADER_CHANGES.inc();
201}
202
203pub fn update_raft_commit_index(index: u64) {
205 RAFT_COMMIT_INDEX.set(index as f64);
206}
207
208pub fn update_raft_log_size(size: usize) {
210 RAFT_LOG_SIZE.set(size as f64);
211}
212
213pub struct RaftApplyTimer {
215 start: Instant,
216}
217
218impl RaftApplyTimer {
219 pub fn new() -> Self {
221 Self {
222 start: Instant::now(),
223 }
224 }
225
226 pub fn observe(self) {
228 let duration = self.start.elapsed();
229 RAFT_APPLY_DURATION.observe(duration.as_secs_f64());
230 }
231}
232
233pub fn reset_metrics() {
235 SERVICE_INSTANCE_GAUGE.set(0.0);
238 SERVICE_HEALTHY_GAUGE.set(0.0);
239 ROUTER_ACTIVE_CONNECTIONS.set(0.0);
240 RAFT_COMMIT_INDEX.set(0.0);
241 RAFT_LOG_SIZE.set(0.0);
242}
243
/// Per-node statistics with a mix of live shared counters and plain snapshot fields.
///
/// `registered_services`, `healthy_instances` and `active_connections` are
/// `Arc<AtomicU64>`, so clones share them and see each other's updates. The
/// remaining fields are plain values and are copied independently by `Clone`.
#[derive(Debug, Clone)]
pub struct NodeMetrics {
    /// Identifier of the node these metrics describe.
    pub node_id: String,

    /// Number of registered services (shared across clones).
    pub registered_services: Arc<AtomicU64>,

    /// Number of healthy service instances (shared across clones).
    pub healthy_instances: Arc<AtomicU64>,

    /// Number of active connections (shared across clones).
    pub active_connections: Arc<AtomicU64>,

    /// Whether this node is currently the leader.
    pub is_leader: bool,

    /// Node uptime as set by the caller. NOTE(review): unit not visible here — presumably seconds.
    pub uptime: u64,

    /// CPU usage as set by the caller (rendered with two decimals).
    pub cpu_usage: f64,

    /// Memory usage as set by the caller (rendered with two decimals).
    pub memory_usage: f64,
}

impl NodeMetrics {
    /// Creates metrics for `node_id` with every counter and field zeroed.
    pub fn new(node_id: &str) -> Self {
        Self {
            node_id: node_id.to_string(),
            registered_services: Arc::new(AtomicU64::new(0)),
            healthy_instances: Arc::new(AtomicU64::new(0)),
            active_connections: Arc::new(AtomicU64::new(0)),
            is_leader: false,
            uptime: 0,
            cpu_usage: 0.0,
            memory_usage: 0.0,
        }
    }

    /// Adds one registered service.
    pub fn increment_services(&self) {
        self.registered_services.fetch_add(1, Ordering::SeqCst);
    }

    /// Removes one registered service; stays at 0 instead of wrapping.
    pub fn decrement_services(&self) {
        Self::saturating_decrement(&self.registered_services);
    }

    /// Overwrites the healthy-instance count.
    pub fn set_healthy_instances(&self, count: u64) {
        self.healthy_instances.store(count, Ordering::SeqCst);
    }

    /// Adds one active connection.
    pub fn increment_connections(&self) {
        self.active_connections.fetch_add(1, Ordering::SeqCst);
    }

    /// Removes one active connection; stays at 0 instead of wrapping.
    pub fn decrement_connections(&self) {
        Self::saturating_decrement(&self.active_connections);
    }

    /// Snapshots every field into a string map.
    ///
    /// Keys: `node_id`, `registered_services`, `healthy_instances`,
    /// `active_connections`, `is_leader`, `uptime`, `cpu_usage`, `memory_usage`.
    /// The two usage values are formatted with two decimal places.
    pub fn as_hashmap(&self) -> HashMap<String, String> {
        let mut map = HashMap::with_capacity(8);

        map.insert("node_id".to_string(), self.node_id.clone());
        map.insert(
            "registered_services".to_string(),
            self.registered_services.load(Ordering::SeqCst).to_string(),
        );
        map.insert(
            "healthy_instances".to_string(),
            self.healthy_instances.load(Ordering::SeqCst).to_string(),
        );
        map.insert(
            "active_connections".to_string(),
            self.active_connections.load(Ordering::SeqCst).to_string(),
        );
        map.insert("is_leader".to_string(), self.is_leader.to_string());
        map.insert("uptime".to_string(), self.uptime.to_string());
        map.insert("cpu_usage".to_string(), format!("{:.2}", self.cpu_usage));
        map.insert("memory_usage".to_string(), format!("{:.2}", self.memory_usage));

        map
    }

    /// Decrements `counter` by one unless it is already 0.
    ///
    /// A plain `fetch_sub(1)` wraps 0 to `u64::MAX`, which an unmatched
    /// decrement would otherwise publish as ~1.8e19 services/connections.
    fn saturating_decrement(counter: &AtomicU64) {
        // `checked_sub` returns None at 0, which makes `fetch_update` leave the
        // value untouched and return Err; that no-op is the intended outcome.
        let _ = counter.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |v| v.checked_sub(1));
    }
}