foundationdb/
database.rs

1// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors
2// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors.
3//
4// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
5// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
6// http://opensource.org/licenses/MIT>, at your option. This file may not be
7// copied, modified, or distributed except according to those terms.
8
9//! Implementations of the FDBDatabase C API
10//!
11//! <https://apple.github.io/foundationdb/api-c.html#database>
12
13use std::convert::TryInto;
14use std::marker::PhantomData;
15use std::pin::Pin;
16use std::ptr::NonNull;
17use std::time::{Duration, Instant};
18
19use fdb_sys::if_cfg_api_versions;
20use foundationdb_macros::cfg_api_versions;
21use foundationdb_sys as fdb_sys;
22
23use crate::metrics::{MetricsReport, TransactionMetrics};
24use crate::options;
25use crate::transaction::*;
26use crate::{error, FdbError, FdbResult};
27
28use crate::error::FdbBindingError;
29#[cfg_api_versions(min = 710)]
30#[cfg(feature = "tenant-experimental")]
31use crate::tenant::FdbTenant;
32use futures::prelude::*;
33
/// Wrapper around the boolean telling whether the previous attempt of the
/// transaction may have been committed (i.e. its outcome is unknown).
///
/// The wrapper is deliberately neither `Copy` nor `Clone`: the flag has to be
/// moved instead of copied. This is pretty handy when you don't want the
/// `Database::run` closure to capture it from the environment.
///
/// Use `bool::from(flag)` or `flag.into()` to get the plain boolean back.
#[derive(Debug)]
pub struct MaybeCommitted(bool);

impl From<MaybeCommitted> for bool {
    /// Consumes the wrapper and returns the inner flag.
    fn from(value: MaybeCommitted) -> Self {
        value.0
    }
}
47
/// Lifecycle hooks for the transaction retry runner.
///
/// All methods have default no-op implementations, so callers only override what they need.
/// This trait enables a single internal retry loop ([`run_with_hooks`]) to serve both
/// `Database::run()` (no-op hooks) and `Database::instrumented_run()` (metrics hooks).
///
/// Every hook takes `&self`, so an implementation that records state needs interior
/// mutability.
pub trait RunnerHooks {
    /// Called when commit fails, **before** `on_error()` resets the transaction.
    /// This is the only window to read conflicting keys or inspect error state.
    ///
    /// It runs for every commit error, before the retry loop knows whether the error
    /// is retryable.
    ///
    /// Errors are logged (behind `trace` feature) but do not abort the retry loop.
    fn on_commit_error(
        &self,
        _err: &TransactionCommitError,
    ) -> impl Future<Output = FdbResult<()>> + Send {
        async { Ok(()) }
    }

    /// Called when the closure returns an error that carries an `FdbError`,
    /// before `on_error()` decides whether that error is retryable.
    fn on_closure_error(&self, _err: &FdbError) {}

    /// Called after `on_error()` completes successfully (a retry is about to happen),
    /// with the time spent in `on_error()` in milliseconds.
    fn on_error_duration(&self, _duration_ms: u64) {}

    /// Called after successful commit with the committed transaction and commit duration
    /// in milliseconds.
    fn on_commit_success(&self, _committed: &TransactionCommitted, _commit_duration_ms: u64) {}

    /// Called before the next retry iteration (after `on_error` succeeds).
    fn on_retry(&self) {}

