foundationdb/
database.rs

1// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors
2// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors.
3//
4// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
5// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
6// http://opensource.org/licenses/MIT>, at your option. This file may not be
7// copied, modified, or distributed except according to those terms.
8
9//! Implementations of the FDBDatabase C API
10//!
11//! <https://apple.github.io/foundationdb/api-c.html#database>
12
13use std::convert::TryInto;
14use std::marker::PhantomData;
15use std::pin::Pin;
16use std::ptr::NonNull;
17use std::time::{Duration, Instant};
18
19use fdb_sys::if_cfg_api_versions;
20use foundationdb_macros::cfg_api_versions;
21use foundationdb_sys as fdb_sys;
22
23use crate::options;
24use crate::transaction::*;
25use crate::{error, FdbError, FdbResult};
26
27use crate::error::FdbBindingError;
28use futures::prelude::*;
29
30#[cfg_api_versions(min = 710)]
31#[cfg(feature = "tenant-experimental")]
32use crate::tenant::FdbTenant;
33
/// Flag handed to the [`Database::run`] closure telling it whether the previous
/// attempt failed with an error for which `is_maybe_committed()` was true, i.e.
/// whether that attempt may in fact have been committed.
///
/// The boolean is wrapped so that it cannot be copied and has to be moved instead
/// (the type is deliberately neither `Copy` nor `Clone`). This is pretty handy when
/// you don't want to see the `Database::run` closure capturing the environment.
///
/// Use `bool::from(flag)` or `flag.into()` to read the value.
#[derive(Debug)]
pub struct MaybeCommitted(bool);

