Lodestone/
api.rs

1// src/api.rs
2use crate::discovery::ServiceRegistry;
3use crate::service::ServiceHealth;
4use crate::router::Router;
5use anyhow::{Result, Context};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::net::SocketAddr;
9use std::sync::Arc;
10use tokio::sync::mpsc;
11use warp::{Filter, Reply, Rejection};
12use warp::filters::BoxedFilter;
13use tracing::info;
14
15/// API server for handling HTTP requests
16#[derive(Clone)]
17pub struct ApiServer {
18    /// Bind address
19    bind_addr: String,
20    
21    /// Service registry
22    service_registry: Arc<ServiceRegistry>,
23    
24    /// Router
25    router: Arc<tokio::sync::Mutex<Router>>,
26    
27    /// Raft manager (optional)
28//    raft_manager: Option<RaftManager>,
29    
30    /// Shutdown channel
31    shutdown_tx: Option<mpsc::Sender<()>>,
32}
33
34impl ApiServer {
35    /// Create a new API server
36    pub fn new(
37        bind_addr: &str,
38        service_registry: Arc<ServiceRegistry>,
39        router: Arc<tokio::sync::Mutex<Router>>,
40        // raft_manager: Option<RaftManager>,
41    ) -> Self {
42        Self {
43            bind_addr: bind_addr.to_string(),
44            service_registry,
45            router,
46            // raft_manager,
47            shutdown_tx: None,
48        }
49    }
50    
51    /// Start the API server
52    pub async fn start(&mut self) -> Result<()> {
53        info!("Starting API server on {}", self.bind_addr);
54        
55        // Set up routes
56        let api_routes = self.setup_routes();
57        
58        // Create shutdown channel
59        let (tx, mut rx) = mpsc::channel::<()>(1);
60        self.shutdown_tx = Some(tx);
61        
62        // Parse bind address
63        let socket_addr: SocketAddr = self.bind_addr.parse()
64            .context("Invalid bind address")?;
65        
66        // Start server
67        let (_, server) = warp::serve(api_routes)
68            .bind_with_graceful_shutdown(socket_addr, async move {
69                let _ = rx.recv().await;
70                info!("API server shutdown signal received");
71            });
72        
73        // Spawn server
74        tokio::spawn(server);
75        
76        info!("API server started");
77        
78        Ok(())
79    }
80    
81    /// Shutdown the API server
82    pub async fn shutdown(&self) -> Result<()> {
83        info!("Shutting down API server");
84        
85        // Send shutdown signal
86        if let Some(tx) = &self.shutdown_tx {
87            let _ = tx.send(()).await;
88        }
89        
90        Ok(())
91    }
92    
93    /// Set up all API routes
94    fn setup_routes(&self) -> BoxedFilter<(impl Reply,)> {
95        // Clone required services
96        let service_registry = self.service_registry.clone();
97        let router = Arc::clone(&self.router);
98        // let raft_manager = self.raft_manager.clone();
99        
100        // Health check endpoint
101        let health = warp::path!("health")
102            .and(warp::get())
103            .map(|| {
104                warp::reply::json(&HealthResponse {
105                    status: "ok".to_string(),
106                    message: "Service is healthy".to_string(),
107                })
108            });
109        
110        // Setup service registry routes
111        let service_routes = self.setup_service_routes(service_registry.clone());
112        
113        // Setup router routes
114        let router_routes = self.setup_router_routes(&router);
115
116        // Setup metrics endpoint
117        let metrics = warp::path!("metrics")
118            .and(warp::get())
119            .map(|| {
120                // This would integrate with a metrics system in a real implementation
121                warp::reply::with_status(
122                    "# Metrics would be here",
123                    warp::http::StatusCode::OK,
124                )
125            });
126        
127        // Combine all routes
128        health
129            .or(service_routes)
130            .or(router_routes)
131    //        .or(cluster_routes)
132    //        .or(raft_routes)
133            .or(metrics)
134            .recover(handle_rejection)
135            .with(warp::log("api"))
136            .boxed()
137    }
138    
139    /// Setup service registry routes
140    fn setup_service_routes(&self, registry: Arc<ServiceRegistry>) -> BoxedFilter<(impl Reply,)> {
141        // List all services
142        let list_services = warp::path!("services")
143            .and(warp::get())
144            .and(with_registry(registry.clone()))
145            .and_then(handle_list_services);
146        
147        // Get service details
148        let get_service = warp::path!("services" / String)
149            .and(warp::get())
150            .and(with_registry(registry.clone()))
151            .and_then(handle_get_service);
152        
153        // Register a service
154        let register_service = warp::path!("services")
155            .and(warp::post())
156            .and(warp::body::json())
157            .and(with_registry(registry.clone()))
158            .and_then(handle_register_service);
159        
160        // Deregister a service
161        let deregister_service = warp::path!("services" / String / String)
162            .and(warp::delete())
163            .and(with_registry(registry.clone()))
164            .and_then(handle_deregister_service);
165        
166        // Update service health
167        let update_health = warp::path!("services" / String / String / "health")
168            .and(warp::put())
169            .and(warp::body::json())
170            .and(with_registry(registry.clone()))
171            .and_then(handle_update_health);
172        
173        // Record service heartbeat
174        let heartbeat = warp::path!("services" / String / String / "heartbeat")
175            .and(warp::post())
176            .and(with_registry(registry.clone()))
177            .and_then(handle_heartbeat);
178        
179        // Combine service routes
180        warp::path("v1").and(
181            list_services
182                .or(get_service)
183                .or(register_service)
184                .or(deregister_service)
185                .or(update_health)
186                .or(heartbeat)
187        ).boxed()
188    }
189    
190    /// Setup router routes
191    fn setup_router_routes(&self, router: &Arc<tokio::sync::Mutex<Router>>) -> BoxedFilter<(impl Reply,)> {
192        // Clone service registry
193        let service_registry = self.service_registry.clone();
194
195        // Get all routes
196        let get_routes = warp::path!("routes")
197            .and(warp::get())
198            .and(with_router(router.clone()))
199            .and_then(handle_get_routes)
200            .boxed();
201        
202        // Get a specific route
203        let get_route = warp::path!("routes" / String)
204            .and(warp::get())
205            .and(with_router(router.clone()))
206            .and_then(handle_get_route)
207            .boxed();
208        
209        // Create or update a route
210        let upsert_route = warp::path!("routes" / String)
211            .and(warp::put())
212            .and(warp::body::json())
213            .and(with_router(router.clone()))
214            .and(with_registry(service_registry.clone()))
215            .and_then(handle_upsert_route);
216        
217        // Delete a route
218        let delete_route = warp::path!("routes" / String)
219            .and(warp::delete())
220            .and(with_router(router.clone()))
221            .and_then(handle_delete_route);
222        
223        // Update TCP config
224        let update_tcp = warp::path!("router" / "tcp")
225            .and(warp::put())
226            .and(warp::body::json())
227            .and(with_router(router.clone()))
228            .and_then(handle_update_tcp);
229        
230        // Update global router settings
231        let update_settings = warp::path!("router" / "settings")
232            .and(warp::put())
233            .and(warp::body::json())
234            .and(with_router(router.clone()))
235            .and_then(handle_update_settings);
236        
237        // Combine router routes
238        warp::path("v1").and(
239            get_routes
240                .or(get_route)
241                .or(upsert_route)
242                .or(delete_route)
243                .or(update_tcp)
244                .or(update_settings)
245        ).boxed()
246    }
247}
248
249// Helper function to inject the service registry into route handlers
250fn with_registry(registry: Arc<ServiceRegistry>) -> impl Filter<Extract = (Arc<ServiceRegistry>,), Error = std::convert::Infallible> + Clone {
251    warp::any().map(move || registry.clone())
252}
253
254// Helper function to inject the router into route handlers
255fn with_router(router: Arc<tokio::sync::Mutex<Router>>) -> impl Filter<Extract = (Arc<tokio::sync::Mutex<Router>>,), Error = std::convert::Infallible> + Clone {
256    warp::any().map(move || router.clone())
257}
258
259/// Service registration request
260#[derive(Debug, Deserialize)]
261struct RegisterServiceRequest {
262    /// Service name
263    name: String,
264    
265    /// Host address
266    host: String,
267    
268    /// Port number
269    port: u16,
270    
271    /// Protocol (http, https, tcp, udp)
272    protocol: String,
273    
274    /// Health check path (for HTTP/HTTPS)
275    health_check_path: Option<String>,
276    
277    /// Tags for categorization
278    tags: Option<Vec<String>>,
279    
280    /// Instance-specific metadata
281    metadata: Option<HashMap<String, String>>,
282    
283    /// Time to live in seconds before deregistration
284    ttl: Option<u64>,
285}
286
287/// Service health update request
288#[derive(Debug, Deserialize)]
289struct HealthUpdateRequest {
290    /// New health status
291    health: String,
292}
293
294/// TCP configuration request
295#[derive(Debug, Deserialize)]
296struct TcpConfigRequest {
297    /// Enable TCP proxy
298    enabled: Option<bool>,
299    
300    /// TCP listen address
301    listen_addr: Option<String>,
302    
303    /// Enable connection pooling
304    connection_pooling: Option<bool>,
305    
306    /// Maximum idle time in seconds
307    max_idle_time_secs: Option<u64>,
308    
309    /// Enable UDP proxy
310    udp_enabled: Option<bool>,
311    
312    /// UDP listen address
313    udp_listen_addr: Option<String>,
314}
315
316/// Global router settings request
317#[derive(Debug, Deserialize)]
318struct RouterSettingsRequest {
319    /// HTTP listen address
320    listen_addr: Option<String>,
321    
322    /// Global timeout in milliseconds
323    global_timeout_ms: Option<u64>,
324    
325    /// Maximum number of connections
326    max_connections: Option<usize>,
327}
328
329/// Node addition request
330#[derive(Debug, Deserialize)]
331struct AddNodeRequest {
332/// Node ID
333node_id: u64,
334    
335/// Node address
336address: String,
337}
338
339/// Standard API response
340#[derive(Debug, Serialize)]
341struct ApiResponse<T> {
342/// Success flag
343success: bool,
344
345/// Response message
346message: String,
347
348/// Response data
349#[serde(skip_serializing_if = "Option::is_none")]
350data: Option<T>,
351}
352
353/// Health check response
354#[derive(Debug, Serialize)]
355struct HealthResponse {
356/// Health status
357status: String,
358
359/// Health message
360message: String,
361}
362
363/// Create a success response
364fn success<T>(message: &str, data: Option<T>) -> ApiResponse<T> {
365ApiResponse {
366    success: true,
367    message: message.to_string(),
368    data,
369}
370}
371
372/// Create an error response
373fn error<T>(message: &str) -> ApiResponse<T> {
374ApiResponse {
375    success: false,
376    message: message.to_string(),
377    data: None,
378}
379}
380
381/// Handle errors
382async fn handle_rejection(err: Rejection) -> Result<impl Reply, Rejection> {
383let message = if err.is_not_found() {
384    "Not found".to_string()
385} else if let Some(e) = err.find::<warp::filters::body::BodyDeserializeError>() {
386    format!("Invalid request data: {}", e)
387} else {
388    "Internal server error".to_string()
389};
390
391let json = warp::reply::json(&error::<()>(&message));
392let status = if err.is_not_found() {
393    warp::http::StatusCode::NOT_FOUND
394} else if err.find::<warp::filters::body::BodyDeserializeError>().is_some() {
395    warp::http::StatusCode::BAD_REQUEST
396} else {
397    warp::http::StatusCode::INTERNAL_SERVER_ERROR
398};
399
400Ok(warp::reply::with_status(json, status))
401}
402
403/// Handle listing all services
404async fn handle_list_services(
405registry: Arc<ServiceRegistry>,
406) -> Result<impl Reply, Rejection> {
407let services = registry.get_services().await;
408Ok(warp::reply::with_status(
409    warp::reply::json(&success("Services retrieved", Some(services))),
410    warp::http::StatusCode::OK
411))
412}
413
414/// Handle getting a specific service
415async fn handle_get_service(
416name: String,
417registry: Arc<ServiceRegistry>,
418) -> Result<impl Reply, Rejection> {
419match registry.get_service(&name).await {
420    Some(service) => Ok(warp::reply::with_status(
421        warp::reply::json(&success("Service retrieved", Some(service))),
422        warp::http::StatusCode::OK
423    )),
424    None => Ok(warp::reply::with_status(
425        warp::reply::json(&error::<()>(&format!("Service not found: {}", name))),
426        warp::http::StatusCode::NOT_FOUND
427    ))
428}
429}
430
431/// Handle registering a service
432async fn handle_register_service(
433request: RegisterServiceRequest,
434registry: Arc<ServiceRegistry>,
435) -> Result<impl Reply, Rejection> {
436// Convert tags and metadata
437let tags = request.tags.unwrap_or_default();
438let metadata = request.metadata.unwrap_or_default();
439
440// Register service
441match registry.register_service(
442    &request.name,
443    &request.host,
444    request.port,
445    &request.protocol,
446    tags,
447    request.health_check_path,
448    Some(metadata),
449).await {
450    Ok(instance_id) => {
451        let response_data = HashMap::from([
452            ("service_name", request.name),
453            ("instance_id", instance_id),
454        ]);
455        
456        Ok(warp::reply::with_status(
457            warp::reply::json(&success("Service registered successfully", Some(response_data))),
458            warp::http::StatusCode::CREATED
459        ))
460    }
461    Err(e) => Ok(warp::reply::with_status(
462        warp::reply::json(&error::<()>(&format!("Failed to register service: {}", e))),
463        warp::http::StatusCode::INTERNAL_SERVER_ERROR
464    ))
465}
466}
467
468/// Handle deregistering a service
469async fn handle_deregister_service(
470service_name: String,
471instance_id: String,
472registry: Arc<ServiceRegistry>,
473) -> Result<impl Reply, Rejection> {
474match registry.deregister_service(&service_name, &instance_id).await {
475    Ok(_) => Ok(warp::reply::with_status(
476        warp::reply::json(&success::<()>("Service deregistered successfully", None)),
477        warp::http::StatusCode::OK
478    )),
479    Err(e) => Ok(warp::reply::with_status(
480        warp::reply::json(&error::<()>(&format!("Failed to deregister service: {}", e))),
481        warp::http::StatusCode::INTERNAL_SERVER_ERROR
482    ))
483}
484}
485
486/// Handle updating service health
487async fn handle_update_health(
488service_name: String,
489instance_id: String,
490request: HealthUpdateRequest,
491registry: Arc<ServiceRegistry>,
492) -> Result<impl Reply, Rejection> {
493// Convert health string to enum
494let health = match request.health.to_lowercase().as_str() {
495    "healthy" => ServiceHealth::Healthy,
496    "unhealthy" => ServiceHealth::Unhealthy,
497    "unknown" => ServiceHealth::Unknown,
498    "starting" => ServiceHealth::Starting,
499    "maintenance" => ServiceHealth::Maintenance,
500    "deregistering" => ServiceHealth::Deregistering,
501    _ => return Ok(warp::reply::with_status(
502        warp::reply::json(&error::<()>(&format!("Invalid health status: {}", request.health))),
503        warp::http::StatusCode::BAD_REQUEST
504    )),
505};
506
507// Update health
508match registry.update_instance_health(&service_name, &instance_id, health).await {
509    Ok(_) => Ok(warp::reply::with_status(
510        warp::reply::json(&success::<()>("Health updated successfully", None)),
511        warp::http::StatusCode::OK
512    )),
513    Err(e) => Ok(warp::reply::with_status(
514        warp::reply::json(&error::<()>(&format!("Failed to update health: {}", e))),
515        warp::http::StatusCode::INTERNAL_SERVER_ERROR
516    ))
517}
518}
519
520/// Handle service heartbeat
521async fn handle_heartbeat(
522service_name: String,
523instance_id: String,
524registry: Arc<ServiceRegistry>,
525) -> Result<impl Reply, Rejection> {
526match registry.heartbeat(&service_name, &instance_id).await {
527    Ok(_) => Ok(warp::reply::with_status(
528        warp::reply::json(&success::<()>("Heartbeat recorded successfully", None)),
529        warp::http::StatusCode::OK
530    )),
531    Err(e) => Ok(warp::reply::with_status(
532        warp::reply::json(&error::<()>(&format!("Failed to record heartbeat: {}", e))),
533        warp::http::StatusCode::INTERNAL_SERVER_ERROR
534    ))
535}
536}
537
538/// Handle getting all routes
539async fn handle_get_routes(
540_router: Arc<tokio::sync::Mutex<Router>>,
541) -> Result<impl Reply, Rejection> {
542// Placeholder implementation
543Ok(warp::reply::with_status(
544    warp::reply::json(&success("Routes retrieved", Some(HashMap::<String, String>::new()))),
545    warp::http::StatusCode::OK
546))
547}
548
549/// Handle getting a specific route
550async fn handle_get_route(
551_name: String,
552_router: Arc<tokio::sync::Mutex<Router>>,
553) -> Result<impl Reply, Rejection> {
554// Placeholder implementation
555Ok(warp::reply::with_status(
556    warp::reply::json(&success("Route retrieved", Some(HashMap::<String, String>::new()))),
557    warp::http::StatusCode::OK
558))
559}
560
561#[derive(Debug, Deserialize)]
562struct RouteConfigRequest {
563    /// Upstream URL (can now include service:// prefix)
564    upstream: String,
565    
566    /// Timeout in milliseconds
567    timeout_ms: Option<u64>,
568    
569    /// Number of retry attempts
570    retry_count: Option<u32>,
571    
572    /// Route priority
573    priority: Option<i32>,
574    
575    /// Preserve host header
576    preserve_host_header: Option<bool>,
577}
578
579/// Handle upserting a route
580async fn handle_upsert_route(
581    name: String,
582    request: RouteConfigRequest,
583    router: Arc<tokio::sync::Mutex<Router>>,
584    service_registry: Arc<ServiceRegistry>,
585) -> Result<impl Reply, Rejection> {
586    // Check if this is a service-based route
587    let upstream = if request.upstream.starts_with("service://") {
588        let service_name = request.upstream.replace("service://", "");
589        
590        // Find healthy instances of the service
591        let instances = service_registry.get_healthy_instances(&service_name).await;
592        
593        if instances.is_empty() {
594            return Ok(warp::reply::with_status(
595                warp::reply::json(&error::<()>(&format!("No healthy instances for service {}", service_name))),
596                warp::http::StatusCode::BAD_REQUEST
597            ));
598        }
599        
600        // For now, just use the first healthy instance
601        // In a real implementation, you'd want more sophisticated load balancing
602        let instance = instances.first().unwrap();
603        format!("{}://{}:{}", instance.protocol, instance.host, instance.port)
604    } else {
605        request.upstream
606    };
607
608    // Convert request to route configuration
609    let mut route_config = harbr_router::RouteConfig::new(&upstream);
610
611    if let Some(timeout) = request.timeout_ms {
612        route_config = route_config.with_timeout(timeout);
613    }
614
615    if let Some(retry_count) = request.retry_count {
616        route_config = route_config.with_retry_count(retry_count);
617    }
618
619    if let Some(priority) = request.priority {
620        route_config = route_config.with_priority(priority);
621    }
622
623    if let Some(preserve) = request.preserve_host_header {
624        route_config = route_config.preserve_host_header(preserve);
625    }
626
627    // Add route to router
628    let mut router_lock = router.lock().await;
629    router_lock.add_route(&name, route_config);
630
631    Ok(warp::reply::with_status(
632        warp::reply::json(&success::<()>("Route updated successfully", None)),
633        warp::http::StatusCode::OK
634    ))
635}
636
637/// Handle deleting a route
638async fn handle_delete_route(
639_name: String,
640_router: Arc<tokio::sync::Mutex<Router>>,
641) -> Result<impl Reply, Rejection> {
642// Placeholder implementation
643Ok(warp::reply::with_status(
644    warp::reply::json(&success::<()>("Route deleted successfully", None)),
645    warp::http::StatusCode::OK
646))
647}
648
649/// Handle updating TCP configuration
650async fn handle_update_tcp(
651_request: TcpConfigRequest,
652_router: Arc<tokio::sync::Mutex<Router>>,
653) -> Result<impl Reply, Rejection> {
654// Placeholder implementation
655Ok(warp::reply::with_status(
656    warp::reply::json(&success::<()>("TCP configuration updated successfully", None)),
657    warp::http::StatusCode::OK
658))
659}
660
661/// Handle updating global router settings
662async fn handle_update_settings(
663_request: RouterSettingsRequest,
664_router: Arc<tokio::sync::Mutex<Router>>,
665) -> Result<impl Reply, Rejection> {
666// Placeholder implementation
667Ok(warp::reply::with_status(
668    warp::reply::json(&success::<()>("Router settings updated successfully", None)),
669    warp::http::StatusCode::OK
670))
671}