Skip to main content

foundationdb/directory/
directory_layer.rs

1// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors
2// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors.
3//
4// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
5// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
6// http://opensource.org/licenses/MIT>, at your option. This file may not be
7// copied, modified, or distributed except according to those terms.
8
9//! The default Directory implementation.
10
11use crate::RangeOption;
12use crate::directory::directory_partition::DirectoryPartition;
13use crate::directory::directory_subspace::DirectorySubspace;
14use crate::directory::error::DirectoryError;
15use crate::directory::node::Node;
16use crate::directory::{Directory, DirectoryOutput, compare_slice, strinc};
17use crate::future::FdbSlice;
18use crate::tuple::hca::HighContentionAllocator;
19use crate::tuple::{Element, Subspace, TuplePack};
20use crate::{FdbResult, Transaction};
21use async_recursion::async_recursion;
22use async_trait::async_trait;
23use futures::TryStreamExt;
24use std::cmp::Ordering;
25use std::ops::Deref;
26use std::sync::Arc;
27
/// First tuple element of the keys under which a node stores its sub-directory entries,
/// i.e. `(DEFAULT_SUB_DIRS, name)`.
pub(crate) const DEFAULT_SUB_DIRS: i64 = 0;
/// Major component of the directory layer version written by this implementation.
const MAJOR_VERSION: u32 = 1;
/// Minor component of the directory layer version written by this implementation.
const MINOR_VERSION: u32 = 0;
/// Patch component of the directory layer version written by this implementation.
const PATCH_VERSION: u32 = 0;
/// Raw prefix of the node subspace used by `DirectoryLayer::default()`.
pub(crate) const DEFAULT_NODE_PREFIX: &[u8] = b"\xFE";
/// Element appended to the root node to build the high-contention allocator's subspace.
const DEFAULT_HCA_PREFIX: &[u8] = b"hca";
/// Layer value that marks a node as a directory partition.
pub(crate) const PARTITION_LAYER: &[u8] = b"partition";
/// Element packed under a node to form the key that stores the node's layer bytes.
pub(crate) const LAYER_SUFFIX: &[u8] = b"layer";
36
/// A DirectoryLayer defines a new root directory.
///
/// The node subspace and content subspace control where the directory metadata and contents,
/// respectively, are stored. The default root directory has a node subspace with raw prefix
/// `\xFE` and a content subspace with no prefix.
///
/// All state lives behind an [`Arc`], so cloning a `DirectoryLayer` only bumps a reference
/// count. The fields of [`DirectoryLayerInner`] are reachable directly through `Deref`.
#[derive(Clone)]
pub struct DirectoryLayer {
    /// Shared, immutable state of the layer.
    pub(crate) inner: Arc<DirectoryLayerInner>,
}
45
46impl std::fmt::Debug for DirectoryLayer {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        self.inner.fmt(f)
49    }
50}
51
/// State shared by every clone of a [`DirectoryLayer`].
#[derive(Debug)]
pub struct DirectoryLayerInner {
    /// Node of the root directory: the node subspace extended with its own raw prefix.
    pub(crate) root_node: Subspace,
    /// Subspace holding the directory metadata (one node per directory).
    pub(crate) node_subspace: Subspace,
    /// Subspace under which automatically allocated directory prefixes are created.
    pub(crate) content_subspace: Subspace,
    /// Allocator used to pick a prefix when the caller does not supply one.
    pub(crate) allocator: HighContentionAllocator,
    /// Whether callers are allowed to pass an explicit prefix when creating a directory.
    pub(crate) allow_manual_prefixes: bool,

