foundationdb/transaction.rs
1// Copyright 2018 foundationdb-rs developers, https://github.com/Clikengo/foundationdb-rs/graphs/contributors
2// Copyright 2013-2018 Apple, Inc and the FoundationDB project authors.
3//
4// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
5// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
6// http://opensource.org/licenses/MIT>, at your option. This file may not be
7// copied, modified, or distributed except according to those terms.
8
9//! Implementations of the FDBTransaction C API
10//!
11//! <https://apple.github.io/foundationdb/api-c.html#transaction>
12
13use foundationdb_sys as fdb_sys;
14use std::fmt;
15use std::ops::{Deref, Range, RangeInclusive};
16use std::ptr::NonNull;
17use std::sync::Arc;
18
19use crate::future::*;
20use crate::keyselector::*;
21use crate::metrics::{FdbCommand, TransactionMetrics};
22use crate::options;
23
24use crate::{error, FdbError, FdbResult};
25use foundationdb_macros::cfg_api_versions;
26
27use crate::error::{FdbBindingError, TransactionMetricsNotFound};
28
29use futures::{
30 future, future::Either, stream, Future, FutureExt, Stream, TryFutureExt, TryStreamExt,
31};
32
/// System key `\xff/metadataVersion`, only compiled for API versions >= 610.
// NOTE(review): presumably read/written by the metadata-version helpers defined later in this
// file — confirm against its users.
#[cfg_api_versions(min = 610)]
const METADATA_VERSION_KEY: &[u8] = b"\xff/metadataVersion";

/// Special keyspace prefix for conflicting keys.
const CONFLICTING_KEYS_PREFIX: &[u8] = b"\xff\xff/transaction/conflicting_keys/";
/// End key of the conflicting-keys special keyspace: the prefix followed by `\xff\xff`.
// Matches C++ SystemData.cpp conflictingKeysRange end key.
const CONFLICTING_KEYS_END: &[u8] = b"\xff\xff/transaction/conflicting_keys/\xff\xff";
40
/// A key range that was reported as conflicting when a transaction failed to commit.
///
/// Values of this type are produced by [`Transaction::conflicting_keys`] after a commit
/// conflict, provided
/// [`TransactionOption::ReportConflictingKeys`](crate::options::TransactionOption::ReportConflictingKeys)
/// was enabled on the transaction.
///
/// In the special keyspace, conflicting ranges are delimited by boundary markers:
/// - a value of `b"1"` marks the inclusive start of a conflicting range
/// - a value of `b"0"` marks the exclusive end of a conflicting range
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct ConflictingKeyRange {
    // First key of the range (inclusive).
    begin: Vec<u8>,
    // Key just past the range (exclusive).
    end: Vec<u8>,
}

impl ConflictingKeyRange {
    /// Returns the first key of the conflicting range (inclusive).
    pub fn begin(&self) -> &[u8] {
        self.begin.as_slice()
    }

    /// Returns the key that ends the conflicting range (exclusive).
    pub fn end(&self) -> &[u8] {
        self.end.as_slice()
    }
}

