// Copyright 2024 foundationdb-rs developers
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

//! # Leader Election for FoundationDB
//!
//! A distributed leader election recipe that uses FoundationDB as the coordination backend.
//! Similar to [Apache Curator's LeaderLatch](https://curator.apache.org/) for ZooKeeper,
//! but leveraging FDB's serializable transactions for stronger guarantees.
//!
//! ## When to Use This
//!
//! **Good use cases:**
//! - Singleton services (only one instance should be active)
//! - Job schedulers (one coordinator assigns work)
//! - Primary/backup failover
//! - Exclusive access to external resources
//!
//! **Consider alternatives if:**
//! - You need mutex/lock semantics for short critical sections
//!   (use FoundationDB transactions directly)
//! - You need fair queuing (this recipe uses priority-based preemption)
//!
//! ## API Overview
//!
//! The main entry point is [`LeaderElection`](leader_election::LeaderElection). Typical usage follows this pattern:
//!
//! | Step | Method | Frequency |
//! |------|--------|-----------|
//! | 1. Setup | [`new`](leader_election::LeaderElection::new) | Once per process |
//! | 2. Initialize | [`initialize`](leader_election::LeaderElection::initialize) | Once globally (idempotent) |
//! | 3. Register | [`register_candidate`](leader_election::LeaderElection::register_candidate) | Once per process |
//! | 4. Election loop | [`run_election_cycle`](leader_election::LeaderElection::run_election_cycle) | Every heartbeat interval |
//! | 5. Shutdown | [`resign_leadership`](leader_election::LeaderElection::resign_leadership) + [`unregister_candidate`](leader_election::LeaderElection::unregister_candidate) | On graceful exit |
//!
//! For advanced use cases, lower-level methods are available:
//! - [`try_claim_leadership`](leader_election::LeaderElection::try_claim_leadership) - Attempt to become leader
//! - [`refresh_lease`](leader_election::LeaderElection::refresh_lease) - Extend leadership lease
//! - [`get_leader`](leader_election::LeaderElection::get_leader) - Query current leader
//! - [`is_leader`](leader_election::LeaderElection::is_leader) - Check if this process is leader
//!
//! ## Key Concepts
//!
//! ### Ballots
//! Ballot numbers work like Raft's term: a monotonically increasing counter
//! that establishes ordering. A higher ballot always wins. Each leadership claim
//! or lease refresh increments the ballot. This prevents split-brain scenarios
//! after network partitions heal.
//!
//! The ballot is returned in [`LeaderState::ballot`](leader_election::LeaderState::ballot) and can be used as a
//! fencing token when accessing external resources.
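//!
//! Below is a minimal sketch of fencing. It assumes the ballot is exposed as a `u64`.
//! The external resource remembers the highest ballot it has accepted and rejects any
//! request carrying a lower one. A deposed leader that still believes it holds the
//! lease therefore cannot overwrite newer work. `FencedStore` is hypothetical and is
//! not part of this crate.
//!
//! ```rust
//! /// Hypothetical external resource guarded by a fencing token (not part of this crate).
//! struct FencedStore {
//!     highest_ballot: u64, // assumption: ballots fit in a u64
//!     value: String,
//! }
//!
//! impl FencedStore {
//!     /// Accepts the write only if `ballot` is at least the highest ballot seen so far.
//!     fn write(&mut self, ballot: u64, value: &str) -> Result<(), String> {
//!         if ballot < self.highest_ballot {
//!             return Err(format!("stale ballot {} < {}", ballot, self.highest_ballot));
//!         }
//!         self.highest_ballot = ballot;
//!         self.value = value.to_string();
//!         Ok(())
//!     }
//! }
//!
//! fn main() {
//!     let mut store = FencedStore { highest_ballot: 0, value: String::new() };
//!     // A leader holding ballot 7 writes successfully.
//!     assert!(store.write(7, "from leader A").is_ok());
//!     // A newer leader with ballot 8 takes over and writes.
//!     assert!(store.write(8, "from leader B").is_ok());
//!     // The old leader (ballot 7) is now rejected, even if it still thinks it leads.
//!     assert!(store.write(7, "late write from A").is_err());
//!     assert_eq!(store.value, "from leader B");
//! }
//! ```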
//!
//! ### Leases
//! Leaders hold time-bounded leases configured via [`lease_duration`](leader_election::ElectionConfig::lease_duration).
//! A leader must call [`run_election_cycle`](leader_election::LeaderElection::run_election_cycle) (or [`refresh_lease`](leader_election::LeaderElection::refresh_lease))
//! before the lease expires to maintain leadership.
//!
//! If a leader fails to refresh its lease (for example, because it crashed or was
//! cut off by a network partition), other candidates can claim leadership once the lease expires.
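//!
//! The lease timing can be sketched with plain `std::time::Duration` arithmetic. The
//! sketch assumes a lease expires at the time it was granted or last refreshed plus
//! `lease_duration`, and that it is valid strictly before that instant. `Lease` is a
//! hypothetical type, not this crate's `LeaderState`.
//!
//! ```rust
//! use std::time::Duration;
//!
//! /// Hypothetical lease record (illustration only).
//! struct Lease {
//!     expires_at: Duration,
//! }
//!
//! impl Lease {
//!     /// Grants or refreshes a lease at `now` for `lease_duration`.
//!     fn grant(now: Duration, lease_duration: Duration) -> Self {
//!         Lease { expires_at: now + lease_duration }
//!     }
//!
//!     /// Assumption: the lease is valid strictly before `expires_at`.
//!     fn is_valid(&self, now: Duration) -> bool {
//!         now < self.expires_at
//!     }
//! }
//!
//! fn main() {
//!     let lease_duration = Duration::from_secs(10);
//!     // Claimed at t = 100s and refreshed at t = 103s, so it now expires at t = 113s.
//!     let lease = Lease::grant(Duration::from_secs(103), lease_duration);
//!     // If the leader crashes right after refreshing, the lease still holds at t = 112s...
//!     assert!(lease.is_valid(Duration::from_secs(112)));
//!     // ...and lapses at t = 113s, when other candidates may claim leadership.
//!     assert!(!lease.is_valid(Duration::from_secs(113)));
//! }
//! ```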
//!
//! ### Preemption
//! When [`allow_preemption`](leader_election::ElectionConfig::allow_preemption) is true, higher-priority candidates
//! can preempt lower-priority leaders. Priority is set via the `priority` parameter
//! of [`register_candidate`](leader_election::LeaderElection::register_candidate) and can be changed on later heartbeats.
//! This enables graceful leadership migration to new machines during rolling deployments
//! or infrastructure upgrades.
//!
//! ## Configuration
//!
//! Configure via [`ElectionConfig`](leader_election::ElectionConfig) passed to [`initialize_with_config`](leader_election::LeaderElection::initialize_with_config),
//! or update it later with [`write_config`](leader_election::LeaderElection::write_config):
//!
//! | Field | Default | Description |
//! |-------|---------|-------------|
//! | [`lease_duration`](leader_election::ElectionConfig::lease_duration) | 10s | How long leadership is valid without a refresh |
//! | [`heartbeat_interval`](leader_election::ElectionConfig::heartbeat_interval) | 3s | Recommended interval for calling `run_election_cycle` |
//! | [`candidate_timeout`](leader_election::ElectionConfig::candidate_timeout) | 15s | How long without a heartbeat before a candidate is considered dead |
//! | [`election_enabled`](leader_election::ElectionConfig::election_enabled) | true | Enable/disable elections globally |
//! | [`allow_preemption`](leader_election::ElectionConfig::allow_preemption) | true | Allow priority-based preemption |
//!
//! **Rule of thumb:** `heartbeat_interval` should be less than `lease_duration / 3`
//! so that a leader can miss a refresh and still retry before its lease expires.
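//!
//! The rule of thumb can be checked with `std::time::Duration` arithmetic.
//! `heartbeat_leaves_retry_margin` is a hypothetical helper for illustration only.
//! This crate does not provide it, and it does not read an actual `ElectionConfig`.
//!
//! ```rust
//! use std::time::Duration;
//!
//! /// Returns true if `heartbeat_interval < lease_duration / 3`.
//! fn heartbeat_leaves_retry_margin(heartbeat_interval: Duration, lease_duration: Duration) -> bool {
//!     heartbeat_interval < lease_duration / 3
//! }
//!
//! fn main() {
//!     // Defaults from the table: 3s < 10s / 3 (about 3.33s), so the rule holds.
//!     assert!(heartbeat_leaves_retry_margin(Duration::from_secs(3), Duration::from_secs(10)));
//!     // 5s with a 10s lease violates it: one failed refresh can let the lease lapse.
//!     assert!(!heartbeat_leaves_retry_margin(Duration::from_secs(5), Duration::from_secs(10)));
//! }
//! ```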
//!
//! ## Return Types
//!
//! - [`ElectionResult`](leader_election::ElectionResult) - Returned by [`run_election_cycle`](leader_election::LeaderElection::run_election_cycle); indicates
//!   whether this process is the leader or a follower
//! - [`LeaderState`](leader_election::LeaderState) - Information about a leader: process ID, ballot, lease expiry
//! - [`CandidateInfo`](leader_election::CandidateInfo) - Information about a registered candidate
//!
//! ## Safety Properties
//!
//! - **Mutual Exclusion**: At most one leader at any time (guaranteed by FDB serializable transactions)
//! - **Liveness**: Some correct process eventually becomes leader
//! - **Consistency**: Ballot numbers provide a total ordering of leadership changes
//!
//! ## Simulation Testing
//!
//! This implementation is validated through FoundationDB's deterministic simulation
//! framework under extreme conditions, including network partitions, process failures,
//! and clock skew of up to ±2 seconds.
//!
//! Key invariants verified:
//! - No overlapping leadership (mutual exclusion)
//! - Ballot monotonicity (ballots never regress)
//! - Fencing token validity (each claim increments the ballot)
//!
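//! The invariants above can be written as predicates over a recorded history of
//! leadership tenures. This is a hypothetical sketch, not the simulation crate's
//! workload code. `Tenure` and both checks are illustrative. The sketch assumes each
//! claim is recorded as a half-open interval `[start, end)` on one reference clock.
//!
//! ```rust
//! use std::time::Duration;
//!
//! /// Hypothetical record of one leadership tenure (one successful claim).
//! struct Tenure {
//!     process_id: &'static str,
//!     ballot: u64,
//!     start: Duration,
//!     end: Duration,
//! }
//!
//! /// Mutual exclusion: tenures held by different processes never overlap.
//! fn no_overlapping_leadership(tenures: &[Tenure]) -> bool {
//!     tenures.iter().enumerate().all(|(i, a)| {
//!         tenures[i + 1..].iter().all(|b| {
//!             a.process_id == b.process_id || a.end <= b.start || b.end <= a.start
//!         })
//!     })
//! }
//!
//! /// Ballot monotonicity: ordered by start time, each claim has a strictly higher ballot.
//! fn ballots_increase(tenures: &[Tenure]) -> bool {
//!     let mut ordered: Vec<&Tenure> = tenures.iter().collect();
//!     ordered.sort_by_key(|t| t.start);
//!     ordered.windows(2).all(|w| w[0].ballot < w[1].ballot)
//! }
//!
//! fn main() {
//!     let s = Duration::from_secs;
//!     let healthy = [
//!         Tenure { process_id: "a", ballot: 1, start: s(0), end: s(13) },
//!         Tenure { process_id: "b", ballot: 2, start: s(13), end: s(40) },
//!     ];
//!     assert!(no_overlapping_leadership(&healthy) && ballots_increase(&healthy));
//!
//!     // "b" starts leading while "a" still holds its lease: a split-brain history.
//!     let split_brain = [
//!         Tenure { process_id: "a", ballot: 1, start: s(0), end: s(13) },
//!         Tenure { process_id: "b", ballot: 2, start: s(12), end: s(40) },
//!     ];
//!     assert!(!no_overlapping_leadership(&split_brain));
//! }
//! ```
//!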
//! See the `foundationdb-recipes-simulation` crate for test configurations.

mod algorithm;
mod errors;
mod keys;
mod types;

pub use errors::{LeaderElectionError, Result};
pub use types::{CandidateInfo, ElectionConfig, ElectionResult, LeaderState};

use crate::{tuple::Subspace, Transaction};
use std::ops::Deref;
use std::time::Duration;

/// Coordinator for distributed leader election.
///
/// `LeaderElection` provides the interface for participating in leader elections
/// backed by FoundationDB. Multiple independent elections can coexist by using
/// different [`Subspace`]s.
///
/// # Lifecycle
///
/// ```text
/// ┌─────────────────────────────────────────────────────────────────┐
/// │  1. new()              Create instance with subspace            │
/// │  2. initialize()       Set up election (once globally)          │
/// │  3. register_candidate() Join as candidate (once per process)   │
/// │  4. run_election_cycle()  Main loop (every heartbeat_interval)  │
/// │  5. resign_leadership() + unregister_candidate()  Cleanup       │
/// └─────────────────────────────────────────────────────────────────┘
/// ```
///
/// # Methods by Category
///
/// ## Setup
/// - [`new`](Self::new) - Create election instance
/// - [`initialize`](Self::initialize) - Initialize with defaults
/// - [`initialize_with_config`](Self::initialize_with_config) - Initialize with custom [`ElectionConfig`]
///
/// ## Candidate Management
/// - [`register_candidate`](Self::register_candidate) - Join the election
/// - [`heartbeat_candidate`](Self::heartbeat_candidate) - Send liveness heartbeat
/// - [`unregister_candidate`](Self::unregister_candidate) - Leave the election
/// - [`get_candidate`](Self::get_candidate) - Query candidate info
/// - [`list_candidates`](Self::list_candidates) - List all alive candidates (O(N))
/// - [`evict_dead_candidates`](Self::evict_dead_candidates) - Remove timed-out candidates (O(N))
///
/// ## Leadership Operations (O(1))
/// - [`try_claim_leadership`](Self::try_claim_leadership) - Attempt to become leader
/// - [`refresh_lease`](Self::refresh_lease) - Extend leadership lease
/// - [`resign_leadership`](Self::resign_leadership) - Voluntarily step down
/// - [`is_leader`](Self::is_leader) - Check if this process is leader
/// - [`get_leader`](Self::get_leader) - Get current leader (lease-validated)
/// - [`get_leader_raw`](Self::get_leader_raw) - Get current leader (no lease check)
///
/// ## High-Level API
/// - [`run_election_cycle`](Self::run_election_cycle) - Combined heartbeat + claim (recommended for main loop)
///
/// ## Configuration
/// - [`read_config`](Self::read_config) - Read current configuration
/// - [`write_config`](Self::write_config) - Update configuration dynamically
///
/// # Thread Safety
///
/// `LeaderElection` is [`Clone`], [`Send`], and [`Sync`]. It holds only a
/// [`Subspace`] and can be safely shared across tasks.
/// Each method operates within the provided transaction's scope.
///
/// # Related Types
///
/// - [`ElectionConfig`] - Configuration parameters
/// - [`ElectionResult`] - Return type from [`run_election_cycle`](Self::run_election_cycle)
/// - [`LeaderState`] - Leader information including ballot (fencing token)
/// - [`CandidateInfo`] - Registered candidate information
/// - [`LeaderElectionError`] - Error types for this module
///
/// # See Also
///
/// See the [module documentation](self) for algorithm details, key concepts
/// (ballots, leases, preemption), and configuration guidelines.
#[derive(Clone)]
pub struct LeaderElection {
    subspace: Subspace,
}

impl LeaderElection {
    /// Create a new leader election instance with the given subspace.
    ///
    /// The subspace isolates this election from others in the database.
    /// All election data will be stored under this subspace prefix.
    ///
    /// # Arguments
    /// * `subspace` - The FoundationDB subspace to use for storing election data
    pub fn new(subspace: Subspace) -> Self {
        Self { subspace }
    }

    // ========================================================================
    // INITIALIZATION
    // ========================================================================

    /// Initialize the leader election system with default settings.
    ///
    /// This must be called once before any processes can participate in the election.
    /// Sets up the necessary configuration with sensible defaults:
    /// - 10 second lease duration
    /// - 3 second heartbeat interval
    /// - 15 second candidate timeout
    /// - Elections enabled
    /// - Preemption enabled
    ///
    /// This operation is idempotent: calling it more than once has no further effect.
    pub async fn initialize<T>(&self, txn: &T) -> Result<()>
    where
        T: Deref<Target = Transaction>,
    {
        algorithm::initialize(txn, &self.subspace, ElectionConfig::default()).await
    }

    /// Initialize the leader election system with a custom configuration.
    ///
    /// Use this instead of [`initialize`](Self::initialize) to fine-tune election
    /// parameters for specific use cases. Like `initialize`, it must be called once
    /// before any processes can participate.
    ///
    /// # Arguments
    /// * `config` - Custom election configuration
    pub async fn initialize_with_config<T>(&self, txn: &T, config: ElectionConfig) -> Result<()>
    where
        T: Deref<Target = Transaction>,
    {
        algorithm::initialize(txn, &self.subspace, config).await
    }

    /// Write the election configuration.
    ///
    /// Updates the global election parameters dynamically.
    ///
    /// # Warning
    /// Changing configuration during active elections may cause temporary
    /// leadership instability.
    pub async fn write_config<T>(&self, txn: &T, config: &ElectionConfig) -> Result<()>
    where
        T: Deref<Target = Transaction>,
    {
        algorithm::write_config(txn, &self.subspace, config).await
    }

    /// Read the current election configuration.
    pub async fn read_config<T>(&self, txn: &T) -> Result<ElectionConfig>
    where
        T: Deref<Target = Transaction>,
    {
        algorithm::read_config(txn, &self.subspace).await
    }

    // ========================================================================
    // CANDIDATE MANAGEMENT
    // ========================================================================

    /// Register as a candidate.
    ///
    /// Registers this process as a candidate for leadership. Uses a
    /// `SetVersionstampedValue` mutation to assign a unique versionstamp at
    /// registration time.
    ///
    /// # Arguments
    /// * `process_id` - Unique identifier for this process
    /// * `priority` - Priority level (higher = more preferred for leadership)
    /// * `current_time` - Current time for heartbeat timestamp
    ///
    /// # Note
    /// The versionstamp is assigned once at registration and preserved on heartbeats.
    /// FoundationDB fills in the versionstamp only when the transaction commits, so
    /// read the candidate info (for example with [`get_candidate`](Self::get_candidate))
    /// in a later transaction to obtain the actual value.
    pub async fn register_candidate<T>(
        &self,
        txn: &T,
        process_id: &str,
        priority: i32,
        current_time: Duration,
    ) -> Result<()>
    where
        T: Deref<Target = Transaction>,
    {
        algorithm::register_candidate(txn, &self.subspace, process_id, priority, current_time).await
    }

    /// Send a heartbeat as a candidate.
    ///
    /// Updates the candidate's timestamp to indicate liveness.
    /// This should be called periodically (every `heartbeat_interval`).
    ///
    /// # Arguments
    /// * `process_id` - Unique identifier for this process
    /// * `priority` - Priority level (can be updated on each heartbeat)
    /// * `current_time` - Current time for heartbeat timestamp
    ///
    /// # Errors
    /// Returns `ProcessNotFound` if this process is not registered.
    pub async fn heartbeat_candidate<T>(
        &self,
        txn: &T,
        process_id: &str,
        priority: i32,
        current_time: Duration,
    ) -> Result<()>
    where
        T: Deref<Target = Transaction>,
    {
        algorithm::heartbeat_candidate(txn, &self.subspace, process_id, priority, current_time)
            .await
    }

    /// Unregister as a candidate.
    ///
    /// Removes this process from the candidate list.
    /// If this process is the leader, call [`resign_leadership`](Self::resign_leadership) first.
    pub async fn unregister_candidate<T>(&self, txn: &T, process_id: &str) -> Result<()>
    where
        T: Deref<Target = Transaction>,
    {
        algorithm::unregister_candidate(txn, &self.subspace, process_id).await
    }

    /// Get candidate info for a specific process.
    pub async fn get_candidate<T>(&self, txn: &T, process_id: &str) -> Result<Option<CandidateInfo>>
    where
        T: Deref<Target = Transaction>,
    {
        algorithm::get_candidate(txn, &self.subspace, process_id).await
    }

    /// List all alive candidates.
    ///
    /// O(N) in the number of registered candidates; use sparingly, mainly for monitoring.
    pub async fn list_candidates<T>(
        &self,
        txn: &T,
        current_time: Duration,
    ) -> Result<Vec<CandidateInfo>>
    where
        T: Deref<Target = Transaction>,
    {
        algorithm::list_candidates(txn, &self.subspace, current_time).await
    }

    /// Remove dead candidates.
    ///
    /// O(N) in the number of registered candidates; should be called periodically by the leader.
    /// Returns the number of evicted candidates.
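    ///
    /// Below is a minimal in-memory sketch of the eviction rule. It assumes a candidate
    /// counts as dead once its last heartbeat is more than `candidate_timeout` older
    /// than `current_time`. `Candidate` and `evict_dead` are hypothetical stand-ins
    /// that do not touch FoundationDB. They are not this crate's `CandidateInfo` or
    /// internal code.
    ///
    /// ```rust
    /// use std::time::Duration;
    ///
    /// /// Hypothetical candidate record (illustration only).
    /// struct Candidate {
    ///     process_id: String,
    ///     last_heartbeat: Duration,
    /// }
    ///
    /// /// Removes candidates whose last heartbeat is older than `candidate_timeout`
    /// /// and returns how many were removed.
    /// fn evict_dead(candidates: &mut Vec<Candidate>, now: Duration, candidate_timeout: Duration) -> usize {
    ///     let before = candidates.len();
    ///     // `saturating_sub` avoids a panic if a heartbeat is stamped slightly in the future.
    ///     candidates.retain(|c| now.saturating_sub(c.last_heartbeat) <= candidate_timeout);
    ///     before - candidates.len()
    /// }
    ///
    /// fn main() {
    ///     let mut candidates = vec![
    ///         Candidate { process_id: "fresh".into(), last_heartbeat: Duration::from_secs(95) },
    ///         Candidate { process_id: "stale".into(), last_heartbeat: Duration::from_secs(80) },
    ///     ];
    ///     // With the default 15s timeout at t = 100s, only "stale" (20s old) is evicted.
    ///     let evicted = evict_dead(&mut candidates, Duration::from_secs(100), Duration::from_secs(15));
    ///     assert_eq!(evicted, 1);
    ///     assert_eq!(candidates[0].process_id, "fresh");
    /// }
    /// ```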
    pub async fn evict_dead_candidates<T>(&self, txn: &T, current_time: Duration) -> Result<usize>
    where
        T: Deref<Target = Transaction>,
    {
        algorithm::evict_dead_candidates(txn, &self.subspace, current_time).await
    }

    // ========================================================================
    // LEADERSHIP OPERATIONS (O(1))
    // ========================================================================

    /// Try to claim leadership.
    ///
    /// Attempts to become the leader. This is an O(1) operation that:
    /// 1. Looks up the candidate registration to get its versionstamp
    /// 2. Reads the current leader state
    /// 3. Checks whether it can claim (no leader, expired lease, or preemption)
    /// 4. Writes the new leader state with an incremented ballot
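    ///
    /// Steps 3 and 4 can be sketched as a pure function. This is a hypothetical
    /// illustration, not the code in this crate. `CurrentLeader` and `claim_ballot`
    /// exist only for the example. The sketch makes three assumptions: the stored
    /// leader state records the leader's priority, a lease counts as expired once
    /// `current_time` reaches its expiry, and preemption requires a strictly higher
    /// priority.
    ///
    /// ```rust
    /// /// Hypothetical snapshot of the stored leader (not this crate's `LeaderState`).
    /// struct CurrentLeader {
    ///     priority: i32,
    ///     lease_expires_at_secs: u64,
    /// }
    ///
    /// /// Returns the ballot to write if the claim is allowed, or `None` if a valid
    /// /// leader must be left in place. `last_ballot` is the highest ballot written so
    /// /// far, so a successful claim always produces a strictly higher one.
    /// fn claim_ballot(
    ///     current: Option<&CurrentLeader>,
    ///     last_ballot: u64,
    ///     priority: i32,
    ///     now_secs: u64,
    ///     allow_preemption: bool,
    /// ) -> Option<u64> {
    ///     let can_claim = match current {
    ///         // No leader recorded.
    ///         None => true,
    ///         // The leader's lease has expired.
    ///         Some(l) if now_secs >= l.lease_expires_at_secs => true,
    ///         // Valid lease: only a strictly higher priority may preempt, if enabled.
    ///         Some(l) => allow_preemption && priority > l.priority,
    ///     };
    ///     can_claim.then(|| last_ballot + 1)
    /// }
    ///
    /// fn main() {
    ///     let leader = CurrentLeader { priority: 5, lease_expires_at_secs: 110 };
    ///     // Valid lease and equal priority: the claim is refused.
    ///     assert_eq!(claim_ballot(Some(&leader), 7, 5, 100, true), None);
    ///     // A higher priority preempts when preemption is allowed...
    ///     assert_eq!(claim_ballot(Some(&leader), 7, 9, 100, true), Some(8));
    ///     // ...but not when it is disabled.
    ///     assert_eq!(claim_ballot(Some(&leader), 7, 9, 100, false), None);
    ///     // Once the lease has expired, any candidate may claim.
    ///     assert_eq!(claim_ballot(Some(&leader), 7, 1, 110, false), Some(8));
    /// }
    /// ```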
    ///
    /// # Arguments
    /// * `process_id` - Unique identifier for this process
    /// * `priority` - Priority level for preemption decisions
    /// * `current_time` - Current time for lease calculation
    ///
    /// # Returns
    /// * `Ok(Some(state))` - Successfully claimed leadership
    /// * `Ok(None)` - Cannot claim; another valid leader exists
    /// * `Err(UnregisteredCandidate)` - Process is not registered as a candidate
    pub async fn try_claim_leadership<T>(
        &self,
        txn: &T,
        process_id: &str,
        priority: i32,
        current_time: Duration,
    ) -> Result<Option<LeaderState>>
    where
        T: Deref<Target = Transaction>,
    {
        algorithm::try_claim_leadership(txn, &self.subspace, process_id, priority, current_time)
            .await
    }

    /// Refresh the leadership lease.
    ///
    /// Called periodically by the leader to extend its lease. If this process is
    /// no longer the leader, the lease is not refreshed and `Ok(None)` is returned.
    ///
    /// # Returns
    /// * `Ok(Some(state))` - Lease refreshed
    /// * `Ok(None)` - No longer the leader
    pub async fn refresh_lease<T>(
        &self,
        txn: &T,
        process_id: &str,
        current_time: Duration,
    ) -> Result<Option<LeaderState>>
    where
        T: Deref<Target = Transaction>,
    {
        algorithm::refresh_lease(txn, &self.subspace, process_id, current_time).await
    }

    /// Voluntarily resign leadership.
    ///
    /// Releases leadership immediately (when the transaction commits) rather than
    /// waiting for the lease to expire.
    ///
    /// # Returns
    /// `true` if this process was the leader and resigned, `false` otherwise.
    pub async fn resign_leadership<T>(&self, txn: &T, process_id: &str) -> Result<bool>
    where
        T: Deref<Target = Transaction>,
    {
        algorithm::resign_leadership(txn, &self.subspace, process_id).await
    }

    /// Check if this process is the current leader.
    ///
    /// O(1) operation.
    pub async fn is_leader<T>(
        &self,
        txn: &T,
        process_id: &str,
        current_time: Duration,
    ) -> Result<bool>
    where
        T: Deref<Target = Transaction>,
    {
        algorithm::is_leader(txn, &self.subspace, process_id, current_time).await
    }

    /// Get current leader information.
    ///
    /// O(1) operation. Returns `None` if there is no leader or the leader's lease has expired.
    pub async fn get_leader<T>(
        &self,
        txn: &T,
        current_time: Duration,
    ) -> Result<Option<LeaderState>>
    where
        T: Deref<Target = Transaction>,
    {
        algorithm::get_leader(txn, &self.subspace, current_time).await
    }

    /// Get current leader information without lease validation.
    ///
    /// O(1) operation. Returns the leader state regardless of whether its lease has expired.
    /// Useful for debugging, monitoring, and invariant checking.
    pub async fn get_leader_raw<T>(&self, txn: &T) -> Result<Option<LeaderState>>
    where
        T: Deref<Target = Transaction>,
    {
        algorithm::get_leader_raw(txn, &self.subspace).await
    }

    // ========================================================================
    // HIGH-LEVEL CONVENIENCE API
    // ========================================================================

    /// Run a complete election cycle.
    ///
    /// Combines the candidate heartbeat and the leadership claim in one operation.
    /// While this process is the leader, calling it also keeps the lease fresh.
    /// This is what most users should call in their main loop.
    ///
    /// # Arguments
    /// * `process_id` - Unique identifier for this process
    /// * `priority` - Priority level
    /// * `current_time` - Current time
    ///
    /// # Returns
    /// `ElectionResult::Leader` if this process is the leader, `ElectionResult::Follower` otherwise
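    ///
    /// The sketch below shows the calling side only, under stated assumptions. It
    /// assumes `current_time` is wall-clock time since the Unix epoch, so that all
    /// processes share one time base. The module documentation mentions testing under
    /// clock skew, which fits that reading. The real transaction is replaced with a
    /// hypothetical `cycle` callback so the example runs without a cluster. In an async
    /// application the pause would use an async timer rather than `std::thread::sleep`.
    ///
    /// ```rust
    /// use std::time::{Duration, SystemTime, UNIX_EPOCH};
    ///
    /// /// Hypothetical helper: wall-clock time since the Unix epoch.
    /// fn current_time() -> Duration {
    ///     SystemTime::now()
    ///         .duration_since(UNIX_EPOCH)
    ///         .expect("system clock is set before the Unix epoch")
    /// }
    ///
    /// /// Hypothetical driver: invokes `cycle` with the current time once per
    /// /// `heartbeat_interval`, `rounds` times. Real code would call
    /// /// `run_election_cycle` inside a transaction at this point.
    /// fn drive<F: FnMut(Duration)>(heartbeat_interval: Duration, rounds: usize, mut cycle: F) {
    ///     for _ in 0..rounds {
    ///         cycle(current_time());
    ///         std::thread::sleep(heartbeat_interval);
    ///     }
    /// }
    ///
    /// fn main() {
    ///     let mut timestamps = Vec::new();
    ///     drive(Duration::from_millis(10), 3, |now| timestamps.push(now));
    ///     assert_eq!(timestamps.len(), 3);
    /// }
    /// ```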
    pub async fn run_election_cycle<T>(
        &self,
        txn: &T,
        process_id: &str,
        priority: i32,
        current_time: Duration,
    ) -> Result<ElectionResult>
    where
        T: Deref<Target = Transaction>,
    {
        algorithm::run_election_cycle(txn, &self.subspace, process_id, priority, current_time).await
    }
}