foundationdb/
transaction.rs

1// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors
2// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors.
3//
4// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
5// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
6// http://opensource.org/licenses/MIT>, at your option. This file may not be
7// copied, modified, or distributed except according to those terms.
8
9//! Implementations of the FDBTransaction C API
10//!
11//! <https://apple.github.io/foundationdb/api-c.html#transaction>
12
13use foundationdb_sys as fdb_sys;
14use std::fmt;
15use std::ops::{Deref, Range, RangeInclusive};
16use std::ptr::NonNull;
17use std::sync::Arc;
18
19use crate::future::*;
20use crate::keyselector::*;
21use crate::metrics::{FdbCommand, TransactionMetrics};
22use crate::options;
23
24use crate::{error, FdbError, FdbResult};
25use foundationdb_macros::cfg_api_versions;
26
27use crate::error::{FdbBindingError, TransactionMetricsNotFound};
28
29use futures::{
30    future, future::Either, stream, Future, FutureExt, Stream, TryFutureExt, TryStreamExt,
31};
32
33#[cfg_api_versions(min = 610)]
34const METADATA_VERSION_KEY: &[u8] = b"\xff/metadataVersion";
35
36/// Special keyspace prefix for conflicting keys.
37const CONFLICTING_KEYS_PREFIX: &[u8] = b"\xff\xff/transaction/conflicting_keys/";
38// Matches C++ SystemData.cpp conflictingKeysRange end key.
39const CONFLICTING_KEYS_END: &[u8] = b"\xff\xff/transaction/conflicting_keys/\xff\xff";
40
41/// A key range that conflicted during a transaction commit.
42///
43/// Returned by [`Transaction::conflicting_keys`] after a commit conflict
44/// when [`TransactionOption::ReportConflictingKeys`](crate::options::TransactionOption::ReportConflictingKeys)
45/// is enabled.
46///
47/// The special keyspace encodes conflicting ranges using boundary markers:
48/// - Value `b"1"` marks the inclusive start of a conflicting range
49/// - Value `b"0"` marks the exclusive end of a conflicting range
50#[derive(Debug, Clone, PartialEq, Eq, Hash)]
51pub struct ConflictingKeyRange {
52    begin: Vec<u8>,
53    end: Vec<u8>,
54}
55
56impl ConflictingKeyRange {
57    /// The inclusive begin of the conflicting key range.
58    pub fn begin(&self) -> &[u8] {
59        &self.begin
60    }
61
62    /// The exclusive end of the conflicting key range.
63    pub fn end(&self) -> &[u8] {
64        &self.end
65    }
66}
67
68impl fmt::Display for ConflictingKeyRange {
69    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
70        write!(
71            f,
72            "{}..{}",
73            String::from_utf8_lossy(&self.begin),
74            String::from_utf8_lossy(&self.end),
75        )
76    }
77}
78
79/// A committed transaction.
80#[derive(Debug)]
81#[repr(transparent)]
82pub struct TransactionCommitted {
83    tr: Transaction,
84}
85
86impl TransactionCommitted {
87    /// Retrieves the database version number at which a given transaction was committed.
88    ///
89    /// Read-only transactions do not modify the database when committed and will have a committed
90    /// version of -1. Keep in mind that a transaction which reads keys and then sets them to their
91    /// current values may be optimized to a read-only transaction.
92    ///
93    /// Note that database versions are not necessarily unique to a given transaction and so cannot
94    /// be used to determine in what order two transactions completed. The only use for this
95    /// function is to manually enforce causal consistency when calling `set_read_version()` on
96    /// another subsequent transaction.
97    ///
98    /// Most applications will not call this function.
99    pub fn committed_version(&self) -> FdbResult<i64> {
100        let mut version: i64 = 0;
101        error::eval(unsafe {
102            fdb_sys::fdb_transaction_get_committed_version(self.tr.inner.as_ptr(), &mut version)
103        })?;
104        Ok(version)
105    }
106
107    /// Reset the transaction to its initial state.
108    ///
109    /// This will not affect previously committed data.
110    ///
111    /// This is similar to dropping the transaction and creating a new one.
112    pub fn reset(mut self) -> Transaction {
113        self.tr.reset();
114        self.tr
115    }
116}
117impl From<TransactionCommitted> for Transaction {
118    fn from(tc: TransactionCommitted) -> Transaction {
119        tc.reset()
120    }
121}
122
123/// A failed to commit transaction.
124pub struct TransactionCommitError {
125    tr: Transaction,
126    err: FdbError,
127}
128
129impl TransactionCommitError {
130    /// Implements the recommended retry and backoff behavior for a transaction. This function knows
131    /// which of the error codes generated by other `Transaction` functions represent temporary
132    /// error conditions and which represent application errors that should be handled by the
133    /// application. It also implements an exponential backoff strategy to avoid swamping the
134    /// database cluster with excessive retries when there is a high level of conflict between
135    /// transactions.
136    ///
137    /// You should not call this method most of the times and use `Database::transact` which
138    /// implements a retry loop strategy for you.
139    pub fn on_error(self) -> impl Future<Output = FdbResult<Transaction>> {
140        FdbFuture::<()>::new(unsafe {
141            fdb_sys::fdb_transaction_on_error(self.tr.inner.as_ptr(), self.err.code())
142        })
143        .map_ok(|()| self.tr)
144    }
145
146    /// Reads the conflicting key ranges that caused this commit failure.
147    ///
148    /// Only returns meaningful results if
149    /// [`TransactionOption::ReportConflictingKeys`](crate::options::TransactionOption::ReportConflictingKeys)
150    /// was set on the transaction **and** the error is `not_committed` (code 1020).
151    ///
152    /// Must be called **before** [`on_error`](Self::on_error) which resets the transaction.
153    ///
154    /// # Errors
155    ///
156    /// Returns an `FdbError` if the special keyspace read fails.
157    #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
158    pub async fn conflicting_keys(&self) -> FdbResult<Vec<ConflictingKeyRange>> {
159        self.tr.conflicting_keys().await
160    }
161
162    /// Reset the transaction to its initial state.
163    ///
164    /// This is similar to dropping the transaction and creating a new one.
165    pub fn reset(mut self) -> Transaction {
166        self.tr.reset();
167        self.tr
168    }
169}
170
171impl Deref for TransactionCommitError {
172    type Target = FdbError;
173    fn deref(&self) -> &FdbError {
174        &self.err
175    }
176}
177
178impl From<TransactionCommitError> for FdbError {
179    fn from(tce: TransactionCommitError) -> FdbError {
180        tce.err
181    }
182}
183
184impl fmt::Debug for TransactionCommitError {
185    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
186        write!(f, "TransactionCommitError({})", self.err)
187    }
188}
189
190impl fmt::Display for TransactionCommitError {
191    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
192        self.err.fmt(f)
193    }
194}
195
196impl std::error::Error for TransactionCommitError {}
197
198/// The result of `Transaction::Commit`
199type TransactionResult = Result<TransactionCommitted, TransactionCommitError>;
200
201/// A cancelled transaction
202#[derive(Debug)]
203#[repr(transparent)]
204pub struct TransactionCancelled {
205    tr: Transaction,
206}
207impl TransactionCancelled {
208    /// Reset the transaction to its initial state.
209    ///
210    /// This is similar to dropping the transaction and creating a new one.
211    pub fn reset(mut self) -> Transaction {
212        self.tr.reset();
213        self.tr
214    }
215}
216impl From<TransactionCancelled> for Transaction {
217    fn from(tc: TransactionCancelled) -> Transaction {
218        tc.reset()
219    }
220}
221
222/// In FoundationDB, a transaction is a mutable snapshot of a database.
223///
224/// All read and write operations on a transaction see and modify an otherwise-unchanging version of the database and only change the underlying database if and when the transaction is committed. Read operations do see the effects of previous write operations on the same transaction. Committing a transaction usually succeeds in the absence of conflicts.
225///
226/// Applications must provide error handling and an appropriate retry loop around the application code for a transaction. See the documentation for [fdb_transaction_on_error()](https://apple.github.io/foundationdb/api-c.html#transaction).
227///
228/// Transactions group operations into a unit with the properties of atomicity, isolation, and durability. Transactions also provide the ability to maintain an application’s invariants or integrity constraints, supporting the property of consistency. Together these properties are known as ACID.
229///
230/// Transactions are also causally consistent: once a transaction has been successfully committed, all subsequently created transactions will see the modifications made by it.
231#[derive(Debug)]
232pub struct Transaction {
233    // Order of fields should not be changed, because Rust drops field top-to-bottom, and
234    // transaction should be dropped before cluster.
235    inner: NonNull<fdb_sys::FDBTransaction>,
236    metrics: Option<TransactionMetrics>,
237}
238unsafe impl Send for Transaction {}
239unsafe impl Sync for Transaction {}
240
241/// Converts Rust `bool` into `fdb_sys::fdb_bool_t`
242#[inline]
243fn fdb_bool(v: bool) -> fdb_sys::fdb_bool_t {
244    if v {
245        1
246    } else {
247        0
248    }
249}
250#[inline]
251fn fdb_len(len: usize, context: &'static str) -> std::os::raw::c_int {
252    assert!(len <= i32::MAX as usize, "{context}.len() > i32::MAX");
253    len as i32
254}
255#[inline]
256fn fdb_iteration(iteration: usize) -> std::os::raw::c_int {
257    if iteration > i32::MAX as usize {
258        0 // this will cause client_invalid_operation
259    } else {
260        iteration as i32
261    }
262}
263#[inline]
264fn fdb_limit(v: usize) -> std::os::raw::c_int {
265    if v > i32::MAX as usize {
266        i32::MAX
267    } else {
268        v as i32
269    }
270}
271
272/// `RangeOption` represents a query parameters for range scan query.
273///
274/// You can construct `RangeOption` easily:
275///
276/// ```
277/// use foundationdb::RangeOption;
278///
279/// let opt = RangeOption::from((b"begin".as_ref(), b"end".as_ref()));
280/// let opt: RangeOption = (b"begin".as_ref()..b"end".as_ref()).into();
281/// let opt = RangeOption {
282///     limit: Some(10),
283///     ..RangeOption::from((b"begin".as_ref(), b"end".as_ref()))
284/// };
285/// ```
286#[derive(Debug, Clone)]
287pub struct RangeOption<'a> {
288    /// The beginning of the range.
289    pub begin: KeySelector<'a>,
290    /// The end of the range.
291    pub end: KeySelector<'a>,
292    /// If non-zero, indicates the maximum number of key-value pairs to return.
293    pub limit: Option<usize>,
294    /// If non-zero, indicates a (soft) cap on the combined number of bytes of keys and values to
295    /// return for each item.
296    pub target_bytes: usize,
297    /// One of the options::StreamingMode values indicating how the caller would like the data in
298    /// the range returned.
299    pub mode: options::StreamingMode,
300    /// If true, key-value pairs will be returned in reverse lexicographical order beginning at
301    /// the end of the range.
302    pub reverse: bool,
303    #[doc(hidden)]
304    pub __non_exhaustive: std::marker::PhantomData<()>,
305}
306
307impl RangeOption<'_> {
308    /// Reverses the range direction.
309    pub fn rev(mut self) -> Self {
310        self.reverse = !self.reverse;
311        self
312    }
313
314    pub fn next_range(mut self, kvs: &FdbValues) -> Option<Self> {
315        if !kvs.more() {
316            return None;
317        }
318
319        let last = kvs.last()?;
320        let last_key = last.key();
321
322        if let Some(limit) = self.limit.as_mut() {
323            *limit = limit.saturating_sub(kvs.len());
324            if *limit == 0 {
325                return None;
326            }
327        }
328
329        if self.reverse {
330            self.end.make_first_greater_or_equal(last_key);
331        } else {
332            self.begin.make_first_greater_than(last_key);
333        }
334        Some(self)
335    }
336
337    #[cfg_api_versions(min = 710)]
338    pub(crate) fn next_mapped_range(mut self, kvs: &MappedKeyValues) -> Option<Self> {
339        if !kvs.more() {
340            return None;
341        }
342
343        let last = kvs.last()?;
344        let last_key = last.parent_key();
345
346        if let Some(limit) = self.limit.as_mut() {
347            *limit = limit.saturating_sub(kvs.len());
348            if *limit == 0 {
349                return None;
350            }
351        }
352
353        if self.reverse {
354            self.end.make_first_greater_or_equal(last_key);
355        } else {
356            self.begin.make_first_greater_than(last_key);
357        }
358        Some(self)
359    }
360}
361
362impl Default for RangeOption<'_> {
363    fn default() -> Self {
364        Self {
365            begin: KeySelector::first_greater_or_equal([].as_ref()),
366            end: KeySelector::first_greater_or_equal([].as_ref()),
367            limit: None,
368            target_bytes: 0,
369            mode: options::StreamingMode::Iterator,
370            reverse: false,
371            __non_exhaustive: std::marker::PhantomData,
372        }
373    }
374}
375
376impl<'a> From<(KeySelector<'a>, KeySelector<'a>)> for RangeOption<'a> {
377    fn from((begin, end): (KeySelector<'a>, KeySelector<'a>)) -> Self {
378        Self {
379            begin,
380            end,
381            ..Self::default()
382        }
383    }
384}
385impl From<(Vec<u8>, Vec<u8>)> for RangeOption<'static> {
386    fn from((begin, end): (Vec<u8>, Vec<u8>)) -> Self {
387        Self {
388            begin: KeySelector::first_greater_or_equal(begin),
389            end: KeySelector::first_greater_or_equal(end),
390            ..Self::default()
391        }
392    }
393}
394impl<'a> From<(&'a [u8], &'a [u8])> for RangeOption<'a> {
395    fn from((begin, end): (&'a [u8], &'a [u8])) -> Self {
396        Self {
397            begin: KeySelector::first_greater_or_equal(begin),
398            end: KeySelector::first_greater_or_equal(end),
399            ..Self::default()
400        }
401    }
402}
403impl<'a> From<std::ops::Range<KeySelector<'a>>> for RangeOption<'a> {
404    fn from(range: Range<KeySelector<'a>>) -> Self {
405        RangeOption::from((range.start, range.end))
406    }
407}
408
409impl<'a> From<std::ops::Range<&'a [u8]>> for RangeOption<'a> {
410    fn from(range: Range<&'a [u8]>) -> Self {
411        RangeOption::from((range.start, range.end))
412    }
413}
414
415impl From<std::ops::Range<std::vec::Vec<u8>>> for RangeOption<'static> {
416    fn from(range: Range<Vec<u8>>) -> Self {
417        RangeOption::from((range.start, range.end))
418    }
419}
420
421impl<'a> From<std::ops::RangeInclusive<&'a [u8]>> for RangeOption<'a> {
422    fn from(range: RangeInclusive<&'a [u8]>) -> Self {
423        let (start, end) = range.into_inner();
424        (KeySelector::first_greater_or_equal(start)..KeySelector::first_greater_than(end)).into()
425    }
426}
427
428impl From<std::ops::RangeInclusive<std::vec::Vec<u8>>> for RangeOption<'static> {
429    fn from(range: RangeInclusive<Vec<u8>>) -> Self {
430        let (start, end) = range.into_inner();
431        (KeySelector::first_greater_or_equal(start)..KeySelector::first_greater_than(end)).into()
432    }
433}
434
435impl Transaction {
436    pub(crate) fn new(inner: NonNull<fdb_sys::FDBTransaction>) -> Self {
437        Self {
438            inner,
439            metrics: None,
440        }
441    }
442
443    pub fn new_instrumented(
444        inner: NonNull<fdb_sys::FDBTransaction>,
445        metrics: TransactionMetrics,
446    ) -> Self {
447        Self {
448            inner,
449            metrics: Some(metrics),
450        }
451    }
452
453    /// Called to set an option on an FDBTransaction.
454    pub fn set_option(&self, opt: options::TransactionOption) -> FdbResult<()> {
455        unsafe { opt.apply(self.inner.as_ptr()) }
456    }
457
458    /// Pass through an option given a code and raw data. Useful when creating a passthrough layer
459    /// where the code and data will be provided as raw, in order to avoid deserializing to an option
460    /// and serializing it back to code and data.
461    /// In general, you should use `set_option`.
462    pub fn set_raw_option(
463        &self,
464        code: fdb_sys::FDBTransactionOption,
465        data: Option<Vec<u8>>,
466    ) -> FdbResult<()> {
467        let (data_ptr, size) = data
468            .as_ref()
469            .map(|data| {
470                (
471                    data.as_ptr(),
472                    i32::try_from(data.len()).expect("len to fit in i32"),
473                )
474            })
475            .unwrap_or_else(|| (std::ptr::null(), 0));
476        let err = unsafe {
477            fdb_sys::fdb_transaction_set_option(self.inner.as_ptr(), code, data_ptr, size)
478        };
479        if err != 0 {
480            Err(FdbError::from_code(err))
481        } else {
482            Ok(())
483        }
484    }
485
486    /// Modify the database snapshot represented by transaction to change the given
487    /// key to have the given value.
488    ///
489    /// If the given key was not previously present in the database it is inserted.
490    /// The modification affects the actual database only if transaction is later
491    /// committed with `Transaction::commit`.
492    ///
493    /// # Arguments
494    ///
495    /// * `key` - the name of the key to be inserted into the database.
496    /// * `value` - the value to be inserted into the database
497    #[cfg_attr(
498        feature = "trace",
499        tracing::instrument(level = "debug", skip(self, key, value))
500    )]
501    pub fn set(&self, key: &[u8], value: &[u8]) {
502        unsafe {
503            fdb_sys::fdb_transaction_set(
504                self.inner.as_ptr(),
505                key.as_ptr(),
506                fdb_len(key.len(), "key"),
507                value.as_ptr(),
508                fdb_len(value.len(), "value"),
509            )
510        }
511
512        if let Some(metrics) = &self.metrics {
513            metrics.report_metrics(FdbCommand::Set((key.len() + value.len()) as u64));
514        }
515    }
516
517    /// Modify the database snapshot represented by transaction to remove the given key from the
518    /// database.
519    ///
520    /// If the key was not previously present in the database, there is no effect. The modification
521    /// affects the actual database only if transaction is later committed with
522    /// `Transaction::commit`.
523    ///
524    /// # Arguments
525    ///
526    /// * `key` - the name of the key to be removed from the database.
527    #[cfg_attr(
528        feature = "trace",
529        tracing::instrument(level = "debug", skip(self, key))
530    )]
531    pub fn clear(&self, key: &[u8]) {
532        unsafe {
533            fdb_sys::fdb_transaction_clear(
534                self.inner.as_ptr(),
535                key.as_ptr(),
536                fdb_len(key.len(), "key"),
537            )
538        }
539
540        if let Some(metrics) = &self.metrics {
541            metrics.report_metrics(FdbCommand::Clear);
542        }
543    }
544
545    /// Reads a value from the database snapshot represented by transaction.
546    ///
547    /// Returns an FDBFuture which will be set to the value of key in the database if there is any.
548    ///
549    /// # Arguments
550    ///
551    /// * `key` - the name of the key to be looked up in the database
552    /// * `snapshot` - `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
553    #[cfg_attr(
554        feature = "trace",
555        tracing::instrument(level = "debug", skip(self, key))
556    )]
557    pub fn get(
558        &self,
559        key: &[u8],
560        snapshot: bool,
561    ) -> impl Future<Output = FdbResult<Option<FdbSlice>>> + Send + Sync + Unpin {
562        let metrics = self.metrics.clone();
563        let lenght_key = key.len();
564
565        FdbFuture::<Option<FdbSlice>>::new(unsafe {
566            fdb_sys::fdb_transaction_get(
567                self.inner.as_ptr(),
568                key.as_ptr(),
569                fdb_len(key.len(), "key"),
570                fdb_bool(snapshot),
571            )
572        })
573        .map(move |result| {
574            if let Ok(value) = &result {
575                if let Some(metrics) = metrics.as_ref() {
576                    let (bytes_count, kv_fetched) = if let Some(values) = value {
577                        ((lenght_key + values.len()) as u64, 1)
578                    } else {
579                        (lenght_key as u64, 0)
580                    };
581                    metrics.report_metrics(FdbCommand::Get(bytes_count, kv_fetched));
582                }
583            }
584            result
585        })
586    }
587
588    /// Modify the database snapshot represented by transaction to perform the operation indicated
589    /// by operationType with operand param to the value stored by the given key.
590    ///
591    /// An atomic operation is a single database command that carries out several logical steps:
592    /// reading the value of a key, performing a transformation on that value, and writing the
593    /// result. Different atomic operations perform different transformations. Like other database
594    /// operations, an atomic operation is used within a transaction; however, its use within a
595    /// transaction will not cause the transaction to conflict.
596    ///
597    /// Atomic operations do not expose the current value of the key to the client but simply send
598    /// the database the transformation to apply. In regard to conflict checking, an atomic
599    /// operation is equivalent to a write without a read. It can only cause other transactions
600    /// performing reads of the key to conflict.
601    ///
602    /// By combining these logical steps into a single, read-free operation, FoundationDB can
603    /// guarantee that the transaction will not conflict due to the operation. This makes atomic
604    /// operations ideal for operating on keys that are frequently modified. A common example is
605    /// the use of a key-value pair as a counter.
606    ///
607    /// # Warning
608    ///
609    /// If a transaction uses both an atomic operation and a strictly serializable read on the same
610    /// key, the benefits of using the atomic operation (for both conflict checking and performance)
611    /// are lost.
612    #[cfg_attr(
613        feature = "trace",
614        tracing::instrument(level = "debug", skip(self, key, param))
615    )]
616    pub fn atomic_op(&self, key: &[u8], param: &[u8], op_type: options::MutationType) {
617        unsafe {
618            fdb_sys::fdb_transaction_atomic_op(
619                self.inner.as_ptr(),
620                key.as_ptr(),
621                fdb_len(key.len(), "key"),
622                param.as_ptr(),
623                fdb_len(param.len(), "param"),
624                op_type.code(),
625            )
626        }
627
628        if let Some(metrics) = &self.metrics {
629            metrics.report_metrics(FdbCommand::Atomic);
630        }
631    }
632
633    /// Resolves a key selector against the keys in the database snapshot represented by
634    /// transaction.
635    ///
636    /// Returns an FDBFuture which will be set to the key in the database matching the key
637    /// selector.
638    ///
639    /// # Arguments
640    ///
641    /// * `selector`: the key selector
642    /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
643    #[cfg_attr(
644        feature = "trace",
645        tracing::instrument(level = "debug", skip(self, selector, snapshot))
646    )]
647    pub fn get_key(
648        &self,
649        selector: &KeySelector,
650        snapshot: bool,
651    ) -> impl Future<Output = FdbResult<FdbSlice>> + Send + Sync + Unpin {
652        let key = selector.key();
653        FdbFuture::new(unsafe {
654            fdb_sys::fdb_transaction_get_key(
655                self.inner.as_ptr(),
656                key.as_ptr(),
657                fdb_len(key.len(), "key"),
658                fdb_bool(selector.or_equal()),
659                selector.offset(),
660                fdb_bool(snapshot),
661            )
662        })
663    }
664
665    /// Reads all key-value pairs in the database snapshot represented by transaction (potentially
666    /// limited by limit, target_bytes, or mode) which have a key lexicographically greater than or
667    /// equal to the key resolved by the begin key selector and lexicographically less than the key
668    /// resolved by the end key selector.
669    ///
670    /// Returns a stream of KeyValue slices.
671    ///
672    /// This method is a little more efficient than `get_ranges_keyvalues` but a little harder to
673    /// use.
674    ///
675    /// # Arguments
676    ///
677    /// * `opt`: the range, limit, target_bytes and mode
678    /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
679    #[cfg_attr(
680        feature = "trace",
681        tracing::instrument(level = "debug", skip(self, opt, snapshot))
682    )]
683    pub fn get_ranges<'a>(
684        &'a self,
685        opt: RangeOption<'a>,
686        snapshot: bool,
687    ) -> impl Stream<Item = FdbResult<FdbValues>> + Send + Sync + Unpin + 'a {
688        stream::unfold((1, Some(opt)), move |(iteration, maybe_opt)| {
689            if let Some(opt) = maybe_opt {
690                Either::Left(self.get_range(&opt, iteration as usize, snapshot).map(
691                    move |maybe_values| {
692                        let next_opt = match &maybe_values {
693                            Ok(values) => opt.next_range(values),
694                            Err(..) => None,
695                        };
696                        Some((maybe_values, (iteration + 1, next_opt)))
697                    },
698                ))
699            } else {
700                Either::Right(future::ready(None))
701            }
702        })
703    }
704
705    /// Reads all key-value pairs in the database snapshot represented by transaction (potentially
706    /// limited by limit, target_bytes, or mode) which have a key lexicographically greater than or
707    /// equal to the key resolved by the begin key selector and lexicographically less than the key
708    /// resolved by the end key selector.
709    ///
710    /// Returns a stream of KeyValue.
711    ///
712    /// # Arguments
713    ///
714    /// * `opt`: the range, limit, target_bytes and mode
715    /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
716    #[cfg_attr(
717        feature = "trace",
718        tracing::instrument(level = "debug", skip(self, opt, snapshot))
719    )]
720    pub fn get_ranges_keyvalues<'a>(
721        &'a self,
722        opt: RangeOption<'a>,
723        snapshot: bool,
724    ) -> impl Stream<Item = FdbResult<FdbValue>> + Unpin + 'a {
725        self.get_ranges(opt, snapshot)
726            .map_ok(|values| stream::iter(values.into_iter().map(Ok)))
727            .try_flatten()
728    }
729
730    /// Reads all key-value pairs in the database snapshot represented by transaction (potentially
731    /// limited by limit, target_bytes, or mode) which have a key lexicographically greater than or
732    /// equal to the key resolved by the begin key selector and lexicographically less than the key
733    /// resolved by the end key selector.
734    ///
735    /// # Arguments
736    ///
737    /// * `opt`: the range, limit, target_bytes and mode
738    /// * `iteration`: If opt.mode is Iterator, this parameter should start at 1 and be incremented
739    ///   by 1 for each successive call while reading this range. In all other cases it is ignored.
740    /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
741    #[cfg_attr(
742        feature = "trace",
743        tracing::instrument(level = "debug", skip(self, opt, snapshot))
744    )]
745    pub fn get_range(
746        &self,
747        opt: &RangeOption,
748        iteration: usize,
749        snapshot: bool,
750    ) -> impl Future<Output = FdbResult<FdbValues>> + Send + Sync + Unpin {
751        let begin = &opt.begin;
752        let end = &opt.end;
753        let key_begin = begin.key();
754        let key_end = end.key();
755
756        let metrics = self.metrics.clone();
757
758        FdbFuture::<FdbValues>::new(unsafe {
759            fdb_sys::fdb_transaction_get_range(
760                self.inner.as_ptr(),
761                key_begin.as_ptr(),
762                fdb_len(key_begin.len(), "key_begin"),
763                fdb_bool(begin.or_equal()),
764                begin.offset(),
765                key_end.as_ptr(),
766                fdb_len(key_end.len(), "key_end"),
767                fdb_bool(end.or_equal()),
768                end.offset(),
769                fdb_limit(opt.limit.unwrap_or(0)),
770                fdb_limit(opt.target_bytes),
771                opt.mode.code(),
772                fdb_iteration(iteration),
773                fdb_bool(snapshot),
774                fdb_bool(opt.reverse),
775            )
776        })
777        .map(move |result| {
778            if let (Ok(values), Some(metrics)) = (&result, metrics) {
779                let kv_fetched = values.len();
780                let mut bytes_count = 0;
781
782                for key_value in values.as_ref() {
783                    let key_len = key_value.key().len();
784                    let value_len = key_value.value().len();
785
786                    bytes_count += (key_len + value_len) as u64
787                }
788
789                metrics.report_metrics(FdbCommand::GetRange(bytes_count, kv_fetched as u64));
790            };
791
792            result
793        })
794    }
795
796    /// Mapped Range is an experimental feature introduced in FDB 7.1.
797    /// It is intended to improve the client throughput and reduce latency for querying data through a Subspace used as a "index".
798    /// In such a case, querying records by scanning an index in relational databases can be
799    /// translated to a GetRange request on the index entries followed up by multiple GetValue requests for the record entries in FDB.
800    ///
801    /// This method is allowing FoundationDB "follow up" a GetRange request with GetValue requests,
802    /// this can happen in one request without additional back and forth. Considering the overhead
803    /// of each request, this saves time and resources on serialization, deserialization, and network.
804    ///
805    /// A mapped request will:
806    ///
807    /// * Do a range query (same as a `Transaction.get_range` request) and get the result. We call it the primary query.
808    /// * For each key-value pair in the primary query result, translate it to a `get_range` query and get the result. We call them secondary queries.
809    /// * Put all results in a nested structure and return them.
810    ///
811    /// **WARNING** : This feature is considered experimental at this time. It is only allowed when
812    /// using snapshot isolation AND disabling read-your-writes.
813    ///
814    /// More info can be found in the relevant [documentation](https://github.com/apple/foundationdb/wiki/Everything-about-GetMappedRange#input).
815    ///
816    /// This is the "raw" version, users are expected to use [Transaction::get_mapped_ranges]
817    #[cfg_api_versions(min = 710)]
818    #[cfg_attr(
819        feature = "trace",
820        tracing::instrument(level = "debug", skip(self, opt, mapper, snapshot))
821    )]
822    pub fn get_mapped_range(
823        &self,
824        opt: &RangeOption,
825        mapper: &[u8],
826        iteration: usize,
827        snapshot: bool,
828    ) -> impl Future<Output = FdbResult<MappedKeyValues>> + Send + Sync + Unpin {
829        let begin = &opt.begin;
830        let end = &opt.end;
831        let key_begin = begin.key();
832        let key_end = end.key();
833
834        FdbFuture::new(unsafe {
835            fdb_sys::fdb_transaction_get_mapped_range(
836                self.inner.as_ptr(),
837                key_begin.as_ptr(),
838                fdb_len(key_begin.len(), "key_begin"),
839                fdb_bool(begin.or_equal()),
840                begin.offset(),
841                key_end.as_ptr(),
842                fdb_len(key_end.len(), "key_end"),
843                fdb_bool(end.or_equal()),
844                end.offset(),
845                mapper.as_ptr(),
846                fdb_len(mapper.len(), "mapper_length"),
847                fdb_limit(opt.limit.unwrap_or(0)),
848                fdb_limit(opt.target_bytes),
849                opt.mode.code(),
850                fdb_iteration(iteration),
851                fdb_bool(snapshot),
852                fdb_bool(opt.reverse),
853            )
854        })
855    }
856
857    /// Mapped Range is an experimental feature introduced in FDB 7.1.
858    /// It is intended to improve the client throughput and reduce latency for querying data through a Subspace used as a "index".
859    /// In such a case, querying records by scanning an index in relational databases can be
860    /// translated to a GetRange request on the index entries followed up by multiple GetValue requests for the record entries in FDB.
861    ///
862    /// This method is allowing FoundationDB "follow up" a GetRange request with GetValue requests,
863    /// this can happen in one request without additional back and forth. Considering the overhead
864    /// of each request, this saves time and resources on serialization, deserialization, and network.
865    ///
866    /// A mapped request will:
867    ///
868    /// * Do a range query (same as a `Transaction.get_range` request) and get the result. We call it the primary query.
869    /// * For each key-value pair in the primary query result, translate it to a `get_range` query and get the result. We call them secondary queries.
870    /// * Put all results in a nested structure and return them.
871    ///
872    /// **WARNING** : This feature is considered experimental at this time. It is only allowed when
873    /// using snapshot isolation AND disabling read-your-writes.
874    ///
875    /// More info can be found in the relevant [documentation](https://github.com/apple/foundationdb/wiki/Everything-about-GetMappedRange#input).
876    #[cfg_api_versions(min = 710)]
877    #[cfg_attr(
878        feature = "trace",
879        tracing::instrument(level = "debug", skip(self, opt, mapper, snapshot))
880    )]
881    pub fn get_mapped_ranges<'a>(
882        &'a self,
883        opt: RangeOption<'a>,
884        mapper: &'a [u8],
885        snapshot: bool,
886    ) -> impl Stream<Item = FdbResult<MappedKeyValues>> + Send + Sync + Unpin + 'a {
887        stream::unfold((1, Some(opt)), move |(iteration, maybe_opt)| {
888            if let Some(opt) = maybe_opt {
889                Either::Left(
890                    self.get_mapped_range(&opt, mapper, iteration as usize, snapshot)
891                        .map(move |maybe_values| {
892                            let next_opt = match &maybe_values {
893                                Ok(values) => opt.next_mapped_range(values),
894                                Err(..) => None,
895                            };
896                            Some((maybe_values, (iteration + 1, next_opt)))
897                        }),
898                )
899            } else {
900                Either::Right(future::ready(None))
901            }
902        })
903    }
904
905    /// Modify the database snapshot represented by transaction to remove all keys (if any) which
906    /// are lexicographically greater than or equal to the given begin key and lexicographically
907    /// less than the given end_key.
908    ///
909    /// The modification affects the actual database only if transaction is later committed with
910    /// `Transaction::commit`.
911    #[cfg_attr(
912        feature = "trace",
913        tracing::instrument(level = "debug", skip(self, begin, end))
914    )]
915    pub fn clear_range(&self, begin: &[u8], end: &[u8]) {
916        unsafe {
917            fdb_sys::fdb_transaction_clear_range(
918                self.inner.as_ptr(),
919                begin.as_ptr(),
920                fdb_len(begin.len(), "begin"),
921                end.as_ptr(),
922                fdb_len(end.len(), "end"),
923            )
924        }
925
926        if let Some(metrics) = &self.metrics {
927            metrics.report_metrics(FdbCommand::ClearRange);
928        }
929    }
930
931    /// Get the estimated byte size of the key range based on the byte sample collected by FDB
932    #[cfg_api_versions(min = 630)]
933    pub fn get_estimated_range_size_bytes(
934        &self,
935        begin: &[u8],
936        end: &[u8],
937    ) -> impl Future<Output = FdbResult<i64>> + Send + Sync + Unpin {
938        FdbFuture::<i64>::new(unsafe {
939            fdb_sys::fdb_transaction_get_estimated_range_size_bytes(
940                self.inner.as_ptr(),
941                begin.as_ptr(),
942                fdb_len(begin.len(), "begin"),
943                end.as_ptr(),
944                fdb_len(end.len(), "end"),
945            )
946        })
947    }
948
949    /// Attempts to commit the sets and clears previously applied to the database snapshot
950    /// represented by transaction to the actual database.
951    ///
952    /// The commit may or may not succeed – in particular, if a conflicting transaction previously
953    /// committed, then the commit must fail in order to preserve transactional isolation. If the
954    /// commit does succeed, the transaction is durably committed to the database and all
955    /// subsequently started transactions will observe its effects.
956    ///
957    /// It is not necessary to commit a read-only transaction – you can simply drop it.
958    ///
959    /// Callers will usually want to retry a transaction if the commit or a another method on the
960    /// transaction returns a retryable error (see `on_error` and/or `Database::transact`).
961    ///
962    /// As with other client/server databases, in some failure scenarios a client may be unable to
963    /// determine whether a transaction succeeded. In these cases, `Transaction::commit` will return
964    /// an error and `is_maybe_committed()` will returns true on that error. The `on_error` function
965    /// treats this error as retryable, so retry loops that don’t check for `is_maybe_committed()`
966    /// could execute the transaction twice. In these cases, you must consider the idempotence of
967    /// the transaction. For more information, see [Transactions with unknown results](https://apple.github.io/foundationdb/developer-guide.html#developer-guide-unknown-results).
968    ///
969    /// Normally, commit will wait for outstanding reads to return. However, if those reads were
970    /// snapshot reads or the transaction option for disabling “read-your-writes” has been invoked,
971    /// any outstanding reads will immediately return errors.
972    #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
973    pub fn commit(self) -> impl Future<Output = TransactionResult> + Send + Sync + Unpin {
974        FdbFuture::<()>::new(unsafe { fdb_sys::fdb_transaction_commit(self.inner.as_ptr()) }).map(
975            move |r| match r {
976                Ok(()) => Ok(TransactionCommitted { tr: self }),
977                Err(err) => Err(TransactionCommitError { tr: self, err }),
978            },
979        )
980    }
981
982    /// Implements the recommended retry and backoff behavior for a transaction. This function knows
983    /// which of the error codes generated by other `Transaction` functions represent temporary
984    /// error conditions and which represent application errors that should be handled by the
985    /// application. It also implements an exponential backoff strategy to avoid swamping the
986    /// database cluster with excessive retries when there is a high level of conflict between
987    /// transactions.
988    ///
989    /// It is not necessary to call `reset()` when handling an error with `on_error()` since the
990    /// transaction has already been reset.
991    ///
992    /// You should not call this method most of the times and use `Database::transact` which
993    /// implements a retry loop strategy for you.
994    pub fn on_error(
995        self,
996        err: FdbError,
997    ) -> impl Future<Output = FdbResult<Transaction>> + Send + Sync + Unpin {
998        FdbFuture::<()>::new(unsafe {
999            fdb_sys::fdb_transaction_on_error(self.inner.as_ptr(), err.code())
1000        })
1001        .map_ok(|()| self)
1002    }
1003
1004    /// Cancels the transaction. All pending or future uses of the transaction will return a
1005    /// transaction_cancelled error. The transaction can be used again after it is reset.
1006    pub fn cancel(self) -> TransactionCancelled {
1007        unsafe {
1008            fdb_sys::fdb_transaction_cancel(self.inner.as_ptr());
1009        }
1010        TransactionCancelled { tr: self }
1011    }
1012
1013    /// Sets a custom metric for the transaction with the specified name, value, and labels.
1014    ///
1015    /// Custom metrics allow you to track application-specific metrics during transaction execution.
1016    /// These metrics are collected alongside the standard FoundationDB metrics and can be used
1017    /// for monitoring, debugging, and performance analysis.
1018    ///
1019    /// # Arguments
1020    /// * `name` - The name of the metric (e.g., "query_time", "cache_hits")
1021    /// * `value` - The value to set for the metric
1022    /// * `labels` - Key-value pairs for labeling the metric, allowing for dimensional metrics
1023    ///   (e.g., `[("operation", "read"), ("region", "us-west")]`)
1024    ///
1025    /// # Returns
1026    /// * `Ok(())` if the metric was set successfully
1027    /// * `Err(TransactionMetricsNotFound)` if this transaction was not created with metrics instrumentation
1028    ///
1029    /// # Example
1030    /// ```
1031    /// # use foundationdb::*;
1032    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1033    /// # let db = Database::default()?;
1034    /// let metrics = TransactionMetrics::new();
1035    /// let txn = db.create_instrumented_trx(metrics)?;
1036    ///
1037    /// // Set a custom metric with labels
1038    /// txn.set_custom_metric("query_processing_time", 42, &[("query_type", "select"), ("table", "users")])?;
1039    /// # Ok(())
1040    /// # }
1041    /// ```
1042    ///
1043    /// # Note
1044    /// This method only works on transactions created with `create_instrumented_trx()` or within
1045    /// `instrumented_run()`. For regular transactions created with `create_trx()`, this will
1046    /// return `Err(TransactionMetricsNotFound)`.
1047    pub fn set_custom_metric(
1048        &self,
1049        name: &str,
1050        value: u64,
1051        labels: &[(&str, &str)],
1052    ) -> Result<(), TransactionMetricsNotFound> {
1053        if let Some(metrics) = &self.metrics {
1054            metrics.set_custom(name, value, labels);
1055            Ok(())
1056        } else {
1057            Err(TransactionMetricsNotFound)
1058        }
1059    }
1060
1061    /// Increments a custom metric for the transaction by the specified amount.
1062    ///
1063    /// This is useful for counting events or accumulating values during transaction execution.
1064    /// If the metric doesn't exist yet, it will be created with the specified amount.
1065    /// If it already exists with the same labels, its value will be incremented.
1066    ///
1067    /// # Arguments
1068    /// * `name` - The name of the metric to increment (e.g., "requests", "bytes_processed")
1069    /// * `amount` - The amount to increment the metric by
1070    /// * `labels` - Key-value pairs for labeling the metric, allowing for dimensional metrics
1071    ///   (e.g., `[("status", "success"), ("endpoint", "api/v1/users")]`)
1072    ///
1073    /// # Returns
1074    /// * `Ok(())` if the metric was incremented successfully
1075    /// * `Err(TransactionMetricsNotFound)` if this transaction was not created with metrics instrumentation
1076    ///
1077    /// # Example
1078    /// ```
1079    /// # use foundationdb::*;
1080    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1081    /// # let db = Database::default()?;
1082    /// let metrics = TransactionMetrics::new();
1083    /// let txn = db.create_instrumented_trx(metrics)?;
1084    ///
1085    /// // Increment a counter each time a specific operation occurs
1086    /// txn.increment_custom_metric("cache_misses", 1, &[("cache", "user_data")])?;
1087    ///
1088    /// // Later in the transaction, increment it again
1089    /// txn.increment_custom_metric("cache_misses", 1, &[("cache", "user_data")])?;
1090    ///
1091    /// // The final value will be 2
1092    /// # Ok(())
1093    /// # }
1094    /// ```
1095    ///
1096    /// # Note
1097    /// This method only works on transactions created with `create_instrumented_trx()` or within
1098    /// `instrumented_run()`. For regular transactions created with `create_trx()`, this will
1099    /// return `Err(TransactionMetricsNotFound)`.
1100    pub fn increment_custom_metric(
1101        &self,
1102        name: &str,
1103        amount: u64,
1104        labels: &[(&str, &str)],
1105    ) -> Result<(), TransactionMetricsNotFound> {
1106        if let Some(metrics) = &self.metrics {
1107            metrics.increment_custom(name, amount, labels);
1108            Ok(())
1109        } else {
1110            Err(TransactionMetricsNotFound)
1111        }
1112    }
1113
1114    /// Returns a list of public network addresses as strings, one for each of the storage servers
1115    /// responsible for storing key_name and its associated value.
1116    pub fn get_addresses_for_key(
1117        &self,
1118        key: &[u8],
1119    ) -> impl Future<Output = FdbResult<FdbAddresses>> + Send + Sync + Unpin {
1120        FdbFuture::new(unsafe {
1121            fdb_sys::fdb_transaction_get_addresses_for_key(
1122                self.inner.as_ptr(),
1123                key.as_ptr(),
1124                fdb_len(key.len(), "key"),
1125            )
1126        })
1127    }
1128
1129    /// A watch's behavior is relative to the transaction that created it. A watch will report a
1130    /// change in relation to the key’s value as readable by that transaction. The initial value
1131    /// used for comparison is either that of the transaction’s read version or the value as
1132    /// modified by the transaction itself prior to the creation of the watch. If the value changes
1133    /// and then changes back to its initial value, the watch might not report the change.
1134    ///
1135    /// Until the transaction that created it has been committed, a watch will not report changes
1136    /// made by other transactions. In contrast, a watch will immediately report changes made by
1137    /// the transaction itself. Watches cannot be created if the transaction has set the
1138    /// READ_YOUR_WRITES_DISABLE transaction option, and an attempt to do so will return an
1139    /// watches_disabled error.
1140    ///
1141    /// If the transaction used to create a watch encounters an error during commit, then the watch
1142    /// will be set with that error. A transaction whose commit result is unknown will set all of
1143    /// its watches with the commit_unknown_result error. If an uncommitted transaction is reset or
1144    /// destroyed, then any watches it created will be set with the transaction_cancelled error.
1145    ///
1146    /// Returns an future representing an empty value that will be set once the watch has
1147    /// detected a change to the value at the specified key.
1148    ///
1149    /// By default, each database connection can have no more than 10,000 watches that have not yet
1150    /// reported a change. When this number is exceeded, an attempt to create a watch will return a
1151    /// too_many_watches error. This limit can be changed using the MAX_WATCHES database option.
1152    /// Because a watch outlives the transaction that creates it, any watch that is no longer
1153    /// needed should be cancelled by dropping its future.
1154    pub fn watch(&self, key: &[u8]) -> impl Future<Output = FdbResult<()>> + Send + Sync + Unpin {
1155        FdbFuture::new(unsafe {
1156            fdb_sys::fdb_transaction_watch(
1157                self.inner.as_ptr(),
1158                key.as_ptr(),
1159                fdb_len(key.len(), "key"),
1160            )
1161        })
1162    }
1163
1164    /// Returns an FDBFuture which will be set to the approximate transaction size so far in the
1165    /// returned future, which is the summation of the estimated size of mutations, read conflict
1166    /// ranges, and write conflict ranges.
1167    ///
1168    /// This can be called multiple times before the transaction is committed.
1169    #[cfg_api_versions(min = 620)]
1170    pub fn get_approximate_size(
1171        &self,
1172    ) -> impl Future<Output = FdbResult<i64>> + Send + Sync + Unpin {
1173        FdbFuture::new(unsafe {
1174            fdb_sys::fdb_transaction_get_approximate_size(self.inner.as_ptr())
1175        })
1176    }
1177
1178    /// Gets a list of keys that can split the given range into (roughly) equally sized chunks based on chunk_size.
1179    /// Note: the returned split points contain the start key and end key of the given range.
1180    #[cfg_api_versions(min = 700)]
1181    pub fn get_range_split_points(
1182        &self,
1183        begin: &[u8],
1184        end: &[u8],
1185        chunk_size: i64,
1186    ) -> impl Future<Output = FdbResult<FdbKeys>> + Send + Sync + Unpin {
1187        FdbFuture::<FdbKeys>::new(unsafe {
1188            fdb_sys::fdb_transaction_get_range_split_points(
1189                self.inner.as_ptr(),
1190                begin.as_ptr(),
1191                fdb_len(begin.len(), "begin"),
1192                end.as_ptr(),
1193                fdb_len(end.len(), "end"),
1194                chunk_size,
1195            )
1196        })
1197    }
1198
1199    /// Returns an FDBFuture which will be set to the versionstamp which was used by any
1200    /// versionstamp operations in this transaction.
1201    ///
1202    /// The future will be ready only after the successful completion of a call to `commit()` on
1203    /// this Transaction. Read-only transactions do not modify the database when committed and will
1204    /// result in the future completing with an error. Keep in mind that a transaction which reads
1205    /// keys and then sets them to their current values may be optimized to a read-only transaction.
1206    ///
1207    /// Most applications will not call this function.
1208    pub fn get_versionstamp(
1209        &self,
1210    ) -> impl Future<Output = FdbResult<FdbSlice>> + Send + Sync + Unpin {
1211        FdbFuture::new(unsafe { fdb_sys::fdb_transaction_get_versionstamp(self.inner.as_ptr()) })
1212    }
1213
1214    /// The transaction obtains a snapshot read version automatically at the time of the first call
1215    /// to `get_*()` (including this one) and (unless causal consistency has been deliberately
1216    /// compromised by transaction options) is guaranteed to represent all transactions which were
1217    /// reported committed before that call.
1218    pub fn get_read_version(&self) -> impl Future<Output = FdbResult<i64>> + Send + Sync + Unpin {
1219        FdbFuture::new(unsafe { fdb_sys::fdb_transaction_get_read_version(self.inner.as_ptr()) })
1220    }
1221
1222    /// Sets the snapshot read version used by a transaction.
1223    ///
1224    /// This is not needed in simple cases.
1225    /// If the given version is too old, subsequent reads will fail with error_code_past_version;
1226    /// if it is too new, subsequent reads may be delayed indefinitely and/or fail with
1227    /// error_code_future_version. If any of get_*() have been called on this transaction already,
1228    /// the result is undefined.
1229    pub fn set_read_version(&self, version: i64) {
1230        unsafe { fdb_sys::fdb_transaction_set_read_version(self.inner.as_ptr(), version) }
1231    }
1232
1233    /// The metadata version key `\xff/metadataVersion` is a key intended to help layers deal with hot keys.
1234    /// The value of this key is sent to clients along with the read version from the proxy,
1235    /// so a client can read its value without communicating with a storage server.
1236    /// To retrieve the metadataVersion, you need to set `TransactionOption::ReadSystemKeys`
1237    #[cfg_api_versions(min = 610)]
1238    pub async fn get_metadata_version(&self, snapshot: bool) -> FdbResult<Option<i64>> {
1239        match self.get(METADATA_VERSION_KEY, snapshot).await {
1240            Ok(Some(fdb_slice)) => {
1241                let value = fdb_slice.deref();
1242                // as we cannot write the metadata-key directly(we must mutate with an atomic_op),
1243                // can we assume that it will always be the correct size?
1244                if value.len() < 8 {
1245                    return Ok(None);
1246                }
1247
1248                // The 80-bits versionstamps are 10 bytes longs, and are composed of:
1249                // * 8 bytes (Transaction Version)
1250                // * followed by 2 bytes (Transaction Batch Order)
1251                // More details can be found here: https://forums.foundationdb.org/t/implementing-versionstamps-in-bindings/250
1252                let mut arr = [0u8; 8];
1253                arr.copy_from_slice(&value[0..8]);
1254                let transaction_version: i64 = i64::from_be_bytes(arr);
1255
1256                Ok(Some(transaction_version))
1257            }
1258            Ok(None) => Ok(None),
1259
1260            Err(err) => Err(err),
1261        }
1262    }
1263
1264    #[cfg_api_versions(min = 610)]
1265    pub fn update_metadata_version(&self) {
1266        // The param is transformed by removing the final four bytes from ``param`` and reading
1267        // those as a little-Endian 32-bit integer to get a position ``pos``.
1268        // The 10 bytes of the parameter from ``pos`` to ``pos + 10`` are replaced with the
1269        // versionstamp of the transaction used. The first byte of the parameter is position 0.
1270        // As we only have the metadata value, we can just create an 14-bytes Vec filled with 0u8.
1271        let param = vec![0u8; 14];
1272        self.atomic_op(
1273            METADATA_VERSION_KEY,
1274            param.as_slice(),
1275            options::MutationType::SetVersionstampedValue,
1276        )
1277    }
1278
1279    /// Reset transaction to its initial state.
1280    ///
1281    /// In order to protect against a race condition with cancel(), this call require a mutable
1282    /// access to the transaction.
1283    ///
1284    /// This is similar to dropping the transaction and creating a new one.
1285    ///
1286    /// It is not necessary to call `reset()` when handling an error with `on_error()` since the
1287    /// transaction has already been reset.
1288    pub fn reset(&mut self) {
1289        unsafe { fdb_sys::fdb_transaction_reset(self.inner.as_ptr()) }
1290    }
1291
1292    /// Reads the conflicting key ranges from the special keyspace after a commit conflict.
1293    ///
1294    /// This method reads from `\xff\xff/transaction/conflicting_keys/` and parses the
1295    /// boundary encoding where `b"1"` marks range starts and `b"0"` marks range ends.
1296    ///
1297    /// The special keyspace read is resolved client-side — no network round-trip to the
1298    /// cluster. The future still goes through the FDB network thread event loop, but the
1299    /// data comes from an in-memory map populated during the commit response. Returns
1300    /// an empty `Vec` if
1301    /// [`TransactionOption::ReportConflictingKeys`](crate::options::TransactionOption::ReportConflictingKeys)
1302    /// was not set.
1303    ///
1304    /// # Errors
1305    ///
1306    /// Returns an `FdbError` if the special keyspace read fails.
1307    #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
1308    pub async fn conflicting_keys(&self) -> FdbResult<Vec<ConflictingKeyRange>> {
1309        let opt = RangeOption::from((CONFLICTING_KEYS_PREFIX, CONFLICTING_KEYS_END));
1310        let range_result = self.get_range(&opt, 1, false).await?;
1311
1312        let prefix_len = CONFLICTING_KEYS_PREFIX.len();
1313        let mut ranges = Vec::new();
1314        let mut current_begin: Option<Vec<u8>> = None;
1315
1316        for kv in range_result.iter() {
1317            let raw_key = kv.key();
1318            let actual_key = if raw_key.len() > prefix_len {
1319                raw_key[prefix_len..].to_vec()
1320            } else {
1321                Vec::new()
1322            };
1323
1324            match kv.value() {
1325                b"1" => {
1326                    current_begin = Some(actual_key);
1327                }
1328                b"0" => {
1329                    if let Some(begin) = current_begin.take() {
1330                        ranges.push(ConflictingKeyRange {
1331                            begin,
1332                            end: actual_key,
1333                        });
1334                    }
1335                }
1336                _ => {}
1337            }
1338        }
1339
1340        debug_assert!(
1341            current_begin.is_none(),
1342            "unpaired '1' marker in conflicting keys response"
1343        );
1344
1345        Ok(ranges)
1346    }
1347
1348    /// Adds a conflict range to a transaction without performing the associated read or write.
1349    ///
1350    /// # Note
1351    ///
1352    /// Most applications will use the serializable isolation that transactions provide by default
1353    /// and will not need to manipulate conflict ranges.
1354    pub fn add_conflict_range(
1355        &self,
1356        begin: &[u8],
1357        end: &[u8],
1358        ty: options::ConflictRangeType,
1359    ) -> FdbResult<()> {
1360        error::eval(unsafe {
1361            fdb_sys::fdb_transaction_add_conflict_range(
1362                self.inner.as_ptr(),
1363                begin.as_ptr(),
1364                fdb_len(begin.len(), "begin"),
1365                end.as_ptr(),
1366                fdb_len(end.len(), "end"),
1367                ty.code(),
1368            )
1369        })
1370    }
1371}
1372
1373impl Drop for Transaction {
1374    fn drop(&mut self) {
1375        unsafe {
1376            fdb_sys::fdb_transaction_destroy(self.inner.as_ptr());
1377        }
1378    }
1379}
1380
1381/// A retryable transaction, generated by Database.run
1382#[derive(Clone)]
1383pub struct RetryableTransaction {
1384    inner: Arc<Transaction>,
1385}
1386
1387impl Deref for RetryableTransaction {
1388    type Target = Transaction;
1389    fn deref(&self) -> &Transaction {
1390        self.inner.deref()
1391    }
1392}
1393
1394impl RetryableTransaction {
1395    pub(crate) fn new(t: Transaction) -> RetryableTransaction {
1396        RetryableTransaction { inner: Arc::new(t) }
1397    }
1398
1399    pub(crate) fn take(self) -> Result<Transaction, FdbBindingError> {
1400        // checking weak references
1401        if Arc::weak_count(&self.inner) != 0 {
1402            return Err(FdbBindingError::ReferenceToTransactionKept);
1403        }
1404        Arc::try_unwrap(self.inner).map_err(|_| FdbBindingError::ReferenceToTransactionKept)
1405    }
1406
1407    pub(crate) async fn on_error(
1408        self,
1409        err: FdbError,
1410    ) -> Result<Result<RetryableTransaction, FdbError>, FdbBindingError> {
1411        Ok(self
1412            .take()?
1413            .on_error(err)
1414            .await
1415            .map(RetryableTransaction::new))
1416    }
1417
1418    #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
1419    pub(crate) async fn commit(
1420        self,
1421    ) -> Result<Result<TransactionCommitted, TransactionCommitError>, FdbBindingError> {
1422        Ok(self.take()?.commit().await)
1423    }
1424}