foundationdb/
database.rs

1// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors
2// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors.
3//
4// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
5// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
6// http://opensource.org/licenses/MIT>, at your option. This file may not be
7// copied, modified, or distributed except according to those terms.
8
9//! Implementations of the FDBDatabase C API
10//!
11//! <https://apple.github.io/foundationdb/api-c.html#database>
12
13use std::convert::TryInto;
14use std::marker::PhantomData;
15use std::pin::Pin;
16use std::ptr::NonNull;
17use std::time::{Duration, Instant};
18
19use fdb_sys::if_cfg_api_versions;
20use foundationdb_macros::cfg_api_versions;
21use foundationdb_sys as fdb_sys;
22
23use crate::metrics::{MetricsReport, TransactionMetrics};
24use crate::options;
25use crate::transaction::*;
26use crate::{error, FdbError, FdbResult};
27
28use crate::error::FdbBindingError;
29#[cfg_api_versions(min = 710)]
30#[cfg(feature = "tenant-experimental")]
31use crate::tenant::FdbTenant;
32use futures::prelude::*;
33
/// Flag handed to the `Database::run` closure, telling it whether the
/// previous attempt of the transaction may have been committed.
///
/// The boolean is wrapped in a type that is neither `Copy` nor `Clone`, so it
/// has to be moved instead of copied.
/// This is pretty handy when you don't want to see the `Database::run` closure
/// capturing the environment.
pub struct MaybeCommitted(bool);

