foundationdb/
future.rs

1// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors
2// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors.
3//
4// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
5// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
6// http://opensource.org/licenses/MIT>, at your option. This file may not be
7// copied, modified, or distributed except according to those terms.
8
//! Most functions in the FoundationDB API are asynchronous, meaning that they
//! may return to the caller before actually delivering their result.
11//!
//! These functions always return `FDBFuture*`. An `FDBFuture` object represents a
//! result value or error to be delivered at some future time. You can wait for
14//! a Future to be “ready” – to have a value or error delivered – by setting a
15//! callback function, or by blocking a thread, or by polling. Once a Future is
16//! ready, you can extract either an error code or a value of the appropriate
17//! type (the documentation for the original function will tell you which
18//! fdb_future_get_*() function you should call).
19//!
//! Futures make it easy to do multiple operations in parallel, by calling several
//! asynchronous functions before waiting for any of the results. This can be
//! important for reducing the latency of transactions.
23//!
24
25use std::convert::TryFrom;
26use std::ffi::CStr;
27use std::fmt;
28use std::ops::Deref;
29use std::os::raw::c_char;
30use std::pin::Pin;
31use std::ptr::NonNull;
32use std::sync::Arc;
33
34#[cfg_api_versions(min = 700)]
35pub use crate::fdb_keys::FdbKeys;
36use crate::from_raw_fdb_slice;
37#[cfg_api_versions(min = 710)]
38pub use crate::mapped_key_values::MappedKeyValues;
39use fdb_sys::if_cfg_api_versions;
40use foundationdb_macros::cfg_api_versions;
41use foundationdb_sys as fdb_sys;
42use futures::prelude::*;
43use futures::task::{AtomicWaker, Context, Poll};
44
45use crate::{error, FdbError, FdbResult};
46
47/// An opaque type that represents a Future in the FoundationDB C API.
48pub(crate) struct FdbFutureHandle(NonNull<fdb_sys::FDBFuture>);
49
50impl FdbFutureHandle {
51    pub const fn as_ptr(&self) -> *mut fdb_sys::FDBFuture {
52        self.0.as_ptr()
53    }
54}
55unsafe impl Sync for FdbFutureHandle {}
56unsafe impl Send for FdbFutureHandle {}
57impl Drop for FdbFutureHandle {
58    fn drop(&mut self) {
59        // `fdb_future_destroy` cancels the future, so we don't need to call
60        // `fdb_future_cancel` explicitly.
61        unsafe { fdb_sys::fdb_future_destroy(self.as_ptr()) }
62    }
63}
64
65/// An opaque type that represents a pending Future that will be converted to a
66/// predefined result type.
67///
68/// Non owned result type (Fdb
69pub(crate) struct FdbFuture<T> {
70    f: Option<FdbFutureHandle>,
71    waker: Option<Arc<AtomicWaker>>,
72    phantom: std::marker::PhantomData<T>,
73}
74
75impl<T> FdbFuture<T>
76where
77    T: TryFrom<FdbFutureHandle, Error = FdbError> + Unpin,
78{
79    pub(crate) fn new(f: *mut fdb_sys::FDBFuture) -> Self {
80        Self {
81            f: Some(FdbFutureHandle(
82                NonNull::new(f).expect("FDBFuture to not be null"),
83            )),
84            waker: None,
85            phantom: std::marker::PhantomData,
86        }
87    }
88}
89
90impl<T> Future for FdbFuture<T>
91where
92    T: TryFrom<FdbFutureHandle, Error = FdbError> + Unpin,
93{
94    type Output = FdbResult<T>;
95
96    #[cfg_attr(
97        feature = "trace",
98        tracing::instrument(level = "debug", skip(self, cx))
99    )]
100    fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<FdbResult<T>> {
101        let f = self.f.as_ref().expect("cannot poll after resolve");
102        let ready = unsafe { fdb_sys::fdb_future_is_ready(f.as_ptr()) };
103        if ready == 0 {
104            let f_ptr = f.as_ptr();
105            let mut register = false;
106            let waker = self.waker.get_or_insert_with(|| {
107                register = true;
108                Arc::new(AtomicWaker::new())
109            });
110            waker.register(cx.waker());
111            if register {
112                let network_waker: Arc<AtomicWaker> = waker.clone();
113                let network_waker_ptr = Arc::into_raw(network_waker);
114                unsafe {
115                    fdb_sys::fdb_future_set_callback(
116                        f_ptr,
117                        Some(fdb_future_callback),
118                        network_waker_ptr as *mut _,
119                    );
120                }
121            }
122            Poll::Pending
123        } else {
124            Poll::Ready(
125                error::eval(unsafe { fdb_sys::fdb_future_get_error(f.as_ptr()) })
126                    .and_then(|()| T::try_from(self.f.take().expect("self.f.is_some()"))),
127            )
128        }
129    }
130}
131
132pub static CUSTOM_EXECUTOR_HOOK: std::sync::OnceLock<fn()> = std::sync::OnceLock::new();
133
134// The callback from fdb C API can be called from multiple threads. so this callback should be
135// thread-safe.
136extern "C" fn fdb_future_callback(
137    _f: *mut fdb_sys::FDBFuture,
138    callback_parameter: *mut ::std::os::raw::c_void,
139) {
140    // resolve all wakeup chain
141    let network_waker: Arc<AtomicWaker> = unsafe { Arc::from_raw(callback_parameter as *const _) };
142    network_waker.wake();
143
144    // polling phase for a custom executor (e.g. the simulation)
145    if let Some(poll_pending_tasks) = CUSTOM_EXECUTOR_HOOK.get() {
146        poll_pending_tasks();
147    }
148}
149
150/// A slice of bytes owned by a foundationDB future
151pub struct FdbSlice {
152    _f: FdbFutureHandle,
153    value: *const u8,
154    len: i32,
155}
156unsafe impl Sync for FdbSlice {}
157unsafe impl Send for FdbSlice {}
158
159impl Deref for FdbSlice {
160    type Target = [u8];
161    fn deref(&self) -> &Self::Target {
162        from_raw_fdb_slice(self.value, self.len as usize)
163    }
164}
165impl AsRef<[u8]> for FdbSlice {
166    fn as_ref(&self) -> &[u8] {
167        self.deref()
168    }
169}
170
171impl TryFrom<FdbFutureHandle> for FdbSlice {
172    type Error = FdbError;
173
174    fn try_from(f: FdbFutureHandle) -> FdbResult<Self> {
175        let mut value = std::ptr::null();
176        let mut len = 0;
177
178        error::eval(unsafe { fdb_sys::fdb_future_get_key(f.as_ptr(), &mut value, &mut len) })?;
179
180        Ok(FdbSlice { _f: f, value, len })
181    }
182}
183
184impl TryFrom<FdbFutureHandle> for Option<FdbSlice> {
185    type Error = FdbError;
186
187    fn try_from(f: FdbFutureHandle) -> FdbResult<Self> {
188        let mut present = 0;
189        let mut value = std::ptr::null();
190        let mut len = 0;
191
192        error::eval(unsafe {
193            fdb_sys::fdb_future_get_value(f.as_ptr(), &mut present, &mut value, &mut len)
194        })?;
195
196        Ok(if present == 0 {
197            None
198        } else {
199            Some(FdbSlice { _f: f, value, len })
200        })
201    }
202}
203
204/// A slice of addresses owned by a foundationDB future
205pub struct FdbAddresses {
206    _f: FdbFutureHandle,
207    strings: *const FdbAddress,
208    len: i32,
209}
210unsafe impl Sync for FdbAddresses {}
211unsafe impl Send for FdbAddresses {}
212
213impl TryFrom<FdbFutureHandle> for FdbAddresses {
214    type Error = FdbError;
215
216    fn try_from(f: FdbFutureHandle) -> FdbResult<Self> {
217        let mut strings: *mut *const c_char = std::ptr::null_mut();
218        let mut len = 0;
219
220        error::eval(unsafe {
221            fdb_sys::fdb_future_get_string_array(f.as_ptr(), &mut strings, &mut len)
222        })?;
223
224        Ok(FdbAddresses {
225            _f: f,
226            strings: strings as *const FdbAddress,
227            len,
228        })
229    }
230}
231
232impl Deref for FdbAddresses {
233    type Target = [FdbAddress];
234
235    fn deref(&self) -> &Self::Target {
236        assert_eq_size!(FdbAddress, *const c_char);
237        assert_eq_align!(FdbAddress, u8);
238        from_raw_fdb_slice(self.strings, self.len as usize)
239    }
240}
241impl AsRef<[FdbAddress]> for FdbAddresses {
242    fn as_ref(&self) -> &[FdbAddress] {
243        self.deref()
244    }
245}
246
/// A single address whose storage is owned by a FoundationDB future.
///
/// The data belongs to the future held by [`FdbAddresses`], so an `FdbAddress`
/// is only ever handed out by reference and cannot be owned directly. This
/// keeps any borrow from outliving the slice that gave access to it.
#[repr(C, packed)]
pub struct FdbAddress {
    /// Pointer to the C string holding the address.
    c_str: *const c_char,
}

