omni_orchestrator/leader.rs
1use std::sync::Arc;
2use std::time::Duration;
3use tokio::sync::RwLock;
4use tokio::time;
5
6use crate::state::SharedState;
7use crate::CLUSTER_MANAGER;
8
9/// Manages leader election in the OmniOrchestrator cluster.
10///
11/// The LeaderElection module is responsible for determining which node in the cluster
12/// should act as the leader. It implements a simple deterministic leader election
13/// algorithm based on node IDs to ensure that exactly one node assumes the leader role.
14///
15/// Leader election is a critical component in distributed systems that ensures:
16/// - Coordination responsibilities are clearly assigned
17/// - A single point of truth exists for cluster-wide decisions
18/// - System stability is maintained through consistent leadership
19///
20/// The election process runs periodically to accommodate cluster changes such as
21/// nodes joining or leaving the system.
22pub struct LeaderElection {
23 /// Unique identifier for the current node
24 node_id: Arc<str>,
25
26 /// Shared state that tracks leadership status and cluster information
27 state: Arc<RwLock<SharedState>>,
28
29 /// Timestamp of the last heartbeat received
30 /// This can be used for more sophisticated leader election algorithms
31 /// that take into account node responsiveness
32 #[allow(unused)]
33 last_heartbeat: Arc<RwLock<std::time::Instant>>,
34}
35
36impl LeaderElection {
37 /// Creates a new LeaderElection instance.
38 ///
39 /// Initializes the leader election module with the current node's identity
40 /// and a reference to the shared state. The last_heartbeat is initialized to
41 /// the current time.
42 ///
43 /// # Arguments
44 ///
45 /// * `node_id` - Unique identifier for the current node
46 /// * `state` - Shared state for tracking leadership status
47 ///
48 /// # Returns
49 ///
50 /// A new LeaderElection instance ready to begin the election process
51 pub fn new(node_id: Arc<str>, state: Arc<RwLock<SharedState>>) -> Self {
52 Self {
53 node_id,
54 state,
55 last_heartbeat: Arc::new(RwLock::new(std::time::Instant::now())),
56 }
57 }
58
59 /// Starts the leader election process.
60 ///
61 /// This method begins a continuous cycle of leader elections at a fixed interval.
62 /// Once started, it will periodically execute the election_cycle method to
63 /// determine the current leader based on the existing cluster composition.
64 ///
65 /// The election happens every 5 seconds, which provides a balance between
66 /// responsiveness to cluster changes and system overhead.
67 ///
68 /// # Note
69 ///
70 /// This method runs indefinitely in a loop and should typically be
71 /// spawned in its own task or thread.
72 pub async fn start(&self) {
73 // Create a timer that ticks every 5 seconds
74 let mut interval = time::interval(Duration::from_secs(5));
75
76 // Run the election cycle on each tick
77 loop {
78 interval.tick().await;
79 self.election_cycle().await;
80 }
81 }
82
83 /// Performs a single leader election cycle.
84 ///
85 /// This method implements the core leader election algorithm, which follows
86 /// these steps:
87 /// 1. Retrieve all nodes in the cluster
88 /// 2. Sort the nodes by ID for deterministic selection
89 /// 3. Select the first node in the sorted list as the leader
90 /// 4. Update the shared state with the election results
91 ///
92 /// The algorithm is intentionally simple and deterministic, ensuring that all
93 /// nodes will independently arrive at the same conclusion about who the leader is,
94 /// without requiring additional communication.
95 ///
96 /// # Special Cases
97 ///
98 /// - If the cluster contains only one node, that node becomes the leader.
99 /// - If the cluster contains no nodes (which shouldn't happen as the current node
100 /// should always be included), the current node becomes the leader by default.
101 ///
102 /// # Side Effects
103 ///
104 /// - Updates the shared state to reflect the new leader
105 /// - Logs information about the election process and results
106 async fn election_cycle(&self) {
107 // Get reference to cluster manager and retrieve all nodes
108 let cluster_manager = CLUSTER_MANAGER.read().await;
109 let nodes = cluster_manager.get_nodes_and_self().await;
110
111 // Log participating nodes for debugging
112 log::info!("Nodes participating in election:");
113 for node in &nodes {
114 log::info!(" - {}", node.id);
115 }
116
117 // Acquire write lock on shared state to update leadership information
118 let mut state = self.state.write().await;
119
120 // Sort nodes by ID for deterministic leader selection
121 // This ensures all nodes will independently choose the same leader
122 let mut sorted_nodes = nodes.clone();
123 sorted_nodes.sort_by(|a, b| a.id.cmp(&b.id));
124 log::info!("Sorted nodes: {:?}", sorted_nodes);
125
126 // Handle the case where this is the only node (or no nodes, which shouldn't happen)
127 if sorted_nodes.is_empty() {
128 state.is_leader = true;
129 state.leader_id = Some(self.node_id.clone());
130 log::info!("Single node {} becoming leader", self.node_id);
131 return;
132 }
133
134 // First node in sorted list becomes leader
135 let leader = &sorted_nodes[0];
136 let is_self_leader = leader.id == self.node_id;
137 log::info!("Leader logic: {} == {}", leader.id, self.node_id);
138
139 // Update state with leader information
140 state.is_leader = is_self_leader;
141 state.leader_id = Some(leader.id.clone());
142
143 // Log election results
144 log::info!("Leader elected: {})", leader.id);
145 log::info!(
146 "This node ({}) is {}",
147 self.node_id,
148 if is_self_leader { "leader" } else { "follower" }
149 );
150 }
151}