1use std::convert::TryInto;
14use std::marker::PhantomData;
15use std::pin::Pin;
16use std::ptr::NonNull;
17use std::time::{Duration, Instant};
18
19use fdb_sys::if_cfg_api_versions;
20use foundationdb_macros::cfg_api_versions;
21use foundationdb_sys as fdb_sys;
22
23use crate::metrics::{MetricsReport, TransactionMetrics};
24use crate::options;
25use crate::transaction::*;
26use crate::{error, FdbError, FdbResult};
27
28use crate::error::FdbBindingError;
29#[cfg_api_versions(min = 710)]
30#[cfg(feature = "tenant-experimental")]
31use crate::tenant::FdbTenant;
32use futures::prelude::*;
33
34pub struct MaybeCommitted(bool);
41
42impl From<MaybeCommitted> for bool {
43 fn from(value: MaybeCommitted) -> Self {
44 value.0
45 }
46}
47
48pub struct Database {
54 pub(crate) inner: NonNull<fdb_sys::FDBDatabase>,
55}
56unsafe impl Send for Database {}
57unsafe impl Sync for Database {}
58impl Drop for Database {
59 fn drop(&mut self) {
60 unsafe {
61 fdb_sys::fdb_database_destroy(self.inner.as_ptr());
62 }
63 }
64}
65
66#[cfg_api_versions(min = 610)]
67impl Database {
68 pub fn new(path: Option<&str>) -> FdbResult<Database> {
70 let path_str =
71 path.map(|path| std::ffi::CString::new(path).expect("path to be convertible to CStr"));
72 let path_ptr = path_str
73 .as_ref()
74 .map(|path| path.as_ptr())
75 .unwrap_or(std::ptr::null());
76 let mut v: *mut fdb_sys::FDBDatabase = std::ptr::null_mut();
77 let err = unsafe { fdb_sys::fdb_create_database(path_ptr, &mut v) };
78 drop(path_str); error::eval(err)?;
80 let ptr =
81 NonNull::new(v).expect("fdb_create_database to not return null if there is no error");
82 Ok(unsafe { Self::new_from_pointer(ptr) })
85 }
86
87 pub unsafe fn new_from_pointer(ptr: NonNull<fdb_sys::FDBDatabase>) -> Self {
95 Self { inner: ptr }
96 }
97
98 pub fn from_path(path: &str) -> FdbResult<Database> {
100 Self::new(Some(path))
101 }
102
103 #[allow(clippy::should_implement_trait)]
105 pub fn default() -> FdbResult<Database> {
106 Self::new(None)
107 }
108}
109
110#[cfg_api_versions(min = 710)]
111#[cfg(feature = "tenant-experimental")]
112impl Database {
113 pub fn open_tenant(&self, tenant_name: &[u8]) -> FdbResult<FdbTenant> {
114 let mut ptr: *mut fdb_sys::FDB_tenant = std::ptr::null_mut();
115 let err = unsafe {
116 fdb_sys::fdb_database_open_tenant(
117 self.inner.as_ptr(),
118 tenant_name.as_ptr(),
119 tenant_name.len().try_into().unwrap(),
120 &mut ptr,
121 )
122 };
123 error::eval(err)?;
124 Ok(FdbTenant {
125 inner: NonNull::new(ptr)
126 .expect("fdb_database_open_tenant to not return null if there is no error"),
127 name: tenant_name.to_owned(),
128 })
129 }
130}
131
132#[cfg_api_versions(min = 730)]
133impl Database {
134 pub fn get_client_status(
136 &self,
137 ) -> impl Future<Output = FdbResult<crate::future::FdbSlice>> + Send + Sync + Unpin {
138 crate::future::FdbFuture::new(unsafe {
139 fdb_sys::fdb_database_get_client_status(self.inner.as_ptr())
140 })
141 }
142}
143
144impl Database {
145 pub async fn new_compat(path: Option<&str>) -> FdbResult<Database> {
150 if_cfg_api_versions!(min = 510, max = 600 => {
151 let cluster = crate::cluster::Cluster::new(path).await?;
152 let database = cluster.create_database().await?;
153 Ok(database)
154 } else {
155 Database::new(path)
156 })
157 }
158
159 pub fn set_option(&self, opt: options::DatabaseOption) -> FdbResult<()> {
161 unsafe { opt.apply(self.inner.as_ptr()) }
162 }
163
164 #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
166 pub fn create_trx(&self) -> FdbResult<Transaction> {
167 let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
168 let err =
169 unsafe { fdb_sys::fdb_database_create_transaction(self.inner.as_ptr(), &mut trx) };
170 error::eval(err)?;
171 Ok(Transaction::new(NonNull::new(trx).expect(
172 "fdb_database_create_transaction to not return null if there is no error",
173 )))
174 }
175
176 #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
187 pub fn create_instrumented_trx(
188 &self,
189 metrics: TransactionMetrics,
190 ) -> Result<Transaction, FdbBindingError> {
191 let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
192 let err =
193 unsafe { fdb_sys::fdb_database_create_transaction(self.inner.as_ptr(), &mut trx) };
194 error::eval(err)?;
195
196 let inner = NonNull::new(trx)
197 .expect("fdb_database_create_transaction to not return null if there is no error");
198 Ok(Transaction::new_instrumented(inner, metrics))
199 }
200
201 #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
202 fn create_retryable_trx(&self) -> FdbResult<RetryableTransaction> {
203 Ok(RetryableTransaction::new(self.create_trx()?))
204 }
205
206 #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
217 pub fn create_intrumented_retryable_trx(
218 &self,
219 metrics: TransactionMetrics,
220 ) -> Result<RetryableTransaction, FdbBindingError> {
221 Ok(RetryableTransaction::new(
222 self.create_instrumented_trx(metrics.clone())?,
223 ))
224 }
225
226 pub async fn transact<F>(&self, mut f: F, options: TransactOption) -> Result<F::Item, F::Error>
241 where
242 F: DatabaseTransact,
243 {
244 let is_idempotent = options.is_idempotent;
245 let time_out = options.time_out.map(|d| Instant::now() + d);
246 let retry_limit = options.retry_limit;
247 let mut tries: u32 = 0;
248 let mut trx = self.create_trx()?;
249 let mut can_retry = move || {
250 tries += 1;
251 retry_limit.map(|limit| tries < limit).unwrap_or(true)
252 && time_out.map(|t| Instant::now() < t).unwrap_or(true)
253 };
254 loop {
255 let r = f.transact(trx).await;
256 f = r.0;
257 trx = r.1;
258 trx = match r.2 {
259 Ok(item) => match trx.commit().await {
260 Ok(_) => break Ok(item),
261 Err(e) => {
262 if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
263 e.on_error().await?
264 } else {
265 break Err(F::Error::from(e.into()));
266 }
267 }
268 },
269 Err(user_err) => match user_err.try_into_fdb_error() {
270 Ok(e) => {
271 if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
272 trx.on_error(e).await?
273 } else {
274 break Err(F::Error::from(e));
275 }
276 }
277 Err(user_err) => break Err(user_err),
278 },
279 };
280 }
281 }
282
283 pub fn transact_boxed<'trx, F, D, T, E>(
284 &'trx self,
285 data: D,
286 f: F,
287 options: TransactOption,
288 ) -> impl Future<Output = Result<T, E>> + Send + 'trx
289 where
290 for<'a> F: FnMut(
291 &'a Transaction,
292 &'a mut D,
293 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
294 E: TransactError,
295 F: Send + 'trx,
296 T: Send + 'trx,
297 E: Send + 'trx,
298 D: Send + 'trx,
299 {
300 self.transact(
301 boxed::FnMutBoxed {
302 f,
303 d: data,
304 m: PhantomData,
305 },
306 options,
307 )
308 }
309
310 pub fn transact_boxed_local<'trx, F, D, T, E>(
311 &'trx self,
312 data: D,
313 f: F,
314 options: TransactOption,
315 ) -> impl Future<Output = Result<T, E>> + 'trx
316 where
317 for<'a> F:
318 FnMut(&'a Transaction, &'a mut D) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
319 E: TransactError,
320 F: 'trx,
321 T: 'trx,
322 E: 'trx,
323 D: 'trx,
324 {
325 self.transact(
326 boxed_local::FnMutBoxedLocal {
327 f,
328 d: data,
329 m: PhantomData,
330 },
331 options,
332 )
333 }
334
335 #[cfg_attr(
365 feature = "trace",
366 tracing::instrument(level = "debug", skip(self, closure))
367 )]
368 pub async fn run<F, Fut, T>(&self, closure: F) -> Result<T, FdbBindingError>
369 where
370 F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
371 Fut: Future<Output = Result<T, FdbBindingError>>,
372 {
373 let mut maybe_committed_transaction = false;
374 let mut transaction = self.create_retryable_trx()?;
377 #[cfg(feature = "trace")]
378 let mut iteration = 0;
379
380 loop {
381 #[cfg(feature = "trace")]
382 {
383 iteration += 1;
384 }
385 let result_closure = closure(
387 transaction.clone(),
388 MaybeCommitted(maybe_committed_transaction),
389 )
390 .await;
391
392 if let Err(e) = result_closure {
393 if let Some(e) = e.get_fdb_error() {
395 maybe_committed_transaction = e.is_maybe_committed();
396 match transaction.on_error(e).await {
398 Ok(Ok(t)) => {
400 #[cfg(feature = "trace")]
401 {
402 let error_code = e.code();
403 tracing::warn!(iteration, error_code, "restarting transaction");
404 }
405
406 transaction = t;
407 continue;
408 }
409 Ok(Err(non_retryable_error)) => {
410 return Err(FdbBindingError::from(non_retryable_error))
411 }
412 Err(non_retryable_error) => return Err(non_retryable_error),
414 }
415 }
416 return Err(e);
418 }
419
420 #[cfg(feature = "trace")]
421 tracing::info!(iteration, "closure executed, checking result...");
422
423 let commit_result = transaction.commit().await;
424
425 match commit_result {
426 Err(err) => {
428 #[cfg(feature = "trace")]
429 tracing::error!(
430 iteration,
431 "transaction reference kept, aborting transaction"
432 );
433 return Err(err);
434 }
435 Ok(Ok(_)) => {
436 #[cfg(feature = "trace")]
437 tracing::info!(iteration, "success, returning result");
438 return result_closure;
439 }
440 Ok(Err(transaction_commit_error)) => {
441 #[cfg(feature = "trace")]
442 let error_code = transaction_commit_error.code();
443
444 maybe_committed_transaction = transaction_commit_error.is_maybe_committed();
445 match transaction_commit_error.on_error().await {
447 Ok(t) => {
448 #[cfg(feature = "trace")]
449 tracing::warn!(iteration, error_code, "restarting transaction");
450
451 transaction = RetryableTransaction::new(t);
452 continue;
453 }
454 Err(non_retryable_error) => {
455 #[cfg(feature = "trace")]
456 {
457 let error_code = non_retryable_error.code();
458 tracing::error!(
459 iteration,
460 error_code,
461 "could not commit, non retryable error"
462 );
463 }
464
465 return Err(FdbBindingError::from(non_retryable_error));
466 }
467 }
468 }
469 }
470 }
471 }
472
473 #[cfg_attr(
499 feature = "trace",
500 tracing::instrument(level = "debug", skip(self, closure))
501 )]
502 pub async fn instrumented_run<F, Fut, T>(
503 &self,
504 closure: F,
505 ) -> Result<(T, MetricsReport), (FdbBindingError, MetricsReport)>
506 where
507 F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
508 Fut: Future<Output = Result<T, FdbBindingError>>,
509 {
510 let now_start = std::time::Instant::now();
511 let metrics = TransactionMetrics::new();
512 let mut maybe_committed_transaction = false;
513
514 let mut transaction = match self.create_intrumented_retryable_trx(metrics.clone()) {
517 Ok(trx) => trx,
518 Err(err) => {
519 let total_duration = now_start.elapsed().as_millis() as u64;
521 metrics.set_execution_time(total_duration);
522 return Err((err, metrics.get_metrics_data()));
523 }
524 };
525
526 loop {
527 let result_closure = closure(
529 transaction.clone(),
530 MaybeCommitted(maybe_committed_transaction),
531 )
532 .await;
533
534 if let Err(error) = result_closure {
535 if let Some(e) = error.get_fdb_error() {
536 maybe_committed_transaction = e.is_maybe_committed();
538 let now_on_error = std::time::Instant::now();
540 let on_error_result = transaction.on_error(e).await;
541 let error_duration = now_on_error.elapsed().as_millis() as u64;
542 metrics.add_error_time(error_duration);
543
544 match on_error_result {
545 Ok(Ok(t)) => {
547 transaction = t;
548 metrics.reset_current();
550 continue;
551 }
552 Ok(Err(non_retryable_error)) => {
553 let total_duration = now_start.elapsed().as_millis() as u64;
554 metrics.set_execution_time(total_duration);
555 return Err((
556 FdbBindingError::from(non_retryable_error),
557 metrics.get_metrics_data(),
558 ));
559 }
560 Err(non_retryable_error) => {
562 let total_duration = now_start.elapsed().as_millis() as u64;
563 metrics.set_execution_time(total_duration);
564 return Err((non_retryable_error, metrics.get_metrics_data()));
565 }
566 }
567 }
568 let total_duration = now_start.elapsed().as_millis() as u64;
570 metrics.set_execution_time(total_duration);
571 return Err((error, metrics.get_metrics_data()));
572 }
573
574 let now_commit = std::time::Instant::now();
575 let commit_result = transaction.commit().await;
576 let commit_duration = now_commit.elapsed().as_millis() as u64;
577 metrics.record_commit_time(commit_duration);
578
579 match commit_result {
580 Err(err) => {
582 let total_duration = now_start.elapsed().as_millis() as u64;
583 metrics.set_execution_time(total_duration);
584 return Err((err, metrics.get_metrics_data()));
585 }
586 Ok(Ok(committed)) => {
587 match committed.committed_version() {
589 Ok(version) => metrics.set_commit_version(version),
590 Err(_err) => {
591 }
595 }
596
597 let total_duration = now_start.elapsed().as_millis() as u64;
598 metrics.set_execution_time(total_duration);
599 return Ok((result_closure.unwrap(), metrics.get_metrics_data()));
600 }
601 Ok(Err(transaction_commit_error)) => {
602 maybe_committed_transaction = transaction_commit_error.is_maybe_committed();
603 let now_on_error = std::time::Instant::now();
605 let on_error_result = transaction_commit_error.on_error().await;
606 let error_duration = now_on_error.elapsed().as_millis() as u64;
607 metrics.add_error_time(error_duration);
608
609 match on_error_result {
610 Ok(t) => {
611 transaction = RetryableTransaction::new(t);
612 metrics.reset_current();
614 continue;
615 }
616 Err(non_retryable_error) => {
617 let total_duration = now_start.elapsed().as_millis() as u64;
618 metrics.set_execution_time(total_duration);
619 return Err((
620 FdbBindingError::from(non_retryable_error),
621 metrics.get_metrics_data(),
622 ));
623 }
624 }
625 }
626 }
627 }
628 }
629
630 pub async fn perform_no_op(&self) -> FdbResult<()> {
638 let trx = self.create_trx()?;
639
640 trx.set_read_version(42);
643 trx.get_read_version().await?;
644 Ok(())
645 }
646
647 #[cfg_api_versions(min = 710)]
650 pub async fn get_main_thread_busyness(&self) -> FdbResult<f64> {
651 let busyness =
652 unsafe { fdb_sys::fdb_database_get_main_thread_busyness(self.inner.as_ptr()) };
653 Ok(busyness)
654 }
655}
656pub trait DatabaseTransact: Sized {
657 type Item;
658 type Error: TransactError;
659 type Future: Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)>;
660 fn transact(self, trx: Transaction) -> Self::Future;
661}
662
663#[allow(clippy::needless_lifetimes)]
664#[allow(clippy::type_complexity)]
665mod boxed {
666 use super::*;
667
668 async fn boxed_data_fut<'t, F, T, E, D>(
669 mut f: FnMutBoxed<'t, F, D>,
670 trx: Transaction,
671 ) -> (FnMutBoxed<'t, F, D>, Transaction, Result<T, E>)
672 where
673 F: for<'a> FnMut(
674 &'a Transaction,
675 &'a mut D,
676 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
677 E: TransactError,
678 {
679 let r = (f.f)(&trx, &mut f.d).await;
680 (f, trx, r)
681 }
682
683 pub struct FnMutBoxed<'t, F, D> {
684 pub f: F,
685 pub d: D,
686 pub m: PhantomData<&'t ()>,
687 }
688 impl<'t, F, T, E, D> DatabaseTransact for FnMutBoxed<'t, F, D>
689 where
690 F: for<'a> FnMut(
691 &'a Transaction,
692 &'a mut D,
693 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
694 F: 't + Send,
695 T: 't,
696 E: 't,
697 D: 't + Send,
698 E: TransactError,
699 {
700 type Item = T;
701 type Error = E;
702 type Future = Pin<
703 Box<
704 dyn Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)>
705 + Send
706 + 't,
707 >,
708 >;
709
710 fn transact(self, trx: Transaction) -> Self::Future {
711 boxed_data_fut(self, trx).boxed()
712 }
713 }
714}
715
716#[allow(clippy::needless_lifetimes)]
717#[allow(clippy::type_complexity)]
718mod boxed_local {
719 use super::*;
720
721 async fn boxed_local_data_fut<'t, F, T, E, D>(
722 mut f: FnMutBoxedLocal<'t, F, D>,
723 trx: Transaction,
724 ) -> (FnMutBoxedLocal<'t, F, D>, Transaction, Result<T, E>)
725 where
726 F: for<'a> FnMut(
727 &'a Transaction,
728 &'a mut D,
729 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
730 E: TransactError,
731 {
732 let r = (f.f)(&trx, &mut f.d).await;
733 (f, trx, r)
734 }
735
736 pub struct FnMutBoxedLocal<'t, F, D> {
737 pub f: F,
738 pub d: D,
739 pub m: PhantomData<&'t ()>,
740 }
741 impl<'t, F, T, E, D> DatabaseTransact for FnMutBoxedLocal<'t, F, D>
742 where
743 F: for<'a> FnMut(
744 &'a Transaction,
745 &'a mut D,
746 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
747 F: 't,
748 T: 't,
749 E: 't,
750 D: 't,
751 E: TransactError,
752 {
753 type Item = T;
754 type Error = E;
755 type Future = Pin<
756 Box<dyn Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)> + 't>,
757 >;
758
759 fn transact(self, trx: Transaction) -> Self::Future {
760 boxed_local_data_fut(self, trx).boxed_local()
761 }
762 }
763}
764
765pub trait TransactError: From<FdbError> {
767 fn try_into_fdb_error(self) -> Result<FdbError, Self>;
768}
769impl<T> TransactError for T
770where
771 T: From<FdbError> + TryInto<FdbError, Error = T>,
772{
773 fn try_into_fdb_error(self) -> Result<FdbError, Self> {
774 self.try_into()
775 }
776}
777impl TransactError for FdbError {
778 fn try_into_fdb_error(self) -> Result<FdbError, Self> {
779 Ok(self)
780 }
781}
782
783#[derive(Default, Clone)]
785pub struct TransactOption {
786 pub retry_limit: Option<u32>,
787 pub time_out: Option<Duration>,
788 pub is_idempotent: bool,
789}
790
791impl TransactOption {
792 pub fn idempotent() -> Self {
794 Self {
795 is_idempotent: true,
796 ..TransactOption::default()
797 }
798 }
799}