1use crate::directory::directory_partition::DirectoryPartition;
12use crate::directory::directory_subspace::DirectorySubspace;
13use crate::directory::error::DirectoryError;
14use crate::directory::node::Node;
15use crate::directory::{compare_slice, strinc, Directory, DirectoryOutput};
16use crate::future::FdbSlice;
17use crate::tuple::hca::HighContentionAllocator;
18use crate::tuple::{Element, Subspace, TuplePack};
19use crate::RangeOption;
20use crate::{FdbResult, Transaction};
21use async_recursion::async_recursion;
22use async_trait::async_trait;
23use futures::TryStreamExt;
24use std::cmp::Ordering;
25use std::ops::Deref;
26use std::sync::Arc;
27
/// First tuple element of the key, under a node, that maps a child name to the child's prefix:
/// `node.subspace(&(DEFAULT_SUB_DIRS, name))`.
pub(crate) const DEFAULT_SUB_DIRS: i64 = 0;

// Version written by `initialize_directory` and validated by `check_version`.
const MAJOR_VERSION: u32 = 1;
const MINOR_VERSION: u32 = 0;
const PATCH_VERSION: u32 = 0;

/// Byte prefix of the node subspace used by `DirectoryLayer::default()`.
pub(crate) const DEFAULT_NODE_PREFIX: &[u8] = b"\xFE";
/// Key (under the root node) of the high contention allocator's state.
const DEFAULT_HCA_PREFIX: &[u8] = b"hca";
/// Layer value that marks a directory as a partition.
pub(crate) const PARTITION_LAYER: &[u8] = b"partition";
/// Key suffix (under a node) that stores the node's layer.
pub(crate) const LAYER_SUFFIX: &[u8] = b"layer";
36
37#[derive(Clone)]
42pub struct DirectoryLayer {
43 pub(crate) inner: Arc<DirectoryLayerInner>,
44}
45
46impl std::fmt::Debug for DirectoryLayer {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 self.inner.fmt(f)
49 }
50}
51
52#[derive(Debug)]
53pub struct DirectoryLayerInner {
54 pub(crate) root_node: Subspace,
55 pub(crate) node_subspace: Subspace,
56 pub(crate) content_subspace: Subspace,
57 pub(crate) allocator: HighContentionAllocator,
58 pub(crate) allow_manual_prefixes: bool,
59
60 pub(crate) path: Vec<String>,
61}
62
63impl Deref for DirectoryLayer {
64 type Target = DirectoryLayerInner;
65
66 fn deref(&self) -> &Self::Target {
67 &self.inner
68 }
69}
70
71impl Default for DirectoryLayer {
72 fn default() -> Self {
78 Self::new(
79 Subspace::from_bytes(DEFAULT_NODE_PREFIX),
80 Subspace::all(),
81 false,
82 )
83 }
84}
85
86impl DirectoryLayer {
87 pub fn new(
88 node_subspace: Subspace,
89 content_subspace: Subspace,
90 allow_manual_prefixes: bool,
91 ) -> Self {
92 let root_node = node_subspace.subspace(&node_subspace.bytes());
93 let allocator = HighContentionAllocator::new(root_node.subspace(&DEFAULT_HCA_PREFIX));
94
95 DirectoryLayer {
96 inner: Arc::new(DirectoryLayerInner {
97 root_node,
98 node_subspace,
99 content_subspace,
100 allocator,
101 allow_manual_prefixes,
102 path: vec![],
103 }),
104 }
105 }
106
107 pub(crate) fn new_with_path(
108 node_subspace: Subspace,
109 content_subspace: Subspace,
110 allow_manual_prefixes: bool,
111 path: &[String],
112 ) -> Self {
113 let root_node = node_subspace.subspace(&node_subspace.bytes());
114 let allocator = HighContentionAllocator::new(root_node.subspace(&DEFAULT_HCA_PREFIX));
115
116 DirectoryLayer {
117 inner: Arc::new(DirectoryLayerInner {
118 root_node,
119 node_subspace,
120 content_subspace,
121 allocator,
122 allow_manual_prefixes,
123 path: Vec::from(path),
124 }),
125 }
126 }
127
128 pub fn get_path(&self) -> &[String] {
129 self.path.as_slice()
130 }
131
132 fn node_with_optional_prefix(&self, prefix: Option<FdbSlice>) -> Option<Subspace> {
133 prefix.map(|fdb_slice| self.node_with_prefix(&fdb_slice.deref()))
134 }
135
136 fn node_with_prefix<T: TuplePack>(&self, prefix: &T) -> Subspace {
137 self.inner.node_subspace.subspace(prefix)
138 }
139
140 async fn find(
141 &self,
142 trx: &Transaction,
143 path: &[String],
144 ) -> Result<Option<Node>, DirectoryError> {
145 let mut current_path = vec![];
146 let mut node_subspace = self.root_node.clone();
147 let mut layer = vec![];
148 let mut loaded = false;
149
150 for path_name in path.iter() {
152 current_path.push(path_name.clone());
153 let key = node_subspace.subspace(&(DEFAULT_SUB_DIRS, path_name.to_owned()));
154
155 let fdb_slice_value = trx.get(key.bytes(), false).await?;
157
158 loaded = true;
159 node_subspace = match self.node_with_optional_prefix(fdb_slice_value) {
160 None => return Ok(None),
161 Some(subspace) => subspace,
162 };
163
164 layer = Node::load_metadata(trx, &node_subspace).await?;
165 if layer.as_slice().eq(PARTITION_LAYER) {
166 break;
167 }
168 }
169
170 if !loaded {
171 layer = Node::load_metadata(trx, &node_subspace).await?;
172 }
173
174 Ok(Some(Node {
175 subspace: node_subspace,
176 current_path,
177 target_path: Vec::from(path),
178 directory_layer: self.clone(),
179 layer,
180 }))
181 }
182
183 fn to_absolute_path(&self, sub_path: &[String]) -> Vec<String> {
184 let mut path: Vec<String> = Vec::with_capacity(self.path.len() + sub_path.len());
185
186 path.extend_from_slice(&self.path);
187 path.extend_from_slice(sub_path);
188
189 path
190 }
191
192 pub(crate) fn contents_of_node(
193 &self,
194 subspace: &Subspace,
195 path: &[String],
196 layer: &[u8],
197 ) -> Result<DirectoryOutput, DirectoryError> {
198 let prefix: Vec<u8> = self.node_subspace.unpack(subspace.bytes())?;
199
200 if layer.eq(PARTITION_LAYER) {
201 Ok(DirectoryOutput::DirectoryPartition(
202 DirectoryPartition::new(&self.to_absolute_path(path), prefix, self.clone()),
203 ))
204 } else {
205 Ok(DirectoryOutput::DirectorySubspace(DirectorySubspace::new(
206 &self.to_absolute_path(path),
207 prefix,
208 self,
209 layer.to_owned(),
210 )))
211 }
212 }
213
214 #[async_recursion]
216 async fn create_or_open_internal(
217 &self,
218 trx: &Transaction,
219 path: &[String],
220 prefix: Option<&'async_recursion [u8]>,
221 layer: Option<&'async_recursion [u8]>,
222 allow_create: bool,
223 allow_open: bool,
224 ) -> Result<DirectoryOutput, DirectoryError> {
225 self.check_version(trx, false).await?;
226
227 if prefix.is_some() && !self.allow_manual_prefixes {
228 if self.path.is_empty() {
229 return Err(DirectoryError::PrefixNotAllowed);
230 }
231
232 return Err(DirectoryError::CannotPrefixInPartition);
233 }
234
235 if path.is_empty() {
236 return Err(DirectoryError::NoPathProvided);
237 }
238
239 if let Some(node) = self.find(trx, path).await? {
240 if node.is_in_partition(false) {
241 let sub_path = node.get_partition_subpath();
242 match node.get_contents()? {
243 DirectoryOutput::DirectorySubspace(_) => unreachable!("already in partition"),
244 DirectoryOutput::DirectoryPartition(directory_partition) => {
245 let dir_space = directory_partition
246 .directory_subspace
247 .directory_layer
248 .create_or_open_internal(
249 trx,
250 &sub_path,
251 prefix,
252 layer,
253 allow_create,
254 allow_open,
255 )
256 .await?;
257 Ok(dir_space)
258 }
259 }
260 } else {
261 self.open_internal(layer, &node, allow_open).await
262 }
263 } else {
264 self.create_internal(trx, path, layer, prefix, allow_create)
265 .await
266 }
267 }
268
269 async fn open_internal(
270 &self,
271 layer: Option<&[u8]>,
272 node: &Node,
273 allow_open: bool,
274 ) -> Result<DirectoryOutput, DirectoryError> {
275 if !allow_open {
276 return Err(DirectoryError::DirAlreadyExists);
277 }
278
279 match layer {
280 None => {}
281 Some(layer) => {
282 if !layer.is_empty() {
283 match compare_slice(layer, &node.layer) {
284 Ordering::Equal => {}
285 _ => {
286 return Err(DirectoryError::IncompatibleLayer);
287 }
288 }
289 }
290 }
291 }
292
293 node.get_contents()
294 }
295
296 async fn create_internal(
297 &self,
298 trx: &Transaction,
299 path: &[String],
300 layer: Option<&[u8]>,
301 prefix: Option<&[u8]>,
302 allow_create: bool,
303 ) -> Result<DirectoryOutput, DirectoryError> {
304 let path_last = path.last().ok_or(DirectoryError::NoPathProvided)?;
305
306 if !allow_create {
307 return Err(DirectoryError::DirectoryDoesNotExists);
308 }
309
310 let layer = layer.unwrap_or_default();
311
312 self.check_version(trx, true).await?;
313 let new_prefix = self.get_prefix(trx, prefix).await?;
314
315 let is_prefix_free = self
316 .is_prefix_free(trx, new_prefix.as_slice(), prefix.is_none())
317 .await?;
318
319 if !is_prefix_free {
320 return Err(DirectoryError::DirectoryPrefixInUse);
321 }
322
323 let parent_node = self.get_parent_node(trx, path).await?;
324 let node = self.node_with_prefix(&new_prefix);
325
326 let key = parent_node.subspace(&(DEFAULT_SUB_DIRS, path_last));
327 let key_layer = node.pack(&LAYER_SUFFIX);
328
329 trx.set(key.bytes(), &new_prefix);
330 trx.set(&key_layer, layer);
331
332 self.contents_of_node(&node, path, layer)
333 }
334
335 async fn get_parent_node(
336 &self,
337 trx: &Transaction,
338 path: &[String],
339 ) -> Result<Subspace, DirectoryError> {
340 match path.split_last() {
341 None => Ok(self.root_node.clone()),
342 Some((_, remains)) => {
343 if remains.is_empty() {
344 return Ok(self.root_node.clone());
345 }
346 let parent = self
347 .create_or_open_internal(trx, remains, None, None, true, true)
348 .await?;
349
350 Ok(self.node_with_prefix(&parent.bytes()?))
351 }
352 }
353 }
354
355 async fn is_prefix_free(
356 &self,
357 trx: &Transaction,
358 prefix: &[u8],
359 snapshot: bool,
360 ) -> Result<bool, DirectoryError> {
361 if prefix.is_empty() {
362 return Ok(false);
363 }
364
365 let node = self.node_containing_key(trx, prefix, snapshot).await?;
366
367 if node.is_some() {
368 return Ok(false);
369 }
370
371 let mut range_option = RangeOption::from((
372 self.node_subspace.pack(&prefix),
373 self.node_subspace.pack(&strinc(prefix.to_vec())),
374 ));
375 range_option.limit = Some(1);
376
377 let result = trx
378 .get_ranges_keyvalues(range_option, snapshot)
379 .try_next()
380 .await?;
381
382 Ok(result.is_none())
383 }
384
385 async fn node_containing_key(
386 &self,
387 trx: &Transaction,
388 key: &[u8],
389 snapshot: bool,
390 ) -> Result<Option<Subspace>, DirectoryError> {
391 if key.starts_with(self.node_subspace.bytes()) {
392 return Ok(Some(self.root_node.clone()));
393 }
394
395 let mut key_after = key.to_vec();
396 key_after.push(0);
398
399 let range_end = self.node_subspace.pack(&key_after);
400
401 let mut range_option = RangeOption::from((self.node_subspace.range().0, range_end));
402 range_option.reverse = true;
403 range_option.limit = Some(1);
404
405 let fdb_value = trx
407 .get_ranges_keyvalues(range_option, snapshot)
408 .try_next()
409 .await?;
410
411 if let Some(fdb_key_value) = fdb_value {
412 let previous_prefix: Vec<Element> = self.node_subspace.unpack(fdb_key_value.key())?;
413
414 if let Some(Element::Bytes(previous_prefix)) = previous_prefix.first() {
415 if key.starts_with(previous_prefix) {
416 return Ok(Some(self.node_with_prefix(previous_prefix)));
417 };
418 };
419 }
420 Ok(None)
421 }
422
423 async fn get_prefix(
424 &self,
425 trx: &Transaction,
426 prefix: Option<&[u8]>,
427 ) -> Result<Vec<u8>, DirectoryError> {
428 match prefix {
429 None => {
430 let allocator = self.allocator.allocate(trx).await?;
432 let subspace = self.content_subspace.subspace(&allocator);
433
434 let mut range_option = RangeOption::from(subspace.range());
436 range_option.limit = Some(1);
437
438 let result = trx
439 .get_ranges_keyvalues(range_option, false)
440 .try_next()
441 .await?;
442
443 if result.is_some() {
444 return Err(DirectoryError::PrefixNotEmpty);
445 }
446
447 Ok(subspace.into_bytes())
448 }
449 Some(v) => Ok(v.to_vec()),
450 }
451 }
452
453 async fn check_version(
455 &self,
456 trx: &Transaction,
457 allow_creation: bool,
458 ) -> Result<(), DirectoryError> {
459 let version = self.get_version_value(trx).await?;
460 match version {
461 None => {
462 if allow_creation {
463 self.initialize_directory(trx).await
464 } else {
465 Ok(())
466 }
467 }
468 Some(versions) => {
469 if versions.len() < 12 {
470 return Err(DirectoryError::Version(
471 "incorrect version length".to_string(),
472 ));
473 }
474 let mut arr = [0u8; 4];
475 arr.copy_from_slice(&versions[0..4]);
476 let major: u32 = u32::from_le_bytes(arr);
477
478 arr.copy_from_slice(&versions[4..8]);
479 let minor: u32 = u32::from_le_bytes(arr);
480
481 arr.copy_from_slice(&versions[8..12]);
482 let patch: u32 = u32::from_le_bytes(arr);
483
484 if major > MAJOR_VERSION {
485 let msg = format!("cannot load directory with version {major}.{minor}.{patch} using directory layer {MAJOR_VERSION}.{MINOR_VERSION}.{PATCH_VERSION}");
486 return Err(DirectoryError::Version(msg));
487 }
488
489 if minor > MINOR_VERSION {
490 let msg = format!("directory with version {major}.{minor}.{patch} is read-only when opened using directory layer {MAJOR_VERSION}.{MINOR_VERSION}.{PATCH_VERSION}");
491 return Err(DirectoryError::Version(msg));
492 }
493
494 Ok(())
495 }
496 }
497 }
498
499 async fn initialize_directory(&self, trx: &Transaction) -> Result<(), DirectoryError> {
501 let mut value = vec![];
502 value.extend(&MAJOR_VERSION.to_le_bytes());
503 value.extend(&MINOR_VERSION.to_le_bytes());
504 value.extend(&PATCH_VERSION.to_le_bytes());
505 let version_subspace: &[u8] = b"version";
506 let directory_version_key = self.root_node.subspace(&version_subspace);
507 trx.set(directory_version_key.bytes(), &value);
508
509 Ok(())
510 }
511
512 async fn get_version_value(&self, trx: &Transaction) -> FdbResult<Option<FdbSlice>> {
513 let version_subspace: &[u8] = b"version";
514 let version_key = self.root_node.subspace(&version_subspace);
515
516 trx.get(version_key.bytes(), false).await
517 }
518
519 async fn exists_internal(
520 &self,
521 trx: &Transaction,
522 path: &[String],
523 ) -> Result<bool, DirectoryError> {
524 self.check_version(trx, false).await?;
525
526 match self.find(trx, path).await? {
527 None => Ok(false),
528 Some(node) if node.is_in_partition(false) => {
529 node.get_contents()?
530 .exists(trx, &node.get_partition_subpath())
531 .await
532 }
533 Some(_node) => Ok(true),
534 }
535 }
536
537 async fn list_internal(
538 &self,
539 trx: &Transaction,
540 path: &[String],
541 ) -> Result<Vec<String>, DirectoryError> {
542 self.check_version(trx, false).await?;
543
544 let node = self
545 .find(trx, path)
546 .await?
547 .ok_or(DirectoryError::PathDoesNotExists)?;
548 if node.is_in_partition(true) {
549 match node.get_contents()? {
550 DirectoryOutput::DirectorySubspace(_) => unreachable!("already in partition"),
551 DirectoryOutput::DirectoryPartition(directory_partition) => {
552 return directory_partition
553 .directory_subspace
554 .directory_layer
555 .list(trx, &node.get_partition_subpath())
556 .await
557 }
558 };
559 }
560
561 node.list_sub_folders(trx).await
562 }
563
564 async fn move_to_internal(
565 &self,
566 trx: &Transaction,
567 old_path: &[String],
568 new_path: &[String],
569 ) -> Result<DirectoryOutput, DirectoryError> {
570 self.check_version(trx, true).await?;
571
572 if old_path.len() <= new_path.len()
573 && compare_slice(old_path, &new_path[..old_path.len()]) == Ordering::Equal
574 {
575 return Err(DirectoryError::CannotMoveBetweenSubdirectory);
576 }
577
578 let old_node = self.find(trx, old_path).await?;
579 let new_node = self.find(trx, new_path).await?;
580
581 let old_node = match old_node {
582 None => return Err(DirectoryError::PathDoesNotExists),
583 Some(n) => n,
584 };
585 let old_node_exists_in_partition = old_node.is_in_partition(false);
586
587 match new_node {
588 None => {
589 if old_node_exists_in_partition {
590 return Err(DirectoryError::CannotMoveBetweenPartition);
591 }
592 }
593 Some(new_node) => {
594 let new_node_exists_in_partition = new_node.is_in_partition(false);
595 if old_node_exists_in_partition || new_node_exists_in_partition {
596 if !old_node_exists_in_partition
597 || !new_node_exists_in_partition
598 || !old_node.current_path.eq(&new_node.current_path)
599 {
600 return Err(DirectoryError::CannotMoveBetweenPartition);
601 }
602
603 return new_node
604 .get_contents()?
605 .move_to(
606 trx,
607 &old_node.get_partition_subpath(),
608 &new_node.get_partition_subpath(),
609 )
610 .await;
611 }
612 return Err(DirectoryError::DirAlreadyExists);
613 }
614 }
615
616 let (new_path_last, parent_path) = new_path
617 .split_last()
618 .ok_or(DirectoryError::DirAlreadyExists)?;
619
620 let parent_node = self
621 .find(trx, parent_path)
622 .await?
623 .ok_or(DirectoryError::ParentDirDoesNotExists)?;
624
625 let key = parent_node
626 .subspace
627 .subspace(&(DEFAULT_SUB_DIRS, new_path_last));
628 let value: Vec<u8> = self.node_subspace.unpack(old_node.subspace.bytes())?;
629 trx.set(key.bytes(), &value);
630
631 self.remove_from_parent(trx, old_path).await?;
632
633 self.contents_of_node(&old_node.subspace, new_path, &old_node.layer)
634 }
635
636 async fn remove_from_parent(
637 &self,
638 trx: &Transaction,
639 path: &[String],
640 ) -> Result<(), DirectoryError> {
641 let (last_element, parent_path) = path
642 .split_last()
643 .ok_or(DirectoryError::BadDestinationDirectory)?;
644
645 match self.find(trx, parent_path).await? {
646 None => {}
647 Some(parent_node) => {
648 let key = parent_node.subspace.pack(&(DEFAULT_SUB_DIRS, last_element));
649 trx.clear(&key);
650 }
651 }
652
653 Ok(())
654 }
655
656 #[async_recursion]
657 async fn remove_internal(
658 &self,
659 trx: &Transaction,
660 path: &[String],
661 fail_on_nonexistent: bool,
662 ) -> Result<bool, DirectoryError> {
663 self.check_version(trx, true).await?;
664
665 if path.is_empty() {
666 return Err(DirectoryError::CannotModifyRootDirectory);
667 }
668
669 let node = match self.find(trx, path).await? {
670 Some(node) => node,
671 None if fail_on_nonexistent => return Err(DirectoryError::DirectoryDoesNotExists),
672 None => return Ok(false),
673 };
674
675 if node.is_in_partition(false) {
676 match node.get_contents()? {
677 DirectoryOutput::DirectorySubspace(_) => {
678 unreachable!("already directory partition")
679 }
680 DirectoryOutput::DirectoryPartition(d) => {
681 return d
682 .directory_subspace
683 .directory_layer
684 .remove_internal(trx, &node.get_partition_subpath(), fail_on_nonexistent)
685 .await
686 }
687 }
688 }
689
690 self.remove_recursive(trx, &node.subspace).await?;
691 self.remove_from_parent(trx, path).await?;
692
693 Ok(true)
694 }
695
696 #[async_recursion]
697 async fn remove_recursive(
698 &self,
699 trx: &Transaction,
700 node_sub: &Subspace,
701 ) -> Result<(), DirectoryError> {
702 let sub_dir = node_sub.subspace(&DEFAULT_SUB_DIRS);
703 let range_option = RangeOption::from(&sub_dir);
704
705 let mut stream = std::pin::pin!(trx.get_ranges_keyvalues(range_option, false));
706 while let Some(row_key) = stream.try_next().await? {
707 let sub_node = self.node_with_prefix(&row_key.value());
708 self.remove_recursive(trx, &sub_node).await?;
709 }
710
711 let node_prefix: Vec<u8> = self.node_subspace.unpack(node_sub.bytes())?;
712
713 trx.clear_range(&node_prefix, &strinc(node_prefix.to_owned()));
714 trx.clear_subspace_range(node_sub);
715
716 Ok(())
717 }
718}
719
720#[async_trait]
721impl Directory for DirectoryLayer {
722 async fn create_or_open(
723 &self,
724 txn: &Transaction,
725 path: &[String],
726 prefix: Option<&[u8]>,
727 layer: Option<&[u8]>,
728 ) -> Result<DirectoryOutput, DirectoryError> {
729 self.create_or_open_internal(txn, path, prefix, layer, true, true)
730 .await
731 }
732
733 async fn create(
734 &self,
735 txn: &Transaction,
736 path: &[String],
737 prefix: Option<&[u8]>,
738 layer: Option<&[u8]>,
739 ) -> Result<DirectoryOutput, DirectoryError> {
740 self.create_or_open_internal(txn, path, prefix, layer, true, false)
741 .await
742 }
743
744 async fn open(
745 &self,
746 txn: &Transaction,
747 path: &[String],
748 layer: Option<&[u8]>,
749 ) -> Result<DirectoryOutput, DirectoryError> {
750 self.create_or_open_internal(txn, path, None, layer, false, true)
751 .await
752 }
753
754 async fn exists(&self, trx: &Transaction, path: &[String]) -> Result<bool, DirectoryError> {
755 self.exists_internal(trx, path).await
756 }
757
758 async fn move_directory(
759 &self,
760 _trx: &Transaction,
761 _new_path: &[String],
762 ) -> Result<DirectoryOutput, DirectoryError> {
763 Err(DirectoryError::CannotMoveRootDirectory)
764 }
765
766 async fn move_to(
772 &self,
773 trx: &Transaction,
774 old_path: &[String],
775 new_path: &[String],
776 ) -> Result<DirectoryOutput, DirectoryError> {
777 self.move_to_internal(trx, old_path, new_path).await
778 }
779
780 async fn remove(&self, trx: &Transaction, path: &[String]) -> Result<bool, DirectoryError> {
781 self.remove_internal(trx, path, true).await
782 }
783
784 async fn remove_if_exists(
785 &self,
786 trx: &Transaction,
787 path: &[String],
788 ) -> Result<bool, DirectoryError> {
789 self.remove_internal(trx, path, false).await
790 }
791
792 async fn list(
793 &self,
794 trx: &Transaction,
795 path: &[String],
796 ) -> Result<Vec<String>, DirectoryError> {
797 self.list_internal(trx, path).await
798 }
799}