1use std::convert::TryInto;
14use std::marker::PhantomData;
15use std::pin::Pin;
16use std::ptr::NonNull;
17use std::time::{Duration, Instant};
18
19use fdb_sys::if_cfg_api_versions;
20use foundationdb_macros::cfg_api_versions;
21use foundationdb_sys as fdb_sys;
22
23use crate::metrics::{MetricsReport, TransactionMetrics};
24use crate::options;
25use crate::transaction::*;
26use crate::{error, FdbError, FdbResult};
27
28use crate::error::FdbBindingError;
29#[cfg_api_versions(min = 710)]
30#[cfg(feature = "tenant-experimental")]
31use crate::tenant::FdbTenant;
32use futures::prelude::*;
33
/// Flag handed to the closure driven by `Database::run` and
/// `Database::instrumented_run`.
///
/// Inside those loops it is built from `is_maybe_committed()` of the error that
/// triggered the current retry, and it is `false` on the first attempt. The
/// type derives neither `Copy` nor `Clone`, so the flag can only be moved or
/// converted into a `bool`.
pub struct MaybeCommitted(bool);

impl From<MaybeCommitted> for bool {
    /// Unwraps the flag into a plain `bool`.
    fn from(MaybeCommitted(flag): MaybeCommitted) -> Self {
        flag
    }
}
47
48pub struct Database {
54    pub(crate) inner: NonNull<fdb_sys::FDBDatabase>,
55}
56unsafe impl Send for Database {}
57unsafe impl Sync for Database {}
58impl Drop for Database {
59    fn drop(&mut self) {
60        unsafe {
61            fdb_sys::fdb_database_destroy(self.inner.as_ptr());
62        }
63    }
64}
65
66#[cfg_api_versions(min = 610)]
67impl Database {
68    pub fn new(path: Option<&str>) -> FdbResult<Database> {
70        let path_str =
71            path.map(|path| std::ffi::CString::new(path).expect("path to be convertible to CStr"));
72        let path_ptr = path_str
73            .as_ref()
74            .map(|path| path.as_ptr())
75            .unwrap_or(std::ptr::null());
76        let mut v: *mut fdb_sys::FDBDatabase = std::ptr::null_mut();
77        let err = unsafe { fdb_sys::fdb_create_database(path_ptr, &mut v) };
78        drop(path_str); error::eval(err)?;
80        let ptr =
81            NonNull::new(v).expect("fdb_create_database to not return null if there is no error");
82        Ok(unsafe { Self::new_from_pointer(ptr) })
85    }
86
87    pub unsafe fn new_from_pointer(ptr: NonNull<fdb_sys::FDBDatabase>) -> Self {
95        Self { inner: ptr }
96    }
97
98    pub fn from_path(path: &str) -> FdbResult<Database> {
100        Self::new(Some(path))
101    }
102
103    #[allow(clippy::should_implement_trait)]
105    pub fn default() -> FdbResult<Database> {
106        Self::new(None)
107    }
108}
109
#[cfg_api_versions(min = 710)]
#[cfg(feature = "tenant-experimental")]
impl Database {
    /// Opens the tenant called `tenant_name` through `fdb_database_open_tenant`.
    ///
    /// The returned [`FdbTenant`] stores the raw tenant handle together with an
    /// owned copy of `tenant_name`.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `error::eval` for the code reported by the
    /// C call.
    ///
    /// # Panics
    ///
    /// Panics if the length of `tenant_name` does not fit the integer type
    /// expected by the C function, or if the C call reports success but leaves
    /// the output pointer null.
    pub fn open_tenant(&self, tenant_name: &[u8]) -> FdbResult<FdbTenant> {
        let mut raw: *mut fdb_sys::FDB_tenant = std::ptr::null_mut();
        let code = unsafe {
            fdb_sys::fdb_database_open_tenant(
                self.inner.as_ptr(),
                tenant_name.as_ptr(),
                tenant_name.len().try_into().unwrap(),
                &mut raw,
            )
        };
        error::eval(code)?;

        let inner = NonNull::new(raw)
            .expect("fdb_database_open_tenant to not return null if there is no error");
        Ok(FdbTenant {
            inner,
            name: tenant_name.to_owned(),
        })
    }
}
131
132#[cfg_api_versions(min = 730)]
133impl Database {
134    pub fn get_client_status(
136        &self,
137    ) -> impl Future<Output = FdbResult<crate::future::FdbSlice>> + Send + Sync + Unpin {
138        crate::future::FdbFuture::new(unsafe {
139            fdb_sys::fdb_database_get_client_status(self.inner.as_ptr())
140        })
141    }
142}
143
144impl Database {
145    pub async fn new_compat(path: Option<&str>) -> FdbResult<Database> {
150        if_cfg_api_versions!(min = 510, max = 600 => {
151            let cluster = crate::cluster::Cluster::new(path).await?;
152            let database = cluster.create_database().await?;
153            Ok(database)
154        } else {
155            Database::new(path)
156        })
157    }
158
159    pub fn set_option(&self, opt: options::DatabaseOption) -> FdbResult<()> {
161        unsafe { opt.apply(self.inner.as_ptr()) }
162    }
163
164    #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
166    pub fn create_trx(&self) -> FdbResult<Transaction> {
167        let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
168        let err =
169            unsafe { fdb_sys::fdb_database_create_transaction(self.inner.as_ptr(), &mut trx) };
170        error::eval(err)?;
171        Ok(Transaction::new(NonNull::new(trx).expect(
172            "fdb_database_create_transaction to not return null if there is no error",
173        )))
174    }
175
176    #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
187    pub fn create_instrumented_trx(
188        &self,
189        metrics: TransactionMetrics,
190    ) -> Result<Transaction, FdbBindingError> {
191        let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
192        let err =
193            unsafe { fdb_sys::fdb_database_create_transaction(self.inner.as_ptr(), &mut trx) };
194        error::eval(err)?;
195
196        let inner = NonNull::new(trx)
197            .expect("fdb_database_create_transaction to not return null if there is no error");
198        Ok(Transaction::new_instrumented(inner, metrics))
199    }
200
201    #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
202    fn create_retryable_trx(&self) -> FdbResult<RetryableTransaction> {
203        Ok(RetryableTransaction::new(self.create_trx()?))
204    }
205
206    #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
217    pub fn create_intrumented_retryable_trx(
218        &self,
219        metrics: TransactionMetrics,
220    ) -> Result<RetryableTransaction, FdbBindingError> {
221        Ok(RetryableTransaction::new(
222            self.create_instrumented_trx(metrics.clone())?,
223        ))
224    }
225
226    pub async fn transact<F>(&self, mut f: F, options: TransactOption) -> Result<F::Item, F::Error>
241    where
242        F: DatabaseTransact,
243    {
244        let is_idempotent = options.is_idempotent;
245        let time_out = options.time_out.map(|d| Instant::now() + d);
246        let retry_limit = options.retry_limit;
247        let mut tries: u32 = 0;
248        let mut trx = self.create_trx()?;
249        let mut can_retry = move || {
250            tries += 1;
251            retry_limit.map(|limit| tries < limit).unwrap_or(true)
252                && time_out.map(|t| Instant::now() < t).unwrap_or(true)
253        };
254        loop {
255            let r = f.transact(trx).await;
256            f = r.0;
257            trx = r.1;
258            trx = match r.2 {
259                Ok(item) => match trx.commit().await {
260                    Ok(_) => break Ok(item),
261                    Err(e) => {
262                        if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
263                            e.on_error().await?
264                        } else {
265                            break Err(F::Error::from(e.into()));
266                        }
267                    }
268                },
269                Err(user_err) => match user_err.try_into_fdb_error() {
270                    Ok(e) => {
271                        if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
272                            trx.on_error(e).await?
273                        } else {
274                            break Err(F::Error::from(e));
275                        }
276                    }
277                    Err(user_err) => break Err(user_err),
278                },
279            };
280        }
281    }
282
283    pub fn transact_boxed<'trx, F, D, T, E>(
284        &'trx self,
285        data: D,
286        f: F,
287        options: TransactOption,
288    ) -> impl Future<Output = Result<T, E>> + Send + 'trx
289    where
290        for<'a> F: FnMut(
291            &'a Transaction,
292            &'a mut D,
293        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
294        E: TransactError,
295        F: Send + 'trx,
296        T: Send + 'trx,
297        E: Send + 'trx,
298        D: Send + 'trx,
299    {
300        self.transact(
301            boxed::FnMutBoxed {
302                f,
303                d: data,
304                m: PhantomData,
305            },
306            options,
307        )
308    }
309
310    pub fn transact_boxed_local<'trx, F, D, T, E>(
311        &'trx self,
312        data: D,
313        f: F,
314        options: TransactOption,
315    ) -> impl Future<Output = Result<T, E>> + 'trx
316    where
317        for<'a> F:
318            FnMut(&'a Transaction, &'a mut D) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
319        E: TransactError,
320        F: 'trx,
321        T: 'trx,
322        E: 'trx,
323        D: 'trx,
324    {
325        self.transact(
326            boxed_local::FnMutBoxedLocal {
327                f,
328                d: data,
329                m: PhantomData,
330            },
331            options,
332        )
333    }
334
    /// Runs `closure` in a retry loop and commits the transaction when the
    /// closure succeeds.
    ///
    /// On every attempt the closure receives a clone of the current
    /// [`RetryableTransaction`] and a [`MaybeCommitted`] flag. The flag is
    /// `false` on the first attempt; afterwards it reflects
    /// `is_maybe_committed()` of the error that caused the retry.
    ///
    /// * If the closure fails with an error that carries an `FdbError`, that
    ///   error is passed to `on_error`; when a fresh transaction comes back the
    ///   loop restarts, otherwise the failure is returned.
    /// * If the closure fails with any other error, it is returned immediately.
    /// * If the closure succeeds, the transaction is committed. A commit error
    ///   is passed to `on_error` and handled the same way as above.
    ///
    /// There is no retry limit or timeout in this function itself.
    ///
    /// # Errors
    ///
    /// Returns the closure's non-FoundationDB error, a FoundationDB error that
    /// `on_error` refused to retry, or the binding error reported by
    /// `on_error` / `commit`.
    #[cfg_attr(
        feature = "trace",
        tracing::instrument(level = "debug", skip(self, closure))
    )]
    pub async fn run<F, Fut, T>(&self, closure: F) -> Result<T, FdbBindingError>
    where
        F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
        Fut: Future<Output = Result<T, FdbBindingError>>,
    {
        let mut maybe_committed_transaction = false;
        let mut transaction = self.create_retryable_trx()?;
        // Attempt counter, only kept (and only logged) when tracing is enabled.
        #[cfg(feature = "trace")]
        let mut iteration = 0;

        loop {
            #[cfg(feature = "trace")]
            {
                iteration += 1;
            }
            // The closure gets a clone; the loop keeps its own handle so it can
            // call `on_error` / `commit` afterwards.
            let result_closure = closure(
                transaction.clone(),
                MaybeCommitted(maybe_committed_transaction),
            )
            .await;

            if let Err(e) = result_closure {
                // Only errors that carry an `FdbError` are candidates for a retry.
                if let Some(e) = e.get_fdb_error() {
                    maybe_committed_transaction = e.is_maybe_committed();
                    match transaction.on_error(e).await {
                        // `on_error` handed back a transaction: try again.
                        Ok(Ok(t)) => {
                            #[cfg(feature = "trace")]
                            {
                                let error_code = e.code();
                                tracing::warn!(iteration, error_code, "restarting transaction");
                            }

                            transaction = t;
                            continue;
                        }
                        // `on_error` reported an `FdbError`: give up with it.
                        Ok(Err(non_retryable_error)) => {
                            return Err(FdbBindingError::from(non_retryable_error))
                        }
                        // `on_error` itself failed with a binding error.
                        Err(non_retryable_error) => return Err(non_retryable_error),
                    }
                }
                // No `FdbError` inside: return the closure's error untouched.
                return Err(e);
            }

            #[cfg(feature = "trace")]
            tracing::info!(iteration, "closure executed, checking result...");

            let commit_result = transaction.commit().await;

            match commit_result {
                // Binding-level failure of `commit`. NOTE(review): the log message
                // suggests this happens when a clone of the transaction given to
                // the closure is still alive — confirm in `RetryableTransaction`.
                Err(err) => {
                    #[cfg(feature = "trace")]
                    tracing::error!(
                        iteration,
                        "transaction reference kept, aborting transaction"
                    );
                    return Err(err);
                }
                Ok(Ok(_)) => {
                    #[cfg(feature = "trace")]
                    tracing::info!(iteration, "success, returning result");
                    return result_closure;
                }
                Ok(Err(transaction_commit_error)) => {
                    // Read the code before `on_error` consumes the error.
                    #[cfg(feature = "trace")]
                    let error_code = transaction_commit_error.code();

                    maybe_committed_transaction = transaction_commit_error.is_maybe_committed();
                    match transaction_commit_error.on_error().await {
                        // A transaction came back: wrap it and try again.
                        Ok(t) => {
                            #[cfg(feature = "trace")]
                            tracing::warn!(iteration, error_code, "restarting transaction");

                            transaction = RetryableTransaction::new(t);
                            continue;
                        }
                        Err(non_retryable_error) => {
                            #[cfg(feature = "trace")]
                            {
                                let error_code = non_retryable_error.code();
                                tracing::error!(
                                    iteration,
                                    error_code,
                                    "could not commit, non retryable error"
                                );
                            }

                            return Err(FdbBindingError::from(non_retryable_error));
                        }
                    }
                }
            }
        }
    }
