1use crate::discovery::ServiceRegistry;
3use crate::service::ServiceHealth;
4use crate::router::Router;
5use anyhow::{Result, Context};
6use serde::{Deserialize, Serialize};
7use std::collections::HashMap;
8use std::net::SocketAddr;
9use std::sync::Arc;
10use tokio::sync::mpsc;
11use warp::{Filter, Reply, Rejection};
12use warp::filters::BoxedFilter;
13use tracing::info;
14
15#[derive(Clone)]
17pub struct ApiServer {
18 bind_addr: String,
20
21 service_registry: Arc<ServiceRegistry>,
23
24 router: Arc<tokio::sync::Mutex<Router>>,
26
27 shutdown_tx: Option<mpsc::Sender<()>>,
32}
33
34impl ApiServer {
35 pub fn new(
37 bind_addr: &str,
38 service_registry: Arc<ServiceRegistry>,
39 router: Arc<tokio::sync::Mutex<Router>>,
40 ) -> Self {
42 Self {
43 bind_addr: bind_addr.to_string(),
44 service_registry,
45 router,
46 shutdown_tx: None,
48 }
49 }
50
51 pub async fn start(&mut self) -> Result<()> {
53 info!("Starting API server on {}", self.bind_addr);
54
55 let api_routes = self.setup_routes();
57
58 let (tx, mut rx) = mpsc::channel::<()>(1);
60 self.shutdown_tx = Some(tx);
61
62 let socket_addr: SocketAddr = self.bind_addr.parse()
64 .context("Invalid bind address")?;
65
66 let (_, server) = warp::serve(api_routes)
68 .bind_with_graceful_shutdown(socket_addr, async move {
69 let _ = rx.recv().await;
70 info!("API server shutdown signal received");
71 });
72
73 tokio::spawn(server);
75
76 info!("API server started");
77
78 Ok(())
79 }
80
81 pub async fn shutdown(&self) -> Result<()> {
83 info!("Shutting down API server");
84
85 if let Some(tx) = &self.shutdown_tx {
87 let _ = tx.send(()).await;
88 }
89
90 Ok(())
91 }
92
93 fn setup_routes(&self) -> BoxedFilter<(impl Reply,)> {
95 let service_registry = self.service_registry.clone();
97 let router = Arc::clone(&self.router);
98 let health = warp::path!("health")
102 .and(warp::get())
103 .map(|| {
104 warp::reply::json(&HealthResponse {
105 status: "ok".to_string(),
106 message: "Service is healthy".to_string(),
107 })
108 });
109
110 let service_routes = self.setup_service_routes(service_registry.clone());
112
113 let router_routes = self.setup_router_routes(&router);
115
116 let metrics = warp::path!("metrics")
118 .and(warp::get())
119 .map(|| {
120 warp::reply::with_status(
122 "# Metrics would be here",
123 warp::http::StatusCode::OK,
124 )
125 });
126
127 health
129 .or(service_routes)
130 .or(router_routes)
131 .or(metrics)
134 .recover(handle_rejection)
135 .with(warp::log("api"))
136 .boxed()
137 }
138
139 fn setup_service_routes(&self, registry: Arc<ServiceRegistry>) -> BoxedFilter<(impl Reply,)> {
141 let list_services = warp::path!("services")
143 .and(warp::get())
144 .and(with_registry(registry.clone()))
145 .and_then(handle_list_services);
146
147 let get_service = warp::path!("services" / String)
149 .and(warp::get())
150 .and(with_registry(registry.clone()))
151 .and_then(handle_get_service);
152
153 let register_service = warp::path!("services")
155 .and(warp::post())
156 .and(warp::body::json())
157 .and(with_registry(registry.clone()))
158 .and_then(handle_register_service);
159
160 let deregister_service = warp::path!("services" / String / String)
162 .and(warp::delete())
163 .and(with_registry(registry.clone()))
164 .and_then(handle_deregister_service);
165
166 let update_health = warp::path!("services" / String / String / "health")
168 .and(warp::put())
169 .and(warp::body::json())
170 .and(with_registry(registry.clone()))
171 .and_then(handle_update_health);
172
173 let heartbeat = warp::path!("services" / String / String / "heartbeat")
175 .and(warp::post())
176 .and(with_registry(registry.clone()))
177 .and_then(handle_heartbeat);
178
179 warp::path("v1").and(
181 list_services
182 .or(get_service)
183 .or(register_service)
184 .or(deregister_service)
185 .or(update_health)
186 .or(heartbeat)
187 ).boxed()
188 }
189
190 fn setup_router_routes(&self, router: &Arc<tokio::sync::Mutex<Router>>) -> BoxedFilter<(impl Reply,)> {
192 let service_registry = self.service_registry.clone();
194
195 let get_routes = warp::path!("routes")
197 .and(warp::get())
198 .and(with_router(router.clone()))
199 .and_then(handle_get_routes)
200 .boxed();
201
202 let get_route = warp::path!("routes" / String)
204 .and(warp::get())
205 .and(with_router(router.clone()))
206 .and_then(handle_get_route)
207 .boxed();
208
209 let upsert_route = warp::path!("routes" / String)
211 .and(warp::put())
212 .and(warp::body::json())
213 .and(with_router(router.clone()))
214 .and(with_registry(service_registry.clone()))
215 .and_then(handle_upsert_route);
216
217 let delete_route = warp::path!("routes" / String)
219 .and(warp::delete())
220 .and(with_router(router.clone()))
221 .and_then(handle_delete_route);
222
223 let update_tcp = warp::path!("router" / "tcp")
225 .and(warp::put())
226 .and(warp::body::json())
227 .and(with_router(router.clone()))
228 .and_then(handle_update_tcp);
229
230 let update_settings = warp::path!("router" / "settings")
232 .and(warp::put())
233 .and(warp::body::json())
234 .and(with_router(router.clone()))
235 .and_then(handle_update_settings);
236
237 warp::path("v1").and(
239 get_routes
240 .or(get_route)
241 .or(upsert_route)
242 .or(delete_route)
243 .or(update_tcp)
244 .or(update_settings)
245 ).boxed()
246 }
247}
248
249fn with_registry(registry: Arc<ServiceRegistry>) -> impl Filter<Extract = (Arc<ServiceRegistry>,), Error = std::convert::Infallible> + Clone {
251 warp::any().map(move || registry.clone())
252}
253
254fn with_router(router: Arc<tokio::sync::Mutex<Router>>) -> impl Filter<Extract = (Arc<tokio::sync::Mutex<Router>>,), Error = std::convert::Infallible> + Clone {
256 warp::any().map(move || router.clone())
257}
258
259#[derive(Debug, Deserialize)]
261struct RegisterServiceRequest {
262 name: String,
264
265 host: String,
267
268 port: u16,
270
271 protocol: String,
273
274 health_check_path: Option<String>,
276
277 tags: Option<Vec<String>>,
279
280 metadata: Option<HashMap<String, String>>,
282
283 ttl: Option<u64>,
285}
286
287#[derive(Debug, Deserialize)]
289struct HealthUpdateRequest {
290 health: String,
292}
293
294#[derive(Debug, Deserialize)]
296struct TcpConfigRequest {
297 enabled: Option<bool>,
299
300 listen_addr: Option<String>,
302
303 connection_pooling: Option<bool>,
305
306 max_idle_time_secs: Option<u64>,
308
309 udp_enabled: Option<bool>,
311
312 udp_listen_addr: Option<String>,
314}
315
316#[derive(Debug, Deserialize)]
318struct RouterSettingsRequest {
319 listen_addr: Option<String>,
321
322 global_timeout_ms: Option<u64>,
324
325 max_connections: Option<usize>,
327}
328
329#[derive(Debug, Deserialize)]
331struct AddNodeRequest {
332node_id: u64,
334
335address: String,
337}
338
339#[derive(Debug, Serialize)]
341struct ApiResponse<T> {
342success: bool,
344
345message: String,
347
348#[serde(skip_serializing_if = "Option::is_none")]
350data: Option<T>,
351}
352
353#[derive(Debug, Serialize)]
355struct HealthResponse {
356status: String,
358
359message: String,
361}
362
363fn success<T>(message: &str, data: Option<T>) -> ApiResponse<T> {
365ApiResponse {
366 success: true,
367 message: message.to_string(),
368 data,
369}
370}
371
372fn error<T>(message: &str) -> ApiResponse<T> {
374ApiResponse {
375 success: false,
376 message: message.to_string(),
377 data: None,
378}
379}
380
381async fn handle_rejection(err: Rejection) -> Result<impl Reply, Rejection> {
383let message = if err.is_not_found() {
384 "Not found".to_string()
385} else if let Some(e) = err.find::<warp::filters::body::BodyDeserializeError>() {
386 format!("Invalid request data: {}", e)
387} else {
388 "Internal server error".to_string()
389};
390
391let json = warp::reply::json(&error::<()>(&message));
392let status = if err.is_not_found() {
393 warp::http::StatusCode::NOT_FOUND
394} else if err.find::<warp::filters::body::BodyDeserializeError>().is_some() {
395 warp::http::StatusCode::BAD_REQUEST
396} else {
397 warp::http::StatusCode::INTERNAL_SERVER_ERROR
398};
399
400Ok(warp::reply::with_status(json, status))
401}
402
403async fn handle_list_services(
405registry: Arc<ServiceRegistry>,
406) -> Result<impl Reply, Rejection> {
407let services = registry.get_services().await;
408Ok(warp::reply::with_status(
409 warp::reply::json(&success("Services retrieved", Some(services))),
410 warp::http::StatusCode::OK
411))
412}
413
414async fn handle_get_service(
416name: String,
417registry: Arc<ServiceRegistry>,
418) -> Result<impl Reply, Rejection> {
419match registry.get_service(&name).await {
420 Some(service) => Ok(warp::reply::with_status(
421 warp::reply::json(&success("Service retrieved", Some(service))),
422 warp::http::StatusCode::OK
423 )),
424 None => Ok(warp::reply::with_status(
425 warp::reply::json(&error::<()>(&format!("Service not found: {}", name))),
426 warp::http::StatusCode::NOT_FOUND
427 ))
428}
429}
430
431async fn handle_register_service(
433request: RegisterServiceRequest,
434registry: Arc<ServiceRegistry>,
435) -> Result<impl Reply, Rejection> {
436let tags = request.tags.unwrap_or_default();
438let metadata = request.metadata.unwrap_or_default();
439
440match registry.register_service(
442 &request.name,
443 &request.host,
444 request.port,
445 &request.protocol,
446 tags,
447 request.health_check_path,
448 Some(metadata),
449).await {
450 Ok(instance_id) => {
451 let response_data = HashMap::from([
452 ("service_name", request.name),
453 ("instance_id", instance_id),
454 ]);
455
456 Ok(warp::reply::with_status(
457 warp::reply::json(&success("Service registered successfully", Some(response_data))),
458 warp::http::StatusCode::CREATED
459 ))
460 }
461 Err(e) => Ok(warp::reply::with_status(
462 warp::reply::json(&error::<()>(&format!("Failed to register service: {}", e))),
463 warp::http::StatusCode::INTERNAL_SERVER_ERROR
464 ))
465}
466}
467
468async fn handle_deregister_service(
470service_name: String,
471instance_id: String,
472registry: Arc<ServiceRegistry>,
473) -> Result<impl Reply, Rejection> {
474match registry.deregister_service(&service_name, &instance_id).await {
475 Ok(_) => Ok(warp::reply::with_status(
476 warp::reply::json(&success::<()>("Service deregistered successfully", None)),
477 warp::http::StatusCode::OK
478 )),
479 Err(e) => Ok(warp::reply::with_status(
480 warp::reply::json(&error::<()>(&format!("Failed to deregister service: {}", e))),
481 warp::http::StatusCode::INTERNAL_SERVER_ERROR
482 ))
483}
484}
485
486async fn handle_update_health(
488service_name: String,
489instance_id: String,
490request: HealthUpdateRequest,
491registry: Arc<ServiceRegistry>,
492) -> Result<impl Reply, Rejection> {
493let health = match request.health.to_lowercase().as_str() {
495 "healthy" => ServiceHealth::Healthy,
496 "unhealthy" => ServiceHealth::Unhealthy,
497 "unknown" => ServiceHealth::Unknown,
498 "starting" => ServiceHealth::Starting,
499 "maintenance" => ServiceHealth::Maintenance,
500 "deregistering" => ServiceHealth::Deregistering,
501 _ => return Ok(warp::reply::with_status(
502 warp::reply::json(&error::<()>(&format!("Invalid health status: {}", request.health))),
503 warp::http::StatusCode::BAD_REQUEST
504 )),
505};
506
507match registry.update_instance_health(&service_name, &instance_id, health).await {
509 Ok(_) => Ok(warp::reply::with_status(
510 warp::reply::json(&success::<()>("Health updated successfully", None)),
511 warp::http::StatusCode::OK
512 )),
513 Err(e) => Ok(warp::reply::with_status(
514 warp::reply::json(&error::<()>(&format!("Failed to update health: {}", e))),
515 warp::http::StatusCode::INTERNAL_SERVER_ERROR
516 ))
517}
518}
519
520async fn handle_heartbeat(
522service_name: String,
523instance_id: String,
524registry: Arc<ServiceRegistry>,
525) -> Result<impl Reply, Rejection> {
526match registry.heartbeat(&service_name, &instance_id).await {
527 Ok(_) => Ok(warp::reply::with_status(
528 warp::reply::json(&success::<()>("Heartbeat recorded successfully", None)),
529 warp::http::StatusCode::OK
530 )),
531 Err(e) => Ok(warp::reply::with_status(
532 warp::reply::json(&error::<()>(&format!("Failed to record heartbeat: {}", e))),
533 warp::http::StatusCode::INTERNAL_SERVER_ERROR
534 ))
535}
536}
537
538async fn handle_get_routes(
540_router: Arc<tokio::sync::Mutex<Router>>,
541) -> Result<impl Reply, Rejection> {
542Ok(warp::reply::with_status(
544 warp::reply::json(&success("Routes retrieved", Some(HashMap::<String, String>::new()))),
545 warp::http::StatusCode::OK
546))
547}
548
549async fn handle_get_route(
551_name: String,
552_router: Arc<tokio::sync::Mutex<Router>>,
553) -> Result<impl Reply, Rejection> {
554Ok(warp::reply::with_status(
556 warp::reply::json(&success("Route retrieved", Some(HashMap::<String, String>::new()))),
557 warp::http::StatusCode::OK
558))
559}
560
561#[derive(Debug, Deserialize)]
562struct RouteConfigRequest {
563 upstream: String,
565
566 timeout_ms: Option<u64>,
568
569 retry_count: Option<u32>,
571
572 priority: Option<i32>,
574
575 preserve_host_header: Option<bool>,
577}
578
579async fn handle_upsert_route(
581 name: String,
582 request: RouteConfigRequest,
583 router: Arc<tokio::sync::Mutex<Router>>,
584 service_registry: Arc<ServiceRegistry>,
585) -> Result<impl Reply, Rejection> {
586 let upstream = if request.upstream.starts_with("service://") {
588 let service_name = request.upstream.replace("service://", "");
589
590 let instances = service_registry.get_healthy_instances(&service_name).await;
592
593 if instances.is_empty() {
594 return Ok(warp::reply::with_status(
595 warp::reply::json(&error::<()>(&format!("No healthy instances for service {}", service_name))),
596 warp::http::StatusCode::BAD_REQUEST
597 ));
598 }
599
600 let instance = instances.first().unwrap();
603 format!("{}://{}:{}", instance.protocol, instance.host, instance.port)
604 } else {
605 request.upstream
606 };
607
608 let mut route_config = harbr_router::RouteConfig::new(&upstream);
610
611 if let Some(timeout) = request.timeout_ms {
612 route_config = route_config.with_timeout(timeout);
613 }
614
615 if let Some(retry_count) = request.retry_count {
616 route_config = route_config.with_retry_count(retry_count);
617 }
618
619 if let Some(priority) = request.priority {
620 route_config = route_config.with_priority(priority);
621 }
622
623 if let Some(preserve) = request.preserve_host_header {
624 route_config = route_config.preserve_host_header(preserve);
625 }
626
627 let mut router_lock = router.lock().await;
629 router_lock.add_route(&name, route_config);
630
631 Ok(warp::reply::with_status(
632 warp::reply::json(&success::<()>("Route updated successfully", None)),
633 warp::http::StatusCode::OK
634 ))
635}
636
637async fn handle_delete_route(
639_name: String,
640_router: Arc<tokio::sync::Mutex<Router>>,
641) -> Result<impl Reply, Rejection> {
642Ok(warp::reply::with_status(
644 warp::reply::json(&success::<()>("Route deleted successfully", None)),
645 warp::http::StatusCode::OK
646))
647}
648
649async fn handle_update_tcp(
651_request: TcpConfigRequest,
652_router: Arc<tokio::sync::Mutex<Router>>,
653) -> Result<impl Reply, Rejection> {
654Ok(warp::reply::with_status(
656 warp::reply::json(&success::<()>("TCP configuration updated successfully", None)),
657 warp::http::StatusCode::OK
658))
659}
660
661async fn handle_update_settings(
663_request: RouterSettingsRequest,
664_router: Arc<tokio::sync::Mutex<Router>>,
665) -> Result<impl Reply, Rejection> {
666Ok(warp::reply::with_status(
668 warp::reply::json(&success::<()>("Router settings updated successfully", None)),
669 warp::http::StatusCode::OK
670))
671}