impl From<MaybeCommitted> for bool {
    /// Unwraps the flag into a plain `bool`.
    fn from(MaybeCommitted(flag): MaybeCommitted) -> Self {
        flag
    }
}
47
/// Represents a FoundationDB database
///
/// A mutable, lexicographically ordered mapping from binary keys to binary values.
///
/// Modifications to a database are performed via transactions.
pub struct Database {
    // Non-null handle to the underlying C `FDBDatabase`. It is passed to
    // `fdb_database_destroy` when the `Database` is dropped.
    pub(crate) inner: NonNull<fdb_sys::FDBDatabase>,
}
// `NonNull` is neither `Send` nor `Sync`, so both are asserted by hand here.
// NOTE(review): this presumably relies on the FoundationDB C client allowing a
// database handle to be shared between threads — confirm against the C API docs.
unsafe impl Send for Database {}
unsafe impl Sync for Database {}
58impl Drop for Database {
59    fn drop(&mut self) {
60        unsafe {
61            fdb_sys::fdb_database_destroy(self.inner.as_ptr());
62        }
63    }
64}
65
66#[cfg_api_versions(min = 610)]
67impl Database {
68    /// Create a database for the given configuration path if any, or the default one.
69    pub fn new(path: Option<&str>) -> FdbResult<Database> {
70        let path_str =
71            path.map(|path| std::ffi::CString::new(path).expect("path to be convertible to CStr"));
72        let path_ptr = path_str
73            .as_ref()
74            .map(|path| path.as_ptr())
75            .unwrap_or(std::ptr::null());
76        let mut v: *mut fdb_sys::FDBDatabase = std::ptr::null_mut();
77        let err = unsafe { fdb_sys::fdb_create_database(path_ptr, &mut v) };
78        drop(path_str); // path_str own the CString that we are getting the ptr from
79        error::eval(err)?;
80        let ptr =
81            NonNull::new(v).expect("fdb_create_database to not return null if there is no error");
82        Ok(Self::new_from_pointer(ptr))
83    }
84
85    /// Create a new FDBDatabase from a raw pointer. Users are expected to use the `new` method.
86    pub fn new_from_pointer(ptr: NonNull<fdb_sys::FDBDatabase>) -> Self {
87        Self { inner: ptr }
88    }
89
90    /// Create a database for the given configuration path
91    pub fn from_path(path: &str) -> FdbResult<Database> {
92        Self::new(Some(path))
93    }
94
95    /// Create a database for the default configuration path
96    #[allow(clippy::should_implement_trait)]
97    pub fn default() -> FdbResult<Database> {
98        Self::new(None)
99    }
100}
101
#[cfg_api_versions(min = 710)]
#[cfg(feature = "tenant-experimental")]
impl Database {
    /// Opens the tenant named `tenant_name` on this database.
    ///
    /// The returned `FdbTenant` stores its own copy of `tenant_name`.
    ///
    /// # Errors
    ///
    /// Propagates the error produced by `error::eval` for the code returned by
    /// `fdb_database_open_tenant`.
    ///
    /// # Panics
    ///
    /// Panics if the length of `tenant_name` does not fit the integer type expected
    /// by the C API, or if the C API reports no error but leaves the pointer null.
    pub fn open_tenant(&self, tenant_name: &[u8]) -> FdbResult<FdbTenant> {
        let mut raw_tenant: *mut fdb_sys::FDB_tenant = std::ptr::null_mut();
        let code = unsafe {
            fdb_sys::fdb_database_open_tenant(
                self.inner.as_ptr(),
                tenant_name.as_ptr(),
                tenant_name.len().try_into().unwrap(),
                &mut raw_tenant,
            )
        };
        error::eval(code)?;

        let inner = NonNull::new(raw_tenant)
            .expect("fdb_database_open_tenant to not return null if there is no error");
        let name = tenant_name.to_owned();
        Ok(FdbTenant { inner, name })
    }
}
123
124#[cfg_api_versions(min = 730)]
125impl Database {
126    /// Retrieve a client-side status information in a JSON format.
127    pub fn get_client_status(
128        &self,
129    ) -> impl Future<Output = FdbResult<crate::future::FdbSlice>> + Send + Sync + Unpin {
130        crate::future::FdbFuture::new(unsafe {
131            fdb_sys::fdb_database_get_client_status(self.inner.as_ptr())
132        })
133    }
134}
135
136impl Database {
    /// Create a database for the given configuration path
    ///
    /// This is a compatibility api. If you only use API version ≥ 610 you should
    /// use `Database::new`, `Database::from_path` or `Database::default`.
    pub async fn new_compat(path: Option<&str>) -> FdbResult<Database> {
        // For API versions 510 to 600 the database is obtained through a
        // `Cluster`; for every other version this goes straight to `Database::new`.
        if_cfg_api_versions!(min = 510, max = 600 => {
            let cluster = crate::cluster::Cluster::new(path).await?;
            let database = cluster.create_database().await?;
            Ok(database)
        } else {
            Database::new(path)
        })
    }
150
151    /// Called to set an option an on `Database`.
152    pub fn set_option(&self, opt: options::DatabaseOption) -> FdbResult<()> {
153        unsafe { opt.apply(self.inner.as_ptr()) }
154    }
155
156    /// Creates a new transaction on the given database.
157    #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
158    pub fn create_trx(&self) -> FdbResult<Transaction> {
159        let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
160        let err =
161            unsafe { fdb_sys::fdb_database_create_transaction(self.inner.as_ptr(), &mut trx) };
162        error::eval(err)?;
163        Ok(Transaction::new(NonNull::new(trx).expect(
164            "fdb_database_create_transaction to not return null if there is no error",
165        )))
166    }
167
168    /// Creates a new transaction on the given database with metrics collection.
169    ///
170    /// This method is similar to `create_trx()` but additionally collects metrics about
171    /// the transaction execution, including operation counts, bytes read/written, and retry counts.
172    ///
173    /// # Arguments
174    /// * `metrics` - A TransactionMetrics instance to collect metrics
175    ///
176    /// # Returns
177    /// * `Result<Transaction, (FdbBindingError, MetricsData)>` - A transaction with metrics collection enabled
178    #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
179    pub fn create_instrumented_trx(
180        &self,
181        metrics: TransactionMetrics,
182    ) -> Result<Transaction, FdbBindingError> {
183        let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
184        let err =
185            unsafe { fdb_sys::fdb_database_create_transaction(self.inner.as_ptr(), &mut trx) };
186        error::eval(err)?;
187
188        let inner = NonNull::new(trx)
189            .expect("fdb_database_create_transaction to not return null if there is no error");
190        Ok(Transaction::new_instrumented(inner, metrics))
191    }
192
193    #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
194    fn create_retryable_trx(&self) -> FdbResult<RetryableTransaction> {
195        Ok(RetryableTransaction::new(self.create_trx()?))
196    }
197
198    /// Creates a new retryable transaction on the given database with metrics collection.
199    ///
200    /// This method is similar to `create_retryable_trx()` but additionally collects metrics about
201    /// the transaction execution, including operation counts, bytes read/written, and retry counts.
202    ///
203    /// # Arguments
204    /// * `metrics` - A TransactionMetrics instance to collect metrics
205    ///
206    /// # Returns
207    /// * `Result<RetryableTransaction, (FdbBindingError, MetricsData)>` - A retryable transaction with metrics collection enabled
208    #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
209    pub fn create_intrumented_retryable_trx(
210        &self,
211        metrics: TransactionMetrics,
212    ) -> Result<RetryableTransaction, FdbBindingError> {
213        Ok(RetryableTransaction::new(
214            self.create_instrumented_trx(metrics.clone())?,
215        ))
216    }
217
218    /// `transact` returns a future which retries on error. It tries to resolve a future created by
219    /// caller-provided function `f` inside a retry loop, providing it with a newly created
220    /// transaction. After caller-provided future resolves, the transaction will be committed
221    /// automatically.
222    ///
223    /// # Warning
224    ///
225    /// It might retry indefinitely if the transaction is highly contentious. It is recommended to
226    /// set `TransactionOption::RetryLimit` or `TransactionOption::SetTimeout` on the transaction
227    /// if the task need to be guaranteed to finish.
228    ///
229    /// Once [Generic Associated Types](https://github.com/rust-lang/rfcs/blob/master/text/1598-generic_associated_types.md)
230    /// lands in stable rust, the returned future of f won't need to be boxed anymore, also the
231    /// lifetime limitations around f might be lowered.
232    pub async fn transact<F>(&self, mut f: F, options: TransactOption) -> Result<F::Item, F::Error>
233    where
234        F: DatabaseTransact,
235    {
236        let is_idempotent = options.is_idempotent;
237        let time_out = options.time_out.map(|d| Instant::now() + d);
238        let retry_limit = options.retry_limit;
239        let mut tries: u32 = 0;
240        let mut trx = self.create_trx()?;
241        let mut can_retry = move || {
242            tries += 1;
243            retry_limit.map(|limit| tries < limit).unwrap_or(true)
244                && time_out.map(|t| Instant::now() < t).unwrap_or(true)
245        };
246        loop {
247            let r = f.transact(trx).await;
248            f = r.0;
249            trx = r.1;
250            trx = match r.2 {
251                Ok(item) => match trx.commit().await {
252                    Ok(_) => break Ok(item),
253                    Err(e) => {
254                        if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
255                            e.on_error().await?
256                        } else {
257                            break Err(F::Error::from(e.into()));
258                        }
259                    }
260                },
261                Err(user_err) => match user_err.try_into_fdb_error() {
262                    Ok(e) => {
263                        if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
264                            trx.on_error(e).await?
265                        } else {
266                            break Err(F::Error::from(e));
267                        }
268                    }
269                    Err(user_err) => break Err(user_err),
270                },
271            };
272        }
273    }
274
275    pub fn transact_boxed<'trx, F, D, T, E>(
276        &'trx self,
277        data: D,
278        f: F,
279        options: TransactOption,
280    ) -> impl Future<Output = Result<T, E>> + Send + 'trx
281    where
282        for<'a> F: FnMut(
283            &'a Transaction,
284            &'a mut D,
285        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
286        E: TransactError,
287        F: Send + 'trx,
288        T: Send + 'trx,
289        E: Send + 'trx,
290        D: Send + 'trx,
291    {
292        self.transact(
293            boxed::FnMutBoxed {
294                f,
295                d: data,
296                m: PhantomData,
297            },
298            options,
299        )
300    }
301
302    pub fn transact_boxed_local<'trx, F, D, T, E>(
303        &'trx self,
304        data: D,
305        f: F,
306        options: TransactOption,
307    ) -> impl Future<Output = Result<T, E>> + 'trx
308    where
309        for<'a> F:
310            FnMut(&'a Transaction, &'a mut D) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
311        E: TransactError,
312        F: 'trx,
313        T: 'trx,
314        E: 'trx,
315        D: 'trx,
316    {
317        self.transact(
318            boxed_local::FnMutBoxedLocal {
319                f,
320                d: data,
321                m: PhantomData,
322            },
323            options,
324        )
325    }
326
    /// Runs a transactional function against this Database with retry logic.
    /// The associated closure will be called until a non-retryable FDBError
    /// is thrown or commit() returns success.
    ///
    /// Users are **not** expected to keep reference to the `RetryableTransaction`. If a weak or strong
    /// reference is kept by the user, the binding will throw an error.
    ///
    /// # Warning: retry
    ///
    /// It might retry indefinitely if the transaction is highly contentious. It is recommended to
    /// set [`options::TransactionOption::RetryLimit`] or [`options::TransactionOption::Timeout`] on the transaction
    /// if the task needs to be guaranteed to finish. These options can be safely set on every iteration of the closure.
    ///
    /// # Warning: Maybe committed transactions
    ///
    /// As with other client/server databases, in some failure scenarios a client may be unable to determine
    /// whether a transaction succeeded. You should make sure your closure is idempotent.
    ///
    /// The closure will notify the user in case of a maybe_committed transaction in a previous run
    /// with the `MaybeCommitted` provided in the closure.
    ///
    /// This one can be used as boolean with
    /// ```ignore
    /// db.run(|trx, maybe_committed| async {
    ///     if maybe_committed.into() {
    ///         // Handle the problem if needed
    ///     }
    /// }).await;
    /// ```
    #[cfg_attr(
        feature = "trace",
        tracing::instrument(level = "debug", skip(self, closure))
    )]
    pub async fn run<F, Fut, T>(&self, closure: F) -> Result<T, FdbBindingError>
    where
        F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
        Fut: Future<Output = Result<T, FdbBindingError>>,
    {
        // Set from the most recent error; handed to the closure on the next attempt.
        let mut maybe_committed_transaction = false;
        // The transaction only needs to be created once: after an error, `on_error`
        // hands back the transaction to use for the next attempt.
        let mut transaction = self.create_retryable_trx()?;
        // Attempt counter, only used for the tracing events below.
        #[cfg(feature = "trace")]
        let mut iteration = 0;

        loop {
            #[cfg(feature = "trace")]
            {
                iteration += 1;
            }
            // executing the closure
            let result_closure = closure(
                transaction.clone(),
                MaybeCommitted(maybe_committed_transaction),
            )
            .await;

            if let Err(e) = result_closure {
                // checks if it is an FdbError
                if let Some(e) = e.get_fdb_error() {
                    maybe_committed_transaction = e.is_maybe_committed();
                    // The closure returned an FdbError: let `on_error` decide whether it can be retried.
                    match transaction.on_error(e).await {
                        // we can retry the error
                        Ok(Ok(t)) => {
                            #[cfg(feature = "trace")]
                            {
                                let error_code = e.code();
                                tracing::warn!(iteration, error_code, "restarting transaction");
                            }

                            transaction = t;
                            continue;
                        }
                        Ok(Err(non_retryable_error)) => {
                            return Err(FdbBindingError::from(non_retryable_error))
                        }
                        // The only FdbBindingError that can be thrown here is `ReferenceToTransactionKept`
                        Err(non_retryable_error) => return Err(non_retryable_error),
                    }
                }
                // Otherwise, it cannot be retried
                return Err(e);
            }

            #[cfg(feature = "trace")]
            tracing::info!(iteration, "closure executed, checking result...");

            // The closure succeeded: try to commit what it did.
            let commit_result = transaction.commit().await;

            match commit_result {
                // The only FdbBindingError that can be thrown here is `ReferenceToTransactionKept`
                Err(err) => {
                    #[cfg(feature = "trace")]
                    tracing::error!(
                        iteration,
                        "transaction reference kept, aborting transaction"
                    );
                    return Err(err);
                }
                // Commit succeeded: hand the closure's result back to the caller.
                Ok(Ok(_)) => {
                    #[cfg(feature = "trace")]
                    tracing::info!(iteration, "success, returning result");
                    return result_closure;
                }
                Ok(Err(transaction_commit_error)) => {
                    #[cfg(feature = "trace")]
                    let error_code = transaction_commit_error.code();

                    maybe_committed_transaction = transaction_commit_error.is_maybe_committed();
                    // we have an error during commit, checking if it is a retryable error
                    match transaction_commit_error.on_error().await {
                        Ok(t) => {
                            #[cfg(feature = "trace")]
                            tracing::warn!(iteration, error_code, "restarting transaction");

                            transaction = RetryableTransaction::new(t);
                            continue;
                        }
                        Err(non_retryable_error) => {
                            return Err(FdbBindingError::from(non_retryable_error))
                        }
                    }
                }
            }
        }
    }
