foundationdb/
future.rs

1// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors
2// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors.
3//
4// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
5// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
6// http://opensource.org/licenses/MIT>, at your option. This file may not be
7// copied, modified, or distributed except according to those terms.
8
//! Most functions in the FoundationDB API are asynchronous, meaning that they
//! may return to the caller before actually delivering their result.
11//!
//! These functions always return `FDBFuture*`. An `FDBFuture` object represents a
//! result value or error to be delivered at some future time. You can wait for
//! a Future to be “ready” – to have a value or error delivered – by setting a
//! callback function, or by blocking a thread, or by polling. Once a Future is
//! ready, you can extract either an error code or a value of the appropriate
//! type (the documentation for the original function will tell you which
//! `fdb_future_get_*()` function you should call).
19//!
//! Futures make it easy to do multiple operations in parallel, by calling several
//! asynchronous functions before waiting for any of the results. This can be
//! important for reducing the latency of transactions.
23//!
24
25use std::convert::TryFrom;
26use std::ffi::CStr;
27use std::fmt;
28use std::ops::Deref;
29use std::os::raw::c_char;
30use std::pin::Pin;
31use std::ptr::NonNull;
32use std::sync::Arc;
33
34#[cfg_api_versions(min = 700)]
35pub use crate::fdb_keys::FdbKeys;
36use crate::from_raw_fdb_slice;
37#[cfg_api_versions(min = 710)]
38pub use crate::mapped_key_values::MappedKeyValues;
39use fdb_sys::if_cfg_api_versions;
40use foundationdb_macros::cfg_api_versions;
41use foundationdb_sys as fdb_sys;
42use futures::prelude::*;
43use futures::task::{AtomicWaker, Context, Poll};
44
45use crate::{error, FdbError, FdbResult};
46
47/// An opaque type that represents a Future in the FoundationDB C API.
48pub(crate) struct FdbFutureHandle(NonNull<fdb_sys::FDBFuture>);
49
50impl FdbFutureHandle {
51    pub const fn as_ptr(&self) -> *mut fdb_sys::FDBFuture {
52        self.0.as_ptr()
53    }
54}
55unsafe impl Sync for FdbFutureHandle {}
56unsafe impl Send for FdbFutureHandle {}
57impl Drop for FdbFutureHandle {
58    fn drop(&mut self) {
59        // `fdb_future_destroy` cancels the future, so we don't need to call
60        // `fdb_future_cancel` explicitly.
61        unsafe { fdb_sys::fdb_future_destroy(self.as_ptr()) }
62    }
63}
64
/// An opaque type that represents a pending Future that will be converted to a
/// predefined result type.
///
/// The result type `T` is produced from the underlying [`FdbFutureHandle`]
/// (through `TryFrom<FdbFutureHandle>`) once the future is ready.
pub(crate) struct FdbFuture<T> {
    // Handle to the underlying C future; taken (left as `None`) once the
    // future has been converted into its result.
    f: Option<FdbFutureHandle>,
    // Created lazily the first time the future is polled while not ready;
    // a clone of it is handed to the C callback.
    waker: Option<Arc<AtomicWaker>>,
    // Ties the future to its result type without storing a `T`.
    phantom: std::marker::PhantomData<T>,
}
74
75impl<T> FdbFuture<T>
76where
77    T: TryFrom<FdbFutureHandle, Error = FdbError> + Unpin,
78{
79    pub(crate) fn new(f: *mut fdb_sys::FDBFuture) -> Self {
80        Self {
81            f: Some(FdbFutureHandle(
82                NonNull::new(f).expect("FDBFuture to not be null"),
83            )),
84            waker: None,
85            phantom: std::marker::PhantomData,
86        }
87    }
88}
89
90impl<T> Future for FdbFuture<T>
91where
92    T: TryFrom<FdbFutureHandle, Error = FdbError> + Unpin,
93{
94    type Output = FdbResult<T>;
95
96    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<FdbResult<T>> {
97        let f = self.f.as_ref().expect("cannot poll after resolve");
98        let ready = unsafe { fdb_sys::fdb_future_is_ready(f.as_ptr()) };
99        if ready == 0 {
100            let f_ptr = f.as_ptr();
101            let mut register = false;
102            let waker = self.waker.get_or_insert_with(|| {
103                register = true;
104                Arc::new(AtomicWaker::new())
105            });
106            waker.register(cx.waker());
107            if register {
108                let network_waker: Arc<AtomicWaker> = waker.clone();
109                let network_waker_ptr = Arc::into_raw(network_waker);
110                unsafe {
111                    fdb_sys::fdb_future_set_callback(
112                        f_ptr,
113                        Some(fdb_future_callback),
114                        network_waker_ptr as *mut _,
115                    );
116                }
117            }
118            Poll::Pending
119        } else {
120            Poll::Ready(
121                error::eval(unsafe { fdb_sys::fdb_future_get_error(f.as_ptr()) })
122                    .and_then(|()| T::try_from(self.f.take().expect("self.f.is_some()"))),
123            )
124        }
125    }
126}
127
128// The callback from fdb C API can be called from multiple threads. so this callback should be
129// thread-safe.
130extern "C" fn fdb_future_callback(
131    _f: *mut fdb_sys::FDBFuture,
132    callback_parameter: *mut ::std::os::raw::c_void,
133) {
134    let network_waker: Arc<AtomicWaker> = unsafe { Arc::from_raw(callback_parameter as *const _) };
135    network_waker.wake();
136}
137
/// A slice of bytes owned by a foundationDB future
pub struct FdbSlice {
    // Kept only so the future (and presumably the memory `value` points
    // into) lives as long as this slice.
    _f: FdbFutureHandle,
    // Start of the bytes, as written by the `fdb_future_get_*` call.
    value: *const u8,
    // Number of bytes at `value`, as reported by the C API.
    len: i32,
}
// NOTE(review): these impls assume the bytes behind `value` are immutable and
// stay valid while `_f` is alive — confirm against the C API documentation.
unsafe impl Sync for FdbSlice {}
unsafe impl Send for FdbSlice {}
146
147impl Deref for FdbSlice {
148    type Target = [u8];
149    fn deref(&self) -> &Self::Target {
150        from_raw_fdb_slice(self.value, self.len as usize)
151    }
152}
153impl AsRef<[u8]> for FdbSlice {
154    fn as_ref(&self) -> &[u8] {
155        self.deref()
156    }
157}
158
159impl TryFrom<FdbFutureHandle> for FdbSlice {
160    type Error = FdbError;
161
162    fn try_from(f: FdbFutureHandle) -> FdbResult<Self> {
163        let mut value = std::ptr::null();
164        let mut len = 0;
165
166        error::eval(unsafe { fdb_sys::fdb_future_get_key(f.as_ptr(), &mut value, &mut len) })?;
167
168        Ok(FdbSlice { _f: f, value, len })
169    }
170}
171
172impl TryFrom<FdbFutureHandle> for Option<FdbSlice> {
173    type Error = FdbError;
174
175    fn try_from(f: FdbFutureHandle) -> FdbResult<Self> {
176        let mut present = 0;
177        let mut value = std::ptr::null();
178        let mut len = 0;
179
180        error::eval(unsafe {
181            fdb_sys::fdb_future_get_value(f.as_ptr(), &mut present, &mut value, &mut len)
182        })?;
183
184        Ok(if present == 0 {
185            None
186        } else {
187            Some(FdbSlice { _f: f, value, len })
188        })
189    }
190}
191
/// A slice of addresses owned by a foundationDB future
pub struct FdbAddresses {
    // Kept only so the future (and presumably the string array) lives as
    // long as this value.
    _f: FdbFutureHandle,
    // Start of the C string array, reinterpreted as `FdbAddress` entries.
    strings: *const FdbAddress,
    // Number of entries at `strings`, as reported by the C API.
    len: i32,
}
// NOTE(review): these impls assume the string array is immutable and stays
// valid while `_f` is alive — confirm against the C API documentation.
unsafe impl Sync for FdbAddresses {}
unsafe impl Send for FdbAddresses {}
200
201impl TryFrom<FdbFutureHandle> for FdbAddresses {
202    type Error = FdbError;
203
204    fn try_from(f: FdbFutureHandle) -> FdbResult<Self> {
205        let mut strings: *mut *const c_char = std::ptr::null_mut();
206        let mut len = 0;
207
208        error::eval(unsafe {
209            fdb_sys::fdb_future_get_string_array(f.as_ptr(), &mut strings, &mut len)
210        })?;
211
212        Ok(FdbAddresses {
213            _f: f,
214            strings: strings as *const FdbAddress,
215            len,
216        })
217    }
218}
219
220impl Deref for FdbAddresses {
221    type Target = [FdbAddress];
222
223    fn deref(&self) -> &Self::Target {
224        assert_eq_size!(FdbAddress, *const c_char);
225        assert_eq_align!(FdbAddress, u8);
226        from_raw_fdb_slice(self.strings, self.len as usize)
227    }
228}
229impl AsRef<[FdbAddress]> for FdbAddresses {
230    fn as_ref(&self) -> &[FdbAddress] {
231        self.deref()
232    }
233}
234
/// An address owned by a foundationDB future
///
/// Because the data it represents is owned by the future in `FdbAddresses`,
/// you can never own an `FdbAddress` directly; you can only have references
/// to it. This way, you can never obtain a lifetime greater than the lifetime
/// of the slice that gave you access to it.
#[repr(C, packed)]
pub struct FdbAddress {
    // Pointer to the NUL-terminated address string.
    c_str: *const c_char,
}
245
246impl Deref for FdbAddress {
247    type Target = CStr;
248
249    fn deref(&self) -> &CStr {
250        unsafe { std::ffi::CStr::from_ptr(self.c_str) }
251    }
252}
253impl AsRef<CStr> for FdbAddress {
254    fn as_ref(&self) -> &CStr {
255        self.deref()
256    }
257}
258
/// A slice of keyvalues owned by a foundationDB future
pub struct FdbValues {
    // Kept only so the future (and presumably the keyvalue array) lives as
    // long as this value.
    _f: FdbFutureHandle,
    // Start of the keyvalue array, reinterpreted as `FdbKeyValue` entries.
    keyvalues: *const FdbKeyValue,
    // Number of entries at `keyvalues`, as reported by the C API.
    len: i32,
    // Whether the C API flagged that more data follows this batch.
    more: bool,
}
// NOTE(review): these impls assume the keyvalue array is immutable and stays
// valid while `_f` is alive — confirm against the C API documentation.
unsafe impl Sync for FdbValues {}
unsafe impl Send for FdbValues {}

