foundationdb/
future.rs

1// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors
2// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors.
3//
4// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
5// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
6// http://opensource.org/licenses/MIT>, at your option. This file may not be
7// copied, modified, or distributed except according to those terms.
8
//! Most functions in the FoundationDB API are asynchronous, meaning that they
//! may return to the caller before actually delivering their result.
//!
//! These functions always return `FDBFuture*`. An `FDBFuture` object represents a
//! result value or error to be delivered at some future time. You can wait for
//! a Future to be “ready” – to have a value or error delivered – by setting a
//! callback function, or by blocking a thread, or by polling. Once a Future is
//! ready, you can extract either an error code or a value of the appropriate
//! type (the documentation for the original function will tell you which
//! `fdb_future_get_*()` function you should call).
//!
//! Futures make it easy to do multiple operations in parallel, by calling several
//! asynchronous functions before waiting for any of the results. This can be
//! important for reducing the latency of transactions.
23//!
24
25use std::convert::TryFrom;
26use std::ffi::CStr;
27use std::fmt;
28use std::ops::Deref;
29use std::os::raw::c_char;
30use std::pin::Pin;
31use std::ptr::NonNull;
32use std::sync::Arc;
33
34#[cfg_api_versions(min = 700)]
35pub use crate::fdb_keys::FdbKeys;
36use crate::from_raw_fdb_slice;
37#[cfg_api_versions(min = 710)]
38pub use crate::mapped_key_values::MappedKeyValues;
39use fdb_sys::if_cfg_api_versions;
40use foundationdb_macros::cfg_api_versions;
41use foundationdb_sys as fdb_sys;
42use futures::prelude::*;
43use futures::task::{AtomicWaker, Context, Poll};
44
45use crate::{error, FdbError, FdbResult};
46
47/// An opaque type that represents a Future in the FoundationDB C API.
48pub(crate) struct FdbFutureHandle(NonNull<fdb_sys::FDBFuture>);
49
50impl FdbFutureHandle {
51    pub const fn as_ptr(&self) -> *mut fdb_sys::FDBFuture {
52        self.0.as_ptr()
53    }
54}
55unsafe impl Sync for FdbFutureHandle {}
56unsafe impl Send for FdbFutureHandle {}
57impl Drop for FdbFutureHandle {
58    fn drop(&mut self) {
59        // `fdb_future_destroy` cancels the future, so we don't need to call
60        // `fdb_future_cancel` explicitly.
61        unsafe { fdb_sys::fdb_future_destroy(self.as_ptr()) }
62    }
63}
64
65/// An opaque type that represents a pending Future that will be converted to a
66/// predefined result type.
67///
68/// Non owned result type (Fdb
69pub(crate) struct FdbFuture<T> {
70    f: Option<FdbFutureHandle>,
71    waker: Option<Arc<AtomicWaker>>,
72    phantom: std::marker::PhantomData<T>,
73}
74
75impl<T> FdbFuture<T>
76where
77    T: TryFrom<FdbFutureHandle, Error = FdbError> + Unpin,
78{
79    pub(crate) fn new(f: *mut fdb_sys::FDBFuture) -> Self {
80        Self {
81            f: Some(FdbFutureHandle(
82                NonNull::new(f).expect("FDBFuture to not be null"),
83            )),
84            waker: None,
85            phantom: std::marker::PhantomData,
86        }
87    }
88}
89
90impl<T> Future for FdbFuture<T>
91where
92    T: TryFrom<FdbFutureHandle, Error = FdbError> + Unpin,
93{
94    type Output = FdbResult<T>;
95
96    #[cfg_attr(
97        feature = "trace",
98        tracing::instrument(level = "debug", skip(self, cx))
99    )]
100    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<FdbResult<T>> {
101        let f = self.f.as_ref().expect("cannot poll after resolve");
102        let ready = unsafe { fdb_sys::fdb_future_is_ready(f.as_ptr()) };
103        if ready == 0 {
104            let f_ptr = f.as_ptr();
105            let mut register = false;
106            let waker = self.waker.get_or_insert_with(|| {
107                register = true;
108                Arc::new(AtomicWaker::new())
109            });
110            waker.register(cx.waker());
111            if register {
112                let network_waker: Arc<AtomicWaker> = waker.clone();
113                let network_waker_ptr = Arc::into_raw(network_waker);
114                unsafe {
115                    fdb_sys::fdb_future_set_callback(
116                        f_ptr,
117                        Some(fdb_future_callback),
118                        network_waker_ptr as *mut _,
119                    );
120                }
121            }
122            Poll::Pending
123        } else {
124            Poll::Ready(
125                error::eval(unsafe { fdb_sys::fdb_future_get_error(f.as_ptr()) })
126                    .and_then(|()| T::try_from(self.f.take().expect("self.f.is_some()"))),
127            )
128        }
129    }
130}
131
132// The callback from fdb C API can be called from multiple threads. so this callback should be
133// thread-safe.
134extern "C" fn fdb_future_callback(
135    _f: *mut fdb_sys::FDBFuture,
136    callback_parameter: *mut ::std::os::raw::c_void,
137) {
138    let network_waker: Arc<AtomicWaker> = unsafe { Arc::from_raw(callback_parameter as *const _) };
139    network_waker.wake();
140}
141
142/// A slice of bytes owned by a foundationDB future
143pub struct FdbSlice {
144    _f: FdbFutureHandle,
145    value: *const u8,
146    len: i32,
147}
148unsafe impl Sync for FdbSlice {}
149unsafe impl Send for FdbSlice {}
150
151impl Deref for FdbSlice {
152    type Target = [u8];
153    fn deref(&self) -> &Self::Target {
154        from_raw_fdb_slice(self.value, self.len as usize)
155    }
156}
157impl AsRef<[u8]> for FdbSlice {
158    fn as_ref(&self) -> &[u8] {
159        self.deref()
160    }
161}
162
163impl TryFrom<FdbFutureHandle> for FdbSlice {
164    type Error = FdbError;
165
166    fn try_from(f: FdbFutureHandle) -> FdbResult<Self> {
167        let mut value = std::ptr::null();
168        let mut len = 0;
169
170        error::eval(unsafe { fdb_sys::fdb_future_get_key(f.as_ptr(), &mut value, &mut len) })?;
171
172        Ok(FdbSlice { _f: f, value, len })
173    }
174}
175
176impl TryFrom<FdbFutureHandle> for Option<FdbSlice> {
177    type Error = FdbError;
178
179    fn try_from(f: FdbFutureHandle) -> FdbResult<Self> {
180        let mut present = 0;
181        let mut value = std::ptr::null();
182        let mut len = 0;
183
184        error::eval(unsafe {
185            fdb_sys::fdb_future_get_value(f.as_ptr(), &mut present, &mut value, &mut len)
186        })?;
187
188        Ok(if present == 0 {
189            None
190        } else {
191            Some(FdbSlice { _f: f, value, len })
192        })
193    }
194}
195
196/// A slice of addresses owned by a foundationDB future
197pub struct FdbAddresses {
198    _f: FdbFutureHandle,
199    strings: *const FdbAddress,
200    len: i32,
201}
202unsafe impl Sync for FdbAddresses {}
203unsafe impl Send for FdbAddresses {}
204
205impl TryFrom<FdbFutureHandle> for FdbAddresses {
206    type Error = FdbError;
207
208    fn try_from(f: FdbFutureHandle) -> FdbResult<Self> {
209        let mut strings: *mut *const c_char = std::ptr::null_mut();
210        let mut len = 0;
211
212        error::eval(unsafe {
213            fdb_sys::fdb_future_get_string_array(f.as_ptr(), &mut strings, &mut len)
214        })?;
215
216        Ok(FdbAddresses {
217            _f: f,
218            strings: strings as *const FdbAddress,
219            len,
220        })
221    }
222}
223
224impl Deref for FdbAddresses {
225    type Target = [FdbAddress];
226
227    fn deref(&self) -> &Self::Target {
228        assert_eq_size!(FdbAddress, *const c_char);
229        assert_eq_align!(FdbAddress, u8);
230        from_raw_fdb_slice(self.strings, self.len as usize)
231    }
232}
233impl AsRef<[FdbAddress]> for FdbAddresses {
234    fn as_ref(&self) -> &[FdbAddress] {
235        self.deref()
236    }
237}
238
/// An address owned by a foundationDB future
///
/// The data it represents is owned by the future held in `FdbAddresses`, so an
/// `FdbAddress` can never be owned directly: only references to it can be
/// obtained. As a consequence, such a reference can never outlive the slice
/// that gave access to it.
#[repr(C, packed)]
pub struct FdbAddress {
    c_str: *const c_char,
}

