foundationdb/database.rs
1// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors
2// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors.
3//
4// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
5// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
6// http://opensource.org/licenses/MIT>, at your option. This file may not be
7// copied, modified, or distributed except according to those terms.
8
9//! Implementations of the FDBDatabase C API
10//!
11//! <https://apple.github.io/foundationdb/api-c.html#database>
12
13use std::convert::TryInto;
14use std::marker::PhantomData;
15use std::pin::Pin;
16use std::ptr::NonNull;
17use std::time::{Duration, Instant};
18
19use fdb_sys::if_cfg_api_versions;
20use foundationdb_macros::cfg_api_versions;
21use foundationdb_sys as fdb_sys;
22
23use crate::metrics::{MetricsReport, TransactionMetrics};
24use crate::options;
25use crate::transaction::*;
26use crate::{FdbError, FdbResult, error};
27
28use crate::error::FdbBindingError;
29#[cfg_api_versions(min = 710)]
30#[cfg(feature = "tenant-experimental")]
31use crate::tenant::FdbTenant;
32use futures::prelude::*;
33
34/// Wrapper around the boolean representing whether the
35/// previous transaction is still on fly
36/// This wrapper prevents the boolean to be copy and force it
37/// to be moved instead.
38/// This pretty handy when you don't want to see the `Database::run` closure
39/// capturing the environment.
40pub struct MaybeCommitted(bool);
41
42impl From<MaybeCommitted> for bool {
43 fn from(value: MaybeCommitted) -> Self {
44 value.0
45 }
46}
47
48/// Lifecycle hooks for the transaction retry runner.
49///
50/// All methods have default no-op implementations, so callers only override what they need.
51/// This trait enables a single internal retry loop ([`run_with_hooks`]) to serve both
52/// `Database::run()` (no-op hooks) and `Database::instrumented_run()` (metrics hooks).
53pub trait RunnerHooks {
54 /// Called when commit fails, **before** `on_error()` resets the transaction.
55 /// This is the only window to read conflicting keys or inspect error state.
56 ///
57 /// Errors are logged (behind `trace` feature) but do not abort the retry loop.
58 fn on_commit_error(
59 &self,
60 _err: &TransactionCommitError,
61 ) -> impl Future<Output = FdbResult<()>> + Send {
62 async { Ok(()) }
63 }
64
65 /// Called when a closure error triggers a retry.
66 fn on_closure_error(&self, _err: &FdbError) {}
67
68 /// Called after `on_error()` completes with its duration.
69 fn on_error_duration(&self, _duration_ms: u64) {}
70
71 /// Called after successful commit with the committed transaction and commit duration.
72 fn on_commit_success(&self, _committed: &TransactionCommitted, _commit_duration_ms: u64) {}
73
74 /// Called before the next retry iteration (after `on_error` succeeds).
75 fn on_retry(&self) {}
76
77 /// Called when the runner finishes (success or final failure).
78 fn on_complete(&self) {}
79}
80
81/// No-op hooks — zero overhead. Used by [`Database::run()`].
82#[derive(Debug, Clone, Copy, Default)]
83pub struct NoopHooks;
84impl RunnerHooks for NoopHooks {}
85
86/// Metrics-collecting hooks for `Database::instrumented_run()`.
87pub(crate) struct InstrumentedHooks {
88 pub(crate) metrics: TransactionMetrics,
89 pub(crate) start: Instant,
90}
91
92impl RunnerHooks for InstrumentedHooks {
93 async fn on_commit_error(&self, err: &TransactionCommitError) -> FdbResult<()> {
94 // not_committed (1020) = commit conflict
95 if err.code() == 1020 {
96 self.metrics.increment_conflict_count();
97 }
98 // Reading from the \xff\xff/transaction/conflicting_keys/ special keyspace is
99 // resolved client-side — no network round-trip to the cluster. The future still
100 // goes through the FDB network thread event loop, but the data comes from an
101 // in-memory map populated during the commit response. Returns empty if
102 // ReportConflictingKeys was not set.
103 let keys = err.conflicting_keys().await?;
104 if !keys.is_empty() {
105 self.metrics.set_conflicting_keys(keys);
106 }
107 Ok(())
108 }
109
110 fn on_error_duration(&self, duration_ms: u64) {
111 self.metrics.add_error_time(duration_ms);
112 }
113
114 fn on_commit_success(&self, committed: &TransactionCommitted, commit_duration_ms: u64) {
115 self.metrics.record_commit_time(commit_duration_ms);
116 if let Ok(version) = committed.committed_version() {
117 self.metrics.set_commit_version(version);
118 }
119 }
120
121 fn on_retry(&self) {
122 self.metrics.reset_current();
123 }
124
125 fn on_complete(&self) {
126 let total_duration = self.start.elapsed().as_millis() as u64;
127 self.metrics.set_execution_time(total_duration);
128 }
129}
130
131/// Single internal retry loop that all public entrypoints (`run`, `instrumented_run`,
132/// `FdbTenant::run`) delegate to.
133///
134/// # Hook lifecycle per iteration
135///
136/// 1. Execute closure
137/// 2. If closure returns `Err` with an `FdbError`:
138/// - `on_closure_error` → `on_error()` → `on_error_duration` → `on_retry` → loop
139/// 3. If closure succeeds, attempt commit:
140/// - Commit succeeds → `on_commit_success` → return `Ok`
141/// - Commit fails (retryable) → `on_commit_error` → `on_error()` → `on_error_duration` → `on_retry` → loop
142/// - Commit fails (non-retryable) → return `Err`
143#[cfg_attr(
144 feature = "trace",
145 tracing::instrument(level = "debug", skip(initial_transaction, hooks, closure))
146)]
147pub(crate) async fn run_with_hooks<F, Fut, T, H: RunnerHooks>(
148 initial_transaction: RetryableTransaction,
149 hooks: &H,
150 closure: F,
151) -> Result<T, FdbBindingError>
152where
153 F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
154 Fut: Future<Output = Result<T, FdbBindingError>>,
155{
156 let mut maybe_committed = false;
157 let mut transaction = initial_transaction;
158 #[cfg(feature = "trace")]
159 let mut iteration: u64 = 0;
160
161 loop {
162 #[cfg(feature = "trace")]
163 {
164 iteration += 1;
165 }
166
167 let result_closure = closure(transaction.clone(), MaybeCommitted(maybe_committed)).await;
168
169 if let Err(e) = result_closure {
170 if let Some(fdb_err) = e.get_fdb_error() {
171 maybe_committed = fdb_err.is_maybe_committed();
172 hooks.on_closure_error(&fdb_err);
173
174 let now_on_error = Instant::now();
175 match transaction.on_error(fdb_err).await {
176 Ok(Ok(t)) => {
177 hooks.on_error_duration(now_on_error.elapsed().as_millis() as u64);
178
179 #[cfg(feature = "trace")]
180 {
181 let error_code = fdb_err.code();
182 tracing::warn!(iteration, error_code, "restarting transaction");
183 }
184
185 hooks.on_retry();
186 transaction = t;
187 continue;
188 }
189 Ok(Err(non_retryable)) => {
190 return Err(FdbBindingError::from(non_retryable));
191 }
192 Err(binding_err) => {
193 return Err(binding_err);
194 }
195 }
196 }
197 return Err(e);
198 }
199
200 #[cfg(feature = "trace")]
201 tracing::info!(iteration, "closure executed, checking result...");
202
203 let now_commit = Instant::now();
204 let commit_result = transaction.commit().await;
205 let commit_duration = now_commit.elapsed().as_millis() as u64;
206
207 match commit_result {
208 Err(err) => {
209 #[cfg(feature = "trace")]
210 tracing::error!(
211 iteration,
212 "transaction reference kept, aborting transaction"
213 );
214 return Err(err);
215 }
216 Ok(Ok(committed)) => {
217 hooks.on_commit_success(&committed, commit_duration);
218
219 #[cfg(feature = "trace")]
220 tracing::info!(iteration, "success, returning result");
221
222 return result_closure;
223 }
224 Ok(Err(commit_error)) => {
225 #[cfg(feature = "trace")]
226 let error_code = commit_error.code();
227
228 maybe_committed = commit_error.is_maybe_committed();
229 if let Err(_e) = hooks.on_commit_error(&commit_error).await {
230 #[cfg(feature = "trace")]
231 tracing::debug!(error_code = _e.code(), "on_commit_error hook failed");
232 }
233
234 let now_on_error = Instant::now();
235 match commit_error.on_error().await {
236 Ok(t) => {
237 hooks.on_error_duration(now_on_error.elapsed().as_millis() as u64);
238
239 #[cfg(feature = "trace")]
240 tracing::warn!(iteration, error_code, "restarting transaction");
241
242 hooks.on_retry();
243 transaction = RetryableTransaction::new(t);
244 continue;
245 }
246 Err(non_retryable) => {
247 #[cfg(feature = "trace")]
248 {
249 let error_code = non_retryable.code();
250 tracing::error!(
251 iteration,
252 error_code,
253 "could not commit, non retryable error"
254 );
255 }
256
257 return Err(FdbBindingError::from(non_retryable));
258 }
259 }
260 }
261 }
262 }
263}
264
265/// Represents a FoundationDB database
266///
267/// A mutable, lexicographically ordered mapping from binary keys to binary values.
268///
269/// Modifications to a database are performed via transactions.
270pub struct Database {
271 pub(crate) inner: NonNull<fdb_sys::FDBDatabase>,
272}
273unsafe impl Send for Database {}
274unsafe impl Sync for Database {}
275impl Drop for Database {
276 fn drop(&mut self) {
277 unsafe {
278 fdb_sys::fdb_database_destroy(self.inner.as_ptr());
279 }
280 }
281}
282
283#[cfg_api_versions(min = 610)]
284impl Database {
285 /// Create a database for the given configuration path if any, or the default one.
286 pub fn new(path: Option<&str>) -> FdbResult<Database> {
287 let path_str =
288 path.map(|path| std::ffi::CString::new(path).expect("path to be convertible to CStr"));
289 let path_ptr = path_str
290 .as_ref()
291 .map(|path| path.as_ptr())
292 .unwrap_or(std::ptr::null());
293 let mut v: *mut fdb_sys::FDBDatabase = std::ptr::null_mut();
294 let err = unsafe { fdb_sys::fdb_create_database(path_ptr, &mut v) };
295 drop(path_str); // path_str own the CString that we are getting the ptr from
296 error::eval(err)?;
297 let ptr =
298 NonNull::new(v).expect("fdb_create_database to not return null if there is no error");
299 // Safe because the database is constructed in this scope and we know it's
300 // a valid pointer.
301 Ok(unsafe { Self::new_from_pointer(ptr) })
302 }
303
304 /// Create a new FDBDatabase from a raw pointer. Users are expected to use the `new` method.
305 ///
306 /// # Safety
307 ///
308 /// The caller must ensure that `ptr` is a valid pointer to an `FDBDatabase` object
309 /// obtained from the FoundationDB C API, and that the pointer is not aliased or used
310 /// after being passed to this function.
311 pub unsafe fn new_from_pointer(ptr: NonNull<fdb_sys::FDBDatabase>) -> Self {
312 Self { inner: ptr }
313 }
314
315 /// Create a database for the given configuration path
316 pub fn from_path(path: &str) -> FdbResult<Database> {
317 Self::new(Some(path))
318 }
319
320 /// Create a database for the default configuration path
321 #[allow(clippy::should_implement_trait)]
322 pub fn default() -> FdbResult<Database> {
323 Self::new(None)
324 }
325}
326
327#[cfg_api_versions(min = 710)]
328#[cfg(feature = "tenant-experimental")]
329impl Database {
330 pub fn open_tenant(&self, tenant_name: &[u8]) -> FdbResult<FdbTenant> {
331 let mut ptr: *mut fdb_sys::FDB_tenant = std::ptr::null_mut();
332 let err = unsafe {
333 fdb_sys::fdb_database_open_tenant(
334 self.inner.as_ptr(),
335 tenant_name.as_ptr(),
336 tenant_name.len().try_into().unwrap(),
337 &mut ptr,
338 )
339 };
340 error::eval(err)?;
341 Ok(FdbTenant {
342 inner: NonNull::new(ptr)
343 .expect("fdb_database_open_tenant to not return null if there is no error"),
344 name: tenant_name.to_owned(),
345 })
346 }
347}
348
349#[cfg_api_versions(min = 730)]
350impl Database {
351 /// Retrieve a client-side status information in a JSON format.
352 pub fn get_client_status(
353 &self,
354 ) -> impl Future<Output = FdbResult<crate::future::FdbSlice>> + Send + Sync + Unpin + use<>
355 {
356 crate::future::FdbFuture::new(unsafe {
357 fdb_sys::fdb_database_get_client_status(self.inner.as_ptr())
358 })
359 }
360}
361
362impl Database {
363 /// Create a database for the given configuration path
364 ///
365 /// This is a compatibility api. If you only use API version ≥ 610 you should
366 /// use `Database::new`, `Database::from_path` or `Database::default`.
367 pub async fn new_compat(path: Option<&str>) -> FdbResult<Database> {
368 if_cfg_api_versions!(min = 510, max = 600 => {
369 let cluster = crate::cluster::Cluster::new(path).await?;
370 let database = cluster.create_database().await?;
371 Ok(database)
372 } else {
373 Database::new(path)
374 })
375 }
376
377 /// Called to set an option an on `Database`.
378 pub fn set_option(&self, opt: options::DatabaseOption) -> FdbResult<()> {
379 unsafe { opt.apply(self.inner.as_ptr()) }
380 }
381
382 /// Creates a new transaction on the given database.
383 #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
384 pub fn create_trx(&self) -> FdbResult<Transaction> {
385 let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
386 let err =
387 unsafe { fdb_sys::fdb_database_create_transaction(self.inner.as_ptr(), &mut trx) };
388 error::eval(err)?;
389 Ok(Transaction::new(NonNull::new(trx).expect(
390 "fdb_database_create_transaction to not return null if there is no error",
391 )))
392 }
393
394 /// Creates a new transaction on the given database with metrics collection.
395 ///
396 /// This method is similar to `create_trx()` but additionally collects metrics about
397 /// the transaction execution, including operation counts, bytes read/written, and retry counts.
398 ///
399 /// # Arguments
400 /// * `metrics` - A TransactionMetrics instance to collect metrics
401 ///
402 /// # Returns
403 /// * `Result<Transaction, (FdbBindingError, MetricsData)>` - A transaction with metrics collection enabled
404 #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
405 pub fn create_instrumented_trx(
406 &self,
407 metrics: TransactionMetrics,
408 ) -> Result<Transaction, FdbBindingError> {
409 let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
410 let err =
411 unsafe { fdb_sys::fdb_database_create_transaction(self.inner.as_ptr(), &mut trx) };
412 error::eval(err)?;
413
414 let inner = NonNull::new(trx)
415 .expect("fdb_database_create_transaction to not return null if there is no error");
416 Ok(Transaction::new_instrumented(inner, metrics))
417 }
418
419 #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
420 fn create_retryable_trx(&self) -> FdbResult<RetryableTransaction> {
421 Ok(RetryableTransaction::new(self.create_trx()?))
422 }
423
424 /// Creates a new retryable transaction on the given database with metrics collection.
425 ///
426 /// This method is similar to `create_retryable_trx()` but additionally collects metrics about
427 /// the transaction execution, including operation counts, bytes read/written, and retry counts.
428 ///
429 /// # Arguments
430 /// * `metrics` - A TransactionMetrics instance to collect metrics
431 ///
432 /// # Returns
433 /// * `Result<RetryableTransaction, (FdbBindingError, MetricsData)>` - A retryable transaction with metrics collection enabled
434 #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
435 pub fn create_intrumented_retryable_trx(
436 &self,
437 metrics: TransactionMetrics,
438 ) -> Result<RetryableTransaction, FdbBindingError> {
439 Ok(RetryableTransaction::new(
440 self.create_instrumented_trx(metrics.clone())?,
441 ))
442 }
443
444 /// `transact` returns a future which retries on error. It tries to resolve a future created by
445 /// caller-provided function `f` inside a retry loop, providing it with a newly created
446 /// transaction. After caller-provided future resolves, the transaction will be committed
447 /// automatically.
448 ///
449 /// # Warning: Hanging on Network/DNS failures
450 ///
451 /// By default, the FoundationDB C API will retry indefinitely if it cannot reach the cluster
452 /// or if DNS resolution fails. This can cause `transact` to hang forever.
453 /// To prevent this, you should set [`options::DatabaseOption::TransactionTimeout`] or
454 /// [`options::DatabaseOption::TransactionRetryLimit`] on the [`Database`] object, or
455 /// [`options::TransactionOption::Timeout`] or [`options::TransactionOption::RetryLimit`] on the transaction
456 /// itself.
457 ///
458 /// Note that `TransactOption` also provides `retry_limit` and `time_out`, but these are
459 /// Rust-side budgets that are only checked *between* retries. If the C API hangs during a call
460 /// like `commit()` or `on_error()`, these budgets will not be reached.
461 ///
462 /// Once [Generic Associated Types](https://github.com/rust-lang/rfcs/blob/master/text/1598-generic_associated_types.md)
463 /// lands in stable rust, the returned future of f won't need to be boxed anymore, also the
464 /// lifetime limitations around f might be lowered.
465 pub async fn transact<F>(&self, mut f: F, options: TransactOption) -> Result<F::Item, F::Error>
466 where
467 F: DatabaseTransact,
468 {
469 let is_idempotent = options.is_idempotent;
470 let time_out = options.time_out.map(|d| Instant::now() + d);
471 let retry_limit = options.retry_limit;
472 let mut tries: u32 = 0;
473 let mut trx = self.create_trx()?;
474 let mut can_retry = move || {
475 tries += 1;
476 retry_limit.map(|limit| tries < limit).unwrap_or(true)
477 && time_out.map(|t| Instant::now() < t).unwrap_or(true)
478 };
479 loop {
480 let r = f.transact(trx).await;
481 f = r.0;
482 trx = r.1;
483 trx = match r.2 {
484 Ok(item) => match trx.commit().await {
485 Ok(_) => break Ok(item),
486 Err(e) => {
487 if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
488 e.on_error().await?
489 } else {
490 break Err(F::Error::from(e.into()));
491 }
492 }
493 },
494 Err(user_err) => match user_err.try_into_fdb_error() {
495 Ok(e) => {
496 if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
497 trx.on_error(e).await?
498 } else {
499 break Err(F::Error::from(e));
500 }
501 }
502 Err(user_err) => break Err(user_err),
503 },
504 };
505 }
506 }
507
508 /// `transact_boxed` is a version of [`Database::transact`] that accepts a closure returning a
509 /// pinned, boxed future.
510 ///
511 /// # Warning: Hanging on Network/DNS failures
512 ///
513 /// By default, the FoundationDB C API will retry indefinitely if it cannot reach the cluster
514 /// or if DNS resolution fails. This can cause `transact_boxed` to hang forever.
515 /// To prevent this, you should set [`options::DatabaseOption::TransactionTimeout`] or
516 /// [`options::DatabaseOption::TransactionRetryLimit`] on the [`Database`] object, or
517 /// [`options::TransactionOption::Timeout`] or [`options::TransactionOption::RetryLimit`] on the transaction
518 /// itself.
519 ///
520 /// Note that `TransactOption` also provides `retry_limit` and `time_out`, but these are
521 /// Rust-side budgets that are only checked *between* retries. If the C API hangs during a call
522 /// like `commit()` or `on_error()`, these budgets will not be reached.
523 pub fn transact_boxed<'trx, F, D, T, E>(
524 &'trx self,
525 data: D,
526 f: F,
527 options: TransactOption,
528 ) -> impl Future<Output = Result<T, E>> + Send + 'trx
529 where
530 for<'a> F: FnMut(
531 &'a Transaction,
532 &'a mut D,
533 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
534 E: TransactError,
535 F: Send + 'trx,
536 T: Send + 'trx,
537 E: Send + 'trx,
538 D: Send + 'trx,
539 {
540 self.transact(
541 boxed::FnMutBoxed {
542 f,
543 d: data,
544 m: PhantomData,
545 },
546 options,
547 )
548 }
549
550 /// `transact_boxed_local` is a version of [`Database::transact`] that accepts a closure returning a
551 /// pinned, boxed future that is not `Send`.
552 ///
553 /// # Warning: Hanging on Network/DNS failures
554 ///
555 /// By default, the FoundationDB C API will retry indefinitely if it cannot reach the cluster
556 /// or if DNS resolution fails. This can cause `transact_boxed_local` to hang forever.
557 /// To prevent this, you should set [`options::DatabaseOption::TransactionTimeout`] or
558 /// [`options::DatabaseOption::TransactionRetryLimit`] on the [`Database`] object, or
559 /// [`options::TransactionOption::Timeout`] or [`options::TransactionOption::RetryLimit`] on the transaction
560 /// itself.
561 ///
562 /// Note that `TransactOption` also provides `retry_limit` and `time_out`, but these are
563 /// Rust-side budgets that are only checked *between* retries. If the C API hangs during a call
564 /// like `commit()` or `on_error()`, these budgets will not be reached.
565 pub fn transact_boxed_local<'trx, F, D, T, E>(
566 &'trx self,
567 data: D,
568 f: F,
569 options: TransactOption,
570 ) -> impl Future<Output = Result<T, E>> + 'trx
571 where
572 for<'a> F:
573 FnMut(&'a Transaction, &'a mut D) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
574 E: TransactError,
575 F: 'trx,
576 T: 'trx,
577 E: 'trx,
578 D: 'trx,
579 {
580 self.transact(
581 boxed_local::FnMutBoxedLocal {
582 f,
583 d: data,
584 m: PhantomData,
585 },
586 options,
587 )
588 }
589
590 /// Runs a transactional function against this Database with retry logic.
591 /// The associated closure will be called until a non-retryable FDBError
592 /// is thrown or commit(), returns success.
593 ///
594 /// Users are **not** expected to keep reference to the `RetryableTransaction`. If a weak or strong
595 /// reference is kept by the user, the binding will throw an error.
596 ///
597 /// # Warning: retry
598 ///
599 /// It might retry indefinitely if the transaction is highly contentious. It is recommended to
600 /// set [`options::TransactionOption::RetryLimit`] or [`options::TransactionOption::Timeout`] on the transaction
601 /// if the task needs to be guaranteed to finish. These options can be safely set on every iteration of the closure.
602 ///
603 /// # Warning: Hanging on Network/DNS failures
604 ///
605 /// By default, the FoundationDB C API will retry indefinitely if it cannot reach the cluster
606 /// or if DNS resolution fails. This can cause `run` to hang forever.
607 /// To prevent this, you should set [`options::DatabaseOption::TransactionTimeout`] or
608 /// [`options::DatabaseOption::TransactionRetryLimit`] on the [`Database`] object, or
609 /// [`options::TransactionOption::Timeout`] or [`options::TransactionOption::RetryLimit`] on the transaction
610 /// itself.
611 ///
612 /// # Warning: Maybe committed transactions
613 ///
614 /// As with other client/server databases, in some failure scenarios a client may be unable to determine
615 /// whether a transaction succeeded. You should make sure your closure is idempotent.
616 ///
617 /// The closure will notify the user in case of a maybe_committed transaction in a previous run
618 /// with the `MaybeCommitted` provided in the closure.
619 ///
620 /// This one can be used as boolean with
621 /// ```ignore
622 /// db.run(|trx, maybe_committed| async {
623 /// if maybe_committed.into() {
624 /// // Handle the problem if needed
625 /// }
626 /// }).await;
627 ///```
628 #[cfg_attr(
629 feature = "trace",
630 tracing::instrument(level = "debug", skip(self, closure))
631 )]
632 pub async fn run<F, Fut, T>(&self, closure: F) -> Result<T, FdbBindingError>
633 where
634 F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
635 Fut: Future<Output = Result<T, FdbBindingError>>,
636 {
637 let transaction = self.create_retryable_trx()?;
638 run_with_hooks(transaction, &NoopHooks, closure).await
639 }
640
641 /// Runs a transactional function against this Database with retry logic and custom hooks.
642 ///
643 /// This is the most flexible entrypoint — implement [`RunnerHooks`] to observe
644 /// or react to each phase of the retry loop (commit errors, retries, success).
645 ///
646 /// See [`RunnerHooks`] for the hook lifecycle documentation.
647 #[cfg_attr(
648 feature = "trace",
649 tracing::instrument(level = "debug", skip(self, hooks, closure))
650 )]
651 pub async fn run_with_hooks<F, Fut, T, H: RunnerHooks>(
652 &self,
653 hooks: &H,
654 closure: F,
655 ) -> Result<T, FdbBindingError>
656 where
657 F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
658 Fut: Future<Output = Result<T, FdbBindingError>>,
659 {
660 let transaction = self.create_retryable_trx()?;
661 run_with_hooks(transaction, hooks, closure).await
662 }
663
664 /// Runs a transactional function against this Database with retry logic and metrics collection.
665 /// The associated closure will be called until a non-retryable FDBError
666 /// is thrown or commit() returns success.
667 ///
668 /// This method is similar to `run()` but additionally collects and returns metrics about
669 /// the transaction execution, including operation counts, bytes read/written, and retry counts.
670 ///
671 /// # Arguments
672 /// * `closure` - A function that takes a RetryableTransaction and MaybeCommitted flag and returns a Future
673 ///
674 /// # Returns
675 /// * `Result<(T, Metrics), (FdbBindingError, Metrics)>` - On success, returns the result of the transaction and collected metrics.
676 /// On failure, returns the error and the metrics collected up to the point of failure.
677 ///
678 /// # Warning: retry
679 ///
680 /// It might retry indefinitely if the transaction is highly contentious. It is recommended to
681 /// set [`options::TransactionOption::RetryLimit`] or [`options::TransactionOption::Timeout`] on the transaction
682 /// if the task needs to be guaranteed to finish.
683 ///
684 /// # Warning: Maybe committed transactions
685 ///
686 /// As with other client/server databases, in some failure scenarios a client may be unable to determine
687 /// whether a transaction succeeded. The closure will be notified of a maybe_committed transaction
688 /// in a previous run with the `MaybeCommitted` provided in the closure.
689 #[cfg_attr(
690 feature = "trace",
691 tracing::instrument(level = "debug", skip(self, closure))
692 )]
693 pub async fn instrumented_run<F, Fut, T>(
694 &self,
695 closure: F,
696 ) -> Result<(T, MetricsReport), (FdbBindingError, MetricsReport)>
697 where
698 F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
699 Fut: Future<Output = Result<T, FdbBindingError>>,
700 {
701 let metrics = TransactionMetrics::new();
702 let hooks = InstrumentedHooks {
703 metrics: metrics.clone(),
704 start: Instant::now(),
705 };
706 let transaction = match self.create_intrumented_retryable_trx(metrics.clone()) {
707 Ok(trx) => trx,
708 Err(err) => {
709 hooks.on_complete();
710 return Err((err, metrics.get_metrics_data()));
711 }
712 };
713
714 match run_with_hooks(transaction, &hooks, closure).await {
715 Ok(val) => {
716 hooks.on_complete();
717 Ok((val, metrics.get_metrics_data()))
718 }
719 Err(err) => {
720 hooks.on_complete();
721 Err((err, metrics.get_metrics_data()))
722 }
723 }
724 }
725
726 /// Perform a no-op against FDB to check network thread liveness. This operation will not change the underlying data
727 /// in any way, nor will it perform any I/O against the FDB cluster. However, it will schedule some amount of work
728 /// onto the FDB client and wait for it to complete. The FoundationDB client operates by scheduling onto an event
729 /// queue that is then processed by a single thread (the "network thread"). This method can be used to determine if
730 /// the network thread has entered a state where it is no longer processing requests or if its time to process
731 /// requests has increased. If the network thread is busy, this operation may take some amount of time to complete,
732 /// which is why this operation returns a future.
733 pub async fn perform_no_op(&self) -> FdbResult<()> {
734 let trx = self.create_trx()?;
735
736 // Set the read version of the transaction, then read it back. This requires no I/O, but it does
737 // require the network thread be running. The exact value used for the read version is unimportant.
738 trx.set_read_version(42);
739 trx.get_read_version().await?;
740 Ok(())
741 }
742
743 /// Returns a value where 0 indicates that the client is idle and 1 (or larger) indicates that the client is saturated.
744 /// By default, this value is updated every second.
745 #[cfg_api_versions(min = 710)]
746 pub async fn get_main_thread_busyness(&self) -> FdbResult<f64> {
747 let busyness =
748 unsafe { fdb_sys::fdb_database_get_main_thread_busyness(self.inner.as_ptr()) };
749 Ok(busyness)
750 }
751}
752pub trait DatabaseTransact: Sized {
753 type Item;
754 type Error: TransactError;
755 type Future: Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)>;
756 fn transact(self, trx: Transaction) -> Self::Future;
757}
758
759#[allow(clippy::needless_lifetimes)]
760#[allow(clippy::type_complexity)]
761mod boxed {
762 use super::*;
763
764 async fn boxed_data_fut<'t, F, T, E, D>(
765 mut f: FnMutBoxed<'t, F, D>,
766 trx: Transaction,
767 ) -> (FnMutBoxed<'t, F, D>, Transaction, Result<T, E>)
768 where
769 F: for<'a> FnMut(
770 &'a Transaction,
771 &'a mut D,
772 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
773 E: TransactError,
774 {
775 let r = (f.f)(&trx, &mut f.d).await;
776 (f, trx, r)
777 }
778
779 pub struct FnMutBoxed<'t, F, D> {
780 pub f: F,
781 pub d: D,
782 pub m: PhantomData<&'t ()>,
783 }
784 impl<'t, F, T, E, D> DatabaseTransact for FnMutBoxed<'t, F, D>
785 where
786 F: for<'a> FnMut(
787 &'a Transaction,
788 &'a mut D,
789 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
790 F: 't + Send,
791 T: 't,
792 E: 't,
793 D: 't + Send,
794 E: TransactError,
795 {
796 type Item = T;
797 type Error = E;
798 type Future = Pin<
799 Box<
800 dyn Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)>
801 + Send
802 + 't,
803 >,
804 >;
805
806 fn transact(self, trx: Transaction) -> Self::Future {
807 boxed_data_fut(self, trx).boxed()
808 }
809 }
810}
811
812#[allow(clippy::needless_lifetimes)]
813#[allow(clippy::type_complexity)]
814mod boxed_local {
815 use super::*;
816
817 async fn boxed_local_data_fut<'t, F, T, E, D>(
818 mut f: FnMutBoxedLocal<'t, F, D>,
819 trx: Transaction,
820 ) -> (FnMutBoxedLocal<'t, F, D>, Transaction, Result<T, E>)
821 where
822 F: for<'a> FnMut(
823 &'a Transaction,
824 &'a mut D,
825 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
826 E: TransactError,
827 {
828 let r = (f.f)(&trx, &mut f.d).await;
829 (f, trx, r)
830 }
831
832 pub struct FnMutBoxedLocal<'t, F, D> {
833 pub f: F,
834 pub d: D,
835 pub m: PhantomData<&'t ()>,
836 }
837 impl<'t, F, T, E, D> DatabaseTransact for FnMutBoxedLocal<'t, F, D>
838 where
839 F: for<'a> FnMut(
840 &'a Transaction,
841 &'a mut D,
842 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
843 F: 't,
844 T: 't,
845 E: 't,
846 D: 't,
847 E: TransactError,
848 {
849 type Item = T;
850 type Error = E;
851 type Future = Pin<
852 Box<dyn Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)> + 't>,
853 >;
854
855 fn transact(self, trx: Transaction) -> Self::Future {
856 boxed_local_data_fut(self, trx).boxed_local()
857 }
858 }
859}
860
861/// A trait that must be implemented to use `Database::transact` this application error types.
862pub trait TransactError: From<FdbError> {
863 fn try_into_fdb_error(self) -> Result<FdbError, Self>;
864}
865impl<T> TransactError for T
866where
867 T: From<FdbError> + TryInto<FdbError, Error = T>,
868{
869 fn try_into_fdb_error(self) -> Result<FdbError, Self> {
870 self.try_into()
871 }
872}
873impl TransactError for FdbError {
874 fn try_into_fdb_error(self) -> Result<FdbError, Self> {
875 Ok(self)
876 }
877}
878
879/// A set of options that controls the behavior of `Database::transact`.
880#[derive(Default, Clone)]
881pub struct TransactOption {
882 pub retry_limit: Option<u32>,
883 pub time_out: Option<Duration>,
884 pub is_idempotent: bool,
885}
886
887impl TransactOption {
888 /// An idempotent TransactOption
889 pub fn idempotent() -> Self {
890 Self {
891 is_idempotent: true,
892 ..TransactOption::default()
893 }
894 }
895}