impl FdbValues {
    /// `true` if there is another range after this one
    pub fn more(&self) -> bool {
        self.more
    }
}
275
276impl TryFrom<FdbFutureHandle> for FdbValues {
277    type Error = FdbError;
278    fn try_from(f: FdbFutureHandle) -> FdbResult<Self> {
279        let mut keyvalues = std::ptr::null();
280        let mut len = 0;
281        let mut more = 0;
282
283        unsafe {
284            error::eval(fdb_sys::fdb_future_get_keyvalue_array(
285                f.as_ptr(),
286                &mut keyvalues,
287                &mut len,
288                &mut more,
289            ))?
290        }
291
292        Ok(FdbValues {
293            _f: f,
294            keyvalues: keyvalues as *const FdbKeyValue,
295            len,
296            more: more != 0,
297        })
298    }
299}
300
301impl Deref for FdbValues {
302    type Target = [FdbKeyValue];
303    fn deref(&self) -> &Self::Target {
304        assert_eq_size!(FdbKeyValue, fdb_sys::FDBKeyValue);
305        assert_eq_align!(FdbKeyValue, u8);
306
307        from_raw_fdb_slice(self.keyvalues, self.len as usize)
308    }
309}
310impl AsRef<[FdbKeyValue]> for FdbValues {
311    fn as_ref(&self) -> &[FdbKeyValue] {
312        self.deref()
313    }
314}
315
316impl<'a> IntoIterator for &'a FdbValues {
317    type Item = &'a FdbKeyValue;
318    type IntoIter = std::slice::Iter<'a, FdbKeyValue>;
319
320    fn into_iter(self) -> Self::IntoIter {
321        self.deref().iter()
322    }
323}
324impl IntoIterator for FdbValues {
325    type Item = FdbValue;
326    type IntoIter = FdbValuesIter;
327
328    fn into_iter(self) -> Self::IntoIter {
329        FdbValuesIter {
330            f: Arc::new(self._f),
331            keyvalues: self.keyvalues,
332            len: self.len,
333            pos: 0,
334        }
335    }
336}
337
/// An iterator of keyvalues owned by a foundationDB future
pub struct FdbValuesIter {
    // Shared handle to the future; cloned into every yielded `FdbValue`.
    f: Arc<FdbFutureHandle>,
    // Start of the keyvalue array.
    keyvalues: *const FdbKeyValue,
    // Exclusive upper bound of the not-yet-yielded range (shrinks when
    // iterating from the back).
    len: i32,
    // Index of the next element to yield from the front.
    pos: i32,
}

