foundationdb/directory/
directory_layer.rs

1// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors
2// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors.
3//
4// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
5// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
6// http://opensource.org/licenses/MIT>, at your option. This file may not be
7// copied, modified, or distributed except according to those terms.
8
9//! The default Directory implementation.
10
11use crate::directory::directory_partition::DirectoryPartition;
12use crate::directory::directory_subspace::DirectorySubspace;
13use crate::directory::error::DirectoryError;
14use crate::directory::node::Node;
15use crate::directory::{compare_slice, strinc, Directory, DirectoryOutput};
16use crate::future::FdbSlice;
17use crate::tuple::hca::HighContentionAllocator;
18use crate::tuple::{Element, Subspace, TuplePack};
19use crate::RangeOption;
20use crate::{FdbResult, Transaction};
21use async_recursion::async_recursion;
22use async_trait::async_trait;
23use std::cmp::Ordering;
24use std::ops::Deref;
25use std::sync::Arc;
26
27pub(crate) const DEFAULT_SUB_DIRS: i64 = 0;
28const MAJOR_VERSION: u32 = 1;
29const MINOR_VERSION: u32 = 0;
30const PATCH_VERSION: u32 = 0;
31pub(crate) const DEFAULT_NODE_PREFIX: &[u8] = b"\xFE";
32const DEFAULT_HCA_PREFIX: &[u8] = b"hca";
33pub(crate) const PARTITION_LAYER: &[u8] = b"partition";
34pub(crate) const LAYER_SUFFIX: &[u8] = b"layer";
35
36/// A DirectoryLayer defines a new root directory.
37/// The node subspace and content subspace control where the directory metadata and contents,
38/// respectively, are stored. The default root directory has a node subspace with raw prefix \xFE
39/// and a content subspace with no prefix.
40#[derive(Clone)]
41pub struct DirectoryLayer {
42    pub(crate) inner: Arc<DirectoryLayerInner>,
43}
44
45impl std::fmt::Debug for DirectoryLayer {
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47        self.inner.fmt(f)
48    }
49}
50
51#[derive(Debug)]
52pub struct DirectoryLayerInner {
53    pub(crate) root_node: Subspace,
54    pub(crate) node_subspace: Subspace,
55    pub(crate) content_subspace: Subspace,
56    pub(crate) allocator: HighContentionAllocator,
57    pub(crate) allow_manual_prefixes: bool,
58
59    pub(crate) path: Vec<String>,
60}
61
62impl Deref for DirectoryLayer {
63    type Target = DirectoryLayerInner;
64
65    fn deref(&self) -> &Self::Target {
66        &self.inner
67    }
68}
69
70impl Default for DirectoryLayer {
71    /// The default root directory stores directory layer metadata in keys beginning with 0xFE,
72    ///and allocates newly created directories in (unused) prefixes starting with 0x00 through 0xFD.
73    ///This is appropriate for otherwise empty databases, but may conflict with other formal or informal partitionings of keyspace.
74    /// If you already have other content in your database, you may wish to use NewDirectoryLayer to
75    /// construct a non-standard root directory to control where metadata and keys are stored.
76    fn default() -> Self {
77        Self::new(
78            Subspace::from_bytes(DEFAULT_NODE_PREFIX),
79            Subspace::all(),
80            false,
81        )
82    }
83}
84
85impl DirectoryLayer {
86    pub fn new(
87        node_subspace: Subspace,
88        content_subspace: Subspace,
89        allow_manual_prefixes: bool,
90    ) -> Self {
91        let root_node = node_subspace.subspace(&node_subspace.bytes());
92        let allocator = HighContentionAllocator::new(root_node.subspace(&DEFAULT_HCA_PREFIX));
93
94        DirectoryLayer {
95            inner: Arc::new(DirectoryLayerInner {
96                root_node,
97                node_subspace,
98                content_subspace,
99                allocator,
100                allow_manual_prefixes,
101                path: vec![],
102            }),
103        }
104    }
105
106    pub(crate) fn new_with_path(
107        node_subspace: Subspace,
108        content_subspace: Subspace,
109        allow_manual_prefixes: bool,
110        path: &[String],
111    ) -> Self {
112        let root_node = node_subspace.subspace(&node_subspace.bytes());
113        let allocator = HighContentionAllocator::new(root_node.subspace(&DEFAULT_HCA_PREFIX));
114
115        DirectoryLayer {
116            inner: Arc::new(DirectoryLayerInner {
117                root_node,
118                node_subspace,
119                content_subspace,
120                allocator,
121                allow_manual_prefixes,
122                path: Vec::from(path),
123            }),
124        }
125    }
126
127    pub fn get_path(&self) -> &[String] {
128        self.path.as_slice()
129    }
130
131    fn node_with_optional_prefix(&self, prefix: Option<FdbSlice>) -> Option<Subspace> {
132        prefix.map(|fdb_slice| self.node_with_prefix(&fdb_slice.deref()))
133    }
134
135    fn node_with_prefix<T: TuplePack>(&self, prefix: &T) -> Subspace {
136        self.inner.node_subspace.subspace(prefix)
137    }
138
139    async fn find(
140        &self,
141        trx: &Transaction,
142        path: &[String],
143    ) -> Result<Option<Node>, DirectoryError> {
144        let mut current_path = vec![];
145        let mut node_subspace = self.root_node.clone();
146        let mut layer = vec![];
147        let mut loaded = false;
148
149        // walking through the provided path
150        for path_name in path.iter() {
151            current_path.push(path_name.clone());
152            let key = node_subspace.subspace(&(DEFAULT_SUB_DIRS, path_name.to_owned()));
153
154            // finding the next node
155            let fdb_slice_value = trx.get(key.bytes(), false).await?;
156
157            loaded = true;
158            node_subspace = match self.node_with_optional_prefix(fdb_slice_value) {
159                None => return Ok(None),
160                Some(subspace) => subspace,
161            };
162
163            layer = Node::load_metadata(trx, &node_subspace).await?;
164            if layer.as_slice().eq(PARTITION_LAYER) {
165                break;
166            }
167        }
168
169        if !loaded {
170            layer = Node::load_metadata(trx, &node_subspace).await?;
171        }
172
173        Ok(Some(Node {
174            subspace: node_subspace,
175            current_path,
176            target_path: Vec::from(path),
177            directory_layer: self.clone(),
178            layer,
179        }))
180    }
181
182    fn to_absolute_path(&self, sub_path: &[String]) -> Vec<String> {
183        let mut path: Vec<String> = Vec::with_capacity(self.path.len() + sub_path.len());
184
185        path.extend_from_slice(&self.path);
186        path.extend_from_slice(sub_path);
187
188        path
189    }
190
191    pub(crate) fn contents_of_node(
192        &self,
193        subspace: &Subspace,
194        path: &[String],
195        layer: &[u8],
196    ) -> Result<DirectoryOutput, DirectoryError> {
197        let prefix: Vec<u8> = self.node_subspace.unpack(subspace.bytes())?;
198
199        if layer.eq(PARTITION_LAYER) {
200            Ok(DirectoryOutput::DirectoryPartition(
201                DirectoryPartition::new(&self.to_absolute_path(path), prefix, self.clone()),
202            ))
203        } else {
204            Ok(DirectoryOutput::DirectorySubspace(DirectorySubspace::new(
205                &self.to_absolute_path(path),
206                prefix,
207                self,
208                layer.to_owned(),
209            )))
210        }
211    }
212
213    /// `create_or_open_internal` is the function used to open and/or create a directory.
214    #[async_recursion]
215    async fn create_or_open_internal(
216        &self,
217        trx: &Transaction,
218        path: &[String],
219        prefix: Option<&'async_recursion [u8]>,
220        layer: Option<&'async_recursion [u8]>,
221        allow_create: bool,
222        allow_open: bool,
223    ) -> Result<DirectoryOutput, DirectoryError> {
224        self.check_version(trx, false).await?;
225
226        if prefix.is_some() && !self.allow_manual_prefixes {
227            if self.path.is_empty() {
228                return Err(DirectoryError::PrefixNotAllowed);
229            }
230
231            return Err(DirectoryError::CannotPrefixInPartition);
232        }
233
234        if path.is_empty() {
235            return Err(DirectoryError::NoPathProvided);
236        }
237
238        if let Some(node) = self.find(trx, path).await? {
239            if node.is_in_partition(false) {
240                let sub_path = node.get_partition_subpath();
241                match node.get_contents()? {
242                    DirectoryOutput::DirectorySubspace(_) => unreachable!("already in partition"),
243                    DirectoryOutput::DirectoryPartition(directory_partition) => {
244                        let dir_space = directory_partition
245                            .directory_subspace
246                            .directory_layer
247                            .create_or_open_internal(
248                                trx,
249                                &sub_path,
250                                prefix,
251                                layer,
252                                allow_create,
253                                allow_open,
254                            )
255                            .await?;
256                        Ok(dir_space)
257                    }
258                }
259            } else {
260                self.open_internal(layer, &node, allow_open).await
261            }
262        } else {
263            self.create_internal(trx, path, layer, prefix, allow_create)
264                .await
265        }
266    }
267
268    async fn open_internal(
269        &self,
270        layer: Option<&[u8]>,
271        node: &Node,
272        allow_open: bool,
273    ) -> Result<DirectoryOutput, DirectoryError> {
274        if !allow_open {
275            return Err(DirectoryError::DirAlreadyExists);
276        }
277
278        match layer {
279            None => {}
280            Some(layer) => {
281                if !layer.is_empty() {
282                    match compare_slice(layer, &node.layer) {
283                        Ordering::Equal => {}
284                        _ => {
285                            return Err(DirectoryError::IncompatibleLayer);
286                        }
287                    }
288                }
289            }
290        }
291
292        node.get_contents()
293    }
294
295    async fn create_internal(
296        &self,
297        trx: &Transaction,
298        path: &[String],
299        layer: Option<&[u8]>,
300        prefix: Option<&[u8]>,
301        allow_create: bool,
302    ) -> Result<DirectoryOutput, DirectoryError> {
303        let path_last = path.last().ok_or(DirectoryError::NoPathProvided)?;
304
305        if !allow_create {
306            return Err(DirectoryError::DirectoryDoesNotExists);
307        }
308
309        let layer = layer.unwrap_or_default();
310
311        self.check_version(trx, true).await?;
312        let new_prefix = self.get_prefix(trx, prefix).await?;
313
314        let is_prefix_free = self
315            .is_prefix_free(trx, new_prefix.as_slice(), prefix.is_none())
316            .await?;
317
318        if !is_prefix_free {
319            return Err(DirectoryError::DirectoryPrefixInUse);
320        }
321
322        let parent_node = self.get_parent_node(trx, path).await?;
323        let node = self.node_with_prefix(&new_prefix);
324
325        let key = parent_node.subspace(&(DEFAULT_SUB_DIRS, path_last));
326        let key_layer = node.pack(&LAYER_SUFFIX);
327
328        trx.set(key.bytes(), &new_prefix);
329        trx.set(&key_layer, layer);
330
331        self.contents_of_node(&node, path, layer)
332    }
333
334    async fn get_parent_node(
335        &self,
336        trx: &Transaction,
337        path: &[String],
338    ) -> Result<Subspace, DirectoryError> {
339        match path.split_last() {
340            None => Ok(self.root_node.clone()),
341            Some((_, remains)) => {
342                if remains.is_empty() {
343                    return Ok(self.root_node.clone());
344                }
345                let parent = self
346                    .create_or_open_internal(trx, remains, None, None, true, true)
347                    .await?;
348
349                Ok(self.node_with_prefix(&parent.bytes()?))
350            }
351        }
352    }
353
354    async fn is_prefix_free(
355        &self,
356        trx: &Transaction,
357        prefix: &[u8],
358        snapshot: bool,
359    ) -> Result<bool, DirectoryError> {
360        if prefix.is_empty() {
361            return Ok(false);
362        }
363
364        let node = self.node_containing_key(trx, prefix, snapshot).await?;
365
366        if node.is_some() {
367            return Ok(false);
368        }
369
370        let range_option = RangeOption::from((
371            self.node_subspace.pack(&prefix),
372            self.node_subspace.pack(&strinc(prefix.to_vec())),
373        ));
374
375        let result = trx.get_range(&range_option, 1, snapshot).await?;
376
377        Ok(result.is_empty())
378    }
379
380    async fn node_containing_key(
381        &self,
382        trx: &Transaction,
383        key: &[u8],
384        snapshot: bool,
385    ) -> Result<Option<Subspace>, DirectoryError> {
386        if key.starts_with(self.node_subspace.bytes()) {
387            return Ok(Some(self.root_node.clone()));
388        }
389
390        let mut key_after = key.to_vec();
391        // pushing 0x00 to simulate keyAfter
392        key_after.push(0);
393
394        let range_end = self.node_subspace.pack(&key_after);
395
396        let mut range_option = RangeOption::from((self.node_subspace.range().0, range_end));
397        range_option.reverse = true;
398        range_option.limit = Some(1);
399
400        // checking range
401        let fdb_values = trx.get_range(&range_option, 1, snapshot).await?;
402
403        match fdb_values.first() {
404            None => {}
405            Some(fdb_key_value) => {
406                let previous_prefix: Vec<Element> =
407                    self.node_subspace.unpack(fdb_key_value.key())?;
408
409                if let Some(Element::Bytes(previous_prefix)) = previous_prefix.first() {
410                    if key.starts_with(previous_prefix) {
411                        return Ok(Some(self.node_with_prefix(previous_prefix)));
412                    };
413                };
414            }
415        }
416        Ok(None)
417    }
418
419    async fn get_prefix(
420        &self,
421        trx: &Transaction,
422        prefix: Option<&[u8]>,
423    ) -> Result<Vec<u8>, DirectoryError> {
424        match prefix {
425            None => {
426                // no prefix provided, allocating one
427                let allocator = self.allocator.allocate(trx).await?;
428                let subspace = self.content_subspace.subspace(&allocator);
429
430                // checking range
431                let result = trx
432                    .get_range(&RangeOption::from(subspace.range()), 1, false)
433                    .await?;
434
435                if !result.is_empty() {
436                    return Err(DirectoryError::PrefixNotEmpty);
437                }
438
439                Ok(subspace.into_bytes())
440            }
441            Some(v) => Ok(v.to_vec()),
442        }
443    }
444
445    /// `check_version` is checking the Directory's version in FDB.
446    async fn check_version(
447        &self,
448        trx: &Transaction,
449        allow_creation: bool,
450    ) -> Result<(), DirectoryError> {
451        let version = self.get_version_value(trx).await?;
452        match version {
453            None => {
454                if allow_creation {
455                    self.initialize_directory(trx).await
456                } else {
457                    Ok(())
458                }
459            }
460            Some(versions) => {
461                if versions.len() < 12 {
462                    return Err(DirectoryError::Version(
463                        "incorrect version length".to_string(),
464                    ));
465                }
466                let mut arr = [0u8; 4];
467                arr.copy_from_slice(&versions[0..4]);
468                let major: u32 = u32::from_le_bytes(arr);
469
470                arr.copy_from_slice(&versions[4..8]);
471                let minor: u32 = u32::from_le_bytes(arr);
472
473                arr.copy_from_slice(&versions[8..12]);
474                let patch: u32 = u32::from_le_bytes(arr);
475
476                if major > MAJOR_VERSION {
477                    let msg = format!("cannot load directory with version {}.{}.{} using directory layer {}.{}.{}", major, minor, patch, MAJOR_VERSION, MINOR_VERSION, PATCH_VERSION);
478                    return Err(DirectoryError::Version(msg));
479                }
480
481                if minor > MINOR_VERSION {
482                    let msg = format!("directory with version {}.{}.{} is read-only when opened using directory layer {}.{}.{}", major, minor, patch, MAJOR_VERSION, MINOR_VERSION, PATCH_VERSION);
483                    return Err(DirectoryError::Version(msg));
484                }
485
486                Ok(())
487            }
488        }
489    }
490
491    /// `initialize_directory` is initializing the directory
492    async fn initialize_directory(&self, trx: &Transaction) -> Result<(), DirectoryError> {
493        let mut value = vec![];
494        value.extend(&MAJOR_VERSION.to_le_bytes());
495        value.extend(&MINOR_VERSION.to_le_bytes());
496        value.extend(&PATCH_VERSION.to_le_bytes());
497        let version_subspace: &[u8] = b"version";
498        let directory_version_key = self.root_node.subspace(&version_subspace);
499        trx.set(directory_version_key.bytes(), &value);
500
501        Ok(())
502    }
503
504    async fn get_version_value(&self, trx: &Transaction) -> FdbResult<Option<FdbSlice>> {
505        let version_subspace: &[u8] = b"version";
506        let version_key = self.root_node.subspace(&version_subspace);
507
508        trx.get(version_key.bytes(), false).await
509    }
510
511    async fn exists_internal(
512        &self,
513        trx: &Transaction,
514        path: &[String],
515    ) -> Result<bool, DirectoryError> {
516        self.check_version(trx, false).await?;
517
518        match self.find(trx, path).await? {
519            None => Ok(false),
520            Some(node) if node.is_in_partition(false) => {
521                node.get_contents()?
522                    .exists(trx, &node.get_partition_subpath())
523                    .await
524            }
525            Some(_node) => Ok(true),
526        }
527    }
528
529    async fn list_internal(
530        &self,
531        trx: &Transaction,
532        path: &[String],
533    ) -> Result<Vec<String>, DirectoryError> {
534        self.check_version(trx, false).await?;
535
536        let node = self
537            .find(trx, path)
538            .await?
539            .ok_or(DirectoryError::PathDoesNotExists)?;
540        if node.is_in_partition(true) {
541            match node.get_contents()? {
542                DirectoryOutput::DirectorySubspace(_) => unreachable!("already in partition"),
543                DirectoryOutput::DirectoryPartition(directory_partition) => {
544                    return directory_partition
545                        .directory_subspace
546                        .directory_layer
547                        .list(trx, &node.get_partition_subpath())
548                        .await
549                }
550            };
551        }
552
553        node.list_sub_folders(trx).await
554    }
555
556    async fn move_to_internal(
557        &self,
558        trx: &Transaction,
559        old_path: &[String],
560        new_path: &[String],
561    ) -> Result<DirectoryOutput, DirectoryError> {
562        self.check_version(trx, true).await?;
563
564        if old_path.len() <= new_path.len()
565            && compare_slice(old_path, &new_path[..old_path.len()]) == Ordering::Equal
566        {
567            return Err(DirectoryError::CannotMoveBetweenSubdirectory);
568        }
569
570        let old_node = self.find(trx, old_path).await?;
571        let new_node = self.find(trx, new_path).await?;
572
573        let old_node = match old_node {
574            None => return Err(DirectoryError::PathDoesNotExists),
575            Some(n) => n,
576        };
577        let old_node_exists_in_partition = old_node.is_in_partition(false);
578
579        match new_node {
580            None => {
581                if old_node_exists_in_partition {
582                    return Err(DirectoryError::CannotMoveBetweenPartition);
583                }
584            }
585            Some(new_node) => {
586                let new_node_exists_in_partition = new_node.is_in_partition(false);
587                if old_node_exists_in_partition || new_node_exists_in_partition {
588                    if !old_node_exists_in_partition
589                        || !new_node_exists_in_partition
590                        || !old_node.current_path.eq(&new_node.current_path)
591                    {
592                        return Err(DirectoryError::CannotMoveBetweenPartition);
593                    }
594
595                    return new_node
596                        .get_contents()?
597                        .move_to(
598                            trx,
599                            &old_node.get_partition_subpath(),
600                            &new_node.get_partition_subpath(),
601                        )
602                        .await;
603                }
604                return Err(DirectoryError::DirAlreadyExists);
605            }
606        }
607
608        let (new_path_last, parent_path) = new_path
609            .split_last()
610            .ok_or(DirectoryError::DirAlreadyExists)?;
611
612        let parent_node = self
613            .find(trx, parent_path)
614            .await?
615            .ok_or(DirectoryError::ParentDirDoesNotExists)?;
616
617        let key = parent_node
618            .subspace
619            .subspace(&(DEFAULT_SUB_DIRS, new_path_last));
620        let value: Vec<u8> = self.node_subspace.unpack(old_node.subspace.bytes())?;
621        trx.set(key.bytes(), &value);
622
623        self.remove_from_parent(trx, old_path).await?;
624
625        self.contents_of_node(&old_node.subspace, new_path, &old_node.layer)
626    }
627
628    async fn remove_from_parent(
629        &self,
630        trx: &Transaction,
631        path: &[String],
632    ) -> Result<(), DirectoryError> {
633        let (last_element, parent_path) = path
634            .split_last()
635            .ok_or(DirectoryError::BadDestinationDirectory)?;
636
637        match self.find(trx, parent_path).await? {
638            None => {}
639            Some(parent_node) => {
640                let key = parent_node.subspace.pack(&(DEFAULT_SUB_DIRS, last_element));
641                trx.clear(&key);
642            }
643        }
644
645        Ok(())
646    }
647
648    #[async_recursion]
649    async fn remove_internal(
650        &self,
651        trx: &Transaction,
652        path: &[String],
653        fail_on_nonexistent: bool,
654    ) -> Result<bool, DirectoryError> {
655        self.check_version(trx, true).await?;
656
657        if path.is_empty() {
658            return Err(DirectoryError::CannotModifyRootDirectory);
659        }
660
661        let node = match self.find(trx, path).await? {
662            Some(node) => node,
663            None if fail_on_nonexistent => return Err(DirectoryError::DirectoryDoesNotExists),
664            None => return Ok(false),
665        };
666
667        if node.is_in_partition(false) {
668            match node.get_contents()? {
669                DirectoryOutput::DirectorySubspace(_) => {
670                    unreachable!("already directory partition")
671                }
672                DirectoryOutput::DirectoryPartition(d) => {
673                    return d
674                        .directory_subspace
675                        .directory_layer
676                        .remove_internal(trx, &node.get_partition_subpath(), fail_on_nonexistent)
677                        .await
678                }
679            }
680        }
681
682        self.remove_recursive(trx, &node.subspace).await?;
683        self.remove_from_parent(trx, path).await?;
684
685        Ok(true)
686    }
687
688    #[async_recursion]
689    async fn remove_recursive(
690        &self,
691        trx: &Transaction,
692        node_sub: &Subspace,
693    ) -> Result<(), DirectoryError> {
694        let sub_dir = node_sub.subspace(&DEFAULT_SUB_DIRS);
695        let (mut begin, end) = sub_dir.range();
696
697        loop {
698            let range_option = RangeOption::from((begin.as_slice(), end.as_slice()));
699
700            let range = trx.get_range(&range_option, 1024, false).await?;
701            let has_more = range.more();
702
703            for row_key in range {
704                let sub_node = self.node_with_prefix(&row_key.value());
705                self.remove_recursive(trx, &sub_node).await?;
706                begin = row_key.key().pack_to_vec();
707            }
708
709            if !has_more {
710                break;
711            }
712        }
713
714        let node_prefix: Vec<u8> = self.node_subspace.unpack(node_sub.bytes())?;
715
716        trx.clear_range(&node_prefix, &strinc(node_prefix.to_owned()));
717        trx.clear_subspace_range(node_sub);
718
719        Ok(())
720    }
721}
722
723#[async_trait]
724impl Directory for DirectoryLayer {
725    async fn create_or_open(
726        &self,
727        txn: &Transaction,
728        path: &[String],
729        prefix: Option<&[u8]>,
730        layer: Option<&[u8]>,
731    ) -> Result<DirectoryOutput, DirectoryError> {
732        self.create_or_open_internal(txn, path, prefix, layer, true, true)
733            .await
734    }
735
736    async fn create(
737        &self,
738        txn: &Transaction,
739        path: &[String],
740        prefix: Option<&[u8]>,
741        layer: Option<&[u8]>,
742    ) -> Result<DirectoryOutput, DirectoryError> {
743        self.create_or_open_internal(txn, path, prefix, layer, true, false)
744            .await
745    }
746
747    async fn open(
748        &self,
749        txn: &Transaction,
750        path: &[String],
751        layer: Option<&[u8]>,
752    ) -> Result<DirectoryOutput, DirectoryError> {
753        self.create_or_open_internal(txn, path, None, layer, false, true)
754            .await
755    }
756
757    async fn exists(&self, trx: &Transaction, path: &[String]) -> Result<bool, DirectoryError> {
758        self.exists_internal(trx, path).await
759    }
760
761    async fn move_directory(
762        &self,
763        _trx: &Transaction,
764        _new_path: &[String],
765    ) -> Result<DirectoryOutput, DirectoryError> {
766        Err(DirectoryError::CannotMoveRootDirectory)
767    }
768
769    /// `move_to` the directory from old_path to new_path(both relative to this
770    /// Directory), and returns the directory (at its new location) and its
771    /// contents as a Subspace. Move will return an error if a directory
772    /// does not exist at oldPath, a directory already exists at newPath, or the
773    /// parent directory of newPath does not exist.
774    async fn move_to(
775        &self,
776        trx: &Transaction,
777        old_path: &[String],
778        new_path: &[String],
779    ) -> Result<DirectoryOutput, DirectoryError> {
780        self.move_to_internal(trx, old_path, new_path).await
781    }
782
783    async fn remove(&self, trx: &Transaction, path: &[String]) -> Result<bool, DirectoryError> {
784        self.remove_internal(trx, path, true).await
785    }
786
787    async fn remove_if_exists(
788        &self,
789        trx: &Transaction,
790        path: &[String],
791    ) -> Result<bool, DirectoryError> {
792        self.remove_internal(trx, path, false).await
793    }
794
795    async fn list(
796        &self,
797        trx: &Transaction,
798        path: &[String],
799    ) -> Result<Vec<String>, DirectoryError> {
800        self.list_internal(trx, path).await
801    }
802}