omni_orchestrator/cluster.rs

use anyhow::{anyhow, Result};
use colored::Colorize;
use log::debug;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;

use crate::config::ServerConfig;
use crate::state::SharedState;

/// Represents a node in the OmniOrchestrator cluster.
///
/// This structure contains all the necessary information to identify and
/// communicate with a specific node in the distributed system. Each node
/// is uniquely identified by its ID and contains network location information.
///
/// # Fields
///
/// * `id` - Unique identifier for the node, typically in the format "address:port"
/// * `port` - The port number that the node is listening on
/// * `address` - The network address of the node for communication
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeInfo {
    /// Unique identifier for the node in the cluster
    pub id: Arc<str>,
    /// Port number the node is listening on
    pub port: u16,
    /// Network address of the node
    pub address: Arc<str>,
}

/// Central manager for cluster operations and node tracking.
///
/// The ClusterManager is responsible for maintaining the state of the entire
/// cluster, including tracking which nodes are currently active, managing node
/// registration and removal, and providing information about the cluster's
/// current composition.
///
/// It uses thread-safe data structures to allow concurrent access from multiple
/// parts of the application, which is particularly important in a distributed
/// system where node changes can happen at any time.
///
/// # Fields
///
/// * `state` - Shared state that includes information about the current node and cluster
/// * `nodes` - Thread-safe map of all known nodes in the cluster, indexed by their address
#[derive(Debug)]
pub struct ClusterManager {
    /// Shared state containing information about the current node and overall cluster
    pub state: Arc<RwLock<SharedState>>,
    /// Thread-safe map of all nodes in the cluster, keyed by node address
    pub nodes: Arc<RwLock<HashMap<Arc<str>, NodeInfo>>>,
}

impl ClusterManager {
    /// Creates a new instance of the ClusterManager.
    ///
    /// Initializes a new cluster manager with the provided shared state and
    /// an empty nodes map. This is typically called during application startup
    /// to establish the cluster management subsystem.
    ///
    /// # Arguments
    ///
    /// * `state` - Shared state containing information about the current node
    ///
    /// # Returns
    ///
    /// A new ClusterManager instance ready to track and manage cluster nodes
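    ///
    /// # Examples
    ///
    /// A minimal setup sketch (assumes a `SharedState` value named
    /// `shared_state` is produced elsewhere during startup):
    ///
    /// ```ignore
    /// use std::sync::Arc;
    /// use tokio::sync::RwLock;
    ///
    /// let manager = ClusterManager::new(Arc::new(RwLock::new(shared_state)));
    /// ```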
    pub fn new(state: Arc<RwLock<SharedState>>) -> Self {
        Self {
            state,
            nodes: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Registers a new node in the cluster.
    ///
    /// This method adds a node to the cluster's node registry if it doesn't already exist.
    /// After registration, it updates the shared state with the new cluster size.
    /// The method uses colorized output to make console debugging easier.
    ///
    /// # Arguments
    ///
    /// * `node` - Information about the node to register
    ///
    /// # Side Effects
    ///
    /// * Updates the internal nodes map if the node is new
    /// * Updates the cluster size in the shared state
    /// * Prints diagnostic information to the console
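    ///
    /// # Examples
    ///
    /// A registration sketch (assumes a `ClusterManager` handle named
    /// `manager`; the address is illustrative):
    ///
    /// ```ignore
    /// let node = NodeInfo {
    ///     id: "10.0.0.2:8000".into(),
    ///     address: "10.0.0.2:8000".into(),
    ///     port: 8000,
    /// };
    /// manager.register_node(node).await;
    /// ```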
    pub async fn register_node(&self, node: NodeInfo) {
        let node_uid = node.address.clone();
        println!(
            "{}{}",
            "register_node called with: ".white().on_red().bold(),
            node_uid.green()
        );

        // Check if the node already exists in our registry
        if self.nodes.read().await.contains_key(&node_uid) {
            println!("Node {} is already registered, skipping", node_uid);
            return;
        }

        // Add the node to our registry and get the new size
        let size = {
            let mut nodes = self.nodes.write().await;
            println!(
                "{}{}",
                "Adding node: ".white().on_red().bold().underline(),
                node_uid
            );
            nodes.insert(node_uid, node);
            let size = nodes.len();
            println!("Current node map: {:?}", nodes);
            size
        };

        // Update the cluster size in the shared state
        let mut state = self.state.write().await;
        state.cluster_size = size;
    }

    /// Removes a node from the cluster.
    ///
    /// This method removes a node from the cluster's node registry if it exists.
    /// After removal, it updates the shared state with the new cluster size.
    /// The method includes debug logging to track node removal operations.
    ///
    /// # Arguments
    ///
    /// * `node_uid` - Unique identifier of the node to remove, typically its address
    ///
    /// # Side Effects
    ///
    /// * Updates the internal nodes map by removing the specified node
    /// * Updates the cluster size in the shared state
    /// * Logs diagnostic information about the removal operation
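    ///
    /// # Examples
    ///
    /// A removal sketch (the peer address is illustrative):
    ///
    /// ```ignore
    /// manager.remove_node("10.0.0.2:8000".into()).await;
    /// ```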
    pub async fn remove_node(&self, node_uid: Arc<str>) {
        debug!(
            "{}{}",
            "remove_node called with: ".white().on_green().bold(),
            node_uid.green()
        );

        // First check if the node exists in our registry
        {
            let nodes_read = self.nodes.read().await;
            if !nodes_read.contains_key(&node_uid) {
                log::info!("Attempted to remove a node that does not exist");
                log::info!("Current nodes: {:?}", nodes_read);
                return;
            }
        }

        // Remove the node from our registry
        let mut nodes = self.nodes.write().await;
        log::info!("Removing node: {}", node_uid.white().on_green().bold());
        nodes.remove(&node_uid);

        // Update the cluster size in the shared state
        let mut state = self.state.write().await;
        state.cluster_size = nodes.len();
    }

    /// Retrieves a list of all known nodes in the cluster.
    ///
    /// This method provides a snapshot of all the nodes currently registered
    /// in the cluster manager. It's useful for operations that need to iterate
    /// over all nodes or display cluster status information.
    ///
    /// # Returns
    ///
    /// A vector containing information about all known nodes in the cluster
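    ///
    /// # Examples
    ///
    /// A usage sketch (assumes a `ClusterManager` handle named `manager`):
    ///
    /// ```ignore
    /// let peers = manager.get_nodes().await;
    /// println!("Cluster currently tracks {} peers", peers.len());
    /// ```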
    pub async fn get_nodes(&self) -> Vec<NodeInfo> {
        let nodes = self.nodes.read().await;
        nodes.values().cloned().collect()
    }

    /// Retrieves a list of all known nodes plus the current node.
    ///
    /// This method provides a complete view of the cluster including the current node.
    /// It's particularly useful for operations that need a complete picture of the cluster,
    /// such as leader election or quorum calculations.
    ///
    /// # Returns
    ///
    /// A vector containing information about all nodes in the cluster, including the current node
    ///
    /// # Side Effects
    ///
    /// * Prints diagnostic information about the current node and known nodes to the console
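    ///
    /// # Examples
    ///
    /// A quorum sketch built on the returned membership list:
    ///
    /// ```ignore
    /// let members = manager.get_nodes_and_self().await;
    /// let quorum = members.len() / 2 + 1;
    /// ```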
    pub async fn get_nodes_and_self(&self) -> Vec<NodeInfo> {
        let state = self.state.read().await;
        let nodes = self.nodes.read().await;

        // Collect all known nodes into a vector
        let mut all_nodes: Vec<NodeInfo> = nodes.values().cloned().collect();

        // Add the current node, parsed from its "address:port" identifier
        all_nodes.push(NodeInfo {
            id: state.node_id.to_string().into(),
            address: state.node_id.split(':').next().unwrap_or_default().into(),
            port: state
                .node_id
                .split(':')
                .nth(1)
                .unwrap_or_default()
                .parse()
                .unwrap_or(0),
        });

        // Print diagnostic information
        println!("Current node ID: {}", state.node_id);
        println!("Known nodes: {:?}", nodes);

        all_nodes
    }

    /// Checks if a specific node is currently active in the cluster.
    ///
    /// This method determines if a node is still considered active within the cluster
    /// by checking if it exists in the nodes registry. It's useful for operations that
    /// need to verify a node's presence before attempting to communicate with it.
    ///
    /// # Arguments
    ///
    /// * `node_uid` - Unique identifier of the node to check, typically its address
    ///
    /// # Returns
    ///
    /// `true` if the node is active in the cluster, `false` otherwise
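    ///
    /// # Examples
    ///
    /// A liveness-check sketch (the peer address is illustrative):
    ///
    /// ```ignore
    /// if manager.is_node_alive("10.0.0.2:8000".into()).await {
    ///     // The peer is still registered and safe to contact.
    /// }
    /// ```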
    pub async fn is_node_alive(&self, node_uid: Arc<str>) -> bool {
        let nodes = self.nodes.read().await;
        nodes.contains_key(&node_uid)
    }
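
    /// Discovers peers from the server configuration and attempts to connect
    /// to each one.
    ///
    /// Every configured instance except the current node (matched by port) is
    /// probed via its health endpoint. Peers that respond are registered in
    /// the cluster; peers that cannot be reached are removed from the registry.
    ///
    /// # Arguments
    ///
    /// * `config` - Server configuration listing all known instances
    /// * `my_port` - Port of the current node, used to skip self-connection
    ///
    /// # Examples
    ///
    /// A discovery sketch (assumes a loaded `ServerConfig` named `config`):
    ///
    /// ```ignore
    /// manager.discover_peers(&config, 8000).await?;
    /// ```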
    pub async fn discover_peers(&self, config: &ServerConfig, my_port: u16) -> Result<()> {
        let client = Client::new();
        log::info!("{}", "Starting peer discovery...".cyan());

        for instance in &config.instances {
            log::info!("{}", format!("Discovered: {:#?}", instance).blue().bold());
            if instance.port == my_port {
                log::debug!("Skipping self-connection at port {}", my_port);
                continue;
            }

            let node_address: Arc<str> = format!("{}:{}", instance.address, instance.port).into();

            match self.connect_to_peer(&client, &node_address).await {
                Ok(_) => log::info!(
                    "{}",
                    format!("Successfully connected to peer: {}", node_address).green()
                ),
                Err(e) => {
                    log::warn!(
                        "{}",
                        format!("Failed to connect to peer: {} ({})", node_address, e).yellow()
                    );
                    // Drop unreachable peers from the registry
                    self.remove_node(node_address).await;
                }
            }
        }

        log::info!("{}", "Peer discovery completed".cyan());
        Ok(())
    }
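
    /// Probes a peer's `/health` endpoint and registers the node on success.
    ///
    /// The peer address serves both as the base URL for the health request and
    /// as the node's identifier, so it is expected to be in a form the HTTP
    /// client can parse (including any scheme the configuration supplies).
    ///
    /// # Arguments
    ///
    /// * `client` - HTTP client used for the health check
    /// * `node_address` - Address of the peer, ending in ":port"
    ///
    /// # Returns
    ///
    /// `Ok(())` if the health check succeeded and the node was registered,
    /// or an error if the request failed or returned a non-success status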
    async fn connect_to_peer(&self, client: &Client, node_address: &str) -> Result<()> {
        let health_url = format!("{}/health", node_address);
        log::debug!("Checking health at: {}", health_url);

        let response = client
            .get(&health_url)
            .timeout(Duration::from_secs(5))
            .send()
            .await?;

        if response.status().is_success() {
            // Recover the port from the trailing ":port" segment of the address
            let port = node_address
                .split(':')
                .next_back()
                .unwrap_or("80")
                .parse::<u16>()
                .unwrap_or(80);

            let node_info = NodeInfo {
                id: node_address.into(),
                address: node_address.into(),
                port,
            };

            self.register_node(node_info).await;
            log::debug!("Node registered: {}", node_address);
            Ok(())
        } else {
            Err(anyhow!("Node health check failed"))
        }
    }
}