omni_orchestrator/cluster.rs
1use colored::Colorize;
2use log::debug;
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::sync::Arc;
6use tokio::sync::RwLock;
7
8use crate::state::SharedState;
9
10/// Represents a node in the OmniOrchestrator cluster.
11///
12/// This structure contains all the necessary information to identify and
13/// communicate with a specific node in the distributed system. Each node
14/// is uniquely identified by its ID, and contains network location information.
15///
16/// # Fields
17///
18/// * `id` - Unique identifier for the node, typically in the format of "address:port"
19/// * `port` - The port number that the node is listening on
20/// * `address` - The network address of the node for communication
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct NodeInfo {
23 /// Unique identifier for the node in the cluster
24 pub id: Arc<str>,
25 /// Port number the node is listening on
26 pub port: u16,
27 /// Network address of the node
28 pub address: Arc<str>,
29}
30
31/// Central manager for cluster operations and node tracking.
32///
33/// The ClusterManager is responsible for maintaining the state of the entire
34/// cluster, including tracking which nodes are currently active, managing node
35/// registration and removal, and providing information about the cluster's
36/// current composition.
37///
38/// It uses thread-safe data structures to allow concurrent access from multiple
39/// parts of the application, particularly important in a distributed system where
40/// node changes can happen at any time.
41///
42/// # Fields
43///
44/// * `state` - Shared state that includes information about the current node and cluster
45/// * `nodes` - Thread-safe map of all known nodes in the cluster, indexed by their address
46#[derive(Debug)]
47pub struct ClusterManager {
48 /// Shared state containing information about the current node and overall cluster
49 pub state: Arc<RwLock<SharedState>>,
50 /// Thread-safe map of all nodes in the cluster, keyed by node address
51 pub nodes: Arc<RwLock<HashMap<Arc<str>, NodeInfo>>>,
52}
53
54impl ClusterManager {
55 /// Creates a new instance of the ClusterManager.
56 ///
57 /// Initializes a new cluster manager with the provided shared state and
58 /// an empty nodes map. This is typically called during application startup
59 /// to establish the cluster management subsystem.
60 ///
61 /// # Arguments
62 ///
63 /// * `state` - Shared state containing information about the current node
64 ///
65 /// # Returns
66 ///
67 /// A new ClusterManager instance ready to track and manage cluster nodes
68 pub fn new(state: Arc<RwLock<SharedState>>) -> Self {
69 Self {
70 state,
71 nodes: Arc::new(RwLock::new(HashMap::new())),
72 }
73 }
74
75 /// Registers a new node in the cluster.
76 ///
77 /// This method adds a node to the cluster's node registry if it doesn't already exist.
78 /// After registration, it updates the shared state with the new cluster size.
79 /// The method uses colorized output to make debugging in the console easier.
80 ///
81 /// # Arguments
82 ///
83 /// * `node` - Information about the node to register
84 ///
85 /// # Side Effects
86 ///
87 /// * Updates the internal nodes map if the node is new
88 /// * Updates the cluster size in the shared state
89 /// * Prints diagnostic information to console
90 pub async fn register_node(&self, node: NodeInfo) {
91 let node_uid = node.address.clone();
92 println!(
93 "{}{}",
94 "CALLED REGISTER NODE FUNCTION WITH PARAMS OF:"
95 .white()
96 .on_red()
97 .bold(),
98 node_uid.green()
99 );
100
101 // Check if the node already exists in our registry
102 if self.nodes.read().await.contains_key(&node_uid) {
103 println!("WE ALREADY HAD THIS NODE");
104 return;
105 }
106
107 // Add the node to our registry and get the new size
108 let size = {
109 let mut nodes = self.nodes.write().await;
110 println!(
111 "{}{}",
112 "ADDING NODE".white().on_red().bold().underline(),
113 node_uid
114 );
115 nodes.insert(node_uid, node);
116 let size = nodes.len();
117 println!("Current node map: {:?}", nodes);
118 size
119 };
120
121 // Update the cluster size in the shared state
122 let mut state = self.state.write().await;
123 state.cluster_size = size;
124 }
125
126 /// Removes a node from the cluster.
127 ///
128 /// This method removes a node from the cluster's node registry if it exists.
129 /// After removal, it updates the shared state with the new cluster size.
130 /// The method includes debug logging to track node removal operations.
131 ///
132 /// # Arguments
133 ///
134 /// * `node_uid` - Unique identifier of the node to remove, typically its address
135 ///
136 /// # Side Effects
137 ///
138 /// * Updates the internal nodes map by removing the specified node
139 /// * Updates the cluster size in the shared state
140 /// * Logs diagnostic information about the removal operation
141 pub async fn remove_node(&self, node_uid: Arc<str>) {
142 debug!(
143 "{}{}",
144 "CALING REMOVE NODE FUNCTION WITH PARAMS OF:"
145 .white()
146 .on_green()
147 .bold(),
148 node_uid.green()
149 );
150
151 // First check if the node exists in our registry
152 {
153 let nodes_read = self.nodes.read().await;
154 if !nodes_read.contains_key(&node_uid) {
155 log::info!("Attempted to remove a node that does not exist");
156 log::info!("Current nodes: {:?}", nodes_read);
157 return;
158 }
159 }
160
161 // Remove the node from our registry
162 let mut nodes = self.nodes.write().await;
163 log::info!("Removing node: {}", node_uid.white().on_green().bold());
164 nodes.remove(&node_uid);
165
166 // Update the cluster size in the shared state
167 let mut state = self.state.write().await;
168 state.cluster_size = nodes.len();
169 }
170
171 /// Retrieves a list of all known nodes in the cluster.
172 ///
173 /// This method provides a snapshot of all the nodes currently registered
174 /// in the cluster manager. It's useful for operations that need to iterate
175 /// over all nodes or display cluster status information.
176 ///
177 /// # Returns
178 ///
179 /// A vector containing information about all known nodes in the cluster
180 pub async fn get_nodes(&self) -> Vec<NodeInfo> {
181 let nodes = self.nodes.read().await;
182 nodes.values().cloned().collect()
183 }
184
185 /// Retrieves a list of all known nodes plus the current node.
186 ///
187 /// This method provides a complete view of the cluster including the current node.
188 /// It's particularly useful for operations that need a complete picture of the cluster,
189 /// such as leader election or quorum calculations.
190 ///
191 /// # Returns
192 ///
193 /// A vector containing information about all nodes in the cluster, including the current node
194 ///
195 /// # Side Effects
196 ///
197 /// * Prints diagnostic information about the current node and known nodes to console
198 pub async fn get_nodes_and_self(&self) -> Vec<NodeInfo> {
199 let state = self.state.read().await;
200 let nodes = self.nodes.read().await;
201
202 // Collect all known nodes into a vector
203 let mut all_nodes: Vec<NodeInfo> = nodes.values().cloned().collect();
204
205 // Add the current node to the collection
206 all_nodes.push(NodeInfo {
207 id: format!("{}", state.node_id).into(),
208 address: state.node_id.split(':').next().unwrap_or_default().into(),
209 port: state
210 .node_id
211 .split(':')
212 .nth(1)
213 .unwrap_or_default()
214 .parse()
215 .unwrap_or(0),
216 });
217
218 // Print diagnostic information
219 println!("Current node ID: {}", state.node_id);
220 println!("Known nodes: {:?}", nodes);
221
222 all_nodes
223 }
224
225 /// Checks if a specific node is currently active in the cluster.
226 ///
227 /// This method determines if a node is still considered active within the cluster
228 /// by checking if it exists in the nodes registry. It's useful for operations that
229 /// need to verify a node's presence before attempting to communicate with it.
230 ///
231 /// # Arguments
232 ///
233 /// * `node_uid` - Unique identifier of the node to check, typically its address
234 ///
235 /// # Returns
236 ///
237 /// `true` if the node is active in the cluster, `false` otherwise
238 pub async fn is_node_alive(&self, node_uid: Arc<str>) -> bool {
239 let nodes = self.nodes.read().await;
240 nodes.contains_key(&node_uid)
241 }
242}