1use std::convert::TryInto;
14use std::marker::PhantomData;
15use std::pin::Pin;
16use std::ptr::NonNull;
17use std::time::{Duration, Instant};
18
19use fdb_sys::if_cfg_api_versions;
20use foundationdb_macros::cfg_api_versions;
21use foundationdb_sys as fdb_sys;
22
23use crate::options;
24use crate::transaction::*;
25use crate::{error, FdbError, FdbResult};
26
27use crate::error::FdbBindingError;
28use futures::prelude::*;
29
30#[cfg_api_versions(min = 710)]
31#[cfg(feature = "tenant-experimental")]
32use crate::tenant::FdbTenant;
33
/// Flag handed to the closure of [`Database::run`] telling it whether the
/// previous attempt ended with an error that reported `is_maybe_committed()`.
///
/// The wrapper intentionally derives neither `Clone` nor `Copy`; convert it
/// with `bool::from(..)` / `.into()` to read the value.
pub struct MaybeCommitted(bool);

impl From<MaybeCommitted> for bool {
    /// Unwraps the flag into a plain `bool`.
    fn from(MaybeCommitted(flag): MaybeCommitted) -> Self {
        flag
    }
}
47
48pub struct Database {
54 pub(crate) inner: NonNull<fdb_sys::FDBDatabase>,
55}
56unsafe impl Send for Database {}
57unsafe impl Sync for Database {}
58impl Drop for Database {
59 fn drop(&mut self) {
60 unsafe {
61 fdb_sys::fdb_database_destroy(self.inner.as_ptr());
62 }
63 }
64}
65
66#[cfg_api_versions(min = 610)]
67impl Database {
68 pub fn new(path: Option<&str>) -> FdbResult<Database> {
70 let path_str =
71 path.map(|path| std::ffi::CString::new(path).expect("path to be convertible to CStr"));
72 let path_ptr = path_str
73 .as_ref()
74 .map(|path| path.as_ptr())
75 .unwrap_or(std::ptr::null());
76 let mut v: *mut fdb_sys::FDBDatabase = std::ptr::null_mut();
77 let err = unsafe { fdb_sys::fdb_create_database(path_ptr, &mut v) };
78 drop(path_str); error::eval(err)?;
80 let ptr =
81 NonNull::new(v).expect("fdb_create_database to not return null if there is no error");
82 Ok(Self::new_from_pointer(ptr))
83 }
84
85 pub fn new_from_pointer(ptr: NonNull<fdb_sys::FDBDatabase>) -> Self {
87 Self { inner: ptr }
88 }
89
90 pub fn from_path(path: &str) -> FdbResult<Database> {
92 Self::new(Some(path))
93 }
94
95 #[allow(clippy::should_implement_trait)]
97 pub fn default() -> FdbResult<Database> {
98 Self::new(None)
99 }
100}
101
#[cfg_api_versions(min = 710)]
#[cfg(feature = "tenant-experimental")]
impl Database {
    /// Opens the tenant called `tenant_name` on this database.
    ///
    /// The returned [`FdbTenant`] stores its own copy of the name.
    ///
    /// # Errors
    ///
    /// Returns the error produced by `error::eval` when
    /// `fdb_database_open_tenant` reports a non-success code.
    ///
    /// # Panics
    ///
    /// Panics if the length of `tenant_name` does not fit the integer type the
    /// C function expects, or if the C call reports success but leaves the
    /// output pointer null.
    pub fn open_tenant(&self, tenant_name: &[u8]) -> FdbResult<FdbTenant> {
        let mut raw_tenant: *mut fdb_sys::FDB_tenant = std::ptr::null_mut();
        // SAFETY: the name pointer and length describe the live `tenant_name`
        // slice, and `raw_tenant` is a valid location for the output pointer.
        let code = unsafe {
            fdb_sys::fdb_database_open_tenant(
                self.inner.as_ptr(),
                tenant_name.as_ptr(),
                tenant_name.len().try_into().unwrap(),
                &mut raw_tenant,
            )
        };
        error::eval(code)?;

        let inner = NonNull::new(raw_tenant)
            .expect("fdb_database_open_tenant to not return null if there is no error");
        let name = tenant_name.to_owned();
        Ok(FdbTenant { inner, name })
    }
}
123
124#[cfg_api_versions(min = 730)]
125impl Database {
126 pub fn get_client_status(
128 &self,
129 ) -> impl Future<Output = FdbResult<crate::future::FdbSlice>> + Send + Sync + Unpin {
130 crate::future::FdbFuture::new(unsafe {
131 fdb_sys::fdb_database_get_client_status(self.inner.as_ptr())
132 })
133 }
134}
135
136impl Database {
    /// Opens a database in a way that works across client API versions.
    ///
    /// When the crate is built for API versions 510 through 600, the database
    /// is obtained by first creating a `Cluster` from `path` and then asking
    /// that cluster for a database. For every other API version this simply
    /// calls `Database::new(path)`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the cluster/database creation calls.
    pub async fn new_compat(path: Option<&str>) -> FdbResult<Database> {
        if_cfg_api_versions!(min = 510, max = 600 => {
            // Legacy path: cluster first, then database.
            let cluster = crate::cluster::Cluster::new(path).await?;
            let database = cluster.create_database().await?;
            Ok(database)
        } else {
            Database::new(path)
        })
    }
150
151 pub fn set_option(&self, opt: options::DatabaseOption) -> FdbResult<()> {
153 unsafe { opt.apply(self.inner.as_ptr()) }
154 }
155
156 pub fn create_trx(&self) -> FdbResult<Transaction> {
158 let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
159 let err =
160 unsafe { fdb_sys::fdb_database_create_transaction(self.inner.as_ptr(), &mut trx) };
161 error::eval(err)?;
162 Ok(Transaction::new(NonNull::new(trx).expect(
163 "fdb_database_create_transaction to not return null if there is no error",
164 )))
165 }
166
167 fn create_retryable_trx(&self) -> FdbResult<RetryableTransaction> {
168 Ok(RetryableTransaction::new(self.create_trx()?))
169 }
170
171 pub async fn transact<F>(&self, mut f: F, options: TransactOption) -> Result<F::Item, F::Error>
186 where
187 F: DatabaseTransact,
188 {
189 let is_idempotent = options.is_idempotent;
190 let time_out = options.time_out.map(|d| Instant::now() + d);
191 let retry_limit = options.retry_limit;
192 let mut tries: u32 = 0;
193 let mut trx = self.create_trx()?;
194 let mut can_retry = move || {
195 tries += 1;
196 retry_limit.map(|limit| tries < limit).unwrap_or(true)
197 && time_out.map(|t| Instant::now() < t).unwrap_or(true)
198 };
199 loop {
200 let r = f.transact(trx).await;
201 f = r.0;
202 trx = r.1;
203 trx = match r.2 {
204 Ok(item) => match trx.commit().await {
205 Ok(_) => break Ok(item),
206 Err(e) => {
207 if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
208 e.on_error().await?
209 } else {
210 break Err(F::Error::from(e.into()));
211 }
212 }
213 },
214 Err(user_err) => match user_err.try_into_fdb_error() {
215 Ok(e) => {
216 if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
217 trx.on_error(e).await?
218 } else {
219 break Err(F::Error::from(e));
220 }
221 }
222 Err(user_err) => break Err(user_err),
223 },
224 };
225 }
226 }
227
228 pub fn transact_boxed<'trx, F, D, T, E>(
229 &'trx self,
230 data: D,
231 f: F,
232 options: TransactOption,
233 ) -> impl Future<Output = Result<T, E>> + Send + 'trx
234 where
235 for<'a> F: FnMut(
236 &'a Transaction,
237 &'a mut D,
238 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
239 E: TransactError,
240 F: Send + 'trx,
241 T: Send + 'trx,
242 E: Send + 'trx,
243 D: Send + 'trx,
244 {
245 self.transact(
246 boxed::FnMutBoxed {
247 f,
248 d: data,
249 m: PhantomData,
250 },
251 options,
252 )
253 }
254
255 pub fn transact_boxed_local<'trx, F, D, T, E>(
256 &'trx self,
257 data: D,
258 f: F,
259 options: TransactOption,
260 ) -> impl Future<Output = Result<T, E>> + 'trx
261 where
262 for<'a> F:
263 FnMut(&'a Transaction, &'a mut D) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
264 E: TransactError,
265 F: 'trx,
266 T: 'trx,
267 E: 'trx,
268 D: 'trx,
269 {
270 self.transact(
271 boxed_local::FnMutBoxedLocal {
272 f,
273 d: data,
274 m: PhantomData,
275 },
276 options,
277 )
278 }
279
280 pub async fn run<F, Fut, T>(&self, closure: F) -> Result<T, FdbBindingError>
310 where
311 F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
312 Fut: Future<Output = Result<T, FdbBindingError>>,
313 {
314 let mut maybe_committed_transaction = false;
315 let mut transaction = self.create_retryable_trx()?;
318
319 loop {
320 let result_closure = closure(
322 transaction.clone(),
323 MaybeCommitted(maybe_committed_transaction),
324 )
325 .await;
326
327 if let Err(e) = result_closure {
328 if let Some(e) = e.get_fdb_error() {
330 maybe_committed_transaction = e.is_maybe_committed();
331 match transaction.on_error(e).await {
333 Ok(Ok(t)) => {
335 transaction = t;
336 continue;
337 }
338 Ok(Err(non_retryable_error)) => {
339 return Err(FdbBindingError::from(non_retryable_error))
340 }
341 Err(non_retryable_error) => return Err(non_retryable_error),
343 }
344 }
345 return Err(e);
347 }
348
349 let commit_result = transaction.commit().await;
350
351 match commit_result {
352 Err(err) => return Err(err),
354 Ok(Ok(_)) => return result_closure,
355 Ok(Err(transaction_commit_error)) => {
356 maybe_committed_transaction = transaction_commit_error.is_maybe_committed();
357 match transaction_commit_error.on_error().await {
359 Ok(t) => {
360 transaction = RetryableTransaction::new(t);
361 continue;
362 }
363 Err(non_retryable_error) => {
364 return Err(FdbBindingError::from(non_retryable_error))
365 }
366 }
367 }
368 }
369 }
370 }
371
372 pub async fn perform_no_op(&self) -> FdbResult<()> {
380 let trx = self.create_trx()?;
381
382 trx.set_read_version(42);
385 trx.get_read_version().await?;
386 Ok(())
387 }
388
389 #[cfg_api_versions(min = 710)]
392 pub async fn get_main_thread_busyness(&self) -> FdbResult<f64> {
393 let busyness =
394 unsafe { fdb_sys::fdb_database_get_main_thread_busyness(self.inner.as_ptr()) };
395 Ok(busyness)
396 }
397}
/// A unit of work that `Database::transact` can run, and re-run, against a
/// transaction.
///
/// `transact` consumes both the implementor and the transaction, and the
/// returned future gives both back together with the outcome, so the caller
/// can reuse them for another attempt.
pub trait DatabaseTransact: Sized {
    /// Value produced by a successful attempt.
    type Item;
    /// Error produced by a failed attempt.
    type Error: TransactError;
    /// Future resolving to `(self, transaction, outcome)`.
    type Future: Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)>;
    /// Runs one attempt against `trx`.
    fn transact(self, trx: Transaction) -> Self::Future;
}
404
405#[allow(clippy::needless_lifetimes)]
406#[allow(clippy::type_complexity)]
407mod boxed {
408 use super::*;
409
410 async fn boxed_data_fut<'t, F, T, E, D>(
411 mut f: FnMutBoxed<'t, F, D>,
412 trx: Transaction,
413 ) -> (FnMutBoxed<'t, F, D>, Transaction, Result<T, E>)
414 where
415 F: for<'a> FnMut(
416 &'a Transaction,
417 &'a mut D,
418 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
419 E: TransactError,
420 {
421 let r = (f.f)(&trx, &mut f.d).await;
422 (f, trx, r)
423 }
424
425 pub struct FnMutBoxed<'t, F, D> {
426 pub f: F,
427 pub d: D,
428 pub m: PhantomData<&'t ()>,
429 }
430 impl<'t, F, T, E, D> DatabaseTransact for FnMutBoxed<'t, F, D>
431 where
432 F: for<'a> FnMut(
433 &'a Transaction,
434 &'a mut D,
435 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
436 F: 't + Send,
437 T: 't,
438 E: 't,
439 D: 't + Send,
440 E: TransactError,
441 {
442 type Item = T;
443 type Error = E;
444 type Future = Pin<
445 Box<
446 dyn Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)>
447 + Send
448 + 't,
449 >,
450 >;
451
452 fn transact(self, trx: Transaction) -> Self::Future {
453 boxed_data_fut(self, trx).boxed()
454 }
455 }
456}
457
458#[allow(clippy::needless_lifetimes)]
459#[allow(clippy::type_complexity)]
460mod boxed_local {
461 use super::*;
462
463 async fn boxed_local_data_fut<'t, F, T, E, D>(
464 mut f: FnMutBoxedLocal<'t, F, D>,
465 trx: Transaction,
466 ) -> (FnMutBoxedLocal<'t, F, D>, Transaction, Result<T, E>)
467 where
468 F: for<'a> FnMut(
469 &'a Transaction,
470 &'a mut D,
471 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
472 E: TransactError,
473 {
474 let r = (f.f)(&trx, &mut f.d).await;
475 (f, trx, r)
476 }
477
478 pub struct FnMutBoxedLocal<'t, F, D> {
479 pub f: F,
480 pub d: D,
481 pub m: PhantomData<&'t ()>,
482 }
483 impl<'t, F, T, E, D> DatabaseTransact for FnMutBoxedLocal<'t, F, D>
484 where
485 F: for<'a> FnMut(
486 &'a Transaction,
487 &'a mut D,
488 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
489 F: 't,
490 T: 't,
491 E: 't,
492 D: 't,
493 E: TransactError,
494 {
495 type Item = T;
496 type Error = E;
497 type Future = Pin<
498 Box<dyn Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)> + 't>,
499 >;
500
501 fn transact(self, trx: Transaction) -> Self::Future {
502 boxed_local_data_fut(self, trx).boxed_local()
503 }
504 }
505}
506
507pub trait TransactError: From<FdbError> {
509 fn try_into_fdb_error(self) -> Result<FdbError, Self>;
510}
511impl<T> TransactError for T
512where
513 T: From<FdbError> + TryInto<FdbError, Error = T>,
514{
515 fn try_into_fdb_error(self) -> Result<FdbError, Self> {
516 self.try_into()
517 }
518}
519impl TransactError for FdbError {
520 fn try_into_fdb_error(self) -> Result<FdbError, Self> {
521 Ok(self)
522 }
523}
524
525#[derive(Default, Clone)]
527pub struct TransactOption {
528 pub retry_limit: Option<u32>,
529 pub time_out: Option<Duration>,
530 pub is_idempotent: bool,
531}
532
533impl TransactOption {
534 pub fn idempotent() -> Self {
536 Self {
537 is_idempotent: true,
538 ..TransactOption::default()
539 }
540 }
541}