foundationdb/
tenant.rs

//! Implementation of the Tenants API. Experimental features exposed through the `tenant-experimental` feature.
//!
//! Please refer to the official [documentation](https://apple.github.io/foundationdb/tenants.html).
//!
//! ## Warning
//!
//! Tenants are currently experimental and are not recommended for use in production. Please note that
//! currently, we [cannot upgrade a cluster with tenants enabled](https://forums.foundationdb.org/t/tenant-feature-metadata-changes-in-7-2-release/3459).
9
10use crate::options::TransactionOption;
11use std::future::Future;
12
13use crate::database::TransactError;
14use crate::{
15    error, Database, DatabaseTransact, FdbBindingError, FdbError, FdbResult, KeySelector,
16    RangeOption, RetryableTransaction, TransactOption, Transaction,
17};
18use foundationdb_macros::cfg_api_versions;
19use foundationdb_sys as fdb_sys;
20use fdb_sys::if_cfg_api_versions;
21use futures::TryStreamExt;
22use serde::{Deserialize, Serialize};
23use serde_json::Error;
24use std::ptr::NonNull;
25use std::sync::atomic::{AtomicBool, Ordering};
26use std::time::Instant;
27
/// Key prefix of the tenant map when the `fdb-7_1` feature is enabled.
/// The key of a tenant is this prefix immediately followed by the tenant name.
#[cfg(feature = "fdb-7_1")]
const TENANT_MAP_PREFIX: &[u8] = b"\xFF\xFF/management/tenant_map/";
/// Upper bound of the tenant map range for `fdb-7_1`: the prefix with its trailing `/`
/// replaced by `0` (the next byte value), so it sorts after every key starting with the prefix.
#[cfg(feature = "fdb-7_1")]
const TENANT_MAP_PREFIX_END: &[u8] = b"\xFF\xFF/management/tenant_map0";

