foundationdb/directory/
directory_partition.rs

1// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors
2// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors.
3//
4// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
5// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
6// http://opensource.org/licenses/MIT>, at your option. This file may not be
7// copied, modified, or distributed except according to those terms.
8
9//! A resulting Subspace whose prefix is preprended to all of its descendant directories's prefixes.
10
11use crate::directory::directory_layer::{DirectoryLayer, DEFAULT_NODE_PREFIX, PARTITION_LAYER};
12use crate::directory::directory_subspace::DirectorySubspace;
13use crate::directory::error::DirectoryError;
14use crate::directory::{Directory, DirectoryOutput};
15use crate::tuple::Subspace;
16use crate::Transaction;
17use async_trait::async_trait;
18use std::ops::Deref;
19use std::sync::Arc;
20
21/// A `DirectoryPartition` is a DirectorySubspace whose prefix is preprended to all of its descendant
22/// directories's prefixes. It cannot be used as a Subspace. Instead, you must create at
23/// least one subdirectory to store content.
24#[derive(Clone)]
25pub struct DirectoryPartition {
26    pub(crate) inner: Arc<DirectoryPartitionInner>,
27}
28
29#[derive(Debug)]
30pub struct DirectoryPartitionInner {
31    pub(crate) directory_subspace: DirectorySubspace,
32    pub(crate) parent_directory_layer: DirectoryLayer,
33}
34
35impl Deref for DirectoryPartition {
36    type Target = DirectoryPartitionInner;
37
38    fn deref(&self) -> &Self::Target {
39        &self.inner
40    }
41}
42
43impl std::fmt::Debug for DirectoryPartition {
44    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45        self.inner.fmt(f)
46    }
47}
48
49impl DirectoryPartition {
50    // https://github.com/apple/foundationdb/blob/master/bindings/flow/DirectoryPartition.h#L34-L43
51    pub(crate) fn new(
52        path: &[String],
53        prefix: Vec<u8>,
54        parent_directory_layer: DirectoryLayer,
55    ) -> Self {
56        let mut node_subspace_bytes = vec![];
57        node_subspace_bytes.extend_from_slice(&prefix);
58        node_subspace_bytes.extend_from_slice(DEFAULT_NODE_PREFIX);
59
60        let new_directory_layer = DirectoryLayer::new_with_path(
61            Subspace::from_bytes(node_subspace_bytes),
62            Subspace::from_bytes(prefix.as_slice()),
63            false,
64            path,
65        );
66
67        DirectoryPartition {
68            inner: Arc::new(DirectoryPartitionInner {
69                directory_subspace: DirectorySubspace::new(
70                    path,
71                    prefix,
72                    &new_directory_layer,
73                    Vec::from(PARTITION_LAYER),
74                ),
75                parent_directory_layer,
76            }),
77        }
78    }
79}
80
81impl DirectoryPartition {
82    pub fn get_path(&self) -> &[String] {
83        self.inner.directory_subspace.get_path()
84    }
85
86    fn get_directory_layer_for_path(&self, path: &[String]) -> DirectoryLayer {
87        if path.is_empty() {
88            self.parent_directory_layer.clone()
89        } else {
90            self.directory_subspace.directory_layer.clone()
91        }
92    }
93
94    fn get_partition_subpath(
95        &self,
96        path: &[String],
97        directory_layer: Option<DirectoryLayer>,
98    ) -> Result<Vec<String>, DirectoryError> {
99        let directory = match directory_layer {
100            None => self.directory_subspace.directory_layer.clone(),
101            Some(d) => d,
102        };
103
104        if directory.path.len() > self.directory_subspace.get_path().len() {
105            return Err(DirectoryError::CannotCreateSubpath);
106        }
107
108        let mut new_path = vec![];
109
110        new_path.extend_from_slice(&self.directory_subspace.get_path()[directory.path.len()..]);
111        new_path.extend_from_slice(path);
112
113        Ok(new_path)
114    }
115
116    pub fn get_layer(&self) -> &[u8] {
117        b"partition"
118    }
119}
120
121#[async_trait]
122impl Directory for DirectoryPartition {
123    async fn create_or_open(
124        &self,
125        txn: &Transaction,
126        path: &[String],
127        prefix: Option<&[u8]>,
128        layer: Option<&[u8]>,
129    ) -> Result<DirectoryOutput, DirectoryError> {
130        self.inner
131            .directory_subspace
132            .create_or_open(txn, path, prefix, layer)
133            .await
134    }
135
136    async fn create(
137        &self,
138        txn: &Transaction,
139        path: &[String],
140        prefix: Option<&[u8]>,
141        layer: Option<&[u8]>,
142    ) -> Result<DirectoryOutput, DirectoryError> {
143        self.inner
144            .directory_subspace
145            .create(txn, path, prefix, layer)
146            .await
147    }
148
149    async fn open(
150        &self,
151        txn: &Transaction,
152        path: &[String],
153        layer: Option<&[u8]>,
154    ) -> Result<DirectoryOutput, DirectoryError> {
155        self.inner.directory_subspace.open(txn, path, layer).await
156    }
157
158    async fn exists(&self, trx: &Transaction, path: &[String]) -> Result<bool, DirectoryError> {
159        let directory_layer = self.get_directory_layer_for_path(path);
160
161        directory_layer
162            .exists(
163                trx,
164                &self.get_partition_subpath(path, Some(directory_layer.clone()))?,
165            )
166            .await
167    }
168
169    async fn move_directory(
170        &self,
171        trx: &Transaction,
172        new_path: &[String],
173    ) -> Result<DirectoryOutput, DirectoryError> {
174        let directory_layer = self.get_directory_layer_for_path(&[]);
175        let directory_layer_path = &directory_layer.path;
176
177        if directory_layer_path.len() > new_path.len() {
178            return Err(DirectoryError::CannotMoveBetweenPartition);
179        }
180
181        for (i, path) in directory_layer_path.iter().enumerate() {
182            match new_path.get(i) {
183                None => return Err(DirectoryError::CannotMoveBetweenPartition),
184                Some(new_path_item) => {
185                    if !new_path_item.eq(path) {
186                        return Err(DirectoryError::CannotMoveBetweenPartition);
187                    }
188                }
189            }
190        }
191
192        let mut new_relative_path = vec![];
193        new_relative_path.extend_from_slice(&new_path[directory_layer_path.len()..]);
194
195        directory_layer
196            .move_to(
197                trx,
198                &self.get_partition_subpath(&[], Some(directory_layer.clone()))?,
199                &new_relative_path,
200            )
201            .await
202    }
203
204    async fn move_to(
205        &self,
206        trx: &Transaction,
207        old_path: &[String],
208        new_path: &[String],
209    ) -> Result<DirectoryOutput, DirectoryError> {
210        self.inner
211            .directory_subspace
212            .move_to(trx, old_path, new_path)
213            .await
214    }
215
216    async fn remove(&self, trx: &Transaction, path: &[String]) -> Result<bool, DirectoryError> {
217        let directory_layer = self.get_directory_layer_for_path(path);
218        directory_layer
219            .remove(
220                trx,
221                &self.get_partition_subpath(path, Some(directory_layer.clone()))?,
222            )
223            .await
224    }
225
226    async fn remove_if_exists(
227        &self,
228        trx: &Transaction,
229        path: &[String],
230    ) -> Result<bool, DirectoryError> {
231        let directory_layer = self.get_directory_layer_for_path(path);
232        directory_layer
233            .remove_if_exists(
234                trx,
235                &self.get_partition_subpath(path, Some(directory_layer.clone()))?,
236            )
237            .await
238    }
239
240    async fn list(
241        &self,
242        trx: &Transaction,
243        path: &[String],
244    ) -> Result<Vec<String>, DirectoryError> {
245        self.inner.directory_subspace.list(trx, path).await
246    }
247}