1use std::convert::TryInto;
14use std::marker::PhantomData;
15use std::pin::Pin;
16use std::ptr::NonNull;
17use std::time::{Duration, Instant};
18
19use fdb_sys::if_cfg_api_versions;
20use foundationdb_macros::cfg_api_versions;
21use foundationdb_sys as fdb_sys;
22
23use crate::metrics::{MetricsReport, TransactionMetrics};
24use crate::options;
25use crate::transaction::*;
26use crate::{error, FdbError, FdbResult};
27
28use crate::error::FdbBindingError;
29#[cfg_api_versions(min = 710)]
30#[cfg(feature = "tenant-experimental")]
31use crate::tenant::FdbTenant;
32use futures::prelude::*;
33
34pub struct MaybeCommitted(bool);
41
42impl From<MaybeCommitted> for bool {
43 fn from(value: MaybeCommitted) -> Self {
44 value.0
45 }
46}
47
48pub trait RunnerHooks {
54 fn on_commit_error(
59 &self,
60 _err: &TransactionCommitError,
61 ) -> impl Future<Output = FdbResult<()>> + Send {
62 async { Ok(()) }
63 }
64
65 fn on_closure_error(&self, _err: &FdbError) {}
67
68 fn on_error_duration(&self, _duration_ms: u64) {}
70
71 fn on_commit_success(&self, _committed: &TransactionCommitted, _commit_duration_ms: u64) {}
73
74 fn on_retry(&self) {}
76
77 fn on_complete(&self) {}
79}
80
81#[derive(Debug, Clone, Copy, Default)]
83pub struct NoopHooks;
84impl RunnerHooks for NoopHooks {}
85
86pub(crate) struct InstrumentedHooks {
88 pub(crate) metrics: TransactionMetrics,
89 pub(crate) start: Instant,
90}
91
92impl RunnerHooks for InstrumentedHooks {
93 async fn on_commit_error(&self, err: &TransactionCommitError) -> FdbResult<()> {
94 if err.code() == 1020 {
96 self.metrics.increment_conflict_count();
97 }
98 let keys = err.conflicting_keys().await?;
104 if !keys.is_empty() {
105 self.metrics.set_conflicting_keys(keys);
106 }
107 Ok(())
108 }
109
110 fn on_error_duration(&self, duration_ms: u64) {
111 self.metrics.add_error_time(duration_ms);
112 }
113
114 fn on_commit_success(&self, committed: &TransactionCommitted, commit_duration_ms: u64) {
115 self.metrics.record_commit_time(commit_duration_ms);
116 if let Ok(version) = committed.committed_version() {
117 self.metrics.set_commit_version(version);
118 }
119 }
120
121 fn on_retry(&self) {
122 self.metrics.reset_current();
123 }
124
125 fn on_complete(&self) {
126 let total_duration = self.start.elapsed().as_millis() as u64;
127 self.metrics.set_execution_time(total_duration);
128 }
129}
130
131#[cfg_attr(
144 feature = "trace",
145 tracing::instrument(level = "debug", skip(initial_transaction, hooks, closure))
146)]
147pub(crate) async fn run_with_hooks<F, Fut, T, H: RunnerHooks>(
148 initial_transaction: RetryableTransaction,
149 hooks: &H,
150 closure: F,
151) -> Result<T, FdbBindingError>
152where
153 F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
154 Fut: Future<Output = Result<T, FdbBindingError>>,
155{
156 let mut maybe_committed = false;
157 let mut transaction = initial_transaction;
158 #[cfg(feature = "trace")]
159 let mut iteration: u64 = 0;
160
161 loop {
162 #[cfg(feature = "trace")]
163 {
164 iteration += 1;
165 }
166
167 let result_closure = closure(transaction.clone(), MaybeCommitted(maybe_committed)).await;
168
169 if let Err(e) = result_closure {
170 if let Some(fdb_err) = e.get_fdb_error() {
171 maybe_committed = fdb_err.is_maybe_committed();
172 hooks.on_closure_error(&fdb_err);
173
174 let now_on_error = Instant::now();
175 match transaction.on_error(fdb_err).await {
176 Ok(Ok(t)) => {
177 hooks.on_error_duration(now_on_error.elapsed().as_millis() as u64);
178
179 #[cfg(feature = "trace")]
180 {
181 let error_code = fdb_err.code();
182 tracing::warn!(iteration, error_code, "restarting transaction");
183 }
184
185 hooks.on_retry();
186 transaction = t;
187 continue;
188 }
189 Ok(Err(non_retryable)) => {
190 return Err(FdbBindingError::from(non_retryable));
191 }
192 Err(binding_err) => {
193 return Err(binding_err);
194 }
195 }
196 }
197 return Err(e);
198 }
199
200 #[cfg(feature = "trace")]
201 tracing::info!(iteration, "closure executed, checking result...");
202
203 let now_commit = Instant::now();
204 let commit_result = transaction.commit().await;
205 let commit_duration = now_commit.elapsed().as_millis() as u64;
206
207 match commit_result {
208 Err(err) => {
209 #[cfg(feature = "trace")]
210 tracing::error!(
211 iteration,
212 "transaction reference kept, aborting transaction"
213 );
214 return Err(err);
215 }
216 Ok(Ok(committed)) => {
217 hooks.on_commit_success(&committed, commit_duration);
218
219 #[cfg(feature = "trace")]
220 tracing::info!(iteration, "success, returning result");
221
222 return result_closure;
223 }
224 Ok(Err(commit_error)) => {
225 #[cfg(feature = "trace")]
226 let error_code = commit_error.code();
227
228 maybe_committed = commit_error.is_maybe_committed();
229 if let Err(_e) = hooks.on_commit_error(&commit_error).await {
230 #[cfg(feature = "trace")]
231 tracing::debug!(error_code = _e.code(), "on_commit_error hook failed");
232 }
233
234 let now_on_error = Instant::now();
235 match commit_error.on_error().await {
236 Ok(t) => {
237 hooks.on_error_duration(now_on_error.elapsed().as_millis() as u64);
238
239 #[cfg(feature = "trace")]
240 tracing::warn!(iteration, error_code, "restarting transaction");
241
242 hooks.on_retry();
243 transaction = RetryableTransaction::new(t);
244 continue;
245 }
246 Err(non_retryable) => {
247 #[cfg(feature = "trace")]
248 {
249 let error_code = non_retryable.code();
250 tracing::error!(
251 iteration,
252 error_code,
253 "could not commit, non retryable error"
254 );
255 }
256
257 return Err(FdbBindingError::from(non_retryable));
258 }
259 }
260 }
261 }
262 }
263}
264
265pub struct Database {
271 pub(crate) inner: NonNull<fdb_sys::FDBDatabase>,
272}
273unsafe impl Send for Database {}
274unsafe impl Sync for Database {}
275impl Drop for Database {
276 fn drop(&mut self) {
277 unsafe {
278 fdb_sys::fdb_database_destroy(self.inner.as_ptr());
279 }
280 }
281}
282
283#[cfg_api_versions(min = 610)]
284impl Database {
285 pub fn new(path: Option<&str>) -> FdbResult<Database> {
287 let path_str =
288 path.map(|path| std::ffi::CString::new(path).expect("path to be convertible to CStr"));
289 let path_ptr = path_str
290 .as_ref()
291 .map(|path| path.as_ptr())
292 .unwrap_or(std::ptr::null());
293 let mut v: *mut fdb_sys::FDBDatabase = std::ptr::null_mut();
294 let err = unsafe { fdb_sys::fdb_create_database(path_ptr, &mut v) };
295 drop(path_str); error::eval(err)?;
297 let ptr =
298 NonNull::new(v).expect("fdb_create_database to not return null if there is no error");
299 Ok(unsafe { Self::new_from_pointer(ptr) })
302 }
303
304 pub unsafe fn new_from_pointer(ptr: NonNull<fdb_sys::FDBDatabase>) -> Self {
312 Self { inner: ptr }
313 }
314
315 pub fn from_path(path: &str) -> FdbResult<Database> {
317 Self::new(Some(path))
318 }
319
320 #[allow(clippy::should_implement_trait)]
322 pub fn default() -> FdbResult<Database> {
323 Self::new(None)
324 }
325}
326
327#[cfg_api_versions(min = 710)]
328#[cfg(feature = "tenant-experimental")]
329impl Database {
330 pub fn open_tenant(&self, tenant_name: &[u8]) -> FdbResult<FdbTenant> {
331 let mut ptr: *mut fdb_sys::FDB_tenant = std::ptr::null_mut();
332 let err = unsafe {
333 fdb_sys::fdb_database_open_tenant(
334 self.inner.as_ptr(),
335 tenant_name.as_ptr(),
336 tenant_name.len().try_into().unwrap(),
337 &mut ptr,
338 )
339 };
340 error::eval(err)?;
341 Ok(FdbTenant {
342 inner: NonNull::new(ptr)
343 .expect("fdb_database_open_tenant to not return null if there is no error"),
344 name: tenant_name.to_owned(),
345 })
346 }
347}
348
349#[cfg_api_versions(min = 730)]
350impl Database {
351 pub fn get_client_status(
353 &self,
354 ) -> impl Future<Output = FdbResult<crate::future::FdbSlice>> + Send + Sync + Unpin {
355 crate::future::FdbFuture::new(unsafe {
356 fdb_sys::fdb_database_get_client_status(self.inner.as_ptr())
357 })
358 }
359}
360
361impl Database {
362 pub async fn new_compat(path: Option<&str>) -> FdbResult<Database> {
367 if_cfg_api_versions!(min = 510, max = 600 => {
368 let cluster = crate::cluster::Cluster::new(path).await?;
369 let database = cluster.create_database().await?;
370 Ok(database)
371 } else {
372 Database::new(path)
373 })
374 }
375
376 pub fn set_option(&self, opt: options::DatabaseOption) -> FdbResult<()> {
378 unsafe { opt.apply(self.inner.as_ptr()) }
379 }
380
381 #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
383 pub fn create_trx(&self) -> FdbResult<Transaction> {
384 let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
385 let err =
386 unsafe { fdb_sys::fdb_database_create_transaction(self.inner.as_ptr(), &mut trx) };
387 error::eval(err)?;
388 Ok(Transaction::new(NonNull::new(trx).expect(
389 "fdb_database_create_transaction to not return null if there is no error",
390 )))
391 }
392
393 #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
404 pub fn create_instrumented_trx(
405 &self,
406 metrics: TransactionMetrics,
407 ) -> Result<Transaction, FdbBindingError> {
408 let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
409 let err =
410 unsafe { fdb_sys::fdb_database_create_transaction(self.inner.as_ptr(), &mut trx) };
411 error::eval(err)?;
412
413 let inner = NonNull::new(trx)
414 .expect("fdb_database_create_transaction to not return null if there is no error");
415 Ok(Transaction::new_instrumented(inner, metrics))
416 }
417
418 #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
419 fn create_retryable_trx(&self) -> FdbResult<RetryableTransaction> {
420 Ok(RetryableTransaction::new(self.create_trx()?))
421 }
422
423 #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
434 pub fn create_intrumented_retryable_trx(
435 &self,
436 metrics: TransactionMetrics,
437 ) -> Result<RetryableTransaction, FdbBindingError> {
438 Ok(RetryableTransaction::new(
439 self.create_instrumented_trx(metrics.clone())?,
440 ))
441 }
442
443 pub async fn transact<F>(&self, mut f: F, options: TransactOption) -> Result<F::Item, F::Error>
458 where
459 F: DatabaseTransact,
460 {
461 let is_idempotent = options.is_idempotent;
462 let time_out = options.time_out.map(|d| Instant::now() + d);
463 let retry_limit = options.retry_limit;
464 let mut tries: u32 = 0;
465 let mut trx = self.create_trx()?;
466 let mut can_retry = move || {
467 tries += 1;
468 retry_limit.map(|limit| tries < limit).unwrap_or(true)
469 && time_out.map(|t| Instant::now() < t).unwrap_or(true)
470 };
471 loop {
472 let r = f.transact(trx).await;
473 f = r.0;
474 trx = r.1;
475 trx = match r.2 {
476 Ok(item) => match trx.commit().await {
477 Ok(_) => break Ok(item),
478 Err(e) => {
479 if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
480 e.on_error().await?
481 } else {
482 break Err(F::Error::from(e.into()));
483 }
484 }
485 },
486 Err(user_err) => match user_err.try_into_fdb_error() {
487 Ok(e) => {
488 if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
489 trx.on_error(e).await?
490 } else {
491 break Err(F::Error::from(e));
492 }
493 }
494 Err(user_err) => break Err(user_err),
495 },
496 };
497 }
498 }
499
500 pub fn transact_boxed<'trx, F, D, T, E>(
501 &'trx self,
502 data: D,
503 f: F,
504 options: TransactOption,
505 ) -> impl Future<Output = Result<T, E>> + Send + 'trx
506 where
507 for<'a> F: FnMut(
508 &'a Transaction,
509 &'a mut D,
510 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
511 E: TransactError,
512 F: Send + 'trx,
513 T: Send + 'trx,
514 E: Send + 'trx,
515 D: Send + 'trx,
516 {
517 self.transact(
518 boxed::FnMutBoxed {
519 f,
520 d: data,
521 m: PhantomData,
522 },
523 options,
524 )
525 }
526
527 pub fn transact_boxed_local<'trx, F, D, T, E>(
528 &'trx self,
529 data: D,
530 f: F,
531 options: TransactOption,
532 ) -> impl Future<Output = Result<T, E>> + 'trx
533 where
534 for<'a> F:
535 FnMut(&'a Transaction, &'a mut D) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
536 E: TransactError,
537 F: 'trx,
538 T: 'trx,
539 E: 'trx,
540 D: 'trx,
541 {
542 self.transact(
543 boxed_local::FnMutBoxedLocal {
544 f,
545 d: data,
546 m: PhantomData,
547 },
548 options,
549 )
550 }
551
552 #[cfg_attr(
582 feature = "trace",
583 tracing::instrument(level = "debug", skip(self, closure))
584 )]
585 pub async fn run<F, Fut, T>(&self, closure: F) -> Result<T, FdbBindingError>
586 where
587 F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
588 Fut: Future<Output = Result<T, FdbBindingError>>,
589 {
590 let transaction = self.create_retryable_trx()?;
591 run_with_hooks(transaction, &NoopHooks, closure).await
592 }
593
594 #[cfg_attr(
601 feature = "trace",
602 tracing::instrument(level = "debug", skip(self, hooks, closure))
603 )]
604 pub async fn run_with_hooks<F, Fut, T, H: RunnerHooks>(
605 &self,
606 hooks: &H,
607 closure: F,
608 ) -> Result<T, FdbBindingError>
609 where
610 F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
611 Fut: Future<Output = Result<T, FdbBindingError>>,
612 {
613 let transaction = self.create_retryable_trx()?;
614 run_with_hooks(transaction, hooks, closure).await
615 }
616
617 #[cfg_attr(
643 feature = "trace",
644 tracing::instrument(level = "debug", skip(self, closure))
645 )]
646 pub async fn instrumented_run<F, Fut, T>(
647 &self,
648 closure: F,
649 ) -> Result<(T, MetricsReport), (FdbBindingError, MetricsReport)>
650 where
651 F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
652 Fut: Future<Output = Result<T, FdbBindingError>>,
653 {
654 let metrics = TransactionMetrics::new();
655 let hooks = InstrumentedHooks {
656 metrics: metrics.clone(),
657 start: Instant::now(),
658 };
659 let transaction = match self.create_intrumented_retryable_trx(metrics.clone()) {
660 Ok(trx) => trx,
661 Err(err) => {
662 hooks.on_complete();
663 return Err((err, metrics.get_metrics_data()));
664 }
665 };
666
667 match run_with_hooks(transaction, &hooks, closure).await {
668 Ok(val) => {
669 hooks.on_complete();
670 Ok((val, metrics.get_metrics_data()))
671 }
672 Err(err) => {
673 hooks.on_complete();
674 Err((err, metrics.get_metrics_data()))
675 }
676 }
677 }
678
679 pub async fn perform_no_op(&self) -> FdbResult<()> {
687 let trx = self.create_trx()?;
688
689 trx.set_read_version(42);
692 trx.get_read_version().await?;
693 Ok(())
694 }
695
696 #[cfg_api_versions(min = 710)]
699 pub async fn get_main_thread_busyness(&self) -> FdbResult<f64> {
700 let busyness =
701 unsafe { fdb_sys::fdb_database_get_main_thread_busyness(self.inner.as_ptr()) };
702 Ok(busyness)
703 }
704}
705pub trait DatabaseTransact: Sized {
706 type Item;
707 type Error: TransactError;
708 type Future: Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)>;
709 fn transact(self, trx: Transaction) -> Self::Future;
710}
711
712#[allow(clippy::needless_lifetimes)]
713#[allow(clippy::type_complexity)]
714mod boxed {
715 use super::*;
716
717 async fn boxed_data_fut<'t, F, T, E, D>(
718 mut f: FnMutBoxed<'t, F, D>,
719 trx: Transaction,
720 ) -> (FnMutBoxed<'t, F, D>, Transaction, Result<T, E>)
721 where
722 F: for<'a> FnMut(
723 &'a Transaction,
724 &'a mut D,
725 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
726 E: TransactError,
727 {
728 let r = (f.f)(&trx, &mut f.d).await;
729 (f, trx, r)
730 }
731
732 pub struct FnMutBoxed<'t, F, D> {
733 pub f: F,
734 pub d: D,
735 pub m: PhantomData<&'t ()>,
736 }
737 impl<'t, F, T, E, D> DatabaseTransact for FnMutBoxed<'t, F, D>
738 where
739 F: for<'a> FnMut(
740 &'a Transaction,
741 &'a mut D,
742 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
743 F: 't + Send,
744 T: 't,
745 E: 't,
746 D: 't + Send,
747 E: TransactError,
748 {
749 type Item = T;
750 type Error = E;
751 type Future = Pin<
752 Box<
753 dyn Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)>
754 + Send
755 + 't,
756 >,
757 >;
758
759 fn transact(self, trx: Transaction) -> Self::Future {
760 boxed_data_fut(self, trx).boxed()
761 }
762 }
763}
764
765#[allow(clippy::needless_lifetimes)]
766#[allow(clippy::type_complexity)]
767mod boxed_local {
768 use super::*;
769
770 async fn boxed_local_data_fut<'t, F, T, E, D>(
771 mut f: FnMutBoxedLocal<'t, F, D>,
772 trx: Transaction,
773 ) -> (FnMutBoxedLocal<'t, F, D>, Transaction, Result<T, E>)
774 where
775 F: for<'a> FnMut(
776 &'a Transaction,
777 &'a mut D,
778 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
779 E: TransactError,
780 {
781 let r = (f.f)(&trx, &mut f.d).await;
782 (f, trx, r)
783 }
784
785 pub struct FnMutBoxedLocal<'t, F, D> {
786 pub f: F,
787 pub d: D,
788 pub m: PhantomData<&'t ()>,
789 }
790 impl<'t, F, T, E, D> DatabaseTransact for FnMutBoxedLocal<'t, F, D>
791 where
792 F: for<'a> FnMut(
793 &'a Transaction,
794 &'a mut D,
795 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
796 F: 't,
797 T: 't,
798 E: 't,
799 D: 't,
800 E: TransactError,
801 {
802 type Item = T;
803 type Error = E;
804 type Future = Pin<
805 Box<dyn Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)> + 't>,
806 >;
807
808 fn transact(self, trx: Transaction) -> Self::Future {
809 boxed_local_data_fut(self, trx).boxed_local()
810 }
811 }
812}
813
814pub trait TransactError: From<FdbError> {
816 fn try_into_fdb_error(self) -> Result<FdbError, Self>;
817}
818impl<T> TransactError for T
819where
820 T: From<FdbError> + TryInto<FdbError, Error = T>,
821{
822 fn try_into_fdb_error(self) -> Result<FdbError, Self> {
823 self.try_into()
824 }
825}
826impl TransactError for FdbError {
827 fn try_into_fdb_error(self) -> Result<FdbError, Self> {
828 Ok(self)
829 }
830}
831
832#[derive(Default, Clone)]
834pub struct TransactOption {
835 pub retry_limit: Option<u32>,
836 pub time_out: Option<Duration>,
837 pub is_idempotent: bool,
838}
839
840impl TransactOption {
841 pub fn idempotent() -> Self {
843 Self {
844 is_idempotent: true,
845 ..TransactOption::default()
846 }
847 }
848}