    /// Called when the runner finishes (success or final failure).
    ///
    /// Note: the internal retry loop does not invoke this hook itself; it is left to the
    /// entrypoint wrapping the loop (`Database::instrumented_run` calls it on every
    /// exit path).
    fn on_complete(&self) {}
}
80
/// No-op hooks: every [`RunnerHooks`] method keeps its default, empty implementation.
/// Used by [`Database::run()`].
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopHooks;
// Intentionally empty: all the trait defaults are already no-ops.
impl RunnerHooks for NoopHooks {}
85
/// Metrics-collecting hooks for `Database::instrumented_run()`.
pub(crate) struct InstrumentedHooks {
    /// Metrics handle the hooks write their measurements into.
    pub(crate) metrics: TransactionMetrics,
    /// Starting instant of the run; `on_complete` reports the time elapsed since it.
    pub(crate) start: Instant,
}
91
92impl RunnerHooks for InstrumentedHooks {
93    async fn on_commit_error(&self, err: &TransactionCommitError) -> FdbResult<()> {
94        // not_committed (1020) = commit conflict
95        if err.code() == 1020 {
96            self.metrics.increment_conflict_count();
97        }
98        // Reading from the \xff\xff/transaction/conflicting_keys/ special keyspace is
99        // resolved client-side — no network round-trip to the cluster. The future still
100        // goes through the FDB network thread event loop, but the data comes from an
101        // in-memory map populated during the commit response. Returns empty if
102        // ReportConflictingKeys was not set.
103        let keys = err.conflicting_keys().await?;
104        if !keys.is_empty() {
105            self.metrics.set_conflicting_keys(keys);
106        }
107        Ok(())
108    }
109
110    fn on_error_duration(&self, duration_ms: u64) {
111        self.metrics.add_error_time(duration_ms);
112    }
113
114    fn on_commit_success(&self, committed: &TransactionCommitted, commit_duration_ms: u64) {
115        self.metrics.record_commit_time(commit_duration_ms);
116        if let Ok(version) = committed.committed_version() {
117            self.metrics.set_commit_version(version);
118        }
119    }
120
121    fn on_retry(&self) {
122        self.metrics.reset_current();
123    }
124
125    fn on_complete(&self) {
126        let total_duration = self.start.elapsed().as_millis() as u64;
127        self.metrics.set_execution_time(total_duration);
128    }
129}
130
/// Single internal retry loop that all public entrypoints (`run`, `instrumented_run`,
/// `FdbTenant::run`) delegate to.
///
/// # Hook lifecycle per iteration
///
/// 1. Execute closure
/// 2. If closure returns `Err` with an `FdbError`:
///    - `on_closure_error` → `on_error()` → `on_error_duration` → `on_retry` → loop
///    - if `on_error()` reports the error as non-retryable (or fails), return `Err`
///      without calling `on_error_duration` / `on_retry`
/// 3. If closure returns `Err` without an `FdbError`, return it unchanged
/// 4. If closure succeeds, attempt commit:
///    - Commit succeeds → `on_commit_success` → return `Ok`
///    - Commit fails (retryable) → `on_commit_error` → `on_error()` → `on_error_duration` → `on_retry` → loop
///    - Commit fails (non-retryable) → `on_commit_error` → `on_error()` → return `Err`
///
/// `RunnerHooks::on_complete` is never called from this function.
#[cfg_attr(
    feature = "trace",
    tracing::instrument(level = "debug", skip(initial_transaction, hooks, closure))
)]
pub(crate) async fn run_with_hooks<F, Fut, T, H: RunnerHooks>(
    initial_transaction: RetryableTransaction,
    hooks: &H,
    closure: F,
) -> Result<T, FdbBindingError>
where
    F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
    Fut: Future<Output = Result<T, FdbBindingError>>,
{
    // Flag handed to the closure on the next attempt; starts as `false`.
    let mut maybe_committed = false;
    let mut transaction = initial_transaction;
    // Attempt counter, only used for log records.
    #[cfg(feature = "trace")]
    let mut iteration: u64 = 0;

    loop {
        #[cfg(feature = "trace")]
        {
            iteration += 1;
        }

        // The closure receives a clone of the transaction; the loop keeps its own
        // handle for `on_error()` / `commit()`.
        let result_closure = closure(transaction.clone(), MaybeCommitted(maybe_committed)).await;

        if let Err(e) = result_closure {
            if let Some(fdb_err) = e.get_fdb_error() {
                // NOTE(review): the flag is overwritten, not OR-ed. A maybe-committed
                // commit followed by an unrelated closure error presents `false` to
                // the following attempt — confirm this is the intended semantics.
                maybe_committed = fdb_err.is_maybe_committed();
                hooks.on_closure_error(&fdb_err);

                let now_on_error = Instant::now();
                match transaction.on_error(fdb_err).await {
                    // Retryable: `on_error()` handed back a transaction to retry with.
                    Ok(Ok(t)) => {
                        hooks.on_error_duration(now_on_error.elapsed().as_millis() as u64);

                        #[cfg(feature = "trace")]
                        {
                            let error_code = fdb_err.code();
                            tracing::warn!(iteration, error_code, "restarting transaction");
                        }

                        hooks.on_retry();
                        transaction = t;
                        continue;
                    }
                    // Non-retryable FDB error: surface it to the caller.
                    Ok(Err(non_retryable)) => {
                        return Err(FdbBindingError::from(non_retryable));
                    }
                    // `on_error()` itself failed at the binding level.
                    Err(binding_err) => {
                        return Err(binding_err);
                    }
                }
            }
            // The closure failed without an FDB error: nothing to retry on.
            return Err(e);
        }

        #[cfg(feature = "trace")]
        tracing::info!(iteration, "closure executed, checking result...");

        let now_commit = Instant::now();
        let commit_result = transaction.commit().await;
        let commit_duration = now_commit.elapsed().as_millis() as u64;

        match commit_result {
            // Binding-level failure; going by the log message below, this is reported
            // when a reference to the transaction was kept by the closure.
            Err(err) => {
                #[cfg(feature = "trace")]
                tracing::error!(
                    iteration,
                    "transaction reference kept, aborting transaction"
                );
                return Err(err);
            }
            Ok(Ok(committed)) => {
                hooks.on_commit_success(&committed, commit_duration);

                #[cfg(feature = "trace")]
                tracing::info!(iteration, "success, returning result");

                // `result_closure` is necessarily `Ok` here: the `Err` case above
                // always returns or continues.
                return result_closure;
            }
            Ok(Err(commit_error)) => {
                #[cfg(feature = "trace")]
                let error_code = commit_error.code();

                maybe_committed = commit_error.is_maybe_committed();
                // The hook must run before `on_error()`; its failure is only logged and
                // never aborts the loop.
                if let Err(_e) = hooks.on_commit_error(&commit_error).await {
                    #[cfg(feature = "trace")]
                    tracing::debug!(error_code = _e.code(), "on_commit_error hook failed");
                }

                let now_on_error = Instant::now();
                match commit_error.on_error().await {
                    // Retryable: wrap the returned transaction for the next attempt.
                    Ok(t) => {
                        hooks.on_error_duration(now_on_error.elapsed().as_millis() as u64);

                        #[cfg(feature = "trace")]
                        tracing::warn!(iteration, error_code, "restarting transaction");

                        hooks.on_retry();
                        transaction = RetryableTransaction::new(t);
                        continue;
                    }
                    Err(non_retryable) => {
                        #[cfg(feature = "trace")]
                        {
                            let error_code = non_retryable.code();
                            tracing::error!(
                                iteration,
                                error_code,
                                "could not commit, non retryable error"
                            );
                        }

                        return Err(FdbBindingError::from(non_retryable));
                    }
                }
            }
        }
    }
}
264
/// Represents a FoundationDB database
///
/// A mutable, lexicographically ordered mapping from binary keys to binary values.
///
/// Modifications to a database are performed via transactions.
///
/// The value owns the underlying `FDBDatabase` handle and destroys it when dropped.
pub struct Database {
    /// Non-null raw handle to the C `FDBDatabase` object.
    pub(crate) inner: NonNull<fdb_sys::FDBDatabase>,
}
// NOTE(review): these impls assume the FoundationDB C client lets an `FDBDatabase`
// handle be used and destroyed from any thread — confirm against the C API docs.
unsafe impl Send for Database {}
unsafe impl Sync for Database {}
impl Drop for Database {
    /// Releases the underlying handle through `fdb_database_destroy`.
    fn drop(&mut self) {
        unsafe {
            fdb_sys::fdb_database_destroy(self.inner.as_ptr());
        }
    }
}
282
283#[cfg_api_versions(min = 610)]
284impl Database {
285    /// Create a database for the given configuration path if any, or the default one.
286    pub fn new(path: Option<&str>) -> FdbResult<Database> {
287        let path_str =
288            path.map(|path| std::ffi::CString::new(path).expect("path to be convertible to CStr"));
289        let path_ptr = path_str
290            .as_ref()
291            .map(|path| path.as_ptr())
292            .unwrap_or(std::ptr::null());
293        let mut v: *mut fdb_sys::FDBDatabase = std::ptr::null_mut();
294        let err = unsafe { fdb_sys::fdb_create_database(path_ptr, &mut v) };
295        drop(path_str); // path_str own the CString that we are getting the ptr from
296        error::eval(err)?;
297        let ptr =
298            NonNull::new(v).expect("fdb_create_database to not return null if there is no error");
299        // Safe because the database is constructed in this scope and we know it's
300        // a valid pointer.
301        Ok(unsafe { Self::new_from_pointer(ptr) })
302    }
303
304    /// Create a new FDBDatabase from a raw pointer. Users are expected to use the `new` method.
305    ///
306    /// # Safety
307    ///
308    /// The caller must ensure that `ptr` is a valid pointer to an `FDBDatabase` object
309    /// obtained from the FoundationDB C API, and that the pointer is not aliased or used
310    /// after being passed to this function.
311    pub unsafe fn new_from_pointer(ptr: NonNull<fdb_sys::FDBDatabase>) -> Self {
312        Self { inner: ptr }
313    }
314
315    /// Create a database for the given configuration path
316    pub fn from_path(path: &str) -> FdbResult<Database> {
317        Self::new(Some(path))
318    }
319
320    /// Create a database for the default configuration path
321    #[allow(clippy::should_implement_trait)]
322    pub fn default() -> FdbResult<Database> {
323        Self::new(None)
324    }
325}
326
#[cfg_api_versions(min = 710)]
#[cfg(feature = "tenant-experimental")]
impl Database {
    /// Opens the tenant named `tenant_name` on this database.
    ///
    /// # Errors
    ///
    /// Returns the `FdbError` reported by `fdb_database_open_tenant`.
    ///
    /// # Panics
    ///
    /// Panics if the name length does not fit the integer type expected by the C API,
    /// or if the C API reports success but returns a null pointer.
    pub fn open_tenant(&self, tenant_name: &[u8]) -> FdbResult<FdbTenant> {
        let mut raw_tenant: *mut fdb_sys::FDB_tenant = std::ptr::null_mut();
        let code = unsafe {
            fdb_sys::fdb_database_open_tenant(
                self.inner.as_ptr(),
                tenant_name.as_ptr(),
                tenant_name.len().try_into().unwrap(),
                &mut raw_tenant,
            )
        };
        error::eval(code)?;

        let inner = NonNull::new(raw_tenant)
            .expect("fdb_database_open_tenant to not return null if there is no error");
        // The tenant keeps its own copy of the name.
        let name = tenant_name.to_owned();
        Ok(FdbTenant { inner, name })
    }
}
348
349#[cfg_api_versions(min = 730)]
350impl Database {
351    /// Retrieve a client-side status information in a JSON format.
352    pub fn get_client_status(
353        &self,
354    ) -> impl Future<Output = FdbResult<crate::future::FdbSlice>> + Send + Sync + Unpin {
355        crate::future::FdbFuture::new(unsafe {
356            fdb_sys::fdb_database_get_client_status(self.inner.as_ptr())
357        })
358    }
359}
360
361impl Database {
    /// Create a database for the given configuration path
    ///
    /// This is a compatibility API. If you only use API version ≥ 610 you should
    /// use `Database::new`, `Database::from_path` or `Database::default`.
    ///
    /// For API versions 510 through 600 the database is obtained through a `Cluster`;
    /// for every other version the call is forwarded to `Database::new`.
    pub async fn new_compat(path: Option<&str>) -> FdbResult<Database> {
        if_cfg_api_versions!(min = 510, max = 600 => {
            let cluster = crate::cluster::Cluster::new(path).await?;
            let database = cluster.create_database().await?;
            Ok(database)
        } else {
            Database::new(path)
        })
    }
375
    /// Sets an option on this `Database`.
    ///
    /// Returns whatever applying the option to the underlying handle reports.
    pub fn set_option(&self, opt: options::DatabaseOption) -> FdbResult<()> {
        unsafe { opt.apply(self.inner.as_ptr()) }
    }
380
381    /// Creates a new transaction on the given database.
382    #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
383    pub fn create_trx(&self) -> FdbResult<Transaction> {
384        let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
385        let err =
386            unsafe { fdb_sys::fdb_database_create_transaction(self.inner.as_ptr(), &mut trx) };
387        error::eval(err)?;
388        Ok(Transaction::new(NonNull::new(trx).expect(
389            "fdb_database_create_transaction to not return null if there is no error",
390        )))
391    }
392
393    /// Creates a new transaction on the given database with metrics collection.
394    ///
395    /// This method is similar to `create_trx()` but additionally collects metrics about
396    /// the transaction execution, including operation counts, bytes read/written, and retry counts.
397    ///
398    /// # Arguments
399    /// * `metrics` - A TransactionMetrics instance to collect metrics
400    ///
401    /// # Returns
402    /// * `Result<Transaction, (FdbBindingError, MetricsData)>` - A transaction with metrics collection enabled
403    #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
404    pub fn create_instrumented_trx(
405        &self,
406        metrics: TransactionMetrics,
407    ) -> Result<Transaction, FdbBindingError> {
408        let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
409        let err =
410            unsafe { fdb_sys::fdb_database_create_transaction(self.inner.as_ptr(), &mut trx) };
411        error::eval(err)?;
412
413        let inner = NonNull::new(trx)
414            .expect("fdb_database_create_transaction to not return null if there is no error");
415        Ok(Transaction::new_instrumented(inner, metrics))
416    }
417
418    #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
419    fn create_retryable_trx(&self) -> FdbResult<RetryableTransaction> {
420        Ok(RetryableTransaction::new(self.create_trx()?))
421    }
422
423    /// Creates a new retryable transaction on the given database with metrics collection.
424    ///
425    /// This method is similar to `create_retryable_trx()` but additionally collects metrics about
426    /// the transaction execution, including operation counts, bytes read/written, and retry counts.
427    ///
428    /// # Arguments
429    /// * `metrics` - A TransactionMetrics instance to collect metrics
430    ///
431    /// # Returns
432    /// * `Result<RetryableTransaction, (FdbBindingError, MetricsData)>` - A retryable transaction with metrics collection enabled
433    #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
434    pub fn create_intrumented_retryable_trx(
435        &self,
436        metrics: TransactionMetrics,
437    ) -> Result<RetryableTransaction, FdbBindingError> {
438        Ok(RetryableTransaction::new(
439            self.create_instrumented_trx(metrics.clone())?,
440        ))
441    }
442
443    /// `transact` returns a future which retries on error. It tries to resolve a future created by
444    /// caller-provided function `f` inside a retry loop, providing it with a newly created
445    /// transaction. After caller-provided future resolves, the transaction will be committed
446    /// automatically.
447    ///
448    /// # Warning
449    ///
450    /// It might retry indefinitely if the transaction is highly contentious. It is recommended to
451    /// set `TransactionOption::RetryLimit` or `TransactionOption::SetTimeout` on the transaction
452    /// if the task need to be guaranteed to finish.
453    ///
454    /// Once [Generic Associated Types](https://github.com/rust-lang/rfcs/blob/master/text/1598-generic_associated_types.md)
455    /// lands in stable rust, the returned future of f won't need to be boxed anymore, also the
456    /// lifetime limitations around f might be lowered.
457    pub async fn transact<F>(&self, mut f: F, options: TransactOption) -> Result<F::Item, F::Error>
458    where
459        F: DatabaseTransact,
460    {
461        let is_idempotent = options.is_idempotent;
462        let time_out = options.time_out.map(|d| Instant::now() + d);
463        let retry_limit = options.retry_limit;
464        let mut tries: u32 = 0;
465        let mut trx = self.create_trx()?;
466        let mut can_retry = move || {
467            tries += 1;
468            retry_limit.map(|limit| tries < limit).unwrap_or(true)
469                && time_out.map(|t| Instant::now() < t).unwrap_or(true)
470        };
471        loop {
472            let r = f.transact(trx).await;
473            f = r.0;
474            trx = r.1;
475            trx = match r.2 {
476                Ok(item) => match trx.commit().await {
477                    Ok(_) => break Ok(item),
478                    Err(e) => {
479                        if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
480                            e.on_error().await?
481                        } else {
482                            break Err(F::Error::from(e.into()));
483                        }
484                    }
485                },
486                Err(user_err) => match user_err.try_into_fdb_error() {
487                    Ok(e) => {
488                        if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
489                            trx.on_error(e).await?
490                        } else {
491                            break Err(F::Error::from(e));
492                        }
493                    }
494                    Err(user_err) => break Err(user_err),
495                },
496            };
497        }
498    }
499
500    pub fn transact_boxed<'trx, F, D, T, E>(
501        &'trx self,
502        data: D,
503        f: F,
504        options: TransactOption,
505    ) -> impl Future<Output = Result<T, E>> + Send + 'trx
506    where
507        for<'a> F: FnMut(
508            &'a Transaction,
509            &'a mut D,
510        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
511        E: TransactError,
512        F: Send + 'trx,
513        T: Send + 'trx,
514        E: Send + 'trx,
515        D: Send + 'trx,
516    {
517        self.transact(
518            boxed::FnMutBoxed {
519                f,
520                d: data,
521                m: PhantomData,
522            },
523            options,
524        )
525    }
526
527    pub fn transact_boxed_local<'trx, F, D, T, E>(
528        &'trx self,
529        data: D,
530        f: F,
531        options: TransactOption,
532    ) -> impl Future<Output = Result<T, E>> + 'trx
533    where
534        for<'a> F:
535            FnMut(&'a Transaction, &'a mut D) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
536        E: TransactError,
537        F: 'trx,
538        T: 'trx,
539        E: 'trx,
540        D: 'trx,
541    {
542        self.transact(
543            boxed_local::FnMutBoxedLocal {
544                f,
545                d: data,
546                m: PhantomData,
547            },
548            options,
549        )
550    }
551
552    /// Runs a transactional function against this Database with retry logic.
553    /// The associated closure will be called until a non-retryable FDBError
554    /// is thrown or commit(), returns success.
555    ///
556    /// Users are **not** expected to keep reference to the `RetryableTransaction`. If a weak or strong
557    /// reference is kept by the user, the binding will throw an error.
558    ///
559    /// # Warning: retry
560    ///
561    /// It might retry indefinitely if the transaction is highly contentious. It is recommended to
562    /// set [`options::TransactionOption::RetryLimit`] or [`options::TransactionOption::Timeout`] on the transaction
563    /// if the task needs to be guaranteed to finish. These options can be safely set on every iteration of the closure.
564    ///
565    /// # Warning: Maybe committed transactions
566    ///
567    /// As with other client/server databases, in some failure scenarios a client may be unable to determine
568    /// whether a transaction succeeded. You should make sure your closure is idempotent.
569    ///
570    /// The closure will notify the user in case of a maybe_committed transaction in a previous run
571    ///  with the `MaybeCommitted` provided in the closure.
572    ///
573    /// This one can be used as boolean with
574    /// ```ignore
575    /// db.run(|trx, maybe_committed| async {
576    ///     if maybe_committed.into() {
577    ///         // Handle the problem if needed
578    ///     }
579    /// }).await;
580    ///```
581    #[cfg_attr(
582        feature = "trace",
583        tracing::instrument(level = "debug", skip(self, closure))
584    )]
585    pub async fn run<F, Fut, T>(&self, closure: F) -> Result<T, FdbBindingError>
586    where
587        F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
588        Fut: Future<Output = Result<T, FdbBindingError>>,
589    {
590        let transaction = self.create_retryable_trx()?;
591        run_with_hooks(transaction, &NoopHooks, closure).await
592    }
593
594    /// Runs a transactional function against this Database with retry logic and custom hooks.
595    ///
596    /// This is the most flexible entrypoint — implement [`RunnerHooks`] to observe
597    /// or react to each phase of the retry loop (commit errors, retries, success).
598    ///
599    /// See [`RunnerHooks`] for the hook lifecycle documentation.
600    #[cfg_attr(
601        feature = "trace",
602        tracing::instrument(level = "debug", skip(self, hooks, closure))
603    )]
604    pub async fn run_with_hooks<F, Fut, T, H: RunnerHooks>(
605        &self,
606        hooks: &H,
607        closure: F,
608    ) -> Result<T, FdbBindingError>
609    where
610        F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
611        Fut: Future<Output = Result<T, FdbBindingError>>,
612    {
613        let transaction = self.create_retryable_trx()?;
614        run_with_hooks(transaction, hooks, closure).await
615    }
616
617    /// Runs a transactional function against this Database with retry logic and metrics collection.
618    /// The associated closure will be called until a non-retryable FDBError
619    /// is thrown or commit() returns success.
620    ///
621    /// This method is similar to `run()` but additionally collects and returns metrics about
622    /// the transaction execution, including operation counts, bytes read/written, and retry counts.
623    ///
624    /// # Arguments
625    /// * `closure` - A function that takes a RetryableTransaction and MaybeCommitted flag and returns a Future
626    ///
627    /// # Returns
628    /// * `Result<(T, Metrics), (FdbBindingError, Metrics)>` - On success, returns the result of the transaction and collected metrics.
629    ///   On failure, returns the error and the metrics collected up to the point of failure.
630    ///
631    /// # Warning: retry
632    ///
633    /// It might retry indefinitely if the transaction is highly contentious. It is recommended to
634    /// set [`options::TransactionOption::RetryLimit`] or [`options::TransactionOption::Timeout`] on the transaction
635    /// if the task needs to be guaranteed to finish.
636    ///
637    /// # Warning: Maybe committed transactions
638    ///
639    /// As with other client/server databases, in some failure scenarios a client may be unable to determine
640    /// whether a transaction succeeded. The closure will be notified of a maybe_committed transaction
641    /// in a previous run with the `MaybeCommitted` provided in the closure.
642    #[cfg_attr(
643        feature = "trace",
644        tracing::instrument(level = "debug", skip(self, closure))
645    )]
646    pub async fn instrumented_run<F, Fut, T>(
647        &self,
648        closure: F,
649    ) -> Result<(T, MetricsReport), (FdbBindingError, MetricsReport)>
650    where
651        F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
652        Fut: Future<Output = Result<T, FdbBindingError>>,
653    {
654        let metrics = TransactionMetrics::new();
655        let hooks = InstrumentedHooks {
656            metrics: metrics.clone(),
657            start: Instant::now(),
658        };
659        let transaction = match self.create_intrumented_retryable_trx(metrics.clone()) {
660            Ok(trx) => trx,
661            Err(err) => {
662                hooks.on_complete();
663                return Err((err, metrics.get_metrics_data()));
664            }
665        };
666
667        match run_with_hooks(transaction, &hooks, closure).await {
668            Ok(val) => {
669                hooks.on_complete();
670                Ok((val, metrics.get_metrics_data()))
671            }
672            Err(err) => {
673                hooks.on_complete();
674                Err((err, metrics.get_metrics_data()))
675            }
676        }
677    }
678
679    /// Perform a no-op against FDB to check network thread liveness. This operation will not change the underlying data
680    /// in any way, nor will it perform any I/O against the FDB cluster. However, it will schedule some amount of work
681    /// onto the FDB client and wait for it to complete. The FoundationDB client operates by scheduling onto an event
682    /// queue that is then processed by a single thread (the "network thread"). This method can be used to determine if
683    /// the network thread has entered a state where it is no longer processing requests or if its time to process
684    /// requests has increased. If the network thread is busy, this operation may take some amount of time to complete,
685    /// which is why this operation returns a future.
686    pub async fn perform_no_op(&self) -> FdbResult<()> {
687        let trx = self.create_trx()?;
688
689        // Set the read version of the transaction, then read it back. This requires no I/O, but it does
690        // require the network thread be running. The exact value used for the read version is unimportant.
691        trx.set_read_version(42);
692        trx.get_read_version().await?;
693        Ok(())
694    }
695
696    /// Returns a value where 0 indicates that the client is idle and 1 (or larger) indicates that the client is saturated.
697    /// By default, this value is updated every second.
698    #[cfg_api_versions(min = 710)]
699    pub async fn get_main_thread_busyness(&self) -> FdbResult<f64> {
700        let busyness =
701            unsafe { fdb_sys::fdb_database_get_main_thread_busyness(self.inner.as_ptr()) };
702        Ok(busyness)
703    }
704}
/// A unit of transactional work that `Database::transact` can run and retry.
///
/// `transact` consumes both the implementor and the transaction; the returned future
/// hands them back together with the outcome, so that the retry loop can use them
/// again for the next attempt.
pub trait DatabaseTransact: Sized {
    /// Value produced by a successful attempt.
    type Item;
    /// Error produced by a failed attempt.
    type Error: TransactError;
    /// Future resolving to the implementor, the transaction and the attempt's outcome.
    type Future: Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)>;
    /// Runs one attempt against `trx`.
    fn transact(self, trx: Transaction) -> Self::Future;
}
711
// Adapter behind `Database::transact_boxed`: wraps a closure returning a boxed, `Send`
// future, together with its data, into a `DatabaseTransact` implementation.
#[allow(clippy::needless_lifetimes)]
#[allow(clippy::type_complexity)]
mod boxed {
    use super::*;

