foundationdb/tuple_ext/hca.rs

// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors
// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors.
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.

//! The directory layer offers subspace indirection, where logical application subspaces are mapped to short, auto-generated key prefixes. This prefix assignment is done by the High Contention Allocator, which allows many clients to allocate short directory prefixes efficiently.
//!
//! The allocation process works over candidate value windows. It uses two subspaces to operate, the "counters" subspace and the "recents" subspace (both derived from the subspace used to create the HCA).
//!
//! "counters" contains a single key, "counters : window_start", whose value is the number of allocations in the current window. "window_start" is an integer that marks the lower bound of values that can be assigned from the current window.
//! "recents" can contain many keys, "recents : candidate", where each "candidate" is an integer that has been assigned to some client.
//!
//! Assignment has two stages that are executed in a loop until they both succeed.
//!
//! 1. Find the current window. The client scans "counters : *" to get the current "window_start" and how many allocations have been made in the current window.
//!      If the window is more than half full (using pre-defined window sizes), the window is advanced: "counters : *" and "recents : *" are both cleared, and a new "counters : window_start + window_size" key is created with a value of 0. Then (1) is retried.
//!      If the window still has space, the client moves on to (2).
//!
//! 2. Find a candidate value inside that window. The client picks a candidate number in "[window_start, window_start + window_size)" and tries to set the key "recents : candidate".
//!      If the write succeeds, the candidate is returned as the allocated value. Success!
//!      If the write fails because the window has been advanced, the client repeats from (1).
//!      If the write fails because the value was already set, the client repeats (2).
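//!
//! As a rough sketch of the key layout (an illustration derived from the constructor below, not a format guarantee), an allocator built over a subspace `S` uses keys of the form:
//!
//! ```text
//! S : 0 : window_start  =>  8-byte little-endian allocation count   ("counters")
//! S : 1 : candidate     =>  '' (empty value)                        ("recents")
//! ```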

use std::fmt;
use std::sync::{Mutex, PoisonError};

use futures::future;
use rand::{self, rngs::SmallRng, Rng, SeedableRng};

use crate::options::{ConflictRangeType, MutationType, TransactionOption};
use crate::tuple::{PackError, Subspace};
use crate::*;

// 1 encoded as a little-endian 64-bit integer: the operand for the `Add` atomic
// operation that increments the current window's counter key.
const ONE_BYTES: &[u8] = &[0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00];

/// Error returned by the [`HighContentionAllocator`].
pub enum HcaError {
    FdbError(FdbError),
    PackError(PackError),
    InvalidDirectoryLayerMetadata,
    PoisonError,
}

impl fmt::Debug for HcaError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            HcaError::FdbError(err) => err.fmt(f),
            HcaError::PackError(err) => err.fmt(f),
            HcaError::InvalidDirectoryLayerMetadata => {
                write!(f, "invalid directory layer metadata")
            }
            HcaError::PoisonError => write!(f, "mutex poisoned"),
        }
    }
}

impl From<FdbError> for HcaError {
    fn from(err: FdbError) -> Self {
        Self::FdbError(err)
    }
}
impl From<PackError> for HcaError {
    fn from(err: PackError) -> Self {
        Self::PackError(err)
    }
}
impl<T> From<PoisonError<T>> for HcaError {
    fn from(_err: PoisonError<T>) -> Self {
        Self::PoisonError
    }
}

impl TransactError for HcaError {
    fn try_into_fdb_error(self) -> Result<FdbError, Self> {
        match self {
            HcaError::FdbError(err) => Ok(err),
            _ => Err(self),
        }
    }
}

/// Represents a High Contention Allocator for a given subspace.
#[derive(Debug)]
pub struct HighContentionAllocator {
    counters: Subspace,
    recent: Subspace,
    allocation_mutex: Mutex<()>,
}

impl HighContentionAllocator {
    /// Constructs an allocator that will use the input subspace for assigning values.
    /// The given subspace should not be used by anything other than the allocator.
    pub fn new(subspace: Subspace) -> HighContentionAllocator {
        HighContentionAllocator {
            counters: subspace.subspace(&0i64),
            recent: subspace.subspace(&1i64),
            allocation_mutex: Mutex::new(()),
        }
    }

    /// Returns an integer that
    ///   1) has never and will never be returned by another call to this method on the same subspace
    ///   2) is nearly as short as possible (in its packed form) given the above
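    ///
    /// # Example
    ///
    /// A minimal usage sketch (marked `ignore`): it assumes an already-opened `Database`
    /// named `db`, a subspace reserved exclusively for the allocator, and abbreviated
    /// error handling.
    ///
    /// ```ignore
    /// let hca = HighContentionAllocator::new(Subspace::from_bytes(b"hca"));
    /// let trx = db.create_trx()?;
    /// let prefix: i64 = hca.allocate(&trx).await?;
    /// // `prefix` is unique for this subspace and can be tuple-packed into a short key prefix.
    /// trx.commit().await?;
    /// ```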
    pub async fn allocate(&self, trx: &Transaction) -> Result<i64, HcaError> {
        // A reverse range read over "counters" limited to one result yields the key
        // of the most recently created (i.e. current) window.
        let (begin, end) = self.counters.range();
        let begin = KeySelector::first_greater_or_equal(begin);
        let end = KeySelector::first_greater_than(end);
        let counters_range = RangeOption {
            begin,
            end,
            limit: Some(1),
            reverse: true,
            ..RangeOption::default()
        };
        let mut rng = SmallRng::from_rng(&mut rand::rng());

        loop {
            // Stage 1: find the current window.
            let kvs = trx.get_range(&counters_range, 1, true).await?;

            let mut start: i64 = if let Some(first) = kvs.first() {
                self.counters.unpack(first.key())?
            } else {
                0
            };

            let mut window_advanced = false;

            let window = loop {
                let counters_start = self.counters.subspace(&start);

                let count_future = {
                    let _mutex_guard = self.allocation_mutex.lock()?;
                    if window_advanced {
                        // The window was advanced: clear the counters and "recents"
                        // entries left over from previous windows.
                        trx.clear_range(self.counters.bytes(), counters_start.bytes());
                        trx.set_option(TransactionOption::NextWriteNoWriteConflictRange)?;
                        trx.clear_range(self.recent.bytes(), self.recent.subspace(&start).bytes());
                    };

                    // Increment the allocation count for the current window
                    trx.atomic_op(counters_start.bytes(), ONE_BYTES, MutationType::Add);
                    trx.get(counters_start.bytes(), true)
                };

                let count_value = count_future.await?;
                let count = if let Some(count_value) = count_value {
                    if count_value.len() != 8 {
                        return Err(HcaError::InvalidDirectoryLayerMetadata);
                    }
                    let mut bytes = [0u8; 8];
                    bytes.copy_from_slice(&count_value);
                    i64::from_le_bytes(bytes)
                } else {
                    0
                };

                let window = Self::window_size(start);
                if count * 2 < window {
                    // The current window is less than half full: use it.
                    break window;
                }

                // The window is at least half full: advance to the next window and retry.
                start += window;
                window_advanced = true;
            };

            // Stage 2: pick a random candidate in the window and try to claim it.
            loop {
                // As of the snapshot being read from, the window is less than half
                // full, so this should be expected to take 2 tries.  Under high
                // contention (and when the window advances), there is an additional
                // subsequent risk of conflict for this transaction.
                let candidate: i64 = rng.random_range(start..start + window);
                let recent_candidate = self.recent.subspace(&candidate);

                let (latest_counter, candidate_value) = {
                    let _mutex_guard = self.allocation_mutex.lock()?;
                    let latest_counter = trx.get_range(&counters_range, 1, true);
                    let candidate_value = trx.get(recent_candidate.bytes(), false);
                    trx.set_option(TransactionOption::NextWriteNoWriteConflictRange)?;
                    trx.set(recent_candidate.bytes(), &[]);
                    (latest_counter, candidate_value)
                };

                let (latest_counter, candidate_value) =
                    future::try_join(latest_counter, candidate_value).await?;

                let current_window_start: i64 = if let Some(first) = latest_counter.first() {
                    self.counters.unpack(first.key())?
                } else {
                    0
                };

                // Another client advanced the window: start over from stage 1.
                if current_window_start > start {
                    break;
                }

                // The candidate was unclaimed: register a write-conflict range on just
                // that key and return it as the allocated value.
                if candidate_value.is_none() {
                    let mut after = recent_candidate.bytes().to_vec();
                    after.push(0x00);
                    trx.add_conflict_range(
                        recent_candidate.bytes(),
                        &after,
                        ConflictRangeType::Write,
                    )?;
                    return Ok(candidate);
                }
            }
        }
    }

    fn window_size(start: i64) -> i64 {
        // Larger window sizes are better for high contention, smaller sizes for
        // keeping the keys small.  But if there are many allocations, the keys
        // can't be too small.  So start small and scale up.  We don't want this to
        // ever get *too* big because we have to store about window_size/2 recent
        // items.
        match start {
            _ if start < 255 => 64,
            _ if start < 65535 => 1024,
            _ => 8192,
        }
    }
}