foundationdb/
future.rs

1// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors
2// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors.
3//
4// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
5// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
6// http://opensource.org/licenses/MIT>, at your option. This file may not be
7// copied, modified, or distributed except according to those terms.
8
9//! Most functions in the FoundationDB API are asynchronous, meaning that they
10//! may return to the caller before actually delivering their result.
11//!
12//! These functions always return FDBFuture*. An FDBFuture object represents a
13//! result value or error to be delivered at some future time. You can wait for
14//! a Future to be “ready” – to have a value or error delivered – by setting a
15//! callback function, or by blocking a thread, or by polling. Once a Future is
16//! ready, you can extract either an error code or a value of the appropriate
17//! type (the documentation for the original function will tell you which
18//! fdb_future_get_*() function you should call).
19//!
20//! Futures make it easy to do multiple operations in parallel, by calling several
21//! asynchronous functions before waiting for any of the results. This can be
22//! important for reducing the latency of transactions.
23//!
24
25use std::convert::TryFrom;
26use std::ffi::CStr;
27use std::fmt;
28use std::ops::Deref;
29use std::os::raw::c_char;
30use std::pin::Pin;
31use std::ptr::NonNull;
32use std::sync::Arc;
33
34#[cfg_api_versions(min = 700)]
35pub use crate::fdb_keys::FdbKeys;
36use crate::from_raw_fdb_slice;
37#[cfg_api_versions(min = 710)]
38pub use crate::mapped_key_values::MappedKeyValues;
39use foundationdb_macros::cfg_api_versions;
40use foundationdb_sys as fdb_sys;
41use futures::prelude::*;
42use futures::task::{AtomicWaker, Context, Poll};
43
44use crate::{error, FdbError, FdbResult};
45
46/// An opaque type that represents a Future in the FoundationDB C API.
47pub(crate) struct FdbFutureHandle(NonNull<fdb_sys::FDBFuture>);
48
49impl FdbFutureHandle {
50    pub const fn as_ptr(&self) -> *mut fdb_sys::FDBFuture {
51        self.0.as_ptr()
52    }
53}
54unsafe impl Sync for FdbFutureHandle {}
55unsafe impl Send for FdbFutureHandle {}
56impl Drop for FdbFutureHandle {
57    fn drop(&mut self) {
58        // `fdb_future_destroy` cancels the future, so we don't need to call
59        // `fdb_future_cancel` explicitly.
60        unsafe { fdb_sys::fdb_future_destroy(self.as_ptr()) }
61    }
62}
63
64/// An opaque type that represents a pending Future that will be converted to a
65/// predefined result type.
66///
67/// Non owned result type (Fdb
68pub(crate) struct FdbFuture<T> {
69    f: Option<FdbFutureHandle>,
70    waker: Option<Arc<AtomicWaker>>,
71    phantom: std::marker::PhantomData<T>,
72}
73
74impl<T> FdbFuture<T>
75where
76    T: TryFrom<FdbFutureHandle, Error = FdbError> + Unpin,
77{
78    pub(crate) fn new(f: *mut fdb_sys::FDBFuture) -> Self {
79        Self {
80            f: Some(FdbFutureHandle(
81                NonNull::new(f).expect("FDBFuture to not be null"),
82            )),
83            waker: None,
84            phantom: std::marker::PhantomData,
85        }
86    }
87}
88
89impl<T> Future for FdbFuture<T>
90where
91    T: TryFrom<FdbFutureHandle, Error = FdbError> + Unpin,
92{
93    type Output = FdbResult<T>;
94
95    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<FdbResult<T>> {
96        let f = self.f.as_ref().expect("cannot poll after resolve");
97        let ready = unsafe { fdb_sys::fdb_future_is_ready(f.as_ptr()) };
98        if ready == 0 {
99            let f_ptr = f.as_ptr();
100            let mut register = false;
101            let waker = self.waker.get_or_insert_with(|| {
102                register = true;
103                Arc::new(AtomicWaker::new())
104            });
105            waker.register(cx.waker());
106            if register {
107                let network_waker: Arc<AtomicWaker> = waker.clone();
108                let network_waker_ptr = Arc::into_raw(network_waker);
109                unsafe {
110                    fdb_sys::fdb_future_set_callback(
111                        f_ptr,
112                        Some(fdb_future_callback),
113                        network_waker_ptr as *mut _,
114                    );
115                }
116            }
117            Poll::Pending
118        } else {
119            Poll::Ready(
120                error::eval(unsafe { fdb_sys::fdb_future_get_error(f.as_ptr()) })
121                    .and_then(|()| T::try_from(self.f.take().expect("self.f.is_some()"))),
122            )
123        }
124    }
125}
126
127// The callback from fdb C API can be called from multiple threads. so this callback should be
128// thread-safe.
129extern "C" fn fdb_future_callback(
130    _f: *mut fdb_sys::FDBFuture,
131    callback_parameter: *mut ::std::os::raw::c_void,
132) {
133    let network_waker: Arc<AtomicWaker> = unsafe { Arc::from_raw(callback_parameter as *const _) };
134    network_waker.wake();
135}
136
137/// A slice of bytes owned by a foundationDB future
138pub struct FdbSlice {
139    _f: FdbFutureHandle,
140    value: *const u8,
141    len: i32,
142}
143unsafe impl Sync for FdbSlice {}
144unsafe impl Send for FdbSlice {}
145
146impl Deref for FdbSlice {
147    type Target = [u8];
148    fn deref(&self) -> &Self::Target {
149        from_raw_fdb_slice(self.value, self.len as usize)
150    }
151}
152impl AsRef<[u8]> for FdbSlice {
153    fn as_ref(&self) -> &[u8] {
154        self.deref()
155    }
156}
157
158impl TryFrom<FdbFutureHandle> for FdbSlice {
159    type Error = FdbError;
160
161    fn try_from(f: FdbFutureHandle) -> FdbResult<Self> {
162        let mut value = std::ptr::null();
163        let mut len = 0;
164
165        error::eval(unsafe { fdb_sys::fdb_future_get_key(f.as_ptr(), &mut value, &mut len) })?;
166
167        Ok(FdbSlice { _f: f, value, len })
168    }
169}
170
171impl TryFrom<FdbFutureHandle> for Option<FdbSlice> {
172    type Error = FdbError;
173
174    fn try_from(f: FdbFutureHandle) -> FdbResult<Self> {
175        let mut present = 0;
176        let mut value = std::ptr::null();
177        let mut len = 0;
178
179        error::eval(unsafe {
180            fdb_sys::fdb_future_get_value(f.as_ptr(), &mut present, &mut value, &mut len)
181        })?;
182
183        Ok(if present == 0 {
184            None
185        } else {
186            Some(FdbSlice { _f: f, value, len })
187        })
188    }
189}
190
191/// A slice of addresses owned by a foundationDB future
192pub struct FdbAddresses {
193    _f: FdbFutureHandle,
194    strings: *const FdbAddress,
195    len: i32,
196}
197unsafe impl Sync for FdbAddresses {}
198unsafe impl Send for FdbAddresses {}
199
200impl TryFrom<FdbFutureHandle> for FdbAddresses {
201    type Error = FdbError;
202
203    fn try_from(f: FdbFutureHandle) -> FdbResult<Self> {
204        let mut strings: *mut *const c_char = std::ptr::null_mut();
205        let mut len = 0;
206
207        error::eval(unsafe {
208            fdb_sys::fdb_future_get_string_array(f.as_ptr(), &mut strings, &mut len)
209        })?;
210
211        Ok(FdbAddresses {
212            _f: f,
213            strings: strings as *const FdbAddress,
214            len,
215        })
216    }
217}
218
219impl Deref for FdbAddresses {
220    type Target = [FdbAddress];
221
222    fn deref(&self) -> &Self::Target {
223        assert_eq_size!(FdbAddress, *const c_char);
224        assert_eq_align!(FdbAddress, u8);
225        from_raw_fdb_slice(self.strings, self.len as usize)
226    }
227}
228impl AsRef<[FdbAddress]> for FdbAddresses {
229    fn as_ref(&self) -> &[FdbAddress] {
230        self.deref()
231    }
232}
233
/// A single address whose storage is owned by a FoundationDB future.
///
/// The underlying data belongs to the future held by [`FdbAddresses`], so an
/// `FdbAddress` is only ever handed out by reference. A reference can therefore
/// never outlive the slice it was obtained from.
#[repr(packed)]
pub struct FdbAddress {
    // C string pointer taken from the array returned by the C API.
    c_str: *const c_char,
}
244
245impl Deref for FdbAddress {
246    type Target = CStr;
247
248    fn deref(&self) -> &CStr {
249        unsafe { std::ffi::CStr::from_ptr(self.c_str) }
250    }
251}
252impl AsRef<CStr> for FdbAddress {
253    fn as_ref(&self) -> &CStr {
254        self.deref()
255    }
256}
257
258/// An slice of keyvalues owned by a foundationDB future
259pub struct FdbValues {
260    _f: FdbFutureHandle,
261    keyvalues: *const FdbKeyValue,
262    len: i32,
263    more: bool,
264}
265unsafe impl Sync for FdbValues {}
266unsafe impl Send for FdbValues {}
267
268impl FdbValues {
269    /// `true` if there is another range after this one
270    pub fn more(&self) -> bool {
271        self.more
272    }
273}
274
275impl TryFrom<FdbFutureHandle> for FdbValues {
276    type Error = FdbError;
277    fn try_from(f: FdbFutureHandle) -> FdbResult<Self> {
278        let mut keyvalues = std::ptr::null();
279        let mut len = 0;
280        let mut more = 0;
281
282        unsafe {
283            error::eval(fdb_sys::fdb_future_get_keyvalue_array(
284                f.as_ptr(),
285                &mut keyvalues,
286                &mut len,
287                &mut more,
288            ))?
289        }
290
291        Ok(FdbValues {
292            _f: f,
293            keyvalues: keyvalues as *const FdbKeyValue,
294            len,
295            more: more != 0,
296        })
297    }
298}
299
300impl Deref for FdbValues {
301    type Target = [FdbKeyValue];
302    fn deref(&self) -> &Self::Target {
303        assert_eq_size!(FdbKeyValue, fdb_sys::FDBKeyValue);
304        assert_eq_align!(FdbKeyValue, u8);
305
306        from_raw_fdb_slice(self.keyvalues, self.len as usize)
307    }
308}
309impl AsRef<[FdbKeyValue]> for FdbValues {
310    fn as_ref(&self) -> &[FdbKeyValue] {
311        self.deref()
312    }
313}
314
315impl<'a> IntoIterator for &'a FdbValues {
316    type Item = &'a FdbKeyValue;
317    type IntoIter = std::slice::Iter<'a, FdbKeyValue>;
318
319    fn into_iter(self) -> Self::IntoIter {
320        self.deref().iter()
321    }
322}
323impl IntoIterator for FdbValues {
324    type Item = FdbValue;
325    type IntoIter = FdbValuesIter;
326
327    fn into_iter(self) -> Self::IntoIter {
328        FdbValuesIter {
329            f: Arc::new(self._f),
330            keyvalues: self.keyvalues,
331            len: self.len,
332            pos: 0,
333        }
334    }
335}
336
337/// An iterator of keyvalues owned by a foundationDB future
338pub struct FdbValuesIter {
339    f: Arc<FdbFutureHandle>,
340    keyvalues: *const FdbKeyValue,
341    len: i32,
342    pos: i32,
343}
344
345unsafe impl Send for FdbValuesIter {}
346
347impl Iterator for FdbValuesIter {
348    type Item = FdbValue;
349    fn next(&mut self) -> Option<Self::Item> {
350        #[allow(clippy::iter_nth_zero)]
351        self.nth(0)
352    }
353
354    fn nth(&mut self, n: usize) -> Option<Self::Item> {
355        let pos = (self.pos as usize).checked_add(n);
356        match pos {
357            Some(pos) if pos < self.len as usize => {
358                // safe because pos < self.len
359                let keyvalue = unsafe { self.keyvalues.add(pos) };
360                self.pos = pos as i32 + 1;
361
362                Some(FdbValue {
363                    _f: self.f.clone(),
364                    keyvalue,
365                })
366            }
367            _ => {
368                self.pos = self.len;
369                None
370            }
371        }
372    }
373
374    fn size_hint(&self) -> (usize, Option<usize>) {
375        let rem = (self.len - self.pos) as usize;
376        (rem, Some(rem))
377    }
378}
379impl ExactSizeIterator for FdbValuesIter {
380    #[inline]
381    fn len(&self) -> usize {
382        (self.len - self.pos) as usize
383    }
384}
385impl DoubleEndedIterator for FdbValuesIter {
386    fn next_back(&mut self) -> Option<Self::Item> {
387        self.nth_back(0)
388    }
389
390    fn nth_back(&mut self, n: usize) -> Option<Self::Item> {
391        if n < self.len() {
392            self.len -= 1 + n as i32;
393            // safe because len < original len
394            let keyvalue = unsafe { self.keyvalues.add(self.len as usize) };
395            Some(FdbValue {
396                _f: self.f.clone(),
397                keyvalue,
398            })
399        } else {
400            self.pos = self.len;
401            None
402        }
403    }
404}
405
406/// A keyvalue you can own
407///
408/// Until dropped, this might prevent multiple key/values from beeing freed.
409/// (i.e. the future that own the data is dropped once all data it provided is dropped)
410pub struct FdbValue {
411    _f: Arc<FdbFutureHandle>,
412    keyvalue: *const FdbKeyValue,
413}
414
415unsafe impl Send for FdbValue {}
416
417impl Deref for FdbValue {
418    type Target = FdbKeyValue;
419    fn deref(&self) -> &Self::Target {
420        assert_eq_size!(FdbKeyValue, fdb_sys::FDBKeyValue);
421        assert_eq_align!(FdbKeyValue, u8);
422        unsafe { &*self.keyvalue }
423    }
424}
425impl AsRef<FdbKeyValue> for FdbValue {
426    fn as_ref(&self) -> &FdbKeyValue {
427        self.deref()
428    }
429}
430impl PartialEq for FdbValue {
431    fn eq(&self, other: &Self) -> bool {
432        self.deref() == other.deref()
433    }
434}
435impl Eq for FdbValue {}
436impl fmt::Debug for FdbValue {
437    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
438        self.deref().fmt(f)
439    }
440}
441
442/// A keyvalue owned by a foundationDB future
443///
444/// Because the data it represent is owned by the future in FdbValues, you
445/// can never own a FdbKeyValue directly, you can only have references to it.
446/// This way, you can never obtain a lifetime greater than the lifetime of the
447/// slice that gave you access to it.
448#[repr(packed)]
449pub struct FdbKeyValue(fdb_sys::FDBKeyValue);
450
451impl FdbKeyValue {
452    /// key
453    pub fn key(&self) -> &[u8] {
454        // This cast to `*const u8` isn't unnecessary in all configurations.
455        #[allow(clippy::unnecessary_cast)]
456        from_raw_fdb_slice(self.0.key as *const u8, self.0.key_length as usize)
457    }
458
459    /// value
460    pub fn value(&self) -> &[u8] {
461        // This cast to `*const u8` isn't unnecessary in all configurations.
462        #[allow(clippy::unnecessary_cast)]
463        unsafe {
464            std::slice::from_raw_parts(self.0.value as *const u8, self.0.value_length as usize)
465        }
466    }
467}
468
469impl PartialEq for FdbKeyValue {
470    fn eq(&self, other: &Self) -> bool {
471        (self.key(), self.value()) == (other.key(), other.value())
472    }
473}
474impl Eq for FdbKeyValue {}
475impl fmt::Debug for FdbKeyValue {
476    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
477        write!(
478            f,
479            "({:?}, {:?})",
480            crate::tuple::Bytes::from(self.key()),
481            crate::tuple::Bytes::from(self.value())
482        )
483    }
484}
485
486impl TryFrom<FdbFutureHandle> for i64 {
487    type Error = FdbError;
488
489    fn try_from(f: FdbFutureHandle) -> FdbResult<Self> {
490        let mut version: i64 = 0;
491        error::eval(unsafe {
492            #[cfg(any(
493                feature = "fdb-6_2",
494                feature = "fdb-6_3",
495                feature = "fdb-7_0",
496                feature = "fdb-7_1",
497                feature = "fdb-7_3"
498            ))]
499            {
500                fdb_sys::fdb_future_get_int64(f.as_ptr(), &mut version)
501            }
502            #[cfg(not(any(
503                feature = "fdb-6_2",
504                feature = "fdb-6_3",
505                feature = "fdb-7_0",
506                feature = "fdb-7_1",
507                feature = "fdb-7_3"
508            )))]
509            {
510                fdb_sys::fdb_future_get_version(f.as_ptr(), &mut version)
511            }
512        })?;
513        Ok(version)
514    }
515}
516
517impl TryFrom<FdbFutureHandle> for () {
518    type Error = FdbError;
519    fn try_from(_f: FdbFutureHandle) -> FdbResult<Self> {
520        Ok(())
521    }
522}