1use std::convert::TryFrom;
26use std::ffi::CStr;
27use std::fmt;
28use std::ops::Deref;
29use std::os::raw::c_char;
30use std::pin::Pin;
31use std::ptr::NonNull;
32use std::sync::Arc;
33
34#[cfg_api_versions(min = 700)]
35pub use crate::fdb_keys::FdbKeys;
36use crate::from_raw_fdb_slice;
37#[cfg_api_versions(min = 710)]
38pub use crate::mapped_key_values::MappedKeyValues;
39use fdb_sys::if_cfg_api_versions;
40use foundationdb_macros::cfg_api_versions;
41use foundationdb_sys as fdb_sys;
42use futures::prelude::*;
43use futures::task::{AtomicWaker, Context, Poll};
44
45use crate::{error, FdbError, FdbResult};
46
47pub(crate) struct FdbFutureHandle(NonNull<fdb_sys::FDBFuture>);
49
50impl FdbFutureHandle {
51 pub const fn as_ptr(&self) -> *mut fdb_sys::FDBFuture {
52 self.0.as_ptr()
53 }
54}
55unsafe impl Sync for FdbFutureHandle {}
56unsafe impl Send for FdbFutureHandle {}
57impl Drop for FdbFutureHandle {
58 fn drop(&mut self) {
59 unsafe { fdb_sys::fdb_future_destroy(self.as_ptr()) }
62 }
63}
64
65pub(crate) struct FdbFuture<T> {
70 f: Option<FdbFutureHandle>,
71 waker: Option<Arc<AtomicWaker>>,
72 phantom: std::marker::PhantomData<T>,
73}
74
75impl<T> FdbFuture<T>
76where
77 T: TryFrom<FdbFutureHandle, Error = FdbError> + Unpin,
78{
79 pub(crate) fn new(f: *mut fdb_sys::FDBFuture) -> Self {
80 Self {
81 f: Some(FdbFutureHandle(
82 NonNull::new(f).expect("FDBFuture to not be null"),
83 )),
84 waker: None,
85 phantom: std::marker::PhantomData,
86 }
87 }
88}
89
90impl<T> Future for FdbFuture<T>
91where
92 T: TryFrom<FdbFutureHandle, Error = FdbError> + Unpin,
93{
94 type Output = FdbResult<T>;
95
96 #[cfg_attr(
97 feature = "trace",
98 tracing::instrument(level = "debug", skip(self, cx))
99 )]
100 fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<FdbResult<T>> {
101 let f = self.f.as_ref().expect("cannot poll after resolve");
102 let ready = unsafe { fdb_sys::fdb_future_is_ready(f.as_ptr()) };
103 if ready == 0 {
104 let f_ptr = f.as_ptr();
105 let mut register = false;
106 let waker = self.waker.get_or_insert_with(|| {
107 register = true;
108 Arc::new(AtomicWaker::new())
109 });
110 waker.register(cx.waker());
111 if register {
112 let network_waker: Arc<AtomicWaker> = waker.clone();
113 let network_waker_ptr = Arc::into_raw(network_waker);
114 unsafe {
115 fdb_sys::fdb_future_set_callback(
116 f_ptr,
117 Some(fdb_future_callback),
118 network_waker_ptr as *mut _,
119 );
120 }
121 }
122 Poll::Pending
123 } else {
124 Poll::Ready(
125 error::eval(unsafe { fdb_sys::fdb_future_get_error(f.as_ptr()) })
126 .and_then(|()| T::try_from(self.f.take().expect("self.f.is_some()"))),
127 )
128 }
129 }
130}
131
132pub static CUSTOM_EXECUTOR_HOOK: std::sync::OnceLock<fn()> = std::sync::OnceLock::new();
133
134extern "C" fn fdb_future_callback(
137 _f: *mut fdb_sys::FDBFuture,
138 callback_parameter: *mut ::std::os::raw::c_void,
139) {
140 let network_waker: Arc<AtomicWaker> = unsafe { Arc::from_raw(callback_parameter as *const _) };
142 network_waker.wake();
143
144 if let Some(poll_pending_tasks) = CUSTOM_EXECUTOR_HOOK.get() {
146 poll_pending_tasks();
147 }
148}
149
150pub struct FdbSlice {
152 _f: FdbFutureHandle,
153 value: *const u8,
154 len: i32,
155}
156unsafe impl Sync for FdbSlice {}
157unsafe impl Send for FdbSlice {}
158
159impl Deref for FdbSlice {
160 type Target = [u8];
161 fn deref(&self) -> &Self::Target {
162 from_raw_fdb_slice(self.value, self.len as usize)
163 }
164}
165impl AsRef<[u8]> for FdbSlice {
166 fn as_ref(&self) -> &[u8] {
167 self.deref()
168 }
169}
170
171impl TryFrom<FdbFutureHandle> for FdbSlice {
172 type Error = FdbError;
173
174 fn try_from(f: FdbFutureHandle) -> FdbResult<Self> {
175 let mut value = std::ptr::null();
176 let mut len = 0;
177
178 error::eval(unsafe { fdb_sys::fdb_future_get_key(f.as_ptr(), &mut value, &mut len) })?;
179
180 Ok(FdbSlice { _f: f, value, len })
181 }
182}
183
184impl TryFrom<FdbFutureHandle> for Option<FdbSlice> {
185 type Error = FdbError;
186
187 fn try_from(f: FdbFutureHandle) -> FdbResult<Self> {
188 let mut present = 0;
189 let mut value = std::ptr::null();
190 let mut len = 0;
191
192 error::eval(unsafe {
193 fdb_sys::fdb_future_get_value(f.as_ptr(), &mut present, &mut value, &mut len)
194 })?;
195
196 Ok(if present == 0 {
197 None
198 } else {
199 Some(FdbSlice { _f: f, value, len })
200 })
201 }
202}
203
204pub struct FdbAddresses {
206 _f: FdbFutureHandle,
207 strings: *const FdbAddress,
208 len: i32,
209}
210unsafe impl Sync for FdbAddresses {}
211unsafe impl Send for FdbAddresses {}
212
213impl TryFrom<FdbFutureHandle> for FdbAddresses {
214 type Error = FdbError;
215
216 fn try_from(f: FdbFutureHandle) -> FdbResult<Self> {
217 let mut strings: *mut *const c_char = std::ptr::null_mut();
218 let mut len = 0;
219
220 error::eval(unsafe {
221 fdb_sys::fdb_future_get_string_array(f.as_ptr(), &mut strings, &mut len)
222 })?;
223
224 Ok(FdbAddresses {
225 _f: f,
226 strings: strings as *const FdbAddress,
227 len,
228 })
229 }
230}
231
232impl Deref for FdbAddresses {
233 type Target = [FdbAddress];
234
235 fn deref(&self) -> &Self::Target {
236 assert_eq_size!(FdbAddress, *const c_char);
237 assert_eq_align!(FdbAddress, u8);
238 from_raw_fdb_slice(self.strings, self.len as usize)
239 }
240}
241impl AsRef<[FdbAddress]> for FdbAddresses {
242 fn as_ref(&self) -> &[FdbAddress] {
243 self.deref()
244 }
245}
246
247#[repr(C, packed)]
254pub struct FdbAddress {
255 c_str: *const c_char,
256}
257
258impl Deref for FdbAddress {
259 type Target = CStr;
260
261 fn deref(&self) -> &CStr {
262 unsafe { std::ffi::CStr::from_ptr(self.c_str) }
263 }
264}
265impl AsRef<CStr> for FdbAddress {
266 fn as_ref(&self) -> &CStr {
267 self.deref()
268 }
269}
270
271pub struct FdbValues {
273 _f: FdbFutureHandle,
274 keyvalues: *const FdbKeyValue,
275 len: i32,
276 more: bool,
277}
278unsafe impl Sync for FdbValues {}
279unsafe impl Send for FdbValues {}
280
281impl FdbValues {
282 pub fn more(&self) -> bool {
284 self.more
285 }
286}
287
288impl TryFrom<FdbFutureHandle> for FdbValues {
289 type Error = FdbError;
290 fn try_from(f: FdbFutureHandle) -> FdbResult<Self> {
291 let mut keyvalues = std::ptr::null();
292 let mut len = 0;
293 let mut more = 0;
294
295 unsafe {
296 error::eval(fdb_sys::fdb_future_get_keyvalue_array(
297 f.as_ptr(),
298 &mut keyvalues,
299 &mut len,
300 &mut more,
301 ))?
302 }
303
304 Ok(FdbValues {
305 _f: f,
306 keyvalues: keyvalues as *const FdbKeyValue,
307 len,
308 more: more != 0,
309 })
310 }
311}
312
313impl Deref for FdbValues {
314 type Target = [FdbKeyValue];
315 fn deref(&self) -> &Self::Target {
316 assert_eq_size!(FdbKeyValue, fdb_sys::FDBKeyValue);
317 assert_eq_align!(FdbKeyValue, u8);
318
319 from_raw_fdb_slice(self.keyvalues, self.len as usize)
320 }
321}
322impl AsRef<[FdbKeyValue]> for FdbValues {
323 fn as_ref(&self) -> &[FdbKeyValue] {
324 self.deref()
325 }
326}
327
328impl<'a> IntoIterator for &'a FdbValues {
329 type Item = &'a FdbKeyValue;
330 type IntoIter = std::slice::Iter<'a, FdbKeyValue>;
331
332 fn into_iter(self) -> Self::IntoIter {
333 self.deref().iter()
334 }
335}
336impl IntoIterator for FdbValues {
337 type Item = FdbValue;
338 type IntoIter = FdbValuesIter;
339
340 fn into_iter(self) -> Self::IntoIter {
341 FdbValuesIter {
342 f: Arc::new(self._f),
343 keyvalues: self.keyvalues,
344 len: self.len,
345 pos: 0,
346 }
347 }
348}
349
350pub struct FdbValuesIter {
352 f: Arc<FdbFutureHandle>,
353 keyvalues: *const FdbKeyValue,
354 len: i32,
355 pos: i32,
356}
357
358unsafe impl Send for FdbValuesIter {}
359
360impl Iterator for FdbValuesIter {
361 type Item = FdbValue;
362 fn next(&mut self) -> Option<Self::Item> {
363 #[allow(clippy::iter_nth_zero)]
364 self.nth(0)
365 }
366
367 fn nth(&mut self, n: usize) -> Option<Self::Item> {
368 let pos = (self.pos as usize).checked_add(n);
369 match pos {
370 Some(pos) if pos < self.len as usize => {
371 let keyvalue = unsafe { self.keyvalues.add(pos) };
373 self.pos = pos as i32 + 1;
374
375 Some(FdbValue {
376 _f: self.f.clone(),
377 keyvalue,
378 })
379 }
380 _ => {
381 self.pos = self.len;
382 None
383 }
384 }
385 }
386
387 fn size_hint(&self) -> (usize, Option<usize>) {
388 let rem = (self.len - self.pos) as usize;
389 (rem, Some(rem))
390 }
391}
392impl ExactSizeIterator for FdbValuesIter {
393 #[inline]
394 fn len(&self) -> usize {
395 (self.len - self.pos) as usize
396 }
397}
398impl DoubleEndedIterator for FdbValuesIter {
399 fn next_back(&mut self) -> Option<Self::Item> {
400 self.nth_back(0)
401 }
402
403 fn nth_back(&mut self, n: usize) -> Option<Self::Item> {
404 if n < self.len() {
405 self.len -= 1 + n as i32;
406 let keyvalue = unsafe { self.keyvalues.add(self.len as usize) };
408 Some(FdbValue {
409 _f: self.f.clone(),
410 keyvalue,
411 })
412 } else {
413 self.pos = self.len;
414 None
415 }
416 }
417}
418
419pub struct FdbValue {
424 _f: Arc<FdbFutureHandle>,
425 keyvalue: *const FdbKeyValue,
426}
427
428unsafe impl Send for FdbValue {}
429
430impl Deref for FdbValue {
431 type Target = FdbKeyValue;
432 fn deref(&self) -> &Self::Target {
433 assert_eq_size!(FdbKeyValue, fdb_sys::FDBKeyValue);
434 assert_eq_align!(FdbKeyValue, u8);
435 unsafe { &*self.keyvalue }
436 }
437}
438impl AsRef<FdbKeyValue> for FdbValue {
439 fn as_ref(&self) -> &FdbKeyValue {
440 self.deref()
441 }
442}
443impl PartialEq for FdbValue {
444 fn eq(&self, other: &Self) -> bool {
445 self.deref() == other.deref()
446 }
447}
448impl Eq for FdbValue {}
449impl fmt::Debug for FdbValue {
450 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
451 self.deref().fmt(f)
452 }
453}
454
455#[repr(C, packed)]
462pub struct FdbKeyValue(fdb_sys::FDBKeyValue);
463
464impl FdbKeyValue {
465 pub fn key(&self) -> &[u8] {
467 #[allow(clippy::unnecessary_cast)]
469 from_raw_fdb_slice(self.0.key as *const u8, self.0.key_length as usize)
470 }
471
472 pub fn value(&self) -> &[u8] {
474 #[allow(clippy::unnecessary_cast)]
476 unsafe {
477 std::slice::from_raw_parts(self.0.value as *const u8, self.0.value_length as usize)
478 }
479 }
480}
481
482impl PartialEq for FdbKeyValue {
483 fn eq(&self, other: &Self) -> bool {
484 (self.key(), self.value()) == (other.key(), other.value())
485 }
486}
487impl Eq for FdbKeyValue {}
488impl fmt::Debug for FdbKeyValue {
489 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
490 write!(
491 f,
492 "({:?}, {:?})",
493 crate::tuple::Bytes::from(self.key()),
494 crate::tuple::Bytes::from(self.value())
495 )
496 }
497}
498
499impl TryFrom<FdbFutureHandle> for i64 {
500 type Error = FdbError;
501
502 fn try_from(f: FdbFutureHandle) -> FdbResult<Self> {
503 let mut version: i64 = 0;
504 error::eval(unsafe {
505 if_cfg_api_versions!(min = 620 => {
506 fdb_sys::fdb_future_get_int64(f.as_ptr(), &mut version)
507 } else {
508 fdb_sys::fdb_future_get_version(f.as_ptr(), &mut version)
509 })
510 })?;
511 Ok(version)
512 }
513}
514
515impl TryFrom<FdbFutureHandle> for () {
516 type Error = FdbError;
517 fn try_from(_f: FdbFutureHandle) -> FdbResult<Self> {
518 Ok(())
519 }
520}