1use crate::options::TransactionOption;
11use std::future::Future;
12
13use crate::database::TransactError;
14use crate::{
15 error, Database, DatabaseTransact, FdbBindingError, FdbError, FdbResult, KeySelector,
16 RangeOption, RetryableTransaction, TransactOption, Transaction,
17};
18use foundationdb_sys as fdb_sys;
19use futures::TryStreamExt;
20use serde::{Deserialize, Serialize};
21use serde_json::Error;
22use std::ptr::NonNull;
23use std::sync::atomic::{AtomicBool, Ordering};
24use std::time::Instant;
25
26#[cfg(feature = "fdb-7_1")]
27const TENANT_MAP_PREFIX: &[u8] = b"\xFF\xFF/management/tenant_map/";
28#[cfg(feature = "fdb-7_1")]
29const TENANT_MAP_PREFIX_END: &[u8] = b"\xFF\xFF/management/tenant_map0";
30
31#[cfg(feature = "fdb-7_3")]
32const TENANT_MAP_PREFIX: &[u8] = b"\xFF\xFF/management/tenant/map/";
33#[cfg(feature = "fdb-7_3")]
34const TENANT_MAP_PREFIX_END: &[u8] = b"\xFF\xFF/management/tenant/map0";
35
36pub struct FdbTenant {
38 pub(crate) inner: NonNull<fdb_sys::FDBTenant>,
39 pub(crate) name: Vec<u8>,
40}
41
42unsafe impl Send for FdbTenant {}
43unsafe impl Sync for FdbTenant {}
44
45impl Drop for FdbTenant {
46 fn drop(&mut self) {
47 unsafe {
48 fdb_sys::fdb_tenant_destroy(self.inner.as_ptr());
49 }
50 }
51}
52
53impl FdbTenant {
54 pub fn get_name(&self) -> &[u8] {
56 &self.name
57 }
58
59 pub fn create_trx(&self) -> FdbResult<Transaction> {
61 let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
62 let err = unsafe { fdb_sys::fdb_tenant_create_transaction(self.inner.as_ptr(), &mut trx) };
63 error::eval(err)?;
64 Ok(Transaction::new(NonNull::new(trx).expect(
65 "fdb_tenant_create_transaction to not return null if there is no error",
66 )))
67 }
68
69 fn create_retryable_trx(&self) -> FdbResult<RetryableTransaction> {
70 Ok(RetryableTransaction::new(self.create_trx()?))
71 }
72
73 pub async fn run<F, Fut, T>(&self, closure: F) -> Result<T, FdbBindingError>
95 where
96 F: Fn(RetryableTransaction, bool) -> Fut,
97 Fut: Future<Output = Result<T, FdbBindingError>>,
98 {
99 let mut maybe_committed_transaction = false;
100 let mut transaction = self.create_retryable_trx()?;
103
104 loop {
105 let result_closure = closure(transaction.clone(), maybe_committed_transaction).await;
107
108 if let Err(e) = result_closure {
109 if let Some(e) = e.get_fdb_error() {
111 maybe_committed_transaction = e.is_maybe_committed();
112 match transaction.on_error(e).await {
114 Ok(Ok(t)) => {
116 transaction = t;
117 continue;
118 }
119 Ok(Err(non_retryable_error)) => {
120 return Err(FdbBindingError::from(non_retryable_error))
121 }
122 Err(non_retryable_error) => return Err(non_retryable_error),
124 }
125 }
126 return Err(e);
128 }
129
130 let commit_result = transaction.commit().await;
131
132 match commit_result {
133 Err(err) => return Err(err),
135 Ok(Ok(_)) => return result_closure,
136 Ok(Err(transaction_commit_error)) => {
137 maybe_committed_transaction = transaction_commit_error.is_maybe_committed();
138 match transaction_commit_error.on_error().await {
140 Ok(t) => {
141 transaction = RetryableTransaction::new(t);
142 continue;
143 }
144 Err(non_retryable_error) => {
145 return Err(FdbBindingError::from(non_retryable_error))
146 }
147 }
148 }
149 }
150 }
151 }
152
153 pub async fn transact<F>(&self, mut f: F, options: TransactOption) -> Result<F::Item, F::Error>
168 where
169 F: DatabaseTransact,
170 {
171 let is_idempotent = options.is_idempotent;
172 let time_out = options.time_out.map(|d| Instant::now() + d);
173 let retry_limit = options.retry_limit;
174 let mut tries: u32 = 0;
175 let mut trx = self.create_trx()?;
176 let mut can_retry = move || {
177 tries += 1;
178 retry_limit.map(|limit| tries < limit).unwrap_or(true)
179 && time_out.map(|t| Instant::now() < t).unwrap_or(true)
180 };
181 loop {
182 let r = f.transact(trx).await;
183 f = r.0;
184 trx = r.1;
185 trx = match r.2 {
186 Ok(item) => match trx.commit().await {
187 Ok(_) => break Ok(item),
188 Err(e) => {
189 if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
190 e.on_error().await?
191 } else {
192 break Err(F::Error::from(e.into()));
193 }
194 }
195 },
196 Err(user_err) => match user_err.try_into_fdb_error() {
197 Ok(e) => {
198 if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
199 trx.on_error(e).await?
200 } else {
201 break Err(F::Error::from(e));
202 }
203 }
204 Err(user_err) => break Err(user_err),
205 },
206 };
207 }
208 }
209}
210
211#[cfg(feature = "fdb-7_1")]
212#[derive(Serialize, Deserialize, Debug)]
214pub struct TenantInfo {
215 pub id: i64,
216 pub prefix: Vec<u8>,
217 pub name: Vec<u8>,
218}
219
220#[cfg(feature = "fdb-7_3")]
221#[derive(Serialize, Deserialize, Debug)]
223pub struct TenantInfo {
224 pub id: i64,
225 pub prefix: FDBTenantPrintableInfo,
226 pub name: FDBTenantPrintableInfo,
227}
228
229impl TryFrom<(&[u8], &[u8])> for TenantInfo {
230 type Error = Error;
231
232 fn try_from(k_v: (&[u8], &[u8])) -> Result<Self, Self::Error> {
233 let value = k_v.1;
234 match serde_json::from_slice::<FDBTenantInfo>(value) {
235 #[cfg(feature = "fdb-7_1")]
236 Ok(tenant_info) => Ok(TenantInfo {
237 name: k_v.0.split_at(TENANT_MAP_PREFIX.len()).1.to_vec(),
238 id: tenant_info.id,
239 prefix: tenant_info.prefix,
240 }),
241
242 #[cfg(feature = "fdb-7_3")]
243 Ok(tenant_info) => Ok(TenantInfo {
244 name: tenant_info.name,
245 id: tenant_info.id,
246 prefix: tenant_info.prefix,
247 }),
248
249 Err(err) => Err(err),
250 }
251 }
252}
253
254#[cfg(feature = "fdb-7_1")]
256#[derive(Serialize, Deserialize, Debug)]
257struct FDBTenantInfo {
258 id: i64,
259 #[serde(with = "serde_bytes")]
260 prefix: Vec<u8>,
261}
262
263#[cfg(feature = "fdb-7_3")]
264#[derive(Serialize, Deserialize, Debug)]
265struct FDBTenantInfo {
266 id: i64,
267 lock_state: TenantLockState,
268 name: FDBTenantPrintableInfo,
269 prefix: FDBTenantPrintableInfo,
270}
271
272#[cfg(feature = "fdb-7_3")]
273#[derive(Serialize, Deserialize, Debug)]
274#[serde(rename_all = "camelCase")]
275enum TenantLockState {
276 Unlocked,
277 Locked,
278 ReadOnly,
279}
280
281#[cfg(feature = "fdb-7_3")]
282#[derive(Serialize, Deserialize, Debug)]
284pub struct FDBTenantPrintableInfo {
285 base64: String,
286 printable: String,
287}
288
289#[derive(Debug)]
292pub struct TenantManagement;
293
294impl TenantManagement {
295 pub async fn create_tenant(db: &Database, tenant_name: &[u8]) -> Result<(), FdbError> {
300 let checked_existence = AtomicBool::new(false);
301 let checked_existence_ref = &checked_existence;
302
303 let mut key: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + tenant_name.len());
304 key.extend_from_slice(TENANT_MAP_PREFIX);
305 key.extend_from_slice(tenant_name);
306
307 let key_ref = &key;
308
309 db.run(|trx, _maybe_committed| async move {
310 trx.set_option(TransactionOption::SpecialKeySpaceEnableWrites)?;
311
312 if checked_existence_ref.load(Ordering::SeqCst) {
313 trx.set(key_ref, &[]);
314 Ok(())
315 } else {
316 let maybe_key = trx.get(key_ref, false).await?;
317
318 checked_existence_ref.store(true, Ordering::SeqCst);
319
320 match maybe_key {
321 None => {
322 trx.set(key_ref, &[]);
323 Ok(())
324 }
325 Some(_) => {
326 Err(FdbBindingError::from(FdbError::new(2132)))
328 }
329 }
330 }
331 })
332 .await
333 .map_err(|e| e.get_fdb_error().unwrap())
335 }
336
337 pub async fn get_tenant(
339 db: &Database,
340 tenant_name: &[u8],
341 ) -> Result<Option<Result<TenantInfo, serde_json::Error>>, FdbError> {
342 let mut key: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + tenant_name.len());
343 key.extend_from_slice(TENANT_MAP_PREFIX);
344 key.extend_from_slice(tenant_name);
345
346 let key_ref = &key;
347 match db
348 .run(|trx, _maybe_committed| async move {
349 trx.set_option(TransactionOption::ReadSystemKeys)?;
350 trx.set_option(TransactionOption::ReadLockAware)?;
351
352 Ok(trx.get(key_ref, false).await?)
353 })
354 .await
355 {
356 Ok(None) => Ok(None),
357 Ok(Some(kv)) => Ok(Some(TenantInfo::try_from((key.as_slice(), kv.as_ref())))),
358 Err(err) => Err(err.get_fdb_error().unwrap()),
360 }
361 }
362
363 pub async fn delete_tenant(db: &Database, tenant_name: &[u8]) -> Result<(), FdbError> {
368 let checked_existence = AtomicBool::new(false);
369 let checked_existence_ref = &checked_existence;
370
371 let mut key: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + tenant_name.len());
372 key.extend_from_slice(TENANT_MAP_PREFIX);
373 key.extend_from_slice(tenant_name);
374
375 let key_ref = &key;
376
377 db.run(|trx, _maybe_committed| async move {
378 trx.set_option(TransactionOption::SpecialKeySpaceEnableWrites)?;
379
380 if checked_existence_ref.load(Ordering::SeqCst) {
381 trx.clear(key_ref);
382 Ok(())
383 } else {
384 let maybe_key = trx.get(key_ref, false).await?;
385
386 checked_existence_ref.store(true, Ordering::SeqCst);
387
388 match maybe_key {
389 None => {
390 Err(FdbBindingError::from(FdbError::new(2131)))
392 }
393 Some(_) => {
394 trx.clear(key_ref);
395 Ok(())
396 }
397 }
398 }
399 })
400 .await
401 .map_err(|e| e.get_fdb_error().unwrap())
403 }
404
405 pub async fn list_tenant(
408 db: &Database,
409 begin: &[u8],
410 end: &[u8],
411 limit: Option<usize>,
412 ) -> Result<Vec<Result<TenantInfo, serde_json::Error>>, FdbError> {
413 let trx = db.create_trx()?;
414 trx.set_option(TransactionOption::ReadSystemKeys)?;
415 trx.set_option(TransactionOption::ReadLockAware)?;
416
417 let mut begin_range: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + begin.len());
418 begin_range.extend_from_slice(TENANT_MAP_PREFIX);
419 begin_range.extend_from_slice(begin);
420
421 let end_range = if end.is_empty() {
422 TENANT_MAP_PREFIX_END.to_vec()
423 } else {
424 let mut end_range: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + end.len());
425 end_range.extend_from_slice(TENANT_MAP_PREFIX);
426 end_range.extend_from_slice(end);
427
428 end_range
429 };
430
431 let range_option = RangeOption {
432 begin: KeySelector::first_greater_than(begin_range),
433 end: KeySelector::first_greater_than(end_range),
434 limit,
435 ..Default::default()
436 };
437
438 trx.get_ranges_keyvalues(range_option, false)
439 .map_ok(|fdb_value| TenantInfo::try_from((fdb_value.key(), fdb_value.value())))
440 .try_collect::<Vec<Result<TenantInfo, serde_json::Error>>>()
441 .await
442 }
443}