foundationdb/recipes/leader_election/mod.rs
1// Copyright 2024 foundationdb-rs developers
2//
3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. This file may not be
6// copied, modified, or distributed except according to those terms.
7
8//! # Leader Election for FoundationDB
9//!
//! A distributed leader election recipe using FoundationDB as the coordination backend.
11//! Similar to [Apache Curator's LeaderLatch](https://curator.apache.org/) for ZooKeeper,
12//! but leveraging FDB's serializable transactions for stronger guarantees.
13//!
14//! ## When to Use This
15//!
16//! **Good use cases:**
17//! - Singleton services (only one instance should be active)
18//! - Job schedulers (one coordinator assigns work)
19//! - Primary/backup failover
20//! - Exclusive access to external resources
21//!
22//! **Consider alternatives if:**
23//! - You need mutex/lock semantics for short critical sections
24//! (use FoundationDB transactions directly)
25//! - You need fair queuing (this uses priority-based preemption)
26//!
27//! ## API Overview
28//!
29//! The main entry point is [`LeaderElection`](leader_election::LeaderElection). Typical usage follows this pattern:
30//!
31//! | Step | Method | Frequency |
32//! |------|--------|-----------|
33//! | 1. Setup | [`new`](leader_election::LeaderElection::new) | Once per process |
34//! | 2. Initialize | [`initialize`](leader_election::LeaderElection::initialize) | Once globally (idempotent) |
35//! | 3. Register | [`register_candidate`](leader_election::LeaderElection::register_candidate) | Once per process |
36//! | 4. Election loop | [`run_election_cycle`](leader_election::LeaderElection::run_election_cycle) | Every heartbeat interval |
37//! | 5. Shutdown | [`resign_leadership`](leader_election::LeaderElection::resign_leadership) + [`unregister_candidate`](leader_election::LeaderElection::unregister_candidate) | On graceful exit |
38//!
39//! For advanced use cases, lower-level methods are available:
40//! - [`try_claim_leadership`](leader_election::LeaderElection::try_claim_leadership) - Attempt to become leader
41//! - [`refresh_lease`](leader_election::LeaderElection::refresh_lease) - Extend leadership lease
42//! - [`get_leader`](leader_election::LeaderElection::get_leader) - Query current leader
43//! - [`is_leader`](leader_election::LeaderElection::is_leader) - Check if this process is leader
44//!
45//! ## Key Concepts
46//!
47//! ### Ballots
//! Ballot numbers work like Raft's terms - a monotonically increasing counter
//! that establishes ordering. The higher ballot always wins. Each leadership claim
50//! or lease refresh increments the ballot. This prevents split-brain scenarios
51//! after network partitions heal.
52//!
53//! The ballot is returned in [`LeaderState::ballot`](leader_election::LeaderState::ballot) and can be used as a
54//! fencing token when accessing external resources.
55//!
56//! ### Leases
57//! Leaders hold time-bounded leases configured via [`lease_duration`](leader_election::ElectionConfig::lease_duration).
58//! A leader must call [`run_election_cycle`](leader_election::LeaderElection::run_election_cycle) (or [`refresh_lease`](leader_election::LeaderElection::refresh_lease))
59//! before the lease expires to maintain leadership.
60//!
61//! If a leader fails to refresh (crash, network partition), other candidates
62//! can claim leadership after the lease expires.
63//!
64//! ### Preemption
65//! When [`allow_preemption`](leader_election::ElectionConfig::allow_preemption) is true, higher-priority candidates
66//! can preempt lower-priority leaders. Priority is set via the `priority` parameter
67//! in [`register_candidate`](leader_election::LeaderElection::register_candidate). This enables graceful leadership migration
68//! to new machines during rolling deployments or infrastructure upgrades.
69//!
70//! ## Configuration
71//!
72//! Configure via [`ElectionConfig`](leader_election::ElectionConfig) passed to [`initialize_with_config`](leader_election::LeaderElection::initialize_with_config):
73//!
74//! | Field | Default | Description |
75//! |-------|---------|-------------|
76//! | [`lease_duration`](leader_election::ElectionConfig::lease_duration) | 10s | How long leadership is valid without refresh |
77//! | [`heartbeat_interval`](leader_election::ElectionConfig::heartbeat_interval) | 3s | Recommended interval for calling `run_election_cycle` |
78//! | [`candidate_timeout`](leader_election::ElectionConfig::candidate_timeout) | 15s | When to consider candidates dead |
79//! | [`election_enabled`](leader_election::ElectionConfig::election_enabled) | true | Enable/disable elections globally |
80//! | [`allow_preemption`](leader_election::ElectionConfig::allow_preemption) | true | Allow priority-based preemption |
81//!
//! **Rule of thumb:** `heartbeat_interval` should be less than `lease_duration / 3`
//! to allow retries before the lease expires.
84//!
85//! ## Return Types
86//!
87//! - [`ElectionResult`](leader_election::ElectionResult) - Returned by [`run_election_cycle`](leader_election::LeaderElection::run_election_cycle), indicates
88//! whether this process is the leader or a follower
89//! - [`LeaderState`](leader_election::LeaderState) - Information about a leader: process ID, ballot, lease expiry
90//! - [`CandidateInfo`](leader_election::CandidateInfo) - Information about a registered candidate
91//!
92//! ## Safety Properties
93//!
94//! - **Mutual Exclusion**: At most one leader at any time (guaranteed by FDB serializable transactions)
95//! - **Liveness**: A correct process eventually becomes leader
96//! - **Consistency**: Ballot numbers provide total ordering of leadership changes
97//!
98//! ## Simulation Testing
99//!
100//! This implementation is validated through FoundationDB's deterministic simulation
101//! framework under extreme conditions including network partitions, process failures,
102//! and clock skew up to ±2 seconds.
103//!
104//! Key invariants verified:
105//! - No overlapping leadership (mutual exclusion)
106//! - Ballot monotonicity (ballots never regress)
107//! - Fencing token validity (each claim increments ballot)
108//!
109//! See `foundationdb-recipes-simulation` crate for test configurations.
110
111mod algorithm;
112mod errors;
113mod keys;
114mod types;
115
116pub use errors::{LeaderElectionError, Result};
117pub use types::{CandidateInfo, ElectionConfig, ElectionResult, LeaderState};
118
119use crate::{tuple::Subspace, Transaction};
120use std::ops::Deref;
121use std::time::Duration;
122
123/// Coordinator for distributed leader election.
124///
125/// `LeaderElection` provides the interface for participating in leader elections
126/// backed by FoundationDB. Multiple independent elections can coexist by using
127/// different [`Subspace`]s.
128///
129/// # Lifecycle
130///
131/// ```text
132/// ┌─────────────────────────────────────────────────────────────────┐
133/// │ 1. new() Create instance with subspace │
134/// │ 2. initialize() Set up election (once globally) │
135/// │ 3. register_candidate() Join as candidate (once per process) │
136/// │ 4. run_election_cycle() Main loop (every heartbeat_interval) │
137/// │ 5. resign_leadership() + unregister_candidate() Cleanup │
138/// └─────────────────────────────────────────────────────────────────┘
139/// ```
140///
141/// # Methods by Category
142///
143/// ## Setup
144/// - [`new`](Self::new) - Create election instance
145/// - [`initialize`](Self::initialize) - Initialize with defaults
146/// - [`initialize_with_config`](Self::initialize_with_config) - Initialize with custom [`ElectionConfig`]
147///
148/// ## Candidate Management
149/// - [`register_candidate`](Self::register_candidate) - Join the election
150/// - [`heartbeat_candidate`](Self::heartbeat_candidate) - Send liveness heartbeat
151/// - [`unregister_candidate`](Self::unregister_candidate) - Leave the election
152/// - [`get_candidate`](Self::get_candidate) - Query candidate info
153/// - [`list_candidates`](Self::list_candidates) - List all alive candidates (O(N))
154/// - [`evict_dead_candidates`](Self::evict_dead_candidates) - Remove timed-out candidates (O(N))
155///
156/// ## Leadership Operations (O(1))
157/// - [`try_claim_leadership`](Self::try_claim_leadership) - Attempt to become leader
158/// - [`refresh_lease`](Self::refresh_lease) - Extend leadership lease
159/// - [`resign_leadership`](Self::resign_leadership) - Voluntarily step down
160/// - [`is_leader`](Self::is_leader) - Check if this process is leader
161/// - [`get_leader`](Self::get_leader) - Get current leader (lease-validated)
162/// - [`get_leader_raw`](Self::get_leader_raw) - Get current leader (no lease check)
163///
164/// ## High-Level API
165/// - [`run_election_cycle`](Self::run_election_cycle) - Combined heartbeat + claim (recommended for main loop)
166///
167/// ## Configuration
168/// - [`read_config`](Self::read_config) - Read current configuration
169/// - [`write_config`](Self::write_config) - Update configuration dynamically
170///
171/// # Thread Safety
172///
173/// `LeaderElection` is [`Clone`], [`Send`], and [`Sync`]. It holds only a
174/// [`Subspace`] and can be safely shared across tasks.
175/// Each method operates within the provided transaction's scope.
176///
177/// # Related Types
178///
179/// - [`ElectionConfig`] - Configuration parameters
180/// - [`ElectionResult`] - Return type from [`run_election_cycle`](Self::run_election_cycle)
181/// - [`LeaderState`] - Leader information including ballot (fencing token)
182/// - [`CandidateInfo`] - Registered candidate information
183/// - [`LeaderElectionError`] - Error types for this module
184///
185/// # See Also
186///
187/// See the [module documentation](self) for algorithm details, key concepts
188/// (ballots, leases, preemption), and configuration guidelines.
189#[derive(Clone)]
190pub struct LeaderElection {
191 subspace: Subspace,
192}
193
194impl LeaderElection {
195 /// Create a new leader election instance with the given subspace
196 ///
197 /// The subspace isolates this election from others in the database.
198 /// All election data will be stored under this subspace prefix.
199 ///
200 /// # Arguments
201 /// * `subspace` - The FoundationDB subspace to use for storing election data
202 pub fn new(subspace: Subspace) -> Self {
203 Self { subspace }
204 }
205
206 // ========================================================================
207 // INITIALIZATION
208 // ========================================================================
209
210 /// Initialize the leader election system with default settings
211 ///
212 /// This must be called once before any processes can participate in the election.
213 /// Sets up the necessary configuration with sensible defaults:
214 /// - 10 second lease duration
215 /// - 3 second heartbeat interval
216 /// - 15 second candidate timeout
217 /// - Elections enabled
218 /// - Preemption enabled
219 ///
220 /// This operation is idempotent - calling it multiple times has no effect.
221 pub async fn initialize<T>(&self, txn: &T) -> Result<()>
222 where
223 T: Deref<Target = Transaction>,
224 {
225 algorithm::initialize(txn, &self.subspace, ElectionConfig::default()).await
226 }
227
228 /// Initialize the leader election system with custom configuration
229 ///
230 /// Allows fine-tuning election parameters for specific use cases.
231 /// This must be called once before any processes can participate.
232 ///
233 /// # Arguments
234 /// * `config` - Custom election configuration
235 pub async fn initialize_with_config<T>(&self, txn: &T, config: ElectionConfig) -> Result<()>
236 where
237 T: Deref<Target = Transaction>,
238 {
239 algorithm::initialize(txn, &self.subspace, config).await
240 }
241
242 /// Write election configuration
243 ///
244 /// Updates the global election parameters dynamically.
245 ///
246 /// # Warning
247 /// Changing configuration during active elections may cause temporary
248 /// leadership instability.
249 pub async fn write_config<T>(&self, txn: &T, config: &ElectionConfig) -> Result<()>
250 where
251 T: Deref<Target = Transaction>,
252 {
253 algorithm::write_config(txn, &self.subspace, config).await
254 }
255
256 /// Read current election configuration
257 pub async fn read_config<T>(&self, txn: &T) -> Result<ElectionConfig>
258 where
259 T: Deref<Target = Transaction>,
260 {
261 algorithm::read_config(txn, &self.subspace).await
262 }
263
264 // ========================================================================
265 // CANDIDATE MANAGEMENT
266 // ========================================================================
267
268 /// Register as a candidate
269 ///
270 /// Registers this process as a candidate for leadership. Uses SetVersionstampedValue
271 /// to assign a unique versionstamp at registration time.
272 ///
273 /// # Arguments
274 /// * `process_id` - Unique identifier for this process
275 /// * `priority` - Priority level (higher = more preferred for leadership)
276 /// * `current_time` - Current time for heartbeat timestamp
277 ///
278 /// # Note
279 /// The versionstamp is assigned once at registration and preserved on heartbeats.
280 /// You'll need to read the candidate info after commit to get the actual versionstamp.
281 pub async fn register_candidate<T>(
282 &self,
283 txn: &T,
284 process_id: &str,
285 priority: i32,
286 current_time: Duration,
287 ) -> Result<()>
288 where
289 T: Deref<Target = Transaction>,
290 {
291 algorithm::register_candidate(txn, &self.subspace, process_id, priority, current_time).await
292 }
293
294 /// Send heartbeat as candidate
295 ///
296 /// Updates the candidate's timestamp to indicate liveness.
297 /// This should be called periodically (at `heartbeat_interval`).
298 ///
299 /// # Arguments
300 /// * `process_id` - Unique identifier for this process
301 /// * `priority` - Priority level (can be updated on each heartbeat)
302 /// * `current_time` - Current time for heartbeat timestamp
303 ///
304 /// # Errors
305 /// Returns `ProcessNotFound` if not registered
306 pub async fn heartbeat_candidate<T>(
307 &self,
308 txn: &T,
309 process_id: &str,
310 priority: i32,
311 current_time: Duration,
312 ) -> Result<()>
313 where
314 T: Deref<Target = Transaction>,
315 {
316 algorithm::heartbeat_candidate(txn, &self.subspace, process_id, priority, current_time)
317 .await
318 }
319
320 /// Unregister as candidate
321 ///
322 /// Removes this process from the candidate list.
323 /// If leader, call `resign_leadership` first.
324 pub async fn unregister_candidate<T>(&self, txn: &T, process_id: &str) -> Result<()>
325 where
326 T: Deref<Target = Transaction>,
327 {
328 algorithm::unregister_candidate(txn, &self.subspace, process_id).await
329 }
330
331 /// Get candidate info for a specific process
332 pub async fn get_candidate<T>(&self, txn: &T, process_id: &str) -> Result<Option<CandidateInfo>>
333 where
334 T: Deref<Target = Transaction>,
335 {
336 algorithm::get_candidate(txn, &self.subspace, process_id).await
337 }
338
339 /// List all alive candidates
340 ///
341 /// O(N) operation - use sparingly, mainly for monitoring.
342 pub async fn list_candidates<T>(
343 &self,
344 txn: &T,
345 current_time: Duration,
346 ) -> Result<Vec<CandidateInfo>>
347 where
348 T: Deref<Target = Transaction>,
349 {
350 algorithm::list_candidates(txn, &self.subspace, current_time).await
351 }
352
353 /// Remove dead candidates
354 ///
355 /// O(N) operation - should be called by leader periodically.
356 /// Returns count of evicted candidates.
357 pub async fn evict_dead_candidates<T>(&self, txn: &T, current_time: Duration) -> Result<usize>
358 where
359 T: Deref<Target = Transaction>,
360 {
361 algorithm::evict_dead_candidates(txn, &self.subspace, current_time).await
362 }
363
364 // ========================================================================
365 // LEADERSHIP OPERATIONS (O(1))
366 // ========================================================================
367
368 /// Try to claim leadership
369 ///
370 /// Attempts to become the leader. This is an O(1) operation that:
371 /// 1. Looks up candidate registration to get versionstamp
372 /// 2. Reads the current leader state
373 /// 3. Checks if we can claim (no leader, expired lease, or preemption)
374 /// 4. Writes new leader state with incremented ballot
375 ///
376 /// # Arguments
377 /// * `process_id` - Unique identifier for this process
378 /// * `priority` - Priority level for preemption decisions
379 /// * `current_time` - Current time for lease calculation
380 ///
381 /// # Returns
382 /// * `Ok(Some(state))` - Successfully claimed leadership
383 /// * `Ok(None)` - Cannot claim, another valid leader exists
384 /// * `Err(UnregisteredCandidate)` - Process is not registered as a candidate
385 pub async fn try_claim_leadership<T>(
386 &self,
387 txn: &T,
388 process_id: &str,
389 priority: i32,
390 current_time: Duration,
391 ) -> Result<Option<LeaderState>>
392 where
393 T: Deref<Target = Transaction>,
394 {
395 algorithm::try_claim_leadership(txn, &self.subspace, process_id, priority, current_time)
396 .await
397 }
398
399 /// Refresh leadership lease
400 ///
401 /// Called periodically by the leader to extend lease.
402 /// Fails if no longer the leader.
403 ///
404 /// # Returns
405 /// * `Ok(Some(state))` - Lease refreshed
406 /// * `Ok(None)` - No longer the leader
407 pub async fn refresh_lease<T>(
408 &self,
409 txn: &T,
410 process_id: &str,
411 current_time: Duration,
412 ) -> Result<Option<LeaderState>>
413 where
414 T: Deref<Target = Transaction>,
415 {
416 algorithm::refresh_lease(txn, &self.subspace, process_id, current_time).await
417 }
418
419 /// Voluntarily resign leadership
420 ///
421 /// Immediately releases leadership.
422 ///
423 /// # Returns
424 /// `true` if was leader and resigned, `false` otherwise
425 pub async fn resign_leadership<T>(&self, txn: &T, process_id: &str) -> Result<bool>
426 where
427 T: Deref<Target = Transaction>,
428 {
429 algorithm::resign_leadership(txn, &self.subspace, process_id).await
430 }
431
432 /// Check if this process is the current leader
433 ///
434 /// O(1) operation.
435 pub async fn is_leader<T>(
436 &self,
437 txn: &T,
438 process_id: &str,
439 current_time: Duration,
440 ) -> Result<bool>
441 where
442 T: Deref<Target = Transaction>,
443 {
444 algorithm::is_leader(txn, &self.subspace, process_id, current_time).await
445 }
446
447 /// Get current leader information
448 ///
449 /// O(1) operation. Returns None if no leader or lease expired.
450 pub async fn get_leader<T>(
451 &self,
452 txn: &T,
453 current_time: Duration,
454 ) -> Result<Option<LeaderState>>
455 where
456 T: Deref<Target = Transaction>,
457 {
458 algorithm::get_leader(txn, &self.subspace, current_time).await
459 }
460
461 /// Get current leader information without lease validation
462 ///
463 /// O(1) operation. Returns leader state regardless of whether lease has expired.
464 /// Useful for debugging, monitoring, and invariant checking.
465 pub async fn get_leader_raw<T>(&self, txn: &T) -> Result<Option<LeaderState>>
466 where
467 T: Deref<Target = Transaction>,
468 {
469 algorithm::get_leader_raw(txn, &self.subspace).await
470 }
471
472 // ========================================================================
473 // HIGH-LEVEL CONVENIENCE API
474 // ========================================================================
475
476 /// Run a complete election cycle
477 ///
478 /// Combines candidate heartbeat + leadership claim in one operation.
479 /// This is what most users should call in their main loop.
480 ///
481 /// # Arguments
482 /// * `process_id` - Unique identifier for this process
483 /// * `priority` - Priority level
484 /// * `current_time` - Current time
485 ///
486 /// # Returns
487 /// `ElectionResult::Leader` if this process is leader, `ElectionResult::Follower` otherwise
488 pub async fn run_election_cycle<T>(
489 &self,
490 txn: &T,
491 process_id: &str,
492 priority: i32,
493 current_time: Duration,
494 ) -> Result<ElectionResult>
495 where
496 T: Deref<Target = Transaction>,
497 {
498 algorithm::run_election_cycle(txn, &self.subspace, process_id, priority, current_time).await
499 }
500}