foundationdb/
database.rs

1// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors
2// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors.
3//
4// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
5// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
6// http://opensource.org/licenses/MIT>, at your option. This file may not be
7// copied, modified, or distributed except according to those terms.
8
9//! Implementations of the FDBDatabase C API
10//!
11//! <https://apple.github.io/foundationdb/api-c.html#database>
12
13use std::convert::TryInto;
14use std::marker::PhantomData;
15use std::pin::Pin;
16use std::ptr::NonNull;
17use std::time::{Duration, Instant};
18
19use foundationdb_macros::cfg_api_versions;
20use foundationdb_sys as fdb_sys;
21
22use crate::options;
23use crate::transaction::*;
24use crate::{error, FdbError, FdbResult};
25
26use crate::error::FdbBindingError;
27use futures::prelude::*;
28
29#[cfg(any(feature = "fdb-7_1", feature = "fdb-7_3"))]
30#[cfg(feature = "tenant-experimental")]
31use crate::tenant::FdbTenant;
32
/// Flag handed to the `Database::run` closure telling whether the previous
/// attempt of the transaction may still be in flight, i.e. may have been
/// committed.
///
/// The boolean is wrapped in a type that is deliberately neither `Copy` nor
/// `Clone`, so it has to be moved instead of copied. This is handy when you
/// do not want the `Database::run` closure to capture its environment.
///
/// Convert it into a plain `bool` with `bool::from(flag)` or `flag.into()`.
#[derive(Debug)]
pub struct MaybeCommitted(bool);

