1use std::convert::TryInto;
14use std::marker::PhantomData;
15use std::pin::Pin;
16use std::ptr::NonNull;
17use std::time::{Duration, Instant};
18
19use fdb_sys::if_cfg_api_versions;
20use foundationdb_macros::cfg_api_versions;
21use foundationdb_sys as fdb_sys;
22
23use crate::metrics::{MetricsReport, TransactionMetrics};
24use crate::options;
25use crate::transaction::*;
26use crate::{error, FdbError, FdbResult};
27
28use crate::error::FdbBindingError;
29#[cfg_api_versions(min = 710)]
30#[cfg(feature = "tenant-experimental")]
31use crate::tenant::FdbTenant;
32use futures::prelude::*;
33
34pub struct MaybeCommitted(bool);
41
42impl From<MaybeCommitted> for bool {
43 fn from(value: MaybeCommitted) -> Self {
44 value.0
45 }
46}
47
48pub struct Database {
54 pub(crate) inner: NonNull<fdb_sys::FDBDatabase>,
55}
56unsafe impl Send for Database {}
57unsafe impl Sync for Database {}
58impl Drop for Database {
59 fn drop(&mut self) {
60 unsafe {
61 fdb_sys::fdb_database_destroy(self.inner.as_ptr());
62 }
63 }
64}
65
66#[cfg_api_versions(min = 610)]
67impl Database {
68 pub fn new(path: Option<&str>) -> FdbResult<Database> {
70 let path_str =
71 path.map(|path| std::ffi::CString::new(path).expect("path to be convertible to CStr"));
72 let path_ptr = path_str
73 .as_ref()
74 .map(|path| path.as_ptr())
75 .unwrap_or(std::ptr::null());
76 let mut v: *mut fdb_sys::FDBDatabase = std::ptr::null_mut();
77 let err = unsafe { fdb_sys::fdb_create_database(path_ptr, &mut v) };
78 drop(path_str); error::eval(err)?;
80 let ptr =
81 NonNull::new(v).expect("fdb_create_database to not return null if there is no error");
82 Ok(Self::new_from_pointer(ptr))
83 }
84
85 pub fn new_from_pointer(ptr: NonNull<fdb_sys::FDBDatabase>) -> Self {
87 Self { inner: ptr }
88 }
89
90 pub fn from_path(path: &str) -> FdbResult<Database> {
92 Self::new(Some(path))
93 }
94
95 #[allow(clippy::should_implement_trait)]
97 pub fn default() -> FdbResult<Database> {
98 Self::new(None)
99 }
100}
101
102#[cfg_api_versions(min = 710)]
103#[cfg(feature = "tenant-experimental")]
104impl Database {
105 pub fn open_tenant(&self, tenant_name: &[u8]) -> FdbResult<FdbTenant> {
106 let mut ptr: *mut fdb_sys::FDB_tenant = std::ptr::null_mut();
107 let err = unsafe {
108 fdb_sys::fdb_database_open_tenant(
109 self.inner.as_ptr(),
110 tenant_name.as_ptr(),
111 tenant_name.len().try_into().unwrap(),
112 &mut ptr,
113 )
114 };
115 error::eval(err)?;
116 Ok(FdbTenant {
117 inner: NonNull::new(ptr)
118 .expect("fdb_database_open_tenant to not return null if there is no error"),
119 name: tenant_name.to_owned(),
120 })
121 }
122}
123
124#[cfg_api_versions(min = 730)]
125impl Database {
126 pub fn get_client_status(
128 &self,
129 ) -> impl Future<Output = FdbResult<crate::future::FdbSlice>> + Send + Sync + Unpin {
130 crate::future::FdbFuture::new(unsafe {
131 fdb_sys::fdb_database_get_client_status(self.inner.as_ptr())
132 })
133 }
134}
135
136impl Database {
137 pub async fn new_compat(path: Option<&str>) -> FdbResult<Database> {
142 if_cfg_api_versions!(min = 510, max = 600 => {
143 let cluster = crate::cluster::Cluster::new(path).await?;
144 let database = cluster.create_database().await?;
145 Ok(database)
146 } else {
147 Database::new(path)
148 })
149 }
150
151 pub fn set_option(&self, opt: options::DatabaseOption) -> FdbResult<()> {
153 unsafe { opt.apply(self.inner.as_ptr()) }
154 }
155
156 #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
158 pub fn create_trx(&self) -> FdbResult<Transaction> {
159 let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
160 let err =
161 unsafe { fdb_sys::fdb_database_create_transaction(self.inner.as_ptr(), &mut trx) };
162 error::eval(err)?;
163 Ok(Transaction::new(NonNull::new(trx).expect(
164 "fdb_database_create_transaction to not return null if there is no error",
165 )))
166 }
167
168 #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
179 pub fn create_instrumented_trx(
180 &self,
181 metrics: TransactionMetrics,
182 ) -> Result<Transaction, FdbBindingError> {
183 let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
184 let err =
185 unsafe { fdb_sys::fdb_database_create_transaction(self.inner.as_ptr(), &mut trx) };
186 error::eval(err)?;
187
188 let inner = NonNull::new(trx)
189 .expect("fdb_database_create_transaction to not return null if there is no error");
190 Ok(Transaction::new_instrumented(inner, metrics))
191 }
192
193 #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
194 fn create_retryable_trx(&self) -> FdbResult<RetryableTransaction> {
195 Ok(RetryableTransaction::new(self.create_trx()?))
196 }
197
198 #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
209 pub fn create_intrumented_retryable_trx(
210 &self,
211 metrics: TransactionMetrics,
212 ) -> Result<RetryableTransaction, FdbBindingError> {
213 Ok(RetryableTransaction::new(
214 self.create_instrumented_trx(metrics.clone())?,
215 ))
216 }
217
218 pub async fn transact<F>(&self, mut f: F, options: TransactOption) -> Result<F::Item, F::Error>
233 where
234 F: DatabaseTransact,
235 {
236 let is_idempotent = options.is_idempotent;
237 let time_out = options.time_out.map(|d| Instant::now() + d);
238 let retry_limit = options.retry_limit;
239 let mut tries: u32 = 0;
240 let mut trx = self.create_trx()?;
241 let mut can_retry = move || {
242 tries += 1;
243 retry_limit.map(|limit| tries < limit).unwrap_or(true)
244 && time_out.map(|t| Instant::now() < t).unwrap_or(true)
245 };
246 loop {
247 let r = f.transact(trx).await;
248 f = r.0;
249 trx = r.1;
250 trx = match r.2 {
251 Ok(item) => match trx.commit().await {
252 Ok(_) => break Ok(item),
253 Err(e) => {
254 if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
255 e.on_error().await?
256 } else {
257 break Err(F::Error::from(e.into()));
258 }
259 }
260 },
261 Err(user_err) => match user_err.try_into_fdb_error() {
262 Ok(e) => {
263 if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
264 trx.on_error(e).await?
265 } else {
266 break Err(F::Error::from(e));
267 }
268 }
269 Err(user_err) => break Err(user_err),
270 },
271 };
272 }
273 }
274
275 pub fn transact_boxed<'trx, F, D, T, E>(
276 &'trx self,
277 data: D,
278 f: F,
279 options: TransactOption,
280 ) -> impl Future<Output = Result<T, E>> + Send + 'trx
281 where
282 for<'a> F: FnMut(
283 &'a Transaction,
284 &'a mut D,
285 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
286 E: TransactError,
287 F: Send + 'trx,
288 T: Send + 'trx,
289 E: Send + 'trx,
290 D: Send + 'trx,
291 {
292 self.transact(
293 boxed::FnMutBoxed {
294 f,
295 d: data,
296 m: PhantomData,
297 },
298 options,
299 )
300 }
301
302 pub fn transact_boxed_local<'trx, F, D, T, E>(
303 &'trx self,
304 data: D,
305 f: F,
306 options: TransactOption,
307 ) -> impl Future<Output = Result<T, E>> + 'trx
308 where
309 for<'a> F:
310 FnMut(&'a Transaction, &'a mut D) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
311 E: TransactError,
312 F: 'trx,
313 T: 'trx,
314 E: 'trx,
315 D: 'trx,
316 {
317 self.transact(
318 boxed_local::FnMutBoxedLocal {
319 f,
320 d: data,
321 m: PhantomData,
322 },
323 options,
324 )
325 }
326
327 #[cfg_attr(
357 feature = "trace",
358 tracing::instrument(level = "debug", skip(self, closure))
359 )]
360 pub async fn run<F, Fut, T>(&self, closure: F) -> Result<T, FdbBindingError>
361 where
362 F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
363 Fut: Future<Output = Result<T, FdbBindingError>>,
364 {
365 let mut maybe_committed_transaction = false;
366 let mut transaction = self.create_retryable_trx()?;
369 #[cfg(feature = "trace")]
370 let mut iteration = 0;
371
372 loop {
373 #[cfg(feature = "trace")]
374 {
375 iteration += 1;
376 }
377 let result_closure = closure(
379 transaction.clone(),
380 MaybeCommitted(maybe_committed_transaction),
381 )
382 .await;
383
384 if let Err(e) = result_closure {
385 if let Some(e) = e.get_fdb_error() {
387 maybe_committed_transaction = e.is_maybe_committed();
388 match transaction.on_error(e).await {
390 Ok(Ok(t)) => {
392 #[cfg(feature = "trace")]
393 {
394 let error_code = e.code();
395 tracing::warn!(iteration, error_code, "restarting transaction");
396 }
397
398 transaction = t;
399 continue;
400 }
401 Ok(Err(non_retryable_error)) => {
402 return Err(FdbBindingError::from(non_retryable_error))
403 }
404 Err(non_retryable_error) => return Err(non_retryable_error),
406 }
407 }
408 return Err(e);
410 }
411
412 #[cfg(feature = "trace")]
413 tracing::info!(iteration, "closure executed, checking result...");
414
415 let commit_result = transaction.commit().await;
416
417 match commit_result {
418 Err(err) => {
420 #[cfg(feature = "trace")]
421 tracing::error!(
422 iteration,
423 "transaction reference kept, aborting transaction"
424 );
425 return Err(err);
426 }
427 Ok(Ok(_)) => {
428 #[cfg(feature = "trace")]
429 tracing::info!(iteration, "success, returning result");
430 return result_closure;
431 }
432 Ok(Err(transaction_commit_error)) => {
433 #[cfg(feature = "trace")]
434 let error_code = transaction_commit_error.code();
435
436 maybe_committed_transaction = transaction_commit_error.is_maybe_committed();
437 match transaction_commit_error.on_error().await {
439 Ok(t) => {
440 #[cfg(feature = "trace")]
441 tracing::warn!(iteration, error_code, "restarting transaction");
442
443 transaction = RetryableTransaction::new(t);
444 continue;
445 }
446 Err(non_retryable_error) => {
447 return Err(FdbBindingError::from(non_retryable_error))
448 }
449 }
450 }
451 }
452 }
453 }
454
455 #[cfg_attr(
481 feature = "trace",
482 tracing::instrument(level = "debug", skip(self, closure))
483 )]
484 pub async fn instrumented_run<F, Fut, T>(
485 &self,
486 closure: F,
487 ) -> Result<(T, MetricsReport), (FdbBindingError, MetricsReport)>
488 where
489 F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
490 Fut: Future<Output = Result<T, FdbBindingError>>,
491 {
492 let now_start = std::time::Instant::now();
493 let metrics = TransactionMetrics::new();
494 let mut maybe_committed_transaction = false;
495
496 let mut transaction = match self.create_intrumented_retryable_trx(metrics.clone()) {
499 Ok(trx) => trx,
500 Err(err) => {
501 let total_duration = now_start.elapsed().as_millis() as u64;
503 metrics.set_execution_time(total_duration);
504 return Err((err, metrics.get_metrics_data()));
505 }
506 };
507
508 loop {
509 let result_closure = closure(
511 transaction.clone(),
512 MaybeCommitted(maybe_committed_transaction),
513 )
514 .await;
515
516 if let Err(error) = result_closure {
517 if let Some(e) = error.get_fdb_error() {
518 maybe_committed_transaction = e.is_maybe_committed();
520 let now_on_error = std::time::Instant::now();
522 let on_error_result = transaction.on_error(e).await;
523 let error_duration = now_on_error.elapsed().as_millis() as u64;
524 metrics.add_error_time(error_duration);
525
526 match on_error_result {
527 Ok(Ok(t)) => {
529 transaction = t;
530 metrics.reset_current();
532 continue;
533 }
534 Ok(Err(non_retryable_error)) => {
535 let total_duration = now_start.elapsed().as_millis() as u64;
536 metrics.set_execution_time(total_duration);
537 return Err((
538 FdbBindingError::from(non_retryable_error),
539 metrics.get_metrics_data(),
540 ));
541 }
542 Err(non_retryable_error) => {
544 let total_duration = now_start.elapsed().as_millis() as u64;
545 metrics.set_execution_time(total_duration);
546 return Err((non_retryable_error, metrics.get_metrics_data()));
547 }
548 }
549 }
550 let total_duration = now_start.elapsed().as_millis() as u64;
552 metrics.set_execution_time(total_duration);
553 return Err((error, metrics.get_metrics_data()));
554 }
555
556 let now_commit = std::time::Instant::now();
557 let commit_result = transaction.commit().await;
558 let commit_duration = now_commit.elapsed().as_millis() as u64;
559 metrics.record_commit_time(commit_duration);
560
561 match commit_result {
562 Err(err) => {
564 let total_duration = now_start.elapsed().as_millis() as u64;
565 metrics.set_execution_time(total_duration);
566 return Err((err, metrics.get_metrics_data()));
567 }
568 Ok(Ok(committed)) => {
569 match committed.committed_version() {
571 Ok(version) => metrics.set_commit_version(version),
572 Err(_err) => {
573 }
577 }
578
579 let total_duration = now_start.elapsed().as_millis() as u64;
580 metrics.set_execution_time(total_duration);
581 return Ok((result_closure.unwrap(), metrics.get_metrics_data()));
582 }
583 Ok(Err(transaction_commit_error)) => {
584 maybe_committed_transaction = transaction_commit_error.is_maybe_committed();
585 let now_on_error = std::time::Instant::now();
587 let on_error_result = transaction_commit_error.on_error().await;
588 let error_duration = now_on_error.elapsed().as_millis() as u64;
589 metrics.add_error_time(error_duration);
590
591 match on_error_result {
592 Ok(t) => {
593 transaction = RetryableTransaction::new(t);
594 metrics.reset_current();
596 continue;
597 }
598 Err(non_retryable_error) => {
599 let total_duration = now_start.elapsed().as_millis() as u64;
600 metrics.set_execution_time(total_duration);
601 return Err((
602 FdbBindingError::from(non_retryable_error),
603 metrics.get_metrics_data(),
604 ));
605 }
606 }
607 }
608 }
609 }
610 }
611
612 pub async fn perform_no_op(&self) -> FdbResult<()> {
620 let trx = self.create_trx()?;
621
622 trx.set_read_version(42);
625 trx.get_read_version().await?;
626 Ok(())
627 }
628
629 #[cfg_api_versions(min = 710)]
632 pub async fn get_main_thread_busyness(&self) -> FdbResult<f64> {
633 let busyness =
634 unsafe { fdb_sys::fdb_database_get_main_thread_busyness(self.inner.as_ptr()) };
635 Ok(busyness)
636 }
637}
638pub trait DatabaseTransact: Sized {
639 type Item;
640 type Error: TransactError;
641 type Future: Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)>;
642 fn transact(self, trx: Transaction) -> Self::Future;
643}
644
645#[allow(clippy::needless_lifetimes)]
646#[allow(clippy::type_complexity)]
647mod boxed {
648 use super::*;
649
650 async fn boxed_data_fut<'t, F, T, E, D>(
651 mut f: FnMutBoxed<'t, F, D>,
652 trx: Transaction,
653 ) -> (FnMutBoxed<'t, F, D>, Transaction, Result<T, E>)
654 where
655 F: for<'a> FnMut(
656 &'a Transaction,
657 &'a mut D,
658 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
659 E: TransactError,
660 {
661 let r = (f.f)(&trx, &mut f.d).await;
662 (f, trx, r)
663 }
664
665 pub struct FnMutBoxed<'t, F, D> {
666 pub f: F,
667 pub d: D,
668 pub m: PhantomData<&'t ()>,
669 }
670 impl<'t, F, T, E, D> DatabaseTransact for FnMutBoxed<'t, F, D>
671 where
672 F: for<'a> FnMut(
673 &'a Transaction,
674 &'a mut D,
675 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
676 F: 't + Send,
677 T: 't,
678 E: 't,
679 D: 't + Send,
680 E: TransactError,
681 {
682 type Item = T;
683 type Error = E;
684 type Future = Pin<
685 Box<
686 dyn Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)>
687 + Send
688 + 't,
689 >,
690 >;
691
692 fn transact(self, trx: Transaction) -> Self::Future {
693 boxed_data_fut(self, trx).boxed()
694 }
695 }
696}
697
698#[allow(clippy::needless_lifetimes)]
699#[allow(clippy::type_complexity)]
700mod boxed_local {
701 use super::*;
702
703 async fn boxed_local_data_fut<'t, F, T, E, D>(
704 mut f: FnMutBoxedLocal<'t, F, D>,
705 trx: Transaction,
706 ) -> (FnMutBoxedLocal<'t, F, D>, Transaction, Result<T, E>)
707 where
708 F: for<'a> FnMut(
709 &'a Transaction,
710 &'a mut D,
711 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
712 E: TransactError,
713 {
714 let r = (f.f)(&trx, &mut f.d).await;
715 (f, trx, r)
716 }
717
718 pub struct FnMutBoxedLocal<'t, F, D> {
719 pub f: F,
720 pub d: D,
721 pub m: PhantomData<&'t ()>,
722 }
723 impl<'t, F, T, E, D> DatabaseTransact for FnMutBoxedLocal<'t, F, D>
724 where
725 F: for<'a> FnMut(
726 &'a Transaction,
727 &'a mut D,
728 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
729 F: 't,
730 T: 't,
731 E: 't,
732 D: 't,
733 E: TransactError,
734 {
735 type Item = T;
736 type Error = E;
737 type Future = Pin<
738 Box<dyn Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)> + 't>,
739 >;
740
741 fn transact(self, trx: Transaction) -> Self::Future {
742 boxed_local_data_fut(self, trx).boxed_local()
743 }
744 }
745}
746
747pub trait TransactError: From<FdbError> {
749 fn try_into_fdb_error(self) -> Result<FdbError, Self>;
750}
751impl<T> TransactError for T
752where
753 T: From<FdbError> + TryInto<FdbError, Error = T>,
754{
755 fn try_into_fdb_error(self) -> Result<FdbError, Self> {
756 self.try_into()
757 }
758}
759impl TransactError for FdbError {
760 fn try_into_fdb_error(self) -> Result<FdbError, Self> {
761 Ok(self)
762 }
763}
764
765#[derive(Default, Clone)]
767pub struct TransactOption {
768 pub retry_limit: Option<u32>,
769 pub time_out: Option<Duration>,
770 pub is_idempotent: bool,
771}
772
773impl TransactOption {
774 pub fn idempotent() -> Self {
776 Self {
777 is_idempotent: true,
778 ..TransactOption::default()
779 }
780 }
781}