1use crate::options::TransactionOption;
11use std::future::Future;
12
13use crate::database::TransactError;
14use crate::database::{run_with_hooks, NoopHooks};
15use crate::{
16 error, Database, DatabaseTransact, FdbBindingError, FdbError, FdbResult, KeySelector,
17 RangeOption, RetryableTransaction, TransactOption, Transaction,
18};
19use foundationdb_macros::cfg_api_versions;
20use foundationdb_sys as fdb_sys;
21use fdb_sys::if_cfg_api_versions;
22use futures::TryStreamExt;
23use serde::{Deserialize, Serialize};
24use serde_json::Error;
25use std::ptr::NonNull;
26use std::sync::atomic::{AtomicBool, Ordering};
27use std::time::Instant;
28
29#[cfg(feature = "fdb-7_1")]
30const TENANT_MAP_PREFIX: &[u8] = b"\xFF\xFF/management/tenant_map/";
31#[cfg(feature = "fdb-7_1")]
32const TENANT_MAP_PREFIX_END: &[u8] = b"\xFF\xFF/management/tenant_map0";
33
34#[cfg_api_versions(min = 730)]
35const TENANT_MAP_PREFIX: &[u8] = b"\xFF\xFF/management/tenant/map/";
36#[cfg_api_versions(min = 730)]
37const TENANT_MAP_PREFIX_END: &[u8] = b"\xFF\xFF/management/tenant/map0";
38
39pub struct FdbTenant {
41 pub(crate) inner: NonNull<fdb_sys::FDBTenant>,
42 pub(crate) name: Vec<u8>,
43}
44
45unsafe impl Send for FdbTenant {}
46unsafe impl Sync for FdbTenant {}
47
48impl Drop for FdbTenant {
49 fn drop(&mut self) {
50 unsafe {
51 fdb_sys::fdb_tenant_destroy(self.inner.as_ptr());
52 }
53 }
54}
55
56impl FdbTenant {
57 pub fn get_name(&self) -> &[u8] {
59 &self.name
60 }
61
62 pub fn create_trx(&self) -> FdbResult<Transaction> {
64 let mut trx: *mut fdb_sys::FDBTransaction = std::ptr::null_mut();
65 let err = unsafe { fdb_sys::fdb_tenant_create_transaction(self.inner.as_ptr(), &mut trx) };
66 error::eval(err)?;
67 Ok(Transaction::new(NonNull::new(trx).expect(
68 "fdb_tenant_create_transaction to not return null if there is no error",
69 )))
70 }
71
72 fn create_retryable_trx(&self) -> FdbResult<RetryableTransaction> {
73 Ok(RetryableTransaction::new(self.create_trx()?))
74 }
75
76 pub async fn run<F, Fut, T>(&self, closure: F) -> Result<T, FdbBindingError>
98 where
99 F: Fn(RetryableTransaction, bool) -> Fut,
100 Fut: Future<Output = Result<T, FdbBindingError>>,
101 {
102 let transaction = self.create_retryable_trx()?;
103 run_with_hooks(transaction, &NoopHooks, |trx, mc| {
104 closure(trx, mc.into())
105 })
106 .await
107 }
108
109 pub async fn transact<F>(&self, mut f: F, options: TransactOption) -> Result<F::Item, F::Error>
124 where
125 F: DatabaseTransact,
126 {
127 let is_idempotent = options.is_idempotent;
128 let time_out = options.time_out.map(|d| Instant::now() + d);
129 let retry_limit = options.retry_limit;
130 let mut tries: u32 = 0;
131 let mut trx = self.create_trx()?;
132 let mut can_retry = move || {
133 tries += 1;
134 retry_limit.map(|limit| tries < limit).unwrap_or(true)
135 && time_out.map(|t| Instant::now() < t).unwrap_or(true)
136 };
137 loop {
138 let r = f.transact(trx).await;
139 f = r.0;
140 trx = r.1;
141 trx = match r.2 {
142 Ok(item) => match trx.commit().await {
143 Ok(_) => break Ok(item),
144 Err(e) => {
145 if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
146 e.on_error().await?
147 } else {
148 break Err(F::Error::from(e.into()));
149 }
150 }
151 },
152 Err(user_err) => match user_err.try_into_fdb_error() {
153 Ok(e) => {
154 if (is_idempotent || !e.is_maybe_committed()) && can_retry() {
155 trx.on_error(e).await?
156 } else {
157 break Err(F::Error::from(e));
158 }
159 }
160 Err(user_err) => break Err(user_err),
161 },
162 };
163 }
164 }
165}
166
167#[cfg(feature = "fdb-7_1")]
168#[derive(Serialize, Deserialize, Debug)]
170pub struct TenantInfo {
171 pub id: i64,
172 pub prefix: Vec<u8>,
173 pub name: Vec<u8>,
174}
175
176#[cfg_api_versions(min = 730)]
177#[derive(Serialize, Deserialize, Debug)]
179pub struct TenantInfo {
180 pub id: i64,
181 pub prefix: FDBTenantPrintableInfo,
182 pub name: FDBTenantPrintableInfo,
183}
184
185impl TryFrom<(&[u8], &[u8])> for TenantInfo {
186 type Error = Error;
187
188 fn try_from(k_v: (&[u8], &[u8])) -> Result<Self, Self::Error> {
189 let value = k_v.1;
190 match serde_json::from_slice::<FDBTenantInfo>(value) {
191 Ok(tenant_info) => if_cfg_api_versions!(min = 730 => {
192 Ok(TenantInfo {
193 name: tenant_info.name,
194 id: tenant_info.id,
195 prefix: tenant_info.prefix,
196 })
197 } else {
198 Ok(TenantInfo {
199 name: k_v.0.split_at(TENANT_MAP_PREFIX.len()).1.to_vec(),
200 id: tenant_info.id,
201 prefix: tenant_info.prefix,
202 })
203 }),
204
205 Err(err) => Err(err),
206 }
207 }
208}
209
210#[cfg(feature = "fdb-7_1")]
212#[derive(Serialize, Deserialize, Debug)]
213struct FDBTenantInfo {
214 id: i64,
215 #[serde(with = "serde_bytes")]
216 prefix: Vec<u8>,
217}
218
219#[cfg_api_versions(min = 730)]
220#[derive(Serialize, Deserialize, Debug)]
221struct FDBTenantInfo {
222 id: i64,
223 lock_state: TenantLockState,
224 name: FDBTenantPrintableInfo,
225 prefix: FDBTenantPrintableInfo,
226}
227
228#[cfg_api_versions(min = 730)]
229#[derive(Serialize, Deserialize, Debug)]
230#[serde(rename_all = "camelCase")]
231enum TenantLockState {
232 Unlocked,
233 Locked,
234 ReadOnly,
235}
236
237#[cfg_api_versions(min = 730)]
238#[derive(Serialize, Deserialize, Debug)]
240pub struct FDBTenantPrintableInfo {
241 base64: String,
242 printable: String,
243}
244
245#[derive(Debug)]
248pub struct TenantManagement;
249
250impl TenantManagement {
251 pub async fn create_tenant(db: &Database, tenant_name: &[u8]) -> Result<(), FdbError> {
256 let checked_existence = AtomicBool::new(false);
257 let checked_existence_ref = &checked_existence;
258
259 let mut key: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + tenant_name.len());
260 key.extend_from_slice(TENANT_MAP_PREFIX);
261 key.extend_from_slice(tenant_name);
262
263 let key_ref = &key;
264
265 db.run(|trx, _maybe_committed| async move {
266 trx.set_option(TransactionOption::SpecialKeySpaceEnableWrites)?;
267
268 if checked_existence_ref.load(Ordering::SeqCst) {
269 trx.set(key_ref, &[]);
270 Ok(())
271 } else {
272 let maybe_key = trx.get(key_ref, false).await?;
273
274 checked_existence_ref.store(true, Ordering::SeqCst);
275
276 match maybe_key {
277 None => {
278 trx.set(key_ref, &[]);
279 Ok(())
280 }
281 Some(_) => {
282 Err(FdbBindingError::from(FdbError::new(2132)))
284 }
285 }
286 }
287 })
288 .await
289 .map_err(|e| e.get_fdb_error().unwrap())
291 }
292
293 pub async fn get_tenant(
295 db: &Database,
296 tenant_name: &[u8],
297 ) -> Result<Option<Result<TenantInfo, serde_json::Error>>, FdbError> {
298 let mut key: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + tenant_name.len());
299 key.extend_from_slice(TENANT_MAP_PREFIX);
300 key.extend_from_slice(tenant_name);
301
302 let key_ref = &key;
303 match db
304 .run(|trx, _maybe_committed| async move {
305 trx.set_option(TransactionOption::ReadSystemKeys)?;
306 trx.set_option(TransactionOption::ReadLockAware)?;
307
308 Ok(trx.get(key_ref, false).await?)
309 })
310 .await
311 {
312 Ok(None) => Ok(None),
313 Ok(Some(kv)) => Ok(Some(TenantInfo::try_from((key.as_slice(), kv.as_ref())))),
314 Err(err) => Err(err.get_fdb_error().unwrap()),
316 }
317 }
318
319 pub async fn delete_tenant(db: &Database, tenant_name: &[u8]) -> Result<(), FdbError> {
324 let checked_existence = AtomicBool::new(false);
325 let checked_existence_ref = &checked_existence;
326
327 let mut key: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + tenant_name.len());
328 key.extend_from_slice(TENANT_MAP_PREFIX);
329 key.extend_from_slice(tenant_name);
330
331 let key_ref = &key;
332
333 db.run(|trx, _maybe_committed| async move {
334 trx.set_option(TransactionOption::SpecialKeySpaceEnableWrites)?;
335
336 if checked_existence_ref.load(Ordering::SeqCst) {
337 trx.clear(key_ref);
338 Ok(())
339 } else {
340 let maybe_key = trx.get(key_ref, false).await?;
341
342 checked_existence_ref.store(true, Ordering::SeqCst);
343
344 match maybe_key {
345 None => {
346 Err(FdbBindingError::from(FdbError::new(2131)))
348 }
349 Some(_) => {
350 trx.clear(key_ref);
351 Ok(())
352 }
353 }
354 }
355 })
356 .await
357 .map_err(|e| e.get_fdb_error().unwrap())
359 }
360
361 pub async fn list_tenant(
364 db: &Database,
365 begin: &[u8],
366 end: &[u8],
367 limit: Option<usize>,
368 ) -> Result<Vec<Result<TenantInfo, serde_json::Error>>, FdbError> {
369 let trx = db.create_trx()?;
370 trx.set_option(TransactionOption::ReadSystemKeys)?;
371 trx.set_option(TransactionOption::ReadLockAware)?;
372
373 let mut begin_range: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + begin.len());
374 begin_range.extend_from_slice(TENANT_MAP_PREFIX);
375 begin_range.extend_from_slice(begin);
376
377 let end_range = if end.is_empty() {
378 TENANT_MAP_PREFIX_END.to_vec()
379 } else {
380 let mut end_range: Vec<u8> = Vec::with_capacity(TENANT_MAP_PREFIX.len() + end.len());
381 end_range.extend_from_slice(TENANT_MAP_PREFIX);
382 end_range.extend_from_slice(end);
383
384 end_range
385 };
386
387 let range_option = RangeOption {
388 begin: KeySelector::first_greater_than(begin_range),
389 end: KeySelector::first_greater_than(end_range),
390 limit,
391 ..Default::default()
392 };
393
394 trx.get_ranges_keyvalues(range_option, false)
395 .map_ok(|fdb_value| TenantInfo::try_from((fdb_value.key(), fdb_value.value())))
396 .try_collect::<Vec<Result<TenantInfo, serde_json::Error>>>()
397 .await
398 }
399}