// NOTE(review): assumes the keyvalue array may be read from any thread while
// the future handle is alive — confirm against the C API documentation.
unsafe impl Send for FdbValuesIter {}
347
348impl Iterator for FdbValuesIter {
349    type Item = FdbValue;
350    fn next(&mut self) -> Option<Self::Item> {
351        #[allow(clippy::iter_nth_zero)]
352        self.nth(0)
353    }
354
355    fn nth(&mut self, n: usize) -> Option<Self::Item> {
356        let pos = (self.pos as usize).checked_add(n);
357        match pos {
358            Some(pos) if pos < self.len as usize => {
359                // safe because pos < self.len
360                let keyvalue = unsafe { self.keyvalues.add(pos) };
361                self.pos = pos as i32 + 1;
362
363                Some(FdbValue {
364                    _f: self.f.clone(),
365                    keyvalue,
366                })
367            }
368            _ => {
369                self.pos = self.len;
370                None
371            }
372        }
373    }
374
375    fn size_hint(&self) -> (usize, Option<usize>) {
376        let rem = (self.len - self.pos) as usize;
377        (rem, Some(rem))
378    }
379}
380impl ExactSizeIterator for FdbValuesIter {
381    #[inline]
382    fn len(&self) -> usize {
383        (self.len - self.pos) as usize
384    }
385}
386impl DoubleEndedIterator for FdbValuesIter {
387    fn next_back(&mut self) -> Option<Self::Item> {
388        self.nth_back(0)
389    }
390
391    fn nth_back(&mut self, n: usize) -> Option<Self::Item> {
392        if n < self.len() {
393            self.len -= 1 + n as i32;
394            // safe because len < original len
395            let keyvalue = unsafe { self.keyvalues.add(self.len as usize) };
396            Some(FdbValue {
397                _f: self.f.clone(),
398                keyvalue,
399            })
400        } else {
401            self.pos = self.len;
402            None
403        }
404    }
405}
406
/// A keyvalue you can own
///
/// Until dropped, this might prevent multiple key/values from being freed.
/// (i.e. the future that owns the data is dropped once all data it provided is dropped)
pub struct FdbValue {
    // Shared handle keeping the future alive while this value exists.
    _f: Arc<FdbFutureHandle>,
    // Pointer to this keyvalue inside the future's keyvalue array.
    keyvalue: *const FdbKeyValue,
}

