foundationdb/
database.rs

1// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors
2// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors.
3//
4// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
5// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
6// http://opensource.org/licenses/MIT>, at your option. This file may not be
7// copied, modified, or distributed except according to those terms.
8
9//! Implementations of the FDBDatabase C API
10//!
11//! <https://apple.github.io/foundationdb/api-c.html#database>
12
13use std::convert::TryInto;
14use std::marker::PhantomData;
15use std::pin::Pin;
16use std::ptr::NonNull;
17use std::time::{Duration, Instant};
18
19use fdb_sys::if_cfg_api_versions;
20use foundationdb_macros::cfg_api_versions;
21use foundationdb_sys as fdb_sys;
22
23use crate::metrics::{MetricsReport, TransactionMetrics};
24use crate::options;
25use crate::transaction::*;
26use crate::{error, FdbError, FdbResult};
27
28use crate::error::FdbBindingError;
29#[cfg_api_versions(min = 710)]
30#[cfg(feature = "tenant-experimental")]
31use crate::tenant::FdbTenant;
32use futures::prelude::*;
33
34/// Wrapper around the boolean representing whether the
35/// previous transaction is still on fly
36/// This wrapper prevents the boolean to be copy and force it
37/// to be moved instead.
38/// This pretty handy when you don't want to see the `Database::run` closure
39/// capturing the environment.
40pub struct MaybeCommitted(bool);
41
42impl From<MaybeCommitted> for bool {
43    fn from(value: MaybeCommitted) -> Self {
44        value.0
45    }
46}
47
48/// Represents a FoundationDB database
49///
50/// A mutable, lexicographically ordered mapping from binary keys to binary values.
51///
52/// Modifications to a database are performed via transactions.
53pub struct Database {
54    pub(crate) inner: NonNull<fdb_sys::FDBDatabase>,
55}
56unsafe impl Send for Database {}
57unsafe impl Sync for Database {}
58impl Drop for Database {
59    fn drop(&mut self) {
60        unsafe {
61            fdb_sys::fdb_database_destroy(self.inner.as_ptr());
62        }
63    }
64}
65
66#[cfg_api_versions(min = 610)]
67impl Database {
68    /// Create a database for the given configuration path if any, or the default one.
69    pub fn new(path: Option<&str>) -> FdbResult<Database> {
70        let path_str =
71            path.map(|path| std::ffi::CString::new(path).expect("path to be convertible to CStr"));
72        let path_ptr = path_str
73            .as_ref()
74            .map(|path| path.as_ptr())
75            .unwrap_or(std::ptr::null());
76        let mut v: *mut fdb_sys::FDBDatabase = std::ptr::null_mut();
77        let err = unsafe { fdb_sys::fdb_create_database(path_ptr, &mut v) };
78        drop(path_str); // path_str own the CString that we are getting the ptr from
79        error::eval(err)?;
80        let ptr =
81            NonNull::new(v).expect("fdb_create_database to not return null if there is no error");
82        // Safe because the database is constructed in this scope and we know it's
83        // a valid pointer.
84        Ok(unsafe { Self::new_from_pointer(ptr) })
85    }
86
87    /// Create a new FDBDatabase from a raw pointer. Users are expected to use the `new` method.
88    pub unsafe fn new_from_pointer(ptr: NonNull<fdb_sys::FDBDatabase>) -> Self {
89        Self { inner: ptr }
90    }
91
92    /// Create a database for the given configuration path
93    pub fn from_path(path: &str) -> FdbResult<Database> {
94        Self::new(Some(path))
95    }
96
97    /// Create a database for the default configuration path
98    #[allow(clippy::should_implement_trait)]
99    pub fn default() -> FdbResult<Database> {
100        Self::new(None)
101    }
102}
103
104#[cfg_api_versions(min = 710)]
105#[cfg(feature = "tenant-experimental")]
106impl Database {
107    pub fn open_tenant(&self, tenant_name: &[u8]) -> FdbResult<FdbTenant> {
108        let mut ptr: *mut fdb_sys::FDB_tenant = std::ptr::null_mut();
109        let err = unsafe {
110            fdb_sys::fdb_database_open_tenant(
111                self.inner.as_ptr(),
112                tenant_name.as_ptr(),
113                tenant_name.len().try_into().unwrap(),
114                &mut ptr,
115            )
116        };
117        error::eval(err)?;
118        Ok(FdbTenant {
119            inner: NonNull::new(ptr)
120                .expect("fdb_database_open_tenant to not return null if there is no error"),
121            name: tenant_name.to_owned(),
122        })
123    }
124}
125
126#[cfg_api_versions(min = 730)]
127impl Database {
128    /// Retrieve a client-side status information in a JSON format.
129    pub fn get_client_status(
130        &self,
131    ) -> impl Future<Output = FdbResult<crate::future::FdbSlice>> + Send + Sync + Unpin {
132        crate::future::FdbFuture::new(unsafe {
133            fdb_sys::fdb_database_get_client_status(self.inner.as_ptr())
134        })
135    }
136}
137
138impl Database {
139    /// Create a database for the given configuration path
140    ///
141    /// This is a compatibility api. If you only use API version ≥ 610 you should
142    /// use `Database::new`, `Database::from_path` or  `Database::default`.
143    pub async fn new_compat(path: Option<&str>) -> FdbResult<Database> {
144        if_cfg_api_versions!(min = 510, max = 600 => {
145            let cluster = crate::cluster::Cluster::new(path).await?;
146            let database = cluster.create_database().await?;
147            Ok(database)
148        } else {
149            Database::new(path)
150        })
151    }
152
153    /// Called to set an option an on `Database`.
154    pub fn set_option(&self, opt: options::DatabaseOption) -> FdbResult<()> {
155        unsafe { opt.apply(self.inner.as_ptr()) }
156    }
157
158    /// Creates a new transaction on the given database.
159    #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
160    pub fn create_trx(&self) -> FdbResult<Transaction> {
161        let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
162        let err =
163            unsafe { fdb_sys::fdb_database_create_transaction(self.inner.as_ptr(), &mut trx) };
164        error::eval(err)?;
165        Ok(Transaction::new(NonNull::new(trx).expect(
166            "fdb_database_create_transaction to not return null if there is no error",
167        )))
168    }
169
170    /// Creates a new transaction on the given database with metrics collection.
171    ///
172    /// This method is similar to `create_trx()` but additionally collects metrics about
173    /// the transaction execution, including operation counts, bytes read/written, and retry counts.
174    ///
175    /// # Arguments
176    /// * `metrics` - A TransactionMetrics instance to collect metrics
177    ///
178    /// # Returns
179    /// * `Result<Transaction, (FdbBindingError, MetricsData)>` - A transaction with metrics collection enabled
180    #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
181    pub fn create_instrumented_trx(
182        &self,
183        metrics: TransactionMetrics,
184    ) -> Result<Transaction, FdbBindingError> {
185        let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
186        let err =
187            unsafe { fdb_sys::fdb_database_create_transaction(self.inner.as_ptr(), &mut trx) };
188        error::eval(err)?;
189
190        let inner = NonNull::new(trx)
191            .expect("fdb_database_create_transaction to not return null if there is no error");
192        Ok(Transaction::new_instrumented(inner, metrics))
193    }
194
195    #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
196    fn create_retryable_trx(&self) -> FdbResult<RetryableTransaction> {
197        Ok(RetryableTransaction::new(self.create_trx()?))
198    }
199
200    /// Creates a new retryable transaction on the given database with metrics collection.
201    ///
202    /// This method is similar to `create_retryable_trx()` but additionally collects metrics about
203    /// the transaction execution, including operation counts, bytes read/written, and retry counts.
204    ///
205    /// # Arguments
206    /// * `metrics` - A TransactionMetrics instance to collect metrics
207    ///
208    /// # Returns
209    /// * `Result<RetryableTransaction, (FdbBindingError, MetricsData)>` - A retryable transaction with metrics collection enabled
210    #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
211    pub fn create_intrumented_retryable_trx(
212        &self,
213        metrics: TransactionMetrics,
214    ) -> Result<RetryableTransaction, FdbBindingError> {
215        Ok(RetryableTransaction::new(
216            self.create_instrumented_trx(metrics.clone())?,
217        ))
218    }
219
220    /// `transact` returns a future which retries on error. It tries to resolve a future created by
221    /// caller-provided function `f` inside a retry loop, providing it with a newly created
222    /// transaction. After caller-provided future resolves, the transaction will be committed
223    /// automatically.
224    ///
225    /// # Warning
226    ///
227    /// It might retry indefinitely if the transaction is highly contentious. It is recommended to
228    /// set `TransactionOption::RetryLimit` or `TransactionOption::SetTimeout` on the transaction
229    /// if the task need to be guaranteed to finish.
230    ///
231    /// Once [Generic Associated Types](https://github.com/rust-lang/rfcs/blob/master/text/1598-generic_associated_types.md)
232    /// lands in stable rust, the returned future of f won't need to be boxed anymore, also the
233    /// lifetime limitations around f might be lowered.
234    pub async fn transact<F>(&self, mut f: F, options: TransactOption) -> Result<F::Item, F::Error>
235    where
236        F: DatabaseTransact,
237    {
238        let is_idempotent = options.is_idempotent;
239        let time_out = options.time_out.map(|d| Instant::now() + d);
240        let retry_limit = options.retry_limit;
241        let mut tries: u32 = 0;
242        let mut trx = self.create_trx()?;
243        let mut can_retry = move || {
244            tries += 1;
245            retry_limit.map(|limit| tries < limit).unwrap_or(true)
246                && time_out.map(|t| Instant::now() < t).unwrap_or(true)
247        };
248        loop {
249            let r = f.transact(trx).await;
250            f = r.0;
251            trx = r.1;
252            trx = match r.2 {
253                Ok(item) => match trx.commit().await {
254                    Ok(_) => break Ok(item),
255                    Err(e) => {
256                        if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
257                            e.on_error().await?
258                        } else {
259                            break Err(F::Error::from(e.into()));
260                        }
261                    }
262                },
263                Err(user_err) => match user_err.try_into_fdb_error() {
264                    Ok(e) => {
265                        if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
266                            trx.on_error(e).await?
267                        } else {
268                            break Err(F::Error::from(e));
269                        }
270                    }
271                    Err(user_err) => break Err(user_err),
272                },
273            };
274        }
275    }
276
277    pub fn transact_boxed<'trx, F, D, T, E>(
278        &'trx self,
279        data: D,
280        f: F,
281        options: TransactOption,
282    ) -> impl Future<Output = Result<T, E>> + Send + 'trx
283    where
284        for<'a> F: FnMut(
285            &'a Transaction,
286            &'a mut D,
287        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
288        E: TransactError,
289        F: Send + 'trx,
290        T: Send + 'trx,
291        E: Send + 'trx,
292        D: Send + 'trx,
293    {
294        self.transact(
295            boxed::FnMutBoxed {
296                f,
297                d: data,
298                m: PhantomData,
299            },
300            options,
301        )
302    }
303
304    pub fn transact_boxed_local<'trx, F, D, T, E>(
305        &'trx self,
306        data: D,
307        f: F,
308        options: TransactOption,
309    ) -> impl Future<Output = Result<T, E>> + 'trx
310    where
311        for<'a> F:
312            FnMut(&'a Transaction, &'a mut D) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
313        E: TransactError,
314        F: 'trx,
315        T: 'trx,
316        E: 'trx,
317        D: 'trx,
318    {
319        self.transact(
320            boxed_local::FnMutBoxedLocal {
321                f,
322                d: data,
323                m: PhantomData,
324            },
325            options,
326        )
327    }
328
329    /// Runs a transactional function against this Database with retry logic.
330    /// The associated closure will be called until a non-retryable FDBError
331    /// is thrown or commit(), returns success.
332    ///
333    /// Users are **not** expected to keep reference to the `RetryableTransaction`. If a weak or strong
334    /// reference is kept by the user, the binding will throw an error.
335    ///
336    /// # Warning: retry
337    ///
338    /// It might retry indefinitely if the transaction is highly contentious. It is recommended to
339    /// set [`options::TransactionOption::RetryLimit`] or [`options::TransactionOption::Timeout`] on the transaction
340    /// if the task needs to be guaranteed to finish. These options can be safely set on every iteration of the closure.
341    ///
342    /// # Warning: Maybe committed transactions
343    ///
344    /// As with other client/server databases, in some failure scenarios a client may be unable to determine
345    /// whether a transaction succeeded. You should make sure your closure is idempotent.
346    ///
347    /// The closure will notify the user in case of a maybe_committed transaction in a previous run
348    ///  with the `MaybeCommitted` provided in the closure.
349    ///
350    /// This one can be used as boolean with
351    /// ```ignore
352    /// db.run(|trx, maybe_committed| async {
353    ///     if maybe_committed.into() {
354    ///         // Handle the problem if needed
355    ///     }
356    /// }).await;
357    ///```
358    #[cfg_attr(
359        feature = "trace",
360        tracing::instrument(level = "debug", skip(self, closure))
361    )]
362    pub async fn run<F, Fut, T>(&self, closure: F) -> Result<T, FdbBindingError>
363    where
364        F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
365        Fut: Future<Output = Result<T, FdbBindingError>>,
366    {
367        let mut maybe_committed_transaction = false;
368        // we just need to create the transaction once,
369        // in case there is an error; it will be reset automatically
370        let mut transaction = self.create_retryable_trx()?;
371        #[cfg(feature = "trace")]
372        let mut iteration = 0;
373
374        loop {
375            #[cfg(feature = "trace")]
376            {
377                iteration += 1;
378            }
379            // executing the closure
380            let result_closure = closure(
381                transaction.clone(),
382                MaybeCommitted(maybe_committed_transaction),
383            )
384            .await;
385
386            if let Err(e) = result_closure {
387                // checks if it is an FdbError
388                if let Some(e) = e.get_fdb_error() {
389                    maybe_committed_transaction = e.is_maybe_committed();
390                    // The closure returned an Error,
391                    match transaction.on_error(e).await {
392                        // we can retry the error
393                        Ok(Ok(t)) => {
394                            #[cfg(feature = "trace")]
395                            {
396                                let error_code = e.code();
397                                tracing::warn!(iteration, error_code, "restarting transaction");
398                            }
399
400                            transaction = t;
401                            continue;
402                        }
403                        Ok(Err(non_retryable_error)) => {
404                            return Err(FdbBindingError::from(non_retryable_error))
405                        }
406                        // The only FdbBindingError that can be thrown here is `ReferenceToTransactionKept`
407                        Err(non_retryable_error) => return Err(non_retryable_error),
408                    }
409                }
410                // Otherwise, it cannot be retried
411                return Err(e);
412            }
413
414            #[cfg(feature = "trace")]
415            tracing::info!(iteration, "closure executed, checking result...");
416
417            let commit_result = transaction.commit().await;
418
419            match commit_result {
420                // The only FdbBindingError that can be thrown here is `ReferenceToTransactionKept`
421                Err(err) => {
422                    #[cfg(feature = "trace")]
423                    tracing::error!(
424                        iteration,
425                        "transaction reference kept, aborting transaction"
426                    );
427                    return Err(err);
428                }
429                Ok(Ok(_)) => {
430                    #[cfg(feature = "trace")]
431                    tracing::info!(iteration, "success, returning result");
432                    return result_closure;
433                }
434                Ok(Err(transaction_commit_error)) => {
435                    #[cfg(feature = "trace")]
436                    let error_code = transaction_commit_error.code();
437
438                    maybe_committed_transaction = transaction_commit_error.is_maybe_committed();
439                    // we have an error during commit, checking if it is a retryable error
440                    match transaction_commit_error.on_error().await {
441                        Ok(t) => {
442                            #[cfg(feature = "trace")]
443                            tracing::warn!(iteration, error_code, "restarting transaction");
444
445                            transaction = RetryableTransaction::new(t);
446                            continue;
447                        }
448                        Err(non_retryable_error) => {
449                            return Err(FdbBindingError::from(non_retryable_error))
450                        }
451                    }
452                }
453            }
454        }
455    }
456
457    /// Runs a transactional function against this Database with retry logic and metrics collection.
458    /// The associated closure will be called until a non-retryable FDBError
459    /// is thrown or commit() returns success.
460    ///
461    /// This method is similar to `run()` but additionally collects and returns metrics about
462    /// the transaction execution, including operation counts, bytes read/written, and retry counts.
463    ///
464    /// # Arguments
465    /// * `closure` - A function that takes a RetryableTransaction and MaybeCommitted flag and returns a Future
466    ///
467    /// # Returns
468    /// * `Result<(T, Metrics), (FdbBindingError, Metrics)>` - On success, returns the result of the transaction and collected metrics.
469    ///   On failure, returns the error and the metrics collected up to the point of failure.
470    ///
471    /// # Warning: retry
472    ///
473    /// It might retry indefinitely if the transaction is highly contentious. It is recommended to
474    /// set [`options::TransactionOption::RetryLimit`] or [`options::TransactionOption::Timeout`] on the transaction
475    /// if the task needs to be guaranteed to finish.
476    ///
477    /// # Warning: Maybe committed transactions
478    ///
479    /// As with other client/server databases, in some failure scenarios a client may be unable to determine
480    /// whether a transaction succeeded. The closure will be notified of a maybe_committed transaction
481    /// in a previous run with the `MaybeCommitted` provided in the closure.
482    #[cfg_attr(
483        feature = "trace",
484        tracing::instrument(level = "debug", skip(self, closure))
485    )]
486    pub async fn instrumented_run<F, Fut, T>(
487        &self,
488        closure: F,
489    ) -> Result<(T, MetricsReport), (FdbBindingError, MetricsReport)>
490    where
491        F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
492        Fut: Future<Output = Result<T, FdbBindingError>>,
493    {
494        let now_start = std::time::Instant::now();
495        let metrics = TransactionMetrics::new();
496        let mut maybe_committed_transaction = false;
497
498        // we just need to create the transaction once,
499        // in case there is a error, it will be reset automatically
500        let mut transaction = match self.create_intrumented_retryable_trx(metrics.clone()) {
501            Ok(trx) => trx,
502            Err(err) => {
503                // Update total execution time before returning
504                let total_duration = now_start.elapsed().as_millis() as u64;
505                metrics.set_execution_time(total_duration);
506                return Err((err, metrics.get_metrics_data()));
507            }
508        };
509
510        loop {
511            // executing the closure
512            let result_closure = closure(
513                transaction.clone(),
514                MaybeCommitted(maybe_committed_transaction),
515            )
516            .await;
517
518            if let Err(error) = result_closure {
519                if let Some(e) = error.get_fdb_error() {
520                    // checks if it is an FdbError
521                    maybe_committed_transaction = e.is_maybe_committed();
522                    // The closure returned an Error,
523                    let now_on_error = std::time::Instant::now();
524                    let on_error_result = transaction.on_error(e).await;
525                    let error_duration = now_on_error.elapsed().as_millis() as u64;
526                    metrics.add_error_time(error_duration);
527
528                    match on_error_result {
529                        // we can retry the error
530                        Ok(Ok(t)) => {
531                            transaction = t;
532                            // Use the original metrics instance to increment retry count
533                            metrics.reset_current();
534                            continue;
535                        }
536                        Ok(Err(non_retryable_error)) => {
537                            let total_duration = now_start.elapsed().as_millis() as u64;
538                            metrics.set_execution_time(total_duration);
539                            return Err((
540                                FdbBindingError::from(non_retryable_error),
541                                metrics.get_metrics_data(),
542                            ));
543                        }
544                        // The only FdbBindingError that can be thrown here is `ReferenceToTransactionKept`
545                        Err(non_retryable_error) => {
546                            let total_duration = now_start.elapsed().as_millis() as u64;
547                            metrics.set_execution_time(total_duration);
548                            return Err((non_retryable_error, metrics.get_metrics_data()));
549                        }
550                    }
551                }
552                // Otherwise, it cannot be retried
553                let total_duration = now_start.elapsed().as_millis() as u64;
554                metrics.set_execution_time(total_duration);
555                return Err((error, metrics.get_metrics_data()));
556            }
557
558            let now_commit = std::time::Instant::now();
559            let commit_result = transaction.commit().await;
560            let commit_duration = now_commit.elapsed().as_millis() as u64;
561            metrics.record_commit_time(commit_duration);
562
563            match commit_result {
564                // The only FdbBindingError that can be thrown here is `ReferenceToTransactionKept`
565                Err(err) => {
566                    let total_duration = now_start.elapsed().as_millis() as u64;
567                    metrics.set_execution_time(total_duration);
568                    return Err((err, metrics.get_metrics_data()));
569                }
570                Ok(Ok(committed)) => {
571                    // Handle committed_version() result properly to match our tuple-based error handling
572                    match committed.committed_version() {
573                        Ok(version) => metrics.set_commit_version(version),
574                        Err(_err) => {
575                            // If we can't get the commit version, we still want to return the result
576                            // but we'll log the error or handle it as needed
577                            // For now, we just continue without setting the commit version
578                        }
579                    }
580
581                    let total_duration = now_start.elapsed().as_millis() as u64;
582                    metrics.set_execution_time(total_duration);
583                    return Ok((result_closure.unwrap(), metrics.get_metrics_data()));
584                }
585                Ok(Err(transaction_commit_error)) => {
586                    maybe_committed_transaction = transaction_commit_error.is_maybe_committed();
587                    // we have an error during commit, checking if it is a retryable error
588                    let now_on_error = std::time::Instant::now();
589                    let on_error_result = transaction_commit_error.on_error().await;
590                    let error_duration = now_on_error.elapsed().as_millis() as u64;
591                    metrics.add_error_time(error_duration);
592
593                    match on_error_result {
594                        Ok(t) => {
595                            transaction = RetryableTransaction::new(t);
596                            // Use the original metrics instance for commit errors too
597                            metrics.reset_current();
598                            continue;
599                        }
600                        Err(non_retryable_error) => {
601                            let total_duration = now_start.elapsed().as_millis() as u64;
602                            metrics.set_execution_time(total_duration);
603                            return Err((
604                                FdbBindingError::from(non_retryable_error),
605                                metrics.get_metrics_data(),
606                            ));
607                        }
608                    }
609                }
610            }
611        }
612    }
613
614    /// Perform a no-op against FDB to check network thread liveness. This operation will not change the underlying data
615    /// in any way, nor will it perform any I/O against the FDB cluster. However, it will schedule some amount of work
616    /// onto the FDB client and wait for it to complete. The FoundationDB client operates by scheduling onto an event
617    /// queue that is then processed by a single thread (the "network thread"). This method can be used to determine if
618    /// the network thread has entered a state where it is no longer processing requests or if its time to process
619    /// requests has increased. If the network thread is busy, this operation may take some amount of time to complete,
620    /// which is why this operation returns a future.
621    pub async fn perform_no_op(&self) -> FdbResult<()> {
622        let trx = self.create_trx()?;
623
624        // Set the read version of the transaction, then read it back. This requires no I/O, but it does
625        // require the network thread be running. The exact value used for the read version is unimportant.
626        trx.set_read_version(42);
627        trx.get_read_version().await?;
628        Ok(())
629    }
630
631    /// Returns a value where 0 indicates that the client is idle and 1 (or larger) indicates that the client is saturated.
632    /// By default, this value is updated every second.
633    #[cfg_api_versions(min = 710)]
634    pub async fn get_main_thread_busyness(&self) -> FdbResult<f64> {
635        let busyness =
636            unsafe { fdb_sys::fdb_database_get_main_thread_busyness(self.inner.as_ptr()) };
637        Ok(busyness)
638    }
639}
640pub trait DatabaseTransact: Sized {
641    type Item;
642    type Error: TransactError;
643    type Future: Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)>;
644    fn transact(self, trx: Transaction) -> Self::Future;
645}
646
647#[allow(clippy::needless_lifetimes)]
648#[allow(clippy::type_complexity)]
649mod boxed {
650    use super::*;
651
652    async fn boxed_data_fut<'t, F, T, E, D>(
653        mut f: FnMutBoxed<'t, F, D>,
654        trx: Transaction,
655    ) -> (FnMutBoxed<'t, F, D>, Transaction, Result<T, E>)
656    where
657        F: for<'a> FnMut(
658            &'a Transaction,
659            &'a mut D,
660        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
661        E: TransactError,
662    {
663        let r = (f.f)(&trx, &mut f.d).await;
664        (f, trx, r)
665    }
666
667    pub struct FnMutBoxed<'t, F, D> {
668        pub f: F,
669        pub d: D,
670        pub m: PhantomData<&'t ()>,
671    }
672    impl<'t, F, T, E, D> DatabaseTransact for FnMutBoxed<'t, F, D>
673    where
674        F: for<'a> FnMut(
675            &'a Transaction,
676            &'a mut D,
677        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
678        F: 't + Send,
679        T: 't,
680        E: 't,
681        D: 't + Send,
682        E: TransactError,
683    {
684        type Item = T;
685        type Error = E;
686        type Future = Pin<
687            Box<
688                dyn Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)>
689                    + Send
690                    + 't,
691            >,
692        >;
693
694        fn transact(self, trx: Transaction) -> Self::Future {
695            boxed_data_fut(self, trx).boxed()
696        }
697    }
698}
699
700#[allow(clippy::needless_lifetimes)]
701#[allow(clippy::type_complexity)]
702mod boxed_local {
703    use super::*;
704
705    async fn boxed_local_data_fut<'t, F, T, E, D>(
706        mut f: FnMutBoxedLocal<'t, F, D>,
707        trx: Transaction,
708    ) -> (FnMutBoxedLocal<'t, F, D>, Transaction, Result<T, E>)
709    where
710        F: for<'a> FnMut(
711            &'a Transaction,
712            &'a mut D,
713        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
714        E: TransactError,
715    {
716        let r = (f.f)(&trx, &mut f.d).await;
717        (f, trx, r)
718    }
719
720    pub struct FnMutBoxedLocal<'t, F, D> {
721        pub f: F,
722        pub d: D,
723        pub m: PhantomData<&'t ()>,
724    }
725    impl<'t, F, T, E, D> DatabaseTransact for FnMutBoxedLocal<'t, F, D>
726    where
727        F: for<'a> FnMut(
728            &'a Transaction,
729            &'a mut D,
730        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
731        F: 't,
732        T: 't,
733        E: 't,
734        D: 't,
735        E: TransactError,
736    {
737        type Item = T;
738        type Error = E;
739        type Future = Pin<
740            Box<dyn Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)> + 't>,
741        >;
742
743        fn transact(self, trx: Transaction) -> Self::Future {
744            boxed_local_data_fut(self, trx).boxed_local()
745        }
746    }
747}
748
749/// A trait that must be implemented to use `Database::transact` this application error types.
750pub trait TransactError: From<FdbError> {
751    fn try_into_fdb_error(self) -> Result<FdbError, Self>;
752}
753impl<T> TransactError for T
754where
755    T: From<FdbError> + TryInto<FdbError, Error = T>,
756{
757    fn try_into_fdb_error(self) -> Result<FdbError, Self> {
758        self.try_into()
759    }
760}
761impl TransactError for FdbError {
762    fn try_into_fdb_error(self) -> Result<FdbError, Self> {
763        Ok(self)
764    }
765}
766
767/// A set of options that controls the behavior of `Database::transact`.
768#[derive(Default, Clone)]
769pub struct TransactOption {
770    pub retry_limit: Option<u32>,
771    pub time_out: Option<Duration>,
772    pub is_idempotent: bool,
773}
774
775impl TransactOption {
776    /// An idempotent TransactOption
777    pub fn idempotent() -> Self {
778        Self {
779            is_idempotent: true,
780            ..TransactOption::default()
781        }
782    }
783}