omni_orchestrator/cluster.rs
use anyhow::{anyhow, Result};
use colored::Colorize;
use log::debug;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;

use crate::config::ServerConfig;
use crate::state::SharedState;

/// Represents a node in the OmniOrchestrator cluster.
///
/// This structure contains all the necessary information to identify and
/// communicate with a specific node in the distributed system. Each node
/// is uniquely identified by its ID, and contains network location information.
///
/// # Fields
///
/// * `id` - Unique identifier for the node, typically in the format of "address:port"
/// * `port` - The port number that the node is listening on
/// * `address` - The network address of the node for communication
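///
/// # Examples
///
/// A minimal construction; the address and port are illustrative:
///
/// ```ignore
/// let node = NodeInfo {
///     id: "10.0.0.5:8080".into(),
///     address: "10.0.0.5".into(),
///     port: 8080,
/// };
/// ```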
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeInfo {
    /// Unique identifier for the node in the cluster
    pub id: Arc<str>,
    /// Port number the node is listening on
    pub port: u16,
    /// Network address of the node
    pub address: Arc<str>,
}

/// Central manager for cluster operations and node tracking.
///
/// The ClusterManager is responsible for maintaining the state of the entire
/// cluster, including tracking which nodes are currently active, managing node
/// registration and removal, and providing information about the cluster's
/// current composition.
///
/// It uses thread-safe data structures to allow concurrent access from multiple
/// parts of the application, which is particularly important in a distributed
/// system where node changes can happen at any time.
///
/// # Fields
///
/// * `state` - Shared state that includes information about the current node and cluster
/// * `nodes` - Thread-safe map of all known nodes in the cluster, indexed by their address
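///
/// # Examples
///
/// A sketch of the typical lifecycle, assuming `shared_state` and `node` are
/// constructed elsewhere:
///
/// ```ignore
/// let manager = ClusterManager::new(Arc::new(RwLock::new(shared_state)));
/// manager.register_node(node).await;
/// let peers = manager.get_nodes().await;
/// ```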
#[derive(Debug)]
pub struct ClusterManager {
    /// Shared state containing information about the current node and overall cluster
    pub state: Arc<RwLock<SharedState>>,
    /// Thread-safe map of all nodes in the cluster, keyed by node address
    pub nodes: Arc<RwLock<HashMap<Arc<str>, NodeInfo>>>,
}

impl ClusterManager {
    /// Creates a new instance of the ClusterManager.
    ///
    /// Initializes a new cluster manager with the provided shared state and
    /// an empty nodes map. This is typically called during application startup
    /// to establish the cluster management subsystem.
    ///
    /// # Arguments
    ///
    /// * `state` - Shared state containing information about the current node
    ///
    /// # Returns
    ///
    /// A new ClusterManager instance ready to track and manage cluster nodes
    pub fn new(state: Arc<RwLock<SharedState>>) -> Self {
        Self {
            state,
            nodes: Arc::new(RwLock::new(HashMap::new())),
        }
    }

    /// Registers a new node in the cluster.
    ///
    /// This method adds a node to the cluster's node registry if it doesn't already exist.
    /// After registration, it updates the shared state with the new cluster size.
    /// The method uses colorized debug logging to make tracing registrations easier.
    ///
    /// # Arguments
    ///
    /// * `node` - Information about the node to register
    ///
    /// # Side Effects
    ///
    /// * Updates the internal nodes map if the node is new
    /// * Updates the cluster size in the shared state
    /// * Logs diagnostic information about the registration
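    ///
    /// # Examples
    ///
    /// A sketch using an illustrative peer address:
    ///
    /// ```ignore
    /// manager.register_node(NodeInfo {
    ///     id: "10.0.0.5:8080".into(),
    ///     address: "10.0.0.5:8080".into(),
    ///     port: 8080,
    /// }).await;
    /// ```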
    pub async fn register_node(&self, node: NodeInfo) {
        let node_uid = node.address.clone();
        debug!(
            "{} {}",
            "Registering node:".white().on_red().bold(),
            node_uid.green()
        );

        // Add the node to our registry and get the new size. Checking for
        // duplicates under the write lock keeps the check and the insert
        // atomic with respect to concurrent registrations.
        let size = {
            let mut nodes = self.nodes.write().await;
            if nodes.contains_key(&node_uid) {
                debug!("Node {} is already registered", node_uid);
                return;
            }
            nodes.insert(node_uid, node);
            debug!("Current node map: {:?}", nodes);
            nodes.len()
        };

        // Update the cluster size in the shared state
        let mut state = self.state.write().await;
        state.cluster_size = size;
    }

    /// Removes a node from the cluster.
    ///
    /// This method removes a node from the cluster's node registry if it exists.
    /// After removal, it updates the shared state with the new cluster size.
    /// The method includes debug logging to track node removal operations.
    ///
    /// # Arguments
    ///
    /// * `node_uid` - Unique identifier of the node to remove, typically its address
    ///
    /// # Side Effects
    ///
    /// * Updates the internal nodes map by removing the specified node
    /// * Updates the cluster size in the shared state
    /// * Logs diagnostic information about the removal operation
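    ///
    /// # Examples
    ///
    /// A sketch using an illustrative node address:
    ///
    /// ```ignore
    /// manager.remove_node("10.0.0.5:8080".into()).await;
    /// ```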
    pub async fn remove_node(&self, node_uid: Arc<str>) {
        debug!(
            "{} {}",
            "Removing node:".white().on_green().bold(),
            node_uid.green()
        );

        // Take the write lock once so the existence check and the removal
        // stay atomic with respect to concurrent updates.
        let mut nodes = self.nodes.write().await;
        if nodes.remove(&node_uid).is_none() {
            log::info!("Attempted to remove a node that does not exist");
            log::info!("Current nodes: {:?}", nodes);
            return;
        }
        log::info!("Removed node: {}", node_uid.white().on_green().bold());

        // Update the cluster size in the shared state
        let mut state = self.state.write().await;
        state.cluster_size = nodes.len();
    }

    /// Retrieves a list of all known nodes in the cluster.
    ///
    /// This method provides a snapshot of all the nodes currently registered
    /// in the cluster manager. It's useful for operations that need to iterate
    /// over all nodes or display cluster status information.
    ///
    /// # Returns
    ///
    /// A vector containing information about all known nodes in the cluster
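    ///
    /// # Examples
    ///
    /// A sketch iterating over the registered peers:
    ///
    /// ```ignore
    /// for node in manager.get_nodes().await {
    ///     println!("{} is listening on port {}", node.address, node.port);
    /// }
    /// ```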
    pub async fn get_nodes(&self) -> Vec<NodeInfo> {
        let nodes = self.nodes.read().await;
        nodes.values().cloned().collect()
    }

    /// Retrieves a list of all known nodes plus the current node.
    ///
    /// This method provides a complete view of the cluster including the current node.
    /// It's particularly useful for operations that need a complete picture of the cluster,
    /// such as leader election or quorum calculations.
    ///
    /// # Returns
    ///
    /// A vector containing information about all nodes in the cluster, including the current node
    ///
    /// # Side Effects
    ///
    /// * Logs diagnostic information about the current node and known nodes
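    ///
    /// # Examples
    ///
    /// A sketch of a simple majority-quorum calculation over the full node list:
    ///
    /// ```ignore
    /// let all_nodes = manager.get_nodes_and_self().await;
    /// let quorum = all_nodes.len() / 2 + 1;
    /// ```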
    pub async fn get_nodes_and_self(&self) -> Vec<NodeInfo> {
        let state = self.state.read().await;
        let nodes = self.nodes.read().await;

        // Collect all known nodes into a vector
        let mut all_nodes: Vec<NodeInfo> = nodes.values().cloned().collect();

        // Add the current node, deriving its address and port from the
        // "address:port" form of the node ID
        all_nodes.push(NodeInfo {
            id: state.node_id.to_string().into(),
            address: state.node_id.split(':').next().unwrap_or_default().into(),
            port: state
                .node_id
                .split(':')
                .nth(1)
                .unwrap_or_default()
                .parse()
                .unwrap_or(0),
        });

        // Log diagnostic information
        debug!("Current node ID: {}", state.node_id);
        debug!("Known nodes: {:?}", nodes);

        all_nodes
    }

    /// Checks if a specific node is currently active in the cluster.
    ///
    /// This method determines if a node is still considered active within the cluster
    /// by checking if it exists in the nodes registry. It's useful for operations that
    /// need to verify a node's presence before attempting to communicate with it.
    ///
    /// # Arguments
    ///
    /// * `node_uid` - Unique identifier of the node to check, typically its address
    ///
    /// # Returns
    ///
    /// `true` if the node is active in the cluster, `false` otherwise
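    ///
    /// # Examples
    ///
    /// A sketch guarding communication with an illustrative peer:
    ///
    /// ```ignore
    /// if manager.is_node_alive("10.0.0.5:8080".into()).await {
    ///     // safe to attempt communication with the peer
    /// }
    /// ```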
    pub async fn is_node_alive(&self, node_uid: Arc<str>) -> bool {
        let nodes = self.nodes.read().await;
        nodes.contains_key(&node_uid)
    }

    /// Discovers and connects to peer nodes listed in the server configuration.
    ///
    /// Iterates over the configured instances, skipping the entry that matches
    /// `my_port` (the current node), and health-checks each remaining peer.
    /// Reachable peers are registered; unreachable ones are removed from the
    /// node registry.
    ///
    /// # Arguments
    ///
    /// * `config` - Server configuration listing the known cluster instances
    /// * `my_port` - Port of the current node, used to avoid self-connection
    ///
    /// # Returns
    ///
    /// `Ok(())` once discovery completes; individual peer failures are logged
    /// rather than propagated.
    pub async fn discover_peers(&self, config: &ServerConfig, my_port: u16) -> Result<()> {
        let client = Client::new();
        log::info!("{}", "Starting peer discovery...".cyan());

        for instance in &config.instances {
            log::info!("{}", format!("Discovered: {:#?}", instance).blue().bold());

            // Skip the configuration entry that refers to this node itself
            if instance.port == my_port {
                log::debug!("Skipping self-connection at port {}", my_port);
                continue;
            }

            let node_address: Arc<str> = format!("{}:{}", instance.address, instance.port).into();

            match self.connect_to_peer(&client, &node_address).await {
                Ok(_) => log::info!(
                    "{}",
                    format!("Successfully connected to peer: {}", node_address).green()
                ),
                Err(e) => {
                    log::warn!(
                        "{}",
                        format!("Failed to connect to peer {}: {}", node_address, e).yellow()
                    );
                    // Drop the unreachable peer from the registry
                    self.remove_node(node_address.clone()).await;
                }
            }
        }

        log::info!("{}", "Peer discovery completed".cyan());
        Ok(())
    }

    /// Health-checks a single peer and registers it on success.
    ///
    /// Sends a GET request to the peer's `/health` endpoint with a five-second
    /// timeout. On a successful response the peer is added to the node registry;
    /// otherwise an error is returned to the caller.
    async fn connect_to_peer(&self, client: &Client, node_address: &str) -> Result<()> {
        // Note: this assumes the configured address yields a valid URL once the
        // health path is appended (i.e. it carries any scheme reqwest requires).
        let health_url = format!("{}/health", node_address);
        log::debug!("Checking health at: {}", health_url);

        let response = client
            .get(&health_url)
            .timeout(Duration::from_secs(5))
            .send()
            .await?;

        if response.status().is_success() {
            // Derive the port from the trailing ":port" segment, defaulting to 80
            let port = node_address
                .split(':')
                .next_back()
                .unwrap_or("80")
                .parse::<u16>()
                .unwrap_or(80);

            let node_info = NodeInfo {
                id: node_address.into(),
                address: node_address.into(),
                port,
            };

            self.register_node(node_info).await;
            log::debug!("Node registered: {}", node_address);
            Ok(())
        } else {
            Err(anyhow!("Node health check failed"))
        }
    }
}