    /// Runs the wrapped closure once against `trx` with the wrapped data, then hands the
    /// adapter and the transaction back along with the result.
    async fn boxed_data_fut<'t, F, T, E, D>(
        mut f: FnMutBoxed<'t, F, D>,
        trx: Transaction,
    ) -> (FnMutBoxed<'t, F, D>, Transaction, Result<T, E>)
    where
        F: for<'a> FnMut(
            &'a Transaction,
            &'a mut D,
        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
        E: TransactError,
    {
        let r = (f.f)(&trx, &mut f.d).await;
        (f, trx, r)
    }

    /// A closure and the data it operates on, usable as a `DatabaseTransact`.
    pub struct FnMutBoxed<'t, F, D> {
        /// Closure called on every attempt.
        pub f: F,
        /// Data lent mutably to the closure on every attempt.
        pub d: D,
        /// Carries the `'t` lifetime without storing anything.
        pub m: PhantomData<&'t ()>,
    }
    impl<'t, F, T, E, D> DatabaseTransact for FnMutBoxed<'t, F, D>
    where
        F: for<'a> FnMut(
            &'a Transaction,
            &'a mut D,
        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
        F: 't + Send,
        T: 't,
        E: 't,
        D: 't + Send,
        E: TransactError,
    {
        type Item = T;
        type Error = E;
        type Future = Pin<
            Box<
                dyn Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)>
                    + Send
                    + 't,
            >,
        >;

        /// Boxes the future of one attempt.
        fn transact(self, trx: Transaction) -> Self::Future {
            boxed_data_fut(self, trx).boxed()
        }
    }
}
764
// Adapter behind `Database::transact_boxed_local`: same as `boxed`, minus the `Send`
// requirements on the closure, its data and the futures.
#[allow(clippy::needless_lifetimes)]
#[allow(clippy::type_complexity)]
mod boxed_local {
    use super::*;

