omni_orchestrator/
leader.rs

1use std::sync::Arc;
2use std::time::Duration;
3use tokio::sync::RwLock;
4use tokio::time;
5
6use crate::state::SharedState;
7use crate::CLUSTER_MANAGER;
8
9/// Manages leader election in the OmniOrchestrator cluster.
10///
11/// The LeaderElection module is responsible for determining which node in the cluster
12/// should act as the leader. It implements a simple deterministic leader election
13/// algorithm based on node IDs to ensure that exactly one node assumes the leader role.
14///
15/// Leader election is a critical component in distributed systems that ensures:
16/// - Coordination responsibilities are clearly assigned
17/// - A single point of truth exists for cluster-wide decisions
18/// - System stability is maintained through consistent leadership
19///
20/// The election process runs periodically to accommodate cluster changes such as
21/// nodes joining or leaving the system.
22pub struct LeaderElection {
23    /// Unique identifier for the current node
24    node_id: Arc<str>,
25    
26    /// Shared state that tracks leadership status and cluster information
27    state: Arc<RwLock<SharedState>>,
28    
29    /// Timestamp of the last heartbeat received
30    /// This can be used for more sophisticated leader election algorithms
31    /// that take into account node responsiveness
32    #[allow(unused)]
33    last_heartbeat: Arc<RwLock<std::time::Instant>>,
34}
35
36impl LeaderElection {
37    /// Creates a new LeaderElection instance.
38    ///
39    /// Initializes the leader election module with the current node's identity
40    /// and a reference to the shared state. The last_heartbeat is initialized to
41    /// the current time.
42    ///
43    /// # Arguments
44    ///
45    /// * `node_id` - Unique identifier for the current node
46    /// * `state` - Shared state for tracking leadership status
47    ///
48    /// # Returns
49    ///
50    /// A new LeaderElection instance ready to begin the election process
51    pub fn new(node_id: Arc<str>, state: Arc<RwLock<SharedState>>) -> Self {
52        Self {
53            node_id,
54            state,
55            last_heartbeat: Arc::new(RwLock::new(std::time::Instant::now())),
56        }
57    }
58
59    /// Starts the leader election process.
60    ///
61    /// This method begins a continuous cycle of leader elections at a fixed interval.
62    /// Once started, it will periodically execute the election_cycle method to
63    /// determine the current leader based on the existing cluster composition.
64    ///
65    /// The election happens every 5 seconds, which provides a balance between
66    /// responsiveness to cluster changes and system overhead.
67    ///
68    /// # Note
69    ///
70    /// This method runs indefinitely in a loop and should typically be
71    /// spawned in its own task or thread.
72    pub async fn start(&self) {
73        // Create a timer that ticks every 5 seconds
74        let mut interval = time::interval(Duration::from_secs(5));
75
76        // Run the election cycle on each tick
77        loop {
78            interval.tick().await;
79            self.election_cycle().await;
80        }
81    }
82
83    /// Performs a single leader election cycle.
84    ///
85    /// This method implements the core leader election algorithm, which follows
86    /// these steps:
87    /// 1. Retrieve all nodes in the cluster
88    /// 2. Sort the nodes by ID for deterministic selection
89    /// 3. Select the first node in the sorted list as the leader
90    /// 4. Update the shared state with the election results
91    ///
92    /// The algorithm is intentionally simple and deterministic, ensuring that all
93    /// nodes will independently arrive at the same conclusion about who the leader is,
94    /// without requiring additional communication.
95    ///
96    /// # Special Cases
97    ///
98    /// - If the cluster contains only one node, that node becomes the leader.
99    /// - If the cluster contains no nodes (which shouldn't happen as the current node
100    ///   should always be included), the current node becomes the leader by default.
101    ///
102    /// # Side Effects
103    ///
104    /// - Updates the shared state to reflect the new leader
105    /// - Logs information about the election process and results
106    async fn election_cycle(&self) {
107        // Get reference to cluster manager and retrieve all nodes
108        let cluster_manager = CLUSTER_MANAGER.read().await;
109        let nodes = cluster_manager.get_nodes_and_self().await;
110        
111        // Log participating nodes for debugging
112        log::info!("Nodes participating in election:");
113        for node in &nodes {
114            log::info!("  - {}", node.id);
115        }
116
117        // Acquire write lock on shared state to update leadership information
118        let mut state = self.state.write().await;
119
120        // Sort nodes by ID for deterministic leader selection
121        // This ensures all nodes will independently choose the same leader
122        let mut sorted_nodes = nodes.clone();
123        sorted_nodes.sort_by(|a, b| a.id.cmp(&b.id));
124        log::info!("Sorted nodes: {:?}", sorted_nodes);
125
126        // Handle the case where this is the only node (or no nodes, which shouldn't happen)
127        if sorted_nodes.is_empty() {
128            state.is_leader = true;
129            state.leader_id = Some(self.node_id.clone());
130            log::info!("Single node {} becoming leader", self.node_id);
131            return;
132        }
133
134        // First node in sorted list becomes leader
135        let leader = &sorted_nodes[0];
136        let is_self_leader = leader.id == self.node_id;
137        log::info!("Leader logic: {} == {}", leader.id, self.node_id);
138
139        // Update state with leader information
140        state.is_leader = is_self_leader;
141        state.leader_id = Some(leader.id.clone());
142
143        // Log election results
144        log::info!("Leader elected: {})", leader.id);
145        log::info!(
146            "This node ({}) is {}",
147            self.node_id,
148            if is_self_leader { "leader" } else { "follower" }
149        );
150    }
151}