impl From<MaybeCommitted> for bool {
    /// Consumes the wrapper and returns the flag it carries.
    fn from(value: MaybeCommitted) -> Self {
        value.0
    }
}
46
47/// Represents a FoundationDB database
48///
49/// A mutable, lexicographically ordered mapping from binary keys to binary values.
50///
51/// Modifications to a database are performed via transactions.
52pub struct Database {
53    pub(crate) inner: NonNull<fdb_sys::FDBDatabase>,
54}
55unsafe impl Send for Database {}
56unsafe impl Sync for Database {}
57impl Drop for Database {
58    fn drop(&mut self) {
59        unsafe {
60            fdb_sys::fdb_database_destroy(self.inner.as_ptr());
61        }
62    }
63}
64
65#[cfg_api_versions(min = 610)]
66impl Database {
67    /// Create a database for the given configuration path if any, or the default one.
68    pub fn new(path: Option<&str>) -> FdbResult<Database> {
69        let path_str =
70            path.map(|path| std::ffi::CString::new(path).expect("path to be convertible to CStr"));
71        let path_ptr = path_str
72            .as_ref()
73            .map(|path| path.as_ptr())
74            .unwrap_or(std::ptr::null());
75        let mut v: *mut fdb_sys::FDBDatabase = std::ptr::null_mut();
76        let err = unsafe { fdb_sys::fdb_create_database(path_ptr, &mut v) };
77        drop(path_str); // path_str own the CString that we are getting the ptr from
78        error::eval(err)?;
79        let ptr =
80            NonNull::new(v).expect("fdb_create_database to not return null if there is no error");
81        Ok(Self::new_from_pointer(ptr))
82    }
83
84    /// Create a new FDBDatabase from a raw pointer. Users are expected to use the `new` method.
85    pub fn new_from_pointer(ptr: NonNull<fdb_sys::FDBDatabase>) -> Self {
86        Self { inner: ptr }
87    }
88
89    /// Create a database for the given configuration path
90    pub fn from_path(path: &str) -> FdbResult<Database> {
91        Self::new(Some(path))
92    }
93
94    /// Create a database for the default configuration path
95    #[allow(clippy::should_implement_trait)]
96    pub fn default() -> FdbResult<Database> {
97        Self::new(None)
98    }
99}
100
#[cfg_api_versions(min = 710)]
#[cfg(feature = "tenant-experimental")]
impl Database {
    /// Opens the tenant named `tenant_name` on this database.
    ///
    /// The returned `FdbTenant` stores an owned copy of `tenant_name`.
    ///
    /// # Errors
    ///
    /// Returns the `FdbError` reported by `fdb_database_open_tenant`.
    ///
    /// # Panics
    ///
    /// Panics if the length of `tenant_name` does not fit the integer type
    /// expected by the C API, or if the call reports success but yields a null
    /// pointer.
    pub fn open_tenant(&self, tenant_name: &[u8]) -> FdbResult<FdbTenant> {
        let mut raw_tenant: *mut fdb_sys::FDB_tenant = std::ptr::null_mut();
        let code = unsafe {
            fdb_sys::fdb_database_open_tenant(
                self.inner.as_ptr(),
                tenant_name.as_ptr(),
                tenant_name.len().try_into().unwrap(),
                &mut raw_tenant,
            )
        };
        error::eval(code)?;

        let inner = NonNull::new(raw_tenant)
            .expect("fdb_database_open_tenant to not return null if there is no error");
        let name = tenant_name.to_owned();
        Ok(FdbTenant { inner, name })
    }
}
122
123#[cfg_api_versions(min = 730)]
124impl Database {
125    /// Retrieve a client-side status information in a JSON format.
126    pub fn get_client_status(
127        &self,
128    ) -> impl Future<Output = FdbResult<crate::future::FdbSlice>> + Send + Sync + Unpin {
129        crate::future::FdbFuture::new(unsafe {
130            fdb_sys::fdb_database_get_client_status(self.inner.as_ptr())
131        })
132    }
133}
134
135impl Database {
136    /// Create a database for the given configuration path
137    ///
138    /// This is a compatibility api. If you only use API version ≥ 610 you should
139    /// use `Database::new`, `Database::from_path` or  `Database::default`.
140    pub async fn new_compat(path: Option<&str>) -> FdbResult<Database> {
141        #[cfg(any(feature = "fdb-5_1", feature = "fdb-5_2", feature = "fdb-6_0"))]
142        {
143            let cluster = crate::cluster::Cluster::new(path).await?;
144            let database = cluster.create_database().await?;
145            Ok(database)
146        }
147
148        #[cfg(not(any(feature = "fdb-5_1", feature = "fdb-5_2", feature = "fdb-6_0")))]
149        {
150            Database::new(path)
151        }
152    }
153
154    /// Called to set an option an on `Database`.
155    pub fn set_option(&self, opt: options::DatabaseOption) -> FdbResult<()> {
156        unsafe { opt.apply(self.inner.as_ptr()) }
157    }
158
159    /// Creates a new transaction on the given database.
160    pub fn create_trx(&self) -> FdbResult<Transaction> {
161        let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
162        let err =
163            unsafe { fdb_sys::fdb_database_create_transaction(self.inner.as_ptr(), &mut trx) };
164        error::eval(err)?;
165        Ok(Transaction::new(NonNull::new(trx).expect(
166            "fdb_database_create_transaction to not return null if there is no error",
167        )))
168    }
169
170    fn create_retryable_trx(&self) -> FdbResult<RetryableTransaction> {
171        Ok(RetryableTransaction::new(self.create_trx()?))
172    }
173
174    /// `transact` returns a future which retries on error. It tries to resolve a future created by
175    /// caller-provided function `f` inside a retry loop, providing it with a newly created
176    /// transaction. After caller-provided future resolves, the transaction will be committed
177    /// automatically.
178    ///
179    /// # Warning
180    ///
181    /// It might retry indefinitely if the transaction is highly contentious. It is recommended to
182    /// set `TransactionOption::RetryLimit` or `TransactionOption::SetTimeout` on the transaction
183    /// if the task need to be guaranteed to finish.
184    ///
185    /// Once [Generic Associated Types](https://github.com/rust-lang/rfcs/blob/master/text/1598-generic_associated_types.md)
186    /// lands in stable rust, the returned future of f won't need to be boxed anymore, also the
187    /// lifetime limitations around f might be lowered.
188    pub async fn transact<F>(&self, mut f: F, options: TransactOption) -> Result<F::Item, F::Error>
189    where
190        F: DatabaseTransact,
191    {
192        let is_idempotent = options.is_idempotent;
193        let time_out = options.time_out.map(|d| Instant::now() + d);
194        let retry_limit = options.retry_limit;
195        let mut tries: u32 = 0;
196        let mut trx = self.create_trx()?;
197        let mut can_retry = move || {
198            tries += 1;
199            retry_limit.map(|limit| tries < limit).unwrap_or(true)
200                && time_out.map(|t| Instant::now() < t).unwrap_or(true)
201        };
202        loop {
203            let r = f.transact(trx).await;
204            f = r.0;
205            trx = r.1;
206            trx = match r.2 {
207                Ok(item) => match trx.commit().await {
208                    Ok(_) => break Ok(item),
209                    Err(e) => {
210                        if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
211                            e.on_error().await?
212                        } else {
213                            break Err(F::Error::from(e.into()));
214                        }
215                    }
216                },
217                Err(user_err) => match user_err.try_into_fdb_error() {
218                    Ok(e) => {
219                        if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
220                            trx.on_error(e).await?
221                        } else {
222                            break Err(F::Error::from(e));
223                        }
224                    }
225                    Err(user_err) => break Err(user_err),
226                },
227            };
228        }
229    }
230
231    pub fn transact_boxed<'trx, F, D, T, E>(
232        &'trx self,
233        data: D,
234        f: F,
235        options: TransactOption,
236    ) -> impl Future<Output = Result<T, E>> + Send + 'trx
237    where
238        for<'a> F: FnMut(
239            &'a Transaction,
240            &'a mut D,
241        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
242        E: TransactError,
243        F: Send + 'trx,
244        T: Send + 'trx,
245        E: Send + 'trx,
246        D: Send + 'trx,
247    {
248        self.transact(
249            boxed::FnMutBoxed {
250                f,
251                d: data,
252                m: PhantomData,
253            },
254            options,
255        )
256    }
257
258    pub fn transact_boxed_local<'trx, F, D, T, E>(
259        &'trx self,
260        data: D,
261        f: F,
262        options: TransactOption,
263    ) -> impl Future<Output = Result<T, E>> + 'trx
264    where
265        for<'a> F:
266            FnMut(&'a Transaction, &'a mut D) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
267        E: TransactError,
268        F: 'trx,
269        T: 'trx,
270        E: 'trx,
271        D: 'trx,
272    {
273        self.transact(
274            boxed_local::FnMutBoxedLocal {
275                f,
276                d: data,
277                m: PhantomData,
278            },
279            options,
280        )
281    }
282
283    /// Runs a transactional function against this Database with retry logic.
284    /// The associated closure will be called until a non-retryable FDBError
285    /// is thrown or commit(), returns success.
286    ///
287    /// Users are **not** expected to keep reference to the `RetryableTransaction`. If a weak or strong
288    /// reference is kept by the user, the binding will throw an error.
289    ///
290    /// # Warning: retry
291    ///
292    /// It might retry indefinitely if the transaction is highly contentious. It is recommended to
293    /// set [`options::TransactionOption::RetryLimit`] or [`options::TransactionOption::Timeout`] on the transaction
294    /// if the task need to be guaranteed to finish. These options can be safely set on every iteration of the closure.
295    ///
296    /// # Warning: Maybe committed transactions
297    ///
298    /// As with other client/server databases, in some failure scenarios a client may be unable to determine
299    /// whether a transaction succeeded. You should make sure your closure is idempotent.
300    ///
301    /// The closure will notify the user in case of a maybe_committed transaction in a previous run
302    ///  with the `MaybeCommitted` provided in the closure.
303    ///
304    /// This one can used as boolean with
305    /// ```ignore
306    /// db.run(|trx, maybe_committed| async {
307    ///     if maybe_committed.into() {
308    ///         // Handle the problem if needed
309    ///     }
310    /// }).await;
311    ///```
312    pub async fn run<F, Fut, T>(&self, closure: F) -> Result<T, FdbBindingError>
313    where
314        F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
315        Fut: Future<Output = Result<T, FdbBindingError>>,
316    {
317        let mut maybe_committed_transaction = false;
318        // we just need to create the transaction once,
319        // in case there is a error, it will be reset automatically
320        let mut transaction = self.create_retryable_trx()?;
321
322        loop {
323            // executing the closure
324            let result_closure = closure(
325                transaction.clone(),
326                MaybeCommitted(maybe_committed_transaction),
327            )
328            .await;
329
330            if let Err(e) = result_closure {
331                // checks if it is an FdbError
332                if let Some(e) = e.get_fdb_error() {
333                    maybe_committed_transaction = e.is_maybe_committed();
334                    // The closure returned an Error,
335                    match transaction.on_error(e).await {
336                        // we can retry the error
337                        Ok(Ok(t)) => {
338                            transaction = t;
339                            continue;
340                        }
341                        Ok(Err(non_retryable_error)) => {
342                            return Err(FdbBindingError::from(non_retryable_error))
343                        }
344                        // The only FdbBindingError that can be thrown here is `ReferenceToTransactionKept`
345                        Err(non_retryable_error) => return Err(non_retryable_error),
346                    }
347                }
348                // Otherwise, it cannot be retried
349                return Err(e);
350            }
351
352            let commit_result = transaction.commit().await;
353
354            match commit_result {
355                // The only FdbBindingError that can be thrown here is `ReferenceToTransactionKept`
356                Err(err) => return Err(err),
357                Ok(Ok(_)) => return result_closure,
358                Ok(Err(transaction_commit_error)) => {
359                    maybe_committed_transaction = transaction_commit_error.is_maybe_committed();
360                    // we have an error during commit, checking if it is a retryable error
361                    match transaction_commit_error.on_error().await {
362                        Ok(t) => {
363                            transaction = RetryableTransaction::new(t);
364                            continue;
365                        }
366                        Err(non_retryable_error) => {
367                            return Err(FdbBindingError::from(non_retryable_error))
368                        }
369                    }
370                }
371            }
372        }
373    }
374
375    /// Perform a no-op against FDB to check network thread liveness. This operation will not change the underlying data
376    /// in any way, nor will it perform any I/O against the FDB cluster. However, it will schedule some amount of work
377    /// onto the FDB client and wait for it to complete. The FoundationDB client operates by scheduling onto an event
378    /// queue that is then processed by a single thread (the "network thread"). This method can be used to determine if
379    /// the network thread has entered a state where it is no longer processing requests or if its time to process
380    /// requests has increased. If the network thread is busy, this operation may take some amount of time to complete,
381    /// which is why this operation returns a future.
382    pub async fn perform_no_op(&self) -> FdbResult<()> {
383        let trx = self.create_trx()?;
384
385        // Set the read version of the transaction, then read it back. This requires no I/O, but it does
386        // require the network thread be running. The exact value used for the read version is unimportant.
387        trx.set_read_version(42);
388        trx.get_read_version().await?;
389        Ok(())
390    }
391
392    /// Returns a value where 0 indicates that the client is idle and 1 (or larger) indicates that the client is saturated.
393    /// By default, this value is updated every second.
394    #[cfg_api_versions(min = 710)]
395    pub async fn get_main_thread_busyness(&self) -> FdbResult<f64> {
396        let busyness =
397            unsafe { fdb_sys::fdb_database_get_main_thread_busyness(self.inner.as_ptr()) };
398        Ok(busyness)
399    }
400}
/// Unit of work driven by `Database::transact`.
///
/// An implementor is consumed by every attempt and handed back, together with the
/// transaction, when the attempt's future resolves, so that it can be used again
/// for the next attempt.
pub trait DatabaseTransact: Sized {
    /// Value produced by a successful attempt.
    type Item;
    /// Error produced by a failed attempt.
    type Error: TransactError;
    /// Future returned by `transact`; it resolves to the implementor itself, the
    /// transaction it was given, and the result of the attempt.
    type Future: Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)>;
    /// Runs one attempt against `trx`.
    fn transact(self, trx: Transaction) -> Self::Future;
}
407
408#[allow(clippy::needless_lifetimes)]
409#[allow(clippy::type_complexity)]
410mod boxed {
411    use super::*;
412
413    async fn boxed_data_fut<'t, F, T, E, D>(
414        mut f: FnMutBoxed<'t, F, D>,
415        trx: Transaction,
416    ) -> (FnMutBoxed<'t, F, D>, Transaction, Result<T, E>)
417    where
418        F: for<'a> FnMut(
419            &'a Transaction,
420            &'a mut D,
421        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
422        E: TransactError,
423    {
424        let r = (f.f)(&trx, &mut f.d).await;
425        (f, trx, r)
426    }
427
428    pub struct FnMutBoxed<'t, F, D> {
429        pub f: F,
430        pub d: D,
431        pub m: PhantomData<&'t ()>,
432    }
433    impl<'t, F, T, E, D> DatabaseTransact for FnMutBoxed<'t, F, D>
434    where
435        F: for<'a> FnMut(
436            &'a Transaction,
437            &'a mut D,
438        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
439        F: 't + Send,
440        T: 't,
441        E: 't,
442        D: 't + Send,
443        E: TransactError,
444    {
445        type Item = T;
446        type Error = E;
447        type Future = Pin<
448            Box<
449                dyn Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)>
450                    + Send
451                    + 't,
452            >,
453        >;
454
455        fn transact(self, trx: Transaction) -> Self::Future {
456            boxed_data_fut(self, trx).boxed()
457        }
458    }
459}
460
461#[allow(clippy::needless_lifetimes)]
462#[allow(clippy::type_complexity)]
463mod boxed_local {
464    use super::*;
465
466    async fn boxed_local_data_fut<'t, F, T, E, D>(
467        mut f: FnMutBoxedLocal<'t, F, D>,
468        trx: Transaction,
469    ) -> (FnMutBoxedLocal<'t, F, D>, Transaction, Result<T, E>)
470    where
471        F: for<'a> FnMut(
472            &'a Transaction,
473            &'a mut D,
474        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
475        E: TransactError,
476    {
477        let r = (f.f)(&trx, &mut f.d).await;
478        (f, trx, r)
479    }
480
481    pub struct FnMutBoxedLocal<'t, F, D> {
482        pub f: F,
483        pub d: D,
484        pub m: PhantomData<&'t ()>,
485    }
486    impl<'t, F, T, E, D> DatabaseTransact for FnMutBoxedLocal<'t, F, D>
487    where
488        F: for<'a> FnMut(
489            &'a Transaction,
490            &'a mut D,
491        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
492        F: 't,
493        T: 't,
494        E: 't,
495        D: 't,
496        E: TransactError,
497    {
498        type Item = T;
499        type Error = E;
500        type Future = Pin<
501            Box<dyn Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)> + 't>,
502        >;
503
504        fn transact(self, trx: Transaction) -> Self::Future {
505            boxed_local_data_fut(self, trx).boxed_local()
506        }
507    }
508}
509
510/// A trait that must be implemented to use `Database::transact` this application error types.
511pub trait TransactError: From<FdbError> {
512    fn try_into_fdb_error(self) -> Result<FdbError, Self>;
513}
514impl<T> TransactError for T
515where
516    T: From<FdbError> + TryInto<FdbError, Error = T>,
517{
518    fn try_into_fdb_error(self) -> Result<FdbError, Self> {
519        self.try_into()
520    }
521}
522impl TransactError for FdbError {
523    fn try_into_fdb_error(self) -> Result<FdbError, Self> {
524        Ok(self)
525    }
526}
527
/// A set of options that controls the behavior of `Database::transact`.
///
/// The default value sets no retry limit and no time-out, and treats the
/// transaction as non-idempotent.
#[derive(Debug, Default, Clone)]
pub struct TransactOption {
    /// Bound on retries: a retry is only allowed while the number of failed
    /// attempts is strictly below this value. `None` means no bound.
    pub retry_limit: Option<u32>,
    /// Time budget counted from the start of `Database::transact`: no retry is
    /// started once it has elapsed. `None` means no budget.
    pub time_out: Option<Duration>,
    /// When `true`, errors flagged as "maybe committed" are retried as well.
    pub is_idempotent: bool,
}

impl TransactOption {
    /// Returns the default options with `is_idempotent` switched on.
    pub fn idempotent() -> Self {
        Self {
            is_idempotent: true,
            ..Self::default()
        }
    }
}
544}