omni_orchestrator/
cluster.rs

1use colored::Colorize;
2use log::debug;
3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::sync::Arc;
6use tokio::sync::RwLock;
7
8use crate::state::SharedState;
9
10/// Represents a node in the OmniOrchestrator cluster.
11///
12/// This structure contains all the necessary information to identify and
13/// communicate with a specific node in the distributed system. Each node
14/// is uniquely identified by its ID, and contains network location information.
15///
16/// # Fields
17///
18/// * `id` - Unique identifier for the node, typically in the format of "address:port"
19/// * `port` - The port number that the node is listening on
20/// * `address` - The network address of the node for communication
21#[derive(Debug, Clone, Serialize, Deserialize)]
22pub struct NodeInfo {
23    /// Unique identifier for the node in the cluster
24    pub id: Arc<str>,
25    /// Port number the node is listening on
26    pub port: u16,
27    /// Network address of the node
28    pub address: Arc<str>,
29}
30
31/// Central manager for cluster operations and node tracking.
32///
33/// The ClusterManager is responsible for maintaining the state of the entire 
34/// cluster, including tracking which nodes are currently active, managing node
35/// registration and removal, and providing information about the cluster's 
36/// current composition.
37///
38/// It uses thread-safe data structures to allow concurrent access from multiple 
39/// parts of the application, particularly important in a distributed system where
40/// node changes can happen at any time.
41///
42/// # Fields
43///
44/// * `state` - Shared state that includes information about the current node and cluster
45/// * `nodes` - Thread-safe map of all known nodes in the cluster, indexed by their address
46#[derive(Debug)]
47pub struct ClusterManager {
48    /// Shared state containing information about the current node and overall cluster
49    pub state: Arc<RwLock<SharedState>>,
50    /// Thread-safe map of all nodes in the cluster, keyed by node address
51    pub nodes: Arc<RwLock<HashMap<Arc<str>, NodeInfo>>>,
52}
53
54impl ClusterManager {
55    /// Creates a new instance of the ClusterManager.
56    ///
57    /// Initializes a new cluster manager with the provided shared state and
58    /// an empty nodes map. This is typically called during application startup
59    /// to establish the cluster management subsystem.
60    ///
61    /// # Arguments
62    ///
63    /// * `state` - Shared state containing information about the current node
64    ///
65    /// # Returns
66    ///
67    /// A new ClusterManager instance ready to track and manage cluster nodes
68    pub fn new(state: Arc<RwLock<SharedState>>) -> Self {
69        Self {
70            state,
71            nodes: Arc::new(RwLock::new(HashMap::new())),
72        }
73    }
74
75    /// Registers a new node in the cluster.
76    ///
77    /// This method adds a node to the cluster's node registry if it doesn't already exist.
78    /// After registration, it updates the shared state with the new cluster size.
79    /// The method uses colorized output to make debugging in the console easier.
80    ///
81    /// # Arguments
82    ///
83    /// * `node` - Information about the node to register
84    ///
85    /// # Side Effects
86    ///
87    /// * Updates the internal nodes map if the node is new
88    /// * Updates the cluster size in the shared state
89    /// * Prints diagnostic information to console
90    pub async fn register_node(&self, node: NodeInfo) {
91        let node_uid = node.address.clone();
92        println!(
93            "{}{}",
94            "CALLED REGISTER NODE FUNCTION WITH PARAMS OF:"
95                .white()
96                .on_red()
97                .bold(),
98            node_uid.green()
99        );
100        
101        // Check if the node already exists in our registry
102        if self.nodes.read().await.contains_key(&node_uid) {
103            println!("WE ALREADY HAD THIS NODE");
104            return;
105        }
106        
107        // Add the node to our registry and get the new size
108        let size = {
109            let mut nodes = self.nodes.write().await;
110            println!(
111                "{}{}",
112                "ADDING NODE".white().on_red().bold().underline(),
113                node_uid
114            );
115            nodes.insert(node_uid, node);
116            let size = nodes.len();
117            println!("Current node map: {:?}", nodes);
118            size
119        };
120        
121        // Update the cluster size in the shared state
122        let mut state = self.state.write().await;
123        state.cluster_size = size;
124    }
125
126    /// Removes a node from the cluster.
127    ///
128    /// This method removes a node from the cluster's node registry if it exists.
129    /// After removal, it updates the shared state with the new cluster size.
130    /// The method includes debug logging to track node removal operations.
131    ///
132    /// # Arguments
133    ///
134    /// * `node_uid` - Unique identifier of the node to remove, typically its address
135    ///
136    /// # Side Effects
137    ///
138    /// * Updates the internal nodes map by removing the specified node
139    /// * Updates the cluster size in the shared state
140    /// * Logs diagnostic information about the removal operation
141    pub async fn remove_node(&self, node_uid: Arc<str>) {
142        debug!(
143            "{}{}",
144            "CALING REMOVE NODE FUNCTION WITH PARAMS OF:"
145                .white()
146                .on_green()
147                .bold(),
148            node_uid.green()
149        );
150        
151        // First check if the node exists in our registry
152        {
153            let nodes_read = self.nodes.read().await;
154            if !nodes_read.contains_key(&node_uid) {
155                log::info!("Attempted to remove a node that does not exist");
156                log::info!("Current nodes: {:?}", nodes_read);
157                return;
158            }
159        }
160        
161        // Remove the node from our registry
162        let mut nodes = self.nodes.write().await;
163        log::info!("Removing node: {}", node_uid.white().on_green().bold());
164        nodes.remove(&node_uid);
165
166        // Update the cluster size in the shared state
167        let mut state = self.state.write().await;
168        state.cluster_size = nodes.len();
169    }
170
171    /// Retrieves a list of all known nodes in the cluster.
172    ///
173    /// This method provides a snapshot of all the nodes currently registered
174    /// in the cluster manager. It's useful for operations that need to iterate
175    /// over all nodes or display cluster status information.
176    ///
177    /// # Returns
178    ///
179    /// A vector containing information about all known nodes in the cluster
180    pub async fn get_nodes(&self) -> Vec<NodeInfo> {
181        let nodes = self.nodes.read().await;
182        nodes.values().cloned().collect()
183    }
184
185    /// Retrieves a list of all known nodes plus the current node.
186    ///
187    /// This method provides a complete view of the cluster including the current node.
188    /// It's particularly useful for operations that need a complete picture of the cluster,
189    /// such as leader election or quorum calculations.
190    ///
191    /// # Returns
192    ///
193    /// A vector containing information about all nodes in the cluster, including the current node
194    ///
195    /// # Side Effects
196    ///
197    /// * Prints diagnostic information about the current node and known nodes to console
198    pub async fn get_nodes_and_self(&self) -> Vec<NodeInfo> {
199        let state = self.state.read().await;
200        let nodes = self.nodes.read().await;
201
202        // Collect all known nodes into a vector
203        let mut all_nodes: Vec<NodeInfo> = nodes.values().cloned().collect();
204        
205        // Add the current node to the collection
206        all_nodes.push(NodeInfo {
207            id: format!("{}", state.node_id).into(),
208            address: state.node_id.split(':').next().unwrap_or_default().into(),
209            port: state
210                .node_id
211                .split(':')
212                .nth(1)
213                .unwrap_or_default()
214                .parse()
215                .unwrap_or(0),
216        });
217
218        // Print diagnostic information
219        println!("Current node ID: {}", state.node_id);
220        println!("Known nodes: {:?}", nodes);
221
222        all_nodes
223    }
224
225    /// Checks if a specific node is currently active in the cluster.
226    ///
227    /// This method determines if a node is still considered active within the cluster
228    /// by checking if it exists in the nodes registry. It's useful for operations that
229    /// need to verify a node's presence before attempting to communicate with it.
230    ///
231    /// # Arguments
232    ///
233    /// * `node_uid` - Unique identifier of the node to check, typically its address
234    ///
235    /// # Returns
236    ///
237    /// `true` if the node is active in the cluster, `false` otherwise
238    pub async fn is_node_alive(&self, node_uid: Arc<str>) -> bool {
239        let nodes = self.nodes.read().await;
240        nodes.contains_key(&node_uid)
241    }
242}