impl Deref for FdbAddress {
    type Target = CStr;

    /// Views the address as a [`CStr`].
    fn deref(&self) -> &CStr {
        // Copy the pointer out of the packed struct before using it.
        let raw = self.c_str;
        unsafe { CStr::from_ptr(raw) }
    }
}

impl AsRef<CStr> for FdbAddress {
    /// Same view as [`Deref::deref`].
    fn as_ref(&self) -> &CStr {
        &**self
    }
}
270
271/// An slice of keyvalues owned by a foundationDB future
272pub struct FdbValues {
273    _f: FdbFutureHandle,
274    keyvalues: *const FdbKeyValue,
275    len: i32,
276    more: bool,
277}
278unsafe impl Sync for FdbValues {}
279unsafe impl Send for FdbValues {}
280
281impl FdbValues {
282    /// `true` if there is another range after this one
283    pub fn more(&self) -> bool {
284        self.more
285    }
286}
287
288impl TryFrom<FdbFutureHandle> for FdbValues {
289    type Error = FdbError;
290    fn try_from(f: FdbFutureHandle) -> FdbResult<Self> {
291        let mut keyvalues = std::ptr::null();
292        let mut len = 0;
293        let mut more = 0;
294
295        unsafe {
296            error::eval(fdb_sys::fdb_future_get_keyvalue_array(
297                f.as_ptr(),
298                &mut keyvalues,
299                &mut len,
300                &mut more,
301            ))?
302        }
303
304        Ok(FdbValues {
305            _f: f,
306            keyvalues: keyvalues as *const FdbKeyValue,
307            len,
308            more: more != 0,
309        })
310    }
311}
312
313impl Deref for FdbValues {
314    type Target = [FdbKeyValue];
315    fn deref(&self) -> &Self::Target {
316        assert_eq_size!(FdbKeyValue, fdb_sys::FDBKeyValue);
317        assert_eq_align!(FdbKeyValue, u8);
318
319        from_raw_fdb_slice(self.keyvalues, self.len as usize)
320    }
321}
322impl AsRef<[FdbKeyValue]> for FdbValues {
323    fn as_ref(&self) -> &[FdbKeyValue] {
324        self.deref()
325    }
326}
327
328impl<'a> IntoIterator for &'a FdbValues {
329    type Item = &'a FdbKeyValue;
330    type IntoIter = std::slice::Iter<'a, FdbKeyValue>;
331
332    fn into_iter(self) -> Self::IntoIter {
333        self.deref().iter()
334    }
335}
336impl IntoIterator for FdbValues {
337    type Item = FdbValue;
338    type IntoIter = FdbValuesIter;
339
340    fn into_iter(self) -> Self::IntoIter {
341        FdbValuesIter {
342            f: Arc::new(self._f),
343            keyvalues: self.keyvalues,
344            len: self.len,
345            pos: 0,
346        }
347    }
348}
349
350/// An iterator of keyvalues owned by a foundationDB future
351pub struct FdbValuesIter {
352    f: Arc<FdbFutureHandle>,
353    keyvalues: *const FdbKeyValue,
354    len: i32,
355    pos: i32,
356}
357
358unsafe impl Send for FdbValuesIter {}
359
360impl Iterator for FdbValuesIter {
361    type Item = FdbValue;
362    fn next(&mut self) -> Option<Self::Item> {
363        #[allow(clippy::iter_nth_zero)]
364        self.nth(0)
365    }
366
367    fn nth(&mut self, n: usize) -> Option<Self::Item> {
368        let pos = (self.pos as usize).checked_add(n);
369        match pos {
370            Some(pos) if pos < self.len as usize => {
371                // safe because pos < self.len
372                let keyvalue = unsafe { self.keyvalues.add(pos) };
373                self.pos = pos as i32 + 1;
374
375                Some(FdbValue {
376                    _f: self.f.clone(),
377                    keyvalue,
378                })
379            }
380            _ => {
381                self.pos = self.len;
382                None
383            }
384        }
385    }
386
387    fn size_hint(&self) -> (usize, Option<usize>) {
388        let rem = (self.len - self.pos) as usize;
389        (rem, Some(rem))
390    }
391}
392impl ExactSizeIterator for FdbValuesIter {
393    #[inline]
394    fn len(&self) -> usize {
395        (self.len - self.pos) as usize
396    }
397}
398impl DoubleEndedIterator for FdbValuesIter {
399    fn next_back(&mut self) -> Option<Self::Item> {
400        self.nth_back(0)
401    }
402
403    fn nth_back(&mut self, n: usize) -> Option<Self::Item> {
404        if n < self.len() {
405            self.len -= 1 + n as i32;
406            // safe because len < original len
407            let keyvalue = unsafe { self.keyvalues.add(self.len as usize) };
408            Some(FdbValue {
409                _f: self.f.clone(),
410                keyvalue,
411            })
412        } else {
413            self.pos = self.len;
414            None
415        }
416    }
417}
418
419/// A keyvalue you can own
420///
421/// Until dropped, this might prevent multiple key/values from beeing freed.
422/// (i.e. the future that own the data is dropped once all data it provided is dropped)
423pub struct FdbValue {
424    _f: Arc<FdbFutureHandle>,
425    keyvalue: *const FdbKeyValue,
426}
427
428unsafe impl Send for FdbValue {}
429
430impl Deref for FdbValue {
431    type Target = FdbKeyValue;
432    fn deref(&self) -> &Self::Target {
433        assert_eq_size!(FdbKeyValue, fdb_sys::FDBKeyValue);
434        assert_eq_align!(FdbKeyValue, u8);
435        unsafe { &*self.keyvalue }
436    }
437}
438impl AsRef<FdbKeyValue> for FdbValue {
439    fn as_ref(&self) -> &FdbKeyValue {
440        self.deref()
441    }
442}
443impl PartialEq for FdbValue {
444    fn eq(&self, other: &Self) -> bool {
445        self.deref() == other.deref()
446    }
447}
448impl Eq for FdbValue {}
449impl fmt::Debug for FdbValue {
450    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
451        self.deref().fmt(f)
452    }
453}
454
455/// A keyvalue owned by a foundationDB future
456///
457/// Because the data it represent is owned by the future in FdbValues, you
458/// can never own a FdbKeyValue directly, you can only have references to it.
459/// This way, you can never obtain a lifetime greater than the lifetime of the
460/// slice that gave you access to it.
461#[repr(C, packed)]
462pub struct FdbKeyValue(fdb_sys::FDBKeyValue);
463
464impl FdbKeyValue {
465    /// key
466    pub fn key(&self) -> &[u8] {
467        // This cast to `*const u8` isn't unnecessary in all configurations.
468        #[allow(clippy::unnecessary_cast)]
469        from_raw_fdb_slice(self.0.key as *const u8, self.0.key_length as usize)
470    }
471
472    /// value
473    pub fn value(&self) -> &[u8] {
474        // This cast to `*const u8` isn't unnecessary in all configurations.
475        #[allow(clippy::unnecessary_cast)]
476        unsafe {
477            std::slice::from_raw_parts(self.0.value as *const u8, self.0.value_length as usize)
478        }
479    }
480}
481
482impl PartialEq for FdbKeyValue {
483    fn eq(&self, other: &Self) -> bool {
484        (self.key(), self.value()) == (other.key(), other.value())
485    }
486}
487impl Eq for FdbKeyValue {}
488impl fmt::Debug for FdbKeyValue {
489    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
490        write!(
491            f,
492            "({:?}, {:?})",
493            crate::tuple::Bytes::from(self.key()),
494            crate::tuple::Bytes::from(self.value())
495        )
496    }
497}
498
499impl TryFrom<FdbFutureHandle> for i64 {
500    type Error = FdbError;
501
502    fn try_from(f: FdbFutureHandle) -> FdbResult<Self> {
503        let mut version: i64 = 0;
504        error::eval(unsafe {
505            if_cfg_api_versions!(min = 620 => {
506                fdb_sys::fdb_future_get_int64(f.as_ptr(), &mut version)
507            } else {
508                fdb_sys::fdb_future_get_version(f.as_ptr(), &mut version)
509            })
510        })?;
511        Ok(version)
512    }
513}
514
515impl TryFrom<FdbFutureHandle> for () {
516    type Error = FdbError;
517    fn try_from(_f: FdbFutureHandle) -> FdbResult<Self> {
518        Ok(())
519    }
520}