1use crate::directory::directory_partition::DirectoryPartition;
12use crate::directory::directory_subspace::DirectorySubspace;
13use crate::directory::error::DirectoryError;
14use crate::directory::node::Node;
15use crate::directory::{compare_slice, strinc, Directory, DirectoryOutput};
16use crate::future::FdbSlice;
17use crate::tuple::hca::HighContentionAllocator;
18use crate::tuple::{Element, Subspace, TuplePack};
19use crate::RangeOption;
20use crate::{FdbResult, Transaction};
21use async_recursion::async_recursion;
22use async_trait::async_trait;
23use std::cmp::Ordering;
24use std::ops::Deref;
25use std::sync::Arc;
26
/// Tuple element under which a node stores the entries of its sub-directories.
pub(crate) const DEFAULT_SUB_DIRS: i64 = 0;
/// Major component of the version written by `initialize_directory`.
const MAJOR_VERSION: u32 = 1;
/// Minor component of the version written by `initialize_directory`.
const MINOR_VERSION: u32 = 0;
/// Patch component of the version written by `initialize_directory`.
const PATCH_VERSION: u32 = 0;
/// Raw prefix of the node subspace used by `DirectoryLayer::default()`.
pub(crate) const DEFAULT_NODE_PREFIX: &[u8] = &[0xFE];
/// Element appended to the root node to build the allocator's subspace.
const DEFAULT_HCA_PREFIX: &[u8] = b"hca";
/// Layer value identifying a directory partition.
pub(crate) const PARTITION_LAYER: &[u8] = b"partition";
/// Element of the node key under which a directory's layer is written.
pub(crate) const LAYER_SUFFIX: &[u8] = b"layer";
35
/// Handle on a directory layer.
///
/// All state lives in a [`DirectoryLayerInner`] behind an [`Arc`], so cloning the handle
/// clones the `Arc` rather than the state itself.
#[derive(Clone)]
pub struct DirectoryLayer {
    /// Shared state; also reachable through the `Deref` implementation.
    pub(crate) inner: Arc<DirectoryLayerInner>,
}
44
45impl std::fmt::Debug for DirectoryLayer {
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 self.inner.fmt(f)
48 }
49}
50
/// State shared by every clone of a [`DirectoryLayer`].
#[derive(Debug)]
pub struct DirectoryLayerInner {
    /// Node of the root directory: `node_subspace` extended with its own raw bytes
    /// (see `DirectoryLayer::new`).
    pub(crate) root_node: Subspace,
    /// Subspace under which directory nodes are stored.
    pub(crate) node_subspace: Subspace,
    /// Subspace under which automatically allocated prefixes are placed.
    pub(crate) content_subspace: Subspace,
    /// Allocator used to obtain a prefix when the caller does not supply one.
    pub(crate) allocator: HighContentionAllocator,
    /// Whether callers may supply their own prefix when creating a directory.
    pub(crate) allow_manual_prefixes: bool,

