1use std::convert::TryInto;
14use std::marker::PhantomData;
15use std::pin::Pin;
16use std::ptr::NonNull;
17use std::time::{Duration, Instant};
18
19use foundationdb_macros::cfg_api_versions;
20use foundationdb_sys as fdb_sys;
21
22use crate::options;
23use crate::transaction::*;
24use crate::{error, FdbError, FdbResult};
25
26use crate::error::FdbBindingError;
27use futures::prelude::*;
28
29#[cfg(any(feature = "fdb-7_1", feature = "fdb-7_3"))]
30#[cfg(feature = "tenant-experimental")]
31use crate::tenant::FdbTenant;
32
/// Flag handed to the closure given to `Database::run`.
///
/// It is `false` on the first attempt; on later attempts it holds what
/// `is_maybe_committed()` reported for the error that triggered the retry.
///
/// No `Clone`/`Copy` is derived, so the wrapper is moved rather than copied;
/// use `bool::from` (or `.into()`) to read the flag.
pub struct MaybeCommitted(bool);

impl From<MaybeCommitted> for bool {
    /// Unwraps the boolean carried by a [`MaybeCommitted`].
    fn from(value: MaybeCommitted) -> Self {
        let MaybeCommitted(flag) = value;
        flag
    }
}
46
/// Handle to a FoundationDB database, wrapping the raw `FDBDatabase` pointer
/// handed out by the C client.
///
/// The pointer is passed to `fdb_database_destroy` when the value is dropped
/// (see the `Drop` implementation for this type).
pub struct Database {
    /// Raw, non-null pointer to the underlying C database object.
    pub(crate) inner: NonNull<fdb_sys::FDBDatabase>,
}
// SAFETY — NOTE(review): these impls assert that the wrapped `FDBDatabase`
// pointer may be moved to, and shared between, threads. That depends on the
// thread-safety guarantees of the C client for database handles, which this
// file does not show — confirm against the FoundationDB C API documentation.
unsafe impl Send for Database {}
unsafe impl Sync for Database {}
57impl Drop for Database {
58 fn drop(&mut self) {
59 unsafe {
60 fdb_sys::fdb_database_destroy(self.inner.as_ptr());
61 }
62 }
63}
64
65#[cfg_api_versions(min = 610)]
66impl Database {
67 pub fn new(path: Option<&str>) -> FdbResult<Database> {
69 let path_str =
70 path.map(|path| std::ffi::CString::new(path).expect("path to be convertible to CStr"));
71 let path_ptr = path_str
72 .as_ref()
73 .map(|path| path.as_ptr())
74 .unwrap_or(std::ptr::null());
75 let mut v: *mut fdb_sys::FDBDatabase = std::ptr::null_mut();
76 let err = unsafe { fdb_sys::fdb_create_database(path_ptr, &mut v) };
77 drop(path_str); error::eval(err)?;
79 let ptr =
80 NonNull::new(v).expect("fdb_create_database to not return null if there is no error");
81 Ok(Self::new_from_pointer(ptr))
82 }
83
84 pub fn new_from_pointer(ptr: NonNull<fdb_sys::FDBDatabase>) -> Self {
86 Self { inner: ptr }
87 }
88
89 pub fn from_path(path: &str) -> FdbResult<Database> {
91 Self::new(Some(path))
92 }
93
94 #[allow(clippy::should_implement_trait)]
96 pub fn default() -> FdbResult<Database> {
97 Self::new(None)
98 }
99}
100
#[cfg_api_versions(min = 710)]
#[cfg(feature = "tenant-experimental")]
impl Database {
    /// Opens the tenant called `tenant_name` through
    /// `fdb_database_open_tenant`.
    ///
    /// The returned [`FdbTenant`] keeps its own copy of the name.
    ///
    /// # Errors
    /// Returns the error produced by `error::eval` for the code reported by
    /// the C client.
    ///
    /// # Panics
    /// Panics if the length of `tenant_name` does not fit the integer type
    /// expected by the C function, or if the C client reports success but
    /// yields a null pointer.
    pub fn open_tenant(&self, tenant_name: &[u8]) -> FdbResult<FdbTenant> {
        let database = self.inner.as_ptr();
        let name_ptr = tenant_name.as_ptr();
        let mut raw_tenant: *mut fdb_sys::FDB_tenant = std::ptr::null_mut();

        let code = unsafe {
            fdb_sys::fdb_database_open_tenant(
                database,
                name_ptr,
                tenant_name.len().try_into().unwrap(),
                &mut raw_tenant,
            )
        };
        error::eval(code)?;

        let inner = NonNull::new(raw_tenant)
            .expect("fdb_database_open_tenant to not return null if there is no error");
        Ok(FdbTenant {
            inner,
            name: tenant_name.to_vec(),
        })
    }
}
122
123#[cfg_api_versions(min = 730)]
124impl Database {
125 pub fn get_client_status(
127 &self,
128 ) -> impl Future<Output = FdbResult<crate::future::FdbSlice>> + Send + Sync + Unpin {
129 crate::future::FdbFuture::new(unsafe {
130 fdb_sys::fdb_database_get_client_status(self.inner.as_ptr())
131 })
132 }
133}
134
135impl Database {
136 pub async fn new_compat(path: Option<&str>) -> FdbResult<Database> {
141 #[cfg(any(feature = "fdb-5_1", feature = "fdb-5_2", feature = "fdb-6_0"))]
142 {
143 let cluster = crate::cluster::Cluster::new(path).await?;
144 let database = cluster.create_database().await?;
145 Ok(database)
146 }
147
148 #[cfg(not(any(feature = "fdb-5_1", feature = "fdb-5_2", feature = "fdb-6_0")))]
149 {
150 Database::new(path)
151 }
152 }
153
154 pub fn set_option(&self, opt: options::DatabaseOption) -> FdbResult<()> {
156 unsafe { opt.apply(self.inner.as_ptr()) }
157 }
158
159 pub fn create_trx(&self) -> FdbResult<Transaction> {
161 let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
162 let err =
163 unsafe { fdb_sys::fdb_database_create_transaction(self.inner.as_ptr(), &mut trx) };
164 error::eval(err)?;
165 Ok(Transaction::new(NonNull::new(trx).expect(
166 "fdb_database_create_transaction to not return null if there is no error",
167 )))
168 }
169
170 fn create_retryable_trx(&self) -> FdbResult<RetryableTransaction> {
171 Ok(RetryableTransaction::new(self.create_trx()?))
172 }
173
174 pub async fn transact<F>(&self, mut f: F, options: TransactOption) -> Result<F::Item, F::Error>
189 where
190 F: DatabaseTransact,
191 {
192 let is_idempotent = options.is_idempotent;
193 let time_out = options.time_out.map(|d| Instant::now() + d);
194 let retry_limit = options.retry_limit;
195 let mut tries: u32 = 0;
196 let mut trx = self.create_trx()?;
197 let mut can_retry = move || {
198 tries += 1;
199 retry_limit.map(|limit| tries < limit).unwrap_or(true)
200 && time_out.map(|t| Instant::now() < t).unwrap_or(true)
201 };
202 loop {
203 let r = f.transact(trx).await;
204 f = r.0;
205 trx = r.1;
206 trx = match r.2 {
207 Ok(item) => match trx.commit().await {
208 Ok(_) => break Ok(item),
209 Err(e) => {
210 if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
211 e.on_error().await?
212 } else {
213 break Err(F::Error::from(e.into()));
214 }
215 }
216 },
217 Err(user_err) => match user_err.try_into_fdb_error() {
218 Ok(e) => {
219 if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
220 trx.on_error(e).await?
221 } else {
222 break Err(F::Error::from(e));
223 }
224 }
225 Err(user_err) => break Err(user_err),
226 },
227 };
228 }
229 }
230
231 pub fn transact_boxed<'trx, F, D, T, E>(
232 &'trx self,
233 data: D,
234 f: F,
235 options: TransactOption,
236 ) -> impl Future<Output = Result<T, E>> + Send + 'trx
237 where
238 for<'a> F: FnMut(
239 &'a Transaction,
240 &'a mut D,
241 ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
242 E: TransactError,
243 F: Send + 'trx,
244 T: Send + 'trx,
245 E: Send + 'trx,
246 D: Send + 'trx,
247 {
248 self.transact(
249 boxed::FnMutBoxed {
250 f,
251 d: data,
252 m: PhantomData,
253 },
254 options,
255 )
256 }
257
258 pub fn transact_boxed_local<'trx, F, D, T, E>(
259 &'trx self,
260 data: D,
261 f: F,
262 options: TransactOption,
263 ) -> impl Future<Output = Result<T, E>> + 'trx
264 where
265 for<'a> F:
266 FnMut(&'a Transaction, &'a mut D) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
267 E: TransactError,
268 F: 'trx,
269 T: 'trx,
270 E: 'trx,
271 D: 'trx,
272 {
273 self.transact(
274 boxed_local::FnMutBoxedLocal {
275 f,
276 d: data,
277 m: PhantomData,
278 },
279 options,
280 )
281 }
282
    /// Runs `closure` in a retry loop, committing the transaction when the
    /// closure succeeds.
    ///
    /// On every attempt the closure receives a clone of the current
    /// [`RetryableTransaction`] and a [`MaybeCommitted`] flag. The flag is
    /// `false` on the first attempt and afterwards reflects
    /// `is_maybe_committed()` of the error that caused the retry.
    ///
    /// There is no retry limit here: the loop continues for as long as
    /// `on_error` keeps succeeding.
    ///
    /// # Errors
    /// * A closure error that carries no FDB error (`get_fdb_error()` returns
    ///   `None`) is returned as is.
    /// * An error for which `on_error` fails is returned, converted to
    ///   [`FdbBindingError`] where needed.
    /// * An `FdbBindingError` reported by `on_error` or `commit` themselves is
    ///   returned unchanged.
    pub async fn run<F, Fut, T>(&self, closure: F) -> Result<T, FdbBindingError>
    where
        F: Fn(RetryableTransaction, MaybeCommitted) -> Fut,
        Fut: Future<Output = Result<T, FdbBindingError>>,
    {
        let mut maybe_committed_transaction = false;
        // The transaction is created once; later iterations reuse whatever
        // `on_error` hands back.
        let mut transaction = self.create_retryable_trx()?;

        loop {
            // Run the user closure for this attempt.
            let result_closure = closure(
                transaction.clone(),
                MaybeCommitted(maybe_committed_transaction),
            )
            .await;

            if let Err(e) = result_closure {
                // Only errors carrying an FDB error are candidates for a retry.
                if let Some(e) = e.get_fdb_error() {
                    maybe_committed_transaction = e.is_maybe_committed();
                    match transaction.on_error(e).await {
                        // `on_error` succeeded: retry with the transaction it returned.
                        Ok(Ok(t)) => {
                            transaction = t;
                            continue;
                        }
                        // `on_error` failed with an FDB error: give up.
                        Ok(Err(non_retryable_error)) => {
                            return Err(FdbBindingError::from(non_retryable_error))
                        }
                        // The binding itself refused (outer `Err`); NOTE(review):
                        // presumably because a transaction clone is still alive.
                        Err(non_retryable_error) => return Err(non_retryable_error),
                    }
                }
                // Not an FDB error: never retried.
                return Err(e);
            }

            let commit_result = transaction.commit().await;

            match commit_result {
                // Binding-level failure of `commit` (outer `Err`).
                Err(err) => return Err(err),
                // Commit succeeded: hand the closure's `Ok` value to the caller.
                Ok(Ok(_)) => return result_closure,
                Ok(Err(transaction_commit_error)) => {
                    maybe_committed_transaction = transaction_commit_error.is_maybe_committed();
                    // The commit failed; `on_error` decides whether to retry.
                    match transaction_commit_error.on_error().await {
                        Ok(t) => {
                            transaction = RetryableTransaction::new(t);
                            continue;
                        }
                        Err(non_retryable_error) => {
                            return Err(FdbBindingError::from(non_retryable_error))
                        }
                    }
                }
            }
        }
    }
