foundationdb/
tenant.rs

1//! Implementation of the Tenants API. Experimental features exposed through the `tenant-experimental` feature.
2//!
3//! Please refers to the official [documentation](https://apple.github.io/foundationdb/tenants.html)
4//!
5//! ## Warning
6//!
7//! Tenants are currently experimental and are not recommended for use in production. Please note that
8//! currently, we [cannot upgrade a cluster with tenants enabled](https://forums.foundationdb.org/t/tenant-feature-metadata-changes-in-7-2-release/3459).
9
10use crate::options::TransactionOption;
11use std::future::Future;
12
13use crate::database::TransactError;
14use crate::database::{run_with_hooks, NoopHooks};
15use crate::{
16    error, Database, DatabaseTransact, FdbBindingError, FdbError, FdbResult, KeySelector,
17    RangeOption, RetryableTransaction, TransactOption, Transaction,
18};
19use foundationdb_macros::cfg_api_versions;
20use foundationdb_sys as fdb_sys;
21use fdb_sys::if_cfg_api_versions;
22use futures::TryStreamExt;
23use serde::{Deserialize, Serialize};
24use serde_json::Error;
25use std::ptr::NonNull;
26use std::sync::atomic::{AtomicBool, Ordering};
27use std::time::Instant;
28
29#[cfg(feature = "fdb-7_1")]
30const TENANT_MAP_PREFIX: &[u8] = b"\xFF\xFF/management/tenant_map/";
31#[cfg(feature = "fdb-7_1")]
32const TENANT_MAP_PREFIX_END: &[u8] = b"\xFF\xFF/management/tenant_map0";
33
34#[cfg_api_versions(min = 730)]
35const TENANT_MAP_PREFIX: &[u8] = b"\xFF\xFF/management/tenant/map/";
36#[cfg_api_versions(min = 730)]
37const TENANT_MAP_PREFIX_END: &[u8] = b"\xFF\xFF/management/tenant/map0";
38
39/// A `FdbTenant` represents a named key-space within a database that can be interacted with transactionally.
40pub struct FdbTenant {
41    pub(crate) inner: NonNull<fdb_sys::FDBTenant>,
42    pub(crate) name: Vec<u8>,
43}
44
45unsafe impl Send for FdbTenant {}
46unsafe impl Sync for FdbTenant {}
47
48impl Drop for FdbTenant {
49    fn drop(&mut self) {
50        unsafe {
51            fdb_sys::fdb_tenant_destroy(self.inner.as_ptr());
52        }
53    }
54}
55
56impl FdbTenant {
57    /// Returns the name of this [`FdbTenant`].
58    pub fn get_name(&self) -> &[u8] {
59        &self.name
60    }
61
62    /// Creates a new transaction on the given database and tenant.
63    pub fn create_trx(&self) -> FdbResult<Transaction> {
64        let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
65        let err = unsafe { fdb_sys::fdb_tenant_create_transaction(self.inner.as_ptr(), &mut trx) };
66        error::eval(err)?;
67        Ok(Transaction::new(NonNull::new(trx).expect(
68            "fdb_tenant_create_transaction to not return null if there is no error",
69        )))
70    }
71
72    fn create_retryable_trx(&self) -> FdbResult<RetryableTransaction> {
73        Ok(RetryableTransaction::new(self.create_trx()?))
74    }
75
76    /// Runs a transactional function against this Tenant with retry logic.
77    /// The associated closure will be called until a non-retryable FDBError
78    /// is thrown or commit(), returns success.
79    ///
80    /// Users are **not** expected to keep reference to the `RetryableTransaction`. If a weak or strong
81    /// reference is kept by the user, the binding will throw an error.
82    ///
83    /// # Warning: retry
84    ///
85    /// It might retry indefinitely if the transaction is highly contentious. It is recommended to
86    /// set the [crate::options::TransactionOption::RetryLimit] or [crate::options::TransactionOption::RetryLimit] on the transaction
87    /// if the task need to be guaranteed to finish. These options can be safely set on every iteration of the closure.
88    ///
89    /// # Warning: Maybe committed transactions
90    ///
91    /// As with other client/server databases, in some failure scenarios a client may be unable to determine
92    /// whether a transaction succeeded. You should make sure your closure is idempotent.
93    ///
94    /// The closure will notify the user in case of a maybe_committed transaction in a previous run
95    ///  with the boolean provided in the closure.
96    ///
97    pub async fn run<F, Fut, T>(&self, closure: F) -> Result<T, FdbBindingError>
98    where
99        F: Fn(RetryableTransaction, bool) -> Fut,
100        Fut: Future<Output = Result<T, FdbBindingError>>,
101    {
102        let transaction = self.create_retryable_trx()?;
103        run_with_hooks(transaction, &NoopHooks, |trx, mc| {
104            closure(trx, mc.into())
105        })
106        .await
107    }
108
109    /// `transact` returns a future which retries on error. It tries to resolve a future created by
110    /// caller-provided function `f` inside a retry loop, providing it with a newly created
111    /// transaction. After caller-provided future resolves, the transaction will be committed
112    /// automatically.
113    ///
114    /// # Warning
115    ///
116    /// It might retry indefinitely if the transaction is highly contentious. It is recommended to
117    /// set `TransactionOption::RetryLimit` or `TransactionOption::SetTimeout` on the transaction
118    /// if the task need to be guaranteed to finish.
119    ///
120    /// Once [Generic Associated Types](https://github.com/rust-lang/rfcs/blob/master/text/1598-generic_associated_types.md)
121    /// lands in stable rust, the returned future of f won't need to be boxed anymore, also the
122    /// lifetime limitations around f might be lowered.
123    pub async fn transact<F>(&self, mut f: F, options: TransactOption) -> Result<F::Item, F::Error>
124    where
125        F: DatabaseTransact,
126    {
127        let is_idempotent = options.is_idempotent;
128        let time_out = options.time_out.map(|d| Instant::now() + d);
129        let retry_limit = options.retry_limit;
130        let mut tries: u32 = 0;
131        let mut trx = self.create_trx()?;
132        let mut can_retry = move || {
133            tries += 1;
134            retry_limit.map(|limit| tries < limit).unwrap_or(true)
135                && time_out.map(|t| Instant::now() < t).unwrap_or(true)
136        };
137        loop {
138            let r = f.transact(trx).await;
139            f = r.0;
140            trx = r.1;
141            trx = match r.2 {
142                Ok(item) => match trx.commit().await {
143                    Ok(_) => break Ok(item),
144                    Err(e) => {
145                        if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
146                            e.on_error().await?
147                        } else {
148                            break Err(F::Error::from(e.into()));
149                        }
150                    }
151                },
152                Err(user_err) => match user_err.try_into_fdb_error() {
153                    Ok(e) => {
154                        if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
155                            trx.on_error(e).await?
156                        } else {
157                            break Err(F::Error::from(e));
158                        }
159                    }
160                    Err(user_err) => break Err(user_err),
161                },
162            };
163        }
164    }
165}
166
167#[cfg(feature = "fdb-7_1")]
168/// Holds the information about a tenant
169#[derive(Serialize, Deserialize, Debug)]
170pub struct TenantInfo {
171    pub id: i64,
172    pub prefix: Vec<u8>,
173    pub name: Vec<u8>,
174}
175
176#[cfg_api_versions(min = 730)]
177/// Holds the information about a tenant
178#[derive(Serialize, Deserialize, Debug)]
179pub struct TenantInfo {
180    pub id: i64,
181    pub prefix: FDBTenantPrintableInfo,
182    pub name: FDBTenantPrintableInfo,
183}
184
185impl TryFrom<(&[u8], &[u8])> for TenantInfo {
186    type Error = Error;
187
188    fn try_from(k_v: (&[u8], &[u8])) -> Result<Self, Self::Error> {
189        let value = k_v.1;
190        match serde_json::from_slice::<FDBTenantInfo>(value) {
191            Ok(tenant_info) => if_cfg_api_versions!(min = 730 => {
192                Ok(TenantInfo {
193                    name: tenant_info.name,
194                    id: tenant_info.id,
195                    prefix: tenant_info.prefix,
196                })
197            } else {
198                Ok(TenantInfo {
199                    name: k_v.0.split_at(TENANT_MAP_PREFIX.len()).1.to_vec(),
200                    id: tenant_info.id,
201                    prefix: tenant_info.prefix,
202                })
203            }),
204
205            Err(err) => Err(err),
206        }
207    }
208}
209
210/// Holds the information about a tenant. This is the struct that is stored in FDB
211#[cfg(feature = "fdb-7_1")]
212#[derive(Serialize, Deserialize, Debug)]
213struct FDBTenantInfo {
214    id: i64,
215    #[serde(with = "serde_bytes")]
216    prefix: Vec<u8>,
217}
218
219#[cfg_api_versions(min = 730)]
220#[derive(Serialize, Deserialize, Debug)]
221struct FDBTenantInfo {
222    id: i64,
223    lock_state: TenantLockState,
224    name: FDBTenantPrintableInfo,
225    prefix: FDBTenantPrintableInfo,
226}
227
228#[cfg_api_versions(min = 730)]
229#[derive(Serialize, Deserialize, Debug)]
230#[serde(rename_all = "camelCase")]
231enum TenantLockState {
232    Unlocked,
233    Locked,
234    ReadOnly,
235}
236
237#[cfg_api_versions(min = 730)]
238/// Display a printable version of bytes
239#[derive(Serialize, Deserialize, Debug)]
240pub struct FDBTenantPrintableInfo {
241    base64: String,
242    printable: String,
243}
244
245/// The FoundationDB API includes function to manage the set of tenants in a cluster.
246// This is a port from https://github.com/apple/foundationdb/blob/87ee0a2963f615079b3f50afa332acd0ead5f1fe/bindings/java/src/main/com/apple/foundationdb/TenantManagement.java
247#[derive(Debug)]
248pub struct TenantManagement;
249
250impl TenantManagement {
251    /// Creates a new tenant in the cluster using a transaction created on the specified Database.
252    /// his operation will first check whether the tenant exists, and if it does it will set the Result
253    /// to a `tenant_already_exists` error. Otherwise, it will attempt to create the tenant in a retry loop.
254    /// If the tenant is created concurrently by another transaction, this function may still return successfully.
255    pub async fn create_tenant(db: &Database, tenant_name: &[u8]) -> Result<(), FdbError> {
256        let checked_existence = AtomicBool::new(false);
257        let checked_existence_ref = &checked_existence;
258
259        let mut key: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + tenant_name.len());
260        key.extend_from_slice(TENANT_MAP_PREFIX);
261        key.extend_from_slice(tenant_name);
262
263        let key_ref = &key;
264
265        db.run(|trx, _maybe_committed| async move {
266            trx.set_option(TransactionOption::SpecialKeySpaceEnableWrites)?;
267
268            if checked_existence_ref.load(Ordering::SeqCst) {
269                trx.set(key_ref, &[]);
270                Ok(())
271            } else {
272                let maybe_key = trx.get(key_ref, false).await?;
273
274                checked_existence_ref.store(true, Ordering::SeqCst);
275
276                match maybe_key {
277                    None => {
278                        trx.set(key_ref, &[]);
279                        Ok(())
280                    }
281                    Some(_) => {
282                        // `tenant_already_exists` error
283                        Err(FdbBindingError::from(FdbError::new(2132)))
284                    }
285                }
286            }
287        })
288        .await
289        // error can only be an fdb_error
290        .map_err(|e| e.get_fdb_error().unwrap())
291    }
292
293    /// Get a tenant in the cluster using a transaction created on the specified Database.
294    pub async fn get_tenant(
295        db: &Database,
296        tenant_name: &[u8],
297    ) -> Result<Option<Result<TenantInfo, serde_json::Error>>, FdbError> {
298        let mut key: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + tenant_name.len());
299        key.extend_from_slice(TENANT_MAP_PREFIX);
300        key.extend_from_slice(tenant_name);
301
302        let key_ref = &key;
303        match db
304            .run(|trx, _maybe_committed| async move {
305                trx.set_option(TransactionOption::ReadSystemKeys)?;
306                trx.set_option(TransactionOption::ReadLockAware)?;
307
308                Ok(trx.get(key_ref, false).await?)
309            })
310            .await
311        {
312            Ok(None) => Ok(None),
313            Ok(Some(kv)) => Ok(Some(TenantInfo::try_from((key.as_slice(), kv.as_ref())))),
314            // error can only be an fdb_error
315            Err(err) => Err(err.get_fdb_error().unwrap()),
316        }
317    }
318
319    /// Deletes a tenant from the cluster using a transaction created on the specified `Database`.
320    /// This operation will first check whether the tenant exists, and if it does not it will set the
321    /// result to a `tenant_not_found` error. Otherwise, it will attempt to delete the tenant in a retry loop.
322    /// If the tenant is deleted concurrently by another transaction, this function may still return successfully.
323    pub async fn delete_tenant(db: &Database, tenant_name: &[u8]) -> Result<(), FdbError> {
324        let checked_existence = AtomicBool::new(false);
325        let checked_existence_ref = &checked_existence;
326
327        let mut key: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + tenant_name.len());
328        key.extend_from_slice(TENANT_MAP_PREFIX);
329        key.extend_from_slice(tenant_name);
330
331        let key_ref = &key;
332
333        db.run(|trx, _maybe_committed| async move {
334            trx.set_option(TransactionOption::SpecialKeySpaceEnableWrites)?;
335
336            if checked_existence_ref.load(Ordering::SeqCst) {
337                trx.clear(key_ref);
338                Ok(())
339            } else {
340                let maybe_key = trx.get(key_ref, false).await?;
341
342                checked_existence_ref.store(true, Ordering::SeqCst);
343
344                match maybe_key {
345                    None => {
346                        // `tenant_not_found` error
347                        Err(FdbBindingError::from(FdbError::new(2131)))
348                    }
349                    Some(_) => {
350                        trx.clear(key_ref);
351                        Ok(())
352                    }
353                }
354            }
355        })
356        .await
357        // error can only be an fdb_error
358        .map_err(|e| e.get_fdb_error().unwrap())
359    }
360
361    /// Lists all tenants in between the range specified. The number of tenants listed can be restricted.
362    /// This is a convenience method that generates the begin and end ranges by packing two Tuples.
363    pub async fn list_tenant(
364        db: &Database,
365        begin: &[u8],
366        end: &[u8],
367        limit: Option<usize>,
368    ) -> Result<Vec<Result<TenantInfo, serde_json::Error>>, FdbError> {
369        let trx = db.create_trx()?;
370        trx.set_option(TransactionOption::ReadSystemKeys)?;
371        trx.set_option(TransactionOption::ReadLockAware)?;
372
373        let mut begin_range: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + begin.len());
374        begin_range.extend_from_slice(TENANT_MAP_PREFIX);
375        begin_range.extend_from_slice(begin);
376
377        let end_range = if end.is_empty() {
378            TENANT_MAP_PREFIX_END.to_vec()
379        } else {
380            let mut end_range: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + end.len());
381            end_range.extend_from_slice(TENANT_MAP_PREFIX);
382            end_range.extend_from_slice(end);
383
384            end_range
385        };
386
387        let range_option = RangeOption {
388            begin: KeySelector::first_greater_than(begin_range),
389            end: KeySelector::first_greater_than(end_range),
390            limit,
391            ..Default::default()
392        };
393
394        trx.get_ranges_keyvalues(range_option, false)
395            .map_ok(|fdb_value| TenantInfo::try_from((fdb_value.key(), fdb_value.value())))
396            .try_collect::<Vec<Result<TenantInfo, serde_json::Error>>>()
397            .await
398    }
399}