foundationdb/
tenant.rs

1//! Implementation of the Tenants API. Experimental features exposed through the `tenant-experimental` feature.
2//!
3//! Please refer to the official [documentation](https://apple.github.io/foundationdb/tenants.html)
4//!
5//! ## Warning
6//!
7//! Tenants are currently experimental and are not recommended for use in production. Please note that
8//! currently, we [cannot upgrade a cluster with tenants enabled](https://forums.foundationdb.org/t/tenant-feature-metadata-changes-in-7-2-release/3459).
9
10use crate::options::TransactionOption;
11use std::future::Future;
12
13use crate::database::TransactError;
14use crate::{
15    error, Database, DatabaseTransact, FdbBindingError, FdbError, FdbResult, KeySelector,
16    RangeOption, RetryableTransaction, TransactOption, Transaction,
17};
18use foundationdb_sys as fdb_sys;
19use futures::TryStreamExt;
20use serde::{Deserialize, Serialize};
21use serde_json::Error;
22use std::ptr::NonNull;
23use std::sync::atomic::{AtomicBool, Ordering};
24use std::time::Instant;
25
/// Key prefix of the tenant map when built with the `fdb-7_1` feature.
/// A tenant's key is this prefix followed by the tenant name.
#[cfg(feature = "fdb-7_1")]
const TENANT_MAP_PREFIX: &[u8] = b"\xFF\xFF/management/tenant_map/";
/// Upper bound used when listing tenants without an explicit end (`fdb-7_1`).
/// It is the prefix with its trailing `/` (0x2F) replaced by `0` (0x30), the next byte value.
#[cfg(feature = "fdb-7_1")]
const TENANT_MAP_PREFIX_END: &[u8] = b"\xFF\xFF/management/tenant_map0";

