Skip to main content

foundationdb/
transaction.rs

1// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors
2// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors.
3//
4// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
5// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
6// http://opensource.org/licenses/MIT>, at your option. This file may not be
7// copied, modified, or distributed except according to those terms.
8
9//! Implementations of the FDBTransaction C API
10//!
11//! <https://apple.github.io/foundationdb/api-c.html#transaction>
12
13use foundationdb_sys as fdb_sys;
14use std::fmt;
15use std::ops::{Deref, Range, RangeInclusive};
16use std::ptr::NonNull;
17use std::sync::Arc;
18
19use crate::future::*;
20use crate::keyselector::*;
21use crate::metrics::{FdbCommand, TransactionMetrics};
22use crate::options;
23
24use crate::{FdbError, FdbResult, error};
25use foundationdb_macros::cfg_api_versions;
26
27use crate::error::{FdbBindingError, TransactionMetricsNotFound};
28
29use futures::{
30    Future, FutureExt, Stream, TryFutureExt, TryStreamExt, future, future::Either, stream,
31};
32
33#[cfg_api_versions(min = 610)]
34const METADATA_VERSION_KEY: &[u8] = b"\xff/metadataVersion";
35
36/// Special keyspace prefix for conflicting keys.
37const CONFLICTING_KEYS_PREFIX: &[u8] = b"\xff\xff/transaction/conflicting_keys/";
38// Matches C++ SystemData.cpp conflictingKeysRange end key.
39const CONFLICTING_KEYS_END: &[u8] = b"\xff\xff/transaction/conflicting_keys/\xff\xff";
40
41/// A key range that conflicted during a transaction commit.
42///
43/// Returned by [`Transaction::conflicting_keys`] after a commit conflict
44/// when [`TransactionOption::ReportConflictingKeys`](crate::options::TransactionOption::ReportConflictingKeys)
45/// is enabled.
46///
47/// The special keyspace encodes conflicting ranges using boundary markers:
48/// - Value `b"1"` marks the inclusive start of a conflicting range
49/// - Value `b"0"` marks the exclusive end of a conflicting range
50#[derive(Debug, Clone, PartialEq, Eq, Hash)]
51pub struct ConflictingKeyRange {
52    begin: Vec<u8>,
53    end: Vec<u8>,
54}
55
56impl ConflictingKeyRange {
57    /// The inclusive begin of the conflicting key range.
58    pub fn begin(&self) -> &[u8] {
59        &self.begin
60    }
61
62    /// The exclusive end of the conflicting key range.
63    pub fn end(&self) -> &[u8] {
64        &self.end
65    }
66}
67
68impl fmt::Display for ConflictingKeyRange {
69    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70        write!(
71            f,
72            "{}..{}",
73            String::from_utf8_lossy(&self.begin),
74            String::from_utf8_lossy(&self.end),
75        )
76    }
77}
78
79/// A committed transaction.
80#[derive(Debug)]
81#[repr(transparent)]
82pub struct TransactionCommitted {
83    tr: Transaction,
84}
85
86impl TransactionCommitted {
87    /// Retrieves the database version number at which a given transaction was committed.
88    ///
89    /// Read-only transactions do not modify the database when committed and will have a committed
90    /// version of -1. Keep in mind that a transaction which reads keys and then sets them to their
91    /// current values may be optimized to a read-only transaction.
92    ///
93    /// Note that database versions are not necessarily unique to a given transaction and so cannot
94    /// be used to determine in what order two transactions completed. The only use for this
95    /// function is to manually enforce causal consistency when calling `set_read_version()` on
96    /// another subsequent transaction.
97    ///
98    /// Most applications will not call this function.
99    pub fn committed_version(&self) -> FdbResult<i64> {
100        let mut version: i64 = 0;
101        error::eval(unsafe {
102            fdb_sys::fdb_transaction_get_committed_version(self.tr.inner.as_ptr(), &mut version)
103        })?;
104        Ok(version)
105    }
106
107    /// Reset the transaction to its initial state.
108    ///
109    /// This will not affect previously committed data.
110    ///
111    /// This is similar to dropping the transaction and creating a new one.
112    pub fn reset(mut self) -> Transaction {
113        self.tr.reset();
114        self.tr
115    }
116}
117impl From<TransactionCommitted> for Transaction {
118    fn from(tc: TransactionCommitted) -> Transaction {
119        tc.reset()
120    }
121}
122
123/// A failed to commit transaction.
124pub struct TransactionCommitError {
125    tr: Transaction,
126    err: FdbError,
127}
128
129impl TransactionCommitError {
130    /// Implements the recommended retry and backoff behavior for a transaction. This function knows
131    /// which of the error codes generated by other `Transaction` functions represent temporary
132    /// error conditions and which represent application errors that should be handled by the
133    /// application. It also implements an exponential backoff strategy to avoid swamping the
134    /// database cluster with excessive retries when there is a high level of conflict between
135    /// transactions.
136    ///
137    /// You should not call this method most of the times and use `Database::transact` which
138    /// implements a retry loop strategy for you.
139    pub fn on_error(self) -> impl Future<Output = FdbResult<Transaction>> {
140        FdbFuture::<()>::new(unsafe {
141            fdb_sys::fdb_transaction_on_error(self.tr.inner.as_ptr(), self.err.code())
142        })
143        .map_ok(|()| self.tr)
144    }
145
146    /// Reads the conflicting key ranges that caused this commit failure.
147    ///
148    /// Only returns meaningful results if
149    /// [`TransactionOption::ReportConflictingKeys`](crate::options::TransactionOption::ReportConflictingKeys)
150    /// was set on the transaction **and** the error is `not_committed` (code 1020).
151    ///
152    /// Must be called **before** [`on_error`](Self::on_error) which resets the transaction.
153    ///
154    /// # Errors
155    ///
156    /// Returns an `FdbError` if the special keyspace read fails.
157    #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
158    pub async fn conflicting_keys(&self) -> FdbResult<Vec<ConflictingKeyRange>> {
159        self.tr.conflicting_keys().await
160    }
161
162    /// Reset the transaction to its initial state.
163    ///
164    /// This is similar to dropping the transaction and creating a new one.
165    pub fn reset(mut self) -> Transaction {
166        self.tr.reset();
167        self.tr
168    }
169}
170
171impl Deref for TransactionCommitError {
172    type Target = FdbError;
173    fn deref(&self) -> &FdbError {
174        &self.err
175    }
176}
177
178impl From<TransactionCommitError> for FdbError {
179    fn from(tce: TransactionCommitError) -> FdbError {
180        tce.err
181    }
182}
183
184impl fmt::Debug for TransactionCommitError {
185    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
186        write!(f, "TransactionCommitError({})", self.err)
187    }
188}
189
190impl fmt::Display for TransactionCommitError {
191    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
192        self.err.fmt(f)
193    }
194}
195
196impl std::error::Error for TransactionCommitError {}
197
198/// The result of `Transaction::Commit`
199type TransactionResult = Result<TransactionCommitted, TransactionCommitError>;
200
201/// A cancelled transaction
202#[derive(Debug)]
203#[repr(transparent)]
204pub struct TransactionCancelled {
205    tr: Transaction,
206}
207impl TransactionCancelled {
208    /// Reset the transaction to its initial state.
209    ///
210    /// This is similar to dropping the transaction and creating a new one.
211    pub fn reset(mut self) -> Transaction {
212        self.tr.reset();
213        self.tr
214    }
215}
216impl From<TransactionCancelled> for Transaction {
217    fn from(tc: TransactionCancelled) -> Transaction {
218        tc.reset()
219    }
220}
221
222/// In FoundationDB, a transaction is a mutable snapshot of a database.
223///
224/// All read and write operations on a transaction see and modify an otherwise-unchanging version of the database and only change the underlying database if and when the transaction is committed. Read operations do see the effects of previous write operations on the same transaction. Committing a transaction usually succeeds in the absence of conflicts.
225///
226/// Applications must provide error handling and an appropriate retry loop around the application code for a transaction. See the documentation for [fdb_transaction_on_error()](https://apple.github.io/foundationdb/api-c.html#transaction).
227///
228/// Transactions group operations into a unit with the properties of atomicity, isolation, and durability. Transactions also provide the ability to maintain an application’s invariants or integrity constraints, supporting the property of consistency. Together these properties are known as ACID.
229///
230/// Transactions are also causally consistent: once a transaction has been successfully committed, all subsequently created transactions will see the modifications made by it.
231#[derive(Debug)]
232pub struct Transaction {
233    // Order of fields should not be changed, because Rust drops field top-to-bottom, and
234    // transaction should be dropped before cluster.
235    inner: NonNull<fdb_sys::FDBTransaction>,
236    metrics: Option<TransactionMetrics>,
237}
238unsafe impl Send for Transaction {}
239unsafe impl Sync for Transaction {}
240
241/// Converts Rust `bool` into `fdb_sys::fdb_bool_t`
242#[inline]
243fn fdb_bool(v: bool) -> fdb_sys::fdb_bool_t {
244    if v { 1 } else { 0 }
245}
246#[inline]
247fn fdb_len(len: usize, context: &'static str) -> std::os::raw::c_int {
248    assert!(len <= i32::MAX as usize, "{context}.len() > i32::MAX");
249    len as i32
250}
251#[inline]
252fn fdb_iteration(iteration: usize) -> std::os::raw::c_int {
253    if iteration > i32::MAX as usize {
254        0 // this will cause client_invalid_operation
255    } else {
256        iteration as i32
257    }
258}
259#[inline]
260fn fdb_limit(v: usize) -> std::os::raw::c_int {
261    if v > i32::MAX as usize {
262        i32::MAX
263    } else {
264        v as i32
265    }
266}
267
268/// `RangeOption` represents a query parameters for range scan query.
269///
270/// You can construct `RangeOption` easily:
271///
272/// ```
273/// use foundationdb::RangeOption;
274///
275/// let opt = RangeOption::from((b"begin".as_ref(), b"end".as_ref()));
276/// let opt: RangeOption = (b"begin".as_ref()..b"end".as_ref()).into();
277/// let opt = RangeOption {
278///     limit: Some(10),
279///     ..RangeOption::from((b"begin".as_ref(), b"end".as_ref()))
280/// };
281/// ```
282#[derive(Debug, Clone)]
283pub struct RangeOption<'a> {
284    /// The beginning of the range.
285    pub begin: KeySelector<'a>,
286    /// The end of the range.
287    pub end: KeySelector<'a>,
288    /// If non-zero, indicates the maximum number of key-value pairs to return.
289    pub limit: Option<usize>,
290    /// If non-zero, indicates a (soft) cap on the combined number of bytes of keys and values to
291    /// return for each item.
292    pub target_bytes: usize,
293    /// One of the options::StreamingMode values indicating how the caller would like the data in
294    /// the range returned.
295    pub mode: options::StreamingMode,
296    /// If true, key-value pairs will be returned in reverse lexicographical order beginning at
297    /// the end of the range.
298    pub reverse: bool,
299    #[doc(hidden)]
300    pub __non_exhaustive: std::marker::PhantomData<()>,
301}
302
303impl RangeOption<'_> {
304    /// Reverses the range direction.
305    pub fn rev(mut self) -> Self {
306        self.reverse = !self.reverse;
307        self
308    }
309
310    pub fn next_range(mut self, kvs: &FdbValues) -> Option<Self> {
311        if !kvs.more() {
312            return None;
313        }
314
315        let last = kvs.last()?;
316        let last_key = last.key();
317
318        if let Some(limit) = self.limit.as_mut() {
319            *limit = limit.saturating_sub(kvs.len());
320            if *limit == 0 {
321                return None;
322            }
323        }
324
325        if self.reverse {
326            self.end.make_first_greater_or_equal(last_key);
327        } else {
328            self.begin.make_first_greater_than(last_key);
329        }
330        Some(self)
331    }
332
333    #[cfg_api_versions(min = 710)]
334    pub(crate) fn next_mapped_range(mut self, kvs: &MappedKeyValues) -> Option<Self> {
335        if !kvs.more() {
336            return None;
337        }
338
339        let last = kvs.last()?;
340        let last_key = last.parent_key();
341
342        if let Some(limit) = self.limit.as_mut() {
343            *limit = limit.saturating_sub(kvs.len());
344            if *limit == 0 {
345                return None;
346            }
347        }
348
349        if self.reverse {
350            self.end.make_first_greater_or_equal(last_key);
351        } else {
352            self.begin.make_first_greater_than(last_key);
353        }
354        Some(self)
355    }
356}
357
358impl Default for RangeOption<'_> {
359    fn default() -> Self {
360        Self {
361            begin: KeySelector::first_greater_or_equal([].as_ref()),
362            end: KeySelector::first_greater_or_equal([].as_ref()),
363            limit: None,
364            target_bytes: 0,
365            mode: options::StreamingMode::Iterator,
366            reverse: false,
367            __non_exhaustive: std::marker::PhantomData,
368        }
369    }
370}
371
372impl<'a> From<(KeySelector<'a>, KeySelector<'a>)> for RangeOption<'a> {
373    fn from((begin, end): (KeySelector<'a>, KeySelector<'a>)) -> Self {
374        Self {
375            begin,
376            end,
377            ..Self::default()
378        }
379    }
380}
381impl From<(Vec<u8>, Vec<u8>)> for RangeOption<'static> {
382    fn from((begin, end): (Vec<u8>, Vec<u8>)) -> Self {
383        Self {
384            begin: KeySelector::first_greater_or_equal(begin),
385            end: KeySelector::first_greater_or_equal(end),
386            ..Self::default()
387        }
388    }
389}
390impl<'a> From<(&'a [u8], &'a [u8])> for RangeOption<'a> {
391    fn from((begin, end): (&'a [u8], &'a [u8])) -> Self {
392        Self {
393            begin: KeySelector::first_greater_or_equal(begin),
394            end: KeySelector::first_greater_or_equal(end),
395            ..Self::default()
396        }
397    }
398}
399impl<'a> From<std::ops::Range<KeySelector<'a>>> for RangeOption<'a> {
400    fn from(range: Range<KeySelector<'a>>) -> Self {
401        RangeOption::from((range.start, range.end))
402    }
403}
404
405impl<'a> From<std::ops::Range<&'a [u8]>> for RangeOption<'a> {
406    fn from(range: Range<&'a [u8]>) -> Self {
407        RangeOption::from((range.start, range.end))
408    }
409}
410
411impl From<std::ops::Range<std::vec::Vec<u8>>> for RangeOption<'static> {
412    fn from(range: Range<Vec<u8>>) -> Self {
413        RangeOption::from((range.start, range.end))
414    }
415}
416
417impl<'a> From<std::ops::RangeInclusive<&'a [u8]>> for RangeOption<'a> {
418    fn from(range: RangeInclusive<&'a [u8]>) -> Self {
419        let (start, end) = range.into_inner();
420        (KeySelector::first_greater_or_equal(start)..KeySelector::first_greater_than(end)).into()
421    }
422}
423
424impl From<std::ops::RangeInclusive<std::vec::Vec<u8>>> for RangeOption<'static> {
425    fn from(range: RangeInclusive<Vec<u8>>) -> Self {
426        let (start, end) = range.into_inner();
427        (KeySelector::first_greater_or_equal(start)..KeySelector::first_greater_than(end)).into()
428    }
429}
430
431impl Transaction {
432    pub(crate) fn new(inner: NonNull<fdb_sys::FDBTransaction>) -> Self {
433        Self {
434            inner,
435            metrics: None,
436        }
437    }
438
439    pub fn new_instrumented(
440        inner: NonNull<fdb_sys::FDBTransaction>,
441        metrics: TransactionMetrics,
442    ) -> Self {
443        Self {
444            inner,
445            metrics: Some(metrics),
446        }
447    }
448
449    /// Called to set an option on an FDBTransaction.
450    pub fn set_option(&self, opt: options::TransactionOption) -> FdbResult<()> {
451        unsafe { opt.apply(self.inner.as_ptr()) }
452    }
453
454    /// Pass through an option given a code and raw data. Useful when creating a passthrough layer
455    /// where the code and data will be provided as raw, in order to avoid deserializing to an option
456    /// and serializing it back to code and data.
457    /// In general, you should use `set_option`.
458    pub fn set_raw_option(
459        &self,
460        code: fdb_sys::FDBTransactionOption,
461        data: Option<Vec<u8>>,
462    ) -> FdbResult<()> {
463        let (data_ptr, size) = data
464            .as_ref()
465            .map(|data| {
466                (
467                    data.as_ptr(),
468                    i32::try_from(data.len()).expect("len to fit in i32"),
469                )
470            })
471            .unwrap_or_else(|| (std::ptr::null(), 0));
472        let err = unsafe {
473            fdb_sys::fdb_transaction_set_option(self.inner.as_ptr(), code, data_ptr, size)
474        };
475        if err != 0 {
476            Err(FdbError::from_code(err))
477        } else {
478            Ok(())
479        }
480    }
481
482    /// Modify the database snapshot represented by transaction to change the given
483    /// key to have the given value.
484    ///
485    /// If the given key was not previously present in the database it is inserted.
486    /// The modification affects the actual database only if transaction is later
487    /// committed with `Transaction::commit`.
488    ///
489    /// # Arguments
490    ///
491    /// * `key` - the name of the key to be inserted into the database.
492    /// * `value` - the value to be inserted into the database
493    #[cfg_attr(
494        feature = "trace",
495        tracing::instrument(level = "debug", skip(self, key, value))
496    )]
497    pub fn set(&self, key: &[u8], value: &[u8]) {
498        unsafe {
499            fdb_sys::fdb_transaction_set(
500                self.inner.as_ptr(),
501                key.as_ptr(),
502                fdb_len(key.len(), "key"),
503                value.as_ptr(),
504                fdb_len(value.len(), "value"),
505            )
506        }
507
508        if let Some(metrics) = &self.metrics {
509            metrics.report_metrics(FdbCommand::Set((key.len() + value.len()) as u64));
510        }
511    }
512
513    /// Modify the database snapshot represented by transaction to remove the given key from the
514    /// database.
515    ///
516    /// If the key was not previously present in the database, there is no effect. The modification
517    /// affects the actual database only if transaction is later committed with
518    /// `Transaction::commit`.
519    ///
520    /// # Arguments
521    ///
522    /// * `key` - the name of the key to be removed from the database.
523    #[cfg_attr(
524        feature = "trace",
525        tracing::instrument(level = "debug", skip(self, key))
526    )]
527    pub fn clear(&self, key: &[u8]) {
528        unsafe {
529            fdb_sys::fdb_transaction_clear(
530                self.inner.as_ptr(),
531                key.as_ptr(),
532                fdb_len(key.len(), "key"),
533            )
534        }
535
536        if let Some(metrics) = &self.metrics {
537            metrics.report_metrics(FdbCommand::Clear);
538        }
539    }
540
541    /// Reads a value from the database snapshot represented by transaction.
542    ///
543    /// Returns an FDBFuture which will be set to the value of key in the database if there is any.
544    ///
545    /// # Arguments
546    ///
547    /// * `key` - the name of the key to be looked up in the database
548    /// * `snapshot` - `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
549    #[cfg_attr(
550        feature = "trace",
551        tracing::instrument(level = "debug", skip(self, key))
552    )]
553    pub fn get(
554        &self,
555        key: &[u8],
556        snapshot: bool,
557    ) -> impl Future<Output = FdbResult<Option<FdbSlice>>> + Send + Sync + Unpin + use<> {
558        let metrics = self.metrics.clone();
559        let lenght_key = key.len();
560
561        FdbFuture::<Option<FdbSlice>>::new(unsafe {
562            fdb_sys::fdb_transaction_get(
563                self.inner.as_ptr(),
564                key.as_ptr(),
565                fdb_len(key.len(), "key"),
566                fdb_bool(snapshot),
567            )
568        })
569        .map(move |result| {
570            if let Ok(value) = &result {
571                if let Some(metrics) = metrics.as_ref() {
572                    let (bytes_count, kv_fetched) = if let Some(values) = value {
573                        ((lenght_key + values.len()) as u64, 1)
574                    } else {
575                        (lenght_key as u64, 0)
576                    };
577                    metrics.report_metrics(FdbCommand::Get(bytes_count, kv_fetched));
578                }
579            }
580            result
581        })
582    }
583
584    /// Modify the database snapshot represented by transaction to perform the operation indicated
585    /// by operationType with operand param to the value stored by the given key.
586    ///
587    /// An atomic operation is a single database command that carries out several logical steps:
588    /// reading the value of a key, performing a transformation on that value, and writing the
589    /// result. Different atomic operations perform different transformations. Like other database
590    /// operations, an atomic operation is used within a transaction; however, its use within a
591    /// transaction will not cause the transaction to conflict.
592    ///
593    /// Atomic operations do not expose the current value of the key to the client but simply send
594    /// the database the transformation to apply. In regard to conflict checking, an atomic
595    /// operation is equivalent to a write without a read. It can only cause other transactions
596    /// performing reads of the key to conflict.
597    ///
598    /// By combining these logical steps into a single, read-free operation, FoundationDB can
599    /// guarantee that the transaction will not conflict due to the operation. This makes atomic
600    /// operations ideal for operating on keys that are frequently modified. A common example is
601    /// the use of a key-value pair as a counter.
602    ///
603    /// # Warning
604    ///
605    /// If a transaction uses both an atomic operation and a strictly serializable read on the same
606    /// key, the benefits of using the atomic operation (for both conflict checking and performance)
607    /// are lost.
608    #[cfg_attr(
609        feature = "trace",
610        tracing::instrument(level = "debug", skip(self, key, param))
611    )]
612    pub fn atomic_op(&self, key: &[u8], param: &[u8], op_type: options::MutationType) {
613        unsafe {
614            fdb_sys::fdb_transaction_atomic_op(
615                self.inner.as_ptr(),
616                key.as_ptr(),
617                fdb_len(key.len(), "key"),
618                param.as_ptr(),
619                fdb_len(param.len(), "param"),
620                op_type.code(),
621            )
622        }
623
624        if let Some(metrics) = &self.metrics {
625            metrics.report_metrics(FdbCommand::Atomic);
626        }
627    }
628
629    /// Resolves a key selector against the keys in the database snapshot represented by
630    /// transaction.
631    ///
632    /// Returns an FDBFuture which will be set to the key in the database matching the key
633    /// selector.
634    ///
635    /// # Arguments
636    ///
637    /// * `selector`: the key selector
638    /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
639    #[cfg_attr(
640        feature = "trace",
641        tracing::instrument(level = "debug", skip(self, selector, snapshot))
642    )]
643    pub fn get_key(
644        &self,
645        selector: &KeySelector,
646        snapshot: bool,
647    ) -> impl Future<Output = FdbResult<FdbSlice>> + Send + Sync + Unpin + use<> {
648        let key = selector.key();
649        FdbFuture::new(unsafe {
650            fdb_sys::fdb_transaction_get_key(
651                self.inner.as_ptr(),
652                key.as_ptr(),
653                fdb_len(key.len(), "key"),
654                fdb_bool(selector.or_equal()),
655                selector.offset(),
656                fdb_bool(snapshot),
657            )
658        })
659    }
660
661    /// Reads all key-value pairs in the database snapshot represented by transaction (potentially
662    /// limited by limit, target_bytes, or mode) which have a key lexicographically greater than or
663    /// equal to the key resolved by the begin key selector and lexicographically less than the key
664    /// resolved by the end key selector.
665    ///
666    /// Returns a stream of KeyValue slices.
667    ///
668    /// This method is a little more efficient than `get_ranges_keyvalues` but a little harder to
669    /// use.
670    ///
671    /// # Arguments
672    ///
673    /// * `opt`: the range, limit, target_bytes and mode
674    /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
675    #[cfg_attr(
676        feature = "trace",
677        tracing::instrument(level = "debug", skip(self, opt, snapshot))
678    )]
679    pub fn get_ranges<'a>(
680        &'a self,
681        opt: RangeOption<'a>,
682        snapshot: bool,
683    ) -> impl Stream<Item = FdbResult<FdbValues>> + Send + Sync + Unpin + 'a {
684        stream::unfold((1, Some(opt)), move |(iteration, maybe_opt)| {
685            if let Some(opt) = maybe_opt {
686                Either::Left(self.get_range(&opt, iteration as usize, snapshot).map(
687                    move |maybe_values| {
688                        let next_opt = match &maybe_values {
689                            Ok(values) => opt.next_range(values),
690                            Err(..) => None,
691                        };
692                        Some((maybe_values, (iteration + 1, next_opt)))
693                    },
694                ))
695            } else {
696                Either::Right(future::ready(None))
697            }
698        })
699    }
700
701    /// Reads all key-value pairs in the database snapshot represented by transaction (potentially
702    /// limited by limit, target_bytes, or mode) which have a key lexicographically greater than or
703    /// equal to the key resolved by the begin key selector and lexicographically less than the key
704    /// resolved by the end key selector.
705    ///
706    /// Returns a stream of KeyValue.
707    ///
708    /// # Arguments
709    ///
710    /// * `opt`: the range, limit, target_bytes and mode
711    /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
712    #[cfg_attr(
713        feature = "trace",
714        tracing::instrument(level = "debug", skip(self, opt, snapshot))
715    )]
716    pub fn get_ranges_keyvalues<'a>(
717        &'a self,
718        opt: RangeOption<'a>,
719        snapshot: bool,
720    ) -> impl Stream<Item = FdbResult<FdbValue>> + Unpin + 'a {
721        self.get_ranges(opt, snapshot)
722            .map_ok(|values| stream::iter(values.into_iter().map(Ok)))
723            .try_flatten()
724    }
725
726    /// Reads all key-value pairs in the database snapshot represented by transaction (potentially
727    /// limited by limit, target_bytes, or mode) which have a key lexicographically greater than or
728    /// equal to the key resolved by the begin key selector and lexicographically less than the key
729    /// resolved by the end key selector.
730    ///
731    /// <div class="warning">
732    /// This method returns <strong>only a single batch</strong> of results, not necessarily all
733    /// key-value pairs in the range. It is a low-level primitive for manual pagination. Most
734    /// callers should use
735    /// <a href="struct.Transaction.html#method.get_ranges_keyvalues"><code>get_ranges_keyvalues</code></a>,
736    /// which automatically pages through the full range and yields every key-value pair as a stream.
737    /// </div>
738    ///
739    /// # Arguments
740    ///
741    /// * `opt`: the range, limit, target_bytes and mode
742    /// * `iteration`: If opt.mode is Iterator, this parameter should start at 1 and be incremented
743    ///   by 1 for each successive call while reading this range. In all other cases it is ignored.
744    /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
745    #[cfg_attr(
746        feature = "trace",
747        tracing::instrument(level = "debug", skip(self, opt, snapshot))
748    )]
749    pub fn get_range(
750        &self,
751        opt: &RangeOption,
752        iteration: usize,
753        snapshot: bool,
754    ) -> impl Future<Output = FdbResult<FdbValues>> + Send + Sync + Unpin + use<> {
755        let begin = &opt.begin;
756        let end = &opt.end;
757        let key_begin = begin.key();
758        let key_end = end.key();
759
760        let metrics = self.metrics.clone();
761
762        FdbFuture::<FdbValues>::new(unsafe {
763            fdb_sys::fdb_transaction_get_range(
764                self.inner.as_ptr(),
765                key_begin.as_ptr(),
766                fdb_len(key_begin.len(), "key_begin"),
767                fdb_bool(begin.or_equal()),
768                begin.offset(),
769                key_end.as_ptr(),
770                fdb_len(key_end.len(), "key_end"),
771                fdb_bool(end.or_equal()),
772                end.offset(),
773                fdb_limit(opt.limit.unwrap_or(0)),
774                fdb_limit(opt.target_bytes),
775                opt.mode.code(),
776                fdb_iteration(iteration),
777                fdb_bool(snapshot),
778                fdb_bool(opt.reverse),
779            )
780        })
781        .map(move |result| {
782            if let (Ok(values), Some(metrics)) = (&result, metrics) {
783                let kv_fetched = values.len();
784                let mut bytes_count = 0;
785
786                for key_value in values.as_ref() {
787                    let key_len = key_value.key().len();
788                    let value_len = key_value.value().len();
789
790                    bytes_count += (key_len + value_len) as u64
791                }
792
793                metrics.report_metrics(FdbCommand::GetRange(bytes_count, kv_fetched as u64));
794            };
795
796            result
797        })
798    }
799
800    /// Mapped Range is an experimental feature introduced in FDB 7.1.
801    /// It is intended to improve the client throughput and reduce latency for querying data through a Subspace used as a "index".
802    /// In such a case, querying records by scanning an index in relational databases can be
803    /// translated to a GetRange request on the index entries followed up by multiple GetValue requests for the record entries in FDB.
804    ///
805    /// This method is allowing FoundationDB "follow up" a GetRange request with GetValue requests,
806    /// this can happen in one request without additional back and forth. Considering the overhead
807    /// of each request, this saves time and resources on serialization, deserialization, and network.
808    ///
809    /// A mapped request will:
810    ///
811    /// * Do a range query (same as a `Transaction.get_range` request) and get the result. We call it the primary query.
812    /// * For each key-value pair in the primary query result, translate it to a `get_range` query and get the result. We call them secondary queries.
813    /// * Put all results in a nested structure and return them.
814    ///
815    /// **WARNING** : This feature is considered experimental at this time. It is only allowed when
816    /// using snapshot isolation AND disabling read-your-writes.
817    ///
818    /// More info can be found in the relevant [documentation](https://github.com/apple/foundationdb/wiki/Everything-about-GetMappedRange#input).
819    ///
820    /// This is the "raw" version, users are expected to use [Transaction::get_mapped_ranges]
821    #[cfg_api_versions(min = 710)]
822    #[cfg_attr(
823        feature = "trace",
824        tracing::instrument(level = "debug", skip(self, opt, mapper, snapshot))
825    )]
826    pub fn get_mapped_range(
827        &self,
828        opt: &RangeOption,
829        mapper: &[u8],
830        iteration: usize,
831        snapshot: bool,
832    ) -> impl Future<Output = FdbResult<MappedKeyValues>> + Send + Sync + Unpin + use<> {
833        let begin = &opt.begin;
834        let end = &opt.end;
835        let key_begin = begin.key();
836        let key_end = end.key();
837
838        FdbFuture::new(unsafe {
839            fdb_sys::fdb_transaction_get_mapped_range(
840                self.inner.as_ptr(),
841                key_begin.as_ptr(),
842                fdb_len(key_begin.len(), "key_begin"),
843                fdb_bool(begin.or_equal()),
844                begin.offset(),
845                key_end.as_ptr(),
846                fdb_len(key_end.len(), "key_end"),
847                fdb_bool(end.or_equal()),
848                end.offset(),
849                mapper.as_ptr(),
850                fdb_len(mapper.len(), "mapper_length"),
851                fdb_limit(opt.limit.unwrap_or(0)),
852                fdb_limit(opt.target_bytes),
853                opt.mode.code(),
854                fdb_iteration(iteration),
855                fdb_bool(snapshot),
856                fdb_bool(opt.reverse),
857            )
858        })
859    }
860
861    /// Mapped Range is an experimental feature introduced in FDB 7.1.
862    /// It is intended to improve the client throughput and reduce latency for querying data through a Subspace used as a "index".
863    /// In such a case, querying records by scanning an index in relational databases can be
864    /// translated to a GetRange request on the index entries followed up by multiple GetValue requests for the record entries in FDB.
865    ///
866    /// This method is allowing FoundationDB "follow up" a GetRange request with GetValue requests,
867    /// this can happen in one request without additional back and forth. Considering the overhead
868    /// of each request, this saves time and resources on serialization, deserialization, and network.
869    ///
870    /// A mapped request will:
871    ///
872    /// * Do a range query (same as a `Transaction.get_range` request) and get the result. We call it the primary query.
873    /// * For each key-value pair in the primary query result, translate it to a `get_range` query and get the result. We call them secondary queries.
874    /// * Put all results in a nested structure and return them.
875    ///
876    /// **WARNING** : This feature is considered experimental at this time. It is only allowed when
877    /// using snapshot isolation AND disabling read-your-writes.
878    ///
879    /// More info can be found in the relevant [documentation](https://github.com/apple/foundationdb/wiki/Everything-about-GetMappedRange#input).
880    #[cfg_api_versions(min = 710)]
881    #[cfg_attr(
882        feature = "trace",
883        tracing::instrument(level = "debug", skip(self, opt, mapper, snapshot))
884    )]
885    pub fn get_mapped_ranges<'a>(
886        &'a self,
887        opt: RangeOption<'a>,
888        mapper: &'a [u8],
889        snapshot: bool,
890    ) -> impl Stream<Item = FdbResult<MappedKeyValues>> + Send + Sync + Unpin + 'a {
891        stream::unfold((1, Some(opt)), move |(iteration, maybe_opt)| {
892            if let Some(opt) = maybe_opt {
893                Either::Left(
894                    self.get_mapped_range(&opt, mapper, iteration as usize, snapshot)
895                        .map(move |maybe_values| {
896                            let next_opt = match &maybe_values {
897                                Ok(values) => opt.next_mapped_range(values),
898                                Err(..) => None,
899                            };
900                            Some((maybe_values, (iteration + 1, next_opt)))
901                        }),
902                )
903            } else {
904                Either::Right(future::ready(None))
905            }
906        })
907    }
908
909    /// Modify the database snapshot represented by transaction to remove all keys (if any) which
910    /// are lexicographically greater than or equal to the given begin key and lexicographically
911    /// less than the given end_key.
912    ///
913    /// The modification affects the actual database only if transaction is later committed with
914    /// `Transaction::commit`.
915    #[cfg_attr(
916        feature = "trace",
917        tracing::instrument(level = "debug", skip(self, begin, end))
918    )]
919    pub fn clear_range(&self, begin: &[u8], end: &[u8]) {
920        unsafe {
921            fdb_sys::fdb_transaction_clear_range(
922                self.inner.as_ptr(),
923                begin.as_ptr(),
924                fdb_len(begin.len(), "begin"),
925                end.as_ptr(),
926                fdb_len(end.len(), "end"),
927            )
928        }
929
930        if let Some(metrics) = &self.metrics {
931            metrics.report_metrics(FdbCommand::ClearRange);
932        }
933    }
934
935    /// Get the estimated byte size of the key range based on the byte sample collected by FDB
936    #[cfg_api_versions(min = 630)]
937    pub fn get_estimated_range_size_bytes(
938        &self,
939        begin: &[u8],
940        end: &[u8],
941    ) -> impl Future<Output = FdbResult<i64>> + Send + Sync + Unpin + use<> {
942        FdbFuture::<i64>::new(unsafe {
943            fdb_sys::fdb_transaction_get_estimated_range_size_bytes(
944                self.inner.as_ptr(),
945                begin.as_ptr(),
946                fdb_len(begin.len(), "begin"),
947                end.as_ptr(),
948                fdb_len(end.len(), "end"),
949            )
950        })
951    }
952
953    /// Attempts to commit the sets and clears previously applied to the database snapshot
954    /// represented by transaction to the actual database.
955    ///
956    /// The commit may or may not succeed – in particular, if a conflicting transaction previously
957    /// committed, then the commit must fail in order to preserve transactional isolation. If the
958    /// commit does succeed, the transaction is durably committed to the database and all
959    /// subsequently started transactions will observe its effects.
960    ///
961    /// It is not necessary to commit a read-only transaction – you can simply drop it.
962    ///
963    /// Callers will usually want to retry a transaction if the commit or a another method on the
964    /// transaction returns a retryable error (see `on_error` and/or `Database::transact`).
965    ///
966    /// As with other client/server databases, in some failure scenarios a client may be unable to
967    /// determine whether a transaction succeeded. In these cases, `Transaction::commit` will return
968    /// an error and `is_maybe_committed()` will returns true on that error. The `on_error` function
969    /// treats this error as retryable, so retry loops that don’t check for `is_maybe_committed()`
970    /// could execute the transaction twice. In these cases, you must consider the idempotence of
971    /// the transaction. For more information, see [Transactions with unknown results](https://apple.github.io/foundationdb/developer-guide.html#developer-guide-unknown-results).
972    ///
973    /// Normally, commit will wait for outstanding reads to return. However, if those reads were
974    /// snapshot reads or the transaction option for disabling “read-your-writes” has been invoked,
975    /// any outstanding reads will immediately return errors.
976    #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
977    pub fn commit(self) -> impl Future<Output = TransactionResult> + Send + Sync + Unpin {
978        FdbFuture::<()>::new(unsafe { fdb_sys::fdb_transaction_commit(self.inner.as_ptr()) }).map(
979            move |r| match r {
980                Ok(()) => Ok(TransactionCommitted { tr: self }),
981                Err(err) => Err(TransactionCommitError { tr: self, err }),
982            },
983        )
984    }
985
986    /// Implements the recommended retry and backoff behavior for a transaction. This function knows
987    /// which of the error codes generated by other `Transaction` functions represent temporary
988    /// error conditions and which represent application errors that should be handled by the
989    /// application. It also implements an exponential backoff strategy to avoid swamping the
990    /// database cluster with excessive retries when there is a high level of conflict between
991    /// transactions.
992    ///
993    /// It is not necessary to call `reset()` when handling an error with `on_error()` since the
994    /// transaction has already been reset.
995    ///
996    /// You should not call this method most of the times and use `Database::transact` which
997    /// implements a retry loop strategy for you.
998    pub fn on_error(
999        self,
1000        err: FdbError,
1001    ) -> impl Future<Output = FdbResult<Transaction>> + Send + Sync + Unpin {
1002        FdbFuture::<()>::new(unsafe {
1003            fdb_sys::fdb_transaction_on_error(self.inner.as_ptr(), err.code())
1004        })
1005        .map_ok(|()| self)
1006    }
1007
1008    /// Cancels the transaction. All pending or future uses of the transaction will return a
1009    /// transaction_cancelled error. The transaction can be used again after it is reset.
1010    pub fn cancel(self) -> TransactionCancelled {
1011        unsafe {
1012            fdb_sys::fdb_transaction_cancel(self.inner.as_ptr());
1013        }
1014        TransactionCancelled { tr: self }
1015    }
1016
1017    /// Sets a custom metric for the transaction with the specified name, value, and labels.
1018    ///
1019    /// Custom metrics allow you to track application-specific metrics during transaction execution.
1020    /// These metrics are collected alongside the standard FoundationDB metrics and can be used
1021    /// for monitoring, debugging, and performance analysis.
1022    ///
1023    /// # Arguments
1024    /// * `name` - The name of the metric (e.g., "query_time", "cache_hits")
1025    /// * `value` - The value to set for the metric
1026    /// * `labels` - Key-value pairs for labeling the metric, allowing for dimensional metrics
1027    ///   (e.g., `[("operation", "read"), ("region", "us-west")]`)
1028    ///
1029    /// # Returns
1030    /// * `Ok(())` if the metric was set successfully
1031    /// * `Err(TransactionMetricsNotFound)` if this transaction was not created with metrics instrumentation
1032    ///
1033    /// # Example
1034    /// ```
1035    /// # use foundationdb::*;
1036    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1037    /// # let db = Database::default()?;
1038    /// let metrics = TransactionMetrics::new();
1039    /// let txn = db.create_instrumented_trx(metrics)?;
1040    ///
1041    /// // Set a custom metric with labels
1042    /// txn.set_custom_metric("query_processing_time", 42, &[("query_type", "select"), ("table", "users")])?;
1043    /// # Ok(())
1044    /// # }
1045    /// ```
1046    ///
1047    /// # Note
1048    /// This method only works on transactions created with `create_instrumented_trx()` or within
1049    /// `instrumented_run()`. For regular transactions created with `create_trx()`, this will
1050    /// return `Err(TransactionMetricsNotFound)`.
1051    pub fn set_custom_metric(
1052        &self,
1053        name: &str,
1054        value: u64,
1055        labels: &[(&str, &str)],
1056    ) -> Result<(), TransactionMetricsNotFound> {
1057        if let Some(metrics) = &self.metrics {
1058            metrics.set_custom(name, value, labels);
1059            Ok(())
1060        } else {
1061            Err(TransactionMetricsNotFound)
1062        }
1063    }
1064
1065    /// Increments a custom metric for the transaction by the specified amount.
1066    ///
1067    /// This is useful for counting events or accumulating values during transaction execution.
1068    /// If the metric doesn't exist yet, it will be created with the specified amount.
1069    /// If it already exists with the same labels, its value will be incremented.
1070    ///
1071    /// # Arguments
1072    /// * `name` - The name of the metric to increment (e.g., "requests", "bytes_processed")
1073    /// * `amount` - The amount to increment the metric by
1074    /// * `labels` - Key-value pairs for labeling the metric, allowing for dimensional metrics
1075    ///   (e.g., `[("status", "success"), ("endpoint", "api/v1/users")]`)
1076    ///
1077    /// # Returns
1078    /// * `Ok(())` if the metric was incremented successfully
1079    /// * `Err(TransactionMetricsNotFound)` if this transaction was not created with metrics instrumentation
1080    ///
1081    /// # Example
1082    /// ```
1083    /// # use foundationdb::*;
1084    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1085    /// # let db = Database::default()?;
1086    /// let metrics = TransactionMetrics::new();
1087    /// let txn = db.create_instrumented_trx(metrics)?;
1088    ///
1089    /// // Increment a counter each time a specific operation occurs
1090    /// txn.increment_custom_metric("cache_misses", 1, &[("cache", "user_data")])?;
1091    ///
1092    /// // Later in the transaction, increment it again
1093    /// txn.increment_custom_metric("cache_misses", 1, &[("cache", "user_data")])?;
1094    ///
1095    /// // The final value will be 2
1096    /// # Ok(())
1097    /// # }
1098    /// ```
1099    ///
1100    /// # Note
1101    /// This method only works on transactions created with `create_instrumented_trx()` or within
1102    /// `instrumented_run()`. For regular transactions created with `create_trx()`, this will
1103    /// return `Err(TransactionMetricsNotFound)`.
1104    pub fn increment_custom_metric(
1105        &self,
1106        name: &str,
1107        amount: u64,
1108        labels: &[(&str, &str)],
1109    ) -> Result<(), TransactionMetricsNotFound> {
1110        if let Some(metrics) = &self.metrics {
1111            metrics.increment_custom(name, amount, labels);
1112            Ok(())
1113        } else {
1114            Err(TransactionMetricsNotFound)
1115        }
1116    }
1117
1118    /// Returns a list of public network addresses as strings, one for each of the storage servers
1119    /// responsible for storing key_name and its associated value.
1120    pub fn get_addresses_for_key(
1121        &self,
1122        key: &[u8],
1123    ) -> impl Future<Output = FdbResult<FdbAddresses>> + Send + Sync + Unpin + use<> {
1124        FdbFuture::new(unsafe {
1125            fdb_sys::fdb_transaction_get_addresses_for_key(
1126                self.inner.as_ptr(),
1127                key.as_ptr(),
1128                fdb_len(key.len(), "key"),
1129            )
1130        })
1131    }
1132
1133    /// A watch's behavior is relative to the transaction that created it. A watch will report a
1134    /// change in relation to the key’s value as readable by that transaction. The initial value
1135    /// used for comparison is either that of the transaction’s read version or the value as
1136    /// modified by the transaction itself prior to the creation of the watch. If the value changes
1137    /// and then changes back to its initial value, the watch might not report the change.
1138    ///
1139    /// Until the transaction that created it has been committed, a watch will not report changes
1140    /// made by other transactions. In contrast, a watch will immediately report changes made by
1141    /// the transaction itself. Watches cannot be created if the transaction has set the
1142    /// READ_YOUR_WRITES_DISABLE transaction option, and an attempt to do so will return an
1143    /// watches_disabled error.
1144    ///
1145    /// If the transaction used to create a watch encounters an error during commit, then the watch
1146    /// will be set with that error. A transaction whose commit result is unknown will set all of
1147    /// its watches with the commit_unknown_result error. If an uncommitted transaction is reset or
1148    /// destroyed, then any watches it created will be set with the transaction_cancelled error.
1149    ///
1150    /// Returns an future representing an empty value that will be set once the watch has
1151    /// detected a change to the value at the specified key.
1152    ///
1153    /// By default, each database connection can have no more than 10,000 watches that have not yet
1154    /// reported a change. When this number is exceeded, an attempt to create a watch will return a
1155    /// too_many_watches error. This limit can be changed using the MAX_WATCHES database option.
1156    /// Because a watch outlives the transaction that creates it, any watch that is no longer
1157    /// needed should be cancelled by dropping its future.
1158    pub fn watch(
1159        &self,
1160        key: &[u8],
1161    ) -> impl Future<Output = FdbResult<()>> + Send + Sync + Unpin + use<> {
1162        FdbFuture::new(unsafe {
1163            fdb_sys::fdb_transaction_watch(
1164                self.inner.as_ptr(),
1165                key.as_ptr(),
1166                fdb_len(key.len(), "key"),
1167            )
1168        })
1169    }
1170
1171    /// Returns an FDBFuture which will be set to the approximate transaction size so far in the
1172    /// returned future, which is the summation of the estimated size of mutations, read conflict
1173    /// ranges, and write conflict ranges.
1174    ///
1175    /// This can be called multiple times before the transaction is committed.
1176    #[cfg_api_versions(min = 620)]
1177    pub fn get_approximate_size(
1178        &self,
1179    ) -> impl Future<Output = FdbResult<i64>> + Send + Sync + Unpin + use<> {
1180        FdbFuture::new(unsafe {
1181            fdb_sys::fdb_transaction_get_approximate_size(self.inner.as_ptr())
1182        })
1183    }
1184
1185    /// Gets a list of keys that can split the given range into (roughly) equally sized chunks based on chunk_size.
1186    /// Note: the returned split points contain the start key and end key of the given range.
1187    #[cfg_api_versions(min = 700)]
1188    pub fn get_range_split_points(
1189        &self,
1190        begin: &[u8],
1191        end: &[u8],
1192        chunk_size: i64,
1193    ) -> impl Future<Output = FdbResult<FdbKeys>> + Send + Sync + Unpin + use<> {
1194        FdbFuture::<FdbKeys>::new(unsafe {
1195            fdb_sys::fdb_transaction_get_range_split_points(
1196                self.inner.as_ptr(),
1197                begin.as_ptr(),
1198                fdb_len(begin.len(), "begin"),
1199                end.as_ptr(),
1200                fdb_len(end.len(), "end"),
1201                chunk_size,
1202            )
1203        })
1204    }
1205
1206    /// Returns an FDBFuture which will be set to the versionstamp which was used by any
1207    /// versionstamp operations in this transaction.
1208    ///
1209    /// The future will be ready only after the successful completion of a call to `commit()` on
1210    /// this Transaction. Read-only transactions do not modify the database when committed and will
1211    /// result in the future completing with an error. Keep in mind that a transaction which reads
1212    /// keys and then sets them to their current values may be optimized to a read-only transaction.
1213    ///
1214    /// Most applications will not call this function.
1215    pub fn get_versionstamp(
1216        &self,
1217    ) -> impl Future<Output = FdbResult<FdbSlice>> + Send + Sync + Unpin + use<> {
1218        FdbFuture::new(unsafe { fdb_sys::fdb_transaction_get_versionstamp(self.inner.as_ptr()) })
1219    }
1220
1221    /// The transaction obtains a snapshot read version automatically at the time of the first call
1222    /// to `get_*()` (including this one) and (unless causal consistency has been deliberately
1223    /// compromised by transaction options) is guaranteed to represent all transactions which were
1224    /// reported committed before that call.
1225    pub fn get_read_version(
1226        &self,
1227    ) -> impl Future<Output = FdbResult<i64>> + Send + Sync + Unpin + use<> {
1228        FdbFuture::new(unsafe { fdb_sys::fdb_transaction_get_read_version(self.inner.as_ptr()) })
1229    }
1230
1231    /// Sets the snapshot read version used by a transaction.
1232    ///
1233    /// This is not needed in simple cases.
1234    /// If the given version is too old, subsequent reads will fail with error_code_past_version;
1235    /// if it is too new, subsequent reads may be delayed indefinitely and/or fail with
1236    /// error_code_future_version. If any of get_*() have been called on this transaction already,
1237    /// the result is undefined.
1238    pub fn set_read_version(&self, version: i64) {
1239        unsafe { fdb_sys::fdb_transaction_set_read_version(self.inner.as_ptr(), version) }
1240    }
1241
1242    /// The metadata version key `\xff/metadataVersion` is a key intended to help layers deal with hot keys.
1243    /// The value of this key is sent to clients along with the read version from the proxy,
1244    /// so a client can read its value without communicating with a storage server.
1245    /// To retrieve the metadataVersion, you need to set `TransactionOption::ReadSystemKeys`
1246    #[cfg_api_versions(min = 610)]
1247    pub async fn get_metadata_version(&self, snapshot: bool) -> FdbResult<Option<i64>> {
1248        match self.get(METADATA_VERSION_KEY, snapshot).await {
1249            Ok(Some(fdb_slice)) => {
1250                let value = fdb_slice.deref();
1251                // as we cannot write the metadata-key directly(we must mutate with an atomic_op),
1252                // can we assume that it will always be the correct size?
1253                if value.len() < 8 {
1254                    return Ok(None);
1255                }
1256
1257                // The 80-bits versionstamps are 10 bytes longs, and are composed of:
1258                // * 8 bytes (Transaction Version)
1259                // * followed by 2 bytes (Transaction Batch Order)
1260                // More details can be found here: https://forums.foundationdb.org/t/implementing-versionstamps-in-bindings/250
1261                let mut arr = [0u8; 8];
1262                arr.copy_from_slice(&value[0..8]);
1263                let transaction_version: i64 = i64::from_be_bytes(arr);
1264
1265                Ok(Some(transaction_version))
1266            }
1267            Ok(None) => Ok(None),
1268
1269            Err(err) => Err(err),
1270        }
1271    }
1272
1273    #[cfg_api_versions(min = 610)]
1274    pub fn update_metadata_version(&self) {
1275        // The param is transformed by removing the final four bytes from ``param`` and reading
1276        // those as a little-Endian 32-bit integer to get a position ``pos``.
1277        // The 10 bytes of the parameter from ``pos`` to ``pos + 10`` are replaced with the
1278        // versionstamp of the transaction used. The first byte of the parameter is position 0.
1279        // As we only have the metadata value, we can just create an 14-bytes Vec filled with 0u8.
1280        let param = vec![0u8; 14];
1281        self.atomic_op(
1282            METADATA_VERSION_KEY,
1283            param.as_slice(),
1284            options::MutationType::SetVersionstampedValue,
1285        )
1286    }
1287
1288    /// Reset transaction to its initial state.
1289    ///
1290    /// In order to protect against a race condition with cancel(), this call require a mutable
1291    /// access to the transaction.
1292    ///
1293    /// This is similar to dropping the transaction and creating a new one.
1294    ///
1295    /// It is not necessary to call `reset()` when handling an error with `on_error()` since the
1296    /// transaction has already been reset.
1297    pub fn reset(&mut self) {
1298        unsafe { fdb_sys::fdb_transaction_reset(self.inner.as_ptr()) }
1299    }
1300
1301    /// Reads the conflicting key ranges from the special keyspace after a commit conflict.
1302    ///
1303    /// This method reads from `\xff\xff/transaction/conflicting_keys/` and parses the
1304    /// boundary encoding where `b"1"` marks range starts and `b"0"` marks range ends.
1305    ///
1306    /// The special keyspace read is resolved client-side — no network round-trip to the
1307    /// cluster. The future still goes through the FDB network thread event loop, but the
1308    /// data comes from an in-memory map populated during the commit response. Returns
1309    /// an empty `Vec` if
1310    /// [`TransactionOption::ReportConflictingKeys`](crate::options::TransactionOption::ReportConflictingKeys)
1311    /// was not set.
1312    ///
1313    /// # Errors
1314    ///
1315    /// Returns an `FdbError` if the special keyspace read fails.
1316    #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
1317    pub async fn conflicting_keys(&self) -> FdbResult<Vec<ConflictingKeyRange>> {
1318        let opt = RangeOption::from((CONFLICTING_KEYS_PREFIX, CONFLICTING_KEYS_END));
1319        let range_result = self.get_range(&opt, 1, false).await?;
1320
1321        let prefix_len = CONFLICTING_KEYS_PREFIX.len();
1322        let mut ranges = Vec::new();
1323        let mut current_begin: Option<Vec<u8>> = None;
1324
1325        for kv in range_result.iter() {
1326            let raw_key = kv.key();
1327            let actual_key = if raw_key.len() > prefix_len {
1328                raw_key[prefix_len..].to_vec()
1329            } else {
1330                Vec::new()
1331            };
1332
1333            match kv.value() {
1334                b"1" => {
1335                    current_begin = Some(actual_key);
1336                }
1337                b"0" => {
1338                    if let Some(begin) = current_begin.take() {
1339                        ranges.push(ConflictingKeyRange {
1340                            begin,
1341                            end: actual_key,
1342                        });
1343                    }
1344                }
1345                _ => {}
1346            }
1347        }
1348
1349        debug_assert!(
1350            current_begin.is_none(),
1351            "unpaired '1' marker in conflicting keys response"
1352        );
1353
1354        Ok(ranges)
1355    }
1356
1357    /// Adds a conflict range to a transaction without performing the associated read or write.
1358    ///
1359    /// # Note
1360    ///
1361    /// Most applications will use the serializable isolation that transactions provide by default
1362    /// and will not need to manipulate conflict ranges.
1363    pub fn add_conflict_range(
1364        &self,
1365        begin: &[u8],
1366        end: &[u8],
1367        ty: options::ConflictRangeType,
1368    ) -> FdbResult<()> {
1369        error::eval(unsafe {
1370            fdb_sys::fdb_transaction_add_conflict_range(
1371                self.inner.as_ptr(),
1372                begin.as_ptr(),
1373                fdb_len(begin.len(), "begin"),
1374                end.as_ptr(),
1375                fdb_len(end.len(), "end"),
1376                ty.code(),
1377            )
1378        })
1379    }
1380}
1381
1382impl Drop for Transaction {
1383    fn drop(&mut self) {
1384        unsafe {
1385            fdb_sys::fdb_transaction_destroy(self.inner.as_ptr());
1386        }
1387    }
1388}
1389
1390/// A retryable transaction, generated by Database.run
1391#[derive(Clone)]
1392pub struct RetryableTransaction {
1393    inner: Arc<Transaction>,
1394}
1395
1396impl Deref for RetryableTransaction {
1397    type Target = Transaction;
1398    fn deref(&self) -> &Transaction {
1399        self.inner.deref()
1400    }
1401}
1402
1403impl RetryableTransaction {
1404    pub(crate) fn new(t: Transaction) -> RetryableTransaction {
1405        RetryableTransaction { inner: Arc::new(t) }
1406    }
1407
1408    pub(crate) fn take(self) -> Result<Transaction, FdbBindingError> {
1409        // checking weak references
1410        if Arc::weak_count(&self.inner) != 0 {
1411            return Err(FdbBindingError::ReferenceToTransactionKept);
1412        }
1413        Arc::try_unwrap(self.inner).map_err(|_| FdbBindingError::ReferenceToTransactionKept)
1414    }
1415
1416    pub(crate) async fn on_error(
1417        self,
1418        err: FdbError,
1419    ) -> Result<Result<RetryableTransaction, FdbError>, FdbBindingError> {
1420        Ok(self
1421            .take()?
1422            .on_error(err)
1423            .await
1424            .map(RetryableTransaction::new))
1425    }
1426
1427    #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
1428    pub(crate) async fn commit(
1429        self,
1430    ) -> Result<Result<TransactionCommitted, TransactionCommitError>, FdbBindingError> {
1431        Ok(self.take()?.commit().await)
1432    }
1433}