Lodestone/
discovery.rs

1// src/discovery.rs
2use crate::service::{Service, ServiceInstance, ServiceHealth};
3use crate::config::DiscoverySettings;
4use crate::store::Store;
5use anyhow::{Result, Context};
6use chrono::Utc;
7use std::collections::{HashMap, HashSet};
8use std::sync::Arc;
9use tokio::sync::{RwLock, broadcast};
10use tokio::time::{interval, Duration};
11use tracing::{debug, info, warn, error};
12use uuid::Uuid;
13
14/// Type of registry event
15#[derive(Debug, Clone)]
16pub enum RegistryEvent {
17    /// Service registered
18    ServiceRegistered {
19        service_name: String,
20        instance_id: String,
21    },
22    /// Service instance deregistered
23    ServiceDeregistered {
24        service_name: String,
25        instance_id: String,
26    },
27    /// Service instance health changed
28    ServiceHealthChanged {
29        service_name: String,
30        instance_id: String,
31        old_health: ServiceHealth,
32        new_health: ServiceHealth,
33    },
34    /// Service updates (metadata, etc.)
35    ServiceUpdated {
36        service_name: String,
37    },
38}
39
40/// Service registry for managing service discovery
41pub struct ServiceRegistry {
42    /// Store for persisting registry state
43    store: Arc<Box<dyn Store>>,
44    
45    /// Services cache
46    services: RwLock<HashMap<String, Service>>,
47    
48    /// Registry settings
49    settings: DiscoverySettings,
50    
51    /// Event sender
52    event_tx: broadcast::Sender<RegistryEvent>,
53    
54    /// Node ID running this registry
55    node_id: String,
56    
57    /// Registry is active flag
58    active: RwLock<bool>,
59}
60
61impl ServiceRegistry {
62    /// Create a new service registry
63    pub fn new(
64        store: Arc<Box<dyn Store>>,
65        settings: DiscoverySettings,
66        node_id: String,
67    ) -> Self {
68        let (event_tx, _) = broadcast::channel(100);
69        
70        Self {
71            store,
72            services: RwLock::new(HashMap::new()),
73            settings,
74            event_tx,
75            node_id,
76            active: RwLock::new(false),
77        }
78    }
79    
80    /// Start the registry
81    pub async fn start(&self) -> Result<()> {
82        // Load services from store
83        self.load_services().await?;
84        
85        // Mark as active
86        let mut active = self.active.write().await;
87        *active = true;
88        drop(active);
89        
90        // Start health checker
91        self.start_health_checker();
92        
93        // Start TTL checker
94        self.start_ttl_checker();
95        
96        info!("Service registry started");
97        Ok(())
98    }
99    
100    /// Stop the registry
101    pub async fn stop(&self) -> Result<()> {
102        // Mark as inactive
103        let mut active = self.active.write().await;
104        *active = false;
105        
106        // Save services to store
107        self.save_services().await?;
108        
109        info!("Service registry stopped");
110        Ok(())
111    }
112    
113    /// Register a new service instance
114    pub async fn register_service(
115        &self,
116        service_name: &str,
117        host: &str,
118        port: u16,
119        protocol: &str,
120        tags: Vec<String>,
121        health_check_path: Option<String>,
122        metadata: Option<HashMap<String, String>>,
123    ) -> Result<String> {
124        // Create service instance
125        let mut instance = ServiceInstance::new(
126            service_name,
127            host,
128            port,
129            protocol,
130            &self.node_id,
131        );
132        
133        // Set tags
134        instance.tags = tags;
135        
136        // Set health check path
137        if let Some(path) = health_check_path {
138            instance.health_check_path = Some(path);
139        } else if protocol == "http" || protocol == "https" {
140            instance.health_check_path = Some(self.settings.default_health_check_path.clone());
141        }
142        
143        // Set metadata
144        if let Some(meta) = metadata {
145            instance.metadata = meta;
146        }
147        
148        // Set TTL
149        instance.set_ttl(self.settings.service_ttl_secs);
150        
151        // Get instance ID
152        let instance_id = instance.id.clone();
153        
154        // Update services
155        let mut services = self.services.write().await;
156        
157        // Get or create service
158        let service = services
159            .entry(service_name.to_string())
160            .or_insert_with(|| Service::new(service_name));
161        
162        // Add instance to service
163        service.add_instance(instance);
164        
165        // Save to store
166        self.save_service(service_name, service).await?;
167        
168        // Notify of registration
169        let _ = self.event_tx.send(RegistryEvent::ServiceRegistered {
170            service_name: service_name.to_string(),
171            instance_id: instance_id.clone(),
172        });
173        
174        info!("Registered service instance {} for service {}", instance_id, service_name);
175        
176        Ok(instance_id)
177    }
178    
179    /// Deregister a service instance
180    pub async fn deregister_service(&self, service_name: &str, instance_id: &str) -> Result<()> {
181        let mut services = self.services.write().await;
182        
183        // Find service
184        if let Some(service) = services.get_mut(service_name) {
185            // Remove instance
186            if let Some(instance) = service.remove_instance(instance_id) {
187                // Save to store
188                drop(services); // Release lock before async call
189                self.save_services().await?;
190                
191                // Notify of deregistration
192                let _ = self.event_tx.send(RegistryEvent::ServiceDeregistered {
193                    service_name: service_name.to_string(),
194                    instance_id: instance_id.to_string(),
195                });
196                
197                info!("Deregistered service instance {} for service {}", instance_id, service_name);
198                
199                return Ok(());
200            }
201        }
202        
203        Err(anyhow::anyhow!("Service instance not found"))
204    }
205    
206    /// Get a service by name
207    pub async fn get_service(&self, service_name: &str) -> Option<Service> {
208        let services = self.services.read().await;
209        services.get(service_name).cloned()
210    }
211    
212    /// Get a service instance
213    pub async fn get_instance(&self, service_name: &str, instance_id: &str) -> Option<ServiceInstance> {
214        let service = self.get_service(service_name).await?;
215        service.get_instance(instance_id).cloned()
216    }
217    
218    /// Get all services
219    pub async fn get_services(&self) -> HashMap<String, Service> {
220        let services = self.services.read().await;
221        services.clone()
222    }
223    
224    /// Get all instances for a service
225    pub async fn get_instances(&self, service_name: &str) -> Vec<ServiceInstance> {
226        if let Some(service) = self.get_service(service_name).await {
227            service.instances
228        } else {
229            Vec::new()
230        }
231    }
232    
233    /// Get healthy instances for a service
234    pub async fn get_healthy_instances(&self, service_name: &str) -> Vec<ServiceInstance> {
235        if let Some(service) = self.get_service(service_name).await {
236            service.instances.into_iter()
237                .filter(|i| i.health == ServiceHealth::Healthy)
238                .collect()
239        } else {
240            Vec::new()
241        }
242    }
243    
244    /// Update service instance health
245    pub async fn update_instance_health(
246        &self,
247        service_name: &str,
248        instance_id: &str,
249        health: ServiceHealth,
250    ) -> Result<()> {
251        let mut services = self.services.write().await;
252        
253        // Find service
254        if let Some(service) = services.get_mut(service_name) {
255            // Find instance
256            if let Some(instance) = service.get_instance_mut(instance_id) {
257                let old_health = instance.health;
258                
259                // Update health if changed
260                if old_health != health {
261                    instance.update_health(health);
262                    
263                    // Save to store
264                    drop(services); // Release lock before async call
265                    self.save_services().await?;
266                    
267                    // Notify of health change
268                    let _ = self.event_tx.send(RegistryEvent::ServiceHealthChanged {
269                        service_name: service_name.to_string(),
270                        instance_id: instance_id.to_string(),
271                        old_health,
272                        new_health: health,
273                    });
274                    
275                    debug!(
276                        "Updated service instance {} health for service {} from {:?} to {:?}",
277                        instance_id, service_name, old_health, health
278                    );
279                }
280                
281                return Ok(());
282            }
283        }
284        
285        Err(anyhow::anyhow!("Service instance not found"))
286    }
287    
288    /// Record a heartbeat for an instance
289    pub async fn heartbeat(&self, service_name: &str, instance_id: &str) -> Result<()> {
290        let mut services = self.services.write().await;
291        
292        // Find service
293        if let Some(service) = services.get_mut(service_name) {
294            // Find instance
295            if let Some(instance) = service.get_instance_mut(instance_id) {
296                // Record heartbeat
297                instance.heartbeat();
298                
299                // If instance was unhealthy, mark as healthy
300                if instance.health != ServiceHealth::Healthy {
301                    drop(services); // Release lock before recursive call
302                    self.update_instance_health(service_name, instance_id, ServiceHealth::Healthy).await?;
303                }
304                
305                return Ok(());
306            }
307        }
308        
309        Err(anyhow::anyhow!("Service instance not found"))
310    }
311    
312    /// Subscribe to registry events
313    pub fn subscribe(&self) -> broadcast::Receiver<RegistryEvent> {
314        self.event_tx.subscribe()
315    }
316    
317    /// Load services from store
318    async fn load_services(&self) -> Result<()> {
319        let services_data = self.store.get("services").await?;
320        
321        if let Some(data) = services_data {
322            let services: HashMap<String, Service> = serde_json::from_slice(&data)
323                .context("Failed to deserialize services from store")?;
324            
325            let mut services_lock = self.services.write().await;
326            *services_lock = services;
327            
328            info!("Loaded {} services from store", services_lock.len());
329        } else {
330            info!("No services found in store");
331        }
332        
333        Ok(())
334    }
335    
336    /// Save all services to store
337    async fn save_services(&self) -> Result<()> {
338        let services = self.services.read().await;
339        let data = serde_json::to_vec(&*services)
340            .context("Failed to serialize services for store")?;
341        
342        self.store.set("services", &data).await?;
343        debug!("Saved {} services to store", services.len());
344        
345        Ok(())
346    }
347    
348    /// Save a specific service to store
349    async fn save_service(&self, service_name: &str, service: &Service) -> Result<()> {
350        let data = serde_json::to_vec(service)
351            .context("Failed to serialize service for store")?;
352        
353        self.store.set(&format!("service:{}", service_name), &data).await?;
354        debug!("Saved service {} to store", service_name);
355        
356        Ok(())
357    }
358    
359    /// Start the health checker
360    fn start_health_checker(&self) {
361        let registry = self.clone();
362        let interval_secs = self.settings.health_check_interval_secs;
363        
364        tokio::spawn(async move {
365            let mut interval = interval(Duration::from_secs(interval_secs));
366            
367            loop {
368                interval.tick().await;
369                
370                // Check if registry is still active
371                let active = *registry.active.read().await;
372                if !active {
373                    break;
374                }
375                
376                // Perform health checks
377                if let Err(e) = registry.check_service_health().await {
378                    error!("Error performing health checks: {}", e);
379                }
380            }
381        });
382    }
383    
384    /// Start the TTL checker
385    fn start_ttl_checker(&self) {
386        let registry = self.clone();
387        let check_interval = Duration::from_secs(self.settings.service_ttl_secs / 2);
388        
389        tokio::spawn(async move {
390            let mut interval = interval(check_interval);
391            
392            loop {
393                interval.tick().await;
394                
395                // Check if registry is still active
396                let active = *registry.active.read().await;
397                if !active {
398                    break;
399                }
400                
401                // Check for expired instances
402                if let Err(e) = registry.check_expired_instances().await {
403                    error!("Error checking expired instances: {}", e);
404                }
405            }
406        });
407    }
408    
409    /// Perform health checks on all instances
410    async fn check_service_health(&self) -> Result<()> {
411        let services = self.get_services().await;
412        
413        for (service_name, service) in services {
414            for instance in service.instances {
415                let service_name = service_name.clone();
416                let instance_id = instance.id.clone();
417                let registry = self.clone();
418                
419                tokio::spawn(async move {
420                    // Check instance health asynchronously
421                    match registry.check_instance_health(&instance).await {
422                        Ok(health) => {
423                            // Update health status if needed
424                            if instance.health != health {
425                                if let Err(e) = registry.update_instance_health(&service_name, &instance_id, health).await {
426                                    error!("Failed to update health for {}/{}: {}", service_name, instance_id, e);
427                                }
428                            }
429                        }
430                        Err(e) => {
431                            error!("Health check failed for {}/{}: {}", service_name, instance_id, e);
432                            
433                            // Mark as unhealthy on error
434                            if instance.health == ServiceHealth::Healthy {
435                                if let Err(e) = registry.update_instance_health(&service_name, &instance_id, ServiceHealth::Unhealthy).await {
436                                    error!("Failed to mark unhealthy for {}/{}: {}", service_name, instance_id, e);
437                                }
438                            }
439                        }
440                    }
441                });
442            }
443        }
444        
445        Ok(())
446    }
447    
448    /// Check an individual instance's health
449    async fn check_instance_health(&self, instance: &ServiceInstance) -> Result<ServiceHealth> {
450        // Skip instances in certain states
451        match instance.health {
452            ServiceHealth::Deregistering | ServiceHealth::Maintenance => {
453                return Ok(instance.health);
454            }
455            _ => {}
456        }
457        
458        // Different health check based on protocol
459        match instance.protocol.as_str() {
460            "http" | "https" => {
461                self.check_http_health(instance).await
462            }
463            "tcp" => {
464                self.check_tcp_health(instance).await
465            }
466            _ => {
467                // For other protocols, just trust the current health status
468                Ok(instance.health)
469            }
470        }
471    }
472    
473    /// Check HTTP health for an instance
474    async fn check_http_health(&self, instance: &ServiceInstance) -> Result<ServiceHealth> {
475        // Get health check path
476        let health_path = instance.health_check_path.as_deref()
477            .unwrap_or(&self.settings.default_health_check_path);
478        
479        // Build health check URL
480        let url = format!("{}://{}:{}{}", 
481            instance.protocol, 
482            instance.host, 
483            instance.port, 
484            health_path
485        );
486        
487        // Create HTTP client with timeout
488        let client = reqwest::Client::builder()
489            .timeout(Duration::from_millis(self.settings.health_check_timeout_ms))
490            .build()?;
491        
492        // Perform the health check
493        let response = client.get(&url).send().await?;
494        
495        // Check status code
496        if response.status().is_success() {
497            Ok(ServiceHealth::Healthy)
498        } else {
499            Ok(ServiceHealth::Unhealthy)
500        }
501    }
502    
503    /// Check TCP health for an instance
504    async fn check_tcp_health(&self, instance: &ServiceInstance) -> Result<ServiceHealth> {
505        // Try to connect to the instance
506        let addr = format!("{}:{}", instance.host, instance.port);
507        let timeout = Duration::from_millis(self.settings.health_check_timeout_ms);
508        
509        match tokio::time::timeout(timeout, tokio::net::TcpStream::connect(&addr)).await {
510            Ok(Ok(_)) => Ok(ServiceHealth::Healthy),
511            _ => Ok(ServiceHealth::Unhealthy),
512        }
513    }
514    
515    /// Check for expired TTLs
516    async fn check_expired_instances(&self) -> Result<()> {
517        let services = self.get_services().await;
518        let mut to_deregister = Vec::new();
519        
520        // Collect instances to deregister
521        for (service_name, service) in services {
522            for instance in service.instances {
523                if instance.is_expired() {
524                    to_deregister.push((service_name.clone(), instance.id.clone()));
525                }
526            }
527        }
528        
529        // Deregister expired instances
530        for (service_name, instance_id) in to_deregister {
531            warn!("Deregistering expired instance {} for service {}", instance_id, service_name);
532            
533            if let Err(e) = self.deregister_service(&service_name, &instance_id).await {
534                error!("Failed to deregister expired instance {}/{}: {}", service_name, instance_id, e);
535            }
536        }
537        
538        Ok(())
539    }
540}
541
542impl Clone for ServiceRegistry {
543    fn clone(&self) -> Self {
544        Self {
545            store: self.store.clone(),
546            services: RwLock::new(HashMap::new()), // Empty services map in clone
547            settings: self.settings.clone(),
548            event_tx: self.event_tx.clone(),
549            node_id: self.node_id.clone(),
550            active: RwLock::new(false), // Clones are not active by default
551        }
552    }
553}