    /// Runs the wrapped closure once against `trx` with the wrapped data, then hands the
    /// adapter and the transaction back along with the result.
    async fn boxed_local_data_fut<'t, F, T, E, D>(
        mut f: FnMutBoxedLocal<'t, F, D>,
        trx: Transaction,
    ) -> (FnMutBoxedLocal<'t, F, D>, Transaction, Result<T, E>)
    where
        F: for<'a> FnMut(
            &'a Transaction,
            &'a mut D,
        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
        E: TransactError,
    {
        let r = (f.f)(&trx, &mut f.d).await;
        (f, trx, r)
    }

    /// A closure and the data it operates on, usable as a `DatabaseTransact`.
    pub struct FnMutBoxedLocal<'t, F, D> {
        /// Closure called on every attempt.
        pub f: F,
        /// Data lent mutably to the closure on every attempt.
        pub d: D,
        /// Carries the `'t` lifetime without storing anything.
        pub m: PhantomData<&'t ()>,
    }
    impl<'t, F, T, E, D> DatabaseTransact for FnMutBoxedLocal<'t, F, D>
    where
        F: for<'a> FnMut(
            &'a Transaction,
            &'a mut D,
        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
        F: 't,
        T: 't,
        E: 't,
        D: 't,
        E: TransactError,
    {
        type Item = T;
        type Error = E;
        type Future = Pin<
            Box<dyn Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)> + 't>,
        >;

        /// Boxes the future of one attempt.
        fn transact(self, trx: Transaction) -> Self::Future {
            boxed_local_data_fut(self, trx).boxed_local()
        }
    }
}
813
/// A trait that must be implemented to use `Database::transact` with application error types.
///
/// The retry loop needs to build the error from an `FdbError` (the `From` bound) and
/// to find out whether a given error is an `FdbError` it can retry on.
pub trait TransactError: From<FdbError> {
    /// Extracts the `FdbError` carried by this error, or gives the error back unchanged
    /// when it does not carry one.
    fn try_into_fdb_error(self) -> Result<FdbError, Self>;
}
// Blanket implementation: any type that can be built from an `FdbError` and whose
// `TryInto<FdbError>` conversion gives the value back on failure qualifies.
impl<T> TransactError for T
where
    T: From<FdbError> + TryInto<FdbError, Error = T>,
{
    fn try_into_fdb_error(self) -> Result<FdbError, Self> {
        self.try_into()
    }
}
// An `FdbError` is trivially its own FDB error.
impl TransactError for FdbError {
    fn try_into_fdb_error(self) -> Result<FdbError, Self> {
        Ok(self)
    }
}
831
/// A set of options that controls the behavior of `Database::transact`.
///
/// The default value has no retry limit, no time-out and is not idempotent.
#[derive(Debug, Default, Clone)]
pub struct TransactOption {
    /// Upper bound on the number of attempts; `None` means no bound.
    pub retry_limit: Option<u32>,
    /// Time budget, counted from the start of the `transact` call, after which no new
    /// retry is started; `None` means no budget.
    pub time_out: Option<Duration>,
    /// Whether the work may be retried even when a previous attempt may have been
    /// committed.
    pub is_idempotent: bool,
}

impl TransactOption {
    /// An idempotent TransactOption: `is_idempotent` is set, every other field keeps
    /// its default value.
    pub fn idempotent() -> Self {
        Self {
            is_idempotent: true,
            ..TransactOption::default()
        }
    }
}