impl fmt::Display for ConflictingKeyRange {
    /// Renders the range as `begin..end`, decoding both keys as lossy UTF-8.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let begin = String::from_utf8_lossy(&self.begin);
        let end = String::from_utf8_lossy(&self.end);
        write!(f, "{begin}..{end}")
    }
}
78
79/// A committed transaction.
80#[derive(Debug)]
81#[repr(transparent)]
82pub struct TransactionCommitted {
83 tr: Transaction,
84}
85
86impl TransactionCommitted {
87 /// Retrieves the database version number at which a given transaction was committed.
88 ///
89 /// Read-only transactions do not modify the database when committed and will have a committed
90 /// version of -1. Keep in mind that a transaction which reads keys and then sets them to their
91 /// current values may be optimized to a read-only transaction.
92 ///
93 /// Note that database versions are not necessarily unique to a given transaction and so cannot
94 /// be used to determine in what order two transactions completed. The only use for this
95 /// function is to manually enforce causal consistency when calling `set_read_version()` on
96 /// another subsequent transaction.
97 ///
98 /// Most applications will not call this function.
99 pub fn committed_version(&self) -> FdbResult<i64> {
100 let mut version: i64 = 0;
101 error::eval(unsafe {
102 fdb_sys::fdb_transaction_get_committed_version(self.tr.inner.as_ptr(), &mut version)
103 })?;
104 Ok(version)
105 }
106
107 /// Reset the transaction to its initial state.
108 ///
109 /// This will not affect previously committed data.
110 ///
111 /// This is similar to dropping the transaction and creating a new one.
112 pub fn reset(mut self) -> Transaction {
113 self.tr.reset();
114 self.tr
115 }
116}
117impl From<TransactionCommitted> for Transaction {
118 fn from(tc: TransactionCommitted) -> Transaction {
119 tc.reset()
120 }
121}
122
123/// A failed to commit transaction.
124pub struct TransactionCommitError {
125 tr: Transaction,
126 err: FdbError,
127}
128
129impl TransactionCommitError {
130 /// Implements the recommended retry and backoff behavior for a transaction. This function knows
131 /// which of the error codes generated by other `Transaction` functions represent temporary
132 /// error conditions and which represent application errors that should be handled by the
133 /// application. It also implements an exponential backoff strategy to avoid swamping the
134 /// database cluster with excessive retries when there is a high level of conflict between
135 /// transactions.
136 ///
137 /// You should not call this method most of the times and use `Database::transact` which
138 /// implements a retry loop strategy for you.
139 pub fn on_error(self) -> impl Future<Output = FdbResult<Transaction>> {
140 FdbFuture::<()>::new(unsafe {
141 fdb_sys::fdb_transaction_on_error(self.tr.inner.as_ptr(), self.err.code())
142 })
143 .map_ok(|()| self.tr)
144 }
145
146 /// Reads the conflicting key ranges that caused this commit failure.
147 ///
148 /// Only returns meaningful results if
149 /// [`TransactionOption::ReportConflictingKeys`](crate::options::TransactionOption::ReportConflictingKeys)
150 /// was set on the transaction **and** the error is `not_committed` (code 1020).
151 ///
152 /// Must be called **before** [`on_error`](Self::on_error) which resets the transaction.
153 ///
154 /// # Errors
155 ///
156 /// Returns an `FdbError` if the special keyspace read fails.
157 #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
158 pub async fn conflicting_keys(&self) -> FdbResult<Vec<ConflictingKeyRange>> {
159 self.tr.conflicting_keys().await
160 }
161
162 /// Reset the transaction to its initial state.
163 ///
164 /// This is similar to dropping the transaction and creating a new one.
165 pub fn reset(mut self) -> Transaction {
166 self.tr.reset();
167 self.tr
168 }
169}
170
171impl Deref for TransactionCommitError {
172 type Target = FdbError;
173 fn deref(&self) -> &FdbError {
174 &self.err
175 }
176}
177
178impl From<TransactionCommitError> for FdbError {
179 fn from(tce: TransactionCommitError) -> FdbError {
180 tce.err
181 }
182}
183
184impl fmt::Debug for TransactionCommitError {
185 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
186 write!(f, "TransactionCommitError({})", self.err)
187 }
188}
189
190impl fmt::Display for TransactionCommitError {
191 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
192 self.err.fmt(f)
193 }
194}
195
196impl std::error::Error for TransactionCommitError {}
197
/// The result of `Transaction::commit`: the committed transaction on success, or the failed
/// transaction together with its error on failure.
type TransactionResult = Result<TransactionCommitted, TransactionCommitError>;
200
201/// A cancelled transaction
202#[derive(Debug)]
203#[repr(transparent)]
204pub struct TransactionCancelled {
205 tr: Transaction,
206}
207impl TransactionCancelled {
208 /// Reset the transaction to its initial state.
209 ///
210 /// This is similar to dropping the transaction and creating a new one.
211 pub fn reset(mut self) -> Transaction {
212 self.tr.reset();
213 self.tr
214 }
215}
216impl From<TransactionCancelled> for Transaction {
217 fn from(tc: TransactionCancelled) -> Transaction {
218 tc.reset()
219 }
220}
221
/// In FoundationDB, a transaction is a mutable snapshot of a database.
///
/// All read and write operations on a transaction see and modify an otherwise-unchanging version of the database and only change the underlying database if and when the transaction is committed. Read operations do see the effects of previous write operations on the same transaction. Committing a transaction usually succeeds in the absence of conflicts.
///
/// Applications must provide error handling and an appropriate retry loop around the application code for a transaction. See the documentation for [fdb_transaction_on_error()](https://apple.github.io/foundationdb/api-c.html#transaction).
///
/// Transactions group operations into a unit with the properties of atomicity, isolation, and durability. Transactions also provide the ability to maintain an application’s invariants or integrity constraints, supporting the property of consistency. Together these properties are known as ACID.
///
/// Transactions are also causally consistent: once a transaction has been successfully committed, all subsequently created transactions will see the modifications made by it.
#[derive(Debug)]
pub struct Transaction {
    // Order of fields should not be changed, because Rust drops field top-to-bottom, and
    // transaction should be dropped before cluster.
    // NOTE(review): no cluster field is visible in this struct, so the remark above may be
    // stale — confirm before relying on it.
    /// Non-null raw handle to the underlying `FDBTransaction`.
    inner: NonNull<fdb_sys::FDBTransaction>,
    /// Metrics sink for instrumented transactions; `None` when built through `Transaction::new`.
    metrics: Option<TransactionMetrics>,
}
// NOTE(review): these impls assert that the raw handle may be moved to and shared between
// threads; presumably this relies on the FoundationDB C client being thread-safe — confirm.
unsafe impl Send for Transaction {}
unsafe impl Sync for Transaction {}
240
241/// Converts Rust `bool` into `fdb_sys::fdb_bool_t`
242#[inline]
243fn fdb_bool(v: bool) -> fdb_sys::fdb_bool_t {
244 if v {
245 1
246 } else {
247 0
248 }
249}
/// Converts a buffer length into the `c_int` length expected by the C API.
///
/// `context` names the buffer and is only used in the panic message.
///
/// # Panics
///
/// Panics with `"{context}.len() > i32::MAX"` when `len` does not fit in an `i32`.
#[inline]
fn fdb_len(len: usize, context: &'static str) -> std::os::raw::c_int {
    match i32::try_from(len) {
        Ok(converted) => converted,
        Err(_) => panic!("{}.len() > i32::MAX", context),
    }
}
/// Converts an iteration counter into the `c_int` expected by the C API.
///
/// Counters that do not fit in an `i32` are mapped to 0.
#[inline]
fn fdb_iteration(iteration: usize) -> std::os::raw::c_int {
    // Falling back to 0 will cause client_invalid_operation.
    i32::try_from(iteration).unwrap_or(0)
}
/// Converts a limit into the `c_int` expected by the C API, saturating at `i32::MAX`.
#[inline]
fn fdb_limit(v: usize) -> std::os::raw::c_int {
    i32::try_from(v).unwrap_or(i32::MAX)
}
271
/// `RangeOption` represents the query parameters of a range scan.
///
/// You can construct `RangeOption` easily:
///
/// ```
/// use foundationdb::RangeOption;
///
/// let opt = RangeOption::from((b"begin".as_ref(), b"end".as_ref()));
/// let opt: RangeOption = (b"begin".as_ref()..b"end".as_ref()).into();
/// let opt = RangeOption {
///     limit: Some(10),
///     ..RangeOption::from((b"begin".as_ref(), b"end".as_ref()))
/// };
/// ```
#[derive(Debug, Clone)]
pub struct RangeOption<'a> {
    /// The beginning of the range.
    pub begin: KeySelector<'a>,
    /// The end of the range.
    pub end: KeySelector<'a>,
    /// If `Some` and non-zero, indicates the maximum number of key-value pairs to return.
    /// `None` is forwarded to the C API as `0`.
    pub limit: Option<usize>,
    /// If non-zero, indicates a (soft) cap on the combined number of bytes of keys and values to
    /// return for each item.
    pub target_bytes: usize,
    /// One of the options::StreamingMode values indicating how the caller would like the data in
    /// the range returned.
    pub mode: options::StreamingMode,
    /// If true, key-value pairs will be returned in reverse lexicographical order beginning at
    /// the end of the range.
    pub reverse: bool,
    // Hidden marker field; callers are expected to fill it through `..RangeOption::default()` or
    // one of the `From` conversions, as shown in the example above.
    #[doc(hidden)]
    pub __non_exhaustive: std::marker::PhantomData<()>,
}
306
307impl RangeOption<'_> {
308 /// Reverses the range direction.
309 pub fn rev(mut self) -> Self {
310 self.reverse = !self.reverse;
311 self
312 }
313
314 pub fn next_range(mut self, kvs: &FdbValues) -> Option<Self> {
315 if !kvs.more() {
316 return None;
317 }
318
319 let last = kvs.last()?;
320 let last_key = last.key();
321
322 if let Some(limit) = self.limit.as_mut() {
323 *limit = limit.saturating_sub(kvs.len());
324 if *limit == 0 {
325 return None;
326 }
327 }
328
329 if self.reverse {
330 self.end.make_first_greater_or_equal(last_key);
331 } else {
332 self.begin.make_first_greater_than(last_key);
333 }
334 Some(self)
335 }
336
337 #[cfg_api_versions(min = 710)]
338 pub(crate) fn next_mapped_range(mut self, kvs: &MappedKeyValues) -> Option<Self> {
339 if !kvs.more() {
340 return None;
341 }
342
343 let last = kvs.last()?;
344 let last_key = last.parent_key();
345
346 if let Some(limit) = self.limit.as_mut() {
347 *limit = limit.saturating_sub(kvs.len());
348 if *limit == 0 {
349 return None;
350 }
351 }
352
353 if self.reverse {
354 self.end.make_first_greater_or_equal(last_key);
355 } else {
356 self.begin.make_first_greater_than(last_key);
357 }
358 Some(self)
359 }
360}
361
362impl Default for RangeOption<'_> {
363 fn default() -> Self {
364 Self {
365 begin: KeySelector::first_greater_or_equal([].as_ref()),
366 end: KeySelector::first_greater_or_equal([].as_ref()),
367 limit: None,
368 target_bytes: 0,
369 mode: options::StreamingMode::Iterator,
370 reverse: false,
371 __non_exhaustive: std::marker::PhantomData,
372 }
373 }
374}
375
376impl<'a> From<(KeySelector<'a>, KeySelector<'a>)> for RangeOption<'a> {
377 fn from((begin, end): (KeySelector<'a>, KeySelector<'a>)) -> Self {
378 Self {
379 begin,
380 end,
381 ..Self::default()
382 }
383 }
384}
385impl From<(Vec<u8>, Vec<u8>)> for RangeOption<'static> {
386 fn from((begin, end): (Vec<u8>, Vec<u8>)) -> Self {
387 Self {
388 begin: KeySelector::first_greater_or_equal(begin),
389 end: KeySelector::first_greater_or_equal(end),
390 ..Self::default()
391 }
392 }
393}
394impl<'a> From<(&'a [u8], &'a [u8])> for RangeOption<'a> {
395 fn from((begin, end): (&'a [u8], &'a [u8])) -> Self {
396 Self {
397 begin: KeySelector::first_greater_or_equal(begin),
398 end: KeySelector::first_greater_or_equal(end),
399 ..Self::default()
400 }
401 }
402}
403impl<'a> From<std::ops::Range<KeySelector<'a>>> for RangeOption<'a> {
404 fn from(range: Range<KeySelector<'a>>) -> Self {
405 RangeOption::from((range.start, range.end))
406 }
407}
408
409impl<'a> From<std::ops::Range<&'a [u8]>> for RangeOption<'a> {
410 fn from(range: Range<&'a [u8]>) -> Self {
411 RangeOption::from((range.start, range.end))
412 }
413}
414
415impl From<std::ops::Range<std::vec::Vec<u8>>> for RangeOption<'static> {
416 fn from(range: Range<Vec<u8>>) -> Self {
417 RangeOption::from((range.start, range.end))
418 }
419}
420
421impl<'a> From<std::ops::RangeInclusive<&'a [u8]>> for RangeOption<'a> {
422 fn from(range: RangeInclusive<&'a [u8]>) -> Self {
423 let (start, end) = range.into_inner();
424 (KeySelector::first_greater_or_equal(start)..KeySelector::first_greater_than(end)).into()
425 }
426}
427
428impl From<std::ops::RangeInclusive<std::vec::Vec<u8>>> for RangeOption<'static> {
429 fn from(range: RangeInclusive<Vec<u8>>) -> Self {
430 let (start, end) = range.into_inner();
431 (KeySelector::first_greater_or_equal(start)..KeySelector::first_greater_than(end)).into()
432 }
433}
434
435impl Transaction {
436 pub(crate) fn new(inner: NonNull<fdb_sys::FDBTransaction>) -> Self {
437 Self {
438 inner,
439 metrics: None,
440 }
441 }
442
443 pub fn new_instrumented(
444 inner: NonNull<fdb_sys::FDBTransaction>,
445 metrics: TransactionMetrics,
446 ) -> Self {
447 Self {
448 inner,
449 metrics: Some(metrics),
450 }
451 }
452
453 /// Called to set an option on an FDBTransaction.
454 pub fn set_option(&self, opt: options::TransactionOption) -> FdbResult<()> {
455 unsafe { opt.apply(self.inner.as_ptr()) }
456 }
457
458 /// Pass through an option given a code and raw data. Useful when creating a passthrough layer
459 /// where the code and data will be provided as raw, in order to avoid deserializing to an option
460 /// and serializing it back to code and data.
461 /// In general, you should use `set_option`.
462 pub fn set_raw_option(
463 &self,
464 code: fdb_sys::FDBTransactionOption,
465 data: Option<Vec<u8>>,
466 ) -> FdbResult<()> {
467 let (data_ptr, size) = data
468 .as_ref()
469 .map(|data| {
470 (
471 data.as_ptr(),
472 i32::try_from(data.len()).expect("len to fit in i32"),
473 )
474 })
475 .unwrap_or_else(|| (std::ptr::null(), 0));
476 let err = unsafe {
477 fdb_sys::fdb_transaction_set_option(self.inner.as_ptr(), code, data_ptr, size)
478 };
479 if err != 0 {
480 Err(FdbError::from_code(err))
481 } else {
482 Ok(())
483 }
484 }
485
486 /// Modify the database snapshot represented by transaction to change the given
487 /// key to have the given value.
488 ///
489 /// If the given key was not previously present in the database it is inserted.
490 /// The modification affects the actual database only if transaction is later
491 /// committed with `Transaction::commit`.
492 ///
493 /// # Arguments
494 ///
495 /// * `key` - the name of the key to be inserted into the database.
496 /// * `value` - the value to be inserted into the database
497 #[cfg_attr(
498 feature = "trace",
499 tracing::instrument(level = "debug", skip(self, key, value))
500 )]
501 pub fn set(&self, key: &[u8], value: &[u8]) {
502 unsafe {
503 fdb_sys::fdb_transaction_set(
504 self.inner.as_ptr(),
505 key.as_ptr(),
506 fdb_len(key.len(), "key"),
507 value.as_ptr(),
508 fdb_len(value.len(), "value"),
509 )
510 }
511
512 if let Some(metrics) = &self.metrics {
513 metrics.report_metrics(FdbCommand::Set((key.len() + value.len()) as u64));
514 }
515 }
516
517 /// Modify the database snapshot represented by transaction to remove the given key from the
518 /// database.
519 ///
520 /// If the key was not previously present in the database, there is no effect. The modification
521 /// affects the actual database only if transaction is later committed with
522 /// `Transaction::commit`.
523 ///
524 /// # Arguments
525 ///
526 /// * `key` - the name of the key to be removed from the database.
527 #[cfg_attr(
528 feature = "trace",
529 tracing::instrument(level = "debug", skip(self, key))
530 )]
531 pub fn clear(&self, key: &[u8]) {
532 unsafe {
533 fdb_sys::fdb_transaction_clear(
534 self.inner.as_ptr(),
535 key.as_ptr(),
536 fdb_len(key.len(), "key"),
537 )
538 }
539
540 if let Some(metrics) = &self.metrics {
541 metrics.report_metrics(FdbCommand::Clear);
542 }
543 }
544
545 /// Reads a value from the database snapshot represented by transaction.
546 ///
547 /// Returns an FDBFuture which will be set to the value of key in the database if there is any.
548 ///
549 /// # Arguments
550 ///
551 /// * `key` - the name of the key to be looked up in the database
552 /// * `snapshot` - `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
553 #[cfg_attr(
554 feature = "trace",
555 tracing::instrument(level = "debug", skip(self, key))
556 )]
557 pub fn get(
558 &self,
559 key: &[u8],
560 snapshot: bool,
561 ) -> impl Future<Output = FdbResult<Option<FdbSlice>>> + Send + Sync + Unpin {
562 let metrics = self.metrics.clone();
563 let lenght_key = key.len();
564
565 FdbFuture::<Option<FdbSlice>>::new(unsafe {
566 fdb_sys::fdb_transaction_get(
567 self.inner.as_ptr(),
568 key.as_ptr(),
569 fdb_len(key.len(), "key"),
570 fdb_bool(snapshot),
571 )
572 })
573 .map(move |result| {
574 if let Ok(value) = &result {
575 if let Some(metrics) = metrics.as_ref() {
576 let (bytes_count, kv_fetched) = if let Some(values) = value {
577 ((lenght_key + values.len()) as u64, 1)
578 } else {
579 (lenght_key as u64, 0)
580 };
581 metrics.report_metrics(FdbCommand::Get(bytes_count, kv_fetched));
582 }
583 }
584 result
585 })
586 }
587
588 /// Modify the database snapshot represented by transaction to perform the operation indicated
589 /// by operationType with operand param to the value stored by the given key.
590 ///
591 /// An atomic operation is a single database command that carries out several logical steps:
592 /// reading the value of a key, performing a transformation on that value, and writing the
593 /// result. Different atomic operations perform different transformations. Like other database
594 /// operations, an atomic operation is used within a transaction; however, its use within a
595 /// transaction will not cause the transaction to conflict.
596 ///
597 /// Atomic operations do not expose the current value of the key to the client but simply send
598 /// the database the transformation to apply. In regard to conflict checking, an atomic
599 /// operation is equivalent to a write without a read. It can only cause other transactions
600 /// performing reads of the key to conflict.
601 ///
602 /// By combining these logical steps into a single, read-free operation, FoundationDB can
603 /// guarantee that the transaction will not conflict due to the operation. This makes atomic
604 /// operations ideal for operating on keys that are frequently modified. A common example is
605 /// the use of a key-value pair as a counter.
606 ///
607 /// # Warning
608 ///
609 /// If a transaction uses both an atomic operation and a strictly serializable read on the same
610 /// key, the benefits of using the atomic operation (for both conflict checking and performance)
611 /// are lost.
612 #[cfg_attr(
613 feature = "trace",
614 tracing::instrument(level = "debug", skip(self, key, param))
615 )]
616 pub fn atomic_op(&self, key: &[u8], param: &[u8], op_type: options::MutationType) {
617 unsafe {
618 fdb_sys::fdb_transaction_atomic_op(
619 self.inner.as_ptr(),
620 key.as_ptr(),
621 fdb_len(key.len(), "key"),
622 param.as_ptr(),
623 fdb_len(param.len(), "param"),
624 op_type.code(),
625 )
626 }
627
628 if let Some(metrics) = &self.metrics {
629 metrics.report_metrics(FdbCommand::Atomic);
630 }
631 }
632
633 /// Resolves a key selector against the keys in the database snapshot represented by
634 /// transaction.
635 ///
636 /// Returns an FDBFuture which will be set to the key in the database matching the key
637 /// selector.
638 ///
639 /// # Arguments
640 ///
641 /// * `selector`: the key selector
642 /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
643 #[cfg_attr(
644 feature = "trace",
645 tracing::instrument(level = "debug", skip(self, selector, snapshot))
646 )]
647 pub fn get_key(
648 &self,
649 selector: &KeySelector,
650 snapshot: bool,
651 ) -> impl Future<Output = FdbResult<FdbSlice>> + Send + Sync + Unpin {
652 let key = selector.key();
653 FdbFuture::new(unsafe {
654 fdb_sys::fdb_transaction_get_key(
655 self.inner.as_ptr(),
656 key.as_ptr(),
657 fdb_len(key.len(), "key"),
658 fdb_bool(selector.or_equal()),
659 selector.offset(),
660 fdb_bool(snapshot),
661 )
662 })
663 }
664
665 /// Reads all key-value pairs in the database snapshot represented by transaction (potentially
666 /// limited by limit, target_bytes, or mode) which have a key lexicographically greater than or
667 /// equal to the key resolved by the begin key selector and lexicographically less than the key
668 /// resolved by the end key selector.
669 ///
670 /// Returns a stream of KeyValue slices.
671 ///
672 /// This method is a little more efficient than `get_ranges_keyvalues` but a little harder to
673 /// use.
674 ///
675 /// # Arguments
676 ///
677 /// * `opt`: the range, limit, target_bytes and mode
678 /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
679 #[cfg_attr(
680 feature = "trace",
681 tracing::instrument(level = "debug", skip(self, opt, snapshot))
682 )]
683 pub fn get_ranges<'a>(
684 &'a self,
685 opt: RangeOption<'a>,
686 snapshot: bool,
687 ) -> impl Stream<Item = FdbResult<FdbValues>> + Send + Sync + Unpin + 'a {
688 stream::unfold((1, Some(opt)), move |(iteration, maybe_opt)| {
689 if let Some(opt) = maybe_opt {
690 Either::Left(self.get_range(&opt, iteration as usize, snapshot).map(
691 move |maybe_values| {
692 let next_opt = match &maybe_values {
693 Ok(values) => opt.next_range(values),
694 Err(..) => None,
695 };
696 Some((maybe_values, (iteration + 1, next_opt)))
697 },
698 ))
699 } else {
700 Either::Right(future::ready(None))
701 }
702 })
703 }
704
705 /// Reads all key-value pairs in the database snapshot represented by transaction (potentially
706 /// limited by limit, target_bytes, or mode) which have a key lexicographically greater than or
707 /// equal to the key resolved by the begin key selector and lexicographically less than the key
708 /// resolved by the end key selector.
709 ///
710 /// Returns a stream of KeyValue.
711 ///
712 /// # Arguments
713 ///
714 /// * `opt`: the range, limit, target_bytes and mode
715 /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
716 #[cfg_attr(
717 feature = "trace",
718 tracing::instrument(level = "debug", skip(self, opt, snapshot))
719 )]
720 pub fn get_ranges_keyvalues<'a>(
721 &'a self,
722 opt: RangeOption<'a>,
723 snapshot: bool,
724 ) -> impl Stream<Item = FdbResult<FdbValue>> + Unpin + 'a {
725 self.get_ranges(opt, snapshot)
726 .map_ok(|values| stream::iter(values.into_iter().map(Ok)))
727 .try_flatten()
728 }
729
730 /// Reads all key-value pairs in the database snapshot represented by transaction (potentially
731 /// limited by limit, target_bytes, or mode) which have a key lexicographically greater than or
732 /// equal to the key resolved by the begin key selector and lexicographically less than the key
733 /// resolved by the end key selector.
734 ///
735 /// # Arguments
736 ///
737 /// * `opt`: the range, limit, target_bytes and mode
738 /// * `iteration`: If opt.mode is Iterator, this parameter should start at 1 and be incremented
739 /// by 1 for each successive call while reading this range. In all other cases it is ignored.
740 /// * `snapshot`: `true` if this is a [snapshot read](https://apple.github.io/foundationdb/api-c.html#snapshots)
741 #[cfg_attr(
742 feature = "trace",
743 tracing::instrument(level = "debug", skip(self, opt, snapshot))
744 )]
745 pub fn get_range(
746 &self,
747 opt: &RangeOption,
748 iteration: usize,
749 snapshot: bool,
750 ) -> impl Future<Output = FdbResult<FdbValues>> + Send + Sync + Unpin {
751 let begin = &opt.begin;
752 let end = &opt.end;
753 let key_begin = begin.key();
754 let key_end = end.key();
755
756 let metrics = self.metrics.clone();
757
758 FdbFuture::<FdbValues>::new(unsafe {
759 fdb_sys::fdb_transaction_get_range(
760 self.inner.as_ptr(),
761 key_begin.as_ptr(),
762 fdb_len(key_begin.len(), "key_begin"),
763 fdb_bool(begin.or_equal()),
764 begin.offset(),
765 key_end.as_ptr(),
766 fdb_len(key_end.len(), "key_end"),
767 fdb_bool(end.or_equal()),
768 end.offset(),
769 fdb_limit(opt.limit.unwrap_or(0)),
770 fdb_limit(opt.target_bytes),
771 opt.mode.code(),
772 fdb_iteration(iteration),
773 fdb_bool(snapshot),
774 fdb_bool(opt.reverse),
775 )
776 })
777 .map(move |result| {
778 if let (Ok(values), Some(metrics)) = (&result, metrics) {
779 let kv_fetched = values.len();
780 let mut bytes_count = 0;
781
782 for key_value in values.as_ref() {
783 let key_len = key_value.key().len();
784 let value_len = key_value.value().len();
785
786 bytes_count += (key_len + value_len) as u64
787 }
788
789 metrics.report_metrics(FdbCommand::GetRange(bytes_count, kv_fetched as u64));
790 };
791
792 result
793 })
794 }
795
796 /// Mapped Range is an experimental feature introduced in FDB 7.1.
797 /// It is intended to improve the client throughput and reduce latency for querying data through a Subspace used as a "index".
798 /// In such a case, querying records by scanning an index in relational databases can be
799 /// translated to a GetRange request on the index entries followed up by multiple GetValue requests for the record entries in FDB.
800 ///
801 /// This method is allowing FoundationDB "follow up" a GetRange request with GetValue requests,
802 /// this can happen in one request without additional back and forth. Considering the overhead
803 /// of each request, this saves time and resources on serialization, deserialization, and network.
804 ///
805 /// A mapped request will:
806 ///
807 /// * Do a range query (same as a `Transaction.get_range` request) and get the result. We call it the primary query.
808 /// * For each key-value pair in the primary query result, translate it to a `get_range` query and get the result. We call them secondary queries.
809 /// * Put all results in a nested structure and return them.
810 ///
811 /// **WARNING** : This feature is considered experimental at this time. It is only allowed when
812 /// using snapshot isolation AND disabling read-your-writes.
813 ///
814 /// More info can be found in the relevant [documentation](https://github.com/apple/foundationdb/wiki/Everything-about-GetMappedRange#input).
815 ///
816 /// This is the "raw" version, users are expected to use [Transaction::get_mapped_ranges]
817 #[cfg_api_versions(min = 710)]
818 #[cfg_attr(
819 feature = "trace",
820 tracing::instrument(level = "debug", skip(self, opt, mapper, snapshot))
821 )]
822 pub fn get_mapped_range(
823 &self,
824 opt: &RangeOption,
825 mapper: &[u8],
826 iteration: usize,
827 snapshot: bool,
828 ) -> impl Future<Output = FdbResult<MappedKeyValues>> + Send + Sync + Unpin {
829 let begin = &opt.begin;
830 let end = &opt.end;
831 let key_begin = begin.key();
832 let key_end = end.key();
833
834 FdbFuture::new(unsafe {
835 fdb_sys::fdb_transaction_get_mapped_range(
836 self.inner.as_ptr(),
837 key_begin.as_ptr(),
838 fdb_len(key_begin.len(), "key_begin"),
839 fdb_bool(begin.or_equal()),
840 begin.offset(),
841 key_end.as_ptr(),
842 fdb_len(key_end.len(), "key_end"),
843 fdb_bool(end.or_equal()),
844 end.offset(),
845 mapper.as_ptr(),
846 fdb_len(mapper.len(), "mapper_length"),
847 fdb_limit(opt.limit.unwrap_or(0)),
848 fdb_limit(opt.target_bytes),
849 opt.mode.code(),
850 fdb_iteration(iteration),
851 fdb_bool(snapshot),
852 fdb_bool(opt.reverse),
853 )
854 })
855 }
856
857 /// Mapped Range is an experimental feature introduced in FDB 7.1.
858 /// It is intended to improve the client throughput and reduce latency for querying data through a Subspace used as a "index".
859 /// In such a case, querying records by scanning an index in relational databases can be
860 /// translated to a GetRange request on the index entries followed up by multiple GetValue requests for the record entries in FDB.
861 ///
862 /// This method is allowing FoundationDB "follow up" a GetRange request with GetValue requests,
863 /// this can happen in one request without additional back and forth. Considering the overhead
864 /// of each request, this saves time and resources on serialization, deserialization, and network.
865 ///
866 /// A mapped request will:
867 ///
868 /// * Do a range query (same as a `Transaction.get_range` request) and get the result. We call it the primary query.
869 /// * For each key-value pair in the primary query result, translate it to a `get_range` query and get the result. We call them secondary queries.
870 /// * Put all results in a nested structure and return them.
871 ///
872 /// **WARNING** : This feature is considered experimental at this time. It is only allowed when
873 /// using snapshot isolation AND disabling read-your-writes.
874 ///
875 /// More info can be found in the relevant [documentation](https://github.com/apple/foundationdb/wiki/Everything-about-GetMappedRange#input).
876 #[cfg_api_versions(min = 710)]
877 #[cfg_attr(
878 feature = "trace",
879 tracing::instrument(level = "debug", skip(self, opt, mapper, snapshot))
880 )]
881 pub fn get_mapped_ranges<'a>(
882 &'a self,
883 opt: RangeOption<'a>,
884 mapper: &'a [u8],
885 snapshot: bool,
886 ) -> impl Stream<Item = FdbResult<MappedKeyValues>> + Send + Sync + Unpin + 'a {
887 stream::unfold((1, Some(opt)), move |(iteration, maybe_opt)| {
888 if let Some(opt) = maybe_opt {
889 Either::Left(
890 self.get_mapped_range(&opt, mapper, iteration as usize, snapshot)
891 .map(move |maybe_values| {
892 let next_opt = match &maybe_values {
893 Ok(values) => opt.next_mapped_range(values),
894 Err(..) => None,
895 };
896 Some((maybe_values, (iteration + 1, next_opt)))
897 }),
898 )
899 } else {
900 Either::Right(future::ready(None))
901 }
902 })
903 }
904
905 /// Modify the database snapshot represented by transaction to remove all keys (if any) which
906 /// are lexicographically greater than or equal to the given begin key and lexicographically
907 /// less than the given end_key.
908 ///
909 /// The modification affects the actual database only if transaction is later committed with
910 /// `Transaction::commit`.
911 #[cfg_attr(
912 feature = "trace",
913 tracing::instrument(level = "debug", skip(self, begin, end))
914 )]
915 pub fn clear_range(&self, begin: &[u8], end: &[u8]) {
916 unsafe {
917 fdb_sys::fdb_transaction_clear_range(
918 self.inner.as_ptr(),
919 begin.as_ptr(),
920 fdb_len(begin.len(), "begin"),
921 end.as_ptr(),
922 fdb_len(end.len(), "end"),
923 )
924 }
925
926 if let Some(metrics) = &self.metrics {
927 metrics.report_metrics(FdbCommand::ClearRange);
928 }
929 }
930
931 /// Get the estimated byte size of the key range based on the byte sample collected by FDB
932 #[cfg_api_versions(min = 630)]
933 pub fn get_estimated_range_size_bytes(
934 &self,
935 begin: &[u8],
936 end: &[u8],
937 ) -> impl Future<Output = FdbResult<i64>> + Send + Sync + Unpin {
938 FdbFuture::<i64>::new(unsafe {
939 fdb_sys::fdb_transaction_get_estimated_range_size_bytes(
940 self.inner.as_ptr(),
941 begin.as_ptr(),
942 fdb_len(begin.len(), "begin"),
943 end.as_ptr(),
944 fdb_len(end.len(), "end"),
945 )
946 })
947 }
948
949 /// Attempts to commit the sets and clears previously applied to the database snapshot
950 /// represented by transaction to the actual database.
951 ///
952 /// The commit may or may not succeed – in particular, if a conflicting transaction previously
953 /// committed, then the commit must fail in order to preserve transactional isolation. If the
954 /// commit does succeed, the transaction is durably committed to the database and all
955 /// subsequently started transactions will observe its effects.
956 ///
957 /// It is not necessary to commit a read-only transaction – you can simply drop it.
958 ///
959 /// Callers will usually want to retry a transaction if the commit or a another method on the
960 /// transaction returns a retryable error (see `on_error` and/or `Database::transact`).
961 ///
962 /// As with other client/server databases, in some failure scenarios a client may be unable to
963 /// determine whether a transaction succeeded. In these cases, `Transaction::commit` will return
964 /// an error and `is_maybe_committed()` will returns true on that error. The `on_error` function
965 /// treats this error as retryable, so retry loops that don’t check for `is_maybe_committed()`
966 /// could execute the transaction twice. In these cases, you must consider the idempotence of
967 /// the transaction. For more information, see [Transactions with unknown results](https://apple.github.io/foundationdb/developer-guide.html#developer-guide-unknown-results).
968 ///
969 /// Normally, commit will wait for outstanding reads to return. However, if those reads were
970 /// snapshot reads or the transaction option for disabling “read-your-writes” has been invoked,
971 /// any outstanding reads will immediately return errors.
972 #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
973 pub fn commit(self) -> impl Future<Output = TransactionResult> + Send + Sync + Unpin {
974 FdbFuture::<()>::new(unsafe { fdb_sys::fdb_transaction_commit(self.inner.as_ptr()) }).map(
975 move |r| match r {
976 Ok(()) => Ok(TransactionCommitted { tr: self }),
977 Err(err) => Err(TransactionCommitError { tr: self, err }),
978 },
979 )
980 }
981
982 /// Implements the recommended retry and backoff behavior for a transaction. This function knows
983 /// which of the error codes generated by other `Transaction` functions represent temporary
984 /// error conditions and which represent application errors that should be handled by the
985 /// application. It also implements an exponential backoff strategy to avoid swamping the
986 /// database cluster with excessive retries when there is a high level of conflict between
987 /// transactions.
988 ///
989 /// It is not necessary to call `reset()` when handling an error with `on_error()` since the
990 /// transaction has already been reset.
991 ///
992 /// You should not call this method most of the times and use `Database::transact` which
993 /// implements a retry loop strategy for you.
994 pub fn on_error(
995 self,
996 err: FdbError,
997 ) -> impl Future<Output = FdbResult<Transaction>> + Send + Sync + Unpin {
998 FdbFuture::<()>::new(unsafe {
999 fdb_sys::fdb_transaction_on_error(self.inner.as_ptr(), err.code())
1000 })
1001 .map_ok(|()| self)
1002 }
1003
1004 /// Cancels the transaction. All pending or future uses of the transaction will return a
1005 /// transaction_cancelled error. The transaction can be used again after it is reset.
1006 pub fn cancel(self) -> TransactionCancelled {
1007 unsafe {
1008 fdb_sys::fdb_transaction_cancel(self.inner.as_ptr());
1009 }
1010 TransactionCancelled { tr: self }
1011 }
1012
1013 /// Sets a custom metric for the transaction with the specified name, value, and labels.
1014 ///
1015 /// Custom metrics allow you to track application-specific metrics during transaction execution.
1016 /// These metrics are collected alongside the standard FoundationDB metrics and can be used
1017 /// for monitoring, debugging, and performance analysis.
1018 ///
1019 /// # Arguments
1020 /// * `name` - The name of the metric (e.g., "query_time", "cache_hits")
1021 /// * `value` - The value to set for the metric
1022 /// * `labels` - Key-value pairs for labeling the metric, allowing for dimensional metrics
1023 /// (e.g., `[("operation", "read"), ("region", "us-west")]`)
1024 ///
1025 /// # Returns
1026 /// * `Ok(())` if the metric was set successfully
1027 /// * `Err(TransactionMetricsNotFound)` if this transaction was not created with metrics instrumentation
1028 ///
1029 /// # Example
1030 /// ```
1031 /// # use foundationdb::*;
1032 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1033 /// # let db = Database::default()?;
1034 /// let metrics = TransactionMetrics::new();
1035 /// let txn = db.create_instrumented_trx(metrics)?;
1036 ///
1037 /// // Set a custom metric with labels
1038 /// txn.set_custom_metric("query_processing_time", 42, &[("query_type", "select"), ("table", "users")])?;
1039 /// # Ok(())
1040 /// # }
1041 /// ```
1042 ///
1043 /// # Note
1044 /// This method only works on transactions created with `create_instrumented_trx()` or within
1045 /// `instrumented_run()`. For regular transactions created with `create_trx()`, this will
1046 /// return `Err(TransactionMetricsNotFound)`.
1047 pub fn set_custom_metric(
1048 &self,
1049 name: &str,
1050 value: u64,
1051 labels: &[(&str, &str)],
1052 ) -> Result<(), TransactionMetricsNotFound> {
1053 if let Some(metrics) = &self.metrics {
1054 metrics.set_custom(name, value, labels);
1055 Ok(())
1056 } else {
1057 Err(TransactionMetricsNotFound)
1058 }
1059 }
1060
1061 /// Increments a custom metric for the transaction by the specified amount.
1062 ///
1063 /// This is useful for counting events or accumulating values during transaction execution.
1064 /// If the metric doesn't exist yet, it will be created with the specified amount.
1065 /// If it already exists with the same labels, its value will be incremented.
1066 ///
1067 /// # Arguments
1068 /// * `name` - The name of the metric to increment (e.g., "requests", "bytes_processed")
1069 /// * `amount` - The amount to increment the metric by
1070 /// * `labels` - Key-value pairs for labeling the metric, allowing for dimensional metrics
1071 /// (e.g., `[("status", "success"), ("endpoint", "api/v1/users")]`)
1072 ///
1073 /// # Returns
1074 /// * `Ok(())` if the metric was incremented successfully
1075 /// * `Err(TransactionMetricsNotFound)` if this transaction was not created with metrics instrumentation
1076 ///
1077 /// # Example
1078 /// ```
1079 /// # use foundationdb::*;
1080 /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
1081 /// # let db = Database::default()?;
1082 /// let metrics = TransactionMetrics::new();
1083 /// let txn = db.create_instrumented_trx(metrics)?;
1084 ///
1085 /// // Increment a counter each time a specific operation occurs
1086 /// txn.increment_custom_metric("cache_misses", 1, &[("cache", "user_data")])?;
1087 ///
1088 /// // Later in the transaction, increment it again
1089 /// txn.increment_custom_metric("cache_misses", 1, &[("cache", "user_data")])?;
1090 ///
1091 /// // The final value will be 2
1092 /// # Ok(())
1093 /// # }
1094 /// ```
1095 ///
1096 /// # Note
1097 /// This method only works on transactions created with `create_instrumented_trx()` or within
1098 /// `instrumented_run()`. For regular transactions created with `create_trx()`, this will
1099 /// return `Err(TransactionMetricsNotFound)`.
1100 pub fn increment_custom_metric(
1101 &self,
1102 name: &str,
1103 amount: u64,
1104 labels: &[(&str, &str)],
1105 ) -> Result<(), TransactionMetricsNotFound> {
1106 if let Some(metrics) = &self.metrics {
1107 metrics.increment_custom(name, amount, labels);
1108 Ok(())
1109 } else {
1110 Err(TransactionMetricsNotFound)
1111 }
1112 }
1113
1114 /// Returns a list of public network addresses as strings, one for each of the storage servers
1115 /// responsible for storing key_name and its associated value.
1116 pub fn get_addresses_for_key(
1117 &self,
1118 key: &[u8],
1119 ) -> impl Future<Output = FdbResult<FdbAddresses>> + Send + Sync + Unpin {
1120 FdbFuture::new(unsafe {
1121 fdb_sys::fdb_transaction_get_addresses_for_key(
1122 self.inner.as_ptr(),
1123 key.as_ptr(),
1124 fdb_len(key.len(), "key"),
1125 )
1126 })
1127 }
1128
1129 /// A watch's behavior is relative to the transaction that created it. A watch will report a
1130 /// change in relation to the key’s value as readable by that transaction. The initial value
1131 /// used for comparison is either that of the transaction’s read version or the value as
1132 /// modified by the transaction itself prior to the creation of the watch. If the value changes
1133 /// and then changes back to its initial value, the watch might not report the change.
1134 ///
1135 /// Until the transaction that created it has been committed, a watch will not report changes
1136 /// made by other transactions. In contrast, a watch will immediately report changes made by
1137 /// the transaction itself. Watches cannot be created if the transaction has set the
1138 /// READ_YOUR_WRITES_DISABLE transaction option, and an attempt to do so will return an
1139 /// watches_disabled error.
1140 ///
1141 /// If the transaction used to create a watch encounters an error during commit, then the watch
1142 /// will be set with that error. A transaction whose commit result is unknown will set all of
1143 /// its watches with the commit_unknown_result error. If an uncommitted transaction is reset or
1144 /// destroyed, then any watches it created will be set with the transaction_cancelled error.
1145 ///
1146 /// Returns an future representing an empty value that will be set once the watch has
1147 /// detected a change to the value at the specified key.
1148 ///
1149 /// By default, each database connection can have no more than 10,000 watches that have not yet
1150 /// reported a change. When this number is exceeded, an attempt to create a watch will return a
1151 /// too_many_watches error. This limit can be changed using the MAX_WATCHES database option.
1152 /// Because a watch outlives the transaction that creates it, any watch that is no longer
1153 /// needed should be cancelled by dropping its future.
1154 pub fn watch(&self, key: &[u8]) -> impl Future<Output = FdbResult<()>> + Send + Sync + Unpin {
1155 FdbFuture::new(unsafe {
1156 fdb_sys::fdb_transaction_watch(
1157 self.inner.as_ptr(),
1158 key.as_ptr(),
1159 fdb_len(key.len(), "key"),
1160 )
1161 })
1162 }
1163
1164 /// Returns an FDBFuture which will be set to the approximate transaction size so far in the
1165 /// returned future, which is the summation of the estimated size of mutations, read conflict
1166 /// ranges, and write conflict ranges.
1167 ///
1168 /// This can be called multiple times before the transaction is committed.
1169 #[cfg_api_versions(min = 620)]
1170 pub fn get_approximate_size(
1171 &self,
1172 ) -> impl Future<Output = FdbResult<i64>> + Send + Sync + Unpin {
1173 FdbFuture::new(unsafe {
1174 fdb_sys::fdb_transaction_get_approximate_size(self.inner.as_ptr())
1175 })
1176 }
1177
1178 /// Gets a list of keys that can split the given range into (roughly) equally sized chunks based on chunk_size.
1179 /// Note: the returned split points contain the start key and end key of the given range.
1180 #[cfg_api_versions(min = 700)]
1181 pub fn get_range_split_points(
1182 &self,
1183 begin: &[u8],
1184 end: &[u8],
1185 chunk_size: i64,
1186 ) -> impl Future<Output = FdbResult<FdbKeys>> + Send + Sync + Unpin {
1187 FdbFuture::<FdbKeys>::new(unsafe {
1188 fdb_sys::fdb_transaction_get_range_split_points(
1189 self.inner.as_ptr(),
1190 begin.as_ptr(),
1191 fdb_len(begin.len(), "begin"),
1192 end.as_ptr(),
1193 fdb_len(end.len(), "end"),
1194 chunk_size,
1195 )
1196 })
1197 }
1198
1199 /// Returns an FDBFuture which will be set to the versionstamp which was used by any
1200 /// versionstamp operations in this transaction.
1201 ///
1202 /// The future will be ready only after the successful completion of a call to `commit()` on
1203 /// this Transaction. Read-only transactions do not modify the database when committed and will
1204 /// result in the future completing with an error. Keep in mind that a transaction which reads
1205 /// keys and then sets them to their current values may be optimized to a read-only transaction.
1206 ///
1207 /// Most applications will not call this function.
1208 pub fn get_versionstamp(
1209 &self,
1210 ) -> impl Future<Output = FdbResult<FdbSlice>> + Send + Sync + Unpin {
1211 FdbFuture::new(unsafe { fdb_sys::fdb_transaction_get_versionstamp(self.inner.as_ptr()) })
1212 }
1213
1214 /// The transaction obtains a snapshot read version automatically at the time of the first call
1215 /// to `get_*()` (including this one) and (unless causal consistency has been deliberately
1216 /// compromised by transaction options) is guaranteed to represent all transactions which were
1217 /// reported committed before that call.
1218 pub fn get_read_version(&self) -> impl Future<Output = FdbResult<i64>> + Send + Sync + Unpin {
1219 FdbFuture::new(unsafe { fdb_sys::fdb_transaction_get_read_version(self.inner.as_ptr()) })
1220 }
1221
1222 /// Sets the snapshot read version used by a transaction.
1223 ///
1224 /// This is not needed in simple cases.
1225 /// If the given version is too old, subsequent reads will fail with error_code_past_version;
1226 /// if it is too new, subsequent reads may be delayed indefinitely and/or fail with
1227 /// error_code_future_version. If any of get_*() have been called on this transaction already,
1228 /// the result is undefined.
1229 pub fn set_read_version(&self, version: i64) {
1230 unsafe { fdb_sys::fdb_transaction_set_read_version(self.inner.as_ptr(), version) }
1231 }
1232
1233 /// The metadata version key `\xff/metadataVersion` is a key intended to help layers deal with hot keys.
1234 /// The value of this key is sent to clients along with the read version from the proxy,
1235 /// so a client can read its value without communicating with a storage server.
1236 /// To retrieve the metadataVersion, you need to set `TransactionOption::ReadSystemKeys`
1237 #[cfg_api_versions(min = 610)]
1238 pub async fn get_metadata_version(&self, snapshot: bool) -> FdbResult<Option<i64>> {
1239 match self.get(METADATA_VERSION_KEY, snapshot).await {
1240 Ok(Some(fdb_slice)) => {
1241 let value = fdb_slice.deref();
1242 // as we cannot write the metadata-key directly(we must mutate with an atomic_op),
1243 // can we assume that it will always be the correct size?
1244 if value.len() < 8 {
1245 return Ok(None);
1246 }
1247
1248 // The 80-bits versionstamps are 10 bytes longs, and are composed of:
1249 // * 8 bytes (Transaction Version)
1250 // * followed by 2 bytes (Transaction Batch Order)
1251 // More details can be found here: https://forums.foundationdb.org/t/implementing-versionstamps-in-bindings/250
1252 let mut arr = [0u8; 8];
1253 arr.copy_from_slice(&value[0..8]);
1254 let transaction_version: i64 = i64::from_be_bytes(arr);
1255
1256 Ok(Some(transaction_version))
1257 }
1258 Ok(None) => Ok(None),
1259
1260 Err(err) => Err(err),
1261 }
1262 }
1263
1264 #[cfg_api_versions(min = 610)]
1265 pub fn update_metadata_version(&self) {
1266 // The param is transformed by removing the final four bytes from ``param`` and reading
1267 // those as a little-Endian 32-bit integer to get a position ``pos``.
1268 // The 10 bytes of the parameter from ``pos`` to ``pos + 10`` are replaced with the
1269 // versionstamp of the transaction used. The first byte of the parameter is position 0.
1270 // As we only have the metadata value, we can just create an 14-bytes Vec filled with 0u8.
1271 let param = vec![0u8; 14];
1272 self.atomic_op(
1273 METADATA_VERSION_KEY,
1274 param.as_slice(),
1275 options::MutationType::SetVersionstampedValue,
1276 )
1277 }
1278
1279 /// Reset transaction to its initial state.
1280 ///
1281 /// In order to protect against a race condition with cancel(), this call require a mutable
1282 /// access to the transaction.
1283 ///
1284 /// This is similar to dropping the transaction and creating a new one.
1285 ///
1286 /// It is not necessary to call `reset()` when handling an error with `on_error()` since the
1287 /// transaction has already been reset.
1288 pub fn reset(&mut self) {
1289 unsafe { fdb_sys::fdb_transaction_reset(self.inner.as_ptr()) }
1290 }
1291
1292 /// Reads the conflicting key ranges from the special keyspace after a commit conflict.
1293 ///
1294 /// This method reads from `\xff\xff/transaction/conflicting_keys/` and parses the
1295 /// boundary encoding where `b"1"` marks range starts and `b"0"` marks range ends.
1296 ///
1297 /// The special keyspace read is resolved client-side — no network round-trip to the
1298 /// cluster. The future still goes through the FDB network thread event loop, but the
1299 /// data comes from an in-memory map populated during the commit response. Returns
1300 /// an empty `Vec` if
1301 /// [`TransactionOption::ReportConflictingKeys`](crate::options::TransactionOption::ReportConflictingKeys)
1302 /// was not set.
1303 ///
1304 /// # Errors
1305 ///
1306 /// Returns an `FdbError` if the special keyspace read fails.
1307 #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
1308 pub async fn conflicting_keys(&self) -> FdbResult<Vec<ConflictingKeyRange>> {
1309 let opt = RangeOption::from((CONFLICTING_KEYS_PREFIX, CONFLICTING_KEYS_END));
1310 let range_result = self.get_range(&opt, 1, false).await?;
1311
1312 let prefix_len = CONFLICTING_KEYS_PREFIX.len();
1313 let mut ranges = Vec::new();
1314 let mut current_begin: Option<Vec<u8>> = None;
1315
1316 for kv in range_result.iter() {
1317 let raw_key = kv.key();
1318 let actual_key = if raw_key.len() > prefix_len {
1319 raw_key[prefix_len..].to_vec()
1320 } else {
1321 Vec::new()
1322 };
1323
1324 match kv.value() {
1325 b"1" => {
1326 current_begin = Some(actual_key);
1327 }
1328 b"0" => {
1329 if let Some(begin) = current_begin.take() {
1330 ranges.push(ConflictingKeyRange {
1331 begin,
1332 end: actual_key,
1333 });
1334 }
1335 }
1336 _ => {}
1337 }
1338 }
1339
1340 debug_assert!(
1341 current_begin.is_none(),
1342 "unpaired '1' marker in conflicting keys response"
1343 );
1344
1345 Ok(ranges)
1346 }
1347
1348 /// Adds a conflict range to a transaction without performing the associated read or write.
1349 ///
1350 /// # Note
1351 ///
1352 /// Most applications will use the serializable isolation that transactions provide by default
1353 /// and will not need to manipulate conflict ranges.
1354 pub fn add_conflict_range(
1355 &self,
1356 begin: &[u8],
1357 end: &[u8],
1358 ty: options::ConflictRangeType,
1359 ) -> FdbResult<()> {
1360 error::eval(unsafe {
1361 fdb_sys::fdb_transaction_add_conflict_range(
1362 self.inner.as_ptr(),
1363 begin.as_ptr(),
1364 fdb_len(begin.len(), "begin"),
1365 end.as_ptr(),
1366 fdb_len(end.len(), "end"),
1367 ty.code(),
1368 )
1369 })
1370 }
1371}
1372
1373impl Drop for Transaction {
1374 fn drop(&mut self) {
1375 unsafe {
1376 fdb_sys::fdb_transaction_destroy(self.inner.as_ptr());
1377 }
1378 }
1379}
1380
1381/// A retryable transaction, generated by Database.run
1382#[derive(Clone)]
1383pub struct RetryableTransaction {
1384 inner: Arc<Transaction>,
1385}
1386
1387impl Deref for RetryableTransaction {
1388 type Target = Transaction;
1389 fn deref(&self) -> &Transaction {
1390 self.inner.deref()
1391 }
1392}
1393
1394impl RetryableTransaction {
1395 pub(crate) fn new(t: Transaction) -> RetryableTransaction {
1396 RetryableTransaction { inner: Arc::new(t) }
1397 }
1398
1399 pub(crate) fn take(self) -> Result<Transaction, FdbBindingError> {
1400 // checking weak references
1401 if Arc::weak_count(&self.inner) != 0 {
1402 return Err(FdbBindingError::ReferenceToTransactionKept);
1403 }
1404 Arc::try_unwrap(self.inner).map_err(|_| FdbBindingError::ReferenceToTransactionKept)
1405 }
1406
1407 pub(crate) async fn on_error(
1408 self,
1409 err: FdbError,
1410 ) -> Result<Result<RetryableTransaction, FdbError>, FdbBindingError> {
1411 Ok(self
1412 .take()?
1413 .on_error(err)
1414 .await
1415 .map(RetryableTransaction::new))
1416 }
1417
1418 #[cfg_attr(feature = "trace", tracing::instrument(level = "debug", skip(self)))]
1419 pub(crate) async fn commit(
1420 self,
1421 ) -> Result<Result<TransactionCommitted, TransactionCommitError>, FdbBindingError> {
1422 Ok(self.take()?.commit().await)
1423 }
1424}