foundationdb/
transaction.rs

1// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors
2// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors.
3//
4// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
5// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
6// http://opensource.org/licenses/MIT>, at your option. This file may not be
7// copied, modified, or distributed except according to those terms.
8
9//! Implementations of the FDBTransaction C API
10//!
11//! <https://apple.github.io/foundationdb/api-c.html#transaction>
12
13use foundationdb_sys as fdb_sys;
14use std::fmt;
15use std::ops::{Deref, Range, RangeInclusive};
16use std::ptr::NonNull;
17use std::sync::Arc;
18
19use crate::future::*;
20use crate::keyselector::*;
21use crate::options;
22
23use crate::{error, FdbError, FdbResult};
24use foundationdb_macros::cfg_api_versions;
25
26use crate::error::FdbBindingError;
27
28use futures::{
29    future, future::Either, stream, Future, FutureExt, Stream, TryFutureExt, TryStreamExt,
30};
31
32#[cfg_api_versions(min = 610)]
33const METADATA_VERSION_KEY: &[u8] = b"\xff/metadataVersion";
34
35/// A committed transaction.
36#[derive(Debug)]
37#[repr(transparent)]
38pub struct TransactionCommitted {
39    tr: Transaction,
40}
41
42impl TransactionCommitted {
43    /// Retrieves the database version number at which a given transaction was committed.
44    ///
45    /// Read-only transactions do not modify the database when committed and will have a committed
46    /// version of -1. Keep in mind that a transaction which reads keys and then sets them to their
47    /// current values may be optimized to a read-only transaction.
48    ///
49    /// Note that database versions are not necessarily unique to a given transaction and so cannot
50    /// be used to determine in what order two transactions completed. The only use for this
51    /// function is to manually enforce causal consistency when calling `set_read_version()` on
52    /// another subsequent transaction.
53    ///
54    /// Most applications will not call this function.
55    pub fn committed_version(&self) -> FdbResult<i64> {
56        let mut version: i64 = 0;
57        error::eval(unsafe {
58            fdb_sys::fdb_transaction_get_committed_version(self.tr.inner.as_ptr(), &mut version)
59        })?;
60        Ok(version)
61    }
62
63    /// Reset the transaction to its initial state.
64    ///
65    /// This will not affect previously committed data.
66    ///
67    /// This is similar to dropping the transaction and creating a new one.
68    pub fn reset(mut self) -> Transaction {
69        self.tr.reset();
70        self.tr
71    }
72}
73impl From<TransactionCommitted> for Transaction {
74    fn from(tc: TransactionCommitted) -> Transaction {
75        tc.reset()
76    }
77}
78
79/// A failed to commit transaction.
80pub struct TransactionCommitError {
81    tr: Transaction,
82    err: FdbError,
83}
84
85impl TransactionCommitError {
86    /// Implements the recommended retry and backoff behavior for a transaction. This function knows
87    /// which of the error codes generated by other `Transaction` functions represent temporary
88    /// error conditions and which represent application errors that should be handled by the
89    /// application. It also implements an exponential backoff strategy to avoid swamping the
90    /// database cluster with excessive retries when there is a high level of conflict between
91    /// transactions.
92    ///
93    /// You should not call this method most of the times and use `Database::transact` which
94    /// implements a retry loop strategy for you.
95    pub fn on_error(self) -> impl Future<Output = FdbResult<Transaction>> {
96        FdbFuture::<()>::new(unsafe {
97            fdb_sys::fdb_transaction_on_error(self.tr.inner.as_ptr(), self.err.code())
98        })
99        .map_ok(|()| self.tr)
100    }
101
102    /// Reset the transaction to its initial state.
103    ///
104    /// This is similar to dropping the transaction and creating a new one.
105    pub fn reset(mut self) -> Transaction {
106        self.tr.reset();
107        self.tr
108    }
109}
110
111impl Deref for TransactionCommitError {
112    type Target = FdbError;
113    fn deref(&self) -> &FdbError {
114        &self.err
115    }
116}
117
118impl From<TransactionCommitError> for FdbError {
119    fn from(tce: TransactionCommitError) -> FdbError {
120        tce.err
121    }
122}
123
124impl fmt::Debug for TransactionCommitError {
125    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
126        write!(f, "TransactionCommitError({})", self.err)
127    }
128}
129
130impl fmt::Display for TransactionCommitError {
131    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
132        self.err.fmt(f)
133    }
134}
135
136impl std::error::Error for TransactionCommitError {}
137
138/// The result of `Transaction::Commit`
139type TransactionResult = Result<TransactionCommitted, TransactionCommitError>;
140
141/// A cancelled transaction
142#[derive(Debug)]
143#[repr(transparent)]
144pub struct TransactionCancelled {
145    tr: Transaction,
146}
147impl TransactionCancelled {
148    /// Reset the transaction to its initial state.
149    ///
150    /// This is similar to dropping the transaction and creating a new one.
151    pub fn reset(mut self) -> Transaction {
152        self.tr.reset();
153        self.tr
154    }
155}
156impl From<TransactionCancelled> for Transaction {
157    fn from(tc: TransactionCancelled) -> Transaction {
158        tc.reset()
159    }
160}
161
162/// In FoundationDB, a transaction is a mutable snapshot of a database.
163///
164/// All read and write operations on a transaction see and modify an otherwise-unchanging version of the database and only change the underlying database if and when the transaction is committed. Read operations do see the effects of previous write operations on the same transaction. Committing a transaction usually succeeds in the absence of conflicts.
165///
166/// Applications must provide error handling and an appropriate retry loop around the application code for a transaction. See the documentation for [fdb_transaction_on_error()](https://apple.github.io/foundationdb/api-c.html#transaction).
167///
168/// Transactions group operations into a unit with the properties of atomicity, isolation, and durability. Transactions also provide the ability to maintain an application’s invariants or integrity constraints, supporting the property of consistency. Together these properties are known as ACID.
169///
170/// Transactions are also causally consistent: once a transaction has been successfully committed, all subsequently created transactions will see the modifications made by it.
171#[derive(Debug)]
172pub struct Transaction {
173    // Order of fields should not be changed, because Rust drops field top-to-bottom, and
174    // transaction should be dropped before cluster.
175    inner: NonNull<fdb_sys::FDBTransaction>,
176}
177unsafe impl Send for Transaction {}
178unsafe impl Sync for Transaction {}
179
180/// Converts Rust `bool` into `fdb_sys::fdb_bool_t`
181#[inline]
182fn fdb_bool(v: bool) -> fdb_sys::fdb_bool_t {
183    if v {
184        1
185    } else {
186        0
187    }
188}
189#[inline]
190fn fdb_len(len: usize, context: &'static str) -> std::os::raw::c_int {
191    assert!(len <= i32::MAX as usize, "{context}.len() > i32::MAX");
192    len as i32
193}
194#[inline]
195fn fdb_iteration(iteration: usize) -> std::os::raw::c_int {
196    if iteration > i32::MAX as usize {
197        0 // this will cause client_invalid_operation
198    } else {
199        iteration as i32
200    }
201}
202#[inline]
203fn fdb_limit(v: usize) -> std::os::raw::c_int {
204    if v > i32::MAX as usize {
205        i32::MAX
206    } else {
207        v as i32
208    }
209}
210
211/// `RangeOption` represents a query parameters for range scan query.
212///
213/// You can construct `RangeOption` easily:
214///
215/// ```
216/// use foundationdb::RangeOption;
217///
218/// let opt = RangeOption::from((b"begin".as_ref(), b"end".as_ref()));
219/// let opt: RangeOption = (b"begin".as_ref()..b"end".as_ref()).into();
220/// let opt = RangeOption {
221///     limit: Some(10),
222///     ..RangeOption::from((b"begin".as_ref(), b"end".as_ref()))
223/// };
224/// ```
225#[derive(Debug, Clone)]
226pub struct RangeOption<'a> {
227    /// The beginning of the range.
228    pub begin: KeySelector<'a>,
229    /// The end of the range.
230    pub end: KeySelector<'a>,
231    /// If non-zero, indicates the maximum number of key-value pairs to return.
232    pub limit: Option<usize>,
233    /// If non-zero, indicates a (soft) cap on the combined number of bytes of keys and values to
234    /// return for each item.
235    pub target_bytes: usize,
236    /// One of the options::StreamingMode values indicating how the caller would like the data in
237    /// the range returned.
238    pub mode: options::StreamingMode,
239    /// If true, key-value pairs will be returned in reverse lexicographical order beginning at
240    /// the end of the range.
241    pub reverse: bool,
242    #[doc(hidden)]
243    pub __non_exhaustive: std::marker::PhantomData<()>,
244}
245
246impl RangeOption<'_> {
247    /// Reverses the range direction.
248    pub fn rev(mut self) -> Self {
249        self.reverse = !self.reverse;
250        self
251    }
252
253    pub fn next_range(mut self, kvs: &FdbValues) -> Option<Self> {
254        if !kvs.more() {
255            return None;
256        }
257
258        let last = kvs.last()?;
259        let last_key = last.key();
260
261        if let Some(limit) = self.limit.as_mut() {
262            *limit = limit.saturating_sub(kvs.len());
263            if *limit == 0 {
264                return None;
265            }
266        }
267
268        if self.reverse {
269            self.end.make_first_greater_or_equal(last_key);
270        } else {
271            self.begin.make_first_greater_than(last_key);
272        }
273        Some(self)
274    }
275
276    #[cfg_api_versions(min = 710)]
277    pub(crate) fn next_mapped_range(mut self, kvs: &MappedKeyValues) -> Option<Self> {
278        if !kvs.more() {
279            return None;
280        }
281
282        let last = kvs.last()?;
283        let last_key = last.parent_key();
284
285        if let Some(limit) = self.limit.as_mut() {
286            *limit = limit.saturating_sub(kvs.len());
287            if *limit == 0 {
288                return None;
289            }
290        }
291
292        if self.reverse {
293            self.end.make_first_greater_or_equal(last_key);
294        } else {
295            self.begin.make_first_greater_than(last_key);
296        }
297        Some(self)
298    }
299}
300
301impl Default for RangeOption<'_> {
302    fn default() -> Self {
303        Self {
304            begin: KeySelector::first_greater_or_equal([].as_ref()),
305            end: KeySelector::first_greater_or_equal([].as_ref()),
306            limit: None,
307            target_bytes: 0,
308            mode: options::StreamingMode::Iterator,
309            reverse: false,
310            __non_exhaustive: std::marker::PhantomData,
311        }
312    }
313}
314
315impl<'a> From<(KeySelector<'a>, KeySelector<'a>)> for RangeOption<'a> {
316    fn from((begin, end): (KeySelector<'a>, KeySelector<'a>)) -> Self {
317        Self {
318            begin,
319            end,
320            ..Self::default()
321        }
322    }
323}
324impl From<(Vec<u8>, Vec<u8>)> for RangeOption<'static> {
325    fn from((begin, end): (Vec<u8>, Vec<u8>)) -> Self {
326        Self {
327            begin: KeySelector::first_greater_or_equal(begin),
328            end: KeySelector::first_greater_or_equal(end),
329            ..Self::default()
330        }
331    }
332}
333impl<'a> From<(&'a [u8], &'a [u8])> for RangeOption<'a> {
334    fn from((begin, end): (&'a [u8], &'a [u8])) -> Self {
335        Self {
336            begin: KeySelector::first_greater_or_equal(begin),
337            end: KeySelector::first_greater_or_equal(end),
338            ..Self::default()
339        }
340    }
341}
342impl<'a> From<std::ops::Range<KeySelector<'a>>> for RangeOption<'a> {
343    fn from(range: Range<KeySelector<'a>>) -> Self {
344        RangeOption::from((range.start, range.end))
345    }
346}
347
348impl<'a> From<std::ops::Range<&'a [u8]>> for RangeOption<'a> {
349    fn from(range: Range<&'a [u8]>) -> Self {
350        RangeOption::from((range.start, range.end))
351    }
352}
353
354impl From<std::ops::Range<std::vec::Vec<u8>>> for RangeOption<'static> {
355    fn from(range: Range<Vec<u8>>) -> Self {
356        RangeOption::from((range.start, range.end))
357    }
358}
359
360impl<'a> From<std::ops::RangeInclusive<&'a [u8]>> for RangeOption<'a> {
361    fn from(range: RangeInclusive<&'a [u8]>) -> Self {
362        let (start, end) = range.into_inner();
363        (KeySelector::first_greater_or_equal(start)..KeySelector::first_greater_than(end)).into()
364    }
365}
366
367impl From<std::ops::RangeInclusive<std::vec::Vec<u8>>> for RangeOption<'static> {
368    fn from(range: RangeInclusive<Vec<u8>>) -> Self {
369        let (start, end) = range.into_inner();
370        (KeySelector::first_greater_or_equal(start)..KeySelector::first_greater_than(end)).into()
371    }
372}
373
374impl Transaction {
375    pub(crate) fn new(inner: NonNull<fdb_sys::FDBTransaction>) -> Self {
376        Self { inner }
377    }
378
379    /// Called to set an option on an FDBTransaction.
380    pub fn set_option(&self, opt: options::TransactionOption) -> FdbResult<()> {
381        unsafe { opt.apply(self.inner.as_ptr()) }
382    }
383
384    /// Pass through an option given a code and raw data. Useful when creating a passthrough layer
385    /// where the code and data will be provided as raw, in order to avoid deserializing to an option
386    /// and serializing it back to code and data.
387    /// In general, you should use `set_option`.
388    pub fn set_raw_option(
389        &self,
390        code: fdb_sys::FDBTransactionOption,
391        data: Option<Vec<u8>>,
392    ) -> FdbResult<()> {
393        let (data_ptr, size) = data
394            .as_ref()
395            .map(|data| {
396                (
397                    data.as_ptr(),
398                    i32::try_from(data.len()).expect("len to fit in i32"),
399                )
400            })
401            .unwrap_or_else(|| (std::ptr::null(), 0));
402        let err = unsafe {
403            fdb_sys::fdb_transaction_set_option(self.inner.as_ptr(), code, data_ptr, size)
404        };
405        if err != 0 {
406            Err(FdbError::from_code(err))
407        } else {
408            Ok(())
409        }
410    }
411
412    /// Modify the database snapshot represented by transaction to change the given
413    /// key to have the given value.
414    ///
415    /// If the given key was not previously present in the database it is inserted.
416    /// The modification affects the actual database only if transaction is later
417    /// committed with `Transaction::commit`.
418    ///
419    /// # Arguments
420    ///
421    /// * `key` - the name of the key to be inserted into the database.
422    /// * `value` - the value to be inserted into the database
423    pub fn set(&self, key: &[u8], value: &[u8]) {
424        unsafe {
425            fdb_sys::fdb_transaction_set(
426                self.inner.as_ptr(),
427                key.as_ptr(),
428                fdb_len(key.len(), "key"),
429                value.as_ptr(),
430                fdb_len(value.len(), "value"),
431            )
432        }
433    }
434
435    /// Modify the database snapshot represented by transaction to remove the given key from the
436    /// database.
437    ///
438    /// If the key was not previously present in the database, there is no effect. The modification
439    /// affects the actual database only if transaction is later committed with
440    /// `Transaction::commit`.
441    ///
442    /// # Arguments
443    ///
444    /// * `key` - the name of the key to be removed from the database.
445    pub fn clear(&self, key: &[u8]) {
446        unsafe {
447            fdb_sys::fdb_transaction_clear(
448                self.inner.as_ptr(),
449                key.as_ptr(),
450                fdb_len(key.len(), "key"),
451            )
452        }
453    }
454
455    /// Reads a value from the database snapshot represented by transaction.
456    ///
457    /// Returns an FDBFuture which will be set to the value of key in the database if there is any.
458    ///
459    /// # Arguments
460    ///
461    /// * `key` - the name of the key to be looked up in the database
462    /// * `snapshot` - `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
463    pub fn get(
464        &self,
465        key: &[u8],
466        snapshot: bool,
467    ) -> impl Future<Output = FdbResult<Option<FdbSlice>>> + Send + Sync + Unpin {
468        FdbFuture::new(unsafe {
469            fdb_sys::fdb_transaction_get(
470                self.inner.as_ptr(),
471                key.as_ptr(),
472                fdb_len(key.len(), "key"),
473                fdb_bool(snapshot),
474            )
475        })
476    }
477
478    /// Modify the database snapshot represented by transaction to perform the operation indicated
479    /// by operationType with operand param to the value stored by the given key.
480    ///
481    /// An atomic operation is a single database command that carries out several logical steps:
482    /// reading the value of a key, performing a transformation on that value, and writing the
483    /// result. Different atomic operations perform different transformations. Like other database
484    /// operations, an atomic operation is used within a transaction; however, its use within a
485    /// transaction will not cause the transaction to conflict.
486    ///
487    /// Atomic operations do not expose the current value of the key to the client but simply send
488    /// the database the transformation to apply. In regard to conflict checking, an atomic
489    /// operation is equivalent to a write without a read. It can only cause other transactions
490    /// performing reads of the key to conflict.
491    ///
492    /// By combining these logical steps into a single, read-free operation, FoundationDB can
493    /// guarantee that the transaction will not conflict due to the operation. This makes atomic
494    /// operations ideal for operating on keys that are frequently modified. A common example is
495    /// the use of a key-value pair as a counter.
496    ///
497    /// # Warning
498    ///
499    /// If a transaction uses both an atomic operation and a strictly serializable read on the same
500    /// key, the benefits of using the atomic operation (for both conflict checking and performance)
501    /// are lost.
502    pub fn atomic_op(&self, key: &[u8], param: &[u8], op_type: options::MutationType) {
503        unsafe {
504            fdb_sys::fdb_transaction_atomic_op(
505                self.inner.as_ptr(),
506                key.as_ptr(),
507                fdb_len(key.len(), "key"),
508                param.as_ptr(),
509                fdb_len(param.len(), "param"),
510                op_type.code(),
511            )
512        }
513    }
514
515    /// Resolves a key selector against the keys in the database snapshot represented by
516    /// transaction.
517    ///
518    /// Returns an FDBFuture which will be set to the key in the database matching the key
519    /// selector.
520    ///
521    /// # Arguments
522    ///
523    /// * `selector`: the key selector
524    /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
525    pub fn get_key(
526        &self,
527        selector: &KeySelector,
528        snapshot: bool,
529    ) -> impl Future<Output = FdbResult<FdbSlice>> + Send + Sync + Unpin {
530        let key = selector.key();
531        FdbFuture::new(unsafe {
532            fdb_sys::fdb_transaction_get_key(
533                self.inner.as_ptr(),
534                key.as_ptr(),
535                fdb_len(key.len(), "key"),
536                fdb_bool(selector.or_equal()),
537                selector.offset(),
538                fdb_bool(snapshot),
539            )
540        })
541    }
542
543    /// Reads all key-value pairs in the database snapshot represented by transaction (potentially
544    /// limited by limit, target_bytes, or mode) which have a key lexicographically greater than or
545    /// equal to the key resolved by the begin key selector and lexicographically less than the key
546    /// resolved by the end key selector.
547    ///
548    /// Returns a stream of KeyValue slices.
549    ///
550    /// This method is a little more efficient than `get_ranges_keyvalues` but a little harder to
551    /// use.
552    ///
553    /// # Arguments
554    ///
555    /// * `opt`: the range, limit, target_bytes and mode
556    /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
557    pub fn get_ranges<'a>(
558        &'a self,
559        opt: RangeOption<'a>,
560        snapshot: bool,
561    ) -> impl Stream<Item = FdbResult<FdbValues>> + Send + Sync + Unpin + 'a {
562        stream::unfold((1, Some(opt)), move |(iteration, maybe_opt)| {
563            if let Some(opt) = maybe_opt {
564                Either::Left(self.get_range(&opt, iteration as usize, snapshot).map(
565                    move |maybe_values| {
566                        let next_opt = match &maybe_values {
567                            Ok(values) => opt.next_range(values),
568                            Err(..) => None,
569                        };
570                        Some((maybe_values, (iteration + 1, next_opt)))
571                    },
572                ))
573            } else {
574                Either::Right(future::ready(None))
575            }
576        })
577    }
578
579    /// Reads all key-value pairs in the database snapshot represented by transaction (potentially
580    /// limited by limit, target_bytes, or mode) which have a key lexicographically greater than or
581    /// equal to the key resolved by the begin key selector and lexicographically less than the key
582    /// resolved by the end key selector.
583    ///
584    /// Returns a stream of KeyValue.
585    ///
586    /// # Arguments
587    ///
588    /// * `opt`: the range, limit, target_bytes and mode
589    /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
590    pub fn get_ranges_keyvalues<'a>(
591        &'a self,
592        opt: RangeOption<'a>,
593        snapshot: bool,
594    ) -> impl Stream<Item = FdbResult<FdbValue>> + Unpin + 'a {
595        self.get_ranges(opt, snapshot)
596            .map_ok(|values| stream::iter(values.into_iter().map(Ok)))
597            .try_flatten()
598    }
599
600    /// Reads all key-value pairs in the database snapshot represented by transaction (potentially
601    /// limited by limit, target_bytes, or mode) which have a key lexicographically greater than or
602    /// equal to the key resolved by the begin key selector and lexicographically less than the key
603    /// resolved by the end key selector.
604    ///
605    /// # Arguments
606    ///
607    /// * `opt`: the range, limit, target_bytes and mode
608    /// * `iteration`: If opt.mode is Iterator, this parameter should start at 1 and be incremented
609    ///   by 1 for each successive call while reading this range. In all other cases it is ignored.
610    /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
611    pub fn get_range(
612        &self,
613        opt: &RangeOption,
614        iteration: usize,
615        snapshot: bool,
616    ) -> impl Future<Output = FdbResult<FdbValues>> + Send + Sync + Unpin {
617        let begin = &opt.begin;
618        let end = &opt.end;
619        let key_begin = begin.key();
620        let key_end = end.key();
621
622        FdbFuture::new(unsafe {
623            fdb_sys::fdb_transaction_get_range(
624                self.inner.as_ptr(),
625                key_begin.as_ptr(),
626                fdb_len(key_begin.len(), "key_begin"),
627                fdb_bool(begin.or_equal()),
628                begin.offset(),
629                key_end.as_ptr(),
630                fdb_len(key_end.len(), "key_end"),
631                fdb_bool(end.or_equal()),
632                end.offset(),
633                fdb_limit(opt.limit.unwrap_or(0)),
634                fdb_limit(opt.target_bytes),
635                opt.mode.code(),
636                fdb_iteration(iteration),
637                fdb_bool(snapshot),
638                fdb_bool(opt.reverse),
639            )
640        })
641    }
642
643    /// Mapped Range is an experimental feature introduced in FDB 7.1.
644    /// It is intended to improve the client throughput and reduce latency for querying data through a Subspace used as a "index".
645    /// In such a case, querying records by scanning an index in relational databases can be
646    /// translated to a GetRange request on the index entries followed up by multiple GetValue requests for the record entries in FDB.
647    ///
648    /// This method is allowing FoundationDB "follow up" a GetRange request with GetValue requests,
649    /// this can happen in one request without additional back and forth. Considering the overhead
650    /// of each request, this saves time and resources on serialization, deserialization, and network.
651    ///
652    /// A mapped request will:
653    ///
654    /// * Do a range query (same as a `Transaction.get_range` request) and get the result. We call it the primary query.
655    /// * For each key-value pair in the primary query result, translate it to a `get_range` query and get the result. We call them secondary queries.
656    /// * Put all results in a nested structure and return them.
657    ///
658    /// **WARNING** : This feature is considered experimental at this time. It is only allowed when
659    /// using snapshot isolation AND disabling read-your-writes.
660    ///
661    /// More info can be found in the relevant [documentation](https://github.com/apple/foundationdb/wiki/Everything-about-GetMappedRange#input).
662    ///
663    /// This is the "raw" version, users are expected to use [Transaction::get_mapped_ranges]
664    #[cfg_api_versions(min = 710)]
665    pub fn get_mapped_range(
666        &self,
667        opt: &RangeOption,
668        mapper: &[u8],
669        iteration: usize,
670        snapshot: bool,
671    ) -> impl Future<Output = FdbResult<MappedKeyValues>> + Send + Sync + Unpin {
672        let begin = &opt.begin;
673        let end = &opt.end;
674        let key_begin = begin.key();
675        let key_end = end.key();
676
677        FdbFuture::new(unsafe {
678            fdb_sys::fdb_transaction_get_mapped_range(
679                self.inner.as_ptr(),
680                key_begin.as_ptr(),
681                fdb_len(key_begin.len(), "key_begin"),
682                fdb_bool(begin.or_equal()),
683                begin.offset(),
684                key_end.as_ptr(),
685                fdb_len(key_end.len(), "key_end"),
686                fdb_bool(end.or_equal()),
687                end.offset(),
688                mapper.as_ptr(),
689                fdb_len(mapper.len(), "mapper_length"),
690                fdb_limit(opt.limit.unwrap_or(0)),
691                fdb_limit(opt.target_bytes),
692                opt.mode.code(),
693                fdb_iteration(iteration),
694                fdb_bool(snapshot),
695                fdb_bool(opt.reverse),
696            )
697        })
698    }
699
700    /// Mapped Range is an experimental feature introduced in FDB 7.1.
701    /// It is intended to improve the client throughput and reduce latency for querying data through a Subspace used as a "index".
702    /// In such a case, querying records by scanning an index in relational databases can be
703    /// translated to a GetRange request on the index entries followed up by multiple GetValue requests for the record entries in FDB.
704    ///
705    /// This method is allowing FoundationDB "follow up" a GetRange request with GetValue requests,
706    /// this can happen in one request without additional back and forth. Considering the overhead
707    /// of each request, this saves time and resources on serialization, deserialization, and network.
708    ///
709    /// A mapped request will:
710    ///
711    /// * Do a range query (same as a `Transaction.get_range` request) and get the result. We call it the primary query.
712    /// * For each key-value pair in the primary query result, translate it to a `get_range` query and get the result. We call them secondary queries.
713    /// * Put all results in a nested structure and return them.
714    ///
715    /// **WARNING** : This feature is considered experimental at this time. It is only allowed when
716    /// using snapshot isolation AND disabling read-your-writes.
717    ///
718    /// More info can be found in the relevant [documentation](https://github.com/apple/foundationdb/wiki/Everything-about-GetMappedRange#input).
719    #[cfg_api_versions(min = 710)]
720    pub fn get_mapped_ranges<'a>(
721        &'a self,
722        opt: RangeOption<'a>,
723        mapper: &'a [u8],
724        snapshot: bool,
725    ) -> impl Stream<Item = FdbResult<MappedKeyValues>> + Send + Sync + Unpin + 'a {
726        stream::unfold((1, Some(opt)), move |(iteration, maybe_opt)| {
727            if let Some(opt) = maybe_opt {
728                Either::Left(
729                    self.get_mapped_range(&opt, mapper, iteration as usize, snapshot)
730                        .map(move |maybe_values| {
731                            let next_opt = match &maybe_values {
732                                Ok(values) => opt.next_mapped_range(values),
733                                Err(..) => None,
734                            };
735                            Some((maybe_values, (iteration + 1, next_opt)))
736                        }),
737                )
738            } else {
739                Either::Right(future::ready(None))
740            }
741        })
742    }
743
744    /// Modify the database snapshot represented by transaction to remove all keys (if any) which
745    /// are lexicographically greater than or equal to the given begin key and lexicographically
746    /// less than the given end_key.
747    ///
748    /// The modification affects the actual database only if transaction is later committed with
749    /// `Transaction::commit`.
750    pub fn clear_range(&self, begin: &[u8], end: &[u8]) {
751        unsafe {
752            fdb_sys::fdb_transaction_clear_range(
753                self.inner.as_ptr(),
754                begin.as_ptr(),
755                fdb_len(begin.len(), "begin"),
756                end.as_ptr(),
757                fdb_len(end.len(), "end"),
758            )
759        }
760    }
761
762    /// Get the estimated byte size of the key range based on the byte sample collected by FDB
763    #[cfg_api_versions(min = 630)]
764    pub fn get_estimated_range_size_bytes(
765        &self,
766        begin: &[u8],
767        end: &[u8],
768    ) -> impl Future<Output = FdbResult<i64>> + Send + Sync + Unpin {
769        FdbFuture::<i64>::new(unsafe {
770            fdb_sys::fdb_transaction_get_estimated_range_size_bytes(
771                self.inner.as_ptr(),
772                begin.as_ptr(),
773                fdb_len(begin.len(), "begin"),
774                end.as_ptr(),
775                fdb_len(end.len(), "end"),
776            )
777        })
778    }
779
780    /// Attempts to commit the sets and clears previously applied to the database snapshot
781    /// represented by transaction to the actual database.
782    ///
783    /// The commit may or may not succeed – in particular, if a conflicting transaction previously
784    /// committed, then the commit must fail in order to preserve transactional isolation. If the
785    /// commit does succeed, the transaction is durably committed to the database and all
786    /// subsequently started transactions will observe its effects.
787    ///
788    /// It is not necessary to commit a read-only transaction – you can simply drop it.
789    ///
790    /// Callers will usually want to retry a transaction if the commit or a another method on the
791    /// transaction returns a retryable error (see `on_error` and/or `Database::transact`).
792    ///
793    /// As with other client/server databases, in some failure scenarios a client may be unable to
794    /// determine whether a transaction succeeded. In these cases, `Transaction::commit` will return
795    /// an error and `is_maybe_committed()` will returns true on that error. The `on_error` function
796    /// treats this error as retryable, so retry loops that don’t check for `is_maybe_committed()`
797    /// could execute the transaction twice. In these cases, you must consider the idempotence of
798    /// the transaction. For more information, see [Transactions with unknown results](https://apple.github.io/foundationdb/developer-guide.html#developer-guide-unknown-results).
799    ///
800    /// Normally, commit will wait for outstanding reads to return. However, if those reads were
801    /// snapshot reads or the transaction option for disabling “read-your-writes” has been invoked,
802    /// any outstanding reads will immediately return errors.
803    pub fn commit(self) -> impl Future<Output = TransactionResult> + Send + Sync + Unpin {
804        FdbFuture::<()>::new(unsafe { fdb_sys::fdb_transaction_commit(self.inner.as_ptr()) }).map(
805            move |r| match r {
806                Ok(()) => Ok(TransactionCommitted { tr: self }),
807                Err(err) => Err(TransactionCommitError { tr: self, err }),
808            },
809        )
810    }
811
812    /// Implements the recommended retry and backoff behavior for a transaction. This function knows
813    /// which of the error codes generated by other `Transaction` functions represent temporary
814    /// error conditions and which represent application errors that should be handled by the
815    /// application. It also implements an exponential backoff strategy to avoid swamping the
816    /// database cluster with excessive retries when there is a high level of conflict between
817    /// transactions.
818    ///
819    /// It is not necessary to call `reset()` when handling an error with `on_error()` since the
820    /// transaction has already been reset.
821    ///
822    /// You should not call this method most of the times and use `Database::transact` which
823    /// implements a retry loop strategy for you.
824    pub fn on_error(
825        self,
826        err: FdbError,
827    ) -> impl Future<Output = FdbResult<Transaction>> + Send + Sync + Unpin {
828        FdbFuture::<()>::new(unsafe {
829            fdb_sys::fdb_transaction_on_error(self.inner.as_ptr(), err.code())
830        })
831        .map_ok(|()| self)
832    }
833
834    /// Cancels the transaction. All pending or future uses of the transaction will return a
835    /// transaction_cancelled error. The transaction can be used again after it is reset.
836    pub fn cancel(self) -> TransactionCancelled {
837        unsafe { fdb_sys::fdb_transaction_cancel(self.inner.as_ptr()) };
838        TransactionCancelled { tr: self }
839    }
840
841    /// Returns a list of public network addresses as strings, one for each of the storage servers
842    /// responsible for storing key_name and its associated value.
843    pub fn get_addresses_for_key(
844        &self,
845        key: &[u8],
846    ) -> impl Future<Output = FdbResult<FdbAddresses>> + Send + Sync + Unpin {
847        FdbFuture::new(unsafe {
848            fdb_sys::fdb_transaction_get_addresses_for_key(
849                self.inner.as_ptr(),
850                key.as_ptr(),
851                fdb_len(key.len(), "key"),
852            )
853        })
854    }
855
856    /// A watch's behavior is relative to the transaction that created it. A watch will report a
857    /// change in relation to the key’s value as readable by that transaction. The initial value
858    /// used for comparison is either that of the transaction’s read version or the value as
859    /// modified by the transaction itself prior to the creation of the watch. If the value changes
860    /// and then changes back to its initial value, the watch might not report the change.
861    ///
862    /// Until the transaction that created it has been committed, a watch will not report changes
863    /// made by other transactions. In contrast, a watch will immediately report changes made by
864    /// the transaction itself. Watches cannot be created if the transaction has set the
865    /// READ_YOUR_WRITES_DISABLE transaction option, and an attempt to do so will return an
866    /// watches_disabled error.
867    ///
868    /// If the transaction used to create a watch encounters an error during commit, then the watch
869    /// will be set with that error. A transaction whose commit result is unknown will set all of
870    /// its watches with the commit_unknown_result error. If an uncommitted transaction is reset or
871    /// destroyed, then any watches it created will be set with the transaction_cancelled error.
872    ///
873    /// Returns an future representing an empty value that will be set once the watch has
874    /// detected a change to the value at the specified key.
875    ///
876    /// By default, each database connection can have no more than 10,000 watches that have not yet
877    /// reported a change. When this number is exceeded, an attempt to create a watch will return a
878    /// too_many_watches error. This limit can be changed using the MAX_WATCHES database option.
879    /// Because a watch outlives the transaction that creates it, any watch that is no longer
880    /// needed should be cancelled by dropping its future.
881    pub fn watch(&self, key: &[u8]) -> impl Future<Output = FdbResult<()>> + Send + Sync + Unpin {
882        FdbFuture::new(unsafe {
883            fdb_sys::fdb_transaction_watch(
884                self.inner.as_ptr(),
885                key.as_ptr(),
886                fdb_len(key.len(), "key"),
887            )
888        })
889    }
890
891    /// Returns an FDBFuture which will be set to the approximate transaction size so far in the
892    /// returned future, which is the summation of the estimated size of mutations, read conflict
893    /// ranges, and write conflict ranges.
894    ///
895    /// This can be called multiple times before the transaction is committed.
896    #[cfg_api_versions(min = 620)]
897    pub fn get_approximate_size(
898        &self,
899    ) -> impl Future<Output = FdbResult<i64>> + Send + Sync + Unpin {
900        FdbFuture::new(unsafe {
901            fdb_sys::fdb_transaction_get_approximate_size(self.inner.as_ptr())
902        })
903    }
904
905    /// Gets a list of keys that can split the given range into (roughly) equally sized chunks based on chunk_size.
906    /// Note: the returned split points contain the start key and end key of the given range.
907    #[cfg_api_versions(min = 700)]
908    pub fn get_range_split_points(
909        &self,
910        begin: &[u8],
911        end: &[u8],
912        chunk_size: i64,
913    ) -> impl Future<Output = FdbResult<FdbKeys>> + Send + Sync + Unpin {
914        FdbFuture::<FdbKeys>::new(unsafe {
915            fdb_sys::fdb_transaction_get_range_split_points(
916                self.inner.as_ptr(),
917                begin.as_ptr(),
918                fdb_len(begin.len(), "begin"),
919                end.as_ptr(),
920                fdb_len(end.len(), "end"),
921                chunk_size,
922            )
923        })
924    }
925
926    /// Returns an FDBFuture which will be set to the versionstamp which was used by any
927    /// versionstamp operations in this transaction.
928    ///
929    /// The future will be ready only after the successful completion of a call to `commit()` on
930    /// this Transaction. Read-only transactions do not modify the database when committed and will
931    /// result in the future completing with an error. Keep in mind that a transaction which reads
932    /// keys and then sets them to their current values may be optimized to a read-only transaction.
933    ///
934    /// Most applications will not call this function.
935    pub fn get_versionstamp(
936        &self,
937    ) -> impl Future<Output = FdbResult<FdbSlice>> + Send + Sync + Unpin {
938        FdbFuture::new(unsafe { fdb_sys::fdb_transaction_get_versionstamp(self.inner.as_ptr()) })
939    }
940
941    /// The transaction obtains a snapshot read version automatically at the time of the first call
942    /// to `get_*()` (including this one) and (unless causal consistency has been deliberately
943    /// compromised by transaction options) is guaranteed to represent all transactions which were
944    /// reported committed before that call.
945    pub fn get_read_version(&self) -> impl Future<Output = FdbResult<i64>> + Send + Sync + Unpin {
946        FdbFuture::new(unsafe { fdb_sys::fdb_transaction_get_read_version(self.inner.as_ptr()) })
947    }
948
949    /// Sets the snapshot read version used by a transaction.
950    ///
951    /// This is not needed in simple cases.
952    /// If the given version is too old, subsequent reads will fail with error_code_past_version;
953    /// if it is too new, subsequent reads may be delayed indefinitely and/or fail with
954    /// error_code_future_version. If any of get_*() have been called on this transaction already,
955    /// the result is undefined.
956    pub fn set_read_version(&self, version: i64) {
957        unsafe { fdb_sys::fdb_transaction_set_read_version(self.inner.as_ptr(), version) }
958    }
959
960    /// The metadata version key `\xff/metadataVersion` is a key intended to help layers deal with hot keys.
961    /// The value of this key is sent to clients along with the read version from the proxy,
962    /// so a client can read its value without communicating with a storage server.
963    /// To retrieve the metadataVersion, you need to set `TransactionOption::ReadSystemKeys`
964    #[cfg_api_versions(min = 610)]
965    pub async fn get_metadata_version(&self, snapshot: bool) -> FdbResult<Option<i64>> {
966        match self.get(METADATA_VERSION_KEY, snapshot).await {
967            Ok(Some(fdb_slice)) => {
968                let value = fdb_slice.deref();
969                // as we cannot write the metadata-key directly(we must mutate with an atomic_op),
970                // can we assume that it will always be the correct size?
971                if value.len() < 8 {
972                    return Ok(None);
973                }
974
975                // The 80-bits versionstamps are 10 bytes longs, and are composed of:
976                // * 8 bytes (Transaction Version)
977                // * followed by 2 bytes (Transaction Batch Order)
978                // More details can be found here: https://forums.foundationdb.org/t/implementing-versionstamps-in-bindings/250
979                let mut arr = [0u8; 8];
980                arr.copy_from_slice(&value[0..8]);
981                let transaction_version: i64 = i64::from_be_bytes(arr);
982
983                Ok(Some(transaction_version))
984            }
985            Ok(None) => Ok(None),
986
987            Err(err) => Err(err),
988        }
989    }
990
991    #[cfg_api_versions(min = 610)]
992    pub fn update_metadata_version(&self) {
993        // The param is transformed by removing the final four bytes from ``param`` and reading
994        // those as a little-Endian 32-bit integer to get a position ``pos``.
995        // The 10 bytes of the parameter from ``pos`` to ``pos + 10`` are replaced with the
996        // versionstamp of the transaction used. The first byte of the parameter is position 0.
997        // As we only have the metadata value, we can just create an 14-bytes Vec filled with 0u8.
998        let param = vec![0u8; 14];
999        self.atomic_op(
1000            METADATA_VERSION_KEY,
1001            param.as_slice(),
1002            options::MutationType::SetVersionstampedValue,
1003        )
1004    }
1005
1006    /// Reset transaction to its initial state.
1007    ///
1008    /// In order to protect against a race condition with cancel(), this call require a mutable
1009    /// access to the transaction.
1010    ///
1011    /// This is similar to dropping the transaction and creating a new one.
1012    ///
1013    /// It is not necessary to call `reset()` when handling an error with `on_error()` since the
1014    /// transaction has already been reset.
1015    pub fn reset(&mut self) {
1016        unsafe { fdb_sys::fdb_transaction_reset(self.inner.as_ptr()) }
1017    }
1018
1019    /// Adds a conflict range to a transaction without performing the associated read or write.
1020    ///
1021    /// # Note
1022    ///
1023    /// Most applications will use the serializable isolation that transactions provide by default
1024    /// and will not need to manipulate conflict ranges.
1025    pub fn add_conflict_range(
1026        &self,
1027        begin: &[u8],
1028        end: &[u8],
1029        ty: options::ConflictRangeType,
1030    ) -> FdbResult<()> {
1031        error::eval(unsafe {
1032            fdb_sys::fdb_transaction_add_conflict_range(
1033                self.inner.as_ptr(),
1034                begin.as_ptr(),
1035                fdb_len(begin.len(), "begin"),
1036                end.as_ptr(),
1037                fdb_len(end.len(), "end"),
1038                ty.code(),
1039            )
1040        })
1041    }
1042}
1043
1044impl Drop for Transaction {
1045    fn drop(&mut self) {
1046        unsafe {
1047            fdb_sys::fdb_transaction_destroy(self.inner.as_ptr());
1048        }
1049    }
1050}
1051
1052/// A retryable transaction, generated by Database.run
1053#[derive(Clone)]
1054pub struct RetryableTransaction {
1055    inner: Arc<Transaction>,
1056}
1057
1058impl Deref for RetryableTransaction {
1059    type Target = Transaction;
1060    fn deref(&self) -> &Transaction {
1061        self.inner.deref()
1062    }
1063}
1064
1065impl RetryableTransaction {
1066    pub(crate) fn new(t: Transaction) -> RetryableTransaction {
1067        RetryableTransaction { inner: Arc::new(t) }
1068    }
1069
1070    pub(crate) fn take(self) -> Result<Transaction, FdbBindingError> {
1071        // checking weak references
1072        if Arc::weak_count(&self.inner) != 0 {
1073            return Err(FdbBindingError::ReferenceToTransactionKept);
1074        }
1075        Arc::try_unwrap(self.inner).map_err(|_| FdbBindingError::ReferenceToTransactionKept)
1076    }
1077
1078    pub(crate) async fn on_error(
1079        self,
1080        err: FdbError,
1081    ) -> Result<Result<RetryableTransaction, FdbError>, FdbBindingError> {
1082        Ok(self
1083            .take()?
1084            .on_error(err)
1085            .await
1086            .map(RetryableTransaction::new))
1087    }
1088
1089    pub(crate) async fn commit(
1090        self,
1091    ) -> Result<Result<TransactionCommitted, TransactionCommitError>, FdbBindingError> {
1092        Ok(self.take()?.commit().await)
1093    }
1094}