1use std::convert::TryInto;
14use std::marker::PhantomData;
15use std::pin::Pin;
16use std::ptr::NonNull;
17use std::time::{Duration, Instant};
18
19use fdb_sys::if_cfg_api_versions;
20use foundationdb_macros::cfg_api_versions;
21use foundationdb_sys as fdb_sys;
22
23use crate::metrics::{MetricsReport, TransactionMetrics};
24use crate::options;
25use crate::transaction::*;
26use crate::{error, FdbError, FdbResult};
27
28use crate::error::FdbBindingError;
29#[cfg_api_versions(min = 710)]
30#[cfg(feature = "tenant-experimental")]
31use crate::tenant::FdbTenant;
32use futures::prelude::*;
33
/// Flag handed to the closure given to [`Database::run`] and
/// [`Database::instrumented_run`].
///
/// It wraps `false` on the first attempt. On a retry it wraps the value of
/// `is_maybe_committed()` reported by the error that triggered the retry.
pub struct MaybeCommitted(bool);

impl From<MaybeCommitted> for bool {
    /// Unwraps the flag into a plain `bool`.
    fn from(MaybeCommitted(flag): MaybeCommitted) -> Self {
        flag
    }
}
47
48pub struct Database {
54 pub(crate) inner: NonNull<fdb_sys::FDBDatabase>,
55}
56unsafe impl Send for Database {}
57unsafe impl Sync for Database {}
58impl Drop for Database {
59 fn drop(&mut self) {
60 unsafe {
61 fdb_sys::fdb_database_destroy(self.inner.as_ptr());
62 }
63 }
64}
65
66#[cfg_api_versions(min = 610)]
67impl Database {
68 pub fn new(path: Option<&str>) -> FdbResult<Database> {
70 let path_str =
71 path.map(|path| std::ffi::CString::new(path).expect("path to be convertible to CStr"));
72 let path_ptr = path_str
73 .as_ref()
74 .map(|path| path.as_ptr())
75 .unwrap_or(std::ptr::null());
76 let mut v: *mut fdb_sys::FDBDatabase = std::ptr::null_mut();
77 let err = unsafe { fdb_sys::fdb_create_database(path_ptr, &mut v) };
78 drop(path_str); error::eval(err)?;
80 let ptr =
81 NonNull::new(v).expect("fdb_create_database to not return null if there is no error");
82 Ok(unsafe { Self::new_from_pointer(ptr) })
85 }
86
87 pub unsafe fn new_from_pointer(ptr: NonNull<fdb_sys::FDBDatabase>) -> Self {
89 Self { inner: ptr }
90 }
91
92 pub fn from_path(path: &str) -> FdbResult<Database> {
94 Self::new(Some(path))
95 }
96
97 #[allow(clippy::should_implement_trait)]
99 pub fn default() -> FdbResult<Database> {
100 Self::new(None)
101 }
102}
103
#[cfg_api_versions(min = 710)]
#[cfg(feature = "tenant-experimental")]
impl Database {
    /// Opens the tenant called `tenant_name` through `fdb_database_open_tenant`.
    ///
    /// The returned [`FdbTenant`] keeps its own copy of the name.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `error::eval` for the code reported by
    /// the native call.
    ///
    /// # Panics
    ///
    /// Panics if the name length does not fit the integer type expected by the
    /// native call, or if the call reports success but returns a null tenant.
    pub fn open_tenant(&self, tenant_name: &[u8]) -> FdbResult<FdbTenant> {
        let name_len = tenant_name.len().try_into().unwrap();
        let mut raw_tenant: *mut fdb_sys::FDB_tenant = std::ptr::null_mut();
        let code = unsafe {
            fdb_sys::fdb_database_open_tenant(
                self.inner.as_ptr(),
                tenant_name.as_ptr(),
                name_len,
                &mut raw_tenant,
            )
        };
        error::eval(code)?;

        let inner = NonNull::new(raw_tenant)
            .expect("fdb_database_open_tenant to not return null if there is no error");
        Ok(FdbTenant {
            inner,
            name: tenant_name.to_owned(),
        })
    }
}
125
126#[cfg_api_versions(min = 730)]
127impl Database {
128 pub fn get_client_status(
130 &self,
131 ) -> impl Future<Output = FdbResult<crate::future::FdbSlice>> + Send + Sync + Unpin {
132 crate::future::FdbFuture::new(unsafe {
133 fdb_sys::fdb_database_get_client_status(self.inner.as_ptr())
134 })
135 }
136}
137
138impl Database {
139 pub async fn new_compat(path: Option<&str>) -> FdbResult<Database> {
144 if_cfg_api_versions!(min = 510, max = 600 => {
145 let cluster = crate::cluster::Cluster::new(path).await?;
146 let database = cluster.create_database().await?;
147 Ok(database)
148 } else {
149 Database::new(path)
150 })
151 }
152
153 pub fn set_option(&self, opt: options::DatabaseOption) -> FdbResult<()> {
155 unsafe { opt.apply(self.inner.as_ptr()) }
156 }
157
158 #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
160 pub fn create_trx(&self) -> FdbResult<Transaction> {
161 let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
162 let err =
163 unsafe { fdb_sys::fdb_database_create_transaction(self.inner.as_ptr(), &mut trx) };
164 error::eval(err)?;
165 Ok(Transaction::new(NonNull::new(trx).expect(
166 "fdb_database_create_transaction to not return null if there is no error",
167 )))
168 }
169
170 #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
181 pub fn create_instrumented_trx(
182 &self,
183 metrics: TransactionMetrics,
184 ) -> Result<Transaction, FdbBindingError> {
185 let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
186 let err =
187 unsafe { fdb_sys::fdb_database_create_transaction(self.inner.as_ptr(), &mut trx) };
188 error::eval(err)?;
189
190 let inner = NonNull::new(trx)
191 .expect("fdb_database_create_transaction to not return null if there is no error");
192 Ok(Transaction::new_instrumented(inner, metrics))
193 }
194
195 #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
196 fn create_retryable_trx(&self) -> FdbResult<RetryableTransaction> {
197 Ok(RetryableTransaction::new(self.create_trx()?))
198 }
199
200 #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
211 pub fn create_intrumented_retryable_trx(
212 &self,
213 metrics: TransactionMetrics,
214 ) -> Result<RetryableTransaction, FdbBindingError> {
215 Ok(RetryableTransaction::new(
216 self.create_instrumented_trx(metrics.clone())?,
217 ))
218 }
219
220 pub async fn transact<F>(&self, mut f: F, options: TransactOption) -> Result<F::Item, F::Error>
235 where
236 F: DatabaseTransact,
237 {
238 let is_idempotent = options.is_idempotent;
239 let time_out = options.time_out.map(|d| Instant::now() + d);
240 let retry_limit = options.retry_limit;
241 let mut tries: u32 = 0;
242 let mut trx = self.create_trx()?;
243 let mut can_retry = move || {
244 tries += 1;
245 retry_limit.map(|limit| tries < limit).unwrap_or(true)
246 && time_out.map(|t| Instant::now() < t).unwrap_or(true)
247 };
248 loop {
249 let r = f.transact(trx).await;
250 f = r.0;
251 trx = r.1;
252 trx = match r.2 {
253 Ok(item) => match trx.commit().await {
254 Ok(_) => break Ok(item),
255 Err(e) => {
256 if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
257 e.on_error().await?
258 } else {
259 break Err(F::Error::from(e.into()));
260 }
261 }
262 },
263 Err(user_err) => match user_err.try_into_fdb_error() {
264 Ok(e) => {
265 if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
266 trx.on_error(e).await?
267 } else {
268 break Err(F::Error::from(e));
269 }
270 }
271 Err(user_err) => break Err(user_err),
272 },
273 };
274 }
275 }
276
277 pub fn transact_boxed<'trx, F, D, T, E>(
278 &'trx self,
279 data: D,
280 f: F,
281 options: TransactOption,
282 ) -> impl Future<Output = Result<T, E>> + Send + 'trx
283 where
284 for<'a> F: FnMut(
285 &'a Transaction,
286 &'a mut D,
287 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
288 E: TransactError,
289 F: Send + 'trx,
290 T: Send + 'trx,
291 E: Send + 'trx,
292 D: Send + 'trx,
293 {
294 self.transact(
295 boxed::FnMutBoxed {
296 f,
297 d: data,
298 m: PhantomData,
299 },
300 options,
301 )
302 }
303
304 pub fn transact_boxed_local<'trx, F, D, T, E>(
305 &'trx self,
306 data: D,
307 f: F,
308 options: TransactOption,
309 ) -> impl Future<Output = Result<T, E>> + 'trx
310 where
311 for<'a> F:
312 FnMut(&'a Transaction, &'a mut D) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
313 E: TransactError,
314 F: 'trx,
315 T: 'trx,
316 E: 'trx,
317 D: 'trx,
318 {
319 self.transact(
320 boxed_local::FnMutBoxedLocal {
321 f,
322 d: data,
323 m: PhantomData,
324 },
325 options,
326 )
327 }
328
329 #[cfg_attr(
359 feature = "trace",
360 tracing::instrument(level = "debug", skip(self, closure))
361 )]
362 pub async fn run<F, Fut, T>(&self, closure: F) -> Result<T, FdbBindingError>
363 where
364 F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
365 Fut: Future<Output = Result<T, FdbBindingError>>,
366 {
367 let mut maybe_committed_transaction = false;
368 let mut transaction = self.create_retryable_trx()?;
371 #[cfg(feature = "trace")]
372 let mut iteration = 0;
373
374 loop {
375 #[cfg(feature = "trace")]
376 {
377 iteration += 1;
378 }
379 let result_closure = closure(
381 transaction.clone(),
382 MaybeCommitted(maybe_committed_transaction),
383 )
384 .await;
385
386 if let Err(e) = result_closure {
387 if let Some(e) = e.get_fdb_error() {
389 maybe_committed_transaction = e.is_maybe_committed();
390 match transaction.on_error(e).await {
392 Ok(Ok(t)) => {
394 #[cfg(feature = "trace")]
395 {
396 let error_code = e.code();
397 tracing::warn!(iteration, error_code, "restarting transaction");
398 }
399
400 transaction = t;
401 continue;
402 }
403 Ok(Err(non_retryable_error)) => {
404 return Err(FdbBindingError::from(non_retryable_error))
405 }
406 Err(non_retryable_error) => return Err(non_retryable_error),
408 }
409 }
410 return Err(e);
412 }
413
414 #[cfg(feature = "trace")]
415 tracing::info!(iteration, "closure executed, checking result...");
416
417 let commit_result = transaction.commit().await;
418
419 match commit_result {
420 Err(err) => {
422 #[cfg(feature = "trace")]
423 tracing::error!(
424 iteration,
425 "transaction reference kept, aborting transaction"
426 );
427 return Err(err);
428 }
429 Ok(Ok(_)) => {
430 #[cfg(feature = "trace")]
431 tracing::info!(iteration, "success, returning result");
432 return result_closure;
433 }
434 Ok(Err(transaction_commit_error)) => {
435 #[cfg(feature = "trace")]
436 let error_code = transaction_commit_error.code();
437
438 maybe_committed_transaction = transaction_commit_error.is_maybe_committed();
439 match transaction_commit_error.on_error().await {
441 Ok(t) => {
442 #[cfg(feature = "trace")]
443 tracing::warn!(iteration, error_code, "restarting transaction");
444
445 transaction = RetryableTransaction::new(t);
446 continue;
447 }
448 Err(non_retryable_error) => {
449 return Err(FdbBindingError::from(non_retryable_error))
450 }
451 }
452 }
453 }
454 }
455 }
456
457 #[cfg_attr(
483 feature = "trace",
484 tracing::instrument(level = "debug", skip(self, closure))
485 )]
486 pub async fn instrumented_run<F, Fut, T>(
487 &self,
488 closure: F,
489 ) -> Result<(T, MetricsReport), (FdbBindingError, MetricsReport)>
490 where
491 F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
492 Fut: Future<Output = Result<T, FdbBindingError>>,
493 {
494 let now_start = std::time::Instant::now();
495 let metrics = TransactionMetrics::new();
496 let mut maybe_committed_transaction = false;
497
498 let mut transaction = match self.create_intrumented_retryable_trx(metrics.clone()) {
501 Ok(trx) => trx,
502 Err(err) => {
503 let total_duration = now_start.elapsed().as_millis() as u64;
505 metrics.set_execution_time(total_duration);
506 return Err((err, metrics.get_metrics_data()));
507 }
508 };
509
510 loop {
511 let result_closure = closure(
513 transaction.clone(),
514 MaybeCommitted(maybe_committed_transaction),
515 )
516 .await;
517
518 if let Err(error) = result_closure {
519 if let Some(e) = error.get_fdb_error() {
520 maybe_committed_transaction = e.is_maybe_committed();
522 let now_on_error = std::time::Instant::now();
524 let on_error_result = transaction.on_error(e).await;
525 let error_duration = now_on_error.elapsed().as_millis() as u64;
526 metrics.add_error_time(error_duration);
527
528 match on_error_result {
529 Ok(Ok(t)) => {
531 transaction = t;
532 metrics.reset_current();
534 continue;
535 }
536 Ok(Err(non_retryable_error)) => {
537 let total_duration = now_start.elapsed().as_millis() as u64;
538 metrics.set_execution_time(total_duration);
539 return Err((
540 FdbBindingError::from(non_retryable_error),
541 metrics.get_metrics_data(),
542 ));
543 }
544 Err(non_retryable_error) => {
546 let total_duration = now_start.elapsed().as_millis() as u64;
547 metrics.set_execution_time(total_duration);
548 return Err((non_retryable_error, metrics.get_metrics_data()));
549 }
550 }
551 }
552 let total_duration = now_start.elapsed().as_millis() as u64;
554 metrics.set_execution_time(total_duration);
555 return Err((error, metrics.get_metrics_data()));
556 }
557
558 let now_commit = std::time::Instant::now();
559 let commit_result = transaction.commit().await;
560 let commit_duration = now_commit.elapsed().as_millis() as u64;
561 metrics.record_commit_time(commit_duration);
562
563 match commit_result {
564 Err(err) => {
566 let total_duration = now_start.elapsed().as_millis() as u64;
567 metrics.set_execution_time(total_duration);
568 return Err((err, metrics.get_metrics_data()));
569 }
570 Ok(Ok(committed)) => {
571 match committed.committed_version() {
573 Ok(version) => metrics.set_commit_version(version),
574 Err(_err) => {
575 }
579 }
580
581 let total_duration = now_start.elapsed().as_millis() as u64;
582 metrics.set_execution_time(total_duration);
583 return Ok((result_closure.unwrap(), metrics.get_metrics_data()));
584 }
585 Ok(Err(transaction_commit_error)) => {
586 maybe_committed_transaction = transaction_commit_error.is_maybe_committed();
587 let now_on_error = std::time::Instant::now();
589 let on_error_result = transaction_commit_error.on_error().await;
590 let error_duration = now_on_error.elapsed().as_millis() as u64;
591 metrics.add_error_time(error_duration);
592
593 match on_error_result {
594 Ok(t) => {
595 transaction = RetryableTransaction::new(t);
596 metrics.reset_current();
598 continue;
599 }
600 Err(non_retryable_error) => {
601 let total_duration = now_start.elapsed().as_millis() as u64;
602 metrics.set_execution_time(total_duration);
603 return Err((
604 FdbBindingError::from(non_retryable_error),
605 metrics.get_metrics_data(),
606 ));
607 }
608 }
609 }
610 }
611 }
612 }
613
614 pub async fn perform_no_op(&self) -> FdbResult<()> {
622 let trx = self.create_trx()?;
623
624 trx.set_read_version(42);
627 trx.get_read_version().await?;
628 Ok(())
629 }
630
631 #[cfg_api_versions(min = 710)]
634 pub async fn get_main_thread_busyness(&self) -> FdbResult<f64> {
635 let busyness =
636 unsafe { fdb_sys::fdb_database_get_main_thread_busyness(self.inner.as_ptr()) };
637 Ok(busyness)
638 }
639}
/// A unit of work that [`Database::transact`] can run, and re-run, against a
/// transaction.
///
/// `transact` takes the implementor and the transaction by value. Both are
/// handed back in the future's output so the caller can use them for another
/// attempt.
pub trait DatabaseTransact: Sized {
    /// Value produced by a successful attempt.
    type Item;
    /// Error produced by a failed attempt.
    type Error: TransactError;
    /// Future returned by `transact`; resolves to the implementor, the
    /// transaction and the attempt's result.
    type Future: Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)>;
    /// Runs one attempt against `trx`.
    fn transact(self, trx: Transaction) -> Self::Future;
}
646
647#[allow(clippy::needless_lifetimes)]
648#[allow(clippy::type_complexity)]
649mod boxed {
650 use super::*;
651
652 async fn boxed_data_fut<'t, F, T, E, D>(
653 mut f: FnMutBoxed<'t, F, D>,
654 trx: Transaction,
655 ) -> (FnMutBoxed<'t, F, D>, Transaction, Result<T, E>)
656 where
657 F: for<'a> FnMut(
658 &'a Transaction,
659 &'a mut D,
660 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
661 E: TransactError,
662 {
663 let r = (f.f)(&trx, &mut f.d).await;
664 (f, trx, r)
665 }
666
667 pub struct FnMutBoxed<'t, F, D> {
668 pub f: F,
669 pub d: D,
670 pub m: PhantomData<&'t ()>,
671 }
672 impl<'t, F, T, E, D> DatabaseTransact for FnMutBoxed<'t, F, D>
673 where
674 F: for<'a> FnMut(
675 &'a Transaction,
676 &'a mut D,
677 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
678 F: 't + Send,
679 T: 't,
680 E: 't,
681 D: 't + Send,
682 E: TransactError,
683 {
684 type Item = T;
685 type Error = E;
686 type Future = Pin<
687 Box<
688 dyn Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)>
689 + Send
690 + 't,
691 >,
692 >;
693
694 fn transact(self, trx: Transaction) -> Self::Future {
695 boxed_data_fut(self, trx).boxed()
696 }
697 }
698}
699
700#[allow(clippy::needless_lifetimes)]
701#[allow(clippy::type_complexity)]
702mod boxed_local {
703 use super::*;
704
705 async fn boxed_local_data_fut<'t, F, T, E, D>(
706 mut f: FnMutBoxedLocal<'t, F, D>,
707 trx: Transaction,
708 ) -> (FnMutBoxedLocal<'t, F, D>, Transaction, Result<T, E>)
709 where
710 F: for<'a> FnMut(
711 &'a Transaction,
712 &'a mut D,
713 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
714 E: TransactError,
715 {
716 let r = (f.f)(&trx, &mut f.d).await;
717 (f, trx, r)
718 }
719
720 pub struct FnMutBoxedLocal<'t, F, D> {
721 pub f: F,
722 pub d: D,
723 pub m: PhantomData<&'t ()>,
724 }
725 impl<'t, F, T, E, D> DatabaseTransact for FnMutBoxedLocal<'t, F, D>
726 where
727 F: for<'a> FnMut(
728 &'a Transaction,
729 &'a mut D,
730 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
731 F: 't,
732 T: 't,
733 E: 't,
734 D: 't,
735 E: TransactError,
736 {
737 type Item = T;
738 type Error = E;
739 type Future = Pin<
740 Box<dyn Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)> + 't>,
741 >;
742
743 fn transact(self, trx: Transaction) -> Self::Future {
744 boxed_local_data_fut(self, trx).boxed_local()
745 }
746 }
747}
748
749pub trait TransactError: From<FdbError> {
751 fn try_into_fdb_error(self) -> Result<FdbError, Self>;
752}
753impl<T> TransactError for T
754where
755 T: From<FdbError> + TryInto<FdbError, Error = T>,
756{
757 fn try_into_fdb_error(self) -> Result<FdbError, Self> {
758 self.try_into()
759 }
760}
761impl TransactError for FdbError {
762 fn try_into_fdb_error(self) -> Result<FdbError, Self> {
763 Ok(self)
764 }
765}
766
/// Options controlling the retry behaviour of [`Database::transact`].
#[derive(Default, Clone)]
pub struct TransactOption {
    /// Upper bound checked before each retry; `None` means no bound.
    pub retry_limit: Option<u32>,
    /// Time budget measured from the start of `transact`; `None` means no deadline.
    pub time_out: Option<Duration>,
    /// When `true`, errors reporting `is_maybe_committed()` are retried as well.
    pub is_idempotent: bool,
}

impl TransactOption {
    /// Default options with `is_idempotent` switched on.
    pub fn idempotent() -> Self {
        Self {
            is_idempotent: true,
            ..Self::default()
        }
    }
}
783}