impl From<MaybeCommitted> for bool {
    /// Consumes the wrapper and returns the flag it carries.
    fn from(value: MaybeCommitted) -> Self {
        value.0
    }
}
47
48/// Represents a FoundationDB database
49///
50/// A mutable, lexicographically ordered mapping from binary keys to binary values.
51///
52/// Modifications to a database are performed via transactions.
53pub struct Database {
54    pub(crate) inner: NonNull<fdb_sys::FDBDatabase>,
55}
56unsafe impl Send for Database {}
57unsafe impl Sync for Database {}
58impl Drop for Database {
59    fn drop(&mut self) {
60        unsafe {
61            fdb_sys::fdb_database_destroy(self.inner.as_ptr());
62        }
63    }
64}
65
66#[cfg_api_versions(min = 610)]
67impl Database {
68    /// Create a database for the given configuration path if any, or the default one.
69    pub fn new(path: Option<&str>) -> FdbResult<Database> {
70        let path_str =
71            path.map(|path| std::ffi::CString::new(path).expect("path to be convertible to CStr"));
72        let path_ptr = path_str
73            .as_ref()
74            .map(|path| path.as_ptr())
75            .unwrap_or(std::ptr::null());
76        let mut v: *mut fdb_sys::FDBDatabase = std::ptr::null_mut();
77        let err = unsafe { fdb_sys::fdb_create_database(path_ptr, &mut v) };
78        drop(path_str); // path_str own the CString that we are getting the ptr from
79        error::eval(err)?;
80        let ptr =
81            NonNull::new(v).expect("fdb_create_database to not return null if there is no error");
82        Ok(Self::new_from_pointer(ptr))
83    }
84
85    /// Create a new FDBDatabase from a raw pointer. Users are expected to use the `new` method.
86    pub fn new_from_pointer(ptr: NonNull<fdb_sys::FDBDatabase>) -> Self {
87        Self { inner: ptr }
88    }
89
90    /// Create a database for the given configuration path
91    pub fn from_path(path: &str) -> FdbResult<Database> {
92        Self::new(Some(path))
93    }
94
95    /// Create a database for the default configuration path
96    #[allow(clippy::should_implement_trait)]
97    pub fn default() -> FdbResult<Database> {
98        Self::new(None)
99    }
100}
101
#[cfg_api_versions(min = 710)]
#[cfg(feature = "tenant-experimental")]
impl Database {
    /// Opens the tenant named `tenant_name` on this database.
    ///
    /// The returned [`FdbTenant`] keeps its own copy of the name.
    ///
    /// # Errors
    ///
    /// Returns the error reported by `fdb_database_open_tenant`.
    ///
    /// # Panics
    ///
    /// Panics if the length of `tenant_name` does not fit the integer type expected by
    /// the C API, or if the C API reports success but yields a null pointer.
    pub fn open_tenant(&self, tenant_name: &[u8]) -> FdbResult<FdbTenant> {
        let db = self.inner.as_ptr();
        let mut raw_tenant: *mut fdb_sys::FDB_tenant = std::ptr::null_mut();
        let code = unsafe {
            fdb_sys::fdb_database_open_tenant(
                db,
                tenant_name.as_ptr(),
                tenant_name.len().try_into().unwrap(),
                &mut raw_tenant,
            )
        };
        error::eval(code)?;

        let inner = NonNull::new(raw_tenant)
            .expect("fdb_database_open_tenant to not return null if there is no error");
        Ok(FdbTenant {
            inner,
            name: tenant_name.to_vec(),
        })
    }
}
123
124#[cfg_api_versions(min = 730)]
125impl Database {
126    /// Retrieve a client-side status information in a JSON format.
127    pub fn get_client_status(
128        &self,
129    ) -> impl Future<Output = FdbResult<crate::future::FdbSlice>> + Send + Sync + Unpin {
130        crate::future::FdbFuture::new(unsafe {
131            fdb_sys::fdb_database_get_client_status(self.inner.as_ptr())
132        })
133    }
134}
135
136impl Database {
137    /// Create a database for the given configuration path
138    ///
139    /// This is a compatibility api. If you only use API version ≥ 610 you should
140    /// use `Database::new`, `Database::from_path` or  `Database::default`.
141    pub async fn new_compat(path: Option<&str>) -> FdbResult<Database> {
142        if_cfg_api_versions!(min = 510, max = 600 => {
143            let cluster = crate::cluster::Cluster::new(path).await?;
144            let database = cluster.create_database().await?;
145            Ok(database)
146        } else {
147            Database::new(path)
148        })
149    }
150
151    /// Called to set an option an on `Database`.
152    pub fn set_option(&self, opt: options::DatabaseOption) -> FdbResult<()> {
153        unsafe { opt.apply(self.inner.as_ptr()) }
154    }
155
156    /// Creates a new transaction on the given database.
157    pub fn create_trx(&self) -> FdbResult<Transaction> {
158        let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
159        let err =
160            unsafe { fdb_sys::fdb_database_create_transaction(self.inner.as_ptr(), &mut trx) };
161        error::eval(err)?;
162        Ok(Transaction::new(NonNull::new(trx).expect(
163            "fdb_database_create_transaction to not return null if there is no error",
164        )))
165    }
166
167    fn create_retryable_trx(&self) -> FdbResult<RetryableTransaction> {
168        Ok(RetryableTransaction::new(self.create_trx()?))
169    }
170
    /// `transact` returns a future which retries on error. It tries to resolve a future created by
    /// caller-provided function `f` inside a retry loop, providing it with a newly created
    /// transaction. After caller-provided future resolves, the transaction will be committed
    /// automatically.
    ///
    /// `options` tunes the retry loop:
    ///
    /// * `retry_limit`: a failed attempt is retried only while the number of attempts made so
    ///   far is below the limit; `None` means no limit.
    /// * `time_out`: turned into a deadline when `transact` is called; a failed attempt is
    ///   retried only before that deadline. It is only consulted between attempts.
    /// * `is_idempotent`: unless set, an error for which `is_maybe_committed()` is true is
    ///   returned to the caller instead of being retried.
    ///
    /// # Errors
    ///
    /// Returns the error of `f` when it cannot be converted to an `FdbError`, and otherwise the
    /// `FdbError` (converted to `F::Error`) that ended the retry loop.
    ///
    /// # Warning
    ///
    /// It might retry indefinitely if the transaction is highly contentious. It is recommended to
    /// set `TransactionOption::RetryLimit` or `TransactionOption::Timeout` on the transaction
    /// if the task need to be guaranteed to finish.
    ///
    /// Once [Generic Associated Types](https://github.com/rust-lang/rfcs/blob/master/text/1598-generic_associated_types.md)
    /// lands in stable rust, the returned future of f won't need to be boxed anymore, also the
    /// lifetime limitations around f might be lowered.
    pub async fn transact<F>(&self, mut f: F, options: TransactOption) -> Result<F::Item, F::Error>
    where
        F: DatabaseTransact,
    {
        let is_idempotent = options.is_idempotent;
        // Convert the relative time budget into an absolute deadline, once, up front.
        let time_out = options.time_out.map(|d| Instant::now() + d);
        let retry_limit = options.retry_limit;
        let mut tries: u32 = 0;
        let mut trx = self.create_trx()?;
        // Counts the attempt that just failed, then tells whether another one is allowed.
        // It has a side effect (`tries += 1`), so it must be called at most once per failure;
        // both call sites below place it last in a short-circuiting `&&`.
        let mut can_retry = move || {
            tries += 1;
            retry_limit.map(|limit| tries < limit).unwrap_or(true)
                && time_out.map(|t| Instant::now() < t).unwrap_or(true)
        };
        loop {
            // `f` and `trx` are moved into the attempt and handed back in the result tuple.
            let r = f.transact(trx).await;
            f = r.0;
            trx = r.1;
            // Each arm either leaves the loop with `break` or yields the transaction to use
            // for the next attempt.
            trx = match r.2 {
                // The user code succeeded: try to commit.
                Ok(item) => match trx.commit().await {
                    Ok(_) => break Ok(item),
                    Err(e) => {
                        if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
                            // `?` gives up with the error when `on_error` itself fails.
                            e.on_error().await?
                        } else {
                            break Err(F::Error::from(e.into()));
                        }
                    }
                },
                // The user code failed: retry only if the error is really an `FdbError`.
                Err(user_err) => match user_err.try_into_fdb_error() {
                    Ok(e) => {
                        if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
                            // `?` gives up with the error when `on_error` itself fails.
                            trx.on_error(e).await?
                        } else {
                            break Err(F::Error::from(e));
                        }
                    }
                    // A pure application error is never retried.
                    Err(user_err) => break Err(user_err),
                },
            };
        }
    }