    /// Path prepended to relative paths when directory outputs are built
    /// (empty for a layer built with `DirectoryLayer::new`).
    pub(crate) path: Vec<String>,
}
61
62impl Deref for DirectoryLayer {
63 type Target = DirectoryLayerInner;
64
65 fn deref(&self) -> &Self::Target {
66 &self.inner
67 }
68}
69
70impl Default for DirectoryLayer {
71 fn default() -> Self {
77 Self::new(
78 Subspace::from_bytes(DEFAULT_NODE_PREFIX),
79 Subspace::all(),
80 false,
81 )
82 }
83}
84
85impl DirectoryLayer {
86 pub fn new(
87 node_subspace: Subspace,
88 content_subspace: Subspace,
89 allow_manual_prefixes: bool,
90 ) -> Self {
91 let root_node = node_subspace.subspace(&node_subspace.bytes());
92 let allocator = HighContentionAllocator::new(root_node.subspace(&DEFAULT_HCA_PREFIX));
93
94 DirectoryLayer {
95 inner: Arc::new(DirectoryLayerInner {
96 root_node,
97 node_subspace,
98 content_subspace,
99 allocator,
100 allow_manual_prefixes,
101 path: vec![],
102 }),
103 }
104 }
105
106 pub(crate) fn new_with_path(
107 node_subspace: Subspace,
108 content_subspace: Subspace,
109 allow_manual_prefixes: bool,
110 path: &[String],
111 ) -> Self {
112 let root_node = node_subspace.subspace(&node_subspace.bytes());
113 let allocator = HighContentionAllocator::new(root_node.subspace(&DEFAULT_HCA_PREFIX));
114
115 DirectoryLayer {
116 inner: Arc::new(DirectoryLayerInner {
117 root_node,
118 node_subspace,
119 content_subspace,
120 allocator,
121 allow_manual_prefixes,
122 path: Vec::from(path),
123 }),
124 }
125 }
126
127 pub fn get_path(&self) -> &[String] {
128 self.path.as_slice()
129 }
130
131 fn node_with_optional_prefix(&self, prefix: Option<FdbSlice>) -> Option<Subspace> {
132 prefix.map(|fdb_slice| self.node_with_prefix(&fdb_slice.deref()))
133 }
134
135 fn node_with_prefix<T: TuplePack>(&self, prefix: &T) -> Subspace {
136 self.inner.node_subspace.subspace(prefix)
137 }
138
139 async fn find(
140 &self,
141 trx: &Transaction,
142 path: &[String],
143 ) -> Result<Option<Node>, DirectoryError> {
144 let mut current_path = vec![];
145 let mut node_subspace = self.root_node.clone();
146 let mut layer = vec![];
147 let mut loaded = false;
148
149 for path_name in path.iter() {
151 current_path.push(path_name.clone());
152 let key = node_subspace.subspace(&(DEFAULT_SUB_DIRS, path_name.to_owned()));
153
154 let fdb_slice_value = trx.get(key.bytes(), false).await?;
156
157 loaded = true;
158 node_subspace = match self.node_with_optional_prefix(fdb_slice_value) {
159 None => return Ok(None),
160 Some(subspace) => subspace,
161 };
162
163 layer = Node::load_metadata(trx, &node_subspace).await?;
164 if layer.as_slice().eq(PARTITION_LAYER) {
165 break;
166 }
167 }
168
169 if !loaded {
170 layer = Node::load_metadata(trx, &node_subspace).await?;
171 }
172
173 Ok(Some(Node {
174 subspace: node_subspace,
175 current_path,
176 target_path: Vec::from(path),
177 directory_layer: self.clone(),
178 layer,
179 }))
180 }
181
182 fn to_absolute_path(&self, sub_path: &[String]) -> Vec<String> {
183 let mut path: Vec<String> = Vec::with_capacity(self.path.len() + sub_path.len());
184
185 path.extend_from_slice(&self.path);
186 path.extend_from_slice(sub_path);
187
188 path
189 }
190
191 pub(crate) fn contents_of_node(
192 &self,
193 subspace: &Subspace,
194 path: &[String],
195 layer: &[u8],
196 ) -> Result<DirectoryOutput, DirectoryError> {
197 let prefix: Vec<u8> = self.node_subspace.unpack(subspace.bytes())?;
198
199 if layer.eq(PARTITION_LAYER) {
200 Ok(DirectoryOutput::DirectoryPartition(
201 DirectoryPartition::new(&self.to_absolute_path(path), prefix, self.clone()),
202 ))
203 } else {
204 Ok(DirectoryOutput::DirectorySubspace(DirectorySubspace::new(
205 &self.to_absolute_path(path),
206 prefix,
207 self,
208 layer.to_owned(),
209 )))
210 }
211 }
212
213 #[async_recursion]
215 async fn create_or_open_internal(
216 &self,
217 trx: &Transaction,
218 path: &[String],
219 prefix: Option<&'async_recursion [u8]>,
220 layer: Option<&'async_recursion [u8]>,
221 allow_create: bool,
222 allow_open: bool,
223 ) -> Result<DirectoryOutput, DirectoryError> {
224 self.check_version(trx, false).await?;
225
226 if prefix.is_some() && !self.allow_manual_prefixes {
227 if self.path.is_empty() {
228 return Err(DirectoryError::PrefixNotAllowed);
229 }
230
231 return Err(DirectoryError::CannotPrefixInPartition);
232 }
233
234 if path.is_empty() {
235 return Err(DirectoryError::NoPathProvided);
236 }
237
238 if let Some(node) = self.find(trx, path).await? {
239 if node.is_in_partition(false) {
240 let sub_path = node.get_partition_subpath();
241 match node.get_contents()? {
242 DirectoryOutput::DirectorySubspace(_) => unreachable!("already in partition"),
243 DirectoryOutput::DirectoryPartition(directory_partition) => {
244 let dir_space = directory_partition
245 .directory_subspace
246 .directory_layer
247 .create_or_open_internal(
248 trx,
249 &sub_path,
250 prefix,
251 layer,
252 allow_create,
253 allow_open,
254 )
255 .await?;
256 Ok(dir_space)
257 }
258 }
259 } else {
260 self.open_internal(layer, &node, allow_open).await
261 }
262 } else {
263 self.create_internal(trx, path, layer, prefix, allow_create)
264 .await
265 }
266 }
267
268 async fn open_internal(
269 &self,
270 layer: Option<&[u8]>,
271 node: &Node,
272 allow_open: bool,
273 ) -> Result<DirectoryOutput, DirectoryError> {
274 if !allow_open {
275 return Err(DirectoryError::DirAlreadyExists);
276 }
277
278 match layer {
279 None => {}
280 Some(layer) => {
281 if !layer.is_empty() {
282 match compare_slice(layer, &node.layer) {
283 Ordering::Equal => {}
284 _ => {
285 return Err(DirectoryError::IncompatibleLayer);
286 }
287 }
288 }
289 }
290 }
291
292 node.get_contents()
293 }
294
295 async fn create_internal(
296 &self,
297 trx: &Transaction,
298 path: &[String],
299 layer: Option<&[u8]>,
300 prefix: Option<&[u8]>,
301 allow_create: bool,
302 ) -> Result<DirectoryOutput, DirectoryError> {
303 let path_last = path.last().ok_or(DirectoryError::NoPathProvided)?;
304
305 if !allow_create {
306 return Err(DirectoryError::DirectoryDoesNotExists);
307 }
308
309 let layer = layer.unwrap_or_default();
310
311 self.check_version(trx, true).await?;
312 let new_prefix = self.get_prefix(trx, prefix).await?;
313
314 let is_prefix_free = self
315 .is_prefix_free(trx, new_prefix.as_slice(), prefix.is_none())
316 .await?;
317
318 if !is_prefix_free {
319 return Err(DirectoryError::DirectoryPrefixInUse);
320 }
321
322 let parent_node = self.get_parent_node(trx, path).await?;
323 let node = self.node_with_prefix(&new_prefix);
324
325 let key = parent_node.subspace(&(DEFAULT_SUB_DIRS, path_last));
326 let key_layer = node.pack(&LAYER_SUFFIX);
327
328 trx.set(key.bytes(), &new_prefix);
329 trx.set(&key_layer, layer);
330
331 self.contents_of_node(&node, path, layer)
332 }
333
334 async fn get_parent_node(
335 &self,
336 trx: &Transaction,
337 path: &[String],
338 ) -> Result<Subspace, DirectoryError> {
339 match path.split_last() {
340 None => Ok(self.root_node.clone()),
341 Some((_, remains)) => {
342 if remains.is_empty() {
343 return Ok(self.root_node.clone());
344 }
345 let parent = self
346 .create_or_open_internal(trx, remains, None, None, true, true)
347 .await?;
348
349 Ok(self.node_with_prefix(&parent.bytes()?))
350 }
351 }
352 }
353
354 async fn is_prefix_free(
355 &self,
356 trx: &Transaction,
357 prefix: &[u8],
358 snapshot: bool,
359 ) -> Result<bool, DirectoryError> {
360 if prefix.is_empty() {
361 return Ok(false);
362 }
363
364 let node = self.node_containing_key(trx, prefix, snapshot).await?;
365
366 if node.is_some() {
367 return Ok(false);
368 }
369
370 let range_option = RangeOption::from((
371 self.node_subspace.pack(&prefix),
372 self.node_subspace.pack(&strinc(prefix.to_vec())),
373 ));
374
375 let result = trx.get_range(&range_option, 1, snapshot).await?;
376
377 Ok(result.is_empty())
378 }
379
380 async fn node_containing_key(
381 &self,
382 trx: &Transaction,
383 key: &[u8],
384 snapshot: bool,
385 ) -> Result<Option<Subspace>, DirectoryError> {
386 if key.starts_with(self.node_subspace.bytes()) {
387 return Ok(Some(self.root_node.clone()));
388 }
389
390 let mut key_after = key.to_vec();
391 key_after.push(0);
393
394 let range_end = self.node_subspace.pack(&key_after);
395
396 let mut range_option = RangeOption::from((self.node_subspace.range().0, range_end));
397 range_option.reverse = true;
398 range_option.limit = Some(1);
399
400 let fdb_values = trx.get_range(&range_option, 1, snapshot).await?;
402
403 match fdb_values.first() {
404 None => {}
405 Some(fdb_key_value) => {
406 let previous_prefix: Vec<Element> =
407 self.node_subspace.unpack(fdb_key_value.key())?;
408
409 if let Some(Element::Bytes(previous_prefix)) = previous_prefix.first() {
410 if key.starts_with(previous_prefix) {
411 return Ok(Some(self.node_with_prefix(previous_prefix)));
412 };
413 };
414 }
415 }
416 Ok(None)
417 }
418
419 async fn get_prefix(
420 &self,
421 trx: &Transaction,
422 prefix: Option<&[u8]>,
423 ) -> Result<Vec<u8>, DirectoryError> {
424 match prefix {
425 None => {
426 let allocator = self.allocator.allocate(trx).await?;
428 let subspace = self.content_subspace.subspace(&allocator);
429
430 let result = trx
432 .get_range(&RangeOption::from(subspace.range()), 1, false)
433 .await?;
434
435 if !result.is_empty() {
436 return Err(DirectoryError::PrefixNotEmpty);
437 }
438
439 Ok(subspace.into_bytes())
440 }
441 Some(v) => Ok(v.to_vec()),
442 }
443 }
444
445 async fn check_version(
447 &self,
448 trx: &Transaction,
449 allow_creation: bool,
450 ) -> Result<(), DirectoryError> {
451 let version = self.get_version_value(trx).await?;
452 match version {
453 None => {
454 if allow_creation {
455 self.initialize_directory(trx).await
456 } else {
457 Ok(())
458 }
459 }
460 Some(versions) => {
461 if versions.len() < 12 {
462 return Err(DirectoryError::Version(
463 "incorrect version length".to_string(),
464 ));
465 }
466 let mut arr = [0u8; 4];
467 arr.copy_from_slice(&versions[0..4]);
468 let major: u32 = u32::from_le_bytes(arr);
469
470 arr.copy_from_slice(&versions[4..8]);
471 let minor: u32 = u32::from_le_bytes(arr);
472
473 arr.copy_from_slice(&versions[8..12]);
474 let patch: u32 = u32::from_le_bytes(arr);
475
476 if major > MAJOR_VERSION {
477 let msg = format!("cannot load directory with version {}.{}.{} using directory layer {}.{}.{}", major, minor, patch, MAJOR_VERSION, MINOR_VERSION, PATCH_VERSION);
478 return Err(DirectoryError::Version(msg));
479 }
480
481 if minor > MINOR_VERSION {
482 let msg = format!("directory with version {}.{}.{} is read-only when opened using directory layer {}.{}.{}", major, minor, patch, MAJOR_VERSION, MINOR_VERSION, PATCH_VERSION);
483 return Err(DirectoryError::Version(msg));
484 }
485
486 Ok(())
487 }
488 }
489 }
490
491 async fn initialize_directory(&self, trx: &Transaction) -> Result<(), DirectoryError> {
493 let mut value = vec![];
494 value.extend(&MAJOR_VERSION.to_le_bytes());
495 value.extend(&MINOR_VERSION.to_le_bytes());
496 value.extend(&PATCH_VERSION.to_le_bytes());
497 let version_subspace: &[u8] = b"version";
498 let directory_version_key = self.root_node.subspace(&version_subspace);
499 trx.set(directory_version_key.bytes(), &value);
500
501 Ok(())
502 }
503
504 async fn get_version_value(&self, trx: &Transaction) -> FdbResult<Option<FdbSlice>> {
505 let version_subspace: &[u8] = b"version";
506 let version_key = self.root_node.subspace(&version_subspace);
507
508 trx.get(version_key.bytes(), false).await
509 }
510
511 async fn exists_internal(
512 &self,
513 trx: &Transaction,
514 path: &[String],
515 ) -> Result<bool, DirectoryError> {
516 self.check_version(trx, false).await?;
517
518 match self.find(trx, path).await? {
519 None => Ok(false),
520 Some(node) if node.is_in_partition(false) => {
521 node.get_contents()?
522 .exists(trx, &node.get_partition_subpath())
523 .await
524 }
525 Some(_node) => Ok(true),
526 }
527 }
528
529 async fn list_internal(
530 &self,
531 trx: &Transaction,
532 path: &[String],
533 ) -> Result<Vec<String>, DirectoryError> {
534 self.check_version(trx, false).await?;
535
536 let node = self
537 .find(trx, path)
538 .await?
539 .ok_or(DirectoryError::PathDoesNotExists)?;
540 if node.is_in_partition(true) {
541 match node.get_contents()? {
542 DirectoryOutput::DirectorySubspace(_) => unreachable!("already in partition"),
543 DirectoryOutput::DirectoryPartition(directory_partition) => {
544 return directory_partition
545 .directory_subspace
546 .directory_layer
547 .list(trx, &node.get_partition_subpath())
548 .await
549 }
550 };
551 }
552
553 node.list_sub_folders(trx).await
554 }
555
556 async fn move_to_internal(
557 &self,
558 trx: &Transaction,
559 old_path: &[String],
560 new_path: &[String],
561 ) -> Result<DirectoryOutput, DirectoryError> {
562 self.check_version(trx, true).await?;
563
564 if old_path.len() <= new_path.len()
565 && compare_slice(old_path, &new_path[..old_path.len()]) == Ordering::Equal
566 {
567 return Err(DirectoryError::CannotMoveBetweenSubdirectory);
568 }
569
570 let old_node = self.find(trx, old_path).await?;
571 let new_node = self.find(trx, new_path).await?;
572
573 let old_node = match old_node {
574 None => return Err(DirectoryError::PathDoesNotExists),
575 Some(n) => n,
576 };
577 let old_node_exists_in_partition = old_node.is_in_partition(false);
578
579 match new_node {
580 None => {
581 if old_node_exists_in_partition {
582 return Err(DirectoryError::CannotMoveBetweenPartition);
583 }
584 }
585 Some(new_node) => {
586 let new_node_exists_in_partition = new_node.is_in_partition(false);
587 if old_node_exists_in_partition || new_node_exists_in_partition {
588 if !old_node_exists_in_partition
589 || !new_node_exists_in_partition
590 || !old_node.current_path.eq(&new_node.current_path)
591 {
592 return Err(DirectoryError::CannotMoveBetweenPartition);
593 }
594
595 return new_node
596 .get_contents()?
597 .move_to(
598 trx,
599 &old_node.get_partition_subpath(),
600 &new_node.get_partition_subpath(),
601 )
602 .await;
603 }
604 return Err(DirectoryError::DirAlreadyExists);
605 }
606 }
607
608 let (new_path_last, parent_path) = new_path
609 .split_last()
610 .ok_or(DirectoryError::DirAlreadyExists)?;
611
612 let parent_node = self
613 .find(trx, parent_path)
614 .await?
615 .ok_or(DirectoryError::ParentDirDoesNotExists)?;
616
617 let key = parent_node
618 .subspace
619 .subspace(&(DEFAULT_SUB_DIRS, new_path_last));
620 let value: Vec<u8> = self.node_subspace.unpack(old_node.subspace.bytes())?;
621 trx.set(key.bytes(), &value);
622
623 self.remove_from_parent(trx, old_path).await?;
624
625 self.contents_of_node(&old_node.subspace, new_path, &old_node.layer)
626 }
627
628 async fn remove_from_parent(
629 &self,
630 trx: &Transaction,
631 path: &[String],
632 ) -> Result<(), DirectoryError> {
633 let (last_element, parent_path) = path
634 .split_last()
635 .ok_or(DirectoryError::BadDestinationDirectory)?;
636
637 match self.find(trx, parent_path).await? {
638 None => {}
639 Some(parent_node) => {
640 let key = parent_node.subspace.pack(&(DEFAULT_SUB_DIRS, last_element));
641 trx.clear(&key);
642 }
643 }
644
645 Ok(())
646 }
647
648 #[async_recursion]
649 async fn remove_internal(
650 &self,
651 trx: &Transaction,
652 path: &[String],
653 fail_on_nonexistent: bool,
654 ) -> Result<bool, DirectoryError> {
655 self.check_version(trx, true).await?;
656
657 if path.is_empty() {
658 return Err(DirectoryError::CannotModifyRootDirectory);
659 }
660
661 let node = match self.find(trx, path).await? {
662 Some(node) => node,
663 None if fail_on_nonexistent => return Err(DirectoryError::DirectoryDoesNotExists),
664 None => return Ok(false),
665 };
666
667 if node.is_in_partition(false) {
668 match node.get_contents()? {
669 DirectoryOutput::DirectorySubspace(_) => {
670 unreachable!("already directory partition")
671 }
672 DirectoryOutput::DirectoryPartition(d) => {
673 return d
674 .directory_subspace
675 .directory_layer
676 .remove_internal(trx, &node.get_partition_subpath(), fail_on_nonexistent)
677 .await
678 }
679 }
680 }
681
682 self.remove_recursive(trx, &node.subspace).await?;
683 self.remove_from_parent(trx, path).await?;
684
685 Ok(true)
686 }
687
688 #[async_recursion]
689 async fn remove_recursive(
690 &self,
691 trx: &Transaction,
692 node_sub: &Subspace,
693 ) -> Result<(), DirectoryError> {
694 let sub_dir = node_sub.subspace(&DEFAULT_SUB_DIRS);
695 let (mut begin, end) = sub_dir.range();
696
697 loop {
698 let range_option = RangeOption::from((begin.as_slice(), end.as_slice()));
699
700 let range = trx.get_range(&range_option, 1024, false).await?;
701 let has_more = range.more();
702
703 for row_key in range {
704 let sub_node = self.node_with_prefix(&row_key.value());
705 self.remove_recursive(trx, &sub_node).await?;
706 begin = row_key.key().pack_to_vec();
707 }
708
709 if !has_more {
710 break;
711 }
712 }
713
714 let node_prefix: Vec<u8> = self.node_subspace.unpack(node_sub.bytes())?;
715
716 trx.clear_range(&node_prefix, &strinc(node_prefix.to_owned()));
717 trx.clear_subspace_range(node_sub);
718
719 Ok(())
720 }
721}
722
723#[async_trait]
724impl Directory for DirectoryLayer {
725 async fn create_or_open(
726 &self,
727 txn: &Transaction,
728 path: &[String],
729 prefix: Option<&[u8]>,
730 layer: Option<&[u8]>,
731 ) -> Result<DirectoryOutput, DirectoryError> {
732 self.create_or_open_internal(txn, path, prefix, layer, true, true)
733 .await
734 }
735
736 async fn create(
737 &self,
738 txn: &Transaction,
739 path: &[String],
740 prefix: Option<&[u8]>,
741 layer: Option<&[u8]>,
742 ) -> Result<DirectoryOutput, DirectoryError> {
743 self.create_or_open_internal(txn, path, prefix, layer, true, false)
744 .await
745 }
746
747 async fn open(
748 &self,
749 txn: &Transaction,
750 path: &[String],
751 layer: Option<&[u8]>,
752 ) -> Result<DirectoryOutput, DirectoryError> {
753 self.create_or_open_internal(txn, path, None, layer, false, true)
754 .await
755 }
756
757 async fn exists(&self, trx: &Transaction, path: &[String]) -> Result<bool, DirectoryError> {
758 self.exists_internal(trx, path).await
759 }
760
761 async fn move_directory(
762 &self,
763 _trx: &Transaction,
764 _new_path: &[String],
765 ) -> Result<DirectoryOutput, DirectoryError> {
766 Err(DirectoryError::CannotMoveRootDirectory)
767 }
768
769 async fn move_to(
775 &self,
776 trx: &Transaction,
777 old_path: &[String],
778 new_path: &[String],
779 ) -> Result<DirectoryOutput, DirectoryError> {
780 self.move_to_internal(trx, old_path, new_path).await
781 }
782
783 async fn remove(&self, trx: &Transaction, path: &[String]) -> Result<bool, DirectoryError> {
784 self.remove_internal(trx, path, true).await
785 }
786
787 async fn remove_if_exists(
788 &self,
789 trx: &Transaction,
790 path: &[String],
791 ) -> Result<bool, DirectoryError> {
792 self.remove_internal(trx, path, false).await
793 }
794
795 async fn list(
796 &self,
797 trx: &Transaction,
798 path: &[String],
799 ) -> Result<Vec<String>, DirectoryError> {
800 self.list_internal(trx, path).await
801 }
802}