1use crate::directory::directory_layer::{DirectoryLayer, DEFAULT_NODE_PREFIX, PARTITION_LAYER};
12use crate::directory::directory_subspace::DirectorySubspace;
13use crate::directory::error::DirectoryError;
14use crate::directory::{Directory, DirectoryOutput};
15use crate::tuple::Subspace;
16use crate::Transaction;
17use async_trait::async_trait;
18use std::ops::Deref;
19use std::sync::Arc;
20
21#[derive(Clone)]
25pub struct DirectoryPartition {
26 pub(crate) inner: Arc<DirectoryPartitionInner>,
27}
28
29#[derive(Debug)]
30pub struct DirectoryPartitionInner {
31 pub(crate) directory_subspace: DirectorySubspace,
32 pub(crate) parent_directory_layer: DirectoryLayer,
33}
34
35impl Deref for DirectoryPartition {
36 type Target = DirectoryPartitionInner;
37
38 fn deref(&self) -> &Self::Target {
39 &self.inner
40 }
41}
42
43impl std::fmt::Debug for DirectoryPartition {
44 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
45 self.inner.fmt(f)
46 }
47}
48
49impl DirectoryPartition {
50 pub(crate) fn new(
52 path: &[String],
53 prefix: Vec<u8>,
54 parent_directory_layer: DirectoryLayer,
55 ) -> Self {
56 let mut node_subspace_bytes = vec![];
57 node_subspace_bytes.extend_from_slice(&prefix);
58 node_subspace_bytes.extend_from_slice(DEFAULT_NODE_PREFIX);
59
60 let new_directory_layer = DirectoryLayer::new_with_path(
61 Subspace::from_bytes(node_subspace_bytes),
62 Subspace::from_bytes(prefix.as_slice()),
63 false,
64 path,
65 );
66
67 DirectoryPartition {
68 inner: Arc::new(DirectoryPartitionInner {
69 directory_subspace: DirectorySubspace::new(
70 path,
71 prefix,
72 &new_directory_layer,
73 Vec::from(PARTITION_LAYER),
74 ),
75 parent_directory_layer,
76 }),
77 }
78 }
79}
80
81impl DirectoryPartition {
82 pub fn get_path(&self) -> &[String] {
83 self.inner.directory_subspace.get_path()
84 }
85
86 fn get_directory_layer_for_path(&self, path: &[String]) -> DirectoryLayer {
87 if path.is_empty() {
88 self.parent_directory_layer.clone()
89 } else {
90 self.directory_subspace.directory_layer.clone()
91 }
92 }
93
94 fn get_partition_subpath(
95 &self,
96 path: &[String],
97 directory_layer: Option<DirectoryLayer>,
98 ) -> Result<Vec<String>, DirectoryError> {
99 let directory = match directory_layer {
100 None => self.directory_subspace.directory_layer.clone(),
101 Some(d) => d,
102 };
103
104 if directory.path.len() > self.directory_subspace.get_path().len() {
105 return Err(DirectoryError::CannotCreateSubpath);
106 }
107
108 let mut new_path = vec![];
109
110 new_path.extend_from_slice(&self.directory_subspace.get_path()[directory.path.len()..]);
111 new_path.extend_from_slice(path);
112
113 Ok(new_path)
114 }
115
116 pub fn get_layer(&self) -> &[u8] {
117 b"partition"
118 }
119}
120
121#[async_trait]
122impl Directory for DirectoryPartition {
123 async fn create_or_open(
124 &self,
125 txn: &Transaction,
126 path: &[String],
127 prefix: Option<&[u8]>,
128 layer: Option<&[u8]>,
129 ) -> Result<DirectoryOutput, DirectoryError> {
130 self.inner
131 .directory_subspace
132 .create_or_open(txn, path, prefix, layer)
133 .await
134 }
135
136 async fn create(
137 &self,
138 txn: &Transaction,
139 path: &[String],
140 prefix: Option<&[u8]>,
141 layer: Option<&[u8]>,
142 ) -> Result<DirectoryOutput, DirectoryError> {
143 self.inner
144 .directory_subspace
145 .create(txn, path, prefix, layer)
146 .await
147 }
148
149 async fn open(
150 &self,
151 txn: &Transaction,
152 path: &[String],
153 layer: Option<&[u8]>,
154 ) -> Result<DirectoryOutput, DirectoryError> {
155 self.inner.directory_subspace.open(txn, path, layer).await
156 }
157
158 async fn exists(&self, trx: &Transaction, path: &[String]) -> Result<bool, DirectoryError> {
159 let directory_layer = self.get_directory_layer_for_path(path);
160
161 directory_layer
162 .exists(
163 trx,
164 &self.get_partition_subpath(path, Some(directory_layer.clone()))?,
165 )
166 .await
167 }
168
169 async fn move_directory(
170 &self,
171 trx: &Transaction,
172 new_path: &[String],
173 ) -> Result<DirectoryOutput, DirectoryError> {
174 let directory_layer = self.get_directory_layer_for_path(&[]);
175 let directory_layer_path = &directory_layer.path;
176
177 if directory_layer_path.len() > new_path.len() {
178 return Err(DirectoryError::CannotMoveBetweenPartition);
179 }
180
181 for (i, path) in directory_layer_path.iter().enumerate() {
182 match new_path.get(i) {
183 None => return Err(DirectoryError::CannotMoveBetweenPartition),
184 Some(new_path_item) => {
185 if !new_path_item.eq(path) {
186 return Err(DirectoryError::CannotMoveBetweenPartition);
187 }
188 }
189 }
190 }
191
192 let mut new_relative_path = vec![];
193 new_relative_path.extend_from_slice(&new_path[directory_layer_path.len()..]);
194
195 directory_layer
196 .move_to(
197 trx,
198 &self.get_partition_subpath(&[], Some(directory_layer.clone()))?,
199 &new_relative_path,
200 )
201 .await
202 }
203
204 async fn move_to(
205 &self,
206 trx: &Transaction,
207 old_path: &[String],
208 new_path: &[String],
209 ) -> Result<DirectoryOutput, DirectoryError> {
210 self.inner
211 .directory_subspace
212 .move_to(trx, old_path, new_path)
213 .await
214 }
215
216 async fn remove(&self, trx: &Transaction, path: &[String]) -> Result<bool, DirectoryError> {
217 let directory_layer = self.get_directory_layer_for_path(path);
218 directory_layer
219 .remove(
220 trx,
221 &self.get_partition_subpath(path, Some(directory_layer.clone()))?,
222 )
223 .await
224 }
225
226 async fn remove_if_exists(
227 &self,
228 trx: &Transaction,
229 path: &[String],
230 ) -> Result<bool, DirectoryError> {
231 let directory_layer = self.get_directory_layer_for_path(path);
232 directory_layer
233 .remove_if_exists(
234 trx,
235 &self.get_partition_subpath(path, Some(directory_layer.clone()))?,
236 )
237 .await
238 }
239
240 async fn list(
241 &self,
242 trx: &Transaction,
243 path: &[String],
244 ) -> Result<Vec<String>, DirectoryError> {
245 self.inner.directory_subspace.list(trx, path).await
246 }
247}