foundationdb/tuple_ext/hca.rs
1// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors
2// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors.
3//
4// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
5// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
6// http://opensource.org/licenses/MIT>, at your option. This file may not be
7// copied, modified, or distributed except according to those terms.
8
9//! The directory layer offers subspace indirection, where logical application subspaces are mapped to short, auto-generated key prefixes. This prefix assignment is done by the High Contention Allocator, which allows many clients to allocate short directory prefixes efficiently.
10//!
11//! The allocation process works over candidate value windows. It uses two subspaces to operate, the "counters" subspace and "recents" subspace (derived from the subspace used to create the HCA).
12//!
13//! "counters" contains a single key : "counters : window_start", whose value is the number of allocations in the current window. "window_start" is an integer that marks the lower bound of values that can be assigned from the current window.
//! "recents" can contain many keys : "recents : candidate", where each "candidate" is an integer that has been assigned to some client.
15//!
16//! Assignment has two stages that are executed in a loop until they both succeed.
17//!
//! 1. Find the current window. The client scans "counters : *" to get the current "window_start", increments that window's counter, and reads how many allocations have been made in it.
//! If the window is at least half full (window sizes are pre-defined and grow with "window_start"), the window is advanced: the "counters" and "recents" keys below the new "window_start + window_size" are cleared, and (1) is retried for the new window, whose counter is incremented in turn.
//! If the window still has space, it moves to (2).
21//!
22//! 2. Find a candidate value inside that window. The client picks a candidate number between "[window_start, window_start + window_size)" and tries to set the key "recents : candidate".
23//! If the write succeeds, the candidate is returned as the allocated value. Success!
24//! If the write fails because the window has been advanced, it repeats (1).
25//! If the write fails because the value was already set, it repeats (2).
26
27use std::fmt;
28use std::sync::{Mutex, PoisonError};
29
30use futures::future;
31use rand::{self, rngs::SmallRng, Rng, SeedableRng};
32
33use crate::options::{ConflictRangeType, MutationType, TransactionOption};
34use crate::tuple::{PackError, Subspace};
35use crate::*;
36
/// Operand of the atomic `Add` mutation that bumps a window's allocation
/// counter: the 8-byte little-endian encoding of `1i64`, matching the
/// `i64::from_le_bytes` decoding applied when the counter is read back.
const ONE_BYTES: &[u8] = &1i64.to_le_bytes();
38
/// Errors that can be returned by [`HighContentionAllocator::allocate`].
pub enum HcaError {
    /// An error reported by the FoundationDB client (a read, option or
    /// conflict-range call on the transaction failed).
    FdbError(FdbError),
    /// A tuple-layer error, e.g. a key under the counters subspace could not
    /// be unpacked as an `i64` window start.
    PackError(PackError),
    /// A window counter value was present but was not exactly 8 bytes long.
    InvalidDirectoryLayerMetadata,
    /// The allocator's internal `Mutex` was poisoned.
    PoisonError,
}
45
46impl fmt::Debug for HcaError {
47 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
48 match self {
49 HcaError::FdbError(err) => err.fmt(f),
50 HcaError::PackError(err) => err.fmt(f),
51 HcaError::InvalidDirectoryLayerMetadata => {
52 write!(f, "invalid directory layer metadata")
53 }
54 HcaError::PoisonError => write!(f, "mutex poisoned"),
55 }
56 }
57}
58
59impl From<FdbError> for HcaError {
60 fn from(err: FdbError) -> Self {
61 Self::FdbError(err)
62 }
63}
64impl From<PackError> for HcaError {
65 fn from(err: PackError) -> Self {
66 Self::PackError(err)
67 }
68}
69impl<T> From<PoisonError<T>> for HcaError {
70 fn from(_err: PoisonError<T>) -> Self {
71 Self::PoisonError
72 }
73}
74
75impl TransactError for HcaError {
76 fn try_into_fdb_error(self) -> Result<FdbError, Self> {
77 match self {
78 HcaError::FdbError(err) => Ok(err),
79 _ => Err(self),
80 }
81 }
82}
83
/// Represents a High Contention Allocator for a given subspace
#[derive(Debug)]
pub struct HighContentionAllocator {
    /// `subspace[0]`: keyed by window start; each value is that window's
    /// allocation counter (incremented with an atomic `Add`).
    counters: Subspace,
    /// `subspace[1]`: keyed by candidate integer; an (empty-valued) key marks
    /// a candidate that has been claimed.
    recent: Subspace,
    /// Held while `allocate` issues a group of operations on the transaction,
    /// so concurrent calls on this allocator do not interleave those groups.
    allocation_mutex: Mutex<()>,
}
91
92impl HighContentionAllocator {
93 /// Constructs an allocator that will use the input subspace for assigning values.
94 /// The given subspace should not be used by anything other than the allocator
95 pub fn new(subspace: Subspace) -> HighContentionAllocator {
96 HighContentionAllocator {
97 counters: subspace.subspace(&0i64),
98 recent: subspace.subspace(&1i64),
99 allocation_mutex: Mutex::new(()),
100 }
101 }
102
103 /// Returns a byte string that
104 /// 1) has never and will never be returned by another call to this method on the same subspace
105 /// 2) is nearly as short as possible given the above
106 pub async fn allocate(&self, trx: &Transaction) -> Result<i64, HcaError> {
107 let (begin, end) = self.counters.range();
108 let begin = KeySelector::first_greater_or_equal(begin);
109 let end = KeySelector::first_greater_than(end);
110 let counters_range = RangeOption {
111 begin,
112 end,
113 limit: Some(1),
114 reverse: true,
115 ..RangeOption::default()
116 };
117 let mut rng = SmallRng::from_rng(&mut rand::rng());
118
119 loop {
120 let kvs = trx.get_range(&counters_range, 1, true).await?;
121
122 let mut start: i64 = if let Some(first) = kvs.first() {
123 self.counters.unpack(first.key())?
124 } else {
125 0
126 };
127
128 let mut window_advanced = false;
129
130 let window = loop {
131 let counters_start = self.counters.subspace(&start);
132
133 let count_future = {
134 let _mutex_guard = self.allocation_mutex.lock()?;
135 if window_advanced {
136 trx.clear_range(self.counters.bytes(), counters_start.bytes());
137 trx.set_option(TransactionOption::NextWriteNoWriteConflictRange)?;
138 trx.clear_range(self.recent.bytes(), self.recent.subspace(&start).bytes());
139 };
140
141 // Increment the allocation count for the current window
142 trx.atomic_op(counters_start.bytes(), ONE_BYTES, MutationType::Add);
143 trx.get(counters_start.bytes(), true)
144 };
145
146 let count_value = count_future.await?;
147 let count = if let Some(count_value) = count_value {
148 if count_value.len() != 8 {
149 return Err(HcaError::InvalidDirectoryLayerMetadata);
150 }
151 let mut bytes = [0u8; 8];
152 bytes.copy_from_slice(&count_value);
153 i64::from_le_bytes(bytes)
154 } else {
155 0
156 };
157
158 let window = Self::window_size(start);
159 if count * 2 < window {
160 break window;
161 }
162
163 start += window;
164 window_advanced = true;
165 };
166
167 loop {
168 // As of the snapshot being read from, the window is less than half
169 // full, so this should be expected to take 2 tries. Under high
170 // contention (and when the window advances), there is an additional
171 // subsequent risk of conflict for this transaction.
172 let candidate: i64 = rng.random_range(start..start + window);
173 let recent_candidate = self.recent.subspace(&candidate);
174
175 let (latest_counter, candidate_value) = {
176 let _mutex_guard = self.allocation_mutex.lock()?;
177 let latest_counter = trx.get_range(&counters_range, 1, true);
178 let candidate_value = trx.get(recent_candidate.bytes(), false);
179 trx.set_option(TransactionOption::NextWriteNoWriteConflictRange)?;
180 trx.set(recent_candidate.bytes(), &[]);
181 (latest_counter, candidate_value)
182 };
183
184 let (latest_counter, candidate_value) =
185 future::try_join(latest_counter, candidate_value).await?;
186
187 let current_window_start: i64 = if let Some(first) = latest_counter.first() {
188 self.counters.unpack(first.key())?
189 } else {
190 0
191 };
192
193 if current_window_start > start {
194 break;
195 }
196
197 if candidate_value.is_none() {
198 let mut after = recent_candidate.bytes().to_vec();
199 after.push(0x00);
200 trx.add_conflict_range(
201 recent_candidate.bytes(),
202 &after,
203 ConflictRangeType::Write,
204 )?;
205 return Ok(candidate);
206 }
207 }
208 }
209 }
210
211 fn window_size(start: i64) -> i64 {
212 // Larger window sizes are better for high contention, smaller sizes for
213 // keeping the keys small. But if there are many allocations, the keys
214 // can't be too small. So start small and scale up. We don't want this to
215 // ever get *too* big because we have to store about window_size/2 recent
216 // items.
217 match start {
218 _ if start < 255 => 64,
219 _ if start < 65535 => 1024,
220 _ => 8192,
221 }
222 }
223}