227
228    pub fn transact_boxed<'trx, F, D, T, E>(
229        &'trx self,
230        data: D,
231        f: F,
232        options: TransactOption,
233    ) -> impl Future<Output = Result<T, E>> + Send + 'trx
234    where
235        for<'a> F: FnMut(
236            &'a Transaction,
237            &'a mut D,
238        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
239        E: TransactError,
240        F: Send + 'trx,
241        T: Send + 'trx,
242        E: Send + 'trx,
243        D: Send + 'trx,
244    {
245        self.transact(
246            boxed::FnMutBoxed {
247                f,
248                d: data,
249                m: PhantomData,
250            },
251            options,
252        )
253    }
254
255    pub fn transact_boxed_local<'trx, F, D, T, E>(
256        &'trx self,
257        data: D,
258        f: F,
259        options: TransactOption,
260    ) -> impl Future<Output = Result<T, E>> + 'trx
261    where
262        for<'a> F:
263            FnMut(&'a Transaction, &'a mut D) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
264        E: TransactError,
265        F: 'trx,
266        T: 'trx,
267        E: 'trx,
268        D: 'trx,
269    {
270        self.transact(
271            boxed_local::FnMutBoxedLocal {
272                f,
273                d: data,
274                m: PhantomData,
275            },
276            options,
277        )
278    }
279
    /// Runs a transactional function against this Database with retry logic.
    /// The associated closure will be called until a non-retryable FDBError
    /// is thrown or commit(), returns success.
    ///
    /// Users are **not** expected to keep reference to the `RetryableTransaction`. If a weak or strong
    /// reference is kept by the user, the binding will throw an error.
    ///
    /// # Errors
    ///
    /// Returns the closure's error when it does not carry an `FdbError`, the error that
    /// `on_error` declared non-retryable, or the `FdbBindingError` raised while handling the
    /// transaction.
    ///
    /// # Warning: retry
    ///
    /// It might retry indefinitely if the transaction is highly contentious. It is recommended to
    /// set [`options::TransactionOption::RetryLimit`] or [`options::TransactionOption::Timeout`] on the transaction
    /// if the task need to be guaranteed to finish. These options can be safely set on every iteration of the closure.
    ///
    /// # Warning: Maybe committed transactions
    ///
    /// As with other client/server databases, in some failure scenarios a client may be unable to determine
    /// whether a transaction succeeded. You should make sure your closure is idempotent.
    ///
    /// The closure will notify the user in case of a maybe_committed transaction in a previous run
    /// with the `MaybeCommitted` provided in the closure.
    ///
    /// It can be used as a boolean:
    /// ```ignore
    /// db.run(|trx, maybe_committed| async {
    ///     if maybe_committed.into() {
    ///         // Handle the problem if needed
    ///     }
    /// }).await;
    /// ```
    pub async fn run<F, Fut, T>(&self, closure: F) -> Result<T, FdbBindingError>
    where
        F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
        Fut: Future<Output = Result<T, FdbBindingError>>,
    {
        // `false` for the first attempt; afterwards it mirrors `is_maybe_committed()` of the
        // error that caused the latest retry.
        // NOTE(review): the flag is overwritten on every failure, not accumulated, so an
        // earlier maybe-committed attempt is no longer reported once a later attempt fails
        // with a different error — confirm this is intended.
        let mut maybe_committed_transaction = false;
        // The transaction only has to be created once: on error, `on_error` hands back
        // the transaction to use for the next attempt.
        let mut transaction = self.create_retryable_trx()?;

        loop {
            // Run the closure with a clone of the transaction handle.
            let result_closure = closure(
                transaction.clone(),
                MaybeCommitted(maybe_committed_transaction),
            )
            .await;

            if let Err(e) = result_closure {
                // Only errors carrying an `FdbError` are candidates for a retry.
                if let Some(e) = e.get_fdb_error() {
                    maybe_committed_transaction = e.is_maybe_committed();
                    // Let `on_error` decide whether the closure's error can be retried.
                    match transaction.on_error(e).await {
                        // Retryable: go around again with the transaction handed back.
                        Ok(Ok(t)) => {
                            transaction = t;
                            continue;
                        }
                        Ok(Err(non_retryable_error)) => {
                            return Err(FdbBindingError::from(non_retryable_error))
                        }
                        // The only FdbBindingError that can be thrown here is `ReferenceToTransactionKept`
                        Err(non_retryable_error) => return Err(non_retryable_error),
                    }
                }
                // Any other error cannot be retried and is returned as is.
                return Err(e);
            }

            let commit_result = transaction.commit().await;

            match commit_result {
                // The only FdbBindingError that can be thrown here is `ReferenceToTransactionKept`
                Err(err) => return Err(err),
                // Committed: `result_closure` is known to be `Ok` at this point.
                Ok(Ok(_)) => return result_closure,
                Ok(Err(transaction_commit_error)) => {
                    maybe_committed_transaction = transaction_commit_error.is_maybe_committed();
                    // The commit failed: let `on_error` decide whether it can be retried.
                    match transaction_commit_error.on_error().await {
                        Ok(t) => {
                            transaction = RetryableTransaction::new(t);
                            continue;
                        }
                        Err(non_retryable_error) => {
                            return Err(FdbBindingError::from(non_retryable_error))
                        }
                    }
                }
            }
        }
    }
