foundationdb/directory/
directory_layer.rs

1// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors
2// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors.
3//
4// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
5// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
6// http://opensource.org/licenses/MIT>, at your option. This file may not be
7// copied, modified, or distributed except according to those terms.
8
9//! The default Directory implementation.
10
11use crate::directory::directory_partition::DirectoryPartition;
12use crate::directory::directory_subspace::DirectorySubspace;
13use crate::directory::error::DirectoryError;
14use crate::directory::node::Node;
15use crate::directory::{compare_slice, strinc, Directory, DirectoryOutput};
16use crate::future::FdbSlice;
17use crate::tuple::hca::HighContentionAllocator;
18use crate::tuple::{Element, Subspace, TuplePack};
19use crate::RangeOption;
20use crate::{FdbResult, Transaction};
21use async_recursion::async_recursion;
22use async_trait::async_trait;
23use futures::TryStreamExt;
24use std::cmp::Ordering;
25use std::ops::Deref;
26use std::sync::Arc;
27
/// Major component of the directory layer version written by this implementation.
const MAJOR_VERSION: u32 = 1;
/// Minor component of the directory layer version written by this implementation.
const MINOR_VERSION: u32 = 0;
/// Patch component of the directory layer version written by this implementation.
const PATCH_VERSION: u32 = 0;

/// Tuple element placed in front of a child name when building the key of a
/// sub-directory entry under a node.
pub(crate) const DEFAULT_SUB_DIRS: i64 = 0;
/// Raw prefix of the node subspace used by the default directory layer.
pub(crate) const DEFAULT_NODE_PREFIX: &[u8] = b"\xFE";
/// Name, under the root node, of the subspace handed to the prefix allocator.
const DEFAULT_HCA_PREFIX: &[u8] = b"hca";

/// Layer value that marks a directory as a partition.
pub(crate) const PARTITION_LAYER: &[u8] = b"partition";
/// Name, under a node, of the key that stores the node's layer.
pub(crate) const LAYER_SUFFIX: &[u8] = b"layer";
36
/// A DirectoryLayer defines a new root directory.
///
/// The node subspace and content subspace control where the directory metadata and contents,
/// respectively, are stored. The default root directory has a node subspace with raw prefix
/// `\xFE` and a content subspace with no prefix.
///
/// The handle only wraps an `Arc`, so cloning it shares the same underlying state.
#[derive(Clone)]
pub struct DirectoryLayer {
    /// Shared state; also reachable through `Deref`.
    pub(crate) inner: Arc<DirectoryLayerInner>,
}
45
46impl std::fmt::Debug for DirectoryLayer {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        self.inner.fmt(f)
49    }
50}
51
/// State shared by every clone of a [`DirectoryLayer`].
#[derive(Debug)]
pub struct DirectoryLayerInner {
    /// Node of the root directory: `node_subspace` nested under its own raw prefix.
    pub(crate) root_node: Subspace,
    /// Subspace under which directory nodes (metadata) are stored.
    pub(crate) node_subspace: Subspace,
    /// Subspace under which automatically allocated directory prefixes are created.
    pub(crate) content_subspace: Subspace,
    /// Allocator used to pick a prefix when the caller does not provide one.
    pub(crate) allocator: HighContentionAllocator,
    /// Whether callers may supply their own prefix when creating a directory.
    pub(crate) allow_manual_prefixes: bool,