/// Key prefix of the tenant map for API version 730 and above.
/// The key of a tenant is this prefix immediately followed by the tenant name.
#[cfg_api_versions(min = 730)]
const TENANT_MAP_PREFIX: &[u8] = b"\xFF\xFF/management/tenant/map/";
/// Upper bound of the tenant map range for API version 730 and above: the prefix with its
/// trailing `/` replaced by `0` (the next byte value).
#[cfg_api_versions(min = 730)]
const TENANT_MAP_PREFIX_END: &[u8] = b"\xFF\xFF/management/tenant/map0";
37
/// A `FdbTenant` represents a named key-space within a database that can be interacted with transactionally.
///
/// The wrapped native handle is released with `fdb_tenant_destroy` when the value is dropped.
pub struct FdbTenant {
    /// Non-null pointer to the native `FDBTenant` handle.
    pub(crate) inner: NonNull<fdb_sys::FDBTenant>,
    /// Name of the tenant as raw bytes, as returned by [`FdbTenant::get_name`].
    pub(crate) name: Vec<u8>,
}
43
// SAFETY: NOTE(review): these impls assume the native `FDBTenant` handle may be shared between
// threads and destroyed from any thread — confirm against the FoundationDB C API guarantees.
unsafe impl Send for FdbTenant {}
unsafe impl Sync for FdbTenant {}
46
47impl Drop for FdbTenant {
48    fn drop(&mut self) {
49        unsafe {
50            fdb_sys::fdb_tenant_destroy(self.inner.as_ptr());
51        }
52    }
53}
54
55impl FdbTenant {
56    /// Returns the name of this [`FdbTenant`].
57    pub fn get_name(&self) -> &[u8] {
58        &self.name
59    }
60
61    /// Creates a new transaction on the given database and tenant.
62    pub fn create_trx(&self) -> FdbResult<Transaction> {
63        let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
64        let err = unsafe { fdb_sys::fdb_tenant_create_transaction(self.inner.as_ptr(), &mut trx) };
65        error::eval(err)?;
66        Ok(Transaction::new(NonNull::new(trx).expect(
67            "fdb_tenant_create_transaction to not return null if there is no error",
68        )))
69    }
70
71    fn create_retryable_trx(&self) -> FdbResult<RetryableTransaction> {
72        Ok(RetryableTransaction::new(self.create_trx()?))
73    }
74
75    /// Runs a transactional function against this Tenant with retry logic.
76    /// The associated closure will be called until a non-retryable FDBError
77    /// is thrown or commit(), returns success.
78    ///
79    /// Users are **not** expected to keep reference to the `RetryableTransaction`. If a weak or strong
80    /// reference is kept by the user, the binding will throw an error.
81    ///
82    /// # Warning: retry
83    ///
84    /// It might retry indefinitely if the transaction is highly contentious. It is recommended to
85    /// set the [crate::options::TransactionOption::RetryLimit] or [crate::options::TransactionOption::RetryLimit] on the transaction
86    /// if the task need to be guaranteed to finish. These options can be safely set on every iteration of the closure.
87    ///
88    /// # Warning: Maybe committed transactions
89    ///
90    /// As with other client/server databases, in some failure scenarios a client may be unable to determine
91    /// whether a transaction succeeded. You should make sure your closure is idempotent.
92    ///
93    /// The closure will notify the user in case of a maybe_committed transaction in a previous run
94    ///  with the boolean provided in the closure.
95    ///
96    pub async fn run<F, Fut, T>(&self, closure: F) -> Result<T, FdbBindingError>
97    where
98        F: Fn(RetryableTransaction, bool) -> Fut,
99        Fut: Future<Output = Result<T, FdbBindingError>>,
100    {
101        let mut maybe_committed_transaction = false;
102        // we just need to create the transaction once,
103        // in case there is a error, it will be reset automatically
104        let mut transaction = self.create_retryable_trx()?;
105
106        loop {
107            // executing the closure
108            let result_closure = closure(transaction.clone(), maybe_committed_transaction).await;
109
110            if let Err(e) = result_closure {
111                // checks if it is an FdbError
112                if let Some(e) = e.get_fdb_error() {
113                    maybe_committed_transaction = e.is_maybe_committed();
114                    // The closure returned an Error,
115                    match transaction.on_error(e).await {
116                        // we can retry the error
117                        Ok(Ok(t)) => {
118                            transaction = t;
119                            continue;
120                        }
121                        Ok(Err(non_retryable_error)) => {
122                            return Err(FdbBindingError::from(non_retryable_error))
123                        }
124                        // The only FdbBindingError that can be thrown here is `ReferenceToTransactionKept`
125                        Err(non_retryable_error) => return Err(non_retryable_error),
126                    }
127                }
128                // Otherwise, it cannot be retried
129                return Err(e);
130            }
131
132            let commit_result = transaction.commit().await;
133
134            match commit_result {
135                // The only FdbBindingError that can be thrown here is `ReferenceToTransactionKept`
136                Err(err) => return Err(err),
137                Ok(Ok(_)) => return result_closure,
138                Ok(Err(transaction_commit_error)) => {
139                    maybe_committed_transaction = transaction_commit_error.is_maybe_committed();
140                    // we have an error during commit, checking if it is a retryable error
141                    match transaction_commit_error.on_error().await {
142                        Ok(t) => {
143                            transaction = RetryableTransaction::new(t);
144                            continue;
145                        }
146                        Err(non_retryable_error) => {
147                            return Err(FdbBindingError::from(non_retryable_error))
148                        }
149                    }
150                }
151            }
152        }
153    }
154
155    /// `transact` returns a future which retries on error. It tries to resolve a future created by
156    /// caller-provided function `f` inside a retry loop, providing it with a newly created
157    /// transaction. After caller-provided future resolves, the transaction will be committed
158    /// automatically.
159    ///
160    /// # Warning
161    ///
162    /// It might retry indefinitely if the transaction is highly contentious. It is recommended to
163    /// set `TransactionOption::RetryLimit` or `TransactionOption::SetTimeout` on the transaction
164    /// if the task need to be guaranteed to finish.
165    ///
166    /// Once [Generic Associated Types](https://github.com/rust-lang/rfcs/blob/master/text/1598-generic_associated_types.md)
167    /// lands in stable rust, the returned future of f won't need to be boxed anymore, also the
168    /// lifetime limitations around f might be lowered.
169    pub async fn transact<F>(&self, mut f: F, options: TransactOption) -> Result<F::Item, F::Error>
170    where
171        F: DatabaseTransact,
172    {
173        let is_idempotent = options.is_idempotent;
174        let time_out = options.time_out.map(|d| Instant::now() + d);
175        let retry_limit = options.retry_limit;
176        let mut tries: u32 = 0;
177        let mut trx = self.create_trx()?;
178        let mut can_retry = move || {
179            tries += 1;
180            retry_limit.map(|limit| tries < limit).unwrap_or(true)
181                && time_out.map(|t| Instant::now() < t).unwrap_or(true)
182        };
183        loop {
184            let r = f.transact(trx).await;
185            f = r.0;
186            trx = r.1;
187            trx = match r.2 {
188                Ok(item) => match trx.commit().await {
189                    Ok(_) => break Ok(item),
190                    Err(e) => {
191                        if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
192                            e.on_error().await?
193                        } else {
194                            break Err(F::Error::from(e.into()));
195                        }
196                    }
197                },
198                Err(user_err) => match user_err.try_into_fdb_error() {
199                    Ok(e) => {
200                        if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
201                            trx.on_error(e).await?
202                        } else {
203                            break Err(F::Error::from(e));
204                        }
205                    }
206                    Err(user_err) => break Err(user_err),
207                },
208            };
209        }
210    }
211}
212
#[cfg(feature = "fdb-7_1")]
/// Holds the information about a tenant
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantInfo {
    /// Identifier of the tenant, taken from the `id` field of the stored value.
    pub id: i64,
    /// Key prefix of the tenant, taken from the `prefix` field of the stored value.
    pub prefix: Vec<u8>,
    /// Name of the tenant: the tenant map key with `TENANT_MAP_PREFIX` removed.
    pub name: Vec<u8>,
}
221
#[cfg_api_versions(min = 730)]
/// Holds the information about a tenant
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantInfo {
    /// Identifier of the tenant, taken from the `id` field of the stored value.
    pub id: i64,
    /// Key prefix of the tenant, taken from the `prefix` field of the stored value.
    pub prefix: FDBTenantPrintableInfo,
    /// Name of the tenant, taken from the `name` field of the stored value.
    pub name: FDBTenantPrintableInfo,
}
230
231impl TryFrom<(&[u8], &[u8])> for TenantInfo {
232    type Error = Error;
233
234    fn try_from(k_v: (&[u8], &[u8])) -> Result<Self, Self::Error> {
235        let value = k_v.1;
236        match serde_json::from_slice::<FDBTenantInfo>(value) {
237            Ok(tenant_info) => if_cfg_api_versions!(min = 730 => {
238                Ok(TenantInfo {
239                    name: tenant_info.name,
240                    id: tenant_info.id,
241                    prefix: tenant_info.prefix,
242                })
243            } else {
244                Ok(TenantInfo {
245                    name: k_v.0.split_at(TENANT_MAP_PREFIX.len()).1.to_vec(),
246                    id: tenant_info.id,
247                    prefix: tenant_info.prefix,
248                })
249            }),
250
251            Err(err) => Err(err),
252        }
253    }
254}
255
/// Holds the information about a tenant. This is the struct that is stored in FDB
///
/// It is deserialized from the JSON value of a tenant map entry.
#[cfg(feature = "fdb-7_1")]
#[derive(Serialize, Deserialize, Debug)]
struct FDBTenantInfo {
    /// Identifier of the tenant.
    id: i64,
    /// Key prefix of the tenant, (de)serialized through `serde_bytes`.
    #[serde(with = "serde_bytes")]
    prefix: Vec<u8>,
}
264
/// Holds the information about a tenant for API version 730 and above.
///
/// It is deserialized from the JSON value of a tenant map entry.
#[cfg_api_versions(min = 730)]
#[derive(Serialize, Deserialize, Debug)]
struct FDBTenantInfo {
    /// Identifier of the tenant.
    id: i64,
    /// Lock state of the tenant; parsed but not copied into `TenantInfo`.
    lock_state: TenantLockState,
    /// Name of the tenant.
    name: FDBTenantPrintableInfo,
    /// Key prefix of the tenant.
    prefix: FDBTenantPrintableInfo,
}
273
/// Lock state of a tenant, read from the `lock_state` field of the stored tenant entry.
// NOTE(review): with `rename_all = "camelCase"` the variants (de)serialize as `unlocked`,
// `locked` and `readOnly` — confirm these match the strings FoundationDB actually writes.
#[cfg_api_versions(min = 730)]
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
enum TenantLockState {
    /// (De)serialized as `unlocked`.
    Unlocked,
    /// (De)serialized as `locked`.
    Locked,
    /// (De)serialized as `readOnly`.
    ReadOnly,
}
282
#[cfg_api_versions(min = 730)]
/// Display a printable version of bytes
#[derive(Serialize, Deserialize, Debug)]
pub struct FDBTenantPrintableInfo {
    // NOTE(review): presumably the base64 encoding of the raw bytes — confirm.
    base64: String,
    // NOTE(review): presumably a human-readable rendering of the same bytes — confirm.
    printable: String,
}
290
/// The FoundationDB API includes functions to manage the set of tenants in a cluster.
///
/// This unit struct carries no data.
// This is a port from https://github.com/apple/foundationdb/blob/87ee0a2963f615079b3f50afa332acd0ead5f1fe/bindings/java/src/main/com/apple/foundationdb/TenantManagement.java
#[derive(Debug)]
pub struct TenantManagement;
295
296impl TenantManagement {
297    /// Creates a new tenant in the cluster using a transaction created on the specified Database.
298    /// his operation will first check whether the tenant exists, and if it does it will set the Result
299    /// to a `tenant_already_exists` error. Otherwise, it will attempt to create the tenant in a retry loop.
300    /// If the tenant is created concurrently by another transaction, this function may still return successfully.
301    pub async fn create_tenant(db: &Database, tenant_name: &[u8]) -> Result<(), FdbError> {
302        let checked_existence = AtomicBool::new(false);
303        let checked_existence_ref = &checked_existence;
304
305        let mut key: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + tenant_name.len());
306        key.extend_from_slice(TENANT_MAP_PREFIX);
307        key.extend_from_slice(tenant_name);
308
309        let key_ref = &key;
310
311        db.run(|trx, _maybe_committed| async move {
312            trx.set_option(TransactionOption::SpecialKeySpaceEnableWrites)?;
313
314            if checked_existence_ref.load(Ordering::SeqCst) {
315                trx.set(key_ref, &[]);
316                Ok(())
317            } else {
318                let maybe_key = trx.get(key_ref, false).await?;
319
320                checked_existence_ref.store(true, Ordering::SeqCst);
321
322                match maybe_key {
323                    None => {
324                        trx.set(key_ref, &[]);
325                        Ok(())
326                    }
327                    Some(_) => {
328                        // `tenant_already_exists` error
329                        Err(FdbBindingError::from(FdbError::new(2132)))
330                    }
331                }
332            }
333        })
334        .await
335        // error can only be an fdb_error
336        .map_err(|e| e.get_fdb_error().unwrap())
337    }
338
339    /// Get a tenant in the cluster using a transaction created on the specified Database.
340    pub async fn get_tenant(
341        db: &Database,
342        tenant_name: &[u8],
343    ) -> Result<Option<Result<TenantInfo, serde_json::Error>>, FdbError> {
344        let mut key: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + tenant_name.len());
345        key.extend_from_slice(TENANT_MAP_PREFIX);
346        key.extend_from_slice(tenant_name);
347
348        let key_ref = &key;
349        match db
350            .run(|trx, _maybe_committed| async move {
351                trx.set_option(TransactionOption::ReadSystemKeys)?;
352                trx.set_option(TransactionOption::ReadLockAware)?;
353
354                Ok(trx.get(key_ref, false).await?)
355            })
356            .await
357        {
358            Ok(None) => Ok(None),
359            Ok(Some(kv)) => Ok(Some(TenantInfo::try_from((key.as_slice(), kv.as_ref())))),
360            // error can only be an fdb_error
361            Err(err) => Err(err.get_fdb_error().unwrap()),
362        }
363    }
364
365    /// Deletes a tenant from the cluster using a transaction created on the specified `Database`.
366    /// This operation will first check whether the tenant exists, and if it does not it will set the
367    /// result to a `tenant_not_found` error. Otherwise, it will attempt to delete the tenant in a retry loop.
368    /// If the tenant is deleted concurrently by another transaction, this function may still return successfully.
369    pub async fn delete_tenant(db: &Database, tenant_name: &[u8]) -> Result<(), FdbError> {
370        let checked_existence = AtomicBool::new(false);
371        let checked_existence_ref = &checked_existence;
372
373        let mut key: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + tenant_name.len());
374        key.extend_from_slice(TENANT_MAP_PREFIX);
375        key.extend_from_slice(tenant_name);
376
377        let key_ref = &key;
378
379        db.run(|trx, _maybe_committed| async move {
380            trx.set_option(TransactionOption::SpecialKeySpaceEnableWrites)?;
381
382            if checked_existence_ref.load(Ordering::SeqCst) {
383                trx.clear(key_ref);
384                Ok(())
385            } else {
386                let maybe_key = trx.get(key_ref, false).await?;
387
388                checked_existence_ref.store(true, Ordering::SeqCst);
389
390                match maybe_key {
391                    None => {
392                        // `tenant_not_found` error
393                        Err(FdbBindingError::from(FdbError::new(2131)))
394                    }
395                    Some(_) => {
396                        trx.clear(key_ref);
397                        Ok(())
398                    }
399                }
400            }
401        })
402        .await
403        // error can only be an fdb_error
404        .map_err(|e| e.get_fdb_error().unwrap())
405    }
406
407    /// Lists all tenants in between the range specified. The number of tenants listed can be restricted.
408    /// This is a convenience method that generates the begin and end ranges by packing two Tuples.
409    pub async fn list_tenant(
410        db: &Database,
411        begin: &[u8],
412        end: &[u8],
413        limit: Option<usize>,
414    ) -> Result<Vec<Result<TenantInfo, serde_json::Error>>, FdbError> {
415        let trx = db.create_trx()?;
416        trx.set_option(TransactionOption::ReadSystemKeys)?;
417        trx.set_option(TransactionOption::ReadLockAware)?;
418
419        let mut begin_range: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + begin.len());
420        begin_range.extend_from_slice(TENANT_MAP_PREFIX);
421        begin_range.extend_from_slice(begin);
422
423        let end_range = if end.is_empty() {
424            TENANT_MAP_PREFIX_END.to_vec()
425        } else {
426            let mut end_range: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + end.len());
427            end_range.extend_from_slice(TENANT_MAP_PREFIX);
428            end_range.extend_from_slice(end);
429
430            end_range
431        };
432
433        let range_option = RangeOption {
434            begin: KeySelector::first_greater_than(begin_range),
435            end: KeySelector::first_greater_than(end_range),
436            limit,
437            ..Default::default()
438        };
439
440        trx.get_ranges_keyvalues(range_option, false)
441            .map_ok(|fdb_value| TenantInfo::try_from((fdb_value.key(), fdb_value.value())))
442            .try_collect::<Vec<Result<TenantInfo, serde_json::Error>>>()
443            .await
444    }
445}