472
473    #[cfg_attr(
499        feature = "trace",
500        tracing::instrument(level = "debug", skip(self, closure))
501    )]
502    pub async fn instrumented_run<F, Fut, T>(
503        &self,
504        closure: F,
505    ) -> Result<(T, MetricsReport), (FdbBindingError, MetricsReport)>
506    where
507        F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
508        Fut: Future<Output = Result<T, FdbBindingError>>,
509    {
510        let now_start = std::time::Instant::now();
511        let metrics = TransactionMetrics::new();
512        let mut maybe_committed_transaction = false;
513
514        let mut transaction = match self.create_intrumented_retryable_trx(metrics.clone()) {
517            Ok(trx) => trx,
518            Err(err) => {
519                let total_duration = now_start.elapsed().as_millis() as u64;
521                metrics.set_execution_time(total_duration);
522                return Err((err, metrics.get_metrics_data()));
523            }
524        };
525
526        loop {
527            let result_closure = closure(
529                transaction.clone(),
530                MaybeCommitted(maybe_committed_transaction),
531            )
532            .await;
533
534            if let Err(error) = result_closure {
535                if let Some(e) = error.get_fdb_error() {
536                    maybe_committed_transaction = e.is_maybe_committed();
538                    let now_on_error = std::time::Instant::now();
540                    let on_error_result = transaction.on_error(e).await;
541                    let error_duration = now_on_error.elapsed().as_millis() as u64;
542                    metrics.add_error_time(error_duration);
543
544                    match on_error_result {
545                        Ok(Ok(t)) => {
547                            transaction = t;
548                            metrics.reset_current();
550                            continue;
551                        }
552                        Ok(Err(non_retryable_error)) => {
553                            let total_duration = now_start.elapsed().as_millis() as u64;
554                            metrics.set_execution_time(total_duration);
555                            return Err((
556                                FdbBindingError::from(non_retryable_error),
557                                metrics.get_metrics_data(),
558                            ));
559                        }
560                        Err(non_retryable_error) => {
562                            let total_duration = now_start.elapsed().as_millis() as u64;
563                            metrics.set_execution_time(total_duration);
564                            return Err((non_retryable_error, metrics.get_metrics_data()));
565                        }
566                    }
567                }
568                let total_duration = now_start.elapsed().as_millis() as u64;
570                metrics.set_execution_time(total_duration);
571                return Err((error, metrics.get_metrics_data()));
572            }
573
574            let now_commit = std::time::Instant::now();
575            let commit_result = transaction.commit().await;
576            let commit_duration = now_commit.elapsed().as_millis() as u64;
577            metrics.record_commit_time(commit_duration);
578
579            match commit_result {
580                Err(err) => {
582                    let total_duration = now_start.elapsed().as_millis() as u64;
583                    metrics.set_execution_time(total_duration);
584                    return Err((err, metrics.get_metrics_data()));
585                }
586                Ok(Ok(committed)) => {
587                    match committed.committed_version() {
589                        Ok(version) => metrics.set_commit_version(version),
590                        Err(_err) => {
591                            }
595                    }
596
597                    let total_duration = now_start.elapsed().as_millis() as u64;
598                    metrics.set_execution_time(total_duration);
599                    return Ok((result_closure.unwrap(), metrics.get_metrics_data()));
600                }
601                Ok(Err(transaction_commit_error)) => {
602                    maybe_committed_transaction = transaction_commit_error.is_maybe_committed();
603                    let now_on_error = std::time::Instant::now();
605                    let on_error_result = transaction_commit_error.on_error().await;
606                    let error_duration = now_on_error.elapsed().as_millis() as u64;
607                    metrics.add_error_time(error_duration);
608
609                    match on_error_result {
610                        Ok(t) => {
611                            transaction = RetryableTransaction::new(t);
612                            metrics.reset_current();
614                            continue;
615                        }
616                        Err(non_retryable_error) => {
617                            let total_duration = now_start.elapsed().as_millis() as u64;
618                            metrics.set_execution_time(total_duration);
619                            return Err((
620                                FdbBindingError::from(non_retryable_error),
621                                metrics.get_metrics_data(),
622                            ));
623                        }
624                    }
625                }
626            }
627        }
628    }
629
630    pub async fn perform_no_op(&self) -> FdbResult<()> {
638        let trx = self.create_trx()?;
639
640        trx.set_read_version(42);
643        trx.get_read_version().await?;
644        Ok(())
645    }
646
647    #[cfg_api_versions(min = 710)]
650    pub async fn get_main_thread_busyness(&self) -> FdbResult<f64> {
651        let busyness =
652            unsafe { fdb_sys::fdb_database_get_main_thread_busyness(self.inner.as_ptr()) };
653        Ok(busyness)
654    }
655}
/// A unit of work that `Database::transact` can run, and re-run, inside a
/// transaction.
///
/// `transact` takes the implementor and the transaction by value and its
/// future gives both back together with the outcome, which lets the caller
/// reuse them for the next attempt.
pub trait DatabaseTransact: Sized {
    /// Value produced by a successful run.
    type Item;
    /// Error produced by a failed run; must be convertible from `FdbError`.
    type Error: TransactError;
    /// Future returned by [`DatabaseTransact::transact`]. It resolves to the
    /// implementor, the transaction, and the result of this attempt.
    type Future: Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)>;
    /// Starts one attempt on `trx`.
    fn transact(self, trx: Transaction) -> Self::Future;
}
662
663#[allow(clippy::needless_lifetimes)]
664#[allow(clippy::type_complexity)]
665mod boxed {
666    use super::*;
667
668    async fn boxed_data_fut<'t, F, T, E, D>(
669        mut f: FnMutBoxed<'t, F, D>,
670        trx: Transaction,
671    ) -> (FnMutBoxed<'t, F, D>, Transaction, Result<T, E>)
672    where
673        F: for<'a> FnMut(
674            &'a Transaction,
675            &'a mut D,
676        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
677        E: TransactError,
678    {
679        let r = (f.f)(&trx, &mut f.d).await;
680        (f, trx, r)
681    }
682
683    pub struct FnMutBoxed<'t, F, D> {
684        pub f: F,
685        pub d: D,
686        pub m: PhantomData<&'t ()>,
687    }
688    impl<'t, F, T, E, D> DatabaseTransact for FnMutBoxed<'t, F, D>
689    where
690        F: for<'a> FnMut(
691            &'a Transaction,
692            &'a mut D,
693        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
694        F: 't + Send,
695        T: 't,
696        E: 't,
697        D: 't + Send,
698        E: TransactError,
699    {
700        type Item = T;
701        type Error = E;
702        type Future = Pin<
703            Box<
704                dyn Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)>
705                    + Send
706                    + 't,
707            >,
708        >;
709
710        fn transact(self, trx: Transaction) -> Self::Future {
711            boxed_data_fut(self, trx).boxed()
712        }
713    }
714}
715
716#[allow(clippy::needless_lifetimes)]
717#[allow(clippy::type_complexity)]
718mod boxed_local {
719    use super::*;
720
721    async fn boxed_local_data_fut<'t, F, T, E, D>(
722        mut f: FnMutBoxedLocal<'t, F, D>,
723        trx: Transaction,
724    ) -> (FnMutBoxedLocal<'t, F, D>, Transaction, Result<T, E>)
725    where
726        F: for<'a> FnMut(
727            &'a Transaction,
728            &'a mut D,
729        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
730        E: TransactError,
731    {
732        let r = (f.f)(&trx, &mut f.d).await;
733        (f, trx, r)
734    }
735
736    pub struct FnMutBoxedLocal<'t, F, D> {
737        pub f: F,
738        pub d: D,
739        pub m: PhantomData<&'t ()>,
740    }
741    impl<'t, F, T, E, D> DatabaseTransact for FnMutBoxedLocal<'t, F, D>
742    where
743        F: for<'a> FnMut(
744            &'a Transaction,
745            &'a mut D,
746        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
747        F: 't,
748        T: 't,
749        E: 't,
750        D: 't,
751        E: TransactError,
752    {
753        type Item = T;
754        type Error = E;
755        type Future = Pin<
756            Box<dyn Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)> + 't>,
757        >;
758
759        fn transact(self, trx: Transaction) -> Self::Future {
760            boxed_local_data_fut(self, trx).boxed_local()
761        }
762    }
763}
764
765pub trait TransactError: From<FdbError> {
767    fn try_into_fdb_error(self) -> Result<FdbError, Self>;
768}
769impl<T> TransactError for T
770where
771    T: From<FdbError> + TryInto<FdbError, Error = T>,
772{
773    fn try_into_fdb_error(self) -> Result<FdbError, Self> {
774        self.try_into()
775    }
776}
777impl TransactError for FdbError {
778    fn try_into_fdb_error(self) -> Result<FdbError, Self> {
779        Ok(self)
780    }
781}
782
/// Knobs for the retry loop of `Database::transact`.
///
/// The default value has no retry limit, no time-out, and is not idempotent.
#[derive(Default, Clone)]
pub struct TransactOption {
    /// Upper bound on the number of attempts; `None` means unbounded.
    pub retry_limit: Option<u32>,
    /// Time budget measured from the start of `transact`; `None` means
    /// unbounded.
    pub time_out: Option<Duration>,
    /// When `true`, errors flagged as "maybe committed" are retried as well.
    pub is_idempotent: bool,
}

impl TransactOption {
    /// Default options with `is_idempotent` switched on.
    pub fn idempotent() -> Self {
        TransactOption {
            retry_limit: None,
            time_out: None,
            is_idempotent: true,
        }
    }
}
799}