    /// Path of this layer; it is prepended to sub-paths when building absolute paths.
    pub(crate) path: Vec<String>,
}
62
63impl Deref for DirectoryLayer {
64    type Target = DirectoryLayerInner;
65
66    fn deref(&self) -> &Self::Target {
67        &self.inner
68    }
69}
70
71impl Default for DirectoryLayer {
72    /// The default root directory stores directory layer metadata in keys beginning with 0xFE,
73    ///and allocates newly created directories in (unused) prefixes starting with 0x00 through 0xFD.
74    ///This is appropriate for otherwise empty databases, but may conflict with other formal or informal partitionings of keyspace.
75    /// If you already have other content in your database, you may wish to use NewDirectoryLayer to
76    /// construct a non-standard root directory to control where metadata and keys are stored.
77    fn default() -> Self {
78        Self::new(
79            Subspace::from_bytes(DEFAULT_NODE_PREFIX),
80            Subspace::all(),
81            false,
82        )
83    }
84}
85
86impl DirectoryLayer {
87    pub fn new(
88        node_subspace: Subspace,
89        content_subspace: Subspace,
90        allow_manual_prefixes: bool,
91    ) -> Self {
92        let root_node = node_subspace.subspace(&node_subspace.bytes());
93        let allocator = HighContentionAllocator::new(root_node.subspace(&DEFAULT_HCA_PREFIX));
94
95        DirectoryLayer {
96            inner: Arc::new(DirectoryLayerInner {
97                root_node,
98                node_subspace,
99                content_subspace,
100                allocator,
101                allow_manual_prefixes,
102                path: vec![],
103            }),
104        }
105    }
106
107    pub(crate) fn new_with_path(
108        node_subspace: Subspace,
109        content_subspace: Subspace,
110        allow_manual_prefixes: bool,
111        path: &[String],
112    ) -> Self {
113        let root_node = node_subspace.subspace(&node_subspace.bytes());
114        let allocator = HighContentionAllocator::new(root_node.subspace(&DEFAULT_HCA_PREFIX));
115
116        DirectoryLayer {
117            inner: Arc::new(DirectoryLayerInner {
118                root_node,
119                node_subspace,
120                content_subspace,
121                allocator,
122                allow_manual_prefixes,
123                path: Vec::from(path),
124            }),
125        }
126    }
127
128    pub fn get_path(&self) -> &[String] {
129        self.path.as_slice()
130    }
131
132    fn node_with_optional_prefix(&self, prefix: Option<FdbSlice>) -> Option<Subspace> {
133        prefix.map(|fdb_slice| self.node_with_prefix(&fdb_slice.deref()))
134    }
135
136    fn node_with_prefix<T: TuplePack>(&self, prefix: &T) -> Subspace {
137        self.inner.node_subspace.subspace(prefix)
138    }
139
140    async fn find(
141        &self,
142        trx: &Transaction,
143        path: &[String],
144    ) -> Result<Option<Node>, DirectoryError> {
145        let mut current_path = vec![];
146        let mut node_subspace = self.root_node.clone();
147        let mut layer = vec![];
148        let mut loaded = false;
149
150        // walking through the provided path
151        for path_name in path.iter() {
152            current_path.push(path_name.clone());
153            let key = node_subspace.subspace(&(DEFAULT_SUB_DIRS, path_name.to_owned()));
154
155            // finding the next node
156            let fdb_slice_value = trx.get(key.bytes(), false).await?;
157
158            loaded = true;
159            node_subspace = match self.node_with_optional_prefix(fdb_slice_value) {
160                None => return Ok(None),
161                Some(subspace) => subspace,
162            };
163
164            layer = Node::load_metadata(trx, &node_subspace).await?;
165            if layer.as_slice().eq(PARTITION_LAYER) {
166                break;
167            }
168        }
169
170        if !loaded {
171            layer = Node::load_metadata(trx, &node_subspace).await?;
172        }
173
174        Ok(Some(Node {
175            subspace: node_subspace,
176            current_path,
177            target_path: Vec::from(path),
178            directory_layer: self.clone(),
179            layer,
180        }))
181    }
182
183    fn to_absolute_path(&self, sub_path: &[String]) -> Vec<String> {
184        let mut path: Vec<String> = Vec::with_capacity(self.path.len() + sub_path.len());
185
186        path.extend_from_slice(&self.path);
187        path.extend_from_slice(sub_path);
188
189        path
190    }
191
192    pub(crate) fn contents_of_node(
193        &self,
194        subspace: &Subspace,
195        path: &[String],
196        layer: &[u8],
197    ) -> Result<DirectoryOutput, DirectoryError> {
198        let prefix: Vec<u8> = self.node_subspace.unpack(subspace.bytes())?;
199
200        if layer.eq(PARTITION_LAYER) {
201            Ok(DirectoryOutput::DirectoryPartition(
202                DirectoryPartition::new(&self.to_absolute_path(path), prefix, self.clone()),
203            ))
204        } else {
205            Ok(DirectoryOutput::DirectorySubspace(DirectorySubspace::new(
206                &self.to_absolute_path(path),
207                prefix,
208                self,
209                layer.to_owned(),
210            )))
211        }
212    }
213
214    /// `create_or_open_internal` is the function used to open and/or create a directory.
215    #[async_recursion]
216    async fn create_or_open_internal(
217        &self,
218        trx: &Transaction,
219        path: &[String],
220        prefix: Option<&'async_recursion [u8]>,
221        layer: Option<&'async_recursion [u8]>,
222        allow_create: bool,
223        allow_open: bool,
224    ) -> Result<DirectoryOutput, DirectoryError> {
225        self.check_version(trx, false).await?;
226
227        if prefix.is_some() && !self.allow_manual_prefixes {
228            if self.path.is_empty() {
229                return Err(DirectoryError::PrefixNotAllowed);
230            }
231
232            return Err(DirectoryError::CannotPrefixInPartition);
233        }
234
235        if path.is_empty() {
236            return Err(DirectoryError::NoPathProvided);
237        }
238
239        if let Some(node) = self.find(trx, path).await? {
240            if node.is_in_partition(false) {
241                let sub_path = node.get_partition_subpath();
242                match node.get_contents()? {
243                    DirectoryOutput::DirectorySubspace(_) => unreachable!("already in partition"),
244                    DirectoryOutput::DirectoryPartition(directory_partition) => {
245                        let dir_space = directory_partition
246                            .directory_subspace
247                            .directory_layer
248                            .create_or_open_internal(
249                                trx,
250                                &sub_path,
251                                prefix,
252                                layer,
253                                allow_create,
254                                allow_open,
255                            )
256                            .await?;
257                        Ok(dir_space)
258                    }
259                }
260            } else {
261                self.open_internal(layer, &node, allow_open).await
262            }
263        } else {
264            self.create_internal(trx, path, layer, prefix, allow_create)
265                .await
266        }
267    }
268
269    async fn open_internal(
270        &self,
271        layer: Option<&[u8]>,
272        node: &Node,
273        allow_open: bool,
274    ) -> Result<DirectoryOutput, DirectoryError> {
275        if !allow_open {
276            return Err(DirectoryError::DirAlreadyExists);
277        }
278
279        match layer {
280            None => {}
281            Some(layer) => {
282                if !layer.is_empty() {
283                    match compare_slice(layer, &node.layer) {
284                        Ordering::Equal => {}
285                        _ => {
286                            return Err(DirectoryError::IncompatibleLayer);
287                        }
288                    }
289                }
290            }
291        }
292
293        node.get_contents()
294    }
295
296    async fn create_internal(
297        &self,
298        trx: &Transaction,
299        path: &[String],
300        layer: Option<&[u8]>,
301        prefix: Option<&[u8]>,
302        allow_create: bool,
303    ) -> Result<DirectoryOutput, DirectoryError> {
304        let path_last = path.last().ok_or(DirectoryError::NoPathProvided)?;
305
306        if !allow_create {
307            return Err(DirectoryError::DirectoryDoesNotExists);
308        }
309
310        let layer = layer.unwrap_or_default();
311
312        self.check_version(trx, true).await?;
313        let new_prefix = self.get_prefix(trx, prefix).await?;
314
315        let is_prefix_free = self
316            .is_prefix_free(trx, new_prefix.as_slice(), prefix.is_none())
317            .await?;
318
319        if !is_prefix_free {
320            return Err(DirectoryError::DirectoryPrefixInUse);
321        }
322
323        let parent_node = self.get_parent_node(trx, path).await?;
324        let node = self.node_with_prefix(&new_prefix);
325
326        let key = parent_node.subspace(&(DEFAULT_SUB_DIRS, path_last));
327        let key_layer = node.pack(&LAYER_SUFFIX);
328
329        trx.set(key.bytes(), &new_prefix);
330        trx.set(&key_layer, layer);
331
332        self.contents_of_node(&node, path, layer)
333    }
334
335    async fn get_parent_node(
336        &self,
337        trx: &Transaction,
338        path: &[String],
339    ) -> Result<Subspace, DirectoryError> {
340        match path.split_last() {
341            None => Ok(self.root_node.clone()),
342            Some((_, remains)) => {
343                if remains.is_empty() {
344                    return Ok(self.root_node.clone());
345                }
346                let parent = self
347                    .create_or_open_internal(trx, remains, None, None, true, true)
348                    .await?;
349
350                Ok(self.node_with_prefix(&parent.bytes()?))
351            }
352        }
353    }
354
355    async fn is_prefix_free(
356        &self,
357        trx: &Transaction,
358        prefix: &[u8],
359        snapshot: bool,
360    ) -> Result<bool, DirectoryError> {
361        if prefix.is_empty() {
362            return Ok(false);
363        }
364
365        let node = self.node_containing_key(trx, prefix, snapshot).await?;
366
367        if node.is_some() {
368            return Ok(false);
369        }
370
371        let mut range_option = RangeOption::from((
372            self.node_subspace.pack(&prefix),
373            self.node_subspace.pack(&strinc(prefix.to_vec())),
374        ));
375        range_option.limit = Some(1);
376
377        let result = trx
378            .get_ranges_keyvalues(range_option, snapshot)
379            .try_next()
380            .await?;
381
382        Ok(result.is_none())
383    }
384
385    async fn node_containing_key(
386        &self,
387        trx: &Transaction,
388        key: &[u8],
389        snapshot: bool,
390    ) -> Result<Option<Subspace>, DirectoryError> {
391        if key.starts_with(self.node_subspace.bytes()) {
392            return Ok(Some(self.root_node.clone()));
393        }
394
395        let mut key_after = key.to_vec();
396        // pushing 0x00 to simulate keyAfter
397        key_after.push(0);
398
399        let range_end = self.node_subspace.pack(&key_after);
400
401        let mut range_option = RangeOption::from((self.node_subspace.range().0, range_end));
402        range_option.reverse = true;
403        range_option.limit = Some(1);
404
405        // checking range
406        let fdb_value = trx
407            .get_ranges_keyvalues(range_option, snapshot)
408            .try_next()
409            .await?;
410
411        if let Some(fdb_key_value) = fdb_value {
412            let previous_prefix: Vec<Element> = self.node_subspace.unpack(fdb_key_value.key())?;
413
414            if let Some(Element::Bytes(previous_prefix)) = previous_prefix.first() {
415                if key.starts_with(previous_prefix) {
416                    return Ok(Some(self.node_with_prefix(previous_prefix)));
417                };
418            };
419        }
420        Ok(None)
421    }
422
423    async fn get_prefix(
424        &self,
425        trx: &Transaction,
426        prefix: Option<&[u8]>,
427    ) -> Result<Vec<u8>, DirectoryError> {
428        match prefix {
429            None => {
430                // no prefix provided, allocating one
431                let allocator = self.allocator.allocate(trx).await?;
432                let subspace = self.content_subspace.subspace(&allocator);
433
434                // checking range
435                let mut range_option = RangeOption::from(subspace.range());
436                range_option.limit = Some(1);
437
438                let result = trx
439                    .get_ranges_keyvalues(range_option, false)
440                    .try_next()
441                    .await?;
442
443                if result.is_some() {
444                    return Err(DirectoryError::PrefixNotEmpty);
445                }
446
447                Ok(subspace.into_bytes())
448            }
449            Some(v) => Ok(v.to_vec()),
450        }
451    }
452
453    /// `check_version` is checking the Directory's version in FDB.
454    async fn check_version(
455        &self,
456        trx: &Transaction,
457        allow_creation: bool,
458    ) -> Result<(), DirectoryError> {
459        let version = self.get_version_value(trx).await?;
460        match version {
461            None => {
462                if allow_creation {
463                    self.initialize_directory(trx).await
464                } else {
465                    Ok(())
466                }
467            }
468            Some(versions) => {
469                if versions.len() < 12 {
470                    return Err(DirectoryError::Version(
471                        "incorrect version length".to_string(),
472                    ));
473                }
474                let mut arr = [0u8; 4];
475                arr.copy_from_slice(&versions[0..4]);
476                let major: u32 = u32::from_le_bytes(arr);
477
478                arr.copy_from_slice(&versions[4..8]);
479                let minor: u32 = u32::from_le_bytes(arr);
480
481                arr.copy_from_slice(&versions[8..12]);
482                let patch: u32 = u32::from_le_bytes(arr);
483
484                if major > MAJOR_VERSION {
485                    let msg = format!("cannot load directory with version {major}.{minor}.{patch} using directory layer {MAJOR_VERSION}.{MINOR_VERSION}.{PATCH_VERSION}");
486                    return Err(DirectoryError::Version(msg));
487                }
488
489                if minor > MINOR_VERSION {
490                    let msg = format!("directory with version {major}.{minor}.{patch} is read-only when opened using directory layer {MAJOR_VERSION}.{MINOR_VERSION}.{PATCH_VERSION}");
491                    return Err(DirectoryError::Version(msg));
492                }
493
494                Ok(())
495            }
496        }
497    }
498
499    /// `initialize_directory` is initializing the directory
500    async fn initialize_directory(&self, trx: &Transaction) -> Result<(), DirectoryError> {
501        let mut value = vec![];
502        value.extend(&MAJOR_VERSION.to_le_bytes());
503        value.extend(&MINOR_VERSION.to_le_bytes());
504        value.extend(&PATCH_VERSION.to_le_bytes());
505        let version_subspace: &[u8] = b"version";
506        let directory_version_key = self.root_node.subspace(&version_subspace);
507        trx.set(directory_version_key.bytes(), &value);
508
509        Ok(())
510    }
511
512    async fn get_version_value(&self, trx: &Transaction) -> FdbResult<Option<FdbSlice>> {
513        let version_subspace: &[u8] = b"version";
514        let version_key = self.root_node.subspace(&version_subspace);
515
516        trx.get(version_key.bytes(), false).await
517    }
518
519    async fn exists_internal(
520        &self,
521        trx: &Transaction,
522        path: &[String],
523    ) -> Result<bool, DirectoryError> {
524        self.check_version(trx, false).await?;
525
526        match self.find(trx, path).await? {
527            None => Ok(false),
528            Some(node) if node.is_in_partition(false) => {
529                node.get_contents()?
530                    .exists(trx, &node.get_partition_subpath())
531                    .await
532            }
533            Some(_node) => Ok(true),
534        }
535    }
536
537    async fn list_internal(
538        &self,
539        trx: &Transaction,
540        path: &[String],
541    ) -> Result<Vec<String>, DirectoryError> {
542        self.check_version(trx, false).await?;
543
544        let node = self
545            .find(trx, path)
546            .await?
547            .ok_or(DirectoryError::PathDoesNotExists)?;
548        if node.is_in_partition(true) {
549            match node.get_contents()? {
550                DirectoryOutput::DirectorySubspace(_) => unreachable!("already in partition"),
551                DirectoryOutput::DirectoryPartition(directory_partition) => {
552                    return directory_partition
553                        .directory_subspace
554                        .directory_layer
555                        .list(trx, &node.get_partition_subpath())
556                        .await
557                }
558            };
559        }
560
561        node.list_sub_folders(trx).await
562    }
563
564    async fn move_to_internal(
565        &self,
566        trx: &Transaction,
567        old_path: &[String],
568        new_path: &[String],
569    ) -> Result<DirectoryOutput, DirectoryError> {
570        self.check_version(trx, true).await?;
571
572        if old_path.len() <= new_path.len()
573            && compare_slice(old_path, &new_path[..old_path.len()]) == Ordering::Equal
574        {
575            return Err(DirectoryError::CannotMoveBetweenSubdirectory);
576        }
577
578        let old_node = self.find(trx, old_path).await?;
579        let new_node = self.find(trx, new_path).await?;
580
581        let old_node = match old_node {
582            None => return Err(DirectoryError::PathDoesNotExists),
583            Some(n) => n,
584        };
585        let old_node_exists_in_partition = old_node.is_in_partition(false);
586
587        match new_node {
588            None => {
589                if old_node_exists_in_partition {
590                    return Err(DirectoryError::CannotMoveBetweenPartition);
591                }
592            }
593            Some(new_node) => {
594                let new_node_exists_in_partition = new_node.is_in_partition(false);
595                if old_node_exists_in_partition || new_node_exists_in_partition {
596                    if !old_node_exists_in_partition
597                        || !new_node_exists_in_partition
598                        || !old_node.current_path.eq(&new_node.current_path)
599                    {
600                        return Err(DirectoryError::CannotMoveBetweenPartition);
601                    }
602
603                    return new_node
604                        .get_contents()?
605                        .move_to(
606                            trx,
607                            &old_node.get_partition_subpath(),
608                            &new_node.get_partition_subpath(),
609                        )
610                        .await;
611                }
612                return Err(DirectoryError::DirAlreadyExists);
613            }
614        }
615
616        let (new_path_last, parent_path) = new_path
617            .split_last()
618            .ok_or(DirectoryError::DirAlreadyExists)?;
619
620        let parent_node = self
621            .find(trx, parent_path)
622            .await?
623            .ok_or(DirectoryError::ParentDirDoesNotExists)?;
624
625        let key = parent_node
626            .subspace
627            .subspace(&(DEFAULT_SUB_DIRS, new_path_last));
628        let value: Vec<u8> = self.node_subspace.unpack(old_node.subspace.bytes())?;
629        trx.set(key.bytes(), &value);
630
631        self.remove_from_parent(trx, old_path).await?;
632
633        self.contents_of_node(&old_node.subspace, new_path, &old_node.layer)
634    }
635
636    async fn remove_from_parent(
637        &self,
638        trx: &Transaction,
639        path: &[String],
640    ) -> Result<(), DirectoryError> {
641        let (last_element, parent_path) = path
642            .split_last()
643            .ok_or(DirectoryError::BadDestinationDirectory)?;
644
645        match self.find(trx, parent_path).await? {
646            None => {}
647            Some(parent_node) => {
648                let key = parent_node.subspace.pack(&(DEFAULT_SUB_DIRS, last_element));
649                trx.clear(&key);
650            }
651        }
652
653        Ok(())
654    }
655
656    #[async_recursion]
657    async fn remove_internal(
658        &self,
659        trx: &Transaction,
660        path: &[String],
661        fail_on_nonexistent: bool,
662    ) -> Result<bool, DirectoryError> {
663        self.check_version(trx, true).await?;
664
665        if path.is_empty() {
666            return Err(DirectoryError::CannotModifyRootDirectory);
667        }
668
669        let node = match self.find(trx, path).await? {
670            Some(node) => node,
671            None if fail_on_nonexistent => return Err(DirectoryError::DirectoryDoesNotExists),
672            None => return Ok(false),
673        };
674
675        if node.is_in_partition(false) {
676            match node.get_contents()? {
677                DirectoryOutput::DirectorySubspace(_) => {
678                    unreachable!("already directory partition")
679                }
680                DirectoryOutput::DirectoryPartition(d) => {
681                    return d
682                        .directory_subspace
683                        .directory_layer
684                        .remove_internal(trx, &node.get_partition_subpath(), fail_on_nonexistent)
685                        .await
686                }
687            }
688        }
689
690        self.remove_recursive(trx, &node.subspace).await?;
691        self.remove_from_parent(trx, path).await?;
692
693        Ok(true)
694    }
695
696    #[async_recursion]
697    async fn remove_recursive(
698        &self,
699        trx: &Transaction,
700        node_sub: &Subspace,
701    ) -> Result<(), DirectoryError> {
702        let sub_dir = node_sub.subspace(&DEFAULT_SUB_DIRS);
703        let range_option = RangeOption::from(&sub_dir);
704
705        let mut stream = std::pin::pin!(trx.get_ranges_keyvalues(range_option, false));
706        while let Some(row_key) = stream.try_next().await? {
707            let sub_node = self.node_with_prefix(&row_key.value());
708            self.remove_recursive(trx, &sub_node).await?;
709        }
710
711        let node_prefix: Vec<u8> = self.node_subspace.unpack(node_sub.bytes())?;
712
713        trx.clear_range(&node_prefix, &strinc(node_prefix.to_owned()));
714        trx.clear_subspace_range(node_sub);
715
716        Ok(())
717    }
718}
719
720#[async_trait]
721impl Directory for DirectoryLayer {
722    async fn create_or_open(
723        &self,
724        txn: &Transaction,
725        path: &[String],
726        prefix: Option<&[u8]>,
727        layer: Option<&[u8]>,
728    ) -> Result<DirectoryOutput, DirectoryError> {
729        self.create_or_open_internal(txn, path, prefix, layer, true, true)
730            .await
731    }
732
733    async fn create(
734        &self,
735        txn: &Transaction,
736        path: &[String],
737        prefix: Option<&[u8]>,
738        layer: Option<&[u8]>,
739    ) -> Result<DirectoryOutput, DirectoryError> {
740        self.create_or_open_internal(txn, path, prefix, layer, true, false)
741            .await
742    }
743
744    async fn open(
745        &self,
746        txn: &Transaction,
747        path: &[String],
748        layer: Option<&[u8]>,
749    ) -> Result<DirectoryOutput, DirectoryError> {
750        self.create_or_open_internal(txn, path, None, layer, false, true)
751            .await
752    }
753
754    async fn exists(&self, trx: &Transaction, path: &[String]) -> Result<bool, DirectoryError> {
755        self.exists_internal(trx, path).await
756    }
757
758    async fn move_directory(
759        &self,
760        _trx: &Transaction,
761        _new_path: &[String],
762    ) -> Result<DirectoryOutput, DirectoryError> {
763        Err(DirectoryError::CannotMoveRootDirectory)
764    }
765
766    /// `move_to` the directory from old_path to new_path(both relative to this
767    /// Directory), and returns the directory (at its new location) and its
768    /// contents as a Subspace. Move will return an error if a directory
769    /// does not exist at oldPath, a directory already exists at newPath, or the
770    /// parent directory of newPath does not exist.
771    async fn move_to(
772        &self,
773        trx: &Transaction,
774        old_path: &[String],
775        new_path: &[String],
776    ) -> Result<DirectoryOutput, DirectoryError> {
777        self.move_to_internal(trx, old_path, new_path).await
778    }
779
780    async fn remove(&self, trx: &Transaction, path: &[String]) -> Result<bool, DirectoryError> {
781        self.remove_internal(trx, path, true).await
782    }
783
784    async fn remove_if_exists(
785        &self,
786        trx: &Transaction,
787        path: &[String],
788    ) -> Result<bool, DirectoryError> {
789        self.remove_internal(trx, path, false).await
790    }
791
792    async fn list(
793        &self,
794        trx: &Transaction,
795        path: &[String],
796    ) -> Result<Vec<String>, DirectoryError> {
797        self.list_internal(trx, path).await
798    }
799}