374
375 pub async fn perform_no_op(&self) -> FdbResult<()> {
383 let trx = self.create_trx()?;
384
385 trx.set_read_version(42);
388 trx.get_read_version().await?;
389 Ok(())
390 }
391
392 #[cfg_api_versions(min = 710)]
395 pub async fn get_main_thread_busyness(&self) -> FdbResult<f64> {
396 let busyness =
397 unsafe { fdb_sys::fdb_database_get_main_thread_busyness(self.inner.as_ptr()) };
398 Ok(busyness)
399 }
400}
/// A unit of work that `Database::transact` can run, possibly several times.
///
/// `transact` consumes both the implementor and the transaction and gives
/// them back together with the outcome, so the caller can commit or retry
/// with the same values.
pub trait DatabaseTransact: Sized {
    /// Value produced by a successful attempt.
    type Item;
    /// Error produced by a failed attempt.
    type Error: TransactError;
    /// Future resolving to `(self, transaction, outcome)`.
    type Future: Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)>;
    /// Runs one attempt against `trx`.
    fn transact(self, trx: Transaction) -> Self::Future;
}
407
// Adapter letting a closure that returns a boxed `Send` future be used as a
// `DatabaseTransact`; used by `Database::transact_boxed`.
#[allow(clippy::needless_lifetimes)]
#[allow(clippy::type_complexity)]
mod boxed {
    use super::*;