454
455    /// Runs a transactional function against this Database with retry logic and metrics collection.
456    /// The associated closure will be called until a non-retryable FDBError
457    /// is thrown or commit() returns success.
458    ///
459    /// This method is similar to `run()` but additionally collects and returns metrics about
460    /// the transaction execution, including operation counts, bytes read/written, and retry counts.
461    ///
462    /// # Arguments
463    /// * `closure` - A function that takes a RetryableTransaction and MaybeCommitted flag and returns a Future
464    ///
465    /// # Returns
466    /// * `Result<(T, Metrics), (FdbBindingError, Metrics)>` - On success, returns the result of the transaction and collected metrics.
467    ///   On failure, returns the error and the metrics collected up to the point of failure.
468    ///
469    /// # Warning: retry
470    ///
471    /// It might retry indefinitely if the transaction is highly contentious. It is recommended to
472    /// set [`options::TransactionOption::RetryLimit`] or [`options::TransactionOption::Timeout`] on the transaction
473    /// if the task needs to be guaranteed to finish.
474    ///
475    /// # Warning: Maybe committed transactions
476    ///
477    /// As with other client/server databases, in some failure scenarios a client may be unable to determine
478    /// whether a transaction succeeded. The closure will be notified of a maybe_committed transaction
479    /// in a previous run with the `MaybeCommitted` provided in the closure.
480    #[cfg_attr(
481        feature = "trace",
482        tracing::instrument(level = "debug", skip(self, closure))
483    )]
484    pub async fn instrumented_run<F, Fut, T>(
485        &self,
486        closure: F,
487    ) -> Result<(T, MetricsReport), (FdbBindingError, MetricsReport)>
488    where
489        F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
490        Fut: Future<Output = Result<T, FdbBindingError>>,
491    {
492        let now_start = std::time::Instant::now();
493        let metrics = TransactionMetrics::new();
494        let mut maybe_committed_transaction = false;
495
496        // we just need to create the transaction once,
497        // in case there is a error, it will be reset automatically
498        let mut transaction = match self.create_intrumented_retryable_trx(metrics.clone()) {
499            Ok(trx) => trx,
500            Err(err) => {
501                // Update total execution time before returning
502                let total_duration = now_start.elapsed().as_millis() as u64;
503                metrics.set_execution_time(total_duration);
504                return Err((err, metrics.get_metrics_data()));
505            }
506        };
507
508        loop {
509            // executing the closure
510            let result_closure = closure(
511                transaction.clone(),
512                MaybeCommitted(maybe_committed_transaction),
513            )
514            .await;
515
516            if let Err(error) = result_closure {
517                if let Some(e) = error.get_fdb_error() {
518                    // checks if it is an FdbError
519                    maybe_committed_transaction = e.is_maybe_committed();
520                    // The closure returned an Error,
521                    let now_on_error = std::time::Instant::now();
522                    let on_error_result = transaction.on_error(e).await;
523                    let error_duration = now_on_error.elapsed().as_millis() as u64;
524                    metrics.add_error_time(error_duration);
525
526                    match on_error_result {
527                        // we can retry the error
528                        Ok(Ok(t)) => {
529                            transaction = t;
530                            // Use the original metrics instance to increment retry count
531                            metrics.reset_current();
532                            continue;
533                        }
534                        Ok(Err(non_retryable_error)) => {
535                            let total_duration = now_start.elapsed().as_millis() as u64;
536                            metrics.set_execution_time(total_duration);
537                            return Err((
538                                FdbBindingError::from(non_retryable_error),
539                                metrics.get_metrics_data(),
540                            ));
541                        }
542                        // The only FdbBindingError that can be thrown here is `ReferenceToTransactionKept`
543                        Err(non_retryable_error) => {
544                            let total_duration = now_start.elapsed().as_millis() as u64;
545                            metrics.set_execution_time(total_duration);
546                            return Err((non_retryable_error, metrics.get_metrics_data()));
547                        }
548                    }
549                }
550                // Otherwise, it cannot be retried
551                let total_duration = now_start.elapsed().as_millis() as u64;
552                metrics.set_execution_time(total_duration);
553                return Err((error, metrics.get_metrics_data()));
554            }
555
556            let now_commit = std::time::Instant::now();
557            let commit_result = transaction.commit().await;
558            let commit_duration = now_commit.elapsed().as_millis() as u64;
559            metrics.record_commit_time(commit_duration);
560
561            match commit_result {
562                // The only FdbBindingError that can be thrown here is `ReferenceToTransactionKept`
563                Err(err) => {
564                    let total_duration = now_start.elapsed().as_millis() as u64;
565                    metrics.set_execution_time(total_duration);
566                    return Err((err, metrics.get_metrics_data()));
567                }
568                Ok(Ok(committed)) => {
569                    // Handle committed_version() result properly to match our tuple-based error handling
570                    match committed.committed_version() {
571                        Ok(version) => metrics.set_commit_version(version),
572                        Err(_err) => {
573                            // If we can't get the commit version, we still want to return the result
574                            // but we'll log the error or handle it as needed
575                            // For now, we just continue without setting the commit version
576                        }
577                    }
578
579                    let total_duration = now_start.elapsed().as_millis() as u64;
580                    metrics.set_execution_time(total_duration);
581                    return Ok((result_closure.unwrap(), metrics.get_metrics_data()));
582                }
583                Ok(Err(transaction_commit_error)) => {
584                    maybe_committed_transaction = transaction_commit_error.is_maybe_committed();
585                    // we have an error during commit, checking if it is a retryable error
586                    let now_on_error = std::time::Instant::now();
587                    let on_error_result = transaction_commit_error.on_error().await;
588                    let error_duration = now_on_error.elapsed().as_millis() as u64;
589                    metrics.add_error_time(error_duration);
590
591                    match on_error_result {
592                        Ok(t) => {
593                            transaction = RetryableTransaction::new(t);
594                            // Use the original metrics instance for commit errors too
595                            metrics.reset_current();
596                            continue;
597                        }
598                        Err(non_retryable_error) => {
599                            let total_duration = now_start.elapsed().as_millis() as u64;
600                            metrics.set_execution_time(total_duration);
601                            return Err((
602                                FdbBindingError::from(non_retryable_error),
603                                metrics.get_metrics_data(),
604                            ));
605                        }
606                    }
607                }
608            }
609        }
610    }
611
612    /// Perform a no-op against FDB to check network thread liveness. This operation will not change the underlying data
613    /// in any way, nor will it perform any I/O against the FDB cluster. However, it will schedule some amount of work
614    /// onto the FDB client and wait for it to complete. The FoundationDB client operates by scheduling onto an event
615    /// queue that is then processed by a single thread (the "network thread"). This method can be used to determine if
616    /// the network thread has entered a state where it is no longer processing requests or if its time to process
617    /// requests has increased. If the network thread is busy, this operation may take some amount of time to complete,
618    /// which is why this operation returns a future.
619    pub async fn perform_no_op(&self) -> FdbResult<()> {
620        let trx = self.create_trx()?;
621
622        // Set the read version of the transaction, then read it back. This requires no I/O, but it does
623        // require the network thread be running. The exact value used for the read version is unimportant.
624        trx.set_read_version(42);
625        trx.get_read_version().await?;
626        Ok(())
627    }
628
629    /// Returns a value where 0 indicates that the client is idle and 1 (or larger) indicates that the client is saturated.
630    /// By default, this value is updated every second.
631    #[cfg_api_versions(min = 710)]
632    pub async fn get_main_thread_busyness(&self) -> FdbResult<f64> {
633        let busyness =
634            unsafe { fdb_sys::fdb_database_get_main_thread_busyness(self.inner.as_ptr()) };
635        Ok(busyness)
636    }
637}
/// A unit of transactional work that `Database::transact` can run and retry.
///
/// `transact` consumes both the implementor and the transaction, and its future
/// hands both back together with the outcome, so that `Database::transact` can
/// reuse them for the commit or for another attempt.
pub trait DatabaseTransact: Sized {
    /// Value produced when the work succeeds.
    type Item;
    /// Error produced when the work fails.
    type Error: TransactError;
    /// Future returned by `transact`; resolves to the implementor, the transaction and the outcome.
    type Future: Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)>;
    /// Runs the work against `trx`.
    fn transact(self, trx: Transaction) -> Self::Future;
}
644
645#[allow(clippy::needless_lifetimes)]
646#[allow(clippy::type_complexity)]
647mod boxed {
648    use super::*;
649
650    async fn boxed_data_fut<'t, F, T, E, D>(
651        mut f: FnMutBoxed<'t, F, D>,
652        trx: Transaction,
653    ) -> (FnMutBoxed<'t, F, D>, Transaction, Result<T, E>)
654    where
655        F: for<'a> FnMut(
656            &'a Transaction,
657            &'a mut D,
658        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
659        E: TransactError,
660    {
661        let r = (f.f)(&trx, &mut f.d).await;
662        (f, trx, r)
663    }
664
665    pub struct FnMutBoxed<'t, F, D> {
666        pub f: F,
667        pub d: D,
668        pub m: PhantomData<&'t ()>,
669    }
670    impl<'t, F, T, E, D> DatabaseTransact for FnMutBoxed<'t, F, D>
671    where
672        F: for<'a> FnMut(
673            &'a Transaction,
674            &'a mut D,
675        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
676        F: 't + Send,
677        T: 't,
678        E: 't,
679        D: 't + Send,
680        E: TransactError,
681    {
682        type Item = T;
683        type Error = E;
684        type Future = Pin<
685            Box<
686                dyn Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)>
687                    + Send
688                    + 't,
689            >,
690        >;
691
692        fn transact(self, trx: Transaction) -> Self::Future {
693            boxed_data_fut(self, trx).boxed()
694        }
695    }
696}
697
698#[allow(clippy::needless_lifetimes)]
699#[allow(clippy::type_complexity)]
700mod boxed_local {
701    use super::*;
702
703    async fn boxed_local_data_fut<'t, F, T, E, D>(
704        mut f: FnMutBoxedLocal<'t, F, D>,
705        trx: Transaction,
706    ) -> (FnMutBoxedLocal<'t, F, D>, Transaction, Result<T, E>)
707    where
708        F: for<'a> FnMut(
709            &'a Transaction,
710            &'a mut D,
711        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
712        E: TransactError,
713    {
714        let r = (f.f)(&trx, &mut f.d).await;
715        (f, trx, r)
716    }
717
718    pub struct FnMutBoxedLocal<'t, F, D> {
719        pub f: F,
720        pub d: D,
721        pub m: PhantomData<&'t ()>,
722    }
723    impl<'t, F, T, E, D> DatabaseTransact for FnMutBoxedLocal<'t, F, D>
724    where
725        F: for<'a> FnMut(
726            &'a Transaction,
727            &'a mut D,
728        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
729        F: 't,
730        T: 't,
731        E: 't,
732        D: 't,
733        E: TransactError,
734    {
735        type Item = T;
736        type Error = E;
737        type Future = Pin<
738            Box<dyn Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)> + 't>,
739        >;
740
741        fn transact(self, trx: Transaction) -> Self::Future {
742            boxed_local_data_fut(self, trx).boxed_local()
743        }
744    }
745}
746
/// A trait that application error types must implement to be used with `Database::transact`.
///
/// `Database::transact` uses it to find out whether an error returned by the user code
/// is an `FdbError`, in which case it may be retried.
pub trait TransactError: From<FdbError> {
    /// Converts the error into an `FdbError` if it is one; otherwise gives the
    /// error back unchanged in `Err`.
    fn try_into_fdb_error(self) -> Result<FdbError, Self>;
}
// Blanket implementation for every error type that can be built from an `FdbError`
// and whose failed `TryInto<FdbError>` conversion returns the original error.
impl<T> TransactError for T
where
    T: From<FdbError> + TryInto<FdbError, Error = T>,
{
    fn try_into_fdb_error(self) -> Result<FdbError, Self> {
        self.try_into()
    }
}
// An `FdbError` is trivially convertible to itself.
impl TransactError for FdbError {
    fn try_into_fdb_error(self) -> Result<FdbError, Self> {
        Ok(self)
    }
}
764
/// Options controlling how `Database::transact` retries.
#[derive(Default, Clone)]
pub struct TransactOption {
    /// Bound checked against the attempt counter before each retry; `None` means no bound.
    pub retry_limit: Option<u32>,
    /// Time budget measured from the start of `Database::transact`; `None` means no deadline.
    pub time_out: Option<Duration>,
    /// When `true`, errors that may have been committed are retried as well.
    pub is_idempotent: bool,
}

impl TransactOption {
    /// Options for an idempotent transaction: no retry limit, no time-out.
    pub fn idempotent() -> Self {
        TransactOption {
            retry_limit: None,
            time_out: None,
            is_idempotent: true,
        }
    }
}