// NOTE(review): assumes the pointed-to keyvalue may be read from any thread
// while the future handle is alive — confirm against the C API documentation.
unsafe impl Send for FdbValue {}
417
418impl Deref for FdbValue {
419    type Target = FdbKeyValue;
420    fn deref(&self) -> &Self::Target {
421        assert_eq_size!(FdbKeyValue, fdb_sys::FDBKeyValue);
422        assert_eq_align!(FdbKeyValue, u8);
423        unsafe { &*self.keyvalue }
424    }
425}
426impl AsRef<FdbKeyValue> for FdbValue {
427    fn as_ref(&self) -> &FdbKeyValue {
428        self.deref()
429    }
430}
431impl PartialEq for FdbValue {
432    fn eq(&self, other: &Self) -> bool {
433        self.deref() == other.deref()
434    }
435}
436impl Eq for FdbValue {}
437impl fmt::Debug for FdbValue {
438    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
439        self.deref().fmt(f)
440    }
441}
442
/// A keyvalue owned by a foundationDB future
///
/// Because the data it represents is owned by the future in `FdbValues`, you
/// can never own an `FdbKeyValue` directly; you can only have references to
/// it. This way, you can never obtain a lifetime greater than the lifetime of
/// the slice that gave you access to it.
#[repr(C, packed)]
pub struct FdbKeyValue(fdb_sys::FDBKeyValue);
451
452impl FdbKeyValue {
453    /// key
454    pub fn key(&self) -> &[u8] {
455        // This cast to `*const u8` isn't unnecessary in all configurations.
456        #[allow(clippy::unnecessary_cast)]
457        from_raw_fdb_slice(self.0.key as *const u8, self.0.key_length as usize)
458    }
459
460    /// value
461    pub fn value(&self) -> &[u8] {
462        // This cast to `*const u8` isn't unnecessary in all configurations.
463        #[allow(clippy::unnecessary_cast)]
464        unsafe {
465            std::slice::from_raw_parts(self.0.value as *const u8, self.0.value_length as usize)
466        }
467    }
468}
469
470impl PartialEq for FdbKeyValue {
471    fn eq(&self, other: &Self) -> bool {
472        (self.key(), self.value()) == (other.key(), other.value())
473    }
474}
475impl Eq for FdbKeyValue {}
476impl fmt::Debug for FdbKeyValue {
477    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
478        write!(
479            f,
480            "({:?}, {:?})",
481            crate::tuple::Bytes::from(self.key()),
482            crate::tuple::Bytes::from(self.value())
483        )
484    }
485}
486
487impl TryFrom<FdbFutureHandle> for i64 {
488    type Error = FdbError;
489
490    fn try_from(f: FdbFutureHandle) -> FdbResult<Self> {
491        let mut version: i64 = 0;
492        error::eval(unsafe {
493            if_cfg_api_versions!(min = 620 => {
494                fdb_sys::fdb_future_get_int64(f.as_ptr(), &mut version)
495            } else {
496                fdb_sys::fdb_future_get_version(f.as_ptr(), &mut version)
497            })
498        })?;
499        Ok(version)
500    }
501}
502
/// Conversion for futures that carry no value: nothing is read from the
/// handle and the conversion always succeeds.
impl TryFrom<FdbFutureHandle> for () {
    type Error = FdbError;
    // The handle is unused and simply dropped when this function returns.
    fn try_from(_f: FdbFutureHandle) -> FdbResult<Self> {
        Ok(())
    }
}