    /// Runs the wrapped closure once against `trx` and the wrapped data, then
    /// returns the wrapper, the transaction and the closure's result.
    async fn boxed_data_fut<'t, F, T, E, D>(
        mut f: FnMutBoxed<'t, F, D>,
        trx: Transaction,
    ) -> (FnMutBoxed<'t, F, D>, Transaction, Result<T, E>)
    where
        F: for<'a> FnMut(
            &'a Transaction,
            &'a mut D,
        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
        E: TransactError,
    {
        // The boxed future borrows `trx` and `f.d`; it is fully awaited here,
        // so both can be moved into the returned tuple afterwards.
        let r = (f.f)(&trx, &mut f.d).await;
        (f, trx, r)
    }

    /// Closure plus the data passed to it on every call.
    pub struct FnMutBoxed<'t, F, D> {
        /// The closure to run on each attempt.
        pub f: F,
        /// Data lent mutably to the closure on each attempt.
        pub d: D,
        /// Marker carrying the `'t` lifetime; holds no data.
        pub m: PhantomData<&'t ()>,
    }
    impl<'t, F, T, E, D> DatabaseTransact for FnMutBoxed<'t, F, D>
    where
        F: for<'a> FnMut(
            &'a Transaction,
            &'a mut D,
        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + Send + 'a>>,
        F: 't + Send,
        T: 't,
        E: 't,
        D: 't + Send,
        E: TransactError,
    {
        type Item = T;
        type Error = E;
        type Future = Pin<
            Box<
                dyn Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)>
                    + Send
                    + 't,
            >,
        >;

        /// Boxes the future of one attempt.
        fn transact(self, trx: Transaction) -> Self::Future {
            boxed_data_fut(self, trx).boxed()
        }
    }
}
460
// Adapter letting a closure that returns a boxed future (no `Send` bound) be
// used as a `DatabaseTransact`; used by `Database::transact_boxed_local`.
#[allow(clippy::needless_lifetimes)]
#[allow(clippy::type_complexity)]
mod boxed_local {
    use super::*;