371
372    /// Perform a no-op against FDB to check network thread liveness. This operation will not change the underlying data
373    /// in any way, nor will it perform any I/O against the FDB cluster. However, it will schedule some amount of work
374    /// onto the FDB client and wait for it to complete. The FoundationDB client operates by scheduling onto an event
375    /// queue that is then processed by a single thread (the "network thread"). This method can be used to determine if
376    /// the network thread has entered a state where it is no longer processing requests or if its time to process
377    /// requests has increased. If the network thread is busy, this operation may take some amount of time to complete,
378    /// which is why this operation returns a future.
379    pub async fn perform_no_op(&self) -> FdbResult<()> {
380        let trx = self.create_trx()?;
381
382        // Set the read version of the transaction, then read it back. This requires no I/O, but it does
383        // require the network thread be running. The exact value used for the read version is unimportant.
384        trx.set_read_version(42);
385        trx.get_read_version().await?;
386        Ok(())
387    }
388
389    /// Returns a value where 0 indicates that the client is idle and 1 (or larger) indicates that the client is saturated.
390    /// By default, this value is updated every second.
391    #[cfg_api_versions(min = 710)]
392    pub async fn get_main_thread_busyness(&self) -> FdbResult<f64> {
393        let busyness =
394            unsafe { fdb_sys::fdb_database_get_main_thread_busyness(self.inner.as_ptr()) };
395        Ok(busyness)
396    }
397}
/// A unit of work that `Database::transact` runs, and possibly re-runs, against a
/// transaction.
pub trait DatabaseTransact: Sized {
    /// Value produced when the work succeeds.
    type Item;
    /// Error produced when the work fails; `Database::transact` inspects it through
    /// `TransactError::try_into_fdb_error` to decide whether to retry.
    type Error: TransactError;
    /// Future returned by `transact`. It resolves to the worker itself, the transaction it
    /// was given and the outcome, so that the first two can be reused for another attempt.
    type Future: Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)>;
    /// Performs one attempt of the work against `trx`, taking ownership of both the
    /// worker and the transaction.
    fn transact(self, trx: Transaction) -> Self::Future;
}
404
405#[allow(clippy::needless_lifetimes)]
406#[allow(clippy::type_complexity)]
407mod boxed {
408    use super::*;
409
410    async fn boxed_data_fut<'t, F, T, E, D>(
411        mut f: FnMutBoxed<'t, F, D>,
412        trx: Transaction,
413    ) -> (FnMutBoxed<'t, F, D>, Transaction, Result<T, E>)
414    where
415        F: for<'a> FnMut(
416            &'a Transaction,
417            &'a mut D,
418        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
419        E: TransactError,
420    {
421        let r = (f.f)(&trx, &mut f.d).await;
422        (f, trx, r)
423    }
424
425    pub struct FnMutBoxed<'t, F, D> {
426        pub f: F,
427        pub d: D,
428        pub m: PhantomData<&'t ()>,
429    }
430    impl<'t, F, T, E, D> DatabaseTransact for FnMutBoxed<'t, F, D>
431    where
432        F: for<'a> FnMut(
433            &'a Transaction,
434            &'a mut D,
435        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
436        F: 't + Send,
437        T: 't,
438        E: 't,
439        D: 't + Send,
440        E: TransactError,
441    {
442        type Item = T;
443        type Error = E;
444        type Future = Pin<
445            Box<
446                dyn Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)>
447                    + Send
448                    + 't,
449            >,
450        >;
451
452        fn transact(self, trx: Transaction) -> Self::Future {
453            boxed_data_fut(self, trx).boxed()
454        }
455    }
456}
457
458#[allow(clippy::needless_lifetimes)]
459#[allow(clippy::type_complexity)]
460mod boxed_local {
461    use super::*;
462
463    async fn boxed_local_data_fut<'t, F, T, E, D>(
464        mut f: FnMutBoxedLocal<'t, F, D>,
465        trx: Transaction,
466    ) -> (FnMutBoxedLocal<'t, F, D>, Transaction, Result<T, E>)
467    where
468        F: for<'a> FnMut(
469            &'a Transaction,
470            &'a mut D,
471        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
472        E: TransactError,
473    {
474        let r = (f.f)(&trx, &mut f.d).await;
475        (f, trx, r)
476    }
477
478    pub struct FnMutBoxedLocal<'t, F, D> {
479        pub f: F,
480        pub d: D,
481        pub m: PhantomData<&'t ()>,
482    }
483    impl<'t, F, T, E, D> DatabaseTransact for FnMutBoxedLocal<'t, F, D>
484    where
485        F: for<'a> FnMut(
486            &'a Transaction,
487            &'a mut D,
488        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
489        F: 't,
490        T: 't,
491        E: 't,
492        D: 't,
493        E: TransactError,
494    {
495        type Item = T;
496        type Error = E;
497        type Future = Pin<
498            Box<dyn Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)> + 't>,
499        >;
500
501        fn transact(self, trx: Transaction) -> Self::Future {
502            boxed_local_data_fut(self, trx).boxed_local()
503        }
504    }
505}
506
507/// A trait that must be implemented to use `Database::transact` this application error types.
508pub trait TransactError: From<FdbError> {
509    fn try_into_fdb_error(self) -> Result<FdbError, Self>;
510}
511impl<T> TransactError for T
512where
513    T: From<FdbError> + TryInto<FdbError, Error = T>,
514{
515    fn try_into_fdb_error(self) -> Result<FdbError, Self> {
516        self.try_into()
517    }
518}
519impl TransactError for FdbError {
520    fn try_into_fdb_error(self) -> Result<FdbError, Self> {
521        Ok(self)
522    }
523}
524
/// A set of options that controls the behavior of `Database::transact`.
///
/// The default value puts no bound on retries and treats the transaction as
/// non-idempotent.
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct TransactOption {
    /// Upper bound on the number of attempts: a failed attempt is retried only while
    /// the number of attempts made so far is below this value. The first attempt always
    /// runs, so `Some(0)` and `Some(1)` both mean "never retry". `None` means no limit.
    pub retry_limit: Option<u32>,
    /// Time budget, measured from the moment `Database::transact` is called. Once it
    /// has elapsed, failed attempts are no longer retried; it is only consulted between
    /// attempts. `None` means no deadline.
    pub time_out: Option<Duration>,
    /// Whether the transaction can safely be applied more than once. When `false`, an
    /// error for which `is_maybe_committed()` is true is returned instead of retried.
    pub is_idempotent: bool,
}

impl TransactOption {
    /// An idempotent TransactOption: `is_idempotent` is set, every other field keeps
    /// its default value.
    pub fn idempotent() -> Self {
        Self {
            is_idempotent: true,
            ..Self::default()
        }
    }
}
541}