impl Deref for FdbAddress {
    type Target = CStr;

    fn deref(&self) -> &CStr {
        // Copy the pointer out of the packed struct before using it.
        let raw = self.c_str;
        // NOTE(review): assumes `raw` points to a NUL-terminated string that
        // outlives `self` - confirm against the C API.
        unsafe { CStr::from_ptr(raw) }
    }
}

impl AsRef<CStr> for FdbAddress {
    fn as_ref(&self) -> &CStr {
        &**self
    }
}
262
263/// An slice of keyvalues owned by a foundationDB future
264pub struct FdbValues {
265    _f: FdbFutureHandle,
266    keyvalues: *const FdbKeyValue,
267    len: i32,
268    more: bool,
269}
270unsafe impl Sync for FdbValues {}
271unsafe impl Send for FdbValues {}
272
273impl FdbValues {
274    /// `true` if there is another range after this one
275    pub fn more(&self) -> bool {
276        self.more
277    }
278}
279
280impl TryFrom<FdbFutureHandle> for FdbValues {
281    type Error = FdbError;
282    fn try_from(f: FdbFutureHandle) -> FdbResult<Self> {
283        let mut keyvalues = std::ptr::null();
284        let mut len = 0;
285        let mut more = 0;
286
287        unsafe {
288            error::eval(fdb_sys::fdb_future_get_keyvalue_array(
289                f.as_ptr(),
290                &mut keyvalues,
291                &mut len,
292                &mut more,
293            ))?
294        }
295
296        Ok(FdbValues {
297            _f: f,
298            keyvalues: keyvalues as *const FdbKeyValue,
299            len,
300            more: more != 0,
301        })
302    }
303}
304
305impl Deref for FdbValues {
306    type Target = [FdbKeyValue];
307    fn deref(&self) -> &Self::Target {
308        assert_eq_size!(FdbKeyValue, fdb_sys::FDBKeyValue);
309        assert_eq_align!(FdbKeyValue, u8);
310
311        from_raw_fdb_slice(self.keyvalues, self.len as usize)
312    }
313}
314impl AsRef<[FdbKeyValue]> for FdbValues {
315    fn as_ref(&self) -> &[FdbKeyValue] {
316        self.deref()
317    }
318}
319
320impl<'a> IntoIterator for &'a FdbValues {
321    type Item = &'a FdbKeyValue;
322    type IntoIter = std::slice::Iter<'a, FdbKeyValue>;
323
324    fn into_iter(self) -> Self::IntoIter {
325        self.deref().iter()
326    }
327}
328impl IntoIterator for FdbValues {
329    type Item = FdbValue;
330    type IntoIter = FdbValuesIter;
331
332    fn into_iter(self) -> Self::IntoIter {
333        FdbValuesIter {
334            f: Arc::new(self._f),
335            keyvalues: self.keyvalues,
336            len: self.len,
337            pos: 0,
338        }
339    }
340}
341
342/// An iterator of keyvalues owned by a foundationDB future
343pub struct FdbValuesIter {
344    f: Arc<FdbFutureHandle>,
345    keyvalues: *const FdbKeyValue,
346    len: i32,
347    pos: i32,
348}
349
350unsafe impl Send for FdbValuesIter {}
351
352impl Iterator for FdbValuesIter {
353    type Item = FdbValue;
354    fn next(&mut self) -> Option<Self::Item> {
355        #[allow(clippy::iter_nth_zero)]
356        self.nth(0)
357    }
358
359    fn nth(&mut self, n: usize) -> Option<Self::Item> {
360        let pos = (self.pos as usize).checked_add(n);
361        match pos {
362            Some(pos) if pos < self.len as usize => {
363                // safe because pos < self.len
364                let keyvalue = unsafe { self.keyvalues.add(pos) };
365                self.pos = pos as i32 + 1;
366
367                Some(FdbValue {
368                    _f: self.f.clone(),
369                    keyvalue,
370                })
371            }
372            _ => {
373                self.pos = self.len;
374                None
375            }
376        }
377    }
378
379    fn size_hint(&self) -> (usize, Option<usize>) {
380        let rem = (self.len - self.pos) as usize;
381        (rem, Some(rem))
382    }
383}
384impl ExactSizeIterator for FdbValuesIter {
385    #[inline]
386    fn len(&self) -> usize {
387        (self.len - self.pos) as usize
388    }
389}
390impl DoubleEndedIterator for FdbValuesIter {
391    fn next_back(&mut self) -> Option<Self::Item> {
392        self.nth_back(0)
393    }
394
395    fn nth_back(&mut self, n: usize) -> Option<Self::Item> {
396        if n < self.len() {
397            self.len -= 1 + n as i32;
398            // safe because len < original len
399            let keyvalue = unsafe { self.keyvalues.add(self.len as usize) };
400            Some(FdbValue {
401                _f: self.f.clone(),
402                keyvalue,
403            })
404        } else {
405            self.pos = self.len;
406            None
407        }
408    }
409}
410
411/// A keyvalue you can own
412///
413/// Until dropped, this might prevent multiple key/values from beeing freed.
414/// (i.e. the future that own the data is dropped once all data it provided is dropped)
415pub struct FdbValue {
416    _f: Arc<FdbFutureHandle>,
417    keyvalue: *const FdbKeyValue,
418}
419
420unsafe impl Send for FdbValue {}
421
422impl Deref for FdbValue {
423    type Target = FdbKeyValue;
424    fn deref(&self) -> &Self::Target {
425        assert_eq_size!(FdbKeyValue, fdb_sys::FDBKeyValue);
426        assert_eq_align!(FdbKeyValue, u8);
427        unsafe { &*self.keyvalue }
428    }
429}
430impl AsRef<FdbKeyValue> for FdbValue {
431    fn as_ref(&self) -> &FdbKeyValue {
432        self.deref()
433    }
434}
435impl PartialEq for FdbValue {
436    fn eq(&self, other: &Self) -> bool {
437        self.deref() == other.deref()
438    }
439}
440impl Eq for FdbValue {}
441impl fmt::Debug for FdbValue {
442    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
443        self.deref().fmt(f)
444    }
445}
446
447/// A keyvalue owned by a foundationDB future
448///
449/// Because the data it represent is owned by the future in FdbValues, you
450/// can never own a FdbKeyValue directly, you can only have references to it.
451/// This way, you can never obtain a lifetime greater than the lifetime of the
452/// slice that gave you access to it.
453#[repr(C, packed)]
454pub struct FdbKeyValue(fdb_sys::FDBKeyValue);
455
456impl FdbKeyValue {
457    /// key
458    pub fn key(&self) -> &[u8] {
459        // This cast to `*const u8` isn't unnecessary in all configurations.
460        #[allow(clippy::unnecessary_cast)]
461        from_raw_fdb_slice(self.0.key as *const u8, self.0.key_length as usize)
462    }
463
464    /// value
465    pub fn value(&self) -> &[u8] {
466        // This cast to `*const u8` isn't unnecessary in all configurations.
467        #[allow(clippy::unnecessary_cast)]
468        unsafe {
469            std::slice::from_raw_parts(self.0.value as *const u8, self.0.value_length as usize)
470        }
471    }
472}
473
474impl PartialEq for FdbKeyValue {
475    fn eq(&self, other: &Self) -> bool {
476        (self.key(), self.value()) == (other.key(), other.value())
477    }
478}
479impl Eq for FdbKeyValue {}
480impl fmt::Debug for FdbKeyValue {
481    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
482        write!(
483            f,
484            "({:?}, {:?})",
485            crate::tuple::Bytes::from(self.key()),
486            crate::tuple::Bytes::from(self.value())
487        )
488    }
489}
490
491impl TryFrom<FdbFutureHandle> for i64 {
492    type Error = FdbError;
493
494    fn try_from(f: FdbFutureHandle) -> FdbResult<Self> {
495        let mut version: i64 = 0;
496        error::eval(unsafe {
497            if_cfg_api_versions!(min = 620 => {
498                fdb_sys::fdb_future_get_int64(f.as_ptr(), &mut version)
499            } else {
500                fdb_sys::fdb_future_get_version(f.as_ptr(), &mut version)
501            })
502        })?;
503        Ok(version)
504    }
505}
506
507impl TryFrom<FdbFutureHandle> for () {
508    type Error = FdbError;
509    fn try_from(_f: FdbFutureHandle) -> FdbResult<Self> {
510        Ok(())
511    }
512}