1use crate::options::TransactionOption;
11use std::future::Future;
12
13use crate::database::TransactError;
14use crate::{
15 error, Database, DatabaseTransact, FdbBindingError, FdbError, FdbResult, KeySelector,
16 RangeOption, RetryableTransaction, TransactOption, Transaction,
17};
18use foundationdb_macros::cfg_api_versions;
19use foundationdb_sys as fdb_sys;
20use fdb_sys::if_cfg_api_versions;
21use futures::TryStreamExt;
22use serde::{Deserialize, Serialize};
23use serde_json::Error;
24use std::ptr::NonNull;
25use std::sync::atomic::{AtomicBool, Ordering};
26use std::time::Instant;
27
28#[cfg(feature = "fdb-7_1")]
29const TENANT_MAP_PREFIX: &[u8] = b"\xFF\xFF/management/tenant_map/";
30#[cfg(feature = "fdb-7_1")]
31const TENANT_MAP_PREFIX_END: &[u8] = b"\xFF\xFF/management/tenant_map0";
32
33#[cfg_api_versions(min = 730)]
34const TENANT_MAP_PREFIX: &[u8] = b"\xFF\xFF/management/tenant/map/";
35#[cfg_api_versions(min = 730)]
36const TENANT_MAP_PREFIX_END: &[u8] = b"\xFF\xFF/management/tenant/map0";
37
38pub struct FdbTenant {
40 pub(crate) inner: NonNull<fdb_sys::FDBTenant>,
41 pub(crate) name: Vec<u8>,
42}
43
44unsafe impl Send for FdbTenant {}
45unsafe impl Sync for FdbTenant {}
46
47impl Drop for FdbTenant {
48 fn drop(&mut self) {
49 unsafe {
50 fdb_sys::fdb_tenant_destroy(self.inner.as_ptr());
51 }
52 }
53}
54
55impl FdbTenant {
56 pub fn get_name(&self) -> &[u8] {
58 &self.name
59 }
60
61 pub fn create_trx(&self) -> FdbResult<Transaction> {
63 let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
64 let err = unsafe { fdb_sys::fdb_tenant_create_transaction(self.inner.as_ptr(), &mut trx) };
65 error::eval(err)?;
66 Ok(Transaction::new(NonNull::new(trx).expect(
67 "fdb_tenant_create_transaction to not return null if there is no error",
68 )))
69 }
70
71 fn create_retryable_trx(&self) -> FdbResult<RetryableTransaction> {
72 Ok(RetryableTransaction::new(self.create_trx()?))
73 }
74
75 pub async fn run<F, Fut, T>(&self, closure: F) -> Result<T, FdbBindingError>
97 where
98 F: Fn(RetryableTransaction, bool) -> Fut,
99 Fut: Future<Output = Result<T, FdbBindingError>>,
100 {
101 let mut maybe_committed_transaction = false;
102 let mut transaction = self.create_retryable_trx()?;
105
106 loop {
107 let result_closure = closure(transaction.clone(), maybe_committed_transaction).await;
109
110 if let Err(e) = result_closure {
111 if let Some(e) = e.get_fdb_error() {
113 maybe_committed_transaction = e.is_maybe_committed();
114 match transaction.on_error(e).await {
116 Ok(Ok(t)) => {
118 transaction = t;
119 continue;
120 }
121 Ok(Err(non_retryable_error)) => {
122 return Err(FdbBindingError::from(non_retryable_error))
123 }
124 Err(non_retryable_error) => return Err(non_retryable_error),
126 }
127 }
128 return Err(e);
130 }
131
132 let commit_result = transaction.commit().await;
133
134 match commit_result {
135 Err(err) => return Err(err),
137 Ok(Ok(_)) => return result_closure,
138 Ok(Err(transaction_commit_error)) => {
139 maybe_committed_transaction = transaction_commit_error.is_maybe_committed();
140 match transaction_commit_error.on_error().await {
142 Ok(t) => {
143 transaction = RetryableTransaction::new(t);
144 continue;
145 }
146 Err(non_retryable_error) => {
147 return Err(FdbBindingError::from(non_retryable_error))
148 }
149 }
150 }
151 }
152 }
153 }
154
155 pub async fn transact<F>(&self, mut f: F, options: TransactOption) -> Result<F::Item, F::Error>
170 where
171 F: DatabaseTransact,
172 {
173 let is_idempotent = options.is_idempotent;
174 let time_out = options.time_out.map(|d| Instant::now() + d);
175 let retry_limit = options.retry_limit;
176 let mut tries: u32 = 0;
177 let mut trx = self.create_trx()?;
178 let mut can_retry = move || {
179 tries += 1;
180 retry_limit.map(|limit| tries < limit).unwrap_or(true)
181 && time_out.map(|t| Instant::now() < t).unwrap_or(true)
182 };
183 loop {
184 let r = f.transact(trx).await;
185 f = r.0;
186 trx = r.1;
187 trx = match r.2 {
188 Ok(item) => match trx.commit().await {
189 Ok(_) => break Ok(item),
190 Err(e) => {
191 if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
192 e.on_error().await?
193 } else {
194 break Err(F::Error::from(e.into()));
195 }
196 }
197 },
198 Err(user_err) => match user_err.try_into_fdb_error() {
199 Ok(e) => {
200 if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
201 trx.on_error(e).await?
202 } else {
203 break Err(F::Error::from(e));
204 }
205 }
206 Err(user_err) => break Err(user_err),
207 },
208 };
209 }
210 }
211}
212
213#[cfg(feature = "fdb-7_1")]
214#[derive(Serialize, Deserialize, Debug)]
216pub struct TenantInfo {
217 pub id: i64,
218 pub prefix: Vec<u8>,
219 pub name: Vec<u8>,
220}
221
222#[cfg_api_versions(min = 730)]
223#[derive(Serialize, Deserialize, Debug)]
225pub struct TenantInfo {
226 pub id: i64,
227 pub prefix: FDBTenantPrintableInfo,
228 pub name: FDBTenantPrintableInfo,
229}
230
231impl TryFrom<(&[u8], &[u8])> for TenantInfo {
232 type Error = Error;
233
234 fn try_from(k_v: (&[u8], &[u8])) -> Result<Self, Self::Error> {
235 let value = k_v.1;
236 match serde_json::from_slice::<FDBTenantInfo>(value) {
237 Ok(tenant_info) => if_cfg_api_versions!(min = 730 => {
238 Ok(TenantInfo {
239 name: tenant_info.name,
240 id: tenant_info.id,
241 prefix: tenant_info.prefix,
242 })
243 } else {
244 Ok(TenantInfo {
245 name: k_v.0.split_at(TENANT_MAP_PREFIX.len()).1.to_vec(),
246 id: tenant_info.id,
247 prefix: tenant_info.prefix,
248 })
249 }),
250
251 Err(err) => Err(err),
252 }
253 }
254}
255
256#[cfg(feature = "fdb-7_1")]
258#[derive(Serialize, Deserialize, Debug)]
259struct FDBTenantInfo {
260 id: i64,
261 #[serde(with = "serde_bytes")]
262 prefix: Vec<u8>,
263}
264
265#[cfg_api_versions(min = 730)]
266#[derive(Serialize, Deserialize, Debug)]
267struct FDBTenantInfo {
268 id: i64,
269 lock_state: TenantLockState,
270 name: FDBTenantPrintableInfo,
271 prefix: FDBTenantPrintableInfo,
272}
273
274#[cfg_api_versions(min = 730)]
275#[derive(Serialize, Deserialize, Debug)]
276#[serde(rename_all = "camelCase")]
277enum TenantLockState {
278 Unlocked,
279 Locked,
280 ReadOnly,
281}
282
283#[cfg_api_versions(min = 730)]
284#[derive(Serialize, Deserialize, Debug)]
286pub struct FDBTenantPrintableInfo {
287 base64: String,
288 printable: String,
289}
290
291#[derive(Debug)]
294pub struct TenantManagement;
295
296impl TenantManagement {
297 pub async fn create_tenant(db: &Database, tenant_name: &[u8]) -> Result<(), FdbError> {
302 let checked_existence = AtomicBool::new(false);
303 let checked_existence_ref = &checked_existence;
304
305 let mut key: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + tenant_name.len());
306 key.extend_from_slice(TENANT_MAP_PREFIX);
307 key.extend_from_slice(tenant_name);
308
309 let key_ref = &key;
310
311 db.run(|trx, _maybe_committed| async move {
312 trx.set_option(TransactionOption::SpecialKeySpaceEnableWrites)?;
313
314 if checked_existence_ref.load(Ordering::SeqCst) {
315 trx.set(key_ref, &[]);
316 Ok(())
317 } else {
318 let maybe_key = trx.get(key_ref, false).await?;
319
320 checked_existence_ref.store(true, Ordering::SeqCst);
321
322 match maybe_key {
323 None => {
324 trx.set(key_ref, &[]);
325 Ok(())
326 }
327 Some(_) => {
328 Err(FdbBindingError::from(FdbError::new(2132)))
330 }
331 }
332 }
333 })
334 .await
335 .map_err(|e| e.get_fdb_error().unwrap())
337 }
338
339 pub async fn get_tenant(
341 db: &Database,
342 tenant_name: &[u8],
343 ) -> Result<Option<Result<TenantInfo, serde_json::Error>>, FdbError> {
344 let mut key: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + tenant_name.len());
345 key.extend_from_slice(TENANT_MAP_PREFIX);
346 key.extend_from_slice(tenant_name);
347
348 let key_ref = &key;
349 match db
350 .run(|trx, _maybe_committed| async move {
351 trx.set_option(TransactionOption::ReadSystemKeys)?;
352 trx.set_option(TransactionOption::ReadLockAware)?;
353
354 Ok(trx.get(key_ref, false).await?)
355 })
356 .await
357 {
358 Ok(None) => Ok(None),
359 Ok(Some(kv)) => Ok(Some(TenantInfo::try_from((key.as_slice(), kv.as_ref())))),
360 Err(err) => Err(err.get_fdb_error().unwrap()),
362 }
363 }
364
365 pub async fn delete_tenant(db: &Database, tenant_name: &[u8]) -> Result<(), FdbError> {
370 let checked_existence = AtomicBool::new(false);
371 let checked_existence_ref = &checked_existence;
372
373 let mut key: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + tenant_name.len());
374 key.extend_from_slice(TENANT_MAP_PREFIX);
375 key.extend_from_slice(tenant_name);
376
377 let key_ref = &key;
378
379 db.run(|trx, _maybe_committed| async move {
380 trx.set_option(TransactionOption::SpecialKeySpaceEnableWrites)?;
381
382 if checked_existence_ref.load(Ordering::SeqCst) {
383 trx.clear(key_ref);
384 Ok(())
385 } else {
386 let maybe_key = trx.get(key_ref, false).await?;
387
388 checked_existence_ref.store(true, Ordering::SeqCst);
389
390 match maybe_key {
391 None => {
392 Err(FdbBindingError::from(FdbError::new(2131)))
394 }
395 Some(_) => {
396 trx.clear(key_ref);
397 Ok(())
398 }
399 }
400 }
401 })
402 .await
403 .map_err(|e| e.get_fdb_error().unwrap())
405 }
406
407 pub async fn list_tenant(
410 db: &Database,
411 begin: &[u8],
412 end: &[u8],
413 limit: Option<usize>,
414 ) -> Result<Vec<Result<TenantInfo, serde_json::Error>>, FdbError> {
415 let trx = db.create_trx()?;
416 trx.set_option(TransactionOption::ReadSystemKeys)?;
417 trx.set_option(TransactionOption::ReadLockAware)?;
418
419 let mut begin_range: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + begin.len());
420 begin_range.extend_from_slice(TENANT_MAP_PREFIX);
421 begin_range.extend_from_slice(begin);
422
423 let end_range = if end.is_empty() {
424 TENANT_MAP_PREFIX_END.to_vec()
425 } else {
426 let mut end_range: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + end.len());
427 end_range.extend_from_slice(TENANT_MAP_PREFIX);
428 end_range.extend_from_slice(end);
429
430 end_range
431 };
432
433 let range_option = RangeOption {
434 begin: KeySelector::first_greater_than(begin_range),
435 end: KeySelector::first_greater_than(end_range),
436 limit,
437 ..Default::default()
438 };
439
440 trx.get_ranges_keyvalues(range_option, false)
441 .map_ok(|fdb_value| TenantInfo::try_from((fdb_value.key(), fdb_value.value())))
442 .try_collect::<Vec<Result<TenantInfo, serde_json::Error>>>()
443 .await
444 }
445}