1use crate::service::{Service, ServiceInstance, ServiceHealth};
3use crate::config::DiscoverySettings;
4use crate::store::Store;
5use anyhow::{Result, Context};
6use chrono::Utc;
7use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9use tokio::sync::{RwLock, broadcast};
10use tokio::time::{interval, Duration};
11use tracing::{debug, info, warn, error};
12use uuid::Uuid;
13
14#[derive(Debug, Clone)]
16pub enum RegistryEvent {
17 ServiceRegistered {
19 service_name: String,
20 instance_id: String,
21 },
22 ServiceDeregistered {
24 service_name: String,
25 instance_id: String,
26 },
27 ServiceHealthChanged {
29 service_name: String,
30 instance_id: String,
31 old_health: ServiceHealth,
32 new_health: ServiceHealth,
33 },
34 ServiceUpdated {
36 service_name: String,
37 },
38}
39
40pub struct ServiceRegistry {
42 store: Arc<Box<dyn Store>>,
44
45 services: RwLock<HashMap<String, Service>>,
47
48 settings: DiscoverySettings,
50
51 event_tx: broadcast::Sender<RegistryEvent>,
53
54 node_id: String,
56
57 active: RwLock<bool>,
59}
60
61impl ServiceRegistry {
62 pub fn new(
64 store: Arc<Box<dyn Store>>,
65 settings: DiscoverySettings,
66 node_id: String,
67 ) -> Self {
68 let (event_tx, _) = broadcast::channel(100);
69
70 Self {
71 store,
72 services: RwLock::new(HashMap::new()),
73 settings,
74 event_tx,
75 node_id,
76 active: RwLock::new(false),
77 }
78 }
79
80 pub async fn start(&self) -> Result<()> {
82 self.load_services().await?;
84
85 let mut active = self.active.write().await;
87 *active = true;
88 drop(active);
89
90 self.start_health_checker();
92
93 self.start_ttl_checker();
95
96 info!("Service registry started");
97 Ok(())
98 }
99
100 pub async fn stop(&self) -> Result<()> {
102 let mut active = self.active.write().await;
104 *active = false;
105
106 self.save_services().await?;
108
109 info!("Service registry stopped");
110 Ok(())
111 }
112
113 pub async fn register_service(
115 &self,
116 service_name: &str,
117 host: &str,
118 port: u16,
119 protocol: &str,
120 tags: Vec<String>,
121 health_check_path: Option<String>,
122 metadata: Option<HashMap<String, String>>,
123 ) -> Result<String> {
124 let mut instance = ServiceInstance::new(
126 service_name,
127 host,
128 port,
129 protocol,
130 &self.node_id,
131 );
132
133 instance.tags = tags;
135
136 if let Some(path) = health_check_path {
138 instance.health_check_path = Some(path);
139 } else if protocol == "http" || protocol == "https" {
140 instance.health_check_path = Some(self.settings.default_health_check_path.clone());
141 }
142
143 if let Some(meta) = metadata {
145 instance.metadata = meta;
146 }
147
148 instance.set_ttl(self.settings.service_ttl_secs);
150
151 let instance_id = instance.id.clone();
153
154 let mut services = self.services.write().await;
156
157 let service = services
159 .entry(service_name.to_string())
160 .or_insert_with(|| Service::new(service_name));
161
162 service.add_instance(instance);
164
165 self.save_service(service_name, service).await?;
167
168 let _ = self.event_tx.send(RegistryEvent::ServiceRegistered {
170 service_name: service_name.to_string(),
171 instance_id: instance_id.clone(),
172 });
173
174 info!("Registered service instance {} for service {}", instance_id, service_name);
175
176 Ok(instance_id)
177 }
178
179 pub async fn deregister_service(&self, service_name: &str, instance_id: &str) -> Result<()> {
181 let mut services = self.services.write().await;
182
183 if let Some(service) = services.get_mut(service_name) {
185 if let Some(instance) = service.remove_instance(instance_id) {
187 drop(services); self.save_services().await?;
190
191 let _ = self.event_tx.send(RegistryEvent::ServiceDeregistered {
193 service_name: service_name.to_string(),
194 instance_id: instance_id.to_string(),
195 });
196
197 info!("Deregistered service instance {} for service {}", instance_id, service_name);
198
199 return Ok(());
200 }
201 }
202
203 Err(anyhow::anyhow!("Service instance not found"))
204 }
205
206 pub async fn get_service(&self, service_name: &str) -> Option<Service> {
208 let services = self.services.read().await;
209 services.get(service_name).cloned()
210 }
211
212 pub async fn get_instance(&self, service_name: &str, instance_id: &str) -> Option<ServiceInstance> {
214 let service = self.get_service(service_name).await?;
215 service.get_instance(instance_id).cloned()
216 }
217
218 pub async fn get_services(&self) -> HashMap<String, Service> {
220 let services = self.services.read().await;
221 services.clone()
222 }
223
224 pub async fn get_instances(&self, service_name: &str) -> Vec<ServiceInstance> {
226 if let Some(service) = self.get_service(service_name).await {
227 service.instances
228 } else {
229 Vec::new()
230 }
231 }
232
233 pub async fn get_healthy_instances(&self, service_name: &str) -> Vec<ServiceInstance> {
235 if let Some(service) = self.get_service(service_name).await {
236 service.instances.into_iter()
237 .filter(|i| i.health == ServiceHealth::Healthy)
238 .collect()
239 } else {
240 Vec::new()
241 }
242 }
243
244 pub async fn update_instance_health(
246 &self,
247 service_name: &str,
248 instance_id: &str,
249 health: ServiceHealth,
250 ) -> Result<()> {
251 let mut services = self.services.write().await;
252
253 if let Some(service) = services.get_mut(service_name) {
255 if let Some(instance) = service.get_instance_mut(instance_id) {
257 let old_health = instance.health;
258
259 if old_health != health {
261 instance.update_health(health);
262
263 drop(services); self.save_services().await?;
266
267 let _ = self.event_tx.send(RegistryEvent::ServiceHealthChanged {
269 service_name: service_name.to_string(),
270 instance_id: instance_id.to_string(),
271 old_health,
272 new_health: health,
273 });
274
275 debug!(
276 "Updated service instance {} health for service {} from {:?} to {:?}",
277 instance_id, service_name, old_health, health
278 );
279 }
280
281 return Ok(());
282 }
283 }
284
285 Err(anyhow::anyhow!("Service instance not found"))
286 }
287
288 pub async fn heartbeat(&self, service_name: &str, instance_id: &str) -> Result<()> {
290 let mut services = self.services.write().await;
291
292 if let Some(service) = services.get_mut(service_name) {
294 if let Some(instance) = service.get_instance_mut(instance_id) {
296 instance.heartbeat();
298
299 if instance.health != ServiceHealth::Healthy {
301 drop(services); self.update_instance_health(service_name, instance_id, ServiceHealth::Healthy).await?;
303 }
304
305 return Ok(());
306 }
307 }
308
309 Err(anyhow::anyhow!("Service instance not found"))
310 }
311
312 pub fn subscribe(&self) -> broadcast::Receiver<RegistryEvent> {
314 self.event_tx.subscribe()
315 }
316
317 async fn load_services(&self) -> Result<()> {
319 let services_data = self.store.get("services").await?;
320
321 if let Some(data) = services_data {
322 let services: HashMap<String, Service> = serde_json::from_slice(&data)
323 .context("Failed to deserialize services from store")?;
324
325 let mut services_lock = self.services.write().await;
326 *services_lock = services;
327
328 info!("Loaded {} services from store", services_lock.len());
329 } else {
330 info!("No services found in store");
331 }
332
333 Ok(())
334 }
335
336 async fn save_services(&self) -> Result<()> {
338 let services = self.services.read().await;
339 let data = serde_json::to_vec(&*services)
340 .context("Failed to serialize services for store")?;
341
342 self.store.set("services", &data).await?;
343 debug!("Saved {} services to store", services.len());
344
345 Ok(())
346 }
347
348 async fn save_service(&self, service_name: &str, service: &Service) -> Result<()> {
350 let data = serde_json::to_vec(service)
351 .context("Failed to serialize service for store")?;
352
353 self.store.set(&format!("service:{}", service_name), &data).await?;
354 debug!("Saved service {} to store", service_name);
355
356 Ok(())
357 }
358
359 fn start_health_checker(&self) {
361 let registry = self.clone();
362 let interval_secs = self.settings.health_check_interval_secs;
363
364 tokio::spawn(async move {
365 let mut interval = interval(Duration::from_secs(interval_secs));
366
367 loop {
368 interval.tick().await;
369
370 let active = *registry.active.read().await;
372 if !active {
373 break;
374 }
375
376 if let Err(e) = registry.check_service_health().await {
378 error!("Error performing health checks: {}", e);
379 }
380 }
381 });
382 }
383
384 fn start_ttl_checker(&self) {
386 let registry = self.clone();
387 let check_interval = Duration::from_secs(self.settings.service_ttl_secs / 2);
388
389 tokio::spawn(async move {
390 let mut interval = interval(check_interval);
391
392 loop {
393 interval.tick().await;
394
395 let active = *registry.active.read().await;
397 if !active {
398 break;
399 }
400
401 if let Err(e) = registry.check_expired_instances().await {
403 error!("Error checking expired instances: {}", e);
404 }
405 }
406 });
407 }
408
409 async fn check_service_health(&self) -> Result<()> {
411 let services = self.get_services().await;
412
413 for (service_name, service) in services {
414 for instance in service.instances {
415 let service_name = service_name.clone();
416 let instance_id = instance.id.clone();
417 let registry = self.clone();
418
419 tokio::spawn(async move {
420 match registry.check_instance_health(&instance).await {
422 Ok(health) => {
423 if instance.health != health {
425 if let Err(e) = registry.update_instance_health(&service_name, &instance_id, health).await {
426 error!("Failed to update health for {}/{}: {}", service_name, instance_id, e);
427 }
428 }
429 }
430 Err(e) => {
431 error!("Health check failed for {}/{}: {}", service_name, instance_id, e);
432
433 if instance.health == ServiceHealth::Healthy {
435 if let Err(e) = registry.update_instance_health(&service_name, &instance_id, ServiceHealth::Unhealthy).await {
436 error!("Failed to mark unhealthy for {}/{}: {}", service_name, instance_id, e);
437 }
438 }
439 }
440 }
441 });
442 }
443 }
444
445 Ok(())
446 }
447
448 async fn check_instance_health(&self, instance: &ServiceInstance) -> Result<ServiceHealth> {
450 match instance.health {
452 ServiceHealth::Deregistering | ServiceHealth::Maintenance => {
453 return Ok(instance.health);
454 }
455 _ => {}
456 }
457
458 match instance.protocol.as_str() {
460 "http" | "https" => {
461 self.check_http_health(instance).await
462 }
463 "tcp" => {
464 self.check_tcp_health(instance).await
465 }
466 _ => {
467 Ok(instance.health)
469 }
470 }
471 }
472
473 async fn check_http_health(&self, instance: &ServiceInstance) -> Result<ServiceHealth> {
475 let health_path = instance.health_check_path.as_deref()
477 .unwrap_or(&self.settings.default_health_check_path);
478
479 let url = format!("{}://{}:{}{}",
481 instance.protocol,
482 instance.host,
483 instance.port,
484 health_path
485 );
486
487 let client = reqwest::Client::builder()
489 .timeout(Duration::from_millis(self.settings.health_check_timeout_ms))
490 .build()?;
491
492 let response = client.get(&url).send().await?;
494
495 if response.status().is_success() {
497 Ok(ServiceHealth::Healthy)
498 } else {
499 Ok(ServiceHealth::Unhealthy)
500 }
501 }
502
503 async fn check_tcp_health(&self, instance: &ServiceInstance) -> Result<ServiceHealth> {
505 let addr = format!("{}:{}", instance.host, instance.port);
507 let timeout = Duration::from_millis(self.settings.health_check_timeout_ms);
508
509 match tokio::time::timeout(timeout, tokio::net::TcpStream::connect(&addr)).await {
510 Ok(Ok(_)) => Ok(ServiceHealth::Healthy),
511 _ => Ok(ServiceHealth::Unhealthy),
512 }
513 }
514
515 async fn check_expired_instances(&self) -> Result<()> {
517 let services = self.get_services().await;
518 let mut to_deregister = Vec::new();
519
520 for (service_name, service) in services {
522 for instance in service.instances {
523 if instance.is_expired() {
524 to_deregister.push((service_name.clone(), instance.id.clone()));
525 }
526 }
527 }
528
529 for (service_name, instance_id) in to_deregister {
531 warn!("Deregistering expired instance {} for service {}", instance_id, service_name);
532
533 if let Err(e) = self.deregister_service(&service_name, &instance_id).await {
534 error!("Failed to deregister expired instance {}/{}: {}", service_name, instance_id, e);
535 }
536 }
537
538 Ok(())
539 }
540}
541
542impl Clone for ServiceRegistry {
543 fn clone(&self) -> Self {
544 Self {
545 store: self.store.clone(),
546 services: RwLock::new(HashMap::new()), settings: self.settings.clone(),
548 event_tx: self.event_tx.clone(),
549 node_id: self.node_id.clone(),
550 active: RwLock::new(false), }
552 }
553}