foundationdb/
transaction.rs

1// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors
2// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors.
3//
4// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
5// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
6// http://opensource.org/licenses/MIT>, at your option. This file may not be
7// copied, modified, or distributed except according to those terms.
8
9//! Implementations of the FDBTransaction C API
10//!
11//! <https://apple.github.io/foundationdb/api-c.html#transaction>
12
13use foundationdb_sys as fdb_sys;
14use std::fmt;
15use std::ops::{Deref, Range, RangeInclusive};
16use std::ptr::NonNull;
17use std::sync::Arc;
18
19use crate::future::*;
20use crate::keyselector::*;
21use crate::options;
22
23use crate::{error, FdbError, FdbResult};
24use foundationdb_macros::cfg_api_versions;
25
26use crate::error::FdbBindingError;
27
28use futures::{
29    future, future::Either, stream, Future, FutureExt, Stream, TryFutureExt, TryStreamExt,
30};
31
/// Raw bytes of the `\xff/metadataVersion` key.
// NOTE(review): presumably the key used by the metadata-version helpers further down this
// file; the attribute below gates it with `min = 610` — confirm against the users.
#[cfg_api_versions(min = 610)]
const METADATA_VERSION_KEY: &[u8] = b"\xff/metadataVersion";
34
/// A committed transaction.
///
/// Wraps the underlying [`Transaction`]; `#[repr(transparent)]` gives this type the same
/// layout as the single field it holds. Use `reset` (or `Transaction::from`) to get a reusable
/// [`Transaction`] back.
#[derive(Debug)]
#[repr(transparent)]
pub struct TransactionCommitted {
    // The wrapped transaction handle.
    tr: Transaction,
}
41
42impl TransactionCommitted {
43    /// Retrieves the database version number at which a given transaction was committed.
44    ///
45    /// Read-only transactions do not modify the database when committed and will have a committed
46    /// version of -1. Keep in mind that a transaction which reads keys and then sets them to their
47    /// current values may be optimized to a read-only transaction.
48    ///
49    /// Note that database versions are not necessarily unique to a given transaction and so cannot
50    /// be used to determine in what order two transactions completed. The only use for this
51    /// function is to manually enforce causal consistency when calling `set_read_version()` on
52    /// another subsequent transaction.
53    ///
54    /// Most applications will not call this function.
55    pub fn committed_version(&self) -> FdbResult<i64> {
56        let mut version: i64 = 0;
57        error::eval(unsafe {
58            fdb_sys::fdb_transaction_get_committed_version(self.tr.inner.as_ptr(), &mut version)
59        })?;
60        Ok(version)
61    }
62
63    /// Reset the transaction to its initial state.
64    ///
65    /// This will not affect previously committed data.
66    ///
67    /// This is similar to dropping the transaction and creating a new one.
68    pub fn reset(mut self) -> Transaction {
69        self.tr.reset();
70        self.tr
71    }
72}
73impl From<TransactionCommitted> for Transaction {
74    fn from(tc: TransactionCommitted) -> Transaction {
75        tc.reset()
76    }
77}
78
/// A transaction that failed to commit.
///
/// Keeps both the transaction and the error, so the caller can either retry through
/// `on_error` or start over with `reset`.
pub struct TransactionCommitError {
    // The transaction whose commit failed.
    tr: Transaction,
    // The error the commit produced; exposed through `Deref` and `From`.
    err: FdbError,
}
84
85impl TransactionCommitError {
86    /// Implements the recommended retry and backoff behavior for a transaction. This function knows
87    /// which of the error codes generated by other `Transaction` functions represent temporary
88    /// error conditions and which represent application errors that should be handled by the
89    /// application. It also implements an exponential backoff strategy to avoid swamping the
90    /// database cluster with excessive retries when there is a high level of conflict between
91    /// transactions.
92    ///
93    /// You should not call this method most of the times and use `Database::transact` which
94    /// implements a retry loop strategy for you.
95    pub fn on_error(self) -> impl Future<Output = FdbResult<Transaction>> {
96        FdbFuture::<()>::new(unsafe {
97            fdb_sys::fdb_transaction_on_error(self.tr.inner.as_ptr(), self.err.code())
98        })
99        .map_ok(|()| self.tr)
100    }
101
102    /// Reset the transaction to its initial state.
103    ///
104    /// This is similar to dropping the transaction and creating a new one.
105    pub fn reset(mut self) -> Transaction {
106        self.tr.reset();
107        self.tr
108    }
109}
110
111impl Deref for TransactionCommitError {
112    type Target = FdbError;
113    fn deref(&self) -> &FdbError {
114        &self.err
115    }
116}
117
118impl From<TransactionCommitError> for FdbError {
119    fn from(tce: TransactionCommitError) -> FdbError {
120        tce.err
121    }
122}
123
124impl fmt::Debug for TransactionCommitError {
125    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
126        write!(f, "TransactionCommitError({})", self.err)
127    }
128}
129
impl fmt::Display for TransactionCommitError {
    /// Formats this value by delegating to the wrapped error's `fmt`.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // NOTE(review): which `fmt` this resolves to depends on the traits in scope for
        // `FdbError` — presumably `Display`; confirm.
        self.err.fmt(f)
    }
}
135
/// The result of `Transaction::commit`: the committed transaction on success, or the
/// transaction together with its error on failure.
type TransactionResult = Result<TransactionCommitted, TransactionCommitError>;
138
/// A cancelled transaction.
///
/// Wraps the underlying [`Transaction`]; `#[repr(transparent)]` gives this type the same
/// layout as the single field it holds. Use `reset` (or `Transaction::from`) to get a reusable
/// [`Transaction`] back.
#[derive(Debug)]
#[repr(transparent)]
pub struct TransactionCancelled {
    // The wrapped transaction handle.
    tr: Transaction,
}
145impl TransactionCancelled {
146    /// Reset the transaction to its initial state.
147    ///
148    /// This is similar to dropping the transaction and creating a new one.
149    pub fn reset(mut self) -> Transaction {
150        self.tr.reset();
151        self.tr
152    }
153}
154impl From<TransactionCancelled> for Transaction {
155    fn from(tc: TransactionCancelled) -> Transaction {
156        tc.reset()
157    }
158}
159
/// In FoundationDB, a transaction is a mutable snapshot of a database.
///
/// All read and write operations on a transaction see and modify an otherwise-unchanging version of the database and only change the underlying database if and when the transaction is committed. Read operations do see the effects of previous write operations on the same transaction. Committing a transaction usually succeeds in the absence of conflicts.
///
/// Applications must provide error handling and an appropriate retry loop around the application code for a transaction. See the documentation for [fdb_transaction_on_error()](https://apple.github.io/foundationdb/api-c.html#transaction).
///
/// Transactions group operations into a unit with the properties of atomicity, isolation, and durability. Transactions also provide the ability to maintain an application's invariants or integrity constraints, supporting the property of consistency. Together these properties are known as ACID.
///
/// Transactions are also causally consistent: once a transaction has been successfully committed, all subsequently created transactions will see the modifications made by it.
#[derive(Debug)]
pub struct Transaction {
    // Order of fields should not be changed, because Rust drops fields top-to-bottom, and the
    // transaction should be dropped before the cluster.
    // NOTE(review): `inner` is the only field declared here, so the ordering concern looks
    // historical — confirm before adding fields.
    //
    // Non-null raw handle to the C `FDBTransaction`.
    inner: NonNull<fdb_sys::FDBTransaction>,
}
// NOTE(review): these impls assert that the raw `FDBTransaction` handle may be moved to and
// shared between threads; nothing in this file proves it — confirm against the FoundationDB
// C API thread-safety guarantees.
unsafe impl Send for Transaction {}
unsafe impl Sync for Transaction {}
177
178/// Converts Rust `bool` into `fdb_sys::fdb_bool_t`
179#[inline]
180fn fdb_bool(v: bool) -> fdb_sys::fdb_bool_t {
181    if v {
182        1
183    } else {
184        0
185    }
186}
/// Converts a buffer length into the C `int` the FDB API expects.
///
/// `context` names the buffer and is only used in the panic message.
///
/// # Panics
///
/// Panics with `"<context>.len() > i32::MAX"` when `len` does not fit in an `i32`.
#[inline]
fn fdb_len(len: usize, context: &'static str) -> std::os::raw::c_int {
    match i32::try_from(len) {
        Ok(fitted) => fitted,
        Err(_) => panic!("{}.len() > i32::MAX", context),
    }
}
/// Converts an iteration counter into the C `int` the FDB API expects.
///
/// A counter that does not fit in an `i32` is replaced by `0`; per the original author's
/// note, that value makes the client report `client_invalid_operation`.
#[inline]
fn fdb_iteration(iteration: usize) -> std::os::raw::c_int {
    i32::try_from(iteration).unwrap_or(0)
}
/// Converts a limit into the C `int` the FDB API expects, saturating at `i32::MAX`.
#[inline]
fn fdb_limit(v: usize) -> std::os::raw::c_int {
    let capped = v.min(i32::MAX as usize);
    capped as i32
}
208
/// `RangeOption` represents the query parameters of a range scan.
///
/// You can construct `RangeOption` easily:
///
/// ```
/// use foundationdb::RangeOption;
///
/// let opt = RangeOption::from((b"begin".as_ref(), b"end".as_ref()));
/// let opt: RangeOption = (b"begin".as_ref()..b"end".as_ref()).into();
/// let opt = RangeOption {
///     limit: Some(10),
///     ..RangeOption::from((b"begin".as_ref(), b"end".as_ref()))
/// };
/// ```
#[derive(Debug, Clone)]
pub struct RangeOption<'a> {
    /// The beginning of the range.
    pub begin: KeySelector<'a>,
    /// The end of the range.
    pub end: KeySelector<'a>,
    /// If `Some` and non-zero, indicates the maximum number of key-value pairs to return.
    /// `None` is forwarded to FoundationDB as `0`.
    pub limit: Option<usize>,
    /// If non-zero, indicates a (soft) cap on the combined number of bytes of keys and values to
    /// return for each item.
    pub target_bytes: usize,
    /// One of the options::StreamingMode values indicating how the caller would like the data in
    /// the range returned.
    pub mode: options::StreamingMode,
    /// If true, key-value pairs will be returned in reverse lexicographical order beginning at
    /// the end of the range.
    pub reverse: bool,
    // Hidden zero-sized marker field.
    // NOTE(review): presumably here so callers build values through `Default`/`From` plus
    // struct-update syntax rather than exhaustive literals — confirm.
    #[doc(hidden)]
    pub __non_exhaustive: std::marker::PhantomData<()>,
}
243
244impl RangeOption<'_> {
245    /// Reverses the range direction.
246    pub fn rev(mut self) -> Self {
247        self.reverse = !self.reverse;
248        self
249    }
250
251    pub fn next_range(mut self, kvs: &FdbValues) -> Option<Self> {
252        if !kvs.more() {
253            return None;
254        }
255
256        let last = kvs.last()?;
257        let last_key = last.key();
258
259        if let Some(limit) = self.limit.as_mut() {
260            *limit = limit.saturating_sub(kvs.len());
261            if *limit == 0 {
262                return None;
263            }
264        }
265
266        if self.reverse {
267            self.end.make_first_greater_or_equal(last_key);
268        } else {
269            self.begin.make_first_greater_than(last_key);
270        }
271        Some(self)
272    }
273
274    #[cfg_api_versions(min = 710)]
275    pub(crate) fn next_mapped_range(mut self, kvs: &MappedKeyValues) -> Option<Self> {
276        if !kvs.more() {
277            return None;
278        }
279
280        let last = kvs.last()?;
281        let last_key = last.parent_key();
282
283        if let Some(limit) = self.limit.as_mut() {
284            *limit = limit.saturating_sub(kvs.len());
285            if *limit == 0 {
286                return None;
287            }
288        }
289
290        if self.reverse {
291            self.end.make_first_greater_or_equal(last_key);
292        } else {
293            self.begin.make_first_greater_than(last_key);
294        }
295        Some(self)
296    }
297}
298
299impl Default for RangeOption<'_> {
300    fn default() -> Self {
301        Self {
302            begin: KeySelector::first_greater_or_equal([].as_ref()),
303            end: KeySelector::first_greater_or_equal([].as_ref()),
304            limit: None,
305            target_bytes: 0,
306            mode: options::StreamingMode::Iterator,
307            reverse: false,
308            __non_exhaustive: std::marker::PhantomData,
309        }
310    }
311}
312
313impl<'a> From<(KeySelector<'a>, KeySelector<'a>)> for RangeOption<'a> {
314    fn from((begin, end): (KeySelector<'a>, KeySelector<'a>)) -> Self {
315        Self {
316            begin,
317            end,
318            ..Self::default()
319        }
320    }
321}
322impl From<(Vec<u8>, Vec<u8>)> for RangeOption<'static> {
323    fn from((begin, end): (Vec<u8>, Vec<u8>)) -> Self {
324        Self {
325            begin: KeySelector::first_greater_or_equal(begin),
326            end: KeySelector::first_greater_or_equal(end),
327            ..Self::default()
328        }
329    }
330}
331impl<'a> From<(&'a [u8], &'a [u8])> for RangeOption<'a> {
332    fn from((begin, end): (&'a [u8], &'a [u8])) -> Self {
333        Self {
334            begin: KeySelector::first_greater_or_equal(begin),
335            end: KeySelector::first_greater_or_equal(end),
336            ..Self::default()
337        }
338    }
339}
340impl<'a> From<std::ops::Range<KeySelector<'a>>> for RangeOption<'a> {
341    fn from(range: Range<KeySelector<'a>>) -> Self {
342        RangeOption::from((range.start, range.end))
343    }
344}
345
346impl<'a> From<std::ops::Range<&'a [u8]>> for RangeOption<'a> {
347    fn from(range: Range<&'a [u8]>) -> Self {
348        RangeOption::from((range.start, range.end))
349    }
350}
351
352impl From<std::ops::Range<std::vec::Vec<u8>>> for RangeOption<'static> {
353    fn from(range: Range<Vec<u8>>) -> Self {
354        RangeOption::from((range.start, range.end))
355    }
356}
357
358impl<'a> From<std::ops::RangeInclusive<&'a [u8]>> for RangeOption<'a> {
359    fn from(range: RangeInclusive<&'a [u8]>) -> Self {
360        let (start, end) = range.into_inner();
361        (KeySelector::first_greater_or_equal(start)..KeySelector::first_greater_than(end)).into()
362    }
363}
364
365impl From<std::ops::RangeInclusive<std::vec::Vec<u8>>> for RangeOption<'static> {
366    fn from(range: RangeInclusive<Vec<u8>>) -> Self {
367        let (start, end) = range.into_inner();
368        (KeySelector::first_greater_or_equal(start)..KeySelector::first_greater_than(end)).into()
369    }
370}
371
372impl Transaction {
373    pub(crate) fn new(inner: NonNull<fdb_sys::FDBTransaction>) -> Self {
374        Self { inner }
375    }
376
377    /// Called to set an option on an FDBTransaction.
378    pub fn set_option(&self, opt: options::TransactionOption) -> FdbResult<()> {
379        unsafe { opt.apply(self.inner.as_ptr()) }
380    }
381
382    /// Pass through an option given a code and raw data. Useful when creating a passthrough layer
383    /// where the code and data will be provided as raw, in order to avoid deserializing to an option
384    /// and serializing it back to code and data.
385    /// In general, you should use `set_option`.
386    pub fn set_raw_option(
387        &self,
388        code: fdb_sys::FDBTransactionOption,
389        data: Option<Vec<u8>>,
390    ) -> FdbResult<()> {
391        let (data_ptr, size) = data
392            .as_ref()
393            .map(|data| {
394                (
395                    data.as_ptr(),
396                    i32::try_from(data.len()).expect("len to fit in i32"),
397                )
398            })
399            .unwrap_or_else(|| (std::ptr::null(), 0));
400        let err = unsafe {
401            fdb_sys::fdb_transaction_set_option(self.inner.as_ptr(), code, data_ptr, size)
402        };
403        if err != 0 {
404            Err(FdbError::from_code(err))
405        } else {
406            Ok(())
407        }
408    }
409
410    /// Modify the database snapshot represented by transaction to change the given
411    /// key to have the given value.
412    ///
413    /// If the given key was not previously present in the database it is inserted.
414    /// The modification affects the actual database only if transaction is later
415    /// committed with `Transaction::commit`.
416    ///
417    /// # Arguments
418    ///
419    /// * `key` - the name of the key to be inserted into the database.
420    /// * `value` - the value to be inserted into the database
421    pub fn set(&self, key: &[u8], value: &[u8]) {
422        unsafe {
423            fdb_sys::fdb_transaction_set(
424                self.inner.as_ptr(),
425                key.as_ptr(),
426                fdb_len(key.len(), "key"),
427                value.as_ptr(),
428                fdb_len(value.len(), "value"),
429            )
430        }
431    }
432
433    /// Modify the database snapshot represented by transaction to remove the given key from the
434    /// database.
435    ///
436    /// If the key was not previously present in the database, there is no effect. The modification
437    /// affects the actual database only if transaction is later committed with
438    /// `Transaction::commit`.
439    ///
440    /// # Arguments
441    ///
442    /// * `key` - the name of the key to be removed from the database.
443    pub fn clear(&self, key: &[u8]) {
444        unsafe {
445            fdb_sys::fdb_transaction_clear(
446                self.inner.as_ptr(),
447                key.as_ptr(),
448                fdb_len(key.len(), "key"),
449            )
450        }
451    }
452
453    /// Reads a value from the database snapshot represented by transaction.
454    ///
455    /// Returns an FDBFuture which will be set to the value of key in the database if there is any.
456    ///
457    /// # Arguments
458    ///
459    /// * `key` - the name of the key to be looked up in the database
460    /// * `snapshot` - `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
461    pub fn get(
462        &self,
463        key: &[u8],
464        snapshot: bool,
465    ) -> impl Future<Output = FdbResult<Option<FdbSlice>>> + Send + Sync + Unpin {
466        FdbFuture::new(unsafe {
467            fdb_sys::fdb_transaction_get(
468                self.inner.as_ptr(),
469                key.as_ptr(),
470                fdb_len(key.len(), "key"),
471                fdb_bool(snapshot),
472            )
473        })
474    }
475
476    /// Modify the database snapshot represented by transaction to perform the operation indicated
477    /// by operationType with operand param to the value stored by the given key.
478    ///
479    /// An atomic operation is a single database command that carries out several logical steps:
480    /// reading the value of a key, performing a transformation on that value, and writing the
481    /// result. Different atomic operations perform different transformations. Like other database
482    /// operations, an atomic operation is used within a transaction; however, its use within a
483    /// transaction will not cause the transaction to conflict.
484    ///
485    /// Atomic operations do not expose the current value of the key to the client but simply send
486    /// the database the transformation to apply. In regard to conflict checking, an atomic
487    /// operation is equivalent to a write without a read. It can only cause other transactions
488    /// performing reads of the key to conflict.
489    ///
490    /// By combining these logical steps into a single, read-free operation, FoundationDB can
491    /// guarantee that the transaction will not conflict due to the operation. This makes atomic
492    /// operations ideal for operating on keys that are frequently modified. A common example is
493    /// the use of a key-value pair as a counter.
494    ///
495    /// # Warning
496    ///
497    /// If a transaction uses both an atomic operation and a strictly serializable read on the same
498    /// key, the benefits of using the atomic operation (for both conflict checking and performance)
499    /// are lost.
500    pub fn atomic_op(&self, key: &[u8], param: &[u8], op_type: options::MutationType) {
501        unsafe {
502            fdb_sys::fdb_transaction_atomic_op(
503                self.inner.as_ptr(),
504                key.as_ptr(),
505                fdb_len(key.len(), "key"),
506                param.as_ptr(),
507                fdb_len(param.len(), "param"),
508                op_type.code(),
509            )
510        }
511    }
512
513    /// Resolves a key selector against the keys in the database snapshot represented by
514    /// transaction.
515    ///
516    /// Returns an FDBFuture which will be set to the key in the database matching the key
517    /// selector.
518    ///
519    /// # Arguments
520    ///
521    /// * `selector`: the key selector
522    /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
523    pub fn get_key(
524        &self,
525        selector: &KeySelector,
526        snapshot: bool,
527    ) -> impl Future<Output = FdbResult<FdbSlice>> + Send + Sync + Unpin {
528        let key = selector.key();
529        FdbFuture::new(unsafe {
530            fdb_sys::fdb_transaction_get_key(
531                self.inner.as_ptr(),
532                key.as_ptr(),
533                fdb_len(key.len(), "key"),
534                fdb_bool(selector.or_equal()),
535                selector.offset(),
536                fdb_bool(snapshot),
537            )
538        })
539    }
540
541    /// Reads all key-value pairs in the database snapshot represented by transaction (potentially
542    /// limited by limit, target_bytes, or mode) which have a key lexicographically greater than or
543    /// equal to the key resolved by the begin key selector and lexicographically less than the key
544    /// resolved by the end key selector.
545    ///
546    /// Returns a stream of KeyValue slices.
547    ///
548    /// This method is a little more efficient than `get_ranges_keyvalues` but a little harder to
549    /// use.
550    ///
551    /// # Arguments
552    ///
553    /// * `opt`: the range, limit, target_bytes and mode
554    /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
555    pub fn get_ranges<'a>(
556        &'a self,
557        opt: RangeOption<'a>,
558        snapshot: bool,
559    ) -> impl Stream<Item = FdbResult<FdbValues>> + Send + Sync + Unpin + 'a {
560        stream::unfold((1, Some(opt)), move |(iteration, maybe_opt)| {
561            if let Some(opt) = maybe_opt {
562                Either::Left(self.get_range(&opt, iteration as usize, snapshot).map(
563                    move |maybe_values| {
564                        let next_opt = match &maybe_values {
565                            Ok(values) => opt.next_range(values),
566                            Err(..) => None,
567                        };
568                        Some((maybe_values, (iteration + 1, next_opt)))
569                    },
570                ))
571            } else {
572                Either::Right(future::ready(None))
573            }
574        })
575    }
576
577    /// Reads all key-value pairs in the database snapshot represented by transaction (potentially
578    /// limited by limit, target_bytes, or mode) which have a key lexicographically greater than or
579    /// equal to the key resolved by the begin key selector and lexicographically less than the key
580    /// resolved by the end key selector.
581    ///
582    /// Returns a stream of KeyValue.
583    ///
584    /// # Arguments
585    ///
586    /// * `opt`: the range, limit, target_bytes and mode
587    /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
588    pub fn get_ranges_keyvalues<'a>(
589        &'a self,
590        opt: RangeOption<'a>,
591        snapshot: bool,
592    ) -> impl Stream<Item = FdbResult<FdbValue>> + Unpin + 'a {
593        self.get_ranges(opt, snapshot)
594            .map_ok(|values| stream::iter(values.into_iter().map(Ok)))
595            .try_flatten()
596    }
597
598    /// Reads all key-value pairs in the database snapshot represented by transaction (potentially
599    /// limited by limit, target_bytes, or mode) which have a key lexicographically greater than or
600    /// equal to the key resolved by the begin key selector and lexicographically less than the key
601    /// resolved by the end key selector.
602    ///
603    /// # Arguments
604    ///
605    /// * `opt`: the range, limit, target_bytes and mode
606    /// * `iteration`: If opt.mode is Iterator, this parameter should start at 1 and be incremented
607    ///   by 1 for each successive call while reading this range. In all other cases it is ignored.
608    /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
609    pub fn get_range(
610        &self,
611        opt: &RangeOption,
612        iteration: usize,
613        snapshot: bool,
614    ) -> impl Future<Output = FdbResult<FdbValues>> + Send + Sync + Unpin {
615        let begin = &opt.begin;
616        let end = &opt.end;
617        let key_begin = begin.key();
618        let key_end = end.key();
619
620        FdbFuture::new(unsafe {
621            fdb_sys::fdb_transaction_get_range(
622                self.inner.as_ptr(),
623                key_begin.as_ptr(),
624                fdb_len(key_begin.len(), "key_begin"),
625                fdb_bool(begin.or_equal()),
626                begin.offset(),
627                key_end.as_ptr(),
628                fdb_len(key_end.len(), "key_end"),
629                fdb_bool(end.or_equal()),
630                end.offset(),
631                fdb_limit(opt.limit.unwrap_or(0)),
632                fdb_limit(opt.target_bytes),
633                opt.mode.code(),
634                fdb_iteration(iteration),
635                fdb_bool(snapshot),
636                fdb_bool(opt.reverse),
637            )
638        })
639    }
640
641    /// Mapped Range is an experimental feature introduced in FDB 7.1.
642    /// It is intended to improve the client throughput and reduce latency for querying data through a Subspace used as a "index".
643    /// In such a case, querying records by scanning an index in relational databases can be
644    /// translated to a GetRange request on the index entries followed up by multiple GetValue requests for the record entries in FDB.
645    ///
646    /// This method is allowing FoundationDB "follow up" a GetRange request with GetValue requests,
647    /// this can happen in one request without additional back and forth. Considering the overhead
648    /// of each request, this saves time and resources on serialization, deserialization, and network.
649    ///
650    /// A mapped request will:
651    ///
652    /// * Do a range query (same as a `Transaction.get_range` request) and get the result. We call it the primary query.
653    /// * For each key-value pair in the primary query result, translate it to a `get_range` query and get the result. We call them secondary queries.
654    /// * Put all results in a nested structure and return them.
655    ///
656    /// **WARNING** : This feature is considered experimental at this time. It is only allowed when
657    /// using snapshot isolation AND disabling read-your-writes.
658    ///
659    /// More info can be found in the relevant [documentation](https://github.com/apple/foundationdb/wiki/Everything-about-GetMappedRange#input).
660    ///
661    /// This is the "raw" version, users are expected to use [Transaction::get_mapped_ranges]
662    #[cfg_api_versions(min = 710)]
663    pub fn get_mapped_range(
664        &self,
665        opt: &RangeOption,
666        mapper: &[u8],
667        iteration: usize,
668        snapshot: bool,
669    ) -> impl Future<Output = FdbResult<MappedKeyValues>> + Send + Sync + Unpin {
670        let begin = &opt.begin;
671        let end = &opt.end;
672        let key_begin = begin.key();
673        let key_end = end.key();
674
675        FdbFuture::new(unsafe {
676            fdb_sys::fdb_transaction_get_mapped_range(
677                self.inner.as_ptr(),
678                key_begin.as_ptr(),
679                fdb_len(key_begin.len(), "key_begin"),
680                fdb_bool(begin.or_equal()),
681                begin.offset(),
682                key_end.as_ptr(),
683                fdb_len(key_end.len(), "key_end"),
684                fdb_bool(end.or_equal()),
685                end.offset(),
686                mapper.as_ptr(),
687                fdb_len(mapper.len(), "mapper_length"),
688                fdb_limit(opt.limit.unwrap_or(0)),
689                fdb_limit(opt.target_bytes),
690                opt.mode.code(),
691                fdb_iteration(iteration),
692                fdb_bool(snapshot),
693                fdb_bool(opt.reverse),
694            )
695        })
696    }
697
698    /// Mapped Range is an experimental feature introduced in FDB 7.1.
699    /// It is intended to improve the client throughput and reduce latency for querying data through a Subspace used as a "index".
700    /// In such a case, querying records by scanning an index in relational databases can be
701    /// translated to a GetRange request on the index entries followed up by multiple GetValue requests for the record entries in FDB.
702    ///
703    /// This method is allowing FoundationDB "follow up" a GetRange request with GetValue requests,
704    /// this can happen in one request without additional back and forth. Considering the overhead
705    /// of each request, this saves time and resources on serialization, deserialization, and network.
706    ///
707    /// A mapped request will:
708    ///
709    /// * Do a range query (same as a `Transaction.get_range` request) and get the result. We call it the primary query.
710    /// * For each key-value pair in the primary query result, translate it to a `get_range` query and get the result. We call them secondary queries.
711    /// * Put all results in a nested structure and return them.
712    ///
713    /// **WARNING** : This feature is considered experimental at this time. It is only allowed when
714    /// using snapshot isolation AND disabling read-your-writes.
715    ///
716    /// More info can be found in the relevant [documentation](https://github.com/apple/foundationdb/wiki/Everything-about-GetMappedRange#input).
717    #[cfg_api_versions(min = 710)]
718    pub fn get_mapped_ranges<'a>(
719        &'a self,
720        opt: RangeOption<'a>,
721        mapper: &'a [u8],
722        snapshot: bool,
723    ) -> impl Stream<Item = FdbResult<MappedKeyValues>> + Send + Sync + Unpin + 'a {
724        stream::unfold((1, Some(opt)), move |(iteration, maybe_opt)| {
725            if let Some(opt) = maybe_opt {
726                Either::Left(
727                    self.get_mapped_range(&opt, mapper, iteration as usize, snapshot)
728                        .map(move |maybe_values| {
729                            let next_opt = match &maybe_values {
730                                Ok(values) => opt.next_mapped_range(values),
731                                Err(..) => None,
732                            };
733                            Some((maybe_values, (iteration + 1, next_opt)))
734                        }),
735                )
736            } else {
737                Either::Right(future::ready(None))
738            }
739        })
740    }
741
742    /// Modify the database snapshot represented by transaction to remove all keys (if any) which
743    /// are lexicographically greater than or equal to the given begin key and lexicographically
744    /// less than the given end_key.
745    ///
746    /// The modification affects the actual database only if transaction is later committed with
747    /// `Transaction::commit`.
748    pub fn clear_range(&self, begin: &[u8], end: &[u8]) {
749        unsafe {
750            fdb_sys::fdb_transaction_clear_range(
751                self.inner.as_ptr(),
752                begin.as_ptr(),
753                fdb_len(begin.len(), "begin"),
754                end.as_ptr(),
755                fdb_len(end.len(), "end"),
756            )
757        }
758    }
759
760    /// Get the estimated byte size of the key range based on the byte sample collected by FDB
761    #[cfg_api_versions(min = 630)]
762    pub fn get_estimated_range_size_bytes(
763        &self,
764        begin: &[u8],
765        end: &[u8],
766    ) -> impl Future<Output = FdbResult<i64>> + Send + Sync + Unpin {
767        FdbFuture::<i64>::new(unsafe {
768            fdb_sys::fdb_transaction_get_estimated_range_size_bytes(
769                self.inner.as_ptr(),
770                begin.as_ptr(),
771                fdb_len(begin.len(), "begin"),
772                end.as_ptr(),
773                fdb_len(end.len(), "end"),
774            )
775        })
776    }
777
778    /// Attempts to commit the sets and clears previously applied to the database snapshot
779    /// represented by transaction to the actual database.
780    ///
781    /// The commit may or may not succeed – in particular, if a conflicting transaction previously
782    /// committed, then the commit must fail in order to preserve transactional isolation. If the
783    /// commit does succeed, the transaction is durably committed to the database and all
784    /// subsequently started transactions will observe its effects.
785    ///
786    /// It is not necessary to commit a read-only transaction – you can simply drop it.
787    ///
788    /// Callers will usually want to retry a transaction if the commit or a another method on the
789    /// transaction returns a retryable error (see `on_error` and/or `Database::transact`).
790    ///
791    /// As with other client/server databases, in some failure scenarios a client may be unable to
792    /// determine whether a transaction succeeded. In these cases, `Transaction::commit` will return
793    /// an error and `is_maybe_committed()` will returns true on that error. The `on_error` function
794    /// treats this error as retryable, so retry loops that don’t check for `is_maybe_committed()`
795    /// could execute the transaction twice. In these cases, you must consider the idempotence of
796    /// the transaction. For more information, see [Transactions with unknown results](https://apple.github.io/foundationdb/developer-guide.html#developer-guide-unknown-results).
797    ///
798    /// Normally, commit will wait for outstanding reads to return. However, if those reads were
799    /// snapshot reads or the transaction option for disabling “read-your-writes” has been invoked,
800    /// any outstanding reads will immediately return errors.
801    pub fn commit(self) -> impl Future<Output = TransactionResult> + Send + Sync + Unpin {
802        FdbFuture::<()>::new(unsafe { fdb_sys::fdb_transaction_commit(self.inner.as_ptr()) }).map(
803            move |r| match r {
804                Ok(()) => Ok(TransactionCommitted { tr: self }),
805                Err(err) => Err(TransactionCommitError { tr: self, err }),
806            },
807        )
808    }
809
810    /// Implements the recommended retry and backoff behavior for a transaction. This function knows
811    /// which of the error codes generated by other `Transaction` functions represent temporary
812    /// error conditions and which represent application errors that should be handled by the
813    /// application. It also implements an exponential backoff strategy to avoid swamping the
814    /// database cluster with excessive retries when there is a high level of conflict between
815    /// transactions.
816    ///
817    /// It is not necessary to call `reset()` when handling an error with `on_error()` since the
818    /// transaction has already been reset.
819    ///
820    /// You should not call this method most of the times and use `Database::transact` which
821    /// implements a retry loop strategy for you.
822    pub fn on_error(
823        self,
824        err: FdbError,
825    ) -> impl Future<Output = FdbResult<Transaction>> + Send + Sync + Unpin {
826        FdbFuture::<()>::new(unsafe {
827            fdb_sys::fdb_transaction_on_error(self.inner.as_ptr(), err.code())
828        })
829        .map_ok(|()| self)
830    }
831
832    /// Cancels the transaction. All pending or future uses of the transaction will return a
833    /// transaction_cancelled error. The transaction can be used again after it is reset.
834    pub fn cancel(self) -> TransactionCancelled {
835        unsafe { fdb_sys::fdb_transaction_cancel(self.inner.as_ptr()) };
836        TransactionCancelled { tr: self }
837    }
838
839    /// Returns a list of public network addresses as strings, one for each of the storage servers
840    /// responsible for storing key_name and its associated value.
841    pub fn get_addresses_for_key(
842        &self,
843        key: &[u8],
844    ) -> impl Future<Output = FdbResult<FdbAddresses>> + Send + Sync + Unpin {
845        FdbFuture::new(unsafe {
846            fdb_sys::fdb_transaction_get_addresses_for_key(
847                self.inner.as_ptr(),
848                key.as_ptr(),
849                fdb_len(key.len(), "key"),
850            )
851        })
852    }
853
854    /// A watch's behavior is relative to the transaction that created it. A watch will report a
855    /// change in relation to the key’s value as readable by that transaction. The initial value
856    /// used for comparison is either that of the transaction’s read version or the value as
857    /// modified by the transaction itself prior to the creation of the watch. If the value changes
858    /// and then changes back to its initial value, the watch might not report the change.
859    ///
860    /// Until the transaction that created it has been committed, a watch will not report changes
861    /// made by other transactions. In contrast, a watch will immediately report changes made by
862    /// the transaction itself. Watches cannot be created if the transaction has set the
863    /// READ_YOUR_WRITES_DISABLE transaction option, and an attempt to do so will return an
864    /// watches_disabled error.
865    ///
866    /// If the transaction used to create a watch encounters an error during commit, then the watch
867    /// will be set with that error. A transaction whose commit result is unknown will set all of
868    /// its watches with the commit_unknown_result error. If an uncommitted transaction is reset or
869    /// destroyed, then any watches it created will be set with the transaction_cancelled error.
870    ///
871    /// Returns an future representing an empty value that will be set once the watch has
872    /// detected a change to the value at the specified key.
873    ///
874    /// By default, each database connection can have no more than 10,000 watches that have not yet
875    /// reported a change. When this number is exceeded, an attempt to create a watch will return a
876    /// too_many_watches error. This limit can be changed using the MAX_WATCHES database option.
877    /// Because a watch outlives the transaction that creates it, any watch that is no longer
878    /// needed should be cancelled by dropping its future.
879    pub fn watch(&self, key: &[u8]) -> impl Future<Output = FdbResult<()>> + Send + Sync + Unpin {
880        FdbFuture::new(unsafe {
881            fdb_sys::fdb_transaction_watch(
882                self.inner.as_ptr(),
883                key.as_ptr(),
884                fdb_len(key.len(), "key"),
885            )
886        })
887    }
888
889    /// Returns an FDBFuture which will be set to the approximate transaction size so far in the
890    /// returned future, which is the summation of the estimated size of mutations, read conflict
891    /// ranges, and write conflict ranges.
892    ///
893    /// This can be called multiple times before the transaction is committed.
894    #[cfg_api_versions(min = 620)]
895    pub fn get_approximate_size(
896        &self,
897    ) -> impl Future<Output = FdbResult<i64>> + Send + Sync + Unpin {
898        FdbFuture::new(unsafe {
899            fdb_sys::fdb_transaction_get_approximate_size(self.inner.as_ptr())
900        })
901    }
902
903    /// Gets a list of keys that can split the given range into (roughly) equally sized chunks based on chunk_size.
904    /// Note: the returned split points contain the start key and end key of the given range.
905    #[cfg_api_versions(min = 700)]
906    pub fn get_range_split_points(
907        &self,
908        begin: &[u8],
909        end: &[u8],
910        chunk_size: i64,
911    ) -> impl Future<Output = FdbResult<FdbKeys>> + Send + Sync + Unpin {
912        FdbFuture::<FdbKeys>::new(unsafe {
913            fdb_sys::fdb_transaction_get_range_split_points(
914                self.inner.as_ptr(),
915                begin.as_ptr(),
916                fdb_len(begin.len(), "begin"),
917                end.as_ptr(),
918                fdb_len(end.len(), "end"),
919                chunk_size,
920            )
921        })
922    }
923
924    /// Returns an FDBFuture which will be set to the versionstamp which was used by any
925    /// versionstamp operations in this transaction.
926    ///
927    /// The future will be ready only after the successful completion of a call to `commit()` on
928    /// this Transaction. Read-only transactions do not modify the database when committed and will
929    /// result in the future completing with an error. Keep in mind that a transaction which reads
930    /// keys and then sets them to their current values may be optimized to a read-only transaction.
931    ///
932    /// Most applications will not call this function.
933    pub fn get_versionstamp(
934        &self,
935    ) -> impl Future<Output = FdbResult<FdbSlice>> + Send + Sync + Unpin {
936        FdbFuture::new(unsafe { fdb_sys::fdb_transaction_get_versionstamp(self.inner.as_ptr()) })
937    }
938
939    /// The transaction obtains a snapshot read version automatically at the time of the first call
940    /// to `get_*()` (including this one) and (unless causal consistency has been deliberately
941    /// compromised by transaction options) is guaranteed to represent all transactions which were
942    /// reported committed before that call.
943    pub fn get_read_version(&self) -> impl Future<Output = FdbResult<i64>> + Send + Sync + Unpin {
944        FdbFuture::new(unsafe { fdb_sys::fdb_transaction_get_read_version(self.inner.as_ptr()) })
945    }
946
947    /// Sets the snapshot read version used by a transaction.
948    ///
949    /// This is not needed in simple cases.
950    /// If the given version is too old, subsequent reads will fail with error_code_past_version;
951    /// if it is too new, subsequent reads may be delayed indefinitely and/or fail with
952    /// error_code_future_version. If any of get_*() have been called on this transaction already,
953    /// the result is undefined.
954    pub fn set_read_version(&self, version: i64) {
955        unsafe { fdb_sys::fdb_transaction_set_read_version(self.inner.as_ptr(), version) }
956    }
957
958    /// The metadata version key `\xff/metadataVersion` is a key intended to help layers deal with hot keys.
959    /// The value of this key is sent to clients along with the read version from the proxy,
960    /// so a client can read its value without communicating with a storage server.
961    /// To retrieve the metadataVersion, you need to set `TransactionOption::ReadSystemKeys`
962    #[cfg_api_versions(min = 610)]
963    pub async fn get_metadata_version(&self, snapshot: bool) -> FdbResult<Option<i64>> {
964        match self.get(METADATA_VERSION_KEY, snapshot).await {
965            Ok(Some(fdb_slice)) => {
966                let value = fdb_slice.deref();
967                // as we cannot write the metadata-key directly(we must mutate with an atomic_op),
968                // can we assume that it will always be the correct size?
969                if value.len() < 8 {
970                    return Ok(None);
971                }
972
973                // The 80-bits versionstamps are 10 bytes longs, and are composed of:
974                // * 8 bytes (Transaction Version)
975                // * followed by 2 bytes (Transaction Batch Order)
976                // More details can be found here: https://forums.foundationdb.org/t/implementing-versionstamps-in-bindings/250
977                let mut arr = [0u8; 8];
978                arr.copy_from_slice(&value[0..8]);
979                let transaction_version: i64 = i64::from_be_bytes(arr);
980
981                Ok(Some(transaction_version))
982            }
983            Ok(None) => Ok(None),
984
985            Err(err) => Err(err),
986        }
987    }
988
989    #[cfg_api_versions(min = 610)]
990    pub fn update_metadata_version(&self) {
991        // The param is transformed by removing the final four bytes from ``param`` and reading
992        // those as a little-Endian 32-bit integer to get a position ``pos``.
993        // The 10 bytes of the parameter from ``pos`` to ``pos + 10`` are replaced with the
994        // versionstamp of the transaction used. The first byte of the parameter is position 0.
995        // As we only have the metadata value, we can just create an 14-bytes Vec filled with 0u8.
996        let param = vec![0u8; 14];
997        self.atomic_op(
998            METADATA_VERSION_KEY,
999            param.as_slice(),
1000            options::MutationType::SetVersionstampedValue,
1001        )
1002    }
1003
1004    /// Reset transaction to its initial state.
1005    ///
1006    /// In order to protect against a race condition with cancel(), this call require a mutable
1007    /// access to the transaction.
1008    ///
1009    /// This is similar to dropping the transaction and creating a new one.
1010    ///
1011    /// It is not necessary to call `reset()` when handling an error with `on_error()` since the
1012    /// transaction has already been reset.
1013    pub fn reset(&mut self) {
1014        unsafe { fdb_sys::fdb_transaction_reset(self.inner.as_ptr()) }
1015    }
1016
1017    /// Adds a conflict range to a transaction without performing the associated read or write.
1018    ///
1019    /// # Note
1020    ///
1021    /// Most applications will use the serializable isolation that transactions provide by default
1022    /// and will not need to manipulate conflict ranges.
1023    pub fn add_conflict_range(
1024        &self,
1025        begin: &[u8],
1026        end: &[u8],
1027        ty: options::ConflictRangeType,
1028    ) -> FdbResult<()> {
1029        error::eval(unsafe {
1030            fdb_sys::fdb_transaction_add_conflict_range(
1031                self.inner.as_ptr(),
1032                begin.as_ptr(),
1033                fdb_len(begin.len(), "begin"),
1034                end.as_ptr(),
1035                fdb_len(end.len(), "end"),
1036                ty.code(),
1037            )
1038        })
1039    }
1040}
1041
1042impl Drop for Transaction {
1043    fn drop(&mut self) {
1044        unsafe {
1045            fdb_sys::fdb_transaction_destroy(self.inner.as_ptr());
1046        }
1047    }
1048}
1049
1050/// A retryable transaction, generated by Database.run
1051#[derive(Clone)]
1052pub struct RetryableTransaction {
1053    inner: Arc<Transaction>,
1054}
1055
1056impl Deref for RetryableTransaction {
1057    type Target = Transaction;
1058    fn deref(&self) -> &Transaction {
1059        self.inner.deref()
1060    }
1061}
1062
1063impl RetryableTransaction {
1064    pub(crate) fn new(t: Transaction) -> RetryableTransaction {
1065        RetryableTransaction { inner: Arc::new(t) }
1066    }
1067
1068    pub(crate) fn take(self) -> Result<Transaction, FdbBindingError> {
1069        // checking weak references
1070        if Arc::weak_count(&self.inner) != 0 {
1071            return Err(FdbBindingError::ReferenceToTransactionKept);
1072        }
1073        Arc::try_unwrap(self.inner).map_err(|_| FdbBindingError::ReferenceToTransactionKept)
1074    }
1075
1076    pub(crate) async fn on_error(
1077        self,
1078        err: FdbError,
1079    ) -> Result<Result<RetryableTransaction, FdbError>, FdbBindingError> {
1080        Ok(self
1081            .take()?
1082            .on_error(err)
1083            .await
1084            .map(RetryableTransaction::new))
1085    }
1086
1087    pub(crate) async fn commit(
1088        self,
1089    ) -> Result<Result<TransactionCommitted, TransactionCommitError>, FdbBindingError> {
1090        Ok(self.take()?.commit().await)
1091    }
1092}