/// Key prefix of the tenant map when built with the `fdb-7_3` feature.
/// A tenant's key is this prefix followed by the tenant name.
#[cfg(feature = "fdb-7_3")]
const TENANT_MAP_PREFIX: &[u8] = b"\xFF\xFF/management/tenant/map/";
/// Upper bound used when listing tenants without an explicit end (`fdb-7_3`).
/// It is the prefix with its trailing `/` (0x2F) replaced by `0` (0x30), the next byte value.
#[cfg(feature = "fdb-7_3")]
const TENANT_MAP_PREFIX_END: &[u8] = b"\xFF\xFF/management/tenant/map0";
35
36/// A `FdbTenant` represents a named key-space within a database that can be interacted with transactionally.
37pub struct FdbTenant {
38    pub(crate) inner: NonNull<fdb_sys::FDBTenant>,
39    pub(crate) name: Vec<u8>,
40}
41
42unsafe impl Send for FdbTenant {}
43unsafe impl Sync for FdbTenant {}
44
45impl Drop for FdbTenant {
46    fn drop(&mut self) {
47        unsafe {
48            fdb_sys::fdb_tenant_destroy(self.inner.as_ptr());
49        }
50    }
51}
52
53impl FdbTenant {
54    /// Returns the name of this [`FdbTenant`].
55    pub fn get_name(&self) -> &[u8] {
56        &self.name
57    }
58
59    /// Creates a new transaction on the given database and tenant.
60    pub fn create_trx(&self) -> FdbResult<Transaction> {
61        let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
62        let err = unsafe { fdb_sys::fdb_tenant_create_transaction(self.inner.as_ptr(), &mut trx) };
63        error::eval(err)?;
64        Ok(Transaction::new(NonNull::new(trx).expect(
65            "fdb_tenant_create_transaction to not return null if there is no error",
66        )))
67    }
68
69    fn create_retryable_trx(&self) -> FdbResult<RetryableTransaction> {
70        Ok(RetryableTransaction::new(self.create_trx()?))
71    }
72
73    /// Runs a transactional function against this Tenant with retry logic.
74    /// The associated closure will be called until a non-retryable FDBError
75    /// is thrown or commit(), returns success.
76    ///
77    /// Users are **not** expected to keep reference to the `RetryableTransaction`. If a weak or strong
78    /// reference is kept by the user, the binding will throw an error.
79    ///
80    /// # Warning: retry
81    ///
82    /// It might retry indefinitely if the transaction is highly contentious. It is recommended to
83    /// set the [crate::options::TransactionOption::RetryLimit] or [crate::options::TransactionOption::RetryLimit] on the transaction
84    /// if the task need to be guaranteed to finish. These options can be safely set on every iteration of the closure.
85    ///
86    /// # Warning: Maybe committed transactions
87    ///
88    /// As with other client/server databases, in some failure scenarios a client may be unable to determine
89    /// whether a transaction succeeded. You should make sure your closure is idempotent.
90    ///
91    /// The closure will notify the user in case of a maybe_committed transaction in a previous run
92    ///  with the boolean provided in the closure.
93    ///
94    pub async fn run<F, Fut, T>(&self, closure: F) -> Result<T, FdbBindingError>
95    where
96        F: Fn(RetryableTransaction, bool) -> Fut,
97        Fut: Future<Output = Result<T, FdbBindingError>>,
98    {
99        let mut maybe_committed_transaction = false;
100        // we just need to create the transaction once,
101        // in case there is a error, it will be reset automatically
102        let mut transaction = self.create_retryable_trx()?;
103
104        loop {
105            // executing the closure
106            let result_closure = closure(transaction.clone(), maybe_committed_transaction).await;
107
108            if let Err(e) = result_closure {
109                // checks if it is an FdbError
110                if let Some(e) = e.get_fdb_error() {
111                    maybe_committed_transaction = e.is_maybe_committed();
112                    // The closure returned an Error,
113                    match transaction.on_error(e).await {
114                        // we can retry the error
115                        Ok(Ok(t)) => {
116                            transaction = t;
117                            continue;
118                        }
119                        Ok(Err(non_retryable_error)) => {
120                            return Err(FdbBindingError::from(non_retryable_error))
121                        }
122                        // The only FdbBindingError that can be thrown here is `ReferenceToTransactionKept`
123                        Err(non_retryable_error) => return Err(non_retryable_error),
124                    }
125                }
126                // Otherwise, it cannot be retried
127                return Err(e);
128            }
129
130            let commit_result = transaction.commit().await;
131
132            match commit_result {
133                // The only FdbBindingError that can be thrown here is `ReferenceToTransactionKept`
134                Err(err) => return Err(err),
135                Ok(Ok(_)) => return result_closure,
136                Ok(Err(transaction_commit_error)) => {
137                    maybe_committed_transaction = transaction_commit_error.is_maybe_committed();
138                    // we have an error during commit, checking if it is a retryable error
139                    match transaction_commit_error.on_error().await {
140                        Ok(t) => {
141                            transaction = RetryableTransaction::new(t);
142                            continue;
143                        }
144                        Err(non_retryable_error) => {
145                            return Err(FdbBindingError::from(non_retryable_error))
146                        }
147                    }
148                }
149            }
150        }
151    }
152
153    /// `transact` returns a future which retries on error. It tries to resolve a future created by
154    /// caller-provided function `f` inside a retry loop, providing it with a newly created
155    /// transaction. After caller-provided future resolves, the transaction will be committed
156    /// automatically.
157    ///
158    /// # Warning
159    ///
160    /// It might retry indefinitely if the transaction is highly contentious. It is recommended to
161    /// set `TransactionOption::RetryLimit` or `TransactionOption::SetTimeout` on the transaction
162    /// if the task need to be guaranteed to finish.
163    ///
164    /// Once [Generic Associated Types](https://github.com/rust-lang/rfcs/blob/master/text/1598-generic_associated_types.md)
165    /// lands in stable rust, the returned future of f won't need to be boxed anymore, also the
166    /// lifetime limitations around f might be lowered.
167    pub async fn transact<F>(&self, mut f: F, options: TransactOption) -> Result<F::Item, F::Error>
168    where
169        F: DatabaseTransact,
170    {
171        let is_idempotent = options.is_idempotent;
172        let time_out = options.time_out.map(|d| Instant::now() + d);
173        let retry_limit = options.retry_limit;
174        let mut tries: u32 = 0;
175        let mut trx = self.create_trx()?;
176        let mut can_retry = move || {
177            tries += 1;
178            retry_limit.map(|limit| tries < limit).unwrap_or(true)
179                && time_out.map(|t| Instant::now() < t).unwrap_or(true)
180        };
181        loop {
182            let r = f.transact(trx).await;
183            f = r.0;
184            trx = r.1;
185            trx = match r.2 {
186                Ok(item) => match trx.commit().await {
187                    Ok(_) => break Ok(item),
188                    Err(e) => {
189                        if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
190                            e.on_error().await?
191                        } else {
192                            break Err(F::Error::from(e.into()));
193                        }
194                    }
195                },
196                Err(user_err) => match user_err.try_into_fdb_error() {
197                    Ok(e) => {
198                        if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
199                            trx.on_error(e).await?
200                        } else {
201                            break Err(F::Error::from(e));
202                        }
203                    }
204                    Err(user_err) => break Err(user_err),
205                },
206            };
207        }
208    }
209}
210
/// Holds the information about a tenant
#[cfg(feature = "fdb-7_1")]
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantInfo {
    /// Identifier of the tenant, taken from the `id` field of the stored metadata.
    pub id: i64,
    /// Prefix of the tenant, taken from the `prefix` field of the stored metadata.
    pub prefix: Vec<u8>,
    /// Name of the tenant: the bytes of the tenant-map key that follow the map prefix.
    pub name: Vec<u8>,
}
219
/// Holds the information about a tenant
#[cfg(feature = "fdb-7_3")]
#[derive(Serialize, Deserialize, Debug)]
pub struct TenantInfo {
    /// Identifier of the tenant, taken from the `id` field of the stored metadata.
    pub id: i64,
    /// Prefix of the tenant, taken from the `prefix` field of the stored metadata.
    pub prefix: FDBTenantPrintableInfo,
    /// Name of the tenant, taken from the `name` field of the stored metadata.
    pub name: FDBTenantPrintableInfo,
}
228
229impl TryFrom<(&[u8], &[u8])> for TenantInfo {
230    type Error = Error;
231
232    fn try_from(k_v: (&[u8], &[u8])) -> Result<Self, Self::Error> {
233        let value = k_v.1;
234        match serde_json::from_slice::<FDBTenantInfo>(value) {
235            #[cfg(feature = "fdb-7_1")]
236            Ok(tenant_info) => Ok(TenantInfo {
237                name: k_v.0.split_at(TENANT_MAP_PREFIX.len()).1.to_vec(),
238                id: tenant_info.id,
239                prefix: tenant_info.prefix,
240            }),
241
242            #[cfg(feature = "fdb-7_3")]
243            Ok(tenant_info) => Ok(TenantInfo {
244                name: tenant_info.name,
245                id: tenant_info.id,
246                prefix: tenant_info.prefix,
247            }),
248
249            Err(err) => Err(err),
250        }
251    }
252}
253
/// Tenant metadata as stored in FDB (`fdb-7_1` layout); it is decoded from the JSON value of a
/// tenant-map entry.
#[cfg(feature = "fdb-7_1")]
#[derive(Serialize, Deserialize, Debug)]
struct FDBTenantInfo {
    /// Identifier of the tenant.
    id: i64,
    /// Prefix of the tenant, (de)serialized through `serde_bytes`.
    #[serde(with = "serde_bytes")]
    prefix: Vec<u8>,
}
262
/// Tenant metadata as stored in FDB (`fdb-7_3` layout); it is decoded from the JSON value of a
/// tenant-map entry.
#[cfg(feature = "fdb-7_3")]
#[derive(Serialize, Deserialize, Debug)]
struct FDBTenantInfo {
    /// Identifier of the tenant.
    id: i64,
    /// Lock state of the tenant. It is parsed but not copied into `TenantInfo`.
    lock_state: TenantLockState,
    /// Name of the tenant.
    name: FDBTenantPrintableInfo,
    /// Prefix of the tenant.
    prefix: FDBTenantPrintableInfo,
}
271
/// Lock state found in the `lock_state` field of the tenant metadata.
///
/// Because of `rename_all = "camelCase"`, the variants are (de)serialized as `unlocked`,
/// `locked` and `readOnly`.
#[cfg(feature = "fdb-7_3")]
#[derive(Serialize, Deserialize, Debug)]
#[serde(rename_all = "camelCase")]
enum TenantLockState {
    Unlocked,
    Locked,
    ReadOnly,
}
280
/// Display a printable version of bytes
///
/// This is the type of the public `name` and `prefix` fields of `TenantInfo`, so its own
/// fields are public as well: otherwise callers would have no way to read those values.
#[cfg(feature = "fdb-7_3")]
#[derive(Serialize, Deserialize, Debug)]
pub struct FDBTenantPrintableInfo {
    /// Content of the `base64` field of the metadata.
    /// NOTE(review): presumably the base64 encoding of the raw bytes — confirm against the
    /// FoundationDB tenant documentation.
    pub base64: String,
    /// Content of the `printable` field of the metadata.
    /// NOTE(review): presumably a human-readable rendering of the same bytes — confirm.
    pub printable: String,
}
288
/// Namespace for the functions that manage the set of tenants in a cluster: creating, fetching,
/// deleting and listing them. It carries no state; every function is an associated function
/// taking the `Database` to operate on.
// This is a port from https://github.com/apple/foundationdb/blob/87ee0a2963f615079b3f50afa332acd0ead5f1fe/bindings/java/src/main/com/apple/foundationdb/TenantManagement.java
#[derive(Debug)]
pub struct TenantManagement;
293
294impl TenantManagement {
295    /// Creates a new tenant in the cluster using a transaction created on the specified Database.
296    /// his operation will first check whether the tenant exists, and if it does it will set the Result
297    /// to a `tenant_already_exists` error. Otherwise, it will attempt to create the tenant in a retry loop.
298    /// If the tenant is created concurrently by another transaction, this function may still return successfully.
299    pub async fn create_tenant(db: &Database, tenant_name: &[u8]) -> Result<(), FdbError> {
300        let checked_existence = AtomicBool::new(false);
301        let checked_existence_ref = &checked_existence;
302
303        let mut key: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + tenant_name.len());
304        key.extend_from_slice(TENANT_MAP_PREFIX);
305        key.extend_from_slice(tenant_name);
306
307        let key_ref = &key;
308
309        db.run(|trx, _maybe_committed| async move {
310            trx.set_option(TransactionOption::SpecialKeySpaceEnableWrites)?;
311
312            if checked_existence_ref.load(Ordering::SeqCst) {
313                trx.set(key_ref, &[]);
314                Ok(())
315            } else {
316                let maybe_key = trx.get(key_ref, false).await?;
317
318                checked_existence_ref.store(true, Ordering::SeqCst);
319
320                match maybe_key {
321                    None => {
322                        trx.set(key_ref, &[]);
323                        Ok(())
324                    }
325                    Some(_) => {
326                        // `tenant_already_exists` error
327                        Err(FdbBindingError::from(FdbError::new(2132)))
328                    }
329                }
330            }
331        })
332        .await
333        // error can only be an fdb_error
334        .map_err(|e| e.get_fdb_error().unwrap())
335    }
336
337    /// Get a tenant in the cluster using a transaction created on the specified Database.
338    pub async fn get_tenant(
339        db: &Database,
340        tenant_name: &[u8],
341    ) -> Result<Option<Result<TenantInfo, serde_json::Error>>, FdbError> {
342        let mut key: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + tenant_name.len());
343        key.extend_from_slice(TENANT_MAP_PREFIX);
344        key.extend_from_slice(tenant_name);
345
346        let key_ref = &key;
347        match db
348            .run(|trx, _maybe_committed| async move {
349                trx.set_option(TransactionOption::ReadSystemKeys)?;
350                trx.set_option(TransactionOption::ReadLockAware)?;
351
352                Ok(trx.get(key_ref, false).await?)
353            })
354            .await
355        {
356            Ok(None) => Ok(None),
357            Ok(Some(kv)) => Ok(Some(TenantInfo::try_from((key.as_slice(), kv.as_ref())))),
358            // error can only be an fdb_error
359            Err(err) => Err(err.get_fdb_error().unwrap()),
360        }
361    }
362
363    /// Deletes a tenant from the cluster using a transaction created on the specified `Database`.
364    /// This operation will first check whether the tenant exists, and if it does not it will set the
365    /// result to a `tenant_not_found` error. Otherwise, it will attempt to delete the tenant in a retry loop.
366    /// If the tenant is deleted concurrently by another transaction, this function may still return successfully.
367    pub async fn delete_tenant(db: &Database, tenant_name: &[u8]) -> Result<(), FdbError> {
368        let checked_existence = AtomicBool::new(false);
369        let checked_existence_ref = &checked_existence;
370
371        let mut key: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + tenant_name.len());
372        key.extend_from_slice(TENANT_MAP_PREFIX);
373        key.extend_from_slice(tenant_name);
374
375        let key_ref = &key;
376
377        db.run(|trx, _maybe_committed| async move {
378            trx.set_option(TransactionOption::SpecialKeySpaceEnableWrites)?;
379
380            if checked_existence_ref.load(Ordering::SeqCst) {
381                trx.clear(key_ref);
382                Ok(())
383            } else {
384                let maybe_key = trx.get(key_ref, false).await?;
385
386                checked_existence_ref.store(true, Ordering::SeqCst);
387
388                match maybe_key {
389                    None => {
390                        // `tenant_not_found` error
391                        Err(FdbBindingError::from(FdbError::new(2131)))
392                    }
393                    Some(_) => {
394                        trx.clear(key_ref);
395                        Ok(())
396                    }
397                }
398            }
399        })
400        .await
401        // error can only be an fdb_error
402        .map_err(|e| e.get_fdb_error().unwrap())
403    }
404
405    /// Lists all tenants in between the range specified. The number of tenants listed can be restricted.
406    /// This is a convenience method that generates the begin and end ranges by packing two Tuples.
407    pub async fn list_tenant(
408        db: &Database,
409        begin: &[u8],
410        end: &[u8],
411        limit: Option<usize>,
412    ) -> Result<Vec<Result<TenantInfo, serde_json::Error>>, FdbError> {
413        let trx = db.create_trx()?;
414        trx.set_option(TransactionOption::ReadSystemKeys)?;
415        trx.set_option(TransactionOption::ReadLockAware)?;
416
417        let mut begin_range: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + begin.len());
418        begin_range.extend_from_slice(TENANT_MAP_PREFIX);
419        begin_range.extend_from_slice(begin);
420
421        let end_range = if end.is_empty() {
422            TENANT_MAP_PREFIX_END.to_vec()
423        } else {
424            let mut end_range: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + end.len());
425            end_range.extend_from_slice(TENANT_MAP_PREFIX);
426            end_range.extend_from_slice(end);
427
428            end_range
429        };
430
431        let range_option = RangeOption {
432            begin: KeySelector::first_greater_than(begin_range),
433            end: KeySelector::first_greater_than(end_range),
434            limit,
435            ..Default::default()
436        };
437
438        trx.get_ranges_keyvalues(range_option, false)
439            .map_ok(|fdb_value| TenantInfo::try_from((fdb_value.key(), fdb_value.value())))
440            .try_collect::<Vec<Result<TenantInfo, serde_json::Error>>>()
441            .await
442    }
443}