1use crate::RangeOption;
12use crate::directory::directory_partition::DirectoryPartition;
13use crate::directory::directory_subspace::DirectorySubspace;
14use crate::directory::error::DirectoryError;
15use crate::directory::node::Node;
16use crate::directory::{Directory, DirectoryOutput, compare_slice, strinc};
17use crate::future::FdbSlice;
18use crate::tuple::hca::HighContentionAllocator;
19use crate::tuple::{Element, Subspace, TuplePack};
20use crate::{FdbResult, Transaction};
21use async_recursion::async_recursion;
22use async_trait::async_trait;
23use futures::TryStreamExt;
24use std::cmp::Ordering;
25use std::ops::Deref;
26use std::sync::Arc;
27
28pub(crate) const DEFAULT_SUB_DIRS: i64 = 0;
29const MAJOR_VERSION: u32 = 1;
30const MINOR_VERSION: u32 = 0;
31const PATCH_VERSION: u32 = 0;
32pub(crate) const DEFAULT_NODE_PREFIX: &[u8] = b"\xFE";
33const DEFAULT_HCA_PREFIX: &[u8] = b"hca";
34pub(crate) const PARTITION_LAYER: &[u8] = b"partition";
35pub(crate) const LAYER_SUFFIX: &[u8] = b"layer";
36
37#[derive(Clone)]
42pub struct DirectoryLayer {
43 pub(crate) inner: Arc<DirectoryLayerInner>,
44}
45
46impl std::fmt::Debug for DirectoryLayer {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 self.inner.fmt(f)
49 }
50}
51
52#[derive(Debug)]
53pub struct DirectoryLayerInner {
54 pub(crate) root_node: Subspace,
55 pub(crate) node_subspace: Subspace,
56 pub(crate) content_subspace: Subspace,
57 pub(crate) allocator: HighContentionAllocator,
58 pub(crate) allow_manual_prefixes: bool,
59
60 pub(crate) path: Vec<String>,
61}
62
63impl Deref for DirectoryLayer {
64 type Target = DirectoryLayerInner;
65
66 fn deref(&self) -> &Self::Target {
67 &self.inner
68 }
69}
70
71impl Default for DirectoryLayer {
72 fn default() -> Self {
78 Self::new(
79 Subspace::from_bytes(DEFAULT_NODE_PREFIX),
80 Subspace::all(),
81 false,
82 )
83 }
84}
85
86impl DirectoryLayer {
87 pub fn new(
88 node_subspace: Subspace,
89 content_subspace: Subspace,
90 allow_manual_prefixes: bool,
91 ) -> Self {
92 let root_node = node_subspace.subspace(&node_subspace.bytes());
93 let allocator = HighContentionAllocator::new(root_node.subspace(&DEFAULT_HCA_PREFIX));
94
95 DirectoryLayer {
96 inner: Arc::new(DirectoryLayerInner {
97 root_node,
98 node_subspace,
99 content_subspace,
100 allocator,
101 allow_manual_prefixes,
102 path: vec![],
103 }),
104 }
105 }
106
107 pub(crate) fn new_with_path(
108 node_subspace: Subspace,
109 content_subspace: Subspace,
110 allow_manual_prefixes: bool,
111 path: &[String],
112 ) -> Self {
113 let root_node = node_subspace.subspace(&node_subspace.bytes());
114 let allocator = HighContentionAllocator::new(root_node.subspace(&DEFAULT_HCA_PREFIX));
115
116 DirectoryLayer {
117 inner: Arc::new(DirectoryLayerInner {
118 root_node,
119 node_subspace,
120 content_subspace,
121 allocator,
122 allow_manual_prefixes,
123 path: Vec::from(path),
124 }),
125 }
126 }
127
128 pub fn get_path(&self) -> &[String] {
129 self.path.as_slice()
130 }
131
132 fn node_with_optional_prefix(&self, prefix: Option<FdbSlice>) -> Option<Subspace> {
133 prefix.map(|fdb_slice| self.node_with_prefix(&fdb_slice.deref()))
134 }
135
136 fn node_with_prefix<T: TuplePack>(&self, prefix: &T) -> Subspace {
137 self.inner.node_subspace.subspace(prefix)
138 }
139
140 async fn find(
141 &self,
142 trx: &Transaction,
143 path: &[String],
144 ) -> Result<Option<Node>, DirectoryError> {
145 let mut current_path = vec![];
146 let mut node_subspace = self.root_node.clone();
147 let mut layer = vec![];
148 let mut loaded = false;
149
150 for path_name in path.iter() {
152 current_path.push(path_name.clone());
153 let key = node_subspace.subspace(&(DEFAULT_SUB_DIRS, path_name.to_owned()));
154
155 let fdb_slice_value = trx.get(key.bytes(), false).await?;
157
158 loaded = true;
159 node_subspace = match self.node_with_optional_prefix(fdb_slice_value) {
160 None => return Ok(None),
161 Some(subspace) => subspace,
162 };
163
164 layer = Node::load_metadata(trx, &node_subspace).await?;
165 if layer.as_slice().eq(PARTITION_LAYER) {
166 break;
167 }
168 }
169
170 if !loaded {
171 layer = Node::load_metadata(trx, &node_subspace).await?;
172 }
173
174 Ok(Some(Node {
175 subspace: node_subspace,
176 current_path,
177 target_path: Vec::from(path),
178 directory_layer: self.clone(),
179 layer,
180 }))
181 }
182
183 fn to_absolute_path(&self, sub_path: &[String]) -> Vec<String> {
184 let mut path: Vec<String> = Vec::with_capacity(self.path.len() + sub_path.len());
185
186 path.extend_from_slice(&self.path);
187 path.extend_from_slice(sub_path);
188
189 path
190 }
191
192 pub(crate) fn contents_of_node(
193 &self,
194 subspace: &Subspace,
195 path: &[String],
196 layer: &[u8],
197 ) -> Result<DirectoryOutput, DirectoryError> {
198 let prefix: Vec<u8> = self.node_subspace.unpack(subspace.bytes())?;
199
200 if layer.eq(PARTITION_LAYER) {
201 Ok(DirectoryOutput::DirectoryPartition(
202 DirectoryPartition::new(&self.to_absolute_path(path), prefix, self.clone()),
203 ))
204 } else {
205 Ok(DirectoryOutput::DirectorySubspace(DirectorySubspace::new(
206 &self.to_absolute_path(path),
207 prefix,
208 self,
209 layer.to_owned(),
210 )))
211 }
212 }
213
214 #[async_recursion]
216 async fn create_or_open_internal(
217 &self,
218 trx: &Transaction,
219 path: &[String],
220 prefix: Option<&'async_recursion [u8]>,
221 layer: Option<&'async_recursion [u8]>,
222 allow_create: bool,
223 allow_open: bool,
224 ) -> Result<DirectoryOutput, DirectoryError> {
225 self.check_version(trx, false).await?;
226
227 if prefix.is_some() && !self.allow_manual_prefixes {
228 if self.path.is_empty() {
229 return Err(DirectoryError::PrefixNotAllowed);
230 }
231
232 return Err(DirectoryError::CannotPrefixInPartition);
233 }
234
235 if path.is_empty() {
236 return Err(DirectoryError::NoPathProvided);
237 }
238
239 if let Some(node) = self.find(trx, path).await? {
240 if node.is_in_partition(false) {
241 let sub_path = node.get_partition_subpath();
242 match node.get_contents()? {
243 DirectoryOutput::DirectorySubspace(_) => unreachable!("already in partition"),
244 DirectoryOutput::DirectoryPartition(directory_partition) => {
245 let dir_space = directory_partition
246 .directory_subspace
247 .directory_layer
248 .create_or_open_internal(
249 trx,
250 &sub_path,
251 prefix,
252 layer,
253 allow_create,
254 allow_open,
255 )
256 .await?;
257 Ok(dir_space)
258 }
259 }
260 } else {
261 self.open_internal(layer, &node, allow_open).await
262 }
263 } else {
264 self.create_internal(trx, path, layer, prefix, allow_create)
265 .await
266 }
267 }
268
269 async fn open_internal(
270 &self,
271 layer: Option<&[u8]>,
272 node: &Node,
273 allow_open: bool,
274 ) -> Result<DirectoryOutput, DirectoryError> {
275 if !allow_open {
276 return Err(DirectoryError::DirAlreadyExists);
277 }
278
279 match layer {
280 None => {}
281 Some(layer) if !layer.is_empty() => match compare_slice(layer, &node.layer) {
282 Ordering::Equal => {}
283 _ => {
284 return Err(DirectoryError::IncompatibleLayer);
285 }
286 },
287 Some(_) => {}
288 }
289
290 node.get_contents()
291 }
292
293 async fn create_internal(
294 &self,
295 trx: &Transaction,
296 path: &[String],
297 layer: Option<&[u8]>,
298 prefix: Option<&[u8]>,
299 allow_create: bool,
300 ) -> Result<DirectoryOutput, DirectoryError> {
301 let path_last = path.last().ok_or(DirectoryError::NoPathProvided)?;
302
303 if !allow_create {
304 return Err(DirectoryError::DirectoryDoesNotExists);
305 }
306
307 let layer = layer.unwrap_or_default();
308
309 self.check_version(trx, true).await?;
310 let new_prefix = self.get_prefix(trx, prefix).await?;
311
312 let is_prefix_free = self
313 .is_prefix_free(trx, new_prefix.as_slice(), prefix.is_none())
314 .await?;
315
316 if !is_prefix_free {
317 return Err(DirectoryError::DirectoryPrefixInUse);
318 }
319
320 let parent_node = self.get_parent_node(trx, path).await?;
321 let node = self.node_with_prefix(&new_prefix);
322
323 let key = parent_node.subspace(&(DEFAULT_SUB_DIRS, path_last));
324 let key_layer = node.pack(&LAYER_SUFFIX);
325
326 trx.set(key.bytes(), &new_prefix);
327 trx.set(&key_layer, layer);
328
329 self.contents_of_node(&node, path, layer)
330 }
331
332 async fn get_parent_node(
333 &self,
334 trx: &Transaction,
335 path: &[String],
336 ) -> Result<Subspace, DirectoryError> {
337 match path.split_last() {
338 None => Ok(self.root_node.clone()),
339 Some((_, remains)) => {
340 if remains.is_empty() {
341 return Ok(self.root_node.clone());
342 }
343 let parent = self
344 .create_or_open_internal(trx, remains, None, None, true, true)
345 .await?;
346
347 Ok(self.node_with_prefix(&parent.bytes()?))
348 }
349 }
350 }
351
352 async fn is_prefix_free(
353 &self,
354 trx: &Transaction,
355 prefix: &[u8],
356 snapshot: bool,
357 ) -> Result<bool, DirectoryError> {
358 if prefix.is_empty() {
359 return Ok(false);
360 }
361
362 let node = self.node_containing_key(trx, prefix, snapshot).await?;
363
364 if node.is_some() {
365 return Ok(false);
366 }
367
368 let mut range_option = RangeOption::from((
369 self.node_subspace.pack(&prefix),
370 self.node_subspace.pack(&strinc(prefix.to_vec())),
371 ));
372 range_option.limit = Some(1);
373
374 let result = trx
375 .get_ranges_keyvalues(range_option, snapshot)
376 .try_next()
377 .await?;
378
379 Ok(result.is_none())
380 }
381
382 async fn node_containing_key(
383 &self,
384 trx: &Transaction,
385 key: &[u8],
386 snapshot: bool,
387 ) -> Result<Option<Subspace>, DirectoryError> {
388 if key.starts_with(self.node_subspace.bytes()) {
389 return Ok(Some(self.root_node.clone()));
390 }
391
392 let mut key_after = key.to_vec();
393 key_after.push(0);
395
396 let range_end = self.node_subspace.pack(&key_after);
397
398 let mut range_option = RangeOption::from((self.node_subspace.range().0, range_end));
399 range_option.reverse = true;
400 range_option.limit = Some(1);
401
402 let fdb_value = trx
404 .get_ranges_keyvalues(range_option, snapshot)
405 .try_next()
406 .await?;
407
408 if let Some(fdb_key_value) = fdb_value {
409 let previous_prefix: Vec<Element> = self.node_subspace.unpack(fdb_key_value.key())?;
410
411 if let Some(Element::Bytes(previous_prefix)) = previous_prefix.first() {
412 if key.starts_with(previous_prefix) {
413 return Ok(Some(self.node_with_prefix(previous_prefix)));
414 };
415 };
416 }
417 Ok(None)
418 }
419
420 async fn get_prefix(
421 &self,
422 trx: &Transaction,
423 prefix: Option<&[u8]>,
424 ) -> Result<Vec<u8>, DirectoryError> {
425 match prefix {
426 None => {
427 let allocator = self.allocator.allocate(trx).await?;
429 let subspace = self.content_subspace.subspace(&allocator);
430
431 let mut range_option = RangeOption::from(subspace.range());
433 range_option.limit = Some(1);
434
435 let result = trx
436 .get_ranges_keyvalues(range_option, false)
437 .try_next()
438 .await?;
439
440 if result.is_some() {
441 return Err(DirectoryError::PrefixNotEmpty);
442 }
443
444 Ok(subspace.into_bytes())
445 }
446 Some(v) => Ok(v.to_vec()),
447 }
448 }
449
450 async fn check_version(
452 &self,
453 trx: &Transaction,
454 allow_creation: bool,
455 ) -> Result<(), DirectoryError> {
456 let version = self.get_version_value(trx).await?;
457 match version {
458 None => {
459 if allow_creation {
460 self.initialize_directory(trx).await
461 } else {
462 Ok(())
463 }
464 }
465 Some(versions) => {
466 if versions.len() < 12 {
467 return Err(DirectoryError::Version(
468 "incorrect version length".to_string(),
469 ));
470 }
471 let mut arr = [0u8; 4];
472 arr.copy_from_slice(&versions[0..4]);
473 let major: u32 = u32::from_le_bytes(arr);
474
475 arr.copy_from_slice(&versions[4..8]);
476 let minor: u32 = u32::from_le_bytes(arr);
477
478 arr.copy_from_slice(&versions[8..12]);
479 let patch: u32 = u32::from_le_bytes(arr);
480
481 if major > MAJOR_VERSION {
482 let msg = format!(
483 "cannot load directory with version {major}.{minor}.{patch} using directory layer {MAJOR_VERSION}.{MINOR_VERSION}.{PATCH_VERSION}"
484 );
485 return Err(DirectoryError::Version(msg));
486 }
487
488 if minor > MINOR_VERSION {
489 let msg = format!(
490 "directory with version {major}.{minor}.{patch} is read-only when opened using directory layer {MAJOR_VERSION}.{MINOR_VERSION}.{PATCH_VERSION}"
491 );
492 return Err(DirectoryError::Version(msg));
493 }
494
495 Ok(())
496 }
497 }
498 }
499
500 async fn initialize_directory(&self, trx: &Transaction) -> Result<(), DirectoryError> {
502 let mut value = vec![];
503 value.extend(&MAJOR_VERSION.to_le_bytes());
504 value.extend(&MINOR_VERSION.to_le_bytes());
505 value.extend(&PATCH_VERSION.to_le_bytes());
506 let version_subspace: &[u8] = b"version";
507 let directory_version_key = self.root_node.subspace(&version_subspace);
508 trx.set(directory_version_key.bytes(), &value);
509
510 Ok(())
511 }
512
513 async fn get_version_value(&self, trx: &Transaction) -> FdbResult<Option<FdbSlice>> {
514 let version_subspace: &[u8] = b"version";
515 let version_key = self.root_node.subspace(&version_subspace);
516
517 trx.get(version_key.bytes(), false).await
518 }
519
520 async fn exists_internal(
521 &self,
522 trx: &Transaction,
523 path: &[String],
524 ) -> Result<bool, DirectoryError> {
525 self.check_version(trx, false).await?;
526
527 match self.find(trx, path).await? {
528 None => Ok(false),
529 Some(node) if node.is_in_partition(false) => {
530 node.get_contents()?
531 .exists(trx, &node.get_partition_subpath())
532 .await
533 }
534 Some(_node) => Ok(true),
535 }
536 }
537
538 async fn list_internal(
539 &self,
540 trx: &Transaction,
541 path: &[String],
542 ) -> Result<Vec<String>, DirectoryError> {
543 self.check_version(trx, false).await?;
544
545 let node = self
546 .find(trx, path)
547 .await?
548 .ok_or(DirectoryError::PathDoesNotExists)?;
549 if node.is_in_partition(true) {
550 match node.get_contents()? {
551 DirectoryOutput::DirectorySubspace(_) => unreachable!("already in partition"),
552 DirectoryOutput::DirectoryPartition(directory_partition) => {
553 return directory_partition
554 .directory_subspace
555 .directory_layer
556 .list(trx, &node.get_partition_subpath())
557 .await;
558 }
559 };
560 }
561
562 node.list_sub_folders(trx).await
563 }
564
565 async fn move_to_internal(
566 &self,
567 trx: &Transaction,
568 old_path: &[String],
569 new_path: &[String],
570 ) -> Result<DirectoryOutput, DirectoryError> {
571 self.check_version(trx, true).await?;
572
573 if old_path.len() <= new_path.len()
574 && compare_slice(old_path, &new_path[..old_path.len()]) == Ordering::Equal
575 {
576 return Err(DirectoryError::CannotMoveBetweenSubdirectory);
577 }
578
579 let old_node = self.find(trx, old_path).await?;
580 let new_node = self.find(trx, new_path).await?;
581
582 let old_node = match old_node {
583 None => return Err(DirectoryError::PathDoesNotExists),
584 Some(n) => n,
585 };
586 let old_node_exists_in_partition = old_node.is_in_partition(false);
587
588 match new_node {
589 None => {
590 if old_node_exists_in_partition {
591 return Err(DirectoryError::CannotMoveBetweenPartition);
592 }
593 }
594 Some(new_node) => {
595 let new_node_exists_in_partition = new_node.is_in_partition(false);
596 if old_node_exists_in_partition || new_node_exists_in_partition {
597 if !old_node_exists_in_partition
598 || !new_node_exists_in_partition
599 || !old_node.current_path.eq(&new_node.current_path)
600 {
601 return Err(DirectoryError::CannotMoveBetweenPartition);
602 }
603
604 return new_node
605 .get_contents()?
606 .move_to(
607 trx,
608 &old_node.get_partition_subpath(),
609 &new_node.get_partition_subpath(),
610 )
611 .await;
612 }
613 return Err(DirectoryError::DirAlreadyExists);
614 }
615 }
616
617 let (new_path_last, parent_path) = new_path
618 .split_last()
619 .ok_or(DirectoryError::DirAlreadyExists)?;
620
621 let parent_node = self
622 .find(trx, parent_path)
623 .await?
624 .ok_or(DirectoryError::ParentDirDoesNotExists)?;
625
626 let key = parent_node
627 .subspace
628 .subspace(&(DEFAULT_SUB_DIRS, new_path_last));
629 let value: Vec<u8> = self.node_subspace.unpack(old_node.subspace.bytes())?;
630 trx.set(key.bytes(), &value);
631
632 self.remove_from_parent(trx, old_path).await?;
633
634 self.contents_of_node(&old_node.subspace, new_path, &old_node.layer)
635 }
636
637 async fn remove_from_parent(
638 &self,
639 trx: &Transaction,
640 path: &[String],
641 ) -> Result<(), DirectoryError> {
642 let (last_element, parent_path) = path
643 .split_last()
644 .ok_or(DirectoryError::BadDestinationDirectory)?;
645
646 match self.find(trx, parent_path).await? {
647 None => {}
648 Some(parent_node) => {
649 let key = parent_node.subspace.pack(&(DEFAULT_SUB_DIRS, last_element));
650 trx.clear(&key);
651 }
652 }
653
654 Ok(())
655 }
656
657 #[async_recursion]
658 async fn remove_internal(
659 &self,
660 trx: &Transaction,
661 path: &[String],
662 fail_on_nonexistent: bool,
663 ) -> Result<bool, DirectoryError> {
664 self.check_version(trx, true).await?;
665
666 if path.is_empty() {
667 return Err(DirectoryError::CannotModifyRootDirectory);
668 }
669
670 let node = match self.find(trx, path).await? {
671 Some(node) => node,
672 None if fail_on_nonexistent => return Err(DirectoryError::DirectoryDoesNotExists),
673 None => return Ok(false),
674 };
675
676 if node.is_in_partition(false) {
677 match node.get_contents()? {
678 DirectoryOutput::DirectorySubspace(_) => {
679 unreachable!("already directory partition")
680 }
681 DirectoryOutput::DirectoryPartition(d) => {
682 return d
683 .directory_subspace
684 .directory_layer
685 .remove_internal(trx, &node.get_partition_subpath(), fail_on_nonexistent)
686 .await;
687 }
688 }
689 }
690
691 self.remove_recursive(trx, &node.subspace).await?;
692 self.remove_from_parent(trx, path).await?;
693
694 Ok(true)
695 }
696
697 #[async_recursion]
698 async fn remove_recursive(
699 &self,
700 trx: &Transaction,
701 node_sub: &Subspace,
702 ) -> Result<(), DirectoryError> {
703 let sub_dir = node_sub.subspace(&DEFAULT_SUB_DIRS);
704 let range_option = RangeOption::from(&sub_dir);
705
706 let mut stream = std::pin::pin!(trx.get_ranges_keyvalues(range_option, false));
707 while let Some(row_key) = stream.try_next().await? {
708 let sub_node = self.node_with_prefix(&row_key.value());
709 self.remove_recursive(trx, &sub_node).await?;
710 }
711
712 let node_prefix: Vec<u8> = self.node_subspace.unpack(node_sub.bytes())?;
713
714 trx.clear_range(&node_prefix, &strinc(node_prefix.to_owned()));
715 trx.clear_subspace_range(node_sub);
716
717 Ok(())
718 }
719}
720
721#[async_trait]
722impl Directory for DirectoryLayer {
723 async fn create_or_open(
724 &self,
725 txn: &Transaction,
726 path: &[String],
727 prefix: Option<&[u8]>,
728 layer: Option<&[u8]>,
729 ) -> Result<DirectoryOutput, DirectoryError> {
730 self.create_or_open_internal(txn, path, prefix, layer, true, true)
731 .await
732 }
733
734 async fn create(
735 &self,
736 txn: &Transaction,
737 path: &[String],
738 prefix: Option<&[u8]>,
739 layer: Option<&[u8]>,
740 ) -> Result<DirectoryOutput, DirectoryError> {
741 self.create_or_open_internal(txn, path, prefix, layer, true, false)
742 .await
743 }
744
745 async fn open(
746 &self,
747 txn: &Transaction,
748 path: &[String],
749 layer: Option<&[u8]>,
750 ) -> Result<DirectoryOutput, DirectoryError> {
751 self.create_or_open_internal(txn, path, None, layer, false, true)
752 .await
753 }
754
755 async fn exists(&self, trx: &Transaction, path: &[String]) -> Result<bool, DirectoryError> {
756 self.exists_internal(trx, path).await
757 }
758
759 async fn move_directory(
760 &self,
761 _trx: &Transaction,
762 _new_path: &[String],
763 ) -> Result<DirectoryOutput, DirectoryError> {
764 Err(DirectoryError::CannotMoveRootDirectory)
765 }
766
767 async fn move_to(
773 &self,
774 trx: &Transaction,
775 old_path: &[String],
776 new_path: &[String],
777 ) -> Result<DirectoryOutput, DirectoryError> {
778 self.move_to_internal(trx, old_path, new_path).await
779 }
780
781 async fn remove(&self, trx: &Transaction, path: &[String]) -> Result<bool, DirectoryError> {
782 self.remove_internal(trx, path, true).await
783 }
784
785 async fn remove_if_exists(
786 &self,
787 trx: &Transaction,
788 path: &[String],
789 ) -> Result<bool, DirectoryError> {
790 self.remove_internal(trx, path, false).await
791 }
792
793 async fn list(
794 &self,
795 trx: &Transaction,
796 path: &[String],
797 ) -> Result<Vec<String>, DirectoryError> {
798 self.list_internal(trx, path).await
799 }
800}