    /// Path of this layer itself; it is prepended to sub-paths to build absolute paths.
    pub(crate) path: Vec<String>,
}
62
63impl Deref for DirectoryLayer {
64    type Target = DirectoryLayerInner;
65
66    fn deref(&self) -> &Self::Target {
67        &self.inner
68    }
69}
70
71impl Default for DirectoryLayer {
72    /// The default root directory stores directory layer metadata in keys beginning with 0xFE,
73    ///and allocates newly created directories in (unused) prefixes starting with 0x00 through 0xFD.
74    ///This is appropriate for otherwise empty databases, but may conflict with other formal or informal partitionings of keyspace.
75    /// If you already have other content in your database, you may wish to use NewDirectoryLayer to
76    /// construct a non-standard root directory to control where metadata and keys are stored.
77    fn default() -> Self {
78        Self::new(
79            Subspace::from_bytes(DEFAULT_NODE_PREFIX),
80            Subspace::all(),
81            false,
82        )
83    }
84}
85
86impl DirectoryLayer {
87    pub fn new(
88        node_subspace: Subspace,
89        content_subspace: Subspace,
90        allow_manual_prefixes: bool,
91    ) -> Self {
92        let root_node = node_subspace.subspace(&node_subspace.bytes());
93        let allocator = HighContentionAllocator::new(root_node.subspace(&DEFAULT_HCA_PREFIX));
94
95        DirectoryLayer {
96            inner: Arc::new(DirectoryLayerInner {
97                root_node,
98                node_subspace,
99                content_subspace,
100                allocator,
101                allow_manual_prefixes,
102                path: vec![],
103            }),
104        }
105    }
106
107    pub(crate) fn new_with_path(
108        node_subspace: Subspace,
109        content_subspace: Subspace,
110        allow_manual_prefixes: bool,
111        path: &[String],
112    ) -> Self {
113        let root_node = node_subspace.subspace(&node_subspace.bytes());
114        let allocator = HighContentionAllocator::new(root_node.subspace(&DEFAULT_HCA_PREFIX));
115
116        DirectoryLayer {
117            inner: Arc::new(DirectoryLayerInner {
118                root_node,
119                node_subspace,
120                content_subspace,
121                allocator,
122                allow_manual_prefixes,
123                path: Vec::from(path),
124            }),
125        }
126    }
127
128    pub fn get_path(&self) -> &[String] {
129        self.path.as_slice()
130    }
131
132    fn node_with_optional_prefix(&self, prefix: Option<FdbSlice>) -> Option<Subspace> {
133        prefix.map(|fdb_slice| self.node_with_prefix(&fdb_slice.deref()))
134    }
135
136    fn node_with_prefix<T: TuplePack>(&self, prefix: &T) -> Subspace {
137        self.inner.node_subspace.subspace(prefix)
138    }
139
140    async fn find(
141        &self,
142        trx: &Transaction,
143        path: &[String],
144    ) -> Result<Option<Node>, DirectoryError> {
145        let mut current_path = vec![];
146        let mut node_subspace = self.root_node.clone();
147        let mut layer = vec![];
148        let mut loaded = false;
149
150        // walking through the provided path
151        for path_name in path.iter() {
152            current_path.push(path_name.clone());
153            let key = node_subspace.subspace(&(DEFAULT_SUB_DIRS, path_name.to_owned()));
154
155            // finding the next node
156            let fdb_slice_value = trx.get(key.bytes(), false).await?;
157
158            loaded = true;
159            node_subspace = match self.node_with_optional_prefix(fdb_slice_value) {
160                None => return Ok(None),
161                Some(subspace) => subspace,
162            };
163
164            layer = Node::load_metadata(trx, &node_subspace).await?;
165            if layer.as_slice().eq(PARTITION_LAYER) {
166                break;
167            }
168        }
169
170        if !loaded {
171            layer = Node::load_metadata(trx, &node_subspace).await?;
172        }
173
174        Ok(Some(Node {
175            subspace: node_subspace,
176            current_path,
177            target_path: Vec::from(path),
178            directory_layer: self.clone(),
179            layer,
180        }))
181    }
182
183    fn to_absolute_path(&self, sub_path: &[String]) -> Vec<String> {
184        let mut path: Vec<String> = Vec::with_capacity(self.path.len() + sub_path.len());
185
186        path.extend_from_slice(&self.path);
187        path.extend_from_slice(sub_path);
188
189        path
190    }
191
192    pub(crate) fn contents_of_node(
193        &self,
194        subspace: &Subspace,
195        path: &[String],
196        layer: &[u8],
197    ) -> Result<DirectoryOutput, DirectoryError> {
198        let prefix: Vec<u8> = self.node_subspace.unpack(subspace.bytes())?;
199
200        if layer.eq(PARTITION_LAYER) {
201            Ok(DirectoryOutput::DirectoryPartition(
202                DirectoryPartition::new(&self.to_absolute_path(path), prefix, self.clone()),
203            ))
204        } else {
205            Ok(DirectoryOutput::DirectorySubspace(DirectorySubspace::new(
206                &self.to_absolute_path(path),
207                prefix,
208                self,
209                layer.to_owned(),
210            )))
211        }
212    }
213
214    /// `create_or_open_internal` is the function used to open and/or create a directory.
215    #[async_recursion]
216    async fn create_or_open_internal(
217        &self,
218        trx: &Transaction,
219        path: &[String],
220        prefix: Option<&'async_recursion [u8]>,
221        layer: Option<&'async_recursion [u8]>,
222        allow_create: bool,
223        allow_open: bool,
224    ) -> Result<DirectoryOutput, DirectoryError> {
225        self.check_version(trx, false).await?;
226
227        if prefix.is_some() && !self.allow_manual_prefixes {
228            if self.path.is_empty() {
229                return Err(DirectoryError::PrefixNotAllowed);
230            }
231
232            return Err(DirectoryError::CannotPrefixInPartition);
233        }
234
235        if path.is_empty() {
236            return Err(DirectoryError::NoPathProvided);
237        }
238
239        if let Some(node) = self.find(trx, path).await? {
240            if node.is_in_partition(false) {
241                let sub_path = node.get_partition_subpath();
242                match node.get_contents()? {
243                    DirectoryOutput::DirectorySubspace(_) => unreachable!("already in partition"),
244                    DirectoryOutput::DirectoryPartition(directory_partition) => {
245                        let dir_space = directory_partition
246                            .directory_subspace
247                            .directory_layer
248                            .create_or_open_internal(
249                                trx,
250                                &sub_path,
251                                prefix,
252                                layer,
253                                allow_create,
254                                allow_open,
255                            )
256                            .await?;
257                        Ok(dir_space)
258                    }
259                }
260            } else {
261                self.open_internal(layer, &node, allow_open).await
262            }
263        } else {
264            self.create_internal(trx, path, layer, prefix, allow_create)
265                .await
266        }
267    }
268
269    async fn open_internal(
270        &self,
271        layer: Option<&[u8]>,
272        node: &Node,
273        allow_open: bool,
274    ) -> Result<DirectoryOutput, DirectoryError> {
275        if !allow_open {
276            return Err(DirectoryError::DirAlreadyExists);
277        }
278
279        match layer {
280            None => {}
281            Some(layer) if !layer.is_empty() => match compare_slice(layer, &node.layer) {
282                Ordering::Equal => {}
283                _ => {
284                    return Err(DirectoryError::IncompatibleLayer);
285                }
286            },
287            Some(_) => {}
288        }
289
290        node.get_contents()
291    }
292
293    async fn create_internal(
294        &self,
295        trx: &Transaction,
296        path: &[String],
297        layer: Option<&[u8]>,
298        prefix: Option<&[u8]>,
299        allow_create: bool,
300    ) -> Result<DirectoryOutput, DirectoryError> {
301        let path_last = path.last().ok_or(DirectoryError::NoPathProvided)?;
302
303        if !allow_create {
304            return Err(DirectoryError::DirectoryDoesNotExists);
305        }
306
307        let layer = layer.unwrap_or_default();
308
309        self.check_version(trx, true).await?;
310        let new_prefix = self.get_prefix(trx, prefix).await?;
311
312        let is_prefix_free = self
313            .is_prefix_free(trx, new_prefix.as_slice(), prefix.is_none())
314            .await?;
315
316        if !is_prefix_free {
317            return Err(DirectoryError::DirectoryPrefixInUse);
318        }
319
320        let parent_node = self.get_parent_node(trx, path).await?;
321        let node = self.node_with_prefix(&new_prefix);
322
323        let key = parent_node.subspace(&(DEFAULT_SUB_DIRS, path_last));
324        let key_layer = node.pack(&LAYER_SUFFIX);
325
326        trx.set(key.bytes(), &new_prefix);
327        trx.set(&key_layer, layer);
328
329        self.contents_of_node(&node, path, layer)
330    }
331
332    async fn get_parent_node(
333        &self,
334        trx: &Transaction,
335        path: &[String],
336    ) -> Result<Subspace, DirectoryError> {
337        match path.split_last() {
338            None => Ok(self.root_node.clone()),
339            Some((_, remains)) => {
340                if remains.is_empty() {
341                    return Ok(self.root_node.clone());
342                }
343                let parent = self
344                    .create_or_open_internal(trx, remains, None, None, true, true)
345                    .await?;
346
347                Ok(self.node_with_prefix(&parent.bytes()?))
348            }
349        }
350    }
351
352    async fn is_prefix_free(
353        &self,
354        trx: &Transaction,
355        prefix: &[u8],
356        snapshot: bool,
357    ) -> Result<bool, DirectoryError> {
358        if prefix.is_empty() {
359            return Ok(false);
360        }
361
362        let node = self.node_containing_key(trx, prefix, snapshot).await?;
363
364        if node.is_some() {
365            return Ok(false);
366        }
367
368        let mut range_option = RangeOption::from((
369            self.node_subspace.pack(&prefix),
370            self.node_subspace.pack(&strinc(prefix.to_vec())),
371        ));
372        range_option.limit = Some(1);
373
374        let result = trx
375            .get_ranges_keyvalues(range_option, snapshot)
376            .try_next()
377            .await?;
378
379        Ok(result.is_none())
380    }
381
382    async fn node_containing_key(
383        &self,
384        trx: &Transaction,
385        key: &[u8],
386        snapshot: bool,
387    ) -> Result<Option<Subspace>, DirectoryError> {
388        if key.starts_with(self.node_subspace.bytes()) {
389            return Ok(Some(self.root_node.clone()));
390        }
391
392        let mut key_after = key.to_vec();
393        // pushing 0x00 to simulate keyAfter
394        key_after.push(0);
395
396        let range_end = self.node_subspace.pack(&key_after);
397
398        let mut range_option = RangeOption::from((self.node_subspace.range().0, range_end));
399        range_option.reverse = true;
400        range_option.limit = Some(1);
401
402        // checking range
403        let fdb_value = trx
404            .get_ranges_keyvalues(range_option, snapshot)
405            .try_next()
406            .await?;
407
408        if let Some(fdb_key_value) = fdb_value {
409            let previous_prefix: Vec<Element> = self.node_subspace.unpack(fdb_key_value.key())?;
410
411            if let Some(Element::Bytes(previous_prefix)) = previous_prefix.first() {
412                if key.starts_with(previous_prefix) {
413                    return Ok(Some(self.node_with_prefix(previous_prefix)));
414                };
415            };
416        }
417        Ok(None)
418    }
419
420    async fn get_prefix(
421        &self,
422        trx: &Transaction,
423        prefix: Option<&[u8]>,
424    ) -> Result<Vec<u8>, DirectoryError> {
425        match prefix {
426            None => {
427                // no prefix provided, allocating one
428                let allocator = self.allocator.allocate(trx).await?;
429                let subspace = self.content_subspace.subspace(&allocator);
430
431                // checking range
432                let mut range_option = RangeOption::from(subspace.range());
433                range_option.limit = Some(1);
434
435                let result = trx
436                    .get_ranges_keyvalues(range_option, false)
437                    .try_next()
438                    .await?;
439
440                if result.is_some() {
441                    return Err(DirectoryError::PrefixNotEmpty);
442                }
443
444                Ok(subspace.into_bytes())
445            }
446            Some(v) => Ok(v.to_vec()),
447        }
448    }
449
450    /// `check_version` is checking the Directory's version in FDB.
451    async fn check_version(
452        &self,
453        trx: &Transaction,
454        allow_creation: bool,
455    ) -> Result<(), DirectoryError> {
456        let version = self.get_version_value(trx).await?;
457        match version {
458            None => {
459                if allow_creation {
460                    self.initialize_directory(trx).await
461                } else {
462                    Ok(())
463                }
464            }
465            Some(versions) => {
466                if versions.len() < 12 {
467                    return Err(DirectoryError::Version(
468                        "incorrect version length".to_string(),
469                    ));
470                }
471                let mut arr = [0u8; 4];
472                arr.copy_from_slice(&versions[0..4]);
473                let major: u32 = u32::from_le_bytes(arr);
474
475                arr.copy_from_slice(&versions[4..8]);
476                let minor: u32 = u32::from_le_bytes(arr);
477
478                arr.copy_from_slice(&versions[8..12]);
479                let patch: u32 = u32::from_le_bytes(arr);
480
481                if major > MAJOR_VERSION {
482                    let msg = format!(
483                        "cannot load directory with version {major}.{minor}.{patch} using directory layer {MAJOR_VERSION}.{MINOR_VERSION}.{PATCH_VERSION}"
484                    );
485                    return Err(DirectoryError::Version(msg));
486                }
487
488                if minor > MINOR_VERSION {
489                    let msg = format!(
490                        "directory with version {major}.{minor}.{patch} is read-only when opened using directory layer {MAJOR_VERSION}.{MINOR_VERSION}.{PATCH_VERSION}"
491                    );
492                    return Err(DirectoryError::Version(msg));
493                }
494
495                Ok(())
496            }
497        }
498    }
499
500    /// `initialize_directory` is initializing the directory
501    async fn initialize_directory(&self, trx: &Transaction) -> Result<(), DirectoryError> {
502        let mut value = vec![];
503        value.extend(&MAJOR_VERSION.to_le_bytes());
504        value.extend(&MINOR_VERSION.to_le_bytes());
505        value.extend(&PATCH_VERSION.to_le_bytes());
506        let version_subspace: &[u8] = b"version";
507        let directory_version_key = self.root_node.subspace(&version_subspace);
508        trx.set(directory_version_key.bytes(), &value);
509
510        Ok(())
511    }
512
513    async fn get_version_value(&self, trx: &Transaction) -> FdbResult<Option<FdbSlice>> {
514        let version_subspace: &[u8] = b"version";
515        let version_key = self.root_node.subspace(&version_subspace);
516
517        trx.get(version_key.bytes(), false).await
518    }
519
520    async fn exists_internal(
521        &self,
522        trx: &Transaction,
523        path: &[String],
524    ) -> Result<bool, DirectoryError> {
525        self.check_version(trx, false).await?;
526
527        match self.find(trx, path).await? {
528            None => Ok(false),
529            Some(node) if node.is_in_partition(false) => {
530                node.get_contents()?
531                    .exists(trx, &node.get_partition_subpath())
532                    .await
533            }
534            Some(_node) => Ok(true),
535        }
536    }
537
538    async fn list_internal(
539        &self,
540        trx: &Transaction,
541        path: &[String],
542    ) -> Result<Vec<String>, DirectoryError> {
543        self.check_version(trx, false).await?;
544
545        let node = self
546            .find(trx, path)
547            .await?
548            .ok_or(DirectoryError::PathDoesNotExists)?;
549        if node.is_in_partition(true) {
550            match node.get_contents()? {
551                DirectoryOutput::DirectorySubspace(_) => unreachable!("already in partition"),
552                DirectoryOutput::DirectoryPartition(directory_partition) => {
553                    return directory_partition
554                        .directory_subspace
555                        .directory_layer
556                        .list(trx, &node.get_partition_subpath())
557                        .await;
558                }
559            };
560        }
561
562        node.list_sub_folders(trx).await
563    }
564
565    async fn move_to_internal(
566        &self,
567        trx: &Transaction,
568        old_path: &[String],
569        new_path: &[String],
570    ) -> Result<DirectoryOutput, DirectoryError> {
571        self.check_version(trx, true).await?;
572
573        if old_path.len() <= new_path.len()
574            && compare_slice(old_path, &new_path[..old_path.len()]) == Ordering::Equal
575        {
576            return Err(DirectoryError::CannotMoveBetweenSubdirectory);
577        }
578
579        let old_node = self.find(trx, old_path).await?;
580        let new_node = self.find(trx, new_path).await?;
581
582        let old_node = match old_node {
583            None => return Err(DirectoryError::PathDoesNotExists),
584            Some(n) => n,
585        };
586        let old_node_exists_in_partition = old_node.is_in_partition(false);
587
588        match new_node {
589            None => {
590                if old_node_exists_in_partition {
591                    return Err(DirectoryError::CannotMoveBetweenPartition);
592                }
593            }
594            Some(new_node) => {
595                let new_node_exists_in_partition = new_node.is_in_partition(false);
596                if old_node_exists_in_partition || new_node_exists_in_partition {
597                    if !old_node_exists_in_partition
598                        || !new_node_exists_in_partition
599                        || !old_node.current_path.eq(&new_node.current_path)
600                    {
601                        return Err(DirectoryError::CannotMoveBetweenPartition);
602                    }
603
604                    return new_node
605                        .get_contents()?
606                        .move_to(
607                            trx,
608                            &old_node.get_partition_subpath(),
609                            &new_node.get_partition_subpath(),
610                        )
611                        .await;
612                }
613                return Err(DirectoryError::DirAlreadyExists);
614            }
615        }
616
617        let (new_path_last, parent_path) = new_path
618            .split_last()
619            .ok_or(DirectoryError::DirAlreadyExists)?;
620
621        let parent_node = self
622            .find(trx, parent_path)
623            .await?
624            .ok_or(DirectoryError::ParentDirDoesNotExists)?;
625
626        let key = parent_node
627            .subspace
628            .subspace(&(DEFAULT_SUB_DIRS, new_path_last));
629        let value: Vec<u8> = self.node_subspace.unpack(old_node.subspace.bytes())?;
630        trx.set(key.bytes(), &value);
631
632        self.remove_from_parent(trx, old_path).await?;
633
634        self.contents_of_node(&old_node.subspace, new_path, &old_node.layer)
635    }
636
637    async fn remove_from_parent(
638        &self,
639        trx: &Transaction,
640        path: &[String],
641    ) -> Result<(), DirectoryError> {
642        let (last_element, parent_path) = path
643            .split_last()
644            .ok_or(DirectoryError::BadDestinationDirectory)?;
645
646        match self.find(trx, parent_path).await? {
647            None => {}
648            Some(parent_node) => {
649                let key = parent_node.subspace.pack(&(DEFAULT_SUB_DIRS, last_element));
650                trx.clear(&key);
651            }
652        }
653
654        Ok(())
655    }
656
657    #[async_recursion]
658    async fn remove_internal(
659        &self,
660        trx: &Transaction,
661        path: &[String],
662        fail_on_nonexistent: bool,
663    ) -> Result<bool, DirectoryError> {
664        self.check_version(trx, true).await?;
665
666        if path.is_empty() {
667            return Err(DirectoryError::CannotModifyRootDirectory);
668        }
669
670        let node = match self.find(trx, path).await? {
671            Some(node) => node,
672            None if fail_on_nonexistent => return Err(DirectoryError::DirectoryDoesNotExists),
673            None => return Ok(false),
674        };
675
676        if node.is_in_partition(false) {
677            match node.get_contents()? {
678                DirectoryOutput::DirectorySubspace(_) => {
679                    unreachable!("already directory partition")
680                }
681                DirectoryOutput::DirectoryPartition(d) => {
682                    return d
683                        .directory_subspace
684                        .directory_layer
685                        .remove_internal(trx, &node.get_partition_subpath(), fail_on_nonexistent)
686                        .await;
687                }
688            }
689        }
690
691        self.remove_recursive(trx, &node.subspace).await?;
692        self.remove_from_parent(trx, path).await?;
693
694        Ok(true)
695    }
696
697    #[async_recursion]
698    async fn remove_recursive(
699        &self,
700        trx: &Transaction,
701        node_sub: &Subspace,
702    ) -> Result<(), DirectoryError> {
703        let sub_dir = node_sub.subspace(&DEFAULT_SUB_DIRS);
704        let range_option = RangeOption::from(&sub_dir);
705
706        let mut stream = std::pin::pin!(trx.get_ranges_keyvalues(range_option, false));
707        while let Some(row_key) = stream.try_next().await? {
708            let sub_node = self.node_with_prefix(&row_key.value());
709            self.remove_recursive(trx, &sub_node).await?;
710        }
711
712        let node_prefix: Vec<u8> = self.node_subspace.unpack(node_sub.bytes())?;
713
714        trx.clear_range(&node_prefix, &strinc(node_prefix.to_owned()));
715        trx.clear_subspace_range(node_sub);
716
717        Ok(())
718    }
719}
720
721#[async_trait]
722impl Directory for DirectoryLayer {
723    async fn create_or_open(
724        &self,
725        txn: &Transaction,
726        path: &[String],
727        prefix: Option<&[u8]>,
728        layer: Option<&[u8]>,
729    ) -> Result<DirectoryOutput, DirectoryError> {
730        self.create_or_open_internal(txn, path, prefix, layer, true, true)
731            .await
732    }
733
734    async fn create(
735        &self,
736        txn: &Transaction,
737        path: &[String],
738        prefix: Option<&[u8]>,
739        layer: Option<&[u8]>,
740    ) -> Result<DirectoryOutput, DirectoryError> {
741        self.create_or_open_internal(txn, path, prefix, layer, true, false)
742            .await
743    }
744
745    async fn open(
746        &self,
747        txn: &Transaction,
748        path: &[String],
749        layer: Option<&[u8]>,
750    ) -> Result<DirectoryOutput, DirectoryError> {
751        self.create_or_open_internal(txn, path, None, layer, false, true)
752            .await
753    }
754
755    async fn exists(&self, trx: &Transaction, path: &[String]) -> Result<bool, DirectoryError> {
756        self.exists_internal(trx, path).await
757    }
758
759    async fn move_directory(
760        &self,
761        _trx: &Transaction,
762        _new_path: &[String],
763    ) -> Result<DirectoryOutput, DirectoryError> {
764        Err(DirectoryError::CannotMoveRootDirectory)
765    }
766
767    /// `move_to` the directory from old_path to new_path(both relative to this
768    /// Directory), and returns the directory (at its new location) and its
769    /// contents as a Subspace. Move will return an error if a directory
770    /// does not exist at oldPath, a directory already exists at newPath, or the
771    /// parent directory of newPath does not exist.
772    async fn move_to(
773        &self,
774        trx: &Transaction,
775        old_path: &[String],
776        new_path: &[String],
777    ) -> Result<DirectoryOutput, DirectoryError> {
778        self.move_to_internal(trx, old_path, new_path).await
779    }
780
781    async fn remove(&self, trx: &Transaction, path: &[String]) -> Result<bool, DirectoryError> {
782        self.remove_internal(trx, path, true).await
783    }
784
785    async fn remove_if_exists(
786        &self,
787        trx: &Transaction,
788        path: &[String],
789    ) -> Result<bool, DirectoryError> {
790        self.remove_internal(trx, path, false).await
791    }
792
793    async fn list(
794        &self,
795        trx: &Transaction,
796        path: &[String],
797    ) -> Result<Vec<String>, DirectoryError> {
798        self.list_internal(trx, path).await
799    }
800}