    /// Runs the wrapped closure once against `trx` and the wrapped data, then
    /// returns the wrapper, the transaction and the closure's result.
    async fn boxed_local_data_fut<'t, F, T, E, D>(
        mut f: FnMutBoxedLocal<'t, F, D>,
        trx: Transaction,
    ) -> (FnMutBoxedLocal<'t, F, D>, Transaction, Result<T, E>)
    where
        F: for<'a> FnMut(
            &'a Transaction,
            &'a mut D,
        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
        E: TransactError,
    {
        // The boxed future borrows `trx` and `f.d`; it is fully awaited here,
        // so both can be moved into the returned tuple afterwards.
        let r = (f.f)(&trx, &mut f.d).await;
        (f, trx, r)
    }

    /// Closure plus the data passed to it on every call.
    pub struct FnMutBoxedLocal<'t, F, D> {
        /// The closure to run on each attempt.
        pub f: F,
        /// Data lent mutably to the closure on each attempt.
        pub d: D,
        /// Marker carrying the `'t` lifetime; holds no data.
        pub m: PhantomData<&'t ()>,
    }
    impl<'t, F, T, E, D> DatabaseTransact for FnMutBoxedLocal<'t, F, D>
    where
        F: for<'a> FnMut(
            &'a Transaction,
            &'a mut D,
        ) -> Pin<Box<dyn Future<Output = Result<T, E>> + 'a>>,
        F: 't,
        T: 't,
        E: 't,
        D: 't,
        E: TransactError,
    {
        type Item = T;
        type Error = E;
        type Future = Pin<
            Box<dyn Future<Output = (Self, Transaction, Result<Self::Item, Self::Error>)> + 't>,
        >;

        /// Boxes the future of one attempt.
        fn transact(self, trx: Transaction) -> Self::Future {
            boxed_local_data_fut(self, trx).boxed_local()
        }
    }
}
509
/// Error type usable with `Database::transact`.
///
/// It must be constructible from an [`FdbError`], and it must be able to say
/// whether it wraps one so that the retry logic can inspect it.
pub trait TransactError: From<FdbError> {
    /// Returns `Ok` with the underlying [`FdbError`] when there is one,
    /// otherwise gives the error back unchanged in `Err`.
    fn try_into_fdb_error(self) -> Result<FdbError, Self>;
}
514impl<T> TransactError for T
515where
516 T: From<FdbError> + TryInto<FdbError, Error = T>,
517{
518 fn try_into_fdb_error(self) -> Result<FdbError, Self> {
519 self.try_into()
520 }
521}
/// An [`FdbError`] is trivially a [`TransactError`].
impl TransactError for FdbError {
    /// Always succeeds: the value already is an `FdbError`.
    fn try_into_fdb_error(self) -> Result<FdbError, Self> {
        Ok(self)
    }
}
527
/// Retry policy consumed by `Database::transact`.
///
/// The default value has no retry limit, no time-out and is not idempotent.
#[derive(Default, Clone)]
pub struct TransactOption {
    /// Upper bound on the number of attempts; `None` means no bound. The
    /// first attempt always runs, whatever the value.
    pub retry_limit: Option<u32>,
    /// Time budget measured from the start of `transact`; `None` means no
    /// budget. It is only checked when deciding whether to retry.
    pub time_out: Option<Duration>,
    /// When `true`, errors flagged as "maybe committed" are retried as well.
    pub is_idempotent: bool,
}

impl TransactOption {
    /// Returns the default options with `is_idempotent` switched on.
    pub fn idempotent() -> Self {
        Self {
            retry_limit: None,
            time_out: None,
            is_idempotent: true,
        }
    }
}